• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1// Copyright (C) 2021 The Android Open Source Project
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//      http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15// This file deals with deserialization and iteration of the proto-encoded
16// byte buffer that is returned by TraceProcessor when invoking the
17// TPM_QUERY_STREAMING method. The returned |query_result| buffer is optimized
18// for being moved cheaply across workers and decoded on the fly as we step
19// through the iterator.
20// See comments around QueryResult in trace_processor.proto for more details.
21
22// The classes in this file are organized as follows:
23//
24// QueryResultImpl:
25// The object returned by the Engine.query(sql) method.
26// This object is a holder of row data. Batches of rows get appended
27// incrementally as they are received from the remote TraceProcessor instance.
28// QueryResultImpl also deals with asynchronicity of queries and allows callers
29// to obtain a promise that waits for more (or all) rows.
30// At any point in time the following objects hold a reference to QueryResult:
31// - The Engine: for appending row batches.
32// - UI code, typically controllers, who make queries.
33//
34// ResultBatch:
35// Hold the data, returned by the remote TraceProcessor instance, for a number
36// of rows (TP typically chunks the results in batches of 128KB).
37// A QueryResultImpl holds exclusively ResultBatches for a given query.
38// ResultBatch is not exposed externally, it's just an internal representation
39// that helps with proto decoding. ResultBatch is immutable after it gets
40// appended and decoded. The iteration state is held by the RowIteratorImpl.
41//
42// RowIteratorImpl:
43// Decouples the data owned by QueryResultImpl (and its ResultBatch(es)) from
44// the iteration state. The iterator effectively is the union of a ResultBatch
45// and the row number in it. Rows within the batch are decoded as the user calls
46// next(). When getting at the end of the batch, it takes care of switching to
47// the next batch (if any) within the QueryResultImpl.
48// This object is part of the API exposed to tracks / controllers.
49
50import protobuf from 'protobufjs/minimal';
51
52// Disable Long.js support in protobuf.js. Long.js seems to be enabled only in
53// tests but not in production code. In any case, for now we want int64 values
54// to be cast to number, accepting the 2**53 precision limit. This is consistent
55// with passing --force-number in the protobuf.js codegen invocation in
56// //ui/BUILD.gn .
// See also https://github.com/protobufjs/protobuf.js/issues/1253 .
57protobuf.util.Long = undefined as any;
// NOTE(review): per the protobuf.js docs, configure() must be re-run after
// changing util.Long for the change to be picked up - confirm if upgrading.
58protobuf.configure();
59
60import {defer, Deferred} from '../base/deferred';
61import {assertExists, assertFalse, assertTrue} from '../base/logging';
62import {utf8Decode} from '../base/string_utils';
63
// Type sentinels. These are the values callers put in the spec passed to
// QueryResult.iter() / firstRow() to declare the expected type of each column,
// e.g. iter({name: STR, id: NUM}). Only their identity matters: they are
// compared with === (see columnTypeToString() and isCompatible()).
// The *_NULL variants additionally accept NULL cells.
64export const NUM = 0;
65export const STR = 'str';
66export const NUM_NULL: number|null = 1;
67export const STR_NULL: string|null = 'str_null';
// BLOB and BLOB_NULL are two distinct Uint8Array instances, which is what lets
// an === comparison tell them apart.
68export const BLOB: Uint8Array = new Uint8Array();
69export const BLOB_NULL: Uint8Array|null = new Uint8Array();
// LONG / LONG_NULL request varint cells to be returned as bigint rather than
// as number.
70export const LONG: bigint = 0n;
71export const LONG_NULL: bigint|null = 1n;
72
// Union of every value type that a cell of a row (or a sentinel in a spec) can
// hold.
73export type ColumnType = string|number|bigint|null|Uint8Array;
74
const SHIFT_32BITS = 32n;

