佇列 (Queues)¶
asyncio 佇列被設計成與queue
模組類似。儘管 asyncio 佇列不支援執行緒安全 (thread-safe),但他們是被設計來專用於 async/await 程式。
注意 asyncio 的佇列沒有timeout 參數;請使用asyncio.wait_for()
函式來為佇列新增具有超時 (timeout) 設定的操作。
另請參閱下方Examples。
Queue¶
- classasyncio.Queue(maxsize=0)¶
先進先出 (FIFO) 佇列。
如果maxsize 小於或等於零,則佇列大小是無限制的。如果是大於
0
的整數,則當佇列達到maxsize 時,awaitput()
將會阻塞 (block),直到某個元素被get()
取出。不像標準函式庫中執行緒類型的
queue
,佇列的大小一直是已知的,可以透過呼叫qsize()
方法回傳。在 3.10 版的變更:移除loop 參數。
這個類別是不支援執行緒安全的。
- maxsize¶
佇列中可存放的元素數量。
- empty()¶
如果佇列為空則回傳
True
,否則回傳False
。
- asyncget()¶
從佇列中刪除並回傳一個元素。如果佇列為空,則持續等待直到佇列中有元素。
Raises
QueueShutDown
if the queue has been shut down andis empty, or if the queue has been shut down immediately.
- get_nowait()¶
如果佇列內有值則立即回傳佇列中的元素,否則引發
QueueEmpty
。
- asyncjoin()¶
持續阻塞直到佇列中所有的元素都被接收和處理完畢。
當條目新增到佇列的時候,未完成任務的計數就會增加。每當一個消耗者 (consumer) 協程呼叫
task_done()
,表示這個條目已經被取回且被它包含的所有工作都已完成,未完成任務計數就會減少。當未完成計數降到零的時候,join()
阻塞會被解除 (unblock)。
- asyncput(item)¶
將一個元素放進佇列。如果佇列滿了,在新增元素之前,會持續等待直到有空閒插槽 (free slot) 能被使用。
如果佇列已經被關閉,則引發
QueueShutDown
。
- qsize()¶
回傳佇列中的元素數量。
- shutdown(immediate=False)¶
Shut down the queue, making
get()
andput()
raiseQueueShutDown
.By default,
get()
on a shut down queue will onlyraise once the queue is empty. Setimmediate to true to makeget()
raise immediately instead.All blocked callers of
put()
andget()
will be unblocked. Ifimmediate is true, a task will be markedas done for each remaining item in the queue, which may unblockcallers ofjoin()
.在 3.13 版被加入.
- task_done()¶
表示前面一個排隊的工作項目已經完成。
由佇列消耗者使用。對於每個用於取得一個工作項目的
get()
,接續的task_done()
呼叫會告訴佇列這個工作項目的處理已經完成。如果
join()
目前正在阻塞,在所有項目都被處理後會解除阻塞(意味著每個以put()
放進佇列的條目都會收到一個task_done()
)。shutdown(immediate=True)
callstask_done()
for eachremaining item in the queue.如果被呼叫的次數多於放入佇列中的項目數量,將引發
ValueError
。
Priority Queue(優先佇列)¶
LIFO Queue¶
例外¶
- exceptionasyncio.QueueEmpty¶
當佇列為空的時候,呼叫
get_nowait()
方法會引發這個例外。
- exceptionasyncio.QueueFull¶
當佇列中條目數量已經達到它的maxsize 時,呼叫
put_nowait()
方法會引發這個例外。
範例¶
佇列能被用於多個並行任務的工作分配:
importasyncioimportrandomimporttimeasyncdefworker(name,queue):whileTrue:# Get a "work item" out of the queue.sleep_for=awaitqueue.get()# Sleep for the "sleep_for" seconds.awaitasyncio.sleep(sleep_for)# Notify the queue that the "work item" has been processed.queue.task_done()print(f'{name} has slept for{sleep_for:.2f} seconds')asyncdefmain():# Create a queue that we will use to store our "workload".queue=asyncio.Queue()# Generate random timings and put them into the queue.total_sleep_time=0for_inrange(20):sleep_for=random.uniform(0.05,1.0)total_sleep_time+=sleep_forqueue.put_nowait(sleep_for)# Create three worker tasks to process the queue concurrently.tasks=[]foriinrange(3):task=asyncio.create_task(worker(f'worker-{i}',queue))tasks.append(task)# Wait until the queue is fully processed.started_at=time.monotonic()awaitqueue.join()total_slept_for=time.monotonic()-started_at# Cancel our worker tasks.fortaskintasks:task.cancel()# Wait until all worker tasks are cancelled.awaitasyncio.gather(*tasks,return_exceptions=True)print('====')print(f'3 workers slept in parallel for{total_slept_for:.2f} seconds')print(f'total expected sleep time:{total_sleep_time:.2f} seconds')asyncio.run(main())