queue --- 同步佇列 (synchronized queue) 類別

原始碼:Lib/queue.py


queue module(模組)實作多生產者、多消費者佇列。在執行緒程式設計中,必須在多執行緒之間安全地交換資訊時,特別有用。此 module 中的Queue class 實作所有必需的鎖定語義 (locking semantics)。

此 module 實作三種型別的佇列,它們僅在取出條目的順序上有所不同。在FIFO 佇列中,先加入的任務是第一個被取出的。在LIFO 佇列中,最近被加入的條目是第一個被取出的(像堆疊 (stack) 一樣操作)。使用優先佇列 (priority queue) 時,條目將保持排序狀態(使用heapq module),並先取出最低值條目。

在內部,這三種型別的佇列使用鎖 (lock) 來暫時阻塞競爭執行緒;但是,它們並不是被設計來處理執行緒內的 reentrancy(可重入)。

此外,此 module 實作一個「簡單」的FIFO 佇列型別SimpleQueue,其特定的實作是以較少的功能為代價,來提供額外的保證。

queue module 定義了以下的 class 和例外:

classqueue.Queue(maxsize=0)

FIFO 佇列的建構子 (constructor)。maxsize 是一個整數,用於設置佇列中可放置的項目數的上限。一旦達到此大小,插入將會阻塞,直到佇列中的項目被消耗。如果maxsize 小於或等於零,則佇列大小為無限。

classqueue.LifoQueue(maxsize=0)

LIFO 佇列的建構子。maxsize 是一個整數,用於設置佇列中可放置的項目數的上限。一旦達到此大小,插入將被鎖定,到佇列中的項目被消耗。如果maxsize 小於或等於零,則佇列大小為無限。

classqueue.PriorityQueue(maxsize=0)

優先佇列的建構子。maxsize 是一個整數,用於設置佇列中可放置的項目數的上限。一旦達到此大小,插入將被阻塞,直到佇列中的項目被消耗。如果maxsize 小於或等於零,則佇列大小為無限。

最低值的條目會最先被取出(最低值的條目是被會min(entries) 回傳的那一個)。條目的典型模式是格式為(priority_number,data) 的 tuple(元組)。

如果data 元素為不可比較的,則可以將資料包裝在一個 class 中,該 class 忽略資料項目並僅比較優先數:

fromdataclassesimportdataclass,fieldfromtypingimportAny@dataclass(order=True)classPrioritizedItem:priority:intitem:Any=field(compare=False)
classqueue.SimpleQueue

無界的FIFO 佇列的建構子。簡單佇列缺少任務追蹤等進階功能。

exceptionqueue.Empty

當對一個空的Queue 物件呼叫非阻塞的 (non-blocking)get()(或get_nowait())將引發此例外。

exceptionqueue.Full

當對一個已滿的Queue 物件呼叫非阻塞的put()(或put_nowait())將引發此例外。

exceptionqueue.ShutDown

Exception raised whenput() orget() is called onaQueue object which has been shut down.

在 3.13 版被加入.

佇列物件

佇列物件(QueueLifoQueuePriorityQueue)提供下面描述的公用 method。

Queue.qsize()

回傳佇列的近似大小。注意,qsize() > 0 並不能保證後續的 get() 不會阻塞,qsize() < maxsize 也不會保證 put() 不會阻塞。

Queue.empty()

如果佇列為空,則回傳True,否則回傳False。如果 empty() 回傳True,則不保證後續呼叫 put() 不會阻塞。同樣,如果 empty() 回傳False,則不保證後續呼叫 get() 不會阻塞。

Queue.full()

如果佇列已滿,則回傳True ,否則回傳False。如果 full() 回傳True,則不保證後續呼叫 get() 不會阻塞。同樣,如果 full() 回傳False,則不保證後續呼叫 put() 不會阻塞。

Queue.put(item,block=True,timeout=None)

item 放入佇列中。如果可選的 argsblock 為 true、timeoutNone(預設值),則在必要時阻塞,直到自由槽 (free slot) 可用。如果timeout 為正數,則最多阻塞timeout 秒,如果該時間內沒有可用的自由槽,則會引發Full 例外。否則(block 為 false),如果自由槽立即可用,則將項目放在佇列中,否則引發Full 例外(在這種情況下,timeout 將被忽略)。

RaisesShutDown if the queue has been shut down.

Queue.put_nowait(item)

等效於put(item,block=False)

Queue.get(block=True,timeout=None)