// Fast decode of a protobuf varint-encoded int64 into a bigint.
// Inspired by
// https://github.com/protobufjs/protobuf.js/blob/56b1e64979dae757b67a21d326e16acee39f2267/src/reader.js#L123
//
// |buf| is the buffer holding the varint and |pos| the offset of its first
// byte. The value is accumulated into two unsigned 32-bit halves (|lo|, |hi|)
// using plain number arithmetic, and converted to a bigint only on return.
// Encodings longer than 5 bytes are interpreted as two's complement 64-bit
// values, so they can come out negative.
//
// Throws Error('Index out of range') if the buffer ends before the varint is
// terminated, and Error('invalid varint encoding') if the continuation bit is
// still set on the 10th byte.
export function decodeInt64Varint(buf: Uint8Array, pos: number): bigint {
  let hi = 0;
  let lo = 0;
  let i = 0;

  if (buf.length - pos > 4) {  // fast route (lo): 5 bytes are surely readable.
    for (; i < 4; ++i) {
      // 1st..4th byte: 7 payload bits each, i.e. bits 0..27 of |lo|.
      lo = (lo | (buf[pos] & 127) << i * 7) >>> 0;
      if (buf[pos++] < 128) {
        return BigInt(lo);
      }
    }
    // 5th byte: its low 4 bits complete |lo|, its top 3 bits start |hi|.
    lo = (lo | (buf[pos] & 127) << 28) >>> 0;
    hi = (hi | (buf[pos] & 127) >> 4) >>> 0;
    if (buf[pos++] < 128) {
      return BigInt(hi) << SHIFT_32BITS | BigInt(lo);
    }
  } else {
    // Slow route: at most 4 bytes are left, so every read is bounds-checked
    // and the varint must terminate within them.
    for (; i < 4; ++i) {
      if (pos >= buf.length) {
        throw Error('Index out of range');
      }
      // 1st..4th
      lo = (lo | (buf[pos] & 127) << i * 7) >>> 0;
      if (buf[pos++] < 128) {
        return BigInt(lo);
      }
    }
    // All the remaining bytes had the continuation bit set: the varint
    // continues past the end of the buffer.
    throw Error('Index out of range');
  }

  // 6th..10th byte: 7 payload bits each, placed from bit 3 of |hi| onwards.
  for (i = 0; i < 5; ++i) {
    if (pos >= buf.length) {
      throw Error('Index out of range');
    }
    hi = (hi | (buf[pos] & 127) << i * 7 + 3) >>> 0;
    if (buf[pos++] < 128) {
      const big = BigInt(hi) << SHIFT_32BITS | BigInt(lo);
      // Reinterpret the 64 bits as signed.
      return BigInt.asIntN(64, big);
    }
  }
  throw Error('invalid varint encoding');
}
139
// Context that can help when debugging a failed query: for example the query
// in question, the stack where the query was issued, the active plugin etc.
export interface QueryErrorInfo {
  // The query in question.
  query: string;
}

// Error carrying, on top of the error message, the query that caused it.
// This is what the promises handed out by a query result get rejected with.
export class QueryError extends Error {
  readonly query: string;

  constructor(message: string, info: QueryErrorInfo) {
    super(message);
    const {query} = info;
    this.query = query;
  }

