1 /* 2 * Copyright (C) 2010 The Guava Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package com.google.common.util.concurrent; 18 19 import com.google.common.collect.ObjectArrays; 20 import com.google.errorprone.annotations.CanIgnoreReturnValue; 21 import java.util.AbstractQueue; 22 import java.util.Collection; 23 import java.util.ConcurrentModificationException; 24 import java.util.Iterator; 25 import java.util.NoSuchElementException; 26 import java.util.concurrent.BlockingQueue; 27 import java.util.concurrent.TimeUnit; 28 import org.checkerframework.checker.nullness.qual.Nullable; 29 30 /** 31 * A bounded {@linkplain BlockingQueue blocking queue} backed by an array. This queue orders 32 * elements FIFO (first-in-first-out). The <em>head</em> of the queue is that element that has been 33 * on the queue the longest time. The <em>tail</em> of the queue is that element that has been on 34 * the queue the shortest time. New elements are inserted at the tail of the queue, and the queue 35 * retrieval operations obtain elements at the head of the queue. 36 * 37 * <p>This is a classic "bounded buffer", in which a fixed-sized array holds elements 38 * inserted by producers and extracted by consumers. Once created, the capacity cannot be increased. 39 * Attempts to {@code put} an element into a full queue will result in the operation blocking; 40 * attempts to {@code take} an element from an empty queue will similarly block. 41 * 42 * <p>This class supports an optional fairness policy for ordering waiting producer and consumer 43 * threads. By default, this ordering is not guaranteed. However, a queue constructed with fairness 44 * set to {@code true} grants threads access in FIFO order. Fairness generally decreases throughput 45 * but reduces variability and avoids starvation. 46 * 47 * <p>This class and its iterator implement all of the <em>optional</em> methods of the {@link 48 * Collection} and {@link Iterator} interfaces. 49 * 50 * @author Doug Lea 51 * @author Justin T. Sampson 52 * @param <E> the type of elements held in this collection 53 */ 54 @CanIgnoreReturnValue 55 public class MonitorBasedArrayBlockingQueue<E> extends AbstractQueue<E> 56 implements BlockingQueue<E> { 57 58 // Based on revision 1.58 of ArrayBlockingQueue by Doug Lea, from 59 // http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/main/java/util/concurrent/ 60 61 /** The queued items */ 62 final E[] items; 63 /** items index for next take, poll or remove */ 64 int takeIndex; 65 /** items index for next put, offer, or add. */ 66 int putIndex; 67 /** Number of items in the queue */ 68 private int count; 69 70 /* 71 * Concurrency control uses the classic two-condition algorithm 72 * found in any textbook. 73 */ 74 75 /** Monitor guarding all access */ 76 final Monitor monitor; 77 78 /** Guard for waiting takes */ 79 private final Monitor.Guard notEmpty; 80 81 /** Guard for waiting puts */ 82 private final Monitor.Guard notFull; 83 84 // Internal helper methods 85 86 /** Circularly increment i. */ inc(int i)87 final int inc(int i) { 88 return (++i == items.length) ? 0 : i; 89 } 90 91 /** 92 * Inserts element at current put position, advances, and signals. Call only when occupying 93 * monitor. 94 */ insert(E x)95 private void insert(E x) { 96 items[putIndex] = x; 97 putIndex = inc(putIndex); 98 ++count; 99 } 100 101 /** 102 * Extracts element at current take position, advances, and signals. Call only when occupying 103 * monitor. 104 */ extract()105 private E extract() { 106 final E[] items = this.items; 107 E x = items[takeIndex]; 108 items[takeIndex] = null; 109 takeIndex = inc(takeIndex); 110 --count; 111 return x; 112 } 113 114 /** 115 * Utility for remove and iterator.remove: Delete item at position i. Call only when occupying 116 * monitor. 117 */ removeAt(int i)118 void removeAt(int i) { 119 final E[] items = this.items; 120 // if removing front item, just advance 121 if (i == takeIndex) { 122 items[takeIndex] = null; 123 takeIndex = inc(takeIndex); 124 } else { 125 // slide over all others up through putIndex. 126 for (; ; ) { 127 int nexti = inc(i); 128 if (nexti != putIndex) { 129 items[i] = items[nexti]; 130 i = nexti; 131 } else { 132 items[i] = null; 133 putIndex = i; 134 break; 135 } 136 } 137 } 138 --count; 139 } 140 141 /** 142 * Creates an {@code MonitorBasedArrayBlockingQueue} with the given (fixed) capacity and default 143 * access policy. 144 * 145 * @param capacity the capacity of this queue 146 * @throws IllegalArgumentException if {@code capacity} is less than 1 147 */ MonitorBasedArrayBlockingQueue(int capacity)148 public MonitorBasedArrayBlockingQueue(int capacity) { 149 this(capacity, false); 150 } 151 152 /** 153 * Creates an {@code MonitorBasedArrayBlockingQueue} with the given (fixed) capacity and the 154 * specified access policy. 155 * 156 * @param capacity the capacity of this queue 157 * @param fair if {@code true} then queue accesses for threads blocked on insertion or removal, 158 * are processed in FIFO order; if {@code false} the access order is unspecified. 159 * @throws IllegalArgumentException if {@code capacity} is less than 1 160 */ MonitorBasedArrayBlockingQueue(int capacity, boolean fair)161 public MonitorBasedArrayBlockingQueue(int capacity, boolean fair) { 162 if (capacity <= 0) throw new IllegalArgumentException(); 163 this.items = newEArray(capacity); 164 monitor = new Monitor(fair); 165 notEmpty = 166 new Monitor.Guard(monitor) { 167 @Override 168 public boolean isSatisfied() { 169 return count > 0; 170 } 171 }; 172 notFull = 173 new Monitor.Guard(monitor) { 174 @Override 175 public boolean isSatisfied() { 176 return count < items.length; 177 } 178 }; 179 } 180 181 /** 182 * Creates an {@code MonitorBasedArrayBlockingQueue} with the given (fixed) capacity, the 183 * specified access policy and initially containing the elements of the given collection, added in 184 * traversal order of the collection's iterator. 185 * 186 * @param capacity the capacity of this queue 187 * @param fair if {@code true} then queue accesses for threads blocked on insertion or removal, 188 * are processed in FIFO order; if {@code false} the access order is unspecified. 189 * @param c the collection of elements to initially contain 190 * @throws IllegalArgumentException if {@code capacity} is less than {@code c.size()}, or less 191 * than 1. 192 * @throws NullPointerException if the specified collection or any of its elements are null 193 */ MonitorBasedArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c)194 public MonitorBasedArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) { 195 this(capacity, fair); 196 if (capacity < c.size()) throw new IllegalArgumentException(); 197 198 for (E e : c) add(e); 199 } 200 201 @SuppressWarnings("unchecked") // please don't try this home, kids newEArray(int capacity)202 private static <E> E[] newEArray(int capacity) { 203 return (E[]) new Object[capacity]; 204 } 205 206 /** 207 * Inserts the specified element at the tail of this queue if it is possible to do so immediately 208 * without exceeding the queue's capacity, returning {@code true} upon success and throwing an 209 * {@code IllegalStateException} if this queue is full. 210 * 211 * @param e the element to add 212 * @return {@code true} (as specified by {@link Collection#add}) 213 * @throws IllegalStateException if this queue is full 214 * @throws NullPointerException if the specified element is null 215 */ 216 @Override add(E e)217 public boolean add(E e) { 218 return super.add(e); 219 } 220 221 /** 222 * Inserts the specified element at the tail of this queue if it is possible to do so immediately 223 * without exceeding the queue's capacity, returning {@code true} upon success and {@code false} 224 * if this queue is full. This method is generally preferable to method {@link #add}, which can 225 * fail to insert an element only by throwing an exception. 226 * 227 * @throws NullPointerException if the specified element is null 228 */ 229 @Override offer(E e)230 public boolean offer(E e) { 231 if (e == null) throw new NullPointerException(); 232 final Monitor monitor = this.monitor; 233 if (monitor.enterIf(notFull)) { 234 try { 235 insert(e); 236 return true; 237 } finally { 238 monitor.leave(); 239 } 240 } else { 241 return false; 242 } 243 } 244 245 /** 246 * Inserts the specified element at the tail of this queue, waiting up to the specified wait time 247 * for space to become available if the queue is full. 248 * 249 * @throws InterruptedException {@inheritDoc} 250 * @throws NullPointerException {@inheritDoc} 251 */ 252 @Override offer(E e, long timeout, TimeUnit unit)253 public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { 254 255 if (e == null) throw new NullPointerException(); 256 final Monitor monitor = this.monitor; 257 if (monitor.enterWhen(notFull, timeout, unit)) { 258 try { 259 insert(e); 260 return true; 261 } finally { 262 monitor.leave(); 263 } 264 } else { 265 return false; 266 } 267 } 268 269 /** 270 * Inserts the specified element at the tail of this queue, waiting for space to become available 271 * if the queue is full. 272 * 273 * @throws InterruptedException {@inheritDoc} 274 * @throws NullPointerException {@inheritDoc} 275 */ 276 @Override put(E e)277 public void put(E e) throws InterruptedException { 278 if (e == null) throw new NullPointerException(); 279 final Monitor monitor = this.monitor; 280 monitor.enterWhen(notFull); 281 try { 282 insert(e); 283 } finally { 284 monitor.leave(); 285 } 286 } 287 288 @Override poll()289 public E poll() { 290 final Monitor monitor = this.monitor; 291 if (monitor.enterIf(notEmpty)) { 292 try { 293 return extract(); 294 } finally { 295 monitor.leave(); 296 } 297 } else { 298 return null; 299 } 300 } 301 302 @Override poll(long timeout, TimeUnit unit)303 public E poll(long timeout, TimeUnit unit) throws InterruptedException { 304 final Monitor monitor = this.monitor; 305 if (monitor.enterWhen(notEmpty, timeout, unit)) { 306 try { 307 return extract(); 308 } finally { 309 monitor.leave(); 310 } 311 } else { 312 return null; 313 } 314 } 315 316 @Override take()317 public E take() throws InterruptedException { 318 final Monitor monitor = this.monitor; 319 monitor.enterWhen(notEmpty); 320 try { 321 return extract(); 322 } finally { 323 monitor.leave(); 324 } 325 } 326 327 @Override peek()328 public E peek() { 329 final Monitor monitor = this.monitor; 330 if (monitor.enterIf(notEmpty)) { 331 try { 332 return items[takeIndex]; 333 } finally { 334 monitor.leave(); 335 } 336 } else { 337 return null; 338 } 339 } 340 341 // this doc comment is overridden to remove the reference to collections 342 // greater in size than Integer.MAX_VALUE 343 /** 344 * Returns the number of elements in this queue. 345 * 346 * @return the number of elements in this queue 347 */ 348 @Override size()349 public int size() { 350 final Monitor monitor = this.monitor; 351 monitor.enter(); 352 try { 353 return count; 354 } finally { 355 monitor.leave(); 356 } 357 } 358 359 // this doc comment is a modified copy of the inherited doc comment, 360 // without the reference to unlimited queues. 361 /** 362 * Returns the number of additional elements that this queue can ideally (in the absence of memory 363 * or resource constraints) accept without blocking. This is always equal to the initial capacity 364 * of this queue less the current {@code size} of this queue. 365 * 366 * <p>Note that you <em>cannot</em> always tell if an attempt to insert an element will succeed by 367 * inspecting {@code remainingCapacity} because it may be the case that another thread is about to 368 * insert or remove an element. 369 */ 370 @Override remainingCapacity()371 public int remainingCapacity() { 372 final Monitor monitor = this.monitor; 373 monitor.enter(); 374 try { 375 return items.length - count; 376 } finally { 377 monitor.leave(); 378 } 379 } 380 381 /** 382 * Removes a single instance of the specified element from this queue, if it is present. More 383 * formally, removes an element {@code e} such that {@code o.equals(e)}, if this queue contains 384 * one or more such elements. Returns {@code true} if this queue contained the specified element 385 * (or equivalently, if this queue changed as a result of the call). 386 * 387 * @param o element to be removed from this queue, if present 388 * @return {@code true} if this queue changed as a result of the call 389 */ 390 @Override remove(@ullable Object o)391 public boolean remove(@Nullable Object o) { 392 if (o == null) return false; 393 final E[] items = this.items; 394 final Monitor monitor = this.monitor; 395 monitor.enter(); 396 try { 397 int i = takeIndex; 398 int k = 0; 399 for (; ; ) { 400 if (k++ >= count) return false; 401 if (o.equals(items[i])) { 402 removeAt(i); 403 return true; 404 } 405 i = inc(i); 406 } 407 } finally { 408 monitor.leave(); 409 } 410 } 411 412 /** 413 * Returns {@code true} if this queue contains the specified element. More formally, returns 414 * {@code true} if and only if this queue contains at least one element {@code e} such that {@code 415 * o.equals(e)}. 416 * 417 * @param o object to be checked for containment in this queue 418 * @return {@code true} if this queue contains the specified element 419 */ 420 @Override contains(@ullable Object o)421 public boolean contains(@Nullable Object o) { 422 if (o == null) return false; 423 final E[] items = this.items; 424 final Monitor monitor = this.monitor; 425 monitor.enter(); 426 try { 427 int i = takeIndex; 428 int k = 0; 429 while (k++ < count) { 430 if (o.equals(items[i])) return true; 431 i = inc(i); 432 } 433 return false; 434 } finally { 435 monitor.leave(); 436 } 437 } 438 439 /** 440 * Returns an array containing all of the elements in this queue, in proper sequence. 441 * 442 * <p>The returned array will be "safe" in that no references to it are maintained by this queue. 443 * (In other words, this method must allocate a new array). The caller is thus free to modify the 444 * returned array. 445 * 446 * <p>This method acts as bridge between array-based and collection-based APIs. 447 * 448 * @return an array containing all of the elements in this queue 449 */ 450 @Override toArray()451 public Object[] toArray() { 452 final E[] items = this.items; 453 final Monitor monitor = this.monitor; 454 monitor.enter(); 455 try { 456 Object[] a = new Object[count]; 457 int k = 0; 458 int i = takeIndex; 459 while (k < count) { 460 a[k++] = items[i]; 461 i = inc(i); 462 } 463 return a; 464 } finally { 465 monitor.leave(); 466 } 467 } 468 469 /** 470 * Returns an array containing all of the elements in this queue, in proper sequence; the runtime 471 * type of the returned array is that of the specified array. If the queue fits in the specified 472 * array, it is returned therein. Otherwise, a new array is allocated with the runtime type of the 473 * specified array and the size of this queue. 474 * 475 * <p>If this queue fits in the specified array with room to spare (i.e., the array has more 476 * elements than this queue), the element in the array immediately following the end of the queue 477 * is set to {@code null}. 478 * 479 * <p>Like the {@link #toArray()} method, this method acts as bridge between array-based and 480 * collection-based APIs. Further, this method allows precise control over the runtime type of the 481 * output array, and may, under certain circumstances, be used to save allocation costs. 482 * 483 * <p>Suppose {@code x} is a queue known to contain only strings. The following code can be used 484 * to dump the queue into a newly allocated array of {@code String}: 485 * 486 * <pre> 487 * String[] y = x.toArray(new String[0]);</pre> 488 * 489 * <p>Note that {@code toArray(new Object[0])} is identical in function to {@code toArray()}. 490 * 491 * @param a the array into which the elements of the queue are to be stored, if it is big enough; 492 * otherwise, a new array of the same runtime type is allocated for this purpose 493 * @return an array containing all of the elements in this queue 494 * @throws ArrayStoreException if the runtime type of the specified array is not a supertype of 495 * the runtime type of every element in this queue 496 * @throws NullPointerException if the specified array is null 497 */ 498 @Override toArray(T[] a)499 public <T> T[] toArray(T[] a) { 500 final E[] items = this.items; 501 final Monitor monitor = this.monitor; 502 monitor.enter(); 503 try { 504 if (a.length < count) a = ObjectArrays.newArray(a, count); 505 506 int k = 0; 507 int i = takeIndex; 508 while (k < count) { 509 // This cast is not itself safe, but the following statement 510 // will fail if the runtime type of items[i] is not assignable 511 // to the runtime type of a[k++], which is all that the method 512 // contract requires (see @throws ArrayStoreException above). 513 @SuppressWarnings("unchecked") 514 T t = (T) items[i]; 515 a[k++] = t; 516 i = inc(i); 517 } 518 if (a.length > count) a[count] = null; 519 return a; 520 } finally { 521 monitor.leave(); 522 } 523 } 524 525 @Override toString()526 public String toString() { 527 final Monitor monitor = this.monitor; 528 monitor.enter(); 529 try { 530 return super.toString(); 531 } finally { 532 monitor.leave(); 533 } 534 } 535 536 /** 537 * Atomically removes all of the elements from this queue. The queue will be empty after this call 538 * returns. 539 */ 540 @Override clear()541 public void clear() { 542 final E[] items = this.items; 543 final Monitor monitor = this.monitor; 544 monitor.enter(); 545 try { 546 int i = takeIndex; 547 int k = count; 548 while (k-- > 0) { 549 items[i] = null; 550 i = inc(i); 551 } 552 count = 0; 553 putIndex = 0; 554 takeIndex = 0; 555 } finally { 556 monitor.leave(); 557 } 558 } 559 560 /** 561 * @throws UnsupportedOperationException {@inheritDoc} 562 * @throws ClassCastException {@inheritDoc} 563 * @throws NullPointerException {@inheritDoc} 564 * @throws IllegalArgumentException {@inheritDoc} 565 */ 566 @Override drainTo(Collection<? super E> c)567 public int drainTo(Collection<? super E> c) { 568 if (c == null) throw new NullPointerException(); 569 if (c == this) throw new IllegalArgumentException(); 570 final E[] items = this.items; 571 final Monitor monitor = this.monitor; 572 monitor.enter(); 573 try { 574 int i = takeIndex; 575 int n = 0; 576 int max = count; 577 while (n < max) { 578 c.add(items[i]); 579 items[i] = null; 580 i = inc(i); 581 ++n; 582 } 583 if (n > 0) { 584 count = 0; 585 putIndex = 0; 586 takeIndex = 0; 587 } 588 return n; 589 } finally { 590 monitor.leave(); 591 } 592 } 593 594 /** 595 * @throws UnsupportedOperationException {@inheritDoc} 596 * @throws ClassCastException {@inheritDoc} 597 * @throws NullPointerException {@inheritDoc} 598 * @throws IllegalArgumentException {@inheritDoc} 599 */ 600 @Override drainTo(Collection<? super E> c, int maxElements)601 public int drainTo(Collection<? super E> c, int maxElements) { 602 if (c == null) throw new NullPointerException(); 603 if (c == this) throw new IllegalArgumentException(); 604 if (maxElements <= 0) return 0; 605 final E[] items = this.items; 606 final Monitor monitor = this.monitor; 607 monitor.enter(); 608 try { 609 int i = takeIndex; 610 int n = 0; 611 int max = (maxElements < count) ? maxElements : count; 612 while (n < max) { 613 c.add(items[i]); 614 items[i] = null; 615 i = inc(i); 616 ++n; 617 } 618 if (n > 0) { 619 count -= n; 620 takeIndex = i; 621 } 622 return n; 623 } finally { 624 monitor.leave(); 625 } 626 } 627 628 /** 629 * Returns an iterator over the elements in this queue in proper sequence. The returned {@code 630 * Iterator} is a "weakly consistent" iterator that will never throw {@link 631 * ConcurrentModificationException}, and guarantees to traverse elements as they existed upon 632 * construction of the iterator, and may (but is not guaranteed to) reflect any modifications 633 * subsequent to construction. 634 * 635 * @return an iterator over the elements in this queue in proper sequence 636 */ 637 @Override iterator()638 public Iterator<E> iterator() { 639 final Monitor monitor = this.monitor; 640 monitor.enter(); 641 try { 642 return new Itr(); 643 } finally { 644 monitor.leave(); 645 } 646 } 647 648 /** Iterator for MonitorBasedArrayBlockingQueue */ 649 private class Itr implements Iterator<E> { 650 /** Index of element to be returned by next, or a negative number if no such. */ 651 private int nextIndex; 652 653 /** 654 * nextItem holds on to item fields because once we claim that an element exists in hasNext(), 655 * we must return it in the following next() call even if it was in the process of being removed 656 * when hasNext() was called. 657 */ 658 private E nextItem; 659 660 /** 661 * Index of element returned by most recent call to next. Reset to -1 if this element is deleted 662 * by a call to remove. 663 */ 664 private int lastRet; 665 Itr()666 Itr() { 667 lastRet = -1; 668 if (count == 0) nextIndex = -1; 669 else { 670 nextIndex = takeIndex; 671 nextItem = items[takeIndex]; 672 } 673 } 674 675 @Override hasNext()676 public boolean hasNext() { 677 /* 678 * No sync. We can return true by mistake here 679 * only if this iterator passed across threads, 680 * which we don't support anyway. 681 */ 682 return nextIndex >= 0; 683 } 684 685 /** 686 * Checks whether nextIndex is valid; if so setting nextItem. Stops iterator when either hits 687 * putIndex or sees null item. 688 */ checkNext()689 private void checkNext() { 690 if (nextIndex == putIndex) { 691 nextIndex = -1; 692 nextItem = null; 693 } else { 694 nextItem = items[nextIndex]; 695 if (nextItem == null) nextIndex = -1; 696 } 697 } 698 699 @Override next()700 public E next() { 701 final Monitor monitor = MonitorBasedArrayBlockingQueue.this.monitor; 702 monitor.enter(); 703 try { 704 if (nextIndex < 0) throw new NoSuchElementException(); 705 lastRet = nextIndex; 706 E x = nextItem; 707 nextIndex = inc(nextIndex); 708 checkNext(); 709 return x; 710 } finally { 711 monitor.leave(); 712 } 713 } 714 715 @Override remove()716 public void remove() { 717 final Monitor monitor = MonitorBasedArrayBlockingQueue.this.monitor; 718 monitor.enter(); 719 try { 720 int i = lastRet; 721 if (i == -1) throw new IllegalStateException(); 722 lastRet = -1; 723 724 int ti = takeIndex; 725 removeAt(i); 726 // back up cursor (reset to front if was first element) 727 nextIndex = (i == ti) ? takeIndex : i; 728 checkNext(); 729 } finally { 730 monitor.leave(); 731 } 732 } 733 } 734 } 735