// Copyright (C) 2022 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

import protobuf from 'protobufjs/minimal';

import {defer, Deferred} from '../../base/deferred';
import {assertExists, assertFalse, assertTrue} from '../../base/logging';
import {
  DisableTracingRequest,
  DisableTracingResponse,
  EnableTracingRequest,
  EnableTracingResponse,
  FreeBuffersRequest,
  FreeBuffersResponse,
  GetTraceStatsRequest,
  GetTraceStatsResponse,
  IBufferStats,
  IMethodInfo,
  IPCFrame,
  ISlice,
  QueryServiceStateRequest,
  QueryServiceStateResponse,
  ReadBuffersRequest,
  ReadBuffersResponse,
  TraceConfig,
} from '../../protos';

import {RecordingError} from './recording_error_handling';
import {
  ByteStream,
  DataSource,
  TracingSession,
  TracingSessionListener,
} from './recording_interfaces_v2';
import {
  BUFFER_USAGE_INCORRECT_FORMAT,
  BUFFER_USAGE_NOT_ACCESSIBLE,
  PARSING_UNABLE_TO_DECODE_METHOD,
  PARSING_UNKNWON_REQUEST_ID,
  PARSING_UNRECOGNIZED_MESSAGE,
  PARSING_UNRECOGNIZED_PORT,
  RECORDING_IN_PROGRESS,
} from './recording_utils';

// See wire_protocol.proto for more details.
const WIRE_PROTOCOL_HEADER_SIZE = 4;
// See basic_types.h (kIPCBufferSize) for more details.
const MAX_IPC_BUFFER_SIZE = 128 * 1024;

const PROTO_LEN_DELIMITED_WIRE_TYPE = 2;
const TRACE_PACKET_PROTO_ID = 1;
// Tag written in front of every trace packet: (field id << 3) | wire type.
const TRACE_PACKET_PROTO_TAG =
  (TRACE_PACKET_PROTO_ID << 3) | PROTO_LEN_DELIMITED_WIRE_TYPE;

// Decodes the serialized reply of one invoked method.
type ReplyDecoder = (replyProto: Uint8Array) => object;

// Reply decoders, keyed by the method name passed to |rpcInvoke|.
const decoders = new Map<string, ReplyDecoder>([
  ['EnableTracing', EnableTracingResponse.decode],
  ['FreeBuffers', FreeBuffersResponse.decode],
  ['ReadBuffers', ReadBuffersResponse.decode],
  ['DisableTracing', DisableTracingResponse.decode],
  ['GetTraceStats', GetTraceStatsResponse.decode],
  ['QueryServiceState', QueryServiceStateResponse.decode],
]);

// Reads the little-endian uint32 stored in the first
// WIRE_PROTOCOL_HEADER_SIZE bytes of |buffer|.
function parseMessageSize(buffer: Uint8Array) {
  const dv = new DataView(buffer.buffer, buffer.byteOffset, buffer.length);
  return dv.getUint32(0, true);
}

// Converts a caught value into the string expected by |raiseError|.
function errorToMessage(e: unknown): string {
  return e instanceof Error ? e.message : String(e);
}

// This class implements the protocol described in
// https://perfetto.dev/docs/design-docs/api-and-abi#tracing-protocol-abi
export class TracedTracingSession implements TracingSession {
  // Buffers received wire protocol data.
  private readonly incomingBuffer = new Uint8Array(MAX_IPC_BUFFER_SIZE);
  // Number of valid bytes currently held in |incomingBuffer|.
  private bufferedPartLength = 0;
  // Length of the frame being received; undefined while the length header of
  // the next frame has not been fully received yet.
  private currentFrameLength?: number;

  // Methods and service id taken from the BindService reply.
  private availableMethods: IMethodInfo[] = [];
  private serviceId = -1;

  // Resolved once the BindService reply is received. Undefined until
  // |initConnection| is called.
  private resolveBindingPromise?: Deferred<void>;
  // Maps the id of each in-flight request to the name of the invoked method.
  private readonly requestMethods = new Map<number, string>();

