• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2012, 2021, Oracle and/or its affiliates. All rights reserved.
3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4  *
5  * This code is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 only, as
7  * published by the Free Software Foundation.  Oracle designates this
8  * particular file as subject to the "Classpath" exception as provided
9  * by Oracle in the LICENSE file that accompanied this code.
10  *
11  * This code is distributed in the hope that it will be useful, but WITHOUT
12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14  * version 2 for more details (a copy is included in the LICENSE file that
15  * accompanied this code).
16  *
17  * You should have received a copy of the GNU General Public License version
18  * 2 along with this work; if not, write to the Free Software Foundation,
19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20  *
21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22  * or visit www.oracle.com if you need additional information or have any
23  * questions.
24  */
25 package java.util.stream;
26 
27 import java.util.Comparator;
28 import java.util.Objects;
29 import java.util.Spliterator;
30 import java.util.concurrent.ConcurrentHashMap;
31 import java.util.concurrent.atomic.AtomicLong;
32 import java.util.function.BooleanSupplier;
33 import java.util.function.Consumer;
34 import java.util.function.DoubleConsumer;
35 import java.util.function.DoubleSupplier;
36 import java.util.function.IntConsumer;
37 import java.util.function.IntSupplier;
38 import java.util.function.LongConsumer;
39 import java.util.function.LongSupplier;
40 import java.util.function.Supplier;
41 
42 /**
43  * Spliterator implementations for wrapping and delegating spliterators, used
44  * in the implementation of the {@link Stream#spliterator()} method.
45  *
46  * @since 1.8
47  */
48 class StreamSpliterators {
49 
50     /**
51      * Abstract wrapping spliterator that binds to the spliterator of a
52      * pipeline helper on first operation.
53      *
54      * <p>This spliterator is not late-binding and will bind to the source
55      * spliterator when first operated on.
56      *
57      * <p>A wrapping spliterator produced from a sequential stream
58      * cannot be split if there are stateful operations present.
59      */
60     private abstract static class AbstractWrappingSpliterator<P_IN, P_OUT,
61                                                               T_BUFFER extends AbstractSpinedBuffer>
62             implements Spliterator<P_OUT> {
63 
64         // @@@ Detect if stateful operations are present or not
65         //     If not then can split otherwise cannot
66 
67         /**
68          * True if this spliterator supports splitting
69          */
70         final boolean isParallel;
71 
72         final PipelineHelper<P_OUT> ph;
73 
74         /**
75          * Supplier for the source spliterator.  Client provides either a
76          * spliterator or a supplier.
77          */
78         private Supplier<Spliterator<P_IN>> spliteratorSupplier;
79 
80         /**
81          * Source spliterator.  Either provided from client or obtained from
82          * supplier.
83          */
84         Spliterator<P_IN> spliterator;
85 
86         /**
87          * Sink chain for the downstream stages of the pipeline, ultimately
88          * leading to the buffer. Used during partial traversal.
89          */
90         Sink<P_IN> bufferSink;
91 
92         /**
93          * A function that advances one element of the spliterator, pushing
94          * it to bufferSink.  Returns whether any elements were processed.
95          * Used during partial traversal.
96          */
97         BooleanSupplier pusher;
98 
99         /** Next element to consume from the buffer, used during partial traversal */
100         long nextToConsume;
101 
102         /** Buffer into which elements are pushed.  Used during partial traversal. */
103         T_BUFFER buffer;
104 
105         /**
106          * True if full traversal has occurred (with possible cancellation).
107          * If doing a partial traversal, there may be still elements in buffer.
108          */
109         boolean finished;
110 
111         /**
112          * Construct an AbstractWrappingSpliterator from a
113          * {@code Supplier<Spliterator>}.
114          */
AbstractWrappingSpliterator(PipelineHelper<P_OUT> ph, Supplier<Spliterator<P_IN>> spliteratorSupplier, boolean parallel)115         AbstractWrappingSpliterator(PipelineHelper<P_OUT> ph,
116                                     Supplier<Spliterator<P_IN>> spliteratorSupplier,
117                                     boolean parallel) {
118             this.ph = ph;
119             this.spliteratorSupplier = spliteratorSupplier;
120             this.spliterator = null;
121             this.isParallel = parallel;
122         }
123 
124         /**
125          * Construct an AbstractWrappingSpliterator from a
126          * {@code Spliterator}.
127          */
AbstractWrappingSpliterator(PipelineHelper<P_OUT> ph, Spliterator<P_IN> spliterator, boolean parallel)128         AbstractWrappingSpliterator(PipelineHelper<P_OUT> ph,
129                                     Spliterator<P_IN> spliterator,
130                                     boolean parallel) {
131             this.ph = ph;
132             this.spliteratorSupplier = null;
133             this.spliterator = spliterator;
134             this.isParallel = parallel;
135         }
136 
137         /**
138          * Called before advancing to set up spliterator, if needed.
139          */
init()140         final void init() {
141             if (spliterator == null) {
142                 spliterator = spliteratorSupplier.get();
143                 spliteratorSupplier = null;
144             }
145         }
146 
147         /**
148          * Get an element from the source, pushing it into the sink chain,
149          * setting up the buffer if needed
150          * @return whether there are elements to consume from the buffer
151          */
doAdvance()152         final boolean doAdvance() {
153             if (buffer == null) {
154                 if (finished)
155                     return false;
156 
157                 init();
158                 initPartialTraversalState();
159                 nextToConsume = 0;
160                 bufferSink.begin(spliterator.getExactSizeIfKnown());
161                 return fillBuffer();
162             }
163             else {
164                 ++nextToConsume;
165                 boolean hasNext = nextToConsume < buffer.count();
166                 if (!hasNext) {
167                     nextToConsume = 0;
168                     buffer.clear();
169                     hasNext = fillBuffer();
170                 }
171                 return hasNext;
172             }
173         }
174 
175         /**
176          * Invokes the shape-specific constructor with the provided arguments
177          * and returns the result.
178          */
179         abstract AbstractWrappingSpliterator<P_IN, P_OUT, ?> wrap(Spliterator<P_IN> s);
180 
181         /**
182          * Initializes buffer, sink chain, and pusher for a shape-specific
183          * implementation.
184          */
185         abstract void initPartialTraversalState();
186 
187         @Override
188         public Spliterator<P_OUT> trySplit() {
189             if (isParallel && buffer == null && !finished) {
190                 init();
191 
192                 Spliterator<P_IN> split = spliterator.trySplit();
193                 return (split == null) ? null : wrap(split);
194             }
195             else
196                 return null;
197         }
198 
199         /**
200          * If the buffer is empty, push elements into the sink chain until
201          * the source is empty or cancellation is requested.
202          * @return whether there are elements to consume from the buffer
203          */
204         private boolean fillBuffer() {
205             while (buffer.count() == 0) {
206                 if (bufferSink.cancellationRequested() || !pusher.getAsBoolean()) {
207                     if (finished)
208                         return false;
209                     else {
210                         bufferSink.end(); // might trigger more elements
211                         finished = true;
212                     }
213                 }
214             }
215             return true;
216         }
217 
218         @Override
219         public final long estimateSize() {
220             long exactSizeIfKnown = getExactSizeIfKnown();
221             // Use the estimate of the wrapped spliterator
222             // Note this may not be accurate if there are filter/flatMap
223             // operations filtering or adding elements to the stream
224             return exactSizeIfKnown == -1 ? spliterator.estimateSize() : exactSizeIfKnown;
225         }
226 
227         @Override
228         public final long getExactSizeIfKnown() {
229             init();
230             return ph.exactOutputSizeIfKnown(spliterator);
231         }
232 
233         @Override
234         public final int characteristics() {
235             init();
236 
237             // Get the characteristics from the pipeline
238             int c = StreamOpFlag.toCharacteristics(StreamOpFlag.toStreamFlags(ph.getStreamAndOpFlags()));
239 
240             // Mask off the size and uniform characteristics and replace with
241             // those of the spliterator
242             // Note that a non-uniform spliterator can change from something
243             // with an exact size to an estimate for a sub-split, for example
244             // with HashSet where the size is known at the top level spliterator
245             // but for sub-splits only an estimate is known
246             if ((c & Spliterator.SIZED) != 0) {
247                 c &= ~(Spliterator.SIZED | Spliterator.SUBSIZED);
248                 c |= (spliterator.characteristics() & (Spliterator.SIZED | Spliterator.SUBSIZED));
249             }
250 
251             return c;
252         }
253 
254         @Override
255         public Comparator<? super P_OUT> getComparator() {
256             if (!hasCharacteristics(SORTED))
257                 throw new IllegalStateException();
258             return null;
259         }
260 
261         @Override
262         public final String toString() {
263             return String.format("%s[%s]", getClass().getName(), spliterator);
264         }
265     }
266 
267     static final class WrappingSpliterator<P_IN, P_OUT>
268             extends AbstractWrappingSpliterator<P_IN, P_OUT, SpinedBuffer<P_OUT>> {
269 
270         WrappingSpliterator(PipelineHelper<P_OUT> ph,
271                             Supplier<Spliterator<P_IN>> supplier,
272                             boolean parallel) {
273             super(ph, supplier, parallel);
274         }
275 
276         WrappingSpliterator(PipelineHelper<P_OUT> ph,
277                             Spliterator<P_IN> spliterator,
278                             boolean parallel) {
279             super(ph, spliterator, parallel);
280         }
281 
282         @Override
283         WrappingSpliterator<P_IN, P_OUT> wrap(Spliterator<P_IN> s) {
284             return new WrappingSpliterator<>(ph, s, isParallel);
285         }
286 
287         @Override
288         void initPartialTraversalState() {
289             SpinedBuffer<P_OUT> b = new SpinedBuffer<>();
290             buffer = b;
291             bufferSink = ph.wrapSink(b::accept);
292             pusher = () -> spliterator.tryAdvance(bufferSink);
293         }
294 
295         @Override
tryAdvance(Consumer<? super P_OUT> consumer)296         public boolean tryAdvance(Consumer<? super P_OUT> consumer) {
297             Objects.requireNonNull(consumer);
298             boolean hasNext = doAdvance();
299             if (hasNext)
300                 consumer.accept(buffer.get(nextToConsume));
301             return hasNext;
302         }
303 
304         @Override
forEachRemaining(Consumer<? super P_OUT> consumer)305         public void forEachRemaining(Consumer<? super P_OUT> consumer) {
306             if (buffer == null && !finished) {
307                 Objects.requireNonNull(consumer);
308                 init();
309 
310                 ph.wrapAndCopyInto((Sink<P_OUT>) consumer::accept, spliterator);
311                 finished = true;
312             }
313             else {
314                 do { } while (tryAdvance(consumer));
315             }
316         }
317     }
318 
319     static final class IntWrappingSpliterator<P_IN>
320             extends AbstractWrappingSpliterator<P_IN, Integer, SpinedBuffer.OfInt>
321             implements Spliterator.OfInt {
322 
IntWrappingSpliterator(PipelineHelper<Integer> ph, Supplier<Spliterator<P_IN>> supplier, boolean parallel)323         IntWrappingSpliterator(PipelineHelper<Integer> ph,
324                                Supplier<Spliterator<P_IN>> supplier,
325                                boolean parallel) {
326             super(ph, supplier, parallel);
327         }
328 
IntWrappingSpliterator(PipelineHelper<Integer> ph, Spliterator<P_IN> spliterator, boolean parallel)329         IntWrappingSpliterator(PipelineHelper<Integer> ph,
330                                Spliterator<P_IN> spliterator,
331                                boolean parallel) {
332             super(ph, spliterator, parallel);
333         }
334 
335         @Override
wrap(Spliterator<P_IN> s)336         AbstractWrappingSpliterator<P_IN, Integer, ?> wrap(Spliterator<P_IN> s) {
337             return new IntWrappingSpliterator<>(ph, s, isParallel);
338         }
339 
340         @Override
initPartialTraversalState()341         void initPartialTraversalState() {
342             SpinedBuffer.OfInt b = new SpinedBuffer.OfInt();
343             buffer = b;
344             bufferSink = ph.wrapSink((Sink.OfInt) b::accept);
345             pusher = () -> spliterator.tryAdvance(bufferSink);
346         }
347 
348         @Override
trySplit()349         public Spliterator.OfInt trySplit() {
350             return (Spliterator.OfInt) super.trySplit();
351         }
352 
353         @Override
tryAdvance(IntConsumer consumer)354         public boolean tryAdvance(IntConsumer consumer) {
355             Objects.requireNonNull(consumer);
356             boolean hasNext = doAdvance();
357             if (hasNext)
358                 consumer.accept(buffer.get(nextToConsume));
359             return hasNext;
360         }
361 
362         @Override
forEachRemaining(IntConsumer consumer)363         public void forEachRemaining(IntConsumer consumer) {
364             if (buffer == null && !finished) {
365                 Objects.requireNonNull(consumer);
366                 init();
367 
368                 ph.wrapAndCopyInto((Sink.OfInt) consumer::accept, spliterator);
369                 finished = true;
370             }
371             else {
372                 do { } while (tryAdvance(consumer));
373             }
374         }
375     }
376 
377     static final class LongWrappingSpliterator<P_IN>
378             extends AbstractWrappingSpliterator<P_IN, Long, SpinedBuffer.OfLong>
379             implements Spliterator.OfLong {
380 
LongWrappingSpliterator(PipelineHelper<Long> ph, Supplier<Spliterator<P_IN>> supplier, boolean parallel)381         LongWrappingSpliterator(PipelineHelper<Long> ph,
382                                 Supplier<Spliterator<P_IN>> supplier,
383                                 boolean parallel) {
384             super(ph, supplier, parallel);
385         }
386 
LongWrappingSpliterator(PipelineHelper<Long> ph, Spliterator<P_IN> spliterator, boolean parallel)387         LongWrappingSpliterator(PipelineHelper<Long> ph,
388                                 Spliterator<P_IN> spliterator,
389                                 boolean parallel) {
390             super(ph, spliterator, parallel);
391         }
392 
393         @Override
wrap(Spliterator<P_IN> s)394         AbstractWrappingSpliterator<P_IN, Long, ?> wrap(Spliterator<P_IN> s) {
395             return new LongWrappingSpliterator<>(ph, s, isParallel);
396         }
397 
398         @Override
initPartialTraversalState()399         void initPartialTraversalState() {
400             SpinedBuffer.OfLong b = new SpinedBuffer.OfLong();
401             buffer = b;
402             bufferSink = ph.wrapSink((Sink.OfLong) b::accept);
403             pusher = () -> spliterator.tryAdvance(bufferSink);
404         }
405 
406         @Override
trySplit()407         public Spliterator.OfLong trySplit() {
408             return (Spliterator.OfLong) super.trySplit();
409         }
410 
411         @Override
tryAdvance(LongConsumer consumer)412         public boolean tryAdvance(LongConsumer consumer) {
413             Objects.requireNonNull(consumer);
414             boolean hasNext = doAdvance();
415             if (hasNext)
416                 consumer.accept(buffer.get(nextToConsume));
417             return hasNext;
418         }
419 
420         @Override
forEachRemaining(LongConsumer consumer)421         public void forEachRemaining(LongConsumer consumer) {
422             if (buffer == null && !finished) {
423                 Objects.requireNonNull(consumer);
424                 init();
425 
426                 ph.wrapAndCopyInto((Sink.OfLong) consumer::accept, spliterator);
427                 finished = true;
428             }
429             else {
430                 do { } while (tryAdvance(consumer));
431             }
432         }
433     }
434 
435     static final class DoubleWrappingSpliterator<P_IN>
436             extends AbstractWrappingSpliterator<P_IN, Double, SpinedBuffer.OfDouble>
437             implements Spliterator.OfDouble {
438 
DoubleWrappingSpliterator(PipelineHelper<Double> ph, Supplier<Spliterator<P_IN>> supplier, boolean parallel)439         DoubleWrappingSpliterator(PipelineHelper<Double> ph,
440                                   Supplier<Spliterator<P_IN>> supplier,
441                                   boolean parallel) {
442             super(ph, supplier, parallel);
443         }
444 
DoubleWrappingSpliterator(PipelineHelper<Double> ph, Spliterator<P_IN> spliterator, boolean parallel)445         DoubleWrappingSpliterator(PipelineHelper<Double> ph,
446                                   Spliterator<P_IN> spliterator,
447                                   boolean parallel) {
448             super(ph, spliterator, parallel);
449         }
450 
451         @Override
wrap(Spliterator<P_IN> s)452         AbstractWrappingSpliterator<P_IN, Double, ?> wrap(Spliterator<P_IN> s) {
453             return new DoubleWrappingSpliterator<>(ph, s, isParallel);
454         }
455 
456         @Override
initPartialTraversalState()457         void initPartialTraversalState() {
458             SpinedBuffer.OfDouble b = new SpinedBuffer.OfDouble();
459             buffer = b;
460             bufferSink = ph.wrapSink((Sink.OfDouble) b::accept);
461             pusher = () -> spliterator.tryAdvance(bufferSink);
462         }
463 
464         @Override
trySplit()465         public Spliterator.OfDouble trySplit() {
466             return (Spliterator.OfDouble) super.trySplit();
467         }
468 
469         @Override
tryAdvance(DoubleConsumer consumer)470         public boolean tryAdvance(DoubleConsumer consumer) {
471             Objects.requireNonNull(consumer);
472             boolean hasNext = doAdvance();
473             if (hasNext)
474                 consumer.accept(buffer.get(nextToConsume));
475             return hasNext;
476         }
477 
478         @Override
forEachRemaining(DoubleConsumer consumer)479         public void forEachRemaining(DoubleConsumer consumer) {
480             if (buffer == null && !finished) {
481                 Objects.requireNonNull(consumer);
482                 init();
483 
484                 ph.wrapAndCopyInto((Sink.OfDouble) consumer::accept, spliterator);
485                 finished = true;
486             }
487             else {
488                 do { } while (tryAdvance(consumer));
489             }
490         }
491     }
492 
493     /**
494      * Spliterator implementation that delegates to an underlying spliterator,
495      * acquiring the spliterator from a {@code Supplier<Spliterator>} on the
496      * first call to any spliterator method.
497      * @param <T>
498      */
499     static class DelegatingSpliterator<T, T_SPLITR extends Spliterator<T>>
500             implements Spliterator<T> {
501         private final Supplier<? extends T_SPLITR> supplier;
502 
503         private T_SPLITR s;
504 
DelegatingSpliterator(Supplier<? extends T_SPLITR> supplier)505         DelegatingSpliterator(Supplier<? extends T_SPLITR> supplier) {
506             this.supplier = supplier;
507         }
508 
get()509         T_SPLITR get() {
510             if (s == null) {
511                 s = supplier.get();
512             }
513             return s;
514         }
515 
516         @Override
517         @SuppressWarnings("unchecked")
trySplit()518         public T_SPLITR trySplit() {
519             return (T_SPLITR) get().trySplit();
520         }
521 
522         @Override
tryAdvance(Consumer<? super T> consumer)523         public boolean tryAdvance(Consumer<? super T> consumer) {
524             return get().tryAdvance(consumer);
525         }
526 
527         @Override
forEachRemaining(Consumer<? super T> consumer)528         public void forEachRemaining(Consumer<? super T> consumer) {
529             get().forEachRemaining(consumer);
530         }
531 
532         @Override
estimateSize()533         public long estimateSize() {
534             return get().estimateSize();
535         }
536 
537         @Override
characteristics()538         public int characteristics() {
539             return get().characteristics();
540         }
541 
542         @Override
getComparator()543         public Comparator<? super T> getComparator() {
544             return get().getComparator();
545         }
546 
547         @Override
getExactSizeIfKnown()548         public long getExactSizeIfKnown() {
549             return get().getExactSizeIfKnown();
550         }
551 
552         @Override
toString()553         public String toString() {
554             return getClass().getName() + "[" + get() + "]";
555         }
556 
557         static class OfPrimitive<T, T_CONS, T_SPLITR extends Spliterator.OfPrimitive<T, T_CONS, T_SPLITR>>
558             extends DelegatingSpliterator<T, T_SPLITR>
559             implements Spliterator.OfPrimitive<T, T_CONS, T_SPLITR> {
OfPrimitive(Supplier<? extends T_SPLITR> supplier)560             OfPrimitive(Supplier<? extends T_SPLITR> supplier) {
561                 super(supplier);
562             }
563 
564             @Override
tryAdvance(T_CONS consumer)565             public boolean tryAdvance(T_CONS consumer) {
566                 return get().tryAdvance(consumer);
567             }
568 
569             @Override
forEachRemaining(T_CONS consumer)570             public void forEachRemaining(T_CONS consumer) {
571                 get().forEachRemaining(consumer);
572             }
573         }
574 
575         @SuppressWarnings("overloads")
576         static final class OfInt
577                 extends OfPrimitive<Integer, IntConsumer, Spliterator.OfInt>
578                 implements Spliterator.OfInt {
579 
OfInt(Supplier<Spliterator.OfInt> supplier)580             OfInt(Supplier<Spliterator.OfInt> supplier) {
581                 super(supplier);
582             }
583         }
584 
585         @SuppressWarnings("overloads")
586         static final class OfLong
587                 extends OfPrimitive<Long, LongConsumer, Spliterator.OfLong>
588                 implements Spliterator.OfLong {
589 
OfLong(Supplier<Spliterator.OfLong> supplier)590             OfLong(Supplier<Spliterator.OfLong> supplier) {
591                 super(supplier);
592             }
593         }
594 
595         @SuppressWarnings("overloads")
596         static final class OfDouble
597                 extends OfPrimitive<Double, DoubleConsumer, Spliterator.OfDouble>
598                 implements Spliterator.OfDouble {
599 
OfDouble(Supplier<Spliterator.OfDouble> supplier)600             OfDouble(Supplier<Spliterator.OfDouble> supplier) {
601                 super(supplier);
602             }
603         }
604     }
605 
606     /**
607      * A slice Spliterator from a source Spliterator that reports
608      * {@code SUBSIZED}.
609      *
610      */
611     abstract static class SliceSpliterator<T, T_SPLITR extends Spliterator<T>> {
612         // The start index of the slice
613         final long sliceOrigin;
614         // One past the last index of the slice
615         final long sliceFence;
616 
617         // The spliterator to slice
618         T_SPLITR s;
619         // current (absolute) index, modified on advance/split
620         long index;
621         // one past last (absolute) index or sliceFence, which ever is smaller
622         long fence;
623 
SliceSpliterator(T_SPLITR s, long sliceOrigin, long sliceFence, long origin, long fence)624         SliceSpliterator(T_SPLITR s, long sliceOrigin, long sliceFence, long origin, long fence) {
625             assert s.hasCharacteristics(Spliterator.SUBSIZED);
626             this.s = s;
627             this.sliceOrigin = sliceOrigin;
628             this.sliceFence = sliceFence;
629             this.index = origin;
630             this.fence = fence;
631         }
632 
makeSpliterator(T_SPLITR s, long sliceOrigin, long sliceFence, long origin, long fence)633         protected abstract T_SPLITR makeSpliterator(T_SPLITR s, long sliceOrigin, long sliceFence, long origin, long fence);
634 
trySplit()635         public T_SPLITR trySplit() {
636             if (sliceOrigin >= fence)
637                 return null;
638 
639             if (index >= fence)
640                 return null;
641 
642             // Keep splitting until the left and right splits intersect with the slice
643             // thereby ensuring the size estimate decreases.
644             // This also avoids creating empty spliterators which can result in
645             // existing and additionally created F/J tasks that perform
646             // redundant work on no elements.
647             while (true) {
648                 @SuppressWarnings("unchecked")
649                 T_SPLITR leftSplit = (T_SPLITR) s.trySplit();
650                 if (leftSplit == null)
651                     return null;
652 
653                 long leftSplitFenceUnbounded = index + leftSplit.estimateSize();
654                 long leftSplitFence = Math.min(leftSplitFenceUnbounded, sliceFence);
655                 if (sliceOrigin >= leftSplitFence) {
656                     // The left split does not intersect with, and is to the left of, the slice
657                     // The right split does intersect
658                     // Discard the left split and split further with the right split
659                     index = leftSplitFence;
660                 }
661                 else if (leftSplitFence >= sliceFence) {
662                     // The right split does not intersect with, and is to the right of, the slice
663                     // The left split does intersect
664                     // Discard the right split and split further with the left split
665                     s = leftSplit;
666                     fence = leftSplitFence;
667                 }
668                 else if (index >= sliceOrigin && leftSplitFenceUnbounded <= sliceFence) {
669                     // The left split is contained within the slice, return the underlying left split
670                     // Right split is contained within or intersects with the slice
671                     index = leftSplitFence;
672                     return leftSplit;
673                 } else {
674                     // The left split intersects with the slice
675                     // Right split is contained within or intersects with the slice
676                     return makeSpliterator(leftSplit, sliceOrigin, sliceFence, index, index = leftSplitFence);
677                 }
678             }
679         }
680 
estimateSize()681         public long estimateSize() {
682             return (sliceOrigin < fence)
683                    ? fence - Math.max(sliceOrigin, index) : 0;
684         }
685 
characteristics()686         public int characteristics() {
687             return s.characteristics();
688         }
689 
690         static final class OfRef<T>
691                 extends SliceSpliterator<T, Spliterator<T>>
692                 implements Spliterator<T> {
693 
OfRef(Spliterator<T> s, long sliceOrigin, long sliceFence)694             OfRef(Spliterator<T> s, long sliceOrigin, long sliceFence) {
695                 this(s, sliceOrigin, sliceFence, 0, Math.min(s.estimateSize(), sliceFence));
696             }
697 
OfRef(Spliterator<T> s, long sliceOrigin, long sliceFence, long origin, long fence)698             private OfRef(Spliterator<T> s,
699                           long sliceOrigin, long sliceFence, long origin, long fence) {
700                 super(s, sliceOrigin, sliceFence, origin, fence);
701             }
702 
703             @Override
makeSpliterator(Spliterator<T> s, long sliceOrigin, long sliceFence, long origin, long fence)704             protected Spliterator<T> makeSpliterator(Spliterator<T> s,
705                                                      long sliceOrigin, long sliceFence,
706                                                      long origin, long fence) {
707                 return new OfRef<>(s, sliceOrigin, sliceFence, origin, fence);
708             }
709 
710             @Override
tryAdvance(Consumer<? super T> action)711             public boolean tryAdvance(Consumer<? super T> action) {
712                 Objects.requireNonNull(action);
713 
714                 if (sliceOrigin >= fence)
715                     return false;
716 
717                 while (sliceOrigin > index) {
718                     s.tryAdvance(e -> {});
719                     index++;
720                 }
721 
722                 if (index >= fence)
723                     return false;
724 
725                 index++;
726                 return s.tryAdvance(action);
727             }
728 
729             @Override
forEachRemaining(Consumer<? super T> action)730             public void forEachRemaining(Consumer<? super T> action) {
731                 Objects.requireNonNull(action);
732 
733                 if (sliceOrigin >= fence)
734                     return;
735 
736                 if (index >= fence)
737                     return;
738 
739                 if (index >= sliceOrigin && (index + s.estimateSize()) <= sliceFence) {
740                     // The spliterator is contained within the slice
741                     s.forEachRemaining(action);
742                     index = fence;
743                 } else {
744                     // The spliterator intersects with the slice
745                     while (sliceOrigin > index) {
746                         s.tryAdvance(e -> {});
747                         index++;
748                     }
749                     // Traverse elements up to the fence
750                     for (;index < fence; index++) {
751                         s.tryAdvance(action);
752                     }
753                 }
754             }
755         }
756 
757         abstract static class OfPrimitive<T,
758                 T_SPLITR extends Spliterator.OfPrimitive<T, T_CONS, T_SPLITR>,
759                 T_CONS>
760                 extends SliceSpliterator<T, T_SPLITR>
761                 implements Spliterator.OfPrimitive<T, T_CONS, T_SPLITR> {
762 
OfPrimitive(T_SPLITR s, long sliceOrigin, long sliceFence)763             OfPrimitive(T_SPLITR s, long sliceOrigin, long sliceFence) {
764                 this(s, sliceOrigin, sliceFence, 0, Math.min(s.estimateSize(), sliceFence));
765             }
766 
OfPrimitive(T_SPLITR s, long sliceOrigin, long sliceFence, long origin, long fence)767             private OfPrimitive(T_SPLITR s,
768                                 long sliceOrigin, long sliceFence, long origin, long fence) {
769                 super(s, sliceOrigin, sliceFence, origin, fence);
770             }
771 
772             @Override
tryAdvance(T_CONS action)773             public boolean tryAdvance(T_CONS action) {
774                 Objects.requireNonNull(action);
775 
776                 if (sliceOrigin >= fence)
777                     return false;
778 
779                 while (sliceOrigin > index) {
780                     s.tryAdvance(emptyConsumer());
781                     index++;
782                 }
783 
784                 if (index >= fence)
785                     return false;
786 
787                 index++;
788                 return s.tryAdvance(action);
789             }
790 
791             @Override
forEachRemaining(T_CONS action)792             public void forEachRemaining(T_CONS action) {
793                 Objects.requireNonNull(action);
794 
795                 if (sliceOrigin >= fence)
796                     return;
797 
798                 if (index >= fence)
799                     return;
800 
801                 if (index >= sliceOrigin && (index + s.estimateSize()) <= sliceFence) {
802                     // The spliterator is contained within the slice
803                     s.forEachRemaining(action);
804                     index = fence;
805                 } else {
806                     // The spliterator intersects with the slice
807                     while (sliceOrigin > index) {
808                         s.tryAdvance(emptyConsumer());
809                         index++;
810                     }
811                     // Traverse elements up to the fence
812                     for (;index < fence; index++) {
813                         s.tryAdvance(action);
814                     }
815                 }
816             }
817 
emptyConsumer()818             protected abstract T_CONS emptyConsumer();
819         }
820 
821         @SuppressWarnings("overloads")
822         static final class OfInt extends OfPrimitive<Integer, Spliterator.OfInt, IntConsumer>
823                 implements Spliterator.OfInt {
OfInt(Spliterator.OfInt s, long sliceOrigin, long sliceFence)824             OfInt(Spliterator.OfInt s, long sliceOrigin, long sliceFence) {
825                 super(s, sliceOrigin, sliceFence);
826             }
827 
OfInt(Spliterator.OfInt s, long sliceOrigin, long sliceFence, long origin, long fence)828             OfInt(Spliterator.OfInt s,
829                   long sliceOrigin, long sliceFence, long origin, long fence) {
830                 super(s, sliceOrigin, sliceFence, origin, fence);
831             }
832 
833             @Override
makeSpliterator(Spliterator.OfInt s, long sliceOrigin, long sliceFence, long origin, long fence)834             protected Spliterator.OfInt makeSpliterator(Spliterator.OfInt s,
835                                                         long sliceOrigin, long sliceFence,
836                                                         long origin, long fence) {
837                 return new SliceSpliterator.OfInt(s, sliceOrigin, sliceFence, origin, fence);
838             }
839 
840             @Override
emptyConsumer()841             protected IntConsumer emptyConsumer() {
842                 return e -> {};
843             }
844         }
845 
846         @SuppressWarnings("overloads")
847         static final class OfLong extends OfPrimitive<Long, Spliterator.OfLong, LongConsumer>
848                 implements Spliterator.OfLong {
OfLong(Spliterator.OfLong s, long sliceOrigin, long sliceFence)849             OfLong(Spliterator.OfLong s, long sliceOrigin, long sliceFence) {
850                 super(s, sliceOrigin, sliceFence);
851             }
852 
OfLong(Spliterator.OfLong s, long sliceOrigin, long sliceFence, long origin, long fence)853             OfLong(Spliterator.OfLong s,
854                    long sliceOrigin, long sliceFence, long origin, long fence) {
855                 super(s, sliceOrigin, sliceFence, origin, fence);
856             }
857 
858             @Override
makeSpliterator(Spliterator.OfLong s, long sliceOrigin, long sliceFence, long origin, long fence)859             protected Spliterator.OfLong makeSpliterator(Spliterator.OfLong s,
860                                                          long sliceOrigin, long sliceFence,
861                                                          long origin, long fence) {
862                 return new SliceSpliterator.OfLong(s, sliceOrigin, sliceFence, origin, fence);
863             }
864 
865             @Override
emptyConsumer()866             protected LongConsumer emptyConsumer() {
867                 return e -> {};
868             }
869         }
870 
871         @SuppressWarnings("overloads")
872         static final class OfDouble extends OfPrimitive<Double, Spliterator.OfDouble, DoubleConsumer>
873                 implements Spliterator.OfDouble {
OfDouble(Spliterator.OfDouble s, long sliceOrigin, long sliceFence)874             OfDouble(Spliterator.OfDouble s, long sliceOrigin, long sliceFence) {
875                 super(s, sliceOrigin, sliceFence);
876             }
877 
OfDouble(Spliterator.OfDouble s, long sliceOrigin, long sliceFence, long origin, long fence)878             OfDouble(Spliterator.OfDouble s,
879                      long sliceOrigin, long sliceFence, long origin, long fence) {
880                 super(s, sliceOrigin, sliceFence, origin, fence);
881             }
882 
883             @Override
makeSpliterator(Spliterator.OfDouble s, long sliceOrigin, long sliceFence, long origin, long fence)884             protected Spliterator.OfDouble makeSpliterator(Spliterator.OfDouble s,
885                                                            long sliceOrigin, long sliceFence,
886                                                            long origin, long fence) {
887                 return new SliceSpliterator.OfDouble(s, sliceOrigin, sliceFence, origin, fence);
888             }
889 
890             @Override
emptyConsumer()891             protected DoubleConsumer emptyConsumer() {
892                 return e -> {};
893             }
894         }
895     }
896 
897     /**
898      * A slice Spliterator that does not preserve order, if any, of a source
899      * Spliterator.
900      *
901      * Note: The source spliterator may report {@code ORDERED} since that
902      * spliterator be the result of a previous pipeline stage that was
903      * collected to a {@code Node}. It is the order of the pipeline stage
904      * that governs whether this slice spliterator is to be used or not.
905      */
906     abstract static class UnorderedSliceSpliterator<T, T_SPLITR extends Spliterator<T>> {
907         static final int CHUNK_SIZE = 1 << 7;
908 
909         // The spliterator to slice
910         protected final T_SPLITR s;
911         protected final boolean unlimited;
912         protected final int chunkSize;
913         private final long skipThreshold;
914         private final AtomicLong permits;
915 
UnorderedSliceSpliterator(T_SPLITR s, long skip, long limit)916         UnorderedSliceSpliterator(T_SPLITR s, long skip, long limit) {
917             this.s = s;
918             this.unlimited = limit < 0;
919             this.skipThreshold = limit >= 0 ? limit : 0;
920             this.chunkSize = limit >= 0 ? (int)Math.min(CHUNK_SIZE,
921                                                         ((skip + limit) / AbstractTask.getLeafTarget()) + 1) : CHUNK_SIZE;
922             this.permits = new AtomicLong(limit >= 0 ? skip + limit : skip);
923         }
924 
UnorderedSliceSpliterator(T_SPLITR s, UnorderedSliceSpliterator<T, T_SPLITR> parent)925         UnorderedSliceSpliterator(T_SPLITR s,
926                                   UnorderedSliceSpliterator<T, T_SPLITR> parent) {
927             this.s = s;
928             this.unlimited = parent.unlimited;
929             this.permits = parent.permits;
930             this.skipThreshold = parent.skipThreshold;
931             this.chunkSize = parent.chunkSize;
932         }
933 
934         /**
935          * Acquire permission to skip or process elements.  The caller must
936          * first acquire the elements, then consult this method for guidance
937          * as to what to do with the data.
938          *
939          * <p>We use an {@code AtomicLong} to atomically maintain a counter,
940          * which is initialized as skip+limit if we are limiting, or skip only
941          * if we are not limiting.  The user should consult the method
942          * {@code checkPermits()} before acquiring data elements.
943          *
944          * @param numElements the number of elements the caller has in hand
945          * @return the number of elements that should be processed; any
946          * remaining elements should be discarded.
947          */
acquirePermits(long numElements)948         protected final long acquirePermits(long numElements) {
949             long remainingPermits;
950             long grabbing;
951             // permits never increase, and don't decrease below zero
952             assert numElements > 0;
953             do {
954                 remainingPermits = permits.get();
955                 if (remainingPermits == 0)
956                     return unlimited ? numElements : 0;
957                 grabbing = Math.min(remainingPermits, numElements);
958             } while (grabbing > 0 &&
959                      !permits.compareAndSet(remainingPermits, remainingPermits - grabbing));
960 
961             if (unlimited)
962                 return Math.max(numElements - grabbing, 0);
963             else if (remainingPermits > skipThreshold)
964                 return Math.max(grabbing - (remainingPermits - skipThreshold), 0);
965             else
966                 return grabbing;
967         }
968 
969         enum PermitStatus { NO_MORE, MAYBE_MORE, UNLIMITED }
970 
971         /** Call to check if permits might be available before acquiring data */
permitStatus()972         protected final PermitStatus permitStatus() {
973             if (permits.get() > 0)
974                 return PermitStatus.MAYBE_MORE;
975             else
976                 return unlimited ?  PermitStatus.UNLIMITED : PermitStatus.NO_MORE;
977         }
978 
trySplit()979         public final T_SPLITR trySplit() {
980             // Stop splitting when there are no more limit permits
981             if (permits.get() == 0)
982                 return null;
983             @SuppressWarnings("unchecked")
984             T_SPLITR split = (T_SPLITR) s.trySplit();
985             return split == null ? null : makeSpliterator(split);
986         }
987 
makeSpliterator(T_SPLITR s)988         protected abstract T_SPLITR makeSpliterator(T_SPLITR s);
989 
estimateSize()990         public final long estimateSize() {
991             return s.estimateSize();
992         }
993 
characteristics()994         public final int characteristics() {
995             return s.characteristics() &
996                    ~(Spliterator.SIZED | Spliterator.SUBSIZED | Spliterator.ORDERED);
997         }
998 
999         static final class OfRef<T> extends UnorderedSliceSpliterator<T, Spliterator<T>>
1000                 implements Spliterator<T>, Consumer<T> {
1001             T tmpSlot;
1002 
OfRef(Spliterator<T> s, long skip, long limit)1003             OfRef(Spliterator<T> s, long skip, long limit) {
1004                 super(s, skip, limit);
1005             }
1006 
OfRef(Spliterator<T> s, OfRef<T> parent)1007             OfRef(Spliterator<T> s, OfRef<T> parent) {
1008                 super(s, parent);
1009             }
1010 
1011             @Override
accept(T t)1012             public final void accept(T t) {
1013                 tmpSlot = t;
1014             }
1015 
1016             @Override
tryAdvance(Consumer<? super T> action)1017             public boolean tryAdvance(Consumer<? super T> action) {
1018                 Objects.requireNonNull(action);
1019 
1020                 while (permitStatus() != PermitStatus.NO_MORE) {
1021                     if (!s.tryAdvance(this))
1022                         return false;
1023                     else if (acquirePermits(1) == 1) {
1024                         action.accept(tmpSlot);
1025                         tmpSlot = null;
1026                         return true;
1027                     }
1028                 }
1029                 return false;
1030             }
1031 
1032             @Override
forEachRemaining(Consumer<? super T> action)1033             public void forEachRemaining(Consumer<? super T> action) {
1034                 Objects.requireNonNull(action);
1035 
1036                 ArrayBuffer.OfRef<T> sb = null;
1037                 PermitStatus permitStatus;
1038                 while ((permitStatus = permitStatus()) != PermitStatus.NO_MORE) {
1039                     if (permitStatus == PermitStatus.MAYBE_MORE) {
1040                         // Optimistically traverse elements up to a threshold of chunkSize
1041                         if (sb == null)
1042                             sb = new ArrayBuffer.OfRef<>(chunkSize);
1043                         else
1044                             sb.reset();
1045                         long permitsRequested = 0;
1046                         do { } while (s.tryAdvance(sb) && ++permitsRequested < chunkSize);
1047                         if (permitsRequested == 0)
1048                             return;
1049                         sb.forEach(action, acquirePermits(permitsRequested));
1050                     }
1051                     else {
1052                         // Must be UNLIMITED; let 'er rip
1053                         s.forEachRemaining(action);
1054                         return;
1055                     }
1056                 }
1057             }
1058 
1059             @Override
makeSpliterator(Spliterator<T> s)1060             protected Spliterator<T> makeSpliterator(Spliterator<T> s) {
1061                 return new UnorderedSliceSpliterator.OfRef<>(s, this);
1062             }
1063         }
1064 
1065         /**
1066          * Concrete sub-types must also be an instance of type {@code T_CONS}.
1067          *
1068          * @param <T_BUFF> the type of the spined buffer. Must also be a type of
1069          *        {@code T_CONS}.
1070          */
1071         abstract static class OfPrimitive<
1072                 T,
1073                 T_CONS,
1074                 T_BUFF extends ArrayBuffer.OfPrimitive<T_CONS>,
1075                 T_SPLITR extends Spliterator.OfPrimitive<T, T_CONS, T_SPLITR>>
1076                 extends UnorderedSliceSpliterator<T, T_SPLITR>
1077                 implements Spliterator.OfPrimitive<T, T_CONS, T_SPLITR> {
OfPrimitive(T_SPLITR s, long skip, long limit)1078             OfPrimitive(T_SPLITR s, long skip, long limit) {
1079                 super(s, skip, limit);
1080             }
1081 
OfPrimitive(T_SPLITR s, UnorderedSliceSpliterator.OfPrimitive<T, T_CONS, T_BUFF, T_SPLITR> parent)1082             OfPrimitive(T_SPLITR s, UnorderedSliceSpliterator.OfPrimitive<T, T_CONS, T_BUFF, T_SPLITR> parent) {
1083                 super(s, parent);
1084             }
1085 
1086             @Override
tryAdvance(T_CONS action)1087             public boolean tryAdvance(T_CONS action) {
1088                 Objects.requireNonNull(action);
1089                 @SuppressWarnings("unchecked")
1090                 T_CONS consumer = (T_CONS) this;
1091 
1092                 while (permitStatus() != PermitStatus.NO_MORE) {
1093                     if (!s.tryAdvance(consumer))
1094                         return false;
1095                     else if (acquirePermits(1) == 1) {
1096                         acceptConsumed(action);
1097                         return true;
1098                     }
1099                 }
1100                 return false;
1101             }
1102 
acceptConsumed(T_CONS action)1103             protected abstract void acceptConsumed(T_CONS action);
1104 
1105             @Override
forEachRemaining(T_CONS action)1106             public void forEachRemaining(T_CONS action) {
1107                 Objects.requireNonNull(action);
1108 
1109                 T_BUFF sb = null;
1110                 PermitStatus permitStatus;
1111                 while ((permitStatus = permitStatus()) != PermitStatus.NO_MORE) {
1112                     if (permitStatus == PermitStatus.MAYBE_MORE) {
1113                         // Optimistically traverse elements up to a threshold of chunkSize
1114                         if (sb == null)
1115                             sb = bufferCreate(chunkSize);
1116                         else
1117                             sb.reset();
1118                         @SuppressWarnings("unchecked")
1119                         T_CONS sbc = (T_CONS) sb;
1120                         long permitsRequested = 0;
1121                         do { } while (s.tryAdvance(sbc) && ++permitsRequested < chunkSize);
1122                         if (permitsRequested == 0)
1123                             return;
1124                         sb.forEach(action, acquirePermits(permitsRequested));
1125                     }
1126                     else {
1127                         // Must be UNLIMITED; let 'er rip
1128                         s.forEachRemaining(action);
1129                         return;
1130                     }
1131                 }
1132             }
1133 
bufferCreate(int initialCapacity)1134             protected abstract T_BUFF bufferCreate(int initialCapacity);
1135         }
1136 
1137         @SuppressWarnings("overloads")
1138         static final class OfInt
1139                 extends OfPrimitive<Integer, IntConsumer, ArrayBuffer.OfInt, Spliterator.OfInt>
1140                 implements Spliterator.OfInt, IntConsumer {
1141 
1142             int tmpValue;
1143 
OfInt(Spliterator.OfInt s, long skip, long limit)1144             OfInt(Spliterator.OfInt s, long skip, long limit) {
1145                 super(s, skip, limit);
1146             }
1147 
OfInt(Spliterator.OfInt s, UnorderedSliceSpliterator.OfInt parent)1148             OfInt(Spliterator.OfInt s, UnorderedSliceSpliterator.OfInt parent) {
1149                 super(s, parent);
1150             }
1151 
1152             @Override
accept(int value)1153             public void accept(int value) {
1154                 tmpValue = value;
1155             }
1156 
1157             @Override
acceptConsumed(IntConsumer action)1158             protected void acceptConsumed(IntConsumer action) {
1159                 action.accept(tmpValue);
1160             }
1161 
1162             @Override
bufferCreate(int initialCapacity)1163             protected ArrayBuffer.OfInt bufferCreate(int initialCapacity) {
1164                 return new ArrayBuffer.OfInt(initialCapacity);
1165             }
1166 
1167             @Override
makeSpliterator(Spliterator.OfInt s)1168             protected Spliterator.OfInt makeSpliterator(Spliterator.OfInt s) {
1169                 return new UnorderedSliceSpliterator.OfInt(s, this);
1170             }
1171         }
1172 
1173         @SuppressWarnings("overloads")
1174         static final class OfLong
1175                 extends OfPrimitive<Long, LongConsumer, ArrayBuffer.OfLong, Spliterator.OfLong>
1176                 implements Spliterator.OfLong, LongConsumer {
1177 
1178             long tmpValue;
1179 
OfLong(Spliterator.OfLong s, long skip, long limit)1180             OfLong(Spliterator.OfLong s, long skip, long limit) {
1181                 super(s, skip, limit);
1182             }
1183 
OfLong(Spliterator.OfLong s, UnorderedSliceSpliterator.OfLong parent)1184             OfLong(Spliterator.OfLong s, UnorderedSliceSpliterator.OfLong parent) {
1185                 super(s, parent);
1186             }
1187 
1188             @Override
accept(long value)1189             public void accept(long value) {
1190                 tmpValue = value;
1191             }
1192 
1193             @Override
acceptConsumed(LongConsumer action)1194             protected void acceptConsumed(LongConsumer action) {
1195                 action.accept(tmpValue);
1196             }
1197 
1198             @Override
bufferCreate(int initialCapacity)1199             protected ArrayBuffer.OfLong bufferCreate(int initialCapacity) {
1200                 return new ArrayBuffer.OfLong(initialCapacity);
1201             }
1202 
1203             @Override
makeSpliterator(Spliterator.OfLong s)1204             protected Spliterator.OfLong makeSpliterator(Spliterator.OfLong s) {
1205                 return new UnorderedSliceSpliterator.OfLong(s, this);
1206             }
1207         }
1208 
1209         @SuppressWarnings("overloads")
1210         static final class OfDouble
1211                 extends OfPrimitive<Double, DoubleConsumer, ArrayBuffer.OfDouble, Spliterator.OfDouble>
1212                 implements Spliterator.OfDouble, DoubleConsumer {
1213 
1214             double tmpValue;
1215 
OfDouble(Spliterator.OfDouble s, long skip, long limit)1216             OfDouble(Spliterator.OfDouble s, long skip, long limit) {
1217                 super(s, skip, limit);
1218             }
1219 
OfDouble(Spliterator.OfDouble s, UnorderedSliceSpliterator.OfDouble parent)1220             OfDouble(Spliterator.OfDouble s, UnorderedSliceSpliterator.OfDouble parent) {
1221                 super(s, parent);
1222             }
1223 
1224             @Override
accept(double value)1225             public void accept(double value) {
1226                 tmpValue = value;
1227             }
1228 
1229             @Override
acceptConsumed(DoubleConsumer action)1230             protected void acceptConsumed(DoubleConsumer action) {
1231                 action.accept(tmpValue);
1232             }
1233 
1234             @Override
bufferCreate(int initialCapacity)1235             protected ArrayBuffer.OfDouble bufferCreate(int initialCapacity) {
1236                 return new ArrayBuffer.OfDouble(initialCapacity);
1237             }
1238 
1239             @Override
makeSpliterator(Spliterator.OfDouble s)1240             protected Spliterator.OfDouble makeSpliterator(Spliterator.OfDouble s) {
1241                 return new UnorderedSliceSpliterator.OfDouble(s, this);
1242             }
1243         }
1244     }
1245 
1246     /**
1247      * A wrapping spliterator that only reports distinct elements of the
1248      * underlying spliterator. Does not preserve size and encounter order.
1249      */
1250     static final class DistinctSpliterator<T> implements Spliterator<T>, Consumer<T> {
1251 
1252         // The value to represent null in the ConcurrentHashMap
1253         private static final Object NULL_VALUE = new Object();
1254 
1255         // The underlying spliterator
1256         private final Spliterator<T> s;
1257 
1258         // ConcurrentHashMap holding distinct elements as keys
1259         private final ConcurrentHashMap<T, Boolean> seen;
1260 
1261         // Temporary element, only used with tryAdvance
1262         private T tmpSlot;
1263 
DistinctSpliterator(Spliterator<T> s)1264         DistinctSpliterator(Spliterator<T> s) {
1265             this(s, new ConcurrentHashMap<>());
1266         }
1267 
DistinctSpliterator(Spliterator<T> s, ConcurrentHashMap<T, Boolean> seen)1268         private DistinctSpliterator(Spliterator<T> s, ConcurrentHashMap<T, Boolean> seen) {
1269             this.s = s;
1270             this.seen = seen;
1271         }
1272 
1273         @Override
accept(T t)1274         public void accept(T t) {
1275             this.tmpSlot = t;
1276         }
1277 
1278         @SuppressWarnings("unchecked")
mapNull(T t)1279         private T mapNull(T t) {
1280             return t != null ? t : (T) NULL_VALUE;
1281         }
1282 
1283         @Override
tryAdvance(Consumer<? super T> action)1284         public boolean tryAdvance(Consumer<? super T> action) {
1285             while (s.tryAdvance(this)) {
1286                 if (seen.putIfAbsent(mapNull(tmpSlot), Boolean.TRUE) == null) {
1287                     action.accept(tmpSlot);
1288                     tmpSlot = null;
1289                     return true;
1290                 }
1291             }
1292             return false;
1293         }
1294 
1295         @Override
forEachRemaining(Consumer<? super T> action)1296         public void forEachRemaining(Consumer<? super T> action) {
1297             s.forEachRemaining(t -> {
1298                 if (seen.putIfAbsent(mapNull(t), Boolean.TRUE) == null) {
1299                     action.accept(t);
1300                 }
1301             });
1302         }
1303 
1304         @Override
trySplit()1305         public Spliterator<T> trySplit() {
1306             Spliterator<T> split = s.trySplit();
1307             return (split != null) ? new DistinctSpliterator<>(split, seen) : null;
1308         }
1309 
1310         @Override
estimateSize()1311         public long estimateSize() {
1312             return s.estimateSize();
1313         }
1314 
1315         @Override
characteristics()1316         public int characteristics() {
1317             return (s.characteristics() & ~(Spliterator.SIZED | Spliterator.SUBSIZED |
1318                                             Spliterator.SORTED | Spliterator.ORDERED))
1319                    | Spliterator.DISTINCT;
1320         }
1321 
1322         @Override
getComparator()1323         public Comparator<? super T> getComparator() {
1324             return s.getComparator();
1325         }
1326     }
1327 
1328     /**
1329      * A Spliterator that infinitely supplies elements in no particular order.
1330      *
1331      * <p>Splitting divides the estimated size in two and stops when the
1332      * estimate size is 0.
1333      *
1334      * <p>The {@code forEachRemaining} method if invoked will never terminate.
1335      * The {@code tryAdvance} method always returns true.
1336      *
1337      */
1338     abstract static class InfiniteSupplyingSpliterator<T> implements Spliterator<T> {
1339         long estimate;
1340 
InfiniteSupplyingSpliterator(long estimate)1341         protected InfiniteSupplyingSpliterator(long estimate) {
1342             this.estimate = estimate;
1343         }
1344 
1345         @Override
estimateSize()1346         public long estimateSize() {
1347             return estimate;
1348         }
1349 
1350         @Override
characteristics()1351         public int characteristics() {
1352             return IMMUTABLE;
1353         }
1354 
1355         static final class OfRef<T> extends InfiniteSupplyingSpliterator<T> {
1356             final Supplier<? extends T> s;
1357 
OfRef(long size, Supplier<? extends T> s)1358             OfRef(long size, Supplier<? extends T> s) {
1359                 super(size);
1360                 this.s = s;
1361             }
1362 
1363             @Override
tryAdvance(Consumer<? super T> action)1364             public boolean tryAdvance(Consumer<? super T> action) {
1365                 Objects.requireNonNull(action);
1366 
1367                 action.accept(s.get());
1368                 return true;
1369             }
1370 
1371             @Override
trySplit()1372             public Spliterator<T> trySplit() {
1373                 if (estimate == 0)
1374                     return null;
1375                 return new InfiniteSupplyingSpliterator.OfRef<>(estimate >>>= 1, s);
1376             }
1377         }
1378 
1379         static final class OfInt extends InfiniteSupplyingSpliterator<Integer>
1380                 implements Spliterator.OfInt {
1381             final IntSupplier s;
1382 
OfInt(long size, IntSupplier s)1383             OfInt(long size, IntSupplier s) {
1384                 super(size);
1385                 this.s = s;
1386             }
1387 
1388             @Override
tryAdvance(IntConsumer action)1389             public boolean tryAdvance(IntConsumer action) {
1390                 Objects.requireNonNull(action);
1391 
1392                 action.accept(s.getAsInt());
1393                 return true;
1394             }
1395 
1396             @Override
trySplit()1397             public Spliterator.OfInt trySplit() {
1398                 if (estimate == 0)
1399                     return null;
1400                 return new InfiniteSupplyingSpliterator.OfInt(estimate = estimate >>> 1, s);
1401             }
1402         }
1403 
1404         static final class OfLong extends InfiniteSupplyingSpliterator<Long>
1405                 implements Spliterator.OfLong {
1406             final LongSupplier s;
1407 
OfLong(long size, LongSupplier s)1408             OfLong(long size, LongSupplier s) {
1409                 super(size);
1410                 this.s = s;
1411             }
1412 
1413             @Override
tryAdvance(LongConsumer action)1414             public boolean tryAdvance(LongConsumer action) {
1415                 Objects.requireNonNull(action);
1416 
1417                 action.accept(s.getAsLong());
1418                 return true;
1419             }
1420 
1421             @Override
trySplit()1422             public Spliterator.OfLong trySplit() {
1423                 if (estimate == 0)
1424                     return null;
1425                 return new InfiniteSupplyingSpliterator.OfLong(estimate = estimate >>> 1, s);
1426             }
1427         }
1428 
1429         static final class OfDouble extends InfiniteSupplyingSpliterator<Double>
1430                 implements Spliterator.OfDouble {
1431             final DoubleSupplier s;
1432 
OfDouble(long size, DoubleSupplier s)1433             OfDouble(long size, DoubleSupplier s) {
1434                 super(size);
1435                 this.s = s;
1436             }
1437 
1438             @Override
tryAdvance(DoubleConsumer action)1439             public boolean tryAdvance(DoubleConsumer action) {
1440                 Objects.requireNonNull(action);
1441 
1442                 action.accept(s.getAsDouble());
1443                 return true;
1444             }
1445 
1446             @Override
trySplit()1447             public Spliterator.OfDouble trySplit() {
1448                 if (estimate == 0)
1449                     return null;
1450                 return new InfiniteSupplyingSpliterator.OfDouble(estimate = estimate >>> 1, s);
1451             }
1452         }
1453     }
1454 
1455     // @@@ Consolidate with Node.Builder
1456     abstract static class ArrayBuffer {
1457         int index;
1458 
reset()1459         void reset() {
1460             index = 0;
1461         }
1462 
1463         static final class OfRef<T> extends ArrayBuffer implements Consumer<T> {
1464             final Object[] array;
1465 
OfRef(int size)1466             OfRef(int size) {
1467                 this.array = new Object[size];
1468             }
1469 
1470             @Override
accept(T t)1471             public void accept(T t) {
1472                 array[index++] = t;
1473             }
1474 
forEach(Consumer<? super T> action, long fence)1475             public void forEach(Consumer<? super T> action, long fence) {
1476                 for (int i = 0; i < fence; i++) {
1477                     @SuppressWarnings("unchecked")
1478                     T t = (T) array[i];
1479                     action.accept(t);
1480                 }
1481             }
1482         }
1483 
1484         abstract static class OfPrimitive<T_CONS> extends ArrayBuffer {
1485             int index;
1486 
1487             @Override
reset()1488             void reset() {
1489                 index = 0;
1490             }
1491 
forEach(T_CONS action, long fence)1492             abstract void forEach(T_CONS action, long fence);
1493         }
1494 
1495         static final class OfInt extends OfPrimitive<IntConsumer>
1496                 implements IntConsumer {
1497             final int[] array;
1498 
OfInt(int size)1499             OfInt(int size) {
1500                 this.array = new int[size];
1501             }
1502 
1503             @Override
accept(int t)1504             public void accept(int t) {
1505                 array[index++] = t;
1506             }
1507 
1508             @Override
forEach(IntConsumer action, long fence)1509             public void forEach(IntConsumer action, long fence) {
1510                 for (int i = 0; i < fence; i++) {
1511                     action.accept(array[i]);
1512                 }
1513             }
1514         }
1515 
1516         static final class OfLong extends OfPrimitive<LongConsumer>
1517                 implements LongConsumer {
1518             final long[] array;
1519 
OfLong(int size)1520             OfLong(int size) {
1521                 this.array = new long[size];
1522             }
1523 
1524             @Override
accept(long t)1525             public void accept(long t) {
1526                 array[index++] = t;
1527             }
1528 
1529             @Override
forEach(LongConsumer action, long fence)1530             public void forEach(LongConsumer action, long fence) {
1531                 for (int i = 0; i < fence; i++) {
1532                     action.accept(array[i]);
1533                 }
1534             }
1535         }
1536 
1537         static final class OfDouble extends OfPrimitive<DoubleConsumer>
1538                 implements DoubleConsumer {
1539             final double[] array;
1540 
OfDouble(int size)1541             OfDouble(int size) {
1542                 this.array = new double[size];
1543             }
1544 
1545             @Override
accept(double t)1546             public void accept(double t) {
1547                 array[index++] = t;
1548             }
1549 
1550             @Override
forEach(DoubleConsumer action, long fence)1551             void forEach(DoubleConsumer action, long fence) {
1552                 for (int i = 0; i < fence; i++) {
1553                     action.accept(array[i]);
1554                 }
1555             }
1556         }
1557     }
1558 }
1559 
1560