/**
 * Copyright 2020-2023 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_TREE_ADAPTER_H_
#define MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_TREE_ADAPTER_H_

#include <algorithm>
#include <memory>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

#include "minddata/dataset/engine/execution_tree.h"
#include "minddata/dataset/engine/ir/datasetops/dataset_node.h"
#include "minddata/dataset/engine/perf/auto_tune.h"
#include "minddata/dataset/engine/perf/dataset_iterator_tracing.h"

namespace mindspore {
namespace dataset {
class DatasetNode;
class TreeModifier;
class ToDevice;
class IteratorConsumer;
class TreeAdapter {
#ifndef ENABLE_SECURITY
  friend ProfilingManager;
  friend TreeConsumer;
  friend ToDevice;
  friend IteratorConsumer;
  friend AutoTune;
#endif
  friend TreeModifier;

 public:
  // This flag indicates the purpose for which this tree adapter was created (i.e. the type of the tree consumer).
  // Currently there are three types of consumers: Iterator, Getter, and TDT/Vocab/Save.
  // To avoid premature optimization, the last type (TDT/Vocab/Save) is treated as Iterator for now.
  enum UsageFlag { kDeIterator = 0, kDeGetter = 1, kDeReset = 2 };

  explicit TreeAdapter(UsageFlag usage = kDeIterator);
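
  // A minimal construction sketch (hedged; illustrative only): pick the flag that
  // matches the consumer that will drive this adapter.
  //   TreeAdapter iter_adapter;                            // defaults to kDeIterator
  //   TreeAdapter getter_adapter(TreeAdapter::kDeGetter);  // for a Getter-type consumer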

  ~TreeAdapter() = default;

  // This function performs syntax checking and semantics checking, runs the optional
  // optimization pass, and then builds the Execution tree.
  Status Compile(const std::shared_ptr<DatasetNode> &input_ir, int32_t num_epochs = -1, int64_t global_step = 0,
                 int64_t dataset_size = -1);

  // Return the root node of the IR tree after it has been cloned from the parsed IR tree
  std::shared_ptr<DatasetNode> RootIRNode() const { return root_ir_; }

  const ExecutionTree *GetExecutionTree() const { return tree_.get(); }

  // This is the main method TreeConsumer uses to interact with TreeAdapter
  // 1. GetNext will Launch() the ExeTree on its first call by the iterator (the tree is already prepared)
  // 2. GetNext will return an empty row when an eoe/eof is reached
  Status GetNext(TensorRow *);
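
  // A minimal end-to-end usage sketch (hedged; `root_ir` is an IR tree assumed to be
  // built elsewhere, and RETURN_IF_NOT_OK is the usual status-check macro):
  //   TreeAdapter adapter;                             // kDeIterator by default
  //   RETURN_IF_NOT_OK(adapter.Compile(root_ir, /*num_epochs=*/1));
  //   TensorRow row;
  //   RETURN_IF_NOT_OK(adapter.GetNext(&row));         // first call launches the tree
  //   while (!row.empty()) {
  //     // ... consume the row ...
  //     RETURN_IF_NOT_OK(adapter.GetNext(&row));
  //   }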

  // unique_ptr overloads operator bool() and returns false if it does not manage an object.
  // This is needed by Iterator to get data via GetNext().
  std::weak_ptr<DatasetOp> GetRoot() const { return tree_ ? tree_->root() : nullptr; }

  // This function will return the column_name_map once Compile() has been called
  std::unordered_map<std::string, int32_t> GetColumnNameMap() const { return column_name_map_; }
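
  // A hedged lookup sketch ("image" is an illustrative column name): map a column
  // name to its index in a row returned by GetNext().
  //   auto col_map = adapter.GetColumnNameMap();
  //   auto it = col_map.find("image");
  //   if (it != col_map.end()) {
  //     std::shared_ptr<Tensor> image_tensor = row[it->second];
  //   }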

  // This function returns the TaskGroup associated with the ExeTree. This is needed by DeviceQueueConsumer
  // to be able to launch a thread. Compile() needs to be called before this function.
  TaskGroup *const AllTasks() const { return tree_ ? tree_->AllTasks() : nullptr; }

  Status Launch();
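
  // A hedged consumer-side sketch: after Compile(), launch the tree and wait on its
  // worker tasks through the TaskGroup (join_all() is assumed here to block until
  // all tasks in the group finish).
  //   RETURN_IF_NOT_OK(adapter.Launch());
  //   TaskGroup *task_group = adapter.AllTasks();
  //   if (task_group != nullptr) {
  //     RETURN_IF_NOT_OK(task_group->join_all());
  //   }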

  // Enable or disable the optional optimization pass
  void SetOptimize(bool value) { optimize_ = value; }

  // Return whether the optional optimization pass is enabled
  bool OptimizationEnabled() const { return optimize_; }

  // Return the offload JSON
  nlohmann::json GetOffloadJson();
#ifndef ENABLE_SECURITY
  /// \brief Setter for Profiling Manager
  Status SetProfilingManagerPtr(const std::shared_ptr<ProfilingManager> &profiling_manager,
                                std::shared_ptr<Tracing> tracing_node = nullptr) {
    profiling_manager_ = profiling_manager;
    if (tracing_node != nullptr) {
      tracing_ = std::dynamic_pointer_cast<DatasetIteratorTracing>(tracing_node);
    }
    return Status::OK();
  }

  /// \brief Getter for profiling manager, no ownership
  ProfilingManager *GetProfilingManager() { return profiling_manager_.get(); }
#endif

 protected:
  // Run the mandatory pass checking the syntax and semantics of the IR tree
  Status PrePass(const std::shared_ptr<DatasetNode> &ir);

  // Run the optional optimization pass on the IR tree
  static Status Optimize(const std::shared_ptr<DatasetNode> &ir);

  // Run the mandatory pass augmenting the IR tree
  Status PostPass(const std::shared_ptr<DatasetNode> &ir);

#if !defined(__APPLE__) && !defined(BUILD_LITE) && !defined(_WIN32) && !defined(_WIN64) && !defined(__ANDROID__) && \
  !defined(ANDROID)
  // Insert a SendBridgeOp and a ReceiveBridgeOp into the tree
  Status InsertSendReceiveOp();

  // Split the tree into a send tree and a receive tree
  Status SplitBySendReceiveOp();
#endif

  // Build an Execution tree
  Status Build(const std::shared_ptr<DatasetNode> &root_ir, int64_t init_epoch = 0);

  // This RECURSIVE function walks the (optimized) IR tree in DFS order to build its corresponding Execution tree.
  Status BuildExecutionTreeRecur(const std::shared_ptr<DatasetNode> &ir, std::shared_ptr<DatasetOp> *op);

  // Adjust the pipeline (e.g., move rng_ forward) if in reset mode
  Status AdjustReset(const int64_t epoch_num);

  std::unordered_map<std::string, int32_t> column_name_map_;
  std::shared_ptr<DatasetNode> input_ir_;
  std::shared_ptr<DatasetNode> root_ir_;
#if !defined(__APPLE__) && !defined(BUILD_LITE) && !defined(_WIN32) && !defined(_WIN64) && !defined(__ANDROID__) && \
  !defined(ANDROID)
  // the send tree, e.g.: xxDataset -> map -> ... -> batch -> send
  std::unique_ptr<ExecutionTree> send_tree_;
  // the receive tree, e.g.: receive -> iterator / data_queue
  std::unique_ptr<ExecutionTree> receive_tree_;
#endif
  // The tree holder: either send_tree_ is moved into it and launched in the independent dataset
  // process, or receive_tree_ is moved into it and launched in the main dataset process.
  std::unique_ptr<ExecutionTree> tree_;
  bool optimize_;  // Flag to enable the optional optimization pass
#ifndef ENABLE_SECURITY
  std::shared_ptr<ProfilingManager> profiling_manager_;  // Profiling manager
  std::shared_ptr<DatasetIteratorTracing> tracing_;      // Trace profiling data
#endif
  int32_t cur_batch_num_;           // current batch number, used for profiling
  int32_t cur_connector_size_;      // current connector size of the root op, used for profiling
  int32_t cur_connector_capacity_;  // current connector capacity of the root op, used for profiling
  UsageFlag usage_;                 // usage of this tree adapter (type of consumer)
  bool launched_;
  // State flags for the lifecycle of the tree
  enum CompileState {
    kCompileStateInit = 0,      // The freshly initialized state
    kCompileStateIRGraphBuilt,  // User code has been parsed and its IR graph built
    kCompileStateIRTreeCloned,  // IR tree has been cloned from the IR graph
    kCompileStateOptimized,     // IR tree has been optimized
    kCompileStateReady          // Execution tree is generated from the optimized IR
  };
  CompileState tree_state_;
  nlohmann::json offload_json_;
};
}  // namespace dataset
}  // namespace mindspore

#endif  // MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_TREE_ADAPTER_H_