/*
 * Copyright (C) 2022 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import { query } from './SqlLite.js';

/**
 * A Worker that tags every request it posts with a generated id and keeps the
 * matching reply handler in `taskMap`, keyed by that id.
 */
class ProcedureThread extends Worker {
  /** Set to true when a request is posted; cleared by the pool's message handlers. */
  busy: boolean = false;
  /** Set to true by `cancel()`. */
  isCancelled: boolean = false;
  /** Index of this worker inside the pool's `works` array (-1 until assigned). */
  id: number = -1;
  /** Pending reply handlers, keyed by request id. */
  taskMap: any = {};
  /** Name used by the pool to address this worker. */
  name: string | undefined;

  /**
   * Generates a request id shaped like a version-4 UUID.
   *
   * The expression below first builds the template string
   * `10000000-1000-4000-8000-100000000000`, then replaces every `0`, `1` and
   * `8` with a hex digit derived from `crypto.getRandomValues`.
   *
   * @returns the generated id
   */
  uuid(): string {
    // @ts-ignore
    return ([1e7] + -1e3 + -4e3 + -8e3 + -1e11).replace(/[018]/g, (c) =>
      (c ^ (crypto.getRandomValues(new Uint8Array(1))[0] & (15 >> (c / 4)))).toString(16)
    );
  }

  /**
   * Posts a request to the worker and registers `handler` for its reply.
   *
   * @param type request type forwarded to the worker
   * @param args payload forwarded to the worker as `params`
   * @param transfer a single transferable, an array of transferables, or a
   *   falsy value when nothing has to be transferred
   * @param handler callback stored in `taskMap` under the generated request id
   * @returns `true` when the message was posted; `false` when posting with a
   *   transfer list failed (the handler is then unregistered again)
   */
  queryFunc(type: string, args: any, transfer: any, handler: Function): boolean {
    const wasBusy = this.busy;
    this.busy = true;
    const requestId = this.uuid();
    this.taskMap[requestId] = handler;
    const message = {
      id: requestId,
      type: type,
      params: args,
    };
    if (!transfer) {
      this.postMessage(message);
      return true;
    }
    try {
      if (!Array.isArray(transfer)) {
        this.postMessage(message, [transfer]);
      } else if (transfer.length > 0) {
        this.postMessage(message, [...transfer]);
      } else {
        this.postMessage(message);
      }
      return true;
    } catch (e: unknown) {
      // The request never reached the worker, so no reply will ever arrive:
      // drop the handler and restore the busy flag instead of leaving the
      // thread marked busy forever.
      Reflect.deleteProperty(this.taskMap, requestId);
      this.busy = wasBusy;
      console.error(`ProcedureThread: failed to post '${type}' request`, e);
      return false;
    }
  }

  /** Marks the thread as cancelled and terminates the underlying worker. */
  cancel() {
    this.isCancelled = true;
    this.terminate();
  }
}

/**
 * Owns the set of ProcedureThread workers and routes requests to them by name.
 */
class ProcedurePool {
  static cpuCount = Math.floor((window.navigator.hardwareConcurrency || 4) / 2);
  maxThreadNumber: number = 1;
  works: Array<ProcedureThread> = [];
  timelineChange: ((a: any) => void) | undefined | null = null;
  // The three lists below are built with length 0, so `names` is empty and
  // `init` creates no ui-worker threads as the code stands.
  cpusLen = ProcedurePool.build('cpu', 0);
  freqLen = ProcedurePool.build('freq', 0);
  processLen = ProcedurePool.build('process', 0);
  logicDataLen = ProcedurePool.build('logic', 2);
  names = [...this.cpusLen, ...this.processLen, ...this.freqLen];
  logicDataHandles = [...this.logicDataLen];

  onComplete: Function | undefined; // task-completion callback

  /**
   * @param threadBuild forwarded to `init`
   */
  constructor(threadBuild: (() => ProcedureThread) | undefined = undefined) {
    this.init(threadBuild);
  }

  /**
   * Builds the list `name0, name1, ..., name{len-1}`.
   *
   * @param name prefix of every entry
   * @param len number of entries
   */
  static build(name: string, len: number): string[] {
    return Array.from({ length: len }, (_, index) => `${name}${index}`);
  }

  /**
   * Creates one ui-worker thread per entry in `names` and one logic-worker
   * thread per entry in `logicDataHandles`.
   *
   * @param threadBuild NOTE(review): accepted but not used by this method.
   */
  init(threadBuild: (() => ProcedureThread) | undefined = undefined) {
    this.maxThreadNumber = this.names.length;
    for (let i = 0; i < this.maxThreadNumber; i++) {
      this.newThread();
    }
    for (let j = 0; j < this.logicDataHandles.length; j++) {
      this.logicDataThread();
    }
  }

  /**
   * Looks up the handler registered for `data.id` on `thread` and invokes it
   * with `data.results` and `data.hover`.
   *
   * @param thread worker whose `taskMap` is consulted
   * @param data the `event.data` payload received from the worker
   * @param done when true, the handler is removed from `taskMap` afterwards
   * @returns true when a task entry existed for `data.id`
   */
  private dispatchResult(thread: ProcedureThread, data: any, done: boolean): boolean {
    if (!Reflect.has(thread.taskMap, data.id) || !data) {
      return false;
    }
    const handler = thread.taskMap[data.id];
    if (handler) {
      handler(data.results, data.hover);
    }
    if (done) {
      Reflect.deleteProperty(thread.taskMap, data.id);
    }
    return true;
  }

