/**
 * Copyright 2020-2021 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "minddata/dataset/engine/tree_adapter.h"

#include "minddata/dataset/core/client.h"
#include "minddata/dataset/engine/ir/datasetops/root_node.h"
#ifndef ENABLE_ANDROID
#include "minddata/dataset/engine/opt/optional/tensor_op_fusion_pass.h"
#include "minddata/dataset/engine/opt/pre/cache_transform_pass.h"
#include "minddata/dataset/engine/opt/post/repeat_pass.h"
#endif
#include "minddata/dataset/engine/opt/pass.h"
#include "minddata/dataset/engine/opt/post/auto_worker_pass.h"
#ifdef ENABLE_PYTHON
#include "minddata/dataset/engine/opt/post/generator_node_pass.h"
#endif
#include "minddata/dataset/engine/opt/pre/cache_validation_pass.h"
#include "minddata/dataset/engine/opt/pre/deep_copy_pass.h"
#include "minddata/dataset/engine/opt/pre/epoch_ctrl_pass.h"
#include "minddata/dataset/engine/opt/pre/getter_pass.h"
#include "minddata/dataset/engine/opt/pre/input_validation_pass.h"
#include "minddata/dataset/engine/opt/pre/node_removal_pass.h"

namespace mindspore {
namespace dataset {

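// Constructor: record the usage flag (iterator vs. getter), decide via the OPTIMIZE environment
// variable whether the optional optimization pass runs, and reset the profiling counters.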
TreeAdapter::TreeAdapter(UsageFlag usage) : usage_(usage), launched_(false), tree_state_(kCompileStateInit) {
  optimize_ = common::GetEnv("OPTIMIZE") == "true";

  // Initialize profiling parameters
  cur_batch_num_ = 0;
  cur_connector_size_ = 0;
  cur_connector_capacity_ = 0;
}

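// PrePass: mandatory transformations applied to the IR tree before the optional optimization
// pass: input and cache validation, node removal, epoch-control injection, getter handling,
// and (on non-Android builds) the cache transformation.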
Status TreeAdapter::PrePass(std::shared_ptr<DatasetNode> ir) {
  RETURN_UNEXPECTED_IF_NULL(ir);
  // Vector of actions in pre-pass phase
  std::vector<std::unique_ptr<IRPass>> actions;

  MS_LOG(INFO) << "Running pre pass loops.";
  actions.emplace_back(std::make_unique<InputValidationPass>());
  actions.emplace_back(std::make_unique<CacheValidationPass>());
  actions.emplace_back(std::make_unique<NodeRemovalPass>());
  actions.emplace_back(std::make_unique<EpochCtrlPass>());
  if (usage_ == kDeGetter) actions.emplace_back(std::make_unique<GetterPass>());
#ifndef ENABLE_ANDROID
  actions.emplace_back(std::make_unique<CacheTransformPass>());
#endif
  // Vector of flags for each action
  std::vector<bool> modified(actions.size(), false);
  // Apply pre-pass actions
  for (size_t i = 0; i < actions.size(); i++) {
    auto m = false;
    RETURN_IF_NOT_OK(actions[i]->Run(ir, &m));
    modified[i] = m;
  }
  MS_LOG(INFO) << "Pre pass complete.";
  return Status::OK();
}

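// Optimize: optional IR optimizations, run only when optimize_ is true (i.e. the OPTIMIZE
// environment variable is set to "true"); currently limited to tensor-operation fusion on
// non-Android builds.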
Status TreeAdapter::Optimize(std::shared_ptr<DatasetNode> ir) {
  RETURN_UNEXPECTED_IF_NULL(ir);
  // Vector of optimizations
  std::vector<std::unique_ptr<IRNodePass>> optimizations;
  MS_LOG(INFO) << "Running optimization pass loops.";
#ifndef ENABLE_ANDROID
  optimizations.emplace_back(std::make_unique<TensorOpFusionPass>());
#endif
  // Apply optimization pass actions
  for (size_t i = 0; i < optimizations.size(); i++) {
    bool modified = false;
    RETURN_IF_NOT_OK(optimizations[i]->Run(ir, &modified));
  }
  MS_LOG(INFO) << "Optimization pass complete.";
  return Status::OK();
}

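// PostPass: transformations applied after optimization: automatic worker assignment (iterator
// usage only), generator handling on Python builds, and repeat handling on non-Android builds.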
Status TreeAdapter::PostPass(std::shared_ptr<DatasetNode> ir) {
  RETURN_UNEXPECTED_IF_NULL(ir);
  // Vector of actions in post-pass phase
  std::vector<std::unique_ptr<IRPass>> actions;
  MS_LOG(INFO) << "Running post pass loops.";

  // AutoWorkerPass should ideally precede CacheTransformPass to avoid complications with the setting
  if (GlobalContext::config_manager()->auto_num_workers() && usage_ == kDeIterator) {
    // skip this for getter pass
    actions.emplace_back(std::make_unique<AutoWorkerPass>());
  }
#ifdef ENABLE_PYTHON
  actions.emplace_back(std::make_unique<GeneratorNodePass>());
#endif
#ifndef ENABLE_ANDROID
  actions.emplace_back(std::make_unique<RepeatPass>());
#endif
  // We will gradually move RepeatPass from ExecutionTree::PrepareTreePostAction to here.

  // Vector of flags for each action
  std::vector<bool> modified(actions.size(), false);
  for (size_t i = 0; i < actions.size(); i++) {
    auto m = false;
    RETURN_IF_NOT_OK(actions[i]->Run(ir, &m));
    modified[i] = m;
  }
  MS_LOG(INFO) << "Post passes complete.";
  return Status::OK();
}

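// BuildExecutionTreeRecur: recursively convert an IR node and its children into DatasetOps.
// A single IR node may build into a chain of ops; the first op of the chain is returned through
// *op so the caller can attach it as a child, and each child IR node is attached under the last
// op of the chain.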
Status TreeAdapter::BuildExecutionTreeRecur(std::shared_ptr<DatasetNode> ir, std::shared_ptr<DatasetOp> *const op) {
  RETURN_UNEXPECTED_IF_NULL(ir);
  RETURN_UNEXPECTED_IF_NULL(op);
  RETURN_UNEXPECTED_IF_NULL(tree_);
  // Build the DatasetOp ExecutionTree from the optimized IR tree
  std::vector<std::shared_ptr<DatasetOp>> ops;
  RETURN_IF_NOT_OK(ir->Build(&ops));

  CHECK_FAIL_RETURN_UNEXPECTED(!ops.empty(), "Unable to build node: " + ir->Name());

  (*op) = ops.front();  // return the first op to be added as child by the caller of this function
  RETURN_IF_NOT_OK(tree_->AssociateNode(*op));

  for (size_t i = 1; i < ops.size(); i++) {
    RETURN_IF_NOT_OK(tree_->AssociateNode(ops[i]));
    RETURN_IF_NOT_OK(ops[i - 1]->AddChild(ops[i]));
  }

  // Build the children of this IR node and attach each resulting op under the last op built above
  for (const std::shared_ptr<DatasetNode> &child_ir : ir->Children()) {
    std::shared_ptr<DatasetOp> child_op;
    RETURN_IF_NOT_OK(BuildExecutionTreeRecur(child_ir, &child_op));
    RETURN_IF_NOT_OK(ops.back()->AddChild(child_op));  // append children to the last of ops
  }

  return Status::OK();
}

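// Build: create a fresh ExecutionTree from the compiled IR, prepare it, and cache the
// column-name-to-id map of the root op.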
Status TreeAdapter::Build(std::shared_ptr<DatasetNode> root_ir) {
  RETURN_UNEXPECTED_IF_NULL(root_ir);
  // This will evolve in the long run
  tree_ = std::make_unique<ExecutionTree>();
  // disable profiling if this is only a getter pass
#ifndef ENABLE_SECURITY
  if (usage_ == kDeGetter) tree_->GetProfilingManager()->DisableProfiling();
#endif
  // Build the ExecutionTree from the child of the IR root node, which represents the root of the input IR tree
  std::shared_ptr<DatasetOp> root_op;
  RETURN_IF_NOT_OK(BuildExecutionTreeRecur(root_ir->Children()[0], &root_op));
  RETURN_IF_NOT_OK(tree_->AssignRoot(root_op));

  // Note: The pre pass, optimizer pass, and post pass performed on the ExecutionTree will
  //       gradually be moved to operate on the IR tree instead.
  // Prepare the tree
  RETURN_IF_NOT_OK(tree_->Prepare());

  // After the tree is prepared, the col_name_id_map can safely be obtained
  column_name_map_ = tree_->root()->column_name_id_map();

  return Status::OK();
}

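// Compile: clone the user's IR graph into a tree rooted at a RootNode, run the pre-pass, the
// optional optimization pass, and the post-pass, then build and prepare the ExecutionTree.
// A rough usage sketch (illustrative only; the exact enum scoping and surrounding error
// handling are assumptions, not shown in this file):
//   TreeAdapter adapter(TreeAdapter::UsageFlag::kDeIterator);
//   RETURN_IF_NOT_OK(adapter.Compile(dataset_ir, /*num_epochs=*/1));
//   TensorRow row;
//   RETURN_IF_NOT_OK(adapter.GetNext(&row));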
Status TreeAdapter::Compile(std::shared_ptr<DatasetNode> input_ir, int32_t num_epochs) {
  RETURN_UNEXPECTED_IF_NULL(input_ir);

  tree_state_ = kCompileStateIRGraphBuilt;
  MS_LOG(INFO) << "Input plan:" << '\n' << *input_ir << '\n';

  // Clone the input IR tree and insert it under a new root node that hosts the copy.
  // This is done so that compilation can process and modify the tree
  // without changing the tree associated with the user code.
  // The tree from the user code is permitted to form a graph where a node
  // is consumed by more than one parent. However, this cloning process
  // breaks the graph into a tree by copying each consumption of a node into a new copy.
  bool m = false;
  DeepCopyPass cloning_tree;
  RETURN_IF_NOT_OK(cloning_tree.Run(input_ir, &m));
  std::shared_ptr<RootNode> root_ir = cloning_tree.Root();
  root_ir->SetNumEpochs(num_epochs);

  tree_state_ = kCompileStateIRTreeCloned;
  MS_LOG(INFO) << "Plan before optimization:" << '\n' << *root_ir << '\n';

  // Pre-pass of the IR tree
  RETURN_IF_NOT_OK(PrePass(root_ir));

  // Optional phase of optimization
  if (optimize_) {
    RETURN_IF_NOT_OK(Optimize(root_ir));
  }

  // Post-pass of the IR tree
  RETURN_IF_NOT_OK(PostPass(root_ir));

  tree_state_ = kCompileStateOptimized;
  MS_LOG(INFO) << "Plan after optimization:" << '\n' << *root_ir << '\n';
  // Remember the root node
  root_ir_ = root_ir;

  RETURN_IF_NOT_OK(Build(root_ir_));
  tree_state_ = kCompileStateReady;

  return Status::OK();
}

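// GetNext: fetch the next row from the root op, launching the ExecutionTree on the first call.
// An EOE row marks the end of an epoch and is returned as an empty row; an EOF row means data
// was requested beyond the specified number of epochs and is reported as an error.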
Status TreeAdapter::GetNext(TensorRow *row) {
  RETURN_UNEXPECTED_IF_NULL(tree_);
  RETURN_UNEXPECTED_IF_NULL(row);
  row->clear();  // make sure row is empty
#ifndef ENABLE_SECURITY
  bool is_profiling_enable = tree_->GetProfilingManager()->IsProfilingEnable();
#endif

  // Launch the ExecutionTree on the first call to GetNext
  if (!launched_) {
    RETURN_IF_NOT_OK(Launch());
  }

  RETURN_IF_NOT_OK(tree_->root()->GetNextRow(row));  // the first row can't be an EOF
  if (row->eoe()) {                                  // return an empty row if the first row is a control (EOE) row
    MS_LOG(INFO) << "End of data iteration.";
#ifndef ENABLE_SECURITY
    if (is_profiling_enable) {
      tree_->SetEpochEnd();
    }
#endif
    return Status::OK();
  }
  if (row->eof()) {
    tree_->SetFinished();
    std::string err = "EOF buffer encountered. The user tried to fetch data beyond the specified number of epochs.";
    RETURN_STATUS_UNEXPECTED(err);
  }

  // Record profiling info
#ifndef ENABLE_SECURITY
  if (tracing_ != nullptr) {
    uint64_t end_time = ProfilingTime::GetCurMilliSecond();
    cur_batch_num_++;
    cur_connector_size_ = tree_->root()->ConnectorSize();
    cur_connector_capacity_ = tree_->root()->ConnectorCapacity();
    RETURN_IF_NOT_OK(
      tracing_->Record(CONNECTOR_DEPTH, cur_connector_capacity_, cur_batch_num_, cur_connector_size_, end_time));
  }
#endif
  return Status::OK();
}

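// Launch: start the ExecutionTree's worker threads and, when profiling is enabled, connect the
// dataset iterator tracing node and take an initial snapshot of the connector statistics.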
Status TreeAdapter::Launch() {
  CHECK_FAIL_RETURN_UNEXPECTED(tree_ != nullptr, "Tree is a nullptr.");
  RETURN_IF_NOT_OK(tree_->Launch());
  launched_ = true;
  // Profiling
  std::shared_ptr<Tracing> node;
#ifndef ENABLE_SECURITY
  Status s = tree_->GetProfilingManager()->GetTracingNode(kDatasetIteratorTracingName, &node);
#else
  Status s = Status::OK();
#endif
  if (s.IsOk()) {
#ifndef ENABLE_SECURITY
    tracing_ = std::dynamic_pointer_cast<DatasetIteratorTracing>(node);
#endif
    cur_connector_size_ = tree_->root()->ConnectorSize();
    cur_connector_capacity_ = tree_->root()->ConnectorCapacity();
  }
  return Status::OK();
}

}  // namespace dataset
}  // namespace mindspore