Movatterモバイル変換


[0]ホーム

URL:


Zenn
alivelimbalivelimb

asyncioで並行処理すると本当に速くなるのか、実際にベンチマークしてみた

に公開7件

こんにちわ alivelimb です。
FastAPI などで見える機会が増えたasyncioですが、本当に恩恵があるのかベンチマークテストしてみました。

はじめに

「そもそもasyncioって何?」という方のために簡単に紹介してみます。
詳細は公式ドキュメントまたは@JunyaFffさんのスライドが非常にわかりやすいです。

asyncio とは?

asyncio はその名の通り非同期(async) I/O の実装に活用できます。
ネットワーク通信を含む Input/Ouput の際は処理待ちが発生し CPU を持て余してしまいます。

blocking

File I/O の間 CPU を別の処理に割り当てることで CPU をフル活用させることができます。

non-blocking

フル活用と言ってもasyncio単体では 1 スレッドの CPU 使用率が 100%ということです。
multiprocessing,joblibなどのマルチプロセスを行う処理では CPU の全ての論理コアで CPU を活用できます。

!

asyncio などの 1 スレッドで、CPU を利用する処理を切り替える方法を「並処理」、joblib などの複数スレッドで同時に処理を行う方法を「並処理」と言ったりします。

ではマルチプロセスの方が良いのか?というとそういう訳ではありません。

プログラムの処理速度が CPU 速度に大きく依存する場合(CPU bound と呼んだりします)、マルチプロセスが適しています。使える CPU がたくさんあった方が良いからですね。

逆に I/O 速度に大きく依存する場合(I/O bound と呼んだりします)、非同期 I/O が適しています。CPU がたくさんあっても、CPU が暇を持て余すだけだからですね。

また、マルチプロセスと非同期 I/O は組み合わせることもできますが、本記事ではこちらの検証は行ないません。興味のある方は英語ですが以下の記事を参照してください。

どうやって実装するの?

asyncioを用いて 3 つの sleep を並行処理してみましょう。

まずは普通に書くと以下のようになります。

import timeperiods=[1,2,3]for periodin periods:    time.sleep(period)

1, 2, 3 秒ずつ sleep するため処理が終了するには合計 6 秒かかります。

次にasyncioを用いると以下のようになります。

import asyncio# コルーチン関数の定義asyncdefasync_multi_sleep(periods:list[float])->None:for periodin periods:await asyncio.sleep(period)periods=[1.0,2.0,3.0]# コルーチンの実行asyncio.run(async_multi_sleep(periods))

上記の通り、大まかの手順としては以下の通りです。

  1. async defでコルーチン関数を定義する
  2. asyncioでコルーチンを実行する

ただし、上記を実行しても合計時間は 6 秒かかります。
コルーチンを並行処理するには Task を作成する必要があります。

import asyncio# コルーチン関数の定義asyncdefasync_multi_sleep(periods:list[float])->None:    tasks=[]# タスクの作成for periodin periods:        task= asyncio.create_task(asyncio.sleep(period))        tasks.append(task)    ´# 他のタスクに順番を譲るfor taskin tasks:await taskperiods=[1.0,2.0,3.0]# コルーチンの実行asyncio.run(async_multi_sleep(periods))

上記で実装した場合、3 秒で処理が終了します。
なお、上記の内容はasyncio.gatherを用いて以下のように実装することもできます。

asyncdefasync_multi_sleep(periods:list[float])->None:await asyncio.gather(*[asyncio.sleep(period)for periodin periods])periods=[1.0,2.0,3.0]# コルーチンの実行asyncio.run(async_multi_sleep(periods))

benchmark

ベンチマークテストとして処理時間を計測します。
テスト対象としては以下の通りです。

No内容利用パッケージ
1スリープasyncio.sleep
2File I/Oaiofiles
3Network I/O - HTTPaiohttp
4Network I/O - DBaiosqlite

また、各処理に CPU 処理を組み合わせてテストを行います。
テストツールとしては以下を利用します。

  • pytest
  • pytest-benchmark
  • pytest-asyncio
  • pytest-aiohttp

なお、pytest-benchmarkは本記事の執筆時点でasyncioに対応していません。そのためGithub の Issueを参考に以下のような fixture を作成して対応しました。

@pytest.fixture(scope="function")defaio_benchmark(benchmark):import asyncioimport threadingclassSync2Async:def__init__(self, coro,*args,**kwargs):            self.coro= coro            self.args= args            self.kwargs= kwargs            self.custom_loop=None            self.thread=Nonedefstart_background_loop(self)->None:            asyncio.set_event_loop(self.custom_loop)            self.custom_loop.run_forever()def__call__(self):            evloop=None            awaitable= self.coro(*self.args,**self.kwargs)try:                evloop= asyncio.get_running_loop()except:passif evloopisNone:return asyncio.run(awaitable)else:ifnot self.custom_loopornot self.threadornot self.thread.is_alive():                    self.custom_loop= asyncio.new_event_loop()                    self.thread= threading.Thread(target=self.start_background_loop, daemon=True)                    self.thread.start()return asyncio.run_coroutine_threadsafe(awaitable, self.custom_loop).result()def_wrapper(func,*args,**kwargs):if asyncio.iscoroutinefunction(func):            benchmark(Sync2Async(func,*args,**kwargs))else:            benchmark(func,*args,**kwargs)return _wrapper

benchmark: sleep

まずは既に紹介したスリープについてです。

テスト対象コード

import asynciofrom timeimport sleep# -----------------------------------------------------------# 指定された時間スリープする# -----------------------------------------------------------defmulti_sleep(periods:list[float])->None:for periodin periods:        sleep(period)asyncdefasync_multi_sleep(periods:list[float])->None:for periodin periods:await asyncio.sleep(period)asyncdefasync_multi_sleep_gather(periods:list[float])->None:await asyncio.gather(*[asyncio.sleep(period)for periodin periods])

テストコード

import pytestfrom c001_asyncioimport(    async_multi_sleep,    async_multi_sleep_gather,    async_multi_sleep_gather_from_blocking,    multi_sleep,)SLEEP_PERIODS=[0.2,0.2,0.2]# -----------------------------------------------------------# 指定された時間スリープするテスト# -----------------------------------------------------------deftest_multi_sleep(benchmark):@benchmarkdef_():        multi_sleep(SLEEP_PERIODS)@pytest.mark.asyncioasyncdeftest_async_multi_sleep(aio_benchmark):@aio_benchmarkasyncdef_():await async_multi_sleep(SLEEP_PERIODS)@pytest.mark.asyncioasyncdeftest_async_multi_sleep_gather(aio_benchmark):@aio_benchmarkasyncdef_():await async_multi_sleep_gather(SLEEP_PERIODS)

結果は以下の通りです。

