• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Copyright (C) 2022 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
/**
 * Wraps a Worker and tags every request posted to it with a freshly generated
 * id, remembering the callback registered for that id in `taskMap`.
 */
class DataWorkerThread {
  /** Callbacks keyed by the request id they were registered under. */
  taskMap: any = {};
  worker?: Worker;

  constructor(worker: Worker) {
    this.worker = worker;
  }

  /**
   * Generates a random identifier shaped like `xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx`.
   *
   * Every `0`, `1` and `8` of the template is replaced by a hex digit derived
   * from one random byte; the `4` is left untouched, and the `8` can only turn
   * into `8`, `9`, `a` or `b` because its random part is masked to two bits.
   *
   * @returns the generated identifier
   */
  uuid(): string {
    const template = '10000000-1000-4000-8000-100000000000';
    return template.replace(/[018]/g, (ch: string): string => {
      const digit = Number(ch);
      const randomByte = crypto.getRandomValues(new Uint8Array(1))[0];
      return (digit ^ (randomByte & (15 >> (digit / 4)))).toString(16);
    });
  }

  /**
   * Sends an action name and its arguments to the worker and stores the callback.
   *
   * @param action name of the action the worker should perform
   * @param args arguments forwarded to the worker unchanged
   * @param handler callback stored in `taskMap` under the request id
   */
  queryFunc(action: string, args: any, handler: Function) {
    const requestId = this.uuid();
    this.taskMap[requestId] = handler;
    this.worker!.postMessage({ id: requestId, action, args });
  }
}
40
/**
 * One database worker plus the bookkeeping needed to match its replies to
 * the callbacks that requested them.
 *
 * Requests are posted with a generated id; the callback for each id is kept in
 * `taskMap` until whoever owns the worker's `onmessage` handler dispatches it.
 */
class DbThread {
  /** Set to true whenever a query or cut request is posted. */
  busy: boolean = false;
  isCancelled: boolean = false;
  id: number = -1;
  /** Callbacks keyed by request id. */
  taskMap: any = {};
  cacheArray: Array<any> = [];
  worker?: Worker;

  constructor(worker: Worker) {
    this.worker = worker;
  }

  /**
   * Generates a random identifier shaped like `xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx`.
   *
   * Every `0`, `1` and `8` of the template is replaced by a hex digit derived
   * from one random byte; the `8` can only become `8`, `9`, `a` or `b`.
   *
   * @returns the generated identifier
   */
  uuid(): string {
    const template = '10000000-1000-4000-8000-100000000000';
    return template.replace(/[018]/g, (ch: string): string => {
      const digit = Number(ch);
      const randomByte = crypto.getRandomValues(new Uint8Array(1))[0];
      return (digit ^ (randomByte & (15 >> (digit / 4)))).toString(16);
    });
  }

  /**
   * Posts an SQL request to the worker.
   *
   * @param name name sent along with the request
   * @param sql SQL text
   * @param args value sent as `params`
   * @param handler callback stored under the request id
   * @param action worker action; `'exec'` is used when this is null or empty
   */
  queryFunc(name: string, sql: string, args: any, handler: Function, action: string | null) {
    this.busy = true;
    let id = this.uuid();
    this.taskMap[id] = handler;
    let msg = {
      id: id,
      name: name,
      action: action || 'exec',
      sql: sql,
      params: args,
    };
    this.worker?.postMessage(msg);
  }

  /**
   * Posts an `'exec-proto'` request to the worker.
   *
   * @param name numeric name sent along with the request
   * @param args value sent as `params`
   * @param handler callback stored under the request id
   */
  queryProto(name: number, args: any, handler: Function) {
    this.busy = true;
    let id = this.uuid();
    this.taskMap[id] = handler;
    let msg = {
      id: id,
      name: name,
      action: 'exec-proto',
      params: args,
    };
    this.worker?.postMessage(msg);
  }

