見出し画像

「Amazon価格ツイート」アプリ作成⑦ 基本動作完成

どうも、タケオです。

今回は今まで作成した処理をまとめて、データの収集からTwitterへの投稿まで一連の処理を実行してみたいと思います。

はじめに

DB設計については詳細は公開していないので、ソース内に書かれたクエリを元に想像していただければ。
基本的には以下のテーブルを用意しました。

・アカウントマスタ (account_mst)
 複数のTwitterアカウントを管理できるように、それぞれのAPI KEYなどを登録しています。
・検索条件マスタ (search_condition)
 指定の商品や金額などを登録しています。
・ツイート済み管理テーブル (tweet_history)
 一度送信した内容を再度送信しないように履歴によって管理しています。

メイン処理

新規予約情報を取得する処理のみ作成しています。
※デバッグ用のprint文は残したままにしてあります。

from Lib import AmazonAPI
from Lib import dbAccess
from Lib import TweetAPI
import datetime


# Amazon API
SERVICE_NAME   = "ProductAdvertisingAPI"
REGION         = "us-west-2"
PARTNER_TAG    = "パートナータグ"
ACCESS_KEY     = "アクセスキー"
SECRET_KEY     = "シークレットキー"

HOST           = "webservices.amazon.co.jp"
SORT_BY        = "NewestArrivals"

#ITEM_COUNT     = 0  #取得する情報数(最大数:10)
#ITEM_PAGE      = 0  #取得するページ番号

def main():
    
    # Amazon API 初期化
    aws = AmazonAPI.AwsV4(ACCESS_KEY, SECRET_KEY)
    aws.setServiceName(SERVICE_NAME)
    aws.setRegionName(REGION)
    aws.setPartnerTag(PARTNER_TAG)
    aws.setHost(HOST)
    aws.setSortBy(SORT_BY)
    #aws.setItemCount(ITEM_COUNT)
    #aws.setItemPage(ITEM_PAGE)

    today = datetime.datetime.now()

    # DB接続
    con = dbAccess.dbAccessor()

    # ユーザーリスト取得
    user_list = getUserList(con)

    # ユーザー別検索
    for user in user_list:
        print("new user")
        # Twitter API 呼び出し
        tweet = TweetAPI.Tweet(user[1], user[2], user[3], user[4])
        # 検索条件取得
        search_list = getSearchList(con, user)

        # 検索条件
        for search in search_list:
            # Amazonの商品情報を取得
            match search[1]:
                case 1: # Price Down
                    print("Price Down")

                case 2: # New Release
                    print("New Release")
                    responses = aws.SearchItems(search[3], search[4], search[5])
                    if ('Errors' in responses):
                        # データ取得エラーのため次へ
                        print("Errors: " + search[3] + "/" + search[4])
                        continue
                    if ("SearchResult" in responses):
                        # 逆順にする(ツイートの順番を古い順に)
                        rev_response =responses["SearchResult"]["Items"]
                        rev_response =list(reversed(rev_response))
                        # 商品毎に処理
                        for item in rev_response:
                            # 出版日情報が含まれている場合のみ
                            if "ProductInfo" in item["ItemInfo"]:
                                if "ReleaseDate" in item["ItemInfo"]["ProductInfo"]:
                                    r_date  = item["ItemInfo"]["ProductInfo"]["ReleaseDate"]["DisplayValue"]
                                    release_date = datetime.datetime.strptime(r_date, '%Y-%m-%dT%H:%M:%SZ')
                                    # 出版日が未来の場合
                                    if today < release_date:
                                        # ツイート済みチェック
                                        if not checkSendTweet(con, user, search, item):
                                            # ツイート内容を作成
                                            message = ''
                                            message += item["ItemInfo"]["Title"]["DisplayValue"] + "\r\n"
                                            message += item["ItemInfo"]["ByLineInfo"]["Contributors"][0]["Name"] + "\r\n"
                                            message += item["Offers"]["Listings"][0]["Availability"]["Message"] + "\r\n"
                                            message += item["DetailPageURL"] + "\r\n"
                                    
                                            # Tweet
                                            print(message)
                                            tweet.sendTwitter(message)
                                    
                                            # 送信済み登録
                                            regSendTweetHistory(con, user, search, item)

                case 3: # Start Time Sale
                    print("Start Time Sale")

                case 4: # Inventory Back
                    print("Inventory Back")

    
