// Copyright (C) 2021 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// This file deals with deserialization and iteration of the proto-encoded
// byte buffer that is returned by TraceProcessor when invoking the
// TPM_QUERY_STREAMING method. The returned |query_result| buffer is optimized
// for being moved cheaply across workers and decoded on the fly as we step
// through the iterator.
// See comments around QueryResult in trace_processor.proto for more details.

// The classes in this file are organized as follows:
//
// QueryResultImpl:
// The object returned by the Engine.query(sql) method.
// This object is a holder of row data. Batches of rows get appended
// incrementally as they are received by the remote TraceProcessor instance.
// QueryResultImpl also deals with asynchronicity of queries and allows callers
// to obtain a promise that waits for more (or all) rows.
// At any point in time the following objects hold a reference to QueryResult:
//  - The Engine: for appending row batches.
//  - UI code, typically controllers, who make queries.
//
// ResultBatch:
// Holds the data, returned by the remote TraceProcessor instance, for a number
// of rows (TP typically chunks the results in batches of 128KB).
// A QueryResultImpl holds exclusively ResultBatches for a given query.
// ResultBatch is not exposed externally, it's just an internal representation
// that helps with proto decoding. ResultBatch is immutable after it gets
// appended and decoded. The iteration state is held by the RowIteratorImpl.
//
// RowIteratorImpl:
// Decouples the data owned by QueryResultImpl (and its ResultBatch(es)) from
// the iteration state. The iterator effectively is the union of a ResultBatch
// and the row number in it. Rows within the batch are decoded as the user calls
// next(). When reaching the end of the batch, it takes care of switching to
// the next batch (if any) within the QueryResultImpl.
// This object is part of the API exposed to tracks / controllers.

// Ensure protobuf is initialized.
import '../base/static_initializers';

import protobuf from 'protobufjs/minimal';

import {defer, Deferred} from '../base/deferred';
import {assertExists, assertFalse, assertTrue} from '../base/logging';
import {utf8Decode} from '../base/string_utils';
import {Duration, duration, Time, time} from '../base/time';

// A single cell value as surfaced to JS by the row iterator: NULL cells ->
// null, varint cells -> number or bigint (depending on the type requested in
// the iteration spec), float64 cells -> number, string cells -> string,
// blob cells -> Uint8Array.
export type SqlValue = string | number | bigint | null | Uint8Array;
// TODO(altimin): Replace ColumnType with SqlValue across the codebase and
// remove export here.
export type ColumnType = SqlValue;

// Column type "tags" used in the spec passed to QueryResult.iter() and
// firstRow(). Only their value/identity matters: they are compared with ===
// against the spec values (BLOB and BLOB_NULL are distinct Uint8Array
// instances, so they are told apart by identity). The *_NULL variants also
// accept SQL NULL cells.
export const UNKNOWN: ColumnType = null;
export const NUM = 0;
export const STR = 'str';
export const NUM_NULL: number | null = 1;
export const STR_NULL: string | null = 'str_null';
export const BLOB: Uint8Array = new Uint8Array();
export const BLOB_NULL: Uint8Array | null = new Uint8Array();
export const LONG: bigint = 0n;
export const LONG_NULL: bigint | null = 1n;

const SHIFT_32BITS = 32n;

/**
 * Fast-decodes a protobuf base-128 varint starting at |pos| in |buf| and
 * returns it as a signed 64-bit bigint (two's complement, like proto int64).
 *
 * Inspired by
 * https://github.com/protobufjs/protobuf.js/blob/56b1e64979dae757b67a21d326e16acee39f2267/src/reader.js#L123
 *
 * The value is accumulated into two unsigned 32-bit halves (|lo| and |hi|)
 * so all bit twiddling stays in cheap int32/uint32 arithmetic; a bigint is
 * built only once, on return.
 *
 * @param buf The buffer holding the varint.
 * @param pos Offset in |buf| of the first byte of the varint.
 * @returns The decoded value.
 * @throws Error('Index out of range') if the varint is truncated by the end
 *     of |buf|; Error('invalid varint encoding') if it is longer than 10 bytes.
 */
