• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4  *
5  * This code is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 only, as
7  * published by the Free Software Foundation.  Oracle designates this
8  * particular file as subject to the "Classpath" exception as provided
9  * by Oracle in the LICENSE file that accompanied this code.
10  *
11  * This code is distributed in the hope that it will be useful, but WITHOUT
12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14  * version 2 for more details (a copy is included in the LICENSE file that
15  * accompanied this code).
16  *
17  * You should have received a copy of the GNU General Public License version
18  * 2 along with this work; if not, write to the Free Software Foundation,
19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20  *
21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22  * or visit www.oracle.com if you need additional information or have any
23  * questions.
24  */
25 package java.util.stream;
26 
27 import java.util.Spliterator;
28 import java.util.concurrent.CountedCompleter;
29 import java.util.function.IntFunction;
30 
31 /**
32  * Factory for instances of short-circuiting stateful intermediate operations
33  * that produce subsequences of their input stream.
34  *
35  * @since 1.8
36  */
37 final class SliceOps {
38 
    // No instances: the constructor is private, so the class cannot be
    // instantiated from outside; it is used through its static methods only.
    private SliceOps() { }
41 
42     /**
43      * Calculates the sliced size given the current size, number of elements
44      * skip, and the number of elements to limit.
45      *
46      * @param size the current size
47      * @param skip the number of elements to skip, assumed to be >= 0
48      * @param limit the number of elements to limit, assumed to be >= 0, with
49      *        a value of {@code Long.MAX_VALUE} if there is no limit
50      * @return the sliced size
51      */
calcSize(long size, long skip, long limit)52     private static long calcSize(long size, long skip, long limit) {
53         return size >= 0 ? Math.max(-1, Math.min(size - skip, limit)) : -1;
54     }
55 
56     /**
57      * Calculates the slice fence, which is one past the index of the slice
58      * range
59      * @param skip the number of elements to skip, assumed to be >= 0
60      * @param limit the number of elements to limit, assumed to be >= 0, with
61      *        a value of {@code Long.MAX_VALUE} if there is no limit
62      * @return the slice fence.
63      */
calcSliceFence(long skip, long limit)64     private static long calcSliceFence(long skip, long limit) {
65         long sliceFence = limit >= 0 ? skip + limit : Long.MAX_VALUE;
66         // Check for overflow
67         return (sliceFence >= 0) ? sliceFence : Long.MAX_VALUE;
68     }
69 
70     /**
71      * Creates a slice spliterator given a stream shape governing the
72      * spliterator type.  Requires that the underlying Spliterator
73      * be SUBSIZED.
74      */
75     @SuppressWarnings("unchecked")
sliceSpliterator(StreamShape shape, Spliterator<P_IN> s, long skip, long limit)76     private static <P_IN> Spliterator<P_IN> sliceSpliterator(StreamShape shape,
77                                                              Spliterator<P_IN> s,
78                                                              long skip, long limit) {
79         assert s.hasCharacteristics(Spliterator.SUBSIZED);
80         long sliceFence = calcSliceFence(skip, limit);
81         switch (shape) {
82             case REFERENCE:
83                 return new StreamSpliterators
84                         .SliceSpliterator.OfRef<>(s, skip, sliceFence);
85             case INT_VALUE:
86                 return (Spliterator<P_IN>) new StreamSpliterators
87                         .SliceSpliterator.OfInt((Spliterator.OfInt) s, skip, sliceFence);
88             case LONG_VALUE:
89                 return (Spliterator<P_IN>) new StreamSpliterators
90                         .SliceSpliterator.OfLong((Spliterator.OfLong) s, skip, sliceFence);
91             case DOUBLE_VALUE:
92                 return (Spliterator<P_IN>) new StreamSpliterators
93                         .SliceSpliterator.OfDouble((Spliterator.OfDouble) s, skip, sliceFence);
94             default:
95                 throw new IllegalStateException("Unknown shape " + shape);
96         }
97     }
98 
99     @SuppressWarnings("unchecked")
castingArray()100     private static <T> IntFunction<T[]> castingArray() {
101         return size -> (T[]) new Object[size];
102     }
103 
    /**
     * Appends a "slice" operation to the provided stream.  The slice operation
     * may be skip-only, limit-only, or skip-and-limit.
     *
     * @param <T> the type of both input and output elements
     * @param upstream a reference stream with element type T
     * @param skip the number of elements to skip.  Must be >= 0.
     * @param limit the maximum size of the resulting stream, or -1 if no limit
     *        is to be imposed
     * @return a new stateful pipeline stage appended to {@code upstream}
     * @throws IllegalArgumentException if {@code skip} is negative
     */
    public static <T> Stream<T> makeRef(AbstractPipeline<?, T, ?> upstream,
                                        long skip, long limit) {
        if (skip < 0)
            throw new IllegalArgumentException("Skip must be non-negative: " + skip);

        return new ReferencePipeline.StatefulOp<T, T>(upstream, StreamShape.REFERENCE,
                                                      flags(limit)) {
            // Builds the slice spliterator used when encounter order need not
            // be respected.  Note that the skip and limit parameters shadow
            // the captured values of the enclosing method.
            Spliterator<T> unorderedSkipLimitSpliterator(Spliterator<T> s,
                                                         long skip, long limit, long sizeIfKnown) {
                if (skip <= sizeIfKnown) {
                    // Use just the limit if the number of elements
                    // to skip is <= the known pipeline size
                    limit = limit >= 0 ? Math.min(limit, sizeIfKnown - skip) : sizeIfKnown - skip;
                    skip = 0;
                }
                return new StreamSpliterators.UnorderedSliceSpliterator.OfRef<>(s, skip, limit);
            }

            @Override
            // Android-changed: Make public, to match the method it's overriding.
            public <P_IN> Spliterator<T> opEvaluateParallelLazy(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) {
                long size = helper.exactOutputSizeIfKnown(spliterator);
                if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) {
                    // A positive exact size was reported and the source is
                    // SUBSIZED: slice the wrapped spliterator directly
                    return new StreamSpliterators.SliceSpliterator.OfRef<>(
                            helper.wrapSpliterator(spliterator),
                            skip,
                            calcSliceFence(skip, limit));
                } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
                    // ORDERED is not known for this pipeline: use the
                    // unordered slice spliterator
                    return unorderedSkipLimitSpliterator(
                            helper.wrapSpliterator(spliterator),
                            skip, limit, size);
                }
                else {
                    // @@@ OOMEs will occur for LongStream.longs().filter(i -> true).limit(n)
                    //     regardless of the value of n
                    //     Need to adjust the target size of splitting for the
                    //     SliceTask from say (size / k) to say min(size / k, 1 << 14)
                    //     This will limit the size of the buffers created at the leaf nodes
                    //     cancellation will be more aggressive cancelling later tasks
                    //     if the target slice size has been reached from a given task,
                    //     cancellation should also clear local results if any
                    return new SliceTask<>(this, helper, spliterator, castingArray(), skip, limit).
                            invoke().spliterator();
                }
            }

            @Override
            // Android-changed: Make public, to match the method it's overriding.
            public <P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper,
                                              Spliterator<P_IN> spliterator,
                                              IntFunction<T[]> generator) {
                long size = helper.exactOutputSizeIfKnown(spliterator);
                if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) {
                    // Because the pipeline is SIZED the slice spliterator
                    // can be created from the source, this requires matching
                    // to shape of the source, and is potentially more efficient
                    // than creating the slice spliterator from the pipeline
                    // wrapping spliterator
                    Spliterator<P_IN> s = sliceSpliterator(helper.getSourceShape(), spliterator, skip, limit);
                    return Nodes.collect(helper, s, true, generator);
                } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
                    Spliterator<T> s =  unorderedSkipLimitSpliterator(
                            helper.wrapSpliterator(spliterator),
                            skip, limit, size);
                    // Collect using this pipeline, which is empty and therefore
                    // can be used with the pipeline wrapping spliterator
                    // Note that we cannot create a slice spliterator from
                    // the source spliterator if the pipeline is not SIZED
                    return Nodes.collect(this, s, true, generator);
                }
                else {
                    // Remaining case: evaluate with a SliceTask and return
                    // the node it produces
                    return new SliceTask<>(this, helper, spliterator, generator, skip, limit).
                            invoke();
                }
            }

            @Override
            // Android-changed: Make public, to match the method it's overriding.
            public Sink<T> opWrapSink(int flags, Sink<T> sink) {
                return new Sink.ChainedReference<T, T>(sink) {
                    // n: elements still to be dropped before any is passed on
                    long n = skip;
                    // m: elements that may still be passed downstream; a
                    // negative limit (no limit) is mapped to Long.MAX_VALUE
                    long m = limit >= 0 ? limit : Long.MAX_VALUE;

                    @Override
                    public void begin(long size) {
                        // Announce the sliced size rather than the incoming size
                        downstream.begin(calcSize(size, skip, m));
                    }

                    @Override
                    public void accept(T t) {
                        if (n == 0) {
                            // Skipping is finished; forward while quota remains
                            if (m > 0) {
                                m--;
                                downstream.accept(t);
                            }
                        }
                        else {
                            // Still skipping: drop this element
                            n--;
                        }
                    }

                    @Override
                    public boolean cancellationRequested() {
                        // Request cancellation once the quota is used up
                        return m == 0 || downstream.cancellationRequested();
                    }
                };
            }
        };
    }
