見出し画像

ZeroMQメッセージのサブスクライブおよびリスニング

この記事はSymbol/NEMのコア開発者であるgimreさんの記事「Subscribing and listening to ZeroMQ messages」を機械翻訳したものです。


1 ZeroMQメッセージのサブスクライブおよびリスニング

blockexplorerが新しいブロックをどのように表示するか、新しい取引がウォレットにどのように表示されるか、不思議に思ったことはありませんか?
(すべてのウォレットが対応しているわけではありませんが、通常、少なくともマルチシグの表示には対応しています。)

RESTサーバー経由で公開されたウェブソケットに接続し、特定の「トピック」を購読するのです。

developer tools

RESTサーバーはこの情報をどこから取得するのでしょうか?詳細はさておき、RESTはプロキシとして動作し、catapult.brokerプロセスから公開されたZeroMQエンドポイントに接続します。

この短い記事では、中間業者をスキップしてブローカーが伝えることを直接聞く方法を紹介します。

旅立ちの準備

もしあなたがノードを実行していないなら、問題は - なぜしないのか?

もし実行しているのであれば、おそらく典型的な「デュアル」ノード(ピア+APIノード)を実行しており、 docker-compose.yml には少なくともこの4つのサービス(コンテナ)が含まれているはずです。

  • db - このコンテナで Mongo データベースを実行する。

  • node - デュアルモードでセットアップされた実際のCatapultクライアントを実行しているコンテナです。

  • broker - このコンテナは、ブローカープロセスを実行します。

  • rest-gateway - node.jsのRESTレイヤーを実行します。

docker-compose.ymlでは同じネットワークを使っているので、RESTはbrokerと会話することができますが、brokerと会話したい場合は、2つのオプションがあります。

 broker:
       user: '1000:1000'
       container_name: broker
       image: symbolplatform/symbol-server:gcc-1.0.3.5
       working_dir: /symbol-workdir
       ports:
           - '127.0.0.1:7902:7902'                 <===============================================
       command: /bin/bash /symbol-commands/start.sh /usr/catapult ./data broker server broker NORMAL
       stop_signal: SIGINT
       restart: on-failure:2
       volumes:
           - ../nodes/node:/symbol-workdir:rw
           - ./server:/symbol-commands:ro
       depends_on:
           - db

ブローカーを全世界に公開するのはあまり良いアイデアではないので、ループバック (127.0.0.1) インターフェースにのみブローカーを公開することに注意してください。

もう会話もない

Pythonを使う予定です。現在の開発ブランチはブロックヘッダをうまく読み込めるからですが、ZMQバインディングがある言語なら何でも大丈夫です。

初期設定

ZMQ のバインディングは zmq モジュールにあります (python3 -m pip install zmq)。ブローカーへの接続は簡単です。ブローカーは pub-sub メッセージングパターンを使用しており、作成したリスナーは SUB リスナーである必要があります。

import zmq

context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://localhost:7902")

そして、それだけで十分なのです。

トピックを購読する

今度は別のトピックを購読する番です。

注意: おそらく、すべてのトピックではなく、いくつかのトピックにしか興味を持たれないでしょう。ここでは、概要を説明するために、いくつかのトピックを選びました。

block_marker = unhexlify('9FF2D8E480CA6A49')[::-1]
finalized_marker = unhexlify('4D4832A031CE7954')[::-1]
drop_marker = unhexlify('5C20D68AEE25B0B0')[::-1]
transaction_marker = b'a'  # 0x61
ut_add_marker = b'u'  # 0x75

socket.setsockopt(zmq.SUBSCRIBE, block_marker)
socket.setsockopt(zmq.SUBSCRIBE, finalized_marker)
socket.setsockopt(zmq.SUBSCRIBE, drop_marker)

socket.setsockopt(zmq.SUBSCRIBE, transaction_marker)
socket.setsockopt(zmq.SUBSCRIBE, ut_add_marker)

あのマークはどこから出ているのですか?というご質問ですね。テクニカルリファレンスの17章に文書化しました。メッセージングです。

公開メッセージの解析

パースは非常に簡単で、以下のようにSDKの型やオブジェクトが使用されていますが、これは単に表示を少しきれいにするためです。

while True:
	topic = socket.recv()
	if block_marker == topic:
		block_header = socket.recv()
		entity_hash = Hash256(socket.recv())
		generation_hash = Hash256(socket.recv())
		header = BlockFactory.deserialize(block_header)
		print(f'block height: {header.height} ({header.height.value}) entity_hash {entity_hash} generation_hash {generation_hash}')
		print(f'block harvested by: {header.signer_public_key} {facade.network.public_key_to_address(header.signer_public_key)}')
	elif finalized_marker == topic:
		body_part_1 = socket.recv()
		finalization_round = int.from_bytes(body_part_1[0:8], byteorder='little')
		finalizated_height = int.from_bytes(body_part_1[8:16], byteorder='little')
		entity_hash =  Hash256(body_part_1[16:])
		print(f'FINALIZED height: {finalization_round} ({finalizated_height}) entity_hash {entity_hash}')
	elif drop_marker == topic:
		body_part_1 = socket.recv()
		height = int.from_bytes(body_part_1[0:8], byteorder='little')
		print(f'drop after height: {height}')
	elif ut_add_marker[0] == topic[0] or transaction_marker[0] == topic[0]:  # mind [0]
		message = 'UT add' if ut_add_marker[0] == topic[0] else 'transaction add'
		# rest of the topic contains address
		address = SymbolFacade.Address(topic[1:])
		transaction = socket.recv()
		entity_hash = Hash256(socket.recv())
		merkle_component_hash = Hash256(socket.recv())
		body_part_1 = socket.recv()
		height = int.from_bytes(body_part_1[0:8], byteorder='little')
		print(f'{message} {address} {entity_hash} {height}')
	else:
		print("unknown [ %d %s ]" % (len(topic), topic))