export function decodeInt64Varint(buf: Uint8Array, pos: number): bigint {
  let hi: number = 0;
  let lo: number = 0;
  let i = 0;

  if (buf.length - pos > 4) {
    // Fast route (lo): at least 5 bytes are available, so the 1st..5th bytes
    // can be read without bounds checks.
    for (; i < 4; ++i) {
      // 1st..4th
      lo = (lo | ((buf[pos] & 127) << (i * 7))) >>> 0;
      if (buf[pos++] < 128) {
        return BigInt(lo);
      }
    }
    // 5th: its low 4 payload bits complete |lo|, its upper 3 bits start |hi|.
    lo = (lo | ((buf[pos] & 127) << 28)) >>> 0;
    hi = (hi | ((buf[pos] & 127) >> 4)) >>> 0;
    if (buf[pos++] < 128) {
      return (BigInt(hi) << SHIFT_32BITS) | BigInt(lo);
    }
    i = 0;
  } else {
    // Slow route: at most 4 bytes are left, so a well-formed varint must end
    // within them. Every byte is bounds-checked.
    for (; i < 4; ++i) {
      if (pos >= buf.length) {
        throw Error('Index out of range');
      }
      // 1st..4th
      lo = (lo | ((buf[pos] & 127) << (i * 7))) >>> 0;
      if (buf[pos++] < 128) {
        return BigInt(lo);
      }
    }
    // The last available byte still had its continuation bit set: the varint
    // is truncated by the end of the buffer. (Previously this case, as well as
    // reading a 4th byte past the end, silently returned a bogus value.)
    throw Error('Index out of range');
  }
  if (buf.length - pos > 4) {
    // Fast route (hi): the 6th..10th bytes are all in bounds.
    for (; i < 5; ++i) {
      // 6th..10th
      hi = (hi | ((buf[pos] & 127) << (i * 7 + 3))) >>> 0;
      if (buf[pos++] < 128) {
        // Reinterpret the 64 raw bits as a signed (two's complement) int64.
        const big = (BigInt(hi) << SHIFT_32BITS) | BigInt(lo);
        return BigInt.asIntN(64, big);
      }
    }
  } else {
    for (; i < 5; ++i) {
      if (pos >= buf.length) {
        throw Error('Index out of range');
      }
      // 6th..10th
      hi = (hi | ((buf[pos] & 127) << (i * 7 + 3))) >>> 0;
      if (buf[pos++] < 128) {
        const big = (BigInt(hi) << SHIFT_32BITS) | BigInt(lo);
        return BigInt.asIntN(64, big);
      }
    }
  }
  // More than 10 bytes with the continuation bit set.
  throw Error('invalid varint encoding');
}

// Info that could help debug a query error. For example the query
// in question, the stack where the query was issued, the active
// plugin etc.
export interface QueryErrorInfo {
  query: string;
}

// Error raised (thrown, or used to reject result promises) when a query
// fails. toString() appends the offending query text to the message.
export class QueryError extends Error {
  readonly query: string;

  constructor(message: string, info: QueryErrorInfo) {
    super(message);
    this.query = info.query;
  }

  toString() {
    return `${super.toString()}\nQuery:\n${this.query}`;
  }
}

// One row extracted from an SQL result:
export interface Row {
  [key: string]: ColumnType;
}

// The methods that any iterator has to implement.
export interface RowIteratorBase {
  valid(): boolean;
  next(): void;

  // Reflection support for cases where the column names are not known upfront
  // (e.g. the query result table for user-provided SQL queries).
  // It throws if the passed column name doesn't exist.
  // Example usage:
  //   for (const it = queryResult.iter({}); it.valid(); it.next()) {
  //     for (const columnName of queryResult.columns()) {
  //       console.log(it.get(columnName));
  //     }
  //   }
  get(columnName: string): ColumnType;
}

// A RowIterator is a type that has all the fields defined in the query spec
// plus the valid() and next() operators.
// This is to ultimately allow the clients to do:
//   const result = await engine.query("select name, surname, id from people;");
//   const iter = result.iter({name: STR, surname: STR, id: NUM});
//   for (; iter.valid(); iter.next())
//     console.log(iter.name, iter.surname);
export type RowIterator<T extends Row> = RowIteratorBase & T;

/**
 * Maps a column type tag to its symbolic name (e.g. 'NUM_NULL') for error
 * messages. Tags are matched with strict equality, in declaration order;
 * anything else is rendered as `INVALID(<value>)`.
 */
function columnTypeToString(t: ColumnType): string {
  const namedTags: Array<[ColumnType, string]> = [
    [NUM, 'NUM'],
    [NUM_NULL, 'NUM_NULL'],
    [STR, 'STR'],
    [STR_NULL, 'STR_NULL'],
    [BLOB, 'BLOB'],
    [BLOB_NULL, 'BLOB_NULL'],
    [LONG, 'LONG'],
    [LONG_NULL, 'LONG_NULL'],
    [UNKNOWN, 'UNKNOWN'],
  ];
  for (const [tag, label] of namedTags) {
    if (tag === t) {
      return label;
    }
  }
  return `INVALID(${t})`;
}

/**
 * Tells whether a cell of wire type |actual| may be read into a column whose
 * spec type is |expected|. UNKNOWN accepts every valid cell type.
 * @throws Error if |actual| is not a known CellType.
 */
function isCompatible(actual: CellType, expected: ColumnType): boolean {
  let accepted: readonly ColumnType[];
  if (actual === CellType.CELL_NULL) {
    accepted = [NUM_NULL, STR_NULL, BLOB_NULL, LONG_NULL];
  } else if (actual === CellType.CELL_VARINT) {
    accepted = [NUM, NUM_NULL, LONG, LONG_NULL];
  } else if (actual === CellType.CELL_FLOAT64) {
    accepted = [NUM, NUM_NULL];
  } else if (actual === CellType.CELL_STRING) {
    accepted = [STR, STR_NULL];
  } else if (actual === CellType.CELL_BLOB) {
    accepted = [BLOB, BLOB_NULL];
  } else {
    // Validate the cell type before honoring UNKNOWN, so corrupt data is
    // always reported.
    throw new Error(`Unknown CellType ${actual}`);
  }
  return expected === UNKNOWN || accepted.includes(expected);
}

