1// Copyright (C) 2018 The Android Open Source Project 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15import {defer, Deferred} from '../base/deferred'; 16import {assertExists, assertTrue} from '../base/logging'; 17import {perfetto} from '../gen/protos'; 18 19import {ProtoRingBuffer} from './proto_ring_buffer'; 20import { 21 ComputeMetricArgs, 22 ComputeMetricResult, 23 QueryArgs, 24} from './protos'; 25import {NUM, NUM_NULL, STR} from './query_result'; 26import { 27 createQueryResult, 28 QueryError, 29 QueryResult, 30 WritableQueryResult, 31} from './query_result'; 32import {TimeSpan} from './time'; 33 34import TraceProcessorRpc = perfetto.protos.TraceProcessorRpc; 35import TraceProcessorRpcStream = perfetto.protos.TraceProcessorRpcStream; 36import TPM = perfetto.protos.TraceProcessorRpc.TraceProcessorMethod; 37 38export interface LoadingTracker { 39 beginLoading(): void; 40 endLoading(): void; 41} 42 43export class NullLoadingTracker implements LoadingTracker { 44 beginLoading(): void {} 45 endLoading(): void {} 46} 47 48 49// This is used to skip the decoding of queryResult from protobufjs and deal 50// with it ourselves. See the comment below around `QueryResult.decode = ...`. 51interface QueryResultBypass { 52 rawQueryResult: Uint8Array; 53} 54 55/** 56 * Abstract interface of a trace proccessor. 57 * This is the TypeScript equivalent of src/trace_processor/rpc.h. 58 * There are two concrete implementations: 59 * 1. WasmEngineProxy: creates a Wasm module and interacts over postMessage(). 60 * 2. HttpRpcEngine: connects to an external `trace_processor_shell --httpd`. 61 * and interacts via fetch(). 62 * In both cases, we have a byte-oriented pipe to interact with TraceProcessor. 63 * The derived class is only expected to deal with these two functions: 64 * 1. Implement the abstract rpcSendRequestBytes() function, sending the 65 * proto-encoded TraceProcessorRpc requests to the TraceProcessor instance. 66 * 2. Call onRpcResponseBytes() when response data is received. 67 */ 68export abstract class Engine { 69 abstract readonly id: string; 70 private _cpus?: number[]; 71 private _numGpus?: number; 72 private loadingTracker: LoadingTracker; 73 private txSeqId = 0; 74 private rxSeqId = 0; 75 private rxBuf = new ProtoRingBuffer(); 76 private pendingParses = new Array<Deferred<void>>(); 77 private pendingEOFs = new Array<Deferred<void>>(); 78 private pendingQueries = new Array<WritableQueryResult>(); 79 private pendingRestoreTables = new Array<Deferred<void>>(); 80 private pendingComputeMetrics = new Array<Deferred<ComputeMetricResult>>(); 81 82 constructor(tracker?: LoadingTracker) { 83 this.loadingTracker = tracker ? tracker : new NullLoadingTracker(); 84 } 85 86 /** 87 * Called to send data to the TraceProcessor instance. This turns into a 88 * postMessage() or a HTTP request, depending on the Engine implementation. 89 */ 90 abstract rpcSendRequestBytes(data: Uint8Array): void; 91 92 /** 93 * Called when an inbound message is received by the Engine implementation 94 * (e.g. onmessage for the Wasm case, on when HTTP replies are received for 95 * the HTTP+RPC case). 96 */ 97 onRpcResponseBytes(dataWillBeRetained: Uint8Array) { 98 // Note: when hitting the fastpath inside ProtoRingBuffer, the |data| buffer 99 // is returned back by readMessage() (% subarray()-ing it) and held onto by 100 // other classes (e.g., QueryResult). For both fetch() and Wasm we are fine 101 // because every response creates a new buffer. 102 this.rxBuf.append(dataWillBeRetained); 103 for (;;) { 104 const msg = this.rxBuf.readMessage(); 105 if (msg === undefined) break; 106 this.onRpcResponseMessage(msg); 107 } 108 } 109 110 /* 111 * Parses a response message. 112 * |rpcMsgEncoded| is a sub-array to to the start of a TraceProcessorRpc 113 * proto-encoded message (without the proto preamble and varint size). 114 */ 115 private onRpcResponseMessage(rpcMsgEncoded: Uint8Array) { 116 // Here we override the protobufjs-generated code to skip the parsing of the 117 // new streaming QueryResult and instead passing it through like a buffer. 118 // This is the overall problem: All trace processor responses are wrapped 119 // into a perfetto.protos.TraceProcessorRpc proto message. In all cases % 120 // TPM_QUERY_STREAMING, we want protobufjs to decode the proto bytes and 121 // give us a structured object. In the case of TPM_QUERY_STREAMING, instead, 122 // we want to deal with the proto parsing ourselves using the new 123 // QueryResult.appendResultBatch() method, because that handled streaming 124 // results more efficiently and skips several copies. 125 // By overriding the decode method below, we achieve two things: 126 // 1. We avoid protobufjs decoding the TraceProcessorRpc.query_result field. 127 // 2. We stash (a view of) the original buffer into the |rawQueryResult| so 128 // the `case TPM_QUERY_STREAMING` below can take it. 129 perfetto.protos.QueryResult.decode = 130 (reader: protobuf.Reader, length: number) => { 131 const res = 132 perfetto.protos.QueryResult.create() as {} as QueryResultBypass; 133 res.rawQueryResult = 134 reader.buf.subarray(reader.pos, reader.pos + length); 135 // All this works only if protobufjs returns the original ArrayBuffer 136 // from |rpcMsgEncoded|. It should be always the case given the 137 // current implementation. This check mainly guards against future 138 // behavioral changes of protobufjs. We don't want to accidentally 139 // hold onto some internal protobufjs buffer. We are fine holding 140 // onto |rpcMsgEncoded| because those come from ProtoRingBuffer which 141 // is buffer-retention-friendly. 142 assertTrue(res.rawQueryResult.buffer === rpcMsgEncoded.buffer); 143 reader.pos += length; 144 return res as {} as perfetto.protos.QueryResult; 145 }; 146 147 const rpc = TraceProcessorRpc.decode(rpcMsgEncoded); 148 149 if (rpc.fatalError !== undefined && rpc.fatalError.length > 0) { 150 throw new Error(`${rpc.fatalError}`); 151 } 152 153 // Allow restarting sequences from zero (when reloading the browser). 154 if (rpc.seq !== this.rxSeqId + 1 && this.rxSeqId !== 0 && rpc.seq !== 0) { 155 // "(ERR:rpc_seq)" is intercepted by error_dialog.ts to show a more 156 // graceful and actionable error. 157 throw new Error(`RPC sequence id mismatch cur=${rpc.seq} last=${ 158 this.rxSeqId} (ERR:rpc_seq)`); 159 } 160 161 this.rxSeqId = rpc.seq; 162 163 let isFinalResponse = true; 164 165 switch (rpc.response) { 166 case TPM.TPM_APPEND_TRACE_DATA: 167 const appendResult = assertExists(rpc.appendResult); 168 const pendingPromise = assertExists(this.pendingParses.shift()); 169 if (appendResult.error && appendResult.error.length > 0) { 170 pendingPromise.reject(appendResult.error); 171 } else { 172 pendingPromise.resolve(); 173 } 174 break; 175 case TPM.TPM_FINALIZE_TRACE_DATA: 176 assertExists(this.pendingEOFs.shift()).resolve(); 177 break; 178 case TPM.TPM_RESTORE_INITIAL_TABLES: 179 assertExists(this.pendingRestoreTables.shift()).resolve(); 180 break; 181 case TPM.TPM_QUERY_STREAMING: 182 const qRes = assertExists(rpc.queryResult) as {} as QueryResultBypass; 183 const pendingQuery = assertExists(this.pendingQueries[0]); 184 pendingQuery.appendResultBatch(qRes.rawQueryResult); 185 if (pendingQuery.isComplete()) { 186 this.pendingQueries.shift(); 187 } else { 188 isFinalResponse = false; 189 } 190 break; 191 case TPM.TPM_COMPUTE_METRIC: 192 const metricRes = assertExists(rpc.metricResult) as ComputeMetricResult; 193 if (metricRes.error && metricRes.error.length > 0) { 194 throw new QueryError(`ComputeMetric() error: ${metricRes.error}`, { 195 query: 'COMPUTE_METRIC', 196 }); 197 } 198 assertExists(this.pendingComputeMetrics.shift()).resolve(metricRes); 199 break; 200 default: 201 console.log( 202 'Unexpected TraceProcessor response received: ', rpc.response); 203 break; 204 } // switch(rpc.response); 205 206 if (isFinalResponse) { 207 this.loadingTracker.endLoading(); 208 } 209 } 210 211 /** 212 * TraceProcessor methods below this point. 213 * The methods below are called by the various controllers in the UI and 214 * deal with marshalling / unmarshaling requests to/from TraceProcessor. 215 */ 216 217 218 /** 219 * Push trace data into the engine. The engine is supposed to automatically 220 * figure out the type of the trace (JSON vs Protobuf). 221 */ 222 parse(data: Uint8Array): Promise<void> { 223 const asyncRes = defer<void>(); 224 this.pendingParses.push(asyncRes); 225 const rpc = TraceProcessorRpc.create(); 226 rpc.request = TPM.TPM_APPEND_TRACE_DATA; 227 rpc.appendTraceData = data; 228 this.rpcSendRequest(rpc); 229 return asyncRes; // Linearize with the worker. 230 } 231 232 /** 233 * Notify the engine that we reached the end of the trace. 234 * Called after the last parse() call. 235 */ 236 notifyEof(): Promise<void> { 237 const asyncRes = defer<void>(); 238 this.pendingEOFs.push(asyncRes); 239 const rpc = TraceProcessorRpc.create(); 240 rpc.request = TPM.TPM_FINALIZE_TRACE_DATA; 241 this.rpcSendRequest(rpc); 242 return asyncRes; // Linearize with the worker. 243 } 244 245 /** 246 * Resets the trace processor state by destroying any table/views created by 247 * the UI after loading. 248 */ 249 restoreInitialTables(): Promise<void> { 250 const asyncRes = defer<void>(); 251 this.pendingRestoreTables.push(asyncRes); 252 const rpc = TraceProcessorRpc.create(); 253 rpc.request = TPM.TPM_RESTORE_INITIAL_TABLES; 254 this.rpcSendRequest(rpc); 255 return asyncRes; // Linearize with the worker. 256 } 257 258 /** 259 * Shorthand for sending a compute metrics request to the engine. 260 */ 261 async computeMetric(metrics: string[]): Promise<ComputeMetricResult> { 262 const asyncRes = defer<ComputeMetricResult>(); 263 this.pendingComputeMetrics.push(asyncRes); 264 const rpc = TraceProcessorRpc.create(); 265 rpc.request = TPM.TPM_COMPUTE_METRIC; 266 const args = rpc.computeMetricArgs = new ComputeMetricArgs(); 267 args.metricNames = metrics; 268 args.format = ComputeMetricArgs.ResultFormat.TEXTPROTO; 269 this.rpcSendRequest(rpc); 270 return asyncRes; 271 } 272 273 /* 274 * Issues a streaming query and retrieve results in batches. 275 * The returned QueryResult object will be populated over time with batches 276 * of rows (each batch conveys ~128KB of data and a variable number of rows). 277 * The caller can decide whether to wait that all batches have been received 278 * (by awaiting the returned object or calling result.waitAllRows()) or handle 279 * the rows incrementally. 280 * 281 * Example usage: 282 * const res = engine.query('SELECT foo, bar FROM table'); 283 * console.log(res.numRows()); // Will print 0 because we didn't await. 284 * await(res.waitAllRows()); 285 * console.log(res.numRows()); // Will print the total number of rows. 286 * 287 * for (const it = res.iter({foo: NUM, bar:STR}); it.valid(); it.next()) { 288 * console.log(it.foo, it.bar); 289 * } 290 */ 291 query(sqlQuery: string): Promise<QueryResult>&QueryResult { 292 const rpc = TraceProcessorRpc.create(); 293 rpc.request = TPM.TPM_QUERY_STREAMING; 294 rpc.queryArgs = new QueryArgs(); 295 rpc.queryArgs.sqlQuery = sqlQuery; 296 const result = createQueryResult({ 297 query: sqlQuery, 298 }); 299 this.pendingQueries.push(result); 300 this.rpcSendRequest(rpc); 301 return result; 302 } 303 304 /** 305 * Marshals the TraceProcessorRpc request arguments and sends the request 306 * to the concrete Engine (Wasm or HTTP). 307 */ 308 private rpcSendRequest(rpc: TraceProcessorRpc) { 309 rpc.seq = this.txSeqId++; 310 // Each message is wrapped in a TraceProcessorRpcStream to add the varint 311 // preamble with the size, which allows tokenization on the other end. 312 const outerProto = TraceProcessorRpcStream.create(); 313 outerProto.msg.push(rpc); 314 const buf = TraceProcessorRpcStream.encode(outerProto).finish(); 315 this.loadingTracker.beginLoading(); 316 this.rpcSendRequestBytes(buf); 317 } 318 319 // TODO(hjd): When streaming must invalidate this somehow. 320 async getCpus(): Promise<number[]> { 321 if (!this._cpus) { 322 const cpus = []; 323 const queryRes = await this.query( 324 'select distinct(cpu) as cpu from sched order by cpu;'); 325 for (const it = queryRes.iter({cpu: NUM}); it.valid(); it.next()) { 326 cpus.push(it.cpu); 327 } 328 this._cpus = cpus; 329 } 330 return this._cpus; 331 } 332 333 async getNumberOfGpus(): Promise<number> { 334 if (!this._numGpus) { 335 const result = await this.query(` 336 select count(distinct(gpu_id)) as gpuCount 337 from gpu_counter_track 338 where name = 'gpufreq'; 339 `); 340 this._numGpus = result.firstRow({gpuCount: NUM}).gpuCount; 341 } 342 return this._numGpus; 343 } 344 345 // TODO: This should live in code that's more specific to chrome, instead of 346 // in engine. 347 async getNumberOfProcesses(): Promise<number> { 348 const result = await this.query('select count(*) as cnt from process;'); 349 return result.firstRow({cnt: NUM}).cnt; 350 } 351 352 async getTraceTimeBounds(): Promise<TimeSpan> { 353 const result = await this.query( 354 `select start_ts as startTs, end_ts as endTs from trace_bounds`); 355 const bounds = result.firstRow({ 356 startTs: NUM, 357 endTs: NUM, 358 }); 359 return new TimeSpan(bounds.startTs / 1e9, bounds.endTs / 1e9); 360 } 361 362 async getTracingMetadataTimeBounds(): Promise<TimeSpan> { 363 const queryRes = await this.query(`select 364 name, 365 int_value as intValue 366 from metadata 367 where name = 'tracing_started_ns' or name = 'tracing_disabled_ns' 368 or name = 'all_data_source_started_ns'`); 369 let startBound = -Infinity; 370 let endBound = Infinity; 371 const it = queryRes.iter({'name': STR, 'intValue': NUM_NULL}); 372 for (; it.valid(); it.next()) { 373 const columnName = it.name; 374 const timestamp = it.intValue; 375 if (timestamp === null) continue; 376 if (columnName === 'tracing_disabled_ns') { 377 endBound = Math.min(endBound, timestamp / 1e9); 378 } else { 379 startBound = Math.max(startBound, timestamp / 1e9); 380 } 381 } 382 383 return new TimeSpan(startBound, endBound); 384 } 385} 386