// Copyright (C) 2021 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// This file deals with deserialization and iteration of the proto-encoded
// byte buffer that is returned by TraceProcessor when invoking the
// TPM_QUERY_STREAMING method. The returned |query_result| buffer is optimized
// for being moved cheaply across workers and decoded on-the-fly as we step
// through the iterator.
// See comments around QueryResult in trace_processor.proto for more details.

// The classes in this file are organized as follows:
//
// QueryResultImpl:
// The object returned by the Engine.query(sql) method.
// This object is a holder of row data. Batches of rows get appended
// incrementally as they are received from the remote TraceProcessor instance.
// QueryResultImpl also deals with asynchronicity of queries and allows callers
// to obtain a promise that waits for more (or all) rows.
// At any point in time the following objects hold a reference to QueryResult:
// - The Engine: for appending row batches.
// - UI code, typically controllers, who make queries.
//
// ResultBatch:
// Holds the data, returned by the remote TraceProcessor instance, for a number
// of rows (TP typically chunks the results in batches of 128KB).
// A QueryResultImpl holds exclusively ResultBatches for a given query.
// ResultBatch is not exposed externally, it's just an internal representation
// that helps with proto decoding. ResultBatch is immutable after it gets
// appended and decoded. The iteration state is held by the RowIteratorImpl.
//
// RowIteratorImpl:
// Decouples the data owned by QueryResultImpl (and its ResultBatch(es)) from
// the iteration state. The iterator effectively is the union of a ResultBatch
// and the row number in it. Rows within the batch are decoded as the user calls
// next(). When getting at the end of the batch, it takes care of switching to
// the next batch (if any) within the QueryResultImpl.
// This object is part of the API exposed to tracks / controllers.

import * as protobuf from 'protobufjs/minimal';

import {defer, Deferred} from '../base/deferred';
import {assertExists, assertFalse, assertTrue} from '../base/logging';
import {utf8Decode} from '../base/string_utils';

// Type markers used as values of the spec object passed to iter()/firstRow().
// The *_NULL variants mark columns for which a SQL NULL is acceptable.
export const NUM = 0;
export const STR = 'str';
export const NUM_NULL: number|null = 1;
export const STR_NULL: string|null = 'str_null';

// The value of a single cell, as exposed to the readers.
export type ColumnType = string|number|null;

// Context attached to a query failure to make it easier to debug: e.g. the
// query text, the stack where the query was issued, the active plugin etc.
export interface QueryErrorInfo {
  query: string;
}

// The error used to reject the result promise when a query fails. On top of
// the message it remembers the SQL text that caused the failure.
export class QueryError extends Error {
  readonly query: string;

  constructor(message: string, info: QueryErrorInfo) {
    super(message);
    const {query} = info;
    this.query = query;
  }

  // Prefixes the standard Error rendering with the offending query.
  toString() {
    const base = super.toString();
    return `Query: ${this.query}\n${base}`;
  }
}

// One row extracted from an SQL result, keyed by column name.
export interface Row {
  [key: string]: ColumnType;
}

// The cursor operations that every row iterator implements.
export interface RowIteratorBase {
  valid(): boolean;
  next(): void;

  // Reflection support for cases where the column names are not known upfront
  // (e.g. the query result table for user-provided SQL queries).
  // It throws if the passed column name doesn't exist.
  // Example usage:
  //   for (const it = queryResult.iter({}); it.valid(); it.next()) {
  //     for (const columnName of queryResult.columns()) {
  //       console.log(it.get(columnName));
  //     }
  //   }
  get(columnName: string): ColumnType;
}

// A RowIterator is the cursor (valid() / next() / get()) merged with one
// property per column declared in the query spec, so that clients can write:
//   const result = await engine.query("select name, surname, id from people;");
//   const iter = result.iter({name: STR, surname: STR, id: NUM});
//   for (; iter.valid(); iter.next())
//     console.log(iter.name, iter.surname);
export type RowIterator<T extends Row> = RowIteratorBase&T;

// Maps a spec type marker to its symbolic name, for error messages. Anything
// that is not one of the four known markers is rendered as INVALID(value).
function columnTypeToString(t: ColumnType): string {
  if (t === NUM) return 'NUM';
  if (t === NUM_NULL) return 'NUM_NULL';
  if (t === STR) return 'STR';
  if (t === STR_NULL) return 'STR_NULL';
  return `INVALID(${t})`;
}

