• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
3  *
4  * This code is free software; you can redistribute it and/or modify it
5  * under the terms of the GNU General Public License version 2 only, as
6  * published by the Free Software Foundation.
7  *
8  * This code is distributed in the hope that it will be useful, but WITHOUT
9  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
11  * version 2 for more details (a copy is included in the LICENSE file that
12  * accompanied this code).
13  *
14  * You should have received a copy of the GNU General Public License version
15  * 2 along with this work; if not, write to the Free Software Foundation,
16  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
17  *
18  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
19  * or visit www.oracle.com if you need additional information or have any
20  * questions.
21  */
22 
23 /*
24  * This file is available under and governed by the GNU General Public
25  * License version 2 only, as published by the Free Software Foundation.
26  * However, the following notice accompanied the original version of this
27  * file:
28  *
29  * Written by Doug Lea and Martin Buchholz with assistance from
30  * members of JCP JSR-166 Expert Group and released to the public
31  * domain, as explained at
32  * http://creativecommons.org/publicdomain/zero/1.0/
33  */
34 
35 package test.java.util.concurrent.tck;
36 import static org.junit.Assert.assertEquals;
37 import static org.junit.Assert.assertFalse;
38 import static org.junit.Assert.assertNull;
39 import static org.junit.Assert.assertNotNull;
40 import static org.junit.Assert.assertSame;
41 import static org.junit.Assert.assertNotSame;
42 import static org.junit.Assert.assertTrue;
43 import static org.junit.Assert.fail;
44 
45 import java.util.concurrent.CompletableFuture;
46 import java.util.concurrent.CountDownLatch;
47 import java.util.concurrent.Executor;
48 import java.util.concurrent.Executors;
49 import java.util.concurrent.Flow;
50 import java.util.concurrent.ForkJoinPool;
51 import java.util.concurrent.SubmissionPublisher;
52 import java.util.concurrent.atomic.AtomicInteger;
53 import org.junit.Test;
54 import org.junit.runner.RunWith;
55 import org.junit.runners.JUnit4;
56 
57 import static java.util.concurrent.Flow.Subscriber;
58 import static java.util.concurrent.Flow.Subscription;
59 import static java.util.concurrent.TimeUnit.MILLISECONDS;
60 
61 // Android-changed: Use JUnit4.
62 @RunWith(JUnit4.class)
63 public class SubmissionPublisherTest extends JSR166TestCase {
64 
65     // Android-changed: Use JUnitCore.main.
main(String[] args)66     public static void main(String[] args) {
67         // main(suite(), args);
68         org.junit.runner.JUnitCore.main("test.java.util.concurrent.tck.SubmissionPublisherTest");
69     }
70     // public static Test suite() {
71     //     return new TestSuite(SubmissionPublisherTest.class);
72     // }
73 
74     final Executor basicExecutor = basicPublisher().getExecutor();
75 
basicPublisher()76     static SubmissionPublisher<Integer> basicPublisher() {
77         return new SubmissionPublisher<Integer>();
78     }
79 
80     static class SPException extends RuntimeException {}
81 
82     class TestSubscriber implements Subscriber<Integer> {
83         volatile Subscription sn;
84         int last;  // Requires that onNexts are in numeric order
85         volatile int nexts;
86         volatile int errors;
87         volatile int completes;
88         volatile boolean throwOnCall = false;
89         volatile boolean request = true;
90         volatile Throwable lastError;
91 
onSubscribe(Subscription s)92         public synchronized void onSubscribe(Subscription s) {
93             threadAssertTrue(sn == null);
94             sn = s;
95             notifyAll();
96             if (throwOnCall)
97                 throw new SPException();
98             if (request)
99                 sn.request(1L);
100         }
onNext(Integer t)101         public synchronized void onNext(Integer t) {
102             ++nexts;
103             notifyAll();
104             int current = t.intValue();
105             threadAssertTrue(current >= last);
106             last = current;
107             if (request)
108                 sn.request(1L);
109             if (throwOnCall)
110                 throw new SPException();
111         }
onError(Throwable t)112         public synchronized void onError(Throwable t) {
113             threadAssertTrue(completes == 0);
114             threadAssertTrue(errors == 0);
115             lastError = t;
116             ++errors;
117             notifyAll();
118         }
onComplete()119         public synchronized void onComplete() {
120             threadAssertTrue(completes == 0);
121             ++completes;
122             notifyAll();
123         }
124 
awaitSubscribe()125         synchronized void awaitSubscribe() {
126             while (sn == null) {
127                 try {
128                     wait();
129                 } catch (Exception ex) {
130                     threadUnexpectedException(ex);
131                     break;
132                 }
133             }
134         }
awaitNext(int n)135         synchronized void awaitNext(int n) {
136             while (nexts < n) {
137                 try {
138                     wait();
139                 } catch (Exception ex) {
140                     threadUnexpectedException(ex);
141                     break;
142                 }
143             }
144         }
awaitComplete()145         synchronized void awaitComplete() {
146             while (completes == 0 && errors == 0) {
147                 try {
148                     wait();
149                 } catch (Exception ex) {
150                     threadUnexpectedException(ex);
151                     break;
152                 }
153             }
154         }
awaitError()155         synchronized void awaitError() {
156             while (errors == 0) {
157                 try {
158                     wait();
159                 } catch (Exception ex) {
160                     threadUnexpectedException(ex);
161                     break;
162                 }
163             }
164         }
165 
166     }
167 
168     /**
169      * A new SubmissionPublisher has no subscribers, a non-null
170      * executor, a power-of-two capacity, is not closed, and reports
171      * zero demand and lag
172      */
checkInitialState(SubmissionPublisher<?> p)173     void checkInitialState(SubmissionPublisher<?> p) {
174         assertFalse(p.hasSubscribers());
175         assertEquals(0, p.getNumberOfSubscribers());
176         assertTrue(p.getSubscribers().isEmpty());
177         assertFalse(p.isClosed());
178         assertNull(p.getClosedException());
179         int n = p.getMaxBufferCapacity();
180         assertTrue((n & (n - 1)) == 0); // power of two
181         assertNotNull(p.getExecutor());
182         assertEquals(0, p.estimateMinimumDemand());
183         assertEquals(0, p.estimateMaximumLag());
184     }
185 
186     /**
187      * A default-constructed SubmissionPublisher has no subscribers,
188      * is not closed, has default buffer size, and uses the
189      * defaultExecutor
190      */
191     @Test
testConstructor1()192     public void testConstructor1() {
193         SubmissionPublisher<Integer> p = new SubmissionPublisher<>();
194         checkInitialState(p);
195         assertEquals(p.getMaxBufferCapacity(), Flow.defaultBufferSize());
196         Executor e = p.getExecutor(), c = ForkJoinPool.commonPool();
197         if (ForkJoinPool.getCommonPoolParallelism() > 1)
198             assertSame(e, c);
199         else
200             assertNotSame(e, c);
201     }
202 
203     /**
204      * A new SubmissionPublisher has no subscribers, is not closed,
205      * has the given buffer size, and uses the given executor
206      */
207     @Test
testConstructor2()208     public void testConstructor2() {
209         Executor e = Executors.newFixedThreadPool(1);
210         SubmissionPublisher<Integer> p = new SubmissionPublisher<>(e, 8);
211         checkInitialState(p);
212         assertSame(p.getExecutor(), e);
213         assertEquals(8, p.getMaxBufferCapacity());
214     }
215 
216     /**
217      * A null Executor argument to SubmissionPublisher constructor
218      * throws NullPointerException
219      */
220     @Test
testConstructor3()221     public void testConstructor3() {
222         try {
223             new SubmissionPublisher<Integer>(null, 8);
224             shouldThrow();
225         } catch (NullPointerException success) {}
226     }
227 
228     /**
229      * A negative capacity argument to SubmissionPublisher constructor
230      * throws IllegalArgumentException
231      */
232     @Test
testConstructor4()233     public void testConstructor4() {
234         Executor e = Executors.newFixedThreadPool(1);
235         try {
236             new SubmissionPublisher<Integer>(e, -1);
237             shouldThrow();
238         } catch (IllegalArgumentException success) {}
239     }
240 
241     /**
242      * A closed publisher reports isClosed with no closedException and
243      * throws IllegalStateException upon attempted submission; a
244      * subsequent close or closeExceptionally has no additional
245      * effect.
246      */
247     @Test
testClose()248     public void testClose() {
249         SubmissionPublisher<Integer> p = basicPublisher();
250         checkInitialState(p);
251         p.close();
252         assertTrue(p.isClosed());
253         assertNull(p.getClosedException());
254         try {
255             p.submit(1);
256             shouldThrow();
257         } catch (IllegalStateException success) {}
258         Throwable ex = new SPException();
259         p.closeExceptionally(ex);
260         assertTrue(p.isClosed());
261         assertNull(p.getClosedException());
262     }
263 
264     /**
265      * A publisher closedExceptionally reports isClosed with the
266      * closedException and throws IllegalStateException upon attempted
267      * submission; a subsequent close or closeExceptionally has no
268      * additional effect.
269      */
270     @Test
testCloseExceptionally()271     public void testCloseExceptionally() {
272         SubmissionPublisher<Integer> p = basicPublisher();
273         checkInitialState(p);
274         Throwable ex = new SPException();
275         p.closeExceptionally(ex);
276         assertTrue(p.isClosed());
277         assertSame(p.getClosedException(), ex);
278         try {
279             p.submit(1);
280             shouldThrow();
281         } catch (IllegalStateException success) {}
282         p.close();
283         assertTrue(p.isClosed());
284         assertSame(p.getClosedException(), ex);
285     }
286 
287     /**
288      * Upon subscription, the subscriber's onSubscribe is called, no
289      * other Subscriber methods are invoked, the publisher
290      * hasSubscribers, isSubscribed is true, and existing
291      * subscriptions are unaffected.
292      */
293     @Test
testSubscribe1()294     public void testSubscribe1() {
295         TestSubscriber s = new TestSubscriber();
296         SubmissionPublisher<Integer> p = basicPublisher();
297         p.subscribe(s);
298         assertTrue(p.hasSubscribers());
299         assertEquals(1, p.getNumberOfSubscribers());
300         assertTrue(p.getSubscribers().contains(s));
301         assertTrue(p.isSubscribed(s));
302         s.awaitSubscribe();
303         assertNotNull(s.sn);
304         assertEquals(0, s.nexts);
305         assertEquals(0, s.errors);
306         assertEquals(0, s.completes);
307         TestSubscriber s2 = new TestSubscriber();
308         p.subscribe(s2);
309         assertTrue(p.hasSubscribers());
310         assertEquals(2, p.getNumberOfSubscribers());
311         assertTrue(p.getSubscribers().contains(s));
312         assertTrue(p.getSubscribers().contains(s2));
313         assertTrue(p.isSubscribed(s));
314         assertTrue(p.isSubscribed(s2));
315         s2.awaitSubscribe();
316         assertNotNull(s2.sn);
317         assertEquals(0, s2.nexts);
318         assertEquals(0, s2.errors);
319         assertEquals(0, s2.completes);
320         p.close();
321     }
322 
323     /**
324      * If closed, upon subscription, the subscriber's onComplete
325      * method is invoked
326      */
327     @Test
testSubscribe2()328     public void testSubscribe2() {
329         TestSubscriber s = new TestSubscriber();
330         SubmissionPublisher<Integer> p = basicPublisher();
331         p.close();
332         p.subscribe(s);
333         s.awaitComplete();
334         assertEquals(0, s.nexts);
335         assertEquals(0, s.errors);
336         assertEquals(1, s.completes, 1);
337     }
338 
339     /**
340      * If closedExceptionally, upon subscription, the subscriber's
341      * onError method is invoked
342      */
343     @Test
testSubscribe3()344     public void testSubscribe3() {
345         TestSubscriber s = new TestSubscriber();
346         SubmissionPublisher<Integer> p = basicPublisher();
347         Throwable ex = new SPException();
348         p.closeExceptionally(ex);
349         assertTrue(p.isClosed());
350         assertSame(p.getClosedException(), ex);
351         p.subscribe(s);
352         s.awaitError();
353         assertEquals(0, s.nexts);
354         assertEquals(1, s.errors);
355     }
356 
357     /**
358      * Upon attempted resubscription, the subscriber's onError is
359      * called and the subscription is cancelled.
360      */
361     @Test
testSubscribe4()362     public void testSubscribe4() {
363         TestSubscriber s = new TestSubscriber();
364         SubmissionPublisher<Integer> p = basicPublisher();
365         p.subscribe(s);
366         assertTrue(p.hasSubscribers());
367         assertEquals(1, p.getNumberOfSubscribers());
368         assertTrue(p.getSubscribers().contains(s));
369         assertTrue(p.isSubscribed(s));
370         s.awaitSubscribe();
371         assertNotNull(s.sn);
372         assertEquals(0, s.nexts);
373         assertEquals(0, s.errors);
374         assertEquals(0, s.completes);
375         p.subscribe(s);
376         s.awaitError();
377         assertEquals(0, s.nexts);
378         assertEquals(1, s.errors);
379         assertFalse(p.isSubscribed(s));
380     }
381 
382     /**
383      * An exception thrown in onSubscribe causes onError
384      */
385     @Test
testSubscribe5()386     public void testSubscribe5() {
387         TestSubscriber s = new TestSubscriber();
388         SubmissionPublisher<Integer> p = basicPublisher();
389         s.throwOnCall = true;
390         p.subscribe(s);
391         s.awaitError();
392         assertEquals(0, s.nexts);
393         assertEquals(1, s.errors);
394         assertEquals(0, s.completes);
395     }
396 
397     /**
398      * subscribe(null) throws NPE
399      */
400     @Test
testSubscribe6()401     public void testSubscribe6() {
402         SubmissionPublisher<Integer> p = basicPublisher();
403         try {
404             p.subscribe(null);
405             shouldThrow();
406         } catch (NullPointerException success) {}
407         checkInitialState(p);
408     }
409 
410     /**
411      * Closing a publisher causes onComplete to subscribers
412      */
413     @Test
testCloseCompletes()414     public void testCloseCompletes() {
415         SubmissionPublisher<Integer> p = basicPublisher();
416         TestSubscriber s1 = new TestSubscriber();
417         TestSubscriber s2 = new TestSubscriber();
418         p.subscribe(s1);
419         p.subscribe(s2);
420         p.submit(1);
421         p.close();
422         assertTrue(p.isClosed());
423         assertNull(p.getClosedException());
424         s1.awaitComplete();
425         assertEquals(1, s1.nexts);
426         assertEquals(1, s1.completes);
427         s2.awaitComplete();
428         assertEquals(1, s2.nexts);
429         assertEquals(1, s2.completes);
430     }
431 
432     /**
433      * Closing a publisher exceptionally causes onError to subscribers
434      * after they are subscribed
435      */
436     @Test
testCloseExceptionallyError()437     public void testCloseExceptionallyError() {
438         SubmissionPublisher<Integer> p = basicPublisher();
439         TestSubscriber s1 = new TestSubscriber();
440         TestSubscriber s2 = new TestSubscriber();
441         p.subscribe(s1);
442         p.subscribe(s2);
443         p.submit(1);
444         p.closeExceptionally(new SPException());
445         assertTrue(p.isClosed());
446         s1.awaitSubscribe();
447         s1.awaitError();
448         assertTrue(s1.nexts <= 1);
449         assertEquals(1, s1.errors);
450         s2.awaitSubscribe();
451         s2.awaitError();
452         assertTrue(s2.nexts <= 1);
453         assertEquals(1, s2.errors);
454     }
455 
456     /**
457      * Cancelling a subscription eventually causes no more onNexts to be issued
458      */
459     @Test
testCancel()460     public void testCancel() {
461         SubmissionPublisher<Integer> p =
462             new SubmissionPublisher<>(basicExecutor, 4); // must be < 20
463         TestSubscriber s1 = new TestSubscriber();
464         TestSubscriber s2 = new TestSubscriber();
465         p.subscribe(s1);
466         p.subscribe(s2);
467         s1.awaitSubscribe();
468         p.submit(1);
469         s1.sn.cancel();
470         for (int i = 2; i <= 20; ++i)
471             p.submit(i);
472         p.close();
473         s2.awaitComplete();
474         assertEquals(20, s2.nexts);
475         assertEquals(1, s2.completes);
476         assertTrue(s1.nexts < 20);
477         assertFalse(p.isSubscribed(s1));
478     }
479 
480     /**
481      * Throwing an exception in onNext causes onError
482      */
483     @Test
484     public void testThrowOnNext() {
485         SubmissionPublisher<Integer> p = basicPublisher();
486         TestSubscriber s1 = new TestSubscriber();
487         TestSubscriber s2 = new TestSubscriber();
488         p.subscribe(s1);
489         p.subscribe(s2);
490         s1.awaitSubscribe();
491         p.submit(1);
492         s1.throwOnCall = true;
493         p.submit(2);
494         p.close();
495         s2.awaitComplete();
496         assertEquals(2, s2.nexts);
497         s1.awaitComplete();
498         assertEquals(1, s1.errors);
499     }
500 
501     /**
502      * If a handler is supplied in constructor, it is invoked when
503      * subscriber throws an exception in onNext
504      */
505     @Test
506     public void testThrowOnNextHandler() {
507         AtomicInteger calls = new AtomicInteger();
508         SubmissionPublisher<Integer> p = new SubmissionPublisher<>(
509             basicExecutor, 8, (s, e) -> calls.getAndIncrement());
510         TestSubscriber s1 = new TestSubscriber();
511         TestSubscriber s2 = new TestSubscriber();
512         p.subscribe(s1);
513         p.subscribe(s2);
514         s1.awaitSubscribe();
515         p.submit(1);
516         s1.throwOnCall = true;
517         p.submit(2);
518         p.close();
519         s2.awaitComplete();
520         assertEquals(2, s2.nexts);
521         assertEquals(1, s2.completes);
522         s1.awaitError();
523         assertEquals(1, s1.errors);
524         assertEquals(1, calls.get());
525     }
526 
527     /**
528      * onNext items are issued in the same order to each subscriber
529      */
530     @Test
testOrder()531     public void testOrder() {
532         SubmissionPublisher<Integer> p = basicPublisher();
533         TestSubscriber s1 = new TestSubscriber();
534         TestSubscriber s2 = new TestSubscriber();
535         p.subscribe(s1);
536         p.subscribe(s2);
537         for (int i = 1; i <= 20; ++i)
538             p.submit(i);
539         p.close();
540         s2.awaitComplete();
541         s1.awaitComplete();
542         assertEquals(20, s2.nexts);
543         assertEquals(1, s2.completes);
544         assertEquals(20, s1.nexts);
545         assertEquals(1, s1.completes);
546     }
547 
548     /**
549      * onNext is issued only if requested
550      */
551     @Test
testRequest1()552     public void testRequest1() {
553         SubmissionPublisher<Integer> p = basicPublisher();
554         TestSubscriber s1 = new TestSubscriber();
555         s1.request = false;
556         p.subscribe(s1);
557         s1.awaitSubscribe();
558         assertEquals(0, p.estimateMinimumDemand());
559         TestSubscriber s2 = new TestSubscriber();
560         p.subscribe(s2);
561         p.submit(1);
562         p.submit(2);
563         s2.awaitNext(1);
564         assertEquals(0, s1.nexts);
565         s1.sn.request(3);
566         p.submit(3);
567         p.close();
568         s2.awaitComplete();
569         assertEquals(3, s2.nexts);
570         assertEquals(1, s2.completes);
571         s1.awaitComplete();
572         assertTrue(s1.nexts > 0);
573         assertEquals(1, s1.completes);
574     }
575 
576     /**
577      * onNext is not issued when requests become zero
578      */
579     @Test
testRequest2()580     public void testRequest2() {
581         SubmissionPublisher<Integer> p = basicPublisher();
582         TestSubscriber s1 = new TestSubscriber();
583         TestSubscriber s2 = new TestSubscriber();
584         p.subscribe(s1);
585         p.subscribe(s2);
586         s2.awaitSubscribe();
587         s1.awaitSubscribe();
588         s1.request = false;
589         p.submit(1);
590         p.submit(2);
591         p.close();
592         s2.awaitComplete();
593         assertEquals(2, s2.nexts);
594         assertEquals(1, s2.completes);
595         s1.awaitNext(1);
596         assertEquals(1, s1.nexts);
597     }
598 
599     /**
600      * Non-positive request causes error
601      */
602     @Test
testRequest3()603     public void testRequest3() {
604         SubmissionPublisher<Integer> p = basicPublisher();
605         TestSubscriber s1 = new TestSubscriber();
606         TestSubscriber s2 = new TestSubscriber();
607         TestSubscriber s3 = new TestSubscriber();
608         p.subscribe(s1);
609         p.subscribe(s2);
610         p.subscribe(s3);
611         s3.awaitSubscribe();
612         s2.awaitSubscribe();
613         s1.awaitSubscribe();
614         s1.sn.request(-1L);
615         s3.sn.request(0L);
616         p.submit(1);
617         p.submit(2);
618         p.close();
619         s2.awaitComplete();
620         assertEquals(2, s2.nexts);
621         assertEquals(1, s2.completes);
622         s1.awaitError();
623         assertEquals(1, s1.errors);
624         assertTrue(s1.lastError instanceof IllegalArgumentException);
625         s3.awaitError();
626         assertEquals(1, s3.errors);
627         assertTrue(s3.lastError instanceof IllegalArgumentException);
628     }
629 
630     /**
631      * estimateMinimumDemand reports 0 until request, nonzero after
632      * request
633      */
634     @Test
testEstimateMinimumDemand()635     public void testEstimateMinimumDemand() {
636         TestSubscriber s = new TestSubscriber();
637         SubmissionPublisher<Integer> p = basicPublisher();
638         s.request = false;
639         p.subscribe(s);
640         s.awaitSubscribe();
641         assertEquals(0, p.estimateMinimumDemand());
642         s.sn.request(1);
643         assertEquals(1, p.estimateMinimumDemand());
644     }
645 
646     /**
647      * submit to a publisher with no subscribers returns lag 0
648      */
649     @Test
testEmptySubmit()650     public void testEmptySubmit() {
651         SubmissionPublisher<Integer> p = basicPublisher();
652         assertEquals(0, p.submit(1));
653     }
654 
655     /**
656      * submit(null) throws NPE
657      */
658     @Test
testNullSubmit()659     public void testNullSubmit() {
660         SubmissionPublisher<Integer> p = basicPublisher();
661         try {
662             p.submit(null);
663             shouldThrow();
664         } catch (NullPointerException success) {}
665     }
666 
667     /**
668      * submit returns number of lagged items, compatible with result
669      * of estimateMaximumLag.
670      */
671     @Test
testLaggedSubmit()672     public void testLaggedSubmit() {
673         SubmissionPublisher<Integer> p = basicPublisher();
674         TestSubscriber s1 = new TestSubscriber();
675         s1.request = false;
676         TestSubscriber s2 = new TestSubscriber();
677         s2.request = false;
678         p.subscribe(s1);
679         p.subscribe(s2);
680         s2.awaitSubscribe();
681         s1.awaitSubscribe();
682         assertEquals(1, p.submit(1));
683         assertTrue(p.estimateMaximumLag() >= 1);
684         assertTrue(p.submit(2) >= 2);
685         assertTrue(p.estimateMaximumLag() >= 2);
686         s1.sn.request(4);
687         assertTrue(p.submit(3) >= 3);
688         assertTrue(p.estimateMaximumLag() >= 3);
689         s2.sn.request(4);
690         p.submit(4);
691         p.close();
692         s2.awaitComplete();
693         assertEquals(4, s2.nexts);
694         s1.awaitComplete();
695         assertEquals(4, s2.nexts);
696     }
697 
698     /**
699      * submit eventually issues requested items when buffer capacity is 1
700      */
701     @Test
testCap1Submit()702     public void testCap1Submit() {
703         SubmissionPublisher<Integer> p
704             = new SubmissionPublisher<>(basicExecutor, 1);
705         TestSubscriber s1 = new TestSubscriber();
706         TestSubscriber s2 = new TestSubscriber();
707         p.subscribe(s1);
708         p.subscribe(s2);
709         for (int i = 1; i <= 20; ++i) {
710             assertTrue(p.submit(i) >= 0);
711         }
712         p.close();
713         s2.awaitComplete();
714         s1.awaitComplete();
715         assertEquals(20, s2.nexts);
716         assertEquals(1, s2.completes);
717         assertEquals(20, s1.nexts);
718         assertEquals(1, s1.completes);
719     }
720 
noopHandle(AtomicInteger count)721     static boolean noopHandle(AtomicInteger count) {
722         count.getAndIncrement();
723         return false;
724     }
725 
reqHandle(AtomicInteger count, Subscriber s)726     static boolean reqHandle(AtomicInteger count, Subscriber s) {
727         count.getAndIncrement();
728         ((TestSubscriber)s).sn.request(Long.MAX_VALUE);
729         return true;
730     }
731 
732     /**
733      * offer to a publisher with no subscribers returns lag 0
734      */
735     @Test
testEmptyOffer()736     public void testEmptyOffer() {
737         SubmissionPublisher<Integer> p = basicPublisher();
738         assertEquals(0, p.offer(1, null));
739     }
740 
741     /**
742      * offer(null) throws NPE
743      */
744     @Test
testNullOffer()745     public void testNullOffer() {
746         SubmissionPublisher<Integer> p = basicPublisher();
747         try {
748             p.offer(null, null);
749             shouldThrow();
750         } catch (NullPointerException success) {}
751     }
752 
753     /**
754      * offer returns number of lagged items if not saturated
755      */
756     @Test
testLaggedOffer()757     public void testLaggedOffer() {
758         SubmissionPublisher<Integer> p = basicPublisher();
759         TestSubscriber s1 = new TestSubscriber();
760         s1.request = false;
761         TestSubscriber s2 = new TestSubscriber();
762         s2.request = false;
763         p.subscribe(s1);
764         p.subscribe(s2);
765         s2.awaitSubscribe();
766         s1.awaitSubscribe();
767         assertTrue(p.offer(1, null) >= 1);
768         assertTrue(p.offer(2, null) >= 2);
769         s1.sn.request(4);
770         assertTrue(p.offer(3, null) >= 3);
771         s2.sn.request(4);
772         p.offer(4, null);
773         p.close();
774         s2.awaitComplete();
775         assertEquals(4, s2.nexts);
776         s1.awaitComplete();
777         assertEquals(4, s2.nexts);
778     }
779 
780     /**
781      * offer reports drops if saturated
782      */
783     @Test
testDroppedOffer()784     public void testDroppedOffer() {
785         SubmissionPublisher<Integer> p
786             = new SubmissionPublisher<>(basicExecutor, 4);
787         TestSubscriber s1 = new TestSubscriber();
788         s1.request = false;
789         TestSubscriber s2 = new TestSubscriber();
790         s2.request = false;
791         p.subscribe(s1);
792         p.subscribe(s2);
793         s2.awaitSubscribe();
794         s1.awaitSubscribe();
795         for (int i = 1; i <= 4; ++i)
796             assertTrue(p.offer(i, null) >= 0);
797         p.offer(5, null);
798         assertTrue(p.offer(6, null) < 0);
799         s1.sn.request(64);
800         assertTrue(p.offer(7, null) < 0);
801         s2.sn.request(64);
802         p.close();
803         s2.awaitComplete();
804         assertTrue(s2.nexts >= 4);
805         s1.awaitComplete();
806         assertTrue(s1.nexts >= 4);
807     }
808 
809     /**
810      * offer invokes drop handler if saturated
811      */
812     @Test
testHandledDroppedOffer()813     public void testHandledDroppedOffer() {
814         AtomicInteger calls = new AtomicInteger();
815         SubmissionPublisher<Integer> p
816             = new SubmissionPublisher<>(basicExecutor, 4);
817         TestSubscriber s1 = new TestSubscriber();
818         s1.request = false;
819         TestSubscriber s2 = new TestSubscriber();
820         s2.request = false;
821         p.subscribe(s1);
822         p.subscribe(s2);
823         s2.awaitSubscribe();
824         s1.awaitSubscribe();
825         for (int i = 1; i <= 4; ++i)
826             assertTrue(p.offer(i, (s, x) -> noopHandle(calls)) >= 0);
827         p.offer(4, (s, x) -> noopHandle(calls));
828         assertTrue(p.offer(6, (s, x) -> noopHandle(calls)) < 0);
829         s1.sn.request(64);
830         assertTrue(p.offer(7, (s, x) -> noopHandle(calls)) < 0);
831         s2.sn.request(64);
832         p.close();
833         s2.awaitComplete();
834         s1.awaitComplete();
835         assertTrue(calls.get() >= 4);
836     }
837 
838     /**
839      * offer succeeds if drop handler forces request
840      */
841     @Test
testRecoveredHandledDroppedOffer()842     public void testRecoveredHandledDroppedOffer() {
843         AtomicInteger calls = new AtomicInteger();
844         SubmissionPublisher<Integer> p
845             = new SubmissionPublisher<>(basicExecutor, 4);
846         TestSubscriber s1 = new TestSubscriber();
847         s1.request = false;
848         TestSubscriber s2 = new TestSubscriber();
849         s2.request = false;
850         p.subscribe(s1);
851         p.subscribe(s2);
852         s2.awaitSubscribe();
853         s1.awaitSubscribe();
854         int n = 0;
855         for (int i = 1; i <= 8; ++i) {
856             int d = p.offer(i, (s, x) -> reqHandle(calls, s));
857             n = n + 2 + (d < 0 ? d : 0);
858         }
859         p.close();
860         s2.awaitComplete();
861         s1.awaitComplete();
862         assertEquals(n, s1.nexts + s2.nexts);
863         assertTrue(calls.get() >= 2);
864     }
865 
866     /**
867      * Timed offer to a publisher with no subscribers returns lag 0
868      */
869     @Test
testEmptyTimedOffer()870     public void testEmptyTimedOffer() {
871         SubmissionPublisher<Integer> p = basicPublisher();
872         long startTime = System.nanoTime();
873         assertEquals(0, p.offer(1, LONG_DELAY_MS, MILLISECONDS, null));
874         assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2);
875     }
876 
877     /**
878      * Timed offer with null item or TimeUnit throws NPE
879      */
880     @Test
881     public void testNullTimedOffer() {
882         SubmissionPublisher<Integer> p = basicPublisher();
883         long startTime = System.nanoTime();
884         try {
885             p.offer(null, LONG_DELAY_MS, MILLISECONDS, null);
886             shouldThrow();
887         } catch (NullPointerException success) {}
888         try {
889             p.offer(1, LONG_DELAY_MS, null, null);
890             shouldThrow();
891         } catch (NullPointerException success) {}
892         assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2);
893     }
894 
895     /**
896      * Timed offer returns number of lagged items if not saturated
897      */
898     @Test
899     public void testLaggedTimedOffer() {
900         SubmissionPublisher<Integer> p = basicPublisher();
901         TestSubscriber s1 = new TestSubscriber();
902         s1.request = false;
903         TestSubscriber s2 = new TestSubscriber();
904         s2.request = false;
905         p.subscribe(s1);
906         p.subscribe(s2);
907         s2.awaitSubscribe();
908         s1.awaitSubscribe();
909         long startTime = System.nanoTime();
910         assertTrue(p.offer(1, LONG_DELAY_MS, MILLISECONDS, null) >= 1);
911         assertTrue(p.offer(2, LONG_DELAY_MS, MILLISECONDS, null) >= 2);
912         s1.sn.request(4);
913         assertTrue(p.offer(3, LONG_DELAY_MS, MILLISECONDS, null) >= 3);
914         s2.sn.request(4);
915         p.offer(4, LONG_DELAY_MS, MILLISECONDS, null);
916         p.close();
917         s2.awaitComplete();
918         assertEquals(4, s2.nexts);
919         s1.awaitComplete();
920         assertEquals(4, s2.nexts);
921         assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2);
922     }
923 
924     /**
925      * Timed offer reports drops if saturated
926      */
927     @Test
928     public void testDroppedTimedOffer() {
929         SubmissionPublisher<Integer> p
930             = new SubmissionPublisher<>(basicExecutor, 4);
931         TestSubscriber s1 = new TestSubscriber();
932         s1.request = false;
933         TestSubscriber s2 = new TestSubscriber();
934         s2.request = false;
935         p.subscribe(s1);
936         p.subscribe(s2);
937         s2.awaitSubscribe();
938         s1.awaitSubscribe();
939         long delay = timeoutMillis();
940         for (int i = 1; i <= 4; ++i)
941             assertTrue(p.offer(i, delay, MILLISECONDS, null) >= 0);
942         long startTime = System.nanoTime();
943         assertTrue(p.offer(5, delay, MILLISECONDS, null) < 0);
944         s1.sn.request(64);
945         assertTrue(p.offer(6, delay, MILLISECONDS, null) < 0);
946         // 2 * delay should elapse but check only 1 * delay to allow timer slop
947         assertTrue(millisElapsedSince(startTime) >= delay);
948         s2.sn.request(64);
949         p.close();
950         s2.awaitComplete();
951         assertTrue(s2.nexts >= 2);
952         s1.awaitComplete();
953         assertTrue(s1.nexts >= 2);
954     }
955 
956     /**
957      * Timed offer invokes drop handler if saturated
958      */
959     @Test
testHandledDroppedTimedOffer()960     public void testHandledDroppedTimedOffer() {
961         AtomicInteger calls = new AtomicInteger();
962         SubmissionPublisher<Integer> p
963             = new SubmissionPublisher<>(basicExecutor, 4);
964         TestSubscriber s1 = new TestSubscriber();
965         s1.request = false;
966         TestSubscriber s2 = new TestSubscriber();
967         s2.request = false;
968         p.subscribe(s1);
969         p.subscribe(s2);
970         s2.awaitSubscribe();
971         s1.awaitSubscribe();
972         long delay = timeoutMillis();
973         for (int i = 1; i <= 4; ++i)
974             assertTrue(p.offer(i, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) >= 0);
975         long startTime = System.nanoTime();
976         assertTrue(p.offer(5, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0);
977         s1.sn.request(64);
978         assertTrue(p.offer(6, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0);
979         assertTrue(millisElapsedSince(startTime) >= delay);
980         s2.sn.request(64);
981         p.close();
982         s2.awaitComplete();
983         s1.awaitComplete();
984         assertTrue(calls.get() >= 2);
985     }
986 
987     /**
988      * Timed offer succeeds if drop handler forces request
989      */
990     @Test
testRecoveredHandledDroppedTimedOffer()991     public void testRecoveredHandledDroppedTimedOffer() {
992         AtomicInteger calls = new AtomicInteger();
993         SubmissionPublisher<Integer> p
994             = new SubmissionPublisher<>(basicExecutor, 4);
995         TestSubscriber s1 = new TestSubscriber();
996         s1.request = false;
997         TestSubscriber s2 = new TestSubscriber();
998         s2.request = false;
999         p.subscribe(s1);
1000         p.subscribe(s2);
1001         s2.awaitSubscribe();
1002         s1.awaitSubscribe();
1003         int n = 0;
1004         long delay = timeoutMillis();
1005         long startTime = System.nanoTime();
1006         for (int i = 1; i <= 6; ++i) {
1007             int d = p.offer(i, delay, MILLISECONDS, (s, x) -> reqHandle(calls, s));
1008             n = n + 2 + (d < 0 ? d : 0);
1009         }
1010         assertTrue(millisElapsedSince(startTime) >= delay);
1011         p.close();
1012         s2.awaitComplete();
1013         s1.awaitComplete();
1014         assertEquals(n, s1.nexts + s2.nexts);
1015         assertTrue(calls.get() >= 2);
1016     }
1017 
1018     /**
1019      * consume returns a CompletableFuture that is done when
1020      * publisher completes
1021      */
1022     @Test
testConsume()1023     public void testConsume() {
1024         AtomicInteger sum = new AtomicInteger();
1025         SubmissionPublisher<Integer> p = basicPublisher();
1026         CompletableFuture<Void> f =
1027             p.consume((Integer x) -> sum.getAndAdd(x.intValue()));
1028         int n = 20;
1029         for (int i = 1; i <= n; ++i)
1030             p.submit(i);
1031         p.close();
1032         f.join();
1033         assertEquals((n * (n + 1)) / 2, sum.get());
1034     }
1035 
1036     /**
1037      * consume(null) throws NPE
1038      */
1039     @Test
testConsumeNPE()1040     public void testConsumeNPE() {
1041         SubmissionPublisher<Integer> p = basicPublisher();
1042         try {
1043             CompletableFuture<Void> f = p.consume(null);
1044             shouldThrow();
1045         } catch (NullPointerException success) {}
1046     }
1047 
1048     /**
1049      * consume eventually stops processing published items if cancelled
1050      */
1051     @Test
testCancelledConsume()1052     public void testCancelledConsume() {
1053         AtomicInteger count = new AtomicInteger();
1054         SubmissionPublisher<Integer> p = basicPublisher();
1055         CompletableFuture<Void> f = p.consume(x -> count.getAndIncrement());
1056         f.cancel(true);
1057         int n = 1000000; // arbitrary limit
1058         for (int i = 1; i <= n; ++i)
1059             p.submit(i);
1060         assertTrue(count.get() < n);
1061     }
1062 
1063     /**
1064      * Tests scenario for
1065      * JDK-8187947: A race condition in SubmissionPublisher
1066      * cvs update -D '2017-11-25' src/main/java/util/concurrent/SubmissionPublisher.java && ant -Djsr166.expensiveTests=true -Djsr166.tckTestClass=SubmissionPublisherTest -Djsr166.methodFilter=testMissedSignal tck; cvs update -A src/main/java/util/concurrent/SubmissionPublisher.java
1067      */
1068     @Test
1069     public void testMissedSignal_8187947() throws Exception {
1070         if (!atLeastJava9()) return; // backport to jdk8 too hard
1071         final int N = expensiveTests ? (1 << 20) : (1 << 10);
1072         final CountDownLatch finished = new CountDownLatch(1);
1073         final SubmissionPublisher<Boolean> pub = new SubmissionPublisher<>();
1074         class Sub implements Subscriber<Boolean> {
1075             int received;
1076             public void onSubscribe(Subscription s) {
1077                 s.request(N);
1078             }
1079             public void onNext(Boolean item) {
1080                 if (++received == N)
1081                     finished.countDown();
1082                 else
1083                     CompletableFuture.runAsync(() -> pub.submit(Boolean.TRUE));
1084             }
1085             public void onError(Throwable t) { throw new AssertionError(t); }
1086             public void onComplete() {}
1087         }
1088         pub.subscribe(new Sub());
1089         CompletableFuture.runAsync(() -> pub.submit(Boolean.TRUE));
1090         await(finished);
1091     }
1092 }
1093