// This has to match CellType in trace_processor.proto.
enum CellType {
  CELL_NULL = 1,
  CELL_VARINT = 2,
  CELL_FLOAT64 = 3,
  CELL_STRING = 4,
  CELL_BLOB = 5,
}

// Human-readable names of the CellType values, indexed by enum value (index 0
// is not a valid CellType). Used only to build error messages.
const CELL_TYPE_NAMES = [
  'UNKNOWN',
  'NULL',
  'VARINT',
  'FLOAT64',
  'STRING',
  'BLOB',
];

// Protobuf wire type 2 (length-delimited): strings, bytes and packed repeated
// fields.
const TAG_LEN_DELIM = 2;

// This is the interface exposed to readers (e.g. tracks). The underlying object
// (QueryResultImpl) owns the result data. This allows to obtain iterators on
// that. In future it will allow to wait for incremental updates (new rows being
// fetched) for streaming queries.
export interface QueryResult {
  // Obtains an iterator.
  // TODO(primiano): this should have an option to destruct data as we read. In
  // the case of a long query (e.g. `SELECT * FROM sched` in the query prompt)
  // we don't want to accumulate everything in memory. OTOH UI tracks want to
  // keep the data around so they can redraw them on each animation frame. For
  // now we keep everything in memory in the QueryResultImpl object.
  iter<T extends Row>(spec: T): RowIterator<T>;

  // Like iter() for queries that expect only one row. It embeds the valid()
  // check (i.e. throws if no rows are available) and returns directly the
  // first result.
  firstRow<T extends Row>(spec: T): T;

  // If != undefined the query errored out and error() contains the message.
  error(): string | undefined;

  // Returns the number of rows accumulated so far. Note that this number can
  // change over time as more batches are received. It becomes stable only
  // when isComplete() returns true or after waitAllRows() is resolved.
  numRows(): number;

  // If true all rows have been fetched. Calling iter() will iterate through the
  // last row. If false, iter() will return an iterator which might iterate
  // through some rows (or none) but will surely not reach the end.
  isComplete(): boolean;

  // Returns a promise that is resolved only when all rows (i.e. all batches)
  // have been fetched. The promise return value is always the object itself.
  waitAllRows(): Promise<QueryResult>;

  // Returns a promise that is resolved when either:
  // - more rows are available
  // - all rows are available
  // The promise return value is always the object itself.
  waitMoreRows(): Promise<QueryResult>;

  // Can return an empty array if called before the first batch is resolved.
  // This should be called only after having awaited for at least one batch.
  columns(): string[];

  // Returns the number of SQL statements in the query
  // (e.g. 2 for 'SELECT 1; SELECT 2;').
  statementCount(): number;

  // Returns the number of SQL statements that produced output rows. This
  // number is <= statementCount().
  statementWithOutputCount(): number;

  // Returns the last SQL statement.
  lastStatementSql(): string;
}

// Interface exposed to engine.ts to pump in the data as new row batches arrive.
export interface WritableQueryResult extends QueryResult {
  // |resBytes| is a proto-encoded trace_processor.QueryResult message.
  // The overall flow looks as follows:
  // - The user calls engine.query('select ...') and gets a QueryResult back.
  // - The query call posts a message to the worker that runs the SQL engine (
  //   or sends an HTTP request in case of the RPC+HTTP interface).
  // - The returned QueryResult object is initially empty.
  // - Over time, the sql engine will postMessage() back results in batches.
  // - Each batch will end up calling this appendResultBatch() method.
  // - If there is any pending promise (e.g. the caller called
  //   queryResult.waitAllRows()), this call will awake them (if this is the
  //   last batch).
  appendResultBatch(resBytes: Uint8Array): void;
}

// The actual implementation, which bridges together the reader side and the
// writer side (the one exposed to the Engine). This is the same object so that
// when the engine pumps new row batches we can resolve pending promises that
// readers (e.g. track code) are waiting for.
class QueryResultImpl implements QueryResult, WritableQueryResult {
  columnNames: string[] = [];
  private _error?: string;
  private _numRows = 0;
  private _isComplete = false;
  private _errorInfo: QueryErrorInfo;
  private _statementCount = 0;
  private _statementWithOutputCount = 0;
  private _lastStatementSql = '';

  constructor(errorInfo: QueryErrorInfo) {
    this._errorInfo = errorInfo;
  }

  // --- QueryResult implementation.

  // TODO(primiano): for the moment new batches are appended but old batches
  // are never removed. This won't work with abnormally large result sets, as
  // it will stash all rows in memory. We could switch to a model where the
  // iterator is destructive and deletes batch objects once iterating past the
  // end of each batch. If we do that, then we need to assign monotonic IDs to
  // batches. Also if we do that, we should prevent creating more than one
  // iterator for a QueryResult.
  batches: ResultBatch[] = [];

  // Promise awaiting on waitAllRows(). This should be resolved only when the
  // last result batch has been retrieved.
  private allRowsPromise?: Deferred<QueryResult>;

  // Promise awaiting on waitMoreRows(). This is resolved when the next
  // batch is appended via appendResultBatch.
  private moreRowsPromise?: Deferred<QueryResult>;

