佇列 (Queues)

原始碼:Lib/asyncio/queues.py


asyncio 佇列被設計成與queue 模組類似。儘管 asyncio 佇列不支援執行緒安全 (thread-safe),但他們是被設計來專用於 async/await 程式。

注意 asyncio 的佇列沒有timeout 參數;請使用asyncio.wait_for() 函式來為佇列新增具有超時 (timeout) 設定的操作。

另請參閱下方Examples

Queue

classasyncio.Queue(maxsize=0)

先進先出 (FIFO) 佇列。

如果maxsize 小於或等於零,則佇列大小是無限制的。如果是大於0 的整數,則當佇列達到maxsize 時,awaitput() 將會阻塞 (block),直到某個元素被get() 取出。

不像標準函式庫中執行緒類型的queue,佇列的大小一直是已知的,可以透過呼叫qsize() 方法回傳。

在 3.10 版的變更:移除loop 參數。

這個類別是不支援執行緒安全的

maxsize

佇列中可存放的元素數量。

empty()

如果佇列為空則回傳True,否則回傳False

full()

如果有maxsize 個條目在佇列中,則回傳True

如果佇列用maxsize=0 (預設)初始化,則full() 永遠不會回傳True

asyncget()

從佇列中刪除並回傳一個元素。如果佇列為空,則持續等待直到佇列中有元素。

RaisesQueueShutDown if the queue has been shut down andis empty, or if the queue has been shut down immediately.

get_nowait()

如果佇列內有值則立即回傳佇列中的元素,否則引發QueueEmpty

asyncjoin()

持續阻塞直到佇列中所有的元素都被接收和處理完畢。

當條目新增到佇列的時候,未完成任務的計數就會增加。每當一個消耗者 (consumer) 協程呼叫task_done(),表示這個條目已經被取回且被它包含的所有工作都已完成,未完成任務計數就會減少。當未完成計數降到零的時候,join() 阻塞會被解除 (unblock)。

asyncput(item)

將一個元素放進佇列。如果佇列滿了,在新增元素之前,會持續等待直到有空閒插槽 (free slot) 能被使用。

如果佇列已經被關閉,則引發QueueShutDown

put_nowait(item)

不阻塞地將一個元素放入佇列。

如果沒有立即可用的空閒插槽,引發QueueFull

qsize()

回傳佇列中的元素數量。

shutdown(immediate=False)

Shut down the queue, makingget() andput()raiseQueueShutDown.

By default,get() on a shut down queue will onlyraise once the queue is empty. Setimmediate to true to makeget() raise immediately instead.

All blocked callers ofput() andget()will be unblocked. Ifimmediate is true, a task will be markedas done for each remaining item in the queue, which may unblockcallers ofjoin().

在 3.13 版被加入.

task_done()

表示前面一個排隊的工作項目已經完成。

由佇列消耗者使用。對於每個用於取得一個工作項目的get(),接續的task_done() 呼叫會告訴佇列這個工作項目的處理已經完成。

如果join() 目前正在阻塞,在所有項目都被處理後會解除阻塞(意味著每個以put() 放進佇列的條目都會收到一個task_done())。

shutdown(immediate=True) callstask_done() for eachremaining item in the queue.

如果被呼叫的次數多於放入佇列中的項目數量,將引發ValueError

Priority Queue(優先佇列)

classasyncio.PriorityQueue

Queue 的變形;按優先順序取出條目 (最小的先取出)。

條目通常是(priority_number,data) 形式的 tuple(元組)。

LIFO Queue

classasyncio.LifoQueue

Queue 的變形,先取出最近新增的條目(後進先出)。

例外

exceptionasyncio.QueueEmpty

當佇列為空的時候,呼叫get_nowait() 方法會引發這個例外。

exceptionasyncio.QueueFull

當佇列中條目數量已經達到它的maxsize 時,呼叫put_nowait() 方法會引發這個例外。

exceptionasyncio.QueueShutDown

Exception raised whenput() orget() iscalled on a queue which has been shut down.

在 3.13 版被加入.

範例

佇列能被用於多個並行任務的工作分配:

importasyncioimportrandomimporttimeasyncdefworker(name,queue):whileTrue:# Get a "work item" out of the queue.sleep_for=awaitqueue.get()# Sleep for the "sleep_for" seconds.awaitasyncio.sleep(sleep_for)# Notify the queue that the "work item" has been processed.queue.task_done()print(f'{name} has slept for{sleep_for:.2f} seconds')asyncdefmain():# Create a queue that we will use to store our "workload".queue=asyncio.Queue()# Generate random timings and put them into the queue.total_sleep_time=0for_inrange(20):sleep_for=random.uniform(0.05,1.0)total_sleep_time+=sleep_forqueue.put_nowait(sleep_for)# Create three worker tasks to process the queue concurrently.tasks=[]foriinrange(3):task=asyncio.create_task(worker(f'worker-{i}',queue))tasks.append(task)# Wait until the queue is fully processed.started_at=time.monotonic()awaitqueue.join()total_slept_for=time.monotonic()-started_at# Cancel our worker tasks.fortaskintasks:task.cancel()# Wait until all worker tasks are cancelled.awaitasyncio.gather(*tasks,return_exceptions=True)print('====')print(f'3 workers slept in parallel for{total_slept_for:.2f} seconds')print(f'total expected sleep time:{total_sleep_time:.2f} seconds')asyncio.run(main())