// Turn off Long.js support in protobuf: we want 64-bit integers to be decoded
// as plain JS numbers, accepting the 2**53 precision limit. Long.js seems to
// be enabled only in tests and not in production code; disabling it here makes
// the two behave the same. This is consistent with passing --force-number in
// the protobuf.js codegen invocation in //ui/BUILD.gn .
// See also https://github.com/protobufjs/protobuf.js/issues/1253 .
(protobuf.util as {} as {Long: undefined}).Long = undefined;
protobuf.configure();

// This has to match CellType in trace_processor.proto.
enum CellType {
  CELL_NULL = 1,
  CELL_VARINT = 2,
  CELL_FLOAT64 = 3,
  CELL_STRING = 4,
  CELL_BLOB = 5,
}

// Human readable names for error messages, indexed by CellType value.
const CELL_TYPE_NAMES = [
  'UNKNOWN',
  'NULL',
  'VARINT',
  'FLOAT64',
  'STRING',
  'BLOB',
];

// Protobuf wire type of length-delimited fields (strings, bytes, packed
// repeated fields).
const TAG_LEN_DELIM = 2;

// This is the interface exposed to readers (e.g. tracks).
// The underlying object (QueryResultImpl) owns the result data. This interface
// allows readers to obtain iterators on that data. In future it will allow to
// wait for incremental updates (new rows being fetched) for streaming queries.
export interface QueryResult {
  // Obtains an iterator. |spec| lists the columns the caller wants to read,
  // each mapped to its expected type marker (NUM, STR, NUM_NULL, STR_NULL).
  // TODO(primiano): this should have an option to destruct data as we read. In
  // the case of a long query (e.g. `SELECT * FROM sched` in the query prompt)
  // we don't want to accumulate everything in memory. OTOH UI tracks want to
  // keep the data around so they can redraw them on each animation frame. For
  // now we keep everything in memory in the QueryResultImpl object.
  iter<T extends Row>(spec: T): RowIterator<T>;

  // Like iter() for queries that expect only one row. It embeds the valid()
  // check (i.e. it asserts that at least one row is available) and returns
  // directly the first result.
  firstRow<T extends Row>(spec: T): T;

  // If != undefined the query errored out and error() contains the message.
  error(): string|undefined;

  // Returns the number of rows accumulated so far. Note that this number can
  // change over time as more batches are received. It becomes stable only
  // when isComplete() returns true or after waitAllRows() is resolved.
  numRows(): number;

  // If true all rows have been fetched. Calling iter() will iterate through the
  // last row. If false, iter() will return an iterator which might iterate
  // through some rows (or none) but will surely not reach the end.
  isComplete(): boolean;

  // Returns a promise that is resolved only when all rows (i.e. all batches)
  // have been fetched. The promise return value is always the object itself.
  waitAllRows(): Promise<QueryResult>;

  // Can return an empty array if called before the first batch is resolved.
  // This should be called only after having awaited for at least one batch.
  columns(): string[];

  // Returns the number of SQL statements in the query
  // (e.g. 2 for 'SELECT 1; SELECT 2;').
  statementCount(): number;

  // Returns the number of SQL statements that produced output rows. This
  // number is <= statementCount().
  statementWithOutputCount(): number;

  // TODO(primiano): next CLs will introduce a waitMoreRows() to allow tracks
  // to await until some more data (but not necessarily all) is available. For
  // now everything uses waitAllRows().
}

// Interface exposed to engine.ts to pump in the data as new row batches arrive.
export interface WritableQueryResult extends QueryResult {
  // |resBytes| is a proto-encoded trace_processor.QueryResult message.
  // The overall flow looks as follows:
  // - The user calls engine.query('select ...') and gets a QueryResult back.
  // - The query call posts a message to the worker that runs the SQL engine
  //   (or sends an HTTP request in case of the RPC+HTTP interface).
  // - The returned QueryResult object is initially empty.
  // - Over time, the sql engine will postMessage() back results in batches.
  // - Each batch will end up calling this appendResultBatch() method.
  // - If there is any pending promise (e.g. the caller called
  //   queryResult.waitAllRows()), this call will awake them (if this is the
  //   last batch).
  appendResultBatch(resBytes: Uint8Array): void;
}

