1 /* 2 * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 package java.util.stream; 26 27 import java.util.Spliterator; 28 import java.util.concurrent.CountedCompleter; 29 import java.util.function.IntFunction; 30 31 /** 32 * Factory for instances of a short-circuiting stateful intermediate operations 33 * that produce subsequences of their input stream. 34 * 35 * @since 1.8 36 */ 37 final class SliceOps { 38 39 // No instances SliceOps()40 private SliceOps() { } 41 42 /** 43 * Calculates the sliced size given the current size, number of elements 44 * skip, and the number of elements to limit. 45 * 46 * @param size the current size 47 * @param skip the number of elements to skip, assumed to be >= 0 48 * @param limit the number of elements to limit, assumed to be >= 0, with 49 * a value of {@code Long.MAX_VALUE} if there is no limit 50 * @return the sliced size 51 */ calcSize(long size, long skip, long limit)52 private static long calcSize(long size, long skip, long limit) { 53 return size >= 0 ? Math.max(-1, Math.min(size - skip, limit)) : -1; 54 } 55 56 /** 57 * Calculates the slice fence, which is one past the index of the slice 58 * range 59 * @param skip the number of elements to skip, assumed to be >= 0 60 * @param limit the number of elements to limit, assumed to be >= 0, with 61 * a value of {@code Long.MAX_VALUE} if there is no limit 62 * @return the slice fence. 63 */ calcSliceFence(long skip, long limit)64 private static long calcSliceFence(long skip, long limit) { 65 long sliceFence = limit >= 0 ? skip + limit : Long.MAX_VALUE; 66 // Check for overflow 67 return (sliceFence >= 0) ? sliceFence : Long.MAX_VALUE; 68 } 69 70 /** 71 * Creates a slice spliterator given a stream shape governing the 72 * spliterator type. Requires that the underlying Spliterator 73 * be SUBSIZED. 74 */ 75 @SuppressWarnings("unchecked") sliceSpliterator(StreamShape shape, Spliterator<P_IN> s, long skip, long limit)76 private static <P_IN> Spliterator<P_IN> sliceSpliterator(StreamShape shape, 77 Spliterator<P_IN> s, 78 long skip, long limit) { 79 assert s.hasCharacteristics(Spliterator.SUBSIZED); 80 long sliceFence = calcSliceFence(skip, limit); 81 switch (shape) { 82 case REFERENCE: 83 return new StreamSpliterators 84 .SliceSpliterator.OfRef<>(s, skip, sliceFence); 85 case INT_VALUE: 86 return (Spliterator<P_IN>) new StreamSpliterators 87 .SliceSpliterator.OfInt((Spliterator.OfInt) s, skip, sliceFence); 88 case LONG_VALUE: 89 return (Spliterator<P_IN>) new StreamSpliterators 90 .SliceSpliterator.OfLong((Spliterator.OfLong) s, skip, sliceFence); 91 case DOUBLE_VALUE: 92 return (Spliterator<P_IN>) new StreamSpliterators 93 .SliceSpliterator.OfDouble((Spliterator.OfDouble) s, skip, sliceFence); 94 default: 95 throw new IllegalStateException("Unknown shape " + shape); 96 } 97 } 98 99 @SuppressWarnings("unchecked") castingArray()100 private static <T> IntFunction<T[]> castingArray() { 101 return size -> (T[]) new Object[size]; 102 } 103 104 /** 105 * Appends a "slice" operation to the provided stream. The slice operation 106 * may be may be skip-only, limit-only, or skip-and-limit. 107 * 108 * @param <T> the type of both input and output elements 109 * @param upstream a reference stream with element type T 110 * @param skip the number of elements to skip. Must be >= 0. 111 * @param limit the maximum size of the resulting stream, or -1 if no limit 112 * is to be imposed 113 */ makeRef(AbstractPipeline<?, T, ?> upstream, long skip, long limit)114 public static <T> Stream<T> makeRef(AbstractPipeline<?, T, ?> upstream, 115 long skip, long limit) { 116 if (skip < 0) 117 throw new IllegalArgumentException("Skip must be non-negative: " + skip); 118 119 return new ReferencePipeline.StatefulOp<T, T>(upstream, StreamShape.REFERENCE, 120 flags(limit)) { 121 Spliterator<T> unorderedSkipLimitSpliterator(Spliterator<T> s, 122 long skip, long limit, long sizeIfKnown) { 123 if (skip <= sizeIfKnown) { 124 // Use just the limit if the number of elements 125 // to skip is <= the known pipeline size 126 limit = limit >= 0 ? Math.min(limit, sizeIfKnown - skip) : sizeIfKnown - skip; 127 skip = 0; 128 } 129 return new StreamSpliterators.UnorderedSliceSpliterator.OfRef<>(s, skip, limit); 130 } 131 132 @Override 133 // Android-changed: Make public, to match the method it's overriding. 134 public <P_IN> Spliterator<T> opEvaluateParallelLazy(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) { 135 long size = helper.exactOutputSizeIfKnown(spliterator); 136 if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) { 137 return new StreamSpliterators.SliceSpliterator.OfRef<>( 138 helper.wrapSpliterator(spliterator), 139 skip, 140 calcSliceFence(skip, limit)); 141 } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) { 142 return unorderedSkipLimitSpliterator( 143 helper.wrapSpliterator(spliterator), 144 skip, limit, size); 145 } 146 else { 147 // @@@ OOMEs will occur for LongStream.longs().filter(i -> true).limit(n) 148 // regardless of the value of n 149 // Need to adjust the target size of splitting for the 150 // SliceTask from say (size / k) to say min(size / k, 1 << 14) 151 // This will limit the size of the buffers created at the leaf nodes 152 // cancellation will be more aggressive cancelling later tasks 153 // if the target slice size has been reached from a given task, 154 // cancellation should also clear local results if any 155 return new SliceTask<>(this, helper, spliterator, castingArray(), skip, limit). 156 invoke().spliterator(); 157 } 158 } 159 160 @Override 161 // Android-changed: Make public, to match the method it's overriding. 162 public <P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper, 163 Spliterator<P_IN> spliterator, 164 IntFunction<T[]> generator) { 165 long size = helper.exactOutputSizeIfKnown(spliterator); 166 if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) { 167 // Because the pipeline is SIZED the slice spliterator 168 // can be created from the source, this requires matching 169 // to shape of the source, and is potentially more efficient 170 // than creating the slice spliterator from the pipeline 171 // wrapping spliterator 172 Spliterator<P_IN> s = sliceSpliterator(helper.getSourceShape(), spliterator, skip, limit); 173 return Nodes.collect(helper, s, true, generator); 174 } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) { 175 Spliterator<T> s = unorderedSkipLimitSpliterator( 176 helper.wrapSpliterator(spliterator), 177 skip, limit, size); 178 // Collect using this pipeline, which is empty and therefore 179 // can be used with the pipeline wrapping spliterator 180 // Note that we cannot create a slice spliterator from 181 // the source spliterator if the pipeline is not SIZED 182 return Nodes.collect(this, s, true, generator); 183 } 184 else { 185 return new SliceTask<>(this, helper, spliterator, generator, skip, limit). 186 invoke(); 187 } 188 } 189 190 @Override 191 // Android-changed: Make public, to match the method it's overriding. 192 public Sink<T> opWrapSink(int flags, Sink<T> sink) { 193 return new Sink.ChainedReference<T, T>(sink) { 194 long n = skip; 195 long m = limit >= 0 ? limit : Long.MAX_VALUE; 196 197 @Override 198 public void begin(long size) { 199 downstream.begin(calcSize(size, skip, m)); 200 } 201 202 @Override 203 public void accept(T t) { 204 if (n == 0) { 205 if (m > 0) { 206 m--; 207 downstream.accept(t); 208 } 209 } 210 else { 211 n--; 212 } 213 } 214 215 @Override 216 public boolean cancellationRequested() { 217 return m == 0 || downstream.cancellationRequested(); 218 } 219 }; 220 } 221 }; 222 } 223 224 /** 225 * Appends a "slice" operation to the provided IntStream. The slice 226 * operation may be may be skip-only, limit-only, or skip-and-limit. 227 * 228 * @param upstream An IntStream 229 * @param skip The number of elements to skip. Must be >= 0. 230 * @param limit The maximum size of the resulting stream, or -1 if no limit 231 * is to be imposed 232 */ 233 public static IntStream makeInt(AbstractPipeline<?, Integer, ?> upstream, 234 long skip, long limit) { 235 if (skip < 0) 236 throw new IllegalArgumentException("Skip must be non-negative: " + skip); 237 238 return new IntPipeline.StatefulOp<Integer>(upstream, StreamShape.INT_VALUE, 239 flags(limit)) { 240 Spliterator.OfInt unorderedSkipLimitSpliterator( 241 Spliterator.OfInt s, long skip, long limit, long sizeIfKnown) { 242 if (skip <= sizeIfKnown) { 243 // Use just the limit if the number of elements 244 // to skip is <= the known pipeline size 245 limit = limit >= 0 ? Math.min(limit, sizeIfKnown - skip) : sizeIfKnown - skip; 246 skip = 0; 247 } 248 return new StreamSpliterators.UnorderedSliceSpliterator.OfInt(s, skip, limit); 249 } 250 251 @Override 252 // Android-changed: Make public, to match the method it's overriding. 253 public <P_IN> Spliterator<Integer> opEvaluateParallelLazy(PipelineHelper<Integer> helper, 254 Spliterator<P_IN> spliterator) { 255 long size = helper.exactOutputSizeIfKnown(spliterator); 256 if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) { 257 return new StreamSpliterators.SliceSpliterator.OfInt( 258 (Spliterator.OfInt) helper.wrapSpliterator(spliterator), 259 skip, 260 calcSliceFence(skip, limit)); 261 } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) { 262 return unorderedSkipLimitSpliterator( 263 (Spliterator.OfInt) helper.wrapSpliterator(spliterator), 264 skip, limit, size); 265 } 266 else { 267 return new SliceTask<>(this, helper, spliterator, Integer[]::new, skip, limit). 268 invoke().spliterator(); 269 } 270 } 271 272 @Override 273 // Android-changed: Make public, to match the method it's overriding. 274 public <P_IN> Node<Integer> opEvaluateParallel(PipelineHelper<Integer> helper, 275 Spliterator<P_IN> spliterator, 276 IntFunction<Integer[]> generator) { 277 long size = helper.exactOutputSizeIfKnown(spliterator); 278 if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) { 279 // Because the pipeline is SIZED the slice spliterator 280 // can be created from the source, this requires matching 281 // to shape of the source, and is potentially more efficient 282 // than creating the slice spliterator from the pipeline 283 // wrapping spliterator 284 Spliterator<P_IN> s = sliceSpliterator(helper.getSourceShape(), spliterator, skip, limit); 285 return Nodes.collectInt(helper, s, true); 286 } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) { 287 Spliterator.OfInt s = unorderedSkipLimitSpliterator( 288 (Spliterator.OfInt) helper.wrapSpliterator(spliterator), 289 skip, limit, size); 290 // Collect using this pipeline, which is empty and therefore 291 // can be used with the pipeline wrapping spliterator 292 // Note that we cannot create a slice spliterator from 293 // the source spliterator if the pipeline is not SIZED 294 return Nodes.collectInt(this, s, true); 295 } 296 else { 297 return new SliceTask<>(this, helper, spliterator, generator, skip, limit). 298 invoke(); 299 } 300 } 301 302 @Override 303 // Android-changed: Make public, to match the method it's overriding. 304 public Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) { 305 return new Sink.ChainedInt<Integer>(sink) { 306 long n = skip; 307 long m = limit >= 0 ? limit : Long.MAX_VALUE; 308 309 @Override 310 public void begin(long size) { 311 downstream.begin(calcSize(size, skip, m)); 312 } 313 314 @Override 315 public void accept(int t) { 316 if (n == 0) { 317 if (m > 0) { 318 m--; 319 downstream.accept(t); 320 } 321 } 322 else { 323 n--; 324 } 325 } 326 327 @Override 328 public boolean cancellationRequested() { 329 return m == 0 || downstream.cancellationRequested(); 330 } 331 }; 332 } 333 }; 334 } 335 336 /** 337 * Appends a "slice" operation to the provided LongStream. The slice 338 * operation may be may be skip-only, limit-only, or skip-and-limit. 339 * 340 * @param upstream A LongStream 341 * @param skip The number of elements to skip. Must be >= 0. 342 * @param limit The maximum size of the resulting stream, or -1 if no limit 343 * is to be imposed 344 */ 345 public static LongStream makeLong(AbstractPipeline<?, Long, ?> upstream, 346 long skip, long limit) { 347 if (skip < 0) 348 throw new IllegalArgumentException("Skip must be non-negative: " + skip); 349 350 return new LongPipeline.StatefulOp<Long>(upstream, StreamShape.LONG_VALUE, 351 flags(limit)) { 352 Spliterator.OfLong unorderedSkipLimitSpliterator( 353 Spliterator.OfLong s, long skip, long limit, long sizeIfKnown) { 354 if (skip <= sizeIfKnown) { 355 // Use just the limit if the number of elements 356 // to skip is <= the known pipeline size 357 limit = limit >= 0 ? Math.min(limit, sizeIfKnown - skip) : sizeIfKnown - skip; 358 skip = 0; 359 } 360 return new StreamSpliterators.UnorderedSliceSpliterator.OfLong(s, skip, limit); 361 } 362 363 @Override 364 // Android-changed: Make public, to match the method it's overriding. 365 public <P_IN> Spliterator<Long> opEvaluateParallelLazy(PipelineHelper<Long> helper, 366 Spliterator<P_IN> spliterator) { 367 long size = helper.exactOutputSizeIfKnown(spliterator); 368 if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) { 369 return new StreamSpliterators.SliceSpliterator.OfLong( 370 (Spliterator.OfLong) helper.wrapSpliterator(spliterator), 371 skip, 372 calcSliceFence(skip, limit)); 373 } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) { 374 return unorderedSkipLimitSpliterator( 375 (Spliterator.OfLong) helper.wrapSpliterator(spliterator), 376 skip, limit, size); 377 } 378 else { 379 return new SliceTask<>(this, helper, spliterator, Long[]::new, skip, limit). 380 invoke().spliterator(); 381 } 382 } 383 384 @Override 385 // Android-changed: Make public, to match the method it's overriding. 386 public <P_IN> Node<Long> opEvaluateParallel(PipelineHelper<Long> helper, 387 Spliterator<P_IN> spliterator, 388 IntFunction<Long[]> generator) { 389 long size = helper.exactOutputSizeIfKnown(spliterator); 390 if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) { 391 // Because the pipeline is SIZED the slice spliterator 392 // can be created from the source, this requires matching 393 // to shape of the source, and is potentially more efficient 394 // than creating the slice spliterator from the pipeline 395 // wrapping spliterator 396 Spliterator<P_IN> s = sliceSpliterator(helper.getSourceShape(), spliterator, skip, limit); 397 return Nodes.collectLong(helper, s, true); 398 } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) { 399 Spliterator.OfLong s = unorderedSkipLimitSpliterator( 400 (Spliterator.OfLong) helper.wrapSpliterator(spliterator), 401 skip, limit, size); 402 // Collect using this pipeline, which is empty and therefore 403 // can be used with the pipeline wrapping spliterator 404 // Note that we cannot create a slice spliterator from 405 // the source spliterator if the pipeline is not SIZED 406 return Nodes.collectLong(this, s, true); 407 } 408 else { 409 return new SliceTask<>(this, helper, spliterator, generator, skip, limit). 410 invoke(); 411 } 412 } 413 414 @Override 415 // Android-changed: Make public, to match the method it's overriding. 416 public Sink<Long> opWrapSink(int flags, Sink<Long> sink) { 417 return new Sink.ChainedLong<Long>(sink) { 418 long n = skip; 419 long m = limit >= 0 ? limit : Long.MAX_VALUE; 420 421 @Override 422 public void begin(long size) { 423 downstream.begin(calcSize(size, skip, m)); 424 } 425 426 @Override 427 public void accept(long t) { 428 if (n == 0) { 429 if (m > 0) { 430 m--; 431 downstream.accept(t); 432 } 433 } 434 else { 435 n--; 436 } 437 } 438 439 @Override 440 public boolean cancellationRequested() { 441 return m == 0 || downstream.cancellationRequested(); 442 } 443 }; 444 } 445 }; 446 } 447 448 /** 449 * Appends a "slice" operation to the provided DoubleStream. The slice 450 * operation may be may be skip-only, limit-only, or skip-and-limit. 451 * 452 * @param upstream A DoubleStream 453 * @param skip The number of elements to skip. Must be >= 0. 454 * @param limit The maximum size of the resulting stream, or -1 if no limit 455 * is to be imposed 456 */ 457 public static DoubleStream makeDouble(AbstractPipeline<?, Double, ?> upstream, 458 long skip, long limit) { 459 if (skip < 0) 460 throw new IllegalArgumentException("Skip must be non-negative: " + skip); 461 462 return new DoublePipeline.StatefulOp<Double>(upstream, StreamShape.DOUBLE_VALUE, 463 flags(limit)) { 464 Spliterator.OfDouble unorderedSkipLimitSpliterator( 465 Spliterator.OfDouble s, long skip, long limit, long sizeIfKnown) { 466 if (skip <= sizeIfKnown) { 467 // Use just the limit if the number of elements 468 // to skip is <= the known pipeline size 469 limit = limit >= 0 ? Math.min(limit, sizeIfKnown - skip) : sizeIfKnown - skip; 470 skip = 0; 471 } 472 return new StreamSpliterators.UnorderedSliceSpliterator.OfDouble(s, skip, limit); 473 } 474 475 @Override 476 // Android-changed: Make public, to match the method it's overriding. 477 public <P_IN> Spliterator<Double> opEvaluateParallelLazy(PipelineHelper<Double> helper, 478 Spliterator<P_IN> spliterator) { 479 long size = helper.exactOutputSizeIfKnown(spliterator); 480 if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) { 481 return new StreamSpliterators.SliceSpliterator.OfDouble( 482 (Spliterator.OfDouble) helper.wrapSpliterator(spliterator), 483 skip, 484 calcSliceFence(skip, limit)); 485 } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) { 486 return unorderedSkipLimitSpliterator( 487 (Spliterator.OfDouble) helper.wrapSpliterator(spliterator), 488 skip, limit, size); 489 } 490 else { 491 return new SliceTask<>(this, helper, spliterator, Double[]::new, skip, limit). 492 invoke().spliterator(); 493 } 494 } 495 496 @Override 497 // Android-changed: Make public, to match the method it's overriding. 498 public <P_IN> Node<Double> opEvaluateParallel(PipelineHelper<Double> helper, 499 Spliterator<P_IN> spliterator, 500 IntFunction<Double[]> generator) { 501 long size = helper.exactOutputSizeIfKnown(spliterator); 502 if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) { 503 // Because the pipeline is SIZED the slice spliterator 504 // can be created from the source, this requires matching 505 // to shape of the source, and is potentially more efficient 506 // than creating the slice spliterator from the pipeline 507 // wrapping spliterator 508 Spliterator<P_IN> s = sliceSpliterator(helper.getSourceShape(), spliterator, skip, limit); 509 return Nodes.collectDouble(helper, s, true); 510 } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) { 511 Spliterator.OfDouble s = unorderedSkipLimitSpliterator( 512 (Spliterator.OfDouble) helper.wrapSpliterator(spliterator), 513 skip, limit, size); 514 // Collect using this pipeline, which is empty and therefore 515 // can be used with the pipeline wrapping spliterator 516 // Note that we cannot create a slice spliterator from 517 // the source spliterator if the pipeline is not SIZED 518 return Nodes.collectDouble(this, s, true); 519 } 520 else { 521 return new SliceTask<>(this, helper, spliterator, generator, skip, limit). 522 invoke(); 523 } 524 } 525 526 @Override 527 // Android-changed: Make public, to match the method it's overriding. 528 public Sink<Double> opWrapSink(int flags, Sink<Double> sink) { 529 return new Sink.ChainedDouble<Double>(sink) { 530 long n = skip; 531 long m = limit >= 0 ? limit : Long.MAX_VALUE; 532 533 @Override 534 public void begin(long size) { 535 downstream.begin(calcSize(size, skip, m)); 536 } 537 538 @Override 539 public void accept(double t) { 540 if (n == 0) { 541 if (m > 0) { 542 m--; 543 downstream.accept(t); 544 } 545 } 546 else { 547 n--; 548 } 549 } 550 551 @Override 552 public boolean cancellationRequested() { 553 return m == 0 || downstream.cancellationRequested(); 554 } 555 }; 556 } 557 }; 558 } 559 560 private static int flags(long limit) { 561 return StreamOpFlag.NOT_SIZED | ((limit != -1) ? StreamOpFlag.IS_SHORT_CIRCUIT : 0); 562 } 563 564 /** 565 * {@code ForkJoinTask} implementing slice computation. 566 * 567 * @param <P_IN> Input element type to the stream pipeline 568 * @param <P_OUT> Output element type from the stream pipeline 569 */ 570 @SuppressWarnings("serial") 571 private static final class SliceTask<P_IN, P_OUT> 572 extends AbstractShortCircuitTask<P_IN, P_OUT, Node<P_OUT>, SliceTask<P_IN, P_OUT>> { 573 private final AbstractPipeline<P_OUT, P_OUT, ?> op; 574 private final IntFunction<P_OUT[]> generator; 575 private final long targetOffset, targetSize; 576 private long thisNodeSize; 577 578 private volatile boolean completed; 579 580 SliceTask(AbstractPipeline<P_OUT, P_OUT, ?> op, 581 PipelineHelper<P_OUT> helper, 582 Spliterator<P_IN> spliterator, 583 IntFunction<P_OUT[]> generator, 584 long offset, long size) { 585 super(helper, spliterator); 586 this.op = op; 587 this.generator = generator; 588 this.targetOffset = offset; 589 this.targetSize = size; 590 } 591 592 SliceTask(SliceTask<P_IN, P_OUT> parent, Spliterator<P_IN> spliterator) { 593 super(parent, spliterator); 594 this.op = parent.op; 595 this.generator = parent.generator; 596 this.targetOffset = parent.targetOffset; 597 this.targetSize = parent.targetSize; 598 } 599 600 @Override 601 protected SliceTask<P_IN, P_OUT> makeChild(Spliterator<P_IN> spliterator) { 602 return new SliceTask<>(this, spliterator); 603 } 604 605 @Override 606 protected final Node<P_OUT> getEmptyResult() { 607 return Nodes.emptyNode(op.getOutputShape()); 608 } 609 610 @Override 611 protected final Node<P_OUT> doLeaf() { 612 if (isRoot()) { 613 long sizeIfKnown = StreamOpFlag.SIZED.isPreserved(op.sourceOrOpFlags) 614 ? op.exactOutputSizeIfKnown(spliterator) 615 : -1; 616 final Node.Builder<P_OUT> nb = op.makeNodeBuilder(sizeIfKnown, generator); 617 Sink<P_OUT> opSink = op.opWrapSink(helper.getStreamAndOpFlags(), nb); 618 helper.copyIntoWithCancel(helper.wrapSink(opSink), spliterator); 619 // There is no need to truncate since the op performs the 620 // skipping and limiting of elements 621 return nb.build(); 622 } 623 else { 624 Node<P_OUT> node = helper.wrapAndCopyInto(helper.makeNodeBuilder(-1, generator), 625 spliterator).build(); 626 thisNodeSize = node.count(); 627 completed = true; 628 spliterator = null; 629 return node; 630 } 631 } 632 633 @Override 634 public final void onCompletion(CountedCompleter<?> caller) { 635 if (!isLeaf()) { 636 Node<P_OUT> result; 637 thisNodeSize = leftChild.thisNodeSize + rightChild.thisNodeSize; 638 if (canceled) { 639 thisNodeSize = 0; 640 result = getEmptyResult(); 641 } 642 else if (thisNodeSize == 0) 643 result = getEmptyResult(); 644 else if (leftChild.thisNodeSize == 0) 645 result = rightChild.getLocalResult(); 646 else { 647 result = Nodes.conc(op.getOutputShape(), 648 leftChild.getLocalResult(), rightChild.getLocalResult()); 649 } 650 setLocalResult(isRoot() ? doTruncate(result) : result); 651 completed = true; 652 } 653 if (targetSize >= 0 654 && !isRoot() 655 && isLeftCompleted(targetOffset + targetSize)) 656 cancelLaterNodes(); 657 658 super.onCompletion(caller); 659 } 660 661 @Override 662 protected void cancel() { 663 super.cancel(); 664 if (completed) 665 setLocalResult(getEmptyResult()); 666 } 667 668 private Node<P_OUT> doTruncate(Node<P_OUT> input) { 669 long to = targetSize >= 0 ? Math.min(input.count(), targetOffset + targetSize) : thisNodeSize; 670 return input.truncate(targetOffset, to, generator); 671 } 672 673 /** 674 * Determine if the number of completed elements in this node and nodes 675 * to the left of this node is greater than or equal to the target size. 676 * 677 * @param target the target size 678 * @return true if the number of elements is greater than or equal to 679 * the target size, otherwise false. 680 */ 681 private boolean isLeftCompleted(long target) { 682 long size = completed ? thisNodeSize : completedSize(target); 683 if (size >= target) 684 return true; 685 for (SliceTask<P_IN, P_OUT> parent = getParent(), node = this; 686 parent != null; 687 node = parent, parent = parent.getParent()) { 688 if (node == parent.rightChild) { 689 SliceTask<P_IN, P_OUT> left = parent.leftChild; 690 if (left != null) { 691 size += left.completedSize(target); 692 if (size >= target) 693 return true; 694 } 695 } 696 } 697 return size >= target; 698 } 699 700 /** 701 * Compute the number of completed elements in this node. 702 * <p> 703 * Computation terminates if all nodes have been processed or the 704 * number of completed elements is greater than or equal to the target 705 * size. 706 * 707 * @param target the target size 708 * @return return the number of completed elements 709 */ 710 private long completedSize(long target) { 711 if (completed) 712 return thisNodeSize; 713 else { 714 SliceTask<P_IN, P_OUT> left = leftChild; 715 SliceTask<P_IN, P_OUT> right = rightChild; 716 if (left == null || right == null) { 717 // must be completed 718 return thisNodeSize; 719 } 720 else { 721 long leftSize = left.completedSize(target); 722 return (leftSize >= target) ? leftSize : leftSize + right.completedSize(target); 723 } 724 } 725 } 726 } 727 } 728