// Copyright (C) 2018 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

import protos from '../protos';
import {defer, Deferred} from '../base/deferred';
import {assertExists, assertTrue} from '../base/logging';
import {ProtoRingBuffer} from './proto_ring_buffer';
import {
  createQueryResult,
  QueryError,
  QueryResult,
  WritableQueryResult,
} from './query_result';
import TPM = protos.TraceProcessorRpc.TraceProcessorMethod;
import {exists} from '../base/utils';
import {errResult, okResult, Result} from '../base/result';

// The kind of transport an engine instance uses to reach TraceProcessor.
export type EngineMode = 'WASM' | 'HTTP_RPC';

// Engine selection policy. NOTE(review): not referenced in the visible part of
// this file; presumably consumed by the code that instantiates engines.
export type NewEngineMode = 'USE_HTTP_RPC_IF_AVAILABLE' | 'FORCE_BUILTIN_WASM';

// This is used to skip the decoding of queryResult from protobufjs and deal
// with it ourselves. See the comment below around `QueryResult.decode = ...`.
interface QueryResultBypass {
  // View over the still-encoded QueryResult proto bytes, stashed by the
  // overridden `QueryResult.decode` instead of a decoded message.
  rawQueryResult: Uint8Array;
}

// Options forwarded to TraceProcessor by resetTraceProcessor().
export interface TraceProcessorConfig {
  cropTrackEvents: boolean;
  ingestFtraceInRawTable: boolean;
  analyzeTraceProtoContent: boolean;
  ftraceDropUntilAllCpusValid: boolean;
}

// Maximum number of entries retained in the query log; older entries are
// dropped first.
const QUERY_LOG_BUFFER_SIZE = 100;

// One entry of the query log. |endTime| and |success| are only set once the
// query has finished.
interface QueryLog {
  readonly tag?: string;
  readonly query: string;
  readonly startTime: number;
  readonly endTime?: number;
  readonly success?: boolean;
}

export interface Engine {
  readonly mode: EngineMode;
  readonly engineId: string;

  /**
   * A list of the most recent queries along with their start times, end times
   * and success status (if completed).
   */
  readonly queryLog: ReadonlyArray<QueryLog>;

  /**
   * Execute a query against the database, returning a promise that resolves
   * when the query has completed and is rejected if the query fails for
   * whatever reason. On success, the promise will only resolve once all the
   * resulting rows have been received.
   *
   * @param sql The query to execute.
   * @param tag An optional tag used to trace the origin of the query.
   */
  query(sql: string, tag?: string): Promise<QueryResult>;

  /**
   * Execute a query against the database, returning a promise that resolves
   * when the query has completed or failed. The promise will never get
   * rejected, it will always successfully resolve. Use the returned wrapper
   * object to determine whether the query completed successfully.
   *
   * The promise will only resolve once all the resulting rows have been
   * received.
   *
   * @param sql The query to execute.
   * @param tag An optional tag used to trace the origin of the query.
   */
  tryQuery(sql: string, tag?: string): Promise<Result<QueryResult>>;

  /**
   * Execute one or more metrics and get the result.
   *
   * @param metrics The metrics to run.
   * @param format The format of the response.
   */
  computeMetric(
    metrics: string[],
    format: 'json' | 'prototext' | 'proto',
  ): Promise<string | Uint8Array>;

  enableMetatrace(categories?: protos.MetatraceCategories): void;
  stopAndGetMetatrace(): Promise<protos.DisableAndReadMetatraceResult>;

  analyzeStructuredQuery(
    structuredQueries: protos.PerfettoSqlStructuredQuery[],
  ): Promise<protos.AnalyzeStructuredQueryResult>;

  getProxy(tag: string): EngineProxy;
  readonly numRequestsPending: number;
  readonly failed: string | undefined;
}

// Abstract interface of a trace processor.
// This is the TypeScript equivalent of src/trace_processor/rpc.h.
// There are two concrete implementations:
//   1. WasmEngineProxy: creates a Wasm module and interacts over postMessage().
//   2. HttpRpcEngine: connects to an external `trace_processor_shell --httpd`.
//      and interacts via fetch().
// In both cases, we have a byte-oriented pipe to interact with TraceProcessor.
// The derived class is only expected to deal with these two functions:
// 1. Implement the abstract rpcSendRequestBytes() function, sending the
//    proto-encoded TraceProcessorRpc requests to the TraceProcessor instance.
// 2. Call onRpcResponseBytes() when response data is received.
export abstract class EngineBase implements Engine, Disposable {
  abstract readonly id: string;
  abstract readonly mode: EngineMode;

