• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2019-2021 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include "minddata/dataset/engine/execution_tree.h"
17 #include <iostream>
18 #include <string>
19 #include <limits>
20 #include "minddata/dataset/engine/datasetops/dataset_op.h"
21 #include "minddata/dataset/engine/datasetops/device_queue_op.h"
22 #ifndef ENABLE_SECURITY
23 #include "minddata/dataset/engine/perf/profiling.h"
24 #include "minddata/dataset/engine/perf/monitor.h"
25 #endif
26 #if defined(ENABLE_GPUQUE) || defined(ENABLE_TDTQUE)
27 #include "minddata/dataset/util/numa_interface.h"
28 #endif
29 #include "minddata/dataset/util/task_manager.h"
30 
31 namespace mindspore {
32 namespace dataset {
33 // Constructor
ExecutionTree()34 ExecutionTree::ExecutionTree() : id_count_(0), tree_state_(kDeTStateInit) {
35   tg_ = std::make_unique<TaskGroup>();
36   root_ = nullptr;
37   prepare_flags_ = 0;
38 #ifndef ENABLE_SECURITY
39   profiling_manager_ = std::make_unique<ProfilingManager>(this);
40 #endif
41 #if defined(ENABLE_GPUQUE) || defined(ENABLE_TDTQUE)
42   std::shared_ptr<ConfigManager> cfg = GlobalContext::config_manager();
43   rank_id_ = cfg->rank_id();
44   numa_enable_ = cfg->numa_enable();
45   handle_ = nullptr;
46 #endif
47 }
48 
49 // Destructor
~ExecutionTree()50 ExecutionTree::~ExecutionTree() {
51 #if defined(ENABLE_GPUQUE) || defined(ENABLE_TDTQUE)
52   if (numa_enable_) {
53     if (handle_ != nullptr) {
54       ReleaseLibrary(handle_);
55     }
56   }
57 #if defined(ENABLE_TDTQUE)
58   DeviceQueueOp *op = dynamic_cast<DeviceQueueOp *>(root_.get());
59   if (op != nullptr) {
60     op->StopWaiting();
61   }
62 #endif
63 #endif
64   (void)tg_->ServiceStop();
65 }
66 
67 // Associates a DatasetOp with this tree. This assigns a valid node id to the operator and
68 // provides it with a link to the tree. A node cannot form any relationships (parent/child) with
69 // other nodes unless they are associated with the same tree.
AssociateNode(const std::shared_ptr<DatasetOp> & op)70 Status ExecutionTree::AssociateNode(const std::shared_ptr<DatasetOp> &op) {
71   RETURN_UNEXPECTED_IF_NULL(op);
72   // If we are already a part of the tree, no-op
73   if (op->tree_ == this) {
74     return Status::OK();
75   }
76   if (tree_state_ != kDeTStateInit && tree_state_ != kDeTStateBuilding) {
77     std::string err_msg =
78       "Invalid tree state for adding a node. Current state: " + std::to_string(static_cast<int>(tree_state_)) +
79       " Expected states: " + std::to_string(static_cast<int>(kDeTStateInit)) + " or " +
80       std::to_string(static_cast<int>(kDeTStateBuilding));
81     RETURN_STATUS_UNEXPECTED(err_msg);
82   }
83 
84   // Enter the building state if we were not already there
85   tree_state_ = kDeTStateBuilding;
86 
87   // Assign an id to the operator
88   op->SetId(id_count_);
89   id_count_++;
90 
91   // Assign our tree into the op so that each op has a link back to the tree
92   op->set_tree(this);
93   return Status::OK();
94 }
95 
96 // Sets the root node of the tree
AssignRoot(const std::shared_ptr<DatasetOp> & op)97 Status ExecutionTree::AssignRoot(const std::shared_ptr<DatasetOp> &op) {
98   RETURN_UNEXPECTED_IF_NULL(op);
99   // Tree must be in building state before we can assign root to it
100   if (tree_state_ != kDeTStateBuilding) {
101     std::string err_msg =
102       "Invalid tree state for assigning a root node. Current state: " + std::to_string(static_cast<int>(tree_state_)) +
103       " Expected state: " + std::to_string(static_cast<int>(kDeTStateBuilding));
104     RETURN_STATUS_UNEXPECTED(err_msg);
105   }
106 
107   // If they didn't already call AssociateNode for this node before calling AssignRoot,
108   // then do so now.
109   if (op->operator_id_ == DatasetOp::kInvalidOperatorId) {
110     RETURN_IF_NOT_OK(this->AssociateNode(op));
111   }
112 
113   // Then add it as the root.
114   root_ = op;
115 
116   return Status::OK();
117 }
118 
119 // A print method typically used for debugging
Print(std::ostream & out,const std::shared_ptr<DatasetOp> & op) const120 void ExecutionTree::Print(std::ostream &out, const std::shared_ptr<DatasetOp> &op) const {
121   out << "Execution tree summary:\n"
122       << "-----------------------\n";
123   this->PrintNode(out, op == nullptr ? root_ : op, "", true, false);
124   out << "\nExecution tree operator details:\n"
125       << "--------------------------------\n";
126   this->PrintNode(out, op == nullptr ? root_ : op, "", true, true);
127 }
128 
129 // A helper functions for doing the recursive printing
PrintNode(std::ostream & out,const std::shared_ptr<DatasetOp> & dataset_op,std::string indent,bool last,bool detailed) const130 void ExecutionTree::PrintNode(std::ostream &out, const std::shared_ptr<DatasetOp> &dataset_op, std::string indent,
131                               bool last, bool detailed) const {
132   if (dataset_op == nullptr) {
133     return;
134   }
135   // Decide which printer to use based on detailed arg.
136   if (!detailed) {
137     out << indent << "+- " << *dataset_op;
138     indent += (last ? "    " : "|   ");
139   } else {
140     dataset_op->Print(out, detailed);
141   }
142 
143   // Descend to children
144   for (size_t i = 0; i < dataset_op->child_.size(); ++i) {
145     this->PrintNode(out, dataset_op->child_[i], indent, (i == (dataset_op->child_.size() - 1)), detailed);
146   }
147 }
148 
149 // Start the execution of the tree
Launch()150 Status ExecutionTree::Launch() {
151   // opencv limit too many threads
152 #if !defined(_WIN32) && !defined(_WIN64) && !defined(__APPLE__) && !defined(ENABLE_ANDROID)
153 #if defined(ENABLE_GPUQUE) || defined(ENABLE_TDTQUE)
154   // Here we do numa bind for performance optimization, as our test result,
155   // if we do numa bind when get_dataset_size launch a tree, we'll get a
156   // better performance than only we do numa bind at the time _To_Device
157   // launch a tree. Our numa bind work is a process level bind, bind with
158   // both cpu and memory and we choose numa_node with a polling logic:
159   // numa_bind_id = rank_id_ % (numa_max_node() + 1)
160   // Now we only support GPU scenario and the single process scenario of Ascend,
161   // now we remove the target_link of numa with _c_dataengine, and user can use
162   // a config api to control whether to open numa feature.
163   if (numa_enable_ && rank_id_ >= 0) {
164     if (handle_ == nullptr) {
165       handle_ = GetNumaAdapterHandle();
166       if (handle_ == nullptr) {
167         RETURN_STATUS_UNEXPECTED("Numa package (libnuma.so) not found.");
168       }
169     }
170     RETURN_IF_NOT_OK(NumaBind(handle_, rank_id_));
171     MS_LOG(INFO) << "Numa bind memory and cpu successful.";
172   }
173 #endif
174   int32_t thread_num = get_nprocs();
175   if (thread_num == 0) {
176     std::string err_msg = "Invalid thread number, got 0.";
177     RETURN_STATUS_UNEXPECTED(err_msg);
178   }
179   constexpr int32_t max_cv_threads_cnt = 8;
180   cv::setNumThreads(thread_num > max_cv_threads_cnt ? max_cv_threads_cnt : thread_num);
181 #endif
182 
183   // Tree must be built and prepared before it can be launched!
184   if (tree_state_ != kDeTStatePrepared) {
185     std::string err_msg =
186       "Invalid tree state for launching tree. Current state: " + std::to_string(static_cast<int>(tree_state_)) +
187       " Expected state: " + std::to_string(static_cast<int>(kDeTStatePrepared));
188     RETURN_STATUS_UNEXPECTED(err_msg);
189   }
190 
191   // Profiling infrastructures need to be initialized before Op launching
192 #ifndef ENABLE_SECURITY
193   if (profiling_manager_->IsProfilingEnable()) {
194     // Setup profiling manager
195     RETURN_IF_NOT_OK(profiling_manager_->Initialize());
196     // Launch Monitor Thread
197     RETURN_IF_NOT_OK(profiling_manager_->LaunchMonitor());
198   }
199 #endif
200 
201   std::ostringstream ss;
202   ss << *this;
203   MS_LOG(DEBUG) << "Printing the tree before launch tasks:\n" << ss.str();
204   for (auto itr = this->begin(); itr != this->end(); ++itr) {
205     // An inlined operator is one that has an output connector size of 0, and it does not
206     // require a thread to execute.  Instead, the work of this operator is executed inlined
207     // from the tree node directly above it (or in the case of a root node, it runs from within
208     // the launching tree/user thread.  Do not exec any thread for an inlined op.
209     itr->state_ = DatasetOp::OpState::kDeOpRunning;
210     if (!itr->inlined()) {
211       RETURN_IF_NOT_OK(tg_->CreateAsyncTask(itr->NameWithID(), std::ref(*itr), nullptr, itr->id()));
212       // Set the state of the Operator as running. This only matters in Leaf ops, CacheOp and TakeOp
213     }
214   }
215 
216   tree_state_ = kDeTStateExecuting;
217 
218   return Status::OK();
219 }
220 
221 // A function that traverse the tree in postorder then save the results in nodes
PostOrderTraverse(const std::shared_ptr<DatasetOp> & node)222 void ExecutionTree::Iterator::PostOrderTraverse(const std::shared_ptr<DatasetOp> &node) {
223   if (node == nullptr) {
224     return;
225   }
226   for (int32_t i = 0; i < node->child_.size(); ++i) {
227     PostOrderTraverse(node->child_[i]);
228   }
229   nodes_.push_back(node);
230 }
231 
Iterator(const std::shared_ptr<DatasetOp> & root)232 ExecutionTree::Iterator::Iterator(const std::shared_ptr<DatasetOp> &root) : ind_(0) {
233   // post-order traverse the tree, if root is null, it return
234   PostOrderTraverse(root);
235   nodes_.emplace_back(nullptr);
236 }
237 
238 // Given the number of workers, launches the worker entry function for each. Essentially a
239 // wrapper for the TaskGroup handling that is stored inside the execution tree.
LaunchWorkers(int32_t num_workers,std::function<Status (uint32_t)> func,std::string name,int32_t operator_id)240 Status ExecutionTree::LaunchWorkers(int32_t num_workers, std::function<Status(uint32_t)> func, std::string name,
241                                     int32_t operator_id) {
242   int32_t num_cpu_threads = GlobalContext::Instance()->config_manager()->num_cpu_threads();
243   // this performs check that num_workers is positive and not unreasonably large which could happen
244   // for example, un-initialized variable. uint16 max is 65536 which is large enough to cover everything
245   CHECK_FAIL_RETURN_UNEXPECTED(num_workers > 0 && num_workers < std::numeric_limits<uint16_t>::max(),
246                                name + "'s num_worker=" + std::to_string(num_workers) + ", is negative or too large.");
247   // Launch the workers
248   if (num_workers > num_cpu_threads) {
249     MS_LOG(WARNING) << name + " is launched with " << std::to_string(num_workers) << " worker threads which exceeds "
250                     << std::to_string(num_cpu_threads) << ", the maximum number of threads on this CPU.";
251   }
252   for (int32_t i = 0; i < num_workers; ++i) {
253     RETURN_IF_NOT_OK(tg_->CreateAsyncTask(name, std::bind(func, i), nullptr, operator_id));
254   }
255   return Status::OK();
256 }
257 
258 // Walks the tree to perform modifications to the tree in post-order to get it ready for execution.
Prepare()259 Status ExecutionTree::Prepare() {
260   if (root_ == nullptr) {
261     RETURN_STATUS_UNEXPECTED("Please assign one operator as the root of this tree.");
262   }
263 
264   std::vector<std::shared_ptr<DatasetOp>> fifo;
265   std::shared_ptr<DatasetOp> op = root_;
266   size_t index = 0;
267 
268   // Build a FIFO queue with the root at the beginning and continue adding its descendants to the queue.
269   fifo.push_back(op);
270   do {
271     op = fifo[index];
272     fifo.insert(fifo.end(), op->child_.begin(), op->child_.end());
273     ++index;
274   } while (index < fifo.size());
275 
276   // By iterating from the end of the FIFO queue, we simulate the post-order walk.
277   for (auto rit = fifo.crbegin(); rit != fifo.crend(); ++rit) {
278     RETURN_IF_NOT_OK((*rit)->PrepareOperator());
279   }
280 
281   // The tree is prepared.
282   tree_state_ = kDeTStatePrepared;
283   return Status::OK();
284 }
285 }  // namespace dataset
286 }  // namespace mindspore
287