見出し画像

Pythonで話題のWEBフレームワークresponderでサンプルのtodoリストを作成

Responderとは

Responderは、2018年10月に公開されたPythonの軽量WEBフレームワークです。requestsとpipenvを開発されたKenneth Reitz氏が開発されたものであり、公開後わずか2ヶ月でスター数は2000をも超えています。

Responderの特徴とは

・FlaskとFalconの良いところを一つにまとめている
・非同期処理を簡単に書くことが可能
・標準でGraphQLを扱うことが可能(grapheneを使用している)
・SPAとHSTSに対応
・FlaskやFalconなどのWSGI対応アプリケーションを組み合わせてマウントすることが可能

Responderを用いて簡易的なtodoリストを作成

今回はこのResponderを使って簡易的なtodoリスト作成してみます。
DBにはsqlite3、ORMにはSQLAlchemyを使用します。

使用環境
Python 3.6.5

使用ライブラリ
SQLAlchemy

環境構築 & インストール作業

まず最初にpipでstarletteを取り入れます。実はresponderは、内部ではstarletteを使用しており、ほぼstarletteのラッパーライブラリのようなものです。

バージョンは必ず0.8.0以下を指定してください。responder側が0.8.0以上のstarletteに対応していないらしく、responderが動作しなくなります。

$ pip install starlette==0.8.0

次にresponder本体をインストールします。

$ pip install responder

Todoリストのモデル定義を行う

todoリストを保存するためのDBの設定ファイルを作成し、DBのモデル定義を行います。実際にSQLを扱うため、Pythonでは有名なORMであるSQLAlchemyを使用します。

SQLAlchemyの設定ファイルを作成する

今回はサンプルのためにsqlite3を使用しますが、実際にMySQLなどのDBを建てて扱う場合は各自でRDB_PATHにMySQLのパスを入れてください。

ECHO_LOGをTrueにすると、DBアクセス時のログが表示されるようになります。今回はFalseを設定しているため、ログが表示されないようになります。

ファイル名はrdb.pyとします。

#!/usr/bin/python
# -*- coding: utf-8 -*-
# rdb.py

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

Base = declarative_base()
RDB_PATH = 'sqlite:///db.sqlite3'
ECHO_LOG = False
engine = create_engine(
   RDB_PATH, echo=ECHO_LOG
)

Session = sessionmaker(bind=engine)
session = Session()


Todoリストのモデル定義をして、テーブルを作成する

__tablename__にはtodoリストを保存するためのテーブル名を入力します。今回はtasksとします。
定義しているカラムには、id, name, text, created_at, updated_atの5つです。

・idはtodoリストのユニークなkey
・nameはtodoリストの名称
・textはtodoリストの内容
・created_atはtodoリストの作成日時、server_default=current_timestamp()としているため、レコード作成時に現在日時と時刻が自動で入力されます。
・updated_atはtodoリストの更新日時、onupdate=datetime.now()としているため、レコード更新時に自動で現在日時と時刻が自動で入力されます。

ファイル名はmodels.pyとします。

# -*- coding: utf-8 -*-
# models.py

import os
from datetime import datetime

from rdb import Base
from rdb import engine

from sqlalchemy import Column, String, DateTime, text
from sqlalchemy.sql.functions import current_timestamp
from sqlalchemy.dialects.mysql import INTEGER

SQLITE3_NAME = "./db.sqlite3"

class Tasks(Base):
    __tablename__ = 'tasks'

    id = Column(
        INTEGER(unsigned=True),
        primary_key=True,
        autoincrement=True
    )
    name = Column(String(256))
    text = Column(String(256))
    created_at = Column(
        DateTime,
        default=datetime.now(),
        nullable=False,
        server_default=current_timestamp()
    )
    updated_at = Column(
        DateTime,
        default=datetime.now(),
        nullable=False,
        onupdate=datetime.now()
        # server_default=text(
        #     'CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP'
        # )
    )

