• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
<lambda>null2  * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines.rx2
6 
7 import io.reactivex.*
8 import io.reactivex.exceptions.*
9 import kotlinx.coroutines.*
10 import kotlinx.coroutines.flow.*
11 import kotlinx.coroutines.reactive.*
12 import org.junit.Test
13 import java.util.concurrent.Executors
14 import java.util.concurrent.TimeUnit
15 import kotlin.test.*
16 
17 // Check that exception is not leaked to the global exception handler
18 class LeakedExceptionTest : TestBase() {
19 
20     private val handler: (Throwable) -> Unit =
21         { assertTrue { it is UndeliverableException && it.cause is TestException } }
22 
23     @Test
24     fun testSingle() = withExceptionHandler(handler) {
25         withFixedThreadPool(4) { dispatcher ->
26             val flow = rxSingle<Unit>(dispatcher) { throw TestException() }.toFlowable().asFlow()
27             runBlocking {
28                 repeat(10000) {
29                     combine(flow, flow) { _, _ -> Unit }
30                         .catch {}
31                         .collect {}
32                 }
33             }
34         }
35     }
36 
37     @Test
38     fun testObservable() = withExceptionHandler(handler) {
39         withFixedThreadPool(4) { dispatcher ->
40             val flow = rxObservable<Unit>(dispatcher) { throw TestException() }
41                 .toFlowable(BackpressureStrategy.BUFFER)
42                 .asFlow()
43             runBlocking {
44                 repeat(10000) {
45                     combine(flow, flow) { _, _ -> Unit }
46                         .catch {}
47                         .collect {}
48                 }
49             }
50         }
51     }
52 
53     @Test
54     fun testFlowable() = withExceptionHandler(handler) {
55         withFixedThreadPool(4) { dispatcher ->
56             val flow = rxFlowable<Unit>(dispatcher) { throw TestException() }.asFlow()
57             runBlocking {
58                 repeat(10000) {
59                     combine(flow, flow) { _, _ -> Unit }
60                         .catch {}
61                         .collect {}
62                 }
63             }
64         }
65     }
66 
67     /**
68      * This test doesn't test much and was added to display a problem with straighforward use of
69      * [withExceptionHandler].
70      *
71      * If one was to remove `dispatcher` and launch `rxFlowable` with an empty coroutine context,
72      * this test would fail fairly often, while other tests were also vulnerable, but the problem is
73      * much more difficult to reproduce. Thus, this test is a justification for adding `dispatcher`
74      * to other tests.
75      *
76      * See the commit that introduced this test for a better explanation.
77      */
78     @Test
79     fun testResettingExceptionHandler() = withExceptionHandler(handler) {
80         withFixedThreadPool(4) { dispatcher ->
81             val flow = rxFlowable<Unit>(dispatcher) {
82                 if ((0..1).random() == 0) {
83                     Thread.sleep(100)
84                 }
85                 throw TestException()
86             }.asFlow()
87             runBlocking {
88                 combine(flow, flow) { _, _ -> Unit }
89                     .catch {}
90                     .collect {}
91             }
92         }
93     }
94 
95     /**
96      * Run in a thread pool, then wait for all the tasks to finish.
97      */
98     private fun withFixedThreadPool(numberOfThreads: Int, block: (CoroutineDispatcher) -> Unit) {
99         val pool = Executors.newFixedThreadPool(numberOfThreads)
100         val dispatcher = pool.asCoroutineDispatcher()
101         block(dispatcher)
102         pool.shutdown()
103         while (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
104             /* deliberately empty */
105         }
106     }
107 }
108