• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1// Copyright (C) 2022 The Android Open Source Project
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//      http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15import protobuf from 'protobufjs/minimal';
16
17import {defer, Deferred} from '../../base/deferred';
18import {assertExists, assertFalse, assertTrue} from '../../base/logging';
19import {
20  DisableTracingRequest,
21  DisableTracingResponse,
22  EnableTracingRequest,
23  EnableTracingResponse,
24  FreeBuffersRequest,
25  FreeBuffersResponse,
26  GetTraceStatsRequest,
27  GetTraceStatsResponse,
28  IBufferStats,
29  IMethodInfo,
30  IPCFrame,
31  ISlice,
32  QueryServiceStateRequest,
33  QueryServiceStateResponse,
34  ReadBuffersRequest,
35  ReadBuffersResponse,
36  TraceConfig,
37} from '../protos';
38
39import {RecordingError} from './recording_error_handling';
40import {
41  ByteStream,
42  DataSource,
43  TracingSession,
44  TracingSessionListener,
45} from './recording_interfaces_v2';
46import {
47  BUFFER_USAGE_INCORRECT_FORMAT,
48  BUFFER_USAGE_NOT_ACCESSIBLE,
49  PARSING_UNABLE_TO_DECODE_METHOD,
50  PARSING_UNKNWON_REQUEST_ID,
51  PARSING_UNRECOGNIZED_MESSAGE,
52  PARSING_UNRECOGNIZED_PORT,
53  RECORDING_IN_PROGRESS,
54} from './recording_utils';
55
// Number of bytes in the length header that precedes every frame on the
// wire. See wire_protocol.proto for more details.
const WIRE_PROTOCOL_HEADER_SIZE = 4;
// Capacity, in bytes, of the buffer used to reassemble incoming frames.
// See basic_types.h (kIPCBufferSize) for more details.
const MAX_IPC_BUFFER_SIZE = 1024 * 128;

// Proto field preamble written in front of every trace packet: the field id
// occupies the upper bits and the wire type the lowest three bits.
const PROTO_LEN_DELIMITED_WIRE_TYPE = 2;
const TRACE_PACKET_PROTO_ID = 1;
const TRACE_PACKET_PROTO_TAG =
    PROTO_LEN_DELIMITED_WIRE_TYPE | (TRACE_PACKET_PROTO_ID << 3);
65
// Reads the unsigned 32-bit little-endian integer stored in the first four
// bytes of |buffer|. The view honours |buffer.byteOffset|, so subarrays of a
// larger backing store are read from their own start. Throws a RangeError
// (from DataView) when fewer than four bytes are available.
function parseMessageSize(buffer: Uint8Array): number {
  const {buffer: backingStore, byteOffset, length} = buffer;
  const view = new DataView(backingStore, byteOffset, length);
  const littleEndian = true;
  return view.getUint32(0, littleEndian);
}
70
// This class implements the protocol described in
// https://perfetto.dev/docs/design-docs/api-and-abi#tracing-protocol-abi
//
// Requests are sent as length-prefixed IPCFrame protos over |byteStream|;
// replies are reassembled from the stream, decoded, and dispatched based on
// the method name recorded for the request id they carry.
export class TracedTracingSession implements TracingSession {
  // Buffers received wire protocol data.
  private incomingBuffer = new Uint8Array(MAX_IPC_BUFFER_SIZE);
  // Number of valid bytes currently stored at the start of |incomingBuffer|.
  private bufferedPartLength = 0;
  // Payload length of the frame being reassembled, or undefined while the
  // length header of the next frame has not been fully received yet.
  private currentFrameLength?: number;

  // Populated from the BindService reply.
  private availableMethods: IMethodInfo[] = [];
  private serviceId = -1;

  // Set by initConnection() and resolved when the BindService reply arrives.
  private resolveBindingPromise?: Deferred<void>;
  // Maps the id of each in-flight request to the name of the invoked method.
  private requestMethods = new Map<number, string>();

  // Needed for ReadBufferResponse: all the trace packets are split into
  // several slices. |partialPacket| is the buffer for them. Once we receive a
  // slice with the flag |lastSliceForPacket|, a new packet is created.
  private partialPacket: ISlice[] = [];
  // Accumulates trace packets into a proto trace file.
  private traceProtoWriter = protobuf.Writer.create();

