• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
4 
5 package kotlinx.coroutines.rx2
6 
7 import io.reactivex.exceptions.*
8 import kotlinx.coroutines.*
9 import org.junit.*
10 import org.junit.Test
11 import kotlin.test.*
12 
/**
 * Checks where exceptions raised around [rxFlowable] end up: in the subscriber's `onError` callback,
 * in a [CoroutineExceptionHandler] placed in the coroutine context, or in the handler passed to
 * `withExceptionHandler`.
 *
 * Each test marks its checkpoints with `expect(n)` and closes with `finish(n)`; these come from
 * [TestBase] and appear to verify that the checkpoints are hit in exactly that order, so the
 * numbering is part of what each test asserts (NOTE(review): confirm against `TestBase`).
 */
class FlowableExceptionHandlingTest : TestBase() {

    /**
     * Passes the RxJava scheduler thread-name prefixes to `ignoreLostThreads` before every test.
     * Presumably this keeps the base class from flagging Rx's long-lived pool threads as leaked.
     */
    @Before
    fun setup() {
        ignoreLostThreads("RxComputationThreadPool-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-")
    }

    /**
     * Builds a callback for `withExceptionHandler` that asserts the received throwable is an
     * [UndeliverableException] whose cause is a [T], and then records checkpoint [index].
     *
     * @param index the checkpoint number at which the callback is expected to run.
     */
    private inline fun <reified T : Throwable> handler(index: Int) = { t: Throwable ->
        assertTrue(t is UndeliverableException && t.cause is T)
        expect(index)
    }

    /** Creates a [CoroutineExceptionHandler] that calls `expectUnreached()` whenever it is invoked. */
    private fun cehUnreached() = CoroutineExceptionHandler { _, _ -> expectUnreached() }

    /**
     * A [TestException] thrown inside the builder is expected in `onError` only; both the outer
     * handler and the coroutine exception handler call `expectUnreached()`.
     */
    @Test
    fun testException() = withExceptionHandler({ expectUnreached() }) {
        rxFlowable<Int>(Dispatchers.Unconfined + cehUnreached()) {
            expect(1)
            throw TestException()
        }.subscribe({
            expectUnreached()
        }, {
            expect(2) // Reported to onError
        })
        finish(3)
    }

    /**
     * A [LinkageError] thrown inside the builder is expected first in `onError` (checkpoint 2) and
     * then, wrapped in an [UndeliverableException], in the outer handler (checkpoint 3).
     */
    @Test
    fun testFatalException() = withExceptionHandler(handler<LinkageError>(3)) {
        rxFlowable<Int>(Dispatchers.Unconfined) {
            expect(1)
            throw LinkageError()
        }.subscribe({
            expectUnreached()
        }, {
            expect(2) // Fatal exception is reported to both onError and the outer handler
        })
        finish(4)
    }

    /**
     * Same expectations as [testException], but the flowable is shared through
     * `publish().refCount()` before it is subscribed to.
     */
    @Test
    fun testExceptionAsynchronous() = withExceptionHandler({ expectUnreached() }) {
        rxFlowable<Int>(Dispatchers.Unconfined + cehUnreached()) {
            expect(1)
            throw TestException()
        }.publish()
            .refCount()
            .subscribe({
                expectUnreached()
            }, {
                expect(2) // Reported to onError
            })
        finish(3)
    }

    /**
     * Same expectations as [testFatalException], but the flowable is shared through
     * `publish().refCount()` before it is subscribed to.
     */
    @Test
    fun testFatalExceptionAsynchronous() = withExceptionHandler(handler<LinkageError>(3)) {
        rxFlowable<Int>(Dispatchers.Unconfined) {
            expect(1)
            throw LinkageError()
        }.publish()
            .refCount()
            .subscribe({
                expectUnreached()
            }, {
                expect(2) // onError sees the error before the outer handler does
            })
        finish(4)
    }

    /**
     * A [LinkageError] thrown from the subscriber's `onNext` callback is expected in `onError`
     * (checkpoint 3) and then, wrapped in an [UndeliverableException], in the outer handler
     * (checkpoint 4).
     */
    @Test
    fun testFatalExceptionFromSubscribe() = withExceptionHandler(handler<LinkageError>(4)) {
        rxFlowable(Dispatchers.Unconfined) {
            expect(1)
            send(Unit)
        }.subscribe({
            expect(2)
            throw LinkageError()
        }, { expect(3) }) // Fatal exception is reported to both onError and the outer handler
        finish(5)
    }

    /**
     * A [TestException] thrown from the subscriber's `onNext` callback is expected in `onError`
     * (checkpoint 3); both the outer handler and the coroutine exception handler call
     * `expectUnreached()`.
     */
    @Test
    fun testExceptionFromSubscribe() = withExceptionHandler({ expectUnreached() }) {
        rxFlowable(Dispatchers.Unconfined + cehUnreached()) {
            expect(1)
            send(Unit)
        }.subscribe({
            expect(2)
            throw TestException()
        }, { expect(3) }) // delivered to onError only: neither the CEH nor the outer handler may run
        finish(4)
    }

    /**
     * Same expectations as [testExceptionFromSubscribe], but `onNext` throws a plain
     * [RuntimeException] and the flowable is shared through `publish().refCount()`.
     */
    @Test
    fun testAsynchronousExceptionFromSubscribe() = withExceptionHandler({ expectUnreached() }) {
        rxFlowable(Dispatchers.Unconfined + cehUnreached()) {
            expect(1)
            send(Unit)
        }.publish()
            .refCount()
            .subscribe({
                expect(2)
                throw RuntimeException()
            }, { expect(3) }) // delivered to onError only
        finish(4)
    }

    /**
     * A [LinkageError] thrown from `onNext` behind `publish().refCount()` must not reach `onError`
     * (it calls `expectUnreached()`); it is expected only in the outer handler, wrapped in an
     * [UndeliverableException], at checkpoint 3.
     */
    @Test
    fun testAsynchronousFatalExceptionFromSubscribe() = withExceptionHandler(handler<LinkageError>(3)) {
        rxFlowable(Dispatchers.Unconfined) {
            expect(1)
            send(Unit)
        }.publish()
            .refCount()
            .subscribe({
                expect(2)
                throw LinkageError()
            }, { expectUnreached() })
        finish(4)
    }
}
135