• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1// Copyright (C) 2021 The Android Open Source Project
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//      http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
// This file deals with deserialization and iteration of the proto-encoded
// byte buffer that is returned by TraceProcessor when invoking the
// TPM_QUERY_STREAMING method. The returned |query_result| buffer is optimized
// for being moved cheaply across workers and decoded on-the-fly as we step
// through the iterator.
// See comments around QueryResult in trace_processor.proto for more details.
21
22// The classes in this file are organized as follows:
23//
// QueryResultImpl:
// The object returned by the Engine.query(sql) method.
// This object is a holder of row data. Batches of rows get appended
// incrementally as they are received from the remote TraceProcessor instance.
// QueryResultImpl also deals with asynchronicity of queries and allows callers
// to obtain a promise that waits for more (or all) rows.
// At any point in time the following objects hold a reference to QueryResult:
// - The Engine: for appending row batches.
// - UI code, typically controllers, who make queries.
33//
34// ResultBatch:
35// Hold the data, returned by the remote TraceProcessor instance, for a number
36// of rows (TP typically chunks the results in batches of 128KB).
37// A QueryResultImpl holds exclusively ResultBatches for a given query.
38// ResultBatch is not exposed externally, it's just an internal representation
39// that helps with proto decoding. ResultBatch is immutable after it gets
40// appended and decoded. The iteration state is held by the RowIteratorImpl.
41//
42// RowIteratorImpl:
43// Decouples the data owned by QueryResultImpl (and its ResultBatch(es)) from
44// the iteration state. The iterator effectively is the union of a ResultBatch
45// and the row number in it. Rows within the batch are decoded as the user calls
46// next(). When getting at the end of the batch, it takes care of switching to
47// the next batch (if any) within the QueryResultImpl.
48// This object is part of the API exposed to tracks / controllers.
49
50// Ensure protobuf is initialized.
51import '../base/static_initializers';
52
53import protobuf from 'protobufjs/minimal';
54
55import {defer, Deferred} from '../base/deferred';
56import {assertExists, assertFalse, assertTrue} from '../base/logging';
57import {utf8Decode} from '../base/string_utils';
58import {Duration, duration, Time, time} from '../base/time';
59
// The set of JS values that a single SQL cell can be decoded into.
export type SqlValue = string | number | bigint | null | Uint8Array;
// TODO(altimin): Replace ColumnType with SqlValue across the codebase and
// remove export here.
export type ColumnType = SqlValue;

// Sentinel values used to describe the expected type of each column in the
// spec object passed to QueryResult.iter() / firstRow(), e.g.
// `iter({name: STR, id: NUM})`.
// The sentinels are told apart with strict equality (see isCompatible()), so
// every sentinel must be a distinct value: this is why BLOB and BLOB_NULL are
// two separate Uint8Array instances.
// The *_NULL variants additionally accept NULL cells; UNKNOWN accepts any cell.
export const UNKNOWN: ColumnType = null;
// Accepts VARINT and FLOAT64 cells; varints are surfaced as JS numbers.
export const NUM = 0;
// Accepts STRING cells.
export const STR = 'str';
export const NUM_NULL: number | null = 1;
export const STR_NULL: string | null = 'str_null';
// Accepts BLOB cells.
export const BLOB: Uint8Array = new Uint8Array();
export const BLOB_NULL: Uint8Array | null = new Uint8Array();
// Accepts VARINT cells; the value is surfaced as a bigint.
export const LONG: bigint = 0n;
export const LONG_NULL: bigint | null = 1n;
74
const SHIFT_32BITS = 32n;

// Fast decode of a protobuf int64 varint into a bigint.
// Inspired by
// https://github.com/protobufjs/protobuf.js/blob/56b1e64979dae757b67a21d326e16acee39f2267/src/reader.js#L123
//
// |buf| is the buffer holding the varint and |pos| the index of its first
// byte. The value is accumulated into two unsigned 32-bit halves (|lo|, |hi|)
// using plain number arithmetic and only converted to a bigint on return.
// Encodings of up to 5 bytes are returned as non-negative values; encodings of
// 6..10 bytes are reinterpreted as a signed 64-bit integer.
//
// Throws Error('Index out of range') if the buffer ends before the varint does
// and Error('invalid varint encoding') if the 10th byte still has the
// continuation bit set.
export function decodeInt64Varint(buf: Uint8Array, pos: number): bigint {
  let hi = 0;
  let lo = 0;
  let i = 0;

  if (buf.length - pos > 4) {
    // Fast route (lo): at least 5 bytes are available, no bounds checks needed.
    for (; i < 4; ++i) {
      // 1st..4th
      lo = (lo | ((buf[pos] & 127) << (i * 7))) >>> 0;
      if (buf[pos++] < 128) {
        return BigInt(lo);
      }
    }
    // 5th: its low 4 bits complete |lo|, the remaining 3 payload bits start
    // |hi|.
    lo = (lo | ((buf[pos] & 127) << 28)) >>> 0;
    hi = (hi | ((buf[pos] & 127) >> 4)) >>> 0;
    if (buf[pos++] < 128) {
      return (BigInt(hi) << SHIFT_32BITS) | BigInt(lo);
    }
    i = 0;
  } else {
    // Slow route: at most 4 bytes are left, so the varint must terminate within
    // them and every read has to be bounds-checked.
    for (; i < 3; ++i) {
      if (pos >= buf.length) {
        throw Error('Index out of range');
      }
      // 1st..3rd
      lo = (lo | ((buf[pos] & 127) << (i * 7))) >>> 0;
      if (buf[pos++] < 128) {
        return BigInt(lo);
      }
    }
    // 4th: this is the last byte that can possibly exist in the buffer. Reading
    // past the end, or finding the continuation bit still set, means the varint
    // is truncated. Without these checks a truncated varint would silently
    // decode to a bogus value.
    if (pos >= buf.length || buf[pos] >= 128) {
      throw Error('Index out of range');
    }
    lo = (lo | ((buf[pos] & 127) << (i * 7))) >>> 0;
    return BigInt(lo);
  }
  if (buf.length - pos > 4) {
    // Fast route (hi): the 5 bytes that can still follow are all available.
    for (; i < 5; ++i) {
      // 6th..10th
      hi = (hi | ((buf[pos] & 127) << (i * 7 + 3))) >>> 0;
      if (buf[pos++] < 128) {
        const big = (BigInt(hi) << SHIFT_32BITS) | BigInt(lo);
        return BigInt.asIntN(64, big);
      }
    }
  } else {
    for (; i < 5; ++i) {
      if (pos >= buf.length) {
        throw Error('Index out of range');
      }
      // 6th..10th
      hi = (hi | ((buf[pos] & 127) << (i * 7 + 3))) >>> 0;
      if (buf[pos++] < 128) {
        const big = (BigInt(hi) << SHIFT_32BITS) | BigInt(lo);
        return BigInt.asIntN(64, big);
      }
    }
  }
  throw Error('invalid varint encoding');
}
141
// Context attached to a failed query to make it easier to debug. For example
// the query in question, the stack where the query was issued, the active
// plugin etc. Currently only the SQL text is carried.
export interface QueryErrorInfo {
  query: string;
}

