見出し画像

Pythonで非同期でタスクを実行して、モニタリングする環境をDockerで構築する


Pythonで非同期処理をやる場合はceleryを使うのが定番です。今回はceleryを試しに動作させるための環境をDockerを使って簡単に作る方法をご紹介いたします。

コード自体はQiitaにある参考資料のものをそのまま持ってきています。参考源は一番下に載せておきます。

今回はdocker-composeを使用して、動作に必要なコンテナを作っていきます。

※関係ないですが、画像は野菜のセロリを使用しています。画像はテキトウにに選びました・・・

使用するツール類

今回は以下の3つを使用します。それぞれ別なコンテナで動作させるため、本番の運用などでは、別々なサーバーで動作させることを想定しています。

・redis (キューを動作させるためのKVS)
・celery (Pythonのタスクキューサービス、別なプロセスで動作させる)
・flower (celery内にあるタスクを監視するためのツール、webで動作)

今回作成するもの

今回は至ってシンプルな物を作成します。キューに貯めるタスクは以下のような処理です。
単純に10秒待って標準出力をするための関数と、引数二つの足し算をする関数の二つだけです。これらを非同期で動作させます。

def run():
   time.sleep(10)
   print('処理 おわた')
   return 'おわったよ'

def calc(a, b):
   return a+b

簡単な図ではありますが、以下のような感じです。

(Task) => (Queue Redis) => (Worker Celery)

※一応ではありますが、図としては以下の記事にある画像の図が一番わかりやすいです。

非同期で動作させることの利点

上記の様な簡単な処理ではわざわざceleryを導入して、非同期処理をする必要はありませんが、CPUに負荷がかかってしまうような計算処理や、RDBへの永続処理などの一つ一つが重いような処理を行う必要性がある場合は、キューに処理であるタスクを貯めていきます。

後はキューであるRedisにタスクが貯まるのをワーカーであるCeleryが検知し、自動で負荷状況などを調整して、タスクを徐々に実行してくれます。

docker-compose.ymlを作成する

それでは実際にDocker上で動作させるため、docker-compose.ymlに設定を記述していきます。

以下のように4つのコンテナを作成します。docker-compose.ymlのファイル内容は以下になります。

version: '3.7'
services:
 python:
   build: .
   tty: true
   image: python
   container_name: python
   volumes:
     - ./:/usr/src/app
   environment:
     - CELERY_BROKER=redis://redis:6379/0
     - CELERY_BACKEND=redis://redis:6379/0
   depends_on:
     - redis

 celery:
   image: python
   tty: true
   container_name: celery
   volumes:
     - ./:/usr/src/app
     - ./logs:/usr/src/app/logs
   command: celery -A tasks worker --loglevel=info  --logfile=logs/celery.log
   environment:
     - CELERY_BROKER=redis://redis:6379/0
     - CELERY_BACKEND=redis://redis:6379/0
   depends_on:
     - python
     - redis

 redis:
   image: redis:5.0.3-alpine
   container_name: redis
   tty: true

 monitor:
   image: python
   tty: true
   container_name: monitor
   ports:
     - 5555:5555
   command:  flower -A tasks --port=5555 --broker=redis://redis:6379/0
   depends_on:
     - python
     - redis

それぞれのコンテナに名称を振ってあり、それぞれのコンテナの役割を以下にまとめました。

・python
キューにタスクを格納する処理を実行するコンテナ

・celery
KVSであるRedisのキューにタスクが入ったのを検知するワーカープロセスを動かすコンテナ。キューに貯まったタスクの実行も行う。

・redis
KVSであるRedisを動作させるコンテナ。Redis以外にもRabbitMQなどを使用してもかまいません。

・monitor
celeryのタスク監視を行うflowerを動作させるコンテナ

Pythonコンテナに必要なファイル類Dockefile

キューであるRedisに対してタスクの発行を行うコンテナのDockefileを記述していきます。他3つのコンテナも「image: python」を指定しているため、他3つのコンテナもDockefileで設定したコンテナが動作します。

# Dockerfile

FROM python:3.6.8-alpine

WORKDIR /usr/src/app

COPY ./requirements.txt /usr/src/app/requirements.txt

RUN pip install -r requirements.txt

COPY . /usr/src/app

Dockerfileの記述内に「RUN pip install -r requirements.txt」とあるように、必要なライブラリを記述するrequirements.txtを作成します。

# requirements.txt

celery==4.2.1
flower==0.9.2
redis==3.2.0
tornado>=4.2.0,<6.0.0

Celeryで動作させるタスク処理を実行するファイル

タスク処理を実行するためのtasks.pyとceleryの設定ファイルであるceleryconfig.pyを作成していきます。

# celeryconfig.py

# celeryを動かすための設定ファイル。
BROKER_URL = 'redis://localhost/0'