# -----------------------------------
# ユーザーリスト取得
#
# Twitterユーザーのアクセスリストを返す
# -----------------------------------
def getUserList(con):
    
    if con.is_connected:
        # ユーザーリスト取得
        sql = ('''
        SELECT
	        id_account,
            api_key,
            api_key_secret,
            access_token,
            access_token_secret
         FROM account_mst
         WHERE stop_flg = 0 AND del_flg = 0
         ORDER BY id_account
        ''')
        result = con.excecuteQuery(sql)
    
    # 結果
    return result

# -----------------------------------
# ユーザー別検索
#
# ユーザー毎の検索条件を取得
# -----------------------------------
def getSearchList(con, user):
    
    if con.is_connected:
        # 検索条件取得
        sql = ('''
        SELECT 
            id_search, 
            search_type, 
            asin,
            key_word,
            brand,
            author,
            target_price
        FROM amazontweet.search_condition
        WHERE del_flg = 0 AND disable_flg = 0 AND id_account = {0}
        ORDER BY id_search
        ''')
        sql = sql.format(user[0])

        result = con.excecuteQuery(sql)
    
    # 結果
    return result

# -----------------------------------
# ツイート済みチェック
#
# すでに同じ内容のツイートを行っているか確認する。
# -----------------------------------
def checkSendTweet(con, user, search, item):
    
    exist = False

    if con.is_connected:
        # ツイート履歴取得
        sql = "SELECT id_history "
        sql += "FROM tweet_history "
        sql += "WHERE del_flg = 0 "
        sql += "  AND disable_flg = 0 "
        sql += "  AND id_account = " + str(user[0]) + " "
        sql += "  AND search_type = " + str(search[1]) + " "
        sql += "  AND asin = '" + item["ASIN"] + "'"

        result = con.excecuteQuery(sql)
        
        if len(result):
            if result[0][0] > 0:
                exist = True
                
    # 結果
    return exist

def regSendTweetHistory(con, user, search, item):
    
    count = 0

    if con.is_connected:
        # ツイート履歴登録
        sql = "INSERT INTO tweet_history (id_account, search_type, id_search, asin, item_url, title, message, amount, public_date, release_date, entry_date)" + "\n"
        sql += " VALUES ("
        sql += " " + str(user[0]) + ", "    # id_account
        sql += " " + str(search[1]) + ", "  # search_type
        sql += " " + str(search[0]) + ", "  # id_search
        sql += "'" + item["ASIN"] + "', "   # asin
        sql += "'" + item["DetailPageURL"] + "', "   # item_url
        sql += "'" + cutString(item["ItemInfo"]["Title"]["DisplayValue"],100) + "', "   # title
        sql += "'" + cutString(item["Offers"]["Listings"][0]["Availability"]["Message"],100) + "', "   # message
        sql += " " + str(item["Offers"]["Listings"][0]["Price"]["Amount"]) + ", "   # amount
        if "PublicationDate" in item["ItemInfo"]["ContentInfo"]:    # public_date
            sql += "'" + normalDateFormat(item["ItemInfo"]["ContentInfo"]["PublicationDate"]["DisplayValue"]) + "', "
        else:
            sql += "'',"
        if "ReleaseDate" in item["ItemInfo"]["ProductInfo"]:        # release_date
            sql += "'" + normalDateFormat(item["ItemInfo"]["ProductInfo"]["ReleaseDate"]["DisplayValue"]) + "', "
        else:
            sql += "'',"
        sql += "'" + datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') + "' "   # entry_date
        sql += ")"
    
        count = con.excecuteInsert(sql)
          
    # 結果
    return count

# -----------------------------------
# 日付フォーマット変更
#
# Amazonの日付からシステム標準に変更
# -----------------------------------
def normalDateFormat(date):

    return datetime.datetime.strptime(date, '%Y-%m-%dT%H:%M:%SZ').strftime('%Y-%m-%d')

# -----------------------------------
# 文字長調整
#
# LEFT関数の代替処理
# -----------------------------------
def cutString(args, num):

    if len(args) > num:
       return args[0: num]
    else:
        return args


# -----------------------------------
# メイン処理呼び出し
# -----------------------------------
if __name__ == '__main__':
    main()

DBアクセスクラス

とても使いやすくまとまった処理を公開していただいていたので、こちらを参考(コピー)にさせていただきました。

DBへのアクセスが自分だけだったため、接続情報をファイル内に記載する形を採っていますが、本来はよろしくない処理ですのでご了承ください。

import mysql.connector

