1 /* 2 * Copyright (c) 2013, 2014, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 package java.util.stream; 26 27 import java.util.DoubleSummaryStatistics; 28 import java.util.Objects; 29 import java.util.OptionalDouble; 30 import java.util.PrimitiveIterator; 31 import java.util.Spliterator; 32 import java.util.Spliterators; 33 import java.util.function.BiConsumer; 34 import java.util.function.BinaryOperator; 35 import java.util.function.DoubleBinaryOperator; 36 import java.util.function.DoubleConsumer; 37 import java.util.function.DoubleFunction; 38 import java.util.function.DoublePredicate; 39 import java.util.function.DoubleToIntFunction; 40 import java.util.function.DoubleToLongFunction; 41 import java.util.function.DoubleUnaryOperator; 42 import java.util.function.IntFunction; 43 import java.util.function.ObjDoubleConsumer; 44 import java.util.function.Supplier; 45 46 /** 47 * Abstract base class for an intermediate pipeline stage or pipeline source 48 * stage implementing whose elements are of type {@code double}. 49 * 50 * @param <E_IN> type of elements in the upstream source 51 * 52 * @since 1.8 53 * @hide Visible for CTS testing only (OpenJDK8 tests). 54 */ 55 public abstract class DoublePipeline<E_IN> 56 extends AbstractPipeline<E_IN, Double, DoubleStream> 57 implements DoubleStream { 58 59 /** 60 * Constructor for the head of a stream pipeline. 61 * 62 * @param source {@code Supplier<Spliterator>} describing the stream source 63 * @param sourceFlags the source flags for the stream source, described in 64 * {@link StreamOpFlag} 65 */ DoublePipeline(Supplier<? extends Spliterator<Double>> source, int sourceFlags, boolean parallel)66 DoublePipeline(Supplier<? extends Spliterator<Double>> source, 67 int sourceFlags, boolean parallel) { 68 super(source, sourceFlags, parallel); 69 } 70 71 /** 72 * Constructor for the head of a stream pipeline. 73 * 74 * @param source {@code Spliterator} describing the stream source 75 * @param sourceFlags the source flags for the stream source, described in 76 * {@link StreamOpFlag} 77 */ DoublePipeline(Spliterator<Double> source, int sourceFlags, boolean parallel)78 DoublePipeline(Spliterator<Double> source, 79 int sourceFlags, boolean parallel) { 80 super(source, sourceFlags, parallel); 81 } 82 83 /** 84 * Constructor for appending an intermediate operation onto an existing 85 * pipeline. 86 * 87 * @param upstream the upstream element source. 88 * @param opFlags the operation flags 89 */ DoublePipeline(AbstractPipeline<?, E_IN, ?> upstream, int opFlags)90 DoublePipeline(AbstractPipeline<?, E_IN, ?> upstream, int opFlags) { 91 super(upstream, opFlags); 92 } 93 94 /** 95 * Adapt a {@code Sink<Double> to a {@code DoubleConsumer}, ideally simply 96 * by casting. 97 */ adapt(Sink<Double> sink)98 private static DoubleConsumer adapt(Sink<Double> sink) { 99 if (sink instanceof DoubleConsumer) { 100 return (DoubleConsumer) sink; 101 } else { 102 if (Tripwire.ENABLED) 103 Tripwire.trip(AbstractPipeline.class, 104 "using DoubleStream.adapt(Sink<Double> s)"); 105 return sink::accept; 106 } 107 } 108 109 /** 110 * Adapt a {@code Spliterator<Double>} to a {@code Spliterator.OfDouble}. 111 * 112 * @implNote 113 * The implementation attempts to cast to a Spliterator.OfDouble, and throws 114 * an exception if this cast is not possible. 115 */ adapt(Spliterator<Double> s)116 private static Spliterator.OfDouble adapt(Spliterator<Double> s) { 117 if (s instanceof Spliterator.OfDouble) { 118 return (Spliterator.OfDouble) s; 119 } else { 120 if (Tripwire.ENABLED) 121 Tripwire.trip(AbstractPipeline.class, 122 "using DoubleStream.adapt(Spliterator<Double> s)"); 123 throw new UnsupportedOperationException("DoubleStream.adapt(Spliterator<Double> s)"); 124 } 125 } 126 127 128 // Shape-specific methods 129 130 @Override getOutputShape()131 public final StreamShape getOutputShape() { 132 return StreamShape.DOUBLE_VALUE; 133 } 134 135 @Override evaluateToNode(PipelineHelper<Double> helper, Spliterator<P_IN> spliterator, boolean flattenTree, IntFunction<Double[]> generator)136 public final <P_IN> Node<Double> evaluateToNode(PipelineHelper<Double> helper, 137 Spliterator<P_IN> spliterator, 138 boolean flattenTree, 139 IntFunction<Double[]> generator) { 140 return Nodes.collectDouble(helper, spliterator, flattenTree); 141 } 142 143 @Override wrap(PipelineHelper<Double> ph, Supplier<Spliterator<P_IN>> supplier, boolean isParallel)144 public final <P_IN> Spliterator<Double> wrap(PipelineHelper<Double> ph, 145 Supplier<Spliterator<P_IN>> supplier, 146 boolean isParallel) { 147 return new StreamSpliterators.DoubleWrappingSpliterator<>(ph, supplier, isParallel); 148 } 149 150 @Override 151 @SuppressWarnings("unchecked") lazySpliterator(Supplier<? extends Spliterator<Double>> supplier)152 public final Spliterator.OfDouble lazySpliterator(Supplier<? extends Spliterator<Double>> supplier) { 153 return new StreamSpliterators.DelegatingSpliterator.OfDouble((Supplier<Spliterator.OfDouble>) supplier); 154 } 155 156 @Override forEachWithCancel(Spliterator<Double> spliterator, Sink<Double> sink)157 public final void forEachWithCancel(Spliterator<Double> spliterator, Sink<Double> sink) { 158 Spliterator.OfDouble spl = adapt(spliterator); 159 DoubleConsumer adaptedSink = adapt(sink); 160 do { } while (!sink.cancellationRequested() && spl.tryAdvance(adaptedSink)); 161 } 162 163 @Override makeNodeBuilder(long exactSizeIfKnown, IntFunction<Double[]> generator)164 public final Node.Builder<Double> makeNodeBuilder(long exactSizeIfKnown, IntFunction<Double[]> generator) { 165 return Nodes.doubleBuilder(exactSizeIfKnown); 166 } 167 168 169 // DoubleStream 170 171 @Override iterator()172 public final PrimitiveIterator.OfDouble iterator() { 173 return Spliterators.iterator(spliterator()); 174 } 175 176 @Override spliterator()177 public final Spliterator.OfDouble spliterator() { 178 return adapt(super.spliterator()); 179 } 180 181 // Stateless intermediate ops from DoubleStream 182 183 @Override boxed()184 public final Stream<Double> boxed() { 185 return mapToObj(Double::valueOf); 186 } 187 188 @Override map(DoubleUnaryOperator mapper)189 public final DoubleStream map(DoubleUnaryOperator mapper) { 190 Objects.requireNonNull(mapper); 191 return new StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE, 192 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { 193 @Override 194 public Sink<Double> opWrapSink(int flags, Sink<Double> sink) { 195 return new Sink.ChainedDouble<Double>(sink) { 196 @Override 197 public void accept(double t) { 198 downstream.accept(mapper.applyAsDouble(t)); 199 } 200 }; 201 } 202 }; 203 } 204 205 @Override 206 public final <U> Stream<U> mapToObj(DoubleFunction<? extends U> mapper) { 207 Objects.requireNonNull(mapper); 208 return new ReferencePipeline.StatelessOp<Double, U>(this, StreamShape.DOUBLE_VALUE, 209 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { 210 @Override 211 public Sink<Double> opWrapSink(int flags, Sink<U> sink) { 212 return new Sink.ChainedDouble<U>(sink) { 213 @Override 214 public void accept(double t) { 215 downstream.accept(mapper.apply(t)); 216 } 217 }; 218 } 219 }; 220 } 221 222 @Override 223 public final IntStream mapToInt(DoubleToIntFunction mapper) { 224 Objects.requireNonNull(mapper); 225 return new IntPipeline.StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE, 226 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { 227 @Override 228 public Sink<Double> opWrapSink(int flags, Sink<Integer> sink) { 229 return new Sink.ChainedDouble<Integer>(sink) { 230 @Override 231 public void accept(double t) { 232 downstream.accept(mapper.applyAsInt(t)); 233 } 234 }; 235 } 236 }; 237 } 238 239 @Override 240 public final LongStream mapToLong(DoubleToLongFunction mapper) { 241 Objects.requireNonNull(mapper); 242 return new LongPipeline.StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE, 243 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { 244 @Override 245 public Sink<Double> opWrapSink(int flags, Sink<Long> sink) { 246 return new Sink.ChainedDouble<Long>(sink) { 247 @Override 248 public void accept(double t) { 249 downstream.accept(mapper.applyAsLong(t)); 250 } 251 }; 252 } 253 }; 254 } 255 256 @Override 257 public final DoubleStream flatMap(DoubleFunction<? extends DoubleStream> mapper) { 258 return new StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE, 259 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) { 260 @Override 261 public Sink<Double> opWrapSink(int flags, Sink<Double> sink) { 262 return new Sink.ChainedDouble<Double>(sink) { 263 @Override 264 public void begin(long size) { 265 downstream.begin(-1); 266 } 267 268 @Override 269 public void accept(double t) { 270 try (DoubleStream result = mapper.apply(t)) { 271 // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it 272 if (result != null) 273 result.sequential().forEach(i -> downstream.accept(i)); 274 } 275 } 276 }; 277 } 278 }; 279 } 280 281 @Override 282 public DoubleStream unordered() { 283 if (!isOrdered()) 284 return this; 285 return new StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE, StreamOpFlag.NOT_ORDERED) { 286 @Override 287 public Sink<Double> opWrapSink(int flags, Sink<Double> sink) { 288 return sink; 289 } 290 }; 291 } 292 293 @Override 294 public final DoubleStream filter(DoublePredicate predicate) { 295 Objects.requireNonNull(predicate); 296 return new StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE, 297 StreamOpFlag.NOT_SIZED) { 298 @Override 299 public Sink<Double> opWrapSink(int flags, Sink<Double> sink) { 300 return new Sink.ChainedDouble<Double>(sink) { 301 @Override 302 public void begin(long size) { 303 downstream.begin(-1); 304 } 305 306 @Override 307 public void accept(double t) { 308 if (predicate.test(t)) 309 downstream.accept(t); 310 } 311 }; 312 } 313 }; 314 } 315 316 @Override 317 public final DoubleStream peek(DoubleConsumer action) { 318 Objects.requireNonNull(action); 319 return new StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE, 320 0) { 321 @Override 322 public Sink<Double> opWrapSink(int flags, Sink<Double> sink) { 323 return new Sink.ChainedDouble<Double>(sink) { 324 @Override 325 public void accept(double t) { 326 action.accept(t); 327 downstream.accept(t); 328 } 329 }; 330 } 331 }; 332 } 333 334 // Stateful intermediate ops from DoubleStream 335 336 @Override 337 public final DoubleStream limit(long maxSize) { 338 if (maxSize < 0) 339 throw new IllegalArgumentException(Long.toString(maxSize)); 340 return SliceOps.makeDouble(this, (long) 0, maxSize); 341 } 342 343 @Override 344 public final DoubleStream skip(long n) { 345 if (n < 0) 346 throw new IllegalArgumentException(Long.toString(n)); 347 if (n == 0) 348 return this; 349 else { 350 long limit = -1; 351 return SliceOps.makeDouble(this, n, limit); 352 } 353 } 354 355 @Override 356 public final DoubleStream sorted() { 357 return SortedOps.makeDouble(this); 358 } 359 360 @Override 361 public final DoubleStream distinct() { 362 // While functional and quick to implement, this approach is not very efficient. 363 // An efficient version requires a double-specific map/set implementation. 364 return boxed().distinct().mapToDouble(i -> (double) i); 365 } 366 367 // Terminal ops from DoubleStream 368 369 @Override 370 public void forEach(DoubleConsumer consumer) { 371 evaluate(ForEachOps.makeDouble(consumer, false)); 372 } 373 374 @Override 375 public void forEachOrdered(DoubleConsumer consumer) { 376 evaluate(ForEachOps.makeDouble(consumer, true)); 377 } 378 379 @Override 380 public final double sum() { 381 /* 382 * In the arrays allocated for the collect operation, index 0 383 * holds the high-order bits of the running sum, index 1 holds 384 * the low-order bits of the sum computed via compensated 385 * summation, and index 2 holds the simple sum used to compute 386 * the proper result if the stream contains infinite values of 387 * the same sign. 388 */ 389 double[] summation = collect(() -> new double[3], 390 (ll, d) -> { 391 Collectors.sumWithCompensation(ll, d); 392 ll[2] += d; 393 }, 394 (ll, rr) -> { 395 Collectors.sumWithCompensation(ll, rr[0]); 396 Collectors.sumWithCompensation(ll, rr[1]); 397 ll[2] += rr[2]; 398 }); 399 400 return Collectors.computeFinalSum(summation); 401 } 402 403 @Override 404 public final OptionalDouble min() { 405 return reduce(Math::min); 406 } 407 408 @Override 409 public final OptionalDouble max() { 410 return reduce(Math::max); 411 } 412 413 /** 414 * {@inheritDoc} 415 * 416 * @implNote The {@code double} format can represent all 417 * consecutive integers in the range -2<sup>53</sup> to 418 * 2<sup>53</sup>. If the pipeline has more than 2<sup>53</sup> 419 * values, the divisor in the average computation will saturate at 420 * 2<sup>53</sup>, leading to additional numerical errors. 421 */ 422 @Override 423 public final OptionalDouble average() { 424 /* 425 * In the arrays allocated for the collect operation, index 0 426 * holds the high-order bits of the running sum, index 1 holds 427 * the low-order bits of the sum computed via compensated 428 * summation, index 2 holds the number of values seen, index 3 429 * holds the simple sum. 430 */ 431 double[] avg = collect(() -> new double[4], 432 (ll, d) -> { 433 ll[2]++; 434 Collectors.sumWithCompensation(ll, d); 435 ll[3] += d; 436 }, 437 (ll, rr) -> { 438 Collectors.sumWithCompensation(ll, rr[0]); 439 Collectors.sumWithCompensation(ll, rr[1]); 440 ll[2] += rr[2]; 441 ll[3] += rr[3]; 442 }); 443 return avg[2] > 0 444 ? OptionalDouble.of(Collectors.computeFinalSum(avg) / avg[2]) 445 : OptionalDouble.empty(); 446 } 447 448 @Override 449 public final long count() { 450 return mapToLong(e -> 1L).sum(); 451 } 452 453 @Override 454 public final DoubleSummaryStatistics summaryStatistics() { 455 return collect(DoubleSummaryStatistics::new, DoubleSummaryStatistics::accept, 456 DoubleSummaryStatistics::combine); 457 } 458 459 @Override 460 public final double reduce(double identity, DoubleBinaryOperator op) { 461 return evaluate(ReduceOps.makeDouble(identity, op)); 462 } 463 464 @Override 465 public final OptionalDouble reduce(DoubleBinaryOperator op) { 466 return evaluate(ReduceOps.makeDouble(op)); 467 } 468 469 @Override 470 public final <R> R collect(Supplier<R> supplier, 471 ObjDoubleConsumer<R> accumulator, 472 BiConsumer<R, R> combiner) { 473 BinaryOperator<R> operator = (left, right) -> { 474 combiner.accept(left, right); 475 return left; 476 }; 477 return evaluate(ReduceOps.makeDouble(supplier, accumulator, operator)); 478 } 479 480 @Override 481 public final boolean anyMatch(DoublePredicate predicate) { 482 return evaluate(MatchOps.makeDouble(predicate, MatchOps.MatchKind.ANY)); 483 } 484 485 @Override 486 public final boolean allMatch(DoublePredicate predicate) { 487 return evaluate(MatchOps.makeDouble(predicate, MatchOps.MatchKind.ALL)); 488 } 489 490 @Override 491 public final boolean noneMatch(DoublePredicate predicate) { 492 return evaluate(MatchOps.makeDouble(predicate, MatchOps.MatchKind.NONE)); 493 } 494 495 @Override 496 public final OptionalDouble findFirst() { 497 return evaluate(FindOps.makeDouble(true)); 498 } 499 500 @Override 501 public final OptionalDouble findAny() { 502 return evaluate(FindOps.makeDouble(false)); 503 } 504 505 @Override 506 public final double[] toArray() { 507 return Nodes.flattenDouble((Node.OfDouble) evaluateToArrayNode(Double[]::new)) 508 .asPrimitiveArray(); 509 } 510 511 // 512 513 /** 514 * Source stage of a DoubleStream 515 * 516 * @param <E_IN> type of elements in the upstream source 517 * @hide Visibility for CTS only (OpenJDK 8 streams tests). 518 */ 519 public static class Head<E_IN> extends DoublePipeline<E_IN> { 520 /** 521 * Constructor for the source stage of a DoubleStream. 522 * 523 * @param source {@code Supplier<Spliterator>} describing the stream 524 * source 525 * @param sourceFlags the source flags for the stream source, described 526 * in {@link StreamOpFlag} 527 * @param parallel {@code true} if the pipeline is parallel 528 */ 529 public Head(Supplier<? extends Spliterator<Double>> source, 530 int sourceFlags, boolean parallel) { 531 super(source, sourceFlags, parallel); 532 } 533 534 /** 535 * Constructor for the source stage of a DoubleStream. 536 * 537 * @param source {@code Spliterator} describing the stream source 538 * @param sourceFlags the source flags for the stream source, described 539 * in {@link StreamOpFlag} 540 * @param parallel {@code true} if the pipeline is parallel 541 */ 542 public Head(Spliterator<Double> source, 543 int sourceFlags, boolean parallel) { 544 super(source, sourceFlags, parallel); 545 } 546 547 @Override 548 public final boolean opIsStateful() { 549 throw new UnsupportedOperationException(); 550 } 551 552 @Override 553 public final Sink<E_IN> opWrapSink(int flags, Sink<Double> sink) { 554 throw new UnsupportedOperationException(); 555 } 556 557 // Optimized sequential terminal operations for the head of the pipeline 558 559 @Override 560 public void forEach(DoubleConsumer consumer) { 561 if (!isParallel()) { 562 adapt(sourceStageSpliterator()).forEachRemaining(consumer); 563 } 564 else { 565 super.forEach(consumer); 566 } 567 } 568 569 @Override 570 public void forEachOrdered(DoubleConsumer consumer) { 571 if (!isParallel()) { 572 adapt(sourceStageSpliterator()).forEachRemaining(consumer); 573 } 574 else { 575 super.forEachOrdered(consumer); 576 } 577 } 578 579 } 580 581 /** 582 * Base class for a stateless intermediate stage of a DoubleStream. 583 * 584 * @param <E_IN> type of elements in the upstream source 585 * @since 1.8 586 * @hide Visible for CTS testing only (OpenJDK8 tests). 587 */ 588 public abstract static class StatelessOp<E_IN> extends DoublePipeline<E_IN> { 589 /** 590 * Construct a new DoubleStream by appending a stateless intermediate 591 * operation to an existing stream. 592 * 593 * @param upstream the upstream pipeline stage 594 * @param inputShape the stream shape for the upstream pipeline stage 595 * @param opFlags operation flags for the new stage 596 */ 597 public StatelessOp(AbstractPipeline<?, E_IN, ?> upstream, 598 StreamShape inputShape, 599 int opFlags) { 600 super(upstream, opFlags); 601 assert upstream.getOutputShape() == inputShape; 602 } 603 604 @Override 605 public final boolean opIsStateful() { 606 return false; 607 } 608 } 609 610 /** 611 * Base class for a stateful intermediate stage of a DoubleStream. 612 * 613 * @param <E_IN> type of elements in the upstream source 614 * @since 1.8 615 * @hide Visible for CTS testing only (OpenJDK8 tests). 616 */ 617 public abstract static class StatefulOp<E_IN> extends DoublePipeline<E_IN> { 618 /** 619 * Construct a new DoubleStream by appending a stateful intermediate 620 * operation to an existing stream. 621 * 622 * @param upstream the upstream pipeline stage 623 * @param inputShape the stream shape for the upstream pipeline stage 624 * @param opFlags operation flags for the new stage 625 */ 626 public StatefulOp(AbstractPipeline<?, E_IN, ?> upstream, 627 StreamShape inputShape, 628 int opFlags) { 629 super(upstream, opFlags); 630 assert upstream.getOutputShape() == inputShape; 631 } 632 633 @Override 634 public final boolean opIsStateful() { 635 return true; 636 } 637 638 @Override 639 public abstract <P_IN> Node<Double> opEvaluateParallel(PipelineHelper<Double> helper, 640 Spliterator<P_IN> spliterator, 641 IntFunction<Double[]> generator); 642 } 643 } 644