• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
4 
5 package kotlinx.coroutines.reactive
6 
7 import kotlinx.coroutines.*
8 import kotlinx.coroutines.CancellationException
9 import kotlinx.coroutines.channels.*
10 import kotlinx.coroutines.flow.*
11 import kotlinx.coroutines.sync.*
12 import org.junit.Test
13 import org.reactivestreams.*
14 import java.util.concurrent.*
15 import kotlin.test.*
16 
/**
 * Tests for the [publish] coroutine builder as seen through reactive-streams [Subscriber]s,
 * `awaitFirstOrNull`, `collect` and `asFlow`.
 *
 * Most tests pin the exact interleaving of publisher and subscriber code with the numbered
 * `expect(n)` / `finish(n)` / `expectUnreached()` helpers (presumably inherited from [TestBase]),
 * so statement order inside each test is significant.
 */
class PublishTest : TestBase() {
    /**
     * A publisher whose block emits nothing: the subscriber gets `onSubscribe`, and after the test
     * yields to the publish coroutine it gets `onComplete`. `onNext` and `onError` must not be called.
     */
    @Test
    fun testBasicEmpty() = runTest {
        expect(1)
        val publisher = publish<Int>(currentDispatcher()) {
            expect(5)
        }
        expect(2)
        publisher.subscribe(object : Subscriber<Int> {
            override fun onSubscribe(s: Subscription?) { expect(3) }
            override fun onNext(t: Int?) { expectUnreached() }
            override fun onComplete() { expect(6) }
            override fun onError(t: Throwable?) { expectUnreached() }
        })
        expect(4)
        yield() // to publish coroutine
        finish(7)
    }

    /**
     * A publisher that sends a single value: the subscriber requests one element, receives `42`
     * in `onNext`, and is then completed once the block returns.
     */
    @Test
    fun testBasicSingle() = runTest {
        expect(1)
        val publisher = publish(currentDispatcher()) {
            expect(5)
            send(42)
            expect(7)
        }
        expect(2)
        publisher.subscribe(object : Subscriber<Int> {
            override fun onSubscribe(s: Subscription) {
                expect(3)
                s.request(1)
            }
            override fun onNext(t: Int) {
                expect(6)
                assertEquals(42, t)
            }
            override fun onComplete() { expect(8) }
            override fun onError(t: Throwable?) { expectUnreached() }
        })
        expect(4)
        yield() // to publish coroutine
        finish(9)
    }

    /**
     * A publisher whose block throws: the subscriber must receive that [RuntimeException]
     * (with message `"OK"`) in `onError`, and neither `onNext` nor `onComplete` may be called.
     */
    @Test
    fun testBasicError() = runTest {
        expect(1)
        val publisher = publish<Int>(currentDispatcher()) {
            expect(5)
            throw RuntimeException("OK")
        }
        expect(2)
        publisher.subscribe(object : Subscriber<Int> {
            override fun onSubscribe(s: Subscription) {
                expect(3)
                s.request(1)
            }
            override fun onNext(t: Int) { expectUnreached() }
            override fun onComplete() { expectUnreached() }
            override fun onError(t: Throwable) {
                expect(6)
                assertTrue(t is RuntimeException)
                assertEquals("OK", t.message)
            }
        })
        expect(4)
        yield() // to publish coroutine
        finish(7)
    }

    /**
     * Cancels the subscription while the publish block is suspended in `delay`, and has the block
     * throw from its `finally` clause. The late failure must reach the [CoroutineExceptionHandler]
     * installed in the publisher's context; none of the subscriber's terminal or `onNext`
     * callbacks may run.
     */
    @Test
    fun testHandleFailureAfterCancel() = runTest {
        expect(1)

        val eh = CoroutineExceptionHandler { _, t ->
            assertTrue(t is RuntimeException)
            expect(6)
        }
        val publisher = publish<Unit>(Dispatchers.Unconfined + eh) {
            try {
                expect(3)
                delay(10000)
            } finally {
                expect(5)
                throw RuntimeException("FAILED") // crash after cancel
            }
        }
        // Captured in onSubscribe so the test body can cancel the subscription afterwards.
        var sub: Subscription? = null
        publisher.subscribe(object : Subscriber<Unit> {
            override fun onComplete() {
                expectUnreached()
            }

            override fun onSubscribe(s: Subscription) {
                expect(2)
                sub = s
            }

            override fun onNext(t: Unit?) {
                expectUnreached()
            }

            override fun onError(t: Throwable?) {
                expectUnreached()
            }
        })
        expect(4)
        sub!!.cancel()
        finish(7)
    }

    /** Tests that, as soon as `ProducerScope.close` is called, `isClosedForSend` starts returning `true`. */
    @Test
    fun testChannelClosing() = runTest {
        expect(1)
        val publisher = publish<Int>(Dispatchers.Unconfined) {
            expect(3)
            close()
            // assertTrue (not `assert`) so the check runs even when JVM assertions are disabled.
            assertTrue(isClosedForSend)
            expect(4)
        }
        try {
            expect(2)
            publisher.awaitFirstOrNull()
        } catch (e: CancellationException) {
            expect(5)
        }
        finish(6)
    }

