pythonを使ったBitMex用のバックテストコード

# coding: utf-8
import requests
import time
import json
import csv
import pandas as pd
#---------------------------------------------------------------過去データ取得部分---------------------------------------------------------------
def get_ohlcv_json():
	candlesize = '1m'  #公式で対応している時間足を入力 	period = 10  # ×500足分取得されます。
	print('[System messege:BitMEXに接続。OHLCVデータを取得中です。]')
	while True:
		try:
			for a in range(period):
				ohlcv_data = requests.get('https://www.bitmex.com/api/v1/trade/bucketed?binSize=' + candlesize + '&partial=false&symbol=XBTUSD&count=500&start=' + str(a * 500) + '&reverse=true').json()
				if a == 0:
					raw_ohlcv = ohlcv_data
				else:
					raw_ohlcv = raw_ohlcv + ohlcv_data
				print('[system message:現在' + str(a * 500) + '件取得完了。]')
				time.sleep(1)
			break
		except Exception as e:
			print(('[system message:データ取得失敗、5秒待機して再取得します。]'))
			time.sleep(5)
	OHLC = []
	for x in range(len(raw_ohlcv)):
		OHLC.append([raw_ohlcv[x]['open'],raw_ohlcv[x]['high'],raw_ohlcv[x]['low'],raw_ohlcv[x]['close'],raw_ohlcv[x]['volume'],raw_ohlcv[x]['timestamp']])
	f = open('store_ohlcv.json', 'w')
	json.dump(OHLC,f)
	f.close()
	print('[system message:過去データ' + str(period * 500) + '件取得完了。]')
	time.sleep(2)
get_ohlcv_json()

#----------------------------------------------インジケーター用関数記載部-------------------------------------------------------
#単純平均線作成関数
def get_ma(period, close_prices):
    lst_close = []
    for i in range(period):
        lst_close.append(close_prices[i])
    global ma
    ma = round(sum(lst_close) / period,1)
    #期間9で終値を用いた単純移動平均の例、get_ma(9,OHLC.ix[:, 3].values.tolist())←これの記載は後述
#--------------------------------------------------------売買設定部-----------------------------------------------------------------
averaging = 5 #ナンピン回数
profit_take = 55 #利確幅
stop_loss = 40 #損切幅
fee = 0.15 * 0.01 #売買手数料往復分
direction_setting = 0#ロングショート両方とるなら0、ロングのみなら1、ショートのみなら2
#-------------その他変数設定部-----------------
mybtclong = 0
mybtcshort = 0
entry_price = 0
lst_entry = []
win_long = 0
lose_long = 0
win_short = 0
lose_short = 0
income_long = 0
income_short = 0
loss_long =0
loss_short = 0
margin = 0
pal = 0
wplong = 0
wpshort = 0
#----------------------------------------------
#------------------------------------------------------OHLCデータ取得再現部分---------------------------------------------------------
print('[system message:バックテストを開始します。]')
time.sleep(2)
# ローカルJSONファイルの読み込み
with open('store_ohlcv.json','r') as f:
    data = json.load(f)
global ohlcv
ohlcv = data
ohlcv.reverse()
#yは0~len(ohlc)-500
def get_data(y):
    global OHLC
    OHLC = []
    for x in range(500):
        OHLC.insert(0,ohlcv[x + y])
    OHLC = pd.DataFrame(OHLC)
    global next_trade_price
	#売買時に使う価格、現在足から見て次のOPENとCLOSEの平均です。
    next_trade_price = (ohlcv[500 + y][0] + ohlcv[500 + y][3])/2
    global Open
    Open = OHLC.ix[:, 0].values.tolist()
    global high
    high = OHLC.ix[:, 1].values.tolist()
    global low
    low = OHLC.ix[:, 2].values.tolist()
    global Close
    Close = OHLC.ix[:, 3].values.tolist()
    global UTCtime
    UTCtime = OHLC.ix[:, 5].values.tolist()
#--------------------------------------------------ロジック記載部----------------------------------------------------------
for y in range(0,len(ohlcv)-501):
    get_data(y)
    get_ma(9,OHLC.ix[:, 3].values.tolist())
    ma9 = ma
    get_ma(26,OHLC.ix[:, 3].values.tolist())
    ma26 = ma
    # ロング(損切り)注文
    if mybtclong > 0:
        loss_price = sum(lst_entry) / len(lst_entry) - Close[0]
        if loss_price >= stop_loss:  #その他の損切条件があればorで記載             margin = mybtclong * (next_trade_price - (sum(lst_entry) / len(lst_entry)))
            lose_long = lose_long + 1
            loss_long = loss_long + margin - (next_trade_price * fee)
            mybtclong = 0
            lst_entry = []
    # ショート(損切り)注文
    if mybtcshort > 0:
        loss_price = Close[0] - sum(lst_entry) / len(lst_entry)
        if loss_price >= stop_loss:  #その他の損切条件があればorで記載             margin = mybtcshort * (sum(lst_entry) / len(lst_entry) - next_trade_price)
            lose_short = lose_short + 1
            loss_short = loss_short + margin - (next_trade_price * fee)
            mybtcshort = 0
            lst_entry = []
    # ロング(利確)注文
    if mybtclong > 0:
        if (Close[0] - sum(lst_entry) / len(lst_entry)) > profit_take:  #その他の利確条件があればorで記載             margin = mybtclong * (next_trade_price - (sum(lst_entry) / len(lst_entry)))
            if margin > 0:
                win_long = win_long + 1
                income_long = income_long + margin - (next_trade_price * fee)
            else:
                lose_long = lose_long + 1
                loss_long = loss_long + margin - (next_trade_price * fee)
            mybtclong = 0
            lst_entry = []
    # ショート(利確)注文
    if mybtcshort > 0:
        if (sum(lst_entry) / len(lst_entry) - Close[0]) > profit_take:  #その他の利確条件があればorで記載             margin = mybtcshort * (sum(lst_entry) / len(lst_entry) - next_trade_price)
            if margin > 0:
                win_short = win_short + 1
                income_short = income_short + margin - (next_trade_price * fee)
            else:
                lose_short = lose_short + 1
                loss_short = loss_short + margin - (next_trade_price * fee)
            mybtcshort = 0
            lst_entry = []
    # ロング(買い)注文
    if direction_setting == 0 or direction_setting == 1:
        if 0 <= mybtclong < averaging and mybtcshort == 0:
            if ma9 < ma26:  #ロングの条件記載                 mybtclong = mybtclong + 1
                entry_price = next_trade_price
                lst_entry.append(entry_price)
    # ショート(売り)注文
    if direction_setting == 0 or direction_setting == 2:
        if 0 <= mybtcshort < averaging and mybtclong == 0:
            if ma9 > ma26:  #ショートの条件記載                 mybtcshort = mybtcshort + 1
                entry_price = next_trade_price
                lst_entry.append(entry_price)
    #進度表示、計算を高速化したければ削除推奨
    print(Open[0],high[0],low[0],Close[0],UTCtime[0],'進度' + str(round((y + 501) / len(ohlcv) * 100,2)) + '%')
