• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Written by Doug Lea with assistance from members of JCP JSR-166
3  * Expert Group and released to the public domain, as explained at
4  * http://creativecommons.org/licenses/publicdomain
5  */
6 
7 package java.util.concurrent;
8 import java.util.*;
9 
10 /**
11  * Provides default implementations of {@link ExecutorService}
12  * execution methods. This class implements the <tt>submit</tt>,
13  * <tt>invokeAny</tt> and <tt>invokeAll</tt> methods using a
14  * {@link RunnableFuture} returned by <tt>newTaskFor</tt>, which defaults
15  * to the {@link FutureTask} class provided in this package.  For example,
16  * the implementation of <tt>submit(Runnable)</tt> creates an
17  * associated <tt>RunnableFuture</tt> that is executed and
18  * returned. Subclasses may override the <tt>newTaskFor</tt> methods
19  * to return <tt>RunnableFuture</tt> implementations other than
20  * <tt>FutureTask</tt>.
21  *
22  * <p> <b>Extension example</b>. Here is a sketch of a class
23  * that customizes {@link ThreadPoolExecutor} to use
24  * a <tt>CustomTask</tt> class instead of the default <tt>FutureTask</tt>:
25  *  <pre> {@code
26  * public class CustomThreadPoolExecutor extends ThreadPoolExecutor {
27  *
28  *   static class CustomTask<V> implements RunnableFuture<V> {...}
29  *
30  *   protected <V> RunnableFuture<V> newTaskFor(Callable<V> c) {
31  *       return new CustomTask<V>(c);
32  *   }
33  *   protected <V> RunnableFuture<V> newTaskFor(Runnable r, V v) {
34  *       return new CustomTask<V>(r, v);
35  *   }
36  *   // ... add constructors, etc.
37  * }}</pre>
38  *
39  * @since 1.5
40  * @author Doug Lea
41  */
42 public abstract class AbstractExecutorService implements ExecutorService {
43 
44     /**
45      * Returns a <tt>RunnableFuture</tt> for the given runnable and default
46      * value.
47      *
48      * @param runnable the runnable task being wrapped
49      * @param value the default value for the returned future
50      * @return a <tt>RunnableFuture</tt> which when run will run the
51      * underlying runnable and which, as a <tt>Future</tt>, will yield
52      * the given value as its result and provide for cancellation of
53      * the underlying task.
54      * @since 1.6
55      */
newTaskFor(Runnable runnable, T value)56     protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
57         return new FutureTask<T>(runnable, value);
58     }
59 
60     /**
61      * Returns a <tt>RunnableFuture</tt> for the given callable task.
62      *
63      * @param callable the callable task being wrapped
64      * @return a <tt>RunnableFuture</tt> which when run will call the
65      * underlying callable and which, as a <tt>Future</tt>, will yield
66      * the callable's result as its result and provide for
67      * cancellation of the underlying task.
68      * @since 1.6
69      */
newTaskFor(Callable<T> callable)70     protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
71         return new FutureTask<T>(callable);
72     }
73 
74     /**
75      * @throws RejectedExecutionException {@inheritDoc}
76      * @throws NullPointerException       {@inheritDoc}
77      */
submit(Runnable task)78     public Future<?> submit(Runnable task) {
79         if (task == null) throw new NullPointerException();
80         RunnableFuture<Void> ftask = newTaskFor(task, null);
81         execute(ftask);
82         return ftask;
83     }
84 
85     /**
86      * @throws RejectedExecutionException {@inheritDoc}
87      * @throws NullPointerException       {@inheritDoc}
88      */
submit(Runnable task, T result)89     public <T> Future<T> submit(Runnable task, T result) {
90         if (task == null) throw new NullPointerException();
91         RunnableFuture<T> ftask = newTaskFor(task, result);
92         execute(ftask);
93         return ftask;
94     }
95 
96     /**
97      * @throws RejectedExecutionException {@inheritDoc}
98      * @throws NullPointerException       {@inheritDoc}
99      */
submit(Callable<T> task)100     public <T> Future<T> submit(Callable<T> task) {
101         if (task == null) throw new NullPointerException();
102         RunnableFuture<T> ftask = newTaskFor(task);
103         execute(ftask);
104         return ftask;
105     }
106 
107     /**
108      * the main mechanics of invokeAny.
109      */
doInvokeAny(Collection<? extends Callable<T>> tasks, boolean timed, long nanos)110     private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
111                             boolean timed, long nanos)
112         throws InterruptedException, ExecutionException, TimeoutException {
113         if (tasks == null)
114             throw new NullPointerException();
115         int ntasks = tasks.size();
116         if (ntasks == 0)
117             throw new IllegalArgumentException();
118         List<Future<T>> futures= new ArrayList<Future<T>>(ntasks);
119         ExecutorCompletionService<T> ecs =
120             new ExecutorCompletionService<T>(this);
121 
122         // For efficiency, especially in executors with limited
123         // parallelism, check to see if previously submitted tasks are
124         // done before submitting more of them. This interleaving
125         // plus the exception mechanics account for messiness of main
126         // loop.
127 
128         try {
129             // Record exceptions so that if we fail to obtain any
130             // result, we can throw the last exception we got.
131             ExecutionException ee = null;
132             long lastTime = timed ? System.nanoTime() : 0;
133             Iterator<? extends Callable<T>> it = tasks.iterator();
134 
135             // Start one task for sure; the rest incrementally
136             futures.add(ecs.submit(it.next()));
137             --ntasks;
138             int active = 1;
139 
140             for (;;) {
141                 Future<T> f = ecs.poll();
142                 if (f == null) {
143                     if (ntasks > 0) {
144                         --ntasks;
145                         futures.add(ecs.submit(it.next()));
146                         ++active;
147                     }
148                     else if (active == 0)
149                         break;
150                     else if (timed) {
151                         f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
152                         if (f == null)
153                             throw new TimeoutException();
154                         long now = System.nanoTime();
155                         nanos -= now - lastTime;
156                         lastTime = now;
157                     }
158                     else
159                         f = ecs.take();
160                 }
161                 if (f != null) {
162                     --active;
163                     try {
164                         return f.get();
165                     } catch (ExecutionException eex) {
166                         ee = eex;
167                     } catch (RuntimeException rex) {
168                         ee = new ExecutionException(rex);
169                     }
170                 }
171             }
172 
173             if (ee == null)
174                 ee = new ExecutionException();
175             throw ee;
176 
177         } finally {
178             for (Future<T> f : futures)
179                 f.cancel(true);
180         }
181     }
182 
invokeAny(Collection<? extends Callable<T>> tasks)183     public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
184         throws InterruptedException, ExecutionException {
185         try {
186             return doInvokeAny(tasks, false, 0);
187         } catch (TimeoutException cannotHappen) {
188             assert false;
189             return null;
190         }
191     }
192 
invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)193     public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
194                            long timeout, TimeUnit unit)
195         throws InterruptedException, ExecutionException, TimeoutException {
196         return doInvokeAny(tasks, true, unit.toNanos(timeout));
197     }
198 
invokeAll(Collection<? extends Callable<T>> tasks)199     public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
200         throws InterruptedException {
201         if (tasks == null)
202             throw new NullPointerException();
203         List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
204         boolean done = false;
205         try {
206             for (Callable<T> t : tasks) {
207                 RunnableFuture<T> f = newTaskFor(t);
208                 futures.add(f);
209                 execute(f);
210             }
211             for (Future<T> f : futures) {
212                 if (!f.isDone()) {
213                     try {
214                         f.get();
215                     } catch (CancellationException ignore) {
216                     } catch (ExecutionException ignore) {
217                     }
218                 }
219             }
220             done = true;
221             return futures;
222         } finally {
223             if (!done)
224                 for (Future<T> f : futures)
225                     f.cancel(true);
226         }
227     }
228 
invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)229     public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
230                                          long timeout, TimeUnit unit)
231         throws InterruptedException {
232         if (tasks == null || unit == null)
233             throw new NullPointerException();
234         long nanos = unit.toNanos(timeout);
235         List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
236         boolean done = false;
237         try {
238             for (Callable<T> t : tasks)
239                 futures.add(newTaskFor(t));
240 
241             long lastTime = System.nanoTime();
242 
243             // Interleave time checks and calls to execute in case
244             // executor doesn't have any/much parallelism.
245             Iterator<Future<T>> it = futures.iterator();
246             while (it.hasNext()) {
247                 execute((Runnable)(it.next()));
248                 long now = System.nanoTime();
249                 nanos -= now - lastTime;
250                 lastTime = now;
251                 if (nanos <= 0)
252                     return futures;
253             }
254 
255             for (Future<T> f : futures) {
256                 if (!f.isDone()) {
257                     if (nanos <= 0)
258                         return futures;
259                     try {
260                         f.get(nanos, TimeUnit.NANOSECONDS);
261                     } catch (CancellationException ignore) {
262                     } catch (ExecutionException ignore) {
263                     } catch (TimeoutException toe) {
264                         return futures;
265                     }
266                     long now = System.nanoTime();
267                     nanos -= now - lastTime;
268                     lastTime = now;
269                 }
270             }
271             done = true;
272             return futures;
273         } finally {
274             if (!done)
275                 for (Future<T> f : futures)
276                     f.cancel(true);
277         }
278     }
279 
280 }
281