/*
 * Copyright (C) 2022 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */


import {query} from "./SqlLite.js";

/**
 * A Worker that remembers, per request id, the callback to run when the
 * worker script answers.
 *
 * Requests are posted as `{id, type, params}`; replies are matched against
 * `taskMap` by the pool's `onmessage` handlers.
 */
class ProcedureThread extends Worker {
    /** Set when a request is posted; cleared by the pool whenever a message arrives. */
    busy: boolean = false;
    /** Set by {@link cancel}. */
    isCancelled: boolean = false;
    /** Index of this thread inside the owning pool's `works` array. */
    id: number = -1;
    /** Pending callbacks keyed by request id. */
    taskMap: any = {};
    /** Name used by the pool to route requests to this thread. */
    name: string | undefined

    /**
     * Generates a random UUID-shaped string using `crypto.getRandomValues`.
     *
     * @returns the generated identifier
     */
    uuid(): string {
        // @ts-ignore
        return ([1e7] + -1e3 + -4e3 + -8e3 + -1e11).replace(/[018]/g, c => (c ^ crypto.getRandomValues(new Uint8Array(1))[0] & 15 >> c / 4).toString(16));
    }

    /**
     * Registers `handler` under a fresh request id and posts the request to
     * the worker script.
     *
     * @param type request type forwarded to the worker
     * @param args request parameters, sent as `params`
     * @param transfer a single transferable, an array of transferables, or a
     *        falsy value when nothing is transferred
     * @param handler callback stored in `taskMap` until the reply is dispatched
     * @returns `true` when the message was posted; `false` when posting with
     *          a transfer list failed (the failure is logged and the pending
     *          entry is removed)
     * @throws whatever `postMessage` throws when no transfer list is given
     */
    queryFunc(type: string, args: any, transfer: any, handler: Function): boolean {
        const wasBusy = this.busy;
        this.busy = true;
        const id = this.uuid();
        this.taskMap[id] = handler;
        const pam = {
            id: id,
            type: type,
            params: args,
        };
        // A request that never left this thread can never be answered, so
        // its bookkeeping must be undone or the thread looks busy forever.
        const rollback = (): void => {
            Reflect.deleteProperty(this.taskMap, id);
            this.busy = wasBusy;
        };
        if (!transfer) {
            try {
                this.postMessage(pam);
            } catch (e: unknown) {
                rollback();
                throw e;
            }
            return true;
        }
        try {
            const transferList = Array.isArray(transfer) ? [...transfer] : [transfer];
            if (transferList.length > 0) {
                this.postMessage(pam, transferList);
            } else {
                this.postMessage(pam);
            }
            return true;
        } catch (e: unknown) {
            rollback();
            console.error(`ProcedureThread "${this.name}": failed to post "${type}" request`, e);
            return false;
        }
    }

    /** Marks the thread as cancelled and terminates the underlying worker. */
    cancel() {
        this.isCancelled = true;
        this.terminate();
    }
}

/**
 * Owns a fixed set of named {@link ProcedureThread}s and routes requests to
 * them by name.
 *
 * Threads named after `names` run the UI worker script; threads named after
 * `logicDataHandles` run the logic worker script and may additionally ask
 * this side to execute a query via `query` from `./SqlLite.js`.
 */
class ProcedurePool {
    static cpuCount = Math.floor((window.navigator.hardwareConcurrency || 4) / 2);
    maxThreadNumber: number = 1
    works: Array<ProcedureThread> = []
    /** Invoked with `results` of every "timeline-range-changed" message from a UI worker. */
    timelineChange: ((a: any) => void) | undefined | null = null;
    cpusLen = ProcedurePool.build('cpu', 8);
    freqLen = ProcedurePool.build('freq', 2);
    processLen = ProcedurePool.build('process', 8);
    logicDataLen = ProcedurePool.build('logic', 2);
    names = [...this.cpusLen, ...this.processLen, ...this.freqLen];
    logicDataHandles = [...this.logicDataLen]

    /** Task-completion callback: invoked after a reply when every thread is idle. */
    onComplete: Function | undefined;

    /**
     * @param threadBuild accepted and forwarded to {@link init}, which does
     *        not currently use it
     */
    constructor(threadBuild: (() => ProcedureThread) | undefined = undefined) {
        this.init(threadBuild);
    }

    /**
     * Builds `len` names of the form `${name}0 … ${name}{len-1}`.
     *
     * @param name common prefix
     * @param len number of names to produce
     */
    static build(name: string, len: number) {
        return Array.from({length: len}, (_, index) => `${name}${index}`);
    }

    /**
     * Creates one UI worker per entry in `names`, then one logic worker per
     * entry in `logicDataHandles`. The order matters: each thread takes its
     * name from its position in `works`.
     *
     * @param threadBuild currently unused
     */
    init(threadBuild: (() => ProcedureThread) | undefined = undefined) {
        this.maxThreadNumber = this.names.length;
        for (let i = 0; i < this.maxThreadNumber; i++) {
            this.newThread();
        }
        for (let j = 0; j < this.logicDataHandles.length; j++) {
            this.logicDataThread();
        }
    }

