• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1/*
2 * Copyright (c) 2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 *     http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16import {
17    parentPort,
18    workerData
19} from 'worker_threads';
20import * as fs from 'fs';
21import { MemoryReport } from './memory_report';
22
/**
 * Aggregated memory information for one stage, as produced by a
 * memory-usage query.
 */
interface StageReport {
  /** Whether the collector has a record entry for this stage. */
  record: boolean;
  /** Highest RSS value observed for the stage (-1 until a sample is seen). */
  top: number;
  /** Lowest RSS value observed for the stage (-1 until a sample is seen). */
  bottom: number;
  /** The memory reports that belong to the stage. */
  data: MemoryReport[];
}
33
/**
 * Result of a memory-usage query: maps a stage name to the report
 * aggregated for that stage.
 */
interface QueryMemReport {
  [stage: string]: StageReport;
}
37
/**
 * Collects RSS memory samples inside a worker thread, tags each sample with
 * the stage that is currently being recorded, and periodically persists the
 * buffered samples to `filePath` as JSON objects separated by ",\n".
 *
 * Results are reported back to the parent thread through `parentPort`.
 */
class MemoryCollector {
  // Samples buffered in memory until the next flush to disk.
  private reports: MemoryReport[] = [];
  // Destination file for the persisted samples.
  private filePath: string;
  // Sampling period and minimum spacing between two stored samples (ms).
  private bufferInterval: number;
  // Delay between the first buffered sample and the flush to disk (ms).
  private writeInterval: number;
  private writeTimeoutId: NodeJS.Timeout | null = null;
  // Innermost stage currently being recorded and the stage enclosing it.
  private stage: string = '';
  private parentStage: string = '';
  private intervalId: NodeJS.Timeout | null = null;
  private stopIntervalId: NodeJS.Timeout | null = null;
  // Stage key -> 'record' | 'stop'; used by stop() to wait for open stages.
  private recordMap = new Map<string, string>();
  // Stack of open stages, outermost first.
  private recordStack: string[] = [];
  private lastRecordTime: number = 0;
  private lastRssValue: number = 0;
  private lastStage: string = '';
  // A sample is stored early when RSS grew by at least this factor.
  private throttlePercentage: number = 1.05;
  // True once this collector has created the output file itself.
  private dottingFileExitFlag: boolean = false;

  /**
   * @param filePath - file the samples are written to
   * @param bufferInterval - sampling period in milliseconds
   * @param writeInterval - flush delay in milliseconds
   */
  constructor(filePath: string, bufferInterval: number = 100, writeInterval: number = 1000) {
    this.filePath = filePath;
    this.bufferInterval = bufferInterval;
    this.writeInterval = writeInterval;
  }

  /**
   * Buffers a memory sample if it is worth keeping and schedules a flush.
   *
   * A sample is kept when its stage differs from the last stored stage, when
   * at least `bufferInterval` ms elapsed since the last stored sample, or
   * when RSS grew by `throttlePercentage` relative to the last stored value.
   * Samples whose stage is the empty string are never stored.
   *
   * @param memReport - externally supplied sample; when omitted, the current
   *   process RSS is sampled and tagged with the collector's current stage
   */
  private recordMemoryUsage(memReport?: MemoryReport): void {
    const now = Date.now();
    if (!memReport) {
      memReport = {
        timestamp: now,
        rss: process.memoryUsage.rss(),
        stage: this.stage,
        parentStage: this.parentStage,
      };
    }
    const currentRss = memReport.rss || process.memoryUsage().rss;
    if (
      (memReport.stage && memReport.stage !== this.lastStage) ||
      now - this.lastRecordTime >= this.bufferInterval ||
      // lastRssValue starts at 0, so the very first sample always passes.
      currentRss / this.lastRssValue >= this.throttlePercentage
    ) {
      this.lastRecordTime = now;
      this.lastRssValue = currentRss;
      this.lastStage = this.stage;
      if (memReport.stage !== '') {
        memReport.stage = this.getRealStageName(memReport.stage);
        memReport.parentStage = this.getRealStageName(memReport.parentStage);
        this.reports.push(memReport);
      }
    }
    if (this.writeTimeoutId == null) {
      this.writeTimeoutId = setTimeout(() => {
        this.flushReports();
      }, this.writeInterval);
    }
  }

