// Copyright (C) 2018 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

import {defer, Deferred} from '../base/deferred';
import {assertExists, assertTrue} from '../base/logging';
import {
  ComputeMetricArgs,
  ComputeMetricResult,
  DisableAndReadMetatraceResult,
  EnableMetatraceArgs,
  MetatraceCategories,
  QueryArgs,
  QueryResult as ProtoQueryResult,
  ResetTraceProcessorArgs,
  TraceProcessorRpc,
  TraceProcessorRpcStream,
} from '../protos';

import {ProtoRingBuffer} from './proto_ring_buffer';
import {
  createQueryResult,
  QueryError,
  QueryResult,
  WritableQueryResult,
} from './query_result';

import TPM = TraceProcessorRpc.TraceProcessorMethod;
import {Disposable} from '../base/disposable';
import {Result} from '../base/utils';

export interface LoadingTracker {
  beginLoading(): void;
  endLoading(): void;
}

export class NullLoadingTracker implements LoadingTracker {
  beginLoading(): void {}
  endLoading(): void {}
}

// This is used to skip the decoding of queryResult from protobufjs and deal
// with it ourselves. See the comment below around `QueryResult.decode = ...`.
interface QueryResultBypass {
  rawQueryResult: Uint8Array;
}

export interface TraceProcessorConfig {
  cropTrackEvents: boolean;
  ingestFtraceInRawTable: boolean;
  analyzeTraceProtoContent: boolean;
  ftraceDropUntilAllCpusValid: boolean;
}
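
// For illustration only (not part of the original file): a config literal with
// hypothetical values that could be passed to EngineBase.resetTraceProcessor()
// defined further below.
//
//   const config: TraceProcessorConfig = {
//     cropTrackEvents: false,
//     ingestFtraceInRawTable: true,
//     analyzeTraceProtoContent: false,
//     ftraceDropUntilAllCpusValid: false,
//   };
//   await engine.resetTraceProcessor(config);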

export interface Engine {
  /**
   * Execute a query against the database, returning a promise that resolves
   * when the query has completed and rejects if the query fails for any
   * reason. On success, the promise only resolves once all the resulting rows
   * have been received.
   *
   * @param sql The query to execute.
   * @param tag An optional tag used to trace the origin of the query.
   */
  query(sql: string, tag?: string): Promise<QueryResult>;

  /**
   * Execute a query against the database, returning a promise that resolves
   * when the query has completed or failed. The promise never rejects; it
   * always resolves successfully. Use the returned wrapper object to
   * determine whether the query completed successfully.
   *
   * The promise only resolves once all the resulting rows have been received.
   *
   * @param sql The query to execute.
   * @param tag An optional tag used to trace the origin of the query.
   */
  tryQuery(sql: string, tag?: string): Promise<Result<QueryResult, Error>>;

  /**
   * Execute one or more metrics and get the results.
   *
   * @param metrics The metrics to run.
   * @param format The format of the response.
   */
  computeMetric(
    metrics: string[],
    format: 'json' | 'prototext' | 'proto',
  ): Promise<string | Uint8Array>;
}
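
// Illustrative only (not part of the original file): how a caller might use
// the Engine interface. The table and metric names below are just examples.
//
//   const res = await engine.query('SELECT ts, dur FROM slice LIMIT 10');
//   const asJson = await engine.computeMetric(['trace_metadata'], 'json');
//   const asProto = await engine.computeMetric(['trace_metadata'], 'proto');
//   // |asJson| is a string, |asProto| a Uint8Array (see computeMetric below).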

// Abstract interface of a trace processor.
// This is the TypeScript equivalent of src/trace_processor/rpc.h.
// There are two concrete implementations:
//   1. WasmEngineProxy: creates a Wasm module and interacts over postMessage().
//   2. HttpRpcEngine: connects to an external `trace_processor_shell --httpd`
//      instance and interacts via fetch().
// In both cases, we have a byte-oriented pipe to interact with TraceProcessor.
// The derived class is only expected to deal with these two functions:
// 1. Implement the abstract rpcSendRequestBytes() function, sending the
//    proto-encoded TraceProcessorRpc requests to the TraceProcessor instance.
// 2. Call onRpcResponseBytes() when response data is received.
export abstract class EngineBase implements Engine {
  abstract readonly id: string;
  private loadingTracker: LoadingTracker;
  private txSeqId = 0;
  private rxSeqId = 0;
  private rxBuf = new ProtoRingBuffer();
  private pendingParses = new Array<Deferred<void>>();
  private pendingEOFs = new Array<Deferred<void>>();
  private pendingResetTraceProcessors = new Array<Deferred<void>>();
  private pendingQueries = new Array<WritableQueryResult>();
  private pendingRestoreTables = new Array<Deferred<void>>();
  private pendingComputeMetrics = new Array<Deferred<string | Uint8Array>>();
  private pendingReadMetatrace?: Deferred<DisableAndReadMetatraceResult>;
  private _isMetatracingEnabled = false;

  constructor(tracker?: LoadingTracker) {
    this.loadingTracker = tracker ? tracker : new NullLoadingTracker();
  }

  // Called to send data to the TraceProcessor instance. This turns into a
  // postMessage() or an HTTP request, depending on the Engine implementation.
  abstract rpcSendRequestBytes(data: Uint8Array): void;

  // Called when an inbound message is received by the Engine implementation
  // (e.g. onmessage for the Wasm case, or when HTTP replies are received for
  // the HTTP+RPC case).
  onRpcResponseBytes(dataWillBeRetained: Uint8Array) {
    // Note: when hitting the fast path inside ProtoRingBuffer, the
    // |dataWillBeRetained| buffer is returned back by readMessage() (modulo
    // subarray()-ing it) and held onto by other classes (e.g., QueryResult).
    // For both fetch() and Wasm we are fine because every response creates a
    // new buffer.
    this.rxBuf.append(dataWillBeRetained);
    for (;;) {
      const msg = this.rxBuf.readMessage();
      if (msg === undefined) break;
      this.onRpcResponseMessage(msg);
    }
  }

  // Parses a response message.
  // |rpcMsgEncoded| is a sub-array pointing to the start of a
  // TraceProcessorRpc proto-encoded message (without the proto preamble and
  // varint size).
  private onRpcResponseMessage(rpcMsgEncoded: Uint8Array) {
    // Here we override the protobufjs-generated code to skip the parsing of
    // the new streaming QueryResult and instead pass it through as a raw
    // buffer. This is the overall problem: all trace processor responses are
    // wrapped into a perfetto.protos.TraceProcessorRpc proto message. In all
    // cases except TPM_QUERY_STREAMING, we want protobufjs to decode the
    // proto bytes and give us a structured object. In the case of
    // TPM_QUERY_STREAMING, instead, we want to deal with the proto parsing
    // ourselves using the new QueryResult.appendResultBatch() method, because
    // that handles streaming results more efficiently and skips several
    // copies.
    // By overriding the decode method below, we achieve two things:
    // 1. We avoid protobufjs decoding the TraceProcessorRpc.query_result field.
    // 2. We stash (a view of) the original buffer into the |rawQueryResult| so
    //    the `case TPM_QUERY_STREAMING` below can take it.
    ProtoQueryResult.decode = (reader: protobuf.Reader, length: number) => {
      const res = ProtoQueryResult.create() as {} as QueryResultBypass;
      res.rawQueryResult = reader.buf.subarray(reader.pos, reader.pos + length);
      // All this works only if protobufjs returns the original ArrayBuffer
      // from |rpcMsgEncoded|. It should always be the case given the
      // current implementation. This check mainly guards against future
      // behavioral changes of protobufjs. We don't want to accidentally
      // hold onto some internal protobufjs buffer. We are fine holding
      // onto |rpcMsgEncoded| because those come from ProtoRingBuffer which
      // is buffer-retention-friendly.
      assertTrue(res.rawQueryResult.buffer === rpcMsgEncoded.buffer);
      reader.pos += length;
      return res as {} as ProtoQueryResult;
    };

    const rpc = TraceProcessorRpc.decode(rpcMsgEncoded);

    if (rpc.fatalError !== undefined && rpc.fatalError.length > 0) {
      throw new Error(`${rpc.fatalError}`);
    }

    // Allow restarting sequences from zero (when reloading the browser).
    if (rpc.seq !== this.rxSeqId + 1 && this.rxSeqId !== 0 && rpc.seq !== 0) {
      // "(ERR:rpc_seq)" is intercepted by error_dialog.ts to show a more
      // graceful and actionable error.
      throw new Error(
        `RPC sequence id mismatch cur=${rpc.seq} last=${this.rxSeqId} (ERR:rpc_seq)`,
      );
    }

    this.rxSeqId = rpc.seq;

    let isFinalResponse = true;

    switch (rpc.response) {
      case TPM.TPM_APPEND_TRACE_DATA:
        const appendResult = assertExists(rpc.appendResult);
        const pendingPromise = assertExists(this.pendingParses.shift());
        if (appendResult.error && appendResult.error.length > 0) {
          pendingPromise.reject(appendResult.error);
        } else {
          pendingPromise.resolve();
        }
        break;
      case TPM.TPM_FINALIZE_TRACE_DATA:
        assertExists(this.pendingEOFs.shift()).resolve();
        break;
      case TPM.TPM_RESET_TRACE_PROCESSOR:
        assertExists(this.pendingResetTraceProcessors.shift()).resolve();
        break;
      case TPM.TPM_RESTORE_INITIAL_TABLES:
        assertExists(this.pendingRestoreTables.shift()).resolve();
        break;
      case TPM.TPM_QUERY_STREAMING:
        const qRes = assertExists(rpc.queryResult) as {} as QueryResultBypass;
        const pendingQuery = assertExists(this.pendingQueries[0]);
        pendingQuery.appendResultBatch(qRes.rawQueryResult);
        if (pendingQuery.isComplete()) {
          this.pendingQueries.shift();
        } else {
          isFinalResponse = false;
        }
        break;
      case TPM.TPM_COMPUTE_METRIC:
        const metricRes = assertExists(rpc.metricResult) as ComputeMetricResult;
        const pendingComputeMetric = assertExists(
          this.pendingComputeMetrics.shift(),
        );
        if (metricRes.error && metricRes.error.length > 0) {
          const error = new QueryError(
            `ComputeMetric() error: ${metricRes.error}`,
            {
              query: 'COMPUTE_METRIC',
            },
          );
          pendingComputeMetric.reject(error);
        } else {
          const result =
            metricRes.metricsAsPrototext ||
            metricRes.metricsAsJson ||
            metricRes.metrics ||
            '';
          pendingComputeMetric.resolve(result);
        }
        break;
      case TPM.TPM_DISABLE_AND_READ_METATRACE:
        const metatraceRes = assertExists(
          rpc.metatrace,
        ) as DisableAndReadMetatraceResult;
        assertExists(this.pendingReadMetatrace).resolve(metatraceRes);
        this.pendingReadMetatrace = undefined;
        break;
      default:
        console.log(
          'Unexpected TraceProcessor response received: ',
          rpc.response,
        );
        break;
    } // switch(rpc.response);

    if (isFinalResponse) {
      this.loadingTracker.endLoading();
    }
  }

  // TraceProcessor methods below this point.
  // The methods below are called by the various controllers in the UI and
  // deal with marshalling / unmarshalling requests to/from TraceProcessor.

  // Push trace data into the engine. The engine is supposed to automatically
  // figure out the type of the trace (JSON vs Protobuf).
  parse(data: Uint8Array): Promise<void> {
    const asyncRes = defer<void>();
    this.pendingParses.push(asyncRes);
    const rpc = TraceProcessorRpc.create();
    rpc.request = TPM.TPM_APPEND_TRACE_DATA;
    rpc.appendTraceData = data;
    this.rpcSendRequest(rpc);
    return asyncRes; // Linearize with the worker.
  }

  // Notify the engine that we reached the end of the trace.
  // Called after the last parse() call.
  notifyEof(): Promise<void> {
    const asyncRes = defer<void>();
    this.pendingEOFs.push(asyncRes);
    const rpc = TraceProcessorRpc.create();
    rpc.request = TPM.TPM_FINALIZE_TRACE_DATA;
    this.rpcSendRequest(rpc);
    return asyncRes; // Linearize with the worker.
  }
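
  // Illustrative only (not part of the original file): a typical load sequence
  // pushes the trace in chunks via parse() and then signals the end of the
  // trace with notifyEof(). |chunks| below is a hypothetical iterable of
  // Uint8Array buffers.
  //
  //   for (const chunk of chunks) {
  //     await engine.parse(chunk);
  //   }
  //   await engine.notifyEof();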

  // Updates the TraceProcessor Config. This method creates a new
  // TraceProcessor instance, so it should be called before passing any trace
  // data.
  resetTraceProcessor({
    cropTrackEvents,
    ingestFtraceInRawTable,
    analyzeTraceProtoContent,
    ftraceDropUntilAllCpusValid,
  }: TraceProcessorConfig): Promise<void> {
    const asyncRes = defer<void>();
    this.pendingResetTraceProcessors.push(asyncRes);
    const rpc = TraceProcessorRpc.create();
    rpc.request = TPM.TPM_RESET_TRACE_PROCESSOR;
    const args = (rpc.resetTraceProcessorArgs = new ResetTraceProcessorArgs());
    args.dropTrackEventDataBefore = cropTrackEvents
      ? ResetTraceProcessorArgs.DropTrackEventDataBefore
          .TRACK_EVENT_RANGE_OF_INTEREST
      : ResetTraceProcessorArgs.DropTrackEventDataBefore.NO_DROP;
    args.ingestFtraceInRawTable = ingestFtraceInRawTable;
    args.analyzeTraceProtoContent = analyzeTraceProtoContent;
    args.ftraceDropUntilAllCpusValid = ftraceDropUntilAllCpusValid;
    this.rpcSendRequest(rpc);
    return asyncRes;
  }

  // Resets the trace processor state by destroying any tables/views created
  // by the UI after loading.
  restoreInitialTables(): Promise<void> {
    const asyncRes = defer<void>();
    this.pendingRestoreTables.push(asyncRes);
    const rpc = TraceProcessorRpc.create();
    rpc.request = TPM.TPM_RESTORE_INITIAL_TABLES;
    this.rpcSendRequest(rpc);
    return asyncRes; // Linearize with the worker.
  }

  // Shorthand for sending a compute metrics request to the engine.
  async computeMetric(
    metrics: string[],
    format: 'json' | 'prototext' | 'proto',
  ): Promise<string | Uint8Array> {
    const asyncRes = defer<string | Uint8Array>();
    this.pendingComputeMetrics.push(asyncRes);
    const rpc = TraceProcessorRpc.create();
    rpc.request = TPM.TPM_COMPUTE_METRIC;
    const args = (rpc.computeMetricArgs = new ComputeMetricArgs());
    args.metricNames = metrics;
    if (format === 'json') {
      args.format = ComputeMetricArgs.ResultFormat.JSON;
    } else if (format === 'prototext') {
      args.format = ComputeMetricArgs.ResultFormat.TEXTPROTO;
    } else if (format === 'proto') {
      args.format = ComputeMetricArgs.ResultFormat.BINARY_PROTOBUF;
    } else {
      throw new Error(`Unknown compute metric format ${format}`);
    }
    this.rpcSendRequest(rpc);
    return asyncRes;
  }

  // Issues a streaming query and retrieves the results in batches.
  // The returned QueryResult object will be populated over time with batches
  // of rows (each batch conveys ~128KB of data and a variable number of rows).
  // The caller can decide whether to wait until all batches have been received
  // (by awaiting the returned object or calling result.waitAllRows()) or
  // handle the rows incrementally.
  //
  // Example usage:
  // const res = this.streamingQuery('SELECT foo, bar FROM table');
  // console.log(res.numRows());  // Will print 0 because we didn't await.
  // await res.waitAllRows();
  // console.log(res.numRows());  // Will print the total number of rows.
  //
  // for (const it = res.iter({foo: NUM, bar: STR}); it.valid(); it.next()) {
  //   console.log(it.foo, it.bar);
  // }
  //
  // An optional |tag| (usually a component name) can be provided to allow
  // attributing the trace processor workload to different UI components.
  private streamingQuery(
    sqlQuery: string,
    tag?: string,
  ): Promise<QueryResult> & QueryResult {
    const rpc = TraceProcessorRpc.create();
    rpc.request = TPM.TPM_QUERY_STREAMING;
    rpc.queryArgs = new QueryArgs();
    rpc.queryArgs.sqlQuery = sqlQuery;
    if (tag) {
      rpc.queryArgs.tag = tag;
    }
    const result = createQueryResult({
      query: sqlQuery,
    });
    this.pendingQueries.push(result);
    this.rpcSendRequest(rpc);
    return result;
  }

  // Wraps .streamingQuery(), captures errors and re-throws with the current
  // stack.
  //
  // Note: This function is less flexible than .streamingQuery() as it only
  // returns a promise which must be unwrapped before the QueryResult may be
  // accessed.
  async query(sqlQuery: string, tag?: string): Promise<QueryResult> {
    try {
      return await this.streamingQuery(sqlQuery, tag);
    } catch (e) {
      // Replace the error's stack trace with the one from here.
      // Note: It seems only V8 can trace the stack up the promise chain, so
      // it's likely this stack won't be useful on non-V8 engines.
      // See
      // https://docs.google.com/document/d/13Sy_kBIJGP0XT34V1CV3nkWya4TwYx9L3Yv45LdGB6Q
      captureStackTrace(e);
      throw e;
    }
  }

  async tryQuery(
    sql: string,
    tag?: string,
  ): Promise<Result<QueryResult, Error>> {
    try {
      const result = await this.query(sql, tag);
      return {success: true, result};
    } catch (error: unknown) {
      // We know we only throw Error type objects so we can type assert safely.
      return {success: false, error: error as Error};
    }
  }
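
  // Illustrative only (not part of the original file): unwrapping the Result
  // returned by tryQuery(). On success the QueryResult is in |result|; on
  // failure the thrown Error is in |error|. The query text is just an example.
  //
  //   const res = await engine.tryQuery('SELECT count(*) AS cnt FROM slice');
  //   if (res.success) {
  //     console.log(res.result.numRows());
  //   } else {
  //     console.error(res.error.message);
  //   }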

  isMetatracingEnabled(): boolean {
    return this._isMetatracingEnabled;
  }

  enableMetatrace(categories?: MetatraceCategories) {
    const rpc = TraceProcessorRpc.create();
    rpc.request = TPM.TPM_ENABLE_METATRACE;
    if (categories) {
      rpc.enableMetatraceArgs = new EnableMetatraceArgs();
      rpc.enableMetatraceArgs.categories = categories;
    }
    this._isMetatracingEnabled = true;
    this.rpcSendRequest(rpc);
  }

  stopAndGetMetatrace(): Promise<DisableAndReadMetatraceResult> {
    // If we are already finalising a metatrace, ignore the request.
    if (this.pendingReadMetatrace) {
      return Promise.reject(new Error('Already finalising a metatrace'));
    }

    const result = defer<DisableAndReadMetatraceResult>();

    const rpc = TraceProcessorRpc.create();
    rpc.request = TPM.TPM_DISABLE_AND_READ_METATRACE;
    this._isMetatracingEnabled = false;
    this.pendingReadMetatrace = result;
    this.rpcSendRequest(rpc);
    return result;
  }

  // Marshals the TraceProcessorRpc request arguments and sends the request
  // to the concrete Engine (Wasm or HTTP).
  private rpcSendRequest(rpc: TraceProcessorRpc) {
    rpc.seq = this.txSeqId++;
    // Each message is wrapped in a TraceProcessorRpcStream to add the varint
    // preamble with the size, which allows tokenization on the other end.
    const outerProto = TraceProcessorRpcStream.create();
    outerProto.msg.push(rpc);
    const buf = TraceProcessorRpcStream.encode(outerProto).finish();
    this.loadingTracker.beginLoading();
    this.rpcSendRequestBytes(buf);
  }

  getProxy(tag: string): EngineProxy {
    return new EngineProxy(this, tag);
  }
}
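
// Illustrative sketch only (not part of the original file): what a minimal
// concrete subclass might look like. A subclass only has to ship request bytes
// to a TraceProcessor instance and feed the response bytes back; EngineBase
// handles framing, sequence checking and request/response matching. The
// Worker-based transport below is hypothetical.
//
//   class WorkerEngine extends EngineBase {
//     readonly id: string;
//     private worker: Worker;
//
//     constructor(id: string, worker: Worker, tracker?: LoadingTracker) {
//       super(tracker);
//       this.id = id;
//       this.worker = worker;
//       // Each message from the worker is assumed to carry a fresh Uint8Array
//       // with proto-encoded TraceProcessorRpc response bytes, so it is safe
//       // for onRpcResponseBytes() to retain it.
//       this.worker.onmessage = (e: MessageEvent) => {
//         this.onRpcResponseBytes(e.data as Uint8Array);
//       };
//     }
//
//     rpcSendRequestBytes(data: Uint8Array): void {
//       this.worker.postMessage(data);
//     }
//   }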

// Lightweight engine proxy which annotates all queries with a tag.
export class EngineProxy implements Engine, Disposable {
  private engine: EngineBase;
  private tag: string;
  private _isAlive: boolean;

  constructor(engine: EngineBase, tag: string) {
    this.engine = engine;
    this.tag = tag;
    this._isAlive = true;
  }

  async query(query: string, tag?: string): Promise<QueryResult> {
    if (!this._isAlive) {
      throw new Error(`EngineProxy ${this.tag} was disposed.`);
    }
    return await this.engine.query(query, tag);
  }

  async tryQuery(
    query: string,
    tag?: string,
  ): Promise<Result<QueryResult, Error>> {
    if (!this._isAlive) {
      return {
        success: false,
        error: new Error(`EngineProxy ${this.tag} was disposed.`),
      };
    }
    return await this.engine.tryQuery(query, tag);
  }

  async computeMetric(
    metrics: string[],
    format: 'json' | 'prototext' | 'proto',
  ): Promise<string | Uint8Array> {
    if (!this._isAlive) {
      return Promise.reject(new Error(`EngineProxy ${this.tag} was disposed.`));
    }
    return this.engine.computeMetric(metrics, format);
  }

  get engineId(): string {
    return this.engine.id;
  }

  dispose() {
    this._isAlive = false;
  }
}
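
// Illustrative only (not part of the original file): obtaining a tagged proxy
// from an engine and releasing it once the owning UI component goes away. The
// tag and query below are just examples.
//
//   const proxy = engine.getProxy('MyTrack');
//   const res = await proxy.query('SELECT id FROM slice LIMIT 1');
//   proxy.dispose();  // Further query() calls on |proxy| will throw/reject.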

// Capture stack trace and attach to the given error object.
function captureStackTrace(e: Error): void {
  const stack = new Error().stack;
  if ('captureStackTrace' in Error) {
    // V8 specific
    Error.captureStackTrace(e, captureStackTrace);
  } else {
    // Generic
    Object.defineProperty(e, 'stack', {
      value: stack,
      writable: true,
      configurable: true,
    });
  }
}