• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download

<lambda>null1 package kotlinx.coroutines.rx3
2 
3 import kotlinx.coroutines.testing.*
4 import io.reactivex.rxjava3.exceptions.*
5 import kotlinx.coroutines.*
6 import org.junit.*
7 import org.junit.Test
8 import kotlin.test.*
9 
/**
 * Verifies where a failure ends up when it is thrown either from the producer block of
 * [rxFlowable] or from a subscriber's `onNext` callback.
 *
 * Every test numbers its checkpoints with `expect(n)` and closes with `finish(n)`, so the
 * numbers spell out the order in which the callbacks are required to run.
 *
 * NOTE(review): `withExceptionHandler` is defined outside this file; it presumably installs
 * its first argument as the handler for errors RxJava could not deliver — confirm.
 */
class FlowableExceptionHandlingTest : TestBase() {

    @Before
    fun setup() {
        ignoreLostThreads("RxComputationThreadPool-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-")
    }

    /**
     * Builds a callback that asserts the received throwable is an [UndeliverableException]
     * whose cause is a [T], and then records checkpoint number [expect].
     */
    private inline fun <reified T : Throwable> handler(expect: Int): (Throwable) -> Unit = { error ->
        val wrapsExpectedCause = error is UndeliverableException && error.cause is T
        assertTrue(wrapsExpectedCause)
        expect(expect)
    }

    /** A [CoroutineExceptionHandler] that fails the test as soon as it is invoked. */
    private fun cehUnreached(): CoroutineExceptionHandler =
        CoroutineExceptionHandler { _, _ -> expectUnreached() }

    /** A regular exception thrown by the producer must arrive at the subscriber's `onError`. */
    @Test
    fun testException() {
        withExceptionHandler({ expectUnreached() }) {
            val failing = rxFlowable<Int>(Dispatchers.Unconfined + cehUnreached()) {
                expect(1)
                throw TestException()
            }
            failing.subscribe(
                { expectUnreached() },
                { expect(2) } // delivered through onError
            )
            finish(3)
        }
    }

    /** A [LinkageError] thrown by the producer is delivered to `onError` like any other failure. */
    @Test
    fun testFatalException() {
        withExceptionHandler({ expectUnreached() }) {
            val failing = rxFlowable<Int>(Dispatchers.Unconfined) {
                expect(1)
                throw LinkageError()
            }
            failing.subscribe(
                { expectUnreached() },
                { expect(2) } // fatal errors get no special treatment here
            )
            finish(3)
        }
    }

    /** Same as [testException], but subscribing through `publish().refCount()`. */
    @Test
    fun testExceptionAsynchronous() {
        withExceptionHandler({ expectUnreached() }) {
            val shared = rxFlowable<Int>(Dispatchers.Unconfined + cehUnreached()) {
                expect(1)
                throw TestException()
            }.publish().refCount()
            shared.subscribe(
                { expectUnreached() },
                { expect(2) } // delivered through onError
            )
            finish(3)
        }
    }

    /** Same as [testFatalException], but subscribing through `publish().refCount()`. */
    @Test
    fun testFatalExceptionAsynchronous() {
        withExceptionHandler({ expectUnreached() }) {
            val shared = rxFlowable<Int>(Dispatchers.Unconfined) {
                expect(1)
                throw LinkageError()
            }.publish().refCount()
            shared.subscribe(
                { expectUnreached() },
                { expect(2) }
            )
            finish(3)
        }
    }

    /**
     * A [LinkageError] thrown from `onNext` must not reach `onError`; the handler passed to
     * `withExceptionHandler` has to see it wrapped in an [UndeliverableException] instead.
     */
    @Test
    fun testFatalExceptionFromSubscribe() {
        withExceptionHandler(handler<LinkageError>(3)) {
            val single = rxFlowable<Unit>(Dispatchers.Unconfined) {
                expect(1)
                send(Unit)
            }
            single.subscribe(
                {
                    expect(2)
                    throw LinkageError()
                },
                // The fatal exception is rethrown from `onNext`, so the subscription is
                // thought to be cancelled and `onError` is never called.
                { expectUnreached() }
            )
            finish(4)
        }
    }

    /**
     * A regular exception thrown from `onNext` is required to arrive at the same
     * subscriber's `onError` (checkpoint 3); neither exception handler may be invoked.
     */
    @Test
    fun testExceptionFromSubscribe() {
        withExceptionHandler({ expectUnreached() }) {
            val single = rxFlowable<Unit>(Dispatchers.Unconfined + cehUnreached()) {
                expect(1)
                send(Unit)
            }
            single.subscribe(
                {
                    expect(2)
                    throw TestException()
                },
                { expect(3) } // the failure raised in onNext comes back through onError
            )
            finish(4)
        }
    }

    /** Same as [testExceptionFromSubscribe], using `publish().refCount()` and a [RuntimeException]. */
    @Test
    fun testAsynchronousExceptionFromSubscribe() {
        withExceptionHandler({ expectUnreached() }) {
            val shared = rxFlowable<Unit>(Dispatchers.Unconfined + cehUnreached()) {
                expect(1)
                send(Unit)
            }.publish().refCount()
            shared.subscribe(
                {
                    expect(2)
                    throw RuntimeException()
                },
                { expect(3) }
            )
            finish(4)
        }
    }

    /** Same as [testFatalExceptionFromSubscribe], but subscribing through `publish().refCount()`. */
    @Test
    fun testAsynchronousFatalExceptionFromSubscribe() {
        withExceptionHandler(handler<LinkageError>(3)) {
            val shared = rxFlowable<Unit>(Dispatchers.Unconfined) {
                expect(1)
                send(Unit)
            }.publish().refCount()
            shared.subscribe(
                {
                    expect(2)
                    throw LinkageError()
                },
                { expectUnreached() }
            )
            finish(4)
        }
    }
}
132