スクリーンショット_2019-09-07_2

複数ロジックでWebsocketの接続を共有したい[Python][シングルトン?][asyncio]

こんにちわ、ニッケルメッキです。
最近BFSというbotフレームワークが仮想通貨界隈では流行っていますね。
どのような機能がBFSに含まれているかというと、複数のロジックを単独のbitFlyer口座で稼働することができたりするそうです。
弊ガチシステムでもBFSに含まれると思われる機能を妄想で取り入れようと努力している次第です。
さて、似たようなフレームワークの作成に取り掛かると自動取引botのスタート地点とも言えるデータ受信でつまづいてしまいました。

データ受信の具体的な要件としては、下記のようになります。
・受信は一つのインスタンスで行う
・受信データを使用するbotは複数
・全てのbotにおいて、受信データは全く同じ、かつ極力同期的

解決策としては、ソケット通信やJSONRPCサーバーをローカルで構築するなどを考えましたが、速度的にもリソース的にもコストが大きく、そもそも勝てないbotになってしまいそうです。

そこで、美味いことbotにデータを参照させるにはどうしたら良いかな、、クラス変数使うのかな?と物欲しげに考えを呟いていたところ、以下のソリューションをご紹介いただきました。

シングルトン? ちょっとわかりませんが、今回の用途にはうってつけのようです。
インスタンスが唯一であることを保証する実装になるそうで、Wsbsocketの受信を1本に限定する要件とも一致していて最高です。

早速、シングルトンになっているか不透明ですが試験的にコードを書いてみました。以下コード(Python 3) ※修正しました

import aiohttp
import asyncio
from aiohttp import WSMsgType
import json


# シングルトンパターンWebsocketクラス
class SingletonWebsocket:
   _instance = None
   _callbacks = []

   def __new__(cls):
       raise NotImplementedError('Cannot initialize via Constructor')

   @classmethod
   def __internal_new__(cls):
       return super().__new__(cls)

   @classmethod
   def get_instance(cls):
       if cls._instance is None:
           cls._instance = cls.__internal_new__()

       return cls._instance

   @classmethod
   def set_callback(cls, callback):
       cls._callbacks.append(callback)

   @classmethod
   async def run(cls):
       while True:
           try:
               async with aiohttp.ClientSession() as session:
                   async with session.ws_connect('wss://ws.lightstream.bitflyer.com/json-rpc',
                                                 receive_timeout=10) as ws:
                       query = [{'method': 'subscribe',
                                 'params': {'channel': 'lightning_ticker_FX_BTC_JPY'}}]
                       await asyncio.wait([ws.send_str(json.dumps(query))])
                       async for response in ws:
                           if response.type != WSMsgType.TEXT:
                               print('response:' + str(response))
                               break

                           message = json.loads(response[1])['params']['message']['tick_id']
                           print('Receiver', message)
                           await asyncio.gather(asyncio.wait([func(message) for func in cls._callbacks]))

           except Exception as e:
               print(e)


# wsの購読クラス
class SubscriberFirst:

   def __init__(self):
       pass

   # ws受信用コールバック
   async def on_message(self, message):
       print('Subscriber 1', message)


class SubscriberSecond:

   def __init__(self):
       pass

   async def on_message(self, message):
       print('Subscriber 2', message)


# Singletonインスタンスの生成
ws = SingletonWebsocket.get_instance()
print(ws)
ws = SingletonWebsocket.get_instance()
print(ws)
# Singletonになっているか。インスタンスはひとつだけなので良し
# <__main__.SingletonWebsocket object at 0x00000207AFBBFE10>
# <__main__.SingletonWebsocket object at 0x00000207AFBBFE10>

# Subscriberインスタンスの生成、コールバックの登録
subscriber_first = SubscriberFirst()
subscriber_second = SubscriberSecond()
ws.set_callback(callback=subscriber_first.on_message)
ws.set_callback(callback=subscriber_second.on_message)

# メインループ ws受信開始
loop = asyncio.get_event_loop()  # イベントループを取得
loop.run_until_complete(ws.run())  # イベントループの開始

# Result ws一本に対して、subscriberが複数受信しているので良し!
# >> Receiver 27237855
# >> Subscriber 1 27237855
# >> Subscriber 2 27237855

仕様としては、SingletonWebsocket(たぶんシングルトン)クラスでWebsocketを実装し、インスタンスを作成後、レシーバークラスへインスタンスを渡しイベントループで受信を開始します。

どうやら上手く行っているようです。きっとそう。
何かの役にたてば幸いです。

と思ったのですが、下記のご意見のとおり足りなかったり間違っていたようです。

orz..
そのため、上記のコードを見直し、下記の点を訂正いたしました。

・run()の呼び出しを1回へ変更、callbackを複数呼び出し
・get_instanceがgetしてない点を修正
・__init__を削除
・他、コメントを追加

訂正したものはきっと大丈夫と信じたいです。
そして、以下の点に気づきました。

Websocketの接続ごとにシングルトンでクラス作る方が安全面では良いのかもしれないけども、ちょっとめんどくさいかもしれない。
なので、普通にWebsocketクラスにコールバックを複数登録する実装だけで良い気はします。

更に、pythonのシングルトンの実装にはモジュールが便利ということのようですw

一瞬にして知見が集まるTwitter、すごい。
また、間違いがあったらご指摘お願いいたします!

この記事が気に入ったら、サポートをしてみませんか?気軽にクリエイターを支援できます。

19

ニッケルメッキ

相場好きです!それだけ。

仮想通貨

3つ のマガジンに含まれています
コメントを投稿するには、 ログイン または 会員登録 をする必要があります。