1 /* 2 * Written by Doug Lea with assistance from members of JCP JSR-166 3 * Expert Group and released to the public domain, as explained at 4 * http://creativecommons.org/publicdomain/zero/1.0/ 5 */ 6 7 package java.util.concurrent; 8 9 import java.util.*; 10 11 /** 12 * Provides default implementations of {@link ExecutorService} 13 * execution methods. This class implements the {@code submit}, 14 * {@code invokeAny} and {@code invokeAll} methods using a 15 * {@link RunnableFuture} returned by {@code newTaskFor}, which defaults 16 * to the {@link FutureTask} class provided in this package. For example, 17 * the implementation of {@code submit(Runnable)} creates an 18 * associated {@code RunnableFuture} that is executed and 19 * returned. Subclasses may override the {@code newTaskFor} methods 20 * to return {@code RunnableFuture} implementations other than 21 * {@code FutureTask}. 22 * 23 * <p><b>Extension example</b>. Here is a sketch of a class 24 * that customizes {@link ThreadPoolExecutor} to use 25 * a {@code CustomTask} class instead of the default {@code FutureTask}: 26 * <pre> {@code 27 * public class CustomThreadPoolExecutor extends ThreadPoolExecutor { 28 * 29 * static class CustomTask<V> implements RunnableFuture<V> {...} 30 * 31 * protected <V> RunnableFuture<V> newTaskFor(Callable<V> c) { 32 * return new CustomTask<V>(c); 33 * } 34 * protected <V> RunnableFuture<V> newTaskFor(Runnable r, V v) { 35 * return new CustomTask<V>(r, v); 36 * } 37 * // ... add constructors, etc. 38 * }}</pre> 39 * 40 * @since 1.5 41 * @author Doug Lea 42 */ 43 public abstract class AbstractExecutorService implements ExecutorService { 44 45 /** 46 * Returns a {@code RunnableFuture} for the given runnable and default 47 * value. 48 * 49 * @param runnable the runnable task being wrapped 50 * @param value the default value for the returned future 51 * @return a {@code RunnableFuture} which, when run, will run the 52 * underlying runnable and which, as a {@code Future}, will yield 53 * the given value as its result and provide for cancellation of 54 * the underlying task 55 * @since 1.6 56 */ newTaskFor(Runnable runnable, T value)57 protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { 58 return new FutureTask<T>(runnable, value); 59 } 60 61 /** 62 * Returns a {@code RunnableFuture} for the given callable task. 63 * 64 * @param callable the callable task being wrapped 65 * @return a {@code RunnableFuture} which, when run, will call the 66 * underlying callable and which, as a {@code Future}, will yield 67 * the callable's result as its result and provide for 68 * cancellation of the underlying task 69 * @since 1.6 70 */ newTaskFor(Callable<T> callable)71 protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { 72 return new FutureTask<T>(callable); 73 } 74 75 /** 76 * @throws RejectedExecutionException {@inheritDoc} 77 * @throws NullPointerException {@inheritDoc} 78 */ submit(Runnable task)79 public Future<?> submit(Runnable task) { 80 if (task == null) throw new NullPointerException(); 81 RunnableFuture<Void> ftask = newTaskFor(task, null); 82 execute(ftask); 83 return ftask; 84 } 85 86 /** 87 * @throws RejectedExecutionException {@inheritDoc} 88 * @throws NullPointerException {@inheritDoc} 89 */ submit(Runnable task, T result)90 public <T> Future<T> submit(Runnable task, T result) { 91 if (task == null) throw new NullPointerException(); 92 RunnableFuture<T> ftask = newTaskFor(task, result); 93 execute(ftask); 94 return ftask; 95 } 96 97 /** 98 * @throws RejectedExecutionException {@inheritDoc} 99 * @throws NullPointerException {@inheritDoc} 100 */ submit(Callable<T> task)101 public <T> Future<T> submit(Callable<T> task) { 102 if (task == null) throw new NullPointerException(); 103 RunnableFuture<T> ftask = newTaskFor(task); 104 execute(ftask); 105 return ftask; 106 } 107 108 /** 109 * the main mechanics of invokeAny. 110 */ doInvokeAny(Collection<? extends Callable<T>> tasks, boolean timed, long nanos)111 private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks, 112 boolean timed, long nanos) 113 throws InterruptedException, ExecutionException, TimeoutException { 114 if (tasks == null) 115 throw new NullPointerException(); 116 int ntasks = tasks.size(); 117 if (ntasks == 0) 118 throw new IllegalArgumentException(); 119 ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks); 120 ExecutorCompletionService<T> ecs = 121 new ExecutorCompletionService<T>(this); 122 123 // For efficiency, especially in executors with limited 124 // parallelism, check to see if previously submitted tasks are 125 // done before submitting more of them. This interleaving 126 // plus the exception mechanics account for messiness of main 127 // loop. 128 129 try { 130 // Record exceptions so that if we fail to obtain any 131 // result, we can throw the last exception we got. 132 ExecutionException ee = null; 133 final long deadline = timed ? System.nanoTime() + nanos : 0L; 134 Iterator<? extends Callable<T>> it = tasks.iterator(); 135 136 // Start one task for sure; the rest incrementally 137 futures.add(ecs.submit(it.next())); 138 --ntasks; 139 int active = 1; 140 141 for (;;) { 142 Future<T> f = ecs.poll(); 143 if (f == null) { 144 if (ntasks > 0) { 145 --ntasks; 146 futures.add(ecs.submit(it.next())); 147 ++active; 148 } 149 else if (active == 0) 150 break; 151 else if (timed) { 152 f = ecs.poll(nanos, TimeUnit.NANOSECONDS); 153 if (f == null) 154 throw new TimeoutException(); 155 nanos = deadline - System.nanoTime(); 156 } 157 else 158 f = ecs.take(); 159 } 160 if (f != null) { 161 --active; 162 try { 163 return f.get(); 164 } catch (ExecutionException eex) { 165 ee = eex; 166 } catch (RuntimeException rex) { 167 ee = new ExecutionException(rex); 168 } 169 } 170 } 171 172 if (ee == null) 173 ee = new ExecutionException(); 174 throw ee; 175 176 } finally { 177 for (int i = 0, size = futures.size(); i < size; i++) 178 futures.get(i).cancel(true); 179 } 180 } 181 invokeAny(Collection<? extends Callable<T>> tasks)182 public <T> T invokeAny(Collection<? extends Callable<T>> tasks) 183 throws InterruptedException, ExecutionException { 184 try { 185 return doInvokeAny(tasks, false, 0); 186 } catch (TimeoutException cannotHappen) { 187 assert false; 188 return null; 189 } 190 } 191 invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)192 public <T> T invokeAny(Collection<? extends Callable<T>> tasks, 193 long timeout, TimeUnit unit) 194 throws InterruptedException, ExecutionException, TimeoutException { 195 return doInvokeAny(tasks, true, unit.toNanos(timeout)); 196 } 197 invokeAll(Collection<? extends Callable<T>> tasks)198 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) 199 throws InterruptedException { 200 if (tasks == null) 201 throw new NullPointerException(); 202 ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size()); 203 boolean done = false; 204 try { 205 for (Callable<T> t : tasks) { 206 RunnableFuture<T> f = newTaskFor(t); 207 futures.add(f); 208 execute(f); 209 } 210 for (int i = 0, size = futures.size(); i < size; i++) { 211 Future<T> f = futures.get(i); 212 if (!f.isDone()) { 213 try { 214 f.get(); 215 } catch (CancellationException ignore) { 216 } catch (ExecutionException ignore) { 217 } 218 } 219 } 220 done = true; 221 return futures; 222 } finally { 223 if (!done) 224 for (int i = 0, size = futures.size(); i < size; i++) 225 futures.get(i).cancel(true); 226 } 227 } 228 invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)229 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, 230 long timeout, TimeUnit unit) 231 throws InterruptedException { 232 if (tasks == null) 233 throw new NullPointerException(); 234 long nanos = unit.toNanos(timeout); 235 ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size()); 236 boolean done = false; 237 try { 238 for (Callable<T> t : tasks) 239 futures.add(newTaskFor(t)); 240 241 final long deadline = System.nanoTime() + nanos; 242 final int size = futures.size(); 243 244 // Interleave time checks and calls to execute in case 245 // executor doesn't have any/much parallelism. 246 for (int i = 0; i < size; i++) { 247 execute((Runnable)futures.get(i)); 248 nanos = deadline - System.nanoTime(); 249 if (nanos <= 0L) 250 return futures; 251 } 252 253 for (int i = 0; i < size; i++) { 254 Future<T> f = futures.get(i); 255 if (!f.isDone()) { 256 if (nanos <= 0L) 257 return futures; 258 try { 259 f.get(nanos, TimeUnit.NANOSECONDS); 260 } catch (CancellationException ignore) { 261 } catch (ExecutionException ignore) { 262 } catch (TimeoutException toe) { 263 return futures; 264 } 265 nanos = deadline - System.nanoTime(); 266 } 267 } 268 done = true; 269 return futures; 270 } finally { 271 if (!done) 272 for (int i = 0, size = futures.size(); i < size; i++) 273 futures.get(i).cancel(true); 274 } 275 } 276 277 } 278