  // Sequence id stamped on the next outgoing request / seen on the last
  // received response.
  private txSeqId = 0;
  private rxSeqId = 0;

  // Re-assembles the inbound byte stream into whole TraceProcessorRpc messages.
  private rxBuf = new ProtoRingBuffer();

  // FIFO queues of in-flight requests, one per request kind. An entry is
  // pushed when the request is sent and shifted when its response arrives, so
  // responses are matched to requests purely by arrival order.
  private pendingParses = new Array<Deferred<void>>();
  private pendingEOFs = new Array<Deferred<void>>();
  private pendingResetTraceProcessors = new Array<Deferred<void>>();
  private pendingQueries = new Array<WritableQueryResult>();
  private pendingRestoreTables = new Array<Deferred<void>>();
  private pendingComputeMetrics = new Array<Deferred<string | Uint8Array>>();

  // Single-slot requests: at most one of each can be in flight at a time. The
  // slot is set when the request is sent and cleared when its response lands.
  private pendingReadMetatrace?: Deferred<protos.DisableAndReadMetatraceResult>;
  private pendingRegisterSqlPackage?: Deferred<void>;
  private pendingAnalyzeStructuredQueries?: Deferred<protos.AnalyzeStructuredQueryResult>;

  private _isMetatracingEnabled = false;
  private _numRequestsPending = 0;
  private _failed: string | undefined = undefined;
  private _queryLog: Array<QueryLog> = [];

  get queryLog(): ReadonlyArray<QueryLog> {
    return this._queryLog;
  }

  // TraceController sets this to raf.scheduleFullRedraw().
  onResponseReceived?: () => void;

  // Called to send data to the TraceProcessor instance. This turns into a
  // postMessage() or a HTTP request, depending on the Engine implementation.
  abstract rpcSendRequestBytes(data: Uint8Array): void;

  // Called when an inbound message is received by the Engine implementation
  // (e.g. onmessage for the Wasm case, on when HTTP replies are received for
  // the HTTP+RPC case).
  onRpcResponseBytes(dataWillBeRetained: Uint8Array) {
    // Note: when hitting the fastpath inside ProtoRingBuffer, the |data| buffer
    // is returned back by readMessage() (% subarray()-ing it) and held onto by
    // other classes (e.g., QueryResult). For both fetch() and Wasm we are fine
    // because every response creates a new buffer.
    this.rxBuf.append(dataWillBeRetained);
    for (;;) {
      const msg = this.rxBuf.readMessage();
      if (msg === undefined) break;
      this.onRpcResponseMessage(msg);
    }
  }

