• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1// Copyright (C) 2021 The Android Open Source Project
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//      http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
// This file deals with deserialization and iteration of the proto-encoded
// byte buffer that is returned by TraceProcessor when invoking the
// TPM_QUERY_STREAMING method. The returned |query_result| buffer is optimized
// for being moved cheaply across workers and decoded on-the-fly as we step
// through the iterator.
// See comments around QueryResult in trace_processor.proto for more details.
21
22// The classes in this file are organized as follows:
23//
// QueryResultImpl:
// The object returned by the Engine.query(sql) method.
// This object is a holder of row data. Batches of rows get appended
// incrementally as they are received from the remote TraceProcessor instance.
// QueryResultImpl also deals with asynchronicity of queries and allows callers
// to obtain a promise that waits for more (or all) rows.
// At any point in time the following objects hold a reference to QueryResult:
// - The Engine: for appending row batches.
// - UI code, typically controllers, who make queries.
33//
34// ResultBatch:
35// Hold the data, returned by the remote TraceProcessor instance, for a number
36// of rows (TP typically chunks the results in batches of 128KB).
37// A QueryResultImpl holds exclusively ResultBatches for a given query.
38// ResultBatch is not exposed externally, it's just an internal representation
39// that helps with proto decoding. ResultBatch is immutable after it gets
40// appended and decoded. The iteration state is held by the RowIteratorImpl.
41//
42// RowIteratorImpl:
43// Decouples the data owned by QueryResultImpl (and its ResultBatch(es)) from
44// the iteration state. The iterator effectively is the union of a ResultBatch
45// and the row number in it. Rows within the batch are decoded as the user calls
46// next(). When getting at the end of the batch, it takes care of switching to
47// the next batch (if any) within the QueryResultImpl.
48// This object is part of the API exposed to tracks / controllers.
49
50// Ensure protobuf is initialized.
51import '../base/static_initializers';
52import protobuf from 'protobufjs/minimal';
53import {defer, Deferred} from '../base/deferred';
54import {assertExists, assertFalse, assertTrue} from '../base/logging';
55import {utf8Decode} from '../base/string_utils';
56import {Duration, duration, Time, time} from '../base/time';
57
// Every JS value a cell of a SQL result can be surfaced as.
export type SqlValue = string | number | bigint | null | Uint8Array;
// TODO(altimin): Replace ColumnType with SqlValue across the codebase and
// remove export here.
export type ColumnType = SqlValue;

// Sentinel values used to describe the expected type of a column in a row
// spec, e.g. iter({name: STR, id: NUM}). The code in this file only ever
// compares them by identity (===), so what matters is that each sentinel is a
// distinct value; the declared TS type of each constant is the type the
// corresponding column is expected to have.
export const UNKNOWN: ColumnType = null;
export const NUM = 0;
export const STR = 'str';
export const NUM_NULL: number | null = 1;
export const STR_NULL: string | null = 'str_null';
// BLOB and BLOB_NULL are two separate (empty) array instances, so that an
// identity comparison can tell them apart.
export const BLOB: Uint8Array = new Uint8Array();
export const BLOB_NULL: Uint8Array | null = new Uint8Array();
export const LONG: bigint = 0n;
export const LONG_NULL: bigint | null = 1n;

// Shift applied to the high 32-bit word when assembling a 64-bit bigint out
// of two 32-bit halves (see decodeInt64Varint()).
const SHIFT_32BITS = 32n;

// Describes the inheritance tree of the above types. Each entry maps a type to
// its direct parent: every non-nullable type extends its nullable counterpart,
// every nullable type extends UNKNOWN, and UNKNOWN is the root (no parent).
const inheritanceTree = new Map<ColumnType, {readonly extends?: ColumnType}>([
  [NUM, {extends: NUM_NULL}],
  [NUM_NULL, {extends: UNKNOWN}],
  [LONG, {extends: LONG_NULL}],
  [LONG_NULL, {extends: UNKNOWN}],
  [BLOB, {extends: BLOB_NULL}],
  [BLOB_NULL, {extends: UNKNOWN}],
  [STR, {extends: STR_NULL}],
  [STR_NULL, {extends: UNKNOWN}],
  [UNKNOWN, {}],
]);
87
/**
 * Tells whether one column type is the same as, or a descendant of, another.
 *
 * @param required - The candidate ancestor type.
 * @param actual - The type whose ancestry is inspected.
 * @returns - True if `required` is `actual` itself or appears anywhere on the
 * path from `actual` up to the root of the type tree.
 */
export function checkExtends(
  required: ColumnType,
  actual: ColumnType,
): boolean {
  // Identical types trivially match; otherwise walk up from |actual|.
  return required === actual || getAncestryPath(actual).includes(required);
}
105
/**
 * Finds the most specific type that both arguments have in common.
 *
 * @returns `typeA` when both types are identical, otherwise the first entry
 * of `typeA`'s ancestry path that also appears in `typeB`'s ancestry path, or
 * undefined when the two paths share no entry.
 */
export function unionTypes(
  typeA: ColumnType,
  typeB: ColumnType,
): ColumnType | undefined {
  if (typeA === typeB) return typeA;

  // Both paths are ordered from the type itself up to the root, so the first
  // hit while scanning A's path is the closest common ancestor.
  const ancestorsOfA = getAncestryPath(typeA);
  const ancestorsOfB = getAncestryPath(typeB);
  return ancestorsOfA.find((candidate) => ancestorsOfB.includes(candidate));
}
129
/**
 * Builds the chain of types starting at `type` and following the `extends`
 * links of the inheritance tree until an entry without a parent (or a type
 * that is not in the tree) is reached. The starting type is always the first
 * element of the returned array.
 */
