/*
 * Copyright (c) 2013, 2014, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation. Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */
package java.util.stream;

import java.util.DoubleSummaryStatistics;
import java.util.Objects;
import java.util.OptionalDouble;
import java.util.PrimitiveIterator;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.DoubleBinaryOperator;
import java.util.function.DoubleConsumer;
import java.util.function.DoubleFunction;
import java.util.function.DoublePredicate;
import java.util.function.DoubleToIntFunction;
import java.util.function.DoubleToLongFunction;
import java.util.function.DoubleUnaryOperator;
import java.util.function.IntFunction;
import java.util.function.ObjDoubleConsumer;
import java.util.function.Supplier;

/**
 * Abstract base class for an intermediate pipeline stage or pipeline source
 * stage whose elements are of type {@code double}.
 *
 * @param <E_IN> type of elements in the upstream source
 *
 * @since 1.8
 * @hide Visible for CTS testing only (OpenJDK8 tests).
 */
// Android-changed: Made public for CTS tests only.
public abstract class DoublePipeline<E_IN>
        extends AbstractPipeline<E_IN, Double, DoubleStream>
        implements DoubleStream {

    /**
     * Constructor for the head of a stream pipeline.
     *
     * @param source {@code Supplier<Spliterator>} describing the stream source
     * @param sourceFlags the source flags for the stream source, described in
     *        {@link StreamOpFlag}
     * @param parallel {@code true} if the pipeline is parallel
     */
    DoublePipeline(Supplier<? extends Spliterator<Double>> source,
                   int sourceFlags, boolean parallel) {
        super(source, sourceFlags, parallel);
    }

    /**
     * Constructor for the head of a stream pipeline.
     *
     * @param source {@code Spliterator} describing the stream source
     * @param sourceFlags the source flags for the stream source, described in
     *        {@link StreamOpFlag}
     * @param parallel {@code true} if the pipeline is parallel
     */
    DoublePipeline(Spliterator<Double> source,
                   int sourceFlags, boolean parallel) {
        super(source, sourceFlags, parallel);
    }

    /**
     * Constructor for appending an intermediate operation onto an existing
     * pipeline.
     *
     * @param upstream the upstream element source.
     * @param opFlags the operation flags
     */
    DoublePipeline(AbstractPipeline<?, E_IN, ?> upstream, int opFlags) {
        super(upstream, opFlags);
    }

    /**
     * Adapt a {@code Sink<Double>} to a {@code DoubleConsumer}, ideally simply
     * by casting.
     *
     * @param sink the sink to adapt
     * @return {@code sink} itself when it already is a {@code DoubleConsumer},
     *         otherwise a method reference to {@code sink::accept}
     */
    private static DoubleConsumer adapt(Sink<Double> sink) {
        if (sink instanceof DoubleConsumer) {
            return (DoubleConsumer) sink;
        } else {
            if (Tripwire.ENABLED)
                Tripwire.trip(AbstractPipeline.class,
                              "using DoubleStream.adapt(Sink<Double> s)");
            return sink::accept;
        }
    }

    /**
     * Adapt a {@code Spliterator<Double>} to a {@code Spliterator.OfDouble}.
     *
     * @implNote
     * The implementation attempts to cast to a Spliterator.OfDouble, and throws
     * an exception if this cast is not possible.
     *
     * @param s the spliterator to adapt
     * @return {@code s} cast to {@code Spliterator.OfDouble}
     * @throws UnsupportedOperationException if {@code s} is not a
     *         {@code Spliterator.OfDouble}
     */
    private static Spliterator.OfDouble adapt(Spliterator<Double> s) {
        if (s instanceof Spliterator.OfDouble) {
            return (Spliterator.OfDouble) s;
        } else {
            if (Tripwire.ENABLED)
                Tripwire.trip(AbstractPipeline.class,
                              "using DoubleStream.adapt(Spliterator<Double> s)");
            throw new UnsupportedOperationException("DoubleStream.adapt(Spliterator<Double> s)");
        }
    }


    // Shape-specific methods

    @Override
    // Android-changed: Make public, to match the method it's overriding.
    public final StreamShape getOutputShape() {
        return StreamShape.DOUBLE_VALUE;
    }

    @Override
    // Android-changed: Make public, to match the method it's overriding.
    public final <P_IN> Node<Double> evaluateToNode(PipelineHelper<Double> helper,
                                                    Spliterator<P_IN> spliterator,
                                                    boolean flattenTree,
                                                    IntFunction<Double[]> generator) {
        // The generator is not used: the double-specific collector is called without it.
        return Nodes.collectDouble(helper, spliterator, flattenTree);
    }

    @Override
    // Android-changed: Make public, to match the method it's overriding.
    public final <P_IN> Spliterator<Double> wrap(PipelineHelper<Double> ph,
                                                 Supplier<Spliterator<P_IN>> supplier,
                                                 boolean isParallel) {
        return new StreamSpliterators.DoubleWrappingSpliterator<>(ph, supplier, isParallel);
    }

    @Override
    @SuppressWarnings("unchecked")
    // Android-changed: Make public, to match the method it's overriding.
    public final Spliterator.OfDouble lazySpliterator(Supplier<? extends Spliterator<Double>> supplier) {
        return new StreamSpliterators.DelegatingSpliterator.OfDouble((Supplier<Spliterator.OfDouble>) supplier);
    }

    /**
     * Pushes elements from {@code spliterator} into {@code sink} one at a time
     * until either the sink requests cancellation or the spliterator has no
     * more elements. Cancellation is checked before each element is advanced.
     *
     * @param spliterator the element source; must be a {@code Spliterator.OfDouble}
     * @param sink the destination for the elements
     * @return {@code true} if the loop stopped because the sink requested
     *         cancellation
     */
    @Override
    // Android-changed: Make public, to match the method it's overriding.
    public final boolean forEachWithCancel(Spliterator<Double> spliterator, Sink<Double> sink) {
        Spliterator.OfDouble spl = adapt(spliterator);
        DoubleConsumer adaptedSink = adapt(sink);
        boolean cancelled;
        do { } while (!(cancelled = sink.cancellationRequested()) && spl.tryAdvance(adaptedSink));
        return cancelled;
    }

    @Override
    // Android-changed: Make public, to match the method it's overriding.
    public final Node.Builder<Double> makeNodeBuilder(long exactSizeIfKnown, IntFunction<Double[]> generator) {
        // The generator is not used: the double-specific builder is created without it.
        return Nodes.doubleBuilder(exactSizeIfKnown);
    }


    // DoubleStream

    @Override
    public final PrimitiveIterator.OfDouble iterator() {
        return Spliterators.iterator(spliterator());
    }

    @Override
    public final Spliterator.OfDouble spliterator() {
        return adapt(super.spliterator());
    }

    // Stateless intermediate ops from DoubleStream

    @Override
    public final Stream<Double> boxed() {
        return mapToObj(Double::valueOf);
    }

    @Override
    public final DoubleStream map(DoubleUnaryOperator mapper) {
        Objects.requireNonNull(mapper);
        return new StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE,
                                       StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
            @Override
            // Android-changed: Make public, to match the method it's overriding.
            public Sink<Double> opWrapSink(int flags, Sink<Double> sink) {
                return new Sink.ChainedDouble<Double>(sink) {
                    @Override
                    public void accept(double t) {
                        downstream.accept(mapper.applyAsDouble(t));
                    }
                };
            }
        };
    }

    @Override
    public final <U> Stream<U> mapToObj(DoubleFunction<? extends U> mapper) {
        Objects.requireNonNull(mapper);
        return new ReferencePipeline.StatelessOp<Double, U>(this, StreamShape.DOUBLE_VALUE,
                                                            StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
            @Override
            // Android-changed: Make public, to match the method it's overriding.
            public Sink<Double> opWrapSink(int flags, Sink<U> sink) {
                return new Sink.ChainedDouble<U>(sink) {
                    @Override
                    public void accept(double t) {
                        downstream.accept(mapper.apply(t));
                    }
                };
            }
        };
    }

    @Override
    public final IntStream mapToInt(DoubleToIntFunction mapper) {
        Objects.requireNonNull(mapper);
        return new IntPipeline.StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE,
                                                   StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
            @Override
            // Android-changed: Make public, to match the method it's overriding.
            public Sink<Double> opWrapSink(int flags, Sink<Integer> sink) {
                return new Sink.ChainedDouble<Integer>(sink) {
                    @Override
                    public void accept(double t) {
                        downstream.accept(mapper.applyAsInt(t));
                    }
                };
            }
        };
    }

    @Override
    public final LongStream mapToLong(DoubleToLongFunction mapper) {
        Objects.requireNonNull(mapper);
        return new LongPipeline.StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE,
                                                    StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
            @Override
            // Android-changed: Make public, to match the method it's overriding.
            public Sink<Double> opWrapSink(int flags, Sink<Long> sink) {
                return new Sink.ChainedDouble<Long>(sink) {
                    @Override
                    public void accept(double t) {
                        downstream.accept(mapper.applyAsLong(t));
                    }
                };
            }
        };
    }

    /**
     * {@inheritDoc}
     *
     * @throws NullPointerException if {@code mapper} is {@code null}
     */
    @Override
    public final DoubleStream flatMap(DoubleFunction<? extends DoubleStream> mapper) {
        // Fail fast on a null mapper, consistent with map/mapToObj/mapToInt/mapToLong,
        // instead of deferring the failure until the first element is processed.
        Objects.requireNonNull(mapper);
        return new StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE,
                                       StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
            @Override
            // Android-changed: Make public, to match the method it's overriding.
            public Sink<Double> opWrapSink(int flags, Sink<Double> sink) {
                return new Sink.ChainedDouble<Double>(sink) {
                    @Override
                    public void begin(long size) {
                        // The upstream size is not forwarded; this stage is flagged NOT_SIZED.
                        downstream.begin(-1);
                    }

                    @Override
                    public void accept(double t) {
                        // try-with-resources closes the mapped stream once its contents
                        // have been pushed downstream; a null result is skipped.
                        try (DoubleStream result = mapper.apply(t)) {
                            // We can do better than this too; optimize for depth=0 case and just grab spliterator and forEach it
                            if (result != null)
                                result.sequential().forEach(i -> downstream.accept(i));
                        }
                    }
                };
            }
        };
    }

    @Override
    public DoubleStream unordered() {
        // Already unordered: no new stage is needed.
        if (!isOrdered())
            return this;
        return new StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE, StreamOpFlag.NOT_ORDERED) {
            @Override
            // Android-changed: Make public, to match the method it's overriding.
            public Sink<Double> opWrapSink(int flags, Sink<Double> sink) {
                // Only the flags change; elements pass straight through.
                return sink;
            }
        };
    }

    @Override
    public final DoubleStream filter(DoublePredicate predicate) {
        Objects.requireNonNull(predicate);
        return new StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE,
                                       StreamOpFlag.NOT_SIZED) {
            @Override
            // Android-changed: Make public, to match the method it's overriding.
            public Sink<Double> opWrapSink(int flags, Sink<Double> sink) {
                return new Sink.ChainedDouble<Double>(sink) {
                    @Override
                    public void begin(long size) {
                        // The upstream size is not forwarded; this stage is flagged NOT_SIZED.
                        downstream.begin(-1);
                    }

                    @Override
                    public void accept(double t) {
                        if (predicate.test(t))
                            downstream.accept(t);
                    }
                };
            }
        };
    }

    @Override
    public final DoubleStream peek(DoubleConsumer action) {
        Objects.requireNonNull(action);
        return new StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE,
                                       0) {
            @Override
            // Android-changed: Make public, to match the method it's overriding.
            public Sink<Double> opWrapSink(int flags, Sink<Double> sink) {
                return new Sink.ChainedDouble<Double>(sink) {
                    @Override
                    public void accept(double t) {
                        // The action sees the element before it is passed downstream.
                        action.accept(t);
                        downstream.accept(t);
                    }
                };
            }
        };
    }

    // Stateful intermediate ops from DoubleStream

    @Override
    public final DoubleStream limit(long maxSize) {
        if (maxSize < 0)
            throw new IllegalArgumentException(Long.toString(maxSize));
        return SliceOps.makeDouble(this, 0L, maxSize);
    }

    @Override
    public final DoubleStream skip(long n) {
        if (n < 0)
            throw new IllegalArgumentException(Long.toString(n));
        if (n == 0)
            return this;
        else {
            long limit = -1;
            return SliceOps.makeDouble(this, n, limit);
        }
    }

    @Override
    public final DoubleStream sorted() {
        return SortedOps.makeDouble(this);
    }

    @Override
    public final DoubleStream distinct() {
        // While functional and quick to implement, this approach is not very efficient.
        // An efficient version requires a double-specific map/set implementation.
        return boxed().distinct().mapToDouble(i -> (double) i);
    }

    // Terminal ops from DoubleStream

    @Override
    public void forEach(DoubleConsumer consumer) {
        evaluate(ForEachOps.makeDouble(consumer, false));
    }

    @Override
    public void forEachOrdered(DoubleConsumer consumer) {
        evaluate(ForEachOps.makeDouble(consumer, true));
    }

    @Override
    public final double sum() {
        /*
         * In the arrays allocated for the collect operation, index 0
         * holds the high-order bits of the running sum, index 1 holds
         * the low-order bits of the sum computed via compensated
         * summation, and index 2 holds the simple sum used to compute
         * the proper result if the stream contains infinite values of
         * the same sign.
         */
        double[] summation = collect(() -> new double[3],
                               (ll, d) -> {
                                   Collectors.sumWithCompensation(ll, d);
                                   ll[2] += d;
                               },
                               (ll, rr) -> {
                                   Collectors.sumWithCompensation(ll, rr[0]);
                                   Collectors.sumWithCompensation(ll, rr[1]);
                                   ll[2] += rr[2];
                               });

        return Collectors.computeFinalSum(summation);
    }

    @Override
    public final OptionalDouble min() {
        return reduce(Math::min);
    }

    @Override
    public final OptionalDouble max() {
        return reduce(Math::max);
    }

    /**
     * {@inheritDoc}
     *
     * @implNote The {@code double} format can represent all
     * consecutive integers in the range -2<sup>53</sup> to
     * 2<sup>53</sup>. If the pipeline has more than 2<sup>53</sup>
     * values, the divisor in the average computation will saturate at
     * 2<sup>53</sup>, leading to additional numerical errors.
     */
    @Override
    public final OptionalDouble average() {
        /*
         * In the arrays allocated for the collect operation, index 0
         * holds the high-order bits of the running sum, index 1 holds
         * the low-order bits of the sum computed via compensated
         * summation, index 2 holds the number of values seen, index 3
         * holds the simple sum.
         */
        double[] avg = collect(() -> new double[4],
                               (ll, d) -> {
                                   ll[2]++;
                                   Collectors.sumWithCompensation(ll, d);
                                   ll[3] += d;
                               },
                               (ll, rr) -> {
                                   Collectors.sumWithCompensation(ll, rr[0]);
                                   Collectors.sumWithCompensation(ll, rr[1]);
                                   ll[2] += rr[2];
                                   ll[3] += rr[3];
                               });
        // avg[2] is the element count; an empty stream has no average.
        return avg[2] > 0
            ? OptionalDouble.of(Collectors.computeFinalSum(avg) / avg[2])
            : OptionalDouble.empty();
    }

    @Override
    public final long count() {
        return mapToLong(e -> 1L).sum();
    }

    @Override
    public final DoubleSummaryStatistics summaryStatistics() {
        return collect(DoubleSummaryStatistics::new, DoubleSummaryStatistics::accept,
                       DoubleSummaryStatistics::combine);
    }

    @Override
    public final double reduce(double identity, DoubleBinaryOperator op) {
        return evaluate(ReduceOps.makeDouble(identity, op));
    }

    @Override
    public final OptionalDouble reduce(DoubleBinaryOperator op) {
        return evaluate(ReduceOps.makeDouble(op));
    }

    @Override
    public final <R> R collect(Supplier<R> supplier,
                               ObjDoubleConsumer<R> accumulator,
                               BiConsumer<R, R> combiner) {
        // Wrap the side-effecting combiner as an operator that returns the left container.
        BinaryOperator<R> operator = (left, right) -> {
            combiner.accept(left, right);
            return left;
        };
        return evaluate(ReduceOps.makeDouble(supplier, accumulator, operator));
    }

    @Override
    public final boolean anyMatch(DoublePredicate predicate) {
        return evaluate(MatchOps.makeDouble(predicate, MatchOps.MatchKind.ANY));
    }

    @Override
    public final boolean allMatch(DoublePredicate predicate) {
        return evaluate(MatchOps.makeDouble(predicate, MatchOps.MatchKind.ALL));
    }

    @Override
    public final boolean noneMatch(DoublePredicate predicate) {
        return evaluate(MatchOps.makeDouble(predicate, MatchOps.MatchKind.NONE));
    }

    @Override
    public final OptionalDouble findFirst() {
        return evaluate(FindOps.makeDouble(true));
    }

    @Override
    public final OptionalDouble findAny() {
        return evaluate(FindOps.makeDouble(false));
    }

    @Override
    public final double[] toArray() {
        return Nodes.flattenDouble((Node.OfDouble) evaluateToArrayNode(Double[]::new))
                        .asPrimitiveArray();
    }

    //

    /**
     * Source stage of a DoubleStream
     *
     * @param <E_IN> type of elements in the upstream source
     * @hide Made public for CTS tests only (OpenJDK 8 streams tests).
     */
    // Android-changed: Made public for CTS tests only.
    public static class Head<E_IN> extends DoublePipeline<E_IN> {
        /**
         * Constructor for the source stage of a DoubleStream.
         *
         * @param source {@code Supplier<Spliterator>} describing the stream
         *               source
         * @param sourceFlags the source flags for the stream source, described
         *                    in {@link StreamOpFlag}
         * @param parallel {@code true} if the pipeline is parallel
         */
        // Android-changed: Made public for CTS tests only.
        public Head(Supplier<? extends Spliterator<Double>> source,
                    int sourceFlags, boolean parallel) {
            super(source, sourceFlags, parallel);
        }

        /**
         * Constructor for the source stage of a DoubleStream.
         *
         * @param source {@code Spliterator} describing the stream source
         * @param sourceFlags the source flags for the stream source, described
         *                    in {@link StreamOpFlag}
         * @param parallel {@code true} if the pipeline is parallel
         */
        // Android-changed: Made public for CTS tests only.
        public Head(Spliterator<Double> source,
                    int sourceFlags, boolean parallel) {
            super(source, sourceFlags, parallel);
        }

        @Override
        // Android-changed: Made public for CTS tests only.
        public final boolean opIsStateful() {
            throw new UnsupportedOperationException();
        }

        @Override
        // Android-changed: Made public for CTS tests only.
        public final Sink<E_IN> opWrapSink(int flags, Sink<Double> sink) {
            throw new UnsupportedOperationException();
        }

        // Optimized sequential terminal operations for the head of the pipeline

        @Override
        public void forEach(DoubleConsumer consumer) {
            if (!isParallel()) {
                adapt(sourceStageSpliterator()).forEachRemaining(consumer);
            }
            else {
                super.forEach(consumer);
            }
        }

        @Override
        public void forEachOrdered(DoubleConsumer consumer) {
            if (!isParallel()) {
                adapt(sourceStageSpliterator()).forEachRemaining(consumer);
            }
            else {
                super.forEachOrdered(consumer);
            }
        }

    }

    /**
     * Base class for a stateless intermediate stage of a DoubleStream.
     *
     * @param <E_IN> type of elements in the upstream source
     * @since 1.8
     * @hide Visible for CTS testing only (OpenJDK8 tests).
     */
    // Android-changed: Made public for CTS tests only.
    public abstract static class StatelessOp<E_IN> extends DoublePipeline<E_IN> {
        /**
         * Construct a new DoubleStream by appending a stateless intermediate
         * operation to an existing stream.
         *
         * @param upstream the upstream pipeline stage
         * @param inputShape the stream shape for the upstream pipeline stage
         * @param opFlags operation flags for the new stage
         */
        // Android-changed: Made public for CTS tests only.
        public StatelessOp(AbstractPipeline<?, E_IN, ?> upstream,
                           StreamShape inputShape,
                           int opFlags) {
            super(upstream, opFlags);
            assert upstream.getOutputShape() == inputShape;
        }

        @Override
        // Android-changed: Make public, to match the method it's overriding.
        public final boolean opIsStateful() {
            return false;
        }
    }

    /**
     * Base class for a stateful intermediate stage of a DoubleStream.
     *
     * @param <E_IN> type of elements in the upstream source
     * @since 1.8
     * @hide Visible for CTS testing only (OpenJDK8 tests).
     */
    // Android-changed: Made public for CTS tests only.
    public abstract static class StatefulOp<E_IN> extends DoublePipeline<E_IN> {
        /**
         * Construct a new DoubleStream by appending a stateful intermediate
         * operation to an existing stream.
         *
         * @param upstream the upstream pipeline stage
         * @param inputShape the stream shape for the upstream pipeline stage
         * @param opFlags operation flags for the new stage
         */
        // Android-changed: Made public for CTS tests only.
        public StatefulOp(AbstractPipeline<?, E_IN, ?> upstream,
                          StreamShape inputShape,
                          int opFlags) {
            super(upstream, opFlags);
            assert upstream.getOutputShape() == inputShape;
        }

        @Override
        // Android-changed: Make public, to match the method it's overriding.
        public final boolean opIsStateful() {
            return true;
        }

        @Override
        // Android-changed: Make public, to match the method it's overriding.
        public abstract <P_IN> Node<Double> opEvaluateParallel(PipelineHelper<Double> helper,
                                                               Spliterator<P_IN> spliterator,
                                                               IntFunction<Double[]> generator);
    }
}