見出し画像

自己流 Vertex AI Pipelines 開発プラクティス

こんにちは、カウシェで機械学習エンジニアをしている tatsuya (白川達也)です。最近まで取り組んでいたレコメンド機能の実装を通じて、Vertex AI Pipelines のカウシェにおけるプラクティスを自分なりに固めてみたので、公開してみたいと思います。

今回紹介するプラクティスのなかには実は Bad Practice なものもあるかもしれません。より良い方法をご存じの方は、直接教えていただいたり、Twitter などで共有・公開していただけるととても嬉しいです。この記事は半分それを期待して書いています。プラクティスはどんどん共有・公開して、大事な部分にフォーカスできるようにしましょう!

なお、Vertex AI Pipelines は Kubeflow Pipelines を GCP 上で実行するサービスなので、本当は Vertex AI Pipelines と Kubeflow Pipelines をちゃんと区別した方が正確なのですが、めんどうなので 本記事では Kuberflow Pipelines を内包するフレームワークとして、単に Vertex AI Pipelines と呼んでしまうことがあります。

以下に紹介する自己流 Practice の多くは、いずれもローカル環境(これもクラウド上のインスタンスでもなんでも良いのですが、面倒なのでローカル環境と呼びます)での開発と Vertex AI Pipelines での開発の差分をなくすことを目的としたものです。ローカルで開発・検証したものが可能な限り差分なく Vertex AI Pipelines 上で実行できるようになっていると良いなと考えています。

Practice 1. コンポーネントは YAML ベースでつくり、コマンドライン実行できるようにする

Vertex AI PIpelines のコンポーネント(処理 DAG = パイプラインを構成するステップ)には、

  • Python 関数ベース … Pythonの関数として実装する

  • YAML ベース … YAML形式の定義ファイルを作成しロードする

という 2 種類の作り方がありますが、プロダクトとして実装する際は、必ず YAML ベースで作るようにしています。

Python 関数ベースで作る場合、その関数は standalone でなければならないという制約があります。

この standalone の意味は上記の公式ドキュメントに記載がありますが、

  • 関数外で定義されたコードを一切使用しない

  • パッケージの import も関数内で行う

  • ヘルパー関数も関数内で定義する

ということを指します。つまり、こんな感じになります。

from kfp.v2.dsl import component


@component
def main(input_path: str, output_path: str) -> None:
    import numpy as np
    import pandas as pd

   
    def my_func(...):
        ...

    ...
    data = load_data(input_path)
    ...
    my_func(...)
    ...
    save_result(result, output_path)

ちょっと試してみるくらいであればよいのですが、それなりの規模の開発をしようとするとこの standalone な関数でなければならないというのは物理的には大きな制約となり、精神的には大きな苦痛となるため、コンポーネントは Python 関数ベースではつくらないことにしています。

かわりに、下記のような YAML 形式のファイルを書きます。

# component.yaml

name: get_purchase_data
description: get purchase data from DWH
inputs:
    - name: project_id
      type: str
      description: GCP project id
outputs:
    - name: output_file
      type: Data
      description: output path to the purchase data
implementation:
    container:
      image: <<image-uri>>
      command: [python, main.py]
      args: [
        --project_id, {inputValue: project_id},
        --output_file, {outputPath: output_file}
      ]

そして、下記のように読み込みます。

import kfp.components import load_component_from_file

@dsl.pipeline(
    name="...",
    description="...",
    pipeline_root="...",
)
def pipeline() -> None:
    ...
    get_purchase_data_op = load_component_from_file("component.yaml")
    ...

YAML ベースで実装することの恩恵は、コンポーネントの処理をコマンドラインのプログラムとして実行できることです。実際、上の例で言えば、

$ python main.py --project_id <<project-id>> --output_file <<output-file>>

というコマンドで main.py を実行できなければならず、逆にこのコマンドを発行すれば(実行に必要なデータや適切なパーミッションなどがあれば)コンポーネントと同等の処理がローカルでも実行できるはずです。機械学習のロジックは試行錯誤しながらつくることが多いので、ローカルでコマンド実行できると非常に取りまわしが良いです。ローカルでの開発から Vertex AI Pipelines へ移行するのもスムーズになります。