function getAncestryPath(type: ColumnType): ColumnType[] {
  const path: ColumnType[] = [type];

  for (
    let parent = inheritanceTree.get(type)?.extends;
    parent !== undefined;
    parent = inheritanceTree.get(parent)?.extends
  ) {
    path.push(parent);
  }

  return path;
}
144
/**
 * Fast decode of a protobuf varint-encoded int64 into a bigint.
 *
 * Inspired by
 * https://github.com/protobufjs/protobuf.js/blob/56b1e64979dae757b67a21d326e16acee39f2267/src/reader.js#L123
 *
 * The value is accumulated into two unsigned 32-bit halves (`lo` and `hi`)
 * using plain number arithmetic and is converted to a bigint only on return.
 * Encodings of 6..10 bytes are reinterpreted as a signed 64-bit value.
 *
 * @param buf - The buffer containing the varint.
 * @param pos - Index within `buf` of the first byte of the varint.
 * @returns The decoded value.
 * @throws Error('Index out of range') if `buf` ends before the varint is
 *   terminated (i.e. the last available byte still has the continuation bit
 *   set, or there is no byte at all at `pos`).
 * @throws Error('invalid varint encoding') if the varint is longer than 10
 *   bytes.
 */
export function decodeInt64Varint(buf: Uint8Array, pos: number): bigint {
  let hi = 0;
  let lo = 0;
  let i = 0;

  if (buf.length - pos > 4) {
    // fast route (lo): at least 5 bytes are available, so no bounds checks.
    for (; i < 4; ++i) {
      // 1st..4th
      lo = (lo | ((buf[pos] & 127) << (i * 7))) >>> 0;
      if (buf[pos++] < 128) {
        return BigInt(lo);
      }
    }
    // 5th: its low 4 bits complete |lo|, its top 3 payload bits start |hi|.
    lo = (lo | ((buf[pos] & 127) << 28)) >>> 0;
    hi = (hi | ((buf[pos] & 127) >> 4)) >>> 0;
    if (buf[pos++] < 128) {
      return (BigInt(hi) << SHIFT_32BITS) | BigInt(lo);
    }
    i = 0;
  } else {
    // At most 4 bytes are left, so the varint must terminate within them and
    // only |lo| can be affected. Every read is bounds-checked.
    for (; i < 4; ++i) {
      if (pos >= buf.length) {
        throw Error('Index out of range');
      }
      // 1st..4th
      lo = (lo | ((buf[pos] & 127) << (i * 7))) >>> 0;
      if (buf[pos++] < 128) {
        return BigInt(lo);
      }
    }
    // All 4 remaining bytes had the continuation bit set: the varint runs past
    // the end of the buffer.
    throw Error('Index out of range');
  }
  if (buf.length - pos > 4) {
    // fast route (hi): at least 5 more bytes are available.
    for (; i < 5; ++i) {
      // 6th..10th
      hi = (hi | ((buf[pos] & 127) << (i * 7 + 3))) >>> 0;
      if (buf[pos++] < 128) {
        const big = (BigInt(hi) << SHIFT_32BITS) | BigInt(lo);
        return BigInt.asIntN(64, big);
      }
    }
  } else {
    for (; i < 5; ++i) {
      if (pos >= buf.length) {
        throw Error('Index out of range');
      }
      // 6th..10th
      hi = (hi | ((buf[pos] & 127) << (i * 7 + 3))) >>> 0;
      if (buf[pos++] < 128) {
        const big = (BigInt(hi) << SHIFT_32BITS) | BigInt(lo);
        return BigInt.asIntN(64, big);
      }
    }
  }
  // More than 10 bytes with the continuation bit set.
  throw Error('invalid varint encoding');
}
209
// Context attached to a query failure to make it easier to debug. Today this
// is just the SQL text of the query; it could grow to include e.g. the stack
// where the query was issued or the active plugin.
export interface QueryErrorInfo {
  query: string;
}

// Error raised for failed queries. On top of the plain error message it
// remembers the SQL text of the query and includes it in toString().
export class QueryError extends Error {
  readonly query: string;

  constructor(message: string, info: QueryErrorInfo) {
    super(message);
    const {query} = info;
    this.query = query;
  }

  toString() {
    const base = super.toString();
    return base + '\nQuery:\n' + this.query;
  }
}
229
// One row extracted from an SQL result: a dictionary keyed by column name
// whose values are the cells of that row.
export interface Row {
  [key: string]: ColumnType;
}
234
// The methods that any iterator has to implement.
export interface RowIteratorBase {
  // Returns true while the iterator is positioned on a row.
  valid(): boolean;
  // Moves the iterator forward by one row.
  next(): void;