---------------------------------------------------------------------------------------- benchmark: 3 tests ----------------------------------------------------------------------------------------Name (time in ms)                      Min                 Max                Mean            StdDev              Median               IQR            Outliers     OPS            Rounds  Iterations----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------test_async_multi_sleep_gather     201.6565 (1.0)      202.6901 (1.0)      202.3728 (1.0)      0.4125 (1.0)      202.5256 (1.0)      0.3803 (1.0)           1;0  4.9414 (1.0)           5           1test_async_multi_sleep            603.5638 (2.99)     616.0021 (3.04)     606.5454 (3.00)     5.3204 (12.90)    604.3013 (2.98)     4.1270 (10.85)         1;1  1.6487 (0.33)          5           1test_multi_sleep                  606.8920 (3.01)     611.6967 (3.02)     608.6299 (3.01)     1.8088 (4.39)     608.0931 (3.00)     1.4615 (3.84)          1;1  1.6430 (0.33)          5           1----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

Mean 列に注目すると、想定通りtest_async_multi_sleep_gatherのみ並行処理の恩恵を受けて約 200ms で終了しています。ただ、実際のプロダクトで sleep することはあまりないと思うので、別のパターンも見ていきましょう。

benchmark: aiofiles

次は File I/O です。
非同期の File I/O にはaiofilesを用います。
今回は 512MB のデータを 10 個のファイルに書き込む処理でベンチマークをしてみます。

テスト対象コード

import asynciofrom pathlibimport Pathimport aiofiles# -----------------------------------------------------------# 指定されたデータをファイルに書き込む# -----------------------------------------------------------defcreate_file_with_data(path: Path, data:bytes)->None:with path.open("wb")as f:        f.write(data)asyncdefasync_create_file_with_data(path: Path, data:bytes)->None:asyncwith aiofiles.open(path.as_posix(),"wb")as f:await f.write(data)# -----------------------------------------------------------# 指定されたデータを指定された数だけファイルに書き込む# -----------------------------------------------------------defcreate_multi_files_with_data(paths:list[Path], data:bytes)->None:for pathin paths:        create_file_with_data(path, data)asyncdefasync_create_multi_files_with_data(paths:list[Path], data:bytes)->None:await asyncio.gather(*[async_create_file_with_data(path, data)for pathin paths])

テストコード

from pathlibimport Pathfrom tempfileimport NamedTemporaryFileimport pytestfrom c002_aiofilesimport(    async_create_file_with_data,    async_create_multi_files_with_data,    create_file_with_data,    create_multi_files_with_data,)CREATE_FILE_SIZE=2**28CREATE_FILE_NUM=10@pytest.fixture(scope="function")defnamed_tempfile():    fw= NamedTemporaryFile("w")yield fw    fw.close()@pytest.fixture(scope="function")defnamed_tempfiles():    files=[NamedTemporaryFile("w")for _inrange(CREATE_FILE_NUM)]yield filesfor fwin files:        fw.close()@pytest.fixture(scope="module")defhuge_bytes():returnb"\0"* CREATE_FILE_SIZE# -----------------------------------------------------------# 指定されたデータをファイルに書き込むテスト# -----------------------------------------------------------deftest_create_file_with_data(benchmark, named_tempfile, huge_bytes):@benchmarkdef_():        create_file_with_data(Path(named_tempfile.name), huge_bytes)@pytest.mark.asyncioasyncdeftest_async_create_file_with_data(aio_benchmark, named_tempfile, huge_bytes):@aio_benchmarkasyncdef_():await async_create_file_with_data(Path(named_tempfile.name), huge_bytes)# -----------------------------------------------------------# 指定されたデータを指定された数だけファイルに書き込むテスト# -----------------------------------------------------------deftest_create_multi_files_with_data(benchmark, named_tempfiles, huge_bytes):@benchmarkdef_():        create_multi_files_with_data([Path(named_tempfile.name)for named_tempfilein named_tempfiles],            huge_bytes,)@pytest.mark.asyncioasyncdeftest_async_create_multi_files_with_data(aio_benchmark, named_tempfiles, huge_bytes):@aio_benchmarkasyncdef_():await async_create_multi_files_with_data([Path(named_tempfile.name)for named_tempfilein named_tempfiles],            huge_bytes,)

結果は以下の通りです

--------------------------------------------------------------------------------------------------- benchmark: 4 tests --------------------------------------------------------------------------------------------------Name (time in ms)                                  Min                   Max                  Mean             StdDev                Median                 IQR            Outliers     OPS            Rounds  Iterations-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------test_async_create_file_with_data               92.4087 (1.0)        198.6375 (1.07)       127.5377 (1.0)      37.2938 (1.10)       108.2839 (1.0)       61.2919 (1.15)          3;0  7.8408 (1.0)          11           1test_create_file_with_data                     99.5028 (1.08)       185.3546 (1.0)        150.5592 (1.18)     37.0968 (1.09)       158.6906 (1.47)      63.7308 (1.19)          1;0  6.6419 (0.85)          5           1test_async_create_multi_files_with_data     1,171.8733 (12.68)    1,254.7769 (6.77)     1,219.3601 (9.56)     33.9193 (1.0)      1,231.4451 (11.37)     53.3607 (1.0)           2;0  0.8201 (0.10)          5           1test_create_multi_files_with_data           1,175.0913 (12.72)    1,327.1361 (7.16)     1,242.5917 (9.74)     67.4861 (1.99)     1,232.8131 (11.39)    122.8071 (2.30)          2;0  0.8048 (0.10)          5           1-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

!?

これは想定とは異なります。
File I/O 段階では sleep と同様に待ちが発生するため、並列処理を行なった方が 10 倍近く早くなる想定でしたが結果は異なり、処理速度に大きな違いは見られませんでした。

aiofiles の Issueを見ると「処理が早くはならないかも知れないが、メインスレッドで別の処理を行えることが出来るのが利点だ」との記述があります。腑に落ちないですが、CPU 処理も組み合わせてみましょう。

組み合わせる前に、まずは CPU 処理のベンチマークをとってみましょう。

CPU 処理は以下の通り、単純な数列和(高校数学で習うシグマ)です。
1000 万までのシグマを 3 回計算します。

# -----------------------------------------------------------# 指定された整数までの総和を計算する# -----------------------------------------------------------defsigma(num:int)->int:    v=0for iinrange(num+1):        v+= ireturn vasyncdefasync_sigma(num:int)->int:    v=0for iinrange(num+1):        v+= ireturn v# -----------------------------------------------------------# 指定された各整数までの総和を計算する# -----------------------------------------------------------defmulti_sigma(nums:list[int])->list[int]:return[sigma(num)for numin nums]asyncdefasync_multi_sigma(nums:list[int])->list[int]:returnawait asyncio.gather(*[async_sigma(num)for numin nums])

テストコード