    /**
     * Runs the callback registered for `data.id`, if any, passing
     * `data.results` and `data.hover`.
     *
     * @param keepTask when `true` the callback stays registered after the call
     * @returns `true` when `taskMap` contained an entry for `data.id`
     */
    private runHandler(thread: ProcedureThread, data: any, keepTask: boolean = false): boolean {
        if (!data || !Reflect.has(thread.taskMap, data.id)) {
            return false;
        }
        const handler = thread.taskMap[data.id];
        if (handler) {
            handler(data.results, data.hover);
        }
        if (!keepTask) {
            Reflect.deleteProperty(thread.taskMap, data.id);
        }
        return true;
    }

    /** Fires `onComplete` when it is set and no thread is marked busy. */
    private notifyIfIdle(): void {
        if (this.isIdle() && this.onComplete) {
            this.onComplete();
        }
    }

    /**
     * Creates a UI worker, names it after the next free slot in `names`,
     * wires its message handling and appends it to `works`.
     *
     * @returns the new thread
     */
    newThread() {
        const thread: ProcedureThread = new ProcedureThread("trace/database/ui-worker/ProcedureWorker.js", {type: "module"});
        thread.name = this.names[this.works.length];
        thread.onmessage = (event: MessageEvent) => {
            // NOTE(review): busy is cleared on every message even if other
            // requests are still pending in taskMap — confirm this is intended.
            thread.busy = false;
            const data = event.data;
            if (data?.type === "timeline-range-changed") {
                this.timelineChange?.(data.results);
                return;
            }
            this.runHandler(thread, data);
            this.notifyIfIdle();
        };
        // No handling is performed for message or worker errors.
        thread.onmessageerror = e => {
        };
        thread.onerror = e => {
        };
        thread.id = this.works.length;
        thread.busy = false;
        this.works.push(thread);
        return thread;
    }

    /**
     * Creates a logic worker, names it after the next free slot in
     * `logicDataHandles`, wires its message handling and appends it to
     * `works`.
     *
     * Incoming messages are handled in three ways:
     * - `isQuery`: run `query(...)` and post the result back as
     *   `{type, params: {list}, id}`;
     * - `isSending` with a registered callback: invoke the callback but keep
     *   it registered, and skip the idle check;
     * - otherwise: invoke and unregister the callback, then check for idle.
     *
     * @returns the new thread
     */
    logicDataThread() {
        const thread: ProcedureThread = new ProcedureThread("trace/database/logic-worker/ProcedureLogicWorker.js", {type: "module"});
        thread.name = this.logicDataHandles[this.works.length - this.names.length];
        thread.onmessage = (event: MessageEvent) => {
            thread.busy = false;
            const data = event.data;
            if (data?.isQuery) {
                query(data.type, data.sql, data.args, "exec-buf").then((res: any) => {
                    thread.postMessage({
                        type: data.type,
                        params: {
                            list: res
                        },
                        id: data.id
                    });
                }).catch((err: unknown) => {
                    // Surface the failure instead of leaving an unhandled rejection.
                    console.error(`ProcedurePool: query "${data.type}" for worker "${thread.name}" failed`, err);
                });
                return;
            }
            if (data?.isSending && this.runHandler(thread, data, true)) {
                return;
            }
            this.runHandler(thread, data);
            this.notifyIfIdle();
        };
        // No handling is performed for message or worker errors.
        thread.onmessageerror = e => {
        };
        thread.onerror = e => {
        };
        thread.id = this.works.length;
        thread.busy = false;
        this.works.push(thread);
        return thread;
    }

    /** Terminates every worker and empties `works`. */
    close = () => {
        for (const thread of this.works) {
            thread.terminate();
        }
        this.works.length = 0;
    }

    /** Posts a "clear" request with empty params to every worker. */
    clearCache = () => {
        for (const thread of this.works) {
            thread.queryFunc("clear", {}, undefined, () => {
            });
        }
    }

    /**
     * Posts a request to the first thread whose name equals `name`
     * (regardless of its busy flag).
     *
     * @param name thread name to route to
     * @param type request type
     * @param args request parameters
     * @param transfer transferable(s) or a falsy value
     * @param handler callback invoked with `(results, hover)` from the reply
     * @returns the thread used, or `undefined` when no thread has that name
     */
    submitWithName(name: string, type: string, args: any, transfer: any, handler: Function): ProcedureThread | undefined {
        const thread = this.works.find(it => it.name === name);
        thread?.queryFunc(type, args, transfer, handler);
        return thread;
    }

    /**
     * Promise flavour of {@link submitWithName}.
     *
     * @param name thread name to route to
     * @param type request type
     * @param args request parameters
     * @param transfer transferable(s) or a falsy value
     * @returns a promise resolved with `{res, hover}` taken from the reply;
     *          rejected when no thread is named `name` or the request could
     *          not be posted, so callers are never left waiting forever
     */
    submitWithNamePromise(name: string, type: string, args: any, transfer: any): Promise<any> {
        return new Promise((resolve, reject) => {
            const thread = this.works.find(it => it.name === name);
            if (!thread) {
                reject(new Error(`ProcedurePool: no worker named "${name}"`));
                return;
            }
            const posted = thread.queryFunc(type, args, transfer, (res: any, hover: any) => {
                resolve({
                    res: res,
                    hover: hover,
                });
            });
            if (!posted) {
                reject(new Error(`ProcedurePool: failed to post "${type}" request to worker "${name}"`));
            }
        });
    }

    /** @returns `true` when no thread in the pool is marked busy */
    isIdle() {
        return this.works.every(it => !it.busy);
    }
}


export const procedurePool = new ProcedurePool()