• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2021-2022 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include "minddata/dataset/engine/perf/cpu_sampler.h"
17 
18 #if !defined(_WIN32) && !defined(_WIN64) && !defined(__ANDROID__) && !defined(ANDROID) && !defined(__APPLE__)
19 #include <sys/syscall.h>
20 #endif
21 #include <algorithm>
22 #include <cmath>
23 #include <cstdio>
24 #include <memory>
25 #include <fstream>
26 #include <string>
27 #include <utility>
28 
29 #include "minddata/dataset/api/python/pybind_conversion.h"
30 #include "minddata/dataset/core/config_manager.h"
31 #include "minddata/dataset/engine/execution_tree.h"
32 #include "minddata/dataset/util/path.h"
33 #include "utils/file_utils.h"
34 
35 namespace mindspore {
36 namespace dataset {
37 using json = nlohmann::json;
38 #if !defined(_WIN32) && !defined(_WIN64) && !defined(__ANDROID__) && !defined(ANDROID) && !defined(__APPLE__)
39 #define USING_LINUX
40 #endif
41 
42 #if defined(USING_LINUX)
43 int32_t SystemInfo::num_cpu_ = get_nprocs_conf();
44 #else
45 int32_t SystemInfo::num_cpu_ = 0;
46 #endif
47 
// Divisor used in this file to convert the kB values parsed from /proc files into MB.
constexpr uint64_t kBInMB{1024};
49 
ParseCpuInfo(const std::string & str)50 Status SystemInfo::ParseCpuInfo(const std::string &str) {
51   SystemStat system_cpu_stat;
52   uint64_t nice = 0;
53   uint64_t irq = 0;
54   uint64_t softirq = 0;
55   if (sscanf_s(str.c_str(), "%*s %lu %lu %lu %lu %lu %lu %lu", &system_cpu_stat.user_stat, &nice,
56                &system_cpu_stat.sys_stat, &system_cpu_stat.idle_stat, &system_cpu_stat.io_stat, &irq,
57                &softirq) == EOF) {
58     return Status(StatusCode::kMDUnexpectedError, "Get System CPU failed.");
59   }
60 
61   system_cpu_stat.total_stat = system_cpu_stat.user_stat + nice + system_cpu_stat.sys_stat + system_cpu_stat.idle_stat +
62                                system_cpu_stat.io_stat + irq + softirq;
63   SystemUtil system_cpu_util = {0, 0, 0, 0};
64   // Calculate the utilization from the second sampling
65   if (!first_sample_) {
66     int one_hundred = 100;
67     system_cpu_util.user_utilization =
68       static_cast<uint8_t>(round((system_cpu_stat.user_stat - prev_sys_stat_.user_stat) * 1.0 /
69                                  (system_cpu_stat.total_stat - prev_sys_stat_.total_stat) * one_hundred));
70     system_cpu_util.sys_utilization =
71       static_cast<uint8_t>(round((system_cpu_stat.sys_stat - prev_sys_stat_.sys_stat) * 1.0 /
72                                  (system_cpu_stat.total_stat - prev_sys_stat_.total_stat) * one_hundred));
73     system_cpu_util.io_utilization =
74       static_cast<uint8_t>(round((system_cpu_stat.io_stat - prev_sys_stat_.io_stat) * 1.0 /
75                                  (system_cpu_stat.total_stat - prev_sys_stat_.total_stat) * one_hundred));
76     system_cpu_util.idle_utilization =
77       static_cast<uint8_t>(round((system_cpu_stat.idle_stat - prev_sys_stat_.idle_stat) * 1.0 /
78                                  (system_cpu_stat.total_stat - prev_sys_stat_.total_stat) * one_hundred));
79   }
80   // append the 0 util as well to maintain sys_cpu_util_.size == ts_.size
81   (void)sys_cpu_util_.emplace_back(system_cpu_util);
82   prev_sys_stat_ = system_cpu_stat;
83   return Status::OK();
84 }
85 
ParseCtxt(const std::string & str)86 Status SystemInfo::ParseCtxt(const std::string &str) {
87   uint64_t ctxt;
88   if (sscanf_s(str.c_str(), "%*s %lu", &ctxt) == EOF) {
89     return Status(StatusCode::kMDUnexpectedError, "Get context switch count failed.");
90   }
91   // first context switch count will be 0
92   auto val = first_sample_ ? 0 : ctxt - prev_context_switch_count_;
93   context_switch_count_.push_back(val);
94   prev_context_switch_count_ = ctxt;
95   return Status::OK();
96 }
97 
ParseRunningProcess(const std::string & str)98 Status SystemInfo::ParseRunningProcess(const std::string &str) {
99   uint32_t running_process;
100   if (sscanf_s(str.c_str(), "%*s %ud", &running_process) == EOF) {
101     return Status(StatusCode::kMDUnexpectedError, "Get context switch count failed.");
102   }
103   running_process_.push_back(running_process);
104   return Status::OK();
105 }
106 
SampleAndGetCurrPrevStat(SystemStat * current_stat,SystemStat * previous_stat)107 Status SystemInfo::SampleAndGetCurrPrevStat(SystemStat *current_stat, SystemStat *previous_stat) {
108   RETURN_UNEXPECTED_IF_NULL(previous_stat);
109   std::ifstream file("/proc/stat", std::ios::in);
110   if (!file.is_open()) {
111     MS_LOG(INFO) << "Failed to open /proc/stat file.";
112     return {StatusCode::kMDUnexpectedError, "Failed to open /proc/stat file."};
113   }
114   *previous_stat = prev_sys_stat_;
115   bool first_line = true;
116   std::string line;
117   Status s;
118   while (getline(file, line)) {
119     if (first_line) {
120       first_line = false;
121       s = ParseCpuInfo(line);
122       if (s != Status::OK()) {
123         file.close();
124         return s;
125       }
126       s = ParseCpuInfo(line);
127       if (s.IsError()) {
128         file.close();
129         return s;
130       }
131     }
132     if (line.find("ctxt") != std::string::npos) {
133       s = ParseCtxt(line);
134       if (s != Status::OK()) {
135         file.close();
136         return s;
137       }
138       s = ParseCtxt(line);
139       if (s.IsError()) {
140         file.close();
141         return s;
142       }
143     }
144     if (line.find("procs_running") != std::string::npos) {
145       s = ParseRunningProcess(line);
146       if (s != Status::OK()) {
147         file.close();
148         return s;
149       }
150       s = ParseRunningProcess(line);
151       if (s.IsError()) {
152         file.close();
153         return s;
154       }
155     }
156   }
157   // after the loop above, prev_sys_stat_ has the current value
158   *current_stat = prev_sys_stat_;
159   file.close();
160 
161   first_sample_ = false;
162   RETURN_IF_NOT_OK(SampleSystemMemInfo());
163   return Status::OK();
164 }
165 
SampleSystemMemInfo()166 Status SystemInfo::SampleSystemMemInfo() {
167   std::ifstream file("/proc/meminfo", std::ios::in);
168   if (!file.is_open()) {
169     MS_LOG(INFO) << "Unable to open /proc/meminfo. Continue processing.";
170     last_mem_sampling_failed_ = true;
171     // Note: Return Status:OK() although failed to open /proc/meminfo file
172     return Status::OK();
173   }
174   std::string line;
175   uint64_t total = 0;
176   uint64_t available = 0;
177   uint64_t used = 0;
178   uint64_t curr_val = 0;
179 
180   (void)getline(file, line);
181   if (sscanf_s(line.c_str(), "%*[MemTotal:] %lu %*[kB]", &curr_val) == 1) {
182     total = curr_val;
183     (void)getline(file, line);
184     (void)getline(file, line);
185     if (sscanf_s(line.c_str(), "%*[MemAvailable:] %lu %*[kB]", &curr_val) == 1) {
186       available = curr_val;
187       used = total - available;
188       last_mem_sampling_failed_ = false;
189     } else {
190       prev_system_memory_info_ = {0, 0, 0};
191       last_mem_sampling_failed_ = true;
192     }
193   } else {
194     prev_system_memory_info_ = {0, 0, 0};
195     last_mem_sampling_failed_ = true;
196   }
197   // Note: Must close file before returning from this function.
198   file.close();
199 
200   if (last_mem_sampling_failed_) {
201     return Status::OK();
202   }
203 
204   prev_system_memory_info_.total_mem = static_cast<float>(total) / kBInMB;
205   prev_system_memory_info_.available_mem = static_cast<float>(available) / kBInMB;
206   prev_system_memory_info_.used_mem = static_cast<float>(used) / kBInMB;
207 
208   system_memory_info_.push_back(SystemMemInfo{
209     prev_system_memory_info_.total_mem, prev_system_memory_info_.available_mem, prev_system_memory_info_.used_mem});
210 
211   return Status::OK();
212 }
213 
GetUserCpuUtil(uint64_t start_index,uint64_t end_index,std::vector<uint8_t> * result) const214 Status SystemInfo::GetUserCpuUtil(uint64_t start_index, uint64_t end_index, std::vector<uint8_t> *result) const {
215   RETURN_UNEXPECTED_IF_NULL(result);
216   MS_LOG(DEBUG) << "start_index: " << start_index << " end_index: " << end_index
217                 << " sys_cpu_util_.size: " << sys_cpu_util_.size();
218   CHECK_FAIL_RETURN_UNEXPECTED(start_index <= end_index,
219                                "Expected start_index <= end_index. Got start_index: " + std::to_string(start_index) +
220                                  " end_index: " + std::to_string(end_index));
221   CHECK_FAIL_RETURN_UNEXPECTED(
222     end_index <= sys_cpu_util_.size(),
223     "Expected end_index <= sys_cpu_util_.size(). Got end_index: " + std::to_string(end_index) +
224       " sys_cpu_util_.size: " + std::to_string(sys_cpu_util_.size()));
225   (void)std::transform(sys_cpu_util_.begin() + start_index, sys_cpu_util_.begin() + end_index,
226                        std::back_inserter(*result), [&](const SystemUtil &info) { return info.user_utilization; });
227   return Status::OK();
228 }
229 
GetSysCpuUtil(uint64_t start_index,uint64_t end_index,std::vector<uint8_t> * result) const230 Status SystemInfo::GetSysCpuUtil(uint64_t start_index, uint64_t end_index, std::vector<uint8_t> *result) const {
231   RETURN_UNEXPECTED_IF_NULL(result);
232   MS_LOG(DEBUG) << "start_index: " << start_index << " end_index: " << end_index
233                 << "sys_cpu_util_.size: " << sys_cpu_util_.size();
234   CHECK_FAIL_RETURN_UNEXPECTED(start_index <= end_index,
235                                "Expected start_index <= end_index. Got start_index: " + std::to_string(start_index) +
236                                  " end_index: " + std::to_string(end_index));
237   CHECK_FAIL_RETURN_UNEXPECTED(
238     end_index <= sys_cpu_util_.size(),
239     "Expected end_index <= sys_cpu_util_.size(). Got end_index: " + std::to_string(end_index) +
240       " sys_cpu_util_.size: " + std::to_string(sys_cpu_util_.size()));
241   (void)std::transform(sys_cpu_util_.begin() + start_index, sys_cpu_util_.begin() + end_index,
242                        std::back_inserter(*result), [&](const SystemUtil &info) { return info.sys_utilization; });
243   return Status::OK();
244 }
245 
GetIOCpuUtil() const246 std::vector<uint8_t> SystemInfo::GetIOCpuUtil() const {
247   std::vector<uint8_t> io_util;
248   (void)std::transform(sys_cpu_util_.begin(), sys_cpu_util_.end(), std::back_inserter(io_util),
249                        [&](const SystemUtil &info) { return info.io_utilization; });
250   return io_util;
251 }
252 
GetIdleCpuUtil() const253 std::vector<uint8_t> SystemInfo::GetIdleCpuUtil() const {
254   std::vector<uint8_t> idle_util;
255   (void)std::transform(sys_cpu_util_.begin(), sys_cpu_util_.end(), std::back_inserter(idle_util),
256                        [&](const SystemUtil &info) { return info.idle_utilization; });
257   return idle_util;
258 }
259 
GetSysCpuUtil() const260 std::vector<uint16_t> TaskCpuInfo::GetSysCpuUtil() const {
261   std::vector<uint16_t> sys_util;
262   (void)std::transform(task_cpu_util_.begin(), task_cpu_util_.end(), std::back_inserter(sys_util),
263                        [&](const TaskUtil &info) {
264                          return static_cast<uint16_t>(info.sys_utilization * static_cast<float>(SystemInfo::num_cpu_));
265                        });
266   return sys_util;
267 }
268 
GetUserCpuUtil() const269 std::vector<uint16_t> TaskCpuInfo::GetUserCpuUtil() const {
270   std::vector<uint16_t> user_util;
271   (void)std::transform(task_cpu_util_.begin(), task_cpu_util_.end(), std::back_inserter(user_util),
272                        [&](const TaskUtil &info) {
273                          return static_cast<uint16_t>(info.user_utilization * static_cast<float>(SystemInfo::num_cpu_));
274                        });
275   return user_util;
276 }
277 
GetLatestCpuUtil() const278 TaskUtil TaskCpuInfo::GetLatestCpuUtil() const {
279   TaskUtil ret = {0, 0};
280   if (!task_cpu_util_.empty() && !last_sampling_failed_) {
281     ret = task_cpu_util_.back();
282   }
283   return ret;
284 }
285 
SampleMemInfo()286 Status ProcessInfo::SampleMemInfo() {
287   std::ifstream file("/proc/" + std::to_string(pid_) + "/smaps", std::ios::in);
288   if (!file.is_open()) {
289     MS_LOG(INFO) << "Unable to open /proc/" << pid_ << "/smaps file. Continue processing.";
290     last_mem_sampling_failed_ = true;
291     // Note: Return Status:OK() although failed to open /proc/<pid>/smaps file
292     return Status::OK();
293   }
294   std::string line;
295   uint64_t total_vss = 0;
296   uint64_t total_rss = 0;
297   uint64_t total_pss = 0;
298   uint64_t curr_val = 0;
299   while (getline(file, line)) {
300     if (sscanf_s(line.c_str(), "%*[Size:] %lu %*[kB]", &curr_val) == 1) {
301       total_vss += curr_val;
302     } else if (sscanf_s(line.c_str(), "%*[Rss:] %lu %*[kB]", &curr_val) == 1) {
303       total_rss += curr_val;
304     } else if (sscanf_s(line.c_str(), "%*[Pss:] %lu %*[kB]", &curr_val) == 1) {
305       total_pss += curr_val;
306     }
307   }
308   file.close();
309   last_mem_sampling_failed_ = false;
310 
311   prev_memory_info_.vss = static_cast<float>(total_vss) / kBInMB;
312   prev_memory_info_.rss = static_cast<float>(total_rss) / kBInMB;
313   prev_memory_info_.pss = static_cast<float>(total_pss) / kBInMB;
314 
315   // Sum the memory usage of all child processes and add to parent process
316   if (IsParent()) {
317     for (auto child : child_processes_) {
318       MemoryInfo child_mem_info = child->GetLatestMemoryInfo();
319       prev_memory_info_.vss += child_mem_info.vss;
320       prev_memory_info_.rss += child_mem_info.rss;
321       prev_memory_info_.pss += child_mem_info.pss;
322     }
323   }
324 
325   // Append latest data to vector if we want to track history for this process
326   if (track_sampled_history_) {
327     process_memory_info_.push_back(MemoryInfo{prev_memory_info_.vss, prev_memory_info_.rss, prev_memory_info_.pss});
328   }
329 
330   return Status::OK();
331 }
332 
Sample(uint64_t total_time_elapsed)333 Status ProcessInfo::Sample(uint64_t total_time_elapsed) {
334   std::ifstream file("/proc/" + std::to_string(pid_) + "/stat", std::ios::in);
335   if (!file.is_open()) {
336     MS_LOG(INFO) << "Unable to open /proc/" << pid_ << "/stat file. Continue processing.";
337     last_sampling_failed_ = true;
338     RETURN_IF_NOT_OK(SampleMemInfo());
339     // Note: Return Status:OK() although failed to open /proc/<pid>/stat file
340     return Status::OK();
341   }
342   std::string str;
343   (void)getline(file, str);
344   uint64_t utime = 0, stime = 0;
345   if (sscanf_s(str.c_str(), "%*d %*s %*s %*lu %*lu %*lu %*lu %*lu %*lu %*lu %*lu %*lu %*lu %lu %lu", &utime, &stime) ==
346       EOF) {
347     file.close();
348     last_sampling_failed_ = true;
349     return Status(StatusCode::kMDUnexpectedError, "Get device CPU failed.");
350   }
351   file.close();
352   last_sampling_failed_ = false;
353   if (!first_sample_ && total_time_elapsed > 0) {
354     float user_util = (utime - prev_task_stat_.user_stat) * 1.0 / (total_time_elapsed)*100.0;
355     float sys_util = (stime - prev_task_stat_.sys_stat) * 1.0 / (total_time_elapsed)*100.0;
356     (void)task_cpu_util_.emplace_back(TaskUtil{user_util, sys_util});
357   }
358   prev_task_stat_.user_stat = utime;
359   prev_task_stat_.sys_stat = stime;
360   first_sample_ = false;
361   RETURN_IF_NOT_OK(SampleMemInfo());
362   return Status::OK();
363 }
364 
Sample(uint64_t total_time_elapsed)365 Status ThreadCpuInfo::Sample(uint64_t total_time_elapsed) {
366   if (last_sampling_failed_) {
367     // thread is probably terminated
368     return Status::OK();
369   }
370   std::ifstream file("/proc/" + std::to_string(pid_) + "/task/" + std::to_string(tid_) + "/stat", std::ios::in);
371   if (!file.is_open()) {
372     MS_LOG(INFO) << "Unable to open /proc/" << pid_ << "/task/" << tid_ << "/stat file. Continue processing.";
373     last_sampling_failed_ = true;
374     return Status::OK();
375   }
376   std::string str;
377   (void)getline(file, str);
378   uint64_t utime;
379   uint64_t stime;
380   if (sscanf_s(str.c_str(), "%*d %*s %*s %*lu %*lu %*lu %*lu %*lu %*lu %*lu %*lu %*lu %*lu %lu %lu", &utime, &stime) ==
381       EOF) {
382     file.close();
383     last_sampling_failed_ = true;
384     return Status(StatusCode::kMDUnexpectedError, "Get thread CPU failed.");
385   }
386   file.close();
387   last_sampling_failed_ = false;
388   if (!first_sample_) {
389     float user_util = ((utime - prev_task_stat_.user_stat) * 1.0 / total_time_elapsed) * 100.0;
390     float sys_util = ((stime - prev_task_stat_.sys_stat) * 1.0 / total_time_elapsed) * 100.0;
391     (void)task_cpu_util_.emplace_back(TaskUtil{user_util, sys_util});
392   }
393   prev_task_stat_.user_stat = utime;
394   prev_task_stat_.sys_stat = stime;
395   first_sample_ = false;
396   return Status::OK();
397 }
398 
TaskExists(pid_t id) const399 bool MDOperatorCpuInfo::TaskExists(pid_t id) const { return task_by_id_.find(id) != task_by_id_.end(); }
400 
AddTask(const std::shared_ptr<TaskCpuInfo> & task_ptr)401 void MDOperatorCpuInfo::AddTask(const std::shared_ptr<TaskCpuInfo> &task_ptr) {
402   auto id = task_ptr->GetId();
403   if (!TaskExists(id)) {
404     (void)task_by_id_.emplace(id, task_ptr);
405   }
406 }
407 
CalculateOperatorUtilization()408 void MDOperatorCpuInfo::CalculateOperatorUtilization() {
409   OpUtil op_util{0, 0};
410   for (auto const &[task_id, task_ptr] : task_by_id_) {
411     MS_LOG(DEBUG) << "Processing task_id: " << task_id;
412     auto task_util = task_ptr->GetLatestCpuUtil();
413     op_util.user_utilization += task_util.user_utilization;
414     op_util.sys_utilization += task_util.sys_utilization;
415   }
416   (void)op_cpu_util_.emplace_back(op_util);
417 }
418 
GetUserCpuUtil(uint64_t start_index,uint64_t end_index,std::vector<uint16_t> * result) const419 Status MDOperatorCpuInfo::GetUserCpuUtil(uint64_t start_index, uint64_t end_index,
420                                          std::vector<uint16_t> *result) const {
421   RETURN_UNEXPECTED_IF_NULL(result);
422   MS_LOG(DEBUG) << "start_index: " << start_index << " end_index: " << end_index
423                 << " op_cpu_util_.size: " << op_cpu_util_.size();
424   CHECK_FAIL_RETURN_UNEXPECTED(start_index <= end_index,
425                                "Expected start_index <= end_index. Got start_index: " + std::to_string(start_index) +
426                                  " end_index: " + std::to_string(end_index));
427   CHECK_FAIL_RETURN_UNEXPECTED(
428     end_index <= op_cpu_util_.size(),
429     "Expected end_index <= op_cpu_util_.size(). Got end_index: " + std::to_string(end_index) +
430       " op_cpu_util_.size: " + std::to_string(op_cpu_util_.size()));
431   auto first_iter = op_cpu_util_.begin() + start_index;
432   auto last_iter = op_cpu_util_.begin() + end_index;
433   (void)std::transform(first_iter, last_iter, std::back_inserter(*result), [&](const OpUtil &info) {
434     return static_cast<uint16_t>(info.user_utilization * static_cast<float>(SystemInfo::num_cpu_));
435   });
436   return Status::OK();
437 }
438 
GetSysCpuUtil(uint64_t start_index,uint64_t end_index,std::vector<uint16_t> * result) const439 Status MDOperatorCpuInfo::GetSysCpuUtil(uint64_t start_index, uint64_t end_index, std::vector<uint16_t> *result) const {
440   RETURN_UNEXPECTED_IF_NULL(result);
441   MS_LOG(DEBUG) << "start_index: " << start_index << " end_index: " << end_index
442                 << " op_cpu_util_.size: " << op_cpu_util_.size();
443   CHECK_FAIL_RETURN_UNEXPECTED(start_index <= end_index,
444                                "Expected start_index <= end_index. Got start_index: " + std::to_string(start_index) +
445                                  " end_index: " + std::to_string(end_index));
446   CHECK_FAIL_RETURN_UNEXPECTED(
447     end_index <= op_cpu_util_.size(),
448     "Expected end_index <= op_cpu_util_.size(). Got end_index: " + std::to_string(end_index) +
449       " op_cpu_util_.size: " + std::to_string(op_cpu_util_.size()));
450   auto first_iter = op_cpu_util_.begin() + start_index;
451   auto last_iter = op_cpu_util_.begin() + end_index;
452   (void)std::transform(first_iter, last_iter, std::back_inserter(*result), [&](const OpUtil &info) {
453     return static_cast<uint16_t>(info.sys_utilization * static_cast<float>(SystemInfo::num_cpu_));
454   });
455   return Status::OK();
456 }
457 
Sample()458 Status CpuSampler::Sample() {
459   if (active_ == false) {
460     return Status::OK();
461   }
462   std::lock_guard<std::mutex> guard(lock_);
463   // Function to Update TaskList
464   // Loop through all tasks to find any new threads
465   // Get all multi-processing Ops from Python only if fetched_all_process = False
466   // Create new TaskCpuInfo as required and update OpInfo
467   RETURN_IF_NOT_OK(UpdateTaskList());
468 
469   // Sample SystemInfo - Update current and move current to previous stat and calc Util
470   SystemStat current_sys_stat;
471   SystemStat previous_sys_stat;
472   RETURN_IF_NOT_OK(sys_info_.SampleAndGetCurrPrevStat(&current_sys_stat, &previous_sys_stat));
473   auto total_time_elapsed = current_sys_stat.total_stat - previous_sys_stat.total_stat;
474 
475   // Call Sample on all
476   // Read /proc/ files and get stat, calculate util
477   for (auto &task_ptr : tasks_) {
478     (void)task_ptr->Sample(total_time_elapsed);
479   }
480 
481   // Call after Sample is called on all child processes
482   (void)main_process_info_->Sample(total_time_elapsed);
483 
484   // Calculate OperatorCpuInfo
485   for (auto &[op_id, op_info] : op_info_by_id_) {
486     MS_LOG(DEBUG) << "Calculate operator cpu utilization for OpId: " << op_id;
487     op_info.CalculateOperatorUtilization();
488   }
489 
490   // Get sampling time.
491   (void)ts_.emplace_back(ProfilingTime::GetCurMilliSecond());
492 
493   return Status::OK();
494 }
495 
UpdateTaskList()496 Status CpuSampler::UpdateTaskList() {
497   List<Task> allTasks = tree->AllTasks()->GetTask();
498   for (auto &task : allTasks) {
499     int32_t op_id = task.get_operator_id();
500     // check if the op_info was initialized in Init
501     auto iter = op_info_by_id_.find(op_id);
502     if (iter != op_info_by_id_.end()) {
503       int32_t tid = task.get_linux_id();
504       if (!iter->second.TaskExists(tid)) {
505         auto task_cpu_info_ptr = std::make_shared<ThreadCpuInfo>(main_pid_, tid);
506         (void)tasks_.emplace_back(task_cpu_info_ptr);
507         iter->second.AddTask(task_cpu_info_ptr);
508       }
509     }
510   }
511   for (const auto &op : *tree) {
512     std::vector<int32_t> pids = op.GetMPWorkerPIDs();
513     int32_t op_id = op.id();
514     auto iter = op_info_by_id_.find(op_id);
515     if (iter != op_info_by_id_.end()) {
516       for (auto pid : pids) {
517         if (!iter->second.TaskExists(pid)) {
518           auto task_cpu_info_ptr = std::make_shared<ProcessInfo>(pid);
519           (void)tasks_.emplace_back(task_cpu_info_ptr);
520           main_process_info_->AddChildProcess(task_cpu_info_ptr);
521           iter->second.AddTask(task_cpu_info_ptr);
522         }
523       }
524     }
525   }
526 
527   if (!fetched_all_python_multiprocesses_ && tree->IsPython()) {
528     py::gil_scoped_acquire gil_acquire;
529     py::module ds = py::module::import("mindspore.dataset.engine.datasets");
530     py::tuple process_info = ds.attr("_get_operator_process")();
531     auto sub_process = py::reinterpret_borrow<py::dict>(process_info[0]);
532     fetched_all_python_multiprocesses_ = py::reinterpret_borrow<py::bool_>(process_info[1]);
533     // parse dict value
534     auto op_to_process = toIntMap(sub_process);
535     for (auto const &[op_id, process_list] : op_to_process) {
536       for (auto pid : process_list) {
537         auto iter = op_info_by_id_.find(op_id);
538         if (iter != op_info_by_id_.end()) {
539           if (!iter->second.TaskExists(pid)) {
540             auto task_cpu_info_ptr = std::make_shared<ProcessInfo>(pid);
541             (void)tasks_.emplace_back(task_cpu_info_ptr);
542             main_process_info_->AddChildProcess(task_cpu_info_ptr);
543             iter->second.AddTask(task_cpu_info_ptr);
544           }
545         }
546       }
547     }
548   }
549 
550   return Status::OK();
551 }
552 
Init()553 Status CpuSampler::Init() {
554 #if defined(USING_LINUX)
555   main_pid_ = syscall(SYS_getpid);
556 #endif
557   for (auto iter = tree->begin(); iter != tree->end(); (void)iter++) {
558     auto op_id = iter->id();
559     (void)op_info_by_id_.emplace(std::make_pair(op_id, MDOperatorCpuInfo(op_id)));
560   }
561   // thread id of main thread is same as the process ID
562   main_thread_cpu_info_ = std::make_shared<ThreadCpuInfo>(main_pid_, main_pid_);
563   (void)tasks_.emplace_back(main_thread_cpu_info_);
564   main_process_info_ = std::make_shared<ProcessInfo>(main_pid_, true);
565   return Status::OK();
566 }
567 
Clear()568 void CpuSampler::Clear() {
569   ts_.clear();
570   tasks_.clear();
571   main_thread_cpu_info_.reset();
572   main_process_info_.reset();
573   op_info_by_id_.clear();
574   fetched_all_python_multiprocesses_ = false;
575 }
576 
ChangeFileMode(const std::string & dir_path,const std::string & rank_id)577 Status CpuSampler::ChangeFileMode(const std::string &dir_path, const std::string &rank_id) {
578   Path path = GetFileName(dir_path, rank_id);
579   std::string file_path = path.ToString();
580   mindspore::ChangeFileMode(file_path, S_IRUSR | S_IWUSR);
581   return Status::OK();
582 }
583 
SaveToFile(const std::string & dir_path,const std::string & rank_id)584 Status CpuSampler::SaveToFile(const std::string &dir_path, const std::string &rank_id) {
585   Path path = GetFileName(dir_path, rank_id);
586   // Remove the file if it exists (from prior profiling usage)
587   RETURN_IF_NOT_OK(path.Remove());
588   std::string file_path = path.ToString();
589 
590   // construct json obj to write to file
591   json output;
592   output["cpu_processor_num"] = SystemInfo::num_cpu_;
593   std::vector<uint8_t> system_user_util, system_sys_util;
594   // end_index = ts_.size() essentially means to get all sampled points
595   (void)sys_info_.GetUserCpuUtil(0, ts_.size(), &system_user_util);
596   (void)sys_info_.GetSysCpuUtil(0, ts_.size(), &system_sys_util);
597   output["device_info"] = {{"context_switch_count", sys_info_.GetContextSwitchCount()},
598                            {"idle_utilization", sys_info_.GetIdleCpuUtil()},
599                            {"io_utilization", sys_info_.GetIOCpuUtil()},
600                            {"sys_utilization", system_sys_util},
601                            {"user_utilization", system_user_util},
602                            {"runnable_process", sys_info_.GetRunningProcess()}};
603   // array of op_info json objects
604   json op_infos;
605   for (auto &[op_id, op_info] : op_info_by_id_) {
606     MS_LOG(INFO) << "Processing op_id: " << op_id;
607     std::vector<uint16_t> user_util, sys_util;
608     (void)op_info.GetSysCpuUtil(0, ts_.size(), &sys_util);
609     (void)op_info.GetUserCpuUtil(0, ts_.size(), &user_util);
610     json op_info_json = {{"metrics", {{"user_utilization", user_util}, {"sys_utilization", sys_util}}},
611                          {"op_id", op_id}};
612     (void)op_infos.emplace_back(op_info_json);
613   }
614   output["op_info"] = op_infos;
615 
616   output["process_info"] = {{"user_utilization", main_process_info_->GetUserCpuUtil()},
617                             {"sys_utilization", main_process_info_->GetSysCpuUtil()}};
618 
619   output["sampling_interval"] = GlobalContext::config_manager()->monitor_sampling_interval();
620   output["time_stamp"] = ts_;
621 
622   std::vector<float> vss, rss, pss;
623   (void)main_process_info_->GetMemoryInfo(ProcessMemoryMetric::kVSS, 0, ts_.size(), &vss);
624   (void)main_process_info_->GetMemoryInfo(ProcessMemoryMetric::kRSS, 0, ts_.size(), &rss);
625   (void)main_process_info_->GetMemoryInfo(ProcessMemoryMetric::kPSS, 0, ts_.size(), &pss);
626   output["process_memory_info"] = {{"vss_mbytes", vss}, {"rss_mbytes", rss}, {"pss_mbytes", pss}};
627 
628   std::vector<float> mem_total, mem_avail, mem_used;
629   (void)sys_info_.GetSystemMemInfo(SystemMemoryMetric::kMemoryTotal, 0, ts_.size(), &mem_total);
630   (void)sys_info_.GetSystemMemInfo(SystemMemoryMetric::kMemoryAvailable, 0, ts_.size(), &mem_avail);
631   (void)sys_info_.GetSystemMemInfo(SystemMemoryMetric::kMemoryUsed, 0, ts_.size(), &mem_used);
632   output["system_memory_info"] = {{"total_sys_memory_mbytes", mem_total},
633                                   {"available_sys_memory_mbytes", mem_avail},
634                                   {"used_sys_memory_mbytes", mem_used}};
635 
636   // Discard the content of the file when opening.
637   std::ofstream os(file_path, std::ios::out | std::ios::trunc);
638   os << output;
639   os.close();
640 
641   mindspore::ChangeFileMode(file_path, S_IRUSR | S_IWUSR);
642 
643   return Status::OK();
644 }
645 
GetOpUserCpuUtil(int32_t op_id,uint64_t start_ts,uint64_t end_ts,std::vector<uint16_t> * result)646 Status CpuSampler::GetOpUserCpuUtil(int32_t op_id, uint64_t start_ts, uint64_t end_ts, std::vector<uint16_t> *result) {
647   std::lock_guard<std::mutex> guard(lock_);
648   // find first ts that is not less than start_ts
649   auto lower = std::lower_bound(ts_.begin(), ts_.end(), start_ts);
650   // find first ts that is greater than end_ts
651   auto upper = std::upper_bound(ts_.begin(), ts_.end(), end_ts);
652   // std::distance is O(1) since vector allows random access
653   auto start_index = std::distance(ts_.begin(), lower);
654   auto end_index = std::distance(ts_.begin(), upper);
655   auto op_info = op_info_by_id_.find(op_id);
656   CHECK_FAIL_RETURN_UNEXPECTED(op_info != op_info_by_id_.end(), "Op Id: " + std::to_string(op_id) + " not found.");
657   return op_info->second.GetUserCpuUtil(start_index, end_index, result);
658 }
659 
GetOpSysCpuUtil(int32_t op_id,uint64_t start_ts,uint64_t end_ts,std::vector<uint16_t> * result)660 Status CpuSampler::GetOpSysCpuUtil(int32_t op_id, uint64_t start_ts, uint64_t end_ts, std::vector<uint16_t> *result) {
661   std::lock_guard<std::mutex> guard(lock_);
662   // find first ts that is not less than start_ts
663   auto lower = std::lower_bound(ts_.begin(), ts_.end(), start_ts);
664   // find first ts that is greater than end_ts
665   auto upper = std::upper_bound(ts_.begin(), ts_.end(), end_ts);
666   // std::distance is O(1) since vector allows random access
667   auto start_index = std::distance(ts_.begin(), lower);
668   auto end_index = std::distance(ts_.begin(), upper);
669   auto op_info = op_info_by_id_.find(op_id);
670   CHECK_FAIL_RETURN_UNEXPECTED(op_info != op_info_by_id_.end(), "Op Id: " + std::to_string(op_id) + " not found.");
671   return op_info->second.GetSysCpuUtil(start_index, end_index, result);
672 }
673 
GetSystemUserCpuUtil(uint64_t start_ts,uint64_t end_ts,std::vector<uint8_t> * result)674 Status CpuSampler::GetSystemUserCpuUtil(uint64_t start_ts, uint64_t end_ts, std::vector<uint8_t> *result) {
675   std::lock_guard<std::mutex> guard(lock_);
676   // find first ts that is not less than start_ts
677   auto lower = std::lower_bound(ts_.begin(), ts_.end(), start_ts);
678   // find first ts that is greater than end_ts
679   auto upper = std::upper_bound(ts_.begin(), ts_.end(), end_ts);
680   // std::distance is O(1) since vector allows random access
681   auto start_index = std::distance(ts_.begin(), lower);
682   auto end_index = std::distance(ts_.begin(), upper);
683   return sys_info_.GetUserCpuUtil(start_index, end_index, result);
684 }
685 
GetSystemSysCpuUtil(uint64_t start_ts,uint64_t end_ts,std::vector<uint8_t> * result)686 Status CpuSampler::GetSystemSysCpuUtil(uint64_t start_ts, uint64_t end_ts, std::vector<uint8_t> *result) {
687   std::lock_guard<std::mutex> guard(lock_);
688   // find first ts that is not less than start_ts
689   auto lower = std::lower_bound(ts_.begin(), ts_.end(), start_ts);
690   // find first ts that is greater than end_ts
691   auto upper = std::upper_bound(ts_.begin(), ts_.end(), end_ts);
692   // std::distance is O(1) since vector allows random access
693   auto start_index = std::distance(ts_.begin(), lower);
694   auto end_index = std::distance(ts_.begin(), upper);
695   return sys_info_.GetSysCpuUtil(start_index, end_index, result);
696 }
697 
GetFileName(const std::string & dir_path,const std::string & rank_id)698 Path CpuSampler::GetFileName(const std::string &dir_path, const std::string &rank_id) {
699   return Path(dir_path) / Path("minddata_cpu_utilization_" + rank_id + ".json");
700 }
701 
GetLatestMemoryInfo() const702 MemoryInfo ProcessInfo::GetLatestMemoryInfo() const {
703   MemoryInfo ret = {0, 0, 0};
704   if (!last_mem_sampling_failed_) {
705     ret = prev_memory_info_;
706   }
707   return ret;
708 }
709 
GetMemoryInfo(ProcessMemoryMetric metric,uint64_t start_index,uint64_t end_index,std::vector<float> * result) const710 Status ProcessInfo::GetMemoryInfo(ProcessMemoryMetric metric, uint64_t start_index, uint64_t end_index,
711                                   std::vector<float> *result) const {
712   RETURN_UNEXPECTED_IF_NULL(result);
713   MS_LOG(DEBUG) << "start_index: " << start_index << " end_index: " << end_index
714                 << "process_memory_info_.size: " << process_memory_info_.size();
715   CHECK_FAIL_RETURN_UNEXPECTED(start_index <= end_index,
716                                "Expected start_index <= end_index. Got start_index: " + std::to_string(start_index) +
717                                  " end_index: " + std::to_string(end_index));
718   CHECK_FAIL_RETURN_UNEXPECTED(
719     end_index <= process_memory_info_.size(),
720     "Expected end_index <= process_memory_info_.size(). Got end_index: " + std::to_string(end_index) +
721       " process_memory_info_.size: " + std::to_string(process_memory_info_.size()));
722   if (metric == ProcessMemoryMetric::kVSS) {
723     (void)std::transform(process_memory_info_.begin() + start_index, process_memory_info_.begin() + end_index,
724                          std::back_inserter(*result),
725                          [&](const MemoryInfo &info) { return static_cast<float>(info.vss); });
726   } else if (metric == ProcessMemoryMetric::kRSS) {
727     (void)std::transform(process_memory_info_.begin() + start_index, process_memory_info_.begin() + end_index,
728                          std::back_inserter(*result),
729                          [&](const MemoryInfo &info) { return static_cast<float>(info.rss); });
730   } else if (metric == ProcessMemoryMetric::kPSS) {
731     (void)std::transform(process_memory_info_.begin() + start_index, process_memory_info_.begin() + end_index,
732                          std::back_inserter(*result),
733                          [&](const MemoryInfo &info) { return static_cast<float>(info.pss); });
734   }
735   return Status::OK();
736 }
737 
GetProcessMemoryInfo(ProcessMemoryMetric metric,uint64_t start_index,uint64_t end_index,std::vector<float> * result)738 Status CpuSampler::GetProcessMemoryInfo(ProcessMemoryMetric metric, uint64_t start_index, uint64_t end_index,
739                                         std::vector<float> *result) {
740   return (main_process_info_->GetMemoryInfo(metric, start_index, end_index, result));
741 }
742 
AddChildProcess(const std::shared_ptr<ProcessInfo> & child_ptr)743 void ProcessInfo::AddChildProcess(const std::shared_ptr<ProcessInfo> &child_ptr) {
744   (void)child_processes_.emplace_back(child_ptr);
745 }
746 
IsParent()747 bool ProcessInfo::IsParent() { return !(child_processes_.empty()); }
748 
GetSystemMemInfo(SystemMemoryMetric metric,uint64_t start_index,uint64_t end_index,std::vector<float> * result) const749 Status SystemInfo::GetSystemMemInfo(SystemMemoryMetric metric, uint64_t start_index, uint64_t end_index,
750                                     std::vector<float> *result) const {
751   RETURN_UNEXPECTED_IF_NULL(result);
752   MS_LOG(DEBUG) << "start_index: " << start_index << " end_index: " << end_index
753                 << "system_memory_info_.size: " << system_memory_info_.size();
754   CHECK_FAIL_RETURN_UNEXPECTED(start_index <= end_index,
755                                "Expected start_index <= end_index. Got start_index: " + std::to_string(start_index) +
756                                  " end_index: " + std::to_string(end_index));
757   CHECK_FAIL_RETURN_UNEXPECTED(
758     end_index <= system_memory_info_.size(),
759     "Expected end_index <= system_memory_info_.size(). Got end_index: " + std::to_string(end_index) +
760       " system_memory_info_.size: " + std::to_string(system_memory_info_.size()));
761   if (metric == SystemMemoryMetric::kMemoryTotal) {
762     (void)std::transform(system_memory_info_.begin() + start_index, system_memory_info_.begin() + end_index,
763                          std::back_inserter(*result),
764                          [&](const SystemMemInfo &info) { return static_cast<float>(info.total_mem); });
765   } else if (metric == SystemMemoryMetric::kMemoryAvailable) {
766     (void)std::transform(system_memory_info_.begin() + start_index, system_memory_info_.begin() + end_index,
767                          std::back_inserter(*result),
768                          [&](const SystemMemInfo &info) { return static_cast<float>(info.available_mem); });
769   } else if (metric == SystemMemoryMetric::kMemoryUsed) {
770     (void)std::transform(system_memory_info_.begin() + start_index, system_memory_info_.begin() + end_index,
771                          std::back_inserter(*result),
772                          [&](const SystemMemInfo &info) { return static_cast<float>(info.used_mem); });
773   }
774   return Status::OK();
775 }
776 
GetSystemMemoryInfo(SystemMemoryMetric metric,uint64_t start_index,uint64_t end_index,std::vector<float> * result)777 Status CpuSampler::GetSystemMemoryInfo(SystemMemoryMetric metric, uint64_t start_index, uint64_t end_index,
778                                        std::vector<float> *result) {
779   return (sys_info_.GetSystemMemInfo(metric, start_index, end_index, result));
780 }
781 }  // namespace dataset
782 }  // namespace mindspore
783