/*
 * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package kotlinx.coroutines.rx3

import io.reactivex.rxjava3.core.*
import io.reactivex.rxjava3.exceptions.*
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.reactive.*
import org.junit.Test
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import kotlin.test.*

// Check that exception is not leaked to the global exception handler
class LeakedExceptionTest : TestBase() {

    /**
     * Handler installed for the duration of each test via [withExceptionHandler].
     * The only throwable it accepts is an [UndeliverableException] caused by a [TestException];
     * anything else fails the assertion, and the message names the offending throwable.
     */
    private val handler: (Throwable) -> Unit = { e ->
        assertTrue("Unexpected exception reached the handler: $e") {
            e is UndeliverableException && e.cause is TestException
        }
    }

    @Test
    fun testSingle() = withExceptionHandler(handler) {
        withFixedThreadPool(4) { dispatcher ->
            val flow = rxSingle<Unit>(dispatcher) { throw TestException() }.toFlowable().asFlow()
            collectCombined(flow, repetitions = STRESS_REPETITIONS)
        }
    }

    @Test
    fun testObservable() = withExceptionHandler(handler) {
        withFixedThreadPool(4) { dispatcher ->
            val flow = rxObservable<Unit>(dispatcher) { throw TestException() }
                .toFlowable(BackpressureStrategy.BUFFER)
                .asFlow()
            collectCombined(flow, repetitions = STRESS_REPETITIONS)
        }
    }

    @Test
    fun testFlowable() = withExceptionHandler(handler) {
        withFixedThreadPool(4) { dispatcher ->
            val flow = rxFlowable<Unit>(dispatcher) { throw TestException() }.asFlow()
            collectCombined(flow, repetitions = STRESS_REPETITIONS)
        }
    }

    /**
     * This test doesn't test much and was added to display a problem with straightforward use of
     * [withExceptionHandler].
     *
     * If one was to remove `dispatcher` and launch `rxFlowable` with an empty coroutine context,
     * this test would fail fairly often, while other tests were also vulnerable, but the problem is
     * much more difficult to reproduce. Thus, this test is a justification for adding `dispatcher`
     * to other tests.
     *
     * See the commit that introduced this test for a better explanation.
     */
    @Test
    fun testResettingExceptionHandler() = withExceptionHandler(handler) {
        withFixedThreadPool(4) { dispatcher ->
            val flow = rxFlowable<Unit>(dispatcher) {
                // Randomly delay one of the producers so that the two failures do not always race the same way.
                if ((0..1).random() == 0) {
                    Thread.sleep(100)
                }
                throw TestException()
            }.asFlow()
            collectCombined(flow)
        }
    }

    /**
     * Blocks the calling thread while combining [flow] with itself and collecting the result
     * [repetitions] times in a row. Failures of the combined flow are swallowed by an empty
     * `catch`, and emitted values are discarded.
     */
    private fun collectCombined(flow: Flow<Unit>, repetitions: Int = 1) {
        runBlocking {
            repeat(repetitions) {
                combine(flow, flow) { _, _ -> Unit }
                    .catch {}
                    .collect {}
            }
        }
    }

    /**
     * Run in a thread pool, then wait for all the tasks to finish.
     *
     * The pool is shut down and awaited in a `finally` block, so its threads are released and
     * already-submitted tasks are waited for even when [block] throws; the exception from [block]
     * still propagates to the caller afterwards.
     *
     * @param numberOfThreads size of the fixed thread pool backing the dispatcher.
     * @param block code to run with a dispatcher backed by the pool.
     */
    private fun withFixedThreadPool(numberOfThreads: Int, block: (CoroutineDispatcher) -> Unit) {
        val pool = Executors.newFixedThreadPool(numberOfThreads)
        try {
            block(pool.asCoroutineDispatcher())
        } finally {
            pool.shutdown()
            while (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                /* deliberately empty */
            }
        }
    }

    private companion object {
        /** Number of collect rounds used by the stress-style tests. */
        const val STRESS_REPETITIONS = 10_000
    }
}