• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2019-2021 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #ifndef MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_EXECUTION_TREE_H_
17 #define MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_EXECUTION_TREE_H_
18 
19 #include <functional>
20 #include <memory>
21 #include <stack>
22 #include <string>
23 #include <vector>
24 #ifndef ENABLE_ANDROID
25 #if !defined(_WIN32) && !defined(_WIN64) && !defined(__APPLE__)
26 #include <sys/sysinfo.h>
27 #include <opencv2/imgproc/imgproc.hpp>
28 #endif
29 #endif
30 #include "minddata/dataset/engine/datasetops/dataset_op.h"
31 #include "minddata/dataset/util/status.h"
32 #ifndef ENABLE_SECURITY
33 #include "mindspore/ccsrc/minddata/dataset/engine/perf/profiling.h"
34 #endif
35 namespace mindspore {
36 namespace dataset {
37 // Forward declares
38 class TaskGroup;
39 class DatasetOp;
40 class Pass;
41 using OptPass = std::vector<std::unique_ptr<Pass>>;
42 class ExecutionTree {
43  public:
44   // State flags for the lifecycle of the tree
45   enum TreeState {
46     kDeTStateInit = 0,   // The freshly initialized state after construction
47     kDeTStateBuilding,   // The tree is being built, nodes are being added
48     kDeTStatePrepared,   // The tree has been prepared and is ready to be launched
49     kDeTStateExecuting,  // The tree has been launched and is executing
50     kDeTStateEpochEnd,   // The tree has been received end of epoch signal, just for profiling
51     kDeTStateFinished    // The tree has been drained, dataset iterator received EOF
52   };
53 
54   class Iterator {
55    public:
56     // Constructor
57     // @param root The root node to start iterating from
58     explicit Iterator(const std::shared_ptr<DatasetOp> &root = nullptr);
59 
60     // Destructor
~Iterator()61     ~Iterator() {}
62 
63     Iterator &operator++() {
64       ++ind_;
65       return *this;
66     }  // prefix ++ overload
67     Iterator operator++(int) {
68       Iterator it = *this;
69       it.ind_ = ind_;
70       ind_++;
71       return it;
72     }  // post-fix ++ overload
73     Iterator &operator--() {
74       --ind_;
75       return *this;
76     }  // prefix -- overload
77     Iterator operator--(int) {
78       Iterator it = *this;
79       it.ind_ = ind_;
80       ind_--;
81       return it;
82     }                                                 // post-fix -- overload
83     DatasetOp &operator*() { return *nodes_[ind_]; }  // dereference operator
84     std::shared_ptr<DatasetOp> operator->() { return nodes_[ind_]; }
85 
86     // getter function
87     // @return Shared pointer to the current operator
get()88     std::shared_ptr<DatasetOp> get() { return nodes_[ind_]; }
89 
90     bool operator==(const Iterator &rhs) { return nodes_[ind_] == rhs.nodes_[rhs.ind_]; }
91 
92     bool operator!=(const Iterator &rhs) { return nodes_[ind_] != rhs.nodes_[rhs.ind_]; }
93 
NumNodes()94     int32_t NumNodes() { return nodes_.size(); }
95 
96    private:
97     int32_t ind_;                                    // the cur node our Iterator points to
98     std::vector<std::shared_ptr<DatasetOp>> nodes_;  // store the nodes in post order
99     void PostOrderTraverse(const std::shared_ptr<DatasetOp> &);
100   };
101 
102   // Constructor
103   ExecutionTree();
104 
105   // Destructor
106   ~ExecutionTree();
107 
108   /// \brief Associates a DatasetOp with this tree. This assigns a valid node id to the operator and
109   ///     provides it with a link to the tree. A node cannot form any relationships (parent/child) with
110   ///     other nodes unless they are associated with the same tree.
111   /// \param op - The operator to associate
112   /// \return Status The status code returned
113   Status AssociateNode(const std::shared_ptr<DatasetOp> &op);
114 
115   /// \brief Set the root node of the tree
116   /// \param op - The operator to assign as root
117   /// \return Status The status code returned
118   Status AssignRoot(const std::shared_ptr<DatasetOp> &op);
119 
120   /// \brief Start the execution of the tree
121   /// \return Status The status code returned
122   Status Launch();
123 
124   /// /brief A print method typically used for debugging
125   /// \param out - The output stream to write output to
126   void Print(std::ostream &out, const std::shared_ptr<DatasetOp> &op = nullptr) const;
127 
128   /// \brief Return an iterator positioned at the start
129   /// \return Iterator - The iterator
130   ExecutionTree::Iterator begin(const std::shared_ptr<DatasetOp> &root = nullptr) const {
131     return Iterator(root == nullptr ? root_ : root);
132   }
133 
134   /// \brief Return an iterator positioned at the end
135   /// \return Iterator - The iterator
end()136   ExecutionTree::Iterator end() const { return Iterator(nullptr); }
137 
138   /// \brief << Stream output operator overload
139   /// \notes This allows you to write the debug print info using stream operators
140   /// \param out - reference to the output stream being overloaded
141   /// \param exe_tree - reference to the execution tree to display
142   /// \return - the output stream must be returned
143   friend std::ostream &operator<<(std::ostream &out, ExecutionTree &exe_tree) {
144     exe_tree.Print(out);
145     return out;
146   }
147 
148   /// \brief Given the number of workers, launches the worker entry function for each. Essentially a
149   ///     wrapper for the TaskGroup handling that is stored inside the execution tree.
150   /// \param num_workers - The number of workers to launch
151   /// \param func - The function entry point that workers will execute
152   /// \param name - The description of worker to launch
153   /// \param op_id - The id of corresponding operator, if not inherit from dataset op then it is -1.
154   /// \return Status The status code returned
155   Status LaunchWorkers(int32_t num_workers, std::function<Status(uint32_t)> func, std::string name = "",
156                        int32_t operator_id = -1);
157 
158   /// \brief Getter method
159   /// \return shared_ptr to the root operator
root()160   std::shared_ptr<DatasetOp> root() const { return root_; }
161 
162   /// \brief The prepare phase walks the tree in post-order to perform modifications to get it ready for execution.
163   /// \return Status The status code returned
164   Status Prepare();
165 
166   /// \brief Return the pointer to the TaskGroup
167   /// \return raw pointer to the TaskGroup
AllTasks()168   TaskGroup *const AllTasks() const { return tg_.get(); }
169 
170   /// \brief Return if the ExecutionTree is at end of epoch status
171   /// \return bool - true is ExecutionTree is end of epoch status
IsEpochEnd()172   bool IsEpochEnd() const { return tree_state_ == TreeState::kDeTStateEpochEnd; }
173 
174   /// \brief Set the ExecutionTree to EOE state
SetEpochEnd()175   void SetEpochEnd() { tree_state_ = TreeState::kDeTStateEpochEnd; }
176 
177   /// \brief Set the ExecutionTree to executing state
SetExecuting()178   void SetExecuting() { tree_state_ = TreeState::kDeTStateExecuting; }
179 
180   /// \brief Set the ExecutionTree to Finished state.
SetFinished()181   void SetFinished() { tree_state_ = TreeState::kDeTStateFinished; }
182 
183   /// \brief Return if the ExecutionTree is finished (iterator receives EOF).
184   /// \return Bool - true is ExecutionTree is finished
isFinished()185   bool isFinished() const { return tree_state_ == TreeState::kDeTStateFinished; }
186 
187   /// \brief Return if the ExecutionTree is ready.
188   /// \return Bool - true is ExecutionTree is ready
isPrepared()189   bool isPrepared() const {
190     return tree_state_ == TreeState::kDeTStatePrepared || tree_state_ == TreeState::kDeTStateExecuting ||
191            tree_state_ == TreeState::kDeTStateFinished;
192   }
193 
194   /// \brief Getter for profiling manager, no ownership
195 #ifndef ENABLE_SECURITY
GetProfilingManager()196   ProfilingManager *GetProfilingManager() { return profiling_manager_.get(); }
197 #endif
198 
199  private:
200   /// \brief A helper functions for doing the recursive printing
201   /// \param dataset_op - The dataset op to print
202   /// \param indent - an indent string for aligning child levels in output
203   /// \param last - an indicator if it's the last child or not
204   /// \param detailed - should it display the detailed node output or the summary line
205   void PrintNode(std::ostream &out, const std::shared_ptr<DatasetOp> &dataset_op, std::string indent, bool last,
206                  bool detailed) const;
207 
208   std::unique_ptr<TaskGroup> tg_;    // Class for worker management
209   std::shared_ptr<DatasetOp> root_;  // The root node of the tree
210   int32_t id_count_;                 // Counter for generating operator id's
211   uint32_t prepare_flags_;           // Flags used during tree prepare
212   TreeState tree_state_;             // Tracking the current tree state
213 #ifndef ENABLE_SECURITY
214   std::unique_ptr<ProfilingManager> profiling_manager_;  // Profiling manager
215 #endif
216 #if defined(ENABLE_GPUQUE) || defined(ENABLE_TDTQUE)
217   // This rank_id is for numa and device_queue, one process work with only one rank_id,
218   // for standalone scenario, this rank_id may come from env 'CUDA_VISIBLE_DEVICES',
219   // but for distribute scenario, this rank_id come from _get_global_rank() in python
220   int32_t rank_id_;
221   bool numa_enable_;
222   void *handle_;
223 #endif
224 };
225 }  // namespace dataset
226 }  // namespace mindspore
227 
228 #endif  // MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_EXECUTION_TREE_H_
229