• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download

<lambda>null1 package kotlinx.coroutines.rx3
2 
3 import kotlinx.coroutines.testing.*
4 import io.reactivex.rxjava3.core.*
5 import io.reactivex.rxjava3.exceptions.*
6 import kotlinx.coroutines.*
7 import kotlinx.coroutines.flow.*
8 import kotlinx.coroutines.reactive.*
9 import org.junit.Test
10 import java.util.concurrent.Executors
11 import java.util.concurrent.TimeUnit
12 import kotlin.test.*
13 
/**
 * Checks that an exception is not leaked to the global exception handler.
 *
 * Every test installs [handler] through `withExceptionHandler` and runs the Rx builders on a
 * dedicated thread pool (see [withFixedThreadPool]) so that all pool tasks have finished before
 * the test body returns.
 */
class LeakedExceptionTest : TestBase() {

    /**
     * Handler installed for the duration of each test. Anything it receives must be an
     * [UndeliverableException] caused by a [TestException]; otherwise the assertion fails.
     */
    private val handler: (Throwable) -> Unit = { e ->
        assertTrue("Unexpected exception passed to the handler: $e") {
            e is UndeliverableException && e.cause is TestException
        }
    }

    /** Combines a failing `rxSingle`-backed flow with itself [ITERATIONS] times. */
    @Test
    fun testSingle() = withExceptionHandler(handler) {
        withFixedThreadPool(4) { dispatcher ->
            val flow = rxSingle<Unit>(dispatcher) { throw TestException() }.toFlowable().asFlow()
            collectCombinedWithItself(flow, ITERATIONS)
        }
    }

    /** Combines a failing `rxObservable`-backed flow with itself [ITERATIONS] times. */
    @Test
    fun testObservable() = withExceptionHandler(handler) {
        withFixedThreadPool(4) { dispatcher ->
            val flow = rxObservable<Unit>(dispatcher) { throw TestException() }
                .toFlowable(BackpressureStrategy.BUFFER)
                .asFlow()
            collectCombinedWithItself(flow, ITERATIONS)
        }
    }

    /** Combines a failing `rxFlowable`-backed flow with itself [ITERATIONS] times. */
    @Test
    fun testFlowable() = withExceptionHandler(handler) {
        withFixedThreadPool(4) { dispatcher ->
            val flow = rxFlowable<Unit>(dispatcher) { throw TestException() }.asFlow()
            collectCombinedWithItself(flow, ITERATIONS)
        }
    }

    /**
     * This test doesn't test much and was added to display a problem with straightforward use of
     * [withExceptionHandler].
     *
     * If one was to remove `dispatcher` and launch `rxFlowable` with an empty coroutine context,
     * this test would fail fairly often, while other tests were also vulnerable, but the problem is
     * much more difficult to reproduce. Thus, this test is a justification for adding `dispatcher`
     * to other tests.
     *
     * See the commit that introduced this test for a better explanation.
     */
    @Test
    fun testResettingExceptionHandler() = withExceptionHandler(handler) {
        withFixedThreadPool(4) { dispatcher ->
            val flow = rxFlowable<Unit>(dispatcher) {
                // Randomly delay the failure so that the two subscriptions fail at different times.
                if ((0..1).random() == 0) {
                    Thread.sleep(100)
                }
                throw TestException()
            }.asFlow()
            collectCombinedWithItself(flow, times = 1)
        }
    }

    /**
     * Blocks the calling thread while [flow] is combined with itself and collected [times] times.
     *
     * The failure of [flow] is swallowed by an empty `catch` operator, so nothing propagates
     * out of this function from the flow itself.
     */
    private fun collectCombinedWithItself(flow: Flow<Unit>, times: Int): Unit = runBlocking {
        repeat(times) {
            combine(flow, flow) { _, _ -> Unit }
                .catch {}
                .collect {}
        }
    }

    /**
     * Run in a thread pool, then wait for all the tasks to finish.
     *
     * The pool is shut down and awaited even when [block] throws, so that its worker threads
     * are not left behind and no pool task outlives this call.
     *
     * @param numberOfThreads the size of the fixed thread pool.
     * @param block the code to run; receives a dispatcher backed by the pool.
     */
    private fun withFixedThreadPool(numberOfThreads: Int, block: (CoroutineDispatcher) -> Unit) {
        val pool = Executors.newFixedThreadPool(numberOfThreads)
        try {
            block(pool.asCoroutineDispatcher())
        } finally {
            pool.shutdown()
            while (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                /* deliberately empty */
            }
        }
    }

    private companion object {
        /** How many times the stress tests repeat the combine-and-collect cycle. */
        const val ITERATIONS = 10_000
    }
}
105