  // Parses a response message.
  // |rpcMsgEncoded| is a sub-array to the start of a TraceProcessorRpc
  // proto-encoded message (without the proto preamble and varint size).
  private onRpcResponseMessage(rpcMsgEncoded: Uint8Array) {
    // Here we override the protobufjs-generated code to skip the parsing of the
    // new streaming QueryResult and instead passing it through like a buffer.
    // This is the overall problem: All trace processor responses are wrapped
    // into a TraceProcessorRpc proto message. In all cases %
    // TPM_QUERY_STREAMING, we want protobufjs to decode the proto bytes and
    // give us a structured object. In the case of TPM_QUERY_STREAMING, instead,
    // we want to deal with the proto parsing ourselves using the new
    // QueryResult.appendResultBatch() method, because that handles streaming
    // results more efficiently and skips several copies.
    // By overriding the decode method below, we achieve two things:
    // 1. We avoid protobufjs decoding the TraceProcessorRpc.query_result field.
    // 2. We stash (a view of) the original buffer into the |rawQueryResult| so
    //    the `case TPM_QUERY_STREAMING` below can take it.
    protos.QueryResult.decode = (reader: protobuf.Reader, length: number) => {
      const res = protos.QueryResult.create() as {} as QueryResultBypass;
      res.rawQueryResult = reader.buf.subarray(reader.pos, reader.pos + length);
      // All this works only if protobufjs returns the original ArrayBuffer
      // from |rpcMsgEncoded|. It should be always the case given the
      // current implementation. This check mainly guards against future
      // behavioral changes of protobufjs. We don't want to accidentally
      // hold onto some internal protobufjs buffer. We are fine holding
      // onto |rpcMsgEncoded| because those come from ProtoRingBuffer which
      // is buffer-retention-friendly.
      assertTrue(res.rawQueryResult.buffer === rpcMsgEncoded.buffer);
      reader.pos += length;
      return res as {} as protos.QueryResult;
    };

    const rpc = protos.TraceProcessorRpc.decode(rpcMsgEncoded);

    if (rpc.fatalError !== undefined && rpc.fatalError.length > 0) {
      this.fail(`${rpc.fatalError}`);
    }

    // Allow restarting sequences from zero (when reloading the browser).
    if (rpc.seq !== this.rxSeqId + 1 && this.rxSeqId !== 0 && rpc.seq !== 0) {
      // "(ERR:rpc_seq)" is intercepted by error_dialog.ts to show a more
      // graceful and actionable error.
      this.fail(
        `RPC sequence id mismatch ` +
          `cur=${rpc.seq} last=${this.rxSeqId} (ERR:rpc_seq)`,
      );
    }

    this.rxSeqId = rpc.seq;

    // Only streaming queries can span several responses; every other response
    // completes its request.
    let isFinalResponse = true;

    switch (rpc.response) {
      case TPM.TPM_APPEND_TRACE_DATA: {
        const appendResult = assertExists(rpc.appendResult);
        const pendingPromise = assertExists(this.pendingParses.shift());
        if (exists(appendResult.error) && appendResult.error.length > 0) {
          pendingPromise.reject(appendResult.error);
        } else {
          pendingPromise.resolve();
        }
        break;
      }
      case TPM.TPM_FINALIZE_TRACE_DATA: {
        const finalizeResult = assertExists(rpc.finalizeDataResult);
        const pendingPromise = assertExists(this.pendingEOFs.shift());
        if (exists(finalizeResult.error) && finalizeResult.error.length > 0) {
          pendingPromise.reject(finalizeResult.error);
        } else {
          pendingPromise.resolve();
        }
        break;
      }
      case TPM.TPM_RESET_TRACE_PROCESSOR:
        assertExists(this.pendingResetTraceProcessors.shift()).resolve();
        break;
      case TPM.TPM_RESTORE_INITIAL_TABLES:
        assertExists(this.pendingRestoreTables.shift()).resolve();
        break;
      case TPM.TPM_QUERY_STREAMING: {
        const qRes = assertExists(rpc.queryResult) as {} as QueryResultBypass;
        // The query stays at the head of the queue until its last batch has
        // been appended.
        const pendingQuery = assertExists(this.pendingQueries[0]);
        pendingQuery.appendResultBatch(qRes.rawQueryResult);
        if (pendingQuery.isComplete()) {
          this.pendingQueries.shift();
        } else {
          isFinalResponse = false;
        }
        break;
      }
      case TPM.TPM_COMPUTE_METRIC: {
        const metricRes = assertExists(
          rpc.metricResult,
        ) as protos.ComputeMetricResult;
        const pendingComputeMetric = assertExists(
          this.pendingComputeMetrics.shift(),
        );
        if (exists(metricRes.error) && metricRes.error.length > 0) {
          const error = new QueryError(
            `ComputeMetric() error: ${metricRes.error}`,
            {
              query: 'COMPUTE_METRIC',
            },
          );
          pendingComputeMetric.reject(error);
        } else {
          const result =
            metricRes.metricsAsPrototext ??
            metricRes.metricsAsJson ??
            metricRes.metrics ??
            '';
          pendingComputeMetric.resolve(result);
        }
        break;
      }
      case TPM.TPM_ENABLE_METATRACE:
        // enableMetatrace() registers no pending promise, so there is nothing
        // to settle here; the request is still accounted for in
        // |_numRequestsPending| below.
        break;
      case TPM.TPM_DISABLE_AND_READ_METATRACE: {
        const metatraceRes = assertExists(
          rpc.metatrace,
        ) as protos.DisableAndReadMetatraceResult;
        assertExists(this.pendingReadMetatrace).resolve(metatraceRes);
        this.pendingReadMetatrace = undefined;
        break;
      }
      case TPM.TPM_REGISTER_SQL_PACKAGE: {
        const registerResult = assertExists(rpc.registerSqlPackageResult);
        const pendingRegister = assertExists(this.pendingRegisterSqlPackage);
        if (exists(registerResult.error) && registerResult.error.length > 0) {
          pendingRegister.reject(registerResult.error);
        } else {
          pendingRegister.resolve();
        }
        // Free the slot, otherwise every later registerSqlPackages() call
        // would be rejected as "already registering".
        this.pendingRegisterSqlPackage = undefined;
        break;
      }
      case TPM.TPM_ANALYZE_STRUCTURED_QUERY: {
        const analyzeRes = assertExists(
          rpc.analyzeStructuredQueryResult,
        ) as {} as protos.AnalyzeStructuredQueryResult;
        const pendingAnalyze = assertExists(
          this.pendingAnalyzeStructuredQueries,
        );
        pendingAnalyze.resolve(analyzeRes);
        this.pendingAnalyzeStructuredQueries = undefined;
        break;
      }
      default:
        console.log(
          'Unexpected TraceProcessor response received: ',
          rpc.response,
        );
        break;
    } // switch(rpc.response);

    if (isFinalResponse) {
      --this._numRequestsPending;
    }

    this.onResponseReceived?.();
  }

