1 /**
2 * Copyright 2020-2021 Huawei Technologies Co., Ltd
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "minddata/dataset/engine/tree_adapter.h"
18
19 #include "minddata/dataset/core/client.h"
20 #include "minddata/dataset/engine/ir/datasetops/root_node.h"
21 #ifndef ENABLE_ANDROID
22 #include "minddata/dataset/engine/opt/optional/tensor_op_fusion_pass.h"
23 #include "minddata/dataset/engine/opt/pre/cache_transform_pass.h"
24 #include "minddata/dataset/engine/opt/post/repeat_pass.h"
25 #endif
26 #include "minddata/dataset/engine/opt/pass.h"
27 #include "minddata/dataset/engine/opt/post/auto_worker_pass.h"
28 #ifdef ENABLE_PYTHON
29 #include "minddata/dataset/engine/opt/post/generator_node_pass.h"
30 #endif
31 #include "minddata/dataset/engine/opt/pre/cache_validation_pass.h"
32 #include "minddata/dataset/engine/opt/pre/deep_copy_pass.h"
33 #include "minddata/dataset/engine/opt/pre/epoch_ctrl_pass.h"
34 #include "minddata/dataset/engine/opt/pre/getter_pass.h"
35 #include "minddata/dataset/engine/opt/pre/input_validation_pass.h"
36 #include "minddata/dataset/engine/opt/pre/node_removal_pass.h"
37
38 namespace mindspore {
39 namespace dataset {
40
TreeAdapter(UsageFlag usage)41 TreeAdapter::TreeAdapter(UsageFlag usage) : usage_(usage), launched_(false), tree_state_(kCompileStateInit) {
42 optimize_ = common::GetEnv("OPTIMIZE") == "true";
43
44 // Initialize profiling parameters
45 cur_batch_num_ = 0;
46 cur_connector_size_ = 0;
47 cur_connector_capacity_ = 0;
48 }
49
PrePass(std::shared_ptr<DatasetNode> ir)50 Status TreeAdapter::PrePass(std::shared_ptr<DatasetNode> ir) {
51 RETURN_UNEXPECTED_IF_NULL(ir);
52 // Vector of actions in pre-pass phase
53 std::vector<std::unique_ptr<IRPass>> actions;
54
55 MS_LOG(INFO) << "Running pre pass loops.";
56 actions.emplace_back(std::make_unique<InputValidationPass>());
57 actions.emplace_back(std::make_unique<CacheValidationPass>());
58 actions.emplace_back(std::make_unique<NodeRemovalPass>());
59 actions.emplace_back(std::make_unique<EpochCtrlPass>());
60 if (usage_ == kDeGetter) actions.emplace_back(std::make_unique<GetterPass>());
61 #ifndef ENABLE_ANDROID
62 actions.emplace_back(std::make_unique<CacheTransformPass>());
63 #endif
64 // Vector of flags for each action
65 std::vector<bool> modified(actions.size(), false);
66 // Apply pre-pass actions
67 for (auto i = 0; i < actions.size(); i++) {
68 auto m = false;
69 RETURN_IF_NOT_OK(actions[i]->Run(ir, &m));
70 modified[i] = m;
71 }
72 MS_LOG(INFO) << "Pre pass complete.";
73 return Status::OK();
74 }
75
Optimize(std::shared_ptr<DatasetNode> ir)76 Status TreeAdapter::Optimize(std::shared_ptr<DatasetNode> ir) {
77 RETURN_UNEXPECTED_IF_NULL(ir);
78 // Vector of optimizations
79 std::vector<std::unique_ptr<IRNodePass>> optimizations;
80 MS_LOG(INFO) << "Running optimization pass loops";
81 #ifndef ENABLE_ANDROID
82 optimizations.emplace_back(std::make_unique<TensorOpFusionPass>());
83 #endif
84 // Apply optimization pass actions
85 for (auto i = 0; i < optimizations.size(); i++) {
86 bool modified = false;
87 RETURN_IF_NOT_OK(optimizations[i]->Run(ir, &modified));
88 }
89 MS_LOG(INFO) << "Optimization pass complete.";
90 return Status::OK();
91 }
92
PostPass(std::shared_ptr<DatasetNode> ir)93 Status TreeAdapter::PostPass(std::shared_ptr<DatasetNode> ir) {
94 RETURN_UNEXPECTED_IF_NULL(ir);
95 // Vector of actions in post-pass phase
96 std::vector<std::unique_ptr<IRPass>> actions;
97 MS_LOG(INFO) << "Running post pass loops.";
98
99 // AutoWorkerPass should ideally precede CacheTransForm Pass to avoid complications of the setting
100 if (GlobalContext::config_manager()->auto_num_workers() && usage_ == kDeIterator) {
101 // skip this for getter pass
102 actions.emplace_back(std::make_unique<AutoWorkerPass>());
103 }
104 #ifdef ENABLE_PYTHON
105 actions.emplace_back(std::make_unique<GeneratorNodePass>());
106 #endif
107 #ifndef ENABLE_ANDROID
108 actions.emplace_back(std::make_unique<RepeatPass>());
109 #endif
110 // We will gradually move RepeatPass from ExecutionTree::PrepareTreePostAction to here.
111
112 // Vector of flags for each action
113 std::vector<bool> modified(actions.size(), false);
114 for (auto i = 0; i < actions.size(); i++) {
115 auto m = false;
116 RETURN_IF_NOT_OK(actions[i]->Run(ir, &m));
117 modified[i] = m;
118 }
119 MS_LOG(INFO) << "Post passes complete.";
120 return Status::OK();
121 }
122
// Recursively builds the DatasetOp ExecutionTree from the optimized IR tree.
// A single IR node may expand into several DatasetOps; those ops are chained
// parent->child in order, and the first op is returned to the caller so it can
// be attached as a child of the caller's own op chain.
// @param ir The IR node to convert; must not be null.
// @param op [out] Receives the top-most DatasetOp built from this IR node.
// @return Status The error code; OK on success.
Status TreeAdapter::BuildExecutionTreeRecur(std::shared_ptr<DatasetNode> ir, std::shared_ptr<DatasetOp> *const op) {
  RETURN_UNEXPECTED_IF_NULL(ir);
  RETURN_UNEXPECTED_IF_NULL(op);
  RETURN_UNEXPECTED_IF_NULL(tree_);
  // Build the DatasetOp ExecutionTree from the optimized IR tree
  std::vector<std::shared_ptr<DatasetOp>> ops;
  RETURN_IF_NOT_OK(ir->Build(&ops));

  CHECK_FAIL_RETURN_UNEXPECTED(!ops.empty(), "Unable to build node: " + ir->Name());

  (*op) = ops.front();  // return the first op to be added as child by the caller of this function
  RETURN_IF_NOT_OK(tree_->AssociateNode(*op));

  // Chain the remaining ops produced by this IR node: each op must be
  // associated with the tree before being linked as a child of its predecessor.
  for (size_t i = 1; i < ops.size(); i++) {
    RETURN_IF_NOT_OK(tree_->AssociateNode(ops[i]));
    RETURN_IF_NOT_OK(ops[i - 1]->AddChild(ops[i]));
  }

  // Build the children of IR, once they return, add the return value to *op
  for (const std::shared_ptr<DatasetNode> &child_ir : ir->Children()) {
    std::shared_ptr<DatasetOp> child_op;
    RETURN_IF_NOT_OK(BuildExecutionTreeRecur(child_ir, &child_op));
    RETURN_IF_NOT_OK(ops.back()->AddChild(child_op));  // append children to the last of ops
  }

  return Status::OK();
}
150
Build(std::shared_ptr<DatasetNode> root_ir)151 Status TreeAdapter::Build(std::shared_ptr<DatasetNode> root_ir) {
152 RETURN_UNEXPECTED_IF_NULL(root_ir);
153 // This will evolve in the long run
154 tree_ = std::make_unique<ExecutionTree>();
155 // disable profiling if this is only a getter pass
156 #ifndef ENABLE_SECURITY
157 if (usage_ == kDeGetter) tree_->GetProfilingManager()->DisableProfiling();
158 #endif
159 // Build the Execution tree from the child of the IR root node, which represent the root of the input IR tree
160 std::shared_ptr<DatasetOp> root_op;
161 RETURN_IF_NOT_OK(BuildExecutionTreeRecur(root_ir->Children()[0], &root_op));
162 RETURN_IF_NOT_OK(tree_->AssignRoot(root_op));
163
164 // Note: We will gradually move the pre pass, optimizer pass, and post pass
165 // on ExecutionTree to perform on IR tree.
166 // Prepare the tree
167 RETURN_IF_NOT_OK(tree_->Prepare());
168
169 // After the tree is prepared, the col_name_id_map can safely be obtained
170 column_name_map_ = tree_->root()->column_name_id_map();
171
172 return Status::OK();
173 }
174
Compile(std::shared_ptr<DatasetNode> input_ir,int32_t num_epochs)175 Status TreeAdapter::Compile(std::shared_ptr<DatasetNode> input_ir, int32_t num_epochs) {
176 RETURN_UNEXPECTED_IF_NULL(input_ir);
177
178 tree_state_ = kCompileStateIRGraphBuilt;
179 MS_LOG(INFO) << "Input plan:" << '\n' << *input_ir << '\n';
180
181 // Clone the input IR tree and insert under the root node
182 // Create a root node to host the new copy of the input IR tree
183 // This is done so that the compilation will process and modify the tree
184 // without changing the tree associated with the user code.
185 // The tree from the user code is permitted to form a graph where any node
186 // is consumed by more than one parent. However, this cloning process here
187 // will break the graph into a tree by copying each consumption of a node into a new copy.
188 bool m = false;
189 DeepCopyPass cloning_tree;
190 RETURN_IF_NOT_OK(cloning_tree.Run(input_ir, &m));
191 std::shared_ptr<RootNode> root_ir = cloning_tree.Root();
192 root_ir->SetNumEpochs(num_epochs);
193
194 tree_state_ = kCompileStateIRTreeCloned;
195 MS_LOG(INFO) << "Plan before optimization:" << '\n' << *root_ir << '\n';
196
197 // Pre-pass of the IR tree
198 RETURN_IF_NOT_OK(PrePass(root_ir));
199
200 // Optional phase of optimization
201 if (optimize_) {
202 RETURN_IF_NOT_OK(Optimize(root_ir));
203 }
204
205 // Post-pass of the IR tree
206 RETURN_IF_NOT_OK(PostPass(root_ir));
207
208 tree_state_ = kCompileStateOptimized;
209 MS_LOG(INFO) << "Plan after optimization:" << '\n' << *root_ir << '\n';
210 // Remember the root node
211 root_ir_ = root_ir;
212
213 RETURN_IF_NOT_OK(Build(root_ir_));
214 tree_state_ = kCompileStateReady;
215
216 return Status::OK();
217 }
218
// Fetches the next row of data from the ExecutionTree, lazily launching the
// tree on the first call. Returns an empty row on end-of-epoch (eoe); fetching
// past the configured number of epochs (eof) is an error. Records per-batch
// connector statistics when profiling/tracing is enabled.
// @param row [out] Receives the next row; cleared on entry.
// @return Status The error code; OK on success (including eoe).
Status TreeAdapter::GetNext(TensorRow *row) {
  RETURN_UNEXPECTED_IF_NULL(tree_);
  RETURN_UNEXPECTED_IF_NULL(row);
  row->clear();  // make sure row is empty
#ifndef ENABLE_SECURITY
  bool is_profiling_enable = tree_->GetProfilingManager()->IsProfilingEnable();
#endif

  // First call to GetNext: launch the ExecutionTree before fetching any data.
  if (!launched_) {
    RETURN_IF_NOT_OK(Launch());
  }

  RETURN_IF_NOT_OK(tree_->root()->GetNextRow(row));  // first buf can't be eof or empty buf with none flag
  if (row->eoe()) {                                  // return empty tensor if 1st buf is a ctrl buf (no rows)
    MS_LOG(INFO) << "End of data iteration.";
#ifndef ENABLE_SECURITY
    if (is_profiling_enable) {
      tree_->SetEpochEnd();
    }
#endif
    return Status::OK();
  }
  if (row->eof()) {
    tree_->SetFinished();
    std::string err = "EOF buffer encountered. User tries to fetch data beyond the specified number of epochs.";
    RETURN_STATUS_UNEXPECTED(err);
  }

  // Record profiling info
#ifndef ENABLE_SECURITY
  if (tracing_ != nullptr) {
    uint64_t end_time = ProfilingTime::GetCurMilliSecond();
    cur_batch_num_++;
    cur_connector_size_ = tree_->root()->ConnectorSize();
    cur_connector_capacity_ = tree_->root()->ConnectorCapacity();
    RETURN_IF_NOT_OK(
      tracing_->Record(CONNECTOR_DEPTH, cur_connector_capacity_, cur_batch_num_, cur_connector_size_, end_time));
  }
#endif
  return Status::OK();
}
261
// Launches the compiled ExecutionTree and initializes the iterator tracing
// node plus connector statistics when profiling is available (non-security
// builds). Sets launched_ so GetNext() only launches once.
// @return Status The error code; OK on success.
Status TreeAdapter::Launch() {
  CHECK_FAIL_RETURN_UNEXPECTED(tree_ != nullptr, "Tree is a nullptr.");
  RETURN_IF_NOT_OK(tree_->Launch());
  launched_ = true;
  // Profiling
  std::shared_ptr<Tracing> node;
#ifndef ENABLE_SECURITY
  Status s = tree_->GetProfilingManager()->GetTracingNode(kDatasetIteratorTracingName, &node);
#else
  // Security builds have no profiling manager; proceed as if lookup succeeded.
  Status s = Status::OK();
#endif
  if (s.IsOk()) {
#ifndef ENABLE_SECURITY
    tracing_ = std::dynamic_pointer_cast<DatasetIteratorTracing>(node);
#endif
    cur_connector_size_ = tree_->root()->ConnectorSize();
    cur_connector_capacity_ = tree_->root()->ConnectorCapacity();
  }
  return Status::OK();
}
282
283 } // namespace dataset
284 } // namespace mindspore
285