223 
    /**
     * Appends a "slice" operation to the provided IntStream.  The slice
     * operation may be skip-only, limit-only, or skip-and-limit.
     *
     * @param upstream An IntStream
     * @param skip The number of elements to skip.  Must be >= 0.
     * @param limit The maximum size of the resulting stream, or -1 if no limit
     *        is to be imposed
     * @return a new stateful pipeline stage appended to {@code upstream}
     * @throws IllegalArgumentException if {@code skip} is negative
     */
    public static IntStream makeInt(AbstractPipeline<?, Integer, ?> upstream,
                                    long skip, long limit) {
        if (skip < 0)
            throw new IllegalArgumentException("Skip must be non-negative: " + skip);

        return new IntPipeline.StatefulOp<Integer>(upstream, StreamShape.INT_VALUE,
                                                   flags(limit)) {
            // Builds the slice spliterator used when encounter order need not
            // be respected.  Note that the skip and limit parameters shadow
            // the captured values of the enclosing method.
            Spliterator.OfInt unorderedSkipLimitSpliterator(
                    Spliterator.OfInt s, long skip, long limit, long sizeIfKnown) {
                if (skip <= sizeIfKnown) {
                    // Use just the limit if the number of elements
                    // to skip is <= the known pipeline size
                    limit = limit >= 0 ? Math.min(limit, sizeIfKnown - skip) : sizeIfKnown - skip;
                    skip = 0;
                }
                return new StreamSpliterators.UnorderedSliceSpliterator.OfInt(s, skip, limit);
            }

            @Override
            // Android-changed: Make public, to match the method it's overriding.
            public <P_IN> Spliterator<Integer> opEvaluateParallelLazy(PipelineHelper<Integer> helper,
                                                               Spliterator<P_IN> spliterator) {
                long size = helper.exactOutputSizeIfKnown(spliterator);
                if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) {
                    // A positive exact size was reported and the source is
                    // SUBSIZED: slice the wrapped spliterator directly
                    return new StreamSpliterators.SliceSpliterator.OfInt(
                            (Spliterator.OfInt) helper.wrapSpliterator(spliterator),
                            skip,
                            calcSliceFence(skip, limit));
                } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
                    // ORDERED is not known for this pipeline: use the
                    // unordered slice spliterator
                    return unorderedSkipLimitSpliterator(
                            (Spliterator.OfInt) helper.wrapSpliterator(spliterator),
                            skip, limit, size);
                }
                else {
                    // Remaining case: evaluate with a SliceTask and expose
                    // the resulting node as a spliterator
                    return new SliceTask<>(this, helper, spliterator, Integer[]::new, skip, limit).
                            invoke().spliterator();
                }
            }

            @Override
            // Android-changed: Make public, to match the method it's overriding.
            public <P_IN> Node<Integer> opEvaluateParallel(PipelineHelper<Integer> helper,
                                                    Spliterator<P_IN> spliterator,
                                                    IntFunction<Integer[]> generator) {
                long size = helper.exactOutputSizeIfKnown(spliterator);
                if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) {
                    // Because the pipeline is SIZED the slice spliterator
                    // can be created from the source, this requires matching
                    // to shape of the source, and is potentially more efficient
                    // than creating the slice spliterator from the pipeline
                    // wrapping spliterator
                    Spliterator<P_IN> s = sliceSpliterator(helper.getSourceShape(), spliterator, skip, limit);
                    return Nodes.collectInt(helper, s, true);
                } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
                    Spliterator.OfInt s =  unorderedSkipLimitSpliterator(
                            (Spliterator.OfInt) helper.wrapSpliterator(spliterator),
                            skip, limit, size);
                    // Collect using this pipeline, which is empty and therefore
                    // can be used with the pipeline wrapping spliterator
                    // Note that we cannot create a slice spliterator from
                    // the source spliterator if the pipeline is not SIZED
                    return Nodes.collectInt(this, s, true);
                }
                else {
                    // Remaining case: evaluate with a SliceTask and return
                    // the node it produces
                    return new SliceTask<>(this, helper, spliterator, generator, skip, limit).
                            invoke();
                }
            }

            @Override
            // Android-changed: Make public, to match the method it's overriding.
            public Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
                return new Sink.ChainedInt<Integer>(sink) {
                    // n: elements still to be dropped before any is passed on
                    long n = skip;
                    // m: elements that may still be passed downstream; a
                    // negative limit (no limit) is mapped to Long.MAX_VALUE
                    long m = limit >= 0 ? limit : Long.MAX_VALUE;

                    @Override
                    public void begin(long size) {
                        // Announce the sliced size rather than the incoming size
                        downstream.begin(calcSize(size, skip, m));
                    }

                    @Override
                    public void accept(int t) {
                        if (n == 0) {
                            // Skipping is finished; forward while quota remains
                            if (m > 0) {
                                m--;
                                downstream.accept(t);
                            }
                        }
                        else {
                            // Still skipping: drop this element
                            n--;
                        }
                    }

                    @Override
                    public boolean cancellationRequested() {
                        // Request cancellation once the quota is used up
                        return m == 0 || downstream.cancellationRequested();
                    }
                };
            }
        };
    }