  // Reflection support for cases where the column names are not known upfront
  // (e.g. the query result table for user-provided SQL queries).
  // It throws if the passed column name doesn't exist.
  // Example usage:
  // for (const it = queryResult.iter({}); it.valid(); it.next()) {
  //   for (const columnName of queryResult.columns()) {
  //     console.log(it.get(columnName));
  //   }
  // }
  get(columnName: string): ColumnType;
}
249
// A RowIterator is a type that has all the fields defined in the query spec
// plus the valid() and next() operators. This is to ultimately allow the
// clients to do:
// const result = await engine.query("select name, surname, id from people;");
// const iter = result.iter({name: STR, surname: STR, id: NUM});
// for (; iter.valid(); iter.next())
//   console.log(iter.name, iter.surname);
export type RowIterator<T extends Row> = RowIteratorBase & T;
258
// Maps a column type sentinel to a human readable name. Values that are not
// one of the known sentinels are rendered as INVALID(<value>).
function columnTypeToString(t: ColumnType): string {
  // The sentinels are matched by identity, in the same order they are declared.
  if (t === NUM) return 'NUM';
  if (t === NUM_NULL) return 'NUM_NULL';
  if (t === STR) return 'STR';
  if (t === STR_NULL) return 'STR_NULL';
  if (t === BLOB) return 'BLOB';
  if (t === BLOB_NULL) return 'BLOB_NULL';
  if (t === LONG) return 'LONG';
  if (t === LONG_NULL) return 'LONG_NULL';
  if (t === UNKNOWN) return 'UNKNOWN';
  return `INVALID(${t})`;
}
283
// Tells whether a cell of wire type |actual| can be handed out for a column
// that the row spec declared as |expected|. Throws if |actual| is not one of
// the known CellType values.
function isCompatible(actual: CellType, expected: ColumnType): boolean {
  // A column declared as UNKNOWN accepts every (known) cell type.
  const acceptsAny = expected === UNKNOWN;
  const wantsNum = expected === NUM || expected === NUM_NULL;
  const wantsLong = expected === LONG || expected === LONG_NULL;

  if (actual === CellType.CELL_NULL) {
    // NULL cells are accepted only by the nullable flavour of each type.
    return (
      acceptsAny ||
      expected === NUM_NULL ||
      expected === STR_NULL ||
      expected === BLOB_NULL ||
      expected === LONG_NULL
    );
  }
  if (actual === CellType.CELL_VARINT) {
    // Integers can be read either as numbers or as bigints.
    return acceptsAny || wantsNum || wantsLong;
  }
  if (actual === CellType.CELL_FLOAT64) {
    return acceptsAny || wantsNum;
  }
  if (actual === CellType.CELL_STRING) {
    return acceptsAny || expected === STR || expected === STR_NULL;
  }
  if (actual === CellType.CELL_BLOB) {
    return acceptsAny || expected === BLOB || expected === BLOB_NULL;
  }
  throw new Error(`Unknown CellType ${actual}`);
}
314
// This has to match CellType in trace_processor.proto.
// The numeric values are what is read, one byte per cell, from the packed
// cell-types array of each batch.
enum CellType {
  CELL_NULL = 1,
  CELL_VARINT = 2,
  CELL_FLOAT64 = 3,
  CELL_STRING = 4,
  CELL_BLOB = 5,
}

// Human readable names for cell types. The position of each name equals the
// numeric value of the corresponding CellType (index 0, which has no CellType
// counterpart above, is 'UNKNOWN').
const CELL_TYPE_NAMES = [
  'UNKNOWN',
  'NULL',
  'VARINT',
  'FLOAT64',
  'STRING',
  'BLOB',
];

// Protobuf wire type for length-delimited fields; compared against the low 3
// bits of a field tag when parsing a batch.
const TAG_LEN_DELIM = 2;
334
// This is the interface exposed to readers (e.g. tracks). The underlying object
// (QueryResultImpl) owns the result data. This allows to obtain iterators on
// that. In future it will allow to wait for incremental updates (new rows being
// fetched) for streaming queries.
export interface QueryResult {
  // Obtains an iterator. |spec| maps each column name of interest to the
  // expected type of that column (e.g. {name: STR, id: NUM}).
  // TODO(primiano): this should have an option to destruct data as we read. In
  // the case of a long query (e.g. `SELECT * FROM sched` in the query prompt)
  // we don't want to accumulate everything in memory. OTOH UI tracks want to
  // keep the data around so they can redraw them on each animation frame. For
  // now we keep everything in memory in the QueryResultImpl object.
  iter<T extends Row>(spec: T): RowIterator<T>;

  // Like iter() for queries that expect only one row. It embeds the valid()
  // check (i.e. throws if no rows are available) and returns directly the
  // first result.
  firstRow<T extends Row>(spec: T): T;

  // Like firstRow() but returns undefined if no rows are available.
  maybeFirstRow<T extends Row>(spec: T): T | undefined;

  // If != undefined the query errored out and error() contains the message.
  error(): string | undefined;

  // Returns the number of rows accumulated so far. Note that this number can
  // change over time as more batches are received. It becomes stable only
  // when isComplete() returns true or after waitAllRows() is resolved.
  numRows(): number;

  // If true all rows have been fetched. Calling iter() will iterate through the
  // last row. If false, iter() will return an iterator which might iterate
  // through some rows (or none) but will surely not reach the end.
  isComplete(): boolean;

  // Returns a promise that is resolved only when all rows (i.e. all batches)
  // have been fetched. The promise return value is always the object itself.
  waitAllRows(): Promise<QueryResult>;

  // Returns a promise that is resolved when either:
  // - more rows are available
  // - all rows are available
  // The promise return value is always the object itself.
  waitMoreRows(): Promise<QueryResult>;

  // Can return an empty array if called before the first batch is resolved.
  // This should be called only after having awaited for at least one batch.
  columns(): string[];

  // Returns the number of SQL statements in the query
  // (e.g. 2 for 'SELECT 1; SELECT 2;').
  statementCount(): number;

  // Returns the number of SQL statements that produced output rows. This
  // number is <= statementCount().
  statementWithOutputCount(): number;

  // Returns the last SQL statement.
  lastStatementSql(): string;
}
395
// Interface exposed to engine.ts to pump in the data as new row batches arrive.
export interface WritableQueryResult extends QueryResult {
  // |resBytes| is a proto-encoded trace_processor.QueryResult message.
  // The overall flow looks as follows:
  // - The user calls engine.query('select ...') and gets a QueryResult back.
  // - The query call posts a message to the worker that runs the SQL engine
  //   (or sends an HTTP request in case of the RPC+HTTP interface).
  // - The returned QueryResult object is initially empty.
  // - Over time, the SQL engine will postMessage() back results in batches.
  // - Each batch will end up calling this appendResultBatch() method.
  // - If there is any pending promise (e.g. the caller called
  //   queryResult.waitAllRows()), this call will awake them (if this is the
  //   last batch).
  appendResultBatch(resBytes: Uint8Array): void;
}
411
// The actual implementation, which bridges together the reader side and the
// writer side (the one exposed to the Engine). This is the same object so that
// when the engine pumps new row batches we can resolve pending promises that
// readers (e.g. track code) are waiting for.
class QueryResultImpl implements QueryResult, WritableQueryResult {
  // Column names, in the order they were received. Duplicate names are
  // disambiguated with a numeric suffix (see appendResultBatch()).
  columnNames: string[] = [];
  // Set only if a non-empty error string has been received.
  private _error?: string;
  // Number of rows accumulated over all the batches appended so far.
  private _numRows = 0;
  // Mirrors the is-last-batch flag of the most recently appended batch.
  private _isComplete = false;
  // Debugging context attached to the QueryError(s) created by this object.
  private _errorInfo: QueryErrorInfo;
  private _statementCount = 0;
  private _statementWithOutputCount = 0;
  private _lastStatementSql = '';