  // TraceProcessor methods below this point.
  // The methods below are called by the various controllers in the UI and
  // deal with marshalling / unmarshaling requests to/from TraceProcessor.

  // Push trace data into the engine. The engine is supposed to automatically
  // figure out the type of the trace (JSON vs Protobuf).
  parse(data: Uint8Array): Promise<void> {
    const asyncRes = defer<void>();
    this.pendingParses.push(asyncRes);
    const rpc = protos.TraceProcessorRpc.create();
    rpc.request = TPM.TPM_APPEND_TRACE_DATA;
    rpc.appendTraceData = data;
    this.rpcSendRequest(rpc);
    return asyncRes; // Linearize with the worker.
  }

  // Notify the engine that we reached the end of the trace.
  // Called after the last parse() call.
  notifyEof(): Promise<void> {
    const asyncRes = defer<void>();
    this.pendingEOFs.push(asyncRes);
    const rpc = protos.TraceProcessorRpc.create();
    rpc.request = TPM.TPM_FINALIZE_TRACE_DATA;
    this.rpcSendRequest(rpc);
    return asyncRes; // Linearize with the worker.
  }

  // Updates the TraceProcessor Config. This method creates a new
  // TraceProcessor instance, so it should be called before passing any trace
  // data.
  resetTraceProcessor({
    cropTrackEvents,
    ingestFtraceInRawTable,
    analyzeTraceProtoContent,
    ftraceDropUntilAllCpusValid,
  }: TraceProcessorConfig): Promise<void> {
    const asyncRes = defer<void>();
    this.pendingResetTraceProcessors.push(asyncRes);
    const rpc = protos.TraceProcessorRpc.create();
    rpc.request = TPM.TPM_RESET_TRACE_PROCESSOR;
    const args = (rpc.resetTraceProcessorArgs =
      new protos.ResetTraceProcessorArgs());
    args.dropTrackEventDataBefore = cropTrackEvents
      ? protos.ResetTraceProcessorArgs.DropTrackEventDataBefore
          .TRACK_EVENT_RANGE_OF_INTEREST
      : protos.ResetTraceProcessorArgs.DropTrackEventDataBefore.NO_DROP;
    args.ingestFtraceInRawTable = ingestFtraceInRawTable;
    args.analyzeTraceProtoContent = analyzeTraceProtoContent;
    args.ftraceDropUntilAllCpusValid = ftraceDropUntilAllCpusValid;
    this.rpcSendRequest(rpc);
    return asyncRes;
  }

  // Resets the trace processor state by destroying any table/views created by
  // the UI after loading.
  restoreInitialTables(): Promise<void> {
    const asyncRes = defer<void>();
    this.pendingRestoreTables.push(asyncRes);
    const rpc = protos.TraceProcessorRpc.create();
    rpc.request = TPM.TPM_RESTORE_INITIAL_TABLES;
    this.rpcSendRequest(rpc);
    return asyncRes; // Linearize with the worker.
  }

