• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2007 The Guava Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package com.google.common.util.concurrent;
18 
19 import static com.google.common.base.Preconditions.checkNotNull;
20 
21 import com.google.common.annotations.Beta;
22 
23 import java.util.Collections;
24 import java.util.List;
25 import java.util.concurrent.Callable;
26 import java.util.concurrent.ExecutorService;
27 import java.util.concurrent.Executors;
28 import java.util.concurrent.RejectedExecutionException;
29 import java.util.concurrent.ScheduledExecutorService;
30 import java.util.concurrent.ScheduledFuture;
31 import java.util.concurrent.ScheduledThreadPoolExecutor;
32 import java.util.concurrent.ThreadFactory;
33 import java.util.concurrent.ThreadPoolExecutor;
34 import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
35 import java.util.concurrent.TimeUnit;
36 import java.util.concurrent.locks.Condition;
37 import java.util.concurrent.locks.Lock;
38 import java.util.concurrent.locks.ReentrantLock;
39 
40 /**
41  * Factory and utility methods for {@link java.util.concurrent.Executor}, {@link
42  * ExecutorService}, and {@link ThreadFactory}.
43  *
44  * @author Eric Fellheimer
45  * @author Kyle Littlefield
46  * @author Justin Mahoney
47  * @since 3.0
48  */
49 public final class MoreExecutors {
MoreExecutors()50   private MoreExecutors() {}
51 
52   /**
53    * Converts the given ThreadPoolExecutor into an ExecutorService that exits
54    * when the application is complete.  It does so by using daemon threads and
55    * adding a shutdown hook to wait for their completion.
56    *
57    * <p>This is mainly for fixed thread pools.
58    * See {@link Executors#newFixedThreadPool(int)}.
59    *
60    * @param executor the executor to modify to make sure it exits when the
61    *        application is finished
62    * @param terminationTimeout how long to wait for the executor to
63    *        finish before terminating the JVM
64    * @param timeUnit unit of time for the time parameter
65    * @return an unmodifiable version of the input which will not hang the JVM
66    */
67   @Beta
getExitingExecutorService( ThreadPoolExecutor executor, long terminationTimeout, TimeUnit timeUnit)68   public static ExecutorService getExitingExecutorService(
69       ThreadPoolExecutor executor, long terminationTimeout, TimeUnit timeUnit) {
70     executor.setThreadFactory(new ThreadFactoryBuilder()
71         .setDaemon(true)
72         .setThreadFactory(executor.getThreadFactory())
73         .build());
74 
75     ExecutorService service = Executors.unconfigurableExecutorService(executor);
76 
77     addDelayedShutdownHook(service, terminationTimeout, timeUnit);
78 
79     return service;
80   }
81 
82   /**
83    * Converts the given ScheduledThreadPoolExecutor into a
84    * ScheduledExecutorService that exits when the application is complete.  It
85    * does so by using daemon threads and adding a shutdown hook to wait for
86    * their completion.
87    *
88    * <p>This is mainly for fixed thread pools.
89    * See {@link Executors#newScheduledThreadPool(int)}.
90    *
91    * @param executor the executor to modify to make sure it exits when the
92    *        application is finished
93    * @param terminationTimeout how long to wait for the executor to
94    *        finish before terminating the JVM
95    * @param timeUnit unit of time for the time parameter
96    * @return an unmodifiable version of the input which will not hang the JVM
97    */
98   @Beta
getExitingScheduledExecutorService( ScheduledThreadPoolExecutor executor, long terminationTimeout, TimeUnit timeUnit)99   public static ScheduledExecutorService getExitingScheduledExecutorService(
100       ScheduledThreadPoolExecutor executor, long terminationTimeout,
101       TimeUnit timeUnit) {
102     executor.setThreadFactory(new ThreadFactoryBuilder()
103         .setDaemon(true)
104         .setThreadFactory(executor.getThreadFactory())
105         .build());
106 
107     ScheduledExecutorService service =
108         Executors.unconfigurableScheduledExecutorService(executor);
109 
110     addDelayedShutdownHook(service, terminationTimeout, timeUnit);
111 
112     return service;
113   }
114 
115   /**
116    * Add a shutdown hook to wait for thread completion in the given
117    * {@link ExecutorService service}.  This is useful if the given service uses
118    * daemon threads, and we want to keep the JVM from exiting immediately on
119    * shutdown, instead giving these daemon threads a chance to terminate
120    * normally.
121    * @param service ExecutorService which uses daemon threads
122    * @param terminationTimeout how long to wait for the executor to finish
123    *        before terminating the JVM
124    * @param timeUnit unit of time for the time parameter
125    */
126   @Beta
addDelayedShutdownHook( final ExecutorService service, final long terminationTimeout, final TimeUnit timeUnit)127   public static void addDelayedShutdownHook(
128       final ExecutorService service, final long terminationTimeout,
129       final TimeUnit timeUnit) {
130     Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
131       @Override
132       public void run() {
133         try {
134           // We'd like to log progress and failures that may arise in the
135           // following code, but unfortunately the behavior of logging
136           // is undefined in shutdown hooks.
137           // This is because the logging code installs a shutdown hook of its
138           // own. See Cleaner class inside {@link LogManager}.
139           service.shutdown();
140           service.awaitTermination(terminationTimeout, timeUnit);
141         } catch (InterruptedException ignored) {
142           // We're shutting down anyway, so just ignore.
143         }
144       }
145     }));
146   }
147 
148   /**
149    * Converts the given ThreadPoolExecutor into an ExecutorService that exits
150    * when the application is complete.  It does so by using daemon threads and
151    * adding a shutdown hook to wait for their completion.
152    *
153    * <p>This method waits 120 seconds before continuing with JVM termination,
154    * even if the executor has not finished its work.
155    *
156    * <p>This is mainly for fixed thread pools.
157    * See {@link Executors#newFixedThreadPool(int)}.
158    *
159    * @param executor the executor to modify to make sure it exits when the
160    *        application is finished
161    * @return an unmodifiable version of the input which will not hang the JVM
162    */
163   @Beta
getExitingExecutorService( ThreadPoolExecutor executor)164   public static ExecutorService getExitingExecutorService(
165       ThreadPoolExecutor executor) {
166     return getExitingExecutorService(executor, 120, TimeUnit.SECONDS);
167   }
168 
169   /**
170    * Converts the given ThreadPoolExecutor into a ScheduledExecutorService that
171    * exits when the application is complete.  It does so by using daemon threads
172    * and adding a shutdown hook to wait for their completion.
173    *
174    * <p>This method waits 120 seconds before continuing with JVM termination,
175    * even if the executor has not finished its work.
176    *
177    * <p>This is mainly for fixed thread pools.
178    * See {@link Executors#newScheduledThreadPool(int)}.
179    *
180    * @param executor the executor to modify to make sure it exits when the
181    *        application is finished
182    * @return an unmodifiable version of the input which will not hang the JVM
183    */
184   @Beta
getExitingScheduledExecutorService( ScheduledThreadPoolExecutor executor)185   public static ScheduledExecutorService getExitingScheduledExecutorService(
186       ScheduledThreadPoolExecutor executor) {
187     return getExitingScheduledExecutorService(executor, 120, TimeUnit.SECONDS);
188   }
189 
190   /**
191    * Creates an executor service that runs each task in the thread
192    * that invokes {@code execute/submit}, as in {@link CallerRunsPolicy}  This
193    * applies both to individually submitted tasks and to collections of tasks
194    * submitted via {@code invokeAll} or {@code invokeAny}.  In the latter case,
195    * tasks will run serially on the calling thread.  Tasks are run to
196    * completion before a {@code Future} is returned to the caller (unless the
197    * executor has been shutdown).
198    *
199    * <p>Although all tasks are immediately executed in the thread that
200    * submitted the task, this {@code ExecutorService} imposes a small
201    * locking overhead on each task submission in order to implement shutdown
202    * and termination behavior.
203    *
204    * <p>The implementation deviates from the {@code ExecutorService}
205    * specification with regards to the {@code shutdownNow} method.  First,
206    * "best-effort" with regards to canceling running tasks is implemented
207    * as "no-effort".  No interrupts or other attempts are made to stop
208    * threads executing tasks.  Second, the returned list will always be empty,
209    * as any submitted task is considered to have started execution.
210    * This applies also to tasks given to {@code invokeAll} or {@code invokeAny}
211    * which are pending serial execution, even the subset of the tasks that
212    * have not yet started execution.  It is unclear from the
213    * {@code ExecutorService} specification if these should be included, and
214    * it's much easier to implement the interpretation that they not be.
215    * Finally, a call to {@code shutdown} or {@code shutdownNow} may result
216    * in concurrent calls to {@code invokeAll/invokeAny} throwing
217    * RejectedExecutionException, although a subset of the tasks may already
218    * have been executed.
219    *
220    * @since 10.0 (<a href="http://code.google.com/p/guava-libraries/wiki/Compatibility"
221    *        >mostly source-compatible</a> since 3.0)
222    */
sameThreadExecutor()223   public static ListeningExecutorService sameThreadExecutor() {
224     return new SameThreadExecutorService();
225   }
226 
227   // See sameThreadExecutor javadoc for behavioral notes.
228   private static class SameThreadExecutorService
229       extends AbstractListeningExecutorService {
230     /**
231      * Lock used whenever accessing the state variables
232      * (runningTasks, shutdown, terminationCondition) of the executor
233      */
234     private final Lock lock = new ReentrantLock();
235 
236     /** Signaled after the executor is shutdown and running tasks are done */
237     private final Condition termination = lock.newCondition();
238 
239     /*
240      * Conceptually, these two variables describe the executor being in
241      * one of three states:
242      *   - Active: shutdown == false
243      *   - Shutdown: runningTasks > 0 and shutdown == true
244      *   - Terminated: runningTasks == 0 and shutdown == true
245      */
246     private int runningTasks = 0;
247     private boolean shutdown = false;
248 
249     @Override
execute(Runnable command)250     public void execute(Runnable command) {
251       startTask();
252       try {
253         command.run();
254       } finally {
255         endTask();
256       }
257     }
258 
259     @Override
isShutdown()260     public boolean isShutdown() {
261       lock.lock();
262       try {
263         return shutdown;
264       } finally {
265         lock.unlock();
266       }
267     }
268 
269     @Override
shutdown()270     public void shutdown() {
271       lock.lock();
272       try {
273         shutdown = true;
274       } finally {
275         lock.unlock();
276       }
277     }
278 
279     // See sameThreadExecutor javadoc for unusual behavior of this method.
280     @Override
shutdownNow()281     public List<Runnable> shutdownNow() {
282       shutdown();
283       return Collections.emptyList();
284     }
285 
286     @Override
isTerminated()287     public boolean isTerminated() {
288       lock.lock();
289       try {
290         return shutdown && runningTasks == 0;
291       } finally {
292         lock.unlock();
293       }
294     }
295 
296     @Override
awaitTermination(long timeout, TimeUnit unit)297     public boolean awaitTermination(long timeout, TimeUnit unit)
298         throws InterruptedException {
299       long nanos = unit.toNanos(timeout);
300       lock.lock();
301       try {
302         for (;;) {
303           if (isTerminated()) {
304             return true;
305           } else if (nanos <= 0) {
306             return false;
307           } else {
308             nanos = termination.awaitNanos(nanos);
309           }
310         }
311       } finally {
312         lock.unlock();
313       }
314     }
315 
316     /**
317      * Checks if the executor has been shut down and increments the running
318      * task count.
319      *
320      * @throws RejectedExecutionException if the executor has been previously
321      *         shutdown
322      */
startTask()323     private void startTask() {
324       lock.lock();
325       try {
326         if (isShutdown()) {
327           throw new RejectedExecutionException("Executor already shutdown");
328         }
329         runningTasks++;
330       } finally {
331         lock.unlock();
332       }
333     }
334 
335     /**
336      * Decrements the running task count.
337      */
endTask()338     private void endTask() {
339       lock.lock();
340       try {
341         runningTasks--;
342         if (isTerminated()) {
343           termination.signalAll();
344         }
345       } finally {
346         lock.unlock();
347       }
348     }
349   }
350 
351   /**
352    * Creates an {@link ExecutorService} whose {@code submit} and {@code
353    * invokeAll} methods submit {@link ListenableFutureTask} instances to the
354    * given delegate executor. Those methods, as well as {@code execute} and
355    * {@code invokeAny}, are implemented in terms of calls to {@code
356    * delegate.execute}. All other methods are forwarded unchanged to the
357    * delegate. This implies that the returned {@code ListeningExecutorService}
358    * never calls the delegate's {@code submit}, {@code invokeAll}, and {@code
359    * invokeAny} methods, so any special handling of tasks must be implemented in
360    * the delegate's {@code execute} method or by wrapping the returned {@code
361    * ListeningExecutorService}.
362    *
363    * <p>If the delegate executor was already an instance of {@code
364    * ListeningExecutorService}, it is returned untouched, and the rest of this
365    * documentation does not apply.
366    *
367    * @since 10.0
368    */
listeningDecorator( ExecutorService delegate)369   public static ListeningExecutorService listeningDecorator(
370       ExecutorService delegate) {
371     return (delegate instanceof ListeningExecutorService)
372         ? (ListeningExecutorService) delegate
373         : (delegate instanceof ScheduledExecutorService)
374         ? new ScheduledListeningDecorator((ScheduledExecutorService) delegate)
375         : new ListeningDecorator(delegate);
376   }
377 
378   /**
379    * Creates a {@link ScheduledExecutorService} whose {@code submit} and {@code
380    * invokeAll} methods submit {@link ListenableFutureTask} instances to the
381    * given delegate executor. Those methods, as well as {@code execute} and
382    * {@code invokeAny}, are implemented in terms of calls to {@code
383    * delegate.execute}. All other methods are forwarded unchanged to the
384    * delegate. This implies that the returned {@code
385    * SchedulingListeningExecutorService} never calls the delegate's {@code
386    * submit}, {@code invokeAll}, and {@code invokeAny} methods, so any special
387    * handling of tasks must be implemented in the delegate's {@code execute}
388    * method or by wrapping the returned {@code
389    * SchedulingListeningExecutorService}.
390    *
391    * <p>If the delegate executor was already an instance of {@code
392    * ListeningScheduledExecutorService}, it is returned untouched, and the rest
393    * of this documentation does not apply.
394    *
395    * @since 10.0
396    */
listeningDecorator( ScheduledExecutorService delegate)397   public static ListeningScheduledExecutorService listeningDecorator(
398       ScheduledExecutorService delegate) {
399     return (delegate instanceof ListeningScheduledExecutorService)
400         ? (ListeningScheduledExecutorService) delegate
401         : new ScheduledListeningDecorator(delegate);
402   }
403 
404   private static class ListeningDecorator
405       extends AbstractListeningExecutorService {
406     final ExecutorService delegate;
407 
ListeningDecorator(ExecutorService delegate)408     ListeningDecorator(ExecutorService delegate) {
409       this.delegate = checkNotNull(delegate);
410     }
411 
412     @Override
awaitTermination(long timeout, TimeUnit unit)413     public boolean awaitTermination(long timeout, TimeUnit unit)
414         throws InterruptedException {
415       return delegate.awaitTermination(timeout, unit);
416     }
417 
418     @Override
isShutdown()419     public boolean isShutdown() {
420       return delegate.isShutdown();
421     }
422 
423     @Override
isTerminated()424     public boolean isTerminated() {
425       return delegate.isTerminated();
426     }
427 
428     @Override
shutdown()429     public void shutdown() {
430       delegate.shutdown();
431     }
432 
433     @Override
shutdownNow()434     public List<Runnable> shutdownNow() {
435       return delegate.shutdownNow();
436     }
437 
438     @Override
execute(Runnable command)439     public void execute(Runnable command) {
440       delegate.execute(command);
441     }
442   }
443 
444   private static class ScheduledListeningDecorator
445       extends ListeningDecorator implements ListeningScheduledExecutorService {
446     final ScheduledExecutorService delegate;
447 
ScheduledListeningDecorator(ScheduledExecutorService delegate)448     ScheduledListeningDecorator(ScheduledExecutorService delegate) {
449       super(delegate);
450       this.delegate = checkNotNull(delegate);
451     }
452 
453     @Override
schedule( Runnable command, long delay, TimeUnit unit)454     public ScheduledFuture<?> schedule(
455         Runnable command, long delay, TimeUnit unit) {
456       return delegate.schedule(command, delay, unit);
457     }
458 
459     @Override
schedule( Callable<V> callable, long delay, TimeUnit unit)460     public <V> ScheduledFuture<V> schedule(
461         Callable<V> callable, long delay, TimeUnit unit) {
462       return delegate.schedule(callable, delay, unit);
463     }
464 
465     @Override
scheduleAtFixedRate( Runnable command, long initialDelay, long period, TimeUnit unit)466     public ScheduledFuture<?> scheduleAtFixedRate(
467         Runnable command, long initialDelay, long period, TimeUnit unit) {
468       return delegate.scheduleAtFixedRate(command, initialDelay, period, unit);
469     }
470 
471     @Override
scheduleWithFixedDelay( Runnable command, long initialDelay, long delay, TimeUnit unit)472     public ScheduledFuture<?> scheduleWithFixedDelay(
473         Runnable command, long initialDelay, long delay, TimeUnit unit) {
474       return delegate.scheduleWithFixedDelay(
475           command, initialDelay, delay, unit);
476     }
477   }
478 }
479