from c002_aiofilesimport(    async_multi_sigma,    async_sigma,    multi_sigma,    sigma,)N_SIGMA=10**7V_SIGMA=50000005000000N_SIGMA_TIMES=3# -----------------------------------------------------------# 指定された整数までの総和を計算するテスト# -----------------------------------------------------------deftest_sigma(benchmark):@benchmarkdef_():assert sigma(N_SIGMA)== V_SIGMA@pytest.mark.asyncioasyncdeftest_async_sigma(aio_benchmark):@aio_benchmarkasyncdef_():assertawait async_sigma(N_SIGMA)== V_SIGMA# -----------------------------------------------------------# 指定された各整数までの総和を計算するテスト# -----------------------------------------------------------deftest_multi_sigma(benchmark):@benchmarkdef_():        result= multi_sigma([N_SIGMAfor _inrange(N_SIGMA_TIMES)])assert result==[V_SIGMAfor _inrange(N_SIGMA_TIMES)]@pytest.mark.asyncioasyncdeftest_async_multi_sigma(aio_benchmark):@aio_benchmarkasyncdef_():        result=await async_multi_sigma([N_SIGMAfor _inrange(N_SIGMA_TIMES)])assert result==[V_SIGMAfor _inrange(N_SIGMA_TIMES)]

結果は以下の通りです。

------------------------------------------------------------------------------------------ benchmark: 4 tests -----------------------------------------------------------------------------------------Name (time in ms)                 Min                   Max                  Mean             StdDev                Median                IQR            Outliers     OPS            Rounds  Iterations-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------test_async_sigma             480.9756 (1.0)        499.0507 (1.0)        492.1004 (1.0)       6.7240 (1.0)        493.6568 (1.0)       6.3340 (1.0)           2;0  2.0321 (1.0)           5           1test_sigma                   500.9957 (1.04)       519.9479 (1.04)       511.1150 (1.04)      7.5248 (1.12)       512.2812 (1.04)     11.8297 (1.87)          2;0  1.9565 (0.96)          5           1test_async_multi_sigma     1,466.5203 (3.05)     1,494.5071 (2.99)     1,479.8562 (3.01)     10.9483 (1.63)     1,481.6104 (3.00)     16.4346 (2.59)          2;0  0.6757 (0.33)          5           1test_multi_sigma           1,492.3126 (3.10)     1,547.9187 (3.10)     1,523.0065 (3.09)     27.9279 (4.15)     1,534.3684 (3.11)     53.7643 (8.49)          2;0  0.6566 (0.32)          5           1-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

並列処理によって処理速度に違いはありませんが、こちらは想定通りです。今回の処理は CPU bound なため、CPU に待ちが発生せず並列処理の恩恵を受けられません。では I/O 処理と組み合わせるとどうでしょうか?

テスト対象コード

# -----------------------------------------------------------# File I/O と CPUタスクを行う(データ指定)# -----------------------------------------------------------defmix_io_cpu_with_data(paths:list[Path], data:bytes, nums:list[int])->list[int]:    create_multi_files_with_data(paths, data)return multi_sigma(nums)asyncdefasync_mix_io_cpu_with_data(paths:list[Path], data:bytes, nums:list[int])->list[int]:    result=await asyncio.gather(        async_create_multi_files_with_data(paths, data),        async_multi_sigma(nums),)return result[1]

テストコード

from c002_aiofilesimport(    async_mix_io_cpu_with_data,    mix_io_cpu_with_data,)# -----------------------------------------------------------# File I/O と CPUタスクを行うテスト(データ指定)# -----------------------------------------------------------deftest_mix_io_cpu_with_data(benchmark, named_tempfiles, huge_bytes):@benchmarkdef_():        result= mix_io_cpu_with_data([Path(named_tempfile.name)for named_tempfilein named_tempfiles],            huge_bytes,[N_SIGMAfor _inrange(N_SIGMA_TIMES)],)assert result==[V_SIGMAfor _inrange(N_SIGMA_TIMES)]@pytest.mark.asyncioasyncdeftest_async_mix_io_cpu_with_data(aio_benchmark, named_tempfiles, huge_bytes):@aio_benchmarkasyncdef_():        result=await async_mix_io_cpu_with_data([Path(named_tempfile.name)for named_tempfilein named_tempfiles],            huge_bytes,[N_SIGMAfor _inrange(N_SIGMA_TIMES)],)assert result==[V_SIGMAfor _inrange(N_SIGMA_TIMES)]

結果は以下の通りです。

------------------------------------------------------------------------------------- benchmark: 2 tests -------------------------------------------------------------------------------------Name (time in s)                       Min               Max              Mean            StdDev            Median               IQR            Outliers     OPS            Rounds  Iterations----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------test_async_mix_io_cpu_with_data     2.4868 (1.0)      2.6220 (1.0)      2.5505 (1.0)      0.0528 (1.0)      2.5410 (1.0)      0.0798 (1.0)           2;0  0.3921 (1.0)           5           1test_mix_io_cpu_with_data           2.6234 (1.05)     2.9007 (1.11)     2.8053 (1.10)     0.1114 (2.11)     2.8472 (1.12)     0.1423 (1.78)          1;0  0.3565 (0.91)          5           1---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

asyncio を使った方が、やや早くなっていますが劇的な変化は見られませんでした。。。
私自身、腑に落ちる理由が分かっていないので、心当たりのある方はコメントで教えていただけると幸いです。

benchmark: aiohttp

次にaiohttpを用いたネットワーク I/O (HTTP)のベンチマークを行ってみましょう。まずアクセスされたら指定の時間だけスリープする API を作成します。

defcreate_app()-> web.Application:asyncdefhandle(request: Any)-> web.Response:        period=float(request.rel_url.query["period"])await asyncio.sleep(period)return web.Response()    app= web.Application()    app.add_routes([web.get("/sleep", handle)])return appif __name__=="__main__":    app= create_app()    web.run_app(app)

これとは別プロセスで HTTP リクエストを行います。

# -----------------------------------------------------------# /sleepリソースに停止時間を指定してHTTPリクエストを行う# -----------------------------------------------------------defsleep_request(url:str, period:float)->int:with urllib.request.urlopen(f"{url}/sleep?period={period}")as response:return response.statusasyncdefasync_sleep_request(url:str, period:float)->int:asyncwith aiohttp.ClientSession()as session:asyncwith session.get(f"{url}/sleep?period={period}")as response:return response.status# -----------------------------------------------------------# /sleepリソースに停止時間を指定した数だけHTTPリクエストを行う# -----------------------------------------------------------defmulti_sleep_requests(url:str, periods:list[float])->list[int]:return[sleep_request(url, period)for periodin periods]asyncdefasync_multi_sleep_requests(url:str, periods:list[float])->list[int]:returnawait asyncio.gather(*[async_sleep_request(url, period)for periodin periods])

これまでのようにベンチマークを実施してみましょう。

SERVER_URL="http://0.0.0.0:8080"deftest_sleep_request(benchmark):@benchmarkdef_():        status= sleep_request(SERVER_URL,0.2)assert status==200@pytest.mark.asyncioasyncdeftest_async_sleep_request(aio_benchmark):@aio_benchmarkasyncdef_():        status=await async_sleep_request(SERVER_URL,0.2)assert status==200deftest_multi_sleep_requests(benchmark):@benchmarkdef_():        status= multi_sleep_requests(SERVER_URL,[0.2,0.2,0.2])assert status==[200,200,200]@pytest.mark.asyncioasyncdeftest_async_multi_sleep_requests(aio_benchmark):@aio_benchmarkasyncdef_():        status=await async_multi_sleep_requests(SERVER_URL,[0.2,0.2,0.2])assert status==[200,200,200]

