• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
<lambda>null2  * Copyright (C) 2024 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package com.android.app.tracing.coroutines.flow
18 
19 import com.android.app.tracing.coroutines.CoroutineTraceName
20 import com.android.app.tracing.coroutines.traceCoroutine
21 import com.android.app.tracing.coroutines.traceName
22 import com.android.app.tracing.traceBlocking
23 import kotlin.experimental.ExperimentalTypeInference
24 import kotlinx.coroutines.CoroutineScope
25 import kotlinx.coroutines.ExperimentalCoroutinesApi
26 import kotlinx.coroutines.ExperimentalForInheritanceCoroutinesApi
27 import kotlinx.coroutines.flow.Flow
28 import kotlinx.coroutines.flow.FlowCollector
29 import kotlinx.coroutines.flow.MutableSharedFlow
30 import kotlinx.coroutines.flow.MutableStateFlow
31 import kotlinx.coroutines.flow.SharedFlow
32 import kotlinx.coroutines.flow.SharingStarted
33 import kotlinx.coroutines.flow.StateFlow
34 import kotlinx.coroutines.flow.asSharedFlow
35 import kotlinx.coroutines.flow.asStateFlow
36 import kotlinx.coroutines.flow.collect
37 import kotlinx.coroutines.flow.collectLatest
38 import kotlinx.coroutines.flow.filter
39 import kotlinx.coroutines.flow.flow as safeFlow
40 import kotlinx.coroutines.flow.flowOn
41 import kotlinx.coroutines.flow.map
42 import kotlinx.coroutines.flow.mapLatest
43 import kotlinx.coroutines.flow.onEach
44 import kotlinx.coroutines.flow.shareIn
45 import kotlinx.coroutines.flow.stateIn
46 import kotlinx.coroutines.flow.transform
47 
/**
 * Creates a [Flow] whose `collect` does nothing more than run [block] with the given collector as
 * its receiver. No additional checks are layered on top of [block] here.
 *
 * @param block body executed on every collection, with the downstream [FlowCollector] as receiver
 * @see kotlinx.coroutines.flow.internal.unsafeFlow
 */
@OptIn(ExperimentalTypeInference::class)
@PublishedApi
internal inline fun <T> unsafeFlow(
    @BuilderInference crossinline block: suspend FlowCollector<T>.() -> Unit
): Flow<T> =
    object : Flow<T> {
        override suspend fun collect(collector: FlowCollector<T>) {
            collector.block()
        }
    }
60 
/**
 * Returns a flow that collects the receiver and hands every upstream value to [transform], which
 * runs with the downstream [FlowCollector] as its receiver. Built on [unsafeFlow].
 *
 * @see kotlinx.coroutines.flow.unsafeTransform
 */
@PublishedApi
internal inline fun <T, R> Flow<T>.unsafeTransform(
    crossinline transform: suspend FlowCollector<R>.(value: T) -> Unit
): Flow<R> {
    return unsafeFlow { this@unsafeTransform.collect { item -> transform(item) } }
}
66 
/**
 * [SharedFlow] decorator that forwards every operation to [flow] while wrapping it in a trace
 * section whose label is derived from [name].
 */
@OptIn(ExperimentalForInheritanceCoroutinesApi::class)
private open class TracedSharedFlow<out T>(
    private val name: String,
    private val flow: SharedFlow<T>,
) : SharedFlow<T> {
    /** Reads the delegate's replay cache inside a "replayCache:<name>" section. */
    override val replayCache: List<T>
        get() {
            return traceBlocking("replayCache:$name") { flow.replayCache }
        }

    /**
     * Collects the delegate inside a "collect:<name>" section; each value is forwarded to
     * [collector] inside a nested "emit:<name>" section.
     */
    override suspend fun collect(collector: FlowCollector<T>): Nothing =
        traceCoroutine("collect:$name") {
            flow.collect { value -> traceCoroutine("emit:$name") { collector.emit(value) } }
        }
}
81 
/**
 * [StateFlow] decorator: inherits the traced collection behavior of [TracedSharedFlow] and
 * additionally traces reads of [value].
 */
@OptIn(ExperimentalForInheritanceCoroutinesApi::class)
private open class TracedStateFlow<out T>(
    private val name: String,
    private val flow: StateFlow<T>,
) : TracedSharedFlow<T>(name, flow), StateFlow<T> {
    /** Reads the delegate's current value inside a "get:<name>" section. */
    override val value: T
        get() {
            return traceBlocking("get:$name") { flow.value }
        }
}
90 
/**
 * [MutableSharedFlow] decorator: every mutating or querying call is forwarded to [flow] from
 * within a trace section labeled "<operation>:<name>".
 */
