佇列 (Queues)

原始碼:Lib/asyncio/queues.py


asyncio 佇列被設計成與queue 模組類似。儘管 asyncio 佇列不支援執行緒安全 (thread-safe),但他們是被設計來專用於 async/await 程式。

注意 asyncio 的佇列沒有timeout 參數;請使用asyncio.wait_for() 函式來為佇列新增具有超時 (timeout) 設定的操作。

另請參閱下方Examples

Queue

classasyncio.Queue(maxsize=0)

先進先出 (FIFO) 佇列。

如果maxsize 小於或等於零,則佇列大小是無限制的。如果是大於0 的整數,則當佇列達到maxsize 時,awaitput() 將會阻塞 (block),直到某個元素被get() 取出。

不像標準函式庫中執行緒類型的queue,佇列的大小一直是已知的,可以透過呼叫qsize() 方法回傳。

在 3.10 版的變更:移除loop 參數。

這個類別是不支援執行緒安全的

maxsize

佇列中可存放的元素數量。

empty()

如果佇列為空則回傳True,否則回傳False

full()

如果有maxsize 個條目在佇列中,則回傳True

如果佇列用maxsize=0 (預設)初始化,則full() 永遠不會回傳True

asyncget()

從佇列中刪除並回傳一個元素。如果佇列為空,則持續等待直到佇列中有元素。

RaisesQueueShutDown if the queue has been shut down andis empty, or if the queue has been shut down immediately.

get_nowait()

如果佇列內有值則立即回傳佇列中的元素,否則引發QueueEmpty

asyncjoin()

持續阻塞直到佇列中所有的元素都被接收和處理完畢。

當條目新增到佇列的時候,未完成任務的計數就會增加。每當一個消耗者 (consumer) 協程呼叫task_done(),表示這個條目已經被取回且被它包含的所有工作都已完成,未完成任務計數就會減少。當未完成計數降到零的時候,join() 阻塞會被解除 (unblock)。

asyncput(item)

將一個元素放進佇列。如果佇列滿了,在新增元素之前,會持續等待直到有空閒插槽 (free slot) 能被使用。

如果佇列已經被關閉,則引發QueueShutDown

put_nowait(item)

不阻塞地將一個元素放入佇列。

如果沒有立即可用的空閒插槽,引發QueueFull

qsize()

回傳佇列中的元素數量。

shutdown(immediate=False)

Put aQueue instance into a shutdown mode.

The queue can no longer grow.Future calls toput() raiseQueueShutDown.Currently blocked callers ofput() will be unblockedand will raiseQueueShutDown in the formerly blocked thread.

Ifimmediate is false (the default), the queue can be wounddown normally withget() calls to extract tasksthat have already been loaded.

And iftask_done() is called for each remaining task, apendingjoin() will be unblocked normally.

Once the queue is empty, future calls toget() willraiseQueueShutDown.

Ifimmediate is true, the queue is terminated immediately.The queue is drained to be completely empty. All callers ofjoin() are unblocked regardless of the numberof unfinished tasks. Blocked callers ofget()are unblocked and will raiseQueueShutDown because thequeue is empty.

Use caution when usingjoin() withimmediate setto true. This unblocks the join even when no work has been doneon the tasks, violating the usual invariant for joining a queue.

在 3.13 版被加入.

task_done()

表示前面一個排隊的工作項目已經完成。

由佇列消耗者使用。對於每個用於取得一個工作項目的get(),接續的task_done() 呼叫會告訴佇列這個工作項目的處理已經完成。

如果join() 目前正在阻塞,在所有項目都被處理後會解除阻塞(意味著每個以put() 放進佇列的條目都會收到一個task_done())。

如果被呼叫的次數多於放入佇列中的項目數量,將引發ValueError

Priority Queue(優先佇列)

classasyncio.PriorityQueue

Queue 的變形;按優先順序取出條目 (最小的先取出)。

條目通常是(priority_number,data) 形式的 tuple(元組)。

LIFO Queue

classasyncio.LifoQueue

Queue 的變形,先取出最近新增的條目(後進先出)。

例外

exceptionasyncio.QueueEmpty

當佇列為空的時候,呼叫get_nowait() 方法會引發這個例外。

exceptionasyncio.QueueFull

當佇列中條目數量已經達到它的maxsize 時,呼叫put_nowait() 方法會引發這個例外。

exceptionasyncio.QueueShutDown

Exception raised whenput() orget() iscalled on a queue which has been shut down.

在 3.13 版被加入.

範例

佇列能被用於多個並行任務的工作分配:

importasyncioimportrandomimporttimeasyncdefworker(name,queue):whileTrue:# Get a "work item" out of the queue.sleep_for=awaitqueue.get()# Sleep for the "sleep_for" seconds.awaitasyncio.sleep(sleep_for)# Notify the queue that the "work item" has been processed.queue.task_done()print(f'{name} has slept for{sleep_for:.2f} seconds')asyncdefmain():# Create a queue that we will use to store our "workload".queue=asyncio.Queue()# Generate random timings and put them into the queue.total_sleep_time=0for_inrange(20):sleep_for=random.uniform(0.05,1.0)total_sleep_time+=sleep_forqueue.put_nowait(sleep_for)# Create three worker tasks to process the queue concurrently.tasks=[]foriinrange(3):task=asyncio.create_task(worker(f'worker-{i}',queue))tasks.append(task)# Wait until the queue is fully processed.started_at=time.monotonic()awaitqueue.join()total_slept_for=time.monotonic()-started_at# Cancel our worker tasks.fortaskintasks:task.cancel()# Wait until all worker tasks are cancelled.awaitasyncio.gather(*tasks,return_exceptions=True)print('====')print(f'3 workers slept in parallel for{total_slept_for:.2f} seconds')print(f'total expected sleep time:{total_sleep_time:.2f} seconds')asyncio.run(main())