// Error raised when a query fails. On top of the plain error message it
// remembers the SQL that caused the failure and includes it in toString().
export class QueryError extends Error {
  readonly query: string;

  constructor(message: string, info: QueryErrorInfo) {
    super(message);
    const {query} = info;
    this.query = query;
  }

  toString() {
    const summary = super.toString();
    return `${summary}\nQuery:\n${this.query}`;
  }
}
161
// One row extracted from an SQL result: maps each column name to the value of
// the corresponding cell. The same shape is also used for iterator specs, where
// each column name maps to the sentinel describing its expected type.
export interface Row {
  [key: string]: ColumnType;
}
166
// The methods that any iterator has to implement.
export interface RowIteratorBase {
  // True while the iterator is positioned on a row that can be read.
  valid(): boolean;
  // Advances the iterator by one row.
  next(): void;

  // Reflection support for cases where the column names are not known upfront
  // (e.g. the query result table for user-provided SQL queries).
  // It throws if the passed column name doesn't exist.
  // Example usage:
  // for (const it = queryResult.iter({}); it.valid(); it.next()) {
  //   for (const columnName of queryResult.columns()) {
  //     console.log(it.get(columnName));
  //   }
  // }
  get(columnName: string): ColumnType;
}
181
// A RowIterator is a type that has all the fields defined in the query spec
// plus the valid() and next() operators. This is to ultimately allow the
// clients to do:
// const queryResult = await engine.query("select name, surname, id from people;");
// const iter = queryResult.iter({name: STR, surname: STR, id: NUM});
// for (; iter.valid(); iter.next())
//  console.log(iter.name, iter.surname);
export type RowIterator<T extends Row> = RowIteratorBase & T;
190
// Returns the symbolic name of a column type sentinel (e.g. 'NUM_NULL'), for
// use in human-readable messages. Values that are not one of the known
// sentinels are rendered as `INVALID(<value>)`.
function columnTypeToString(t: ColumnType): string {
  // Sentinels are matched by strict equality, in the same order as they are
  // declared.
  if (t === NUM) {
    return 'NUM';
  }
  if (t === NUM_NULL) {
    return 'NUM_NULL';
  }
  if (t === STR) {
    return 'STR';
  }
  if (t === STR_NULL) {
    return 'STR_NULL';
  }
  if (t === BLOB) {
    return 'BLOB';
  }
  if (t === BLOB_NULL) {
    return 'BLOB_NULL';
  }
  if (t === LONG) {
    return 'LONG';
  }
  if (t === LONG_NULL) {
    return 'LONG_NULL';
  }
  if (t === UNKNOWN) {
    return 'UNKNOWN';
  }
  return `INVALID(${t})`;
}
215
// Tells whether a cell of wire type |actual| can be read through a column
// declared with the |expected| sentinel in the iterator spec.
// UNKNOWN is compatible with every known cell type; the *_NULL sentinels accept
// NULL cells in addition to their base type.
// Throws if |actual| is not one of the known CellType values.
function isCompatible(actual: CellType, expected: ColumnType): boolean {
  if (actual === CellType.CELL_NULL) {
    return (
      expected === UNKNOWN ||
      expected === NUM_NULL ||
      expected === STR_NULL ||
      expected === BLOB_NULL ||
      expected === LONG_NULL
    );
  }
  if (actual === CellType.CELL_VARINT) {
    // Varints can be surfaced either as numbers or as bigints.
    return (
      expected === UNKNOWN ||
      expected === NUM ||
      expected === NUM_NULL ||
      expected === LONG ||
      expected === LONG_NULL
    );
  }
  if (actual === CellType.CELL_FLOAT64) {
    return expected === UNKNOWN || expected === NUM || expected === NUM_NULL;
  }
  if (actual === CellType.CELL_STRING) {
    return expected === UNKNOWN || expected === STR || expected === STR_NULL;
  }
  if (actual === CellType.CELL_BLOB) {
    return expected === UNKNOWN || expected === BLOB || expected === BLOB_NULL;
  }
  throw new Error(`Unknown CellType ${actual}`);
}
246
// Wire-level type of a single cell. Each byte of a batch's packed cell-types
// array holds one of these values, which the row iterator switches on when
// decoding a row.
// This has to match CellType in trace_processor.proto.
enum CellType {
  CELL_NULL = 1,
  CELL_VARINT = 2,
  CELL_FLOAT64 = 3,
  CELL_STRING = 4,
  CELL_BLOB = 5,
}
255
// Human-readable names for the cell types. The entries line up with the numeric
// CellType values; index 0, which no CellType uses, maps to 'UNKNOWN'.
// NOTE(review): presumably indexed by CellType when formatting messages; the
// use site is not visible in this part of the file - confirm.
const CELL_TYPE_NAMES = [
  'UNKNOWN',
  'NULL',
  'VARINT',
  'FLOAT64',
  'STRING',
  'BLOB',
];

