bitFlyer APIのラッパークラスのつもり[python][asyncio]

ちょっと変更しました。
前回はexchangeクラスのプロパティにレスポンスを格納する仕様でしたが、わかりづらいのと、インデントのバグがあって一部動かなかったので修正しましたw

[基本的な使い方]
bot側からはExchangeクラスの各APIメソッドをコールして、Exchange.sendメソッドを実行してください。
sendメソッドを実行するまでリクエストはExchangeクラスのメンバ変数に格納されたままになります。
sendメソッドをコールするとレスポンスがリスト形式で返されます。
現状、複数リクエストを非同期で投げた際のリクエストごとに識別可能なIDなど設定できるようにはしていないので、複数リクエストを行う場合は同一APIメソッドに限った方が管理が楽になります。
スレッドと同じように、異なるメソッドはイベントを分けた方が良い気がします。

リアルタイムAPIの購読はインスタンス作成後にchannelsへセットしてください。
デフォルトは全チャネル受信となっていますので、レンタルサーバーを使用する場合、料金が高くなってしまう可能性がありますので、サンプルbotのように購読チャネルの指定を行って下さい。

[2銘柄以上の取引に関して]
product_codeを指定してインスタンス化するため、ひとつのインスタンス内で2銘柄取引することはできません。(できますがめんどくさくなります)
リアルタイムデータの受信もproduct_codeに依存していますので、SFDBotのような2銘柄以上の取引を行う場合は、インスタンスを、

self.exfx = Exchange()
self.exfx.product_code = 'FX_BTC_JPY'
self.exstock = Exchange()
self.exstock.product_code = 'BTC_JPY'

のように作り、それぞれに発注やポジション取得を行うことで対応可能かと思います。私自身ではまだ試してませんので、上手く行くかはちょっとわかりません。。

[現在の構成]
exchange.py [bitFlyer APIのラッパークラス]
bot.py [ラッパークラス他、本noteのクラスを使用したbotサンプル]
notify.py [LINE通知を含む通知クラス]

[更新]
2019/3/31 Notifyクラスを追加しました[LINE通知]
2019/4/3 重複した定義のバグを修正しました

# exchange.py

import aiohttp
import asyncio
import async_timeout
import json
from aiohttp import WSMsgType
import traceback
import time
from datetime import datetime
import hmac
import hashlib
import urllib

class Exchange():

# ------------------------------------------------------------------------------
# PROPERTY
# ------------------------------------------------------------------------------

    api_key = ''
    api_secret = ''

    product_code = 'FX_BTC_JPY'  # 銘柄 "FX_BTC_JPY"
    minute_to_expire = 43200  # 注文の有効期限(分 43200=30日
    time_in_force = 'GTC'  # 執行条件 "GTC", "IOC", "FOK"
    currency_code = 'JPY'  # 口座で使用している通貨のコード "JPY", "USD"

    # リアルタイムAPI購読チャネル
    channels = ['ticker', 'executions', 'board_snapshot', 'board']

    timeout = 10  # タイムアウト
    session = None  # セッション保持
    requests = []  # リクエストパラメータ

    urls = {'public': 'https://api.bitflyer.com/v1/',
            'private': 'https://api.bitflyer.com/v1/me/',
            'path': '/v1/me/'}

# ------------------------------------------------------------------------------
# INIT
# ------------------------------------------------------------------------------

    def __init__(self, api_key, api_secret):
        # APIキー・SECRETをセット
        self.api_key = api_key
        self.api_secret = api_secret

