• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
4 
5 package kotlinx.coroutines.rx2
6 
7 import io.reactivex.exceptions.*
8 import kotlinx.coroutines.*
9 import org.junit.*
10 import org.junit.Test
11 import java.util.concurrent.*
12 import kotlin.test.*
13 
/**
 * Verifies where an exception ends up when it is thrown either inside an [rxObservable] builder block
 * or inside a subscriber's `onNext` callback.
 *
 * Every test runs inside `withExceptionHandler`, whose handler either must never be invoked or must
 * observe an [UndeliverableException] at a specific step of the `expect`/`finish` sequence.
 */
class ObservableExceptionHandlingTest : TestBase() {

    /** Passes the RxJava worker-thread name prefixes to `ignoreLostThreads` before each test. */
    @Before
    fun setup() {
        ignoreLostThreads(
            "RxComputationThreadPool-",
            "RxCachedWorkerPoolEvictor-",
            "RxSchedulerPurge-"
        )
    }

    /**
     * Builds a handler that asserts the received throwable is an [UndeliverableException] whose cause
     * is a [T], and then records step [expect] of the test sequence.
     */
    private inline fun <reified T : Throwable> handler(expect: Int): (Throwable) -> Unit = { thrown ->
        val wrapsExpectedCause = thrown is UndeliverableException && thrown.cause is T
        assertTrue(wrapsExpectedCause, "$thrown")
        expect(expect)
    }

    /** A [CoroutineExceptionHandler] that fails the test as soon as it is invoked. */
    private fun cehUnreached(): CoroutineExceptionHandler =
        CoroutineExceptionHandler { _, _ -> expectUnreached() }

    /** An exception thrown by the builder block reaches the subscriber's `onError` and no other handler. */
    @Test
    fun testException() {
        withExceptionHandler({ expectUnreached() }) {
            val failing = rxObservable<Int>(Dispatchers.Unconfined + cehUnreached()) {
                expect(1)
                throw TestException()
            }
            failing.subscribe(
                { expectUnreached() },
                { expect(2) } // Reported to onError
            )
            finish(3)
        }
    }

    /** A [LinkageError] thrown by the builder block also reaches the subscriber's `onError`. */
    @Test
    fun testFatalException() {
        withExceptionHandler({ expectUnreached() }) {
            val failing = rxObservable<Int>(Dispatchers.Unconfined + cehUnreached()) {
                expect(1)
                throw LinkageError()
            }
            failing.subscribe(
                { expectUnreached() },
                { expect(2) }
            )
            finish(3)
        }
    }

    /** Like [testException], but the subscriber is attached through `publish().refCount()`. */
    @Test
    fun testExceptionAsynchronous() {
        withExceptionHandler({ expectUnreached() }) {
            val failing = rxObservable<Int>(Dispatchers.Unconfined) {
                expect(1)
                throw TestException()
            }
            val shared = failing.publish().refCount()
            shared.subscribe(
                { expectUnreached() },
                { expect(2) } // Reported to onError
            )
            finish(3)
        }
    }

    /** Like [testFatalException], but the subscriber is attached through `publish().refCount()`. */
    @Test
    fun testFatalExceptionAsynchronous() {
        withExceptionHandler({ expectUnreached() }) {
            val failing = rxObservable<Int>(Dispatchers.Unconfined) {
                expect(1)
                throw LinkageError()
            }
            val shared = failing.publish().refCount()
            shared.subscribe(
                { expectUnreached() },
                { expect(2) } // Fatal exceptions are not treated in a special manner
            )
            finish(3)
        }
    }

    /**
     * A [LinkageError] thrown from `onNext` is seen by the outer handler wrapped in an
     * [UndeliverableException]; the same wrapped failure is returned from `trySend`, and the channel
     * is closed for sending afterwards.
     */
    @Test
    fun testFatalExceptionFromSubscribe() {
        withExceptionHandler(handler<LinkageError>(3)) {
            val done = CountDownLatch(1)
            val source = rxObservable<Unit>(Dispatchers.Unconfined) {
                expect(1)
                val failure = trySend(Unit).exceptionOrNull()
                assertTrue(failure is UndeliverableException)
                assertTrue(failure.cause is LinkageError)
                assertTrue(isClosedForSend)
                expect(4)
                done.countDown()
            }
            source.subscribe(
                {
                    expect(2)
                    throw LinkageError()
                },
                // Unreached because RxJava bubbles up fatal exceptions, causing `onNext` to throw.
                { expectUnreached() }
            )
            done.await()
            finish(5)
        }
    }

    /** A [TestException] thrown from `onNext` is routed to the same subscriber's `onError`. */
    @Test
    fun testExceptionFromSubscribe() {
        withExceptionHandler({ expectUnreached() }) {
            val source = rxObservable<Unit>(Dispatchers.Unconfined) {
                expect(1)
                send(Unit)
            }
            source.subscribe(
                {
                    expect(2)
                    throw TestException()
                },
                { expect(3) }
            )
            finish(4)
        }
    }

    /**
     * A [RuntimeException] thrown from `onNext` of a subscriber attached through
     * `publish().refCount()` is routed to that subscriber's `onError`.
     */
    @Test
    fun testAsynchronousExceptionFromSubscribe() {
        withExceptionHandler({ expectUnreached() }) {
            val source = rxObservable<Unit>(Dispatchers.Unconfined) {
                expect(1)
                send(Unit)
            }
            val shared = source.publish().refCount()
            shared.subscribe(
                {
                    expect(2)
                    throw RuntimeException()
                },
                { expect(3) }
            )
            finish(4)
        }
    }

    /**
     * A [LinkageError] thrown from `onNext` of a subscriber attached through `publish().refCount()`
     * is seen by the outer handler wrapped in an [UndeliverableException]; `onError` is never called.
     */
    @Test
    fun testAsynchronousFatalExceptionFromSubscribe() {
        withExceptionHandler(handler<LinkageError>(3)) {
            val source = rxObservable<Unit>(Dispatchers.Unconfined) {
                expect(1)
                send(Unit)
            }
            val shared = source.publish().refCount()
            shared.subscribe(
                {
                    expect(2)
                    throw LinkageError()
                },
                // Unreached because RxJava bubbles up fatal exceptions, causing `onNext` to throw.
                { expectUnreached() }
            )
            finish(4)
        }
    }
}
144