  /**
   * Reads the cached trace file (cache key `DbPool.fileCacheKey`) and asks the
   * worker to cut it to the given time range.
   *
   * If the cached file is missing or cannot be read, the handler is called with
   * `false` and a message, and the pending task is discarded, so the caller is
   * never left waiting and the thread does not stay marked busy.
   *
   * @param leftTs left bound passed to the worker
   * @param rightTs right bound passed to the worker
   * @param handler receives the cut status, a message and, on success, the cut buffer
   */
  cutFileByRange(
    leftTs: number,
    rightTs: number,
    handler: (status: boolean, msg: string, splitBuffer?: ArrayBuffer) => void
  ) {
    this.busy = true;
    let id = this.uuid();
    this.taskMap[id] = (res: any) => {
      DbPool.sharedBuffer = res.buffer;
      if (res.cutStatus) {
        handler(res.cutStatus, res.msg, res.cutBuffer);
      } else {
        handler(res.cutStatus, res.msg);
      }
    };
    // Nothing was posted to the worker, so no reply will ever arrive for this id.
    const fail = (msg: string): void => {
      Reflect.deleteProperty(this.taskMap, id);
      this.busy = false;
      handler(false, msg);
    };
    const loadAndSend = async (): Promise<void> => {
      let buffer: ArrayBuffer | undefined;
      try {
        const resData = await caches.match(DbPool.fileCacheKey);
        buffer = resData ? await resData.arrayBuffer() : undefined;
      } catch (err) {
        fail(`failed to read cached trace file: ${String(err)}`);
        return;
      }
      if (!buffer) {
        fail(`cached trace file not found: ${DbPool.fileCacheKey}`);
        return;
      }
      this.worker!.postMessage(
        {
          id: id,
          action: 'cut-file',
          leftTs: leftTs,
          rightTs: rightTs,
          buffer: buffer,
        },
        [buffer]
      );
    };
    void loadAndSend();
  }

  /**
   * Asks the worker to open a database.
   *
   * The buffer, when given, is transferred to the worker; when omitted the
   * transfer list is empty so `postMessage` is not handed an `undefined` entry.
   *
   * The returned promise always resolves (it never rejects). On failure `status`
   * is falsy and `msg` carries the reported message, whether the reply arrives
   * as an object or in the `['error', message]` array form.
   *
   * @param parseConfig sent to the worker as `parseConfig`
   * @param sdkWasmConfig sent to the worker as `wasmConfig`
   * @param buffer optional ArrayBuffer representing an SQLite database file
   */
  dbOpen = async (
    parseConfig: string,
    sdkWasmConfig?: string,
    buffer?: ArrayBuffer
  ): Promise<{
    status: boolean;
    msg: string;
    buffer: ArrayBuffer;
    sdkConfigMap: any;
    fileKey: string;
  }> => {
    return new Promise<any>((resolve) => {
      let id = this.uuid();
      this.taskMap[id] = (res: any) => {
        if (Array.isArray(res)) {
          // Array-form reply, e.g. ['error', message]: keep the message.
          resolve({ status: false, msg: res[1] });
        } else if (res.init) {
          resolve({
            status: res.init,
            msg: res.msg,
            sdkConfigMap: res.configSqlMap,
            buffer: res.buffer,
            fileKey: res.fileKey,
          });
        } else {
          resolve({ status: res.init, msg: res.msg });
        }
      };
      const transfer: Transferable[] = buffer ? [buffer] : [];
      this.worker?.postMessage(
        {
          id: id,
          action: 'open',
          parseConfig: parseConfig,
          wasmConfig: sdkWasmConfig,
          buffer: buffer,
        },
        transfer
      );
    });
  };

  /** Posts a `'reset'` action to the worker; no callback is registered for it. */
  resetWASM() {
    this.worker?.postMessage({
      id: this.uuid(),
      action: 'reset',
    });
  }
}
166
/**
 * Pool of database worker threads.
 *
 * Creates the workers, routes their replies back to the registered callbacks,
 * opens databases on them, keeps a copy of the trace file in Cache Storage and
 * distributes queries over the available threads.
 */
export class DbPool {
  static sharedBuffer: ArrayBuffer | null = null;
  /** Cache Storage key of the current trace file. */
  static fileCacheKey: string = 'null';
  maxThreadNumber: number = 0;
  works: Array<DbThread> = [];
  /** Progress callback, invoked as `progress(message, value)`. */
  progress: Function | undefined | null;
  /** Random base value in 21..30 used for progress reporting. */
  num = Math.floor(Math.random() * 10 + 1) + 20;
  cutDownTimer: any | undefined;
  dataWorker: DataWorkerThread | undefined | null;
  currentWasmThread: DbThread | undefined = undefined;

