見出し画像

SnowflakeでPython非同期処理を実装してみた

分析屋の中田(ナカタ)です。
SnowflakeでPythonの非同期処理を試してみました。


非同期処理とは

複数の処理を、順番に進めていく逐次的な方法を「同期処理」と言います。
反対に、一部の処理を同時並行で進めていく方法を「非同期処理」と言います。
同期処理の方がイメージつきやすく実装も簡単です。

非同期処理の大きなメリットは
複数の処理を同時並行で進めたことでの時間短縮です。

例えば、5分かかる処理と3分かかる処理があったとします。
同期処理なら計8分かかる見積になりますが
非同期処理なら同時に処理が進むため5分で終わる見積になります。

よく料理で例えられますが、

電子レンジで食材をチンしている間に、別の食材をカットするのが非同期処理
電子レンジの温めが終わるまでじーっと待って、終わったら別の食材をカットするのが同期処理

というイメージです。

今回やること

SnowflakeのPythonワークシートにて
複数の処理を定義し、非同期処理として実行させます。
例として、洗濯機を回している間にお湯を沸かしてみそ汁を作る処理を書きます。
比較のため、洗濯機が回り終わってからお湯を沸かしてみそ汁を作ること(同期処理)も実装します。

また、コードの中ではsleep(10)として10秒停止、とさせていますが
10分停止ということで脳内変換してください。
sleep(600)で本当に10分停止させてもいいのですが、10分間ウェアハウスが起動し続けて課金時間が長くなってしまいます。

環境

Snowflakeのエディション:エンタープライズ版
クラウド:AWS(東京リージョン)

事前準備

Pythonワークシートを作成します。
ワークシート画面上部のタブに+ボタンがあります。
+ボタンをクリックして「Pythonワークシート」をクリックすれば
Pythonワークシートが1つ作られます。

非同期処理を実装してみた

①必要ライブラリのインポート

Pythonワークシートを作成した時点で、サンプルコードが書かれています。
サンプルコードを軸に、いろいろ追記していきます。

Snowflake上でPythonを記述するためのsnowparkがインポートされています。

import snowflake.snowpark as snowpark # Snowflake上でPythonを動かす
from snowflake.snowpark.functions import col # カラムを取り出す関数(mainで使用)

他にも以下を追加でインポートします。

import time # 時刻の取得
# ↓日本語をprint出力するためのおまじない
import io,sys
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
import asyncio # 非同期処理を実現する

非同期処理で必要なのは標準モジュールの asyncio(えーしんく あいおー)です。
他は今回のサンプルコードのためのおまけです。

②main関数

デフォルトで表示されています。
こちらはそのままにしておきます。

def main(session: snowpark.Session): 
    # Your code goes here, inside the "main" handler.
    tableName = 'information_schema.packages'
    dataframe = session.table(tableName).filter(col("language") == 'python')

    # Print a sample of the dataframe to standard output.
    dataframe.show()

    # Return value will appear in the Results tab.
    return dataframe

③洗濯機を回す

洗濯機を回す処理をする関数、sentakuを実装しています。
defの前にasyncをつけることで「この関数は中断・再開できるよ!」と宣言できます。
awaitは「この処理が終わらないと先には進まないよ!でも他の処理は並行できるよ!」です。
以下の例では
・print文で「[!]洗濯機を回しています...」と表示
・10秒停止(他の関数の処理を並行できる)
・print文で「[!]洗濯が終わりました」と表示
の順で処理が進みます。

async def sentaku():
    print("[!]洗濯機を回しています...")
    await asyncio.sleep(10)
    print("[!]洗濯が終わりました")

10分間で洗濯が回り終わったことになります。
お急ぎモードてきな洗濯をしたんだと思ってください。

④お湯を沸かす

async def boil_water():
    print("お湯を沸かします...")
    time.sleep(3)
    print("お湯が沸きました")

お湯を沸かす関数boil_waterを作りました。
先程のawait asyncio.sleepではなく、time.sleepを使っています。
使い分けですが、他の処理を並行させるのか?並行させずに完全に待たせるのか?で分けます。
今回はお湯が沸くまで3分間、おとなしく待ってもらおうとしているということです。

⑤みそ汁を作る

もちろん(?)インスタントみそ汁ですので、お湯を注いで1分で完成です。
まずはお湯を沸かす関数boil_waterを呼び出して、awaitで沸くまで待たせています。

async def miso_soup():
    await boil_water()
    print("みそ汁を作ります...")
    time.sleep(1)
    print("みそ汁ができました")

他に並行する作業がないのでtime.sleepとしています。
今回の例だとawait asyncio.sleepに置き換えても変化はありません。

⑥全体のフロー

async def yarukoto_all():
    await asyncio.gather(sentaku(),miso_soup())
    print("洗濯回してみそ汁を飲みました")

洗濯して、みそ汁をつくるという処理を走らせる関数です。
最後にprint文で「洗濯回してみそ汁を飲みました」と表示させて完了です。

⑦実行部分

begin_time = time.time() # 処理の開始時刻
asyncio.run(yarukoto_all()) # 非同期で処理実行
end_time = time.time() # 処理の終了時刻
print(f"家事の時間: {end_time - begin_time:.0f} 分")