if __name__ == "__main__":
    path = SQLITE3_NAME
    if not os.path.isfile(path):
        # テーブル作成
        Base.metadata.create_all(engine)

実際にテーブルを作成してみましょう。以下のコマンドを入力します。

$ python models.py

models.pyの以下の部分が実行され、実際にテーブルが作成されます。上記のコード通りのものを実行すると、同ディレクトリ上にdb.sqlite3というファイルが作成されます。
SQLITE3_NAMEには保存したいsqlite3のファイル名を入れれば、sqlite3のファイル名を変えられます。

if __name__ == "__main__":
    path = SQLITE3_NAME
    if not os.path.isfile(path):
        # テーブル作成
        Base.metadata.create_all(engine)


Todoリストのトランザクション処理を実装

それでは実際にSQLatodoリストを作成・保存・更新・削除を行うためのロジック部分の実装を行っていきましょう。

ファイル名はtodo.pyです。

#!/usr/bin/python
# -*- coding: utf-8 -*-
# todo.py

from rdb import session
from models import Tasks
from sqlalchemy.exc import SQLAlchemyError

def add_todo(name, text):
   """ todo listの追加 """
   try:
       task = Tasks(
           name=name,
           text=text
       )
       session.add(task)
       session.commit()
   except SQLAlchemyError as e:
       print(e)
       session.rollback()
   except Exception as e:
       session.rollback()
   finally:
       session.close()

def update_todo(id, name, text):
   """ todo listの更新 """
   try:
       """ SELECT 時に排他ロックを取得 """
       query = session.query(
           Tasks
       ).with_lockmode('update')
       task = query.filter(
           Tasks.id == id
       ).first()
       # time.sleep(1)
       """ name, text, img_pathの更新 """
       task.name = name
       task.text = text
       session.commit()
   except SQLAlchemyError as e:
       session.rollback()
   except Exception as e:
       session.rollback()
   finally:
       session.close()

def delete_todo(id):
   """ todo listの削除 """
   try:
       task = session.query(
           Tasks
       ).filter(
           Tasks.id == id
       ).first()
       session.delete(task)
       session.commit()
   except SQLAlchemyError:
       session.rollback()
   except Exception as e:
       session.rollback()
   finally:
       session.close()

def get_todo_list():
   """ todo listを全て取得 """
   task_list = []
   try:
       tasks = session.query(
           Tasks
       ).all()
       for task in tasks:
           created_at = task.created_at
           updated_at = task.updated_at
           task_list.append({
               "id": task.id,
               "name": task.name,
               "text": task.text,
               "created_at": created_at.strftime('%Y-%m-%d %H:%M:%S'),
               "updated_at": updated_at.strftime('%Y-%m-%d %H:%M:%S')
           })
   except Exception as e:
       print(e)
   finally:
       session.close()
       return task_list

def get_todo(id):
   """ todo listを取得 """
   task_dict = {}
   try:
       task = session.query(
           Tasks
       ).filter(
           Tasks.id == id
       ).first()
       if task:
           created_at = task.created_at
           updated_at = task.updated_at
           task_dict = {
               "id": task.id,
               "name": task.name,
               "text": task.text,
               "created_at": created_at.strftime('%Y-%m-%d %H:%M:%S'),
               "updated_at": updated_at.strftime('%Y-%m-%d %H:%M:%S')
           }
   except Exception as e:
       print(e)
   finally:
       session.close()
    

todoリスト追加部分の実装を少し解説

todoリストを追加する処理であるadd_todoメソッドについて少し解説します。引数はtodoリストのカラムであるname, textの二つです。

session.commit()の行で、実際にDBへのinsert処理であるトランザクションが発行されます。

try ~ exceptを入れることで、tryの中で処理が失敗してしまった場合、例外処理が行われ、session.rollback()でロールバックするようにしています。

