• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2013, Oracle and/or its affiliates. All rights reserved.
3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4  *
5  * This code is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 only, as
7  * published by the Free Software Foundation.  Oracle designates this
8  * particular file as subject to the "Classpath" exception as provided
9  * by Oracle in the LICENSE file that accompanied this code.
10  *
11  * This code is distributed in the hope that it will be useful, but WITHOUT
12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14  * version 2 for more details (a copy is included in the LICENSE file that
15  * accompanied this code).
16  *
17  * You should have received a copy of the GNU General Public License version
18  * 2 along with this work; if not, write to the Free Software Foundation,
19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20  *
21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22  * or visit www.oracle.com if you need additional information or have any
23  * questions.
24  */
25 package java.util.stream;
26 
27 import java.util.LongSummaryStatistics;
28 import java.util.Objects;
29 import java.util.OptionalDouble;
30 import java.util.OptionalLong;
31 import java.util.PrimitiveIterator;
32 import java.util.Spliterator;
33 import java.util.Spliterators;
34 import java.util.function.BiConsumer;
35 import java.util.function.BinaryOperator;
36 import java.util.function.IntFunction;
37 import java.util.function.LongBinaryOperator;
38 import java.util.function.LongConsumer;
39 import java.util.function.LongFunction;
40 import java.util.function.LongPredicate;
41 import java.util.function.LongToDoubleFunction;
42 import java.util.function.LongToIntFunction;
43 import java.util.function.LongUnaryOperator;
44 import java.util.function.ObjLongConsumer;
45 import java.util.function.Supplier;
46 
47 /**
 * Abstract base class for an intermediate pipeline stage or pipeline source
 * stage whose elements are of type {@code long}.
50  *
51  * @param <E_IN> type of elements in the upstream source
52  * @since 1.8
53  * @hide Visible for CTS testing only (OpenJDK8 tests).
54  */
55 // Android-changed: Made public for CTS tests only.
56 public abstract class LongPipeline<E_IN>
57         extends AbstractPipeline<E_IN, Long, LongStream>
58         implements LongStream {
59 
60     /**
61      * Constructor for the head of a stream pipeline.
62      *
63      * @param source {@code Supplier<Spliterator>} describing the stream source
64      * @param sourceFlags the source flags for the stream source, described in
65      *        {@link StreamOpFlag}
66      * @param parallel {@code true} if the pipeline is parallel
67      */
LongPipeline(Supplier<? extends Spliterator<Long>> source, int sourceFlags, boolean parallel)68     LongPipeline(Supplier<? extends Spliterator<Long>> source,
69                  int sourceFlags, boolean parallel) {
70         super(source, sourceFlags, parallel);
71     }
72 
73     /**
74      * Constructor for the head of a stream pipeline.
75      *
76      * @param source {@code Spliterator} describing the stream source
77      * @param sourceFlags the source flags for the stream source, described in
78      *        {@link StreamOpFlag}
79      * @param parallel {@code true} if the pipeline is parallel
80      */
LongPipeline(Spliterator<Long> source, int sourceFlags, boolean parallel)81     LongPipeline(Spliterator<Long> source,
82                  int sourceFlags, boolean parallel) {
83         super(source, sourceFlags, parallel);
84     }
85 
86     /**
87      * Constructor for appending an intermediate operation onto an existing pipeline.
88      *
89      * @param upstream the upstream element source.
90      * @param opFlags the operation flags
91      */
LongPipeline(AbstractPipeline<?, E_IN, ?> upstream, int opFlags)92     LongPipeline(AbstractPipeline<?, E_IN, ?> upstream, int opFlags) {
93         super(upstream, opFlags);
94     }
95 
96     /**
97      * Adapt a {@code Sink<Long> to an {@code LongConsumer}, ideally simply
98      * by casting.
99      */
adapt(Sink<Long> sink)100     private static LongConsumer adapt(Sink<Long> sink) {
101         if (sink instanceof LongConsumer) {
102             return (LongConsumer) sink;
103         } else {
104             if (Tripwire.ENABLED)
105                 Tripwire.trip(AbstractPipeline.class,
106                               "using LongStream.adapt(Sink<Long> s)");
107             return sink::accept;
108         }
109     }
110 
111     /**
112      * Adapt a {@code Spliterator<Long>} to a {@code Spliterator.OfLong}.
113      *
114      * @implNote
115      * The implementation attempts to cast to a Spliterator.OfLong, and throws
116      * an exception if this cast is not possible.
117      */
adapt(Spliterator<Long> s)118     private static Spliterator.OfLong adapt(Spliterator<Long> s) {
119         if (s instanceof Spliterator.OfLong) {
120             return (Spliterator.OfLong) s;
121         } else {
122             if (Tripwire.ENABLED)
123                 Tripwire.trip(AbstractPipeline.class,
124                               "using LongStream.adapt(Spliterator<Long> s)");
125             throw new UnsupportedOperationException("LongStream.adapt(Spliterator<Long> s)");
126         }
127     }
128 
129 
130     // Shape-specific methods
131 
132     @Override
133     // Android-changed: Make public, to match the method it's overriding.
getOutputShape()134     public final StreamShape getOutputShape() {
135         return StreamShape.LONG_VALUE;
136     }
137 
138     @Override
139     // Android-changed: Make public, to match the method it's overriding.
evaluateToNode(PipelineHelper<Long> helper, Spliterator<P_IN> spliterator, boolean flattenTree, IntFunction<Long[]> generator)140     public final <P_IN> Node<Long> evaluateToNode(PipelineHelper<Long> helper,
141                                            Spliterator<P_IN> spliterator,
142                                            boolean flattenTree,
143                                            IntFunction<Long[]> generator) {
144         return Nodes.collectLong(helper, spliterator, flattenTree);
145     }
146 
147     @Override
148     // Android-changed: Make public, to match the method it's overriding.
wrap(PipelineHelper<Long> ph, Supplier<Spliterator<P_IN>> supplier, boolean isParallel)149     public final <P_IN> Spliterator<Long> wrap(PipelineHelper<Long> ph,
150                                         Supplier<Spliterator<P_IN>> supplier,
151                                         boolean isParallel) {
152         return new StreamSpliterators.LongWrappingSpliterator<>(ph, supplier, isParallel);
153     }
154 
155     @Override
156     @SuppressWarnings("unchecked")
157     // Android-changed: Make public, to match the method it's overriding.
lazySpliterator(Supplier<? extends Spliterator<Long>> supplier)158     public final Spliterator.OfLong lazySpliterator(Supplier<? extends Spliterator<Long>> supplier) {
159         return new StreamSpliterators.DelegatingSpliterator.OfLong((Supplier<Spliterator.OfLong>) supplier);
160     }
161 
162     @Override
163     // Android-changed: Make public, to match the method it's overriding.
forEachWithCancel(Spliterator<Long> spliterator, Sink<Long> sink)164     public final boolean forEachWithCancel(Spliterator<Long> spliterator, Sink<Long> sink) {
165         Spliterator.OfLong spl = adapt(spliterator);
166         LongConsumer adaptedSink =  adapt(sink);
167         boolean cancelled;
168         do { } while (!(cancelled = sink.cancellationRequested()) && spl.tryAdvance(adaptedSink));
169         return cancelled;
170     }
171 
172     @Override
173     // Android-changed: Make public, to match the method it's overriding.
makeNodeBuilder(long exactSizeIfKnown, IntFunction<Long[]> generator)174     public final Node.Builder<Long> makeNodeBuilder(long exactSizeIfKnown, IntFunction<Long[]> generator) {
175         return Nodes.longBuilder(exactSizeIfKnown);
176     }
177 
178 
179     // LongStream
180 
181     @Override
iterator()182     public final PrimitiveIterator.OfLong iterator() {
183         return Spliterators.iterator(spliterator());
184     }
185 
186     @Override
spliterator()187     public final Spliterator.OfLong spliterator() {
188         return adapt(super.spliterator());
189     }
190 
191     // Stateless intermediate ops from LongStream
192 
193     @Override
asDoubleStream()194     public final DoubleStream asDoubleStream() {
195         return new DoublePipeline.StatelessOp<Long>(this, StreamShape.LONG_VALUE,
196                                                     StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
197             @Override
198             // Android-changed: Make public, to match the method it's overriding.
199             public Sink<Long> opWrapSink(int flags, Sink<Double> sink) {
200                 return new Sink.ChainedLong<Double>(sink) {
201                     @Override
202                     public void accept(long t) {
203                         downstream.accept((double) t);
204                     }
205                 };
206             }
207         };
208     }
209 
210     @Override
211     public final Stream<Long> boxed() {
212         return mapToObj(Long::valueOf);
213     }
214 
215     @Override
216     public final LongStream map(LongUnaryOperator mapper) {
217         Objects.requireNonNull(mapper);
218         return new StatelessOp<Long>(this, StreamShape.LONG_VALUE,
219                                      StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
220             @Override
221             // Android-changed: Make public, to match the method it's overriding.
222             public Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
223                 return new Sink.ChainedLong<Long>(sink) {
224                     @Override
225                     public void accept(long t) {
226                         downstream.accept(mapper.applyAsLong(t));
227                     }
228                 };
229             }
230         };
231     }
232 
233     @Override
234     public final <U> Stream<U> mapToObj(LongFunction<? extends U> mapper) {
235         Objects.requireNonNull(mapper);
236         return new ReferencePipeline.StatelessOp<Long, U>(this, StreamShape.LONG_VALUE,
237                                                           StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
238             @Override
239             // Android-changed: Make public, to match the method it's overriding.
240             public Sink<Long> opWrapSink(int flags, Sink<U> sink) {
241                 return new Sink.ChainedLong<U>(sink) {
242                     @Override
243                     public void accept(long t) {
244                         downstream.accept(mapper.apply(t));
245                     }
246                 };
247             }
248         };
249     }
250 
251     @Override
252     public final IntStream mapToInt(LongToIntFunction mapper) {
253         Objects.requireNonNull(mapper);
254         return new IntPipeline.StatelessOp<Long>(this, StreamShape.LONG_VALUE,
255                                                  StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
256             @Override
257             // Android-changed: Make public, to match the method it's overriding.
258             public Sink<Long> opWrapSink(int flags, Sink<Integer> sink) {
259                 return new Sink.ChainedLong<Integer>(sink) {
260                     @Override
261                     public void accept(long t) {
262                         downstream.accept(mapper.applyAsInt(t));
263                     }
264                 };
265             }
266         };
267     }
268 
269     @Override
270     public final DoubleStream mapToDouble(LongToDoubleFunction mapper) {
271         Objects.requireNonNull(mapper);
272         return new DoublePipeline.StatelessOp<Long>(this, StreamShape.LONG_VALUE,
273                                                     StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
274             @Override
275             // Android-changed: Make public, to match the method it's overriding.
276             public Sink<Long> opWrapSink(int flags, Sink<Double> sink) {
277                 return new Sink.ChainedLong<Double>(sink) {
278                     @Override
279                     public void accept(long t) {
280                         downstream.accept(mapper.applyAsDouble(t));
281                     }
282                 };
283             }
284         };
285     }
286 
287     @Override
288     public final LongStream flatMap(LongFunction<? extends LongStream> mapper) {
289         return new StatelessOp<Long>(this, StreamShape.LONG_VALUE,
290                                      StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
291             @Override
292             // Android-changed: Make public, to match the method it's overriding.
293             public Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
294                 return new Sink.ChainedLong<Long>(sink) {
295                     @Override
296                     public void begin(long size) {
297                         downstream.begin(-1);
298                     }
299 
300                     @Override
301                     public void accept(long t) {
302                         try (LongStream result = mapper.apply(t)) {
303                             // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
304                             if (result != null)
305                                 result.sequential().forEach(i -> downstream.accept(i));
306                         }
307                     }
308                 };
309             }
310         };
311     }
312 
313     @Override
314     public LongStream unordered() {
315         if (!isOrdered())
316             return this;
317         return new StatelessOp<Long>(this, StreamShape.LONG_VALUE, StreamOpFlag.NOT_ORDERED) {
318             @Override
319             // Android-changed: Make public, to match the method it's overriding.
320             public Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
321                 return sink;
322             }
323         };
324     }
325 
326     @Override
327     public final LongStream filter(LongPredicate predicate) {
328         Objects.requireNonNull(predicate);
329         return new StatelessOp<Long>(this, StreamShape.LONG_VALUE,
330                                      StreamOpFlag.NOT_SIZED) {
331             @Override
332             // Android-changed: Make public, to match the method it's overriding.
333             public Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
334                 return new Sink.ChainedLong<Long>(sink) {
335                     @Override
336                     public void begin(long size) {
337                         downstream.begin(-1);
338                     }
339 
340                     @Override
341                     public void accept(long t) {
342                         if (predicate.test(t))
343                             downstream.accept(t);
344                     }
345                 };
346             }
347         };
348     }
349 
350     @Override
351     public final LongStream peek(LongConsumer action) {
352         Objects.requireNonNull(action);
353         return new StatelessOp<Long>(this, StreamShape.LONG_VALUE,
354                                      0) {
355             @Override
356             // Android-changed: Make public, to match the method it's overriding.
357             public Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
358                 return new Sink.ChainedLong<Long>(sink) {
359                     @Override
360                     public void accept(long t) {
361                         action.accept(t);
362                         downstream.accept(t);
363                     }
364                 };
365             }
366         };
367     }
368 
369     // Stateful intermediate ops from LongStream
370 
371     @Override
372     public final LongStream limit(long maxSize) {
373         if (maxSize < 0)
374             throw new IllegalArgumentException(Long.toString(maxSize));
375         return SliceOps.makeLong(this, 0, maxSize);
376     }
377 
378     @Override
379     public final LongStream skip(long n) {
380         if (n < 0)
381             throw new IllegalArgumentException(Long.toString(n));
382         if (n == 0)
383             return this;
384         else
385             return SliceOps.makeLong(this, n, -1);
386     }
387 
388     @Override
389     public final LongStream sorted() {
390         return SortedOps.makeLong(this);
391     }
392 
393     @Override
394     public final LongStream distinct() {
395         // While functional and quick to implement, this approach is not very efficient.
396         // An efficient version requires a long-specific map/set implementation.
397         return boxed().distinct().mapToLong(i -> (long) i);
398     }
399 
400     // Terminal ops from LongStream
401 
402     @Override
403     public void forEach(LongConsumer action) {
404         evaluate(ForEachOps.makeLong(action, false));
405     }
406 
407     @Override
408     public void forEachOrdered(LongConsumer action) {
409         evaluate(ForEachOps.makeLong(action, true));
410     }
411 
412     @Override
413     public final long sum() {
414         // use better algorithm to compensate for intermediate overflow?
415         return reduce(0, Long::sum);
416     }
417 
418     @Override
419     public final OptionalLong min() {
420         return reduce(Math::min);
421     }
422 
423     @Override
424     public final OptionalLong max() {
425         return reduce(Math::max);
426     }
427 
428     @Override
429     public final OptionalDouble average() {
430         long[] avg = collect(() -> new long[2],
431                              (ll, i) -> {
432                                  ll[0]++;
433                                  ll[1] += i;
434                              },
435                              (ll, rr) -> {
436                                  ll[0] += rr[0];
437                                  ll[1] += rr[1];
438                              });
439         return avg[0] > 0
440                ? OptionalDouble.of((double) avg[1] / avg[0])
441                : OptionalDouble.empty();
442     }
443 
444     @Override
445     public final long count() {
446         return map(e -> 1L).sum();
447     }
448 
449     @Override
450     public final LongSummaryStatistics summaryStatistics() {
451         return collect(LongSummaryStatistics::new, LongSummaryStatistics::accept,
452                        LongSummaryStatistics::combine);
453     }
454 
455     @Override
456     public final long reduce(long identity, LongBinaryOperator op) {
457         return evaluate(ReduceOps.makeLong(identity, op));
458     }
459 
460     @Override
461     public final OptionalLong reduce(LongBinaryOperator op) {
462         return evaluate(ReduceOps.makeLong(op));
463     }
464 
465     @Override
466     public final <R> R collect(Supplier<R> supplier,
467                                ObjLongConsumer<R> accumulator,
468                                BiConsumer<R, R> combiner) {
469         BinaryOperator<R> operator = (left, right) -> {
470             combiner.accept(left, right);
471             return left;
472         };
473         return evaluate(ReduceOps.makeLong(supplier, accumulator, operator));
474     }
475 
476     @Override
477     public final boolean anyMatch(LongPredicate predicate) {
478         return evaluate(MatchOps.makeLong(predicate, MatchOps.MatchKind.ANY));
479     }
480 
481     @Override
482     public final boolean allMatch(LongPredicate predicate) {
483         return evaluate(MatchOps.makeLong(predicate, MatchOps.MatchKind.ALL));
484     }
485 
486     @Override
487     public final boolean noneMatch(LongPredicate predicate) {
488         return evaluate(MatchOps.makeLong(predicate, MatchOps.MatchKind.NONE));
489     }
490 
491     @Override
492     public final OptionalLong findFirst() {
493         return evaluate(FindOps.makeLong(true));
494     }
495 
496     @Override
497     public final OptionalLong findAny() {
498         return evaluate(FindOps.makeLong(false));
499     }
500 
501     @Override
502     public final long[] toArray() {
503         return Nodes.flattenLong((Node.OfLong) evaluateToArrayNode(Long[]::new))
504                 .asPrimitiveArray();
505     }
506 
507 
508     //
509 
510     /**
511      * Source stage of a LongPipeline.
512      *
513      * @param <E_IN> type of elements in the upstream source
514      * @since 1.8
515      * @hide Made public for CTS tests only (OpenJDK 8 streams tests).
516      */
517     // Android-changed: Made public for CTS tests only.
518     public static class Head<E_IN> extends LongPipeline<E_IN> {
519         /**
520          * Constructor for the source stage of a LongStream.
521          *
522          * @param source {@code Supplier<Spliterator>} describing the stream
523          *               source
524          * @param sourceFlags the source flags for the stream source, described
525          *                    in {@link StreamOpFlag}
526          * @param parallel {@code true} if the pipeline is parallel
527          */
528         // Android-changed: Made public for CTS tests only.
529         public Head(Supplier<? extends Spliterator<Long>> source,
530              int sourceFlags, boolean parallel) {
531             super(source, sourceFlags, parallel);
532         }
533 
534         /**
535          * Constructor for the source stage of a LongStream.
536          *
537          * @param source {@code Spliterator} describing the stream source
538          * @param sourceFlags the source flags for the stream source, described
539          *                    in {@link StreamOpFlag}
540          * @param parallel {@code true} if the pipeline is parallel
541          */
542         // Android-changed: Made public for CTS tests only.
543         public Head(Spliterator<Long> source,
544              int sourceFlags, boolean parallel) {
545             super(source, sourceFlags, parallel);
546         }
547 
548         @Override
549         // Android-changed: Make public, to match the method it's overriding.
550         public final boolean opIsStateful() {
551             throw new UnsupportedOperationException();
552         }
553 
554         @Override
555         // Android-changed: Make public, to match the method it's overriding.
556         public final Sink<E_IN> opWrapSink(int flags, Sink<Long> sink) {
557             throw new UnsupportedOperationException();
558         }
559 
560         // Optimized sequential terminal operations for the head of the pipeline
561 
562         @Override
563         public void forEach(LongConsumer action) {
564             if (!isParallel()) {
565                 adapt(sourceStageSpliterator()).forEachRemaining(action);
566             } else {
567                 super.forEach(action);
568             }
569         }
570 
571         @Override
572         public void forEachOrdered(LongConsumer action) {
573             if (!isParallel()) {
574                 adapt(sourceStageSpliterator()).forEachRemaining(action);
575             } else {
576                 super.forEachOrdered(action);
577             }
578         }
579     }
580 
581     /** Base class for a stateless intermediate stage of a LongStream.
582      *
583      * @param <E_IN> type of elements in the upstream source
584      * @since 1.8
585      * @hide Visible for CTS testing only (OpenJDK8 tests).
586      */
587     // Android-changed: Made public for CTS tests only.
588     public abstract static class StatelessOp<E_IN> extends LongPipeline<E_IN> {
589         /**
590          * Construct a new LongStream by appending a stateless intermediate
591          * operation to an existing stream.
592          * @param upstream The upstream pipeline stage
593          * @param inputShape The stream shape for the upstream pipeline stage
594          * @param opFlags Operation flags for the new stage
595          */
596         // Android-changed: Made public for CTS tests only.
597         public StatelessOp(AbstractPipeline<?, E_IN, ?> upstream,
598                     StreamShape inputShape,
599                     int opFlags) {
600             super(upstream, opFlags);
601             assert upstream.getOutputShape() == inputShape;
602         }
603 
604         @Override
605         // Android-changed: Make public, to match the method it's overriding.
606         public final boolean opIsStateful() {
607             return false;
608         }
609     }
610 
611     /**
612      * Base class for a stateful intermediate stage of a LongStream.
613      *
614      * @param <E_IN> type of elements in the upstream source
615      * @since 1.8
616      * @hide Visible for CTS testing only (OpenJDK8 tests).
617      */
618     // Android-changed: Made public for CTS tests only.
619     public abstract static class StatefulOp<E_IN> extends LongPipeline<E_IN> {
620         /**
621          * Construct a new LongStream by appending a stateful intermediate
622          * operation to an existing stream.
623          * @param upstream The upstream pipeline stage
624          * @param inputShape The stream shape for the upstream pipeline stage
625          * @param opFlags Operation flags for the new stage
626          * @hide Visible for CTS testing only (OpenJDK8 tests).
627          */
628         // Android-changed: Made public for CTS tests only.
629         public StatefulOp(AbstractPipeline<?, E_IN, ?> upstream,
630                    StreamShape inputShape,
631                    int opFlags) {
632             super(upstream, opFlags);
633             assert upstream.getOutputShape() == inputShape;
634         }
635 
636         @Override
637         // Android-changed: Make public, to match the method it's overriding.
638         public final boolean opIsStateful() {
639             return true;
640         }
641 
642         @Override
643         // Android-changed: Make public, to match the method it's overriding.
644         public abstract <P_IN> Node<Long> opEvaluateParallel(PipelineHelper<Long> helper,
645                                                       Spliterator<P_IN> spliterator,
646                                                       IntFunction<Long[]> generator);
647     }
648 }
649