1 /* 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 3 * 4 * This code is free software; you can redistribute it and/or modify it 5 * under the terms of the GNU General Public License version 2 only, as 6 * published by the Free Software Foundation. 7 * 8 * This code is distributed in the hope that it will be useful, but WITHOUT 9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 11 * version 2 for more details (a copy is included in the LICENSE file that 12 * accompanied this code). 13 * 14 * You should have received a copy of the GNU General Public License version 15 * 2 along with this work; if not, write to the Free Software Foundation, 16 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 17 * 18 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 19 * or visit www.oracle.com if you need additional information or have any 20 * questions. 21 */ 22 23 /* 24 * This file is available under and governed by the GNU General Public 25 * License version 2 only, as published by the Free Software Foundation. 26 * However, the following notice accompanied the original version of this 27 * file: 28 * 29 * Written by Doug Lea and Martin Buchholz with assistance from 30 * members of JCP JSR-166 Expert Group and released to the public 31 * domain, as explained at 32 * http://creativecommons.org/publicdomain/zero/1.0/ 33 */ 34 35 package test.java.util.concurrent.tck; 36 import static org.junit.Assert.assertEquals; 37 import static org.junit.Assert.assertFalse; 38 import static org.junit.Assert.assertNull; 39 import static org.junit.Assert.assertNotNull; 40 import static org.junit.Assert.assertSame; 41 import static org.junit.Assert.assertNotSame; 42 import static org.junit.Assert.assertTrue; 43 import static org.junit.Assert.fail; 44 45 import java.util.concurrent.CompletableFuture; 46 import java.util.concurrent.CountDownLatch; 47 import java.util.concurrent.Executor; 48 import java.util.concurrent.Executors; 49 import java.util.concurrent.Flow; 50 import java.util.concurrent.ForkJoinPool; 51 import java.util.concurrent.SubmissionPublisher; 52 import java.util.concurrent.atomic.AtomicInteger; 53 import org.junit.Test; 54 import org.junit.runner.RunWith; 55 import org.junit.runners.JUnit4; 56 57 import static java.util.concurrent.Flow.Subscriber; 58 import static java.util.concurrent.Flow.Subscription; 59 import static java.util.concurrent.TimeUnit.MILLISECONDS; 60 61 // Android-changed: Use JUnit4. 62 @RunWith(JUnit4.class) 63 public class SubmissionPublisherTest extends JSR166TestCase { 64 65 // Android-changed: Use JUnitCore.main. main(String[] args)66 public static void main(String[] args) { 67 // main(suite(), args); 68 org.junit.runner.JUnitCore.main("test.java.util.concurrent.tck.SubmissionPublisherTest"); 69 } 70 // public static Test suite() { 71 // return new TestSuite(SubmissionPublisherTest.class); 72 // } 73 74 final Executor basicExecutor = basicPublisher().getExecutor(); 75 basicPublisher()76 static SubmissionPublisher<Integer> basicPublisher() { 77 return new SubmissionPublisher<Integer>(); 78 } 79 80 static class SPException extends RuntimeException {} 81 82 class TestSubscriber implements Subscriber<Integer> { 83 volatile Subscription sn; 84 int last; // Requires that onNexts are in numeric order 85 volatile int nexts; 86 volatile int errors; 87 volatile int completes; 88 volatile boolean throwOnCall = false; 89 volatile boolean request = true; 90 volatile Throwable lastError; 91 onSubscribe(Subscription s)92 public synchronized void onSubscribe(Subscription s) { 93 threadAssertTrue(sn == null); 94 sn = s; 95 notifyAll(); 96 if (throwOnCall) 97 throw new SPException(); 98 if (request) 99 sn.request(1L); 100 } onNext(Integer t)101 public synchronized void onNext(Integer t) { 102 ++nexts; 103 notifyAll(); 104 int current = t.intValue(); 105 threadAssertTrue(current >= last); 106 last = current; 107 if (request) 108 sn.request(1L); 109 if (throwOnCall) 110 throw new SPException(); 111 } onError(Throwable t)112 public synchronized void onError(Throwable t) { 113 threadAssertTrue(completes == 0); 114 threadAssertTrue(errors == 0); 115 lastError = t; 116 ++errors; 117 notifyAll(); 118 } onComplete()119 public synchronized void onComplete() { 120 threadAssertTrue(completes == 0); 121 ++completes; 122 notifyAll(); 123 } 124 awaitSubscribe()125 synchronized void awaitSubscribe() { 126 while (sn == null) { 127 try { 128 wait(); 129 } catch (Exception ex) { 130 threadUnexpectedException(ex); 131 break; 132 } 133 } 134 } awaitNext(int n)135 synchronized void awaitNext(int n) { 136 while (nexts < n) { 137 try { 138 wait(); 139 } catch (Exception ex) { 140 threadUnexpectedException(ex); 141 break; 142 } 143 } 144 } awaitComplete()145 synchronized void awaitComplete() { 146 while (completes == 0 && errors == 0) { 147 try { 148 wait(); 149 } catch (Exception ex) { 150 threadUnexpectedException(ex); 151 break; 152 } 153 } 154 } awaitError()155 synchronized void awaitError() { 156 while (errors == 0) { 157 try { 158 wait(); 159 } catch (Exception ex) { 160 threadUnexpectedException(ex); 161 break; 162 } 163 } 164 } 165 166 } 167 168 /** 169 * A new SubmissionPublisher has no subscribers, a non-null 170 * executor, a power-of-two capacity, is not closed, and reports 171 * zero demand and lag 172 */ checkInitialState(SubmissionPublisher<?> p)173 void checkInitialState(SubmissionPublisher<?> p) { 174 assertFalse(p.hasSubscribers()); 175 assertEquals(0, p.getNumberOfSubscribers()); 176 assertTrue(p.getSubscribers().isEmpty()); 177 assertFalse(p.isClosed()); 178 assertNull(p.getClosedException()); 179 int n = p.getMaxBufferCapacity(); 180 assertTrue((n & (n - 1)) == 0); // power of two 181 assertNotNull(p.getExecutor()); 182 assertEquals(0, p.estimateMinimumDemand()); 183 assertEquals(0, p.estimateMaximumLag()); 184 } 185 186 /** 187 * A default-constructed SubmissionPublisher has no subscribers, 188 * is not closed, has default buffer size, and uses the 189 * defaultExecutor 190 */ 191 @Test testConstructor1()192 public void testConstructor1() { 193 SubmissionPublisher<Integer> p = new SubmissionPublisher<>(); 194 checkInitialState(p); 195 assertEquals(p.getMaxBufferCapacity(), Flow.defaultBufferSize()); 196 Executor e = p.getExecutor(), c = ForkJoinPool.commonPool(); 197 if (ForkJoinPool.getCommonPoolParallelism() > 1) 198 assertSame(e, c); 199 else 200 assertNotSame(e, c); 201 } 202 203 /** 204 * A new SubmissionPublisher has no subscribers, is not closed, 205 * has the given buffer size, and uses the given executor 206 */ 207 @Test testConstructor2()208 public void testConstructor2() { 209 Executor e = Executors.newFixedThreadPool(1); 210 SubmissionPublisher<Integer> p = new SubmissionPublisher<>(e, 8); 211 checkInitialState(p); 212 assertSame(p.getExecutor(), e); 213 assertEquals(8, p.getMaxBufferCapacity()); 214 } 215 216 /** 217 * A null Executor argument to SubmissionPublisher constructor 218 * throws NullPointerException 219 */ 220 @Test testConstructor3()221 public void testConstructor3() { 222 try { 223 new SubmissionPublisher<Integer>(null, 8); 224 shouldThrow(); 225 } catch (NullPointerException success) {} 226 } 227 228 /** 229 * A negative capacity argument to SubmissionPublisher constructor 230 * throws IllegalArgumentException 231 */ 232 @Test testConstructor4()233 public void testConstructor4() { 234 Executor e = Executors.newFixedThreadPool(1); 235 try { 236 new SubmissionPublisher<Integer>(e, -1); 237 shouldThrow(); 238 } catch (IllegalArgumentException success) {} 239 } 240 241 /** 242 * A closed publisher reports isClosed with no closedException and 243 * throws IllegalStateException upon attempted submission; a 244 * subsequent close or closeExceptionally has no additional 245 * effect. 246 */ 247 @Test testClose()248 public void testClose() { 249 SubmissionPublisher<Integer> p = basicPublisher(); 250 checkInitialState(p); 251 p.close(); 252 assertTrue(p.isClosed()); 253 assertNull(p.getClosedException()); 254 try { 255 p.submit(1); 256 shouldThrow(); 257 } catch (IllegalStateException success) {} 258 Throwable ex = new SPException(); 259 p.closeExceptionally(ex); 260 assertTrue(p.isClosed()); 261 assertNull(p.getClosedException()); 262 } 263 264 /** 265 * A publisher closedExceptionally reports isClosed with the 266 * closedException and throws IllegalStateException upon attempted 267 * submission; a subsequent close or closeExceptionally has no 268 * additional effect. 269 */ 270 @Test testCloseExceptionally()271 public void testCloseExceptionally() { 272 SubmissionPublisher<Integer> p = basicPublisher(); 273 checkInitialState(p); 274 Throwable ex = new SPException(); 275 p.closeExceptionally(ex); 276 assertTrue(p.isClosed()); 277 assertSame(p.getClosedException(), ex); 278 try { 279 p.submit(1); 280 shouldThrow(); 281 } catch (IllegalStateException success) {} 282 p.close(); 283 assertTrue(p.isClosed()); 284 assertSame(p.getClosedException(), ex); 285 } 286 287 /** 288 * Upon subscription, the subscriber's onSubscribe is called, no 289 * other Subscriber methods are invoked, the publisher 290 * hasSubscribers, isSubscribed is true, and existing 291 * subscriptions are unaffected. 292 */ 293 @Test testSubscribe1()294 public void testSubscribe1() { 295 TestSubscriber s = new TestSubscriber(); 296 SubmissionPublisher<Integer> p = basicPublisher(); 297 p.subscribe(s); 298 assertTrue(p.hasSubscribers()); 299 assertEquals(1, p.getNumberOfSubscribers()); 300 assertTrue(p.getSubscribers().contains(s)); 301 assertTrue(p.isSubscribed(s)); 302 s.awaitSubscribe(); 303 assertNotNull(s.sn); 304 assertEquals(0, s.nexts); 305 assertEquals(0, s.errors); 306 assertEquals(0, s.completes); 307 TestSubscriber s2 = new TestSubscriber(); 308 p.subscribe(s2); 309 assertTrue(p.hasSubscribers()); 310 assertEquals(2, p.getNumberOfSubscribers()); 311 assertTrue(p.getSubscribers().contains(s)); 312 assertTrue(p.getSubscribers().contains(s2)); 313 assertTrue(p.isSubscribed(s)); 314 assertTrue(p.isSubscribed(s2)); 315 s2.awaitSubscribe(); 316 assertNotNull(s2.sn); 317 assertEquals(0, s2.nexts); 318 assertEquals(0, s2.errors); 319 assertEquals(0, s2.completes); 320 p.close(); 321 } 322 323 /** 324 * If closed, upon subscription, the subscriber's onComplete 325 * method is invoked 326 */ 327 @Test testSubscribe2()328 public void testSubscribe2() { 329 TestSubscriber s = new TestSubscriber(); 330 SubmissionPublisher<Integer> p = basicPublisher(); 331 p.close(); 332 p.subscribe(s); 333 s.awaitComplete(); 334 assertEquals(0, s.nexts); 335 assertEquals(0, s.errors); 336 assertEquals(1, s.completes, 1); 337 } 338 339 /** 340 * If closedExceptionally, upon subscription, the subscriber's 341 * onError method is invoked 342 */ 343 @Test testSubscribe3()344 public void testSubscribe3() { 345 TestSubscriber s = new TestSubscriber(); 346 SubmissionPublisher<Integer> p = basicPublisher(); 347 Throwable ex = new SPException(); 348 p.closeExceptionally(ex); 349 assertTrue(p.isClosed()); 350 assertSame(p.getClosedException(), ex); 351 p.subscribe(s); 352 s.awaitError(); 353 assertEquals(0, s.nexts); 354 assertEquals(1, s.errors); 355 } 356 357 /** 358 * Upon attempted resubscription, the subscriber's onError is 359 * called and the subscription is cancelled. 360 */ 361 @Test testSubscribe4()362 public void testSubscribe4() { 363 TestSubscriber s = new TestSubscriber(); 364 SubmissionPublisher<Integer> p = basicPublisher(); 365 p.subscribe(s); 366 assertTrue(p.hasSubscribers()); 367 assertEquals(1, p.getNumberOfSubscribers()); 368 assertTrue(p.getSubscribers().contains(s)); 369 assertTrue(p.isSubscribed(s)); 370 s.awaitSubscribe(); 371 assertNotNull(s.sn); 372 assertEquals(0, s.nexts); 373 assertEquals(0, s.errors); 374 assertEquals(0, s.completes); 375 p.subscribe(s); 376 s.awaitError(); 377 assertEquals(0, s.nexts); 378 assertEquals(1, s.errors); 379 assertFalse(p.isSubscribed(s)); 380 } 381 382 /** 383 * An exception thrown in onSubscribe causes onError 384 */ 385 @Test testSubscribe5()386 public void testSubscribe5() { 387 TestSubscriber s = new TestSubscriber(); 388 SubmissionPublisher<Integer> p = basicPublisher(); 389 s.throwOnCall = true; 390 p.subscribe(s); 391 s.awaitError(); 392 assertEquals(0, s.nexts); 393 assertEquals(1, s.errors); 394 assertEquals(0, s.completes); 395 } 396 397 /** 398 * subscribe(null) throws NPE 399 */ 400 @Test testSubscribe6()401 public void testSubscribe6() { 402 SubmissionPublisher<Integer> p = basicPublisher(); 403 try { 404 p.subscribe(null); 405 shouldThrow(); 406 } catch (NullPointerException success) {} 407 checkInitialState(p); 408 } 409 410 /** 411 * Closing a publisher causes onComplete to subscribers 412 */ 413 @Test testCloseCompletes()414 public void testCloseCompletes() { 415 SubmissionPublisher<Integer> p = basicPublisher(); 416 TestSubscriber s1 = new TestSubscriber(); 417 TestSubscriber s2 = new TestSubscriber(); 418 p.subscribe(s1); 419 p.subscribe(s2); 420 p.submit(1); 421 p.close(); 422 assertTrue(p.isClosed()); 423 assertNull(p.getClosedException()); 424 s1.awaitComplete(); 425 assertEquals(1, s1.nexts); 426 assertEquals(1, s1.completes); 427 s2.awaitComplete(); 428 assertEquals(1, s2.nexts); 429 assertEquals(1, s2.completes); 430 } 431 432 /** 433 * Closing a publisher exceptionally causes onError to subscribers 434 * after they are subscribed 435 */ 436 @Test testCloseExceptionallyError()437 public void testCloseExceptionallyError() { 438 SubmissionPublisher<Integer> p = basicPublisher(); 439 TestSubscriber s1 = new TestSubscriber(); 440 TestSubscriber s2 = new TestSubscriber(); 441 p.subscribe(s1); 442 p.subscribe(s2); 443 p.submit(1); 444 p.closeExceptionally(new SPException()); 445 assertTrue(p.isClosed()); 446 s1.awaitSubscribe(); 447 s1.awaitError(); 448 assertTrue(s1.nexts <= 1); 449 assertEquals(1, s1.errors); 450 s2.awaitSubscribe(); 451 s2.awaitError(); 452 assertTrue(s2.nexts <= 1); 453 assertEquals(1, s2.errors); 454 } 455 456 /** 457 * Cancelling a subscription eventually causes no more onNexts to be issued 458 */ 459 @Test testCancel()460 public void testCancel() { 461 SubmissionPublisher<Integer> p = 462 new SubmissionPublisher<>(basicExecutor, 4); // must be < 20 463 TestSubscriber s1 = new TestSubscriber(); 464 TestSubscriber s2 = new TestSubscriber(); 465 p.subscribe(s1); 466 p.subscribe(s2); 467 s1.awaitSubscribe(); 468 p.submit(1); 469 s1.sn.cancel(); 470 for (int i = 2; i <= 20; ++i) 471 p.submit(i); 472 p.close(); 473 s2.awaitComplete(); 474 assertEquals(20, s2.nexts); 475 assertEquals(1, s2.completes); 476 assertTrue(s1.nexts < 20); 477 assertFalse(p.isSubscribed(s1)); 478 } 479 480 /** 481 * Throwing an exception in onNext causes onError 482 */ 483 @Test 484 public void testThrowOnNext() { 485 SubmissionPublisher<Integer> p = basicPublisher(); 486 TestSubscriber s1 = new TestSubscriber(); 487 TestSubscriber s2 = new TestSubscriber(); 488 p.subscribe(s1); 489 p.subscribe(s2); 490 s1.awaitSubscribe(); 491 p.submit(1); 492 s1.throwOnCall = true; 493 p.submit(2); 494 p.close(); 495 s2.awaitComplete(); 496 assertEquals(2, s2.nexts); 497 s1.awaitComplete(); 498 assertEquals(1, s1.errors); 499 } 500 501 /** 502 * If a handler is supplied in constructor, it is invoked when 503 * subscriber throws an exception in onNext 504 */ 505 @Test 506 public void testThrowOnNextHandler() { 507 AtomicInteger calls = new AtomicInteger(); 508 SubmissionPublisher<Integer> p = new SubmissionPublisher<>( 509 basicExecutor, 8, (s, e) -> calls.getAndIncrement()); 510 TestSubscriber s1 = new TestSubscriber(); 511 TestSubscriber s2 = new TestSubscriber(); 512 p.subscribe(s1); 513 p.subscribe(s2); 514 s1.awaitSubscribe(); 515 p.submit(1); 516 s1.throwOnCall = true; 517 p.submit(2); 518 p.close(); 519 s2.awaitComplete(); 520 assertEquals(2, s2.nexts); 521 assertEquals(1, s2.completes); 522 s1.awaitError(); 523 assertEquals(1, s1.errors); 524 assertEquals(1, calls.get()); 525 } 526 527 /** 528 * onNext items are issued in the same order to each subscriber 529 */ 530 @Test testOrder()531 public void testOrder() { 532 SubmissionPublisher<Integer> p = basicPublisher(); 533 TestSubscriber s1 = new TestSubscriber(); 534 TestSubscriber s2 = new TestSubscriber(); 535 p.subscribe(s1); 536 p.subscribe(s2); 537 for (int i = 1; i <= 20; ++i) 538 p.submit(i); 539 p.close(); 540 s2.awaitComplete(); 541 s1.awaitComplete(); 542 assertEquals(20, s2.nexts); 543 assertEquals(1, s2.completes); 544 assertEquals(20, s1.nexts); 545 assertEquals(1, s1.completes); 546 } 547 548 /** 549 * onNext is issued only if requested 550 */ 551 @Test testRequest1()552 public void testRequest1() { 553 SubmissionPublisher<Integer> p = basicPublisher(); 554 TestSubscriber s1 = new TestSubscriber(); 555 s1.request = false; 556 p.subscribe(s1); 557 s1.awaitSubscribe(); 558 assertEquals(0, p.estimateMinimumDemand()); 559 TestSubscriber s2 = new TestSubscriber(); 560 p.subscribe(s2); 561 p.submit(1); 562 p.submit(2); 563 s2.awaitNext(1); 564 assertEquals(0, s1.nexts); 565 s1.sn.request(3); 566 p.submit(3); 567 p.close(); 568 s2.awaitComplete(); 569 assertEquals(3, s2.nexts); 570 assertEquals(1, s2.completes); 571 s1.awaitComplete(); 572 assertTrue(s1.nexts > 0); 573 assertEquals(1, s1.completes); 574 } 575 576 /** 577 * onNext is not issued when requests become zero 578 */ 579 @Test testRequest2()580 public void testRequest2() { 581 SubmissionPublisher<Integer> p = basicPublisher(); 582 TestSubscriber s1 = new TestSubscriber(); 583 TestSubscriber s2 = new TestSubscriber(); 584 p.subscribe(s1); 585 p.subscribe(s2); 586 s2.awaitSubscribe(); 587 s1.awaitSubscribe(); 588 s1.request = false; 589 p.submit(1); 590 p.submit(2); 591 p.close(); 592 s2.awaitComplete(); 593 assertEquals(2, s2.nexts); 594 assertEquals(1, s2.completes); 595 s1.awaitNext(1); 596 assertEquals(1, s1.nexts); 597 } 598 599 /** 600 * Non-positive request causes error 601 */ 602 @Test testRequest3()603 public void testRequest3() { 604 SubmissionPublisher<Integer> p = basicPublisher(); 605 TestSubscriber s1 = new TestSubscriber(); 606 TestSubscriber s2 = new TestSubscriber(); 607 TestSubscriber s3 = new TestSubscriber(); 608 p.subscribe(s1); 609 p.subscribe(s2); 610 p.subscribe(s3); 611 s3.awaitSubscribe(); 612 s2.awaitSubscribe(); 613 s1.awaitSubscribe(); 614 s1.sn.request(-1L); 615 s3.sn.request(0L); 616 p.submit(1); 617 p.submit(2); 618 p.close(); 619 s2.awaitComplete(); 620 assertEquals(2, s2.nexts); 621 assertEquals(1, s2.completes); 622 s1.awaitError(); 623 assertEquals(1, s1.errors); 624 assertTrue(s1.lastError instanceof IllegalArgumentException); 625 s3.awaitError(); 626 assertEquals(1, s3.errors); 627 assertTrue(s3.lastError instanceof IllegalArgumentException); 628 } 629 630 /** 631 * estimateMinimumDemand reports 0 until request, nonzero after 632 * request 633 */ 634 @Test testEstimateMinimumDemand()635 public void testEstimateMinimumDemand() { 636 TestSubscriber s = new TestSubscriber(); 637 SubmissionPublisher<Integer> p = basicPublisher(); 638 s.request = false; 639 p.subscribe(s); 640 s.awaitSubscribe(); 641 assertEquals(0, p.estimateMinimumDemand()); 642 s.sn.request(1); 643 assertEquals(1, p.estimateMinimumDemand()); 644 } 645 646 /** 647 * submit to a publisher with no subscribers returns lag 0 648 */ 649 @Test testEmptySubmit()650 public void testEmptySubmit() { 651 SubmissionPublisher<Integer> p = basicPublisher(); 652 assertEquals(0, p.submit(1)); 653 } 654 655 /** 656 * submit(null) throws NPE 657 */ 658 @Test testNullSubmit()659 public void testNullSubmit() { 660 SubmissionPublisher<Integer> p = basicPublisher(); 661 try { 662 p.submit(null); 663 shouldThrow(); 664 } catch (NullPointerException success) {} 665 } 666 667 /** 668 * submit returns number of lagged items, compatible with result 669 * of estimateMaximumLag. 670 */ 671 @Test testLaggedSubmit()672 public void testLaggedSubmit() { 673 SubmissionPublisher<Integer> p = basicPublisher(); 674 TestSubscriber s1 = new TestSubscriber(); 675 s1.request = false; 676 TestSubscriber s2 = new TestSubscriber(); 677 s2.request = false; 678 p.subscribe(s1); 679 p.subscribe(s2); 680 s2.awaitSubscribe(); 681 s1.awaitSubscribe(); 682 assertEquals(1, p.submit(1)); 683 assertTrue(p.estimateMaximumLag() >= 1); 684 assertTrue(p.submit(2) >= 2); 685 assertTrue(p.estimateMaximumLag() >= 2); 686 s1.sn.request(4); 687 assertTrue(p.submit(3) >= 3); 688 assertTrue(p.estimateMaximumLag() >= 3); 689 s2.sn.request(4); 690 p.submit(4); 691 p.close(); 692 s2.awaitComplete(); 693 assertEquals(4, s2.nexts); 694 s1.awaitComplete(); 695 assertEquals(4, s2.nexts); 696 } 697 698 /** 699 * submit eventually issues requested items when buffer capacity is 1 700 */ 701 @Test testCap1Submit()702 public void testCap1Submit() { 703 SubmissionPublisher<Integer> p 704 = new SubmissionPublisher<>(basicExecutor, 1); 705 TestSubscriber s1 = new TestSubscriber(); 706 TestSubscriber s2 = new TestSubscriber(); 707 p.subscribe(s1); 708 p.subscribe(s2); 709 for (int i = 1; i <= 20; ++i) { 710 assertTrue(p.submit(i) >= 0); 711 } 712 p.close(); 713 s2.awaitComplete(); 714 s1.awaitComplete(); 715 assertEquals(20, s2.nexts); 716 assertEquals(1, s2.completes); 717 assertEquals(20, s1.nexts); 718 assertEquals(1, s1.completes); 719 } 720 noopHandle(AtomicInteger count)721 static boolean noopHandle(AtomicInteger count) { 722 count.getAndIncrement(); 723 return false; 724 } 725 reqHandle(AtomicInteger count, Subscriber s)726 static boolean reqHandle(AtomicInteger count, Subscriber s) { 727 count.getAndIncrement(); 728 ((TestSubscriber)s).sn.request(Long.MAX_VALUE); 729 return true; 730 } 731 732 /** 733 * offer to a publisher with no subscribers returns lag 0 734 */ 735 @Test testEmptyOffer()736 public void testEmptyOffer() { 737 SubmissionPublisher<Integer> p = basicPublisher(); 738 assertEquals(0, p.offer(1, null)); 739 } 740 741 /** 742 * offer(null) throws NPE 743 */ 744 @Test testNullOffer()745 public void testNullOffer() { 746 SubmissionPublisher<Integer> p = basicPublisher(); 747 try { 748 p.offer(null, null); 749 shouldThrow(); 750 } catch (NullPointerException success) {} 751 } 752 753 /** 754 * offer returns number of lagged items if not saturated 755 */ 756 @Test testLaggedOffer()757 public void testLaggedOffer() { 758 SubmissionPublisher<Integer> p = basicPublisher(); 759 TestSubscriber s1 = new TestSubscriber(); 760 s1.request = false; 761 TestSubscriber s2 = new TestSubscriber(); 762 s2.request = false; 763 p.subscribe(s1); 764 p.subscribe(s2); 765 s2.awaitSubscribe(); 766 s1.awaitSubscribe(); 767 assertTrue(p.offer(1, null) >= 1); 768 assertTrue(p.offer(2, null) >= 2); 769 s1.sn.request(4); 770 assertTrue(p.offer(3, null) >= 3); 771 s2.sn.request(4); 772 p.offer(4, null); 773 p.close(); 774 s2.awaitComplete(); 775 assertEquals(4, s2.nexts); 776 s1.awaitComplete(); 777 assertEquals(4, s2.nexts); 778 } 779 780 /** 781 * offer reports drops if saturated 782 */ 783 @Test testDroppedOffer()784 public void testDroppedOffer() { 785 SubmissionPublisher<Integer> p 786 = new SubmissionPublisher<>(basicExecutor, 4); 787 TestSubscriber s1 = new TestSubscriber(); 788 s1.request = false; 789 TestSubscriber s2 = new TestSubscriber(); 790 s2.request = false; 791 p.subscribe(s1); 792 p.subscribe(s2); 793 s2.awaitSubscribe(); 794 s1.awaitSubscribe(); 795 for (int i = 1; i <= 4; ++i) 796 assertTrue(p.offer(i, null) >= 0); 797 p.offer(5, null); 798 assertTrue(p.offer(6, null) < 0); 799 s1.sn.request(64); 800 assertTrue(p.offer(7, null) < 0); 801 s2.sn.request(64); 802 p.close(); 803 s2.awaitComplete(); 804 assertTrue(s2.nexts >= 4); 805 s1.awaitComplete(); 806 assertTrue(s1.nexts >= 4); 807 } 808 809 /** 810 * offer invokes drop handler if saturated 811 */ 812 @Test testHandledDroppedOffer()813 public void testHandledDroppedOffer() { 814 AtomicInteger calls = new AtomicInteger(); 815 SubmissionPublisher<Integer> p 816 = new SubmissionPublisher<>(basicExecutor, 4); 817 TestSubscriber s1 = new TestSubscriber(); 818 s1.request = false; 819 TestSubscriber s2 = new TestSubscriber(); 820 s2.request = false; 821 p.subscribe(s1); 822 p.subscribe(s2); 823 s2.awaitSubscribe(); 824 s1.awaitSubscribe(); 825 for (int i = 1; i <= 4; ++i) 826 assertTrue(p.offer(i, (s, x) -> noopHandle(calls)) >= 0); 827 p.offer(4, (s, x) -> noopHandle(calls)); 828 assertTrue(p.offer(6, (s, x) -> noopHandle(calls)) < 0); 829 s1.sn.request(64); 830 assertTrue(p.offer(7, (s, x) -> noopHandle(calls)) < 0); 831 s2.sn.request(64); 832 p.close(); 833 s2.awaitComplete(); 834 s1.awaitComplete(); 835 assertTrue(calls.get() >= 4); 836 } 837 838 /** 839 * offer succeeds if drop handler forces request 840 */ 841 @Test testRecoveredHandledDroppedOffer()842 public void testRecoveredHandledDroppedOffer() { 843 AtomicInteger calls = new AtomicInteger(); 844 SubmissionPublisher<Integer> p 845 = new SubmissionPublisher<>(basicExecutor, 4); 846 TestSubscriber s1 = new TestSubscriber(); 847 s1.request = false; 848 TestSubscriber s2 = new TestSubscriber(); 849 s2.request = false; 850 p.subscribe(s1); 851 p.subscribe(s2); 852 s2.awaitSubscribe(); 853 s1.awaitSubscribe(); 854 int n = 0; 855 for (int i = 1; i <= 8; ++i) { 856 int d = p.offer(i, (s, x) -> reqHandle(calls, s)); 857 n = n + 2 + (d < 0 ? d : 0); 858 } 859 p.close(); 860 s2.awaitComplete(); 861 s1.awaitComplete(); 862 assertEquals(n, s1.nexts + s2.nexts); 863 assertTrue(calls.get() >= 2); 864 } 865 866 /** 867 * Timed offer to a publisher with no subscribers returns lag 0 868 */ 869 @Test testEmptyTimedOffer()870 public void testEmptyTimedOffer() { 871 SubmissionPublisher<Integer> p = basicPublisher(); 872 long startTime = System.nanoTime(); 873 assertEquals(0, p.offer(1, LONG_DELAY_MS, MILLISECONDS, null)); 874 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2); 875 } 876 877 /** 878 * Timed offer with null item or TimeUnit throws NPE 879 */ 880 @Test 881 public void testNullTimedOffer() { 882 SubmissionPublisher<Integer> p = basicPublisher(); 883 long startTime = System.nanoTime(); 884 try { 885 p.offer(null, LONG_DELAY_MS, MILLISECONDS, null); 886 shouldThrow(); 887 } catch (NullPointerException success) {} 888 try { 889 p.offer(1, LONG_DELAY_MS, null, null); 890 shouldThrow(); 891 } catch (NullPointerException success) {} 892 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2); 893 } 894 895 /** 896 * Timed offer returns number of lagged items if not saturated 897 */ 898 @Test 899 public void testLaggedTimedOffer() { 900 SubmissionPublisher<Integer> p = basicPublisher(); 901 TestSubscriber s1 = new TestSubscriber(); 902 s1.request = false; 903 TestSubscriber s2 = new TestSubscriber(); 904 s2.request = false; 905 p.subscribe(s1); 906 p.subscribe(s2); 907 s2.awaitSubscribe(); 908 s1.awaitSubscribe(); 909 long startTime = System.nanoTime(); 910 assertTrue(p.offer(1, LONG_DELAY_MS, MILLISECONDS, null) >= 1); 911 assertTrue(p.offer(2, LONG_DELAY_MS, MILLISECONDS, null) >= 2); 912 s1.sn.request(4); 913 assertTrue(p.offer(3, LONG_DELAY_MS, MILLISECONDS, null) >= 3); 914 s2.sn.request(4); 915 p.offer(4, LONG_DELAY_MS, MILLISECONDS, null); 916 p.close(); 917 s2.awaitComplete(); 918 assertEquals(4, s2.nexts); 919 s1.awaitComplete(); 920 assertEquals(4, s2.nexts); 921 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2); 922 } 923 924 /** 925 * Timed offer reports drops if saturated 926 */ 927 @Test 928 public void testDroppedTimedOffer() { 929 SubmissionPublisher<Integer> p 930 = new SubmissionPublisher<>(basicExecutor, 4); 931 TestSubscriber s1 = new TestSubscriber(); 932 s1.request = false; 933 TestSubscriber s2 = new TestSubscriber(); 934 s2.request = false; 935 p.subscribe(s1); 936 p.subscribe(s2); 937 s2.awaitSubscribe(); 938 s1.awaitSubscribe(); 939 long delay = timeoutMillis(); 940 for (int i = 1; i <= 4; ++i) 941 assertTrue(p.offer(i, delay, MILLISECONDS, null) >= 0); 942 long startTime = System.nanoTime(); 943 assertTrue(p.offer(5, delay, MILLISECONDS, null) < 0); 944 s1.sn.request(64); 945 assertTrue(p.offer(6, delay, MILLISECONDS, null) < 0); 946 // 2 * delay should elapse but check only 1 * delay to allow timer slop 947 assertTrue(millisElapsedSince(startTime) >= delay); 948 s2.sn.request(64); 949 p.close(); 950 s2.awaitComplete(); 951 assertTrue(s2.nexts >= 2); 952 s1.awaitComplete(); 953 assertTrue(s1.nexts >= 2); 954 } 955 956 /** 957 * Timed offer invokes drop handler if saturated 958 */ 959 @Test testHandledDroppedTimedOffer()960 public void testHandledDroppedTimedOffer() { 961 AtomicInteger calls = new AtomicInteger(); 962 SubmissionPublisher<Integer> p 963 = new SubmissionPublisher<>(basicExecutor, 4); 964 TestSubscriber s1 = new TestSubscriber(); 965 s1.request = false; 966 TestSubscriber s2 = new TestSubscriber(); 967 s2.request = false; 968 p.subscribe(s1); 969 p.subscribe(s2); 970 s2.awaitSubscribe(); 971 s1.awaitSubscribe(); 972 long delay = timeoutMillis(); 973 for (int i = 1; i <= 4; ++i) 974 assertTrue(p.offer(i, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) >= 0); 975 long startTime = System.nanoTime(); 976 assertTrue(p.offer(5, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0); 977 s1.sn.request(64); 978 assertTrue(p.offer(6, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0); 979 assertTrue(millisElapsedSince(startTime) >= delay); 980 s2.sn.request(64); 981 p.close(); 982 s2.awaitComplete(); 983 s1.awaitComplete(); 984 assertTrue(calls.get() >= 2); 985 } 986 987 /** 988 * Timed offer succeeds if drop handler forces request 989 */ 990 @Test testRecoveredHandledDroppedTimedOffer()991 public void testRecoveredHandledDroppedTimedOffer() { 992 AtomicInteger calls = new AtomicInteger(); 993 SubmissionPublisher<Integer> p 994 = new SubmissionPublisher<>(basicExecutor, 4); 995 TestSubscriber s1 = new TestSubscriber(); 996 s1.request = false; 997 TestSubscriber s2 = new TestSubscriber(); 998 s2.request = false; 999 p.subscribe(s1); 1000 p.subscribe(s2); 1001 s2.awaitSubscribe(); 1002 s1.awaitSubscribe(); 1003 int n = 0; 1004 long delay = timeoutMillis(); 1005 long startTime = System.nanoTime(); 1006 for (int i = 1; i <= 6; ++i) { 1007 int d = p.offer(i, delay, MILLISECONDS, (s, x) -> reqHandle(calls, s)); 1008 n = n + 2 + (d < 0 ? d : 0); 1009 } 1010 assertTrue(millisElapsedSince(startTime) >= delay); 1011 p.close(); 1012 s2.awaitComplete(); 1013 s1.awaitComplete(); 1014 assertEquals(n, s1.nexts + s2.nexts); 1015 assertTrue(calls.get() >= 2); 1016 } 1017 1018 /** 1019 * consume returns a CompletableFuture that is done when 1020 * publisher completes 1021 */ 1022 @Test testConsume()1023 public void testConsume() { 1024 AtomicInteger sum = new AtomicInteger(); 1025 SubmissionPublisher<Integer> p = basicPublisher(); 1026 CompletableFuture<Void> f = 1027 p.consume((Integer x) -> sum.getAndAdd(x.intValue())); 1028 int n = 20; 1029 for (int i = 1; i <= n; ++i) 1030 p.submit(i); 1031 p.close(); 1032 f.join(); 1033 assertEquals((n * (n + 1)) / 2, sum.get()); 1034 } 1035 1036 /** 1037 * consume(null) throws NPE 1038 */ 1039 @Test testConsumeNPE()1040 public void testConsumeNPE() { 1041 SubmissionPublisher<Integer> p = basicPublisher(); 1042 try { 1043 CompletableFuture<Void> f = p.consume(null); 1044 shouldThrow(); 1045 } catch (NullPointerException success) {} 1046 } 1047 1048 /** 1049 * consume eventually stops processing published items if cancelled 1050 */ 1051 @Test testCancelledConsume()1052 public void testCancelledConsume() { 1053 AtomicInteger count = new AtomicInteger(); 1054 SubmissionPublisher<Integer> p = basicPublisher(); 1055 CompletableFuture<Void> f = p.consume(x -> count.getAndIncrement()); 1056 f.cancel(true); 1057 int n = 1000000; // arbitrary limit 1058 for (int i = 1; i <= n; ++i) 1059 p.submit(i); 1060 assertTrue(count.get() < n); 1061 } 1062 1063 /** 1064 * Tests scenario for 1065 * JDK-8187947: A race condition in SubmissionPublisher 1066 * cvs update -D '2017-11-25' src/main/java/util/concurrent/SubmissionPublisher.java && ant -Djsr166.expensiveTests=true -Djsr166.tckTestClass=SubmissionPublisherTest -Djsr166.methodFilter=testMissedSignal tck; cvs update -A src/main/java/util/concurrent/SubmissionPublisher.java 1067 */ 1068 @Test 1069 public void testMissedSignal_8187947() throws Exception { 1070 if (!atLeastJava9()) return; // backport to jdk8 too hard 1071 final int N = expensiveTests ? (1 << 20) : (1 << 10); 1072 final CountDownLatch finished = new CountDownLatch(1); 1073 final SubmissionPublisher<Boolean> pub = new SubmissionPublisher<>(); 1074 class Sub implements Subscriber<Boolean> { 1075 int received; 1076 public void onSubscribe(Subscription s) { 1077 s.request(N); 1078 } 1079 public void onNext(Boolean item) { 1080 if (++received == N) 1081 finished.countDown(); 1082 else 1083 CompletableFuture.runAsync(() -> pub.submit(Boolean.TRUE)); 1084 } 1085 public void onError(Throwable t) { throw new AssertionError(t); } 1086 public void onComplete() {} 1087 } 1088 pub.subscribe(new Sub()); 1089 CompletableFuture.runAsync(() -> pub.submit(Boolean.TRUE)); 1090 await(finished); 1091 } 1092 } 1093