// Copyright (C) 2022 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

import protobuf from 'protobufjs/minimal';

import {defer, Deferred} from '../../base/deferred';
import {assertExists, assertFalse, assertTrue} from '../../base/logging';
import {
  DisableTracingRequest,
  DisableTracingResponse,
  EnableTracingRequest,
  EnableTracingResponse,
  FreeBuffersRequest,
  FreeBuffersResponse,
  GetTraceStatsRequest,
  GetTraceStatsResponse,
  IBufferStats,
  IMethodInfo,
  IPCFrame,
  ISlice,
  QueryServiceStateRequest,
  QueryServiceStateResponse,
  ReadBuffersRequest,
  ReadBuffersResponse,
  TraceConfig,
} from '../protos';

import {RecordingError} from './recording_error_handling';
import {
  ByteStream,
  DataSource,
  TracingSession,
  TracingSessionListener,
} from './recording_interfaces_v2';
import {
  BUFFER_USAGE_INCORRECT_FORMAT,
  BUFFER_USAGE_NOT_ACCESSIBLE,
  PARSING_UNABLE_TO_DECODE_METHOD,
  PARSING_UNKNWON_REQUEST_ID,
  PARSING_UNRECOGNIZED_MESSAGE,
  PARSING_UNRECOGNIZED_PORT,
  RECORDING_IN_PROGRESS,
} from './recording_utils';

// Size in bytes of the little-endian length prefix that precedes every frame.
// See wire_protocol.proto for more details.
const WIRE_PROTOCOL_HEADER_SIZE = 4;
// Capacity of the reassembly buffer for incoming frames.
// See basic_types.h (kIPCBufferSize) for more details.
const MAX_IPC_BUFFER_SIZE = 128 * 1024;

const PROTO_LEN_DELIMITED_WIRE_TYPE = 2;
const TRACE_PACKET_PROTO_ID = 1;
// Protobuf tag written in front of every trace packet in the output trace.
const TRACE_PACKET_PROTO_TAG =
    (TRACE_PACKET_PROTO_ID << 3) | PROTO_LEN_DELIMITED_WIRE_TYPE;

// Reads the little-endian uint32 stored in the first four bytes of |buffer|.
// The view's byteOffset is honoured, so subarrays are read correctly.
function parseMessageSize(buffer: Uint8Array) {
  const dv = new DataView(buffer.buffer, buffer.byteOffset, buffer.length);
  return dv.getUint32(0, true);
}

// This class implements the protocol described in
// https://perfetto.dev/docs/design-docs/api-and-abi#tracing-protocol-abi
//
// It frames outgoing requests, reassembles incoming frames from the byte
// stream and dispatches every reply according to the method that was invoked.
export class TracedTracingSession implements TracingSession {
  // Buffers received wire protocol data.
  private incomingBuffer = new Uint8Array(MAX_IPC_BUFFER_SIZE);
  // Number of valid bytes currently stored at the start of |incomingBuffer|.
  private bufferedPartLength = 0;
  // Length of the frame being reassembled, or undefined while its length
  // header has not been fully received yet.
  private currentFrameLength?: number;

  private availableMethods: IMethodInfo[] = [];
  private serviceId = -1;

  private resolveBindingPromise!: Deferred<void>;
  // Maps the request ID of each invoked method to the method name, so that
  // the reply can be decoded with the matching decoder.
  private requestMethods = new Map<number, string>();

  // Needed for ReadBufferResponse: all the trace packets are split into
  // several slices. |partialPacket| is the buffer for them. Once we receive a
  // slice with the flag |lastSliceForPacket|, a new packet is created.
  private partialPacket: ISlice[] = [];
  // Accumulates trace packets into a proto trace file.
  private traceProtoWriter = protobuf.Writer.create();

  // Accumulates DataSource objects from QueryServiceStateResponse,
  // which can have >1 replies for each query
  // go/codesearch/android/external/perfetto/protos/
  // perfetto/ipc/consumer_port.proto;l=243-246
  private pendingDataSources: DataSource[] = [];

