[Python Websocket JSON-RPC OHLCV] bitFlyer の約定情報からオレオレOHLCデータを作成する(その2)

関連note


前のnoteからの変更点

・時刻情報を Timestamp 型に変更
・Volume も計算するようにした
・解像度も指定できるようにした(1分足以外にも対応)
・OHLCV データの保持数を指定できるようにした(前回は最新データのみ)
・開始直後の中途半端なデータは無視するようにした(1分足で 21:00:30 に実行した場合、21:00:30〜21:00:59 のデータは処理しない。21:01:00〜21:01:59 が初回データ)
・他スレッドから利用しやすいようにした(スレッドセーフな collections.deque でデータ保持。OHLCV データ取得用のメソッド作成)


コード

import json
import websocket
from datetime import datetime, timedelta
import dateutil.parser
import concurrent.futures
import collections
import numpy as np
from time import sleep
from logging import getLogger,INFO,StreamHandler
logger = getLogger(__name__)
handler = StreamHandler()
handler.setLevel(INFO)
logger.setLevel(INFO)
logger.addHandler(handler)

TICK_RES = 60 # OHLCV データの解像度(1分足 : 60, 5分足 : 300, 10分足 : 600, ...)
OHLCV_LEN = 5 # OHLCV データの保持数、指定数+1 を管理(0番目は最新データ(随時更新)、1番目以降が確定データ)
IDX_DATE = 0
IDX_OPEN = 1
IDX_HIGH = 2
IDX_LOW = 3
IDX_CLOSE = 4
IDX_VOLUME = 5
ITV_SLEEP_WSS_RECONNECT = 1

"""
This program calls Bitflyer real time API JSON-RPC2.0 over Websocket
"""
class RealtimeOHLCV(object):
  def __init__(self, url, channel):
    self.url = url
    self.channel = channel

    #Define Websocket
    self.ws = websocket.WebSocketApp(self.url, header=None, on_open=self.on_open, on_message=self.on_message, on_error=self.on_error, on_close=self.on_close)
    websocket.enableTrace(True)

  def init_ohlcvs(self):
    self.ohlcvs = collections.deque([], OHLCV_LEN + 1)
    self.firstTick = self.get_tick_datetime(datetime.now() + timedelta(seconds=TICK_RES))
    for i in range(0, OHLCV_LEN + 1):
      self.ohlcvs.appendleft([0, 0.0, 0.0, 0.0, 0.0, 0.0])

  def run(self):
    #ws has loop. To break this press ctrl + c to occur Keyboard Interruption Exception.
    while(True):
      self.init_ohlcvs()
      self.ws.run_forever()
      logger.info('Web Socket process ended. Retrying reconnect.')
      self.ohlcvs.clear()
      sleep(ITV_SLEEP_WSS_RECONNECT)

  def is_alive(self):
    return self.ws.keep_running

  """
  Below are callback functions of websocket.
  """
  # when we get message
  def on_message(self, ws, message):
    output = json.loads(message)['params']
    self.create_ohlcv(output)

  # when error occurs
  def on_error(self, ws, error):
    logger.error(error)

  # when websocket closed.
  def on_close(self, ws):
    logger.info('disconnected streaming server')

  # when websocket opened.
  def on_open(self, ws):
    logger.info('connected streaming server')
    output_json = json.dumps(
      {'method' : 'subscribe',
      'params' : {'channel' : self.channel}
      }
    )
    ws.send(output_json)

  """
  Below are OHLCV functions.
  """
  # OHLCV 生成
  def create_ohlcv(self, output):
    now = datetime.now()
    for d in output["message"]:
      exec_date = self.get_exec_datetime(d)

      # 中途半端な時刻のデータは捨てる
      if(self.firstTick > exec_date):
        break

      # OHLCV データの時刻が更新された場合
      if(self.is_next_tick(exec_date)):
        if(self.ohlcvs[0][IDX_DATE] != 0):
          # ローテーション(古いデータを上書きして再利用)
          self.ohlcvs.rotate()
        # OHLCV データ初期値設定
        self.init_ohlcv(exec_date, d)

      # OHLCV データの時刻が同じ場合
      else:
        # OHLCV データ更新
        self.update_ohlcv(exec_date, d)

  # 約定データから datetime 生成
  def get_exec_datetime(self, d):
    exec_date = d["exec_date"].replace('T', ' ')[:-1]
    return dateutil.parser.parse(exec_date) + timedelta(hours=9)

  # OHLCV データの基準時刻
  def get_tick_datetime(self, dt):
    tickTs = int(dt.timestamp() / TICK_RES) * TICK_RES
    return datetime.fromtimestamp(tickTs)

  # 次の足かどうか
  def is_next_tick(self, exec_date):
    return self.get_tick_datetime(datetime.fromtimestamp(self.ohlcvs[0][IDX_DATE])) != self.get_tick_datetime(exec_date)

  def init_ohlcv(self, exec_date, d):
    price = float(d["price"])
    vol = float(d["size"])
    self.ohlcvs[0][IDX_DATE] = self.get_tick_datetime(exec_date).timestamp()
    self.ohlcvs[0][IDX_OPEN] = price
    self.ohlcvs[0][IDX_HIGH] = price
    self.ohlcvs[0][IDX_LOW] = price
    self.ohlcvs[0][IDX_CLOSE] = price
    self.ohlcvs[0][IDX_VOLUME] = vol

  def update_ohlcv(self, exec_date, d):
    if self.ohlcvs[0][IDX_DATE] == self.get_tick_datetime(exec_date).timestamp():
      price = float(d["price"])
      vol = float(d["size"])
      self.ohlcvs[0][IDX_HIGH] = max(self.ohlcvs[0][IDX_HIGH], price)
      self.ohlcvs[0][IDX_LOW] = min(self.ohlcvs[0][IDX_LOW], price)
      self.ohlcvs[0][IDX_CLOSE] = price
      self.ohlcvs[0][IDX_VOLUME] += vol
    else:
      logger.info("Past data {} {} {}".format(exec_date, d["price"], d["size"]))

  # 最新(更新中)の OHLCV 取得
  def get_current_ohlcv(self):
    return self.ohlcvs[0]

  # 確定済の最新 OHLCV 取得
  def get_newest_ohlcv(self):
    return self.ohlcvs[1]

  # OHLCV 取得
  # デフォルト: 新しい順で全てのデータを取得
  def get_ohlcvs(self, num=OHLCV_LEN+1, asc=False):
    ohlcvs = []
    for i in range(0, num):
      ohlcvs.append(self.ohlcvs[i])
    if(asc):
      ohlcvs.reverse()
    return ohlcvs

