• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
4 
5 package kotlinx.coroutines.jdk9
6 
7 import kotlinx.coroutines.*
8 import kotlinx.coroutines.channels.*
9 import org.junit.Test
10 import java.util.concurrent.Flow as JFlow
11 import kotlin.test.*
12 
/**
 * Tests for the `flowPublish` builder, observed through [java.util.concurrent.Flow] subscribers.
 *
 * Most tests pin the exact interleaving of publisher and subscriber code with numbered
 * `expect(n)` / `finish(n)` checkpoints (presumably inherited from `TestBase`, which is not
 * visible in this file), so the order of statements inside each test is significant.
 */
class PublishTest : TestBase() {
    /**
     * A publisher whose body emits nothing: the subscriber sees `onSubscribe`, then `onComplete`
     * once the publisher coroutine has run, and never `onNext` or `onError`.
     */
    @Test
    fun testBasicEmpty() = runTest {
        expect(1)
        val publisher = flowPublish<Int>(currentDispatcher()) {
            expect(5)
        }
        expect(2)
        publisher.subscribe(object : JFlow.Subscriber<Int> {
            override fun onSubscribe(s: JFlow.Subscription?) { expect(3) }
            override fun onNext(t: Int?) { expectUnreached() }
            override fun onComplete() { expect(6) }
            override fun onError(t: Throwable?) { expectUnreached() }
        })
        expect(4)
        yield() // to publish coroutine
        finish(7)
    }

    /**
     * A publisher that sends a single value: the subscriber requests one element, receives `42`
     * in `onNext`, and is completed after the publisher body finishes.
     */
    @Test
    fun testBasicSingle() = runTest {
        expect(1)
        val publisher = flowPublish(currentDispatcher()) {
            expect(5)
            send(42)
            expect(7)
        }
        expect(2)
        publisher.subscribe(object : JFlow.Subscriber<Int> {
            override fun onSubscribe(s: JFlow.Subscription) {
                expect(3)
                s.request(1)
            }
            override fun onNext(t: Int) {
                expect(6)
                assertEquals(42, t)
            }
            override fun onComplete() { expect(8) }
            override fun onError(t: Throwable?) { expectUnreached() }
        })
        expect(4)
        yield() // to publish coroutine
        finish(9)
    }

    /**
     * A publisher whose body throws: the subscriber receives that `RuntimeException("OK")` in
     * `onError` and never sees `onNext` or `onComplete`.
     */
    @Test
    fun testBasicError() = runTest {
        expect(1)
        val publisher = flowPublish<Int>(currentDispatcher()) {
            expect(5)
            throw RuntimeException("OK")
        }
        expect(2)
        publisher.subscribe(object : JFlow.Subscriber<Int> {
            override fun onSubscribe(s: JFlow.Subscription) {
                expect(3)
                s.request(1)
            }
            override fun onNext(t: Int) { expectUnreached() }
            override fun onComplete() { expectUnreached() }
            override fun onError(t: Throwable) {
                expect(6)
                assertTrue(t is RuntimeException)
                assertEquals("OK", t.message)
            }
        })
        expect(4)
        yield() // to publish coroutine
        finish(7)
    }

    /**
     * A publisher that throws from its `finally` block after the subscription was cancelled:
     * the failure is delivered to the [CoroutineExceptionHandler] in the publisher's context,
     * and none of the subscriber's terminal callbacks are invoked.
     */
    @Test
    fun testHandleFailureAfterCancel() = runTest {
        expect(1)

        val eh = CoroutineExceptionHandler { _, t ->
            assertTrue(t is RuntimeException)
            expect(6)
        }
        val publisher = flowPublish<Unit>(Dispatchers.Unconfined + eh) {
            try {
                expect(3)
                delay(10000)
            } finally {
                expect(5)
                throw RuntimeException("FAILED") // crash after cancel
            }
        }
        var sub: JFlow.Subscription? = null
        publisher.subscribe(object : JFlow.Subscriber<Unit> {
            override fun onComplete() {
                expectUnreached()
            }

            override fun onSubscribe(s: JFlow.Subscription) {
                expect(2)
                sub = s
            }

            override fun onNext(t: Unit?) {
                expectUnreached()
            }

            override fun onError(t: Throwable?) {
                expectUnreached()
            }
        })
        expect(4)
        // onSubscribe (checkpoint 2) has already run by now, so the subscription must be stored.
        checkNotNull(sub) { "onSubscribe did not store the subscription" }.cancel()
        finish(7)
    }

    /** Tests that, as soon as `ProducerScope.close` is called, `isClosedForSend` starts returning `true`. */
    @Test
    fun testChannelClosing() = runTest {
        expect(1)
        val publisher = flowPublish<Int>(Dispatchers.Unconfined) {
            expect(3)
            close()
            // assertTrue rather than stdlib assert(): the latter is skipped unless the JVM runs with -ea.
            assertTrue(isClosedForSend)
            expect(4)
        }
        try {
            expect(2)
            publisher.awaitFirstOrNull()
        } catch (e: CancellationException) {
            expect(5)
        }
        finish(6)
    }