// Protobuf wire type of length-delimited fields. It is compared against the low
// 3 bits of a field tag to check that a field is encoded as a packed /
// length-prefixed payload.
const TAG_LEN_DELIM = 2;
266
// This is the interface exposed to readers (e.g. tracks). The underlying object
// (QueryResultImpl) owns the result data. This allows to obtain iterators on
// that. In future it will allow to wait for incremental updates (new rows being
// fetched) for streaming queries.
export interface QueryResult {
  // Obtains an iterator. |spec| maps each expected column name to the sentinel
  // describing its type (e.g. {name: STR, id: NUM}).
  // TODO(primiano): this should have an option to destruct data as we read. In
  // the case of a long query (e.g. `SELECT * FROM sched` in the query prompt)
  // we don't want to accumulate everything in memory. OTOH UI tracks want to
  // keep the data around so they can redraw them on each animation frame. For
  // now we keep everything in memory in the QueryResultImpl object.
  iter<T extends Row>(spec: T): RowIterator<T>;

  // Like iter() for queries that expect only one row. It embeds the valid()
  // check (i.e. throws if no rows are available) and returns directly the
  // first result.
  firstRow<T extends Row>(spec: T): T;

  // If != undefined the query errored out and error() contains the message.
  error(): string | undefined;

  // Returns the number of rows accumulated so far. Note that this number can
  // change over time as more batches are received. It becomes stable only
  // when isComplete() returns true or after waitAllRows() is resolved.
  numRows(): number;

  // If true all rows have been fetched. Calling iter() will iterate through the
  // last row. If false, iter() will return an iterator which might iterate
  // through some rows (or none) but will surely not reach the end.
  isComplete(): boolean;

  // Returns a promise that is resolved only when all rows (i.e. all batches)
  // have been fetched. The promise return value is always the object itself.
  waitAllRows(): Promise<QueryResult>;

  // Returns a promise that is resolved when either:
  // - more rows are available
  // - all rows are available
  // The promise return value is always the object itself.
  waitMoreRows(): Promise<QueryResult>;

  // Can return an empty array if called before the first batch is resolved.
  // This should be called only after having awaited for at least one batch.
  columns(): string[];

  // Returns the number of SQL statements in the query
  // (e.g. 2 for 'SELECT 1; SELECT 2;').
  statementCount(): number;

  // Returns the number of SQL statements that produced output rows. This number
  // is <= statementCount().
  statementWithOutputCount(): number;

  // Returns the last SQL statement.
  lastStatementSql(): string;
}
324
// Interface exposed to engine.ts to pump in the data as new row batches arrive.
export interface WritableQueryResult extends QueryResult {
  // |resBytes| is a proto-encoded trace_processor.QueryResult message.
  //  The overall flow looks as follows:
  // - The user calls engine.query('select ...') and gets a QueryResult back.
  // - The query call posts a message to the worker that runs the SQL engine (
  //   or sends a HTTP request in case of the RPC+HTTP interface).
  // - The returned QueryResult object is initially empty.
  // - Over time, the sql engine will postMessage() back results in batches.
  // - Each batch will end up calling this appendResultBatch() method.
  // - If there is any pending promise (e.g. the caller called
  //   queryResult.waitAllRows()), this call will awake them (if this is the
  //   last batch).
  appendResultBatch(resBytes: Uint8Array): void;
}
340
// Concrete result object shared by both sides of a query: readers (e.g. track
// code) consume it through QueryResult, the Engine feeds it through
// WritableQueryResult. Both views are backed by the same instance so that
// appending a new batch can directly settle the promises readers wait on.
class QueryResultImpl implements QueryResult, WritableQueryResult {
  columnNames: string[] = [];
  private _error?: string;
  private _numRows = 0;
  private _isComplete = false;
  private _errorInfo: QueryErrorInfo;
  private _statementCount = 0;
  private _statementWithOutputCount = 0;
  private _lastStatementSql = '';

  constructor(errorInfo: QueryErrorInfo) {
    this._errorInfo = errorInfo;
  }

  // --- QueryResult implementation.

  // All the batches received so far, in arrival order.
  // TODO(primiano): for the moment new batches are appended but old batches
  // are never removed. This won't work with abnormally large result sets, as
  // it will stash all rows in memory. We could switch to a model where the
  // iterator is destructive and deletes batch objects once iterating past the
  // end of each batch. If we do that, then we need to assign monotonic IDs to
  // batches. Also if we do that, we should prevent creating more than one
  // iterator for a QueryResult.
  batches: ResultBatch[] = [];

