1 /*
2  * Copyright 2019 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package androidx.paging
18 
19 import androidx.kruth.assertThat
20 import java.util.Collections
21 import java.util.concurrent.CountDownLatch
22 import kotlin.test.Test
23 import kotlin.test.assertEquals
24 import kotlin.test.assertFalse
25 import kotlin.test.assertTrue
26 import kotlin.time.Duration.Companion.seconds
27 import kotlinx.coroutines.CancellationException
28 import kotlinx.coroutines.DelicateCoroutinesApi
29 import kotlinx.coroutines.Dispatchers
30 import kotlinx.coroutines.ExperimentalCoroutinesApi
31 import kotlinx.coroutines.GlobalScope
32 import kotlinx.coroutines.delay
33 import kotlinx.coroutines.launch
34 import kotlinx.coroutines.runBlocking
35 import kotlinx.coroutines.test.TestScope
36 import kotlinx.coroutines.test.advanceTimeBy
37 import kotlinx.coroutines.test.advanceUntilIdle
38 import kotlinx.coroutines.test.runCurrent
39 import kotlinx.coroutines.test.runTest
40 import kotlinx.coroutines.withContext
41 import kotlinx.coroutines.withTimeout
42 
/**
 * Tests for [SingleRunner.runInIsolation].
 *
 * The cases below exercise how a newer `runInIsolation` call interacts with a block that is
 * already running: blocks that suspend, blocks that cancel themselves, blocks that ignore
 * cancellation, and blocks submitted with different priorities.
 *
 * Most tests run on a shared [TestScope] so that `delay` calls use virtual time.
 */
@OptIn(ExperimentalCoroutinesApi::class)
class SingleRunnerTest {
    private val testScope = TestScope()

    /**
     * A block suspended in `delay(Long.MAX_VALUE)` must not keep its job alive once a second
     * `runInIsolation` call has returned: the first job is expected to be completed, and not
     * reported as cancelled.
     */
    @Test
    fun cancelsPreviousRun() =
        testScope.runTest {
            val runner = SingleRunner()
            // Unconfined so that the first block is already running before the next call below.
            val job =
                launch(Dispatchers.Unconfined) { runner.runInIsolation { delay(Long.MAX_VALUE) } }

            runner.runInIsolation {
                // Immediately return.
            }

            assertFalse { job.isCancelled }
            assertTrue { job.isCompleted }
        }

    /**
     * A block that throws [CancellationException] itself is expected to leave the enclosing job
     * both cancelled and completed.
     */
    @Test
    fun previousRunCanCancelItself() =
        testScope.runTest {
            val runner = SingleRunner()
            val job =
                launch(Dispatchers.Unconfined) {
                    runner.runInIsolation { throw CancellationException() }
                }
            assertTrue { job.isCancelled }
            assertTrue { job.isCompleted }
        }

    /**
     * The job wrapping `runInIsolation` must stay incomplete while its block is still delaying,
     * and complete once the full 1000ms of virtual time has elapsed.
     */
    @Test
    fun preventsCompletionUntilBlockCompletes() =
        testScope.runTest {
            val runner = SingleRunner()
            val job = testScope.launch { runner.runInIsolation { delay(1000) } }

            // Half way through the delay: the block has not finished yet.
            advanceTimeBy(500)
            runCurrent()
            assertFalse { job.isCompleted }

            // The remaining half of the delay has now elapsed.
            advanceTimeBy(500)
            runCurrent()
            assertTrue { job.isCompleted }
        }

    /**
     * Ten blocks launched in index order, each of which would otherwise delay for
     * `Long.MAX_VALUE`, are expected to have started in that same order.
     */
    @Test
    fun orderedExecution() =
        testScope.runTest {
            val jobStartList = mutableListOf<Int>()

            val runner = SingleRunner()
            for (index in 0..9) {
                launch {
                    runner.runInIsolation {
                        jobStartList.add(index)
                        delay(Long.MAX_VALUE)
                    }
                }
            }
            runCurrent()

            // Cancel previous job.
            runner.runInIsolation {
                // Immediately return.
            }

            assertEquals(List(10) { it }, jobStartList)
        }