#
# DBアクセス管理クラス
#
class dbAccessor:
    
    DB_HOST = 'サーバー名'
    DB_USER = 'ユーザー名'
    DB_PWD  = 'パスワード'
    DB_NAME = 'DB名'

    # -----------------------------------
    # コンストラクタ
    #
    # コネクションを取得し、クラス変数にカーソルを保持する。
    # -----------------------------------
    def __init__(self, hostName=DB_HOST, user=DB_USER, password=DB_PWD, dbName=DB_NAME):
        print("start:__init__")

        try:
            # DBに接続する
            self.conn = mysql.connector.connect(
                host     = hostName,
                user     = user,
                password = password,
                database = dbName,
            )

            # コネクションの設定
            self.conn.autocommit = False

            # カーソル情報をクラス変数に格納
            self.conn.is_connected()
            self.cur = self.conn.cursor()
        except (mysql.connector.errors.ProgrammingError) as e:
            print(e)

        print("end:__init__")

    # -----------------------------------
    # クエリの実行
    #
    # クエリを実行し、取得結果を呼び出し元に通知する。
    # -----------------------------------
    def excecuteQuery(self, sql):
        print("start:excecuteQuery")

        try:
            self.cur.execute(sql)
            rows = self.cur.fetchall()
            return rows
        except (mysql.connector.errors.ProgrammingError) as e:
            print(e)

        print("end:excecuteQuery")

    # -----------------------------------
    # インサートの実行
    #
    # インサートを実行する。
    # -----------------------------------
    def excecuteInsert(self, sql):
        print("start:excecuteInsert")

        try:
            self.cur.execute(sql)
            self.conn.commit()
            return self.cur.rowcount
        except (mysql.connector.errors.ProgrammingError) as e:
            self.conn.rollback()
            print(e)

        print("end:excecuteInsert")

    # -----------------------------------
    # アップデートの実行
    #
    # アップデートを実行する。
    # -----------------------------------
    def excecuteUpdate(self, sql):
        print("start:excecuteUpdate")

        try:
            self.cur.execute(sql)
            self.conn.commit()
            return self.cur.rowcount
        except (mysql.connector.errors.ProgrammingError) as e:
            self.conn.rollback()
            print(e)

        print("end:excecuteUpdate")

    # -----------------------------------
    # デリートの実行
    #
    # デリートを実行する。
    # -----------------------------------
    def excecuteDelete(self, sql):
        print("start:excecuteDelete")

        try:
            self.cur.execute(sql)
            self.conn.commit()
            return self.cur.rowcount
        except (mysql.connector.errors.ProgrammingError) as e:
            self.conn.rollback()
            print(e)

        print("end:excecuteDelete")
        
    # -----------------------------------
    # コネクションチェック
    #
    # コネクションの接続状態を返す
    # -----------------------------------
    def is_connected(self):
        return self.conn.is_connected

    # -----------------------------------
    # デストラクタ
    #
    # コネクションを解放する。
    # -----------------------------------
    def __del__(self):
        print("start:__del__")
        try:
            self.conn.close()
        except (mysql.connector.errors.ProgrammingError) as e:
            print(e)
        print("end:__del__")

Amzon APIクラス

メイン処理もそうですが、今回は一部処理のみ対応させています。

import hmac
import hashlib
import datetime
import json
import requests