  // Shorthand for sending a compute metrics request to the engine.
  // The returned promise rejects with a QueryError if TraceProcessor reports
  // an error, or with an Error if |format| is not one of the known formats.
  async computeMetric(
    metrics: string[],
    format: 'json' | 'prototext' | 'proto',
  ): Promise<string | Uint8Array> {
    // Build and validate the arguments before touching any pending-request
    // state: bailing out after enqueueing would leave an orphan deferred in
    // the FIFO and mis-pair every subsequent compute-metric response.
    const args = new protos.ComputeMetricArgs();
    args.metricNames = metrics;
    if (format === 'json') {
      args.format = protos.ComputeMetricArgs.ResultFormat.JSON;
    } else if (format === 'prototext') {
      args.format = protos.ComputeMetricArgs.ResultFormat.TEXTPROTO;
    } else if (format === 'proto') {
      args.format = protos.ComputeMetricArgs.ResultFormat.BINARY_PROTOBUF;
    } else {
      throw new Error(`Unknown compute metric format ${format}`);
    }

    const rpc = protos.TraceProcessorRpc.create();
    rpc.request = TPM.TPM_COMPUTE_METRIC;
    rpc.computeMetricArgs = args;

    const asyncRes = defer<string | Uint8Array>();
    this.pendingComputeMetrics.push(asyncRes);
    this.rpcSendRequest(rpc);
    return asyncRes;
  }

  // Issues a streaming query and retrieve results in batches.
  // The returned QueryResult object will be populated over time with batches
  // of rows (each batch conveys ~128KB of data and a variable number of rows).
  // The caller can decide whether to wait that all batches have been received
  // (by awaiting the returned object or calling result.waitAllRows()) or handle
  // the rows incrementally.
  //
  // Example usage:
  // const res = this.streamingQuery('SELECT foo, bar FROM table');
  // console.log(res.numRows());  // Will print 0 because we didn't await.
  // await(res.waitAllRows());
  // console.log(res.numRows());  // Will print the total number of rows.
  //
  // for (const it = res.iter({foo: NUM, bar:STR}); it.valid(); it.next()) {
  //   console.log(it.foo, it.bar);
  // }
  //
  // Optional |tag| (usually a component name) can be provided to allow
  // attributing trace processor workload to different UI components.
  private streamingQuery(
    sqlQuery: string,
    tag?: string,
  ): Promise<QueryResult> & QueryResult {
    const rpc = protos.TraceProcessorRpc.create();
    rpc.request = TPM.TPM_QUERY_STREAMING;
    rpc.queryArgs = new protos.QueryArgs();
    rpc.queryArgs.sqlQuery = sqlQuery;
    if (tag) {
      rpc.queryArgs.tag = tag;
    }
    const result = createQueryResult({
      query: sqlQuery,
    });
    this.pendingQueries.push(result);
    this.rpcSendRequest(rpc);
    return result;
  }

  // Appends a new entry to the query log, evicting the oldest entry once the
  // log exceeds QUERY_LOG_BUFFER_SIZE. Returns the same entry through a
  // writable view so the caller can fill in |endTime| and |success| later.
  private logQueryStart(
    query: string,
    tag?: string,
  ): {
    endTime?: number;
    success?: boolean;
  } {
    const startTime = performance.now();
    const queryLog: QueryLog = {query, tag, startTime};
    this._queryLog.push(queryLog);
    if (this._queryLog.length > QUERY_LOG_BUFFER_SIZE) {
      this._queryLog.shift();
    }
    return queryLog;
  }

  // Wraps .streamingQuery(), captures errors and re-throws with current stack.
  // Every call is recorded in the query log together with its tag, timing and
  // outcome.
  //
  // Note: This function is less flexible than .streamingQuery() as it only
  // returns a promise which must be unwrapped before the QueryResult may be
  // accessed.
  async query(sqlQuery: string, tag?: string): Promise<QueryResult> {
    // Pass the tag through so the log entry can be attributed to its origin.
    const queryLog = this.logQueryStart(sqlQuery, tag);
    try {
      const result = await this.streamingQuery(sqlQuery, tag);
      queryLog.success = true;
      return result;
    } catch (e) {
      // Replace the error's stack trace with the one from here
      // Note: It seems only V8 can trace the stack up the promise chain, so its
      // likely this stack won't be useful on !V8.
      // See
      // https://docs.google.com/document/d/13Sy_kBIJGP0XT34V1CV3nkWya4TwYx9L3Yv45LdGB6Q
      captureStackTrace(e);
      queryLog.success = false;
      throw e;
    } finally {
      queryLog.endTime = performance.now();
    }
  }

