1 /*
2 * Copyright (C) 2024 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 package com.example.tracing.demo.experiments
17
18 import android.os.Trace
19 import com.android.app.tracing.TraceUtils.traceAsync
20 import com.android.app.tracing.coroutines.createCoroutineTracingContext
21 import com.android.app.tracing.coroutines.flow.asStateFlowTraced
22 import com.android.app.tracing.coroutines.flow.filterTraced
23 import com.android.app.tracing.coroutines.flow.flowName
24 import com.android.app.tracing.coroutines.flow.mapTraced
25 import com.android.app.tracing.coroutines.flow.shareInTraced
26 import com.android.app.tracing.coroutines.flow.stateInTraced
27 import com.android.app.tracing.coroutines.flow.traceAs
28 import com.android.app.tracing.coroutines.launchInTraced
29 import com.android.app.tracing.coroutines.launchTraced
30 import com.example.tracing.demo.FixedThread1
31 import com.example.tracing.demo.FixedThread2
32 import javax.inject.Inject
33 import javax.inject.Singleton
34 import kotlin.contracts.ExperimentalContracts
35 import kotlin.contracts.InvocationKind
36 import kotlin.contracts.contract
37 import kotlin.coroutines.CoroutineContext
38 import kotlinx.coroutines.CoroutineDispatcher
39 import kotlinx.coroutines.CoroutineScope
40 import kotlinx.coroutines.cancelChildren
41 import kotlinx.coroutines.coroutineScope
42 import kotlinx.coroutines.flow.Flow
43 import kotlinx.coroutines.flow.MutableStateFlow
44 import kotlinx.coroutines.flow.SharingStarted
45 import kotlinx.coroutines.flow.flow
46 import kotlinx.coroutines.flow.flowOn
47 import kotlinx.coroutines.job
48
/**
 * Experiment that runs a fixed sequence of numbered steps. Each step launches one or more traced
 * coroutines that collect [fibFlow], either directly or through a derived cold, shared or state
 * flow, so that the different tracing helpers can be compared step by step.
 *
 * Every step is executed through [runStep], which wraps it in `traceAsync` on `TRACK_NAME`, lets
 * it run for a short while, and then cancels whatever the step launched.
 */