# ------------------------------------------------------------------------------
# ASYNC REQUEST FOR REST API
# ------------------------------------------------------------------------------

    def set_request(self, method, access_modifiers, target_path, params):
        if access_modifiers == 'public':
            url = ''.join([self.urls['public'], target_path])
            if method == 'GET':
                headers = ''
                self.requests.append({'method': method,
                                      'access_modifiers': access_modifiers,
                                      'target_path': target_path, 'url': url,
                                      'params': params, 'headers':{}})

            if method == 'POST':
                headers = {'Content-Type': 'application/json'}
                self.requests.append({'method': method,
                                      'access_modifiers': access_modifiers,
                                      'target_path': target_path, 'url': url,
                                      'params': params, 'headers':headers})

        if access_modifiers == 'private':
            url = ''.join([self.urls['private'], target_path])
            path = ''.join([self.urls['path'], target_path])
            timestamp = str(time.time())
            if method == 'GET':
                if len(params) > 0:
                    text = ''.join([timestamp, method, path,
                           '?{}'.format(urllib.parse.urlencode(params))])
                else:
                    text = ''.join([timestamp, method, path])

                sign = self.get_sign(text)
                headers = self.set_headers_for_private(timestamp=timestamp,
                                                       sign=sign)

                self.requests.append({'method': method,
                                      'access_modifiers': access_modifiers,
                                      'target_path': target_path, 'url': url,
                                      'params': params, 'headers': headers})

            if method == 'POST':
                if len(params) > 0:
                    post_data = json.dumps(params)
                else:
                    post_data = params

                text = ''.join([timestamp, method, path, post_data])
                sign = self.get_sign(text)
                headers = self.set_headers_for_private(timestamp=timestamp,
                                                       sign=sign)

                self.requests.append({'method': method,
                                      'access_modifiers': access_modifiers,
                                      'target_path': target_path, 'url': url,
                                      'params': post_data, 'headers': headers})

    def set_headers_for_private(self, timestamp, sign):
        headers = {'Content-Type': 'application/json',
                   'ACCESS-KEY': self.api_key, 'ACCESS-TIMESTAMP': timestamp,
                   'ACCESS-SIGN': sign}
        return headers

    def get_sign(self, text):
        sign = hmac.new(self.api_secret.encode('ascii'),
                        text.encode('ascii'), hashlib.sha256).hexdigest()
        return sign

    async def fetch(self, request):
        status = 0
        content = []

        async with async_timeout.timeout(self.timeout):
            try:
                if self.session is None:
                    self.session = await aiohttp.ClientSession().__aenter__()
                if request['method'] is 'GET':
                    async with self.session.get(url=request['url'],
                                                params=request['params'],
                                                headers=request['headers']) as response:
                        status = response.status
                        content = await response.read()
                        if status != 200:
                            # エラーのログ出力など必要な場合
                            pass

                elif request['method'] is 'POST':
                    async with self.session.post(url=request['url'],
                                                 data=request['params'],
                                                 headers=request['headers']) as response:
                        status = response.status
                        content = await response.read()
                        if status != 200:
                            # エラーのログ出力など必要な場合
                            pass

                if len(content) == 0:
                    result = []

                else:
                    try:
                        result = json.loads(content.decode('utf-8'))
                    except Exception as e:
                        traceback.print_exc()

                return result

            except Exception as e:
                # セッション終了
                if self.session is not None:
                    await self.session.__aexit__(None, None, None)
                    await asyncio.sleep(0)
                    self.session = None

                traceback.print_exc()

    async def send(self):
        promises = [self.fetch(req) for req in self.requests]
        self.requests.clear()
        return await asyncio.gather(*promises)

# ------------------------------------------------------------------------------
# PUBLIC API
# ------------------------------------------------------------------------------

    # マーケットの一覧を取得
    def getmarkets(self):
        params = {}
        self.set_request(method='GET', access_modifiers='public',
                         target_path='getmarkets', params=params)

    # 板情報を取得
    def getboard(self):
        params = {'product_code': self.product_code}
        self.set_request(method='GET', access_modifiers='public',
                         target_path='getboard', params=params)

    # Tickerを取得
    def getticker(self):
        params = {'product_code': self.product_code}
        self.set_request(method='GET', access_modifiers='public',
                         target_path='getticker', params=params)

    # 約定履歴を取得
    def getexecutions(self):
        params = {'product_code': self.product_code}
        self.set_request(method='GET', access_modifiers='public',
                         target_path='getexecutions', params=params)

    # 板の状態を取得
    def getboardstate(self):
        params = {'product_code': self.product_code}
        self.set_request(method='GET', access_modifiers='public',
                         target_path='getboardstate', params=params)

    # 取引所の状態を取得
    def gethealth(self):
        params = {'product_code': self.product_code}
        self.set_request(method='GET', access_modifiers='public',
                         target_path='gethealth', params=params)

    # チャットを取得
    def getchats(self, from_date=''):
        params = {'from_date': from_date}
        self.set_request(method='GET', access_modifiers='public',
                         target_path='getchats', params=params)