  // Prefixes the standard Error description with a line showing the query.
  toString() {
    const description = super.toString();
    return `Query: ${this.query}\n` + description;
  }
}
159
160// One row extracted from an SQL result: maps each column name to the value of
// that column. The same shape is used for the spec passed to iter() and
// firstRow(), where each value is a type sentinel (NUM, STR, ...) instead.
161export interface Row {
162  [key: string]: ColumnType;
163}
164
165// The methods that any iterator has to implement.
166export interface RowIteratorBase {
  // True if the iterator currently points to a row.
167  valid(): boolean;
  // Moves the iterator to the next row, if any.
168  next(): void;
169
170  // Reflection support for cases where the column names are not known upfront
171  // (e.g. the query result table for user-provided SQL queries).
172  // It throws if the passed column name doesn't exist.
173  // Example usage:
174  // for (const it = queryResult.iter({}); it.valid(); it.next()) {
175  //   for (const columnName of queryResult.columns()) {
176  //     console.log(it.get(columnName));
  //   }
  // }
177  get(columnName: string): ColumnType;
178}
179
180// A RowIterator is a type that has all the fields defined in the query spec
181// plus the valid() and next() operators. This is to ultimately allow the
182// clients to do:
183// const result = await engine.query("select name, surname, id from people;");
184// const iter = result.iter({name: STR, surname: STR, id: NUM});
185// for (; iter.valid(); iter.next())
186//   console.log(iter.name, iter.surname);
187export type RowIterator<T extends Row> = RowIteratorBase&T;
188
// Returns the name of the type sentinel |t| (e.g. 'NUM_NULL'), or
// 'INVALID(...)' if |t| is not one of the known sentinels.
function columnTypeToString(t: ColumnType): string {
  // Sentinels are matched by strict equality, in this order.
  const sentinelNames: Array<[ColumnType, string]> = [
    [NUM, 'NUM'],
    [NUM_NULL, 'NUM_NULL'],
    [STR, 'STR'],
    [STR_NULL, 'STR_NULL'],
    [BLOB, 'BLOB'],
    [BLOB_NULL, 'BLOB_NULL'],
    [LONG, 'LONG'],
    [LONG_NULL, 'LONG_NULL'],
  ];
  for (const [sentinel, label] of sentinelNames) {
    if (t === sentinel) {
      return label;
    }
  }
  return `INVALID(${t})`;
}
211
// Tells whether a cell of type |actual| can be returned for a column that the
// caller declared with the |expected| type sentinel.
// Throws if |actual| is not a known CellType.
function isCompatible(actual: CellType, expected: ColumnType): boolean {
  const wantsNum = expected === NUM || expected === NUM_NULL;
  const wantsLong = expected === LONG || expected === LONG_NULL;
  const wantsStr = expected === STR || expected === STR_NULL;
  const wantsBlob = expected === BLOB || expected === BLOB_NULL;

  if (actual === CellType.CELL_NULL) {
    // Only the nullable sentinels accept a NULL cell.
    return expected === NUM_NULL || expected === STR_NULL ||
        expected === BLOB_NULL || expected === LONG_NULL;
  }
  if (actual === CellType.CELL_VARINT) {
    // Integers can be surfaced either as number or as bigint.
    return wantsNum || wantsLong;
  }
  if (actual === CellType.CELL_FLOAT64) {
    return wantsNum;
  }
  if (actual === CellType.CELL_STRING) {
    return wantsStr;
  }
  if (actual === CellType.CELL_BLOB) {
    return wantsBlob;
  }
  throw new Error(`Unknown CellType ${actual}`);
}
230
231// This has to match CellType in trace_processor.proto.
// Each cell of a batch is tagged with one of these values.
232enum CellType {
233  CELL_NULL = 1,
234  CELL_VARINT = 2,
235  CELL_FLOAT64 = 3,
236  CELL_STRING = 4,
237  CELL_BLOB = 5,
238}
239
// Human readable names laid out so that the index matches the CellType value.
// Index 0 has no CellType counterpart and is labelled 'UNKNOWN'.
240const CELL_TYPE_NAMES =
241    ['UNKNOWN', 'NULL', 'VARINT', 'FLOAT64', 'STRING', 'BLOB'];
242
// Protobuf wire type (the low 3 bits of a field tag) of length-delimited
// fields, which is how packed repeated fields, strings and bytes are encoded.
243const TAG_LEN_DELIM = 2;
244
245// This is the interface exposed to readers (e.g. tracks). The underlying object
246// (QueryResultImpl) owns the result data. This allows to obtain iterators on
247// that, and to wait for incremental updates (new rows being fetched) through
248// waitMoreRows() / waitAllRows().
249export interface QueryResult {
250  // Obtains an iterator.
251  // TODO(primiano): this should have an option to destruct data as we read. In
252  // the case of a long query (e.g. `SELECT * FROM sched` in the query prompt)
253  // we don't want to accumulate everything in memory. OTOH UI tracks want to
254  // keep the data around so they can redraw them on each animation frame. For
255  // now we keep everything in memory in the QueryResultImpl object.
256  // |spec| maps column names to type sentinels, e.g. {name: STR, id: NUM}.
257  iter<T extends Row>(spec: T): RowIterator<T>;
258
259  // Like iter() for queries that expect only one row. It embeds the valid()
260  // check (i.e. throws if no rows are available) and returns directly the
261  // first result.
262  firstRow<T extends Row>(spec: T): T;
263
264  // If != undefined the query errored out and error() contains the message.
265  error(): string|undefined;
266
267  // Returns the number of rows accumulated so far. Note that this number can
268  // change over time as more batches are received. It becomes stable only
269  // when isComplete() returns true or after waitAllRows() is resolved.
270  numRows(): number;
271
272  // If true all rows have been fetched. Calling iter() will iterate through the
273  // last row. If false, iter() will return an iterator which might iterate
274  // through some rows (or none) but will surely not reach the end.
275
276  isComplete(): boolean;
277
278  // Returns a promise that is resolved only when all rows (i.e. all batches)
279  // have been fetched. The promise return value is always the object itself.
  // In QueryResultImpl the promise is rejected with a QueryError if the query
  // errored out, and this method can be called only once.
280  waitAllRows(): Promise<QueryResult>;
281
282  // Returns a promise that is resolved when either:
283  // - more rows are available
284  // - all rows are available
285  // The promise return value is always the object itself.
286  waitMoreRows(): Promise<QueryResult>;
287
288  // Can return an empty array if called before the first batch is resolved.
289  // This should be called only after having awaited for at least one batch.
290  columns(): string[];
291
292  // Returns the number of SQL statements in the query
293  // (e.g. 2 for 'SELECT 1; SELECT 2;').
294  statementCount(): number;
295
296  // Returns the number of SQL statements that produced output rows. This
297  // number is <= statementCount().
298  statementWithOutputCount(): number;
299}
300
301// Interface exposed to engine.ts to pump in the data as new row batches arrive.
302export interface WritableQueryResult extends QueryResult {
303  // |resBytes| is a proto-encoded trace_processor.QueryResult message.
304  // The overall flow looks as follows:
305  // - The user calls engine.query('select ...') and gets a QueryResult back.
306  // - The query call posts a message to the worker that runs the SQL engine
307  //   (or sends a HTTP request in case of the RPC+HTTP interface).
308  // - The returned QueryResult object is initially empty.
309  // - Over time, the sql engine will postMessage() back results in batches.
310  // - Each batch will end up calling this appendResultBatch() method.
311  // - If there is any pending promise (e.g. the caller called
312  //   queryResult.waitAllRows()), this call will awake them (if this is the
313  //   last batch).
314  appendResultBatch(resBytes: Uint8Array): void;
315}
316
// Concrete result object, shared by the two sides of a query: readers (e.g.
// track code) use it through QueryResult, while the Engine feeds it through
// WritableQueryResult. Both sides talk to the same instance so that, as the
// engine pumps in new row batches, the promises that readers are waiting on
// can be settled.
class QueryResultImpl implements QueryResult, WritableQueryResult {
  // Column names, in result-set order. Read directly by the row iterator.
  columnNames: string[] = [];
  private _error?: string;
  private _numRows = 0;
  private _isComplete = false;
  private _errorInfo: QueryErrorInfo;
  private _statementCount = 0;
  private _statementWithOutputCount = 0;