# CELERYD_CONCURRENCY=1なので、1こずつキューを捌いていく
# ここはCPU数に合わせていくのがよい
CELERYD_CONCURRENCY = 1
CELERY_RESULT_BACKEND = 'redis'
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_RESULT_BACKEND = "redis"
CELERYD_LOG_FILE = "./celeryd.log"

# CELERYD_LOG_LEVELをINFOにしておくと、
# タスクの標準出力もログ(celeryd.log)に書かれる
CELERYD_LOG_LEVEL = "INFO"

# ワーカーはtasks.pyを読み込み、
# 非同期処理させる関数を
# 含むスクリプト全てを指定
CELERY_IMPORTS = ("tasks", )

以下はcelery上で動作するtasks.pyです。デコレーターである@app.taskを付けることで、関数を読み出すとredisにタスクが格納され、celeryで動作するようになります。os.environ.getの環境変数はdocker-compose.ymlでそれぞれのコンテナで設定しています。

# tasks.py
import os
import time
import celery
import celeryconfig

CELERY_BROKER = os.environ.get('CELERY_BROKER')
CELERY_BACKEND = os.environ.get('CELERY_BACKEND')

app = celery.Celery(
  'tasks',
  broker=CELERY_BROKER,
  backend=CELERY_BACKEND
)
app.config_from_object(celeryconfig)

@app.task
def run():
   time.sleep(10)
   print('処理 おわた')
   return 'おわったよ'

@app.task
def calc(a, b):
   return a+b

次にtasks.pyを呼び出すためのmain.pyを作成します。.delay()と入れることで、非同期で処理を実行させます。
処理結果自体は.resultで取り出すことが可能です。

# main.py

import tasks
print('<first task>')
# ここでタスク起動 (runタスク)
worker = tasks.run.delay()

# 終わらぬなら終わるまで待とうホトトギス
while not worker.ready():
   pass
# 返り値をだす
print(worker.result)

print('<second task>')
# ここでタスク起動 (calcタスク)
worker = tasks.calc.delay(100, 200)
# 終わらぬなら終わるまで待とうホトトギス
while not worker.ready():
   pass
# 返り値をだす
print(worker.result)

ログを格納するためのディレクトリを作成

最後に処理結果のログを格納するためのディレクトリを以下のコマンドを実行して作成します。

$ mkdir logs

docker-compose.yml内のcommandのタグを見ればわかりますが、celeryとmonitorコンテナにはログの格納先のディレクトリを指定しています。

celery -A tasks worker --loglevel=info --logfile=logs/celery.log
flower -A tasks --port=5555 --broker=redis://redis:6379/0

ディレクトリ構成

treeコマンドで以下のように動作に必要なコンポーネント類が揃っていれば問題ありません。

$ tree
.
├── celeryconfig.py
├── docker-compose.yml
├── Dockerfile
├── logs
├── main.py
├── requirements.txt
└── tasks.py
1 directory, 6 files

実際に動作させる

必要なファイル類は作成したので、それでは実際に動作させてみましょう。以下のコマンドを実行してコンテナ類を動作させます。

最初にまずはイメージの作成などが行われるはずです。

$ docker-compose up -d

「docker-compose ps」コマンドを実行することで、現在立ち上がっているコンテナ類がわかります。以下のようになっていれば問題ありません。

$ docker-compose ps

 Name                Command               State           Ports
-------------------------------------------------------------------------
celery    celery -A tasks worker --l ...   Up
monitor   flower -A tasks --port=555 ...   Up      0.0.0.0:5555->5555/tcp
python    python3                          Up
redis     docker-entrypoint.sh redis ...   Up      6379/tcp

redisは6379ポートで動作しており、celeryの動作を監視するためのflowerは、5555ポートで動作していることがわかります。

実際に127.0.0.1:5555にアクセスして見ると、以下のような画面が表示されます。celeryが現在処理しているタスクなどのキューの様子がリアルタイムで見れます。

以下の4つの項目があります。

・Active - 動作中のタスク
・Processed - 処理し終わったタスク
・Failed - 失敗したタスク
・Succeeded - 成功したタスク

上記の画面を見ながら、タスクを実行させてみましょう。まずは以下のコマンドでpythonコンテナの中に入り、main.pyを実行してみます。

$ docker exec -it python /bin/sh

$ python main.py
<first task>
おわったよ
<second task>
300

flowerの画面に戻ってみると、「Active」と「Processed」がそれぞれ1が付いてタスクが動作中なのがわかります。

しばらくすると、以下のように「Succeeded」と「Processed」がそれぞれ2になって、タスクが処理されて成功したことがわかります。


実際にdockerを使用して、非同期処理を試しに実行させるサンプルを作成しました。ご参考になれば幸いです。

参考資料


この記事が気に入ったらサポートをしてみませんか?