1/* 2 * Copyright (c) 2024 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16import { 17 parentPort, 18 workerData 19} from 'worker_threads'; 20import * as fs from 'fs'; 21import { MemoryReport } from './memory_report'; 22 23interface StageReport { 24 // Is the current StageReport in the record stage 25 record: boolean; 26 // Indicates the maximum RSS memory usage in the current Stage 27 top: number; 28 // The minimum RSS memory usage in the current stage 29 bottom: number; 30 // the total number of memory reports for the current Stage 31 data: MemoryReport[]; 32} 33 34interface QueryMemReport { 35 [stageName: string]: StageReport; 36} 37 38class MemoryCollector { 39 private reports: MemoryReport[] = []; 40 private filePath: string; 41 private bufferInterval: number; 42 private writeInterval: number; 43 private writeTimeoutId: NodeJS.Timeout | null = null; 44 private stage: string = ''; 45 private parentStage: string = ''; 46 private intervalId: NodeJS.Timeout | null = null; 47 private stopIntervalId: NodeJS.Timeout | null = null; 48 private recordMap = new Map<string, string>(); 49 private recordStack: string[] = []; 50 private lastRecordTime: number = 0; 51 private lastRssValue: number = 0; 52 private lastStage: string = ''; 53 private throttlePercentage: number = 1.05; 54 private dottingFileExitFlag: boolean = false; 55 56 57 constructor(filePath: string, bufferInterval: number = 100, writeInterval: number = 1000) { 58 this.filePath = filePath; 59 this.bufferInterval = bufferInterval; 60 this.writeInterval = writeInterval; 61 } 62 63 private recordMemoryUsage(memReport?: MemoryReport): void { 64 const now = Date.now(); 65 if (!memReport) { 66 const rss = process.memoryUsage.rss(); 67 memReport = { 68 timestamp: now, 69 rss: rss, 70 stage: this.stage, 71 parentStage: this.parentStage, 72 }; 73 } 74 const currentRss = memReport?.rss || process.memoryUsage().rss; 75 if ( 76 (memReport.stage && memReport.stage !== this.lastStage) || 77 now - this.lastRecordTime >= this.bufferInterval || 78 currentRss / this.lastRssValue >= this.throttlePercentage 79 ) { 80 this.lastRecordTime = now; 81 this.lastRssValue = currentRss; 82 this.lastStage = this.stage; 83 if (memReport.stage !== '') { 84 memReport.stage = this.getRealStageName(memReport.stage); 85 memReport.parentStage = this.getRealStageName(memReport.parentStage); 86 this.reports.push(memReport); 87 } 88 } 89 if (this.writeTimeoutId == null) { 90 this.writeTimeoutId = setTimeout(() => { 91 this.flushReports(); 92 }, this.writeInterval); 93 } 94 } 95 96 private flushReports(): void { 97 if (this.writeTimeoutId !== null) { 98 clearTimeout(this.writeTimeoutId); 99 this.writeTimeoutId = null; 100 } 101 if (this.reports.length > 0) { 102 try { 103 fs.accessSync(this.filePath, fs.constants.F_OK); 104 } catch (err) { 105 if (!this.dottingFileExitFlag) { 106 let data = this.reports.map((report) => JSON.stringify(report)).join(',\n') + ',\n'; 107 this.reports = []; 108 fs.writeFileSync(this.filePath, data, 'utf8'); 109 this.dottingFileExitFlag = true; 110 } 111 return; 112 } 113 if (this.dottingFileExitFlag) { 114 let data = this.reports.map((report) => JSON.stringify(report)).join(',\n') + ',\n'; 115 this.reports = []; 116 fs.appendFileSync(this.filePath, data, 'utf8'); 117 } 118 } 119 } 120 121 private getRealStageName(str: string): string { 122 let index = str.lastIndexOf(')'); 123 if (index !== -1) { 124 return str.slice(0, index + 1); 125 } 126 return str; 127 } 128 129 private containsNonStopValue(map: Map<string, string>): boolean { 130 for (let value of map.values()) { 131 if (value !== 'stop') { 132 return true; 133 } 134 } 135 return false; 136 } 137 138 start(): void { 139 this.intervalId = setInterval(() => { 140 this.recordMemoryUsage(); 141 }, this.bufferInterval); 142 } 143 144 stop(): void { 145 this.stopIntervalId = setInterval(() => { 146 if (this.containsNonStopValue(this.recordMap)) { 147 return; 148 } 149 150 if (this.stopIntervalId) { 151 clearInterval(this.stopIntervalId); 152 this.stopIntervalId = null; 153 } 154 if (this.intervalId) { 155 clearInterval(this.intervalId); 156 this.intervalId = null; 157 } 158 if (this.writeTimeoutId) { 159 clearTimeout(this.writeTimeoutId); 160 this.writeTimeoutId = null; 161 } 162 this.flushReports(); 163 parentPort!.postMessage({ action: 'stop_end'}); 164 }, 5); 165 } 166 167 record(stage: string, memReport?: MemoryReport): void { 168 this.recordMap.set(stage, 'record'); 169 if (this.recordStack.length > 0) { 170 this.parentStage = this.stage; 171 } 172 this.stage = stage; 173 this.recordStack.push(stage); 174 if (memReport) { 175 memReport.parentStage = this.parentStage; 176 } 177 this.recordMemoryUsage(memReport); 178 } 179 180 stopRecord(stage: string, memReport?: MemoryReport): void { 181 this.recordMap.set(stage, 'stop'); 182 if (stage === this.stage) { 183 this.recordStack.pop(); 184 this.updateStages(); 185 this.updateMemReport(memReport); 186 } else { 187 const index = this.recordStack.indexOf(stage); 188 if (index !== -1) { 189 this.updateMemReport(memReport, index); 190 this.recordStack.splice(index); 191 this.updateStages(); 192 } else { 193 return; 194 } 195 } 196 this.recordMemoryUsage(memReport); 197 } 198 199 private updateStages(): void { 200 const length = this.recordStack.length; 201 if (length > 1) { 202 this.stage = this.recordStack[length - 1]; 203 this.parentStage = this.recordStack[length - 2]; 204 } else if (length === 1) { 205 this.stage = this.recordStack[0]; 206 this.parentStage = ''; 207 } else { 208 this.stage = ''; 209 this.parentStage = ''; 210 } 211 } 212 213 private updateMemReport(memReport: MemoryReport | undefined, index?: number): void { 214 if (memReport) { 215 if (index !== undefined && index - 1 >= 0) { 216 memReport.parentStage = this.recordStack[index - 1]; 217 } else { 218 memReport.parentStage = this.stage; 219 } 220 } 221 } 222 223 addMemoryUsage(memReport: MemoryReport): void { 224 this.recordMemoryUsage(memReport); 225 } 226 227 handleRecord(recordStr: string, stage: string, report: QueryMemReport): void { 228 try { 229 const reportObj = JSON.parse(recordStr); 230 if (reportObj.stage.indexof(stage)) { 231 if (reportObj.rss > report.stage.top) { 232 report.stage.top = reportObj.rss; 233 } 234 if (reportObj.rss < report.stage.bottom) { 235 report.stage.bottom = reportObj.rss; 236 } 237 reportObj.stage = stage; 238 report.stage.data.push(reportObj); 239 } 240 } catch (e) { 241 console.error(`Error parsing JSON: ${recordStr}`); 242 console.error(e); 243 } 244 } 245 246 queryMemoryUsage(requestId: number, stage: string): void { 247 let record = this.recordMap.has(stage); 248 let stageReport: StageReport = { 249 record: record, 250 top: -1, 251 bottom: -1, 252 data: [], 253 }; 254 let report: QueryMemReport = {}; 255 report[stage] = stageReport; 256 let currentRecord = ''; 257 let inRecord = false; 258 const stream = fs.createReadStream(this.filePath, { encoding: 'utf8' }); 259 stream.on('data', (chunk) => { 260 for (let char of chunk) { 261 if (char === '{') { 262 inRecord = true; 263 currentRecord = char; 264 } else if (char === '}') { 265 inRecord = false; 266 currentRecord += char; 267 this.handleRecord(currentRecord, stage, report); 268 currentRecord = ''; 269 } else if (inRecord) { 270 currentRecord += char; 271 } 272 } 273 }); 274 275 stream.on('end', () => { 276 parentPort!.postMessage({ action: 'memoryReport', requestId: requestId, report: report }); 277 }); 278 stream.on('error', (err) => { 279 parentPort!.postMessage({ action: 'memoryReport', requestId: requestId, report: {} }); 280 }); 281 } 282} 283 284if (workerData) { 285 const { filePath, bufferInterval, writeInterval } = workerData; 286 const collector = new MemoryCollector(filePath, bufferInterval, writeInterval); 287 collector.start(); 288 parentPort!.on('message', (msg) => { 289 if (msg.action === 'stop') { 290 collector.stop(); 291 } else if (msg.action === 'recordStage') { 292 collector.record(msg.stage + msg.recordIndex, msg.memoryReport); 293 } else if (msg.action === 'stopRecordStage') { 294 collector.stopRecord(msg.stage + msg.recordIndex, msg.memoryReport); 295 } else if (msg.action === 'addMemoryReport') { 296 collector.addMemoryUsage(msg.memoryReport); 297 } else if (msg.action === 'queryMemoryUsage') { 298 collector.queryMemoryUsage(msg.requestId, msg.stage); 299 } 300 }); 301} 302 303