class Thread::Queue
The Thread::Queue class implements multi-producer, multi-consumer queues. It is especially useful in threaded programming when information must be exchanged safely between multiple threads. The Thread::Queue class implements all the required locking semantics.
The class implements a FIFO (first in, first out) queue. In a FIFO queue, the first tasks added are the first retrieved.
Example:
queue = Thread::Queue.new

producer = Thread.new do
  5.times do |i|
    sleep rand(i) # simulate expense
    queue << i
    puts "#{i} produced"
  end
end

consumer = Thread.new do
  5.times do |i|
    value = queue.pop
    sleep rand(i/2) # simulate expense
    puts "consumed #{value}"
  end
end

consumer.join
Public Class Methods
Source
static VALUE
rb_queue_initialize(int argc, VALUE *argv, VALUE self)
{
    VALUE initial;
    struct rb_queue *q = queue_ptr(self);
    if ((argc = rb_scan_args(argc, argv, "01", &initial)) == 1) {
        initial = rb_to_array(initial);
    }
    RB_OBJ_WRITE(self, queue_list(q), ary_buf_new());
    ccan_list_head_init(queue_waitq(q));
    if (argc == 1) {
        rb_ary_concat(q->que, initial);
    }
    return self;
}

Creates a new queue instance, optionally using the contents of an enumerable for its initial state.
Example:
q = Thread::Queue.new
#=> #<Thread::Queue:0x00007ff7501110d0>
q.empty?
#=> true

q = Thread::Queue.new([1, 2, 3])
#=> #<Thread::Queue:0x00007ff7500ec500>
q.empty?
#=> false
q.pop
#=> 1
Public Instance Methods
Source
static VALUE
rb_queue_clear(VALUE self)
{
    struct rb_queue *q = queue_ptr(self);
    rb_ary_clear(check_array(self, q->que));
    return self;
}

Removes all objects from the queue.
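As a minimal sketch of the behaviour described above (the variable names are illustrative and not part of the original documentation), clearing a queue discards every pending item and, as the C source suggests, returns the queue itself:

```ruby
# Illustrative sketch; `q` and `result` are hypothetical names.
q = Thread::Queue.new([1, 2, 3])
size_before = q.length   # three items are pending

result = q.clear         # discards all pending items, returns the queue
size_after = q.length    # nothing is left to pop
```

Items removed this way are simply dropped; they are not handed to any consumer.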
Source
static VALUE
rb_queue_close(VALUE self)
{
    struct rb_queue *q = queue_ptr(self);
    if (!queue_closed_p(self)) {
        FL_SET(self, QUEUE_CLOSED);
        wakeup_all(queue_waitq(q));
    }
    return self;
}

Closes the queue. A closed queue cannot be re-opened.
After the call to close completes, the following are true:
- closed? will return true.
- close will be ignored.
- Calling enq/push/<< will raise a ClosedQueueError.
- When empty? is false, calling deq/pop/shift will return an object from the queue as usual.
- When empty? is true, deq(false) will not suspend the thread and will return nil. deq(true) will raise a ThreadError.
ClosedQueueError inherits from StopIteration, so raising it inside a loop block breaks out of the loop.
Example:
q = Thread::Queue.new
Thread.new {
  while e = q.deq # wait for nil to break loop
    # ...
  end
}
q.close
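The rules for a closed queue can be checked one by one in a single thread. The following is only a sketch under the semantics listed above; all variable names are hypothetical.

```ruby
# Illustrative sketch; variable names are hypothetical.
q = Thread::Queue.new
q << :a << :b            # two items are queued before closing
q.close
q.close                  # closing again is ignored
closed = q.closed?

# Pushing to a closed queue raises ClosedQueueError.
push_error = begin
  q << :c
  nil
rescue ClosedQueueError => e
  e
end

# Items queued before the close can still be retrieved.
first  = q.pop
second = q.pop

# Closed and empty: a blocking pop returns nil without suspending,
# while a non-blocking pop raises ThreadError.
drained = q.pop
nonblock_error = begin
  q.pop(true)
rescue ThreadError => e
  e
end

# ClosedQueueError is a StopIteration, so Kernel#loop ends quietly.
loop_finished = false
loop { q << :d }
loop_finished = true
```

Because a blocking pop returns nil once the queue is closed and drained, a consumer written as `while item = q.pop` terminates on its own after the producer calls close.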
Source
static VALUE
rb_queue_closed_p(VALUE self)
{
    return RBOOL(queue_closed_p(self));
}

Returns true if the queue is closed.
Source
static VALUE
rb_queue_empty_p(VALUE self)
{
    return RBOOL(queue_length(self, queue_ptr(self)) == 0);
}

Returns true if the queue is empty.
Source
static VALUE
rb_queue_freeze(VALUE self)
{
    rb_raise(rb_eTypeError, "cannot freeze " "%+"PRIsVALUE, self);
    UNREACHABLE_RETURN(self);
}

The queue can’t be frozen, so this method raises an exception:

Thread::Queue.new.freeze # Raises TypeError (cannot freeze #<Thread::Queue:0x...>)
Source
static VALUE
rb_queue_length(VALUE self)
{
    return LONG2NUM(queue_length(self, queue_ptr(self)));
}

Returns the length of the queue.
Source
static VALUE
rb_queue_num_waiting(VALUE self)
{
    struct rb_queue *q = queue_ptr(self);
    return INT2NUM(q->num_waiting);
}

Returns the number of threads waiting on the queue.
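A small sketch of how the waiting count might be observed: a consumer thread blocks on an empty queue, and the main thread polls until that thread is registered as waiting. The polling interval and variable names are assumptions made for illustration.

```ruby
# Illustrative sketch; variable names and the 0.01s poll are hypothetical.
q = Thread::Queue.new
before = q.num_waiting            # no thread is blocked yet

consumer = Thread.new { q.pop }   # blocks because the queue is empty

# Wait until the consumer has actually reached the blocking pop.
sleep 0.01 until q.num_waiting == 1
during = q.num_waiting

q << :job                         # wakes the waiting consumer
value = consumer.value            # joins the thread and returns what it popped
after = q.num_waiting
```

The count is a snapshot and may change at any moment when other threads are running, so it is better suited to diagnostics than to control flow.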
Source
# File thread_sync.rb, line 16
def pop(non_block = false, timeout: nil)
  if non_block && timeout
    raise ArgumentError, "can't set a timeout if non_block is enabled"
  end
  Primitive.rb_queue_pop(non_block, timeout)
end
Retrieves data from the queue.
If the queue is empty, the calling thread is suspended until data is pushed onto the queue. If non_block is true, the thread isn’t suspended, and ThreadError is raised.
If timeout seconds have passed and no data is available, nil is returned. If timeout is 0, it returns immediately.
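The different ways pop treats an empty queue can be sketched as follows. This assumes a Ruby version whose pop accepts the timeout: keyword, as in the source shown above; the variable names and the 0.05 second timeout are illustrative.

```ruby
# Illustrative sketch; variable names and timeout values are hypothetical.
q = Thread::Queue.new

# Non-blocking pop on an empty queue raises ThreadError instead of waiting.
error = begin
  q.pop(true)
rescue ThreadError => e
  e
end

# A zero timeout returns nil immediately; a positive timeout waits first.
immediate = q.pop(timeout: 0)
waited    = q.pop(timeout: 0.05)

# Combining non_block with a timeout is rejected.
conflict = begin
  q.pop(true, timeout: 1)
rescue ArgumentError => e
  e
end

q << :item
value = q.pop   # data is available, so this returns without suspending
```

Since nil is also a legal queue item, a nil result from a timed pop does not by itself prove that the timeout expired.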
Source
static VALUE
rb_queue_push(VALUE self, VALUE obj)
{
    return queue_do_push(self, queue_ptr(self), obj);
}

Pushes the given object to the queue.
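A brief sketch of pushing with each of the names mentioned earlier (push, enq and <<) and reading the items back in FIFO order with pop, deq and shift. The variable names are illustrative.

```ruby
# Illustrative sketch; variable names are hypothetical.
q = Thread::Queue.new

q.push(1)   # push, enq and << all append to the tail of the queue
q.enq(2)
q << 3

# Items come back out in insertion order (FIFO).
order = [q.pop, q.deq, q.shift]
```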