  /**
   * Resets and terminates existing workers, then creates the pool's threads.
   *
   * @param type `'wasm'`, `'server'` or `'sqlite'`; any other value creates no
   *   thread unless `threadBuild` is given
   * @param threadBuild optional factory used instead of the built-in workers
   */
  init = async (type: string, threadBuild: (() => DbThread) | undefined = undefined) => {
    // wasm | server | sqlite
    if (this.currentWasmThread) {
      this.currentWasmThread.resetWASM();
      this.currentWasmThread = undefined;
    }
    await this.close();
    this.maxThreadNumber = 1;
    for (let i = 0; i < this.maxThreadNumber; i++) {
      let thread: DbThread | undefined;
      if (threadBuild) {
        thread = threadBuild();
      } else if (type === 'wasm') {
        thread = new DbThread(new Worker(new URL('./TraceWorker', import.meta.url)));
      } else if (type === 'server' || type === 'sqlite') {
        thread = new DbThread(new Worker(new URL('./SqlLiteWorker', import.meta.url)));
      }
      if (thread) {
        this.currentWasmThread = thread;
        thread.worker!.onerror = (err) => {
          console.warn(err);
        };
        thread.worker!.onmessageerror = (err) => {
          console.warn(err);
        };
        this.threadPostMessage(thread);
        thread.id = i;
        thread.busy = false;
        this.works.push(thread);
      }
    }
  };

  /**
   * Installs the `onmessage` handler that dispatches worker replies to the
   * callbacks stored in the thread's `taskMap`.
   *
   * Replies whose id is not in `taskMap` are ignored. A missing progress
   * callback is tolerated: progress reporting is simply skipped.
   *
   * @param thread thread whose worker should be wired up
   */
  threadPostMessage(thread: DbThread) {
    thread.worker!.onmessage = (event: MessageEvent) => {
      thread.busy = false;
      const data = event.data;
      if (!Reflect.has(thread.taskMap, data.id)) {
        return;
      }
      const fun = thread.taskMap[data.id];
      if (data.results) {
        // Query results: deliver and forget the task.
        if (fun) {
          fun(data.results, data.len, data.transfer, data.isEmpty);
        }
        Reflect.deleteProperty(thread.taskMap, data.id);
      } else if (Reflect.has(data, 'cutStatus')) {
        // NOTE(review): the task is intentionally kept here, as in the original
        // code; confirm whether the worker can send more than one cut reply.
        if (fun) {
          fun(data);
        }
      } else if (Reflect.has(data, 'ready')) {
        if (this.progress) {
          this.progress('database opened', this.num + data.index);
          this.progressTimer(this.num + data.index, this.progress);
        }
        DbPool.sharedBuffer = null;
      } else if (Reflect.has(data, 'init')) {
        clearInterval(this.cutDownTimer);
        if (!data.init && !data.status) {
          // Failed open is reported to the callback as ['error', message].
          if (fun) {
            fun(['error', data.msg]);
          }
        } else {
          this.progress?.('database ready', 40);
          if (fun) {
            fun(data);
          }
        }
        Reflect.deleteProperty(thread.taskMap, data.id);
      } else {
        // Unrecognised reply: complete the task with an empty result.
        if (fun) {
          fun([]);
        }
        Reflect.deleteProperty(thread.taskMap, data.id);
      }
    };
  }

  /**
   * Downloads a database from `url` and opens it on every thread.
   *
   * @param url location fetched for the database bytes
   * @param progress progress callback, stored on the pool
   * @returns `{ status: true, msg: 'ok' }`, or the first failing open result
   */
  initServer = async (url: string, progress: Function): Promise<{ status: boolean; msg: string }> => {
    this.progress = progress;
    progress('database loaded', 15);
    DbPool.sharedBuffer = await fetch(url).then((res) => res.arrayBuffer());
    progress('open database', 20);
    for (let thread of this.works) {
      let { status, msg } = await thread.dbOpen('');
      if (!status) {
        DbPool.sharedBuffer = null;
        return { status, msg };
      }
    }
    return { status: true, msg: 'ok' };
  };

