/**
 * Copyright 2021-2022 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef MINDSPORE_CCSRC_RUNTIME_FRAMEWORK_ACTOR_DATA_PREPARE_ACTOR_H_
#define MINDSPORE_CCSRC_RUNTIME_FRAMEWORK_ACTOR_DATA_PREPARE_ACTOR_H_

#include <atomic>
#include <vector>
#include <string>
#include <memory>
#include <utility>
#include <map>
#include <set>
#include "utils/hash_map.h"
#include "runtime/graph_scheduler/graph_compiler.h"
#include "runtime/graph_scheduler/actor/actor_common.h"
#include "runtime/graph_scheduler/actor/data_source_actor.h"
#include "runtime/graph_scheduler/actor/debug_aware_actor.h"
#include "runtime/graph_scheduler/device_tensor_store.h"
#include "runtime/hardware/device_context.h"

namespace mindspore {
namespace runtime {
using mindspore::device::DeviceContext;

// The data prepare actor is used to prepare data for the device tensor store and the host tensor queue, which marks
// the beginning of one step.
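// A minimal usage sketch (hypothetical caller, illustrative names only): the graph scheduler constructs this actor
// once per compiled graph and triggers data preparation at the start of every step, roughly like:
//   auto prepare_actor = std::make_shared<DataPrepareActor>("kernel_graph_0_DataPrepareActor", memory_manager_aid,
//                                                           debug_aid, profiler_aid, &graph_compiler_info,
//                                                           host_ds_actor, host_tensor_queue);
//   prepare_actor->PrepareData(input_tensors, args, &op_context, GraphExecutionStrategy::kPipeline);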
class DataPrepareActor : public DebugAwareActor {
 public:
  DataPrepareActor(const std::string &name, const AID &memory_manager_aid, const AID *debug_aid,
                   const AID *profiler_aid, const GraphCompilerInfo *graph_compiler_info,
                   const HostQueueDSActorPtr &host_data_source_actor, const HostTensorQueuePtr &host_tensor_queue)
      : DebugAwareActor(name, KernelTransformType::kDataPrepareActor, nullptr, memory_manager_aid, debug_aid,
                        profiler_aid),
        graph_compiler_info_(graph_compiler_info),
        strategy_(GraphExecutionStrategy::kPipeline),
        real_strategy_(GraphExecutionStrategy::kPipeline),
        host_data_source_actor_(host_data_source_actor),
        host_tensor_queue_(host_tensor_queue),
        first_step_(true),
        has_parameter_input_(false) {}
  ~DataPrepareActor() override = default;

  // The process entry of data prepare.
  void PrepareData(const std::vector<std::vector<TensorPtr>> &input_tensors, const VectorRef &args,
                   OpContext<DeviceTensor> *const context, GraphExecutionStrategy real_strategy);

  // The debug related operation interface.
  void SendDebugReq(OpContext<DeviceTensor> *const context) override;
  void SendProfilerReq(OpContext<DeviceTensor> *const context);
  void OnDebugFinish(OpContext<DeviceTensor> *const context) override;

  // The continuous memory related operation interface.
  void SendMemoryAllocReq(OpContext<DeviceTensor> *const context) override;
  void OnMemoryAllocFinish(OpContext<DeviceTensor> *const context) override;

  const std::map<std::pair<CNodePtr, const DeviceContext *>, std::pair<bool, bool>> &continuous_memory_nodes() const {
    return continuous_memory_nodes_;
  }

 protected:
  void Init() override;
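  // The actor message entry: each invocation prepares data for one step, reusing the cached init_tensors_ and an
  // empty argument list.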
  void Run(OpContext<DeviceTensor> *const context) override {
    VectorRef empty_args;
    PrepareData(init_tensors_, empty_args, context, GraphExecutionStrategy::kPipeline);
  }

 private:
  friend class GraphScheduler;

  void UpdateDynamicShapeAndSize(const AnfNodePtr &input_node, const TensorPtr &input_tensor) const;
  void UpdateDeviceAddressForDataNode(const AnfNodePtr &input_node, const TensorPtr &input_tensor);

  // Fetch the input info.
  TensorPtr FetchInputTensor(const std::vector<TensorPtr> &tensors, size_t tensor_index, const VectorRef &args,
                             const KernelWithIndex &front_node) const;
  TensorPtr FetchInputTensorByArg(const VectorRef &args, size_t arg_index, const KernelWithIndex &front_node) const;
  size_t FetchInputTensorIndex(const KernelWithIndex &front_node) const;

  void PrepareDataForDeviceTensorStore(const std::vector<std::vector<TensorPtr>> &input_tensors, const VectorRef &args,
                                       OpContext<DeviceTensor> *const context);
  void PrepareDataForHostTensorQueue(const std::vector<std::vector<TensorPtr>> &input_tensors, const VectorRef &args,
                                     OpContext<DeviceTensor> *const context);
  void PrepareDataForHostTensorQueueNew(const VectorRef &args, OpContext<DeviceTensor> *const context);

  // Prepare the device data for the persistent device tensor of a weight node from the host tensor.
  void PrepareDataForWeightNode(const AnfNodePtr &backend_node, const AnfNodePtr &front_node, const TensorPtr &tensor,
                                const DeviceContext *device_context, OpContext<DeviceTensor> *const context);
  // Prepare the device data for the persistent device tensor of a value node.
  void PrepareDataForValueNode(const ValueNodePtr &node, const AnfNodePtr &front_node,
                               const DeviceContext *device_context, OpContext<DeviceTensor> *const context) const;
  void PrepareDataForStringValue(const ValueNodePtr &node, size_t index, const AnfNodePtr &front_node,
                                 const DeviceContext *device_context, OpContext<DeviceTensor> *const context) const;
  // Sync the host data of a Sequence or Scalar type value to the device side.
  void PrepareDataForSequenceAndScalarValue(const ValueNodePtr &node, size_t index, const AnfNodePtr &front_node,
                                            const DeviceContext *device_context,
                                            OpContext<DeviceTensor> *const context) const;
  // The branch of PrepareDataForValueNode that handles tensor-type values.
  void PrepareDataForValueNodeTensor(const ValueNodePtr &node, const ValuePtr &node_value, const AnfNodePtr &front_node,
                                     const DeviceContext *device_context, OpContext<DeviceTensor> *const context) const;

  // The data prepare in the control flow scene.
  // If a parameter in the root graph is only used by control nodes, it will not be initialized by any kernel graph,
  // so its address needs to be specially allocated.
  void PrepareDeviceTensorStoreForControlNode(const ControlNodeParserPtr &control_node_parser,
                                              const std::vector<TensorPtr> &tensors, const VectorRef &args,
                                              OpContext<DeviceTensor> *const context) const;
  void PrepareHostTensorQueueForControlNode(const std::vector<TensorPtr> &tensors,
                                            std::vector<TensorPtr> *const host_tensors,
                                            OpContext<DeviceTensor> *const context);
  void PrepareDataForControlValueNode(const KernelWithIndex &node_with_index, const DeviceContext *device_context,
                                      OpContext<DeviceTensor> *const context, const ControlNodeParserPtr &parser) const;
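  // For example (illustrative): a root-graph parameter consumed only by a Switch control node never appears in any
  // kernel graph, so its device address must be allocated here rather than during kernel graph initialization.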

  // In the heterogeneous scene, the device tensor store may hold two device tensors for one front node, so the data
  // needs to be copied between them.
  void CopyDataFromDeviceTensorStore(const AnfNodePtr &front_node, const AnfNodePtr &backend_node,
                                     const device::DeviceAddressPtr &host_tensor_address,
                                     const DeviceContext *device_context, OpContext<DeviceTensor> *context) const;
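  // For example (illustrative): a weight whose host_tensor_address resides on the CPU device may also feed a GPU
  // kernel, in which case the GPU-side device tensor is filled by copying from the CPU-side one.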

  void SetInitTensorsIfNeeded(const std::vector<std::vector<TensorPtr>> &input_tensors);

  // The preprocessing before the data prepare actor prepares data.
  void PreprocessBeforePrepareData() const;

  const GraphCompilerInfo *graph_compiler_info_;
  GraphExecutionStrategy strategy_;
  GraphExecutionStrategy real_strategy_;
  HostQueueDSActorPtr host_data_source_actor_;
  HostTensorQueuePtr host_tensor_queue_;

  // The nodes that need continuous memory, which must be allocated at the beginning of step running. The first bool
  // of the pair indicates whether the node's inputs need continuous memory; the second bool indicates whether its
  // outputs do.
  std::map<std::pair<CNodePtr, const DeviceContext *>, std::pair<bool, bool>> continuous_memory_nodes_;
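  // For example (illustrative): an entry {{concat_cnode, ascend_device_context}, {true, false}} means the inputs of
  // concat_cnode must be allocated contiguously while its outputs need no continuous memory.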
  // The members used for continuous memory allocation, fetched from continuous_memory_nodes_.
  std::vector<std::vector<DeviceTensorPtr>> continuous_memory_alloc_list_list_;
  std::vector<std::vector<size_t>> size_list_list_;
  std::vector<uint32_t> stream_id_list_;
  std::vector<size_t> total_size_list_;
  std::vector<const DeviceContext *> continuous_memory_device_contexts_;
  std::vector<std::vector<TensorPtr>> init_tensors_;

  // Record the input nodes whose device addresses have been modified, so that the corresponding ref nodes can be
  // refreshed.
  std::set<AnfNode *> address_modified_input_nodes_;
  bool first_step_;
  std::vector<ShapeVector> host_tensors_;
  bool has_parameter_input_;

  // The tensor of a parameter (weight) may have its host value updated in the Python phase, and then needs to be
  // re-prepared to sync the new host value to the device side. 'tensors_need_reprepare_' records all tensors whose
  // host value has been updated; this HashSet is updated by the update-value callback of the tensors.
  static mindspore::HashSet<const tensor::Tensor *> tensors_need_reprepare_;
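  // For example (illustrative): assigning a new value to a weight Parameter in Python fires the tensor's
  // update-value callback, which inserts the tensor into this set; the next PrepareData then syncs the new host
  // value to the device.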

  bool has_dynamic_shape_{false};

  // The global execution count for the data prepare actor.
  static std::atomic<size_t> execution_count_;
};

using DataPrepareActorPtr = std::shared_ptr<DataPrepareActor>;
}  // namespace runtime
}  // namespace mindspore

#endif  // MINDSPORE_CCSRC_RUNTIME_FRAMEWORK_ACTOR_DATA_PREPARE_ACTOR_H_