1 /**
2 * Copyright 2020-2022 Huawei Technologies Co., Ltd
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 #include "minddata/dataset/engine/perf/profiling.h"
17
18 #include <sys/stat.h>
19
20 #include <algorithm>
21 #include <cstdlib>
22 #include <fstream>
23
24 #include "minddata/dataset/engine/execution_tree.h"
25 #include "minddata/dataset/engine/perf/connector_size.h"
26 #include "minddata/dataset/engine/perf/cpu_sampler.h"
27 #include "minddata/dataset/engine/perf/monitor.h"
28 #include "minddata/dataset/engine/tree_adapter.h"
29 #include "minddata/dataset/util/log_adapter.h"
30 #include "minddata/dataset/util/path.h"
31 #ifdef WITH_BACKEND
32 #include "utils/ms_context.h"
33 #endif
34 #include "utils/ms_utils.h"
35 #ifndef BUILD_LITE
36 #include "mindspore/core/utils/file_utils.h"
37 namespace platform = mindspore;
38 #else
39 #include "mindspore/lite/src/common/file_utils.h"
40 namespace platform = mindspore::lite;
41 #endif
42
43 namespace mindspore {
44 namespace dataset {
// Position of each record inside the group of four records that Tracing stores per step.
// Tracing::GetRecordEntryFieldValue reads records_[(step - 1) * 4 + offset]. Which record lands in
// which position is decided by the code that calls Tracing::Record (not visible in this file).
constexpr int32_t PUSH_TIME_OFFSET = 0;
constexpr int32_t BATCH_TIME_OFFSET = 1;
constexpr int32_t PIPELINE_TIME_OFFSET = 2;
// Connector-depth record: its "value" is the connector size, its "extra_info" the connector capacity.
constexpr int32_t CONNECTOR_DEPTH_OFFSET = 3;
49
Start()50 Status Profiling::Start() {
51 CHECK_FAIL_RETURN_UNEXPECTED(active_ == false, "Profiling node is already active.");
52 active_ = true;
53 return Status::OK();
54 }
55
Stop()56 Status Profiling::Stop() {
57 CHECK_FAIL_RETURN_UNEXPECTED(active_ == true, "Profiling node is already deactivated.");
58 active_ = false;
59 return Status::OK();
60 }
61
SaveToFile(const std::string & dir_path,const std::string & rank_id)62 Status Tracing::SaveToFile(const std::string &dir_path, const std::string &rank_id) {
63 if (value_.empty()) {
64 return Status::OK();
65 }
66
67 Path path = GetFileName(dir_path, rank_id);
68 // Remove the file if it exists (from prior profiling usage)
69 RETURN_IF_NOT_OK(path.Remove());
70 std::string file_path = path.ToString();
71
72 MS_LOG(INFO) << "Start to save profiling data for a tracing node.";
73 std::ofstream handle(file_path, std::ios::out | std::ios::trunc);
74 if (!handle.is_open()) {
75 RETURN_STATUS_UNEXPECTED("Profiling file can not be opened.");
76 }
77 for (const auto &value : value_) {
78 handle << value << "\n";
79 }
80 handle.close();
81
82 platform::ChangeFileMode(file_path, S_IRUSR | S_IWUSR);
83
84 return Status::OK();
85 }
86
ChangeFileMode(const std::string & dir_path,const std::string & rank_id)87 Status Tracing::ChangeFileMode(const std::string &dir_path, const std::string &rank_id) {
88 if (value_.empty()) {
89 return Status::OK();
90 }
91
92 Path path = GetFileName(dir_path, rank_id);
93 std::string file_path = path.ToString();
94 if (chmod(common::SafeCStr(file_path), S_IRUSR | S_IWUSR) == -1) {
95 std::string err_str = "Change file mode failed," + file_path;
96 return Status(StatusCode::kMDUnexpectedError, err_str);
97 }
98 return Status::OK();
99 }
100
Record(const int32_t type,const int32_t extra_info,const int32_t batch_num,const int32_t value,const uint64_t time_stamp)101 void Tracing::Record(const int32_t type, const int32_t extra_info, const int32_t batch_num, const int32_t value,
102 const uint64_t time_stamp) {
103 // Format: "type extra-info batch-num value"
104 // type: 0: time, 1: connector size
105 // extra-info: if type is 0 - 0: pipeline time, 1: push tdt time, 2: batch time
106 // if type is 1 - connector capacity
107 // batch-num: batch number
108 // value: if type is 0 - value is time(ms)
109 // if type is 1 - value is connector size
110 // time-stamp: time stamp
111 // Examples:
112 // 0 0 20 10 xxx- The 20th batch took 10ms to get data from pipeline.
113 // 1 64 20 5 xxx- Connector size is 5 when get the 20th batch.Connector capacity is 64.
114 if (!active_) {
115 return;
116 }
117 TracingRecord record = {type, extra_info, batch_num, value, time_stamp};
118 std::lock_guard<std::mutex> guard(lock_);
119 (void)records_.emplace_back(record);
120 (void)value_.emplace_back(record.ToString());
121 // save timestamp per batch
122 const constexpr int32_t RECORDS_PER_STEP = 4;
123 if (records_.size() % RECORDS_PER_STEP == 0) {
124 (void)ts_.emplace_back(time_stamp);
125 }
126 }
127
TimeIntervalForStepRange(int32_t start_step,int32_t end_step,uint64_t * start_ts,uint64_t * end_ts)128 Status Tracing::TimeIntervalForStepRange(int32_t start_step, int32_t end_step, uint64_t *start_ts, uint64_t *end_ts) {
129 RETURN_UNEXPECTED_IF_NULL(start_ts);
130 RETURN_UNEXPECTED_IF_NULL(end_ts);
131 std::lock_guard<std::mutex> guard(lock_);
132 MS_LOG(DEBUG) << "start_step: " << start_step << " end_step: " << end_step;
133 CHECK_FAIL_RETURN_UNEXPECTED(start_step > 0,
134 "Expected start_step > 0. Got start_step: " + std::to_string(start_step));
135 CHECK_FAIL_RETURN_UNEXPECTED(end_step >= start_step,
136 "Expected end_step >= start_step. Got start_step: " + std::to_string(start_step) +
137 " end_step: " + std::to_string(end_step));
138 CHECK_FAIL_RETURN_UNEXPECTED(end_step < static_cast<int32_t>(ts_.size()),
139 "Expected end_step < ts_.size(). Got end_step: " + std::to_string(end_step) +
140 " ts_.size: " + std::to_string(ts_.size()));
141 // end timestamp of (start_step - 1) step
142 *start_ts = ts_[start_step - 1];
143 *end_ts = ts_[end_step];
144 return Status::OK();
145 }
146
StepIntervalForTimeRange(uint64_t start_ts,uint64_t end_ts,int32_t * start_step,int32_t * end_step)147 Status Tracing::StepIntervalForTimeRange(uint64_t start_ts, uint64_t end_ts, int32_t *start_step, int32_t *end_step) {
148 RETURN_UNEXPECTED_IF_NULL(start_step);
149 RETURN_UNEXPECTED_IF_NULL(end_step);
150 CHECK_FAIL_RETURN_UNEXPECTED(start_ts < end_ts, "Expected start_ts < end_ts. Got start_ts: " +
151 std::to_string(start_ts) + " end_ts: " + std::to_string(end_ts));
152 std::lock_guard<std::mutex> guard(lock_);
153 CHECK_FAIL_RETURN_UNEXPECTED(ts_.size() > 1, "No tracing data available yet.");
154 // find first ts that is not less than start_ts
155 auto lower = std::lower_bound(ts_.begin(), ts_.end(), start_ts);
156 CHECK_FAIL_RETURN_UNEXPECTED(lower != ts_.end(),
157 "No data available for time >= start_ts. start_ts: " + std::to_string(start_ts));
158 // there is no 0th step. If start_ts == 0, then lower == ts_.begin()
159 *start_step = std::max(1, static_cast<int32_t>(std::distance(ts_.begin(), lower)));
160 // find first ts that is greater than end_ts
161 auto upper = std::upper_bound(ts_.begin(), ts_.end(), end_ts);
162 if (upper == ts_.end()) {
163 *end_step = std::max(1, static_cast<int32_t>(std::distance(ts_.begin(), upper) - 1));
164 } else {
165 *end_step = std::max(1, static_cast<int32_t>(std::distance(ts_.begin(), upper)));
166 }
167 return Status::OK();
168 }
169
GetRecordEntryFieldValue(int32_t start_step,int32_t end_step,int32_t record_offset,const std::string & field,std::vector<int32_t> * result)170 Status Tracing::GetRecordEntryFieldValue(int32_t start_step, int32_t end_step, int32_t record_offset,
171 const std::string &field, std::vector<int32_t> *result) {
172 RETURN_UNEXPECTED_IF_NULL(result);
173 std::lock_guard<std::mutex> guard(lock_);
174 const constexpr int32_t RECORDS_PER_STEP = 4;
175 auto total_steps = records_.size() / RECORDS_PER_STEP;
176 MS_LOG(DEBUG) << "start_step: " << start_step << " end_step: " << end_step;
177 CHECK_FAIL_RETURN_UNEXPECTED(start_step <= static_cast<int32_t>(total_steps),
178 "Expected start_step <= total_steps. Got start_step: " + std::to_string(start_step) +
179 " total_steps: " + std::to_string(total_steps));
180 CHECK_FAIL_RETURN_UNEXPECTED(end_step <= static_cast<int32_t>(total_steps),
181 "Expected end_step <= total_steps. Got end_step: " + std::to_string(end_step) +
182 " total_steps: " + std::to_string(total_steps));
183 CHECK_FAIL_RETURN_UNEXPECTED(start_step <= end_step,
184 "Expected start_step <= end_step. Got start_step: " + std::to_string(start_step) +
185 " end_step: " + std::to_string(end_step));
186
187 for (auto step_num = start_step; step_num <= end_step; step_num++) {
188 auto idx = (step_num - 1) * RECORDS_PER_STEP + record_offset;
189 CHECK_FAIL_RETURN_UNEXPECTED(idx >= 0, "Expected idx >= 0. Got idx: " + std::to_string(idx));
190 if (field == "value") {
191 (void)result->emplace_back(records_[static_cast<size_t>(idx)].value);
192 } else if (field == "extra_info") {
193 (void)result->emplace_back(records_[static_cast<size_t>(idx)].extra_info);
194 } else {
195 return {StatusCode::kMDUnexpectedError,
196 "Received unexpected field: " + field + R"(. Expected: ["value", "extra_info"].)"};
197 }
198 }
199 return Status::OK();
200 }
201
GetPipelineTime(int32_t start_step,int32_t end_step,std::vector<int32_t> * result)202 Status Tracing::GetPipelineTime(int32_t start_step, int32_t end_step, std::vector<int32_t> *result) {
203 return GetRecordEntryFieldValue(start_step, end_step, PIPELINE_TIME_OFFSET, "value", result);
204 }
205
GetPushTime(int32_t start_step,int32_t end_step,std::vector<int32_t> * result)206 Status Tracing::GetPushTime(int32_t start_step, int32_t end_step, std::vector<int32_t> *result) {
207 return GetRecordEntryFieldValue(start_step, end_step, PUSH_TIME_OFFSET, "value", result);
208 }
209
GetBatchTime(int32_t start_step,int32_t end_step,std::vector<int32_t> * result)210 Status Tracing::GetBatchTime(int32_t start_step, int32_t end_step, std::vector<int32_t> *result) {
211 return GetRecordEntryFieldValue(start_step, end_step, BATCH_TIME_OFFSET, "value", result);
212 }
213
GetConnectorSize(int32_t start_step,int32_t end_step,std::vector<int32_t> * result)214 Status Tracing::GetConnectorSize(int32_t start_step, int32_t end_step, std::vector<int32_t> *result) {
215 return GetRecordEntryFieldValue(start_step, end_step, CONNECTOR_DEPTH_OFFSET, "value", result);
216 }
217
GetConnectorCapacity(int32_t start_step,int32_t end_step,std::vector<int32_t> * result)218 Status Tracing::GetConnectorCapacity(int32_t start_step, int32_t end_step, std::vector<int32_t> *result) {
219 return GetRecordEntryFieldValue(start_step, end_step, CONNECTOR_DEPTH_OFFSET, "extra_info", result);
220 }
221
GetEmptyQueueFrequency(int32_t start_step,int32_t end_step,float_t * empty_queue_freq)222 Status Tracing::GetEmptyQueueFrequency(int32_t start_step, int32_t end_step, float_t *empty_queue_freq) {
223 RETURN_UNEXPECTED_IF_NULL(empty_queue_freq);
224 std::vector<int32_t> sizes;
225 RETURN_IF_NOT_OK(GetConnectorSize(start_step, end_step, &sizes));
226 int32_t total = end_step - start_step + 1;
227 CHECK_FAIL_RETURN_UNEXPECTED(total > 0, "Start step is greater than end step.");
228 uint32_t count = static_cast<uint32_t>(std::count(sizes.begin(), sizes.end(), 0));
229 *empty_queue_freq = static_cast<float_t>(count) / static_cast<float_t>(total);
230 return Status::OK();
231 }
232
Init()233 Status Tracing::Init() {
234 (void)ts_.emplace_back(0);
235 return Status::OK();
236 }
237
GetNumberSteps()238 size_t Tracing::GetNumberSteps() { return ts_.size(); }
239
Clear()240 void Tracing::Clear() {
241 value_.clear();
242 records_.clear();
243 ts_.clear();
244 }
245
// Constructor: profiling starts in the "un-begun" state with no registered tree, and with both
// autotuning and profiling disabled (RegisterTree refuses to run until one of them is enabled).
ProfilingManager::ProfilingManager()
    : profiling_state_(ProfilingState::kProfilingStateUnBegun), tree_(nullptr), autotuning_(false), profiling_(false) {}
249
IsProfilingEnable(const ExecutionTree * tree) const250 bool ProfilingManager::IsProfilingEnable(const ExecutionTree *tree) const {
251 auto external_state = GetProfilerTreeState(tree);
252 return (external_state == kEnabledTreeNotRegistered || external_state == kEnabledTreeRegistered);
253 }
254
RegisterTree(const TreeAdapter * tree_adapter)255 Status ProfilingManager::RegisterTree(const TreeAdapter *tree_adapter) {
256 RETURN_UNEXPECTED_IF_NULL(tree_adapter);
257 CHECK_FAIL_RETURN_UNEXPECTED(tree_ == nullptr, "Another tree is already registered.");
258 CHECK_FAIL_RETURN_UNEXPECTED((autotuning_ || profiling_) == true,
259 "MD Profiler is disabled. Cannot register the tree.");
260 tree_ = tree_adapter->tree_.get();
261 MS_LOG(INFO) << "Registering tree: " + tree_->GetUniqueId();
262 perf_monitor_ = std::make_unique<Monitor>(this);
263 // Register all sampling nodes here.
264 // Tracing node registration is the responsibility of the Consumer
265 std::shared_ptr<Sampling> connector_size_sampling = std::make_shared<ConnectorSize>(tree_);
266 RETURN_IF_NOT_OK(RegisterSamplingNode(connector_size_sampling));
267
268 #ifndef ENABLE_ANDROID
269 std::shared_ptr<Sampling> cpu_sampler = std::make_shared<CpuSampler>(tree_);
270 RETURN_IF_NOT_OK(RegisterSamplingNode(cpu_sampler));
271 #endif
272 // can insert a correct timestamp so that we can ignore the samples that were taken
273 // during start up of the pipeline.
274 (void)epoch_end_ts_.emplace_back(0);
275 (void)epoch_end_step_.emplace_back(0);
276 return Status::OK();
277 }
278
279 // Launch monitoring thread.
LaunchMonitor()280 Status ProfilingManager::LaunchMonitor() {
281 RETURN_IF_NOT_OK(tree_->AllTasks()->CreateAsyncTask("Monitor Thread launched", std::ref(*perf_monitor_)));
282 return Status::OK();
283 }
284
285 // Profiling node registration
RegisterTracingNode(const std::shared_ptr<Tracing> & node)286 Status ProfilingManager::RegisterTracingNode(const std::shared_ptr<Tracing> &node) {
287 // Check if node with the same name has already been registered.
288 auto exist = tracing_nodes_.find(node->Name());
289 if (exist != tracing_nodes_.end()) {
290 return Status(StatusCode::kMDProfilingError, "Profiling node already exist: " + node->Name());
291 }
292 // Register the node with its name as key.
293 RETURN_IF_NOT_OK(node->Init());
294 tracing_nodes_[node->Name()] = node;
295
296 // the user may have already started profiling.
297 if (profiling_state_ == ProfilingState::kProfilingStateRunning) {
298 RETURN_IF_NOT_OK(node->Start());
299 }
300 return Status::OK();
301 }
302
303 // Profiling node getter
GetTracingNode(const std::string & name,std::shared_ptr<Tracing> * node)304 Status ProfilingManager::GetTracingNode(const std::string &name, std::shared_ptr<Tracing> *node) {
305 // Check if node with the same name has already been registered.
306 auto exist = tracing_nodes_.find(name);
307 if (exist == tracing_nodes_.end()) {
308 return Status(StatusCode::kMDProfilingError, "Profiling node does not exist: " + name);
309 }
310 // Fetch node.
311 *node = tracing_nodes_[name];
312 return Status::OK();
313 }
314
315 // Profiling node registration
RegisterSamplingNode(const std::shared_ptr<Sampling> & node)316 Status ProfilingManager::RegisterSamplingNode(const std::shared_ptr<Sampling> &node) {
317 // Check if node with the same name has already been registered.
318 auto exist = sampling_nodes_.find(node->Name());
319 if (exist != sampling_nodes_.end()) {
320 return Status(StatusCode::kMDProfilingError, "Profiling node already exist: " + node->Name());
321 }
322 // Register the node with its name as key.
323 RETURN_IF_NOT_OK(node->Init());
324 sampling_nodes_[node->Name()] = node;
325
326 // the user may have already started profiling.
327 if (profiling_state_ == ProfilingState::kProfilingStateRunning) {
328 RETURN_IF_NOT_OK(node->Start());
329 }
330 return Status::OK();
331 }
332
333 // Profiling node getter
GetSamplingNode(const std::string & name,std::shared_ptr<Sampling> * node)334 Status ProfilingManager::GetSamplingNode(const std::string &name, std::shared_ptr<Sampling> *node) {
335 // Check if node with the same name has already been registered.
336 auto exist = sampling_nodes_.find(name);
337 if (exist == sampling_nodes_.end()) {
338 return Status(StatusCode::kMDProfilingError, "Profiling node does not exist: " + name);
339 }
340 // Fetch node.
341 *node = sampling_nodes_[name];
342 return Status::OK();
343 }
344
SaveProfilingData(const std::string & dir_path,const std::string & rank_id)345 Status ProfilingManager::SaveProfilingData(const std::string &dir_path, const std::string &rank_id) {
346 MS_LOG(INFO) << "Start to save profiling data.";
347 for (const auto &node : tracing_nodes_) {
348 RETURN_IF_NOT_OK(node.second->SaveToFile(dir_path, rank_id));
349 }
350 for (const auto &node : sampling_nodes_) {
351 RETURN_IF_NOT_OK(node.second->SaveToFile(dir_path, rank_id));
352 }
353 MS_LOG(INFO) << "Save profiling data end.";
354 return Status::OK();
355 }
356
ChangeFileMode(const std::string & dir_path,const std::string & rank_id)357 Status ProfilingManager::ChangeFileMode(const std::string &dir_path, const std::string &rank_id) {
358 MS_LOG(INFO) << "Start to change file mode.";
359 for (const auto &node : tracing_nodes_) {
360 RETURN_IF_NOT_OK(node.second->ChangeFileMode(dir_path, rank_id));
361 }
362 for (const auto &node : sampling_nodes_) {
363 RETURN_IF_NOT_OK(node.second->ChangeFileMode(dir_path, rank_id));
364 }
365 MS_LOG(INFO) << "Change file mode end.";
366 return Status::OK();
367 }
368
369 #ifndef ENABLE_ANDROID
GetUserCpuUtilByEpoch(int32_t epoch_num,std::vector<uint8_t> * result)370 Status ProfilingManager::GetUserCpuUtilByEpoch(int32_t epoch_num, std::vector<uint8_t> *result) {
371 uint64_t start_ts = 0, end_ts = 0;
372 RETURN_IF_NOT_OK(EpochToTimeInterval(epoch_num, &start_ts, &end_ts));
373 return GetUserCpuUtilByTime(start_ts, end_ts, result);
374 }
375
GetUserCpuUtilByStep(int32_t start_step,int32_t end_step,std::vector<uint8_t> * result)376 Status ProfilingManager::GetUserCpuUtilByStep(int32_t start_step, int32_t end_step, std::vector<uint8_t> *result) {
377 uint64_t start_ts = 0, end_ts = 0;
378 RETURN_IF_NOT_OK(StepToTimeInterval(start_step, end_step, &start_ts, &end_ts));
379 return GetUserCpuUtilByTime(start_ts, end_ts, result);
380 }
381
GetUserCpuUtilByTime(uint64_t start_ts,uint64_t end_ts,std::vector<uint8_t> * result)382 Status ProfilingManager::GetUserCpuUtilByTime(uint64_t start_ts, uint64_t end_ts, std::vector<uint8_t> *result) {
383 std::shared_ptr<Sampling> sampling_node;
384 RETURN_IF_NOT_OK(GetSamplingNode(kCpuSamplerName, &sampling_node));
385 auto node = std::dynamic_pointer_cast<CpuSampler>(sampling_node);
386 return node->GetSystemUserCpuUtil(start_ts, end_ts, result);
387 }
388
GetSysCpuUtilByEpoch(int32_t epoch_num,std::vector<uint8_t> * result)389 Status ProfilingManager::GetSysCpuUtilByEpoch(int32_t epoch_num, std::vector<uint8_t> *result) {
390 uint64_t start_ts = 0, end_ts = 0;
391 RETURN_IF_NOT_OK(EpochToTimeInterval(epoch_num, &start_ts, &end_ts));
392 return GetSysCpuUtilByTime(start_ts, end_ts, result);
393 }
394
GetSysCpuUtilByStep(int32_t start_step,int32_t end_step,std::vector<uint8_t> * result)395 Status ProfilingManager::GetSysCpuUtilByStep(int32_t start_step, int32_t end_step, std::vector<uint8_t> *result) {
396 uint64_t start_ts = 0, end_ts = 0;
397 RETURN_IF_NOT_OK(StepToTimeInterval(start_step, end_step, &start_ts, &end_ts));
398 return GetSysCpuUtilByTime(start_ts, end_ts, result);
399 }
400
GetSysCpuUtilByTime(uint64_t start_ts,uint64_t end_ts,std::vector<uint8_t> * result)401 Status ProfilingManager::GetSysCpuUtilByTime(uint64_t start_ts, uint64_t end_ts, std::vector<uint8_t> *result) {
402 std::shared_ptr<Sampling> sampling_node;
403 RETURN_IF_NOT_OK(GetSamplingNode(kCpuSamplerName, &sampling_node));
404 auto node = std::dynamic_pointer_cast<CpuSampler>(sampling_node);
405 return node->GetSystemSysCpuUtil(start_ts, end_ts, result);
406 }
407
GetUserCpuUtilByEpoch(int32_t op_id,int32_t epoch_num,std::vector<uint16_t> * result)408 Status ProfilingManager::GetUserCpuUtilByEpoch(int32_t op_id, int32_t epoch_num, std::vector<uint16_t> *result) {
409 uint64_t start_ts = 0, end_ts = 0;
410 RETURN_IF_NOT_OK(EpochToTimeInterval(epoch_num, &start_ts, &end_ts));
411 return GetUserCpuUtilByTime(op_id, start_ts, end_ts, result);
412 }
413
GetUserCpuUtilByStep(int32_t op_id,int32_t start_step,int32_t end_step,std::vector<uint16_t> * result)414 Status ProfilingManager::GetUserCpuUtilByStep(int32_t op_id, int32_t start_step, int32_t end_step,
415 std::vector<uint16_t> *result) {
416 uint64_t start_ts = 0, end_ts = 0;
417 RETURN_IF_NOT_OK(StepToTimeInterval(start_step, end_step, &start_ts, &end_ts));
418 return GetUserCpuUtilByTime(op_id, start_ts, end_ts, result);
419 }
420
GetUserCpuUtilByTime(int32_t op_id,uint64_t start_ts,uint64_t end_ts,std::vector<uint16_t> * result)421 Status ProfilingManager::GetUserCpuUtilByTime(int32_t op_id, uint64_t start_ts, uint64_t end_ts,
422 std::vector<uint16_t> *result) {
423 std::shared_ptr<Sampling> sampling_node;
424 RETURN_IF_NOT_OK(GetSamplingNode(kCpuSamplerName, &sampling_node));
425 auto node = std::dynamic_pointer_cast<CpuSampler>(sampling_node);
426 return node->GetOpUserCpuUtil(op_id, start_ts, end_ts, result);
427 }
428
GetSysCpuUtilByEpoch(int32_t op_id,int32_t epoch_num,std::vector<uint16_t> * result)429 Status ProfilingManager::GetSysCpuUtilByEpoch(int32_t op_id, int32_t epoch_num, std::vector<uint16_t> *result) {
430 uint64_t start_ts = 0, end_ts = 0;
431 RETURN_IF_NOT_OK(EpochToTimeInterval(epoch_num, &start_ts, &end_ts));
432 return GetSysCpuUtilByTime(op_id, start_ts, end_ts, result);
433 }
434
GetSysCpuUtilByStep(int32_t op_id,int32_t start_step,int32_t end_step,std::vector<uint16_t> * result)435 Status ProfilingManager::GetSysCpuUtilByStep(int32_t op_id, int32_t start_step, int32_t end_step,
436 std::vector<uint16_t> *result) {
437 uint64_t start_ts = 0, end_ts = 0;
438 RETURN_IF_NOT_OK(StepToTimeInterval(start_step, end_step, &start_ts, &end_ts));
439 return GetSysCpuUtilByTime(op_id, start_ts, end_ts, result);
440 }
441
GetSysCpuUtilByTime(int32_t op_id,uint64_t start_ts,uint64_t end_ts,std::vector<uint16_t> * result)442 Status ProfilingManager::GetSysCpuUtilByTime(int32_t op_id, uint64_t start_ts, uint64_t end_ts,
443 std::vector<uint16_t> *result) {
444 std::shared_ptr<Sampling> sampling_node;
445 RETURN_IF_NOT_OK(GetSamplingNode(kCpuSamplerName, &sampling_node));
446 auto node = std::dynamic_pointer_cast<CpuSampler>(sampling_node);
447 return node->GetOpSysCpuUtil(op_id, start_ts, end_ts, result);
448 }
449
GetMainProcessMemoryInfoByEpoch(ProcessMemoryMetric metric,int32_t epoch_num,std::vector<float> * result)450 Status ProfilingManager::GetMainProcessMemoryInfoByEpoch(ProcessMemoryMetric metric, int32_t epoch_num,
451 std::vector<float> *result) {
452 uint64_t start_ts = 0, end_ts = 0;
453 RETURN_IF_NOT_OK(EpochToTimeInterval(epoch_num, &start_ts, &end_ts));
454 return GetMainProcessMemoryInfoByTime(metric, start_ts, end_ts, result);
455 }
456
GetMainProcessMemoryInfoByStep(ProcessMemoryMetric metric,int32_t start_step,int32_t end_step,std::vector<float> * result)457 Status ProfilingManager::GetMainProcessMemoryInfoByStep(ProcessMemoryMetric metric, int32_t start_step,
458 int32_t end_step, std::vector<float> *result) {
459 uint64_t start_ts = 0, end_ts = 0;
460 RETURN_IF_NOT_OK(StepToTimeInterval(start_step, end_step, &start_ts, &end_ts));
461 return GetMainProcessMemoryInfoByTime(metric, start_ts, end_ts, result);
462 }
463
GetMainProcessMemoryInfoByTime(ProcessMemoryMetric metric,uint64_t start_ts,uint64_t end_ts,std::vector<float> * result)464 Status ProfilingManager::GetMainProcessMemoryInfoByTime(ProcessMemoryMetric metric, uint64_t start_ts, uint64_t end_ts,
465 std::vector<float> *result) {
466 std::shared_ptr<Sampling> sampling_node;
467 RETURN_IF_NOT_OK(GetSamplingNode(kCpuSamplerName, &sampling_node));
468 auto node = std::dynamic_pointer_cast<CpuSampler>(sampling_node);
469 return node->GetProcessMemoryInfo(metric, start_ts, end_ts, result);
470 }
471
GetSystemMemoryInfoByEpoch(SystemMemoryMetric metric,int32_t epoch_num,std::vector<float> * result)472 Status ProfilingManager::GetSystemMemoryInfoByEpoch(SystemMemoryMetric metric, int32_t epoch_num,
473 std::vector<float> *result) {
474 uint64_t start_ts = 0, end_ts = 0;
475 RETURN_IF_NOT_OK(EpochToTimeInterval(epoch_num, &start_ts, &end_ts));
476 return GetSystemMemoryInfoByTime(metric, start_ts, end_ts, result);
477 }
478
GetSystemMemoryInfoByStep(SystemMemoryMetric metric,int32_t start_step,int32_t end_step,std::vector<float> * result)479 Status ProfilingManager::GetSystemMemoryInfoByStep(SystemMemoryMetric metric, int32_t start_step, int32_t end_step,
480 std::vector<float> *result) {
481 uint64_t start_ts = 0, end_ts = 0;
482 RETURN_IF_NOT_OK(StepToTimeInterval(start_step, end_step, &start_ts, &end_ts));
483 return GetSystemMemoryInfoByTime(metric, start_ts, end_ts, result);
484 }
485
GetSystemMemoryInfoByTime(SystemMemoryMetric metric,uint64_t start_ts,uint64_t end_ts,std::vector<float> * result)486 Status ProfilingManager::GetSystemMemoryInfoByTime(SystemMemoryMetric metric, uint64_t start_ts, uint64_t end_ts,
487 std::vector<float> *result) {
488 std::shared_ptr<Sampling> sampling_node;
489 RETURN_IF_NOT_OK(GetSamplingNode(kCpuSamplerName, &sampling_node));
490 auto node = std::dynamic_pointer_cast<CpuSampler>(sampling_node);
491 return node->GetSystemMemoryInfo(metric, start_ts, end_ts, result);
492 }
493 #endif
494
EpochToTimeInterval(int32_t epoch_num,uint64_t * start_ts,uint64_t * end_ts)495 Status ProfilingManager::EpochToTimeInterval(int32_t epoch_num, uint64_t *start_ts, uint64_t *end_ts) {
496 RETURN_UNEXPECTED_IF_NULL(start_ts);
497 RETURN_UNEXPECTED_IF_NULL(end_ts);
498 if (epoch_num <= 0 || epoch_num >= static_cast<int32_t>(epoch_end_ts_.size())) {
499 std::string err = "Epoch: " + std::to_string(epoch_num) + " is invalid.";
500 MS_LOG(INFO) << err;
501 return {StatusCode::kMDUnexpectedError, err};
502 }
503 *start_ts = epoch_end_ts_[epoch_num - 1];
504 *end_ts = epoch_end_ts_[epoch_num];
505 return Status::OK();
506 }
507
EpochToStepInterval(int32_t epoch_num,uint32_t * start_step,uint32_t * end_step)508 Status ProfilingManager::EpochToStepInterval(int32_t epoch_num, uint32_t *start_step, uint32_t *end_step) {
509 RETURN_UNEXPECTED_IF_NULL(start_step);
510 RETURN_UNEXPECTED_IF_NULL(end_step);
511 if (epoch_num <= 0 || epoch_num >= static_cast<int32_t>(epoch_end_step_.size())) {
512 std::string err = "Epoch: " + std::to_string(epoch_num) + " is invalid.";
513 return {StatusCode::kMDUnexpectedError, err};
514 }
515 *start_step = epoch_end_step_[epoch_num - 1] + 1;
516 *end_step = epoch_end_step_[epoch_num];
517 return Status::OK();
518 }
519
StepToTimeInterval(int32_t start_step,int32_t end_step,uint64_t * start_ts,uint64_t * end_ts)520 Status ProfilingManager::StepToTimeInterval(int32_t start_step, int32_t end_step, uint64_t *start_ts,
521 uint64_t *end_ts) {
522 std::shared_ptr<Tracing> node;
523 if (GetTracingNode(kDeviceQueueTracingName, &node).IsOk() ||
524 GetTracingNode(kDatasetIteratorTracingName, &node).IsOk()) {
525 return node->TimeIntervalForStepRange(start_step, end_step, start_ts, end_ts);
526 } else {
527 return {StatusCode::kMDUnexpectedError,
528 "Cannot find appropriate tracing node to convert step range to time interval."};
529 }
530 }
531
TimeToStepInterval(uint64_t start_ts,uint64_t end_ts,int32_t * start_step,int32_t * end_step)532 Status ProfilingManager::TimeToStepInterval(uint64_t start_ts, uint64_t end_ts, int32_t *start_step,
533 int32_t *end_step) {
534 std::shared_ptr<Tracing> node;
535 if (GetTracingNode(kDeviceQueueTracingName, &node).IsOk() ||
536 GetTracingNode(kDatasetIteratorTracingName, &node).IsOk()) {
537 return node->StepIntervalForTimeRange(start_ts, end_ts, start_step, end_step);
538 } else {
539 return {StatusCode::kMDUnexpectedError,
540 "Cannot find appropriate tracing node to convert time interval to step range."};
541 }
542 }
543
GetConnectorSizeByEpoch(int32_t op_id,int32_t epoch_num,std::vector<int32_t> * result)544 Status ProfilingManager::GetConnectorSizeByEpoch(int32_t op_id, int32_t epoch_num, std::vector<int32_t> *result) {
545 uint64_t start_ts = 0, end_ts = 0;
546 RETURN_IF_NOT_OK(EpochToTimeInterval(epoch_num, &start_ts, &end_ts));
547 return GetConnectorSizeByTime(op_id, start_ts, end_ts, result);
548 }
549
GetConnectorSizeByStep(int32_t op_id,int32_t start_step,int32_t end_step,std::vector<int32_t> * result)550 Status ProfilingManager::GetConnectorSizeByStep(int32_t op_id, int32_t start_step, int32_t end_step,
551 std::vector<int32_t> *result) {
552 uint64_t start_ts = 0, end_ts = 0;
553 RETURN_IF_NOT_OK(StepToTimeInterval(start_step, end_step, &start_ts, &end_ts));
554 return GetConnectorSizeByTime(op_id, start_ts, end_ts, result);
555 }
556
GetConnectorSizeByTime(int32_t op_id,uint64_t start_ts,uint64_t end_ts,std::vector<int32_t> * result)557 Status ProfilingManager::GetConnectorSizeByTime(int32_t op_id, uint64_t start_ts, uint64_t end_ts,
558 std::vector<int32_t> *result) {
559 std::shared_ptr<Sampling> node;
560 RETURN_IF_NOT_OK(GetSamplingNode(kConnectorSizeSamplingName, &node));
561 auto connector_node = std::dynamic_pointer_cast<ConnectorSize>(node);
562 return connector_node->GetOpConnectorSize(op_id, start_ts, end_ts, result);
563 }
564
GetPipelineTimeByEpoch(int32_t epoch_num,std::vector<int32_t> * result)565 Status ProfilingManager::GetPipelineTimeByEpoch(int32_t epoch_num, std::vector<int32_t> *result) {
566 uint32_t start_step = 0, end_step = 0;
567 RETURN_IF_NOT_OK(EpochToStepInterval(epoch_num, &start_step, &end_step));
568 return GetPipelineTimeByStep(start_step, end_step, result);
569 }
570
GetPipelineTimeByStep(int32_t start_step,int32_t end_step,std::vector<int32_t> * result)571 Status ProfilingManager::GetPipelineTimeByStep(int32_t start_step, int32_t end_step, std::vector<int32_t> *result) {
572 std::shared_ptr<Tracing> node;
573 if (GetTracingNode(kDeviceQueueTracingName, &node).IsOk() ||
574 GetTracingNode(kDatasetIteratorTracingName, &node).IsOk()) {
575 return node->GetPipelineTime(start_step, end_step, result);
576 } else {
577 return {StatusCode::kMDUnexpectedError, "Cannot find appropriate tracing node"};
578 }
579 }
580
GetPipelineTimeByTime(uint64_t start_ts,uint64_t end_ts,std::vector<int32_t> * result)581 Status ProfilingManager::GetPipelineTimeByTime(uint64_t start_ts, uint64_t end_ts, std::vector<int32_t> *result) {
582 int32_t start_step = 0, end_step = 0;
583 RETURN_IF_NOT_OK(TimeToStepInterval(start_ts, end_ts, &start_step, &end_step));
584 return GetPipelineTimeByStep(start_step, end_step, result);
585 }
586
GetPushTimeByEpoch(int32_t epoch_num,std::vector<int32_t> * result)587 Status ProfilingManager::GetPushTimeByEpoch(int32_t epoch_num, std::vector<int32_t> *result) {
588 uint32_t start_step = 0, end_step = 0;
589 RETURN_IF_NOT_OK(EpochToStepInterval(epoch_num, &start_step, &end_step));
590 return GetPushTimeByStep(start_step, end_step, result);
591 }
592
GetPushTimeByStep(int32_t start_step,int32_t end_step,std::vector<int32_t> * result)593 Status ProfilingManager::GetPushTimeByStep(int32_t start_step, int32_t end_step, std::vector<int32_t> *result) {
594 std::shared_ptr<Tracing> node;
595 if (GetTracingNode(kDeviceQueueTracingName, &node).IsOk() ||
596 GetTracingNode(kDatasetIteratorTracingName, &node).IsOk()) {
597 return node->GetPushTime(start_step, end_step, result);
598 } else {
599 return {StatusCode::kMDUnexpectedError, "Cannot find appropriate tracing node"};
600 }
601 }
602
GetPushTimeByTime(uint64_t start_ts,uint64_t end_ts,std::vector<int32_t> * result)603 Status ProfilingManager::GetPushTimeByTime(uint64_t start_ts, uint64_t end_ts, std::vector<int32_t> *result) {
604 int32_t start_step = 0, end_step = 0;
605 RETURN_IF_NOT_OK(TimeToStepInterval(start_ts, end_ts, &start_step, &end_step));
606 return GetPushTimeByStep(start_step, end_step, result);
607 }
608
GetBatchTimeByEpoch(int32_t epoch_num,std::vector<int32_t> * result)609 Status ProfilingManager::GetBatchTimeByEpoch(int32_t epoch_num, std::vector<int32_t> *result) {
610 uint32_t start_step = 0, end_step = 0;
611 RETURN_IF_NOT_OK(EpochToStepInterval(epoch_num, &start_step, &end_step));
612 return GetBatchTimeByStep(start_step, end_step, result);
613 }
614
GetBatchTimeByStep(int32_t start_step,int32_t end_step,std::vector<int32_t> * result)615 Status ProfilingManager::GetBatchTimeByStep(int32_t start_step, int32_t end_step, std::vector<int32_t> *result) {
616 std::shared_ptr<Tracing> node;
617 if (GetTracingNode(kDeviceQueueTracingName, &node).IsOk() ||
618 GetTracingNode(kDatasetIteratorTracingName, &node).IsOk()) {
619 return node->GetBatchTime(start_step, end_step, result);
620 } else {
621 return {StatusCode::kMDUnexpectedError, "Cannot find appropriate tracing node"};
622 }
623 }
624
GetBatchTimeByTime(uint64_t start_ts,uint64_t end_ts,std::vector<int32_t> * result)625 Status ProfilingManager::GetBatchTimeByTime(uint64_t start_ts, uint64_t end_ts, std::vector<int32_t> *result) {
626 int32_t start_step = 0, end_step = 0;
627 RETURN_IF_NOT_OK(TimeToStepInterval(start_ts, end_ts, &start_step, &end_step));
628 return GetBatchTimeByStep(start_step, end_step, result);
629 }
630
GetConnectorSizeByEpoch(int32_t epoch_num,std::vector<int32_t> * result)631 Status ProfilingManager::GetConnectorSizeByEpoch(int32_t epoch_num, std::vector<int32_t> *result) {
632 uint32_t start_step = 0, end_step = 0;
633 RETURN_IF_NOT_OK(EpochToStepInterval(epoch_num, &start_step, &end_step));
634 return GetConnectorSizeByStep(start_step, end_step, result);
635 }
636
GetConnectorSizeByStep(int32_t start_step,int32_t end_step,std::vector<int32_t> * result)637 Status ProfilingManager::GetConnectorSizeByStep(int32_t start_step, int32_t end_step, std::vector<int32_t> *result) {
638 std::shared_ptr<Tracing> node;
639 if (GetTracingNode(kDeviceQueueTracingName, &node).IsOk() ||
640 GetTracingNode(kDatasetIteratorTracingName, &node).IsOk()) {
641 return node->GetConnectorSize(start_step, end_step, result);
642 } else {
643 return {StatusCode::kMDUnexpectedError, "Cannot find appropriate tracing node"};
644 }
645 }
646
GetConnectorSizeByTime(uint64_t start_ts,uint64_t end_ts,std::vector<int32_t> * result)647 Status ProfilingManager::GetConnectorSizeByTime(uint64_t start_ts, uint64_t end_ts, std::vector<int32_t> *result) {
648 int32_t start_step = 0, end_step = 0;
649 RETURN_IF_NOT_OK(TimeToStepInterval(start_ts, end_ts, &start_step, &end_step));
650 return GetConnectorSizeByStep(start_step, end_step, result);
651 }
652
GetEmptyQueueFrequencyByEpoch(int32_t epoch_num,float_t * result)653 Status ProfilingManager::GetEmptyQueueFrequencyByEpoch(int32_t epoch_num, float_t *result) {
654 uint32_t start_step = 0, end_step = 0;
655 RETURN_IF_NOT_OK(EpochToStepInterval(epoch_num, &start_step, &end_step));
656 return GetEmptyQueueFrequencyByStep(start_step, end_step, result);
657 }
658
GetEmptyQueueFrequencyByStep(int32_t start_step,int32_t end_step,float_t * result)659 Status ProfilingManager::GetEmptyQueueFrequencyByStep(int32_t start_step, int32_t end_step, float_t *result) {
660 std::shared_ptr<Tracing> node;
661 if (GetTracingNode(kDeviceQueueTracingName, &node).IsOk() ||
662 GetTracingNode(kDatasetIteratorTracingName, &node).IsOk()) {
663 return node->GetEmptyQueueFrequency(start_step, end_step, result);
664 } else {
665 return {StatusCode::kMDUnexpectedError, "Cannot find appropriate tracing node"};
666 }
667 }
668
GetEmptyQueueFrequencyByTime(uint64_t start_ts,uint64_t end_ts,float_t * result)669 Status ProfilingManager::GetEmptyQueueFrequencyByTime(uint64_t start_ts, uint64_t end_ts, float_t *result) {
670 int32_t start_step = 0, end_step = 0;
671 RETURN_IF_NOT_OK(TimeToStepInterval(start_ts, end_ts, &start_step, &end_step));
672 return GetEmptyQueueFrequencyByStep(start_step, end_step, result);
673 }
674
GetConnectorCapacityByEpoch(int32_t epoch_num,std::vector<int32_t> * result)675 Status ProfilingManager::GetConnectorCapacityByEpoch(int32_t epoch_num, std::vector<int32_t> *result) {
676 uint32_t start_step = 0, end_step = 0;
677 RETURN_IF_NOT_OK(EpochToStepInterval(epoch_num, &start_step, &end_step));
678 return GetConnectorCapacityByStep(start_step, end_step, result);
679 }
680
GetConnectorCapacityByStep(int32_t start_step,int32_t end_step,std::vector<int32_t> * result)681 Status ProfilingManager::GetConnectorCapacityByStep(int32_t start_step, int32_t end_step,
682 std::vector<int32_t> *result) {
683 std::shared_ptr<Tracing> node;
684 if (GetTracingNode(kDeviceQueueTracingName, &node).IsOk() ||
685 GetTracingNode(kDatasetIteratorTracingName, &node).IsOk()) {
686 return node->GetConnectorCapacity(start_step, end_step, result);
687 } else {
688 return {StatusCode::kMDUnexpectedError, "Cannot find appropriate tracing node"};
689 }
690 }
691
GetConnectorCapacityByTime(uint64_t start_ts,uint64_t end_ts,std::vector<int32_t> * result)692 Status ProfilingManager::GetConnectorCapacityByTime(uint64_t start_ts, uint64_t end_ts, std::vector<int32_t> *result) {
693 int32_t start_step = 0, end_step = 0;
694 RETURN_IF_NOT_OK(TimeToStepInterval(start_ts, end_ts, &start_step, &end_step));
695 return GetConnectorCapacityByStep(start_step, end_step, result);
696 }
697
GetNumberOfProfiledSteps(int32_t * steps)698 Status ProfilingManager::GetNumberOfProfiledSteps(int32_t *steps) {
699 std::shared_ptr<Tracing> node;
700 if (GetTracingNode(kDeviceQueueTracingName, &node).IsOk() ||
701 GetTracingNode(kDatasetIteratorTracingName, &node).IsOk()) {
702 *steps = node->GetNumberSteps();
703 return Status::OK();
704 } else {
705 return {StatusCode::kMDUnexpectedError, "Cannot find appropriate tracing node"};
706 }
707 }
708
RecordEndOfEpoch(uint32_t step_num)709 void ProfilingManager::RecordEndOfEpoch(uint32_t step_num) {
710 if (profiling_state_ != ProfilingState::kProfilingStateRunning) {
711 return;
712 }
713 MS_LOG(INFO) << "Recording end of epoch. step_num: " << step_num;
714 (void)epoch_end_ts_.emplace_back(ProfilingTime::GetCurMilliSecond());
715 (void)epoch_end_step_.emplace_back(step_num);
716 }
717
Reset()718 Status ProfilingManager::Reset() {
719 for (const auto &node : tracing_nodes_) {
720 node.second->Clear();
721 }
722 for (const auto &node : sampling_nodes_) {
723 node.second->Clear();
724 }
725 epoch_end_ts_.clear();
726 epoch_end_step_.clear();
727 profiling_state_ = ProfilingState::kProfilingStateUnBegun;
728 autotuning_ = false;
729 profiling_ = false;
730 return Status::OK();
731 }
732
Init(const bool for_autotune)733 Status ProfilingManager::Init(const bool for_autotune) {
734 // Reinitialization should only be done in case of UT with sequential pipelines and should not be used externally.
735 // Reinitialization with parallel data pipelines can have unexpected consequences.
736 CHECK_FAIL_RETURN_UNEXPECTED(!autotuning_, "Stop MD Autotune before initializing the MD Profiler.");
737 CHECK_FAIL_RETURN_UNEXPECTED(!profiling_, "Stop MD Profiler before initializing it.");
738 CHECK_FAIL_RETURN_UNEXPECTED(profiling_state_ != ProfilingState::kProfilingStateRunning,
739 "Stop MD Profiler before reinitializing it.");
740 RETURN_IF_NOT_OK(Reset());
741 tracing_nodes_.clear();
742 sampling_nodes_.clear();
743 tree_ = nullptr;
744 CHECK_FAIL_RETURN_UNEXPECTED(profiling_state_ == ProfilingState::kProfilingStateUnBegun,
745 "MD Profiler is in an unexpected state.");
746 if (for_autotune) {
747 autotuning_ = true;
748 MS_LOG(INFO) << "MD profiler is initialized successfully for autotuning.";
749 } else {
750 profiling_ = true;
751 MS_LOG(INFO) << "MD profiler is initialized successfully for profiling.";
752 }
753 return Status::OK();
754 }
755
Start()756 Status ProfilingManager::Start() {
757 CHECK_FAIL_RETURN_UNEXPECTED(profiling_state_ != ProfilingState::kProfilingStateRunning,
758 "MD ProfilingManager is already running.");
759 if (profiling_state_ == ProfilingState::kProfilingStateFinished) {
760 // This scenario (start, stop, and then start again) only happens in profiling, not autotune.
761 MS_LOG(INFO) << "MD ProfilingManager had already stopped. Resetting...";
762 RETURN_IF_NOT_OK(Reset());
763 for (const auto &node : sampling_nodes_) {
764 RETURN_IF_NOT_OK(node.second->Init());
765 }
766 for (const auto &node : tracing_nodes_) {
767 RETURN_IF_NOT_OK(node.second->Init());
768 }
769 profiling_ = true;
770 MS_LOG(INFO) << "MD profiler is reset successfully for profiling.";
771 }
772
773 profiling_state_ = ProfilingState::kProfilingStateRunning;
774 for (const auto &node : tracing_nodes_) {
775 RETURN_IF_NOT_OK(node.second->Start());
776 }
777 for (const auto &node : sampling_nodes_) {
778 RETURN_IF_NOT_OK(node.second->Start());
779 }
780 MS_LOG(INFO) << "MD profiler is started.";
781 return Status::OK();
782 }
783
Stop()784 Status ProfilingManager::Stop() {
785 CHECK_FAIL_RETURN_UNEXPECTED(profiling_state_ != ProfilingState::kProfilingStateUnBegun,
786 "MD ProfilingManager has not started yet.");
787 // It's OK if we are in kProfilingStateFinished state. We allow user to call Stop twice.
788 if (profiling_state_ == ProfilingState::kProfilingStateFinished) {
789 MS_LOG(WARNING) << "MD ProfilingManager had already stopped.";
790 return Status::OK();
791 }
792
793 for (const auto &node : tracing_nodes_) {
794 RETURN_IF_NOT_OK(node.second->Stop());
795 }
796 for (const auto &node : sampling_nodes_) {
797 RETURN_IF_NOT_OK(node.second->Stop());
798 }
799 profiling_state_ = ProfilingState::kProfilingStateFinished;
800 if (autotuning_) {
801 autotuning_ = false;
802 MS_LOG(INFO) << "MD Autotune is stopped.";
803 }
804 if (profiling_) {
805 profiling_ = false;
806 MS_LOG(INFO) << "MD Profiler is stopped.";
807 }
808 return Status::OK();
809 }
810
Save(const std::string & profile_data_path)811 Status ProfilingManager::Save(const std::string &profile_data_path) {
812 // Validate input profile data path
813 CHECK_FAIL_RETURN_UNEXPECTED(!profile_data_path.empty(), "Invalid parameter, Profiling directory is not set.");
814 CHECK_FAIL_RETURN_UNEXPECTED(profile_data_path.size() < PATH_MAX, "Invalid file, Profiling directory is invalid.");
815
816 // profiling file: <profile_data_path>/filename_rank_id.suffix
817 char real_path[PATH_MAX] = {0};
818 #if defined(_WIN32) || defined(_WIN64)
819 if (_fullpath(real_path, common::SafeCStr(profile_data_path), PATH_MAX) == nullptr) {
820 RETURN_STATUS_UNEXPECTED("Profiling dir is invalid.");
821 }
822 #else
823 if (realpath(common::SafeCStr(profile_data_path), real_path) == nullptr) {
824 RETURN_STATUS_UNEXPECTED("Invalid file, can not get realpath of Profiling directory.");
825 }
826 #endif
827
828 std::string rank_id = GetRankID();
829 // Output all profiling data upon request.
830 RETURN_IF_NOT_OK(SaveProfilingData(std::string(profile_data_path), rank_id));
831 RETURN_IF_NOT_OK(ChangeFileMode(std::string(profile_data_path), rank_id));
832 return Status::OK();
833 }
834
GetProfilerTreeState(const ExecutionTree * tree) const835 ProfilingManager::ProfilingRegistrationState ProfilingManager::GetProfilerTreeState(const ExecutionTree *tree) const {
836 auto enabled = (profiling_ || autotuning_);
837 if (!enabled) {
838 return kNotEnabled;
839 }
840 if (tree_ == nullptr) {
841 return kEnabledTreeNotRegistered;
842 } else {
843 return tree_ == tree ? kEnabledTreeRegistered : kEnabledDifferentTreeRegistered;
844 }
845 }
846
GetRankID() const847 std::string ProfilingManager::GetRankID() const {
848 std::string rank_id = common::GetEnv("RANK_ID");
849 #ifdef WITH_BACKEND
850 MS_EXCEPTION_IF_NULL(MsContext::GetInstance());
851 if (MsContext::GetInstance()->get_param<std::string>(MS_CTX_DEVICE_TARGET) == kGPUDevice) {
852 std::shared_ptr<ConfigManager> cfg = GlobalContext::config_manager();
853 int32_t rank_id_int = cfg->rank_id();
854 // If DEVICE_ID is not set, default value is 0
855 if (rank_id_int < 0) {
856 rank_id = common::GetEnv("DEVICE_ID");
857 } else {
858 rank_id = std::to_string(rank_id_int);
859 }
860 }
861 #endif
862 // If RANK_ID is not set, default value is 0
863 if (rank_id.empty()) {
864 rank_id = "0";
865 }
866 return rank_id;
867 }
868
GetCurMilliSecond()869 uint64_t ProfilingTime::GetCurMilliSecond() {
870 // because cpplint does not allow using namespace
871 using std::chrono::duration_cast;
872 using std::chrono::milliseconds;
873 using std::chrono::steady_clock;
874 return static_cast<uint64_t>(duration_cast<milliseconds>(steady_clock::now().time_since_epoch()).count());
875 }
876 } // namespace dataset
877 } // namespace mindspore
878