1 /* 2 * Copyright 2019 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package androidx.paging 18 19 import androidx.kruth.assertThat 20 import java.util.Collections 21 import java.util.concurrent.CountDownLatch 22 import kotlin.test.Test 23 import kotlin.test.assertEquals 24 import kotlin.test.assertFalse 25 import kotlin.test.assertTrue 26 import kotlin.time.Duration.Companion.seconds 27 import kotlinx.coroutines.CancellationException 28 import kotlinx.coroutines.DelicateCoroutinesApi 29 import kotlinx.coroutines.Dispatchers 30 import kotlinx.coroutines.ExperimentalCoroutinesApi 31 import kotlinx.coroutines.GlobalScope 32 import kotlinx.coroutines.delay 33 import kotlinx.coroutines.launch 34 import kotlinx.coroutines.runBlocking 35 import kotlinx.coroutines.test.TestScope 36 import kotlinx.coroutines.test.advanceTimeBy 37 import kotlinx.coroutines.test.advanceUntilIdle 38 import kotlinx.coroutines.test.runCurrent 39 import kotlinx.coroutines.test.runTest 40 import kotlinx.coroutines.withContext 41 import kotlinx.coroutines.withTimeout 42 43 @OptIn(ExperimentalCoroutinesApi::class) 44 class SingleRunnerTest { 45 private val testScope = TestScope() 46 47 @Test cancelsPreviousRunnull48 fun cancelsPreviousRun() = 49 testScope.runTest { 50 val runner = SingleRunner() 51 val job = 52 launch(Dispatchers.Unconfined) { runner.runInIsolation { delay(Long.MAX_VALUE) } } 53 54 runner.runInIsolation { 55 // Immediately return. 56 } 57 58 assertFalse { job.isCancelled } 59 assertTrue { job.isCompleted } 60 } 61 62 @Test previousRunCanCancelItselfnull63 fun previousRunCanCancelItself() = 64 testScope.runTest { 65 val runner = SingleRunner() 66 val job = 67 launch(Dispatchers.Unconfined) { 68 runner.runInIsolation { throw CancellationException() } 69 } 70 assertTrue { job.isCancelled } 71 assertTrue { job.isCompleted } 72 } 73 74 @Test preventsCompletionUntilBlockCompletesnull75 fun preventsCompletionUntilBlockCompletes() = 76 testScope.runTest { 77 val runner = SingleRunner() 78 val job = testScope.launch { runner.runInIsolation { delay(1000) } } 79 80 advanceTimeBy(500) 81 runCurrent() 82 assertFalse { job.isCompleted } 83 84 advanceTimeBy(500) 85 runCurrent() 86 assertTrue { job.isCompleted } 87 } 88 89 @Test orderedExecutionnull90 fun orderedExecution() = 91 testScope.runTest { 92 val jobStartList = mutableListOf<Int>() 93 94 val runner = SingleRunner() 95 for (index in 0..9) { 96 launch { 97 runner.runInIsolation { 98 jobStartList.add(index) 99 delay(Long.MAX_VALUE) 100 } 101 } 102 } 103 runCurrent() 104 105 // Cancel previous job. 106 runner.runInIsolation { 107 // Immediately return. 108 } 109 110 assertEquals(List(10) { it }, jobStartList) 111 } 112 113 @Test racingCoroutinesnull114 fun racingCoroutines() = 115 testScope.runTest { 116 val runner = SingleRunner() 117 val output = mutableListOf<Char>() 118 withContext(coroutineContext) { 119 launch { 120 ('0' until '4').forEach { 121 runner.runInIsolation { 122 output.add(it) 123 delay(100) 124 } 125 } 126 } 127 128 launch { 129 ('a' until 'e').forEach { 130 runner.runInIsolation { 131 output.add(it) 132 delay(40) 133 } 134 } 135 } 136 // don't let delays finish to ensure they are really cancelled 137 advanceTimeBy(1) 138 } 139 // Despite launching separately, with different delays, we should see these always 140 // interleave in the same order, since the delays aren't allowed to run in parallel and 141 // each launch will cancel the other one's delay. 142 assertThat(output.joinToString("")).isEqualTo("0a1b2c3d") 143 } 144 145 @OptIn(DelicateCoroutinesApi::class) 146 @Test ensureIsolation_whenCancelationIsIgnoredByThePreviousBlocknull147 fun ensureIsolation_whenCancelationIsIgnoredByThePreviousBlock() { 148 // make sure we wait for previous one if it ignores cancellation 149 val singleRunner = SingleRunner() 150 val output = Collections.synchronizedList(mutableListOf<Int>()) 151 // using a latch instead of a mutex to avoid suspension 152 val firstStarted = CountDownLatch(1) 153 GlobalScope.launch { 154 singleRunner.runInIsolation { 155 // this code uses latches and thread sleeps instead of Mutex and delay to mimic 156 // a code path which ignores coroutine cancellation 157 firstStarted.countDown() 158 repeat(10) { 159 @Suppress("BlockingMethodInNonBlockingContext") Thread.sleep(100) 160 output.add(it) 161 } 162 } 163 } 164 165 val job2 = 166 GlobalScope.launch { 167 @Suppress("BlockingMethodInNonBlockingContext") firstStarted.await() 168 singleRunner.runInIsolation { repeat(10) { output.add(it + 10) } } 169 } 170 runBlocking { withTimeout(10.seconds) { job2.join() } } 171 assertThat(output) 172 .isEqualTo( 173 // if cancellation is ignored, make sure we wait for it to finish. 174 (0 until 20).toList() 175 ) 176 } 177 178 @Test prioritynull179 fun priority() = 180 testScope.runTest { 181 val runner = SingleRunner() 182 val output = mutableListOf<String>() 183 launch { 184 runner.runInIsolation(priority = 2) { 185 output.add("a") 186 delay(10) 187 output.add("b") 188 delay(100) 189 output.add("unexpected") 190 } 191 } 192 runCurrent() 193 194 // should not run 195 runner.runInIsolation(priority = 1) { output.add("unexpected - 2") } 196 advanceTimeBy(20) 197 runner.runInIsolation(priority = 3) { output.add("c") } 198 advanceUntilIdle() 199 // now lower priority can run since higher priority is complete 200 runner.runInIsolation(priority = 1) { output.add("d") } 201 assertThat(output).containsExactly("a", "b", "c", "d") 202 } 203 } 204