// Copyright (C) 2021 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// This file deals with deserialization and iteration of the proto-encoded
// byte buffer that is returned by TraceProcessor when invoking the
// TPM_QUERY_STREAMING method. The returned |query_result| buffer is optimized
// for being moved cheaply across workers and decoded on-the-fly as we step
// through the iterator.
// See comments around QueryResult in trace_processor.proto for more details.

// The classes in this file are organized as follows:
//
// QueryResultImpl:
// The object returned by the Engine.query(sql) method.
// This object is a holder of row data. Batches of rows get appended
// incrementally as they are received from the remote TraceProcessor instance.
// QueryResultImpl also deals with asynchronicity of queries and allows callers
// to obtain a promise that waits for more (or all) rows.
// At any point in time the following objects hold a reference to QueryResult:
// - The Engine: for appending row batches.
// - UI code, typically controllers, who make queries.
//
// ResultBatch:
// Holds the data, returned by the remote TraceProcessor instance, for a number
// of rows (TP typically chunks the results in batches of 128KB).
// A QueryResultImpl holds exclusively ResultBatches for a given query.
// ResultBatch is not exposed externally, it's just an internal representation
// that helps with proto decoding. ResultBatch is immutable after it gets
// appended and decoded. The iteration state is held by the RowIteratorImpl.
//
// RowIteratorImpl:
// Decouples the data owned by QueryResultImpl (and its ResultBatch(es)) from
// the iteration state. The iterator effectively is the union of a ResultBatch
// and the row number in it. Rows within the batch are decoded as the user calls
// next(). When getting at the end of the batch, it takes care of switching to
// the next batch (if any) within the QueryResultImpl.
// This object is part of the API exposed to tracks / controllers.

// Ensure protobuf is initialized.
import '../base/static_initializers';
import protobuf from 'protobufjs/minimal';
import {defer, Deferred} from '../base/deferred';
import {assertExists, assertFalse, assertTrue} from '../base/logging';
import {utf8Decode} from '../base/string_utils';
import {Duration, duration, Time, time} from '../base/time';

// The set of JS values a single SQL cell can decode to.
export type SqlValue = string | number | bigint | null | Uint8Array;
// TODO(altimin): Replace ColumnType with SqlValue across the codebase and
// remove export here.
export type ColumnType = SqlValue;

// The constants below are type tags: callers put them as the values of the
// row spec passed to QueryResult.iter() (e.g. iter({name: STR, id: NUM})) to
// declare the type they expect for each column. They are matched by strict
// equality (===) in isCompatible() and columnTypeToString(), so only the
// identity of each value matters, not its content. This is why BLOB and
// BLOB_NULL are two distinct (empty) Uint8Array instances, and why the
// number tags (0, 1) never clash with the bigint tags (0n, 1n).
// The *_NULL variants additionally accept SQL NULL cells; UNKNOWN accepts any
// cell type.
export const UNKNOWN: ColumnType = null;
export const NUM = 0;
export const STR = 'str';
export const NUM_NULL: number | null = 1;
export const STR_NULL: string | null = 'str_null';
export const BLOB: Uint8Array = new Uint8Array();
export const BLOB_NULL: Uint8Array | null = new Uint8Array();
export const LONG: bigint = 0n;
export const LONG_NULL: bigint | null = 1n;

// Shift used by decodeInt64Varint() to move the high 32-bit word of a decoded
// int64 into place before OR-ing it with the low word.
const SHIFT_32BITS = 32n;

// Describes the inheritance tree of the above types.
const inheritanceTree = new Map<ColumnType, {readonly extends?: ColumnType}>([
  [NUM, {extends: NUM_NULL}],
  [NUM_NULL, {extends: UNKNOWN}],
  [LONG, {extends: LONG_NULL}],
  [LONG_NULL, {extends: UNKNOWN}],
  [BLOB, {extends: BLOB_NULL}],
  [BLOB_NULL, {extends: UNKNOWN}],
  [STR, {extends: STR_NULL}],
  [STR_NULL, {extends: UNKNOWN}],
  [UNKNOWN, {}],
]);

/**
 * Check whether a given type extends another.
 *
 * A type extends itself and every type on its ancestry path in
 * `inheritanceTree` (e.g. NUM extends NUM, NUM_NULL and UNKNOWN).
 *
 * @param required - The type we want to extend.
 * @param actual - The type to test.
 * @returns - True if `actual` extends `required`.
 */
export function checkExtends(
  required: ColumnType,
  actual: ColumnType,
): boolean {
  // If the types are the same, just return true
  if (required === actual) return true;

  const ancestry = getAncestryPath(actual);
  return ancestry.includes(required);
}

/**
 * Returns the closest common ancestor of two types.
 *
 * @param typeA - The first type.
 * @param typeB - The second type.
 * @returns - The first type on `typeA`'s ancestry path (starting from `typeA`
 *   itself) that also appears on `typeB`'s ancestry path, or undefined if the
 *   two paths share no type.
 */
export function unionTypes(
  typeA: ColumnType,
  typeB: ColumnType,
): ColumnType | undefined {
  // If the types are the same, just return the same type
  if (typeA === typeB) return typeA;

  // Get the ancestry path for each type
  const pathA = getAncestryPath(typeA);
  const pathB = getAncestryPath(typeB);

  // Find the first common type in both ancestry paths
  for (const type of pathA) {
    if (pathB.includes(type)) {
      return type;
    }
  }

  return undefined;
}

/**
 * Returns the ancestry path from the given type to the root, inclusive.
 *
 * The first element is always `type` itself, even if it is not a key of
 * `inheritanceTree`.
 */