  isComplete(): boolean {
    return this._isComplete;
  }
  numRows(): number {
    return this._numRows;
  }
  error(): string | undefined {
    return this._error;
  }
  columns(): string[] {
    return this.columnNames;
  }
  statementCount(): number {
    return this._statementCount;
  }
  statementWithOutputCount(): number {
    return this._statementWithOutputCount;
  }
  lastStatementSql(): string {
    return this._lastStatementSql;
  }

  iter<T extends Row>(spec: T): RowIterator<T> {
    const impl = new RowIteratorImplWithRowData(spec, this);
    return impl as {} as RowIterator<T>;
  }

  firstRow<T extends Row>(spec: T): T {
    const impl = new RowIteratorImplWithRowData(spec, this);
    assertTrue(impl.valid());
    return impl as {} as RowIterator<T> as T;
  }

  // Can be called only once (asserts otherwise). Use ensureAllRowsPromise()
  // for an idempotent variant.
  waitAllRows(): Promise<QueryResult> {
    assertTrue(this.allRowsPromise === undefined);
    this.allRowsPromise = defer<QueryResult>();
    if (this._isComplete) {
      this.resolveOrReject(this.allRowsPromise, this);
    }
    return this.allRowsPromise;
  }

  // Returns the pending "more rows" promise if there is one, otherwise a new
  // one (settled immediately if the result is already complete).
  waitMoreRows(): Promise<QueryResult> {
    if (this.moreRowsPromise !== undefined) {
      return this.moreRowsPromise;
    }

    const moreRowsPromise = defer<QueryResult>();
    if (this._isComplete) {
      this.resolveOrReject(moreRowsPromise, this);
    } else {
      this.moreRowsPromise = moreRowsPromise;
    }
    return moreRowsPromise;
  }

  // --- WritableQueryResult implementation.

  // Called by the engine when a new QueryResult is available. Note that a
  // single Query() call can yield >1 QueryResult due to result batching
  // if more than ~64K of data are returned, e.g. when returning O(M) rows.
  // |resBytes| is a proto-encoded trace_processor.QueryResult message.
  // It is fine to retain the resBytes without slicing a copy, because
  // ProtoRingBuffer does the slice() for us (or passes through the buffer
  // coming from postMessage() (Wasm case) or fetch() (HTTP+RPC case)).
  appendResultBatch(resBytes: Uint8Array) {
    const reader = protobuf.Reader.create(resBytes);
    assertTrue(reader.pos === 0);
    const columnNamesEmptyAtStartOfBatch = this.columnNames.length === 0;
    const columnNamesSet = new Set<string>();
    while (reader.pos < reader.len) {
      const tag = reader.uint32();
      switch (tag >>> 3) {
        case 1: {
          // column_names
          // Only the first batch should contain the column names. If this fires
          // something is going wrong in the handling of the batch stream.
          assertTrue(columnNamesEmptyAtStartOfBatch);
          const origColName = reader.string();
          let colName = origColName;
          // In some rare cases two columns can have the same name (b/194891824)
          // e.g. `select 1 as x, 2 as x`. These queries don't happen in the
          // UI code, but they can happen when the user types a query (e.g.
          // with a join). The most practical thing we can do here is renaming
          // the columns with a suffix. Keeping the same name will break when
          // iterating, because column names become iterator object keys.
          for (let i = 1; columnNamesSet.has(colName); ++i) {
            colName = `${origColName}_${i}`;
            assertTrue(i < 100); // Give up at some point;
          }
          columnNamesSet.add(colName);
          this.columnNames.push(colName);
          break;
        }
        case 2: {
          // error
          // The query has errored only if the |error| field is non-empty.
          // In protos, we don't distinguish between non-present and empty.
          // Make sure we don't propagate ambiguous empty strings to JS.
          const err = reader.string();
          this._error = err !== undefined && err.length ? err : undefined;
          break;
        }
        case 3: {
          // batch
          const batchLen = reader.uint32();
          const batchRaw = resBytes.subarray(reader.pos, reader.pos + batchLen);
          reader.pos += batchLen;

          // The ResultBatch ctor parses the CellsBatch submessage.
          const parsedBatch = new ResultBatch(batchRaw);
          this.batches.push(parsedBatch);
          this._isComplete = parsedBatch.isLastBatch;

          // In theory one could construct a valid proto serializing the column
          // names after the cell batches. In practice the QueryResultSerializer
          // doesn't do that so it's not worth complicating the code.
          const numColumns = this.columnNames.length;
          if (numColumns !== 0) {
            assertTrue(parsedBatch.numCells % numColumns === 0);
            this._numRows += parsedBatch.numCells / numColumns;
          } else {
            // numColumns == 0 is plausible for queries like CREATE TABLE ... .
            assertTrue(parsedBatch.numCells === 0);
          }
          break;
        }

        case 4: // -> statementCount()
          this._statementCount = reader.uint32();
          break;

        case 5: // -> statementWithOutputCount()
          this._statementWithOutputCount = reader.uint32();
          break;

        case 6: // -> lastStatementSql()
          this._lastStatementSql = reader.string();
          break;

        default:
          console.warn(`Unexpected QueryResult field ${tag >>> 3}`);
          reader.skipType(tag & 7);
          break;
      } // switch (tag)
    } // while (pos < end)

    if (this.moreRowsPromise !== undefined) {
      this.resolveOrReject(this.moreRowsPromise, this);
      this.moreRowsPromise = undefined;
    }

    if (this._isComplete && this.allRowsPromise !== undefined) {
      this.resolveOrReject(this.allRowsPromise, this);
    }
  }