// The actual implementation, which bridges together the reader side and the
// writer side (the one exposed to the Engine). This is the same object so that
// when the engine pumps new row batches we can resolve pending promises that
// readers (e.g. track code) are waiting for.
class QueryResultImpl implements QueryResult, WritableQueryResult {
  // Column names, as received in the first batch (de-duplicated if needed).
  columnNames: string[] = [];

  // TODO(primiano): for the moment new batches are appended but old batches
  // are never removed. This won't work with abnormally large result sets, as
  // it will stash all rows in memory. We could switch to a model where the
  // iterator is destructive and deletes batch objects once iterating past the
  // end of each batch. If we do that, then we need to assign monotonic IDs to
  // batches. Also if we do that, we should prevent creating more than one
  // iterator for a QueryResult.
  batches: ResultBatch[] = [];

  private _error?: string;
  private _numRows = 0;
  private _isComplete = false;
  private _errorInfo: QueryErrorInfo;
  private _statementCount = 0;
  private _statementWithOutputCount = 0;

  // The promise handed out by waitAllRows(). It is settled only once the last
  // result batch has been received.
  private allRowsPromise?: Deferred<QueryResult>;

  constructor(errorInfo: QueryErrorInfo) {
    this._errorInfo = errorInfo;
  }

  // --- QueryResult implementation.

  isComplete(): boolean {
    return this._isComplete;
  }

  numRows(): number {
    return this._numRows;
  }

  error(): string|undefined {
    return this._error;
  }

  columns(): string[] {
    return this.columnNames;
  }

  statementCount(): number {
    return this._statementCount;
  }

  statementWithOutputCount(): number {
    return this._statementWithOutputCount;
  }

  // Creates a new cursor positioned on the first available row (if any).
  iter<T extends Row>(spec: T): RowIterator<T> {
    const rowIterator = new RowIteratorImplWithRowData(spec, this);
    return rowIterator as {} as RowIterator<T>;
  }

  // Same as iter(), but asserts that the cursor landed on a row and exposes
  // it directly as the row object.
  firstRow<T extends Row>(spec: T): T {
    const rowIterator = new RowIteratorImplWithRowData(spec, this);
    assertTrue(rowIterator.valid());
    return rowIterator as {} as RowIterator<T>as T;
  }

  // Can be called only once: a second call trips the assertion below.
  waitAllRows(): Promise<QueryResult> {
    assertTrue(this.allRowsPromise === undefined);
    const deferred = defer<QueryResult>();
    this.allRowsPromise = deferred;
    if (this._isComplete) {
      // All the batches are here already, settle straight away.
      this.resolveOrReject(deferred, this);
    }
    return deferred;
  }

  // --- WritableQueryResult implementation.