  // Like query(), but never rejects: failures are reported through the
  // returned Result wrapper, carrying the error message as a string.
  async tryQuery(sql: string, tag?: string): Promise<Result<QueryResult>> {
    try {
      const result = await this.query(sql, tag);
      return okResult(result);
    } catch (error) {
      // The `in` operator throws a TypeError on non-object operands, so check
      // the shape first; otherwise a primitive rejection value would escape
      // this handler and reject the promise.
      const msg =
        typeof error === 'object' && error !== null && 'message' in error
          ? `${error.message}`
          : `${error}`;
      return errResult(msg);
    }
  }

  isMetatracingEnabled(): boolean {
    return this._isMetatracingEnabled;
  }

  // Asks TraceProcessor to start metatracing. |categories| is only forwarded
  // when it is set to something other than NONE.
  enableMetatrace(categories?: protos.MetatraceCategories) {
    const rpc = protos.TraceProcessorRpc.create();
    rpc.request = TPM.TPM_ENABLE_METATRACE;
    if (
      categories !== undefined &&
      categories !== protos.MetatraceCategories.NONE
    ) {
      rpc.enableMetatraceArgs = new protos.EnableMetatraceArgs();
      rpc.enableMetatraceArgs.categories = categories;
    }
    this._isMetatracingEnabled = true;
    this.rpcSendRequest(rpc);
  }

  // Stops metatracing and resolves with the collected metatrace. Only one such
  // request can be in flight at a time.
  stopAndGetMetatrace(): Promise<protos.DisableAndReadMetatraceResult> {
    // If we are already finalising a metatrace, ignore the request.
    if (this.pendingReadMetatrace) {
      return Promise.reject(new Error('Already finalising a metatrace'));
    }

    const result = defer<protos.DisableAndReadMetatraceResult>();

    const rpc = protos.TraceProcessorRpc.create();
    rpc.request = TPM.TPM_DISABLE_AND_READ_METATRACE;
    this._isMetatracingEnabled = false;
    this.pendingReadMetatrace = result;
    this.rpcSendRequest(rpc);
    return result;
  }

  // Registers a SQL package (a named set of SQL modules) with TraceProcessor,
  // allowing it to override an existing package. Only one registration can be
  // in flight at a time; the promise rejects if TraceProcessor reports an
  // error.
  registerSqlPackages(pkg: {
    name: string;
    modules: {name: string; sql: string}[];
  }): Promise<void> {
    if (this.pendingRegisterSqlPackage) {
      return Promise.reject(new Error('Already registering SQL package'));
    }

    const result = defer<void>();

    const rpc = protos.TraceProcessorRpc.create();
    rpc.request = TPM.TPM_REGISTER_SQL_PACKAGE;
    const args = (rpc.registerSqlPackageArgs =
      new protos.RegisterSqlPackageArgs());
    args.packageName = pkg.name;
    args.modules = pkg.modules;
    args.allowOverride = true;
    this.pendingRegisterSqlPackage = result;
    this.rpcSendRequest(rpc);
    return result;
  }

  // Sends the given structured queries to TraceProcessor for analysis. Only
  // one analysis can be in flight at a time.
  analyzeStructuredQuery(
    structuredQueries: protos.PerfettoSqlStructuredQuery[],
  ): Promise<protos.AnalyzeStructuredQueryResult> {
    if (this.pendingAnalyzeStructuredQueries) {
      return Promise.reject(new Error('Already analyzing structured queries'));
    }
    const result = defer<protos.AnalyzeStructuredQueryResult>();
    const rpc = protos.TraceProcessorRpc.create();
    rpc.request = TPM.TPM_ANALYZE_STRUCTURED_QUERY;
    const args = (rpc.analyzeStructuredQueryArgs =
      new protos.AnalyzeStructuredQueryArgs());
    args.queries = structuredQueries;
    this.pendingAnalyzeStructuredQueries = result;
    this.rpcSendRequest(rpc);
    return result;
  }