  constructor(errorInfo: QueryErrorInfo) {
    this._errorInfo = errorInfo;
  }

  // --- QueryResult implementation.

  // TODO(primiano): for the moment new batches are appended but old batches
  // are never removed. This won't work with abnormally large result sets, as
  // it will stash all rows in memory. We could switch to a model where the
  // iterator is destructive and deletes batch objects once iterating past the
  // end of each batch. If we do that, then we need to assign monotonic IDs to
  // batches. Also if we do that, we should prevent creating more than one
  // iterator for a QueryResult.
  batches: ResultBatch[] = [];

  // Promise awaiting on waitAllRows(). This should be resolved only when the
  // last result batch has been retrieved.
  private allRowsPromise?: Deferred<QueryResult>;

  // Promise awaiting on waitMoreRows(). This is resolved when the next
  // batch is appended via appendResultBatch.
  private moreRowsPromise?: Deferred<QueryResult>;

  isComplete(): boolean {
    return this._isComplete;
  }
  numRows(): number {
    return this._numRows;
  }
  error(): string | undefined {
    return this._error;
  }
  columns(): string[] {
    return this.columnNames;
  }
  statementCount(): number {
    return this._statementCount;
  }
  statementWithOutputCount(): number {
    return this._statementWithOutputCount;
  }
  lastStatementSql(): string {
    return this._lastStatementSql;
  }

  // Creates a new iterator over the rows received so far. The iterator object
  // is handed out typed as RowIterator<T>; the double cast is needed because
  // the compiler cannot see the per-column fields on the implementation class.
  iter<T extends Row>(spec: T): RowIterator<T> {
    const impl = new RowIteratorImplWithRowData(spec, this);
    return impl as {} as RowIterator<T>;
  }

  // Like iter(), but asserts that the iterator is positioned on a row and
  // returns the iterator object itself typed as the row.
  firstRow<T extends Row>(spec: T): T {
    const impl = new RowIteratorImplWithRowData(spec, this);
    assertTrue(impl.valid());
    return impl as {} as RowIterator<T> as T;
  }

  // Like firstRow(), but returns undefined rather than asserting when the
  // iterator is not positioned on a row.
  maybeFirstRow<T extends Row>(spec: T): T | undefined {
    const impl = new RowIteratorImplWithRowData(spec, this);
    if (!impl.valid()) {
      return undefined;
    }
    return impl as {} as RowIterator<T> as T;
  }

  // Can be called only once (asserts otherwise). If the result is already
  // complete the returned promise is settled right away.
  waitAllRows(): Promise<QueryResult> {
    assertTrue(this.allRowsPromise === undefined);
    this.allRowsPromise = defer<QueryResult>();
    if (this._isComplete) {
      this.resolveOrReject(this.allRowsPromise, this);
    }
    return this.allRowsPromise;
  }

  // Unlike waitAllRows() this can be called many times: callers that arrive
  // before the next batch share the same pending promise. Once the result is
  // complete, each call gets a fresh, already settled, promise.
  waitMoreRows(): Promise<QueryResult> {
    if (this.moreRowsPromise !== undefined) {
      return this.moreRowsPromise;
    }

    const moreRowsPromise = defer<QueryResult>();
    if (this._isComplete) {
      this.resolveOrReject(moreRowsPromise, this);
    } else {
      this.moreRowsPromise = moreRowsPromise;
    }
    return moreRowsPromise;
  }

  // --- WritableQueryResult implementation.

