• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1// Copyright (C) 2019 The Android Open Source Project
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//      http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15import * as protobuf from 'protobufjs/minimal';
16
17import {perfetto} from '../gen/protos';
18
19import {AdbAuthState, AdbBaseConsumerPort} from './adb_base_controller';
20import {Adb, AdbStream} from './adb_interfaces';
21import {
22  isReadBuffersResponse,
23} from './consumer_port_types';
24import {Consumer} from './record_controller_interfaces';
25
// Binding state of the traced consumer socket. invoke() moves it from
// DISCONNECTED to BINDING_IN_PROGRESS while the socket is opened and the
// service is bound, then to BOUND; the socket's onClose handler moves it back
// to DISCONNECTED.
enum SocketState {
  DISCONNECTED = 0,
  BINDING_IN_PROGRESS = 1,
  BOUND = 2,
}
31
// See wire_protocol.proto for more details.
// Number of bytes of the length header that precedes every frame.
const WIRE_PROTOCOL_HEADER_SIZE = 4;
// Capacity, in bytes, of the buffer that accumulates incoming frame data.
const MAX_IPC_BUFFER_SIZE = 128 * 1024;

// Tag written before each trace packet in the assembled trace: the packet
// field id shifted left by three bits, OR-ed with the length-delimited wire
// type.
const TRACE_PACKET_PROTO_ID = 1;
const PROTO_LEN_DELIMITED_WIRE_TYPE = 2;
const TRACE_PACKET_PROTO_TAG =
    (TRACE_PACKET_PROTO_ID << 3) | PROTO_LEN_DELIMITED_WIRE_TYPE;