finallyで、必ずsession.close()で必ずセッションを閉じるようにしています。

def add_todo(name, text):
   """ todo listの追加 """
   try:
       task = Tasks(
           name=name,
           text=text
       )
       session.add(task)
       session.commit()
   except SQLAlchemyError as e:
       print(e)
       session.rollback()
   except Exception as e:
       session.rollback()
   finally:
       session.close()


ResponderのWEB部分の実装

Responderの実装をようやく解説します。Responderはflaskと違い、Falconと同様にクラスベースでの実装が行なえます。
チュートリアルや他の解説記事では普通にメソッドで実装を行っていますが、ここではFalconみたいにクラスベースでの実装方法を解説します。

ファイル名はapp.pyとします。

#!/usr/bin/python
# -*- coding: utf-8 -*-
# app.py

import responder
from responder import API
import time

from todo import add_todo
from todo import delete_todo
from todo import update_todo
from todo import get_todo
from todo import get_todo_list

api = responder.API(
   cors=True,
   allowed_hosts=["*"],
)

class UpdateGetDeleteTodo:
   def on_get(self, req, resp, *, id):
       todo = get_todo(id)
       resp.media = {
           "status": True,
           "todo": todo
       }
   async def on_put(self, req, resp, *, id):
       @api.background.task
       def process_update_todo(name, text):
           time.sleep(3)
           update_todo(id, name, text)
       data = await req.media()
       name = data['name']
       text = data['text']
       process_update_todo(name, text)
       resp.media = {
           'status': True
       }
   async def on_delete(self, req, resp, *, id):
       @api.background.task
       def process_delete_todo():
           time.sleep(3)
           delete_todo(id)
       process_delete_todo()
       resp.media = {
           'status': True
       }

class AddGetTodo:
   def on_get(self, req, resp):
       todos = get_todo_list()
       resp.media = {
           "status": True,
           "todos": todos
       }
   async def on_post(self, req, resp):
       @api.background.task
       def process_add_todo(name, text):
           time.sleep(3)
           add_todo(name, text)
       data = await req.media()
       name = data['name']
       text = data['text']
       process_add_todo(name, text)
       resp.media = {
           'status': True
       }

api.add_route("/api/todo", AddGetTodo)
api.add_route("/api/todo/{id}", UpdateGetDeleteTodo)

if __name__ == "__main__":
   port = 5000
   api.run(port=port)

以下の部分でルーティング部分を記述しています。第一引数にはアクセスを受け付けるURLのパス名、第二引数にはエンドポイントとなる実際に処理を行うクラスを指定します。

api.add_route("/api/todo", AddGetTodo)

HTTPメソッドによって処理を受け付けるメソッドを定義できる

AddGetTodoには、todoリストの新規作成とtodoリスト全体を取得する部分の処理を実装しています。

on_get メソッドではGETリクエストが来た際の処理を行い、on_post メソッドではPOST処理が来た際の処理を受け付けます。

他のHTTPメソッドであるPUT, DELETEでも、on_put, on_delete とすることで処理が可能になります。

class AddGetTodo:

   def on_get(self, req, resp):
       todos = get_todo_list()
       resp.media = {
           "status": True,
           "todos": todos
       }

   async def on_post(self, req, resp):

       @api.background.task
       def process_add_todo(name, text):
           time.sleep(3)
           add_todo(name, text)
       data = await req.media()
       name = data['name']
       text = data['text']
       process_add_todo(name, text)
       resp.media = {
           'status': True
       }

追加・更新・削除部分を非同期で処理する

responderの機能の一つである非同期部分を簡単に実装できる特徴があります。
例えば、動画や画像などといった容量のでかいファイルが送られてきた場合、DBへの保存や画像ファイルの保存処理などといったことで時間を取られ、リクエストを送ってきた側(クライアント)へのレスポンスに時間がかかってします。