function getAncestryPath(type: ColumnType): ColumnType[] {
  const path: ColumnType[] = [type];
  let current = inheritanceTree.get(type);

  // Note: the root (UNKNOWN) is `null`, so the end of the chain is detected by
  // `extends` being undefined rather than by a falsy check.
  while (current && current.extends !== undefined) {
    path.push(current.extends);
    current = inheritanceTree.get(current.extends);
  }

  return path;
}

// Fast decode varint int64 into a bigint
// Inspired by
// https://github.com/protobufjs/protobuf.js/blob/56b1e64979dae757b67a21d326e16acee39f2267/src/reader.js#L123
//
// Decodes the varint that starts at |buf[pos]| and returns it as a signed
// 64-bit bigint. The value is accumulated into two unsigned 32-bit words
// (|lo| and |hi|) using plain number arithmetic, and only converted to bigint
// once at the end.
// Throws 'Index out of range' if the buffer ends before the varint is
// terminated, and 'invalid varint encoding' if the varint is longer than 10
// bytes.
export function decodeInt64Varint(buf: Uint8Array, pos: number): bigint {
  let hi: number = 0;
  let lo: number = 0;
  let i = 0;

  if (buf.length - pos > 4) {
    // fast route (lo): at least 5 bytes are available, no bounds checks needed.
    for (; i < 4; ++i) {
      // 1st..4th
      lo = (lo | ((buf[pos] & 127) << (i * 7))) >>> 0;
      if (buf[pos++] < 128) {
        return BigInt(lo);
      }
    }
    // 5th: its 7 payload bits straddle the lo/hi boundary (4 bits go into lo,
    // 3 bits into hi).
    lo = (lo | ((buf[pos] & 127) << 28)) >>> 0;
    hi = (hi | ((buf[pos] & 127) >> 4)) >>> 0;
    if (buf[pos++] < 128) {
      return (BigInt(hi) << SHIFT_32BITS) | BigInt(lo);
    }
    i = 0;
  } else {
    for (; i < 3; ++i) {
      if (pos >= buf.length) {
        throw Error('Index out of range');
      }
      // 1st..3rd
      lo = (lo | ((buf[pos] & 127) << (i * 7))) >>> 0;
      if (buf[pos++] < 128) {
        return BigInt(lo);
      }
    }
    // 4th: on this path at most 4 bytes were available, so this byte (if it
    // exists at all) is the last one of the buffer and must terminate the
    // varint. Otherwise the varint is truncated: fail loudly rather than
    // returning a partially decoded value.
    if (pos >= buf.length || buf[pos] >= 128) {
      throw Error('Index out of range');
    }
    lo = (lo | ((buf[pos++] & 127) << (i * 7))) >>> 0;
    return (BigInt(hi) << SHIFT_32BITS) | BigInt(lo);
  }
  if (buf.length - pos > 4) {
    // fast route (hi)
    for (; i < 5; ++i) {
      // 6th..10th
      hi = (hi | ((buf[pos] & 127) << (i * 7 + 3))) >>> 0;
      if (buf[pos++] < 128) {
        const big = (BigInt(hi) << SHIFT_32BITS) | BigInt(lo);
        // Reinterpret the unsigned 64-bit pattern as a signed int64.
        return BigInt.asIntN(64, big);
      }
    }
  } else {
    for (; i < 5; ++i) {
      if (pos >= buf.length) {
        throw Error('Index out of range');
      }
      // 6th..10th
      hi = (hi | ((buf[pos] & 127) << (i * 7 + 3))) >>> 0;
      if (buf[pos++] < 128) {
        const big = (BigInt(hi) << SHIFT_32BITS) | BigInt(lo);
        // Reinterpret the unsigned 64-bit pattern as a signed int64.
        return BigInt.asIntN(64, big);
      }
    }
  }
  throw Error('invalid varint encoding');
}

// Info that could help debug a query error. For example the query
// in question, the stack where the query was issued, the active
// plugin etc.
export interface QueryErrorInfo {
  query: string;
}

// Error raised for failures related to a query. In addition to the message it
// carries the text of the query, which is appended to the output of
// toString().
export class QueryError extends Error {
  readonly query: string;

  constructor(message: string, info: QueryErrorInfo) {
    super(message);
    this.query = info.query;
  }

  toString() {
    return `${super.toString()}\nQuery:\n${this.query}`;
  }
}

// One row extracted from an SQL result:
export interface Row {
  [key: string]: ColumnType;
}

// The methods that any iterator has to implement.
export interface RowIteratorBase {
  valid(): boolean;
  next(): void;

  // Reflection support for cases where the column names are not known upfront
  // (e.g. the query result table for user-provided SQL queries).
  // It throws if the passed column name doesn't exist.
  // Example usage:
  // for (const it = queryResult.iter({}); it.valid(); it.next()) {
  //   for (const columnName of queryResult.columns()) {
  //     console.log(it.get(columnName));
  //   }
  // }
  get(columnName: string): ColumnType;
}

// A RowIterator is a type that has all the fields defined in the query spec
// plus the valid() and next() operators. This is to ultimately allow the
// clients to do:
// const result = await engine.query("select name, surname, id from people;");
// const iter = result.iter({name: STR, surname: STR, id: NUM});
// for (; iter.valid(); iter.next())
//  console.log(iter.name, iter.surname);
export type RowIterator<T extends Row> = RowIteratorBase & T;