  // Called by the engine when a new QueryResult message is available. A single
  // query can yield more than one message because large result sets (e.g.
  // O(M) rows) are split in batches.
  // |resBytes| is a proto-encoded trace_processor.QueryResult message.
  // It is fine to retain |resBytes| without slicing a copy, because
  // ProtoRingBuffer does the slice() for us (or passes through the buffer
  // coming from postMessage() (Wasm case) or fetch() (HTTP+RPC case)).
  appendResultBatch(resBytes: Uint8Array) {
    const reader = protobuf.Reader.create(resBytes);
    assertTrue(reader.pos === 0);
    const noColumnsBeforeThisBatch = this.columnNames.length === 0;
    const namesSeenInThisBatch = new Set<string>();

    while (reader.pos < reader.len) {
      const tag = reader.uint32();
      const fieldId = tag >>> 3;
      switch (fieldId) {
        case 1: {  // column_names
          // Only the first batch should contain the column names. If this
          // fires something is going wrong in the handling of the batch
          // stream.
          assertTrue(noColumnsBeforeThisBatch);
          const baseName = reader.string();
          // In some rare cases two columns can have the same name
          // (b/194891824), e.g. `select 1 as x, 2 as x`. These queries don't
          // happen in the UI code, but they can happen when the user types a
          // query (e.g. with a join). Column names become keys of the
          // iterator object, so duplicates must be told apart: we do so by
          // appending a numeric suffix.
          let uniqueName = baseName;
          let suffix = 1;
          while (namesSeenInThisBatch.has(uniqueName)) {
            uniqueName = `${baseName}_${suffix}`;
            assertTrue(suffix < 100);  // Give up at some point.
            suffix++;
          }
          namesSeenInThisBatch.add(uniqueName);
          this.columnNames.push(uniqueName);
          break;
        }

        case 2: {  // error
          // The query has errored only if the |error| field is non-empty.
          // In protos, we don't distinguish between non-present and empty.
          // Make sure we don't propagate ambiguous empty strings to JS.
          const message = reader.string();
          const hasError = message !== undefined && message.length > 0;
          this._error = hasError ? message : undefined;
          break;
        }

        case 3: {  // batch
          const batchLen = reader.uint32();
          const batchStart = reader.pos;
          const batchEnd = batchStart + batchLen;
          reader.pos = batchEnd;

          // The ResultBatch ctor parses the CellsBatch submessage.
          const batch = new ResultBatch(resBytes.subarray(batchStart, batchEnd));
          this.batches.push(batch);
          this._isComplete = batch.isLastBatch;

          // In theory one could construct a valid proto serializing the column
          // names after the cell batches. In practice the
          // QueryResultSerializer doesn't do that so it's not worth
          // complicating the code.
          const numColumns = this.columnNames.length;
          if (numColumns === 0) {
            // No columns is plausible for queries like CREATE TABLE ... .
            assertTrue(batch.numCells === 0);
          } else {
            assertTrue(batch.numCells % numColumns === 0);
            this._numRows += batch.numCells / numColumns;
          }
          break;
        }

        case 4:
          this._statementCount = reader.uint32();
          break;

        case 5:
          this._statementWithOutputCount = reader.uint32();
          break;

        default:
          console.warn(`Unexpected QueryResult field ${fieldId}`);
          reader.skipType(tag & 7);
          break;
      }  // switch (fieldId)
    }  // while (pos < len)

    // Wake up whoever is waiting on the full result set.
    if (this._isComplete && this.allRowsPromise !== undefined) {
      this.resolveOrReject(this.allRowsPromise, this);
    }
  }

  // Returns the all-rows promise, creating it on first use. Unlike
  // waitAllRows() this can be called any number of times.
  ensureAllRowsPromise(): Promise<QueryResult> {
    if (this.allRowsPromise === undefined) {
      this.waitAllRows();  // Will populate |this.allRowsPromise|.
    }
    return assertExists(this.allRowsPromise);
  }

  // Settles |promise|: rejects with a QueryError if the query reported an
  // error, resolves with |arg| otherwise.
  private resolveOrReject(promise: Deferred<QueryResult>, arg: QueryResult) {
    if (this._error !== undefined) {
      promise.reject(new QueryError(this._error, this._errorInfo));
      return;
    }
    promise.resolve(arg);
  }
}

// This class holds onto a received result batch (a Uint8Array) and does some
// partial parsing to tokenize the various cell groups. This parsing mainly
// consists of identifying and caching the offsets of each cell group and
// initializing the varint decoders. This half parsing is done to keep the
// iterator's next() fast, without decoding everything into memory.
// This is an internal implementation detail and is not exposed outside. The
// RowIteratorImpl uses this class to iterate through batches (this class takes
// care of iterating within a batch, RowIteratorImpl takes care of switching
// batches when needed).
// Note: at any point in time there can be more than one iterator referencing
// the same batch. The batch must be immutable.
class ResultBatch {
  readonly isLastBatch: boolean = false;
  readonly batchBytes: Uint8Array;
  // Offsets below are relative to the start of |batchBytes|.
  readonly cellTypesOff: number = 0;
  readonly cellTypesLen: number = 0;
  readonly varintOff: number = 0;
  readonly varintLen: number = 0;
  readonly float64Cells = new Float64Array();
  readonly blobCells: Uint8Array[] = [];
  readonly stringCells: string[] = [];

