concurrent.futures --- 啟動平行任務

在 3.2 版被加入.

原始碼:Lib/concurrent/futures/thread.pyLib/concurrent/futures/process.py


concurrent.futures 模組提供了一個高階介面來非同步地 (asynchronously) 執行可呼叫物件 (callable) 。

非同步執行可以透過ThreadPoolExecutor 來使用執行緒 (thread) 執行,或透過ProcessPoolExecutor 來使用單獨行程 (process) 執行。兩者都實作了相同的介面,該介面由抽象的Executor 類別定義。

適用: not WASI.

此模組在 WebAssembly 平台上不起作用或無法使用。更多資訊請參閱WebAssembly 平台

Executor 物件

classconcurrent.futures.Executor

提供非同步執行呼叫方法的抽象類別。不應直接使用它,而應透過其具體子類別來使用。

submit(fn,/,*args,**kwargs)

為可呼叫物件fn 排程來以fn(*args,**kwargs) 的形式執行並回傳一個表示可呼叫的執行的Future 物件。

withThreadPoolExecutor(max_workers=1)asexecutor:future=executor.submit(pow,323,1235)print(future.result())
map(fn,*iterables,timeout=None,chunksize=1)

類似於map(fn,*iterables),除了:

  • iterables 立即被收集而不是延遲 (lazily) 收集;

  • fn 是非同步執行的,並且對fn 的多次呼叫可以並行處理。

如果__next__() 被呼叫,且在原先呼叫Executor.map()timeout 秒後結果仍不可用,回傳的疊代器就會引發TimeoutErrortimeout 可以是整數或浮點數。如果未指定timeout 或為None,則等待時間就不會有限制。

如果fn 呼叫引發例外,則當從疊代器中檢索到它的值時將引發該例外。

使用ProcessPoolExecutor 時,此方法將iterables 分成許多分塊 (chunks),並將其作為獨立的任務來提交給池 (pool)。可以透過將chunksize 設定為正整數來指定這些分塊的(約略)大小。對於非常長的可疊代物件,chunksize 使用較大的值(與預設大小 1 相比)可以顯著提高性能。對於ThreadPoolExecutorchunksize 無效。

在 3.5 版的變更:新增chunksize 引數。

shutdown(wait=True,*,cancel_futures=False)

向 executor 發出訊號 (signal),表明它應該在目前未定 (pending) 的 future 完成執行時釋放它正在使用的任何資源。在關閉後呼叫Executor.submit()Executor.map() 將引發RuntimeError

如果waitTrue 則此方法將不會回傳,直到所有未定的 futures 完成執行並且與 executor 關聯的資源都被釋放。如果waitFalse 則此方法將立即回傳,並且當所有未定的 future 執行完畢時,與 executor 關聯的資源將被釋放。不管wait 的值如何,整個 Python 程式都不會退出,直到所有未定的 futures 執行完畢。

如果cancel_futuresTrue,此方法將取消 executor 尚未開始運行的所有未定 future。無論cancel_futures 的值如何,任何已完成或正在運行的 future 都不會被取消。

如果cancel_futureswait 都為True,則 executor 已開始運行的所有 future 將在此方法回傳之前完成。剩餘的 future 被取消。

如果使用with 陳述句,你就可以不用明確地呼叫此方法,這將會自己關閉Executor(如同呼叫Executor.shutdown()wait 被設定為True 般等待):

importshutilwithThreadPoolExecutor(max_workers=4)ase:e.submit(shutil.copy,'src1.txt','dest1.txt')e.submit(shutil.copy,'src2.txt','dest2.txt')e.submit(shutil.copy,'src3.txt','dest3.txt')e.submit(shutil.copy,'src4.txt','dest4.txt')

在 3.9 版的變更:新增cancel_futures

ThreadPoolExecutor

ThreadPoolExecutor 是一個Executor 子類別,它使用執行緒池來非同步地執行呼叫。

當與Future 關聯的可呼叫物件等待另一個Future 的結果時,可能會發生死鎖 (deadlock)。例如:

importtimedefwait_on_b():time.sleep(5)print(b.result())# b will never complete because it is waiting on a.return5defwait_on_a():time.sleep(5)print(a.result())# a will never complete because it is waiting on b.return6executor=ThreadPoolExecutor(max_workers=2)a=executor.submit(wait_on_b)b=executor.submit(wait_on_a)

和:

defwait_on_future():f=executor.submit(pow,5,2)# This will never complete because there is only one worker thread and# it is executing this function.print(f.result())executor=ThreadPoolExecutor(max_workers=1)executor.submit(wait_on_future)
classconcurrent.futures.ThreadPoolExecutor(max_workers=None,thread_name_prefix='',initializer=None,initargs=())

一個Executor 子類別,它使用最多有max_workers 個執行緒的池來非同步地執行呼叫。

所有排隊到ThreadPoolExecutor 的執行緒都將在直譯器退出之前加入。請注意,執行此操作的退出處理程式會在任何使用atexit 新增的退出處理程式之前執行。這意味著必須捕獲並處理主執行緒中的例外,以便向執行緒發出訊號來正常退出 (gracefully exit)。因此,建議不要將ThreadPoolExecutor 用於長時間運行的任務。

initializer 是一個可選的可呼叫物件,在每個工作執行緒開始時呼叫;initargs 是傳遞給 initializer 的引數元組 (tuple)。如果initializer 引發例外,所有目前未定的作業以及任何向池中提交 (submit) 更多作業的嘗試都將引發BrokenThreadPool

在 3.5 版的變更:如果max_workersNone 或未給定,它將預設為機器上的處理器數量乘以5,這假定了ThreadPoolExecutor 通常用於 I/O 重疊而非 CPU 密集的作業,並且 worker 的數量應該高於ProcessPoolExecutor 的 worker 數量。

在 3.6 版的變更:新增thread_name_prefix 參數以允許使用者控制由池所建立的工作執行緒 (worker thread) 的threading.Thread 名稱,以便於除錯。

在 3.7 版的變更:新增initializerinitargs 引數。

在 3.8 版的變更:max_workers 的預設值改為min(32,os.cpu_count()+4)。此預設值為 I/O 密集任務至少保留了 5 個 worker。它最多使用 32 個 CPU 核心來執行CPU 密集任務,以釋放 GIL。並且它避免了在多核機器上隱晦地使用非常大量的資源。

ThreadPoolExecutor 現在在啟動max_workers 工作執行緒之前會重用 (reuse) 空閒的工作執行緒。

在 3.13 版的變更:Default value ofmax_workers is changed tomin(32,(os.process_cpu_count()or1)+4).

ThreadPoolExecutor 範例

importconcurrent.futuresimporturllib.requestURLS=['http://www.foxnews.com/','http://www.cnn.com/','http://europe.wsj.com/','http://www.bbc.co.uk/','http://nonexistent-subdomain.python.org/']# Retrieve a single page and report the URL and contentsdefload_url(url,timeout):withurllib.request.urlopen(url,timeout=timeout)asconn:returnconn.read()# We can use a with statement to ensure threads are cleaned up promptlywithconcurrent.futures.ThreadPoolExecutor(max_workers=5)asexecutor:# Start the load operations and mark each future with its URLfuture_to_url={executor.submit(load_url,url,60):urlforurlinURLS}forfutureinconcurrent.futures.as_completed(future_to_url):url=future_to_url[future]try:data=future.result()exceptExceptionasexc:print('%r generated an exception:%s'%(url,exc))else:print('%r page is%d bytes'%(url,len(data)))

ProcessPoolExecutor

ProcessPoolExecutor 類別是一個Executor 的子類別,它使用行程池來非同步地執行呼叫。ProcessPoolExecutor 使用了multiprocessing 模組,這允許它避開全域直譯器鎖 (Global Interpreter Lock),但也意味著只能執行和回傳可被 pickle 的 (picklable) 物件。

__main__ 模組必須可以被工作子行程 (worker subprocess) 引入。這意味著ProcessPoolExecutor 將無法在交互式直譯器 (interactive interpreter) 中工作。

從提交給ProcessPoolExecutor 的可呼叫物件中呼叫ExecutorFuture 方法將導致死鎖。

classconcurrent.futures.ProcessPoolExecutor(max_workers=None,mp_context=None,initializer=None,initargs=(),max_tasks_per_child=None)

一個Executor 子類別,它使用了最多有max_workers 個行程的池來非同步地執行呼叫。如果max_workersNone 或未給定,它將被預設為os.process_cpu_count()。如果max_workers 小於或等於0,則會引發ValueError。在 Windows 上,max_workers 必須小於或等於61。如果不是,則會引發ValueError。如果max_workersNone,則預設選擇最多為61,即便有更多處理器可用。mp_context 可以是multiprocessing 情境 (context) 或None。它將用於啟動 worker。如果mp_contextNone 或未給定,則使用預設的multiprocessing 情境。請見Contexts and start methods