@OptIn(ExperimentalForInheritanceCoroutinesApi::class)
private open class TracedMutableSharedFlow<T>(
    private val name: String,
    private val flow: MutableSharedFlow<T>,
) : TracedSharedFlow<T>(name, flow), MutableSharedFlow<T> {
    /** Fetches the delegate's subscription count flow inside a "subscriptionCount:<name>" section. */
    override val subscriptionCount: StateFlow<Int>
        get() {
            return traceBlocking("subscriptionCount:$name") { flow.subscriptionCount }
        }

    /** Resets the delegate's replay cache inside a "resetReplayCache:<name>" section. */
    @ExperimentalCoroutinesApi
    override fun resetReplayCache(): Unit =
        traceBlocking("resetReplayCache:$name") { flow.resetReplayCache() }

    /** Emits [value] to the delegate inside an "emit:<name>" section. */
    override suspend fun emit(value: T): Unit = traceCoroutine("emit:$name") { flow.emit(value) }

    /** Calls the delegate's `tryEmit` inside a "tryEmit:<name>" section and returns its result. */
    override fun tryEmit(value: T): Boolean =
        traceBlocking("tryEmit:$name") { flow.tryEmit(value) }
}
112 
/**
 * [MutableStateFlow] decorator: adds traced reads and writes of [value] and a traced
 * [compareAndSet] on top of [TracedMutableSharedFlow].
 */
@OptIn(ExperimentalForInheritanceCoroutinesApi::class)
private class TracedMutableStateFlow<T>(
    private val name: String,
    private val flow: MutableStateFlow<T>,
) : TracedMutableSharedFlow<T>(name, flow), MutableStateFlow<T> {
    /** Reads go through a "get:<name>" section; writes through an "updateState:<name>" section. */
    override var value: T
        get() {
            return traceBlocking("get:$name") { flow.value }
        }
        set(updated) {
            traceBlocking("updateState:$name") { flow.value = updated }
        }

    /** Calls the delegate's `compareAndSet` inside a "compareAndSet:<name>" section. */
    override fun compareAndSet(expect: T, update: T): Boolean =
        traceBlocking("compareAndSet:$name") { flow.compareAndSet(expect, update) }
}
128 
/**
 * Helper for adding trace sections around the collection of a flow. Currently an alias for
 * [traceAs].
 *
 * For example, the following would `emit(1)` from a trace section named "my-flow" and collect in a
 * coroutine scope named "my-launch".
 *
 * ```
 *   val myFlow = flow {
 *     // The open trace sections here would be:
 *     // "coroutine execution;my-launch", and "collect:my-flow"
 *     emit(1)
 *   }
 *   launchTraced("my-launch") {
 *     myFlow
 *       .flowName("my-flow")
 *       .collect {
 *         // The open trace sections here would be:
 *         // "coroutine execution;my-launch", "collect:my-flow", and "emit:my-flow"
 *       }
 *   }
 * ```
 *
 * TODO(b/334171711): Rename via @Deprecated("Renamed to .traceAs()", ReplaceWith("traceAs(name)"))
 */
public fun <T> Flow<T>.flowName(name: String): Flow<T> {
    return traceAs(name)
}
153 
/**
 * Wraps this flow so that collecting it opens a "collect:<name>" trace section and every value is
 * delivered downstream inside a nested "emit:<name>" section.
 *
 * If the receiver is a [SharedFlow], the more specific `SharedFlow.traceAs` overload is used so
 * the shared-flow type is wrapped rather than replaced by a plain flow. When the coroutine-tracing
 * flag is disabled, the receiver is returned unchanged.
 *
 * @param name label used in the trace section names
 * @return the traced flow, or `this` when tracing is disabled
 */
public fun <T> Flow<T>.traceAs(name: String): Flow<T> {
    return if (com.android.systemui.Flags.coroutineTracing()) {
        when (this) {
            is SharedFlow -> traceAs(name)
            else ->
                unsafeFlow {
                    traceCoroutine("collect:$name") {
                        collect { value -> traceCoroutine("emit:$name") { emit(value) } }
                    }
                }
        }
    } else {
        this
    }
}
169 
/**
 * Traced view of this [SharedFlow]. Dispatches to the most specific wrapper available: mutable
 * shared flows and state flows are handed to their own `traceAs` overloads, anything else is
 * wrapped in a [TracedSharedFlow]. Returns `this` when the coroutine-tracing flag is disabled.
 */
