1 /* 2 * Copyright (c) 2012, 2014, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 package java.util.stream; 26 27 import java.util.IntSummaryStatistics; 28 import java.util.Objects; 29 import java.util.OptionalDouble; 30 import java.util.OptionalInt; 31 import java.util.PrimitiveIterator; 32 import java.util.Spliterator; 33 import java.util.Spliterators; 34 import java.util.function.BiConsumer; 35 import java.util.function.BinaryOperator; 36 import java.util.function.IntBinaryOperator; 37 import java.util.function.IntConsumer; 38 import java.util.function.IntFunction; 39 import java.util.function.IntPredicate; 40 import java.util.function.IntToDoubleFunction; 41 import java.util.function.IntToLongFunction; 42 import java.util.function.IntUnaryOperator; 43 import java.util.function.ObjIntConsumer; 44 import java.util.function.Supplier; 45 46 /** 47 * Abstract base class for an intermediate pipeline stage or pipeline source 48 * stage implementing whose elements are of type {@code int}. 49 * 50 * @param <E_IN> type of elements in the upstream source 51 * @since 1.8 52 * @hide Visible for CTS testing only (OpenJDK8 tests). 53 */ 54 // Android-changed: Made public for CTS tests only. 55 public abstract class IntPipeline<E_IN> 56 extends AbstractPipeline<E_IN, Integer, IntStream> 57 implements IntStream { 58 59 /** 60 * Constructor for the head of a stream pipeline. 61 * 62 * @param source {@code Supplier<Spliterator>} describing the stream source 63 * @param sourceFlags The source flags for the stream source, described in 64 * {@link StreamOpFlag} 65 * @param parallel {@code true} if the pipeline is parallel 66 */ IntPipeline(Supplier<? extends Spliterator<Integer>> source, int sourceFlags, boolean parallel)67 IntPipeline(Supplier<? extends Spliterator<Integer>> source, 68 int sourceFlags, boolean parallel) { 69 super(source, sourceFlags, parallel); 70 } 71 72 /** 73 * Constructor for the head of a stream pipeline. 74 * 75 * @param source {@code Spliterator} describing the stream source 76 * @param sourceFlags The source flags for the stream source, described in 77 * {@link StreamOpFlag} 78 * @param parallel {@code true} if the pipeline is parallel 79 */ IntPipeline(Spliterator<Integer> source, int sourceFlags, boolean parallel)80 IntPipeline(Spliterator<Integer> source, 81 int sourceFlags, boolean parallel) { 82 super(source, sourceFlags, parallel); 83 } 84 85 /** 86 * Constructor for appending an intermediate operation onto an existing 87 * pipeline. 88 * 89 * @param upstream the upstream element source 90 * @param opFlags the operation flags for the new operation 91 */ IntPipeline(AbstractPipeline<?, E_IN, ?> upstream, int opFlags)92 IntPipeline(AbstractPipeline<?, E_IN, ?> upstream, int opFlags) { 93 super(upstream, opFlags); 94 } 95 96 /** 97 * Adapt a {@code Sink<Integer> to an {@code IntConsumer}, ideally simply 98 * by casting. 99 */ adapt(Sink<Integer> sink)100 private static IntConsumer adapt(Sink<Integer> sink) { 101 if (sink instanceof IntConsumer) { 102 return (IntConsumer) sink; 103 } 104 else { 105 if (Tripwire.ENABLED) 106 Tripwire.trip(AbstractPipeline.class, 107 "using IntStream.adapt(Sink<Integer> s)"); 108 return sink::accept; 109 } 110 } 111 112 /** 113 * Adapt a {@code Spliterator<Integer>} to a {@code Spliterator.OfInt}. 114 * 115 * @implNote 116 * The implementation attempts to cast to a Spliterator.OfInt, and throws an 117 * exception if this cast is not possible. 118 */ adapt(Spliterator<Integer> s)119 private static Spliterator.OfInt adapt(Spliterator<Integer> s) { 120 if (s instanceof Spliterator.OfInt) { 121 return (Spliterator.OfInt) s; 122 } 123 else { 124 if (Tripwire.ENABLED) 125 Tripwire.trip(AbstractPipeline.class, 126 "using IntStream.adapt(Spliterator<Integer> s)"); 127 throw new UnsupportedOperationException("IntStream.adapt(Spliterator<Integer> s)"); 128 } 129 } 130 131 132 // Shape-specific methods 133 134 @Override 135 // Android-changed: Make public, to match the method it's overriding. getOutputShape()136 public final StreamShape getOutputShape() { 137 return StreamShape.INT_VALUE; 138 } 139 140 @Override 141 // Android-changed: Make public, to match the method it's overriding. evaluateToNode(PipelineHelper<Integer> helper, Spliterator<P_IN> spliterator, boolean flattenTree, IntFunction<Integer[]> generator)142 public final <P_IN> Node<Integer> evaluateToNode(PipelineHelper<Integer> helper, 143 Spliterator<P_IN> spliterator, 144 boolean flattenTree, 145 IntFunction<Integer[]> generator) { 146 return Nodes.collectInt(helper, spliterator, flattenTree); 147 } 148 149 @Override 150 // Android-changed: Make public, to match the method it's overriding. wrap(PipelineHelper<Integer> ph, Supplier<Spliterator<P_IN>> supplier, boolean isParallel)151 public final <P_IN> Spliterator<Integer> wrap(PipelineHelper<Integer> ph, 152 Supplier<Spliterator<P_IN>> supplier, 153 boolean isParallel) { 154 return new StreamSpliterators.IntWrappingSpliterator<>(ph, supplier, isParallel); 155 } 156 157 @Override 158 @SuppressWarnings("unchecked") 159 // Android-changed: Make public, to match the method it's overriding. lazySpliterator(Supplier<? extends Spliterator<Integer>> supplier)160 public final Spliterator.OfInt lazySpliterator(Supplier<? extends Spliterator<Integer>> supplier) { 161 return new StreamSpliterators.DelegatingSpliterator.OfInt((Supplier<Spliterator.OfInt>) supplier); 162 } 163 164 @Override 165 // Android-changed: Make public, to match the method it's overriding. forEachWithCancel(Spliterator<Integer> spliterator, Sink<Integer> sink)166 public final boolean forEachWithCancel(Spliterator<Integer> spliterator, Sink<Integer> sink) { 167 Spliterator.OfInt spl = adapt(spliterator); 168 IntConsumer adaptedSink = adapt(sink); 169 boolean cancelled; 170 do { } while (!(cancelled = sink.cancellationRequested()) && spl.tryAdvance(adaptedSink)); 171 return cancelled; 172 } 173 174 @Override 175 // Android-changed: Make public, to match the method it's overriding. makeNodeBuilder(long exactSizeIfKnown, IntFunction<Integer[]> generator)176 public final Node.Builder<Integer> makeNodeBuilder(long exactSizeIfKnown, 177 IntFunction<Integer[]> generator) { 178 return Nodes.intBuilder(exactSizeIfKnown); 179 } 180 181 182 // IntStream 183 184 @Override iterator()185 public final PrimitiveIterator.OfInt iterator() { 186 return Spliterators.iterator(spliterator()); 187 } 188 189 @Override spliterator()190 public final Spliterator.OfInt spliterator() { 191 return adapt(super.spliterator()); 192 } 193 194 // Stateless intermediate ops from IntStream 195 196 @Override asLongStream()197 public final LongStream asLongStream() { 198 return new LongPipeline.StatelessOp<Integer>(this, StreamShape.INT_VALUE, 199 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { 200 @Override 201 public Sink<Integer> opWrapSink(int flags, Sink<Long> sink) { 202 return new Sink.ChainedInt<Long>(sink) { 203 @Override 204 public void accept(int t) { 205 downstream.accept((long) t); 206 } 207 }; 208 } 209 }; 210 } 211 212 @Override 213 public final DoubleStream asDoubleStream() { 214 return new DoublePipeline.StatelessOp<Integer>(this, StreamShape.INT_VALUE, 215 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { 216 @Override 217 // Android-changed: Make public, to match the method it's overriding. 218 public Sink<Integer> opWrapSink(int flags, Sink<Double> sink) { 219 return new Sink.ChainedInt<Double>(sink) { 220 @Override 221 public void accept(int t) { 222 downstream.accept((double) t); 223 } 224 }; 225 } 226 }; 227 } 228 229 @Override 230 public final Stream<Integer> boxed() { 231 return mapToObj(Integer::valueOf); 232 } 233 234 @Override 235 public final IntStream map(IntUnaryOperator mapper) { 236 Objects.requireNonNull(mapper); 237 return new StatelessOp<Integer>(this, StreamShape.INT_VALUE, 238 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { 239 @Override 240 // Android-changed: Make public, to match the method it's overriding. 241 public Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) { 242 return new Sink.ChainedInt<Integer>(sink) { 243 @Override 244 public void accept(int t) { 245 downstream.accept(mapper.applyAsInt(t)); 246 } 247 }; 248 } 249 }; 250 } 251 252 @Override 253 public final <U> Stream<U> mapToObj(IntFunction<? extends U> mapper) { 254 Objects.requireNonNull(mapper); 255 return new ReferencePipeline.StatelessOp<Integer, U>(this, StreamShape.INT_VALUE, 256 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { 257 @Override 258 // Android-changed: Make public, to match the method it's overriding. 259 public Sink<Integer> opWrapSink(int flags, Sink<U> sink) { 260 return new Sink.ChainedInt<U>(sink) { 261 @Override 262 public void accept(int t) { 263 downstream.accept(mapper.apply(t)); 264 } 265 }; 266 } 267 }; 268 } 269 270 @Override 271 public final LongStream mapToLong(IntToLongFunction mapper) { 272 Objects.requireNonNull(mapper); 273 return new LongPipeline.StatelessOp<Integer>(this, StreamShape.INT_VALUE, 274 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { 275 @Override 276 // Android-changed: Make public, to match the method it's overriding. 277 public Sink<Integer> opWrapSink(int flags, Sink<Long> sink) { 278 return new Sink.ChainedInt<Long>(sink) { 279 @Override 280 public void accept(int t) { 281 downstream.accept(mapper.applyAsLong(t)); 282 } 283 }; 284 } 285 }; 286 } 287 288 @Override 289 public final DoubleStream mapToDouble(IntToDoubleFunction mapper) { 290 Objects.requireNonNull(mapper); 291 return new DoublePipeline.StatelessOp<Integer>(this, StreamShape.INT_VALUE, 292 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { 293 @Override 294 // Android-changed: Make public, to match the method it's overriding. 295 public Sink<Integer> opWrapSink(int flags, Sink<Double> sink) { 296 return new Sink.ChainedInt<Double>(sink) { 297 @Override 298 public void accept(int t) { 299 downstream.accept(mapper.applyAsDouble(t)); 300 } 301 }; 302 } 303 }; 304 } 305 306 @Override 307 public final IntStream flatMap(IntFunction<? extends IntStream> mapper) { 308 return new StatelessOp<Integer>(this, StreamShape.INT_VALUE, 309 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) { 310 @Override 311 // Android-changed: Make public, to match the method it's overriding. 312 public Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) { 313 return new Sink.ChainedInt<Integer>(sink) { 314 @Override 315 public void begin(long size) { 316 downstream.begin(-1); 317 } 318 319 @Override 320 public void accept(int t) { 321 try (IntStream result = mapper.apply(t)) { 322 // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it 323 if (result != null) 324 result.sequential().forEach(i -> downstream.accept(i)); 325 } 326 } 327 }; 328 } 329 }; 330 } 331 332 @Override 333 public IntStream unordered() { 334 if (!isOrdered()) 335 return this; 336 return new StatelessOp<Integer>(this, StreamShape.INT_VALUE, StreamOpFlag.NOT_ORDERED) { 337 @Override 338 // Android-changed: Make public, to match the method it's overriding. 339 public Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) { 340 return sink; 341 } 342 }; 343 } 344 345 @Override 346 public final IntStream filter(IntPredicate predicate) { 347 Objects.requireNonNull(predicate); 348 return new StatelessOp<Integer>(this, StreamShape.INT_VALUE, 349 StreamOpFlag.NOT_SIZED) { 350 @Override 351 // Android-changed: Make public, to match the method it's overriding. 352 public Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) { 353 return new Sink.ChainedInt<Integer>(sink) { 354 @Override 355 public void begin(long size) { 356 downstream.begin(-1); 357 } 358 359 @Override 360 public void accept(int t) { 361 if (predicate.test(t)) 362 downstream.accept(t); 363 } 364 }; 365 } 366 }; 367 } 368 369 @Override 370 public final IntStream peek(IntConsumer action) { 371 Objects.requireNonNull(action); 372 return new StatelessOp<Integer>(this, StreamShape.INT_VALUE, 373 0) { 374 @Override 375 // Android-changed: Make public, to match the method it's overriding. 376 public Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) { 377 return new Sink.ChainedInt<Integer>(sink) { 378 @Override 379 public void accept(int t) { 380 action.accept(t); 381 downstream.accept(t); 382 } 383 }; 384 } 385 }; 386 } 387 388 // Stateful intermediate ops from IntStream 389 390 @Override 391 public final IntStream limit(long maxSize) { 392 if (maxSize < 0) 393 throw new IllegalArgumentException(Long.toString(maxSize)); 394 return SliceOps.makeInt(this, 0, maxSize); 395 } 396 397 @Override 398 public final IntStream skip(long n) { 399 if (n < 0) 400 throw new IllegalArgumentException(Long.toString(n)); 401 if (n == 0) 402 return this; 403 else 404 return SliceOps.makeInt(this, n, -1); 405 } 406 407 @Override 408 public final IntStream sorted() { 409 return SortedOps.makeInt(this); 410 } 411 412 @Override 413 public final IntStream distinct() { 414 // While functional and quick to implement, this approach is not very efficient. 415 // An efficient version requires an int-specific map/set implementation. 416 return boxed().distinct().mapToInt(i -> i); 417 } 418 419 // Terminal ops from IntStream 420 421 @Override 422 public void forEach(IntConsumer action) { 423 evaluate(ForEachOps.makeInt(action, false)); 424 } 425 426 @Override 427 public void forEachOrdered(IntConsumer action) { 428 evaluate(ForEachOps.makeInt(action, true)); 429 } 430 431 @Override 432 public final int sum() { 433 return reduce(0, Integer::sum); 434 } 435 436 @Override 437 public final OptionalInt min() { 438 return reduce(Math::min); 439 } 440 441 @Override 442 public final OptionalInt max() { 443 return reduce(Math::max); 444 } 445 446 @Override 447 public final long count() { 448 return mapToLong(e -> 1L).sum(); 449 } 450 451 @Override 452 public final OptionalDouble average() { 453 long[] avg = collect(() -> new long[2], 454 (ll, i) -> { 455 ll[0]++; 456 ll[1] += i; 457 }, 458 (ll, rr) -> { 459 ll[0] += rr[0]; 460 ll[1] += rr[1]; 461 }); 462 return avg[0] > 0 463 ? OptionalDouble.of((double) avg[1] / avg[0]) 464 : OptionalDouble.empty(); 465 } 466 467 @Override 468 public final IntSummaryStatistics summaryStatistics() { 469 return collect(IntSummaryStatistics::new, IntSummaryStatistics::accept, 470 IntSummaryStatistics::combine); 471 } 472 473 @Override 474 public final int reduce(int identity, IntBinaryOperator op) { 475 return evaluate(ReduceOps.makeInt(identity, op)); 476 } 477 478 @Override 479 public final OptionalInt reduce(IntBinaryOperator op) { 480 return evaluate(ReduceOps.makeInt(op)); 481 } 482 483 @Override 484 public final <R> R collect(Supplier<R> supplier, 485 ObjIntConsumer<R> accumulator, 486 BiConsumer<R, R> combiner) { 487 BinaryOperator<R> operator = (left, right) -> { 488 combiner.accept(left, right); 489 return left; 490 }; 491 return evaluate(ReduceOps.makeInt(supplier, accumulator, operator)); 492 } 493 494 @Override 495 public final boolean anyMatch(IntPredicate predicate) { 496 return evaluate(MatchOps.makeInt(predicate, MatchOps.MatchKind.ANY)); 497 } 498 499 @Override 500 public final boolean allMatch(IntPredicate predicate) { 501 return evaluate(MatchOps.makeInt(predicate, MatchOps.MatchKind.ALL)); 502 } 503 504 @Override 505 public final boolean noneMatch(IntPredicate predicate) { 506 return evaluate(MatchOps.makeInt(predicate, MatchOps.MatchKind.NONE)); 507 } 508 509 @Override 510 public final OptionalInt findFirst() { 511 return evaluate(FindOps.makeInt(true)); 512 } 513 514 @Override 515 public final OptionalInt findAny() { 516 return evaluate(FindOps.makeInt(false)); 517 } 518 519 @Override 520 public final int[] toArray() { 521 return Nodes.flattenInt((Node.OfInt) evaluateToArrayNode(Integer[]::new)) 522 .asPrimitiveArray(); 523 } 524 525 // 526 527 /** 528 * Source stage of an IntStream. 529 * 530 * @param <E_IN> type of elements in the upstream source 531 * @since 1.8 532 * @hide Visible for CTS testing only (OpenJDK8 tests). 533 */ 534 // Android-changed: Made public for CTS tests only. 535 public static class Head<E_IN> extends IntPipeline<E_IN> { 536 /** 537 * Constructor for the source stage of an IntStream. 538 * 539 * @param source {@code Supplier<Spliterator>} describing the stream 540 * source 541 * @param sourceFlags the source flags for the stream source, described 542 * in {@link StreamOpFlag} 543 * @param parallel {@code true} if the pipeline is parallel 544 */ 545 // Android-changed: Made public for CTS tests only. 546 public Head(Supplier<? extends Spliterator<Integer>> source, 547 int sourceFlags, boolean parallel) { 548 super(source, sourceFlags, parallel); 549 } 550 551 /** 552 * Constructor for the source stage of an IntStream. 553 * 554 * @param source {@code Spliterator} describing the stream source 555 * @param sourceFlags the source flags for the stream source, described 556 * in {@link StreamOpFlag} 557 * @param parallel {@code true} if the pipeline is parallel 558 */ 559 // Android-changed: Made public for CTS tests only. 560 public Head(Spliterator<Integer> source, 561 int sourceFlags, boolean parallel) { 562 super(source, sourceFlags, parallel); 563 } 564 565 @Override 566 // Android-changed: Make public, to match the method it's overriding. 567 public final boolean opIsStateful() { 568 throw new UnsupportedOperationException(); 569 } 570 571 @Override 572 // Android-changed: Make public, to match the method it's overriding. 573 public final Sink<E_IN> opWrapSink(int flags, Sink<Integer> sink) { 574 throw new UnsupportedOperationException(); 575 } 576 577 // Optimized sequential terminal operations for the head of the pipeline 578 579 @Override 580 public void forEach(IntConsumer action) { 581 if (!isParallel()) { 582 adapt(sourceStageSpliterator()).forEachRemaining(action); 583 } 584 else { 585 super.forEach(action); 586 } 587 } 588 589 @Override 590 public void forEachOrdered(IntConsumer action) { 591 if (!isParallel()) { 592 adapt(sourceStageSpliterator()).forEachRemaining(action); 593 } 594 else { 595 super.forEachOrdered(action); 596 } 597 } 598 } 599 600 /** 601 * Base class for a stateless intermediate stage of an IntStream 602 * 603 * @param <E_IN> type of elements in the upstream source 604 * @since 1.8 605 * @hide Visible for CTS testing only (OpenJDK8 tests). 606 */ 607 // Android-changed: Made public for CTS tests only. 608 public abstract static class StatelessOp<E_IN> extends IntPipeline<E_IN> { 609 /** 610 * Construct a new IntStream by appending a stateless intermediate 611 * operation to an existing stream. 612 * @param upstream The upstream pipeline stage 613 * @param inputShape The stream shape for the upstream pipeline stage 614 * @param opFlags Operation flags for the new stage 615 */ 616 // Android-changed: Made public for CTS tests only. 617 public StatelessOp(AbstractPipeline<?, E_IN, ?> upstream, 618 StreamShape inputShape, 619 int opFlags) { 620 super(upstream, opFlags); 621 assert upstream.getOutputShape() == inputShape; 622 } 623 624 @Override 625 // Android-changed: Make public, to match the method it's overriding. 626 public final boolean opIsStateful() { 627 return false; 628 } 629 } 630 631 /** 632 * Base class for a stateful intermediate stage of an IntStream. 633 * 634 * @param <E_IN> type of elements in the upstream source 635 * @since 1.8 636 * @hide Visible for CTS testing only (OpenJDK8 tests). 637 */ 638 // Android-changed: Made public for CTS tests only. 639 public abstract static class StatefulOp<E_IN> extends IntPipeline<E_IN> { 640 /** 641 * Construct a new IntStream by appending a stateful intermediate 642 * operation to an existing stream. 643 * @param upstream The upstream pipeline stage 644 * @param inputShape The stream shape for the upstream pipeline stage 645 * @param opFlags Operation flags for the new stage 646 */ 647 // Android-changed: Made public for CTS tests only. 648 public StatefulOp(AbstractPipeline<?, E_IN, ?> upstream, 649 StreamShape inputShape, 650 int opFlags) { 651 super(upstream, opFlags); 652 assert upstream.getOutputShape() == inputShape; 653 } 654 655 @Override 656 // Android-changed: Make public, to match the method it's overriding. 657 public final boolean opIsStateful() { 658 return true; 659 } 660 661 @Override 662 // Android-changed: Make public, to match the method it's overriding. 663 public abstract <P_IN> Node<Integer> opEvaluateParallel(PipelineHelper<Integer> helper, 664 Spliterator<P_IN> spliterator, 665 IntFunction<Integer[]> generator); 666 } 667 } 668