#-----------------------------------------------------------検証部-----------------------------------------------------
pal = income_long + income_short + loss_long + loss_short
if (win_long + win_short + lose_long + lose_short) > 0:
    wp = (win_long + win_short) / (win_long + win_short + lose_long + lose_short)
    if (win_long + lose_long) > 0:
        wplong = win_long / (win_long + lose_long)
    else:
        wplong = 0.0
    if (win_short + lose_short) > 0:
        wpshort = win_short / (win_short + lose_short)
    else:
        wpshort = 0.0
else:
    wp = 0
global backtest_for_csv
if direction_setting == 0:
    direction = 'ロングショート両方あり'
elif direction_setting == 1:
    direction = 'ロングのみ'
else:
    direction='ショートのみ'
backtest_for_csv = ['純利益',pal,'利確幅',profit_take,'損切幅',stop_loss,'勝率',round(wp * 100,2),'戦い方',direction,'ロングの勝率',wplong * 100,'ショートの勝率',wpshort * 100,'利確損切回数',win_long + win_short + lose_long + lose_short]
print("テスト期間(ローソク足の数) " + str(len(ohlcv)))
with open('backtest.csv','a',encoding='UTF-8') as f:
    writer = csv.writer(f)
    writer.writerow(backtest_for_csv)
print('[system message:バックテスト完了。CSVファイルへの保存完了。結果を表示します。]')
print(backtest_for_csv)

 上記がコード全文です。Noteの投稿に慣れていないのでもっとスマートな書き方があるのかもしれませんけどごり押します。

 必要なライブラリはコード冒頭を参照してください。生意気にもpandas使ってます。今回のバックテストコードはテスト後に本番botへの実装がやりやすいように1回に500件までOHLCVを取得するという部分を再現して作っています。本当は速度重視なら最初の1回目で全件のテクニカル指標を計算しておいて後は参照するだけのほうが素早いんですけど、これは毎回計算しなおしています。また、今回このバックテストで用いているロジックは超適当に組み込んだだけで、テスト結果が良かったからと言って今後通用することは無いかもしれない(無い、きっと無い)ということをご承知おきください。そして私はpythonどころかプログラミング歴が2か月程度なので、エラーがあるかもしれないことを前提で、疑いのまなざしで使ってください。(言い訳!でも無料だもん!)。

 コード冒頭のAPIKEY等はデータ取得のみなので入力不要です。過去データ取得部分では取ってきたい時間足とどれだけのデータを取ってくるかを指定してください。対応している時間足は公式と同様です。periodに入れた数×500件取得します。また、そのあとのテストコードの仕様上2以上の数値を入れてください。その後の記載部分も要所要所にコメントを入れているのでみなさん好きなようにいじってみてください。

 最初の過去データで取得したJSONはローカルに保存されますが、次の起動時に上書きされます。とっておきたければ移動するか名前を変更してください。テスト結果のCSVファイルは結果が追記されていくので前回の結果が消えることはありません。

 説明不足かもしれませんがさほど複雑な動きはしていないと思うので一度動かせば「ふんふん、こんな感じね。」というのがわかると思います。

 それではみなさま、オーバーフィッティングに気を付けて良いFXライフを!


#-----------追記--------------

EMAを計算するコードです。実行するときはこのコードを上のほうに貼り付けて、ema(9)など何本足の計算をするか、数字を括弧に入れてください。実行するとlst_emaというリストにEMAが計算されて新しい順に入っているのでprint(lst_ema[0])とすると最新のEMAを表示してくれます。

#EMA
def ema(period):
   Close.reverse()
   lst_close = []
   global lst_ema
   lst_ema = []
   for i in range(len(Close)):
       lst_close.insert(0,Close[i])
       if len(lst_close) == period:
           ma = sum(lst_close) / period
           lst_ema.insert(0,ma)
       if len(lst_close) > period:
           ema = lst_ema[0] + (2 / (period + 1)) * (lst_close[0] - lst_ema[0])
           lst_ema.insert(0,ema)
   Close.reverse()