1 /* 2 * This file is a modified version of 3 * http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/main/java/util/concurrent/AbstractExecutorService.java?revision=1.20 4 * which contained the following notice: 5 * 6 * Written by Doug Lea with assistance from members of JCP JSR-166 7 * Expert Group and released to the public domain, as explained at 8 * http://creativecommons.org/licenses/publicdomain 9 */ 10 11 package java.util.concurrent; 12 13 import java.util.*; 14 15 public abstract class AbstractExecutorService implements ExecutorService { 16 submit(Runnable task)17 public Future<?> submit(Runnable task) { 18 if (task == null) throw new NullPointerException(); 19 FutureTask<Object> ftask = new FutureTask<Object>(task, null); 20 execute(ftask); 21 return ftask; 22 } 23 submit(Runnable task, T result)24 public <T> Future<T> submit(Runnable task, T result) { 25 if (task == null) throw new NullPointerException(); 26 FutureTask<T> ftask = new FutureTask<T>(task, result); 27 execute(ftask); 28 return ftask; 29 } 30 submit(Callable<T> task)31 public <T> Future<T> submit(Callable<T> task) { 32 if (task == null) throw new NullPointerException(); 33 FutureTask<T> ftask = new FutureTask<T>(task); 34 execute(ftask); 35 return ftask; 36 } 37 doInvokeAny(Collection<? extends Callable<T>> tasks, boolean timed, long nanos)38 private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks, 39 boolean timed, long nanos) 40 throws InterruptedException, ExecutionException, TimeoutException { 41 if (tasks == null) 42 throw new NullPointerException(); 43 int ntasks = tasks.size(); 44 if (ntasks == 0) 45 throw new IllegalArgumentException(); 46 List<Future<T>> futures= new ArrayList<Future<T>>(ntasks); 47 ExecutorCompletionService<T> ecs = 48 new ExecutorCompletionService<T>(this); 49 50 try { 51 ExecutionException ee = null; 52 long lastTime = (timed)? System.nanoTime() : 0; 53 Iterator<? extends Callable<T>> it = tasks.iterator(); 54 55 futures.add(ecs.submit(it.next())); 56 --ntasks; 57 int active = 1; 58 59 for (;;) { 60 Future<T> f = ecs.poll(); 61 if (f == null) { 62 if (ntasks > 0) { 63 --ntasks; 64 futures.add(ecs.submit(it.next())); 65 ++active; 66 } 67 else if (active == 0) 68 break; 69 else if (timed) { 70 f = ecs.poll(nanos, TimeUnit.NANOSECONDS); 71 if (f == null) 72 throw new TimeoutException(); 73 long now = System.nanoTime(); 74 nanos -= now - lastTime; 75 lastTime = now; 76 } 77 else 78 f = ecs.take(); 79 } 80 if (f != null) { 81 --active; 82 try { 83 return f.get(); 84 } catch (InterruptedException ie) { 85 throw ie; 86 } catch (ExecutionException eex) { 87 ee = eex; 88 } catch (RuntimeException rex) { 89 ee = new ExecutionException(rex); 90 } 91 } 92 } 93 94 if (ee == null) 95 ee = new ExecutionException(); 96 throw ee; 97 98 } finally { 99 for (Future<T> f : futures) 100 f.cancel(true); 101 } 102 } 103 invokeAny(Collection<? extends Callable<T>> tasks)104 public <T> T invokeAny(Collection<? extends Callable<T>> tasks) 105 throws InterruptedException, ExecutionException { 106 try { 107 return doInvokeAny(tasks, false, 0); 108 } catch (TimeoutException cannotHappen) { 109 assert false; 110 return null; 111 } 112 } 113 invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)114 public <T> T invokeAny(Collection<? extends Callable<T>> tasks, 115 long timeout, TimeUnit unit) 116 throws InterruptedException, ExecutionException, TimeoutException { 117 return doInvokeAny(tasks, true, unit.toNanos(timeout)); 118 } 119 invokeAll(Collection<? extends Callable<T>> tasks)120 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) 121 throws InterruptedException { 122 if (tasks == null) 123 throw new NullPointerException(); 124 List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size()); 125 boolean done = false; 126 try { 127 for (Callable<T> t : tasks) { 128 FutureTask<T> f = new FutureTask<T>(t); 129 futures.add(f); 130 execute(f); 131 } 132 for (Future<T> f : futures) { 133 if (!f.isDone()) { 134 try { 135 f.get(); 136 } catch (CancellationException ignore) { 137 } catch (ExecutionException ignore) { 138 } 139 } 140 } 141 done = true; 142 return futures; 143 } finally { 144 if (!done) 145 for (Future<T> f : futures) 146 f.cancel(true); 147 } 148 } 149 invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)150 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, 151 long timeout, TimeUnit unit) 152 throws InterruptedException { 153 if (tasks == null || unit == null) 154 throw new NullPointerException(); 155 long nanos = unit.toNanos(timeout); 156 List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size()); 157 boolean done = false; 158 try { 159 for (Callable<T> t : tasks) 160 futures.add(new FutureTask<T>(t)); 161 162 long lastTime = System.nanoTime(); 163 164 Iterator<Future<T>> it = futures.iterator(); 165 while (it.hasNext()) { 166 execute((Runnable)(it.next())); 167 long now = System.nanoTime(); 168 nanos -= now - lastTime; 169 lastTime = now; 170 if (nanos <= 0) 171 return futures; 172 } 173 174 for (Future<T> f : futures) { 175 if (!f.isDone()) { 176 if (nanos <= 0) 177 return futures; 178 try { 179 f.get(nanos, TimeUnit.NANOSECONDS); 180 } catch (CancellationException ignore) { 181 } catch (ExecutionException ignore) { 182 } catch (TimeoutException toe) { 183 return futures; 184 } 185 long now = System.nanoTime(); 186 nanos -= now - lastTime; 187 lastTime = now; 188 } 189 } 190 done = true; 191 return futures; 192 } finally { 193 if (!done) 194 for (Future<T> f : futures) 195 f.cancel(true); 196 } 197 } 198 199 } 200