• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1// Copyright (C) 2018 The Android Open Source Project
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//      http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15import {defer, Deferred} from '../base/deferred';
16import {assertExists, assertTrue} from '../base/logging';
17import {perfetto} from '../gen/protos';
18
19import {ProtoRingBuffer} from './proto_ring_buffer';
20import {
21  ComputeMetricArgs,
22  ComputeMetricResult,
23  DisableAndReadMetatraceResult,
24  QueryArgs,
25  ResetTraceProcessorArgs,
26} from './protos';
27import {LONG, LONG_NULL, NUM, STR} from './query_result';
28import {
29  createQueryResult,
30  QueryError,
31  QueryResult,
32  WritableQueryResult,
33} from './query_result';
34import {TPTime, TPTimeSpan} from './time';
35
36import TraceProcessorRpc = perfetto.protos.TraceProcessorRpc;
37import TraceProcessorRpcStream = perfetto.protos.TraceProcessorRpcStream;
38import TPM = perfetto.protos.TraceProcessorRpc.TraceProcessorMethod;
39import {Span} from '../common/time';
40import {BigintMath} from '../base/bigint_math';
41
42export interface LoadingTracker {
43  beginLoading(): void;
44  endLoading(): void;
45}
46
47export class NullLoadingTracker implements LoadingTracker {
48  beginLoading(): void {}
49  endLoading(): void {}
50}
51
52
53// This is used to skip the decoding of queryResult from protobufjs and deal
54// with it ourselves. See the comment below around `QueryResult.decode = ...`.
55interface QueryResultBypass {
56  rawQueryResult: Uint8Array;
57}
58
59export interface TraceProcessorConfig {
60  cropTrackEvents: boolean;
61  ingestFtraceInRawTable: boolean;
62  analyzeTraceProtoContent: boolean;
63}
64
65// Abstract interface of a trace proccessor.
66// This is the TypeScript equivalent of src/trace_processor/rpc.h.
67// There are two concrete implementations:
68//   1. WasmEngineProxy: creates a Wasm module and interacts over postMessage().
69//   2. HttpRpcEngine: connects to an external `trace_processor_shell --httpd`.
70//      and interacts via fetch().
71// In both cases, we have a byte-oriented pipe to interact with TraceProcessor.
72// The derived class is only expected to deal with these two functions:
73// 1. Implement the abstract rpcSendRequestBytes() function, sending the
74//    proto-encoded TraceProcessorRpc requests to the TraceProcessor instance.
75// 2. Call onRpcResponseBytes() when response data is received.
76export abstract class Engine {
77  abstract readonly id: string;
78  private _cpus?: number[];
79  private _numGpus?: number;
80  private loadingTracker: LoadingTracker;
81  private txSeqId = 0;
82  private rxSeqId = 0;
83  private rxBuf = new ProtoRingBuffer();
84  private pendingParses = new Array<Deferred<void>>();
85  private pendingEOFs = new Array<Deferred<void>>();
86  private pendingResetTraceProcessors = new Array<Deferred<void>>();
87  private pendingQueries = new Array<WritableQueryResult>();
88  private pendingRestoreTables = new Array<Deferred<void>>();
89  private pendingComputeMetrics = new Array<Deferred<ComputeMetricResult>>();
90  private pendingReadMetatrace?: Deferred<DisableAndReadMetatraceResult>;
91  private _isMetatracingEnabled = false;
92
93  constructor(tracker?: LoadingTracker) {
94    this.loadingTracker = tracker ? tracker : new NullLoadingTracker();
95  }
96
97  // Called to send data to the TraceProcessor instance. This turns into a
98  // postMessage() or a HTTP request, depending on the Engine implementation.
99  abstract rpcSendRequestBytes(data: Uint8Array): void;
100
101  // Called when an inbound message is received by the Engine implementation
102  // (e.g. onmessage for the Wasm case, on when HTTP replies are received for
103  // the HTTP+RPC case).
104  onRpcResponseBytes(dataWillBeRetained: Uint8Array) {
105    // Note: when hitting the fastpath inside ProtoRingBuffer, the |data| buffer
106    // is returned back by readMessage() (% subarray()-ing it) and held onto by
107    // other classes (e.g., QueryResult). For both fetch() and Wasm we are fine
108    // because every response creates a new buffer.
109    this.rxBuf.append(dataWillBeRetained);
110    for (;;) {
111      const msg = this.rxBuf.readMessage();
112      if (msg === undefined) break;
113      this.onRpcResponseMessage(msg);
114    }
115  }
116
117  // Parses a response message.
118  // |rpcMsgEncoded| is a sub-array to to the start of a TraceProcessorRpc
119  // proto-encoded message (without the proto preamble and varint size).
120  private onRpcResponseMessage(rpcMsgEncoded: Uint8Array) {
121    // Here we override the protobufjs-generated code to skip the parsing of the
122    // new streaming QueryResult and instead passing it through like a buffer.
123    // This is the overall problem: All trace processor responses are wrapped
124    // into a perfetto.protos.TraceProcessorRpc proto message. In all cases %
125    // TPM_QUERY_STREAMING, we want protobufjs to decode the proto bytes and
126    // give us a structured object. In the case of TPM_QUERY_STREAMING, instead,
127    // we want to deal with the proto parsing ourselves using the new
128    // QueryResult.appendResultBatch() method, because that handled streaming
129    // results more efficiently and skips several copies.
130    // By overriding the decode method below, we achieve two things:
131    // 1. We avoid protobufjs decoding the TraceProcessorRpc.query_result field.
132    // 2. We stash (a view of) the original buffer into the |rawQueryResult| so
133    //    the `case TPM_QUERY_STREAMING` below can take it.
134    perfetto.protos.QueryResult.decode =
135        (reader: protobuf.Reader, length: number) => {
136          const res =
137              perfetto.protos.QueryResult.create() as {} as QueryResultBypass;
138          res.rawQueryResult =
139              reader.buf.subarray(reader.pos, reader.pos + length);
140          // All this works only if protobufjs returns the original ArrayBuffer
141          // from |rpcMsgEncoded|. It should be always the case given the
142          // current implementation. This check mainly guards against future
143          // behavioral changes of protobufjs. We don't want to accidentally
144          // hold onto some internal protobufjs buffer. We are fine holding
145          // onto |rpcMsgEncoded| because those come from ProtoRingBuffer which
146          // is buffer-retention-friendly.
147          assertTrue(res.rawQueryResult.buffer === rpcMsgEncoded.buffer);
148          reader.pos += length;
149          return res as {} as perfetto.protos.QueryResult;
150        };
151
152    const rpc = TraceProcessorRpc.decode(rpcMsgEncoded);
153
154    if (rpc.fatalError !== undefined && rpc.fatalError.length > 0) {
155      throw new Error(`${rpc.fatalError}`);
156    }
157
158    // Allow restarting sequences from zero (when reloading the browser).
159    if (rpc.seq !== this.rxSeqId + 1 && this.rxSeqId !== 0 && rpc.seq !== 0) {
160      // "(ERR:rpc_seq)" is intercepted by error_dialog.ts to show a more
161      // graceful and actionable error.
162      throw new Error(`RPC sequence id mismatch cur=${rpc.seq} last=${
163          this.rxSeqId} (ERR:rpc_seq)`);
164    }
165
166    this.rxSeqId = rpc.seq;
167
168    let isFinalResponse = true;
169
170    switch (rpc.response) {
171      case TPM.TPM_APPEND_TRACE_DATA:
172        const appendResult = assertExists(rpc.appendResult);
173        const pendingPromise = assertExists(this.pendingParses.shift());
174        if (appendResult.error && appendResult.error.length > 0) {
175          pendingPromise.reject(appendResult.error);
176        } else {
177          pendingPromise.resolve();
178        }
179        break;
180      case TPM.TPM_FINALIZE_TRACE_DATA:
181        assertExists(this.pendingEOFs.shift()).resolve();
182        break;
183      case TPM.TPM_RESET_TRACE_PROCESSOR:
184        assertExists(this.pendingResetTraceProcessors.shift()).resolve();
185        break;
186      case TPM.TPM_RESTORE_INITIAL_TABLES:
187        assertExists(this.pendingRestoreTables.shift()).resolve();
188        break;
189      case TPM.TPM_QUERY_STREAMING:
190        const qRes = assertExists(rpc.queryResult) as {} as QueryResultBypass;
191        const pendingQuery = assertExists(this.pendingQueries[0]);
192        pendingQuery.appendResultBatch(qRes.rawQueryResult);
193        if (pendingQuery.isComplete()) {
194          this.pendingQueries.shift();
195        } else {
196          isFinalResponse = false;
197        }
198        break;
199      case TPM.TPM_COMPUTE_METRIC:
200        const metricRes = assertExists(rpc.metricResult) as ComputeMetricResult;
201        const pendingComputeMetric =
202            assertExists(this.pendingComputeMetrics.shift());
203        if (metricRes.error && metricRes.error.length > 0) {
204          const error =
205              new QueryError(`ComputeMetric() error: ${metricRes.error}`, {
206                query: 'COMPUTE_METRIC',
207              });
208          pendingComputeMetric.reject(error);
209        } else {
210          pendingComputeMetric.resolve(metricRes);
211        }
212        break;
213      case TPM.TPM_DISABLE_AND_READ_METATRACE:
214        const metatraceRes =
215            assertExists(rpc.metatrace) as DisableAndReadMetatraceResult;
216        assertExists(this.pendingReadMetatrace).resolve(metatraceRes);
217        this.pendingReadMetatrace = undefined;
218        break;
219      default:
220        console.log(
221            'Unexpected TraceProcessor response received: ', rpc.response);
222        break;
223    }  // switch(rpc.response);
224
225    if (isFinalResponse) {
226      this.loadingTracker.endLoading();
227    }
228  }
229
230  // TraceProcessor methods below this point.
231  // The methods below are called by the various controllers in the UI and
232  // deal with marshalling / unmarshaling requests to/from TraceProcessor.
233
234
235  // Push trace data into the engine. The engine is supposed to automatically
236  // figure out the type of the trace (JSON vs Protobuf).
237  parse(data: Uint8Array): Promise<void> {
238    const asyncRes = defer<void>();
239    this.pendingParses.push(asyncRes);
240    const rpc = TraceProcessorRpc.create();
241    rpc.request = TPM.TPM_APPEND_TRACE_DATA;
242    rpc.appendTraceData = data;
243    this.rpcSendRequest(rpc);
244    return asyncRes;  // Linearize with the worker.
245  }
246
247  // Notify the engine that we reached the end of the trace.
248  // Called after the last parse() call.
249  notifyEof(): Promise<void> {
250    const asyncRes = defer<void>();
251    this.pendingEOFs.push(asyncRes);
252    const rpc = TraceProcessorRpc.create();
253    rpc.request = TPM.TPM_FINALIZE_TRACE_DATA;
254    this.rpcSendRequest(rpc);
255    return asyncRes;  // Linearize with the worker.
256  }
257
258  // Updates the TraceProcessor Config. This method creates a new
259  // TraceProcessor instance, so it should be called before passing any trace
260  // data.
261  resetTraceProcessor(
262      {cropTrackEvents, ingestFtraceInRawTable, analyzeTraceProtoContent}:
263          TraceProcessorConfig): Promise<void> {
264    const asyncRes = defer<void>();
265    this.pendingResetTraceProcessors.push(asyncRes);
266    const rpc = TraceProcessorRpc.create();
267    rpc.request = TPM.TPM_RESET_TRACE_PROCESSOR;
268    const args = rpc.resetTraceProcessorArgs = new ResetTraceProcessorArgs();
269    args.dropTrackEventDataBefore = cropTrackEvents ?
270        ResetTraceProcessorArgs.DropTrackEventDataBefore
271            .TRACK_EVENT_RANGE_OF_INTEREST :
272        ResetTraceProcessorArgs.DropTrackEventDataBefore.NO_DROP;
273    args.ingestFtraceInRawTable = ingestFtraceInRawTable;
274    args.analyzeTraceProtoContent = analyzeTraceProtoContent;
275    this.rpcSendRequest(rpc);
276    return asyncRes;
277  }
278
279  // Resets the trace processor state by destroying any table/views created by
280  // the UI after loading.
281  restoreInitialTables(): Promise<void> {
282    const asyncRes = defer<void>();
283    this.pendingRestoreTables.push(asyncRes);
284    const rpc = TraceProcessorRpc.create();
285    rpc.request = TPM.TPM_RESTORE_INITIAL_TABLES;
286    this.rpcSendRequest(rpc);
287    return asyncRes;  // Linearize with the worker.
288  }
289
290  // Shorthand for sending a compute metrics request to the engine.
291  async computeMetric(metrics: string[]): Promise<ComputeMetricResult> {
292    const asyncRes = defer<ComputeMetricResult>();
293    this.pendingComputeMetrics.push(asyncRes);
294    const rpc = TraceProcessorRpc.create();
295    rpc.request = TPM.TPM_COMPUTE_METRIC;
296    const args = rpc.computeMetricArgs = new ComputeMetricArgs();
297    args.metricNames = metrics;
298    args.format = ComputeMetricArgs.ResultFormat.TEXTPROTO;
299    this.rpcSendRequest(rpc);
300    return asyncRes;
301  }
302
303  // Issues a streaming query and retrieve results in batches.
304  // The returned QueryResult object will be populated over time with batches
305  // of rows (each batch conveys ~128KB of data and a variable number of rows).
306  // The caller can decide whether to wait that all batches have been received
307  // (by awaiting the returned object or calling result.waitAllRows()) or handle
308  // the rows incrementally.
309  //
310  // Example usage:
311  // const res = engine.query('SELECT foo, bar FROM table');
312  // console.log(res.numRows());  // Will print 0 because we didn't await.
313  // await(res.waitAllRows());
314  // console.log(res.numRows());  // Will print the total number of rows.
315  //
316  // for (const it = res.iter({foo: NUM, bar:STR}); it.valid(); it.next()) {
317  //   console.log(it.foo, it.bar);
318  // }
319  //
320  // Optional |tag| (usually a component name) can be provided to allow
321  // attributing trace processor workload to different UI components.
322  query(sqlQuery: string, tag?: string): Promise<QueryResult>&QueryResult {
323    const rpc = TraceProcessorRpc.create();
324    rpc.request = TPM.TPM_QUERY_STREAMING;
325    rpc.queryArgs = new QueryArgs();
326    rpc.queryArgs.sqlQuery = sqlQuery;
327    if (tag) {
328      rpc.queryArgs.tag = tag;
329    }
330    const result = createQueryResult({
331      query: sqlQuery,
332    });
333    this.pendingQueries.push(result);
334    this.rpcSendRequest(rpc);
335    return result;
336  }
337
338  isMetatracingEnabled(): boolean {
339    return this._isMetatracingEnabled;
340  }
341
342  enableMetatrace(categories?: perfetto.protos.MetatraceCategories) {
343    const rpc = TraceProcessorRpc.create();
344    rpc.request = TPM.TPM_ENABLE_METATRACE;
345    if (categories) {
346      rpc.enableMetatraceArgs = new perfetto.protos.EnableMetatraceArgs();
347      rpc.enableMetatraceArgs.categories = categories;
348    }
349    this._isMetatracingEnabled = true;
350    this.rpcSendRequest(rpc);
351  }
352
353  stopAndGetMetatrace(): Promise<DisableAndReadMetatraceResult> {
354    // If we are already finalising a metatrace, ignore the request.
355    if (this.pendingReadMetatrace) {
356      return Promise.reject(new Error('Already finalising a metatrace'));
357    }
358
359    const result = defer<DisableAndReadMetatraceResult>();
360
361    const rpc = TraceProcessorRpc.create();
362    rpc.request = TPM.TPM_DISABLE_AND_READ_METATRACE;
363    this._isMetatracingEnabled = false;
364    this.pendingReadMetatrace = result;
365    this.rpcSendRequest(rpc);
366    return result;
367  }
368
369  // Marshals the TraceProcessorRpc request arguments and sends the request
370  // to the concrete Engine (Wasm or HTTP).
371  private rpcSendRequest(rpc: TraceProcessorRpc) {
372    rpc.seq = this.txSeqId++;
373    // Each message is wrapped in a TraceProcessorRpcStream to add the varint
374    // preamble with the size, which allows tokenization on the other end.
375    const outerProto = TraceProcessorRpcStream.create();
376    outerProto.msg.push(rpc);
377    const buf = TraceProcessorRpcStream.encode(outerProto).finish();
378    this.loadingTracker.beginLoading();
379    this.rpcSendRequestBytes(buf);
380  }
381
382  // TODO(hjd): When streaming must invalidate this somehow.
383  async getCpus(): Promise<number[]> {
384    if (!this._cpus) {
385      const cpus = [];
386      const queryRes = await this.query(
387          'select distinct(cpu) as cpu from sched order by cpu;');
388      for (const it = queryRes.iter({cpu: NUM}); it.valid(); it.next()) {
389        cpus.push(it.cpu);
390      }
391      this._cpus = cpus;
392    }
393    return this._cpus;
394  }
395
396  async getNumberOfGpus(): Promise<number> {
397    if (!this._numGpus) {
398      const result = await this.query(`
399        select count(distinct(gpu_id)) as gpuCount
400        from gpu_counter_track
401        where name = 'gpufreq';
402      `);
403      this._numGpus = result.firstRow({gpuCount: NUM}).gpuCount;
404    }
405    return this._numGpus;
406  }
407
408  // TODO: This should live in code that's more specific to chrome, instead of
409  // in engine.
410  async getNumberOfProcesses(): Promise<number> {
411    const result = await this.query('select count(*) as cnt from process;');
412    return result.firstRow({cnt: NUM}).cnt;
413  }
414
415  async getTraceTimeBounds(): Promise<Span<TPTime>> {
416    const result = await this.query(
417        `select start_ts as startTs, end_ts as endTs from trace_bounds`);
418    const bounds = result.firstRow({
419      startTs: LONG,
420      endTs: LONG,
421    });
422    return new TPTimeSpan(bounds.startTs, bounds.endTs);
423  }
424
425  async getTracingMetadataTimeBounds(): Promise<Span<TPTime>> {
426    const queryRes = await this.query(`select
427         name,
428         int_value as intValue
429         from metadata
430         where name = 'tracing_started_ns' or name = 'tracing_disabled_ns'
431         or name = 'all_data_source_started_ns'`);
432    let startBound = 0n;
433    let endBound = BigintMath.INT64_MAX;
434    const it = queryRes.iter({'name': STR, 'intValue': LONG_NULL});
435    for (; it.valid(); it.next()) {
436      const columnName = it.name;
437      const timestamp = it.intValue;
438      if (timestamp === null) continue;
439      if (columnName === 'tracing_disabled_ns') {
440        endBound = BigintMath.min(endBound, timestamp);
441      } else {
442        startBound = BigintMath.max(startBound, timestamp);
443      }
444    }
445
446    return new TPTimeSpan(startBound, endBound);
447  }
448
449  getProxy(tag: string): EngineProxy {
450    return new EngineProxy(this, tag);
451  }
452}
453
454// Lightweight wrapper over Engine exposing only `query` method and annotating
455// all queries going through it with a tag.
456export class EngineProxy {
457  private engine: Engine;
458  private tag: string;
459
460  constructor(engine: Engine, tag: string) {
461    this.engine = engine;
462    this.tag = tag;
463  }
464
465  query(sqlQuery: string, tag?: string): Promise<QueryResult>&QueryResult {
466    return this.engine.query(sqlQuery, tag || this.tag);
467  }
468
469  get engineId(): string {
470    return this.engine.id;
471  }
472}
473