  // Like waitAllRows(), but safe to call multiple times.
  ensureAllRowsPromise(): Promise<QueryResult> {
    if (this.allRowsPromise === undefined) {
      this.waitAllRows(); // Will populate |this.allRowsPromise|.
    }
    return assertExists(this.allRowsPromise);
  }

  get errorInfo(): QueryErrorInfo {
    return this._errorInfo;
  }

  // Settles |promise|: rejects with a QueryError if the query reported an
  // error, resolves with |arg| otherwise.
  private resolveOrReject(promise: Deferred<QueryResult>, arg: QueryResult) {
    if (this._error === undefined) {
      promise.resolve(arg);
    } else {
      promise.reject(new QueryError(this._error, this._errorInfo));
    }
  }
}

// This class holds onto a received result batch (a Uint8Array) and does some
// partial parsing to tokenize the various cell groups. This parsing mainly
// consists of identifying and caching the offsets of each cell group and
// initializing the varint decoders. This half parsing is done to keep the
// iterator's next() fast, without decoding everything into memory.
// This is an internal implementation detail and is not exposed outside. The
// RowIteratorImpl uses this class to iterate through batches (this class takes
// care of iterating within a batch, RowIteratorImpl takes care of switching
// batches when needed).
// Note: at any point in time there can be more than one ResultIterator
// referencing the same batch. The batch must be immutable.
class ResultBatch {
  readonly isLastBatch: boolean = false;
  readonly batchBytes: Uint8Array;
  // Offsets below are relative to the start of |batchBytes|.
  readonly cellTypesOff: number = 0;
  readonly cellTypesLen: number = 0;
  readonly varintOff: number = 0;
  readonly varintLen: number = 0;
  readonly float64Cells = new Float64Array();
  readonly blobCells: Uint8Array[] = [];
  readonly stringCells: string[] = [];

  // batchBytes is a trace_processor.QueryResult.CellsBatch proto.
  constructor(batchBytes: Uint8Array) {
    this.batchBytes = batchBytes;
    const reader = protobuf.Reader.create(batchBytes);
    assertTrue(reader.pos === 0);
    const end = reader.len;

    // Here we deconstruct the proto by hand. The CellsBatch is carefully
    // designed to allow a very fast parsing from the TS side. We pack all cells
    // of the same types together, so we can do only one call (per batch) to
    // TextDecoder.decode(), we can overlay a memory-aligned typedarray for
    // float values and can quickly tell and type-check the cell types.
    // One row = N cells (we know the number upfront from the outer message).
    // Each batch always contains an integer multiple of N cells (i.e. rows are
    // never fragmented across different batches).
    while (reader.pos < end) {
      const tag = reader.uint32();
      switch (tag >>> 3) {
        case 1: // cell types, a packed array containing one CellType per cell.
          assertTrue((tag & 7) === TAG_LEN_DELIM); // Must be packed varint.
          this.cellTypesLen = reader.uint32();
          this.cellTypesOff = reader.pos;
          assertTrue(this.cellTypesOff + this.cellTypesLen <= batchBytes.length);
          reader.pos += this.cellTypesLen;
          break;

        case 2: {
          // varint_cells, a packed varint buffer.
          assertTrue((tag & 7) === TAG_LEN_DELIM); // Must be packed varint.
          const packLen = reader.uint32();
          this.varintOff = reader.pos;
          this.varintLen = packLen;
          assertTrue(reader.buf === batchBytes);
          // |varintOff| is relative to |batchBytes|, so bound it by the batch
          // length (not by the absolute byteOffset + byteLength).
          assertTrue(this.varintOff + this.varintLen <= batchBytes.length);
          reader.pos += packLen;
          break;
        }

        case 3: {
          // float64_cells, a 64-bit aligned packed fixed64 buffer.
          assertTrue((tag & 7) === TAG_LEN_DELIM); // Must be packed fixed64.
          const f64Len = reader.uint32();
          assertTrue(f64Len % 8 === 0);
          // Float64Array's constructor is evil: the offset is in bytes but the
          // length is in 8-byte words.
          const f64Words = f64Len / 8;
          const f64Off = batchBytes.byteOffset + reader.pos;
          if (f64Off % 8 === 0) {
            this.float64Cells = new Float64Array(
              batchBytes.buffer,
              f64Off,
              f64Words,
            );
          } else {
            // When using the production code in trace_processor's rpc.cc, the
            // float64 should be 8-bytes aligned. The slow-path case is only for
            // tests.
            const slice = batchBytes.buffer.slice(f64Off, f64Off + f64Len);
            this.float64Cells = new Float64Array(slice);
          }
          reader.pos += f64Len;
          break;
        }

        case 4: // blob_cells: one entry per blob.
          assertTrue((tag & 7) === TAG_LEN_DELIM);
          // protobufjs's bytes() under the hoods calls slice() and creates
          // a copy. Fine here as blobs are rare and not a fastpath.
          this.blobCells.push(new Uint8Array(reader.bytes()));
          break;

        case 5: {
          // string_cells: all the string cells concatenated with \0s.
          assertTrue((tag & 7) === TAG_LEN_DELIM);
          const strLen = reader.uint32();
          assertTrue(reader.pos + strLen <= end);
          const subArr = batchBytes.subarray(reader.pos, reader.pos + strLen);
          assertTrue(subArr.length === strLen);
          // The reason why we do this split rather than creating one string
          // per entry is that utf8 decoding has some non-negligible cost. See
          // go/postmessage-benchmark .
          this.stringCells = utf8Decode(subArr).split('\0');
          reader.pos += strLen;
          break;
        }

        case 6: // is_last_batch (boolean).
          this.isLastBatch = !!reader.bool();
          break;

        case 7: // padding for realignment, skip silently.
          reader.skipType(tag & 7);
          break;

        default:
          console.warn(`Unexpected QueryResult.CellsBatch field ${tag >>> 3}`);
          reader.skipType(tag & 7);
          break;
      } // switch(tag)
    } // while (pos < end)
  }

