1 /* 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 3 * 4 * This code is free software; you can redistribute it and/or modify it 5 * under the terms of the GNU General Public License version 2 only, as 6 * published by the Free Software Foundation. Oracle designates this 7 * particular file as subject to the "Classpath" exception as provided 8 * by Oracle in the LICENSE file that accompanied this code. 9 * 10 * This code is distributed in the hope that it will be useful, but WITHOUT 11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 13 * version 2 for more details (a copy is included in the LICENSE file that 14 * accompanied this code). 15 * 16 * You should have received a copy of the GNU General Public License version 17 * 2 along with this work; if not, write to the Free Software Foundation, 18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 19 * 20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 21 * or visit www.oracle.com if you need additional information or have any 22 * questions. 23 */ 24 25 /* 26 * This file is available under and governed by the GNU General Public 27 * License version 2 only, as published by the Free Software Foundation. 28 * However, the following notice accompanied the original version of this 29 * file: 30 * 31 * Written by Doug Lea with assistance from members of JCP JSR-166 32 * Expert Group and released to the public domain, as explained at 33 * http://creativecommons.org/publicdomain/zero/1.0/ 34 */ 35 36 package java.util.concurrent; 37 38 import static java.util.concurrent.TimeUnit.MILLISECONDS; 39 import static java.util.concurrent.TimeUnit.NANOSECONDS; 40 41 import android.compat.annotation.ChangeId; 42 import android.compat.annotation.EnabledAfter; 43 import android.compat.Compatibility; 44 45 import dalvik.annotation.compat.VersionCodes; 46 47 import java.util.AbstractQueue; 48 import java.util.Arrays; 49 import java.util.Collection; 50 import java.util.Iterator; 51 import java.util.List; 52 import java.util.NoSuchElementException; 53 import java.util.Objects; 54 import java.util.concurrent.atomic.AtomicLong; 55 import java.util.concurrent.locks.Condition; 56 import java.util.concurrent.locks.ReentrantLock; 57 58 // BEGIN android-note 59 // omit class-level docs on setRemoveOnCancelPolicy() 60 // END android-note 61 62 /** 63 * A {@link ThreadPoolExecutor} that can additionally schedule 64 * commands to run after a given delay, or to execute periodically. 65 * This class is preferable to {@link java.util.Timer} when multiple 66 * worker threads are needed, or when the additional flexibility or 67 * capabilities of {@link ThreadPoolExecutor} (which this class 68 * extends) are required. 69 * 70 * <p>Delayed tasks execute no sooner than they are enabled, but 71 * without any real-time guarantees about when, after they are 72 * enabled, they will commence. Tasks scheduled for exactly the same 73 * execution time are enabled in first-in-first-out (FIFO) order of 74 * submission. 75 * 76 * <p>When a submitted task is cancelled before it is run, execution 77 * is suppressed. By default, such a cancelled task is not 78 * automatically removed from the work queue until its delay elapses. 79 * While this enables further inspection and monitoring, it may also 80 * cause unbounded retention of cancelled tasks. 81 * 82 * <p>Successive executions of a periodic task scheduled via 83 * {@link #scheduleAtFixedRate scheduleAtFixedRate} or 84 * {@link #scheduleWithFixedDelay scheduleWithFixedDelay} 85 * do not overlap. While different executions may be performed by 86 * different threads, the effects of prior executions 87 * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a> 88 * those of subsequent ones. 89 * 90 * <p>While this class inherits from {@link ThreadPoolExecutor}, a few 91 * of the inherited tuning methods are not useful for it. In 92 * particular, because it acts as a fixed-sized pool using 93 * {@code corePoolSize} threads and an unbounded queue, adjustments 94 * to {@code maximumPoolSize} have no useful effect. Additionally, it 95 * is almost never a good idea to set {@code corePoolSize} to zero or 96 * use {@code allowCoreThreadTimeOut} because this may leave the pool 97 * without threads to handle tasks once they become eligible to run. 98 * 99 * <p>As with {@code ThreadPoolExecutor}, if not otherwise specified, 100 * this class uses {@link Executors#defaultThreadFactory} as the 101 * default thread factory, and {@link ThreadPoolExecutor.AbortPolicy} 102 * as the default rejected execution handler. 103 * 104 * <p><b>Extension notes:</b> This class overrides the 105 * {@link ThreadPoolExecutor#execute(Runnable) execute} and 106 * {@link AbstractExecutorService#submit(Runnable) submit} 107 * methods to generate internal {@link ScheduledFuture} objects to 108 * control per-task delays and scheduling. To preserve 109 * functionality, any further overrides of these methods in 110 * subclasses must invoke superclass versions, which effectively 111 * disables additional task customization. However, this class 112 * provides alternative protected extension method 113 * {@code decorateTask} (one version each for {@code Runnable} and 114 * {@code Callable}) that can be used to customize the concrete task 115 * types used to execute commands entered via {@code execute}, 116 * {@code submit}, {@code schedule}, {@code scheduleAtFixedRate}, 117 * and {@code scheduleWithFixedDelay}. By default, a 118 * {@code ScheduledThreadPoolExecutor} uses a task type extending 119 * {@link FutureTask}. However, this may be modified or replaced using 120 * subclasses of the form: 121 * 122 * <pre> {@code 123 * public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor { 124 * 125 * static class CustomTask<V> implements RunnableScheduledFuture<V> { ... } 126 * 127 * protected <V> RunnableScheduledFuture<V> decorateTask( 128 * Runnable r, RunnableScheduledFuture<V> task) { 129 * return new CustomTask<V>(r, task); 130 * } 131 * 132 * protected <V> RunnableScheduledFuture<V> decorateTask( 133 * Callable<V> c, RunnableScheduledFuture<V> task) { 134 * return new CustomTask<V>(c, task); 135 * } 136 * // ... add constructors, etc. 137 * }}</pre> 138 * 139 * @since 1.5 140 * @author Doug Lea 141 */ 142 public class ScheduledThreadPoolExecutor 143 extends ThreadPoolExecutor 144 implements ScheduledExecutorService { 145 146 /** 147 * For fixed rate tasks, prevent multiple tasks from running back-to-back to 148 * account for missed periods. 149 * On Android, it's often the case that app processes will miss multiple 150 * scheduled periods because the CPU often enters suspended states, or 151 * because app processes may be moved to the Cached Apps Freezer. 152 * This flag prevents apps from thrashing upon exiting suspend or frozen 153 * states to needlessly "catch up" to lost time. 154 * 155 * @hide 156 */ 157 @ChangeId 158 @EnabledAfter(targetSdkVersion = VersionCodes.VANILLA_ICE_CREAM) 159 public static final long STPE_SKIP_MULTIPLE_MISSED_PERIODIC_TASKS = 288912692L; 160 161 /** @hide */ skipMultipleMissedPeriodicTasks()162 public static boolean skipMultipleMissedPeriodicTasks() { 163 return Compatibility.isChangeEnabled( 164 STPE_SKIP_MULTIPLE_MISSED_PERIODIC_TASKS) 165 || com.android.libcore.Flags.scheduleAtFixedRateNewBehavior(); 166 } 167 168 /* 169 * This class specializes ThreadPoolExecutor implementation by 170 * 171 * 1. Using a custom task type ScheduledFutureTask, even for tasks 172 * that don't require scheduling because they are submitted 173 * using ExecutorService rather than ScheduledExecutorService 174 * methods, which are treated as tasks with a delay of zero. 175 * 176 * 2. Using a custom queue (DelayedWorkQueue), a variant of 177 * unbounded DelayQueue. The lack of capacity constraint and 178 * the fact that corePoolSize and maximumPoolSize are 179 * effectively identical simplifies some execution mechanics 180 * (see delayedExecute) compared to ThreadPoolExecutor. 181 * 182 * 3. Supporting optional run-after-shutdown parameters, which 183 * leads to overrides of shutdown methods to remove and cancel 184 * tasks that should NOT be run after shutdown, as well as 185 * different recheck logic when task (re)submission overlaps 186 * with a shutdown. 187 * 188 * 4. Task decoration methods to allow interception and 189 * instrumentation, which are needed because subclasses cannot 190 * otherwise override submit methods to get this effect. These 191 * don't have any impact on pool control logic though. 192 */ 193 194 /** 195 * False if should cancel/suppress periodic tasks on shutdown. 196 */ 197 private volatile boolean continueExistingPeriodicTasksAfterShutdown; 198 199 // Android-changed: Preserving behaviour on expired tasks (b/202927404) 200 /** 201 * False if should cancel non-periodic tasks on shutdown. 202 */ 203 private volatile boolean executeExistingDelayedTasksAfterShutdown = true; 204 205 /** 206 * True if ScheduledFutureTask.cancel should remove from queue. 207 */ 208 volatile boolean removeOnCancel; 209 210 /** 211 * Sequence number to break scheduling ties, and in turn to 212 * guarantee FIFO order among tied entries. 213 */ 214 private static final AtomicLong sequencer = new AtomicLong(); 215 216 private class ScheduledFutureTask<V> 217 extends FutureTask<V> implements RunnableScheduledFuture<V> { 218 219 /** Sequence number to break ties FIFO */ 220 private final long sequenceNumber; 221 222 /** The nanoTime-based time when the task is enabled to execute. */ 223 private volatile long time; 224 225 /** 226 * Period for repeating tasks, in nanoseconds. 227 * A positive value indicates fixed-rate execution. 228 * A negative value indicates fixed-delay execution. 229 * A value of 0 indicates a non-repeating (one-shot) task. 230 */ 231 private final long period; 232 233 /** The actual task to be re-enqueued by reExecutePeriodic */ 234 RunnableScheduledFuture<V> outerTask = this; 235 236 /** 237 * Index into delay queue, to support faster cancellation. 238 */ 239 int heapIndex; 240 241 /** 242 * Creates a one-shot action with given nanoTime-based trigger time. 243 */ ScheduledFutureTask(Runnable r, V result, long triggerTime, long sequenceNumber)244 ScheduledFutureTask(Runnable r, V result, long triggerTime, 245 long sequenceNumber) { 246 super(r, result); 247 this.time = triggerTime; 248 this.period = 0; 249 this.sequenceNumber = sequenceNumber; 250 } 251 252 /** 253 * Creates a periodic action with given nanoTime-based initial 254 * trigger time and period. 255 */ ScheduledFutureTask(Runnable r, V result, long triggerTime, long period, long sequenceNumber)256 ScheduledFutureTask(Runnable r, V result, long triggerTime, 257 long period, long sequenceNumber) { 258 super(r, result); 259 this.time = triggerTime; 260 this.period = period; 261 this.sequenceNumber = sequenceNumber; 262 } 263 264 /** 265 * Creates a one-shot action with given nanoTime-based trigger time. 266 */ ScheduledFutureTask(Callable<V> callable, long triggerTime, long sequenceNumber)267 ScheduledFutureTask(Callable<V> callable, long triggerTime, 268 long sequenceNumber) { 269 super(callable); 270 this.time = triggerTime; 271 this.period = 0; 272 this.sequenceNumber = sequenceNumber; 273 } 274 getDelay(TimeUnit unit)275 public long getDelay(TimeUnit unit) { 276 return unit.convert(time - System.nanoTime(), NANOSECONDS); 277 } 278 compareTo(Delayed other)279 public int compareTo(Delayed other) { 280 if (other == this) // compare zero if same object 281 return 0; 282 if (other instanceof ScheduledFutureTask) { 283 ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other; 284 long diff = time - x.time; 285 if (diff < 0) 286 return -1; 287 else if (diff > 0) 288 return 1; 289 else if (sequenceNumber < x.sequenceNumber) 290 return -1; 291 else 292 return 1; 293 } 294 long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS); 295 return (diff < 0) ? -1 : (diff > 0) ? 1 : 0; 296 } 297 298 /** 299 * Returns {@code true} if this is a periodic (not a one-shot) action. 300 * 301 * @return {@code true} if periodic 302 */ isPeriodic()303 public boolean isPeriodic() { 304 return period != 0; 305 } 306 307 /** 308 * Sets the next time to run for a periodic task. 309 */ 310 // Android-changed: b/288912692 relax scheduling on missed fixed rate 311 // tasks. setNextRunTime()312 private void setNextRunTime() { 313 long p = period; 314 if (p > 0) { 315 // Schedule for one period past the last start 316 time += p; 317 if (skipMultipleMissedPeriodicTasks()) { 318 final long now = System.nanoTime(); 319 // If next schedule is in the past 320 if (time < now - period) { 321 // Schedule for last missed period, so we don't attempt 322 // to catch up the rate to multiple missed tasks. 323 time = now - ((now - time + p) % p); 324 } 325 } 326 } else { 327 time = triggerTime(-p); 328 } 329 } 330 cancel(boolean mayInterruptIfRunning)331 public boolean cancel(boolean mayInterruptIfRunning) { 332 // The racy read of heapIndex below is benign: 333 // if heapIndex < 0, then OOTA guarantees that we have surely 334 // been removed; else we recheck under lock in remove() 335 boolean cancelled = super.cancel(mayInterruptIfRunning); 336 if (cancelled && removeOnCancel && heapIndex >= 0) 337 remove(this); 338 return cancelled; 339 } 340 341 /** 342 * Overrides FutureTask version so as to reset/requeue if periodic. 343 */ run()344 public void run() { 345 if (!canRunInCurrentRunState(this)) 346 cancel(false); 347 else if (!isPeriodic()) 348 super.run(); 349 else if (super.runAndReset()) { 350 setNextRunTime(); 351 reExecutePeriodic(outerTask); 352 } 353 } 354 } 355 356 /** 357 * Returns true if can run a task given current run state and 358 * run-after-shutdown parameters. 359 */ canRunInCurrentRunState(RunnableScheduledFuture<?> task)360 boolean canRunInCurrentRunState(RunnableScheduledFuture<?> task) { 361 if (!isShutdown()) 362 return true; 363 if (isStopped()) 364 return false; 365 return task.isPeriodic() 366 ? continueExistingPeriodicTasksAfterShutdown 367 : (executeExistingDelayedTasksAfterShutdown 368 // Android-changed: Preserving behaviour on expired tasks (b/202927404) 369 // || task.getDelay(NANOSECONDS) <= 0); 370 ); 371 } 372 373 /** 374 * Main execution method for delayed or periodic tasks. If pool 375 * is shut down, rejects the task. Otherwise adds task to queue 376 * and starts a thread, if necessary, to run it. (We cannot 377 * prestart the thread to run the task because the task (probably) 378 * shouldn't be run yet.) If the pool is shut down while the task 379 * is being added, cancel and remove it if required by state and 380 * run-after-shutdown parameters. 381 * 382 * @param task the task 383 */ delayedExecute(RunnableScheduledFuture<?> task)384 private void delayedExecute(RunnableScheduledFuture<?> task) { 385 if (isShutdown()) 386 reject(task); 387 else { 388 super.getQueue().add(task); 389 if (!canRunInCurrentRunState(task) && remove(task)) 390 task.cancel(false); 391 else 392 ensurePrestart(); 393 } 394 } 395 396 /** 397 * Requeues a periodic task unless current run state precludes it. 398 * Same idea as delayedExecute except drops task rather than rejecting. 399 * 400 * @param task the task 401 */ reExecutePeriodic(RunnableScheduledFuture<?> task)402 void reExecutePeriodic(RunnableScheduledFuture<?> task) { 403 if (canRunInCurrentRunState(task)) { 404 super.getQueue().add(task); 405 if (canRunInCurrentRunState(task) || !remove(task)) { 406 ensurePrestart(); 407 return; 408 } 409 } 410 task.cancel(false); 411 } 412 413 /** 414 * Cancels and clears the queue of all tasks that should not be run 415 * due to shutdown policy. Invoked within super.shutdown. 416 */ onShutdown()417 @Override void onShutdown() { 418 BlockingQueue<Runnable> q = super.getQueue(); 419 boolean keepDelayed = 420 getExecuteExistingDelayedTasksAfterShutdownPolicy(); 421 boolean keepPeriodic = 422 getContinueExistingPeriodicTasksAfterShutdownPolicy(); 423 // Traverse snapshot to avoid iterator exceptions 424 // TODO: implement and use efficient removeIf 425 // super.getQueue().removeIf(...); 426 for (Object e : q.toArray()) { 427 if (e instanceof RunnableScheduledFuture) { 428 RunnableScheduledFuture<?> t = (RunnableScheduledFuture<?>)e; 429 if ((t.isPeriodic() 430 ? !keepPeriodic 431 // Android-changed: Preserving behaviour on expired tasks (b/202927404) 432 // : (!keepDelayed && t.getDelay(NANOSECONDS) > 0)) 433 : !keepDelayed) 434 || t.isCancelled()) { // also remove if already cancelled 435 if (q.remove(t)) 436 t.cancel(false); 437 } 438 } 439 } 440 tryTerminate(); 441 } 442 443 /** 444 * Modifies or replaces the task used to execute a runnable. 445 * This method can be used to override the concrete 446 * class used for managing internal tasks. 447 * The default implementation simply returns the given task. 448 * 449 * @param runnable the submitted Runnable 450 * @param task the task created to execute the runnable 451 * @param <V> the type of the task's result 452 * @return a task that can execute the runnable 453 * @since 1.6 454 */ decorateTask( Runnable runnable, RunnableScheduledFuture<V> task)455 protected <V> RunnableScheduledFuture<V> decorateTask( 456 Runnable runnable, RunnableScheduledFuture<V> task) { 457 return task; 458 } 459 460 /** 461 * Modifies or replaces the task used to execute a callable. 462 * This method can be used to override the concrete 463 * class used for managing internal tasks. 464 * The default implementation simply returns the given task. 465 * 466 * @param callable the submitted Callable 467 * @param task the task created to execute the callable 468 * @param <V> the type of the task's result 469 * @return a task that can execute the callable 470 * @since 1.6 471 */ decorateTask( Callable<V> callable, RunnableScheduledFuture<V> task)472 protected <V> RunnableScheduledFuture<V> decorateTask( 473 Callable<V> callable, RunnableScheduledFuture<V> task) { 474 return task; 475 } 476 477 /** 478 * The default keep-alive time for pool threads. 479 * 480 * Normally, this value is unused because all pool threads will be 481 * core threads, but if a user creates a pool with a corePoolSize 482 * of zero (against our advice), we keep a thread alive as long as 483 * there are queued tasks. If the keep alive time is zero (the 484 * historic value), we end up hot-spinning in getTask, wasting a 485 * CPU. But on the other hand, if we set the value too high, and 486 * users create a one-shot pool which they don't cleanly shutdown, 487 * the pool's non-daemon threads will prevent JVM termination. A 488 * small but non-zero value (relative to a JVM's lifetime) seems 489 * best. 490 */ 491 private static final long DEFAULT_KEEPALIVE_MILLIS = 10L; 492 493 /** 494 * Creates a new {@code ScheduledThreadPoolExecutor} with the 495 * given core pool size. 496 * 497 * @param corePoolSize the number of threads to keep in the pool, even 498 * if they are idle, unless {@code allowCoreThreadTimeOut} is set 499 * @throws IllegalArgumentException if {@code corePoolSize < 0} 500 */ ScheduledThreadPoolExecutor(int corePoolSize)501 public ScheduledThreadPoolExecutor(int corePoolSize) { 502 super(corePoolSize, Integer.MAX_VALUE, 503 DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS, 504 new DelayedWorkQueue()); 505 } 506 507 /** 508 * Creates a new {@code ScheduledThreadPoolExecutor} with the 509 * given initial parameters. 510 * 511 * @param corePoolSize the number of threads to keep in the pool, even 512 * if they are idle, unless {@code allowCoreThreadTimeOut} is set 513 * @param threadFactory the factory to use when the executor 514 * creates a new thread 515 * @throws IllegalArgumentException if {@code corePoolSize < 0} 516 * @throws NullPointerException if {@code threadFactory} is null 517 */ ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory)518 public ScheduledThreadPoolExecutor(int corePoolSize, 519 ThreadFactory threadFactory) { 520 super(corePoolSize, Integer.MAX_VALUE, 521 DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS, 522 new DelayedWorkQueue(), threadFactory); 523 } 524 525 /** 526 * Creates a new {@code ScheduledThreadPoolExecutor} with the 527 * given initial parameters. 528 * 529 * @param corePoolSize the number of threads to keep in the pool, even 530 * if they are idle, unless {@code allowCoreThreadTimeOut} is set 531 * @param handler the handler to use when execution is blocked 532 * because the thread bounds and queue capacities are reached 533 * @throws IllegalArgumentException if {@code corePoolSize < 0} 534 * @throws NullPointerException if {@code handler} is null 535 */ ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler)536 public ScheduledThreadPoolExecutor(int corePoolSize, 537 RejectedExecutionHandler handler) { 538 super(corePoolSize, Integer.MAX_VALUE, 539 DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS, 540 new DelayedWorkQueue(), handler); 541 } 542 543 /** 544 * Creates a new {@code ScheduledThreadPoolExecutor} with the 545 * given initial parameters. 546 * 547 * @param corePoolSize the number of threads to keep in the pool, even 548 * if they are idle, unless {@code allowCoreThreadTimeOut} is set 549 * @param threadFactory the factory to use when the executor 550 * creates a new thread 551 * @param handler the handler to use when execution is blocked 552 * because the thread bounds and queue capacities are reached 553 * @throws IllegalArgumentException if {@code corePoolSize < 0} 554 * @throws NullPointerException if {@code threadFactory} or 555 * {@code handler} is null 556 */ ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler)557 public ScheduledThreadPoolExecutor(int corePoolSize, 558 ThreadFactory threadFactory, 559 RejectedExecutionHandler handler) { 560 super(corePoolSize, Integer.MAX_VALUE, 561 DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS, 562 new DelayedWorkQueue(), threadFactory, handler); 563 } 564 565 /** 566 * Returns the nanoTime-based trigger time of a delayed action. 567 */ triggerTime(long delay, TimeUnit unit)568 private long triggerTime(long delay, TimeUnit unit) { 569 return triggerTime(unit.toNanos((delay < 0) ? 0 : delay)); 570 } 571 572 /** 573 * Returns the nanoTime-based trigger time of a delayed action. 574 */ triggerTime(long delay)575 long triggerTime(long delay) { 576 return System.nanoTime() + 577 ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay)); 578 } 579 580 /** 581 * Constrains the values of all delays in the queue to be within 582 * Long.MAX_VALUE of each other, to avoid overflow in compareTo. 583 * This may occur if a task is eligible to be dequeued, but has 584 * not yet been, while some other task is added with a delay of 585 * Long.MAX_VALUE. 586 */ overflowFree(long delay)587 private long overflowFree(long delay) { 588 Delayed head = (Delayed) super.getQueue().peek(); 589 if (head != null) { 590 long headDelay = head.getDelay(NANOSECONDS); 591 if (headDelay < 0 && (delay - headDelay < 0)) 592 delay = Long.MAX_VALUE + headDelay; 593 } 594 return delay; 595 } 596 597 /** 598 * @throws RejectedExecutionException {@inheritDoc} 599 * @throws NullPointerException {@inheritDoc} 600 */ schedule(Runnable command, long delay, TimeUnit unit)601 public ScheduledFuture<?> schedule(Runnable command, 602 long delay, 603 TimeUnit unit) { 604 if (command == null || unit == null) 605 throw new NullPointerException(); 606 RunnableScheduledFuture<Void> t = decorateTask(command, 607 new ScheduledFutureTask<Void>(command, null, 608 triggerTime(delay, unit), 609 sequencer.getAndIncrement())); 610 delayedExecute(t); 611 return t; 612 } 613 614 /** 615 * @throws RejectedExecutionException {@inheritDoc} 616 * @throws NullPointerException {@inheritDoc} 617 */ schedule(Callable<V> callable, long delay, TimeUnit unit)618 public <V> ScheduledFuture<V> schedule(Callable<V> callable, 619 long delay, 620 TimeUnit unit) { 621 if (callable == null || unit == null) 622 throw new NullPointerException(); 623 RunnableScheduledFuture<V> t = decorateTask(callable, 624 new ScheduledFutureTask<V>(callable, 625 triggerTime(delay, unit), 626 sequencer.getAndIncrement())); 627 delayedExecute(t); 628 return t; 629 } 630 631 // Android-changed: document go/scheduleAtFixedRate-behavior-change 632 /** 633 * Submits a periodic action that becomes enabled first after the 634 * given initial delay, and subsequently with the given period; 635 * that is, executions will commence after 636 * {@code initialDelay}, then {@code initialDelay + period}, then 637 * {@code initialDelay + 2 * period}, and so on. 638 * 639 * <p>The sequence of task executions continues indefinitely until 640 * one of the following exceptional completions occur: 641 * <ul> 642 * <li>The task is {@linkplain Future#cancel explicitly cancelled} 643 * via the returned future. 644 * <li>Method {@link #shutdown} is called and the {@linkplain 645 * #getContinueExistingPeriodicTasksAfterShutdownPolicy policy on 646 * whether to continue after shutdown} is not set true, or method 647 * {@link #shutdownNow} is called; also resulting in task 648 * cancellation. 649 * <li>An execution of the task throws an exception. In this case 650 * calling {@link Future#get() get} on the returned future will throw 651 * {@link ExecutionException}, holding the exception as its cause. 652 * </ul> 653 * Subsequent executions are suppressed. Subsequent calls to 654 * {@link Future#isDone isDone()} on the returned future will 655 * return {@code true}. 656 * 657 * <p>Since API level 31: If the app is frozen by the Android cached apps 658 * freezer before the fixed rate task is done or canceled, the task may run 659 * many times immediately when the app unfreezes, just as if a single 660 * execution of the command had taken the duration of the frozen period to 661 * execute. 662 * 663 * <p>Since API level 36: If any execution of this task takes longer than 664 * its period, then the subsequent execution will be scheduled for the most 665 * recent missed period. 666 * 667 * @throws RejectedExecutionException {@inheritDoc} 668 * @throws NullPointerException {@inheritDoc} 669 * @throws IllegalArgumentException {@inheritDoc} 670 */ scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)671 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, 672 long initialDelay, 673 long period, 674 TimeUnit unit) { 675 if (command == null || unit == null) 676 throw new NullPointerException(); 677 if (period <= 0L) 678 throw new IllegalArgumentException(); 679 ScheduledFutureTask<Void> sft = 680 new ScheduledFutureTask<Void>(command, 681 null, 682 triggerTime(initialDelay, unit), 683 unit.toNanos(period), 684 sequencer.getAndIncrement()); 685 RunnableScheduledFuture<Void> t = decorateTask(command, sft); 686 sft.outerTask = t; 687 delayedExecute(t); 688 return t; 689 } 690 691 /** 692 * Submits a periodic action that becomes enabled first after the 693 * given initial delay, and subsequently with the given delay 694 * between the termination of one execution and the commencement of 695 * the next. 696 * 697 * <p>The sequence of task executions continues indefinitely until 698 * one of the following exceptional completions occur: 699 * <ul> 700 * <li>The task is {@linkplain Future#cancel explicitly cancelled} 701 * via the returned future. 702 * <li>Method {@link #shutdown} is called and the {@linkplain 703 * #getContinueExistingPeriodicTasksAfterShutdownPolicy policy on 704 * whether to continue after shutdown} is not set true, or method 705 * {@link #shutdownNow} is called; also resulting in task 706 * cancellation. 707 * <li>An execution of the task throws an exception. In this case 708 * calling {@link Future#get() get} on the returned future will throw 709 * {@link ExecutionException}, holding the exception as its cause. 710 * </ul> 711 * Subsequent executions are suppressed. Subsequent calls to 712 * {@link Future#isDone isDone()} on the returned future will 713 * return {@code true}. 714 * 715 * @throws RejectedExecutionException {@inheritDoc} 716 * @throws NullPointerException {@inheritDoc} 717 * @throws IllegalArgumentException {@inheritDoc} 718 */ scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)719 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, 720 long initialDelay, 721 long delay, 722 TimeUnit unit) { 723 if (command == null || unit == null) 724 throw new NullPointerException(); 725 if (delay <= 0L) 726 throw new IllegalArgumentException(); 727 ScheduledFutureTask<Void> sft = 728 new ScheduledFutureTask<Void>(command, 729 null, 730 triggerTime(initialDelay, unit), 731 -unit.toNanos(delay), 732 sequencer.getAndIncrement()); 733 RunnableScheduledFuture<Void> t = decorateTask(command, sft); 734 sft.outerTask = t; 735 delayedExecute(t); 736 return t; 737 } 738 739 /** 740 * Executes {@code command} with zero required delay. 741 * This has effect equivalent to 742 * {@link #schedule(Runnable,long,TimeUnit) schedule(command, 0, anyUnit)}. 743 * Note that inspections of the queue and of the list returned by 744 * {@code shutdownNow} will access the zero-delayed 745 * {@link ScheduledFuture}, not the {@code command} itself. 746 * 747 * <p>A consequence of the use of {@code ScheduledFuture} objects is 748 * that {@link ThreadPoolExecutor#afterExecute afterExecute} is always 749 * called with a null second {@code Throwable} argument, even if the 750 * {@code command} terminated abruptly. Instead, the {@code Throwable} 751 * thrown by such a task can be obtained via {@link Future#get}. 752 * 753 * @throws RejectedExecutionException at discretion of 754 * {@code RejectedExecutionHandler}, if the task 755 * cannot be accepted for execution because the 756 * executor has been shut down 757 * @throws NullPointerException {@inheritDoc} 758 */ execute(Runnable command)759 public void execute(Runnable command) { 760 schedule(command, 0, NANOSECONDS); 761 } 762 763 // Override AbstractExecutorService methods 764 765 /** 766 * @throws RejectedExecutionException {@inheritDoc} 767 * @throws NullPointerException {@inheritDoc} 768 */ submit(Runnable task)769 public Future<?> submit(Runnable task) { 770 return schedule(task, 0, NANOSECONDS); 771 } 772 773 /** 774 * @throws RejectedExecutionException {@inheritDoc} 775 * @throws NullPointerException {@inheritDoc} 776 */ submit(Runnable task, T result)777 public <T> Future<T> submit(Runnable task, T result) { 778 return schedule(Executors.callable(task, result), 0, NANOSECONDS); 779 } 780 781 /** 782 * @throws RejectedExecutionException {@inheritDoc} 783 * @throws NullPointerException {@inheritDoc} 784 */ submit(Callable<T> task)785 public <T> Future<T> submit(Callable<T> task) { 786 return schedule(task, 0, NANOSECONDS); 787 } 788 789 /** 790 * Sets the policy on whether to continue executing existing 791 * periodic tasks even when this executor has been {@code shutdown}. 792 * In this case, executions will continue until {@code shutdownNow} 793 * or the policy is set to {@code false} when already shutdown. 794 * This value is by default {@code false}. 795 * 796 * @param value if {@code true}, continue after shutdown, else don't 797 * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy 798 */ setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value)799 public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) { 800 continueExistingPeriodicTasksAfterShutdown = value; 801 if (!value && isShutdown()) 802 onShutdown(); 803 } 804 805 /** 806 * Gets the policy on whether to continue executing existing 807 * periodic tasks even when this executor has been {@code shutdown}. 808 * In this case, executions will continue until {@code shutdownNow} 809 * or the policy is set to {@code false} when already shutdown. 810 * This value is by default {@code false}. 811 * 812 * @return {@code true} if will continue after shutdown 813 * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy 814 */ getContinueExistingPeriodicTasksAfterShutdownPolicy()815 public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() { 816 return continueExistingPeriodicTasksAfterShutdown; 817 } 818 819 /** 820 * Sets the policy on whether to execute existing delayed 821 * tasks even when this executor has been {@code shutdown}. 822 * In this case, these tasks will only terminate upon 823 * {@code shutdownNow}, or after setting the policy to 824 * {@code false} when already shutdown. 825 * This value is by default {@code true}. 826 * 827 * @param value if {@code true}, execute after shutdown, else don't 828 * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy 829 */ setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value)830 public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) { 831 executeExistingDelayedTasksAfterShutdown = value; 832 if (!value && isShutdown()) 833 onShutdown(); 834 } 835 836 /** 837 * Gets the policy on whether to execute existing delayed 838 * tasks even when this executor has been {@code shutdown}. 839 * In this case, these tasks will only terminate upon 840 * {@code shutdownNow}, or after setting the policy to 841 * {@code false} when already shutdown. 842 * This value is by default {@code true}. 843 * 844 * @return {@code true} if will execute after shutdown 845 * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy 846 */ getExecuteExistingDelayedTasksAfterShutdownPolicy()847 public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() { 848 return executeExistingDelayedTasksAfterShutdown; 849 } 850 851 /** 852 * Sets the policy on whether cancelled tasks should be immediately 853 * removed from the work queue at time of cancellation. This value is 854 * by default {@code false}. 855 * 856 * @param value if {@code true}, remove on cancellation, else don't 857 * @see #getRemoveOnCancelPolicy 858 * @since 1.7 859 */ setRemoveOnCancelPolicy(boolean value)860 public void setRemoveOnCancelPolicy(boolean value) { 861 removeOnCancel = value; 862 } 863 864 /** 865 * Gets the policy on whether cancelled tasks should be immediately 866 * removed from the work queue at time of cancellation. This value is 867 * by default {@code false}. 868 * 869 * @return {@code true} if cancelled tasks are immediately removed 870 * from the queue 871 * @see #setRemoveOnCancelPolicy 872 * @since 1.7 873 */ getRemoveOnCancelPolicy()874 public boolean getRemoveOnCancelPolicy() { 875 return removeOnCancel; 876 } 877 878 /** 879 * Initiates an orderly shutdown in which previously submitted 880 * tasks are executed, but no new tasks will be accepted. 881 * Invocation has no additional effect if already shut down. 882 * 883 * <p>This method does not wait for previously submitted tasks to 884 * complete execution. Use {@link #awaitTermination awaitTermination} 885 * to do that. 886 * 887 * <p>If the {@code ExecuteExistingDelayedTasksAfterShutdownPolicy} 888 * has been set {@code false}, existing delayed tasks whose delays 889 * have not yet elapsed are cancelled. And unless the {@code 890 * ContinueExistingPeriodicTasksAfterShutdownPolicy} has been set 891 * {@code true}, future executions of existing periodic tasks will 892 * be cancelled. 893 */ 894 // android-note: Removed "throws SecurityException" doc. shutdown()895 public void shutdown() { 896 super.shutdown(); 897 } 898 899 /** 900 * Attempts to stop all actively executing tasks, halts the 901 * processing of waiting tasks, and returns a list of the tasks 902 * that were awaiting execution. These tasks are drained (removed) 903 * from the task queue upon return from this method. 904 * 905 * <p>This method does not wait for actively executing tasks to 906 * terminate. Use {@link #awaitTermination awaitTermination} to 907 * do that. 908 * 909 * <p>There are no guarantees beyond best-effort attempts to stop 910 * processing actively executing tasks. This implementation 911 * interrupts tasks via {@link Thread#interrupt}; any task that 912 * fails to respond to interrupts may never terminate. 913 * 914 * @return list of tasks that never commenced execution. 915 * Each element of this list is a {@link ScheduledFuture}. 916 * For tasks submitted via one of the {@code schedule} 917 * methods, the element will be identical to the returned 918 * {@code ScheduledFuture}. For tasks submitted using 919 * {@link #execute execute}, the element will be a 920 * zero-delay {@code ScheduledFuture}. 921 */ 922 // android-note: Removed "throws SecurityException" doc. shutdownNow()923 public List<Runnable> shutdownNow() { 924 return super.shutdownNow(); 925 } 926 927 /** 928 * Returns the task queue used by this executor. Access to the 929 * task queue is intended primarily for debugging and monitoring. 930 * This queue may be in active use. Retrieving the task queue 931 * does not prevent queued tasks from executing. 932 * 933 * <p>Each element of this queue is a {@link ScheduledFuture}. 934 * For tasks submitted via one of the {@code schedule} methods, the 935 * element will be identical to the returned {@code ScheduledFuture}. 936 * For tasks submitted using {@link #execute execute}, the element 937 * will be a zero-delay {@code ScheduledFuture}. 938 * 939 * <p>Iteration over this queue is <em>not</em> guaranteed to traverse 940 * tasks in the order in which they will execute. 941 * 942 * @return the task queue 943 */ getQueue()944 public BlockingQueue<Runnable> getQueue() { 945 return super.getQueue(); 946 } 947 948 /** 949 * Specialized delay queue. To mesh with TPE declarations, this 950 * class must be declared as a BlockingQueue<Runnable> even though 951 * it can only hold RunnableScheduledFutures. 952 */ 953 static class DelayedWorkQueue extends AbstractQueue<Runnable> 954 implements BlockingQueue<Runnable> { 955 956 /* 957 * A DelayedWorkQueue is based on a heap-based data structure 958 * like those in DelayQueue and PriorityQueue, except that 959 * every ScheduledFutureTask also records its index into the 960 * heap array. This eliminates the need to find a task upon 961 * cancellation, greatly speeding up removal (down from O(n) 962 * to O(log n)), and reducing garbage retention that would 963 * otherwise occur by waiting for the element to rise to top 964 * before clearing. But because the queue may also hold 965 * RunnableScheduledFutures that are not ScheduledFutureTasks, 966 * we are not guaranteed to have such indices available, in 967 * which case we fall back to linear search. (We expect that 968 * most tasks will not be decorated, and that the faster cases 969 * will be much more common.) 970 * 971 * All heap operations must record index changes -- mainly 972 * within siftUp and siftDown. Upon removal, a task's 973 * heapIndex is set to -1. Note that ScheduledFutureTasks can 974 * appear at most once in the queue (this need not be true for 975 * other kinds of tasks or work queues), so are uniquely 976 * identified by heapIndex. 977 */ 978 979 private static final int INITIAL_CAPACITY = 16; 980 private RunnableScheduledFuture<?>[] queue = 981 new RunnableScheduledFuture<?>[INITIAL_CAPACITY]; 982 private final ReentrantLock lock = new ReentrantLock(); 983 private int size; 984 985 /** 986 * Thread designated to wait for the task at the head of the 987 * queue. This variant of the Leader-Follower pattern 988 * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to 989 * minimize unnecessary timed waiting. When a thread becomes 990 * the leader, it waits only for the next delay to elapse, but 991 * other threads await indefinitely. The leader thread must 992 * signal some other thread before returning from take() or 993 * poll(...), unless some other thread becomes leader in the 994 * interim. Whenever the head of the queue is replaced with a 995 * task with an earlier expiration time, the leader field is 996 * invalidated by being reset to null, and some waiting 997 * thread, but not necessarily the current leader, is 998 * signalled. So waiting threads must be prepared to acquire 999 * and lose leadership while waiting. 1000 */ 1001 private Thread leader; 1002 1003 /** 1004 * Condition signalled when a newer task becomes available at the 1005 * head of the queue or a new thread may need to become leader. 1006 */ 1007 private final Condition available = lock.newCondition(); 1008 1009 /** 1010 * Sets f's heapIndex if it is a ScheduledFutureTask. 1011 */ setIndex(RunnableScheduledFuture<?> f, int idx)1012 private static void setIndex(RunnableScheduledFuture<?> f, int idx) { 1013 if (f instanceof ScheduledFutureTask) 1014 ((ScheduledFutureTask)f).heapIndex = idx; 1015 } 1016 1017 /** 1018 * Sifts element added at bottom up to its heap-ordered spot. 1019 * Call only when holding lock. 1020 */ siftUp(int k, RunnableScheduledFuture<?> key)1021 private void siftUp(int k, RunnableScheduledFuture<?> key) { 1022 while (k > 0) { 1023 int parent = (k - 1) >>> 1; 1024 RunnableScheduledFuture<?> e = queue[parent]; 1025 if (key.compareTo(e) >= 0) 1026 break; 1027 queue[k] = e; 1028 setIndex(e, k); 1029 k = parent; 1030 } 1031 queue[k] = key; 1032 setIndex(key, k); 1033 } 1034 1035 /** 1036 * Sifts element added at top down to its heap-ordered spot. 1037 * Call only when holding lock. 1038 */ siftDown(int k, RunnableScheduledFuture<?> key)1039 private void siftDown(int k, RunnableScheduledFuture<?> key) { 1040 int half = size >>> 1; 1041 while (k < half) { 1042 int child = (k << 1) + 1; 1043 RunnableScheduledFuture<?> c = queue[child]; 1044 int right = child + 1; 1045 if (right < size && c.compareTo(queue[right]) > 0) 1046 c = queue[child = right]; 1047 if (key.compareTo(c) <= 0) 1048 break; 1049 queue[k] = c; 1050 setIndex(c, k); 1051 k = child; 1052 } 1053 queue[k] = key; 1054 setIndex(key, k); 1055 } 1056 1057 /** 1058 * Resizes the heap array. Call only when holding lock. 1059 */ grow()1060 private void grow() { 1061 int oldCapacity = queue.length; 1062 int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50% 1063 if (newCapacity < 0) // overflow 1064 newCapacity = Integer.MAX_VALUE; 1065 queue = Arrays.copyOf(queue, newCapacity); 1066 } 1067 1068 /** 1069 * Finds index of given object, or -1 if absent. 1070 */ indexOf(Object x)1071 private int indexOf(Object x) { 1072 if (x != null) { 1073 if (x instanceof ScheduledFutureTask) { 1074 int i = ((ScheduledFutureTask) x).heapIndex; 1075 // Sanity check; x could conceivably be a 1076 // ScheduledFutureTask from some other pool. 1077 if (i >= 0 && i < size && queue[i] == x) 1078 return i; 1079 } else { 1080 for (int i = 0; i < size; i++) 1081 if (x.equals(queue[i])) 1082 return i; 1083 } 1084 } 1085 return -1; 1086 } 1087 contains(Object x)1088 public boolean contains(Object x) { 1089 final ReentrantLock lock = this.lock; 1090 lock.lock(); 1091 try { 1092 return indexOf(x) != -1; 1093 } finally { 1094 lock.unlock(); 1095 } 1096 } 1097 remove(Object x)1098 public boolean remove(Object x) { 1099 final ReentrantLock lock = this.lock; 1100 lock.lock(); 1101 try { 1102 int i = indexOf(x); 1103 if (i < 0) 1104 return false; 1105 1106 setIndex(queue[i], -1); 1107 int s = --size; 1108 RunnableScheduledFuture<?> replacement = queue[s]; 1109 queue[s] = null; 1110 if (s != i) { 1111 siftDown(i, replacement); 1112 if (queue[i] == replacement) 1113 siftUp(i, replacement); 1114 } 1115 return true; 1116 } finally { 1117 lock.unlock(); 1118 } 1119 } 1120 size()1121 public int size() { 1122 final ReentrantLock lock = this.lock; 1123 lock.lock(); 1124 try { 1125 return size; 1126 } finally { 1127 lock.unlock(); 1128 } 1129 } 1130 isEmpty()1131 public boolean isEmpty() { 1132 return size() == 0; 1133 } 1134 remainingCapacity()1135 public int remainingCapacity() { 1136 return Integer.MAX_VALUE; 1137 } 1138 peek()1139 public RunnableScheduledFuture<?> peek() { 1140 final ReentrantLock lock = this.lock; 1141 lock.lock(); 1142 try { 1143 return queue[0]; 1144 } finally { 1145 lock.unlock(); 1146 } 1147 } 1148 offer(Runnable x)1149 public boolean offer(Runnable x) { 1150 if (x == null) 1151 throw new NullPointerException(); 1152 RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x; 1153 final ReentrantLock lock = this.lock; 1154 lock.lock(); 1155 try { 1156 int i = size; 1157 if (i >= queue.length) 1158 grow(); 1159 size = i + 1; 1160 if (i == 0) { 1161 queue[0] = e; 1162 setIndex(e, 0); 1163 } else { 1164 siftUp(i, e); 1165 } 1166 if (queue[0] == e) { 1167 leader = null; 1168 available.signal(); 1169 } 1170 } finally { 1171 lock.unlock(); 1172 } 1173 return true; 1174 } 1175 put(Runnable e)1176 public void put(Runnable e) { 1177 offer(e); 1178 } 1179 add(Runnable e)1180 public boolean add(Runnable e) { 1181 return offer(e); 1182 } 1183 offer(Runnable e, long timeout, TimeUnit unit)1184 public boolean offer(Runnable e, long timeout, TimeUnit unit) { 1185 return offer(e); 1186 } 1187 1188 /** 1189 * Performs common bookkeeping for poll and take: Replaces 1190 * first element with last and sifts it down. Call only when 1191 * holding lock. 1192 * @param f the task to remove and return 1193 */ finishPoll(RunnableScheduledFuture<?> f)1194 private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) { 1195 int s = --size; 1196 RunnableScheduledFuture<?> x = queue[s]; 1197 queue[s] = null; 1198 if (s != 0) 1199 siftDown(0, x); 1200 setIndex(f, -1); 1201 return f; 1202 } 1203 poll()1204 public RunnableScheduledFuture<?> poll() { 1205 final ReentrantLock lock = this.lock; 1206 lock.lock(); 1207 try { 1208 RunnableScheduledFuture<?> first = queue[0]; 1209 return (first == null || first.getDelay(NANOSECONDS) > 0) 1210 ? null 1211 : finishPoll(first); 1212 } finally { 1213 lock.unlock(); 1214 } 1215 } 1216 take()1217 public RunnableScheduledFuture<?> take() throws InterruptedException { 1218 final ReentrantLock lock = this.lock; 1219 lock.lockInterruptibly(); 1220 try { 1221 for (;;) { 1222 RunnableScheduledFuture<?> first = queue[0]; 1223 if (first == null) 1224 available.await(); 1225 else { 1226 long delay = first.getDelay(NANOSECONDS); 1227 if (delay <= 0L) 1228 return finishPoll(first); 1229 first = null; // don't retain ref while waiting 1230 if (leader != null) 1231 available.await(); 1232 else { 1233 Thread thisThread = Thread.currentThread(); 1234 leader = thisThread; 1235 try { 1236 available.awaitNanos(delay); 1237 } finally { 1238 if (leader == thisThread) 1239 leader = null; 1240 } 1241 } 1242 } 1243 } 1244 } finally { 1245 if (leader == null && queue[0] != null) 1246 available.signal(); 1247 lock.unlock(); 1248 } 1249 } 1250 poll(long timeout, TimeUnit unit)1251 public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit) 1252 throws InterruptedException { 1253 long nanos = unit.toNanos(timeout); 1254 final ReentrantLock lock = this.lock; 1255 lock.lockInterruptibly(); 1256 try { 1257 for (;;) { 1258 RunnableScheduledFuture<?> first = queue[0]; 1259 if (first == null) { 1260 if (nanos <= 0L) 1261 return null; 1262 else 1263 nanos = available.awaitNanos(nanos); 1264 } else { 1265 long delay = first.getDelay(NANOSECONDS); 1266 if (delay <= 0L) 1267 return finishPoll(first); 1268 if (nanos <= 0L) 1269 return null; 1270 first = null; // don't retain ref while waiting 1271 if (nanos < delay || leader != null) 1272 nanos = available.awaitNanos(nanos); 1273 else { 1274 Thread thisThread = Thread.currentThread(); 1275 leader = thisThread; 1276 try { 1277 long timeLeft = available.awaitNanos(delay); 1278 nanos -= delay - timeLeft; 1279 } finally { 1280 if (leader == thisThread) 1281 leader = null; 1282 } 1283 } 1284 } 1285 } 1286 } finally { 1287 if (leader == null && queue[0] != null) 1288 available.signal(); 1289 lock.unlock(); 1290 } 1291 } 1292 clear()1293 public void clear() { 1294 final ReentrantLock lock = this.lock; 1295 lock.lock(); 1296 try { 1297 for (int i = 0; i < size; i++) { 1298 RunnableScheduledFuture<?> t = queue[i]; 1299 if (t != null) { 1300 queue[i] = null; 1301 setIndex(t, -1); 1302 } 1303 } 1304 size = 0; 1305 } finally { 1306 lock.unlock(); 1307 } 1308 } 1309 drainTo(Collection<? super Runnable> c)1310 public int drainTo(Collection<? super Runnable> c) { 1311 return drainTo(c, Integer.MAX_VALUE); 1312 } 1313 drainTo(Collection<? super Runnable> c, int maxElements)1314 public int drainTo(Collection<? super Runnable> c, int maxElements) { 1315 Objects.requireNonNull(c); 1316 if (c == this) 1317 throw new IllegalArgumentException(); 1318 if (maxElements <= 0) 1319 return 0; 1320 final ReentrantLock lock = this.lock; 1321 lock.lock(); 1322 try { 1323 int n = 0; 1324 for (RunnableScheduledFuture<?> first; 1325 n < maxElements 1326 && (first = queue[0]) != null 1327 && first.getDelay(NANOSECONDS) <= 0;) { 1328 c.add(first); // In this order, in case add() throws. 1329 finishPoll(first); 1330 ++n; 1331 } 1332 return n; 1333 } finally { 1334 lock.unlock(); 1335 } 1336 } 1337 toArray()1338 public Object[] toArray() { 1339 final ReentrantLock lock = this.lock; 1340 lock.lock(); 1341 try { 1342 return Arrays.copyOf(queue, size, Object[].class); 1343 } finally { 1344 lock.unlock(); 1345 } 1346 } 1347 1348 @SuppressWarnings("unchecked") toArray(T[] a)1349 public <T> T[] toArray(T[] a) { 1350 final ReentrantLock lock = this.lock; 1351 lock.lock(); 1352 try { 1353 if (a.length < size) 1354 return (T[]) Arrays.copyOf(queue, size, a.getClass()); 1355 System.arraycopy(queue, 0, a, 0, size); 1356 if (a.length > size) 1357 a[size] = null; 1358 return a; 1359 } finally { 1360 lock.unlock(); 1361 } 1362 } 1363 iterator()1364 public Iterator<Runnable> iterator() { 1365 final ReentrantLock lock = this.lock; 1366 lock.lock(); 1367 try { 1368 return new Itr(Arrays.copyOf(queue, size)); 1369 } finally { 1370 lock.unlock(); 1371 } 1372 } 1373 1374 /** 1375 * Snapshot iterator that works off copy of underlying q array. 1376 */ 1377 private class Itr implements Iterator<Runnable> { 1378 final RunnableScheduledFuture<?>[] array; 1379 int cursor; // index of next element to return; initially 0 1380 int lastRet = -1; // index of last element returned; -1 if no such 1381 Itr(RunnableScheduledFuture<?>[] array)1382 Itr(RunnableScheduledFuture<?>[] array) { 1383 this.array = array; 1384 } 1385 hasNext()1386 public boolean hasNext() { 1387 return cursor < array.length; 1388 } 1389 next()1390 public Runnable next() { 1391 if (cursor >= array.length) 1392 throw new NoSuchElementException(); 1393 return array[lastRet = cursor++]; 1394 } 1395 remove()1396 public void remove() { 1397 if (lastRet < 0) 1398 throw new IllegalStateException(); 1399 DelayedWorkQueue.this.remove(array[lastRet]); 1400 lastRet = -1; 1401 } 1402 } 1403 } 1404 } 1405