• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1// Copyright (C) 2021 The Android Open Source Project
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//      http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15// This file deals with deserialization and iteration of the proto-encoded
16// byte buffer that is returned by TraceProcessor when invoking the
17// TPM_QUERY_STREAMING method. The returned |query_result| buffer is optimized
// for being moved cheaply across workers and decoded on-the-fly as we step
19// through the iterator.
20// See comments around QueryResult in trace_processor.proto for more details.
21
22// The classes in this file are organized as follows:
23//
24// QueryResultImpl:
25// The object returned by the Engine.query(sql) method.
// This object is a holder of row data. Batches of rows get appended
// incrementally as they are received from the remote TraceProcessor instance.
28// QueryResultImpl also deals with asynchronicity of queries and allows callers
29// to obtain a promise that waits for more (or all) rows.
30// At any point in time the following objects hold a reference to QueryResult:
31// - The Engine: for appending row batches.
32// - UI code, typically controllers, who make queries.
33//
34// ResultBatch:
35// Hold the data, returned by the remote TraceProcessor instance, for a number
36// of rows (TP typically chunks the results in batches of 128KB).
37// A QueryResultImpl holds exclusively ResultBatches for a given query.
38// ResultBatch is not exposed externally, it's just an internal representation
39// that helps with proto decoding. ResultBatch is immutable after it gets
40// appended and decoded. The iteration state is held by the RowIteratorImpl.
41//
42// RowIteratorImpl:
43// Decouples the data owned by QueryResultImpl (and its ResultBatch(es)) from
44// the iteration state. The iterator effectively is the union of a ResultBatch
45// and the row number in it. Rows within the batch are decoded as the user calls
46// next(). When getting at the end of the batch, it takes care of switching to
47// the next batch (if any) within the QueryResultImpl.
48// This object is part of the API exposed to tracks / controllers.
49
50import * as protobuf from 'protobufjs/minimal';
51
52import {defer, Deferred} from '../base/deferred';
53import {assertExists, assertFalse, assertTrue} from '../base/logging';
54import {utf8Decode} from '../base/string_utils';
55
// Marker values used in the spec object passed to QueryResult.iter() and
// firstRow() to declare the expected type of each column, e.g.
// {name: STR, id: NUM, parentId: NUM_NULL}. The row iterator compares the spec
// entries against these constants when validating the cell types of a batch.
export const NUM = 0;  // Non-null numeric column.
export const STR = 'str';  // Non-null string column.
export const NUM_NULL: number|null = 1;  // Nullable numeric column.
export const STR_NULL: string|null = 'str_null';  // Nullable string column.

// The type of a single cell value as exposed to the readers of a result.
export type ColumnType = string|number|null;
62
// Info that could help debug a query error. For example the query
// in question, the stack where the query was issued, the active
// plugin etc. At the moment only the query is carried.
export interface QueryErrorInfo {
  // The query the error refers to. QueryError.toString() prefixes it to the
  // error message.
  query: string;
}
69
// The error used to reject the promise of a query that failed. On top of the
// plain message it remembers the query it refers to, and prepends it to the
// string representation of the error.
export class QueryError extends Error {
  readonly query: string;

  constructor(message: string, info: QueryErrorInfo) {
    super(message);
    const {query} = info;
    this.query = query;
  }

  // Returns "Query: <query>" followed, on a new line, by the standard Error
  // string representation.
  toString() {
    const header = `Query: ${this.query}`;
    const base = super.toString();
    return [header, base].join('\n');
  }
}
82
// One row extracted from an SQL result: a map from column name to cell value.
// The same shape is used for the spec objects passed to iter()/firstRow(),
// where each value is one of the NUM/STR/NUM_NULL/STR_NULL markers.
export interface Row {
  [key: string]: ColumnType;
}
87
// The methods that any iterator has to implement.
export interface RowIteratorBase {
  // Returns true if the iterator currently points to a row that can be read.
  valid(): boolean;

  // Moves the iterator to the next row, if any.
  next(): void;