  // One cell type byte is stored per cell, so the cell-types length is also
  // the number of cells in the batch.
  get numCells() {
    return this.cellTypesLen;
  }
}

class RowIteratorImpl implements RowIteratorBase {
  // The spec passed to the iter call containing the expected types, e.g.:
  // {'colA': NUM, 'colB': NUM_NULL, 'colC': STR}.
  // This doesn't ever change.
  readonly rowSpec: Row;

  // The object that holds the current row. This points to the parent
  // RowIteratorImplWithRowData instance that created this class.
  rowData: Row;

  // The QueryResult object we are reading data from. The engine will pump
  // batches over time into this object.
  private resultObj: QueryResultImpl;

  // All the member variables in the group below point to the identically-named
  // members in result.batch[batchIdx]. This is to avoid indirection layers in
  // the next() hotpath, so we can do this.float64Cells vs
  // this.resultObj.batch[this.batchIdx].float64Cells.
  // These are re-set every time tryMoveToNextBatch() is called (and succeeds).
  private batchIdx = -1; // The batch index within |result.batches[]|.
  private batchBytes = new Uint8Array();
  private columnNames: string[] = [];
  private numColumns = 0;
  private cellTypesEnd = -1; // -1 so the 1st next() hits tryMoveToNextBatch().
  private float64Cells = new Float64Array();
  private varIntReader = protobuf.Reader.create(this.batchBytes);
  private blobCells: Uint8Array[] = [];
  private stringCells: string[] = [];

  // These members instead are incremented as we read cells from next(). They
  // are the mutable state of the iterator.
  private nextCellTypeOff = 0;
  private nextFloat64Cell = 0;
  private nextStringCell = 0;
  private nextBlobCell = 0;
  private isValid = false;

  constructor(querySpec: Row, rowData: Row, res: QueryResultImpl) {
    // The spec keys are deliberately NOT copied onto |this|: nothing reads
    // them here (row values live in |rowData|), and a column named e.g.
    // 'batchIdx' or 'cellTypesEnd' would otherwise clobber the iteration state.
    this.rowData = rowData;
    this.rowSpec = {...querySpec}; // ... -> Copy all the key/value pairs.
    this.resultObj = res;
    this.next();
  }

  valid(): boolean {
    return this.isValid;
  }

  private makeError(message: string): QueryError {
    return new QueryError(message, this.resultObj.errorInfo);
  }

  get(columnName: string): ColumnType {
    const res = this.rowData[columnName];
    if (res === undefined) {
      throw this.makeError(
        `Column '${columnName}' doesn't exist. ` +
          `Actual columns: [${this.columnNames.join(',')}]`,
      );
    }
    return res;
  }