  // TODO(primiano): for the moment new batches are appended but old batches
  // are never removed. This won't work with abnormally large result sets, as
  // it will stash all rows in memory. We could switch to a model where the
  // iterator is destructive and deletes batch objects once iterating past the
  // end of each batch. If we do that, then we need to assign monotonic IDs to
  // batches. Also if we do that, we should prevent creating more than one
  // iterator for a QueryResult.
  batches: ResultBatch[] = [];

  // Handed out by waitAllRows(). Settled only once the last result batch has
  // been retrieved.
  private allRowsPromise?: Deferred<QueryResult>;

  // Handed out by waitMoreRows(). Settled when the next batch is appended via
  // appendResultBatch().
  private moreRowsPromise?: Deferred<QueryResult>;

  constructor(errorInfo: QueryErrorInfo) {
    this._errorInfo = errorInfo;
  }

  // --- QueryResult implementation.

  isComplete(): boolean {
    return this._isComplete;
  }

  numRows(): number {
    return this._numRows;
  }

  error(): string|undefined {
    return this._error;
  }

  columns(): string[] {
    return this.columnNames;
  }

  statementCount(): number {
    return this._statementCount;
  }

  statementWithOutputCount(): number {
    return this._statementWithOutputCount;
  }

  iter<T extends Row>(spec: T): RowIterator<T> {
    return new RowIteratorImplWithRowData(spec, this) as {} as RowIterator<T>;
  }

  firstRow<T extends Row>(spec: T): T {
    const it = new RowIteratorImplWithRowData(spec, this);
    // There must be at least one row available.
    assertTrue(it.valid());
    const row: T = it as {} as RowIterator<T>;
    return row;
  }

  // Can be called only once.
  waitAllRows(): Promise<QueryResult> {
    assertTrue(this.allRowsPromise === undefined);
    const pending = defer<QueryResult>();
    this.allRowsPromise = pending;
    if (this._isComplete) {
      // Nothing left to wait for: settle straight away.
      this.resolveOrReject(pending, this);
    }
    return pending;
  }

  waitMoreRows(): Promise<QueryResult> {
    const outstanding = this.moreRowsPromise;
    if (outstanding !== undefined) {
      // All the callers waiting for the next batch share the same promise.
      return outstanding;
    }

    const fresh = defer<QueryResult>();
    if (!this._isComplete) {
      this.moreRowsPromise = fresh;
      return fresh;
    }
    // No more batches are coming: settle straight away and don't retain it.
    this.resolveOrReject(fresh, this);
    return fresh;
  }

  // --- WritableQueryResult implementation.

  // Called by the engine when a new QueryResult is available. Note that a
  // single Query() call can yield >1 QueryResult because large results (e.g.
  // O(M) rows) are returned in several batches.
  // |resBytes| is a proto-encoded trace_processor.QueryResult message.
  // It is fine to retain |resBytes| without slicing a copy, because
  // ProtoRingBuffer does the slice() for us (or passes through the buffer
  // coming from postMessage() (Wasm case) or fetch() (HTTP+RPC case)).
  appendResultBatch(resBytes: Uint8Array) {
    const reader = protobuf.Reader.create(resBytes);
    assertTrue(reader.pos === 0);
    const hadNoColumnsAtStart = this.columnNames.length === 0;
    const namesInThisMessage = new Set<string>();

    while (reader.pos < reader.len) {
      const tag = reader.uint32();
      const fieldId = tag >>> 3;

      if (fieldId === 1) {  // column_names
        // Only the first batch should contain the column names. If this fires
        // something is going wrong in the handling of the batch stream.
        assertTrue(hadNoColumnsAtStart);
        const name = QueryResultImpl.disambiguateColumnName(
            reader.string(), namesInThisMessage);
        namesInThisMessage.add(name);
        this.columnNames.push(name);
      } else if (fieldId === 2) {  // error
        // The query has errored only if the |error| field is non-empty.
        // In protos, we don't distinguish between non-present and empty.
        // Make sure we don't propagate ambiguous empty strings to JS.
        const message = reader.string();
        const hasError = message !== undefined && message.length > 0;
        this._error = hasError ? message : undefined;
      } else if (fieldId === 3) {  // batch
        const batchLen = reader.uint32();
        const batchStart = reader.pos;
        const batchEnd = batchStart + batchLen;
        const batchRaw = resBytes.subarray(batchStart, batchEnd);
        reader.pos = batchEnd;

        // The ResultBatch ctor parses the CellsBatch submessage.
        const batch = new ResultBatch(batchRaw);
        this.batches.push(batch);
        this._isComplete = batch.isLastBatch;

        // In theory one could construct a valid proto serializing the column
        // names after the cell batches. In practice the QueryResultSerializer
        // doesn't do that so it's not worth complicating the code.
        const columnCount = this.columnNames.length;
        if (columnCount === 0) {
          // No columns is plausible for queries like CREATE TABLE ... .
          assertTrue(batch.numCells === 0);
        } else {
          // Batches always hold a whole number of rows.
          assertTrue(batch.numCells % columnCount === 0);
          this._numRows += batch.numCells / columnCount;
        }
      } else if (fieldId === 4) {
        this._statementCount = reader.uint32();
      } else if (fieldId === 5) {
        this._statementWithOutputCount = reader.uint32();
      } else {
        console.warn(`Unexpected QueryResult field ${tag >>> 3}`);
        reader.skipType(tag & 7);
      }
    }  // while (pos < end)

    // Whoever was waiting for "more rows" gets woken up by every batch.
    const moreRowsWaiter = this.moreRowsPromise;
    if (moreRowsWaiter !== undefined) {
      this.resolveOrReject(moreRowsWaiter, this);
      this.moreRowsPromise = undefined;
    }

    // Whoever was waiting for "all rows" gets woken up only by the last one.
    const allRowsWaiter = this.allRowsPromise;
    if (this._isComplete && allRowsWaiter !== undefined) {
      this.resolveOrReject(allRowsWaiter, this);
    }
  }