  // batchBytes is a trace_processor.QueryResult.CellsBatch proto.
  constructor(batchBytes: Uint8Array) {
    this.batchBytes = batchBytes;
    const reader = protobuf.Reader.create(batchBytes);
    assertTrue(reader.pos === 0);
    const end = reader.len;

    // Here we deconstruct the proto by hand. The CellsBatch is carefully
    // designed to allow a very fast parsing from the TS side. We pack all cells
    // of the same types together, so we can do only one call (per batch) to
    // TextDecoder.decode(), we can overlay a memory-aligned typedarray for
    // float values and can quickly tell and type-check the cell types.
    // One row = N cells (we know the number upfront from the outer message).
    // Each batch contains always an integer multiple of N cells (i.e. rows are
    // never fragmented across different batches).
    while (reader.pos < end) {
      const tag = reader.uint32();
      switch (tag >>> 3) {
        case 1: {  // cell types, a packed array with one CellType per cell.
          assertTrue((tag & 7) === TAG_LEN_DELIM);  // Must be packed varint.
          this.cellTypesLen = reader.uint32();
          this.cellTypesOff = reader.pos;
          // The whole array must lie within this batch.
          assertTrue(this.cellTypesOff + this.cellTypesLen <= end);
          reader.pos += this.cellTypesLen;
          break;
        }

        case 2: {  // varint_cells, a packed varint buffer.
          assertTrue((tag & 7) === TAG_LEN_DELIM);  // Must be packed varint.
          const packLen = reader.uint32();
          this.varintOff = reader.pos;
          this.varintLen = packLen;
          assertTrue(reader.buf === batchBytes);
          // |varintOff| is relative to |batchBytes| (it is a reader position),
          // so the limit is the length of the view. Do NOT add
          // batchBytes.byteOffset here: that would loosen the check whenever
          // the batch is a subarray of a larger buffer.
          assertTrue(this.varintOff + this.varintLen <= end);
          reader.pos += packLen;
          break;
        }

        case 3: {  // float64_cells, a 64-bit aligned packed fixed64 buffer.
          assertTrue((tag & 7) === TAG_LEN_DELIM);  // Must be packed varint.
          const f64Len = reader.uint32();
          assertTrue(f64Len % 8 === 0);
          assertTrue(reader.pos + f64Len <= end);
          // Float64Array's constructor is evil: the offset is in bytes but the
          // length is in 8-byte words. The offset is relative to the
          // underlying ArrayBuffer, hence the byteOffset adjustment.
          const f64Words = f64Len / 8;
          const f64Off = batchBytes.byteOffset + reader.pos;
          if (f64Off % 8 === 0) {
            this.float64Cells =
                new Float64Array(batchBytes.buffer, f64Off, f64Words);
          } else {
            // When using the production code in trace_processor's rpc.cc, the
            // float64 should be 8-bytes aligned. The slow-path case is only
            // for tests.
            const slice = batchBytes.buffer.slice(f64Off, f64Off + f64Len);
            this.float64Cells = new Float64Array(slice);
          }
          reader.pos += f64Len;
          break;
        }

        case 4:  // blob_cells: one entry per blob.
          assertTrue((tag & 7) === TAG_LEN_DELIM);
          // protobufjs's bytes() under the hoods calls slice() and creates
          // a copy. Fine here as blobs are rare and not a fastpath.
          this.blobCells.push(new Uint8Array(reader.bytes()));
          break;

        case 5: {  // string_cells: all the string cells concatenated with \0s.
          assertTrue((tag & 7) === TAG_LEN_DELIM);
          const strLen = reader.uint32();
          assertTrue(reader.pos + strLen <= end);
          const subArr = batchBytes.subarray(reader.pos, reader.pos + strLen);
          assertTrue(subArr.length === strLen);
          // The reason why we do this split rather than creating one string
          // per entry is that utf8 decoding has some non-negligible cost. See
          // go/postmessage-benchmark .
          this.stringCells = utf8Decode(subArr).split('\0');
          reader.pos += strLen;
          break;
        }

        case 6:  // is_last_batch (boolean).
          this.isLastBatch = !!reader.bool();
          break;

        case 7:  // padding for realignment, skip silently.
          reader.skipType(tag & 7);
          break;

        default:
          console.warn(`Unexpected QueryResult.CellsBatch field ${tag >>> 3}`);
          reader.skipType(tag & 7);
          break;
      }  // switch(tag)
    }  // while (pos < end)
  }

  // Number of cells in the batch. The length in bytes of the cell types array
  // is used as the count, i.e. each cell type is taken to occupy one byte.
  get numCells() {
    return this.cellTypesLen;
  }
}

// Holds the iteration state (current batch and position within it) and decodes
// one row at a time into |rowData|.
class RowIteratorImpl implements RowIteratorBase {
  // The spec passed to the iter call containing the expected types, e.g.:
  // {'colA': NUM, 'colB': NUM_NULL, 'colC': STR}.
  // This doesn't ever change.
  readonly rowSpec: Row;

