noteのタイトル画像

[bitmex_websocket] websocketでticker,orderbook,trade情報を取得しようとしてハマるの回

pythonの扱いにもだんだんと慣れてくると、色々とやってみたいことがあります。
websocketthreadingです。

巷で人気の高速通信mmbotには、まず間違いなくwebsocketが実装されているでしょう。
サーバからプッシュされてくるデータを捕捉するモジュールは別スレッドで動いていることと思います。

最近は便利な時代ですね。
ネットで検索すると知りたいことは大抵ヒットします。(もちろん、情報が古かったり、間違っていることも多々あるので検証は必須です)

bitmex, websocket,で検索すると、そのものずばりな情報がヒットしました。

bitmexさん自身がapi-connectorsとして公開しているようです。
ライセンスの表記は見当たりませんが、pythonなので思いっきりソースコードしてますから、そのまま使えるってことでしょうかね。

また、このnote上にも情報があるようです。

です。

モッチオさん感謝です!

さっそく高速化したbitmex_websocketライブラリ(といっても、pythonのソースコード)をダウンロードし、試運転します。

モッチオさんのライブラリを呼び出す次のプログラムでticker, orderbook, position, tradeを取得してみましょう。

#!/usr/bin/python3
from datetime import datetime as dt
import time
#from bitmex_websocket import BitMEXWebsocket
from tuned_bitmex_websocket import BitMEXWebsocket

# symbol
SYMBOL = 'XBTUSD'

APIKEY = 'your_apieky'
SECRET = 'your_secret'

USE_TESTNET = True

try:
    # WebSocket API接続用オブジェクトを生成
    ws = BitMEXWebsocket(
        endpoint='wss://www.bitmex.com/realtime' if USE_TESTNET is False else 'wss://testnet.bitmex.com/realtime', 
        symbol=SYMBOL, 
        api_key=APIKEY, 
        api_secret=SECRET)
    # instrumentメソッドを一度呼び出さないとエラーを吐くので追加(内部的にget_tickerがこの情報を使用するため)
    ws.get_instrument()
    
    # Socketの接続が活きている限り処理を続けます
    while ws.ws.sock.connected :
        
        print('------------------------------------------------------')

        now = dt.now().strftime('%Y-%m-%d %H:%M:%S')
        # Ticker情報を取得!
        tick = ws.get_ticker()
        print(tick)

        # ポジション情報を取得!(get_position()はありませんが、プロパティ「data」の中にちゃんと入ってるんですね)
        # ただし、ポジションが一つもない場合、'position'というデータ自体がありませんので、そこを考慮します
        if 'position' in ws.data:
            positions = ws.data['position']
        else:
            positions = []
        # 取得したポジション情報には現在保有している全ての通貨セットのポジションが含まれますので、'XBTUSD'でフィルタリングします
        positions = [position for position in positions if position['symbol'] == SYMBOL]
        
        # コンソールに表示してみます
        print("%s : last trade : %s" % (now, ws.data['trade'][-1]))
        #print(ws.timemark)
        
        for position in positions:
            print("%s : position : symbol %s : qty %s" % (now, position['symbol'], position['currentQty']))

        books = ws.market_depth()
        """
        for book in books:
            print('{} side={}, size={}, price={}'.format(book['id'], book['side'], book['size'], book['price']))
        """

        sells = [sell for sell in books if sell['side'] == 'Sell']
        sorted_sells = sorted(sells, key=lambda x: x['price'], reverse=False)
        """
        for sell in sorted_sells:
            print('{} side={}, size={}, price={}'.format(sell['id'], sell['side'], sell['size'], sell['price']))
        """
        print('book[sell] price={}, size={}'.format(sorted_sells[0]['price'], sorted_sells[0]['size']))

        buys = [buy for buy in books if buy['side'] == 'Buy']
        sorted_buys = sorted(buys, key=lambda x: x['price'], reverse=True)
        """
        for buy in sorted_buys:
            print('{} side={}, size={}, price={}'.format(buy['id'], buy['side'], buy['size'], buy['price']))
        """
        print('book[buy ] price={}, size={}'.format(sorted_buys[0]['price'], sorted_buys[0]['size']))

        # 1秒ごとに繰り返します!
        time.sleep(1)
except Exception as e:
    print(e)
    ws.exit()

(APIKEY,SECRETはご自分のapikey,secretを設定してください)

実行すると軽快にデータを出力していきます。とても気持ちが良いです。

