• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
<lambda>null2  * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines.test
6 
7 import kotlinx.coroutines.*
8 import kotlinx.coroutines.channels.*
9 import kotlinx.coroutines.flow.*
10 import kotlin.test.*
11 
12 class UnconfinedTestDispatcherTest {
13 
14     @Test
15     fun reproducer1742() {
16         class ObservableValue<T>(initial: T) {
17             var value: T = initial
18                 private set
19 
20             private val listeners = mutableListOf<(T) -> Unit>()
21 
22             fun set(value: T) {
23                 this.value = value
24                 listeners.forEach { it(value) }
25             }
26 
27             fun addListener(listener: (T) -> Unit) {
28                 listeners.add(listener)
29             }
30 
31             fun removeListener(listener: (T) -> Unit) {
32                 listeners.remove(listener)
33             }
34         }
35 
36         fun <T> ObservableValue<T>.observe(): Flow<T> =
37             callbackFlow {
38                 val listener = { value: T ->
39                     if (!isClosedForSend) {
40                         trySend(value)
41                     }
42                 }
43                 addListener(listener)
44                 listener(value)
45                 awaitClose { removeListener(listener) }
46             }
47 
48         val intProvider = ObservableValue(0)
49         val stringProvider = ObservableValue("")
50         var data = Pair(0, "")
51         val scope = CoroutineScope(UnconfinedTestDispatcher())
52         scope.launch {
53             combine(
54                 intProvider.observe(),
55                 stringProvider.observe()
56             ) { intValue, stringValue -> Pair(intValue, stringValue) }
57                 .collect { pair ->
58                     data = pair
59                 }
60         }
61 
62         intProvider.set(1)
63         stringProvider.set("3")
64         intProvider.set(2)
65         intProvider.set(3)
66 
67         scope.cancel()
68         assertEquals(Pair(3, "3"), data)
69     }
70 
71     @Test
72     fun reproducer2082() = runTest {
73         val subject1 = MutableStateFlow(1)
74         val subject2 = MutableStateFlow("a")
75         val values = mutableListOf<Pair<Int, String>>()
76 
77         val job = launch(UnconfinedTestDispatcher(testScheduler)) {
78             combine(subject1, subject2) { intVal, strVal -> intVal to strVal }
79                 .collect {
80                     delay(10000)
81                     values += it
82                 }
83         }
84 
85         subject1.value = 2
86         delay(10000)
87         subject2.value = "b"
88         delay(10000)
89 
90         subject1.value = 3
91         delay(10000)
92         subject2.value = "c"
93         delay(10000)
94         delay(10000)
95         delay(1)
96 
97         job.cancel()
98 
99         assertEquals(listOf(Pair(1, "a"), Pair(2, "a"), Pair(2, "b"), Pair(3, "b"), Pair(3, "c")), values)
100     }
101 
102     @Test
103     fun reproducer2405() = createTestResult {
104         val dispatcher = UnconfinedTestDispatcher()
105         var collectedError = false
106         withContext(dispatcher) {
107             flow { emit(1) }
108                 .combine(
109                     flow<String> { throw IllegalArgumentException() }
110                 ) { int, string -> int.toString() + string }
111                 .catch { emit("error") }
112                 .collect {
113                     assertEquals("error", it)
114                     collectedError = true
115                 }
116         }
117         assertTrue(collectedError)
118     }
119 
120     /** An example from the [UnconfinedTestDispatcher] documentation. */
121     @Test
122     fun testUnconfinedDispatcher() = runTest {
123         val values = mutableListOf<Int>()
124         val stateFlow = MutableStateFlow(0)
125         val job = launch(UnconfinedTestDispatcher(testScheduler)) {
126             stateFlow.collect {
127                 values.add(it)
128             }
129         }
130         stateFlow.value = 1
131         stateFlow.value = 2
132         stateFlow.value = 3
133         job.cancel()
134         assertEquals(listOf(0, 1, 2, 3), values)
135     }
136 
137     /** Tests that child coroutines are eagerly entered. */
138     @Test
139     fun testEagerlyEnteringChildCoroutines() = runTest(UnconfinedTestDispatcher()) {
140         var entered = false
141         val deferred = CompletableDeferred<Unit>()
142         var completed = false
143         launch {
144             entered = true
145             deferred.await()
146             completed = true
147         }
148         assertTrue(entered) // `entered = true` already executed.
149         assertFalse(completed) // however, the child coroutine then suspended, so it is enqueued.
150         deferred.complete(Unit) // resume the coroutine.
151         assertTrue(completed) // now the child coroutine is immediately completed.
152     }
153 
154     /** Tests that the [TestCoroutineScheduler] used for [Dispatchers.Main] gets used by default. */
155     @Test
156     fun testSchedulerReuse() {
157         val dispatcher1 = StandardTestDispatcher()
158         Dispatchers.setMain(dispatcher1)
159         try {
160             val dispatcher2 = UnconfinedTestDispatcher()
161             assertSame(dispatcher1.scheduler, dispatcher2.scheduler)
162         } finally {
163             Dispatchers.resetMain()
164         }
165     }
166 
167 }
168