  /**
   * Opens the given buffer on every thread and records the trace file cache key.
   *
   * When the worker reports a file key other than `'-1'` that key is used;
   * otherwise a `trace/<timestamp>-<byteLength>` key is generated and the buffer
   * is stored in Cache Storage under it (not awaited).
   *
   * @param buf database bytes passed to `dbOpen`
   * @param parseConfig forwarded to `dbOpen`
   * @param sdkWasmConfig forwarded to `dbOpen`
   * @param progress progress callback, stored on the pool
   */
  initSqlite = async (buf: ArrayBuffer, parseConfig: string, sdkWasmConfig: string, progress: Function) => {
    this.progress = progress;
    progress('database loaded', 15);
    DbPool.sharedBuffer = buf;
    progress('parse database', 20);
    let configMap;
    for (let thread of this.works) {
      let { status, msg, buffer, sdkConfigMap, fileKey } = await thread.dbOpen(parseConfig, sdkWasmConfig, buf);
      if (!status) {
        DbPool.sharedBuffer = null;
        return { status, msg };
      } else {
        configMap = sdkConfigMap;
        DbPool.sharedBuffer = buffer;
        if (fileKey !== '-1') {
          DbPool.fileCacheKey = fileKey;
        } else {
          DbPool.fileCacheKey = `trace/${new Date().getTime()}-${buffer.byteLength}`;
          void this.saveTraceFileBuffer(DbPool.fileCacheKey, buffer);
        }
      }
    }
    return { status: true, msg: 'ok', sdkConfigMap: configMap };
  };

  /**
   * Stores a trace file buffer in Cache Storage under `key`.
   *
   * Failures are logged with `console.warn` and not rethrown, so callers that
   * do not await the result cannot trigger an unhandled rejection.
   *
   * @param key cache name and request key to store the buffer under
   * @param buffer bytes to store
   */
  async saveTraceFileBuffer(key: string, buffer: ArrayBuffer): Promise<void> {
    try {
      await this.obligateFileBufferSpace(buffer.byteLength);
      const cache = await caches.open(key);
      const headers = new Headers();
      headers.append('Content-Length', `${buffer.byteLength}`);
      headers.append('Content-Type', 'application/octet-stream');
      await cache.put(
        key,
        new Response(buffer, {
          status: 200,
          headers: headers,
        })
      );
    } catch (err) {
      console.warn(err);
    }
  }

  /**
   * Makes room for `size` bytes: if the estimated free storage (minus a 20 MiB
   * reserve) is smaller than `size`, caches are deleted until enough bytes
   * have been released.
   *
   * Keys containing `/` are ordered by the number before the first `-` of
   * their last path segment. The size credited for a deleted cache is the
   * number after the first `-` of its key; keys without a parseable size are
   * deleted but credited with nothing, so they cannot corrupt the running
   * total.
   *
   * @param size number of bytes that need to fit
   */
  async obligateFileBufferSpace(size: number): Promise<void> {
    const es = await navigator.storage.estimate();
    const remainderByte = (es.quota || 0) - (es.usage || 0) - 20 * 1024 * 1024;
    if (remainderByte >= size) {
      return;
    }
    const timeOf = (key: string): number => {
      const parts = key.split('/');
      return parseInt(parts[parts.length - 1].split('-')[0], 10);
    };
    const keys = [...(await caches.keys())];
    keys.sort((keyA, keyB) => {
      if (keyA.includes('/') && keyB.includes('/')) {
        const diff = timeOf(keyA) - timeOf(keyB);
        return Number.isNaN(diff) ? 0 : diff;
      }
      return 0;
    });
    let needSize = size - remainderByte;
    for (const key of keys) {
      await caches.delete(key);
      const keySize = parseInt(key.split('-')[1], 10);
      if (!Number.isFinite(keySize)) {
        continue;
      }
      if (keySize >= needSize) {
        return;
      }
      needSize -= keySize;
    }
  }