40
// Short local names for the generated proto types used throughout this file.
type Frame = perfetto.protos.IPCFrame;
type IMethodInfo = perfetto.protos.IPCFrame.BindServiceReply.IMethodInfo;
type ISlice = perfetto.protos.ReadBuffersResponse.ISlice;
45
// A consumer method call waiting in the command queue until the socket is
// bound: the method name plus its serialized arguments.
interface Command {
  // Name of the method to invoke, e.g. 'EnableTracing'.
  method: string;
  // Serialized arguments proto for the method.
  params: Uint8Array;
}
50
// Path, on the device, of the socket opened to talk to the consumer service.
const TRACED_SOCKET = '/dev/socket/traced_consumer';
52
53export class AdbSocketConsumerPort extends AdbBaseConsumerPort {
54  private socketState = SocketState.DISCONNECTED;
55
56  private socket?: AdbStream;
57  // Wire protocol request ID. After each request it is increased. It is needed
58  // to keep track of the type of request, and parse the response correctly.
59  private requestId = 1;
60
61  // Buffers received wire protocol data.
62  private incomingBuffer = new Uint8Array(MAX_IPC_BUFFER_SIZE);
63  private incomingBufferLen = 0;
64  private frameToParseLen = 0;
65
66  private availableMethods: IMethodInfo[] = [];
67  private serviceId = -1;
68
69  private resolveBindingPromise!: VoidFunction;
70  private requestMethods = new Map<number, string>();
71
  // Needed for ReadBuffersResponse: a trace packet may arrive split into
  // several slices. |partialPacket| buffers the slices of the packet being
  // received. Once a slice with the |lastSliceForPacket| flag arrives, the
  // buffered slices are joined into a new packet.
75  private partialPacket: ISlice[] = [];
  // Accumulates trace packets into a proto trace file.
77  private traceProtoWriter = protobuf.Writer.create();
78
79  private socketCommandQueue: Command[] = [];
80
81  constructor(adb: Adb, consumer: Consumer) {
82    super(adb, consumer);
83  }
84
85  async invoke(method: string, params: Uint8Array) {
86    // ADB connection & authentication is handled by the superclass.
87    console.assert(this.state === AdbAuthState.CONNECTED);
88    this.socketCommandQueue.push({method, params});
89
90    if (this.socketState === SocketState.BINDING_IN_PROGRESS) return;
91    if (this.socketState === SocketState.DISCONNECTED) {
92      this.socketState = SocketState.BINDING_IN_PROGRESS;
93      await this.listenForMessages();
94      await this.bind();
95      this.traceProtoWriter = protobuf.Writer.create();
96      this.socketState = SocketState.BOUND;
97    }
98
99    console.assert(this.socketState === SocketState.BOUND);
100
101    for (const cmd of this.socketCommandQueue) {
102      this.invokeInternal(cmd.method, cmd.params);
103    }
104    this.socketCommandQueue = [];
105  }
106
107  private invokeInternal(method: string, argsProto: Uint8Array) {
108    // Socket is bound in invoke().
109    console.assert(this.socketState === SocketState.BOUND);
110    const requestId = this.requestId++;
111    const methodId = this.findMethodId(method);
112    if (methodId === undefined) {
113      // This can happen with 'GetTraceStats': it seems that not all the Android
114      // <= 9 devices support it.
115      console.error(`Method ${method} not supported by the target`);
116      return;
117    }
118    const frame = new perfetto.protos.IPCFrame({
119      requestId,
120      msgInvokeMethod: new perfetto.protos.IPCFrame.InvokeMethod(
121          {serviceId: this.serviceId, methodId, argsProto})
122    });
123    this.requestMethods.set(requestId, method);
124    this.sendFrame(frame);
125
126    if (method === 'EnableTracing') this.setDurationStatus(argsProto);
127  }
128
129  static generateFrameBufferToSend(frame: Frame): Uint8Array {
130    const frameProto: Uint8Array =
131        perfetto.protos.IPCFrame.encode(frame).finish();
132    const frameLen = frameProto.length;
133    const buf = new Uint8Array(WIRE_PROTOCOL_HEADER_SIZE + frameLen);
134    const dv = new DataView(buf.buffer);
135    dv.setUint32(0, frameProto.length, /* littleEndian */ true);
136    for (let i = 0; i < frameLen; i++) {
137      dv.setUint8(WIRE_PROTOCOL_HEADER_SIZE + i, frameProto[i]);
138    }
139    return buf;
140  }
141
142  async sendFrame(frame: Frame) {
143    console.assert(this.socket !== undefined);
144    if (!this.socket) return;
145    const buf = AdbSocketConsumerPort.generateFrameBufferToSend(frame);
146    await this.socket.write(buf);
147  }
148
149  async listenForMessages() {
150    this.socket = await this.adb.socket(TRACED_SOCKET);
151    this.socket.onData = (raw) => this.handleReceivedData(raw);
152    this.socket.onClose = () => {
153      this.socketState = SocketState.DISCONNECTED;
154      this.socketCommandQueue = [];
155    };
156  }
157
158  private parseMessageSize(buffer: Uint8Array) {
159    const dv = new DataView(buffer.buffer, buffer.byteOffset, buffer.length);
160    return dv.getUint32(0, true);
161  }
162
163  private parseMessage(frameBuffer: Uint8Array) {
164    const frame = perfetto.protos.IPCFrame.decode(frameBuffer);
165    this.handleIncomingFrame(frame);
166  }
167
168  private incompleteSizeHeader() {
169    if (!this.frameToParseLen) {
170      console.assert(this.incomingBufferLen < WIRE_PROTOCOL_HEADER_SIZE);
171      return true;
172    }
173    return false;
174  }
175
176  private canCompleteSizeHeader(newData: Uint8Array) {
177    return newData.length + this.incomingBufferLen > WIRE_PROTOCOL_HEADER_SIZE;
178  }
179
180  private canParseFullMessage(newData: Uint8Array) {
181    return this.frameToParseLen &&
182        this.incomingBufferLen + newData.length >= this.frameToParseLen;
183  }
184
185  private appendToIncomingBuffer(array: Uint8Array) {
186    this.incomingBuffer.set(array, this.incomingBufferLen);
187    this.incomingBufferLen += array.length;
188  }
189
  // Consumes a chunk of bytes received from the socket. The stream is read as
  // a sequence of frames, each a WIRE_PROTOCOL_HEADER_SIZE-byte length header
  // followed by that many bytes which are decoded as an IPCFrame. A chunk may
  // contain several frames, or only part of one. Every frame completed by
  // |newData| is parsed via parseMessage(); trailing bytes that do not yet
  // form a whole frame are kept in |incomingBuffer| for the next call.
  //
  // State carried across calls: |frameToParseLen| is the length of the frame
  // currently being received (0 while its header is not yet parsed) and
  // |incomingBufferLen| is the number of valid bytes in |incomingBuffer|.
  handleReceivedData(newData: Uint8Array) {
    // The frame length is not known yet: finish the size header using the
    // part of it buffered by earlier calls plus the start of |newData|.
    if (this.incompleteSizeHeader() && this.canCompleteSizeHeader(newData)) {
      const newDataBytesToRead =
          WIRE_PROTOCOL_HEADER_SIZE - this.incomingBufferLen;
      // Add to the incoming buffer the remaining bytes to arrive at
      // WIRE_PROTOCOL_HEADER_SIZE
      this.appendToIncomingBuffer(newData.subarray(0, newDataBytesToRead));
      newData = newData.subarray(newDataBytesToRead);

      // The header has been consumed; the buffer is reused for the frame body.
      this.frameToParseLen = this.parseMessageSize(this.incomingBuffer);
      this.incomingBufferLen = 0;
    }

    // Parse all complete messages in incomingBuffer and newData.
    while (this.canParseFullMessage(newData)) {
      // All the message is in the newData buffer.
      if (this.incomingBufferLen === 0) {
        this.parseMessage(newData.subarray(0, this.frameToParseLen));
        newData = newData.subarray(this.frameToParseLen);
      } else {  // We need to complete the local buffer.
        // Read the remaining part of this message.
        const bytesToCompleteMessage =
            this.frameToParseLen - this.incomingBufferLen;
        this.appendToIncomingBuffer(
            newData.subarray(0, bytesToCompleteMessage));
        this.parseMessage(
            this.incomingBuffer.subarray(0, this.frameToParseLen));
        this.incomingBufferLen = 0;
        // Remove the data just parsed.
        newData = newData.subarray(bytesToCompleteMessage);
      }
      // The frame is done. If the header of the next one is not fully
      // available, leave the length unknown and let the next call complete it.
      this.frameToParseLen = 0;
      if (!this.canCompleteSizeHeader(newData)) break;

      // The next header is entirely inside |newData|: read it in place.
      this.frameToParseLen =
          this.parseMessageSize(newData.subarray(0, WIRE_PROTOCOL_HEADER_SIZE));
      newData = newData.subarray(WIRE_PROTOCOL_HEADER_SIZE);
    }
    // Buffer the remaining data (part of the next header + message).
    this.appendToIncomingBuffer(newData);
  }
231
232  decodeResponse(
233      requestId: number, responseProto: Uint8Array, hasMore = false) {
234    const method = this.requestMethods.get(requestId);
235    if (!method) {
236      console.error(`Unknown request id: ${requestId}`);
237      this.sendErrorMessage(`Wire protocol error.`);
238      return;
239    }
240    const decoder = decoders.get(method);
241    if (decoder === undefined) {
242      console.error(`Unable to decode method: ${method}`);
243      return;
244    }
245    const decodedResponse = decoder(responseProto);
246    const response = {type: `${method}Response`, ...decodedResponse};
247
248    // TODO(nicomazz): Fix this.
249    // We assemble all the trace and then send it back to the main controller.
250    // This is a temporary solution, that will be changed in a following CL,
251    // because now both the chrome consumer port and the other adb consumer port
252    // send back the entire trace, while the correct behavior should be to send
253    // back the slices, that are assembled by the main record controller.
254    if (isReadBuffersResponse(response)) {
255      if (response.slices) this.handleSlices(response.slices);
256      if (!hasMore) this.sendReadBufferResponse();
257      return;
258    }
259    this.sendMessage(response);
260  }
261
262  handleSlices(slices: ISlice[]) {
263    for (const slice of slices) {
264      this.partialPacket.push(slice);
265      if (slice.lastSliceForPacket) {
266        const tracePacket = this.generateTracePacket(this.partialPacket);
267        this.traceProtoWriter.uint32(TRACE_PACKET_PROTO_TAG);
268        this.traceProtoWriter.bytes(tracePacket);
269        this.partialPacket = [];
270      }
271    }
272  }
273
274  generateTracePacket(slices: ISlice[]): Uint8Array {
275    let bufferSize = 0;
276    for (const slice of slices) bufferSize += slice.data!.length;
277    const fullBuffer = new Uint8Array(bufferSize);
278    let written = 0;
279    for (const slice of slices) {
280      const data = slice.data!;
281      fullBuffer.set(data, written);
282      written += data.length;
283    }
284    return fullBuffer;
285  }
286
287  sendReadBufferResponse() {
288    this.sendMessage(this.generateChunkReadResponse(
289        this.traceProtoWriter.finish(), /* last */ true));
290  }
291
292  bind() {
293    console.assert(this.socket !== undefined);
294    const requestId = this.requestId++;
295    const frame = new perfetto.protos.IPCFrame({
296      requestId,
297      msgBindService: new perfetto.protos.IPCFrame.BindService(
298          {serviceName: 'ConsumerPort'})
299    });
300    return new Promise((resolve, _) => {
301      this.resolveBindingPromise = resolve;
302      this.sendFrame(frame);
303    });
304  }
305
306  findMethodId(method: string): number|undefined {
307    const methodObject = this.availableMethods.find((m) => m.name === method);
308    if (methodObject && methodObject.id) return methodObject.id;
309    return undefined;
310  }
311
312  static async hasSocketAccess(device: USBDevice, adb: Adb): Promise<boolean> {
313    await adb.connect(device);
314    try {
315      const socket = await adb.socket(TRACED_SOCKET);
316      socket.close();
317      return true;
318    } catch (e) {
319      return false;
320    }
321  }
322
323  handleIncomingFrame(frame: perfetto.protos.IPCFrame) {
324    const requestId = frame.requestId as number;
325    switch (frame.msg) {
326      case 'msgBindServiceReply': {
327        const msgBindServiceReply = frame.msgBindServiceReply;
328        if (msgBindServiceReply && msgBindServiceReply.methods &&
329            msgBindServiceReply.serviceId) {
330          console.assert(msgBindServiceReply.success);
331          this.availableMethods = msgBindServiceReply.methods;
332          this.serviceId = msgBindServiceReply.serviceId;
333          this.resolveBindingPromise();
334          this.resolveBindingPromise = () => {};
335        }
336        return;
337      }
338      case 'msgInvokeMethodReply': {
339        const msgInvokeMethodReply = frame.msgInvokeMethodReply;
340        if (msgInvokeMethodReply && msgInvokeMethodReply.replyProto) {
341          if (!msgInvokeMethodReply.success) {
342            console.error(
343                'Unsuccessful method invocation: ', msgInvokeMethodReply);
344            return;
345          }
346          this.decodeResponse(
347              requestId,
348              msgInvokeMethodReply.replyProto,
349              msgInvokeMethodReply.hasMore === true);
350        }
351        return;
352      }
353      default:
354        console.error(`not recognized frame message: ${frame.msg}`);
355    }
356  }
357}
358
// Maps each consumer method name to the decode function of its response
// proto. decodeResponse() looks the decoder up by the name of the method the
// reply belongs to.
const decoders = new Map<string, Function>([
  ['EnableTracing', perfetto.protos.EnableTracingResponse.decode],
  ['FreeBuffers', perfetto.protos.FreeBuffersResponse.decode],
  ['ReadBuffers', perfetto.protos.ReadBuffersResponse.decode],
  ['DisableTracing', perfetto.protos.DisableTracingResponse.decode],
  ['GetTraceStats', perfetto.protos.GetTraceStatsResponse.decode],
]);