1 /* <lambda>null2 * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. 3 */ 4 5 package kotlinx.coroutines.rx3 6 7 import io.reactivex.rxjava3.exceptions.* 8 import kotlinx.coroutines.* 9 import org.junit.* 10 import org.junit.Test 11 import java.util.concurrent.* 12 import kotlin.test.* 13 14 class ObservableExceptionHandlingTest : TestBase() { 15 16 @Before 17 fun setup() { 18 ignoreLostThreads("RxComputationThreadPool-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-") 19 } 20 21 private inline fun <reified T : Throwable> handler(expect: Int) = { t: Throwable -> 22 assertTrue(t is UndeliverableException && t.cause is T, "$t") 23 expect(expect) 24 } 25 26 private fun cehUnreached() = CoroutineExceptionHandler { _, _ -> expectUnreached() } 27 28 @Test 29 fun testException() = withExceptionHandler({ expectUnreached() }) { 30 rxObservable<Int>(Dispatchers.Unconfined + cehUnreached()) { 31 expect(1) 32 throw TestException() 33 }.subscribe({ 34 expectUnreached() 35 }, { 36 expect(2) // Reported to onError 37 }) 38 finish(3) 39 } 40 41 @Test 42 fun testFatalException() = withExceptionHandler({ expectUnreached() }) { 43 rxObservable<Int>(Dispatchers.Unconfined + cehUnreached()) { 44 expect(1) 45 throw LinkageError() 46 }.subscribe({ 47 expectUnreached() 48 }, { 49 expect(2) 50 }) 51 finish(3) 52 } 53 54 @Test 55 fun testExceptionAsynchronous() = withExceptionHandler({ expectUnreached() }) { 56 rxObservable<Int>(Dispatchers.Unconfined) { 57 expect(1) 58 throw TestException() 59 }.publish() 60 .refCount() 61 .subscribe({ 62 expectUnreached() 63 }, { 64 expect(2) // Reported to onError 65 }) 66 finish(3) 67 } 68 69 @Test 70 fun testFatalExceptionAsynchronous() = withExceptionHandler({ expectUnreached() }) { 71 rxObservable<Int>(Dispatchers.Unconfined) { 72 expect(1) 73 throw LinkageError() 74 }.publish() 75 .refCount() 76 .subscribe({ 77 expectUnreached() 78 }, { 79 expect(2) // Fatal exceptions are not treated in a special manner 80 }) 81 finish(3) 82 } 83 84 @Test 85 fun testFatalExceptionFromSubscribe() = withExceptionHandler(handler<LinkageError>(3)) { 86 val latch = CountDownLatch(1) 87 rxObservable(Dispatchers.Unconfined) { 88 expect(1) 89 val result = trySend(Unit) 90 val exception = result.exceptionOrNull() 91 assertTrue(exception is UndeliverableException) 92 assertTrue(exception.cause is LinkageError) 93 assertTrue(isClosedForSend) 94 expect(4) 95 latch.countDown() 96 }.subscribe({ 97 expect(2) 98 throw LinkageError() 99 }, { expectUnreached() }) // Unreached because RxJava bubbles up fatal exceptions, causing `onNext` to throw. 100 latch.await() 101 finish(5) 102 } 103 104 @Test 105 fun testExceptionFromSubscribe() = withExceptionHandler({ expectUnreached() }) { 106 rxObservable(Dispatchers.Unconfined) { 107 expect(1) 108 send(Unit) 109 }.subscribe({ 110 expect(2) 111 throw TestException() 112 }, { expect(3) }) 113 finish(4) 114 } 115 116 @Test 117 fun testAsynchronousExceptionFromSubscribe() = withExceptionHandler({ expectUnreached() }) { 118 rxObservable(Dispatchers.Unconfined) { 119 expect(1) 120 send(Unit) 121 }.publish() 122 .refCount() 123 .subscribe({ 124 expect(2) 125 throw RuntimeException() 126 }, { expect(3) }) 127 finish(4) 128 } 129 130 @Test 131 fun testAsynchronousFatalExceptionFromSubscribe() = withExceptionHandler(handler<LinkageError>(3)) { 132 rxObservable(Dispatchers.Unconfined) { 133 expect(1) 134 send(Unit) 135 }.publish() 136 .refCount() 137 .subscribe({ 138 expect(2) 139 throw LinkageError() 140 }, { expectUnreached() }) // Unreached because RxJava bubbles up fatal exceptions, causing `onNext` to throw. 141 finish(4) 142 } 143 } 144