• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Written by Doug Lea with assistance from members of JCP JSR-166
3  * Expert Group and released to the public domain, as explained at
4  * http://creativecommons.org/publicdomain/zero/1.0/
5  */
6 
7 package java.util.concurrent;
8 
9 import java.util.*;
10 
11 /**
12  * Provides default implementations of {@link ExecutorService}
13  * execution methods. This class implements the {@code submit},
14  * {@code invokeAny} and {@code invokeAll} methods using a
15  * {@link RunnableFuture} returned by {@code newTaskFor}, which defaults
16  * to the {@link FutureTask} class provided in this package.  For example,
17  * the implementation of {@code submit(Runnable)} creates an
18  * associated {@code RunnableFuture} that is executed and
19  * returned. Subclasses may override the {@code newTaskFor} methods
20  * to return {@code RunnableFuture} implementations other than
21  * {@code FutureTask}.
22  *
23  * <p><b>Extension example</b>. Here is a sketch of a class
24  * that customizes {@link ThreadPoolExecutor} to use
25  * a {@code CustomTask} class instead of the default {@code FutureTask}:
26  *  <pre> {@code
27  * public class CustomThreadPoolExecutor extends ThreadPoolExecutor {
28  *
29  *   static class CustomTask<V> implements RunnableFuture<V> {...}
30  *
31  *   protected <V> RunnableFuture<V> newTaskFor(Callable<V> c) {
32  *       return new CustomTask<V>(c);
33  *   }
34  *   protected <V> RunnableFuture<V> newTaskFor(Runnable r, V v) {
35  *       return new CustomTask<V>(r, v);
36  *   }
37  *   // ... add constructors, etc.
38  * }}</pre>
39  *
40  * @since 1.5
41  * @author Doug Lea
42  */
43 public abstract class AbstractExecutorService implements ExecutorService {
44 
45     /**
46      * Returns a {@code RunnableFuture} for the given runnable and default
47      * value.
48      *
49      * @param runnable the runnable task being wrapped
50      * @param value the default value for the returned future
51      * @return a {@code RunnableFuture} which, when run, will run the
52      * underlying runnable and which, as a {@code Future}, will yield
53      * the given value as its result and provide for cancellation of
54      * the underlying task
55      * @since 1.6
56      */
newTaskFor(Runnable runnable, T value)57     protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
58         return new FutureTask<T>(runnable, value);
59     }
60 
61     /**
62      * Returns a {@code RunnableFuture} for the given callable task.
63      *
64      * @param callable the callable task being wrapped
65      * @return a {@code RunnableFuture} which, when run, will call the
66      * underlying callable and which, as a {@code Future}, will yield
67      * the callable's result as its result and provide for
68      * cancellation of the underlying task
69      * @since 1.6
70      */
newTaskFor(Callable<T> callable)71     protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
72         return new FutureTask<T>(callable);
73     }
74 
75     /**
76      * @throws RejectedExecutionException {@inheritDoc}
77      * @throws NullPointerException       {@inheritDoc}
78      */
submit(Runnable task)79     public Future<?> submit(Runnable task) {
80         if (task == null) throw new NullPointerException();
81         RunnableFuture<Void> ftask = newTaskFor(task, null);
82         execute(ftask);
83         return ftask;
84     }
85 
86     /**
87      * @throws RejectedExecutionException {@inheritDoc}
88      * @throws NullPointerException       {@inheritDoc}
89      */
submit(Runnable task, T result)90     public <T> Future<T> submit(Runnable task, T result) {
91         if (task == null) throw new NullPointerException();
92         RunnableFuture<T> ftask = newTaskFor(task, result);
93         execute(ftask);
94         return ftask;
95     }
96 
97     /**
98      * @throws RejectedExecutionException {@inheritDoc}
99      * @throws NullPointerException       {@inheritDoc}
100      */
submit(Callable<T> task)101     public <T> Future<T> submit(Callable<T> task) {
102         if (task == null) throw new NullPointerException();
103         RunnableFuture<T> ftask = newTaskFor(task);
104         execute(ftask);
105         return ftask;
106     }
107 
108     /**
109      * the main mechanics of invokeAny.
110      */
doInvokeAny(Collection<? extends Callable<T>> tasks, boolean timed, long nanos)111     private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
112                               boolean timed, long nanos)
113         throws InterruptedException, ExecutionException, TimeoutException {
114         if (tasks == null)
115             throw new NullPointerException();
116         int ntasks = tasks.size();
117         if (ntasks == 0)
118             throw new IllegalArgumentException();
119         ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
120         ExecutorCompletionService<T> ecs =
121             new ExecutorCompletionService<T>(this);
122 
123         // For efficiency, especially in executors with limited
124         // parallelism, check to see if previously submitted tasks are
125         // done before submitting more of them. This interleaving
126         // plus the exception mechanics account for messiness of main
127         // loop.
128 
129         try {
130             // Record exceptions so that if we fail to obtain any
131             // result, we can throw the last exception we got.
132             ExecutionException ee = null;
133             final long deadline = timed ? System.nanoTime() + nanos : 0L;
134             Iterator<? extends Callable<T>> it = tasks.iterator();
135 
136             // Start one task for sure; the rest incrementally
137             futures.add(ecs.submit(it.next()));
138             --ntasks;
139             int active = 1;
140 
141             for (;;) {
142                 Future<T> f = ecs.poll();
143                 if (f == null) {
144                     if (ntasks > 0) {
145                         --ntasks;
146                         futures.add(ecs.submit(it.next()));
147                         ++active;
148                     }
149                     else if (active == 0)
150                         break;
151                     else if (timed) {
152                         f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
153                         if (f == null)
154                             throw new TimeoutException();
155                         nanos = deadline - System.nanoTime();
156                     }
157                     else
158                         f = ecs.take();
159                 }
160                 if (f != null) {
161                     --active;
162                     try {
163                         return f.get();
164                     } catch (ExecutionException eex) {
165                         ee = eex;
166                     } catch (RuntimeException rex) {
167                         ee = new ExecutionException(rex);
168                     }
169                 }
170             }
171 
172             if (ee == null)
173                 ee = new ExecutionException();
174             throw ee;
175 
176         } finally {
177             for (int i = 0, size = futures.size(); i < size; i++)
178                 futures.get(i).cancel(true);
179         }
180     }
181 
invokeAny(Collection<? extends Callable<T>> tasks)182     public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
183         throws InterruptedException, ExecutionException {
184         try {
185             return doInvokeAny(tasks, false, 0);
186         } catch (TimeoutException cannotHappen) {
187             assert false;
188             return null;
189         }
190     }
191 
invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)192     public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
193                            long timeout, TimeUnit unit)
194         throws InterruptedException, ExecutionException, TimeoutException {
195         return doInvokeAny(tasks, true, unit.toNanos(timeout));
196     }
197 
invokeAll(Collection<? extends Callable<T>> tasks)198     public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
199         throws InterruptedException {
200         if (tasks == null)
201             throw new NullPointerException();
202         ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
203         boolean done = false;
204         try {
205             for (Callable<T> t : tasks) {
206                 RunnableFuture<T> f = newTaskFor(t);
207                 futures.add(f);
208                 execute(f);
209             }
210             for (int i = 0, size = futures.size(); i < size; i++) {
211                 Future<T> f = futures.get(i);
212                 if (!f.isDone()) {
213                     try {
214                         f.get();
215                     } catch (CancellationException ignore) {
216                     } catch (ExecutionException ignore) {
217                     }
218                 }
219             }
220             done = true;
221             return futures;
222         } finally {
223             if (!done)
224                 for (int i = 0, size = futures.size(); i < size; i++)
225                     futures.get(i).cancel(true);
226         }
227     }
228 
invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)229     public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
230                                          long timeout, TimeUnit unit)
231         throws InterruptedException {
232         if (tasks == null)
233             throw new NullPointerException();
234         long nanos = unit.toNanos(timeout);
235         ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
236         boolean done = false;
237         try {
238             for (Callable<T> t : tasks)
239                 futures.add(newTaskFor(t));
240 
241             final long deadline = System.nanoTime() + nanos;
242             final int size = futures.size();
243 
244             // Interleave time checks and calls to execute in case
245             // executor doesn't have any/much parallelism.
246             for (int i = 0; i < size; i++) {
247                 execute((Runnable)futures.get(i));
248                 nanos = deadline - System.nanoTime();
249                 if (nanos <= 0L)
250                     return futures;
251             }
252 
253             for (int i = 0; i < size; i++) {
254                 Future<T> f = futures.get(i);
255                 if (!f.isDone()) {
256                     if (nanos <= 0L)
257                         return futures;
258                     try {
259                         f.get(nanos, TimeUnit.NANOSECONDS);
260                     } catch (CancellationException ignore) {
261                     } catch (ExecutionException ignore) {
262                     } catch (TimeoutException toe) {
263                         return futures;
264                     }
265                     nanos = deadline - System.nanoTime();
266                 }
267             }
268             done = true;
269             return futures;
270         } finally {
271             if (!done)
272                 for (int i = 0, size = futures.size(); i < size; i++)
273                     futures.get(i).cancel(true);
274         }
275     }
276 
277 }
278