見出し画像

bitFlyerFXの板情報を1秒おきに取得し、CSV・DBに保存し、Pandasデータフレームとして出力するPython3スクリプト

目次

はじめに
本noteが提供するコード情報
CSV保存版 (csv_board.py)
DB (SQLite3) 保存版 (db_board.py)
おまけ:板情報の可視化 (display_board.py)

はじめに

トレードbot作成の戦略構築・バックテスト・検証に、約定履歴に加え、板情報が必要になるケースがあります。bitFlyerのAPIにより約定履歴は1ヶ月分が提供されているのですが、板情報は現在時点でのデータしか提供されていません。そのため、板情報は自前で取得・蓄積する必要があります。

本noteが提供するコード情報

本noteでは、REST APIを用いて1秒おきに板情報を取得し、CSVもしくはDB(SQLite3)へと保存し、Pandasのデータフレームとして出力するためのPython3スクリプトを提供します。CSV版・DB版、どちらも50行ほどのコードです。また、おまけとして、JSON RPCを用いて板情報をコンソール上で可視化するためのスクリプトも掲載しています。

画像1

また、約定履歴の取得についても別のnoteにて紹介しています。

CSV保存版 (csv_board.py)

※こちらはQiitaの記事、板情報分析入門~ビットコインの板情報を入手し解析してみた~を参考にしています(誤り修正・コメント追記を行った以外はほぼそのままです)。

画像2

標準以外のライブラリはrequests, pandasさえインストールされていれば動きます。

#!/usr/bin/python3
import requests
import json
from datetime import datetime as dt
import time
import pandas as pd
import os

base_url = 'https://api.bitflyer.jp/v1/board?product_code='
pair = 'FX_BTC_JPY'
data = requests.get(base_url + pair, timeout=5)
SLEEP_T = 1 # スリープ秒数
DEPTH = 100 # 取得する板の深さ

def get_board(): # boardデータ取得関数
    data = requests.get(base_url + pair, timeout=5)
    board = json.loads(data.text) # JSONデータを辞書データへ変換
    dict_data = {'time': dt.now()} # 現在時刻の取得
    dict_data.update({'mid_price': board['mid_price']})
    dict_data.update({'ask_price_{}'.format(i): board['asks'][i]['price'] for i in range(DEPTH)})
    dict_data.update({'ask_size_{}'.format(i) : board['asks'][i]['size']  for i in range(DEPTH)})
    dict_data.update({'bid_price_{}'.format(i): board['bids'][i]['price'] for i in range(DEPTH)})
    dict_data.update({'bid_size_{}'.format(i) : board['bids'][i]['size']  for i in range(DEPTH)})
    return pd.Series(dict_data) # 辞書データをPandas Seriesへ変換

def get_btc_board(): # 1秒待ってのループ処理
    init_time = dt.now() # 現在時刻の取得
    end_time = dt.now() # 現在時刻の取得
    print('START /', init_time)
    main_list = [] # Pandasデータフレームの元となるリストを初期化
    while (end_time - init_time).seconds < 3600: # データ保存の時間[s] # 1hで1つのdfを作成
        try:
            dict_data = get_board() # JSONデータをPandas Seriesへ変換
            main_list.append(dict_data) # Pandas Seriesをリストへ追加
        except Exception as e:
            print('exception: ', e.args)
        time.sleep(SLEEP_T)
        end_time = dt.now() # 現在時刻の取得
    df_data = pd.concat(main_list, axis=1).T # Pandas Seriesのリストを結合・転置し、キー名をデータフレームの列名とする
    df_data.to_csv('./data/bitflyerfx_hourly_board_day_{}_init_{}_{}_end_{}_{}.csv'.format(init_time.day, init_time.hour, init_time.minute, end_time.hour, end_time.minute))
    print('END /', end_time)

if __name__ == '__main__':
    if not os.path.exists('./data'):
        os.mkdir('./data') # dataディレクトリが無ければ作成する
    while True:
        try:
            get_btc_board()
        except Exception as e:
            print('exception: ', e.args)

コメントをできるだけ丁寧に記述したため、コメントとコードを追っていただければ、挙動は理解していただけると思います。

以下、DB保存部分・pandasデータフレーム出力部分は有料パートとなります。※note売上は良質な情報を発信している方のnote購入・サポートに充てさせていただきます。

画像3

ここからは有料パートです。まずはご購入いただき、どうもありがとうございます。今後、的を絞った、より良い情報を発信していくための励みになります。引き続きよろしくお願いします。

DB (SQLite3) 保存版 (db_board.py)

標準以外のライブラリはrequests, pandas, sqlite3さえインストールされていれば動きます。