  // Needed for ReadBufferResponse: all the trace packets are split into
  // several slices. |partialPacket| is the buffer for them. Once we receive a
  // slice with the flag |lastSliceForPacket|, a new packet is created.
  private partialPacket: ISlice[] = [];
  // Accumulates trace packets into a proto trace file.
  private readonly traceProtoWriter = protobuf.Writer.create();

  // Accumulates DataSource objects from QueryServiceStateResponse,
  // which can have >1 replies for each query
  // go/codesearch/android/external/perfetto/protos/
  // perfetto/ipc/consumer_port.proto;l=243-246
  private pendingDataSources: DataSource[] = [];

  // For concurrent calls to 'QueryServiceState', we return the same value.
  private pendingQssMessage?: Deferred<DataSource[]>;

  // Wire protocol request ID. After each request it is increased. It is needed
  // to keep track of the type of request, and parse the response correctly.
  private requestId = 1;

  // One entry per GetTraceStats request still waiting for its reply, in
  // request order.
  private pendingStatsMessages = new Array<Deferred<IBufferStats[]>>();

  // The bytestream is obtained when creating a connection with a target.
  // For instance, the AdbStream is obtained from a connection with an Adb
  // device.
  constructor(
    private readonly byteStream: ByteStream,
    private readonly tracingSessionListener: TracingSessionListener,
  ) {
    this.byteStream.addOnStreamDataCallback((data) =>
      this.handleReceivedData(data),
    );
    this.byteStream.addOnStreamCloseCallback(() => this.clearState());
  }

  // Invokes 'QueryServiceState' and returns a promise for the data sources
  // accumulated from its replies. While a query is pending, further calls
  // return the same promise.
  queryServiceState(): Promise<DataSource[]> {
    if (this.pendingQssMessage) {
      return this.pendingQssMessage;
    }

    const requestProto = QueryServiceStateRequest.encode(
      new QueryServiceStateRequest(),
    ).finish();
    this.rpcInvoke('QueryServiceState', requestProto);

    const pending = defer<DataSource[]>();
    this.pendingQssMessage = pending;
    return pending;
  }

  // Reports the "recording in progress" status to the listener and invokes
  // 'EnableTracing' with |config|.
  start(config: TraceConfig): void {
    const duration = config.durationMs;
    this.tracingSessionListener.onStatus(
      `${RECORDING_IN_PROGRESS}${
        duration ? ' for ' + duration.toString() + ' ms' : ''
      }...`,
    );

    const enableTracingRequest = new EnableTracingRequest();
    enableTracingRequest.traceConfig = config;
    const enableTracingRequestProto =
      EnableTracingRequest.encode(enableTracingRequest).finish();
    this.rpcInvoke('EnableTracing', enableTracingRequestProto);
  }

  // Drops the pending state, invokes 'FreeBuffers' and closes the stream.
  cancel(): void {
    this.terminateConnection();
  }

  // Invokes 'DisableTracing'.
  stop(): void {
    const requestProto = DisableTracingRequest.encode(
      new DisableTracingRequest(),
    ).finish();
    this.rpcInvoke('DisableTracing', requestProto);
  }

  // Returns the highest bytesWritten / bufferSize ratio among the buffers
  // reported by 'GetTraceStats', or 0 when the stream is not connected.
  // Rejects with BUFFER_USAGE_INCORRECT_FORMAT when no buffer has a usable
  // (finite, non-zero size) pair of values.
  async getTraceBufferUsage(): Promise<number> {
    if (!this.byteStream.isConnected()) {
      // TODO(octaviant): make this more in line with the other trace buffer
      // error cases.
      return 0;
    }
    const bufferStats = await this.getBufferStats();
    let percentageUsed = -1;
    for (const buffer of bufferStats) {
      if (
        !Number.isFinite(buffer.bytesWritten) ||
        !Number.isFinite(buffer.bufferSize)
      ) {
        continue;
      }
      const used = assertExists(buffer.bytesWritten);
      const total = assertExists(buffer.bufferSize);
      // A zero-sized buffer is skipped: dividing by it would yield NaN or
      // Infinity, and Math.max would propagate that to the caller.
      if (total > 0) {
        percentageUsed = Math.max(percentageUsed, used / total);
      }
    }

    if (percentageUsed === -1) {
      throw new RecordingError(BUFFER_USAGE_INCORRECT_FORMAT);
    }
    return percentageUsed;
  }

