• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2012, 2014, Oracle and/or its affiliates. All rights reserved.
3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4  *
5  * This code is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 only, as
7  * published by the Free Software Foundation.  Oracle designates this
8  * particular file as subject to the "Classpath" exception as provided
9  * by Oracle in the LICENSE file that accompanied this code.
10  *
11  * This code is distributed in the hope that it will be useful, but WITHOUT
12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14  * version 2 for more details (a copy is included in the LICENSE file that
15  * accompanied this code).
16  *
17  * You should have received a copy of the GNU General Public License version
18  * 2 along with this work; if not, write to the Free Software Foundation,
19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20  *
21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22  * or visit www.oracle.com if you need additional information or have any
23  * questions.
24  */
25 package java.util.stream;
26 
27 import java.util.IntSummaryStatistics;
28 import java.util.Objects;
29 import java.util.OptionalDouble;
30 import java.util.OptionalInt;
31 import java.util.PrimitiveIterator;
32 import java.util.Spliterator;
33 import java.util.Spliterators;
34 import java.util.function.BiConsumer;
35 import java.util.function.BinaryOperator;
36 import java.util.function.IntBinaryOperator;
37 import java.util.function.IntConsumer;
38 import java.util.function.IntFunction;
39 import java.util.function.IntPredicate;
40 import java.util.function.IntToDoubleFunction;
41 import java.util.function.IntToLongFunction;
42 import java.util.function.IntUnaryOperator;
43 import java.util.function.ObjIntConsumer;
44 import java.util.function.Supplier;
45 
46 /**
 * Abstract base class for an intermediate pipeline stage or pipeline source
 * stage whose elements are of type {@code int}.
49  *
50  * @param <E_IN> type of elements in the upstream source
51  * @since 1.8
52  * @hide Visible for CTS testing only (OpenJDK8 tests).
53  */
54 // Android-changed: Made public for CTS tests only.
55 public abstract class IntPipeline<E_IN>
56         extends AbstractPipeline<E_IN, Integer, IntStream>
57         implements IntStream {
58 
59     /**
60      * Constructor for the head of a stream pipeline.
61      *
62      * @param source {@code Supplier<Spliterator>} describing the stream source
63      * @param sourceFlags The source flags for the stream source, described in
64      *        {@link StreamOpFlag}
65      * @param parallel {@code true} if the pipeline is parallel
66      */
IntPipeline(Supplier<? extends Spliterator<Integer>> source, int sourceFlags, boolean parallel)67     IntPipeline(Supplier<? extends Spliterator<Integer>> source,
68                 int sourceFlags, boolean parallel) {
69         super(source, sourceFlags, parallel);
70     }
71 
72     /**
73      * Constructor for the head of a stream pipeline.
74      *
75      * @param source {@code Spliterator} describing the stream source
76      * @param sourceFlags The source flags for the stream source, described in
77      *        {@link StreamOpFlag}
78      * @param parallel {@code true} if the pipeline is parallel
79      */
IntPipeline(Spliterator<Integer> source, int sourceFlags, boolean parallel)80     IntPipeline(Spliterator<Integer> source,
81                 int sourceFlags, boolean parallel) {
82         super(source, sourceFlags, parallel);
83     }
84 
85     /**
86      * Constructor for appending an intermediate operation onto an existing
87      * pipeline.
88      *
89      * @param upstream the upstream element source
90      * @param opFlags the operation flags for the new operation
91      */
IntPipeline(AbstractPipeline<?, E_IN, ?> upstream, int opFlags)92     IntPipeline(AbstractPipeline<?, E_IN, ?> upstream, int opFlags) {
93         super(upstream, opFlags);
94     }
95 
96     /**
97      * Adapt a {@code Sink<Integer> to an {@code IntConsumer}, ideally simply
98      * by casting.
99      */
adapt(Sink<Integer> sink)100     private static IntConsumer adapt(Sink<Integer> sink) {
101         if (sink instanceof IntConsumer) {
102             return (IntConsumer) sink;
103         }
104         else {
105             if (Tripwire.ENABLED)
106                 Tripwire.trip(AbstractPipeline.class,
107                               "using IntStream.adapt(Sink<Integer> s)");
108             return sink::accept;
109         }
110     }
111 
112     /**
113      * Adapt a {@code Spliterator<Integer>} to a {@code Spliterator.OfInt}.
114      *
115      * @implNote
116      * The implementation attempts to cast to a Spliterator.OfInt, and throws an
117      * exception if this cast is not possible.
118      */
adapt(Spliterator<Integer> s)119     private static Spliterator.OfInt adapt(Spliterator<Integer> s) {
120         if (s instanceof Spliterator.OfInt) {
121             return (Spliterator.OfInt) s;
122         }
123         else {
124             if (Tripwire.ENABLED)
125                 Tripwire.trip(AbstractPipeline.class,
126                               "using IntStream.adapt(Spliterator<Integer> s)");
127             throw new UnsupportedOperationException("IntStream.adapt(Spliterator<Integer> s)");
128         }
129     }
130 
131 
132     // Shape-specific methods
133 
134     @Override
135     // Android-changed: Make public, to match the method it's overriding.
getOutputShape()136     public final StreamShape getOutputShape() {
137         return StreamShape.INT_VALUE;
138     }
139 
140     @Override
141     // Android-changed: Make public, to match the method it's overriding.
evaluateToNode(PipelineHelper<Integer> helper, Spliterator<P_IN> spliterator, boolean flattenTree, IntFunction<Integer[]> generator)142     public final <P_IN> Node<Integer> evaluateToNode(PipelineHelper<Integer> helper,
143                                                      Spliterator<P_IN> spliterator,
144                                                      boolean flattenTree,
145                                                      IntFunction<Integer[]> generator) {
146         return Nodes.collectInt(helper, spliterator, flattenTree);
147     }
148 
149     @Override
150     // Android-changed: Make public, to match the method it's overriding.
wrap(PipelineHelper<Integer> ph, Supplier<Spliterator<P_IN>> supplier, boolean isParallel)151     public final <P_IN> Spliterator<Integer> wrap(PipelineHelper<Integer> ph,
152                                                   Supplier<Spliterator<P_IN>> supplier,
153                                                   boolean isParallel) {
154         return new StreamSpliterators.IntWrappingSpliterator<>(ph, supplier, isParallel);
155     }
156 
157     @Override
158     @SuppressWarnings("unchecked")
159     // Android-changed: Make public, to match the method it's overriding.
lazySpliterator(Supplier<? extends Spliterator<Integer>> supplier)160     public final Spliterator.OfInt lazySpliterator(Supplier<? extends Spliterator<Integer>> supplier) {
161         return new StreamSpliterators.DelegatingSpliterator.OfInt((Supplier<Spliterator.OfInt>) supplier);
162     }
163 
164     @Override
165     // Android-changed: Make public, to match the method it's overriding.
forEachWithCancel(Spliterator<Integer> spliterator, Sink<Integer> sink)166     public final boolean forEachWithCancel(Spliterator<Integer> spliterator, Sink<Integer> sink) {
167         Spliterator.OfInt spl = adapt(spliterator);
168         IntConsumer adaptedSink = adapt(sink);
169         boolean cancelled;
170         do { } while (!(cancelled = sink.cancellationRequested()) && spl.tryAdvance(adaptedSink));
171         return cancelled;
172     }
173 
174     @Override
175     // Android-changed: Make public, to match the method it's overriding.
makeNodeBuilder(long exactSizeIfKnown, IntFunction<Integer[]> generator)176     public final Node.Builder<Integer> makeNodeBuilder(long exactSizeIfKnown,
177                                                        IntFunction<Integer[]> generator) {
178         return Nodes.intBuilder(exactSizeIfKnown);
179     }
180 
181 
182     // IntStream
183 
184     @Override
iterator()185     public final PrimitiveIterator.OfInt iterator() {
186         return Spliterators.iterator(spliterator());
187     }
188 
189     @Override
spliterator()190     public final Spliterator.OfInt spliterator() {
191         return adapt(super.spliterator());
192     }
193 
194     // Stateless intermediate ops from IntStream
195 
196     @Override
asLongStream()197     public final LongStream asLongStream() {
198         return new LongPipeline.StatelessOp<Integer>(this, StreamShape.INT_VALUE,
199                                                      StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
200             @Override
201             public Sink<Integer> opWrapSink(int flags, Sink<Long> sink) {
202                 return new Sink.ChainedInt<Long>(sink) {
203                     @Override
204                     public void accept(int t) {
205                         downstream.accept((long) t);
206                     }
207                 };
208             }
209         };
210     }
211 
212     @Override
213     public final DoubleStream asDoubleStream() {
214         return new DoublePipeline.StatelessOp<Integer>(this, StreamShape.INT_VALUE,
215                                                        StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
216             @Override
217             // Android-changed: Make public, to match the method it's overriding.
218             public Sink<Integer> opWrapSink(int flags, Sink<Double> sink) {
219                 return new Sink.ChainedInt<Double>(sink) {
220                     @Override
221                     public void accept(int t) {
222                         downstream.accept((double) t);
223                     }
224                 };
225             }
226         };
227     }
228 
229     @Override
230     public final Stream<Integer> boxed() {
231         return mapToObj(Integer::valueOf);
232     }
233 
234     @Override
235     public final IntStream map(IntUnaryOperator mapper) {
236         Objects.requireNonNull(mapper);
237         return new StatelessOp<Integer>(this, StreamShape.INT_VALUE,
238                                         StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
239             @Override
240             // Android-changed: Make public, to match the method it's overriding.
241             public Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
242                 return new Sink.ChainedInt<Integer>(sink) {
243                     @Override
244                     public void accept(int t) {
245                         downstream.accept(mapper.applyAsInt(t));
246                     }
247                 };
248             }
249         };
250     }
251 
252     @Override
253     public final <U> Stream<U> mapToObj(IntFunction<? extends U> mapper) {
254         Objects.requireNonNull(mapper);
255         return new ReferencePipeline.StatelessOp<Integer, U>(this, StreamShape.INT_VALUE,
256                                                              StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
257             @Override
258             // Android-changed: Make public, to match the method it's overriding.
259             public Sink<Integer> opWrapSink(int flags, Sink<U> sink) {
260                 return new Sink.ChainedInt<U>(sink) {
261                     @Override
262                     public void accept(int t) {
263                         downstream.accept(mapper.apply(t));
264                     }
265                 };
266             }
267         };
268     }
269 
270     @Override
271     public final LongStream mapToLong(IntToLongFunction mapper) {
272         Objects.requireNonNull(mapper);
273         return new LongPipeline.StatelessOp<Integer>(this, StreamShape.INT_VALUE,
274                                                      StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
275             @Override
276             // Android-changed: Make public, to match the method it's overriding.
277             public Sink<Integer> opWrapSink(int flags, Sink<Long> sink) {
278                 return new Sink.ChainedInt<Long>(sink) {
279                     @Override
280                     public void accept(int t) {
281                         downstream.accept(mapper.applyAsLong(t));
282                     }
283                 };
284             }
285         };
286     }
287 
288     @Override
289     public final DoubleStream mapToDouble(IntToDoubleFunction mapper) {
290         Objects.requireNonNull(mapper);
291         return new DoublePipeline.StatelessOp<Integer>(this, StreamShape.INT_VALUE,
292                                                        StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
293             @Override
294             // Android-changed: Make public, to match the method it's overriding.
295             public Sink<Integer> opWrapSink(int flags, Sink<Double> sink) {
296                 return new Sink.ChainedInt<Double>(sink) {
297                     @Override
298                     public void accept(int t) {
299                         downstream.accept(mapper.applyAsDouble(t));
300                     }
301                 };
302             }
303         };
304     }
305 
306     @Override
307     public final IntStream flatMap(IntFunction<? extends IntStream> mapper) {
308         return new StatelessOp<Integer>(this, StreamShape.INT_VALUE,
309                                         StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
310             @Override
311             // Android-changed: Make public, to match the method it's overriding.
312             public Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
313                 return new Sink.ChainedInt<Integer>(sink) {
314                     @Override
315                     public void begin(long size) {
316                         downstream.begin(-1);
317                     }
318 
319                     @Override
320                     public void accept(int t) {
321                         try (IntStream result = mapper.apply(t)) {
322                             // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
323                             if (result != null)
324                                 result.sequential().forEach(i -> downstream.accept(i));
325                         }
326                     }
327                 };
328             }
329         };
330     }
331 
332     @Override
333     public IntStream unordered() {
334         if (!isOrdered())
335             return this;
336         return new StatelessOp<Integer>(this, StreamShape.INT_VALUE, StreamOpFlag.NOT_ORDERED) {
337             @Override
338             // Android-changed: Make public, to match the method it's overriding.
339             public Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
340                 return sink;
341             }
342         };
343     }
344 
345     @Override
346     public final IntStream filter(IntPredicate predicate) {
347         Objects.requireNonNull(predicate);
348         return new StatelessOp<Integer>(this, StreamShape.INT_VALUE,
349                                         StreamOpFlag.NOT_SIZED) {
350             @Override
351             // Android-changed: Make public, to match the method it's overriding.
352             public Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
353                 return new Sink.ChainedInt<Integer>(sink) {
354                     @Override
355                     public void begin(long size) {
356                         downstream.begin(-1);
357                     }
358 
359                     @Override
360                     public void accept(int t) {
361                         if (predicate.test(t))
362                             downstream.accept(t);
363                     }
364                 };
365             }
366         };
367     }
368 
369     @Override
370     public final IntStream peek(IntConsumer action) {
371         Objects.requireNonNull(action);
372         return new StatelessOp<Integer>(this, StreamShape.INT_VALUE,
373                                         0) {
374             @Override
375             // Android-changed: Make public, to match the method it's overriding.
376             public Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
377                 return new Sink.ChainedInt<Integer>(sink) {
378                     @Override
379                     public void accept(int t) {
380                         action.accept(t);
381                         downstream.accept(t);
382                     }
383                 };
384             }
385         };
386     }
387 
388     // Stateful intermediate ops from IntStream
389 
390     @Override
391     public final IntStream limit(long maxSize) {
392         if (maxSize < 0)
393             throw new IllegalArgumentException(Long.toString(maxSize));
394         return SliceOps.makeInt(this, 0, maxSize);
395     }
396 
397     @Override
398     public final IntStream skip(long n) {
399         if (n < 0)
400             throw new IllegalArgumentException(Long.toString(n));
401         if (n == 0)
402             return this;
403         else
404             return SliceOps.makeInt(this, n, -1);
405     }
406 
407     @Override
408     public final IntStream sorted() {
409         return SortedOps.makeInt(this);
410     }
411 
412     @Override
413     public final IntStream distinct() {
414         // While functional and quick to implement, this approach is not very efficient.
415         // An efficient version requires an int-specific map/set implementation.
416         return boxed().distinct().mapToInt(i -> i);
417     }
418 
419     // Terminal ops from IntStream
420 
421     @Override
422     public void forEach(IntConsumer action) {
423         evaluate(ForEachOps.makeInt(action, false));
424     }
425 
426     @Override
427     public void forEachOrdered(IntConsumer action) {
428         evaluate(ForEachOps.makeInt(action, true));
429     }
430 
431     @Override
432     public final int sum() {
433         return reduce(0, Integer::sum);
434     }
435 
436     @Override
437     public final OptionalInt min() {
438         return reduce(Math::min);
439     }
440 
441     @Override
442     public final OptionalInt max() {
443         return reduce(Math::max);
444     }
445 
446     @Override
447     public final long count() {
448         return mapToLong(e -> 1L).sum();
449     }
450 
451     @Override
452     public final OptionalDouble average() {
453         long[] avg = collect(() -> new long[2],
454                              (ll, i) -> {
455                                  ll[0]++;
456                                  ll[1] += i;
457                              },
458                              (ll, rr) -> {
459                                  ll[0] += rr[0];
460                                  ll[1] += rr[1];
461                              });
462         return avg[0] > 0
463                ? OptionalDouble.of((double) avg[1] / avg[0])
464                : OptionalDouble.empty();
465     }
466 
467     @Override
468     public final IntSummaryStatistics summaryStatistics() {
469         return collect(IntSummaryStatistics::new, IntSummaryStatistics::accept,
470                        IntSummaryStatistics::combine);
471     }
472 
473     @Override
474     public final int reduce(int identity, IntBinaryOperator op) {
475         return evaluate(ReduceOps.makeInt(identity, op));
476     }
477 
478     @Override
479     public final OptionalInt reduce(IntBinaryOperator op) {
480         return evaluate(ReduceOps.makeInt(op));
481     }
482 
483     @Override
484     public final <R> R collect(Supplier<R> supplier,
485                                ObjIntConsumer<R> accumulator,
486                                BiConsumer<R, R> combiner) {
487         BinaryOperator<R> operator = (left, right) -> {
488             combiner.accept(left, right);
489             return left;
490         };
491         return evaluate(ReduceOps.makeInt(supplier, accumulator, operator));
492     }
493 
494     @Override
495     public final boolean anyMatch(IntPredicate predicate) {
496         return evaluate(MatchOps.makeInt(predicate, MatchOps.MatchKind.ANY));
497     }
498 
499     @Override
500     public final boolean allMatch(IntPredicate predicate) {
501         return evaluate(MatchOps.makeInt(predicate, MatchOps.MatchKind.ALL));
502     }
503 
504     @Override
505     public final boolean noneMatch(IntPredicate predicate) {
506         return evaluate(MatchOps.makeInt(predicate, MatchOps.MatchKind.NONE));
507     }
508 
509     @Override
510     public final OptionalInt findFirst() {
511         return evaluate(FindOps.makeInt(true));
512     }
513 
514     @Override
515     public final OptionalInt findAny() {
516         return evaluate(FindOps.makeInt(false));
517     }
518 
519     @Override
520     public final int[] toArray() {
521         return Nodes.flattenInt((Node.OfInt) evaluateToArrayNode(Integer[]::new))
522                         .asPrimitiveArray();
523     }
524 
525     //
526 
527     /**
528      * Source stage of an IntStream.
529      *
530      * @param <E_IN> type of elements in the upstream source
531      * @since 1.8
532      * @hide Visible for CTS testing only (OpenJDK8 tests).
533      */
534     // Android-changed: Made public for CTS tests only.
535     public static class Head<E_IN> extends IntPipeline<E_IN> {
536         /**
537          * Constructor for the source stage of an IntStream.
538          *
539          * @param source {@code Supplier<Spliterator>} describing the stream
540          *               source
541          * @param sourceFlags the source flags for the stream source, described
542          *                    in {@link StreamOpFlag}
543          * @param parallel {@code true} if the pipeline is parallel
544          */
545         // Android-changed: Made public for CTS tests only.
546         public Head(Supplier<? extends Spliterator<Integer>> source,
547              int sourceFlags, boolean parallel) {
548             super(source, sourceFlags, parallel);
549         }
550 
551         /**
552          * Constructor for the source stage of an IntStream.
553          *
554          * @param source {@code Spliterator} describing the stream source
555          * @param sourceFlags the source flags for the stream source, described
556          *                    in {@link StreamOpFlag}
557          * @param parallel {@code true} if the pipeline is parallel
558          */
559         // Android-changed: Made public for CTS tests only.
560         public Head(Spliterator<Integer> source,
561              int sourceFlags, boolean parallel) {
562             super(source, sourceFlags, parallel);
563         }
564 
565         @Override
566         // Android-changed: Make public, to match the method it's overriding.
567         public final boolean opIsStateful() {
568             throw new UnsupportedOperationException();
569         }
570 
571         @Override
572         // Android-changed: Make public, to match the method it's overriding.
573         public final Sink<E_IN> opWrapSink(int flags, Sink<Integer> sink) {
574             throw new UnsupportedOperationException();
575         }
576 
577         // Optimized sequential terminal operations for the head of the pipeline
578 
579         @Override
580         public void forEach(IntConsumer action) {
581             if (!isParallel()) {
582                 adapt(sourceStageSpliterator()).forEachRemaining(action);
583             }
584             else {
585                 super.forEach(action);
586             }
587         }
588 
589         @Override
590         public void forEachOrdered(IntConsumer action) {
591             if (!isParallel()) {
592                 adapt(sourceStageSpliterator()).forEachRemaining(action);
593             }
594             else {
595                 super.forEachOrdered(action);
596             }
597         }
598     }
599 
600     /**
601      * Base class for a stateless intermediate stage of an IntStream
602      *
603      * @param <E_IN> type of elements in the upstream source
604      * @since 1.8
605      * @hide Visible for CTS testing only (OpenJDK8 tests).
606      */
607     // Android-changed: Made public for CTS tests only.
608     public abstract static class StatelessOp<E_IN> extends IntPipeline<E_IN> {
609         /**
610          * Construct a new IntStream by appending a stateless intermediate
611          * operation to an existing stream.
612          * @param upstream The upstream pipeline stage
613          * @param inputShape The stream shape for the upstream pipeline stage
614          * @param opFlags Operation flags for the new stage
615          */
616         // Android-changed: Made public for CTS tests only.
617         public StatelessOp(AbstractPipeline<?, E_IN, ?> upstream,
618                     StreamShape inputShape,
619                     int opFlags) {
620             super(upstream, opFlags);
621             assert upstream.getOutputShape() == inputShape;
622         }
623 
624         @Override
625         // Android-changed: Make public, to match the method it's overriding.
626         public final boolean opIsStateful() {
627             return false;
628         }
629     }
630 
631     /**
632      * Base class for a stateful intermediate stage of an IntStream.
633      *
634      * @param <E_IN> type of elements in the upstream source
635      * @since 1.8
636      * @hide Visible for CTS testing only (OpenJDK8 tests).
637      */
638     // Android-changed: Made public for CTS tests only.
639     public abstract static class StatefulOp<E_IN> extends IntPipeline<E_IN> {
640         /**
641          * Construct a new IntStream by appending a stateful intermediate
642          * operation to an existing stream.
643          * @param upstream The upstream pipeline stage
644          * @param inputShape The stream shape for the upstream pipeline stage
645          * @param opFlags Operation flags for the new stage
646          */
647         // Android-changed: Made public for CTS tests only.
648         public StatefulOp(AbstractPipeline<?, E_IN, ?> upstream,
649                    StreamShape inputShape,
650                    int opFlags) {
651             super(upstream, opFlags);
652             assert upstream.getOutputShape() == inputShape;
653         }
654 
655         @Override
656         // Android-changed: Make public, to match the method it's overriding.
657         public final boolean opIsStateful() {
658             return true;
659         }
660 
661         @Override
662         // Android-changed: Make public, to match the method it's overriding.
663         public abstract <P_IN> Node<Integer> opEvaluateParallel(PipelineHelper<Integer> helper,
664                                                          Spliterator<P_IN> spliterator,
665                                                          IntFunction<Integer[]> generator);
666     }
667 }
668