1 /**
2 * Copyright 2020 Huawei Technologies Co., Ltd
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 #include "minddata/dataset/engine/perf/profiling.h"
17 #include <cstdlib>
18 #include <fstream>
19 #include "utils/ms_utils.h"
20 #include "minddata/dataset/util/path.h"
21 #ifdef ENABLE_GPUQUE
22 #include "minddata/dataset/core/config_manager.h"
23 #include "minddata/dataset/core/global_context.h"
24 #endif
25 #include "minddata/dataset/engine/perf/monitor.h"
26 #include "minddata/dataset/engine/perf/device_queue_tracing.h"
27 #include "minddata/dataset/engine/perf/connector_size.h"
28 #include "minddata/dataset/engine/perf/connector_throughput.h"
29 #include "minddata/dataset/engine/perf/cpu_sampling.h"
30 #include "minddata/dataset/engine/perf/dataset_iterator_tracing.h"
31 #include "minddata/dataset/util/log_adapter.h"
32
33 namespace mindspore {
34 namespace dataset {
35
SaveToFile()36 Status Tracing::SaveToFile() {
37 if (value_.empty()) {
38 return Status::OK();
39 }
40
41 std::ofstream handle(file_path_, std::ios::trunc);
42 if (!handle.is_open()) {
43 RETURN_STATUS_UNEXPECTED("Profiling file can not be opened.");
44 }
45 for (auto value : value_) {
46 handle << value << "\n";
47 }
48 handle.close();
49
50 return Status::OK();
51 }
52
ReadJson(nlohmann::json * output)53 Status Sampling::ReadJson(nlohmann::json *output) {
54 RETURN_UNEXPECTED_IF_NULL(output);
55 Path path = Path(file_path_);
56 if (path.Exists()) {
57 MS_LOG(DEBUG) << file_path_ << " exists";
58 try {
59 std::ifstream file(file_path_);
60 file >> (*output);
61 } catch (const std::exception &err) {
62 RETURN_STATUS_UNEXPECTED("Invalid file, failed to open json file: " + file_path_ +
63 ", please delete it and try again!");
64 }
65 } else {
66 (*output)["sampling_interval"] = GlobalContext::config_manager()->monitor_sampling_interval();
67 }
68 return Status::OK();
69 }
70
71 // Constructor
ProfilingManager(ExecutionTree * tree)72 ProfilingManager::ProfilingManager(ExecutionTree *tree) : tree_(tree), enabled_(true) {
73 perf_monitor_ = std::make_unique<Monitor>(tree_);
74 }
75
IsProfilingEnable() const76 bool ProfilingManager::IsProfilingEnable() const { return common::GetEnv("PROFILING_MODE") == "true" && enabled_; }
77
Initialize()78 Status ProfilingManager::Initialize() {
79 // Register nodes based on config
80 std::string dir = common::GetEnv("MINDDATA_PROFILING_DIR");
81 if (dir.empty()) {
82 RETURN_STATUS_UNEXPECTED("Invalid parameter, Profiling directory is not set.");
83 }
84 char real_path[PATH_MAX] = {0};
85 if (dir.size() >= PATH_MAX) {
86 RETURN_STATUS_UNEXPECTED("Invalid file, Profiling directory is invalid.");
87 }
88 #if defined(_WIN32) || defined(_WIN64)
89 if (_fullpath(real_path, common::SafeCStr(dir), PATH_MAX) == nullptr) {
90 RETURN_STATUS_UNEXPECTED("Profiling dir is invalid.");
91 }
92 #else
93 if (realpath(common::SafeCStr(dir), real_path) == nullptr) {
94 RETURN_STATUS_UNEXPECTED("Invalid file, can not get realpath of Profiling directory.");
95 }
96 #endif
97 dir_path_ = real_path;
98
99 #ifdef ENABLE_GPUQUE
100 std::shared_ptr<ConfigManager> cfg = GlobalContext::config_manager();
101 int32_t rank_id = cfg->rank_id();
102 // If DEVICE_ID is not set, default value is 0
103 if (rank_id < 0) {
104 device_id_ = common::GetEnv("DEVICE_ID");
105 // If DEVICE_ID is not set, default value is 0
106 if (device_id_.empty()) {
107 device_id_ = "0";
108 }
109 } else {
110 device_id_ = std::to_string(rank_id);
111 }
112 #else
113 device_id_ = common::GetEnv("RANK_ID");
114 // If RANK_ID is not set, default value is 0
115 if (device_id_.empty()) {
116 device_id_ = "0";
117 }
118 #endif
119
120 // Register all profiling node.
121 // device_queue node is used for graph mode
122 std::shared_ptr<Tracing> device_queue_tracing = std::make_shared<DeviceQueueTracing>();
123 RETURN_IF_NOT_OK(RegisterTracingNode(device_queue_tracing));
124
125 // dataset_iterator node is used for graph mode
126 std::shared_ptr<Tracing> dataset_iterator_tracing = std::make_shared<DatasetIteratorTracing>();
127 RETURN_IF_NOT_OK(RegisterTracingNode(dataset_iterator_tracing));
128
129 std::shared_ptr<Sampling> connector_size_sampling = std::make_shared<ConnectorSize>(tree_);
130 RETURN_IF_NOT_OK(RegisterSamplingNode(connector_size_sampling));
131
132 std::shared_ptr<Sampling> connector_thr_sampling = std::make_shared<ConnectorThroughput>(tree_);
133 RETURN_IF_NOT_OK(RegisterSamplingNode(connector_thr_sampling));
134
135 #ifndef ENABLE_ANDROID
136 std::shared_ptr<Sampling> cpu_sampling = std::make_shared<CpuSampling>(tree_);
137 RETURN_IF_NOT_OK(RegisterSamplingNode(cpu_sampling));
138 #endif
139 return Status::OK();
140 }
141
142 // Launch monitoring thread.
LaunchMonitor()143 Status ProfilingManager::LaunchMonitor() {
144 RETURN_IF_NOT_OK(tree_->AllTasks()->CreateAsyncTask("Monitor Thread launched", std::ref(*perf_monitor_)));
145 return Status::OK();
146 }
147
148 // Profiling node registration
RegisterTracingNode(std::shared_ptr<Tracing> node)149 Status ProfilingManager::RegisterTracingNode(std::shared_ptr<Tracing> node) {
150 // Check if node with the same name has already been registered.
151 auto exist = tracing_nodes_.find(node->Name());
152 if (exist != tracing_nodes_.end()) {
153 return Status(StatusCode::kMDProfilingError, "Profiling node already exist: " + node->Name());
154 }
155 // Register the node with its name as key.
156 RETURN_IF_NOT_OK(node->Init(dir_path_, device_id_));
157 tracing_nodes_[node->Name()] = node;
158 return Status::OK();
159 }
160
161 // Profiling node getter
GetTracingNode(const std::string & name,std::shared_ptr<Tracing> * node)162 Status ProfilingManager::GetTracingNode(const std::string &name, std::shared_ptr<Tracing> *node) {
163 // Check if node with the same name has already been registered.
164 auto exist = tracing_nodes_.find(name);
165 if (exist == tracing_nodes_.end()) {
166 return Status(StatusCode::kMDProfilingError, "Profiling node does not exist: " + name);
167 }
168 // Fetch node.
169 *node = tracing_nodes_[name];
170 return Status::OK();
171 }
172
173 // Profiling node registration
RegisterSamplingNode(std::shared_ptr<Sampling> node)174 Status ProfilingManager::RegisterSamplingNode(std::shared_ptr<Sampling> node) {
175 // Check if node with the same name has already been registered.
176 auto exist = sampling_nodes_.find(node->Name());
177 if (exist != sampling_nodes_.end()) {
178 return Status(StatusCode::kMDProfilingError, "Profiling node already exist: " + node->Name());
179 }
180 // Register the node with its name as key.
181 RETURN_IF_NOT_OK(node->Init(dir_path_, device_id_));
182 sampling_nodes_[node->Name()] = node;
183 return Status::OK();
184 }
185
186 // Profiling node getter
GetSamplingNode(const std::string & name,std::shared_ptr<Sampling> * node)187 Status ProfilingManager::GetSamplingNode(const std::string &name, std::shared_ptr<Sampling> *node) {
188 // Check if node with the same name has already been registered.
189 auto exist = sampling_nodes_.find(name);
190 if (exist == sampling_nodes_.end()) {
191 return Status(StatusCode::kMDProfilingError, "Profiling node does not exist: " + name);
192 }
193 // Fetch node.
194 *node = sampling_nodes_[name];
195 return Status::OK();
196 }
197
SaveProfilingData()198 Status ProfilingManager::SaveProfilingData() {
199 if (!IsProfilingEnable()) {
200 return Status::OK();
201 }
202 MS_LOG(INFO) << "Start to save profiling data.";
203 for (auto node : tracing_nodes_) {
204 RETURN_IF_NOT_OK(node.second->SaveToFile());
205 }
206 for (auto node : sampling_nodes_) {
207 RETURN_IF_NOT_OK(node.second->SaveToFile());
208 }
209 MS_LOG(INFO) << "Save profiling data end.";
210 return Status::OK();
211 }
Analyze()212 Status ProfilingManager::Analyze() {
213 if (!IsProfilingEnable()) {
214 return Status::OK();
215 }
216 MS_LOG(INFO) << "Start to analyze profiling data.";
217 for (auto node : sampling_nodes_) {
218 RETURN_IF_NOT_OK(node.second->Analyze());
219 }
220 return Status::OK();
221 }
222
ChangeFileMode()223 Status ProfilingManager::ChangeFileMode() {
224 if (!IsProfilingEnable()) {
225 return Status::OK();
226 }
227 MS_LOG(INFO) << "Start to change file mode.";
228 for (auto node : tracing_nodes_) {
229 RETURN_IF_NOT_OK(node.second->ChangeFileMode());
230 }
231 for (auto node : sampling_nodes_) {
232 RETURN_IF_NOT_OK(node.second->ChangeFileMode());
233 }
234 MS_LOG(INFO) << "Change file mode end.";
235 return Status::OK();
236 }
237
GetCurMilliSecond()238 uint64_t ProfilingTime::GetCurMilliSecond() {
239 // because cpplint does not allow using namespace
240 using std::chrono::duration_cast;
241 using std::chrono::milliseconds;
242 using std::chrono::steady_clock;
243 return static_cast<uint64_t>(duration_cast<milliseconds>(steady_clock::now().time_since_epoch()).count());
244 }
245 } // namespace dataset
246 } // namespace mindspore
247