• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2012, 2014, Oracle and/or its affiliates. All rights reserved.
3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4  *
5  * This code is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 only, as
7  * published by the Free Software Foundation.  Oracle designates this
8  * particular file as subject to the "Classpath" exception as provided
9  * by Oracle in the LICENSE file that accompanied this code.
10  *
11  * This code is distributed in the hope that it will be useful, but WITHOUT
12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14  * version 2 for more details (a copy is included in the LICENSE file that
15  * accompanied this code).
16  *
17  * You should have received a copy of the GNU General Public License version
18  * 2 along with this work; if not, write to the Free Software Foundation,
19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20  *
21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22  * or visit www.oracle.com if you need additional information or have any
23  * questions.
24  */
25 package java.util.stream;
26 
27 import java.util.IntSummaryStatistics;
28 import java.util.Objects;
29 import java.util.OptionalDouble;
30 import java.util.OptionalInt;
31 import java.util.PrimitiveIterator;
32 import java.util.Spliterator;
33 import java.util.Spliterators;
34 import java.util.function.BiConsumer;
35 import java.util.function.BinaryOperator;
36 import java.util.function.IntBinaryOperator;
37 import java.util.function.IntConsumer;
38 import java.util.function.IntFunction;
39 import java.util.function.IntPredicate;
40 import java.util.function.IntToDoubleFunction;
41 import java.util.function.IntToLongFunction;
42 import java.util.function.IntUnaryOperator;
43 import java.util.function.ObjIntConsumer;
44 import java.util.function.Supplier;
45 
46 /**
47  * Abstract base class for an intermediate pipeline stage or pipeline source
48  * stage whose elements are of type {@code int}.
49  *
50  * @param <E_IN> type of elements in the upstream source
51  * @since 1.8
52  * @hide Visible for CTS testing only (OpenJDK8 tests).
53  */
54 // Android-changed: Made public for CTS tests only.
55 public abstract class IntPipeline<E_IN>
56         extends AbstractPipeline<E_IN, Integer, IntStream>
57         implements IntStream {
58 
59     /**
60      * Constructor for the head of a stream pipeline.
61      *
62      * @param source {@code Supplier<Spliterator>} describing the stream source
63      * @param sourceFlags The source flags for the stream source, described in
64      *        {@link StreamOpFlag}
65      * @param parallel {@code true} if the pipeline is parallel
66      */
IntPipeline(Supplier<? extends Spliterator<Integer>> source, int sourceFlags, boolean parallel)67     IntPipeline(Supplier<? extends Spliterator<Integer>> source,
68                 int sourceFlags, boolean parallel) {
69         super(source, sourceFlags, parallel);
70     }
71 
72     /**
73      * Constructor for the head of a stream pipeline.
74      *
75      * @param source {@code Spliterator} describing the stream source
76      * @param sourceFlags The source flags for the stream source, described in
77      *        {@link StreamOpFlag}
78      * @param parallel {@code true} if the pipeline is parallel
79      */
IntPipeline(Spliterator<Integer> source, int sourceFlags, boolean parallel)80     IntPipeline(Spliterator<Integer> source,
81                 int sourceFlags, boolean parallel) {
82         super(source, sourceFlags, parallel);
83     }
84 
85     /**
86      * Constructor for appending an intermediate operation onto an existing
87      * pipeline.
88      *
89      * @param upstream the upstream element source
90      * @param opFlags the operation flags for the new operation
91      */
IntPipeline(AbstractPipeline<?, E_IN, ?> upstream, int opFlags)92     IntPipeline(AbstractPipeline<?, E_IN, ?> upstream, int opFlags) {
93         super(upstream, opFlags);
94     }
95 
96     /**
97      * Adapt a {@code Sink<Integer> to an {@code IntConsumer}, ideally simply
98      * by casting.
99      */
adapt(Sink<Integer> sink)100     private static IntConsumer adapt(Sink<Integer> sink) {
101         if (sink instanceof IntConsumer) {
102             return (IntConsumer) sink;
103         }
104         else {
105             if (Tripwire.ENABLED)
106                 Tripwire.trip(AbstractPipeline.class,
107                               "using IntStream.adapt(Sink<Integer> s)");
108             return sink::accept;
109         }
110     }
111 
112     /**
113      * Adapt a {@code Spliterator<Integer>} to a {@code Spliterator.OfInt}.
114      *
115      * @implNote
116      * The implementation attempts to cast to a Spliterator.OfInt, and throws an
117      * exception if this cast is not possible.
118      */
adapt(Spliterator<Integer> s)119     private static Spliterator.OfInt adapt(Spliterator<Integer> s) {
120         if (s instanceof Spliterator.OfInt) {
121             return (Spliterator.OfInt) s;
122         }
123         else {
124             if (Tripwire.ENABLED)
125                 Tripwire.trip(AbstractPipeline.class,
126                               "using IntStream.adapt(Spliterator<Integer> s)");
127             throw new UnsupportedOperationException("IntStream.adapt(Spliterator<Integer> s)");
128         }
129     }
130 
131 
132     // Shape-specific methods
133 
134     @Override
135     // Android-changed: Make public, to match the method it's overriding.
getOutputShape()136     public final StreamShape getOutputShape() {
137         return StreamShape.INT_VALUE;
138     }
139 
140     @Override
141     // Android-changed: Make public, to match the method it's overriding.
evaluateToNode(PipelineHelper<Integer> helper, Spliterator<P_IN> spliterator, boolean flattenTree, IntFunction<Integer[]> generator)142     public final <P_IN> Node<Integer> evaluateToNode(PipelineHelper<Integer> helper,
143                                                      Spliterator<P_IN> spliterator,
144                                                      boolean flattenTree,
145                                                      IntFunction<Integer[]> generator) {
146         return Nodes.collectInt(helper, spliterator, flattenTree);
147     }
148 
149     @Override
150     // Android-changed: Make public, to match the method it's overriding.
wrap(PipelineHelper<Integer> ph, Supplier<Spliterator<P_IN>> supplier, boolean isParallel)151     public final <P_IN> Spliterator<Integer> wrap(PipelineHelper<Integer> ph,
152                                                   Supplier<Spliterator<P_IN>> supplier,
153                                                   boolean isParallel) {
154         return new StreamSpliterators.IntWrappingSpliterator<>(ph, supplier, isParallel);
155     }
156 
157     @Override
158     @SuppressWarnings("unchecked")
159     // Android-changed: Make public, to match the method it's overriding.
lazySpliterator(Supplier<? extends Spliterator<Integer>> supplier)160     public final Spliterator.OfInt lazySpliterator(Supplier<? extends Spliterator<Integer>> supplier) {
161         return new StreamSpliterators.DelegatingSpliterator.OfInt((Supplier<Spliterator.OfInt>) supplier);
162     }
163 
164     @Override
165     // Android-changed: Make public, to match the method it's overriding.
forEachWithCancel(Spliterator<Integer> spliterator, Sink<Integer> sink)166     public final void forEachWithCancel(Spliterator<Integer> spliterator, Sink<Integer> sink) {
167         Spliterator.OfInt spl = adapt(spliterator);
168         IntConsumer adaptedSink = adapt(sink);
169         do { } while (!sink.cancellationRequested() && spl.tryAdvance(adaptedSink));
170     }
171 
172     @Override
173     // Android-changed: Make public, to match the method it's overriding.
makeNodeBuilder(long exactSizeIfKnown, IntFunction<Integer[]> generator)174     public final Node.Builder<Integer> makeNodeBuilder(long exactSizeIfKnown,
175                                                        IntFunction<Integer[]> generator) {
176         return Nodes.intBuilder(exactSizeIfKnown);
177     }
178 
179 
180     // IntStream
181 
182     @Override
iterator()183     public final PrimitiveIterator.OfInt iterator() {
184         return Spliterators.iterator(spliterator());
185     }
186 
187     @Override
spliterator()188     public final Spliterator.OfInt spliterator() {
189         return adapt(super.spliterator());
190     }
191 
192     // Stateless intermediate ops from IntStream
193 
194     @Override
asLongStream()195     public final LongStream asLongStream() {
196         return new LongPipeline.StatelessOp<Integer>(this, StreamShape.INT_VALUE,
197                                                      StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
198             @Override
199             public Sink<Integer> opWrapSink(int flags, Sink<Long> sink) {
200                 return new Sink.ChainedInt<Long>(sink) {
201                     @Override
202                     public void accept(int t) {
203                         downstream.accept((long) t);
204                     }
205                 };
206             }
207         };
208     }
209 
210     @Override
211     public final DoubleStream asDoubleStream() {
212         return new DoublePipeline.StatelessOp<Integer>(this, StreamShape.INT_VALUE,
213                                                        StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
214             @Override
215             // Android-changed: Make public, to match the method it's overriding.
216             public Sink<Integer> opWrapSink(int flags, Sink<Double> sink) {
217                 return new Sink.ChainedInt<Double>(sink) {
218                     @Override
219                     public void accept(int t) {
220                         downstream.accept((double) t);
221                     }
222                 };
223             }
224         };
225     }
226 
227     @Override
228     public final Stream<Integer> boxed() {
229         return mapToObj(Integer::valueOf);
230     }
231 
232     @Override
233     public final IntStream map(IntUnaryOperator mapper) {
234         Objects.requireNonNull(mapper);
235         return new StatelessOp<Integer>(this, StreamShape.INT_VALUE,
236                                         StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
237             @Override
238             // Android-changed: Make public, to match the method it's overriding.
239             public Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
240                 return new Sink.ChainedInt<Integer>(sink) {
241                     @Override
242                     public void accept(int t) {
243                         downstream.accept(mapper.applyAsInt(t));
244                     }
245                 };
246             }
247         };
248     }
249 
250     @Override
251     public final <U> Stream<U> mapToObj(IntFunction<? extends U> mapper) {
252         Objects.requireNonNull(mapper);
253         return new ReferencePipeline.StatelessOp<Integer, U>(this, StreamShape.INT_VALUE,
254                                                              StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
255             @Override
256             // Android-changed: Make public, to match the method it's overriding.
257             public Sink<Integer> opWrapSink(int flags, Sink<U> sink) {
258                 return new Sink.ChainedInt<U>(sink) {
259                     @Override
260                     public void accept(int t) {
261                         downstream.accept(mapper.apply(t));
262                     }
263                 };
264             }
265         };
266     }
267 
268     @Override
269     public final LongStream mapToLong(IntToLongFunction mapper) {
270         Objects.requireNonNull(mapper);
271         return new LongPipeline.StatelessOp<Integer>(this, StreamShape.INT_VALUE,
272                                                      StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
273             @Override
274             // Android-changed: Make public, to match the method it's overriding.
275             public Sink<Integer> opWrapSink(int flags, Sink<Long> sink) {
276                 return new Sink.ChainedInt<Long>(sink) {
277                     @Override
278                     public void accept(int t) {
279                         downstream.accept(mapper.applyAsLong(t));
280                     }
281                 };
282             }
283         };
284     }
285 
286     @Override
287     public final DoubleStream mapToDouble(IntToDoubleFunction mapper) {
288         Objects.requireNonNull(mapper);
289         return new DoublePipeline.StatelessOp<Integer>(this, StreamShape.INT_VALUE,
290                                                        StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
291             @Override
292             // Android-changed: Make public, to match the method it's overriding.
293             public Sink<Integer> opWrapSink(int flags, Sink<Double> sink) {
294                 return new Sink.ChainedInt<Double>(sink) {
295                     @Override
296                     public void accept(int t) {
297                         downstream.accept(mapper.applyAsDouble(t));
298                     }
299                 };
300             }
301         };
302     }
303 
304     @Override
305     public final IntStream flatMap(IntFunction<? extends IntStream> mapper) {
306         return new StatelessOp<Integer>(this, StreamShape.INT_VALUE,
307                                         StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
308             @Override
309             // Android-changed: Make public, to match the method it's overriding.
310             public Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
311                 return new Sink.ChainedInt<Integer>(sink) {
312                     @Override
313                     public void begin(long size) {
314                         downstream.begin(-1);
315                     }
316 
317                     @Override
318                     public void accept(int t) {
319                         try (IntStream result = mapper.apply(t)) {
320                             // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
321                             if (result != null)
322                                 result.sequential().forEach(i -> downstream.accept(i));
323                         }
324                     }
325                 };
326             }
327         };
328     }
329 
330     @Override
331     public IntStream unordered() {
332         if (!isOrdered())
333             return this;
334         return new StatelessOp<Integer>(this, StreamShape.INT_VALUE, StreamOpFlag.NOT_ORDERED) {
335             @Override
336             // Android-changed: Make public, to match the method it's overriding.
337             public Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
338                 return sink;
339             }
340         };
341     }
342 
343     @Override
344     public final IntStream filter(IntPredicate predicate) {
345         Objects.requireNonNull(predicate);
346         return new StatelessOp<Integer>(this, StreamShape.INT_VALUE,
347                                         StreamOpFlag.NOT_SIZED) {
348             @Override
349             // Android-changed: Make public, to match the method it's overriding.
350             public Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
351                 return new Sink.ChainedInt<Integer>(sink) {
352                     @Override
353                     public void begin(long size) {
354                         downstream.begin(-1);
355                     }
356 
357                     @Override
358                     public void accept(int t) {
359                         if (predicate.test(t))
360                             downstream.accept(t);
361                     }
362                 };
363             }
364         };
365     }
366 
367     @Override
368     public final IntStream peek(IntConsumer action) {
369         Objects.requireNonNull(action);
370         return new StatelessOp<Integer>(this, StreamShape.INT_VALUE,
371                                         0) {
372             @Override
373             // Android-changed: Make public, to match the method it's overriding.
374             public Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
375                 return new Sink.ChainedInt<Integer>(sink) {
376                     @Override
377                     public void accept(int t) {
378                         action.accept(t);
379                         downstream.accept(t);
380                     }
381                 };
382             }
383         };
384     }
385 
386     // Stateful intermediate ops from IntStream
387 
388     @Override
389     public final IntStream limit(long maxSize) {
390         if (maxSize < 0)
391             throw new IllegalArgumentException(Long.toString(maxSize));
392         return SliceOps.makeInt(this, 0, maxSize);
393     }
394 
395     @Override
396     public final IntStream skip(long n) {
397         if (n < 0)
398             throw new IllegalArgumentException(Long.toString(n));
399         if (n == 0)
400             return this;
401         else
402             return SliceOps.makeInt(this, n, -1);
403     }
404 
405     @Override
406     public final IntStream sorted() {
407         return SortedOps.makeInt(this);
408     }
409 
410     @Override
411     public final IntStream distinct() {
412         // While functional and quick to implement, this approach is not very efficient.
413         // An efficient version requires an int-specific map/set implementation.
414         return boxed().distinct().mapToInt(i -> i);
415     }
416 
417     // Terminal ops from IntStream
418 
419     @Override
420     public void forEach(IntConsumer action) {
421         evaluate(ForEachOps.makeInt(action, false));
422     }
423 
424     @Override
425     public void forEachOrdered(IntConsumer action) {
426         evaluate(ForEachOps.makeInt(action, true));
427     }
428 
429     @Override
430     public final int sum() {
431         return reduce(0, Integer::sum);
432     }
433 
434     @Override
435     public final OptionalInt min() {
436         return reduce(Math::min);
437     }
438 
439     @Override
440     public final OptionalInt max() {
441         return reduce(Math::max);
442     }
443 
444     @Override
445     public final long count() {
446         return mapToLong(e -> 1L).sum();
447     }
448 
449     @Override
450     public final OptionalDouble average() {
451         long[] avg = collect(() -> new long[2],
452                              (ll, i) -> {
453                                  ll[0]++;
454                                  ll[1] += i;
455                              },
456                              (ll, rr) -> {
457                                  ll[0] += rr[0];
458                                  ll[1] += rr[1];
459                              });
460         return avg[0] > 0
461                ? OptionalDouble.of((double) avg[1] / avg[0])
462                : OptionalDouble.empty();
463     }
464 
465     @Override
466     public final IntSummaryStatistics summaryStatistics() {
467         return collect(IntSummaryStatistics::new, IntSummaryStatistics::accept,
468                        IntSummaryStatistics::combine);
469     }
470 
471     @Override
472     public final int reduce(int identity, IntBinaryOperator op) {
473         return evaluate(ReduceOps.makeInt(identity, op));
474     }
475 
476     @Override
477     public final OptionalInt reduce(IntBinaryOperator op) {
478         return evaluate(ReduceOps.makeInt(op));
479     }
480 
481     @Override
482     public final <R> R collect(Supplier<R> supplier,
483                                ObjIntConsumer<R> accumulator,
484                                BiConsumer<R, R> combiner) {
485         BinaryOperator<R> operator = (left, right) -> {
486             combiner.accept(left, right);
487             return left;
488         };
489         return evaluate(ReduceOps.makeInt(supplier, accumulator, operator));
490     }
491 
492     @Override
493     public final boolean anyMatch(IntPredicate predicate) {
494         return evaluate(MatchOps.makeInt(predicate, MatchOps.MatchKind.ANY));
495     }
496 
497     @Override
498     public final boolean allMatch(IntPredicate predicate) {
499         return evaluate(MatchOps.makeInt(predicate, MatchOps.MatchKind.ALL));
500     }
501 
502     @Override
503     public final boolean noneMatch(IntPredicate predicate) {
504         return evaluate(MatchOps.makeInt(predicate, MatchOps.MatchKind.NONE));
505     }
506 
507     @Override
508     public final OptionalInt findFirst() {
509         return evaluate(FindOps.makeInt(true));
510     }
511 
512     @Override
513     public final OptionalInt findAny() {
514         return evaluate(FindOps.makeInt(false));
515     }
516 
517     @Override
518     public final int[] toArray() {
519         return Nodes.flattenInt((Node.OfInt) evaluateToArrayNode(Integer[]::new))
520                         .asPrimitiveArray();
521     }
522 
523     //
524 
525     /**
526      * Source stage of an IntStream.
527      *
528      * @param <E_IN> type of elements in the upstream source
529      * @since 1.8
530      * @hide Visible for CTS testing only (OpenJDK8 tests).
531      */
532     // Android-changed: Made public for CTS tests only.
533     public static class Head<E_IN> extends IntPipeline<E_IN> {
534         /**
535          * Constructor for the source stage of an IntStream.
536          *
537          * @param source {@code Supplier<Spliterator>} describing the stream
538          *               source
539          * @param sourceFlags the source flags for the stream source, described
540          *                    in {@link StreamOpFlag}
541          * @param parallel {@code true} if the pipeline is parallel
542          */
543         // Android-changed: Made public for CTS tests only.
544         public Head(Supplier<? extends Spliterator<Integer>> source,
545              int sourceFlags, boolean parallel) {
546             super(source, sourceFlags, parallel);
547         }
548 
549         /**
550          * Constructor for the source stage of an IntStream.
551          *
552          * @param source {@code Spliterator} describing the stream source
553          * @param sourceFlags the source flags for the stream source, described
554          *                    in {@link StreamOpFlag}
555          * @param parallel {@code true} if the pipeline is parallel
556          */
557         // Android-changed: Made public for CTS tests only.
558         public Head(Spliterator<Integer> source,
559              int sourceFlags, boolean parallel) {
560             super(source, sourceFlags, parallel);
561         }
562 
563         @Override
564         // Android-changed: Make public, to match the method it's overriding.
565         public final boolean opIsStateful() {
566             throw new UnsupportedOperationException();
567         }
568 
569         @Override
570         // Android-changed: Make public, to match the method it's overriding.
571         public final Sink<E_IN> opWrapSink(int flags, Sink<Integer> sink) {
572             throw new UnsupportedOperationException();
573         }
574 
575         // Optimized sequential terminal operations for the head of the pipeline
576 
577         @Override
578         public void forEach(IntConsumer action) {
579             if (!isParallel()) {
580                 adapt(sourceStageSpliterator()).forEachRemaining(action);
581             }
582             else {
583                 super.forEach(action);
584             }
585         }
586 
587         @Override
588         public void forEachOrdered(IntConsumer action) {
589             if (!isParallel()) {
590                 adapt(sourceStageSpliterator()).forEachRemaining(action);
591             }
592             else {
593                 super.forEachOrdered(action);
594             }
595         }
596     }
597 
598     /**
599      * Base class for a stateless intermediate stage of an IntStream
600      *
601      * @param <E_IN> type of elements in the upstream source
602      * @since 1.8
603      * @hide Visible for CTS testing only (OpenJDK8 tests).
604      */
605     // Android-changed: Made public for CTS tests only.
606     public abstract static class StatelessOp<E_IN> extends IntPipeline<E_IN> {
607         /**
608          * Construct a new IntStream by appending a stateless intermediate
609          * operation to an existing stream.
610          * @param upstream The upstream pipeline stage
611          * @param inputShape The stream shape for the upstream pipeline stage
612          * @param opFlags Operation flags for the new stage
613          */
614         // Android-changed: Made public for CTS tests only.
615         public StatelessOp(AbstractPipeline<?, E_IN, ?> upstream,
616                     StreamShape inputShape,
617                     int opFlags) {
618             super(upstream, opFlags);
619             assert upstream.getOutputShape() == inputShape;
620         }
621 
622         @Override
623         // Android-changed: Make public, to match the method it's overriding.
624         public final boolean opIsStateful() {
625             return false;
626         }
627     }
628 
629     /**
630      * Base class for a stateful intermediate stage of an IntStream.
631      *
632      * @param <E_IN> type of elements in the upstream source
633      * @since 1.8
634      * @hide Visible for CTS testing only (OpenJDK8 tests).
635      */
636     // Android-changed: Made public for CTS tests only.
637     public abstract static class StatefulOp<E_IN> extends IntPipeline<E_IN> {
638         /**
639          * Construct a new IntStream by appending a stateful intermediate
640          * operation to an existing stream.
641          * @param upstream The upstream pipeline stage
642          * @param inputShape The stream shape for the upstream pipeline stage
643          * @param opFlags Operation flags for the new stage
644          */
645         // Android-changed: Made public for CTS tests only.
646         public StatefulOp(AbstractPipeline<?, E_IN, ?> upstream,
647                    StreamShape inputShape,
648                    int opFlags) {
649             super(upstream, opFlags);
650             assert upstream.getOutputShape() == inputShape;
651         }
652 
653         @Override
654         // Android-changed: Make public, to match the method it's overriding.
655         public final boolean opIsStateful() {
656             return true;
657         }
658 
659         @Override
660         // Android-changed: Make public, to match the method it's overriding.
661         public abstract <P_IN> Node<Integer> opEvaluateParallel(PipelineHelper<Integer> helper,
662                                                          Spliterator<P_IN> spliterator,
663                                                          IntFunction<Integer[]> generator);
664     }
665 }
666