1 /** 2 * Copyright 2019-2021 Huawei Technologies Co., Ltd 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 #ifndef MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_EXECUTION_TREE_H_ 17 #define MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_EXECUTION_TREE_H_ 18 19 #include <functional> 20 #include <memory> 21 #include <stack> 22 #include <string> 23 #include <vector> 24 #ifndef ENABLE_ANDROID 25 #if !defined(_WIN32) && !defined(_WIN64) && !defined(__APPLE__) 26 #include <sys/sysinfo.h> 27 #include <opencv2/imgproc/imgproc.hpp> 28 #endif 29 #endif 30 #include "minddata/dataset/engine/datasetops/dataset_op.h" 31 #include "minddata/dataset/util/status.h" 32 #ifndef ENABLE_SECURITY 33 #include "mindspore/ccsrc/minddata/dataset/engine/perf/profiling.h" 34 #endif 35 namespace mindspore { 36 namespace dataset { 37 // Forward declares 38 class TaskGroup; 39 class DatasetOp; 40 class Pass; 41 using OptPass = std::vector<std::unique_ptr<Pass>>; 42 class ExecutionTree { 43 public: 44 // State flags for the lifecycle of the tree 45 enum TreeState { 46 kDeTStateInit = 0, // The freshly initialized state after construction 47 kDeTStateBuilding, // The tree is being built, nodes are being added 48 kDeTStatePrepared, // The tree has been prepared and is ready to be launched 49 kDeTStateExecuting, // The tree has been launched and is executing 50 kDeTStateEpochEnd, // The tree has been received end of epoch signal, just for profiling 51 kDeTStateFinished // The tree has been drained, dataset iterator received EOF 52 }; 53 54 class Iterator { 55 public: 56 // Constructor 57 // @param root The root node to start iterating from 58 explicit Iterator(const std::shared_ptr<DatasetOp> &root = nullptr); 59 60 // Destructor ~Iterator()61 ~Iterator() {} 62 63 Iterator &operator++() { 64 ++ind_; 65 return *this; 66 } // prefix ++ overload 67 Iterator operator++(int) { 68 Iterator it = *this; 69 it.ind_ = ind_; 70 ind_++; 71 return it; 72 } // post-fix ++ overload 73 Iterator &operator--() { 74 --ind_; 75 return *this; 76 } // prefix -- overload 77 Iterator operator--(int) { 78 Iterator it = *this; 79 it.ind_ = ind_; 80 ind_--; 81 return it; 82 } // post-fix -- overload 83 DatasetOp &operator*() { return *nodes_[ind_]; } // dereference operator 84 std::shared_ptr<DatasetOp> operator->() { return nodes_[ind_]; } 85 86 // getter function 87 // @return Shared pointer to the current operator get()88 std::shared_ptr<DatasetOp> get() { return nodes_[ind_]; } 89 90 bool operator==(const Iterator &rhs) { return nodes_[ind_] == rhs.nodes_[rhs.ind_]; } 91 92 bool operator!=(const Iterator &rhs) { return nodes_[ind_] != rhs.nodes_[rhs.ind_]; } 93 NumNodes()94 int32_t NumNodes() { return nodes_.size(); } 95 96 private: 97 int32_t ind_; // the cur node our Iterator points to 98 std::vector<std::shared_ptr<DatasetOp>> nodes_; // store the nodes in post order 99 void PostOrderTraverse(const std::shared_ptr<DatasetOp> &); 100 }; 101 102 // Constructor 103 ExecutionTree(); 104 105 // Destructor 106 ~ExecutionTree(); 107 108 /// \brief Associates a DatasetOp with this tree. This assigns a valid node id to the operator and 109 /// provides it with a link to the tree. A node cannot form any relationships (parent/child) with 110 /// other nodes unless they are associated with the same tree. 111 /// \param op - The operator to associate 112 /// \return Status The status code returned 113 Status AssociateNode(const std::shared_ptr<DatasetOp> &op); 114 115 /// \brief Set the root node of the tree 116 /// \param op - The operator to assign as root 117 /// \return Status The status code returned 118 Status AssignRoot(const std::shared_ptr<DatasetOp> &op); 119 120 /// \brief Start the execution of the tree 121 /// \return Status The status code returned 122 Status Launch(); 123 124 /// /brief A print method typically used for debugging 125 /// \param out - The output stream to write output to 126 void Print(std::ostream &out, const std::shared_ptr<DatasetOp> &op = nullptr) const; 127 128 /// \brief Return an iterator positioned at the start 129 /// \return Iterator - The iterator 130 ExecutionTree::Iterator begin(const std::shared_ptr<DatasetOp> &root = nullptr) const { 131 return Iterator(root == nullptr ? root_ : root); 132 } 133 134 /// \brief Return an iterator positioned at the end 135 /// \return Iterator - The iterator end()136 ExecutionTree::Iterator end() const { return Iterator(nullptr); } 137 138 /// \brief << Stream output operator overload 139 /// \notes This allows you to write the debug print info using stream operators 140 /// \param out - reference to the output stream being overloaded 141 /// \param exe_tree - reference to the execution tree to display 142 /// \return - the output stream must be returned 143 friend std::ostream &operator<<(std::ostream &out, ExecutionTree &exe_tree) { 144 exe_tree.Print(out); 145 return out; 146 } 147 148 /// \brief Given the number of workers, launches the worker entry function for each. Essentially a 149 /// wrapper for the TaskGroup handling that is stored inside the execution tree. 150 /// \param num_workers - The number of workers to launch 151 /// \param func - The function entry point that workers will execute 152 /// \param name - The description of worker to launch 153 /// \param op_id - The id of corresponding operator, if not inherit from dataset op then it is -1. 154 /// \return Status The status code returned 155 Status LaunchWorkers(int32_t num_workers, std::function<Status(uint32_t)> func, std::string name = "", 156 int32_t operator_id = -1); 157 158 /// \brief Getter method 159 /// \return shared_ptr to the root operator root()160 std::shared_ptr<DatasetOp> root() const { return root_; } 161 162 /// \brief The prepare phase walks the tree in post-order to perform modifications to get it ready for execution. 163 /// \return Status The status code returned 164 Status Prepare(); 165 166 /// \brief Return the pointer to the TaskGroup 167 /// \return raw pointer to the TaskGroup AllTasks()168 TaskGroup *const AllTasks() const { return tg_.get(); } 169 170 /// \brief Return if the ExecutionTree is at end of epoch status 171 /// \return bool - true is ExecutionTree is end of epoch status IsEpochEnd()172 bool IsEpochEnd() const { return tree_state_ == TreeState::kDeTStateEpochEnd; } 173 174 /// \brief Set the ExecutionTree to EOE state SetEpochEnd()175 void SetEpochEnd() { tree_state_ = TreeState::kDeTStateEpochEnd; } 176 177 /// \brief Set the ExecutionTree to executing state SetExecuting()178 void SetExecuting() { tree_state_ = TreeState::kDeTStateExecuting; } 179 180 /// \brief Set the ExecutionTree to Finished state. SetFinished()181 void SetFinished() { tree_state_ = TreeState::kDeTStateFinished; } 182 183 /// \brief Return if the ExecutionTree is finished (iterator receives EOF). 184 /// \return Bool - true is ExecutionTree is finished isFinished()185 bool isFinished() const { return tree_state_ == TreeState::kDeTStateFinished; } 186 187 /// \brief Return if the ExecutionTree is ready. 188 /// \return Bool - true is ExecutionTree is ready isPrepared()189 bool isPrepared() const { 190 return tree_state_ == TreeState::kDeTStatePrepared || tree_state_ == TreeState::kDeTStateExecuting || 191 tree_state_ == TreeState::kDeTStateFinished; 192 } 193 194 /// \brief Getter for profiling manager, no ownership 195 #ifndef ENABLE_SECURITY GetProfilingManager()196 ProfilingManager *GetProfilingManager() { return profiling_manager_.get(); } 197 #endif 198 199 private: 200 /// \brief A helper functions for doing the recursive printing 201 /// \param dataset_op - The dataset op to print 202 /// \param indent - an indent string for aligning child levels in output 203 /// \param last - an indicator if it's the last child or not 204 /// \param detailed - should it display the detailed node output or the summary line 205 void PrintNode(std::ostream &out, const std::shared_ptr<DatasetOp> &dataset_op, std::string indent, bool last, 206 bool detailed) const; 207 208 std::unique_ptr<TaskGroup> tg_; // Class for worker management 209 std::shared_ptr<DatasetOp> root_; // The root node of the tree 210 int32_t id_count_; // Counter for generating operator id's 211 uint32_t prepare_flags_; // Flags used during tree prepare 212 TreeState tree_state_; // Tracking the current tree state 213 #ifndef ENABLE_SECURITY 214 std::unique_ptr<ProfilingManager> profiling_manager_; // Profiling manager 215 #endif 216 #if defined(ENABLE_GPUQUE) || defined(ENABLE_TDTQUE) 217 // This rank_id is for numa and device_queue, one process work with only one rank_id, 218 // for standalone scenario, this rank_id may come from env 'CUDA_VISIBLE_DEVICES', 219 // but for distribute scenario, this rank_id come from _get_global_rank() in python 220 int32_t rank_id_; 221 bool numa_enable_; 222 void *handle_; 223 #endif 224 }; 225 } // namespace dataset 226 } // namespace mindspore 227 228 #endif // MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_EXECUTION_TREE_H_ 229