• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
3  *
4  * This code is free software; you can redistribute it and/or modify it
5  * under the terms of the GNU General Public License version 2 only, as
6  * published by the Free Software Foundation.  Oracle designates this
7  * particular file as subject to the "Classpath" exception as provided
8  * by Oracle in the LICENSE file that accompanied this code.
9  *
10  * This code is distributed in the hope that it will be useful, but WITHOUT
11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
13  * version 2 for more details (a copy is included in the LICENSE file that
14  * accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License version
17  * 2 along with this work; if not, write to the Free Software Foundation,
18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
19  *
20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
21  * or visit www.oracle.com if you need additional information or have any
22  * questions.
23  */
24 
25 /*
26  * This file is available under and governed by the GNU General Public
27  * License version 2 only, as published by the Free Software Foundation.
28  * However, the following notice accompanied the original version of this
29  * file:
30  *
31  * Written by Doug Lea with assistance from members of JCP JSR-166
32  * Expert Group and released to the public domain, as explained at
33  * http://creativecommons.org/publicdomain/zero/1.0/
34  */
35 
36 package java.util.concurrent;
37 
38 /**
39  * A {@link CompletionService} that uses a supplied {@link Executor}
40  * to execute tasks.  This class arranges that submitted tasks are,
41  * upon completion, placed on a queue accessible using {@code take}.
42  * The class is lightweight enough to be suitable for transient use
43  * when processing groups of tasks.
44  *
45  * <p>
46  *
47  * <b>Usage Examples.</b>
48  *
49  * Suppose you have a set of solvers for a certain problem, each
50  * returning a value of some type {@code Result}, and would like to
51  * run them concurrently, processing the results of each of them that
52  * return a non-null value, in some method {@code use(Result r)}. You
53  * could write this as:
54  *
55  * <pre> {@code
56  * void solve(Executor e,
57  *            Collection<Callable<Result>> solvers)
58  *     throws InterruptedException, ExecutionException {
59  *   CompletionService<Result> cs
60  *       = new ExecutorCompletionService<>(e);
61  *   solvers.forEach(cs::submit);
62  *   for (int i = solvers.size(); i > 0; i--) {
63  *     Result r = cs.take().get();
64  *     if (r != null)
65  *       use(r);
66  *   }
67  * }}</pre>
68  *
69  * Suppose instead that you would like to use the first non-null result
70  * of the set of tasks, ignoring any that encounter exceptions,
71  * and cancelling all other tasks when the first one is ready:
72  *
73  * <pre> {@code
74  * void solve(Executor e,
75  *            Collection<Callable<Result>> solvers)
76  *     throws InterruptedException {
77  *   CompletionService<Result> cs
78  *       = new ExecutorCompletionService<>(e);
79  *   int n = solvers.size();
80  *   List<Future<Result>> futures = new ArrayList<>(n);
81  *   Result result = null;
82  *   try {
83  *     solvers.forEach(solver -> futures.add(cs.submit(solver)));
84  *     for (int i = n; i > 0; i--) {
85  *       try {
86  *         Result r = cs.take().get();
87  *         if (r != null) {
88  *           result = r;
89  *           break;
90  *         }
91  *       } catch (ExecutionException ignore) {}
92  *     }
93  *   } finally {
94  *     futures.forEach(future -> future.cancel(true));
95  *   }
96  *
97  *   if (result != null)
98  *     use(result);
99  * }}</pre>
100  *
101  * @param <V> the type of values the tasks of this service produce and consume
102  *
103  * @since 1.5
104  */
105 public class ExecutorCompletionService<V> implements CompletionService<V> {
106     private final Executor executor;
107     private final AbstractExecutorService aes;
108     private final BlockingQueue<Future<V>> completionQueue;
109 
110     /**
111      * FutureTask extension to enqueue upon completion.
112      */
113     private static class QueueingFuture<V> extends FutureTask<Void> {
QueueingFuture(RunnableFuture<V> task, BlockingQueue<Future<V>> completionQueue)114         QueueingFuture(RunnableFuture<V> task,
115                        BlockingQueue<Future<V>> completionQueue) {
116             super(task, null);
117             this.task = task;
118             this.completionQueue = completionQueue;
119         }
120         private final Future<V> task;
121         private final BlockingQueue<Future<V>> completionQueue;
done()122         protected void done() { completionQueue.add(task); }
123     }
124 
newTaskFor(Callable<V> task)125     private RunnableFuture<V> newTaskFor(Callable<V> task) {
126         if (aes == null)
127             return new FutureTask<V>(task);
128         else
129             return aes.newTaskFor(task);
130     }
131 
newTaskFor(Runnable task, V result)132     private RunnableFuture<V> newTaskFor(Runnable task, V result) {
133         if (aes == null)
134             return new FutureTask<V>(task, result);
135         else
136             return aes.newTaskFor(task, result);
137     }
138 
139     /**
140      * Creates an ExecutorCompletionService using the supplied
141      * executor for base task execution and a
142      * {@link LinkedBlockingQueue} as a completion queue.
143      *
144      * @param executor the executor to use
145      * @throws NullPointerException if executor is {@code null}
146      */
ExecutorCompletionService(Executor executor)147     public ExecutorCompletionService(Executor executor) {
148         if (executor == null)
149             throw new NullPointerException();
150         this.executor = executor;
151         this.aes = (executor instanceof AbstractExecutorService) ?
152             (AbstractExecutorService) executor : null;
153         this.completionQueue = new LinkedBlockingQueue<Future<V>>();
154     }
155 
156     /**
157      * Creates an ExecutorCompletionService using the supplied
158      * executor for base task execution and the supplied queue as its
159      * completion queue.
160      *
161      * @param executor the executor to use
162      * @param completionQueue the queue to use as the completion queue
163      *        normally one dedicated for use by this service. This
164      *        queue is treated as unbounded -- failed attempted
165      *        {@code Queue.add} operations for completed tasks cause
166      *        them not to be retrievable.
167      * @throws NullPointerException if executor or completionQueue are {@code null}
168      */
ExecutorCompletionService(Executor executor, BlockingQueue<Future<V>> completionQueue)169     public ExecutorCompletionService(Executor executor,
170                                      BlockingQueue<Future<V>> completionQueue) {
171         if (executor == null || completionQueue == null)
172             throw new NullPointerException();
173         this.executor = executor;
174         this.aes = (executor instanceof AbstractExecutorService) ?
175             (AbstractExecutorService) executor : null;
176         this.completionQueue = completionQueue;
177     }
178 
179     /**
180      * @throws RejectedExecutionException {@inheritDoc}
181      * @throws NullPointerException       {@inheritDoc}
182      */
submit(Callable<V> task)183     public Future<V> submit(Callable<V> task) {
184         if (task == null) throw new NullPointerException();
185         RunnableFuture<V> f = newTaskFor(task);
186         executor.execute(new QueueingFuture<V>(f, completionQueue));
187         return f;
188     }
189 
190     /**
191      * @throws RejectedExecutionException {@inheritDoc}
192      * @throws NullPointerException       {@inheritDoc}
193      */
submit(Runnable task, V result)194     public Future<V> submit(Runnable task, V result) {
195         if (task == null) throw new NullPointerException();
196         RunnableFuture<V> f = newTaskFor(task, result);
197         executor.execute(new QueueingFuture<V>(f, completionQueue));
198         return f;
199     }
200 
take()201     public Future<V> take() throws InterruptedException {
202         return completionQueue.take();
203     }
204 
poll()205     public Future<V> poll() {
206         return completionQueue.poll();
207     }
208 
poll(long timeout, TimeUnit unit)209     public Future<V> poll(long timeout, TimeUnit unit)
210             throws InterruptedException {
211         return completionQueue.poll(timeout, unit);
212     }
213 
214 }
215