concurrent.futures --- 啟動平行任務¶
在 3.2 版被加入.
原始碼:Lib/concurrent/futures/thread.py、Lib/concurrent/futures/process.py 與Lib/concurrent/futures/interpreter.py
concurrent.futures 模組提供了一個高階介面來非同步地 (asynchronously) 執行可呼叫物件 (callable) 。
非同步執行可以透過ThreadPoolExecutor 或InterpreterPoolExecutor 來使用執行緒 (thread) 執行,或透過ProcessPoolExecutor 來使用單獨行程 (process) 執行。兩者都實作了相同的介面,該介面由抽象的Executor 類別定義。
可用性: not WASI.
此模組在 WebAssembly 平台上不起作用或無法使用。更多資訊請參閱WebAssembly 平台。
Executor 物件¶
- classconcurrent.futures.Executor¶
提供非同步執行呼叫方法的抽象類別。不應直接使用它,而應透過其具體子類別來使用。
- submit(fn,/,*args,**kwargs)¶
為可呼叫物件fn 排程來以
fn(*args,**kwargs)的形式執行並回傳一個表示可呼叫的執行的Future物件。withThreadPoolExecutor(max_workers=1)asexecutor:future=executor.submit(pow,323,1235)print(future.result())
- map(fn,*iterables,timeout=None,chunksize=1,buffersize=None)¶
類似於
map(fn,*iterables),除了:Theiterables are collected immediately rather than lazily, unless abuffersize is specified to limit the number of submitted tasks whoseresults have not yet been yielded. If the buffer is full, iteration overtheiterables pauses until a result is yielded from the buffer.
fn 是非同步執行的,並且對fn 的多次呼叫可以並行處理。
如果
__next__()被呼叫,且在原先呼叫Executor.map()的timeout 秒後結果仍不可用,回傳的疊代器就會引發TimeoutError。timeout 可以是整數或浮點數。如果未指定timeout 或為None,則等待時間就不會有限制。如果fn 呼叫引發例外,則當從疊代器中檢索到它的值時將引發該例外。
使用
ProcessPoolExecutor時,此方法將iterables 分成許多分塊 (chunks),並將其作為獨立的任務來提交給池 (pool)。可以透過將chunksize 設定為正整數來指定這些分塊的(約略)大小。對於非常長的可疊代物件,chunksize 使用較大的值(與預設大小 1 相比)可以顯著提高性能。對於ThreadPoolExecutor和InterpreterPoolExecutor,chunksize 無效。在 3.5 版的變更:新增chunksize 參數。
在 3.14 版的變更:新增buffersize 參數。
- shutdown(wait=True,*,cancel_futures=False)¶
向 executor 發出訊號 (signal),表明它應該在目前未定 (pending) 的 future 完成執行時釋放它正在使用的任何資源。在關閉後呼叫
Executor.submit()和Executor.map()將引發RuntimeError。如果wait 為
True則此方法將不會回傳,直到所有未定的 futures 完成執行並且與 executor 關聯的資源都被釋放。如果wait 為False則此方法將立即回傳,並且當所有未定的 future 執行完畢時,與 executor 關聯的資源將被釋放。不管wait 的值如何,整個 Python 程式都不會退出,直到所有未定的 futures 執行完畢。如果cancel_futures 為
True,此方法將取消 executor 尚未開始運行的所有未定 future。無論cancel_futures 的值如何,任何已完成或正在運行的 future 都不會被取消。如果cancel_futures 和wait 都為
True,則 executor 已開始運行的所有 future 將在此方法回傳之前完成。剩餘的 future 被取消。如果使用
with陳述式來將 executor 用作context manager,那你就可以不用明確地呼叫此方法,這將會自己關閉Executor(如同呼叫Executor.shutdown()時wait 被設定為True般等待):importshutilwithThreadPoolExecutor(max_workers=4)ase:e.submit(shutil.copy,'src1.txt','dest1.txt')e.submit(shutil.copy,'src2.txt','dest2.txt')e.submit(shutil.copy,'src3.txt','dest3.txt')e.submit(shutil.copy,'src4.txt','dest4.txt')
在 3.9 版的變更:新增cancel_futures。
ThreadPoolExecutor¶
ThreadPoolExecutor 是一個Executor 子類別,它使用執行緒池來非同步地執行呼叫。
當與Future 關聯的可呼叫物件等待另一個Future 的結果時,可能會發生死鎖 (deadlock)。例如:
importtimedefwait_on_b():time.sleep(5)print(b.result())# b 永遠不會完成,因為它正在等待 a。return5defwait_on_a():time.sleep(5)print(a.result())# a 永遠不會完成,因為它正在等待 b。return6executor=ThreadPoolExecutor(max_workers=2)a=executor.submit(wait_on_b)b=executor.submit(wait_on_a)
和:
defwait_on_future():f=executor.submit(pow,5,2)# 這將永遠不會完成,因為只有一個工作執行緒且# 它正在執行這個函式。print(f.result())executor=ThreadPoolExecutor(max_workers=1)executor.submit(wait_on_future)
- classconcurrent.futures.ThreadPoolExecutor(max_workers=None,thread_name_prefix='',initializer=None,initargs=())¶
一個
Executor子類別,它使用最多有max_workers 個執行緒的池來非同步地執行呼叫。所有排隊到
ThreadPoolExecutor的執行緒都將在直譯器退出之前加入。請注意,執行此操作的退出處理程式會在任何使用atexit新增的退出處理程式之前執行。這意味著必須捕獲並處理主執行緒中的例外,以便向執行緒發出訊號來正常退出 (gracefully exit)。因此,建議不要將ThreadPoolExecutor用於長時間運行的任務。initializer 是一個可選的可呼叫物件,在每個工作執行緒開始時呼叫;initargs 是傳遞給 initializer 的引數元組 (tuple)。如果initializer 引發例外,所有目前未定的作業以及任何向池中提交 (submit) 更多作業的嘗試都將引發
BrokenThreadPool。在 3.5 版的變更:如果max_workers 為
None或未給定,它將預設為機器上的處理器數量乘以5,這假定了ThreadPoolExecutor通常用於 I/O 重疊而非 CPU 密集的作業,並且 worker 的數量應該高於ProcessPoolExecutor的 worker 數量。在 3.6 版的變更:新增thread_name_prefix 參數以允許使用者控制由池所建立的工作執行緒 (worker thread) 的
threading.Thread名稱,以便於除錯。在 3.7 版的變更:新增initializer 與initargs 引數。
在 3.8 版的變更:max_workers 的預設值改為
min(32,os.cpu_count()+4)。此預設值為 I/O 密集任務至少保留了 5 個 worker。它最多使用 32 個 CPU 核心來執行CPU 密集任務,以釋放 GIL。並且它避免了在多核機器上隱晦地使用非常大量的資源。ThreadPoolExecutor 現在在啟動max_workers 工作執行緒之前會重用 (reuse) 空閒的工作執行緒。
在 3.13 版的變更:max_workers 的預設值被改為
min(32,(os.process_cpu_count()or1)+4)。
ThreadPoolExecutor 範例¶
importconcurrent.futuresimporturllib.requestURLS=['http://www.foxnews.com/','http://www.cnn.com/','http://europe.wsj.com/','http://www.bbc.co.uk/','http://nonexistent-subdomain.python.org/']# Retrieve a single page and report the URL and contentsdefload_url(url,timeout):withurllib.request.urlopen(url,timeout=timeout)asconn:returnconn.read()# We can use a with statement to ensure threads are cleaned up promptlywithconcurrent.futures.ThreadPoolExecutor(max_workers=5)asexecutor:# Start the load operations and mark each future with its URLfuture_to_url={executor.submit(load_url,url,60):urlforurlinURLS}forfutureinconcurrent.futures.as_completed(future_to_url):url=future_to_url[future]try:data=future.result()exceptExceptionasexc:print('%r generated an exception:%s'%(url,exc))else:print('%r page is%d bytes'%(url,len(data)))
InterpreterPoolExecutor¶
在 3.14 版被加入.
TheInterpreterPoolExecutor class uses a pool of interpretersto execute calls asynchronously. It is aThreadPoolExecutorsubclass, which means each worker is running in its own thread.The difference here is that each worker has its own interpreter,and runs each task using that interpreter.
The biggest benefit to using interpreters instead of only threadsis true multi-core parallelism. Each interpreter has its ownGlobal Interpreter Lock, so coderunning in one interpreter can run on one CPU core, while code inanother interpreter runs unblocked on a different core.
The tradeoff is that writing concurrent code for use with multipleinterpreters can take extra effort. However, this is because itforces you to be deliberate about how and when interpreters interact,and to be explicit about what data is shared between interpreters.This results in several benefits that help balance the extra effort,including true multi-core parallelism, For example, code writtenthis way can make it easier to reason about concurrency. Anothermajor benefit is that you don't have to deal with several of thebig pain points of using threads, like race conditions.
Each worker's interpreter is isolated from all the other interpreters."Isolated" means each interpreter has its own runtime state andoperates completely independently. For example, if you redirectsys.stdout in one interpreter, it will not be automaticallyredirected to any other interpreter. If you import a module in oneinterpreter, it is not automatically imported in any other. Youwould need to import the module separately in interpreter whereyou need it. In fact, each module imported in an interpreter isa completely separate object from the same module in a differentinterpreter, includingsys,builtins,and even__main__.
Isolation means a mutable object, or other data, cannot be usedby more than one interpreter at the same time. That effectively meansinterpreters cannot actually share such objects or data. Instead,each interpreter must have its own copy, and you will have tosynchronize any changes between the copies manually. Immutableobjects and data, like the builtin singletons, strings, and tuplesof immutable objects, don't have these limitations.
Communicating and synchronizing between interpreters is most effectivelydone using dedicated tools, like those proposed inPEP 734. One lessefficient alternative is to serialize withpickle and then sendthe bytes over a sharedsocket orpipe.
- classconcurrent.futures.InterpreterPoolExecutor(max_workers=None,thread_name_prefix='',initializer=None,initargs=())¶
A
ThreadPoolExecutorsubclass that executes calls asynchronouslyusing a pool of at mostmax_workers threads. Each thread runstasks in its own interpreter. The worker interpreters are isolatedfrom each other, which means each has its own runtime state and thatthey can't share any mutable objects or other data. Each interpreterhas its ownGlobal Interpreter Lock,which means code run with this executor has true multi-core parallelism.The optionalinitializer andinitargs arguments have the samemeaning as for
ThreadPoolExecutor: the initializer is runwhen each worker is created, though in this case it is run inthe worker's interpreter. The executor serializes theinitializerandinitargs usingpicklewhen sending them to the worker'sinterpreter.備註
The executor may replace uncaught exceptions frominitializerwith
ExecutionFailed.Other caveats from parent
ThreadPoolExecutorapply here.
submit() andmap() work like normal,except the worker serializes the callable and arguments usingpickle when sending them to its interpreter. The workerlikewise serializes the return value when sending it back.
When a worker's current task raises an uncaught exception, the workeralways tries to preserve the exception as-is. If that is successfulthen it also sets the__cause__ to a correspondingExecutionFailedinstance, which contains a summary of the original exception.In the uncommon case that the worker is not able to preserve theoriginal as-is then it directly preserves the correspondingExecutionFailedinstance instead.
ProcessPoolExecutor¶
ProcessPoolExecutor 類別是一個Executor 的子類別,它使用行程池來非同步地執行呼叫。ProcessPoolExecutor 使用了multiprocessing 模組,這允許它避開全域直譯器鎖 (Global Interpreter Lock),但也意味著只能執行和回傳可被 pickle 的 (picklable) 物件。
__main__ 模組必須可以被工作子行程 (worker subprocess) 引入。這意味著ProcessPoolExecutor 將無法在交互式直譯器 (interactive interpreter) 中工作。
從提交給ProcessPoolExecutor 的可呼叫物件中呼叫Executor 或Future 方法將導致死鎖。
Note that the restrictions on functions and arguments needing to picklable aspermultiprocessing.Process apply when usingsubmit()andmap() on aProcessPoolExecutor. A function definedin a REPL or a lambda should not be expected to work.
- classconcurrent.futures.ProcessPoolExecutor(max_workers=None,mp_context=None,initializer=None,initargs=(),max_tasks_per_child=None)¶
一個
Executor子類別,它使用了最多有max_workers 個行程的池來非同步地執行呼叫。如果max_workers 為None或未給定,它將被預設為os.process_cpu_count()。如果max_workers 小於或等於0,則會引發ValueError。在 Windows 上,max_workers 必須小於或等於61。如果不是,則會引發ValueError。如果max_workers 為None,則預設選擇最多為61,即便有更多處理器可用。mp_context 可以是multiprocessing情境 (context) 或None。它將用於啟動 worker。如果mp_context 為None或未給定,則使用預設的multiprocessing情境。請見Contexts and start methods。initializer 是一個可選的可呼叫物件,在每個工作行程 (worker process) 開始時呼叫;initargs 是傳遞給 initializer 的引數元組。如果initializer 引發例外,所有目前未定的作業以及任何向池中提交更多作業的嘗試都將引發
BrokenProcessPool。max_tasks_per_child 是一個可選引數,它指定單個行程在退出並被新的工作行程替換之前可以執行的最大任務數。預設情況下max_tasks_per_child 是
None,這意味著工作行程的生命週期將與池一樣長。當指定最大值時,在沒有mp_context 參數的情況下,將預設使用 "spawn" 做為 multiprocessing 啟動方法。此功能與 "fork" 啟動方法不相容。在 3.3 版的變更:當其中一個工作行程突然終止時,現在會引發
BrokenProcessPool錯誤。在過去,此行為是未定義的 (undefined),但對 executor 或其 future 的操作經常會發生凍結或死鎖。在 3.7 版的變更:新增了mp_context 引數以允許使用者控制由池所建立的工作行程的 start_method。
新增initializer 與initargs 引數。
在 3.11 版的變更:新增了max_tasks_per_child 引數以允許使用者控制池中 worker 的生命週期。
在 3.12 版的變更:在 POSIX 系統上,如果你的應用程式有多個執行緒並且
multiprocessing情境使用了"fork"啟動方法:內部呼叫以產生 worker 的os.fork()函式可能會引發DeprecationWarning。傳遞一個mp_context 以配置為使用不同的啟動方法。更多說明請參閱os.fork()文件。在 3.13 版的變更:max_workers uses
os.process_cpu_count()by default, instead ofos.cpu_count().在 3.14 版的變更:The default process start method (seeContexts and start methods) changed away fromfork. If yourequire thefork start method for
ProcessPoolExecutoryou mustexplicitly passmp_context=multiprocessing.get_context("fork").- terminate_workers()¶
Attempt to terminate all living worker processes immediately by calling
Process.terminateon each of them.Internally, it will also callExecutor.shutdown()to ensure that allother resources associated with the executor are freed.After calling this method the caller should no longer submit tasks to theexecutor.
在 3.14 版被加入.
- kill_workers()¶
Attempt to kill all living worker processes immediately by calling
Process.killon each of them.Internally, it will also callExecutor.shutdown()to ensure that allother resources associated with the executor are freed.After calling this method the caller should no longer submit tasks to theexecutor.
在 3.14 版被加入.
ProcessPoolExecutor 範例¶
importconcurrent.futuresimportmathPRIMES=[112272535095293,112582705942171,112272535095293,115280095190773,115797848077099,1099726899285419]defis_prime(n):ifn<2:returnFalseifn==2:returnTrueifn%2==0:returnFalsesqrt_n=int(math.floor(math.sqrt(n)))foriinrange(3,sqrt_n+1,2):ifn%i==0:returnFalsereturnTruedefmain():withconcurrent.futures.ProcessPoolExecutor()asexecutor:fornumber,primeinzip(PRIMES,executor.map(is_prime,PRIMES)):print('%d is prime:%s'%(number,prime))if__name__=='__main__':main()
Future 物件¶
Future 類別封裝了可呼叫物件的非同步執行。Future 實例由Executor.submit() 建立。
- classconcurrent.futures.Future¶
封裝可呼叫物件的非同步執行。
Future實例由Executor.submit()建立,且除測試外不應直接建立。- cancel()¶
嘗試取消呼叫。如果呼叫目前正在執行或已完成運行且無法取消,則該方法將回傳
False,否則呼叫將被取消並且該方法將回傳True。
- cancelled()¶
如果該呼叫成功被取消,則回傳
True。
- running()¶
如果呼叫正在執行且無法取消,則回傳
True。
- done()¶
如果呼叫成功被取消或結束運行,則回傳
True。
- result(timeout=None)¶
回傳該呼叫回傳的值。如果呼叫尚未完成,則此方法將等待至多timeout 秒。如果呼叫在timeout 秒內未完成,則會引發
TimeoutError。timeout 可以是整數或浮點數。如果未指定timeout 或為None,則等待時間就不會有限制。如果 future 在完成之前被取消,那麼
CancelledError將被引發。如果該呼叫引發了例外,此方法將引發相同的例外。
- exception(timeout=None)¶
回傳該呼叫引發的例外。如果呼叫尚未完成,則此方法將等待至多timeout 秒。如果呼叫在timeout 秒內未完成,則會引發
TimeoutError。timeout 可以是整數或浮點數。如果未指定timeout 或為None,則等待時間就不會有限制。如果 future 在完成之前被取消,那麼
CancelledError將被引發。如果呼叫在沒有引發的情況下完成,則回傳
None。
- add_done_callback(fn)¶
將可呼叫的fn 附加到 future 上。當 future 被取消或完成運行時,fn 將被以 future 作為其唯一引數來呼叫。
新增的可呼叫物件按新增順序呼叫,並且始終在屬於新增它們的行程的執行緒中呼叫。如果可呼叫物件引發
Exception子類別,它將被記錄 (log) 並忽略。如果可呼叫物件引發BaseException子類別,該行為未定義。如果 future 已經完成或被取消,fn 將立即被呼叫。
以下
Future方法旨在用於單元測試和Executor實作。- set_running_or_notify_cancel()¶
此方法只能在與
Future關聯的工作被執行之前於Executor實作中呼叫,或者在單元測試中呼叫。如果該方法回傳
False則Future已被取消,即Future.cancel()被呼叫並回傳True。任何等待Future完成的執行緒(即透過as_completed()或wait())將被喚醒。如果該方法回傳
True則代表Future未被取消並已進入運行狀態,意即呼叫Future.running()將回傳True。此方法只能呼叫一次,且不能在呼叫
Future.set_result()或Future.set_exception()之後呼叫。
- set_result(result)¶
將與
Future關聯的工作結果設定為result。此方法只能在
Executor實作中和單元測試中使用。在 3.8 版的變更:如果
Future已經完成,此方法會引發concurrent.futures.InvalidStateError。
模組函式¶
- concurrent.futures.wait(fs,timeout=None,return_when=ALL_COMPLETED)¶
等待fs 給定的
Future實例(可能由不同的Executor實例建立)完成。提供給fs 的重複 future 將被刪除,並且只會回傳一次。回傳一個集合的附名二元組 (named 2-tuple of sets)。第一組名為done,包含在等待完成之前完成的 future(已完成或被取消的 future)。第二組名為not_done,包含未完成的 future(未定或運行中的 future)。timeout 可用於控制回傳前等待的最大秒數。timeout 可以是整數或浮點數。如果未指定timeout 或為
None,則等待時間就沒有限制。return_when 表示此函式應回傳的時間。它必須是以下常數之一:
常數
描述
- concurrent.futures.FIRST_COMPLETED¶
當任何 future 完成或被取消時,該函式就會回傳。
- concurrent.futures.FIRST_EXCEPTION¶
該函式會在任何 future 透過引發例外而完結時回傳。如果 future 沒有引發例外,那麼它等同於
ALL_COMPLETED。- concurrent.futures.ALL_COMPLETED¶
當所有 future 都完成或被取消時,該函式才會回傳。
- concurrent.futures.as_completed(fs,timeout=None)¶
回傳由fs 給定的
Future實例(可能由不同的Executor實例建立)的疊代器,它在完成時產生 future(已完成或被取消的 future)。fs 給定的任何重複的 future 將只被回傳一次。呼叫as_completed()之前完成的任何 future 將首先產生。如果__next__()被呼叫,並且在原先呼叫as_completed()的timeout 秒後結果仍不可用,則回傳的疊代器會引發TimeoutError。timeout 可以是整數或浮點數。如果未指定timeout 或為None,則等待時間就沒有限制。
也參考
- PEP 3148 -- futures - 非同步地執行運算
描述此功能並提出被包含於 Python 標準函式庫中的提案。
例外類別¶
- exceptionconcurrent.futures.CancelledError¶
當 future 被取消時引發。
- exceptionconcurrent.futures.TimeoutError¶
TimeoutError的棄用別名,在 future 操作超過給定超時 (timeout) 時引發。在 3.11 版的變更:這個類別是
TimeoutError的別名。
- exceptionconcurrent.futures.BrokenExecutor¶
衍生自
RuntimeError,當執行器因某種原因損壞時會引發此例外類別,並且不能用於提交或執行新任務。在 3.7 版被加入.
- exceptionconcurrent.futures.InvalidStateError¶
目前狀態下不允許的 future 操作被執行時而引發。
在 3.8 版被加入.
- exceptionconcurrent.futures.thread.BrokenThreadPool¶
衍生自
BrokenExecutor,當ThreadPoolExecutor的其中一個 worker 初始化失敗時會引發此例外類別。在 3.7 版被加入.
- exceptionconcurrent.futures.interpreter.BrokenInterpreterPool¶
衍生自
BrokenThreadPool,當InterpreterPoolExecutor的其中一個 worker 初始化失敗時會引發此例外類別。在 3.14 版被加入.
- exceptionconcurrent.futures.interpreter.ExecutionFailed¶
Raised from
InterpreterPoolExecutorwhenthe given initializer fails or fromsubmit()when there's an uncaughtexception from the submitted task.在 3.14 版被加入.
- exceptionconcurrent.futures.process.BrokenProcessPool¶
衍生自
BrokenExecutor(以前為RuntimeError),當ProcessPoolExecutor的其中一個 worker 以不乾淨的方式終止時將引發此例外類別(例如它是從外面被 kill 掉的)。在 3.3 版被加入.