  // Deferred handed out by waitAllRows(). Settled only once the last result
  // batch has been received.
  private allRowsPromise?: Deferred<QueryResult>;

  // Deferred handed out by waitMoreRows(). Settled (and cleared) as soon as
  // the next batch is appended via appendResultBatch().
  private moreRowsPromise?: Deferred<QueryResult>;

  isComplete(): boolean {
    return this._isComplete;
  }

  numRows(): number {
    return this._numRows;
  }

  error(): string | undefined {
    return this._error;
  }

  columns(): string[] {
    return this.columnNames;
  }

  statementCount(): number {
    return this._statementCount;
  }

  statementWithOutputCount(): number {
    return this._statementWithOutputCount;
  }

  lastStatementSql(): string {
    return this._lastStatementSql;
  }

  iter<T extends Row>(spec: T): RowIterator<T> {
    return new RowIteratorImplWithRowData(spec, this) as {} as RowIterator<T>;
  }

  firstRow<T extends Row>(spec: T): T {
    const rowIter = new RowIteratorImplWithRowData(spec, this);
    // There must be at least one row to return.
    assertTrue(rowIter.valid());
    return rowIter as {} as RowIterator<T> as T;
  }

  // Can be called only once.
  waitAllRows(): Promise<QueryResult> {
    assertTrue(this.allRowsPromise === undefined);
    const deferred = defer<QueryResult>();
    this.allRowsPromise = deferred;
    // If everything has been received already, settle right away.
    if (this._isComplete) {
      this.resolveOrReject(deferred, this);
    }
    return deferred;
  }

  waitMoreRows(): Promise<QueryResult> {
    // Callers that are already waiting share the same pending promise.
    const pending = this.moreRowsPromise;
    if (pending !== undefined) {
      return pending;
    }

    const deferred = defer<QueryResult>();
    if (!this._isComplete) {
      // Remember it so that the next appendResultBatch() can settle it.
      this.moreRowsPromise = deferred;
      return deferred;
    }
    // No more rows will ever arrive: settle immediately.
    this.resolveOrReject(deferred, this);
    return deferred;
  }

  // --- WritableQueryResult implementation.

  // Called by the engine when a new QueryResult is available. Note that a
  // single Query() call can yield >1 QueryResult due to result batching
  // if more than ~64K of data are returned, e.g. when returning O(M) rows.
  // |resBytes| is a proto-encoded trace_processor.QueryResult message.
  // It is fine to retain the resBytes without slicing a copy, because
  // ProtoRingBuffer does the slice() for us (or passes through the buffer
  // coming from postMessage() (Wasm case) or fetch() (HTTP+RPC case)).
  appendResultBatch(resBytes: Uint8Array) {
    const reader = protobuf.Reader.create(resBytes);
    assertTrue(reader.pos === 0);
    const hadNoColumnsAtStart = this.columnNames.length === 0;
    const seenColumnNames = new Set<string>();
    while (reader.pos < reader.len) {
      const tag = reader.uint32();
      const fieldId = tag >>> 3;
      switch (fieldId) {
        case 1: {
          // column_names
          // Column names are expected in the first batch only. If this fires
          // something is going wrong in the handling of the batch stream.
          assertTrue(hadNoColumnsAtStart);
          const baseName = reader.string();
          let uniqueName = baseName;
          // In some rare cases two columns can have the same name (b/194891824)
          // e.g. `select 1 as x, 2 as x`. These queries don't happen in the
          // UI code, but they can happen when the user types a query (e.g.
          // with a join). Column names become iterator object keys, so
          // duplicates are disambiguated by appending a numeric suffix.
          let suffix = 1;
          while (seenColumnNames.has(uniqueName)) {
            uniqueName = `${baseName}_${suffix}`;
            assertTrue(suffix < 100); // Give up at some point;
            ++suffix;
          }
          seenColumnNames.add(uniqueName);
          this.columnNames.push(uniqueName);
          break;
        }

        case 2: {
          // error
          // Protos don't distinguish between a missing and an empty string:
          // only a non-empty |error| means that the query has failed. Don't
          // propagate ambiguous empty strings to JS.
          const errMsg = reader.string();
          const hasError = errMsg !== undefined && errMsg.length > 0;
          this._error = hasError ? errMsg : undefined;
          break;
        }

        case 3: {
          // batch
          const batchLen = reader.uint32();
          const batchStart = reader.pos;
          const batchEnd = batchStart + batchLen;
          const batchRaw = resBytes.subarray(batchStart, batchEnd);
          reader.pos = batchEnd;

          // The ResultBatch ctor parses the CellsBatch submessage.
          const batch = new ResultBatch(batchRaw);
          this.batches.push(batch);
          this._isComplete = batch.isLastBatch;

          // In theory one could construct a valid proto serializing the column
          // names after the cell batches. In practice the QueryResultSerializer
          // doesn't do that so it's not worth complicating the code.
          const numColumns = this.columnNames.length;
          const numCells = batch.numCells;
          if (numColumns === 0) {
            // No columns is plausible for queries like CREATE TABLE ... .
            assertTrue(numCells === 0);
          } else {
            // Batches always carry whole rows.
            assertTrue(numCells % numColumns === 0);
            this._numRows += numCells / numColumns;
          }
          break;
        }

        case 4: {
          this._statementCount = reader.uint32();
          break;
        }

        case 5: {
          this._statementWithOutputCount = reader.uint32();
          break;
        }

        case 6: {
          this._lastStatementSql = reader.string();
          break;
        }

        default: {
          console.warn(`Unexpected QueryResult field ${fieldId}`);
          reader.skipType(tag & 7);
          break;
        }
      } // switch (fieldId)
    } // while (pos < end)

    // Wake up whoever is waiting for more rows; the promise is single-shot.
    const moreRowsWaiter = this.moreRowsPromise;
    if (moreRowsWaiter !== undefined) {
      this.resolveOrReject(moreRowsWaiter, this);
      this.moreRowsPromise = undefined;
    }

    // Wake up whoever is waiting for the full result, if this was the end.
    const allRowsWaiter = this.allRowsPromise;
    if (this._isComplete && allRowsWaiter !== undefined) {
      this.resolveOrReject(allRowsWaiter, this);
    }
  }

