見出し画像

300行の “秘伝のタレ” SQLをデータ変換支援ツール「dbt」できれいにする

こんにちは。データエンジニアの諏訪遼 (@r10swa) です。
グロービスの GDP データサイエンスチームにおいて、データパイプラインの構築・運用を行っています。

はじめに

あなたの SQL が “秘伝のタレ” になるのを防ぐには

皆さんは、事業の KPI をモニタリングしたいとき、必要なデータをどのように用意していますか。

「何百行もある1つの SQL ファイルを “秘伝のタレ” のように使っている...」
「処理のロジックが複雑に絡み合っていて解読できない…」
といったことはないでしょうか。

/* queries/lengthy_query_1.sql */

WITH
    t1 AS (
        SELECT
            id
            , some_attribute_1
            , some_attribute_2
            , ...
        FROM  /* 前段のクエリのデスティネーションをハードコード */
            `some_upstream_table`
        WHERE  /* しきい値をハードコード */
            some_attribute_1 == 42
        GROUP BY
            ...
    )
    , t2 AS (
        ...
    )
    , t3 AS (
        ...
    )
    , ...  /* 積み重なった WITH 句 */
    , t10 AS (
        SELECT ...
        FROM  /* ネストされたサブクエリ */
            (
                SELECT ...
                FROM  /* 絡みあった JOIN */
                    (SELECT ... FROM t7 WHERE ... ) AS t7_2
                    LEFT JOIN (SELECT ... FROM t9 WHERE ... ) AS t9_2
                        ON ...
                WHERE ...
            ) AS t10_1
        WHERE ...
        GROUP BY ...
    )
SELECT
    ...
FROM
    t10
GROUP BY
    ...

▲ 長大なクエリ

筆者にも苦い経験がありますが、そのような状況ではコードが「レガシー化」してしまいます。
なお、ここでいうレガシーとは、書籍『レガシーコードからの脱却』にならって「修正、拡張、作業が非常に難しい」ことを指します。

一番シンプルに説明するなら、レガシーコードとは理由はなんであれ修正、拡張、作業が非常に難しいコードのことだ。

David Scott Bernstein, 『レガシーコードからの脱却』, はじめに

SQL は、多様な処理内容を簡単に記述できる優れた道具です。
しかしこのことは、「不必要な複雑さ」が容易に忍び込み、成果物が扱いづらくなる危険と隣合わせでもあります。

レガシー化を避けるためには、どうすればよいのでしょうか。
私たちは、「高品質なコード」を生み出すソフトウェア開発上のプラクティスが、そのヒントになると考えています。

近年、dbt というツールの登場により、データ処理の領域でもそのような「高品質なコード」を生み出すプラクティスを適用しやすくなりました。 dbt のいくつかの機能を活用すれば、書きやすく、読みやすく、メンテナンスしやすい形でデータ処理を記述できます。

この記事では、グロービスの GDP データサイエンスチームで KPI モニタリングを行った事例を題材に、dbt による「脱・レガシー化」を目指したデータ処理の一例をご紹介します。
SQL を頻繁に利用するデータエンジニア・データサイエンティストの方々を主な読者として想定しています。

なおこの記事では、「コード」という語で「SQL クエリを含む、データ処理のためのテキスト形式の成果物全体」を指すことにします。

dbt とは何か

dbt は、「DWH 製品におけるデータ処理の記述」を支援するオープンソースのコマンドラインツールです。
その名称は “data build tool” の頭文字に由来します。

データパイプライン全体における位置づけとしては 「ELT の T (変換)」 を担う存在といえます。
中でも SQL によるデータ処理に特化しており、Apache Airflow, Prefect などの汎用的ワークフロー管理ツールと 組み合わせて使う ことが想定されています。

dbt の使い方はシンプルです。
まず初期化のための “dbt init” コマンドを実行すると、「dbt プロジェクト」と呼ばれるディレクトリが生成されます。
その中に実行したい SQL ファイルを作成・配置し、DWH に接続するための認証情報などを与えます。

あとは、CLI で “dbt run” コマンドを実行するだけです。
これにより、内部的に次のことを行ってくれます:

  1. コンパイル: ユーザーが書いた SQL を実行可能な SQL へ変換します

  2. 実行: コンパイルされた SQL を DWH 上で実行します