335 
    /**
     * Appends a "slice" operation to the provided LongStream.  The slice
     * operation may be skip-only, limit-only, or skip-and-limit.
     *
     * @param upstream A LongStream
     * @param skip The number of elements to skip.  Must be >= 0.
     * @param limit The maximum size of the resulting stream, or -1 if no limit
     *        is to be imposed
     * @return a new stateful pipeline stage appended to {@code upstream}
     * @throws IllegalArgumentException if {@code skip} is negative
     */
    public static LongStream makeLong(AbstractPipeline<?, Long, ?> upstream,
                                      long skip, long limit) {
        if (skip < 0)
            throw new IllegalArgumentException("Skip must be non-negative: " + skip);

        return new LongPipeline.StatefulOp<Long>(upstream, StreamShape.LONG_VALUE,
                                                 flags(limit)) {
            // Builds the slice spliterator used when encounter order need not
            // be respected.  Note that the skip and limit parameters shadow
            // the captured values of the enclosing method.
            Spliterator.OfLong unorderedSkipLimitSpliterator(
                    Spliterator.OfLong s, long skip, long limit, long sizeIfKnown) {
                if (skip <= sizeIfKnown) {
                    // Use just the limit if the number of elements
                    // to skip is <= the known pipeline size
                    limit = limit >= 0 ? Math.min(limit, sizeIfKnown - skip) : sizeIfKnown - skip;
                    skip = 0;
                }
                return new StreamSpliterators.UnorderedSliceSpliterator.OfLong(s, skip, limit);
            }

            @Override
            // Android-changed: Make public, to match the method it's overriding.
            public <P_IN> Spliterator<Long> opEvaluateParallelLazy(PipelineHelper<Long> helper,
                                                            Spliterator<P_IN> spliterator) {
                long size = helper.exactOutputSizeIfKnown(spliterator);
                if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) {
                    // A positive exact size was reported and the source is
                    // SUBSIZED: slice the wrapped spliterator directly
                    return new StreamSpliterators.SliceSpliterator.OfLong(
                            (Spliterator.OfLong) helper.wrapSpliterator(spliterator),
                            skip,
                            calcSliceFence(skip, limit));
                } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
                    // ORDERED is not known for this pipeline: use the
                    // unordered slice spliterator
                    return unorderedSkipLimitSpliterator(
                            (Spliterator.OfLong) helper.wrapSpliterator(spliterator),
                            skip, limit, size);
                }
                else {
                    // Remaining case: evaluate with a SliceTask and expose
                    // the resulting node as a spliterator
                    return new SliceTask<>(this, helper, spliterator, Long[]::new, skip, limit).
                            invoke().spliterator();
                }
            }

            @Override
            // Android-changed: Make public, to match the method it's overriding.
            public <P_IN> Node<Long> opEvaluateParallel(PipelineHelper<Long> helper,
                                                 Spliterator<P_IN> spliterator,
                                                 IntFunction<Long[]> generator) {
                long size = helper.exactOutputSizeIfKnown(spliterator);
                if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) {
                    // Because the pipeline is SIZED the slice spliterator
                    // can be created from the source, this requires matching
                    // to shape of the source, and is potentially more efficient
                    // than creating the slice spliterator from the pipeline
                    // wrapping spliterator
                    Spliterator<P_IN> s = sliceSpliterator(helper.getSourceShape(), spliterator, skip, limit);
                    return Nodes.collectLong(helper, s, true);
                } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
                    Spliterator.OfLong s =  unorderedSkipLimitSpliterator(
                            (Spliterator.OfLong) helper.wrapSpliterator(spliterator),
                            skip, limit, size);
                    // Collect using this pipeline, which is empty and therefore
                    // can be used with the pipeline wrapping spliterator
                    // Note that we cannot create a slice spliterator from
                    // the source spliterator if the pipeline is not SIZED
                    return Nodes.collectLong(this, s, true);
                }
                else {
                    // Remaining case: evaluate with a SliceTask and return
                    // the node it produces
                    return new SliceTask<>(this, helper, spliterator, generator, skip, limit).
                            invoke();
                }
            }

            @Override
            // Android-changed: Make public, to match the method it's overriding.
            public Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
                return new Sink.ChainedLong<Long>(sink) {
                    // n: elements still to be dropped before any is passed on
                    long n = skip;
                    // m: elements that may still be passed downstream; a
                    // negative limit (no limit) is mapped to Long.MAX_VALUE
                    long m = limit >= 0 ? limit : Long.MAX_VALUE;

                    @Override
                    public void begin(long size) {
                        // Announce the sliced size rather than the incoming size
                        downstream.begin(calcSize(size, skip, m));
                    }

                    @Override
                    public void accept(long t) {
                        if (n == 0) {
                            // Skipping is finished; forward while quota remains
                            if (m > 0) {
                                m--;
                                downstream.accept(t);
                            }
                        }
                        else {
                            // Still skipping: drop this element
                            n--;
                        }
                    }

                    @Override
                    public boolean cancellationRequested() {
                        // Request cancellation once the quota is used up
                        return m == 0 || downstream.cancellationRequested();
                    }
                };
            }
        };
    }