  // Accumulates DataSource objects from QueryServiceStateResponse,
  // which can have >1 replies for each query
  // go/codesearch/android/external/perfetto/protos/
  // perfetto/ipc/consumer_port.proto;l=243-246
  private pendingDataSources: DataSource[] = [];

  // For concurrent calls to 'QueryServiceState', we return the same value.
  private pendingQssMessage?: Deferred<DataSource[]>;

  // Wire protocol request ID. After each request it is increased. It is needed
  // to keep track of the type of request, and parse the response correctly.
  private requestId = 1;

  // One entry per GetTraceStats request that has not been answered yet, in
  // the order the requests were sent.
  private pendingStatsMessages = new Array<Deferred<IBufferStats[]>>();

  // The bytestream is obtained when creating a connection with a target.
  // For instance, the AdbStream is obtained from a connection with an Adb
  // device.
  constructor(
      private byteStream: ByteStream,
      private tracingSessionListener: TracingSessionListener) {
    this.byteStream.addOnStreamDataCallback(
        (data) => this.handleReceivedData(data));
    this.byteStream.addOnStreamCloseCallback(() => this.clearState());
  }

  // Sends a QueryServiceState request and returns a promise for the data
  // sources collected from its replies. While a query is pending, further
  // calls return the same promise instead of issuing a new request.
  queryServiceState(): Promise<DataSource[]> {
    if (this.pendingQssMessage) {
      return this.pendingQssMessage;
    }

    const requestProto =
        QueryServiceStateRequest.encode(new QueryServiceStateRequest())
            .finish();
    this.rpcInvoke('QueryServiceState', requestProto);

    return this.pendingQssMessage = defer<DataSource[]>();
  }

  // Reports the "recording in progress" status to the listener (including the
  // configured duration, when there is one) and sends EnableTracing with
  // |config|.
  start(config: TraceConfig): void {
    const duration = config.durationMs;
    this.tracingSessionListener.onStatus(`${RECORDING_IN_PROGRESS}${
        duration ? ' for ' + duration.toString() + ' ms' : ''}...`);

    const enableTracingRequest = new EnableTracingRequest();
    enableTracingRequest.traceConfig = config;
    const enableTracingRequestProto =
        EnableTracingRequest.encode(enableTracingRequest).finish();
    this.rpcInvoke('EnableTracing', enableTracingRequestProto);
  }

  // Drops all pending state and closes the connection.
  cancel(): void {
    this.terminateConnection();
  }

  // Sends DisableTracing to the service.
  stop(): void {
    const requestProto =
        DisableTracingRequest.encode(new DisableTracingRequest()).finish();
    this.rpcInvoke('DisableTracing', requestProto);
  }

  // Returns the highest bytesWritten / bufferSize ratio among the buffers
  // reported by GetTraceStats, or 0 when the stream is not connected.
  // Rejects with BUFFER_USAGE_INCORRECT_FORMAT when no buffer reports usable
  // numbers.
  async getTraceBufferUsage(): Promise<number> {
    if (!this.byteStream.isConnected()) {
      // TODO(octaviant): make this more in line with the other trace buffer
      //  error cases.
      return 0;
    }
    const bufferStats = await this.getBufferStats();
    let percentageUsed = -1;
    for (const buffer of bufferStats) {
      if (!Number.isFinite(buffer.bytesWritten) ||
          !Number.isFinite(buffer.bufferSize)) {
        continue;
      }
      const used = assertExists(buffer.bytesWritten);
      const total = assertExists(buffer.bufferSize);
      // A zero-sized buffer is skipped: dividing by it would yield NaN or
      // Infinity and poison the maximum.
      if (total > 0) {
        percentageUsed = Math.max(percentageUsed, used / total);
      }
    }

    if (percentageUsed === -1) {
      throw new RecordingError(BUFFER_USAGE_INCORRECT_FORMAT);
    }
    return percentageUsed;
  }