// Returns the name of the type tag |t| (e.g. 'NUM_NULL'), for use in error
// messages. Tags are matched by identity; anything that is not one of the
// known tags is rendered as 'INVALID(...)'.
function columnTypeToString(t: ColumnType): string {
  switch (t) {
    case NUM:
      return 'NUM';
    case NUM_NULL:
      return 'NUM_NULL';
    case STR:
      return 'STR';
    case STR_NULL:
      return 'STR_NULL';
    case BLOB:
      return 'BLOB';
    case BLOB_NULL:
      return 'BLOB_NULL';
    case LONG:
      return 'LONG';
    case LONG_NULL:
      return 'LONG_NULL';
    case UNKNOWN:
      return 'UNKNOWN';
    default:
      return `INVALID(${t})`;
  }
}

// Returns true if a cell of wire type |actual| can be read into a column that
// the caller declared as |expected| in the row spec. UNKNOWN accepts every
// cell type; NULL cells are accepted only by the *_NULL tags (and UNKNOWN).
// Throws if |actual| is not a known CellType.
function isCompatible(actual: CellType, expected: ColumnType): boolean {
  switch (actual) {
    case CellType.CELL_NULL:
      return (
        expected === NUM_NULL ||
        expected === STR_NULL ||
        expected === BLOB_NULL ||
        expected === LONG_NULL ||
        expected === UNKNOWN
      );
    case CellType.CELL_VARINT:
      // Integers can be read either as JS numbers or as bigints.
      return (
        expected === NUM ||
        expected === NUM_NULL ||
        expected === LONG ||
        expected === LONG_NULL ||
        expected === UNKNOWN
      );
    case CellType.CELL_FLOAT64:
      return expected === NUM || expected === NUM_NULL || expected === UNKNOWN;
    case CellType.CELL_STRING:
      return expected === STR || expected === STR_NULL || expected === UNKNOWN;
    case CellType.CELL_BLOB:
      return (
        expected === BLOB || expected === BLOB_NULL || expected === UNKNOWN
      );
    default:
      throw new Error(`Unknown CellType ${actual}`);
  }
}

// This has to match CellType in trace_processor.proto.
enum CellType {
  CELL_NULL = 1,
  CELL_VARINT = 2,
  CELL_FLOAT64 = 3,
  CELL_STRING = 4,
  CELL_BLOB = 5,
}

// Human-readable names of the CellType values, indexed by the numeric value of
// the enum (index 0 does not correspond to any CellType). Used only to build
// error messages.
const CELL_TYPE_NAMES = [
  'UNKNOWN',
  'NULL',
  'VARINT',
  'FLOAT64',
  'STRING',
  'BLOB',
];

// The value of (tag & 7) that the packed / length-delimited fields of
// CellsBatch are expected to have.
const TAG_LEN_DELIM = 2;

// This is the interface exposed to readers (e.g. tracks). The underlying object
// (QueryResultImpl) owns the result data. This allows to obtain iterators on
// that. In future it will allow to wait for incremental updates (new rows being
// fetched) for streaming queries.
export interface QueryResult {
  // Obtains an iterator.
  // TODO(primiano): this should have an option to destruct data as we read. In
  // the case of a long query (e.g. `SELECT * FROM sched` in the query prompt)
  // we don't want to accumulate everything in memory. OTOH UI tracks want to
  // keep the data around so they can redraw them on each animation frame. For
  // now we keep everything in memory in the QueryResultImpl object.
  iter<T extends Row>(spec: T): RowIterator<T>;

  // Like iter() for queries that expect only one row. It embeds the valid()
  // check (i.e. throws if no rows are available) and returns directly the
  // first result.
  firstRow<T extends Row>(spec: T): T;

  // Like firstRow() but returns undefined if no rows are available.
  maybeFirstRow<T extends Row>(spec: T): T | undefined;

  // If != undefined the query errored out and error() contains the message.
  error(): string | undefined;

  // Returns the number of rows accumulated so far. Note that this number can
  // change over time as more batches are received. It becomes stable only
  // when isComplete() returns true or after waitAllRows() is resolved.
  numRows(): number;

  // If true all rows have been fetched. Calling iter() will iterate through the
  // last row. If false, iter() will return an iterator which might iterate
  // through some rows (or none) but will surely not reach the end.
  isComplete(): boolean;

  // Returns a promise that is resolved only when all rows (i.e. all batches)
  // have been fetched. The promise return value is always the object itself.
  waitAllRows(): Promise<QueryResult>;

  // Returns a promise that is resolved when either:
  // - more rows are available
  // - all rows are available
  // The promise return value is always the object itself.
  waitMoreRows(): Promise<QueryResult>;

  // Can return an empty array if called before the first batch is resolved.
  // This should be called only after having awaited for at least one batch.
  columns(): string[];

  // Returns the number of SQL statements in the query
  // (e.g. 2 if the query is 'SELECT 1; SELECT 2;')
  statementCount(): number;

  // Returns the number of SQL statements that produced output rows. This
  // number is <= statementCount().
  statementWithOutputCount(): number;

  // Returns the last SQL statement.
  lastStatementSql(): string;
}

// Interface exposed to engine.ts to pump in the data as new row batches arrive.
export interface WritableQueryResult extends QueryResult {
  // |resBytes| is a proto-encoded trace_processor.QueryResult message.
  //  The overall flow looks as follows:
  // - The user calls engine.query('select ...') and gets a QueryResult back.
  // - The query call posts a message to the worker that runs the SQL engine (
  //   or sends a HTTP request in case of the RPC+HTTP interface).
  // - The returned QueryResult object is initially empty.
  // - Over time, the sql engine will postMessage() back results in batches.
  // - Each batch will end up calling this appendResultBatch() method.
  // - If there is any pending promise (e.g. the caller called
  //   queryResult.waitAllRows()), this call will awake them (if this is the
  //   last batch).
  appendResultBatch(resBytes: Uint8Array): void;
}

