1 /* 2 * Copyright (c) 2013, 2014, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 package java.util.stream; 26 27 import java.util.DoubleSummaryStatistics; 28 import java.util.Objects; 29 import java.util.OptionalDouble; 30 import java.util.PrimitiveIterator; 31 import java.util.Spliterator; 32 import java.util.Spliterators; 33 import java.util.function.BiConsumer; 34 import java.util.function.BinaryOperator; 35 import java.util.function.DoubleBinaryOperator; 36 import java.util.function.DoubleConsumer; 37 import java.util.function.DoubleFunction; 38 import java.util.function.DoublePredicate; 39 import java.util.function.DoubleToIntFunction; 40 import java.util.function.DoubleToLongFunction; 41 import java.util.function.DoubleUnaryOperator; 42 import java.util.function.IntFunction; 43 import java.util.function.ObjDoubleConsumer; 44 import java.util.function.Supplier; 45 46 /** 47 * Abstract base class for an intermediate pipeline stage or pipeline source 48 * stage implementing whose elements are of type {@code double}. 49 * 50 * @param <E_IN> type of elements in the upstream source 51 * 52 * @since 1.8 53 * @hide Visible for CTS testing only (OpenJDK8 tests). 54 */ 55 // Android-changed: Made public for CTS tests only. 56 public abstract class DoublePipeline<E_IN> 57 extends AbstractPipeline<E_IN, Double, DoubleStream> 58 implements DoubleStream { 59 60 /** 61 * Constructor for the head of a stream pipeline. 62 * 63 * @param source {@code Supplier<Spliterator>} describing the stream source 64 * @param sourceFlags the source flags for the stream source, described in 65 * {@link StreamOpFlag} 66 */ DoublePipeline(Supplier<? extends Spliterator<Double>> source, int sourceFlags, boolean parallel)67 DoublePipeline(Supplier<? extends Spliterator<Double>> source, 68 int sourceFlags, boolean parallel) { 69 super(source, sourceFlags, parallel); 70 } 71 72 /** 73 * Constructor for the head of a stream pipeline. 74 * 75 * @param source {@code Spliterator} describing the stream source 76 * @param sourceFlags the source flags for the stream source, described in 77 * {@link StreamOpFlag} 78 */ DoublePipeline(Spliterator<Double> source, int sourceFlags, boolean parallel)79 DoublePipeline(Spliterator<Double> source, 80 int sourceFlags, boolean parallel) { 81 super(source, sourceFlags, parallel); 82 } 83 84 /** 85 * Constructor for appending an intermediate operation onto an existing 86 * pipeline. 87 * 88 * @param upstream the upstream element source. 89 * @param opFlags the operation flags 90 */ DoublePipeline(AbstractPipeline<?, E_IN, ?> upstream, int opFlags)91 DoublePipeline(AbstractPipeline<?, E_IN, ?> upstream, int opFlags) { 92 super(upstream, opFlags); 93 } 94 95 /** 96 * Adapt a {@code Sink<Double> to a {@code DoubleConsumer}, ideally simply 97 * by casting. 98 */ adapt(Sink<Double> sink)99 private static DoubleConsumer adapt(Sink<Double> sink) { 100 if (sink instanceof DoubleConsumer) { 101 return (DoubleConsumer) sink; 102 } else { 103 if (Tripwire.ENABLED) 104 Tripwire.trip(AbstractPipeline.class, 105 "using DoubleStream.adapt(Sink<Double> s)"); 106 return sink::accept; 107 } 108 } 109 110 /** 111 * Adapt a {@code Spliterator<Double>} to a {@code Spliterator.OfDouble}. 112 * 113 * @implNote 114 * The implementation attempts to cast to a Spliterator.OfDouble, and throws 115 * an exception if this cast is not possible. 116 */ adapt(Spliterator<Double> s)117 private static Spliterator.OfDouble adapt(Spliterator<Double> s) { 118 if (s instanceof Spliterator.OfDouble) { 119 return (Spliterator.OfDouble) s; 120 } else { 121 if (Tripwire.ENABLED) 122 Tripwire.trip(AbstractPipeline.class, 123 "using DoubleStream.adapt(Spliterator<Double> s)"); 124 throw new UnsupportedOperationException("DoubleStream.adapt(Spliterator<Double> s)"); 125 } 126 } 127 128 129 // Shape-specific methods 130 131 @Override 132 // Android-changed: Make public, to match the method it's overriding. getOutputShape()133 public final StreamShape getOutputShape() { 134 return StreamShape.DOUBLE_VALUE; 135 } 136 137 @Override 138 // Android-changed: Make public, to match the method it's overriding. evaluateToNode(PipelineHelper<Double> helper, Spliterator<P_IN> spliterator, boolean flattenTree, IntFunction<Double[]> generator)139 public final <P_IN> Node<Double> evaluateToNode(PipelineHelper<Double> helper, 140 Spliterator<P_IN> spliterator, 141 boolean flattenTree, 142 IntFunction<Double[]> generator) { 143 return Nodes.collectDouble(helper, spliterator, flattenTree); 144 } 145 146 @Override 147 // Android-changed: Make public, to match the method it's overriding. wrap(PipelineHelper<Double> ph, Supplier<Spliterator<P_IN>> supplier, boolean isParallel)148 public final <P_IN> Spliterator<Double> wrap(PipelineHelper<Double> ph, 149 Supplier<Spliterator<P_IN>> supplier, 150 boolean isParallel) { 151 return new StreamSpliterators.DoubleWrappingSpliterator<>(ph, supplier, isParallel); 152 } 153 154 @Override 155 @SuppressWarnings("unchecked") 156 // Android-changed: Make public, to match the method it's overriding. lazySpliterator(Supplier<? extends Spliterator<Double>> supplier)157 public final Spliterator.OfDouble lazySpliterator(Supplier<? extends Spliterator<Double>> supplier) { 158 return new StreamSpliterators.DelegatingSpliterator.OfDouble((Supplier<Spliterator.OfDouble>) supplier); 159 } 160 161 @Override 162 // Android-changed: Make public, to match the method it's overriding. forEachWithCancel(Spliterator<Double> spliterator, Sink<Double> sink)163 public final void forEachWithCancel(Spliterator<Double> spliterator, Sink<Double> sink) { 164 Spliterator.OfDouble spl = adapt(spliterator); 165 DoubleConsumer adaptedSink = adapt(sink); 166 do { } while (!sink.cancellationRequested() && spl.tryAdvance(adaptedSink)); 167 } 168 169 @Override 170 // Android-changed: Make public, to match the method it's overriding. makeNodeBuilder(long exactSizeIfKnown, IntFunction<Double[]> generator)171 public final Node.Builder<Double> makeNodeBuilder(long exactSizeIfKnown, IntFunction<Double[]> generator) { 172 return Nodes.doubleBuilder(exactSizeIfKnown); 173 } 174 175 176 // DoubleStream 177 178 @Override iterator()179 public final PrimitiveIterator.OfDouble iterator() { 180 return Spliterators.iterator(spliterator()); 181 } 182 183 @Override spliterator()184 public final Spliterator.OfDouble spliterator() { 185 return adapt(super.spliterator()); 186 } 187 188 // Stateless intermediate ops from DoubleStream 189 190 @Override boxed()191 public final Stream<Double> boxed() { 192 return mapToObj(Double::valueOf); 193 } 194 195 @Override map(DoubleUnaryOperator mapper)196 public final DoubleStream map(DoubleUnaryOperator mapper) { 197 Objects.requireNonNull(mapper); 198 return new StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE, 199 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { 200 @Override 201 // Android-changed: Make public, to match the method it's overriding. 202 public Sink<Double> opWrapSink(int flags, Sink<Double> sink) { 203 return new Sink.ChainedDouble<Double>(sink) { 204 @Override 205 public void accept(double t) { 206 downstream.accept(mapper.applyAsDouble(t)); 207 } 208 }; 209 } 210 }; 211 } 212 213 @Override 214 public final <U> Stream<U> mapToObj(DoubleFunction<? extends U> mapper) { 215 Objects.requireNonNull(mapper); 216 return new ReferencePipeline.StatelessOp<Double, U>(this, StreamShape.DOUBLE_VALUE, 217 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { 218 @Override 219 // Android-changed: Make public, to match the method it's overriding. 220 public Sink<Double> opWrapSink(int flags, Sink<U> sink) { 221 return new Sink.ChainedDouble<U>(sink) { 222 @Override 223 public void accept(double t) { 224 downstream.accept(mapper.apply(t)); 225 } 226 }; 227 } 228 }; 229 } 230 231 @Override 232 public final IntStream mapToInt(DoubleToIntFunction mapper) { 233 Objects.requireNonNull(mapper); 234 return new IntPipeline.StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE, 235 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { 236 @Override 237 // Android-changed: Make public, to match the method it's overriding. 238 public Sink<Double> opWrapSink(int flags, Sink<Integer> sink) { 239 return new Sink.ChainedDouble<Integer>(sink) { 240 @Override 241 public void accept(double t) { 242 downstream.accept(mapper.applyAsInt(t)); 243 } 244 }; 245 } 246 }; 247 } 248 249 @Override 250 public final LongStream mapToLong(DoubleToLongFunction mapper) { 251 Objects.requireNonNull(mapper); 252 return new LongPipeline.StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE, 253 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { 254 @Override 255 // Android-changed: Make public, to match the method it's overriding. 256 public Sink<Double> opWrapSink(int flags, Sink<Long> sink) { 257 return new Sink.ChainedDouble<Long>(sink) { 258 @Override 259 public void accept(double t) { 260 downstream.accept(mapper.applyAsLong(t)); 261 } 262 }; 263 } 264 }; 265 } 266 267 @Override 268 public final DoubleStream flatMap(DoubleFunction<? extends DoubleStream> mapper) { 269 return new StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE, 270 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) { 271 @Override 272 // Android-changed: Make public, to match the method it's overriding. 273 public Sink<Double> opWrapSink(int flags, Sink<Double> sink) { 274 return new Sink.ChainedDouble<Double>(sink) { 275 @Override 276 public void begin(long size) { 277 downstream.begin(-1); 278 } 279 280 @Override 281 public void accept(double t) { 282 try (DoubleStream result = mapper.apply(t)) { 283 // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it 284 if (result != null) 285 result.sequential().forEach(i -> downstream.accept(i)); 286 } 287 } 288 }; 289 } 290 }; 291 } 292 293 @Override 294 public DoubleStream unordered() { 295 if (!isOrdered()) 296 return this; 297 return new StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE, StreamOpFlag.NOT_ORDERED) { 298 @Override 299 // Android-changed: Make public, to match the method it's overriding. 300 public Sink<Double> opWrapSink(int flags, Sink<Double> sink) { 301 return sink; 302 } 303 }; 304 } 305 306 @Override 307 public final DoubleStream filter(DoublePredicate predicate) { 308 Objects.requireNonNull(predicate); 309 return new StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE, 310 StreamOpFlag.NOT_SIZED) { 311 @Override 312 // Android-changed: Make public, to match the method it's overriding. 313 public Sink<Double> opWrapSink(int flags, Sink<Double> sink) { 314 return new Sink.ChainedDouble<Double>(sink) { 315 @Override 316 public void begin(long size) { 317 downstream.begin(-1); 318 } 319 320 @Override 321 public void accept(double t) { 322 if (predicate.test(t)) 323 downstream.accept(t); 324 } 325 }; 326 } 327 }; 328 } 329 330 @Override 331 public final DoubleStream peek(DoubleConsumer action) { 332 Objects.requireNonNull(action); 333 return new StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE, 334 0) { 335 @Override 336 // Android-changed: Make public, to match the method it's overriding. 337 public Sink<Double> opWrapSink(int flags, Sink<Double> sink) { 338 return new Sink.ChainedDouble<Double>(sink) { 339 @Override 340 public void accept(double t) { 341 action.accept(t); 342 downstream.accept(t); 343 } 344 }; 345 } 346 }; 347 } 348 349 // Stateful intermediate ops from DoubleStream 350 351 @Override 352 public final DoubleStream limit(long maxSize) { 353 if (maxSize < 0) 354 throw new IllegalArgumentException(Long.toString(maxSize)); 355 return SliceOps.makeDouble(this, (long) 0, maxSize); 356 } 357 358 @Override 359 public final DoubleStream skip(long n) { 360 if (n < 0) 361 throw new IllegalArgumentException(Long.toString(n)); 362 if (n == 0) 363 return this; 364 else { 365 long limit = -1; 366 return SliceOps.makeDouble(this, n, limit); 367 } 368 } 369 370 @Override 371 public final DoubleStream sorted() { 372 return SortedOps.makeDouble(this); 373 } 374 375 @Override 376 public final DoubleStream distinct() { 377 // While functional and quick to implement, this approach is not very efficient. 378 // An efficient version requires a double-specific map/set implementation. 379 return boxed().distinct().mapToDouble(i -> (double) i); 380 } 381 382 // Terminal ops from DoubleStream 383 384 @Override 385 public void forEach(DoubleConsumer consumer) { 386 evaluate(ForEachOps.makeDouble(consumer, false)); 387 } 388 389 @Override 390 public void forEachOrdered(DoubleConsumer consumer) { 391 evaluate(ForEachOps.makeDouble(consumer, true)); 392 } 393 394 @Override 395 public final double sum() { 396 /* 397 * In the arrays allocated for the collect operation, index 0 398 * holds the high-order bits of the running sum, index 1 holds 399 * the low-order bits of the sum computed via compensated 400 * summation, and index 2 holds the simple sum used to compute 401 * the proper result if the stream contains infinite values of 402 * the same sign. 403 */ 404 double[] summation = collect(() -> new double[3], 405 (ll, d) -> { 406 Collectors.sumWithCompensation(ll, d); 407 ll[2] += d; 408 }, 409 (ll, rr) -> { 410 Collectors.sumWithCompensation(ll, rr[0]); 411 Collectors.sumWithCompensation(ll, rr[1]); 412 ll[2] += rr[2]; 413 }); 414 415 return Collectors.computeFinalSum(summation); 416 } 417 418 @Override 419 public final OptionalDouble min() { 420 return reduce(Math::min); 421 } 422 423 @Override 424 public final OptionalDouble max() { 425 return reduce(Math::max); 426 } 427 428 /** 429 * {@inheritDoc} 430 * 431 * @implNote The {@code double} format can represent all 432 * consecutive integers in the range -2<sup>53</sup> to 433 * 2<sup>53</sup>. If the pipeline has more than 2<sup>53</sup> 434 * values, the divisor in the average computation will saturate at 435 * 2<sup>53</sup>, leading to additional numerical errors. 436 */ 437 @Override 438 public final OptionalDouble average() { 439 /* 440 * In the arrays allocated for the collect operation, index 0 441 * holds the high-order bits of the running sum, index 1 holds 442 * the low-order bits of the sum computed via compensated 443 * summation, index 2 holds the number of values seen, index 3 444 * holds the simple sum. 445 */ 446 double[] avg = collect(() -> new double[4], 447 (ll, d) -> { 448 ll[2]++; 449 Collectors.sumWithCompensation(ll, d); 450 ll[3] += d; 451 }, 452 (ll, rr) -> { 453 Collectors.sumWithCompensation(ll, rr[0]); 454 Collectors.sumWithCompensation(ll, rr[1]); 455 ll[2] += rr[2]; 456 ll[3] += rr[3]; 457 }); 458 return avg[2] > 0 459 ? OptionalDouble.of(Collectors.computeFinalSum(avg) / avg[2]) 460 : OptionalDouble.empty(); 461 } 462 463 @Override 464 public final long count() { 465 return mapToLong(e -> 1L).sum(); 466 } 467 468 @Override 469 public final DoubleSummaryStatistics summaryStatistics() { 470 return collect(DoubleSummaryStatistics::new, DoubleSummaryStatistics::accept, 471 DoubleSummaryStatistics::combine); 472 } 473 474 @Override 475 public final double reduce(double identity, DoubleBinaryOperator op) { 476 return evaluate(ReduceOps.makeDouble(identity, op)); 477 } 478 479 @Override 480 public final OptionalDouble reduce(DoubleBinaryOperator op) { 481 return evaluate(ReduceOps.makeDouble(op)); 482 } 483 484 @Override 485 public final <R> R collect(Supplier<R> supplier, 486 ObjDoubleConsumer<R> accumulator, 487 BiConsumer<R, R> combiner) { 488 BinaryOperator<R> operator = (left, right) -> { 489 combiner.accept(left, right); 490 return left; 491 }; 492 return evaluate(ReduceOps.makeDouble(supplier, accumulator, operator)); 493 } 494 495 @Override 496 public final boolean anyMatch(DoublePredicate predicate) { 497 return evaluate(MatchOps.makeDouble(predicate, MatchOps.MatchKind.ANY)); 498 } 499 500 @Override 501 public final boolean allMatch(DoublePredicate predicate) { 502 return evaluate(MatchOps.makeDouble(predicate, MatchOps.MatchKind.ALL)); 503 } 504 505 @Override 506 public final boolean noneMatch(DoublePredicate predicate) { 507 return evaluate(MatchOps.makeDouble(predicate, MatchOps.MatchKind.NONE)); 508 } 509 510 @Override 511 public final OptionalDouble findFirst() { 512 return evaluate(FindOps.makeDouble(true)); 513 } 514 515 @Override 516 public final OptionalDouble findAny() { 517 return evaluate(FindOps.makeDouble(false)); 518 } 519 520 @Override 521 public final double[] toArray() { 522 return Nodes.flattenDouble((Node.OfDouble) evaluateToArrayNode(Double[]::new)) 523 .asPrimitiveArray(); 524 } 525 526 // 527 528 /** 529 * Source stage of a DoubleStream 530 * 531 * @param <E_IN> type of elements in the upstream source 532 * @hide Made public for CTS tests only (OpenJDK 8 streams tests). 533 */ 534 // Android-changed: Made public for CTS tests only. 535 public static class Head<E_IN> extends DoublePipeline<E_IN> { 536 /** 537 * Constructor for the source stage of a DoubleStream. 538 * 539 * @param source {@code Supplier<Spliterator>} describing the stream 540 * source 541 * @param sourceFlags the source flags for the stream source, described 542 * in {@link StreamOpFlag} 543 * @param parallel {@code true} if the pipeline is parallel 544 */ 545 // Android-changed: Made public for CTS tests only. 546 public Head(Supplier<? extends Spliterator<Double>> source, 547 int sourceFlags, boolean parallel) { 548 super(source, sourceFlags, parallel); 549 } 550 551 /** 552 * Constructor for the source stage of a DoubleStream. 553 * 554 * @param source {@code Spliterator} describing the stream source 555 * @param sourceFlags the source flags for the stream source, described 556 * in {@link StreamOpFlag} 557 * @param parallel {@code true} if the pipeline is parallel 558 */ 559 // Android-changed: Made public for CTS tests only. 560 public Head(Spliterator<Double> source, 561 int sourceFlags, boolean parallel) { 562 super(source, sourceFlags, parallel); 563 } 564 565 @Override 566 // Android-changed: Made public for CTS tests only. 567 public final boolean opIsStateful() { 568 throw new UnsupportedOperationException(); 569 } 570 571 @Override 572 // Android-changed: Made public for CTS tests only. 573 public final Sink<E_IN> opWrapSink(int flags, Sink<Double> sink) { 574 throw new UnsupportedOperationException(); 575 } 576 577 // Optimized sequential terminal operations for the head of the pipeline 578 579 @Override 580 public void forEach(DoubleConsumer consumer) { 581 if (!isParallel()) { 582 adapt(sourceStageSpliterator()).forEachRemaining(consumer); 583 } 584 else { 585 super.forEach(consumer); 586 } 587 } 588 589 @Override 590 public void forEachOrdered(DoubleConsumer consumer) { 591 if (!isParallel()) { 592 adapt(sourceStageSpliterator()).forEachRemaining(consumer); 593 } 594 else { 595 super.forEachOrdered(consumer); 596 } 597 } 598 599 } 600 601 /** 602 * Base class for a stateless intermediate stage of a DoubleStream. 603 * 604 * @param <E_IN> type of elements in the upstream source 605 * @since 1.8 606 * @hide Visible for CTS testing only (OpenJDK8 tests). 607 */ 608 // Android-changed: Made public for CTS tests only. 609 public abstract static class StatelessOp<E_IN> extends DoublePipeline<E_IN> { 610 /** 611 * Construct a new DoubleStream by appending a stateless intermediate 612 * operation to an existing stream. 613 * 614 * @param upstream the upstream pipeline stage 615 * @param inputShape the stream shape for the upstream pipeline stage 616 * @param opFlags operation flags for the new stage 617 */ 618 // Android-changed: Made public for CTS tests only. 619 public StatelessOp(AbstractPipeline<?, E_IN, ?> upstream, 620 StreamShape inputShape, 621 int opFlags) { 622 super(upstream, opFlags); 623 assert upstream.getOutputShape() == inputShape; 624 } 625 626 @Override 627 // Android-changed: Make public, to match the method it's overriding. 628 public final boolean opIsStateful() { 629 return false; 630 } 631 } 632 633 /** 634 * Base class for a stateful intermediate stage of a DoubleStream. 635 * 636 * @param <E_IN> type of elements in the upstream source 637 * @since 1.8 638 * @hide Visible for CTS testing only (OpenJDK8 tests). 639 */ 640 // Android-changed: Made public for CTS tests only. 641 public abstract static class StatefulOp<E_IN> extends DoublePipeline<E_IN> { 642 /** 643 * Construct a new DoubleStream by appending a stateful intermediate 644 * operation to an existing stream. 645 * 646 * @param upstream the upstream pipeline stage 647 * @param inputShape the stream shape for the upstream pipeline stage 648 * @param opFlags operation flags for the new stage 649 */ 650 // Android-changed: Made public for CTS tests only. 651 public StatefulOp(AbstractPipeline<?, E_IN, ?> upstream, 652 StreamShape inputShape, 653 int opFlags) { 654 super(upstream, opFlags); 655 assert upstream.getOutputShape() == inputShape; 656 } 657 658 @Override 659 // Android-changed: Make public, to match the method it's overriding. 660 public final boolean opIsStateful() { 661 return true; 662 } 663 664 @Override 665 // Android-changed: Make public, to match the method it's overriding. 666 public abstract <P_IN> Node<Double> opEvaluateParallel(PipelineHelper<Double> helper, 667 Spliterator<P_IN> spliterator, 668 IntFunction<Double[]> generator); 669 } 670 } 671