1つのトピックに対して何回socket.recv()を呼び出すかは、明確でない場合があります。

これは実際のメッセージがどのように構成されているかによりますが、私たちはそれを文書化しています。

block message layout
finalized block message layout
transaction add message (mempool)

注目すべきは、確定したブロックメッセージのデータが、すべて1つの「パケット」に収まっていることです。

そして、ここまで来たなら、もう一つ特筆すべきことがある。前のセクションで、私は「一般的な」トランザクションのトピックを購読していた。

ソケット.setsockopt(zmq.SUBSCRIBE, transaction_marker)を使っていました。

transaction_marker = b'a'
address = SymbolFacade.Address('NCHVMMCVPZGUWZTWTLNH46OFRM2QIPILE4SKZEA')

scoped_address_marker = transaction_marker + address.bytes
socket.setsockopt(zmq.SUBSCRIBE, scoped_address_marker)

最後に、少し重要な詳細: BlockFactory を使用して block_header を逆シリアル化していますが、

コード、出力、その他のランダムなコメント

全コードはこちらでご覧いただけます: https://gist.github.com/gimre-xymcity/718cc15d9e9c3ff48a493bcfb7986834 

出力の一部を見てみましょう。

block height: 0x00000000001BB327 (1815335) entity_hash C8E617712A81D5DC13DC36C699E5DCA42C11972120E7A4DB37187E9C22941FE3 generation_hash FFFED500BCB2522FA892265D4F379AFC55CD1A9155F8FBB2DB43D30C5C99DD82
  block harvested by: 44E0DB9EC1FF08C392AAC8A2A787E68C2C8F36324E1065D90D976576580E7EA6 NCE5QOGVUM6ZHJIYXTA6NHZYJUHMBNRNVMG2L4I
+ transaction add NA6JCCGCVLTNCXFP6ZZHCEKIQN252LEWQMULS5Q 09B00910A2A55ADAFD79AB9CD1170E14B792B01AA64EC10EF263A1AB982CFE89 1815335
+ transaction add NA2NFUHQWYIASA5BHFJBM6OBQDEZDI34RUMNDHA 09B00910A2A55ADAFD79AB9CD1170E14B792B01AA64EC10EF263A1AB982CFE89 1815335
+ transaction add NB67BYHT34LHNPCEPUVIIHPNXZE7FRTX5BHQJVA 09B00910A2A55ADAFD79AB9CD1170E14B792B01AA64EC10EF263A1AB982CFE89 1815335
  block height: 0x00000000001BB328 (1815336) entity_hash 9EE91BB4BFE409A11789D5FDD398384BC86C528208C50E21884B6139E14E874E generation_hash FFED290A09BA44F1352F93BE6AA5FEF6400C7D1D6A69EC2662B98F4B12A9EC28
  block harvested by: 68ADEC7660181CD266F97BABAB6C9905D0DD7F669C2B107BBFB68B98074CCB9B NDGGHWO5PXID32IPA2C3EZCIMC7WHOZTXZDLTYY

ここで観察できるのは、1つの通知が3つの公開メッセージになったということです(読みやすくするためにマークしてあります)。前の部分を最後まで読んでくださった方は、その理由がもうお分かりかもしれません。これら3つのメッセージはすべて、3つの異なるトピックで送信されているのです。なぜでしょうか?

答えは明白で、問題のトランザクションはアグリゲートトランザクションであり、ブローカーは関係するすべてのアカウントに通知する必要があるからです。

  • multisigアカウント:NA2NFUHQWYAA5BHFJBM6OBQDEZDI34RUMNDHA

  • 送信者(連帯保証人):NB67BYHT34LHNPCEUVIHPNXZE7FRTX5BHQJVA

  • 送信先アカウント:NA6JCCGCVULTNCXFP6ZHCEKIQN252LEWQMULS5Q

これは、大きなアグリゲート(または多くの共同署名者)になると少し騒がしくなります。これは、RESTレイヤーが「スコープ付き」トピック(つまりアドレスを含む)のみを購読することになっている理由にもなっています。

Mongo-less ブローカー (オプション / 楽しいもの)

何度も繰り返しているように、カタパルトは設定に関して非常に柔軟性があります。

zmqメッセージをスヌープするために、nodeをセットアップするのはとても簡単で、サーバとブローカーを動かしますが、mongoは使いません(RESTも使いません、mongoなしではREST層は全く意味がないからです)。

1 catapult.serverは通常のデュアルモードセットアップと同様に設定する必要があり、これはconfig-extensions-server.propertiesでfilespoolingとpartialtransaction拡張をtrueに設定することを意味します。

[extensions]
# api extensions
extension.filespooling = true
extension.partialtransaction = true
# addressextraction must be first because mongo and zeromq depend on extracted addresses
extension.addressextraction = false
extension.mongo = false
extension.zeromq = false
...

2 ブローカーは zeromq を必要とするが mongo は必要としない、 config-extensions-broker.properties

# addressextraction must be first because mongo and zeromq depend on extracted addresses
extension.addressextraction = true
extension.mongo = false
extension.zeromq = true

extension.hashcache = true

3 最後になりますが、docker-compose.ymlには、サーバーとブローカーサービス(コンテナ)用のエントリのみを記述してください。

補足すると、もし将来誰かがデータを別のデータベースにプッシュしたい場合、必要なのは適切な拡張機能を書くことだけで、それはブローカーを通してロードされる。

この記事が気に入ったらサポートをしてみませんか?