// Copyright (C) 2018 The Android Open Source Project // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. import {defer, Deferred} from '../base/deferred'; import {assertExists, assertTrue} from '../base/logging'; import {perfetto} from '../gen/protos'; import {ProtoRingBuffer} from './proto_ring_buffer'; import { ComputeMetricArgs, ComputeMetricResult, QueryArgs, } from './protos'; import {NUM, NUM_NULL, STR} from './query_result'; import { createQueryResult, QueryError, QueryResult, WritableQueryResult, } from './query_result'; import {TimeSpan} from './time'; import TraceProcessorRpc = perfetto.protos.TraceProcessorRpc; import TraceProcessorRpcStream = perfetto.protos.TraceProcessorRpcStream; import TPM = perfetto.protos.TraceProcessorRpc.TraceProcessorMethod; export interface LoadingTracker { beginLoading(): void; endLoading(): void; } export class NullLoadingTracker implements LoadingTracker { beginLoading(): void {} endLoading(): void {} } // This is used to skip the decoding of queryResult from protobufjs and deal // with it ourselves. See the comment below around `QueryResult.decode = ...`. interface QueryResultBypass { rawQueryResult: Uint8Array; } /** * Abstract interface of a trace proccessor. * This is the TypeScript equivalent of src/trace_processor/rpc.h. * There are two concrete implementations: * 1. WasmEngineProxy: creates a Wasm module and interacts over postMessage(). * 2. HttpRpcEngine: connects to an external `trace_processor_shell --httpd`. * and interacts via fetch(). * In both cases, we have a byte-oriented pipe to interact with TraceProcessor. * The derived class is only expected to deal with these two functions: * 1. Implement the abstract rpcSendRequestBytes() function, sending the * proto-encoded TraceProcessorRpc requests to the TraceProcessor instance. * 2. Call onRpcResponseBytes() when response data is received. */ export abstract class Engine { abstract readonly id: string; private _cpus?: number[]; private _numGpus?: number; private loadingTracker: LoadingTracker; private txSeqId = 0; private rxSeqId = 0; private rxBuf = new ProtoRingBuffer(); private pendingParses = new Array>(); private pendingEOFs = new Array>(); private pendingQueries = new Array(); private pendingRestoreTables = new Array>(); private pendingComputeMetrics = new Array>(); constructor(tracker?: LoadingTracker) { this.loadingTracker = tracker ? tracker : new NullLoadingTracker(); } /** * Called to send data to the TraceProcessor instance. This turns into a * postMessage() or a HTTP request, depending on the Engine implementation. */ abstract rpcSendRequestBytes(data: Uint8Array): void; /** * Called when an inbound message is received by the Engine implementation * (e.g. onmessage for the Wasm case, on when HTTP replies are received for * the HTTP+RPC case). */ onRpcResponseBytes(dataWillBeRetained: Uint8Array) { // Note: when hitting the fastpath inside ProtoRingBuffer, the |data| buffer // is returned back by readMessage() (% subarray()-ing it) and held onto by // other classes (e.g., QueryResult). For both fetch() and Wasm we are fine // because every response creates a new buffer. this.rxBuf.append(dataWillBeRetained); for (;;) { const msg = this.rxBuf.readMessage(); if (msg === undefined) break; this.onRpcResponseMessage(msg); } } /* * Parses a response message. * |rpcMsgEncoded| is a sub-array to to the start of a TraceProcessorRpc * proto-encoded message (without the proto preamble and varint size). */ private onRpcResponseMessage(rpcMsgEncoded: Uint8Array) { // Here we override the protobufjs-generated code to skip the parsing of the // new streaming QueryResult and instead passing it through like a buffer. // This is the overall problem: All trace processor responses are wrapped // into a perfetto.protos.TraceProcessorRpc proto message. In all cases % // TPM_QUERY_STREAMING, we want protobufjs to decode the proto bytes and // give us a structured object. In the case of TPM_QUERY_STREAMING, instead, // we want to deal with the proto parsing ourselves using the new // QueryResult.appendResultBatch() method, because that handled streaming // results more efficiently and skips several copies. // By overriding the decode method below, we achieve two things: // 1. We avoid protobufjs decoding the TraceProcessorRpc.query_result field. // 2. We stash (a view of) the original buffer into the |rawQueryResult| so // the `case TPM_QUERY_STREAMING` below can take it. perfetto.protos.QueryResult.decode = (reader: protobuf.Reader, length: number) => { const res = perfetto.protos.QueryResult.create() as {} as QueryResultBypass; res.rawQueryResult = reader.buf.subarray(reader.pos, reader.pos + length); // All this works only if protobufjs returns the original ArrayBuffer // from |rpcMsgEncoded|. It should be always the case given the // current implementation. This check mainly guards against future // behavioral changes of protobufjs. We don't want to accidentally // hold onto some internal protobufjs buffer. We are fine holding // onto |rpcMsgEncoded| because those come from ProtoRingBuffer which // is buffer-retention-friendly. assertTrue(res.rawQueryResult.buffer === rpcMsgEncoded.buffer); reader.pos += length; return res as {} as perfetto.protos.QueryResult; }; const rpc = TraceProcessorRpc.decode(rpcMsgEncoded); if (rpc.fatalError !== undefined && rpc.fatalError.length > 0) { throw new Error(`${rpc.fatalError}`); } // Allow restarting sequences from zero (when reloading the browser). if (rpc.seq !== this.rxSeqId + 1 && this.rxSeqId !== 0 && rpc.seq !== 0) { // "(ERR:rpc_seq)" is intercepted by error_dialog.ts to show a more // graceful and actionable error. throw new Error(`RPC sequence id mismatch cur=${rpc.seq} last=${ this.rxSeqId} (ERR:rpc_seq)`); } this.rxSeqId = rpc.seq; let isFinalResponse = true; switch (rpc.response) { case TPM.TPM_APPEND_TRACE_DATA: const appendResult = assertExists(rpc.appendResult); const pendingPromise = assertExists(this.pendingParses.shift()); if (appendResult.error && appendResult.error.length > 0) { pendingPromise.reject(appendResult.error); } else { pendingPromise.resolve(); } break; case TPM.TPM_FINALIZE_TRACE_DATA: assertExists(this.pendingEOFs.shift()).resolve(); break; case TPM.TPM_RESTORE_INITIAL_TABLES: assertExists(this.pendingRestoreTables.shift()).resolve(); break; case TPM.TPM_QUERY_STREAMING: const qRes = assertExists(rpc.queryResult) as {} as QueryResultBypass; const pendingQuery = assertExists(this.pendingQueries[0]); pendingQuery.appendResultBatch(qRes.rawQueryResult); if (pendingQuery.isComplete()) { this.pendingQueries.shift(); } else { isFinalResponse = false; } break; case TPM.TPM_COMPUTE_METRIC: const metricRes = assertExists(rpc.metricResult) as ComputeMetricResult; if (metricRes.error && metricRes.error.length > 0) { throw new QueryError(`ComputeMetric() error: ${metricRes.error}`, { query: 'COMPUTE_METRIC', }); } assertExists(this.pendingComputeMetrics.shift()).resolve(metricRes); break; default: console.log( 'Unexpected TraceProcessor response received: ', rpc.response); break; } // switch(rpc.response); if (isFinalResponse) { this.loadingTracker.endLoading(); } } /** * TraceProcessor methods below this point. * The methods below are called by the various controllers in the UI and * deal with marshalling / unmarshaling requests to/from TraceProcessor. */ /** * Push trace data into the engine. The engine is supposed to automatically * figure out the type of the trace (JSON vs Protobuf). */ parse(data: Uint8Array): Promise { const asyncRes = defer(); this.pendingParses.push(asyncRes); const rpc = TraceProcessorRpc.create(); rpc.request = TPM.TPM_APPEND_TRACE_DATA; rpc.appendTraceData = data; this.rpcSendRequest(rpc); return asyncRes; // Linearize with the worker. } /** * Notify the engine that we reached the end of the trace. * Called after the last parse() call. */ notifyEof(): Promise { const asyncRes = defer(); this.pendingEOFs.push(asyncRes); const rpc = TraceProcessorRpc.create(); rpc.request = TPM.TPM_FINALIZE_TRACE_DATA; this.rpcSendRequest(rpc); return asyncRes; // Linearize with the worker. } /** * Resets the trace processor state by destroying any table/views created by * the UI after loading. */ restoreInitialTables(): Promise { const asyncRes = defer(); this.pendingRestoreTables.push(asyncRes); const rpc = TraceProcessorRpc.create(); rpc.request = TPM.TPM_RESTORE_INITIAL_TABLES; this.rpcSendRequest(rpc); return asyncRes; // Linearize with the worker. } /** * Shorthand for sending a compute metrics request to the engine. */ async computeMetric(metrics: string[]): Promise { const asyncRes = defer(); this.pendingComputeMetrics.push(asyncRes); const rpc = TraceProcessorRpc.create(); rpc.request = TPM.TPM_COMPUTE_METRIC; const args = rpc.computeMetricArgs = new ComputeMetricArgs(); args.metricNames = metrics; args.format = ComputeMetricArgs.ResultFormat.TEXTPROTO; this.rpcSendRequest(rpc); return asyncRes; } /* * Issues a streaming query and retrieve results in batches. * The returned QueryResult object will be populated over time with batches * of rows (each batch conveys ~128KB of data and a variable number of rows). * The caller can decide whether to wait that all batches have been received * (by awaiting the returned object or calling result.waitAllRows()) or handle * the rows incrementally. * * Example usage: * const res = engine.query('SELECT foo, bar FROM table'); * console.log(res.numRows()); // Will print 0 because we didn't await. * await(res.waitAllRows()); * console.log(res.numRows()); // Will print the total number of rows. * * for (const it = res.iter({foo: NUM, bar:STR}); it.valid(); it.next()) { * console.log(it.foo, it.bar); * } */ query(sqlQuery: string): Promise&QueryResult { const rpc = TraceProcessorRpc.create(); rpc.request = TPM.TPM_QUERY_STREAMING; rpc.queryArgs = new QueryArgs(); rpc.queryArgs.sqlQuery = sqlQuery; const result = createQueryResult({ query: sqlQuery, }); this.pendingQueries.push(result); this.rpcSendRequest(rpc); return result; } /** * Marshals the TraceProcessorRpc request arguments and sends the request * to the concrete Engine (Wasm or HTTP). */ private rpcSendRequest(rpc: TraceProcessorRpc) { rpc.seq = this.txSeqId++; // Each message is wrapped in a TraceProcessorRpcStream to add the varint // preamble with the size, which allows tokenization on the other end. const outerProto = TraceProcessorRpcStream.create(); outerProto.msg.push(rpc); const buf = TraceProcessorRpcStream.encode(outerProto).finish(); this.loadingTracker.beginLoading(); this.rpcSendRequestBytes(buf); } // TODO(hjd): When streaming must invalidate this somehow. async getCpus(): Promise { if (!this._cpus) { const cpus = []; const queryRes = await this.query( 'select distinct(cpu) as cpu from sched order by cpu;'); for (const it = queryRes.iter({cpu: NUM}); it.valid(); it.next()) { cpus.push(it.cpu); } this._cpus = cpus; } return this._cpus; } async getNumberOfGpus(): Promise { if (!this._numGpus) { const result = await this.query(` select count(distinct(gpu_id)) as gpuCount from gpu_counter_track where name = 'gpufreq'; `); this._numGpus = result.firstRow({gpuCount: NUM}).gpuCount; } return this._numGpus; } // TODO: This should live in code that's more specific to chrome, instead of // in engine. async getNumberOfProcesses(): Promise { const result = await this.query('select count(*) as cnt from process;'); return result.firstRow({cnt: NUM}).cnt; } async getTraceTimeBounds(): Promise { const result = await this.query( `select start_ts as startTs, end_ts as endTs from trace_bounds`); const bounds = result.firstRow({ startTs: NUM, endTs: NUM, }); return new TimeSpan(bounds.startTs / 1e9, bounds.endTs / 1e9); } async getTracingMetadataTimeBounds(): Promise { const queryRes = await this.query(`select name, int_value as intValue from metadata where name = 'tracing_started_ns' or name = 'tracing_disabled_ns' or name = 'all_data_source_started_ns'`); let startBound = -Infinity; let endBound = Infinity; const it = queryRes.iter({'name': STR, 'intValue': NUM_NULL}); for (; it.valid(); it.next()) { const columnName = it.name; const timestamp = it.intValue; if (timestamp === null) continue; if (columnName === 'tracing_disabled_ns') { endBound = Math.min(endBound, timestamp / 1e9); } else { startBound = Math.max(startBound, timestamp / 1e9); } } return new TimeSpan(startBound, endBound); } }