見出し画像

【Python】Prefectで作るパイプライン


Prefectとは

Prefect is a workflow orchestration tool empowering developers to build, observe, and react to data pipelines.
(Prefectはワークフロー・オーケストレーション・ツールで、開発者はデータ・パイプラインを構築、観察、対応することができます。)


Pythonでワークフロー・パイプラインを構築するためのライブラリです。
ライセンスは「Apache License 2.0」。

Prefectはクラウドサービス(Prefect Cloud)も提供していて、クラウドへのデプロイやジョブ実行に対応しています。 ただし、Prefect Cloudを業務で使う場合は有料ライセンスが必要になってきます。

本記事ではクラウド機能は使用せず、ローカルやオンプレミス環境での利用を想定しています。

Prefectの良いところ

Pythonで完結する

XMLによる定義や、(Airflowなどのように)ジョブの管理・実行のための独自環境が不要。
普通のPythonアプリケーションと同じように実行するだけです。

学習コストが低い

基本的には呼び元と呼び先のファンクションにデコレータを付けるだけでパイプライン化できます。
なので、通常のPythonバッチアプリケーションを開発するのとほとんど変わりません。
他の類似ライブラリ(Luigiなど)と比較して、独自の記法や制限がなく圧倒的に学習コストが低いと思います。

<サンプル>

from prefect import flow

from sample_task_1 import run_1
from sample_task_2 import run_2

# 呼び元(flow)
@flow
def sample_flow():
    message = "Hello, "
    message = run_1(message)
    message = run_2(message)
    print(f"Final message: {message}")


if __name__ == "__main__":
    sample_flow()
from prefect import task

# 呼び先(task)
@task
def run_1(message: str):
    return message + "World"

@task
def run_2(message: str):
    return message + "!"

<実行結果>

実行結果:「Final message: Hello, World!」

<Web UI(Prefect Server)上での確認画面>

ジョブの実行履歴

便利な機能

logging

Prefect UIからジョブの実行ログや可視化されたフローを確認できます。
「log_prints=True」を設定するとprint文の出力結果もロギングできます。

@flow(log_prints=True)
def my_flow():
    print("we're ==logging== print statements from a flow")
    my_task()

retries

taskにパラメータを指定するだけでリトライ処理をパイプラインに組み込めるので、非同期実行やリトライの制御を実装する必要がなくなります。

@task(retries=2, retry_delay_seconds=5)
def some_task():
	# エラーの可能性がある処理

caching

個人的に最も便利だと思うのがこの機能です。
Prefectでは各タスクの実行結果はDB(SQLite)に自動的に保存されます。

例えばA→B→Cというタスク順のパイプラインにおいて以下のような挙動となります。


A・Bは完了しCだけ失敗

再実行

A・Bの実行はスキップされてCのみ実行される

特に開発中はパイプラインの途中にある特定タスクを何度も修正&再実行することがあるので非常に役立ちます。
なお、タスクごとにキャッシュの有効無効を設定可能です(refresh_cache=True|False)。

from datetime import timedelta
from prefect import flow, task
from prefect.tasks import task_input_hash

@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(days=1), refresh_cache=False)
def hello_task(name_input):
    # Doing some work
    print("Saying hello")
    return "hello " + name_input

Prefect Server

Web UIによる実行履歴やログの確認が可能です。
以下のコマンドでPrefect Serverを起動し、「Prefect Server」にアクセスします。

% prefect server start

 ___ ___ ___ ___ ___ ___ _____
| _ \ _ \ __| __| __/ __|_   _|
|  _/   / _|| _|| _| (__  | |
|_| |_|_\___|_| |___\___| |_|

Configure Prefect to communicate with the server with:

    prefect config set PREFECT_API_URL=http://127.0.0.1:4200/api

View the API reference documentation at http://127.0.0.1:4200/docs

Check out the dashboard at http://127.0.0.1:4200

まとめ

以上、Prefectの紹介でした。
私は業務での機械学習モデルの学習・デプロイ用のパイプライン開発にPrefectを使用しており、とても便利だと感じています。
他にも色々な機能があるので詳細はドキュメントをご参照ください。


Header photo by Sigmund on Unsplash

この記事が気に入ったらサポートをしてみませんか?