1 /* 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 3 * 4 * This code is free software; you can redistribute it and/or modify it 5 * under the terms of the GNU General Public License version 2 only, as 6 * published by the Free Software Foundation. Oracle designates this 7 * particular file as subject to the "Classpath" exception as provided 8 * by Oracle in the LICENSE file that accompanied this code. 9 * 10 * This code is distributed in the hope that it will be useful, but WITHOUT 11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 13 * version 2 for more details (a copy is included in the LICENSE file that 14 * accompanied this code). 15 * 16 * You should have received a copy of the GNU General Public License version 17 * 2 along with this work; if not, write to the Free Software Foundation, 18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 19 * 20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 21 * or visit www.oracle.com if you need additional information or have any 22 * questions. 23 */ 24 25 /* 26 * This file is available under and governed by the GNU General Public 27 * License version 2 only, as published by the Free Software Foundation. 28 * However, the following notice accompanied the original version of this 29 * file: 30 * 31 * Written by Doug Lea with assistance from members of JCP JSR-166 32 * Expert Group and released to the public domain, as explained at 33 * http://creativecommons.org/publicdomain/zero/1.0/ 34 */ 35 36 package java.util.concurrent; 37 38 import static java.util.concurrent.TimeUnit.NANOSECONDS; 39 40 import java.util.ArrayList; 41 import java.util.Collection; 42 import java.util.Iterator; 43 import java.util.List; 44 45 /** 46 * Provides default implementations of {@link ExecutorService} 47 * execution methods. This class implements the {@code submit}, 48 * {@code invokeAny} and {@code invokeAll} methods using a 49 * {@link RunnableFuture} returned by {@code newTaskFor}, which defaults 50 * to the {@link FutureTask} class provided in this package. For example, 51 * the implementation of {@code submit(Runnable)} creates an 52 * associated {@code RunnableFuture} that is executed and 53 * returned. Subclasses may override the {@code newTaskFor} methods 54 * to return {@code RunnableFuture} implementations other than 55 * {@code FutureTask}. 56 * 57 * <p><b>Extension example</b>. Here is a sketch of a class 58 * that customizes {@link ThreadPoolExecutor} to use 59 * a {@code CustomTask} class instead of the default {@code FutureTask}: 60 * <pre> {@code 61 * public class CustomThreadPoolExecutor extends ThreadPoolExecutor { 62 * 63 * static class CustomTask<V> implements RunnableFuture<V> {...} 64 * 65 * protected <V> RunnableFuture<V> newTaskFor(Callable<V> c) { 66 * return new CustomTask<V>(c); 67 * } 68 * protected <V> RunnableFuture<V> newTaskFor(Runnable r, V v) { 69 * return new CustomTask<V>(r, v); 70 * } 71 * // ... add constructors, etc. 72 * }}</pre> 73 * 74 * @since 1.5 75 * @author Doug Lea 76 */ 77 public abstract class AbstractExecutorService implements ExecutorService { 78 79 /** 80 * Returns a {@code RunnableFuture} for the given runnable and default 81 * value. 82 * 83 * @param runnable the runnable task being wrapped 84 * @param value the default value for the returned future 85 * @param <T> the type of the given value 86 * @return a {@code RunnableFuture} which, when run, will run the 87 * underlying runnable and which, as a {@code Future}, will yield 88 * the given value as its result and provide for cancellation of 89 * the underlying task 90 * @since 1.6 91 */ newTaskFor(Runnable runnable, T value)92 protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { 93 return new FutureTask<T>(runnable, value); 94 } 95 96 /** 97 * Returns a {@code RunnableFuture} for the given callable task. 98 * 99 * @param callable the callable task being wrapped 100 * @param <T> the type of the callable's result 101 * @return a {@code RunnableFuture} which, when run, will call the 102 * underlying callable and which, as a {@code Future}, will yield 103 * the callable's result as its result and provide for 104 * cancellation of the underlying task 105 * @since 1.6 106 */ newTaskFor(Callable<T> callable)107 protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { 108 return new FutureTask<T>(callable); 109 } 110 111 /** 112 * @throws RejectedExecutionException {@inheritDoc} 113 * @throws NullPointerException {@inheritDoc} 114 */ submit(Runnable task)115 public Future<?> submit(Runnable task) { 116 if (task == null) throw new NullPointerException(); 117 RunnableFuture<Void> ftask = newTaskFor(task, null); 118 execute(ftask); 119 return ftask; 120 } 121 122 /** 123 * @throws RejectedExecutionException {@inheritDoc} 124 * @throws NullPointerException {@inheritDoc} 125 */ submit(Runnable task, T result)126 public <T> Future<T> submit(Runnable task, T result) { 127 if (task == null) throw new NullPointerException(); 128 RunnableFuture<T> ftask = newTaskFor(task, result); 129 execute(ftask); 130 return ftask; 131 } 132 133 /** 134 * @throws RejectedExecutionException {@inheritDoc} 135 * @throws NullPointerException {@inheritDoc} 136 */ submit(Callable<T> task)137 public <T> Future<T> submit(Callable<T> task) { 138 if (task == null) throw new NullPointerException(); 139 RunnableFuture<T> ftask = newTaskFor(task); 140 execute(ftask); 141 return ftask; 142 } 143 144 /** 145 * the main mechanics of invokeAny. 146 */ doInvokeAny(Collection<? extends Callable<T>> tasks, boolean timed, long nanos)147 private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks, 148 boolean timed, long nanos) 149 throws InterruptedException, ExecutionException, TimeoutException { 150 if (tasks == null) 151 throw new NullPointerException(); 152 int ntasks = tasks.size(); 153 if (ntasks == 0) 154 throw new IllegalArgumentException(); 155 ArrayList<Future<T>> futures = new ArrayList<>(ntasks); 156 ExecutorCompletionService<T> ecs = 157 new ExecutorCompletionService<T>(this); 158 159 // For efficiency, especially in executors with limited 160 // parallelism, check to see if previously submitted tasks are 161 // done before submitting more of them. This interleaving 162 // plus the exception mechanics account for messiness of main 163 // loop. 164 165 try { 166 // Record exceptions so that if we fail to obtain any 167 // result, we can throw the last exception we got. 168 ExecutionException ee = null; 169 final long deadline = timed ? System.nanoTime() + nanos : 0L; 170 Iterator<? extends Callable<T>> it = tasks.iterator(); 171 172 // Start one task for sure; the rest incrementally 173 futures.add(ecs.submit(it.next())); 174 --ntasks; 175 int active = 1; 176 177 for (;;) { 178 Future<T> f = ecs.poll(); 179 if (f == null) { 180 if (ntasks > 0) { 181 --ntasks; 182 futures.add(ecs.submit(it.next())); 183 ++active; 184 } 185 else if (active == 0) 186 break; 187 else if (timed) { 188 f = ecs.poll(nanos, NANOSECONDS); 189 if (f == null) 190 throw new TimeoutException(); 191 nanos = deadline - System.nanoTime(); 192 } 193 else 194 f = ecs.take(); 195 } 196 if (f != null) { 197 --active; 198 try { 199 return f.get(); 200 } catch (ExecutionException eex) { 201 ee = eex; 202 } catch (RuntimeException rex) { 203 ee = new ExecutionException(rex); 204 } 205 } 206 } 207 208 if (ee == null) 209 ee = new ExecutionException(); 210 throw ee; 211 212 } finally { 213 cancelAll(futures); 214 } 215 } 216 invokeAny(Collection<? extends Callable<T>> tasks)217 public <T> T invokeAny(Collection<? extends Callable<T>> tasks) 218 throws InterruptedException, ExecutionException { 219 try { 220 return doInvokeAny(tasks, false, 0); 221 } catch (TimeoutException cannotHappen) { 222 assert false; 223 return null; 224 } 225 } 226 invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)227 public <T> T invokeAny(Collection<? extends Callable<T>> tasks, 228 long timeout, TimeUnit unit) 229 throws InterruptedException, ExecutionException, TimeoutException { 230 return doInvokeAny(tasks, true, unit.toNanos(timeout)); 231 } 232 invokeAll(Collection<? extends Callable<T>> tasks)233 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) 234 throws InterruptedException { 235 if (tasks == null) 236 throw new NullPointerException(); 237 ArrayList<Future<T>> futures = new ArrayList<>(tasks.size()); 238 try { 239 for (Callable<T> t : tasks) { 240 RunnableFuture<T> f = newTaskFor(t); 241 futures.add(f); 242 execute(f); 243 } 244 for (int i = 0, size = futures.size(); i < size; i++) { 245 Future<T> f = futures.get(i); 246 if (!f.isDone()) { 247 try { f.get(); } 248 catch (CancellationException ignore) {} 249 catch (ExecutionException ignore) {} 250 } 251 } 252 return futures; 253 } catch (Throwable t) { 254 cancelAll(futures); 255 throw t; 256 } 257 } 258 invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)259 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, 260 long timeout, TimeUnit unit) 261 throws InterruptedException { 262 if (tasks == null) 263 throw new NullPointerException(); 264 final long nanos = unit.toNanos(timeout); 265 final long deadline = System.nanoTime() + nanos; 266 ArrayList<Future<T>> futures = new ArrayList<>(tasks.size()); 267 int j = 0; 268 timedOut: try { 269 for (Callable<T> t : tasks) 270 futures.add(newTaskFor(t)); 271 272 final int size = futures.size(); 273 274 // Interleave time checks and calls to execute in case 275 // executor doesn't have any/much parallelism. 276 for (int i = 0; i < size; i++) { 277 if (((i == 0) ? nanos : deadline - System.nanoTime()) <= 0L) 278 break timedOut; 279 execute((Runnable)futures.get(i)); 280 } 281 282 for (; j < size; j++) { 283 Future<T> f = futures.get(j); 284 if (!f.isDone()) { 285 try { f.get(deadline - System.nanoTime(), NANOSECONDS); } 286 catch (CancellationException ignore) {} 287 catch (ExecutionException ignore) {} 288 catch (TimeoutException timedOut) { 289 break timedOut; 290 } 291 } 292 } 293 return futures; 294 } catch (Throwable t) { 295 cancelAll(futures); 296 throw t; 297 } 298 // Timed out before all the tasks could be completed; cancel remaining 299 cancelAll(futures, j); 300 return futures; 301 } 302 cancelAll(ArrayList<Future<T>> futures)303 private static <T> void cancelAll(ArrayList<Future<T>> futures) { 304 cancelAll(futures, 0); 305 } 306 307 /** Cancels all futures with index at least j. */ cancelAll(ArrayList<Future<T>> futures, int j)308 private static <T> void cancelAll(ArrayList<Future<T>> futures, int j) { 309 for (int size = futures.size(); j < size; j++) 310 futures.get(j).cancel(true); 311 } 312 } 313