結果は以下の通りです。

----------------------------------------------------------------------------------------- benchmark: 4 tests -----------------------------------------------------------------------------------------Name (time in ms)                        Min                 Max                Mean            StdDev              Median               IQR            Outliers     OPS            Rounds  Iterations------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------test_sleep_request                  202.4012 (1.0)      203.9504 (1.0)      203.4526 (1.0)      0.6106 (1.38)     203.6606 (1.0)      0.5861 (1.0)           1;0  4.9151 (1.0)           5           1test_async_sleep_request            205.5099 (1.02)     207.0953 (1.02)     206.3653 (1.01)     0.7193 (1.63)     206.6543 (1.01)     1.2889 (2.20)          2;0  4.8458 (0.99)          5           1test_async_multi_sleep_requests     208.4528 (1.03)     209.4410 (1.03)     208.8828 (1.03)     0.4413 (1.0)      208.6549 (1.02)     0.7507 (1.28)          1;0  4.7874 (0.97)          5           1test_multi_sleep_requests           609.3281 (3.01)     623.3630 (3.06)     613.3552 (3.01)     5.7904 (13.12)    610.7264 (3.00)     5.9304 (10.12)         1;0  1.6304 (0.33)          5           1------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

こちらは想定通りの動きをしています。HTTP リクエストをしている間に並行処理が走っているため、200ms x 3 = 600msではなく、1 回分の 200ms で処理が終わっています。

ではクライアント側で重めの CPU 処理を回している場合はどうでしょうか。

N_SIGMA=10**5V_SIGMA=5000050000N_SIGMA_TIMES=3# -----------------------------------------------------------# Network I/O と CPUタスクを行う# -----------------------------------------------------------defmix_network_io_cpu(url:str, periods:list[float], nums:list[int])->list[int]:    multi_sleep_requests(url, periods)return multi_sigma(nums)asyncdefasync_mix_network_io_cpu(url:str, periods:list[float], nums:list[int])->list[int]:    result=await asyncio.gather(        async_multi_sleep_requests(url, periods),        async_multi_sigma(nums),)return result[1]

こちらもベンチマークを回してみましょう。

# -----------------------------------------------------------# Network I/O と CPUタスクを行うテスト(サイズ指定)# -----------------------------------------------------------deftest_mix_network_io_cpu(benchmark):@benchmarkdef_():        result= mix_network_io_cpu(            SERVER_URL,[1,1,1],[N_SIGMAfor _inrange(N_SIGMA_TIMES)],)assert result==[V_SIGMAfor _inrange(N_SIGMA_TIMES)]@pytest.mark.asyncioasyncdeftest_async_mix_network_io_cpu(aio_benchmark):@aio_benchmarkasyncdef_():        result=await async_mix_network_io_cpu(            SERVER_URL,[1,1,1],[N_SIGMAfor _inrange(N_SIGMA_TIMES)],)assert result==[V_SIGMAfor _inrange(N_SIGMA_TIMES)]

結果は以下の通りです。

------------------------------------------------------------------------------------ benchmark: 2 tests ------------------------------------------------------------------------------------Name (time in s)                     Min               Max              Mean            StdDev            Median               IQR            Outliers     OPS            Rounds  Iterations--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------test_async_mix_network_io_cpu     1.0208 (1.0)      1.0343 (1.0)      1.0253 (1.0)      0.0053 (1.0)      1.0243 (1.0)      0.0053 (1.0)           1;0  0.9753 (1.0)           5           1test_mix_network_io_cpu           3.0221 (2.96)     3.0393 (2.94)     3.0311 (2.96)     0.0072 (1.37)     3.0341 (2.96)     0.0119 (2.23)          2;0  0.3299 (0.34)          5           1--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

こちらも想定通り、並行処理の効果が効いていますね。
aiohttp を用いた Network I/O では、並行処理の恩恵で実際に処理が早くなることが確認できました。

ちなみに aiohttp ではaiohttp_clientという fixture を利用することが出来るため、本来であれば別プロセスでアプリを起動せず、pytest のみで完結するはずです。しかし以下の実装で試すとRuntimeError: Task got Future attached to a different loop.というエラーが出たため、今回は別プロセスでアプリを起動する方法で迂回しました。

# ※エラーが発生するコード@pytest.mark.asyncioasyncdeftest_sleep_request(aiohttp_client, aio_benchmark):@aio_benchmarkasyncdef_():        app_client=await aiohttp_client(create_app())        period=1        response=await app_client.get(f"/sleep?period={period}")assert response.status==200

benchmark: aiosqlite

最後は aiosqlite を用いた DB 接続です。

!

今回は SQLite を用いますが、databasesには他の DB のasyncioに対応したクライアントが用意されています。

これまで通り、まずはスリープから検証していきましょう。
DB にはスリープ関数がないため、自作関数を作成してスリープを実装します

# -----------------------------------------------------------# DBにアクセスして指定した時間だけスリープする# -----------------------------------------------------------defsleep_db_request(period:float)->None:with sqlite3.connect(":memory:")as db:        db.create_function("sleep",1, time.sleep)        cur= db.cursor()        cur.execute(f"SELECT sleep({period})")asyncdefasync_sleep_db_request(period:float)->None:asyncwith aiosqlite.connect(":memory:")as db:await db.create_function("sleep",1, time.sleep)        cur=await db.cursor()await cur.execute(f"SELECT sleep({period})")# -----------------------------------------------------------# DBにアクセスして指定した時間の数だけスリープする# -----------------------------------------------------------defmulti_sleep_db_requests(periods:list[float])->None:for periodin periods:        sleep_db_request(period)asyncdefasync_multi_sleep_db_requests(periods:list[float])->None:await asyncio.gather(*[async_sleep_db_request(period)for periodin periods])

テストケースも今まで通りです

deftest_sleep_db_request(benchmark):@benchmarkdef_():        sleep_db_request(0.2)@pytest.mark.asyncioasyncdeftest_async_sleep_db_request(aio_benchmark):@aio_benchmarkasyncdef_():await async_sleep_db_request(0.2)deftest_multi_sleep_requests(benchmark):@benchmarkdef_():        multi_sleep_db_requests([0.2,0.2,0.2])@pytest.mark.asyncioasyncdeftest_async_multi_sleep_requests(aio_benchmark):@aio_benchmarkasyncdef_():await async_multi_sleep_db_requests([0.2,0.2,0.2])

結果は以下の通りです。