これに関連して、下記のプラクティスも心がけています

Practice 2. コンポーネントは単体でテストできるようにつくり、可能な限りモジュラーにする

コンポーネントはなるべく常識的な範囲内で単機能にするようにします。とくに、ネットワーク通信を介してデータやモデルを読み書きするような処理はそれ単体でコンポーネントとします。

たとえば BigQuery に特定のクエリーを発行してデータを取得したり、GCS にデータやモデルをアップロードしたりする処理は汎用性が高く、コンポーネントとして切り出しておくとモジュール的に再利用することができます。また、ネットワーク通信が発生する部分を切り離すことで、それ以外のロジックを担うコンポーネントがネットワーク通信(したがって外部のクラウドサービスなど)に依存しなくなりオフラインで実行できるようになるのでテストや検証がしやすくなります(その部分は普通の CLI プログラムを開発するのと同じように開発できるようになります)。

Practice 3. 汎用的な処理はモジュール化し、複数のコンポーネントで参照しあうようにする

それなりに複雑なパイプラインを複数作っていると、複数のコンポーネント内で同じ処理をさせたくなることがあります。その部分をコンポーネントとして切り出せれば良いのですが、難しい場合もあります。たとえば、機械学習であれば、学習時の前処理方法と予測時の前処理方法は Data Augmentation などをする場合を除き基本的には一致させる必要がありますが、こういった部分をコンポーネントとして切り出すのは現実的には厳しいことがあります。

そのため、これらをコードレベルで一致させるため、複数のコンポーネント間で同一のコードを共有する構成にしたくなります。

自分の場合、これを実現させるために下記のようなフォルダ構成にしています(説明のためコンポーネントの粒度はだいぶ粗くし、コンポーネント部分だけ抽出しています)。

src
 └── components
      ├── get_data  // データ取得するコンポーネント
      ├── train     // モデルを学習させるコンポーネント
      ├── predict   // 予測を行うコンポーネント
      └── utils     // 共通モジュール

各種コンポーネントの実装(= Python スクリプト)は必要であれば src/components/utils 配下の共通モジュールを参照するようにし、コンポーネントを実行させる Docker イメージをビルドするときには src/components/{{コンポーネント名}} と src/compoenents/utils をImage へ COPY するようにしています。これによりコンテナ内でも共通モジュール(utils)の参照が崩れず、ローカルでの開発との一貫性が保たれます。

// Dockerfile
...
WORKDIR /workspace
...
COPY src/components/{{コンポーネント名}} /workspace/src/components/{{コンポーネント名}}
COPY src/components/utils /workspace/src/components/utils
...

要はローカルで開発しているときに参照している自作モジュールごと構成を変えずにイメージへ COPY すればよいだけなので、ディレクトリ構成は何でも良いと思いますが、どこに依存しているのかをいちいち判断するのがめんどくさかったので、依存するものはすべて utils に放り込む構成にしてサボっています。

Practice 4. YAMLをテンプレート化し、dev/stg/prod などの値をロード時に埋める

プロダクト開発をしていると dev/stg/prod などに環境を分けて順次デプロイ・検証していくことが多いと思います。この場合、dev/stg/prod では極力同一のコードを動かしたいところです。

YAML ベースでコンポーネントを開発する場合、以下のようにして、YAML 上で環境依存の部分をテンプレート化しておきます。

# component.yaml

name: get_purchase_data
description: get purchase data from DWH
inputs:
    - name: project_id
      type: str
      description: GCP project id
outputs:
    - name: output_file
      type: Data
      description: output path to the purchase data
implementation:
    container:
      image: asia-northeast1-docker.pkg.dev/{PROJECT_ID}/path/to/image:{TAG}
      command: [python, main.py]
      args: [
        --project_id, {inputValue: project_id},
        --output_file, {outputPath: output_file}
      ]

上記の例では、image の URI において、PROJECT_ID と TAG をテンプレートにしています。

こうしておいてからコンポーネントの読み込み時に下記のような関数でコンポーネントをロードすることで、テンプレート部分に環境変数経由で値を埋めこむようにしています。

# src/pipelines/utils/components.py
import os

from kfp import components
from kfp.dsl import ContainerOp


