<lambda>null1 package kotlinx.coroutines.rx3
2
3 import kotlinx.coroutines.testing.*
4 import io.reactivex.rxjava3.core.*
5 import io.reactivex.rxjava3.exceptions.*
6 import kotlinx.coroutines.*
7 import kotlinx.coroutines.flow.*
8 import kotlinx.coroutines.reactive.*
9 import org.junit.Test
10 import java.util.concurrent.Executors
11 import java.util.concurrent.TimeUnit
12 import kotlin.test.*
13
14 // Check that exception is not leaked to the global exception handler
/**
 * Stress tests that repeatedly collect failing Rx sources converted to flows while a custom
 * exception handler is installed through [withExceptionHandler].
 *
 * Every source throws [TestException] from its coroutine body; the flows are combined with
 * themselves and the resulting failure is swallowed by `catch {}`, so the only thing left to
 * verify is what reaches [handler].
 */
class LeakedExceptionTest : TestBase() {

    /**
     * Handler passed to [withExceptionHandler] by every test in this class.
     *
     * Asserts that whatever it is given is an [UndeliverableException] whose cause is a
     * [TestException]; any other throwable fails the assertion.
     */
    private val handler: (Throwable) -> Unit =
        { assertTrue { it is UndeliverableException && it.cause is TestException } }

    @Test
    fun testSingle() = withExceptionHandler(handler) {
        withFixedThreadPool(4) { dispatcher ->
            val flow = rxSingle<Unit>(dispatcher) { throw TestException() }.toFlowable().asFlow()
            collectCombinedIgnoringErrors(flow, STRESS_ITERATIONS)
        }
    }

    @Test
    fun testObservable() = withExceptionHandler(handler) {
        withFixedThreadPool(4) { dispatcher ->
            val flow = rxObservable<Unit>(dispatcher) { throw TestException() }
                .toFlowable(BackpressureStrategy.BUFFER)
                .asFlow()
            collectCombinedIgnoringErrors(flow, STRESS_ITERATIONS)
        }
    }

    @Test
    fun testFlowable() = withExceptionHandler(handler) {
        withFixedThreadPool(4) { dispatcher ->
            val flow = rxFlowable<Unit>(dispatcher) { throw TestException() }.asFlow()
            collectCombinedIgnoringErrors(flow, STRESS_ITERATIONS)
        }
    }

    /**
     * This test doesn't test much and was added to display a problem with straightforward use of
     * [withExceptionHandler].
     *
     * If one was to remove `dispatcher` and launch `rxFlowable` with an empty coroutine context,
     * this test would fail fairly often, while other tests were also vulnerable, but the problem is
     * much more difficult to reproduce. Thus, this test is a justification for adding `dispatcher`
     * to other tests.
     *
     * See the commit that introduced this test for a better explanation.
     */
    @Test
    fun testResettingExceptionHandler() = withExceptionHandler(handler) {
        withFixedThreadPool(4) { dispatcher ->
            val flow = rxFlowable<Unit>(dispatcher) {
                // Randomly delay one of the two subscriptions so that its failure may arrive late.
                if ((0..1).random() == 0) {
                    Thread.sleep(100)
                }
                throw TestException()
            }.asFlow()
            collectCombinedIgnoringErrors(flow)
        }
    }

    /**
     * Blocks the calling thread while collecting `combine(flow, flow)` [times] times in a row.
     *
     * Errors coming from upstream are swallowed by `catch {}` and emitted values are discarded,
     * so this function itself never reports the failure of [flow].
     *
     * @param flow the flow that is combined with itself on every iteration.
     * @param times how many sequential collections to perform; defaults to a single one.
     */
    private fun collectCombinedIgnoringErrors(flow: Flow<Unit>, times: Int = 1) {
        runBlocking {
            repeat(times) {
                combine(flow, flow) { _, _ -> Unit }
                    .catch {}
                    .collect {}
            }
        }
    }

    /**
     * Run in a thread pool, then wait for all the tasks to finish.
     *
     * The pool is shut down and awaited even when [block] throws, so a failing test body does
     * not leave the pool's worker threads behind.
     *
     * @param numberOfThreads size of the fixed thread pool backing the dispatcher.
     * @param block code to run with the dispatcher created from the pool.
     */
    private fun withFixedThreadPool(numberOfThreads: Int, block: (CoroutineDispatcher) -> Unit) {
        val pool = Executors.newFixedThreadPool(numberOfThreads)
        try {
            block(pool.asCoroutineDispatcher())
        } finally {
            pool.shutdown()
            // awaitTermination returns false on timeout; keep waiting until the pool is drained.
            while (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                /* deliberately empty */
            }
        }
    }

    private companion object {
        /** Number of sequential collections performed by the stress tests. */
        private const val STRESS_ITERATIONS = 10_000
    }
}
105