1 /**
2 * Copyright 2020 Huawei Technologies Co., Ltd
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
#include <sys/stat.h>

#include <algorithm>
#include <chrono>
#include <cstdint>
#include <fstream>
#include <iterator>
#include <memory>
#include <string>
#include <vector>

#include <nlohmann/json.hpp>

#include "minddata/dataset/engine/perf/connector_throughput.h"
#include "minddata/dataset/engine/execution_tree.h"
#include "minddata/dataset/util/path.h"
26
27 namespace mindspore {
28 namespace dataset {
29
30 // temporary helper
InitNodes()31 int ConnectorThroughput::InitNodes() {
32 if (tree_ == nullptr) {
33 return 0;
34 }
35 auto it = (*tree_).begin();
36 return it.NumNodes();
37 }
38 // Sample action
Sample()39 Status ConnectorThroughput::Sample() {
40 std::vector<int64_t> out_row_count_row(n_nodes_);
41 std::vector<double> throughput_row(n_nodes_);
42 TimePoint cur_time; // initialised inside the loop, used outside the loop to update prev sample time.
43 auto col = 0;
44 for (const auto &node : *tree_) {
45 auto cur_out_rows_count = node.ConnectorOutRowsCount();
46 out_row_count_row[col] = cur_out_rows_count;
47 auto sz = timestamps_.size();
48 cur_time = std::chrono::steady_clock::now();
49 double data_time = 0;
50 if (sz > 1) {
51 auto full_time =
52 std::chrono::duration_cast<std::chrono::microseconds>(timestamps_[0][sz - 1] - timestamps_[0][sz - 2]);
53 data_time = std::chrono::duration<double>(full_time).count();
54 }
55 auto prev_out_rows_count = out_row_count_table_[col][out_row_count_table_.size() - 1];
56 if (data_time != 0) {
57 const int32_t multiplier = 1000;
58 auto thr = (cur_out_rows_count - prev_out_rows_count) / (multiplier * data_time);
59 throughput_row[col] = thr;
60 } else {
61 throughput_row[col] = 0;
62 }
63 col++;
64 }
65 std::vector<TimePoint> v = {cur_time}; // temporary fix
66 timestamps_.AddSample(v);
67 // Push new row of sample
68 out_row_count_table_.AddSample(out_row_count_row);
69 throughput_.AddSample(throughput_row);
70 return Status::OK();
71 }
72
ParseOpInfo(const DatasetOp & node,const std::vector<double> & thr)73 json ConnectorThroughput::ParseOpInfo(const DatasetOp &node, const std::vector<double> &thr) {
74 auto children = node.Children();
75 std::vector<int32_t> children_id;
76 std::transform(children.begin(), children.end(), std::back_inserter(children_id),
77 [](const std::shared_ptr<DatasetOp> &op) -> int32_t { return op ? op->id() : 0; });
78 json json_node;
79 json_node["op_id"] = node.id();
80 json_node["op_type"] = node.Name();
81 json_node["num_workers"] = node.NumWorkers();
82 json metrics;
83 // DeviceQueueOp is a special op,it is not inlined but its output queue is invalid.
84 // So we should not output its connector throughput.
85 if (!node.inlined() && node.Name() != "DeviceQueueOp") {
86 metrics["output_queue"] = {{"throughput", thr}};
87 }
88 json_node["metrics"] = metrics;
89 if (!children_id.empty()) {
90 json_node["children"] = children_id;
91 }
92
93 return json_node;
94 }
95
96 // Save profiling data to file
97 // If the file is already exist (created by other sampling node), simply add the data to metrics field.
SaveToFile()98 Status ConnectorThroughput::SaveToFile() {
99 json output;
100 RETURN_IF_NOT_OK(ReadJson(&output));
101
102 Path path = Path(file_path_);
103 // Traverse the ExecutionTree for JSON node generation
104 int col = 0;
105 for (auto &node : *tree_) {
106 std::vector<double> throughput;
107 if (throughput_.size() > col) {
108 for (auto i = 0; i < throughput_[col].size(); i++) {
109 throughput.push_back(throughput_[col][i]);
110 }
111 }
112
113 if (!path.Exists()) {
114 json json_node = ParseOpInfo(node, throughput);
115 output["op_info"].push_back(json_node);
116 } else {
117 if (!node.inlined() && node.Name() != "DeviceQueueOp") {
118 auto &ops_data = output["op_info"];
119 ops_data[col]["metrics"]["output_queue"]["throughput"] = throughput;
120 }
121 }
122 col++;
123 }
124
125 // Discard the content of the file when opening.
126 std::ofstream os(file_path_, std::ios::trunc);
127 os << output;
128 os.close();
129 return Status::OK();
130 }
131
Init(const std::string & dir_path,const std::string & device_id)132 Status ConnectorThroughput::Init(const std::string &dir_path, const std::string &device_id) {
133 file_path_ = (Path(dir_path) / Path("pipeline_profiling_" + device_id + ".json")).ToString();
134 Path path = Path(file_path_);
135 // Remove the file if it exists (from prior profiling usage)
136 RETURN_IF_NOT_OK(path.Remove());
137 return Status::OK();
138 }
139
ChangeFileMode()140 Status ConnectorThroughput::ChangeFileMode() {
141 if (file_path_.empty()) {
142 return Status::OK();
143 }
144
145 if (chmod(common::SafeCStr(file_path_), S_IRUSR | S_IWUSR) == -1) {
146 std::string err_str = "Change file mode failed," + file_path_;
147 return Status(StatusCode::kMDUnexpectedError, err_str);
148 }
149 return Status::OK();
150 }
151
Analyze()152 Status ConnectorThroughput::Analyze() { return Status::OK(); }
153 } // namespace dataset
154 } // namespace mindspore
155