• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download

<lambda>null1 package kotlinx.coroutines.rx2
2 
3 import kotlinx.coroutines.testing.*
4 import kotlinx.coroutines.*
5 import kotlinx.coroutines.reactive.*
6 import org.junit.*
7 import org.junit.Test
8 import kotlin.test.*
9 
10 class FlowableTest : TestBase() {
11     @Test
12     fun testBasicSuccess() = runBlocking {
13         expect(1)
14         val observable = rxFlowable(currentDispatcher()) {
15             expect(4)
16             send("OK")
17         }
18         expect(2)
19         observable.subscribe { value ->
20             expect(5)
21             assertEquals("OK", value)
22         }
23         expect(3)
24         yield() // to started coroutine
25         finish(6)
26     }
27 
28     @Test
29     fun testBasicFailure() = runBlocking {
30         expect(1)
31         val observable = rxFlowable<String>(currentDispatcher()) {
32             expect(4)
33             throw RuntimeException("OK")
34         }
35         expect(2)
36         observable.subscribe({
37             expectUnreached()
38         }, { error ->
39             expect(5)
40             assertIs<RuntimeException>(error)
41             assertEquals("OK", error.message)
42         })
43         expect(3)
44         yield() // to started coroutine
45         finish(6)
46     }
47 
48     @Test
49     fun testBasicUnsubscribe() = runBlocking {
50         expect(1)
51         val observable = rxFlowable<String>(currentDispatcher()) {
52             expect(4)
53             yield() // back to main, will get cancelled
54             expectUnreached()
55         }
56         expect(2)
57         val sub = observable.subscribe({
58             expectUnreached()
59         }, {
60             expectUnreached()
61         })
62         expect(3)
63         yield() // to started coroutine
64         expect(5)
65         sub.dispose() // will cancel coroutine
66         yield()
67         finish(6)
68     }
69 
70     @Test
71     fun testNotifyOnceOnCancellation() = runTest {
72         expect(1)
73         val observable =
74             rxFlowable(currentDispatcher()) {
75                 expect(5)
76                 send("OK")
77                 try {
78                     delay(Long.MAX_VALUE)
79                 } catch (e: CancellationException) {
80                     expect(11)
81                 }
82             }
83             .doOnNext {
84                 expect(6)
85                 assertEquals("OK", it)
86             }
87             .doOnCancel {
88                 expect(10) // notified once!
89             }
90         expect(2)
91         val job = launch(start = CoroutineStart.UNDISPATCHED) {
92             expect(3)
93             observable.collect {
94                 expect(8)
95                 assertEquals("OK", it)
96             }
97         }
98         expect(4)
99         yield() // to observable code
100         expect(7)
101         yield() // to consuming coroutines
102         expect(9)
103         job.cancel()
104         job.join()
105         finish(12)
106     }
107 
108     @Test
109     fun testFailingConsumer() = runTest {
110         val pub = rxFlowable(currentDispatcher()) {
111             repeat(3) {
112                 expect(it + 1) // expect(1), expect(2) *should* be invoked
113                 send(it)
114             }
115         }
116         try {
117             pub.collect {
118                 throw TestException()
119             }
120         } catch (e: TestException) {
121             finish(3)
122         }
123     }
124 }
125