queue
--- 同步佇列 (synchronized queue) 類別¶
原始碼:Lib/queue.py
queue
module(模組)實作多生產者、多消費者佇列。在執行緒程式設計中,必須在多執行緒之間安全地交換資訊時,特別有用。此 module 中的Queue
class 實作所有必需的鎖定語義 (locking semantics)。
此 module 實作三種型別的佇列,它們僅在取出條目的順序上有所不同。在FIFO 佇列中,先加入的任務是第一個被取出的。在LIFO 佇列中,最近被加入的條目是第一個被取出的(像堆疊 (stack) 一樣操作)。使用優先佇列 (priority queue) 時,條目將保持排序狀態(使用heapq
module),並先取出最低值條目。
在內部,這三種型別的佇列使用鎖 (lock) 來暫時阻塞競爭執行緒;但是,它們並不是被設計來處理執行緒內的 reentrancy(可重入)。
此外,此 module 實作一個「簡單」的FIFO 佇列型別SimpleQueue
,其特定的實作是以較少的功能為代價,來提供額外的保證。
queue
module 定義了以下的 class 和例外:
- classqueue.Queue(maxsize=0)¶
- classqueue.LifoQueue(maxsize=0)¶
LIFO 佇列的建構子。maxsize 是一個整數,用於設置佇列中可放置的項目數的上限。一旦達到此大小,插入將被鎖定,到佇列中的項目被消耗。如果maxsize 小於或等於零,則佇列大小為無限。
- classqueue.PriorityQueue(maxsize=0)¶
優先佇列的建構子。maxsize 是一個整數,用於設置佇列中可放置的項目數的上限。一旦達到此大小,插入將被阻塞,直到佇列中的項目被消耗。如果maxsize 小於或等於零,則佇列大小為無限。
最低值的條目會最先被取出(最低值的條目是被會
min(entries)
回傳的那一個)。條目的典型模式是格式為(priority_number,data)
的 tuple(元組)。如果data 元素為不可比較的,則可以將資料包裝在一個 class 中,該 class 忽略資料項目並僅比較優先數:
fromdataclassesimportdataclass,fieldfromtypingimportAny@dataclass(order=True)classPrioritizedItem:priority:intitem:Any=field(compare=False)
- classqueue.SimpleQueue¶
無界的FIFO 佇列的建構子。簡單佇列缺少任務追蹤等進階功能。
在 3.7 版被加入.
- exceptionqueue.Empty¶
當對一個空的
Queue
物件呼叫非阻塞的 (non-blocking)get()
(或get_nowait()
)將引發此例外。
- exceptionqueue.Full¶
當對一個已滿的
Queue
物件呼叫非阻塞的put()
(或put_nowait()
)將引發此例外。
- exceptionqueue.ShutDown¶
Exception raised when
put()
orget()
is called onaQueue
object which has been shut down.在 3.13 版被加入.
佇列物件¶
佇列物件(Queue
、LifoQueue
、PriorityQueue
)提供下面描述的公用 method。
- Queue.qsize()¶
回傳佇列的近似大小。注意,qsize() > 0 並不能保證後續的 get() 不會阻塞,qsize() < maxsize 也不會保證 put() 不會阻塞。
- Queue.empty()¶
如果佇列為空,則回傳
True
,否則回傳False
。如果 empty() 回傳True
,則不保證後續呼叫 put() 不會阻塞。同樣,如果 empty() 回傳False
,則不保證後續呼叫 get() 不會阻塞。
- Queue.full()¶
如果佇列已滿,則回傳
True
,否則回傳False
。如果 full() 回傳True
,則不保證後續呼叫 get() 不會阻塞。同樣,如果 full() 回傳False
,則不保證後續呼叫 put() 不會阻塞。
- Queue.put(item,block=True,timeout=None)¶
將item 放入佇列中。如果可選的 argsblock 為 true、timeout 為
None
(預設值),則在必要時阻塞,直到自由槽 (free slot) 可用。如果timeout 為正數,則最多阻塞timeout 秒,如果該時間內沒有可用的自由槽,則會引發Full
例外。否則(block 為 false),如果自由槽立即可用,則將項目放在佇列中,否則引發Full
例外(在這種情況下,timeout 將被忽略)。Raises
ShutDown
if the queue has been shut down.
- Queue.put_nowait(item)¶
等效於
put(item,block=False)
。
- Queue.get(block=True,timeout=None)¶
從佇列中移除並回傳一個項目。如果可選的 argsblock 為 true,且timeout 為
None
(預設值),則在必要時阻塞,直到有可用的項目。如果timeout 是正數,則最多會阻塞timeout 秒,如果該時間內沒有可用的項目,則會引發Empty
例外。否則(block 為 false),如果立即可用,則回傳一個項目,否則引發Empty
例外(在這種情況下,timeout 將被忽略)。在 POSIX 系統的 3.0 版之前,以及 Windows 的所有版本,如果block 為 true 且timeout 為
None
,則此操作將在底層鎖上進入不間斷等待。這意味著不會發生例外,特別是 SIGINT(中斷訊號)不會觸發KeyboardInterrupt
。Raises
ShutDown
if the queue has been shut down and is empty, or ifthe queue has been shut down immediately.
- Queue.get_nowait()¶
等效於
get(False)
。
有兩個 method 可以支援追蹤放入佇列的任務是否已由常駐消費者執行緒 (daemon consumer threads) 完全處理。
- Queue.task_done()¶
表示先前放入佇列的任務已完成。由佇列消費者執行緒使用。對於用來提取任務的每個
get()
,隨後呼叫task_done()
告訴佇列任務的處理已完成。如果目前
join()
阻塞,它將會在所有項目都已處理完畢後恢復(代表對於以put()
放進佇列的每個項目,都要收到task_done()
的呼叫)。shutdown(immediate=True)
callstask_done()
for each remaining itemin the queue.如果呼叫次數超過佇列中放置的項目數量,則引發
ValueError
。
- Queue.join()¶
持續阻塞直到佇列中的所有項目都已被取得並處理完畢。
每當項目被加到佇列中時,未完成任務的計數都會增加。每當消費者執行緒呼叫
task_done()
以指示該項目已被取出並且對其的所有工作都已完成時,計數就會下降。當未完成任務的計數降至零時,join()
將停止阻塞。
如何等待放入佇列的任務完成的範例:
importthreadingimportqueueq=queue.Queue()defworker():whileTrue:item=q.get()print(f'Working on{item}')print(f'Finished{item}')q.task_done()# Turn-on the worker thread.threading.Thread(target=worker,daemon=True).start()# Send thirty task requests to the worker.foriteminrange(30):q.put(item)# Block until all tasks are done.q.join()print('All work completed')
Terminating queues¶
Queue
objects can be made to prevent further interaction by shuttingthem down.
- Queue.shutdown(immediate=False)¶
Shut down the queue, making
get()
andput()
raiseShutDown
.By default,
get()
on a shut down queue will only raise once thequeue is empty. Setimmediate to true to makeget()
raiseimmediately instead.All blocked callers of
put()
andget()
will beunblocked. Ifimmediate is true, a task will be marked as done for eachremaining item in the queue, which may unblock callers ofjoin()
.在 3.13 版被加入.
SimpleQueue 物件¶
SimpleQueue
物件提供下面描述的公用 method。
- SimpleQueue.qsize()¶
傳回佇列的近似大小。注意,qsize() > 0 並不能保證後續的 get() 不會阻塞。
- SimpleQueue.empty()¶
如果佇列為空,則回傳
True
,否則回傳False
。如果 empty() 回傳False
,則不保證後續呼叫 get() 不會阻塞。
- SimpleQueue.put(item,block=True,timeout=None)¶
將item 放入佇列中。此 method 從不阻塞,並且都會成功(除了潛在的低階錯誤,像是分配記憶體失敗)。可選的 argsblock 和timeout 會被忽略,它們僅是為了與
Queue.put()
相容才存在。此 method 有一個可重入 (reentrant) 的 C 實作。意思就是,一個
put()
或get()
呼叫,可以被同一執行緒中的另一個put()
呼叫中斷,而不會造成死鎖 (deadlock) 或損壞佇列中的內部狀態。這使得它適合在解構子 (destructor) 中使用,像是__del__
method 或weakref
回呼函式 (callback)。
- SimpleQueue.put_nowait(item)¶
等效於
put(item,block=False)
,用於與Queue.put_nowait()
相容。
- SimpleQueue.get(block=True,timeout=None)¶
從佇列中移除並回傳一個項目。如果可選的 argsblock 為 true,且timeout 為
None
(預設值),則在必要時阻塞,直到有可用的項目。如果timeout 是正數,則最多會阻塞timeout 秒,如果該時間內沒有可用的項目,則會引發Empty
例外。否則(block 為 false),如果立即可用,則回傳一個項目,否則引發Empty
例外(在這種情況下,timeout 將被忽略)。
- SimpleQueue.get_nowait()¶
等效於
get(False)
。
也參考
- Class
multiprocessing.Queue
用於多行程處理 (multi-processing)(而非多執行緒)情境 (context) 的佇列 class。
collections.deque
是無界佇列的替代實作,有快速且具原子性 (atomic) 的append()
和popleft()
操作,這些操作不需要鎖定,並且還支持索引。