• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2024 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package com.example.tracing.demo.experiments
17 
18 import com.android.app.tracing.coroutines.flow.collectTraced as collect
19 import com.android.app.tracing.coroutines.flow.collectTraced
20 import com.android.app.tracing.coroutines.flow.filterTraced as filter
21 import com.android.app.tracing.coroutines.flow.flowName
22 import com.android.app.tracing.coroutines.flow.mapTraced as map
23 import com.android.app.tracing.coroutines.flow.stateInTraced
24 import com.android.app.tracing.coroutines.launchTraced as launch
25 import com.android.app.tracing.coroutines.traceCoroutine
26 import com.example.tracing.demo.FixedThread1
27 import com.example.tracing.demo.FixedThread2
28 import com.example.tracing.demo.FixedThread3
29 import com.example.tracing.demo.FixedThread4
30 import javax.inject.Inject
31 import javax.inject.Singleton
32 import kotlinx.coroutines.CoroutineDispatcher
33 import kotlinx.coroutines.coroutineScope
34 import kotlinx.coroutines.flow.SharingStarted
35 import kotlinx.coroutines.flow.flow
36 import kotlinx.coroutines.flow.flowOn
37 
38 @Singleton
39 class SharedFlowUsage
40 @Inject
41 constructor(
42     @FixedThread1 private var dispatcher1: CoroutineDispatcher,
43     @FixedThread2 private var dispatcher2: CoroutineDispatcher,
44     @FixedThread3 private var dispatcher3: CoroutineDispatcher,
45     @FixedThread4 private var dispatcher4: CoroutineDispatcher,
46 ) : TracedExperiment() {
47 
48     override val description: String = "Create a shared flow and collect from it"
49 
50     private val coldFlow =
<lambda>null51         flow {
52                 var n = 0
53                 while (n < 20) {
54                     emit(n++)
55                     forceSuspend(timeMillis = 5)
56                 }
57             }
<lambda>null58             .map("pow2") {
59                 val rv = it * it
60                 forceSuspend("map($it) -> $rv", 50)
61                 rv
62             }
63             // this trace name is used here because the dispatcher changed
64             .flowOn(dispatcher3)
<lambda>null65             .filter("mod4") {
66                 val rv = it % 4 == 0
67                 forceSuspend("filter($it) -> $rv", 50)
68                 rv
69             }
70             .flowName("COLD_FLOW")
71 
<lambda>null72     override suspend fun runExperiment(): Unit = coroutineScope {
73         val stateFlow = coldFlow.stateInTraced("My-StateFlow", this, SharingStarted.Eagerly, 10)
74         launch("launchAAAA", dispatcher1) {
75             stateFlow.collect("collectAAAA") {
76                 traceCoroutine("AAAA collected: $it") { forceSuspend("AAAA", 15) }
77             }
78         }
79         launch("launchBBBB", dispatcher2) {
80             // Don't pass a string. Instead, rely on default behavior to walk the stack for the
81             // name. This results in trace sections like:
82             // `collect:SharedFlowUsage$start$1$2:emit`
83             // NOTE: `Flow.collect` is a member function and takes precedence, so we need
84             // to invoke `collectTraced` using its original name instead of its `collect` alias
85             stateFlow.collectTraced {
86                 traceCoroutine("BBBB collected: $it") { forceSuspend("BBBB", 30) }
87             }
88         }
89         launch("launchCCCC", dispatcher3) {
90             stateFlow.collect("collectCCCC") {
91                 traceCoroutine("CCCC collected: $it") { forceSuspend("CCCC", 60) }
92             }
93         }
94         launch("launchDDDD", dispatcher4) {
95             // Uses Flow.collect member function instead of collectTraced:
96             stateFlow.collect { traceCoroutine("DDDD collected: $it") { forceSuspend("DDDD", 90) } }
97         }
98     }
99 }
100