• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download

<lambda>null1 package kotlinx.coroutines.rx3
2 
3 import kotlinx.coroutines.testing.*
4 import io.reactivex.rxjava3.exceptions.*
5 import kotlinx.coroutines.*
6 import org.junit.*
7 import org.junit.Test
8 import java.util.concurrent.*
9 import kotlin.test.*
10 
/**
 * Exercises where exceptions end up when they are thrown either by an `rxObservable` producer
 * or by the `onNext` callback of its subscriber.
 *
 * Every test runs inside `withExceptionHandler`, passing it the handler that is expected (or not
 * expected) to be invoked, and tracks the order of events with `expect` / `finish` checkpoints.
 */
class ObservableExceptionHandlingTest : TestBase() {

    /**
     * Passes the Rx thread-name prefixes to `ignoreLostThreads` before each test.
     * NOTE(review): presumably these are Rx-owned pool threads that may outlive a test — confirm in TestBase.
     */
    @Before
    fun setup() {
        ignoreLostThreads("RxComputationThreadPool-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-")
    }

    /**
     * Builds a handler that asserts the received throwable is an [UndeliverableException]
     * whose cause is a [T], and then records checkpoint [expectedIndex].
     */
    private inline fun <reified T : Throwable> handler(expectedIndex: Int) = { throwable: Throwable ->
        val wrapsExpectedCause = throwable is UndeliverableException && throwable.cause is T
        assertTrue(wrapsExpectedCause, "$throwable")
        expect(expectedIndex)
    }

    /** Creates a [CoroutineExceptionHandler] that fails the test as soon as it is invoked. */
    private fun cehUnreached(): CoroutineExceptionHandler =
        CoroutineExceptionHandler { _, _ -> expectUnreached() }

    /**
     * A [TestException] thrown by the producer must be reported to the subscriber's `onError`;
     * neither the handler given to `withExceptionHandler` nor the coroutine exception handler may run.
     */
    @Test
    fun testException() = withExceptionHandler({ expectUnreached() }) {
        val source = rxObservable<Int>(Dispatchers.Unconfined + cehUnreached()) {
            expect(1)
            throw TestException()
        }
        source.subscribe(
            { expectUnreached() }, // onNext: the producer never emits
            { expect(2) } // onError: the producer's exception is reported here
        )
        finish(3)
    }

    /**
     * Same scenario as [testException], but the producer throws a [LinkageError];
     * it is still expected to reach `onError`.
     */
    @Test
    fun testFatalException() = withExceptionHandler({ expectUnreached() }) {
        val source = rxObservable<Int>(Dispatchers.Unconfined + cehUnreached()) {
            expect(1)
            throw LinkageError()
        }
        source.subscribe(
            { expectUnreached() }, // onNext: the producer never emits
            { expect(2) } // onError
        )
        finish(3)
    }

    /**
     * A [TestException] thrown by the producer behind `publish().refCount()` must still be
     * reported to the subscriber's `onError`.
     */
    @Test
    fun testExceptionAsynchronous() = withExceptionHandler({ expectUnreached() }) {
        val shared = rxObservable<Int>(Dispatchers.Unconfined) {
            expect(1)
            throw TestException()
        }.publish().refCount()
        shared.subscribe(
            { expectUnreached() }, // onNext: the producer never emits
            { expect(2) } // onError: the producer's exception is reported here
        )
        finish(3)
    }

    /**
     * A [LinkageError] thrown by the producer behind `publish().refCount()` is delivered to
     * `onError` exactly like a non-fatal exception.
     */
    @Test
    fun testFatalExceptionAsynchronous() = withExceptionHandler({ expectUnreached() }) {
        val shared = rxObservable<Int>(Dispatchers.Unconfined) {
            expect(1)
            throw LinkageError()
        }.publish().refCount()
        shared.subscribe(
            { expectUnreached() }, // onNext: the producer never emits
            { expect(2) } // onError: fatal exceptions are not treated in a special manner
        )
        finish(3)
    }

    /**
     * A [LinkageError] thrown from `onNext` is expected to reach the handler given to
     * `withExceptionHandler` wrapped in an [UndeliverableException] (checkpoint 3), while the
     * producer observes a failed `trySend` carrying the same wrapper and a channel closed for sending.
     */
    @Test
    fun testFatalExceptionFromSubscribe() = withExceptionHandler(handler<LinkageError>(3)) {
        val producerDone = CountDownLatch(1)
        val source = rxObservable<Unit>(Dispatchers.Unconfined) {
            expect(1)
            val failure = trySend(Unit).exceptionOrNull()
            assertIs<UndeliverableException>(failure)
            assertIs<LinkageError>(failure.cause)
            assertTrue(isClosedForSend)
            expect(4)
            producerDone.countDown()
        }
        source.subscribe(
            {
                expect(2)
                throw LinkageError()
            },
            // Unreached because RxJava bubbles up fatal exceptions, causing `onNext` to throw.
            { expectUnreached() }
        )
        // Block until the producer has run through all of its assertions.
        producerDone.await()
        finish(5)
    }

    /**
     * A [TestException] thrown from `onNext` must be routed to the same subscriber's `onError`
     * (checkpoint 3) without invoking the handler given to `withExceptionHandler`.
     */
    @Test
    fun testExceptionFromSubscribe() = withExceptionHandler({ expectUnreached() }) {
        val source = rxObservable<Unit>(Dispatchers.Unconfined) {
            expect(1)
            send(Unit)
        }
        source.subscribe(
            {
                expect(2)
                throw TestException()
            },
            { expect(3) } // onError receives the exception thrown by onNext
        )
        finish(4)
    }

    /**
     * A [RuntimeException] thrown from `onNext` behind `publish().refCount()` must be routed to
     * `onError` (checkpoint 3) without invoking the handler given to `withExceptionHandler`.
     */
    @Test
    fun testAsynchronousExceptionFromSubscribe() = withExceptionHandler({ expectUnreached() }) {
        val shared = rxObservable<Unit>(Dispatchers.Unconfined) {
            expect(1)
            send(Unit)
        }.publish().refCount()
        shared.subscribe(
            {
                expect(2)
                throw RuntimeException()
            },
            { expect(3) } // onError receives the exception thrown by onNext
        )
        finish(4)
    }

    /**
     * A [LinkageError] thrown from `onNext` behind `publish().refCount()` is expected to reach the
     * handler given to `withExceptionHandler` wrapped in an [UndeliverableException] (checkpoint 3);
     * `onError` must not be called.
     */
    @Test
    fun testAsynchronousFatalExceptionFromSubscribe() = withExceptionHandler(handler<LinkageError>(3)) {
        val shared = rxObservable<Unit>(Dispatchers.Unconfined) {
            expect(1)
            send(Unit)
        }.publish().refCount()
        shared.subscribe(
            {
                expect(2)
                throw LinkageError()
            },
            // Unreached because RxJava bubbles up fatal exceptions, causing `onNext` to throw.
            { expectUnreached() }
        )
        finish(4)
    }
}
141