1/* 2 * Copyright (C) 2022 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15class DataWorkerThread { 16 taskMap: any = {}; 17 worker?: Worker; 18 constructor(worker: Worker) { 19 this.worker = worker; 20 } 21 uuid(): string { 22 // @ts-ignore 23 return ([1e7] + -1e3 + -4e3 + -8e3 + -1e11).replace(/[018]/g, (c: any) => 24 (c ^ (crypto.getRandomValues(new Uint8Array(1))[0] & (15 >> (c / 4)))).toString(16) 25 ); 26 } 27 28 //发送方法名 参数 回调 29 queryFunc(action: string, args: any, handler: Function) { 30 let id = this.uuid(); 31 this.taskMap[id] = handler; 32 let msg = { 33 id: id, 34 action: action, 35 args: args, 36 }; 37 this.worker!.postMessage(msg); 38 } 39} 40 41class DbThread { 42 busy: boolean = false; 43 isCancelled: boolean = false; 44 id: number = -1; 45 taskMap: any = {}; 46 cacheArray: Array<any> = []; 47 worker?: Worker; 48 49 constructor(worker: Worker) { 50 this.worker = worker; 51 } 52 53 uuid(): string { 54 // @ts-ignore 55 return ([1e7] + -1e3 + -4e3 + -8e3 + -1e11).replace(/[018]/g, (c: any) => 56 (c ^ (crypto.getRandomValues(new Uint8Array(1))[0] & (15 >> (c / 4)))).toString(16) 57 ); 58 } 59 60 queryFunc(name: string, sql: string, args: any, handler: Function, action: string | null) { 61 this.busy = true; 62 let id = this.uuid(); 63 this.taskMap[id] = handler; 64 let msg = { 65 id: id, 66 name: name, 67 action: action || 'exec', 68 sql: sql, 69 params: args, 70 }; 71 this.worker?.postMessage(msg); 72 } 73 74 queryProto(name: number, args: any, handler: Function) { 75 this.busy = true; 76 let id = this.uuid(); 77 this.taskMap[id] = handler; 78 let msg = { 79 id: id, 80 name: name, 81 action: 'exec-proto', 82 params: args, 83 }; 84 this.worker?.postMessage(msg); 85 } 86 87 cutFileByRange( 88 leftTs: number, 89 rightTs: number, 90 handler: (status: boolean, msg: string, splitBuffer?: ArrayBuffer) => void 91 ) { 92 this.busy = true; 93 let id = this.uuid(); 94 this.taskMap[id] = (res: any) => { 95 DbPool.sharedBuffer = res.buffer; 96 if (res.cutStatus) { 97 handler(res.cutStatus, res.msg, res.cutBuffer); 98 } else { 99 handler(res.cutStatus, res.msg); 100 } 101 }; 102 caches.match(DbPool.fileCacheKey).then((resData) => { 103 if (resData) { 104 resData.arrayBuffer().then((buffer) => { 105 this.worker!.postMessage( 106 { 107 id: id, 108 action: 'cut-file', 109 leftTs: leftTs, 110 rightTs: rightTs, 111 buffer: buffer!, 112 }, 113 [buffer!] 114 ); 115 }); 116 } 117 }); 118 } 119 120 dbOpen = async ( 121 parseConfig: string, 122 sdkWasmConfig?: string, 123 buffer?: ArrayBuffer 124 ): Promise<{ 125 status: boolean; 126 msg: string; 127 buffer: ArrayBuffer; 128 sdkConfigMap: any; 129 fileKey: string; 130 }> => { 131 return new Promise<any>((resolve, reject) => { 132 let id = this.uuid(); 133 this.taskMap[id] = (res: any) => { 134 if (res.init) { 135 resolve({ 136 status: res.init, 137 msg: res.msg, 138 sdkConfigMap: res.configSqlMap, 139 buffer: res.buffer, 140 fileKey: res.fileKey, 141 }); 142 } else { 143 resolve({ status: res.init, msg: res.msg }); 144 } 145 }; 146 this.worker?.postMessage( 147 { 148 id: id, 149 action: 'open', 150 parseConfig: parseConfig, 151 wasmConfig: sdkWasmConfig, 152 buffer: buffer! /*Optional. An ArrayBuffer representing an SQLite Database file*/, 153 }, 154 [buffer!] 155 ); 156 }); 157 }; 158 159 resetWASM() { 160 this.worker?.postMessage({ 161 id: this.uuid(), 162 action: 'reset', 163 }); 164 } 165} 166 167export class DbPool { 168 static sharedBuffer: ArrayBuffer | null = null; 169 static fileCacheKey: string = 'null'; 170 maxThreadNumber: number = 0; 171 works: Array<DbThread> = []; 172 progress: Function | undefined | null; 173 num = Math.floor(Math.random() * 10 + 1) + 20; 174 cutDownTimer: any | undefined; 175 dataWorker: DataWorkerThread | undefined | null; 176 currentWasmThread: DbThread | undefined = undefined; 177 178 init = async (type: string, threadBuild: (() => DbThread) | undefined = undefined) => { 179 // wasm | server | sqlite 180 if (this.currentWasmThread) { 181 this.currentWasmThread.resetWASM(); 182 this.currentWasmThread = undefined; 183 } 184 await this.close(); 185 this.maxThreadNumber = 1; 186 for (let i = 0; i < this.maxThreadNumber; i++) { 187 let thread: DbThread | undefined; 188 if (threadBuild) { 189 thread = threadBuild(); 190 } else { 191 if (type === 'wasm') { 192 thread = new DbThread(new Worker(new URL('./TraceWorker', import.meta.url))); 193 } else if (type === 'server') { 194 thread = new DbThread(new Worker(new URL('./SqlLiteWorker', import.meta.url))); 195 } else if (type === 'sqlite') { 196 thread = new DbThread(new Worker(new URL('./SqlLiteWorker', import.meta.url))); 197 } 198 } 199 if (thread) { 200 this.currentWasmThread = thread; 201 thread!.worker!.onerror = (err) => { 202 console.warn(err); 203 }; 204 thread!.worker!.onmessageerror = (err) => { 205 console.warn(err); 206 }; 207 this.threadPostMessage(thread); 208 thread!.id = i; 209 thread!.busy = false; 210 this.works?.push(thread!); 211 } 212 } 213 }; 214 threadPostMessage(thread: DbThread) { 215 thread!.worker!.onmessage = (event: MessageEvent) => { 216 thread!.busy = false; 217 if (Reflect.has(thread!.taskMap, event.data.id)) { 218 if (event.data.results) { 219 let fun = thread!.taskMap[event.data.id]; 220 if (fun) { 221 fun(event.data.results, event.data.len, event.data.transfer, event.data.isEmpty); 222 } 223 Reflect.deleteProperty(thread!.taskMap, event.data.id); 224 } else if (Reflect.has(event.data, 'cutStatus')) { 225 let fun = thread!.taskMap[event.data.id]; 226 if (fun) { 227 fun(event.data); 228 } 229 } else if (Reflect.has(event.data, 'ready')) { 230 this.progress!('database opened', this.num + event.data.index); 231 this.progressTimer(this.num + event.data.index, this.progress!); 232 DbPool.sharedBuffer = null; 233 } else if (Reflect.has(event.data, 'init')) { 234 if (this.cutDownTimer != undefined) { 235 clearInterval(this.cutDownTimer); 236 } 237 let fun = thread!.taskMap[event.data.id]; 238 if (!event.data.init && !event.data.status) { 239 if (fun) { 240 fun(['error', event.data.msg]); 241 } 242 } else { 243 this.progress!('database ready', 40); 244 if (fun) { 245 fun(event.data); 246 } 247 } 248 Reflect.deleteProperty(thread!.taskMap, event.data.id); 249 } else { 250 let fun = thread!.taskMap[event.data.id]; 251 if (fun) { 252 fun([]); 253 } 254 Reflect.deleteProperty(thread!.taskMap, event.data.id); 255 } 256 } 257 }; 258 } 259 260 initServer = async (url: string, progress: Function): Promise<{ status: boolean; msg: string }> => { 261 this.progress = progress; 262 progress('database loaded', 15); 263 DbPool.sharedBuffer = await fetch(url).then((res) => res.arrayBuffer()); 264 progress('open database', 20); 265 for (let thread of this.works) { 266 let { status, msg } = await thread.dbOpen(''); 267 if (!status) { 268 DbPool.sharedBuffer = null; 269 return { status, msg }; 270 } 271 } 272 return { status: true, msg: 'ok' }; 273 }; 274 initSqlite = async (buf: ArrayBuffer, parseConfig: string, sdkWasmConfig: string, progress: Function) => { 275 this.progress = progress; 276 progress('database loaded', 15); 277 DbPool.sharedBuffer = buf; 278 progress('parse database', 20); 279 let configMap; 280 for (let thread of this.works) { 281 let { status, msg, buffer, sdkConfigMap, fileKey } = await thread.dbOpen(parseConfig, sdkWasmConfig, buf); 282 if (!status) { 283 DbPool.sharedBuffer = null; 284 return { status, msg }; 285 } else { 286 configMap = sdkConfigMap; 287 DbPool.sharedBuffer = buffer; 288 if (fileKey !== '-1') { 289 DbPool.fileCacheKey = fileKey; 290 } else { 291 DbPool.fileCacheKey = `trace/${new Date().getTime()}-${buffer.byteLength}`; 292 this.saveTraceFileBuffer(DbPool.fileCacheKey, buffer).then(); 293 } 294 } 295 } 296 return { status: true, msg: 'ok', sdkConfigMap: configMap }; 297 }; 298 299 async saveTraceFileBuffer(key: string, buffer: ArrayBuffer): Promise<void> { 300 await this.obligateFileBufferSpace(buffer.byteLength); 301 caches.open(key).then((cache) => { 302 let headers = new Headers(); 303 headers.append('Content-Length', `${buffer.byteLength}`); 304 headers.append('Content-Type', 'application/octet-stream'); 305 cache 306 .put( 307 key, 308 new Response(buffer, { 309 status: 200, 310 headers: headers, 311 }) 312 ) 313 .then(); 314 }); 315 } 316 317 /** 318 * 计算预留缓存空间,如果空间不够,则删除部分缓存 319 * @param size 320 */ 321 async obligateFileBufferSpace(size: number): Promise<void> { 322 let es = await navigator.storage.estimate(); 323 let remainderByte = (es.quota || 0) - (es.usage || 0) - 20 * 1024 * 1024; 324 if (remainderByte < size) { 325 let keys = await caches.keys(); 326 keys.sort((keyA, keyB) => { 327 if (keyA.includes('/') && keyB.includes('/')) { 328 let splitA = keyA.split('/'); 329 let splitB = keyB.split('/'); 330 let timeA = splitA[splitA.length - 1].split('-')[0]; 331 let timeB = splitB[splitB.length - 1].split('-')[0]; 332 return parseInt(timeA) - parseInt(timeB); 333 } else { 334 return 0; 335 } 336 }); 337 let needSize = size - remainderByte; 338 for (let key of keys) { 339 await caches.delete(key); 340 let keySize = parseInt(key.split('-')[1]); 341 if (keySize > needSize) { 342 return; 343 } else { 344 needSize -= keySize; 345 } 346 } 347 } 348 } 349 350 close = async () => { 351 clearInterval(this.cutDownTimer); 352 for (let thread of this.works) { 353 thread.worker?.terminate(); 354 } 355 this.works.length = 0; 356 }; 357 358 submit(name: string, sql: string, args: any, handler: Function, action: string | null) { 359 let noBusyThreads = this.works.filter((it) => !it.busy); 360 let thread: DbThread; 361 if (noBusyThreads.length > 0) { 362 //取第一个空闲的线程进行任务 363 thread = noBusyThreads[0]; 364 thread.queryFunc(name, sql, args, handler, action); 365 } else { 366 // 随机插入一个线程中 367 thread = this.works[Math.floor(Math.random() * this.works.length)]; 368 thread.queryFunc(name, sql, args, handler, action); 369 } 370 } 371 372 submitProto(name: number, args: any, handler: Function) { 373 let noBusyThreads = this.works.filter((it) => !it.busy); 374 let thread: DbThread; 375 if (noBusyThreads.length > 0) { 376 //取第一个空闲的线程进行任务 377 thread = noBusyThreads[0]; 378 thread.queryProto(name, args, handler); 379 } else { 380 // 随机插入一个线程中 381 thread = this.works[Math.floor(Math.random() * this.works.length)]; 382 if (thread) { 383 thread.queryProto(name, args, handler); 384 } 385 } 386 } 387 388 //new method replace submit() method 389 submitTask(action: string, args: any, handler: Function) { 390 this.dataWorker?.queryFunc(action, args, handler); 391 } 392 393 cutFile(leftTs: number, rightTs: number, handler: (status: boolean, msg: string, splitBuffer?: ArrayBuffer) => void) { 394 let noBusyThreads = this.works.filter((it) => !it.busy); 395 let thread: DbThread; 396 if (noBusyThreads.length > 0) { 397 thread = noBusyThreads[0]; 398 thread.cutFileByRange(leftTs, rightTs, handler); 399 } else { 400 thread = this.works[Math.floor(Math.random() * this.works.length)]; 401 thread.cutFileByRange(leftTs, rightTs, handler); 402 } 403 } 404 405 progressTimer(num: number, progress: Function) { 406 let currentNum = num; 407 clearInterval(this.cutDownTimer); 408 this.cutDownTimer = setInterval(() => { 409 currentNum += Math.floor(Math.random() * 3); 410 if (currentNum >= 50) { 411 progress('database opened', 40); 412 clearInterval(this.cutDownTimer); 413 } else { 414 progress('database opened', currentNum); 415 } 416 }, Math.floor(Math.random() * 2500 + 1000)); 417 } 418} 419 420export const threadPool = new DbPool(); 421 422export function query<T>(name: string, sql: string, args: any = null, action: string | null = null): Promise<Array<T>> { 423 return new Promise<Array<T>>((resolve, reject) => { 424 threadPool.submit( 425 name, 426 sql, 427 args, 428 (res: any) => { 429 if (res[0] && res[0] === 'error') { 430 window.publish(window.SmartEvent.UI.Error, res[1]); 431 reject(res); 432 } else { 433 resolve(res); 434 } 435 }, 436 action 437 ); 438 }); 439} 440