# これを kfp.components.load_component_from_file の代わりに使う
def load_component_from_file(yml_path: str) -> ContainerOp:
    op = components.load_component_from_file(yml_path)
    template = op.component_spec.implementation.container.image
    op.component_spec.implementation.container.image = template.format(
        PROJECT_ID=os.environ["PROJECT_ID"],
        TAG=os.environ["TAG"],
    )
    return op

Practice 5. Compile して得た .json のキャッシュ設定を無効にしておく

これはカウシェ特有かもしれないですが、同じ問題に悩む方がいるかも知れないので記載しておきます。

現行の Vertex AI Pipelines (Kubeflow Pipelines)で

from kfp.v2 import dsl


@dsl.pipeline(
    name=...,
    description=...
    pipeline_root=...
)
def pipeline() -> None:
    ...


# package_path へ json ファイルが出力される
compile(
    pipeline_func=pipeline,
    package_path=...
)

などとして得た JSON ファイルでは、コンポーネントのキャッシュがデフォルトで有効化されています。

しかし、現在時刻をもとに最新のデータを取得するコンポーネントなどの場合、キャッシュはデフォルトでは無効にしておき、必要な場合にのみ有効化したくなります。

Python の SDK から Vertex AI Pipelines へパイプライン実行を submit する場合には、enable_caching を False にすることで、動的にキャッシュを切り替えることができます。そのため、Python SDK で submit をする限り、デフォルトでキャッシュを無効化する処理は簡単に実現可能です。

from google.cloud import aiplatform


job = aiplatform.PipelineJob(
    display_name=...,
    template_path=...,
    project=...,
    location=...,
    enable_caching=False  # デフォルトで False
)

job.submit()

ところで、カウシェではバックエンドはほぼ全て Go 言語で書かれているため、Vertex AI Pipelines のパイプラインの submit も Go で書いた内製の Pipeline Runner により実行しています。Pipeline Runner は Cloud Scheduler でスケジュール実行させています。

この Pipeline Runner は Vertex AI の API を利用しています。構成をシンプルで見通し良くするため、この Pipeline Runner では渡された Pipeline Spec をそのまま忠実に実行させたいと考えています。そのため、キャッシュの設定も submit 時ではなくコンパイル時に確定させたいと考えました。

しかし、Vertex AI Pipelines が依存する Kubeflow Pipelines v1 では kubectl でキャッシュのコントロールをする仕組みになっており、Compile 時にキャッシュを無効化するうまい方法が見つかりませんでした(Kubeflow Pipelines v2 ではその仕組みが導入されています)。

なので、以下のように compile をラップして強制的にキャッシュを無効化させるようにしています。Vertex AI Pipelines の submit の実装をみても似たことをしていたのでよりよい標準的な方法は現状ないのだろうと想像しています。

import tempfile
from typing import Callable

from kfp.v2 import compiler


def compile(pipeline_func: Callable, package_path: str) -> None:
    with tempfile.NamedTemporaryFile(buffering=0, suffix=".json") as temp_fout:
        compiler.Compiler().compile(pipeline_func=pipeline_func, package_path=temp_fout.name)
        with open(temp_fout.name) as fin:
            contents = json.load(fin)
    # disable caching by default
    for task in contents["pipelineSpec"]["root"]["dag"]["tasks"].values():
        task["cachingOptions"]["enableCache"] = False
    with open(package_path, "w") as fout:
        json.dump(contents, fout, indent=2, sort_keys=True)

JSON を吐き出させてから強制的にキャッシュ設定を上書きしているのでお行儀が悪いですが、Kubeflow Pipelines v2 への過渡期なのでしょうがないと思うことにして、ぐっと目をつぶっています。

Practice 6. イメージのビルドやコンパイルはmake の wildcard で共通化する

コンポーネントは一つ一つイメージの build をしないといけないですし、 パイプラインは compile しないといけません。複雑になってくるとこれらの build や compile を一つ一つのコンポーネントやパイプラインに対して書くのはだるくなってくるので、下記のような Makefile を作ってサボっています(compile についてだけ書いています)。

# Makefile

...

comipile_%:
        python src/pipelines/${@:compile_%=%}/compile.py
...

make はサブコマンドを受け付けないので、