public fun <T> SharedFlow<T>.traceAs(name: String): SharedFlow<T> {
    if (!com.android.systemui.Flags.coroutineTracing()) return this
    return when (this) {
        is MutableSharedFlow -> traceAs(name)
        is StateFlow -> traceAs(name)
        else -> TracedSharedFlow(name, this)
    }
}
181 
/**
 * Traced view of this [StateFlow]. A [MutableStateFlow] receiver is handed to its own `traceAs`
 * overload; any other state flow is wrapped in a [TracedStateFlow]. Returns `this` when the
 * coroutine-tracing flag is disabled.
 */
public fun <T> StateFlow<T>.traceAs(name: String): StateFlow<T> {
    if (!com.android.systemui.Flags.coroutineTracing()) return this
    return when (this) {
        is MutableStateFlow -> traceAs(name)
        else -> TracedStateFlow(name, this)
    }
}
192 
/**
 * Traced view of this [MutableSharedFlow]. A [MutableStateFlow] receiver is handed to its own
 * `traceAs` overload; any other mutable shared flow is wrapped in a [TracedMutableSharedFlow].
 * Returns `this` when the coroutine-tracing flag is disabled.
 */
public fun <T> MutableSharedFlow<T>.traceAs(name: String): MutableSharedFlow<T> {
    if (!com.android.systemui.Flags.coroutineTracing()) return this
    return when (this) {
        is MutableStateFlow -> traceAs(name)
        else -> TracedMutableSharedFlow(name, this)
    }
}
203 
/**
 * Traced view of this [MutableStateFlow]: wraps it in a [TracedMutableStateFlow] when the
 * coroutine-tracing flag is enabled, otherwise returns `this`.
 */
public fun <T> MutableStateFlow<T>.traceAs(name: String): MutableStateFlow<T> =
    if (com.android.systemui.Flags.coroutineTracing()) TracedMutableStateFlow(name, this) else this
211 
/**
 * Like `onEach`, but every invocation of [action] runs inside a `traceCoroutine` section labeled
 * [name].
 */
public fun <T> Flow<T>.onEachTraced(name: String, action: suspend (T) -> Unit): Flow<T> =
    onEach { item -> traceCoroutine(name) { action(item) } }
215 
/**
 * Collects this flow into [collector], tracing the collection under [name] when the
 * coroutine-tracing flag is enabled.
 *
 * NOTE: [Flow.collect] is a member function, so it wins over any `collectTraced` overload that is
 * imported under the alias `collect` and called with the same arguments. (In Kotlin, when an
 * extension function has the same receiver type, name, and applicable arguments as a class member
 * function, the member takes precedence.) Passing an explicit name selects this overload.
 *
 * For example,
 * ```
 * import com.android.app.tracing.coroutines.flow.collectTraced as collect
 * ...
 * flowOf(1).collect { ... } // this will call `Flow.collect`
 * flowOf(1).collect("my-name") { ... } // this will call `collectTraced`
 * ```
 *
 * @see kotlinx.coroutines.flow.collect
 */
public suspend fun <T> Flow<T>.collectTraced(name: String, collector: FlowCollector<T>) {
    val source = if (com.android.systemui.Flags.coroutineTracing()) traceAs(name) else this
    source.collect(collector)
}
239 
/**
 * Collects this flow without a collector action, tracing the collection under [name] when the
 * coroutine-tracing flag is enabled.
 *
 * @see kotlinx.coroutines.flow.collect
 */
public suspend fun <T> Flow<T>.collectTraced(name: String) {
    val source = if (com.android.systemui.Flags.coroutineTracing()) traceAs(name) else this
    source.collect()
}
248 
/**
 * Collects this flow into [collector]. When the coroutine-tracing flag is enabled the trace name
 * is taken from `collector.traceName`; otherwise this is a plain `collect`.
 *
 * @see kotlinx.coroutines.flow.collect
 */
public suspend fun <T> Flow<T>.collectTraced(collector: FlowCollector<T>) {
    if (!com.android.systemui.Flags.coroutineTracing()) {
        collect(collector)
        return
    }
    collectTraced(name = collector.traceName, collector = collector)
}
257 
/**
 * Traced variant of `mapLatest`. With the coroutine-tracing flag enabled, the upstream is traced
 * as "mapLatest:<name>" and each run of [transform] happens inside a `traceCoroutine(name)`
 * section. With the flag disabled this is exactly `mapLatest(transform)`.
 *
 * @see kotlinx.coroutines.flow.mapLatest
 */