  // Returns the waitAllRows() promise, creating it first if nobody has asked
  // for it yet. Unlike waitAllRows() this can be called any number of times.
  ensureAllRowsPromise(): Promise<QueryResult> {
    if (this.allRowsPromise === undefined) {
      this.waitAllRows(); // Will populate |this.allRowsPromise|.
    }
    return assertExists(this.allRowsPromise);
  }

  get errorInfo(): QueryErrorInfo {
    return this._errorInfo;
  }

  // Settles |promise|: rejects it with a QueryError if the query has failed,
  // resolves it with |arg| otherwise.
  private resolveOrReject(promise: Deferred<QueryResult>, arg: QueryResult) {
    const errMsg = this._error;
    if (errMsg !== undefined) {
      promise.reject(new QueryError(errMsg, this._errorInfo));
      return;
    }
    promise.resolve(arg);
  }
}
549
// This class holds onto a received result batch (a Uint8Array) and does some
// partial parsing to tokenize the various cell groups. This parsing mainly
// consists of identifying and caching the offsets of each cell group and
// decoding the string / blob / float64 groups. This half parsing is done to
// keep the iterator's next() fast, without decoding everything into memory.
// This is an internal implementation detail and is not exposed outside. The
// RowIteratorImpl uses this class to iterate through batches (the iteration
// state lives in RowIteratorImpl, which also takes care of switching batches
// when needed).
// Note: at any point in time there can be more than one RowIteratorImpl
// referencing the same batch. The batch must be immutable.
class ResultBatch {
  readonly isLastBatch: boolean = false;
  readonly batchBytes: Uint8Array;
  // Offset (relative to |batchBytes|) and length of the packed cell types.
  readonly cellTypesOff: number = 0;
  readonly cellTypesLen: number = 0;
  // Offset (relative to |batchBytes|) and length of the packed varint cells.
  readonly varintOff: number = 0;
  readonly varintLen: number = 0;
  readonly float64Cells = new Float64Array();
  readonly blobCells: Uint8Array[] = [];
  readonly stringCells: string[] = [];

  // batchBytes is a trace_processor.QueryResult.CellsBatch proto.
  // Throws (via assertTrue) if a known field has an unexpected wire type or if
  // its payload extends past the end of |batchBytes|.
  constructor(batchBytes: Uint8Array) {
    this.batchBytes = batchBytes;
    const reader = protobuf.Reader.create(batchBytes);
    assertTrue(reader.pos === 0);
    const end = reader.len;

    // Here we deconstruct the proto by hand. The CellsBatch is carefully
    // designed to allow a very fast parsing from the TS side. We pack all cells
    // of the same types together, so we can do only one call (per batch) to
    // TextDecoder.decode(), we can overlay a memory-aligned typedarray for
    // float values and can quickly tell and type-check the cell types.
    // One row = N cells (we know the number upfront from the outer message).
    // Each batch contains always an integer multiple of N cells (i.e. rows are
    // never fragmented across different batches).
    while (reader.pos < end) {
      const tag = reader.uint32();
      switch (tag >>> 3) {
        case 1: {
          // cell types, a packed array containing one CellType per cell.
          assertTrue((tag & 7) === TAG_LEN_DELIM); // Must be packed varint.
          this.cellTypesLen = reader.uint32();
          this.cellTypesOff = reader.pos;
          assertTrue(this.cellTypesOff + this.cellTypesLen <= end);
          reader.pos += this.cellTypesLen;
          break;
        }

        case 2: {
          // varint_cells, a packed varint buffer.
          assertTrue((tag & 7) === TAG_LEN_DELIM); // Must be packed varint.
          const packLen = reader.uint32();
          this.varintOff = reader.pos;
          this.varintLen = packLen;
          assertTrue(reader.buf === batchBytes);
          // |varintOff| is relative to the start of |batchBytes|, so it must be
          // checked against the length of the batch, not against the position
          // of the batch within its underlying ArrayBuffer.
          assertTrue(this.varintOff + this.varintLen <= end);
          reader.pos += packLen;
          break;
        }

        case 3: {
          // float64_cells, a 64-bit aligned packed fixed64 buffer.
          assertTrue((tag & 7) === TAG_LEN_DELIM); // Must be packed fixed64.
          const f64Len = reader.uint32();
          assertTrue(f64Len % 8 === 0);
          assertTrue(reader.pos + f64Len <= end);
          // Float64Array's constructor is evil: the offset is in bytes but the
          // length is in 8-byte words. The offset is also relative to the
          // underlying ArrayBuffer, not to |batchBytes|.
          const f64Words = f64Len / 8;
          const f64Off = batchBytes.byteOffset + reader.pos;
          if (f64Off % 8 === 0) {
            this.float64Cells = new Float64Array(
              batchBytes.buffer,
              f64Off,
              f64Words,
            );
          } else {
            // When using the production code in trace_processor's rpc.cc, the
            // float64 should be 8-bytes aligned. The slow-path case is only for
            // tests. A typed array view cannot start at an unaligned offset, so
            // the payload is copied into a fresh buffer.
            const slice = batchBytes.buffer.slice(f64Off, f64Off + f64Len);
            this.float64Cells = new Float64Array(slice);
          }
          reader.pos += f64Len;
          break;
        }

        case 4: {
          // blob_cells: one entry per blob.
          assertTrue((tag & 7) === TAG_LEN_DELIM);
          // The original author notes that protobufjs's bytes() copies under
          // the hood; either way the result is wrapped in a new Uint8Array.
          // Fine here as blobs are rare and not a fastpath.
          this.blobCells.push(new Uint8Array(reader.bytes()));
          break;
        }

        case 5: {
          // string_cells: all the string cells concatenated with \0s.
          assertTrue((tag & 7) === TAG_LEN_DELIM);
          const strLen = reader.uint32();
          assertTrue(reader.pos + strLen <= end);
          const subArr = batchBytes.subarray(reader.pos, reader.pos + strLen);
          assertTrue(subArr.length === strLen);
          // The reason why we do this split rather than creating one string
          // per entry is that utf8 decoding has some non-negligible cost. See
          // go/postmessage-benchmark .
          this.stringCells = utf8Decode(subArr).split('\0');
          reader.pos += strLen;
          break;
        }

        case 6: {
          // is_last_batch (boolean).
          this.isLastBatch = !!reader.bool();
          break;
        }

        case 7: {
          // padding for realignment, skip silently.
          reader.skipType(tag & 7);
          break;
        }

        default: {
          console.warn(`Unexpected QueryResult.CellsBatch field ${tag >>> 3}`);
          reader.skipType(tag & 7);
          break;
        }
      } // switch(tag)
    } // while (pos < end)
  }