  // Reflection support for cases where the column names are not known upfront
  // (e.g. the query result table for user-provided SQL queries).
  // It throws if the passed column name doesn't exist.
  // Example usage:
  // for (const it = queryResult.iter({}); it.valid(); it.next()) {
  //   for (const columnName of queryResult.columns()) {
  //     console.log(it.get(columnName));
  //   }
  // }
  get(columnName: string): ColumnType;
}
102
// A RowIterator is a type that has all the fields defined in the query spec
// plus the valid() and next() operators. This is to ultimately allow the
// clients to do:
// const result = await engine.query("select name, surname, id from people;");
// const iter = result.iter({name: STR, surname: STR, id: NUM});
// for (; iter.valid(); iter.next())
//  console.log(iter.name, iter.surname);
export type RowIterator<T extends Row> = RowIteratorBase&T;
111
// Maps one of the spec markers (NUM, NUM_NULL, STR, STR_NULL) to its symbolic
// name, for use in error messages. Any other value is rendered as
// INVALID(<value>).
function columnTypeToString(t: ColumnType): string {
  if (t === NUM) return 'NUM';
  if (t === NUM_NULL) return 'NUM_NULL';
  if (t === STR) return 'STR';
  if (t === STR_NULL) return 'STR_NULL';
  return `INVALID(${t})`;
}
126
// Disable Long.js support in protobuf. This seems to be enabled only in tests
// but not in production code. In any case, for now we want casting to number
// accepting the 2**53 limitation. This is consistent with passing
// --force-number in the protobuf.js codegen invocation in //ui/BUILD.gn .
// See also https://github.com/protobufjs/protobuf.js/issues/1253 .
(protobuf.util as {} as {Long: undefined}).Long = undefined;
// Ask protobufjs to reconfigure itself so that the Long override above is
// picked up by its readers.
protobuf.configure();
134
// This has to match CellType in trace_processor.proto.
enum CellType {
  CELL_NULL = 1,
  CELL_VARINT = 2,
  CELL_FLOAT64 = 3,
  CELL_STRING = 4,
  CELL_BLOB = 5,
}

// Human-readable names used in error messages, indexed by CellType value.
// Index 0 is a placeholder because the CellType values above start at 1.
const CELL_TYPE_NAMES =
    ['UNKNOWN', 'NULL', 'VARINT', 'FLOAT64', 'STRING', 'BLOB'];

// Protobuf wire type of length-delimited fields. It is compared against the
// low 3 bits of a field tag when parsing a batch.
const TAG_LEN_DELIM = 2;
148
// This is the interface exposed to readers (e.g. tracks). The underlying object
// (QueryResultImpl) owns the result data. This allows to obtain iterators on
// that. In future it will allow to wait for incremental updates (new rows being
// fetched) for streaming queries.
export interface QueryResult {
  // Obtains an iterator.
  // TODO(primiano): this should have an option to destruct data as we read. In
  // the case of a long query (e.g. `SELECT * FROM sched` in the query prompt)
  // we don't want to accumulate everything in memory. OTOH UI tracks want to
  // keep the data around so they can redraw them on each animation frame. For
  // now we keep everything in memory in the QueryResultImpl object.
  iter<T extends Row>(spec: T): RowIterator<T>;

  // Like iter() for queries that expect only one row. It embeds the valid()
  // check (i.e. throws if no rows are available) and returns directly the
  // first result.
  firstRow<T extends Row>(spec: T): T;

  // If != undefined the query errored out and error() contains the message.
  error(): string|undefined;

  // Returns the number of rows accumulated so far. Note that this number can
  // change over time as more batches are received. It becomes stable only
  // when isComplete() returns true or after waitAllRows() is resolved.
  numRows(): number;

  // If true all rows have been fetched. Calling iter() will iterate through the
  // last row. If false, iter() will return an iterator which might iterate
  // through some rows (or none) but will surely not reach the end.
  isComplete(): boolean;

  // Returns a promise that is resolved only when all rows (i.e. all batches)
  // have been fetched. The promise return value is always the object itself.
  waitAllRows(): Promise<QueryResult>;

  // Can return an empty array if called before the first batch is resolved.
  // This should be called only after having awaited for at least one batch.
  columns(): string[];

  // Returns the number of SQL statements in the query
  // (e.g. 2 for 'SELECT 1; SELECT 2;').
  statementCount(): number;

  // Returns the number of SQL statements that produced output rows. This
  // number is <= statementCount().
  statementWithOutputCount(): number;