    /**
     * A subscriber whose `onNext` throws [TestException]: inside the publish block, `send` must
     * fail with that exception and the channel must be closed for send. The surrounding
     * `assertCallsExceptionHandlerWith` helper (defined outside this file) is expected to verify
     * that the exception also reaches the supplied exception handler.
     */
    @Test
    fun testOnNextError() = runTest {
        // Completed by the publisher once it has observed the failure, so the test can wait for it.
        val latch = CompletableDeferred<Unit>()
        expect(1)
        assertCallsExceptionHandlerWith<TestException> { exceptionHandler ->
            val publisher = publish(currentDispatcher() + exceptionHandler) {
                expect(4)
                try {
                    send("OK")
                } catch (e: Throwable) {
                    expect(6)
                    // assertTrue (not `assert`) so the checks run even when JVM assertions are disabled.
                    assertTrue(e is TestException)
                    assertTrue(isClosedForSend)
                    latch.complete(Unit)
                }
            }
            expect(2)
            publisher.subscribe(object : Subscriber<String> {
                override fun onComplete() {
                    expectUnreached()
                }

                override fun onSubscribe(s: Subscription) {
                    expect(3)
                    s.request(1)
                }

                override fun onNext(t: String) {
                    expect(5)
                    assertEquals("OK", t)
                    throw TestException()
                }

                override fun onError(t: Throwable) {
                    expectUnreached()
                }
            })
            latch.await()
        }
        finish(7)
    }

    /** Tests the behavior when a call to `onNext` fails after the channel is already closed. */
    @Test
    fun testOnNextErrorAfterCancellation() = runTest {
        assertCallsExceptionHandlerWith<TestException> { handler ->
            // Captured from inside the publish block so the test body can call trySend on it.
            var producerScope: ProducerScope<Int>? = null
            expect(1)
            // Child job launched by the publisher; cancelled from within onNext below.
            var job: Job? = null
            val publisher = publish<Int>(handler + Dispatchers.Unconfined) {
                producerScope = this
                expect(4)
                job = launch {
                    delay(Long.MAX_VALUE)
                }
            }
            expect(2)
            publisher.subscribe(object : Subscriber<Int> {
                override fun onSubscribe(s: Subscription) {
                    expect(3)
                    s.request(Long.MAX_VALUE)
                }
                override fun onNext(t: Int) {
                    expect(6)
                    assertEquals(1, t)
                    job!!.cancel()
                    throw TestException()
                }
                override fun onError(t: Throwable?) {
                    /* Correct changes to the implementation could lead to us entering or not entering this method, but
                    it only matters that if we do, it is the "correct" exception that was validly used to cancel the
                    coroutine that gets passed here and not `TestException`. */
                    assertTrue(t is CancellationException)
                }
                override fun onComplete() { expectUnreached() }
            })
            expect(5)
            val result: ChannelResult<Unit> = producerScope!!.trySend(1)
            val e = result.exceptionOrNull()!!
            assertTrue(e is CancellationException, "The actual error: $e")
            assertTrue(producerScope!!.isClosedForSend)
            assertTrue(result.isFailure)
        }
        finish(7)
    }

    /**
     * A collector that throws [TestException] on the first element: the exception must propagate
     * out of `collect`, which is the only path on which `finish` is called.
     */
    @Test
    fun testFailingConsumer() = runTest {
        val pub = publish(currentDispatcher()) {
            repeat(3) {
                expect(it + 1) // expect(1), expect(2) *should* be invoked
                send(it)
            }
        }
        try {
            pub.collect {
                throw TestException()
            }
        } catch (e: TestException) {
            finish(3)
        }
    }

    /** Calling `publish` with a [Job] in its context must fail with [IllegalArgumentException]. */
    @Test
    fun testIllegalArgumentException() {
        assertFailsWith<IllegalArgumentException> { publish<Int>(Job()) { } }
    }

    /** Tests that `trySend` doesn't throw in `publish`. */
    @Test
    fun testTrySendNotThrowing() = runTest {
        // Captured from inside the publish block so the test body can call trySend on it.
        var producerScope: ProducerScope<Int>? = null
        expect(1)
        val publisher = publish<Int>(Dispatchers.Unconfined) {
            producerScope = this
            expect(3)
            delay(Long.MAX_VALUE)
        }
        // Started undispatched so that the subscription happens before `job.cancel()` below.
        val job = launch(start = CoroutineStart.UNDISPATCHED) {
            expect(2)
            publisher.awaitFirstOrNull()
            expectUnreached()
        }
        job.cancel()
        expect(4)
        val result = producerScope!!.trySend(1)
        assertTrue(result.isFailure)
        finish(5)
    }

    /** Tests that all methods on `publish` fail without closing the channel when attempting to emit `null`. */
    @Test
    fun testEmittingNull() = runTest {
        val publisher = publish {
            assertFailsWith<NullPointerException> { send(null) }
            assertFailsWith<NullPointerException> { trySend(null) }
            // The channel must still be usable after the two failed attempts above.
            send("OK")
        }
        assertEquals("OK", publisher.awaitFirstOrNull())
    }

    /**
     * Cancels a collector while the publisher is trying to `send` a second element: that `send`
     * must throw [CancellationException], which the publisher signals through a [CountDownLatch]
     * before rethrowing.
     */
    @Test
    fun testOnSendCancelled() = runTest {
        // Counted down by the publisher once its second `send` has observed the cancellation.
        val latch = CountDownLatch(1)
        val published = publish(Dispatchers.Default) {
            expect(2)
            // Collector is ready
            send(1)
            try {
                send(2)
                expectUnreached()
            } catch (e: CancellationException) {
                // publisher cancellation is async
                latch.countDown()
                throw e
            }
        }

        expect(1)
        // Created locked; the collector unlocks it when the first element arrives.
        val collectorLatch = Mutex(true)
        val job = launch {
            published.asFlow().buffer(0).collect {
                collectorLatch.unlock()
                // `hang` is a helper defined outside this file; presumably it suspends until
                // cancelled and then runs the given block. TODO confirm.
                hang { expect(4) }
            }
        }
        collectorLatch.lock()
        expect(3)
        job.cancelAndJoin()
        latch.await()
        finish(5)
    }
}
322