# ------------------------------------------------------------------------------
# PRIVATE API
# ------------------------------------------------------------------------------

    # API キーの権限を取得
    def getpermissions(self):
        params = {}
        self.set_request(method='GET', access_modifiers='private',
                         target_path='getpermissions', params=params)

    # 資産残高を取得
    def getbalance(self):
        params = {}
        self.set_request(method='GET', access_modifiers='private',
                         target_path='getbalance', params=params)

    # 証拠金の状態を取得
    def getcollateral(self):
        params = {}
        self.set_request(method='GET', access_modifiers='private',
                         target_path='getcollateral', params=params)

    # 預入用アドレスを取得
    def getaddresses(self):
        params = {}
        self.set_request(method='GET', access_modifiers='private',
                         target_path='getaddresses', params=params)

    # 仮想通貨預入履歴を取得
    def getcoinins(self, count=100, before=0, after=0):
        params = {'count': count, 'before': before, 'after': after}
        self.set_request(method='GET', access_modifiers='private',
                         target_path='getcoinins', params=params)

    # 仮想通貨預入履歴を取得
    def getcoinouts(self, count=100, before=0, after=0):
        params = {'count': count, 'before': before, 'after': after}
        self.set_request(method='GET', access_modifiers='private',
                         target_path='getcoinouts', params=params)

    # 銀行口座一覧を取得
    def getbankaccounts(self):
        params = {}
        self.set_request(method='GET', access_modifiers='private',
                         target_path='getbankaccounts', params=params)

    # 入金履歴を取得
    def getdeposits(self, count=100, before=0, after=0):
        params = {'count': count, 'before': before, 'after': after}
        self.set_request(method='GET', access_modifiers='private',
                         target_path='getdeposits', params=params)

    # 出金
    def withdraw(self, currency_code, bank_account_id, amount, code):
        params = {'currency_code': self.currency_code,
                  'bank_account_id': bank_account_id,
                  'amount': amount, 'code': code}
        self.set_request(method='POST', access_modifiers='private',
                         target_path='withdraw', params=params)

    # 出金履歴を取得
    def getwithdrawals(self, count=100, before=0, after=0, message_id=''):
        params = {'count': count, 'before': before, 'after': after,
                  'message_id': message_id}
        self.set_request(method='GET', access_modifiers='private',
                         target_path='getwithdrawals', params=params)

    # 新規注文を出す
    def sendchildorder(self, child_order_type, side, price, size):
        params = {'product_code': self.product_code,
                  'child_order_type': child_order_type,
                  'side': side, 'price': price, 'size': size,
                  'minute_to_expire': self.minute_to_expire,
                  'time_in_force': self.time_in_force}
        self.set_request(method='POST', access_modifiers='private',
                         target_path='sendchildorder', params=params)

    # 注文をキャンセルする
    def cancelchildorder(self, child_order_id='',
                         child_order_acceptance_id=''):
        params = {'product_code': self.product_code}
        if len(child_order_id) > 0:
            params['child_order_id'] = child_order_id
        elif len(child_order_acceptance_id) > 0:
            params['child_order_acceptance_id'] = child_order_acceptance_id

        self.set_request(method='POST', access_modifiers='private',
                         target_path='cancelchildorder', params=params)

    # 特殊注文を出す
    def sendparentorder(self, order_method, parameters_1, parameters_2,
                        parameters_3):
        params ={'order_method': order_method,
                 'minute_to_expire': self.minute_to_expire,
                 'time_in_force': self.time_in_force, 'parameters': {}}
        if order_method == 'SIMPLE':
            parameters_1['product_code'] = self.product_code
            params['parameters'] = [parameters_1]
        elif order_method == 'IFD' or order_method == 'OCO':
            parameters_1['product_code'] = self.product_code
            parameters_2['product_code'] = self.product_code
            params['parameters'] = [parameters_1, parameters_2]
        elif order_method == 'IFDOCO':
            parameters_1['product_code'] = self.product_code
            parameters_2['product_code'] = self.product_code
            parameters_3['product_code'] = self.product_code
            params['parameters'] = [parameters_1, parameters_2, parameters_3]

        self.set_request(method='POST', access_modifiers='private',
                         target_path='sendparentorder', params=params)

    # 親注文をキャンセルする
    def cancelparentorder(self, parent_order_id='',
                         parent_order_acceptance_id=''):
        params = {'product_code': self.product_code}
        if len(parent_order_id) > 0:
            params['parent_order_id'] = parent_order_id
        elif len(parent_order_acceptance_id) > 0:
            params['parent_order_acceptance_id'] = parent_order_acceptance_id

        self.set_request(method='POST', access_modifiers='private',
                         target_path='cancelparentorder', params=params)

    # すべての注文をキャンセル
    def cancelallchildorders(self):
        params = {'product_code': self.product_code}
        self.set_request(method='POST', access_modifiers='private',
                         target_path='cancelallchildorders', params=params)

    # 注文の一覧を取得
    def getchildorders(self, count=100, before=0, after=0,
                       child_order_state='', child_order_id='',
                       child_order_acceptance_id='',
                       parent_order_id=''):
        params = {'product_code': self.product_code, 'count': count,
                  'before': before, 'after': after,
                  'child_order_state': child_order_state,
                  'child_order_id': child_order_id,
                  'child_order_acceptance_id': child_order_acceptance_id,
                  'parent_order_id': parent_order_id}
        self.set_request(method='GET', access_modifiers='private',
                         target_path='getchildorders', params=params)

    # 親注文の一覧を取得
    def getparentorders(self, count=100, before=0, after=0,
                       parent_order_state=''):
        params = {'product_code': self.product_code, 'count': count,
                  'before': before, 'after': after,
                  'parent_order_state': parent_order_state}

        self.set_request(method='GET', access_modifiers='private',
                         target_path='getparentorders', params=params)

    # 親注文の詳細を取得
    def getparentorder(self, parent_order_id='',
                         parent_order_acceptance_id=''):
        params = {}
        if len(parent_order_id) > 0:
            params = {'parent_order_id': parent_order_id}
        elif len(parent_order_acceptance_id) > 0:
            params = {'parent_order_acceptance_id': parent_order_acceptance_id}

        self.set_request(method='GET', access_modifiers='private',
                         target_path='getparentorder', params=params)

    # 約定の一覧を取得
    def getexecutions(self, child_order_id='', child_order_acceptance_id='',
                      count=100, before=0, after=0):
        params = {}
        if len(child_order_id) > 0:
            params = {'product_code': self.product_code,
                      'child_order_id': child_order_id,
                      'count': count, 'before': before, 'after': after}
        elif len(child_order_acceptance_id) > 0:
            params = {'product_code': self.product_code,
                      'child_order_acceptance_id': child_order_acceptance_id,
                      'count': count, 'before': before, 'after': after}

        self.set_request(method='GET', access_modifiers='private',
                         target_path='getexecutions', params=params)

    # 残高履歴を取得
    def getbalancehistory(self, currency_code='JPY', count=100, before=0,
                          after=0):
        params = {'currency_code': self.currency_code,'count': count,
                  'before': before, 'after': after}

        self.set_request(method='GET', access_modifiers='private',
                         target_path='getbalancehistory', params=params)

    # 建玉の一覧を取得
    def getpositions(self):
        params = {'product_code': self.product_code}
        self.set_request(method='GET', access_modifiers='private',
                         target_path='getpositions', params=params)

    # 証拠金の変動履歴を取得
    def getcollateralhistory(self, count=100, before=0, after=0):
        params = {'count': count, 'before': before, 'after': after}

        self.set_request(method='GET', access_modifiers='private',
                         target_path='getcollateralhistory', params=params)

    # 取引手数料を取得
    def gettradingcommission(self):
        params = {'product_code': self.product_code}
        self.set_request(method='GET', access_modifiers='private',
                         target_path='gettradingcommission', params=params)