  // TODO(primiano): next CLs will introduce a waitMoreRows() to allow tracks
  // to await until some more data (but not necessarily all) is available. For
  // now everything uses waitAllRows().
}
202
// Interface exposed to engine.ts to pump in the data as new row batches arrive.
export interface WritableQueryResult extends QueryResult {
  // |resBytes| is a proto-encoded trace_processor.QueryResult message.
  //  The overall flow looks as follows:
  // - The user calls engine.query('select ...') and gets a QueryResult back.
  // - The query call posts a message to the worker that runs the SQL engine (
  //   or sends a HTTP request in case of the RPC+HTTP interface).
  // - The returned QueryResult object is initially empty.
  // - Over time, the sql engine will postMessage() back results in batches.
  // - Each batch will end up calling this appendResultBatch() method.
  // - If there is any pending promise (e.g. the caller called
  //   queryResult.waitAllRows()), this call will awake them (if this is the
  //   last batch).
  appendResultBatch(resBytes: Uint8Array): void;
}
218
// The actual implementation, which bridges together the reader side and the
// writer side (the one exposed to the Engine). Both sides are the same object
// so that, when the engine pumps in new row batches, the promises that readers
// (e.g. track code) are waiting on can be settled.
class QueryResultImpl implements QueryResult, WritableQueryResult {
  // Column names, in the order they were received. Read directly by
  // RowIteratorImpl when it switches batch.
  columnNames: string[] = [];

  // TODO(primiano): for the moment new batches are appended but old batches
  // are never removed. This won't work with abnormally large result sets, as
  // it will stash all rows in memory. We could switch to a model where the
  // iterator is destructive and deletes batch objects once iterating past the
  // end of each batch. If we do that, then we need to assign monotonic IDs to
  // batches. Also if we do that, we should prevent creating more than one
  // iterator for a QueryResult.
  batches: ResultBatch[] = [];

  private _error?: string;
  private _numRows = 0;
  private _isComplete = false;
  private _errorInfo: QueryErrorInfo;
  private _statementCount = 0;
  private _statementWithOutputCount = 0;

  // The promise handed out by waitAllRows(). It must be settled only once the
  // last result batch has been retrieved.
  private allRowsPromise?: Deferred<QueryResult>;

  constructor(errorInfo: QueryErrorInfo) {
    this._errorInfo = errorInfo;
  }

  // --- QueryResult implementation.

  error(): string|undefined {
    return this._error;
  }

  numRows(): number {
    return this._numRows;
  }

  isComplete(): boolean {
    return this._isComplete;
  }

  columns(): string[] {
    return this.columnNames;
  }

  statementCount(): number {
    return this._statementCount;
  }

  statementWithOutputCount(): number {
    return this._statementWithOutputCount;
  }

  // Creates a new iterator, positioned on the first available row (if any).
  iter<T extends Row>(spec: T): RowIterator<T> {
    return new RowIteratorImplWithRowData(spec, this) as {} as RowIterator<T>;
  }

  // Like iter(), but asserts that a first row exists and returns it directly.
  firstRow<T extends Row>(spec: T): T {
    const it = this.iter(spec);
    assertTrue(it.valid());
    return it as T;
  }

  // Can be called only once. If all the rows have been received already, the
  // returned promise is settled straight away.
  waitAllRows(): Promise<QueryResult> {
    assertTrue(this.allRowsPromise === undefined);
    const deferred = defer<QueryResult>();
    this.allRowsPromise = deferred;
    if (this._isComplete) {
      this.resolveOrReject(deferred, this);
    }
    return deferred;
  }

  // --- WritableQueryResult implementation.