  // For concurrent calls to 'QueryServiceState', we return the same value.
  private pendingQssMessage?: Deferred<DataSource[]>;

  // Wire protocol request ID. After each request it is increased. It is needed
  // to keep track of the type of request, and parse the response correctly.
  private requestId = 1;

  // One entry per in-flight 'GetTraceStats' call, resolved in FIFO order.
  private pendingStatsMessages = new Array<Deferred<IBufferStats[]>>();

  // The bytestream is obtained when creating a connection with a target.
  // For instance, the AdbStream is obtained from a connection with an Adb
  // device.
  constructor(
      private byteStream: ByteStream,
      private tracingSessionListener: TracingSessionListener) {
    this.byteStream.addOnStreamDataCallback(
        (data) => this.handleReceivedData(data));
    this.byteStream.addOnStreamCloseCallback(() => this.clearState());
  }

  // Queries the data sources known to the service. While a query is pending,
  // further calls return the same pending promise instead of sending a new
  // request.
  queryServiceState(): Promise<DataSource[]> {
    if (this.pendingQssMessage) {
      return this.pendingQssMessage;
    }

    const requestProto =
        QueryServiceStateRequest.encode(new QueryServiceStateRequest())
            .finish();
    this.rpcInvoke('QueryServiceState', requestProto);

    return this.pendingQssMessage = defer<DataSource[]>();
  }

  // Reports the recording status to the listener and sends 'EnableTracing'
  // with |config|. The 'ReadBuffers' request is issued once the
  // 'EnableTracing' reply is received (see parseFrame).
  start(config: TraceConfig): void {
    const duration = config.durationMs;
    this.tracingSessionListener.onStatus(`${RECORDING_IN_PROGRESS}${
        duration ? ' for ' + duration.toString() + ' ms' : ''}...`);

    const enableTracingRequest = new EnableTracingRequest();
    enableTracingRequest.traceConfig = config;
    const enableTracingRequestProto =
        EnableTracingRequest.encode(enableTracingRequest).finish();
    this.rpcInvoke('EnableTracing', enableTracingRequestProto);
  }

  // Abandons the session: frees the buffers and closes the byte stream.
  cancel(): void {
    this.terminateConnection();
  }

  // Sends 'DisableTracing' to the service.
  stop(): void {
    const requestProto =
        DisableTracingRequest.encode(new DisableTracingRequest()).finish();
    this.rpcInvoke('DisableTracing', requestProto);
  }

  // Returns the highest |bytesWritten / bufferSize| ratio among the trace
  // buffers reported by 'GetTraceStats', or 0 when the stream is not
  // connected. Rejects with BUFFER_USAGE_INCORRECT_FORMAT when no buffer
  // yields a usable ratio.
  async getTraceBufferUsage(): Promise<number> {
    if (!this.byteStream.isConnected()) {
      // TODO(octaviant): make this more in line with the other trace buffer
      //  error cases.
      return 0;
    }
    const bufferStats = await this.getBufferStats();
    let percentageUsed = -1;
    for (const buffer of bufferStats) {
      if (!Number.isFinite(buffer.bytesWritten) ||
          !Number.isFinite(buffer.bufferSize)) {
        continue;
      }
      const used = assertExists(buffer.bytesWritten);
      const total = assertExists(buffer.bufferSize);
      // A zero-sized buffer must be skipped: dividing by it would produce NaN
      // or Infinity, which would then poison the maximum.
      if (total > 0) {
        percentageUsed = Math.max(percentageUsed, used / total);
      }
    }

    if (percentageUsed === -1) {
      throw new RecordingError(BUFFER_USAGE_INCORRECT_FORMAT);
    }
    return percentageUsed;
  }