----------------------------------------------------------------------------------------- benchmark: 4 tests -----------------------------------------------------------------------------------------Name (time in ms)                        Min                 Max                Mean            StdDev              Median               IQR            Outliers     OPS            Rounds  Iterations------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------test_sleep_db_request               201.3555 (1.0)      205.2846 (1.0)      203.7979 (1.0)      1.8179 (1.73)     204.9371 (1.0)      3.0303 (1.87)          1;0  4.9068 (1.0)           5           1test_async_sleep_db_request         204.7722 (1.02)     207.3065 (1.01)     206.2676 (1.01)     1.0537 (1.0)      206.7830 (1.01)     1.6162 (1.0)           1;0  4.8481 (0.99)          5           1test_async_multi_sleep_requests     209.6312 (1.04)     214.5691 (1.05)     211.9721 (1.04)     1.8173 (1.72)     212.0738 (1.03)     2.2477 (1.39)          2;0  4.7176 (0.96)          5           1test_multi_sleep_requests           607.1996 (3.02)     611.5772 (2.98)     609.8215 (2.99)     2.2275 (2.11)     611.2378 (2.98)     4.0439 (2.50)          2;0  1.6398 (0.33)          5           1------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

こちらも aiohttp の時と同様、並列処理の恩恵を受けていますね。

念の為クライアント側で重めの CPU 処理を追加して検証します。

# -----------------------------------------------------------# DB Network I/O と CPUタスクを行う# -----------------------------------------------------------defmix_db_io_cpu(periods:list[float], nums:list[int])->list[int]:    multi_sleep_db_requests(periods)return multi_sigma(nums)asyncdefasync_mix_db_io_cpu(periods:list[float], nums:list[int])->list[int]:    result=await asyncio.gather(        async_multi_sleep_db_requests(periods),        async_multi_sigma(nums),)return result[1]

テストコードもほとんど変更はありません

# -----------------------------------------------------------# DB Network I/O と CPUタスクを行うテスト(サイズ指定)# -----------------------------------------------------------deftest_mix_db_io_cpu(benchmark):@benchmarkdef_():        result= mix_db_io_cpu([1,1,1],[N_SIGMAfor _inrange(N_SIGMA_TIMES)],)assert result==[V_SIGMAfor _inrange(N_SIGMA_TIMES)]@pytest.mark.asyncioasyncdeftest_async_mix_db_io_cpu(aio_benchmark):@aio_benchmarkasyncdef_():        result=await async_mix_db_io_cpu([1,1,1],[N_SIGMAfor _inrange(N_SIGMA_TIMES)],)assert result==[V_SIGMAfor _inrange(N_SIGMA_TIMES)]

結果は以下の通りです。

---------------------------------------------------------------------------------- benchmark: 2 tests ---------------------------------------------------------------------------------Name (time in s)                Min               Max              Mean            StdDev            Median               IQR            Outliers     OPS            Rounds  Iterations---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------test_async_mix_db_io_cpu     1.0220 (1.0)      1.0400 (1.0)      1.0344 (1.0)      0.0072 (1.0)      1.0371 (1.0)      0.0076 (1.0)           1;0  0.9668 (1.0)           5           1test_mix_db_io_cpu           3.0259 (2.96)     3.0451 (2.93)     3.0336 (2.93)     0.0081 (1.13)     3.0320 (2.92)     0.0135 (1.78)          1;0  0.3296 (0.34)          5           1---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

やはり想定通り、aiosqlite でも並列処理の恩恵を受けることを確認できました。

まとめ

今回は 「asyncio で並列処理をすることで本当に処理が早くなるか」をベンチマークテストで確認しました。aiofiles のみ処理が速くなることは確認できませんでしたが、aiohttp, aiosqlite では並列処理の恩恵を受けることが確認できました。

FastAPI をはじめとする ASGI 対応の Web フレームワークでの DB 接続やネットワーク I/O では積極的に asyncio、コルーチンを使っていくことで処理の高速化が図れそうです。

aiofiles を用いたベンチマークで結果が想定通りにならなかった件については、そもそも aiofiles では処理が速くならないのか、それとも実装に問題があるのか、もしわかる方がいればコメント頂けると幸いです。

alivelimb

SIer / Python / AWS認定 x13 / GCP認定 x11※Google Analytics, Amazonアソシエイトを利用しています

バッジを贈って著者を応援しよう

バッジを受け取った著者にはZennから現金やAmazonギフトカードが還元されます。

hankei6kmhankei6km

aiofiles を用いたベンチマークで結果が想定通りにならなかった件については、そもそも aiofiles では処理が速くならないのか、それとも実装に問題があるのか、もしわかる方がいればコメント頂けると幸いです。

Python と asyncio を使ったのが数年前なので間違っていたらすみません。

今回のケースでは ThreadPoolExecutor の最大ワーカースレッド数に引っかかているのでないでしょうか?

issue の記述とソースをざっと読んだ感じではaiofileswrite()run_in_executor() をデフォルトのエクゼキューターで使っていると思われます。

What aiofiles does is delegate the file reading operations to a threadpool.

https://github.com/Tinche/aiofiles/blob/6c29e3d98d7e25ae9f58f052086ac9bbd7633ddd/src/aiofiles/threadpool/utils.py#L41-L47

write() はブロッキング処理になるので、これを多用するとスレッドプールを消費してしまうはずです。

ちょっと趣旨は違いますが下記の記事が参考になるかと思います。

https://pod.hatenablog.com/entry/2019/03/21/162511

一方でrun_in_executor() には下記のような記述があります。

https://docs.python.org/ja/3/library/asyncio-eventloop.html#asyncio.loop.run_in_executor

バージョン 3.5.3 で変更:loop.run_in_executor() は内部で生成するスレッドプールエグゼキュータのmax_workers を設定せず、代わりにスレッドプールエグゼキュータ (ThreadPoolExecutor) にデフォルト値を設定させるようになりました。

さらに ThreadPoolExecutor の方を見ると、おそらく最新の Python では下記がデフォルトになっていると思われます。

https://docs.python.org/ja/3/library/concurrent.futures.html#threadpoolexecutor

バージョン 3.8 で変更: Default value of max_workers is changed tomin(32, os.cpu_count() + 4). This default value preserves at least 5 workers for I/O bound tasks. It utilizes at most 32 CPU cores for CPU bound tasks which release the GIL. And it avoids using very large resources implicitly on many-core machines.

これは、Codespace(CPU 4 コア) で確認すると下記のように最大ワーカースレッド数が 8 ということになります。

$ grep "model name" < /proc/cpuinfomodel name      : Intel(R) Xeon(R) Platinum 8168 CPU @ 2.70GHzmodel name      : Intel(R) Xeon(R) Platinum 8168 CPU @ 2.70GHzmodel name      : Intel(R) Xeon(R) Platinum 8168 CPU @ 2.70GHzmodel name      : Intel(R) Xeon(R) Platinum 8168 CPU @ 2.70GHz$ python3Python 3.8.10 (default, Jun 22 2022, 20:18:18) [GCC 9.4.0] on linuxType "help", "copyright", "credits" or "license" for more information.>>> import os>>> min(32, os.cpu_count() + 4)8