  // Marshals the TraceProcessorRpc request arguments and sends the request
  // to the concrete Engine (Wasm or HTTP).
  private rpcSendRequest(rpc: protos.TraceProcessorRpc) {
    rpc.seq = this.txSeqId++;
    // Each message is wrapped in a TraceProcessorRpcStream to add the varint
    // preamble with the size, which allows tokenization on the other end.
    const outerProto = protos.TraceProcessorRpcStream.create();
    outerProto.msg.push(rpc);
    const buf = protos.TraceProcessorRpcStream.encode(outerProto).finish();
    ++this._numRequestsPending;
    this.rpcSendRequestBytes(buf);
  }

  get engineId(): string {
    return this.id;
  }

  get numRequestsPending(): number {
    return this._numRequestsPending;
  }

  getProxy(tag: string): EngineProxy {
    return new EngineProxy(this, tag);
  }

  // Records |reason| (exposed through the |failed| getter) and then throws.
  protected fail(reason: string) {
    this._failed = reason;
    throw new Error(reason);
  }

  get failed(): string | undefined {
    return this._failed;
  }

  abstract [Symbol.dispose](): void;
}

// Lightweight engine proxy which annotates all queries with a tag
export class EngineProxy implements Engine, Disposable {
  private engine: EngineBase;
  private tag: string;
  private disposed = false;

  get queryLog() {
    return this.engine.queryLog;
  }

  constructor(engine: EngineBase, tag: string) {
    this.engine = engine;
    this.tag = tag;
  }

  // Runs a query on the underlying engine. An explicit non-empty |tag| wins;
  // otherwise the query is attributed to this proxy's own tag.
  async query(query: string, tag?: string): Promise<QueryResult> {
    if (this.disposed) {
      // If we are disposed (the trace was closed), return an empty QueryResult
      // that will never see any data or EOF. We can't do otherwise or it will
      // cause crashes to code calling firstRow() and expecting data.
      return createQueryResult({query});
    }
    // `||` rather than `??`: the engine treats an empty tag as "no tag", so an
    // empty string should also fall back to the proxy's tag.
    return await this.engine.query(query, tag || this.tag);
  }

  // Non-rejecting variant of query(); resolves to an error Result once the
  // proxy has been disposed.
  async tryQuery(query: string, tag?: string): Promise<Result<QueryResult>> {
    if (this.disposed) {
      return errResult(`EngineProxy ${this.tag} was disposed`);
    }
    return await this.engine.tryQuery(query, tag || this.tag);
  }

  async computeMetric(
    metrics: string[],
    format: 'json' | 'prototext' | 'proto',
  ): Promise<string | Uint8Array> {
    if (this.disposed) {
      return defer<string>(); // Return a promise that will hang forever.
    }
    return this.engine.computeMetric(metrics, format);
  }

  enableMetatrace(categories?: protos.MetatraceCategories): void {
    this.engine.enableMetatrace(categories);
  }

  stopAndGetMetatrace(): Promise<protos.DisableAndReadMetatraceResult> {
    return this.engine.stopAndGetMetatrace();
  }

  analyzeStructuredQuery(
    structuredQueries: protos.PerfettoSqlStructuredQuery[],
  ): Promise<protos.AnalyzeStructuredQueryResult> {
    return this.engine.analyzeStructuredQuery(structuredQueries);
  }

  get engineId(): string {
    return this.engine.id;
  }

  // Creates a sibling proxy on the same engine whose tag is this proxy's tag
  // followed by `/` and |tag|.
  getProxy(tag: string): EngineProxy {
    return this.engine.getProxy(`${this.tag}/${tag}`);
  }

  get numRequestsPending() {
    return this.engine.numRequestsPending;
  }

  get mode() {
    return this.engine.mode;
  }

  get failed() {
    return this.engine.failed;
  }

  // Marks the proxy as disposed; the underlying engine is left untouched.
  [Symbol.dispose]() {
    this.disposed = true;
  }
}

// Capture stack trace and attach to the given error object
function captureStackTrace(e: Error): void {
  // Captured up-front so that it is available to the generic fallback below.
  const stack = new Error().stack;
  if ('captureStackTrace' in Error) {
    // V8 specific
    Error.captureStackTrace(e, captureStackTrace);
  } else {
    // Generic
    Object.defineProperty(e, 'stack', {
      value: stack,
      writable: true,
      configurable: true,
    });
  }
}

// A convenience interface to inject the App in Mithril components.
export interface EngineAttrs {
  engine: Engine;
}