見出し画像

matplotlibとasyncioでリアルタイムティックチャート(bitFlyer FXBTC/JPY)を描画[Python][asyncio][multiprocessing][queue][SQLite3]

こんばんわ、ニッケルメッキです。
最近noteも書いていないこともあり、個人的な勉強のため書いたチャートクラスを晒させて頂きます。
データソースはbitFlyerのFXBTCJPYとなっており、設定は関数のなかに書き込んであるなど適当な仕様となってますが、その分わかりやすいと思います。
というより、私が複雑なことがわかりません笑
コードを見てもわかる通り、玄人志向ではありませんのでPython初心者のかたなどに見て貰えれば幸いです。
コードの一部にnoa(Twitter垢@crptoresearcher)さんのコードを引用させて頂きましたので、そこら辺のほうが参考になるかもしれません。

仕様としては、イベントループ内でJSONRPCを購読し、ティック更新時にグラフをmatplotlib(GUIです)で描画します。
描画される内容は、LTP、最良気配値BID、最良気配値ASK、移動平均(チャートの表示期間)になります。

以下コード
※ライブラリなどはインストールしてください。

[更新]
2019/4/6 SQLite3を使用したレシーバーとプロットを別個に書いたバージョンを掲載しました[executionsのみ]
2019/4/6 multiprocessingとqueueを使用したバージョンを追加しました

from matplotlib import pyplot as plt
import numpy as np
import aiohttp
import asyncio
from aiohttp import WSMsgType
import json
from collections import deque
import traceback
from datetime import datetime

class RealTimeChart:
    length = 500  # チャートの期間

    # キューの宣言
    x = deque(maxlen=length)  # x軸(ティックカウント)
    y = deque(maxlen=length)  # y軸(LTP)
    ma = deque(maxlen=length)  # y軸(移動平均)
    bid = deque(maxlen=length)  # y軸(最良気配値BID)
    ask = deque(maxlen=length)  # y軸(最良気配値ASK)

    def __init__(self):
        # イベントループ
        loop = asyncio.get_event_loop()
        tasks = asyncio.wait([self.receive_from_rpc()])  # タスクの設定
        loop.run_until_complete(tasks)  # イベントループ開始

    # ティッカーの購読[Twitter→@crptoresearcherさんの関数を引用]
    async def receive_from_rpc(self):
        # URL クエリパラメータの設定
        uri = 'wss://ws.lightstream.bitflyer.com/json-rpc'
        query = {'method': 'subscribe',
                 'params': {'channel': 'lightning_ticker_FX_BTC_JPY'}}
        while True:
            try:
                async with aiohttp.ClientSession() as session:
                    async with session.ws_connect(uri,
                                                  receive_timeout=10) as client:
                        await asyncio.wait([client.send_str(json.dumps(query))])
                        async for response in client:
                            if response.type != WSMsgType.TEXT:
                                print('response:' + str(response))
                                break

                            data = json.loads(response[1])['params']['message']

                            # チャート呼び出し
                            self.plot(data)

            except Exception as e:
                traceback.print_exc()

    # チャートの描画
    def plot(self, data):
        # レスポンスデータの整形
        id = int(data['tick_id'])  # ID
        ltp = int(data['ltp'])  # LTP
        best_bid = int(data['best_bid'])  # 最良気配値BID
        best_ask = int(data['best_ask'])  # 最良気配値ASK

        # レスポンスデータをキューへ格納
        self.x.append(id)  # ティックID
        self.y.append(ltp)  # LTP
        self.bid.append(best_bid)  # 最良気配値BID
        self.ask.append(best_ask)  # 最良気配値ASK
        self.ma.append(int(np.mean(self.y)))  # 移動平均

        # チャート描画
        plt.ion()
        plt.cla()  # チャートを初期化
        plt.title('FXBTC/JPY')  # グラフタイトル
        plt.xlabel('ID')  # x軸ラベル
        plt.ylabel('Price')  # y軸ラベル
        plt.subplots_adjust(left=0.1, right=0.95,
                            bottom=0.1, top=0.95)  # スペース
        plt.plot(self.x, self.y, color='blue',
                 linewidth = 2.0, linestyle='solid')  # LTP
        plt.plot(self.x, self.bid, color='magenta',
                 linewidth = 1.0, linestyle='dashed')  # 最良気配値BID
        plt.plot(self.x, self.ask, color='green',
                 linewidth = 1.0, linestyle='dashed')  # 最良気配値ASK
        plt.plot(self.x, self.ma, color='red',
                 linewidth = 1.0, linestyle='solid')  # 移動平均
        plt.draw()  # 描画
        plt.pause(0.01)