  // Called by the engine when a new QueryResult is available. Note that a
  // single Query() call can yield >1 QueryResult due to result batching
  // when a large amount of data is returned, e.g. when returning O(M) rows.
  // |resBytes| is a proto-encoded trace_processor.QueryResult message.
  // It is fine to retain the resBytes without slicing a copy, because
  // ProtoRingBuffer does the slice() for us (or passes through the buffer
  // coming from postMessage() (Wasm case) or fetch() (HTTP+RPC case)).
  appendResultBatch(resBytes: Uint8Array) {
    const reader = protobuf.Reader.create(resBytes);
    assertTrue(reader.pos === 0);
    const hadNoColumnsAtStart = this.columnNames.length === 0;
    const seenNames = new Set<string>();
    while (reader.pos < reader.len) {
      const tag = reader.uint32();
      const fieldId = tag >>> 3;
      switch (fieldId) {
        case 1: {  // column_names
          // Only the first batch should contain the column names. If this
          // fires something is going wrong in the handling of the batch
          // stream.
          assertTrue(hadNoColumnsAtStart);
          const baseName = reader.string();
          // In some rare cases two columns can have the same name
          // (b/194891824) e.g. `select 1 as x, 2 as x`. These queries don't
          // happen in the UI code, but they can happen when the user types a
          // query (e.g. with a join). The most practical thing we can do here
          // is renaming the columns with a suffix. Keeping the same name will
          // break when iterating, because column names become iterator object
          // keys.
          let uniqueName = baseName;
          let suffix = 1;
          while (seenNames.has(uniqueName)) {
            uniqueName = `${baseName}_${suffix}`;
            assertTrue(suffix < 100);  // Give up at some point.
            suffix++;
          }
          seenNames.add(uniqueName);
          this.columnNames.push(uniqueName);
          break;
        }

        case 2: {  // error
          // The query has errored only if the |error| field is non-empty.
          // In protos, we don't distinguish between non-present and empty.
          // Make sure we don't propagate ambiguous empty strings to JS.
          const msg = reader.string();
          this._error = (msg !== undefined && msg.length) ? msg : undefined;
          break;
        }

        case 3: {  // batch
          const byteLen = reader.uint32();
          const start = reader.pos;
          const stop = start + byteLen;
          const rawBatch = resBytes.subarray(start, stop);
          reader.pos = stop;

          // The ResultBatch ctor parses the CellsBatch submessage.
          const batch = new ResultBatch(rawBatch);
          this.batches.push(batch);
          this._isComplete = batch.isLastBatch;

          // In theory one could construct a valid proto serializing the column
          // names after the cell batches. In practice the QueryResultSerializer
          // doesn't do that so it's not worth complicating the code.
          const numColumns = this.columnNames.length;
          if (numColumns === 0) {
            // numColumns == 0 is plausible for queries like CREATE TABLE ... .
            assertTrue(batch.numCells === 0);
          } else {
            assertTrue(batch.numCells % numColumns === 0);
            this._numRows += batch.numCells / numColumns;
          }
          break;
        }

        case 4: {  // Stored as the statement count.
          this._statementCount = reader.uint32();
          break;
        }

        case 5: {  // Stored as the count of statements with output.
          this._statementWithOutputCount = reader.uint32();
          break;
        }

        default: {
          console.warn(`Unexpected QueryResult field ${tag >>> 3}`);
          reader.skipType(tag & 7);
          break;
        }
      }  // switch (fieldId)
    }    // while (pos < end)

    // Wake up whoever is waiting for the full result set, if we just got it.
    const pending = this.allRowsPromise;
    if (this._isComplete && pending !== undefined) {
      this.resolveOrReject(pending, this);
    }
  }

  // Like waitAllRows(), but can be called any number of times: the promise is
  // created on the first call and reused afterwards.
  ensureAllRowsPromise(): Promise<QueryResult> {
    if (this.allRowsPromise === undefined) {
      this.waitAllRows();  // Will populate |this.allRowsPromise|.
    }
    return assertExists(this.allRowsPromise);
  }

  // Rejects |promise| with a QueryError if the query errored out, otherwise
  // resolves it with |arg|.
  private resolveOrReject(promise: Deferred<QueryResult>, arg: QueryResult) {
    if (this._error !== undefined) {
      promise.reject(new QueryError(this._error, this._errorInfo));
      return;
    }
    promise.resolve(arg);
  }
}
392
// This class holds onto a received result batch (a Uint8Array) and does some
// partial parsing to tokenize the various cell groups. This parsing mainly
// consists of identifying and caching the offsets of each cell group and
// decoding the groups that are cheap to decode in bulk (floats, strings).
// This half parsing is done to keep the iterator's next() fast, without
// decoding everything into memory.
// This is an internal implementation detail and is not exposed outside. The
// RowIteratorImpl reads the members of this class to iterate through the cells
// of a batch and takes care of switching batches when needed.
// Note: at any point in time there can be more than one iterator referencing
// the same batch. The batch must be immutable.
class ResultBatch {
  // True if the batch carries the is_last_batch marker.
  readonly isLastBatch: boolean = false;
  // The raw CellsBatch proto. All the offsets below are relative to its start.
  readonly batchBytes: Uint8Array;
  // Offset and length (in bytes) of the packed array of cell types.
  readonly cellTypesOff: number = 0;
  readonly cellTypesLen: number = 0;
  // Offset and length (in bytes) of the packed buffer of varint cells.
  readonly varintOff: number = 0;
  readonly varintLen: number = 0;
  readonly float64Cells = new Float64Array();
  readonly blobCells: Uint8Array[] = [];
  readonly stringCells: string[] = [];