  // Returns the waitAllRows() promise, creating it if nobody did so far.
  ensureAllRowsPromise(): Promise<QueryResult> {
    return this.allRowsPromise ?? this.waitAllRows();
  }

  // In some rare cases two columns can have the same name (b/194891824)
  // e.g. `select 1 as x, 2 as x`. These queries don't happen in the UI code,
  // but they can happen when the user types a query (e.g. with a join). The
  // most practical thing we can do is renaming the columns with a suffix.
  // Keeping the same name will break when iterating, because column names
  // become iterator object keys.
  // Returns |name| itself if it is not in |taken|, otherwise the first of
  // name_1, name_2, ... which is not in |taken|.
  private static disambiguateColumnName(name: string, taken: Set<string>):
      string {
    let candidate = name;
    let suffix = 1;
    while (taken.has(candidate)) {
      candidate = `${name}_${suffix}`;
      assertTrue(suffix < 100);  // Give up at some point;
      ++suffix;
    }
    return candidate;
  }

  // Settles |promise|: rejects it with a QueryError if the query reported an
  // error, resolves it with |arg| otherwise.
  private resolveOrReject(promise: Deferred<QueryResult>, arg: QueryResult) {
    const failure = this._error;
    if (failure !== undefined) {
      promise.reject(new QueryError(failure, this._errorInfo));
      return;
    }
    promise.resolve(arg);
  }
}
513
// This class holds onto a received result batch (a Uint8Array) and does some
// partial parsing to tokenize the various cell groups. This parsing mainly
// consists of identifying and caching the offsets of each cell group and
// initializing the varint decoders. This half parsing is done to keep the
// iterator's next() fast, without decoding everything into memory.
// This is an internal implementation detail and is not exposed outside. The
// RowIteratorImpl uses this class to iterate through batches (this class takes
// care of iterating within a batch, RowIteratorImpl takes care of switching
// batches when needed).
// Note: at any point in time there can be more than one ResultIterator
// referencing the same batch. The batch must be immutable.
class ResultBatch {
  readonly isLastBatch: boolean = false;
  readonly batchBytes: Uint8Array;
  // Offset and length, within |batchBytes|, of the cell types (1 byte each).
  readonly cellTypesOff: number = 0;
  readonly cellTypesLen: number = 0;
  // Offset and length, within |batchBytes|, of the packed varint cells.
  readonly varintOff: number = 0;
  readonly varintLen: number = 0;
  readonly float64Cells = new Float64Array();
  readonly blobCells: Uint8Array[] = [];
  readonly stringCells: string[] = [];

