BTC取引所毎の約定データ取得(python3): bitFlyer, BitMEX, Bitfinex, Binance, Liquid

メジャーな各BTC取引所のwebsocket経由の約定履歴取得コードです。on_messageの中身を書き換えて使ってください。​
一応、unix時間(秒)、価格、枚数を表示 するコードを書いています。
sizeはテイカ−が買なら正、売りなら負です。
このクラスはwebsocket専用のスレッドを作ってデーモンとして実行します。
デーモンとして実行されたスレッドはデーモン以外のスレッドがすべてなくなると自動的に終了します。
なので動作確認用のコードではinput()で入力待ちにすることでメインスレッドを終了させずに待機させています。(enterもしくはctrl+C等で終了)

base.py

import websocket
import threading
import datetime


def unix_time_from_ISO8601Z(date):
   td = datetime.datetime.strptime(date[:19], '%Y-%m-%dT%H:%M:%S')
   td = td.replace(tzinfo=datetime.timezone.utc)
   ts = td.timestamp()
   ts += float('0.' + date[20:-1])
   return ts


class Trade:
   def __init__(self, url):
       self.url = url
       self.thread = threading.Thread(target=lambda: self.run())
       self.thread.daemon = True
       self.thread.start()

   def run(self):
       while True:
           # websocket.enableTrace(True)
           self.ws = websocket.WebSocketApp(
               self.url,
               on_open=lambda ws: self.on_open(ws),
               on_close=lambda ws: self.on_close(ws),
               on_message=lambda ws, msg: self.on_message(ws, msg),
               on_error=lambda ws, err: self.on_error(ws, err))
           self.ws.run_forever()

   def on_open(self, ws):
       pass

   def on_close(self, ws):
       pass

   def on_message(self, ws, msg):
       pass

   def on_error(self, ws, err):
       pass

bitflyer.py

import json
from base import Trade, unix_time_from_ISO8601Z


class BitflyerTrade(Trade):
   def __init__(self):
       super().__init__('wss://ws.lightstream.bitflyer.com/json-rpc')

   def on_open(self, ws):
       ws.send(json.dumps({
           'method': 'subscribe',
           'params': {'channel': 'lightning_executions_BTC_JPY'},
       }))
       ws.send(json.dumps({
           'method': 'subscribe',
           'params': {'channel': 'lightning_executions_FX_BTC_JPY'},
       }))

   def on_message(self, ws, msg):
       msg = json.loads(msg)
       if msg['method'] != 'channelMessage':
           return

       p = msg['params']
       ch = p['channel']
       m = p['message']
       for t in m:
           ts = unix_time_from_ISO8601Z(t['exec_date'])
           price = t['price']
           size = t['size']
           side = t['side']
           if side == 'SELL':
               size *= -1
           # buy_id = t['buy_child_order_acceptance_id']
           # sell_id = t['sell_child_order_acceptance_id']

           if ch == 'lightning_executions_BTC_JPY':
               print(ts, price, size)
           elif ch == 'lightning_executions_FX_BTC_JPY':
               print(ts, price, size)


if __name__ == '__main__':
   trade = BitflyerTrade()
   input()

bitfinex.py

import json
from base import Trade, unix_time_from_ISO8601Z


class BitmexTrade(Trade):
   def __init__(self):
       super().__init__('wss://www.bitmex.com/realtime?subscribe=trade:XBTUSD')

   def on_message(self, ws, msg):
       msg = json.loads(msg)
       if 'table' in msg and msg['table'] == 'trade' and \
               msg['action'] == 'insert':
           ts = unix_time_from_ISO8601Z(msg['data'][0]['timestamp'])
           for t in msg['data']:
               price = t['price']
               size = round(t['size'] / price, 8)
               if t['side'] == 'Sell':
                   size *= -1
               print(ts, price, size)


if __name__ == '__main__':
   trade = BitmexTrade()
   input()

binance.py

import json
from base import Trade, unix_time_from_ISO8601Z


class BinanceTrade(Trade):
   def __init__(self):
       super().__init__('wss://stream.binance.com:9443/ws/btcusdt@trade')

   def on_message(self, ws, msg):
       msg = json.loads(msg)
       if msg['e'] == 'trade' and msg['M']:
           ts = msg['T']/1000
           price = float(msg['p'])
           size = float(msg['q'])
           if msg['m']:
               size *= -1
           print(ts, price, size)


if __name__ == '__main__':
   trade = BinanceTrade()
   input()

liquid.py
・pip3等でliquidtap(公式ライブラリ、https://pypi.org/project/liquidtap/)をインストールしてください。

import json
import liquidtap


class LiquidTrade:
   def __init__(self):
       self.tap = liquidtap.Client()
       self.tap.pusher.connection.bind(
           'pusher:connection_established', lambda d: self.on_connect(d))
       self.tap.pusher.connect()

   def on_message(self, msg):
       msg = json.loads(msg)
       ts = msg['created_at']
       size = float(msg['quantity'])
       price = float(msg['price'])
       if msg['taker_side'] == 'sell':
           size *= -1
       print(ts, price, size)

   def on_connect(self, data):
       self.tap.pusher.subscribe("execution_details_cash_btcjpy").bind(
           'created', lambda msg: self.on_message(msg))


if __name__ == "__main__":
   trade = LiquidTrade()
   input()

実行方法
base.pyと取引所毎のファイルを同じディレクトリに置いて実行してください。

$ ls
binance.py  bitfinex.py  bitflyer.py  bitmex.py  exchange_base.py
$ python3 bitflyer.py

更新履歴
・2019/05/29 スレッドをデーモンに変更。
・2019/07/14 スレッドの実行方法を変更。
・2019/07/16 Liquid追加。
・2019/09/01 一部ファイル名、クラス名を変更。
・2019/09/03 python3.7依存のコードを除去。関連する注意書きを削除。
・2019/09/03 命名規則の統一。


この記事が気に入ったら、サポートをしてみませんか?気軽にクリエイターを支援できます。

51

penta

仮想通貨の記事専用のアカウント。 プログラミング一般の記事はQiitaに投稿します。 qiita → https://qiita.com twitter → https://twitter.com/_P_E_N_T_A

仮想通貨

1つ のマガジンに含まれています

コメント1件

Okexも追加希望です!
コメントを投稿するには、 ログイン または 会員登録 をする必要があります。