bitFlyerFXの板情報を1秒おきに取得し、CSV・DBに保存し、Pandasデータフレームとして出力するPython3スクリプト
目次
はじめに
本noteが提供するコード情報
CSV保存版 (csv_board.py)
DB (SQLite3) 保存版 (db_board.py)
おまけ:板情報の可視化 (display_board.py)
はじめに
トレードbot作成の戦略構築・バックテスト・検証に、約定履歴に加え、板情報が必要になるケースがあります。bitFlyerのAPIにより約定履歴は1ヶ月分が提供されているのですが、板情報は現在時点でのデータしか提供されていません。そのため、板情報は自前で取得・蓄積する必要があります。
本noteが提供するコード情報
本noteでは、REST APIを用いて1秒おきに板情報を取得し、CSVもしくはDB(SQLite3)へと保存し、Pandasのデータフレームとして出力するためのPython3スクリプトを提供します。CSV版・DB版、どちらも50行ほどのコードです。また、おまけとして、JSON RPCを用いて板情報をコンソール上で可視化するためのスクリプトも掲載しています。
また、約定履歴の取得についても別のnoteにて紹介しています。
CSV保存版 (csv_board.py)
※こちらはQiitaの記事、板情報分析入門~ビットコインの板情報を入手し解析してみた~を参考にしています(誤り修正・コメント追記を行った以外はほぼそのままです)。
標準以外のライブラリはrequests, pandasさえインストールされていれば動きます。
#!/usr/bin/python3
import requests
import json
from datetime import datetime as dt
import time
import pandas as pd
import os
base_url = 'https://api.bitflyer.jp/v1/board?product_code='
pair = 'FX_BTC_JPY'
data = requests.get(base_url + pair, timeout=5)
SLEEP_T = 1 # スリープ秒数
DEPTH = 100 # 取得する板の深さ
def get_board(): # boardデータ取得関数
data = requests.get(base_url + pair, timeout=5)
board = json.loads(data.text) # JSONデータを辞書データへ変換
dict_data = {'time': dt.now()} # 現在時刻の取得
dict_data.update({'mid_price': board['mid_price']})
dict_data.update({'ask_price_{}'.format(i): board['asks'][i]['price'] for i in range(DEPTH)})
dict_data.update({'ask_size_{}'.format(i) : board['asks'][i]['size'] for i in range(DEPTH)})
dict_data.update({'bid_price_{}'.format(i): board['bids'][i]['price'] for i in range(DEPTH)})
dict_data.update({'bid_size_{}'.format(i) : board['bids'][i]['size'] for i in range(DEPTH)})
return pd.Series(dict_data) # 辞書データをPandas Seriesへ変換
def get_btc_board(): # 1秒待ってのループ処理
init_time = dt.now() # 現在時刻の取得
end_time = dt.now() # 現在時刻の取得
print('START /', init_time)
main_list = [] # Pandasデータフレームの元となるリストを初期化
while (end_time - init_time).seconds < 3600: # データ保存の時間[s] # 1hで1つのdfを作成
try:
dict_data = get_board() # JSONデータをPandas Seriesへ変換
main_list.append(dict_data) # Pandas Seriesをリストへ追加
except Exception as e:
print('exception: ', e.args)
time.sleep(SLEEP_T)
end_time = dt.now() # 現在時刻の取得
df_data = pd.concat(main_list, axis=1).T # Pandas Seriesのリストを結合・転置し、キー名をデータフレームの列名とする
df_data.to_csv('./data/bitflyerfx_hourly_board_day_{}_init_{}_{}_end_{}_{}.csv'.format(init_time.day, init_time.hour, init_time.minute, end_time.hour, end_time.minute))
print('END /', end_time)
if __name__ == '__main__':
if not os.path.exists('./data'):
os.mkdir('./data') # dataディレクトリが無ければ作成する
while True:
try:
get_btc_board()
except Exception as e:
print('exception: ', e.args)
コメントをできるだけ丁寧に記述したため、コメントとコードを追っていただければ、挙動は理解していただけると思います。
以下、DB保存部分・pandasデータフレーム出力部分は有料パートとなります。※note売上は良質な情報を発信している方のnote購入・サポートに充てさせていただきます。
ここからは有料パートです。まずはご購入いただき、どうもありがとうございます。今後、的を絞った、より良い情報を発信していくための励みになります。引き続きよろしくお願いします。
DB (SQLite3) 保存版 (db_board.py)
標準以外のライブラリはrequests, pandas, sqlite3さえインストールされていれば動きます。
#!/usr/bin/python3
import requests
import json
from datetime import datetime as dt
import time
import pandas as pd
import pandas.io.sql as psql
import sqlite3
SLEEP_T = 1 # スリープ秒数
DEPTH = 100 # 取得する板の深さ
def create_table(con, cur):
sql = 'CREATE TABLE IF NOT EXISTS boards (timestamp datetime, mid_price int, '
for i in range(DEPTH):
sql += 'ask_price_{} int, '.format(i)
for i in range(DEPTH):
sql += 'ask_size_{} real, '.format(i)
for i in range(DEPTH):
sql += 'bid_price_{} int, '.format(i)
for i in range(DEPTH):
sql += 'bid_size_{} real, '.format(i)
cur.execute(sql[:-2] + ')') # 末尾のカンマを削り括弧を閉じ、SQLを実行
def get_board(con, cur): # 板情報データ取得&DBインサート関数
board = json.loads(requests.get('https://api.bitflyer.jp/v1/board?product_code=FX_BTC_JPY', timeout=5).text) # JSONデータを辞書データへ変換
sql = 'INSERT INTO boards values ("' + str(dt.now()) + '", ' + str(board['mid_price']) + ', '
for i in range(DEPTH):
sql += str(board['asks'][i]['price']) + ', '
for i in range(DEPTH):
sql += str(board['asks'][i]['size']) + ', '
for i in range(DEPTH):
sql += str(board['bids'][i]['price']) + ', '
for i in range(DEPTH):
sql += str(board['bids'][i]['size']) + ', '
cur.execute(sql[:-2] + ')') # 末尾のカンマを削り括弧を閉じ、SQLを実行
con.commit()
if __name__ == '__main__':
con = sqlite3.connect('board.db')
cur = con.cursor()
create_table(con, cur) # テーブルの作成
while True:
try:
get_board(con, cur)
df = psql.read_sql('SELECT max(timestamp), mid_price, ask_price_0, ask_size_0, bid_price_0, bid_size_0 FROM boards;', con) # DBからPandasデータフレーム取得
print(df) # Pandasデータフレームの表示サンプル
time.sleep(SLEEP_T) # 指定時間だけスリープ
except Exception as e:
print('exception: ', e.args)
(2019/01/29 22:41追記) 本note掲載のコードでは、板の深さごとにテーブルの列を作成しており、JSONとして得られたデータをそのままの形で保存しています。しかし、皆様ご自身の利用用途に合わせて、お好みでテーブルをtimestamp, side, depth, price, sizeのような構成にしても良いかと思います。親と子の2テーブルに分割するなども一案でしょう。
また、DBの検索パフォーマンスに問題がでるようであれば、必要に応じてカラムにインデックスを張ると良いかと思います。
おまけ:板情報の可視化 (display_board.py)
#!/usr/bin/python3
import json
import websocket
from datetime import datetime as dt
def my_on_open(ws):
ws.send('{"method":"subscribe","params":{"channel":"lightning_board_snapshot_FX_BTC_JPY"}, "jsonrpc":"2.0","id":0}')
def my_on_close(ws):
print('[SYSTEM:on_close]')
def my_on_error(ws, error):
print('[SYSTEM:on_error]')
print(error)
def my_on_message(ws, message):
json_data = json.loads(message)['params']['message']
print("="*50)
print(dt.now())
print("Asks:")
for m in json_data['asks'][9::-1]:
print("Price:", int(m['price']), ":", round(m['size']/0.1)*"#")
print("MidPrice:")
print("Price:", int(json_data['mid_price']), "/ Sprd:", int(json_data['asks'][0]['price']-json_data['bids'][0]['price']))
print("Bids:")
for m in json_data['bids'][0:10]:
print("Price:", int(m['price']), ":", round(m['size']/0.1)*"#")
ws = websocket.WebSocketApp("wss://ws.lightstream.bitflyer.com/json-rpc", on_message=my_on_message, on_open=my_on_open, on_close=my_on_close, on_error=my_on_error)
ws.run_forever()
エラーが出る場合は、websocket-clientが入っていないか、新しすぎる場合があります。「pip list」で確認すると、websocket-clientが見当たらないか、0.54.0などになっているのではないでしょうか。「sudo pip install websocket-client==0.47.0」で0.47.0をインストールしてください。
以上となります。本noteに関する質問についてはTwitter(@akagami_v2)のDMにて、できる限りはお受けしたいと思います。では、良いトレードbot作成ライフをお送りください!
最後まで読んでいただき、どうもありがとうございます。頂いたサポートは、良質な情報を発信している方のnote購入・サポートに充てさせていただきます。