def print_ohlcv(rtOHLCV):
  prevData = rtOHLCV.get_current_ohlcv()
  while(True):
    # OHLCV データが更新されたら出力
    if(rtOHLCV.is_alive()):
      tempData = rtOHLCV.get_current_ohlcv()
      if(tempData != prevData):
        ohlcv = rtOHLCV.get_newest_ohlcv()
        logger.info("{}, {}, {}, {}, {}, {}, {}".format(datetime.now(), datetime.fromtimestamp(ohlcv[IDX_DATE]), ohlcv[IDX_OPEN], ohlcv[IDX_HIGH], ohlcv[IDX_LOW], ohlcv[IDX_CLOSE], ohlcv[IDX_VOLUME]))
        prevData = tempData
        sleep(TICK_RES * 0.9)
        continue
    sleep(0.5)

if __name__ == '__main__':
  # Thread
  executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)

  #API endpoint
  url = 'wss://ws.lightstream.bitflyer.com/json-rpc'
  channel = 'lightning_executions_FX_BTC_JPY' # 約定
  rtOHLCV = RealtimeOHLCV(url=url, channel=channel)

  #ctrl + cで終了
  executor.submit(rtOHLCV.run)
  executor.submit(print_ohlcv, rtOHLCV)


コード説明

あまり細かい説明はしません。

・他スレッドでデータ取得できるよう、OHLCV データはスレッドセーフな collections.deque で管理
・上位階層で RealtimeOHLCV のインスタンスを作成して、他スレッドに渡すことでデータ共有してます。他に良いやり方あると思うので、好きなように改造してください
・collections.deque は slice([0:-1] みたいなやつ)ができないので、get_ohlcvs では list に詰め直してます
・WebSocket が切断されることがあったので、run() の中でループするように変更
・以下のメソッドで OHLCV 取得可能

rtOHLCV.get_current_ohlcv() : 現在更新中の OHLCV を返す
rtOHLCV.get_newest_ohlcv() : 確定した最新 OHLCV を返す
rtOHLCV.get_ohlcvs() : 全て(更新中含む)の OHLCV を新しい順(0最新〜n最古)で返す
rtOHLCV.get_ohlcvs(asc=True) : 全ての OHLCV を古い順で返す
rtOHLCV.get_ohlcvs(num=3) : 3つ(更新中含む)の OHLCV を新しい順で返す
rtOHLCV.get_ohlcvs(num=3, asc=True) : 3つ(更新中含む)の OHLCV を古い順で返す
rtOHLCV.is_alive() : WebSocket の接続状態(True: 接続中, False: 切断中)を返す


TradingView との比較

ロジック変わってないので、前回のnoteを見てください。


おわりに

有料(¥100)にしてるけど、これで内容は全部です。募金してくれる人がいれば、ジュース代としていただけると嬉しい。コードは、インデントくずれが起きたりするようなので、コピペ時には注意してください。


マガジン


コメント用note(未購入者向け)


干し芋


続きをみるには

残り 0字

¥ 100

サポート頂けると励みになります BTC,BCH: 39kcicufyycWVf8gcGxgsFn2B8Nd7reNUA LTC: LUFGHgdx1qqashDw4WxDcSYQPzd9w9f3iL MONA: MJXExiB7T7FFXKYf9SLqykrtGYDFn3gnaM