• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2021 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include "minddata/dataset/engine/perf/cpu_sampling.h"
17 
18 #if !defined(_WIN32) && !defined(_WIN64) && !defined(__ANDROID__) && !defined(ANDROID) && !defined(__APPLE__)
19 #include <sys/syscall.h>
20 #endif
21 #include <cmath>
22 #include <cstdio>
23 #include <algorithm>
24 #include <fstream>
25 #include <memory>
26 #include <string>
27 
28 #include "minddata/dataset/api/python/pybind_conversion.h"
29 #include "minddata/dataset/core/config_manager.h"
30 #include "minddata/dataset/engine/execution_tree.h"
31 #include "minddata/dataset/util/path.h"
32 
33 using json = nlohmann::json;
34 namespace mindspore {
35 namespace dataset {
// Class-level state shared between the samplers: OperatorCpu::Collect writes both values and
// ProcessCpu::ParseCpuInfo reads them back.
// Copy of the "all fetched" flag returned as the second element of _get_operator_process().
36 bool BaseCpu::fetched_all_process_shared_ = false;
// Copy of the operator id -> process id list map returned as the first element of _get_operator_process().
37 std::unordered_map<int32_t, std::vector<pid_t>> BaseCpu::op_process_shared_ = {};
38 
// USING_LINUX is defined for builds that are not Windows, Android or Apple targets. The syscall(SYS_getpid)
// and get_nprocs_conf() calls further down are compiled only when it is defined.
39 #if !defined(_WIN32) && !defined(_WIN64) && !defined(__ANDROID__) && !defined(ANDROID) && !defined(__APPLE__)
40 #define USING_LINUX
41 #endif
42 
BaseCpu()43 BaseCpu::BaseCpu() {
44   pre_cpu_stat_.user_stat_ = 0;
45   pre_cpu_stat_.sys_stat_ = 0;
46   pre_cpu_stat_.io_stat_ = 0;
47   pre_cpu_stat_.idle_stat_ = 0;
48   pre_cpu_stat_.total_stat_ = 0;
49   fetched_all_process_ = false;
50   pre_fetched_state_ = false;
51   cpu_processor_num_ = 0;
52 }
53 
GetTotalCpuTime(uint64_t * total_stat)54 Status BaseCpu::GetTotalCpuTime(uint64_t *total_stat) {
55   std::ifstream file("/proc/stat");
56   if (!file.is_open()) {
57     MS_LOG(INFO) << "Open CPU file failed when collect CPU information";
58     return Status::OK();
59   }
60   std::string str;
61   getline(file, str);
62   uint64_t user = 0, sys = 0, idle = 0, iowait = 0, nice = 0, irq = 0, softirq = 0;
63   if (sscanf_s(str.c_str(), "%*s %lu %lu %lu %lu %lu %lu %lu", &user, &nice, &sys, &idle, &iowait, &irq, &softirq) ==
64       EOF) {
65     file.close();
66     return Status(StatusCode::kMDUnexpectedError, "Get device CPU failed.");
67   }
68   file.close();
69   *total_stat = user + nice + sys + idle + iowait + irq + softirq;
70 
71   return Status::OK();
72 }
73 
ParseCpuInfo(const std::string & str)74 Status DeviceCpu::ParseCpuInfo(const std::string &str) {
75   CpuStat cpu_stat;
76   uint64_t nice = 0;
77   uint64_t irq = 0;
78   uint64_t softirq = 0;
79   if (sscanf_s(str.c_str(), "%*s %lu %lu %lu %lu %lu %lu %lu", &cpu_stat.user_stat_, &nice, &cpu_stat.sys_stat_,
80                &cpu_stat.idle_stat_, &cpu_stat.io_stat_, &irq, &softirq) == EOF) {
81     return Status(StatusCode::kMDUnexpectedError, "Get device CPU failed.");
82   }
83 
84   cpu_stat.total_stat_ =
85     cpu_stat.user_stat_ + nice + cpu_stat.sys_stat_ + cpu_stat.idle_stat_ + cpu_stat.io_stat_ + irq + softirq;
86   // Calculate the utilization from the second sampling
87   if (!first_collect_) {
88     CpuUtil info;
89     info.user_utilization_ = round((cpu_stat.user_stat_ - pre_cpu_stat_.user_stat_) * 1.0 /
90                                    (cpu_stat.total_stat_ - pre_cpu_stat_.total_stat_) * 100);
91     info.sys_utilization_ = round((cpu_stat.sys_stat_ - pre_cpu_stat_.sys_stat_) * 1.0 /
92                                   (cpu_stat.total_stat_ - pre_cpu_stat_.total_stat_) * 100);
93     info.io_utilization_ = round((cpu_stat.io_stat_ - pre_cpu_stat_.io_stat_) * 1.0 /
94                                  (cpu_stat.total_stat_ - pre_cpu_stat_.total_stat_) * 100);
95     info.idle_utilization_ = round((cpu_stat.idle_stat_ - pre_cpu_stat_.idle_stat_) * 1.0 /
96                                    (cpu_stat.total_stat_ - pre_cpu_stat_.total_stat_) * 100);
97     cpu_util_.emplace_back(info);
98   }
99   pre_cpu_stat_.user_stat_ = cpu_stat.user_stat_;
100   pre_cpu_stat_.sys_stat_ = cpu_stat.sys_stat_;
101   pre_cpu_stat_.io_stat_ = cpu_stat.io_stat_;
102   pre_cpu_stat_.idle_stat_ = cpu_stat.idle_stat_;
103   pre_cpu_stat_.total_stat_ = cpu_stat.total_stat_;
104 
105   return Status::OK();
106 }
107 
ParseCtxt(const std::string & str)108 Status DeviceCpu::ParseCtxt(const std::string &str) {
109   uint64_t ctxt;
110   if (sscanf_s(str.c_str(), "%*s %lu", &ctxt) == EOF) {
111     return Status(StatusCode::kMDUnexpectedError, "Get context switch count failed.");
112   }
113   // Calculate the utilization from the second sampling
114   if (!first_collect_) {
115     context_switch_count_.push_back(ctxt - pre_context_switch_count_);
116   }
117   pre_context_switch_count_ = ctxt;
118   return Status::OK();
119 }
120 
ParseRunningProcess(const std::string & str)121 Status DeviceCpu::ParseRunningProcess(const std::string &str) {
122   uint32_t running_process;
123   if (sscanf_s(str.c_str(), "%*s %ud", &running_process) == EOF) {
124     return Status(StatusCode::kMDUnexpectedError, "Get context switch count failed.");
125   }
126   // Drop the first value in order to collect same amount of CPU utilization
127   if (!first_collect_) {
128     running_process_.push_back(running_process);
129   }
130 
131   return Status::OK();
132 }
133 
Collect(const ExecutionTree * tree)134 Status DeviceCpu::Collect(const ExecutionTree *tree) {
135   std::ifstream file("/proc/stat");
136   if (!file.is_open()) {
137     MS_LOG(INFO) << "Open CPU file failed when collect CPU information";
138     return Status::OK();
139   }
140   bool first_line = true;
141   std::string line;
142   while (getline(file, line)) {
143     if (first_line) {
144       first_line = false;
145       RETURN_IF_NOT_OK(ParseCpuInfo(line));
146     }
147     if (line.find("ctxt") != std::string::npos) {
148       RETURN_IF_NOT_OK(ParseCtxt(line));
149     }
150     if (line.find("procs_running") != std::string::npos) {
151       RETURN_IF_NOT_OK(ParseRunningProcess(line));
152     }
153   }
154   file.close();
155 
156   first_collect_ = false;
157   return Status::OK();
158 }
Analyze(std::string * name,double * utilization,std::string * extra_message)159 Status DeviceCpu::Analyze(std::string *name, double *utilization, std::string *extra_message) {
160   RETURN_UNEXPECTED_IF_NULL(name);
161   name->clear();
162   name->append("device_info");
163   int total_samples = cpu_util_.size();
164   int sum = 0;
165   // Only analyze the middle half of the samples
166   // Starting and ending may be impacted by startup or ending pipeline activities
167   int start_analyze = total_samples / 4;
168   int end_analyze = total_samples - start_analyze;
169 
170   for (int i = start_analyze; i < end_analyze; i++) {
171     sum += cpu_util_[i].user_utilization_;
172     sum += cpu_util_[i].sys_utilization_;
173   }
174 
175   // Note device utilization is already in range of 0-1, so don't
176   // need to divide by number of CPUS
177   if ((end_analyze - start_analyze) > 0) {
178     *utilization = sum / (end_analyze - start_analyze);
179   }
180   return Status::OK();
181 }
182 
SaveToFile(const std::string & file_path)183 Status DeviceCpu::SaveToFile(const std::string &file_path) {
184   Path path = Path(file_path);
185   json output;
186   if (path.Exists()) {
187     MS_LOG(DEBUG) << file_path << " exists already";
188     try {
189       std::ifstream file(file_path);
190       file >> output;
191     } catch (const std::exception &err) {
192       RETURN_STATUS_UNEXPECTED("Invalid file, failed to open json file: " + file_path +
193                                ", please delete it and try again!");
194     }
195   } else {
196     output["sampling_interval"] = GlobalContext::config_manager()->monitor_sampling_interval();
197   }
198 
199   std::vector<int8_t> user_util;
200   std::transform(cpu_util_.begin(), cpu_util_.end(), std::back_inserter(user_util),
201                  [&](const CpuUtil &info) { return info.user_utilization_; });
202   std::vector<int8_t> sys_util;
203   std::transform(cpu_util_.begin(), cpu_util_.end(), std::back_inserter(sys_util),
204                  [&](const CpuUtil &info) { return info.sys_utilization_; });
205   std::vector<int8_t> io_util;
206   std::transform(cpu_util_.begin(), cpu_util_.end(), std::back_inserter(io_util),
207                  [&](const CpuUtil &info) { return info.io_utilization_; });
208   std::vector<int8_t> idle_util;
209   std::transform(cpu_util_.begin(), cpu_util_.end(), std::back_inserter(idle_util),
210                  [&](const CpuUtil &info) { return info.idle_utilization_; });
211 
212   output["device_info"] = {{"user_utilization", user_util},
213                            {"sys_utilization", sys_util},
214                            {"io_utilization", io_util},
215                            {"idle_utilization", idle_util},
216                            {"runable_processes", running_process_},
217                            {"context_switch_count", context_switch_count_}};
218 
219   // Discard the content of the file when opening.
220   std::ofstream os(file_path, std::ios::trunc);
221   os << output;
222   os.close();
223 
224   MS_LOG(INFO) << "Save device CPU success.";
225   return Status::OK();
226 }
227 
ParseCpuInfo(int32_t op_id,int64_t thread_id,std::unordered_map<int32_t,std::unordered_map<int64_t,CpuOpStat>> * op_stat)228 Status OperatorCpu::ParseCpuInfo(int32_t op_id, int64_t thread_id,
229                                  std::unordered_map<int32_t, std::unordered_map<int64_t, CpuOpStat>> *op_stat) {
230   RETURN_UNEXPECTED_IF_NULL(op_stat);
231   pid_t pid = 0;
232 #if defined(USING_LINUX)
233   pid = syscall(SYS_getpid);
234 #endif
235   std::string stat_path = "/proc/" + std::to_string(pid) + "/task/" + std::to_string(thread_id) + "/stat";
236 
237   // Judge whether file exist first
238   Path temp_path(stat_path);
239   if (!temp_path.Exists()) {
240     (*op_stat)[op_id][thread_id].user_stat_ = 0;
241     (*op_stat)[op_id][thread_id].sys_stat_ = 0;
242     return Status(StatusCode::kMDFileNotExist);
243   }
244 
245   std::ifstream file(stat_path);
246   if (!file.is_open()) {
247     MS_LOG(INFO) << "Open CPU file failed when collect CPU information";
248     return Status::OK();
249   }
250   std::string str;
251   getline(file, str);
252   uint64_t utime;
253   uint64_t stime;
254   if (sscanf_s(str.c_str(), "%*d %*s %*s %*lu %*lu %*lu %*lu %*lu %*lu %*lu %*lu %*lu %*lu %lu %lu", &utime, &stime) ==
255       EOF) {
256     file.close();
257     return Status(StatusCode::kMDUnexpectedError, "Get device CPU failed.");
258   }
259   file.close();
260   (*op_stat)[op_id][thread_id].user_stat_ = utime;
261   (*op_stat)[op_id][thread_id].sys_stat_ = stime;
262 
263   return Status::OK();
264 }
265 
Collect(const ExecutionTree * tree)266 Status OperatorCpu::Collect(const ExecutionTree *tree) {
267   RETURN_UNEXPECTED_IF_NULL(tree);
268   if (first_collect_) {
269     for (auto iter = tree->begin(); iter != tree->end(); ++iter) {
270       id_count_++;
271       op_name_[iter->id()] = iter->NameWithID();
272       op_parallel_workers_[iter->id()] = iter->NumWorkers();
273     }
274 #if defined(USING_LINUX)
275     cpu_processor_num_ = get_nprocs_conf();
276 #endif
277   }
278 
279   // Obtain the op and thread mapping
280   op_thread_.clear();
281   List<Task> allTasks = tree->AllTasks()->GetTask();
282   for (auto &task1 : allTasks) {
283     int32_t op_id = task1.get_operator_id();
284     op_thread_[op_id].emplace_back(task1.get_linux_id());
285   }
286 
287   // add process id into op_thread
288   if (!fetched_all_process_) {
289     {
290       py::gil_scoped_acquire gil_acquire;
291       py::module ds = py::module::import("mindspore.dataset.engine.datasets");
292       py::tuple process_info = ds.attr("_get_operator_process")();
293       py::dict sub_process = py::reinterpret_borrow<py::dict>(process_info[0]);
294       fetched_all_process_ = py::reinterpret_borrow<py::bool_>(process_info[1]);
295       // parse dict value
296       op_process_ = toIntMap(sub_process);
297       BaseCpu::op_process_shared_ = op_process_;
298       BaseCpu::fetched_all_process_shared_ = fetched_all_process_;
299     }
300 
301     // judge whether there is device_que operator, if so operator id may need increase by one, temp use directly
302     for (auto item : op_process_) {
303       if (!item.second.empty()) {
304         if (op_thread_.find(item.first) != op_thread_.end()) {
305           op_thread_[item.first].insert(op_thread_[item.first].end(), item.second.begin(), item.second.end());
306         } else {
307           op_thread_[item.first] = item.second;
308         }
309       }
310     }
311   }
312 
313   uint64_t total_stat_;
314   RETURN_IF_NOT_OK(GetTotalCpuTime(&total_stat_));
315   std::vector<CpuOpUtil> cpu_step_util_;
316   std::unordered_map<int32_t, std::unordered_map<int64_t, CpuOpStat>> op_stat_;
317 
318   if (!first_collect_) {
319     // obtain all the op id in current tasks
320     std::vector<int32_t> total_op_id;
321     (void)std::transform(op_thread_.begin(), op_thread_.end(), std::back_inserter(total_op_id),
322                          [](const auto &iter) { return iter.first; });
323 
324     // iter all the op, and obtain the CPU utilization of each operator
325     for (auto op_id = -1; op_id < id_count_; op_id++) {
326       float user_util = 0, sys_util = 0;
327       auto iter = std::find(total_op_id.begin(), total_op_id.end(), op_id);
328       if (iter != total_op_id.end()) {
329         for (auto thread_id : op_thread_[op_id]) {
330           if (ParseCpuInfo(op_id, thread_id, &op_stat_) == Status::OK()) {
331             user_util += (op_stat_[op_id][thread_id].user_stat_ - pre_op_stat_[op_id][thread_id].user_stat_) * 1.0 /
332                          (total_stat_ - pre_total_stat_) * 100;
333             sys_util += (op_stat_[op_id][thread_id].sys_stat_ - pre_op_stat_[op_id][thread_id].sys_stat_) * 1.0 /
334                         (total_stat_ - pre_total_stat_) * 100;
335           }
336         }
337       }
338       CpuOpUtil info;
339       info.op_id_ = op_id;
340       info.sys_utilization_ = sys_util;
341       info.user_utilization_ = user_util;
342       cpu_step_util_.emplace_back(info);
343     }
344     cpu_op_util_.emplace_back(cpu_step_util_);
345   } else {
346     // mainly obtain the init CPU execute time in first collect
347     for (const auto &iter : op_thread_) {
348       int32_t op_id = iter.first;
349       for (auto thread_id_ : iter.second) {
350         // ParseCpuInfo may execute failed for cpu data not ready, but we still get next thread cpu info
351         (void)ParseCpuInfo(op_id, thread_id_, &op_stat_);
352       }
353     }
354   }
355 
356   // copy current op_stat into pre_op_stat
357   pre_op_stat_ = op_stat_;
358   pre_total_stat_ = total_stat_;
359 
360   first_collect_ = false;
361   return Status::OK();
362 }
363 
Analyze(std::string * name,double * utilization,std::string * extra_message)364 Status OperatorCpu::Analyze(std::string *name, double *utilization, std::string *extra_message) {
365   RETURN_UNEXPECTED_IF_NULL(name);
366   RETURN_UNEXPECTED_IF_NULL(extra_message);
367   int total_samples = cpu_op_util_.size();
368 
369   // Only analyze the middle half of the samples
370   // Starting and ending may be impacted by startup or ending pipeline activities
371   constexpr int64_t sample_sections = 4;
372   int64 start_analyze = total_samples / sample_sections;
373   int64 end_analyze = total_samples - start_analyze;
374   double op_util = 0;
375   *utilization = 0;
376 
377   // start loop from 0 was as don't want to analyze op -1
378   for (auto op_id = 0; op_id < id_count_; op_id++) {
379     int64 sum = 0;
380     int64 index = op_id + 1;
381     for (int i = start_analyze; i < end_analyze; i++) {
382       sum += cpu_op_util_[i][index].user_utilization_;
383       sum += cpu_op_util_[i][index].sys_utilization_;
384     }
385     if ((end_analyze - start_analyze) > 0) {
386       op_util = 1.0 * sum * cpu_processor_num_ / (op_parallel_workers_[op_id] * (end_analyze - start_analyze));
387     }
388     if (op_util > *utilization) {
389       *utilization = op_util;
390       name->clear();
391       (void)name->append(op_name_[op_id]);
392     }
393     (void)extra_message->append(op_name_[op_id] + " utilization per thread: " + std::to_string(op_util) + "% (" +
394                                 std::to_string(op_parallel_workers_[op_id]) + " parallel_workers); ");
395   }
396   return Status::OK();
397 }
398 
SaveToFile(const std::string & file_path)399 Status OperatorCpu::SaveToFile(const std::string &file_path) {
400   Path path = Path(file_path);
401   json output;
402   if (path.Exists()) {
403     MS_LOG(DEBUG) << file_path << "already exist.";
404     try {
405       std::ifstream file(file_path);
406       file >> output;
407     } catch (const std::exception &err) {
408       RETURN_STATUS_UNEXPECTED("Invalid file, failed to open json file: " + file_path +
409                                ", please delete it and try again!");
410     }
411   }
412 
413   uint8_t index = 0;
414   json OpWriter;
415   for (auto op_id = -1; op_id < id_count_; op_id++) {
416     std::vector<uint16_t> user_util;
417     std::vector<uint16_t> sys_util;
418     std::transform(
419       cpu_op_util_.begin(), cpu_op_util_.end(), std::back_inserter(user_util),
420       [&](const std::vector<CpuOpUtil> &info) { return int16_t(info[index].user_utilization_ * cpu_processor_num_); });
421     std::transform(
422       cpu_op_util_.begin(), cpu_op_util_.end(), std::back_inserter(sys_util),
423       [&](const std::vector<CpuOpUtil> &info) { return int16_t(info[index].sys_utilization_ * cpu_processor_num_); });
424 
425     json per_op_info = {{"metrics", {{"user_utilization", user_util}, {"sys_utilization", sys_util}}},
426                         {"op_id", op_id}};
427     OpWriter.emplace_back(per_op_info);
428     index++;
429   }
430   output["op_info"] = OpWriter;
431 
432   // Discard the content of the file when opening.
433   std::ofstream os(file_path, std::ios::trunc);
434   os << output;
435   os.close();
436 
437   MS_LOG(INFO) << "Save device CPU success.";
438   return Status::OK();
439 }
440 
ParseCpuInfo()441 Status ProcessCpu::ParseCpuInfo() {
442   uint64_t total_stat_;
443   RETURN_IF_NOT_OK(GetTotalCpuTime(&total_stat_));
444 
445   if (!pre_fetched_state_) {
446     process_id_.clear();
447     pid_t main_pid = 0;
448 #if defined(USING_LINUX)
449     main_pid = syscall(SYS_getpid);
450 #endif
451     process_id_.emplace_back(main_pid);
452     op_process_ = BaseCpu::op_process_shared_;
453     fetched_all_process_ = BaseCpu::fetched_all_process_shared_;
454     for (const auto &item : op_process_) {
455       for (const auto &id : item.second) {
456         process_id_.emplace_back(id);
457       }
458     }
459   }
460 
461   float user_util = 0, sys_util = 0;
462   for (const auto &pid : process_id_) {
463     std::string stat_path = "/proc/" + std::to_string(pid) + "/stat";
464 
465     std::ifstream file(stat_path);
466     if (!file.is_open()) {
467       MS_LOG(INFO) << "Open CPU file failed when collect CPU information";
468       continue;
469     }
470     std::string str;
471     getline(file, str);
472     uint64_t user = 0, sys = 0;
473     if (sscanf_s(str.c_str(), "%*d %*s %*s %*lu %*lu %*lu %*lu %*lu %*lu %*lu %*lu %*lu %*lu %lu %lu", &user, &sys) ==
474         EOF) {
475       file.close();
476       return Status(StatusCode::kMDUnexpectedError, "Get device CPU failed.");
477     }
478     file.close();
479 
480     // Calculate the utilization from the second sampling
481     if (!first_collect_ && (pre_process_stat_.find(pid) != pre_process_stat_.end())) {
482       user_util += (user - pre_process_stat_[pid].user_stat_) * 1.0 / (total_stat_ - pre_total_stat_) * 100;
483       sys_util += (sys - pre_process_stat_[pid].sys_stat_) * 1.0 / (total_stat_ - pre_total_stat_) * 100;
484     }
485     pre_process_stat_[pid].user_stat_ = user;
486     pre_process_stat_[pid].sys_stat_ = sys;
487   }
488   if (!first_collect_) {
489     CpuProcessUtil info;
490     info.user_utilization_ = user_util;
491     info.sys_utilization_ = sys_util;
492     process_util_.emplace_back(info);
493   }
494   pre_total_stat_ = total_stat_;
495   first_collect_ = false;
496   pre_fetched_state_ = fetched_all_process_;
497   return Status::OK();
498 }
499 
Collect(const ExecutionTree * tree)500 Status ProcessCpu::Collect(const ExecutionTree *tree) {
501   RETURN_UNEXPECTED_IF_NULL(tree);
502   if (first_collect_) {
503 #if defined(USING_LINUX)
504     cpu_processor_num_ = get_nprocs_conf();
505 #endif
506   }
507   RETURN_IF_NOT_OK(ParseCpuInfo());
508 
509   return Status::OK();
510 }
511 
Analyze(std::string * name,double * utilization,std::string * extra_message)512 Status ProcessCpu::Analyze(std::string *name, double *utilization, std::string *extra_message) {
513   RETURN_UNEXPECTED_IF_NULL(name);
514   RETURN_UNEXPECTED_IF_NULL(utilization);
515   RETURN_UNEXPECTED_IF_NULL(extra_message);
516   name->clear();
517   name->append("process_info");
518   int total_samples = process_util_.size();
519   int64 sum = 0;
520   // Only analyze the middle half of the samples
521   // Starting and ending may be impacted by startup or ending pipeline activities
522   constexpr int64_t sample_sections = 4;
523   int64 start_analyze = total_samples / sample_sections;
524   int64 end_analyze = total_samples - start_analyze;
525 
526   for (int i = start_analyze; i < end_analyze; i++) {
527     sum += process_util_[i].user_utilization_;
528     sum += process_util_[i].sys_utilization_;
529   }
530 
531   if ((end_analyze - start_analyze) > 0) {
532     *utilization = sum / (end_analyze - start_analyze);
533   }
534   return Status::OK();
535 }
536 
SaveToFile(const std::string & file_path)537 Status ProcessCpu::SaveToFile(const std::string &file_path) {
538   Path path = Path(file_path);
539   json output;
540   if (path.Exists()) {
541     MS_LOG(DEBUG) << file_path << "already exist.";
542     try {
543       std::ifstream file(file_path);
544       file >> output;
545     } catch (const std::exception &err) {
546       RETURN_STATUS_UNEXPECTED("Invalid file, failed to open json file: " + file_path +
547                                ", please delete it and try again!");
548     }
549   } else {
550     output["sampling_interval"] = GlobalContext::config_manager()->monitor_sampling_interval();
551   }
552 
553   std::vector<int16_t> user_util;
554   std::transform(process_util_.begin(), process_util_.end(), std::back_inserter(user_util),
555                  [&](const CpuProcessUtil &info) { return uint16_t(info.user_utilization_ * cpu_processor_num_); });
556   std::vector<int16_t> sys_util;
557   std::transform(process_util_.begin(), process_util_.end(), std::back_inserter(sys_util),
558                  [&](const CpuProcessUtil &info) { return uint16_t(info.sys_utilization_ * cpu_processor_num_); });
559 
560   output["process_info"] = {{"user_utilization", user_util}, {"sys_utilization", sys_util}};
561   output["cpu_processor_num"] = cpu_processor_num_;
562 
563   // Discard the content of the file when opening.
564   std::ofstream os(file_path, std::ios::trunc);
565   os << output;
566   os.close();
567 
568   MS_LOG(INFO) << "Save process CPU success.";
569   return Status::OK();
570 }
571 
CollectTimeStamp()572 Status CpuSampling::CollectTimeStamp() {
573   time_stamp_.emplace_back(ProfilingTime::GetCurMilliSecond());
574   return Status::OK();
575 }
576 
577 // Sample action
Sample()578 Status CpuSampling::Sample() {
579   // Collect cpu information
580   for (auto cpu : cpu_) {
581     RETURN_IF_NOT_OK(cpu->Collect(this->tree_));
582   }
583 
584   // Collect time stamp
585   RETURN_IF_NOT_OK(CollectTimeStamp());
586   return Status::OK();
587 }
588 
SaveTimeStampToFile()589 Status CpuSampling::SaveTimeStampToFile() {
590   // Save time stamp to json file
591   // If the file is already exist, simply add the data to corresponding field.
592   Path path = Path(file_path_);
593   json output;
594   if (path.Exists()) {
595     try {
596       std::ifstream file(file_path_);
597       file >> output;
598     } catch (const std::exception &err) {
599       RETURN_STATUS_UNEXPECTED("Invalid file, failed to open json file: " + file_path_ +
600                                ", please delete it and try again!");
601     }
602   }
603   output["time_stamp"] = time_stamp_;
604   std::ofstream os(file_path_, std::ios::trunc);
605   os << output;
606   os.close();
607 
608   return Status::OK();
609 }
610 
SaveSamplingItervalToFile()611 Status CpuSampling::SaveSamplingItervalToFile() {
612   // If the file is already exist, simply add the data to corresponding field.
613   Path path = Path(file_path_);
614   json output;
615   if (path.Exists()) {
616     try {
617       std::ifstream file(file_path_);
618       file >> output;
619     } catch (const std::exception &err) {
620       RETURN_STATUS_UNEXPECTED("Invalid file, failed to open json file: " + file_path_ +
621                                ", please delete it and try again!");
622     }
623   }
624   output["sampling_interval"] = GlobalContext::config_manager()->monitor_sampling_interval();
625   std::ofstream os(file_path_, std::ios::trunc);
626   os << output;
627   os.close();
628 
629   return Status::OK();
630 }
631 
632 // Analyze profiling data and output warning messages
Analyze()633 Status CpuSampling::Analyze() {
634   std::string name;
635   double utilization = 0;
636   constexpr double total_cpu_thold = 90;
637   constexpr double op_cpu_thold = 80;
638   // Keep track of specific information returned by differentn CPU sampling types
639   double total_utilization = 0;
640   double max_op_utilization = 0;
641   std::string max_op_name;
642   std::string detailed_op_cpu_message;
643 
644   // Save cpu information to json file
645   for (auto cpu : cpu_) {
646     std::string extra_message;
647     RETURN_IF_NOT_OK(cpu->Analyze(&name, &utilization, &extra_message));
648     if (name == "device_info") {
649       total_utilization = utilization;
650     } else if (name != "process_info") {
651       max_op_utilization = utilization;
652       max_op_name = name;
653       detailed_op_cpu_message = extra_message;
654     }
655   }
656   if ((total_utilization < total_cpu_thold) && (max_op_utilization > op_cpu_thold)) {
657     MS_LOG(WARNING) << "Operator " << max_op_name << " is using " << max_op_utilization << "% CPU per thread.  "
658                     << "This operator may benefit from increasing num_parallel_workers."
659                     << "Full Operator CPU utiliization for all operators: " << detailed_op_cpu_message << std::endl;
660   }
661   return Status::OK();
662 }
663 
664 // Save profiling data to file
SaveToFile()665 Status CpuSampling::SaveToFile() {
666   // Save time stamp to json file
667   RETURN_IF_NOT_OK(SaveTimeStampToFile());
668 
669   // Save time stamp to json file
670   RETURN_IF_NOT_OK(SaveSamplingItervalToFile());
671 
672   // Save cpu information to json file
673   for (auto cpu : cpu_) {
674     RETURN_IF_NOT_OK(cpu->SaveToFile(file_path_));
675   }
676 
677   return Status::OK();
678 }
679 
Init(const std::string & dir_path,const std::string & device_id)680 Status CpuSampling::Init(const std::string &dir_path, const std::string &device_id) {
681   file_path_ = (Path(dir_path) / Path("minddata_cpu_utilization_" + device_id + ".json")).ToString();
682   std::shared_ptr<DeviceCpu> device_cpu = std::make_shared<DeviceCpu>();
683   std::shared_ptr<OperatorCpu> operator_cpu = std::make_shared<OperatorCpu>();
684   std::shared_ptr<ProcessCpu> process_cpu = std::make_shared<ProcessCpu>();
685   cpu_.push_back(device_cpu);
686   cpu_.push_back(operator_cpu);
687   cpu_.push_back(process_cpu);
688   return Status::OK();
689 }
690 
ChangeFileMode()691 Status CpuSampling::ChangeFileMode() {
692   if (chmod(common::SafeCStr(file_path_), S_IRUSR | S_IWUSR) == -1) {
693     std::string err_str = "Change file mode failed," + file_path_;
694     return Status(StatusCode::kMDUnexpectedError, err_str);
695   }
696   return Status::OK();
697 }
698 }  // namespace dataset
699 }  // namespace mindspore
700