  // The object that holds the current row. This points to the parent
  // RowIteratorImplWithRowData instance that created this class.
  rowData: Row;

  // The QueryResult object we are reading data from. The engine will pump
  // batches over time into this object.
  private resultObj: QueryResultImpl;

  // All the member variables in the group below point to the identically-named
  // members in result.batch[batchIdx]. This is to avoid indirection layers in
  // the next() hotpath, so we can do this.float64Cells vs
  // this.resultObj.batch[this.batchIdx].float64Cells.
  // These are re-set every time tryMoveToNextBatch() is called (and succeeds).
  private batchIdx = -1;  // The batch index within |result.batches[]|.
  private batchBytes = new Uint8Array();
  private columnNames: string[] = [];
  private numColumns = 0;
  private cellTypesEnd = -1;  // -1 so the 1st next() hits tryMoveToNextBatch().
  private float64Cells = new Float64Array();
  private varIntReader = protobuf.Reader.create(this.batchBytes);
  private blobCells: Uint8Array[] = [];
  private stringCells: string[] = [];

  // These members instead are incremented as we read cells from next(). They
  // are the mutable state of the iterator.
  private nextCellTypeOff = 0;
  private nextFloat64Cell = 0;
  private nextStringCell = 0;
  private nextBlobCell = 0;
  private isValid = false;

  // |querySpec| is the column -> expected type map, |rowData| is the object
  // that receives the decoded cells, |res| is the result set to read from.
  // The constructor immediately moves to the first row (if any).
  constructor(querySpec: Row, rowData: Row, res: QueryResultImpl) {
    // Do NOT copy |querySpec| onto |this|: the column names are arbitrary and
    // would overwrite the iterator state above (e.g. a column named 'batchIdx'
    // or 'next'). Row values live exclusively in |rowData|.
    this.rowData = rowData;
    this.rowSpec = {...querySpec};  // ... -> Copy all the key/value pairs.
    this.resultObj = res;
    this.next();
  }

  // True if the iterator is positioned on a row, i.e. |rowData| holds the
  // cells of that row.
  valid(): boolean {
    return this.isValid;
  }

  // Returns the value of |columnName| in the current row. Throws if |rowData|
  // has no such property.
  get(columnName: string): ColumnType {
    const res = this.rowData[columnName];
    if (res === undefined) {
      throw new Error(
          `Column '${columnName}' doesn't exist. ` +
          `Actual columns: [${this.columnNames.join(',')}]`);
    }
    return res;
  }

  // Moves the cursor next by one row and updates |isValid|.
  // When this fails to move, two cases are possible:
  // 1. We reached the end of the result set (this is the case if
  //    QueryResult.isComplete() == true when this fails).
  // 2. We reached the end of the current batch, but more rows might come later
  //    (if QueryResult.isComplete() == false).
  next() {
    // At some point we might reach the end of the current batch, but the next
    // batch might be available already. In this case we want next() to
    // transparently move on to the next batch.
    while (this.nextCellTypeOff + this.numColumns > this.cellTypesEnd) {
      // If TraceProcessor is behaving well, we should never end up in a
      // situation where we have leftover cells. TP is expected to serialize
      // whole rows in each QueryResult batch and NOT truncate them midway.
      // If this assert fires the TP RPC logic has a bug.
      assertTrue(
          this.nextCellTypeOff === this.cellTypesEnd ||
          this.cellTypesEnd === -1);
      if (!this.tryMoveToNextBatch()) {
        this.isValid = false;
        return;
      }
    }

    const rowData = this.rowData;
    const numColumns = this.numColumns;

    // Read the current row.
    for (let i = 0; i < numColumns; i++) {
      const cellType = this.batchBytes[this.nextCellTypeOff++];
      const colName = this.columnNames[i];

      switch (cellType) {
        case CellType.CELL_NULL:
          rowData[colName] = null;
          break;

        case CellType.CELL_VARINT: {
          const val = this.varIntReader.int64();
          // This is very subtle. The return type of int64 can be either a
          // number or a Long.js {high:number, low:number} if Long.js support
          // is enabled. The default state seems different in node and browser.
          // We force-disable Long.js support in the top of this source file.
          rowData[colName] = val as {} as number;
          break;
        }

        case CellType.CELL_FLOAT64:
          rowData[colName] = this.float64Cells[this.nextFloat64Cell++];
          break;

        case CellType.CELL_STRING:
          rowData[colName] = this.stringCells[this.nextStringCell++];
          break;

        case CellType.CELL_BLOB: {
          const blob = this.blobCells[this.nextBlobCell++];
          throw new Error(`TODO implement BLOB support (${blob})`);
          // outRow[colName] = blob;
        }

        default:
          throw new Error(`Invalid cell type ${cellType}`);
      }
    }  // For (cells)
    this.isValid = true;
  }