@OptIn(ExperimentalTypeInference::class)
@ExperimentalCoroutinesApi
public fun <T, R> Flow<T>.mapLatestTraced(
    name: String,
    @BuilderInference transform: suspend (value: T) -> R,
): Flow<R> =
    if (com.android.systemui.Flags.coroutineTracing()) {
        val tracedUpstream = traceAs("mapLatest:$name")
        tracedUpstream.mapLatest { value -> traceCoroutine(name) { transform(value) } }
    } else {
        mapLatest(transform)
    }
270 
/**
 * Traced variant of `mapLatest` that derives the trace name from `transform.traceName`.
 *
 * When the coroutine-tracing flag is enabled this delegates to the named overload. When it is
 * disabled it falls back to the plain `mapLatest` operator; it must not call itself, which would
 * recurse without bound.
 *
 * @param transform mapping applied to the latest upstream value
 * @see kotlinx.coroutines.flow.mapLatest
 */
@OptIn(ExperimentalTypeInference::class)
@ExperimentalCoroutinesApi
public fun <T, R> Flow<T>.mapLatestTraced(
    @BuilderInference transform: suspend (value: T) -> R
): Flow<R> {
    return if (com.android.systemui.Flags.coroutineTracing()) {
        mapLatestTraced(transform.traceName, transform)
    } else {
        mapLatest(transform)
    }
}
282 
/**
 * Traced variant of `collectLatest`. With the coroutine-tracing flag enabled, the upstream is
 * traced as "collectLatest:<name>" and each run of [action] happens inside a
 * `traceCoroutine(name)` section. With the flag disabled this is exactly `collectLatest(action)`.
 *
 * @see kotlinx.coroutines.flow.collectLatest
 */
internal suspend fun <T> Flow<T>.collectLatestTraced(
    name: String,
    action: suspend (value: T) -> Unit,
) {
    if (!com.android.systemui.Flags.coroutineTracing()) {
        collectLatest(action)
        return
    }
    val tracedUpstream = traceAs("collectLatest:$name")
    tracedUpstream.collectLatest { value -> traceCoroutine(name) { action(value) } }
}
294 
/**
 * Traced variant of `collectLatest` that takes its trace name from `action.traceName` when the
 * coroutine-tracing flag is enabled; otherwise a plain `collectLatest`.
 *
 * @see kotlinx.coroutines.flow.collectLatest
 */
public suspend fun <T> Flow<T>.collectLatestTraced(action: suspend (value: T) -> Unit) {
    when {
        com.android.systemui.Flags.coroutineTracing() ->
            collectLatestTraced(action.traceName, action)
        else -> collectLatest(action)
    }
}
303 
/**
 * Traced counterpart of the `transform` operator.
 *
 * When the coroutine-tracing flag is enabled, the receiver is collected inside a flow built with
 * the regular (safe) `flow` builder, and each call of [transform] for an upstream value runs
 * inside a `traceCoroutine(name)` section. When the flag is disabled, [transform] is passed
 * straight to the imported `transform` operator.
 *
 * @param name trace section label wrapped around every call of [transform]
 * @param transform invoked once per upstream value with the downstream collector as receiver
 * @see kotlinx.coroutines.flow.transform
 */
@OptIn(ExperimentalTypeInference::class)
public inline fun <T, R> Flow<T>.transformTraced(
    name: String,
    @BuilderInference crossinline transform: suspend FlowCollector<R>.(value: T) -> Unit,
): Flow<R> {
    return if (com.android.systemui.Flags.coroutineTracing()) {
        // Safe flow must be used because collector is exposed to the caller
        safeFlow {
            collect { value ->
                traceCoroutine(name) {
                    // Labeled return: finishes handling of this single value, not the collection.
                    return@collect transform(value)
                }
            }
        }
    } else {
        // No FlowCollector receiver is in scope here, so this is the `transform` operator applied
        // to the receiver flow with the `transform` parameter as its argument.
        transform(transform)
    }
}
323 
/**
 * Traced variant of `filter`. With the coroutine-tracing flag enabled, [predicate] is evaluated
 * inside a `traceCoroutine(name)` section and values for which it returns `true` are emitted
 * downstream. With the flag disabled this is exactly `filter(predicate)`.
 *
 * @see kotlinx.coroutines.flow.filter
 */