447 
    /**
     * Appends a "slice" operation to the provided DoubleStream.  The slice
     * operation may be skip-only, limit-only, or skip-and-limit.
     *
     * @param upstream A DoubleStream
     * @param skip The number of elements to skip.  Must be >= 0.
     * @param limit The maximum size of the resulting stream, or -1 if no limit
     *        is to be imposed
     * @return a new stateful pipeline stage appended to {@code upstream}
     * @throws IllegalArgumentException if {@code skip} is negative
     */
    public static DoubleStream makeDouble(AbstractPipeline<?, Double, ?> upstream,
                                          long skip, long limit) {
        if (skip < 0)
            throw new IllegalArgumentException("Skip must be non-negative: " + skip);

        return new DoublePipeline.StatefulOp<Double>(upstream, StreamShape.DOUBLE_VALUE,
                                                     flags(limit)) {
            // Builds the slice spliterator used when encounter order need not
            // be respected.  Note that the skip and limit parameters shadow
            // the captured values of the enclosing method.
            Spliterator.OfDouble unorderedSkipLimitSpliterator(
                    Spliterator.OfDouble s, long skip, long limit, long sizeIfKnown) {
                if (skip <= sizeIfKnown) {
                    // Use just the limit if the number of elements
                    // to skip is <= the known pipeline size
                    limit = limit >= 0 ? Math.min(limit, sizeIfKnown - skip) : sizeIfKnown - skip;
                    skip = 0;
                }
                return new StreamSpliterators.UnorderedSliceSpliterator.OfDouble(s, skip, limit);
            }

            @Override
            // Android-changed: Make public, to match the method it's overriding.
            public <P_IN> Spliterator<Double> opEvaluateParallelLazy(PipelineHelper<Double> helper,
                                                              Spliterator<P_IN> spliterator) {
                long size = helper.exactOutputSizeIfKnown(spliterator);
                if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) {
                    // A positive exact size was reported and the source is
                    // SUBSIZED: slice the wrapped spliterator directly
                    return new StreamSpliterators.SliceSpliterator.OfDouble(
                            (Spliterator.OfDouble) helper.wrapSpliterator(spliterator),
                            skip,
                            calcSliceFence(skip, limit));
                } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
                    // ORDERED is not known for this pipeline: use the
                    // unordered slice spliterator
                    return unorderedSkipLimitSpliterator(
                            (Spliterator.OfDouble) helper.wrapSpliterator(spliterator),
                            skip, limit, size);
                }
                else {
                    // Remaining case: evaluate with a SliceTask and expose
                    // the resulting node as a spliterator
                    return new SliceTask<>(this, helper, spliterator, Double[]::new, skip, limit).
                            invoke().spliterator();
                }
            }

            @Override
            // Android-changed: Make public, to match the method it's overriding.
            public <P_IN> Node<Double> opEvaluateParallel(PipelineHelper<Double> helper,
                                                   Spliterator<P_IN> spliterator,
                                                   IntFunction<Double[]> generator) {
                long size = helper.exactOutputSizeIfKnown(spliterator);
                if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) {
                    // Because the pipeline is SIZED the slice spliterator
                    // can be created from the source, this requires matching
                    // to shape of the source, and is potentially more efficient
                    // than creating the slice spliterator from the pipeline
                    // wrapping spliterator
                    Spliterator<P_IN> s = sliceSpliterator(helper.getSourceShape(), spliterator, skip, limit);
                    return Nodes.collectDouble(helper, s, true);
                } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
                    Spliterator.OfDouble s =  unorderedSkipLimitSpliterator(
                            (Spliterator.OfDouble) helper.wrapSpliterator(spliterator),
                            skip, limit, size);
                    // Collect using this pipeline, which is empty and therefore
                    // can be used with the pipeline wrapping spliterator
                    // Note that we cannot create a slice spliterator from
                    // the source spliterator if the pipeline is not SIZED
                    return Nodes.collectDouble(this, s, true);
                }
                else {
                    // Remaining case: evaluate with a SliceTask and return
                    // the node it produces
                    return new SliceTask<>(this, helper, spliterator, generator, skip, limit).
                            invoke();
                }
            }

            @Override
            // Android-changed: Make public, to match the method it's overriding.
            public Sink<Double> opWrapSink(int flags, Sink<Double> sink) {
                return new Sink.ChainedDouble<Double>(sink) {
                    // n: elements still to be dropped before any is passed on
                    long n = skip;
                    // m: elements that may still be passed downstream; a
                    // negative limit (no limit) is mapped to Long.MAX_VALUE
                    long m = limit >= 0 ? limit : Long.MAX_VALUE;

                    @Override
                    public void begin(long size) {
                        // Announce the sliced size rather than the incoming size
                        downstream.begin(calcSize(size, skip, m));
                    }

                    @Override
                    public void accept(double t) {
                        if (n == 0) {
                            // Skipping is finished; forward while quota remains
                            if (m > 0) {
                                m--;
                                downstream.accept(t);
                            }
                        }
                        else {
                            // Still skipping: drop this element
                            n--;
                        }
                    }

                    @Override
                    public boolean cancellationRequested() {
                        // Request cancellation once the quota is used up
                        return m == 0 || downstream.cancellationRequested();
                    }
                };
            }
        };
    }