  // batchBytes is a trace_processor.QueryResult.CellsBatch proto.
  constructor(batchBytes: Uint8Array) {
    this.batchBytes = batchBytes;
    const reader = protobuf.Reader.create(batchBytes);
    assertTrue(reader.pos === 0);
    const end = reader.len;

    // Here we deconstruct the proto by hand. The CellsBatch is carefully
    // designed to allow a very fast parsing from the TS side. We pack all cells
    // of the same types together, so we can do only one call (per batch) to
    // TextDecoder.decode(), we can overlay a memory-aligned typedarray for
    // float values and can quickly tell and type-check the cell types.
    // One row = N cells (we know the number upfront from the outer message).
    // Each batch contains always an integer multiple of N cells (i.e. rows are
    // never fragmented across different batches).
    while (reader.pos < end) {
      const tag = reader.uint32();
      switch (tag >>> 3) {
        case 1: {  // cell types, a packed array with one CellType per cell.
          assertTrue((tag & 7) === TAG_LEN_DELIM);  // Must be packed varint.
          this.cellTypesLen = reader.uint32();
          this.cellTypesOff = reader.pos;
          reader.pos += this.cellTypesLen;
          break;
        }

        case 2: {  // varint_cells, a packed varint buffer.
          assertTrue((tag & 7) === TAG_LEN_DELIM);  // Must be packed varint.
          const packLen = reader.uint32();
          this.varintOff = reader.pos;
          this.varintLen = packLen;
          assertTrue(reader.buf === batchBytes);
          // |varintOff| comes from reader.pos and hence is relative to the
          // start of |batchBytes|. The upper bound is therefore the length of
          // the batch itself, not the end of the view within its ArrayBuffer
          // (byteOffset + byteLength), which would make the check too lenient
          // whenever byteOffset > 0.
          assertTrue(this.varintOff + this.varintLen <= end);
          reader.pos += packLen;
          break;
        }

        case 3: {  // float64_cells, a 64-bit aligned packed fixed64 buffer.
          assertTrue((tag & 7) === TAG_LEN_DELIM);  // Must be packed fixed64.
          const f64Len = reader.uint32();
          assertTrue(f64Len % 8 === 0);
          // Float64Array's constructor is evil: the offset is in bytes but the
          // length is in 8-byte words. The offset is relative to the
          // ArrayBuffer, hence the byteOffset of the view must be added.
          const f64Words = f64Len / 8;
          const f64Off = batchBytes.byteOffset + reader.pos;
          if (f64Off % 8 === 0) {
            this.float64Cells =
                new Float64Array(batchBytes.buffer, f64Off, f64Words);
          } else {
            // When using the production code in trace_processor's rpc.cc, the
            // float64 should be 8-bytes aligned. The slow-path case is only for
            // tests.
            const slice = batchBytes.buffer.slice(f64Off, f64Off + f64Len);
            this.float64Cells = new Float64Array(slice);
          }
          reader.pos += f64Len;
          break;
        }

        case 4: {  // blob_cells: one entry per blob.
          assertTrue((tag & 7) === TAG_LEN_DELIM);
          // The Uint8Array constructor creates a copy of the bytes. Fine here
          // as blobs are rare and not a fastpath.
          this.blobCells.push(new Uint8Array(reader.bytes()));
          break;
        }

        case 5: {  // string_cells: all the string cells concatenated with \0s.
          assertTrue((tag & 7) === TAG_LEN_DELIM);
          const strLen = reader.uint32();
          assertTrue(reader.pos + strLen <= end);
          const subArr = batchBytes.subarray(reader.pos, reader.pos + strLen);
          assertTrue(subArr.length === strLen);
          // The reason why we do this split rather than creating one string
          // per entry is that utf8 decoding has some non-negligible cost. See
          // go/postmessage-benchmark .
          this.stringCells = utf8Decode(subArr).split('\0');
          reader.pos += strLen;
          break;
        }

        case 6: {  // is_last_batch (boolean).
          this.isLastBatch = !!reader.bool();
          break;
        }

        case 7: {  // padding for realignment, skip silently.
          reader.skipType(tag & 7);
          break;
        }

        default: {
          console.warn(`Unexpected QueryResult.CellsBatch field ${tag >>> 3}`);
          reader.skipType(tag & 7);
          break;
        }
      }  // switch(tag)
    }    // while (pos < end)
  }

