/*
 * Copyright (C) 2022 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import { query } from './SqlLite';

/**
 * Pairs a single Web Worker with the bookkeeping needed to route the
 * worker's replies back to the callback that issued each request.
 */
class ProcedureThread {
  /** Set to true when a request is posted; cleared by the pool's message handlers. */
  busy: boolean = false;
  /** Set to true by {@link cancel}. */
  isCancelled: boolean = false;
  /** Index assigned by the pool (position in `ProcedurePool.works` at creation time). */
  id: number = -1;
  /** Pending callbacks keyed by the request id generated in {@link queryFunc}. */
  taskMap: any = {};
  /** Name used by the pool to address this thread. */
  name: string | undefined;
  worker?: Worker;

  constructor(worker: Worker) {
    this.worker = worker;
  }

  /**
   * Builds a request id shaped like `10000000-1000-4000-8000-100000000000`,
   * where every `0`, `1` and `8` of that template is replaced by a hex digit
   * derived from `crypto.getRandomValues`.
   */
  uuid(): string {
    // @ts-ignore
    return ([1e7] + -1e3 + -4e3 + -8e3 + -1e11).replace(/[018]/g, (c) =>
      (c ^ (crypto.getRandomValues(new Uint8Array(1))[0] & (15 >> (c / 4)))).toString(16)
    );
  }

  /**
   * Posts a request `{ id, type, params }` to the worker and registers
   * `handler` under the generated id so the reply can be dispatched to it.
   *
   * @param type - Request type forwarded to the worker.
   * @param args - Payload forwarded as `params`.
   * @param transfer - Optional transferable, or array of transferables, handed to `postMessage`.
   * @param handler - Callback stored in `taskMap` for this request.
   * @returns `true` when the message was posted; `false` when posting with a
   *   transfer list failed (the failure is logged and the request is dropped).
   * @throws Whatever `postMessage` throws when no `transfer` was supplied.
   */
  queryFunc(type: string, args: any, transfer: any, handler: Function): boolean {
    const wasBusy = this.busy;
    const id = this.uuid();
    const pam = {
      id: id,
      type: type,
      params: args,
    };
    this.busy = true;
    this.taskMap[id] = handler;
    try {
      if (transfer) {
        const transferList = Array.isArray(transfer) ? transfer : [transfer];
        if (transferList.length > 0) {
          this.worker!.postMessage(pam, transferList);
        } else {
          this.worker!.postMessage(pam);
        }
      } else {
        this.worker!.postMessage(pam);
      }
      return true;
    } catch (e: unknown) {
      // The message never reached the worker, so no reply will ever clear this
      // request: undo the bookkeeping, otherwise the thread stays "busy" and
      // the handler stays in taskMap indefinitely.
      Reflect.deleteProperty(this.taskMap, id);
      this.busy = wasBusy;
      if (!transfer) {
        // Without a transfer list a failure has always propagated to the caller.
        throw e;
      }
      // With a transfer list the failure is treated as best-effort: report it
      // instead of throwing.
      console.warn(`ProcedureThread "${this.name}": failed to post "${type}" request`, e);
      return false;
    }
  }

  /** Marks the thread as cancelled and terminates its worker. */
  cancel() {
    this.isCancelled = true;
    this.worker!.terminate();
  }
}

/**
 * Owns a set of {@link ProcedureThread}s, addresses them by name and routes
 * worker replies to the callbacks registered for each request.
 */
class ProcedurePool {
  static cpuCount = Math.floor((window.navigator.hardwareConcurrency || 4) / 2);
  maxThreadNumber: number = 1;
  works: Array<ProcedureThread> = [];
  /** Invoked with `results` when a UI worker posts a 'timeline-range-changed' message. */
  timelineChange: ((a: any) => void) | undefined | null = null;
  cpusLen = ProcedurePool.build('cpu', 0);
  freqLen = ProcedurePool.build('freq', 0);
  processLen = ProcedurePool.build('process', 0);
  logicDataLen = ProcedurePool.build('logic', 2);
  /** Names assigned, in order, to the threads created by {@link newThread}. */
  names = [...this.cpusLen, ...this.processLen, ...this.freqLen];
  /** Names assigned, in order, to the threads created by `logicDataThread`. */
  logicDataHandles = [...this.logicDataLen];

  /** Task-completion callback: called after a final reply when every thread is idle. */
  onComplete: Function | undefined;

  /**
   * @param threadBuild - Accepted and forwarded to {@link init}, which currently does not use it.
   */
  constructor(threadBuild: (() => ProcedureThread) | undefined = undefined) {
    this.init(threadBuild);
  }

  /**
   * Produces `len` names of the form `${name}0`, `${name}1`, ...
   */
  static build(name: string, len: number) {
    return Array.from({ length: len }, (_, index) => `${name}${index}`);
  }

  /**
   * Creates one UI worker thread per entry in `names` and one logic worker
   * thread per entry in `logicDataHandles`.
   *
   * @param threadBuild - Currently unused.
   */
  init(threadBuild: (() => ProcedureThread) | undefined = undefined) {
    this.maxThreadNumber = this.names.length;
    for (let i = 0; i < this.maxThreadNumber; i++) {
      this.newThread();
    }
    for (let j = 0; j < this.logicDataHandles.length; j++) {
      this.logicDataThread();
    }
  }

