見出し画像

Pyhonで非同期処理を考える。

まずは"asyncio"を使った処理を考えていきます。Colabを使って実行していこうと思って少しコードを試したときに、エラーが出た時の対処法です。

を参考に

import nest_asyncio

nest_asyncio.apply()

を付け加えればうまくエラーなしで実行されるようになります。

参考

 juputerで動かすときの注意点:Colabでも上記の対応以外にこの方法が使えます

「RuntimeError: asyncio.run() cannot be called from a running event loop」
なぜこんなエラーが発生するか?といえば、なんとjuputerそのものがイベントループで動いているらしい。
最初の方に述べたが、「イベントループの中でイベントループを発生させることはできない」からエラーになるのである。

解決策はいくつかあるが、簡単なのは以下のようにasyncio.run()の代わりにawait関数として実行させてやればいい

https://qiita.com/ku_a_i/items/129543dd7a05342d132f

"asyncio"の使い所

・何が便利?
非同期
(処理をしている間、同期して完了を待つのでなく、次の処理を実行するやり方)を実現します。
非同期により、全体の 処理速度を爆上げ できる場合がよくあります。
例えば、外部サービスリクエストや、ファイル・DBへの読み書きなど、I/O関連は時間がかかる割にCPUは空いてたりするので、そこが有効に活用されるようになるわけです。
多くのシステムはI/Oが大量にあったりするので圧倒的です。

・それってスレッドでもできるのでは?
もちろん可能ですが、コンセプトが異なり、スレッドに比べて 圧倒的にお手軽 です。

スレッドと何が違うのかというと、主な違いは、スレッドは複数の処理が同時に走るということです。処理の裏で別の処理が同時に走るため、値の読み/書きでは必ずロックやスレッドセーフを考慮しなければなりません。

一方で asyncio は処理の裏で別の処理が走ることはあり得ません。(I/Oが走ってることはあり得ますけど。)
すべての処理は直列に走ります。走ってる処理が await になると、待ち行列に並んでいる次の処理が走り始めます。

これが最高に良いのです。ロックもスレッドセーフも考慮不要なのですから。
なぜなら、裏で別の処理が走ることはないわけですから、1つの処理を実行している間は(await するまでは)必ずその処理だけが全てを占有できるわけです。占有できるのでロックやスレッドセーフの考慮は不要となり、ロック/スレッドセーフの高コストな処理がない点も高速化に寄与します。それ以上にコードがシンプルになるのでメンテナンスが楽になります。

では逆に、スレッドが有利になる場合はなんでしょうか。複数のCPUがあり、それを各スレッドに割り当てた時ですかね。(プロセスと呼ぶべきか。)
ただ、ロックやスレッドセーフはそれなりにコストもかかるので、本当にCPUを複数使うことで効果が出るかは要件次第です。
デバッグもとても大変なものになりますので、まずは asyncio から検討するのが良いかと思います。

その他、上記サイトでは具体的な使い方として以下紹介されています。

まず基本的なところですが

import asyncio

async def main():
  print('Hello ...')
  await asyncio.sleep(1)
  print('... World!')

asyncio.run(main())

を実行すると'Hello ...'が出てきて1秒後に'... World!'が出てきます。

await asyncio.sleep(1)

で1秒間待ってから次が実行されます。

他の機能として、sleepgathercreate_task / TaskTask.resultTask.doneTaskGroupLockSemaphoreEventQueuePriorityQueue
などサンプルコードをあげて紹介されています。

次にこちらのサイトのコードを使わせていただきColabで実行してみてasyncioの効果を測定します。
まず"requests"を利用してやってみます。

import requests
import time
import pandas as pd

result = []

def get_poke(id):
    r = requests.get(f'https://pokeapi.co/api/v2/pokemon/{id}')
    result.append([r.json()['id'], r.json()['name']])
    
def main():
    start = time.time() 
    for poke_id in range(1,152):
        get_poke(poke_id)
    end = time.time()
    print(f"processing time:{end-start}") #処理時間を測定する
    
main()
print(pd.DataFrame(result,columns=['id','name']).sort_values('id').reset_index(drop=True))
asyncioの効果を測定

実行すると

processing time:25.087804317474365
      id        name
0      1   bulbasaur
1      2     ivysaur
2      3    venusaur
3      4  charmander
4      5  charmeleon
..   ...         ...
146  147     dratini
147  148   dragonair
148  149   dragonite
149  150      mewtwo
150  151         mew

[151 rows x 2 columns]

processing time:25.087804317474365

25秒かかっていることがわかります。

次にasyncio:並行処理でやってみます。

これを実行する前にasyncio版のrequestsである"httpx"を

 !pip install httpx

とインストールして以下実行します。

import httpx #asyncio版のrequests
import time
import asyncio
import pandas as pd

result = []

async def get_poke(id):
    async with httpx.AsyncClient() as client:
        r = await client.get(f'https://pokeapi.co/api/v2/pokemon/{id}')
        result.append([r.json()['id'], r.json()['name']])

#コルーチン
async def main():
    start = time.time() 
    tasks = []
    #タスクを設定する(151匹分)
    for poke_id in range(1,152):
        tasks.append(get_poke(poke_id))
    #タスク実行
    await asyncio.gather(*tasks)
    end = time.time()
    print(f"processing time:{end-start}") #処理時間を測定する
    
asyncio.run(main()) #イベントループ作成
print(pd.DataFrame(result,columns=['id','name']).sort_values('id').reset_index(drop=True))

を実行すると

processing time:17.935431003570557
      id        name
0      1   bulbasaur
1      2     ivysaur
2      3    venusaur
3      4  charmander
4      5  charmeleon
..   ...         ...
146  147     dratini
147  148   dragonair
148  149   dragonite
149  150      mewtwo
150  151         mew

[151 rows x 2 columns]

processing time:17.935431003570557

17秒となりました。参考サイトでは3秒となっていましたがColabの環境では少し時間がかかるようですが、asyncioを使うことで短縮できました。

この記事が気に入ったらサポートをしてみませんか?