@Singleton
class FlowTracingTutorial
@Inject
constructor(
    @FixedThread1 private val dispatcher1: CoroutineDispatcher,
    @FixedThread2 private val dispatcher2: CoroutineDispatcher,
) : Experiment() {

    override val description: String = "Flow tracing tutorial"

    // Both scopes are (re)created at the start of runExperiment(), before any step runs.
    private lateinit var scope: CoroutineScope
    private lateinit var bgScope: CoroutineScope

    /**
     * Runs a single tutorial step inside a `traceAsync` section named `Step #<stepName>`.
     *
     * The sequence is: invoke [block], suspend via `forceSuspend(timeMillis = 40)` under a
     * "running" section, then, under a "cleanup" section, cancel the children of [scope] and
     * [bgScope] and suspend again via `forceSuspend(timeMillis = 10)`.
     *
     * @param stepName label appended to "Step #" for the enclosing trace section
     * @param block the step body; invoked exactly once, before the "running" section
     */
    @OptIn(ExperimentalContracts::class)
    private suspend inline fun runStep(stepName: String, crossinline block: () -> Unit) {
        contract { callsInPlace(block, InvocationKind.EXACTLY_ONCE) }
        traceAsync(TRACK_NAME, "Step #$stepName") {
            block()
            // Give the coroutines launched by the step some time to produce trace output.
            traceAsync(TRACK_NAME, "running") { forceSuspend(timeMillis = 40) }
            traceAsync(TRACK_NAME, "cleanup") {
                traceAsync(TRACK_NAME, "cancel-main") { scope.coroutineContext.cancelChildren() }
                traceAsync(TRACK_NAME, "cancel-bg") { bgScope.coroutineContext.cancelChildren() }
                forceSuspend(timeMillis = 10)
            }
        }
    }

    /**
     * Builds a coroutine tracing context called [name] with `walkStackForDefaultNames` and
     * `countContinuations` both enabled.
     */
    private fun createTracingContext(name: String): CoroutineContext {
        return createCoroutineTracingContext(
            name = name,
            walkStackForDefaultNames = true,
            countContinuations = true,
        )
    }

    /**
     * 1.1: Launches a traced coroutine in [scope] that collects [fibFlow] directly and emits an
     * instant trace event (`got:<value>`) for every collected value.
     */
    private fun step1p1() {
        scope.launchTraced("LAUNCH_FOR_COLLECT_1.1") {
            fibFlow.collect { Trace.instant(Trace.TRACE_TAG_APP, "got:$it") }
        }
    }

    /**
     * 1.2: Collects [fibFlow] in [scope] using `launchInTraced`; no per-value action is supplied
     * here.
     */
    private fun step1p2() {
        fibFlow.launchInTraced("LAUNCH_FOR_COLLECT_1.2", scope)
    }

    /** 2.1: Names [fibFlow] with `flowName` and collects the named flow in [scope]. */
    private fun step2p1() {
        val coldFlow = fibFlow.flowName("FIB_FLOW_NAME_2.1")
        coldFlow.launchInTracedForDemo("LAUNCH_NAME_2.1", scope)
    }

    /** 2.2: Like 2.1, but `flowOn(dispatcher2)` is applied *after* `flowName`. */
    private fun step2p2() {
        val coldFlow = fibFlow.flowName("FIB_FLOW_NAME_2.2").flowOn(dispatcher2)
        coldFlow.launchInTracedForDemo("LAUNCH_NAME_2.2", scope)
    }

    /** 2.3: Like 2.2, but with the operator order swapped: `flowOn` first, then `flowName`. */
    private fun step2p3() {
        val coldFlow = fibFlow.flowOn(dispatcher2).flowName("FIB_FLOW_NAME_2.3")
        coldFlow.launchInTracedForDemo("LAUNCH_NAME_2.3", scope)
    }

    /**
     * 2.4: Applies `flowName` on both sides of `flowOn(dispatcher2)`: "FIB_AAA" upstream of it and
     * "FIB_BBB" downstream of it.
     */
    private fun step2p4() {
        val coldFlow = fibFlow.flowName("FIB_AAA").flowOn(dispatcher2).flowName("FIB_BBB")
        coldFlow.launchInTracedForDemo("LAUNCH_NAME_2.4", scope)
    }

    /**
     * 3: Chains traced operators: doubles each value (`mapTraced`), keeps multiples of three
     * (`filterTraced`), names the resulting flow and collects it in [scope].
     */
    private fun step3() {
        val coldFlow =
            fibFlow
                .mapTraced("x2") { it * 2 }
                .filterTraced("%3==0") { it % 3 == 0 }
                .flowName("(fib x 2) % 3 == 0")
        coldFlow.launchInTracedForDemo("LAUNCH_NAME_3", scope)
    }

    /**
     * 4: Shares [fibFlow] in [bgScope] with `shareInTraced` (started eagerly; the trailing `3` is
     * presumably the replay count, mirroring `shareIn` — confirm against tracinglib), then
     * collects the shared flow from [scope] after a short `forceSuspend`.
     */
    private fun step4() {
        val sharedFlow =
            fibFlow.shareInTraced("SHARED_FLOW_NAME_4", bgScope, SharingStarted.Eagerly, 3)
        scope.launchTraced("LAUNCH_NAME_4") {
            // Start collecting a little after the eagerly started upstream.
            forceSuspend("before-collect", 5)
            sharedFlow.collect(::traceInstant)
        }
    }

    /**
     * 5.1: Converts [fibFlow] into a state flow in [bgScope] with `stateInTraced` (started
     * eagerly; the trailing `3` is presumably the initial value, mirroring `stateIn` — confirm
     * against tracinglib), then collects it from [scope] after a short `forceSuspend`.
     */
    private fun step5p1() {
        val stateFlow =
            fibFlow.stateInTraced("STATE_FLOW_NAME_5.1", bgScope, SharingStarted.Eagerly, 3)
        scope.launchTraced("LAUNCH_NAME_5.1") {
            forceSuspend("before-collect", 5)
            stateFlow.collect(::traceInstant)
        }
    }

    /**
     * 5.2: Stacks `stateInTraced` on top of `shareInTraced`, both running in [bgScope], and
     * collects the resulting state flow from [scope] after a short `forceSuspend`.
     */
    private fun step5p2() {
        // NOTE(review): the shared flow is labelled "STATE_FLOW_NAME_5.2" and the derived state
        // flow is given an empty name — confirm that both are intentional for this demo.
        val sharedFlow =
            fibFlow.shareInTraced("STATE_FLOW_NAME_5.2", bgScope, SharingStarted.Eagerly, 3)
        val stateFlow = sharedFlow.stateInTraced("", bgScope, SharingStarted.Eagerly, 2)
        scope.launchTraced("LAUNCH_NAME_5.2") {
            forceSuspend("before-collect", 5)
            stateFlow.collect(::traceInstant)
        }
    }

    /**
     * 6.1: Creates a `MutableStateFlow` wrapped with `traceAs`, collects it in [scope], and feeds
     * it from a coroutine in [bgScope] that forwards every value of [fibFlow] into it.
     */
    private fun step6p1() {
        val state = MutableStateFlow(1).traceAs("MUTABLE_STATE_FLOW_6.1")
        state.launchInTraced("LAUNCH_FOR_STATE_FLOW_COLLECT_6.1", scope)
        bgScope.launchTraced("FWD_FIB_TO_STATE_6.1") {
            forceSuspend("before-collect", 5)
            fibFlow.collect {
                traceInstant(it)
                // Manually forward values from the cold flow to the MutableStateFlow
                state.value = it
            }
        }
    }

    /**
     * 6.2: Like 6.1, but the collector in [scope] observes the state through the read-only view
     * returned by `asStateFlowTraced`, and the forwarding coroutine does not call `forceSuspend`
     * before it starts collecting.
     */
    private fun step6p2() {
        val state = MutableStateFlow(1).traceAs("MUTABLE_STATE_FLOW_6.2")
        val readOnlyState = state.asStateFlowTraced("READ_ONLY_STATE_6.2")
        readOnlyState.launchInTraced("LAUNCH_FOR_STATE_FLOW_COLLECT_6.2", scope)
        bgScope.launchTraced("FWD_FIB_TO_STATE_6.2") {
            fibFlow.collect {
                traceInstant(it)
                // Manually forward values from the cold flow to the MutableStateFlow
                state.value = it
            }
        }
    }

    /**
     * Creates [scope] (on `dispatcher1`) and [bgScope] (on `dispatcher2`), each with its own
     * tracing context, and then runs every tutorial step in order.
     */
    override suspend fun runExperiment(): Unit = coroutineScope {
        // Both scopes are built on the Job of this coroutineScope, so everything the steps launch
        // in either scope is a child of that same Job.
        val job = coroutineContext.job
        scope = CoroutineScope(job + dispatcher1 + createTracingContext("main-scope"))
        bgScope = CoroutineScope(job + dispatcher2 + createTracingContext("bg-scope"))
        runStep("1.1", ::step1p1)
        runStep("1.2", ::step1p2)
        runStep("2.1", ::step2p1)
        runStep("2.2", ::step2p2)
        runStep("2.3", ::step2p3)
        runStep("2.4", ::step2p4)
        runStep("3", ::step3)
        runStep("4", ::step4)
        runStep("5.1", ::step5p1)
        runStep("5.2", ::step5p2)
        runStep("6.1", ::step6p1)
        runStep("6.2", ::step6p2)
    }
}
207
/**
 * Launches a traced coroutine called [name] in [scope] that collects this flow and reports every
 * collected value through [traceInstant].
 */
private fun <T> Flow<T>.launchInTracedForDemo(name: String, scope: CoroutineScope) {
    val upstream = this
    scope.launchTraced(name) {
        upstream.collect { value -> traceInstant(value) }
    }
}
211
/** Emits an instant trace event on the app trace tag, labelled `got:` followed by [value]. */
private fun <T> traceInstant(value: T) {
    val label = "got:$value"
    Trace.instant(Trace.TRACE_TAG_APP, label)
}
215
/**
 * Cold flow that emits the Fibonacci sequence (0, 1, 1, 2, 3, ...) without ever completing.
 *
 * After each emission it calls `forceSuspend("after-emit", 1)` before producing the next value.
 * Values are plain `Int`s, so the arithmetic wraps around once the sequence exceeds
 * `Int.MAX_VALUE`.
 */
private val fibFlow = flow {
    var current = 0
    var upcoming = 1
    while (true) {
        emit(current)
        // Advance the pair (current, upcoming) -> (upcoming, current + upcoming).
        upcoming = (current + upcoming).also { current = upcoming }
        forceSuspend("after-emit", 1)
    }
}
227