• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download

<lambda>null1 package kotlinx.coroutines.test
2 
3 import kotlinx.coroutines.*
4 import kotlinx.coroutines.channels.*
5 import kotlinx.coroutines.flow.*
6 import kotlin.test.*
7 
8 class UnconfinedTestDispatcherTest {
9 
10     @Test
11     fun reproducer1742() {
12         class ObservableValue<T>(initial: T) {
13             var value: T = initial
14                 private set
15 
16             private val listeners = mutableListOf<(T) -> Unit>()
17 
18             fun set(value: T) {
19                 this.value = value
20                 listeners.forEach { it(value) }
21             }
22 
23             fun addListener(listener: (T) -> Unit) {
24                 listeners.add(listener)
25             }
26 
27             fun removeListener(listener: (T) -> Unit) {
28                 listeners.remove(listener)
29             }
30         }
31 
32         fun <T> ObservableValue<T>.observe(): Flow<T> =
33             callbackFlow {
34                 val listener = { value: T ->
35                     if (!isClosedForSend) {
36                         trySend(value)
37                     }
38                 }
39                 addListener(listener)
40                 listener(value)
41                 awaitClose { removeListener(listener) }
42             }
43 
44         val intProvider = ObservableValue(0)
45         val stringProvider = ObservableValue("")
46         var data = Pair(0, "")
47         val scope = CoroutineScope(UnconfinedTestDispatcher())
48         scope.launch {
49             combine(
50                 intProvider.observe(),
51                 stringProvider.observe()
52             ) { intValue, stringValue -> Pair(intValue, stringValue) }
53                 .collect { pair ->
54                     data = pair
55                 }
56         }
57 
58         intProvider.set(1)
59         stringProvider.set("3")
60         intProvider.set(2)
61         intProvider.set(3)
62 
63         scope.cancel()
64         assertEquals(Pair(3, "3"), data)
65     }
66 
67     @Test
68     fun reproducer2082() = runTest {
69         val subject1 = MutableStateFlow(1)
70         val subject2 = MutableStateFlow("a")
71         val values = mutableListOf<Pair<Int, String>>()
72 
73         val job = launch(UnconfinedTestDispatcher(testScheduler)) {
74             combine(subject1, subject2) { intVal, strVal -> intVal to strVal }
75                 .collect {
76                     delay(10000)
77                     values += it
78                 }
79         }
80 
81         subject1.value = 2
82         delay(10000)
83         subject2.value = "b"
84         delay(10000)
85 
86         subject1.value = 3
87         delay(10000)
88         subject2.value = "c"
89         delay(10000)
90         delay(10000)
91         delay(1)
92 
93         job.cancel()
94 
95         assertEquals(listOf(Pair(1, "a"), Pair(2, "a"), Pair(2, "b"), Pair(3, "b"), Pair(3, "c")), values)
96     }
97 
98     @Test
99     fun reproducer2405() = createTestResult {
100         val dispatcher = UnconfinedTestDispatcher()
101         var collectedError = false
102         withContext(dispatcher) {
103             flow { emit(1) }
104                 .combine(
105                     flow<String> { throw IllegalArgumentException() }
106                 ) { int, string -> int.toString() + string }
107                 .catch { emit("error") }
108                 .collect {
109                     assertEquals("error", it)
110                     collectedError = true
111                 }
112         }
113         assertTrue(collectedError)
114     }
115 
116     /** An example from the [UnconfinedTestDispatcher] documentation. */
117     @Test
118     fun testUnconfinedDispatcher() = runTest {
119         val values = mutableListOf<Int>()
120         val stateFlow = MutableStateFlow(0)
121         val job = launch(UnconfinedTestDispatcher(testScheduler)) {
122             stateFlow.collect {
123                 values.add(it)
124             }
125         }
126         stateFlow.value = 1
127         stateFlow.value = 2
128         stateFlow.value = 3
129         job.cancel()
130         assertEquals(listOf(0, 1, 2, 3), values)
131     }
132 
133     /** Tests that child coroutines are eagerly entered. */
134     @Test
135     fun testEagerlyEnteringChildCoroutines() = runTest(UnconfinedTestDispatcher()) {
136         var entered = false
137         val deferred = CompletableDeferred<Unit>()
138         var completed = false
139         launch {
140             entered = true
141             deferred.await()
142             completed = true
143         }
144         assertTrue(entered) // `entered = true` already executed.
145         assertFalse(completed) // however, the child coroutine then suspended, so it is enqueued.
146         deferred.complete(Unit) // resume the coroutine.
147         assertTrue(completed) // now the child coroutine is immediately completed.
148     }
149 
150     /** Tests that the [TestCoroutineScheduler] used for [Dispatchers.Main] gets used by default. */
151     @Test
152     fun testSchedulerReuse() {
153         val dispatcher1 = StandardTestDispatcher()
154         Dispatchers.setMain(dispatcher1)
155         try {
156             val dispatcher2 = UnconfinedTestDispatcher()
157             assertSame(dispatcher1.scheduler, dispatcher2.scheduler)
158         } finally {
159             Dispatchers.resetMain()
160         }
161     }
162 
163 }
164