  // Binds to the 'ConsumerPort' service. The returned promise resolves once
  // the BindService reply has been received and the available methods are
  // known. Must be called at most once per session.
  initConnection(): Promise<void> {
    // We shouldn't bind multiple times to the service in the same tracing
    // session. Check this before anything is written to the stream.
    assertFalse(!!this.resolveBindingPromise);

    // bind IPC methods
    const requestId = this.requestId++;
    const frame = new IPCFrame({
      requestId,
      msgBindService: new IPCFrame.BindService({serviceName: 'ConsumerPort'}),
    });
    this.writeFrame(frame);

    const bindingPromise = defer<void>();
    this.resolveBindingPromise = bindingPromise;
    return bindingPromise;
  }

  // Sends GetTraceStats and returns a promise for the buffer stats carried by
  // its reply. If the request cannot be sent, the error is reported to the
  // listener and the returned promise is rejected.
  private getBufferStats(): Promise<IBufferStats[]> {
    const statsMessage = defer<IBufferStats[]>();
    const getTraceStatsRequestProto =
        GetTraceStatsRequest.encode(new GetTraceStatsRequest()).finish();
    try {
      this.rpcInvoke('GetTraceStats', getTraceStatsRequestProto);
    } catch (e) {
      // GetTraceStats was introduced only on Android 10.
      this.raiseError(e instanceof Error ? e.message : String(e));
      // No reply will ever arrive for this request, so settle the promise
      // here instead of leaving the caller waiting.
      statsMessage.reject(new RecordingError(BUFFER_USAGE_NOT_ACCESSIBLE));
      return statsMessage;
    }

    this.pendingStatsMessages.push(statsMessage);
    return statsMessage;
  }

  // Clears pending state, asks the service to free its buffers and closes
  // the stream.
  private terminateConnection(): void {
    this.clearState();
    const requestProto =
        FreeBuffersRequest.encode(new FreeBuffersRequest()).finish();
    try {
      this.rpcInvoke('FreeBuffers', requestProto);
    } catch (e) {
      // FreeBuffers is best-effort: if the method is not available (for
      // instance because binding never completed) we still want to close the
      // stream and let callers such as raiseError() finish their work.
    }
    this.byteStream.close();
  }

  // Rejects every pending GetTraceStats promise and forgets any pending
  // QueryServiceState bookkeeping.
  private clearState() {
    for (const statsMessage of this.pendingStatsMessages) {
      statsMessage.reject(new RecordingError(BUFFER_USAGE_NOT_ACCESSIBLE));
    }
    this.pendingStatsMessages = [];
    this.pendingDataSources = [];
    this.pendingQssMessage = undefined;
  }

  // Invokes |methodName| on the bound service with |argsProto| as encoded
  // arguments. Does nothing when the stream is not connected. Throws a
  // RecordingError when the service did not advertise the method.
  private rpcInvoke(methodName: string, argsProto: Uint8Array): void {
    if (!this.byteStream.isConnected()) {
      return;
    }
    const method = this.availableMethods.find((m) => m.name === methodName);
    if (!method || !method.id) {
      throw new RecordingError(
          `Method ${methodName} not supported by the target`);
    }
    const requestId = this.requestId++;
    const frame = new IPCFrame({
      requestId,
      msgInvokeMethod: new IPCFrame.InvokeMethod(
          {serviceId: this.serviceId, methodId: method.id, argsProto}),
    });
    // Remember the method so that the reply can be decoded correctly.
    this.requestMethods.set(requestId, methodName);
    this.writeFrame(frame);
  }

  // Serializes |frame|, prefixes it with its length as a little-endian
  // uint32, and writes the result to the stream.
  private writeFrame(frame: IPCFrame): void {
    const frameProto: Uint8Array = IPCFrame.encode(frame).finish();
    const buf = new Uint8Array(WIRE_PROTOCOL_HEADER_SIZE + frameProto.length);
    new DataView(buf.buffer)
        .setUint32(0, frameProto.length, /* littleEndian */ true);
    buf.set(frameProto, WIRE_PROTOCOL_HEADER_SIZE);
    this.byteStream.write(buf);
  }