  /**
   * Writes the buffered samples to `filePath` and clears the buffer.
   *
   * The file is created on the first flush and appended to afterwards. When
   * the file's existence does not match what this collector expects (it
   * exists but was not created here, or it was created here and has since
   * disappeared) nothing is written and the samples stay buffered.
   * NOTE(review): in that state the buffer keeps growing — confirm intended.
   */
  private flushReports(): void {
    if (this.writeTimeoutId !== null) {
      clearTimeout(this.writeTimeoutId);
      this.writeTimeoutId = null;
    }
    if (this.reports.length === 0) {
      return;
    }
    const fileExists = fs.existsSync(this.filePath);
    if (fileExists !== this.dottingFileExitFlag) {
      return;
    }
    const data = this.reports.map((report) => JSON.stringify(report)).join(',\n') + ',\n';
    this.reports = [];
    if (fileExists) {
      fs.appendFileSync(this.filePath, data, 'utf8');
    } else {
      fs.writeFileSync(this.filePath, data, 'utf8');
      this.dottingFileExitFlag = true;
    }
  }

  /**
   * Strips everything after the last ')' of a stage key.
   *
   * @param str - stage key, possibly followed by a suffix after the last ')'
   * @returns the key up to and including the last ')', or `str` unchanged
   *   when it contains no ')'
   */
  private getRealStageName(str: string): string {
    const index = str.lastIndexOf(')');
    if (index !== -1) {
      return str.slice(0, index + 1);
    }
    return str;
  }

  /**
   * @returns true when at least one entry of `map` has a value other than 'stop'
   */
  private containsNonStopValue(map: Map<string, string>): boolean {
    for (const value of map.values()) {
      if (value !== 'stop') {
        return true;
      }
    }
    return false;
  }

  /** Starts periodic sampling every `bufferInterval` milliseconds. */
  start(): void {
    this.intervalId = setInterval(() => {
      this.recordMemoryUsage();
    }, this.bufferInterval);
  }

  /**
   * Stops the collector once every recorded stage has been stopped.
   *
   * Polls every 5 ms; when no stage is open any more it cancels all timers,
   * flushes the buffer and posts `{ action: 'stop_end' }` to the parent.
   */
  stop(): void {
    if (this.stopIntervalId !== null) {
      // A stop is already pending. Starting a second poller would overwrite
      // the handle of the first one, which could then never be cleared and
      // would keep posting 'stop_end' forever.
      return;
    }
    this.stopIntervalId = setInterval(() => {
      if (this.containsNonStopValue(this.recordMap)) {
        return;
      }

      if (this.stopIntervalId) {
        clearInterval(this.stopIntervalId);
        this.stopIntervalId = null;
      }
      if (this.intervalId) {
        clearInterval(this.intervalId);
        this.intervalId = null;
      }
      if (this.writeTimeoutId) {
        clearTimeout(this.writeTimeoutId);
        this.writeTimeoutId = null;
      }
      this.flushReports();
      parentPort!.postMessage({ action: 'stop_end' });
    }, 5);
  }

  /**
   * Opens `stage` as the new innermost stage and records a sample for it.
   *
   * @param stage - stage key to open
   * @param memReport - optional externally measured sample; its
   *   `parentStage` is overwritten with the collector's parent stage
   */
  record(stage: string, memReport?: MemoryReport): void {
    this.recordMap.set(stage, 'record');
    if (this.recordStack.length > 0) {
      // The stage that was innermost so far becomes the parent.
      this.parentStage = this.stage;
    }
    this.stage = stage;
    this.recordStack.push(stage);
    if (memReport) {
      memReport.parentStage = this.parentStage;
    }
    this.recordMemoryUsage(memReport);
  }

  /**
   * Closes `stage` and records a sample.
   *
   * If `stage` is not the innermost open stage, it and every stage opened
   * after it are removed from the stack. A stage that is not on the stack is
   * only marked as stopped; no sample is recorded for it.
   *
   * @param stage - stage key to close
   * @param memReport - optional externally measured sample
   */
  stopRecord(stage: string, memReport?: MemoryReport): void {
    this.recordMap.set(stage, 'stop');
    if (stage === this.stage) {
      this.recordStack.pop();
      this.updateStages();
      this.updateMemReport(memReport);
    } else {
      const index = this.recordStack.indexOf(stage);
      if (index === -1) {
        return;
      }
      // Resolve the parent before the stack is truncated.
      this.updateMemReport(memReport, index);
      this.recordStack.splice(index);
      this.updateStages();
    }
    this.recordMemoryUsage(memReport);
  }

  /** Re-derives `stage` and `parentStage` from the top of the record stack. */
  private updateStages(): void {
    const length = this.recordStack.length;
    if (length > 1) {
      this.stage = this.recordStack[length - 1];
      this.parentStage = this.recordStack[length - 2];
    } else if (length === 1) {
      this.stage = this.recordStack[0];
      this.parentStage = '';
    } else {
      this.stage = '';
      this.parentStage = '';
    }
  }

