佇列 (Queues)¶
asyncio 佇列被設計成與queue
模組類似。儘管 asyncio 佇列不支援執行緒安全 (thread-safe),但他們是被設計來專用於 async/await 程式。
注意 asyncio 的佇列沒有timeout 參數;請使用asyncio.wait_for()
函式來為佇列新增具有超時 (timeout) 設定的操作。
另請參閱下方Examples。
Queue¶
- classasyncio.Queue(maxsize=0)¶
先進先出 (FIFO) 佇列。
如果maxsize 小於或等於零,則佇列大小是無限制的。如果是大於
0
的整數,則當佇列達到maxsize 時,awaitput()
將會阻塞 (block),直到某個元素被get()
取出。不像標準函式庫中執行緒類型的
queue
,佇列的大小一直是已知的,可以透過呼叫qsize()
方法回傳。在 3.10 版的變更:移除loop 參數。
這個類別是不支援執行緒安全的。
- maxsize¶
佇列中可存放的元素數量。
- empty()¶
如果佇列為空則回傳
True
,否則回傳False
。
- asyncget()¶
從佇列中刪除並回傳一個元素。如果佇列為空,則持續等待直到佇列中有元素。
Raises
QueueShutDown
if the queue has been shut down andis empty, or if the queue has been shut down immediately.
- get_nowait()¶
如果佇列內有值則立即回傳佇列中的元素,否則引發
QueueEmpty
。
- asyncjoin()¶
持續阻塞直到佇列中所有的元素都被接收和處理完畢。
當條目新增到佇列的時候,未完成任務的計數就會增加。每當一個消耗者 (consumer) 協程呼叫
task_done()
,表示這個條目已經被取回且被它包含的所有工作都已完成,未完成任務計數就會減少。當未完成計數降到零的時候,join()
阻塞會被解除 (unblock)。
- asyncput(item)¶
將一個元素放進佇列。如果佇列滿了,在新增元素之前,會持續等待直到有空閒插槽 (free slot) 能被使用。
如果佇列已經被關閉,則引發
QueueShutDown
。
- qsize()¶
回傳佇列中的元素數量。
- shutdown(immediate=False)¶
Put a
Queue
instance into a shutdown mode.The queue can no longer grow.Future calls to
put()
raiseQueueShutDown
.Currently blocked callers ofput()
will be unblockedand will raiseQueueShutDown
in the formerly blocked thread.Ifimmediate is false (the default), the queue can be wounddown normally with
get()
calls to extract tasksthat have already been loaded.And if
task_done()
is called for each remaining task, apendingjoin()
will be unblocked normally.Once the queue is empty, future calls to
get()
willraiseQueueShutDown
.Ifimmediate is true, the queue is terminated immediately.The queue is drained to be completely empty. All callers of
join()
are unblocked regardless of the numberof unfinished tasks. Blocked callers ofget()
are unblocked and will raiseQueueShutDown
because thequeue is empty.Use caution when using
join()
withimmediate setto true. This unblocks the join even when no work has been doneon the tasks, violating the usual invariant for joining a queue.在 3.13 版被加入.
- task_done()¶
表示前面一個排隊的工作項目已經完成。
由佇列消耗者使用。對於每個用於取得一個工作項目的
get()
,接續的task_done()
呼叫會告訴佇列這個工作項目的處理已經完成。如果
join()
目前正在阻塞,在所有項目都被處理後會解除阻塞(意味著每個以put()
放進佇列的條目都會收到一個task_done()
)。如果被呼叫的次數多於放入佇列中的項目數量,將引發
ValueError
。
Priority Queue(優先佇列)¶
LIFO Queue¶
例外¶
- exceptionasyncio.QueueEmpty¶
當佇列為空的時候,呼叫
get_nowait()
方法會引發這個例外。
- exceptionasyncio.QueueFull¶
當佇列中條目數量已經達到它的maxsize 時,呼叫
put_nowait()
方法會引發這個例外。
範例¶
佇列能被用於多個並行任務的工作分配:
importasyncioimportrandomimporttimeasyncdefworker(name,queue):whileTrue:# Get a "work item" out of the queue.sleep_for=awaitqueue.get()# Sleep for the "sleep_for" seconds.awaitasyncio.sleep(sleep_for)# Notify the queue that the "work item" has been processed.queue.task_done()print(f'{name} has slept for{sleep_for:.2f} seconds')asyncdefmain():# Create a queue that we will use to store our "workload".queue=asyncio.Queue()# Generate random timings and put them into the queue.total_sleep_time=0for_inrange(20):sleep_for=random.uniform(0.05,1.0)total_sleep_time+=sleep_forqueue.put_nowait(sleep_for)# Create three worker tasks to process the queue concurrently.tasks=[]foriinrange(3):task=asyncio.create_task(worker(f'worker-{i}',queue))tasks.append(task)# Wait until the queue is fully processed.started_at=time.monotonic()awaitqueue.join()total_slept_for=time.monotonic()-started_at# Cancel our worker tasks.fortaskintasks:task.cancel()# Wait until all worker tasks are cancelled.awaitasyncio.gather(*tasks,return_exceptions=True)print('====')print(f'3 workers slept in parallel for{total_slept_for:.2f} seconds')print(f'total expected sleep time:{total_sleep_time:.2f} seconds')asyncio.run(main())