[Python Websocket JSON-RPC] bitFlyer Realtime API の受信データを記録し続けて、圧縮後Google Driveにアップロードする

これはなに?

bitFlyer の Realtime API の情報を延々と記録し続けるだけのコード

・定期メンテナンス(04:00〜04:10)までひたすらファイルにロギング
・定期メンテナンス中
 - 前日のログファイル圧縮、圧縮前ログファイルの削除
 - GoogleDriveへのログファイルのアップロード、アップロードしたログファイルの削除


ダウンロードするやつ


確認環境

Mac/Ubuntu で動作確認
※websocket-clientは0.49以降だとうまく受信できないかもしれないので注意

MacBookPro

$ sw_vers
ProductName:	Mac OS X
ProductVersion:	10.14
BuildVersion:	18A391

$ python --version
Python 3.6.2 :: Anaconda, Inc.

$ pip --version
pypip 18.0

$ pip show websocket-client PyDrive google-api-python-client
Name: websocket-client
Version: 0.48.0

Name: PyDrive
Version: 1.3.1

Name: google-api-python-client
Version: 1.7.4


Ubuntu

$ cat /etc/os-release
NAME="Ubuntu"
VERSION="16.04.4 LTS (Xenial Xerus)"

$ python --version
Python 3.6.6

$ pip --version
pip 18.0

$ pip show websocket-client PyDrive google-api-python-client
Name: websocket-client
Version: 0.48.0

Name: PyDrive
Version: 1.3.1

Name: google-api-python-client
Version: 1.7.4


コード

適当なので気になる箇所あれば各自で修正してください

# -*- Coding: utf-8 -*-
import os
import pathlib
import traceback
import websocket
import json
from datetime import datetime, timedelta
import zipfile
from zipfile import ZIP_DEFLATED, ZIP_BZIP2, ZIP_LZMA
from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive
import logging
from logging import getLogger, StreamHandler, handlers, Formatter, INFO

UPLOAD_GDRIVE = True
GDRIVE_FOLDER_ID = 'GoogleDriveFolderId'

LOG_FMT_TH = '%(asctime)s\t%(levelname)s\t%(message)s'
LOG_FMT_DATA = '%(asctime)s\t%(created).6f\t%(message)s'

SYMBOL_BTC_FX = 'FX_BTC_JPY'
SYMBOL_BTC_SPOT = 'BTC_JPY'

WSS_URL = "wss://ws.lightstream.bitflyer.com/json-rpc"

WSS_BOARD_SS_BASE = "lightning_board_snapshot_"
WSS_BOARD_SS_FX = WSS_BOARD_SS_BASE + SYMBOL_BTC_FX
WSS_BOARD_SS_SPOT = WSS_BOARD_SS_BASE + SYMBOL_BTC_SPOT

WSS_BOARD_BASE = "lightning_board_"
WSS_BOARD_FX = WSS_BOARD_BASE + SYMBOL_BTC_FX
WSS_BOARD_SPOT = WSS_BOARD_BASE + SYMBOL_BTC_SPOT

WSS_EXCE_BASE = "lightning_executions_"
WSS_EXEC_FX = WSS_EXCE_BASE + SYMBOL_BTC_FX
WSS_EXEC_SPOT = WSS_EXCE_BASE + SYMBOL_BTC_SPOT

WSS_TICKER_BASE = "lightning_ticker_"
WSS_TICKER_FX = WSS_TICKER_BASE + SYMBOL_BTC_FX
WSS_TICKER_SPOT = WSS_TICKER_BASE + SYMBOL_BTC_SPOT

MNT_START_HOUR = 4
MNT_START_MIN = 0
MNT_START_MARGIN = 1
ITV_MNT = 10