  /**
   * Sets `memReport.parentStage` for a sample taken while closing a stage.
   *
   * @param memReport - sample to update; ignored when undefined
   * @param index - stack position of the stage being closed; when given and
   *   greater than 0 the parent is the stack entry just below it, otherwise
   *   the collector's current stage is used
   */
  private updateMemReport(memReport: MemoryReport | undefined, index?: number): void {
    if (memReport) {
      if (index !== undefined && index - 1 >= 0) {
        memReport.parentStage = this.recordStack[index - 1];
      } else {
        memReport.parentStage = this.stage;
      }
    }
  }

  /**
   * Records an externally measured sample.
   *
   * @param memReport - sample to buffer
   */
  addMemoryUsage(memReport: MemoryReport): void {
    this.recordMemoryUsage(memReport);
  }

  /**
   * Parses one persisted record and, if it belongs to `stage`, folds it into
   * `report[stage]`: updates `top`/`bottom` and appends the record to `data`
   * with its `stage` normalised to the queried name.
   *
   * A record belongs to `stage` when its stored stage name contains `stage`.
   * Records that are not valid JSON are logged and skipped.
   *
   * @param recordStr - JSON text of a single record
   * @param stage - stage name being queried
   * @param report - query result; must already contain an entry for `stage`
   */
  handleRecord(recordStr: string, stage: string, report: QueryMemReport): void {
    const stageReport = report[stage];
    if (!stageReport) {
      return;
    }
    try {
      const reportObj = JSON.parse(recordStr);
      if (typeof reportObj?.stage !== 'string' || !reportObj.stage.includes(stage)) {
        return;
      }
      if (typeof reportObj.rss === 'number') {
        if (reportObj.rss > stageReport.top) {
          stageReport.top = reportObj.rss;
        }
        // bottom starts at -1 ("unset"), which no real RSS value is below.
        if (stageReport.bottom < 0 || reportObj.rss < stageReport.bottom) {
          stageReport.bottom = reportObj.rss;
        }
      }
      reportObj.stage = stage;
      stageReport.data.push(reportObj);
    } catch (e) {
      console.error(`Error parsing JSON: ${recordStr}`);
      console.error(e);
    }
  }

  /**
   * Streams the persisted records from `filePath`, aggregates those that
   * belong to `stage`, and posts the result to the parent thread as
   * `{ action: 'memoryReport', requestId, report }`. On a read error an
   * empty report is posted instead.
   *
   * @param requestId - identifier echoed back so the caller can match replies
   * @param stage - stage name to aggregate
   */
  queryMemoryUsage(requestId: number, stage: string): void {
    // NOTE(review): recordMap is keyed by the stage key passed to record();
    // confirm that the queried name uses the same form.
    const record = this.recordMap.has(stage);
    const stageReport: StageReport = {
      record: record,
      top: -1,
      bottom: -1,
      data: [],
    };
    const report: QueryMemReport = {};
    report[stage] = stageReport;
    // Parser state lives outside the chunk handler so that a record split
    // across two chunks is still reassembled.
    let currentRecord = '';
    let inRecord = false;
    const stream = fs.createReadStream(this.filePath, { encoding: 'utf8' });
    stream.on('data', (chunk) => {
      for (const char of chunk) {
        if (char === '{') {
          inRecord = true;
          currentRecord = char;
        } else if (char === '}') {
          inRecord = false;
          currentRecord += char;
          this.handleRecord(currentRecord, stage, report);
          currentRecord = '';
        } else if (inRecord) {
          currentRecord += char;
        }
      }
    });

    stream.on('end', () => {
      parentPort!.postMessage({ action: 'memoryReport', requestId: requestId, report: report });
    });
    stream.on('error', (err) => {
      parentPort!.postMessage({ action: 'memoryReport', requestId: requestId, report: {} });
    });
  }
}
283
// Worker entry point: runs only when the thread was started with workerData.
// Creates the collector, starts sampling and dispatches parent messages to it.
if (workerData) {
  const { filePath, bufferInterval, writeInterval } = workerData;
  const collector = new MemoryCollector(filePath, bufferInterval, writeInterval);
  collector.start();
  parentPort!.on('message', (msg) => {
    switch (msg.action) {
      case 'stop':
        collector.stop();
        break;
      case 'recordStage': {
        // The stage key is the stage name with the record index appended.
        const stageKey = msg.stage + msg.recordIndex;
        collector.record(stageKey, msg.memoryReport);
        break;
      }
      case 'stopRecordStage': {
        const stageKey = msg.stage + msg.recordIndex;
        collector.stopRecord(stageKey, msg.memoryReport);
        break;
      }
      case 'addMemoryReport':
        collector.addMemoryUsage(msg.memoryReport);
        break;
      case 'queryMemoryUsage':
        collector.queryMemoryUsage(msg.requestId, msg.stage);
        break;
      default:
        // Unknown actions are ignored.
        break;
    }
  });
}
302
303