1 /* 2 * Copyright (c) 2012, 2014, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 package java.util.stream; 26 27 import java.util.IntSummaryStatistics; 28 import java.util.Objects; 29 import java.util.OptionalDouble; 30 import java.util.OptionalInt; 31 import java.util.PrimitiveIterator; 32 import java.util.Spliterator; 33 import java.util.Spliterators; 34 import java.util.function.BiConsumer; 35 import java.util.function.BinaryOperator; 36 import java.util.function.IntBinaryOperator; 37 import java.util.function.IntConsumer; 38 import java.util.function.IntFunction; 39 import java.util.function.IntPredicate; 40 import java.util.function.IntToDoubleFunction; 41 import java.util.function.IntToLongFunction; 42 import java.util.function.IntUnaryOperator; 43 import java.util.function.ObjIntConsumer; 44 import java.util.function.Supplier; 45 46 /** 47 * Abstract base class for an intermediate pipeline stage or pipeline source 48 * stage implementing whose elements are of type {@code int}. 49 * 50 * @param <E_IN> type of elements in the upstream source 51 * @since 1.8 52 * @hide Visible for CTS testing only (OpenJDK8 tests). 53 */ 54 // Android-changed: Made public for CTS tests only. 55 public abstract class IntPipeline<E_IN> 56 extends AbstractPipeline<E_IN, Integer, IntStream> 57 implements IntStream { 58 59 /** 60 * Constructor for the head of a stream pipeline. 61 * 62 * @param source {@code Supplier<Spliterator>} describing the stream source 63 * @param sourceFlags The source flags for the stream source, described in 64 * {@link StreamOpFlag} 65 * @param parallel {@code true} if the pipeline is parallel 66 */ IntPipeline(Supplier<? extends Spliterator<Integer>> source, int sourceFlags, boolean parallel)67 IntPipeline(Supplier<? extends Spliterator<Integer>> source, 68 int sourceFlags, boolean parallel) { 69 super(source, sourceFlags, parallel); 70 } 71 72 /** 73 * Constructor for the head of a stream pipeline. 74 * 75 * @param source {@code Spliterator} describing the stream source 76 * @param sourceFlags The source flags for the stream source, described in 77 * {@link StreamOpFlag} 78 * @param parallel {@code true} if the pipeline is parallel 79 */ IntPipeline(Spliterator<Integer> source, int sourceFlags, boolean parallel)80 IntPipeline(Spliterator<Integer> source, 81 int sourceFlags, boolean parallel) { 82 super(source, sourceFlags, parallel); 83 } 84 85 /** 86 * Constructor for appending an intermediate operation onto an existing 87 * pipeline. 88 * 89 * @param upstream the upstream element source 90 * @param opFlags the operation flags for the new operation 91 */ IntPipeline(AbstractPipeline<?, E_IN, ?> upstream, int opFlags)92 IntPipeline(AbstractPipeline<?, E_IN, ?> upstream, int opFlags) { 93 super(upstream, opFlags); 94 } 95 96 /** 97 * Adapt a {@code Sink<Integer> to an {@code IntConsumer}, ideally simply 98 * by casting. 99 */ adapt(Sink<Integer> sink)100 private static IntConsumer adapt(Sink<Integer> sink) { 101 if (sink instanceof IntConsumer) { 102 return (IntConsumer) sink; 103 } 104 else { 105 if (Tripwire.ENABLED) 106 Tripwire.trip(AbstractPipeline.class, 107 "using IntStream.adapt(Sink<Integer> s)"); 108 return sink::accept; 109 } 110 } 111 112 /** 113 * Adapt a {@code Spliterator<Integer>} to a {@code Spliterator.OfInt}. 114 * 115 * @implNote 116 * The implementation attempts to cast to a Spliterator.OfInt, and throws an 117 * exception if this cast is not possible. 118 */ adapt(Spliterator<Integer> s)119 private static Spliterator.OfInt adapt(Spliterator<Integer> s) { 120 if (s instanceof Spliterator.OfInt) { 121 return (Spliterator.OfInt) s; 122 } 123 else { 124 if (Tripwire.ENABLED) 125 Tripwire.trip(AbstractPipeline.class, 126 "using IntStream.adapt(Spliterator<Integer> s)"); 127 throw new UnsupportedOperationException("IntStream.adapt(Spliterator<Integer> s)"); 128 } 129 } 130 131 132 // Shape-specific methods 133 134 @Override 135 // Android-changed: Make public, to match the method it's overriding. getOutputShape()136 public final StreamShape getOutputShape() { 137 return StreamShape.INT_VALUE; 138 } 139 140 @Override 141 // Android-changed: Make public, to match the method it's overriding. evaluateToNode(PipelineHelper<Integer> helper, Spliterator<P_IN> spliterator, boolean flattenTree, IntFunction<Integer[]> generator)142 public final <P_IN> Node<Integer> evaluateToNode(PipelineHelper<Integer> helper, 143 Spliterator<P_IN> spliterator, 144 boolean flattenTree, 145 IntFunction<Integer[]> generator) { 146 return Nodes.collectInt(helper, spliterator, flattenTree); 147 } 148 149 @Override 150 // Android-changed: Make public, to match the method it's overriding. wrap(PipelineHelper<Integer> ph, Supplier<Spliterator<P_IN>> supplier, boolean isParallel)151 public final <P_IN> Spliterator<Integer> wrap(PipelineHelper<Integer> ph, 152 Supplier<Spliterator<P_IN>> supplier, 153 boolean isParallel) { 154 return new StreamSpliterators.IntWrappingSpliterator<>(ph, supplier, isParallel); 155 } 156 157 @Override 158 @SuppressWarnings("unchecked") 159 // Android-changed: Make public, to match the method it's overriding. lazySpliterator(Supplier<? extends Spliterator<Integer>> supplier)160 public final Spliterator.OfInt lazySpliterator(Supplier<? extends Spliterator<Integer>> supplier) { 161 return new StreamSpliterators.DelegatingSpliterator.OfInt((Supplier<Spliterator.OfInt>) supplier); 162 } 163 164 @Override 165 // Android-changed: Make public, to match the method it's overriding. forEachWithCancel(Spliterator<Integer> spliterator, Sink<Integer> sink)166 public final void forEachWithCancel(Spliterator<Integer> spliterator, Sink<Integer> sink) { 167 Spliterator.OfInt spl = adapt(spliterator); 168 IntConsumer adaptedSink = adapt(sink); 169 do { } while (!sink.cancellationRequested() && spl.tryAdvance(adaptedSink)); 170 } 171 172 @Override 173 // Android-changed: Make public, to match the method it's overriding. makeNodeBuilder(long exactSizeIfKnown, IntFunction<Integer[]> generator)174 public final Node.Builder<Integer> makeNodeBuilder(long exactSizeIfKnown, 175 IntFunction<Integer[]> generator) { 176 return Nodes.intBuilder(exactSizeIfKnown); 177 } 178 179 180 // IntStream 181 182 @Override iterator()183 public final PrimitiveIterator.OfInt iterator() { 184 return Spliterators.iterator(spliterator()); 185 } 186 187 @Override spliterator()188 public final Spliterator.OfInt spliterator() { 189 return adapt(super.spliterator()); 190 } 191 192 // Stateless intermediate ops from IntStream 193 194 @Override asLongStream()195 public final LongStream asLongStream() { 196 return new LongPipeline.StatelessOp<Integer>(this, StreamShape.INT_VALUE, 197 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { 198 @Override 199 public Sink<Integer> opWrapSink(int flags, Sink<Long> sink) { 200 return new Sink.ChainedInt<Long>(sink) { 201 @Override 202 public void accept(int t) { 203 downstream.accept((long) t); 204 } 205 }; 206 } 207 }; 208 } 209 210 @Override 211 public final DoubleStream asDoubleStream() { 212 return new DoublePipeline.StatelessOp<Integer>(this, StreamShape.INT_VALUE, 213 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { 214 @Override 215 // Android-changed: Make public, to match the method it's overriding. 216 public Sink<Integer> opWrapSink(int flags, Sink<Double> sink) { 217 return new Sink.ChainedInt<Double>(sink) { 218 @Override 219 public void accept(int t) { 220 downstream.accept((double) t); 221 } 222 }; 223 } 224 }; 225 } 226 227 @Override 228 public final Stream<Integer> boxed() { 229 return mapToObj(Integer::valueOf); 230 } 231 232 @Override 233 public final IntStream map(IntUnaryOperator mapper) { 234 Objects.requireNonNull(mapper); 235 return new StatelessOp<Integer>(this, StreamShape.INT_VALUE, 236 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { 237 @Override 238 // Android-changed: Make public, to match the method it's overriding. 239 public Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) { 240 return new Sink.ChainedInt<Integer>(sink) { 241 @Override 242 public void accept(int t) { 243 downstream.accept(mapper.applyAsInt(t)); 244 } 245 }; 246 } 247 }; 248 } 249 250 @Override 251 public final <U> Stream<U> mapToObj(IntFunction<? extends U> mapper) { 252 Objects.requireNonNull(mapper); 253 return new ReferencePipeline.StatelessOp<Integer, U>(this, StreamShape.INT_VALUE, 254 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { 255 @Override 256 // Android-changed: Make public, to match the method it's overriding. 257 public Sink<Integer> opWrapSink(int flags, Sink<U> sink) { 258 return new Sink.ChainedInt<U>(sink) { 259 @Override 260 public void accept(int t) { 261 downstream.accept(mapper.apply(t)); 262 } 263 }; 264 } 265 }; 266 } 267 268 @Override 269 public final LongStream mapToLong(IntToLongFunction mapper) { 270 Objects.requireNonNull(mapper); 271 return new LongPipeline.StatelessOp<Integer>(this, StreamShape.INT_VALUE, 272 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { 273 @Override 274 // Android-changed: Make public, to match the method it's overriding. 275 public Sink<Integer> opWrapSink(int flags, Sink<Long> sink) { 276 return new Sink.ChainedInt<Long>(sink) { 277 @Override 278 public void accept(int t) { 279 downstream.accept(mapper.applyAsLong(t)); 280 } 281 }; 282 } 283 }; 284 } 285 286 @Override 287 public final DoubleStream mapToDouble(IntToDoubleFunction mapper) { 288 Objects.requireNonNull(mapper); 289 return new DoublePipeline.StatelessOp<Integer>(this, StreamShape.INT_VALUE, 290 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { 291 @Override 292 // Android-changed: Make public, to match the method it's overriding. 293 public Sink<Integer> opWrapSink(int flags, Sink<Double> sink) { 294 return new Sink.ChainedInt<Double>(sink) { 295 @Override 296 public void accept(int t) { 297 downstream.accept(mapper.applyAsDouble(t)); 298 } 299 }; 300 } 301 }; 302 } 303 304 @Override 305 public final IntStream flatMap(IntFunction<? extends IntStream> mapper) { 306 return new StatelessOp<Integer>(this, StreamShape.INT_VALUE, 307 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) { 308 @Override 309 // Android-changed: Make public, to match the method it's overriding. 310 public Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) { 311 return new Sink.ChainedInt<Integer>(sink) { 312 @Override 313 public void begin(long size) { 314 downstream.begin(-1); 315 } 316 317 @Override 318 public void accept(int t) { 319 try (IntStream result = mapper.apply(t)) { 320 // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it 321 if (result != null) 322 result.sequential().forEach(i -> downstream.accept(i)); 323 } 324 } 325 }; 326 } 327 }; 328 } 329 330 @Override 331 public IntStream unordered() { 332 if (!isOrdered()) 333 return this; 334 return new StatelessOp<Integer>(this, StreamShape.INT_VALUE, StreamOpFlag.NOT_ORDERED) { 335 @Override 336 // Android-changed: Make public, to match the method it's overriding. 337 public Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) { 338 return sink; 339 } 340 }; 341 } 342 343 @Override 344 public final IntStream filter(IntPredicate predicate) { 345 Objects.requireNonNull(predicate); 346 return new StatelessOp<Integer>(this, StreamShape.INT_VALUE, 347 StreamOpFlag.NOT_SIZED) { 348 @Override 349 // Android-changed: Make public, to match the method it's overriding. 350 public Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) { 351 return new Sink.ChainedInt<Integer>(sink) { 352 @Override 353 public void begin(long size) { 354 downstream.begin(-1); 355 } 356 357 @Override 358 public void accept(int t) { 359 if (predicate.test(t)) 360 downstream.accept(t); 361 } 362 }; 363 } 364 }; 365 } 366 367 @Override 368 public final IntStream peek(IntConsumer action) { 369 Objects.requireNonNull(action); 370 return new StatelessOp<Integer>(this, StreamShape.INT_VALUE, 371 0) { 372 @Override 373 // Android-changed: Make public, to match the method it's overriding. 374 public Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) { 375 return new Sink.ChainedInt<Integer>(sink) { 376 @Override 377 public void accept(int t) { 378 action.accept(t); 379 downstream.accept(t); 380 } 381 }; 382 } 383 }; 384 } 385 386 // Stateful intermediate ops from IntStream 387 388 @Override 389 public final IntStream limit(long maxSize) { 390 if (maxSize < 0) 391 throw new IllegalArgumentException(Long.toString(maxSize)); 392 return SliceOps.makeInt(this, 0, maxSize); 393 } 394 395 @Override 396 public final IntStream skip(long n) { 397 if (n < 0) 398 throw new IllegalArgumentException(Long.toString(n)); 399 if (n == 0) 400 return this; 401 else 402 return SliceOps.makeInt(this, n, -1); 403 } 404 405 @Override 406 public final IntStream sorted() { 407 return SortedOps.makeInt(this); 408 } 409 410 @Override 411 public final IntStream distinct() { 412 // While functional and quick to implement, this approach is not very efficient. 413 // An efficient version requires an int-specific map/set implementation. 414 return boxed().distinct().mapToInt(i -> i); 415 } 416 417 // Terminal ops from IntStream 418 419 @Override 420 public void forEach(IntConsumer action) { 421 evaluate(ForEachOps.makeInt(action, false)); 422 } 423 424 @Override 425 public void forEachOrdered(IntConsumer action) { 426 evaluate(ForEachOps.makeInt(action, true)); 427 } 428 429 @Override 430 public final int sum() { 431 return reduce(0, Integer::sum); 432 } 433 434 @Override 435 public final OptionalInt min() { 436 return reduce(Math::min); 437 } 438 439 @Override 440 public final OptionalInt max() { 441 return reduce(Math::max); 442 } 443 444 @Override 445 public final long count() { 446 return mapToLong(e -> 1L).sum(); 447 } 448 449 @Override 450 public final OptionalDouble average() { 451 long[] avg = collect(() -> new long[2], 452 (ll, i) -> { 453 ll[0]++; 454 ll[1] += i; 455 }, 456 (ll, rr) -> { 457 ll[0] += rr[0]; 458 ll[1] += rr[1]; 459 }); 460 return avg[0] > 0 461 ? OptionalDouble.of((double) avg[1] / avg[0]) 462 : OptionalDouble.empty(); 463 } 464 465 @Override 466 public final IntSummaryStatistics summaryStatistics() { 467 return collect(IntSummaryStatistics::new, IntSummaryStatistics::accept, 468 IntSummaryStatistics::combine); 469 } 470 471 @Override 472 public final int reduce(int identity, IntBinaryOperator op) { 473 return evaluate(ReduceOps.makeInt(identity, op)); 474 } 475 476 @Override 477 public final OptionalInt reduce(IntBinaryOperator op) { 478 return evaluate(ReduceOps.makeInt(op)); 479 } 480 481 @Override 482 public final <R> R collect(Supplier<R> supplier, 483 ObjIntConsumer<R> accumulator, 484 BiConsumer<R, R> combiner) { 485 BinaryOperator<R> operator = (left, right) -> { 486 combiner.accept(left, right); 487 return left; 488 }; 489 return evaluate(ReduceOps.makeInt(supplier, accumulator, operator)); 490 } 491 492 @Override 493 public final boolean anyMatch(IntPredicate predicate) { 494 return evaluate(MatchOps.makeInt(predicate, MatchOps.MatchKind.ANY)); 495 } 496 497 @Override 498 public final boolean allMatch(IntPredicate predicate) { 499 return evaluate(MatchOps.makeInt(predicate, MatchOps.MatchKind.ALL)); 500 } 501 502 @Override 503 public final boolean noneMatch(IntPredicate predicate) { 504 return evaluate(MatchOps.makeInt(predicate, MatchOps.MatchKind.NONE)); 505 } 506 507 @Override 508 public final OptionalInt findFirst() { 509 return evaluate(FindOps.makeInt(true)); 510 } 511 512 @Override 513 public final OptionalInt findAny() { 514 return evaluate(FindOps.makeInt(false)); 515 } 516 517 @Override 518 public final int[] toArray() { 519 return Nodes.flattenInt((Node.OfInt) evaluateToArrayNode(Integer[]::new)) 520 .asPrimitiveArray(); 521 } 522 523 // 524 525 /** 526 * Source stage of an IntStream. 527 * 528 * @param <E_IN> type of elements in the upstream source 529 * @since 1.8 530 * @hide Visible for CTS testing only (OpenJDK8 tests). 531 */ 532 // Android-changed: Made public for CTS tests only. 533 public static class Head<E_IN> extends IntPipeline<E_IN> { 534 /** 535 * Constructor for the source stage of an IntStream. 536 * 537 * @param source {@code Supplier<Spliterator>} describing the stream 538 * source 539 * @param sourceFlags the source flags for the stream source, described 540 * in {@link StreamOpFlag} 541 * @param parallel {@code true} if the pipeline is parallel 542 */ 543 // Android-changed: Made public for CTS tests only. 544 public Head(Supplier<? extends Spliterator<Integer>> source, 545 int sourceFlags, boolean parallel) { 546 super(source, sourceFlags, parallel); 547 } 548 549 /** 550 * Constructor for the source stage of an IntStream. 551 * 552 * @param source {@code Spliterator} describing the stream source 553 * @param sourceFlags the source flags for the stream source, described 554 * in {@link StreamOpFlag} 555 * @param parallel {@code true} if the pipeline is parallel 556 */ 557 // Android-changed: Made public for CTS tests only. 558 public Head(Spliterator<Integer> source, 559 int sourceFlags, boolean parallel) { 560 super(source, sourceFlags, parallel); 561 } 562 563 @Override 564 // Android-changed: Make public, to match the method it's overriding. 565 public final boolean opIsStateful() { 566 throw new UnsupportedOperationException(); 567 } 568 569 @Override 570 // Android-changed: Make public, to match the method it's overriding. 571 public final Sink<E_IN> opWrapSink(int flags, Sink<Integer> sink) { 572 throw new UnsupportedOperationException(); 573 } 574 575 // Optimized sequential terminal operations for the head of the pipeline 576 577 @Override 578 public void forEach(IntConsumer action) { 579 if (!isParallel()) { 580 adapt(sourceStageSpliterator()).forEachRemaining(action); 581 } 582 else { 583 super.forEach(action); 584 } 585 } 586 587 @Override 588 public void forEachOrdered(IntConsumer action) { 589 if (!isParallel()) { 590 adapt(sourceStageSpliterator()).forEachRemaining(action); 591 } 592 else { 593 super.forEachOrdered(action); 594 } 595 } 596 } 597 598 /** 599 * Base class for a stateless intermediate stage of an IntStream 600 * 601 * @param <E_IN> type of elements in the upstream source 602 * @since 1.8 603 * @hide Visible for CTS testing only (OpenJDK8 tests). 604 */ 605 // Android-changed: Made public for CTS tests only. 606 public abstract static class StatelessOp<E_IN> extends IntPipeline<E_IN> { 607 /** 608 * Construct a new IntStream by appending a stateless intermediate 609 * operation to an existing stream. 610 * @param upstream The upstream pipeline stage 611 * @param inputShape The stream shape for the upstream pipeline stage 612 * @param opFlags Operation flags for the new stage 613 */ 614 // Android-changed: Made public for CTS tests only. 615 public StatelessOp(AbstractPipeline<?, E_IN, ?> upstream, 616 StreamShape inputShape, 617 int opFlags) { 618 super(upstream, opFlags); 619 assert upstream.getOutputShape() == inputShape; 620 } 621 622 @Override 623 // Android-changed: Make public, to match the method it's overriding. 624 public final boolean opIsStateful() { 625 return false; 626 } 627 } 628 629 /** 630 * Base class for a stateful intermediate stage of an IntStream. 631 * 632 * @param <E_IN> type of elements in the upstream source 633 * @since 1.8 634 * @hide Visible for CTS testing only (OpenJDK8 tests). 635 */ 636 // Android-changed: Made public for CTS tests only. 637 public abstract static class StatefulOp<E_IN> extends IntPipeline<E_IN> { 638 /** 639 * Construct a new IntStream by appending a stateful intermediate 640 * operation to an existing stream. 641 * @param upstream The upstream pipeline stage 642 * @param inputShape The stream shape for the upstream pipeline stage 643 * @param opFlags Operation flags for the new stage 644 */ 645 // Android-changed: Made public for CTS tests only. 646 public StatefulOp(AbstractPipeline<?, E_IN, ?> upstream, 647 StreamShape inputShape, 648 int opFlags) { 649 super(upstream, opFlags); 650 assert upstream.getOutputShape() == inputShape; 651 } 652 653 @Override 654 // Android-changed: Make public, to match the method it's overriding. 655 public final boolean opIsStateful() { 656 return true; 657 } 658 659 @Override 660 // Android-changed: Make public, to match the method it's overriding. 661 public abstract <P_IN> Node<Integer> opEvaluateParallel(PipelineHelper<Integer> helper, 662 Spliterator<P_IN> spliterator, 663 IntFunction<Integer[]> generator); 664 } 665 } 666