• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines.reactor
6 
7 import kotlinx.coroutines.*
8 import kotlinx.coroutines.channels.*
9 import kotlinx.coroutines.flow.*
10 import kotlinx.coroutines.reactive.*
11 import org.junit.Test
12 import kotlin.test.*
13 
/**
 * Tests for the `flux` coroutine builder.
 *
 * The numbered `expect(n)` / `finish(n)` calls (helpers not defined in this file, presumably
 * inherited from [TestBase]) label the steps of each test in the order they are meant to run.
 */
class FluxTest : TestBase() {
    /** A value sent from inside the `flux` block reaches the subscriber. */
    @Test
    fun testBasicSuccess() = runBlocking {
        expect(1)
        val publisher = flux(currentDispatcher()) {
            expect(4)
            send("OK")
        }
        expect(2)
        publisher.subscribe { received ->
            expect(5)
            assertEquals("OK", received)
        }
        expect(3)
        yield() // hand control over to the producer coroutine
        finish(6)
    }

    /** An exception thrown inside the `flux` block is delivered to the subscriber's error callback. */
    @Test
    fun testBasicFailure() = runBlocking {
        expect(1)
        val publisher = flux<String>(currentDispatcher()) {
            expect(4)
            throw RuntimeException("OK")
        }
        expect(2)
        publisher.subscribe(
            {
                // No element is ever sent, so the value callback must stay silent.
                expectUnreached()
            },
            { failure ->
                expect(5)
                assertTrue(failure is RuntimeException)
                assertEquals("OK", failure.message)
            }
        )
        expect(3)
        yield() // hand control over to the producer coroutine
        finish(6)
    }

    /** Disposing the subscription stops the producer before it resumes from its suspension point. */
    @Test
    fun testBasicUnsubscribe() = runBlocking {
        expect(1)
        val publisher = flux<String>(currentDispatcher()) {
            expect(4)
            yield() // return to the test body, which disposes the subscription
            expectUnreached()
        }
        expect(2)
        val subscription = publisher.subscribe(
            { expectUnreached() },
            { expectUnreached() }
        )
        expect(3)
        yield() // hand control over to the producer coroutine
        expect(5)
        subscription.dispose() // cancels the producer coroutine
        yield()
        finish(6)
    }

    /** Cancelling the collecting job triggers the `doOnCancel` hook a single time. */
    @Test
    fun testNotifyOnceOnCancellation() = runTest {
        expect(1)
        val producer = flux(currentDispatcher()) {
            expect(5)
            send("OK")
            try {
                delay(Long.MAX_VALUE)
            } catch (cancellation: CancellationException) {
                expect(11)
            }
        }
        val source = producer
            .doOnNext { element ->
                expect(6)
                assertEquals("OK", element)
            }
            .doOnCancel {
                expect(10) // must fire exactly once
            }
        expect(2)
        val collector = launch(start = CoroutineStart.UNDISPATCHED) {
            expect(3)
            source.collect { element ->
                expect(8)
                assertEquals("OK", element)
            }
        }
        expect(4)
        yield() // let the producer run
        expect(7)
        yield() // let the collector consume the element
        expect(9)
        collector.cancel()
        collector.join()
        finish(12)
    }

    /** An exception thrown by the consumer propagates out of `collect` to the caller. */
    @Test
    fun testFailingConsumer() = runTest {
        val numbers = flux(currentDispatcher()) {
            for (index in 0 until 3) {
                expect(index + 1) // expect(1) and expect(2) *should* both be reached
                send(index)
            }
        }
        try {
            numbers.collect {
                throw TestException()
            }
        } catch (consumerFailure: TestException) {
            finish(3)
        }
    }

    /** Passing a `Job` as the context of `flux` is rejected with [IllegalArgumentException]. */
    @Test
    fun testIllegalArgumentException() {
        assertFailsWith<IllegalArgumentException> {
            flux<Int>(Job()) { }
        }
    }

    /** Repeatedly combines a failing flux with itself; the failure is handled by `catch`. */
    @Test
    fun testLeakedException() = runBlocking {
        // The test exception must not end up in the global exception handler.
        val failingFlow = flux<Unit> { throw TestException() }.asFlow()
        repeat(2000) {
            combine(failingFlow, failingFlow) { _, _ -> Unit }
                .catch {}
                .collect { }
        }
    }

    /** Checks that `trySend` inside `flux` reports a failed result rather than throwing. */
    @Test
    fun testTrySendNotThrowing() = runTest {
        var capturedScope: ProducerScope<Int>? = null
        expect(1)
        val publisher = flux<Int>(Dispatchers.Unconfined) {
            capturedScope = this
            expect(3)
            delay(Long.MAX_VALUE)
        }
        val awaiter = launch(start = CoroutineStart.UNDISPATCHED) {
            expect(2)
            publisher.awaitFirstOrNull()
            expectUnreached()
        }
        awaiter.cancel()
        expect(4)
        // The scope was captured when the producer started (step 3), before this point.
        assertTrue(capturedScope!!.trySend(1).isFailure)
        finish(5)
    }

    /** Emitting `null` through `send` or `trySend` fails with NPE, and the flux can still emit afterwards. */
    @Test
    fun testEmittingNull() = runTest {
        val publisher = flux {
            assertFailsWith<NullPointerException> { send(null) }
            assertFailsWith<NullPointerException> { trySend(null) }
            send("OK")
        }
        assertEquals("OK", publisher.awaitFirstOrNull())
    }
}
178