<lambda>null1 package kotlinx.coroutines.rx3
2
3 import kotlinx.coroutines.testing.*
4 import io.reactivex.rxjava3.core.*
5 import io.reactivex.rxjava3.plugins.*
6 import kotlinx.coroutines.*
7 import kotlinx.coroutines.CancellationException
8 import org.junit.*
9 import org.junit.Test
10 import java.util.concurrent.*
11 import kotlin.test.*
12
/**
 * Tests for observables built with `rxObservable`: value delivery, failure delivery,
 * disposal, and cancellation of the producing coroutine.
 *
 * Most tests pin the interleaving of producer and consumer code with numbered
 * `expect(n)` / `finish(n)` calls, so the order of statements inside each test matters.
 * Those helpers are not defined in this file; presumably they are inherited from
 * [TestBase] and fail the test when reached out of sequence.
 */
class ObservableTest : TestBase() {
    /**
     * Runs before every test and hands the RxJava thread names below to `ignoreLostThreads`.
     *
     * NOTE(review): presumably these are name prefixes of threads RxJava keeps alive, which
     * the test base would otherwise report as leaked; confirm against `ignoreLostThreads`.
     */
    @Before
    fun setup() {
        ignoreLostThreads(
            "RxComputationThreadPool-",
            "RxCachedWorkerPoolEvictor-",
            "RxSchedulerPurge-"
        )
    }

    /**
     * A producer that sends a single "OK" delivers it to the subscriber.
     *
     * The numbering pins that the producer body (4) runs only after the test body yields (3),
     * and that the subscriber sees the value (5) before the test body finishes (6).
     */
    @Test
    fun testBasicSuccess() = runBlocking {
        expect(1)
        val source = rxObservable(currentDispatcher()) {
            expect(4)
            send("OK")
        }
        expect(2)
        source.subscribe { item ->
            expect(5)
            assertEquals("OK", item)
        }
        expect(3)
        yield() // let the producer coroutine started by subscribe run
        finish(6)
    }

    /**
     * A producer that throws `RuntimeException("OK")` reports it through the error callback.
     *
     * The value callback must never run; the error callback checks both the exception type
     * and its message.
     */
    @Test
    fun testBasicFailure() = runBlocking {
        expect(1)
        val source = rxObservable<String>(currentDispatcher()) {
            expect(4)
            throw RuntimeException("OK")
        }
        expect(2)
        source.subscribe({
            expectUnreached()
        }, { failure ->
            expect(5)
            assertIs<RuntimeException>(failure)
            assertEquals("OK", failure.message)
        })
        expect(3)
        yield() // let the producer coroutine started by subscribe run
        finish(6)
    }

    /**
     * Disposing the subscription while the producer is suspended in `yield()` stops it.
     *
     * Neither subscriber callback may be invoked, and the producer must not get past its
     * `yield()` call.
     */
    @Test
    fun testBasicUnsubscribe() = runBlocking {
        expect(1)
        val source = rxObservable<String>(currentDispatcher()) {
            expect(4)
            yield() // back to the test body, which disposes the subscription
            expectUnreached()
        }
        expect(2)
        val subscription = source.subscribe({
            expectUnreached()
        }, {
            expectUnreached()
        })
        expect(3)
        yield() // let the producer coroutine run up to its own yield
        expect(5)
        subscription.dispose() // will cancel the producer coroutine
        yield()
        finish(6)
    }

    /**
     * Cancelling the collecting job is observed once by `doOnDispose` (10) and then by the
     * producer, which catches the `CancellationException` thrown out of `delay` (11).
     */
    @Test
    fun testNotifyOnceOnCancellation() = runTest {
        expect(1)
        val producer = rxObservable(currentDispatcher()) {
            expect(5)
            send("OK")
            try {
                delay(Long.MAX_VALUE)
            } catch (cancelled: CancellationException) {
                expect(11)
            }
        }
        val observable = producer
            .doOnNext { item ->
                expect(6)
                assertEquals("OK", item)
            }
            .doOnDispose {
                expect(10) // notified once!
            }
        expect(2)
        val collector = launch(start = CoroutineStart.UNDISPATCHED) {
            expect(3)
            observable.collect { received ->
                expect(8)
                assertEquals("OK", received)
            }
        }
        expect(4)
        yield() // to observable code
        expect(7)
        yield() // to consuming coroutines
        expect(9)
        collector.cancel()
        collector.join()
        finish(12)
    }

    /**
     * A `TestException` thrown by the `collect` lambda propagates to the caller (4), and the
     * producer, suspended in `delay`, then sees a `CancellationException` (5).
     */
    @Test
    fun testFailingConsumer() = runTest {
        expect(1)
        val source = rxObservable(currentDispatcher()) {
            expect(2)
            send("OK")
            try {
                delay(Long.MAX_VALUE)
            } catch (cancelled: CancellationException) {
                finish(5)
            }
        }
        try {
            source.collect {
                expect(3)
                throw TestException()
            }
        } catch (expected: TestException) {
            expect(4)
        }
    }

    /**
     * Feeds at most 1000 interval ticks, 1 ms apart, through `switchMapSingle`; every inner
     * `rxSingle` awaits [timeBomb], which throws `TestException`.
     *
     * Both callbacks given to `blockingSubscribe` are empty: the only checks are the two error
     * handlers below, which reject any `CancellationException` they are handed.
     *
     * NOTE(review): `withExceptionHandler` is not defined in this file. Confirm how it
     * interacts with the `RxJavaPlugins` error handler that is replaced inside its block,
     * in particular whether `handler` is still consulted afterwards and whether the previous
     * global handler is restored.
     */
    @Test
    fun testExceptionAfterCancellation() {
        // Test that no exceptions were reported to the global EH (it will fail the test if so)
        val handler: (Throwable) -> Unit = { reported ->
            assertFalse(reported is CancellationException)
        }
        withExceptionHandler(handler) {
            RxJavaPlugins.setErrorHandler { undelivered ->
                require(undelivered !is CancellationException)
            }
            val ticks = Observable
                .interval(1, TimeUnit.MILLISECONDS)
                .take(1000)
            val results = ticks.switchMapSingle {
                rxSingle {
                    timeBomb().await()
                }
            }
            results.blockingSubscribe({}, {})
        }
    }

    /** A 1 ms `Single.timer` whose success callback throws `TestException`. */
    private fun timeBomb() =
        Single
            .timer(1, TimeUnit.MILLISECONDS)
            .doOnSuccess { throw TestException() }
}
162