• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * This file is a modified version of
3  * http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/main/java/util/concurrent/AbstractExecutorService.java?revision=1.20
4  * which contained the following notice:
5  *
6  * Written by Doug Lea with assistance from members of JCP JSR-166
7  * Expert Group and released to the public domain, as explained at
8  * http://creativecommons.org/licenses/publicdomain
9  */
10 
11 package java.util.concurrent;
12 
13 import java.util.*;
14 
15 public abstract class AbstractExecutorService implements ExecutorService {
16 
submit(Runnable task)17     public Future<?> submit(Runnable task) {
18         if (task == null) throw new NullPointerException();
19         FutureTask<Object> ftask = new FutureTask<Object>(task, null);
20         execute(ftask);
21         return ftask;
22     }
23 
submit(Runnable task, T result)24     public <T> Future<T> submit(Runnable task, T result) {
25         if (task == null) throw new NullPointerException();
26         FutureTask<T> ftask = new FutureTask<T>(task, result);
27         execute(ftask);
28         return ftask;
29     }
30 
submit(Callable<T> task)31     public <T> Future<T> submit(Callable<T> task) {
32         if (task == null) throw new NullPointerException();
33         FutureTask<T> ftask = new FutureTask<T>(task);
34         execute(ftask);
35         return ftask;
36     }
37 
doInvokeAny(Collection<? extends Callable<T>> tasks, boolean timed, long nanos)38     private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
39                             boolean timed, long nanos)
40         throws InterruptedException, ExecutionException, TimeoutException {
41         if (tasks == null)
42             throw new NullPointerException();
43         int ntasks = tasks.size();
44         if (ntasks == 0)
45             throw new IllegalArgumentException();
46         List<Future<T>> futures= new ArrayList<Future<T>>(ntasks);
47         ExecutorCompletionService<T> ecs =
48             new ExecutorCompletionService<T>(this);
49 
50         try {
51             ExecutionException ee = null;
52             long lastTime = (timed)? System.nanoTime() : 0;
53             Iterator<? extends Callable<T>> it = tasks.iterator();
54 
55             futures.add(ecs.submit(it.next()));
56             --ntasks;
57             int active = 1;
58 
59             for (;;) {
60                 Future<T> f = ecs.poll();
61                 if (f == null) {
62                     if (ntasks > 0) {
63                         --ntasks;
64                         futures.add(ecs.submit(it.next()));
65                         ++active;
66                     }
67                     else if (active == 0)
68                         break;
69                     else if (timed) {
70                         f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
71                         if (f == null)
72                             throw new TimeoutException();
73                         long now = System.nanoTime();
74                         nanos -= now - lastTime;
75                         lastTime = now;
76                     }
77                     else
78                         f = ecs.take();
79                 }
80                 if (f != null) {
81                     --active;
82                     try {
83                         return f.get();
84                     } catch (InterruptedException ie) {
85                         throw ie;
86                     } catch (ExecutionException eex) {
87                         ee = eex;
88                     } catch (RuntimeException rex) {
89                         ee = new ExecutionException(rex);
90                     }
91                 }
92             }
93 
94             if (ee == null)
95                 ee = new ExecutionException();
96             throw ee;
97 
98         } finally {
99             for (Future<T> f : futures)
100                 f.cancel(true);
101         }
102     }
103 
invokeAny(Collection<? extends Callable<T>> tasks)104     public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
105         throws InterruptedException, ExecutionException {
106         try {
107             return doInvokeAny(tasks, false, 0);
108         } catch (TimeoutException cannotHappen) {
109             assert false;
110             return null;
111         }
112     }
113 
invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)114     public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
115                            long timeout, TimeUnit unit)
116         throws InterruptedException, ExecutionException, TimeoutException {
117         return doInvokeAny(tasks, true, unit.toNanos(timeout));
118     }
119 
invokeAll(Collection<? extends Callable<T>> tasks)120     public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
121         throws InterruptedException {
122         if (tasks == null)
123             throw new NullPointerException();
124         List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
125         boolean done = false;
126         try {
127             for (Callable<T> t : tasks) {
128                 FutureTask<T> f = new FutureTask<T>(t);
129                 futures.add(f);
130                 execute(f);
131             }
132             for (Future<T> f : futures) {
133                 if (!f.isDone()) {
134                     try {
135                         f.get();
136                     } catch (CancellationException ignore) {
137                     } catch (ExecutionException ignore) {
138                     }
139                 }
140             }
141             done = true;
142             return futures;
143         } finally {
144             if (!done)
145                 for (Future<T> f : futures)
146                     f.cancel(true);
147         }
148     }
149 
invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)150     public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
151                                          long timeout, TimeUnit unit)
152         throws InterruptedException {
153         if (tasks == null || unit == null)
154             throw new NullPointerException();
155         long nanos = unit.toNanos(timeout);
156         List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
157         boolean done = false;
158         try {
159             for (Callable<T> t : tasks)
160                 futures.add(new FutureTask<T>(t));
161 
162             long lastTime = System.nanoTime();
163 
164             Iterator<Future<T>> it = futures.iterator();
165             while (it.hasNext()) {
166                 execute((Runnable)(it.next()));
167                 long now = System.nanoTime();
168                 nanos -= now - lastTime;
169                 lastTime = now;
170                 if (nanos <= 0)
171                     return futures;
172             }
173 
174             for (Future<T> f : futures) {
175                 if (!f.isDone()) {
176                     if (nanos <= 0)
177                         return futures;
178                     try {
179                         f.get(nanos, TimeUnit.NANOSECONDS);
180                     } catch (CancellationException ignore) {
181                     } catch (ExecutionException ignore) {
182                     } catch (TimeoutException toe) {
183                         return futures;
184                     }
185                     long now = System.nanoTime();
186                     nanos -= now - lastTime;
187                     lastTime = now;
188                 }
189             }
190             done = true;
191             return futures;
192         } finally {
193             if (!done)
194                 for (Future<T> f : futures)
195                     f.cancel(true);
196         }
197     }
198 
199 }
200