/*
 * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation. Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */
package java.util.stream;

import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.DoubleConsumer;
import java.util.function.IntConsumer;
import java.util.function.LongConsumer;

/**
 * An extension of {@link Consumer} used to conduct values through the stages of
 * a stream pipeline, with additional methods to manage size information,
 * control flow, etc. Before calling the {@code accept()} method on a
 * {@code Sink} for the first time, you must first call the {@code begin()}
 * method to inform it that data is coming (optionally informing the sink how
 * much data is coming), and after all data has been sent, you must call the
 * {@code end()} method.
 * After calling {@code end()}, you should not call
 * {@code accept()} without again calling {@code begin()}. {@code Sink} also
 * offers a mechanism by which the sink can cooperatively signal that it does
 * not wish to receive any more data (the {@code cancellationRequested()}
 * method), which a source can poll before sending more data to the
 * {@code Sink}.
 *
 * <p>A sink may be in one of two states: an initial state and an active state.
 * It starts out in the initial state; the {@code begin()} method transitions
 * it to the active state, and the {@code end()} method transitions it back into
 * the initial state, where it can be re-used. Data-accepting methods (such as
 * {@code accept()}) are only valid in the active state.
 *
 * @apiNote
 * A stream pipeline consists of a source, zero or more intermediate stages
 * (such as filtering or mapping), and a terminal stage, such as reduction or
 * for-each. For concreteness, consider the pipeline:
 *
 * <pre>{@code
 *     OptionalInt longestStringLengthStartingWithA
 *         = strings.stream()
 *                  .filter(s -> s.startsWith("A"))
 *                  .mapToInt(String::length)
 *                  .max();
 * }</pre>
 *
 * <p>Here, we have three stages: filtering, mapping, and reducing. The
 * filtering stage consumes strings and emits a subset of those strings; the
 * mapping stage consumes strings and emits ints; the reduction stage consumes
 * those ints and computes the maximal value.
 *
 * <p>A {@code Sink} instance is used to represent each stage of this pipeline,
 * whether the stage accepts objects, ints, longs, or doubles. Sink has entry
 * points for {@code accept(Object)}, {@code accept(int)}, etc., so that we do
 * not need a specialized interface for each primitive specialization. (It
 * might be called a "kitchen sink" for this omnivorous tendency.)
 * The entry point to the pipeline is the {@code Sink} for the filtering stage,
 * which sends some elements "downstream" -- into the {@code Sink} for the
 * mapping stage, which in turn sends integral values downstream into the
 * {@code Sink} for the reduction stage. The {@code Sink} implementation
 * associated with a given stage is expected to know the data type for the next
 * stage, and to call the correct {@code accept} method on its downstream
 * {@code Sink}. Similarly, each stage must implement the correct
 * {@code accept} method corresponding to the data type it accepts.
 *
 * <p>The specialized subtypes such as {@link Sink.OfInt} override
 * {@code accept(Object)} to call the appropriate primitive specialization of
 * {@code accept}, implement the appropriate primitive specialization of
 * {@code Consumer}, and re-abstract the appropriate primitive specialization of
 * {@code accept}.
 *
 * <p>The chaining subtypes such as {@link ChainedInt} not only implement
 * {@code Sink.OfInt}, but also maintain a {@code downstream} field which
 * represents the downstream {@code Sink}, and implement the methods
 * {@code begin()}, {@code end()}, and {@code cancellationRequested()} to
 * delegate to the downstream {@code Sink}. Most implementations of
 * intermediate operations will use these chaining wrappers. For example, the
 * mapping stage in the above example would look like:
 *
 * <pre>{@code
 *     Sink<U> mappingSink = new Sink.ChainedReference<U, Integer>(sink) {
 *         public void accept(U u) {
 *             downstream.accept(mapper.applyAsInt(u));
 *         }
 *     };
 * }</pre>
 *
 * <p>Here, we implement {@code Sink.ChainedReference<U, Integer>}, meaning that
 * we expect to receive elements of type {@code U} as input, and pass the
 * downstream sink to the constructor. Because the next stage expects to receive
 * integers, we must call the {@code accept(int)} method when emitting values to
 * the downstream.
 * The {@code accept()} method applies the mapping function from {@code U} to
 * {@code int} and passes the resulting value to the downstream {@code Sink}.
 *
 * @param <T> type of elements for value streams
 * @since 1.8
 * @hide Visible for CTS testing only (OpenJDK8 tests).
 */
// Android-changed: Made public for CTS tests only.
public interface Sink<T> extends Consumer<T> {
    /**
     * Resets the sink state to receive a fresh data set. This must be called
     * before sending any data to the sink. After calling {@link #end()},
     * you may call this method to reset the sink for another calculation.
     *
     * <p>Prior to this call, the sink must be in the initial state, and after
     * this call it is in the active state.
     *
     * @param size The exact size of the data to be pushed downstream, if
     * known, or {@code -1} if unknown or infinite.
     */
    default void begin(long size) {}

    /**
     * Indicates that all elements have been pushed. If the {@code Sink} is
     * stateful, it should send any stored state downstream at this time, and
     * should clear any accumulated state (and associated resources).
     *
     * <p>Prior to this call, the sink must be in the active state, and after
     * this call it is returned to the initial state.
     */
    default void end() {}

    /**
     * Indicates that this {@code Sink} does not wish to receive any more data.
     *
     * @implSpec The default implementation always returns false.
     *
     * @return true if cancellation is requested
     */
    default boolean cancellationRequested() {
        return false;
    }

    /**
     * Accepts an int value.
     *
     * @implSpec The default implementation throws IllegalStateException.
     *
     * @throws IllegalStateException if this sink does not accept int values
     */
    default void accept(int value) {
        throw new IllegalStateException("called wrong accept method");
    }

    /**
     * Accepts a long value.
     *
     * @implSpec The default implementation throws IllegalStateException.
     *
     * @throws IllegalStateException if this sink does not accept long values
     */
    default void accept(long value) {
        throw new IllegalStateException("called wrong accept method");
    }

    /**
     * Accepts a double value.
     *
     * @implSpec The default implementation throws IllegalStateException.
     *
     * @throws IllegalStateException if this sink does not accept double values
     */
    default void accept(double value) {
        throw new IllegalStateException("called wrong accept method");
    }

    /**
     * {@code Sink} that implements {@code Sink<Integer>}, re-abstracts
     * {@code accept(int)}, and wires {@code accept(Integer)} to bridge to
     * {@code accept(int)}.
     */
    @SuppressWarnings("overloads")
    interface OfInt extends Sink<Integer>, IntConsumer {
        @Override
        void accept(int value);

        @Override
        default void accept(Integer i) {
            if (Tripwire.ENABLED)
                Tripwire.trip(getClass(), "{0} calling Sink.OfInt.accept(Integer)");
            accept(i.intValue());
        }
    }

    /**
     * {@code Sink} that implements {@code Sink<Long>}, re-abstracts
     * {@code accept(long)}, and wires {@code accept(Long)} to bridge to
     * {@code accept(long)}.
     */
    @SuppressWarnings("overloads")
    interface OfLong extends Sink<Long>, LongConsumer {
        @Override
        void accept(long value);

        @Override
        default void accept(Long i) {
            if (Tripwire.ENABLED)
                Tripwire.trip(getClass(), "{0} calling Sink.OfLong.accept(Long)");
            accept(i.longValue());
        }
    }

    /**
     * {@code Sink} that implements {@code Sink<Double>}, re-abstracts
     * {@code accept(double)}, and wires {@code accept(Double)} to bridge to
     * {@code accept(double)}.
     */
    @SuppressWarnings("overloads")
    interface OfDouble extends Sink<Double>, DoubleConsumer {
        @Override
        void accept(double value);

        @Override
        default void accept(Double i) {
            if (Tripwire.ENABLED)
                Tripwire.trip(getClass(), "{0} calling Sink.OfDouble.accept(Double)");
            accept(i.doubleValue());
        }
    }

    /**
     * Abstract {@code Sink} implementation for creating chains of
     * sinks. The {@code begin}, {@code end}, and
     * {@code cancellationRequested} methods are wired to chain to the
     * downstream {@code Sink}. This implementation takes a downstream
     * {@code Sink} of unknown input shape and produces a {@code Sink<T>}. The
     * implementation of the {@code accept()} method must call the correct
     * {@code accept()} method on the downstream {@code Sink}.
     */
    abstract static class ChainedReference<T, E_OUT> implements Sink<T> {
        protected final Sink<? super E_OUT> downstream;

        public ChainedReference(Sink<? super E_OUT> downstream) {
            this.downstream = Objects.requireNonNull(downstream);
        }

        @Override
        public void begin(long size) {
            downstream.begin(size);
        }

        @Override
        public void end() {
            downstream.end();
        }

        @Override
        public boolean cancellationRequested() {
            return downstream.cancellationRequested();
        }
    }

    /**
     * Abstract {@code Sink} implementation designed for creating chains of
     * sinks. The {@code begin}, {@code end}, and
     * {@code cancellationRequested} methods are wired to chain to the
     * downstream {@code Sink}. This implementation takes a downstream
     * {@code Sink} of unknown input shape and produces a {@code Sink.OfInt}.
     * The implementation of the {@code accept()} method must call the correct
     * {@code accept()} method on the downstream {@code Sink}.
     */
    abstract static class ChainedInt<E_OUT> implements Sink.OfInt {
        protected final Sink<? super E_OUT> downstream;

        public ChainedInt(Sink<? super E_OUT> downstream) {
            this.downstream = Objects.requireNonNull(downstream);
        }

        @Override
        public void begin(long size) {
            downstream.begin(size);
        }

        @Override
        public void end() {
            downstream.end();
        }

        @Override
        public boolean cancellationRequested() {
            return downstream.cancellationRequested();
        }
    }

    /**
     * Abstract {@code Sink} implementation designed for creating chains of
     * sinks. The {@code begin}, {@code end}, and
     * {@code cancellationRequested} methods are wired to chain to the
     * downstream {@code Sink}. This implementation takes a downstream
     * {@code Sink} of unknown input shape and produces a {@code Sink.OfLong}.
     * The implementation of the {@code accept()} method must call the correct
     * {@code accept()} method on the downstream {@code Sink}.
     */
    abstract static class ChainedLong<E_OUT> implements Sink.OfLong {
        protected final Sink<? super E_OUT> downstream;

        public ChainedLong(Sink<? super E_OUT> downstream) {
            this.downstream = Objects.requireNonNull(downstream);
        }

        @Override
        public void begin(long size) {
            downstream.begin(size);
        }

        @Override
        public void end() {
            downstream.end();
        }

        @Override
        public boolean cancellationRequested() {
            return downstream.cancellationRequested();
        }
    }

    /**
     * Abstract {@code Sink} implementation designed for creating chains of
     * sinks. The {@code begin}, {@code end}, and
     * {@code cancellationRequested} methods are wired to chain to the
     * downstream {@code Sink}. This implementation takes a downstream
     * {@code Sink} of unknown input shape and produces a {@code Sink.OfDouble}.
     * The implementation of the {@code accept()} method must call the correct
     * {@code accept()} method on the downstream {@code Sink}.
     */
    abstract static class ChainedDouble<E_OUT> implements Sink.OfDouble {
        protected final Sink<? super E_OUT> downstream;

        public ChainedDouble(Sink<? super E_OUT> downstream) {
            this.downstream = Objects.requireNonNull(downstream);
        }

        @Override
        public void begin(long size) {
            downstream.begin(size);
        }

        @Override
        public void end() {
            downstream.end();
        }

        @Override
        public boolean cancellationRequested() {
            return downstream.cancellationRequested();
        }
    }
}
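
/*
Illustrative sketch, not part of the original file. The class documentation
describes a begin/accept/end protocol, a cancellation poll, and stages chained
through a downstream field. The block below walks through that protocol for
the filter, map, max pipeline from the class documentation. Because Sink is
not accessible outside java.util.stream, the sketch uses hypothetical stand-in
types (MiniSink, ChainedStage, MaxSink, SinkSketch) that mirror only the
methods needed here; none of these names exist in the JDK, and the driver loop
is an assumption about how a source might push elements, not the actual
stream implementation.

```java
import java.util.List;
import java.util.Objects;
import java.util.OptionalInt;

// Hypothetical stand-in for Sink: lifecycle methods plus two accept overloads.
interface MiniSink<T> {
    default void begin(long size) {}
    default void end() {}
    default boolean cancellationRequested() { return false; }
    void accept(T t);
    default void accept(int value) {
        throw new IllegalStateException("called wrong accept method");
    }
}

// Hypothetical analogue of ChainedReference: delegates lifecycle calls downstream.
abstract class ChainedStage<T, E_OUT> implements MiniSink<T> {
    protected final MiniSink<? super E_OUT> downstream;

    ChainedStage(MiniSink<? super E_OUT> downstream) {
        this.downstream = Objects.requireNonNull(downstream);
    }

    @Override public void begin(long size) { downstream.begin(size); }
    @Override public void end() { downstream.end(); }
    @Override public boolean cancellationRequested() {
        return downstream.cancellationRequested();
    }
}

// Terminal stage: tracks the largest int seen since the last begin().
final class MaxSink implements MiniSink<Integer> {
    private boolean seen;
    private int max;

    @Override public void begin(long size) { seen = false; max = 0; }
    @Override public void accept(int value) {
        if (!seen || value > max) { seen = true; max = value; }
    }
    // Boxed entry point bridges to the primitive one, as Sink.OfInt does.
    @Override public void accept(Integer boxed) { accept(boxed.intValue()); }

    OptionalInt result() {
        return seen ? OptionalInt.of(max) : OptionalInt.empty();
    }
}

final class SinkSketch {
    static OptionalInt longestLengthStartingWithA(List<String> strings) {
        MaxSink max = new MaxSink();
        // Mapping stage: receives String, emits int via accept(int).
        MiniSink<String> mapping = new ChainedStage<String, Integer>(max) {
            @Override public void accept(String s) {
                downstream.accept(s.length());
            }
        };
        // Filtering stage: output size is unknown, so it reports -1 downstream.
        MiniSink<String> filtering = new ChainedStage<String, String>(mapping) {
            @Override public void begin(long size) { downstream.begin(-1); }
            @Override public void accept(String s) {
                if (s.startsWith("A")) downstream.accept(s);
            }
        };
        // Source side of the protocol: begin, push while not cancelled, end.
        filtering.begin(strings.size());
        for (String s : strings) {
            if (filtering.cancellationRequested()) break;
            filtering.accept(s);
        }
        filtering.end();
        return max.result();
    }

    public static void main(String[] args) {
        System.out.println(
            longestLengthStartingWithA(List.of("Apple", "Banana", "Avocado")));
    }
}
```

The mapping stage passes a primitive int, so overload resolution selects
accept(int) on the downstream sink without boxing, which is the behaviour the
class documentation asks each stage to arrange for its downstream.
*/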