  // Called by the engine when a new QueryResult is available. Note that a
  // single Query() call can yield >1 QueryResult due to result batching
  // if more than ~64K of data are returned, e.g. when returning O(M) rows.
  // |resBytes| is a proto-encoded trace_processor.QueryResult message.
  // It is fine to retain the resBytes without slicing a copy, because
  // ProtoRingBuffer does the slice() for us (or passes through the buffer
  // coming from postMessage() (Wasm case) or fetch() (HTTP+RPC case)).
  appendResultBatch(resBytes: Uint8Array) {
    const reader = protobuf.Reader.create(resBytes);
    assertTrue(reader.pos === 0);
    const columnNamesEmptyAtStartOfBatch = this.columnNames.length === 0;
    // Names seen so far in this message, used to de-duplicate column names.
    const columnNamesSet = new Set<string>();
    while (reader.pos < reader.len) {
      const tag = reader.uint32();
      // The field number is in the upper bits of the tag, the wire type in the
      // lowest 3 bits.
      switch (tag >>> 3) {
        case 1: // column_names
          // Only the first batch should contain the column names. If this fires
          // something is going wrong in the handling of the batch stream.
          assertTrue(columnNamesEmptyAtStartOfBatch);
          const origColName = reader.string();
          let colName = origColName;
          // In some rare cases two columns can have the same name (b/194891824)
          // e.g. `select 1 as x, 2 as x`. These queries don't happen in the
          // UI code, but they can happen when the user types a query (e.g.
          // with a join). The most practical thing we can do here is renaming
          // the columns with a suffix. Keeping the same name will break when
          // iterating, because column names become iterator object keys.
          for (let i = 1; columnNamesSet.has(colName); ++i) {
            colName = `${origColName}_${i}`;
            assertTrue(i < 100); // Give up at some point.
          }
          columnNamesSet.add(colName);
          this.columnNames.push(colName);
          break;
        case 2: // error
          // The query has errored only if the |error| field is non-empty.
          // In protos, we don't distinguish between non-present and empty.
          // Make sure we don't propagate ambiguous empty strings to JS.
          const err = reader.string();
          this._error = err !== undefined && err.length ? err : undefined;
          break;
        case 3: // batch
          // Take a view (not a copy) over the bytes of the nested message and
          // move the reader past it.
          const batchLen = reader.uint32();
          const batchRaw = resBytes.subarray(reader.pos, reader.pos + batchLen);
          reader.pos += batchLen;

          // The ResultBatch ctor parses the CellsBatch submessage.
          const parsedBatch = new ResultBatch(batchRaw);
          this.batches.push(parsedBatch);
          this._isComplete = parsedBatch.isLastBatch;

          // In theory one could construct a valid proto serializing the column
          // names after the cell batches. In practice the QueryResultSerializer
          // doesn't do that so it's not worth complicating the code.
          const numColumns = this.columnNames.length;
          if (numColumns !== 0) {
            // A batch must contain a whole number of rows.
            assertTrue(parsedBatch.numCells % numColumns === 0);
            this._numRows += parsedBatch.numCells / numColumns;
          } else {
            // numColumns == 0 is plausible for queries like CREATE TABLE ... .
            assertTrue(parsedBatch.numCells === 0);
          }
          break;

        case 4: // Exposed via statementCount().
          this._statementCount = reader.uint32();
          break;

        case 5: // Exposed via statementWithOutputCount().
          this._statementWithOutputCount = reader.uint32();
          break;

        case 6: // Exposed via lastStatementSql().
          this._lastStatementSql = reader.string();
          break;

        default:
          console.warn(`Unexpected QueryResult field ${tag >>> 3}`);
          reader.skipType(tag & 7);
          break;
      } // switch (tag)
    } // while (pos < end)

    // Wake up whoever is waiting for more rows. The promise is single-shot: the
    // next waitMoreRows() call will create a new one.
    if (this.moreRowsPromise !== undefined) {
      this.resolveOrReject(this.moreRowsPromise, this);
      this.moreRowsPromise = undefined;
    }

    if (this._isComplete && this.allRowsPromise !== undefined) {
      this.resolveOrReject(this.allRowsPromise, this);
    }
  }

  // Returns the waitAllRows() promise, creating it if nobody has called
  // waitAllRows() yet. Unlike waitAllRows() this can be called multiple times.
  ensureAllRowsPromise(): Promise<QueryResult> {
    if (this.allRowsPromise === undefined) {
      this.waitAllRows(); // Will populate |this.allRowsPromise|.
    }
    return assertExists(this.allRowsPromise);
  }

  get errorInfo(): QueryErrorInfo {
    return this._errorInfo;
  }

  // Settles |promise|: resolves it with |arg| if no error has been received,
  // otherwise rejects it with a QueryError carrying the error message and the
  // query debugging info.
  private resolveOrReject(promise: Deferred<QueryResult>, arg: QueryResult) {
    if (this._error === undefined) {
      promise.resolve(arg);
    } else {
      promise.reject(new QueryError(this._error, this._errorInfo));
    }
  }
}
628
// This class holds onto a received result batch (a Uint8Array) and does some
// partial parsing to tokenize the various cell groups. This parsing mainly
// consists of identifying and caching the offsets of each cell group and
// initializing the varint decoders. This half parsing is done to keep the
// iterator's next() fast, without decoding everything into memory.
// This is an internal implementation detail and is not exposed outside. The
// RowIteratorImpl uses this class to iterate through batches (this class takes
// care of iterating within a batch, RowIteratorImpl takes care of switching
// batches when needed).
// Note: at any point in time there can be more than one ResultIterator
// referencing the same batch. The batch must be immutable.
class ResultBatch {
  // True if the message carried is_last_batch = true.
  readonly isLastBatch: boolean = false;
  readonly batchBytes: Uint8Array;
  // The offsets below are indexes into |batchBytes| (NOT into its underlying
  // ArrayBuffer); the lengths are in bytes.
  readonly cellTypesOff: number = 0;
  readonly cellTypesLen: number = 0;
  readonly varintOff: number = 0;
  readonly varintLen: number = 0;
  // All the float64 cells of the batch, in order.
  readonly float64Cells = new Float64Array();
  // One entry per blob cell, in order.
  readonly blobCells: Uint8Array[] = [];
  // The string cells of the batch, in order, obtained by splitting on \0.
  readonly stringCells: string[] = [];

