cenotelie/onering
High-throughput synchronous queue and channels. The implementation of the queue is freely inspired by the LMAX Disruptor. As is typical for a disruptor, consumers see all items pushed onto the queue. The implementation is therefore better suited for dispatching all items to all consumers.

Consequently, the queue provided here does not allow transferring ownership of queued items to other threads. Instead, receivers (consumers) only see immutable references to the items. When items can be copied (i.e. they implement `Copy`), copies can be obtained instead.
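To make the "every consumer sees every item, by reference" behaviour concrete, here is a minimal sketch that uses only the standard library. It is not how `onering` is implemented: the lock-based `BroadcastLog` type and the `broadcast_demo` function are hypothetical names invented for this illustration, and an `RwLock<Vec<T>>` stands in for the ring buffer.

```rust
use std::sync::{Arc, RwLock};
use std::thread;

/// Hypothetical stand-in for a broadcast queue: an append-only log that
/// every consumer reads in full. Not part of `onering`.
struct BroadcastLog<T> {
    items: RwLock<Vec<T>>,
}

impl<T> BroadcastLog<T> {
    fn new() -> Self {
        Self { items: RwLock::new(Vec::new()) }
    }

    /// Appends an item; ownership stays with the log from here on.
    fn push(&self, item: T) {
        self.items.write().unwrap().push(item);
    }

    /// Hands `handler` a shared reference to each item from `cursor` onwards
    /// and returns the new cursor. Panics if `cursor` is past the end.
    fn read_from(&self, cursor: usize, mut handler: impl FnMut(&T)) -> usize {
        let items = self.items.read().unwrap();
        for item in &items[cursor..] {
            handler(item);
        }
        items.len()
    }
}

/// Pushes `0..count`, then lets each consumer thread read the whole log.
/// Returns one `(items_seen, sum)` pair per consumer.
fn broadcast_demo(consumers: usize, count: usize) -> Vec<(usize, usize)> {
    let log = Arc::new(BroadcastLog::new());
    for item in 0..count {
        log.push(item);
    }
    let handles: Vec<_> = (0..consumers)
        .map(|_| {
            let log = Arc::clone(&log);
            thread::spawn(move || {
                let mut sum = 0usize;
                // `item` is a `&usize`; since `usize` is `Copy`, `*item` copies it out.
                let seen = log.read_from(0, |item| sum += *item);
                (seen, sum)
            })
        })
        .collect();
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}
```

Calling `broadcast_demo(3, 10)` returns three identical pairs, `(10, 45)`: no consumer takes an item away from the others, which is the property the paragraphs above describe.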
Create a queue with a single producer and 5 event consumers.
    use std::sync::Arc;
    use onering::errors::TryRecvError;
    use onering::queue::{Consumer, ConsumerMode, RingBuffer, SingleProducer};

    let ring = Arc::new(RingBuffer::<usize, _>::new_single_producer(256));
    let mut consumers = (0..5)
        .map(|_| Consumer::new(ring.clone(), ConsumerMode::Blocking))
        .collect::<Vec<_>>();
    let mut producer = SingleProducer::new(ring);

    let consumer_threads = (0..5)
        .map(|_| {
            let mut consumer = consumers.pop().unwrap();
            std::thread::spawn({
                move || {
                    let mut count = 0;
                    loop {
                        match consumer.try_recv() {
                            Ok(items) => {
                                for _item in items {
                                    // handle item
                                }
                            },
                            Err(TryRecvError::Disconnected) => {
                                break;
                            }
                            Err(_) => { /* retry */ }
                        }
                    }
                }
            })
        })
        .collect::<Vec<_>>();

    for item in 0..1000 {
        while producer.try_push(item).is_err() {}
    }
    drop(producer); // so that `TryRecvError::Disconnected` is raised

    for consumer in consumer_threads {
        consumer.join().unwrap();
    }
`onering` is compatible with `no_std` contexts: it has a `std` feature, which is activated by default. To use `onering` without `std`, deactivate the default features in your `Cargo.toml` file.
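As a rough sketch, disabling default features in a manifest could look like the fragment below. The version requirement is a placeholder and is not taken from this README; replace it with the release you actually depend on.

```toml
[dependencies]
# "*" is a placeholder version requirement, not a recommendation.
# `default-features = false` turns off the default `std` feature.
onering = { version = "*", default-features = false }
```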
Contributions are welcome!
Open a ticket, ask a question or submit a pull request.
This project is licensed under the MIT license.