1// Copyright (C) 2018 The Android Open Source Project 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15import {defer, Deferred} from '../base/deferred'; 16import {assertExists, assertTrue} from '../base/logging'; 17import { 18 ComputeMetricArgs, 19 ComputeMetricResult, 20 DisableAndReadMetatraceResult, 21 EnableMetatraceArgs, 22 MetatraceCategories, 23 QueryArgs, 24 QueryResult as ProtoQueryResult, 25 ResetTraceProcessorArgs, 26 TraceProcessorRpc, 27 TraceProcessorRpcStream, 28} from '../protos'; 29 30import {ProtoRingBuffer} from './proto_ring_buffer'; 31import { 32 createQueryResult, 33 QueryError, 34 QueryResult, 35 WritableQueryResult, 36} from './query_result'; 37 38import TPM = TraceProcessorRpc.TraceProcessorMethod; 39import {Disposable} from '../base/disposable'; 40import {Result} from '../base/utils'; 41 42export interface LoadingTracker { 43 beginLoading(): void; 44 endLoading(): void; 45} 46 47export class NullLoadingTracker implements LoadingTracker { 48 beginLoading(): void {} 49 endLoading(): void {} 50} 51 52// This is used to skip the decoding of queryResult from protobufjs and deal 53// with it ourselves. See the comment below around `QueryResult.decode = ...`. 54interface QueryResultBypass { 55 rawQueryResult: Uint8Array; 56} 57 58export interface TraceProcessorConfig { 59 cropTrackEvents: boolean; 60 ingestFtraceInRawTable: boolean; 61 analyzeTraceProtoContent: boolean; 62 ftraceDropUntilAllCpusValid: boolean; 63} 64 65export interface Engine { 66 /** 67 * Execute a query against the database, returning a promise that resolves 68 * when the query has completed but rejected when the query fails for whatever 69 * reason. On success, the promise will only resolve once all the resulting 70 * rows have been received. 71 * 72 * The promise will be rejected if the query fails. 73 * 74 * @param sql The query to execute. 75 * @param tag An optional tag used to trace the origin of the query. 76 */ 77 query(sql: string, tag?: string): Promise<QueryResult>; 78 79 /** 80 * Execute a query against the database, returning a promise that resolves 81 * when the query has completed or failed. The promise will never get 82 * rejected, it will always successfully resolve. Use the returned wrapper 83 * object to determine whether the query completed successfully. 84 * 85 * The promise will only resolve once all the resulting rows have been 86 * received. 87 * 88 * @param sql The query to execute. 89 * @param tag An optional tag used to trace the origin of the query. 90 */ 91 tryQuery(sql: string, tag?: string): Promise<Result<QueryResult, Error>>; 92 93 /** 94 * Execute one or more metric and get the result. 95 * 96 * @param metrics The metrics to run. 97 * @param format The format of the response. 98 */ 99 computeMetric( 100 metrics: string[], 101 format: 'json' | 'prototext' | 'proto', 102 ): Promise<string | Uint8Array>; 103} 104 105// Abstract interface of a trace proccessor. 106// This is the TypeScript equivalent of src/trace_processor/rpc.h. 107// There are two concrete implementations: 108// 1. WasmEngineProxy: creates a Wasm module and interacts over postMessage(). 109// 2. HttpRpcEngine: connects to an external `trace_processor_shell --httpd`. 110// and interacts via fetch(). 111// In both cases, we have a byte-oriented pipe to interact with TraceProcessor. 112// The derived class is only expected to deal with these two functions: 113// 1. Implement the abstract rpcSendRequestBytes() function, sending the 114// proto-encoded TraceProcessorRpc requests to the TraceProcessor instance. 115// 2. Call onRpcResponseBytes() when response data is received. 116export abstract class EngineBase implements Engine { 117 abstract readonly id: string; 118 private loadingTracker: LoadingTracker; 119 private txSeqId = 0; 120 private rxSeqId = 0; 121 private rxBuf = new ProtoRingBuffer(); 122 private pendingParses = new Array<Deferred<void>>(); 123 private pendingEOFs = new Array<Deferred<void>>(); 124 private pendingResetTraceProcessors = new Array<Deferred<void>>(); 125 private pendingQueries = new Array<WritableQueryResult>(); 126 private pendingRestoreTables = new Array<Deferred<void>>(); 127 private pendingComputeMetrics = new Array<Deferred<string | Uint8Array>>(); 128 private pendingReadMetatrace?: Deferred<DisableAndReadMetatraceResult>; 129 private _isMetatracingEnabled = false; 130 131 constructor(tracker?: LoadingTracker) { 132 this.loadingTracker = tracker ? tracker : new NullLoadingTracker(); 133 } 134 135 // Called to send data to the TraceProcessor instance. This turns into a 136 // postMessage() or a HTTP request, depending on the Engine implementation. 137 abstract rpcSendRequestBytes(data: Uint8Array): void; 138 139 // Called when an inbound message is received by the Engine implementation 140 // (e.g. onmessage for the Wasm case, on when HTTP replies are received for 141 // the HTTP+RPC case). 142 onRpcResponseBytes(dataWillBeRetained: Uint8Array) { 143 // Note: when hitting the fastpath inside ProtoRingBuffer, the |data| buffer 144 // is returned back by readMessage() (% subarray()-ing it) and held onto by 145 // other classes (e.g., QueryResult). For both fetch() and Wasm we are fine 146 // because every response creates a new buffer. 147 this.rxBuf.append(dataWillBeRetained); 148 for (;;) { 149 const msg = this.rxBuf.readMessage(); 150 if (msg === undefined) break; 151 this.onRpcResponseMessage(msg); 152 } 153 } 154 155 // Parses a response message. 156 // |rpcMsgEncoded| is a sub-array to to the start of a TraceProcessorRpc 157 // proto-encoded message (without the proto preamble and varint size). 158 private onRpcResponseMessage(rpcMsgEncoded: Uint8Array) { 159 // Here we override the protobufjs-generated code to skip the parsing of the 160 // new streaming QueryResult and instead passing it through like a buffer. 161 // This is the overall problem: All trace processor responses are wrapped 162 // into a perfetto.protos.TraceProcessorRpc proto message. In all cases % 163 // TPM_QUERY_STREAMING, we want protobufjs to decode the proto bytes and 164 // give us a structured object. In the case of TPM_QUERY_STREAMING, instead, 165 // we want to deal with the proto parsing ourselves using the new 166 // QueryResult.appendResultBatch() method, because that handled streaming 167 // results more efficiently and skips several copies. 168 // By overriding the decode method below, we achieve two things: 169 // 1. We avoid protobufjs decoding the TraceProcessorRpc.query_result field. 170 // 2. We stash (a view of) the original buffer into the |rawQueryResult| so 171 // the `case TPM_QUERY_STREAMING` below can take it. 172 ProtoQueryResult.decode = (reader: protobuf.Reader, length: number) => { 173 const res = ProtoQueryResult.create() as {} as QueryResultBypass; 174 res.rawQueryResult = reader.buf.subarray(reader.pos, reader.pos + length); 175 // All this works only if protobufjs returns the original ArrayBuffer 176 // from |rpcMsgEncoded|. It should be always the case given the 177 // current implementation. This check mainly guards against future 178 // behavioral changes of protobufjs. We don't want to accidentally 179 // hold onto some internal protobufjs buffer. We are fine holding 180 // onto |rpcMsgEncoded| because those come from ProtoRingBuffer which 181 // is buffer-retention-friendly. 182 assertTrue(res.rawQueryResult.buffer === rpcMsgEncoded.buffer); 183 reader.pos += length; 184 return res as {} as ProtoQueryResult; 185 }; 186 187 const rpc = TraceProcessorRpc.decode(rpcMsgEncoded); 188 189 if (rpc.fatalError !== undefined && rpc.fatalError.length > 0) { 190 throw new Error(`${rpc.fatalError}`); 191 } 192 193 // Allow restarting sequences from zero (when reloading the browser). 194 if (rpc.seq !== this.rxSeqId + 1 && this.rxSeqId !== 0 && rpc.seq !== 0) { 195 // "(ERR:rpc_seq)" is intercepted by error_dialog.ts to show a more 196 // graceful and actionable error. 197 throw new Error( 198 `RPC sequence id mismatch cur=${rpc.seq} last=${this.rxSeqId} (ERR:rpc_seq)`, 199 ); 200 } 201 202 this.rxSeqId = rpc.seq; 203 204 let isFinalResponse = true; 205 206 switch (rpc.response) { 207 case TPM.TPM_APPEND_TRACE_DATA: 208 const appendResult = assertExists(rpc.appendResult); 209 const pendingPromise = assertExists(this.pendingParses.shift()); 210 if (appendResult.error && appendResult.error.length > 0) { 211 pendingPromise.reject(appendResult.error); 212 } else { 213 pendingPromise.resolve(); 214 } 215 break; 216 case TPM.TPM_FINALIZE_TRACE_DATA: 217 assertExists(this.pendingEOFs.shift()).resolve(); 218 break; 219 case TPM.TPM_RESET_TRACE_PROCESSOR: 220 assertExists(this.pendingResetTraceProcessors.shift()).resolve(); 221 break; 222 case TPM.TPM_RESTORE_INITIAL_TABLES: 223 assertExists(this.pendingRestoreTables.shift()).resolve(); 224 break; 225 case TPM.TPM_QUERY_STREAMING: 226 const qRes = assertExists(rpc.queryResult) as {} as QueryResultBypass; 227 const pendingQuery = assertExists(this.pendingQueries[0]); 228 pendingQuery.appendResultBatch(qRes.rawQueryResult); 229 if (pendingQuery.isComplete()) { 230 this.pendingQueries.shift(); 231 } else { 232 isFinalResponse = false; 233 } 234 break; 235 case TPM.TPM_COMPUTE_METRIC: 236 const metricRes = assertExists(rpc.metricResult) as ComputeMetricResult; 237 const pendingComputeMetric = assertExists( 238 this.pendingComputeMetrics.shift(), 239 ); 240 if (metricRes.error && metricRes.error.length > 0) { 241 const error = new QueryError( 242 `ComputeMetric() error: ${metricRes.error}`, 243 { 244 query: 'COMPUTE_METRIC', 245 }, 246 ); 247 pendingComputeMetric.reject(error); 248 } else { 249 const result = 250 metricRes.metricsAsPrototext || 251 metricRes.metricsAsJson || 252 metricRes.metrics || 253 ''; 254 pendingComputeMetric.resolve(result); 255 } 256 break; 257 case TPM.TPM_DISABLE_AND_READ_METATRACE: 258 const metatraceRes = assertExists( 259 rpc.metatrace, 260 ) as DisableAndReadMetatraceResult; 261 assertExists(this.pendingReadMetatrace).resolve(metatraceRes); 262 this.pendingReadMetatrace = undefined; 263 break; 264 default: 265 console.log( 266 'Unexpected TraceProcessor response received: ', 267 rpc.response, 268 ); 269 break; 270 } // switch(rpc.response); 271 272 if (isFinalResponse) { 273 this.loadingTracker.endLoading(); 274 } 275 } 276 277 // TraceProcessor methods below this point. 278 // The methods below are called by the various controllers in the UI and 279 // deal with marshalling / unmarshaling requests to/from TraceProcessor. 280 281 // Push trace data into the engine. The engine is supposed to automatically 282 // figure out the type of the trace (JSON vs Protobuf). 283 parse(data: Uint8Array): Promise<void> { 284 const asyncRes = defer<void>(); 285 this.pendingParses.push(asyncRes); 286 const rpc = TraceProcessorRpc.create(); 287 rpc.request = TPM.TPM_APPEND_TRACE_DATA; 288 rpc.appendTraceData = data; 289 this.rpcSendRequest(rpc); 290 return asyncRes; // Linearize with the worker. 291 } 292 293 // Notify the engine that we reached the end of the trace. 294 // Called after the last parse() call. 295 notifyEof(): Promise<void> { 296 const asyncRes = defer<void>(); 297 this.pendingEOFs.push(asyncRes); 298 const rpc = TraceProcessorRpc.create(); 299 rpc.request = TPM.TPM_FINALIZE_TRACE_DATA; 300 this.rpcSendRequest(rpc); 301 return asyncRes; // Linearize with the worker. 302 } 303 304 // Updates the TraceProcessor Config. This method creates a new 305 // TraceProcessor instance, so it should be called before passing any trace 306 // data. 307 resetTraceProcessor({ 308 cropTrackEvents, 309 ingestFtraceInRawTable, 310 analyzeTraceProtoContent, 311 ftraceDropUntilAllCpusValid, 312 }: TraceProcessorConfig): Promise<void> { 313 const asyncRes = defer<void>(); 314 this.pendingResetTraceProcessors.push(asyncRes); 315 const rpc = TraceProcessorRpc.create(); 316 rpc.request = TPM.TPM_RESET_TRACE_PROCESSOR; 317 const args = (rpc.resetTraceProcessorArgs = new ResetTraceProcessorArgs()); 318 args.dropTrackEventDataBefore = cropTrackEvents 319 ? ResetTraceProcessorArgs.DropTrackEventDataBefore 320 .TRACK_EVENT_RANGE_OF_INTEREST 321 : ResetTraceProcessorArgs.DropTrackEventDataBefore.NO_DROP; 322 args.ingestFtraceInRawTable = ingestFtraceInRawTable; 323 args.analyzeTraceProtoContent = analyzeTraceProtoContent; 324 args.ftraceDropUntilAllCpusValid = ftraceDropUntilAllCpusValid; 325 this.rpcSendRequest(rpc); 326 return asyncRes; 327 } 328 329 // Resets the trace processor state by destroying any table/views created by 330 // the UI after loading. 331 restoreInitialTables(): Promise<void> { 332 const asyncRes = defer<void>(); 333 this.pendingRestoreTables.push(asyncRes); 334 const rpc = TraceProcessorRpc.create(); 335 rpc.request = TPM.TPM_RESTORE_INITIAL_TABLES; 336 this.rpcSendRequest(rpc); 337 return asyncRes; // Linearize with the worker. 338 } 339 340 // Shorthand for sending a compute metrics request to the engine. 341 async computeMetric( 342 metrics: string[], 343 format: 'json' | 'prototext' | 'proto', 344 ): Promise<string | Uint8Array> { 345 const asyncRes = defer<string | Uint8Array>(); 346 this.pendingComputeMetrics.push(asyncRes); 347 const rpc = TraceProcessorRpc.create(); 348 rpc.request = TPM.TPM_COMPUTE_METRIC; 349 const args = (rpc.computeMetricArgs = new ComputeMetricArgs()); 350 args.metricNames = metrics; 351 if (format === 'json') { 352 args.format = ComputeMetricArgs.ResultFormat.JSON; 353 } else if (format === 'prototext') { 354 args.format = ComputeMetricArgs.ResultFormat.TEXTPROTO; 355 } else if (format === 'proto') { 356 args.format = ComputeMetricArgs.ResultFormat.BINARY_PROTOBUF; 357 } else { 358 throw new Error(`Unknown compute metric format ${format}`); 359 } 360 this.rpcSendRequest(rpc); 361 return asyncRes; 362 } 363 364 // Issues a streaming query and retrieve results in batches. 365 // The returned QueryResult object will be populated over time with batches 366 // of rows (each batch conveys ~128KB of data and a variable number of rows). 367 // The caller can decide whether to wait that all batches have been received 368 // (by awaiting the returned object or calling result.waitAllRows()) or handle 369 // the rows incrementally. 370 // 371 // Example usage: 372 // const res = engine.execute('SELECT foo, bar FROM table'); 373 // console.log(res.numRows()); // Will print 0 because we didn't await. 374 // await(res.waitAllRows()); 375 // console.log(res.numRows()); // Will print the total number of rows. 376 // 377 // for (const it = res.iter({foo: NUM, bar:STR}); it.valid(); it.next()) { 378 // console.log(it.foo, it.bar); 379 // } 380 // 381 // Optional |tag| (usually a component name) can be provided to allow 382 // attributing trace processor workload to different UI components. 383 private streamingQuery( 384 sqlQuery: string, 385 tag?: string, 386 ): Promise<QueryResult> & QueryResult { 387 const rpc = TraceProcessorRpc.create(); 388 rpc.request = TPM.TPM_QUERY_STREAMING; 389 rpc.queryArgs = new QueryArgs(); 390 rpc.queryArgs.sqlQuery = sqlQuery; 391 if (tag) { 392 rpc.queryArgs.tag = tag; 393 } 394 const result = createQueryResult({ 395 query: sqlQuery, 396 }); 397 this.pendingQueries.push(result); 398 this.rpcSendRequest(rpc); 399 return result; 400 } 401 402 // Wraps .streamingQuery(), captures errors and re-throws with current stack. 403 // 404 // Note: This function is less flexible than .execute() as it only returns a 405 // promise which must be unwrapped before the QueryResult may be accessed. 406 async query(sqlQuery: string, tag?: string): Promise<QueryResult> { 407 try { 408 return await this.streamingQuery(sqlQuery, tag); 409 } catch (e) { 410 // Replace the error's stack trace with the one from here 411 // Note: It seems only V8 can trace the stack up the promise chain, so its 412 // likely this stack won't be useful on !V8. 413 // See 414 // https://docs.google.com/document/d/13Sy_kBIJGP0XT34V1CV3nkWya4TwYx9L3Yv45LdGB6Q 415 captureStackTrace(e); 416 throw e; 417 } 418 } 419 420 async tryQuery( 421 sql: string, 422 tag?: string, 423 ): Promise<Result<QueryResult, Error>> { 424 try { 425 const result = await this.query(sql, tag); 426 return {success: true, result}; 427 } catch (error: unknown) { 428 // We know we only throw Error type objects so we can type assert safely 429 return {success: false, error: error as Error}; 430 } 431 } 432 433 isMetatracingEnabled(): boolean { 434 return this._isMetatracingEnabled; 435 } 436 437 enableMetatrace(categories?: MetatraceCategories) { 438 const rpc = TraceProcessorRpc.create(); 439 rpc.request = TPM.TPM_ENABLE_METATRACE; 440 if (categories) { 441 rpc.enableMetatraceArgs = new EnableMetatraceArgs(); 442 rpc.enableMetatraceArgs.categories = categories; 443 } 444 this._isMetatracingEnabled = true; 445 this.rpcSendRequest(rpc); 446 } 447 448 stopAndGetMetatrace(): Promise<DisableAndReadMetatraceResult> { 449 // If we are already finalising a metatrace, ignore the request. 450 if (this.pendingReadMetatrace) { 451 return Promise.reject(new Error('Already finalising a metatrace')); 452 } 453 454 const result = defer<DisableAndReadMetatraceResult>(); 455 456 const rpc = TraceProcessorRpc.create(); 457 rpc.request = TPM.TPM_DISABLE_AND_READ_METATRACE; 458 this._isMetatracingEnabled = false; 459 this.pendingReadMetatrace = result; 460 this.rpcSendRequest(rpc); 461 return result; 462 } 463 464 // Marshals the TraceProcessorRpc request arguments and sends the request 465 // to the concrete Engine (Wasm or HTTP). 466 private rpcSendRequest(rpc: TraceProcessorRpc) { 467 rpc.seq = this.txSeqId++; 468 // Each message is wrapped in a TraceProcessorRpcStream to add the varint 469 // preamble with the size, which allows tokenization on the other end. 470 const outerProto = TraceProcessorRpcStream.create(); 471 outerProto.msg.push(rpc); 472 const buf = TraceProcessorRpcStream.encode(outerProto).finish(); 473 this.loadingTracker.beginLoading(); 474 this.rpcSendRequestBytes(buf); 475 } 476 477 getProxy(tag: string): EngineProxy { 478 return new EngineProxy(this, tag); 479 } 480} 481 482// Lightweight engine proxy which annotates all queries with a tag 483export class EngineProxy implements Engine, Disposable { 484 private engine: EngineBase; 485 private tag: string; 486 private _isAlive: boolean; 487 488 constructor(engine: EngineBase, tag: string) { 489 this.engine = engine; 490 this.tag = tag; 491 this._isAlive = true; 492 } 493 494 async query(query: string, tag?: string): Promise<QueryResult> { 495 if (!this._isAlive) { 496 throw new Error(`EngineProxy ${this.tag} was disposed.`); 497 } 498 return await this.engine.query(query, tag); 499 } 500 501 async tryQuery( 502 query: string, 503 tag?: string, 504 ): Promise<Result<QueryResult, Error>> { 505 if (!this._isAlive) { 506 return { 507 success: false, 508 error: new Error(`EngineProxy ${this.tag} was disposed.`), 509 }; 510 } 511 return await this.engine.tryQuery(query, tag); 512 } 513 514 async computeMetric( 515 metrics: string[], 516 format: 'json' | 'prototext' | 'proto', 517 ): Promise<string | Uint8Array> { 518 if (!this._isAlive) { 519 return Promise.reject(new Error(`EngineProxy ${this.tag} was disposed.`)); 520 } 521 return this.engine.computeMetric(metrics, format); 522 } 523 524 get engineId(): string { 525 return this.engine.id; 526 } 527 528 dispose() { 529 this._isAlive = false; 530 } 531} 532 533// Capture stack trace and attach to the given error object 534function captureStackTrace(e: Error): void { 535 const stack = new Error().stack; 536 if ('captureStackTrace' in Error) { 537 // V8 specific 538 Error.captureStackTrace(e, captureStackTrace); 539 } else { 540 // Generic 541 Object.defineProperty(e, 'stack', { 542 value: stack, 543 writable: true, 544 configurable: true, 545 }); 546 } 547} 548