  // The number of cells in the batch. Equal to the byte length of the cell
  // types array, which holds one entry per cell.
  get numCells() {
    return this.cellTypesLen;
  }
}
513
// Holds the iteration state over the batches of a QueryResultImpl and decodes
// one row at a time into |rowData|. Each time it moves onto a new batch it
// also validates the cell types of that batch against the spec.
class RowIteratorImpl implements RowIteratorBase {
  // The spec passed to the iter call containing the expected types, e.g.:
  // {'colA': NUM, 'colB': NUM_NULL, 'colC': STR}.
  // This doesn't ever change.
  readonly rowSpec: Row;

  // The object that holds the current row. This points to the parent
  // RowIteratorImplWithRowData instance that created this class.
  rowData: Row;

  // The QueryResult object we are reading data from. The engine will pump
  // batches over time into this object.
  private resultObj: QueryResultImpl;

  // All the member variables in the group below point to the identically-named
  // members in result.batches[batchIdx]. This is to avoid indirection layers in
  // the next() hotpath, so we can do this.float64Cells vs
  // this.resultObj.batches[this.batchIdx].float64Cells.
  // These are re-set every time tryMoveToNextBatch() is called (and succeeds).
  private batchIdx = -1;  // The batch index within |result.batches[]|.
  private batchBytes = new Uint8Array();
  private columnNames: string[] = [];
  private numColumns = 0;
  private cellTypesEnd = -1;  // -1 so the 1st next() hits tryMoveToNextBatch().
  private float64Cells = new Float64Array();
  private varIntReader = protobuf.Reader.create(this.batchBytes);
  private blobCells: Uint8Array[] = [];
  private stringCells: string[] = [];

  // These members instead are incremented as we read cells from next(). They
  // are the mutable state of the iterator.
  private nextCellTypeOff = 0;
  private nextFloat64Cell = 0;
  private nextStringCell = 0;
  private nextBlobCell = 0;
  private isValid = false;

  // |querySpec| holds the expected type of each column, |rowData| is the
  // object that receives the values of the current row and |res| is the result
  // to iterate on. The iterator is moved onto the first row straight away.
  constructor(querySpec: Row, rowData: Row, res: QueryResultImpl) {
    // The spec entries are deliberately NOT copied onto |this|. Column names
    // are arbitrary strings and can clash with the members of this class (e.g.
    // a column called 'isValid' or 'cellTypesEnd'), corrupting the iteration
    // state. The row values live only in |rowData|.
    this.rowData = rowData;
    this.rowSpec = {...querySpec};  // ... -> Copy all the key/value pairs.
    this.resultObj = res;
    this.next();
  }

  valid(): boolean {
    return this.isValid;
  }

  // Returns the value of |columnName| in the current row. Throws if |rowData|
  // has no such key.
  get(columnName: string): ColumnType {
    const res = this.rowData[columnName];
    if (res === undefined) {
      throw new Error(
          `Column '${columnName}' doesn't exist. ` +
          `Actual columns: [${this.columnNames.join(',')}]`);
    }
    return res;
  }

  // Moves the cursor next by one row and updates |isValid|.
  // When this fails to move, two cases are possible:
  // 1. We reached the end of the result set (this is the case if
  //    QueryResult.isComplete() == true when this fails).
  // 2. We reached the end of the current batch, but more rows might come later
  //    (if QueryResult.isComplete() == false).
  next() {
    // At some point we might reach the end of the current batch, but the next
    // batch might be available already. In this case we want next() to
    // transparently move on to the next batch.
    while (this.nextCellTypeOff + this.numColumns > this.cellTypesEnd) {
      // If TraceProcessor is behaving well, we should never end up in a
      // situation where we have leftover cells. TP is expected to serialize
      // whole rows in each QueryResult batch and NOT truncate them midway.
      // If this assert fires the TP RPC logic has a bug.
      assertTrue(
          this.nextCellTypeOff === this.cellTypesEnd ||
          this.cellTypesEnd === -1);
      if (!this.tryMoveToNextBatch()) {
        this.isValid = false;
        return;
      }
    }

    const rowData = this.rowData;
    const numColumns = this.numColumns;

    // Read the current row.
    for (let i = 0; i < numColumns; i++) {
      const cellType = this.batchBytes[this.nextCellTypeOff++];
      const colName = this.columnNames[i];

      switch (cellType) {
        case CellType.CELL_NULL:
          rowData[colName] = null;
          break;

        case CellType.CELL_VARINT: {
          const val = this.varIntReader.int64();
          // This is very subtle. The return type of int64 can be either a
          // number or a Long.js {high:number, low:number} if Long.js support is
          // enabled. The default state seems different in node and browser.
          // We force-disable Long.js support in the top of this source file.
          rowData[colName] = val as {} as number;
          break;
        }

        case CellType.CELL_FLOAT64:
          rowData[colName] = this.float64Cells[this.nextFloat64Cell++];
          break;

        case CellType.CELL_STRING:
          rowData[colName] = this.stringCells[this.nextStringCell++];
          break;

        case CellType.CELL_BLOB: {
          const blob = this.blobCells[this.nextBlobCell++];
          throw new Error(`TODO implement BLOB support (${blob})`);
        }

        default:
          throw new Error(`Invalid cell type ${cellType}`);
      }
    }  // For (cells)
    this.isValid = true;
  }

