• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1// Copyright (C) 2022 The Android Open Source Project
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//      http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15import protobuf from 'protobufjs/minimal';
16
17import {defer, Deferred} from '../../base/deferred';
18import {assertExists, assertFalse, assertTrue} from '../../base/logging';
19import {
20  DisableTracingRequest,
21  DisableTracingResponse,
22  EnableTracingRequest,
23  EnableTracingResponse,
24  FreeBuffersRequest,
25  FreeBuffersResponse,
26  GetTraceStatsRequest,
27  GetTraceStatsResponse,
28  IBufferStats,
29  IMethodInfo,
30  IPCFrame,
31  ISlice,
32  QueryServiceStateRequest,
33  QueryServiceStateResponse,
34  ReadBuffersRequest,
35  ReadBuffersResponse,
36  TraceConfig,
37} from '../../protos';
38
39import {RecordingError} from './recording_error_handling';
40import {
41  ByteStream,
42  DataSource,
43  TracingSession,
44  TracingSessionListener,
45} from './recording_interfaces_v2';
46import {
47  BUFFER_USAGE_INCORRECT_FORMAT,
48  BUFFER_USAGE_NOT_ACCESSIBLE,
49  PARSING_UNABLE_TO_DECODE_METHOD,
50  PARSING_UNKNWON_REQUEST_ID,
51  PARSING_UNRECOGNIZED_MESSAGE,
52  PARSING_UNRECOGNIZED_PORT,
53  RECORDING_IN_PROGRESS,
54} from './recording_utils';
55
56// See wire_protocol.proto for more details.
57const WIRE_PROTOCOL_HEADER_SIZE = 4;
58// See basic_types.h (kIPCBufferSize) for more details.
59const MAX_IPC_BUFFER_SIZE = 128 * 1024;
60
61const PROTO_LEN_DELIMITED_WIRE_TYPE = 2;
62const TRACE_PACKET_PROTO_ID = 1;
63const TRACE_PACKET_PROTO_TAG =
64  (TRACE_PACKET_PROTO_ID << 3) | PROTO_LEN_DELIMITED_WIRE_TYPE;
65
66function parseMessageSize(buffer: Uint8Array) {
67  const dv = new DataView(buffer.buffer, buffer.byteOffset, buffer.length);
68  return dv.getUint32(0, true);
69}
70
71// This class implements the protocol described in
72// https://perfetto.dev/docs/design-docs/api-and-abi#tracing-protocol-abi
73export class TracedTracingSession implements TracingSession {
74  // Buffers received wire protocol data.
75  private incomingBuffer = new Uint8Array(MAX_IPC_BUFFER_SIZE);
76  private bufferedPartLength = 0;
77  private currentFrameLength?: number;
78
79  private availableMethods: IMethodInfo[] = [];
80  private serviceId = -1;
81
82  private resolveBindingPromise!: Deferred<void>;
83  private requestMethods = new Map<number, string>();
84
85  // Needed for ReadBufferResponse: all the trace packets are split into
86  // several slices. |partialPacket| is the buffer for them. Once we receive a
87  // slice with the flag |lastSliceForPacket|, a new packet is created.
88  private partialPacket: ISlice[] = [];
89  // Accumulates trace packets into a proto trace file..
90  private traceProtoWriter = protobuf.Writer.create();
91
92  // Accumulates DataSource objects from QueryServiceStateResponse,
93  // which can have >1 replies for each query
94  // go/codesearch/android/external/perfetto/protos/
95  // perfetto/ipc/consumer_port.proto;l=243-246
96  private pendingDataSources: DataSource[] = [];
97
98  // For concurrent calls to 'QueryServiceState', we return the same value.
99  private pendingQssMessage?: Deferred<DataSource[]>;
100
101  // Wire protocol request ID. After each request it is increased. It is needed
102  // to keep track of the type of request, and parse the response correctly.
103  private requestId = 1;
104
105  private pendingStatsMessages = new Array<Deferred<IBufferStats[]>>();
106
107  // The bytestream is obtained when creating a connection with a target.
108  // For instance, the AdbStream is obtained from a connection with an Adb
109  // device.
110  constructor(
111    private byteStream: ByteStream,
112    private tracingSessionListener: TracingSessionListener,
113  ) {
114    this.byteStream.addOnStreamDataCallback((data) =>
115      this.handleReceivedData(data),
116    );
117    this.byteStream.addOnStreamCloseCallback(() => this.clearState());
118  }
119
120  queryServiceState(): Promise<DataSource[]> {
121    if (this.pendingQssMessage) {
122      return this.pendingQssMessage;
123    }
124
125    const requestProto = QueryServiceStateRequest.encode(
126      new QueryServiceStateRequest(),
127    ).finish();
128    this.rpcInvoke('QueryServiceState', requestProto);
129
130    return (this.pendingQssMessage = defer<DataSource[]>());
131  }
132
133  start(config: TraceConfig): void {
134    const duration = config.durationMs;
135    this.tracingSessionListener.onStatus(
136      `${RECORDING_IN_PROGRESS}${
137        duration ? ' for ' + duration.toString() + ' ms' : ''
138      }...`,
139    );
140
141    const enableTracingRequest = new EnableTracingRequest();
142    enableTracingRequest.traceConfig = config;
143    const enableTracingRequestProto =
144      EnableTracingRequest.encode(enableTracingRequest).finish();
145    this.rpcInvoke('EnableTracing', enableTracingRequestProto);
146  }
147
148  cancel(): void {
149    this.terminateConnection();
150  }
151
152  stop(): void {
153    const requestProto = DisableTracingRequest.encode(
154      new DisableTracingRequest(),
155    ).finish();
156    this.rpcInvoke('DisableTracing', requestProto);
157  }
158
159  async getTraceBufferUsage(): Promise<number> {
160    if (!this.byteStream.isConnected()) {
161      // TODO(octaviant): make this more in line with the other trace buffer
162      //  error cases.
163      return 0;
164    }
165    const bufferStats = await this.getBufferStats();
166    let percentageUsed = -1;
167    for (const buffer of bufferStats) {
168      if (
169        !Number.isFinite(buffer.bytesWritten) ||
170        !Number.isFinite(buffer.bufferSize)
171      ) {
172        continue;
173      }
174      const used = assertExists(buffer.bytesWritten);
175      const total = assertExists(buffer.bufferSize);
176      if (total >= 0) {
177        percentageUsed = Math.max(percentageUsed, used / total);
178      }
179    }
180
181    if (percentageUsed === -1) {
182      return Promise.reject(new RecordingError(BUFFER_USAGE_INCORRECT_FORMAT));
183    }
184    return percentageUsed;
185  }
186
187  initConnection(): Promise<void> {
188    // bind IPC methods
189    const requestId = this.requestId++;
190    const frame = new IPCFrame({
191      requestId,
192      msgBindService: new IPCFrame.BindService({serviceName: 'ConsumerPort'}),
193    });
194    this.writeFrame(frame);
195
196    // We shouldn't bind multiple times to the service in the same tracing
197    // session.
198    // eslint-disable-next-line @typescript-eslint/strict-boolean-expressions
199    assertFalse(!!this.resolveBindingPromise);
200    this.resolveBindingPromise = defer<void>();
201    return this.resolveBindingPromise;
202  }
203
204  private getBufferStats(): Promise<IBufferStats[]> {
205    const getTraceStatsRequestProto = GetTraceStatsRequest.encode(
206      new GetTraceStatsRequest(),
207    ).finish();
208    try {
209      this.rpcInvoke('GetTraceStats', getTraceStatsRequestProto);
210    } catch (e) {
211      // GetTraceStats was introduced only on Android 10.
212      this.raiseError(e);
213    }
214
215    const statsMessage = defer<IBufferStats[]>();
216    this.pendingStatsMessages.push(statsMessage);
217    return statsMessage;
218  }
219
220  private terminateConnection(): void {
221    this.clearState();
222    const requestProto = FreeBuffersRequest.encode(
223      new FreeBuffersRequest(),
224    ).finish();
225    this.rpcInvoke('FreeBuffers', requestProto);
226    this.byteStream.close();
227  }
228
229  private clearState() {
230    for (const statsMessage of this.pendingStatsMessages) {
231      statsMessage.reject(new RecordingError(BUFFER_USAGE_NOT_ACCESSIBLE));
232    }
233    this.pendingStatsMessages = [];
234    this.pendingDataSources = [];
235    this.pendingQssMessage = undefined;
236  }
237
238  private rpcInvoke(methodName: string, argsProto: Uint8Array): void {
239    if (!this.byteStream.isConnected()) {
240      return;
241    }
242    const method = this.availableMethods.find((m) => m.name === methodName);
243    // eslint-disable-next-line @typescript-eslint/strict-boolean-expressions
244    if (!method || !method.id) {
245      throw new RecordingError(
246        `Method ${methodName} not supported by the target`,
247      );
248    }
249    const requestId = this.requestId++;
250    const frame = new IPCFrame({
251      requestId,
252      msgInvokeMethod: new IPCFrame.InvokeMethod({
253        serviceId: this.serviceId,
254        methodId: method.id,
255        argsProto,
256      }),
257    });
258    this.requestMethods.set(requestId, methodName);
259    this.writeFrame(frame);
260  }
261
262  private writeFrame(frame: IPCFrame): void {
263    const frameProto: Uint8Array = IPCFrame.encode(frame).finish();
264    const frameLen = frameProto.length;
265    const buf = new Uint8Array(WIRE_PROTOCOL_HEADER_SIZE + frameLen);
266    const dv = new DataView(buf.buffer);
267    dv.setUint32(0, frameProto.length, /* littleEndian */ true);
268    for (let i = 0; i < frameLen; i++) {
269      dv.setUint8(WIRE_PROTOCOL_HEADER_SIZE + i, frameProto[i]);
270    }
271    this.byteStream.write(buf);
272  }
273
274  private handleReceivedData(rawData: Uint8Array): void {
275    // we parse the length of the next frame if it's available
276    if (
277      this.currentFrameLength === undefined &&
278      this.canCompleteLengthHeader(rawData)
279    ) {
280      const remainingFrameBytes =
281        WIRE_PROTOCOL_HEADER_SIZE - this.bufferedPartLength;
282      this.appendToIncomingBuffer(rawData.subarray(0, remainingFrameBytes));
283      rawData = rawData.subarray(remainingFrameBytes);
284
285      this.currentFrameLength = parseMessageSize(this.incomingBuffer);
286      this.bufferedPartLength = 0;
287    }
288
289    // Parse all complete frames.
290    while (
291      this.currentFrameLength !== undefined &&
292      this.bufferedPartLength + rawData.length >= this.currentFrameLength
293    ) {
294      // Read the remaining part of this message.
295      const bytesToCompleteMessage =
296        this.currentFrameLength - this.bufferedPartLength;
297      this.appendToIncomingBuffer(rawData.subarray(0, bytesToCompleteMessage));
298      this.parseFrame(this.incomingBuffer.subarray(0, this.currentFrameLength));
299      this.bufferedPartLength = 0;
300      // Remove the data just parsed.
301      rawData = rawData.subarray(bytesToCompleteMessage);
302
303      if (!this.canCompleteLengthHeader(rawData)) {
304        this.currentFrameLength = undefined;
305        break;
306      }
307      this.currentFrameLength = parseMessageSize(rawData);
308      rawData = rawData.subarray(WIRE_PROTOCOL_HEADER_SIZE);
309    }
310
311    // Buffer the remaining data (part of the next message).
312    this.appendToIncomingBuffer(rawData);
313  }
314
315  private canCompleteLengthHeader(newData: Uint8Array): boolean {
316    return newData.length + this.bufferedPartLength > WIRE_PROTOCOL_HEADER_SIZE;
317  }
318
319  private appendToIncomingBuffer(array: Uint8Array): void {
320    this.incomingBuffer.set(array, this.bufferedPartLength);
321    this.bufferedPartLength += array.length;
322  }
323
324  private parseFrame(frameBuffer: Uint8Array): void {
325    // Get a copy of the ArrayBuffer to avoid the original being overriden.
326    // See 170256902#comment21
327    const frame = IPCFrame.decode(frameBuffer.slice());
328    if (frame.msg === 'msgBindServiceReply') {
329      const msgBindServiceReply = frame.msgBindServiceReply;
330      /* eslint-disable @typescript-eslint/strict-boolean-expressions */
331      if (
332        msgBindServiceReply &&
333        msgBindServiceReply.methods &&
334        msgBindServiceReply.serviceId
335      ) {
336        /* eslint-enable */
337        assertTrue(msgBindServiceReply.success === true);
338        this.availableMethods = msgBindServiceReply.methods;
339        this.serviceId = msgBindServiceReply.serviceId;
340        this.resolveBindingPromise.resolve();
341      }
342    } else if (frame.msg === 'msgInvokeMethodReply') {
343      const msgInvokeMethodReply = frame.msgInvokeMethodReply;
344      // We process messages without a `replyProto` field (for instance
345      // `FreeBuffers` does not have `replyProto`). However, we ignore messages
346      // without a valid 'success' field.
347      if (!msgInvokeMethodReply || !msgInvokeMethodReply.success) {
348        return;
349      }
350
351      const method = this.requestMethods.get(frame.requestId);
352      if (!method) {
353        this.raiseError(`${PARSING_UNKNWON_REQUEST_ID}: ${frame.requestId}`);
354        return;
355      }
356      const decoder = decoders.get(method);
357      if (decoder === undefined) {
358        this.raiseError(`${PARSING_UNABLE_TO_DECODE_METHOD}: ${method}`);
359        return;
360      }
361      const data = {...decoder(msgInvokeMethodReply.replyProto)};
362
363      if (method === 'ReadBuffers') {
364        if (data.slices) {
365          for (const slice of data.slices) {
366            this.partialPacket.push(slice);
367            if (slice.lastSliceForPacket) {
368              let bufferSize = 0;
369              for (const slice of this.partialPacket) {
370                bufferSize += slice.data!.length;
371              }
372              const tracePacket = new Uint8Array(bufferSize);
373              let written = 0;
374              for (const slice of this.partialPacket) {
375                const data = slice.data!;
376                tracePacket.set(data, written);
377                written += data.length;
378              }
379              this.traceProtoWriter.uint32(TRACE_PACKET_PROTO_TAG);
380              this.traceProtoWriter.bytes(tracePacket);
381              this.partialPacket = [];
382            }
383          }
384        }
385        if (msgInvokeMethodReply.hasMore === false) {
386          this.tracingSessionListener.onTraceData(
387            this.traceProtoWriter.finish(),
388          );
389          this.terminateConnection();
390        }
391      } else if (method === 'EnableTracing') {
392        const readBuffersRequestProto = ReadBuffersRequest.encode(
393          new ReadBuffersRequest(),
394        ).finish();
395        this.rpcInvoke('ReadBuffers', readBuffersRequestProto);
396      } else if (method === 'GetTraceStats') {
397        const maybePendingStatsMessage = this.pendingStatsMessages.shift();
398        if (maybePendingStatsMessage) {
399          maybePendingStatsMessage.resolve(data?.traceStats?.bufferStats || []);
400        }
401      } else if (method === 'FreeBuffers') {
402        // No action required. If we successfully read a whole trace,
403        // we close the connection. Alternatively, if the tracing finishes
404        // with an exception or if the user cancels it, we also close the
405        // connection.
406      } else if (method === 'DisableTracing') {
407        // No action required. Same reasoning as for FreeBuffers.
408      } else if (method === 'QueryServiceState') {
409        const dataSources =
410          (data as QueryServiceStateResponse)?.serviceState?.dataSources || [];
411        for (const dataSource of dataSources) {
412          const name = dataSource?.dsDescriptor?.name;
413          if (name) {
414            this.pendingDataSources.push({
415              name,
416              descriptor: dataSource.dsDescriptor,
417            });
418          }
419        }
420        if (msgInvokeMethodReply.hasMore === false) {
421          assertExists(this.pendingQssMessage).resolve(this.pendingDataSources);
422          this.pendingDataSources = [];
423          this.pendingQssMessage = undefined;
424        }
425      } else {
426        this.raiseError(`${PARSING_UNRECOGNIZED_PORT}: ${method}`);
427      }
428    } else {
429      this.raiseError(`${PARSING_UNRECOGNIZED_MESSAGE}: ${frame.msg}`);
430    }
431  }
432
433  private raiseError(message: string): void {
434    this.terminateConnection();
435    this.tracingSessionListener.onError(message);
436  }
437}
438
439const decoders = new Map<string, Function>()
440  .set('EnableTracing', EnableTracingResponse.decode)
441  .set('FreeBuffers', FreeBuffersResponse.decode)
442  .set('ReadBuffers', ReadBuffersResponse.decode)
443  .set('DisableTracing', DisableTracingResponse.decode)
444  .set('GetTraceStats', GetTraceStatsResponse.decode)
445  .set('QueryServiceState', QueryServiceStateResponse.decode);
446