より詳しい利用方法は 公式ドキュメント にまとまっています。
すでに BigQuery, Redshift, Snowflake などの DWH 上で SQL によるデータ処理を行っている方であれば、dbt の学習コストはさほど高くありませんので、ぜひ試してみてください。

なお正確には、「dbt」と呼ばれるものには次の2つがあります:

  • 「dbt Core」: dbt の主要部分です。オープンソースで提供されるコマンドラインツールです。

  • 「dbt Cloud」: dbt Core がマネージドサービス化されたものです。

ここでは「dbt」という言葉で、前者の「dbt Core」を指すものとします。

私たちが取り組んだこと

「KPI モニタリング」の概要

まず、私たちが取り組んだ「KPI モニタリング環境構築」プロジェクトについて、概要をご紹介します。

このプロジェクトは、「グロービス学び放題」事業における個人のお客様のリテンション = 利用定着度の改善を目的としています。
データサイエンスチームではこれを支援するため、次の項目を実施しました。

  1. リテンションを計測する指標の定義

  2. 1の指標に対するモニタリング手段の提供

1では、「ユーザー登録日別の、登録後 N 日後離脱率」という指標を選定しました。
このようなディメンション・計算条件を持つ指標を求めるクエリは、一般に多くのステップにわたります。

2では、データサイエンスチームにて運用している GCP 上のデータ基盤を利用し、日次更新のデータマート・ダッシュボードを社内に提供することとしました。
具体的には下図に示すように、次のコンポーネントを組み合わせて実現しました:

  • Cloud Composer

  • Google Kubernetes Engine

  • BigQuery

  • BI ツール (今回は Data Portal)

▲ dbt を利用するためのサービス構成

Cloud Composer では、DAG と呼ばれる Apache Airflow のワークフロー定義ファイルを定期実行します。その DAG には、Google Kubernetes Engine への API リクエストを行うタスクが含まれます。
リクエストを受けた Kubernetes クラスタでは dbt プロジェクトが実行されます。これにより、dbt プロジェクトの記述に従って BigQuery 上でクエリが実行され、データマートのテーブルが出力されます。

なお、データ基盤の全体像については こちらの記事 をお読みいただければと思います。

これまでの何が問題だったか

『レガシーコードからの脱却』によると、コードがレガシー化する要因の一つは、“CLEAN” でないコードを生んでしまうことにあります。
“CLEAN” とは、高品質なコードの持つ5つの性質「凝集性・疎結合・カプセル化・断定的・非冗長」を指します。

Cohesive (凝集性)
Loosely Coupled (疎結合)
Encapsulated (カプセル化)
Assertive (断定的)
Nonredundant (非冗長)

David Scott Bernstein, 『レガシーコードからの脱却』, 第9章

通常、これらの性質はソフトウェア開発の文脈で論じられるものであり、そのままでは SQL によるデータ処理ロジックの記述に当てはめにくい側面もあります。
他方、参考にできる要素も少なからずあると思われるため、以下、チームで従来作成してきたコードがどのように “CLEAN” でなかったかを振り返ります。

従来の記述方法

従来は、先に示したようなクエリとあわせ、次のような Airflow DAG ファイルを使って実行するケースがありました。

# run_queries.py

...

# タスクの定義、クエリへの参照

task_1 = BigQueryInsertJobOperator(
    task_id='run_query_1',
    configuration={
        "query": {
            "query": "{% include 'queries/lengthy_query_1.sql' %}",
            "destinationTable": ...,
        },
    },
    dag=...,
)

task_2 = BigQueryInsertJobOperator(
    task_id='run_query_2',
    configuration={
        "query": {
            "query": "{% include 'queries/lengthy_query_2.sql' %}",
            "destinationTable": ...,
        },
    },
    dag=...,
)

task_3 = BigQueryInsertJobOperator(
    task_id='run_query_3',
    configuration={
        "query": {
            "query": "{% include 'queries/lengthy_query_3.sql' %}",
            "destinationTable": ...,
        },
    },
    dag=...,
)

...

# 依存関係の定義

task_1 >> task_2 >> task_3 >> ...

...

▲ 長大なクエリ群を参照する DAG

以下、これらの何が問題なのか、具体的に見ていきましょう。

問題1. ワークフローに対してデータ処理の詳細がカプセル化されていない

上記コードでは、データ処理のロジックは、DAG ファイル、および DAG ファイルから参照する SQL ファイルによって表現されています。
各 DAG ファイルでは、クエリ実行のためのタスクを複数定義すると同時に、それらのタスク間の依存性を明示していました。

