1 /**
2 * Copyright 2021-2022 Huawei Technologies Co., Ltd
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 #include "minddata/dataset/engine/perf/cpu_sampler.h"
17
18 #if !defined(_WIN32) && !defined(_WIN64) && !defined(__ANDROID__) && !defined(ANDROID) && !defined(__APPLE__)
19 #include <sys/syscall.h>
20 #endif
21 #include <algorithm>
22 #include <cmath>
23 #include <cstdio>
24 #include <memory>
25 #include <fstream>
26 #include <string>
27 #include <utility>
28
29 #include "minddata/dataset/api/python/pybind_conversion.h"
30 #include "minddata/dataset/core/config_manager.h"
31 #include "minddata/dataset/engine/execution_tree.h"
32 #include "minddata/dataset/util/path.h"
33 #include "utils/file_utils.h"
34
35 namespace mindspore {
36 namespace dataset {
37 using json = nlohmann::json;
38 #if !defined(_WIN32) && !defined(_WIN64) && !defined(__ANDROID__) && !defined(ANDROID) && !defined(__APPLE__)
39 #define USING_LINUX
40 #endif
41
42 #if defined(USING_LINUX)
43 int32_t SystemInfo::num_cpu_ = get_nprocs_conf();
44 #else
45 int32_t SystemInfo::num_cpu_ = 0;
46 #endif
47
48 constexpr uint64_t kBInMB = 1024; // Constant for kByte to MByte division conversion
49
ParseCpuInfo(const std::string & str)50 Status SystemInfo::ParseCpuInfo(const std::string &str) {
51 SystemStat system_cpu_stat;
52 uint64_t nice = 0;
53 uint64_t irq = 0;
54 uint64_t softirq = 0;
55 if (sscanf_s(str.c_str(), "%*s %lu %lu %lu %lu %lu %lu %lu", &system_cpu_stat.user_stat, &nice,
56 &system_cpu_stat.sys_stat, &system_cpu_stat.idle_stat, &system_cpu_stat.io_stat, &irq,
57 &softirq) == EOF) {
58 return Status(StatusCode::kMDUnexpectedError, "Get System CPU failed.");
59 }
60
61 system_cpu_stat.total_stat = system_cpu_stat.user_stat + nice + system_cpu_stat.sys_stat + system_cpu_stat.idle_stat +
62 system_cpu_stat.io_stat + irq + softirq;
63 SystemUtil system_cpu_util = {0, 0, 0, 0};
64 // Calculate the utilization from the second sampling
65 if (!first_sample_) {
66 int one_hundred = 100;
67 system_cpu_util.user_utilization =
68 static_cast<uint8_t>(round((system_cpu_stat.user_stat - prev_sys_stat_.user_stat) * 1.0 /
69 (system_cpu_stat.total_stat - prev_sys_stat_.total_stat) * one_hundred));
70 system_cpu_util.sys_utilization =
71 static_cast<uint8_t>(round((system_cpu_stat.sys_stat - prev_sys_stat_.sys_stat) * 1.0 /
72 (system_cpu_stat.total_stat - prev_sys_stat_.total_stat) * one_hundred));
73 system_cpu_util.io_utilization =
74 static_cast<uint8_t>(round((system_cpu_stat.io_stat - prev_sys_stat_.io_stat) * 1.0 /
75 (system_cpu_stat.total_stat - prev_sys_stat_.total_stat) * one_hundred));
76 system_cpu_util.idle_utilization =
77 static_cast<uint8_t>(round((system_cpu_stat.idle_stat - prev_sys_stat_.idle_stat) * 1.0 /
78 (system_cpu_stat.total_stat - prev_sys_stat_.total_stat) * one_hundred));
79 }
80 // append the 0 util as well to maintain sys_cpu_util_.size == ts_.size
81 (void)sys_cpu_util_.emplace_back(system_cpu_util);
82 prev_sys_stat_ = system_cpu_stat;
83 return Status::OK();
84 }
85
ParseCtxt(const std::string & str)86 Status SystemInfo::ParseCtxt(const std::string &str) {
87 uint64_t ctxt;
88 if (sscanf_s(str.c_str(), "%*s %lu", &ctxt) == EOF) {
89 return Status(StatusCode::kMDUnexpectedError, "Get context switch count failed.");
90 }
91 // first context switch count will be 0
92 auto val = first_sample_ ? 0 : ctxt - prev_context_switch_count_;
93 context_switch_count_.push_back(val);
94 prev_context_switch_count_ = ctxt;
95 return Status::OK();
96 }
97
ParseRunningProcess(const std::string & str)98 Status SystemInfo::ParseRunningProcess(const std::string &str) {
99 uint32_t running_process;
100 if (sscanf_s(str.c_str(), "%*s %ud", &running_process) == EOF) {
101 return Status(StatusCode::kMDUnexpectedError, "Get context switch count failed.");
102 }
103 running_process_.push_back(running_process);
104 return Status::OK();
105 }
106
SampleAndGetCurrPrevStat(SystemStat * current_stat,SystemStat * previous_stat)107 Status SystemInfo::SampleAndGetCurrPrevStat(SystemStat *current_stat, SystemStat *previous_stat) {
108 RETURN_UNEXPECTED_IF_NULL(previous_stat);
109 std::ifstream file("/proc/stat", std::ios::in);
110 if (!file.is_open()) {
111 MS_LOG(INFO) << "Failed to open /proc/stat file.";
112 return {StatusCode::kMDUnexpectedError, "Failed to open /proc/stat file."};
113 }
114 *previous_stat = prev_sys_stat_;
115 bool first_line = true;
116 std::string line;
117 Status s;
118 while (getline(file, line)) {
119 if (first_line) {
120 first_line = false;
121 s = ParseCpuInfo(line);
122 if (s != Status::OK()) {
123 file.close();
124 return s;
125 }
126 s = ParseCpuInfo(line);
127 if (s.IsError()) {
128 file.close();
129 return s;
130 }
131 }
132 if (line.find("ctxt") != std::string::npos) {
133 s = ParseCtxt(line);
134 if (s != Status::OK()) {
135 file.close();
136 return s;
137 }
138 s = ParseCtxt(line);
139 if (s.IsError()) {
140 file.close();
141 return s;
142 }
143 }
144 if (line.find("procs_running") != std::string::npos) {
145 s = ParseRunningProcess(line);
146 if (s != Status::OK()) {
147 file.close();
148 return s;
149 }
150 s = ParseRunningProcess(line);
151 if (s.IsError()) {
152 file.close();
153 return s;
154 }
155 }
156 }
157 // after the loop above, prev_sys_stat_ has the current value
158 *current_stat = prev_sys_stat_;
159 file.close();
160
161 first_sample_ = false;
162 RETURN_IF_NOT_OK(SampleSystemMemInfo());
163 return Status::OK();
164 }
165
SampleSystemMemInfo()166 Status SystemInfo::SampleSystemMemInfo() {
167 std::ifstream file("/proc/meminfo", std::ios::in);
168 if (!file.is_open()) {
169 MS_LOG(INFO) << "Unable to open /proc/meminfo. Continue processing.";
170 last_mem_sampling_failed_ = true;
171 // Note: Return Status:OK() although failed to open /proc/meminfo file
172 return Status::OK();
173 }
174 std::string line;
175 uint64_t total = 0;
176 uint64_t available = 0;
177 uint64_t used = 0;
178 uint64_t curr_val = 0;
179
180 (void)getline(file, line);
181 if (sscanf_s(line.c_str(), "%*[MemTotal:] %lu %*[kB]", &curr_val) == 1) {
182 total = curr_val;
183 (void)getline(file, line);
184 (void)getline(file, line);
185 if (sscanf_s(line.c_str(), "%*[MemAvailable:] %lu %*[kB]", &curr_val) == 1) {
186 available = curr_val;
187 used = total - available;
188 last_mem_sampling_failed_ = false;
189 } else {
190 prev_system_memory_info_ = {0, 0, 0};
191 last_mem_sampling_failed_ = true;
192 }
193 } else {
194 prev_system_memory_info_ = {0, 0, 0};
195 last_mem_sampling_failed_ = true;
196 }
197 // Note: Must close file before returning from this function.
198 file.close();
199
200 if (last_mem_sampling_failed_) {
201 return Status::OK();
202 }
203
204 prev_system_memory_info_.total_mem = static_cast<float>(total) / kBInMB;
205 prev_system_memory_info_.available_mem = static_cast<float>(available) / kBInMB;
206 prev_system_memory_info_.used_mem = static_cast<float>(used) / kBInMB;
207
208 system_memory_info_.push_back(SystemMemInfo{
209 prev_system_memory_info_.total_mem, prev_system_memory_info_.available_mem, prev_system_memory_info_.used_mem});
210
211 return Status::OK();
212 }
213
GetUserCpuUtil(uint64_t start_index,uint64_t end_index,std::vector<uint8_t> * result) const214 Status SystemInfo::GetUserCpuUtil(uint64_t start_index, uint64_t end_index, std::vector<uint8_t> *result) const {
215 RETURN_UNEXPECTED_IF_NULL(result);
216 MS_LOG(DEBUG) << "start_index: " << start_index << " end_index: " << end_index
217 << " sys_cpu_util_.size: " << sys_cpu_util_.size();
218 CHECK_FAIL_RETURN_UNEXPECTED(start_index <= end_index,
219 "Expected start_index <= end_index. Got start_index: " + std::to_string(start_index) +
220 " end_index: " + std::to_string(end_index));
221 CHECK_FAIL_RETURN_UNEXPECTED(
222 end_index <= sys_cpu_util_.size(),
223 "Expected end_index <= sys_cpu_util_.size(). Got end_index: " + std::to_string(end_index) +
224 " sys_cpu_util_.size: " + std::to_string(sys_cpu_util_.size()));
225 (void)std::transform(sys_cpu_util_.begin() + start_index, sys_cpu_util_.begin() + end_index,
226 std::back_inserter(*result), [&](const SystemUtil &info) { return info.user_utilization; });
227 return Status::OK();
228 }
229
GetSysCpuUtil(uint64_t start_index,uint64_t end_index,std::vector<uint8_t> * result) const230 Status SystemInfo::GetSysCpuUtil(uint64_t start_index, uint64_t end_index, std::vector<uint8_t> *result) const {
231 RETURN_UNEXPECTED_IF_NULL(result);
232 MS_LOG(DEBUG) << "start_index: " << start_index << " end_index: " << end_index
233 << "sys_cpu_util_.size: " << sys_cpu_util_.size();
234 CHECK_FAIL_RETURN_UNEXPECTED(start_index <= end_index,
235 "Expected start_index <= end_index. Got start_index: " + std::to_string(start_index) +
236 " end_index: " + std::to_string(end_index));
237 CHECK_FAIL_RETURN_UNEXPECTED(
238 end_index <= sys_cpu_util_.size(),
239 "Expected end_index <= sys_cpu_util_.size(). Got end_index: " + std::to_string(end_index) +
240 " sys_cpu_util_.size: " + std::to_string(sys_cpu_util_.size()));
241 (void)std::transform(sys_cpu_util_.begin() + start_index, sys_cpu_util_.begin() + end_index,
242 std::back_inserter(*result), [&](const SystemUtil &info) { return info.sys_utilization; });
243 return Status::OK();
244 }
245
GetIOCpuUtil() const246 std::vector<uint8_t> SystemInfo::GetIOCpuUtil() const {
247 std::vector<uint8_t> io_util;
248 (void)std::transform(sys_cpu_util_.begin(), sys_cpu_util_.end(), std::back_inserter(io_util),
249 [&](const SystemUtil &info) { return info.io_utilization; });
250 return io_util;
251 }
252
GetIdleCpuUtil() const253 std::vector<uint8_t> SystemInfo::GetIdleCpuUtil() const {
254 std::vector<uint8_t> idle_util;
255 (void)std::transform(sys_cpu_util_.begin(), sys_cpu_util_.end(), std::back_inserter(idle_util),
256 [&](const SystemUtil &info) { return info.idle_utilization; });
257 return idle_util;
258 }
259
GetSysCpuUtil() const260 std::vector<uint16_t> TaskCpuInfo::GetSysCpuUtil() const {
261 std::vector<uint16_t> sys_util;
262 (void)std::transform(task_cpu_util_.begin(), task_cpu_util_.end(), std::back_inserter(sys_util),
263 [&](const TaskUtil &info) {
264 return static_cast<uint16_t>(info.sys_utilization * static_cast<float>(SystemInfo::num_cpu_));
265 });
266 return sys_util;
267 }
268
GetUserCpuUtil() const269 std::vector<uint16_t> TaskCpuInfo::GetUserCpuUtil() const {
270 std::vector<uint16_t> user_util;
271 (void)std::transform(task_cpu_util_.begin(), task_cpu_util_.end(), std::back_inserter(user_util),
272 [&](const TaskUtil &info) {
273 return static_cast<uint16_t>(info.user_utilization * static_cast<float>(SystemInfo::num_cpu_));
274 });
275 return user_util;
276 }
277
GetLatestCpuUtil() const278 TaskUtil TaskCpuInfo::GetLatestCpuUtil() const {
279 TaskUtil ret = {0, 0};
280 if (!task_cpu_util_.empty() && !last_sampling_failed_) {
281 ret = task_cpu_util_.back();
282 }
283 return ret;
284 }
285
SampleMemInfo()286 Status ProcessInfo::SampleMemInfo() {
287 std::ifstream file("/proc/" + std::to_string(pid_) + "/smaps", std::ios::in);
288 if (!file.is_open()) {
289 MS_LOG(INFO) << "Unable to open /proc/" << pid_ << "/smaps file. Continue processing.";
290 last_mem_sampling_failed_ = true;
291 // Note: Return Status:OK() although failed to open /proc/<pid>/smaps file
292 return Status::OK();
293 }
294 std::string line;
295 uint64_t total_vss = 0;
296 uint64_t total_rss = 0;
297 uint64_t total_pss = 0;
298 uint64_t curr_val = 0;
299 while (getline(file, line)) {
300 if (sscanf_s(line.c_str(), "%*[Size:] %lu %*[kB]", &curr_val) == 1) {
301 total_vss += curr_val;
302 } else if (sscanf_s(line.c_str(), "%*[Rss:] %lu %*[kB]", &curr_val) == 1) {
303 total_rss += curr_val;
304 } else if (sscanf_s(line.c_str(), "%*[Pss:] %lu %*[kB]", &curr_val) == 1) {
305 total_pss += curr_val;
306 }
307 }
308 file.close();
309 last_mem_sampling_failed_ = false;
310
311 prev_memory_info_.vss = static_cast<float>(total_vss) / kBInMB;
312 prev_memory_info_.rss = static_cast<float>(total_rss) / kBInMB;
313 prev_memory_info_.pss = static_cast<float>(total_pss) / kBInMB;
314
315 // Sum the memory usage of all child processes and add to parent process
316 if (IsParent()) {
317 for (auto child : child_processes_) {
318 MemoryInfo child_mem_info = child->GetLatestMemoryInfo();
319 prev_memory_info_.vss += child_mem_info.vss;
320 prev_memory_info_.rss += child_mem_info.rss;
321 prev_memory_info_.pss += child_mem_info.pss;
322 }
323 }
324
325 // Append latest data to vector if we want to track history for this process
326 if (track_sampled_history_) {
327 process_memory_info_.push_back(MemoryInfo{prev_memory_info_.vss, prev_memory_info_.rss, prev_memory_info_.pss});
328 }
329
330 return Status::OK();
331 }
332
Sample(uint64_t total_time_elapsed)333 Status ProcessInfo::Sample(uint64_t total_time_elapsed) {
334 std::ifstream file("/proc/" + std::to_string(pid_) + "/stat", std::ios::in);
335 if (!file.is_open()) {
336 MS_LOG(INFO) << "Unable to open /proc/" << pid_ << "/stat file. Continue processing.";
337 last_sampling_failed_ = true;
338 RETURN_IF_NOT_OK(SampleMemInfo());
339 // Note: Return Status:OK() although failed to open /proc/<pid>/stat file
340 return Status::OK();
341 }
342 std::string str;
343 (void)getline(file, str);
344 uint64_t utime = 0, stime = 0;
345 if (sscanf_s(str.c_str(), "%*d %*s %*s %*lu %*lu %*lu %*lu %*lu %*lu %*lu %*lu %*lu %*lu %lu %lu", &utime, &stime) ==
346 EOF) {
347 file.close();
348 last_sampling_failed_ = true;
349 return Status(StatusCode::kMDUnexpectedError, "Get device CPU failed.");
350 }
351 file.close();
352 last_sampling_failed_ = false;
353 if (!first_sample_ && total_time_elapsed > 0) {
354 float user_util = (utime - prev_task_stat_.user_stat) * 1.0 / (total_time_elapsed)*100.0;
355 float sys_util = (stime - prev_task_stat_.sys_stat) * 1.0 / (total_time_elapsed)*100.0;
356 (void)task_cpu_util_.emplace_back(TaskUtil{user_util, sys_util});
357 }
358 prev_task_stat_.user_stat = utime;
359 prev_task_stat_.sys_stat = stime;
360 first_sample_ = false;
361 RETURN_IF_NOT_OK(SampleMemInfo());
362 return Status::OK();
363 }
364
Sample(uint64_t total_time_elapsed)365 Status ThreadCpuInfo::Sample(uint64_t total_time_elapsed) {
366 if (last_sampling_failed_) {
367 // thread is probably terminated
368 return Status::OK();
369 }
370 std::ifstream file("/proc/" + std::to_string(pid_) + "/task/" + std::to_string(tid_) + "/stat", std::ios::in);
371 if (!file.is_open()) {
372 MS_LOG(INFO) << "Unable to open /proc/" << pid_ << "/task/" << tid_ << "/stat file. Continue processing.";
373 last_sampling_failed_ = true;
374 return Status::OK();
375 }
376 std::string str;
377 (void)getline(file, str);
378 uint64_t utime;
379 uint64_t stime;
380 if (sscanf_s(str.c_str(), "%*d %*s %*s %*lu %*lu %*lu %*lu %*lu %*lu %*lu %*lu %*lu %*lu %lu %lu", &utime, &stime) ==
381 EOF) {
382 file.close();
383 last_sampling_failed_ = true;
384 return Status(StatusCode::kMDUnexpectedError, "Get thread CPU failed.");
385 }
386 file.close();
387 last_sampling_failed_ = false;
388 if (!first_sample_) {
389 float user_util = ((utime - prev_task_stat_.user_stat) * 1.0 / total_time_elapsed) * 100.0;
390 float sys_util = ((stime - prev_task_stat_.sys_stat) * 1.0 / total_time_elapsed) * 100.0;
391 (void)task_cpu_util_.emplace_back(TaskUtil{user_util, sys_util});
392 }
393 prev_task_stat_.user_stat = utime;
394 prev_task_stat_.sys_stat = stime;
395 first_sample_ = false;
396 return Status::OK();
397 }
398
TaskExists(pid_t id) const399 bool MDOperatorCpuInfo::TaskExists(pid_t id) const { return task_by_id_.find(id) != task_by_id_.end(); }
400
AddTask(const std::shared_ptr<TaskCpuInfo> & task_ptr)401 void MDOperatorCpuInfo::AddTask(const std::shared_ptr<TaskCpuInfo> &task_ptr) {
402 auto id = task_ptr->GetId();
403 if (!TaskExists(id)) {
404 (void)task_by_id_.emplace(id, task_ptr);
405 }
406 }
407
CalculateOperatorUtilization()408 void MDOperatorCpuInfo::CalculateOperatorUtilization() {
409 OpUtil op_util{0, 0};
410 for (auto const &[task_id, task_ptr] : task_by_id_) {
411 MS_LOG(DEBUG) << "Processing task_id: " << task_id;
412 auto task_util = task_ptr->GetLatestCpuUtil();
413 op_util.user_utilization += task_util.user_utilization;
414 op_util.sys_utilization += task_util.sys_utilization;
415 }
416 (void)op_cpu_util_.emplace_back(op_util);
417 }
418
GetUserCpuUtil(uint64_t start_index,uint64_t end_index,std::vector<uint16_t> * result) const419 Status MDOperatorCpuInfo::GetUserCpuUtil(uint64_t start_index, uint64_t end_index,
420 std::vector<uint16_t> *result) const {
421 RETURN_UNEXPECTED_IF_NULL(result);
422 MS_LOG(DEBUG) << "start_index: " << start_index << " end_index: " << end_index
423 << " op_cpu_util_.size: " << op_cpu_util_.size();
424 CHECK_FAIL_RETURN_UNEXPECTED(start_index <= end_index,
425 "Expected start_index <= end_index. Got start_index: " + std::to_string(start_index) +
426 " end_index: " + std::to_string(end_index));
427 CHECK_FAIL_RETURN_UNEXPECTED(
428 end_index <= op_cpu_util_.size(),
429 "Expected end_index <= op_cpu_util_.size(). Got end_index: " + std::to_string(end_index) +
430 " op_cpu_util_.size: " + std::to_string(op_cpu_util_.size()));
431 auto first_iter = op_cpu_util_.begin() + start_index;
432 auto last_iter = op_cpu_util_.begin() + end_index;
433 (void)std::transform(first_iter, last_iter, std::back_inserter(*result), [&](const OpUtil &info) {
434 return static_cast<uint16_t>(info.user_utilization * static_cast<float>(SystemInfo::num_cpu_));
435 });
436 return Status::OK();
437 }
438
GetSysCpuUtil(uint64_t start_index,uint64_t end_index,std::vector<uint16_t> * result) const439 Status MDOperatorCpuInfo::GetSysCpuUtil(uint64_t start_index, uint64_t end_index, std::vector<uint16_t> *result) const {
440 RETURN_UNEXPECTED_IF_NULL(result);
441 MS_LOG(DEBUG) << "start_index: " << start_index << " end_index: " << end_index
442 << " op_cpu_util_.size: " << op_cpu_util_.size();
443 CHECK_FAIL_RETURN_UNEXPECTED(start_index <= end_index,
444 "Expected start_index <= end_index. Got start_index: " + std::to_string(start_index) +
445 " end_index: " + std::to_string(end_index));
446 CHECK_FAIL_RETURN_UNEXPECTED(
447 end_index <= op_cpu_util_.size(),
448 "Expected end_index <= op_cpu_util_.size(). Got end_index: " + std::to_string(end_index) +
449 " op_cpu_util_.size: " + std::to_string(op_cpu_util_.size()));
450 auto first_iter = op_cpu_util_.begin() + start_index;
451 auto last_iter = op_cpu_util_.begin() + end_index;
452 (void)std::transform(first_iter, last_iter, std::back_inserter(*result), [&](const OpUtil &info) {
453 return static_cast<uint16_t>(info.sys_utilization * static_cast<float>(SystemInfo::num_cpu_));
454 });
455 return Status::OK();
456 }
457
Sample()458 Status CpuSampler::Sample() {
459 if (active_ == false) {
460 return Status::OK();
461 }
462 std::lock_guard<std::mutex> guard(lock_);
463 // Function to Update TaskList
464 // Loop through all tasks to find any new threads
465 // Get all multi-processing Ops from Python only if fetched_all_process = False
466 // Create new TaskCpuInfo as required and update OpInfo
467 RETURN_IF_NOT_OK(UpdateTaskList());
468
469 // Sample SystemInfo - Update current and move current to previous stat and calc Util
470 SystemStat current_sys_stat;
471 SystemStat previous_sys_stat;
472 RETURN_IF_NOT_OK(sys_info_.SampleAndGetCurrPrevStat(¤t_sys_stat, &previous_sys_stat));
473 auto total_time_elapsed = current_sys_stat.total_stat - previous_sys_stat.total_stat;
474
475 // Call Sample on all
476 // Read /proc/ files and get stat, calculate util
477 for (auto &task_ptr : tasks_) {
478 (void)task_ptr->Sample(total_time_elapsed);
479 }
480
481 // Call after Sample is called on all child processes
482 (void)main_process_info_->Sample(total_time_elapsed);
483
484 // Calculate OperatorCpuInfo
485 for (auto &[op_id, op_info] : op_info_by_id_) {
486 MS_LOG(DEBUG) << "Calculate operator cpu utilization for OpId: " << op_id;
487 op_info.CalculateOperatorUtilization();
488 }
489
490 // Get sampling time.
491 (void)ts_.emplace_back(ProfilingTime::GetCurMilliSecond());
492
493 return Status::OK();
494 }
495
UpdateTaskList()496 Status CpuSampler::UpdateTaskList() {
497 List<Task> allTasks = tree->AllTasks()->GetTask();
498 for (auto &task : allTasks) {
499 int32_t op_id = task.get_operator_id();
500 // check if the op_info was initialized in Init
501 auto iter = op_info_by_id_.find(op_id);
502 if (iter != op_info_by_id_.end()) {
503 int32_t tid = task.get_linux_id();
504 if (!iter->second.TaskExists(tid)) {
505 auto task_cpu_info_ptr = std::make_shared<ThreadCpuInfo>(main_pid_, tid);
506 (void)tasks_.emplace_back(task_cpu_info_ptr);
507 iter->second.AddTask(task_cpu_info_ptr);
508 }
509 }
510 }
511 for (const auto &op : *tree) {
512 std::vector<int32_t> pids = op.GetMPWorkerPIDs();
513 int32_t op_id = op.id();
514 auto iter = op_info_by_id_.find(op_id);
515 if (iter != op_info_by_id_.end()) {
516 for (auto pid : pids) {
517 if (!iter->second.TaskExists(pid)) {
518 auto task_cpu_info_ptr = std::make_shared<ProcessInfo>(pid);
519 (void)tasks_.emplace_back(task_cpu_info_ptr);
520 main_process_info_->AddChildProcess(task_cpu_info_ptr);
521 iter->second.AddTask(task_cpu_info_ptr);
522 }
523 }
524 }
525 }
526
527 if (!fetched_all_python_multiprocesses_ && tree->IsPython()) {
528 py::gil_scoped_acquire gil_acquire;
529 py::module ds = py::module::import("mindspore.dataset.engine.datasets");
530 py::tuple process_info = ds.attr("_get_operator_process")();
531 auto sub_process = py::reinterpret_borrow<py::dict>(process_info[0]);
532 fetched_all_python_multiprocesses_ = py::reinterpret_borrow<py::bool_>(process_info[1]);
533 // parse dict value
534 auto op_to_process = toIntMap(sub_process);
535 for (auto const &[op_id, process_list] : op_to_process) {
536 for (auto pid : process_list) {
537 auto iter = op_info_by_id_.find(op_id);
538 if (iter != op_info_by_id_.end()) {
539 if (!iter->second.TaskExists(pid)) {
540 auto task_cpu_info_ptr = std::make_shared<ProcessInfo>(pid);
541 (void)tasks_.emplace_back(task_cpu_info_ptr);
542 main_process_info_->AddChildProcess(task_cpu_info_ptr);
543 iter->second.AddTask(task_cpu_info_ptr);
544 }
545 }
546 }
547 }
548 }
549
550 return Status::OK();
551 }
552
Init()553 Status CpuSampler::Init() {
554 #if defined(USING_LINUX)
555 main_pid_ = syscall(SYS_getpid);
556 #endif
557 for (auto iter = tree->begin(); iter != tree->end(); (void)iter++) {
558 auto op_id = iter->id();
559 (void)op_info_by_id_.emplace(std::make_pair(op_id, MDOperatorCpuInfo(op_id)));
560 }
561 // thread id of main thread is same as the process ID
562 main_thread_cpu_info_ = std::make_shared<ThreadCpuInfo>(main_pid_, main_pid_);
563 (void)tasks_.emplace_back(main_thread_cpu_info_);
564 main_process_info_ = std::make_shared<ProcessInfo>(main_pid_, true);
565 return Status::OK();
566 }
567
Clear()568 void CpuSampler::Clear() {
569 ts_.clear();
570 tasks_.clear();
571 main_thread_cpu_info_.reset();
572 main_process_info_.reset();
573 op_info_by_id_.clear();
574 fetched_all_python_multiprocesses_ = false;
575 }
576
ChangeFileMode(const std::string & dir_path,const std::string & rank_id)577 Status CpuSampler::ChangeFileMode(const std::string &dir_path, const std::string &rank_id) {
578 Path path = GetFileName(dir_path, rank_id);
579 std::string file_path = path.ToString();
580 mindspore::ChangeFileMode(file_path, S_IRUSR | S_IWUSR);
581 return Status::OK();
582 }
583
SaveToFile(const std::string & dir_path,const std::string & rank_id)584 Status CpuSampler::SaveToFile(const std::string &dir_path, const std::string &rank_id) {
585 Path path = GetFileName(dir_path, rank_id);
586 // Remove the file if it exists (from prior profiling usage)
587 RETURN_IF_NOT_OK(path.Remove());
588 std::string file_path = path.ToString();
589
590 // construct json obj to write to file
591 json output;
592 output["cpu_processor_num"] = SystemInfo::num_cpu_;
593 std::vector<uint8_t> system_user_util, system_sys_util;
594 // end_index = ts_.size() essentially means to get all sampled points
595 (void)sys_info_.GetUserCpuUtil(0, ts_.size(), &system_user_util);
596 (void)sys_info_.GetSysCpuUtil(0, ts_.size(), &system_sys_util);
597 output["device_info"] = {{"context_switch_count", sys_info_.GetContextSwitchCount()},
598 {"idle_utilization", sys_info_.GetIdleCpuUtil()},
599 {"io_utilization", sys_info_.GetIOCpuUtil()},
600 {"sys_utilization", system_sys_util},
601 {"user_utilization", system_user_util},
602 {"runnable_process", sys_info_.GetRunningProcess()}};
603 // array of op_info json objects
604 json op_infos;
605 for (auto &[op_id, op_info] : op_info_by_id_) {
606 MS_LOG(INFO) << "Processing op_id: " << op_id;
607 std::vector<uint16_t> user_util, sys_util;
608 (void)op_info.GetSysCpuUtil(0, ts_.size(), &sys_util);
609 (void)op_info.GetUserCpuUtil(0, ts_.size(), &user_util);
610 json op_info_json = {{"metrics", {{"user_utilization", user_util}, {"sys_utilization", sys_util}}},
611 {"op_id", op_id}};
612 (void)op_infos.emplace_back(op_info_json);
613 }
614 output["op_info"] = op_infos;
615
616 output["process_info"] = {{"user_utilization", main_process_info_->GetUserCpuUtil()},
617 {"sys_utilization", main_process_info_->GetSysCpuUtil()}};
618
619 output["sampling_interval"] = GlobalContext::config_manager()->monitor_sampling_interval();
620 output["time_stamp"] = ts_;
621
622 std::vector<float> vss, rss, pss;
623 (void)main_process_info_->GetMemoryInfo(ProcessMemoryMetric::kVSS, 0, ts_.size(), &vss);
624 (void)main_process_info_->GetMemoryInfo(ProcessMemoryMetric::kRSS, 0, ts_.size(), &rss);
625 (void)main_process_info_->GetMemoryInfo(ProcessMemoryMetric::kPSS, 0, ts_.size(), &pss);
626 output["process_memory_info"] = {{"vss_mbytes", vss}, {"rss_mbytes", rss}, {"pss_mbytes", pss}};
627
628 std::vector<float> mem_total, mem_avail, mem_used;
629 (void)sys_info_.GetSystemMemInfo(SystemMemoryMetric::kMemoryTotal, 0, ts_.size(), &mem_total);
630 (void)sys_info_.GetSystemMemInfo(SystemMemoryMetric::kMemoryAvailable, 0, ts_.size(), &mem_avail);
631 (void)sys_info_.GetSystemMemInfo(SystemMemoryMetric::kMemoryUsed, 0, ts_.size(), &mem_used);
632 output["system_memory_info"] = {{"total_sys_memory_mbytes", mem_total},
633 {"available_sys_memory_mbytes", mem_avail},
634 {"used_sys_memory_mbytes", mem_used}};
635
636 // Discard the content of the file when opening.
637 std::ofstream os(file_path, std::ios::out | std::ios::trunc);
638 os << output;
639 os.close();
640
641 mindspore::ChangeFileMode(file_path, S_IRUSR | S_IWUSR);
642
643 return Status::OK();
644 }
645
GetOpUserCpuUtil(int32_t op_id,uint64_t start_ts,uint64_t end_ts,std::vector<uint16_t> * result)646 Status CpuSampler::GetOpUserCpuUtil(int32_t op_id, uint64_t start_ts, uint64_t end_ts, std::vector<uint16_t> *result) {
647 std::lock_guard<std::mutex> guard(lock_);
648 // find first ts that is not less than start_ts
649 auto lower = std::lower_bound(ts_.begin(), ts_.end(), start_ts);
650 // find first ts that is greater than end_ts
651 auto upper = std::upper_bound(ts_.begin(), ts_.end(), end_ts);
652 // std::distance is O(1) since vector allows random access
653 auto start_index = std::distance(ts_.begin(), lower);
654 auto end_index = std::distance(ts_.begin(), upper);
655 auto op_info = op_info_by_id_.find(op_id);
656 CHECK_FAIL_RETURN_UNEXPECTED(op_info != op_info_by_id_.end(), "Op Id: " + std::to_string(op_id) + " not found.");
657 return op_info->second.GetUserCpuUtil(start_index, end_index, result);
658 }
659
GetOpSysCpuUtil(int32_t op_id,uint64_t start_ts,uint64_t end_ts,std::vector<uint16_t> * result)660 Status CpuSampler::GetOpSysCpuUtil(int32_t op_id, uint64_t start_ts, uint64_t end_ts, std::vector<uint16_t> *result) {
661 std::lock_guard<std::mutex> guard(lock_);
662 // find first ts that is not less than start_ts
663 auto lower = std::lower_bound(ts_.begin(), ts_.end(), start_ts);
664 // find first ts that is greater than end_ts
665 auto upper = std::upper_bound(ts_.begin(), ts_.end(), end_ts);
666 // std::distance is O(1) since vector allows random access
667 auto start_index = std::distance(ts_.begin(), lower);
668 auto end_index = std::distance(ts_.begin(), upper);
669 auto op_info = op_info_by_id_.find(op_id);
670 CHECK_FAIL_RETURN_UNEXPECTED(op_info != op_info_by_id_.end(), "Op Id: " + std::to_string(op_id) + " not found.");
671 return op_info->second.GetSysCpuUtil(start_index, end_index, result);
672 }
673
GetSystemUserCpuUtil(uint64_t start_ts,uint64_t end_ts,std::vector<uint8_t> * result)674 Status CpuSampler::GetSystemUserCpuUtil(uint64_t start_ts, uint64_t end_ts, std::vector<uint8_t> *result) {
675 std::lock_guard<std::mutex> guard(lock_);
676 // find first ts that is not less than start_ts
677 auto lower = std::lower_bound(ts_.begin(), ts_.end(), start_ts);
678 // find first ts that is greater than end_ts
679 auto upper = std::upper_bound(ts_.begin(), ts_.end(), end_ts);
680 // std::distance is O(1) since vector allows random access
681 auto start_index = std::distance(ts_.begin(), lower);
682 auto end_index = std::distance(ts_.begin(), upper);
683 return sys_info_.GetUserCpuUtil(start_index, end_index, result);
684 }
685
GetSystemSysCpuUtil(uint64_t start_ts,uint64_t end_ts,std::vector<uint8_t> * result)686 Status CpuSampler::GetSystemSysCpuUtil(uint64_t start_ts, uint64_t end_ts, std::vector<uint8_t> *result) {
687 std::lock_guard<std::mutex> guard(lock_);
688 // find first ts that is not less than start_ts
689 auto lower = std::lower_bound(ts_.begin(), ts_.end(), start_ts);
690 // find first ts that is greater than end_ts
691 auto upper = std::upper_bound(ts_.begin(), ts_.end(), end_ts);
692 // std::distance is O(1) since vector allows random access
693 auto start_index = std::distance(ts_.begin(), lower);
694 auto end_index = std::distance(ts_.begin(), upper);
695 return sys_info_.GetSysCpuUtil(start_index, end_index, result);
696 }
697
GetFileName(const std::string & dir_path,const std::string & rank_id)698 Path CpuSampler::GetFileName(const std::string &dir_path, const std::string &rank_id) {
699 return Path(dir_path) / Path("minddata_cpu_utilization_" + rank_id + ".json");
700 }
701
GetLatestMemoryInfo() const702 MemoryInfo ProcessInfo::GetLatestMemoryInfo() const {
703 MemoryInfo ret = {0, 0, 0};
704 if (!last_mem_sampling_failed_) {
705 ret = prev_memory_info_;
706 }
707 return ret;
708 }
709
GetMemoryInfo(ProcessMemoryMetric metric,uint64_t start_index,uint64_t end_index,std::vector<float> * result) const710 Status ProcessInfo::GetMemoryInfo(ProcessMemoryMetric metric, uint64_t start_index, uint64_t end_index,
711 std::vector<float> *result) const {
712 RETURN_UNEXPECTED_IF_NULL(result);
713 MS_LOG(DEBUG) << "start_index: " << start_index << " end_index: " << end_index
714 << "process_memory_info_.size: " << process_memory_info_.size();
715 CHECK_FAIL_RETURN_UNEXPECTED(start_index <= end_index,
716 "Expected start_index <= end_index. Got start_index: " + std::to_string(start_index) +
717 " end_index: " + std::to_string(end_index));
718 CHECK_FAIL_RETURN_UNEXPECTED(
719 end_index <= process_memory_info_.size(),
720 "Expected end_index <= process_memory_info_.size(). Got end_index: " + std::to_string(end_index) +
721 " process_memory_info_.size: " + std::to_string(process_memory_info_.size()));
722 if (metric == ProcessMemoryMetric::kVSS) {
723 (void)std::transform(process_memory_info_.begin() + start_index, process_memory_info_.begin() + end_index,
724 std::back_inserter(*result),
725 [&](const MemoryInfo &info) { return static_cast<float>(info.vss); });
726 } else if (metric == ProcessMemoryMetric::kRSS) {
727 (void)std::transform(process_memory_info_.begin() + start_index, process_memory_info_.begin() + end_index,
728 std::back_inserter(*result),
729 [&](const MemoryInfo &info) { return static_cast<float>(info.rss); });
730 } else if (metric == ProcessMemoryMetric::kPSS) {
731 (void)std::transform(process_memory_info_.begin() + start_index, process_memory_info_.begin() + end_index,
732 std::back_inserter(*result),
733 [&](const MemoryInfo &info) { return static_cast<float>(info.pss); });
734 }
735 return Status::OK();
736 }
737
GetProcessMemoryInfo(ProcessMemoryMetric metric,uint64_t start_index,uint64_t end_index,std::vector<float> * result)738 Status CpuSampler::GetProcessMemoryInfo(ProcessMemoryMetric metric, uint64_t start_index, uint64_t end_index,
739 std::vector<float> *result) {
740 return (main_process_info_->GetMemoryInfo(metric, start_index, end_index, result));
741 }
742
AddChildProcess(const std::shared_ptr<ProcessInfo> & child_ptr)743 void ProcessInfo::AddChildProcess(const std::shared_ptr<ProcessInfo> &child_ptr) {
744 (void)child_processes_.emplace_back(child_ptr);
745 }
746
IsParent()747 bool ProcessInfo::IsParent() { return !(child_processes_.empty()); }
748
GetSystemMemInfo(SystemMemoryMetric metric,uint64_t start_index,uint64_t end_index,std::vector<float> * result) const749 Status SystemInfo::GetSystemMemInfo(SystemMemoryMetric metric, uint64_t start_index, uint64_t end_index,
750 std::vector<float> *result) const {
751 RETURN_UNEXPECTED_IF_NULL(result);
752 MS_LOG(DEBUG) << "start_index: " << start_index << " end_index: " << end_index
753 << "system_memory_info_.size: " << system_memory_info_.size();
754 CHECK_FAIL_RETURN_UNEXPECTED(start_index <= end_index,
755 "Expected start_index <= end_index. Got start_index: " + std::to_string(start_index) +
756 " end_index: " + std::to_string(end_index));
757 CHECK_FAIL_RETURN_UNEXPECTED(
758 end_index <= system_memory_info_.size(),
759 "Expected end_index <= system_memory_info_.size(). Got end_index: " + std::to_string(end_index) +
760 " system_memory_info_.size: " + std::to_string(system_memory_info_.size()));
761 if (metric == SystemMemoryMetric::kMemoryTotal) {
762 (void)std::transform(system_memory_info_.begin() + start_index, system_memory_info_.begin() + end_index,
763 std::back_inserter(*result),
764 [&](const SystemMemInfo &info) { return static_cast<float>(info.total_mem); });
765 } else if (metric == SystemMemoryMetric::kMemoryAvailable) {
766 (void)std::transform(system_memory_info_.begin() + start_index, system_memory_info_.begin() + end_index,
767 std::back_inserter(*result),
768 [&](const SystemMemInfo &info) { return static_cast<float>(info.available_mem); });
769 } else if (metric == SystemMemoryMetric::kMemoryUsed) {
770 (void)std::transform(system_memory_info_.begin() + start_index, system_memory_info_.begin() + end_index,
771 std::back_inserter(*result),
772 [&](const SystemMemInfo &info) { return static_cast<float>(info.used_mem); });
773 }
774 return Status::OK();
775 }
776
GetSystemMemoryInfo(SystemMemoryMetric metric,uint64_t start_index,uint64_t end_index,std::vector<float> * result)777 Status CpuSampler::GetSystemMemoryInfo(SystemMemoryMetric metric, uint64_t start_index, uint64_t end_index,
778 std::vector<float> *result) {
779 return (sys_info_.GetSystemMemInfo(metric, start_index, end_index, result));
780 }
781 } // namespace dataset
782 } // namespace mindspore
783