• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2024 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package com.example.tracing.demo.experiments
17 
18 import android.os.Trace
19 import com.android.app.tracing.TraceUtils.traceAsync
20 import com.android.app.tracing.coroutines.createCoroutineTracingContext
21 import com.android.app.tracing.coroutines.flow.asStateFlowTraced
22 import com.android.app.tracing.coroutines.flow.filterTraced
23 import com.android.app.tracing.coroutines.flow.flowName
24 import com.android.app.tracing.coroutines.flow.mapTraced
25 import com.android.app.tracing.coroutines.flow.shareInTraced
26 import com.android.app.tracing.coroutines.flow.stateInTraced
27 import com.android.app.tracing.coroutines.flow.traceAs
28 import com.android.app.tracing.coroutines.launchInTraced
29 import com.android.app.tracing.coroutines.launchTraced
30 import com.example.tracing.demo.FixedThread1
31 import com.example.tracing.demo.FixedThread2
32 import javax.inject.Inject
33 import javax.inject.Singleton
34 import kotlin.contracts.ExperimentalContracts
35 import kotlin.contracts.InvocationKind
36 import kotlin.contracts.contract
37 import kotlin.coroutines.CoroutineContext
38 import kotlinx.coroutines.CoroutineDispatcher
39 import kotlinx.coroutines.CoroutineScope
40 import kotlinx.coroutines.cancelChildren
41 import kotlinx.coroutines.coroutineScope
42 import kotlinx.coroutines.flow.Flow
43 import kotlinx.coroutines.flow.MutableStateFlow
44 import kotlinx.coroutines.flow.SharingStarted
45 import kotlinx.coroutines.flow.flow
46 import kotlinx.coroutines.flow.flowOn
47 import kotlinx.coroutines.job
48 
49 @Singleton
50 class FlowTracingTutorial
51 @Inject
52 constructor(
53     @FixedThread1 private var dispatcher1: CoroutineDispatcher,
54     @FixedThread2 private var dispatcher2: CoroutineDispatcher,
55 ) : Experiment() {
56 
57     override val description: String = "Flow tracing tutorial"
58 
59     private lateinit var scope: CoroutineScope
60     private lateinit var bgScope: CoroutineScope
61 
62     @OptIn(ExperimentalContracts::class)
runStepnull63     private suspend inline fun runStep(stepName: String, crossinline block: () -> Unit) {
64         contract { callsInPlace(block, InvocationKind.EXACTLY_ONCE) }
65         traceAsync(TRACK_NAME, "Step #$stepName") {
66             block()
67             traceAsync(TRACK_NAME, "running") { forceSuspend(timeMillis = 40) }
68             traceAsync(TRACK_NAME, "cleanup") {
69                 traceAsync(TRACK_NAME, "cancel-main") { scope.coroutineContext.cancelChildren() }
70                 traceAsync(TRACK_NAME, "cancel-bg") { bgScope.coroutineContext.cancelChildren() }
71                 forceSuspend(timeMillis = 10)
72             }
73         }
74     }
75 
createTracingContextnull76     private fun createTracingContext(name: String): CoroutineContext {
77         return createCoroutineTracingContext(
78             name = name,
79             walkStackForDefaultNames = true,
80             countContinuations = true,
81         )
82     }
83 
84     /** 1.1: */
step1p1null85     private fun step1p1() {
86         scope.launchTraced("LAUNCH_FOR_COLLECT_1.1") {
87             fibFlow.collect { Trace.instant(Trace.TRACE_TAG_APP, "got:$it") }
88         }
89     }
90 
91     /** 1.2: */
step1p2null92     private fun step1p2() {
93         fibFlow.launchInTraced("LAUNCH_FOR_COLLECT_1.2", scope)
94     }
95 
96     /** 2.1: */
step2p1null97     private fun step2p1() {
98         val coldFlow = fibFlow.flowName("FIB_FLOW_NAME_2.1")
99         coldFlow.launchInTracedForDemo("LAUNCH_NAME_2.1", scope)
100     }
101 
102     /** 2.2: */
step2p2null103     private fun step2p2() {
104         val coldFlow = fibFlow.flowName("FIB_FLOW_NAME_2.2").flowOn(dispatcher2)
105         coldFlow.launchInTracedForDemo("LAUNCH_NAME_2.2", scope)
106     }
107 
108     /** 2.3: */
step2p3null109     private fun step2p3() {
110         val coldFlow = fibFlow.flowOn(dispatcher2).flowName("FIB_FLOW_NAME_2.3")
111         coldFlow.launchInTracedForDemo("LAUNCH_NAME_2.3", scope)
112     }
113 
114     /** 2.4: */
step2p4null115     private fun step2p4() {
116         val coldFlow = fibFlow.flowName("FIB_AAA").flowOn(dispatcher2).flowName("FIB_BBB")
117         coldFlow.launchInTracedForDemo("LAUNCH_NAME_2.4", scope)
118     }
119 
120     /** 3: */
step3null121     private fun step3() {
122         val coldFlow =
123             fibFlow
124                 .mapTraced("x2") { it * 2 }
125                 .filterTraced("%3==0") { it % 3 == 0 }
126                 .flowName("(fib x 2) % 3 == 0")
127         coldFlow.launchInTracedForDemo("LAUNCH_NAME_3", scope)
128     }
129 
130     /** 4: */
step4null131     private fun step4() {
132         val sharedFlow =
133             fibFlow.shareInTraced("SHARED_FLOW_NAME_4", bgScope, SharingStarted.Eagerly, 3)
134         scope.launchTraced("LAUNCH_NAME_4") {
135             forceSuspend("before-collect", 5)
136             sharedFlow.collect(::traceInstant)
137         }
138     }
139 
140     /** 5.1: */
step5p1null141     private fun step5p1() {
142         val sharedFlow =
143             fibFlow.stateInTraced("STATE_FLOW_NAME_5.1", bgScope, SharingStarted.Eagerly, 3)
144         scope.launchTraced("LAUNCH_NAME_5.1") {
145             forceSuspend("before-collect", 5)
146             sharedFlow.collect(::traceInstant)
147         }
148     }
149 
150     /** 5.2: */
step5p2null151     private fun step5p2() {
152         val sharedFlow =
153             fibFlow.shareInTraced("STATE_FLOW_NAME_5.2", bgScope, SharingStarted.Eagerly, 3)
154         val stateFlow = sharedFlow.stateInTraced("", bgScope, SharingStarted.Eagerly, 2)
155         scope.launchTraced("LAUNCH_NAME_5.2") {
156             forceSuspend("before-collect", 5)
157             stateFlow.collect(::traceInstant)
158         }
159     }
160 
161     /** 6.1: */
step6p1null162     private fun step6p1() {
163         val state = MutableStateFlow(1).traceAs("MUTABLE_STATE_FLOW_6.1")
164         state.launchInTraced("LAUNCH_FOR_STATE_FLOW_COLLECT_6.1", scope)
165         bgScope.launchTraced("FWD_FIB_TO_STATE_6.1") {
166             forceSuspend("before-collect", 5)
167             fibFlow.collect {
168                 traceInstant(it)
169                 // Manually forward values from the cold flow to the MutableStateFlow
170                 state.value = it
171             }
172         }
173     }
174 
175     /** 6.2: */
step6p2null176     private fun step6p2() {
177         val state = MutableStateFlow(1).traceAs("MUTABLE_STATE_FLOW_6.2")
178         val readOnlyState = state.asStateFlowTraced("READ_ONLY_STATE_6.2")
179         readOnlyState.launchInTraced("LAUNCH_FOR_STATE_FLOW_COLLECT_6.2", scope)
180         bgScope.launchTraced("FWD_FIB_TO_STATE_6.2") {
181             fibFlow.collect {
182                 traceInstant(it)
183                 // Manually forward values from the cold flow to the MutableStateFlow
184                 state.value = it
185             }
186         }
187     }
188 
<lambda>null189     override suspend fun runExperiment(): Unit = coroutineScope {
190         val job = coroutineContext.job
191         scope = CoroutineScope(job + dispatcher1 + createTracingContext("main-scope"))
192         bgScope = CoroutineScope(job + dispatcher2 + createTracingContext("bg-scope"))
193         runStep("1.1", ::step1p1)
194         runStep("1.2", ::step1p2)
195         runStep("2.1", ::step2p1)
196         runStep("2.2", ::step2p2)
197         runStep("2.3", ::step2p3)
198         runStep("2.4", ::step2p4)
199         runStep("3", ::step3)
200         runStep("4", ::step4)
201         runStep("5.1", ::step5p1)
202         runStep("5.2", ::step5p2)
203         runStep("6.1", ::step6p1)
204         runStep("6.2", ::step6p2)
205     }
206 }
207 
launchInTracedForDemonull208 private fun <T> Flow<T>.launchInTracedForDemo(name: String, scope: CoroutineScope) {
209     scope.launchTraced(name) { collect(::traceInstant) }
210 }
211 
traceInstantnull212 private fun <T> traceInstant(value: T) {
213     Trace.instant(Trace.TRACE_TAG_APP, "got:$value")
214 }
215 
<lambda>null216 private val fibFlow = flow {
217     var n0 = 0
218     var n1 = 1
219     while (true) {
220         emit(n0)
221         val n2 = n0 + n1
222         n0 = n1
223         n1 = n2
224         forceSuspend("after-emit", 1)
225     }
226 }
227