initializer 是一個可選的可呼叫物件,在每個工作行程 (worker process) 開始時呼叫;initargs 是傳遞給 initializer 的引數元組。如果initializer 引發例外,所有目前未定的作業以及任何向池中提交更多作業的嘗試都將引發BrokenProcessPool

max_tasks_per_child 是一個可選引數,它指定單個行程在退出並被新的工作行程替換之前可以執行的最大任務數。預設情況下max_tasks_per_childNone,這意味著工作行程的生命週期將與池一樣長。當指定最大值時,在沒有mp_context 參數的情況下,將預設使用 "spawn" 做為 multiprocessing 啟動方法。此功能與 "fork" 啟動方法不相容。

在 3.3 版的變更:當其中一個工作行程突然終止時,現在會引發BrokenProcessPool 錯誤。在過去,此行為是未定義的 (undefined),但對 executor 或其 future 的操作經常會發生凍結或死鎖。

在 3.7 版的變更:新增了mp_context 引數以允許使用者控制由池所建立的工作行程的 start_method。

新增initializerinitargs 引數。

備註

預設的multiprocessing 啟動方法(請參閱Contexts and start methods)將不再是 Python 3.14 中的fork。需要fork 用於其ProcessPoolExecutor 的程式碼應透過傳遞mp_context=multiprocessing.get_context("fork") 參數來明確指定。

在 3.11 版的變更:新增了max_tasks_per_child 引數以允許使用者控制池中 worker 的生命週期。

在 3.12 版的變更:在 POSIX 系統上,如果你的應用程式有多個執行緒並且multiprocessing 情境使用了"fork" 啟動方法:內部呼叫以產生 worker 的os.fork() 函式可能會引發DeprecationWarning。傳遞一個mp_context 以配置為使用不同的啟動方法。更多說明請參閱os.fork() 文件。

在 3.13 版的變更:max_workers usesos.process_cpu_count() by default, instead ofos.cpu_count().

ProcessPoolExecutor 範例

importconcurrent.futuresimportmathPRIMES=[112272535095293,112582705942171,112272535095293,115280095190773,115797848077099,1099726899285419]defis_prime(n):ifn<2:returnFalseifn==2:returnTrueifn%2==0:returnFalsesqrt_n=int(math.floor(math.sqrt(n)))foriinrange(3,sqrt_n+1,2):ifn%i==0:returnFalsereturnTruedefmain():withconcurrent.futures.ProcessPoolExecutor()asexecutor:fornumber,primeinzip(PRIMES,executor.map(is_prime,PRIMES)):print('%d is prime:%s'%(number,prime))if__name__=='__main__':main()

Future 物件

Future 類別封裝了可呼叫物件的非同步執行。Future 實例由Executor.submit() 建立。

classconcurrent.futures.Future

封裝可呼叫物件的非同步執行。Future 實例由Executor.submit() 建立,且除測試外不應直接建立。

cancel()

嘗試取消呼叫。如果呼叫目前正在執行或已完成運行且無法取消,則該方法將回傳False,否則呼叫將被取消並且該方法將回傳True

cancelled()

如果該呼叫成功被取消,則回傳True

running()

如果呼叫正在執行且無法取消,則回傳True

done()

如果呼叫成功被取消或結束運行,則回傳True

result(timeout=None)

回傳該呼叫回傳的值。如果呼叫尚未完成,則此方法將等待至多timeout 秒。如果呼叫在timeout 秒內未完成,則會引發TimeoutErrortimeout 可以是整數或浮點數。如果未指定timeout 或為None,則等待時間就不會有限制。

如果 future 在完成之前被取消,那麼CancelledError 將被引發。

如果該呼叫引發了例外,此方法將引發相同的例外。

exception(timeout=None)

回傳該呼叫引發的例外。如果呼叫尚未完成,則此方法將等待至多timeout 秒。如果呼叫在timeout 秒內未完成,則會引發TimeoutErrortimeout 可以是整數或浮點數。如果未指定timeout 或為None,則等待時間就不會有限制。

如果 future 在完成之前被取消,那麼CancelledError 將被引發。

如果呼叫在沒有引發的情況下完成,則回傳None

add_done_callback(fn)

將可呼叫的fn 附加到 future 上。當 future 被取消或完成運行時,fn 將被以 future 作為其唯一引數來呼叫。