そこで、DBへのアクセスや画像ファイルの保存処理などといった時間がかかることは、別プロセスでやらせてしまうことで、クライアントへのレスポンスを一旦速やかに返すことができます。

@api.background.taskのデコレーターがふられているメソッドは、非同期で実行されます。ここではprocess_add_todoメソッドにtodoリストのnameとtextを引数で渡すことで、resp.mediaで速やかにレスポンスが返されます。

ここではあえてtime.sleepと3秒間停止してから、add_todoメソッドを呼んで作成処理を実行しています。

async def on_post(self, req, resp):

    @api.background.task
    def process_add_todo(name, text):
        time.sleep(3)
        add_todo(name, text)

    data = await req.media()
    name = data['name']
    text = data['text']

    process_add_todo(name, text)
    resp.media = {
        'status': True
    }

ちなみにリクエストボディのデーター部分を取り出すには、必ずawait req.media()と記述する必用があり、メソッドには必ずasyncで非同期として受け取らなければ、data部分を取り出すことはできません。

実際に動作させる

以下のコマンドを実行して、先程実装したapp.pyを動かしましょう。

$ python app.py

api.run()でresponderを動かしており、引数portにポート番号をしていすることで、responderを受け付けたいポートをしていすることができます。

if __name__ == "__main__":
   port = 5000
   api.run(port=port)


todoリストの作成を行う

以下のcurlコマンドを実行して、todoリストを作成してみます。

todoリストのnameにはtodo_name, text部分にはtodo_textと入れます。

$ curl http://127.0.0.1:5000/api/todo -X POST -H "Content-Type: application/json" -d '{"name": "value", "text": "test"}'

以下のようなレスポンスが返ってくれば作成が成功しているはずです。ちなみにサーバー側ではレスポンスが返ってきて3秒後にDBへの保存処理が行われています。

{"status": true}

todoリスト全体の取得

$ curl http://127.0.0.1:5000/api/todo

以下のような結果が返ってくるはず

{"status": true, "todos": [{"id": 1, "name": "value", "text": "test", "created_at": "2018-12-07 02:18:27", "updated_at": "2018-12-07 02:18:27"}, {"id": 2, "name": "value", "text": "test", "created_at": "2018-12-07 04:19:27", "updated_at": "2018-12-07 04:19:27"}]}

IDを指定して、特定のtodoを取得

# curl http://127.0.0.1:5000/api/todo/{id}
$ curl http://127.0.0.1:5000/api/todo/1

todoリストが一つだけ取得される

{"status": true, "todo": {"id": 1, "name": "value", "text": "test", "created_at": "2018-12-07 02:18:27", "updated_at": "2018-12-07 02:18:27"}}

todoの更新

$ curl http://127.0.0.1:5000/api/todo/1 -X PUT -H "Content-Type: application/json" -d '{"name": "value", "text": "test"}'

{"status": true}

3秒後ぐらいして、もう一度todoを取得してみると、ちゃんと更新されているはず。

$ curl http://127.0.0.1:5000/api/todo/1

{"status": true, "todo": {"id": 1, "name": "value", "text": "test", "created_at": "2018-12-07 02:18:27", "updated_at": "2018-12-07 02:18:27"}}

todoの削除

$ curl http://127.0.0.1:5000/api/todo/1 -X DELETE

{"status": true}

3秒後ぐらいして、もう一度同じIDのtodoを取得してみると、todoの部分が空になって返ってくるはずです。

$ curl http://127.0.0.1:5000/api/todo/1
{"status": true, "todo": {}}


まとめ

以上で簡易的なtodoリストの作成のサンプルの解説をしました。
Responderを用いれば簡単に非同期処理などもかけるため、容量の大きいファイルをリクエストされた際に扱うには良いのではないかと思います。

実際のソースコードはGitHubで以下に上げました。ご参考になれば幸いです。

https://github.com/shimakaze-git/responder-todo

この記事が気に入ったらサポートをしてみませんか?