  // Number of cells in the batch: the cell types array holds exactly one byte
  // per cell.
  get numCells() {
    return this.cellTypesLen;
  }
}
674
675class RowIteratorImpl implements RowIteratorBase {
676  // The spec passed to the iter call containing the expected types, e.g.:
677  // {'colA': NUM, 'colB': NUM_NULL, 'colC': STRING}.
678  // This doesn't ever change.
679  readonly rowSpec: Row;
680
681  // The object that holds the current row. This points to the parent
682  // RowIteratorImplWithRowData instance that created this class.
683  rowData: Row;
684
685  // The QueryResult object we are reading data from. The engine will pump
686  // batches over time into this object.
687  private resultObj: QueryResultImpl;
688
689  // All the member variables in the group below point to the identically-named
690  // members in result.batch[batchIdx]. This is to avoid indirection layers in
691  // the next() hotpath, so we can do this.float64Cells vs
692  // this.resultObj.batch[this.batchIdx].float64Cells.
693  // These are re-set every time tryMoveToNextBatch() is called (and succeeds).
694  private batchIdx = -1; // The batch index within |result.batches[]|.
695  private batchBytes = new Uint8Array();
696  private columnNames: string[] = [];
697  private numColumns = 0;
698  private cellTypesEnd = -1; // -1 so the 1st next() hits tryMoveToNextBatch().
699  private float64Cells = new Float64Array();
700  private varIntReader = protobuf.Reader.create(this.batchBytes);
701  private blobCells: Uint8Array[] = [];
702  private stringCells: string[] = [];
703
704  // These members instead are incremented as we read cells from next(). They
705  // are the mutable state of the iterator.
706  private nextCellTypeOff = 0;
707  private nextFloat64Cell = 0;
708  private nextStringCell = 0;
709  private nextBlobCell = 0;
710  private isValid = false;
711
712  constructor(querySpec: Row, rowData: Row, res: QueryResultImpl) {
713    Object.assign(this, querySpec);
714    this.rowData = rowData;
715    this.rowSpec = {...querySpec}; // ... -> Copy all the key/value pairs.
716    this.resultObj = res;
717    this.next();
718  }
719
720  valid(): boolean {
721    return this.isValid;
722  }
723
724  private makeError(message: string): QueryError {
725    return new QueryError(message, this.resultObj.errorInfo);
726  }
727
728  get(columnName: string): ColumnType {
729    const res = this.rowData[columnName];
730    if (res === undefined) {
731      throw this.makeError(
732        `Column '${columnName}' doesn't exist. ` +
733          `Actual columns: [${this.columnNames.join(',')}]`,
734      );
735    }
736    return res;
737  }
738
739  // Moves the cursor next by one row and updates |isValid|.
740  // When this fails to move, two cases are possible:
741  // 1. We reached the end of the result set (this is the case if
742  //    QueryResult.isComplete() == true when this fails).
743  // 2. We reached the end of the current batch, but more rows might come later
744  //    (if QueryResult.isComplete() == false).
745  next() {
746    // At some point we might reach the end of the current batch, but the next
747    // batch might be available already. In this case we want next() to
748    // transparently move on to the next batch.
749    while (this.nextCellTypeOff + this.numColumns > this.cellTypesEnd) {
750      // If TraceProcessor is behaving well, we should never end up in a
751      // situation where we have leftover cells. TP is expected to serialize
752      // whole rows in each QueryResult batch and NOT truncate them midway.
753      // If this assert fires the TP RPC logic has a bug.
754      assertTrue(
755        this.nextCellTypeOff === this.cellTypesEnd || this.cellTypesEnd === -1,
756      );
757      if (!this.tryMoveToNextBatch()) {
758        this.isValid = false;
759        return;
760      }
761    }
762
763    const rowData = this.rowData;
764    const numColumns = this.numColumns;
765
766    // Read the current row.
767    for (let i = 0; i < numColumns; i++) {
768      const cellType = this.batchBytes[this.nextCellTypeOff++];
769      const colName = this.columnNames[i];
770      const expType = this.rowSpec[colName];
771
772      switch (cellType) {
773        case CellType.CELL_NULL:
774          rowData[colName] = null;
775          break;
776
777        case CellType.CELL_VARINT:
778          if (expType === NUM || expType === NUM_NULL) {
779            // This is very subtle. The return type of int64 can be either a
780            // number or a Long.js {high:number, low:number} if Long.js is
781            // installed. The default state seems different in node and browser.
782            // We force-disable Long.js support in the top of this source file.
783            const val = this.varIntReader.int64();
784            rowData[colName] = val as {} as number;
785          } else {
786            // LONG, LONG_NULL, or unspecified - return as bigint
787            const value = decodeInt64Varint(
788              this.batchBytes,
789              this.varIntReader.pos,
790            );
791            rowData[colName] = value;
792            this.varIntReader.skip(); // Skips a varint
793          }
794          break;
795
796        case CellType.CELL_FLOAT64:
797          rowData[colName] = this.float64Cells[this.nextFloat64Cell++];
798          break;
799
800        case CellType.CELL_STRING:
801          rowData[colName] = this.stringCells[this.nextStringCell++];
802          break;
803
804        case CellType.CELL_BLOB:
805          const blob = this.blobCells[this.nextBlobCell++];
806          rowData[colName] = blob;
807          break;
808
809        default:
810          throw this.makeError(`Invalid cell type ${cellType}`);
811      }
812    } // For (cells)
813    this.isValid = true;
814  }
815
816  private tryMoveToNextBatch(): boolean {
817    const nextBatchIdx = this.batchIdx + 1;
818    if (nextBatchIdx >= this.resultObj.batches.length) {
819      return false;
820    }
821
822    this.columnNames = this.resultObj.columnNames;
823    this.numColumns = this.columnNames.length;
824
825    this.batchIdx = nextBatchIdx;
826    const batch = assertExists(this.resultObj.batches[nextBatchIdx]);
827    this.batchBytes = batch.batchBytes;
828    this.nextCellTypeOff = batch.cellTypesOff;
829    this.cellTypesEnd = batch.cellTypesOff + batch.cellTypesLen;
830    this.float64Cells = batch.float64Cells;
831    this.blobCells = batch.blobCells;
832    this.stringCells = batch.stringCells;
833    this.varIntReader = protobuf.Reader.create(batch.batchBytes);
834    this.varIntReader.pos = batch.varintOff;
835    this.varIntReader.len = batch.varintOff + batch.varintLen;
836    this.nextFloat64Cell = 0;
837    this.nextStringCell = 0;
838    this.nextBlobCell = 0;
839
840    // Check that all the expected columns are present.
841    for (const expectedCol of Object.keys(this.rowSpec)) {
842      if (this.columnNames.indexOf(expectedCol) < 0) {
843        throw this.makeError(
844          `Column ${expectedCol} not found in the SQL result ` +
845            `set {${this.columnNames.join(' ')}}`,
846        );
847      }
848    }
849
850    // Check that the cells types are consistent.
851    const numColumns = this.numColumns;
852    if (batch.numCells === 0) {
853      // This can happen if the query result contains just an error. In this
854      // an empty batch with isLastBatch=true is appended as an EOF marker.
855      // In theory TraceProcessor could return an empty batch in the middle and
856      // that would be fine from a protocol viewpoint. In practice, no code path
857      // does that today so it doesn't make sense trying supporting it with a
858      // recursive call to tryMoveToNextBatch().
859      assertTrue(batch.isLastBatch);
860      return false;
861    }
862
863    assertTrue(numColumns > 0);
864    for (let i = this.nextCellTypeOff; i < this.cellTypesEnd; i++) {
865      const col = (i - this.nextCellTypeOff) % numColumns;
866      const colName = this.columnNames[col];
867      const actualType = this.batchBytes[i] as CellType;
868      const expType = this.rowSpec[colName];
869
870      // If undefined, the caller doesn't want to read this column at all, so
871      // it can be whatever.
872      if (expType === undefined) continue;
873
874      let err = '';
875      if (!isCompatible(actualType, expType)) {
876        if (actualType === CellType.CELL_NULL) {
877          err =
878            'SQL value is NULL but that was not expected' +
879            ` (expected type: ${columnTypeToString(expType)}). ` +
880            'Did you mean NUM_NULL, LONG_NULL, STR_NULL or BLOB_NULL?';
881        } else {
882          err = `Incompatible cell type. Expected: ${columnTypeToString(
883            expType,
884          )} actual: ${CELL_TYPE_NAMES[actualType]}`;
885        }
886      }
887      if (err.length > 0) {
888        const row = Math.floor(i / numColumns);
889        const message = `Error @ row: ${row} col: '${colName}': ${err}`;
890        throw this.makeError(message);
891      }
892    }
893    return true;
894  }
895}
896
// The object that clients ultimately receive when they call
// QueryResult.iter(...).
// It is deliberately a separate class from RowIteratorImpl: the column values
// live as properties on this object, so keeping RowIteratorImpl's own member
// variables elsewhere prevents them from clashing with column names.
class RowIteratorImplWithRowData implements RowIteratorBase {
  private _impl: RowIteratorImpl;