public inline fun <T> Flow<T>.filterTraced(
    name: String,
    crossinline predicate: suspend (T) -> Boolean,
): Flow<T> =
    if (com.android.systemui.Flags.coroutineTracing()) {
        unsafeTransform { candidate ->
            val keep = traceCoroutine(name) { predicate(candidate) }
            if (keep) {
                emit(candidate)
            }
        }
    } else {
        filter(predicate)
    }
339 
/**
 * Traced variant of `map`. With the coroutine-tracing flag enabled, [transform] runs inside a
 * `traceCoroutine(name)` section and its result is emitted downstream after that section ends.
 * With the flag disabled this is exactly `map(transform)`.
 *
 * @see kotlinx.coroutines.flow.map
 */
public inline fun <T, R> Flow<T>.mapTraced(
    name: String,
    crossinline transform: suspend (value: T) -> R,
): Flow<R> =
    if (com.android.systemui.Flags.coroutineTracing()) {
        unsafeTransform { input -> emit(traceCoroutine(name) { transform(input) }) }
    } else {
        map(transform)
    }
354 
/**
 * Traced variant of `shareIn`: the upstream optionally gets the trace name fused into its context
 * (see [maybeFuseTraceName]) and the resulting [SharedFlow] is wrapped with `traceAs(name)`.
 *
 * @see kotlinx.coroutines.flow.shareIn
 */
public fun <T> Flow<T>.shareInTraced(
    name: String,
    scope: CoroutineScope,
    started: SharingStarted,
    replay: Int = 0,
): SharedFlow<T> {
    // .shareIn calls this.launch(context), where this === scope, and the previous upstream flow's
    // context is passed to launch (caveat: the upstream context is only passed to the downstream
    // SharedFlow if certain conditions are met). For instance, if the upstream is a SharedFlow,
    // the `.flowOn()` operator will have no effect.
    val shared = maybeFuseTraceName(name).shareIn(scope, started, replay)
    return shared.traceAs(name)
}
368 
/**
 * Traced variant of `stateIn` with an explicit [started] policy and [initialValue]: the upstream
 * optionally gets the trace name fused into its context (see [maybeFuseTraceName]) and the
 * resulting [StateFlow] is wrapped with `traceAs(name)`.
 *
 * @see kotlinx.coroutines.flow.stateIn
 */
public fun <T> Flow<T>.stateInTraced(
    name: String,
    scope: CoroutineScope,
    started: SharingStarted,
    initialValue: T,
): StateFlow<T> {
    // .stateIn calls this.launch(context), where this === scope, and the previous upstream flow's
    // context is passed to launch
    val state = maybeFuseTraceName(name).stateIn(scope, started, initialValue)
    return state.traceAs(name)
}
380 
/**
 * Traced variant of the suspending `stateIn(scope)`: the upstream optionally gets the trace name
 * fused into its context (see [maybeFuseTraceName]) and the resulting [StateFlow] is wrapped with
 * `traceAs(name)`.
 *
 * @see kotlinx.coroutines.flow.stateIn
 */
public suspend fun <T> Flow<T>.stateInTraced(name: String, scope: CoroutineScope): StateFlow<T> {
    // .stateIn calls this.launch(context), where this === scope, and the previous upstream flow's
    // context is passed to launch
    val state = maybeFuseTraceName(name).stateIn(scope)
    return state.traceAs(name)
}
387 
/** Exposes this mutable flow through `asSharedFlow()` and wraps the result with `traceAs(name)`. */
public fun <T> MutableSharedFlow<T>.asSharedFlowTraced(name: String): SharedFlow<T> =
    asSharedFlow().traceAs(name)
391 
/** Exposes this mutable flow through `asStateFlow()` and wraps the result with `traceAs(name)`. */
public fun <T> MutableStateFlow<T>.asStateFlowTraced(name: String): StateFlow<T> =
    asStateFlow().traceAs(name)
395 
/**
 * Applies `flowOn(CoroutineTraceName(name))` to the receiver when the coroutine-tracing flag is
 * enabled; otherwise returns the receiver untouched.
 */
private fun <T> Flow<T>.maybeFuseTraceName(name: String): Flow<T> {
    if (!com.android.systemui.Flags.coroutineTracing()) return this
    return flowOn(CoroutineTraceName(name))
}
398