• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2007 The Guava Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package com.google.common.util.concurrent;
18 
19 import static com.google.common.base.Preconditions.checkArgument;
20 import static com.google.common.base.Preconditions.checkNotNull;
21 
22 import com.google.common.annotations.Beta;
23 import com.google.common.annotations.VisibleForTesting;
24 import com.google.common.base.Supplier;
25 import com.google.common.base.Throwables;
26 import com.google.common.collect.Lists;
27 import com.google.common.collect.Queues;
28 import com.google.common.util.concurrent.ForwardingListenableFuture.SimpleForwardingListenableFuture;
29 
30 import java.lang.reflect.InvocationTargetException;
31 import java.util.Collection;
32 import java.util.Collections;
33 import java.util.Iterator;
34 import java.util.List;
35 import java.util.concurrent.BlockingQueue;
36 import java.util.concurrent.Callable;
37 import java.util.concurrent.Delayed;
38 import java.util.concurrent.ExecutionException;
39 import java.util.concurrent.Executor;
40 import java.util.concurrent.ExecutorService;
41 import java.util.concurrent.Executors;
42 import java.util.concurrent.Future;
43 import java.util.concurrent.RejectedExecutionException;
44 import java.util.concurrent.ScheduledExecutorService;
45 import java.util.concurrent.ScheduledFuture;
46 import java.util.concurrent.ScheduledThreadPoolExecutor;
47 import java.util.concurrent.ThreadFactory;
48 import java.util.concurrent.ThreadPoolExecutor;
49 import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
50 import java.util.concurrent.TimeUnit;
51 import java.util.concurrent.TimeoutException;
52 import java.util.concurrent.locks.Condition;
53 import java.util.concurrent.locks.Lock;
54 import java.util.concurrent.locks.ReentrantLock;
55 
56 /**
57  * Factory and utility methods for {@link java.util.concurrent.Executor}, {@link
58  * ExecutorService}, and {@link ThreadFactory}.
59  *
60  * @author Eric Fellheimer
61  * @author Kyle Littlefield
62  * @author Justin Mahoney
63  * @since 3.0
64  */
65 public final class MoreExecutors {
MoreExecutors()66   private MoreExecutors() {}
67 
68   /**
69    * Converts the given ThreadPoolExecutor into an ExecutorService that exits
70    * when the application is complete.  It does so by using daemon threads and
71    * adding a shutdown hook to wait for their completion.
72    *
73    * <p>This is mainly for fixed thread pools.
74    * See {@link Executors#newFixedThreadPool(int)}.
75    *
76    * @param executor the executor to modify to make sure it exits when the
77    *        application is finished
78    * @param terminationTimeout how long to wait for the executor to
79    *        finish before terminating the JVM
80    * @param timeUnit unit of time for the time parameter
81    * @return an unmodifiable version of the input which will not hang the JVM
82    */
83   @Beta
getExitingExecutorService( ThreadPoolExecutor executor, long terminationTimeout, TimeUnit timeUnit)84   public static ExecutorService getExitingExecutorService(
85       ThreadPoolExecutor executor, long terminationTimeout, TimeUnit timeUnit) {
86     return new Application()
87         .getExitingExecutorService(executor, terminationTimeout, timeUnit);
88   }
89 
90   /**
91    * Converts the given ScheduledThreadPoolExecutor into a
92    * ScheduledExecutorService that exits when the application is complete.  It
93    * does so by using daemon threads and adding a shutdown hook to wait for
94    * their completion.
95    *
96    * <p>This is mainly for fixed thread pools.
97    * See {@link Executors#newScheduledThreadPool(int)}.
98    *
99    * @param executor the executor to modify to make sure it exits when the
100    *        application is finished
101    * @param terminationTimeout how long to wait for the executor to
102    *        finish before terminating the JVM
103    * @param timeUnit unit of time for the time parameter
104    * @return an unmodifiable version of the input which will not hang the JVM
105    */
106   @Beta
getExitingScheduledExecutorService( ScheduledThreadPoolExecutor executor, long terminationTimeout, TimeUnit timeUnit)107   public static ScheduledExecutorService getExitingScheduledExecutorService(
108       ScheduledThreadPoolExecutor executor, long terminationTimeout, TimeUnit timeUnit) {
109     return new Application()
110         .getExitingScheduledExecutorService(executor, terminationTimeout, timeUnit);
111   }
112 
113   /**
114    * Add a shutdown hook to wait for thread completion in the given
115    * {@link ExecutorService service}.  This is useful if the given service uses
116    * daemon threads, and we want to keep the JVM from exiting immediately on
117    * shutdown, instead giving these daemon threads a chance to terminate
118    * normally.
119    * @param service ExecutorService which uses daemon threads
120    * @param terminationTimeout how long to wait for the executor to finish
121    *        before terminating the JVM
122    * @param timeUnit unit of time for the time parameter
123    */
124   @Beta
addDelayedShutdownHook( ExecutorService service, long terminationTimeout, TimeUnit timeUnit)125   public static void addDelayedShutdownHook(
126       ExecutorService service, long terminationTimeout, TimeUnit timeUnit) {
127     new Application()
128         .addDelayedShutdownHook(service, terminationTimeout, timeUnit);
129   }
130 
131   /**
132    * Converts the given ThreadPoolExecutor into an ExecutorService that exits
133    * when the application is complete.  It does so by using daemon threads and
134    * adding a shutdown hook to wait for their completion.
135    *
136    * <p>This method waits 120 seconds before continuing with JVM termination,
137    * even if the executor has not finished its work.
138    *
139    * <p>This is mainly for fixed thread pools.
140    * See {@link Executors#newFixedThreadPool(int)}.
141    *
142    * @param executor the executor to modify to make sure it exits when the
143    *        application is finished
144    * @return an unmodifiable version of the input which will not hang the JVM
145    */
146   @Beta
getExitingExecutorService(ThreadPoolExecutor executor)147   public static ExecutorService getExitingExecutorService(ThreadPoolExecutor executor) {
148     return new Application().getExitingExecutorService(executor);
149   }
150 
151   /**
152    * Converts the given ThreadPoolExecutor into a ScheduledExecutorService that
153    * exits when the application is complete.  It does so by using daemon threads
154    * and adding a shutdown hook to wait for their completion.
155    *
156    * <p>This method waits 120 seconds before continuing with JVM termination,
157    * even if the executor has not finished its work.
158    *
159    * <p>This is mainly for fixed thread pools.
160    * See {@link Executors#newScheduledThreadPool(int)}.
161    *
162    * @param executor the executor to modify to make sure it exits when the
163    *        application is finished
164    * @return an unmodifiable version of the input which will not hang the JVM
165    */
166   @Beta
getExitingScheduledExecutorService( ScheduledThreadPoolExecutor executor)167   public static ScheduledExecutorService getExitingScheduledExecutorService(
168       ScheduledThreadPoolExecutor executor) {
169     return new Application().getExitingScheduledExecutorService(executor);
170   }
171 
172   /** Represents the current application to register shutdown hooks. */
173   @VisibleForTesting static class Application {
174 
getExitingExecutorService( ThreadPoolExecutor executor, long terminationTimeout, TimeUnit timeUnit)175     final ExecutorService getExitingExecutorService(
176         ThreadPoolExecutor executor, long terminationTimeout, TimeUnit timeUnit) {
177       useDaemonThreadFactory(executor);
178       ExecutorService service = Executors.unconfigurableExecutorService(executor);
179       addDelayedShutdownHook(service, terminationTimeout, timeUnit);
180       return service;
181     }
182 
getExitingScheduledExecutorService( ScheduledThreadPoolExecutor executor, long terminationTimeout, TimeUnit timeUnit)183     final ScheduledExecutorService getExitingScheduledExecutorService(
184         ScheduledThreadPoolExecutor executor, long terminationTimeout, TimeUnit timeUnit) {
185       useDaemonThreadFactory(executor);
186       ScheduledExecutorService service = Executors.unconfigurableScheduledExecutorService(executor);
187       addDelayedShutdownHook(service, terminationTimeout, timeUnit);
188       return service;
189     }
190 
addDelayedShutdownHook( final ExecutorService service, final long terminationTimeout, final TimeUnit timeUnit)191     final void addDelayedShutdownHook(
192         final ExecutorService service, final long terminationTimeout, final TimeUnit timeUnit) {
193       checkNotNull(service);
194       checkNotNull(timeUnit);
195       addShutdownHook(MoreExecutors.newThread("DelayedShutdownHook-for-" + service, new Runnable() {
196         @Override
197         public void run() {
198           try {
199             // We'd like to log progress and failures that may arise in the
200             // following code, but unfortunately the behavior of logging
201             // is undefined in shutdown hooks.
202             // This is because the logging code installs a shutdown hook of its
203             // own. See Cleaner class inside {@link LogManager}.
204             service.shutdown();
205             service.awaitTermination(terminationTimeout, timeUnit);
206           } catch (InterruptedException ignored) {
207             // We're shutting down anyway, so just ignore.
208           }
209         }
210       }));
211     }
212 
getExitingExecutorService(ThreadPoolExecutor executor)213     final ExecutorService getExitingExecutorService(ThreadPoolExecutor executor) {
214       return getExitingExecutorService(executor, 120, TimeUnit.SECONDS);
215     }
216 
getExitingScheduledExecutorService( ScheduledThreadPoolExecutor executor)217     final ScheduledExecutorService getExitingScheduledExecutorService(
218         ScheduledThreadPoolExecutor executor) {
219       return getExitingScheduledExecutorService(executor, 120, TimeUnit.SECONDS);
220     }
221 
addShutdownHook(Thread hook)222     @VisibleForTesting void addShutdownHook(Thread hook) {
223       Runtime.getRuntime().addShutdownHook(hook);
224     }
225   }
226 
useDaemonThreadFactory(ThreadPoolExecutor executor)227   private static void useDaemonThreadFactory(ThreadPoolExecutor executor) {
228     executor.setThreadFactory(new ThreadFactoryBuilder()
229         .setDaemon(true)
230         .setThreadFactory(executor.getThreadFactory())
231         .build());
232   }
233 
234   /**
235    * Creates an executor service that runs each task in the thread
236    * that invokes {@code execute/submit}, as in {@link CallerRunsPolicy}  This
237    * applies both to individually submitted tasks and to collections of tasks
238    * submitted via {@code invokeAll} or {@code invokeAny}.  In the latter case,
239    * tasks will run serially on the calling thread.  Tasks are run to
240    * completion before a {@code Future} is returned to the caller (unless the
241    * executor has been shutdown).
242    *
243    * <p>Although all tasks are immediately executed in the thread that
244    * submitted the task, this {@code ExecutorService} imposes a small
245    * locking overhead on each task submission in order to implement shutdown
246    * and termination behavior.
247    *
248    * <p>The implementation deviates from the {@code ExecutorService}
249    * specification with regards to the {@code shutdownNow} method.  First,
250    * "best-effort" with regards to canceling running tasks is implemented
251    * as "no-effort".  No interrupts or other attempts are made to stop
252    * threads executing tasks.  Second, the returned list will always be empty,
253    * as any submitted task is considered to have started execution.
254    * This applies also to tasks given to {@code invokeAll} or {@code invokeAny}
255    * which are pending serial execution, even the subset of the tasks that
256    * have not yet started execution.  It is unclear from the
257    * {@code ExecutorService} specification if these should be included, and
258    * it's much easier to implement the interpretation that they not be.
259    * Finally, a call to {@code shutdown} or {@code shutdownNow} may result
260    * in concurrent calls to {@code invokeAll/invokeAny} throwing
261    * RejectedExecutionException, although a subset of the tasks may already
262    * have been executed.
263    *
264    * @since 10.0 (<a href="http://code.google.com/p/guava-libraries/wiki/Compatibility"
265    *        >mostly source-compatible</a> since 3.0)
266    * @deprecated Use {@link #directExecutor()} if you only require an {@link Executor} and
267    *     {@link #newDirectExecutorService()} if you need a {@link ListeningExecutorService}.
268    */
sameThreadExecutor()269   @Deprecated public static ListeningExecutorService sameThreadExecutor() {
270     return new DirectExecutorService();
271   }
272 
273   // See sameThreadExecutor javadoc for behavioral notes.
274   private static class DirectExecutorService
275       extends AbstractListeningExecutorService {
276     /**
277      * Lock used whenever accessing the state variables
278      * (runningTasks, shutdown, terminationCondition) of the executor
279      */
280     private final Lock lock = new ReentrantLock();
281 
282     /** Signaled after the executor is shutdown and running tasks are done */
283     private final Condition termination = lock.newCondition();
284 
285     /*
286      * Conceptually, these two variables describe the executor being in
287      * one of three states:
288      *   - Active: shutdown == false
289      *   - Shutdown: runningTasks > 0 and shutdown == true
290      *   - Terminated: runningTasks == 0 and shutdown == true
291      */
292     private int runningTasks = 0;
293     private boolean shutdown = false;
294 
295     @Override
execute(Runnable command)296     public void execute(Runnable command) {
297       startTask();
298       try {
299         command.run();
300       } finally {
301         endTask();
302       }
303     }
304 
305     @Override
isShutdown()306     public boolean isShutdown() {
307       lock.lock();
308       try {
309         return shutdown;
310       } finally {
311         lock.unlock();
312       }
313     }
314 
315     @Override
shutdown()316     public void shutdown() {
317       lock.lock();
318       try {
319         shutdown = true;
320       } finally {
321         lock.unlock();
322       }
323     }
324 
325     // See sameThreadExecutor javadoc for unusual behavior of this method.
326     @Override
shutdownNow()327     public List<Runnable> shutdownNow() {
328       shutdown();
329       return Collections.emptyList();
330     }
331 
332     @Override
isTerminated()333     public boolean isTerminated() {
334       lock.lock();
335       try {
336         return shutdown && runningTasks == 0;
337       } finally {
338         lock.unlock();
339       }
340     }
341 
342     @Override
awaitTermination(long timeout, TimeUnit unit)343     public boolean awaitTermination(long timeout, TimeUnit unit)
344         throws InterruptedException {
345       long nanos = unit.toNanos(timeout);
346       lock.lock();
347       try {
348         for (;;) {
349           if (isTerminated()) {
350             return true;
351           } else if (nanos <= 0) {
352             return false;
353           } else {
354             nanos = termination.awaitNanos(nanos);
355           }
356         }
357       } finally {
358         lock.unlock();
359       }
360     }
361 
362     /**
363      * Checks if the executor has been shut down and increments the running
364      * task count.
365      *
366      * @throws RejectedExecutionException if the executor has been previously
367      *         shutdown
368      */
startTask()369     private void startTask() {
370       lock.lock();
371       try {
372         if (isShutdown()) {
373           throw new RejectedExecutionException("Executor already shutdown");
374         }
375         runningTasks++;
376       } finally {
377         lock.unlock();
378       }
379     }
380 
381     /**
382      * Decrements the running task count.
383      */
endTask()384     private void endTask() {
385       lock.lock();
386       try {
387         runningTasks--;
388         if (isTerminated()) {
389           termination.signalAll();
390         }
391       } finally {
392         lock.unlock();
393       }
394     }
395   }
396 
397   /**
398    * Creates an executor service that runs each task in the thread
399    * that invokes {@code execute/submit}, as in {@link CallerRunsPolicy}  This
400    * applies both to individually submitted tasks and to collections of tasks
401    * submitted via {@code invokeAll} or {@code invokeAny}.  In the latter case,
402    * tasks will run serially on the calling thread.  Tasks are run to
403    * completion before a {@code Future} is returned to the caller (unless the
404    * executor has been shutdown).
405    *
406    * <p>Although all tasks are immediately executed in the thread that
407    * submitted the task, this {@code ExecutorService} imposes a small
408    * locking overhead on each task submission in order to implement shutdown
409    * and termination behavior.
410    *
411    * <p>The implementation deviates from the {@code ExecutorService}
412    * specification with regards to the {@code shutdownNow} method.  First,
413    * "best-effort" with regards to canceling running tasks is implemented
414    * as "no-effort".  No interrupts or other attempts are made to stop
415    * threads executing tasks.  Second, the returned list will always be empty,
416    * as any submitted task is considered to have started execution.
417    * This applies also to tasks given to {@code invokeAll} or {@code invokeAny}
418    * which are pending serial execution, even the subset of the tasks that
419    * have not yet started execution.  It is unclear from the
420    * {@code ExecutorService} specification if these should be included, and
421    * it's much easier to implement the interpretation that they not be.
422    * Finally, a call to {@code shutdown} or {@code shutdownNow} may result
423    * in concurrent calls to {@code invokeAll/invokeAny} throwing
424    * RejectedExecutionException, although a subset of the tasks may already
425    * have been executed.
426    *
427    * @since 18.0 (present as MoreExecutors.sameThreadExecutor() since 10.0)
428    */
newDirectExecutorService()429   public static ListeningExecutorService newDirectExecutorService() {
430     return new DirectExecutorService();
431   }
432 
433   /**
434    * Returns an {@link Executor} that runs each task in the thread that invokes
435    * {@link Executor#execute execute}, as in {@link CallerRunsPolicy}.
436    *
437    * <p>This instance is equivalent to: <pre>   {@code
438    *   final class DirectExecutor implements Executor {
439    *     public void execute(Runnable r) {
440    *       r.run();
441    *     }
442    *   }}</pre>
443    *
444    * <p>This should be preferred to {@link #newDirectExecutorService()} because the implementing the
445    * {@link ExecutorService} subinterface necessitates significant performance overhead.
446    *
447    * @since 18.0
448    */
directExecutor()449   public static Executor directExecutor() {
450     return DirectExecutor.INSTANCE;
451   }
452 
453   /** See {@link #directExecutor} for behavioral notes. */
454   private enum DirectExecutor implements Executor {
455     INSTANCE;
execute(Runnable command)456     @Override public void execute(Runnable command) {
457       command.run();
458     }
459   }
460 
461   /**
462    * Creates an {@link ExecutorService} whose {@code submit} and {@code
463    * invokeAll} methods submit {@link ListenableFutureTask} instances to the
464    * given delegate executor. Those methods, as well as {@code execute} and
465    * {@code invokeAny}, are implemented in terms of calls to {@code
466    * delegate.execute}. All other methods are forwarded unchanged to the
467    * delegate. This implies that the returned {@code ListeningExecutorService}
468    * never calls the delegate's {@code submit}, {@code invokeAll}, and {@code
469    * invokeAny} methods, so any special handling of tasks must be implemented in
470    * the delegate's {@code execute} method or by wrapping the returned {@code
471    * ListeningExecutorService}.
472    *
473    * <p>If the delegate executor was already an instance of {@code
474    * ListeningExecutorService}, it is returned untouched, and the rest of this
475    * documentation does not apply.
476    *
477    * @since 10.0
478    */
listeningDecorator( ExecutorService delegate)479   public static ListeningExecutorService listeningDecorator(
480       ExecutorService delegate) {
481     return (delegate instanceof ListeningExecutorService)
482         ? (ListeningExecutorService) delegate
483         : (delegate instanceof ScheduledExecutorService)
484         ? new ScheduledListeningDecorator((ScheduledExecutorService) delegate)
485         : new ListeningDecorator(delegate);
486   }
487 
488   /**
489    * Creates a {@link ScheduledExecutorService} whose {@code submit} and {@code
490    * invokeAll} methods submit {@link ListenableFutureTask} instances to the
491    * given delegate executor. Those methods, as well as {@code execute} and
492    * {@code invokeAny}, are implemented in terms of calls to {@code
493    * delegate.execute}. All other methods are forwarded unchanged to the
494    * delegate. This implies that the returned {@code
495    * ListeningScheduledExecutorService} never calls the delegate's {@code
496    * submit}, {@code invokeAll}, and {@code invokeAny} methods, so any special
497    * handling of tasks must be implemented in the delegate's {@code execute}
498    * method or by wrapping the returned {@code
499    * ListeningScheduledExecutorService}.
500    *
501    * <p>If the delegate executor was already an instance of {@code
502    * ListeningScheduledExecutorService}, it is returned untouched, and the rest
503    * of this documentation does not apply.
504    *
505    * @since 10.0
506    */
listeningDecorator( ScheduledExecutorService delegate)507   public static ListeningScheduledExecutorService listeningDecorator(
508       ScheduledExecutorService delegate) {
509     return (delegate instanceof ListeningScheduledExecutorService)
510         ? (ListeningScheduledExecutorService) delegate
511         : new ScheduledListeningDecorator(delegate);
512   }
513 
514   private static class ListeningDecorator
515       extends AbstractListeningExecutorService {
516     private final ExecutorService delegate;
517 
ListeningDecorator(ExecutorService delegate)518     ListeningDecorator(ExecutorService delegate) {
519       this.delegate = checkNotNull(delegate);
520     }
521 
522     @Override
awaitTermination(long timeout, TimeUnit unit)523     public boolean awaitTermination(long timeout, TimeUnit unit)
524         throws InterruptedException {
525       return delegate.awaitTermination(timeout, unit);
526     }
527 
528     @Override
isShutdown()529     public boolean isShutdown() {
530       return delegate.isShutdown();
531     }
532 
533     @Override
isTerminated()534     public boolean isTerminated() {
535       return delegate.isTerminated();
536     }
537 
538     @Override
shutdown()539     public void shutdown() {
540       delegate.shutdown();
541     }
542 
543     @Override
shutdownNow()544     public List<Runnable> shutdownNow() {
545       return delegate.shutdownNow();
546     }
547 
548     @Override
execute(Runnable command)549     public void execute(Runnable command) {
550       delegate.execute(command);
551     }
552   }
553 
554   private static class ScheduledListeningDecorator
555       extends ListeningDecorator implements ListeningScheduledExecutorService {
556     @SuppressWarnings("hiding")
557     final ScheduledExecutorService delegate;
558 
ScheduledListeningDecorator(ScheduledExecutorService delegate)559     ScheduledListeningDecorator(ScheduledExecutorService delegate) {
560       super(delegate);
561       this.delegate = checkNotNull(delegate);
562     }
563 
564     @Override
schedule( Runnable command, long delay, TimeUnit unit)565     public ListenableScheduledFuture<?> schedule(
566         Runnable command, long delay, TimeUnit unit) {
567       ListenableFutureTask<Void> task =
568           ListenableFutureTask.create(command, null);
569       ScheduledFuture<?> scheduled = delegate.schedule(task, delay, unit);
570       return new ListenableScheduledTask<Void>(task, scheduled);
571     }
572 
573     @Override
schedule( Callable<V> callable, long delay, TimeUnit unit)574     public <V> ListenableScheduledFuture<V> schedule(
575         Callable<V> callable, long delay, TimeUnit unit) {
576       ListenableFutureTask<V> task = ListenableFutureTask.create(callable);
577       ScheduledFuture<?> scheduled = delegate.schedule(task, delay, unit);
578       return new ListenableScheduledTask<V>(task, scheduled);
579     }
580 
581     @Override
scheduleAtFixedRate( Runnable command, long initialDelay, long period, TimeUnit unit)582     public ListenableScheduledFuture<?> scheduleAtFixedRate(
583         Runnable command, long initialDelay, long period, TimeUnit unit) {
584       NeverSuccessfulListenableFutureTask task =
585           new NeverSuccessfulListenableFutureTask(command);
586       ScheduledFuture<?> scheduled =
587           delegate.scheduleAtFixedRate(task, initialDelay, period, unit);
588       return new ListenableScheduledTask<Void>(task, scheduled);
589     }
590 
591     @Override
scheduleWithFixedDelay( Runnable command, long initialDelay, long delay, TimeUnit unit)592     public ListenableScheduledFuture<?> scheduleWithFixedDelay(
593         Runnable command, long initialDelay, long delay, TimeUnit unit) {
594       NeverSuccessfulListenableFutureTask task =
595           new NeverSuccessfulListenableFutureTask(command);
596       ScheduledFuture<?> scheduled =
597           delegate.scheduleWithFixedDelay(task, initialDelay, delay, unit);
598       return new ListenableScheduledTask<Void>(task, scheduled);
599     }
600 
601     private static final class ListenableScheduledTask<V>
602         extends SimpleForwardingListenableFuture<V>
603         implements ListenableScheduledFuture<V> {
604 
605       private final ScheduledFuture<?> scheduledDelegate;
606 
ListenableScheduledTask( ListenableFuture<V> listenableDelegate, ScheduledFuture<?> scheduledDelegate)607       public ListenableScheduledTask(
608           ListenableFuture<V> listenableDelegate,
609           ScheduledFuture<?> scheduledDelegate) {
610         super(listenableDelegate);
611         this.scheduledDelegate = scheduledDelegate;
612       }
613 
614       @Override
cancel(boolean mayInterruptIfRunning)615       public boolean cancel(boolean mayInterruptIfRunning) {
616         boolean cancelled = super.cancel(mayInterruptIfRunning);
617         if (cancelled) {
618           // Unless it is cancelled, the delegate may continue being scheduled
619           scheduledDelegate.cancel(mayInterruptIfRunning);
620 
621           // TODO(user): Cancel "this" if "scheduledDelegate" is cancelled.
622         }
623         return cancelled;
624       }
625 
626       @Override
getDelay(TimeUnit unit)627       public long getDelay(TimeUnit unit) {
628         return scheduledDelegate.getDelay(unit);
629       }
630 
631       @Override
compareTo(Delayed other)632       public int compareTo(Delayed other) {
633         return scheduledDelegate.compareTo(other);
634       }
635     }
636 
637     private static final class NeverSuccessfulListenableFutureTask
638         extends AbstractFuture<Void>
639         implements Runnable {
640       private final Runnable delegate;
641 
NeverSuccessfulListenableFutureTask(Runnable delegate)642       public NeverSuccessfulListenableFutureTask(Runnable delegate) {
643         this.delegate = checkNotNull(delegate);
644       }
645 
run()646       @Override public void run() {
647         try {
648           delegate.run();
649         } catch (Throwable t) {
650           setException(t);
651           throw Throwables.propagate(t);
652         }
653       }
654     }
655   }
656 
657   /*
658    * This following method is a modified version of one found in
659    * http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/test/tck/AbstractExecutorServiceTest.java?revision=1.30
660    * which contained the following notice:
661    *
662    * Written by Doug Lea with assistance from members of JCP JSR-166
663    * Expert Group and released to the public domain, as explained at
664    * http://creativecommons.org/publicdomain/zero/1.0/
665    * Other contributors include Andrew Wright, Jeffrey Hayes,
666    * Pat Fisher, Mike Judd.
667    */
668 
669   /**
670    * An implementation of {@link ExecutorService#invokeAny} for {@link ListeningExecutorService}
671    * implementations.
invokeAnyImpl(ListeningExecutorService executorService, Collection<? extends Callable<T>> tasks, boolean timed, long nanos)672    */ static <T> T invokeAnyImpl(ListeningExecutorService executorService,
673       Collection<? extends Callable<T>> tasks, boolean timed, long nanos)
674           throws InterruptedException, ExecutionException, TimeoutException {
675     checkNotNull(executorService);
676     int ntasks = tasks.size();
677     checkArgument(ntasks > 0);
678     List<Future<T>> futures = Lists.newArrayListWithCapacity(ntasks);
679     BlockingQueue<Future<T>> futureQueue = Queues.newLinkedBlockingQueue();
680 
681     // For efficiency, especially in executors with limited
682     // parallelism, check to see if previously submitted tasks are
683     // done before submitting more of them. This interleaving
684     // plus the exception mechanics account for messiness of main
685     // loop.
686 
687     try {
688       // Record exceptions so that if we fail to obtain any
689       // result, we can throw the last exception we got.
690       ExecutionException ee = null;
691       long lastTime = timed ? System.nanoTime() : 0;
692       Iterator<? extends Callable<T>> it = tasks.iterator();
693 
694       futures.add(submitAndAddQueueListener(executorService, it.next(), futureQueue));
695       --ntasks;
696       int active = 1;
697 
698       for (;;) {
699         Future<T> f = futureQueue.poll();
700         if (f == null) {
701           if (ntasks > 0) {
702             --ntasks;
703             futures.add(submitAndAddQueueListener(executorService, it.next(), futureQueue));
704             ++active;
705           } else if (active == 0) {
706             break;
707           } else if (timed) {
708             f = futureQueue.poll(nanos, TimeUnit.NANOSECONDS);
709             if (f == null) {
710               throw new TimeoutException();
711             }
712             long now = System.nanoTime();
713             nanos -= now - lastTime;
714             lastTime = now;
715           } else {
716             f = futureQueue.take();
717           }
718         }
719         if (f != null) {
720           --active;
721           try {
722             return f.get();
723           } catch (ExecutionException eex) {
724             ee = eex;
725           } catch (RuntimeException rex) {
726             ee = new ExecutionException(rex);
727           }
728         }
729       }
730 
731       if (ee == null) {
732         ee = new ExecutionException(null);
733       }
734       throw ee;
735     } finally {
736       for (Future<T> f : futures) {
737         f.cancel(true);
738       }
739     }
740   }
741 
742   /**
743    * Submits the task and adds a listener that adds the future to {@code queue} when it completes.
744    */
submitAndAddQueueListener( ListeningExecutorService executorService, Callable<T> task, final BlockingQueue<Future<T>> queue)745   private static <T> ListenableFuture<T> submitAndAddQueueListener(
746       ListeningExecutorService executorService, Callable<T> task,
747       final BlockingQueue<Future<T>> queue) {
748     final ListenableFuture<T> future = executorService.submit(task);
749     future.addListener(new Runnable() {
750       @Override public void run() {
751         queue.add(future);
752       }
753     }, directExecutor());
754     return future;
755   }
756 
757   /**
758    * Returns a default thread factory used to create new threads.
759    *
760    * <p>On AppEngine, returns {@code ThreadManager.currentRequestThreadFactory()}.
761    * Otherwise, returns {@link Executors#defaultThreadFactory()}.
762    *
763    * @since 14.0
764    */
765   @Beta
platformThreadFactory()766   public static ThreadFactory platformThreadFactory() {
767     if (!isAppEngine()) {
768       return Executors.defaultThreadFactory();
769     }
770     try {
771       return (ThreadFactory) Class.forName("com.google.appengine.api.ThreadManager")
772           .getMethod("currentRequestThreadFactory")
773           .invoke(null);
774     } catch (IllegalAccessException e) {
775       throw new RuntimeException("Couldn't invoke ThreadManager.currentRequestThreadFactory", e);
776     } catch (ClassNotFoundException e) {
777       throw new RuntimeException("Couldn't invoke ThreadManager.currentRequestThreadFactory", e);
778     } catch (NoSuchMethodException e) {
779       throw new RuntimeException("Couldn't invoke ThreadManager.currentRequestThreadFactory", e);
780     } catch (InvocationTargetException e) {
781       throw Throwables.propagate(e.getCause());
782     }
783   }
784 
isAppEngine()785   private static boolean isAppEngine() {
786     if (System.getProperty("com.google.appengine.runtime.environment") == null) {
787       return false;
788     }
789     try {
790       // If the current environment is null, we're not inside AppEngine.
791       return Class.forName("com.google.apphosting.api.ApiProxy")
792           .getMethod("getCurrentEnvironment")
793           .invoke(null) != null;
794     } catch (ClassNotFoundException e) {
795       // If ApiProxy doesn't exist, we're not on AppEngine at all.
796       return false;
797     } catch (InvocationTargetException e) {
798       // If ApiProxy throws an exception, we're not in a proper AppEngine environment.
799       return false;
800     } catch (IllegalAccessException e) {
801       // If the method isn't accessible, we're not on a supported version of AppEngine;
802       return false;
803     } catch (NoSuchMethodException e) {
804       // If the method doesn't exist, we're not on a supported version of AppEngine;
805       return false;
806     }
807   }
808 
809   /**
810    * Creates a thread using {@link #platformThreadFactory}, and sets its name to {@code name}
811    * unless changing the name is forbidden by the security manager.
812    */
newThread(String name, Runnable runnable)813   static Thread newThread(String name, Runnable runnable) {
814     checkNotNull(name);
815     checkNotNull(runnable);
816     Thread result = platformThreadFactory().newThread(runnable);
817     try {
818       result.setName(name);
819     } catch (SecurityException e) {
820       // OK if we can't set the name in this environment.
821     }
822     return result;
823   }
824 
825   // TODO(user): provide overloads for ListeningExecutorService? ListeningScheduledExecutorService?
826   // TODO(user): provide overloads that take constant strings? Function<Runnable, String>s to
827   // calculate names?
828 
829   /**
830    * Creates an {@link Executor} that renames the {@link Thread threads} that its tasks run in.
831    *
832    * <p>The names are retrieved from the {@code nameSupplier} on the thread that is being renamed
833    * right before each task is run.  The renaming is best effort, if a {@link SecurityManager}
834    * prevents the renaming then it will be skipped but the tasks will still execute.
835    *
836    *
837    * @param executor The executor to decorate
838    * @param nameSupplier The source of names for each task
839    */
renamingDecorator(final Executor executor, final Supplier<String> nameSupplier)840   static Executor renamingDecorator(final Executor executor, final Supplier<String> nameSupplier) {
841     checkNotNull(executor);
842     checkNotNull(nameSupplier);
843     if (isAppEngine()) {
844       // AppEngine doesn't support thread renaming, so don't even try
845       return executor;
846     }
847     return new Executor() {
848       @Override public void execute(Runnable command) {
849         executor.execute(Callables.threadRenaming(command, nameSupplier));
850       }
851     };
852   }
853 
854   /**
855    * Creates an {@link ExecutorService} that renames the {@link Thread threads} that its tasks run
856    * in.
857    *
858    * <p>The names are retrieved from the {@code nameSupplier} on the thread that is being renamed
859    * right before each task is run.  The renaming is best effort, if a {@link SecurityManager}
860    * prevents the renaming then it will be skipped but the tasks will still execute.
861    *
862    *
863    * @param service The executor to decorate
864    * @param nameSupplier The source of names for each task
865    */
866   static ExecutorService renamingDecorator(final ExecutorService service,
867       final Supplier<String> nameSupplier) {
868     checkNotNull(service);
869     checkNotNull(nameSupplier);
870     if (isAppEngine()) {
871       // AppEngine doesn't support thread renaming, so don't even try.
872       return service;
873     }
874     return new WrappingExecutorService(service) {
875       @Override protected <T> Callable<T> wrapTask(Callable<T> callable) {
876         return Callables.threadRenaming(callable, nameSupplier);
877       }
878       @Override protected Runnable wrapTask(Runnable command) {
879         return Callables.threadRenaming(command, nameSupplier);
880       }
881     };
882   }
883 
884   /**
885    * Creates a {@link ScheduledExecutorService} that renames the {@link Thread threads} that its
886    * tasks run in.
887    *
888    * <p>The names are retrieved from the {@code nameSupplier} on the thread that is being renamed
889    * right before each task is run.  The renaming is best effort, if a {@link SecurityManager}
890    * prevents the renaming then it will be skipped but the tasks will still execute.
891    *
892    *
893    * @param service The executor to decorate
894    * @param nameSupplier The source of names for each task
895    */
896   static ScheduledExecutorService renamingDecorator(final ScheduledExecutorService service,
897       final Supplier<String> nameSupplier) {
898     checkNotNull(service);
899     checkNotNull(nameSupplier);
900     if (isAppEngine()) {
901       // AppEngine doesn't support thread renaming, so don't even try.
902       return service;
903     }
904     return new WrappingScheduledExecutorService(service) {
905       @Override protected <T> Callable<T> wrapTask(Callable<T> callable) {
906         return Callables.threadRenaming(callable, nameSupplier);
907       }
908       @Override protected Runnable wrapTask(Runnable command) {
909         return Callables.threadRenaming(command, nameSupplier);
910       }
911     };
912   }
913 
914   /**
915    * Shuts down the given executor gradually, first disabling new submissions and later cancelling
916    * existing tasks.
917    *
918    * <p>The method takes the following steps:
919    * <ol>
920    *  <li>calls {@link ExecutorService#shutdown()}, disabling acceptance of new submitted tasks.
921    *  <li>waits for half of the specified timeout.
922    *  <li>if the timeout expires, it calls {@link ExecutorService#shutdownNow()}, cancelling
923    *  pending tasks and interrupting running tasks.
924    *  <li>waits for the other half of the specified timeout.
925    * </ol>
926    *
927    * <p>If, at any step of the process, the given executor is terminated or the calling thread is
928    * interrupted, the method calls {@link ExecutorService#shutdownNow()}, cancelling
929    * pending tasks and interrupting running tasks.
930    *
931    * @param service the {@code ExecutorService} to shut down
932    * @param timeout the maximum time to wait for the {@code ExecutorService} to terminate
933    * @param unit the time unit of the timeout argument
934    * @return {@code true} if the pool was terminated successfully, {@code false} if the
935    *     {@code ExecutorService} could not terminate <b>or</b> the thread running this method
936    *     is interrupted while waiting for the {@code ExecutorService} to terminate
937    * @since 17.0
938    */
939   @Beta
940   public static boolean shutdownAndAwaitTermination(
941       ExecutorService service, long timeout, TimeUnit unit) {
942     checkNotNull(unit);
943     // Disable new tasks from being submitted
944     service.shutdown();
945     try {
946       long halfTimeoutNanos = TimeUnit.NANOSECONDS.convert(timeout, unit) / 2;
947       // Wait for half the duration of the timeout for existing tasks to terminate
948       if (!service.awaitTermination(halfTimeoutNanos, TimeUnit.NANOSECONDS)) {
949         // Cancel currently executing tasks
950         service.shutdownNow();
951         // Wait the other half of the timeout for tasks to respond to being cancelled
952         service.awaitTermination(halfTimeoutNanos, TimeUnit.NANOSECONDS);
953       }
954     } catch (InterruptedException ie) {
955       // Preserve interrupt status
956       Thread.currentThread().interrupt();
957       // (Re-)Cancel if current thread also interrupted
958       service.shutdownNow();
959     }
960     return service.isTerminated();
961   }
962 }
963