  // Sends the BindService request for 'ConsumerPort'. The returned promise is
  // resolved when the BindService reply is parsed.
  initConnection(): Promise<void> {
    // We shouldn't bind multiple times to the service in the same tracing
    // session. Check this before anything is written to the stream.
    assertFalse(this.resolveBindingPromise !== undefined);
    const binding = defer<void>();
    this.resolveBindingPromise = binding;

    // bind IPC methods
    const requestId = this.requestId++;
    const frame = new IPCFrame({
      requestId,
      msgBindService: new IPCFrame.BindService({serviceName: 'ConsumerPort'}),
    });
    this.writeFrame(frame);
    return binding;
  }

  // Invokes 'GetTraceStats' and returns a promise for the buffer stats of its
  // reply.
  private getBufferStats(): Promise<IBufferStats[]> {
    // Register the pending reply before invoking the method: if the
    // invocation fails, |raiseError| ends up in |clearState|, which rejects
    // this promise instead of leaving it pending forever.
    const statsMessage = defer<IBufferStats[]>();
    this.pendingStatsMessages.push(statsMessage);

    const getTraceStatsRequestProto = GetTraceStatsRequest.encode(
      new GetTraceStatsRequest(),
    ).finish();
    try {
      this.rpcInvoke('GetTraceStats', getTraceStatsRequestProto);
    } catch (e) {
      // GetTraceStats was introduced only on Android 10.
      this.raiseError(errorToMessage(e));
    }
    return statsMessage;
  }

  // Drops the pending state, invokes 'FreeBuffers' and closes the stream.
  private terminateConnection(): void {
    this.clearState();
    const requestProto = FreeBuffersRequest.encode(
      new FreeBuffersRequest(),
    ).finish();
    try {
      this.rpcInvoke('FreeBuffers', requestProto);
    } catch {
      // Best effort: |rpcInvoke| throws when 'FreeBuffers' is not among the
      // available methods (e.g. no BindService reply was received yet). That
      // must not prevent the stream from being closed.
    } finally {
      this.byteStream.close();
    }
  }

  // Rejects every pending stats request and forgets the pending
  // 'QueryServiceState' state.
  private clearState() {
    for (const statsMessage of this.pendingStatsMessages) {
      statsMessage.reject(new RecordingError(BUFFER_USAGE_NOT_ACCESSIBLE));
    }
    this.pendingStatsMessages = [];
    this.pendingDataSources = [];
    this.pendingQssMessage = undefined;
  }

  // Sends an InvokeMethod frame for |methodName| carrying |argsProto|.
  // Does nothing when the stream is not connected. Throws a RecordingError
  // when |methodName| is not among the available methods.
  private rpcInvoke(methodName: string, argsProto: Uint8Array): void {
    if (!this.byteStream.isConnected()) {
      return;
    }
    const method = this.availableMethods.find((m) => m.name === methodName);
    // eslint-disable-next-line @typescript-eslint/strict-boolean-expressions
    if (!method || !method.id) {
      throw new RecordingError(
        `Method ${methodName} not supported by the target`,
      );
    }
    const requestId = this.requestId++;
    const frame = new IPCFrame({
      requestId,
      msgInvokeMethod: new IPCFrame.InvokeMethod({
        serviceId: this.serviceId,
        methodId: method.id,
        argsProto,
      }),
    });
    // Remembered so that the reply can be decoded with the right decoder.
    this.requestMethods.set(requestId, methodName);
    this.writeFrame(frame);
  }

  // Writes |frame| to the stream, prefixed by its length encoded as a
  // little-endian uint32.
  private writeFrame(frame: IPCFrame): void {
    const frameProto: Uint8Array = IPCFrame.encode(frame).finish();
    const buf = new Uint8Array(WIRE_PROTOCOL_HEADER_SIZE + frameProto.length);
    new DataView(buf.buffer).setUint32(
      0,
      frameProto.length,
      /* littleEndian */ true,
    );
    buf.set(frameProto, WIRE_PROTOCOL_HEADER_SIZE);
    this.byteStream.write(buf);
  }

