1 /* 2 * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 package java.util.stream; 26 27 import java.util.Spliterator; 28 import java.util.concurrent.CountedCompleter; 29 import java.util.function.IntFunction; 30 31 /** 32 * Factory for instances of a short-circuiting stateful intermediate operations 33 * that produce subsequences of their input stream. 34 * 35 * @since 1.8 36 */ 37 final class SliceOps { 38 39 // No instances SliceOps()40 private SliceOps() { } 41 42 /** 43 * Calculates the sliced size given the current size, number of elements 44 * skip, and the number of elements to limit. 45 * 46 * @param size the current size 47 * @param skip the number of elements to skip, assumed to be >= 0 48 * @param limit the number of elements to limit, assumed to be >= 0, with 49 * a value of {@code Long.MAX_VALUE} if there is no limit 50 * @return the sliced size 51 */ calcSize(long size, long skip, long limit)52 private static long calcSize(long size, long skip, long limit) { 53 return size >= 0 ? Math.max(-1, Math.min(size - skip, limit)) : -1; 54 } 55 56 /** 57 * Calculates the slice fence, which is one past the index of the slice 58 * range 59 * @param skip the number of elements to skip, assumed to be >= 0 60 * @param limit the number of elements to limit, assumed to be >= 0, with 61 * a value of {@code Long.MAX_VALUE} if there is no limit 62 * @return the slice fence. 63 */ calcSliceFence(long skip, long limit)64 private static long calcSliceFence(long skip, long limit) { 65 long sliceFence = limit >= 0 ? skip + limit : Long.MAX_VALUE; 66 // Check for overflow 67 return (sliceFence >= 0) ? sliceFence : Long.MAX_VALUE; 68 } 69 70 /** 71 * Creates a slice spliterator given a stream shape governing the 72 * spliterator type. Requires that the underlying Spliterator 73 * be SUBSIZED. 74 */ 75 @SuppressWarnings("unchecked") sliceSpliterator(StreamShape shape, Spliterator<P_IN> s, long skip, long limit)76 private static <P_IN> Spliterator<P_IN> sliceSpliterator(StreamShape shape, 77 Spliterator<P_IN> s, 78 long skip, long limit) { 79 assert s.hasCharacteristics(Spliterator.SUBSIZED); 80 long sliceFence = calcSliceFence(skip, limit); 81 switch (shape) { 82 case REFERENCE: 83 return new StreamSpliterators 84 .SliceSpliterator.OfRef<>(s, skip, sliceFence); 85 case INT_VALUE: 86 return (Spliterator<P_IN>) new StreamSpliterators 87 .SliceSpliterator.OfInt((Spliterator.OfInt) s, skip, sliceFence); 88 case LONG_VALUE: 89 return (Spliterator<P_IN>) new StreamSpliterators 90 .SliceSpliterator.OfLong((Spliterator.OfLong) s, skip, sliceFence); 91 case DOUBLE_VALUE: 92 return (Spliterator<P_IN>) new StreamSpliterators 93 .SliceSpliterator.OfDouble((Spliterator.OfDouble) s, skip, sliceFence); 94 default: 95 throw new IllegalStateException("Unknown shape " + shape); 96 } 97 } 98 99 @SuppressWarnings("unchecked") castingArray()100 private static <T> IntFunction<T[]> castingArray() { 101 return size -> (T[]) new Object[size]; 102 } 103 104 /** 105 * Appends a "slice" operation to the provided stream. The slice operation 106 * may be may be skip-only, limit-only, or skip-and-limit. 107 * 108 * @param <T> the type of both input and output elements 109 * @param upstream a reference stream with element type T 110 * @param skip the number of elements to skip. Must be >= 0. 111 * @param limit the maximum size of the resulting stream, or -1 if no limit 112 * is to be imposed 113 */ makeRef(AbstractPipeline<?, T, ?> upstream, long skip, long limit)114 public static <T> Stream<T> makeRef(AbstractPipeline<?, T, ?> upstream, 115 long skip, long limit) { 116 if (skip < 0) 117 throw new IllegalArgumentException("Skip must be non-negative: " + skip); 118 119 return new ReferencePipeline.StatefulOp<T, T>(upstream, StreamShape.REFERENCE, 120 flags(limit)) { 121 Spliterator<T> unorderedSkipLimitSpliterator(Spliterator<T> s, 122 long skip, long limit, long sizeIfKnown) { 123 if (skip <= sizeIfKnown) { 124 // Use just the limit if the number of elements 125 // to skip is <= the known pipeline size 126 limit = limit >= 0 ? Math.min(limit, sizeIfKnown - skip) : sizeIfKnown - skip; 127 skip = 0; 128 } 129 return new StreamSpliterators.UnorderedSliceSpliterator.OfRef<>(s, skip, limit); 130 } 131 132 @Override 133 public <P_IN> Spliterator<T> opEvaluateParallelLazy(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) { 134 long size = helper.exactOutputSizeIfKnown(spliterator); 135 if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) { 136 return new StreamSpliterators.SliceSpliterator.OfRef<>( 137 helper.wrapSpliterator(spliterator), 138 skip, 139 calcSliceFence(skip, limit)); 140 } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) { 141 return unorderedSkipLimitSpliterator( 142 helper.wrapSpliterator(spliterator), 143 skip, limit, size); 144 } 145 else { 146 // @@@ OOMEs will occur for LongStream.longs().filter(i -> true).limit(n) 147 // regardless of the value of n 148 // Need to adjust the target size of splitting for the 149 // SliceTask from say (size / k) to say min(size / k, 1 << 14) 150 // This will limit the size of the buffers created at the leaf nodes 151 // cancellation will be more aggressive cancelling later tasks 152 // if the target slice size has been reached from a given task, 153 // cancellation should also clear local results if any 154 return new SliceTask<>(this, helper, spliterator, castingArray(), skip, limit). 155 invoke().spliterator(); 156 } 157 } 158 159 @Override 160 public <P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper, 161 Spliterator<P_IN> spliterator, 162 IntFunction<T[]> generator) { 163 long size = helper.exactOutputSizeIfKnown(spliterator); 164 if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) { 165 // Because the pipeline is SIZED the slice spliterator 166 // can be created from the source, this requires matching 167 // to shape of the source, and is potentially more efficient 168 // than creating the slice spliterator from the pipeline 169 // wrapping spliterator 170 Spliterator<P_IN> s = sliceSpliterator(helper.getSourceShape(), spliterator, skip, limit); 171 return Nodes.collect(helper, s, true, generator); 172 } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) { 173 Spliterator<T> s = unorderedSkipLimitSpliterator( 174 helper.wrapSpliterator(spliterator), 175 skip, limit, size); 176 // Collect using this pipeline, which is empty and therefore 177 // can be used with the pipeline wrapping spliterator 178 // Note that we cannot create a slice spliterator from 179 // the source spliterator if the pipeline is not SIZED 180 return Nodes.collect(this, s, true, generator); 181 } 182 else { 183 return new SliceTask<>(this, helper, spliterator, generator, skip, limit). 184 invoke(); 185 } 186 } 187 188 @Override 189 public Sink<T> opWrapSink(int flags, Sink<T> sink) { 190 return new Sink.ChainedReference<T, T>(sink) { 191 long n = skip; 192 long m = limit >= 0 ? limit : Long.MAX_VALUE; 193 194 @Override 195 public void begin(long size) { 196 downstream.begin(calcSize(size, skip, m)); 197 } 198 199 @Override 200 public void accept(T t) { 201 if (n == 0) { 202 if (m > 0) { 203 m--; 204 downstream.accept(t); 205 } 206 } 207 else { 208 n--; 209 } 210 } 211 212 @Override 213 public boolean cancellationRequested() { 214 return m == 0 || downstream.cancellationRequested(); 215 } 216 }; 217 } 218 }; 219 } 220 221 /** 222 * Appends a "slice" operation to the provided IntStream. The slice 223 * operation may be may be skip-only, limit-only, or skip-and-limit. 224 * 225 * @param upstream An IntStream 226 * @param skip The number of elements to skip. Must be >= 0. 227 * @param limit The maximum size of the resulting stream, or -1 if no limit 228 * is to be imposed 229 */ 230 public static IntStream makeInt(AbstractPipeline<?, Integer, ?> upstream, 231 long skip, long limit) { 232 if (skip < 0) 233 throw new IllegalArgumentException("Skip must be non-negative: " + skip); 234 235 return new IntPipeline.StatefulOp<Integer>(upstream, StreamShape.INT_VALUE, 236 flags(limit)) { 237 Spliterator.OfInt unorderedSkipLimitSpliterator( 238 Spliterator.OfInt s, long skip, long limit, long sizeIfKnown) { 239 if (skip <= sizeIfKnown) { 240 // Use just the limit if the number of elements 241 // to skip is <= the known pipeline size 242 limit = limit >= 0 ? Math.min(limit, sizeIfKnown - skip) : sizeIfKnown - skip; 243 skip = 0; 244 } 245 return new StreamSpliterators.UnorderedSliceSpliterator.OfInt(s, skip, limit); 246 } 247 248 @Override 249 public <P_IN> Spliterator<Integer> opEvaluateParallelLazy(PipelineHelper<Integer> helper, 250 Spliterator<P_IN> spliterator) { 251 long size = helper.exactOutputSizeIfKnown(spliterator); 252 if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) { 253 return new StreamSpliterators.SliceSpliterator.OfInt( 254 (Spliterator.OfInt) helper.wrapSpliterator(spliterator), 255 skip, 256 calcSliceFence(skip, limit)); 257 } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) { 258 return unorderedSkipLimitSpliterator( 259 (Spliterator.OfInt) helper.wrapSpliterator(spliterator), 260 skip, limit, size); 261 } 262 else { 263 return new SliceTask<>(this, helper, spliterator, Integer[]::new, skip, limit). 264 invoke().spliterator(); 265 } 266 } 267 268 @Override 269 public <P_IN> Node<Integer> opEvaluateParallel(PipelineHelper<Integer> helper, 270 Spliterator<P_IN> spliterator, 271 IntFunction<Integer[]> generator) { 272 long size = helper.exactOutputSizeIfKnown(spliterator); 273 if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) { 274 // Because the pipeline is SIZED the slice spliterator 275 // can be created from the source, this requires matching 276 // to shape of the source, and is potentially more efficient 277 // than creating the slice spliterator from the pipeline 278 // wrapping spliterator 279 Spliterator<P_IN> s = sliceSpliterator(helper.getSourceShape(), spliterator, skip, limit); 280 return Nodes.collectInt(helper, s, true); 281 } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) { 282 Spliterator.OfInt s = unorderedSkipLimitSpliterator( 283 (Spliterator.OfInt) helper.wrapSpliterator(spliterator), 284 skip, limit, size); 285 // Collect using this pipeline, which is empty and therefore 286 // can be used with the pipeline wrapping spliterator 287 // Note that we cannot create a slice spliterator from 288 // the source spliterator if the pipeline is not SIZED 289 return Nodes.collectInt(this, s, true); 290 } 291 else { 292 return new SliceTask<>(this, helper, spliterator, generator, skip, limit). 293 invoke(); 294 } 295 } 296 297 @Override 298 public Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) { 299 return new Sink.ChainedInt<Integer>(sink) { 300 long n = skip; 301 long m = limit >= 0 ? limit : Long.MAX_VALUE; 302 303 @Override 304 public void begin(long size) { 305 downstream.begin(calcSize(size, skip, m)); 306 } 307 308 @Override 309 public void accept(int t) { 310 if (n == 0) { 311 if (m > 0) { 312 m--; 313 downstream.accept(t); 314 } 315 } 316 else { 317 n--; 318 } 319 } 320 321 @Override 322 public boolean cancellationRequested() { 323 return m == 0 || downstream.cancellationRequested(); 324 } 325 }; 326 } 327 }; 328 } 329 330 /** 331 * Appends a "slice" operation to the provided LongStream. The slice 332 * operation may be may be skip-only, limit-only, or skip-and-limit. 333 * 334 * @param upstream A LongStream 335 * @param skip The number of elements to skip. Must be >= 0. 336 * @param limit The maximum size of the resulting stream, or -1 if no limit 337 * is to be imposed 338 */ 339 public static LongStream makeLong(AbstractPipeline<?, Long, ?> upstream, 340 long skip, long limit) { 341 if (skip < 0) 342 throw new IllegalArgumentException("Skip must be non-negative: " + skip); 343 344 return new LongPipeline.StatefulOp<Long>(upstream, StreamShape.LONG_VALUE, 345 flags(limit)) { 346 Spliterator.OfLong unorderedSkipLimitSpliterator( 347 Spliterator.OfLong s, long skip, long limit, long sizeIfKnown) { 348 if (skip <= sizeIfKnown) { 349 // Use just the limit if the number of elements 350 // to skip is <= the known pipeline size 351 limit = limit >= 0 ? Math.min(limit, sizeIfKnown - skip) : sizeIfKnown - skip; 352 skip = 0; 353 } 354 return new StreamSpliterators.UnorderedSliceSpliterator.OfLong(s, skip, limit); 355 } 356 357 @Override 358 public <P_IN> Spliterator<Long> opEvaluateParallelLazy(PipelineHelper<Long> helper, 359 Spliterator<P_IN> spliterator) { 360 long size = helper.exactOutputSizeIfKnown(spliterator); 361 if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) { 362 return new StreamSpliterators.SliceSpliterator.OfLong( 363 (Spliterator.OfLong) helper.wrapSpliterator(spliterator), 364 skip, 365 calcSliceFence(skip, limit)); 366 } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) { 367 return unorderedSkipLimitSpliterator( 368 (Spliterator.OfLong) helper.wrapSpliterator(spliterator), 369 skip, limit, size); 370 } 371 else { 372 return new SliceTask<>(this, helper, spliterator, Long[]::new, skip, limit). 373 invoke().spliterator(); 374 } 375 } 376 377 @Override 378 public <P_IN> Node<Long> opEvaluateParallel(PipelineHelper<Long> helper, 379 Spliterator<P_IN> spliterator, 380 IntFunction<Long[]> generator) { 381 long size = helper.exactOutputSizeIfKnown(spliterator); 382 if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) { 383 // Because the pipeline is SIZED the slice spliterator 384 // can be created from the source, this requires matching 385 // to shape of the source, and is potentially more efficient 386 // than creating the slice spliterator from the pipeline 387 // wrapping spliterator 388 Spliterator<P_IN> s = sliceSpliterator(helper.getSourceShape(), spliterator, skip, limit); 389 return Nodes.collectLong(helper, s, true); 390 } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) { 391 Spliterator.OfLong s = unorderedSkipLimitSpliterator( 392 (Spliterator.OfLong) helper.wrapSpliterator(spliterator), 393 skip, limit, size); 394 // Collect using this pipeline, which is empty and therefore 395 // can be used with the pipeline wrapping spliterator 396 // Note that we cannot create a slice spliterator from 397 // the source spliterator if the pipeline is not SIZED 398 return Nodes.collectLong(this, s, true); 399 } 400 else { 401 return new SliceTask<>(this, helper, spliterator, generator, skip, limit). 402 invoke(); 403 } 404 } 405 406 @Override 407 public Sink<Long> opWrapSink(int flags, Sink<Long> sink) { 408 return new Sink.ChainedLong<Long>(sink) { 409 long n = skip; 410 long m = limit >= 0 ? limit : Long.MAX_VALUE; 411 412 @Override 413 public void begin(long size) { 414 downstream.begin(calcSize(size, skip, m)); 415 } 416 417 @Override 418 public void accept(long t) { 419 if (n == 0) { 420 if (m > 0) { 421 m--; 422 downstream.accept(t); 423 } 424 } 425 else { 426 n--; 427 } 428 } 429 430 @Override 431 public boolean cancellationRequested() { 432 return m == 0 || downstream.cancellationRequested(); 433 } 434 }; 435 } 436 }; 437 } 438 439 /** 440 * Appends a "slice" operation to the provided DoubleStream. The slice 441 * operation may be may be skip-only, limit-only, or skip-and-limit. 442 * 443 * @param upstream A DoubleStream 444 * @param skip The number of elements to skip. Must be >= 0. 445 * @param limit The maximum size of the resulting stream, or -1 if no limit 446 * is to be imposed 447 */ 448 public static DoubleStream makeDouble(AbstractPipeline<?, Double, ?> upstream, 449 long skip, long limit) { 450 if (skip < 0) 451 throw new IllegalArgumentException("Skip must be non-negative: " + skip); 452 453 return new DoublePipeline.StatefulOp<Double>(upstream, StreamShape.DOUBLE_VALUE, 454 flags(limit)) { 455 Spliterator.OfDouble unorderedSkipLimitSpliterator( 456 Spliterator.OfDouble s, long skip, long limit, long sizeIfKnown) { 457 if (skip <= sizeIfKnown) { 458 // Use just the limit if the number of elements 459 // to skip is <= the known pipeline size 460 limit = limit >= 0 ? Math.min(limit, sizeIfKnown - skip) : sizeIfKnown - skip; 461 skip = 0; 462 } 463 return new StreamSpliterators.UnorderedSliceSpliterator.OfDouble(s, skip, limit); 464 } 465 466 @Override 467 public <P_IN> Spliterator<Double> opEvaluateParallelLazy(PipelineHelper<Double> helper, 468 Spliterator<P_IN> spliterator) { 469 long size = helper.exactOutputSizeIfKnown(spliterator); 470 if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) { 471 return new StreamSpliterators.SliceSpliterator.OfDouble( 472 (Spliterator.OfDouble) helper.wrapSpliterator(spliterator), 473 skip, 474 calcSliceFence(skip, limit)); 475 } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) { 476 return unorderedSkipLimitSpliterator( 477 (Spliterator.OfDouble) helper.wrapSpliterator(spliterator), 478 skip, limit, size); 479 } 480 else { 481 return new SliceTask<>(this, helper, spliterator, Double[]::new, skip, limit). 482 invoke().spliterator(); 483 } 484 } 485 486 @Override 487 public <P_IN> Node<Double> opEvaluateParallel(PipelineHelper<Double> helper, 488 Spliterator<P_IN> spliterator, 489 IntFunction<Double[]> generator) { 490 long size = helper.exactOutputSizeIfKnown(spliterator); 491 if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) { 492 // Because the pipeline is SIZED the slice spliterator 493 // can be created from the source, this requires matching 494 // to shape of the source, and is potentially more efficient 495 // than creating the slice spliterator from the pipeline 496 // wrapping spliterator 497 Spliterator<P_IN> s = sliceSpliterator(helper.getSourceShape(), spliterator, skip, limit); 498 return Nodes.collectDouble(helper, s, true); 499 } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) { 500 Spliterator.OfDouble s = unorderedSkipLimitSpliterator( 501 (Spliterator.OfDouble) helper.wrapSpliterator(spliterator), 502 skip, limit, size); 503 // Collect using this pipeline, which is empty and therefore 504 // can be used with the pipeline wrapping spliterator 505 // Note that we cannot create a slice spliterator from 506 // the source spliterator if the pipeline is not SIZED 507 return Nodes.collectDouble(this, s, true); 508 } 509 else { 510 return new SliceTask<>(this, helper, spliterator, generator, skip, limit). 511 invoke(); 512 } 513 } 514 515 @Override 516 public Sink<Double> opWrapSink(int flags, Sink<Double> sink) { 517 return new Sink.ChainedDouble<Double>(sink) { 518 long n = skip; 519 long m = limit >= 0 ? limit : Long.MAX_VALUE; 520 521 @Override 522 public void begin(long size) { 523 downstream.begin(calcSize(size, skip, m)); 524 } 525 526 @Override 527 public void accept(double t) { 528 if (n == 0) { 529 if (m > 0) { 530 m--; 531 downstream.accept(t); 532 } 533 } 534 else { 535 n--; 536 } 537 } 538 539 @Override 540 public boolean cancellationRequested() { 541 return m == 0 || downstream.cancellationRequested(); 542 } 543 }; 544 } 545 }; 546 } 547 548 private static int flags(long limit) { 549 return StreamOpFlag.NOT_SIZED | ((limit != -1) ? StreamOpFlag.IS_SHORT_CIRCUIT : 0); 550 } 551 552 /** 553 * {@code ForkJoinTask} implementing slice computation. 554 * 555 * @param <P_IN> Input element type to the stream pipeline 556 * @param <P_OUT> Output element type from the stream pipeline 557 */ 558 @SuppressWarnings("serial") 559 private static final class SliceTask<P_IN, P_OUT> 560 extends AbstractShortCircuitTask<P_IN, P_OUT, Node<P_OUT>, SliceTask<P_IN, P_OUT>> { 561 private final AbstractPipeline<P_OUT, P_OUT, ?> op; 562 private final IntFunction<P_OUT[]> generator; 563 private final long targetOffset, targetSize; 564 private long thisNodeSize; 565 566 private volatile boolean completed; 567 568 SliceTask(AbstractPipeline<P_OUT, P_OUT, ?> op, 569 PipelineHelper<P_OUT> helper, 570 Spliterator<P_IN> spliterator, 571 IntFunction<P_OUT[]> generator, 572 long offset, long size) { 573 super(helper, spliterator); 574 this.op = op; 575 this.generator = generator; 576 this.targetOffset = offset; 577 this.targetSize = size; 578 } 579 580 SliceTask(SliceTask<P_IN, P_OUT> parent, Spliterator<P_IN> spliterator) { 581 super(parent, spliterator); 582 this.op = parent.op; 583 this.generator = parent.generator; 584 this.targetOffset = parent.targetOffset; 585 this.targetSize = parent.targetSize; 586 } 587 588 @Override 589 protected SliceTask<P_IN, P_OUT> makeChild(Spliterator<P_IN> spliterator) { 590 return new SliceTask<>(this, spliterator); 591 } 592 593 @Override 594 protected final Node<P_OUT> getEmptyResult() { 595 return Nodes.emptyNode(op.getOutputShape()); 596 } 597 598 @Override 599 protected final Node<P_OUT> doLeaf() { 600 if (isRoot()) { 601 long sizeIfKnown = StreamOpFlag.SIZED.isPreserved(op.sourceOrOpFlags) 602 ? op.exactOutputSizeIfKnown(spliterator) 603 : -1; 604 final Node.Builder<P_OUT> nb = op.makeNodeBuilder(sizeIfKnown, generator); 605 Sink<P_OUT> opSink = op.opWrapSink(helper.getStreamAndOpFlags(), nb); 606 helper.copyIntoWithCancel(helper.wrapSink(opSink), spliterator); 607 // There is no need to truncate since the op performs the 608 // skipping and limiting of elements 609 return nb.build(); 610 } 611 else { 612 Node<P_OUT> node = helper.wrapAndCopyInto(helper.makeNodeBuilder(-1, generator), 613 spliterator).build(); 614 thisNodeSize = node.count(); 615 completed = true; 616 spliterator = null; 617 return node; 618 } 619 } 620 621 @Override 622 public final void onCompletion(CountedCompleter<?> caller) { 623 if (!isLeaf()) { 624 Node<P_OUT> result; 625 thisNodeSize = leftChild.thisNodeSize + rightChild.thisNodeSize; 626 if (canceled) { 627 thisNodeSize = 0; 628 result = getEmptyResult(); 629 } 630 else if (thisNodeSize == 0) 631 result = getEmptyResult(); 632 else if (leftChild.thisNodeSize == 0) 633 result = rightChild.getLocalResult(); 634 else { 635 result = Nodes.conc(op.getOutputShape(), 636 leftChild.getLocalResult(), rightChild.getLocalResult()); 637 } 638 setLocalResult(isRoot() ? doTruncate(result) : result); 639 completed = true; 640 } 641 if (targetSize >= 0 642 && !isRoot() 643 && isLeftCompleted(targetOffset + targetSize)) 644 cancelLaterNodes(); 645 646 super.onCompletion(caller); 647 } 648 649 @Override 650 protected void cancel() { 651 super.cancel(); 652 if (completed) 653 setLocalResult(getEmptyResult()); 654 } 655 656 private Node<P_OUT> doTruncate(Node<P_OUT> input) { 657 long to = targetSize >= 0 ? Math.min(input.count(), targetOffset + targetSize) : thisNodeSize; 658 return input.truncate(targetOffset, to, generator); 659 } 660 661 /** 662 * Determine if the number of completed elements in this node and nodes 663 * to the left of this node is greater than or equal to the target size. 664 * 665 * @param target the target size 666 * @return true if the number of elements is greater than or equal to 667 * the target size, otherwise false. 668 */ 669 private boolean isLeftCompleted(long target) { 670 long size = completed ? thisNodeSize : completedSize(target); 671 if (size >= target) 672 return true; 673 for (SliceTask<P_IN, P_OUT> parent = getParent(), node = this; 674 parent != null; 675 node = parent, parent = parent.getParent()) { 676 if (node == parent.rightChild) { 677 SliceTask<P_IN, P_OUT> left = parent.leftChild; 678 if (left != null) { 679 size += left.completedSize(target); 680 if (size >= target) 681 return true; 682 } 683 } 684 } 685 return size >= target; 686 } 687 688 /** 689 * Compute the number of completed elements in this node. 690 * <p> 691 * Computation terminates if all nodes have been processed or the 692 * number of completed elements is greater than or equal to the target 693 * size. 694 * 695 * @param target the target size 696 * @return return the number of completed elements 697 */ 698 private long completedSize(long target) { 699 if (completed) 700 return thisNodeSize; 701 else { 702 SliceTask<P_IN, P_OUT> left = leftChild; 703 SliceTask<P_IN, P_OUT> right = rightChild; 704 if (left == null || right == null) { 705 // must be completed 706 return thisNodeSize; 707 } 708 else { 709 long leftSize = left.completedSize(target); 710 return (leftSize >= target) ? leftSize : leftSize + right.completedSize(target); 711 } 712 } 713 } 714 } 715 } 716