<lambda>null1 package kotlinx.coroutines.rx3
2
3 import kotlinx.coroutines.testing.*
4 import io.reactivex.rxjava3.exceptions.*
5 import kotlinx.coroutines.*
6 import org.junit.*
7 import org.junit.Test
8 import kotlin.test.*
9
10 class FlowableExceptionHandlingTest : TestBase() {
11
12 @Before
13 fun setup() {
14 ignoreLostThreads("RxComputationThreadPool-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-")
15 }
16
17 private inline fun <reified T : Throwable> handler(expect: Int) = { t: Throwable ->
18 assertTrue(t is UndeliverableException && t.cause is T)
19 expect(expect)
20 }
21
22 private fun cehUnreached() = CoroutineExceptionHandler { _, _ -> expectUnreached() }
23
24 @Test
25 fun testException() = withExceptionHandler({ expectUnreached() }) {
26 rxFlowable<Int>(Dispatchers.Unconfined + cehUnreached()) {
27 expect(1)
28 throw TestException()
29 }.subscribe({
30 expectUnreached()
31 }, {
32 expect(2) // Reported to onError
33 })
34 finish(3)
35 }
36
37 @Test
38 fun testFatalException() = withExceptionHandler({ expectUnreached() }) {
39 rxFlowable<Int>(Dispatchers.Unconfined) {
40 expect(1)
41 throw LinkageError()
42 }.subscribe({
43 expectUnreached()
44 }, {
45 expect(2) // Fatal exceptions are not treated as special
46 })
47 finish(3)
48 }
49
50 @Test
51 fun testExceptionAsynchronous() = withExceptionHandler({ expectUnreached() }) {
52 rxFlowable<Int>(Dispatchers.Unconfined + cehUnreached()) {
53 expect(1)
54 throw TestException()
55 }.publish()
56 .refCount()
57 .subscribe({
58 expectUnreached()
59 }, {
60 expect(2) // Reported to onError
61 })
62 finish(3)
63 }
64
65 @Test
66 fun testFatalExceptionAsynchronous() = withExceptionHandler({ expectUnreached() }) {
67 rxFlowable<Int>(Dispatchers.Unconfined) {
68 expect(1)
69 throw LinkageError()
70 }.publish()
71 .refCount()
72 .subscribe({
73 expectUnreached()
74 }, {
75 expect(2)
76 })
77 finish(3)
78 }
79
80 @Test
81 fun testFatalExceptionFromSubscribe() = withExceptionHandler(handler<LinkageError>(3)) {
82 rxFlowable(Dispatchers.Unconfined) {
83 expect(1)
84 send(Unit)
85 }.subscribe({
86 expect(2)
87 throw LinkageError()
88 }, { expectUnreached() }) // Fatal exception is rethrown from `onNext` => the subscription is thought to be cancelled
89 finish(4)
90 }
91
92 @Test
93 fun testExceptionFromSubscribe() = withExceptionHandler({ expectUnreached() }) {
94 rxFlowable(Dispatchers.Unconfined + cehUnreached()) {
95 expect(1)
96 send(Unit)
97 }.subscribe({
98 expect(2)
99 throw TestException()
100 }, { expect(3) }) // not reported to onError because came from the subscribe itself
101 finish(4)
102 }
103
104 @Test
105 fun testAsynchronousExceptionFromSubscribe() = withExceptionHandler({ expectUnreached() }) {
106 rxFlowable(Dispatchers.Unconfined + cehUnreached()) {
107 expect(1)
108 send(Unit)
109 }.publish()
110 .refCount()
111 .subscribe({
112 expect(2)
113 throw RuntimeException()
114 }, { expect(3) })
115 finish(4)
116 }
117
118 @Test
119 fun testAsynchronousFatalExceptionFromSubscribe() = withExceptionHandler(handler<LinkageError>(3)) {
120 rxFlowable(Dispatchers.Unconfined) {
121 expect(1)
122 send(Unit)
123 }.publish()
124 .refCount()
125 .subscribe({
126 expect(2)
127 throw LinkageError()
128 }, { expectUnreached() })
129 finish(4)
130 }
131 }
132