  // Moves the cursor next by one row and updates |isValid|.
  // When this fails to move, two cases are possible:
  // 1. We reached the end of the result set (this is the case if
  //    QueryResult.isComplete() == true when this fails).
  // 2. We reached the end of the current batch, but more rows might come later
  //    (if QueryResult.isComplete() == false).
  next() {
    // At some point we might reach the end of the current batch, but the next
    // batch might be available already. In this case we want next() to
    // transparently move on to the next batch.
    while (this.nextCellTypeOff + this.numColumns > this.cellTypesEnd) {
      // If TraceProcessor is behaving well, we should never end up in a
      // situation where we have leftover cells. TP is expected to serialize
      // whole rows in each QueryResult batch and NOT truncate them midway.
      // If this assert fires the TP RPC logic has a bug.
      assertTrue(
        this.nextCellTypeOff === this.cellTypesEnd || this.cellTypesEnd === -1,
      );
      if (!this.tryMoveToNextBatch()) {
        this.isValid = false;
        return;
      }
    }

    const rowData = this.rowData;
    const numColumns = this.numColumns;

    // Read the current row.
    for (let i = 0; i < numColumns; i++) {
      const cellType = this.batchBytes[this.nextCellTypeOff++];
      const colName = this.columnNames[i];
      const expType = this.rowSpec[colName];

      switch (cellType) {
        case CellType.CELL_NULL:
          rowData[colName] = null;
          break;

        case CellType.CELL_VARINT:
          if (expType === NUM || expType === NUM_NULL) {
            // This is very subtle. The return type of int64 can be either a
            // number or a Long.js {high:number, low:number} if Long.js is
            // installed. The default state seems different in node and browser.
            // We rely on Long.js support being force-disabled (presumably by
            // the '../base/static_initializers' import at the top of this
            // file).
            const val = this.varIntReader.int64();
            rowData[colName] = val as {} as number;
          } else {
            // LONG, LONG_NULL, or unspecified - return as bigint
            const value = decodeInt64Varint(
              this.batchBytes,
              this.varIntReader.pos,
            );
            rowData[colName] = value;
            this.varIntReader.skip(); // Skips a varint
          }
          break;

        case CellType.CELL_FLOAT64:
          rowData[colName] = this.float64Cells[this.nextFloat64Cell++];
          break;

        case CellType.CELL_STRING:
          rowData[colName] = this.stringCells[this.nextStringCell++];
          break;

        case CellType.CELL_BLOB: {
          const blob = this.blobCells[this.nextBlobCell++];
          rowData[colName] = blob;
          break;
        }

        default:
          throw this.makeError(`Invalid cell type ${cellType}`);
      }
    } // For (cells)
    this.isValid = true;
  }

  // Switches the cached per-batch state to the next batch, if one has been
  // appended already. Validates column presence and cell types for the whole
  // batch upfront so next() can skip per-cell checks. Returns false if there
  // is no further (non-empty) batch.
  private tryMoveToNextBatch(): boolean {
    const nextBatchIdx = this.batchIdx + 1;
    if (nextBatchIdx >= this.resultObj.batches.length) {
      return false;
    }

    this.columnNames = this.resultObj.columnNames;
    this.numColumns = this.columnNames.length;

    this.batchIdx = nextBatchIdx;
    const batch = assertExists(this.resultObj.batches[nextBatchIdx]);
    this.batchBytes = batch.batchBytes;
    this.nextCellTypeOff = batch.cellTypesOff;
    this.cellTypesEnd = batch.cellTypesOff + batch.cellTypesLen;
    this.float64Cells = batch.float64Cells;
    this.blobCells = batch.blobCells;
    this.stringCells = batch.stringCells;
    this.varIntReader = protobuf.Reader.create(batch.batchBytes);
    this.varIntReader.pos = batch.varintOff;
    this.varIntReader.len = batch.varintOff + batch.varintLen;
    this.nextFloat64Cell = 0;
    this.nextStringCell = 0;
    this.nextBlobCell = 0;

    // Check that all the expected columns are present.
    for (const expectedCol of Object.keys(this.rowSpec)) {
      if (this.columnNames.indexOf(expectedCol) < 0) {
        throw this.makeError(
          `Column ${expectedCol} not found in the SQL result ` +
            `set {${this.columnNames.join(' ')}}`,
        );
      }
    }

    // Check that the cells types are consistent.
    const numColumns = this.numColumns;
    if (batch.numCells === 0) {
      // This can happen if the query result contains just an error. In this
      // case an empty batch with isLastBatch=true is appended as an EOF marker.
      // In theory TraceProcessor could return an empty batch in the middle and
      // that would be fine from a protocol viewpoint. In practice, no code path
      // does that today so it doesn't make sense trying supporting it with a
      // recursive call to tryMoveToNextBatch().
      assertTrue(batch.isLastBatch);
      return false;
    }

    assertTrue(numColumns > 0);
    for (let i = this.nextCellTypeOff; i < this.cellTypesEnd; i++) {
      const col = (i - this.nextCellTypeOff) % numColumns;
      const colName = this.columnNames[col];
      const actualType = this.batchBytes[i] as CellType;
      const expType = this.rowSpec[colName];

      // If undefined, the caller doesn't want to read this column at all, so
      // it can be whatever.
      if (expType === undefined) continue;

      let err = '';
      if (!isCompatible(actualType, expType)) {
        if (actualType === CellType.CELL_NULL) {
          err =
            'SQL value is NULL but that was not expected' +
            ` (expected type: ${columnTypeToString(expType)}). ` +
            'Did you mean NUM_NULL, LONG_NULL, STR_NULL or BLOB_NULL?';
        } else {
          err = `Incompatible cell type. Expected: ${columnTypeToString(
            expType,
          )} actual: ${CELL_TYPE_NAMES[actualType]}`;
        }
      }
      if (err.length > 0) {
        // |i| is an absolute offset into |batchBytes|: make it relative to
        // the first cell of this batch (as done for |col| above) and add the
        // rows of the previous batches, so it is the row index in the result.
        let row = Math.floor((i - this.nextCellTypeOff) / numColumns);
        for (const prevBatch of this.resultObj.batches.slice(0, this.batchIdx)) {
          row += prevBatch.numCells / numColumns;
        }
        const message = `Error @ row: ${row} col: '${colName}': ${err}`;
        throw this.makeError(message);
      }
    }
    return true;
  }
}

