「Amazon価格ツイート」アプリ作成⑦ 基本動作完成
どうも、タケオです。
今回は今まで作成した処理をまとめて、データの収集からTwitterへの投稿まで一連の処理を実行してみたいと思います。
はじめに
DB設計については詳細は公開していないので、ソース内に書かれたクエリを元に想像していただければ。
基本的には以下のテーブルを用意しました。
・アカウントマスタ (account_mst)
複数のTwitterアカウントを管理できるように、それぞれのAPI KEYなどを登録しています。
・検索条件マスタ (search_condition)
指定の商品や金額などを登録しています。
・ツイート済み管理テーブル (tweet_history)
一度送信した内容を再度送信しないように履歴によって管理しています。
メイン処理
新規予約情報を取得する処理のみ作成しています。
※デバッグ用のprint文は残したままにしてあります。
from Lib import AmazonAPI
from Lib import dbAccess
from Lib import TweetAPI
import datetime
# Amazon API
SERVICE_NAME = "ProductAdvertisingAPI"
REGION = "us-west-2"
PARTNER_TAG = "パートナータグ"
ACCESS_KEY = "アクセスキー"
SECRET_KEY = "シークレットキー"
HOST = "webservices.amazon.co.jp"
SORT_BY = "NewestArrivals"
#ITEM_COUNT = 0 #取得する情報数(最大数:10)
#ITEM_PAGE = 0 #取得するページ番号
def main():
# Amazon API 初期化
aws = AmazonAPI.AwsV4(ACCESS_KEY, SECRET_KEY)
aws.setServiceName(SERVICE_NAME)
aws.setRegionName(REGION)
aws.setPartnerTag(PARTNER_TAG)
aws.setHost(HOST)
aws.setSortBy(SORT_BY)
#aws.setItemCount(ITEM_COUNT)
#aws.setItemPage(ITEM_PAGE)
today = datetime.datetime.now()
# DB接続
con = dbAccess.dbAccessor()
# ユーザーリスト取得
user_list = getUserList(con)
# ユーザー別検索
for user in user_list:
print("new user")
# Twitter API 呼び出し
tweet = TweetAPI.Tweet(user[1], user[2], user[3], user[4])
# 検索条件取得
search_list = getSearchList(con, user)
# 検索条件
for search in search_list:
# Amazonの商品情報を取得
match search[1]:
case 1: # Price Down
print("Price Down")
case 2: # New Release
print("New Release")
responses = aws.SearchItems(search[3], search[4], search[5])
if ('Errors' in responses):
# データ取得エラーのため次へ
print("Errors: " + search[3] + "/" + search[4])
continue
if ("SearchResult" in responses):
# 逆順にする(ツイートの順番を古い順に)
rev_response =responses["SearchResult"]["Items"]
rev_response =list(reversed(rev_response))
# 商品毎に処理
for item in rev_response:
# 出版日情報が含まれている場合のみ
if "ProductInfo" in item["ItemInfo"]:
if "ReleaseDate" in item["ItemInfo"]["ProductInfo"]:
r_date = item["ItemInfo"]["ProductInfo"]["ReleaseDate"]["DisplayValue"]
release_date = datetime.datetime.strptime(r_date, '%Y-%m-%dT%H:%M:%SZ')
# 出版日が未来の場合
if today < release_date:
# ツイート済みチェック
if not checkSendTweet(con, user, search, item):
# ツイート内容を作成
message = ''
message += item["ItemInfo"]["Title"]["DisplayValue"] + "\r\n"
message += item["ItemInfo"]["ByLineInfo"]["Contributors"][0]["Name"] + "\r\n"
message += item["Offers"]["Listings"][0]["Availability"]["Message"] + "\r\n"
message += item["DetailPageURL"] + "\r\n"
# Tweet
print(message)
tweet.sendTwitter(message)
# 送信済み登録
regSendTweetHistory(con, user, search, item)
case 3: # Start Time Sale
print("Start Time Sale")
case 4: # Inventory Back
print("Inventory Back")
# -----------------------------------
# ユーザーリスト取得
#
# Twitterユーザーのアクセスリストを返す
# -----------------------------------
def getUserList(con):
if con.is_connected:
# ユーザーリスト取得
sql = ('''
SELECT
id_account,
api_key,
api_key_secret,
access_token,
access_token_secret
FROM account_mst
WHERE stop_flg = 0 AND del_flg = 0
ORDER BY id_account
''')
result = con.excecuteQuery(sql)
# 結果
return result
# -----------------------------------
# ユーザー別検索
#
# ユーザー毎の検索条件を取得
# -----------------------------------
def getSearchList(con, user):
if con.is_connected:
# 検索条件取得
sql = ('''
SELECT
id_search,
search_type,
asin,
key_word,
brand,
author,
target_price
FROM amazontweet.search_condition
WHERE del_flg = 0 AND disable_flg = 0 AND id_account = {0}
ORDER BY id_search
''')
sql = sql.format(user[0])
result = con.excecuteQuery(sql)
# 結果
return result
# -----------------------------------
# ツイート済みチェック
#
# すでに同じ内容のツイートを行っているか確認する。
# -----------------------------------
def checkSendTweet(con, user, search, item):
exist = False
if con.is_connected:
# ツイート履歴取得
sql = "SELECT id_history "
sql += "FROM tweet_history "
sql += "WHERE del_flg = 0 "
sql += " AND disable_flg = 0 "
sql += " AND id_account = " + str(user[0]) + " "
sql += " AND search_type = " + str(search[1]) + " "
sql += " AND asin = '" + item["ASIN"] + "'"
result = con.excecuteQuery(sql)
if len(result):
if result[0][0] > 0:
exist = True
# 結果
return exist
def regSendTweetHistory(con, user, search, item):
count = 0
if con.is_connected:
# ツイート履歴登録
sql = "INSERT INTO tweet_history (id_account, search_type, id_search, asin, item_url, title, message, amount, public_date, release_date, entry_date)" + "\n"
sql += " VALUES ("
sql += " " + str(user[0]) + ", " # id_account
sql += " " + str(search[1]) + ", " # search_type
sql += " " + str(search[0]) + ", " # id_search
sql += "'" + item["ASIN"] + "', " # asin
sql += "'" + item["DetailPageURL"] + "', " # item_url
sql += "'" + cutString(item["ItemInfo"]["Title"]["DisplayValue"],100) + "', " # title
sql += "'" + cutString(item["Offers"]["Listings"][0]["Availability"]["Message"],100) + "', " # message
sql += " " + str(item["Offers"]["Listings"][0]["Price"]["Amount"]) + ", " # amount
if "PublicationDate" in item["ItemInfo"]["ContentInfo"]: # public_date
sql += "'" + normalDateFormat(item["ItemInfo"]["ContentInfo"]["PublicationDate"]["DisplayValue"]) + "', "
else:
sql += "'',"
if "ReleaseDate" in item["ItemInfo"]["ProductInfo"]: # release_date
sql += "'" + normalDateFormat(item["ItemInfo"]["ProductInfo"]["ReleaseDate"]["DisplayValue"]) + "', "
else:
sql += "'',"
sql += "'" + datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') + "' " # entry_date
sql += ")"
count = con.excecuteInsert(sql)
# 結果
return count
# -----------------------------------
# 日付フォーマット変更
#
# Amazonの日付からシステム標準に変更
# -----------------------------------
def normalDateFormat(date):
return datetime.datetime.strptime(date, '%Y-%m-%dT%H:%M:%SZ').strftime('%Y-%m-%d')
# -----------------------------------
# 文字長調整
#
# LEFT関数の代替処理
# -----------------------------------
def cutString(args, num):
if len(args) > num:
return args[0: num]
else:
return args
# -----------------------------------
# メイン処理呼び出し
# -----------------------------------
if __name__ == '__main__':
main()
DBアクセスクラス
とても使いやすくまとまった処理を公開していただいていたので、こちらを参考(コピー)にさせていただきました。
DBへのアクセスが自分だけだったため、接続情報をファイル内に記載する形を採っていますが、本来はよろしくない処理ですのでご了承ください。
import mysql.connector
#
# DBアクセス管理クラス
#
class dbAccessor:
DB_HOST = 'サーバー名'
DB_USER = 'ユーザー名'
DB_PWD = 'パスワード'
DB_NAME = 'DB名'
# -----------------------------------
# コンストラクタ
#
# コネクションを取得し、クラス変数にカーソルを保持する。
# -----------------------------------
def __init__(self, hostName=DB_HOST, user=DB_USER, password=DB_PWD, dbName=DB_NAME):
print("start:__init__")
try:
# DBに接続する
self.conn = mysql.connector.connect(
host = hostName,
user = user,
password = password,
database = dbName,
)
# コネクションの設定
self.conn.autocommit = False
# カーソル情報をクラス変数に格納
self.conn.is_connected()
self.cur = self.conn.cursor()
except (mysql.connector.errors.ProgrammingError) as e:
print(e)
print("end:__init__")
# -----------------------------------
# クエリの実行
#
# クエリを実行し、取得結果を呼び出し元に通知する。
# -----------------------------------
def excecuteQuery(self, sql):
print("start:excecuteQuery")
try:
self.cur.execute(sql)
rows = self.cur.fetchall()
return rows
except (mysql.connector.errors.ProgrammingError) as e:
print(e)
print("end:excecuteQuery")
# -----------------------------------
# インサートの実行
#
# インサートを実行する。
# -----------------------------------
def excecuteInsert(self, sql):
print("start:excecuteInsert")
try:
self.cur.execute(sql)
self.conn.commit()
return self.cur.rowcount
except (mysql.connector.errors.ProgrammingError) as e:
self.conn.rollback()
print(e)
print("end:excecuteInsert")
# -----------------------------------
# アップデートの実行
#
# アップデートを実行する。
# -----------------------------------
def excecuteUpdate(self, sql):
print("start:excecuteUpdate")
try:
self.cur.execute(sql)
self.conn.commit()
return self.cur.rowcount
except (mysql.connector.errors.ProgrammingError) as e:
self.conn.rollback()
print(e)
print("end:excecuteUpdate")
# -----------------------------------
# デリートの実行
#
# デリートを実行する。
# -----------------------------------
def excecuteDelete(self, sql):
print("start:excecuteDelete")
try:
self.cur.execute(sql)
self.conn.commit()
return self.cur.rowcount
except (mysql.connector.errors.ProgrammingError) as e:
self.conn.rollback()
print(e)
print("end:excecuteDelete")
# -----------------------------------
# コネクションチェック
#
# コネクションの接続状態を返す
# -----------------------------------
def is_connected(self):
return self.conn.is_connected
# -----------------------------------
# デストラクタ
#
# コネクションを解放する。
# -----------------------------------
def __del__(self):
print("start:__del__")
try:
self.conn.close()
except (mysql.connector.errors.ProgrammingError) as e:
print(e)
print("end:__del__")
Amzon APIクラス
メイン処理もそうですが、今回は一部処理のみ対応させています。
import hmac
import hashlib
import datetime
import json
import requests
class AwsV4:
# -----------------------------------
# コンストラクタ
#
# Amazon APIを初期化する。
# -----------------------------------
def __init__(self, accessKey="", secretKey=""):
self.accessKey = accessKey
self.secretKey = secretKey
self.regionName = ""
self.serviceName = "";
self.httpMethodName = "POST"
self.path = ""
self.queryParametes = {}
self.awsHeaders = {}
self.payload = ""
self.itemCount = 0
self.itemPage = 0
self.HMACAlgorithm = "AWS4-HMAC-SHA256"
self.aws4Request = "aws4_request"
self.strSignedHeader = ""
self.xAmzDate = self.__getTimeStamp()
self.currentDate = self.__getDate()
def setApiKey(self, accessKey, secretKey):
self.accessKey = accessKey
self.secretKey = secretKey
def setPartnerTag(self, partnerTag):
self.partnerTag = partnerTag
def setPath(self, path):
self.path = path
def setServiceName(self, serviceName):
self.serviceName = serviceName;
def setRegionName(self, regionName):
self.regionName = regionName
def setPayload(self, payload):
self.payload = payload
def setRequestMethod(self, method):
self.httpMethodName = method
def setHost(self, host):
self.host = host
def setSortBy(self, sortBy):
self.sortBy = sortBy
def setItemCount(self, itemCount):
self.itemCount = itemCount
def setItemPage(self, itemPage):
self.itemPage = itemPage
def addHeader(self, headerName, headerValue):
self.awsHeaders[headerName] = headerValue
def clearHeader(self):
self.awsHeaders = {}
def __prepareCanonicalRequest(self):
canonicalURL = ""
canonicalURL += self.httpMethodName + "\n"
canonicalURL += self.path + "\n" + "\n"
signedHeaders = ''
for key, value in self.awsHeaders.items():
signedHeaders += key + ";"
canonicalURL += key + ":" + value + "\n"
canonicalURL += "\n"
self.strSignedHeader = signedHeaders[0:len(signedHeaders)-1]
canonicalURL += self.strSignedHeader + "\n"
canonicalURL += self.__generateHex(self.payload)
return canonicalURL
def __prepareStringToSign(self, canonicalURL):
stringToSign = ''
stringToSign += self.HMACAlgorithm + "\n"
stringToSign += self.xAmzDate + "\n"
stringToSign += self.currentDate + "/" + self.regionName + "/" + self.serviceName + "/" + self.aws4Request + "\n"
stringToSign += self.__generateHex(canonicalURL)
return stringToSign
def __calculateSignature(self, stringToSign):
signatureKey = self.__getSignatureKey (self.secretKey, self.currentDate, self.regionName, self.serviceName)
signature = hmac.new(signatureKey, stringToSign.encode('utf-8'), hashlib.sha256).hexdigest()
return signature;
def getHeaders(self):
self.awsHeaders['x-amz-date'] = self.xAmzDate
self.awsHeaders = sorted(self.awsHeaders.items()) #ソート
self.awsHeaders = dict((x, y) for x, y in self.awsHeaders) #タプルを辞書に変換
# Step 1: CREATE A CANONICAL REQUEST
canonicalURL = self.__prepareCanonicalRequest()
# Step 2: CREATE THE STRING TO SIGN
stringToSign = self.__prepareStringToSign(canonicalURL)
# Step 3: CALCULATE THE SIGNATURE
signature = self.__calculateSignature(stringToSign)
# Step 4: CALCULATE AUTHORIZATION HEADER
if signature != "":
self.awsHeaders['Authorization'] = self.__buildAuthorizationString(signature)
return self.awsHeaders
def __buildAuthorizationString(self, strSignature):
return self.HMACAlgorithm + " " + "Credential=" + self.accessKey + "/" + self.__getDate() + "/" + self.regionName + "/" + self.serviceName + "/" + self.aws4Request + "," + "SignedHeaders=" + self.strSignedHeader + "," + "Signature=" + strSignature;
def __generateHex(self, data):
return hashlib.sha256(data.encode('utf-8')).hexdigest()
def __sign(self, key, msg):
return hmac.new(key, msg.encode('utf-8'), hashlib.sha256).digest()
def __getSignatureKey(self, key, date, regionName, serviceName):
strSecret = "AWS4" + key
kDate = self.__sign(strSecret.encode('utf-8'), date)
kRegion = self.__sign(kDate, regionName)
kService = self.__sign(kRegion, serviceName)
kSigning = self.__sign(kService, self.aws4Request)
return kSigning
def __getTimeStamp(self):
return datetime.datetime.utcnow().strftime('%Y%m%dT%H%M%SZ')
def __getDate(self):
return datetime.datetime.utcnow().strftime('%Y%m%d')
# -----------------------------------
# SearchItems
#
# 条件に合致した商品情報を取得する。
# -----------------------------------
def SearchItems(self, key_word, brand, author):
# create payload
payload = "{"
payload += " \"Keywords\": \"" + key_word + "\","
payload += " \"Resources\": ["
payload += " \"ItemInfo.ByLineInfo\","
payload += " \"ItemInfo.ContentInfo\","
payload += " \"ItemInfo.ContentRating\","
payload += " \"ItemInfo.Classifications\","
payload += " \"ItemInfo.ExternalIds\","
payload += " \"ItemInfo.Features\","
payload += " \"ItemInfo.ManufactureInfo\","
payload += " \"ItemInfo.ProductInfo\","
payload += " \"ItemInfo.TechnicalInfo\","
payload += " \"ItemInfo.Title\","
payload += " \"ItemInfo.TradeInInfo\","
payload += " \"Offers.Listings.Availability.Message\","
payload += " \"Offers.Listings.Availability.Type\","
payload += " \"Offers.Listings.Condition\","
payload += " \"Offers.Listings.MerchantInfo\","
payload += " \"Offers.Listings.Price\","
payload += " \"Offers.Listings.ProgramEligibility.IsPrimeExclusive\","
payload += " \"Offers.Listings.ProgramEligibility.IsPrimePantry\","
payload += " \"Offers.Listings.SavingBasis\","
payload += " \"Offers.Summaries.HighestPrice\","
payload += " \"Offers.Summaries.LowestPrice\","
payload += " \"Offers.Summaries.OfferCount\","
payload += " \"ParentASIN\""
payload += " ],"
if brand != "":
payload += " \"Brand\": \"" + brand + "\","
if author != "":
payload += " \"Author\": \"" + author + "\","
payload += " \"SortBy\": \"" + self.sortBy + "\","
if self.itemCount > 0:
payload += " \"ItemCount\": " + str(self.itemCount) + ","
if self.itemPage > 0:
payload += " \"ItemPage\": " + str(self.itemPage) + ","
payload += " \"Merchant\": \"Amazon\","
payload += " \"PartnerTag\": \"" + self.partnerTag + "\","
payload += " \"PartnerType\": \"Associates\","
payload += " \"Marketplace\": \"www.amazon.co.jp\","
payload += " \"Operation\": \"SearchItems\""
payload += "}"
#print(payload)
return self.getResponse(payload, "SearchItems", "getitems")
def getResponse(self, payload, operation, uriPath):
# set param
self.setPath("/paapi5/" + uriPath)
self.setPayload (payload)
# create header
self.clearHeader()
self.addHeader('content-encoding', 'amz-1.0')
self.addHeader('content-type', 'application/json; charset=utf-8')
self.addHeader('host', self.host)
self.addHeader('x-amz-target', 'com.amazon.paapi5.v1.ProductAdvertisingAPIv1.' + operation)
headers = self.getHeaders()
try:
response = requests.post('https://' + self.host + "/paapi5/" + uriPath, headers=headers, data=payload.encode("utf-8"))
json_response = json.loads(response.text)
return json_response
except :
pass
return {"Result": "error"}
Twitter APIクラス
API KEYなどはDBに登録してあるものを利用しています。
import tweepy
#
# Twitterアクセス管理クラス
#
class Tweet:
# -----------------------------------
# コンストラクタ
#
# 認証済みのTwitter APIを用意する。
# -----------------------------------
def __init__(self, api_key, api_secret, access_token, access_token_secret):
try:
# Twitterライブラリの呼び出し
auth = tweepy.OAuthHandler(api_key, api_secret)
auth.set_access_token(access_token, access_token_secret)
self.api = tweepy.API(auth)
except Exception as e:
print(f"Tweet init Error: {e}")
# -----------------------------------
# ツイートの送信
#
# 作成したメッセージをツイートする
# -----------------------------------
def sendTwitter(self, msg):
# メッセージをツイート
try:
if msg != '':
self.api.update_status(msg)
except Exception as e:
print(f"sendTwitter Error: {e}")
実行結果
新規予約情報を投稿するため、実行当日よりも未来に発行予定のものがあればTwitterに投稿し、すでに発売中の物については無視するようになっています。
また、一度投稿したものは再度投稿しないように送信済みチェックを行なっていますので、同じ内容が2回ツイートされることはありません。
というわけで
汚いソースを公開するのは恥ずかしいですが、何か皆様にも得るものがあればと恥を偲んで公開してみました。
処理には十分なエラー対策が含まれておりませんので参考にされる場合はご注意ください。
当然ながらこのソースを利用したことによって何らかの問題が発生しても責任は負えませんので何卒ご了承ください。
ひと段落するところまでは動かせたので次は何をしましょう。ログ出力や全体的なエラー対策も必要なところではありますが。
この記事が気に入ったらサポートをしてみませんか?