  /** Stops the progress timer, terminates every worker and empties the pool. */
  close = async () => {
    clearInterval(this.cutDownTimer);
    for (let thread of this.works) {
      thread.worker?.terminate();
    }
    this.works.length = 0;
  };

  /**
   * Chooses the thread for the next request: the first one that is not busy,
   * otherwise a random one. Returns undefined when the pool has no threads.
   */
  private pickThread(): DbThread | undefined {
    const idle = this.works.find((it) => !it.busy);
    if (idle) {
      return idle;
    }
    return this.works[Math.floor(Math.random() * this.works.length)];
  }

  /**
   * Runs an SQL request on a pooled thread.
   *
   * @param name name sent along with the request
   * @param sql SQL text
   * @param args query parameters
   * @param handler callback that receives the reply
   * @param action worker action, or null for the thread's default
   * @throws Error when the pool has no worker thread
   */
  submit(name: string, sql: string, args: any, handler: Function, action: string | null) {
    const thread = this.pickThread();
    if (!thread) {
      throw new Error('DbPool: no worker thread available, call init() first');
    }
    thread.queryFunc(name, sql, args, handler, action);
  }

  /**
   * Runs a proto request on a pooled thread; does nothing when the pool is empty.
   *
   * @param name numeric name sent along with the request
   * @param args request parameters
   * @param handler callback that receives the reply
   */
  submitProto(name: number, args: any, handler: Function) {
    this.pickThread()?.queryProto(name, args, handler);
  }

  /**
   * Forwards a task to the data worker; does nothing when none is set.
   * (Newer replacement for `submit()`.)
   */
  submitTask(action: string, args: any, handler: Function) {
    this.dataWorker?.queryFunc(action, args, handler);
  }

  /**
   * Cuts the cached trace file to a time range on a pooled thread.
   * When the pool is empty the handler is called with `false` and a message.
   *
   * @param leftTs left bound of the range
   * @param rightTs right bound of the range
   * @param handler receives the cut status, a message and, on success, the cut buffer
   */
  cutFile(leftTs: number, rightTs: number, handler: (status: boolean, msg: string, splitBuffer?: ArrayBuffer) => void) {
    const thread = this.pickThread();
    if (!thread) {
      handler(false, 'no worker thread available');
      return;
    }
    thread.cutFileByRange(leftTs, rightTs, handler);
  }

  /**
   * Starts an interval that reports slowly increasing progress values.
   *
   * Each tick adds 0, 1 or 2 to the counter. Once the counter reaches 50 the
   * callback is given 40 and the timer stops; otherwise it is given the counter.
   * Any previously running timer is cleared first.
   *
   * @param num starting value of the counter
   * @param progress callback invoked as `progress('database opened', value)`
   */
  progressTimer(num: number, progress: Function) {
    let currentNum = num;
    clearInterval(this.cutDownTimer);
    this.cutDownTimer = setInterval(() => {
      currentNum += Math.floor(Math.random() * 3);
      if (currentNum >= 50) {
        progress('database opened', 40);
        clearInterval(this.cutDownTimer);
      } else {
        progress('database opened', currentNum);
      }
    }, Math.floor(Math.random() * 2500 + 1000));
  }
}
419
/** Module-level pool instance; `query()` submits its requests through it. */
export const threadPool: DbPool = new DbPool();
421
/**
 * Submits an SQL request to the shared thread pool and exposes the reply as a
 * promise.
 *
 * A reply whose first element is the string `'error'` is published on the
 * window as a UI error event (with the second element as payload) and the
 * promise is rejected with the whole reply; any other reply resolves the promise.
 *
 * @param name name passed to the pool along with the request
 * @param sql SQL text
 * @param args query parameters, null by default
 * @param action worker action, null by default
 * @returns promise of the reply rows
 */
export function query<T>(name: string, sql: string, args: any = null, action: string | null = null): Promise<Array<T>> {
  return new Promise<Array<T>>((resolve, reject) => {
    const onReply = (res: any): void => {
      const failed = res[0] && res[0] === 'error';
      if (!failed) {
        resolve(res);
        return;
      }
      window.publish(window.SmartEvent.UI.Error, res[1]);
      reject(res);
    };
    threadPool.submit(name, sql, args, onReply, action);
  });
}
440