  // batchBytes is a trace_processor.QueryResult.CellsBatch proto.
  constructor(batchBytes: Uint8Array) {
    this.batchBytes = batchBytes;
    const reader = protobuf.Reader.create(batchBytes);
    assertTrue(reader.pos === 0);
    // All the offsets handled below (reader.pos, *Off) are relative to the
    // start of |batchBytes|, and so is |end|.
    const end = reader.len;

    // Here we deconstruct the proto by hand. The CellsBatch is carefully
    // designed to allow a very fast parsing from the TS side. We pack all cells
    // of the same types together, so we can do only one call (per batch) to
    // TextDecoder.decode(), we can overlay a memory-aligned typedarray for
    // float values and can quickly tell and type-check the cell types.
    // One row = N cells (we know the number upfront from the outer message).
    // Each batch contains always an integer multiple of N cells (i.e. rows are
    // never fragmented across different batches).
    while (reader.pos < end) {
      const tag = reader.uint32();
      switch (tag >>> 3) {
        case 1:  // cell types, a packed array containing one CellType per cell.
          assertTrue((tag & 7) === TAG_LEN_DELIM);  // Must be packed varint.
          this.cellTypesLen = reader.uint32();
          this.cellTypesOff = reader.pos;
          assertTrue(this.cellTypesOff + this.cellTypesLen <= end);
          reader.pos += this.cellTypesLen;
          break;

        case 2:  // varint_cells, a packed varint buffer.
          assertTrue((tag & 7) === TAG_LEN_DELIM);  // Must be packed varint.
          const packLen = reader.uint32();
          this.varintOff = reader.pos;
          this.varintLen = packLen;
          assertTrue(reader.buf === batchBytes);
          // |varintOff| is relative to |batchBytes|, so it has to be checked
          // against the length of |batchBytes|, not against its position in
          // the underlying ArrayBuffer (byteOffset + byteLength).
          assertTrue(this.varintOff + this.varintLen <= end);
          reader.pos += packLen;
          break;

        case 3:  // float64_cells, a 64-bit aligned packed fixed64 buffer.
          assertTrue((tag & 7) === TAG_LEN_DELIM);  // Must be packed fixed64.
          const f64Len = reader.uint32();
          assertTrue(f64Len % 8 === 0);
          assertTrue(reader.pos + f64Len <= end);
          // Float64Array's constructor is evil: the offset is in bytes but the
          // length is in 8-byte words.
          const f64Words = f64Len / 8;
          // Unlike the other offsets, this one is relative to the underlying
          // ArrayBuffer, because that is what Float64Array and slice() take.
          const f64Off = batchBytes.byteOffset + reader.pos;
          if (f64Off % 8 === 0) {
            this.float64Cells =
                new Float64Array(batchBytes.buffer, f64Off, f64Words);
          } else {
            // When using the production code in trace_processor's rpc.cc, the
            // float64 should be 8-bytes aligned. The slow-path case is only for
            // tests.
            const slice = batchBytes.buffer.slice(f64Off, f64Off + f64Len);
            this.float64Cells = new Float64Array(slice);
          }
          reader.pos += f64Len;
          break;

        case 4:  // blob_cells: one entry per blob.
          assertTrue((tag & 7) === TAG_LEN_DELIM);
          // new Uint8Array(typedArray) copies the bytes. Fine here as blobs
          // are rare and not a fastpath.
          this.blobCells.push(new Uint8Array(reader.bytes()));
          break;

        case 5:  // string_cells: all the string cells concatenated with \0s.
          assertTrue((tag & 7) === TAG_LEN_DELIM);
          const strLen = reader.uint32();
          assertTrue(reader.pos + strLen <= end);
          const subArr = batchBytes.subarray(reader.pos, reader.pos + strLen);
          assertTrue(subArr.length === strLen);
          // The reason why we do this split rather than creating one string
          // per entry is that utf8 decoding has some non-negligible cost. See
          // go/postmessage-benchmark .
          this.stringCells = utf8Decode(subArr).split('\0');
          reader.pos += strLen;
          break;

        case 6:  // is_last_batch (boolean).
          this.isLastBatch = !!reader.bool();
          break;

        case 7:  // padding for realignment, skip silently.
          reader.skipType(tag & 7);
          break;

        default:
          console.warn(`Unexpected QueryResult.CellsBatch field ${tag >>> 3}`);
          reader.skipType(tag & 7);
          break;
      }  // switch(tag)
    }    // while (pos < end)
  }