// The actual implementation, which bridges together the reader side and the
// writer side (the one exposed to the Engine). This is the same object so that
// when the engine pumps new row batches we can resolve pending promises that
// readers (e.g. track code) are waiting for.
class QueryResultImpl implements QueryResult, WritableQueryResult {
  columnNames: string[] = [];
  private _error?: string;
  private _numRows = 0;
  private _isComplete = false;
  private _errorInfo: QueryErrorInfo;
  private _statementCount = 0;
  private _statementWithOutputCount = 0;
  private _lastStatementSql = '';

  constructor(errorInfo: QueryErrorInfo) {
    this._errorInfo = errorInfo;
  }

  // --- QueryResult implementation.

  // TODO(primiano): for the moment new batches are appended but old batches
  // are never removed. This won't work with abnormally large result sets, as
  // it will stash all rows in memory. We could switch to a model where the
  // iterator is destructive and deletes batch objects once iterating past the
  // end of each batch. If we do that, then we need to assign monotonic IDs to
  // batches. Also if we do that, we should prevent creating more than one
  // iterator for a QueryResult.
  batches: ResultBatch[] = [];

  // Promise awaiting on waitAllRows(). This should be resolved only when the
  // last result batch has been retrieved.
  private allRowsPromise?: Deferred<QueryResult>;

  // Promise awaiting on waitMoreRows(). This is resolved when the next
  // batch is appended via appendResultBatch.
  private moreRowsPromise?: Deferred<QueryResult>;

  isComplete(): boolean {
    return this._isComplete;
  }
  numRows(): number {
    return this._numRows;
  }
  error(): string | undefined {
    return this._error;
  }
  columns(): string[] {
    return this.columnNames;
  }
  statementCount(): number {
    return this._statementCount;
  }
  statementWithOutputCount(): number {
    return this._statementWithOutputCount;
  }
  lastStatementSql(): string {
    return this._lastStatementSql;
  }

  iter<T extends Row>(spec: T): RowIterator<T> {
    const impl = new RowIteratorImplWithRowData(spec, this);
    return impl as {} as RowIterator<T>;
  }

  firstRow<T extends Row>(spec: T): T {
    const impl = new RowIteratorImplWithRowData(spec, this);
    assertTrue(impl.valid());
    return impl as {} as RowIterator<T> as T;
  }

  maybeFirstRow<T extends Row>(spec: T): T | undefined {
    const impl = new RowIteratorImplWithRowData(spec, this);
    if (!impl.valid()) {
      return undefined;
    }
    return impl as {} as RowIterator<T> as T;
  }

  // Can be called only once.
  waitAllRows(): Promise<QueryResult> {
    assertTrue(this.allRowsPromise === undefined);
    this.allRowsPromise = defer<QueryResult>();
    if (this._isComplete) {
      this.resolveOrReject(this.allRowsPromise, this);
    }
    return this.allRowsPromise;
  }

  // Unlike waitAllRows() this can be called many times: all the calls made
  // before the next batch arrives share the same pending promise.
  waitMoreRows(): Promise<QueryResult> {
    if (this.moreRowsPromise !== undefined) {
      return this.moreRowsPromise;
    }

    const moreRowsPromise = defer<QueryResult>();
    if (this._isComplete) {
      // No more batches will arrive: settle immediately and don't cache it.
      this.resolveOrReject(moreRowsPromise, this);
    } else {
      this.moreRowsPromise = moreRowsPromise;
    }
    return moreRowsPromise;
  }

  // --- WritableQueryResult implementation.

  // Called by the engine when a new QueryResult is available. Note that a
  // single Query() call can yield >1 QueryResult due to result batching
  // if more than ~64K of data are returned, e.g. when returning O(M) rows.
  // |resBytes| is a proto-encoded trace_processor.QueryResult message.
  // It is fine to retain the resBytes without slicing a copy, because
  // ProtoRingBuffer does the slice() for us (or passes through the buffer
  // coming from postMessage() (Wasm case) or fetch() (HTTP+RPC case)).
  appendResultBatch(resBytes: Uint8Array) {
    const reader = protobuf.Reader.create(resBytes);
    assertTrue(reader.pos === 0);
    const columnNamesEmptyAtStartOfBatch = this.columnNames.length === 0;
    const columnNamesSet = new Set<string>();
    while (reader.pos < reader.len) {
      const tag = reader.uint32();
      switch (tag >>> 3) {
        case 1: {
          // column_names
          // Only the first batch should contain the column names. If this fires
          // something is going wrong in the handling of the batch stream.
          assertTrue(columnNamesEmptyAtStartOfBatch);
          const origColName = reader.string();
          let colName = origColName;
          // In some rare cases two columns can have the same name (b/194891824)
          // e.g. `select 1 as x, 2 as x`. These queries don't happen in the
          // UI code, but they can happen when the user types a query (e.g.
          // with a join). The most practical thing we can do here is renaming
          // the columns with a suffix. Keeping the same name will break when
          // iterating, because column names become iterator object keys.
          for (let i = 1; columnNamesSet.has(colName); ++i) {
            colName = `${origColName}_${i}`;
            assertTrue(i < 100); // Give up at some point;
          }
          columnNamesSet.add(colName);
          this.columnNames.push(colName);
          break;
        }

        case 2: {
          // error
          // The query has errored only if the |error| field is non-empty.
          // In protos, we don't distinguish between non-present and empty.
          // Make sure we don't propagate ambiguous empty strings to JS.
          const err = reader.string();
          this._error = err !== undefined && err.length ? err : undefined;
          break;
        }

        case 3: {
          // batch
          const batchLen = reader.uint32();
          const batchRaw = resBytes.subarray(reader.pos, reader.pos + batchLen);
          reader.pos += batchLen;

          // The ResultBatch ctor parses the CellsBatch submessage.
          const parsedBatch = new ResultBatch(batchRaw);
          this.batches.push(parsedBatch);
          this._isComplete = parsedBatch.isLastBatch;

          // In theory one could construct a valid proto serializing the column
          // names after the cell batches. In practice the QueryResultSerializer
          // doesn't do that so it's not worth complicating the code.
          const numColumns = this.columnNames.length;
          if (numColumns !== 0) {
            assertTrue(parsedBatch.numCells % numColumns === 0);
            this._numRows += parsedBatch.numCells / numColumns;
          } else {
            // numColumns == 0 is plausible for queries like CREATE TABLE ... .
            assertTrue(parsedBatch.numCells === 0);
          }
          break;
        }

        case 4:
          this._statementCount = reader.uint32();
          break;

        case 5:
          this._statementWithOutputCount = reader.uint32();
          break;

        case 6:
          this._lastStatementSql = reader.string();
          break;

        default:
          console.warn(`Unexpected QueryResult field ${tag >>> 3}`);
          reader.skipType(tag & 7);
          break;
      } // switch (tag)
    } // while (pos < end)

    // Any appended batch wakes up the waitMoreRows() waiters. The promise is
    // cleared so that the next waitMoreRows() call waits for the next batch.
    if (this.moreRowsPromise !== undefined) {
      this.resolveOrReject(this.moreRowsPromise, this);
      this.moreRowsPromise = undefined;
    }

    if (this._isComplete && this.allRowsPromise !== undefined) {
      this.resolveOrReject(this.allRowsPromise, this);
    }
  }

