2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/licenses/publicdomain
7 package java.util.concurrent;
8 import java.util.concurrent.locks.*;
12 * A bounded {@linkplain BlockingQueue blocking queue} backed by an
13 * array. This queue orders elements FIFO (first-in-first-out). The
14 * <em>head</em> of the queue is that element that has been on the
15 * queue the longest time. The <em>tail</em> of the queue is that
16 * element that has been on the queue the shortest time. New elements
17 * are inserted at the tail of the queue, and the queue retrieval
18 * operations obtain elements at the head of the queue.
20 * <p>This is a classic "bounded buffer", in which a
21 * fixed-sized array holds elements inserted by producers and
22 * extracted by consumers. Once created, the capacity cannot be
23 * increased. Attempts to <tt>put</tt> an element into a full queue
24 * will result in the operation blocking; attempts to <tt>take</tt> an
25 * element from an empty queue will similarly block.
27 * <p> This class supports an optional fairness policy for ordering
28 * waiting producer and consumer threads. By default, this ordering
29 * is not guaranteed. However, a queue constructed with fairness set
30 * to <tt>true</tt> grants threads access in FIFO order. Fairness
31 * generally decreases throughput but reduces variability and avoids starvation.
34 * <p>This class and its iterator implement all of the
35 * <em>optional</em> methods of the {@link Collection} and {@link
36 * Iterator} interfaces.
38 * <p>This class is a member of the
39 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
40 * Java Collections Framework</a>.
44 * @param <E> the type of elements held in this collection
46 public class ArrayBlockingQueue/*<E>*/ extends AbstractQueue/*<E>*/
47 //implements BlockingQueue<E>, java.io.Serializable {
51 * Serialization ID. This class relies on default serialization
52 * even for the items array, which is default-serialized, even if
53 * it is empty. Otherwise it could not be declared final, which is
56 private static final long serialVersionUID = -817911632652898426L;
58 /** The queued items */
59 private final Object/*E*/[] items;
60 /** items index for next take, poll or remove */
61 private int takeIndex;
62 /** items index for next put, offer, or add. */
64 /** Number of items in the queue */
68 * Concurrency control uses the classic two-condition algorithm
69 * found in any textbook.
72 /** Main lock guarding all access */
73 private final ReentrantLock lock;
74 /** Condition for waiting takes */
75 private final Condition notEmpty;
76 /** Condition for waiting puts */
77 private final Condition notFull;
79 // Internal helper methods
82 * Circularly increment i.
84 final int inc(int i) {
85 return (++i == items.length)? 0 : i;
89 * Inserts element at current put position, advances, and signals.
90 * Call only when holding lock.
92 private void insert(Object/*E*/ x) {
94 putIndex = inc(putIndex);
100 * Extracts element at current take position, advances, and signals.
101 * Call only when holding lock.
103 private Object/*E*/ extract() {
104 final Object/*E*/[] items = this.items;
105 /*E*/ x = items[takeIndex];
106 items[takeIndex] = null;
107 takeIndex = inc(takeIndex);
114 * Utility for remove and iterator.remove: Delete item at position i.
115 * Call only when holding lock.
117 void removeAt(int i) {
118 final Object/*E*/[] items = this.items;
119 // if removing front item, just advance
120 if (i == takeIndex) {
121 items[takeIndex] = null;
122 takeIndex = inc(takeIndex);
124 // slide over all others up through putIndex.
127 if (nexti != putIndex) {
128 items[i] = items[nexti];
142 * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed)
143 * capacity and default access policy.
145 * @param capacity the capacity of this queue
146 * @throws IllegalArgumentException if <tt>capacity</tt> is less than 1
148 public ArrayBlockingQueue(int capacity) {
149 this(capacity, false);
153 * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed)
154 * capacity and the specified access policy.
156 * @param capacity the capacity of this queue
157 * @param fair if <tt>true</tt> then queue accesses for threads blocked
158 * on insertion or removal are processed in FIFO order;
159 * if <tt>false</tt> the access order is unspecified.
160 * @throws IllegalArgumentException if <tt>capacity</tt> is less than 1
162 public ArrayBlockingQueue(int capacity, boolean fair) {
164 throw new IllegalArgumentException();
165 this.items = /*(E[])*/ new Object[capacity];
166 lock = new ReentrantLock(fair);
167 notEmpty = lock.newCondition();
168 notFull = lock.newCondition();
172 * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed)
173 * capacity, the specified access policy and initially containing the
174 * elements of the given collection,
175 * added in traversal order of the collection's iterator.
177 * @param capacity the capacity of this queue
178 * @param fair if <tt>true</tt> then queue accesses for threads blocked
179 * on insertion or removal are processed in FIFO order;
180 * if <tt>false</tt> the access order is unspecified.
181 * @param c the collection of elements to initially contain
182 * @throws IllegalArgumentException if <tt>capacity</tt> is less than
183 * <tt>c.size()</tt>, or less than 1.
184 * @throws NullPointerException if the specified collection or any
185 * of its elements are null
187 public ArrayBlockingQueue(int capacity, boolean fair,
188 Collection/*<? extends E>*/ c) {
189 this(capacity, fair);
190 if (capacity < c.size())
191 throw new IllegalArgumentException();
193 for (Iterator/*<? extends E>*/ it = c.iterator(); it.hasNext();)
198 * Inserts the specified element at the tail of this queue if it is
199 * possible to do so immediately without exceeding the queue's capacity,
200 * returning <tt>true</tt> upon success and throwing an
201 * <tt>IllegalStateException</tt> if this queue is full.
203 * @param e the element to add
204 * @return <tt>true</tt> (as specified by {@link Collection#add})
205 * @throws IllegalStateException if this queue is full
206 * @throws NullPointerException if the specified element is null
208 public boolean add(Object/*E*/ e) {
213 * Inserts the specified element at the tail of this queue if it is
214 * possible to do so immediately without exceeding the queue's capacity,
215 * returning <tt>true</tt> upon success and <tt>false</tt> if this queue
216 * is full. This method is generally preferable to method {@link #add},
217 * which can fail to insert an element only by throwing an exception.
219 * @throws NullPointerException if the specified element is null
221 public boolean offer(Object/*E*/ e) {
222 if (e == null) throw new NullPointerException();
223 final ReentrantLock lock = this.lock;
226 if (count == items.length)
238 * Inserts the specified element at the tail of this queue, waiting
239 * for space to become available if the queue is full.
241 * @throws InterruptedException {@inheritDoc}
242 * @throws NullPointerException {@inheritDoc}
244 public void put(Object/*E*/ e) throws InterruptedException {
245 if (e == null) throw new NullPointerException();
246 final Object/*E*/[] items = this.items;
247 final ReentrantLock lock = this.lock;
248 lock.lockInterruptibly();
251 while (count == items.length)
253 } catch (InterruptedException ie) {
254 notFull.signal(); // propagate to non-interrupted thread
264 * Inserts the specified element at the tail of this queue, waiting
265 * up to the specified wait time for space to become available if the queue is full.
268 * @throws InterruptedException {@inheritDoc}
269 * @throws NullPointerException {@inheritDoc}
271 /*public boolean offer(E e, long timeout, TimeUnit unit)
272 throws InterruptedException {
274 if (e == null) throw new NullPointerException();
275 long nanos = unit.toNanos(timeout);
276 final ReentrantLock lock = this.lock;
277 lock.lockInterruptibly();
280 if (count != items.length) {
287 nanos = notFull.awaitNanos(nanos);
288 } catch (InterruptedException ie) {
289 notFull.signal(); // propagate to non-interrupted thread
298 public Object/*E*/ poll() {
299 final ReentrantLock lock = this.lock;
304 Object/*E*/ x = extract();
311 public Object/*E*/ take() throws InterruptedException {
312 final ReentrantLock lock = this.lock;
313 lock.lockInterruptibly();
318 } catch (InterruptedException ie) {
319 notEmpty.signal(); // propagate to non-interrupted thread
322 Object/*E*/ x = extract();
329 /*public E poll(long timeout, TimeUnit unit) throws InterruptedException {
330 long nanos = unit.toNanos(timeout);
331 final ReentrantLock lock = this.lock;
332 lock.lockInterruptibly();
342 nanos = notEmpty.awaitNanos(nanos);
343 } catch (InterruptedException ie) {
344 notEmpty.signal(); // propagate to non-interrupted thread
354 public Object/*E*/ peek() {
355 final ReentrantLock lock = this.lock;
358 return (count == 0) ? null : items[takeIndex];
364 // this doc comment is overridden to remove the reference to collections
365 // greater in size than Integer.MAX_VALUE
367 * Returns the number of elements in this queue.
369 * @return the number of elements in this queue
372 final ReentrantLock lock = this.lock;
381 // this doc comment is a modified copy of the inherited doc comment,
382 // without the reference to unlimited queues.
384 * Returns the number of additional elements that this queue can ideally
385 * (in the absence of memory or resource constraints) accept without
386 * blocking. This is always equal to the initial capacity of this queue
387 * less the current <tt>size</tt> of this queue.
389 * <p>Note that you <em>cannot</em> always tell if an attempt to insert
390 * an element will succeed by inspecting <tt>remainingCapacity</tt>
391 * because it may be the case that another thread is about to
392 * insert or remove an element.
394 public int remainingCapacity() {
395 final ReentrantLock lock = this.lock;
398 return items.length - count;
405 * Removes a single instance of the specified element from this queue,
406 * if it is present. More formally, removes an element <tt>e</tt> such
407 * that <tt>o.equals(e)</tt>, if this queue contains one or more such elements.
409 * Returns <tt>true</tt> if this queue contained the specified element
410 * (or equivalently, if this queue changed as a result of the call).
412 * @param o element to be removed from this queue, if present
413 * @return <tt>true</tt> if this queue changed as a result of the call
415 public boolean remove(Object o) {
416 if (o == null) return false;
417 final Object/*E*/[] items = this.items;
418 final ReentrantLock lock = this.lock;
426 if (o.equals(items[i])) {
439 * Returns <tt>true</tt> if this queue contains the specified element.
440 * More formally, returns <tt>true</tt> if and only if this queue contains
441 * at least one element <tt>e</tt> such that <tt>o.equals(e)</tt>.
443 * @param o object to be checked for containment in this queue
444 * @return <tt>true</tt> if this queue contains the specified element
446 public boolean contains(Object o) {
447 if (o == null) return false;
448 final Object/*E*/[] items = this.items;
449 final ReentrantLock lock = this.lock;
454 while (k++ < count) {
455 if (o.equals(items[i]))
466 * Returns an array containing all of the elements in this queue, in proper sequence.
469 * <p>The returned array will be "safe" in that no references to it are
470 * maintained by this queue. (In other words, this method must allocate
471 * a new array). The caller is thus free to modify the returned array.
473 * <p>This method acts as a bridge between array-based and collection-based APIs.
476 * @return an array containing all of the elements in this queue
478 public Object[] toArray() {
479 final Object/*E*/[] items = this.items;
480 final ReentrantLock lock = this.lock;
483 Object[] a = new Object[count];
497 * Returns an array containing all of the elements in this queue, in
498 * proper sequence; the runtime type of the returned array is that of
499 * the specified array. If the queue fits in the specified array, it
500 * is returned therein. Otherwise, a new array is allocated with the
501 * runtime type of the specified array and the size of this queue.
503 * <p>If this queue fits in the specified array with room to spare
504 * (i.e., the array has more elements than this queue), the element in
505 * the array immediately following the end of the queue is set to <tt>null</tt>.
508 * <p>Like the {@link #toArray()} method, this method acts as a bridge between
509 * array-based and collection-based APIs. Further, this method allows
510 * precise control over the runtime type of the output array, and may,
511 * under certain circumstances, be used to save allocation costs.
513 * <p>Suppose <tt>x</tt> is a queue known to contain only strings.
514 * The following code can be used to dump the queue into a newly
515 * allocated array of <tt>String</tt>:
518 * String[] y = x.toArray(new String[0]);</pre>
520 * Note that <tt>toArray(new Object[0])</tt> is identical in function to
521 * <tt>toArray()</tt>.
523 * @param a the array into which the elements of the queue are to
524 * be stored, if it is big enough; otherwise, a new array of the
525 * same runtime type is allocated for this purpose
526 * @return an array containing all of the elements in this queue
527 * @throws ArrayStoreException if the runtime type of the specified array
528 * is not a supertype of the runtime type of every element in this queue
530 * @throws NullPointerException if the specified array is null
532 public /*<T> T*/Object[] toArray(Object/*T*/[] a) {
533 final Object/*E*/[] items = this.items;
534 final ReentrantLock lock = this.lock;
537 if (a.length < count)
538 a = /*(T[])java.lang.reflect.Array.newInstance(
539 a.getClass().getComponentType(),
541 )*/new Object[count];
546 a[k++] = /*(T)*/items[i];
549 if (a.length > count)
557 public String toString() {
558 final ReentrantLock lock = this.lock;
561 return super.toString();
568 * Atomically removes all of the elements from this queue.
569 * The queue will be empty after this call returns.
571 public void clear() {
572 final Object/*E*/[] items = this.items;
573 final ReentrantLock lock = this.lock;
592 * @throws UnsupportedOperationException {@inheritDoc}
593 * @throws ClassCastException {@inheritDoc}
594 * @throws NullPointerException {@inheritDoc}
595 * @throws IllegalArgumentException {@inheritDoc}
597 public int drainTo(Collection/*<? super E>*/ c) {
599 throw new NullPointerException();
601 throw new IllegalArgumentException();
602 final Object/*E*/[] items = this.items;
603 final ReentrantLock lock = this.lock;
628 * @throws UnsupportedOperationException {@inheritDoc}
629 * @throws ClassCastException {@inheritDoc}
630 * @throws NullPointerException {@inheritDoc}
631 * @throws IllegalArgumentException {@inheritDoc}
633 public int drainTo(Collection/*<? super E>*/ c, int maxElements) {
635 throw new NullPointerException();
637 throw new IllegalArgumentException();
638 if (maxElements <= 0)
640 final Object/*E*/[] items = this.items;
641 final ReentrantLock lock = this.lock;
647 int max = (maxElements < count)? maxElements : count;
667 * Returns an iterator over the elements in this queue in proper sequence.
668 * The returned <tt>Iterator</tt> is a "weakly consistent" iterator that
669 * will never throw {@link ConcurrentModificationException},
670 * and guarantees to traverse elements as they existed upon
671 * construction of the iterator, and may (but is not guaranteed to)
672 * reflect any modifications subsequent to construction.
674 * @return an iterator over the elements in this queue in proper sequence
676 public Iterator/*<E>*/ iterator() {
677 final ReentrantLock lock = this.lock;
687 * Iterator for ArrayBlockingQueue
689 private class Itr implements Iterator/*<E>*/ {
691 * Index of element to be returned by next,
692 * or a negative number if there is no such element.
694 private int nextIndex;
697 * nextItem holds on to item fields because once we claim
698 * that an element exists in hasNext(), we must return it in
699 * the following next() call even if it was in the process of
700 * being removed when hasNext() was called.
702 private Object/*E*/ nextItem;
705 * Index of element returned by most recent call to next.
706 * Reset to -1 if this element is deleted by a call to remove.
715 nextIndex = takeIndex;
716 nextItem = items[takeIndex];
720 public boolean hasNext() {
722 * No sync. We can return true by mistake here
723 * only if this iterator is passed across threads,
724 * which we don't support anyway.
726 return nextIndex >= 0;
730 * Checks whether nextIndex is valid; if so, sets nextItem.
731 * Stops the iterator when it either reaches putIndex or sees a null item.
733 private void checkNext() {
734 if (nextIndex == putIndex) {
738 nextItem = items[nextIndex];
739 if (nextItem == null)
744 public Object/*E*/ next() {
745 final ReentrantLock lock = ArrayBlockingQueue.this.lock;
749 throw new NoSuchElementException();
751 Object/*E*/ x = nextItem;
752 nextIndex = inc(nextIndex);
760 public void remove() {
761 final ReentrantLock lock = ArrayBlockingQueue.this.lock;
766 throw new IllegalStateException();
771 // back up cursor (reset to front if was first element)
772 nextIndex = (i == ti) ? takeIndex : i;