  // batchBytes is a trace_processor.QueryResult.CellsBatch proto.
  // Asserts (via assertTrue) if a field has an unexpected wire type or if a
  // length-delimited region claims to extend past the end of |batchBytes|.
  constructor(batchBytes: Uint8Array) {
    this.batchBytes = batchBytes;
    const reader = protobuf.Reader.create(batchBytes);
    assertTrue(reader.pos === 0);
    const end = reader.len;

    // Here we deconstruct the proto by hand. The CellsBatch is carefully
    // designed to allow a very fast parsing from the TS side. We pack all cells
    // of the same types together, so we can do only one call (per batch) to
    // TextDecoder.decode(), we can overlay a memory-aligned typedarray for
    // float values and can quickly tell and type-check the cell types.
    // One row = N cells (we know the number upfront from the outer message).
    // Each batch contains always an integer multiple of N cells (i.e. rows are
    // never fragmented across different batches).
    while (reader.pos < end) {
      const tag = reader.uint32();
      switch (tag >>> 3) {
        case 1: {
          // cell types, a packed array containing one CellType per cell.
          assertTrue((tag & 7) === TAG_LEN_DELIM); // Must be packed varint.
          this.cellTypesLen = reader.uint32();
          this.cellTypesOff = reader.pos;
          assertTrue(this.cellTypesOff + this.cellTypesLen <= end);
          reader.pos += this.cellTypesLen;
          break;
        }

        case 2: {
          // varint_cells, a packed varint buffer.
          assertTrue((tag & 7) === TAG_LEN_DELIM); // Must be packed varint.
          const packLen = reader.uint32();
          this.varintOff = reader.pos;
          this.varintLen = packLen;
          assertTrue(reader.buf === batchBytes);
          // |varintOff| is relative to |batchBytes|, hence it must be checked
          // against the length of the view and not against the extent of the
          // underlying ArrayBuffer (byteOffset is > 0 when the batch is a
          // subarray of the enclosing message).
          assertTrue(this.varintOff + this.varintLen <= end);
          reader.pos += packLen;
          break;
        }

        case 3: {
          // float64_cells, a 64-bit aligned packed fixed64 buffer.
          assertTrue((tag & 7) === TAG_LEN_DELIM); // Must be packed fixed64.
          const f64Len = reader.uint32();
          assertTrue(f64Len % 8 === 0);
          assertTrue(reader.pos + f64Len <= end);
          // Float64Array's constructor is evil: the offset is in bytes but the
          // length is in 8-byte words.
          const f64Words = f64Len / 8;
          // Unlike the other offsets, this one is relative to the underlying
          // ArrayBuffer because that is what Float64Array / slice() expect.
          const f64Off = batchBytes.byteOffset + reader.pos;
          if (f64Off % 8 === 0) {
            this.float64Cells = new Float64Array(
              batchBytes.buffer,
              f64Off,
              f64Words,
            );
          } else {
            // When using the production code in trace_processor's rpc.cc, the
            // float64 should be 8-bytes aligned. The slow-path case is only for
            // tests.
            const slice = batchBytes.buffer.slice(f64Off, f64Off + f64Len);
            this.float64Cells = new Float64Array(slice);
          }
          reader.pos += f64Len;
          break;
        }

        case 4: {
          // blob_cells: one entry per blob.
          assertTrue((tag & 7) === TAG_LEN_DELIM);
          // Each blob is copied into its own Uint8Array. Fine here as blobs
          // are rare and not a fastpath.
          this.blobCells.push(new Uint8Array(reader.bytes()));
          break;
        }

        case 5: {
          // string_cells: all the string cells concatenated with \0s.
          assertTrue((tag & 7) === TAG_LEN_DELIM);
          const strLen = reader.uint32();
          assertTrue(reader.pos + strLen <= end);
          const subArr = batchBytes.subarray(reader.pos, reader.pos + strLen);
          assertTrue(subArr.length === strLen);
          // The reason why we do this split rather than creating one string
          // per entry is that utf8 decoding has some non-negligible cost. See
          // go/postmessage-benchmark .
          this.stringCells = utf8Decode(subArr).split('\0');
          reader.pos += strLen;
          break;
        }

        case 6: // is_last_batch (boolean).
          this.isLastBatch = !!reader.bool();
          break;

        case 7: // padding for realignment, skip silently.
          reader.skipType(tag & 7);
          break;

        default:
          console.warn(`Unexpected QueryResult.CellsBatch field ${tag >>> 3}`);
          reader.skipType(tag & 7);
          break;
      } // switch(tag)
    } // while (pos < end)
  }

