Movatterモバイル変換


[0]ホーム

URL:


HappyCoders logo
ArrayBlockingQueue - Feature ImageArrayBlockingQueue - Feature Image
HappyCoders Glasses

JavaArrayBlockingQueue Whatisit for?+Howtouseit?

Sven Woltmann
Sven Woltmann
Last update: December 2, 2024
Sven WoltmannSven WoltmannApril 26, 2022

This article is about theArrayBlockingQueue and its properties. You will see how theArrayBlockingQueue is used with an example. I will also give you a recommendation in which cases you should use this queue.

Here we are in the class hierarchy:

ArrayBlockingQueue in the class hierarchy
ArrayBlockingQueue in the class hierarchy

ArrayBlockingQueue Characteristics

The classjava.util.concurrent.ArrayBlockingQueue is based on an array and – like most queue implementations – is thread-safe (see below). It is bounded (has a maximum capacity), accordingly blocking, and provides a fairness policy (i.e., blocking methods are served in the order they were called).

The characteristics at a glance:

Underlying data structureThread-safe?Blocking/
non-blocking
Fairness
policy
Bounded/
unbounded
Iterator type
ArrayYes
(pessimistic locking with a lock)
BlockingOptionalBoundedWeakly consistent¹

¹ Weakly consistent: All elements that exist when the iterator is created are traversed by the iterator exactly once. Changes that occur after this can, but do not need to, be reflected by the iterator.

Recommended Use Case

Due to the possibly high contention with simultaneous read and write access, you should – if you need a blocking, thread-safe queue – test whether aLinkedBlockingQueue is more performant for your specific purpose. While this queue is based on a linked list, it uses two separateReentrantLocks for writing and reading, which reduces access conflicts.

ArrayBlockingQueue Example

In the following example, we create anArrayBlockingQueue with capacity 3. Then we have aScheduledExecutorService write and read elements to and from the queue at specified intervals (→code on GitHub):