  // Consumes a chunk of stream data: parses every frame completed by
  // |rawData| and buffers the trailing partial header or frame until more
  // data arrives.
  private handleReceivedData(rawData: Uint8Array): void {
    // we parse the length of the next frame if it's available
    if (
      this.currentFrameLength === undefined &&
      this.canCompleteLengthHeader(rawData)
    ) {
      // Part of the header may already be buffered from a previous chunk.
      const remainingFrameBytes =
        WIRE_PROTOCOL_HEADER_SIZE - this.bufferedPartLength;
      this.appendToIncomingBuffer(rawData.subarray(0, remainingFrameBytes));
      rawData = rawData.subarray(remainingFrameBytes);

      this.currentFrameLength = parseMessageSize(this.incomingBuffer);
      // The header is consumed; the buffer now accumulates the frame body.
      this.bufferedPartLength = 0;
    }

    // Parse all complete frames.
    while (
      this.currentFrameLength !== undefined &&
      this.bufferedPartLength + rawData.length >= this.currentFrameLength
    ) {
      // Read the remaining part of this message.
      const bytesToCompleteMessage =
        this.currentFrameLength - this.bufferedPartLength;
      this.appendToIncomingBuffer(rawData.subarray(0, bytesToCompleteMessage));
      this.parseFrame(this.incomingBuffer.subarray(0, this.currentFrameLength));
      this.bufferedPartLength = 0;
      // Remove the data just parsed.
      rawData = rawData.subarray(bytesToCompleteMessage);

      if (!this.canCompleteLengthHeader(rawData)) {
        this.currentFrameLength = undefined;
        break;
      }
      // Nothing is buffered here, so the whole header is in |rawData|.
      this.currentFrameLength = parseMessageSize(rawData);
      rawData = rawData.subarray(WIRE_PROTOCOL_HEADER_SIZE);
    }

    // Buffer the remaining data (part of the next message).
    this.appendToIncomingBuffer(rawData);
  }

  // Returns true when the buffered bytes plus |newData| contain at least a
  // full length header.
  private canCompleteLengthHeader(newData: Uint8Array): boolean {
    return (
      newData.length + this.bufferedPartLength >= WIRE_PROTOCOL_HEADER_SIZE
    );
  }

  // Copies |array| after the bytes already held in |incomingBuffer|.
  private appendToIncomingBuffer(array: Uint8Array): void {
    this.incomingBuffer.set(array, this.bufferedPartLength);
    this.bufferedPartLength += array.length;
  }

  // Decodes one complete frame and dispatches it according to its message
  // type. Any other message type is reported as an error.
  private parseFrame(frameBuffer: Uint8Array): void {
    // Get a copy of the ArrayBuffer to avoid the original being overridden.
    // See 170256902#comment21
    const frame = IPCFrame.decode(frameBuffer.slice());
    if (frame.msg === 'msgBindServiceReply') {
      const msgBindServiceReply = frame.msgBindServiceReply;
      /* eslint-disable @typescript-eslint/strict-boolean-expressions */
      if (
        msgBindServiceReply &&
        msgBindServiceReply.methods &&
        msgBindServiceReply.serviceId
      ) {
        /* eslint-enable */
        assertTrue(msgBindServiceReply.success === true);
        this.availableMethods = msgBindServiceReply.methods;
        this.serviceId = msgBindServiceReply.serviceId;
        assertExists(this.resolveBindingPromise).resolve();
      }
    } else if (frame.msg === 'msgInvokeMethodReply') {
      this.handleInvokeMethodReply(frame.requestId, frame.msgInvokeMethodReply);
    } else {
      this.raiseError(`${PARSING_UNRECOGNIZED_MESSAGE}: ${frame.msg}`);
    }
  }