処理全体を実行し、かかる時間を計測しています。

並行実行のコード全体

import snowflake.snowpark as snowpark # Snowflake上でPythonを動かす
from snowflake.snowpark.functions import col # カラムを取り出す関数(mainで使用)
import time # 時刻の取得
# 日本語をprint出力するためのおまじない
import io,sys
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
import asyncio # 非同期処理を実現する

def main(session: snowpark.Session): 
    # Your code goes here, inside the "main" handler.
    tableName = 'information_schema.packages'
    dataframe = session.table(tableName).filter(col("language") == 'python')

    # Print a sample of the dataframe to standard output.
    dataframe.show()

    # Return value will appear in the Results tab.
    return dataframe

async def sentaku():
    print("[!]洗濯機を回しています...")
    await asyncio.sleep(10)
    print("[!]洗濯が終わりました")

async def boil_water():
    print("お湯を沸かします...")
    time.sleep(3)
    print("お湯が沸きました")

async def miso_soup():
    await boil_water()
    print("みそ汁を作ります...")
    time.sleep(1)
    print("みそ汁ができました")

async def yarukoto_all():
    await asyncio.gather(sentaku(),miso_soup())
    print("洗濯回してみそ汁を飲みました")

begin_time = time.time() # 処理の開始時刻
asyncio.run(yarukoto_all()) # 非同期で処理実行
end_time = time.time() # 処理の終了時刻
print(f"家事の時間: {end_time - begin_time:.0f} 分")

結果は以下の通りです。

洗濯機が回っている10分間の待ち時間で、お湯を沸かしてみそ汁を作っています。

参考:逐次実行の場合

非同期処理を実装しなかったコードは以下の通りです。

import snowflake.snowpark as snowpark # Snowflake上でPythonを動かす
from snowflake.snowpark.functions import col # カラムを取り出す関数(mainで使用)
import time # 時刻の取得
# 日本語をprint出力するためのおまじない
import io,sys
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')

def main(session: snowpark.Session): 
    # Your code goes here, inside the "main" handler.
    tableName = 'information_schema.packages'
    dataframe = session.table(tableName).filter(col("language") == 'python')
    # Print a sample of the dataframe to standard output.
    dataframe.show()
    # Return value will appear in the Results tab.
    return dataframe

def sentaku():
    print("[!]洗濯機を回しています...")
    time.sleep(10)
    print("[!]洗濯が終わりました")

def boil_water():
    print("お湯を沸かします...")
    time.sleep(3)
    print("お湯が沸きました")

def miso_soup():
    boil_water()
    print("みそ汁を作ります...")
    time.sleep(1)
    print("みそ汁ができました")

def yarukoto_all():
    sentaku()
    miso_soup()
    print("洗濯回してみそ汁を飲みました")

begin_time = time.time() # 処理の開始時刻
yarukoto_all()
end_time = time.time() # 処理の終了時刻
print(f"家事の時間: {end_time - begin_time:.0f} 分")

結果は以下の通りです。

回る洗濯機をじっと監視して(10分)
洗濯機が回り終わってからお湯を沸かし(3分)
インスタントみそ汁を作り(1分)
合計で14分かかっています。

最後に

Snowflake上での実際の処理時間は、ウェアハウスの起動時間などがあるため少し膨れます。
Snowflake上ではタスクを実装することでDAGの実行ができますが
今回のように関数単位で非同期処理を実装することも可能です。

実行時間を短縮できる可能性がある反面、実行順序を把握しきれずにバグを生み出す可能性もあります。
処理速度がシビアに求められる場合は非同期処理を実装して、綿密にテストするという方針でよいかもしれません。



ここまでお読みいただき、ありがとうございました!
この記事が少しでも参考になりましたら「スキ」を押していただけると幸いです!

これまでの記事はこちら!

株式会社分析屋について

弊社が作成を行いました分析レポートを、鎌倉市観光協会様HPに掲載いただきました。

ホームページはこちら。

noteでの会社紹介記事はこちら。

【データ分析で日本を豊かに】
分析屋はシステム分野・ライフサイエンス分野・マーケティング分野の知見を生かし、多種多様な分野の企業様のデータ分析のご支援をさせていただいております。 「あなたの問題解決をする」をモットーに、お客様の抱える課題にあわせた解析・分析手法を用いて、問題解決へのお手伝いをいたします!
【マーケティング】
マーケティング戦略上の目的に向けて、各種のデータ統合及び加工ならびにPDCAサイクル運用全般を支援や高度なデータ分析技術により複雑な課題解決に向けての分析サービスを提供いたします。
【システム】
アプリケーション開発やデータベース構築、WEBサイト構築、運用保守業務などお客様の問題やご要望に沿ってご支援いたします。
【ライフサイエンス】
機械学習や各種アルゴリズムなどの解析アルゴリズム開発サービスを提供いたします。過去には医療系のバイタルデータを扱った解析が主でしたが、今後はそれらで培った経験・技術を工業など他の分野の企業様の問題解決にも役立てていく方針です。
【SES】
SESサービスも行っております。

この記事が参加している募集

やってみた