/*
 * Copyright 2025 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package androidx.tracing.driver.wire

import androidx.annotation.GuardedBy
import androidx.tracing.driver.PooledTracePacketArray
import androidx.tracing.driver.Queue
import androidx.tracing.driver.TraceEvent
import androidx.tracing.driver.TraceSink
import com.squareup.wire.ProtoWriter
import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED
import kotlin.coroutines.intrinsics.createCoroutineUnintercepted
import kotlin.coroutines.resume
import kotlin.coroutines.suspendCoroutine
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import okio.BufferedSink

/**
 * A [TraceSink] that writes trace data to the provided [BufferedSink].
 *
 * This implementation converts [TraceEvent]s into binary protos using
 * [the Wire library](https://square.github.io/wire/).
 *
 * The outputs created by `WireTraceSink` can be visualized with
 * [ui.perfetto.dev](https://ui.perfetto.dev/), and queried by
 * [TraceProcessor](https://developer.android.com/reference/androidx/benchmark/traceprocessor/TraceProcessor)
 * from the `androidx.benchmark:benchmark-traceprocessor` library, the
 * [C++](https://perfetto.dev/docs/analysis/trace-processor) tool it's built on, or the
 * [Python](https://perfetto.dev/docs/analysis/trace-processor-python) wrapper.
 *
 * As binary protos embed strings as UTF-8, note that any strings serialized by WireTraceSink will
 * be serialized as UTF-8.
 *
 * To create a WireTraceSink for a File, you can use `File("myFile").appendingSink().buffer()`.
 */
public class WireTraceSink(
    /**
     * ID which uniquely identifies the trace capture system, within which uuids are guaranteed to
     * be unique.
     *
     * This is only relevant when merging traces across multiple sources (e.g. combining the trace
     * output of this library with a trace captured on Android with Perfetto).
     */
    sequenceId: Int,

    /** Output [BufferedSink] the trace will be written to. */
    private val bufferedSink: BufferedSink,

    /** Coroutine context to execute the serialization on. */
    private val coroutineContext: CoroutineContext = Dispatchers.IO
) : TraceSink() {
    private val wireTraceEventSerializer =
        WireTraceEventSerializer(sequenceId, ProtoWriter(bufferedSink))

    // There are 2 distinct mechanisms for thread safety here, and they are not necessarily in
    // sync. The Queue is used from both the enqueueing threads and the drain loop, while
    // drainRequested / resumeDrain are kept consistent with each other under drainLock.
    // A writer can add a pooled array after the drain loop saw an empty queue but before the
    // loop parks; its makeDrainRequest() is then a no-op because a drain is still marked as
    // requested. Those packets are picked up by the next drain request, or by flush(), which
    // keeps requesting drains until the queue is observed empty.
    private val queue = Queue<PooledTracePacketArray>()

    // Lock used to keep drainRequested, resumeDrain and drainLoopTerminated in sync.
    private val drainLock = Any()

    // True from the moment a drain is requested until the drain loop has parked again.
    @GuardedBy("drainLock") private var drainRequested = false

    // Once the sink is marked as closed, no more enqueue()'s are allowed. This way we can never
    // race between a new drainRequest() after the last request for flush() happened. This
    // is because we simply disallow adding more items to the underlying queue.
    @Volatile private var closed = false

    // Continuation that wakes the drain loop. Non-null only while the loop is waiting for a
    // request (drainRequested == false); consumed (set to null) by the request that resumes it.
    @GuardedBy("drainLock") private var resumeDrain: Continuation<Unit>? = null

    // Set when the drain loop has finished. The loop itself never returns normally, so this only
    // happens when draining threw. Lets flush() stop waiting for a drain that will never finish.
    @GuardedBy("drainLock") private var drainLoopTerminated = false

    init {
        resumeDrain =
            suspend {
                    coroutineContext[Job]?.invokeOnCompletion { makeDrainRequest() }
                    while (true) {
                        drainQueue()
                        suspendCoroutine<Unit> { continuation ->
                            // Clear the request flag and publish the continuation in ONE
                            // critical section. If these were separate, a request arriving in
                            // between would set drainRequested without having anything to
                            // resume, and every later request would then be ignored.
                            synchronized(drainLock) {
                                drainRequested = false
                                resumeDrain = continuation
                            }
                            COROUTINE_SUSPENDED // Suspend
                        }
                    }
                }
                .createCoroutineUnintercepted(
                    Continuation(context = coroutineContext) {
                        // Reached only when the drain loop terminated (see drainLoopTerminated).
                        synchronized(drainLock) { drainLoopTerminated = true }
                    }
                )

        // Kick things off and suspend
        makeDrainRequest()
    }

    /**
     * Adds [pooledPacketArray] to the queue and requests a drain.
     *
     * Does nothing once [close] has been called.
     */
    override fun enqueue(pooledPacketArray: PooledTracePacketArray) {
        if (!closed) {
            queue.addLast(pooledPacketArray)
            makeDrainRequest()
        }
    }

    /**
     * Blocks (busy-waits) until the queue has been drained and no drain is in progress, then
     * flushes the underlying [BufferedSink].
     */
    override fun flush() {
        do {
            makeDrainRequest()
            // Wait for the drain itself to finish, not merely for the queue to become empty:
            // the drain loop removes an array from the queue before it serializes it, so an
            // empty queue does not mean that all writes to bufferedSink have completed.
            while (synchronized(drainLock) { drainRequested && !drainLoopTerminated }) {
                // Await completion of the drain.
            }
            // Packets enqueued while the previous drain was finishing may still be queued;
            // request another drain for them unless the drain loop is gone.
        } while (queue.isNotEmpty() && !synchronized(drainLock) { drainLoopTerminated })
        bufferedSink.flush()
    }

    /** Wakes the drain loop, unless a drain has already been requested and not yet completed. */
    private fun makeDrainRequest() {
        // Only make a request if one is not already ongoing
        synchronized(drainLock) {
            if (!drainRequested) {
                drainRequested = true
                // Consume the continuation BEFORE resuming it: resuming may run the drain loop
                // right here (as it does for the initial request made from init), and the loop
                // publishes a fresh continuation that must not be overwritten afterwards.
                val continuation = resumeDrain
                resumeDrain = null
                continuation?.resume(Unit)
            }
        }
    }

    /** Serializes and recycles every queued packet array until the queue is empty. */
    private fun drainQueue() {
        while (true) {
            val pooledPacketArray = queue.removeFirstOrNull() ?: break
            pooledPacketArray.forEach { wireTraceEventSerializer.writeTraceEvent(it) }
            pooledPacketArray.recycle()
        }
    }

    /**
     * Stops accepting new packets, flushes what is queued and closes the underlying
     * [BufferedSink].
     */
    override fun close() {
        // Mark closed.
        // We don't need a critical section here, given we have one final flush() that blocks
        // until the queue is drained. So even if we are racing against additions to the queue,
        // that should still be okay, because enqueue()'s will eventually start no-oping.
        closed = true
        try {
            flush()
        } finally {
            // Close the sink even if the final flush throws, so it is not leaked.
            bufferedSink.close()
        }
    }
}