1 /* 2 * Copyright (C) 2007 The Guava Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package com.google.common.util.concurrent; 18 19 import static com.google.common.base.Preconditions.checkArgument; 20 import static com.google.common.base.Preconditions.checkNotNull; 21 22 import com.google.common.annotations.Beta; 23 import com.google.common.annotations.VisibleForTesting; 24 import com.google.common.base.Supplier; 25 import com.google.common.base.Throwables; 26 import com.google.common.collect.Lists; 27 import com.google.common.collect.Queues; 28 import com.google.common.util.concurrent.ForwardingListenableFuture.SimpleForwardingListenableFuture; 29 30 import java.lang.reflect.InvocationTargetException; 31 import java.util.Collection; 32 import java.util.Collections; 33 import java.util.Iterator; 34 import java.util.List; 35 import java.util.concurrent.BlockingQueue; 36 import java.util.concurrent.Callable; 37 import java.util.concurrent.Delayed; 38 import java.util.concurrent.ExecutionException; 39 import java.util.concurrent.Executor; 40 import java.util.concurrent.ExecutorService; 41 import java.util.concurrent.Executors; 42 import java.util.concurrent.Future; 43 import java.util.concurrent.RejectedExecutionException; 44 import java.util.concurrent.ScheduledExecutorService; 45 import java.util.concurrent.ScheduledFuture; 46 import java.util.concurrent.ScheduledThreadPoolExecutor; 47 import java.util.concurrent.ThreadFactory; 48 
import java.util.concurrent.ThreadPoolExecutor; 49 import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy; 50 import java.util.concurrent.TimeUnit; 51 import java.util.concurrent.TimeoutException; 52 import java.util.concurrent.locks.Condition; 53 import java.util.concurrent.locks.Lock; 54 import java.util.concurrent.locks.ReentrantLock; 55 56 /** 57 * Factory and utility methods for {@link java.util.concurrent.Executor}, {@link 58 * ExecutorService}, and {@link ThreadFactory}. 59 * 60 * @author Eric Fellheimer 61 * @author Kyle Littlefield 62 * @author Justin Mahoney 63 * @since 3.0 64 */ 65 public final class MoreExecutors { MoreExecutors()66 private MoreExecutors() {} 67 68 /** 69 * Converts the given ThreadPoolExecutor into an ExecutorService that exits 70 * when the application is complete. It does so by using daemon threads and 71 * adding a shutdown hook to wait for their completion. 72 * 73 * <p>This is mainly for fixed thread pools. 74 * See {@link Executors#newFixedThreadPool(int)}. 75 * 76 * @param executor the executor to modify to make sure it exits when the 77 * application is finished 78 * @param terminationTimeout how long to wait for the executor to 79 * finish before terminating the JVM 80 * @param timeUnit unit of time for the time parameter 81 * @return an unmodifiable version of the input which will not hang the JVM 82 */ 83 @Beta getExitingExecutorService( ThreadPoolExecutor executor, long terminationTimeout, TimeUnit timeUnit)84 public static ExecutorService getExitingExecutorService( 85 ThreadPoolExecutor executor, long terminationTimeout, TimeUnit timeUnit) { 86 return new Application() 87 .getExitingExecutorService(executor, terminationTimeout, timeUnit); 88 } 89 90 /** 91 * Converts the given ScheduledThreadPoolExecutor into a 92 * ScheduledExecutorService that exits when the application is complete. It 93 * does so by using daemon threads and adding a shutdown hook to wait for 94 * their completion. 
95 * 96 * <p>This is mainly for fixed thread pools. 97 * See {@link Executors#newScheduledThreadPool(int)}. 98 * 99 * @param executor the executor to modify to make sure it exits when the 100 * application is finished 101 * @param terminationTimeout how long to wait for the executor to 102 * finish before terminating the JVM 103 * @param timeUnit unit of time for the time parameter 104 * @return an unmodifiable version of the input which will not hang the JVM 105 */ 106 @Beta getExitingScheduledExecutorService( ScheduledThreadPoolExecutor executor, long terminationTimeout, TimeUnit timeUnit)107 public static ScheduledExecutorService getExitingScheduledExecutorService( 108 ScheduledThreadPoolExecutor executor, long terminationTimeout, TimeUnit timeUnit) { 109 return new Application() 110 .getExitingScheduledExecutorService(executor, terminationTimeout, timeUnit); 111 } 112 113 /** 114 * Add a shutdown hook to wait for thread completion in the given 115 * {@link ExecutorService service}. This is useful if the given service uses 116 * daemon threads, and we want to keep the JVM from exiting immediately on 117 * shutdown, instead giving these daemon threads a chance to terminate 118 * normally. 119 * @param service ExecutorService which uses daemon threads 120 * @param terminationTimeout how long to wait for the executor to finish 121 * before terminating the JVM 122 * @param timeUnit unit of time for the time parameter 123 */ 124 @Beta addDelayedShutdownHook( ExecutorService service, long terminationTimeout, TimeUnit timeUnit)125 public static void addDelayedShutdownHook( 126 ExecutorService service, long terminationTimeout, TimeUnit timeUnit) { 127 new Application() 128 .addDelayedShutdownHook(service, terminationTimeout, timeUnit); 129 } 130 131 /** 132 * Converts the given ThreadPoolExecutor into an ExecutorService that exits 133 * when the application is complete. It does so by using daemon threads and 134 * adding a shutdown hook to wait for their completion. 
135 * 136 * <p>This method waits 120 seconds before continuing with JVM termination, 137 * even if the executor has not finished its work. 138 * 139 * <p>This is mainly for fixed thread pools. 140 * See {@link Executors#newFixedThreadPool(int)}. 141 * 142 * @param executor the executor to modify to make sure it exits when the 143 * application is finished 144 * @return an unmodifiable version of the input which will not hang the JVM 145 */ 146 @Beta getExitingExecutorService(ThreadPoolExecutor executor)147 public static ExecutorService getExitingExecutorService(ThreadPoolExecutor executor) { 148 return new Application().getExitingExecutorService(executor); 149 } 150 151 /** 152 * Converts the given ThreadPoolExecutor into a ScheduledExecutorService that 153 * exits when the application is complete. It does so by using daemon threads 154 * and adding a shutdown hook to wait for their completion. 155 * 156 * <p>This method waits 120 seconds before continuing with JVM termination, 157 * even if the executor has not finished its work. 158 * 159 * <p>This is mainly for fixed thread pools. 160 * See {@link Executors#newScheduledThreadPool(int)}. 161 * 162 * @param executor the executor to modify to make sure it exits when the 163 * application is finished 164 * @return an unmodifiable version of the input which will not hang the JVM 165 */ 166 @Beta getExitingScheduledExecutorService( ScheduledThreadPoolExecutor executor)167 public static ScheduledExecutorService getExitingScheduledExecutorService( 168 ScheduledThreadPoolExecutor executor) { 169 return new Application().getExitingScheduledExecutorService(executor); 170 } 171 172 /** Represents the current application to register shutdown hooks. 
*/ 173 @VisibleForTesting static class Application { 174 getExitingExecutorService( ThreadPoolExecutor executor, long terminationTimeout, TimeUnit timeUnit)175 final ExecutorService getExitingExecutorService( 176 ThreadPoolExecutor executor, long terminationTimeout, TimeUnit timeUnit) { 177 useDaemonThreadFactory(executor); 178 ExecutorService service = Executors.unconfigurableExecutorService(executor); 179 addDelayedShutdownHook(service, terminationTimeout, timeUnit); 180 return service; 181 } 182 getExitingScheduledExecutorService( ScheduledThreadPoolExecutor executor, long terminationTimeout, TimeUnit timeUnit)183 final ScheduledExecutorService getExitingScheduledExecutorService( 184 ScheduledThreadPoolExecutor executor, long terminationTimeout, TimeUnit timeUnit) { 185 useDaemonThreadFactory(executor); 186 ScheduledExecutorService service = Executors.unconfigurableScheduledExecutorService(executor); 187 addDelayedShutdownHook(service, terminationTimeout, timeUnit); 188 return service; 189 } 190 addDelayedShutdownHook( final ExecutorService service, final long terminationTimeout, final TimeUnit timeUnit)191 final void addDelayedShutdownHook( 192 final ExecutorService service, final long terminationTimeout, final TimeUnit timeUnit) { 193 checkNotNull(service); 194 checkNotNull(timeUnit); 195 addShutdownHook(MoreExecutors.newThread("DelayedShutdownHook-for-" + service, new Runnable() { 196 @Override 197 public void run() { 198 try { 199 // We'd like to log progress and failures that may arise in the 200 // following code, but unfortunately the behavior of logging 201 // is undefined in shutdown hooks. 202 // This is because the logging code installs a shutdown hook of its 203 // own. See Cleaner class inside {@link LogManager}. 204 service.shutdown(); 205 service.awaitTermination(terminationTimeout, timeUnit); 206 } catch (InterruptedException ignored) { 207 // We're shutting down anyway, so just ignore. 
208 } 209 } 210 })); 211 } 212 getExitingExecutorService(ThreadPoolExecutor executor)213 final ExecutorService getExitingExecutorService(ThreadPoolExecutor executor) { 214 return getExitingExecutorService(executor, 120, TimeUnit.SECONDS); 215 } 216 getExitingScheduledExecutorService( ScheduledThreadPoolExecutor executor)217 final ScheduledExecutorService getExitingScheduledExecutorService( 218 ScheduledThreadPoolExecutor executor) { 219 return getExitingScheduledExecutorService(executor, 120, TimeUnit.SECONDS); 220 } 221 addShutdownHook(Thread hook)222 @VisibleForTesting void addShutdownHook(Thread hook) { 223 Runtime.getRuntime().addShutdownHook(hook); 224 } 225 } 226 useDaemonThreadFactory(ThreadPoolExecutor executor)227 private static void useDaemonThreadFactory(ThreadPoolExecutor executor) { 228 executor.setThreadFactory(new ThreadFactoryBuilder() 229 .setDaemon(true) 230 .setThreadFactory(executor.getThreadFactory()) 231 .build()); 232 } 233 234 /** 235 * Creates an executor service that runs each task in the thread 236 * that invokes {@code execute/submit}, as in {@link CallerRunsPolicy} This 237 * applies both to individually submitted tasks and to collections of tasks 238 * submitted via {@code invokeAll} or {@code invokeAny}. In the latter case, 239 * tasks will run serially on the calling thread. Tasks are run to 240 * completion before a {@code Future} is returned to the caller (unless the 241 * executor has been shutdown). 242 * 243 * <p>Although all tasks are immediately executed in the thread that 244 * submitted the task, this {@code ExecutorService} imposes a small 245 * locking overhead on each task submission in order to implement shutdown 246 * and termination behavior. 247 * 248 * <p>The implementation deviates from the {@code ExecutorService} 249 * specification with regards to the {@code shutdownNow} method. First, 250 * "best-effort" with regards to canceling running tasks is implemented 251 * as "no-effort". 
No interrupts or other attempts are made to stop 252 * threads executing tasks. Second, the returned list will always be empty, 253 * as any submitted task is considered to have started execution. 254 * This applies also to tasks given to {@code invokeAll} or {@code invokeAny} 255 * which are pending serial execution, even the subset of the tasks that 256 * have not yet started execution. It is unclear from the 257 * {@code ExecutorService} specification if these should be included, and 258 * it's much easier to implement the interpretation that they not be. 259 * Finally, a call to {@code shutdown} or {@code shutdownNow} may result 260 * in concurrent calls to {@code invokeAll/invokeAny} throwing 261 * RejectedExecutionException, although a subset of the tasks may already 262 * have been executed. 263 * 264 * @since 10.0 (<a href="http://code.google.com/p/guava-libraries/wiki/Compatibility" 265 * >mostly source-compatible</a> since 3.0) 266 * @deprecated Use {@link #directExecutor()} if you only require an {@link Executor} and 267 * {@link #newDirectExecutorService()} if you need a {@link ListeningExecutorService}. 268 */ sameThreadExecutor()269 @Deprecated public static ListeningExecutorService sameThreadExecutor() { 270 return new DirectExecutorService(); 271 } 272 273 // See sameThreadExecutor javadoc for behavioral notes. 
274 private static class DirectExecutorService 275 extends AbstractListeningExecutorService { 276 /** 277 * Lock used whenever accessing the state variables 278 * (runningTasks, shutdown, terminationCondition) of the executor 279 */ 280 private final Lock lock = new ReentrantLock(); 281 282 /** Signaled after the executor is shutdown and running tasks are done */ 283 private final Condition termination = lock.newCondition(); 284 285 /* 286 * Conceptually, these two variables describe the executor being in 287 * one of three states: 288 * - Active: shutdown == false 289 * - Shutdown: runningTasks > 0 and shutdown == true 290 * - Terminated: runningTasks == 0 and shutdown == true 291 */ 292 private int runningTasks = 0; 293 private boolean shutdown = false; 294 295 @Override execute(Runnable command)296 public void execute(Runnable command) { 297 startTask(); 298 try { 299 command.run(); 300 } finally { 301 endTask(); 302 } 303 } 304 305 @Override isShutdown()306 public boolean isShutdown() { 307 lock.lock(); 308 try { 309 return shutdown; 310 } finally { 311 lock.unlock(); 312 } 313 } 314 315 @Override shutdown()316 public void shutdown() { 317 lock.lock(); 318 try { 319 shutdown = true; 320 } finally { 321 lock.unlock(); 322 } 323 } 324 325 // See sameThreadExecutor javadoc for unusual behavior of this method. 
326 @Override shutdownNow()327 public List<Runnable> shutdownNow() { 328 shutdown(); 329 return Collections.emptyList(); 330 } 331 332 @Override isTerminated()333 public boolean isTerminated() { 334 lock.lock(); 335 try { 336 return shutdown && runningTasks == 0; 337 } finally { 338 lock.unlock(); 339 } 340 } 341 342 @Override awaitTermination(long timeout, TimeUnit unit)343 public boolean awaitTermination(long timeout, TimeUnit unit) 344 throws InterruptedException { 345 long nanos = unit.toNanos(timeout); 346 lock.lock(); 347 try { 348 for (;;) { 349 if (isTerminated()) { 350 return true; 351 } else if (nanos <= 0) { 352 return false; 353 } else { 354 nanos = termination.awaitNanos(nanos); 355 } 356 } 357 } finally { 358 lock.unlock(); 359 } 360 } 361 362 /** 363 * Checks if the executor has been shut down and increments the running 364 * task count. 365 * 366 * @throws RejectedExecutionException if the executor has been previously 367 * shutdown 368 */ startTask()369 private void startTask() { 370 lock.lock(); 371 try { 372 if (isShutdown()) { 373 throw new RejectedExecutionException("Executor already shutdown"); 374 } 375 runningTasks++; 376 } finally { 377 lock.unlock(); 378 } 379 } 380 381 /** 382 * Decrements the running task count. 383 */ endTask()384 private void endTask() { 385 lock.lock(); 386 try { 387 runningTasks--; 388 if (isTerminated()) { 389 termination.signalAll(); 390 } 391 } finally { 392 lock.unlock(); 393 } 394 } 395 } 396 397 /** 398 * Creates an executor service that runs each task in the thread 399 * that invokes {@code execute/submit}, as in {@link CallerRunsPolicy} This 400 * applies both to individually submitted tasks and to collections of tasks 401 * submitted via {@code invokeAll} or {@code invokeAny}. In the latter case, 402 * tasks will run serially on the calling thread. Tasks are run to 403 * completion before a {@code Future} is returned to the caller (unless the 404 * executor has been shutdown). 
405 * 406 * <p>Although all tasks are immediately executed in the thread that 407 * submitted the task, this {@code ExecutorService} imposes a small 408 * locking overhead on each task submission in order to implement shutdown 409 * and termination behavior. 410 * 411 * <p>The implementation deviates from the {@code ExecutorService} 412 * specification with regards to the {@code shutdownNow} method. First, 413 * "best-effort" with regards to canceling running tasks is implemented 414 * as "no-effort". No interrupts or other attempts are made to stop 415 * threads executing tasks. Second, the returned list will always be empty, 416 * as any submitted task is considered to have started execution. 417 * This applies also to tasks given to {@code invokeAll} or {@code invokeAny} 418 * which are pending serial execution, even the subset of the tasks that 419 * have not yet started execution. It is unclear from the 420 * {@code ExecutorService} specification if these should be included, and 421 * it's much easier to implement the interpretation that they not be. 422 * Finally, a call to {@code shutdown} or {@code shutdownNow} may result 423 * in concurrent calls to {@code invokeAll/invokeAny} throwing 424 * RejectedExecutionException, although a subset of the tasks may already 425 * have been executed. 426 * 427 * @since 18.0 (present as MoreExecutors.sameThreadExecutor() since 10.0) 428 */ newDirectExecutorService()429 public static ListeningExecutorService newDirectExecutorService() { 430 return new DirectExecutorService(); 431 } 432 433 /** 434 * Returns an {@link Executor} that runs each task in the thread that invokes 435 * {@link Executor#execute execute}, as in {@link CallerRunsPolicy}. 
436 * 437 * <p>This instance is equivalent to: <pre> {@code 438 * final class DirectExecutor implements Executor { 439 * public void execute(Runnable r) { 440 * r.run(); 441 * } 442 * }}</pre> 443 * 444 * <p>This should be preferred to {@link #newDirectExecutorService()} because the implementing the 445 * {@link ExecutorService} subinterface necessitates significant performance overhead. 446 * 447 * @since 18.0 448 */ directExecutor()449 public static Executor directExecutor() { 450 return DirectExecutor.INSTANCE; 451 } 452 453 /** See {@link #directExecutor} for behavioral notes. */ 454 private enum DirectExecutor implements Executor { 455 INSTANCE; execute(Runnable command)456 @Override public void execute(Runnable command) { 457 command.run(); 458 } 459 } 460 461 /** 462 * Creates an {@link ExecutorService} whose {@code submit} and {@code 463 * invokeAll} methods submit {@link ListenableFutureTask} instances to the 464 * given delegate executor. Those methods, as well as {@code execute} and 465 * {@code invokeAny}, are implemented in terms of calls to {@code 466 * delegate.execute}. All other methods are forwarded unchanged to the 467 * delegate. This implies that the returned {@code ListeningExecutorService} 468 * never calls the delegate's {@code submit}, {@code invokeAll}, and {@code 469 * invokeAny} methods, so any special handling of tasks must be implemented in 470 * the delegate's {@code execute} method or by wrapping the returned {@code 471 * ListeningExecutorService}. 472 * 473 * <p>If the delegate executor was already an instance of {@code 474 * ListeningExecutorService}, it is returned untouched, and the rest of this 475 * documentation does not apply. 476 * 477 * @since 10.0 478 */ listeningDecorator( ExecutorService delegate)479 public static ListeningExecutorService listeningDecorator( 480 ExecutorService delegate) { 481 return (delegate instanceof ListeningExecutorService) 482 ? 
(ListeningExecutorService) delegate 483 : (delegate instanceof ScheduledExecutorService) 484 ? new ScheduledListeningDecorator((ScheduledExecutorService) delegate) 485 : new ListeningDecorator(delegate); 486 } 487 488 /** 489 * Creates a {@link ScheduledExecutorService} whose {@code submit} and {@code 490 * invokeAll} methods submit {@link ListenableFutureTask} instances to the 491 * given delegate executor. Those methods, as well as {@code execute} and 492 * {@code invokeAny}, are implemented in terms of calls to {@code 493 * delegate.execute}. All other methods are forwarded unchanged to the 494 * delegate. This implies that the returned {@code 495 * ListeningScheduledExecutorService} never calls the delegate's {@code 496 * submit}, {@code invokeAll}, and {@code invokeAny} methods, so any special 497 * handling of tasks must be implemented in the delegate's {@code execute} 498 * method or by wrapping the returned {@code 499 * ListeningScheduledExecutorService}. 500 * 501 * <p>If the delegate executor was already an instance of {@code 502 * ListeningScheduledExecutorService}, it is returned untouched, and the rest 503 * of this documentation does not apply. 504 * 505 * @since 10.0 506 */ listeningDecorator( ScheduledExecutorService delegate)507 public static ListeningScheduledExecutorService listeningDecorator( 508 ScheduledExecutorService delegate) { 509 return (delegate instanceof ListeningScheduledExecutorService) 510 ? 
(ListeningScheduledExecutorService) delegate 511 : new ScheduledListeningDecorator(delegate); 512 } 513 514 private static class ListeningDecorator 515 extends AbstractListeningExecutorService { 516 private final ExecutorService delegate; 517 ListeningDecorator(ExecutorService delegate)518 ListeningDecorator(ExecutorService delegate) { 519 this.delegate = checkNotNull(delegate); 520 } 521 522 @Override awaitTermination(long timeout, TimeUnit unit)523 public boolean awaitTermination(long timeout, TimeUnit unit) 524 throws InterruptedException { 525 return delegate.awaitTermination(timeout, unit); 526 } 527 528 @Override isShutdown()529 public boolean isShutdown() { 530 return delegate.isShutdown(); 531 } 532 533 @Override isTerminated()534 public boolean isTerminated() { 535 return delegate.isTerminated(); 536 } 537 538 @Override shutdown()539 public void shutdown() { 540 delegate.shutdown(); 541 } 542 543 @Override shutdownNow()544 public List<Runnable> shutdownNow() { 545 return delegate.shutdownNow(); 546 } 547 548 @Override execute(Runnable command)549 public void execute(Runnable command) { 550 delegate.execute(command); 551 } 552 } 553 554 private static class ScheduledListeningDecorator 555 extends ListeningDecorator implements ListeningScheduledExecutorService { 556 @SuppressWarnings("hiding") 557 final ScheduledExecutorService delegate; 558 ScheduledListeningDecorator(ScheduledExecutorService delegate)559 ScheduledListeningDecorator(ScheduledExecutorService delegate) { 560 super(delegate); 561 this.delegate = checkNotNull(delegate); 562 } 563 564 @Override schedule( Runnable command, long delay, TimeUnit unit)565 public ListenableScheduledFuture<?> schedule( 566 Runnable command, long delay, TimeUnit unit) { 567 ListenableFutureTask<Void> task = 568 ListenableFutureTask.create(command, null); 569 ScheduledFuture<?> scheduled = delegate.schedule(task, delay, unit); 570 return new ListenableScheduledTask<Void>(task, scheduled); 571 } 572 573 @Override 
schedule( Callable<V> callable, long delay, TimeUnit unit)574 public <V> ListenableScheduledFuture<V> schedule( 575 Callable<V> callable, long delay, TimeUnit unit) { 576 ListenableFutureTask<V> task = ListenableFutureTask.create(callable); 577 ScheduledFuture<?> scheduled = delegate.schedule(task, delay, unit); 578 return new ListenableScheduledTask<V>(task, scheduled); 579 } 580 581 @Override scheduleAtFixedRate( Runnable command, long initialDelay, long period, TimeUnit unit)582 public ListenableScheduledFuture<?> scheduleAtFixedRate( 583 Runnable command, long initialDelay, long period, TimeUnit unit) { 584 NeverSuccessfulListenableFutureTask task = 585 new NeverSuccessfulListenableFutureTask(command); 586 ScheduledFuture<?> scheduled = 587 delegate.scheduleAtFixedRate(task, initialDelay, period, unit); 588 return new ListenableScheduledTask<Void>(task, scheduled); 589 } 590 591 @Override scheduleWithFixedDelay( Runnable command, long initialDelay, long delay, TimeUnit unit)592 public ListenableScheduledFuture<?> scheduleWithFixedDelay( 593 Runnable command, long initialDelay, long delay, TimeUnit unit) { 594 NeverSuccessfulListenableFutureTask task = 595 new NeverSuccessfulListenableFutureTask(command); 596 ScheduledFuture<?> scheduled = 597 delegate.scheduleWithFixedDelay(task, initialDelay, delay, unit); 598 return new ListenableScheduledTask<Void>(task, scheduled); 599 } 600 601 private static final class ListenableScheduledTask<V> 602 extends SimpleForwardingListenableFuture<V> 603 implements ListenableScheduledFuture<V> { 604 605 private final ScheduledFuture<?> scheduledDelegate; 606 ListenableScheduledTask( ListenableFuture<V> listenableDelegate, ScheduledFuture<?> scheduledDelegate)607 public ListenableScheduledTask( 608 ListenableFuture<V> listenableDelegate, 609 ScheduledFuture<?> scheduledDelegate) { 610 super(listenableDelegate); 611 this.scheduledDelegate = scheduledDelegate; 612 } 613 614 @Override cancel(boolean mayInterruptIfRunning)615 public 
boolean cancel(boolean mayInterruptIfRunning) { 616 boolean cancelled = super.cancel(mayInterruptIfRunning); 617 if (cancelled) { 618 // Unless it is cancelled, the delegate may continue being scheduled 619 scheduledDelegate.cancel(mayInterruptIfRunning); 620 621 // TODO(user): Cancel "this" if "scheduledDelegate" is cancelled. 622 } 623 return cancelled; 624 } 625 626 @Override getDelay(TimeUnit unit)627 public long getDelay(TimeUnit unit) { 628 return scheduledDelegate.getDelay(unit); 629 } 630 631 @Override compareTo(Delayed other)632 public int compareTo(Delayed other) { 633 return scheduledDelegate.compareTo(other); 634 } 635 } 636 637 private static final class NeverSuccessfulListenableFutureTask 638 extends AbstractFuture<Void> 639 implements Runnable { 640 private final Runnable delegate; 641 NeverSuccessfulListenableFutureTask(Runnable delegate)642 public NeverSuccessfulListenableFutureTask(Runnable delegate) { 643 this.delegate = checkNotNull(delegate); 644 } 645 run()646 @Override public void run() { 647 try { 648 delegate.run(); 649 } catch (Throwable t) { 650 setException(t); 651 throw Throwables.propagate(t); 652 } 653 } 654 } 655 } 656 657 /* 658 * This following method is a modified version of one found in 659 * http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/test/tck/AbstractExecutorServiceTest.java?revision=1.30 660 * which contained the following notice: 661 * 662 * Written by Doug Lea with assistance from members of JCP JSR-166 663 * Expert Group and released to the public domain, as explained at 664 * http://creativecommons.org/publicdomain/zero/1.0/ 665 * Other contributors include Andrew Wright, Jeffrey Hayes, 666 * Pat Fisher, Mike Judd. 667 */ 668 669 /** 670 * An implementation of {@link ExecutorService#invokeAny} for {@link ListeningExecutorService} 671 * implementations. invokeAnyImpl(ListeningExecutorService executorService, Collection<? 
/**
 * An implementation of {@link ExecutorService#invokeAny} for {@link ListeningExecutorService}
 * implementations.
 *
 * <p>Tasks are submitted one at a time; before each further submission the
 * completion queue is polled, so a result from an earlier task is returned
 * without starting the remaining ones. Every future that was submitted is
 * cancelled before this method exits, whether it returns or throws.
 *
 * @param executorService the executor the tasks are submitted to
 * @param tasks the candidate tasks; must contain at least one element
 * @param timed whether {@code nanos} bounds the wait for a result
 * @param nanos remaining time budget in nanoseconds; only read when {@code timed} is true
 * @return the value of the first future found to have completed without throwing
 * @throws ExecutionException if no task produced a result; this is the most recently
 *     recorded failure, or an exception with a null cause if none was recorded
 * @throws TimeoutException if {@code timed} is set and the timed wait yields no completed future
 * @throws InterruptedException if interrupted while waiting for a task to complete
 */
static <T> T invokeAnyImpl(ListeningExecutorService executorService,
    Collection<? extends Callable<T>> tasks, boolean timed, long nanos)
        throws InterruptedException, ExecutionException, TimeoutException {
  checkNotNull(executorService);
  int ntasks = tasks.size();
  checkArgument(ntasks > 0);
  List<Future<T>> futures = Lists.newArrayListWithCapacity(ntasks);
  // Futures are appended here by a listener once they complete.
  BlockingQueue<Future<T>> futureQueue = Queues.newLinkedBlockingQueue();

  // For efficiency, especially in executors with limited
  // parallelism, check to see if previously submitted tasks are
  // done before submitting more of them. This interleaving
  // plus the exception mechanics account for messiness of main
  // loop.

  try {
    // Record exceptions so that if we fail to obtain any
    // result, we can throw the last exception we got.
    ExecutionException ee = null;
    long lastTime = timed ? System.nanoTime() : 0;
    Iterator<? extends Callable<T>> it = tasks.iterator();

    // Start the first task unconditionally; ntasks now counts the tasks not yet submitted.
    futures.add(submitAndAddQueueListener(executorService, it.next(), futureQueue));
    --ntasks;
    // Number of submitted tasks whose completion has not been consumed yet.
    int active = 1;

    for (;;) {
      // Non-blocking check for an already completed task.
      Future<T> f = futureQueue.poll();
      if (f == null) {
        if (ntasks > 0) {
          // Nothing finished yet: submit the next task and poll again.
          --ntasks;
          futures.add(submitAndAddQueueListener(executorService, it.next(), futureQueue));
          ++active;
        } else if (active == 0) {
          // Everything was submitted and consumed without a result.
          break;
        } else if (timed) {
          f = futureQueue.poll(nanos, TimeUnit.NANOSECONDS);
          if (f == null) {
            throw new TimeoutException();
          }
          // Charge the time spent since the last check against the budget.
          long now = System.nanoTime();
          nanos -= now - lastTime;
          lastTime = now;
        } else {
          // Untimed: block until some task completes.
          f = futureQueue.take();
        }
      }
      if (f != null) {
        --active;
        try {
          return f.get();
        } catch (ExecutionException eex) {
          ee = eex;
        } catch (RuntimeException rex) {
          // Wrap so that the caller always sees an ExecutionException.
          ee = new ExecutionException(rex);
        }
      }
    }

    if (ee == null) {
      ee = new ExecutionException(null);
    }
    throw ee;
  } finally {
    // Cancel every submitted future, whether a result was obtained or not.
    for (Future<T> f : futures) {
      f.cancel(true);
    }
  }
}

/**
 * Submits the task and adds a listener that adds the future to {@code queue} when it completes.
 * The listener is registered with {@code directExecutor()}.
 */
private static <T> ListenableFuture<T> submitAndAddQueueListener(
    ListeningExecutorService executorService, Callable<T> task,
    final BlockingQueue<Future<T>> queue) {
  final ListenableFuture<T> future = executorService.submit(task);
  future.addListener(new Runnable() {
    @Override public void run() {
      queue.add(future);
    }
  }, directExecutor());
  return future;
}

762 * 763 * @since 14.0 764 */ 765 @Beta platformThreadFactory()766 public static ThreadFactory platformThreadFactory() { 767 if (!isAppEngine()) { 768 return Executors.defaultThreadFactory(); 769 } 770 try { 771 return (ThreadFactory) Class.forName("com.google.appengine.api.ThreadManager") 772 .getMethod("currentRequestThreadFactory") 773 .invoke(null); 774 } catch (IllegalAccessException e) { 775 throw new RuntimeException("Couldn't invoke ThreadManager.currentRequestThreadFactory", e); 776 } catch (ClassNotFoundException e) { 777 throw new RuntimeException("Couldn't invoke ThreadManager.currentRequestThreadFactory", e); 778 } catch (NoSuchMethodException e) { 779 throw new RuntimeException("Couldn't invoke ThreadManager.currentRequestThreadFactory", e); 780 } catch (InvocationTargetException e) { 781 throw Throwables.propagate(e.getCause()); 782 } 783 } 784 isAppEngine()785 private static boolean isAppEngine() { 786 if (System.getProperty("com.google.appengine.runtime.environment") == null) { 787 return false; 788 } 789 try { 790 // If the current environment is null, we're not inside AppEngine. 791 return Class.forName("com.google.apphosting.api.ApiProxy") 792 .getMethod("getCurrentEnvironment") 793 .invoke(null) != null; 794 } catch (ClassNotFoundException e) { 795 // If ApiProxy doesn't exist, we're not on AppEngine at all. 796 return false; 797 } catch (InvocationTargetException e) { 798 // If ApiProxy throws an exception, we're not in a proper AppEngine environment. 799 return false; 800 } catch (IllegalAccessException e) { 801 // If the method isn't accessible, we're not on a supported version of AppEngine; 802 return false; 803 } catch (NoSuchMethodException e) { 804 // If the method doesn't exist, we're not on a supported version of AppEngine; 805 return false; 806 } 807 } 808 809 /** 810 * Creates a thread using {@link #platformThreadFactory}, and sets its name to {@code name} 811 * unless changing the name is forbidden by the security manager. 
812 */ newThread(String name, Runnable runnable)813 static Thread newThread(String name, Runnable runnable) { 814 checkNotNull(name); 815 checkNotNull(runnable); 816 Thread result = platformThreadFactory().newThread(runnable); 817 try { 818 result.setName(name); 819 } catch (SecurityException e) { 820 // OK if we can't set the name in this environment. 821 } 822 return result; 823 } 824 825 // TODO(user): provide overloads for ListeningExecutorService? ListeningScheduledExecutorService? 826 // TODO(user): provide overloads that take constant strings? Function<Runnable, String>s to 827 // calculate names? 828 829 /** 830 * Creates an {@link Executor} that renames the {@link Thread threads} that its tasks run in. 831 * 832 * <p>The names are retrieved from the {@code nameSupplier} on the thread that is being renamed 833 * right before each task is run. The renaming is best effort, if a {@link SecurityManager} 834 * prevents the renaming then it will be skipped but the tasks will still execute. 835 * 836 * 837 * @param executor The executor to decorate 838 * @param nameSupplier The source of names for each task 839 */ renamingDecorator(final Executor executor, final Supplier<String> nameSupplier)840 static Executor renamingDecorator(final Executor executor, final Supplier<String> nameSupplier) { 841 checkNotNull(executor); 842 checkNotNull(nameSupplier); 843 if (isAppEngine()) { 844 // AppEngine doesn't support thread renaming, so don't even try 845 return executor; 846 } 847 return new Executor() { 848 @Override public void execute(Runnable command) { 849 executor.execute(Callables.threadRenaming(command, nameSupplier)); 850 } 851 }; 852 } 853 854 /** 855 * Creates an {@link ExecutorService} that renames the {@link Thread threads} that its tasks run 856 * in. 857 * 858 * <p>The names are retrieved from the {@code nameSupplier} on the thread that is being renamed 859 * right before each task is run. 
The renaming is best effort, if a {@link SecurityManager} 860 * prevents the renaming then it will be skipped but the tasks will still execute. 861 * 862 * 863 * @param service The executor to decorate 864 * @param nameSupplier The source of names for each task 865 */ 866 static ExecutorService renamingDecorator(final ExecutorService service, 867 final Supplier<String> nameSupplier) { 868 checkNotNull(service); 869 checkNotNull(nameSupplier); 870 if (isAppEngine()) { 871 // AppEngine doesn't support thread renaming, so don't even try. 872 return service; 873 } 874 return new WrappingExecutorService(service) { 875 @Override protected <T> Callable<T> wrapTask(Callable<T> callable) { 876 return Callables.threadRenaming(callable, nameSupplier); 877 } 878 @Override protected Runnable wrapTask(Runnable command) { 879 return Callables.threadRenaming(command, nameSupplier); 880 } 881 }; 882 } 883 884 /** 885 * Creates a {@link ScheduledExecutorService} that renames the {@link Thread threads} that its 886 * tasks run in. 887 * 888 * <p>The names are retrieved from the {@code nameSupplier} on the thread that is being renamed 889 * right before each task is run. The renaming is best effort, if a {@link SecurityManager} 890 * prevents the renaming then it will be skipped but the tasks will still execute. 891 * 892 * 893 * @param service The executor to decorate 894 * @param nameSupplier The source of names for each task 895 */ 896 static ScheduledExecutorService renamingDecorator(final ScheduledExecutorService service, 897 final Supplier<String> nameSupplier) { 898 checkNotNull(service); 899 checkNotNull(nameSupplier); 900 if (isAppEngine()) { 901 // AppEngine doesn't support thread renaming, so don't even try. 
902 return service; 903 } 904 return new WrappingScheduledExecutorService(service) { 905 @Override protected <T> Callable<T> wrapTask(Callable<T> callable) { 906 return Callables.threadRenaming(callable, nameSupplier); 907 } 908 @Override protected Runnable wrapTask(Runnable command) { 909 return Callables.threadRenaming(command, nameSupplier); 910 } 911 }; 912 } 913 914 /** 915 * Shuts down the given executor gradually, first disabling new submissions and later cancelling 916 * existing tasks. 917 * 918 * <p>The method takes the following steps: 919 * <ol> 920 * <li>calls {@link ExecutorService#shutdown()}, disabling acceptance of new submitted tasks. 921 * <li>waits for half of the specified timeout. 922 * <li>if the timeout expires, it calls {@link ExecutorService#shutdownNow()}, cancelling 923 * pending tasks and interrupting running tasks. 924 * <li>waits for the other half of the specified timeout. 925 * </ol> 926 * 927 * <p>If, at any step of the process, the given executor is terminated or the calling thread is 928 * interrupted, the method calls {@link ExecutorService#shutdownNow()}, cancelling 929 * pending tasks and interrupting running tasks. 
930 * 931 * @param service the {@code ExecutorService} to shut down 932 * @param timeout the maximum time to wait for the {@code ExecutorService} to terminate 933 * @param unit the time unit of the timeout argument 934 * @return {@code true} if the pool was terminated successfully, {@code false} if the 935 * {@code ExecutorService} could not terminate <b>or</b> the thread running this method 936 * is interrupted while waiting for the {@code ExecutorService} to terminate 937 * @since 17.0 938 */ 939 @Beta 940 public static boolean shutdownAndAwaitTermination( 941 ExecutorService service, long timeout, TimeUnit unit) { 942 checkNotNull(unit); 943 // Disable new tasks from being submitted 944 service.shutdown(); 945 try { 946 long halfTimeoutNanos = TimeUnit.NANOSECONDS.convert(timeout, unit) / 2; 947 // Wait for half the duration of the timeout for existing tasks to terminate 948 if (!service.awaitTermination(halfTimeoutNanos, TimeUnit.NANOSECONDS)) { 949 // Cancel currently executing tasks 950 service.shutdownNow(); 951 // Wait the other half of the timeout for tasks to respond to being cancelled 952 service.awaitTermination(halfTimeoutNanos, TimeUnit.NANOSECONDS); 953 } 954 } catch (InterruptedException ie) { 955 // Preserve interrupt status 956 Thread.currentThread().interrupt(); 957 // (Re-)Cancel if current thread also interrupted 958 service.shutdownNow(); 959 } 960 return service.isTerminated(); 961 } 962 } 963