  /** Invokes `onComplete` when every thread in the pool is idle. */
  private notifyIfIdle() {
    if (this.isIdle() && this.onComplete) {
      this.onComplete();
    }
  }

  /**
   * Creates a ui-worker thread, wires its message handlers and adds it to
   * `works`. Does nothing when `window.useWb` is set.
   *
   * @returns the new thread, or undefined when `window.useWb` is set
   */
  newThread() {
    // @ts-ignore
    if (window.useWb) {
      return;
    }
    const worker: ProcedureThread = new ProcedureThread('trace/database/ui-worker/ProcedureWorker.js', {
      type: 'module',
    });
    worker.name = this.names[this.works.length];
    worker.onmessage = (event: MessageEvent) => {
      worker.busy = false;
      if ((event.data.type as string) === 'timeline-range-changed') {
        // Range notifications go to the timeline callback and complete no task.
        this.timelineChange?.(event.data.results);
        return;
      }
      this.dispatchResult(worker, event.data, true);
      this.notifyIfIdle();
    };
    worker.onmessageerror = (e) => {};
    worker.onerror = (e) => {};
    worker.id = this.works.length;
    worker.busy = false;
    this.works.push(worker);
    return worker;
  }

  /**
   * Creates a logic-worker thread, wires its message handlers and adds it to
   * `works`. Does nothing when `window.useWb` is set.
   *
   * Messages flagged `isQuery` are answered by running `query` and posting
   * the result back to the worker. Messages flagged `isSending` invoke the
   * handler but keep it registered. Any other message invokes the handler
   * and removes it.
   *
   * @returns the new thread, or undefined when `window.useWb` is set
   */
  logicDataThread() {
    // @ts-ignore
    if (window.useWb) {
      return;
    }
    const worker: ProcedureThread = new ProcedureThread('trace/database/logic-worker/ProcedureLogicWorker.js', {
      type: 'module',
    });
    // Logic threads are appended after the ui-worker threads, hence the offset.
    worker.name = this.logicDataHandles[this.works.length - this.names.length];
    worker.onmessage = (event: MessageEvent) => {
      const data = event.data;
      worker.busy = false;
      if (data.isQuery) {
        // NOTE(review): a rejected query is not handled here, so the worker
        // would receive no reply in that case.
        query(data.type, data.sql, data.args, 'exec-buf').then((res: any) => {
          worker.postMessage({
            type: data.type,
            params: {
              list: res,
            },
            id: data.id,
          });
        });
        return;
      }
      // Partial result: call the handler but keep it for later messages.
      if (data.isSending && this.dispatchResult(worker, data, false)) {
        return;
      }
      this.dispatchResult(worker, data, true);
      this.notifyIfIdle();
    };
    worker.onmessageerror = (e) => {};
    worker.onerror = (e) => {};
    worker.id = this.works.length;
    worker.busy = false;
    this.works.push(worker);
    return worker;
  }

  /** Terminates every worker and empties the pool. */
  close = () => {
    for (const worker of this.works) {
      worker.terminate();
    }
    this.works.length = 0;
  };

  /** Posts a `clear` request to every worker in the pool. */
  clearCache = () => {
    for (const worker of this.works) {
      worker.queryFunc('clear', {}, undefined, () => {});
    }
  };

  /**
   * Sends a request to the first worker whose name equals `name`.
   * The worker is selected by name only; its busy state is not checked.
   *
   * @param name name of the target worker
   * @param type request type
   * @param args request payload
   * @param transfer transferable(s) passed to `queryFunc`
   * @param handler callback invoked with the worker's reply
   * @returns the worker the request was sent to, or undefined when no worker
   *   has that name
   */
  submitWithName(name: string, type: string, args: any, transfer: any, handler: Function): ProcedureThread | undefined {
    const target = this.works.find((it) => it.name === name);
    target?.queryFunc(type, args, transfer, handler);
    return target;
  }

  /**
   * Promise flavour of `submitWithName`.
   *
   * @param name name of the target worker
   * @param type request type
   * @param args request payload
   * @param transfer transferable(s) passed to `queryFunc`
   * @returns a promise resolved with `{ res, hover }` from the worker's reply;
   *   rejected when no worker has that name or the request could not be posted
   */
  submitWithNamePromise(name: string, type: string, args: any, transfer: any): Promise<any> {
    return new Promise((resolve, reject) => {
      const target = this.works.find((it) => it.name === name);
      if (!target) {
        // Settle the promise instead of leaving the caller waiting forever.
        reject(new Error(`ProcedurePool: no worker named '${name}'`));
        return;
      }
      const posted = target.queryFunc(type, args, transfer, (res: any, hover: any) => {
        resolve({
          res: res,
          hover: hover,
        });
      });
      if (!posted) {
        reject(new Error(`ProcedurePool: failed to post '${type}' to worker '${name}'`));
      }
    });
  }

  /** @returns true when no worker in the pool is marked busy */
  isIdle() {
    return this.works.every((it) => !it.busy);
  }
}

export const procedurePool = new ProcedurePool();