#!/usr/bin/python3
import requests
import json
from datetime import datetime as dt
import time
import pandas as pd
import pandas.io.sql as psql
import sqlite3

SLEEP_T = 1 # スリープ秒数
DEPTH = 100 # 取得する板の深さ

def create_table(con, cur):
    sql = 'CREATE TABLE IF NOT EXISTS boards (timestamp datetime, mid_price int, '
    for i in range(DEPTH):
        sql += 'ask_price_{} int, '.format(i)
    for i in range(DEPTH):
        sql += 'ask_size_{} real, '.format(i)
    for i in range(DEPTH):
        sql += 'bid_price_{} int, '.format(i)
    for i in range(DEPTH):
        sql += 'bid_size_{} real, '.format(i)
    cur.execute(sql[:-2] + ')') # 末尾のカンマを削り括弧を閉じ、SQLを実行

def get_board(con, cur): # 板情報データ取得&DBインサート関数
    board = json.loads(requests.get('https://api.bitflyer.jp/v1/board?product_code=FX_BTC_JPY', timeout=5).text) # JSONデータを辞書データへ変換
    sql = 'INSERT INTO boards values ("' + str(dt.now()) + '", ' + str(board['mid_price']) + ', '
    for i in range(DEPTH):
        sql += str(board['asks'][i]['price']) + ', '
    for i in range(DEPTH):
        sql += str(board['asks'][i]['size']) + ', '
    for i in range(DEPTH):
        sql += str(board['bids'][i]['price']) + ', '
    for i in range(DEPTH):
        sql += str(board['bids'][i]['size']) + ', '
    cur.execute(sql[:-2] + ')') # 末尾のカンマを削り括弧を閉じ、SQLを実行
    con.commit()

if __name__ == '__main__':
    con = sqlite3.connect('board.db')
    cur = con.cursor()
    create_table(con, cur) # テーブルの作成
    while True:
        try:
            get_board(con, cur)
            df = psql.read_sql('SELECT max(timestamp), mid_price, ask_price_0, ask_size_0, bid_price_0, bid_size_0 FROM boards;', con) # DBからPandasデータフレーム取得
            print(df) # Pandasデータフレームの表示サンプル
            time.sleep(SLEEP_T) # 指定時間だけスリープ
        except Exception as e:
            print('exception: ', e.args)

(2019/01/29 22:41追記) 本note掲載のコードでは、板の深さごとにテーブルの列を作成しており、JSONとして得られたデータをそのままの形で保存しています。しかし、皆様ご自身の利用用途に合わせて、お好みでテーブルをtimestamp, side, depth, price, sizeのような構成にしても良いかと思います。親と子の2テーブルに分割するなども一案でしょう。

また、DBの検索パフォーマンスに問題がでるようであれば、必要に応じてカラムにインデックスを張ると良いかと思います。

おまけ:板情報の可視化 (display_board.py)

#!/usr/bin/python3
import json
import websocket
from datetime import datetime as dt

def my_on_open(ws):
    ws.send('{"method":"subscribe","params":{"channel":"lightning_board_snapshot_FX_BTC_JPY"}, "jsonrpc":"2.0","id":0}')

def my_on_close(ws):
    print('[SYSTEM:on_close]')

def my_on_error(ws, error):
    print('[SYSTEM:on_error]')
    print(error)

def my_on_message(ws, message):
    json_data = json.loads(message)['params']['message']
    print("="*50)
    print(dt.now())
    
    print("Asks:")
    for m in json_data['asks'][9::-1]:
        print("Price:", int(m['price']), ":", round(m['size']/0.1)*"#")
        
    print("MidPrice:")
    print("Price:", int(json_data['mid_price']), "/ Sprd:", int(json_data['asks'][0]['price']-json_data['bids'][0]['price']))
    
    print("Bids:")
    for m in json_data['bids'][0:10]:
        print("Price:", int(m['price']), ":", round(m['size']/0.1)*"#")

ws = websocket.WebSocketApp("wss://ws.lightstream.bitflyer.com/json-rpc", on_message=my_on_message, on_open=my_on_open, on_close=my_on_close, on_error=my_on_error)
ws.run_forever()

エラーが出る場合は、websocket-clientが入っていないか、新しすぎる場合があります。「pip list」で確認すると、websocket-clientが見当たらないか、0.54.0などになっているのではないでしょうか。「sudo pip install websocket-client==0.47.0」で0.47.0をインストールしてください。

以上となります。本noteに関する質問についてはTwitter(@akagami_v2)のDMにて、できる限りはお受けしたいと思います。では、良いトレードbot作成ライフをお送りください!

最後まで読んでいただき、どうもありがとうございます。頂いたサポートは、良質な情報を発信している方のnote購入・サポートに充てさせていただきます。