1 /** 2 * Copyright 2019-2022 Huawei Technologies Co., Ltd 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 #ifndef MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_EXECUTION_TREE_H_ 17 #define MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_EXECUTION_TREE_H_ 18 19 #include <functional> 20 #include <memory> 21 #include <stack> 22 #include <string> 23 #include <vector> 24 #ifndef ENABLE_ANDROID 25 #if !defined(_WIN32) && !defined(_WIN64) && !defined(__APPLE__) 26 #include <sys/sysinfo.h> 27 #include <opencv2/imgproc/imgproc.hpp> 28 #endif 29 #endif 30 #include "minddata/dataset/engine/datasetops/dataset_op.h" 31 #include "minddata/dataset/util/status.h" 32 #ifndef ENABLE_SECURITY 33 #include "mindspore/ccsrc/minddata/dataset/engine/perf/profiling.h" 34 #endif 35 namespace mindspore { 36 namespace dataset { 37 // Forward declares 38 class TaskGroup; 39 class DatasetOp; 40 class Pass; 41 using OptPass = std::vector<std::unique_ptr<Pass>>; 42 class ExecutionTree { 43 public: 44 // State flags for the lifecycle of the tree 45 enum TreeState { 46 kDeTStateInit = 0, // The freshly initialized state after construction 47 kDeTStateBuilding, // The tree is being built, nodes are being added 48 kDeTStatePrepared, // The tree has been prepared and is ready to be launched 49 kDeTStateExecuting, // The tree has been launched and is executing 50 kDeTStateEpochEnd, // The tree has been received end of epoch signal, just for profiling 51 kDeTStateFinished // The tree has been drained, dataset iterator received EOF 52 }; 53 54 class Iterator { 55 public: 56 // support iterator traits 57 using iterator_category = std::bidirectional_iterator_tag; 58 using value_type = DatasetOp; 59 using difference_type = ptrdiff_t; 60 using pointer = DatasetOp *; 61 using reference = DatasetOp &; 62 63 // Constructor 64 // @param root The root node to start iterating from 65 explicit Iterator(const std::shared_ptr<DatasetOp> &root = nullptr); 66 67 // Destructor ~Iterator()68 ~Iterator() {} 69 70 Iterator &operator++() { 71 ++ind_; 72 return *this; 73 } // prefix ++ overload 74 75 Iterator operator++(int) { 76 Iterator it = *this; 77 it.ind_ = ind_; 78 ind_++; 79 return it; 80 } // post-fix ++ overload 81 82 Iterator &operator--() { 83 --ind_; 84 return *this; 85 } // prefix -- overload 86 87 Iterator operator--(int) { 88 Iterator it = *this; 89 it.ind_ = ind_; 90 ind_--; 91 return it; 92 } // post-fix -- overload 93 94 DatasetOp &operator*() { return *nodes_[ind_]; } // dereference operator 95 96 std::shared_ptr<DatasetOp> operator->() { return nodes_[ind_]; } 97 98 // getter function 99 // @return Shared pointer to the current operator get()100 std::shared_ptr<DatasetOp> get() { return nodes_[ind_]; } 101 102 bool operator==(const Iterator &rhs) { return nodes_[ind_] == rhs.nodes_[rhs.ind_]; } 103 104 bool operator!=(const Iterator &rhs) { return nodes_[ind_] != rhs.nodes_[rhs.ind_]; } 105 NumNodes()106 size_t NumNodes() const { return nodes_.size(); } 107 108 private: 109 size_t ind_; // the cur node our Iterator points to 110 std::vector<std::shared_ptr<DatasetOp>> nodes_; // store the nodes in post order 111 void PostOrderTraverse(const std::shared_ptr<DatasetOp> &); 112 }; 113 114 // Constructor 115 ExecutionTree(); 116 117 // Destructor 118 ~ExecutionTree(); 119 120 /// \brief Associates a DatasetOp with this tree. This assigns a valid node id to the operator and 121 /// provides it with a link to the tree. A node cannot form any relationships (parent/child) with 122 /// other nodes unless they are associated with the same tree. 123 /// \param op - The operator to associate 124 /// \return Status The status code returned 125 Status AssociateNode(const std::shared_ptr<DatasetOp> &op); 126 127 /// \brief Set the root node of the tree 128 /// \param op - The operator to assign as root 129 /// \return Status The status code returned 130 Status AssignRoot(const std::shared_ptr<DatasetOp> &op); 131 132 /// \brief Start the execution of the tree 133 /// \return Status The status code returned 134 Status Launch(); 135 136 /// /brief A print method typically used for debugging 137 /// \param out - The output stream to write output to 138 void Print(std::ostream &out, const std::shared_ptr<DatasetOp> &op = nullptr) const; 139 140 /// \brief Return an iterator positioned at the start 141 /// \return Iterator - The iterator 142 ExecutionTree::Iterator begin(const std::shared_ptr<DatasetOp> &root = nullptr) const { 143 return Iterator(root == nullptr ? root_ : root); 144 } 145 146 /// \brief Return an iterator positioned at the end 147 /// \return Iterator - The iterator end()148 ExecutionTree::Iterator end() const { return Iterator(nullptr); } 149 150 /// \brief << Stream output operator overload 151 /// \notes This allows you to write the debug print info using stream operators 152 /// \param out - reference to the output stream being overloaded 153 /// \param exe_tree - reference to the execution tree to display 154 /// \return - the output stream must be returned 155 friend std::ostream &operator<<(std::ostream &out, const ExecutionTree &exe_tree) { 156 exe_tree.Print(out); 157 return out; 158 } 159 IsPython()160 const bool IsPython() { 161 for (auto itr = this->begin(); itr != this->end(); ++itr) { 162 if (itr->IsPython()) { 163 return true; 164 } 165 } 166 return false; 167 } 168 169 /// \brief Given the number of workers, launches the worker entry function for each. Essentially a 170 /// wrapper for the TaskGroup handling that is stored inside the execution tree. 171 /// \param num_workers - The number of workers to launch 172 /// \param func - The function entry point that workers will execute 173 /// \param[out] worker_tasks - output vector to hold generated tasks 174 /// \param name - The description of worker to launch 175 /// \param op_id - The id of corresponding operator, if not inherit from dataset op then it is -1. 176 /// \return Status The status code returned 177 Status LaunchWorkers(int32_t num_workers, std::function<Status(uint32_t)> func, std::vector<Task *> *worker_tasks, 178 std::string name, int32_t operator_id = -1); 179 180 Status LaunchWorkers(int32_t num_workers, std::function<Status(uint32_t)> func, std::string name, 181 int32_t operator_id = -1); 182 183 /// \brief Getter method 184 /// \return shared_ptr to the root operator root()185 std::shared_ptr<DatasetOp> root() const { return root_; } 186 187 /// \brief The prepare phase walks the tree in post-order to perform modifications to get it ready for execution. 188 /// \param is_pull_mode - an indicator if it's in pull mode or not 189 /// \return Status The status code returned 190 Status Prepare(bool is_pull_mode = false); 191 192 /// \brief Return the pointer to the TaskGroup 193 /// \return raw pointer to the TaskGroup AllTasks()194 TaskGroup *const AllTasks() const { return tg_.get(); } 195 196 /// \brief Return if the ExecutionTree is at end of epoch status 197 /// \return bool - true is ExecutionTree is end of epoch status IsEpochEnd()198 bool IsEpochEnd() const { return tree_state_ == TreeState::kDeTStateEpochEnd; } 199 200 /// \brief Set the ExecutionTree to EOE state SetEpochEnd()201 void SetEpochEnd() { tree_state_ = TreeState::kDeTStateEpochEnd; } 202 203 /// \brief Set the ExecutionTree to executing state SetExecuting()204 void SetExecuting() { tree_state_ = TreeState::kDeTStateExecuting; } 205 206 /// \brief Set the ExecutionTree to Finished state. SetFinished()207 void SetFinished() { tree_state_ = TreeState::kDeTStateFinished; } 208 209 /// \brief Return if the ExecutionTree is finished (iterator receives EOF). 210 /// \return Bool - true is ExecutionTree is finished isFinished()211 bool isFinished() const { return tree_state_ == TreeState::kDeTStateFinished; } 212 213 /// \brief Return if the ExecutionTree is ready. 214 /// \return Bool - true is ExecutionTree is ready isPrepared()215 bool isPrepared() const { 216 return tree_state_ == TreeState::kDeTStatePrepared || tree_state_ == TreeState::kDeTStateExecuting || 217 tree_state_ == TreeState::kDeTStateFinished; 218 } 219 220 /// \brief Get a unique identifier for the tree 221 /// \return unique ID as a string GetUniqueId()222 std::string GetUniqueId() { return unique_id_; } 223 224 private: 225 /// \brief A helper functions for doing the recursive printing 226 /// \param dataset_op - The dataset op to print 227 /// \param indent - an indent string for aligning child levels in output 228 /// \param last - an indicator if it's the last child or not 229 /// \param detailed - should it display the detailed node output or the summary line 230 void PrintNode(std::ostream &out, const std::shared_ptr<DatasetOp> &dataset_op, std::string indent, bool last, 231 bool detailed) const; 232 233 std::unique_ptr<TaskGroup> tg_; // Class for worker management 234 std::shared_ptr<DatasetOp> root_; // The root node of the tree 235 int32_t id_count_; // Counter for generating operator id's 236 TreeState tree_state_; // Tracking the current tree state 237 std::string unique_id_; // A unique identifier for the tree 238 239 #ifdef WITH_BACKEND 240 // Constructor for if defined(ENABLE_GPUQUE) || defined(ENABLE_TDTQUE) 241 explicit ExecutionTree(std::shared_ptr<ConfigManager> cfg); 242 // This rank_id is for numa and device_queue, one process work with only one rank_id, 243 // for standalone scenario, this rank_id may come from env 'CUDA_VISIBLE_DEVICES', 244 // but for distribute scenario, this rank_id come from _get_global_rank() in python 245 int32_t rank_id_; 246 bool numa_enable_; 247 std::shared_ptr<void> handle_; 248 #endif 249 }; 250 } // namespace dataset 251 } // namespace mindspore 252 #endif // MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_EXECUTION_TREE_H_ 253