  /**
   * Creates a UI worker thread, wires its message handlers and appends it to `works`.
   *
   * @returns The new thread, or `undefined` when `window.useWb` is truthy.
   */
  newThread(): ProcedureThread | undefined {
    // @ts-ignore
    if (window.useWb) {
      return;
    }
    // The Worker/URL expression is kept inline on purpose so that bundlers can
    // recognise it statically.
    const newThread: ProcedureThread = new ProcedureThread(
      new Worker(new URL('./ui-worker/ProcedureWorker', import.meta.url), {
        type: 'module',
      })
    );
    newThread.name = this.names[this.works.length];
    newThread.worker!.onmessage = (event: MessageEvent) => {
      newThread.busy = false;
      if (event.data.type === 'timeline-range-changed') {
        this.timelineChange?.(event.data.results);
        return;
      }
      this.invokeHandler(newThread, event.data, true);
      this.notifyIfIdle();
    };
    newThread.worker!.onmessageerror = (e) => {};
    newThread.worker!.onerror = (e) => {};
    newThread.id = this.works.length;
    newThread.busy = false;
    this.works.push(newThread);
    return newThread;
  }

  /**
   * Creates a logic worker thread, wires its message handlers and appends it to `works`.
   *
   * @returns The new thread, or `undefined` when `window.useWb` is truthy.
   */
  private logicDataThread(): ProcedureThread | undefined {
    // @ts-ignore
    if (window.useWb) {
      return;
    }
    // Kept inline for the same bundler-detection reason as in newThread().
    const thread: ProcedureThread = new ProcedureThread(
      new Worker(new URL('./logic-worker/ProcedureLogicWorker', import.meta.url), {
        type: 'module',
      })
    );
    // Logic threads are appended after the UI threads, hence the offset by names.length.
    thread.name = this.logicDataHandles[this.works.length - this.names.length];
    this.sendMessage(thread);
    thread.worker!.onmessageerror = (e) => {};
    thread.worker!.onerror = (e) => {};
    thread.id = this.works.length;
    thread.busy = false;
    this.works.push(thread);
    return thread;
  }

  /**
   * Installs the `onmessage` handler of a logic worker thread. Three kinds of
   * message are distinguished:
   *  - `isQuery`: the worker asks for a query; `query(...)` is run and its
   *    result is posted back as `{ type, params: { list }, id }`.
   *  - `isSending`: the registered callback is invoked but stays registered.
   *  - anything else: the registered callback is invoked and removed, then
   *    `onComplete` is called if the pool is idle.
   */
  private sendMessage(thread: ProcedureThread): void {
    thread.worker!.onmessage = (event: MessageEvent) => {
      // NOTE(review): busy is cleared for isQuery/isSending messages too, even
      // though the registered callback stays in place — confirm this is intended.
      thread.busy = false;
      const data = event.data;
      if (data.isQuery) {
        query(data.type, data.sql, data.args, 'exec-buf').then((res: any) => {
          thread.worker!.postMessage({
            type: data.type,
            params: {
              list: res,
            },
            id: data.id,
          });
        });
        return;
      }
      if (data.isSending && this.invokeHandler(thread, data, false)) {
        return;
      }
      this.invokeHandler(thread, data, true);
      this.notifyIfIdle();
    };
  }

  /**
   * Looks up the callback registered for `data.id` on `thread` and calls it
   * with `(data.results, data.hover)`.
   *
   * @param removeAfter - When true the registration is deleted after the call.
   * @returns `true` when `taskMap` had an entry for `data.id`.
   */
  private invokeHandler(thread: ProcedureThread, data: any, removeAfter: boolean): boolean {
    if (!Reflect.has(thread.taskMap, data.id)) {
      return false;
    }
    const handler = thread.taskMap[data.id];
    if (handler) {
      handler(data.results, data.hover);
    }
    if (removeAfter) {
      Reflect.deleteProperty(thread.taskMap, data.id);
    }
    return true;
  }

  /** Calls `onComplete` when it is set and no thread is busy. */
  private notifyIfIdle(): void {
    if (this.isIdle() && this.onComplete) {
      this.onComplete();
    }
  }

  /** Terminates every worker and empties `works`. */
  close = () => {
    for (const thread of this.works) {
      thread.worker!.terminate();
    }
    this.works.length = 0;
  };

  /** Posts a 'clear' request with an empty payload to every thread. */
  clearCache = () => {
    for (const thread of this.works) {
      thread.queryFunc('clear', {}, undefined, () => {});
    }
  };

  /**
   * Submits a request to the first thread whose name equals `name`.
   *
   * @returns The thread the request was submitted to, or `undefined` when no
   *   thread has that name (in which case `handler` is never called).
   */
  submitWithName(name: string, type: string, args: any, transfer: any, handler: Function): ProcedureThread | undefined {
    // Threads are selected by name only; the busy flag is not consulted.
    const thread = this.works.find((it) => it.name === name);
    if (thread) {
      thread.queryFunc(type, args, transfer, handler);
    }
    return thread;
  }

  /**
   * Promise flavour of {@link submitWithName}.
   *
   * @returns A promise resolving to `{ res, hover }` taken from the worker's
   *   reply. It rejects when no thread is named `name` or when the request
   *   could not be posted, instead of staying pending forever.
   */
  submitWithNamePromise(name: string, type: string, args: any, transfer: any): Promise<any> {
    return new Promise((resolve, reject) => {
      // Threads are selected by name only; the busy flag is not consulted.
      const thread = this.works.find((it) => it.name === name);
      if (!thread) {
        reject(new Error(`ProcedurePool: no worker thread named "${name}"`));
        return;
      }
      const posted = thread.queryFunc(type, args, transfer, (res: any, hover: any) => {
        resolve({
          res: res,
          hover: hover,
        });
      });
      if (!posted) {
        reject(new Error(`ProcedurePool: failed to post "${type}" request to thread "${name}"`));
      }
    });
  }

  /** @returns `true` when no thread in `works` is marked busy. */
  isIdle() {
    return this.works.every((it) => !it.busy);
  }
}

export const procedurePool = new ProcedurePool();