1 /*
<lambda>null2  * Copyright 2021 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package androidx.compose.runtime
18 
19 import androidx.compose.runtime.internal.AtomicInt
20 import androidx.compose.runtime.platform.makeSynchronizedObject
21 import androidx.compose.runtime.platform.synchronized
22 import androidx.compose.runtime.snapshots.fastForEach
23 import kotlin.coroutines.Continuation
24 import kotlin.coroutines.resumeWithException
25 import kotlinx.coroutines.CancellationException
26 import kotlinx.coroutines.suspendCancellableCoroutine
27 
28 /**
29  * A simple frame clock.
30  *
31  * This implementation is intended for low-contention environments involving low total numbers of
32  * threads in a pool on the order of ~number of CPU cores available for UI recomposition work, while
33  * avoiding additional allocation where possible.
34  *
35  * [onNewAwaiters] will be invoked whenever the number of awaiters has changed from 0 to 1. If
36  * [onNewAwaiters] **fails** by throwing an exception it will permanently fail this
37  * [BroadcastFrameClock]; all current and future awaiters will resume with the thrown exception.
38  */
39 class BroadcastFrameClock(private val onNewAwaiters: (() -> Unit)? = null) : MonotonicFrameClock {
40 
41     private class FrameAwaiter<R>(val onFrame: (Long) -> R, val continuation: Continuation<R>) {
42         fun resume(timeNanos: Long) {
43             continuation.resumeWith(runCatching { onFrame(timeNanos) })
44         }
45     }
46 
47     private val lock = makeSynchronizedObject()
48     private var failureCause: Throwable? = null
49     private var awaiters = mutableListOf<FrameAwaiter<*>>()
50     private var spareList = mutableListOf<FrameAwaiter<*>>()
51 
52     // Uses AtomicInt to avoid adding AtomicBoolean to the Expect/Actual requirements of the
53     // runtime.
54     private val hasAwaitersUnlocked = AtomicInt(0)
55 
56     /** `true` if there are any callers of [withFrameNanos] awaiting to run for a pending frame. */
57     val hasAwaiters: Boolean
58         get() = hasAwaitersUnlocked.get() != 0
59 
60     /**
61      * Send a frame for time [timeNanos] to all current callers of [withFrameNanos]. The `onFrame`
62      * callback for each caller is invoked synchronously during the call to [sendFrame].
63      */
64     fun sendFrame(timeNanos: Long) {
65         synchronized(lock) {
66             // Rotate the lists so that if a resumed continuation on an immediate dispatcher
67             // bound to the thread calling sendFrame immediately awaits again we don't disrupt
68             // iteration of resuming the rest.
69             val toResume = awaiters
70             awaiters = spareList
71             spareList = toResume
72             hasAwaitersUnlocked.set(0)
73 
74             for (i in 0 until toResume.size) {
75                 toResume[i].resume(timeNanos)
76             }
77             toResume.clear()
78         }
79     }
80 
81     override suspend fun <R> withFrameNanos(onFrame: (Long) -> R): R =
82         suspendCancellableCoroutine { co ->
83             val awaiter = FrameAwaiter(onFrame, co)
84             val hasNewAwaiters =
85                 synchronized(lock) {
86                     val cause = failureCause
87                     if (cause != null) {
88                         co.resumeWithException(cause)
89                         return@suspendCancellableCoroutine
90                     }
91                     val hadAwaiters = awaiters.isNotEmpty()
92                     awaiters.add(awaiter)
93                     if (!hadAwaiters) hasAwaitersUnlocked.set(1)
94                     !hadAwaiters
95                 }
96 
97             co.invokeOnCancellation {
98                 synchronized(lock) {
99                     awaiters.remove(awaiter)
100                     if (awaiters.isEmpty()) hasAwaitersUnlocked.set(0)
101                 }
102             }
103 
104             // Wake up anything that was waiting for someone to schedule a frame
105             if (hasNewAwaiters && onNewAwaiters != null) {
106                 try {
107                     // BUG: Kotlin 1.4.21 plugin doesn't smart cast for a direct onNewAwaiters()
108                     // here
109                     onNewAwaiters.invoke()
110                 } catch (t: Throwable) {
111                     // If onNewAwaiters fails, we permanently fail the BroadcastFrameClock.
112                     fail(t)
113                 }
114             }
115         }
116 
117     private fun fail(cause: Throwable) {
118         synchronized(lock) {
119             if (failureCause != null) return
120             failureCause = cause
121             awaiters.fastForEach { awaiter -> awaiter.continuation.resumeWithException(cause) }
122             awaiters.clear()
123             hasAwaitersUnlocked.set(0)
124         }
125     }
126 
127     /**
128      * Permanently cancel this [BroadcastFrameClock] and cancel all current and future awaiters with
129      * [cancellationException].
130      */
131     fun cancel(
132         cancellationException: CancellationException = CancellationException("clock cancelled")
133     ) {
134         fail(cancellationException)
135     }
136 }
137