    /**
     * Two coroutines each push four blocks through the same runner. Virtual time is advanced by
     * only 1ms, so neither the 100ms nor the 40ms delay is ever allowed to elapse; the recorded
     * output is expected to interleave the two producers deterministically.
     */
    @Test
    fun racingCoroutines() =
        testScope.runTest {
            val runner = SingleRunner()
            val output = mutableListOf<Char>()
            withContext(coroutineContext) {
                launch {
                    ('0' until '4').forEach {
                        runner.runInIsolation {
                            output.add(it)
                            delay(100)
                        }
                    }
                }

                launch {
                    ('a' until 'e').forEach {
                        runner.runInIsolation {
                            output.add(it)
                            delay(40)
                        }
                    }
                }
                // don't let delays finish to ensure they are really cancelled
                advanceTimeBy(1)
            }
            // Despite launching separately, with different delays, we should see these always
            // interleave in the same order, since the delays aren't allowed to run in parallel and
            // each launch will cancel the other one's delay.
            assertThat(output.joinToString("")).isEqualTo("0a1b2c3d")
        }

    /**
     * When the running block never reaches a suspension point (it blocks the thread with
     * `Thread.sleep`), the next block is expected to run only after the first one has finished:
     * the output must be 0..9 from the first block followed by 10..19 from the second.
     *
     * This test uses real threads and real time (about one second) rather than the test scope.
     */
    @OptIn(DelicateCoroutinesApi::class)
    @Test
    fun ensureIsolation_whenCancelationIsIgnoredByThePreviousBlock() {
        // make sure we wait for previous one if it ignores cancellation
        val singleRunner = SingleRunner()
        // Both coroutines run on GlobalScope, so the list may be written from different threads.
        val output = Collections.synchronizedList(mutableListOf<Int>())
        // using a latch instead of a mutex to avoid suspension
        val firstStarted = CountDownLatch(1)
        GlobalScope.launch {
            singleRunner.runInIsolation {
                // this code uses latches and thread sleeps instead of Mutex and delay to mimic
                // a code path which ignores coroutine cancellation
                firstStarted.countDown()
                repeat(10) {
                    @Suppress("BlockingMethodInNonBlockingContext") Thread.sleep(100)
                    output.add(it)
                }
            }
        }

        val job2 =
            GlobalScope.launch {
                // Only submit the second block once the first one is known to be running.
                @Suppress("BlockingMethodInNonBlockingContext") firstStarted.await()
                singleRunner.runInIsolation { repeat(10) { output.add(it + 10) } }
            }
        // Bound the wait so that a runner which never releases fails the test instead of hanging.
        runBlocking { withTimeout(10.seconds) { job2.join() } }
        assertThat(output)
            .isEqualTo(
                // if cancellation is ignored, make sure we wait for it to finish.
                (0 until 20).toList()
            )
    }

    /**
     * Exercises the `priority` argument of `runInIsolation`:
     * - a priority 1 block submitted while a priority 2 block is running must not run;
     * - a priority 3 block submitted afterwards runs, and the rest of the priority 2 block
     *   (the "unexpected" entry) must never be recorded;
     * - once nothing else is running, a priority 1 block runs normally.
     */
    @Test
    fun priority() =
        testScope.runTest {
            val runner = SingleRunner()
            val output = mutableListOf<String>()
            launch {
                runner.runInIsolation(priority = 2) {
                    output.add("a")
                    delay(10)
                    output.add("b")
                    delay(100)
                    output.add("unexpected")
                }
            }
            runCurrent()

            // should not run
            runner.runInIsolation(priority = 1) { output.add("unexpected - 2") }
            // Moves past the first delay(10) so "b" is recorded, but not past the delay(100).
            advanceTimeBy(20)
            runner.runInIsolation(priority = 3) { output.add("c") }
            advanceUntilIdle()
            // now lower priority can run since higher priority is complete
            runner.runInIsolation(priority = 1) { output.add("d") }
            // containsExactly() on its own ignores ordering; inOrder() also pins the sequence,
            // which is what this test is about.
            assertThat(output).containsExactly("a", "b", "c", "d").inOrder()
        }
}
204