/*
 * Copyright 2020 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package androidx.paging

import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.Job
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

/**
 * Ensures that at most one block handed to [runInIsolation] is running at a time, by cancelling
 * whichever call loses the arbitration described below.
 *
 * Arbitration happens while holding a [Mutex]. The mutex is fair, so concurrent callers are
 * arbitrated in arrival order and, with default settings, the most recent call wins by
 * cancelling the earlier ones.
 *
 * Priority rules, comparing the incoming call against the block that is currently running:
 * * running block has the higher priority: the incoming block is not run
 * * running block has the lower priority: the running block is cancelled
 * * both have the same priority:
 *     * `cancelPreviousInEqualPriority == true`: the running block is cancelled
 *     * `cancelPreviousInEqualPriority == false`: the incoming block is not run
 *
 * Note: cancelling a block does NOT cancel the outer scope that called [runInIsolation]; that
 * call simply returns.
 */
internal class SingleRunner(cancelPreviousInEqualPriority: Boolean = true) {
    private val holder = Holder(this, cancelPreviousInEqualPriority)

    /**
     * Runs [block] unless a block that outranks it (see the class documentation) is already
     * running. If another call to this runner cancels [block] while it runs, this function
     * returns normally instead of propagating that cancellation.
     *
     * @param priority rank of this call; a higher value wins over a lower one.
     * @param block the work to execute in isolation.
     */
    suspend fun runInIsolation(priority: Int = DEFAULT_PRIORITY, block: suspend () -> Unit) {
        try {
            coroutineScope {
                // The job of this fresh scope is the handle other callers use to cancel us
                // without touching the caller's own job.
                val scopeJob =
                    checkNotNull(coroutineContext[Job]) {
                        "Internal error. coroutineScope should've created a job."
                    }
                if (!holder.tryEnqueue(priority = priority, job = scopeJob)) {
                    // A block that outranks this one is running; skip the work.
                    return@coroutineScope
                }
                try {
                    block()
                } finally {
                    holder.onFinish(scopeJob)
                }
            }
        } catch (cancellation: CancelIsolatedRunnerException) {
            // Cancelled by a newer call on this very runner: finish silently.
            if (cancellation.runner === this@SingleRunner) {
                return
            }
            // The exception belongs to some other runner; it is not ours to swallow.
            throw cancellation
        }
    }

    /**
     * Marker exception used to cancel the previously running block of [runner].
     *
     * Having a dedicated type lets [runInIsolation] tell "replaced by a newer call" apart from
     * a regular cancellation coming out of the block, so that only the former is swallowed and
     * the caller's coroutine is not cancelled merely to stop the block.
     */
    private class CancelIsolatedRunnerException(val runner: SingleRunner) :
        CancellationException("Cancelled isolated runner")

    /**
     * Tracks the job that currently owns the runner, together with its priority. Every read and
     * write of that state happens while holding [lock].
     */
    private class Holder(
        private val singleRunner: SingleRunner,
        private val cancelPreviousInEqualPriority: Boolean
    ) {
        private val lock = Mutex()
        private var activeJob: Job? = null
        private var activePriority: Int = 0

        /**
         * Decides whether [job] may run at the given [priority].
         *
         * When it may, the job recorded so far (if any) is cancelled and joined before [job] is
         * recorded as the new owner.
         *
         * @return `true` if [job] now owns the runner and should execute its block, `false` if
         *   the recorded job outranks it.
         */
        suspend fun tryEnqueue(priority: Int, job: Job): Boolean =
            lock.withLock {
                val current = activeJob
                val mayTakeOver =
                    when {
                        current == null -> true
                        !current.isActive -> true
                        activePriority < priority -> true
                        else -> activePriority == priority && cancelPreviousInEqualPriority
                    }
                if (!mayTakeOver) {
                    return@withLock false
                }
                if (current != null) {
                    // Wait for the old block to be completely done before recording the new
                    // owner, so that two blocks never overlap.
                    current.cancel(CancelIsolatedRunnerException(singleRunner))
                    current.join()
                }
                activeJob = job
                activePriority = priority
                true
            }

        /** Clears the recorded owner, but only if it is still [job]. */
        suspend fun onFinish(job: Job) {
            lock.withLock {
                if (activeJob === job) activeJob = null
            }
        }
    }

    companion object {
        const val DEFAULT_PRIORITY = 0
    }
}