  // Points the iterator at the beginning of the next batch, if one has been
  // received already, and validates the batch against |rowSpec|.
  // Returns false if there is no further batch (yet) or if the batch is the
  // empty end-of-stream marker. Throws if a column of the spec is missing from
  // the result or if a cell type is incompatible with the spec.
  private tryMoveToNextBatch(): boolean {
    const nextBatchIdx = this.batchIdx + 1;
    if (nextBatchIdx >= this.resultObj.batches.length) {
      return false;
    }

    this.columnNames = this.resultObj.columnNames;
    this.numColumns = this.columnNames.length;

    this.batchIdx = nextBatchIdx;
    const batch = assertExists(this.resultObj.batches[nextBatchIdx]);
    this.batchBytes = batch.batchBytes;
    this.nextCellTypeOff = batch.cellTypesOff;
    this.cellTypesEnd = batch.cellTypesOff + batch.cellTypesLen;
    this.float64Cells = batch.float64Cells;
    this.blobCells = batch.blobCells;
    this.stringCells = batch.stringCells;
    // Restrict the varint reader to the varint_cells region of the batch.
    this.varIntReader = protobuf.Reader.create(batch.batchBytes);
    this.varIntReader.pos = batch.varintOff;
    this.varIntReader.len = batch.varintOff + batch.varintLen;
    this.nextFloat64Cell = 0;
    this.nextStringCell = 0;
    this.nextBlobCell = 0;

    // Check that all the expected columns are present.
    for (const expectedCol of Object.keys(this.rowSpec)) {
      if (this.columnNames.indexOf(expectedCol) < 0) {
        throw new Error(
            `Column ${expectedCol} not found in the SQL result ` +
            `set {${this.columnNames.join(' ')}}`);
      }
    }

    // Check that the cells types are consistent.
    const numColumns = this.numColumns;
    if (batch.numCells === 0) {
      // This can happen if the query result contains just an error. In this
      // case an empty batch with isLastBatch=true is appended as an EOF marker.
      // In theory TraceProcessor could return an empty batch in the middle and
      // that would be fine from a protocol viewpoint. In practice, no code path
      // does that today so it doesn't make sense trying supporting it with a
      // recursive call to tryMoveToNextBatch().
      assertTrue(batch.isLastBatch);
      return false;
    }

    assertTrue(numColumns > 0);
    const firstCellTypeOff = this.nextCellTypeOff;
    for (let i = firstCellTypeOff; i < this.cellTypesEnd; i++) {
      // |i| is a byte offset within |batchBytes|. Rebase it on the start of
      // the cell types array to obtain the index of the cell in the batch.
      const cellIdx = i - firstCellTypeOff;
      const col = cellIdx % numColumns;
      const colName = this.columnNames[col];
      const actualType = this.batchBytes[i] as CellType;
      const expType = this.rowSpec[colName];

      // If undefined, the caller doesn't want to read this column at all, so
      // it can be whatever.
      if (expType === undefined) continue;

      let err = '';
      if (actualType === CellType.CELL_NULL &&
          (expType !== STR_NULL && expType !== NUM_NULL)) {
        err = 'SQL value is NULL but that was not expected' +
            ` (expected type: ${columnTypeToString(expType)}). ` +
            'Did you intend to use NUM_NULL or STR_NULL?';
      } else if (
          ((actualType === CellType.CELL_VARINT ||
            actualType === CellType.CELL_FLOAT64) &&
           (expType !== NUM && expType !== NUM_NULL)) ||
          ((actualType === CellType.CELL_STRING) &&
           (expType !== STR && expType !== STR_NULL))) {
        err = `Incompatible cell type. Expected: ${
            columnTypeToString(
                expType)} actual: ${CELL_TYPE_NAMES[actualType]}`;
      }
      if (err.length > 0) {
        // The reported row number is relative to the current batch.
        const row = Math.floor(cellIdx / numColumns);
        throw new Error(`Error @ row: ${row} col: '${colName}': ${err}`);
      }
    }
    return true;
  }
}