  // Number of cells in the batch. Each cell type takes one byte in the packed
  // cell-types array, so this equals the length of that array.
  get numCells() {
    return this.cellTypesLen;
  }
}
753
754class RowIteratorImpl implements RowIteratorBase {
755  // The spec passed to the iter call containing the expected types, e.g.:
756  // {'colA': NUM, 'colB': NUM_NULL, 'colC': STRING}.
757  // This doesn't ever change.
758  readonly rowSpec: Row;
759
760  // The object that holds the current row. This points to the parent
761  // RowIteratorImplWithRowData instance that created this class.
762  rowData: Row;
763
764  // The QueryResult object we are reading data from. The engine will pump
765  // batches over time into this object.
766  private resultObj: QueryResultImpl;
767
768  // All the member variables in the group below point to the identically-named
769  // members in result.batch[batchIdx]. This is to avoid indirection layers in
770  // the next() hotpath, so we can do this.float64Cells vs
771  // this.resultObj.batch[this.batchIdx].float64Cells.
772  // These are re-set every time tryMoveToNextBatch() is called (and succeeds).
773  private batchIdx = -1; // The batch index within |result.batches[]|.
774  private batchBytes = new Uint8Array();
775  private columnNames: string[] = [];
776  private numColumns = 0;
777  private cellTypesEnd = -1; // -1 so the 1st next() hits tryMoveToNextBatch().
778  private float64Cells = new Float64Array();
779  private varIntReader = protobuf.Reader.create(this.batchBytes);
780  private blobCells: Uint8Array[] = [];
781  private stringCells: string[] = [];
782
783  // These members instead are incremented as we read cells from next(). They
784  // are the mutable state of the iterator.
785  private nextCellTypeOff = 0;
786  private nextFloat64Cell = 0;
787  private nextStringCell = 0;
788  private nextBlobCell = 0;
789  private isValid = false;
790
791  constructor(querySpec: Row, rowData: Row, res: QueryResultImpl) {
792    Object.assign(this, querySpec);
793    this.rowData = rowData;
794    this.rowSpec = {...querySpec}; // ... -> Copy all the key/value pairs.
795    this.resultObj = res;
796    this.next();
797  }
798
799  valid(): boolean {
800    return this.isValid;
801  }
802
803  private makeError(message: string): QueryError {
804    return new QueryError(message, this.resultObj.errorInfo);
805  }
806
807  get(columnName: string): ColumnType {
808    const res = this.rowData[columnName];
809    if (res === undefined) {
810      throw this.makeError(
811        `Column '${columnName}' doesn't exist. ` +
812          `Actual columns: [${this.columnNames.join(',')}]`,
813      );
814    }
815    return res;
816  }
817
818  // Moves the cursor next by one row and updates |isValid|.
819  // When this fails to move, two cases are possible:
820  // 1. We reached the end of the result set (this is the case if
821  //    QueryResult.isComplete() == true when this fails).
822  // 2. We reached the end of the current batch, but more rows might come later
823  //    (if QueryResult.isComplete() == false).
824  next() {
825    // At some point we might reach the end of the current batch, but the next
826    // batch might be available already. In this case we want next() to
827    // transparently move on to the next batch.
828    while (this.nextCellTypeOff + this.numColumns > this.cellTypesEnd) {
829      // If TraceProcessor is behaving well, we should never end up in a
830      // situation where we have leftover cells. TP is expected to serialize
831      // whole rows in each QueryResult batch and NOT truncate them midway.
832      // If this assert fires the TP RPC logic has a bug.
833      assertTrue(
834        this.nextCellTypeOff === this.cellTypesEnd || this.cellTypesEnd === -1,
835      );
836      if (!this.tryMoveToNextBatch()) {
837        this.isValid = false;
838        return;
839      }
840    }
841
842    const rowData = this.rowData;
843    const numColumns = this.numColumns;
844
845    // Read the current row.
846    for (let i = 0; i < numColumns; i++) {
847      const cellType = this.batchBytes[this.nextCellTypeOff++];
848      const colName = this.columnNames[i];
849      const expType = this.rowSpec[colName];
850
851      switch (cellType) {
852        case CellType.CELL_NULL:
853          rowData[colName] = null;
854          break;
855
856        case CellType.CELL_VARINT:
857          if (expType === NUM || expType === NUM_NULL) {
858            // This is very subtle. The return type of int64 can be either a
859            // number or a Long.js {high:number, low:number} if Long.js is
860            // installed. The default state seems different in node and browser.
861            // We force-disable Long.js support in the top of this source file.
862            const val = this.varIntReader.int64();
863            rowData[colName] = val as {} as number;
864          } else {
865            // LONG, LONG_NULL, or unspecified - return as bigint
866            const value = decodeInt64Varint(
867              this.batchBytes,
868              this.varIntReader.pos,
869            );
870            rowData[colName] = value;
871            this.varIntReader.skip(); // Skips a varint
872          }
873          break;
874
875        case CellType.CELL_FLOAT64:
876          rowData[colName] = this.float64Cells[this.nextFloat64Cell++];
877          break;
878
879        case CellType.CELL_STRING:
880          rowData[colName] = this.stringCells[this.nextStringCell++];
881          break;
882
883        case CellType.CELL_BLOB:
884          const blob = this.blobCells[this.nextBlobCell++];
885          rowData[colName] = blob;
886          break;
887
888        default:
889          throw this.makeError(`Invalid cell type ${cellType}`);
890      }
891    } // For (cells)
892    this.isValid = true;
893  }
894
895  private tryMoveToNextBatch(): boolean {
896    const nextBatchIdx = this.batchIdx + 1;
897    if (nextBatchIdx >= this.resultObj.batches.length) {
898      return false;
899    }
900
901    this.columnNames = this.resultObj.columnNames;
902    this.numColumns = this.columnNames.length;
903
904    this.batchIdx = nextBatchIdx;
905    const batch = assertExists(this.resultObj.batches[nextBatchIdx]);
906    this.batchBytes = batch.batchBytes;
907    this.nextCellTypeOff = batch.cellTypesOff;
908    this.cellTypesEnd = batch.cellTypesOff + batch.cellTypesLen;
909    this.float64Cells = batch.float64Cells;
910    this.blobCells = batch.blobCells;
911    this.stringCells = batch.stringCells;
912    this.varIntReader = protobuf.Reader.create(batch.batchBytes);
913    this.varIntReader.pos = batch.varintOff;
914    this.varIntReader.len = batch.varintOff + batch.varintLen;
915    this.nextFloat64Cell = 0;
916    this.nextStringCell = 0;
917    this.nextBlobCell = 0;
918
919    // Check that all the expected columns are present.
920    for (const expectedCol of Object.keys(this.rowSpec)) {
921      if (this.columnNames.indexOf(expectedCol) < 0) {
922        throw this.makeError(
923          `Column ${expectedCol} not found in the SQL result ` +
924            `set {${this.columnNames.join(' ')}}`,
925        );
926      }
927    }
928
929    // Check that the cells types are consistent.
930    const numColumns = this.numColumns;
931    if (batch.numCells === 0) {
932      // This can happen if the query result contains just an error. In this
933      // an empty batch with isLastBatch=true is appended as an EOF marker.
934      // In theory TraceProcessor could return an empty batch in the middle and
935      // that would be fine from a protocol viewpoint. In practice, no code path
936      // does that today so it doesn't make sense trying supporting it with a
937      // recursive call to tryMoveToNextBatch().
938      assertTrue(batch.isLastBatch);
939      return false;
940    }
941
942    assertTrue(numColumns > 0);
943    for (let i = this.nextCellTypeOff; i < this.cellTypesEnd; i++) {
944      const col = (i - this.nextCellTypeOff) % numColumns;
945      const colName = this.columnNames[col];
946      const actualType = this.batchBytes[i] as CellType;
947      const expType = this.rowSpec[colName];
948
949      // If undefined, the caller doesn't want to read this column at all, so
950      // it can be whatever.
951      if (expType === undefined) continue;
952
953      let err = '';
954      if (!isCompatible(actualType, expType)) {
955        if (actualType === CellType.CELL_NULL) {
956          err =
957            'SQL value is NULL but that was not expected' +
958            ` (expected type: ${columnTypeToString(expType)}). ` +
959            'Did you mean NUM_NULL, LONG_NULL, STR_NULL or BLOB_NULL?';
960        } else {
961          err = `Incompatible cell type. Expected: ${columnTypeToString(
962            expType,
963          )} actual: ${CELL_TYPE_NAMES[actualType]}`;
964        }
965      }
966      if (err.length > 0) {
967        const row = Math.floor(i / numColumns);
968        const message = `Error @ row: ${row} col: '${colName}': ${err}`;
969        throw this.makeError(message);
970      }
971    }
972    return true;
973  }
974}
975
// The object ultimately handed to the client by QueryResult.iter(...).
// It doubles as the row: the spec's keys are copied onto it and the same
// object is passed to RowIteratorImpl as the |rowData| that next() fills in.
// It is kept separate from RowIteratorImpl only to avoid naming collisions
// between the member variables RowIteratorImpl needs and the column names
// returned by the iterator.
class RowIteratorImplWithRowData implements RowIteratorBase {
  // The object that does the actual iteration work.
  private _impl: RowIteratorImpl;

