• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4  *
5  * This code is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 only, as
7  * published by the Free Software Foundation.  Oracle designates this
8  * particular file as subject to the "Classpath" exception as provided
9  * by Oracle in the LICENSE file that accompanied this code.
10  *
11  * This code is distributed in the hope that it will be useful, but WITHOUT
12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14  * version 2 for more details (a copy is included in the LICENSE file that
15  * accompanied this code).
16  *
17  * You should have received a copy of the GNU General Public License version
18  * 2 along with this work; if not, write to the Free Software Foundation,
19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20  *
21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22  * or visit www.oracle.com if you need additional information or have any
23  * questions.
24  */
25 package java.util.stream;
26 
27 import java.util.Objects;
28 import java.util.function.Consumer;
29 import java.util.function.DoubleConsumer;
30 import java.util.function.IntConsumer;
31 import java.util.function.LongConsumer;
32 
33 /**
34  * An extension of {@link Consumer} used to conduct values through the stages of
35  * a stream pipeline, with additional methods to manage size information,
36  * control flow, etc.  Before calling the {@code accept()} method on a
37  * {@code Sink} for the first time, you must first call the {@code begin()}
38  * method to inform it that data is coming (optionally informing the sink how
39  * much data is coming), and after all data has been sent, you must call the
40  * {@code end()} method.  After calling {@code end()}, you should not call
41  * {@code accept()} without again calling {@code begin()}.  {@code Sink} also
42  * offers a mechanism by which the sink can cooperatively signal that it does
43  * not wish to receive any more data (the {@code cancellationRequested()}
44  * method), which a source can poll before sending more data to the
45  * {@code Sink}.
46  *
47  * <p>A sink may be in one of two states: an initial state and an active state.
48  * It starts out in the initial state; the {@code begin()} method transitions
49  * it to the active state, and the {@code end()} method transitions it back into
50  * the initial state, where it can be re-used.  Data-accepting methods (such as
51  * {@code accept()} are only valid in the active state.
52  *
53  * @apiNote
54  * A stream pipeline consists of a source, zero or more intermediate stages
55  * (such as filtering or mapping), and a terminal stage, such as reduction or
56  * for-each.  For concreteness, consider the pipeline:
57  *
58  * <pre>{@code
59  *     int longestStringLengthStartingWithA
60  *         = strings.stream()
61  *                  .filter(s -> s.startsWith("A"))
62  *                  .mapToInt(String::length)
63  *                  .max();
64  * }</pre>
65  *
66  * <p>Here, we have three stages, filtering, mapping, and reducing.  The
67  * filtering stage consumes strings and emits a subset of those strings; the
68  * mapping stage consumes strings and emits ints; the reduction stage consumes
69  * those ints and computes the maximal value.
70  *
71  * <p>A {@code Sink} instance is used to represent each stage of this pipeline,
72  * whether the stage accepts objects, ints, longs, or doubles.  Sink has entry
73  * points for {@code accept(Object)}, {@code accept(int)}, etc, so that we do
74  * not need a specialized interface for each primitive specialization.  (It
75  * might be called a "kitchen sink" for this omnivorous tendency.)  The entry
76  * point to the pipeline is the {@code Sink} for the filtering stage, which
77  * sends some elements "downstream" -- into the {@code Sink} for the mapping
78  * stage, which in turn sends integral values downstream into the {@code Sink}
79  * for the reduction stage. The {@code Sink} implementations associated with a
80  * given stage is expected to know the data type for the next stage, and call
81  * the correct {@code accept} method on its downstream {@code Sink}.  Similarly,
82  * each stage must implement the correct {@code accept} method corresponding to
83  * the data type it accepts.
84  *
85  * <p>The specialized subtypes such as {@link Sink.OfInt} override
86  * {@code accept(Object)} to call the appropriate primitive specialization of
87  * {@code accept}, implement the appropriate primitive specialization of
88  * {@code Consumer}, and re-abstract the appropriate primitive specialization of
89  * {@code accept}.
90  *
91  * <p>The chaining subtypes such as {@link ChainedInt} not only implement
92  * {@code Sink.OfInt}, but also maintain a {@code downstream} field which
93  * represents the downstream {@code Sink}, and implement the methods
94  * {@code begin()}, {@code end()}, and {@code cancellationRequested()} to
95  * delegate to the downstream {@code Sink}.  Most implementations of
96  * intermediate operations will use these chaining wrappers.  For example, the
97  * mapping stage in the above example would look like:
98  *
99  * <pre>{@code
100  *     IntSink is = new Sink.ChainedReference<U>(sink) {
101  *         public void accept(U u) {
102  *             downstream.accept(mapper.applyAsInt(u));
103  *         }
104  *     };
105  * }</pre>
106  *
107  * <p>Here, we implement {@code Sink.ChainedReference<U>}, meaning that we expect
108  * to receive elements of type {@code U} as input, and pass the downstream sink
109  * to the constructor.  Because the next stage expects to receive integers, we
110  * must call the {@code accept(int)} method when emitting values to the downstream.
111  * The {@code accept()} method applies the mapping function from {@code U} to
112  * {@code int} and passes the resulting value to the downstream {@code Sink}.
113  *
114  * @param <T> type of elements for value streams
115  * @since 1.8
116  * @hide Visible for CTS testing only (OpenJDK8 tests).
117  */
118 // Android-changed: Made public for CTS tests only.
119 public interface Sink<T> extends Consumer<T> {
120     /**
121      * Resets the sink state to receive a fresh data set.  This must be called
122      * before sending any data to the sink.  After calling {@link #end()},
123      * you may call this method to reset the sink for another calculation.
124      * @param size The exact size of the data to be pushed downstream, if
125      * known or {@code -1} if unknown or infinite.
126      *
127      * <p>Prior to this call, the sink must be in the initial state, and after
128      * this call it is in the active state.
129      */
begin(long size)130     default void begin(long size) {}
131 
132     /**
133      * Indicates that all elements have been pushed.  If the {@code Sink} is
134      * stateful, it should send any stored state downstream at this time, and
135      * should clear any accumulated state (and associated resources).
136      *
137      * <p>Prior to this call, the sink must be in the active state, and after
138      * this call it is returned to the initial state.
139      */
end()140     default void end() {}
141 
142     /**
143      * Indicates that this {@code Sink} does not wish to receive any more data.
144      *
145      * @implSpec The default implementation always returns false.
146      *
147      * @return true if cancellation is requested
148      */
cancellationRequested()149     default boolean cancellationRequested() {
150         return false;
151     }
152 
153     /**
154      * Accepts an int value.
155      *
156      * @implSpec The default implementation throws IllegalStateException.
157      *
158      * @throws IllegalStateException if this sink does not accept int values
159      */
accept(int value)160     default void accept(int value) {
161         throw new IllegalStateException("called wrong accept method");
162     }
163 
164     /**
165      * Accepts a long value.
166      *
167      * @implSpec The default implementation throws IllegalStateException.
168      *
169      * @throws IllegalStateException if this sink does not accept long values
170      */
accept(long value)171     default void accept(long value) {
172         throw new IllegalStateException("called wrong accept method");
173     }
174 
175     /**
176      * Accepts a double value.
177      *
178      * @implSpec The default implementation throws IllegalStateException.
179      *
180      * @throws IllegalStateException if this sink does not accept double values
181      */
accept(double value)182     default void accept(double value) {
183         throw new IllegalStateException("called wrong accept method");
184     }
185 
186     /**
187      * {@code Sink} that implements {@code Sink<Integer>}, re-abstracts
188      * {@code accept(int)}, and wires {@code accept(Integer)} to bridge to
189      * {@code accept(int)}.
190      */
191     @SuppressWarnings("overloads")
192     interface OfInt extends Sink<Integer>, IntConsumer {
193         @Override
accept(int value)194         void accept(int value);
195 
196         @Override
accept(Integer i)197         default void accept(Integer i) {
198             if (Tripwire.ENABLED)
199                 Tripwire.trip(getClass(), "{0} calling Sink.OfInt.accept(Integer)");
200             accept(i.intValue());
201         }
202     }
203 
204     /**
205      * {@code Sink} that implements {@code Sink<Long>}, re-abstracts
206      * {@code accept(long)}, and wires {@code accept(Long)} to bridge to
207      * {@code accept(long)}.
208      */
209     @SuppressWarnings("overloads")
210     interface OfLong extends Sink<Long>, LongConsumer {
211         @Override
accept(long value)212         void accept(long value);
213 
214         @Override
accept(Long i)215         default void accept(Long i) {
216             if (Tripwire.ENABLED)
217                 Tripwire.trip(getClass(), "{0} calling Sink.OfLong.accept(Long)");
218             accept(i.longValue());
219         }
220     }
221 
222     /**
223      * {@code Sink} that implements {@code Sink<Double>}, re-abstracts
224      * {@code accept(double)}, and wires {@code accept(Double)} to bridge to
225      * {@code accept(double)}.
226      */
227     @SuppressWarnings("overloads")
228     interface OfDouble extends Sink<Double>, DoubleConsumer {
229         @Override
accept(double value)230         void accept(double value);
231 
232         @Override
accept(Double i)233         default void accept(Double i) {
234             if (Tripwire.ENABLED)
235                 Tripwire.trip(getClass(), "{0} calling Sink.OfDouble.accept(Double)");
236             accept(i.doubleValue());
237         }
238     }
239 
240     /**
241      * Abstract {@code Sink} implementation for creating chains of
242      * sinks.  The {@code begin}, {@code end}, and
243      * {@code cancellationRequested} methods are wired to chain to the
244      * downstream {@code Sink}.  This implementation takes a downstream
245      * {@code Sink} of unknown input shape and produces a {@code Sink<T>}.  The
246      * implementation of the {@code accept()} method must call the correct
247      * {@code accept()} method on the downstream {@code Sink}.
248      */
249     abstract static class ChainedReference<T, E_OUT> implements Sink<T> {
250         protected final Sink<? super E_OUT> downstream;
251 
ChainedReference(Sink<? super E_OUT> downstream)252         public ChainedReference(Sink<? super E_OUT> downstream) {
253             this.downstream = Objects.requireNonNull(downstream);
254         }
255 
256         @Override
begin(long size)257         public void begin(long size) {
258             downstream.begin(size);
259         }
260 
261         @Override
end()262         public void end() {
263             downstream.end();
264         }
265 
266         @Override
cancellationRequested()267         public boolean cancellationRequested() {
268             return downstream.cancellationRequested();
269         }
270     }
271 
272     /**
273      * Abstract {@code Sink} implementation designed for creating chains of
274      * sinks.  The {@code begin}, {@code end}, and
275      * {@code cancellationRequested} methods are wired to chain to the
276      * downstream {@code Sink}.  This implementation takes a downstream
277      * {@code Sink} of unknown input shape and produces a {@code Sink.OfInt}.
278      * The implementation of the {@code accept()} method must call the correct
279      * {@code accept()} method on the downstream {@code Sink}.
280      */
281     abstract static class ChainedInt<E_OUT> implements Sink.OfInt {
282         protected final Sink<? super E_OUT> downstream;
283 
ChainedInt(Sink<? super E_OUT> downstream)284         public ChainedInt(Sink<? super E_OUT> downstream) {
285             this.downstream = Objects.requireNonNull(downstream);
286         }
287 
288         @Override
begin(long size)289         public void begin(long size) {
290             downstream.begin(size);
291         }
292 
293         @Override
end()294         public void end() {
295             downstream.end();
296         }
297 
298         @Override
cancellationRequested()299         public boolean cancellationRequested() {
300             return downstream.cancellationRequested();
301         }
302     }
303 
304     /**
305      * Abstract {@code Sink} implementation designed for creating chains of
306      * sinks.  The {@code begin}, {@code end}, and
307      * {@code cancellationRequested} methods are wired to chain to the
308      * downstream {@code Sink}.  This implementation takes a downstream
309      * {@code Sink} of unknown input shape and produces a {@code Sink.OfLong}.
310      * The implementation of the {@code accept()} method must call the correct
311      * {@code accept()} method on the downstream {@code Sink}.
312      */
313     abstract static class ChainedLong<E_OUT> implements Sink.OfLong {
314         protected final Sink<? super E_OUT> downstream;
315 
ChainedLong(Sink<? super E_OUT> downstream)316         public ChainedLong(Sink<? super E_OUT> downstream) {
317             this.downstream = Objects.requireNonNull(downstream);
318         }
319 
320         @Override
begin(long size)321         public void begin(long size) {
322             downstream.begin(size);
323         }
324 
325         @Override
end()326         public void end() {
327             downstream.end();
328         }
329 
330         @Override
cancellationRequested()331         public boolean cancellationRequested() {
332             return downstream.cancellationRequested();
333         }
334     }
335 
336     /**
337      * Abstract {@code Sink} implementation designed for creating chains of
338      * sinks.  The {@code begin}, {@code end}, and
339      * {@code cancellationRequested} methods are wired to chain to the
340      * downstream {@code Sink}.  This implementation takes a downstream
341      * {@code Sink} of unknown input shape and produces a {@code Sink.OfDouble}.
342      * The implementation of the {@code accept()} method must call the correct
343      * {@code accept()} method on the downstream {@code Sink}.
344      */
345     abstract static class ChainedDouble<E_OUT> implements Sink.OfDouble {
346         protected final Sink<? super E_OUT> downstream;
347 
ChainedDouble(Sink<? super E_OUT> downstream)348         public ChainedDouble(Sink<? super E_OUT> downstream) {
349             this.downstream = Objects.requireNonNull(downstream);
350         }
351 
352         @Override
begin(long size)353         public void begin(long size) {
354             downstream.begin(size);
355         }
356 
357         @Override
end()358         public void end() {
359             downstream.end();
360         }
361 
362         @Override
cancellationRequested()363         public boolean cancellationRequested() {
364             return downstream.cancellationRequested();
365         }
366     }
367 }
368