  // Handles the reply to a method invoked through |rpcInvoke|.
  private handleInvokeMethodReply(
    requestId: IPCFrame['requestId'],
    reply: IPCFrame['msgInvokeMethodReply'],
  ): void {
    // We process messages without a `replyProto` field (for instance
    // `FreeBuffers` does not have `replyProto`). However, we ignore messages
    // without a valid 'success' field.
    if (reply?.success !== true) {
      return;
    }

    const method = this.requestMethods.get(requestId);
    if (method === undefined) {
      this.raiseError(`${PARSING_UNKNWON_REQUEST_ID}: ${requestId}`);
      return;
    }
    const isLastReply = reply.hasMore === false;
    if (isLastReply) {
      // No further reply is expected for this request: forget it, so the map
      // does not grow with every request sent during the session.
      this.requestMethods.delete(requestId);
    }

    const decoder = decoders.get(method);
    if (decoder === undefined) {
      this.raiseError(`${PARSING_UNABLE_TO_DECODE_METHOD}: ${method}`);
      return;
    }
    // A missing payload is decoded as an empty message.
    const data = decoder(reply.replyProto ?? new Uint8Array(0));

    switch (method) {
      case 'ReadBuffers':
        this.appendTraceSlices((data as ReadBuffersResponse).slices ?? []);
        if (isLastReply) {
          this.tracingSessionListener.onTraceData(
            this.traceProtoWriter.finish(),
          );
          this.terminateConnection();
        }
        break;
      case 'EnableTracing': {
        const readBuffersRequestProto = ReadBuffersRequest.encode(
          new ReadBuffersRequest(),
        ).finish();
        this.rpcInvoke('ReadBuffers', readBuffersRequestProto);
        break;
      }
      case 'GetTraceStats': {
        // Replies are matched to pending requests in request order.
        const maybePendingStatsMessage = this.pendingStatsMessages.shift();
        if (maybePendingStatsMessage) {
          maybePendingStatsMessage.resolve(
            (data as GetTraceStatsResponse).traceStats?.bufferStats ?? [],
          );
        }
        break;
      }
      case 'FreeBuffers':
        // No action required. If we successfully read a whole trace,
        // we close the connection. Alternatively, if the tracing finishes
        // with an exception or if the user cancels it, we also close the
        // connection.
        break;
      case 'DisableTracing':
        // No action required. Same reasoning as for FreeBuffers.
        break;
      case 'QueryServiceState': {
        const dataSources =
          (data as QueryServiceStateResponse).serviceState?.dataSources ?? [];
        for (const dataSource of dataSources) {
          const name = dataSource?.dsDescriptor?.name;
          if (name) {
            this.pendingDataSources.push({
              name,
              descriptor: dataSource.dsDescriptor,
            });
          }
        }
        if (isLastReply) {
          assertExists(this.pendingQssMessage).resolve(this.pendingDataSources);
          this.pendingDataSources = [];
          this.pendingQssMessage = undefined;
        }
        break;
      }
      default:
        this.raiseError(`${PARSING_UNRECOGNIZED_PORT}: ${method}`);
    }
  }

  // Appends |slices| to the packet being assembled. Every slice flagged
  // |lastSliceForPacket| completes a packet, which is then written to
  // |traceProtoWriter| as a length-delimited field.
  private appendTraceSlices(slices: ISlice[]): void {
    for (const slice of slices) {
      this.partialPacket.push(slice);
      if (!slice.lastSliceForPacket) {
        continue;
      }
      // A slice without data contributes no bytes to the packet.
      const chunks = this.partialPacket.map(
        (part) => part.data ?? new Uint8Array(0),
      );
      const packetSize = chunks.reduce((sum, chunk) => sum + chunk.length, 0);
      const tracePacket = new Uint8Array(packetSize);
      let written = 0;
      for (const chunk of chunks) {
        tracePacket.set(chunk, written);
        written += chunk.length;
      }
      this.traceProtoWriter.uint32(TRACE_PACKET_PROTO_TAG);
      this.traceProtoWriter.bytes(tracePacket);
      this.partialPacket = [];
    }
  }

  // Terminates the connection, then reports |message| to the listener.
  private raiseError(message: string): void {
    this.terminateConnection();
    this.tracingSessionListener.onError(message);
  }
}