  // Returns the waitAllRows() promise, creating it if nobody asked for it yet.
  // Unlike waitAllRows() this can be called more than once.
  ensureAllRowsPromise(): Promise<QueryResult> {
    if (this.allRowsPromise === undefined) {
      this.waitAllRows(); // Will populate |this.allRowsPromise|.
    }
    return assertExists(this.allRowsPromise);
  }

  get errorInfo(): QueryErrorInfo {
    return this._errorInfo;
  }

  // Settles |promise|: rejects it with a QueryError if the query reported an
  // error, resolves it with |arg| otherwise.
  private resolveOrReject(promise: Deferred<QueryResult>, arg: QueryResult) {
    if (this._error === undefined) {
      promise.resolve(arg);
    } else {
      promise.reject(new QueryError(this._error, this._errorInfo));
    }
  }
}

// This class holds onto a received result batch (a Uint8Array) and does some
// partial parsing to tokenize the various cell groups. This parsing mainly
// consists of identifying and caching the offsets of each cell group and
// initializing the varint decoders. This half parsing is done to keep the
// iterator's next() fast, without decoding everything into memory.
// This is an internal implementation detail and is not exposed outside. The
// RowIteratorImpl uses this class to iterate through batches (this class takes
// care of iterating within a batch, RowIteratorImpl takes care of switching
// batches when needed).
// Note: at any point in time there can be more than one ResultIterator
// referencing the same batch. The batch must be immutable.
class ResultBatch {
  readonly isLastBatch: boolean = false;
  readonly batchBytes: Uint8Array;
  // All the offsets below are relative to the start of |batchBytes|.
  readonly cellTypesOff: number = 0;
  readonly cellTypesLen: number = 0;
  readonly varintOff: number = 0;
  readonly varintLen: number = 0;
  readonly float64Cells = new Float64Array();
  readonly blobCells: Uint8Array[] = [];
  readonly stringCells: string[] = [];

