1 /**
2 * Copyright 2021 Huawei Technologies Co., Ltd
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 #include "minddata/dataset/engine/perf/cpu_sampling.h"
17
18 #if !defined(_WIN32) && !defined(_WIN64) && !defined(__ANDROID__) && !defined(ANDROID) && !defined(__APPLE__)
19 #include <sys/syscall.h>
20 #endif
21 #include <cmath>
22 #include <cstdio>
23 #include <algorithm>
24 #include <fstream>
25 #include <memory>
26 #include <string>
27
28 #include "minddata/dataset/api/python/pybind_conversion.h"
29 #include "minddata/dataset/core/config_manager.h"
30 #include "minddata/dataset/engine/execution_tree.h"
31 #include "minddata/dataset/util/path.h"
32
33 using json = nlohmann::json;
34 namespace mindspore {
35 namespace dataset {
36 bool BaseCpu::fetched_all_process_shared_ = false;
37 std::unordered_map<int32_t, std::vector<pid_t>> BaseCpu::op_process_shared_ = {};
38
39 #if !defined(_WIN32) && !defined(_WIN64) && !defined(__ANDROID__) && !defined(ANDROID) && !defined(__APPLE__)
40 #define USING_LINUX
41 #endif
42
BaseCpu()43 BaseCpu::BaseCpu() {
44 pre_cpu_stat_.user_stat_ = 0;
45 pre_cpu_stat_.sys_stat_ = 0;
46 pre_cpu_stat_.io_stat_ = 0;
47 pre_cpu_stat_.idle_stat_ = 0;
48 pre_cpu_stat_.total_stat_ = 0;
49 fetched_all_process_ = false;
50 pre_fetched_state_ = false;
51 cpu_processor_num_ = 0;
52 }
53
GetTotalCpuTime(uint64_t * total_stat)54 Status BaseCpu::GetTotalCpuTime(uint64_t *total_stat) {
55 std::ifstream file("/proc/stat");
56 if (!file.is_open()) {
57 MS_LOG(INFO) << "Open CPU file failed when collect CPU information";
58 return Status::OK();
59 }
60 std::string str;
61 getline(file, str);
62 uint64_t user = 0, sys = 0, idle = 0, iowait = 0, nice = 0, irq = 0, softirq = 0;
63 if (sscanf_s(str.c_str(), "%*s %lu %lu %lu %lu %lu %lu %lu", &user, &nice, &sys, &idle, &iowait, &irq, &softirq) ==
64 EOF) {
65 file.close();
66 return Status(StatusCode::kMDUnexpectedError, "Get device CPU failed.");
67 }
68 file.close();
69 *total_stat = user + nice + sys + idle + iowait + irq + softirq;
70
71 return Status::OK();
72 }
73
ParseCpuInfo(const std::string & str)74 Status DeviceCpu::ParseCpuInfo(const std::string &str) {
75 CpuStat cpu_stat;
76 uint64_t nice = 0;
77 uint64_t irq = 0;
78 uint64_t softirq = 0;
79 if (sscanf_s(str.c_str(), "%*s %lu %lu %lu %lu %lu %lu %lu", &cpu_stat.user_stat_, &nice, &cpu_stat.sys_stat_,
80 &cpu_stat.idle_stat_, &cpu_stat.io_stat_, &irq, &softirq) == EOF) {
81 return Status(StatusCode::kMDUnexpectedError, "Get device CPU failed.");
82 }
83
84 cpu_stat.total_stat_ =
85 cpu_stat.user_stat_ + nice + cpu_stat.sys_stat_ + cpu_stat.idle_stat_ + cpu_stat.io_stat_ + irq + softirq;
86 // Calculate the utilization from the second sampling
87 if (!first_collect_) {
88 CpuUtil info;
89 info.user_utilization_ = round((cpu_stat.user_stat_ - pre_cpu_stat_.user_stat_) * 1.0 /
90 (cpu_stat.total_stat_ - pre_cpu_stat_.total_stat_) * 100);
91 info.sys_utilization_ = round((cpu_stat.sys_stat_ - pre_cpu_stat_.sys_stat_) * 1.0 /
92 (cpu_stat.total_stat_ - pre_cpu_stat_.total_stat_) * 100);
93 info.io_utilization_ = round((cpu_stat.io_stat_ - pre_cpu_stat_.io_stat_) * 1.0 /
94 (cpu_stat.total_stat_ - pre_cpu_stat_.total_stat_) * 100);
95 info.idle_utilization_ = round((cpu_stat.idle_stat_ - pre_cpu_stat_.idle_stat_) * 1.0 /
96 (cpu_stat.total_stat_ - pre_cpu_stat_.total_stat_) * 100);
97 cpu_util_.emplace_back(info);
98 }
99 pre_cpu_stat_.user_stat_ = cpu_stat.user_stat_;
100 pre_cpu_stat_.sys_stat_ = cpu_stat.sys_stat_;
101 pre_cpu_stat_.io_stat_ = cpu_stat.io_stat_;
102 pre_cpu_stat_.idle_stat_ = cpu_stat.idle_stat_;
103 pre_cpu_stat_.total_stat_ = cpu_stat.total_stat_;
104
105 return Status::OK();
106 }
107
ParseCtxt(const std::string & str)108 Status DeviceCpu::ParseCtxt(const std::string &str) {
109 uint64_t ctxt;
110 if (sscanf_s(str.c_str(), "%*s %lu", &ctxt) == EOF) {
111 return Status(StatusCode::kMDUnexpectedError, "Get context switch count failed.");
112 }
113 // Calculate the utilization from the second sampling
114 if (!first_collect_) {
115 context_switch_count_.push_back(ctxt - pre_context_switch_count_);
116 }
117 pre_context_switch_count_ = ctxt;
118 return Status::OK();
119 }
120
ParseRunningProcess(const std::string & str)121 Status DeviceCpu::ParseRunningProcess(const std::string &str) {
122 uint32_t running_process;
123 if (sscanf_s(str.c_str(), "%*s %ud", &running_process) == EOF) {
124 return Status(StatusCode::kMDUnexpectedError, "Get context switch count failed.");
125 }
126 // Drop the first value in order to collect same amount of CPU utilization
127 if (!first_collect_) {
128 running_process_.push_back(running_process);
129 }
130
131 return Status::OK();
132 }
133
Collect(const ExecutionTree * tree)134 Status DeviceCpu::Collect(const ExecutionTree *tree) {
135 std::ifstream file("/proc/stat");
136 if (!file.is_open()) {
137 MS_LOG(INFO) << "Open CPU file failed when collect CPU information";
138 return Status::OK();
139 }
140 bool first_line = true;
141 std::string line;
142 while (getline(file, line)) {
143 if (first_line) {
144 first_line = false;
145 RETURN_IF_NOT_OK(ParseCpuInfo(line));
146 }
147 if (line.find("ctxt") != std::string::npos) {
148 RETURN_IF_NOT_OK(ParseCtxt(line));
149 }
150 if (line.find("procs_running") != std::string::npos) {
151 RETURN_IF_NOT_OK(ParseRunningProcess(line));
152 }
153 }
154 file.close();
155
156 first_collect_ = false;
157 return Status::OK();
158 }
Analyze(std::string * name,double * utilization,std::string * extra_message)159 Status DeviceCpu::Analyze(std::string *name, double *utilization, std::string *extra_message) {
160 RETURN_UNEXPECTED_IF_NULL(name);
161 name->clear();
162 name->append("device_info");
163 int total_samples = cpu_util_.size();
164 int sum = 0;
165 // Only analyze the middle half of the samples
166 // Starting and ending may be impacted by startup or ending pipeline activities
167 int start_analyze = total_samples / 4;
168 int end_analyze = total_samples - start_analyze;
169
170 for (int i = start_analyze; i < end_analyze; i++) {
171 sum += cpu_util_[i].user_utilization_;
172 sum += cpu_util_[i].sys_utilization_;
173 }
174
175 // Note device utilization is already in range of 0-1, so don't
176 // need to divide by number of CPUS
177 if ((end_analyze - start_analyze) > 0) {
178 *utilization = sum / (end_analyze - start_analyze);
179 }
180 return Status::OK();
181 }
182
SaveToFile(const std::string & file_path)183 Status DeviceCpu::SaveToFile(const std::string &file_path) {
184 Path path = Path(file_path);
185 json output;
186 if (path.Exists()) {
187 MS_LOG(DEBUG) << file_path << " exists already";
188 try {
189 std::ifstream file(file_path);
190 file >> output;
191 } catch (const std::exception &err) {
192 RETURN_STATUS_UNEXPECTED("Invalid file, failed to open json file: " + file_path +
193 ", please delete it and try again!");
194 }
195 } else {
196 output["sampling_interval"] = GlobalContext::config_manager()->monitor_sampling_interval();
197 }
198
199 std::vector<int8_t> user_util;
200 std::transform(cpu_util_.begin(), cpu_util_.end(), std::back_inserter(user_util),
201 [&](const CpuUtil &info) { return info.user_utilization_; });
202 std::vector<int8_t> sys_util;
203 std::transform(cpu_util_.begin(), cpu_util_.end(), std::back_inserter(sys_util),
204 [&](const CpuUtil &info) { return info.sys_utilization_; });
205 std::vector<int8_t> io_util;
206 std::transform(cpu_util_.begin(), cpu_util_.end(), std::back_inserter(io_util),
207 [&](const CpuUtil &info) { return info.io_utilization_; });
208 std::vector<int8_t> idle_util;
209 std::transform(cpu_util_.begin(), cpu_util_.end(), std::back_inserter(idle_util),
210 [&](const CpuUtil &info) { return info.idle_utilization_; });
211
212 output["device_info"] = {{"user_utilization", user_util},
213 {"sys_utilization", sys_util},
214 {"io_utilization", io_util},
215 {"idle_utilization", idle_util},
216 {"runable_processes", running_process_},
217 {"context_switch_count", context_switch_count_}};
218
219 // Discard the content of the file when opening.
220 std::ofstream os(file_path, std::ios::trunc);
221 os << output;
222 os.close();
223
224 MS_LOG(INFO) << "Save device CPU success.";
225 return Status::OK();
226 }
227
ParseCpuInfo(int32_t op_id,int64_t thread_id,std::unordered_map<int32_t,std::unordered_map<int64_t,CpuOpStat>> * op_stat)228 Status OperatorCpu::ParseCpuInfo(int32_t op_id, int64_t thread_id,
229 std::unordered_map<int32_t, std::unordered_map<int64_t, CpuOpStat>> *op_stat) {
230 RETURN_UNEXPECTED_IF_NULL(op_stat);
231 pid_t pid = 0;
232 #if defined(USING_LINUX)
233 pid = syscall(SYS_getpid);
234 #endif
235 std::string stat_path = "/proc/" + std::to_string(pid) + "/task/" + std::to_string(thread_id) + "/stat";
236
237 // Judge whether file exist first
238 Path temp_path(stat_path);
239 if (!temp_path.Exists()) {
240 (*op_stat)[op_id][thread_id].user_stat_ = 0;
241 (*op_stat)[op_id][thread_id].sys_stat_ = 0;
242 return Status(StatusCode::kMDFileNotExist);
243 }
244
245 std::ifstream file(stat_path);
246 if (!file.is_open()) {
247 MS_LOG(INFO) << "Open CPU file failed when collect CPU information";
248 return Status::OK();
249 }
250 std::string str;
251 getline(file, str);
252 uint64_t utime;
253 uint64_t stime;
254 if (sscanf_s(str.c_str(), "%*d %*s %*s %*lu %*lu %*lu %*lu %*lu %*lu %*lu %*lu %*lu %*lu %lu %lu", &utime, &stime) ==
255 EOF) {
256 file.close();
257 return Status(StatusCode::kMDUnexpectedError, "Get device CPU failed.");
258 }
259 file.close();
260 (*op_stat)[op_id][thread_id].user_stat_ = utime;
261 (*op_stat)[op_id][thread_id].sys_stat_ = stime;
262
263 return Status::OK();
264 }
265
Collect(const ExecutionTree * tree)266 Status OperatorCpu::Collect(const ExecutionTree *tree) {
267 RETURN_UNEXPECTED_IF_NULL(tree);
268 if (first_collect_) {
269 for (auto iter = tree->begin(); iter != tree->end(); ++iter) {
270 id_count_++;
271 op_name_[iter->id()] = iter->NameWithID();
272 op_parallel_workers_[iter->id()] = iter->NumWorkers();
273 }
274 #if defined(USING_LINUX)
275 cpu_processor_num_ = get_nprocs_conf();
276 #endif
277 }
278
279 // Obtain the op and thread mapping
280 op_thread_.clear();
281 List<Task> allTasks = tree->AllTasks()->GetTask();
282 for (auto &task1 : allTasks) {
283 int32_t op_id = task1.get_operator_id();
284 op_thread_[op_id].emplace_back(task1.get_linux_id());
285 }
286
287 // add process id into op_thread
288 if (!fetched_all_process_) {
289 {
290 py::gil_scoped_acquire gil_acquire;
291 py::module ds = py::module::import("mindspore.dataset.engine.datasets");
292 py::tuple process_info = ds.attr("_get_operator_process")();
293 py::dict sub_process = py::reinterpret_borrow<py::dict>(process_info[0]);
294 fetched_all_process_ = py::reinterpret_borrow<py::bool_>(process_info[1]);
295 // parse dict value
296 op_process_ = toIntMap(sub_process);
297 BaseCpu::op_process_shared_ = op_process_;
298 BaseCpu::fetched_all_process_shared_ = fetched_all_process_;
299 }
300
301 // judge whether there is device_que operator, if so operator id may need increase by one, temp use directly
302 for (auto item : op_process_) {
303 if (!item.second.empty()) {
304 if (op_thread_.find(item.first) != op_thread_.end()) {
305 op_thread_[item.first].insert(op_thread_[item.first].end(), item.second.begin(), item.second.end());
306 } else {
307 op_thread_[item.first] = item.second;
308 }
309 }
310 }
311 }
312
313 uint64_t total_stat_;
314 RETURN_IF_NOT_OK(GetTotalCpuTime(&total_stat_));
315 std::vector<CpuOpUtil> cpu_step_util_;
316 std::unordered_map<int32_t, std::unordered_map<int64_t, CpuOpStat>> op_stat_;
317
318 if (!first_collect_) {
319 // obtain all the op id in current tasks
320 std::vector<int32_t> total_op_id;
321 (void)std::transform(op_thread_.begin(), op_thread_.end(), std::back_inserter(total_op_id),
322 [](const auto &iter) { return iter.first; });
323
324 // iter all the op, and obtain the CPU utilization of each operator
325 for (auto op_id = -1; op_id < id_count_; op_id++) {
326 float user_util = 0, sys_util = 0;
327 auto iter = std::find(total_op_id.begin(), total_op_id.end(), op_id);
328 if (iter != total_op_id.end()) {
329 for (auto thread_id : op_thread_[op_id]) {
330 if (ParseCpuInfo(op_id, thread_id, &op_stat_) == Status::OK()) {
331 user_util += (op_stat_[op_id][thread_id].user_stat_ - pre_op_stat_[op_id][thread_id].user_stat_) * 1.0 /
332 (total_stat_ - pre_total_stat_) * 100;
333 sys_util += (op_stat_[op_id][thread_id].sys_stat_ - pre_op_stat_[op_id][thread_id].sys_stat_) * 1.0 /
334 (total_stat_ - pre_total_stat_) * 100;
335 }
336 }
337 }
338 CpuOpUtil info;
339 info.op_id_ = op_id;
340 info.sys_utilization_ = sys_util;
341 info.user_utilization_ = user_util;
342 cpu_step_util_.emplace_back(info);
343 }
344 cpu_op_util_.emplace_back(cpu_step_util_);
345 } else {
346 // mainly obtain the init CPU execute time in first collect
347 for (const auto &iter : op_thread_) {
348 int32_t op_id = iter.first;
349 for (auto thread_id_ : iter.second) {
350 // ParseCpuInfo may execute failed for cpu data not ready, but we still get next thread cpu info
351 (void)ParseCpuInfo(op_id, thread_id_, &op_stat_);
352 }
353 }
354 }
355
356 // copy current op_stat into pre_op_stat
357 pre_op_stat_ = op_stat_;
358 pre_total_stat_ = total_stat_;
359
360 first_collect_ = false;
361 return Status::OK();
362 }
363
Analyze(std::string * name,double * utilization,std::string * extra_message)364 Status OperatorCpu::Analyze(std::string *name, double *utilization, std::string *extra_message) {
365 RETURN_UNEXPECTED_IF_NULL(name);
366 RETURN_UNEXPECTED_IF_NULL(extra_message);
367 int total_samples = cpu_op_util_.size();
368
369 // Only analyze the middle half of the samples
370 // Starting and ending may be impacted by startup or ending pipeline activities
371 constexpr int64_t sample_sections = 4;
372 int64 start_analyze = total_samples / sample_sections;
373 int64 end_analyze = total_samples - start_analyze;
374 double op_util = 0;
375 *utilization = 0;
376
377 // start loop from 0 was as don't want to analyze op -1
378 for (auto op_id = 0; op_id < id_count_; op_id++) {
379 int64 sum = 0;
380 int64 index = op_id + 1;
381 for (int i = start_analyze; i < end_analyze; i++) {
382 sum += cpu_op_util_[i][index].user_utilization_;
383 sum += cpu_op_util_[i][index].sys_utilization_;
384 }
385 if ((end_analyze - start_analyze) > 0) {
386 op_util = 1.0 * sum * cpu_processor_num_ / (op_parallel_workers_[op_id] * (end_analyze - start_analyze));
387 }
388 if (op_util > *utilization) {
389 *utilization = op_util;
390 name->clear();
391 (void)name->append(op_name_[op_id]);
392 }
393 (void)extra_message->append(op_name_[op_id] + " utilization per thread: " + std::to_string(op_util) + "% (" +
394 std::to_string(op_parallel_workers_[op_id]) + " parallel_workers); ");
395 }
396 return Status::OK();
397 }
398
SaveToFile(const std::string & file_path)399 Status OperatorCpu::SaveToFile(const std::string &file_path) {
400 Path path = Path(file_path);
401 json output;
402 if (path.Exists()) {
403 MS_LOG(DEBUG) << file_path << "already exist.";
404 try {
405 std::ifstream file(file_path);
406 file >> output;
407 } catch (const std::exception &err) {
408 RETURN_STATUS_UNEXPECTED("Invalid file, failed to open json file: " + file_path +
409 ", please delete it and try again!");
410 }
411 }
412
413 uint8_t index = 0;
414 json OpWriter;
415 for (auto op_id = -1; op_id < id_count_; op_id++) {
416 std::vector<uint16_t> user_util;
417 std::vector<uint16_t> sys_util;
418 std::transform(
419 cpu_op_util_.begin(), cpu_op_util_.end(), std::back_inserter(user_util),
420 [&](const std::vector<CpuOpUtil> &info) { return int16_t(info[index].user_utilization_ * cpu_processor_num_); });
421 std::transform(
422 cpu_op_util_.begin(), cpu_op_util_.end(), std::back_inserter(sys_util),
423 [&](const std::vector<CpuOpUtil> &info) { return int16_t(info[index].sys_utilization_ * cpu_processor_num_); });
424
425 json per_op_info = {{"metrics", {{"user_utilization", user_util}, {"sys_utilization", sys_util}}},
426 {"op_id", op_id}};
427 OpWriter.emplace_back(per_op_info);
428 index++;
429 }
430 output["op_info"] = OpWriter;
431
432 // Discard the content of the file when opening.
433 std::ofstream os(file_path, std::ios::trunc);
434 os << output;
435 os.close();
436
437 MS_LOG(INFO) << "Save device CPU success.";
438 return Status::OK();
439 }
440
ParseCpuInfo()441 Status ProcessCpu::ParseCpuInfo() {
442 uint64_t total_stat_;
443 RETURN_IF_NOT_OK(GetTotalCpuTime(&total_stat_));
444
445 if (!pre_fetched_state_) {
446 process_id_.clear();
447 pid_t main_pid = 0;
448 #if defined(USING_LINUX)
449 main_pid = syscall(SYS_getpid);
450 #endif
451 process_id_.emplace_back(main_pid);
452 op_process_ = BaseCpu::op_process_shared_;
453 fetched_all_process_ = BaseCpu::fetched_all_process_shared_;
454 for (const auto &item : op_process_) {
455 for (const auto &id : item.second) {
456 process_id_.emplace_back(id);
457 }
458 }
459 }
460
461 float user_util = 0, sys_util = 0;
462 for (const auto &pid : process_id_) {
463 std::string stat_path = "/proc/" + std::to_string(pid) + "/stat";
464
465 std::ifstream file(stat_path);
466 if (!file.is_open()) {
467 MS_LOG(INFO) << "Open CPU file failed when collect CPU information";
468 continue;
469 }
470 std::string str;
471 getline(file, str);
472 uint64_t user = 0, sys = 0;
473 if (sscanf_s(str.c_str(), "%*d %*s %*s %*lu %*lu %*lu %*lu %*lu %*lu %*lu %*lu %*lu %*lu %lu %lu", &user, &sys) ==
474 EOF) {
475 file.close();
476 return Status(StatusCode::kMDUnexpectedError, "Get device CPU failed.");
477 }
478 file.close();
479
480 // Calculate the utilization from the second sampling
481 if (!first_collect_ && (pre_process_stat_.find(pid) != pre_process_stat_.end())) {
482 user_util += (user - pre_process_stat_[pid].user_stat_) * 1.0 / (total_stat_ - pre_total_stat_) * 100;
483 sys_util += (sys - pre_process_stat_[pid].sys_stat_) * 1.0 / (total_stat_ - pre_total_stat_) * 100;
484 }
485 pre_process_stat_[pid].user_stat_ = user;
486 pre_process_stat_[pid].sys_stat_ = sys;
487 }
488 if (!first_collect_) {
489 CpuProcessUtil info;
490 info.user_utilization_ = user_util;
491 info.sys_utilization_ = sys_util;
492 process_util_.emplace_back(info);
493 }
494 pre_total_stat_ = total_stat_;
495 first_collect_ = false;
496 pre_fetched_state_ = fetched_all_process_;
497 return Status::OK();
498 }
499
Collect(const ExecutionTree * tree)500 Status ProcessCpu::Collect(const ExecutionTree *tree) {
501 RETURN_UNEXPECTED_IF_NULL(tree);
502 if (first_collect_) {
503 #if defined(USING_LINUX)
504 cpu_processor_num_ = get_nprocs_conf();
505 #endif
506 }
507 RETURN_IF_NOT_OK(ParseCpuInfo());
508
509 return Status::OK();
510 }
511
Analyze(std::string * name,double * utilization,std::string * extra_message)512 Status ProcessCpu::Analyze(std::string *name, double *utilization, std::string *extra_message) {
513 RETURN_UNEXPECTED_IF_NULL(name);
514 RETURN_UNEXPECTED_IF_NULL(utilization);
515 RETURN_UNEXPECTED_IF_NULL(extra_message);
516 name->clear();
517 name->append("process_info");
518 int total_samples = process_util_.size();
519 int64 sum = 0;
520 // Only analyze the middle half of the samples
521 // Starting and ending may be impacted by startup or ending pipeline activities
522 constexpr int64_t sample_sections = 4;
523 int64 start_analyze = total_samples / sample_sections;
524 int64 end_analyze = total_samples - start_analyze;
525
526 for (int i = start_analyze; i < end_analyze; i++) {
527 sum += process_util_[i].user_utilization_;
528 sum += process_util_[i].sys_utilization_;
529 }
530
531 if ((end_analyze - start_analyze) > 0) {
532 *utilization = sum / (end_analyze - start_analyze);
533 }
534 return Status::OK();
535 }
536
SaveToFile(const std::string & file_path)537 Status ProcessCpu::SaveToFile(const std::string &file_path) {
538 Path path = Path(file_path);
539 json output;
540 if (path.Exists()) {
541 MS_LOG(DEBUG) << file_path << "already exist.";
542 try {
543 std::ifstream file(file_path);
544 file >> output;
545 } catch (const std::exception &err) {
546 RETURN_STATUS_UNEXPECTED("Invalid file, failed to open json file: " + file_path +
547 ", please delete it and try again!");
548 }
549 } else {
550 output["sampling_interval"] = GlobalContext::config_manager()->monitor_sampling_interval();
551 }
552
553 std::vector<int16_t> user_util;
554 std::transform(process_util_.begin(), process_util_.end(), std::back_inserter(user_util),
555 [&](const CpuProcessUtil &info) { return uint16_t(info.user_utilization_ * cpu_processor_num_); });
556 std::vector<int16_t> sys_util;
557 std::transform(process_util_.begin(), process_util_.end(), std::back_inserter(sys_util),
558 [&](const CpuProcessUtil &info) { return uint16_t(info.sys_utilization_ * cpu_processor_num_); });
559
560 output["process_info"] = {{"user_utilization", user_util}, {"sys_utilization", sys_util}};
561 output["cpu_processor_num"] = cpu_processor_num_;
562
563 // Discard the content of the file when opening.
564 std::ofstream os(file_path, std::ios::trunc);
565 os << output;
566 os.close();
567
568 MS_LOG(INFO) << "Save process CPU success.";
569 return Status::OK();
570 }
571
CollectTimeStamp()572 Status CpuSampling::CollectTimeStamp() {
573 time_stamp_.emplace_back(ProfilingTime::GetCurMilliSecond());
574 return Status::OK();
575 }
576
577 // Sample action
Sample()578 Status CpuSampling::Sample() {
579 // Collect cpu information
580 for (auto cpu : cpu_) {
581 RETURN_IF_NOT_OK(cpu->Collect(this->tree_));
582 }
583
584 // Collect time stamp
585 RETURN_IF_NOT_OK(CollectTimeStamp());
586 return Status::OK();
587 }
588
SaveTimeStampToFile()589 Status CpuSampling::SaveTimeStampToFile() {
590 // Save time stamp to json file
591 // If the file is already exist, simply add the data to corresponding field.
592 Path path = Path(file_path_);
593 json output;
594 if (path.Exists()) {
595 try {
596 std::ifstream file(file_path_);
597 file >> output;
598 } catch (const std::exception &err) {
599 RETURN_STATUS_UNEXPECTED("Invalid file, failed to open json file: " + file_path_ +
600 ", please delete it and try again!");
601 }
602 }
603 output["time_stamp"] = time_stamp_;
604 std::ofstream os(file_path_, std::ios::trunc);
605 os << output;
606 os.close();
607
608 return Status::OK();
609 }
610
SaveSamplingItervalToFile()611 Status CpuSampling::SaveSamplingItervalToFile() {
612 // If the file is already exist, simply add the data to corresponding field.
613 Path path = Path(file_path_);
614 json output;
615 if (path.Exists()) {
616 try {
617 std::ifstream file(file_path_);
618 file >> output;
619 } catch (const std::exception &err) {
620 RETURN_STATUS_UNEXPECTED("Invalid file, failed to open json file: " + file_path_ +
621 ", please delete it and try again!");
622 }
623 }
624 output["sampling_interval"] = GlobalContext::config_manager()->monitor_sampling_interval();
625 std::ofstream os(file_path_, std::ios::trunc);
626 os << output;
627 os.close();
628
629 return Status::OK();
630 }
631
632 // Analyze profiling data and output warning messages
Analyze()633 Status CpuSampling::Analyze() {
634 std::string name;
635 double utilization = 0;
636 constexpr double total_cpu_thold = 90;
637 constexpr double op_cpu_thold = 80;
638 // Keep track of specific information returned by differentn CPU sampling types
639 double total_utilization = 0;
640 double max_op_utilization = 0;
641 std::string max_op_name;
642 std::string detailed_op_cpu_message;
643
644 // Save cpu information to json file
645 for (auto cpu : cpu_) {
646 std::string extra_message;
647 RETURN_IF_NOT_OK(cpu->Analyze(&name, &utilization, &extra_message));
648 if (name == "device_info") {
649 total_utilization = utilization;
650 } else if (name != "process_info") {
651 max_op_utilization = utilization;
652 max_op_name = name;
653 detailed_op_cpu_message = extra_message;
654 }
655 }
656 if ((total_utilization < total_cpu_thold) && (max_op_utilization > op_cpu_thold)) {
657 MS_LOG(WARNING) << "Operator " << max_op_name << " is using " << max_op_utilization << "% CPU per thread. "
658 << "This operator may benefit from increasing num_parallel_workers."
659 << "Full Operator CPU utiliization for all operators: " << detailed_op_cpu_message << std::endl;
660 }
661 return Status::OK();
662 }
663
664 // Save profiling data to file
SaveToFile()665 Status CpuSampling::SaveToFile() {
666 // Save time stamp to json file
667 RETURN_IF_NOT_OK(SaveTimeStampToFile());
668
669 // Save time stamp to json file
670 RETURN_IF_NOT_OK(SaveSamplingItervalToFile());
671
672 // Save cpu information to json file
673 for (auto cpu : cpu_) {
674 RETURN_IF_NOT_OK(cpu->SaveToFile(file_path_));
675 }
676
677 return Status::OK();
678 }
679
Init(const std::string & dir_path,const std::string & device_id)680 Status CpuSampling::Init(const std::string &dir_path, const std::string &device_id) {
681 file_path_ = (Path(dir_path) / Path("minddata_cpu_utilization_" + device_id + ".json")).ToString();
682 std::shared_ptr<DeviceCpu> device_cpu = std::make_shared<DeviceCpu>();
683 std::shared_ptr<OperatorCpu> operator_cpu = std::make_shared<OperatorCpu>();
684 std::shared_ptr<ProcessCpu> process_cpu = std::make_shared<ProcessCpu>();
685 cpu_.push_back(device_cpu);
686 cpu_.push_back(operator_cpu);
687 cpu_.push_back(process_cpu);
688 return Status::OK();
689 }
690
ChangeFileMode()691 Status CpuSampling::ChangeFileMode() {
692 if (chmod(common::SafeCStr(file_path_), S_IRUSR | S_IWUSR) == -1) {
693 std::string err_str = "Change file mode failed," + file_path_;
694 return Status(StatusCode::kMDUnexpectedError, err_str);
695 }
696 return Status::OK();
697 }
698 } // namespace dataset
699 } // namespace mindspore
700