  // Points the iterator at the beginning of the next batch and type-checks all
  // the cells of that batch against |rowSpec|. Returns false if there is no
  // further batch, or if the next batch is the empty EOF marker. Throws if a
  // column of the spec is missing or if a cell has an unexpected type.
  private tryMoveToNextBatch(): boolean {
    const nextBatchIdx = this.batchIdx + 1;
    if (nextBatchIdx >= this.resultObj.batches.length) {
      return false;
    }

    this.columnNames = this.resultObj.columnNames;
    this.numColumns = this.columnNames.length;

    this.batchIdx = nextBatchIdx;
    const batch = assertExists(this.resultObj.batches[nextBatchIdx]);
    this.batchBytes = batch.batchBytes;
    this.nextCellTypeOff = batch.cellTypesOff;
    this.cellTypesEnd = batch.cellTypesOff + batch.cellTypesLen;
    this.float64Cells = batch.float64Cells;
    this.blobCells = batch.blobCells;
    this.stringCells = batch.stringCells;
    this.varIntReader = protobuf.Reader.create(batch.batchBytes);
    this.varIntReader.pos = batch.varintOff;
    this.varIntReader.len = batch.varintOff + batch.varintLen;
    this.nextFloat64Cell = 0;
    this.nextStringCell = 0;
    this.nextBlobCell = 0;

    // Check that all the expected columns are present.
    for (const expectedCol of Object.keys(this.rowSpec)) {
      if (this.columnNames.indexOf(expectedCol) < 0) {
        throw new Error(
            `Column ${expectedCol} not found in the SQL result ` +
            `set {${this.columnNames.join(' ')}}`);
      }
    }

    // Check that the cells types are consistent.
    const numColumns = this.numColumns;
    if (batch.numCells === 0) {
      // This can happen if the query result contains just an error. In this
      // case an empty batch with isLastBatch=true is appended as an EOF marker.
      // In theory TraceProcessor could return an empty batch in the middle and
      // that would be fine from a protocol viewpoint. In practice, no code path
      // does that today so it doesn't make sense trying supporting it with a
      // recursive call to tryMoveToNextBatch().
      assertTrue(batch.isLastBatch);
      return false;
    }

    assertTrue(numColumns > 0);
    // |i| is a byte offset within |batchBytes|. The index of the cell within
    // the batch is obtained by subtracting the offset of the first cell type.
    const firstCellOff = this.nextCellTypeOff;
    for (let i = firstCellOff; i < this.cellTypesEnd; i++) {
      const cellIdx = i - firstCellOff;
      const col = cellIdx % numColumns;
      const colName = this.columnNames[col];
      const actualType = this.batchBytes[i] as CellType;
      const expType = this.rowSpec[colName];

      // If undefined, the caller doesn't want to read this column at all, so
      // it can be whatever.
      if (expType === undefined) continue;

      let err = '';
      if (actualType === CellType.CELL_NULL &&
          (expType !== STR_NULL && expType !== NUM_NULL)) {
        err = 'SQL value is NULL but that was not expected' +
            ` (expected type: ${columnTypeToString(expType)}). ` +
            'Did you intend to use NUM_NULL or STR_NULL?';
      } else if (
          ((actualType === CellType.CELL_VARINT ||
            actualType === CellType.CELL_FLOAT64) &&
           (expType !== NUM && expType !== NUM_NULL)) ||
          ((actualType === CellType.CELL_STRING) &&
           (expType !== STR && expType !== STR_NULL))) {
        err = `Incompatible cell type. Expected: ${
            columnTypeToString(
                expType)} actual: ${CELL_TYPE_NAMES[actualType]}`;
      }
      if (err.length > 0) {
        // The reported row is relative to the beginning of this batch.
        throw new Error(
            `Error @ row: ${Math.floor(cellIdx / numColumns)} col: '` +
            `${colName}': ${err}`);
      }
    }
    return true;
  }
}
723
// This is the object ultimately returned to the client when calling
// QueryResult.iter(...). The column values of the current row are stored as
// properties of this very object, next to the three iterator methods.
// The only reason why this is disjoint from RowIteratorImpl is to avoid
// naming collisions between the member variables required by RowIteratorImpl
// and the column names returned by the iterator.
class RowIteratorImplWithRowData implements RowIteratorBase {
  private _impl: RowIteratorImpl;