  // batchBytes is a trace_processor.QueryResult.CellsBatch proto.
  constructor(batchBytes: Uint8Array) {
    this.batchBytes = batchBytes;
    const reader = protobuf.Reader.create(batchBytes);
    assertTrue(reader.pos === 0);
    const end = reader.len;

    // Here we deconstruct the proto by hand. The CellsBatch is carefully
    // designed to allow a very fast parsing from the TS side. We pack all cells
    // of the same types together, so we can do only one call (per batch) to
    // TextDecoder.decode(), we can overlay a memory-aligned typedarray for
    // float values and can quickly tell and type-check the cell types.
    // One row = N cells (we know the number upfront from the outer message).
    // Each batch contains always an integer multiple of N cells (i.e. rows are
    // never fragmented across different batches).
    while (reader.pos < end) {
      const tag = reader.uint32();
      switch (tag >>> 3) {
        case 1: // cell types, a packed array containing one CellType per cell.
          assertTrue((tag & 7) === TAG_LEN_DELIM); // Must be packed varint.
          this.cellTypesLen = reader.uint32();
          this.cellTypesOff = reader.pos;
          reader.pos += this.cellTypesLen;
          break;

        case 2: {
          // varint_cells, a packed varint buffer.
          assertTrue((tag & 7) === TAG_LEN_DELIM); // Must be packed varint.
          const packLen = reader.uint32();
          this.varintOff = reader.pos;
          this.varintLen = packLen;
          assertTrue(reader.buf === batchBytes);
          // |varintOff| is relative to the start of |batchBytes| (not to its
          // underlying ArrayBuffer), so the bound is the length of the view.
          assertTrue(this.varintOff + this.varintLen <= batchBytes.length);
          reader.pos += packLen;
          break;
        }

        case 3: {
          // float64_cells, a 64-bit aligned packed fixed64 buffer.
          assertTrue((tag & 7) === TAG_LEN_DELIM); // Must be packed varint.
          const f64Len = reader.uint32();
          assertTrue(f64Len % 8 === 0);
          // Float64Array's constructor is evil: the offset is in bytes but the
          // length is in 8-byte words.
          const f64Words = f64Len / 8;
          // Unlike the other offsets, this one is relative to the underlying
          // ArrayBuffer, because that is what Float64Array and slice() take.
          const f64Off = batchBytes.byteOffset + reader.pos;
          if (f64Off % 8 === 0) {
            this.float64Cells = new Float64Array(
              batchBytes.buffer,
              f64Off,
              f64Words,
            );
          } else {
            // When using the production code in trace_processor's rpc.cc, the
            // float64 should be 8-bytes aligned. The slow-path case is only for
            // tests.
            const slice = batchBytes.buffer.slice(f64Off, f64Off + f64Len);
            this.float64Cells = new Float64Array(slice);
          }
          reader.pos += f64Len;
          break;
        }

        case 4: // blob_cells: one entry per blob.
          assertTrue((tag & 7) === TAG_LEN_DELIM);
          // protobufjs's bytes() under the hood calls slice() and creates
          // a copy. Fine here as blobs are rare and not a fastpath.
          this.blobCells.push(new Uint8Array(reader.bytes()));
          break;

        case 5: {
          // string_cells: all the string cells concatenated with \0s.
          assertTrue((tag & 7) === TAG_LEN_DELIM);
          const strLen = reader.uint32();
          assertTrue(reader.pos + strLen <= end);
          const subArr = batchBytes.subarray(reader.pos, reader.pos + strLen);
          assertTrue(subArr.length === strLen);
          // The reason why we do this split rather than creating one string
          // per entry is that utf8 decoding has some non-negligible cost. See
          // go/postmessage-benchmark .
          this.stringCells = utf8Decode(subArr).split('\0');
          reader.pos += strLen;
          break;
        }

        case 6: // is_last_batch (boolean).
          this.isLastBatch = !!reader.bool();
          break;

        case 7: // padding for realignment, skip silently.
          reader.skipType(tag & 7);
          break;

        default:
          console.warn(`Unexpected QueryResult.CellsBatch field ${tag >>> 3}`);
          reader.skipType(tag & 7);
          break;
      } // switch(tag)
    } // while (pos < end)
  }

  // The number of cells in the batch. This is the byte length of the cell
  // types array, as each cell type takes one byte.
  get numCells() {
    return this.cellTypesLen;
  }
}

class RowIteratorImpl implements RowIteratorBase {
  // The spec passed to the iter call containing the expected types, e.g.:
  // {'colA': NUM, 'colB': NUM_NULL, 'colC': STR}.
  // This doesn't ever change.
  readonly rowSpec: Row;

  // The object that holds the current row. This points to the parent
  // RowIteratorImplWithRowData instance that created this class.
  rowData: Row;

  // The QueryResult object we are reading data from. The engine will pump
  // batches over time into this object.
  private resultObj: QueryResultImpl;

  // All the member variables in the group below point to the identically-named
  // members in result.batch[batchIdx]. This is to avoid indirection layers in
  // the next() hotpath, so we can do this.float64Cells vs
  // this.resultObj.batch[this.batchIdx].float64Cells.
  // These are re-set every time tryMoveToNextBatch() is called (and succeeds).
  private batchIdx = -1; // The batch index within |result.batches[]|.
  private batchBytes = new Uint8Array();
  private columnNames: string[] = [];
  private numColumns = 0;
  private cellTypesEnd = -1; // -1 so the 1st next() hits tryMoveToNextBatch().
  private float64Cells = new Float64Array();
  private varIntReader = protobuf.Reader.create(this.batchBytes);
  private blobCells: Uint8Array[] = [];
  private stringCells: string[] = [];

  // These members instead are incremented as we read cells from next(). They
  // are the mutable state of the iterator.
  private nextCellTypeOff = 0;
  private nextFloat64Cell = 0;
  private nextStringCell = 0;
  private nextBlobCell = 0;
  private isValid = false;

  // |querySpec| maps column names to expected type tags; |rowData| is the
  // object the decoded cells of the current row are written into.
  // The constructor immediately tries to move to the first row.
  constructor(querySpec: Row, rowData: Row, res: QueryResultImpl) {
    // Note: the spec must NOT be copied onto |this|. Column names are
    // arbitrary strings: a column named like one of the members of this class
    // (e.g. 'batchIdx' or 'next') would otherwise clobber the iterator state.
    // Avoiding such collisions is the whole reason why the row data lives in a
    // separate object (see RowIteratorImplWithRowData).
    this.rowData = rowData;
    this.rowSpec = {...querySpec}; // ... -> Copy all the key/value pairs.
    this.resultObj = res;
    this.next();
  }

  valid(): boolean {
    return this.isValid;
  }

  private makeError(message: string): QueryError {
    return new QueryError(message, this.resultObj.errorInfo);
  }

  // Returns the value of |columnName| in the current row. Throws a QueryError
  // if |rowData| has no value for that name.
  get(columnName: string): ColumnType {
    const res = this.rowData[columnName];
    if (res === undefined) {
      throw this.makeError(
        `Column '${columnName}' doesn't exist. ` +
          `Actual columns: [${this.columnNames.join(',')}]`,
      );
    }
    return res;
  }