これは、ワークフローの側にとって、処理の中身に関する詳細が不必要に「見えてしまっている」ことを意味します。

さらに、このようにクエリファイル以外の場所でタスク間の依存関係を明示しなければならないことから、「クエリの妥当な実行順序」を人が解釈する手間も必要になります。
「クエリの変更に伴って依存関係も変更する必要があるのに、誤ってしまう・漏れてしまう」といった人為的なミスのリスクもあります。

問題2. クエリが低凝集である

各タスクから参照されるクエリファイルの中には、1件あたり数百行以上にわたるものがありました。

つまりこれは、一つのクエリが多くの処理を担い、意味のあるまとまりに分割されていない「凝集度の低い」状態といえ、多くの変更に対してもろくなってしまっています。

また可読性も低く、チームでのコミュニケーションに対するハードルや再利用のしにくさがしばしば課題となります。

問題3. データ処理のポリシーに対して実装がカプセル化されていない

データ処理ロジックの中に、ビジネス要件あるいは「ポリシー」への依存度が高く、したがって変更可能性の高い要素がしばしばハードコードされていました。例えば「WHERE 句の条件式のしきい値」がこれに該当します。

「何を計算するか」という目的レベルの記述と、「どう計算するか」という手段レベルの記述が同居してしまっている状況です。

これらが混在すると、前者だけがビジネス上の要請から変更されても、クエリファイル全体を修正しなければならない可能性が高くなります。


以上、従来の方法がコード品質の低下を招きやすいことを確認しました。

dbt をどのように利用したか

今回のプロジェクトでコード品質の低下を避けるため、私たちが dbt を利用して行ったことをご紹介します。

カギになるのは次の2つの機能です:

  1. 依存性グラフの生成機能
    dbt は、複数のクエリファイルを扱うことができます。
    そしてクエリ間に依存関係が存在する場合、プロジェクトのコンパイル時に「依存性グラフ」を生成してくれます。

  2. テンプレーティング機能
    dbt には、Jinja2 ライブラリによるテンプレーティング機能を利用した「変数」「マクロ」などの 仕組み が備わっています。

指標を計算するクエリを dbt プロジェクト上で記述しさえすれば、これらの機能を簡単に利用できます。
結果、次のことを自然な形で実現できました。

1. ワークフローとデータ処理ロジックの分離

クエリ間の依存関係については、dbt がすべて面倒をみてくれるため、DAG ファイル中に記述する必要がなくなります。
そこで、DAG ファイルには、dbt の実行を行う API エンドポイントのキック、および dbt プロジェクト外部の依存関係のみを記述することにしました。

これにより、従来は DAG ファイル中に顔を出していたデータ処理の詳細が、DAG ファイルから隠蔽されています。

# run_dbt_project_foo.py

...

API_ENDPOINT_URL_FOR_PROJECT_FOO = "..."

def request_dbt_foo() -> str:
    r = requests.get(API_ENDPOINT_URL_FOR_PROJECT_FOO)
    if r.status_code != requests.codes.ok:
        r.raise_for_status()
    return 'success'

task = PythonOperator(
    task_id='run_dbt_foo',
    python_callable=request_dbt_foo,
    dag=...,
)

...

▲ Airflow DAG
(dbt のための API エンドポイントを呼び出すタスクの記述)

2. クエリファイルの分割

データ処理を構成する各ステップを、それぞれ小さな責務を持つクエリファイルとして切り出しました。

各クエリは、前段のステップにあたるクエリを参照します。
これまで WITH 句として書かれていた部分は、基本的にすべてこの方法で分割できます。

/* models/visited_on.sql */
/* 訪問日を取得する */

...

SELECT
    user_id  /* ユーザー ID */
    , first_contract_started_on  /* 契約開始日 */
    , visited_on  /* 訪問日 */
FROM
    ...

...

▲ 前段のクエリ

/* models/is_active_in_1wk.sql */
/* アクティブか否かを判定する */

WITH
    _visited_on AS (
        SELECT * FROM {{ ref('visited_on') }}
    )
SELECT 
    user_id
    , MAX ( /* 参考: 契約開始日から数えて2週目 (8-14日目) にユーザーが訪問したかを判定 */
        CASE
            WHEN
                DATE_DIFF(visited_on, first_contract_started_on, DAY)
                BETWEEN ({{ var("n_days_in_week") }})
                    AND ({{ var("n_days_in_week") }} * 2 - 1)
            THEN 1 ELSE 0 
        END
    ) is_active_in_1wk