  // Binds to the 'ConsumerPort' service. The returned promise is resolved
  // when the bind reply carrying the service id and its methods is parsed.
  initConnection(): Promise<void> {
    // We shouldn't bind multiple times to the service in the same tracing
    // session. Check this before anything is written to the stream.
    assertFalse(!!this.resolveBindingPromise);
    this.resolveBindingPromise = defer<void>();

    // bind IPC methods
    const requestId = this.requestId++;
    const frame = new IPCFrame({
      requestId,
      msgBindService: new IPCFrame.BindService({serviceName: 'ConsumerPort'}),
    });
    this.writeFrame(frame);

    return this.resolveBindingPromise;
  }

  // Sends 'GetTraceStats' and returns a promise for the per-buffer stats.
  private getBufferStats(): Promise<IBufferStats[]> {
    // Register the pending reply before sending the request: if the request
    // fails, raiseError() clears the state and rejects this promise, so it is
    // not left waiting for a reply that will never arrive.
    const statsMessage = defer<IBufferStats[]>();
    this.pendingStatsMessages.push(statsMessage);

    const getTraceStatsRequestProto =
        GetTraceStatsRequest.encode(new GetTraceStatsRequest()).finish();
    try {
      this.rpcInvoke('GetTraceStats', getTraceStatsRequestProto);
    } catch (e) {
      // GetTraceStats was introduced only on Android 10.
      this.raiseError(e instanceof Error ? e.message : String(e));
    }

    return statsMessage;
  }

  // Clears the pending state, asks the service to free its buffers and closes
  // the byte stream.
  private terminateConnection(): void {
    this.clearState();
    const requestProto =
        FreeBuffersRequest.encode(new FreeBuffersRequest()).finish();
    try {
      this.rpcInvoke('FreeBuffers', requestProto);
    } finally {
      // The stream must be closed even if 'FreeBuffers' cannot be invoked
      // (rpcInvoke throws when the method is not available on the target).
      this.byteStream.close();
    }
  }

  // Rejects every pending stats request and drops the pending
  // 'QueryServiceState' bookkeeping.
  private clearState() {
    for (const statsMessage of this.pendingStatsMessages) {
      statsMessage.reject(new RecordingError(BUFFER_USAGE_NOT_ACCESSIBLE));
    }
    this.pendingStatsMessages = [];
    this.pendingDataSources = [];
    this.pendingQssMessage = undefined;
  }

  // Invokes |methodName| on the bound service with the encoded arguments.
  // Does nothing when the stream is not connected. Throws a RecordingError
  // when the method is not among the ones advertised in the bind reply.
  private rpcInvoke(methodName: string, argsProto: Uint8Array): void {
    if (!this.byteStream.isConnected()) {
      return;
    }
    const method = this.availableMethods.find((m) => m.name === methodName);
    if (!method || !method.id) {
      throw new RecordingError(
          `Method ${methodName} not supported by the target`);
    }
    const requestId = this.requestId++;
    const frame = new IPCFrame({
      requestId,
      msgInvokeMethod: new IPCFrame.InvokeMethod(
          {serviceId: this.serviceId, methodId: method.id, argsProto}),
    });
    this.requestMethods.set(requestId, methodName);
    this.writeFrame(frame);
  }

  // Encodes |frame| and writes it to the stream, prefixed by its length as a
  // little-endian uint32.
  private writeFrame(frame: IPCFrame): void {
    const frameProto: Uint8Array = IPCFrame.encode(frame).finish();
    const buf = new Uint8Array(WIRE_PROTOCOL_HEADER_SIZE + frameProto.length);
    const dv = new DataView(buf.buffer);
    dv.setUint32(0, frameProto.length, /* littleEndian */ true);
    buf.set(frameProto, WIRE_PROTOCOL_HEADER_SIZE);
    this.byteStream.write(buf);
  }

