1 /* 2 * Copyright (c) 2012, 2021, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 
24 */ 25 package java.util.stream; 26 27 import java.util.Comparator; 28 import java.util.Objects; 29 import java.util.Spliterator; 30 import java.util.concurrent.ConcurrentHashMap; 31 import java.util.concurrent.atomic.AtomicLong; 32 import java.util.function.BooleanSupplier; 33 import java.util.function.Consumer; 34 import java.util.function.DoubleConsumer; 35 import java.util.function.DoubleSupplier; 36 import java.util.function.IntConsumer; 37 import java.util.function.IntSupplier; 38 import java.util.function.LongConsumer; 39 import java.util.function.LongSupplier; 40 import java.util.function.Supplier; 41 42 /** 43 * Spliterator implementations for wrapping and delegating spliterators, used 44 * in the implementation of the {@link Stream#spliterator()} method. 45 * 46 * @since 1.8 47 */ 48 class StreamSpliterators { 49 50 /** 51 * Abstract wrapping spliterator that binds to the spliterator of a 52 * pipeline helper on first operation. 53 * 54 * <p>This spliterator is not late-binding and will bind to the source 55 * spliterator when first operated on. 56 * 57 * <p>A wrapping spliterator produced from a sequential stream 58 * cannot be split if there are stateful operations present. 59 */ 60 private abstract static class AbstractWrappingSpliterator<P_IN, P_OUT, 61 T_BUFFER extends AbstractSpinedBuffer> 62 implements Spliterator<P_OUT> { 63 64 // @@@ Detect if stateful operations are present or not 65 // If not then can split otherwise cannot 66 67 /** 68 * True if this spliterator supports splitting 69 */ 70 final boolean isParallel; 71 72 final PipelineHelper<P_OUT> ph; 73 74 /** 75 * Supplier for the source spliterator. Client provides either a 76 * spliterator or a supplier. 77 */ 78 private Supplier<Spliterator<P_IN>> spliteratorSupplier; 79 80 /** 81 * Source spliterator. Either provided from client or obtained from 82 * supplier. 
83 */ 84 Spliterator<P_IN> spliterator; 85 86 /** 87 * Sink chain for the downstream stages of the pipeline, ultimately 88 * leading to the buffer. Used during partial traversal. 89 */ 90 Sink<P_IN> bufferSink; 91 92 /** 93 * A function that advances one element of the spliterator, pushing 94 * it to bufferSink. Returns whether any elements were processed. 95 * Used during partial traversal. 96 */ 97 BooleanSupplier pusher; 98 99 /** Next element to consume from the buffer, used during partial traversal */ 100 long nextToConsume; 101 102 /** Buffer into which elements are pushed. Used during partial traversal. */ 103 T_BUFFER buffer; 104 105 /** 106 * True if full traversal has occurred (with possible cancellation). 107 * If doing a partial traversal, there may be still elements in buffer. 108 */ 109 boolean finished; 110 111 /** 112 * Construct an AbstractWrappingSpliterator from a 113 * {@code Supplier<Spliterator>}. 114 */ AbstractWrappingSpliterator(PipelineHelper<P_OUT> ph, Supplier<Spliterator<P_IN>> spliteratorSupplier, boolean parallel)115 AbstractWrappingSpliterator(PipelineHelper<P_OUT> ph, 116 Supplier<Spliterator<P_IN>> spliteratorSupplier, 117 boolean parallel) { 118 this.ph = ph; 119 this.spliteratorSupplier = spliteratorSupplier; 120 this.spliterator = null; 121 this.isParallel = parallel; 122 } 123 124 /** 125 * Construct an AbstractWrappingSpliterator from a 126 * {@code Spliterator}. 127 */ AbstractWrappingSpliterator(PipelineHelper<P_OUT> ph, Spliterator<P_IN> spliterator, boolean parallel)128 AbstractWrappingSpliterator(PipelineHelper<P_OUT> ph, 129 Spliterator<P_IN> spliterator, 130 boolean parallel) { 131 this.ph = ph; 132 this.spliteratorSupplier = null; 133 this.spliterator = spliterator; 134 this.isParallel = parallel; 135 } 136 137 /** 138 * Called before advancing to set up spliterator, if needed. 
139 */ init()140 final void init() { 141 if (spliterator == null) { 142 spliterator = spliteratorSupplier.get(); 143 spliteratorSupplier = null; 144 } 145 } 146 147 /** 148 * Get an element from the source, pushing it into the sink chain, 149 * setting up the buffer if needed 150 * @return whether there are elements to consume from the buffer 151 */ doAdvance()152 final boolean doAdvance() { 153 if (buffer == null) { 154 if (finished) 155 return false; 156 157 init(); 158 initPartialTraversalState(); 159 nextToConsume = 0; 160 bufferSink.begin(spliterator.getExactSizeIfKnown()); 161 return fillBuffer(); 162 } 163 else { 164 ++nextToConsume; 165 boolean hasNext = nextToConsume < buffer.count(); 166 if (!hasNext) { 167 nextToConsume = 0; 168 buffer.clear(); 169 hasNext = fillBuffer(); 170 } 171 return hasNext; 172 } 173 } 174 175 /** 176 * Invokes the shape-specific constructor with the provided arguments 177 * and returns the result. 178 */ 179 abstract AbstractWrappingSpliterator<P_IN, P_OUT, ?> wrap(Spliterator<P_IN> s); 180 181 /** 182 * Initializes buffer, sink chain, and pusher for a shape-specific 183 * implementation. 184 */ 185 abstract void initPartialTraversalState(); 186 187 @Override 188 public Spliterator<P_OUT> trySplit() { 189 if (isParallel && buffer == null && !finished) { 190 init(); 191 192 Spliterator<P_IN> split = spliterator.trySplit(); 193 return (split == null) ? null : wrap(split); 194 } 195 else 196 return null; 197 } 198 199 /** 200 * If the buffer is empty, push elements into the sink chain until 201 * the source is empty or cancellation is requested. 
202 * @return whether there are elements to consume from the buffer 203 */ 204 private boolean fillBuffer() { 205 while (buffer.count() == 0) { 206 if (bufferSink.cancellationRequested() || !pusher.getAsBoolean()) { 207 if (finished) 208 return false; 209 else { 210 bufferSink.end(); // might trigger more elements 211 finished = true; 212 } 213 } 214 } 215 return true; 216 } 217 218 @Override 219 public final long estimateSize() { 220 long exactSizeIfKnown = getExactSizeIfKnown(); 221 // Use the estimate of the wrapped spliterator 222 // Note this may not be accurate if there are filter/flatMap 223 // operations filtering or adding elements to the stream 224 return exactSizeIfKnown == -1 ? spliterator.estimateSize() : exactSizeIfKnown; 225 } 226 227 @Override 228 public final long getExactSizeIfKnown() { 229 init(); 230 return ph.exactOutputSizeIfKnown(spliterator); 231 } 232 233 @Override 234 public final int characteristics() { 235 init(); 236 237 // Get the characteristics from the pipeline 238 int c = StreamOpFlag.toCharacteristics(StreamOpFlag.toStreamFlags(ph.getStreamAndOpFlags())); 239 240 // Mask off the size and uniform characteristics and replace with 241 // those of the spliterator 242 // Note that a non-uniform spliterator can change from something 243 // with an exact size to an estimate for a sub-split, for example 244 // with HashSet where the size is known at the top level spliterator 245 // but for sub-splits only an estimate is known 246 if ((c & Spliterator.SIZED) != 0) { 247 c &= ~(Spliterator.SIZED | Spliterator.SUBSIZED); 248 c |= (spliterator.characteristics() & (Spliterator.SIZED | Spliterator.SUBSIZED)); 249 } 250 251 return c; 252 } 253 254 @Override 255 public Comparator<? 
super P_OUT> getComparator() { 256 if (!hasCharacteristics(SORTED)) 257 throw new IllegalStateException(); 258 return null; 259 } 260 261 @Override 262 public final String toString() { 263 return String.format("%s[%s]", getClass().getName(), spliterator); 264 } 265 } 266 267 static final class WrappingSpliterator<P_IN, P_OUT> 268 extends AbstractWrappingSpliterator<P_IN, P_OUT, SpinedBuffer<P_OUT>> { 269 270 WrappingSpliterator(PipelineHelper<P_OUT> ph, 271 Supplier<Spliterator<P_IN>> supplier, 272 boolean parallel) { 273 super(ph, supplier, parallel); 274 } 275 276 WrappingSpliterator(PipelineHelper<P_OUT> ph, 277 Spliterator<P_IN> spliterator, 278 boolean parallel) { 279 super(ph, spliterator, parallel); 280 } 281 282 @Override 283 WrappingSpliterator<P_IN, P_OUT> wrap(Spliterator<P_IN> s) { 284 return new WrappingSpliterator<>(ph, s, isParallel); 285 } 286 287 @Override 288 void initPartialTraversalState() { 289 SpinedBuffer<P_OUT> b = new SpinedBuffer<>(); 290 buffer = b; 291 bufferSink = ph.wrapSink(b::accept); 292 pusher = () -> spliterator.tryAdvance(bufferSink); 293 } 294 295 @Override tryAdvance(Consumer<? super P_OUT> consumer)296 public boolean tryAdvance(Consumer<? super P_OUT> consumer) { 297 Objects.requireNonNull(consumer); 298 boolean hasNext = doAdvance(); 299 if (hasNext) 300 consumer.accept(buffer.get(nextToConsume)); 301 return hasNext; 302 } 303 304 @Override forEachRemaining(Consumer<? super P_OUT> consumer)305 public void forEachRemaining(Consumer<? 
super P_OUT> consumer) { 306 if (buffer == null && !finished) { 307 Objects.requireNonNull(consumer); 308 init(); 309 310 ph.wrapAndCopyInto((Sink<P_OUT>) consumer::accept, spliterator); 311 finished = true; 312 } 313 else { 314 do { } while (tryAdvance(consumer)); 315 } 316 } 317 } 318 319 static final class IntWrappingSpliterator<P_IN> 320 extends AbstractWrappingSpliterator<P_IN, Integer, SpinedBuffer.OfInt> 321 implements Spliterator.OfInt { 322 IntWrappingSpliterator(PipelineHelper<Integer> ph, Supplier<Spliterator<P_IN>> supplier, boolean parallel)323 IntWrappingSpliterator(PipelineHelper<Integer> ph, 324 Supplier<Spliterator<P_IN>> supplier, 325 boolean parallel) { 326 super(ph, supplier, parallel); 327 } 328 IntWrappingSpliterator(PipelineHelper<Integer> ph, Spliterator<P_IN> spliterator, boolean parallel)329 IntWrappingSpliterator(PipelineHelper<Integer> ph, 330 Spliterator<P_IN> spliterator, 331 boolean parallel) { 332 super(ph, spliterator, parallel); 333 } 334 335 @Override wrap(Spliterator<P_IN> s)336 AbstractWrappingSpliterator<P_IN, Integer, ?> wrap(Spliterator<P_IN> s) { 337 return new IntWrappingSpliterator<>(ph, s, isParallel); 338 } 339 340 @Override initPartialTraversalState()341 void initPartialTraversalState() { 342 SpinedBuffer.OfInt b = new SpinedBuffer.OfInt(); 343 buffer = b; 344 bufferSink = ph.wrapSink((Sink.OfInt) b::accept); 345 pusher = () -> spliterator.tryAdvance(bufferSink); 346 } 347 348 @Override trySplit()349 public Spliterator.OfInt trySplit() { 350 return (Spliterator.OfInt) super.trySplit(); 351 } 352 353 @Override tryAdvance(IntConsumer consumer)354 public boolean tryAdvance(IntConsumer consumer) { 355 Objects.requireNonNull(consumer); 356 boolean hasNext = doAdvance(); 357 if (hasNext) 358 consumer.accept(buffer.get(nextToConsume)); 359 return hasNext; 360 } 361 362 @Override forEachRemaining(IntConsumer consumer)363 public void forEachRemaining(IntConsumer consumer) { 364 if (buffer == null && !finished) { 365 
Objects.requireNonNull(consumer); 366 init(); 367 368 ph.wrapAndCopyInto((Sink.OfInt) consumer::accept, spliterator); 369 finished = true; 370 } 371 else { 372 do { } while (tryAdvance(consumer)); 373 } 374 } 375 } 376 377 static final class LongWrappingSpliterator<P_IN> 378 extends AbstractWrappingSpliterator<P_IN, Long, SpinedBuffer.OfLong> 379 implements Spliterator.OfLong { 380 LongWrappingSpliterator(PipelineHelper<Long> ph, Supplier<Spliterator<P_IN>> supplier, boolean parallel)381 LongWrappingSpliterator(PipelineHelper<Long> ph, 382 Supplier<Spliterator<P_IN>> supplier, 383 boolean parallel) { 384 super(ph, supplier, parallel); 385 } 386 LongWrappingSpliterator(PipelineHelper<Long> ph, Spliterator<P_IN> spliterator, boolean parallel)387 LongWrappingSpliterator(PipelineHelper<Long> ph, 388 Spliterator<P_IN> spliterator, 389 boolean parallel) { 390 super(ph, spliterator, parallel); 391 } 392 393 @Override wrap(Spliterator<P_IN> s)394 AbstractWrappingSpliterator<P_IN, Long, ?> wrap(Spliterator<P_IN> s) { 395 return new LongWrappingSpliterator<>(ph, s, isParallel); 396 } 397 398 @Override initPartialTraversalState()399 void initPartialTraversalState() { 400 SpinedBuffer.OfLong b = new SpinedBuffer.OfLong(); 401 buffer = b; 402 bufferSink = ph.wrapSink((Sink.OfLong) b::accept); 403 pusher = () -> spliterator.tryAdvance(bufferSink); 404 } 405 406 @Override trySplit()407 public Spliterator.OfLong trySplit() { 408 return (Spliterator.OfLong) super.trySplit(); 409 } 410 411 @Override tryAdvance(LongConsumer consumer)412 public boolean tryAdvance(LongConsumer consumer) { 413 Objects.requireNonNull(consumer); 414 boolean hasNext = doAdvance(); 415 if (hasNext) 416 consumer.accept(buffer.get(nextToConsume)); 417 return hasNext; 418 } 419 420 @Override forEachRemaining(LongConsumer consumer)421 public void forEachRemaining(LongConsumer consumer) { 422 if (buffer == null && !finished) { 423 Objects.requireNonNull(consumer); 424 init(); 425 426 
ph.wrapAndCopyInto((Sink.OfLong) consumer::accept, spliterator); 427 finished = true; 428 } 429 else { 430 do { } while (tryAdvance(consumer)); 431 } 432 } 433 } 434 435 static final class DoubleWrappingSpliterator<P_IN> 436 extends AbstractWrappingSpliterator<P_IN, Double, SpinedBuffer.OfDouble> 437 implements Spliterator.OfDouble { 438 DoubleWrappingSpliterator(PipelineHelper<Double> ph, Supplier<Spliterator<P_IN>> supplier, boolean parallel)439 DoubleWrappingSpliterator(PipelineHelper<Double> ph, 440 Supplier<Spliterator<P_IN>> supplier, 441 boolean parallel) { 442 super(ph, supplier, parallel); 443 } 444 DoubleWrappingSpliterator(PipelineHelper<Double> ph, Spliterator<P_IN> spliterator, boolean parallel)445 DoubleWrappingSpliterator(PipelineHelper<Double> ph, 446 Spliterator<P_IN> spliterator, 447 boolean parallel) { 448 super(ph, spliterator, parallel); 449 } 450 451 @Override wrap(Spliterator<P_IN> s)452 AbstractWrappingSpliterator<P_IN, Double, ?> wrap(Spliterator<P_IN> s) { 453 return new DoubleWrappingSpliterator<>(ph, s, isParallel); 454 } 455 456 @Override initPartialTraversalState()457 void initPartialTraversalState() { 458 SpinedBuffer.OfDouble b = new SpinedBuffer.OfDouble(); 459 buffer = b; 460 bufferSink = ph.wrapSink((Sink.OfDouble) b::accept); 461 pusher = () -> spliterator.tryAdvance(bufferSink); 462 } 463 464 @Override trySplit()465 public Spliterator.OfDouble trySplit() { 466 return (Spliterator.OfDouble) super.trySplit(); 467 } 468 469 @Override tryAdvance(DoubleConsumer consumer)470 public boolean tryAdvance(DoubleConsumer consumer) { 471 Objects.requireNonNull(consumer); 472 boolean hasNext = doAdvance(); 473 if (hasNext) 474 consumer.accept(buffer.get(nextToConsume)); 475 return hasNext; 476 } 477 478 @Override forEachRemaining(DoubleConsumer consumer)479 public void forEachRemaining(DoubleConsumer consumer) { 480 if (buffer == null && !finished) { 481 Objects.requireNonNull(consumer); 482 init(); 483 484 ph.wrapAndCopyInto((Sink.OfDouble) 
consumer::accept, spliterator); 485 finished = true; 486 } 487 else { 488 do { } while (tryAdvance(consumer)); 489 } 490 } 491 } 492 493 /** 494 * Spliterator implementation that delegates to an underlying spliterator, 495 * acquiring the spliterator from a {@code Supplier<Spliterator>} on the 496 * first call to any spliterator method. 497 * @param <T> 498 */ 499 static class DelegatingSpliterator<T, T_SPLITR extends Spliterator<T>> 500 implements Spliterator<T> { 501 private final Supplier<? extends T_SPLITR> supplier; 502 503 private T_SPLITR s; 504 DelegatingSpliterator(Supplier<? extends T_SPLITR> supplier)505 DelegatingSpliterator(Supplier<? extends T_SPLITR> supplier) { 506 this.supplier = supplier; 507 } 508 get()509 T_SPLITR get() { 510 if (s == null) { 511 s = supplier.get(); 512 } 513 return s; 514 } 515 516 @Override 517 @SuppressWarnings("unchecked") trySplit()518 public T_SPLITR trySplit() { 519 return (T_SPLITR) get().trySplit(); 520 } 521 522 @Override tryAdvance(Consumer<? super T> consumer)523 public boolean tryAdvance(Consumer<? super T> consumer) { 524 return get().tryAdvance(consumer); 525 } 526 527 @Override forEachRemaining(Consumer<? super T> consumer)528 public void forEachRemaining(Consumer<? super T> consumer) { 529 get().forEachRemaining(consumer); 530 } 531 532 @Override estimateSize()533 public long estimateSize() { 534 return get().estimateSize(); 535 } 536 537 @Override characteristics()538 public int characteristics() { 539 return get().characteristics(); 540 } 541 542 @Override getComparator()543 public Comparator<? 
super T> getComparator() { 544 return get().getComparator(); 545 } 546 547 @Override getExactSizeIfKnown()548 public long getExactSizeIfKnown() { 549 return get().getExactSizeIfKnown(); 550 } 551 552 @Override toString()553 public String toString() { 554 return getClass().getName() + "[" + get() + "]"; 555 } 556 557 static class OfPrimitive<T, T_CONS, T_SPLITR extends Spliterator.OfPrimitive<T, T_CONS, T_SPLITR>> 558 extends DelegatingSpliterator<T, T_SPLITR> 559 implements Spliterator.OfPrimitive<T, T_CONS, T_SPLITR> { OfPrimitive(Supplier<? extends T_SPLITR> supplier)560 OfPrimitive(Supplier<? extends T_SPLITR> supplier) { 561 super(supplier); 562 } 563 564 @Override tryAdvance(T_CONS consumer)565 public boolean tryAdvance(T_CONS consumer) { 566 return get().tryAdvance(consumer); 567 } 568 569 @Override forEachRemaining(T_CONS consumer)570 public void forEachRemaining(T_CONS consumer) { 571 get().forEachRemaining(consumer); 572 } 573 } 574 575 @SuppressWarnings("overloads") 576 static final class OfInt 577 extends OfPrimitive<Integer, IntConsumer, Spliterator.OfInt> 578 implements Spliterator.OfInt { 579 OfInt(Supplier<Spliterator.OfInt> supplier)580 OfInt(Supplier<Spliterator.OfInt> supplier) { 581 super(supplier); 582 } 583 } 584 585 @SuppressWarnings("overloads") 586 static final class OfLong 587 extends OfPrimitive<Long, LongConsumer, Spliterator.OfLong> 588 implements Spliterator.OfLong { 589 OfLong(Supplier<Spliterator.OfLong> supplier)590 OfLong(Supplier<Spliterator.OfLong> supplier) { 591 super(supplier); 592 } 593 } 594 595 @SuppressWarnings("overloads") 596 static final class OfDouble 597 extends OfPrimitive<Double, DoubleConsumer, Spliterator.OfDouble> 598 implements Spliterator.OfDouble { 599 OfDouble(Supplier<Spliterator.OfDouble> supplier)600 OfDouble(Supplier<Spliterator.OfDouble> supplier) { 601 super(supplier); 602 } 603 } 604 } 605 606 /** 607 * A slice Spliterator from a source Spliterator that reports 608 * {@code SUBSIZED}. 
609 * 610 */ 611 abstract static class SliceSpliterator<T, T_SPLITR extends Spliterator<T>> { 612 // The start index of the slice 613 final long sliceOrigin; 614 // One past the last index of the slice 615 final long sliceFence; 616 617 // The spliterator to slice 618 T_SPLITR s; 619 // current (absolute) index, modified on advance/split 620 long index; 621 // one past last (absolute) index or sliceFence, which ever is smaller 622 long fence; 623 SliceSpliterator(T_SPLITR s, long sliceOrigin, long sliceFence, long origin, long fence)624 SliceSpliterator(T_SPLITR s, long sliceOrigin, long sliceFence, long origin, long fence) { 625 assert s.hasCharacteristics(Spliterator.SUBSIZED); 626 this.s = s; 627 this.sliceOrigin = sliceOrigin; 628 this.sliceFence = sliceFence; 629 this.index = origin; 630 this.fence = fence; 631 } 632 makeSpliterator(T_SPLITR s, long sliceOrigin, long sliceFence, long origin, long fence)633 protected abstract T_SPLITR makeSpliterator(T_SPLITR s, long sliceOrigin, long sliceFence, long origin, long fence); 634 trySplit()635 public T_SPLITR trySplit() { 636 if (sliceOrigin >= fence) 637 return null; 638 639 if (index >= fence) 640 return null; 641 642 // Keep splitting until the left and right splits intersect with the slice 643 // thereby ensuring the size estimate decreases. 644 // This also avoids creating empty spliterators which can result in 645 // existing and additionally created F/J tasks that perform 646 // redundant work on no elements. 
647 while (true) { 648 @SuppressWarnings("unchecked") 649 T_SPLITR leftSplit = (T_SPLITR) s.trySplit(); 650 if (leftSplit == null) 651 return null; 652 653 long leftSplitFenceUnbounded = index + leftSplit.estimateSize(); 654 long leftSplitFence = Math.min(leftSplitFenceUnbounded, sliceFence); 655 if (sliceOrigin >= leftSplitFence) { 656 // The left split does not intersect with, and is to the left of, the slice 657 // The right split does intersect 658 // Discard the left split and split further with the right split 659 index = leftSplitFence; 660 } 661 else if (leftSplitFence >= sliceFence) { 662 // The right split does not intersect with, and is to the right of, the slice 663 // The left split does intersect 664 // Discard the right split and split further with the left split 665 s = leftSplit; 666 fence = leftSplitFence; 667 } 668 else if (index >= sliceOrigin && leftSplitFenceUnbounded <= sliceFence) { 669 // The left split is contained within the slice, return the underlying left split 670 // Right split is contained within or intersects with the slice 671 index = leftSplitFence; 672 return leftSplit; 673 } else { 674 // The left split intersects with the slice 675 // Right split is contained within or intersects with the slice 676 return makeSpliterator(leftSplit, sliceOrigin, sliceFence, index, index = leftSplitFence); 677 } 678 } 679 } 680 estimateSize()681 public long estimateSize() { 682 return (sliceOrigin < fence) 683 ? 
fence - Math.max(sliceOrigin, index) : 0; 684 } 685 characteristics()686 public int characteristics() { 687 return s.characteristics(); 688 } 689 690 static final class OfRef<T> 691 extends SliceSpliterator<T, Spliterator<T>> 692 implements Spliterator<T> { 693 OfRef(Spliterator<T> s, long sliceOrigin, long sliceFence)694 OfRef(Spliterator<T> s, long sliceOrigin, long sliceFence) { 695 this(s, sliceOrigin, sliceFence, 0, Math.min(s.estimateSize(), sliceFence)); 696 } 697 OfRef(Spliterator<T> s, long sliceOrigin, long sliceFence, long origin, long fence)698 private OfRef(Spliterator<T> s, 699 long sliceOrigin, long sliceFence, long origin, long fence) { 700 super(s, sliceOrigin, sliceFence, origin, fence); 701 } 702 703 @Override makeSpliterator(Spliterator<T> s, long sliceOrigin, long sliceFence, long origin, long fence)704 protected Spliterator<T> makeSpliterator(Spliterator<T> s, 705 long sliceOrigin, long sliceFence, 706 long origin, long fence) { 707 return new OfRef<>(s, sliceOrigin, sliceFence, origin, fence); 708 } 709 710 @Override tryAdvance(Consumer<? super T> action)711 public boolean tryAdvance(Consumer<? super T> action) { 712 Objects.requireNonNull(action); 713 714 if (sliceOrigin >= fence) 715 return false; 716 717 while (sliceOrigin > index) { 718 s.tryAdvance(e -> {}); 719 index++; 720 } 721 722 if (index >= fence) 723 return false; 724 725 index++; 726 return s.tryAdvance(action); 727 } 728 729 @Override forEachRemaining(Consumer<? super T> action)730 public void forEachRemaining(Consumer<? 
super T> action) { 731 Objects.requireNonNull(action); 732 733 if (sliceOrigin >= fence) 734 return; 735 736 if (index >= fence) 737 return; 738 739 if (index >= sliceOrigin && (index + s.estimateSize()) <= sliceFence) { 740 // The spliterator is contained within the slice 741 s.forEachRemaining(action); 742 index = fence; 743 } else { 744 // The spliterator intersects with the slice 745 while (sliceOrigin > index) { 746 s.tryAdvance(e -> {}); 747 index++; 748 } 749 // Traverse elements up to the fence 750 for (;index < fence; index++) { 751 s.tryAdvance(action); 752 } 753 } 754 } 755 } 756 757 abstract static class OfPrimitive<T, 758 T_SPLITR extends Spliterator.OfPrimitive<T, T_CONS, T_SPLITR>, 759 T_CONS> 760 extends SliceSpliterator<T, T_SPLITR> 761 implements Spliterator.OfPrimitive<T, T_CONS, T_SPLITR> { 762 OfPrimitive(T_SPLITR s, long sliceOrigin, long sliceFence)763 OfPrimitive(T_SPLITR s, long sliceOrigin, long sliceFence) { 764 this(s, sliceOrigin, sliceFence, 0, Math.min(s.estimateSize(), sliceFence)); 765 } 766 OfPrimitive(T_SPLITR s, long sliceOrigin, long sliceFence, long origin, long fence)767 private OfPrimitive(T_SPLITR s, 768 long sliceOrigin, long sliceFence, long origin, long fence) { 769 super(s, sliceOrigin, sliceFence, origin, fence); 770 } 771 772 @Override tryAdvance(T_CONS action)773 public boolean tryAdvance(T_CONS action) { 774 Objects.requireNonNull(action); 775 776 if (sliceOrigin >= fence) 777 return false; 778 779 while (sliceOrigin > index) { 780 s.tryAdvance(emptyConsumer()); 781 index++; 782 } 783 784 if (index >= fence) 785 return false; 786 787 index++; 788 return s.tryAdvance(action); 789 } 790 791 @Override forEachRemaining(T_CONS action)792 public void forEachRemaining(T_CONS action) { 793 Objects.requireNonNull(action); 794 795 if (sliceOrigin >= fence) 796 return; 797 798 if (index >= fence) 799 return; 800 801 if (index >= sliceOrigin && (index + s.estimateSize()) <= sliceFence) { 802 // The spliterator is contained 
within the slice 803 s.forEachRemaining(action); 804 index = fence; 805 } else { 806 // The spliterator intersects with the slice 807 while (sliceOrigin > index) { 808 s.tryAdvance(emptyConsumer()); 809 index++; 810 } 811 // Traverse elements up to the fence 812 for (;index < fence; index++) { 813 s.tryAdvance(action); 814 } 815 } 816 } 817 emptyConsumer()818 protected abstract T_CONS emptyConsumer(); 819 } 820 821 @SuppressWarnings("overloads") 822 static final class OfInt extends OfPrimitive<Integer, Spliterator.OfInt, IntConsumer> 823 implements Spliterator.OfInt { OfInt(Spliterator.OfInt s, long sliceOrigin, long sliceFence)824 OfInt(Spliterator.OfInt s, long sliceOrigin, long sliceFence) { 825 super(s, sliceOrigin, sliceFence); 826 } 827 OfInt(Spliterator.OfInt s, long sliceOrigin, long sliceFence, long origin, long fence)828 OfInt(Spliterator.OfInt s, 829 long sliceOrigin, long sliceFence, long origin, long fence) { 830 super(s, sliceOrigin, sliceFence, origin, fence); 831 } 832 833 @Override makeSpliterator(Spliterator.OfInt s, long sliceOrigin, long sliceFence, long origin, long fence)834 protected Spliterator.OfInt makeSpliterator(Spliterator.OfInt s, 835 long sliceOrigin, long sliceFence, 836 long origin, long fence) { 837 return new SliceSpliterator.OfInt(s, sliceOrigin, sliceFence, origin, fence); 838 } 839 840 @Override emptyConsumer()841 protected IntConsumer emptyConsumer() { 842 return e -> {}; 843 } 844 } 845 846 @SuppressWarnings("overloads") 847 static final class OfLong extends OfPrimitive<Long, Spliterator.OfLong, LongConsumer> 848 implements Spliterator.OfLong { OfLong(Spliterator.OfLong s, long sliceOrigin, long sliceFence)849 OfLong(Spliterator.OfLong s, long sliceOrigin, long sliceFence) { 850 super(s, sliceOrigin, sliceFence); 851 } 852 OfLong(Spliterator.OfLong s, long sliceOrigin, long sliceFence, long origin, long fence)853 OfLong(Spliterator.OfLong s, 854 long sliceOrigin, long sliceFence, long origin, long fence) { 855 super(s, 
sliceOrigin, sliceFence, origin, fence); 856 } 857 858 @Override makeSpliterator(Spliterator.OfLong s, long sliceOrigin, long sliceFence, long origin, long fence)859 protected Spliterator.OfLong makeSpliterator(Spliterator.OfLong s, 860 long sliceOrigin, long sliceFence, 861 long origin, long fence) { 862 return new SliceSpliterator.OfLong(s, sliceOrigin, sliceFence, origin, fence); 863 } 864 865 @Override emptyConsumer()866 protected LongConsumer emptyConsumer() { 867 return e -> {}; 868 } 869 } 870 871 @SuppressWarnings("overloads") 872 static final class OfDouble extends OfPrimitive<Double, Spliterator.OfDouble, DoubleConsumer> 873 implements Spliterator.OfDouble { OfDouble(Spliterator.OfDouble s, long sliceOrigin, long sliceFence)874 OfDouble(Spliterator.OfDouble s, long sliceOrigin, long sliceFence) { 875 super(s, sliceOrigin, sliceFence); 876 } 877 OfDouble(Spliterator.OfDouble s, long sliceOrigin, long sliceFence, long origin, long fence)878 OfDouble(Spliterator.OfDouble s, 879 long sliceOrigin, long sliceFence, long origin, long fence) { 880 super(s, sliceOrigin, sliceFence, origin, fence); 881 } 882 883 @Override makeSpliterator(Spliterator.OfDouble s, long sliceOrigin, long sliceFence, long origin, long fence)884 protected Spliterator.OfDouble makeSpliterator(Spliterator.OfDouble s, 885 long sliceOrigin, long sliceFence, 886 long origin, long fence) { 887 return new SliceSpliterator.OfDouble(s, sliceOrigin, sliceFence, origin, fence); 888 } 889 890 @Override emptyConsumer()891 protected DoubleConsumer emptyConsumer() { 892 return e -> {}; 893 } 894 } 895 } 896 897 /** 898 * A slice Spliterator that does not preserve order, if any, of a source 899 * Spliterator. 900 * 901 * Note: The source spliterator may report {@code ORDERED} since that 902 * spliterator be the result of a previous pipeline stage that was 903 * collected to a {@code Node}. It is the order of the pipeline stage 904 * that governs whether this slice spliterator is to be used or not. 
*/
    abstract static class UnorderedSliceSpliterator<T, T_SPLITR extends Spliterator<T>> {
        /** Upper bound on the number of elements buffered per batch. */
        static final int CHUNK_SIZE = 1 << 7;

        // The spliterator to slice
        protected final T_SPLITR s;
        // True when no limit was requested (limit < 0)
        protected final boolean unlimited;
        // Number of elements buffered per batch by forEachRemaining
        protected final int chunkSize;
        // Permits above this level are spent on skipping; at or below it
        // they count toward the limit
        private final long skipThreshold;
        // Remaining permits; the same instance is shared with every split
        private final AtomicLong permits;

        /**
         * Creates a root slice spliterator.
         *
         * @param s the spliterator to slice
         * @param skip the number of elements to skip
         * @param limit the maximum number of elements to report, or a
         *        negative value if there is no limit
         */
        UnorderedSliceSpliterator(T_SPLITR s, long skip, long limit) {
            this.s = s;
            this.unlimited = limit < 0;
            this.skipThreshold = limit >= 0 ? limit : 0;
            this.chunkSize = limit >= 0 ? (int)Math.min(CHUNK_SIZE,
                                                        ((skip + limit) / AbstractTask.getLeafTarget()) + 1) : CHUNK_SIZE;
            this.permits = new AtomicLong(limit >= 0 ? skip + limit : skip);
        }

        /**
         * Creates a slice spliterator over a split of the source that shares
         * the permit counter and configuration of {@code parent}.
         *
         * @param s the split-off source spliterator
         * @param parent the slice spliterator that was split
         */
        UnorderedSliceSpliterator(T_SPLITR s,
                                  UnorderedSliceSpliterator<T, T_SPLITR> parent) {
            this.s = s;
            this.unlimited = parent.unlimited;
            this.permits = parent.permits;
            this.skipThreshold = parent.skipThreshold;
            this.chunkSize = parent.chunkSize;
        }

        /**
         * Acquire permission to skip or process elements.  The caller must
         * first acquire the elements, then consult this method for guidance
         * as to what to do with the data.
         *
         * <p>We use an {@code AtomicLong} to atomically maintain a counter,
         * which is initialized as skip+limit if we are limiting, or skip only
         * if we are not limiting.  The user should consult the method
         * {@code permitStatus()} before acquiring data elements.
         *
         * @param numElements the number of elements the caller has in hand
         * @return the number of elements that should be processed; any
         * remaining elements should be discarded.
         */
        protected final long acquirePermits(long numElements) {
            long remainingPermits;
            long grabbing;
            // permits never increase, and don't decrease below zero
            assert numElements > 0;
            do {
                remainingPermits = permits.get();
                if (remainingPermits == 0)
                    return unlimited ? numElements : 0;
                grabbing = Math.min(remainingPermits, numElements);
            } while (grabbing > 0 &&
                     !permits.compareAndSet(remainingPermits, remainingPermits - grabbing));

            if (unlimited)
                // Every permit is a skip permit: the grabbed elements are
                // skipped and whatever is left over is processed
                return Math.max(numElements - grabbing, 0);
            else if (remainingPermits > skipThreshold)
                // (remainingPermits - skipThreshold) of the grabbed permits
                // were skip permits; only the rest count as processed
                return Math.max(grabbing - (remainingPermits - skipThreshold), 0);
            else
                return grabbing;
        }

        enum PermitStatus { NO_MORE, MAYBE_MORE, UNLIMITED }

        /** Call to check if permits might be available before acquiring data */
        protected final PermitStatus permitStatus() {
            if (permits.get() > 0)
                return PermitStatus.MAYBE_MORE;
            else
                return unlimited ?  PermitStatus.UNLIMITED : PermitStatus.NO_MORE;
        }

        /**
         * Splits the underlying spliterator and wraps the result so that it
         * shares this spliterator's permits.
         *
         * @return the wrapped split, or {@code null} if the permit counter is
         *         zero or the source cannot be split
         */
        public final T_SPLITR trySplit() {
            // Stop splitting when there are no more limit permits
            if (permits.get() == 0)
                return null;
            @SuppressWarnings("unchecked")
            T_SPLITR split = (T_SPLITR) s.trySplit();
            return split == null ? null : makeSpliterator(split);
        }

        /**
         * Wraps a split of the source in a slice spliterator of the same
         * concrete type as this one.
         *
         * @param s the split-off source spliterator
         * @return the wrapping slice spliterator
         */
        protected abstract T_SPLITR makeSpliterator(T_SPLITR s);

        /** Returns the size estimate of the underlying spliterator. */
        public final long estimateSize() {
            return s.estimateSize();
        }

        /**
         * Returns the characteristics of the underlying spliterator with
         * {@code SIZED}, {@code SUBSIZED} and {@code ORDERED} cleared.
         */
        public final int characteristics() {
            return s.characteristics() &
                   ~(Spliterator.SIZED | Spliterator.SUBSIZED | Spliterator.ORDERED);
        }

        /**
         * Reference-typed slice spliterator.  The instance is itself the
         * consumer handed to the source in {@code tryAdvance}; the received
         * element is parked in {@code tmpSlot}.
         */
        static final class OfRef<T> extends UnorderedSliceSpliterator<T, Spliterator<T>>
                implements Spliterator<T>, Consumer<T> {
            // Element most recently received from the source by tryAdvance
            T tmpSlot;

            OfRef(Spliterator<T> s, long skip, long limit) {
                super(s, skip, limit);
            }

            OfRef(Spliterator<T> s, OfRef<T> parent) {
                super(s, parent);
            }

            @Override
            public final void accept(T t) {
                tmpSlot = t;
            }

            @Override
            public boolean tryAdvance(Consumer<? super T> action) {
                Objects.requireNonNull(action);

                while (permitStatus() != PermitStatus.NO_MORE) {
                    if (!s.tryAdvance(this))
                        return false;
                    else if (acquirePermits(1) == 1) {
                        action.accept(tmpSlot);
                        tmpSlot = null;
                        return true;
                    }
                    // otherwise the element was skipped; try the next one
                }
                return false;
            }

            @Override
            public void forEachRemaining(Consumer<? super T> action) {
                Objects.requireNonNull(action);

                ArrayBuffer.OfRef<T> sb = null;
                PermitStatus permitStatus;
                while ((permitStatus = permitStatus()) != PermitStatus.NO_MORE) {
                    if (permitStatus == PermitStatus.MAYBE_MORE) {
                        // Optimistically traverse elements up to a threshold of chunkSize
                        if (sb == null)
                            sb = new ArrayBuffer.OfRef<>(chunkSize);
                        else
                            sb.reset();
                        long permitsRequested = 0;
                        do { } while (s.tryAdvance(sb) && ++permitsRequested < chunkSize);
                        if (permitsRequested == 0)
                            return;
                        sb.forEach(action, acquirePermits(permitsRequested));
                    }
                    else {
                        // Must be UNLIMITED; let 'er rip
                        s.forEachRemaining(action);
                        return;
                    }
                }
            }

            @Override
            protected Spliterator<T> makeSpliterator(Spliterator<T> s) {
                return new UnorderedSliceSpliterator.OfRef<>(s, this);
            }
        }

        /**
         * Concrete sub-types must also be an instance of type {@code T_CONS}.
         *
         * @param <T_BUFF> the type of the spined buffer. Must also be a type of
         *        {@code T_CONS}.
         */
        abstract static class OfPrimitive<
                T,
                T_CONS,
                T_BUFF extends ArrayBuffer.OfPrimitive<T_CONS>,
                T_SPLITR extends Spliterator.OfPrimitive<T, T_CONS, T_SPLITR>>
                extends UnorderedSliceSpliterator<T, T_SPLITR>
                implements Spliterator.OfPrimitive<T, T_CONS, T_SPLITR> {
            OfPrimitive(T_SPLITR s, long skip, long limit) {
                super(s, skip, limit);
            }

            OfPrimitive(T_SPLITR s, UnorderedSliceSpliterator.OfPrimitive<T, T_CONS, T_BUFF, T_SPLITR> parent) {
                super(s, parent);
            }

            @Override
            public boolean tryAdvance(T_CONS action) {
                Objects.requireNonNull(action);
                // Concrete sub-types implement T_CONS and stash the value
                @SuppressWarnings("unchecked")
                T_CONS consumer = (T_CONS) this;

                while (permitStatus() != PermitStatus.NO_MORE) {
                    if (!s.tryAdvance(consumer))
                        return false;
                    else if (acquirePermits(1) == 1) {
                        acceptConsumed(action);
                        return true;
                    }
                    // otherwise the element was skipped; try the next one
                }
                return false;
            }

            /**
             * Passes the value most recently received from the source to
             * {@code action}.
             */
            protected abstract void acceptConsumed(T_CONS action);

            @Override
            public void forEachRemaining(T_CONS action) {
                Objects.requireNonNull(action);

                T_BUFF sb = null;
                PermitStatus permitStatus;
                while ((permitStatus = permitStatus()) != PermitStatus.NO_MORE) {
                    if (permitStatus == PermitStatus.MAYBE_MORE) {
                        // Optimistically traverse elements up to a threshold of chunkSize
                        if (sb == null)
                            sb = bufferCreate(chunkSize);
                        else
                            sb.reset();
                        @SuppressWarnings("unchecked")
                        T_CONS sbc = (T_CONS) sb;
                        long permitsRequested = 0;
                        do { } while (s.tryAdvance(sbc) && ++permitsRequested < chunkSize);
                        if (permitsRequested == 0)
                            return;
                        sb.forEach(action, acquirePermits(permitsRequested));
                    }
                    else {
                        // Must be UNLIMITED; let 'er rip
                        s.forEachRemaining(action);
                        return;
                    }
                }
            }

            /** Creates a buffer able to hold {@code initialCapacity} values. */
            protected abstract T_BUFF bufferCreate(int initialCapacity);
        }

        @SuppressWarnings("overloads")
        static final class OfInt
                extends OfPrimitive<Integer, IntConsumer, ArrayBuffer.OfInt, Spliterator.OfInt>
                implements Spliterator.OfInt, IntConsumer {

            // Value most recently received from the source by tryAdvance
            int tmpValue;

            OfInt(Spliterator.OfInt s, long skip, long limit) {
                super(s, skip, limit);
            }

            OfInt(Spliterator.OfInt s, UnorderedSliceSpliterator.OfInt parent) {
                super(s, parent);
            }

            @Override
            public void accept(int value) {
                tmpValue = value;
            }

            @Override
            protected void acceptConsumed(IntConsumer action) {
                action.accept(tmpValue);
            }

            @Override
            protected ArrayBuffer.OfInt bufferCreate(int initialCapacity) {
                return new ArrayBuffer.OfInt(initialCapacity);
            }

            @Override
            protected Spliterator.OfInt makeSpliterator(Spliterator.OfInt s) {
                return new UnorderedSliceSpliterator.OfInt(s, this);
            }
        }

        @SuppressWarnings("overloads")
        static final class OfLong
                extends OfPrimitive<Long, LongConsumer, ArrayBuffer.OfLong, Spliterator.OfLong>
                implements Spliterator.OfLong, LongConsumer {

            // Value most recently received from the source by tryAdvance
            long tmpValue;

            OfLong(Spliterator.OfLong s, long skip, long limit) {
                super(s, skip, limit);
            }

            OfLong(Spliterator.OfLong s, UnorderedSliceSpliterator.OfLong parent) {
                super(s, parent);
            }

            @Override
            public void accept(long value) {
                tmpValue = value;
            }

            @Override
            protected void acceptConsumed(LongConsumer action) {
                action.accept(tmpValue);
            }

            @Override
            protected ArrayBuffer.OfLong bufferCreate(int initialCapacity) {
                return new ArrayBuffer.OfLong(initialCapacity);
            }

            @Override
            protected Spliterator.OfLong makeSpliterator(Spliterator.OfLong s) {
                return new UnorderedSliceSpliterator.OfLong(s, this);
            }
        }

        @SuppressWarnings("overloads")
        static final class OfDouble
                extends OfPrimitive<Double, DoubleConsumer, ArrayBuffer.OfDouble, Spliterator.OfDouble>
                implements Spliterator.OfDouble, DoubleConsumer {

            // Value most recently received from the source by tryAdvance
            double tmpValue;

            OfDouble(Spliterator.OfDouble s, long skip, long limit) {
                super(s, skip, limit);
            }

            OfDouble(Spliterator.OfDouble s, UnorderedSliceSpliterator.OfDouble parent) {
                super(s, parent);
            }

            @Override
            public void accept(double value) {
                tmpValue = value;
            }

            @Override
            protected void acceptConsumed(DoubleConsumer action) {
                action.accept(tmpValue);
            }

            @Override
            protected ArrayBuffer.OfDouble bufferCreate(int initialCapacity) {
                return new ArrayBuffer.OfDouble(initialCapacity);
            }

            @Override
            protected Spliterator.OfDouble makeSpliterator(Spliterator.OfDouble s) {
                return new UnorderedSliceSpliterator.OfDouble(s, this);
            }
        }
    }

    /**
     * A wrapping spliterator that only reports distinct elements of the
     * underlying spliterator. Does not preserve size and encounter order.
     */
    static final class DistinctSpliterator<T> implements Spliterator<T>, Consumer<T> {

        // The value to represent null in the ConcurrentHashMap
        private static final Object NULL_VALUE = new Object();

        // The underlying spliterator
        private final Spliterator<T> s;

        // ConcurrentHashMap holding distinct elements as keys
        private final ConcurrentHashMap<T, Boolean> seen;

        // Temporary element, only used with tryAdvance
        private T tmpSlot;

        DistinctSpliterator(Spliterator<T> s) {
            this(s, new ConcurrentHashMap<>());
        }

        // Used when splitting so that all splits share one "seen" map
        private DistinctSpliterator(Spliterator<T> s, ConcurrentHashMap<T, Boolean> seen) {
            this.s = s;
            this.seen = seen;
        }

        @Override
        public void accept(T t) {
            this.tmpSlot = t;
        }

        // Substitutes NULL_VALUE for null so the element can be used as a map key
        @SuppressWarnings("unchecked")
        private T mapNull(T t) {
            return t != null ? t : (T) NULL_VALUE;
        }

        @Override
        public boolean tryAdvance(Consumer<? super T> action) {
            // Fail before any element is consumed or recorded as seen
            Objects.requireNonNull(action);

            while (s.tryAdvance(this)) {
                if (seen.putIfAbsent(mapNull(tmpSlot), Boolean.TRUE) == null) {
                    action.accept(tmpSlot);
                    tmpSlot = null;
                    return true;
                }
            }
            return false;
        }

        @Override
        public void forEachRemaining(Consumer<? super T> action) {
            // Fail before any element is consumed or recorded as seen
            Objects.requireNonNull(action);

            s.forEachRemaining(t -> {
                if (seen.putIfAbsent(mapNull(t), Boolean.TRUE) == null) {
                    action.accept(t);
                }
            });
        }

        @Override
        public Spliterator<T> trySplit() {
            Spliterator<T> split = s.trySplit();
            return (split != null) ? new DistinctSpliterator<>(split, seen) : null;
        }

        @Override
        public long estimateSize() {
            return s.estimateSize();
        }

        @Override
        public int characteristics() {
            return (s.characteristics() & ~(Spliterator.SIZED | Spliterator.SUBSIZED |
                                            Spliterator.SORTED | Spliterator.ORDERED))
                   | Spliterator.DISTINCT;
        }

        // NOTE(review): characteristics() clears SORTED, yet this delegates to
        // the source spliterator -- confirm that is intended
        @Override
        public Comparator<? super T> getComparator() {
            return s.getComparator();
        }
    }

    /**
     * A Spliterator that infinitely supplies elements in no particular order.
     *
     * <p>Splitting divides the estimated size in two and stops when the
     * estimated size is 0.
     *
     * <p>The {@code forEachRemaining} method if invoked will never terminate.
     * The {@code tryAdvance} method always returns true.
     *
     */
    abstract static class InfiniteSupplyingSpliterator<T> implements Spliterator<T> {
        // Current size estimate; halved on every successful split
        long estimate;

        protected InfiniteSupplyingSpliterator(long estimate) {
            this.estimate = estimate;
        }

        @Override
        public long estimateSize() {
            return estimate;
        }

        @Override
        public int characteristics() {
            return IMMUTABLE;
        }

        static final class OfRef<T> extends InfiniteSupplyingSpliterator<T> {
            final Supplier<? extends T> s;

            OfRef(long size, Supplier<? extends T> s) {
                super(size);
                this.s = s;
            }

            @Override
            public boolean tryAdvance(Consumer<? super T> action) {
                Objects.requireNonNull(action);

                action.accept(s.get());
                return true;
            }

            @Override
            public Spliterator<T> trySplit() {
                if (estimate == 0)
                    return null;
                // Both halves keep the same supplier and the halved estimate
                return new InfiniteSupplyingSpliterator.OfRef<>(estimate >>>= 1, s);
            }
        }

        static final class OfInt extends InfiniteSupplyingSpliterator<Integer>
                implements Spliterator.OfInt {
            final IntSupplier s;

            OfInt(long size, IntSupplier s) {
                super(size);
                this.s = s;
            }

            @Override
            public boolean tryAdvance(IntConsumer action) {
                Objects.requireNonNull(action);

                action.accept(s.getAsInt());
                return true;
            }

            @Override
            public Spliterator.OfInt trySplit() {
                if (estimate == 0)
                    return null;
                // Both halves keep the same supplier and the halved estimate
                return new InfiniteSupplyingSpliterator.OfInt(estimate >>>= 1, s);
            }
        }

        static final class OfLong extends InfiniteSupplyingSpliterator<Long>
                implements Spliterator.OfLong {
            final LongSupplier s;

            OfLong(long size, LongSupplier s) {
                super(size);
                this.s = s;
            }

            @Override
            public boolean tryAdvance(LongConsumer action) {
                Objects.requireNonNull(action);

                action.accept(s.getAsLong());
                return true;
            }

            @Override
            public Spliterator.OfLong trySplit() {
                if (estimate == 0)
                    return null;
                // Both halves keep the same supplier and the halved estimate
                return new InfiniteSupplyingSpliterator.OfLong(estimate >>>= 1, s);
            }
        }

        static final class OfDouble extends InfiniteSupplyingSpliterator<Double>
                implements Spliterator.OfDouble {
            final DoubleSupplier s;

            OfDouble(long size, DoubleSupplier s) {
                super(size);
                this.s = s;
            }

            @Override
            public boolean tryAdvance(DoubleConsumer action) {
                Objects.requireNonNull(action);

                action.accept(s.getAsDouble());
                return true;
            }

            @Override
            public Spliterator.OfDouble trySplit() {
                if (estimate == 0)
                    return null;
                // Both halves keep the same supplier and the halved estimate
                return new InfiniteSupplyingSpliterator.OfDouble(estimate >>>= 1, s);
            }
        }
    }

    // @@@ Consolidate with Node.Builder
    /**
     * Fixed-capacity buffer that is filled through its sub-type's
     * {@code accept} method and replayed with {@code forEach}.  No bounds
     * checking is done beyond that of the backing array.
     */
    abstract static class ArrayBuffer {
        // Position at which the next accepted element is stored
        int index;

        /** Rewinds the write position so the buffer can be refilled. */
        void reset() {
            index = 0;
        }

        static final class OfRef<T> extends ArrayBuffer implements Consumer<T> {
            final Object[] array;

            OfRef(int size) {
                this.array = new Object[size];
            }

            @Override
            public void accept(T t) {
                array[index++] = t;
            }

            /**
             * Passes the elements at positions {@code [0, fence)} to
             * {@code action}, in index order.
             */
            public void forEach(Consumer<? super T> action, long fence) {
                for (int i = 0; i < fence; i++) {
                    @SuppressWarnings("unchecked")
                    T t = (T) array[i];
                    action.accept(t);
                }
            }
        }

        abstract static class OfPrimitive<T_CONS> extends ArrayBuffer {
            // The write position and reset() are inherited from ArrayBuffer;
            // redeclaring them here would hide the base field.

            /**
             * Passes the values at positions {@code [0, fence)} to
             * {@code action}, in index order.
             */
            abstract void forEach(T_CONS action, long fence);
        }

        static final class OfInt extends OfPrimitive<IntConsumer>
                implements IntConsumer {
            final int[] array;

            OfInt(int size) {
                this.array = new int[size];
            }

            @Override
            public void accept(int t) {
                array[index++] = t;
            }

            @Override
            public void forEach(IntConsumer action, long fence) {
                for (int i = 0; i < fence; i++) {
                    action.accept(array[i]);
                }
            }
        }

        static final class OfLong extends OfPrimitive<LongConsumer>
                implements LongConsumer {
            final long[] array;

            OfLong(int size) {
                this.array = new long[size];
            }

            @Override
            public void accept(long t) {
                array[index++] = t;
            }

            @Override
            public void forEach(LongConsumer action, long fence) {
                for (int i = 0; i < fence; i++) {
                    action.accept(array[i]);
                }
            }
        }

        static final class OfDouble extends OfPrimitive<DoubleConsumer>
                implements DoubleConsumer {
            final double[] array;

            OfDouble(int size) {
                this.array = new double[size];
            }

            @Override
            public void accept(double t) {
                array[index++] = t;
            }

            @Override
            void forEach(DoubleConsumer action, long fence) {
                for (int i = 0; i < fence; i++) {
                    action.accept(array[i]);
                }
            }
        }
    }
}