  // Number of cells in this batch: each cell type takes exactly one byte.
  get numCells() {
    return this.cellTypesLen;
  }
}
634
635class RowIteratorImpl implements RowIteratorBase {
636  // The spec passed to the iter call containing the expected types, e.g.:
637  // {'colA': NUM, 'colB': NUM_NULL, 'colC': STRING}.
638  // This doesn't ever change.
639  readonly rowSpec: Row;
640
641  // The object that holds the current row. This points to the parent
642  // RowIteratorImplWithRowData instance that created this class.
643  rowData: Row;
644
645  // The QueryResult object we are reading data from. The engine will pump
646  // batches over time into this object.
647  private resultObj: QueryResultImpl;
648
649  // All the member variables in the group below point to the identically-named
650  // members in result.batch[batchIdx]. This is to avoid indirection layers in
651  // the next() hotpath, so we can do this.float64Cells vs
652  // this.resultObj.batch[this.batchIdx].float64Cells.
653  // These are re-set every time tryMoveToNextBatch() is called (and succeeds).
654  private batchIdx = -1;  // The batch index within |result.batches[]|.
655  private batchBytes = new Uint8Array();
656  private columnNames: string[] = [];
657  private numColumns = 0;
658  private cellTypesEnd = -1;  // -1 so the 1st next() hits tryMoveToNextBatch().
659  private float64Cells = new Float64Array();
660  private varIntReader = protobuf.Reader.create(this.batchBytes);
661  private blobCells: Uint8Array[] = [];
662  private stringCells: string[] = [];
663
664  // These members instead are incremented as we read cells from next(). They
665  // are the mutable state of the iterator.
666  private nextCellTypeOff = 0;
667  private nextFloat64Cell = 0;
668  private nextStringCell = 0;
669  private nextBlobCell = 0;
670  private isValid = false;
671
672  constructor(querySpec: Row, rowData: Row, res: QueryResultImpl) {
673    Object.assign(this, querySpec);
674    this.rowData = rowData;
675    this.rowSpec = {...querySpec};  // ... -> Copy all the key/value pairs.
676    this.resultObj = res;
677    this.next();
678  }
679
680  valid(): boolean {
681    return this.isValid;
682  }
683
684
685  get(columnName: string): ColumnType {
686    const res = this.rowData[columnName];
687    if (res === undefined) {
688      throw new Error(
689          `Column '${columnName}' doesn't exist. ` +
690          `Actual columns: [${this.columnNames.join(',')}]`);
691    }
692    return res;
693  }
694
695  // Moves the cursor next by one row and updates |isValid|.
696  // When this fails to move, two cases are possible:
697  // 1. We reached the end of the result set (this is the case if
698  //    QueryResult.isComplete() == true when this fails).
699  // 2. We reached the end of the current batch, but more rows might come later
700  //    (if QueryResult.isComplete() == false).
701  next() {
702    // At some point we might reach the end of the current batch, but the next
703    // batch might be available already. In this case we want next() to
704    // transparently move on to the next batch.
705    while (this.nextCellTypeOff + this.numColumns > this.cellTypesEnd) {
706      // If TraceProcessor is behaving well, we should never end up in a
707      // situation where we have leftover cells. TP is expected to serialize
708      // whole rows in each QueryResult batch and NOT truncate them midway.
709      // If this assert fires the TP RPC logic has a bug.
710      assertTrue(
711          this.nextCellTypeOff === this.cellTypesEnd ||
712          this.cellTypesEnd === -1);
713      if (!this.tryMoveToNextBatch()) {
714        this.isValid = false;
715        return;
716      }
717    }
718
719    const rowData = this.rowData;
720    const numColumns = this.numColumns;
721
722    // Read the current row.
723    for (let i = 0; i < numColumns; i++) {
724      const cellType = this.batchBytes[this.nextCellTypeOff++];
725      const colName = this.columnNames[i];
726      const expType = this.rowSpec[colName];
727
728      switch (cellType) {
729        case CellType.CELL_NULL:
730          rowData[colName] = null;
731          break;
732
733        case CellType.CELL_VARINT:
734          if (expType === NUM || expType === NUM_NULL) {
735            // This is very subtle. The return type of int64 can be either a
736            // number or a Long.js {high:number, low:number} if Long.js is
737            // installed. The default state seems different in node and browser.
738            // We force-disable Long.js support in the top of this source file.
739            const val = this.varIntReader.int64();
740            rowData[colName] = val as {} as number;
741          } else {
742            // LONG, LONG_NULL, or unspecified - return as bigint
743            const value =
744                decodeInt64Varint(this.batchBytes, this.varIntReader.pos);
745            rowData[colName] = value;
746            this.varIntReader.skip();  // Skips a varint
747          }
748          break;
749
750        case CellType.CELL_FLOAT64:
751          rowData[colName] = this.float64Cells[this.nextFloat64Cell++];
752          break;
753
754        case CellType.CELL_STRING:
755          rowData[colName] = this.stringCells[this.nextStringCell++];
756          break;
757
758        case CellType.CELL_BLOB:
759          const blob = this.blobCells[this.nextBlobCell++];
760          rowData[colName] = blob;
761          break;
762
763        default:
764          throw new Error(`Invalid cell type ${cellType}`);
765      }
766    }  // For (cells)
767    this.isValid = true;
768  }
769
770  private tryMoveToNextBatch(): boolean {
771    const nextBatchIdx = this.batchIdx + 1;
772    if (nextBatchIdx >= this.resultObj.batches.length) {
773      return false;
774    }
775
776    this.columnNames = this.resultObj.columnNames;
777    this.numColumns = this.columnNames.length;
778
779    this.batchIdx = nextBatchIdx;
780    const batch = assertExists(this.resultObj.batches[nextBatchIdx]);
781    this.batchBytes = batch.batchBytes;
782    this.nextCellTypeOff = batch.cellTypesOff;
783    this.cellTypesEnd = batch.cellTypesOff + batch.cellTypesLen;
784    this.float64Cells = batch.float64Cells;
785    this.blobCells = batch.blobCells;
786    this.stringCells = batch.stringCells;
787    this.varIntReader = protobuf.Reader.create(batch.batchBytes);
788    this.varIntReader.pos = batch.varintOff;
789    this.varIntReader.len = batch.varintOff + batch.varintLen;
790    this.nextFloat64Cell = 0;
791    this.nextStringCell = 0;
792    this.nextBlobCell = 0;
793
794    // Check that all the expected columns are present.
795    for (const expectedCol of Object.keys(this.rowSpec)) {
796      if (this.columnNames.indexOf(expectedCol) < 0) {
797        throw new Error(
798            `Column ${expectedCol} not found in the SQL result ` +
799            `set {${this.columnNames.join(' ')}}`);
800      }
801    }
802
803    // Check that the cells types are consistent.
804    const numColumns = this.numColumns;
805    if (batch.numCells === 0) {
806      // This can happen if the query result contains just an error. In this
807      // an empty batch with isLastBatch=true is appended as an EOF marker.
808      // In theory TraceProcessor could return an empty batch in the middle and
809      // that would be fine from a protocol viewpoint. In practice, no code path
810      // does that today so it doesn't make sense trying supporting it with a
811      // recursive call to tryMoveToNextBatch().
812      assertTrue(batch.isLastBatch);
813      return false;
814    }
815
816    assertTrue(numColumns > 0);
817    for (let i = this.nextCellTypeOff; i < this.cellTypesEnd; i++) {
818      const col = (i - this.nextCellTypeOff) % numColumns;
819      const colName = this.columnNames[col];
820      const actualType = this.batchBytes[i] as CellType;
821      const expType = this.rowSpec[colName];
822
823      // If undefined, the caller doesn't want to read this column at all, so
824      // it can be whatever.
825      if (expType === undefined) continue;
826
827      let err = '';
828      if (!isCompatible(actualType, expType)) {
829        if (actualType === CellType.CELL_NULL) {
830          err = 'SQL value is NULL but that was not expected' +
831              ` (expected type: ${columnTypeToString(expType)}). ` +
832              'Did you mean NUM_NULL, LONG_NULL, STR_NULL or BLOB_NULL?';
833        } else {
834          err = `Incompatible cell type. Expected: ${
835              columnTypeToString(
836                  expType)} actual: ${CELL_TYPE_NAMES[actualType]}`;
837        }
838      }
839      if (err.length > 0) {
840        throw new Error(
841            `Error @ row: ${Math.floor(i / numColumns)} col: '` +
842            `${colName}': ${err}`);
843      }
844    }
845    return true;
846  }
847}
848
// This is the object ultimately returned to the client when calling
// QueryResult.iter(...).
// The only reason why this is disjoint from RowIteratorImpl is to avoid
// naming collisions between the member variables required by RowIteratorImpl
// and the column names returned by the iterator.
class RowIteratorImplWithRowData implements RowIteratorBase {
  // All the iteration state is kept in this separate object.
  private _impl: RowIteratorImpl;