// This is the object ultimately returned to the client when calling
// QueryResult.iter(...).
// The only reason why this is disjoint from RowIteratorImpl is to avoid
// naming collisions between the members variables required by RowIteratorImpl
// and the column names returned by the iterator.
// Note: row values are written directly onto this object, so columns named
// 'next', 'valid', 'get' or '_impl' would still collide with the members below.
class RowIteratorImplWithRowData implements RowIteratorBase {
  private _impl: RowIteratorImpl;

  next: () => void;
  valid: () => boolean;
  get: (columnName: string) => ColumnType;

  constructor(querySpec: Row, res: QueryResultImpl) {
    const thisAsRow = this as {} as Row;
    Object.assign(thisAsRow, querySpec);
    this._impl = new RowIteratorImpl(querySpec, thisAsRow, res);
    this.next = this._impl.next.bind(this._impl);
    this.valid = this._impl.valid.bind(this._impl);
    this.get = this._impl.get.bind(this._impl);
  }
}

// This is a proxy object that wraps QueryResultImpl, adding await-ability.
// This is so that:
// 1. Clients that just want to await for the full result set can just call
//    await engine.query('...') and will get a QueryResult that is guaranteed
//    to be complete.
// 2. Clients that know how to handle the streaming can use it straight away.
class WaitableQueryResultImpl
  implements QueryResult, WritableQueryResult, PromiseLike<QueryResult>
{
  private impl: QueryResultImpl;
  private thenCalled = false;

  constructor(errorInfo: QueryErrorInfo) {
    this.impl = new QueryResultImpl(errorInfo);
  }

  // QueryResult implementation. Proxies all calls to the impl object.
936 iter<T extends Row>(spec: T) { 937 return this.impl.iter(spec); 938 } 939 firstRow<T extends Row>(spec: T) { 940 return this.impl.firstRow(spec); 941 } 942 waitAllRows() { 943 return this.impl.waitAllRows(); 944 } 945 waitMoreRows() { 946 return this.impl.waitMoreRows(); 947 } 948 isComplete() { 949 return this.impl.isComplete(); 950 } 951 numRows() { 952 return this.impl.numRows(); 953 } 954 columns() { 955 return this.impl.columns(); 956 } 957 error() { 958 return this.impl.error(); 959 } 960 statementCount() { 961 return this.impl.statementCount(); 962 } 963 statementWithOutputCount() { 964 return this.impl.statementWithOutputCount(); 965 } 966 lastStatementSql() { 967 return this.impl.lastStatementSql(); 968 } 969 970 // WritableQueryResult implementation. 971 appendResultBatch(resBytes: Uint8Array) { 972 return this.impl.appendResultBatch(resBytes); 973 } 974 975 // PromiseLike<QueryResult> implementaton. 976 977 // eslint-disable-next-line @typescript-eslint/no-explicit-any 978 then(onfulfilled: any, onrejected: any): any { 979 assertFalse(this.thenCalled); 980 this.thenCalled = true; 981 return this.impl.ensureAllRowsPromise().then(onfulfilled, onrejected); 982 } 983 984 // eslint-disable-next-line @typescript-eslint/no-explicit-any 985 catch(error: any): any { 986 return this.impl.ensureAllRowsPromise().catch(error); 987 } 988 989 // eslint-disable-next-line @typescript-eslint/no-explicit-any 990 finally(callback: () => void): any { 991 return this.impl.ensureAllRowsPromise().finally(callback); 992 } 993 994 // eslint and clang-format disagree on how to format get[foo](). 
Let 995 // clang-format win: 996 // eslint-disable-next-line keyword-spacing 997 get [Symbol.toStringTag](): string { 998 return 'Promise<WaitableQueryResult>'; 999 } 1000} 1001 1002export function createQueryResult( 1003 errorInfo: QueryErrorInfo, 1004): QueryResult & Promise<QueryResult> & WritableQueryResult { 1005 return new WaitableQueryResultImpl(errorInfo); 1006} 1007 1008// Throws if the value cannot be reasonably converted to a bigint. 1009// Assumes value is in native time units. 1010export function timeFromSql(value: ColumnType): time { 1011 if (typeof value === 'bigint') { 1012 return Time.fromRaw(value); 1013 } else if (typeof value === 'number') { 1014 return Time.fromRaw(BigInt(Math.floor(value))); 1015 } else if (value === null) { 1016 return Time.ZERO; 1017 } else { 1018 throw Error(`Refusing to create time from unrelated type ${value}`); 1019 } 1020} 1021 1022// Throws if the value cannot be reasonably converted to a bigint. 1023// Assumes value is in nanoseconds. 1024export function durationFromSql(value: ColumnType): duration { 1025 if (typeof value === 'bigint') { 1026 return value; 1027 } else if (typeof value === 'number') { 1028 return BigInt(Math.floor(value)); 1029 } else if (value === null) { 1030 return Duration.ZERO; 1031 } else { 1032 throw Error(`Refusing to create duration from unrelated type ${value}`); 1033 } 1034} 1035