/*
 * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation. Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */
package java.util.stream;

import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Objects;
import java.util.Set;
import java.util.Spliterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.IntFunction;

/**
 * Factory methods for transforming streams into duplicate-free streams, using
 * {@link Object#equals(Object)} to determine equality.
 *
 * @since 1.8
 */
final class DistinctOps {

    /** Not instantiable: this class only hosts static factory methods. */
    private DistinctOps() { }

    /**
     * Appends a "distinct" operation to the provided stream, and returns the
     * new stream.
     *
     * <p>The returned stage is a stateful operation that sets the
     * {@code IS_DISTINCT} and {@code NOT_SIZED} op flags. Each evaluation
     * entry point first checks whether the pipeline is already known to be
     * DISTINCT, in which case it passes the elements through untouched.
     *
     * @param <T> the type of both input and output elements
     * @param upstream a reference stream with element type T
     * @return the new stream
     */
    static <T> ReferencePipeline<T, T> makeRef(AbstractPipeline<?, T, ?> upstream) {
        return new ReferencePipeline.StatefulOp<T, T>(upstream, StreamShape.REFERENCE,
                                                      StreamOpFlag.IS_DISTINCT | StreamOpFlag.NOT_SIZED) {

            /**
             * Evaluates the pipeline in parallel, accumulating every element
             * into a {@link LinkedHashSet}, and wraps the resulting set in a
             * {@code Node}.
             *
             * @param <P_IN> the element type of the source spliterator
             * @param helper the pipeline helper describing the upstream stages
             * @param spliterator the source spliterator
             * @return a node over the distinct elements
             */
            <P_IN> Node<T> reduce(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) {
                // If the stream is SORTED then it should also be ORDERED so the following will also
                // preserve the sort order
                TerminalOp<T, LinkedHashSet<T>> reduceOp
                        = ReduceOps.<T, LinkedHashSet<T>>makeRef(LinkedHashSet::new, LinkedHashSet::add,
                                                                 LinkedHashSet::addAll);
                return Nodes.node(reduceOp.evaluateParallel(helper, spliterator));
            }

            /**
             * Eager parallel evaluation. Chooses between three strategies
             * based on the combined stream-and-op flags:
             * already DISTINCT (plain evaluation), ORDERED (order-retaining
             * reduction via {@link #reduce}), or neither (concurrent
             * accumulation into a shared map).
             */
            @Override
            public <P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper,
                                                     Spliterator<P_IN> spliterator,
                                                     IntFunction<T[]> generator) {
                if (StreamOpFlag.DISTINCT.isKnown(helper.getStreamAndOpFlags())) {
                    // No-op
                    return helper.evaluate(spliterator, false, generator);
                }
                else if (StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
                    return reduce(helper, spliterator);
                }
                else {
                    // Holder of null state since ConcurrentHashMap does not permit null keys
                    // (the stream elements are used as the map's keys)
                    AtomicBoolean seenNull = new AtomicBoolean(false);
                    ConcurrentHashMap<T, Boolean> map = new ConcurrentHashMap<>();
                    TerminalOp<T, Void> forEachOp = ForEachOps.makeRef(t -> {
                        if (t == null) {
                            seenNull.set(true);
                        } else {
                            map.putIfAbsent(t, Boolean.TRUE);
                        }
                    }, false);
                    forEachOp.evaluateParallel(helper, spliterator);

                    // If null has been seen then copy the key set into a HashSet that supports null values
                    // and add null
                    Set<T> keys = map.keySet();
                    if (seenNull.get()) {
                        // TODO Implement a more efficient set-union view, rather than copying
                        keys = new HashSet<>(keys);
                        keys.add(null);
                    }
                    return Nodes.node(keys);
                }
            }

            /**
             * Lazy parallel evaluation. Mirrors the three-way choice made by
             * {@link #opEvaluateParallel}; only the unordered case is
             * actually lazy, the ORDERED case performs the full reduction
             * up front and returns a spliterator over its result.
             */
            @Override
            public <P_IN> Spliterator<T> opEvaluateParallelLazy(PipelineHelper<T> helper,
                                                                Spliterator<P_IN> spliterator) {
                if (StreamOpFlag.DISTINCT.isKnown(helper.getStreamAndOpFlags())) {
                    // No-op
                    return helper.wrapSpliterator(spliterator);
                }
                else if (StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
                    // Not lazy, barrier required to preserve order
                    return reduce(helper, spliterator).spliterator();
                }
                else {
                    // Lazy
                    return new StreamSpliterators.DistinctSpliterator<>(helper.wrapSpliterator(spliterator));
                }
            }

            /**
             * Wraps the downstream sink with one that drops duplicates.
             *
             * <p>If the flags show the input is already DISTINCT the sink is
             * returned unwrapped. If the input is SORTED, each element is
             * compared only with the previously forwarded one. Otherwise
             * every forwarded element is remembered in a {@link HashSet}.
             *
             * @param flags the combined stream-and-op flags up to this stage
             * @param sink the downstream sink; must not be null
             * @return a sink that forwards only distinct elements
             * @throws NullPointerException if {@code sink} is null
             */
            @Override
            public Sink<T> opWrapSink(int flags, Sink<T> sink) {
                Objects.requireNonNull(sink);

                if (StreamOpFlag.DISTINCT.isKnown(flags)) {
                    return sink;
                } else if (StreamOpFlag.SORTED.isKnown(flags)) {
                    return new Sink.ChainedReference<T, T>(sink) {
                        // lastSeen == null doubles as the "nothing forwarded yet" state,
                        // so a separate flag records whether a null element was forwarded.
                        boolean seenNull;
                        T lastSeen;

                        @Override
                        public void begin(long size) {
                            seenNull = false;
                            lastSeen = null;
                            // The number of distinct elements is not known in advance
                            downstream.begin(-1);
                        }

                        @Override
                        public void end() {
                            seenNull = false;
                            lastSeen = null;
                            downstream.end();
                        }

                        @Override
                        public void accept(T t) {
                            if (t == null) {
                                // Forward null at most once
                                if (!seenNull) {
                                    seenNull = true;
                                    downstream.accept(lastSeen = null);
                                }
                            } else if (lastSeen == null || !t.equals(lastSeen)) {
                                // Differs from the previously forwarded element
                                downstream.accept(lastSeen = t);
                            }
                        }
                    };
                } else {
                    return new Sink.ChainedReference<T, T>(sink) {
                        // Elements already forwarded downstream; allocated per begin/end cycle
                        Set<T> seen;

                        @Override
                        public void begin(long size) {
                            seen = new HashSet<>();
                            // The number of distinct elements is not known in advance
                            downstream.begin(-1);
                        }

                        @Override
                        public void end() {
                            seen = null;
                            downstream.end();
                        }

                        @Override
                        public void accept(T t) {
                            // Set.add reports whether the element was newly inserted, so a
                            // single lookup both records the element and detects duplicates.
                            if (seen.add(t)) {
                                downstream.accept(t);
                            }
                        }
                    };
                }
            }
        };
    }
}