新增的可呼叫物件按新增順序呼叫,並且始終在屬於新增它們的行程的執行緒中呼叫。如果可呼叫物件引發Exception 子類別,它將被記錄 (log) 並忽略。如果可呼叫物件引發BaseException 子類別,該行為未定義。

如果 future 已經完成或被取消,fn 將立即被呼叫。

以下Future 方法旨在用於單元測試和Executor 實作。

set_running_or_notify_cancel()

此方法只能在與Future 關聯的工作被執行之前於Executor 實作中呼叫,或者在單元測試中呼叫。

如果該方法回傳FalseFuture 已被取消,即Future.cancel() 被呼叫並回傳True。任何等待Future 完成的執行緒(即透過as_completed()wait())將被喚醒。

如果該方法回傳True 則代表Future 未被取消並已進入運行狀態,意即呼叫Future.running() 將回傳True

此方法只能呼叫一次,且不能在呼叫Future.set_result()Future.set_exception() 之後呼叫。

set_result(result)

將與Future 關聯的工作結果設定為result

此方法只能在Executor 實作中和單元測試中使用。

在 3.8 版的變更:如果Future 已經完成,此方法會引發concurrent.futures.InvalidStateError

set_exception(exception)

將與Future 關聯的工作結果設定為Exceptionexception

此方法只能在Executor 實作中和單元測試中使用。

在 3.8 版的變更:如果Future 已經完成,此方法會引發concurrent.futures.InvalidStateError

模組函式

concurrent.futures.wait(fs,timeout=None,return_when=ALL_COMPLETED)

等待fs 給定的Future 實例(可能由不同的Executor 實例建立)完成。提供給fs 的重複 future 將被刪除,並且只會回傳一次。回傳一個集合的附名二元組 (named 2-tuple of sets)。第一組名為done,包含在等待完成之前完成的 future(已完成或被取消的 future)。第二組名為not_done,包含未完成的 future(未定或運行中的 future)。

timeout 可用於控制回傳前等待的最大秒數。timeout 可以是整數或浮點數。如果未指定timeout 或為None,則等待時間就沒有限制。

return_when 表示此函式應回傳的時間。它必須是以下常數之一:

常數

描述

concurrent.futures.FIRST_COMPLETED

當任何 future 完成或被取消時,該函式就會回傳。

concurrent.futures.FIRST_EXCEPTION

該函式會在任何 future 透過引發例外而完結時回傳。如果 future 沒有引發例外,那麼它等同於ALL_COMPLETED

concurrent.futures.ALL_COMPLETED

當所有 future 都完成或被取消時,該函式才會回傳。

concurrent.futures.as_completed(fs,timeout=None)

回傳由fs 給定的Future 實例(可能由不同的Executor 實例建立)的疊代器,它在完成時產生 future(已完成或被取消的 future)。fs 給定的任何重複的 future 將只被回傳一次。呼叫as_completed() 之前完成的任何 future 將首先產生。如果__next__() 被呼叫,並且在原先呼叫as_completed()timeout 秒後結果仍不可用,則回傳的疊代器會引發TimeoutErrortimeout 可以是整數或浮點數。如果未指定timeout 或為None,則等待時間就沒有限制。

也參考

PEP 3148 -- futures - 非同步地執行運算

描述此功能並提出被包含於 Python 標準函式庫中的提案。

例外類別

exceptionconcurrent.futures.CancelledError

當 future 被取消時引發。

exceptionconcurrent.futures.TimeoutError

TimeoutError 的棄用別名,在 future 操作超過給定超時 (timeout) 時引發。

在 3.11 版的變更:這個類別是TimeoutError 的別名。

exceptionconcurrent.futures.BrokenExecutor

衍生自RuntimeError,當執行器因某種原因損壞時會引發此例外類別,並且不能用於提交或執行新任務。

在 3.7 版被加入.

exceptionconcurrent.futures.InvalidStateError

目前狀態下不允許的 future 操作被執行時而引發。

在 3.8 版被加入.

exceptionconcurrent.futures.thread.BrokenThreadPool

衍生自BrokenExecutor,當ThreadPoolExecutor 的其中一個 worker 初始化失敗時會引發此例外類別。

在 3.7 版被加入.

exceptionconcurrent.futures.process.BrokenProcessPool

衍生自BrokenExecutor(以前為RuntimeError),當ProcessPoolExecutor 的其中一個 worker 以不乾淨的方式終止時將引發此例外類別(例如它是從外面被 kill 掉的)。

在 3.3 版被加入.