// Copyright (C) 2024 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

import {assertUnreachable} from '../base/logging';
import {getOrCreate} from '../base/utils';
import {checkExtends, ColumnType, SqlValue, unionTypes} from './query_result';
import {sqlValueToSqliteString} from './sql_utils';

/**
 * A dataset defines a set of rows in TraceProcessor and a schema of the
 * resultant columns. Dataset implementations describe how to get the data in
 * different ways - e.g. 'source' datasets define a dataset as a table name (or
 * select statement) + filters, whereas a 'union' dataset defines a dataset as
 * the union of other datasets.
 *
 * The idea is that users can build arbitrarily complex trees of datasets, then
 * at any point call `optimize()` to create the smallest possible tree that
 * represents the same dataset, and `query()` which produces a select statement
 * for the resultant dataset.
 *
 * Users can also use the `schema` property and `implements()` to get and test
 * the schema of a given dataset.
 */
export interface Dataset<T extends DatasetSchema = DatasetSchema> {
  /**
   * Get or calculate the resultant schema of this dataset.
   */
  readonly schema: T;

  /**
   * Produce a select statement for this dataset.
   *
   * @param schema - The schema to use for extracting columns - if undefined,
   * the most specific possible schema is evaluated from the dataset first and
   * used instead.
   */
  query(schema?: DatasetSchema): string;

  /**
   * Optimizes a dataset into the smallest possible expression.
   *
   * For example, by combining members of union datasets that have the same
   * src and similar filters into a single set.
   *
   * For example, the following 'union' dataset...
   *
   * ```
   * {
   *   union: [
   *     {
   *       src: 'foo',
   *       schema: {
   *         'a': NUM,
   *         'b': NUM,
   *       },
   *       filter: {col: 'a', eq: 1},
   *     },
   *     {
   *       src: 'foo',
   *       schema: {
   *         'a': NUM,
   *         'b': NUM,
   *       },
   *       filter: {col: 'a', eq: 2},
   *     },
   *   ]
   * }
   * ```
   *
   * ...will be combined into a single 'source' dataset...
   *
   * ```
   * {
   *   src: 'foo',
   *   schema: {
   *     'a': NUM,
   *     'b': NUM,
   *   },
   *   filter: {col: 'a', in: [1, 2]},
   * },
   * ```
   */
  optimize(): Dataset<T>;

  /**
   * Returns true if this dataset implements a given schema, i.e. it provides
   * every required column with a compatible type.
   *
   * @param schema - The schema to test against.
   */
  implements(schema: DatasetSchema): boolean;
}

/**
 * Defines a list of columns and types that define the shape of the data
 * represented by a dataset.
 */
export type DatasetSchema = Readonly<Record<string, ColumnType>>;

/**
 * A filter used to express that a column must equal a single value.
 */
interface EqFilter {
  readonly col: string;
  readonly eq: SqlValue;
}

/**
 * A filter used to express that a column must be one of a set of values.
 */
interface InFilter {
  readonly col: string;
  readonly in: ReadonlyArray<SqlValue>;
}

/**
 * Union of all supported filter types.
 */
type Filter = EqFilter | InFilter;

