• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2019-2022 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #ifndef MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_EXECUTION_TREE_H_
17 #define MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_EXECUTION_TREE_H_
18 
19 #include <functional>
20 #include <memory>
21 #include <stack>
22 #include <string>
23 #include <vector>
24 #ifndef ENABLE_ANDROID
25 #if !defined(_WIN32) && !defined(_WIN64) && !defined(__APPLE__)
26 #include <sys/sysinfo.h>
27 #include <opencv2/imgproc/imgproc.hpp>
28 #endif
29 #endif
30 #include "minddata/dataset/engine/datasetops/dataset_op.h"
31 #include "minddata/dataset/util/status.h"
32 #ifndef ENABLE_SECURITY
33 #include "mindspore/ccsrc/minddata/dataset/engine/perf/profiling.h"
34 #endif
35 namespace mindspore {
36 namespace dataset {
// Forward declarations, so the declarations below can name these types without their full definitions.
class DatasetOp;
class Pass;
class TaskGroup;

// Alias for an ordered list of uniquely-owned Pass objects (presumably optimization passes, going by the name).
using OptPass = std::vector<std::unique_ptr<Pass>>;
42 class ExecutionTree {
43  public:
44   // State flags for the lifecycle of the tree
45   enum TreeState {
46     kDeTStateInit = 0,   // The freshly initialized state after construction
47     kDeTStateBuilding,   // The tree is being built, nodes are being added
48     kDeTStatePrepared,   // The tree has been prepared and is ready to be launched
49     kDeTStateExecuting,  // The tree has been launched and is executing
50     kDeTStateEpochEnd,   // The tree has been received end of epoch signal, just for profiling
51     kDeTStateFinished    // The tree has been drained, dataset iterator received EOF
52   };
53 
54   class Iterator {
55    public:
56     // support iterator traits
57     using iterator_category = std::bidirectional_iterator_tag;
58     using value_type = DatasetOp;
59     using difference_type = ptrdiff_t;
60     using pointer = DatasetOp *;
61     using reference = DatasetOp &;
62 
63     // Constructor
64     // @param root The root node to start iterating from
65     explicit Iterator(const std::shared_ptr<DatasetOp> &root = nullptr);
66 
67     // Destructor
~Iterator()68     ~Iterator() {}
69 
70     Iterator &operator++() {
71       ++ind_;
72       return *this;
73     }  // prefix ++ overload
74 
75     Iterator operator++(int) {
76       Iterator it = *this;
77       it.ind_ = ind_;
78       ind_++;
79       return it;
80     }  // post-fix ++ overload
81 
82     Iterator &operator--() {
83       --ind_;
84       return *this;
85     }  // prefix -- overload
86 
87     Iterator operator--(int) {
88       Iterator it = *this;
89       it.ind_ = ind_;
90       ind_--;
91       return it;
92     }  // post-fix -- overload
93 
94     DatasetOp &operator*() { return *nodes_[ind_]; }  // dereference operator
95 
96     std::shared_ptr<DatasetOp> operator->() { return nodes_[ind_]; }
97 
98     // getter function
99     // @return Shared pointer to the current operator
get()100     std::shared_ptr<DatasetOp> get() { return nodes_[ind_]; }
101 
102     bool operator==(const Iterator &rhs) { return nodes_[ind_] == rhs.nodes_[rhs.ind_]; }
103 
104     bool operator!=(const Iterator &rhs) { return nodes_[ind_] != rhs.nodes_[rhs.ind_]; }
105 
NumNodes()106     size_t NumNodes() const { return nodes_.size(); }
107 
108    private:
109     size_t ind_;                                     // the cur node our Iterator points to
110     std::vector<std::shared_ptr<DatasetOp>> nodes_;  // store the nodes in post order
111     void PostOrderTraverse(const std::shared_ptr<DatasetOp> &);
112   };
113 
114   // Constructor
115   ExecutionTree();
116 
117   // Destructor
118   ~ExecutionTree();
119 
120   /// \brief Associates a DatasetOp with this tree. This assigns a valid node id to the operator and
121   ///     provides it with a link to the tree. A node cannot form any relationships (parent/child) with
122   ///     other nodes unless they are associated with the same tree.
123   /// \param op - The operator to associate
124   /// \return Status The status code returned
125   Status AssociateNode(const std::shared_ptr<DatasetOp> &op);
126 
127   /// \brief Set the root node of the tree
128   /// \param op - The operator to assign as root
129   /// \return Status The status code returned
130   Status AssignRoot(const std::shared_ptr<DatasetOp> &op);
131 
132   /// \brief Start the execution of the tree
133   /// \return Status The status code returned
134   Status Launch();
135 
136   /// /brief A print method typically used for debugging
137   /// \param out - The output stream to write output to
138   void Print(std::ostream &out, const std::shared_ptr<DatasetOp> &op = nullptr) const;
139 
140   /// \brief Return an iterator positioned at the start
141   /// \return Iterator - The iterator
142   ExecutionTree::Iterator begin(const std::shared_ptr<DatasetOp> &root = nullptr) const {
143     return Iterator(root == nullptr ? root_ : root);
144   }
145 
146   /// \brief Return an iterator positioned at the end
147   /// \return Iterator - The iterator
end()148   ExecutionTree::Iterator end() const { return Iterator(nullptr); }
149 
150   /// \brief << Stream output operator overload
151   /// \notes This allows you to write the debug print info using stream operators
152   /// \param out - reference to the output stream being overloaded
153   /// \param exe_tree - reference to the execution tree to display
154   /// \return - the output stream must be returned
155   friend std::ostream &operator<<(std::ostream &out, const ExecutionTree &exe_tree) {
156     exe_tree.Print(out);
157     return out;
158   }
159 
IsPython()160   const bool IsPython() {
161     for (auto itr = this->begin(); itr != this->end(); ++itr) {
162       if (itr->IsPython()) {
163         return true;
164       }
165     }
166     return false;
167   }
168 
169   /// \brief Given the number of workers, launches the worker entry function for each. Essentially a
170   ///     wrapper for the TaskGroup handling that is stored inside the execution tree.
171   /// \param num_workers - The number of workers to launch
172   /// \param func - The function entry point that workers will execute
173   /// \param[out] worker_tasks - output vector to hold generated tasks
174   /// \param name - The description of worker to launch
175   /// \param op_id - The id of corresponding operator, if not inherit from dataset op then it is -1.
176   /// \return Status The status code returned
177   Status LaunchWorkers(int32_t num_workers, std::function<Status(uint32_t)> func, std::vector<Task *> *worker_tasks,
178                        std::string name, int32_t operator_id = -1);
179 
180   Status LaunchWorkers(int32_t num_workers, std::function<Status(uint32_t)> func, std::string name,
181                        int32_t operator_id = -1);
182 
183   /// \brief Getter method
184   /// \return shared_ptr to the root operator
root()185   std::shared_ptr<DatasetOp> root() const { return root_; }
186 
187   /// \brief The prepare phase walks the tree in post-order to perform modifications to get it ready for execution.
188   /// \param is_pull_mode - an indicator if it's in pull mode or not
189   /// \return Status The status code returned
190   Status Prepare(bool is_pull_mode = false);
191 
192   /// \brief Return the pointer to the TaskGroup
193   /// \return raw pointer to the TaskGroup
AllTasks()194   TaskGroup *const AllTasks() const { return tg_.get(); }
195 
196   /// \brief Return if the ExecutionTree is at end of epoch status
197   /// \return bool - true is ExecutionTree is end of epoch status
IsEpochEnd()198   bool IsEpochEnd() const { return tree_state_ == TreeState::kDeTStateEpochEnd; }
199 
200   /// \brief Set the ExecutionTree to EOE state
SetEpochEnd()201   void SetEpochEnd() { tree_state_ = TreeState::kDeTStateEpochEnd; }
202 
203   /// \brief Set the ExecutionTree to executing state
SetExecuting()204   void SetExecuting() { tree_state_ = TreeState::kDeTStateExecuting; }
205 
206   /// \brief Set the ExecutionTree to Finished state.
SetFinished()207   void SetFinished() { tree_state_ = TreeState::kDeTStateFinished; }
208 
209   /// \brief Return if the ExecutionTree is finished (iterator receives EOF).
210   /// \return Bool - true is ExecutionTree is finished
isFinished()211   bool isFinished() const { return tree_state_ == TreeState::kDeTStateFinished; }
212 
213   /// \brief Return if the ExecutionTree is ready.
214   /// \return Bool - true is ExecutionTree is ready
isPrepared()215   bool isPrepared() const {
216     return tree_state_ == TreeState::kDeTStatePrepared || tree_state_ == TreeState::kDeTStateExecuting ||
217            tree_state_ == TreeState::kDeTStateFinished;
218   }
219 
220   /// \brief Get a unique identifier for the tree
221   /// \return unique ID as a string
GetUniqueId()222   std::string GetUniqueId() { return unique_id_; }
223 
224  private:
225   /// \brief A helper functions for doing the recursive printing
226   /// \param dataset_op - The dataset op to print
227   /// \param indent - an indent string for aligning child levels in output
228   /// \param last - an indicator if it's the last child or not
229   /// \param detailed - should it display the detailed node output or the summary line
230   void PrintNode(std::ostream &out, const std::shared_ptr<DatasetOp> &dataset_op, std::string indent, bool last,
231                  bool detailed) const;
232 
233   std::unique_ptr<TaskGroup> tg_;    // Class for worker management
234   std::shared_ptr<DatasetOp> root_;  // The root node of the tree
235   int32_t id_count_;                 // Counter for generating operator id's
236   TreeState tree_state_;             // Tracking the current tree state
237   std::string unique_id_;            // A unique identifier for the tree
238 
239 #ifdef WITH_BACKEND
240   // Constructor for if defined(ENABLE_GPUQUE) || defined(ENABLE_TDTQUE)
241   explicit ExecutionTree(std::shared_ptr<ConfigManager> cfg);
242   // This rank_id is for numa and device_queue, one process work with only one rank_id,
243   // for standalone scenario, this rank_id may come from env 'CUDA_VISIBLE_DEVICES',
244   // but for distribute scenario, this rank_id come from _get_global_rank() in python
245   int32_t rank_id_;
246   bool numa_enable_;
247   std::shared_ptr<void> handle_;
248 #endif
249 };
250 }  // namespace dataset
251 }  // namespace mindspore
252 #endif  // MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_EXECUTION_TREE_H_
253