もしも測定されている環境での最大ワーカースレッド数に余裕がなさそうでしたら、I/O の並行数をら減らしてみるのはいかがでしょうか?

3
alivelimbalivelimb

コメント頂きありがとうございます!
(Pythonのthreadに関する理解が足りておらず、共有いただいた資料などを読ませて頂いていた都合で回答が遅れてしまいました...)

まず私の環境での最大ワーカスレッド数は以下で確認した通り12となります

$ python -c 'import os; print(min(32, os.cpu_count() + 4))'12

もしも測定されている環境での最大ワーカースレッド数に余裕がなさそうでしたら、I/O の並行数を減らしてみるのはいかがでしょうか?

こちらの「最大ワーカスレッド数に余裕がないか」の検証方法が分からず検証出来ていないのですが、「I/O の並行数を減らしてみる」という部分は以下の記事を参考にSemaphoreを用いて検証してみました。

asyncdefasync_create_file_with_data_sema(path: Path, data:bytes, sema: asyncio.Semaphore)->None:asyncwith aiofiles.open(path.as_posix(),"wb")as f, sema:await f.write(data)# テスト対象コードasyncdefasync_create_multi_files_with_data_sema(paths:list[Path], data:bytes, limit:int)->None:    sema= asyncio.Semaphore(limit)await asyncio.gather(*[async_create_file_with_data_sema(path, data, sema)for pathin paths])# テストコード@pytest.mark.asyncioasyncdeftest_async_create_multi_files_with_data_sema(aio_benchmark, named_tempfiles, huge_bytes):@aio_benchmarkasyncdef_():await async_create_multi_files_with_data_sema([Path(named_tempfile.name)for named_tempfilein named_tempfiles], huge_bytes,12)

limit数は12以外にも4, 8でも検証しましたが、結果としては速度は改善せずでした。。。
「hankei6kmさんが想定していた検証方法ではないかも...?」とも思っているので、お時間ある際に再びコメント頂けると幸いです。

1
hankei6kmhankei6km

ご返答ありがとうございます。

こちらの書き方が曖昧だったばかりにお手間をとらせてしまってすみません。

limit数は12以外にも4, 8でも検証しましたが、結果としては速度は改善せずでした。。。
「hankei6kmさんが想定していた検証方法ではないかも...?」とも思っているので、お時間ある際に再びコメント頂けると幸いです。

想定していた通りですが、もう少し単純に「CREATE_FILE_NUM を最大ワーカースレッド数 -3 くらいにしてみたらいかかでしょうか」という意味で書いていました。
(-3 はとくに意味があるわけではなく、メインスレッドの他に pytest が何かスレッドを生成している可能性を考慮しての値です)

今回は予想だけで書いてしまうのも申し訳ないので実際に環境を作って試してみました。

https://github.com/hankei6km/test-python-asyncio

前述の Codespace(最大ワーカースレッド数 8)の環境でCREATE_FILE_NUM = 5 にしてみましたがこちらでも違いは出ませんでした。

# CREATE_FILE_NUM = 10CREATE_FILE_NUM=5

pytest -k 'multi' で実行しています。

----------------------------------------------------------------------------------------- benchmark: 2 tests -----------------------------------------------------------------------------------------Name (time in s)                               Min               Max              Mean            StdDev            Median               IQR            Outliers     OPS            Rounds  Iterations------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------test_create_multi_files_with_data           6.5082 (1.0)      6.6423 (1.01)     6.5706 (1.0)      0.0513 (2.17)     6.5615 (1.0)      0.0734 (1.62)          2;0  0.1522 (1.0)           5           1test_async_create_multi_files_with_data     6.5482 (1.01)     6.5976 (1.0)      6.5707 (1.00)     0.0236 (1.0)      6.5645 (1.00)     0.0453 (1.0)           1;0  0.1522 (1.00)          5           1------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

I/O 性能の頭打ちかと思い、iotop で様子を眺めていたのですが、同期と非同期どちらも似たような感じになります。

同期処理時の iostat のスクリーンショット
同期処理時

非同期理時の iostat のスクリーンショット
非同期処理時(write にあわせてスレッドが増えています)

スクリーンショットだと非同期の方が良い値ですが、どちらも Current は 200 M/s を前後しています。

それならばと、ためしに書き出すファイルを異なるボリュームに分散させてみましたが、同期と非同期ともに速くなってしまったのでこちらも違いはでませんでした。
(これ以降もCREATE_FILE_NUM = 5 で試しています)

$df /tmp /home/vscode/tmp/Filesystem     1K-blocks    Used Available Use% Mounted on/dev/sda1       32845584     352  31151232   1%/tmpoverlay         32847680 2353564  28800020   8% /

偶数のときは/tmp、奇数のときは/home/vscode/tmp に書き込みます。

@pytest.fixture(scope="function")defnamed_tempfiles():    files=[NamedTemporaryFile("w",dir="/tmp"if i%2==0else"/home/vscode/tmp")for iinrange(CREATE_FILE_NUM)]# files = [NamedTemporaryFile("w", dir="/home/vscode/tmp")#          for _ in range(CREATE_FILE_NUM)]# files = [NamedTemporaryFile("w") for _ in range(CREATE_FILE_NUM)]yield filesfor fwin files:        fw.close()
----------------------------------------------------------------------------------------- benchmark: 2 tests -----------------------------------------------------------------------------------------Name (time in s)                               Min               Max              Mean            StdDev            Median               IQR            Outliers     OPS            Rounds  Iterations------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------test_create_multi_files_with_data           3.8486 (1.0)      4.0886 (1.02)     3.9493 (1.0)      0.1022 (2.20)     3.9499 (1.0)      0.1736 (2.50)          1;0  0.2532 (1.0)           5           1test_async_create_multi_files_with_data     3.8907 (1.01)     4.0031 (1.0)      3.9608 (1.00)     0.0464 (1.0)      3.9607 (1.00)     0.0695 (1.0)           1;0  0.2525 (1.00)          5           1------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

もっと性能良くないとダメかなということで、続いてtmpfs(メモリー上に作られるファイルシステムと思ってください)をマウントし試してみるとようやく違いがでました。

$sudomount-t tmpfs-osize=4000m tmpfs /home/vscode/tmp/$df /home/vscode/tmp/Filesystem     1K-blocks  Used Available Use% Mounted ontmpfs            4096000     0   4096000   0% /home/vscode/tmp

全部のファイルを tmpfs へ書き込みます。