  // Consumes a chunk of stream data: completes the length header of the next
  // frame if needed, parses every frame that is now complete, and buffers
  // whatever is left for the next call.
  private handleReceivedData(rawData: Uint8Array): void {
    // we parse the length of the next frame if it's available
    if (this.currentFrameLength === undefined &&
        this.canCompleteLengthHeader(rawData)) {
      const remainingFrameBytes =
          WIRE_PROTOCOL_HEADER_SIZE - this.bufferedPartLength;
      this.appendToIncomingBuffer(rawData.subarray(0, remainingFrameBytes));
      rawData = rawData.subarray(remainingFrameBytes);

      this.currentFrameLength = parseMessageSize(this.incomingBuffer);
      // The header has been consumed; the buffer now collects the payload.
      this.bufferedPartLength = 0;
    }

    // Parse all complete frames.
    while (this.currentFrameLength !== undefined &&
           this.bufferedPartLength + rawData.length >=
               this.currentFrameLength) {
      // Read the remaining part of this message.
      const bytesToCompleteMessage =
          this.currentFrameLength - this.bufferedPartLength;
      this.appendToIncomingBuffer(rawData.subarray(0, bytesToCompleteMessage));
      this.parseFrame(this.incomingBuffer.subarray(0, this.currentFrameLength));
      this.bufferedPartLength = 0;
      // Remove the data just parsed.
      rawData = rawData.subarray(bytesToCompleteMessage);

      if (!this.canCompleteLengthHeader(rawData)) {
        // The leftover bytes (if any) are a partial header; they are buffered
        // below and completed on the next call.
        this.currentFrameLength = undefined;
        break;
      }
      this.currentFrameLength = parseMessageSize(rawData);
      rawData = rawData.subarray(WIRE_PROTOCOL_HEADER_SIZE);
    }

    // Buffer the remaining data (part of the next message).
    this.appendToIncomingBuffer(rawData);
  }

  // True when the already-buffered bytes plus |newData| contain a full
  // length header.
  private canCompleteLengthHeader(newData: Uint8Array): boolean {
    return newData.length + this.bufferedPartLength >=
        WIRE_PROTOCOL_HEADER_SIZE;
  }

  // Copies |array| behind the bytes already held in |incomingBuffer|.
  private appendToIncomingBuffer(array: Uint8Array): void {
    this.incomingBuffer.set(array, this.bufferedPartLength);
    this.bufferedPartLength += array.length;
  }

  // Decodes one complete frame and dispatches it by message type.
  private parseFrame(frameBuffer: Uint8Array): void {
    // Get a copy of the ArrayBuffer to avoid the original being overriden.
    // See 170256902#comment21
    const frame = IPCFrame.decode(frameBuffer.slice());
    if (frame.msg === 'msgBindServiceReply') {
      this.handleBindServiceReply(frame);
    } else if (frame.msg === 'msgInvokeMethodReply') {
      this.handleInvokeMethodReply(frame);
    } else {
      this.raiseError(`${PARSING_UNRECOGNIZED_MESSAGE}: ${frame.msg}`);
    }
  }

  // Records the service id and methods advertised by the BindService reply
  // and resolves the binding promise. Replies lacking methods or a service
  // id are ignored.
  private handleBindServiceReply(frame: IPCFrame): void {
    const msgBindServiceReply = frame.msgBindServiceReply;
    if (msgBindServiceReply && msgBindServiceReply.methods &&
        msgBindServiceReply.serviceId) {
      assertTrue(msgBindServiceReply.success === true);
      this.availableMethods = msgBindServiceReply.methods;
      this.serviceId = msgBindServiceReply.serviceId;
      this.resolveBindingPromise?.resolve();
    }
  }