def getMyLogger(loggerName=__name__, file=None, fmt=LOG_FMT_TH):
  if not file:
    return None

  # Logger 生成
  logger = getLogger(loggerName)
  logger.setLevel(INFO)

  # ディレクトリがない場合は作成
  dir = pathlib.Path(file).parent
  if dir and not dir.exists():
    dir.mkdir()

  # ログ取得開始日が違っていればリネーム
  if pathlib.Path(file).exists():
    l = ""
    with open(file) as f:
      l = f.readline()

    if l and l[:10] != datetime.now().strftime("%Y-%m-%d"):
      pathlib.Path(file).rename(file + '.' + l[:10])

  # Handler 生成
  handler = handlers.TimedRotatingFileHandler(file, when="midnight", interval=1, backupCount=30)
  handler.setLevel(INFO)

  # ログフォーマット設定
  formatter = Formatter(fmt)
  formatter.default_msec_format = '%s.%03d'
  handler.setFormatter(formatter)

  # Handler 登録
  logger.addHandler(handler)

  return logger

loggerTh = getMyLogger(__name__ + "Th", file="log/realtime_api_thread.log")

class ReailtimeApiReceiver(object):
  def __init__(self, rcvChannels, oneFile=False):
    self.loop = True
    self.itvWssReconnect = 0.5
    self.rcvChannels = rcvChannels
    if oneFile:
      logger = getMyLogger(__name__ + "All", file="log/realtime_api_all.log", fmt=LOG_FMT_DATA)

    self.loggers = {}
    for ch, isRcv in rcvChannels.items():
      if not isRcv: continue
      if not oneFile: logger = getMyLogger(__name__ + ch, file="log/realtime_api_" + ch + ".log", fmt=LOG_FMT_DATA)
      self.loggers[ch] = logger

    #Define Websocket
    self.ws = websocket.WebSocketApp(WSS_URL, header=None, on_open=self.on_open, on_message=self.on_message, on_error=self.on_error, on_close=self.on_close)
    websocket.enableTrace(True)

    loggerTh.info("Initialized Realtime API Receiver.")

  def run(self):
    loggerTh.info("Start Realtime API Receiver.")
    while self.loop:
      self.ws.run_forever()
      loggerTh.info('Web Socket process ended. Retrying reconnect.')
      sleep(self.itvWssReconnect)
    loggerTh.info("Stop Realtime API Receiver.")

  """
  Below are callback functions of websocket.
  """
  # when we get message
  def on_message(self, ws, message):
    ch = json.loads(message)["params"]["channel"]
    self.loggers[ch].info(message)

  # when error occurs
  def on_error(self, ws, error):
    loggerTh.error(error)

  # when websocket closed.
  def on_close(self, ws):
    loggerTh.info('disconnected streaming server')

  # when websocket opened.
  def on_open(self, ws):
    loggerTh.info('connected streaming server')
    for ch, isRcv in self.rcvChannels.items():
      if isRcv:
        ws.send(json.dumps({"method": "subscribe", "params": {"channel": ch}}))

  def is_alive(self):
    return self.ws.keep_running

  def exitLoop(self):
    self.loop = False
    self.ws.close()

def calcMntTime():
  now = datetime.now()
  mntStart = datetime(now.year, now.month, now.day, MNT_START_HOUR, MNT_START_MIN + MNT_START_MARGIN)
  if mntStart + timedelta(minutes=ITV_MNT) < now:
    mntStart += timedelta(days=1)
  return mntStart

def compressLogs(yesterday, compression=ZIP_LZMA):
  logFiles = pathlib.Path().glob("realtime_api_*.log." + yesterday)
  for logFile in logFiles:
    loggerTh.info("compress {}".format(logFile))
    with zipfile.ZipFile(str(logFile) + ".zip", "w", compression=ZIP_DEFLATED) as logZip:
      logZip.write(logFile)
    logFile.unlink()

def upload2gdrive(yesterday, gauth, drive):
  zippedFiles = pathlib.Path().glob("realtime_api_*.log." + yesterday + ".zip")
  for zippedFile in zippedFiles:
    loggerTh.info("upload {}".format(zippedFile))
    metadata = {
      'title': str(zippedFile),
    }
    if GDRIVE_FOLDER_ID:
      metadata['parents'] = [{'id': GDRIVE_FOLDER_ID}]
    driveFile = drive.CreateFile(metadata)
    driveFile.SetContentFile(str(zippedFile))
    driveFile.Upload()
    zippedFile.unlink()