publicclassArrayBlockingQueueExample{privatestaticfinallong startTime = System.currentTimeMillis();publicstaticvoidmain(String[] args)throws InterruptedException{    BlockingQueue<Integer> queue =new ArrayBlockingQueue<>(3);    ScheduledExecutorService pool = Executors.newScheduledThreadPool(10);// Start reading from the queue immediately, every 3 secondsfor (int i =0; i <10; i++) {int delaySeconds = i *3;      pool.schedule(() -> dequeue(queue), delaySeconds, TimeUnit.SECONDS);    }// Start writing to the queue after 3.5 seconds (so there are already 2 threads// waiting), every 1 seconds (so that the queue fills faster than it's emptied,// so that we see a full queue soon)for (int i =0; i <10; i++) {int element = i;// Assign to an effectively final variableint delayMillis =3500 + i *1000;      pool.schedule(() -> enqueue(queue, element), delayMillis, TimeUnit.MILLISECONDS);    }    pool.shutdown();    pool.awaitTermination(1, TimeUnit.MINUTES);  }privatestaticvoidenqueue(BlockingQueue<Integer> queue,int element){    log("Calling queue.put(%d) (queue = %s)...", element, queue);try {      queue.put(element);      log("queue.put(%d) returned (queue = %s)", element, queue);    }catch (InterruptedException e) {      Thread.currentThread().interrupt();    }  }privatestaticvoiddequeue(BlockingQueue<Integer> queue){    log("    Calling queue.take() (queue = %s)...", queue);try {      Integer element = queue.take();      log("    queue.take() returned %d (queue = %s)", element, queue);    }catch (InterruptedException e) {      Thread.currentThread().interrupt();    }  }privatestaticvoidlog(String format, Object... args){    System.out.printf(        Locale.US,"[%4.1fs] [%-16s] %s%n",        (System.currentTimeMillis() - startTime) /1000.0,        Thread.currentThread().getName(),        String.format(format, args));  }}Code language:Java(java)

We try to read an element from the queue every three seconds, starting immediately. We write the elements every second but do not start until 3.5 s have passed. At this point, two reading threads should have already blocked and are waiting for elements to be written to the queue.

Since we write faster than we read, the queue should soon reach its capacity limit. The writing threads should block from that moment until the reading threads have caught up.

Here is a sample output:

[ 0.0s] [pool-1-thread-1 ]     Calling queue.take() (queue = [])...[ 3.0s] [pool-1-thread-2 ]     Calling queue.take() (queue = [])...[ 3.5s] [pool-1-thread-3 ] Calling queue.put(0) (queue = [])...[ 3.5s] [pool-1-thread-3 ] queue.put(0) returned (queue = [])[ 3.5s] [pool-1-thread-1 ]     queue.take() returned 0 (queue = [])[ 4.5s] [pool-1-thread-9 ] Calling queue.put(1) (queue = [])...[ 4.5s] [pool-1-thread-9 ] queue.put(1) returned (queue = [])[ 4.5s] [pool-1-thread-2 ]     queue.take() returned 1 (queue = [])[ 5.5s] [pool-1-thread-7 ] Calling queue.put(2) (queue = [])...[ 5.5s] [pool-1-thread-7 ] queue.put(2) returned (queue = [2])[ 6.0s] [pool-1-thread-8 ]     Calling queue.take() (queue = [2])...[ 6.0s] [pool-1-thread-8 ]     queue.take() returned 2 (queue = [])[ 6.5s] [pool-1-thread-5 ] Calling queue.put(3) (queue = [])...[ 6.5s] [pool-1-thread-5 ] queue.put(3) returned (queue = [3])[ 7.5s] [pool-1-thread-4 ] Calling queue.put(4) (queue = [3])...[ 7.5s] [pool-1-thread-4 ] queue.put(4) returned (queue = [3, 4])[ 8.5s] [pool-1-thread-10] Calling queue.put(5) (queue = [3, 4])...[ 8.5s] [pool-1-thread-10] queue.put(5) returned (queue = [3, 4, 5])[ 9.0s] [pool-1-thread-6 ]     Calling queue.take() (queue = [3, 4, 5])...[ 9.0s] [pool-1-thread-6 ]     queue.take() returned 3 (queue = [4, 5])[ 9.5s] [pool-1-thread-3 ] Calling queue.put(6) (queue = [4, 5])...[ 9.5s] [pool-1-thread-3 ] queue.put(6) returned (queue = [4, 5, 6])[10.5s] [pool-1-thread-1 ] Calling queue.put(7) (queue = [4, 5, 6])...[11.5s] [pool-1-thread-9 ] Calling queue.put(8) (queue = [4, 5, 6])...[12.0s] [pool-1-thread-2 ]     Calling queue.take() (queue = [4, 5, 6])...[12.0s] [pool-1-thread-2 ]     queue.take() returned 4 (queue = [5, 6, 7])[12.0s] [pool-1-thread-1 ] queue.put(7) returned (queue = [5, 6, 7])[12.5s] [pool-1-thread-7 ] Calling queue.put(9) (queue = [5, 6, 7])...[15.0s] [pool-1-thread-8 ]     Calling queue.take() (queue = [5, 6, 7])...[15.0s] [pool-1-thread-8 ]     queue.take() returned 5 (queue = [6, 7, 8])[15.0s] [pool-1-thread-9 ] queue.put(8) returned (queue = [6, 7, 8])[18.0s] [pool-1-thread-5 ]     Calling queue.take() (queue = [6, 7, 8])...[18.0s] [pool-1-thread-5 ]     queue.take() returned 6 (queue = [7, 8, 9])[18.0s] [pool-1-thread-7 ] queue.put(9) returned (queue = [7, 8, 9])[21.0s] [pool-1-thread-4 ]     Calling queue.take() (queue = [7, 8, 9])...[21.0s] [pool-1-thread-4 ]     queue.take() returned 7 (queue = [8, 9])[24.0s] [pool-1-thread-10]     Calling queue.take() (queue = [8, 9])...[24.0s] [pool-1-thread-10]     queue.take() returned 8 (queue = [9])[27.0s] [pool-1-thread-6 ]     Calling queue.take() (queue = [9])...[27.0s] [pool-1-thread-6 ]     queue.take() returned 9 (queue = [])Code language:plaintext(plaintext)

As predicted, the first two read attempts block at 0.0 s and 3.0 s because no elements have yet been written to the queue.

After 3.5 s, the first element is written, which wakes up the first thread and removes this element again. After 4.5 s, the second element is written, waking up the second thread to remove the element.

Since the program writes faster than it reads, after 10.5 s, thread 1 blocks, after 11.5 s, thread 9 blocks, and after 12.5 s, thread 7 blocks when trying to write additional elements into the queue, which is full at that time.

After 12.0 s, an element is removed, and thread 1 can continue writing. After 15.0 s, another element is taken, and thread 9 can continue. After 18.0 s, thread 7 can continue.

Since no other elements are written to the queue, it empties again towards the end.

Is ArrayBlockingQueue Thread-Safe?

Yes,ArrayBlockingQueue is thread-safe.

A single ReentrantLock maintainsArrayBlockingQueue's thread-safety. It is used for the queue's head and tail simultaneously so that access conflicts ("thread contention") between producer and consumer threads can occur in case of simultaneous read and write accesses.

Explicit locks such asReentrantLock are mainly suitable for high-contention applications. Optimistic locking is better for low to moderate thread contention.

Differences from other queues:

  • WithLinkedBlockingQueue, thread safety is provided by not one but two locks. Thus, producer and consumer threads cannot block each other.
  • WithConcurrentLinkedQueue, thread safety is provided by optimistic locking via compare-and-set, resulting in better performance with low to moderate contention.

Summary and Outlook

This article has introduced you to theArrayBlockingQueue. This queue is thread-safe, blocking, and bounded. With an example, you have seen how you can useArrayBlockingQueue.

As the name suggests, this queue is based on an array. The linked list-based counterpart –LinkedBlockingQueue – was covered in the previous part of the series.

The next part of the series is aboutPriorityBlockingQueue – a thread-safe and blocking variant of thePriorityQueue presented previously.

If you still have questions, please ask them via the comment function. Do you want to be informed about new tutorials and articles? Thenclick here to sign up for the HappyCoders.eu newsletter.

ShareTweetShareShare

About the Author

As aconsultant and trainer with over 20 years of experience, I support Java teams in building modern backends – with a focus on performance, scalability, and maintainability.

I analyze bottlenecks, optimize existing systems, and share deep technical knowledge through hands-on, practical training.

More about me »Request consulting »Java trainings »

Leave a ReplyCancel reply

Your email address will not be published.Required fields are marked*

You might also like the following articles


[8]ページ先頭

©2009-2025 Movatter.jp