class AwsV4:
        
    # -----------------------------------
    # コンストラクタ
    #
    # Amazon APIを初期化する。
    # -----------------------------------
    def __init__(self, accessKey="", secretKey=""):
        self.accessKey       = accessKey
        self.secretKey       = secretKey
        self.regionName      = ""
        self.serviceName     = "";
        self.httpMethodName  = "POST"

        self.path            = ""
        self.queryParametes  = {}
        self.awsHeaders      = {}
        self.payload         = ""
        
        self.itemCount       = 0
        self.itemPage        = 0
        
        self.HMACAlgorithm   = "AWS4-HMAC-SHA256"
        self.aws4Request     = "aws4_request"
        self.strSignedHeader = ""
        self.xAmzDate        = self.__getTimeStamp()
        self.currentDate     = self.__getDate()
    
    def setApiKey(self, accessKey, secretKey):
        self.accessKey = accessKey
        self.secretKey = secretKey

    def setPartnerTag(self, partnerTag):
        self.partnerTag = partnerTag

    def setPath(self, path):
        self.path = path

    def setServiceName(self, serviceName):
        self.serviceName = serviceName;
        
    def setRegionName(self, regionName):
        self.regionName = regionName
        
    def setPayload(self, payload):
        self.payload = payload
        
    def setRequestMethod(self, method):
        self.httpMethodName = method
        
    def setHost(self, host):
        self.host = host
        
    def setSortBy(self, sortBy):
        self.sortBy = sortBy
        
    def setItemCount(self, itemCount):
        self.itemCount = itemCount
        
    def setItemPage(self, itemPage):
        self.itemPage = itemPage
                
    def addHeader(self, headerName, headerValue):
        self.awsHeaders[headerName] = headerValue

    def clearHeader(self):
        self.awsHeaders = {}

    def __prepareCanonicalRequest(self):
        canonicalURL = ""
        canonicalURL += self.httpMethodName + "\n"
        canonicalURL += self.path + "\n" + "\n"
        signedHeaders = ''
        for key, value in self.awsHeaders.items():
            signedHeaders += key + ";"
            canonicalURL += key + ":" + value + "\n"
        canonicalURL += "\n"
        self.strSignedHeader = signedHeaders[0:len(signedHeaders)-1] 
        canonicalURL += self.strSignedHeader + "\n"
        canonicalURL += self.__generateHex(self.payload)
        return canonicalURL
    
    def __prepareStringToSign(self, canonicalURL):
        stringToSign = ''
        stringToSign += self.HMACAlgorithm + "\n"
        stringToSign += self.xAmzDate + "\n"
        stringToSign += self.currentDate + "/" + self.regionName + "/" + self.serviceName + "/" + self.aws4Request + "\n"
        stringToSign += self.__generateHex(canonicalURL)
        return stringToSign
    
    def __calculateSignature(self, stringToSign):    
        signatureKey = self.__getSignatureKey (self.secretKey, self.currentDate, self.regionName, self.serviceName)
        signature = hmac.new(signatureKey, stringToSign.encode('utf-8'), hashlib.sha256).hexdigest()
        return signature;

    def getHeaders(self):
        self.awsHeaders['x-amz-date'] = self.xAmzDate
        self.awsHeaders = sorted(self.awsHeaders.items())           #ソート
        self.awsHeaders = dict((x, y) for x, y in self.awsHeaders)  #タプルを辞書に変換

        # Step 1: CREATE A CANONICAL REQUEST
        canonicalURL = self.__prepareCanonicalRequest()

        # Step 2: CREATE THE STRING TO SIGN
        stringToSign = self.__prepareStringToSign(canonicalURL)

        # Step 3: CALCULATE THE SIGNATURE
        signature = self.__calculateSignature(stringToSign)
        
        # Step 4: CALCULATE AUTHORIZATION HEADER
        if signature != "":
            self.awsHeaders['Authorization'] = self.__buildAuthorizationString(signature)
            return self.awsHeaders

    def __buildAuthorizationString(self, strSignature):
        return self.HMACAlgorithm + " " + "Credential=" + self.accessKey + "/" + self.__getDate() + "/" + self.regionName + "/" + self.serviceName + "/" + self.aws4Request + "," + "SignedHeaders=" + self.strSignedHeader + "," + "Signature=" + strSignature;

    def __generateHex(self, data):
        return hashlib.sha256(data.encode('utf-8')).hexdigest()
    
    def __sign(self, key, msg):
        return hmac.new(key, msg.encode('utf-8'), hashlib.sha256).digest()
  
    def __getSignatureKey(self, key, date, regionName, serviceName):
        strSecret = "AWS4" + key        
        kDate    = self.__sign(strSecret.encode('utf-8'), date)
        kRegion  = self.__sign(kDate, regionName)
        kService = self.__sign(kRegion, serviceName)
        kSigning = self.__sign(kService, self.aws4Request)
        return kSigning
    
    def __getTimeStamp(self):
        return datetime.datetime.utcnow().strftime('%Y%m%dT%H%M%SZ')
    
    def __getDate(self):
        return datetime.datetime.utcnow().strftime('%Y%m%d')
    
    # -----------------------------------
    # SearchItems
    #
    # 条件に合致した商品情報を取得する。
    # -----------------------------------
    def SearchItems(self, key_word, brand, author):
         # create payload
        payload = "{"
        payload += " \"Keywords\": \"" + key_word + "\","
        payload += " \"Resources\": ["
        payload += "  \"ItemInfo.ByLineInfo\","
        payload += "  \"ItemInfo.ContentInfo\","
        payload += "  \"ItemInfo.ContentRating\","
        payload += "  \"ItemInfo.Classifications\","
        payload += "  \"ItemInfo.ExternalIds\","
        payload += "  \"ItemInfo.Features\","
        payload += "  \"ItemInfo.ManufactureInfo\","
        payload += "  \"ItemInfo.ProductInfo\","
        payload += "  \"ItemInfo.TechnicalInfo\","
        payload += "  \"ItemInfo.Title\","
        payload += "  \"ItemInfo.TradeInInfo\","
        payload += "  \"Offers.Listings.Availability.Message\","
        payload += "  \"Offers.Listings.Availability.Type\","
        payload += "  \"Offers.Listings.Condition\","
        payload += "  \"Offers.Listings.MerchantInfo\","
        payload += "  \"Offers.Listings.Price\","
        payload += "  \"Offers.Listings.ProgramEligibility.IsPrimeExclusive\","
        payload += "  \"Offers.Listings.ProgramEligibility.IsPrimePantry\","
        payload += "  \"Offers.Listings.SavingBasis\","
        payload += "  \"Offers.Summaries.HighestPrice\","
        payload += "  \"Offers.Summaries.LowestPrice\","
        payload += "  \"Offers.Summaries.OfferCount\","
        payload += "  \"ParentASIN\""
        payload += " ],"
        if brand != "":
            payload += " \"Brand\": \"" + brand + "\","
        if author != "":
            payload += " \"Author\": \"" + author + "\","
        payload += " \"SortBy\": \"" + self.sortBy + "\","
        if self.itemCount > 0:
            payload += " \"ItemCount\": " + str(self.itemCount) + ","
        if self.itemPage > 0:
            payload += " \"ItemPage\": " + str(self.itemPage) + ","
        payload += " \"Merchant\": \"Amazon\","
        payload += " \"PartnerTag\": \"" + self.partnerTag + "\","
        payload += " \"PartnerType\": \"Associates\","
        payload += " \"Marketplace\": \"www.amazon.co.jp\","
        payload += " \"Operation\": \"SearchItems\""
        payload += "}"

        #print(payload)

        return self.getResponse(payload, "SearchItems", "getitems")
    

    def getResponse(self, payload, operation, uriPath):
        # set param
        self.setPath("/paapi5/" + uriPath)
        self.setPayload (payload)
        # create header
        self.clearHeader()
        self.addHeader('content-encoding', 'amz-1.0')
        self.addHeader('content-type', 'application/json; charset=utf-8')
        self.addHeader('host', self.host)
        self.addHeader('x-amz-target', 'com.amazon.paapi5.v1.ProductAdvertisingAPIv1.' + operation)
        headers = self.getHeaders()

        try:
            response = requests.post('https://' + self.host + "/paapi5/" + uriPath, headers=headers, data=payload.encode("utf-8"))
            json_response = json.loads(response.text)
            return json_response
        except :
            pass
            return {"Result": "error"}

