/**
 * Copyright 2021-2022 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef MINDSPORE_CCSRC_RUNTIME_FRAMEWORK_ACTOR_DATA_PREPARE_ACTOR_H_
#define MINDSPORE_CCSRC_RUNTIME_FRAMEWORK_ACTOR_DATA_PREPARE_ACTOR_H_

#include <atomic>
#include <vector>
#include <string>
#include <memory>
#include <utility>
#include <map>
#include <set>
#include "utils/hash_map.h"
#include "runtime/graph_scheduler/graph_compiler.h"
#include "runtime/graph_scheduler/actor/actor_common.h"
#include "runtime/graph_scheduler/actor/data_source_actor.h"
#include "runtime/graph_scheduler/actor/debug_aware_actor.h"
#include "runtime/graph_scheduler/device_tensor_store.h"
#include "runtime/hardware/device_context.h"

namespace mindspore {
namespace runtime {
using mindspore::device::DeviceContext;

// The data prepare actor prepares data for the device tensor store and the host tensor queue, which marks the
// beginning of one step.
class DataPrepareActor : public DebugAwareActor {
 public:
  DataPrepareActor(const std::string &name, const AID &memory_manager_aid, const AID *debug_aid,
                   const AID *profiler_aid, const GraphCompilerInfo *graph_compiler_info,
                   const HostQueueDSActorPtr &host_data_source_actor, const HostTensorQueuePtr &host_tensor_queue)
      : DebugAwareActor(name, KernelTransformType::kDataPrepareActor, nullptr, memory_manager_aid, debug_aid,
                        profiler_aid),
        graph_compiler_info_(graph_compiler_info),
        strategy_(GraphExecutionStrategy::kPipeline),
        real_strategy_(GraphExecutionStrategy::kPipeline),
        host_data_source_actor_(host_data_source_actor),
        host_tensor_queue_(host_tensor_queue),
        first_step_(true),
        has_parameter_input_(false) {}
  ~DataPrepareActor() override = default;

  // The process entry of data prepare.
  void PrepareData(const std::vector<std::vector<TensorPtr>> &input_tensors, const VectorRef &args,
                   OpContext<DeviceTensor> *const context, GraphExecutionStrategy real_strategy);

  // The debug related operation interface.
  void SendDebugReq(OpContext<DeviceTensor> *const context) override;
  void SendProfilerReq(OpContext<DeviceTensor> *const context);
  void OnDebugFinish(OpContext<DeviceTensor> *const context) override;

  // The continuous memory related operation interface.
  void SendMemoryAllocReq(OpContext<DeviceTensor> *const context) override;
  void OnMemoryAllocFinish(OpContext<DeviceTensor> *const context) override;

  const std::map<std::pair<CNodePtr, const DeviceContext *>, std::pair<bool, bool>> &continuous_memory_nodes() const {
    return continuous_memory_nodes_;
  }

 protected:
  void Init() override;
  // Kick off the data preparation of one step when the actor is triggered in the pipeline strategy.
  void Run(OpContext<DeviceTensor> *const context) override {
    VectorRef empty_args;
    PrepareData(init_tensors_, empty_args, context, GraphExecutionStrategy::kPipeline);
  }

 private:
  friend class GraphScheduler;

  void UpdateDynamicShapeAndSize(const AnfNodePtr &input_node, const TensorPtr &input_tensor) const;
  void UpdateDeviceAddressForDataNode(const AnfNodePtr &input_node, const TensorPtr &input_tensor);

  // Fetch the input info.
  TensorPtr FetchInputTensor(const std::vector<TensorPtr> &tensors, size_t tensor_index, const VectorRef &args,
                             const KernelWithIndex &front_node) const;
  TensorPtr FetchInputTensorByArg(const VectorRef &args, size_t arg_index, const KernelWithIndex &front_node) const;
  size_t FetchInputTensorIndex(const KernelWithIndex &front_node) const;

  void PrepareDataForDeviceTensorStore(const std::vector<std::vector<TensorPtr>> &input_tensors, const VectorRef &args,
                                       OpContext<DeviceTensor> *const context);
  void PrepareDataForHostTensorQueue(const std::vector<std::vector<TensorPtr>> &input_tensors, const VectorRef &args,
                                     OpContext<DeviceTensor> *const context);
  void PrepareDataForHostTensorQueueNew(const VectorRef &args, OpContext<DeviceTensor> *const context);

  // Prepare the device data for the persistent device tensor of a weight node from the host tensor.
  void PrepareDataForWeightNode(const AnfNodePtr &backend_node, const AnfNodePtr &front_node, const TensorPtr &tensor,
                                const DeviceContext *device_context, OpContext<DeviceTensor> *const context);
  // Prepare the device data for the persistent device tensor of a value node.
  void PrepareDataForValueNode(const ValueNodePtr &node, const AnfNodePtr &front_node,
                               const DeviceContext *device_context, OpContext<DeviceTensor> *const context) const;
  void PrepareDataForStringValue(const ValueNodePtr &node, size_t index, const AnfNodePtr &front_node,
                                 const DeviceContext *device_context, OpContext<DeviceTensor> *const context) const;
  // Sync the host data of a Sequence or Scalar type value to the device side.
  void PrepareDataForSequenceAndScalarValue(const ValueNodePtr &node, size_t index, const AnfNodePtr &front_node,
                                            const DeviceContext *device_context,
                                            OpContext<DeviceTensor> *const context) const;
  // The branch of PrepareDataForValueNode that handles tensor-type values.
  void PrepareDataForValueNodeTensor(const ValueNodePtr &node, const ValuePtr &node_value, const AnfNodePtr &front_node,
                                     const DeviceContext *device_context, OpContext<DeviceTensor> *const context) const;

  // Data preparation in the control flow scene.
  // If the parameters in the root graph are only used by control nodes, they will not be initialized by the kernel
  // graph, so addresses need to be allocated for them explicitly.
  void PrepareDeviceTensorStoreForControlNode(const ControlNodeParserPtr &control_node_parser,
                                              const std::vector<TensorPtr> &tensors, const VectorRef &args,
                                              OpContext<DeviceTensor> *const context) const;
  void PrepareHostTensorQueueForControlNode(const std::vector<TensorPtr> &tensors,
                                            std::vector<TensorPtr> *const host_tensors,
                                            OpContext<DeviceTensor> *const context);
  void PrepareDataForControlValueNode(const KernelWithIndex &node_with_index, const DeviceContext *device_context,
                                      OpContext<DeviceTensor> *const context, const ControlNodeParserPtr &parser) const;

  // In the heterogeneous scene, the device tensor store may hold two device tensors for one front node, and the data
  // needs to be copied between them.
  void CopyDataFromDeviceTensorStore(const AnfNodePtr &front_node, const AnfNodePtr &backend_node,
                                     const device::DeviceAddressPtr &host_tensor_address,
                                     const DeviceContext *device_context, OpContext<DeviceTensor> *context) const;

  void SetInitTensorsIfNeeded(const std::vector<std::vector<TensorPtr>> &input_tensors);

  // Preprocess before the data prepare actor prepares data.
  void PreprocessBeforePrepareData() const;

  const GraphCompilerInfo *graph_compiler_info_;
  GraphExecutionStrategy strategy_;
  GraphExecutionStrategy real_strategy_;
  HostQueueDSActorPtr host_data_source_actor_;
  HostTensorQueuePtr host_tensor_queue_;

  // The nodes that need continuous memory, which must be allocated at the beginning of step running. The first bool of
  // the pair indicates that the inputs of the node need continuous memory, the second bool indicates that the outputs
  // do; e.g. {node, device_context} -> {true, false} means only the inputs need a contiguous block.
  std::map<std::pair<CNodePtr, const DeviceContext *>, std::pair<bool, bool>> continuous_memory_nodes_;
  // The members for continuous memory allocation, fetched from continuous_memory_nodes_.
  std::vector<std::vector<DeviceTensorPtr>> continuous_memory_alloc_list_list_;
  std::vector<std::vector<size_t>> size_list_list_;
  std::vector<uint32_t> stream_id_list_;
  std::vector<size_t> total_size_list_;
  std::vector<const DeviceContext *> continuous_memory_device_contexts_;
  std::vector<std::vector<TensorPtr>> init_tensors_;

  // Record the input nodes whose device addresses have been modified, so that the ref nodes can be refreshed.
  std::set<AnfNode *> address_modified_input_nodes_;
  bool first_step_;
  std::vector<ShapeVector> host_tensors_;
  bool has_parameter_input_;

  // The tensor of a parameter (weight) may have its host value updated on the Python side and then needs to be
  // re-prepared to sync the new host value to the device side. 'tensors_need_reprepare_' records all tensors whose
  // host value has been updated; this hash set is updated by the value-update callback of the tensors.
  static mindspore::HashSet<const tensor::Tensor *> tensors_need_reprepare_;

  bool has_dynamic_shape_{false};

  // Global execution count for the data prepare actor.
  static std::atomic<size_t> execution_count_;
};

using DataPrepareActorPtr = std::shared_ptr<DataPrepareActor>;
}  // namespace runtime
}  // namespace mindspore

#endif  // MINDSPORE_CCSRC_RUNTIME_FRAMEWORK_ACTOR_DATA_PREPARE_ACTOR_H_
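
// Illustrative usage sketch only, not part of this header's contract: in the pipeline strategy the GraphScheduler
// owns the DataPrepareActor and triggers one step of data preparation roughly as below. The names `op_context`,
// `args` and `data_prepare_actor` are placeholders assumed to be built by the caller.
//
//   OpContext<DeviceTensor> op_context;  // per-step context created by the scheduler
//   VectorRef args;                      // flattened front-end inputs of this step
//   data_prepare_actor->PrepareData({}, args, &op_context, GraphExecutionStrategy::kPipeline);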