# ------------------------------------------------------------------------------
# REALTIME
# ------------------------------------------------------------------------------

    async def subscribe(self, callback):
        if len(self.channels) == 0:
            await asyncio.sleep(0)
            return
        uri = 'wss://ws.lightstream.bitflyer.com/json-rpc'
        while True:
            try:
                async with aiohttp.ClientSession() as session:
                    async with session.ws_connect(uri,
                                                  receive_timeout=self.timeout) as client:
                        for channel in self.channels:
                            params = {'channel': 'lightning_'\
                                      + channel + '_' + self.product_code}
                            query = {'method': 'subscribe', 'params': params}
                            await asyncio.wait([client.send_str(json.dumps(query))])
                        async for response in client:
                            if response.type != WSMsgType.TEXT:
                                print('response:' + str(response))
                                break
                            data = json.loads(response[1])['params']
                            await self.handler(callback, data)

            except Exception as e:
                traceback.print_exc()

# ------------------------------------------------------------------------------
# UTILS
# ------------------------------------------------------------------------------

    # コールバック、ハンドラー
    async def handler(self, func, *args):
        return await func(*args)

# bot.py

import asyncio
import traceback
import time
from exchange import Exchange
from notify import Notify


class Bot():

# ------------------------------------------------------------------------------
# PROPERTY
# ------------------------------------------------------------------------------

    tradable = False  # トレード許可(可=True, 不可=False)
    allow_status = ['NORMAL', 'BUSY', 'VERY BUSY', 'SUPER BUSY']  # トレードを許可する取引所ステータス
    disallow_status = ['NO ORDER', 'STOP']  # トレードを許可しない取引所ステータス
    allow_order = True  # オーダーの許可(可=True, 不可=False)
    line_notify = False  # LINE通知の有効無効(有効=True, 無効=False)

    collateral = []
    position = {}
    positions = []
    ticker = []