  next: () => void;
  valid: () => boolean;
  get: (columnName: string) => ColumnType;

  constructor(querySpec: Row, res: QueryResultImpl) {
    // Seed the column properties from the spec, then let the impl overwrite
    // them with the actual values as it decodes rows into this object.
    const self = this as {} as Row;
    Object.assign(self, querySpec);

    const impl = new RowIteratorImpl(querySpec, self, res);
    this._impl = impl;

    // Forward the iterator methods to the impl.
    this.next = impl.next.bind(impl);
    this.valid = impl.valid.bind(impl);
    this.get = impl.get.bind(impl);
  }
}
745
// This is a proxy object that wraps QueryResultImpl, adding await-ability.
// This is so that:
// 1. Clients that just want to await for the full result set can just call
//    await engine.query('...') and will get a QueryResult that is guaranteed
//    to be complete.
// 2. Clients that know how to handle the streaming can use it straight away.
class WaitableQueryResultImpl implements QueryResult, WritableQueryResult,
                                         PromiseLike<QueryResult> {
  private impl: QueryResultImpl;
  private thenCalled = false;

  constructor(errorInfo: QueryErrorInfo) {
    this.impl = new QueryResultImpl(errorInfo);
  }

  // QueryResult implementation. Proxies all calls to the impl object.

  iter<T extends Row>(spec: T): RowIterator<T> {
    return this.impl.iter(spec);
  }

  firstRow<T extends Row>(spec: T): T {
    return this.impl.firstRow(spec);
  }

  waitAllRows(): Promise<QueryResult> {
    return this.impl.waitAllRows();
  }

  isComplete(): boolean {
    return this.impl.isComplete();
  }

  numRows(): number {
    return this.impl.numRows();
  }

  columns(): string[] {
    return this.impl.columns();
  }

  error(): string|undefined {
    return this.impl.error();
  }

  statementCount(): number {
    return this.impl.statementCount();
  }

  statementWithOutputCount(): number {
    return this.impl.statementWithOutputCount();
  }

  // WritableQueryResult implementation.

  appendResultBatch(resBytes: Uint8Array): void {
    return this.impl.appendResultBatch(resBytes);
  }

  // PromiseLike<QueryResult> implementation. All the methods below chain onto
  // the promise that is settled once all the rows have been received.

  // Asserts that it is called at most once per object.
  // tslint:disable-next-line no-any
  then(onfulfilled: any, onrejected: any): any {
    assertFalse(this.thenCalled);
    this.thenCalled = true;
    const allRows = this.impl.ensureAllRowsPromise();
    return allRows.then(onfulfilled, onrejected);
  }

  // tslint:disable-next-line no-any
  catch(error: any): any {
    const allRows = this.impl.ensureAllRowsPromise();
    return allRows.catch(error);
  }

  // tslint:disable-next-line no-any
  finally(callback: () => void): any {
    const allRows = this.impl.ensureAllRowsPromise();
    return allRows.finally(callback);
  }

  get [Symbol.toStringTag](): string {
    return 'Promise<WaitableQueryResult>';
  }
}
818
// Creates an empty, awaitable query result. |errorInfo| is attached to the
// QueryError used to reject the result if the query fails.
export function createQueryResult(errorInfo: QueryErrorInfo): QueryResult&
    Promise<QueryResult>&WritableQueryResult {
  const result = new WaitableQueryResultImpl(errorInfo);
  return result;
}
823