• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2019-2022 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include "minddata/dataset/engine/execution_tree.h"
17 
18 #include <iostream>
19 #include <limits>
20 #include <string>
21 
22 #include "minddata/dataset/engine/datasetops/data_queue_op.h"
23 #include "minddata/dataset/engine/datasetops/dataset_op.h"
24 #include "minddata/dataset/engine/perf/info_collector.h"
25 #include "minddata/dataset/util/task_manager.h"
26 #ifdef WITH_BACKEND
27 #include "mindspore/core/utils/numa_interface.h"
28 #include "utils/ms_context.h"
29 #endif
30 
31 namespace mindspore {
32 namespace dataset {
33 // Constructor
34 #ifdef WITH_BACKEND
ExecutionTree()35 ExecutionTree::ExecutionTree() : ExecutionTree(GlobalContext::config_manager()) {}
36 
ExecutionTree(std::shared_ptr<ConfigManager> cfg)37 ExecutionTree::ExecutionTree(std::shared_ptr<ConfigManager> cfg)
38     : rank_id_(cfg->rank_id()),
39       numa_enable_(cfg->numa_enable()),
40       handle_(nullptr),
41       id_count_(0),
42       tree_state_(kDeTStateInit) {
43   tg_ = std::make_unique<TaskGroup>();
44   root_ = nullptr;
45   unique_id_ = Services::GetUniqueID();
46 }
47 #else
48 ExecutionTree::ExecutionTree() : id_count_(0), tree_state_(kDeTStateInit) {
49   tg_ = std::make_unique<TaskGroup>();
50   root_ = nullptr;
51   unique_id_ = Services::GetUniqueID();
52 }
53 #endif
54 
55 // Destructor
~ExecutionTree()56 ExecutionTree::~ExecutionTree() {
57 #ifdef WITH_BACKEND
58   if (numa_enable_) {
59     handle_ = nullptr;
60   }
61   DataQueueOp *op = dynamic_cast<DataQueueOp *>(root_.get());
62   if (op != nullptr) {
63     op->StopWaiting();
64   }
65 #endif
66   (void)tg_->ServiceStop();
67 }
68 
69 // Associates a DatasetOp with this tree. This assigns a valid node id to the operator and
70 // provides it with a link to the tree. A node cannot form any relationships (parent/child) with
71 // other nodes unless they are associated with the same tree.
AssociateNode(const std::shared_ptr<DatasetOp> & op)72 Status ExecutionTree::AssociateNode(const std::shared_ptr<DatasetOp> &op) {
73   RETURN_UNEXPECTED_IF_NULL(op);
74   // If we are already a part of the tree, no-op
75   if (op->tree_ == this) {
76     return Status::OK();
77   }
78   if (tree_state_ != kDeTStateInit && tree_state_ != kDeTStateBuilding) {
79     std::string err_msg =
80       "Invalid tree state for adding a node. Current state: " + std::to_string(static_cast<int>(tree_state_)) +
81       " Expected states: " + std::to_string(static_cast<int>(kDeTStateInit)) + " or " +
82       std::to_string(static_cast<int>(kDeTStateBuilding));
83     RETURN_STATUS_UNEXPECTED(err_msg);
84   }
85 
86   // Enter the building state if we were not already there
87   tree_state_ = kDeTStateBuilding;
88 
89   // Assign an id to the operator
90   op->SetId(id_count_);
91   id_count_++;
92 
93   // Assign our tree into the op so that each op has a link back to the tree
94   op->set_tree(this);
95   return Status::OK();
96 }
97 
98 // Sets the root node of the tree
AssignRoot(const std::shared_ptr<DatasetOp> & op)99 Status ExecutionTree::AssignRoot(const std::shared_ptr<DatasetOp> &op) {
100   RETURN_UNEXPECTED_IF_NULL(op);
101   // Tree must be in building state before we can assign root to it
102   if (tree_state_ != kDeTStateBuilding) {
103     std::string err_msg =
104       "Invalid tree state for assigning a root node. Current state: " + std::to_string(static_cast<int>(tree_state_)) +
105       " Expected state: " + std::to_string(static_cast<int>(kDeTStateBuilding));
106     RETURN_STATUS_UNEXPECTED(err_msg);
107   }
108 
109   // If they didn't already call AssociateNode for this node before calling AssignRoot,
110   // then do so now.
111   if (op->operator_id_ == DatasetOp::kInvalidOperatorId) {
112     RETURN_IF_NOT_OK(this->AssociateNode(op));
113   }
114 
115   // Then add it as the root.
116   root_ = op;
117 
118   return Status::OK();
119 }
120 
121 // A print method typically used for debugging
Print(std::ostream & out,const std::shared_ptr<DatasetOp> & op) const122 void ExecutionTree::Print(std::ostream &out, const std::shared_ptr<DatasetOp> &op) const {
123   out << "Execution tree summary:\n"
124       << "-----------------------\n";
125   this->PrintNode(out, op == nullptr ? root_ : op, "", true, false);
126   out << "\nExecution tree operator details:\n"
127       << "--------------------------------\n";
128   this->PrintNode(out, op == nullptr ? root_ : op, "", true, true);
129 }
130 
131 // A helper functions for doing the recursive printing
PrintNode(std::ostream & out,const std::shared_ptr<DatasetOp> & dataset_op,std::string indent,bool last,bool detailed) const132 void ExecutionTree::PrintNode(std::ostream &out, const std::shared_ptr<DatasetOp> &dataset_op, std::string indent,
133                               bool last, bool detailed) const {
134   if (dataset_op == nullptr) {
135     return;
136   }
137   // Decide which printer to use based on detailed arg.
138   if (!detailed) {
139     out << indent << "+- " << *dataset_op;
140     indent += (last ? "    " : "|   ");
141   } else {
142     dataset_op->Print(out, detailed);
143   }
144 
145   // Descend to children
146   for (size_t i = 0; i < dataset_op->child_.size(); ++i) {
147     this->PrintNode(out, dataset_op->child_[i], indent, (i == (dataset_op->child_.size() - 1)), detailed);
148   }
149 }
150 
151 // Start the execution of the tree
Launch()152 Status ExecutionTree::Launch() {
153   RETURN_IF_NOT_OK(CollectPipelineInfoStart("Pipeline", "Launch"));
154   // opencv limit too many threads
155 #if !defined(_WIN32) && !defined(_WIN64) && !defined(__APPLE__) && !defined(ENABLE_ANDROID)
156 #ifdef WITH_BACKEND
157   // Here we do numa bind for performance optimization, as our test result,
158   // if we do numa bind when get_dataset_size launch a tree, we'll get a
159   // better performance than only we do numa bind at the time _To_Device
160   // launch a tree. Our numa bind work is a process level bind, bind with
161   // both cpu and memory and we choose numa_node with a polling logic:
162   // numa_bind_id = rank_id_ % (numa_max_node() + 1)
163   // Now we only support GPU scenario and the single process scenario of Ascend,
164   // now we remove the target_link of numa with _c_dataengine, and user can use
165   // a config api to control whether to open numa feature.
166   if (numa_enable_ && rank_id_ >= 0) {
167     if (handle_ == nullptr) {
168       handle_ = GetNumaAdapterHandle();
169       if (handle_ == nullptr) {
170         RETURN_STATUS_UNEXPECTED("Numa package (libnuma.so) not found.");
171       }
172     }
173     RETURN_IF_NOT_OK(NumaBind(handle_.get(), rank_id_));
174     MS_LOG(INFO) << "Numa bind memory and cpu successful.";
175   }
176 #endif
177 #endif
178 
179   // Tree must be built and prepared before it can be launched!
180   if (tree_state_ != kDeTStatePrepared) {
181     std::string err_msg =
182       "Invalid tree state for launching tree. Current state: " + std::to_string(static_cast<int>(tree_state_)) +
183       " Expected state: " + std::to_string(static_cast<int>(kDeTStatePrepared));
184     RETURN_STATUS_UNEXPECTED(err_msg);
185   }
186 
187   std::ostringstream ss;
188   ss << *this;
189   MS_LOG(INFO) << "Printing the tree before launch tasks:\n" << ss.str();
190   for (auto itr = this->begin(); itr != this->end(); ++itr) {
191     // An inlined operator is one that has an output connector size of 0, and it does not
192     // require a thread to execute.  Instead, the work of this operator is executed inlined
193     // from the tree node directly above it (or in the case of a root node, it runs from within
194     // the launching tree/user thread.  Do not exec any thread for an inlined op.
195     // Set the state of the Operator as running. This only matters in Leaf ops, CacheOp and TakeOp
196     itr->state_ = DatasetOp::OpState::kDeOpRunning;
197     itr->Launch();
198     if (!itr->inlined()) {
199       RETURN_IF_NOT_OK(tg_->CreateAsyncTask(itr->NameWithID(), std::ref(*itr), nullptr, itr->id()));
200       // Set if this task group has data queue op
201       if (itr->Name() == kDeviceQueueOp) {
202         tg_->HasDataQueue(true);
203       }
204     }
205   }
206 
207   tree_state_ = kDeTStateExecuting;
208   RETURN_IF_NOT_OK(CollectPipelineInfoEnd("Pipeline", "Launch"));
209   return Status::OK();
210 }
211 
212 // A function that traverse the tree in postorder then save the results in nodes
PostOrderTraverse(const std::shared_ptr<DatasetOp> & node)213 void ExecutionTree::Iterator::PostOrderTraverse(const std::shared_ptr<DatasetOp> &node) {
214   if (node == nullptr) {
215     return;
216   }
217   for (int32_t i = 0; i < node->child_.size(); ++i) {
218     PostOrderTraverse(node->child_[i]);
219   }
220   nodes_.push_back(node);
221 }
222 
Iterator(const std::shared_ptr<DatasetOp> & root)223 ExecutionTree::Iterator::Iterator(const std::shared_ptr<DatasetOp> &root) : ind_(0) {
224   // post-order traverse the tree, if root is null, it return
225   PostOrderTraverse(root);
226   (void)nodes_.emplace_back(nullptr);
227 }
228 
229 // Given the number of workers, launch the worker entry function for each worker. This is essentially a
230 // wrapper for the TaskGroup handling that is stored inside the execution tree.
LaunchWorkers(int32_t num_workers,std::function<Status (uint32_t)> func,std::vector<Task * > * worker_tasks,std::string name,int32_t operator_id)231 Status ExecutionTree::LaunchWorkers(int32_t num_workers, std::function<Status(uint32_t)> func,
232                                     std::vector<Task *> *worker_tasks, std::string name, int32_t operator_id) {
233   int32_t num_cpu_threads = GlobalContext::Instance()->config_manager()->num_cpu_threads();
234   // this performs check that num_workers is positive and not unreasonably large which could happen
235   // for example, un-initialized variable. uint16 max is 65536 which is large enough to cover everything
236   CHECK_FAIL_RETURN_UNEXPECTED(num_workers > 0 && num_workers < std::numeric_limits<uint16_t>::max(),
237                                name + "'s num_worker=" + std::to_string(num_workers) + ", is negative or too large.");
238   // Launch the workers
239   if (num_workers > num_cpu_threads) {
240     MS_LOG(WARNING) << name << " is launched with " << std::to_string(num_workers) << " worker threads which exceeds "
241                     << std::to_string(num_cpu_threads) << ", the maximum number of threads on this CPU.";
242   }
243   worker_tasks->resize(num_workers);
244   for (size_t i = 0; i < num_workers; ++i) {
245     Task *task = nullptr;
246     RETURN_IF_NOT_OK(tg_->CreateAsyncTask(name, std::bind(func, i), &task, operator_id));
247     CHECK_FAIL_RETURN_UNEXPECTED(task != nullptr, "Failed to create a new worker");
248     (*worker_tasks)[i] = task;
249   }
250   return Status::OK();
251 }
252 
253 // Given the number of workers, launches the worker entry function for each. Essentially a
254 // wrapper for the TaskGroup handling that is stored inside the execution tree.
LaunchWorkers(int32_t num_workers,std::function<Status (uint32_t)> func,std::string name,int32_t operator_id)255 Status ExecutionTree::LaunchWorkers(int32_t num_workers, std::function<Status(uint32_t)> func, std::string name,
256                                     int32_t operator_id) {
257   std::vector<Task *> tasks;
258   return LaunchWorkers(num_workers, func, &tasks, name, operator_id);
259 }
260 
261 // Walks the tree to perform modifications to the tree in post-order to get it ready for execution.
Prepare(bool is_pull_mode)262 Status ExecutionTree::Prepare(bool is_pull_mode) {
263   if (root_ == nullptr) {
264     RETURN_STATUS_UNEXPECTED("Please assign one operator as the root of this tree.");
265   }
266 
267   std::vector<std::shared_ptr<DatasetOp>> fifo;
268   std::shared_ptr<DatasetOp> op = root_;
269   size_t index = 0;
270 
271   // Build a FIFO queue with the root at the beginning and continue adding its descendants to the queue.
272   fifo.push_back(op);
273   do {
274     op = fifo[index];
275     fifo.insert(fifo.end(), op->child_.begin(), op->child_.end());
276     ++index;
277   } while (index < fifo.size());
278 
279   // By iterating from the end of the FIFO queue, we simulate the post-order walk.
280   for (auto rit = fifo.crbegin(); rit != fifo.crend(); ++rit) {
281     if (!is_pull_mode) {
282       RETURN_IF_NOT_OK((*rit)->PrepareOperator());
283     } else {
284       RETURN_IF_NOT_OK((*rit)->PrepareOperatorPullBased());
285     }
286   }
287 
288   // The tree is prepared.
289   tree_state_ = kDeTStatePrepared;
290   return Status::OK();
291 }
292 }  // namespace dataset
293 }  // namespace mindspore
294