  next: () => void;
  valid: () => boolean;
  get: (columnName: string) => ColumnType;

  constructor(querySpec: Row, res: QueryResultImpl) {
    // View this very object as a Row and seed it with the entries of the
    // spec before handing it over to the real iterator implementation.
    const selfAsRow = this as {} as Row;
    Object.assign(selfAsRow, querySpec);

    const impl = new RowIteratorImpl(querySpec, selfAsRow, res);
    this._impl = impl;

    // The public iterator methods are forwarded to the implementation.
    this.next = impl.next.bind(impl);
    this.valid = impl.valid.bind(impl);
    this.get = impl.get.bind(impl);
  }
}
918
// A proxy around QueryResultImpl that additionally makes the result
// await-able. This gives callers two ways of consuming a query:
// 1. Clients that only care about the complete result set can simply write
//    await engine.query('...') and obtain a QueryResult that is guaranteed
//    to be complete.
// 2. Clients that know how to deal with streaming can use the object straight
//    away, without awaiting it.
class WaitableQueryResultImpl
  implements QueryResult, WritableQueryResult, PromiseLike<QueryResult>
{
  private impl: QueryResultImpl;
  private thenCalled = false;

  constructor(errorInfo: QueryErrorInfo) {
    this.impl = new QueryResultImpl(errorInfo);
  }

  // ---- QueryResult implementation ----
  // Every method below just forwards to the wrapped impl object.

  iter<T extends Row>(spec: T) {
    return this.impl.iter(spec);
  }

  firstRow<T extends Row>(spec: T) {
    return this.impl.firstRow(spec);
  }

  waitAllRows() {
    return this.impl.waitAllRows();
  }

  waitMoreRows() {
    return this.impl.waitMoreRows();
  }

  isComplete() {
    return this.impl.isComplete();
  }

  numRows() {
    return this.impl.numRows();
  }

  columns() {
    return this.impl.columns();
  }

  error() {
    return this.impl.error();
  }

  statementCount() {
    return this.impl.statementCount();
  }

  statementWithOutputCount() {
    return this.impl.statementWithOutputCount();
  }

  lastStatementSql() {
    return this.impl.lastStatementSql();
  }

  // ---- WritableQueryResult implementation ----

  appendResultBatch(resBytes: Uint8Array) {
    return this.impl.appendResultBatch(resBytes);
  }

  // ---- PromiseLike<QueryResult> implementation ----
  // then/catch/finally all chain onto the impl's all-rows promise.

  // then() asserts that it has not been called before on this object.
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  then(onfulfilled: any, onrejected: any): any {
    assertFalse(this.thenCalled);
    this.thenCalled = true;
    const allRows = this.impl.ensureAllRowsPromise();
    return allRows.then(onfulfilled, onrejected);
  }

  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  catch(error: any): any {
    const allRows = this.impl.ensureAllRowsPromise();
    return allRows.catch(error);
  }

  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  finally(callback: () => void): any {
    const allRows = this.impl.ensureAllRowsPromise();
    return allRows.finally(callback);
  }

  // eslint and clang-format disagree on how to format get[foo](). Let
  // clang-format win:
  // eslint-disable-next-line keyword-spacing
  get [Symbol.toStringTag](): string {
    return 'Promise<WaitableQueryResult>';
  }
}
1001
// Creates a fresh WaitableQueryResultImpl for the given |errorInfo| and
// returns it typed as a readable, writable and await-able query result.
export function createQueryResult(
  errorInfo: QueryErrorInfo,
): QueryResult & Promise<QueryResult> & WritableQueryResult {
  const waitableResult = new WaitableQueryResultImpl(errorInfo);
  return waitableResult;
}
1007
// Converts a SQL cell value into a |time|.
// - bigint: wrapped as-is via Time.fromRaw().
// - number: floored, converted to bigint, then wrapped via Time.fromRaw().
// - null:   mapped to Time.ZERO.
// Any other kind of value is rejected by throwing an Error.
// Assumes value is in native time units.
export function timeFromSql(value: ColumnType): time {
  switch (typeof value) {
    case 'bigint':
      return Time.fromRaw(value);
    case 'number':
      return Time.fromRaw(BigInt(Math.floor(value)));
    default:
      if (value === null) {
        return Time.ZERO;
      }
      throw Error(`Refusing to create time from unrelated type ${value}`);
  }
}
1021
// Converts a SQL cell value into a |duration|.
// - bigint: returned unchanged.
// - number: floored and converted to bigint.
// - null:   mapped to Duration.ZERO.
// Any other kind of value is rejected by throwing an Error.
// Assumes value is in nanoseconds.
export function durationFromSql(value: ColumnType): duration {
  switch (typeof value) {
    case 'bigint':
      return value;
    case 'number':
      return BigInt(Math.floor(value));
    default:
      if (value === null) {
        return Duration.ZERO;
      }
      throw Error(`Refusing to create duration from unrelated type ${value}`);
  }
}
1035