/**
 * Named arguments for a SourceDataset.
133 */ 134interface SourceDatasetConfig<T extends DatasetSchema> { 135 readonly src: string; 136 readonly schema: T; 137 readonly filter?: Filter; 138} 139 140/** 141 * Defines a dataset with a source SQL select statement of table name, a 142 * schema describing the columns, and an optional filter. 143 */ 144export class SourceDataset<T extends DatasetSchema = DatasetSchema> 145 implements Dataset<T> 146{ 147 readonly src: string; 148 readonly schema: T; 149 readonly filter?: Filter; 150 151 constructor(config: SourceDatasetConfig<T>) { 152 this.src = config.src; 153 this.schema = config.schema; 154 this.filter = config.filter; 155 } 156 157 query(schema?: DatasetSchema) { 158 schema = schema ?? this.schema; 159 const cols = Object.keys(schema); 160 const selectSql = `select ${cols.join(', ')} from (${this.src})`; 161 const filterSql = this.filterQuery(); 162 if (filterSql === undefined) { 163 return selectSql; 164 } 165 return `${selectSql} where ${filterSql}`; 166 } 167 168 optimize() { 169 // Cannot optimize SourceDataset 170 return this; 171 } 172 173 implements(required: DatasetSchema) { 174 return Object.entries(required).every(([name, required]) => { 175 return name in this.schema && checkExtends(required, this.schema[name]); 176 }); 177 } 178 179 // Convert filter to a SQL expression (without the where clause), or undefined 180 // if we have no filter. 181 private filterQuery() { 182 if (!this.filter) return undefined; 183 184 if ('eq' in this.filter) { 185 return `${this.filter.col} = ${sqlValueToSqliteString(this.filter.eq)}`; 186 } else if ('in' in this.filter) { 187 return `${this.filter.col} in (${sqlValueToSqliteString(this.filter.in)})`; 188 } else { 189 assertUnreachable(this.filter); 190 } 191 } 192} 193 194/** 195 * Maximum number of sub-queries to include in a single union statement 196 * to avoid hitting SQLite limits. 
197 * See: https://www.sqlite.org/limits.html#max_compound_select 198 */ 199const MAX_SUBQUERIES_PER_UNION = 500; 200 201/** 202 * A dataset that represents the union of multiple datasets. 203 */ 204export class UnionDataset implements Dataset { 205 constructor(readonly union: ReadonlyArray<Dataset>) {} 206 207 get schema(): DatasetSchema { 208 // Find the minimal set of columns that are supported by all datasets of 209 // the union 210 let unionSchema: Record<string, ColumnType> | undefined = undefined; 211 this.union.forEach((ds) => { 212 const dsSchema = ds.schema; 213 if (unionSchema === undefined) { 214 // First time just use this one 215 unionSchema = dsSchema; 216 } else { 217 const newSch: Record<string, ColumnType> = {}; 218 for (const [key, value] of Object.entries(unionSchema)) { 219 if (key in dsSchema) { 220 const commonType = unionTypes(value, dsSchema[key]); 221 if (commonType !== undefined) { 222 newSch[key] = commonType; 223 } 224 } 225 } 226 unionSchema = newSch; 227 } 228 }); 229 return unionSchema ?? {}; 230 } 231 232 query(schema?: DatasetSchema): string { 233 schema = schema ?? this.schema; 234 const subQueries = this.union.map((dataset) => dataset.query(schema)); 235 236 // If we have a small number of sub-queries, just use a single union all. 237 if (subQueries.length <= MAX_SUBQUERIES_PER_UNION) { 238 return subQueries.join('\nunion all\n'); 239 } 240 241 // Handle large number of sub-queries by batching into multiple CTEs. 242 let sql = 'with\n'; 243 const cteNames: string[] = []; 244 245 // Create CTEs for batches of sub-queries 246 for (let i = 0; i < subQueries.length; i += MAX_SUBQUERIES_PER_UNION) { 247 const batch = subQueries.slice(i, i + MAX_SUBQUERIES_PER_UNION); 248 const cteName = `union_batch_${Math.floor(i / MAX_SUBQUERIES_PER_UNION)}`; 249 cteNames.push(cteName); 250 251 sql += `${cteName} as (\n${batch.join('\nunion all\n')}\n)`; 252 253 // Add comma unless this is the last CTE. 
254 if (i + MAX_SUBQUERIES_PER_UNION < subQueries.length) { 255 sql += ',\n'; 256 } 257 } 258 259 const cols = Object.keys(schema); 260 261 // Union all the CTEs together in the final query. 262 sql += '\n'; 263 sql += cteNames 264 .map((name) => `select ${cols.join(',')} from ${name}`) 265 .join('\nunion all\n'); 266 267 return sql; 268 } 269 270 optimize(): Dataset { 271 // Recursively optimize each dataset of this union 272 const optimizedUnion = this.union.map((ds) => ds.optimize()); 273 274 // Find all source datasets and combine then based on src 275 const combinedSrcSets = new Map<string, SourceDataset[]>(); 276 const otherDatasets: Dataset[] = []; 277 for (const e of optimizedUnion) { 278 if (e instanceof SourceDataset) { 279 const set = getOrCreate(combinedSrcSets, e.src, () => []); 280 set.push(e); 281 } else { 282 otherDatasets.push(e); 283 } 284 } 285 286 const mergedSrcSets = Array.from(combinedSrcSets.values()).map( 287 (srcGroup) => { 288 if (srcGroup.length === 1) return srcGroup[0]; 289 290 // Combine schema across all members in the union 291 const combinedSchema = srcGroup.reduce((acc, e) => { 292 Object.assign(acc, e.schema); 293 return acc; 294 }, {} as DatasetSchema); 295 296 // Merge filters for the same src 297 const inFilters: InFilter[] = []; 298 for (const {filter} of srcGroup) { 299 if (filter) { 300 if ('eq' in filter) { 301 inFilters.push({col: filter.col, in: [filter.eq]}); 302 } else { 303 inFilters.push(filter); 304 } 305 } 306 } 307 308 const mergedFilter = mergeFilters(inFilters); 309 return new SourceDataset({ 310 src: srcGroup[0].src, 311 schema: combinedSchema, 312 filter: mergedFilter, 313 }); 314 }, 315 ); 316 317 const finalUnion = [...mergedSrcSets, ...otherDatasets]; 318 319 if (finalUnion.length === 1) { 320 return finalUnion[0]; 321 } else { 322 return new UnionDataset(finalUnion); 323 } 324 } 325 326 implements(required: DatasetSchema) { 327 return Object.entries(required).every(([name, required]) => { 328 return name 
in this.schema && checkExtends(required, this.schema[name]); 329 }); 330 } 331} 332 333function mergeFilters(filters: InFilter[]): InFilter | undefined { 334 if (filters.length === 0) return undefined; 335 const col = filters[0].col; 336 const values = new Set(filters.flatMap((filter) => filter.in)); 337 return {col, in: Array.from(values)}; 338} 339