  // Moves the cursor next by one row and updates |isValid|.
  // When this fails to move, two cases are possible:
  // 1. We reached the end of the result set (this is the case if
  //    QueryResult.isComplete() == true when this fails).
  // 2. We reached the end of the current batch, but more rows might come later
  //    (if QueryResult.isComplete() == false).
  next() {
    // At some point we might reach the end of the current batch, but the next
    // batch might be available already. In this case we want next() to
    // transparently move on to the next batch.
    while (this.nextCellTypeOff + this.numColumns > this.cellTypesEnd) {
      // If TraceProcessor is behaving well, we should never end up in a
      // situation where we have leftover cells. TP is expected to serialize
      // whole rows in each QueryResult batch and NOT truncate them midway.
      // If this assert fires the TP RPC logic has a bug.
      assertTrue(
        this.nextCellTypeOff === this.cellTypesEnd || this.cellTypesEnd === -1,
      );
      if (!this.tryMoveToNextBatch()) {
        this.isValid = false;
        return;
      }
    }

    const rowData = this.rowData;
    const numColumns = this.numColumns;

    // Read the current row.
    for (let i = 0; i < numColumns; i++) {
      const cellType = this.batchBytes[this.nextCellTypeOff++];
      const colName = this.columnNames[i];
      const expType = this.rowSpec[colName];

      switch (cellType) {
        case CellType.CELL_NULL:
          rowData[colName] = null;
          break;

        case CellType.CELL_VARINT:
          if (expType === NUM || expType === NUM_NULL) {
            // This is very subtle. The return type of int64 can be either a
            // number or a Long.js {high:number, low:number} if Long.js is
            // installed. The default state seems different in node and browser.
            // We force-disable Long.js support in the top of this source file.
            const val = this.varIntReader.int64();
            rowData[colName] = val as {} as number;
          } else {
            // LONG, LONG_NULL, or unspecified - return as bigint
            const value = decodeInt64Varint(
              this.batchBytes,
              this.varIntReader.pos,
            );
            rowData[colName] = value;
            this.varIntReader.skip(); // Skips a varint
          }
          break;

        case CellType.CELL_FLOAT64:
          rowData[colName] = this.float64Cells[this.nextFloat64Cell++];
          break;

        case CellType.CELL_STRING:
          rowData[colName] = this.stringCells[this.nextStringCell++];
          break;

        case CellType.CELL_BLOB: {
          const blob = this.blobCells[this.nextBlobCell++];
          rowData[colName] = blob;
          break;
        }

        default:
          throw this.makeError(`Invalid cell type ${cellType}`);
      }
    } // For (cells)
    this.isValid = true;
  }

  // Points the iterator to the start of the next batch, if one has been
  // received already, and type-checks all the cells of that batch against
  // |rowSpec|. Returns false if there is no next batch yet or if the next
  // batch is empty. Throws a QueryError if a column of the spec is missing
  // from the result or if a cell has a type incompatible with the spec.
  private tryMoveToNextBatch(): boolean {
    const nextBatchIdx = this.batchIdx + 1;
    if (nextBatchIdx >= this.resultObj.batches.length) {
      return false;
    }

    this.columnNames = this.resultObj.columnNames;
    this.numColumns = this.columnNames.length;

    this.batchIdx = nextBatchIdx;
    const batch = assertExists(this.resultObj.batches[nextBatchIdx]);
    this.batchBytes = batch.batchBytes;
    this.nextCellTypeOff = batch.cellTypesOff;
    this.cellTypesEnd = batch.cellTypesOff + batch.cellTypesLen;
    this.float64Cells = batch.float64Cells;
    this.blobCells = batch.blobCells;
    this.stringCells = batch.stringCells;
    this.varIntReader = protobuf.Reader.create(batch.batchBytes);
    this.varIntReader.pos = batch.varintOff;
    this.varIntReader.len = batch.varintOff + batch.varintLen;
    this.nextFloat64Cell = 0;
    this.nextStringCell = 0;
    this.nextBlobCell = 0;

    // Check that all the expected columns are present.
    for (const expectedCol of Object.keys(this.rowSpec)) {
      if (this.columnNames.indexOf(expectedCol) < 0) {
        throw this.makeError(
          `Column ${expectedCol} not found in the SQL result ` +
            `set {${this.columnNames.join(' ')}}`,
        );
      }
    }

    // Check that the cells types are consistent.
    const numColumns = this.numColumns;
    if (batch.numCells === 0) {
      // This can happen if the query result contains just an error. In this
      // case an empty batch with isLastBatch=true is appended as an EOF marker.
      // In theory TraceProcessor could return an empty batch in the middle and
      // that would be fine from a protocol viewpoint. In practice, no code path
      // does that today so it doesn't make sense trying to support it with a
      // recursive call to tryMoveToNextBatch().
      assertTrue(batch.isLastBatch);
      return false;
    }

    assertTrue(numColumns > 0);
    for (let i = this.nextCellTypeOff; i < this.cellTypesEnd; i++) {
      // |i| is a byte offset within the batch. Turn it into the index of the
      // cell within the batch before deriving the row and the column from it.
      const cellIdx = i - this.nextCellTypeOff;
      const col = cellIdx % numColumns;
      const colName = this.columnNames[col];
      const actualType = this.batchBytes[i] as CellType;
      const expType = this.rowSpec[colName];

      // If undefined, the caller doesn't want to read this column at all, so
      // it can be whatever.
      if (expType === undefined) continue;

      let err = '';
      if (!isCompatible(actualType, expType)) {
        if (actualType === CellType.CELL_NULL) {
          err =
            'SQL value is NULL but that was not expected' +
            ` (expected type: ${columnTypeToString(expType)}). ` +
            'Did you mean NUM_NULL, LONG_NULL, STR_NULL or BLOB_NULL?';
        } else {
          err = `Incompatible cell type. Expected: ${columnTypeToString(
            expType,
          )} actual: ${CELL_TYPE_NAMES[actualType]}`;
        }
      }
      if (err.length > 0) {
        // The row number is relative to the start of the current batch.
        const row = Math.floor(cellIdx / numColumns);
        const message = `Error @ row: ${row} col: '${colName}': ${err}`;
        throw this.makeError(message);
      }
    }
    return true;
  }
}

