/*
 * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package kotlinx.coroutines.rx2

import io.reactivex.exceptions.*
import kotlinx.coroutines.*
import org.junit.*
import org.junit.Test
import kotlin.test.*

/**
 * Checks where exceptions end up when they are thrown either inside an [rxFlowable] producer block
 * or inside the subscriber's `onNext` callback.
 *
 * Every scenario is exercised with an ordinary exception ([TestException] / [RuntimeException]) and
 * with a [LinkageError], both on a direct subscription and through `publish().refCount()`.
 *
 * NOTE(review): `expect`, `finish`, `expectUnreached`, `ignoreLostThreads` and `withExceptionHandler`
 * are declared outside this file. The numbered `expect(n)` / `finish(n)` calls presumably assert the
 * order in which the steps run, and `withExceptionHandler` presumably installs its first argument as
 * the handler for undeliverable RxJava errors while the block runs; confirm against their definitions.
 */
class FlowableExceptionHandlingTest : TestBase() {

    /**
     * Runs before each test and passes the name prefixes of the RxJava worker threads to
     * `ignoreLostThreads` (presumably so they are not reported as leaked; verify in `TestBase`).
     */
    @Before
    fun setup() {
        ignoreLostThreads("RxComputationThreadPool-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-")
    }

    /**
     * Builds an error callback that asserts the received throwable is an [UndeliverableException]
     * whose cause is of type [T], and then records step [expect].
     *
     * The two conditions are asserted separately, each with a message, so that a failure shows
     * which condition was violated and what was actually received.
     *
     * @param expect the step number passed to `expect` once both assertions hold.
     */
    private inline fun <reified T : Throwable> handler(expect: Int) = { t: Throwable ->
        assertTrue(t is UndeliverableException, "Expected UndeliverableException, but got $t")
        assertTrue(t.cause is T, "Expected cause of type ${T::class.java.name}, but got ${t.cause}")
        // The parameter shadows the `expect` function by name only: the call resolves to the function.
        expect(expect)
    }

    /** Creates a [CoroutineExceptionHandler] that calls `expectUnreached()` whenever it is invoked. */
    private fun cehUnreached() = CoroutineExceptionHandler { _, _ -> expectUnreached() }

    /**
     * A [TestException] thrown by the producer block is expected in the subscriber's `onError`;
     * neither the handler passed to `withExceptionHandler` nor the coroutine exception handler
     * may be reached.
     */
    @Test
    fun testException() = withExceptionHandler({ expectUnreached() }) {
        rxFlowable<Int>(Dispatchers.Unconfined + cehUnreached()) {
            expect(1)
            throw TestException()
        }.subscribe({
            expectUnreached()
        }, {
            expect(2) // Reported to onError
        })
        finish(3)
    }

    /**
     * A [LinkageError] thrown by the producer block is expected in `onError` (step 2) and, wrapped
     * in an [UndeliverableException], in the handler passed to `withExceptionHandler` (step 3).
     */
    @Test
    fun testFatalException() = withExceptionHandler(handler<LinkageError>(3)) {
        rxFlowable<Int>(Dispatchers.Unconfined) {
            expect(1)
            throw LinkageError()
        }.subscribe({
            expectUnreached()
        }, {
            expect(2) // Fatal exception is reported to both onError and CEH
        })
        finish(4)
    }

    /**
     * Same expectations as [testException], but the subscription goes through `publish().refCount()`.
     */
    @Test
    fun testExceptionAsynchronous() = withExceptionHandler({ expectUnreached() }) {
        rxFlowable<Int>(Dispatchers.Unconfined + cehUnreached()) {
            expect(1)
            throw TestException()
        }.publish()
            .refCount()
            .subscribe({
                expectUnreached()
            }, {
                expect(2) // Reported to onError
            })
        finish(3)
    }

    /**
     * Same expectations as [testFatalException], but the subscription goes through
     * `publish().refCount()`.
     */
    @Test
    fun testFatalExceptionAsynchronous() = withExceptionHandler(handler<LinkageError>(3)) {
        rxFlowable<Int>(Dispatchers.Unconfined) {
            expect(1)
            throw LinkageError()
        }.publish()
            .refCount()
            .subscribe({
                expectUnreached()
            }, {
                expect(2)
            })
        finish(4)
    }

    /**
     * A [LinkageError] thrown from the subscriber's `onNext` is expected in `onError` (step 3) and,
     * wrapped in an [UndeliverableException], in the handler passed to `withExceptionHandler` (step 4).
     */
    @Test
    fun testFatalExceptionFromSubscribe() = withExceptionHandler(handler<LinkageError>(4)) {
        rxFlowable(Dispatchers.Unconfined) {
            expect(1)
            send(Unit)
        }.subscribe({
            expect(2)
            throw LinkageError()
        }, { expect(3) }) // Fatal exception is reported to both onError and CEH
        finish(5)
    }

    /**
     * A [TestException] thrown from the subscriber's `onNext` is expected in the same subscriber's
     * `onError` (step 3); neither the handler passed to `withExceptionHandler` nor the coroutine
     * exception handler may be reached.
     */
    @Test
    fun testExceptionFromSubscribe() = withExceptionHandler({ expectUnreached() }) {
        rxFlowable(Dispatchers.Unconfined + cehUnreached()) {
            expect(1)
            send(Unit)
        }.subscribe({
            expect(2)
            throw TestException()
        }, { expect(3) }) // Expected in onError (step 3); neither exception handler may be reached
        finish(4)
    }

    /**
     * Same expectations as [testExceptionFromSubscribe], but with a [RuntimeException] and a
     * subscription that goes through `publish().refCount()`.
     */
    @Test
    fun testAsynchronousExceptionFromSubscribe() = withExceptionHandler({ expectUnreached() }) {
        rxFlowable(Dispatchers.Unconfined + cehUnreached()) {
            expect(1)
            send(Unit)
        }.publish()
            .refCount()
            .subscribe({
                expect(2)
                throw RuntimeException()
            }, { expect(3) })
        finish(4)
    }

    /**
     * A [LinkageError] thrown from `onNext` behind `publish().refCount()` is expected, wrapped in an
     * [UndeliverableException], in the handler passed to `withExceptionHandler` (step 3). Unlike the
     * direct subscription in [testFatalExceptionFromSubscribe], `onError` must not be called.
     */
    @Test
    fun testAsynchronousFatalExceptionFromSubscribe() = withExceptionHandler(handler<LinkageError>(3)) {
        rxFlowable(Dispatchers.Unconfined) {
            expect(1)
            send(Unit)
        }.publish()
            .refCount()
            .subscribe({
                expect(2)
                throw LinkageError()
            }, { expectUnreached() })
        finish(4)
    }
}