• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2020 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include "minddata/dataset/engine/perf/profiling.h"
17 #include <cstdlib>
18 #include <fstream>
19 #include "utils/ms_utils.h"
20 #include "minddata/dataset/util/path.h"
21 #ifdef ENABLE_GPUQUE
22 #include "minddata/dataset/core/config_manager.h"
23 #include "minddata/dataset/core/global_context.h"
24 #endif
25 #include "minddata/dataset/engine/perf/monitor.h"
26 #include "minddata/dataset/engine/perf/device_queue_tracing.h"
27 #include "minddata/dataset/engine/perf/connector_size.h"
28 #include "minddata/dataset/engine/perf/connector_throughput.h"
29 #include "minddata/dataset/engine/perf/cpu_sampling.h"
30 #include "minddata/dataset/engine/perf/dataset_iterator_tracing.h"
31 #include "minddata/dataset/util/log_adapter.h"
32 
33 namespace mindspore {
34 namespace dataset {
35 
SaveToFile()36 Status Tracing::SaveToFile() {
37   if (value_.empty()) {
38     return Status::OK();
39   }
40 
41   std::ofstream handle(file_path_, std::ios::trunc);
42   if (!handle.is_open()) {
43     RETURN_STATUS_UNEXPECTED("Profiling file can not be opened.");
44   }
45   for (auto value : value_) {
46     handle << value << "\n";
47   }
48   handle.close();
49 
50   return Status::OK();
51 }
52 
ReadJson(nlohmann::json * output)53 Status Sampling::ReadJson(nlohmann::json *output) {
54   RETURN_UNEXPECTED_IF_NULL(output);
55   Path path = Path(file_path_);
56   if (path.Exists()) {
57     MS_LOG(DEBUG) << file_path_ << " exists";
58     try {
59       std::ifstream file(file_path_);
60       file >> (*output);
61     } catch (const std::exception &err) {
62       RETURN_STATUS_UNEXPECTED("Invalid file, failed to open json file: " + file_path_ +
63                                ", please delete it and try again!");
64     }
65   } else {
66     (*output)["sampling_interval"] = GlobalContext::config_manager()->monitor_sampling_interval();
67   }
68   return Status::OK();
69 }
70 
71 // Constructor
ProfilingManager(ExecutionTree * tree)72 ProfilingManager::ProfilingManager(ExecutionTree *tree) : tree_(tree), enabled_(true) {
73   perf_monitor_ = std::make_unique<Monitor>(tree_);
74 }
75 
IsProfilingEnable() const76 bool ProfilingManager::IsProfilingEnable() const { return common::GetEnv("PROFILING_MODE") == "true" && enabled_; }
77 
Initialize()78 Status ProfilingManager::Initialize() {
79   // Register nodes based on config
80   std::string dir = common::GetEnv("MINDDATA_PROFILING_DIR");
81   if (dir.empty()) {
82     RETURN_STATUS_UNEXPECTED("Invalid parameter, Profiling directory is not set.");
83   }
84   char real_path[PATH_MAX] = {0};
85   if (dir.size() >= PATH_MAX) {
86     RETURN_STATUS_UNEXPECTED("Invalid file, Profiling directory is invalid.");
87   }
88 #if defined(_WIN32) || defined(_WIN64)
89   if (_fullpath(real_path, common::SafeCStr(dir), PATH_MAX) == nullptr) {
90     RETURN_STATUS_UNEXPECTED("Profiling dir is invalid.");
91   }
92 #else
93   if (realpath(common::SafeCStr(dir), real_path) == nullptr) {
94     RETURN_STATUS_UNEXPECTED("Invalid file, can not get realpath of Profiling directory.");
95   }
96 #endif
97   dir_path_ = real_path;
98 
99 #ifdef ENABLE_GPUQUE
100   std::shared_ptr<ConfigManager> cfg = GlobalContext::config_manager();
101   int32_t rank_id = cfg->rank_id();
102   // If DEVICE_ID is not set, default value is 0
103   if (rank_id < 0) {
104     device_id_ = common::GetEnv("DEVICE_ID");
105     // If DEVICE_ID is not set, default value is 0
106     if (device_id_.empty()) {
107       device_id_ = "0";
108     }
109   } else {
110     device_id_ = std::to_string(rank_id);
111   }
112 #else
113   device_id_ = common::GetEnv("RANK_ID");
114   // If RANK_ID is not set, default value is 0
115   if (device_id_.empty()) {
116     device_id_ = "0";
117   }
118 #endif
119 
120   // Register all profiling node.
121   // device_queue node is used for graph mode
122   std::shared_ptr<Tracing> device_queue_tracing = std::make_shared<DeviceQueueTracing>();
123   RETURN_IF_NOT_OK(RegisterTracingNode(device_queue_tracing));
124 
125   // dataset_iterator node is used for graph mode
126   std::shared_ptr<Tracing> dataset_iterator_tracing = std::make_shared<DatasetIteratorTracing>();
127   RETURN_IF_NOT_OK(RegisterTracingNode(dataset_iterator_tracing));
128 
129   std::shared_ptr<Sampling> connector_size_sampling = std::make_shared<ConnectorSize>(tree_);
130   RETURN_IF_NOT_OK(RegisterSamplingNode(connector_size_sampling));
131 
132   std::shared_ptr<Sampling> connector_thr_sampling = std::make_shared<ConnectorThroughput>(tree_);
133   RETURN_IF_NOT_OK(RegisterSamplingNode(connector_thr_sampling));
134 
135 #ifndef ENABLE_ANDROID
136   std::shared_ptr<Sampling> cpu_sampling = std::make_shared<CpuSampling>(tree_);
137   RETURN_IF_NOT_OK(RegisterSamplingNode(cpu_sampling));
138 #endif
139   return Status::OK();
140 }
141 
142 // Launch monitoring thread.
LaunchMonitor()143 Status ProfilingManager::LaunchMonitor() {
144   RETURN_IF_NOT_OK(tree_->AllTasks()->CreateAsyncTask("Monitor Thread launched", std::ref(*perf_monitor_)));
145   return Status::OK();
146 }
147 
148 // Profiling node registration
RegisterTracingNode(std::shared_ptr<Tracing> node)149 Status ProfilingManager::RegisterTracingNode(std::shared_ptr<Tracing> node) {
150   // Check if node with the same name has already been registered.
151   auto exist = tracing_nodes_.find(node->Name());
152   if (exist != tracing_nodes_.end()) {
153     return Status(StatusCode::kMDProfilingError, "Profiling node already exist: " + node->Name());
154   }
155   // Register the node with its name as key.
156   RETURN_IF_NOT_OK(node->Init(dir_path_, device_id_));
157   tracing_nodes_[node->Name()] = node;
158   return Status::OK();
159 }
160 
161 // Profiling node getter
GetTracingNode(const std::string & name,std::shared_ptr<Tracing> * node)162 Status ProfilingManager::GetTracingNode(const std::string &name, std::shared_ptr<Tracing> *node) {
163   // Check if node with the same name has already been registered.
164   auto exist = tracing_nodes_.find(name);
165   if (exist == tracing_nodes_.end()) {
166     return Status(StatusCode::kMDProfilingError, "Profiling node does not exist: " + name);
167   }
168   // Fetch node.
169   *node = tracing_nodes_[name];
170   return Status::OK();
171 }
172 
173 // Profiling node registration
RegisterSamplingNode(std::shared_ptr<Sampling> node)174 Status ProfilingManager::RegisterSamplingNode(std::shared_ptr<Sampling> node) {
175   // Check if node with the same name has already been registered.
176   auto exist = sampling_nodes_.find(node->Name());
177   if (exist != sampling_nodes_.end()) {
178     return Status(StatusCode::kMDProfilingError, "Profiling node already exist: " + node->Name());
179   }
180   // Register the node with its name as key.
181   RETURN_IF_NOT_OK(node->Init(dir_path_, device_id_));
182   sampling_nodes_[node->Name()] = node;
183   return Status::OK();
184 }
185 
186 // Profiling node getter
GetSamplingNode(const std::string & name,std::shared_ptr<Sampling> * node)187 Status ProfilingManager::GetSamplingNode(const std::string &name, std::shared_ptr<Sampling> *node) {
188   // Check if node with the same name has already been registered.
189   auto exist = sampling_nodes_.find(name);
190   if (exist == sampling_nodes_.end()) {
191     return Status(StatusCode::kMDProfilingError, "Profiling node does not exist: " + name);
192   }
193   // Fetch node.
194   *node = sampling_nodes_[name];
195   return Status::OK();
196 }
197 
SaveProfilingData()198 Status ProfilingManager::SaveProfilingData() {
199   if (!IsProfilingEnable()) {
200     return Status::OK();
201   }
202   MS_LOG(INFO) << "Start to save profiling data.";
203   for (auto node : tracing_nodes_) {
204     RETURN_IF_NOT_OK(node.second->SaveToFile());
205   }
206   for (auto node : sampling_nodes_) {
207     RETURN_IF_NOT_OK(node.second->SaveToFile());
208   }
209   MS_LOG(INFO) << "Save profiling data end.";
210   return Status::OK();
211 }
Analyze()212 Status ProfilingManager::Analyze() {
213   if (!IsProfilingEnable()) {
214     return Status::OK();
215   }
216   MS_LOG(INFO) << "Start to analyze profiling data.";
217   for (auto node : sampling_nodes_) {
218     RETURN_IF_NOT_OK(node.second->Analyze());
219   }
220   return Status::OK();
221 }
222 
ChangeFileMode()223 Status ProfilingManager::ChangeFileMode() {
224   if (!IsProfilingEnable()) {
225     return Status::OK();
226   }
227   MS_LOG(INFO) << "Start to change file mode.";
228   for (auto node : tracing_nodes_) {
229     RETURN_IF_NOT_OK(node.second->ChangeFileMode());
230   }
231   for (auto node : sampling_nodes_) {
232     RETURN_IF_NOT_OK(node.second->ChangeFileMode());
233   }
234   MS_LOG(INFO) << "Change file mode end.";
235   return Status::OK();
236 }
237 
GetCurMilliSecond()238 uint64_t ProfilingTime::GetCurMilliSecond() {
239   // because cpplint does not allow using namespace
240   using std::chrono::duration_cast;
241   using std::chrono::milliseconds;
242   using std::chrono::steady_clock;
243   return static_cast<uint64_t>(duration_cast<milliseconds>(steady_clock::now().time_since_epoch()).count());
244 }
245 }  // namespace dataset
246 }  // namespace mindspore
247