1 /* 2 * Written by Doug Lea with assistance from members of JCP JSR-166 3 * Expert Group and released to the public domain, as explained at 4 * http://creativecommons.org/publicdomain/zero/1.0/ 5 */ 6 7 package java.util.concurrent; 8 9 import static java.util.concurrent.TimeUnit.NANOSECONDS; 10 11 import java.util.AbstractQueue; 12 import java.util.Collection; 13 import java.util.Iterator; 14 import java.util.NoSuchElementException; 15 import java.util.PriorityQueue; 16 import java.util.concurrent.locks.Condition; 17 import java.util.concurrent.locks.ReentrantLock; 18 19 // BEGIN android-note 20 // removed link to collections framework docs 21 // END android-note 22 23 /** 24 * An unbounded {@linkplain BlockingQueue blocking queue} of 25 * {@code Delayed} elements, in which an element can only be taken 26 * when its delay has expired. The <em>head</em> of the queue is that 27 * {@code Delayed} element whose delay expired furthest in the 28 * past. If no delay has expired there is no head and {@code poll} 29 * will return {@code null}. Expiration occurs when an element's 30 * {@code getDelay(TimeUnit.NANOSECONDS)} method returns a value less 31 * than or equal to zero. Even though unexpired elements cannot be 32 * removed using {@code take} or {@code poll}, they are otherwise 33 * treated as normal elements. For example, the {@code size} method 34 * returns the count of both expired and unexpired elements. 35 * This queue does not permit null elements. 36 * 37 * <p>This class and its iterator implement all of the 38 * <em>optional</em> methods of the {@link Collection} and {@link 39 * Iterator} interfaces. The Iterator provided in method {@link 40 * #iterator()} is <em>not</em> guaranteed to traverse the elements of 41 * the DelayQueue in any particular order. 42 * 43 * @since 1.5 44 * @author Doug Lea 45 * @param <E> the type of elements held in this queue 46 */ 47 public class DelayQueue<E extends Delayed> extends AbstractQueue<E> 48 implements BlockingQueue<E> { 49 50 private final transient ReentrantLock lock = new ReentrantLock(); 51 private final PriorityQueue<E> q = new PriorityQueue<E>(); 52 53 /** 54 * Thread designated to wait for the element at the head of 55 * the queue. This variant of the Leader-Follower pattern 56 * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to 57 * minimize unnecessary timed waiting. When a thread becomes 58 * the leader, it waits only for the next delay to elapse, but 59 * other threads await indefinitely. The leader thread must 60 * signal some other thread before returning from take() or 61 * poll(...), unless some other thread becomes leader in the 62 * interim. Whenever the head of the queue is replaced with 63 * an element with an earlier expiration time, the leader 64 * field is invalidated by being reset to null, and some 65 * waiting thread, but not necessarily the current leader, is 66 * signalled. So waiting threads must be prepared to acquire 67 * and lose leadership while waiting. 68 */ 69 private Thread leader; 70 71 /** 72 * Condition signalled when a newer element becomes available 73 * at the head of the queue or a new thread may need to 74 * become leader. 75 */ 76 private final Condition available = lock.newCondition(); 77 78 /** 79 * Creates a new {@code DelayQueue} that is initially empty. 80 */ DelayQueue()81 public DelayQueue() {} 82 83 /** 84 * Creates a {@code DelayQueue} initially containing the elements of the 85 * given collection of {@link Delayed} instances. 86 * 87 * @param c the collection of elements to initially contain 88 * @throws NullPointerException if the specified collection or any 89 * of its elements are null 90 */ DelayQueue(Collection<? extends E> c)91 public DelayQueue(Collection<? extends E> c) { 92 this.addAll(c); 93 } 94 95 /** 96 * Inserts the specified element into this delay queue. 97 * 98 * @param e the element to add 99 * @return {@code true} (as specified by {@link Collection#add}) 100 * @throws NullPointerException if the specified element is null 101 */ add(E e)102 public boolean add(E e) { 103 return offer(e); 104 } 105 106 /** 107 * Inserts the specified element into this delay queue. 108 * 109 * @param e the element to add 110 * @return {@code true} 111 * @throws NullPointerException if the specified element is null 112 */ offer(E e)113 public boolean offer(E e) { 114 final ReentrantLock lock = this.lock; 115 lock.lock(); 116 try { 117 q.offer(e); 118 if (q.peek() == e) { 119 leader = null; 120 available.signal(); 121 } 122 return true; 123 } finally { 124 lock.unlock(); 125 } 126 } 127 128 /** 129 * Inserts the specified element into this delay queue. As the queue is 130 * unbounded this method will never block. 131 * 132 * @param e the element to add 133 * @throws NullPointerException {@inheritDoc} 134 */ put(E e)135 public void put(E e) { 136 offer(e); 137 } 138 139 /** 140 * Inserts the specified element into this delay queue. As the queue is 141 * unbounded this method will never block. 142 * 143 * @param e the element to add 144 * @param timeout This parameter is ignored as the method never blocks 145 * @param unit This parameter is ignored as the method never blocks 146 * @return {@code true} 147 * @throws NullPointerException {@inheritDoc} 148 */ offer(E e, long timeout, TimeUnit unit)149 public boolean offer(E e, long timeout, TimeUnit unit) { 150 return offer(e); 151 } 152 153 /** 154 * Retrieves and removes the head of this queue, or returns {@code null} 155 * if this queue has no elements with an expired delay. 156 * 157 * @return the head of this queue, or {@code null} if this 158 * queue has no elements with an expired delay 159 */ poll()160 public E poll() { 161 final ReentrantLock lock = this.lock; 162 lock.lock(); 163 try { 164 E first = q.peek(); 165 return (first == null || first.getDelay(NANOSECONDS) > 0) 166 ? null 167 : q.poll(); 168 } finally { 169 lock.unlock(); 170 } 171 } 172 173 /** 174 * Retrieves and removes the head of this queue, waiting if necessary 175 * until an element with an expired delay is available on this queue. 176 * 177 * @return the head of this queue 178 * @throws InterruptedException {@inheritDoc} 179 */ take()180 public E take() throws InterruptedException { 181 final ReentrantLock lock = this.lock; 182 lock.lockInterruptibly(); 183 try { 184 for (;;) { 185 E first = q.peek(); 186 if (first == null) 187 available.await(); 188 else { 189 long delay = first.getDelay(NANOSECONDS); 190 if (delay <= 0L) 191 return q.poll(); 192 first = null; // don't retain ref while waiting 193 if (leader != null) 194 available.await(); 195 else { 196 Thread thisThread = Thread.currentThread(); 197 leader = thisThread; 198 try { 199 available.awaitNanos(delay); 200 } finally { 201 if (leader == thisThread) 202 leader = null; 203 } 204 } 205 } 206 } 207 } finally { 208 if (leader == null && q.peek() != null) 209 available.signal(); 210 lock.unlock(); 211 } 212 } 213 214 /** 215 * Retrieves and removes the head of this queue, waiting if necessary 216 * until an element with an expired delay is available on this queue, 217 * or the specified wait time expires. 218 * 219 * @return the head of this queue, or {@code null} if the 220 * specified waiting time elapses before an element with 221 * an expired delay becomes available 222 * @throws InterruptedException {@inheritDoc} 223 */ poll(long timeout, TimeUnit unit)224 public E poll(long timeout, TimeUnit unit) throws InterruptedException { 225 long nanos = unit.toNanos(timeout); 226 final ReentrantLock lock = this.lock; 227 lock.lockInterruptibly(); 228 try { 229 for (;;) { 230 E first = q.peek(); 231 if (first == null) { 232 if (nanos <= 0L) 233 return null; 234 else 235 nanos = available.awaitNanos(nanos); 236 } else { 237 long delay = first.getDelay(NANOSECONDS); 238 if (delay <= 0L) 239 return q.poll(); 240 if (nanos <= 0L) 241 return null; 242 first = null; // don't retain ref while waiting 243 if (nanos < delay || leader != null) 244 nanos = available.awaitNanos(nanos); 245 else { 246 Thread thisThread = Thread.currentThread(); 247 leader = thisThread; 248 try { 249 long timeLeft = available.awaitNanos(delay); 250 nanos -= delay - timeLeft; 251 } finally { 252 if (leader == thisThread) 253 leader = null; 254 } 255 } 256 } 257 } 258 } finally { 259 if (leader == null && q.peek() != null) 260 available.signal(); 261 lock.unlock(); 262 } 263 } 264 265 /** 266 * Retrieves, but does not remove, the head of this queue, or 267 * returns {@code null} if this queue is empty. Unlike 268 * {@code poll}, if no expired elements are available in the queue, 269 * this method returns the element that will expire next, 270 * if one exists. 271 * 272 * @return the head of this queue, or {@code null} if this 273 * queue is empty 274 */ peek()275 public E peek() { 276 final ReentrantLock lock = this.lock; 277 lock.lock(); 278 try { 279 return q.peek(); 280 } finally { 281 lock.unlock(); 282 } 283 } 284 size()285 public int size() { 286 final ReentrantLock lock = this.lock; 287 lock.lock(); 288 try { 289 return q.size(); 290 } finally { 291 lock.unlock(); 292 } 293 } 294 295 /** 296 * Returns first element only if it is expired. 297 * Used only by drainTo. Call only when holding lock. 298 */ peekExpired()299 private E peekExpired() { 300 // assert lock.isHeldByCurrentThread(); 301 E first = q.peek(); 302 return (first == null || first.getDelay(NANOSECONDS) > 0) ? 303 null : first; 304 } 305 306 /** 307 * @throws UnsupportedOperationException {@inheritDoc} 308 * @throws ClassCastException {@inheritDoc} 309 * @throws NullPointerException {@inheritDoc} 310 * @throws IllegalArgumentException {@inheritDoc} 311 */ drainTo(Collection<? super E> c)312 public int drainTo(Collection<? super E> c) { 313 if (c == null) 314 throw new NullPointerException(); 315 if (c == this) 316 throw new IllegalArgumentException(); 317 final ReentrantLock lock = this.lock; 318 lock.lock(); 319 try { 320 int n = 0; 321 for (E e; (e = peekExpired()) != null;) { 322 c.add(e); // In this order, in case add() throws. 323 q.poll(); 324 ++n; 325 } 326 return n; 327 } finally { 328 lock.unlock(); 329 } 330 } 331 332 /** 333 * @throws UnsupportedOperationException {@inheritDoc} 334 * @throws ClassCastException {@inheritDoc} 335 * @throws NullPointerException {@inheritDoc} 336 * @throws IllegalArgumentException {@inheritDoc} 337 */ drainTo(Collection<? super E> c, int maxElements)338 public int drainTo(Collection<? super E> c, int maxElements) { 339 if (c == null) 340 throw new NullPointerException(); 341 if (c == this) 342 throw new IllegalArgumentException(); 343 if (maxElements <= 0) 344 return 0; 345 final ReentrantLock lock = this.lock; 346 lock.lock(); 347 try { 348 int n = 0; 349 for (E e; n < maxElements && (e = peekExpired()) != null;) { 350 c.add(e); // In this order, in case add() throws. 351 q.poll(); 352 ++n; 353 } 354 return n; 355 } finally { 356 lock.unlock(); 357 } 358 } 359 360 /** 361 * Atomically removes all of the elements from this delay queue. 362 * The queue will be empty after this call returns. 363 * Elements with an unexpired delay are not waited for; they are 364 * simply discarded from the queue. 365 */ clear()366 public void clear() { 367 final ReentrantLock lock = this.lock; 368 lock.lock(); 369 try { 370 q.clear(); 371 } finally { 372 lock.unlock(); 373 } 374 } 375 376 /** 377 * Always returns {@code Integer.MAX_VALUE} because 378 * a {@code DelayQueue} is not capacity constrained. 379 * 380 * @return {@code Integer.MAX_VALUE} 381 */ remainingCapacity()382 public int remainingCapacity() { 383 return Integer.MAX_VALUE; 384 } 385 386 /** 387 * Returns an array containing all of the elements in this queue. 388 * The returned array elements are in no particular order. 389 * 390 * <p>The returned array will be "safe" in that no references to it are 391 * maintained by this queue. (In other words, this method must allocate 392 * a new array). The caller is thus free to modify the returned array. 393 * 394 * <p>This method acts as bridge between array-based and collection-based 395 * APIs. 396 * 397 * @return an array containing all of the elements in this queue 398 */ toArray()399 public Object[] toArray() { 400 final ReentrantLock lock = this.lock; 401 lock.lock(); 402 try { 403 return q.toArray(); 404 } finally { 405 lock.unlock(); 406 } 407 } 408 409 /** 410 * Returns an array containing all of the elements in this queue; the 411 * runtime type of the returned array is that of the specified array. 412 * The returned array elements are in no particular order. 413 * If the queue fits in the specified array, it is returned therein. 414 * Otherwise, a new array is allocated with the runtime type of the 415 * specified array and the size of this queue. 416 * 417 * <p>If this queue fits in the specified array with room to spare 418 * (i.e., the array has more elements than this queue), the element in 419 * the array immediately following the end of the queue is set to 420 * {@code null}. 421 * 422 * <p>Like the {@link #toArray()} method, this method acts as bridge between 423 * array-based and collection-based APIs. Further, this method allows 424 * precise control over the runtime type of the output array, and may, 425 * under certain circumstances, be used to save allocation costs. 426 * 427 * <p>The following code can be used to dump a delay queue into a newly 428 * allocated array of {@code Delayed}: 429 * 430 * <pre> {@code Delayed[] a = q.toArray(new Delayed[0]);}</pre> 431 * 432 * Note that {@code toArray(new Object[0])} is identical in function to 433 * {@code toArray()}. 434 * 435 * @param a the array into which the elements of the queue are to 436 * be stored, if it is big enough; otherwise, a new array of the 437 * same runtime type is allocated for this purpose 438 * @return an array containing all of the elements in this queue 439 * @throws ArrayStoreException if the runtime type of the specified array 440 * is not a supertype of the runtime type of every element in 441 * this queue 442 * @throws NullPointerException if the specified array is null 443 */ toArray(T[] a)444 public <T> T[] toArray(T[] a) { 445 final ReentrantLock lock = this.lock; 446 lock.lock(); 447 try { 448 return q.toArray(a); 449 } finally { 450 lock.unlock(); 451 } 452 } 453 454 /** 455 * Removes a single instance of the specified element from this 456 * queue, if it is present, whether or not it has expired. 457 */ remove(Object o)458 public boolean remove(Object o) { 459 final ReentrantLock lock = this.lock; 460 lock.lock(); 461 try { 462 return q.remove(o); 463 } finally { 464 lock.unlock(); 465 } 466 } 467 468 /** 469 * Identity-based version for use in Itr.remove. 470 */ removeEQ(Object o)471 void removeEQ(Object o) { 472 final ReentrantLock lock = this.lock; 473 lock.lock(); 474 try { 475 for (Iterator<E> it = q.iterator(); it.hasNext(); ) { 476 if (o == it.next()) { 477 it.remove(); 478 break; 479 } 480 } 481 } finally { 482 lock.unlock(); 483 } 484 } 485 486 /** 487 * Returns an iterator over all the elements (both expired and 488 * unexpired) in this queue. The iterator does not return the 489 * elements in any particular order. 490 * 491 * <p>The returned iterator is 492 * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>. 493 * 494 * @return an iterator over the elements in this queue 495 */ iterator()496 public Iterator<E> iterator() { 497 return new Itr(toArray()); 498 } 499 500 /** 501 * Snapshot iterator that works off copy of underlying q array. 502 */ 503 private class Itr implements Iterator<E> { 504 final Object[] array; // Array of all elements 505 int cursor; // index of next element to return 506 int lastRet; // index of last element, or -1 if no such 507 Itr(Object[] array)508 Itr(Object[] array) { 509 lastRet = -1; 510 this.array = array; 511 } 512 hasNext()513 public boolean hasNext() { 514 return cursor < array.length; 515 } 516 517 @SuppressWarnings("unchecked") next()518 public E next() { 519 if (cursor >= array.length) 520 throw new NoSuchElementException(); 521 lastRet = cursor; 522 return (E)array[cursor++]; 523 } 524 remove()525 public void remove() { 526 if (lastRet < 0) 527 throw new IllegalStateException(); 528 removeEQ(array[lastRet]); 529 lastRet = -1; 530 } 531 } 532 533 } 534