  // Consumes a chunk of stream data. A chunk may hold a partial frame, exactly
  // one frame or several frames; every complete frame is handed to
  // parseFrame() and any trailing partial data is buffered for the next call.
  private handleReceivedData(rawData: Uint8Array): void {
    // we parse the length of the next frame if it's available
    if (this.currentFrameLength === undefined &&
        this.canCompleteLengthHeader(rawData)) {
      const remainingFrameBytes =
          WIRE_PROTOCOL_HEADER_SIZE - this.bufferedPartLength;
      this.appendToIncomingBuffer(rawData.subarray(0, remainingFrameBytes));
      rawData = rawData.subarray(remainingFrameBytes);

      this.currentFrameLength = parseMessageSize(this.incomingBuffer);
      // The header has been consumed; the buffer now collects the frame body.
      this.bufferedPartLength = 0;
    }

    // Parse all complete frames.
    while (this.currentFrameLength !== undefined &&
           this.bufferedPartLength + rawData.length >=
               this.currentFrameLength) {
      // Read the remaining part of this message.
      const bytesToCompleteMessage =
          this.currentFrameLength - this.bufferedPartLength;
      this.appendToIncomingBuffer(rawData.subarray(0, bytesToCompleteMessage));
      this.parseFrame(this.incomingBuffer.subarray(0, this.currentFrameLength));
      this.bufferedPartLength = 0;
      // Remove the data just parsed.
      rawData = rawData.subarray(bytesToCompleteMessage);

      if (!this.canCompleteLengthHeader(rawData)) {
        this.currentFrameLength = undefined;
        break;
      }
      // Nothing is buffered at this point, so the next header is read
      // directly from the incoming chunk.
      this.currentFrameLength = parseMessageSize(rawData);
      rawData = rawData.subarray(WIRE_PROTOCOL_HEADER_SIZE);
    }

    // Buffer the remaining data (part of the next message).
    this.appendToIncomingBuffer(rawData);
  }

  // Returns true when the buffered bytes plus |newData| hold more bytes than
  // the length header, i.e. the header and at least one more byte.
  private canCompleteLengthHeader(newData: Uint8Array): boolean {
    return newData.length + this.bufferedPartLength > WIRE_PROTOCOL_HEADER_SIZE;
  }

  // Copies |array| right after the bytes already held in |incomingBuffer|.
  private appendToIncomingBuffer(array: Uint8Array): void {
    this.incomingBuffer.set(array, this.bufferedPartLength);
    this.bufferedPartLength += array.length;
  }

