// Copyright (C) 2018 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

import {defer, Deferred} from '../base/deferred';
import {assertExists, assertTrue} from '../base/logging';
import {perfetto} from '../gen/protos';

import {ProtoRingBuffer} from './proto_ring_buffer';
import {
  ComputeMetricArgs,
  ComputeMetricResult,
  QueryArgs,
} from './protos';
import {NUM, NUM_NULL, STR} from './query_result';
import {
  createQueryResult,
  QueryError,
  QueryResult,
  WritableQueryResult,
} from './query_result';
import {TimeSpan} from './time';

import TraceProcessorRpc = perfetto.protos.TraceProcessorRpc;
import TraceProcessorRpcStream = perfetto.protos.TraceProcessorRpcStream;
import TPM = perfetto.protos.TraceProcessorRpc.TraceProcessorMethod;

export interface LoadingTracker {
  beginLoading(): void;
  endLoading(): void;
}

export class NullLoadingTracker implements LoadingTracker {
  beginLoading(): void {}
  endLoading(): void {}
}


// This is used to skip the decoding of queryResult from protobufjs and deal
// with it ourselves. See the comment below around `QueryResult.decode = ...`.
interface QueryResultBypass {
  rawQueryResult: Uint8Array;
}
/**
 * Abstract interface of a trace processor.
 * This is the TypeScript equivalent of src/trace_processor/rpc.h.
 * There are two concrete implementations:
 *   1. WasmEngineProxy: creates a Wasm module and interacts over postMessage().
 *   2. HttpRpcEngine: connects to an external `trace_processor_shell --httpd`
 *      and interacts via fetch().
 * In both cases, we have a byte-oriented pipe to interact with TraceProcessor.
 * The derived class is only expected to deal with these two functions:
 * 1. Implement the abstract rpcSendRequestBytes() function, sending the
 *    proto-encoded TraceProcessorRpc requests to the TraceProcessor instance.
 * 2. Call onRpcResponseBytes() when response data is received.
 * (See the illustrative sketch after onRpcResponseBytes() below.)
 */
export abstract class Engine {
  abstract readonly id: string;
  private _cpus?: number[];
  private _numGpus?: number;
  private loadingTracker: LoadingTracker;
  private txSeqId = 0;
  private rxSeqId = 0;
  private rxBuf = new ProtoRingBuffer();
  private pendingParses = new Array<Deferred<void>>();
  private pendingEOFs = new Array<Deferred<void>>();
  private pendingQueries = new Array<WritableQueryResult>();
  private pendingRestoreTables = new Array<Deferred<void>>();
  private pendingComputeMetrics = new Array<Deferred<ComputeMetricResult>>();

  constructor(tracker?: LoadingTracker) {
    this.loadingTracker = tracker ? tracker : new NullLoadingTracker();
  }

  /**
   * Called to send data to the TraceProcessor instance. This turns into a
   * postMessage() or an HTTP request, depending on the Engine implementation.
   */
  abstract rpcSendRequestBytes(data: Uint8Array): void;

  /**
   * Called when an inbound message is received by the Engine implementation
   * (e.g. onmessage for the Wasm case, or when HTTP replies are received for
   * the HTTP+RPC case).
   */
  onRpcResponseBytes(dataWillBeRetained: Uint8Array) {
    // Note: when hitting the fastpath inside ProtoRingBuffer, the |data| buffer
    // is returned back by readMessage() (% subarray()-ing it) and held onto by
    // other classes (e.g., QueryResult). For both fetch() and Wasm we are fine
    // because every response creates a new buffer.
    this.rxBuf.append(dataWillBeRetained);
    for (;;) {
      const msg = this.rxBuf.readMessage();
      if (msg === undefined) break;
      this.onRpcResponseMessage(msg);
    }
  }
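
  // Illustrative sketch of a concrete Engine (not one of the real
  // implementations; the Worker-based transport below is hypothetical). A
  // subclass only needs to forward request bytes to its transport and feed
  // whatever bytes come back into onRpcResponseBytes():
  //
  //   class WorkerEngineSketch extends Engine {
  //     readonly id = 'worker-sketch';
  //     constructor(private worker: Worker) {
  //       super();
  //       this.worker.onmessage = (e: MessageEvent) =>
  //           this.onRpcResponseBytes(new Uint8Array(e.data));
  //     }
  //     rpcSendRequestBytes(data: Uint8Array): void {
  //       this.worker.postMessage(data);
  //     }
  //   }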

  /*
   * Parses a response message.
   * |rpcMsgEncoded| is a sub-array pointing to the start of a TraceProcessorRpc
   * proto-encoded message (without the proto preamble and varint size).
   */
  private onRpcResponseMessage(rpcMsgEncoded: Uint8Array) {
    // Here we override the protobufjs-generated code to skip the parsing of the
    // new streaming QueryResult and instead pass it through like a buffer.
    // This is the overall problem: All trace processor responses are wrapped
    // into a perfetto.protos.TraceProcessorRpc proto message. In all cases
    // except TPM_QUERY_STREAMING, we want protobufjs to decode the proto bytes
    // and give us a structured object. In the case of TPM_QUERY_STREAMING,
    // instead, we want to deal with the proto parsing ourselves using the new
    // QueryResult.appendResultBatch() method, because that handles streaming
    // results more efficiently and skips several copies.
    // By overriding the decode method below, we achieve two things:
    // 1. We avoid protobufjs decoding the TraceProcessorRpc.query_result field.
    // 2. We stash (a view of) the original buffer into the |rawQueryResult| so
    //    the `case TPM_QUERY_STREAMING` below can take it.
    perfetto.protos.QueryResult.decode =
        (reader: protobuf.Reader, length: number) => {
          const res =
              perfetto.protos.QueryResult.create() as {} as QueryResultBypass;
          res.rawQueryResult =
              reader.buf.subarray(reader.pos, reader.pos + length);
          // All this works only if protobufjs returns the original ArrayBuffer
          // from |rpcMsgEncoded|. It should always be the case given the
          // current implementation. This check mainly guards against future
          // behavioral changes of protobufjs. We don't want to accidentally
          // hold onto some internal protobufjs buffer. We are fine holding
          // onto |rpcMsgEncoded| because those come from ProtoRingBuffer which
          // is buffer-retention-friendly.
          assertTrue(res.rawQueryResult.buffer === rpcMsgEncoded.buffer);
          reader.pos += length;
          return res as {} as perfetto.protos.QueryResult;
        };

    const rpc = TraceProcessorRpc.decode(rpcMsgEncoded);

    if (rpc.fatalError !== undefined && rpc.fatalError.length > 0) {
      throw new Error(`${rpc.fatalError}`);
    }

    // Allow restarting sequences from zero (when reloading the browser).
    if (rpc.seq !== this.rxSeqId + 1 && this.rxSeqId !== 0 && rpc.seq !== 0) {
      // "(ERR:rpc_seq)" is intercepted by error_dialog.ts to show a more
      // graceful and actionable error.
      throw new Error(`RPC sequence id mismatch cur=${rpc.seq} last=${
          this.rxSeqId} (ERR:rpc_seq)`);
    }

    this.rxSeqId = rpc.seq;

    let isFinalResponse = true;

    switch (rpc.response) {
      case TPM.TPM_APPEND_TRACE_DATA:
        const appendResult = assertExists(rpc.appendResult);
        const pendingPromise = assertExists(this.pendingParses.shift());
        if (appendResult.error && appendResult.error.length > 0) {
          pendingPromise.reject(appendResult.error);
        } else {
          pendingPromise.resolve();
        }
        break;
      case TPM.TPM_FINALIZE_TRACE_DATA:
        assertExists(this.pendingEOFs.shift()).resolve();
        break;
      case TPM.TPM_RESTORE_INITIAL_TABLES:
        assertExists(this.pendingRestoreTables.shift()).resolve();
        break;
      case TPM.TPM_QUERY_STREAMING:
        const qRes = assertExists(rpc.queryResult) as {} as QueryResultBypass;
        const pendingQuery = assertExists(this.pendingQueries[0]);
        pendingQuery.appendResultBatch(qRes.rawQueryResult);
        if (pendingQuery.isComplete()) {
          this.pendingQueries.shift();
        } else {
          isFinalResponse = false;
        }
        break;
      case TPM.TPM_COMPUTE_METRIC:
        const metricRes = assertExists(rpc.metricResult) as ComputeMetricResult;
        if (metricRes.error && metricRes.error.length > 0) {
          throw new QueryError(`ComputeMetric() error: ${metricRes.error}`, {
            query: 'COMPUTE_METRIC',
          });
        }
        assertExists(this.pendingComputeMetrics.shift()).resolve(metricRes);
        break;
      default:
        console.log(
            'Unexpected TraceProcessor response received: ', rpc.response);
        break;
    }  // switch(rpc.response);

    if (isFinalResponse) {
      this.loadingTracker.endLoading();
    }
  }

  /**
   * TraceProcessor methods below this point.
   * The methods below are called by the various controllers in the UI and
   * deal with marshalling / unmarshalling requests to/from TraceProcessor.
   */


  /**
   * Push trace data into the engine. The engine is supposed to automatically
   * figure out the type of the trace (JSON vs Protobuf).
   */
  parse(data: Uint8Array): Promise<void> {
    const asyncRes = defer<void>();
    this.pendingParses.push(asyncRes);
    const rpc = TraceProcessorRpc.create();
    rpc.request = TPM.TPM_APPEND_TRACE_DATA;
    rpc.appendTraceData = data;
    this.rpcSendRequest(rpc);
    return asyncRes;  // Linearize with the worker.
  }

  /**
   * Notify the engine that we reached the end of the trace.
   * Called after the last parse() call.
   */
  notifyEof(): Promise<void> {
    const asyncRes = defer<void>();
    this.pendingEOFs.push(asyncRes);
    const rpc = TraceProcessorRpc.create();
    rpc.request = TPM.TPM_FINALIZE_TRACE_DATA;
    this.rpcSendRequest(rpc);
    return asyncRes;  // Linearize with the worker.
  }
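
  // Usage sketch (illustrative; `engine` is any concrete Engine and `chunks`
  // is a hypothetical array of Uint8Array trace chunks):
  //
  //   for (const chunk of chunks) {
  //     await engine.parse(chunk);
  //   }
  //   await engine.notifyEof();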

  /**
   * Resets the trace processor state by destroying any tables/views created by
   * the UI after loading.
   */
  restoreInitialTables(): Promise<void> {
    const asyncRes = defer<void>();
    this.pendingRestoreTables.push(asyncRes);
    const rpc = TraceProcessorRpc.create();
    rpc.request = TPM.TPM_RESTORE_INITIAL_TABLES;
    this.rpcSendRequest(rpc);
    return asyncRes;  // Linearize with the worker.
  }

  /**
   * Shorthand for sending a compute metrics request to the engine.
   */
  async computeMetric(metrics: string[]): Promise<ComputeMetricResult> {
    const asyncRes = defer<ComputeMetricResult>();
    this.pendingComputeMetrics.push(asyncRes);
    const rpc = TraceProcessorRpc.create();
    rpc.request = TPM.TPM_COMPUTE_METRIC;
    const args = rpc.computeMetricArgs = new ComputeMetricArgs();
    args.metricNames = metrics;
    args.format = ComputeMetricArgs.ResultFormat.TEXTPROTO;
    this.rpcSendRequest(rpc);
    return asyncRes;
  }
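
  // Usage sketch (illustrative): the metric name and the result field accessed
  // below are assumptions, not guaranteed by this file. The result carries the
  // metric output in the TEXTPROTO format requested above.
  //
  //   const res = await engine.computeMetric(['android_cpu']);
  //   console.log(res.metricsAsPrototext);  // Assumed TEXTPROTO output field.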

  /*
   * Issues a streaming query and retrieves results in batches.
   * The returned QueryResult object will be populated over time with batches
   * of rows (each batch conveys ~128KB of data and a variable number of rows).
   * The caller can decide whether to wait until all batches have been received
   * (by awaiting the returned object or calling result.waitAllRows()) or handle
   * the rows incrementally.
   *
   * Example usage:
   * const res = engine.query('SELECT foo, bar FROM table');
   * console.log(res.numRows());  // Will print 0 because we didn't await.
   * await res.waitAllRows();
   * console.log(res.numRows());  // Will print the total number of rows.
   *
   * for (const it = res.iter({foo: NUM, bar: STR}); it.valid(); it.next()) {
   *   console.log(it.foo, it.bar);
   * }
   */
  query(sqlQuery: string): Promise<QueryResult>&QueryResult {
    const rpc = TraceProcessorRpc.create();
    rpc.request = TPM.TPM_QUERY_STREAMING;
    rpc.queryArgs = new QueryArgs();
    rpc.queryArgs.sqlQuery = sqlQuery;
    const result = createQueryResult({
      query: sqlQuery,
    });
    this.pendingQueries.push(result);
    this.rpcSendRequest(rpc);
    return result;
  }
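
  // Additional usage sketch (illustrative; the SQL is only an example):
  // because the returned object is both a QueryResult and a Promise, callers
  // that only need the final rows can await it directly.
  //
  //   const res = await engine.query('select count(*) as cnt from slice');
  //   const cnt = res.firstRow({cnt: NUM}).cnt;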

  /**
   * Marshals the TraceProcessorRpc request arguments and sends the request
   * to the concrete Engine (Wasm or HTTP).
   */
  private rpcSendRequest(rpc: TraceProcessorRpc) {
    rpc.seq = this.txSeqId++;
    // Each message is wrapped in a TraceProcessorRpcStream to add the varint
    // preamble with the size, which allows tokenization on the other end.
    const outerProto = TraceProcessorRpcStream.create();
    outerProto.msg.push(rpc);
    const buf = TraceProcessorRpcStream.encode(outerProto).finish();
    this.loadingTracker.beginLoading();
    this.rpcSendRequestBytes(buf);
  }
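
  // Framing sketch (assuming `msg` is declared as field #1 of
  // TraceProcessorRpcStream): each wrapped TraceProcessorRpc is emitted as a
  // length-delimited proto field, i.e.
  //
  //   0x0a <varint payload size> <payload: encoded TraceProcessorRpc>
  //
  // This is the preamble + size that ProtoRingBuffer strips on the receiving
  // side before handing the payload to onRpcResponseMessage().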

  // TODO(hjd): When streaming must invalidate this somehow.
  async getCpus(): Promise<number[]> {
    if (!this._cpus) {
      const cpus = [];
      const queryRes = await this.query(
          'select distinct(cpu) as cpu from sched order by cpu;');
      for (const it = queryRes.iter({cpu: NUM}); it.valid(); it.next()) {
        cpus.push(it.cpu);
      }
      this._cpus = cpus;
    }
    return this._cpus;
  }

  async getNumberOfGpus(): Promise<number> {
    if (!this._numGpus) {
      const result = await this.query(`
        select count(distinct(gpu_id)) as gpuCount
        from gpu_counter_track
        where name = 'gpufreq';
      `);
      this._numGpus = result.firstRow({gpuCount: NUM}).gpuCount;
    }
    return this._numGpus;
  }

  // TODO: This should live in code that's more specific to chrome, instead of
  // in engine.
  async getNumberOfProcesses(): Promise<number> {
    const result = await this.query('select count(*) as cnt from process;');
    return result.firstRow({cnt: NUM}).cnt;
  }

  async getTraceTimeBounds(): Promise<TimeSpan> {
    const result = await this.query(
        `select start_ts as startTs, end_ts as endTs from trace_bounds`);
    const bounds = result.firstRow({
      startTs: NUM,
      endTs: NUM,
    });
    // trace_bounds timestamps are in nanoseconds; TimeSpan is in seconds.
    return new TimeSpan(bounds.startTs / 1e9, bounds.endTs / 1e9);
  }

  async getTracingMetadataTimeBounds(): Promise<TimeSpan> {
    const queryRes = await this.query(`select
         name,
         int_value as intValue
         from metadata
         where name = 'tracing_started_ns' or name = 'tracing_disabled_ns'
         or name = 'all_data_source_started_ns'`);
    let startBound = -Infinity;
    let endBound = Infinity;
    const it = queryRes.iter({'name': STR, 'intValue': NUM_NULL});
    for (; it.valid(); it.next()) {
      const columnName = it.name;
      const timestamp = it.intValue;
      if (timestamp === null) continue;
      // Metadata timestamps are in nanoseconds; convert to seconds.
      if (columnName === 'tracing_disabled_ns') {
        endBound = Math.min(endBound, timestamp / 1e9);
      } else {
        startBound = Math.max(startBound, timestamp / 1e9);
      }
    }

    return new TimeSpan(startBound, endBound);
  }
}