• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download

<lambda>null1 package kotlinx.coroutines.rx3
2 
3 import kotlinx.coroutines.testing.*
4 import io.reactivex.rxjava3.core.*
5 import io.reactivex.rxjava3.plugins.*
6 import kotlinx.coroutines.*
7 import kotlinx.coroutines.CancellationException
8 import org.junit.*
9 import org.junit.Test
10 import java.util.concurrent.*
11 import kotlin.test.*
12 
/**
 * Tests for the `rxObservable` builder and for consuming an RxJava 3 `Observable` from coroutines.
 *
 * Most tests pin the exact interleaving of the test body, the builder body and the subscriber
 * callbacks with the numbered `expect(n)` / `finish(n)` checkpoints inherited from [TestBase].
 */
class ObservableTest : TestBase() {
    /** Passes the RxJava worker thread-name prefixes to [ignoreLostThreads] before each test. */
    @Before
    fun setup() {
        ignoreLostThreads(
            "RxComputationThreadPool-",
            "RxCachedWorkerPoolEvictor-",
            "RxSchedulerPurge-"
        )
    }

    /**
     * A value sent by the builder is delivered to the subscriber.
     *
     * Checkpoint order: test body (1-3), builder body (4), subscriber callback (5), test body (6).
     */
    @Test
    fun testBasicSuccess() = runBlocking {
        expect(1)
        val source = rxObservable<String>(currentDispatcher()) {
            expect(4)
            send("OK")
        }
        expect(2)
        source.subscribe { item ->
            expect(5)
            assertEquals("OK", item)
        }
        expect(3)
        yield() // switch to the started coroutine
        finish(6)
    }

    /**
     * An exception thrown from the builder body is delivered to the subscriber's error callback;
     * the value callback must never run.
     */
    @Test
    fun testBasicFailure() = runBlocking {
        expect(1)
        val source = rxObservable<String>(currentDispatcher()) {
            expect(4)
            throw RuntimeException("OK")
        }
        expect(2)
        source.subscribe(
            { expectUnreached() },
            { failure ->
                expect(5)
                assertIs<RuntimeException>(failure)
                assertEquals("OK", failure.message)
            }
        )
        expect(3)
        yield() // switch to the started coroutine
        finish(6)
    }

    /**
     * Disposing the subscription while the builder is suspended in `yield()` must prevent the
     * builder from resuming past that point; neither subscriber callback may be invoked.
     */
    @Test
    fun testBasicUnsubscribe() = runBlocking {
        expect(1)
        val source = rxObservable<String>(currentDispatcher()) {
            expect(4)
            yield() // hands control back to the test body, which then disposes the subscription
            expectUnreached()
        }
        expect(2)
        val disposable = source.subscribe(
            { expectUnreached() },
            { expectUnreached() }
        )
        expect(3)
        yield() // switch to the started coroutine
        expect(5)
        disposable.dispose() // expected to cancel the coroutine
        yield()
        finish(6)
    }

    /**
     * Cancelling the collecting job must trigger `doOnDispose` once (checkpoint 10) and then
     * cancel the producer, which observes a [CancellationException] (checkpoint 11).
     */
    @Test
    fun testNotifyOnceOnCancellation() = runTest {
        expect(1)
        val emitter = rxObservable<String>(currentDispatcher()) {
            expect(5)
            send("OK")
            try {
                delay(Long.MAX_VALUE)
            } catch (cancelled: CancellationException) {
                expect(11)
            }
        }
        val observed = emitter
            .doOnNext { item ->
                expect(6)
                assertEquals("OK", item)
            }
            .doOnDispose {
                expect(10) // a second notification would break the checkpoint sequence
            }
        expect(2)
        val collector = launch(start = CoroutineStart.UNDISPATCHED) {
            expect(3)
            observed.collect { item ->
                expect(8)
                assertEquals("OK", item)
            }
        }
        expect(4)
        yield() // let the observable's code run
        expect(7)
        yield() // let the consuming coroutine run
        expect(9)
        collector.cancel()
        collector.join()
        finish(12)
    }

    /**
     * A [TestException] thrown by the `collect` lambda must propagate to the caller (checkpoint 4)
     * and the suspended producer must see a [CancellationException] (checkpoint 5).
     */
    @Test
    fun testFailingConsumer() = runTest {
        expect(1)
        val publisher = rxObservable<String>(currentDispatcher()) {
            expect(2)
            send("OK")
            try {
                delay(Long.MAX_VALUE)
            } catch (cancelled: CancellationException) {
                finish(5)
            }
        }
        try {
            publisher.collect {
                expect(3)
                throw TestException()
            }
        } catch (expected: TestException) {
            expect(4)
        }
    }

    /**
     * Repeatedly switches between failing singles and checks what gets reported to the error
     * handlers: a reported [CancellationException] fails the test.
     */
    @Test
    fun testExceptionAfterCancellation() {
        // Guards the global exception handler: the assertion fails the test if a
        // CancellationException is reported to it.
        val handler: (Throwable) -> Unit = { reported ->
            assertFalse(reported is CancellationException)
        }
        withExceptionHandler(handler) {
            RxJavaPlugins.setErrorHandler { undelivered ->
                require(undelivered !is CancellationException)
            }
            val ticks = Observable
                .interval(1, TimeUnit.MILLISECONDS)
                .take(1000)
            ticks
                .switchMapSingle {
                    rxSingle {
                        timeBomb().await()
                    }
                }
                .blockingSubscribe({}, {})
        }
    }

    /** Builds a one-millisecond timer `Single` whose `doOnSuccess` callback throws [TestException]. */
    private fun timeBomb(): Single<Long> {
        val timer = Single.timer(1, TimeUnit.MILLISECONDS)
        return timer.doOnSuccess { throw TestException() }
    }
}
162