def compressAndUpload():
  if UPLOAD_GDRIVE:
    gauth = GoogleAuth()
    gauth.CommandLineAuth()
    drive = GoogleDrive(gauth)

  while True:
    nextMnt = calcMntTime()
    loggerTh.info("sleep start, until {}".format(nextMnt))
    while nextMnt > datetime.now():
      sleep(1)

    loggerTh.info("sleep finish")
    os.chdir("log")
    yesterday = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")
    compressLogs(yesterday)
    if UPLOAD_GDRIVE:
      upload2gdrive(yesterday, gauth, drive)
    os.chdir("..")
    sleep(ITV_MNT * 60)

if __name__ == "__main__":
  rcvChannels = {
    WSS_EXEC_FX: True,
    WSS_BOARD_SS_FX: True,
    WSS_BOARD_FX: True,
    WSS_TICKER_FX: True,
    WSS_EXEC_SPOT: True,
    WSS_BOARD_SS_SPOT: True,
    WSS_BOARD_SPOT: True,
    WSS_TICKER_SPOT: True,
  }
  receiver = ReailtimeApiReceiver(rcvChannels, False)

  import concurrent.futures
  executor = concurrent.futures.ThreadPoolExecutor(max_workers=2)
  executor.submit(receiver.run)
  executor.submit(compressAndUpload)

  from time import sleep
  try:
    while True:
      sleep(1)
  except (KeyboardInterrupt, Exception) as e:
    loggerTh.error(e)
    loggerTh.error(traceback.format_exc())

  receiver.exitLoop()
  executor.shutdown(wait=True)


動作について

ReailtimeApiReceiverの引数

- rcvChannels: {channel: isRcv} でどのチャンネルを購読するかを決定
- oneFile: ログをチャンネル別に分けるか、1つのファイルにまとめるか


ログファイル

- フォーマットは <時刻(YYYY-MM-DD hh:mm:ss.f)><タブ><タイムスタンプ><タブ><受信データ>
  <タイムスタンプ> を出力しているのは usec を取得するため
  サンプル: 2018-11-03 09:55:15.240	1541206515.240989	{"jsonrpc":"2.0","method":"channelMessage","params":{"channel":"lightning_ticker_FX_BTC_JPY","message":{"product_code":"FX_BTC_JPY","timestamp":"2018-11-03T00:55:15.1120093Z","tick_id":12899309,"best_bid":730944,"best_ask":730961,"best_bid_size":0.03,"best_ask_size":0.04086046,"total_bid_depth":13427.18752245,"total_ask_depth":10912.80026445,"ltp":730961,"volume":172394.54656461,"volume_by_product":172394.54656461}}}
- oneFile = True の場合 log/realtime_api_all.log に出力
- oneFile = False の場合 log/realtime_api_<channel_name>.log に出力
- (システム時刻における)日付変更でローテーション
- 定期メンテナンス(04:0004:10)になったら、前日のログファイルを圧縮して削除する


Google Driveへのアップロード

- UPLOAD_GDRIVE = True でアップロード
- GDRIVE_FOLDER_ID に folder id を設定(値は URL の末尾部分、詳しくは参考リンク参照)


注意事項

・ディスクフル(空き容量ゼロ)にはご注意
・Google Driveの空き容量にはご注意


参考サイト


おわりに

有料(¥100)にしてるけど、これで内容は全部です。募金してくれる人がいれば、ジュース代としていただけると嬉しい。コードは、インデントくずれが起きたりするようなので、コピペ時には注意してください。


マガジン


コメント用note(未購入者向け)


干し芋


ここから先は

0字

¥ 100

サポート頂けると励みになります BTC,BCH: 39kcicufyycWVf8gcGxgsFn2B8Nd7reNUA LTC: LUFGHgdx1qqashDw4WxDcSYQPzd9w9f3iL MONA: MJXExiB7T7FFXKYf9SLqykrtGYDFn3gnaM