• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2020 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <sys/stat.h>
18 #include <iterator>
19 #include <algorithm>
20 #include <memory>
21 #include <string>
22 #include <nlohmann/json.hpp>
23 #include "minddata/dataset/engine/perf/connector_throughput.h"
24 #include "minddata/dataset/engine/execution_tree.h"
25 #include "minddata/dataset/util/path.h"
26 
27 namespace mindspore {
28 namespace dataset {
29 
30 // temporary helper
InitNodes()31 int ConnectorThroughput::InitNodes() {
32   if (tree_ == nullptr) {
33     return 0;
34   }
35   auto it = (*tree_).begin();
36   return it.NumNodes();
37 }
38 // Sample action
Sample()39 Status ConnectorThroughput::Sample() {
40   std::vector<int64_t> out_row_count_row(n_nodes_);
41   std::vector<double> throughput_row(n_nodes_);
42   TimePoint cur_time;  // initialised inside the loop, used outside the loop to update prev sample time.
43   auto col = 0;
44   for (const auto &node : *tree_) {
45     auto cur_out_rows_count = node.ConnectorOutRowsCount();
46     out_row_count_row[col] = cur_out_rows_count;
47     auto sz = timestamps_.size();
48     cur_time = std::chrono::steady_clock::now();
49     double data_time = 0;
50     if (sz > 1) {
51       auto full_time =
52         std::chrono::duration_cast<std::chrono::microseconds>(timestamps_[0][sz - 1] - timestamps_[0][sz - 2]);
53       data_time = std::chrono::duration<double>(full_time).count();
54     }
55     auto prev_out_rows_count = out_row_count_table_[col][out_row_count_table_.size() - 1];
56     if (data_time != 0) {
57       const int32_t multiplier = 1000;
58       auto thr = (cur_out_rows_count - prev_out_rows_count) / (multiplier * data_time);
59       throughput_row[col] = thr;
60     } else {
61       throughput_row[col] = 0;
62     }
63     col++;
64   }
65   std::vector<TimePoint> v = {cur_time};  // temporary fix
66   timestamps_.AddSample(v);
67   // Push new row of sample
68   out_row_count_table_.AddSample(out_row_count_row);
69   throughput_.AddSample(throughput_row);
70   return Status::OK();
71 }
72 
ParseOpInfo(const DatasetOp & node,const std::vector<double> & thr)73 json ConnectorThroughput::ParseOpInfo(const DatasetOp &node, const std::vector<double> &thr) {
74   auto children = node.Children();
75   std::vector<int32_t> children_id;
76   std::transform(children.begin(), children.end(), std::back_inserter(children_id),
77                  [](const std::shared_ptr<DatasetOp> &op) -> int32_t { return op ? op->id() : 0; });
78   json json_node;
79   json_node["op_id"] = node.id();
80   json_node["op_type"] = node.Name();
81   json_node["num_workers"] = node.NumWorkers();
82   json metrics;
83   // DeviceQueueOp is a special op,it is not inlined but its output queue is invalid.
84   // So we should not output its connector throughput.
85   if (!node.inlined() && node.Name() != "DeviceQueueOp") {
86     metrics["output_queue"] = {{"throughput", thr}};
87   }
88   json_node["metrics"] = metrics;
89   if (!children_id.empty()) {
90     json_node["children"] = children_id;
91   }
92 
93   return json_node;
94 }
95 
96 // Save profiling data to file
97 // If the file is already exist (created by other sampling node), simply add the data to metrics field.
SaveToFile()98 Status ConnectorThroughput::SaveToFile() {
99   json output;
100   RETURN_IF_NOT_OK(ReadJson(&output));
101 
102   Path path = Path(file_path_);
103   // Traverse the ExecutionTree for JSON node generation
104   int col = 0;
105   for (auto &node : *tree_) {
106     std::vector<double> throughput;
107     if (throughput_.size() > col) {
108       for (auto i = 0; i < throughput_[col].size(); i++) {
109         throughput.push_back(throughput_[col][i]);
110       }
111     }
112 
113     if (!path.Exists()) {
114       json json_node = ParseOpInfo(node, throughput);
115       output["op_info"].push_back(json_node);
116     } else {
117       if (!node.inlined() && node.Name() != "DeviceQueueOp") {
118         auto &ops_data = output["op_info"];
119         ops_data[col]["metrics"]["output_queue"]["throughput"] = throughput;
120       }
121     }
122     col++;
123   }
124 
125   // Discard the content of the file when opening.
126   std::ofstream os(file_path_, std::ios::trunc);
127   os << output;
128   os.close();
129   return Status::OK();
130 }
131 
Init(const std::string & dir_path,const std::string & device_id)132 Status ConnectorThroughput::Init(const std::string &dir_path, const std::string &device_id) {
133   file_path_ = (Path(dir_path) / Path("pipeline_profiling_" + device_id + ".json")).ToString();
134   Path path = Path(file_path_);
135   // Remove the file if it exists (from prior profiling usage)
136   RETURN_IF_NOT_OK(path.Remove());
137   return Status::OK();
138 }
139 
ChangeFileMode()140 Status ConnectorThroughput::ChangeFileMode() {
141   if (file_path_.empty()) {
142     return Status::OK();
143   }
144 
145   if (chmod(common::SafeCStr(file_path_), S_IRUSR | S_IWUSR) == -1) {
146     std::string err_str = "Change file mode failed," + file_path_;
147     return Status(StatusCode::kMDUnexpectedError, err_str);
148   }
149   return Status::OK();
150 }
151 
Analyze()152 Status ConnectorThroughput::Analyze() { return Status::OK(); }
153 }  // namespace dataset
154 }  // namespace mindspore
155