------------------------------------------------------
{'last': 5289.0, 'buy': 5288.0, 'sell': 5289.0, 'mid': 5289.0}
2019-04-22 20:26:39 : last trade : {'timestamp': '2019-04-22T11:26:04.102Z', 'symbol': 'XBTUSD', 'side': 'Buy', 'size': 4, 'price': 5289, 'tickDirection': 'PlusTick', 'trdMatchID': '4045e54f-7c58-2aca-c5a1-4bb8dd83cd38', 'grossValue': 75628, 'homeNotional': 0.00075628, 'foreignNotional': 4}
2019-04-22 20:26:39 : position : symbol XBTUSD : qty -300
book[sell] price=5289, size=12317
book[buy ] price=5288.5, size=28567
------------------------------------------------------
{'last': 5289.0, 'buy': 5288.0, 'sell': 5289.0, 'mid': 5289.0}
2019-04-22 20:26:40 : last trade : {'timestamp': '2019-04-22T11:26:04.102Z', 'symbol': 'XBTUSD', 'side': 'Buy', 'size': 4, 'price': 5289, 'tickDirection': 'PlusTick', 'trdMatchID': '4045e54f-7c58-2aca-c5a1-4bb8dd83cd38', 'grossValue': 75628, 'homeNotional': 0.00075628, 'foreignNotional': 4}
2019-04-22 20:26:40 : position : symbol XBTUSD : qty -300
book[sell] price=5289, size=12317
book[buy ] price=5288.5, size=28567
------------------------------------------------------
{'last': 5289.0, 'buy': 5288.0, 'sell': 5289.0, 'mid': 5289.0}
2019-04-22 20:26:41 : last trade : {'timestamp': '2019-04-22T11:26:04.102Z', 'symbol': 'XBTUSD', 'side': 'Buy', 'size': 4, 'price': 5289, 'tickDirection': 'PlusTick', 'trdMatchID': '4045e54f-7c58-2aca-c5a1-4bb8dd83cd38', 'grossValue': 75628, 'homeNotional': 0.00075628, 'foreignNotional': 4}
2019-04-22 20:26:41 : position : symbol XBTUSD : qty -300
book[sell] price=5289, size=12317
book[buy ] price=5288.5, size=28567
 
・・・(以下略)・・・

ところが、少し経つと、ときどきエラーが表示されるようになってきました。
エラーの内容は

Traceback (most recent call last):
  File "/Users/temp/source/bitmex_websocket/tuned_bitmex_websocket.py", line 336, in __on_message
    self.data[table].remove(item)
ValueError: list.remove(x): x not in list

です。
リストから要素の削除できないと言って怒られています。

当該箇所は

elif action == 'delete':
    #処理時間計測開始
    #start = time.time()

    self.logger.debug('%s: deleting %s' % (table, message['data']))
    # Locate the item in the collection and remove it.
    for deleteData in message['data']:
        # 高速化のため、itemIdxsを追加で引数指定
        item = self.findItemByKeys(self.keys[table], self.data[table], deleteData, self.itemIdxs[table])
        self.data[table].remove(item)
    # インデックスの再構築をします
    for i in range(len(self.data[table])):
        item = self.data[table][i]
        keyvalues = "-".join([str(v) for k,v in item.items() if k in self.keys[table]])
        self.itemIdxs[table][keyvalues] = i

の「self.data[table].remove(item)」のようです。
itemがNoneで戻されるケースがあるようです。
本来の動きなのかどうかわかりませんが、とりあえずNoneを除外するために

if item:
    # itemNone になることがある?
    elf.data[table].remove(item)

を加えました。

これでエラーは出なくなってめでたしめでたしのはずだったのですが、運転を続けていると今度は板情報が途中からトンデもない数値を叩き出していました。
やっぱりデータが壊れてしまっている感じです。
(モッチオさんのnoteのコメント欄にも板情報が変な値になると言うコメントが書かれているのを発見)

ロジック自体に問題なければ、考えられる原因は大抵タイミング問題だったりします。
別スレッドで実行され、非同期で呼び出される部分の排他制御に問題があるケースによく似ています。
C++で実装していた頃はこの手の問題に散々悩まされてきました。

pythonのスレッドを制御するセマフォを取得して排他をしてみましょう。

python3のthreadingのドキュメントを漁ります。

ここに

