こんにちわ alivelimb です。
FastAPI などで見える機会が増えたasyncio
ですが、本当に恩恵があるのかベンチマークテストしてみました。
「そもそもasyncio
って何?」という方のために簡単に紹介してみます。
詳細は公式ドキュメントまたは@JunyaFffさんのスライドが非常にわかりやすいです。
asyncio はその名の通り非同期(async) I/O の実装に活用できます。
ネットワーク通信を含む Input/Ouput の際は処理待ちが発生し CPU を持て余してしまいます。
File I/O の間 CPU を別の処理に割り当てることで CPU をフル活用させることができます。
フル活用と言ってもasyncio
単体では 1 スレッドの CPU 使用率が 100%ということです。
multiprocessing,joblibなどのマルチプロセスを行う処理では CPU の全ての論理コアで CPU を活用できます。
asyncio などの 1 スレッドで、CPU を利用する処理を切り替える方法を「並行処理」、joblib などの複数スレッドで同時に処理を行う方法を「並列処理」と言ったりします。
ではマルチプロセスの方が良いのか?というとそういう訳ではありません。
プログラムの処理速度が CPU 速度に大きく依存する場合(CPU bound と呼んだりします)、マルチプロセスが適しています。使える CPU がたくさんあった方が良いからですね。
逆に I/O 速度に大きく依存する場合(I/O bound と呼んだりします)、非同期 I/O が適しています。CPU がたくさんあっても、CPU が暇を持て余すだけだからですね。
また、マルチプロセスと非同期 I/O は組み合わせることもできますが、本記事ではこちらの検証は行ないません。興味のある方は英語ですが以下の記事を参照してください。
asyncio
を用いて 3 つの sleep を並行処理してみましょう。
まずは普通に書くと以下のようになります。
import timeperiods=[1,2,3]for periodin periods: time.sleep(period)
1, 2, 3 秒ずつ sleep するため処理が終了するには合計 6 秒かかります。
次にasyncio
を用いると以下のようになります。
import asyncio# コルーチン関数の定義asyncdefasync_multi_sleep(periods:list[float])->None:for periodin periods:await asyncio.sleep(period)periods=[1.0,2.0,3.0]# コルーチンの実行asyncio.run(async_multi_sleep(periods))
上記の通り、大まかの手順としては以下の通りです。
async def
でコルーチン関数を定義するasyncio
でコルーチンを実行するただし、上記を実行しても合計時間は 6 秒かかります。
コルーチンを並行処理するには Task を作成する必要があります。
import asyncio# コルーチン関数の定義asyncdefasync_multi_sleep(periods:list[float])->None: tasks=[]# タスクの作成for periodin periods: task= asyncio.create_task(asyncio.sleep(period)) tasks.append(task) ´# 他のタスクに順番を譲るfor taskin tasks:await taskperiods=[1.0,2.0,3.0]# コルーチンの実行asyncio.run(async_multi_sleep(periods))
上記で実装した場合、3 秒で処理が終了します。
なお、上記の内容はasyncio.gather
を用いて以下のように実装することもできます。
asyncdefasync_multi_sleep(periods:list[float])->None:await asyncio.gather(*[asyncio.sleep(period)for periodin periods])periods=[1.0,2.0,3.0]# コルーチンの実行asyncio.run(async_multi_sleep(periods))
ベンチマークテストとして処理時間を計測します。
テスト対象としては以下の通りです。
No | 内容 | 利用パッケージ |
---|---|---|
1 | スリープ | asyncio.sleep |
2 | File I/O | aiofiles |
3 | Network I/O - HTTP | aiohttp |
4 | Network I/O - DB | aiosqlite |
また、各処理に CPU 処理を組み合わせてテストを行います。
テストツールとしては以下を利用します。
なお、pytest-benchmark
は本記事の執筆時点でasyncio
に対応していません。そのためGithub の Issueを参考に以下のような fixture を作成して対応しました。
@pytest.fixture(scope="function")defaio_benchmark(benchmark):import asyncioimport threadingclassSync2Async:def__init__(self, coro,*args,**kwargs): self.coro= coro self.args= args self.kwargs= kwargs self.custom_loop=None self.thread=Nonedefstart_background_loop(self)->None: asyncio.set_event_loop(self.custom_loop) self.custom_loop.run_forever()def__call__(self): evloop=None awaitable= self.coro(*self.args,**self.kwargs)try: evloop= asyncio.get_running_loop()except:passif evloopisNone:return asyncio.run(awaitable)else:ifnot self.custom_loopornot self.threadornot self.thread.is_alive(): self.custom_loop= asyncio.new_event_loop() self.thread= threading.Thread(target=self.start_background_loop, daemon=True) self.thread.start()return asyncio.run_coroutine_threadsafe(awaitable, self.custom_loop).result()def_wrapper(func,*args,**kwargs):if asyncio.iscoroutinefunction(func): benchmark(Sync2Async(func,*args,**kwargs))else: benchmark(func,*args,**kwargs)return _wrapper
まずは既に紹介したスリープについてです。
テスト対象コード
import asynciofrom timeimport sleep# -----------------------------------------------------------# 指定された時間スリープする# -----------------------------------------------------------defmulti_sleep(periods:list[float])->None:for periodin periods: sleep(period)asyncdefasync_multi_sleep(periods:list[float])->None:for periodin periods:await asyncio.sleep(period)asyncdefasync_multi_sleep_gather(periods:list[float])->None:await asyncio.gather(*[asyncio.sleep(period)for periodin periods])
テストコード
import pytestfrom c001_asyncioimport( async_multi_sleep, async_multi_sleep_gather, async_multi_sleep_gather_from_blocking, multi_sleep,)SLEEP_PERIODS=[0.2,0.2,0.2]# -----------------------------------------------------------# 指定された時間スリープするテスト# -----------------------------------------------------------deftest_multi_sleep(benchmark):@benchmarkdef_(): multi_sleep(SLEEP_PERIODS)@pytest.mark.asyncioasyncdeftest_async_multi_sleep(aio_benchmark):@aio_benchmarkasyncdef_():await async_multi_sleep(SLEEP_PERIODS)@pytest.mark.asyncioasyncdeftest_async_multi_sleep_gather(aio_benchmark):@aio_benchmarkasyncdef_():await async_multi_sleep_gather(SLEEP_PERIODS)
結果は以下の通りです。
---------------------------------------------------------------------------------------- benchmark: 3 tests ----------------------------------------------------------------------------------------Name (time in ms) Min Max Mean StdDev Median IQR Outliers OPS Rounds Iterations----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------test_async_multi_sleep_gather 201.6565 (1.0) 202.6901 (1.0) 202.3728 (1.0) 0.4125 (1.0) 202.5256 (1.0) 0.3803 (1.0) 1;0 4.9414 (1.0) 5 1test_async_multi_sleep 603.5638 (2.99) 616.0021 (3.04) 606.5454 (3.00) 5.3204 (12.90) 604.3013 (2.98) 4.1270 (10.85) 1;1 1.6487 (0.33) 5 1test_multi_sleep 606.8920 (3.01) 611.6967 (3.02) 608.6299 (3.01) 1.8088 (4.39) 608.0931 (3.00) 1.4615 (3.84) 1;1 1.6430 (0.33) 5 1----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Mean 列に注目すると、想定通りtest_async_multi_sleep_gather
のみ並行処理の恩恵を受けて約 200ms で終了しています。ただ、実際のプロダクトで sleep することはあまりないと思うので、別のパターンも見ていきましょう。
次は File I/O です。
非同期の File I/O にはaiofilesを用います。
今回は 512MB のデータを 10 個のファイルに書き込む処理でベンチマークをしてみます。
テスト対象コード
import asynciofrom pathlibimport Pathimport aiofiles# -----------------------------------------------------------# 指定されたデータをファイルに書き込む# -----------------------------------------------------------defcreate_file_with_data(path: Path, data:bytes)->None:with path.open("wb")as f: f.write(data)asyncdefasync_create_file_with_data(path: Path, data:bytes)->None:asyncwith aiofiles.open(path.as_posix(),"wb")as f:await f.write(data)# -----------------------------------------------------------# 指定されたデータを指定された数だけファイルに書き込む# -----------------------------------------------------------defcreate_multi_files_with_data(paths:list[Path], data:bytes)->None:for pathin paths: create_file_with_data(path, data)asyncdefasync_create_multi_files_with_data(paths:list[Path], data:bytes)->None:await asyncio.gather(*[async_create_file_with_data(path, data)for pathin paths])
テストコード
from pathlibimport Pathfrom tempfileimport NamedTemporaryFileimport pytestfrom c002_aiofilesimport( async_create_file_with_data, async_create_multi_files_with_data, create_file_with_data, create_multi_files_with_data,)CREATE_FILE_SIZE=2**28CREATE_FILE_NUM=10@pytest.fixture(scope="function")defnamed_tempfile(): fw= NamedTemporaryFile("w")yield fw fw.close()@pytest.fixture(scope="function")defnamed_tempfiles(): files=[NamedTemporaryFile("w")for _inrange(CREATE_FILE_NUM)]yield filesfor fwin files: fw.close()@pytest.fixture(scope="module")defhuge_bytes():returnb"\0"* CREATE_FILE_SIZE# -----------------------------------------------------------# 指定されたデータをファイルに書き込むテスト# -----------------------------------------------------------deftest_create_file_with_data(benchmark, named_tempfile, huge_bytes):@benchmarkdef_(): create_file_with_data(Path(named_tempfile.name), huge_bytes)@pytest.mark.asyncioasyncdeftest_async_create_file_with_data(aio_benchmark, named_tempfile, huge_bytes):@aio_benchmarkasyncdef_():await async_create_file_with_data(Path(named_tempfile.name), huge_bytes)# -----------------------------------------------------------# 指定されたデータを指定された数だけファイルに書き込むテスト# -----------------------------------------------------------deftest_create_multi_files_with_data(benchmark, named_tempfiles, huge_bytes):@benchmarkdef_(): create_multi_files_with_data([Path(named_tempfile.name)for named_tempfilein named_tempfiles], huge_bytes,)@pytest.mark.asyncioasyncdeftest_async_create_multi_files_with_data(aio_benchmark, named_tempfiles, huge_bytes):@aio_benchmarkasyncdef_():await async_create_multi_files_with_data([Path(named_tempfile.name)for named_tempfilein named_tempfiles], huge_bytes,)
結果は以下の通りです
--------------------------------------------------------------------------------------------------- benchmark: 4 tests --------------------------------------------------------------------------------------------------Name (time in ms) Min Max Mean StdDev Median IQR Outliers OPS Rounds Iterations-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------test_async_create_file_with_data 92.4087 (1.0) 198.6375 (1.07) 127.5377 (1.0) 37.2938 (1.10) 108.2839 (1.0) 61.2919 (1.15) 3;0 7.8408 (1.0) 11 1test_create_file_with_data 99.5028 (1.08) 185.3546 (1.0) 150.5592 (1.18) 37.0968 (1.09) 158.6906 (1.47) 63.7308 (1.19) 1;0 6.6419 (0.85) 5 1test_async_create_multi_files_with_data 1,171.8733 (12.68) 1,254.7769 (6.77) 1,219.3601 (9.56) 33.9193 (1.0) 1,231.4451 (11.37) 53.3607 (1.0) 2;0 0.8201 (0.10) 5 1test_create_multi_files_with_data 1,175.0913 (12.72) 1,327.1361 (7.16) 1,242.5917 (9.74) 67.4861 (1.99) 1,232.8131 (11.39) 122.8071 (2.30) 2;0 0.8048 (0.10) 5 1-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
!?
これは想定とは異なります。
File I/O 段階では sleep と同様に待ちが発生するため、並列処理を行なった方が 10 倍近く早くなる想定でしたが結果は異なり、処理速度に大きな違いは見られませんでした。
aiofiles の Issueを見ると「処理が早くはならないかも知れないが、メインスレッドで別の処理を行えることが出来るのが利点だ」との記述があります。腑に落ちないですが、CPU 処理も組み合わせてみましょう。
組み合わせる前に、まずは CPU 処理のベンチマークをとってみましょう。
CPU 処理は以下の通り、単純な数列和(高校数学で習うシグマ)です。
1000 万までのシグマを 3 回計算します。
# -----------------------------------------------------------# 指定された整数までの総和を計算する# -----------------------------------------------------------defsigma(num:int)->int: v=0for iinrange(num+1): v+= ireturn vasyncdefasync_sigma(num:int)->int: v=0for iinrange(num+1): v+= ireturn v# -----------------------------------------------------------# 指定された各整数までの総和を計算する# -----------------------------------------------------------defmulti_sigma(nums:list[int])->list[int]:return[sigma(num)for numin nums]asyncdefasync_multi_sigma(nums:list[int])->list[int]:returnawait asyncio.gather(*[async_sigma(num)for numin nums])
テストコード
from c002_aiofilesimport( async_multi_sigma, async_sigma, multi_sigma, sigma,)N_SIGMA=10**7V_SIGMA=50000005000000N_SIGMA_TIMES=3# -----------------------------------------------------------# 指定された整数までの総和を計算するテスト# -----------------------------------------------------------deftest_sigma(benchmark):@benchmarkdef_():assert sigma(N_SIGMA)== V_SIGMA@pytest.mark.asyncioasyncdeftest_async_sigma(aio_benchmark):@aio_benchmarkasyncdef_():assertawait async_sigma(N_SIGMA)== V_SIGMA# -----------------------------------------------------------# 指定された各整数までの総和を計算するテスト# -----------------------------------------------------------deftest_multi_sigma(benchmark):@benchmarkdef_(): result= multi_sigma([N_SIGMAfor _inrange(N_SIGMA_TIMES)])assert result==[V_SIGMAfor _inrange(N_SIGMA_TIMES)]@pytest.mark.asyncioasyncdeftest_async_multi_sigma(aio_benchmark):@aio_benchmarkasyncdef_(): result=await async_multi_sigma([N_SIGMAfor _inrange(N_SIGMA_TIMES)])assert result==[V_SIGMAfor _inrange(N_SIGMA_TIMES)]
結果は以下の通りです。
------------------------------------------------------------------------------------------ benchmark: 4 tests -----------------------------------------------------------------------------------------Name (time in ms) Min Max Mean StdDev Median IQR Outliers OPS Rounds Iterations-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------test_async_sigma 480.9756 (1.0) 499.0507 (1.0) 492.1004 (1.0) 6.7240 (1.0) 493.6568 (1.0) 6.3340 (1.0) 2;0 2.0321 (1.0) 5 1test_sigma 500.9957 (1.04) 519.9479 (1.04) 511.1150 (1.04) 7.5248 (1.12) 512.2812 (1.04) 11.8297 (1.87) 2;0 1.9565 (0.96) 5 1test_async_multi_sigma 1,466.5203 (3.05) 1,494.5071 (2.99) 1,479.8562 (3.01) 10.9483 (1.63) 1,481.6104 (3.00) 16.4346 (2.59) 2;0 0.6757 (0.33) 5 1test_multi_sigma 1,492.3126 (3.10) 1,547.9187 (3.10) 1,523.0065 (3.09) 27.9279 (4.15) 1,534.3684 (3.11) 53.7643 (8.49) 2;0 0.6566 (0.32) 5 1-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
並列処理によって処理速度に違いはありませんが、こちらは想定通りです。今回の処理は CPU bound なため、CPU に待ちが発生せず並列処理の恩恵を受けられません。では I/O 処理と組み合わせるとどうでしょうか?
テスト対象コード
# -----------------------------------------------------------# File I/O と CPUタスクを行う(データ指定)# -----------------------------------------------------------defmix_io_cpu_with_data(paths:list[Path], data:bytes, nums:list[int])->list[int]: create_multi_files_with_data(paths, data)return multi_sigma(nums)asyncdefasync_mix_io_cpu_with_data(paths:list[Path], data:bytes, nums:list[int])->list[int]: result=await asyncio.gather( async_create_multi_files_with_data(paths, data), async_multi_sigma(nums),)return result[1]
テストコード
from c002_aiofilesimport( async_mix_io_cpu_with_data, mix_io_cpu_with_data,)# -----------------------------------------------------------# File I/O と CPUタスクを行うテスト(データ指定)# -----------------------------------------------------------deftest_mix_io_cpu_with_data(benchmark, named_tempfiles, huge_bytes):@benchmarkdef_(): result= mix_io_cpu_with_data([Path(named_tempfile.name)for named_tempfilein named_tempfiles], huge_bytes,[N_SIGMAfor _inrange(N_SIGMA_TIMES)],)assert result==[V_SIGMAfor _inrange(N_SIGMA_TIMES)]@pytest.mark.asyncioasyncdeftest_async_mix_io_cpu_with_data(aio_benchmark, named_tempfiles, huge_bytes):@aio_benchmarkasyncdef_(): result=await async_mix_io_cpu_with_data([Path(named_tempfile.name)for named_tempfilein named_tempfiles], huge_bytes,[N_SIGMAfor _inrange(N_SIGMA_TIMES)],)assert result==[V_SIGMAfor _inrange(N_SIGMA_TIMES)]
結果は以下の通りです。
------------------------------------------------------------------------------------- benchmark: 2 tests -------------------------------------------------------------------------------------Name (time in s) Min Max Mean StdDev Median IQR Outliers OPS Rounds Iterations----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------test_async_mix_io_cpu_with_data 2.4868 (1.0) 2.6220 (1.0) 2.5505 (1.0) 0.0528 (1.0) 2.5410 (1.0) 0.0798 (1.0) 2;0 0.3921 (1.0) 5 1test_mix_io_cpu_with_data 2.6234 (1.05) 2.9007 (1.11) 2.8053 (1.10) 0.1114 (2.11) 2.8472 (1.12) 0.1423 (1.78) 1;0 0.3565 (0.91) 5 1---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
asyncio を使った方が、やや早くなっていますが劇的な変化は見られませんでした。。。
私自身、腑に落ちる理由が分かっていないので、心当たりのある方はコメントで教えていただけると幸いです。
次にaiohttpを用いたネットワーク I/O (HTTP)のベンチマークを行ってみましょう。まずアクセスされたら指定の時間だけスリープする API を作成します。
defcreate_app()-> web.Application:asyncdefhandle(request: Any)-> web.Response: period=float(request.rel_url.query["period"])await asyncio.sleep(period)return web.Response() app= web.Application() app.add_routes([web.get("/sleep", handle)])return appif __name__=="__main__": app= create_app() web.run_app(app)
これとは別プロセスで HTTP リクエストを行います。
# -----------------------------------------------------------# /sleepリソースに停止時間を指定してHTTPリクエストを行う# -----------------------------------------------------------defsleep_request(url:str, period:float)->int:with urllib.request.urlopen(f"{url}/sleep?period={period}")as response:return response.statusasyncdefasync_sleep_request(url:str, period:float)->int:asyncwith aiohttp.ClientSession()as session:asyncwith session.get(f"{url}/sleep?period={period}")as response:return response.status# -----------------------------------------------------------# /sleepリソースに停止時間を指定した数だけHTTPリクエストを行う# -----------------------------------------------------------defmulti_sleep_requests(url:str, periods:list[float])->list[int]:return[sleep_request(url, period)for periodin periods]asyncdefasync_multi_sleep_requests(url:str, periods:list[float])->list[int]:returnawait asyncio.gather(*[async_sleep_request(url, period)for periodin periods])
これまでのようにベンチマークを実施してみましょう。
SERVER_URL="http://0.0.0.0:8080"deftest_sleep_request(benchmark):@benchmarkdef_(): status= sleep_request(SERVER_URL,0.2)assert status==200@pytest.mark.asyncioasyncdeftest_async_sleep_request(aio_benchmark):@aio_benchmarkasyncdef_(): status=await async_sleep_request(SERVER_URL,0.2)assert status==200deftest_multi_sleep_requests(benchmark):@benchmarkdef_(): status= multi_sleep_requests(SERVER_URL,[0.2,0.2,0.2])assert status==[200,200,200]@pytest.mark.asyncioasyncdeftest_async_multi_sleep_requests(aio_benchmark):@aio_benchmarkasyncdef_(): status=await async_multi_sleep_requests(SERVER_URL,[0.2,0.2,0.2])assert status==[200,200,200]
結果は以下の通りです。
----------------------------------------------------------------------------------------- benchmark: 4 tests -----------------------------------------------------------------------------------------Name (time in ms) Min Max Mean StdDev Median IQR Outliers OPS Rounds Iterations------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------test_sleep_request 202.4012 (1.0) 203.9504 (1.0) 203.4526 (1.0) 0.6106 (1.38) 203.6606 (1.0) 0.5861 (1.0) 1;0 4.9151 (1.0) 5 1test_async_sleep_request 205.5099 (1.02) 207.0953 (1.02) 206.3653 (1.01) 0.7193 (1.63) 206.6543 (1.01) 1.2889 (2.20) 2;0 4.8458 (0.99) 5 1test_async_multi_sleep_requests 208.4528 (1.03) 209.4410 (1.03) 208.8828 (1.03) 0.4413 (1.0) 208.6549 (1.02) 0.7507 (1.28) 1;0 4.7874 (0.97) 5 1test_multi_sleep_requests 609.3281 (3.01) 623.3630 (3.06) 613.3552 (3.01) 5.7904 (13.12) 610.7264 (3.00) 5.9304 (10.12) 1;0 1.6304 (0.33) 5 1------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
こちらは想定通りの動きをしています。HTTP リクエストをしている間に並行処理が走っているため、200ms x 3 = 600ms
ではなく、1 回分の 200ms で処理が終わっています。
ではクライアント側で重めの CPU 処理を回している場合はどうでしょうか。
N_SIGMA=10**5V_SIGMA=5000050000N_SIGMA_TIMES=3# -----------------------------------------------------------# Network I/O と CPUタスクを行う# -----------------------------------------------------------defmix_network_io_cpu(url:str, periods:list[float], nums:list[int])->list[int]: multi_sleep_requests(url, periods)return multi_sigma(nums)asyncdefasync_mix_network_io_cpu(url:str, periods:list[float], nums:list[int])->list[int]: result=await asyncio.gather( async_multi_sleep_requests(url, periods), async_multi_sigma(nums),)return result[1]
こちらもベンチマークを回してみましょう。
# -----------------------------------------------------------# Network I/O と CPUタスクを行うテスト(サイズ指定)# -----------------------------------------------------------deftest_mix_network_io_cpu(benchmark):@benchmarkdef_(): result= mix_network_io_cpu( SERVER_URL,[1,1,1],[N_SIGMAfor _inrange(N_SIGMA_TIMES)],)assert result==[V_SIGMAfor _inrange(N_SIGMA_TIMES)]@pytest.mark.asyncioasyncdeftest_async_mix_network_io_cpu(aio_benchmark):@aio_benchmarkasyncdef_(): result=await async_mix_network_io_cpu( SERVER_URL,[1,1,1],[N_SIGMAfor _inrange(N_SIGMA_TIMES)],)assert result==[V_SIGMAfor _inrange(N_SIGMA_TIMES)]
結果は以下の通りです。
------------------------------------------------------------------------------------ benchmark: 2 tests ------------------------------------------------------------------------------------Name (time in s) Min Max Mean StdDev Median IQR Outliers OPS Rounds Iterations--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------test_async_mix_network_io_cpu 1.0208 (1.0) 1.0343 (1.0) 1.0253 (1.0) 0.0053 (1.0) 1.0243 (1.0) 0.0053 (1.0) 1;0 0.9753 (1.0) 5 1test_mix_network_io_cpu 3.0221 (2.96) 3.0393 (2.94) 3.0311 (2.96) 0.0072 (1.37) 3.0341 (2.96) 0.0119 (2.23) 2;0 0.3299 (0.34) 5 1--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
こちらも想定通り、並行処理の効果が効いていますね。
aiohttp を用いた Network I/O では、並行処理の恩恵で実際に処理が早くなることが確認できました。
ちなみに aiohttp ではaiohttp_client
という fixture を利用することが出来るため、本来であれば別プロセスでアプリを起動せず、pytest のみで完結するはずです。しかし以下の実装で試すとRuntimeError: Task got Future attached to a different loop.
というエラーが出たため、今回は別プロセスでアプリを起動する方法で迂回しました。
# ※エラーが発生するコード@pytest.mark.asyncioasyncdeftest_sleep_request(aiohttp_client, aio_benchmark):@aio_benchmarkasyncdef_(): app_client=await aiohttp_client(create_app()) period=1 response=await app_client.get(f"/sleep?period={period}")assert response.status==200
最後は aiosqlite を用いた DB 接続です。
!今回は SQLite を用いますが、databasesには他の DB のasyncio
に対応したクライアントが用意されています。
これまで通り、まずはスリープから検証していきましょう。
DB にはスリープ関数がないため、自作関数を作成してスリープを実装します
# -----------------------------------------------------------# DBにアクセスして指定した時間だけスリープする# -----------------------------------------------------------defsleep_db_request(period:float)->None:with sqlite3.connect(":memory:")as db: db.create_function("sleep",1, time.sleep) cur= db.cursor() cur.execute(f"SELECT sleep({period})")asyncdefasync_sleep_db_request(period:float)->None:asyncwith aiosqlite.connect(":memory:")as db:await db.create_function("sleep",1, time.sleep) cur=await db.cursor()await cur.execute(f"SELECT sleep({period})")# -----------------------------------------------------------# DBにアクセスして指定した時間の数だけスリープする# -----------------------------------------------------------defmulti_sleep_db_requests(periods:list[float])->None:for periodin periods: sleep_db_request(period)asyncdefasync_multi_sleep_db_requests(periods:list[float])->None:await asyncio.gather(*[async_sleep_db_request(period)for periodin periods])
テストケースも今まで通りです
deftest_sleep_db_request(benchmark):@benchmarkdef_(): sleep_db_request(0.2)@pytest.mark.asyncioasyncdeftest_async_sleep_db_request(aio_benchmark):@aio_benchmarkasyncdef_():await async_sleep_db_request(0.2)deftest_multi_sleep_requests(benchmark):@benchmarkdef_(): multi_sleep_db_requests([0.2,0.2,0.2])@pytest.mark.asyncioasyncdeftest_async_multi_sleep_requests(aio_benchmark):@aio_benchmarkasyncdef_():await async_multi_sleep_db_requests([0.2,0.2,0.2])
結果は以下の通りです。
----------------------------------------------------------------------------------------- benchmark: 4 tests -----------------------------------------------------------------------------------------Name (time in ms) Min Max Mean StdDev Median IQR Outliers OPS Rounds Iterations------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------test_sleep_db_request 201.3555 (1.0) 205.2846 (1.0) 203.7979 (1.0) 1.8179 (1.73) 204.9371 (1.0) 3.0303 (1.87) 1;0 4.9068 (1.0) 5 1test_async_sleep_db_request 204.7722 (1.02) 207.3065 (1.01) 206.2676 (1.01) 1.0537 (1.0) 206.7830 (1.01) 1.6162 (1.0) 1;0 4.8481 (0.99) 5 1test_async_multi_sleep_requests 209.6312 (1.04) 214.5691 (1.05) 211.9721 (1.04) 1.8173 (1.72) 212.0738 (1.03) 2.2477 (1.39) 2;0 4.7176 (0.96) 5 1test_multi_sleep_requests 607.1996 (3.02) 611.5772 (2.98) 609.8215 (2.99) 2.2275 (2.11) 611.2378 (2.98) 4.0439 (2.50) 2;0 1.6398 (0.33) 5 1------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
こちらも aiohttp の時と同様、並列処理の恩恵を受けていますね。
念の為クライアント側で重めの CPU 処理を追加して検証します。
# -----------------------------------------------------------# DB Network I/O と CPUタスクを行う# -----------------------------------------------------------defmix_db_io_cpu(periods:list[float], nums:list[int])->list[int]: multi_sleep_db_requests(periods)return multi_sigma(nums)asyncdefasync_mix_db_io_cpu(periods:list[float], nums:list[int])->list[int]: result=await asyncio.gather( async_multi_sleep_db_requests(periods), async_multi_sigma(nums),)return result[1]
テストコードもほとんど変更はありません
# -----------------------------------------------------------# DB Network I/O と CPUタスクを行うテスト(サイズ指定)# -----------------------------------------------------------deftest_mix_db_io_cpu(benchmark):@benchmarkdef_(): result= mix_db_io_cpu([1,1,1],[N_SIGMAfor _inrange(N_SIGMA_TIMES)],)assert result==[V_SIGMAfor _inrange(N_SIGMA_TIMES)]@pytest.mark.asyncioasyncdeftest_async_mix_db_io_cpu(aio_benchmark):@aio_benchmarkasyncdef_(): result=await async_mix_db_io_cpu([1,1,1],[N_SIGMAfor _inrange(N_SIGMA_TIMES)],)assert result==[V_SIGMAfor _inrange(N_SIGMA_TIMES)]
結果は以下の通りです。
---------------------------------------------------------------------------------- benchmark: 2 tests ---------------------------------------------------------------------------------Name (time in s) Min Max Mean StdDev Median IQR Outliers OPS Rounds Iterations---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------test_async_mix_db_io_cpu 1.0220 (1.0) 1.0400 (1.0) 1.0344 (1.0) 0.0072 (1.0) 1.0371 (1.0) 0.0076 (1.0) 1;0 0.9668 (1.0) 5 1test_mix_db_io_cpu 3.0259 (2.96) 3.0451 (2.93) 3.0336 (2.93) 0.0081 (1.13) 3.0320 (2.92) 0.0135 (1.78) 1;0 0.3296 (0.34) 5 1---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
やはり想定通り、aiosqlite でも並列処理の恩恵を受けることを確認できました。
今回は 「asyncio で並列処理をすることで本当に処理が早くなるか」をベンチマークテストで確認しました。aiofiles のみ処理が速くなることは確認できませんでしたが、aiohttp, aiosqlite では並列処理の恩恵を受けることが確認できました。
FastAPI をはじめとする ASGI 対応の Web フレームワークでの DB 接続やネットワーク I/O では積極的に asyncio、コルーチンを使っていくことで処理の高速化が図れそうです。
aiofiles を用いたベンチマークで結果が想定通りにならなかった件については、そもそも aiofiles では処理が速くならないのか、それとも実装に問題があるのか、もしわかる方がいればコメント頂けると幸いです。
バッジを受け取った著者にはZennから現金やAmazonギフトカードが還元されます。
aiofiles を用いたベンチマークで結果が想定通りにならなかった件については、そもそも aiofiles では処理が速くならないのか、それとも実装に問題があるのか、もしわかる方がいればコメント頂けると幸いです。
Python と asyncio を使ったのが数年前なので間違っていたらすみません。
今回のケースでは ThreadPoolExecutor の最大ワーカースレッド数に引っかかているのでないでしょうか?
issue の記述とソースをざっと読んだ感じではaiofiles のwrite()
はrun_in_executor()
をデフォルトのエクゼキューターで使っていると思われます。
What aiofiles does is delegate the file reading operations to a threadpool.
write()
はブロッキング処理になるので、これを多用するとスレッドプールを消費してしまうはずです。
ちょっと趣旨は違いますが下記の記事が参考になるかと思います。
https://pod.hatenablog.com/entry/2019/03/21/162511
一方でrun_in_executor()
には下記のような記述があります。
https://docs.python.org/ja/3/library/asyncio-eventloop.html#asyncio.loop.run_in_executor
バージョン 3.5.3 で変更:loop.run_in_executor() は内部で生成するスレッドプールエグゼキュータの
max_workers
を設定せず、代わりにスレッドプールエグゼキュータ (ThreadPoolExecutor) にデフォルト値を設定させるようになりました。
さらに ThreadPoolExecutor の方を見ると、おそらく最新の Python では下記がデフォルトになっていると思われます。
https://docs.python.org/ja/3/library/concurrent.futures.html#threadpoolexecutor
バージョン 3.8 で変更: Default value of max_workers is changed to
min(32, os.cpu_count() + 4)
. This default value preserves at least 5 workers for I/O bound tasks. It utilizes at most 32 CPU cores for CPU bound tasks which release the GIL. And it avoids using very large resources implicitly on many-core machines.
これは、Codespace(CPU 4 コア) で確認すると下記のように最大ワーカースレッド数が 8 ということになります。
$ grep "model name" < /proc/cpuinfomodel name : Intel(R) Xeon(R) Platinum 8168 CPU @ 2.70GHzmodel name : Intel(R) Xeon(R) Platinum 8168 CPU @ 2.70GHzmodel name : Intel(R) Xeon(R) Platinum 8168 CPU @ 2.70GHzmodel name : Intel(R) Xeon(R) Platinum 8168 CPU @ 2.70GHz$ python3Python 3.8.10 (default, Jun 22 2022, 20:18:18) [GCC 9.4.0] on linuxType "help", "copyright", "credits" or "license" for more information.>>> import os>>> min(32, os.cpu_count() + 4)8
もしも測定されている環境での最大ワーカースレッド数に余裕がなさそうでしたら、I/O の並行数をら減らしてみるのはいかがでしょうか?
コメント頂きありがとうございます!
(Pythonのthreadに関する理解が足りておらず、共有いただいた資料などを読ませて頂いていた都合で回答が遅れてしまいました...)
まず私の環境での最大ワーカスレッド数は以下で確認した通り12となります
$ python -c 'import os; print(min(32, os.cpu_count() + 4))'12
もしも測定されている環境での最大ワーカースレッド数に余裕がなさそうでしたら、I/O の並行数を減らしてみるのはいかがでしょうか?
こちらの「最大ワーカスレッド数に余裕がないか」の検証方法が分からず検証出来ていないのですが、「I/O の並行数を減らしてみる」という部分は以下の記事を参考にSemaphoreを用いて検証してみました。
asyncdefasync_create_file_with_data_sema(path: Path, data:bytes, sema: asyncio.Semaphore)->None:asyncwith aiofiles.open(path.as_posix(),"wb")as f, sema:await f.write(data)# テスト対象コードasyncdefasync_create_multi_files_with_data_sema(paths:list[Path], data:bytes, limit:int)->None: sema= asyncio.Semaphore(limit)await asyncio.gather(*[async_create_file_with_data_sema(path, data, sema)for pathin paths])# テストコード@pytest.mark.asyncioasyncdeftest_async_create_multi_files_with_data_sema(aio_benchmark, named_tempfiles, huge_bytes):@aio_benchmarkasyncdef_():await async_create_multi_files_with_data_sema([Path(named_tempfile.name)for named_tempfilein named_tempfiles], huge_bytes,12)
limit数は12以外にも4, 8でも検証しましたが、結果としては速度は改善せずでした。。。
「hankei6kmさんが想定していた検証方法ではないかも...?」とも思っているので、お時間ある際に再びコメント頂けると幸いです。
ご返答ありがとうございます。
こちらの書き方が曖昧だったばかりにお手間をとらせてしまってすみません。
limit数は12以外にも4, 8でも検証しましたが、結果としては速度は改善せずでした。。。
「hankei6kmさんが想定していた検証方法ではないかも...?」とも思っているので、お時間ある際に再びコメント頂けると幸いです。
想定していた通りですが、もう少し単純に「CREATE_FILE_NUM を最大ワーカースレッド数 -3 くらいにしてみたらいかかでしょうか」という意味で書いていました。
(-3 はとくに意味があるわけではなく、メインスレッドの他に pytest が何かスレッドを生成している可能性を考慮しての値です)
今回は予想だけで書いてしまうのも申し訳ないので実際に環境を作って試してみました。
https://github.com/hankei6km/test-python-asyncio
前述の Codespace(最大ワーカースレッド数 8)の環境でCREATE_FILE_NUM = 5
にしてみましたがこちらでも違いは出ませんでした。
# CREATE_FILE_NUM = 10CREATE_FILE_NUM=5
pytest -k 'multi'
で実行しています。
----------------------------------------------------------------------------------------- benchmark: 2 tests -----------------------------------------------------------------------------------------Name (time in s) Min Max Mean StdDev Median IQR Outliers OPS Rounds Iterations------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------test_create_multi_files_with_data 6.5082 (1.0) 6.6423 (1.01) 6.5706 (1.0) 0.0513 (2.17) 6.5615 (1.0) 0.0734 (1.62) 2;0 0.1522 (1.0) 5 1test_async_create_multi_files_with_data 6.5482 (1.01) 6.5976 (1.0) 6.5707 (1.00) 0.0236 (1.0) 6.5645 (1.00) 0.0453 (1.0) 1;0 0.1522 (1.00) 5 1------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
I/O 性能の頭打ちかと思い、iotop で様子を眺めていたのですが、同期と非同期どちらも似たような感じになります。
同期処理時
非同期処理時(write にあわせてスレッドが増えています)
スクリーンショットだと非同期の方が良い値ですが、どちらも Current は 200 M/s を前後しています。
それならばと、ためしに書き出すファイルを異なるボリュームに分散させてみましたが、同期と非同期ともに速くなってしまったのでこちらも違いはでませんでした。
(これ以降もCREATE_FILE_NUM = 5
で試しています)
$df /tmp /home/vscode/tmp/Filesystem 1K-blocks Used Available Use% Mounted on/dev/sda1 32845584 352 31151232 1%/tmpoverlay 32847680 2353564 28800020 8% /
偶数のときは/tmp
、奇数のときは/home/vscode/tmp
に書き込みます。
@pytest.fixture(scope="function")defnamed_tempfiles(): files=[NamedTemporaryFile("w",dir="/tmp"if i%2==0else"/home/vscode/tmp")for iinrange(CREATE_FILE_NUM)]# files = [NamedTemporaryFile("w", dir="/home/vscode/tmp")# for _ in range(CREATE_FILE_NUM)]# files = [NamedTemporaryFile("w") for _ in range(CREATE_FILE_NUM)]yield filesfor fwin files: fw.close()
----------------------------------------------------------------------------------------- benchmark: 2 tests -----------------------------------------------------------------------------------------Name (time in s) Min Max Mean StdDev Median IQR Outliers OPS Rounds Iterations------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------test_create_multi_files_with_data 3.8486 (1.0) 4.0886 (1.02) 3.9493 (1.0) 0.1022 (2.20) 3.9499 (1.0) 0.1736 (2.50) 1;0 0.2532 (1.0) 5 1test_async_create_multi_files_with_data 3.8907 (1.01) 4.0031 (1.0) 3.9608 (1.00) 0.0464 (1.0) 3.9607 (1.00) 0.0695 (1.0) 1;0 0.2525 (1.00) 5 1------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
もっと性能良くないとダメかなということで、続いてtmpfs(メモリー上に作られるファイルシステムと思ってください)をマウントし試してみるとようやく違いがでました。
$sudomount-t tmpfs-osize=4000m tmpfs /home/vscode/tmp/$df /home/vscode/tmp/Filesystem 1K-blocks Used Available Use% Mounted ontmpfs 4096000 0 4096000 0% /home/vscode/tmp
全部のファイルを tmpfs へ書き込みます。
@pytest.fixture(scope="function")defnamed_tempfiles():# files = [NamedTemporaryFile(# "w", dir="/tmp" if i % 2 == 0 else "/home/vscode/tmp") for i in range(CREATE_FILE_NUM)] files=[NamedTemporaryFile("w",dir="/home/vscode/tmp")for _inrange(CREATE_FILE_NUM)]# files = [NamedTemporaryFile("w") for _ in range(CREATE_FILE_NUM)]yield filesfor fwin files: fw.close()
--------------------------------------------------------------------------------------------- benchmark: 2 tests ----------------------------------------------------------------------------------------------Name (time in ms) Min Max Mean StdDev Median IQR Outliers OPS Rounds Iterations----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------test_async_create_multi_files_with_data 270.3285 (1.0) 283.7868 (1.0) 275.1938 (1.0) 5.4192 (1.0) 275.0887 (1.0) 7.2456 (1.0) 1;0 3.6338 (1.0) 5 1test_create_multi_files_with_data 647.1230 (2.39) 683.4950 (2.41) 660.6459 (2.40) 14.2596 (2.63) 655.9037 (2.38) 18.3033 (2.53) 1;0 1.5137 (0.42) 5 1----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
5 倍まではいきませんが、速くはなったと言える値かと思います。
この状態だと最大ワーカースレッド数を変更すると若干ですが速度に影響がありました。
threadPool= ThreadPoolExecutor(max_workers=2)asyncdefasync_create_file_with_data(path: Path, data:bytes)->None:# async with aiofiles.open(path.as_posix(), "wb") as f:asyncwith aiofiles.open(path.as_posix(),"wb", executor=threadPool)as f:await f.write(data)
---------------------------------------------------------------------------------------------- benchmark: 2 tests ----------------------------------------------------------------------------------------------Name (time in ms) Min Max Mean StdDev Median IQR Outliers OPS Rounds Iterations----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------test_async_create_multi_files_with_data 385.8565 (1.0) 433.2469 (1.0) 397.1514 (1.0) 20.2649 (2.14) 387.9202 (1.0) 14.3998 (1.95) 1;1 2.5179 (1.0) 5 1test_create_multi_files_with_data 649.6608 (1.68) 673.3505 (1.55) 656.7036 (1.65) 9.4867 (1.0) 652.8568 (1.68) 7.3829 (1.0) 1;1 1.5228 (0.60) 5 1----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
そのようなわけで当初の「最大ワーカースレッド数に引っかかっているのでは?」はあまり正しくありませんでした。
まとめると以下のようになるかと思います。
ただし、I/O 性能が原因という決定的な証拠も見つからなかったというのが正直なところです(「I/O 時にはスレッドのロックが開放されるので速くなるけど、やっぱりファイルの操作は遅いよ」という記述が見つかる程度です)。
最終的に何がネックなっているのか(Python のスレッド制御なのか、I/O性能なのかなど)はイマイチはっきりしていないのが申し訳ないのですが、試した範囲でわかったことをまとめてみました。
下記のコメントに追記しましたが、今回の環境では I/O 性能(同時アクセスの性能)がネックで並行化しても思ったように速くならなかったと考えられます。
小出しになってしまってすみません、追記です。
本当に I/O が頭打ちだったのか気になったのでdd
コマンドを使って簡単に試してみました。
実行環境は上記のベンチマークを動かした Codespace を使っています。
まず、各ファイルへ順次書き出す処理です。
(/tmp/test?.tmp
へ\0
を 2**28 バイト書き出しています)
ddif=/dev/zerobs=268435456count=1of=/tmp/test1.tmpddif=/dev/zerobs=268435456count=1of=/tmp/test2.tmpddif=/dev/zerobs=268435456count=1of=/tmp/test3.tmpddif=/dev/zerobs=268435456count=1of=/tmp/test4.tmpddif=/dev/zerobs=268435456count=1of=/tmp/test5.tmp
新規でファイル作成すると 900MB/s くらいはでます(後半、なぜ遅くなっているかは不明です)
1+0 records in1+0 records out268435456 bytes (268 MB, 256 MiB) copied, 0.289604 s, 927 MB/s1+0 records in1+0 records out268435456 bytes (268 MB, 256 MiB) copied, 0.288978 s, 929 MB/s1+0 records in1+0 records out268435456 bytes (268 MB, 256 MiB) copied, 0.291378 s, 921 MB/s1+0 records in1+0 records out268435456 bytes (268 MB, 256 MiB) copied, 0.90479 s, 297 MB/s1+0 records in1+0 records out268435456 bytes (268 MB, 256 MiB) copied, 1.08273 s, 248 MB/s
上書き(削除 & 作成)すると 200MB/s 前後になります。
1+0 records in1+0 records out268435456 bytes (268 MB, 256 MiB) copied, 1.07726 s, 249 MB/s1+0 records in1+0 records out268435456 bytes (268 MB, 256 MiB) copied, 1.28004 s, 210 MB/s1+0 records in1+0 records out268435456 bytes (268 MB, 256 MiB) copied, 1.27764 s, 210 MB/s1+0 records in1+0 records out268435456 bytes (268 MB, 256 MiB) copied, 1.20428 s, 223 MB/s1+0 records in1+0 records out268435456 bytes (268 MB, 256 MiB) copied, 1.37003 s, 196 MB/s
続いて並行化した場合です。
ddif=/dev/zerobs=268435456count=1of=/tmp/test1.tmp&ddif=/dev/zerobs=268435456count=1of=/tmp/test2.tmp&ddif=/dev/zerobs=268435456count=1of=/tmp/test3.tmp&ddif=/dev/zerobs=268435456count=1of=/tmp/test4.tmp&ddif=/dev/zerobs=268435456count=1of=/tmp/test5.tmp
こちらも最初の 1 回は速くなりますが、5 で割ったような値になります。
1+0 records in1+0 records out268435456 bytes (268 MB, 256 MiB) copied, 0.949643 s, 283 MB/s1+0 records in1+0 records out268435456 bytes (268 MB, 256 MiB) copied, 1.44472 s, 186 MB/s1+0 records in1+0 records out268435456 bytes (268 MB, 256 MiB) copied, 1.61763 s, 166 MB/s1+0 records in1+0 records out268435456 bytes (268 MB, 256 MiB) copied, 1.81 s, 148 MB/s1+0 records in1+0 records out268435456 bytes (268 MB, 256 MiB) copied, 2.26454 s, 119 MB/s
上書き(削除 & 作成)すると 200MB/s を 5 で割ったよりかは少し良い値ですが、全体の経過時間では 6.3 秒なのでベンチマークとあまり違いはありません。
1+0 records in1+0 records out268435456 bytes (268 MB, 256 MiB) copied, 5.0185 s, 53.5 MB/s1+0 records in1+0 records out268435456 bytes (268 MB, 256 MiB) copied, 5.07003 s, 52.9 MB/s1+0 records in1+0 records out268435456 bytes (268 MB, 256 MiB) copied, 5.9657 s, 45.0 MB/s1+0 records in1+0 records out268435456 bytes (268 MB, 256 MiB) copied, 6.2768 s, 42.8 MB/s1+0 records in1+0 records out268435456 bytes (268 MB, 256 MiB) copied, 6.38367 s, 42.1 MB/s
以上のように asyncio のベンチマークのときと比べて大きな違いはないようでした。
このことから、開発マシンの/tmp
などに書き出した場合は asyncio に限らずファイル書き出しの並行化のメリットはあまり受けられないことになるかと思われます。
一方で tmpfs の場合は並行化してもパフォーマンスは 5 で割った値にまでは落ち込みませんでした。
順次書き出した場合。
1+0 records in1+0 records out268435456 bytes (268 MB, 256 MiB) copied, 0.223789 s, 1.2 GB/s1+0 records in1+0 records out268435456 bytes (268 MB, 256 MiB) copied, 0.226139 s, 1.2 GB/s1+0 records in1+0 records out268435456 bytes (268 MB, 256 MiB) copied, 0.235712 s, 1.1 GB/s1+0 records in1+0 records out268435456 bytes (268 MB, 256 MiB) copied, 0.233061 s, 1.2 GB/s1+0 records in1+0 records out268435456 bytes (268 MB, 256 MiB) copied, 0.237924 s, 1.1 GB/s
並行して書き出した場合
1+0 records in1+0 records out268435456 bytes (268 MB, 256 MiB) copied, 0.338262 s, 794 MB/s1+0 records in1+0 records out268435456 bytes (268 MB, 256 MiB) copied, 0.417389 s, 643 MB/s1+0 records in1+0 records out268435456 bytes (268 MB, 256 MiB) copied, 0.403607 s, 665 MB/s1+0 records in1+0 records out268435456 bytes (268 MB, 256 MiB) copied, 0.498165 s, 539 MB/s1+0 records in1+0 records out268435456 bytes (268 MB, 256 MiB) copied, 0.49274 s, 545 MB/s
これは速度的な性能というよりも、ファイルシステムの構成上(メモリー上に作成されるので)「同時アクセスの性能が良い」という感じかと思われます。
よって、asycio などによる並行処理でファイル処理を並行化した場合、このような同時アクセスで性能が落ちないストレージならばある程度の速度向上が見込めると考えられます。
(例、冗長化されたディスクなど)
様々な検証を試して頂きありがとうございます!
読ませて頂いていて「なるほど、こうやって検証する方法があるのか」と非常に勉強になりました。
asycio などによる並行処理でファイル処理を並行化した場合、このような同時アクセスで性能が落ちないストレージならばある程度の速度向上が見込めると考えられます。
なるほど、asyncio, aiofilesの問題というよりはファイルシステム側の問題の可能性が高そうですね。記事本文内でも紹介したaiofilesのIssueでは「処理が早くはならないかも知れないが、メインスレッドで別の処理を行えることが出来るのが利点だ」という記述があり、「処理は早くならないの...?なぜ?」という疑問があったのですが、腑に落ちる原因がありそうでスッキリしました。
ベンチマーク取ってませんが、CPU Boundのasync_sigma
関数は内部で待ち状態が発生しないので無意味な比較になっていると思います。内部で待たないのでasyncio.gather
しても別のtaskへの移動は発生せず逐次実行になります。
例えば、async_sigma
関数の中にprint関数を追加してgatherしても次のような逐次実行になります。
asyncdefasync_sigma(num:int)->int:print(f"async sigma({num}) start") v=0for iinrange(num+1):print(f"async sigma({num})", i) v+= iprint(f"async sigma({num}) end")return vasyncdefasync_multi_sigma(nums:list[int])->list[int]:returnawait asyncio.gather(*[async_sigma(num)for numin nums])await async_multi_sigma([5,3,2])
async sigma(5) startasync sigma(5) 0async sigma(5) 1async sigma(5) 2async sigma(5) 3async sigma(5) 4async sigma(5) 5async sigma(5) endasync sigma(3) startasync sigma(3) 0async sigma(3) 1async sigma(3) 2async sigma(3) 3async sigma(3) endasync sigma(2) startasync sigma(2) 0async sigma(2) 1async sigma(2) 2async sigma(2) end[0]: [15, 6, 3]
並行(concurrent)処理になるように中で待たせる簡単な方法としては、await asyncio.sleep(0)
をforの中で呼ぶと一時待ち状態になり別のtaskに移動して並行処理になります。
asyncdefasync_sigma(num:int)->int:print(f"async sigma({num}) start") v=0for iinrange(num+1):print(f"async sigma({num})", i) v+= iawait asyncio.sleep(0)print(f"async sigma({num}) end")return vasyncdefasync_multi_sigma(nums:list[int])->list[int]:returnawait asyncio.gather(*[async_sigma(num)for numin nums])await async_multi_sigma([5,3,2])
async sigma(5) startasync sigma(5) 0async sigma(3) startasync sigma(3) 0async sigma(2) startasync sigma(2) 0async sigma(5) 1async sigma(3) 1async sigma(2) 1async sigma(5) 2async sigma(3) 2async sigma(2) 2async sigma(5) 3async sigma(3) 3async sigma(2) endasync sigma(5) 4async sigma(3) endasync sigma(5) 5async sigma(5) end[1]: [15, 6, 3]
これで速くなるかは不明ですが、並行処理として比較するなら必要かと思います。ややこしいんですがgatherだけでは並行にも並列(parallel)にもならないようです。
また、上のコメントで速くなっていたのはシングルスレッドの並行処理でなくマルチスレッド(ThreadPool)による並列(parallel)処理になっているからだと思われます。一方でCPU Boundの処理はProcessPoolに入れると速くなるのでおそらく実感できるはずです。(下記のドキュメント参照)
このawaitable loop.run_in_executor(executor, func, *args)
はおそらく低水準APIに設計されているようなのですがもう少し雑に使える高水準APIとしてcoroutine asyncio.to_thread(func, /, *args, **kwargs)
があります。おそらく我々がgatherに求めてるのはこいつで、与えた関数(コルーチンじゃダメ) がスレッドになり、gatherにまとめて渡すと並列(parallel)実行になります。これでも速度の実感ができるはずです。
(loop.run_in_executor(executor, func, *args)
のThreadPoolを使った場合とどういう差があるのかまでは自分もよくわかってませんが)
https://docs.python.org/ja/3.11/library/asyncio-task.html#running-in-threads