FROM 
    _visited_on
GROUP BY 
    user_id

▲ 後段のクエリ

ここでのポイントは、後段のクエリでは、 ref() 関数を利用して前段のクエリの結果を参照しているところです。

3. ポリシーと実装の分離

指標の算出に用いるしきい値など、具体的なビジネス要件への依存度が高い要素をクエリから切り出しました。

/* models/paid_users.sql */

SELECT
    user_id
    , first_contract_started_on
FROM
    `<upstream_data_source>`
WHERE
    is_paid
    AND (first_contract_started_on >= '{{ var("plan_starts_on_min") }}')

▲ 変数を含むクエリ

ここで、変数 “plan_starts_on_min” は、「計算対象とするユーザーの集合に関する契約開始日」のしきい値です。 このしきい値はロジックのその他の部分から分離できます。
この変数を参照するために、var() 関数 を使っています。

なお、 {{ … }} の部分は Jinja2 で expression と呼ばれるもので、dbt のコンパイル時に実際の値に置き換えられます。
実は前述の ref() 関数の記述も、この仕組みを利用したものです。

変数の具体的な値は、次のように別ファイルで定義しています。

# dbt_project.yml

...

vars:
    plan_starts_on_min: '2022-01-01'

...

▲ 変数を定義しているファイル

dbt で実現できたこと

以上のように、私たちは dbt の機能を利用して「ワークフローとデータ処理の詳細の分離」「処理自体の分割」「ポリシーと実装の分離」を行いました。

これにより、次のような意味で多少 “CLEAN” なコードをアウトプットできました:

  • ワークフロー DAG に対し、データ処理の詳細がカプセル化されている

  • データ処理ロジックに凝集性がある

  • データ処理のポリシーに対し、実装の詳細がカプセル化されている

“CLEAN” を追求する余地はまだありますが、これだけでもコードの扱いやすさが大きく改善されるのではないかと思います。

実業務への導入に向けて:
データ処理「以前」に重要なこと

ここまでは、クエリの品質向上という観点から dbt の魅力をお伝えしてきました。
ただ、dbt の活用を業務上の成果につなげるためには、データ処理「以前」の領域でも考慮すべきことが少なくありません。
この記事の最後に、そうしたポイントを挙げたいと思います。

要件のプランニング

当然ながら、データ処理は所期のビジネスゴールの達成を目指して行うものです。
したがって、ゴールの達成可能性や成果を高めるには、次のような項目について、丁寧なプランニングを行うことが重要です:

  • ビジネス要件:

    • 指標の定義

    • アウトプットの定義:

      • 可視化アプローチ

      • モニタリングの要件

  • データ処理上の要件:

    • データソース

    • データフロー

開発チームの生産性と文化のための活動

dbt を利用したデータ処理がワークするためには、開発生産性を高めるプラクティスを組み合わせることが必要になります。

GitHub などによるソースコード管理やレビューの体制づくりはその代表例です。また、“CLEAN” の観点では、適切な命名やコーディングスタイルへの配慮も欠かせません。

さらに、それらを支える健全なチームワークや DevOps の文化も、成果のために大きな役割を果たすでしょう。

データマネジメントの活動

今回着目したデータ処理にとどまらない、上流・下流を含む全工程にわたる高品質なデータ運用においては、いわゆるデータマネジメントに関する取り組みが重要です。

具体的には、「データに対するテストの整備」や「メタデータ管理」などが含まれます。

ここでは詳細に触れることができませんでしたが、dbt にはこれらをサポートする機能もあります。

おわりに

この記事では、私たちのチーム取り組んだ KPI モニタリングの事例を題材に、dbt の利用が、データ処理コードのレガシー化を防ぐ上で有効であることをお伝えしました。
あわせて、dbt の活用と同時に考慮すべき視点をご紹介しました。

皆さんの日々のデータ活用において、この記事が何かの参考になりましたら幸いです。
お読みいただきありがとうございました。



私たちのチームでは、質の高いデータパイプラインや機械学習システムの提供を通じ、経営教育サービスの進化をともに牽引していただける方を募集しています。
ご興味をお持ちの方は、下記ページより詳細をご覧ください。
https://recruiting-tech-globis.wraptas.site/


この記事が気に入ったらサポートをしてみませんか?