Twitter APIクラス

API KEYなどはDBに登録してあるものを利用しています。

import tweepy

#
# Twitterアクセス管理クラス
#


class Tweet:
        
    # -----------------------------------
    # コンストラクタ
    #
    # 認証済みのTwitter APIを用意する。
    # -----------------------------------
    def __init__(self, api_key, api_secret, access_token, access_token_secret):
        
        try:
            # Twitterライブラリの呼び出し
            auth = tweepy.OAuthHandler(api_key, api_secret)
            auth.set_access_token(access_token, access_token_secret)
            self.api = tweepy.API(auth)
            
        except Exception as e:
            print(f"Tweet init Error: {e}")

            
    # -----------------------------------
    # ツイートの送信
    #
    # 作成したメッセージをツイートする
    # -----------------------------------
    def sendTwitter(self, msg):
                
        # メッセージをツイート
        try:
            if msg != '':
                self.api.update_status(msg)

        except Exception as e:
            print(f"sendTwitter Error: {e}")

実行結果

新規予約情報を投稿するため、実行当日よりも未来に発行予定のものがあればTwitterに投稿し、すでに発売中の物については無視するようになっています。
また、一度投稿したものは再度投稿しないように送信済みチェックを行なっていますので、同じ内容が2回ツイートされることはありません。


というわけで

汚いソースを公開するのは恥ずかしいですが、何か皆様にも得るものがあればと恥を偲んで公開してみました。
処理には十分なエラー対策が含まれておりませんので参考にされる場合はご注意ください。
当然ながらこのソースを利用したことによって何らかの問題が発生しても責任は負えませんので何卒ご了承ください。

ひと段落するところまでは動かせたので次は何をしましょう。ログ出力や全体的なエラー対策も必要なところではありますが。

この記事が気に入ったらサポートをしてみませんか?