559 
560     private static int flags(long limit) {
561         return StreamOpFlag.NOT_SIZED | ((limit != -1) ? StreamOpFlag.IS_SHORT_CIRCUIT : 0);
562     }
563 
564     /**
565      * {@code ForkJoinTask} implementing slice computation.
566      *
567      * @param <P_IN> Input element type to the stream pipeline
568      * @param <P_OUT> Output element type from the stream pipeline
569      */
570     @SuppressWarnings("serial")
571     private static final class SliceTask<P_IN, P_OUT>
572             extends AbstractShortCircuitTask<P_IN, P_OUT, Node<P_OUT>, SliceTask<P_IN, P_OUT>> {
573         private final AbstractPipeline<P_OUT, P_OUT, ?> op;
574         private final IntFunction<P_OUT[]> generator;
575         private final long targetOffset, targetSize;
576         private long thisNodeSize;
577 
578         private volatile boolean completed;
579 
580         SliceTask(AbstractPipeline<P_OUT, P_OUT, ?> op,
581                   PipelineHelper<P_OUT> helper,
582                   Spliterator<P_IN> spliterator,
583                   IntFunction<P_OUT[]> generator,
584                   long offset, long size) {
585             super(helper, spliterator);
586             this.op = op;
587             this.generator = generator;
588             this.targetOffset = offset;
589             this.targetSize = size;
590         }
591 
592         SliceTask(SliceTask<P_IN, P_OUT> parent, Spliterator<P_IN> spliterator) {
593             super(parent, spliterator);
594             this.op = parent.op;
595             this.generator = parent.generator;
596             this.targetOffset = parent.targetOffset;
597             this.targetSize = parent.targetSize;
598         }
599 
600         @Override
601         protected SliceTask<P_IN, P_OUT> makeChild(Spliterator<P_IN> spliterator) {
602             return new SliceTask<>(this, spliterator);
603         }
604 
605         @Override
606         protected final Node<P_OUT> getEmptyResult() {
607             return Nodes.emptyNode(op.getOutputShape());
608         }
609 
610         @Override
611         protected final Node<P_OUT> doLeaf() {
612             if (isRoot()) {
613                 long sizeIfKnown = StreamOpFlag.SIZED.isPreserved(op.sourceOrOpFlags)
614                                    ? op.exactOutputSizeIfKnown(spliterator)
615                                    : -1;
616                 final Node.Builder<P_OUT> nb = op.makeNodeBuilder(sizeIfKnown, generator);
617                 Sink<P_OUT> opSink = op.opWrapSink(helper.getStreamAndOpFlags(), nb);
618                 helper.copyIntoWithCancel(helper.wrapSink(opSink), spliterator);
619                 // There is no need to truncate since the op performs the
620                 // skipping and limiting of elements
621                 return nb.build();
622             }
623             else {
624                 Node<P_OUT> node = helper.wrapAndCopyInto(helper.makeNodeBuilder(-1, generator),
625                                                           spliterator).build();
626                 thisNodeSize = node.count();
627                 completed = true;
628                 spliterator = null;
629                 return node;
630             }
631         }
632 
633         @Override
634         public final void onCompletion(CountedCompleter<?> caller) {
635             if (!isLeaf()) {
636                 Node<P_OUT> result;
637                 thisNodeSize = leftChild.thisNodeSize + rightChild.thisNodeSize;
638                 if (canceled) {
639                     thisNodeSize = 0;
640                     result = getEmptyResult();
641                 }
642                 else if (thisNodeSize == 0)
643                     result = getEmptyResult();
644                 else if (leftChild.thisNodeSize == 0)
645                     result = rightChild.getLocalResult();
646                 else {
647                     result = Nodes.conc(op.getOutputShape(),
648                                         leftChild.getLocalResult(), rightChild.getLocalResult());
649                 }
650                 setLocalResult(isRoot() ? doTruncate(result) : result);
651                 completed = true;
652             }
653             if (targetSize >= 0
654                 && !isRoot()
655                 && isLeftCompleted(targetOffset + targetSize))
656                     cancelLaterNodes();
657 
658             super.onCompletion(caller);
659         }
660 
661         @Override
662         protected void cancel() {
663             super.cancel();
664             if (completed)
665                 setLocalResult(getEmptyResult());
666         }
667 
668         private Node<P_OUT> doTruncate(Node<P_OUT> input) {
669             long to = targetSize >= 0 ? Math.min(input.count(), targetOffset + targetSize) : thisNodeSize;
670             return input.truncate(targetOffset, to, generator);
671         }
672 
673         /**
674          * Determine if the number of completed elements in this node and nodes
675          * to the left of this node is greater than or equal to the target size.
676          *
677          * @param target the target size
678          * @return true if the number of elements is greater than or equal to
679          *         the target size, otherwise false.
680          */
681         private boolean isLeftCompleted(long target) {
682             long size = completed ? thisNodeSize : completedSize(target);
683             if (size >= target)
684                 return true;
685             for (SliceTask<P_IN, P_OUT> parent = getParent(), node = this;
686                  parent != null;
687                  node = parent, parent = parent.getParent()) {
688                 if (node == parent.rightChild) {
689                     SliceTask<P_IN, P_OUT> left = parent.leftChild;
690                     if (left != null) {
691                         size += left.completedSize(target);
692                         if (size >= target)
693                             return true;
694                     }
695                 }
696             }
697             return size >= target;
698         }
699 
700         /**
701          * Compute the number of completed elements in this node.
702          * <p>
703          * Computation terminates if all nodes have been processed or the
704          * number of completed elements is greater than or equal to the target
705          * size.
706          *
707          * @param target the target size
708          * @return return the number of completed elements
709          */
710         private long completedSize(long target) {
711             if (completed)
712                 return thisNodeSize;
713             else {
714                 SliceTask<P_IN, P_OUT> left = leftChild;
715                 SliceTask<P_IN, P_OUT> right = rightChild;
716                 if (left == null || right == null) {
717                     // must be completed
718                     return thisNodeSize;
719                 }
720                 else {
721                     long leftSize = left.completedSize(target);
722                     return (leftSize >= target) ? leftSize : leftSize + right.completedSize(target);
723                 }
724             }
725         }
726     }
727 }
728