$ make compile <<コンパイルしたいパイプライン名>>

のような実行はできません。その代わり、wildcard(Makefile だと %)をつかった compile_% のようなターゲットをつくることで

$ make compile_my_pipeline

のようなコマンドを有効にできます。wildcard 部分は上記の例にもある通り、 {@:compile_%=%} などとすれば取り出すことができます。

この方法は下記の記事で知り、大変便利に使っています(とはいえ、 make を使うことが限界なのかも)。

Practice 7. ロジックは共有するが引数の異なる Component はcomponent.yaml を分ける

(2023-04-20追記)

ほぼ同じロジックですがあるケースでは入力引数を省略させたい場合などがあります。

たとえばレコメンド機能の実装をしていて商品データの Embeddings をつくるコンポーネントを作っていたのですが、この処理が非常に重たくなることが想定されました。一方、差分が発生する商品情報は全体にたいしてそれほど多くないので、前回計算しておいた商品 Embeddings から差分更新をしたくなります。これを実現するには、初回の計算時(Create)はベースの商品 Embeddings のパスを入力せずに 0 から全商品の Embeddings を構築し、それ以降(Update)はベースの商品 Embeddings のパスを入力して差分更新する、という実装にすると良さそうです。商品 Embeddings のパスをコンポーネントレベルで Nullable にすることも頑張ればできそうではありましたが、手元でいろいろ試した結果、そもそも component.yaml レベルで分けてしまう方が楽という結論になりました。

つまり、こんな感じになります。

# component_create.yaml
name: create_product_embeddings
description: create product embeddings
inputs:
    - name: input_products_path
      type: Data
      description: input path to product data
    ...
outputs:
    - name: output_product_embeddings_path
      type: Model
      description: output path to product embeddings
    ...
implementation:
    container:
      image: ...
      command: [python, ./src/components/create_product_embeddings/main.py]
      args: [
        --input_products_path, {inputPath: input_products_path},
        --input_model_path, {inputPath: input_model_path},
        --output_product_embeddings_path, {outputPath: output_product_embeddings_path},
      ]
# component_update.yaml
name: update_product_embeddings
description: update product embeddings
inputs:
    - name: input_products_path
      type: Data
      description: input path to product data
    - name: input_base_product_embeddings_path
      type: Model
      description: input path to base product embeddings path
    ...
outputs:
    - name: output_product_embeddings_path
      type: Model
      description: output path to product embeddings
    ...
implementation:
    container:
      image: ...
      command: [python, ./src/components/create_product_embeddings/main.py]
      args: [
        --input_products_path, {inputPath: input_products_path},
        --input_base_product_embeddings_path, {inputPath: input_base_product_embeddings_path},
        --input_model_path, {inputPath: input_model_path},
        --output_product_embeddings_path, {outputPath: output_product_embeddings_path},
      ]

ただし、ロジックの共有部分は共有モジュールとして分離し、Create と Update を完全に別コンポーネントとして実装する、という選択肢もあり得ると思います。個人的にはなるべく特定のロジックに依存した処理は共有モジュールとして配置したくないので、上記のように、component.yaml で切り分ける方法を取るようにしています。

Practice 8. パイプラインがコンパイルできるかどうかのテストをする

(2023-04-20追記)

ローカル環境中心にゴリゴリ開発しているとパイプラインの定義コードとの整合性チェックがおろそかになりがちです。そのため、CIなどでテストをする際はパイプラインがコンパイルできるかどうかのテストもするようにしています。普通にコンパイルするスクリプトを実行して、エラーがなく実行されるか確かめているだけです。

おわりに

なにか書き漏らしたことがあるような気がしますが、自己流の Vertex AI Pipelines 開発プラクティスを公開してみました。参考になれば幸いです。そしてベタープラクティスをお持ちの方、ぜひ教えてください。共有・公開していただけたらもっと嬉しいです。

カウシェにご興味を持った方がいらっしゃいましたら Twitter @s_tat1204 もしくは YOUTRUSTのカジュアル面談 などでお気軽にお声がけください。軽い気持ちでご応募頂いて話を聞いてみる、というのも大歓迎です!ぜひよろしくお願いします。


この記事が気に入ったらサポートをしてみませんか?