從佇列中移除並回傳一個項目。如果可選的 argsblock 為 true,且timeoutNone(預設值),則在必要時阻塞,直到有可用的項目。如果timeout 是正數,則最多會阻塞timeout 秒,如果該時間內沒有可用的項目,則會引發Empty 例外。否則(block 為 false),如果立即可用,則回傳一個項目,否則引發Empty 例外(在這種情況下,timeout 將被忽略)。

在 POSIX 系統的 3.0 版之前,以及 Windows 的所有版本,如果block 為 true 且timeoutNone,則此操作將在底層鎖上進入不間斷等待。這意味著不會發生例外,特別是 SIGINT(中斷訊號)不會觸發KeyboardInterrupt

RaisesShutDown if the queue has been shut down and is empty, or ifthe queue has been shut down immediately.

Queue.get_nowait()

等效於get(False)

有兩個 method 可以支援追蹤放入佇列的任務是否已由常駐消費者執行緒 (daemon consumer threads) 完全處理。

Queue.task_done()

表示先前放入佇列的任務已完成。由佇列消費者執行緒使用。對於用來提取任務的每個get(),隨後呼叫task_done() 告訴佇列任務的處理已完成。

如果目前join() 阻塞,它將會在所有項目都已處理完畢後恢復(代表對於以put() 放進佇列的每個項目,都要收到task_done() 的呼叫)。

shutdown(immediate=True) callstask_done() for each remaining itemin the queue.

如果呼叫次數超過佇列中放置的項目數量,則引發ValueError

Queue.join()

持續阻塞直到佇列中的所有項目都已被取得並處理完畢。

每當項目被加到佇列中時,未完成任務的計數都會增加。每當消費者執行緒呼叫task_done() 以指示該項目已被取出並且對其的所有工作都已完成時,計數就會下降。當未完成任務的計數降至零時,join() 將停止阻塞。

如何等待放入佇列的任務完成的範例:

importthreadingimportqueueq=queue.Queue()defworker():whileTrue:item=q.get()print(f'Working on{item}')print(f'Finished{item}')q.task_done()# Turn-on the worker thread.threading.Thread(target=worker,daemon=True).start()# Send thirty task requests to the worker.foriteminrange(30):q.put(item)# Block until all tasks are done.q.join()print('All work completed')

Terminating queues

Queue objects can be made to prevent further interaction by shuttingthem down.

Queue.shutdown(immediate=False)

Shut down the queue, makingget() andput() raiseShutDown.

By default,get() on a shut down queue will only raise once thequeue is empty. Setimmediate to true to makeget() raiseimmediately instead.

All blocked callers ofput() andget() will beunblocked. Ifimmediate is true, a task will be marked as done for eachremaining item in the queue, which may unblock callers ofjoin().

在 3.13 版被加入.

SimpleQueue 物件

SimpleQueue 物件提供下面描述的公用 method。

SimpleQueue.qsize()

傳回佇列的近似大小。注意,qsize() > 0 並不能保證後續的 get() 不會阻塞。

SimpleQueue.empty()

如果佇列為空,則回傳True,否則回傳False。如果 empty() 回傳False,則不保證後續呼叫 get() 不會阻塞。

SimpleQueue.put(item,block=True,timeout=None)

item 放入佇列中。此 method 從不阻塞,並且都會成功(除了潛在的低階錯誤,像是分配記憶體失敗)。可選的 argsblocktimeout 會被忽略,它們僅是為了與Queue.put() 相容才存在。

此 method 有一個可重入 (reentrant) 的 C 實作。意思就是,一個put()get() 呼叫,可以被同一執行緒中的另一個put() 呼叫中斷,而不會造成死鎖 (deadlock) 或損壞佇列中的內部狀態。這使得它適合在解構子 (destructor) 中使用,像是__del__ method 或weakref 回呼函式 (callback)。

SimpleQueue.put_nowait(item)

等效於put(item,block=False),用於與Queue.put_nowait() 相容。

SimpleQueue.get(block=True,timeout=None)

從佇列中移除並回傳一個項目。如果可選的 argsblock 為 true,且timeoutNone(預設值),則在必要時阻塞,直到有可用的項目。如果timeout 是正數,則最多會阻塞timeout 秒,如果該時間內沒有可用的項目,則會引發Empty 例外。否則(block 為 false),如果立即可用,則回傳一個項目,否則引發Empty 例外(在這種情況下,timeout 將被忽略)。

SimpleQueue.get_nowait()

等效於get(False)

也參考

Classmultiprocessing.Queue

用於多行程處理 (multi-processing)(而非多執行緒)情境 (context) 的佇列 class。

collections.deque 是無界佇列的替代實作,有快速且具原子性 (atomic) 的append()popleft() 操作,這些操作不需要鎖定,並且還支持索引。