# ------------------------------------------------------------------------------
# INIT
# ------------------------------------------------------------------------------

    def __init__(self, api_key, api_secret, line_token):

        self.exchange = Exchange(api_key=api_key, api_secret=api_secret)
        self.exchange.minute_to_expire = 1
        self.exchange.channels = ['ticker']
        self.exchange.time_in_force = 'GTC'
        self.exchange.currency_code = 'JPY'

        if len(line_token) > 0:
            self.notify = Notify(line_token=line_token)
            self.line_notify = True

        # タスクの設定及びイベントループの開始
        loop = asyncio.get_event_loop()
        tasks = [self.exchange.subscribe(self.realtime),
                 self.is_tradable(0.5),
                 self.update_collateral(60),
                 self.update_position(1),
                 self.run()]
        loop.run_until_complete(asyncio.wait(tasks))

# ------------------------------------------------------------------------------
# BOT 本体
# ------------------------------------------------------------------------------

    async def run(self):
        while True:
            await self.trade(1)
            await asyncio.sleep(0)

    async def trade(self, interval):
        """
        # EXAMPLE[ORDER AND CANCEL]
        if self.allow_order is True:
            self.exchange.sendparentorder(order_method='IFDOCO',
                                          parameters_1={'condition_type': 'LIMIT',
                                                        'side': 'BUY',
                                                        'price': 446000,
                                                        'size': 0.01},
                                          parameters_2={'condition_type': 'LIMIT',
                                                        'side': 'SELL',
                                                        'price': 450000,
                                                        'size': 0.01},
                                          parameters_3={'condition_type': 'STOP_LIMIT',
                                                        'side': 'SELL',
                                                        'price': 444800,
                                                        'trigger_price': 445000,
                                                        'size': 0.01})
            response = await self.exchange.send()
            print(response[0])

            self.exchange.cancelparentorder(parent_order_acceptance_id=response[0]['parent_order_acceptance_id'])
            response = await self.exchange.send()
            print(response)
            self.allow_order = False
        """
        await asyncio.sleep(interval)

    # 証拠金の状態を更新
    async def update_collateral(self, interval):
        while True:
            self.exchange.getcollateral()
            response = await self.exchange.send()
            if len(response[0]) > 0 and 'collateral' in response[0]:
                self.collateral = response[0]

            # 証拠金をLINE通知
            if self.line_notify is True:
                self.notify.sendlinenotify(response[0]['collateral'])
            print('COLLATERAL:', self.collateral)
            await asyncio.sleep(interval)

    # 建玉の状態を更新
    async def update_position(self, interval):
        while True:
            self.exchange.getpositions()
            response = await self.exchange.send()

            positions = response[0]
            if len(response[0]) > 0 and 'side' in positions[0]:
                side = ''
                size = 0.0
                total_size = 0.0
                subtotal = 0.0
                for position in positions:
                    side = str(position['side'])
                    size = float(position['size'])
                    total_size += size
                    subtotal += int(position['price']) * size
                self.position = {'side': str(positions[0]['side']),
                                 'price': int(subtotal / total_size),
                                 'size': round(total_size, 8)}
                self.positions = positions
            else:
                self.position = []
                self.positions = []

            print('POSITIONS:', positions)
            await asyncio.sleep(interval)

    # リアルタイムデータの受信
    async def realtime(self, data):
        self.ticker = data['message']
        print('TICKER:', data['message'])
        await asyncio.sleep(0)

    # 取引可能か判定(取引所ステータス基準)
    async def is_tradable(self, interval):
        while True:
            self.exchange.gethealth()
            response = await self.exchange.send()
            status = response[0]['status']

            if status in self.disallow_status:
                self.tradable = False
            else:
                self.tradable = True

            print('HEALTH:', response[0])
            await asyncio.sleep(interval)

