1// Copyright (C) 2019 The Android Open Source Project 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15import protobuf from 'protobufjs/minimal'; 16 17import { 18 DisableTracingResponse, 19 EnableTracingResponse, 20 FreeBuffersResponse, 21 GetTraceStatsResponse, 22 IPCFrame, 23 ReadBuffersResponse, 24} from '../protos'; 25 26import {AdbBaseConsumerPort, AdbConnectionState} from './adb_base_controller'; 27import {Adb, AdbStream} from './adb_interfaces'; 28import {isReadBuffersResponse} from './consumer_port_types'; 29import {Consumer} from './record_controller_interfaces'; 30 31enum SocketState { 32 DISCONNECTED, 33 BINDING_IN_PROGRESS, 34 BOUND, 35} 36 37// See wire_protocol.proto for more details. 38const WIRE_PROTOCOL_HEADER_SIZE = 4; 39const MAX_IPC_BUFFER_SIZE = 128 * 1024; 40 41const PROTO_LEN_DELIMITED_WIRE_TYPE = 2; 42const TRACE_PACKET_PROTO_ID = 1; 43const TRACE_PACKET_PROTO_TAG = 44 (TRACE_PACKET_PROTO_ID << 3) | PROTO_LEN_DELIMITED_WIRE_TYPE; 45 46declare type Frame = IPCFrame; 47declare type IMethodInfo = IPCFrame.BindServiceReply.IMethodInfo; 48declare type ISlice = ReadBuffersResponse.ISlice; 49 50interface Command { 51 method: string; 52 params: Uint8Array; 53} 54 55const TRACED_SOCKET = '/dev/socket/traced_consumer'; 56 57export class AdbSocketConsumerPort extends AdbBaseConsumerPort { 58 private socketState = SocketState.DISCONNECTED; 59 60 private socket?: AdbStream; 61 // Wire protocol request ID. After each request it is increased. It is needed 62 // to keep track of the type of request, and parse the response correctly. 63 private requestId = 1; 64 65 // Buffers received wire protocol data. 66 private incomingBuffer = new Uint8Array(MAX_IPC_BUFFER_SIZE); 67 private incomingBufferLen = 0; 68 private frameToParseLen = 0; 69 70 private availableMethods: IMethodInfo[] = []; 71 private serviceId = -1; 72 73 private resolveBindingPromise!: VoidFunction; 74 private requestMethods = new Map<number, string>(); 75 76 // Needed for ReadBufferResponse: all the trace packets are split into 77 // several slices. |partialPacket| is the buffer for them. Once we receive a 78 // slice with the flag |lastSliceForPacket|, a new packet is created. 79 private partialPacket: ISlice[] = []; 80 // Accumulates trace packets into a proto trace file.. 81 private traceProtoWriter = protobuf.Writer.create(); 82 83 private socketCommandQueue: Command[] = []; 84 85 constructor(adb: Adb, consumer: Consumer) { 86 super(adb, consumer); 87 } 88 89 async invoke(method: string, params: Uint8Array) { 90 // ADB connection & authentication is handled by the superclass. 91 console.assert(this.state === AdbConnectionState.CONNECTED); 92 this.socketCommandQueue.push({method, params}); 93 94 if (this.socketState === SocketState.BINDING_IN_PROGRESS) return; 95 if (this.socketState === SocketState.DISCONNECTED) { 96 this.socketState = SocketState.BINDING_IN_PROGRESS; 97 await this.listenForMessages(); 98 await this.bind(); 99 this.traceProtoWriter = protobuf.Writer.create(); 100 this.socketState = SocketState.BOUND; 101 } 102 103 console.assert(this.socketState === SocketState.BOUND); 104 105 for (const cmd of this.socketCommandQueue) { 106 this.invokeInternal(cmd.method, cmd.params); 107 } 108 this.socketCommandQueue = []; 109 } 110 111 private invokeInternal(method: string, argsProto: Uint8Array) { 112 // Socket is bound in invoke(). 113 console.assert(this.socketState === SocketState.BOUND); 114 const requestId = this.requestId++; 115 const methodId = this.findMethodId(method); 116 if (methodId === undefined) { 117 // This can happen with 'GetTraceStats': it seems that not all the Android 118 // <= 9 devices support it. 119 console.error(`Method ${method} not supported by the target`); 120 return; 121 } 122 const frame = new IPCFrame({ 123 requestId, 124 msgInvokeMethod: new IPCFrame.InvokeMethod({ 125 serviceId: this.serviceId, 126 methodId, 127 argsProto, 128 }), 129 }); 130 this.requestMethods.set(requestId, method); 131 this.sendFrame(frame); 132 133 if (method === 'EnableTracing') this.setDurationStatus(argsProto); 134 } 135 136 static generateFrameBufferToSend(frame: Frame): Uint8Array { 137 const frameProto: Uint8Array = IPCFrame.encode(frame).finish(); 138 const frameLen = frameProto.length; 139 const buf = new Uint8Array(WIRE_PROTOCOL_HEADER_SIZE + frameLen); 140 const dv = new DataView(buf.buffer); 141 dv.setUint32(0, frameProto.length, /* littleEndian */ true); 142 for (let i = 0; i < frameLen; i++) { 143 dv.setUint8(WIRE_PROTOCOL_HEADER_SIZE + i, frameProto[i]); 144 } 145 return buf; 146 } 147 148 async sendFrame(frame: Frame) { 149 console.assert(this.socket !== undefined); 150 if (!this.socket) return; 151 const buf = AdbSocketConsumerPort.generateFrameBufferToSend(frame); 152 await this.socket.write(buf); 153 } 154 155 async listenForMessages() { 156 this.socket = await this.adb.socket(TRACED_SOCKET); 157 this.socket.onData = (raw) => this.handleReceivedData(raw); 158 this.socket.onClose = () => { 159 this.socketState = SocketState.DISCONNECTED; 160 this.socketCommandQueue = []; 161 }; 162 } 163 164 private parseMessageSize(buffer: Uint8Array) { 165 const dv = new DataView(buffer.buffer, buffer.byteOffset, buffer.length); 166 return dv.getUint32(0, true); 167 } 168 169 private parseMessage(frameBuffer: Uint8Array) { 170 // Copy message to new array: 171 const buf = new ArrayBuffer(frameBuffer.byteLength); 172 const arr = new Uint8Array(buf); 173 arr.set(frameBuffer); 174 const frame = IPCFrame.decode(arr); 175 this.handleIncomingFrame(frame); 176 } 177 178 private incompleteSizeHeader() { 179 if (!this.frameToParseLen) { 180 console.assert(this.incomingBufferLen < WIRE_PROTOCOL_HEADER_SIZE); 181 return true; 182 } 183 return false; 184 } 185 186 private canCompleteSizeHeader(newData: Uint8Array) { 187 return newData.length + this.incomingBufferLen > WIRE_PROTOCOL_HEADER_SIZE; 188 } 189 190 private canParseFullMessage(newData: Uint8Array) { 191 return ( 192 this.frameToParseLen && 193 this.incomingBufferLen + newData.length >= this.frameToParseLen 194 ); 195 } 196 197 private appendToIncomingBuffer(array: Uint8Array) { 198 this.incomingBuffer.set(array, this.incomingBufferLen); 199 this.incomingBufferLen += array.length; 200 } 201 202 handleReceivedData(newData: Uint8Array) { 203 if (this.incompleteSizeHeader() && this.canCompleteSizeHeader(newData)) { 204 const newDataBytesToRead = 205 WIRE_PROTOCOL_HEADER_SIZE - this.incomingBufferLen; 206 // Add to the incoming buffer the remaining bytes to arrive at 207 // WIRE_PROTOCOL_HEADER_SIZE 208 this.appendToIncomingBuffer(newData.subarray(0, newDataBytesToRead)); 209 newData = newData.subarray(newDataBytesToRead); 210 211 this.frameToParseLen = this.parseMessageSize(this.incomingBuffer); 212 this.incomingBufferLen = 0; 213 } 214 215 // Parse all complete messages in incomingBuffer and newData. 216 // eslint-disable-next-line @typescript-eslint/strict-boolean-expressions 217 while (this.canParseFullMessage(newData)) { 218 // All the message is in the newData buffer. 219 if (this.incomingBufferLen === 0) { 220 this.parseMessage(newData.subarray(0, this.frameToParseLen)); 221 newData = newData.subarray(this.frameToParseLen); 222 } else { 223 // We need to complete the local buffer. 224 // Read the remaining part of this message. 225 const bytesToCompleteMessage = 226 this.frameToParseLen - this.incomingBufferLen; 227 this.appendToIncomingBuffer( 228 newData.subarray(0, bytesToCompleteMessage), 229 ); 230 this.parseMessage( 231 this.incomingBuffer.subarray(0, this.frameToParseLen), 232 ); 233 this.incomingBufferLen = 0; 234 // Remove the data just parsed. 235 newData = newData.subarray(bytesToCompleteMessage); 236 } 237 this.frameToParseLen = 0; 238 if (!this.canCompleteSizeHeader(newData)) break; 239 240 this.frameToParseLen = this.parseMessageSize( 241 newData.subarray(0, WIRE_PROTOCOL_HEADER_SIZE), 242 ); 243 newData = newData.subarray(WIRE_PROTOCOL_HEADER_SIZE); 244 } 245 // Buffer the remaining data (part of the next header + message). 246 this.appendToIncomingBuffer(newData); 247 } 248 249 decodeResponse( 250 requestId: number, 251 responseProto: Uint8Array, 252 hasMore = false, 253 ) { 254 const method = this.requestMethods.get(requestId); 255 if (!method) { 256 console.error(`Unknown request id: ${requestId}`); 257 this.sendErrorMessage(`Wire protocol error.`); 258 return; 259 } 260 const decoder = decoders.get(method); 261 if (decoder === undefined) { 262 console.error(`Unable to decode method: ${method}`); 263 return; 264 } 265 const decodedResponse = decoder(responseProto); 266 const response = {type: `${method}Response`, ...decodedResponse}; 267 268 // TODO(nicomazz): Fix this. 269 // We assemble all the trace and then send it back to the main controller. 270 // This is a temporary solution, that will be changed in a following CL, 271 // because now both the chrome consumer port and the other adb consumer port 272 // send back the entire trace, while the correct behavior should be to send 273 // back the slices, that are assembled by the main record controller. 274 if (isReadBuffersResponse(response)) { 275 if (response.slices) this.handleSlices(response.slices); 276 if (!hasMore) this.sendReadBufferResponse(); 277 return; 278 } 279 this.sendMessage(response); 280 } 281 282 handleSlices(slices: ISlice[]) { 283 for (const slice of slices) { 284 this.partialPacket.push(slice); 285 if (slice.lastSliceForPacket) { 286 const tracePacket = this.generateTracePacket(this.partialPacket); 287 this.traceProtoWriter.uint32(TRACE_PACKET_PROTO_TAG); 288 this.traceProtoWriter.bytes(tracePacket); 289 this.partialPacket = []; 290 } 291 } 292 } 293 294 generateTracePacket(slices: ISlice[]): Uint8Array { 295 let bufferSize = 0; 296 for (const slice of slices) bufferSize += slice.data!.length; 297 const fullBuffer = new Uint8Array(bufferSize); 298 let written = 0; 299 for (const slice of slices) { 300 const data = slice.data!; 301 fullBuffer.set(data, written); 302 written += data.length; 303 } 304 return fullBuffer; 305 } 306 307 sendReadBufferResponse() { 308 this.sendMessage( 309 this.generateChunkReadResponse( 310 this.traceProtoWriter.finish(), 311 /* last */ true, 312 ), 313 ); 314 this.traceProtoWriter = protobuf.Writer.create(); 315 } 316 317 bind() { 318 console.assert(this.socket !== undefined); 319 const requestId = this.requestId++; 320 const frame = new IPCFrame({ 321 requestId, 322 msgBindService: new IPCFrame.BindService({serviceName: 'ConsumerPort'}), 323 }); 324 return new Promise<void>((resolve, _) => { 325 this.resolveBindingPromise = resolve; 326 this.sendFrame(frame); 327 }); 328 } 329 330 findMethodId(method: string): number | undefined { 331 const methodObject = this.availableMethods.find((m) => m.name === method); 332 return methodObject?.id ?? undefined; 333 } 334 335 static async hasSocketAccess(device: USBDevice, adb: Adb): Promise<boolean> { 336 await adb.connect(device); 337 try { 338 const socket = await adb.socket(TRACED_SOCKET); 339 socket.close(); 340 return true; 341 } catch (e) { 342 return false; 343 } 344 } 345 346 handleIncomingFrame(frame: IPCFrame) { 347 const requestId = frame.requestId; 348 switch (frame.msg) { 349 case 'msgBindServiceReply': { 350 const msgBindServiceReply = frame.msgBindServiceReply; 351 if ( 352 msgBindServiceReply && 353 msgBindServiceReply.methods && 354 /* eslint-disable @typescript-eslint/strict-boolean-expressions */ 355 msgBindServiceReply.serviceId 356 ) { 357 /* eslint-enable */ 358 console.assert(msgBindServiceReply.success); 359 this.availableMethods = msgBindServiceReply.methods; 360 this.serviceId = msgBindServiceReply.serviceId; 361 this.resolveBindingPromise(); 362 this.resolveBindingPromise = () => {}; 363 } 364 return; 365 } 366 case 'msgInvokeMethodReply': { 367 const msgInvokeMethodReply = frame.msgInvokeMethodReply; 368 if (msgInvokeMethodReply && msgInvokeMethodReply.replyProto) { 369 if (!msgInvokeMethodReply.success) { 370 console.error( 371 'Unsuccessful method invocation: ', 372 msgInvokeMethodReply, 373 ); 374 return; 375 } 376 this.decodeResponse( 377 requestId, 378 msgInvokeMethodReply.replyProto, 379 msgInvokeMethodReply.hasMore === true, 380 ); 381 } 382 return; 383 } 384 default: 385 console.error(`not recognized frame message: ${frame.msg}`); 386 } 387 } 388} 389 390const decoders = new Map<string, Function>() 391 .set('EnableTracing', EnableTracingResponse.decode) 392 .set('FreeBuffers', FreeBuffersResponse.decode) 393 .set('ReadBuffers', ReadBuffersResponse.decode) 394 .set('DisableTracing', DisableTracingResponse.decode) 395 .set('GetTraceStats', GetTraceStatsResponse.decode); 396