1 /*
 * Copyright 2025 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package androidx.tracing.driver.wire
18 
19 import androidx.annotation.GuardedBy
20 import androidx.tracing.driver.PooledTracePacketArray
21 import androidx.tracing.driver.Queue
22 import androidx.tracing.driver.TraceEvent
23 import androidx.tracing.driver.TraceSink
24 import com.squareup.wire.ProtoWriter
25 import kotlin.coroutines.Continuation
26 import kotlin.coroutines.CoroutineContext
27 import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED
28 import kotlin.coroutines.intrinsics.createCoroutineUnintercepted
29 import kotlin.coroutines.resume
30 import kotlin.coroutines.suspendCoroutine
31 import kotlinx.coroutines.Dispatchers
32 import kotlinx.coroutines.Job
33 import okio.BufferedSink
34 
35 /**
 * A trace sink that writes to a [BufferedSink], e.g. one backed by a new file per trace session.
37  *
38  * This implementation converts [TraceEvent]s into binary protos using
39  * [the Wire library](https://square.github.io/wire/).
40  *
 * The outputs created by `WireTraceSink` can be visualized with
42  * [ui.perfetto.dev](https://ui.perfetto.dev/), and queried by
43  * [TraceProcessor](https://developer.android.com/reference/androidx/benchmark/traceprocessor/TraceProcessor)
44  * from the `androidx.benchmark:benchmark-traceprocessor` library, the
45  * [C++](https://perfetto.dev/docs/analysis/trace-processor) tool it's built on, or the
46  * [Python](https://perfetto.dev/docs/analysis/trace-processor-python) wrapper.
47  *
48  * As binary protos embed strings as UTF-8, note that any strings serialized by WireTraceSink will
49  * be serialized as UTF-8.
50  *
51  * To create a WireTraceSink for a File, you can use `File("myFile").appendingSink().buffer()`.
52  */
public class WireTraceSink(
    /**
     * ID which uniquely identifies the trace capture system, within which uuids are guaranteed to
     * be unique.
     *
     * This is only relevant when merging traces across multiple sources (e.g. combining the trace
     * output of this library with a trace captured on Android with Perfetto).
     */
    sequenceId: Int,

    /** Output [BufferedSink] the trace will be written to. */
    private val bufferedSink: BufferedSink,

    /** Coroutine context to execute the serialization on. */
    private val coroutineContext: CoroutineContext = Dispatchers.IO
) : TraceSink() {
    private val wireTraceEventSerializer =
        WireTraceEventSerializer(sequenceId, ProtoWriter(bufferedSink))

    // There are 2 distinct mechanisms for thread safety here, and they are not necessarily in sync.
    // The Queue by itself is thread-safe, while the drain state below is guarded by drainLock.
    // After the drain loop has seen an empty queue, but before drainRequested is cleared, a writer
    // can still add a pooled array of trace packets without triggering a new drain. That is okay:
    // those packets get picked up by the next drain request, or by flush() (which keeps requesting
    // drains until the queue is empty) prior to the close() of the Sink.
    private val queue = Queue<PooledTracePacketArray>()

    // Lock used to keep drainRequested, drainTerminated and resumeDrain in sync.
    private val drainLock = Any()

    // True from the moment a drain is requested until drainQueue() has finished writing.
    @GuardedBy("drainLock") private var drainRequested = false

    // Set once the drain coroutine has completed, which only happens when draining threw.
    // No further drain can run after that, so requests and waits must stop relying on it.
    @GuardedBy("drainLock") private var drainTerminated = false

    // Once the sink is marked as closed. No more enqueue()'s are allowed. This way we can never
    // race between a new drainRequest() after the last request for flush() happened. This
    // is because we simply disallow adding more items to the underlying queue.
    @Volatile private var closed = false

    // The parked drain coroutine, or null while it is running (or has terminated).
    @GuardedBy("drainLock") private var resumeDrain: Continuation<Unit>? = null

    init {
        resumeDrain =
            suspend {
                    coroutineContext[Job]?.invokeOnCompletion { makeDrainRequest() }
                    while (true) {
                        drainQueue() // Sets drainRequested to false on completion
                        suspendCoroutine<Unit> { continuation ->
                            synchronized(drainLock) {
                                if (drainRequested) {
                                    // A request arrived after drainQueue() cleared the flag but
                                    // before this continuation was published. Nobody else will
                                    // resume us (later requests no-op while the flag is set), so
                                    // continue straight away instead of parking.
                                    continuation.resume(Unit)
                                } else {
                                    resumeDrain = continuation
                                }
                            }
                            // The block's value is ignored by suspendCoroutine; the coroutine
                            // suspends only when the continuation was not resumed above.
                            COROUTINE_SUSPENDED
                        }
                    }
                }
                .createCoroutineUnintercepted(
                    Continuation(context = coroutineContext) {
                        // The loop above never returns normally, so this runs only when draining
                        // threw. Release anyone waiting in flush() and stop accepting requests.
                        synchronized(drainLock) {
                            drainTerminated = true
                            drainRequested = false
                            resumeDrain = null
                        }
                    }
                )

        // Kick things off and suspend
        makeDrainRequest()
    }

    /**
     * Queues [pooledPacketArray] for serialization and requests a drain.
     *
     * Does nothing once the sink has been closed.
     */
    override fun enqueue(pooledPacketArray: PooledTracePacketArray) {
        if (!closed) {
            queue.addLast(pooledPacketArray)
            makeDrainRequest()
        }
    }

    /**
     * Busy-waits until the drain coroutine has written everything that is queued, then flushes
     * the underlying [BufferedSink].
     *
     * The wait is on the drain itself rather than on the queue being empty: an array is removed
     * from the queue before it is serialized, so an empty queue does not mean writing is done.
     */
    override fun flush() {
        do {
            makeDrainRequest()
            while (synchronized(drainLock) { drainRequested }) {
                // Await completion of the drain.
            }
            // Arrays enqueued while the previous drain was finishing need another request;
            // there is no point in asking again if the drain coroutine is gone.
            val drainerAlive = synchronized(drainLock) { !drainTerminated }
        } while (drainerAlive && queue.isNotEmpty())
        bufferedSink.flush()
    }

    /** Requests a drain and resumes the parked drain coroutine, if any. */
    private fun makeDrainRequest() {
        // Only make a request if one is not already ongoing
        synchronized(drainLock) {
            if (!drainRequested && !drainTerminated) {
                drainRequested = true
                resumeDrain?.resume(Unit)
            }
        }
    }

    /** Serializes and recycles every queued array, then marks the drain request as served. */
    private fun drainQueue() {
        while (queue.isNotEmpty()) {
            val pooledPacketArray = queue.removeFirstOrNull()
            if (pooledPacketArray != null) {
                pooledPacketArray.forEach { wireTraceEventSerializer.writeTraceEvent(it) }
                pooledPacketArray.recycle()
            }
        }
        synchronized(drainLock) {
            drainRequested = false
            // Mark resumeDrain as consumed: the continuation that started this drain has already
            // been resumed and must not be resumed again. Clearing both under the same lock
            // keeps the drainRequested flag and the resumeDrain continuation in sync.
            resumeDrain = null
        }
    }

    /**
     * Stops accepting new arrays, drains what is queued and closes the underlying [BufferedSink].
     */
    override fun close() {
        // Mark closed.
        // We don't need a critical section here, given we have one final flush() that blocks
        // until the queue is drained. So even if we are racing against additions to the queue,
        // that should still be okay, because enqueue()'s will eventually start no-oping.
        closed = true
        try {
            flush()
        } finally {
            // Release the sink even when the final flush fails.
            bufferedSink.close()
        }
    }
}
162