【Python】Prefectで作るパイプライン
Prefectとは
Pythonでワークフロー・パイプラインを構築するためのライブラリです。
ライセンスは「Apache License 2.0」。
Prefectはクラウドサービス(Prefect Cloud)も提供していて、クラウドへのデプロイやジョブ実行に対応しています。 ただし、Prefect Cloudを業務で使う場合は有料ライセンスが必要になってきます。
本記事ではクラウド機能は使用せず、ローカルやオンプレミス環境での利用を想定しています。
Prefectの良いところ
Pythonで完結する
XMLによる定義や、(Airflowなどのように)ジョブの管理・実行のための独自環境が不要。
普通のPythonアプリケーションと同じように実行するだけです。
学習コストが低い
基本的には呼び元と呼び先のファンクションにデコレータを付けるだけでパイプライン化できます。
なので、通常のPythonバッチアプリケーションを開発するのとほとんど変わりません。
他の類似ライブラリ(Luigiなど)と比較して、独自の記法や制限がなく圧倒的に学習コストが低いと思います。
<サンプル>
from prefect import flow
from sample_task_1 import run_1
from sample_task_2 import run_2
# 呼び元(flow)
@flow
def sample_flow():
message = "Hello, "
message = run_1(message)
message = run_2(message)
print(f"Final message: {message}")
if __name__ == "__main__":
sample_flow()
from prefect import task
# 呼び先(task)
@task
def run_1(message: str):
return message + "World"
@task
def run_2(message: str):
return message + "!"
<実行結果>
<Web UI(Prefect Server)上での確認画面>
便利な機能
logging
Prefect UIからジョブの実行ログや可視化されたフローを確認できます。
「log_prints=True」を設定するとprint文の出力結果もロギングできます。
@flow(log_prints=True)
def my_flow():
print("we're ==logging== print statements from a flow")
my_task()
retries
taskにパラメータを指定するだけでリトライ処理をパイプラインに組み込めるので、非同期実行やリトライの制御を実装する必要がなくなります。
@task(retries=2, retry_delay_seconds=5)
def some_task():
# エラーの可能性がある処理
caching
個人的に最も便利だと思うのがこの機能です。
Prefectでは各タスクの実行結果はDB(SQLite)に自動的に保存されます。
例えばA→B→Cというタスク順のパイプラインにおいて以下のような挙動となります。
A・Bは完了しCだけ失敗
↓
再実行
↓
A・Bの実行はスキップされてCのみ実行される
特に開発中はパイプラインの途中にある特定タスクを何度も修正&再実行することがあるので非常に役立ちます。
なお、タスクごとにキャッシュの有効無効を設定可能です(refresh_cache=True|False)。
from datetime import timedelta
from prefect import flow, task
from prefect.tasks import task_input_hash
@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(days=1), refresh_cache=False)
def hello_task(name_input):
# Doing some work
print("Saying hello")
return "hello " + name_input
Prefect Server
Web UIによる実行履歴やログの確認が可能です。
以下のコマンドでPrefect Serverを起動し、「Prefect Server」にアクセスします。
% prefect server start
___ ___ ___ ___ ___ ___ _____
| _ \ _ \ __| __| __/ __|_ _|
| _/ / _|| _|| _| (__ | |
|_| |_|_\___|_| |___\___| |_|
Configure Prefect to communicate with the server with:
prefect config set PREFECT_API_URL=http://127.0.0.1:4200/api
View the API reference documentation at http://127.0.0.1:4200/docs
Check out the dashboard at http://127.0.0.1:4200
まとめ
以上、Prefectの紹介でした。
私は業務での機械学習モデルの学習・デプロイ用のパイプライン開発にPrefectを使用しており、とても便利だと感じています。
他にも色々な機能があるので詳細はドキュメントをご参照ください。
この記事が気に入ったらサポートをしてみませんか?