1 /*
<lambda>null2 * Copyright (C) 2024 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 package com.android.app.tracing.coroutines.flow
18
19 import com.android.app.tracing.coroutines.CoroutineTraceName
20 import com.android.app.tracing.coroutines.traceCoroutine
21 import com.android.app.tracing.coroutines.traceName
22 import com.android.app.tracing.traceBlocking
23 import kotlin.experimental.ExperimentalTypeInference
24 import kotlinx.coroutines.CoroutineScope
25 import kotlinx.coroutines.ExperimentalCoroutinesApi
26 import kotlinx.coroutines.ExperimentalForInheritanceCoroutinesApi
27 import kotlinx.coroutines.flow.Flow
28 import kotlinx.coroutines.flow.FlowCollector
29 import kotlinx.coroutines.flow.MutableSharedFlow
30 import kotlinx.coroutines.flow.MutableStateFlow
31 import kotlinx.coroutines.flow.SharedFlow
32 import kotlinx.coroutines.flow.SharingStarted
33 import kotlinx.coroutines.flow.StateFlow
34 import kotlinx.coroutines.flow.asSharedFlow
35 import kotlinx.coroutines.flow.asStateFlow
36 import kotlinx.coroutines.flow.collect
37 import kotlinx.coroutines.flow.collectLatest
38 import kotlinx.coroutines.flow.filter
39 import kotlinx.coroutines.flow.flow as safeFlow
40 import kotlinx.coroutines.flow.flowOn
41 import kotlinx.coroutines.flow.map
42 import kotlinx.coroutines.flow.mapLatest
43 import kotlinx.coroutines.flow.onEach
44 import kotlinx.coroutines.flow.shareIn
45 import kotlinx.coroutines.flow.stateIn
46 import kotlinx.coroutines.flow.transform
47
48 /** @see kotlinx.coroutines.flow.internal.unsafeFlow */
49 @OptIn(ExperimentalTypeInference::class)
50 @PublishedApi
51 internal inline fun <T> unsafeFlow(
52 @BuilderInference crossinline block: suspend FlowCollector<T>.() -> Unit
53 ): Flow<T> {
54 return object : Flow<T> {
55 override suspend fun collect(collector: FlowCollector<T>) {
56 collector.block()
57 }
58 }
59 }
60
61 /** @see kotlinx.coroutines.flow.unsafeTransform */
62 @PublishedApi
unsafeTransformnull63 internal inline fun <T, R> Flow<T>.unsafeTransform(
64 crossinline transform: suspend FlowCollector<R>.(value: T) -> Unit
65 ): Flow<R> = unsafeFlow { collect { value -> transform(value) } }
66
67 @OptIn(ExperimentalForInheritanceCoroutinesApi::class)
68 private open class TracedSharedFlow<out T>(
69 private val name: String,
70 private val flow: SharedFlow<T>,
71 ) : SharedFlow<T> {
72 override val replayCache: List<T>
<lambda>null73 get() = traceBlocking("replayCache:$name") { flow.replayCache }
74
collectnull75 override suspend fun collect(collector: FlowCollector<T>): Nothing {
76 traceCoroutine("collect:$name") {
77 flow.collect { traceCoroutine("emit:$name") { collector.emit(it) } }
78 }
79 }
80 }
81
82 @OptIn(ExperimentalForInheritanceCoroutinesApi::class)
83 private open class TracedStateFlow<out T>(
84 private val name: String,
85 private val flow: StateFlow<T>,
86 ) : StateFlow<T>, TracedSharedFlow<T>(name, flow) {
87 override val value: T
<lambda>null88 get() = traceBlocking("get:$name") { flow.value }
89 }
90
91 @OptIn(ExperimentalForInheritanceCoroutinesApi::class)
92 private open class TracedMutableSharedFlow<T>(
93 private val name: String,
94 private val flow: MutableSharedFlow<T>,
95 ) : MutableSharedFlow<T>, TracedSharedFlow<T>(name, flow) {
96 override val subscriptionCount: StateFlow<Int>
<lambda>null97 get() = traceBlocking("subscriptionCount:$name") { flow.subscriptionCount }
98
99 @ExperimentalCoroutinesApi
resetReplayCachenull100 override fun resetReplayCache() {
101 traceBlocking("resetReplayCache:$name") { flow.resetReplayCache() }
102 }
103
emitnull104 override suspend fun emit(value: T) {
105 traceCoroutine("emit:$name") { flow.emit(value) }
106 }
107
tryEmitnull108 override fun tryEmit(value: T): Boolean {
109 return traceBlocking("tryEmit:$name") { flow.tryEmit(value) }
110 }
111 }
112
113 @OptIn(ExperimentalForInheritanceCoroutinesApi::class)
114 private class TracedMutableStateFlow<T>(
115 private val name: String,
116 private val flow: MutableStateFlow<T>,
117 ) : MutableStateFlow<T>, TracedMutableSharedFlow<T>(name, flow) {
118 override var value: T
<lambda>null119 get() = traceBlocking("get:$name") { flow.value }
120 set(newValue) {
<lambda>null121 traceBlocking("updateState:$name") { flow.value = newValue }
122 }
123
compareAndSetnull124 override fun compareAndSet(expect: T, update: T): Boolean {
125 return traceBlocking("compareAndSet:$name") { flow.compareAndSet(expect, update) }
126 }
127 }
128
129 /**
130 * Helper for adding trace sections for when a trace is collected.
131 *
132 * For example, the following would `emit(1)` from a trace section named "my-flow" and collect in a
133 * coroutine scope named "my-launch".
134 *
135 * ```
136 * val flow {
137 * // The open trace section here would be:
138 * // "coroutine execution;my-launch", and "collect:my-flow"
139 * emit(1)
140 * }
141 * launchTraced("my-launch") {
142 * .flowName("my-flow")
143 * .collect {
144 * // The open trace sections here would be:
145 * // "coroutine execution;my-launch", "collect:my-flow", and "emit:my-flow"
146 * }
147 * }
148 * ```
149 *
150 * TODO(b/334171711): Rename via @Deprecated("Renamed to .traceAs()", ReplaceWith("traceAs(name)"))
151 */
flowNamenull152 public fun <T> Flow<T>.flowName(name: String): Flow<T> = traceAs(name)
153
154 public fun <T> Flow<T>.traceAs(name: String): Flow<T> {
155 return if (com.android.systemui.Flags.coroutineTracing()) {
156 return when (this) {
157 is SharedFlow -> traceAs(name)
158 else ->
159 unsafeFlow {
160 traceCoroutine("collect:$name") {
161 collect { value -> traceCoroutine("emit:$name") { emit(value) } }
162 }
163 }
164 }
165 } else {
166 this
167 }
168 }
169
traceAsnull170 public fun <T> SharedFlow<T>.traceAs(name: String): SharedFlow<T> {
171 return if (com.android.systemui.Flags.coroutineTracing()) {
172 when (this) {
173 is MutableSharedFlow -> traceAs(name)
174 is StateFlow -> traceAs(name)
175 else -> TracedSharedFlow(name, this)
176 }
177 } else {
178 this
179 }
180 }
181
traceAsnull182 public fun <T> StateFlow<T>.traceAs(name: String): StateFlow<T> {
183 return if (com.android.systemui.Flags.coroutineTracing()) {
184 when (this) {
185 is MutableStateFlow -> traceAs(name)
186 else -> TracedStateFlow(name, this)
187 }
188 } else {
189 this
190 }
191 }
192
traceAsnull193 public fun <T> MutableSharedFlow<T>.traceAs(name: String): MutableSharedFlow<T> {
194 return if (com.android.systemui.Flags.coroutineTracing()) {
195 when (this) {
196 is MutableStateFlow -> traceAs(name)
197 else -> TracedMutableSharedFlow(name, this)
198 }
199 } else {
200 this
201 }
202 }
203
traceAsnull204 public fun <T> MutableStateFlow<T>.traceAs(name: String): MutableStateFlow<T> {
205 return if (com.android.systemui.Flags.coroutineTracing()) {
206 TracedMutableStateFlow(name, this)
207 } else {
208 this
209 }
210 }
211
onEachTracednull212 public fun <T> Flow<T>.onEachTraced(name: String, action: suspend (T) -> Unit): Flow<T> {
213 return onEach { value -> traceCoroutine(name) { action(value) } }
214 }
215
216 /**
217 * NOTE: [Flow.collect] is a member function and takes precedence if this function is imported as
218 * `collect` and the default parameter is used. (In Kotlin, when an extension function has the same
219 * receiver type, name, and applicable arguments as a class member function, the member takes
220 * precedence).
221 *
222 * For example,
223 * ```
224 * import com.android.app.tracing.coroutines.flow.collectTraced as collect
225 * ...
226 * flowOf(1).collect { ... } // this will call `Flow.collect`
227 * flowOf(1).collect(null) { ... } // this will call `collectTraced`
228 * ```
229 *
230 * @see kotlinx.coroutines.flow.collect
231 */
collectTracednull232 public suspend fun <T> Flow<T>.collectTraced(name: String, collector: FlowCollector<T>) {
233 if (com.android.systemui.Flags.coroutineTracing()) {
234 traceAs(name).collect(collector)
235 } else {
236 collect(collector)
237 }
238 }
239
240 /** @see kotlinx.coroutines.flow.collect */
collectTracednull241 public suspend fun <T> Flow<T>.collectTraced(name: String) {
242 if (com.android.systemui.Flags.coroutineTracing()) {
243 traceAs(name).collect()
244 } else {
245 collect()
246 }
247 }
248
249 /** @see kotlinx.coroutines.flow.collect */
collectTracednull250 public suspend fun <T> Flow<T>.collectTraced(collector: FlowCollector<T>) {
251 if (com.android.systemui.Flags.coroutineTracing()) {
252 collectTraced(name = collector.traceName, collector = collector)
253 } else {
254 collect(collector)
255 }
256 }
257
258 @OptIn(ExperimentalTypeInference::class)
259 @ExperimentalCoroutinesApi
mapLatestTracednull260 public fun <T, R> Flow<T>.mapLatestTraced(
261 name: String,
262 @BuilderInference transform: suspend (value: T) -> R,
263 ): Flow<R> {
264 return if (com.android.systemui.Flags.coroutineTracing()) {
265 traceAs("mapLatest:$name").mapLatest { traceCoroutine(name) { transform(it) } }
266 } else {
267 mapLatest(transform)
268 }
269 }
270
271 @OptIn(ExperimentalTypeInference::class)
272 @ExperimentalCoroutinesApi
mapLatestTracednull273 public fun <T, R> Flow<T>.mapLatestTraced(
274 @BuilderInference transform: suspend (value: T) -> R
275 ): Flow<R> {
276 return if (com.android.systemui.Flags.coroutineTracing()) {
277 mapLatestTraced(transform.traceName, transform)
278 } else {
279 mapLatestTraced(transform)
280 }
281 }
282
283 /** @see kotlinx.coroutines.flow.collectLatest */
collectLatestTracednull284 internal suspend fun <T> Flow<T>.collectLatestTraced(
285 name: String,
286 action: suspend (value: T) -> Unit,
287 ) {
288 if (com.android.systemui.Flags.coroutineTracing()) {
289 return traceAs("collectLatest:$name").collectLatest { traceCoroutine(name) { action(it) } }
290 } else {
291 collectLatest(action)
292 }
293 }
294
295 /** @see kotlinx.coroutines.flow.collectLatest */
collectLatestTracednull296 public suspend fun <T> Flow<T>.collectLatestTraced(action: suspend (value: T) -> Unit) {
297 if (com.android.systemui.Flags.coroutineTracing()) {
298 collectLatestTraced(action.traceName, action)
299 } else {
300 collectLatest(action)
301 }
302 }
303
304 /** @see kotlinx.coroutines.flow.transform */
305 @OptIn(ExperimentalTypeInference::class)
transformTracednull306 public inline fun <T, R> Flow<T>.transformTraced(
307 name: String,
308 @BuilderInference crossinline transform: suspend FlowCollector<R>.(value: T) -> Unit,
309 ): Flow<R> {
310 return if (com.android.systemui.Flags.coroutineTracing()) {
311 // Safe flow must be used because collector is exposed to the caller
312 safeFlow {
313 collect { value ->
314 traceCoroutine(name) {
315 return@collect transform(value)
316 }
317 }
318 }
319 } else {
320 transform(transform)
321 }
322 }
323
324 /** @see kotlinx.coroutines.flow.filter */
filterTracednull325 public inline fun <T> Flow<T>.filterTraced(
326 name: String,
327 crossinline predicate: suspend (T) -> Boolean,
328 ): Flow<T> {
329 return if (com.android.systemui.Flags.coroutineTracing()) {
330 unsafeTransform { value ->
331 if (traceCoroutine(name) { predicate(value) }) {
332 emit(value)
333 }
334 }
335 } else {
336 filter(predicate)
337 }
338 }
339
340 /** @see kotlinx.coroutines.flow.map */
mapTracednull341 public inline fun <T, R> Flow<T>.mapTraced(
342 name: String,
343 crossinline transform: suspend (value: T) -> R,
344 ): Flow<R> {
345 return if (com.android.systemui.Flags.coroutineTracing()) {
346 unsafeTransform { value ->
347 val transformedValue = traceCoroutine(name) { transform(value) }
348 emit(transformedValue)
349 }
350 } else {
351 map(transform)
352 }
353 }
354
355 /** @see kotlinx.coroutines.flow.shareIn */
shareInTracednull356 public fun <T> Flow<T>.shareInTraced(
357 name: String,
358 scope: CoroutineScope,
359 started: SharingStarted,
360 replay: Int = 0,
361 ): SharedFlow<T> {
362 // .shareIn calls this.launch(context), where this === scope, and the previous upstream flow's
363 // context is passed to launch (caveat: the upstream context is only passed to the downstream
364 // SharedFlow if certain conditions are met). For instead, if the upstream is a SharedFlow,
365 // the `.flowOn()` operator will have no effect.
366 return maybeFuseTraceName(name).shareIn(scope, started, replay).traceAs(name)
367 }
368
369 /** @see kotlinx.coroutines.flow.stateIn */
stateInTracednull370 public fun <T> Flow<T>.stateInTraced(
371 name: String,
372 scope: CoroutineScope,
373 started: SharingStarted,
374 initialValue: T,
375 ): StateFlow<T> {
376 // .stateIn calls this.launch(context), where this === scope, and the previous upstream flow's
377 // context is passed to launch
378 return maybeFuseTraceName(name).stateIn(scope, started, initialValue).traceAs(name)
379 }
380
381 /** @see kotlinx.coroutines.flow.stateIn */
stateInTracednull382 public suspend fun <T> Flow<T>.stateInTraced(name: String, scope: CoroutineScope): StateFlow<T> {
383 // .stateIn calls this.launch(context), where this === scope, and the previous upstream flow's
384 // context is passed to launch
385 return maybeFuseTraceName(name).stateIn(scope).traceAs(name)
386 }
387
asSharedFlowTracednull388 public fun <T> MutableSharedFlow<T>.asSharedFlowTraced(name: String): SharedFlow<T> {
389 return asSharedFlow().traceAs(name)
390 }
391
asStateFlowTracednull392 public fun <T> MutableStateFlow<T>.asStateFlowTraced(name: String): StateFlow<T> {
393 return asStateFlow().traceAs(name)
394 }
395
maybeFuseTraceNamenull396 private fun <T> Flow<T>.maybeFuseTraceName(name: String): Flow<T> =
397 if (com.android.systemui.Flags.coroutineTracing()) flowOn(CoroutineTraceName(name)) else this
398