This is an unbounded FIFO queue that offers "close" semantics.
It is designed to be used with try-with-resources and ephemeral virtual threads to ensure proper clean-up of threads.
It only supports some of the Queue interface methods, so it is not a drop-in replacement, but most of the omitted methods are probably not useful in a high-volume virtual thread (VT) environment.
TODO: use a read/write lock to improve concurrency between readers and writers when not using SingleConsumerQueue.
TODO: possibly implement the rest of the BlockingQueue interface methods to make it a drop-in replacement.
The code will most likely be structured similar to:

    try (var queue = new ClosableQueue<T>()) {
        Thread.startVirtualThread(new Consumer(queue));
        ... put() items into queue from source/generation ...
    }

and the consumer:

    for (T e; (e = queue.take()) != null; ) {
        ... do something with e ...
    }

or possibly more efficiently:

    ArrayList<T> elements = new ArrayList<>();
    while (queue.drainToBlocking(elements) > 0) {
        ... for each e in elements do ...
        elements.clear();
    }

The take() in the consumer will throw a QueueClosedException (a subclass of IllegalStateException) if the queue is closed and all elements from the queue have been processed (i.e. the queue is empty and closed).
Multiple producers and consumers are supported. Once the queue is closed, any put() related methods will fail with a QueueClosedException.

See ClosableQueue
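The close semantics described above can be illustrated with a small, self-contained sketch. This is not the library's implementation: `SketchClosableQueue` and `CloseSemanticsDemo` are hypothetical names, the sketch uses a plain lock and condition, it throws a bare IllegalStateException rather than QueueClosedException, and it starts a platform thread so that it runs on older JDKs (on Java 21+ the consumer could be started with Thread.startVirtualThread instead).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for illustration only; NOT the library's ClosableQueue.
final class SketchClosableQueue<T> implements AutoCloseable {
    private final ArrayDeque<T> items = new ArrayDeque<>();
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition changed = lock.newCondition();
    private boolean closed;

    // Adds an element; fails once the queue has been closed.
    void put(T item) {
        lock.lock();
        try {
            if (closed) throw new IllegalStateException("queue closed");
            items.addLast(item);
            changed.signal();
        } finally {
            lock.unlock();
        }
    }

    // Blocks until an element is available; fails only when the queue is closed AND empty.
    T take() throws InterruptedException {
        lock.lock();
        try {
            while (items.isEmpty()) {
                if (closed) throw new IllegalStateException("queue closed");
                changed.await();
            }
            return items.removeFirst();
        } finally {
            lock.unlock();
        }
    }

    // Marks the queue closed and wakes every blocked consumer so it can finish.
    @Override
    public void close() {
        lock.lock();
        try {
            closed = true;
            changed.signalAll();
        } finally {
            lock.unlock();
        }
    }
}

final class CloseSemanticsDemo {
    // Produces 1..n, closes the queue, and returns what the consumer saw.
    static List<Integer> run(int n) throws InterruptedException {
        List<Integer> seen = new ArrayList<>();
        SketchClosableQueue<Integer> queue = new SketchClosableQueue<>();
        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    seen.add(queue.take());
                }
            } catch (IllegalStateException | InterruptedException done) {
                // closed and drained (or interrupted): the consumer exits
            }
        });
        try (queue) {
            consumer.start();
            for (int i = 1; i <= n; i++) {
                queue.put(i);
            }
        } // close() runs here; elements already queued are still delivered
        consumer.join();
        return seen;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(5));
    }
}
```

The point of the pattern is that leaving the try-with-resources block is what releases the consumer: it drains whatever is left, then its next take() fails and the thread ends, so no thread is left blocked on an abandoned queue.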
The library also includes a highly efficient closable queue specifically designed for the case of a single active reader - which is expected to be the case most of the time when using ephemeral virtual thread queues.

    try (var queue = new SingleConsumerQueue<T>()) {
        Thread.startVirtualThread(new Consumer(queue));
        ... put() items into queue from source/generation ...
    }

See SingleConsumerQueue
There are JMH benchmarks that test against the standard concurrent queues:

    Benchmark                                                    Mode  Cnt     Score     Error  Units
    ClosableQueueBenchmark.testClosableQueue                     avgt    9   142.820 ±  11.575  ns/op
    ClosableQueueBenchmark.testSingleConsumerQueue               avgt    9    71.104 ±   2.867  ns/op
    ClosableQueueBenchmark.testLinkedBlockingQueue               avgt    9   168.452 ±  11.434  ns/op
    ClosableQueueBenchmark.testLinkedTransferQueue               avgt    9    64.612 ±   0.924  ns/op
    ClosableQueueBenchmark.testLinkedTransferQueueUsingTransfer  avgt    9  1687.000 ± 496.429  ns/op
    <dependency>
        <groupId>io.github.robaho</groupId>
        <artifactId>closablequeue</artifactId>
        <version>1.0.8</version>
    </dependency>