  // Decodes one complete frame and acts on it: bind replies resolve the
  // binding promise, method replies are routed by the name recorded for their
  // request ID, anything else is reported as an error.
  private parseFrame(frameBuffer: Uint8Array): void {
    // Get a copy of the ArrayBuffer to avoid the original being overridden.
    // See 170256902#comment21
    const frame = IPCFrame.decode(frameBuffer.slice());
    if (frame.msg === 'msgBindServiceReply') {
      const msgBindServiceReply = frame.msgBindServiceReply;
      if (msgBindServiceReply && msgBindServiceReply.methods &&
          msgBindServiceReply.serviceId) {
        assertTrue(msgBindServiceReply.success === true);
        this.availableMethods = msgBindServiceReply.methods;
        this.serviceId = msgBindServiceReply.serviceId;
        this.resolveBindingPromise.resolve();
      }
    } else if (frame.msg === 'msgInvokeMethodReply') {
      const msgInvokeMethodReply = frame.msgInvokeMethodReply;
      // We process messages without a `replyProto` field (for instance
      // `FreeBuffers` does not have `replyProto`). However, we ignore messages
      // without a valid 'success' field.
      if (!msgInvokeMethodReply || !msgInvokeMethodReply.success) {
        return;
      }

      const method = this.requestMethods.get(frame.requestId);
      if (!method) {
        this.raiseError(`${PARSING_UNKNWON_REQUEST_ID}: ${frame.requestId}`);
        return;
      }
      const decoder = decoders.get(method);
      if (decoder === undefined) {
        this.raiseError(`${PARSING_UNABLE_TO_DECODE_METHOD}: ${method}`);
        return;
      }
      const data = {...decoder(msgInvokeMethodReply.replyProto)};

      if (method === 'ReadBuffers') {
        if (data.slices) {
          for (const slice of data.slices) {
            this.partialPacket.push(slice);
            if (slice.lastSliceForPacket) {
              // Concatenate all the slices of the packet into one buffer. A
              // slice without data contributes no bytes.
              const chunks = this.partialPacket.map(
                  (part) => part.data ?? new Uint8Array(0));
              let bufferSize = 0;
              for (const chunk of chunks) {
                bufferSize += chunk.length;
              }
              const tracePacket = new Uint8Array(bufferSize);
              let written = 0;
              for (const chunk of chunks) {
                tracePacket.set(chunk, written);
                written += chunk.length;
              }
              this.traceProtoWriter.uint32(TRACE_PACKET_PROTO_TAG);
              this.traceProtoWriter.bytes(tracePacket);
              this.partialPacket = [];
            }
          }
        }
        if (msgInvokeMethodReply.hasMore === false) {
          // Last reply of the stream: hand over the trace and disconnect.
          this.tracingSessionListener.onTraceData(
              this.traceProtoWriter.finish());
          this.terminateConnection();
        }
      } else if (method === 'EnableTracing') {
        const readBuffersRequestProto =
            ReadBuffersRequest.encode(new ReadBuffersRequest()).finish();
        this.rpcInvoke('ReadBuffers', readBuffersRequestProto);
      } else if (method === 'GetTraceStats') {
        const maybePendingStatsMessage = this.pendingStatsMessages.shift();
        if (maybePendingStatsMessage) {
          maybePendingStatsMessage.resolve(data?.traceStats?.bufferStats || []);
        }
      } else if (method === 'FreeBuffers') {
        // No action required. If we successfully read a whole trace,
        // we close the connection. Alternatively, if the tracing finishes
        // with an exception or if the user cancels it, we also close the
        // connection.
      } else if (method === 'DisableTracing') {
        // No action required. Same reasoning as for FreeBuffers.
      } else if (method === 'QueryServiceState') {
        const dataSources =
            (data as QueryServiceStateResponse)?.serviceState?.dataSources ||
            [];
        for (const dataSource of dataSources) {
          const name = dataSource?.dsDescriptor?.name;
          if (name) {
            this.pendingDataSources.push(
                {name, descriptor: dataSource.dsDescriptor});
          }
        }
        if (msgInvokeMethodReply.hasMore === false) {
          assertExists(this.pendingQssMessage).resolve(this.pendingDataSources);
          this.pendingDataSources = [];
          this.pendingQssMessage = undefined;
        }
      } else {
        this.raiseError(`${PARSING_UNRECOGNIZED_PORT}: ${method}`);
      }
    } else {
      this.raiseError(`${PARSING_UNRECOGNIZED_MESSAGE}: ${frame.msg}`);
    }
  }

  // Tears down the connection and reports |message| to the listener.
  private raiseError(message: string): void {
    try {
      this.terminateConnection();
    } finally {
      // The listener must be told about the error even when the teardown
      // itself fails.
      this.tracingSessionListener.onError(message);
    }
  }
}

// Reply decoders keyed by the name of the invoked method. The values are kept
// loosely typed because parseFrame() reads method-specific fields from the
// decoded reply.
const decoders =
    new Map<string, Function>()
        .set('EnableTracing', EnableTracingResponse.decode)
        .set('FreeBuffers', FreeBuffersResponse.decode)
        .set('ReadBuffers', ReadBuffersResponse.decode)
        .set('DisableTracing', DisableTracingResponse.decode)
        .set('GetTraceStats', GetTraceStatsResponse.decode)
        .set('QueryServiceState', QueryServiceStateResponse.decode);