  // Forwarders to the corresponding methods of |_impl|.
  next: () => void;
  valid: () => boolean;
  get: (columnName: string) => ColumnType;

  constructor(querySpec: Row, res: QueryResultImpl) {
    // This very instance is handed to RowIteratorImpl as its Row object, after
    // being seeded with the entries of |querySpec|. The seeding must happen
    // before the impl is constructed, hence the statement order below.
    const rowView = this as {} as Row;
    Object.assign(rowView, querySpec);

    const impl = new RowIteratorImpl(querySpec, rowView, res);
    this._impl = impl;

    // Always dispatch on |impl|, regardless of how the caller invokes these.
    this.next = () => impl.next();
    this.valid = () => impl.valid();
    this.get = (columnName: string) => impl.get(columnName);
  }
}
870
871// This is a proxy object that wraps QueryResultImpl, adding await-ability.
872// This is so that:
873// 1. Clients that just want to await for the full result set can just call
874//    await engine.query('...') and will get a QueryResult that is guaranteed
875//    to be complete.
876// 2. Clients that know how to handle the streaming can use it straight away.
class WaitableQueryResultImpl implements QueryResult, WritableQueryResult,
                                         PromiseLike<QueryResult> {
  // The real result holder. Every QueryResult / WritableQueryResult call is
  // forwarded to it untouched.
  private readonly impl: QueryResultImpl;

  // Set by the first then() call; a second then() trips an assertion.
  private thenCalled = false;

  constructor(errorInfo: QueryErrorInfo) {
    this.impl = new QueryResultImpl(errorInfo);
  }

  // Shared by then() / catch() / finally(): the promise obtained from the
  // impl via ensureAllRowsPromise().
  private allRowsPromise() {
    return this.impl.ensureAllRowsPromise();
  }

  // ---- QueryResult implementation (pure pass-through to |impl|) ----

  iter<T extends Row>(spec: T) {
    return this.impl.iter(spec);
  }

  firstRow<T extends Row>(spec: T) {
    return this.impl.firstRow(spec);
  }

  waitAllRows() {
    return this.impl.waitAllRows();
  }

  waitMoreRows() {
    return this.impl.waitMoreRows();
  }

  isComplete() {
    return this.impl.isComplete();
  }

  numRows() {
    return this.impl.numRows();
  }

  columns() {
    return this.impl.columns();
  }

  error() {
    return this.impl.error();
  }

  statementCount() {
    return this.impl.statementCount();
  }

  statementWithOutputCount() {
    return this.impl.statementWithOutputCount();
  }

  // ---- WritableQueryResult implementation ----

  appendResultBatch(resBytes: Uint8Array) {
    return this.impl.appendResultBatch(resBytes);
  }

  // ---- PromiseLike<QueryResult> implementation ----

  // Chains the callbacks onto the all-rows promise. May be called at most
  // once per instance: the assertion fires on a second call.
  then(onfulfilled: any, onrejected: any): any {
    assertFalse(this.thenCalled);
    this.thenCalled = true;
    const allRows = this.allRowsPromise();
    return allRows.then(onfulfilled, onrejected);
  }

  catch(error: any): any {
    const allRows = this.allRowsPromise();
    return allRows.catch(error);
  }

  finally(callback: () => void): any {
    const allRows = this.allRowsPromise();
    return allRows.finally(callback);
  }

  // eslint and clang-format disagree on how to format get[foo](). Let
  // clang-format win:
  // eslint-disable-next-line keyword-spacing
  get[Symbol.toStringTag](): string {
    return 'Promise<WaitableQueryResult>';
  }
}
946
// Factory for the result object of a query: builds a WaitableQueryResultImpl
// for |errorInfo| and returns it typed as the intersection of QueryResult,
// Promise<QueryResult> and WritableQueryResult.
export function createQueryResult(errorInfo: QueryErrorInfo): QueryResult&
    Promise<QueryResult>&WritableQueryResult {
  const waitableResult = new WaitableQueryResultImpl(errorInfo);
  return waitableResult;
}
951