  // Decodes the reply to a method invocation and performs the follow-up
  // action for the method that was invoked.
  private handleInvokeMethodReply(frame: IPCFrame): void {
    const msgInvokeMethodReply = frame.msgInvokeMethodReply;
    // We process messages without a `replyProto` field (for instance
    // `FreeBuffers` does not have `replyProto`). However, we ignore messages
    // without a valid 'success' field.
    if (!msgInvokeMethodReply || !msgInvokeMethodReply.success) {
      return;
    }

    const method = this.requestMethods.get(frame.requestId);
    if (!method) {
      this.raiseError(`${PARSING_UNKNWON_REQUEST_ID}: ${frame.requestId}`);
      return;
    }
    // Once the last reply for a request has arrived its bookkeeping entry is
    // no longer needed; dropping it keeps the map from growing with every
    // request.
    if (msgInvokeMethodReply.hasMore === false) {
      this.requestMethods.delete(frame.requestId);
    }

    const decoder = decoders.get(method);
    if (decoder === undefined) {
      this.raiseError(`${PARSING_UNABLE_TO_DECODE_METHOD}: ${method}`);
      return;
    }
    const data = {...decoder(msgInvokeMethodReply.replyProto)};

    switch (method) {
      case 'ReadBuffers':
        if (data.slices) {
          for (const slice of data.slices) {
            this.partialPacket.push(slice);
            if (slice.lastSliceForPacket) {
              this.flushPartialPacket();
            }
          }
        }
        if (msgInvokeMethodReply.hasMore === false) {
          // The whole trace has been read: hand it over and disconnect.
          this.tracingSessionListener.onTraceData(
              this.traceProtoWriter.finish());
          this.terminateConnection();
        }
        break;
      case 'EnableTracing': {
        const readBuffersRequestProto =
            ReadBuffersRequest.encode(new ReadBuffersRequest()).finish();
        this.rpcInvoke('ReadBuffers', readBuffersRequestProto);
        break;
      }
      case 'GetTraceStats': {
        // Replies are matched to requests in the order they were sent.
        const maybePendingStatsMessage = this.pendingStatsMessages.shift();
        if (maybePendingStatsMessage) {
          maybePendingStatsMessage.resolve(data?.traceStats?.bufferStats || []);
        }
        break;
      }
      case 'FreeBuffers':
        // No action required. If we successfully read a whole trace,
        // we close the connection. Alternatively, if the tracing finishes
        // with an exception or if the user cancels it, we also close the
        // connection.
        break;
      case 'DisableTracing':
        // No action required. Same reasoning as for FreeBuffers.
        break;
      case 'QueryServiceState': {
        const dataSources =
            (data as QueryServiceStateResponse)?.serviceState?.dataSources ||
            [];
        for (const dataSource of dataSources) {
          const name = dataSource?.dsDescriptor?.name;
          if (name) {
            this.pendingDataSources.push(
                {name, descriptor: dataSource.dsDescriptor});
          }
        }
        if (msgInvokeMethodReply.hasMore === false) {
          assertExists(this.pendingQssMessage).resolve(this.pendingDataSources);
          this.pendingDataSources = [];
          this.pendingQssMessage = undefined;
        }
        break;
      }
      default:
        this.raiseError(`${PARSING_UNRECOGNIZED_PORT}: ${method}`);
    }
  }

  // Concatenates the slices collected in |partialPacket| into one trace
  // packet, appends it to the trace being written, and resets the slice
  // buffer.
  private flushPartialPacket(): void {
    // A slice without payload contributes no bytes.
    const chunks =
        this.partialPacket.map((slice) => slice.data ?? new Uint8Array(0));
    let packetSize = 0;
    for (const chunk of chunks) {
      packetSize += chunk.length;
    }
    const tracePacket = new Uint8Array(packetSize);
    let written = 0;
    for (const chunk of chunks) {
      tracePacket.set(chunk, written);
      written += chunk.length;
    }
    this.traceProtoWriter.uint32(TRACE_PACKET_PROTO_TAG);
    this.traceProtoWriter.bytes(tracePacket);
    this.partialPacket = [];
  }

  // Tears down the connection and then reports |message| to the listener.
  private raiseError(message: string): void {
    this.terminateConnection();
    this.tracingSessionListener.onError(message);
  }
}
411
// Reply decoders keyed by RPC method name. When a reply arrives, the method
// name recorded for its request id selects the decoder applied to the reply
// payload.
const decoders = new Map<string, Function>([
  ['EnableTracing', EnableTracingResponse.decode],
  ['FreeBuffers', FreeBuffersResponse.decode],
  ['ReadBuffers', ReadBuffersResponse.decode],
  ['DisableTracing', DisableTracingResponse.decode],
  ['GetTraceStats', GetTraceStatsResponse.decode],
  ['QueryServiceState', QueryServiceStateResponse.decode],
]);
420