@pytest.fixture(scope="function")defnamed_tempfiles():# files = [NamedTemporaryFile(#     "w", dir="/tmp" if i % 2 == 0 else "/home/vscode/tmp") for i in range(CREATE_FILE_NUM)]    files=[NamedTemporaryFile("w",dir="/home/vscode/tmp")for _inrange(CREATE_FILE_NUM)]# files = [NamedTemporaryFile("w") for _ in range(CREATE_FILE_NUM)]yield filesfor fwin files:        fw.close()
--------------------------------------------------------------------------------------------- benchmark: 2 tests ----------------------------------------------------------------------------------------------Name (time in ms)                                Min                 Max                Mean             StdDev              Median                IQR            Outliers     OPS            Rounds  Iterations----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------test_async_create_multi_files_with_data     270.3285 (1.0)      283.7868 (1.0)      275.1938 (1.0)       5.4192 (1.0)      275.0887 (1.0)       7.2456 (1.0)           1;0  3.6338 (1.0)           5           1test_create_multi_files_with_data           647.1230 (2.39)     683.4950 (2.41)     660.6459 (2.40)     14.2596 (2.63)     655.9037 (2.38)     18.3033 (2.53)          1;0  1.5137 (0.42)          5           1----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

5 倍まではいきませんが、速くはなったと言える値かと思います。

この状態だと最大ワーカースレッド数を変更すると若干ですが速度に影響がありました。

threadPool= ThreadPoolExecutor(max_workers=2)asyncdefasync_create_file_with_data(path: Path, data:bytes)->None:# async with aiofiles.open(path.as_posix(), "wb") as f:asyncwith aiofiles.open(path.as_posix(),"wb", executor=threadPool)as f:await f.write(data)
---------------------------------------------------------------------------------------------- benchmark: 2 tests ----------------------------------------------------------------------------------------------Name (time in ms)                                Min                 Max                Mean             StdDev              Median                IQR            Outliers     OPS            Rounds  Iterations----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------test_async_create_multi_files_with_data     385.8565 (1.0)      433.2469 (1.0)      397.1514 (1.0)      20.2649 (2.14)     387.9202 (1.0)      14.3998 (1.95)          1;1  2.5179 (1.0)           5           1test_create_multi_files_with_data           649.6608 (1.68)     673.3505 (1.55)     656.7036 (1.65)      9.4867 (1.0)      652.8568 (1.68)      7.3829 (1.0)           1;1  1.5228 (0.60)          5           1----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

そのようなわけで当初の「最大ワーカースレッド数に引っかかっているのでは?」はあまり正しくありませんでした。

まとめると以下のようになるかと思います。

  • Python でファイル I/O を並行化するにはスレッドを使う必要がある
    • aiofiles でも内部的にはスレッドを使っている
  • Python の write をマルチスレッド化しても(おそらくは)I/O の性能に左右される
    • tmpfs などの高速な同時アクセス性能が良いファイルシステムを使うと速度が向上しスレッド数が影響してくる

ただし、I/O 性能が原因という決定的な証拠も見つからなかったというのが正直なところです(「I/O 時にはスレッドのロックが開放されるので速くなるけど、やっぱりファイルの操作は遅いよ」という記述が見つかる程度です)。

最終的に何がネックなっているのか(Python のスレッド制御なのか、I/O性能なのかなど)はイマイチはっきりしていないのが申し訳ないのですが、試した範囲でわかったことをまとめてみました。

下記のコメントに追記しましたが、今回の環境では I/O 性能(同時アクセスの性能)がネックで並行化しても思ったように速くならなかったと考えられます。

hankei6kmhankei6km

小出しになってしまってすみません、追記です。

本当に I/O が頭打ちだったのか気になったのでdd コマンドを使って簡単に試してみました。

実行環境は上記のベンチマークを動かした Codespace を使っています。

まず、各ファイルへ順次書き出す処理です。
(/tmp/test?.tmp\0 を 2**28 バイト書き出しています)

ddif=/dev/zerobs=268435456count=1of=/tmp/test1.tmpddif=/dev/zerobs=268435456count=1of=/tmp/test2.tmpddif=/dev/zerobs=268435456count=1of=/tmp/test3.tmpddif=/dev/zerobs=268435456count=1of=/tmp/test4.tmpddif=/dev/zerobs=268435456count=1of=/tmp/test5.tmp

新規でファイル作成すると 900MB/s くらいはでます(後半、なぜ遅くなっているかは不明です)

1+0 records in1+0 records out268435456 bytes (268 MB, 256 MiB) copied, 0.289604 s, 927 MB/s1+0 records in1+0 records out268435456 bytes (268 MB, 256 MiB) copied, 0.288978 s, 929 MB/s1+0 records in1+0 records out268435456 bytes (268 MB, 256 MiB) copied, 0.291378 s, 921 MB/s1+0 records in1+0 records out268435456 bytes (268 MB, 256 MiB) copied, 0.90479 s, 297 MB/s1+0 records in1+0 records out268435456 bytes (268 MB, 256 MiB) copied, 1.08273 s, 248 MB/s

上書き(削除 & 作成)すると 200MB/s 前後になります。

1+0 records in1+0 records out268435456 bytes (268 MB, 256 MiB) copied, 1.07726 s, 249 MB/s1+0 records in1+0 records out268435456 bytes (268 MB, 256 MiB) copied, 1.28004 s, 210 MB/s1+0 records in1+0 records out268435456 bytes (268 MB, 256 MiB) copied, 1.27764 s, 210 MB/s1+0 records in1+0 records out268435456 bytes (268 MB, 256 MiB) copied, 1.20428 s, 223 MB/s1+0 records in1+0 records out268435456 bytes (268 MB, 256 MiB) copied, 1.37003 s, 196 MB/s

続いて並行化した場合です。

ddif=/dev/zerobs=268435456count=1of=/tmp/test1.tmp&ddif=/dev/zerobs=268435456count=1of=/tmp/test2.tmp&ddif=/dev/zerobs=268435456count=1of=/tmp/test3.tmp&ddif=/dev/zerobs=268435456count=1of=/tmp/test4.tmp&ddif=/dev/zerobs=268435456count=1of=/tmp/test5.tmp

こちらも最初の 1 回は速くなりますが、5 で割ったような値になります。

1+0 records in1+0 records out268435456 bytes (268 MB, 256 MiB) copied, 0.949643 s, 283 MB/s1+0 records in1+0 records out268435456 bytes (268 MB, 256 MiB) copied, 1.44472 s, 186 MB/s1+0 records in1+0 records out268435456 bytes (268 MB, 256 MiB) copied, 1.61763 s, 166 MB/s1+0 records in1+0 records out268435456 bytes (268 MB, 256 MiB) copied, 1.81 s, 148 MB/s1+0 records in1+0 records out268435456 bytes (268 MB, 256 MiB) copied, 2.26454 s, 119 MB/s

上書き(削除 & 作成)すると 200MB/s を 5 で割ったよりかは少し良い値ですが、全体の経過時間では 6.3 秒なのでベンチマークとあまり違いはありません。

1+0 records in1+0 records out268435456 bytes (268 MB, 256 MiB) copied, 5.0185 s, 53.5 MB/s1+0 records in1+0 records out268435456 bytes (268 MB, 256 MiB) copied, 5.07003 s, 52.9 MB/s1+0 records in1+0 records out268435456 bytes (268 MB, 256 MiB) copied, 5.9657 s, 45.0 MB/s1+0 records in1+0 records out268435456 bytes (268 MB, 256 MiB) copied, 6.2768 s, 42.8 MB/s1+0 records in1+0 records out268435456 bytes (268 MB, 256 MiB) copied, 6.38367 s, 42.1 MB/s