if __name__ == '__main__':
    RealTimeChart()

[別バージョン(レシーバー+プロッタ SQLiteでデータ共有)]
レシーバーを起動してから、プロッタを起動する形になります。マルチプロセスは使用していません。

# receiver.py

import aiohttp
import asyncio
from aiohttp import WSMsgType
import json
import traceback
from datetime import datetime
import sqlite3


class Receiver:

    def __init__(self):
        # SQLite DB 初期化
        try:
            self.connect = sqlite3.connect('executions.db')
            self.cursor = self.connect.cursor()
            query = 'CREATE TABLE IF NOT EXISTS executions(id INTEGER PRIMARY KEY, price INTEGER NOT NULL)'
            self.cursor.execute(query)
            self.cursor.execute('PRAGMA journal_mode=MEMORY')
            self.connect.commit()

        except Exception as e:
            import sys
            traceback.print_exc()
            sys.exit(1)

        # イベントループ
        loop = asyncio.get_event_loop()
        tasks = asyncio.wait([self.receive_from_rpc(),
                              self.heartbeat()])  # タスクの設定
        loop.run_until_complete(tasks)  # イベントループ開始

    def __del__(self):
        self.delete()
        self.cursor.close()
        self.connect.close()

    async def heartbeat(self):
        print('Receiver running ...')
        await asyncio.sleep(60)

    # ティッカーの購読[Twitter→@crptoresearcherさんの関数を引用]
    async def receive_from_rpc(self):
        # URL クエリパラメータの設定
        uri = 'wss://ws.lightstream.bitflyer.com/json-rpc'
        query = {'method': 'subscribe',
                 'params': {'channel': 'lightning_executions_FX_BTC_JPY'}}
        while True:
            try:
                async with aiohttp.ClientSession() as session:
                    async with session.ws_connect(uri,
                                                  receive_timeout=10) as client:
                        await asyncio.wait([client.send_str(json.dumps(query))])
                        async for response in client:
                            if response.type != WSMsgType.TEXT:
                                print('response:' + str(response))
                                break

                            data = json.loads(response[1])['params']['message']

                            # レスポンスを処理
                            self.update(data)

            except KeyboardInterrupt:
                break

            except Exception as e:
                traceback.print_exc()

    def update(self, executions):
        self.delete()

        # レスポンスデータをDBへ格納
        query = 'INSERT INTO executions VALUES'
        for execution in executions:
            query += '({}, {}),'.format(int(execution['id']), int(execution['price']))
        query = query[:-1]

        try:
            self.cursor.execute(query)
            self.connect.commit()

        except Exception as e:
            traceback.print_exc()

    def delete(self):
        # EXECUTIONS TABLEを初期化
        try:
            self.cursor.execute('DELETE FROM executions')
            self.connect.commit()

            self.cursor.execute('VACUUM')
            self.connect.commit()

        except Exception as e:
            traceback.print_exc()

if __name__ == '__main__':
    Receiver()
# realtime_plot.py

from matplotlib import pyplot as plt
import matplotlib.animation as animation
from matplotlib.animation import PillowWriter
import numpy as np
from collections import deque
import traceback
import sqlite3


