1 /*
2  * Copyright 2020 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package androidx.paging
18 
19 import kotlinx.coroutines.CancellationException
20 import kotlinx.coroutines.Job
21 import kotlinx.coroutines.coroutineScope
22 import kotlinx.coroutines.sync.Mutex
23 import kotlinx.coroutines.sync.withLock
24 
/**
 * Runs at most one block at a time: blocks handed to [runInIsolation] are arbitrated so that a
 * newly submitted block either replaces the one currently running (which is cancelled and awaited
 * first) or is dropped without running.
 *
 * Arbitration happens under a [Mutex], which is fair, so concurrent callers of [runInIsolation]
 * are considered in arrival order.
 *
 * Arbitration rules, where a larger `priority` value means a higher priority:
 * * no block is running (or the recorded one is no longer active): the new block runs
 * * the running block has a lower priority: the running block is cancelled, the new block runs
 * * the running block has a higher priority: the new block is dropped
 * * both have equal priority:
 *     * if cancelPreviousInEqualPriority == true, the running block is cancelled
 *     * if cancelPreviousInEqualPriority == false, the new block is dropped
 *
 * Note: cancelling a block does NOT cancel the outer scope that called [runInIsolation]; that
 * call simply returns.
 */
internal class SingleRunner(cancelPreviousInEqualPriority: Boolean = true) {
    private val holder = Holder(this, cancelPreviousInEqualPriority)

    /**
     * Runs [block] unless it loses arbitration against the block that is currently running.
     *
     * @param priority priority of this run; larger values win over smaller ones.
     * @param block the work to execute inside a fresh child scope.
     */
    suspend fun runInIsolation(priority: Int = DEFAULT_PRIORITY, block: suspend () -> Unit) {
        try {
            coroutineScope {
                // The scope's own job is the handle a later caller uses to cancel this run.
                val scopeJob =
                    checkNotNull(coroutineContext[Job]) {
                        "Internal error. coroutineScope should've created a job."
                    }
                val admitted = holder.tryEnqueue(priority = priority, job = scopeJob)
                if (!admitted) {
                    // Lost against the running block: return without invoking `block`.
                    return@coroutineScope
                }
                try {
                    block()
                } finally {
                    holder.onFinish(scopeJob)
                }
            }
        } catch (cancellation: CancelIsolatedRunnerException) {
            // A cancellation issued by this very runner means a newer run replaced us; swallow it
            // so the caller is not cancelled. One issued by a different runner is propagated.
            val issuedByThisRunner = cancellation.runner === this@SingleRunner
            if (!issuedByThisRunner) {
                throw cancellation
            }
        }
    }

    /**
     * Marker exception used to cancel the previously running block. It carries the [runner] that
     * issued it so [runInIsolation] can tell this internal cancellation apart from any other
     * cancellation and avoid cancelling the caller's coroutine.
     */
    private class CancelIsolatedRunnerException(val runner: SingleRunner) :
        CancellationException("Cancelled isolated runner")

    /** Tracks the job that currently owns the runner; all state is guarded by [mutex]. */
    private class Holder(
        private val singleRunner: SingleRunner,
        private val cancelPreviousInEqualPriority: Boolean
    ) {
        private val mutex = Mutex()
        private var currentJob: Job? = null
        private var currentPriority: Int = 0

        /**
         * Decides whether [job] may run at the given [priority].
         *
         * When the answer is yes, any job still recorded is cancelled and joined before [job] is
         * recorded in its place, all while the lock is held.
         *
         * @return `true` if [job] is now the recorded job and should run, `false` if it should be
         *   dropped.
         */
        suspend fun tryEnqueue(priority: Int, job: Job): Boolean =
            mutex.withLock {
                val running = currentJob
                val mayReplace =
                    when {
                        running == null || !running.isActive -> true
                        currentPriority < priority -> true
                        currentPriority == priority -> cancelPreviousInEqualPriority
                        else -> false
                    }
                if (!mayReplace) {
                    return@withLock false
                }
                if (running != null) {
                    running.cancel(CancelIsolatedRunnerException(singleRunner))
                    // Wait for the old run to complete before handing over.
                    running.join()
                }
                currentJob = job
                currentPriority = priority
                true
            }

        /**
         * Clears the recorded job if it is still [job]. A stale entry left behind is tolerated:
         * [tryEnqueue] treats a recorded job that is no longer active like an absent one.
         */
        suspend fun onFinish(job: Job) {
            mutex.withLock {
                if (currentJob === job) {
                    currentJob = null
                }
            }
        }
    }

    companion object {
        const val DEFAULT_PRIORITY = 0
    }
}
114