<lambda>null1 package kotlinx.coroutines.rx3
2
3 import kotlinx.coroutines.testing.*
4 import io.reactivex.rxjava3.exceptions.*
5 import kotlinx.coroutines.*
6 import org.junit.*
7 import org.junit.Test
8 import java.util.concurrent.*
9 import kotlin.test.*
10
/**
 * Checks where exceptions end up when they are thrown either inside an [rxObservable] builder block
 * or from a subscriber's `onNext` callback: in the subscriber's `onError` callback, or in the handler
 * passed to `withExceptionHandler`.
 *
 * The numbered `expect(n)` / `finish(n)` calls pin the order in which the callbacks are expected to run.
 */
class ObservableExceptionHandlingTest : TestBase() {

    /** Passes the Rx worker thread name prefixes to [ignoreLostThreads] before each test. */
    @Before
    fun setup() {
        ignoreLostThreads("RxComputationThreadPool-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-")
    }

    /**
     * Builds a handler that accepts only an [UndeliverableException] whose cause is a [T]
     * and then records its invocation as step [expect] of the test.
     */
    private inline fun <reified T : Throwable> handler(expect: Int) = { t: Throwable ->
        assertTrue(t is UndeliverableException && t.cause is T, "$t")
        expect(expect)
    }

    /** A [CoroutineExceptionHandler] that calls `expectUnreached()` if it is ever invoked. */
    private fun cehUnreached() = CoroutineExceptionHandler { _, _ -> expectUnreached() }

    /**
     * An exception thrown from the builder block is expected in the subscriber's `onError`;
     * neither the outer handler nor the coroutine exception handler may be invoked.
     */
    @Test
    fun testException() = withExceptionHandler({ expectUnreached() }) {
        rxObservable<Int>(Dispatchers.Unconfined + cehUnreached()) {
            expect(1)
            throw TestException()
        }.subscribe({
            expectUnreached()
        }, {
            expect(2) // Reported to onError
        })
        finish(3)
    }

    /**
     * Same as [testException], but the builder block throws a [LinkageError];
     * it is still expected in the subscriber's `onError`.
     */
    @Test
    fun testFatalException() = withExceptionHandler({ expectUnreached() }) {
        rxObservable<Int>(Dispatchers.Unconfined + cehUnreached()) {
            expect(1)
            throw LinkageError()
        }.subscribe({
            expectUnreached()
        }, {
            expect(2) // Fatal exceptions are not treated in a special manner
        })
        finish(3)
    }

    /**
     * An exception thrown from the builder block is expected in `onError` when the subscriber
     * is attached through `publish().refCount()`.
     */
    @Test
    fun testExceptionAsynchronous() = withExceptionHandler({ expectUnreached() }) {
        rxObservable<Int>(Dispatchers.Unconfined) {
            expect(1)
            throw TestException()
        }.publish()
            .refCount()
            .subscribe({
                expectUnreached()
            }, {
                expect(2) // Reported to onError
            })
        finish(3)
    }

    /**
     * Same as [testExceptionAsynchronous], but the builder block throws a [LinkageError];
     * it is still expected in the subscriber's `onError`.
     */
    @Test
    fun testFatalExceptionAsynchronous() = withExceptionHandler({ expectUnreached() }) {
        rxObservable<Int>(Dispatchers.Unconfined) {
            expect(1)
            throw LinkageError()
        }.publish()
            .refCount()
            .subscribe({
                expectUnreached()
            }, {
                expect(2) // Fatal exceptions are not treated in a special manner
            })
        finish(3)
    }

    /**
     * A [LinkageError] thrown from the subscriber's `onNext` is expected to reach the outer handler as
     * an [UndeliverableException] (step 3), and `trySend` is expected to report a failure carrying the
     * same kind of exception, with the channel closed for sending afterwards.
     */
    @Test
    fun testFatalExceptionFromSubscribe() = withExceptionHandler(handler<LinkageError>(3)) {
        val latch = CountDownLatch(1)
        rxObservable(Dispatchers.Unconfined) {
            expect(1)
            val result = trySend(Unit)
            val exception = result.exceptionOrNull()
            assertIs<UndeliverableException>(exception)
            assertIs<LinkageError>(exception.cause)
            assertTrue(isClosedForSend)
            expect(4)
            latch.countDown()
        }.subscribe({
            expect(2)
            throw LinkageError()
        }, { expectUnreached() }) // Unreached because RxJava bubbles up fatal exceptions, causing `onNext` to throw.
        // Bounded wait: if an assertion inside the builder block fails, `countDown` is never reached,
        // and an unbounded `await()` would hang the test run instead of failing it.
        assertTrue(latch.await(10, TimeUnit.SECONDS), "The rxObservable block did not run to completion")
        finish(5)
    }

    /**
     * A non-fatal exception thrown from the subscriber's `onNext` is expected in the same
     * subscriber's `onError`; the outer handler must not be invoked.
     */
    @Test
    fun testExceptionFromSubscribe() = withExceptionHandler({ expectUnreached() }) {
        rxObservable(Dispatchers.Unconfined) {
            expect(1)
            send(Unit)
        }.subscribe({
            expect(2)
            throw TestException()
        }, { expect(3) })
        finish(4)
    }

    /**
     * Same as [testExceptionFromSubscribe], but the subscriber is attached through
     * `publish().refCount()` and throws a [RuntimeException].
     */
    @Test
    fun testAsynchronousExceptionFromSubscribe() = withExceptionHandler({ expectUnreached() }) {
        rxObservable(Dispatchers.Unconfined) {
            expect(1)
            send(Unit)
        }.publish()
            .refCount()
            .subscribe({
                expect(2)
                throw RuntimeException()
            }, { expectUnreached().let { } }.let { _ -> { _: Throwable -> expect(3) } })
        finish(4)
    }

    /**
     * A [LinkageError] thrown from `onNext` of a subscriber attached through `publish().refCount()`
     * is expected to reach the outer handler as an [UndeliverableException] (step 3), not `onError`.
     */
    @Test
    fun testAsynchronousFatalExceptionFromSubscribe() = withExceptionHandler(handler<LinkageError>(3)) {
        rxObservable(Dispatchers.Unconfined) {
            expect(1)
            send(Unit)
        }.publish()
            .refCount()
            .subscribe({
                expect(2)
                throw LinkageError()
            }, { expectUnreached() }) // Unreached because RxJava bubbles up fatal exceptions, causing `onNext` to throw.
        finish(4)
    }
}
141