1// Copyright (C) 2019 The Android Open Source Project 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15import * as protobuf from 'protobufjs/minimal'; 16 17import {perfetto} from '../gen/protos'; 18 19import {AdbAuthState, AdbBaseConsumerPort} from './adb_base_controller'; 20import {Adb, AdbStream} from './adb_interfaces'; 21import { 22 isReadBuffersResponse, 23} from './consumer_port_types'; 24import {Consumer} from './record_controller_interfaces'; 25 26enum SocketState { 27 DISCONNECTED, 28 BINDING_IN_PROGRESS, 29 BOUND, 30} 31 32// See wire_protocol.proto for more details. 33const WIRE_PROTOCOL_HEADER_SIZE = 4; 34const MAX_IPC_BUFFER_SIZE = 128 * 1024; 35 36const PROTO_LEN_DELIMITED_WIRE_TYPE = 2; 37const TRACE_PACKET_PROTO_ID = 1; 38const TRACE_PACKET_PROTO_TAG = 39 (TRACE_PACKET_PROTO_ID << 3) | PROTO_LEN_DELIMITED_WIRE_TYPE; 40 41declare type Frame = perfetto.protos.IPCFrame; 42declare type IMethodInfo = 43 perfetto.protos.IPCFrame.BindServiceReply.IMethodInfo; 44declare type ISlice = perfetto.protos.ReadBuffersResponse.ISlice; 45 46interface Command { 47 method: string; 48 params: Uint8Array; 49} 50 51const TRACED_SOCKET = '/dev/socket/traced_consumer'; 52 53export class AdbSocketConsumerPort extends AdbBaseConsumerPort { 54 private socketState = SocketState.DISCONNECTED; 55 56 private socket?: AdbStream; 57 // Wire protocol request ID. After each request it is increased. It is needed 58 // to keep track of the type of request, and parse the response correctly. 59 private requestId = 1; 60 61 // Buffers received wire protocol data. 62 private incomingBuffer = new Uint8Array(MAX_IPC_BUFFER_SIZE); 63 private incomingBufferLen = 0; 64 private frameToParseLen = 0; 65 66 private availableMethods: IMethodInfo[] = []; 67 private serviceId = -1; 68 69 private resolveBindingPromise!: VoidFunction; 70 private requestMethods = new Map<number, string>(); 71 72 // Needed for ReadBufferResponse: all the trace packets are split into 73 // several slices. |partialPacket| is the buffer for them. Once we receive a 74 // slice with the flag |lastSliceForPacket|, a new packet is created. 75 private partialPacket: ISlice[] = []; 76 // Accumulates trace packets into a proto trace file.. 77 private traceProtoWriter = protobuf.Writer.create(); 78 79 private socketCommandQueue: Command[] = []; 80 81 constructor(adb: Adb, consumer: Consumer) { 82 super(adb, consumer); 83 } 84 85 async invoke(method: string, params: Uint8Array) { 86 // ADB connection & authentication is handled by the superclass. 87 console.assert(this.state === AdbAuthState.CONNECTED); 88 this.socketCommandQueue.push({method, params}); 89 90 if (this.socketState === SocketState.BINDING_IN_PROGRESS) return; 91 if (this.socketState === SocketState.DISCONNECTED) { 92 this.socketState = SocketState.BINDING_IN_PROGRESS; 93 await this.listenForMessages(); 94 await this.bind(); 95 this.traceProtoWriter = protobuf.Writer.create(); 96 this.socketState = SocketState.BOUND; 97 } 98 99 console.assert(this.socketState === SocketState.BOUND); 100 101 for (const cmd of this.socketCommandQueue) { 102 this.invokeInternal(cmd.method, cmd.params); 103 } 104 this.socketCommandQueue = []; 105 } 106 107 private invokeInternal(method: string, argsProto: Uint8Array) { 108 // Socket is bound in invoke(). 109 console.assert(this.socketState === SocketState.BOUND); 110 const requestId = this.requestId++; 111 const methodId = this.findMethodId(method); 112 if (methodId === undefined) { 113 // This can happen with 'GetTraceStats': it seems that not all the Android 114 // <= 9 devices support it. 115 console.error(`Method ${method} not supported by the target`); 116 return; 117 } 118 const frame = new perfetto.protos.IPCFrame({ 119 requestId, 120 msgInvokeMethod: new perfetto.protos.IPCFrame.InvokeMethod( 121 {serviceId: this.serviceId, methodId, argsProto}) 122 }); 123 this.requestMethods.set(requestId, method); 124 this.sendFrame(frame); 125 126 if (method === 'EnableTracing') this.setDurationStatus(argsProto); 127 } 128 129 static generateFrameBufferToSend(frame: Frame): Uint8Array { 130 const frameProto: Uint8Array = 131 perfetto.protos.IPCFrame.encode(frame).finish(); 132 const frameLen = frameProto.length; 133 const buf = new Uint8Array(WIRE_PROTOCOL_HEADER_SIZE + frameLen); 134 const dv = new DataView(buf.buffer); 135 dv.setUint32(0, frameProto.length, /* littleEndian */ true); 136 for (let i = 0; i < frameLen; i++) { 137 dv.setUint8(WIRE_PROTOCOL_HEADER_SIZE + i, frameProto[i]); 138 } 139 return buf; 140 } 141 142 async sendFrame(frame: Frame) { 143 console.assert(this.socket !== undefined); 144 if (!this.socket) return; 145 const buf = AdbSocketConsumerPort.generateFrameBufferToSend(frame); 146 await this.socket.write(buf); 147 } 148 149 async listenForMessages() { 150 this.socket = await this.adb.socket(TRACED_SOCKET); 151 this.socket.onData = (raw) => this.handleReceivedData(raw); 152 this.socket.onClose = () => { 153 this.socketState = SocketState.DISCONNECTED; 154 this.socketCommandQueue = []; 155 }; 156 } 157 158 private parseMessageSize(buffer: Uint8Array) { 159 const dv = new DataView(buffer.buffer, buffer.byteOffset, buffer.length); 160 return dv.getUint32(0, true); 161 } 162 163 private parseMessage(frameBuffer: Uint8Array) { 164 const frame = perfetto.protos.IPCFrame.decode(frameBuffer); 165 this.handleIncomingFrame(frame); 166 } 167 168 private incompleteSizeHeader() { 169 if (!this.frameToParseLen) { 170 console.assert(this.incomingBufferLen < WIRE_PROTOCOL_HEADER_SIZE); 171 return true; 172 } 173 return false; 174 } 175 176 private canCompleteSizeHeader(newData: Uint8Array) { 177 return newData.length + this.incomingBufferLen > WIRE_PROTOCOL_HEADER_SIZE; 178 } 179 180 private canParseFullMessage(newData: Uint8Array) { 181 return this.frameToParseLen && 182 this.incomingBufferLen + newData.length >= this.frameToParseLen; 183 } 184 185 private appendToIncomingBuffer(array: Uint8Array) { 186 this.incomingBuffer.set(array, this.incomingBufferLen); 187 this.incomingBufferLen += array.length; 188 } 189 190 handleReceivedData(newData: Uint8Array) { 191 if (this.incompleteSizeHeader() && this.canCompleteSizeHeader(newData)) { 192 const newDataBytesToRead = 193 WIRE_PROTOCOL_HEADER_SIZE - this.incomingBufferLen; 194 // Add to the incoming buffer the remaining bytes to arrive at 195 // WIRE_PROTOCOL_HEADER_SIZE 196 this.appendToIncomingBuffer(newData.subarray(0, newDataBytesToRead)); 197 newData = newData.subarray(newDataBytesToRead); 198 199 this.frameToParseLen = this.parseMessageSize(this.incomingBuffer); 200 this.incomingBufferLen = 0; 201 } 202 203 // Parse all complete messages in incomingBuffer and newData. 204 while (this.canParseFullMessage(newData)) { 205 // All the message is in the newData buffer. 206 if (this.incomingBufferLen === 0) { 207 this.parseMessage(newData.subarray(0, this.frameToParseLen)); 208 newData = newData.subarray(this.frameToParseLen); 209 } else { // We need to complete the local buffer. 210 // Read the remaining part of this message. 211 const bytesToCompleteMessage = 212 this.frameToParseLen - this.incomingBufferLen; 213 this.appendToIncomingBuffer( 214 newData.subarray(0, bytesToCompleteMessage)); 215 this.parseMessage( 216 this.incomingBuffer.subarray(0, this.frameToParseLen)); 217 this.incomingBufferLen = 0; 218 // Remove the data just parsed. 219 newData = newData.subarray(bytesToCompleteMessage); 220 } 221 this.frameToParseLen = 0; 222 if (!this.canCompleteSizeHeader(newData)) break; 223 224 this.frameToParseLen = 225 this.parseMessageSize(newData.subarray(0, WIRE_PROTOCOL_HEADER_SIZE)); 226 newData = newData.subarray(WIRE_PROTOCOL_HEADER_SIZE); 227 } 228 // Buffer the remaining data (part of the next header + message). 229 this.appendToIncomingBuffer(newData); 230 } 231 232 decodeResponse( 233 requestId: number, responseProto: Uint8Array, hasMore = false) { 234 const method = this.requestMethods.get(requestId); 235 if (!method) { 236 console.error(`Unknown request id: ${requestId}`); 237 this.sendErrorMessage(`Wire protocol error.`); 238 return; 239 } 240 const decoder = decoders.get(method); 241 if (decoder === undefined) { 242 console.error(`Unable to decode method: ${method}`); 243 return; 244 } 245 const decodedResponse = decoder(responseProto); 246 const response = {type: `${method}Response`, ...decodedResponse}; 247 248 // TODO(nicomazz): Fix this. 249 // We assemble all the trace and then send it back to the main controller. 250 // This is a temporary solution, that will be changed in a following CL, 251 // because now both the chrome consumer port and the other adb consumer port 252 // send back the entire trace, while the correct behavior should be to send 253 // back the slices, that are assembled by the main record controller. 254 if (isReadBuffersResponse(response)) { 255 if (response.slices) this.handleSlices(response.slices); 256 if (!hasMore) this.sendReadBufferResponse(); 257 return; 258 } 259 this.sendMessage(response); 260 } 261 262 handleSlices(slices: ISlice[]) { 263 for (const slice of slices) { 264 this.partialPacket.push(slice); 265 if (slice.lastSliceForPacket) { 266 const tracePacket = this.generateTracePacket(this.partialPacket); 267 this.traceProtoWriter.uint32(TRACE_PACKET_PROTO_TAG); 268 this.traceProtoWriter.bytes(tracePacket); 269 this.partialPacket = []; 270 } 271 } 272 } 273 274 generateTracePacket(slices: ISlice[]): Uint8Array { 275 let bufferSize = 0; 276 for (const slice of slices) bufferSize += slice.data!.length; 277 const fullBuffer = new Uint8Array(bufferSize); 278 let written = 0; 279 for (const slice of slices) { 280 const data = slice.data!; 281 fullBuffer.set(data, written); 282 written += data.length; 283 } 284 return fullBuffer; 285 } 286 287 sendReadBufferResponse() { 288 this.sendMessage(this.generateChunkReadResponse( 289 this.traceProtoWriter.finish(), /* last */ true)); 290 } 291 292 bind() { 293 console.assert(this.socket !== undefined); 294 const requestId = this.requestId++; 295 const frame = new perfetto.protos.IPCFrame({ 296 requestId, 297 msgBindService: new perfetto.protos.IPCFrame.BindService( 298 {serviceName: 'ConsumerPort'}) 299 }); 300 return new Promise((resolve, _) => { 301 this.resolveBindingPromise = resolve; 302 this.sendFrame(frame); 303 }); 304 } 305 306 findMethodId(method: string): number|undefined { 307 const methodObject = this.availableMethods.find((m) => m.name === method); 308 if (methodObject && methodObject.id) return methodObject.id; 309 return undefined; 310 } 311 312 static async hasSocketAccess(device: USBDevice, adb: Adb): Promise<boolean> { 313 await adb.connect(device); 314 try { 315 const socket = await adb.socket(TRACED_SOCKET); 316 socket.close(); 317 return true; 318 } catch (e) { 319 return false; 320 } 321 } 322 323 handleIncomingFrame(frame: perfetto.protos.IPCFrame) { 324 const requestId = frame.requestId as number; 325 switch (frame.msg) { 326 case 'msgBindServiceReply': { 327 const msgBindServiceReply = frame.msgBindServiceReply; 328 if (msgBindServiceReply && msgBindServiceReply.methods && 329 msgBindServiceReply.serviceId) { 330 console.assert(msgBindServiceReply.success); 331 this.availableMethods = msgBindServiceReply.methods; 332 this.serviceId = msgBindServiceReply.serviceId; 333 this.resolveBindingPromise(); 334 this.resolveBindingPromise = () => {}; 335 } 336 return; 337 } 338 case 'msgInvokeMethodReply': { 339 const msgInvokeMethodReply = frame.msgInvokeMethodReply; 340 if (msgInvokeMethodReply && msgInvokeMethodReply.replyProto) { 341 if (!msgInvokeMethodReply.success) { 342 console.error( 343 'Unsuccessful method invocation: ', msgInvokeMethodReply); 344 return; 345 } 346 this.decodeResponse( 347 requestId, 348 msgInvokeMethodReply.replyProto, 349 msgInvokeMethodReply.hasMore === true); 350 } 351 return; 352 } 353 default: 354 console.error(`not recognized frame message: ${frame.msg}`); 355 } 356 } 357} 358 359const decoders = 360 new Map<string, Function>() 361 .set('EnableTracing', perfetto.protos.EnableTracingResponse.decode) 362 .set('FreeBuffers', perfetto.protos.FreeBuffersResponse.decode) 363 .set('ReadBuffers', perfetto.protos.ReadBuffersResponse.decode) 364 .set('DisableTracing', perfetto.protos.DisableTracingResponse.decode) 365 .set('GetTraceStats', perfetto.protos.GetTraceStatsResponse.decode);