<lambda>null1 package kotlinx.coroutines.test
2
3 import kotlinx.coroutines.*
4 import kotlinx.coroutines.channels.*
5 import kotlinx.coroutines.flow.*
6 import kotlin.test.*
7
8 class UnconfinedTestDispatcherTest {
9
10 @Test
11 fun reproducer1742() {
12 class ObservableValue<T>(initial: T) {
13 var value: T = initial
14 private set
15
16 private val listeners = mutableListOf<(T) -> Unit>()
17
18 fun set(value: T) {
19 this.value = value
20 listeners.forEach { it(value) }
21 }
22
23 fun addListener(listener: (T) -> Unit) {
24 listeners.add(listener)
25 }
26
27 fun removeListener(listener: (T) -> Unit) {
28 listeners.remove(listener)
29 }
30 }
31
32 fun <T> ObservableValue<T>.observe(): Flow<T> =
33 callbackFlow {
34 val listener = { value: T ->
35 if (!isClosedForSend) {
36 trySend(value)
37 }
38 }
39 addListener(listener)
40 listener(value)
41 awaitClose { removeListener(listener) }
42 }
43
44 val intProvider = ObservableValue(0)
45 val stringProvider = ObservableValue("")
46 var data = Pair(0, "")
47 val scope = CoroutineScope(UnconfinedTestDispatcher())
48 scope.launch {
49 combine(
50 intProvider.observe(),
51 stringProvider.observe()
52 ) { intValue, stringValue -> Pair(intValue, stringValue) }
53 .collect { pair ->
54 data = pair
55 }
56 }
57
58 intProvider.set(1)
59 stringProvider.set("3")
60 intProvider.set(2)
61 intProvider.set(3)
62
63 scope.cancel()
64 assertEquals(Pair(3, "3"), data)
65 }
66
67 @Test
68 fun reproducer2082() = runTest {
69 val subject1 = MutableStateFlow(1)
70 val subject2 = MutableStateFlow("a")
71 val values = mutableListOf<Pair<Int, String>>()
72
73 val job = launch(UnconfinedTestDispatcher(testScheduler)) {
74 combine(subject1, subject2) { intVal, strVal -> intVal to strVal }
75 .collect {
76 delay(10000)
77 values += it
78 }
79 }
80
81 subject1.value = 2
82 delay(10000)
83 subject2.value = "b"
84 delay(10000)
85
86 subject1.value = 3
87 delay(10000)
88 subject2.value = "c"
89 delay(10000)
90 delay(10000)
91 delay(1)
92
93 job.cancel()
94
95 assertEquals(listOf(Pair(1, "a"), Pair(2, "a"), Pair(2, "b"), Pair(3, "b"), Pair(3, "c")), values)
96 }
97
98 @Test
99 fun reproducer2405() = createTestResult {
100 val dispatcher = UnconfinedTestDispatcher()
101 var collectedError = false
102 withContext(dispatcher) {
103 flow { emit(1) }
104 .combine(
105 flow<String> { throw IllegalArgumentException() }
106 ) { int, string -> int.toString() + string }
107 .catch { emit("error") }
108 .collect {
109 assertEquals("error", it)
110 collectedError = true
111 }
112 }
113 assertTrue(collectedError)
114 }
115
116 /** An example from the [UnconfinedTestDispatcher] documentation. */
117 @Test
118 fun testUnconfinedDispatcher() = runTest {
119 val values = mutableListOf<Int>()
120 val stateFlow = MutableStateFlow(0)
121 val job = launch(UnconfinedTestDispatcher(testScheduler)) {
122 stateFlow.collect {
123 values.add(it)
124 }
125 }
126 stateFlow.value = 1
127 stateFlow.value = 2
128 stateFlow.value = 3
129 job.cancel()
130 assertEquals(listOf(0, 1, 2, 3), values)
131 }
132
133 /** Tests that child coroutines are eagerly entered. */
134 @Test
135 fun testEagerlyEnteringChildCoroutines() = runTest(UnconfinedTestDispatcher()) {
136 var entered = false
137 val deferred = CompletableDeferred<Unit>()
138 var completed = false
139 launch {
140 entered = true
141 deferred.await()
142 completed = true
143 }
144 assertTrue(entered) // `entered = true` already executed.
145 assertFalse(completed) // however, the child coroutine then suspended, so it is enqueued.
146 deferred.complete(Unit) // resume the coroutine.
147 assertTrue(completed) // now the child coroutine is immediately completed.
148 }
149
150 /** Tests that the [TestCoroutineScheduler] used for [Dispatchers.Main] gets used by default. */
151 @Test
152 fun testSchedulerReuse() {
153 val dispatcher1 = StandardTestDispatcher()
154 Dispatchers.setMain(dispatcher1)
155 try {
156 val dispatcher2 = UnconfinedTestDispatcher()
157 assertSame(dispatcher1.scheduler, dispatcher2.scheduler)
158 } finally {
159 Dispatchers.resetMain()
160 }
161 }
162
163 }
164