• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2020-2022 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include "minddata/dataset/engine/perf/profiling.h"
17 
18 #include <sys/stat.h>
19 
20 #include <algorithm>
21 #include <cstdlib>
22 #include <fstream>
23 
24 #include "minddata/dataset/engine/execution_tree.h"
25 #include "minddata/dataset/engine/perf/connector_size.h"
26 #include "minddata/dataset/engine/perf/cpu_sampler.h"
27 #include "minddata/dataset/engine/perf/monitor.h"
28 #include "minddata/dataset/engine/tree_adapter.h"
29 #include "minddata/dataset/util/log_adapter.h"
30 #include "minddata/dataset/util/path.h"
31 #ifdef WITH_BACKEND
32 #include "utils/ms_context.h"
33 #endif
34 #include "utils/ms_utils.h"
35 #ifndef BUILD_LITE
36 #include "mindspore/core/utils/file_utils.h"
37 namespace platform = mindspore;
38 #else
39 #include "mindspore/lite/src/common/file_utils.h"
40 namespace platform = mindspore::lite;
41 #endif
42 
43 namespace mindspore {
44 namespace dataset {
// Position of each record kind inside the group of records that belongs to one step.
// These values are added to the index of the first record of a step when a record is looked up.
// NOTE(review): the order presumably mirrors the order in which the consumer calls
// Tracing::Record for each step - confirm against the callers.
constexpr int32_t PUSH_TIME_OFFSET{0};
constexpr int32_t BATCH_TIME_OFFSET{1};
constexpr int32_t PIPELINE_TIME_OFFSET{2};
constexpr int32_t CONNECTOR_DEPTH_OFFSET{3};
49 
Start()50 Status Profiling::Start() {
51   CHECK_FAIL_RETURN_UNEXPECTED(active_ == false, "Profiling node is already active.");
52   active_ = true;
53   return Status::OK();
54 }
55 
Stop()56 Status Profiling::Stop() {
57   CHECK_FAIL_RETURN_UNEXPECTED(active_ == true, "Profiling node is already deactivated.");
58   active_ = false;
59   return Status::OK();
60 }
61 
SaveToFile(const std::string & dir_path,const std::string & rank_id)62 Status Tracing::SaveToFile(const std::string &dir_path, const std::string &rank_id) {
63   if (value_.empty()) {
64     return Status::OK();
65   }
66 
67   Path path = GetFileName(dir_path, rank_id);
68   // Remove the file if it exists (from prior profiling usage)
69   RETURN_IF_NOT_OK(path.Remove());
70   std::string file_path = path.ToString();
71 
72   MS_LOG(INFO) << "Start to save profiling data for a tracing node.";
73   std::ofstream handle(file_path, std::ios::out | std::ios::trunc);
74   if (!handle.is_open()) {
75     RETURN_STATUS_UNEXPECTED("Profiling file can not be opened.");
76   }
77   for (const auto &value : value_) {
78     handle << value << "\n";
79   }
80   handle.close();
81 
82   platform::ChangeFileMode(file_path, S_IRUSR | S_IWUSR);
83 
84   return Status::OK();
85 }
86 
ChangeFileMode(const std::string & dir_path,const std::string & rank_id)87 Status Tracing::ChangeFileMode(const std::string &dir_path, const std::string &rank_id) {
88   if (value_.empty()) {
89     return Status::OK();
90   }
91 
92   Path path = GetFileName(dir_path, rank_id);
93   std::string file_path = path.ToString();
94   if (chmod(common::SafeCStr(file_path), S_IRUSR | S_IWUSR) == -1) {
95     std::string err_str = "Change file mode failed," + file_path;
96     return Status(StatusCode::kMDUnexpectedError, err_str);
97   }
98   return Status::OK();
99 }
100 
Record(const int32_t type,const int32_t extra_info,const int32_t batch_num,const int32_t value,const uint64_t time_stamp)101 void Tracing::Record(const int32_t type, const int32_t extra_info, const int32_t batch_num, const int32_t value,
102                      const uint64_t time_stamp) {
103   // Format: "type extra-info batch-num value"
104   // type: 0: time,  1: connector size
105   // extra-info: if type is 0 - 0: pipeline time, 1: push tdt time, 2: batch time
106   //             if type is 1 - connector capacity
107   // batch-num: batch number
108   // value: if type is 0 - value is time(ms)
109   //        if type is 1 - value is connector size
110   // time-stamp: time stamp
111   // Examples:
112   // 0 0 20 10 xxx- The 20th batch took 10ms to get data from pipeline.
113   // 1 64 20 5 xxx- Connector size is 5 when get the 20th batch.Connector capacity is 64.
114   if (!active_) {
115     return;
116   }
117   TracingRecord record = {type, extra_info, batch_num, value, time_stamp};
118   std::lock_guard<std::mutex> guard(lock_);
119   (void)records_.emplace_back(record);
120   (void)value_.emplace_back(record.ToString());
121   // save timestamp per batch
122   const constexpr int32_t RECORDS_PER_STEP = 4;
123   if (records_.size() % RECORDS_PER_STEP == 0) {
124     (void)ts_.emplace_back(time_stamp);
125   }
126 }
127 
TimeIntervalForStepRange(int32_t start_step,int32_t end_step,uint64_t * start_ts,uint64_t * end_ts)128 Status Tracing::TimeIntervalForStepRange(int32_t start_step, int32_t end_step, uint64_t *start_ts, uint64_t *end_ts) {
129   RETURN_UNEXPECTED_IF_NULL(start_ts);
130   RETURN_UNEXPECTED_IF_NULL(end_ts);
131   std::lock_guard<std::mutex> guard(lock_);
132   MS_LOG(DEBUG) << "start_step: " << start_step << " end_step: " << end_step;
133   CHECK_FAIL_RETURN_UNEXPECTED(start_step > 0,
134                                "Expected start_step > 0. Got start_step: " + std::to_string(start_step));
135   CHECK_FAIL_RETURN_UNEXPECTED(end_step >= start_step,
136                                "Expected end_step >= start_step. Got start_step: " + std::to_string(start_step) +
137                                  " end_step: " + std::to_string(end_step));
138   CHECK_FAIL_RETURN_UNEXPECTED(end_step < static_cast<int32_t>(ts_.size()),
139                                "Expected end_step < ts_.size(). Got end_step: " + std::to_string(end_step) +
140                                  " ts_.size: " + std::to_string(ts_.size()));
141   // end timestamp of (start_step - 1) step
142   *start_ts = ts_[start_step - 1];
143   *end_ts = ts_[end_step];
144   return Status::OK();
145 }
146 
StepIntervalForTimeRange(uint64_t start_ts,uint64_t end_ts,int32_t * start_step,int32_t * end_step)147 Status Tracing::StepIntervalForTimeRange(uint64_t start_ts, uint64_t end_ts, int32_t *start_step, int32_t *end_step) {
148   RETURN_UNEXPECTED_IF_NULL(start_step);
149   RETURN_UNEXPECTED_IF_NULL(end_step);
150   CHECK_FAIL_RETURN_UNEXPECTED(start_ts < end_ts, "Expected start_ts < end_ts. Got start_ts: " +
151                                                     std::to_string(start_ts) + " end_ts: " + std::to_string(end_ts));
152   std::lock_guard<std::mutex> guard(lock_);
153   CHECK_FAIL_RETURN_UNEXPECTED(ts_.size() > 1, "No tracing data available yet.");
154   // find first ts that is not less than start_ts
155   auto lower = std::lower_bound(ts_.begin(), ts_.end(), start_ts);
156   CHECK_FAIL_RETURN_UNEXPECTED(lower != ts_.end(),
157                                "No data available for time >= start_ts. start_ts: " + std::to_string(start_ts));
158   // there is no 0th step. If start_ts == 0, then lower == ts_.begin()
159   *start_step = std::max(1, static_cast<int32_t>(std::distance(ts_.begin(), lower)));
160   // find first ts that is greater than end_ts
161   auto upper = std::upper_bound(ts_.begin(), ts_.end(), end_ts);
162   if (upper == ts_.end()) {
163     *end_step = std::max(1, static_cast<int32_t>(std::distance(ts_.begin(), upper) - 1));
164   } else {
165     *end_step = std::max(1, static_cast<int32_t>(std::distance(ts_.begin(), upper)));
166   }
167   return Status::OK();
168 }
169 
GetRecordEntryFieldValue(int32_t start_step,int32_t end_step,int32_t record_offset,const std::string & field,std::vector<int32_t> * result)170 Status Tracing::GetRecordEntryFieldValue(int32_t start_step, int32_t end_step, int32_t record_offset,
171                                          const std::string &field, std::vector<int32_t> *result) {
172   RETURN_UNEXPECTED_IF_NULL(result);
173   std::lock_guard<std::mutex> guard(lock_);
174   const constexpr int32_t RECORDS_PER_STEP = 4;
175   auto total_steps = records_.size() / RECORDS_PER_STEP;
176   MS_LOG(DEBUG) << "start_step: " << start_step << " end_step: " << end_step;
177   CHECK_FAIL_RETURN_UNEXPECTED(start_step <= static_cast<int32_t>(total_steps),
178                                "Expected start_step <= total_steps. Got start_step: " + std::to_string(start_step) +
179                                  " total_steps: " + std::to_string(total_steps));
180   CHECK_FAIL_RETURN_UNEXPECTED(end_step <= static_cast<int32_t>(total_steps),
181                                "Expected end_step <= total_steps. Got end_step: " + std::to_string(end_step) +
182                                  " total_steps: " + std::to_string(total_steps));
183   CHECK_FAIL_RETURN_UNEXPECTED(start_step <= end_step,
184                                "Expected start_step <= end_step. Got start_step: " + std::to_string(start_step) +
185                                  " end_step: " + std::to_string(end_step));
186 
187   for (auto step_num = start_step; step_num <= end_step; step_num++) {
188     auto idx = (step_num - 1) * RECORDS_PER_STEP + record_offset;
189     CHECK_FAIL_RETURN_UNEXPECTED(idx >= 0, "Expected idx >= 0. Got idx: " + std::to_string(idx));
190     if (field == "value") {
191       (void)result->emplace_back(records_[static_cast<size_t>(idx)].value);
192     } else if (field == "extra_info") {
193       (void)result->emplace_back(records_[static_cast<size_t>(idx)].extra_info);
194     } else {
195       return {StatusCode::kMDUnexpectedError,
196               "Received unexpected field: " + field + R"(. Expected: ["value", "extra_info"].)"};
197     }
198   }
199   return Status::OK();
200 }
201 
GetPipelineTime(int32_t start_step,int32_t end_step,std::vector<int32_t> * result)202 Status Tracing::GetPipelineTime(int32_t start_step, int32_t end_step, std::vector<int32_t> *result) {
203   return GetRecordEntryFieldValue(start_step, end_step, PIPELINE_TIME_OFFSET, "value", result);
204 }
205 
GetPushTime(int32_t start_step,int32_t end_step,std::vector<int32_t> * result)206 Status Tracing::GetPushTime(int32_t start_step, int32_t end_step, std::vector<int32_t> *result) {
207   return GetRecordEntryFieldValue(start_step, end_step, PUSH_TIME_OFFSET, "value", result);
208 }
209 
GetBatchTime(int32_t start_step,int32_t end_step,std::vector<int32_t> * result)210 Status Tracing::GetBatchTime(int32_t start_step, int32_t end_step, std::vector<int32_t> *result) {
211   return GetRecordEntryFieldValue(start_step, end_step, BATCH_TIME_OFFSET, "value", result);
212 }
213 
GetConnectorSize(int32_t start_step,int32_t end_step,std::vector<int32_t> * result)214 Status Tracing::GetConnectorSize(int32_t start_step, int32_t end_step, std::vector<int32_t> *result) {
215   return GetRecordEntryFieldValue(start_step, end_step, CONNECTOR_DEPTH_OFFSET, "value", result);
216 }
217 
GetConnectorCapacity(int32_t start_step,int32_t end_step,std::vector<int32_t> * result)218 Status Tracing::GetConnectorCapacity(int32_t start_step, int32_t end_step, std::vector<int32_t> *result) {
219   return GetRecordEntryFieldValue(start_step, end_step, CONNECTOR_DEPTH_OFFSET, "extra_info", result);
220 }
221 
GetEmptyQueueFrequency(int32_t start_step,int32_t end_step,float_t * empty_queue_freq)222 Status Tracing::GetEmptyQueueFrequency(int32_t start_step, int32_t end_step, float_t *empty_queue_freq) {
223   RETURN_UNEXPECTED_IF_NULL(empty_queue_freq);
224   std::vector<int32_t> sizes;
225   RETURN_IF_NOT_OK(GetConnectorSize(start_step, end_step, &sizes));
226   int32_t total = end_step - start_step + 1;
227   CHECK_FAIL_RETURN_UNEXPECTED(total > 0, "Start step is greater than end step.");
228   uint32_t count = static_cast<uint32_t>(std::count(sizes.begin(), sizes.end(), 0));
229   *empty_queue_freq = static_cast<float_t>(count) / static_cast<float_t>(total);
230   return Status::OK();
231 }
232 
Init()233 Status Tracing::Init() {
234   (void)ts_.emplace_back(0);
235   return Status::OK();
236 }
237 
GetNumberSteps()238 size_t Tracing::GetNumberSteps() { return ts_.size(); }
239 
Clear()240 void Tracing::Clear() {
241   value_.clear();
242   records_.clear();
243   ts_.clear();
244 }
245 
246 // Constructor
ProfilingManager()247 ProfilingManager::ProfilingManager()
248     : profiling_state_(ProfilingState::kProfilingStateUnBegun), tree_(nullptr), autotuning_(false), profiling_(false) {}
249 
IsProfilingEnable(const ExecutionTree * tree) const250 bool ProfilingManager::IsProfilingEnable(const ExecutionTree *tree) const {
251   auto external_state = GetProfilerTreeState(tree);
252   return (external_state == kEnabledTreeNotRegistered || external_state == kEnabledTreeRegistered);
253 }
254 
RegisterTree(const TreeAdapter * tree_adapter)255 Status ProfilingManager::RegisterTree(const TreeAdapter *tree_adapter) {
256   RETURN_UNEXPECTED_IF_NULL(tree_adapter);
257   CHECK_FAIL_RETURN_UNEXPECTED(tree_ == nullptr, "Another tree is already registered.");
258   CHECK_FAIL_RETURN_UNEXPECTED((autotuning_ || profiling_) == true,
259                                "MD Profiler is disabled. Cannot register the tree.");
260   tree_ = tree_adapter->tree_.get();
261   MS_LOG(INFO) << "Registering tree: " + tree_->GetUniqueId();
262   perf_monitor_ = std::make_unique<Monitor>(this);
263   // Register all sampling nodes here.
264   // Tracing node registration is the responsibility of the Consumer
265   std::shared_ptr<Sampling> connector_size_sampling = std::make_shared<ConnectorSize>(tree_);
266   RETURN_IF_NOT_OK(RegisterSamplingNode(connector_size_sampling));
267 
268 #ifndef ENABLE_ANDROID
269   std::shared_ptr<Sampling> cpu_sampler = std::make_shared<CpuSampler>(tree_);
270   RETURN_IF_NOT_OK(RegisterSamplingNode(cpu_sampler));
271 #endif
272   // can insert a correct timestamp so that we can ignore the samples that were taken
273   // during start up of the pipeline.
274   (void)epoch_end_ts_.emplace_back(0);
275   (void)epoch_end_step_.emplace_back(0);
276   return Status::OK();
277 }
278 
279 // Launch monitoring thread.
LaunchMonitor()280 Status ProfilingManager::LaunchMonitor() {
281   RETURN_IF_NOT_OK(tree_->AllTasks()->CreateAsyncTask("Monitor Thread launched", std::ref(*perf_monitor_)));
282   return Status::OK();
283 }
284 
285 // Profiling node registration
RegisterTracingNode(const std::shared_ptr<Tracing> & node)286 Status ProfilingManager::RegisterTracingNode(const std::shared_ptr<Tracing> &node) {
287   // Check if node with the same name has already been registered.
288   auto exist = tracing_nodes_.find(node->Name());
289   if (exist != tracing_nodes_.end()) {
290     return Status(StatusCode::kMDProfilingError, "Profiling node already exist: " + node->Name());
291   }
292   // Register the node with its name as key.
293   RETURN_IF_NOT_OK(node->Init());
294   tracing_nodes_[node->Name()] = node;
295 
296   // the user may have already started profiling.
297   if (profiling_state_ == ProfilingState::kProfilingStateRunning) {
298     RETURN_IF_NOT_OK(node->Start());
299   }
300   return Status::OK();
301 }
302 
303 // Profiling node getter
GetTracingNode(const std::string & name,std::shared_ptr<Tracing> * node)304 Status ProfilingManager::GetTracingNode(const std::string &name, std::shared_ptr<Tracing> *node) {
305   // Check if node with the same name has already been registered.
306   auto exist = tracing_nodes_.find(name);
307   if (exist == tracing_nodes_.end()) {
308     return Status(StatusCode::kMDProfilingError, "Profiling node does not exist: " + name);
309   }
310   // Fetch node.
311   *node = tracing_nodes_[name];
312   return Status::OK();
313 }
314 
315 // Profiling node registration
RegisterSamplingNode(const std::shared_ptr<Sampling> & node)316 Status ProfilingManager::RegisterSamplingNode(const std::shared_ptr<Sampling> &node) {
317   // Check if node with the same name has already been registered.
318   auto exist = sampling_nodes_.find(node->Name());
319   if (exist != sampling_nodes_.end()) {
320     return Status(StatusCode::kMDProfilingError, "Profiling node already exist: " + node->Name());
321   }
322   // Register the node with its name as key.
323   RETURN_IF_NOT_OK(node->Init());
324   sampling_nodes_[node->Name()] = node;
325 
326   // the user may have already started profiling.
327   if (profiling_state_ == ProfilingState::kProfilingStateRunning) {
328     RETURN_IF_NOT_OK(node->Start());
329   }
330   return Status::OK();
331 }
332 
333 // Profiling node getter
GetSamplingNode(const std::string & name,std::shared_ptr<Sampling> * node)334 Status ProfilingManager::GetSamplingNode(const std::string &name, std::shared_ptr<Sampling> *node) {
335   // Check if node with the same name has already been registered.
336   auto exist = sampling_nodes_.find(name);
337   if (exist == sampling_nodes_.end()) {
338     return Status(StatusCode::kMDProfilingError, "Profiling node does not exist: " + name);
339   }
340   // Fetch node.
341   *node = sampling_nodes_[name];
342   return Status::OK();
343 }
344 
SaveProfilingData(const std::string & dir_path,const std::string & rank_id)345 Status ProfilingManager::SaveProfilingData(const std::string &dir_path, const std::string &rank_id) {
346   MS_LOG(INFO) << "Start to save profiling data.";
347   for (const auto &node : tracing_nodes_) {
348     RETURN_IF_NOT_OK(node.second->SaveToFile(dir_path, rank_id));
349   }
350   for (const auto &node : sampling_nodes_) {
351     RETURN_IF_NOT_OK(node.second->SaveToFile(dir_path, rank_id));
352   }
353   MS_LOG(INFO) << "Save profiling data end.";
354   return Status::OK();
355 }
356 
ChangeFileMode(const std::string & dir_path,const std::string & rank_id)357 Status ProfilingManager::ChangeFileMode(const std::string &dir_path, const std::string &rank_id) {
358   MS_LOG(INFO) << "Start to change file mode.";
359   for (const auto &node : tracing_nodes_) {
360     RETURN_IF_NOT_OK(node.second->ChangeFileMode(dir_path, rank_id));
361   }
362   for (const auto &node : sampling_nodes_) {
363     RETURN_IF_NOT_OK(node.second->ChangeFileMode(dir_path, rank_id));
364   }
365   MS_LOG(INFO) << "Change file mode end.";
366   return Status::OK();
367 }
368 
369 #ifndef ENABLE_ANDROID
GetUserCpuUtilByEpoch(int32_t epoch_num,std::vector<uint8_t> * result)370 Status ProfilingManager::GetUserCpuUtilByEpoch(int32_t epoch_num, std::vector<uint8_t> *result) {
371   uint64_t start_ts = 0, end_ts = 0;
372   RETURN_IF_NOT_OK(EpochToTimeInterval(epoch_num, &start_ts, &end_ts));
373   return GetUserCpuUtilByTime(start_ts, end_ts, result);
374 }
375 
GetUserCpuUtilByStep(int32_t start_step,int32_t end_step,std::vector<uint8_t> * result)376 Status ProfilingManager::GetUserCpuUtilByStep(int32_t start_step, int32_t end_step, std::vector<uint8_t> *result) {
377   uint64_t start_ts = 0, end_ts = 0;
378   RETURN_IF_NOT_OK(StepToTimeInterval(start_step, end_step, &start_ts, &end_ts));
379   return GetUserCpuUtilByTime(start_ts, end_ts, result);
380 }
381 
GetUserCpuUtilByTime(uint64_t start_ts,uint64_t end_ts,std::vector<uint8_t> * result)382 Status ProfilingManager::GetUserCpuUtilByTime(uint64_t start_ts, uint64_t end_ts, std::vector<uint8_t> *result) {
383   std::shared_ptr<Sampling> sampling_node;
384   RETURN_IF_NOT_OK(GetSamplingNode(kCpuSamplerName, &sampling_node));
385   auto node = std::dynamic_pointer_cast<CpuSampler>(sampling_node);
386   return node->GetSystemUserCpuUtil(start_ts, end_ts, result);
387 }
388 
GetSysCpuUtilByEpoch(int32_t epoch_num,std::vector<uint8_t> * result)389 Status ProfilingManager::GetSysCpuUtilByEpoch(int32_t epoch_num, std::vector<uint8_t> *result) {
390   uint64_t start_ts = 0, end_ts = 0;
391   RETURN_IF_NOT_OK(EpochToTimeInterval(epoch_num, &start_ts, &end_ts));
392   return GetSysCpuUtilByTime(start_ts, end_ts, result);
393 }
394 
GetSysCpuUtilByStep(int32_t start_step,int32_t end_step,std::vector<uint8_t> * result)395 Status ProfilingManager::GetSysCpuUtilByStep(int32_t start_step, int32_t end_step, std::vector<uint8_t> *result) {
396   uint64_t start_ts = 0, end_ts = 0;
397   RETURN_IF_NOT_OK(StepToTimeInterval(start_step, end_step, &start_ts, &end_ts));
398   return GetSysCpuUtilByTime(start_ts, end_ts, result);
399 }
400 
GetSysCpuUtilByTime(uint64_t start_ts,uint64_t end_ts,std::vector<uint8_t> * result)401 Status ProfilingManager::GetSysCpuUtilByTime(uint64_t start_ts, uint64_t end_ts, std::vector<uint8_t> *result) {
402   std::shared_ptr<Sampling> sampling_node;
403   RETURN_IF_NOT_OK(GetSamplingNode(kCpuSamplerName, &sampling_node));
404   auto node = std::dynamic_pointer_cast<CpuSampler>(sampling_node);
405   return node->GetSystemSysCpuUtil(start_ts, end_ts, result);
406 }
407 
GetUserCpuUtilByEpoch(int32_t op_id,int32_t epoch_num,std::vector<uint16_t> * result)408 Status ProfilingManager::GetUserCpuUtilByEpoch(int32_t op_id, int32_t epoch_num, std::vector<uint16_t> *result) {
409   uint64_t start_ts = 0, end_ts = 0;
410   RETURN_IF_NOT_OK(EpochToTimeInterval(epoch_num, &start_ts, &end_ts));
411   return GetUserCpuUtilByTime(op_id, start_ts, end_ts, result);
412 }
413 
GetUserCpuUtilByStep(int32_t op_id,int32_t start_step,int32_t end_step,std::vector<uint16_t> * result)414 Status ProfilingManager::GetUserCpuUtilByStep(int32_t op_id, int32_t start_step, int32_t end_step,
415                                               std::vector<uint16_t> *result) {
416   uint64_t start_ts = 0, end_ts = 0;
417   RETURN_IF_NOT_OK(StepToTimeInterval(start_step, end_step, &start_ts, &end_ts));
418   return GetUserCpuUtilByTime(op_id, start_ts, end_ts, result);
419 }
420 
GetUserCpuUtilByTime(int32_t op_id,uint64_t start_ts,uint64_t end_ts,std::vector<uint16_t> * result)421 Status ProfilingManager::GetUserCpuUtilByTime(int32_t op_id, uint64_t start_ts, uint64_t end_ts,
422                                               std::vector<uint16_t> *result) {
423   std::shared_ptr<Sampling> sampling_node;
424   RETURN_IF_NOT_OK(GetSamplingNode(kCpuSamplerName, &sampling_node));
425   auto node = std::dynamic_pointer_cast<CpuSampler>(sampling_node);
426   return node->GetOpUserCpuUtil(op_id, start_ts, end_ts, result);
427 }
428 
GetSysCpuUtilByEpoch(int32_t op_id,int32_t epoch_num,std::vector<uint16_t> * result)429 Status ProfilingManager::GetSysCpuUtilByEpoch(int32_t op_id, int32_t epoch_num, std::vector<uint16_t> *result) {
430   uint64_t start_ts = 0, end_ts = 0;
431   RETURN_IF_NOT_OK(EpochToTimeInterval(epoch_num, &start_ts, &end_ts));
432   return GetSysCpuUtilByTime(op_id, start_ts, end_ts, result);
433 }
434 
GetSysCpuUtilByStep(int32_t op_id,int32_t start_step,int32_t end_step,std::vector<uint16_t> * result)435 Status ProfilingManager::GetSysCpuUtilByStep(int32_t op_id, int32_t start_step, int32_t end_step,
436                                              std::vector<uint16_t> *result) {
437   uint64_t start_ts = 0, end_ts = 0;
438   RETURN_IF_NOT_OK(StepToTimeInterval(start_step, end_step, &start_ts, &end_ts));
439   return GetSysCpuUtilByTime(op_id, start_ts, end_ts, result);
440 }
441 
GetSysCpuUtilByTime(int32_t op_id,uint64_t start_ts,uint64_t end_ts,std::vector<uint16_t> * result)442 Status ProfilingManager::GetSysCpuUtilByTime(int32_t op_id, uint64_t start_ts, uint64_t end_ts,
443                                              std::vector<uint16_t> *result) {
444   std::shared_ptr<Sampling> sampling_node;
445   RETURN_IF_NOT_OK(GetSamplingNode(kCpuSamplerName, &sampling_node));
446   auto node = std::dynamic_pointer_cast<CpuSampler>(sampling_node);
447   return node->GetOpSysCpuUtil(op_id, start_ts, end_ts, result);
448 }
449 
GetMainProcessMemoryInfoByEpoch(ProcessMemoryMetric metric,int32_t epoch_num,std::vector<float> * result)450 Status ProfilingManager::GetMainProcessMemoryInfoByEpoch(ProcessMemoryMetric metric, int32_t epoch_num,
451                                                          std::vector<float> *result) {
452   uint64_t start_ts = 0, end_ts = 0;
453   RETURN_IF_NOT_OK(EpochToTimeInterval(epoch_num, &start_ts, &end_ts));
454   return GetMainProcessMemoryInfoByTime(metric, start_ts, end_ts, result);
455 }
456 
GetMainProcessMemoryInfoByStep(ProcessMemoryMetric metric,int32_t start_step,int32_t end_step,std::vector<float> * result)457 Status ProfilingManager::GetMainProcessMemoryInfoByStep(ProcessMemoryMetric metric, int32_t start_step,
458                                                         int32_t end_step, std::vector<float> *result) {
459   uint64_t start_ts = 0, end_ts = 0;
460   RETURN_IF_NOT_OK(StepToTimeInterval(start_step, end_step, &start_ts, &end_ts));
461   return GetMainProcessMemoryInfoByTime(metric, start_ts, end_ts, result);
462 }
463 
GetMainProcessMemoryInfoByTime(ProcessMemoryMetric metric,uint64_t start_ts,uint64_t end_ts,std::vector<float> * result)464 Status ProfilingManager::GetMainProcessMemoryInfoByTime(ProcessMemoryMetric metric, uint64_t start_ts, uint64_t end_ts,
465                                                         std::vector<float> *result) {
466   std::shared_ptr<Sampling> sampling_node;
467   RETURN_IF_NOT_OK(GetSamplingNode(kCpuSamplerName, &sampling_node));
468   auto node = std::dynamic_pointer_cast<CpuSampler>(sampling_node);
469   return node->GetProcessMemoryInfo(metric, start_ts, end_ts, result);
470 }
471 
GetSystemMemoryInfoByEpoch(SystemMemoryMetric metric,int32_t epoch_num,std::vector<float> * result)472 Status ProfilingManager::GetSystemMemoryInfoByEpoch(SystemMemoryMetric metric, int32_t epoch_num,
473                                                     std::vector<float> *result) {
474   uint64_t start_ts = 0, end_ts = 0;
475   RETURN_IF_NOT_OK(EpochToTimeInterval(epoch_num, &start_ts, &end_ts));
476   return GetSystemMemoryInfoByTime(metric, start_ts, end_ts, result);
477 }
478 
GetSystemMemoryInfoByStep(SystemMemoryMetric metric,int32_t start_step,int32_t end_step,std::vector<float> * result)479 Status ProfilingManager::GetSystemMemoryInfoByStep(SystemMemoryMetric metric, int32_t start_step, int32_t end_step,
480                                                    std::vector<float> *result) {
481   uint64_t start_ts = 0, end_ts = 0;
482   RETURN_IF_NOT_OK(StepToTimeInterval(start_step, end_step, &start_ts, &end_ts));
483   return GetSystemMemoryInfoByTime(metric, start_ts, end_ts, result);
484 }
485 
GetSystemMemoryInfoByTime(SystemMemoryMetric metric,uint64_t start_ts,uint64_t end_ts,std::vector<float> * result)486 Status ProfilingManager::GetSystemMemoryInfoByTime(SystemMemoryMetric metric, uint64_t start_ts, uint64_t end_ts,
487                                                    std::vector<float> *result) {
488   std::shared_ptr<Sampling> sampling_node;
489   RETURN_IF_NOT_OK(GetSamplingNode(kCpuSamplerName, &sampling_node));
490   auto node = std::dynamic_pointer_cast<CpuSampler>(sampling_node);
491   return node->GetSystemMemoryInfo(metric, start_ts, end_ts, result);
492 }
493 #endif
494 
EpochToTimeInterval(int32_t epoch_num,uint64_t * start_ts,uint64_t * end_ts)495 Status ProfilingManager::EpochToTimeInterval(int32_t epoch_num, uint64_t *start_ts, uint64_t *end_ts) {
496   RETURN_UNEXPECTED_IF_NULL(start_ts);
497   RETURN_UNEXPECTED_IF_NULL(end_ts);
498   if (epoch_num <= 0 || epoch_num >= static_cast<int32_t>(epoch_end_ts_.size())) {
499     std::string err = "Epoch: " + std::to_string(epoch_num) + " is invalid.";
500     MS_LOG(INFO) << err;
501     return {StatusCode::kMDUnexpectedError, err};
502   }
503   *start_ts = epoch_end_ts_[epoch_num - 1];
504   *end_ts = epoch_end_ts_[epoch_num];
505   return Status::OK();
506 }
507 
EpochToStepInterval(int32_t epoch_num,uint32_t * start_step,uint32_t * end_step)508 Status ProfilingManager::EpochToStepInterval(int32_t epoch_num, uint32_t *start_step, uint32_t *end_step) {
509   RETURN_UNEXPECTED_IF_NULL(start_step);
510   RETURN_UNEXPECTED_IF_NULL(end_step);
511   if (epoch_num <= 0 || epoch_num >= static_cast<int32_t>(epoch_end_step_.size())) {
512     std::string err = "Epoch: " + std::to_string(epoch_num) + " is invalid.";
513     return {StatusCode::kMDUnexpectedError, err};
514   }
515   *start_step = epoch_end_step_[epoch_num - 1] + 1;
516   *end_step = epoch_end_step_[epoch_num];
517   return Status::OK();
518 }
519 
StepToTimeInterval(int32_t start_step,int32_t end_step,uint64_t * start_ts,uint64_t * end_ts)520 Status ProfilingManager::StepToTimeInterval(int32_t start_step, int32_t end_step, uint64_t *start_ts,
521                                             uint64_t *end_ts) {
522   std::shared_ptr<Tracing> node;
523   if (GetTracingNode(kDeviceQueueTracingName, &node).IsOk() ||
524       GetTracingNode(kDatasetIteratorTracingName, &node).IsOk()) {
525     return node->TimeIntervalForStepRange(start_step, end_step, start_ts, end_ts);
526   } else {
527     return {StatusCode::kMDUnexpectedError,
528             "Cannot find appropriate tracing node to convert step range to time interval."};
529   }
530 }
531 
TimeToStepInterval(uint64_t start_ts,uint64_t end_ts,int32_t * start_step,int32_t * end_step)532 Status ProfilingManager::TimeToStepInterval(uint64_t start_ts, uint64_t end_ts, int32_t *start_step,
533                                             int32_t *end_step) {
534   std::shared_ptr<Tracing> node;
535   if (GetTracingNode(kDeviceQueueTracingName, &node).IsOk() ||
536       GetTracingNode(kDatasetIteratorTracingName, &node).IsOk()) {
537     return node->StepIntervalForTimeRange(start_ts, end_ts, start_step, end_step);
538   } else {
539     return {StatusCode::kMDUnexpectedError,
540             "Cannot find appropriate tracing node to convert time interval to step range."};
541   }
542 }
543 
GetConnectorSizeByEpoch(int32_t op_id,int32_t epoch_num,std::vector<int32_t> * result)544 Status ProfilingManager::GetConnectorSizeByEpoch(int32_t op_id, int32_t epoch_num, std::vector<int32_t> *result) {
545   uint64_t start_ts = 0, end_ts = 0;
546   RETURN_IF_NOT_OK(EpochToTimeInterval(epoch_num, &start_ts, &end_ts));
547   return GetConnectorSizeByTime(op_id, start_ts, end_ts, result);
548 }
549 
GetConnectorSizeByStep(int32_t op_id,int32_t start_step,int32_t end_step,std::vector<int32_t> * result)550 Status ProfilingManager::GetConnectorSizeByStep(int32_t op_id, int32_t start_step, int32_t end_step,
551                                                 std::vector<int32_t> *result) {
552   uint64_t start_ts = 0, end_ts = 0;
553   RETURN_IF_NOT_OK(StepToTimeInterval(start_step, end_step, &start_ts, &end_ts));
554   return GetConnectorSizeByTime(op_id, start_ts, end_ts, result);
555 }
556 
GetConnectorSizeByTime(int32_t op_id,uint64_t start_ts,uint64_t end_ts,std::vector<int32_t> * result)557 Status ProfilingManager::GetConnectorSizeByTime(int32_t op_id, uint64_t start_ts, uint64_t end_ts,
558                                                 std::vector<int32_t> *result) {
559   std::shared_ptr<Sampling> node;
560   RETURN_IF_NOT_OK(GetSamplingNode(kConnectorSizeSamplingName, &node));
561   auto connector_node = std::dynamic_pointer_cast<ConnectorSize>(node);
562   return connector_node->GetOpConnectorSize(op_id, start_ts, end_ts, result);
563 }
564 
GetPipelineTimeByEpoch(int32_t epoch_num,std::vector<int32_t> * result)565 Status ProfilingManager::GetPipelineTimeByEpoch(int32_t epoch_num, std::vector<int32_t> *result) {
566   uint32_t start_step = 0, end_step = 0;
567   RETURN_IF_NOT_OK(EpochToStepInterval(epoch_num, &start_step, &end_step));
568   return GetPipelineTimeByStep(start_step, end_step, result);
569 }
570 
GetPipelineTimeByStep(int32_t start_step,int32_t end_step,std::vector<int32_t> * result)571 Status ProfilingManager::GetPipelineTimeByStep(int32_t start_step, int32_t end_step, std::vector<int32_t> *result) {
572   std::shared_ptr<Tracing> node;
573   if (GetTracingNode(kDeviceQueueTracingName, &node).IsOk() ||
574       GetTracingNode(kDatasetIteratorTracingName, &node).IsOk()) {
575     return node->GetPipelineTime(start_step, end_step, result);
576   } else {
577     return {StatusCode::kMDUnexpectedError, "Cannot find appropriate tracing node"};
578   }
579 }
580 
GetPipelineTimeByTime(uint64_t start_ts,uint64_t end_ts,std::vector<int32_t> * result)581 Status ProfilingManager::GetPipelineTimeByTime(uint64_t start_ts, uint64_t end_ts, std::vector<int32_t> *result) {
582   int32_t start_step = 0, end_step = 0;
583   RETURN_IF_NOT_OK(TimeToStepInterval(start_ts, end_ts, &start_step, &end_step));
584   return GetPipelineTimeByStep(start_step, end_step, result);
585 }
586 
GetPushTimeByEpoch(int32_t epoch_num,std::vector<int32_t> * result)587 Status ProfilingManager::GetPushTimeByEpoch(int32_t epoch_num, std::vector<int32_t> *result) {
588   uint32_t start_step = 0, end_step = 0;
589   RETURN_IF_NOT_OK(EpochToStepInterval(epoch_num, &start_step, &end_step));
590   return GetPushTimeByStep(start_step, end_step, result);
591 }
592 
GetPushTimeByStep(int32_t start_step,int32_t end_step,std::vector<int32_t> * result)593 Status ProfilingManager::GetPushTimeByStep(int32_t start_step, int32_t end_step, std::vector<int32_t> *result) {
594   std::shared_ptr<Tracing> node;
595   if (GetTracingNode(kDeviceQueueTracingName, &node).IsOk() ||
596       GetTracingNode(kDatasetIteratorTracingName, &node).IsOk()) {
597     return node->GetPushTime(start_step, end_step, result);
598   } else {
599     return {StatusCode::kMDUnexpectedError, "Cannot find appropriate tracing node"};
600   }
601 }
602 
GetPushTimeByTime(uint64_t start_ts,uint64_t end_ts,std::vector<int32_t> * result)603 Status ProfilingManager::GetPushTimeByTime(uint64_t start_ts, uint64_t end_ts, std::vector<int32_t> *result) {
604   int32_t start_step = 0, end_step = 0;
605   RETURN_IF_NOT_OK(TimeToStepInterval(start_ts, end_ts, &start_step, &end_step));
606   return GetPushTimeByStep(start_step, end_step, result);
607 }
608 
GetBatchTimeByEpoch(int32_t epoch_num,std::vector<int32_t> * result)609 Status ProfilingManager::GetBatchTimeByEpoch(int32_t epoch_num, std::vector<int32_t> *result) {
610   uint32_t start_step = 0, end_step = 0;
611   RETURN_IF_NOT_OK(EpochToStepInterval(epoch_num, &start_step, &end_step));
612   return GetBatchTimeByStep(start_step, end_step, result);
613 }
614 
GetBatchTimeByStep(int32_t start_step,int32_t end_step,std::vector<int32_t> * result)615 Status ProfilingManager::GetBatchTimeByStep(int32_t start_step, int32_t end_step, std::vector<int32_t> *result) {
616   std::shared_ptr<Tracing> node;
617   if (GetTracingNode(kDeviceQueueTracingName, &node).IsOk() ||
618       GetTracingNode(kDatasetIteratorTracingName, &node).IsOk()) {
619     return node->GetBatchTime(start_step, end_step, result);
620   } else {
621     return {StatusCode::kMDUnexpectedError, "Cannot find appropriate tracing node"};
622   }
623 }
624 
GetBatchTimeByTime(uint64_t start_ts,uint64_t end_ts,std::vector<int32_t> * result)625 Status ProfilingManager::GetBatchTimeByTime(uint64_t start_ts, uint64_t end_ts, std::vector<int32_t> *result) {
626   int32_t start_step = 0, end_step = 0;
627   RETURN_IF_NOT_OK(TimeToStepInterval(start_ts, end_ts, &start_step, &end_step));
628   return GetBatchTimeByStep(start_step, end_step, result);
629 }
630 
GetConnectorSizeByEpoch(int32_t epoch_num,std::vector<int32_t> * result)631 Status ProfilingManager::GetConnectorSizeByEpoch(int32_t epoch_num, std::vector<int32_t> *result) {
632   uint32_t start_step = 0, end_step = 0;
633   RETURN_IF_NOT_OK(EpochToStepInterval(epoch_num, &start_step, &end_step));
634   return GetConnectorSizeByStep(start_step, end_step, result);
635 }
636 
GetConnectorSizeByStep(int32_t start_step,int32_t end_step,std::vector<int32_t> * result)637 Status ProfilingManager::GetConnectorSizeByStep(int32_t start_step, int32_t end_step, std::vector<int32_t> *result) {
638   std::shared_ptr<Tracing> node;
639   if (GetTracingNode(kDeviceQueueTracingName, &node).IsOk() ||
640       GetTracingNode(kDatasetIteratorTracingName, &node).IsOk()) {
641     return node->GetConnectorSize(start_step, end_step, result);
642   } else {
643     return {StatusCode::kMDUnexpectedError, "Cannot find appropriate tracing node"};
644   }
645 }
646 
GetConnectorSizeByTime(uint64_t start_ts,uint64_t end_ts,std::vector<int32_t> * result)647 Status ProfilingManager::GetConnectorSizeByTime(uint64_t start_ts, uint64_t end_ts, std::vector<int32_t> *result) {
648   int32_t start_step = 0, end_step = 0;
649   RETURN_IF_NOT_OK(TimeToStepInterval(start_ts, end_ts, &start_step, &end_step));
650   return GetConnectorSizeByStep(start_step, end_step, result);
651 }
652 
GetEmptyQueueFrequencyByEpoch(int32_t epoch_num,float_t * result)653 Status ProfilingManager::GetEmptyQueueFrequencyByEpoch(int32_t epoch_num, float_t *result) {
654   uint32_t start_step = 0, end_step = 0;
655   RETURN_IF_NOT_OK(EpochToStepInterval(epoch_num, &start_step, &end_step));
656   return GetEmptyQueueFrequencyByStep(start_step, end_step, result);
657 }
658 
GetEmptyQueueFrequencyByStep(int32_t start_step,int32_t end_step,float_t * result)659 Status ProfilingManager::GetEmptyQueueFrequencyByStep(int32_t start_step, int32_t end_step, float_t *result) {
660   std::shared_ptr<Tracing> node;
661   if (GetTracingNode(kDeviceQueueTracingName, &node).IsOk() ||
662       GetTracingNode(kDatasetIteratorTracingName, &node).IsOk()) {
663     return node->GetEmptyQueueFrequency(start_step, end_step, result);
664   } else {
665     return {StatusCode::kMDUnexpectedError, "Cannot find appropriate tracing node"};
666   }
667 }
668 
GetEmptyQueueFrequencyByTime(uint64_t start_ts,uint64_t end_ts,float_t * result)669 Status ProfilingManager::GetEmptyQueueFrequencyByTime(uint64_t start_ts, uint64_t end_ts, float_t *result) {
670   int32_t start_step = 0, end_step = 0;
671   RETURN_IF_NOT_OK(TimeToStepInterval(start_ts, end_ts, &start_step, &end_step));
672   return GetEmptyQueueFrequencyByStep(start_step, end_step, result);
673 }
674 
GetConnectorCapacityByEpoch(int32_t epoch_num,std::vector<int32_t> * result)675 Status ProfilingManager::GetConnectorCapacityByEpoch(int32_t epoch_num, std::vector<int32_t> *result) {
676   uint32_t start_step = 0, end_step = 0;
677   RETURN_IF_NOT_OK(EpochToStepInterval(epoch_num, &start_step, &end_step));
678   return GetConnectorCapacityByStep(start_step, end_step, result);
679 }
680 
GetConnectorCapacityByStep(int32_t start_step,int32_t end_step,std::vector<int32_t> * result)681 Status ProfilingManager::GetConnectorCapacityByStep(int32_t start_step, int32_t end_step,
682                                                     std::vector<int32_t> *result) {
683   std::shared_ptr<Tracing> node;
684   if (GetTracingNode(kDeviceQueueTracingName, &node).IsOk() ||
685       GetTracingNode(kDatasetIteratorTracingName, &node).IsOk()) {
686     return node->GetConnectorCapacity(start_step, end_step, result);
687   } else {
688     return {StatusCode::kMDUnexpectedError, "Cannot find appropriate tracing node"};
689   }
690 }
691 
GetConnectorCapacityByTime(uint64_t start_ts,uint64_t end_ts,std::vector<int32_t> * result)692 Status ProfilingManager::GetConnectorCapacityByTime(uint64_t start_ts, uint64_t end_ts, std::vector<int32_t> *result) {
693   int32_t start_step = 0, end_step = 0;
694   RETURN_IF_NOT_OK(TimeToStepInterval(start_ts, end_ts, &start_step, &end_step));
695   return GetConnectorCapacityByStep(start_step, end_step, result);
696 }
697 
GetNumberOfProfiledSteps(int32_t * steps)698 Status ProfilingManager::GetNumberOfProfiledSteps(int32_t *steps) {
699   std::shared_ptr<Tracing> node;
700   if (GetTracingNode(kDeviceQueueTracingName, &node).IsOk() ||
701       GetTracingNode(kDatasetIteratorTracingName, &node).IsOk()) {
702     *steps = node->GetNumberSteps();
703     return Status::OK();
704   } else {
705     return {StatusCode::kMDUnexpectedError, "Cannot find appropriate tracing node"};
706   }
707 }
708 
RecordEndOfEpoch(uint32_t step_num)709 void ProfilingManager::RecordEndOfEpoch(uint32_t step_num) {
710   if (profiling_state_ != ProfilingState::kProfilingStateRunning) {
711     return;
712   }
713   MS_LOG(INFO) << "Recording end of epoch. step_num: " << step_num;
714   (void)epoch_end_ts_.emplace_back(ProfilingTime::GetCurMilliSecond());
715   (void)epoch_end_step_.emplace_back(step_num);
716 }
717 
Reset()718 Status ProfilingManager::Reset() {
719   for (const auto &node : tracing_nodes_) {
720     node.second->Clear();
721   }
722   for (const auto &node : sampling_nodes_) {
723     node.second->Clear();
724   }
725   epoch_end_ts_.clear();
726   epoch_end_step_.clear();
727   profiling_state_ = ProfilingState::kProfilingStateUnBegun;
728   autotuning_ = false;
729   profiling_ = false;
730   return Status::OK();
731 }
732 
Init(const bool for_autotune)733 Status ProfilingManager::Init(const bool for_autotune) {
734   // Reinitialization should only be done in case of UT with sequential pipelines and should not be used externally.
735   // Reinitialization with parallel data pipelines can have unexpected consequences.
736   CHECK_FAIL_RETURN_UNEXPECTED(!autotuning_, "Stop MD Autotune before initializing the MD Profiler.");
737   CHECK_FAIL_RETURN_UNEXPECTED(!profiling_, "Stop MD Profiler before initializing it.");
738   CHECK_FAIL_RETURN_UNEXPECTED(profiling_state_ != ProfilingState::kProfilingStateRunning,
739                                "Stop MD Profiler before reinitializing it.");
740   RETURN_IF_NOT_OK(Reset());
741   tracing_nodes_.clear();
742   sampling_nodes_.clear();
743   tree_ = nullptr;
744   CHECK_FAIL_RETURN_UNEXPECTED(profiling_state_ == ProfilingState::kProfilingStateUnBegun,
745                                "MD Profiler is in an unexpected state.");
746   if (for_autotune) {
747     autotuning_ = true;
748     MS_LOG(INFO) << "MD profiler is initialized successfully for autotuning.";
749   } else {
750     profiling_ = true;
751     MS_LOG(INFO) << "MD profiler is initialized successfully for profiling.";
752   }
753   return Status::OK();
754 }
755 
Start()756 Status ProfilingManager::Start() {
757   CHECK_FAIL_RETURN_UNEXPECTED(profiling_state_ != ProfilingState::kProfilingStateRunning,
758                                "MD ProfilingManager is already running.");
759   if (profiling_state_ == ProfilingState::kProfilingStateFinished) {
760     // This scenario (start, stop, and then start again) only happens in profiling, not autotune.
761     MS_LOG(INFO) << "MD ProfilingManager had already stopped. Resetting...";
762     RETURN_IF_NOT_OK(Reset());
763     for (const auto &node : sampling_nodes_) {
764       RETURN_IF_NOT_OK(node.second->Init());
765     }
766     for (const auto &node : tracing_nodes_) {
767       RETURN_IF_NOT_OK(node.second->Init());
768     }
769     profiling_ = true;
770     MS_LOG(INFO) << "MD profiler is reset successfully for profiling.";
771   }
772 
773   profiling_state_ = ProfilingState::kProfilingStateRunning;
774   for (const auto &node : tracing_nodes_) {
775     RETURN_IF_NOT_OK(node.second->Start());
776   }
777   for (const auto &node : sampling_nodes_) {
778     RETURN_IF_NOT_OK(node.second->Start());
779   }
780   MS_LOG(INFO) << "MD profiler is started.";
781   return Status::OK();
782 }
783 
Stop()784 Status ProfilingManager::Stop() {
785   CHECK_FAIL_RETURN_UNEXPECTED(profiling_state_ != ProfilingState::kProfilingStateUnBegun,
786                                "MD ProfilingManager has not started yet.");
787   // It's OK if we are in kProfilingStateFinished state. We allow user to call Stop twice.
788   if (profiling_state_ == ProfilingState::kProfilingStateFinished) {
789     MS_LOG(WARNING) << "MD ProfilingManager had already stopped.";
790     return Status::OK();
791   }
792 
793   for (const auto &node : tracing_nodes_) {
794     RETURN_IF_NOT_OK(node.second->Stop());
795   }
796   for (const auto &node : sampling_nodes_) {
797     RETURN_IF_NOT_OK(node.second->Stop());
798   }
799   profiling_state_ = ProfilingState::kProfilingStateFinished;
800   if (autotuning_) {
801     autotuning_ = false;
802     MS_LOG(INFO) << "MD Autotune is stopped.";
803   }
804   if (profiling_) {
805     profiling_ = false;
806     MS_LOG(INFO) << "MD Profiler is stopped.";
807   }
808   return Status::OK();
809 }
810 
Save(const std::string & profile_data_path)811 Status ProfilingManager::Save(const std::string &profile_data_path) {
812   // Validate input profile data path
813   CHECK_FAIL_RETURN_UNEXPECTED(!profile_data_path.empty(), "Invalid parameter, Profiling directory is not set.");
814   CHECK_FAIL_RETURN_UNEXPECTED(profile_data_path.size() < PATH_MAX, "Invalid file, Profiling directory is invalid.");
815 
816   //  profiling file: <profile_data_path>/filename_rank_id.suffix
817   char real_path[PATH_MAX] = {0};
818 #if defined(_WIN32) || defined(_WIN64)
819   if (_fullpath(real_path, common::SafeCStr(profile_data_path), PATH_MAX) == nullptr) {
820     RETURN_STATUS_UNEXPECTED("Profiling dir is invalid.");
821   }
822 #else
823   if (realpath(common::SafeCStr(profile_data_path), real_path) == nullptr) {
824     RETURN_STATUS_UNEXPECTED("Invalid file, can not get realpath of Profiling directory.");
825   }
826 #endif
827 
828   std::string rank_id = GetRankID();
829   // Output all profiling data upon request.
830   RETURN_IF_NOT_OK(SaveProfilingData(std::string(profile_data_path), rank_id));
831   RETURN_IF_NOT_OK(ChangeFileMode(std::string(profile_data_path), rank_id));
832   return Status::OK();
833 }
834 
GetProfilerTreeState(const ExecutionTree * tree) const835 ProfilingManager::ProfilingRegistrationState ProfilingManager::GetProfilerTreeState(const ExecutionTree *tree) const {
836   auto enabled = (profiling_ || autotuning_);
837   if (!enabled) {
838     return kNotEnabled;
839   }
840   if (tree_ == nullptr) {
841     return kEnabledTreeNotRegistered;
842   } else {
843     return tree_ == tree ? kEnabledTreeRegistered : kEnabledDifferentTreeRegistered;
844   }
845 }
846 
GetRankID() const847 std::string ProfilingManager::GetRankID() const {
848   std::string rank_id = common::GetEnv("RANK_ID");
849 #ifdef WITH_BACKEND
850   MS_EXCEPTION_IF_NULL(MsContext::GetInstance());
851   if (MsContext::GetInstance()->get_param<std::string>(MS_CTX_DEVICE_TARGET) == kGPUDevice) {
852     std::shared_ptr<ConfigManager> cfg = GlobalContext::config_manager();
853     int32_t rank_id_int = cfg->rank_id();
854     // If DEVICE_ID is not set, default value is 0
855     if (rank_id_int < 0) {
856       rank_id = common::GetEnv("DEVICE_ID");
857     } else {
858       rank_id = std::to_string(rank_id_int);
859     }
860   }
861 #endif
862   // If RANK_ID is not set, default value is 0
863   if (rank_id.empty()) {
864     rank_id = "0";
865   }
866   return rank_id;
867 }
868 
GetCurMilliSecond()869 uint64_t ProfilingTime::GetCurMilliSecond() {
870   // because cpplint does not allow using namespace
871   using std::chrono::duration_cast;
872   using std::chrono::milliseconds;
873   using std::chrono::steady_clock;
874   return static_cast<uint64_t>(duration_cast<milliseconds>(steady_clock::now().time_since_epoch()).count());
875 }
876 }  // namespace dataset
877 }  // namespace mindspore
878