class RealTimePlot:
    length = 100  # チャートの期間

    # キューの宣言
    x = deque(maxlen=length)  # x軸(ティックカウント)
    y = deque(maxlen=length)  # y軸(LTP)

    def __init__(self):
        # SQLite DB 初期化
        try:
            self.connect = sqlite3.connect('executions.db')
            self.cursor = self.connect.cursor()
            query = 'SELECT COUNT(*) FROM sqlite_master WHERE type="table" AND name="executions"'
            self.cursor.execute(query)
            # TABLEが存在しない場合は例外をスロー
            if self.cursor.fetchone()[0] is False:
                raise Exception

            # 変数の初期化
            self.x.append(0)
            self.y.append(0)

            self.plot()

        except Exception as e:
            import sys
            traceback.print_exc()
            sys.exit(1)

    def __del__(self):
        self.cursor.close()
        self.connect.close()

    # チャートの描画
    def plot(self):
        # EXECUTIONS TABLEの読み込み
        def update(data):
            try:
                plt.cla()
                self.cursor.execute('SELECT * FROM executions')
                id = list(self.x)[-1]
                for row in self.cursor:
                    if id < row[0]:
                        self.x.append(row[0])
                        self.y.append(row[1])

                im = plt.plot(self.x, self.y)

            except KeyboardInterrupt:
                return

            except Exception as e:
                traceback.print_exc()

        fig = plt.figure()
        # frames = list(range(200))
        ani = animation.FuncAnimation(fig, update, interval=10)
        # ani = animation.FuncAnimation(fig, update, frames=frames, interval=1)
        # ani.save('executions.gif', writer = 'pillow')
        plt.show()

if __name__ == '__main__':
    RealTimePlot()

[別バージョン2(multiproseccing queueでデータ共有)]
マルチプロセスとキューを使用して描画する方法になります。
windowsではこのまま動作しますが、Mac OSX Sierra以降ではマルチプロセッシング時にmatplotlibでanimationやpauseを行うとエラーが出ます。
.bash_profileに下記を追加することで動作します。
export OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES
それでもエラーは出ますが一応動きます。

from multiprocessing import Process, Queue
import aiohttp
import asyncio
from aiohttp import WSMsgType
import json
import traceback
from matplotlib import pyplot as plt
import matplotlib.animation as animation
from matplotlib.animation import PillowWriter
from collections import deque

# 受信プロセス[キュー送信]
def receiver(queue):
    async def run():
        # URL クエリパラメータの設定
        uri = 'wss://ws.lightstream.bitflyer.com/json-rpc'
        query = {'method': 'subscribe',
                 'params': {'channel': 'lightning_executions_FX_BTC_JPY'}}
        while queue.empty():
            try:
                async with aiohttp.ClientSession() as session:
                    async with session.ws_connect(uri,
                                                  receive_timeout=10) as client:
                        await asyncio.wait([client.send_str(json.dumps(query))])
                        async for response in client:
                            if response.type != WSMsgType.TEXT:
                                print('response:' + str(response))
                                break

                            queue.put(response[1])

            except Exception as e:
                traceback.print_exc()

    # イベントループ
    loop = asyncio.get_event_loop()
    tasks = asyncio.wait([run()])
    loop.run_until_complete(tasks)

# 表示プロセス[キュー受信]
def plotter(queue):
    # 描画期間
    length = 100

    # 両端キューの宣言
    x = deque(maxlen=length)  # x軸(ID)
    y = deque(maxlen=length)  # y軸(価格)

    def update(data):
        try:
            plt.cla()
            while not queue.empty():
                executions = json.loads(queue.get())['params']['message']
                for execution in executions:
                    x.append(int(execution['id']))
                    y.append(int(execution['price']))

            im = plt.plot(x, y)

        except Exception as e:
            traceback.print_exc()

    figure = plt.figure()
    ani = animation.FuncAnimation(figure, update, interval=10)
    plt.show()
    """
    # アニメーションを保存する場合はpip install pillowしないと動かないかもしれない
    # アニメーション保存後デッドロックするっぽいのでプロセスをkillする処理が必要
    frames = list(range(300))
    ani = animation.FuncAnimation(figure, update, frames=frames, interval=1)
    ani.save('executions.gif', writer = 'pillow')
    """

if __name__ == '__main__':
    queue = Queue()
    receiver_process = Process(target=receiver, args=(queue,))
    plotter_process = Process(target=plotter, args=(queue,))
    try:
        receiver_process.start()
        plotter_process.start()

    except Exception as e:
        receiver_process.terminate()
        plotter_process.terminate()

        receiver_process.join()
        plotter_process.join()
        traceback.print_exc()


この記事が気に入ったら、サポートをしてみませんか?気軽にクリエイターを支援できます。

33

ニッケルメッキ

相場好きです!それだけ。

仮想通貨

2つ のマガジンに含まれています
コメントを投稿するには、 ログイン または 会員登録 をする必要があります。