// Copyright (C) 2018 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

import {defer, Deferred} from '../base/deferred';
import {assertExists, assertTrue} from '../base/logging';
import {perfetto} from '../gen/protos';

import {ProtoRingBuffer} from './proto_ring_buffer';
import {
  ComputeMetricArgs,
  ComputeMetricResult,
  DisableAndReadMetatraceResult,
  QueryArgs,
  ResetTraceProcessorArgs,
} from './protos';
import {LONG, LONG_NULL, NUM, STR} from './query_result';
import {
  createQueryResult,
  QueryError,
  QueryResult,
  WritableQueryResult,
} from './query_result';
import {TPTime, TPTimeSpan} from './time';

import TraceProcessorRpc = perfetto.protos.TraceProcessorRpc;
import TraceProcessorRpcStream = perfetto.protos.TraceProcessorRpcStream;
import TPM = perfetto.protos.TraceProcessorRpc.TraceProcessorMethod;
import {Span} from '../common/time';
import {BigintMath} from '../base/bigint_math';

export interface LoadingTracker {
  beginLoading(): void;
  endLoading(): void;
}

export class NullLoadingTracker implements LoadingTracker {
  beginLoading(): void {}
  endLoading(): void {}
}


// This is used to skip the decoding of queryResult from protobufjs and deal
// with it ourselves. See the comment below around `QueryResult.decode = ...`.
interface QueryResultBypass {
  rawQueryResult: Uint8Array;
}

export interface TraceProcessorConfig {
  cropTrackEvents: boolean;
  ingestFtraceInRawTable: boolean;
  analyzeTraceProtoContent: boolean;
}

// Abstract interface of a trace processor.
// This is the TypeScript equivalent of src/trace_processor/rpc.h.
// There are two concrete implementations:
//   1. WasmEngineProxy: creates a Wasm module and interacts over postMessage().
//   2. HttpRpcEngine: connects to an external `trace_processor_shell --httpd`
//      and interacts via fetch().
// In both cases, we have a byte-oriented pipe to interact with TraceProcessor.
// The derived class is only expected to deal with these two functions:
//   1. Implement the abstract rpcSendRequestBytes() function, sending the
//      proto-encoded TraceProcessorRpc requests to the TraceProcessor instance.
//   2. Call onRpcResponseBytes() when response data is received.
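//
// Illustrative sketch (not part of this codebase; the class name and the
// MessagePort transport below are hypothetical): a minimal concrete Engine
// only needs to forward bytes in both directions, e.g.:
//
//   class PostMessageEngine extends Engine {
//     readonly id = 'example';
//     constructor(private port: MessagePort) {
//       super();
//       // Feed every inbound message back into the shared RPC machinery.
//       this.port.onmessage = (e: MessageEvent) =>
//           this.onRpcResponseBytes(new Uint8Array(e.data));
//     }
//     rpcSendRequestBytes(data: Uint8Array): void {
//       this.port.postMessage(data);
//     }
//   }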
export abstract class Engine {
  abstract readonly id: string;
  private _cpus?: number[];
  private _numGpus?: number;
  private loadingTracker: LoadingTracker;
  private txSeqId = 0;
  private rxSeqId = 0;
  private rxBuf = new ProtoRingBuffer();
  private pendingParses = new Array<Deferred<void>>();
  private pendingEOFs = new Array<Deferred<void>>();
  private pendingResetTraceProcessors = new Array<Deferred<void>>();
  private pendingQueries = new Array<WritableQueryResult>();
  private pendingRestoreTables = new Array<Deferred<void>>();
  private pendingComputeMetrics = new Array<Deferred<ComputeMetricResult>>();
  private pendingReadMetatrace?: Deferred<DisableAndReadMetatraceResult>;
  private _isMetatracingEnabled = false;

  constructor(tracker?: LoadingTracker) {
    this.loadingTracker = tracker ? tracker : new NullLoadingTracker();
  }

  // Called to send data to the TraceProcessor instance. This turns into a
  // postMessage() or an HTTP request, depending on the Engine implementation.
  abstract rpcSendRequestBytes(data: Uint8Array): void;

  // Called when an inbound message is received by the Engine implementation
  // (e.g. onmessage for the Wasm case, or when HTTP replies are received for
  // the HTTP+RPC case).
  onRpcResponseBytes(dataWillBeRetained: Uint8Array) {
    // Note: when hitting the fastpath inside ProtoRingBuffer, the |data| buffer
    // is returned back by readMessage() (modulo subarray()-ing it) and held
    // onto by other classes (e.g., QueryResult). For both fetch() and Wasm we
    // are fine because every response creates a new buffer.
    this.rxBuf.append(dataWillBeRetained);
    for (;;) {
      const msg = this.rxBuf.readMessage();
      if (msg === undefined) break;
      this.onRpcResponseMessage(msg);
    }
  }

  // Parses a response message.
  // |rpcMsgEncoded| is a sub-array pointing to the start of a TraceProcessorRpc
  // proto-encoded message (without the proto preamble and varint size).
  private onRpcResponseMessage(rpcMsgEncoded: Uint8Array) {
    // Here we override the protobufjs-generated code to skip the parsing of the
    // new streaming QueryResult and instead pass it through as a raw buffer.
    // This is the overall problem: all trace processor responses are wrapped
    // into a perfetto.protos.TraceProcessorRpc proto message. In all cases but
    // TPM_QUERY_STREAMING, we want protobufjs to decode the proto bytes and
    // give us a structured object. In the case of TPM_QUERY_STREAMING, instead,
    // we want to deal with the proto parsing ourselves using the new
    // QueryResult.appendResultBatch() method, because that handles streaming
    // results more efficiently and skips several copies.
    // By overriding the decode method below, we achieve two things:
    // 1. We avoid protobufjs decoding the TraceProcessorRpc.query_result field.
    // 2. We stash (a view of) the original buffer into |rawQueryResult| so
    //    the `case TPM_QUERY_STREAMING` below can take it.
    perfetto.protos.QueryResult.decode =
        (reader: protobuf.Reader, length: number) => {
          const res =
              perfetto.protos.QueryResult.create() as {} as QueryResultBypass;
          res.rawQueryResult =
              reader.buf.subarray(reader.pos, reader.pos + length);
          // All this works only if protobufjs returns the original ArrayBuffer
          // from |rpcMsgEncoded|. It should always be the case given the
          // current implementation. This check mainly guards against future
          // behavioral changes of protobufjs. We don't want to accidentally
          // hold onto some internal protobufjs buffer. We are fine holding
          // onto |rpcMsgEncoded| because those come from ProtoRingBuffer which
          // is buffer-retention-friendly.
          assertTrue(res.rawQueryResult.buffer === rpcMsgEncoded.buffer);
          reader.pos += length;
          return res as {} as perfetto.protos.QueryResult;
        };

    const rpc = TraceProcessorRpc.decode(rpcMsgEncoded);

    if (rpc.fatalError !== undefined && rpc.fatalError.length > 0) {
      throw new Error(`${rpc.fatalError}`);
    }

    // Allow restarting sequences from zero (when reloading the browser).
    if (rpc.seq !== this.rxSeqId + 1 && this.rxSeqId !== 0 && rpc.seq !== 0) {
      // "(ERR:rpc_seq)" is intercepted by error_dialog.ts to show a more
      // graceful and actionable error.
      throw new Error(`RPC sequence id mismatch cur=${rpc.seq} last=${
          this.rxSeqId} (ERR:rpc_seq)`);
    }

    this.rxSeqId = rpc.seq;

    let isFinalResponse = true;

    switch (rpc.response) {
      case TPM.TPM_APPEND_TRACE_DATA:
        const appendResult = assertExists(rpc.appendResult);
        const pendingPromise = assertExists(this.pendingParses.shift());
        if (appendResult.error && appendResult.error.length > 0) {
          pendingPromise.reject(appendResult.error);
        } else {
          pendingPromise.resolve();
        }
        break;
      case TPM.TPM_FINALIZE_TRACE_DATA:
        assertExists(this.pendingEOFs.shift()).resolve();
        break;
      case TPM.TPM_RESET_TRACE_PROCESSOR:
        assertExists(this.pendingResetTraceProcessors.shift()).resolve();
        break;
      case TPM.TPM_RESTORE_INITIAL_TABLES:
        assertExists(this.pendingRestoreTables.shift()).resolve();
        break;
      case TPM.TPM_QUERY_STREAMING:
        const qRes = assertExists(rpc.queryResult) as {} as QueryResultBypass;
        const pendingQuery = assertExists(this.pendingQueries[0]);
        pendingQuery.appendResultBatch(qRes.rawQueryResult);
        if (pendingQuery.isComplete()) {
          this.pendingQueries.shift();
        } else {
          isFinalResponse = false;
        }
        break;
      case TPM.TPM_COMPUTE_METRIC:
        const metricRes = assertExists(rpc.metricResult) as ComputeMetricResult;
        const pendingComputeMetric =
            assertExists(this.pendingComputeMetrics.shift());
        if (metricRes.error && metricRes.error.length > 0) {
          const error =
              new QueryError(`ComputeMetric() error: ${metricRes.error}`, {
                query: 'COMPUTE_METRIC',
              });
          pendingComputeMetric.reject(error);
        } else {
          pendingComputeMetric.resolve(metricRes);
        }
        break;
      case TPM.TPM_DISABLE_AND_READ_METATRACE:
        const metatraceRes =
            assertExists(rpc.metatrace) as DisableAndReadMetatraceResult;
        assertExists(this.pendingReadMetatrace).resolve(metatraceRes);
        this.pendingReadMetatrace = undefined;
        break;
      default:
        console.log(
            'Unexpected TraceProcessor response received: ', rpc.response);
        break;
    }  // switch (rpc.response)

    if (isFinalResponse) {
      this.loadingTracker.endLoading();
    }
  }

  // TraceProcessor methods below this point.
  // The methods below are called by the various controllers in the UI and
  // deal with marshalling / unmarshalling requests to/from TraceProcessor.


  // Push trace data into the engine. The engine is supposed to automatically
  // figure out the type of the trace (JSON vs Protobuf).
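  //
  // Example usage (illustrative sketch; `fetchTraceChunks()` is a hypothetical
  // source of Uint8Array chunks, not part of this codebase):
  //   for (const chunk of await fetchTraceChunks()) {
  //     await engine.parse(chunk);
  //   }
  //   await engine.notifyEof();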
  parse(data: Uint8Array): Promise<void> {
    const asyncRes = defer<void>();
    this.pendingParses.push(asyncRes);
    const rpc = TraceProcessorRpc.create();
    rpc.request = TPM.TPM_APPEND_TRACE_DATA;
    rpc.appendTraceData = data;
    this.rpcSendRequest(rpc);
    return asyncRes;  // Linearize with the worker.
  }

  // Notify the engine that we reached the end of the trace.
  // Called after the last parse() call.
  notifyEof(): Promise<void> {
    const asyncRes = defer<void>();
    this.pendingEOFs.push(asyncRes);
    const rpc = TraceProcessorRpc.create();
    rpc.request = TPM.TPM_FINALIZE_TRACE_DATA;
    this.rpcSendRequest(rpc);
    return asyncRes;  // Linearize with the worker.
  }

  // Updates the TraceProcessor config. This method creates a new
  // TraceProcessor instance, so it should be called before passing any trace
  // data.
  resetTraceProcessor(
      {cropTrackEvents, ingestFtraceInRawTable, analyzeTraceProtoContent}:
          TraceProcessorConfig): Promise<void> {
    const asyncRes = defer<void>();
    this.pendingResetTraceProcessors.push(asyncRes);
    const rpc = TraceProcessorRpc.create();
    rpc.request = TPM.TPM_RESET_TRACE_PROCESSOR;
    const args = rpc.resetTraceProcessorArgs = new ResetTraceProcessorArgs();
    args.dropTrackEventDataBefore = cropTrackEvents ?
        ResetTraceProcessorArgs.DropTrackEventDataBefore
            .TRACK_EVENT_RANGE_OF_INTEREST :
        ResetTraceProcessorArgs.DropTrackEventDataBefore.NO_DROP;
    args.ingestFtraceInRawTable = ingestFtraceInRawTable;
    args.analyzeTraceProtoContent = analyzeTraceProtoContent;
    this.rpcSendRequest(rpc);
    return asyncRes;
  }

  // Resets the trace processor state by destroying any tables/views created by
  // the UI after loading.
  restoreInitialTables(): Promise<void> {
    const asyncRes = defer<void>();
    this.pendingRestoreTables.push(asyncRes);
    const rpc = TraceProcessorRpc.create();
    rpc.request = TPM.TPM_RESTORE_INITIAL_TABLES;
    this.rpcSendRequest(rpc);
    return asyncRes;  // Linearize with the worker.
  }

  // Shorthand for sending a compute metrics request to the engine.
  async computeMetric(metrics: string[]): Promise<ComputeMetricResult> {
    const asyncRes = defer<ComputeMetricResult>();
    this.pendingComputeMetrics.push(asyncRes);
    const rpc = TraceProcessorRpc.create();
    rpc.request = TPM.TPM_COMPUTE_METRIC;
    const args = rpc.computeMetricArgs = new ComputeMetricArgs();
    args.metricNames = metrics;
    args.format = ComputeMetricArgs.ResultFormat.TEXTPROTO;
    this.rpcSendRequest(rpc);
    return asyncRes;
  }

  // Issues a streaming query and retrieves results in batches.
  // The returned QueryResult object will be populated over time with batches
  // of rows (each batch conveys ~128KB of data and a variable number of rows).
  // The caller can decide whether to wait until all batches have been received
  // (by awaiting the returned object or calling result.waitAllRows()) or handle
  // the rows incrementally.
  //
  // Example usage:
  //    const res = engine.query('SELECT foo, bar FROM table');
  //    console.log(res.numRows());  // Will print 0 because we didn't await.
  //    await res.waitAllRows();
  //    console.log(res.numRows());  // Will print the total number of rows.
  //
  //    for (const it = res.iter({foo: NUM, bar: STR}); it.valid(); it.next()) {
  //      console.log(it.foo, it.bar);
  //    }
  //
  // Optional |tag| (usually a component name) can be provided to allow
  // attributing trace processor workload to different UI components.
  query(sqlQuery: string, tag?: string): Promise<QueryResult>&QueryResult {
    const rpc = TraceProcessorRpc.create();
    rpc.request = TPM.TPM_QUERY_STREAMING;
    rpc.queryArgs = new QueryArgs();
    rpc.queryArgs.sqlQuery = sqlQuery;
    if (tag) {
      rpc.queryArgs.tag = tag;
    }
    const result = createQueryResult({
      query: sqlQuery,
    });
    this.pendingQueries.push(result);
    this.rpcSendRequest(rpc);
    return result;
  }

  isMetatracingEnabled(): boolean {
    return this._isMetatracingEnabled;
  }

  enableMetatrace(categories?: perfetto.protos.MetatraceCategories) {
    const rpc = TraceProcessorRpc.create();
    rpc.request = TPM.TPM_ENABLE_METATRACE;
    if (categories) {
      rpc.enableMetatraceArgs = new perfetto.protos.EnableMetatraceArgs();
      rpc.enableMetatraceArgs.categories = categories;
    }
    this._isMetatracingEnabled = true;
    this.rpcSendRequest(rpc);
  }

  stopAndGetMetatrace(): Promise<DisableAndReadMetatraceResult> {
    // If we are already finalising a metatrace, ignore the request.
    if (this.pendingReadMetatrace) {
      return Promise.reject(new Error('Already finalising a metatrace'));
    }

    const result = defer<DisableAndReadMetatraceResult>();

    const rpc = TraceProcessorRpc.create();
    rpc.request = TPM.TPM_DISABLE_AND_READ_METATRACE;
    this._isMetatracingEnabled = false;
    this.pendingReadMetatrace = result;
    this.rpcSendRequest(rpc);
    return result;
  }

  // Marshals the TraceProcessorRpc request arguments and sends the request
  // to the concrete Engine (Wasm or HTTP).
  private rpcSendRequest(rpc: TraceProcessorRpc) {
    rpc.seq = this.txSeqId++;
    // Each message is wrapped in a TraceProcessorRpcStream to add the varint
    // preamble with the size, which allows tokenization on the other end.
    const outerProto = TraceProcessorRpcStream.create();
    outerProto.msg.push(rpc);
    const buf = TraceProcessorRpcStream.encode(outerProto).finish();
    this.loadingTracker.beginLoading();
    this.rpcSendRequestBytes(buf);
  }

  // TODO(hjd): When streaming must invalidate this somehow.
  async getCpus(): Promise<number[]> {
    if (!this._cpus) {
      const cpus = [];
      const queryRes = await this.query(
          'select distinct(cpu) as cpu from sched order by cpu;');
      for (const it = queryRes.iter({cpu: NUM}); it.valid(); it.next()) {
        cpus.push(it.cpu);
      }
      this._cpus = cpus;
    }
    return this._cpus;
  }

  async getNumberOfGpus(): Promise<number> {
    if (!this._numGpus) {
      const result = await this.query(`
        select count(distinct(gpu_id)) as gpuCount
        from gpu_counter_track
        where name = 'gpufreq';
      `);
      this._numGpus = result.firstRow({gpuCount: NUM}).gpuCount;
    }
    return this._numGpus;
  }

  // TODO: This should live in code that's more specific to chrome, instead of
  // in engine.
  async getNumberOfProcesses(): Promise<number> {
    const result = await this.query('select count(*) as cnt from process;');
    return result.firstRow({cnt: NUM}).cnt;
  }

  async getTraceTimeBounds(): Promise<Span<TPTime>> {
    const result = await this.query(
        `select start_ts as startTs, end_ts as endTs from trace_bounds`);
    const bounds = result.firstRow({
      startTs: LONG,
      endTs: LONG,
    });
    return new TPTimeSpan(bounds.startTs, bounds.endTs);
  }

  async getTracingMetadataTimeBounds(): Promise<Span<TPTime>> {
    const queryRes = await this.query(`select
         name,
         int_value as intValue
         from metadata
         where name = 'tracing_started_ns' or name = 'tracing_disabled_ns'
         or name = 'all_data_source_started_ns'`);
    let startBound = 0n;
    let endBound = BigintMath.INT64_MAX;
    const it = queryRes.iter({'name': STR, 'intValue': LONG_NULL});
    for (; it.valid(); it.next()) {
      const columnName = it.name;
      const timestamp = it.intValue;
      if (timestamp === null) continue;
      if (columnName === 'tracing_disabled_ns') {
        endBound = BigintMath.min(endBound, timestamp);
      } else {
        startBound = BigintMath.max(startBound, timestamp);
      }
    }

    return new TPTimeSpan(startBound, endBound);
  }

  getProxy(tag: string): EngineProxy {
    return new EngineProxy(this, tag);
  }
}

// Lightweight wrapper over Engine exposing only the `query` method and
// annotating all queries going through it with a tag.
export class EngineProxy {
  private engine: Engine;
  private tag: string;

  constructor(engine: Engine, tag: string) {
    this.engine = engine;
    this.tag = tag;
  }

  query(sqlQuery: string, tag?: string): Promise<QueryResult>&QueryResult {
    return this.engine.query(sqlQuery, tag || this.tag);
  }

  get engineId(): string {
    return this.engine.id;
  }
}
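
// Example usage of EngineProxy (illustrative sketch; assumes an existing
// Engine instance `engine`, an arbitrary 'CounterTrack' tag, and an enclosing
// async function):
//   const trackEngine = engine.getProxy('CounterTrack');
//   const res = await trackEngine.query('select count(*) as cnt from slice');
//   console.log(res.firstRow({cnt: NUM}).cnt);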