同步化原始物件 (Synchronization Primitives)

原始碼:Lib/asyncio/locks.py


asyncio 的同步化原始物件被設計成和那些threading 模組 (module) 中的同名物件相似,但有兩個重要的限制條件:

  • asyncio 原始物件並不支援執行緒安全 (thread-safe),因此他們不可被用於 OS 執行緒同步化(請改用threading);

  • 這些同步化原始物件的方法 (method) 並不接受timeout 引數;要達成有超時 (timeout) 設定的操作請改用asyncio.wait_for() 函式。

asyncio 有以下基礎同步化原始物件:


Lock

classasyncio.Lock

實作了一個給 asyncio 任務 (task) 用的互斥鎖 (mutex lock)。不支援執行緒安全。

一個 asyncio 的鎖可以用來確保一個共享資源的存取權被獨佔。

使用 Lock 的推薦方式是透過asyncwith 陳述式:

lock=asyncio.Lock()# ... laterasyncwithlock:# access shared state

這等價於:

lock=asyncio.Lock()# ... laterawaitlock.acquire()try:# access shared statefinally:lock.release()

在 3.10 版的變更:移除loop 參數。

asyncacquire()

獲得鎖。

此方法會持續等待直到鎖的狀態成為unlocked,並將其設置為locked 和回傳True

當多於一個的協程 (coroutine) 在acquire() 中等待解鎖而被阻塞,最終只會有其中的一個被處理。

鎖的取得方式是公平的:被處理的協程會是最早開始等待解鎖的那一個。

release()

釋放鎖。

如果鎖的狀態為locked 則將其重置為unlocked 並回傳。

如果鎖的狀態為unlockedRuntimeError 會被引發。

locked()

如果鎖的狀態為locked 則回傳True

Event

classasyncio.Event

一個事件 (event) 物件。不支援執行緒安全。

一個 asyncio 事件可以被用於通知多個有發生某些事件於其中的 asyncio 任務。

一個 Event 物件會管理一個內部旗標 (flag),它可以透過set() 方法來被設為true 並透過clear() 方法來重置為falsewait() 方法會被阻塞 (block) 直到該旗標被設為true。該旗標初始設置為false

在 3.10 版的變更:移除loop 參數。

範例:

asyncdefwaiter(event):print('waiting for it ...')awaitevent.wait()print('... got it!')asyncdefmain():# Create an Event object.event=asyncio.Event()# Spawn a Task to wait until 'event' is set.waiter_task=asyncio.create_task(waiter(event))# Sleep for 1 second and set the event.awaitasyncio.sleep(1)event.set()# Wait until the waiter task is finished.awaitwaiter_taskasyncio.run(main())
asyncwait()

持續等待直到事件被設置。

如果事件有被設置則立刻回傳True。否則持續阻塞直到另一個任務呼叫set()

set()

設置事件。

所有正在等待事件被設置的任務會立即被喚醒。

clear()

清除(還原)事件。

正透過wait() 等待的 Tasks 現在會持續阻塞直到set() 方法再次被呼叫。

is_set()

如果事件有被設置則回傳True

Condition

classasyncio.Condition(lock=None)

一個條件 (codition) 物件。不支援執行緒安全。

一個 asyncio 條件原始物件可以被任務用來等待某事件發生,並獲得一個共享資源的獨佔存取權。

本質上,一個 Condition 物件會結合EventLock 的功能。多個 Condition 物件共享一個 Lock 是有可能發生的,這能夠協調關注同一共享資源的不同狀態以取得其獨佔存取權的多個任務。

可選的lock 引數必須是一個Lock 物件或者為None。如為後者則一個新的 Lock 物件會被自動建立。

在 3.10 版的變更:移除loop 參數。

使用 Condition 的推薦方式是透過asyncwith 陳述式:

cond=asyncio.Condition()# ... laterasyncwithcond:awaitcond.wait()

這等價於:

cond=asyncio.Condition()# ... laterawaitcond.acquire()try:awaitcond.wait()finally:cond.release()
asyncacquire()

取得底層的鎖。

此方法會持續等待直到底層的鎖為unlocked,並將其設為locked 並回傳True

notify(n=1)

喚醒至多n 個正在等待此條件的任務(預設為 1),如果少於n 個任務則全部被喚醒。

在此方法被呼叫前必須先獲得鎖,並在之後立刻將其釋放。如果呼叫於一個unlocked 的鎖則RuntimeError 錯誤會被引發。

locked()

如果已取得底層的鎖則回傳True

notify_all()

喚醒所有正在等待此條件的任務。

這個方法的行為就像notify(),但會喚醒所有正在等待的任務。

在此方法被呼叫前必須先獲得鎖,並在之後立刻將其釋放。如果呼叫於一個unlocked 的鎖則RuntimeError 錯誤會被引發。

release()

釋放底層的鎖。

當叫用於一個未被解開的鎖之上時,會引發一個RuntimeError

asyncwait()

持續等待直到被通知 (notify)。

當此方法被呼叫時,如果呼叫它的任務還沒有取得鎖的話,RuntimeError 會被引發。