Lock オブジェクト
プリミティブロックとは、ロックが生じた際に特定のスレッドによって所有されない同期プリミティブです。 Python では現在のところ拡張モジュール _thread で直接実装されている最も低水準の同期プリミティブを使えます。
プリミティブロックは2つの状態、 "ロック" または "アンロック" があります。ロックはアンロック状態で作成されます。ロックには基本となる二つのメソッド、 acquire() と release() があります。ロックの状態がアンロックである場合、 acquire() は状態をロックに変更して即座に処理を戻します。 状態がロックの場合、 acquire() は他のスレッドが release() を呼び出してロックの状態をアンロックに変更するまでブロックします。その後、 acquire() 呼び出しは状態を再度ロックに設定してから処理を戻します。 release() メソッドを呼び出すのはロック状態のときでなければなりません; このメソッドはロックの状態をアンロックに変更して、即座に処理を戻します。 アンロックの状態のロックを解放しようとすると RuntimeError が送出されます。
ロックは コンテキストマネージメントプロトコル もサポートします。
複数のスレッドにおいて acquire() がアンロック状態への遷移を待っているためにブロックが起きている時に release() を呼び出してロックの状態をアンロックにすると、一つのスレッドだけが処理を進行できます。 どのスレッドが処理を進行できるのかは定義されておらず、実装によって異なるかもしれません。
全てのメソッドはアトミックに実行されます。

という記述を見つけました。
ふむふむ、Lockを取得できそうです。

他にも

RLock オブジェクト
再入可能ロック (reentrant lock) とは、同じスレッドが複数回獲得できるような同期プリミティブです。再入可能ロックの内部では、プリミティブロックの使うロック/アンロック状態に加え、 "所有スレッド (owning thread)" と "再帰レベル (recursion level)" という概念を用いています。ロック状態では何らかのスレッドがロックを所有しており、アンロック状態ではいかなるスレッドもロックを所有していません。
このロックの状態をロックにするには、スレッドがロックの acquire() メソッドを呼び出します。このメソッドはスレッドがロックを所有すると処理を戻します。ロックの状態をアンロックにするには release() メソッドを呼び出します。 acquire() / release() からなるペアの呼び出しはネストできます; 最後に呼び出した release() (最も外側の呼び出しペアの release()) だけがロックの状態をアンロックにリセットして、 acquire() でブロック中の別のスレッドの処理を進行させることができます。
再入可能ロックは コンテキストマネージメントプロトコル もサポートします。

というRLockオブジェクトもありますが、再入可能ロックは今回は除外なので、先ほどのLockオブジェクトで良いでしょう。

Lockが取得できるとして、次の課題は「どの範囲を排他制御すれば良いか?」です。

広範囲を排他制御すると、処理は簡単ですが、下手をすると長時間スレッドをロックしてしまうことになり、パフォーマンスが落ちます。
また、範囲を狭くしすぎると、排他制御の役目を果たさない可能性もあります。

この辺りははっきり言って「感覚」に頼る部分もあります。

それから、過去にbitflyerのwebsocketを使って板情報を取得した時にinsert,update,deleteが期待する順番で取得できずに、内部で保持していたindexが崩れてしまうことがしばしばあったのを思い出しました。

ということで、さらに念のためにインデックスの再構築時には一旦インデックスをクリアしてゴミが残らないようにしましょう。

インデックスを再構築している箇所に

# indexをクリアする
self.itemIdxs[table].clear()

を追加するで良いでしょう。

これだけやれば大抵のことはOKなはず。(と、思いたい)

で、、、運転再開しましたが、

NG!!!

どうしても板情報(orderbook)が変な値になります。

・スレッドセーフの方法が間違っているのか?
・インデックスの再構築に何か問題があるのか?
・partial, insert, update, deleteで取得されるデータに欠損があるのか?

について散々悩み、過去に私がnode.jsで作った板情報取得プログラムを参考にしながら改修を続け、気づいてみたらほとんど作り変えていました(笑)
コア部分なんて原型をとどめていない。
でもまあ、一応ちゃんと動いているっぽい。

あー、疲れた。。

念には念を入れて、少し試運転してからプログラムを公開しようかと考えています。

ご期待いただけたら「スキ」ボタンをポチっと押していただけると、嬉しいです。

楽しいbotライフを!





ソフトウェア・エンジニアを40年以上やってます。 「Botを作りたいけど敷居が高い」と思われている方にも「わかる」「できる」を感じてもらえるように頑張ります。 よろしくお願い致します。