• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download

<lambda>null1 package kotlinx.coroutines.jdk9
2 
3 import kotlinx.coroutines.testing.*
4 import kotlinx.coroutines.*
5 import kotlinx.coroutines.channels.*
6 import kotlinx.coroutines.testing.exceptions.*
7 import org.junit.Test
8 import java.util.concurrent.Flow as JFlow
9 import kotlin.test.*
10 
11 class PublishTest : TestBase() {
12     @Test
13     fun testBasicEmpty() = runTest {
14         expect(1)
15         val publisher = flowPublish<Int>(currentDispatcher()) {
16             expect(5)
17         }
18         expect(2)
19         publisher.subscribe(object : JFlow.Subscriber<Int> {
20             override fun onSubscribe(s: JFlow.Subscription?) { expect(3) }
21             override fun onNext(t: Int?) { expectUnreached() }
22             override fun onComplete() { expect(6) }
23             override fun onError(t: Throwable?) { expectUnreached() }
24         })
25         expect(4)
26         yield() // to publish coroutine
27         finish(7)
28     }
29 
30     @Test
31     fun testBasicSingle() = runTest {
32         expect(1)
33         val publisher = flowPublish(currentDispatcher()) {
34             expect(5)
35             send(42)
36             expect(7)
37         }
38         expect(2)
39         publisher.subscribe(object : JFlow.Subscriber<Int> {
40             override fun onSubscribe(s: JFlow.Subscription) {
41                 expect(3)
42                 s.request(1)
43             }
44             override fun onNext(t: Int) {
45                 expect(6)
46                 assertEquals(42, t)
47             }
48             override fun onComplete() { expect(8) }
49             override fun onError(t: Throwable?) { expectUnreached() }
50         })
51         expect(4)
52         yield() // to publish coroutine
53         finish(9)
54     }
55 
56     @Test
57     fun testBasicError() = runTest {
58         expect(1)
59         val publisher = flowPublish<Int>(currentDispatcher()) {
60             expect(5)
61             throw RuntimeException("OK")
62         }
63         expect(2)
64         publisher.subscribe(object : JFlow.Subscriber<Int> {
65             override fun onSubscribe(s: JFlow.Subscription) {
66                 expect(3)
67                 s.request(1)
68             }
69             override fun onNext(t: Int) { expectUnreached() }
70             override fun onComplete() { expectUnreached() }
71             override fun onError(t: Throwable) {
72                 expect(6)
73                 assertIs<RuntimeException>(t)
74                 assertEquals("OK", t.message)
75             }
76         })
77         expect(4)
78         yield() // to publish coroutine
79         finish(7)
80     }
81 
82     @Test
83     fun testHandleFailureAfterCancel() = runTest {
84         expect(1)
85 
86         val eh = CoroutineExceptionHandler { _, t ->
87             assertIs<RuntimeException>(t)
88             expect(6)
89         }
90         val publisher = flowPublish<Unit>(Dispatchers.Unconfined + eh) {
91             try {
92                 expect(3)
93                 delay(10000)
94             } finally {
95                 expect(5)
96                 throw RuntimeException("FAILED") // crash after cancel
97             }
98         }
99         var sub: JFlow.Subscription? = null
100         publisher.subscribe(object : JFlow.Subscriber<Unit> {
101             override fun onComplete() {
102                 expectUnreached()
103             }
104 
105             override fun onSubscribe(s: JFlow.Subscription) {
106                 expect(2)
107                 sub = s
108             }
109 
110             override fun onNext(t: Unit?) {
111                 expectUnreached()
112             }
113 
114             override fun onError(t: Throwable?) {
115                 expectUnreached()
116             }
117         })
118         expect(4)
119         sub!!.cancel()
120         finish(7)
121     }
122 
123     /** Tests that, as soon as `ProducerScope.close` is called, `isClosedForSend` starts returning `true`. */
124     @Test
125     fun testChannelClosing() = runTest {
126         expect(1)
127         val publisher = flowPublish<Int>(Dispatchers.Unconfined) {
128             expect(3)
129             close()
130             assert(isClosedForSend)
131             expect(4)
132         }
133         try {
134             expect(2)
135             publisher.awaitFirstOrNull()
136         } catch (e: CancellationException) {
137             expect(5)
138         }
139         finish(6)
140     }
141 
142     @Test
143     fun testOnNextError() = runTest {
144         val latch = CompletableDeferred<Unit>()
145         expect(1)
146         assertCallsExceptionHandlerWith<TestException> { exceptionHandler ->
147             val publisher = flowPublish(currentDispatcher() + exceptionHandler) {
148                 expect(4)
149                 try {
150                     send("OK")
151                 } catch (e: Throwable) {
152                     expect(6)
153                     assert(e is TestException)
154                     assert(isClosedForSend)
155                     latch.complete(Unit)
156                 }
157             }
158             expect(2)
159             publisher.subscribe(object : JFlow.Subscriber<String> {
160                 override fun onComplete() {
161                     expectUnreached()
162                 }
163 
164                 override fun onSubscribe(s: JFlow.Subscription) {
165                     expect(3)
166                     s.request(1)
167                 }
168 
169                 override fun onNext(t: String) {
170                     expect(5)
171                     assertEquals("OK", t)
172                     throw TestException()
173                 }
174 
175                 override fun onError(t: Throwable) {
176                     expectUnreached()
177                 }
178             })
179             latch.await()
180         }
181         finish(7)
182     }
183 
184     /** Tests the behavior when a call to `onNext` fails after the channel is already closed. */
185     @Test
186     fun testOnNextErrorAfterCancellation() = runTest {
187         assertCallsExceptionHandlerWith<TestException> { handler ->
188             var producerScope: ProducerScope<Int>? = null
189             CompletableDeferred<Unit>()
190             expect(1)
191             var job: Job? = null
192             val publisher = flowPublish<Int>(handler + Dispatchers.Unconfined) {
193                 producerScope = this
194                 expect(4)
195                 job = launch {
196                     delay(Long.MAX_VALUE)
197                 }
198             }
199             expect(2)
200             publisher.subscribe(object : JFlow.Subscriber<Int> {
201                 override fun onSubscribe(s: JFlow.Subscription) {
202                     expect(3)
203                     s.request(Long.MAX_VALUE)
204                 }
205 
206                 override fun onNext(t: Int) {
207                     expect(6)
208                     assertEquals(1, t)
209                     job!!.cancel()
210                     throw TestException()
211                 }
212 
213                 override fun onError(t: Throwable?) {
214                     /* Correct changes to the implementation could lead to us entering or not entering this method, but
215                     it only matters that if we do, it is the "correct" exception that was validly used to cancel the
216                     coroutine that gets passed here and not `TestException`. */
217                     assertIs<CancellationException>(t)
218                 }
219 
220                 override fun onComplete() {
221                     expectUnreached()
222                 }
223             })
224             expect(5)
225             val result: ChannelResult<Unit> = producerScope!!.trySend(1)
226             val e = result.exceptionOrNull()!!
227             assertIs<CancellationException>(e, "The actual error: $e")
228             assertTrue(producerScope!!.isClosedForSend)
229             assertTrue(result.isFailure)
230         }
231         finish(7)
232     }
233 
234     @Test
235     fun testFailingConsumer() = runTest {
236         val pub = flowPublish(currentDispatcher()) {
237             repeat(3) {
238                 expect(it + 1) // expect(1), expect(2) *should* be invoked
239                 send(it)
240             }
241         }
242         try {
243             pub.collect {
244                 throw TestException()
245             }
246         } catch (e: TestException) {
247             finish(3)
248         }
249     }
250 
251     @Test
252     fun testIllegalArgumentException() {
253         assertFailsWith<IllegalArgumentException> { flowPublish<Int>(Job()) { } }
254     }
255 
256     /** Tests that `trySend` doesn't throw in `flowPublish`. */
257     @Test
258     fun testTrySendNotThrowing() = runTest {
259         var producerScope: ProducerScope<Int>? = null
260         expect(1)
261         val publisher = flowPublish<Int>(Dispatchers.Unconfined) {
262             producerScope = this
263             expect(3)
264             delay(Long.MAX_VALUE)
265         }
266         val job = launch(start = CoroutineStart.UNDISPATCHED) {
267             expect(2)
268             publisher.awaitFirstOrNull()
269             expectUnreached()
270         }
271         job.cancel()
272         expect(4)
273         val result = producerScope!!.trySend(1)
274         assertTrue(result.isFailure)
275         finish(5)
276     }
277 
278     /** Tests that all methods on `flowPublish` fail without closing the channel when attempting to emit `null`. */
279     @Test
280     fun testEmittingNull() = runTest {
281         val publisher = flowPublish {
282             assertFailsWith<NullPointerException> { send(null) }
283             assertFailsWith<NullPointerException> { trySend(null) }
284             send("OK")
285         }
286         assertEquals("OK", publisher.awaitFirstOrNull())
287     }
288 }
289