以上のように asyncio のベンチマークのときと比べて大きな違いはないようでした。

このことから、開発マシンの/tmp などに書き出した場合は asyncio に限らずファイル書き出しの並行化のメリットはあまり受けられないことになるかと思われます。

一方で tmpfs の場合は並行化してもパフォーマンスは 5 で割った値にまでは落ち込みませんでした。

順次書き出した場合。

1+0 records in1+0 records out268435456 bytes (268 MB, 256 MiB) copied, 0.223789 s, 1.2 GB/s1+0 records in1+0 records out268435456 bytes (268 MB, 256 MiB) copied, 0.226139 s, 1.2 GB/s1+0 records in1+0 records out268435456 bytes (268 MB, 256 MiB) copied, 0.235712 s, 1.1 GB/s1+0 records in1+0 records out268435456 bytes (268 MB, 256 MiB) copied, 0.233061 s, 1.2 GB/s1+0 records in1+0 records out268435456 bytes (268 MB, 256 MiB) copied, 0.237924 s, 1.1 GB/s

並行して書き出した場合

1+0 records in1+0 records out268435456 bytes (268 MB, 256 MiB) copied, 0.338262 s, 794 MB/s1+0 records in1+0 records out268435456 bytes (268 MB, 256 MiB) copied, 0.417389 s, 643 MB/s1+0 records in1+0 records out268435456 bytes (268 MB, 256 MiB) copied, 0.403607 s, 665 MB/s1+0 records in1+0 records out268435456 bytes (268 MB, 256 MiB) copied, 0.498165 s, 539 MB/s1+0 records in1+0 records out268435456 bytes (268 MB, 256 MiB) copied, 0.49274 s, 545 MB/s

これは速度的な性能というよりも、ファイルシステムの構成上(メモリー上に作成されるので)「同時アクセスの性能が良い」という感じかと思われます。

よって、asycio などによる並行処理でファイル処理を並行化した場合、このような同時アクセスで性能が落ちないストレージならばある程度の速度向上が見込めると考えられます。
(例、冗長化されたディスクなど)

1
alivelimbalivelimb

様々な検証を試して頂きありがとうございます!
読ませて頂いていて「なるほど、こうやって検証する方法があるのか」と非常に勉強になりました。

asycio などによる並行処理でファイル処理を並行化した場合、このような同時アクセスで性能が落ちないストレージならばある程度の速度向上が見込めると考えられます。

なるほど、asyncio, aiofilesの問題というよりはファイルシステム側の問題の可能性が高そうですね。記事本文内でも紹介したaiofilesのIssueでは「処理が早くはならないかも知れないが、メインスレッドで別の処理を行えることが出来るのが利点だ」という記述があり、「処理は早くならないの...?なぜ?」という疑問があったのですが、腑に落ちる原因がありそうでスッキリしました。

hankei6kmhankei6km

私も漠然と「ファイル I/O も並行化すればある程度は速くなるだろう」と考えていたので、実際に手を動かして検証できたのは良い体験でした。

ありがとうございました。

1
cartcart

ベンチマーク取ってませんが、CPU Boundのasync_sigma関数は内部で待ち状態が発生しないので無意味な比較になっていると思います。内部で待たないのでasyncio.gatherしても別のtaskへの移動は発生せず逐次実行になります。

例えば、async_sigma関数の中にprint関数を追加してgatherしても次のような逐次実行になります。

asyncdefasync_sigma(num:int)->int:print(f"async sigma({num}) start")    v=0for iinrange(num+1):print(f"async sigma({num})", i)        v+= iprint(f"async sigma({num}) end")return vasyncdefasync_multi_sigma(nums:list[int])->list[int]:returnawait asyncio.gather(*[async_sigma(num)for numin nums])await async_multi_sigma([5,3,2])
async sigma(5) startasync sigma(5) 0async sigma(5) 1async sigma(5) 2async sigma(5) 3async sigma(5) 4async sigma(5) 5async sigma(5) endasync sigma(3) startasync sigma(3) 0async sigma(3) 1async sigma(3) 2async sigma(3) 3async sigma(3) endasync sigma(2) startasync sigma(2) 0async sigma(2) 1async sigma(2) 2async sigma(2) end[0]: [15, 6, 3]

並行(concurrent)処理になるように中で待たせる簡単な方法としては、await asyncio.sleep(0)をforの中で呼ぶと一時待ち状態になり別のtaskに移動して並行処理になります。

asyncdefasync_sigma(num:int)->int:print(f"async sigma({num}) start")    v=0for iinrange(num+1):print(f"async sigma({num})", i)        v+= iawait asyncio.sleep(0)print(f"async sigma({num}) end")return vasyncdefasync_multi_sigma(nums:list[int])->list[int]:returnawait asyncio.gather(*[async_sigma(num)for numin nums])await async_multi_sigma([5,3,2])
async sigma(5) startasync sigma(5) 0async sigma(3) startasync sigma(3) 0async sigma(2) startasync sigma(2) 0async sigma(5) 1async sigma(3) 1async sigma(2) 1async sigma(5) 2async sigma(3) 2async sigma(2) 2async sigma(5) 3async sigma(3) 3async sigma(2) endasync sigma(5) 4async sigma(3) endasync sigma(5) 5async sigma(5) end[1]: [15, 6, 3]

これで速くなるかは不明ですが、並行処理として比較するなら必要かと思います。ややこしいんですがgatherだけでは並行にも並列(parallel)にもならないようです。

また、上のコメントで速くなっていたのはシングルスレッドの並行処理でなくマルチスレッド(ThreadPool)による並列(parallel)処理になっているからだと思われます。一方でCPU Boundの処理はProcessPoolに入れると速くなるのでおそらく実感できるはずです。(下記のドキュメント参照)

https://docs.python.org/ja/3/library/asyncio-eventloop.html#executing-code-in-thread-or-process-pools

このawaitable loop.run_in_executor(executor, func, *args)はおそらく低水準APIに設計されているようなのですがもう少し雑に使える高水準APIとしてcoroutine asyncio.to_thread(func, /, *args, **kwargs)があります。おそらく我々がgatherに求めてるのはこいつで、与えた関数(コルーチンじゃダメ) がスレッドになり、gatherにまとめて渡すと並列(parallel)実行になります。これでも速度の実感ができるはずです。
loop.run_in_executor(executor, func, *args)のThreadPoolを使った場合とどういう差があるのかまでは自分もよくわかってませんが)

https://docs.python.org/ja/3.11/library/asyncio-task.html#running-in-threads

SIer / Python / AWS認定 x13 / GCP認定 x11※Google Analytics, Amazonアソシエイトを利用しています


[8]ページ先頭

©2009-2025 Movatter.jp