• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
<lambda>null2  * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines.rx3
6 
7 import kotlinx.coroutines.*
8 import kotlinx.coroutines.reactive.*
9 import org.junit.Test
10 import kotlin.test.*
11 
12 class FlowableTest : TestBase() {
13     @Test
14     fun testBasicSuccess() = runBlocking {
15         expect(1)
16         val observable = rxFlowable(currentDispatcher()) {
17             expect(4)
18             send("OK")
19         }
20         expect(2)
21         observable.subscribe { value ->
22             expect(5)
23             assertEquals("OK", value)
24         }
25         expect(3)
26         yield() // to started coroutine
27         finish(6)
28     }
29 
30     @Test
31     fun testBasicFailure() = runBlocking {
32         expect(1)
33         val observable = rxFlowable<String>(currentDispatcher()) {
34             expect(4)
35             throw RuntimeException("OK")
36         }
37         expect(2)
38         observable.subscribe({
39             expectUnreached()
40         }, { error ->
41             expect(5)
42             assertTrue(error is RuntimeException)
43             assertEquals("OK", error.message)
44         })
45         expect(3)
46         yield() // to started coroutine
47         finish(6)
48     }
49 
50     @Test
51     fun testBasicUnsubscribe() = runBlocking {
52         expect(1)
53         val observable = rxFlowable<String>(currentDispatcher()) {
54             expect(4)
55             yield() // back to main, will get cancelled
56             expectUnreached()
57         }
58         expect(2)
59         val sub = observable.subscribe({
60             expectUnreached()
61         }, {
62             expectUnreached()
63         })
64         expect(3)
65         yield() // to started coroutine
66         expect(5)
67         sub.dispose() // will cancel coroutine
68         yield()
69         finish(6)
70     }
71 
72     @Test
73     fun testNotifyOnceOnCancellation() = runTest {
74         expect(1)
75         val observable =
76             rxFlowable(currentDispatcher()) {
77                 expect(5)
78                 send("OK")
79                 try {
80                     delay(Long.MAX_VALUE)
81                 } catch (e: CancellationException) {
82                     expect(11)
83                 }
84             }
85             .doOnNext {
86                 expect(6)
87                 assertEquals("OK", it)
88             }
89             .doOnCancel {
90                 expect(10) // notified once!
91             }
92         expect(2)
93         val job = launch(start = CoroutineStart.UNDISPATCHED) {
94             expect(3)
95             observable.collect {
96                 expect(8)
97                 assertEquals("OK", it)
98             }
99         }
100         expect(4)
101         yield() // to observable code
102         expect(7)
103         yield() // to consuming coroutines
104         expect(9)
105         job.cancel()
106         job.join()
107         finish(12)
108     }
109 
110     @Test
111     fun testFailingConsumer() = runTest {
112         val pub = rxFlowable(currentDispatcher()) {
113             repeat(3) {
114                 expect(it + 1) // expect(1), expect(2) *should* be invoked
115                 send(it)
116             }
117         }
118         try {
119             pub.collect {
120                 throw TestException()
121             }
122         } catch (e: TestException) {
123             finish(3)
124         }
125     }
126 }
127