此方法會釋放底層的鎖,然後持續阻塞直到被notify()notify_all() 的呼叫所喚醒。一但被喚醒,Condition 會重新取得該鎖且此方法會回傳True

Note that a taskmay return from this call spuriously,which is why the caller should always re-check the stateand be prepared towait() again. For this reason, you mayprefer to usewait_for() instead.

asyncwait_for(predicate)

持續等待直到謂語 (predicate) 成為true

謂語必須是一個結果可被直譯為一個 boolean 值的可呼叫物件 (callable)。此方法會重複地wait() 直到謂語求值結果為true。最終的值即為回傳值。

Semaphore

classasyncio.Semaphore(value=1)

一個旗號 (semaphore) 物件。不支援執行緒安全。

一個旗號物件會管理一個內部計數器,會在每次呼叫acquire() 時減少一、每次呼叫release() 時增加一。此計數器永遠不會少於零;當acquire() 發現它是零時,它會持續阻塞並等待某任務呼叫release()

可選的value 引數給定了內部計數器的初始值(預設為1)。如給定的值少於0ValueError 會被引發。

在 3.10 版的變更:移除loop 參數。

使用 Semaphore 的推薦方式是透過asyncwith 陳述式:

sem=asyncio.Semaphore(10)# ... laterasyncwithsem:# work with shared resource

這等價於:

sem=asyncio.Semaphore(10)# ... laterawaitsem.acquire()try:# work with shared resourcefinally:sem.release()
asyncacquire()

取得一個旗號。

如果內部計數器大於零,將其減一並立刻回傳True。如果為零,則持續等待直到release() 被呼叫,並回傳True

locked()

如果旗號無法立即被取得則回傳True

release()

釋放一個旗號,並為其內部的計數器數值增加一。可以把一個正在等待取得旗號的任務叫醒。

BoundedSemaphore 不同,Semaphore 允許release() 的呼叫次數多於acquire()

BoundedSemaphore

classasyncio.BoundedSemaphore(value=1)

一個有界的旗號物件。不支援執行緒安全。

Bounded Semaphore 是Semaphore 的另一版本,如果其內部的計數器數值增加至大於初始value 值的話,ValueError 會在release() 時被引發。

在 3.10 版的變更:移除loop 參數。

Barrier

classasyncio.Barrier(parties)

一個屏障 (barrier) 物件。不支援執行緒安全。

A barrier is a simple synchronization primitive that allows to block untilparties number of tasks are waiting on it.Tasks can wait on thewait() method and would be blocked untilthe specified number of tasks end up waiting onwait().At that point all of the waiting tasks would unblock simultaneously.

asyncwith can be used as an alternative to awaiting onwait().

The barrier can be reused any number of times.

範例:

asyncdefexample_barrier():# barrier with 3 partiesb=asyncio.Barrier(3)# create 2 new waiting tasksasyncio.create_task(b.wait())asyncio.create_task(b.wait())awaitasyncio.sleep(0)print(b)# The third .wait() call passes the barrierawaitb.wait()print(b)print("barrier passed")awaitasyncio.sleep(0)print(b)asyncio.run(example_barrier())

Result of this example is:

<asyncio.locks.Barrierobjectat0x...[filling,waiters:2/3]><asyncio.locks.Barrierobjectat0x...[draining,waiters:0/3]>barrierpassed<asyncio.locks.Barrierobjectat0x...[filling,waiters:0/3]>

在 3.11 版被加入.

asyncwait()

Pass the barrier. When all the tasks party to the barrier have calledthis function, they are all unblocked simultaneously.

When a waiting or blocked task in the barrier is cancelled,this task exits the barrier which stays in the same state.If the state of the barrier is "filling", the number of waiting taskdecreases by 1.

The return value is an integer in the range of 0 toparties-1, differentfor each task. This can be used to select a task to do some specialhousekeeping, e.g.:

...asyncwithbarrierasposition:ifposition==0:# Only one task prints thisprint('End of *draining phase*')

This method may raise aBrokenBarrierError exception if thebarrier is broken or reset while a task is waiting.It could raise aCancelledError if a task is cancelled.

asyncreset()

Return the barrier to the default, empty state. Any tasks waiting on itwill receive theBrokenBarrierError exception.

If a barrier is broken it may be better to just leave it and create a new one.

asyncabort()

Put the barrier into a broken state. This causes any active or futurecalls towait() to fail with theBrokenBarrierError.Use this for example if one of the tasks needs to abort, to avoid infinitewaiting tasks.

parties

The number of tasks required to pass the barrier.

n_waiting

The number of tasks currently waiting in the barrier while filling.

broken

A boolean that isTrue if the barrier is in the broken state.

exceptionasyncio.BrokenBarrierError

This exception, a subclass ofRuntimeError, is raised when theBarrier object is reset or broken.


在 3.9 版的變更:透過awaitlockyieldfromlock 和/或with 陳述式 (withawaitlock,with(yieldfromlock)) 來取得鎖的方式已被移除。請改用asyncwithlock