1 /* 2 * Copyright (c) 2012, 2014, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 package java.util.stream; 26 27 import java.util.IntSummaryStatistics; 28 import java.util.Objects; 29 import java.util.OptionalDouble; 30 import java.util.OptionalInt; 31 import java.util.PrimitiveIterator; 32 import java.util.Spliterator; 33 import java.util.Spliterators; 34 import java.util.function.BiConsumer; 35 import java.util.function.BinaryOperator; 36 import java.util.function.IntBinaryOperator; 37 import java.util.function.IntConsumer; 38 import java.util.function.IntFunction; 39 import java.util.function.IntPredicate; 40 import java.util.function.IntToDoubleFunction; 41 import java.util.function.IntToLongFunction; 42 import java.util.function.IntUnaryOperator; 43 import java.util.function.ObjIntConsumer; 44 import java.util.function.Supplier; 45 46 /** 47 * Abstract base class for an intermediate pipeline stage or pipeline source 48 * stage implementing whose elements are of type {@code int}. 49 * 50 * @param <E_IN> type of elements in the upstream source 51 * @since 1.8 52 * @hide Visible for CTS testing only (OpenJDK8 tests). 53 */ 54 public abstract class IntPipeline<E_IN> 55 extends AbstractPipeline<E_IN, Integer, IntStream> 56 implements IntStream { 57 58 /** 59 * Constructor for the head of a stream pipeline. 60 * 61 * @param source {@code Supplier<Spliterator>} describing the stream source 62 * @param sourceFlags The source flags for the stream source, described in 63 * {@link StreamOpFlag} 64 * @param parallel {@code true} if the pipeline is parallel 65 */ IntPipeline(Supplier<? extends Spliterator<Integer>> source, int sourceFlags, boolean parallel)66 IntPipeline(Supplier<? extends Spliterator<Integer>> source, 67 int sourceFlags, boolean parallel) { 68 super(source, sourceFlags, parallel); 69 } 70 71 /** 72 * Constructor for the head of a stream pipeline. 73 * 74 * @param source {@code Spliterator} describing the stream source 75 * @param sourceFlags The source flags for the stream source, described in 76 * {@link StreamOpFlag} 77 * @param parallel {@code true} if the pipeline is parallel 78 */ IntPipeline(Spliterator<Integer> source, int sourceFlags, boolean parallel)79 IntPipeline(Spliterator<Integer> source, 80 int sourceFlags, boolean parallel) { 81 super(source, sourceFlags, parallel); 82 } 83 84 /** 85 * Constructor for appending an intermediate operation onto an existing 86 * pipeline. 87 * 88 * @param upstream the upstream element source 89 * @param opFlags the operation flags for the new operation 90 */ IntPipeline(AbstractPipeline<?, E_IN, ?> upstream, int opFlags)91 IntPipeline(AbstractPipeline<?, E_IN, ?> upstream, int opFlags) { 92 super(upstream, opFlags); 93 } 94 95 /** 96 * Adapt a {@code Sink<Integer> to an {@code IntConsumer}, ideally simply 97 * by casting. 98 */ adapt(Sink<Integer> sink)99 private static IntConsumer adapt(Sink<Integer> sink) { 100 if (sink instanceof IntConsumer) { 101 return (IntConsumer) sink; 102 } 103 else { 104 if (Tripwire.ENABLED) 105 Tripwire.trip(AbstractPipeline.class, 106 "using IntStream.adapt(Sink<Integer> s)"); 107 return sink::accept; 108 } 109 } 110 111 /** 112 * Adapt a {@code Spliterator<Integer>} to a {@code Spliterator.OfInt}. 113 * 114 * @implNote 115 * The implementation attempts to cast to a Spliterator.OfInt, and throws an 116 * exception if this cast is not possible. 117 */ adapt(Spliterator<Integer> s)118 private static Spliterator.OfInt adapt(Spliterator<Integer> s) { 119 if (s instanceof Spliterator.OfInt) { 120 return (Spliterator.OfInt) s; 121 } 122 else { 123 if (Tripwire.ENABLED) 124 Tripwire.trip(AbstractPipeline.class, 125 "using IntStream.adapt(Spliterator<Integer> s)"); 126 throw new UnsupportedOperationException("IntStream.adapt(Spliterator<Integer> s)"); 127 } 128 } 129 130 131 // Shape-specific methods 132 133 @Override getOutputShape()134 public final StreamShape getOutputShape() { 135 return StreamShape.INT_VALUE; 136 } 137 138 @Override evaluateToNode(PipelineHelper<Integer> helper, Spliterator<P_IN> spliterator, boolean flattenTree, IntFunction<Integer[]> generator)139 public final <P_IN> Node<Integer> evaluateToNode(PipelineHelper<Integer> helper, 140 Spliterator<P_IN> spliterator, 141 boolean flattenTree, 142 IntFunction<Integer[]> generator) { 143 return Nodes.collectInt(helper, spliterator, flattenTree); 144 } 145 146 @Override wrap(PipelineHelper<Integer> ph, Supplier<Spliterator<P_IN>> supplier, boolean isParallel)147 public final <P_IN> Spliterator<Integer> wrap(PipelineHelper<Integer> ph, 148 Supplier<Spliterator<P_IN>> supplier, 149 boolean isParallel) { 150 return new StreamSpliterators.IntWrappingSpliterator<>(ph, supplier, isParallel); 151 } 152 153 @Override 154 @SuppressWarnings("unchecked") lazySpliterator(Supplier<? extends Spliterator<Integer>> supplier)155 public final Spliterator.OfInt lazySpliterator(Supplier<? extends Spliterator<Integer>> supplier) { 156 return new StreamSpliterators.DelegatingSpliterator.OfInt((Supplier<Spliterator.OfInt>) supplier); 157 } 158 159 @Override forEachWithCancel(Spliterator<Integer> spliterator, Sink<Integer> sink)160 public final void forEachWithCancel(Spliterator<Integer> spliterator, Sink<Integer> sink) { 161 Spliterator.OfInt spl = adapt(spliterator); 162 IntConsumer adaptedSink = adapt(sink); 163 do { } while (!sink.cancellationRequested() && spl.tryAdvance(adaptedSink)); 164 } 165 166 @Override makeNodeBuilder(long exactSizeIfKnown, IntFunction<Integer[]> generator)167 public final Node.Builder<Integer> makeNodeBuilder(long exactSizeIfKnown, 168 IntFunction<Integer[]> generator) { 169 return Nodes.intBuilder(exactSizeIfKnown); 170 } 171 172 173 // IntStream 174 175 @Override iterator()176 public final PrimitiveIterator.OfInt iterator() { 177 return Spliterators.iterator(spliterator()); 178 } 179 180 @Override spliterator()181 public final Spliterator.OfInt spliterator() { 182 return adapt(super.spliterator()); 183 } 184 185 // Stateless intermediate ops from IntStream 186 187 @Override asLongStream()188 public final LongStream asLongStream() { 189 return new LongPipeline.StatelessOp<Integer>(this, StreamShape.INT_VALUE, 190 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { 191 @Override 192 public Sink<Integer> opWrapSink(int flags, Sink<Long> sink) { 193 return new Sink.ChainedInt<Long>(sink) { 194 @Override 195 public void accept(int t) { 196 downstream.accept((long) t); 197 } 198 }; 199 } 200 }; 201 } 202 203 @Override 204 public final DoubleStream asDoubleStream() { 205 return new DoublePipeline.StatelessOp<Integer>(this, StreamShape.INT_VALUE, 206 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { 207 @Override 208 public Sink<Integer> opWrapSink(int flags, Sink<Double> sink) { 209 return new Sink.ChainedInt<Double>(sink) { 210 @Override 211 public void accept(int t) { 212 downstream.accept((double) t); 213 } 214 }; 215 } 216 }; 217 } 218 219 @Override 220 public final Stream<Integer> boxed() { 221 return mapToObj(Integer::valueOf); 222 } 223 224 @Override 225 public final IntStream map(IntUnaryOperator mapper) { 226 Objects.requireNonNull(mapper); 227 return new StatelessOp<Integer>(this, StreamShape.INT_VALUE, 228 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { 229 @Override 230 public Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) { 231 return new Sink.ChainedInt<Integer>(sink) { 232 @Override 233 public void accept(int t) { 234 downstream.accept(mapper.applyAsInt(t)); 235 } 236 }; 237 } 238 }; 239 } 240 241 @Override 242 public final <U> Stream<U> mapToObj(IntFunction<? extends U> mapper) { 243 Objects.requireNonNull(mapper); 244 return new ReferencePipeline.StatelessOp<Integer, U>(this, StreamShape.INT_VALUE, 245 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { 246 @Override 247 public Sink<Integer> opWrapSink(int flags, Sink<U> sink) { 248 return new Sink.ChainedInt<U>(sink) { 249 @Override 250 public void accept(int t) { 251 downstream.accept(mapper.apply(t)); 252 } 253 }; 254 } 255 }; 256 } 257 258 @Override 259 public final LongStream mapToLong(IntToLongFunction mapper) { 260 Objects.requireNonNull(mapper); 261 return new LongPipeline.StatelessOp<Integer>(this, StreamShape.INT_VALUE, 262 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { 263 @Override 264 public Sink<Integer> opWrapSink(int flags, Sink<Long> sink) { 265 return new Sink.ChainedInt<Long>(sink) { 266 @Override 267 public void accept(int t) { 268 downstream.accept(mapper.applyAsLong(t)); 269 } 270 }; 271 } 272 }; 273 } 274 275 @Override 276 public final DoubleStream mapToDouble(IntToDoubleFunction mapper) { 277 Objects.requireNonNull(mapper); 278 return new DoublePipeline.StatelessOp<Integer>(this, StreamShape.INT_VALUE, 279 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { 280 @Override 281 public Sink<Integer> opWrapSink(int flags, Sink<Double> sink) { 282 return new Sink.ChainedInt<Double>(sink) { 283 @Override 284 public void accept(int t) { 285 downstream.accept(mapper.applyAsDouble(t)); 286 } 287 }; 288 } 289 }; 290 } 291 292 @Override 293 public final IntStream flatMap(IntFunction<? extends IntStream> mapper) { 294 return new StatelessOp<Integer>(this, StreamShape.INT_VALUE, 295 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) { 296 @Override 297 public Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) { 298 return new Sink.ChainedInt<Integer>(sink) { 299 @Override 300 public void begin(long size) { 301 downstream.begin(-1); 302 } 303 304 @Override 305 public void accept(int t) { 306 try (IntStream result = mapper.apply(t)) { 307 // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it 308 if (result != null) 309 result.sequential().forEach(i -> downstream.accept(i)); 310 } 311 } 312 }; 313 } 314 }; 315 } 316 317 @Override 318 public IntStream unordered() { 319 if (!isOrdered()) 320 return this; 321 return new StatelessOp<Integer>(this, StreamShape.INT_VALUE, StreamOpFlag.NOT_ORDERED) { 322 @Override 323 public Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) { 324 return sink; 325 } 326 }; 327 } 328 329 @Override 330 public final IntStream filter(IntPredicate predicate) { 331 Objects.requireNonNull(predicate); 332 return new StatelessOp<Integer>(this, StreamShape.INT_VALUE, 333 StreamOpFlag.NOT_SIZED) { 334 @Override 335 public Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) { 336 return new Sink.ChainedInt<Integer>(sink) { 337 @Override 338 public void begin(long size) { 339 downstream.begin(-1); 340 } 341 342 @Override 343 public void accept(int t) { 344 if (predicate.test(t)) 345 downstream.accept(t); 346 } 347 }; 348 } 349 }; 350 } 351 352 @Override 353 public final IntStream peek(IntConsumer action) { 354 Objects.requireNonNull(action); 355 return new StatelessOp<Integer>(this, StreamShape.INT_VALUE, 356 0) { 357 @Override 358 public Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) { 359 return new Sink.ChainedInt<Integer>(sink) { 360 @Override 361 public void accept(int t) { 362 action.accept(t); 363 downstream.accept(t); 364 } 365 }; 366 } 367 }; 368 } 369 370 // Stateful intermediate ops from IntStream 371 372 @Override 373 public final IntStream limit(long maxSize) { 374 if (maxSize < 0) 375 throw new IllegalArgumentException(Long.toString(maxSize)); 376 return SliceOps.makeInt(this, 0, maxSize); 377 } 378 379 @Override 380 public final IntStream skip(long n) { 381 if (n < 0) 382 throw new IllegalArgumentException(Long.toString(n)); 383 if (n == 0) 384 return this; 385 else 386 return SliceOps.makeInt(this, n, -1); 387 } 388 389 @Override 390 public final IntStream sorted() { 391 return SortedOps.makeInt(this); 392 } 393 394 @Override 395 public final IntStream distinct() { 396 // While functional and quick to implement, this approach is not very efficient. 397 // An efficient version requires an int-specific map/set implementation. 398 return boxed().distinct().mapToInt(i -> i); 399 } 400 401 // Terminal ops from IntStream 402 403 @Override 404 public void forEach(IntConsumer action) { 405 evaluate(ForEachOps.makeInt(action, false)); 406 } 407 408 @Override 409 public void forEachOrdered(IntConsumer action) { 410 evaluate(ForEachOps.makeInt(action, true)); 411 } 412 413 @Override 414 public final int sum() { 415 return reduce(0, Integer::sum); 416 } 417 418 @Override 419 public final OptionalInt min() { 420 return reduce(Math::min); 421 } 422 423 @Override 424 public final OptionalInt max() { 425 return reduce(Math::max); 426 } 427 428 @Override 429 public final long count() { 430 return mapToLong(e -> 1L).sum(); 431 } 432 433 @Override 434 public final OptionalDouble average() { 435 long[] avg = collect(() -> new long[2], 436 (ll, i) -> { 437 ll[0]++; 438 ll[1] += i; 439 }, 440 (ll, rr) -> { 441 ll[0] += rr[0]; 442 ll[1] += rr[1]; 443 }); 444 return avg[0] > 0 445 ? OptionalDouble.of((double) avg[1] / avg[0]) 446 : OptionalDouble.empty(); 447 } 448 449 @Override 450 public final IntSummaryStatistics summaryStatistics() { 451 return collect(IntSummaryStatistics::new, IntSummaryStatistics::accept, 452 IntSummaryStatistics::combine); 453 } 454 455 @Override 456 public final int reduce(int identity, IntBinaryOperator op) { 457 return evaluate(ReduceOps.makeInt(identity, op)); 458 } 459 460 @Override 461 public final OptionalInt reduce(IntBinaryOperator op) { 462 return evaluate(ReduceOps.makeInt(op)); 463 } 464 465 @Override 466 public final <R> R collect(Supplier<R> supplier, 467 ObjIntConsumer<R> accumulator, 468 BiConsumer<R, R> combiner) { 469 BinaryOperator<R> operator = (left, right) -> { 470 combiner.accept(left, right); 471 return left; 472 }; 473 return evaluate(ReduceOps.makeInt(supplier, accumulator, operator)); 474 } 475 476 @Override 477 public final boolean anyMatch(IntPredicate predicate) { 478 return evaluate(MatchOps.makeInt(predicate, MatchOps.MatchKind.ANY)); 479 } 480 481 @Override 482 public final boolean allMatch(IntPredicate predicate) { 483 return evaluate(MatchOps.makeInt(predicate, MatchOps.MatchKind.ALL)); 484 } 485 486 @Override 487 public final boolean noneMatch(IntPredicate predicate) { 488 return evaluate(MatchOps.makeInt(predicate, MatchOps.MatchKind.NONE)); 489 } 490 491 @Override 492 public final OptionalInt findFirst() { 493 return evaluate(FindOps.makeInt(true)); 494 } 495 496 @Override 497 public final OptionalInt findAny() { 498 return evaluate(FindOps.makeInt(false)); 499 } 500 501 @Override 502 public final int[] toArray() { 503 return Nodes.flattenInt((Node.OfInt) evaluateToArrayNode(Integer[]::new)) 504 .asPrimitiveArray(); 505 } 506 507 // 508 509 /** 510 * Source stage of an IntStream. 511 * 512 * @param <E_IN> type of elements in the upstream source 513 * @since 1.8 514 * @hide Visible for CTS testing only (OpenJDK8 tests). 515 */ 516 public static class Head<E_IN> extends IntPipeline<E_IN> { 517 /** 518 * Constructor for the source stage of an IntStream. 519 * 520 * @param source {@code Supplier<Spliterator>} describing the stream 521 * source 522 * @param sourceFlags the source flags for the stream source, described 523 * in {@link StreamOpFlag} 524 * @param parallel {@code true} if the pipeline is parallel 525 */ 526 public Head(Supplier<? extends Spliterator<Integer>> source, 527 int sourceFlags, boolean parallel) { 528 super(source, sourceFlags, parallel); 529 } 530 531 /** 532 * Constructor for the source stage of an IntStream. 533 * 534 * @param source {@code Spliterator} describing the stream source 535 * @param sourceFlags the source flags for the stream source, described 536 * in {@link StreamOpFlag} 537 * @param parallel {@code true} if the pipeline is parallel 538 */ 539 public Head(Spliterator<Integer> source, 540 int sourceFlags, boolean parallel) { 541 super(source, sourceFlags, parallel); 542 } 543 544 @Override 545 public final boolean opIsStateful() { 546 throw new UnsupportedOperationException(); 547 } 548 549 @Override 550 public final Sink<E_IN> opWrapSink(int flags, Sink<Integer> sink) { 551 throw new UnsupportedOperationException(); 552 } 553 554 // Optimized sequential terminal operations for the head of the pipeline 555 556 @Override 557 public void forEach(IntConsumer action) { 558 if (!isParallel()) { 559 adapt(sourceStageSpliterator()).forEachRemaining(action); 560 } 561 else { 562 super.forEach(action); 563 } 564 } 565 566 @Override 567 public void forEachOrdered(IntConsumer action) { 568 if (!isParallel()) { 569 adapt(sourceStageSpliterator()).forEachRemaining(action); 570 } 571 else { 572 super.forEachOrdered(action); 573 } 574 } 575 } 576 577 /** 578 * Base class for a stateless intermediate stage of an IntStream 579 * 580 * @param <E_IN> type of elements in the upstream source 581 * @since 1.8 582 * @hide Visible for CTS testing only (OpenJDK8 tests). 583 */ 584 public abstract static class StatelessOp<E_IN> extends IntPipeline<E_IN> { 585 /** 586 * Construct a new IntStream by appending a stateless intermediate 587 * operation to an existing stream. 588 * @param upstream The upstream pipeline stage 589 * @param inputShape The stream shape for the upstream pipeline stage 590 * @param opFlags Operation flags for the new stage 591 */ 592 public StatelessOp(AbstractPipeline<?, E_IN, ?> upstream, 593 StreamShape inputShape, 594 int opFlags) { 595 super(upstream, opFlags); 596 assert upstream.getOutputShape() == inputShape; 597 } 598 599 @Override 600 public final boolean opIsStateful() { 601 return false; 602 } 603 } 604 605 /** 606 * Base class for a stateful intermediate stage of an IntStream. 607 * 608 * @param <E_IN> type of elements in the upstream source 609 * @since 1.8 610 * @hide Visible for CTS testing only (OpenJDK8 tests). 611 */ 612 public abstract static class StatefulOp<E_IN> extends IntPipeline<E_IN> { 613 /** 614 * Construct a new IntStream by appending a stateful intermediate 615 * operation to an existing stream. 616 * @param upstream The upstream pipeline stage 617 * @param inputShape The stream shape for the upstream pipeline stage 618 * @param opFlags Operation flags for the new stage 619 */ 620 public StatefulOp(AbstractPipeline<?, E_IN, ?> upstream, 621 StreamShape inputShape, 622 int opFlags) { 623 super(upstream, opFlags); 624 assert upstream.getOutputShape() == inputShape; 625 } 626 627 @Override 628 public final boolean opIsStateful() { 629 return true; 630 } 631 632 @Override 633 public abstract <P_IN> Node<Integer> opEvaluateParallel(PipelineHelper<Integer> helper, 634 Spliterator<P_IN> spliterator, 635 IntFunction<Integer[]> generator); 636 } 637 } 638