1 /* 2 * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 package java.util.stream; 26 27 import java.util.Spliterator; 28 import java.util.concurrent.atomic.AtomicReference; 29 30 /** 31 * Abstract class for fork-join tasks used to implement short-circuiting 32 * stream ops, which can produce a result without processing all elements of the 33 * stream. 34 * 35 * @param <P_IN> type of input elements to the pipeline 36 * @param <P_OUT> type of output elements from the pipeline 37 * @param <R> type of intermediate result, may be different from operation 38 * result type 39 * @param <K> type of child and sibling tasks 40 * @since 1.8 41 */ 42 @SuppressWarnings("serial") 43 abstract class AbstractShortCircuitTask<P_IN, P_OUT, R, 44 K extends AbstractShortCircuitTask<P_IN, P_OUT, R, K>> 45 extends AbstractTask<P_IN, P_OUT, R, K> { 46 /** 47 * The result for this computation; this is shared among all tasks and set 48 * exactly once 49 */ 50 protected final AtomicReference<R> sharedResult; 51 52 /** 53 * Indicates whether this task has been canceled. Tasks may cancel other 54 * tasks in the computation under various conditions, such as in a 55 * find-first operation, a task that finds a value will cancel all tasks 56 * that are later in the encounter order. 57 */ 58 protected volatile boolean canceled; 59 60 /** 61 * Constructor for root tasks. 62 * 63 * @param helper the {@code PipelineHelper} describing the stream pipeline 64 * up to this operation 65 * @param spliterator the {@code Spliterator} describing the source for this 66 * pipeline 67 */ AbstractShortCircuitTask(PipelineHelper<P_OUT> helper, Spliterator<P_IN> spliterator)68 protected AbstractShortCircuitTask(PipelineHelper<P_OUT> helper, 69 Spliterator<P_IN> spliterator) { 70 super(helper, spliterator); 71 sharedResult = new AtomicReference<>(null); 72 } 73 74 /** 75 * Constructor for non-root nodes. 76 * 77 * @param parent parent task in the computation tree 78 * @param spliterator the {@code Spliterator} for the portion of the 79 * computation tree described by this task 80 */ AbstractShortCircuitTask(K parent, Spliterator<P_IN> spliterator)81 protected AbstractShortCircuitTask(K parent, 82 Spliterator<P_IN> spliterator) { 83 super(parent, spliterator); 84 sharedResult = parent.sharedResult; 85 } 86 87 /** 88 * Returns the value indicating the computation completed with no task 89 * finding a short-circuitable result. For example, for a "find" operation, 90 * this might be null or an empty {@code Optional}. 91 * 92 * @return the result to return when no task finds a result 93 */ getEmptyResult()94 protected abstract R getEmptyResult(); 95 96 /** 97 * Overrides AbstractTask version to include checks for early 98 * exits while splitting or computing. 99 */ 100 @Override compute()101 public void compute() { 102 Spliterator<P_IN> rs = spliterator, ls; 103 long sizeEstimate = rs.estimateSize(); 104 long sizeThreshold = getTargetSize(sizeEstimate); 105 boolean forkRight = false; 106 @SuppressWarnings("unchecked") K task = (K) this; 107 AtomicReference<R> sr = sharedResult; 108 R result; 109 while ((result = sr.get()) == null) { 110 if (task.taskCanceled()) { 111 result = task.getEmptyResult(); 112 break; 113 } 114 if (sizeEstimate <= sizeThreshold || (ls = rs.trySplit()) == null) { 115 result = task.doLeaf(); 116 break; 117 } 118 K leftChild, rightChild, taskToFork; 119 task.leftChild = leftChild = task.makeChild(ls); 120 task.rightChild = rightChild = task.makeChild(rs); 121 task.setPendingCount(1); 122 if (forkRight) { 123 forkRight = false; 124 rs = ls; 125 task = leftChild; 126 taskToFork = rightChild; 127 } 128 else { 129 forkRight = true; 130 task = rightChild; 131 taskToFork = leftChild; 132 } 133 taskToFork.fork(); 134 sizeEstimate = rs.estimateSize(); 135 } 136 task.setLocalResult(result); 137 task.tryComplete(); 138 } 139 140 141 /** 142 * Declares that a globally valid result has been found. If another task has 143 * not already found the answer, the result is installed in 144 * {@code sharedResult}. The {@code compute()} method will check 145 * {@code sharedResult} before proceeding with computation, so this causes 146 * the computation to terminate early. 147 * 148 * @param result the result found 149 */ shortCircuit(R result)150 protected void shortCircuit(R result) { 151 if (result != null) 152 sharedResult.compareAndSet(null, result); 153 } 154 155 /** 156 * Sets a local result for this task. If this task is the root, set the 157 * shared result instead (if not already set). 158 * 159 * @param localResult The result to set for this task 160 */ 161 @Override setLocalResult(R localResult)162 protected void setLocalResult(R localResult) { 163 if (isRoot()) { 164 if (localResult != null) 165 sharedResult.compareAndSet(null, localResult); 166 } 167 else 168 super.setLocalResult(localResult); 169 } 170 171 /** 172 * Retrieves the local result for this task 173 */ 174 @Override getRawResult()175 public R getRawResult() { 176 return getLocalResult(); 177 } 178 179 /** 180 * Retrieves the local result for this task. If this task is the root, 181 * retrieves the shared result instead. 182 */ 183 @Override getLocalResult()184 public R getLocalResult() { 185 if (isRoot()) { 186 R answer = sharedResult.get(); 187 return (answer == null) ? getEmptyResult() : answer; 188 } 189 else 190 return super.getLocalResult(); 191 } 192 193 /** 194 * Mark this task as canceled 195 */ cancel()196 protected void cancel() { 197 canceled = true; 198 } 199 200 /** 201 * Queries whether this task is canceled. A task is considered canceled if 202 * it or any of its parents have been canceled. 203 * 204 * @return {@code true} if this task or any parent is canceled. 205 */ taskCanceled()206 protected boolean taskCanceled() { 207 boolean cancel = canceled; 208 if (!cancel) { 209 for (K parent = getParent(); !cancel && parent != null; parent = parent.getParent()) 210 cancel = parent.canceled; 211 } 212 213 return cancel; 214 } 215 216 /** 217 * Cancels all tasks which succeed this one in the encounter order. This 218 * includes canceling all the current task's right sibling, as well as the 219 * later right siblings of all its parents. 220 */ cancelLaterNodes()221 protected void cancelLaterNodes() { 222 // Go up the tree, cancel right siblings of this node and all parents 223 for (@SuppressWarnings("unchecked") K parent = getParent(), node = (K) this; 224 parent != null; 225 node = parent, parent = parent.getParent()) { 226 // If node is a left child of parent, then has a right sibling 227 if (parent.leftChild == node) { 228 K rightSibling = parent.rightChild; 229 if (!rightSibling.canceled) 230 rightSibling.cancel(); 231 } 232 } 233 } 234 } 235