queue --- 同步佇列 (synchronized queue) 類別

原始碼:Lib/queue.py


queue module(模組)實作多生產者、多消費者佇列。在執行緒程式設計中,必須在多執行緒之間安全地交換資訊時,特別有用。此 module 中的Queue class 實作所有必需的鎖定語義 (locking semantics)。

此 module 實作三種型別的佇列,它們僅在取出條目的順序上有所不同。在FIFO 佇列中,先加入的任務是第一個被取出的。在LIFO 佇列中,最近被加入的條目是第一個被取出的(像堆疊 (stack) 一樣操作)。使用優先佇列 (priority queue) 時,條目將保持排序狀態(使用heapq module),並先取出最低值條目。

在內部,這三種型別的佇列使用鎖 (lock) 來暫時阻塞競爭執行緒;但是,它們並不是被設計來處理執行緒內的 reentrancy(可重入)。

此外,此 module 實作一個「簡單」的FIFO 佇列型別SimpleQueue,其特定的實作是以較少的功能為代價,來提供額外的保證。

queue module 定義了以下的 class 和例外:

classqueue.Queue(maxsize=0)

FIFO 佇列的建構子 (constructor)。maxsize 是一個整數,用於設置佇列中可放置的項目數的上限。一旦達到此大小,插入將會阻塞,直到佇列中的項目被消耗。如果maxsize 小於或等於零,則佇列大小為無限。

classqueue.LifoQueue(maxsize=0)

LIFO 佇列的建構子。maxsize 是一個整數,用於設置佇列中可放置的項目數的上限。一旦達到此大小,插入將被鎖定,到佇列中的項目被消耗。如果maxsize 小於或等於零,則佇列大小為無限。

classqueue.PriorityQueue(maxsize=0)

優先佇列的建構子。maxsize 是一個整數,用於設置佇列中可放置的項目數的上限。一旦達到此大小,插入將被阻塞,直到佇列中的項目被消耗。如果maxsize 小於或等於零,則佇列大小為無限。

最低值的條目會最先被取出(最低值的條目是被會min(entries) 回傳的那一個)。條目的典型模式是格式為(priority_number,data) 的 tuple(元組)。

如果data 元素為不可比較的,則可以將資料包裝在一個 class 中,該 class 忽略資料項目並僅比較優先數:

fromdataclassesimportdataclass,fieldfromtypingimportAny@dataclass(order=True)classPrioritizedItem:priority:intitem:Any=field(compare=False)
classqueue.SimpleQueue

無界的FIFO 佇列的建構子。簡單佇列缺少任務追蹤等進階功能。

exceptionqueue.Empty

當對一個空的Queue 物件呼叫非阻塞的 (non-blocking)get()(或get_nowait())將引發此例外。

exceptionqueue.Full

當對一個已滿的Queue 物件呼叫非阻塞的put()(或put_nowait())將引發此例外。

exceptionqueue.ShutDown

當對一個已關閉的Queue 物件呼叫put()get() 時將引發此例外。

在 3.13 版被加入.

佇列物件

佇列物件(QueueLifoQueuePriorityQueue)提供下面描述的公用 method。

Queue.qsize()

回傳佇列的近似大小。注意,qsize() > 0 並不能保證後續的 get() 不會阻塞,qsize() < maxsize 也不會保證 put() 不會阻塞。

Queue.empty()

如果佇列為空,則回傳True,否則回傳False。如果 empty() 回傳True,則不保證後續呼叫 put() 不會阻塞。同樣,如果 empty() 回傳False,則不保證後續呼叫 get() 不會阻塞。

Queue.full()

如果佇列已滿,則回傳True ,否則回傳False。如果 full() 回傳True,則不保證後續呼叫 get() 不會阻塞。同樣,如果 full() 回傳False,則不保證後續呼叫 put() 不會阻塞。

Queue.put(item,block=True,timeout=None)

item 放入佇列中。如果可選的 argsblock 為 true、timeoutNone(預設值),則在必要時阻塞,直到自由槽 (free slot) 可用。如果timeout 為正數,則最多阻塞timeout 秒,如果該時間內沒有可用的自由槽,則會引發Full 例外。否則(block 為 false),如果自由槽立即可用,則將項目放在佇列中,否則引發Full 例外(在這種情況下,timeout 將被忽略)。

如果佇列已被關閉,則引發ShutDown

Queue.put_nowait(item)

等效於put(item,block=False)

Queue.get(block=True,timeout=None)

從佇列中移除並回傳一個項目。如果可選的 argsblock 為 true,且timeoutNone(預設值),則在必要時阻塞,直到有可用的項目。如果timeout 是正數,則最多會阻塞timeout 秒,如果該時間內沒有可用的項目,則會引發Empty 例外。否則(block 為 false),如果立即可用,則回傳一個項目,否則引發Empty 例外(在這種情況下,timeout 將被忽略)。

