1 /* 2 * Written by Doug Lea with assistance from members of JCP JSR-166 3 * Expert Group and released to the public domain, as explained at 4 * http://creativecommons.org/publicdomain/zero/1.0/ 5 */ 6 7 package java.util.concurrent; 8 9 /** 10 * A {@link CompletionService} that uses a supplied {@link Executor} 11 * to execute tasks. This class arranges that submitted tasks are, 12 * upon completion, placed on a queue accessible using {@code take}. 13 * The class is lightweight enough to be suitable for transient use 14 * when processing groups of tasks. 15 * 16 * <p> 17 * 18 * <b>Usage Examples.</b> 19 * 20 * Suppose you have a set of solvers for a certain problem, each 21 * returning a value of some type {@code Result}, and would like to 22 * run them concurrently, processing the results of each of them that 23 * return a non-null value, in some method {@code use(Result r)}. You 24 * could write this as: 25 * 26 * <pre> {@code 27 * void solve(Executor e, 28 * Collection<Callable<Result>> solvers) 29 * throws InterruptedException, ExecutionException { 30 * CompletionService<Result> ecs 31 * = new ExecutorCompletionService<Result>(e); 32 * for (Callable<Result> s : solvers) 33 * ecs.submit(s); 34 * int n = solvers.size(); 35 * for (int i = 0; i < n; ++i) { 36 * Result r = ecs.take().get(); 37 * if (r != null) 38 * use(r); 39 * } 40 * }}</pre> 41 * 42 * Suppose instead that you would like to use the first non-null result 43 * of the set of tasks, ignoring any that encounter exceptions, 44 * and cancelling all other tasks when the first one is ready: 45 * 46 * <pre> {@code 47 * void solve(Executor e, 48 * Collection<Callable<Result>> solvers) 49 * throws InterruptedException { 50 * CompletionService<Result> ecs 51 * = new ExecutorCompletionService<Result>(e); 52 * int n = solvers.size(); 53 * List<Future<Result>> futures = new ArrayList<>(n); 54 * Result result = null; 55 * try { 56 * for (Callable<Result> s : solvers) 57 * futures.add(ecs.submit(s)); 58 * for (int i = 0; i < n; ++i) { 59 * try { 60 * Result r = ecs.take().get(); 61 * if (r != null) { 62 * result = r; 63 * break; 64 * } 65 * } catch (ExecutionException ignore) {} 66 * } 67 * } 68 * finally { 69 * for (Future<Result> f : futures) 70 * f.cancel(true); 71 * } 72 * 73 * if (result != null) 74 * use(result); 75 * }}</pre> 76 */ 77 public class ExecutorCompletionService<V> implements CompletionService<V> { 78 private final Executor executor; 79 private final AbstractExecutorService aes; 80 private final BlockingQueue<Future<V>> completionQueue; 81 82 /** 83 * FutureTask extension to enqueue upon completion. 84 */ 85 private static class QueueingFuture<V> extends FutureTask<Void> { QueueingFuture(RunnableFuture<V> task, BlockingQueue<Future<V>> completionQueue)86 QueueingFuture(RunnableFuture<V> task, 87 BlockingQueue<Future<V>> completionQueue) { 88 super(task, null); 89 this.task = task; 90 this.completionQueue = completionQueue; 91 } 92 private final Future<V> task; 93 private final BlockingQueue<Future<V>> completionQueue; done()94 protected void done() { completionQueue.add(task); } 95 } 96 newTaskFor(Callable<V> task)97 private RunnableFuture<V> newTaskFor(Callable<V> task) { 98 if (aes == null) 99 return new FutureTask<V>(task); 100 else 101 return aes.newTaskFor(task); 102 } 103 newTaskFor(Runnable task, V result)104 private RunnableFuture<V> newTaskFor(Runnable task, V result) { 105 if (aes == null) 106 return new FutureTask<V>(task, result); 107 else 108 return aes.newTaskFor(task, result); 109 } 110 111 /** 112 * Creates an ExecutorCompletionService using the supplied 113 * executor for base task execution and a 114 * {@link LinkedBlockingQueue} as a completion queue. 115 * 116 * @param executor the executor to use 117 * @throws NullPointerException if executor is {@code null} 118 */ ExecutorCompletionService(Executor executor)119 public ExecutorCompletionService(Executor executor) { 120 if (executor == null) 121 throw new NullPointerException(); 122 this.executor = executor; 123 this.aes = (executor instanceof AbstractExecutorService) ? 124 (AbstractExecutorService) executor : null; 125 this.completionQueue = new LinkedBlockingQueue<Future<V>>(); 126 } 127 128 /** 129 * Creates an ExecutorCompletionService using the supplied 130 * executor for base task execution and the supplied queue as its 131 * completion queue. 132 * 133 * @param executor the executor to use 134 * @param completionQueue the queue to use as the completion queue 135 * normally one dedicated for use by this service. This 136 * queue is treated as unbounded -- failed attempted 137 * {@code Queue.add} operations for completed tasks cause 138 * them not to be retrievable. 139 * @throws NullPointerException if executor or completionQueue are {@code null} 140 */ ExecutorCompletionService(Executor executor, BlockingQueue<Future<V>> completionQueue)141 public ExecutorCompletionService(Executor executor, 142 BlockingQueue<Future<V>> completionQueue) { 143 if (executor == null || completionQueue == null) 144 throw new NullPointerException(); 145 this.executor = executor; 146 this.aes = (executor instanceof AbstractExecutorService) ? 147 (AbstractExecutorService) executor : null; 148 this.completionQueue = completionQueue; 149 } 150 submit(Callable<V> task)151 public Future<V> submit(Callable<V> task) { 152 if (task == null) throw new NullPointerException(); 153 RunnableFuture<V> f = newTaskFor(task); 154 executor.execute(new QueueingFuture<V>(f, completionQueue)); 155 return f; 156 } 157 submit(Runnable task, V result)158 public Future<V> submit(Runnable task, V result) { 159 if (task == null) throw new NullPointerException(); 160 RunnableFuture<V> f = newTaskFor(task, result); 161 executor.execute(new QueueingFuture<V>(f, completionQueue)); 162 return f; 163 } 164 take()165 public Future<V> take() throws InterruptedException { 166 return completionQueue.take(); 167 } 168 poll()169 public Future<V> poll() { 170 return completionQueue.poll(); 171 } 172 poll(long timeout, TimeUnit unit)173 public Future<V> poll(long timeout, TimeUnit unit) 174 throws InterruptedException { 175 return completionQueue.poll(timeout, unit); 176 } 177 178 } 179