  // The iterator API, forwarded to |_impl|.
  next: () => void;
  valid: () => boolean;
  get: (columnName: string) => ColumnType;

  constructor(querySpec: Row, res: QueryResultImpl) {
    // View this very instance as the row object the cells get written into.
    const rowView = this as {} as Row;
    Object.assign(rowView, querySpec);

    const impl = new RowIteratorImpl(querySpec, rowView, res);
    this._impl = impl;
    this.next = impl.next.bind(impl);
    this.valid = impl.valid.bind(impl);
    this.get = impl.get.bind(impl);
  }
}
997
// A proxy around QueryResultImpl that makes the result await-able, so that:
// 1. Clients that only care about the full result set can simply write
//    await engine.query('...') and get a QueryResult that is guaranteed to be
//    complete.
// 2. Clients that know how to deal with streaming can use it straight away,
//    without awaiting.
class WaitableQueryResultImpl
  implements QueryResult, WritableQueryResult, PromiseLike<QueryResult>
{
  // The object every call is forwarded to.
  private readonly impl: QueryResultImpl;
  // Set by the first then() call; a second call trips an assertion.
  private thenCalled = false;

  constructor(errorInfo: QueryErrorInfo) {
    this.impl = new QueryResultImpl(errorInfo);
  }

  // --- QueryResult implementation: every call is forwarded to |impl|. ---

  iter<T extends Row>(spec: T) {
    return this.impl.iter(spec);
  }

  firstRow<T extends Row>(spec: T) {
    return this.impl.firstRow(spec);
  }

  maybeFirstRow<T extends Row>(spec: T) {
    return this.impl.maybeFirstRow(spec);
  }

  waitAllRows() {
    return this.impl.waitAllRows();
  }

  waitMoreRows() {
    return this.impl.waitMoreRows();
  }

  isComplete() {
    return this.impl.isComplete();
  }

  numRows() {
    return this.impl.numRows();
  }

  columns() {
    return this.impl.columns();
  }

  error() {
    return this.impl.error();
  }

  statementCount() {
    return this.impl.statementCount();
  }

  statementWithOutputCount() {
    return this.impl.statementWithOutputCount();
  }

  lastStatementSql() {
    return this.impl.lastStatementSql();
  }

  // --- WritableQueryResult implementation. ---

  appendResultBatch(resBytes: Uint8Array) {
    return this.impl.appendResultBatch(resBytes);
  }

  // --- PromiseLike<QueryResult> implementation. ---
  // All three methods chain onto the promise returned by
  // impl.ensureAllRowsPromise().

  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  then(onfulfilled: any, onrejected: any): any {
    // then() is expected to be invoked at most once per instance.
    assertFalse(this.thenCalled);
    this.thenCalled = true;
    const allRows = this.impl.ensureAllRowsPromise();
    return allRows.then(onfulfilled, onrejected);
  }

  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  catch(error: any): any {
    const allRows = this.impl.ensureAllRowsPromise();
    return allRows.catch(error);
  }

  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  finally(callback: () => void): any {
    const allRows = this.impl.ensureAllRowsPromise();
    return allRows.finally(callback);
  }

  // eslint and clang-format disagree on how to format get[foo](). Let
  // clang-format win:
  get [Symbol.toStringTag](): string {
    return 'Promise<WaitableQueryResult>';
  }
}
1082
// Creates an empty query result that can be appended to (WritableQueryResult),
// read from (QueryResult) and awaited. |errorInfo| is passed through to the
// underlying result object.
export function createQueryResult(
  errorInfo: QueryErrorInfo,
): QueryResult & Promise<QueryResult> & WritableQueryResult {
  const waitable = new WaitableQueryResultImpl(errorInfo);
  return waitable;
}
1088
// Converts a SQL cell into a time. Assumes value is in native time units.
// - bigint: used as is.
// - number: rounded down to an integer first.
// - null:   becomes Time.ZERO.
// Throws for any other kind of value, as it cannot be reasonably converted to
// a bigint.
export function timeFromSql(value: ColumnType): time {
  if (value === null) {
    return Time.ZERO;
  }
  if (typeof value === 'bigint') {
    return Time.fromRaw(value);
  }
  if (typeof value === 'number') {
    const whole = Math.floor(value);
    return Time.fromRaw(BigInt(whole));
  }
  throw new Error(`Refusing to create time from unrelated type ${value}`);
}
1102
// Converts a SQL cell into a duration. Assumes value is in nanoseconds.
// - bigint: returned as is.
// - number: rounded down to an integer first.
// - null:   becomes Duration.ZERO.
// Throws for any other kind of value, as it cannot be reasonably converted to
// a bigint.
export function durationFromSql(value: ColumnType): duration {
  if (value === null) {
    return Duration.ZERO;
  }
  if (typeof value === 'bigint') {
    return value;
  }
  if (typeof value === 'number') {
    const whole = Math.floor(value);
    return BigInt(whole);
  }
  throw new Error(`Refusing to create duration from unrelated type ${value}`);
}
1116