在 POSIX 系統的 3.0 版之前,以及 Windows 的所有版本,如果block 為 true 且timeoutNone,則此操作將在底層鎖上進入不間斷等待。這意味著不會發生例外,特別是 SIGINT(中斷訊號)不會觸發KeyboardInterrupt

如果佇列已被關閉且為空,或佇列已立即關閉,則引發ShutDown

Queue.get_nowait()

等效於get(False)

有兩個 method 可以支援追蹤放入佇列的任務是否已由常駐消費者執行緒 (daemon consumer threads) 完全處理。

Queue.task_done()

表示先前放入佇列的任務已完成。由佇列消費者執行緒使用。對於用來提取任務的每個get(),隨後呼叫task_done() 告訴佇列任務的處理已完成。

如果目前join() 阻塞,它將會在所有項目都已處理完畢後恢復(代表對於以put() 放進佇列的每個項目,都要收到task_done() 的呼叫)。

如果呼叫次數超過佇列中放置的項目數量,則引發ValueError

Queue.join()

持續阻塞直到佇列中的所有項目都已被取得並處理完畢。

每當項目被加到佇列中時,未完成任務的計數都會增加。每當消費者執行緒呼叫task_done() 以指示該項目已被取出並且對其的所有工作都已完成時,計數就會下降。當未完成任務的計數降至零時,join() 將停止阻塞。

Waiting for task completion

如何等待放入佇列的任務完成的範例:

importthreadingimportqueueq=queue.Queue()defworker():whileTrue:item=q.get()print(f'Working on{item}')print(f'Finished{item}')q.task_done()# Turn-on the worker thread.threading.Thread(target=worker,daemon=True).start()# Send thirty task requests to the worker.foriteminrange(30):q.put(item)# Block until all tasks are done.q.join()print('All work completed')

Terminating queues

When no longer needed,Queue objects can be wound downuntil empty or terminated immediately with a hard shutdown.

Queue.shutdown(immediate=False)

Put aQueue instance into a shutdown mode.

The queue can no longer grow.Future calls toput() raiseShutDown.Currently blocked callers ofput() will be unblockedand will raiseShutDown in the formerly blocked thread.

Ifimmediate is false (the default), the queue can be wounddown normally withget() calls to extract tasksthat have already been loaded.

And iftask_done() is called for each remaining task, apendingjoin() will be unblocked normally.

Once the queue is empty, future calls toget() willraiseShutDown.

Ifimmediate is true, the queue is terminated immediately.The queue is drained to be completely empty. All callers ofjoin() are unblocked regardless of the numberof unfinished tasks. Blocked callers ofget()are unblocked and will raiseShutDown because thequeue is empty.

Use caution when usingjoin() withimmediate setto true. This unblocks the join even when no work has been doneon the tasks, violating the usual invariant for joining a queue.

在 3.13 版被加入.

SimpleQueue 物件

SimpleQueue 物件提供下面描述的公用 method。

SimpleQueue.qsize()

傳回佇列的近似大小。注意,qsize() > 0 並不能保證後續的 get() 不會阻塞。

SimpleQueue.empty()

如果佇列為空,則回傳True,否則回傳False。如果 empty() 回傳False,則不保證後續呼叫 get() 不會阻塞。

SimpleQueue.put(item,block=True,timeout=None)

item 放入佇列中。此 method 從不阻塞,並且都會成功(除了潛在的低階錯誤,像是分配記憶體失敗)。可選的 argsblocktimeout 會被忽略,它們僅是為了與Queue.put() 相容才存在。

此 method 有一個可重入 (reentrant) 的 C 實作。意思就是,一個put()get() 呼叫,可以被同一執行緒中的另一個put() 呼叫中斷,而不會造成死鎖 (deadlock) 或損壞佇列中的內部狀態。這使得它適合在解構子 (destructor) 中使用,像是__del__ method 或weakref 回呼函式 (callback)。

SimpleQueue.put_nowait(item)

等效於put(item,block=False),用於與Queue.put_nowait() 相容。

SimpleQueue.get(block=True,timeout=None)

從佇列中移除並回傳一個項目。如果可選的 argsblock 為 true,且timeoutNone(預設值),則在必要時阻塞,直到有可用的項目。如果timeout 是正數,則最多會阻塞timeout 秒,如果該時間內沒有可用的項目,則會引發Empty 例外。否則(block 為 false),如果立即可用,則回傳一個項目,否則引發Empty 例外(在這種情況下,timeout 將被忽略)。

SimpleQueue.get_nowait()

等效於get(False)

也參考

Classmultiprocessing.Queue

用於多行程處理 (multi-processing)(而非多執行緒)情境 (context) 的佇列 class。

collections.deque 是無界佇列的替代實作,有快速且具原子性 (atomic) 的append()popleft() 操作,這些操作不需要鎖定,並且還支持索引。