// This is the object ultimately returned to the client when calling
// QueryResult.iter(...).
// The only reason why this is disjoint from RowIteratorImpl is to avoid
// naming collisions between the members variables required by RowIteratorImpl
// and the column names returned by the iterator.
class RowIteratorImplWithRowData implements RowIteratorBase {
  private _impl: RowIteratorImpl;

  next: () => void;
  valid: () => boolean;
  get: (columnName: string) => ColumnType;

  constructor(querySpec: Row, res: QueryResultImpl) {
    // The columns of the current row are stored as properties of this object:
    // the RowIteratorImpl writes each decoded cell into |thisAsRow|.
    const thisAsRow = this as {} as Row;
    Object.assign(thisAsRow, querySpec);
    this._impl = new RowIteratorImpl(querySpec, thisAsRow, res);
    this.next = this._impl.next.bind(this._impl);
    this.valid = this._impl.valid.bind(this._impl);
    this.get = this._impl.get.bind(this._impl);
  }
}

// This is a proxy object that wraps QueryResultImpl, adding await-ability.
// This is so that:
// 1. Clients that just want to await for the full result set can just call
//    await engine.query('...') and will get a QueryResult that is guaranteed
//    to be complete.
// 2. Clients that know how to handle the streaming can use it straight away.
class WaitableQueryResultImpl
  implements QueryResult, WritableQueryResult, PromiseLike<QueryResult>
{
  private impl: QueryResultImpl;
  private thenCalled = false;

  constructor(errorInfo: QueryErrorInfo) {
    this.impl = new QueryResultImpl(errorInfo);
  }

  // QueryResult implementation. Proxies all calls to the impl object.
1015 iter<T extends Row>(spec: T) { 1016 return this.impl.iter(spec); 1017 } 1018 firstRow<T extends Row>(spec: T) { 1019 return this.impl.firstRow(spec); 1020 } 1021 maybeFirstRow<T extends Row>(spec: T) { 1022 return this.impl.maybeFirstRow(spec); 1023 } 1024 waitAllRows() { 1025 return this.impl.waitAllRows(); 1026 } 1027 waitMoreRows() { 1028 return this.impl.waitMoreRows(); 1029 } 1030 isComplete() { 1031 return this.impl.isComplete(); 1032 } 1033 numRows() { 1034 return this.impl.numRows(); 1035 } 1036 columns() { 1037 return this.impl.columns(); 1038 } 1039 error() { 1040 return this.impl.error(); 1041 } 1042 statementCount() { 1043 return this.impl.statementCount(); 1044 } 1045 statementWithOutputCount() { 1046 return this.impl.statementWithOutputCount(); 1047 } 1048 lastStatementSql() { 1049 return this.impl.lastStatementSql(); 1050 } 1051 1052 // WritableQueryResult implementation. 1053 appendResultBatch(resBytes: Uint8Array) { 1054 return this.impl.appendResultBatch(resBytes); 1055 } 1056 1057 // PromiseLike<QueryResult> implementaton. 1058 1059 // eslint-disable-next-line @typescript-eslint/no-explicit-any 1060 then(onfulfilled: any, onrejected: any): any { 1061 assertFalse(this.thenCalled); 1062 this.thenCalled = true; 1063 return this.impl.ensureAllRowsPromise().then(onfulfilled, onrejected); 1064 } 1065 1066 // eslint-disable-next-line @typescript-eslint/no-explicit-any 1067 catch(error: any): any { 1068 return this.impl.ensureAllRowsPromise().catch(error); 1069 } 1070 1071 // eslint-disable-next-line @typescript-eslint/no-explicit-any 1072 finally(callback: () => void): any { 1073 return this.impl.ensureAllRowsPromise().finally(callback); 1074 } 1075 1076 // eslint and clang-format disagree on how to format get[foo](). 
Let 1077 // clang-format win: 1078 get [Symbol.toStringTag](): string { 1079 return 'Promise<WaitableQueryResult>'; 1080 } 1081} 1082 1083export function createQueryResult( 1084 errorInfo: QueryErrorInfo, 1085): QueryResult & Promise<QueryResult> & WritableQueryResult { 1086 return new WaitableQueryResultImpl(errorInfo); 1087} 1088 1089// Throws if the value cannot be reasonably converted to a bigint. 1090// Assumes value is in native time units. 1091export function timeFromSql(value: ColumnType): time { 1092 if (typeof value === 'bigint') { 1093 return Time.fromRaw(value); 1094 } else if (typeof value === 'number') { 1095 return Time.fromRaw(BigInt(Math.floor(value))); 1096 } else if (value === null) { 1097 return Time.ZERO; 1098 } else { 1099 throw Error(`Refusing to create time from unrelated type ${value}`); 1100 } 1101} 1102 1103// Throws if the value cannot be reasonably converted to a bigint. 1104// Assumes value is in nanoseconds. 1105export function durationFromSql(value: ColumnType): duration { 1106 if (typeof value === 'bigint') { 1107 return value; 1108 } else if (typeof value === 'number') { 1109 return BigInt(Math.floor(value)); 1110 } else if (value === null) { 1111 return Duration.ZERO; 1112 } else { 1113 throw Error(`Refusing to create duration from unrelated type ${value}`); 1114 } 1115} 1116