// This is the object ultimately returned to the client when calling
// QueryResult.iter(...).
// The only reason why this is disjoint from RowIteratorImpl is to avoid
// naming collisions between the members variables required by RowIteratorImpl
// and the column names returned by the iterator.
// Note that next, valid, get and _impl are assigned after the spec is copied
// onto this object, so columns with those names are overwritten here.
class RowIteratorImplWithRowData implements RowIteratorBase {
  private _impl: RowIteratorImpl;

  next: () => void;
  valid: () => boolean;
  get: (columnName: string) => ColumnType;

  constructor(querySpec: Row, res: QueryResultImpl) {
    // This object doubles as the row: one property per column of the spec.
    const thisAsRow = this as {} as Row;
    Object.assign(thisAsRow, querySpec);
    this._impl = new RowIteratorImpl(querySpec, thisAsRow, res);
    // Forward the cursor API to the implementation object.
    this.next = this._impl.next.bind(this._impl);
    this.valid = this._impl.valid.bind(this._impl);
    this.get = this._impl.get.bind(this._impl);
  }
}

// This is a proxy object that wraps QueryResultImpl, adding await-ability.
// This is so that:
// 1.
//    Clients that just want to await for the full result set can just call
//    await engine.query('...') and will get a QueryResult that is guaranteed
//    to be complete.
// 2. Clients that know how to handle the streaming can use it straight away.
class WaitableQueryResultImpl implements QueryResult, WritableQueryResult,
                                         PromiseLike<QueryResult> {
  // The object that actually stores and decodes the result.
  private impl: QueryResultImpl;
  // Set by the first then() call, which asserts it was not set before.
  private thenCalled = false;

  constructor(errorInfo: QueryErrorInfo) {
    this.impl = new QueryResultImpl(errorInfo);
  }

  // --- QueryResult implementation: every call is forwarded to |impl|.

  iter<T extends Row>(spec: T): RowIterator<T> {
    return this.impl.iter(spec);
  }

  firstRow<T extends Row>(spec: T): T {
    return this.impl.firstRow(spec);
  }

  waitAllRows(): Promise<QueryResult> {
    return this.impl.waitAllRows();
  }

  isComplete(): boolean {
    return this.impl.isComplete();
  }

  numRows(): number {
    return this.impl.numRows();
  }

  columns(): string[] {
    return this.impl.columns();
  }

  error(): string|undefined {
    return this.impl.error();
  }

  statementCount(): number {
    return this.impl.statementCount();
  }

  statementWithOutputCount(): number {
    return this.impl.statementWithOutputCount();
  }

  // --- WritableQueryResult implementation.

  appendResultBatch(resBytes: Uint8Array): void {
    this.impl.appendResultBatch(resBytes);
  }

  // --- PromiseLike<QueryResult> implementation. All three methods chain on
  // the promise that settles when the whole result set has been received.

  // tslint:disable-next-line no-any
  then(onfulfilled: any, onrejected: any): any {
    assertFalse(this.thenCalled);
    this.thenCalled = true;
    const allRows = this.impl.ensureAllRowsPromise();
    return allRows.then(onfulfilled, onrejected);
  }

  // tslint:disable-next-line no-any
  catch(error: any): any {
    const allRows = this.impl.ensureAllRowsPromise();
    return allRows.catch(error);
  }

  // tslint:disable-next-line no-any
  finally(callback: () => void): any {
    const allRows = this.impl.ensureAllRowsPromise();
    return allRows.finally(callback);
  }

  get[Symbol.toStringTag](): string {
    return 'Promise<WaitableQueryResult>';
  }
}

// Creates an empty result object. The same object serves as the writer side
// (the engine appends batches to it) and as the reader side, which can either
// be awaited or consumed through the QueryResult interface.
export function createQueryResult(errorInfo: QueryErrorInfo): QueryResult&
    Promise<QueryResult>&WritableQueryResult {
  const result = new WaitableQueryResultImpl(errorInfo);
  return result;
}