/**
 * Copyright 2020-2023 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_TREE_ADAPTER_H_
#define MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_TREE_ADAPTER_H_

#include <algorithm>
#include <memory>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

#include "minddata/dataset/engine/execution_tree.h"
#include "minddata/dataset/engine/ir/datasetops/dataset_node.h"
#include "minddata/dataset/engine/perf/auto_tune.h"
#include "minddata/dataset/engine/perf/dataset_iterator_tracing.h"

namespace mindspore {
namespace dataset {
class DatasetNode;
class TreeModifier;
class ToDevice;
class IteratorConsumer;

class TreeAdapter {
#ifndef ENABLE_SECURITY
  friend ProfilingManager;
  friend TreeConsumer;
  friend ToDevice;
  friend IteratorConsumer;
  friend AutoTune;
#endif
  friend TreeModifier;

 public:
  // This flag indicates the purpose for which this tree adapter was created (the type of the tree_consumer).
  // Currently there are 3 types of consumer: Iterator, Getter, and TDT/Vocab/Save.
  // To avoid premature optimization, the last type (TDT/Vocab/Save) is treated as Iterator for now.
  enum UsageFlag { kDeIterator = 0, kDeGetter = 1, kDeReset = 2 };

  explicit TreeAdapter(UsageFlag usage = kDeIterator);

  ~TreeAdapter() = default;

  // This function performs syntax checking and semantics checking, runs the optimization passes,
  // and then builds the execution tree.
  Status Compile(const std::shared_ptr<DatasetNode> &input_ir, int32_t num_epochs = -1, int64_t global_step = 0,
                 int64_t dataset_size = -1);

  // Return the root node of the IR tree cloned from the parsed IR tree
  std::shared_ptr<DatasetNode> RootIRNode() const { return root_ir_; }

  const ExecutionTree *GetExecutionTree() const { return tree_.get(); }

  // This is the main method TreeConsumer uses to interact with TreeAdapter.
  // 1. GetNext will Launch() the execution tree on its first call by the iterator (the tree is already prepared).
  // 2. GetNext will return an empty row when an eoe/eof is reached.
  Status GetNext(TensorRow *);
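
  // A minimal usage sketch (illustrative only, not part of this header's API surface): `ir_root`
  // stands in for an IR node produced by the parser, and error handling is elided.
  //
  //   TreeAdapter adapter(TreeAdapter::kDeIterator);
  //   RETURN_IF_NOT_OK(adapter.Compile(ir_root, /*num_epochs=*/1));
  //   TensorRow row;
  //   RETURN_IF_NOT_OK(adapter.GetNext(&row));  // the first call launches the execution tree
  //   while (!row.empty()) {                    // an empty row signals eoe/eof
  //     RETURN_IF_NOT_OK(adapter.GetNext(&row));
  //   }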

  // unique_ptr overloads operator bool() and returns false when it does not manage an object.
  // This is needed by Iterator to fetch data through GetNext.
  std::weak_ptr<DatasetOp> GetRoot() const { return tree_ ? tree_->root() : nullptr; }

  // This function returns the column_name_map once BuildAndPrepare() has been called
  std::unordered_map<std::string, int32_t> GetColumnNameMap() const { return column_name_map_; }

  // This function returns the TaskGroup associated with the execution tree. It is needed by DeviceQueueConsumer
  // to be able to launch a thread. BuildAndPrepare needs to be called before this function.
  TaskGroup *const AllTasks() const { return tree_ ? tree_->AllTasks() : nullptr; }

  Status Launch();

  // Enable or disable the optional optimization pass
  void SetOptimize(bool value) { optimize_ = value; }

  // Status of the optional optimization pass
  bool OptimizationEnabled() const { return optimize_; }

  // Return the offload JSON
  nlohmann::json GetOffloadJson();

#ifndef ENABLE_SECURITY
  /// \brief Setter for the profiling manager
  Status SetProfilingManagerPtr(const std::shared_ptr<ProfilingManager> &profiling_manager,
                                std::shared_ptr<Tracing> tracing_node = nullptr) {
    profiling_manager_ = profiling_manager;
    if (tracing_node != nullptr) {
      tracing_ = std::dynamic_pointer_cast<DatasetIteratorTracing>(tracing_node);
    }
    return Status::OK();
  }

  /// \brief Getter for the profiling manager, no ownership transfer
  ProfilingManager *GetProfilingManager() { return profiling_manager_.get(); }
#endif

 protected:
  // Run the mandatory pass that checks the syntax and semantics of the IR tree
  Status PrePass(const std::shared_ptr<DatasetNode> &ir);

  // Run the optional optimization pass on the IR tree
  static Status Optimize(const std::shared_ptr<DatasetNode> &ir);

  // Run the mandatory pass that augments the IR tree
  Status PostPass(const std::shared_ptr<DatasetNode> &ir);

#if !defined(__APPLE__) && !defined(BUILD_LITE) && !defined(_WIN32) && !defined(_WIN64) && !defined(__ANDROID__) && \
  !defined(ANDROID)
  // Insert a SendBridgeOp and a ReceiveBridgeOp into the tree
  Status InsertSendReceiveOp();

  // Split the tree into a send tree and a receive tree (see the sketch below)
  Status SplitBySendReceiveOp();
#endif
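
  // An illustrative sketch of what the split produces (operator names are schematic,
  // not exact class names):
  //
  //   before: SomeDataset -> Map -> Batch -> (iterator / data_queue)
  //   send:   SomeDataset -> Map -> Batch -> SendBridge      // runs in the independent dataset process
  //   recv:   ReceiveBridge -> (iterator / data_queue)       // runs in the main dataset process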

  // Build an execution tree
  Status Build(const std::shared_ptr<DatasetNode> &root_ir, int64_t init_epoch = 0);

  // This RECURSIVE function walks the (optimized) IR tree in DFS order to build the corresponding execution tree.
  Status BuildExecutionTreeRecur(const std::shared_ptr<DatasetNode> &ir, std::shared_ptr<DatasetOp> *op);

  // Adjust the pipeline (e.g. move rng_ forward) when in reset mode
  Status AdjustReset(const int64_t epoch_num);

  std::unordered_map<std::string, int32_t> column_name_map_;
  std::shared_ptr<DatasetNode> input_ir_;
  std::shared_ptr<DatasetNode> root_ir_;
#if !defined(__APPLE__) && !defined(BUILD_LITE) && !defined(_WIN32) && !defined(_WIN64) && !defined(__ANDROID__) && \
  !defined(ANDROID)
  // the send tree, e.g.: xxDataset -> map -> ... -> batch -> send
  std::unique_ptr<ExecutionTree> send_tree_;
  // the receive tree, e.g.: receive -> iterator / data_queue
  std::unique_ptr<ExecutionTree> receive_tree_;
#endif
  // The tree holder:
  // 1. send_tree_ is moved into it and launched in the independent dataset process, or
  // 2. receive_tree_ is moved into it and launched in the main dataset process.
  std::unique_ptr<ExecutionTree> tree_;
  bool optimize_;  // Flag to enable the optional optimization pass
#ifndef ENABLE_SECURITY
  std::shared_ptr<ProfilingManager> profiling_manager_;  // Profiling manager
  std::shared_ptr<DatasetIteratorTracing> tracing_;      // Trace profiling data
#endif
  int32_t cur_batch_num_;           // current batch number, used for profiling
  int32_t cur_connector_size_;      // current connector size of the root op, used for profiling
  int32_t cur_connector_capacity_;  // current connector capacity of the root op, used for profiling
  UsageFlag usage_;                 // usage of this tree adapter (type of consumer)
  bool launched_;
  // State flags for the lifecycle of the tree
  enum CompileState {
    kCompileStateInit = 0,      // The freshly initialized state
    kCompileStateIRGraphBuilt,  // User code has been parsed and its IR graph built
    kCompileStateIRTreeCloned,  // IR tree has been cloned from the IR graph
    kCompileStateOptimized,     // IR tree has been optimized
    kCompileStateReady          // Execution tree is generated from the optimized IR
  };
  CompileState tree_state_;
  nlohmann::json offload_json_;
};
}  // namespace dataset
}  // namespace mindspore

#endif  // MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_TREE_ADAPTER_H_