• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one or more
3  * contributor license agreements.  See the NOTICE file distributed with
4  * this work for additional information regarding copyright ownership.
5  * The ASF licenses this file to You under the Apache License, Version 2.0
6  * (the "License"); you may not use this file except in compliance with
7  * the License.  You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 package org.apache.commons.lang3;
18 
19 import java.util.ArrayList;
20 import java.util.Collection;
21 import java.util.Collections;
22 import java.util.List;
23 import java.util.Set;
24 import java.util.function.BiConsumer;
25 import java.util.function.BinaryOperator;
26 import java.util.function.Consumer;
27 import java.util.function.Function;
28 import java.util.function.Predicate;
29 import java.util.function.Supplier;
30 import java.util.stream.Collector;
31 import java.util.stream.Collectors;
32 import java.util.stream.Stream;
33 
34 import org.apache.commons.lang3.Functions.FailableConsumer;
35 import org.apache.commons.lang3.Functions.FailableFunction;
36 import org.apache.commons.lang3.Functions.FailablePredicate;
37 
38 /**
39  * Provides utility functions, and classes for working with the
40  * {@code java.util.stream} package, or more generally, with Java 8 lambdas. More
41  * specifically, it attempts to address the fact that lambdas are supposed
42  * not to throw Exceptions, at least not checked Exceptions, AKA instances
43  * of {@link Exception}. This enforces the use of constructs like
44  * <pre>
45  *     Consumer&lt;java.lang.reflect.Method&gt; consumer = m -&gt; {
46  *         try {
47  *             m.invoke(o, args);
48  *         } catch (Throwable t) {
49  *             throw Functions.rethrow(t);
50  *         }
51  *    };
52  *    stream.forEach(consumer);
53  * </pre>
54  * Using a {@link FailableStream}, this can be rewritten as follows:
55  * <pre>
56  *     Streams.failable(stream).forEach((m) -&gt; m.invoke(o, args));
57  * </pre>
58  * Obviously, the second version is much more concise and the spirit of
59  * Lambda expressions is met better than in the first version.
60  *
61  * @see Stream
62  * @see Functions
63  * @since 3.10
64  * @deprecated Use {@link org.apache.commons.lang3.stream.Streams}.
65  */
66 @Deprecated
67 public class Streams {
68 
69     /**
70      * A reduced, and simplified version of a {@link Stream} with
71      * failable method signatures.
72      * @param <O> The streams element type.
73      * @deprecated Use {@link org.apache.commons.lang3.stream.Streams.FailableStream}.
74      */
75     @Deprecated
76     public static class FailableStream<O> {
77 
78         private Stream<O> stream;
79         private boolean terminated;
80 
81         /**
82          * Constructs a new instance with the given {@code stream}.
83          * @param stream The stream.
84          */
FailableStream(final Stream<O> stream)85         public FailableStream(final Stream<O> stream) {
86             this.stream = stream;
87         }
88 
89         /**
90          * Throws IllegalStateException if this stream is already terminated.
91          *
92          * @throws IllegalStateException if this stream is already terminated.
93          */
assertNotTerminated()94         protected void assertNotTerminated() {
95             if (terminated) {
96                 throw new IllegalStateException("This stream is already terminated.");
97             }
98         }
99 
100         /**
101          * Marks this stream as terminated.
102          *
103          * @throws IllegalStateException if this stream is already terminated.
104          */
makeTerminated()105         protected void makeTerminated() {
106             assertNotTerminated();
107             terminated = true;
108         }
109 
110         /**
111          * Returns a FailableStream consisting of the elements of this stream that match
112          * the given FailablePredicate.
113          *
114          * <p>
115          * This is an intermediate operation.
116          * </p>
117          *
118          * @param predicate a non-interfering, stateless predicate to apply to each
119          * element to determine if it should be included.
120          * @return the new stream
121          */
filter(final FailablePredicate<O, ?> predicate)122         public FailableStream<O> filter(final FailablePredicate<O, ?> predicate){
123             assertNotTerminated();
124             stream = stream.filter(Functions.asPredicate(predicate));
125             return this;
126         }
127 
128         /**
129          * Performs an action for each element of this stream.
130          *
131          * <p>
132          * This is an intermediate operation.
133          * </p>
134          *
135          * <p>
136          * The behavior of this operation is explicitly nondeterministic.
137          * For parallel stream pipelines, this operation does <em>not</em>
138          * guarantee to respect the encounter order of the stream, as doing so
139          * would sacrifice the benefit of parallelism.  For any given element, the
140          * action may be performed at whatever time and in whatever thread the
141          * library chooses.  If the action accesses shared state, it is
142          * responsible for providing the required synchronization.
143          * </p>
144          *
145          * @param action a non-interfering action to perform on the elements
146          */
forEach(final FailableConsumer<O, ?> action)147         public void forEach(final FailableConsumer<O, ?> action) {
148             makeTerminated();
149             stream().forEach(Functions.asConsumer(action));
150         }
151 
152         /**
153          * Performs a mutable reduction operation on the elements of this stream using a
154          * {@link Collector}.  A {@link Collector}
155          * encapsulates the functions used as arguments to
156          * {@link #collect(Supplier, BiConsumer, BiConsumer)}, allowing for reuse of
157          * collection strategies and composition of collect operations such as
158          * multiple-level grouping or partitioning.
159          *
160          * <p>
161          * If the underlying stream is parallel, and the {@link Collector}
162          * is concurrent, and either the stream is unordered or the collector is
163          * unordered, then a concurrent reduction will be performed
164          * (see {@link Collector} for details on concurrent reduction.)
165          * </p>
166          *
167          * <p>
168          * This is an intermediate operation.
169          * </p>
170          *
171          * <p>
172          * When executed in parallel, multiple intermediate results may be
173          * instantiated, populated, and merged so as to maintain isolation of
174          * mutable data structures.  Therefore, even when executed in parallel
175          * with non-thread-safe data structures (such as {@link ArrayList}), no
176          * additional synchronization is needed for a parallel reduction.
177          * </p>
178          * <p>
179          * Note
180          * The following will accumulate strings into an ArrayList:
181          * </p>
182          * <pre>{@code
183          *     List<String> asList = stringStream.collect(Collectors.toList());
184          * }</pre>
185          *
186          * <p>
187          * The following will classify {@code Person} objects by city:
188          * </p>
189          * <pre>{@code
190          *     Map<String, List<Person>> peopleByCity
191          *         = personStream.collect(Collectors.groupingBy(Person::getCity));
192          * }</pre>
193          *
194          * <p>
195          * The following will classify {@code Person} objects by state and city,
196          * cascading two {@link Collector}s together:
197          * </p>
198          * <pre>{@code
199          *     Map<String, Map<String, List<Person>>> peopleByStateAndCity
200          *         = personStream.collect(Collectors.groupingBy(Person::getState,
201          *                                                      Collectors.groupingBy(Person::getCity)));
202          * }</pre>
203          *
204          * @param <R> the type of the result
205          * @param <A> the intermediate accumulation type of the {@link Collector}
206          * @param collector the {@link Collector} describing the reduction
207          * @return the result of the reduction
208          * @see #collect(Supplier, BiConsumer, BiConsumer)
209          * @see Collectors
210          */
collect(final Collector<? super O, A, R> collector)211         public <A, R> R collect(final Collector<? super O, A, R> collector) {
212             makeTerminated();
213             return stream().collect(collector);
214         }
215 
216         /**
217          * Performs a mutable reduction operation on the elements of this FailableStream.
218          * A mutable reduction is one in which the reduced value is a mutable result
219          * container, such as an {@link ArrayList}, and elements are incorporated by updating
220          * the state of the result rather than by replacing the result. This produces a result equivalent to:
221          * <pre>{@code
222          *     R result = supplier.get();
223          *     for (T element : this stream)
224          *         accumulator.accept(result, element);
225          *     return result;
226          * }</pre>
227          *
228          * <p>
229          * Like {@link #reduce(Object, BinaryOperator)}, {@code collect} operations
230          * can be parallelized without requiring additional synchronization.
231          * </p>
232          *
233          * <p>
234          * This is an intermediate operation.
235          * </p>
236          *
237          * <p>
238          * Note There are many existing classes in the JDK whose signatures are
239          * well-suited for use with method references as arguments to {@code collect()}.
240          * For example, the following will accumulate strings into an {@link ArrayList}:
241          * </p>
242          * <pre>{@code
243          *     List<String> asList = stringStream.collect(ArrayList::new, ArrayList::add,
244          *                                                ArrayList::addAll);
245          * }</pre>
246          *
247          * <p>
248          * The following will take a stream of strings and concatenates them into a
249          * single string:
250          * </p>
251          * <pre>{@code
252          *     String concat = stringStream.collect(StringBuilder::new, StringBuilder::append,
253          *                                          StringBuilder::append)
254          *                                 .toString();
255          * }</pre>
256          *
257          * @param <R> type of the result
258          * @param <A> Type of the accumulator.
259          * @param supplier a function that creates a new result container. For a
260          *                 parallel execution, this function may be called
261          *                 multiple times and must return a fresh value each time.
262          * @param accumulator An associative, non-interfering, stateless function for
263          *   incorporating an additional element into a result
264          * @param combiner An associative, non-interfering, stateless
265          *   function for combining two values, which must be compatible with the
266          *   accumulator function
267          * @return The result of the reduction
268          */
collect(final Supplier<R> supplier, final BiConsumer<R, ? super O> accumulator, final BiConsumer<R, R> combiner)269         public <A, R> R collect(final Supplier<R> supplier, final BiConsumer<R, ? super O> accumulator, final BiConsumer<R, R> combiner) {
270             makeTerminated();
271             return stream().collect(supplier, accumulator, combiner);
272         }
273 
274         /**
275          * Performs a reduction on the elements of this stream, using the provided
276          * identity value and an associative accumulation function, and returns
277          * the reduced value.  This is equivalent to:
278          * <pre>{@code
279          *     T result = identity;
280          *     for (T element : this stream)
281          *         result = accumulator.apply(result, element)
282          *     return result;
283          * }</pre>
284          *
285          * but is not constrained to execute sequentially.
286          *
287          * <p>
288          * The {@code identity} value must be an identity for the accumulator
289          * function. This means that for all {@code t},
290          * {@code accumulator.apply(identity, t)} is equal to {@code t}.
291          * The {@code accumulator} function must be an associative function.
292          * </p>
293          *
294          * <p>
295          * This is an intermediate operation.
296          * </p>
297          *
298          * Note Sum, min, max, average, and string concatenation are all special
299          * cases of reduction. Summing a stream of numbers can be expressed as:
300          *
301          * <pre>{@code
302          *     Integer sum = integers.reduce(0, (a, b) -> a+b);
303          * }</pre>
304          *
305          * or:
306          *
307          * <pre>{@code
308          *     Integer sum = integers.reduce(0, Integer::sum);
309          * }</pre>
310          *
311          * <p>
312          * While this may seem a more roundabout way to perform an aggregation
313          * compared to simply mutating a running total in a loop, reduction
314          * operations parallelize more gracefully, without needing additional
315          * synchronization and with greatly reduced risk of data races.
316          * </p>
317          *
318          * @param identity the identity value for the accumulating function
319          * @param accumulator an associative, non-interfering, stateless
320          *                    function for combining two values
321          * @return the result of the reduction
322          */
reduce(final O identity, final BinaryOperator<O> accumulator)323         public O reduce(final O identity, final BinaryOperator<O> accumulator) {
324             makeTerminated();
325             return stream().reduce(identity, accumulator);
326         }
327 
328         /**
329          * Returns a stream consisting of the results of applying the given
330          * function to the elements of this stream.
331          *
332          * <p>
333          * This is an intermediate operation.
334          * </p>
335          *
336          * @param <R> The element type of the new stream
337          * @param mapper A non-interfering, stateless function to apply to each element
338          * @return the new stream
339          */
map(final FailableFunction<O, R, ?> mapper)340         public <R> FailableStream<R> map(final FailableFunction<O, R, ?> mapper) {
341             assertNotTerminated();
342             return new FailableStream<>(stream.map(Functions.asFunction(mapper)));
343         }
344 
345         /**
346          * Converts the FailableStream into an equivalent stream.
347          * @return A stream, which will return the same elements, which this FailableStream would return.
348          */
stream()349         public Stream<O> stream() {
350             return stream;
351         }
352 
353         /**
354          * Returns whether all elements of this stream match the provided predicate.
355          * May not evaluate the predicate on all elements if not necessary for
356          * determining the result.  If the stream is empty then {@code true} is
357          * returned and the predicate is not evaluated.
358          *
359          * <p>
360          * This is a short-circuiting terminal operation.
361          * </p>
362          *
363          * <p>
364          * Note
365          * This method evaluates the <em>universal quantification</em> of the
366          * predicate over the elements of the stream (for all x P(x)).  If the
367          * stream is empty, the quantification is said to be <em>vacuously
368          * satisfied</em> and is always {@code true} (regardless of P(x)).
369          * </p>
370          *
371          * @param predicate A non-interfering, stateless predicate to apply to
372          * elements of this stream
373          * @return {@code true} If either all elements of the stream match the
374          * provided predicate or the stream is empty, otherwise {@code false}.
375          */
allMatch(final FailablePredicate<O, ?> predicate)376         public boolean allMatch(final FailablePredicate<O, ?> predicate) {
377             assertNotTerminated();
378             return stream().allMatch(Functions.asPredicate(predicate));
379         }
380 
381         /**
382          * Returns whether any elements of this stream match the provided
383          * predicate.  May not evaluate the predicate on all elements if not
384          * necessary for determining the result.  If the stream is empty then
385          * {@code false} is returned and the predicate is not evaluated.
386          *
387          * <p>
388          * This is a short-circuiting terminal operation.
389          * </p>
390          *
391          * Note
392          * This method evaluates the <em>existential quantification</em> of the
393          * predicate over the elements of the stream (for some x P(x)).
394          *
395          * @param predicate A non-interfering, stateless predicate to apply to
396          * elements of this stream
397          * @return {@code true} if any elements of the stream match the provided
398          * predicate, otherwise {@code false}
399          */
anyMatch(final FailablePredicate<O, ?> predicate)400         public boolean anyMatch(final FailablePredicate<O, ?> predicate) {
401             assertNotTerminated();
402             return stream().anyMatch(Functions.asPredicate(predicate));
403         }
404     }
405 
406     /**
407      * Converts the given {@link Stream stream} into a {@link FailableStream}.
408      * This is basically a simplified, reduced version of the {@link Stream}
409      * class, with the same underlying element stream, except that failable
410      * objects, like {@link FailablePredicate}, {@link FailableFunction}, or
411      * {@link FailableConsumer} may be applied, instead of
412      * {@link Predicate}, {@link Function}, or {@link Consumer}. The idea is
413      * to rewrite a code snippet like this:
414      * <pre>
415      *     final List&lt;O&gt; list;
416      *     final Method m;
417      *     final Function&lt;O,String&gt; mapper = (o) -&gt; {
418      *         try {
419      *             return (String) m.invoke(o);
420      *         } catch (Throwable t) {
421      *             throw Functions.rethrow(t);
422      *         }
423      *     };
424      *     final List&lt;String&gt; strList = list.stream()
425      *         .map(mapper).collect(Collectors.toList());
426      *  </pre>
427      *  as follows:
428      *  <pre>
429      *     final List&lt;O&gt; list;
430      *     final Method m;
431      *     final List&lt;String&gt; strList = Functions.stream(list.stream())
432      *         .map((o) -&gt; (String) m.invoke(o)).collect(Collectors.toList());
433      *  </pre>
434      *  While the second version may not be <em>quite</em> as
435      *  efficient (because it depends on the creation of additional,
436      *  intermediate objects, of type FailableStream), it is much more
437      *  concise, and readable, and meets the spirit of Lambdas better
438      *  than the first version.
439      * @param <O> The streams element type.
440      * @param stream The stream, which is being converted.
441      * @return The {@link FailableStream}, which has been created by
442      *   converting the stream.
443      */
stream(final Stream<O> stream)444     public static <O> FailableStream<O> stream(final Stream<O> stream) {
445         return new FailableStream<>(stream);
446     }
447 
448     /**
449      * Converts the given {@link Collection} into a {@link FailableStream}.
450      * This is basically a simplified, reduced version of the {@link Stream}
451      * class, with the same underlying element stream, except that failable
452      * objects, like {@link FailablePredicate}, {@link FailableFunction}, or
453      * {@link FailableConsumer} may be applied, instead of
454      * {@link Predicate}, {@link Function}, or {@link Consumer}. The idea is
455      * to rewrite a code snippet like this:
456      * <pre>
457      *     final List&lt;O&gt; list;
458      *     final Method m;
459      *     final Function&lt;O,String&gt; mapper = (o) -&gt; {
460      *         try {
461      *             return (String) m.invoke(o);
462      *         } catch (Throwable t) {
463      *             throw Functions.rethrow(t);
464      *         }
465      *     };
466      *     final List&lt;String&gt; strList = list.stream()
467      *         .map(mapper).collect(Collectors.toList());
468      *  </pre>
469      *  as follows:
470      *  <pre>
471      *     final List&lt;O&gt; list;
472      *     final Method m;
473      *     final List&lt;String&gt; strList = Functions.stream(list.stream())
474      *         .map((o) -&gt; (String) m.invoke(o)).collect(Collectors.toList());
475      *  </pre>
476      *  While the second version may not be <em>quite</em> as
477      *  efficient (because it depends on the creation of additional,
478      *  intermediate objects, of type FailableStream), it is much more
479      *  concise, and readable, and meets the spirit of Lambdas better
480      *  than the first version.
481      * @param <O> The streams element type.
482      * @param stream The stream, which is being converted.
483      * @return The {@link FailableStream}, which has been created by
484      *   converting the stream.
485      */
stream(final Collection<O> stream)486     public static <O> FailableStream<O> stream(final Collection<O> stream) {
487         return stream(stream.stream());
488     }
489 
490     /**
491      * A Collector type for arrays.
492      *
493      * @param <O> The array type.
494      * @deprecated Use {@link org.apache.commons.lang3.stream.Streams.ArrayCollector}.
495      */
496     @Deprecated
497     public static class ArrayCollector<O> implements Collector<O, List<O>, O[]> {
498         private static final Set<Characteristics> characteristics = Collections.emptySet();
499         private final Class<O> elementType;
500 
501         /**
502          * Constructs a new instance for the given element type.
503          *
504          * @param elementType The element type.
505          */
ArrayCollector(final Class<O> elementType)506         public ArrayCollector(final Class<O> elementType) {
507             this.elementType = elementType;
508         }
509 
510         @Override
supplier()511         public Supplier<List<O>> supplier() {
512             return ArrayList::new;
513         }
514 
515         @Override
accumulator()516         public BiConsumer<List<O>, O> accumulator() {
517             return List::add;
518         }
519 
520         @Override
combiner()521         public BinaryOperator<List<O>> combiner() {
522             return (left, right) -> {
523                 left.addAll(right);
524                 return left;
525             };
526         }
527 
528         @Override
finisher()529         public Function<List<O>, O[]> finisher() {
530             return list -> list.toArray(ArrayUtils.newInstance(elementType, list.size()));
531         }
532 
533         @Override
characteristics()534         public Set<Characteristics> characteristics() {
535             return characteristics;
536         }
537     }
538 
539     /**
540      * Returns a {@link Collector} that accumulates the input elements into a
541      * new array.
542      *
543      * @param pElementType Type of an element in the array.
544      * @param <O> the type of the input elements
545      * @return a {@link Collector} which collects all the input elements into an
546      * array, in encounter order
547      */
toArray(final Class<O> pElementType)548     public static <O> Collector<O, ?, O[]> toArray(final Class<O> pElementType) {
549         return new ArrayCollector<>(pElementType);
550     }
551 }
552