    /**
     * A subscriber whose `onNext` throws [TestException]: the publisher's `send` fails with that
     * exception, the channel is closed for send, and `onError`/`onComplete` are not invoked.
     * The whole scenario runs inside `assertCallsExceptionHandlerWith<TestException>` (defined
     * elsewhere), whose handler is installed in the publisher's context.
     */
    @Test
    fun testOnNextError() = runTest {
        val latch = CompletableDeferred<Unit>()
        expect(1)
        assertCallsExceptionHandlerWith<TestException> { exceptionHandler ->
            val publisher = flowPublish(currentDispatcher() + exceptionHandler) {
                expect(4)
                try {
                    send("OK")
                } catch (e: Throwable) {
                    expect(6)
                    // assertTrue rather than stdlib assert(): the latter is skipped unless the JVM runs with -ea.
                    assertTrue(e is TestException)
                    assertTrue(isClosedForSend)
                    latch.complete(Unit)
                }
            }
            expect(2)
            publisher.subscribe(object : JFlow.Subscriber<String> {
                override fun onComplete() {
                    expectUnreached()
                }

                override fun onSubscribe(s: JFlow.Subscription) {
                    expect(3)
                    s.request(1)
                }

                override fun onNext(t: String) {
                    expect(5)
                    assertEquals("OK", t)
                    throw TestException()
                }

                override fun onError(t: Throwable) {
                    expectUnreached()
                }
            })
            // Wait until the publisher has observed the failed send before leaving the handler scope.
            latch.await()
        }
        finish(7)
    }

    /** Tests the behavior when a call to `onNext` fails after the channel is already closed. */
    @Test
    fun testOnNextErrorAfterCancellation() = runTest {
        assertCallsExceptionHandlerWith<TestException> { handler ->
            var producerScope: ProducerScope<Int>? = null
            expect(1)
            var job: Job? = null
            val publisher = flowPublish<Int>(handler + Dispatchers.Unconfined) {
                producerScope = this
                expect(4)
                job = launch {
                    delay(Long.MAX_VALUE)
                }
            }
            expect(2)
            publisher.subscribe(object : JFlow.Subscriber<Int> {
                override fun onSubscribe(s: JFlow.Subscription) {
                    expect(3)
                    s.request(Long.MAX_VALUE)
                }
                override fun onNext(t: Int) {
                    expect(6)
                    assertEquals(1, t)
                    // Cancel the child first so that the TestException below is thrown after cancellation.
                    checkNotNull(job) { "the publisher body did not launch its child job" }.cancel()
                    throw TestException()
                }
                override fun onError(t: Throwable?) {
                    /* Correct changes to the implementation could lead to us entering or not entering this method, but
                    it only matters that if we do, it is the "correct" exception that was validly used to cancel the
                    coroutine that gets passed here and not `TestException`. */
                    assertTrue(t is CancellationException)
                }
                override fun onComplete() { expectUnreached() }
            })
            expect(5)
            val scope = checkNotNull(producerScope) { "the publisher body did not start" }
            val result: ChannelResult<Unit> = scope.trySend(1)
            val e = assertNotNull(result.exceptionOrNull(), "trySend was expected to fail with an exception")
            assertTrue(e is CancellationException, "The actual error: $e")
            assertTrue(scope.isClosedForSend)
            assertTrue(result.isFailure)
        }
        finish(7)
    }

    /**
     * A collector that throws on the first element: the [TestException] propagates out of
     * `collect` to the caller.
     */
    @Test
    fun testFailingConsumer() = runTest {
        val pub = flowPublish(currentDispatcher()) {
            repeat(3) {
                expect(it + 1) // expect(1), expect(2) *should* be invoked
                send(it)
            }
        }
        try {
            pub.collect {
                throw TestException()
            }
        } catch (e: TestException) {
            finish(3)
        }
    }

    /** Tests that passing a `Job` as the context of `flowPublish` fails with [IllegalArgumentException]. */
    @Test
    fun testIllegalArgumentException() {
        assertFailsWith<IllegalArgumentException> { flowPublish<Int>(Job()) { } }
    }

    /** Tests that `trySend` doesn't throw in `flowPublish`. */
    @Test
    fun testTrySendNotThrowing() = runTest {
        var producerScope: ProducerScope<Int>? = null
        expect(1)
        val publisher = flowPublish<Int>(Dispatchers.Unconfined) {
            producerScope = this
            expect(3)
            delay(Long.MAX_VALUE)
        }
        // UNDISPATCHED start runs the subscriber up to its first suspension before `launch` returns.
        val job = launch(start = CoroutineStart.UNDISPATCHED) {
            expect(2)
            publisher.awaitFirstOrNull()
            expectUnreached()
        }
        job.cancel()
        expect(4)
        val result = checkNotNull(producerScope) { "the publisher body did not start" }.trySend(1)
        assertTrue(result.isFailure)
        finish(5)
    }

    /** Tests that all methods on `flowPublish` fail without closing the channel when attempting to emit `null`. */
    @Test
    fun testEmittingNull() = runTest {
        val publisher = flowPublish {
            assertFailsWith<NullPointerException> { send(null) }
            assertFailsWith<NullPointerException> { trySend(null) }
            send("OK")
        }
        assertEquals("OK", publisher.awaitFirstOrNull())
    }
}
286