• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download

package kotlinx.coroutines.rx3

import kotlinx.coroutines.testing.*
import kotlinx.coroutines.*
import kotlinx.coroutines.reactive.*
import org.junit.Test
import kotlin.test.*

/**
 * Tests for the `rxFlowable` builder: delivering a value, delivering a failure,
 * disposing a subscription, and consuming the flowable through `collect`.
 *
 * Each test threads numbered `expect(n)` / `finish(n)` checkpoints through the producer,
 * the subscriber and the test body; the numbers spell out the interleaving the test
 * is written to observe.
 */
class FlowableTest : TestBase() {
    /**
     * A single value sent by the producer reaches the subscriber's `onNext` callback
     * once the test body yields to the producer coroutine.
     */
    @Test
    fun testBasicSuccess() = runBlocking {
        expect(1)
        val flowable = rxFlowable<String>(currentDispatcher()) {
            expect(4)
            send("OK")
        }
        expect(2)
        flowable.subscribe { item ->
            expect(5)
            assertEquals("OK", item)
        }
        expect(3)
        yield() // hand control over to the producer coroutine
        finish(6)
    }

    /**
     * A [RuntimeException] thrown inside the producer is routed to the subscriber's
     * error callback; the value callback must never run.
     */
    @Test
    fun testBasicFailure() = runBlocking {
        expect(1)
        val flowable = rxFlowable<String>(currentDispatcher()) {
            expect(4)
            throw RuntimeException("OK")
        }
        expect(2)
        flowable.subscribe(
            {
                expectUnreached()
            },
            { failure ->
                expect(5)
                assertIs<RuntimeException>(failure)
                assertEquals("OK", failure.message)
            }
        )
        expect(3)
        yield() // hand control over to the producer coroutine
        finish(6)
    }

    /**
     * Disposing the subscription while the producer is suspended in `yield()` means
     * the producer never resumes past that point and neither callback is invoked.
     */
    @Test
    fun testBasicUnsubscribe() = runBlocking {
        expect(1)
        val flowable = rxFlowable<String>(currentDispatcher()) {
            expect(4)
            yield() // returns to the test body, which then disposes the subscription
            expectUnreached()
        }
        expect(2)
        val disposable = flowable.subscribe(
            {
                expectUnreached()
            },
            {
                expectUnreached()
            }
        )
        expect(3)
        yield() // hand control over to the producer coroutine
        expect(5)
        disposable.dispose() // will cancel coroutine
        yield()
        finish(6)
    }

    /**
     * Cancelling the collecting job triggers `doOnCancel` (checkpoint 10) before the
     * producer sees its [CancellationException] (checkpoint 11); each checkpoint
     * number is used exactly once in the sequence.
     */
    @Test
    fun testNotifyOnceOnCancellation() = runTest {
        expect(1)
        val producer = rxFlowable<String>(currentDispatcher()) {
            expect(5)
            send("OK")
            try {
                delay(Long.MAX_VALUE)
            } catch (cancelled: CancellationException) {
                // Deliberately not rethrown: the test only records that cancellation arrived.
                expect(11)
            }
        }
        val instrumented = producer
            .doOnNext { emitted ->
                expect(6)
                assertEquals("OK", emitted)
            }
            .doOnCancel {
                expect(10) // notified once!
            }
        expect(2)
        val collector = launch(start = CoroutineStart.UNDISPATCHED) {
            expect(3)
            instrumented.collect { received ->
                expect(8)
                assertEquals("OK", received)
            }
        }
        expect(4)
        yield() // let the producer run
        expect(7)
        yield() // let the collecting coroutine run
        expect(9)
        collector.cancel()
        collector.join()
        finish(12)
    }

    /**
     * A [TestException] thrown by the `collect` action propagates out of `collect`
     * to the caller, where it is caught and the test finishes at checkpoint 3.
     */
    @Test
    fun testFailingConsumer() = runTest {
        val publisher = rxFlowable<Int>(currentDispatcher()) {
            for (index in 0..2) {
                expect(index + 1) // expect(1), expect(2) *should* be invoked
                send(index)
            }
        }
        try {
            publisher.collect {
                throw TestException()
            }
        } catch (expected: TestException) {
            finish(3)
        }
    }
}
124