# ------------------------------------------------------------------------------
# メイン
# ------------------------------------------------------------------------------

if __name__ == '__main__':
    api_key = 'YOUR API_KEY'
    api_secret = 'YOUR API_SECRET'
    line_token = 'YOUR LINE TOKEN'
    Bot(api_key=api_key, apci_secret=api_secret, line_token=line_token)
# notify.py

import json
import traceback
import requests


class Notify():

# ------------------------------------------------------------------------------
# PROPERTY
# ------------------------------------------------------------------------------

    line_token = ''

    timeout = 10  # タイムアウト
    session = None  # セッション保持

    urls = {'line': 'https://notify-api.line.me/api/notify'}

# ------------------------------------------------------------------------------
# INIT
# ------------------------------------------------------------------------------

    def __init__(self, line_token=''):
        # LINE TOKENをセット
        self.line_token = line_token

# ------------------------------------------------------------------------------
# HTTP REQUEST [POST ONLY]
# ------------------------------------------------------------------------------

    def send(self, url, payload, headers, files):
        status = 0
        content = []

        try:
            if self.session is None:
                self.session = requests.Session()
            if len(files) > 0:
                response = self.session.post(url, data=payload, headers=headers,
                                             files=files, timeout=self.timeout)
                print(response)
                status = response.status_code
                content = response.json()
                if status != 200:
                    # エラーのログ出力など必要な場合
                    pass
            else:
                response = self.session.post(url, data=payload, headers=headers,
                                             timeout=self.timeout)
                status = response.status_code
                content = response.json()
                if status != 200:
                    # エラーのログ出力など必要な場合
                    pass

            if len(content) == 0:
                result = []
            else:
                try:
                    result = json.dumps(content)
                except Exception as e:
                    traceback.print_exc()

            return result

        except Exception as e:
            if self.session is not None:
                self.session.close()
                self.session = None
            traceback.print_exc()

# ------------------------------------------------------------------------------
# UTILS
# ------------------------------------------------------------------------------

    # LINE通知
    def sendlinenotify(self, message, file=''):
        if len(self.line_token) == 0:
            return

        payload = {'message': message}
        if len(file) > 0:
            file = {'imageFile': open(file, 'rb')}
        headers = {'Authorization': 'Bearer ' + self.line_token}

        self.send(url=self.urls['line'], payload=payload, headers=headers,
                  files=file)

[スペシャルサンクス]
コードの一部にnoa@crptoresearcherさんが公開したコードを使用しています。

この記事が気に入ったら、サポートをしてみませんか?気軽にクリエイターを支援できます。

56

ニッケルメッキ

相場好きです!それだけ。

仮想通貨

2つ のマガジンに含まれています

コメント2件

美しいコードでとても参考になります。ありがとうございます。
RESTのgetexecutionsとgetbalancehistoryの定義が重複しているようです。ご確認ください。
ご指摘頂きました点を修正させて頂きました、ご指摘の件、またお褒め頂きいただきありがとうございます!
コメントを投稿するには、 ログイン または 会員登録 をする必要があります。