1 /** 2 * Copyright 2021 Huawei Technologies Co., Ltd 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #ifndef MINDSPORE_CCSRC_RUNTIME_FRAMEWORK_ACTOR_ABSTRACT_ACTOR_H_ 18 #define MINDSPORE_CCSRC_RUNTIME_FRAMEWORK_ACTOR_ABSTRACT_ACTOR_H_ 19 20 #include <chrono> 21 #include <vector> 22 #include <string> 23 #include <memory> 24 #include <utility> 25 #include <set> 26 #include <unordered_set> 27 #include <map> 28 #include "mindrt/include/actor/op_actor.h" 29 #include "runtime/graph_scheduler/actor/actor_common.h" 30 #include "runtime/graph_scheduler/device_tensor_store.h" 31 #include "runtime/graph_scheduler/device_tensor_copy_store.h" 32 #include "runtime/hardware/device_context.h" 33 34 namespace mindspore { 35 namespace runtime { 36 using mindspore::device::DeviceContext; 37 using mindspore::kernel::KernelTensor; 38 39 // The flag of output data. 40 constexpr size_t kOutputDataFlagInit = 0; 41 // Indicates that the output data destination is stack actor, and the output data cannot be reused. 42 constexpr size_t kOutputDataFlagToStack = 1; 43 // Indicates that the output data is the batch data, and send data in batches to increase the sending performance. 44 constexpr size_t kOutputDataFlagBatch = 2; 45 // Indicates that the output data is the last data in the batch. 46 constexpr size_t kOutputDataFlagLastBatch = 4; 47 // Indicates that the output data destination is the internal fusion actor, and uses the synchronous sending interface. 48 constexpr size_t kOutputDataFlagBetweenFusion = 8; 49 // Indicates that the output data destination is the fusion actor, and needs to use the fusion output index. 50 constexpr size_t kOutputDataFlagToFusion = 16; 51 52 // Counter for callback. 53 class CallbackCounter { 54 public: 55 CallbackCounter() = default; 56 ~CallbackCounter() = default; 57 58 CallbackCounter(const CallbackCounter &) = delete; 59 CallbackCounter &operator=(const CallbackCounter &) = delete; 60 Counter()61 size_t Counter() { return counter_.load(); } Increase()62 size_t Increase() { return ++counter_; } Decrease()63 size_t Decrease() { return --counter_; } 64 expired()65 bool expired() const { return expired_.load(); } set_expired(bool expired)66 void set_expired(bool expired) { expired_ = expired; } 67 Wait()68 void Wait() { 69 std::unique_lock<std::mutex> locker(lock_); 70 MS_LOG(DEBUG) << "Wait for callback execution start."; 71 while (!cv_.wait_for(locker, std::chrono::seconds(1), [&]() { return counter_.load() == 0; })) { 72 MS_LOG(DEBUG) << "Wait cycle."; 73 } 74 } 75 Notify()76 void Notify() { 77 if (counter_.load() == 0) { 78 std::unique_lock<std::mutex> locker(lock_); 79 cv_.notify_all(); 80 } 81 } 82 83 std::atomic<size_t> reserved_memory_size_{0}; 84 85 private: 86 std::atomic<size_t> counter_{0}; 87 std::mutex lock_; 88 std::condition_variable cv_; 89 // Callback executed within async thread, this help to indicate that actor is expired. 90 std::atomic<bool> expired_{false}; 91 }; 92 using CallbackCounterPtr = std::shared_ptr<CallbackCounter>; 93 94 // The abstract common attributes of actors. The actor inheritance relationship: OpActor --> AbstractActor --> 95 // MemoryAwareActor --> DebugAwareActor --> KernelActor/DataSourceActor/CopyActor/LoopCountActor/OutputActor. 96 class AbstractActor : public OpActor<DeviceTensor> { 97 public: AbstractActor(const std::string & name,KernelTransformType type,const AID * recorder_aid)98 explicit AbstractActor(const std::string &name, KernelTransformType type, const AID *recorder_aid) 99 : OpActor(name), 100 type_(type), 101 recorder_aid_(recorder_aid), 102 input_datas_num_(0), 103 input_controls_num_(0), 104 running_dependent_msg_num_(0), 105 parent_fusion_actor_{nullptr}, 106 memory_alloc_insert_position_{nullptr}, 107 memory_free_insert_position_{nullptr} { 108 static std::atomic<int64_t> gActorId; 109 actor_id_ = ++gActorId; 110 } 111 ~AbstractActor() override = default; 112 IsActive(int msg_num)113 bool IsActive(int msg_num) override { return msg_num >= running_dependent_msg_num_ ? true : false; } 114 115 // The actor run when receive the input data. 116 void RunOpData(OpData<DeviceTensor> *const input_data, OpContext<DeviceTensor> *const context) override; 117 // The actor run when receive the input control. 118 void RunOpControl(AID *const input_control, OpContext<DeviceTensor> *const context) override; 119 // The actor run when receive the batch input data. 120 void RunBatchOpData(std::vector<OpData<DeviceTensor> *> *const batch_input_data, 121 OpContext<DeviceTensor> *const context); 122 123 // Get the position of node in the actor. FetchNodePosition(const KernelWithIndex & node)124 virtual size_t FetchNodePosition(const KernelWithIndex &node) const { return 0; } 125 126 // Get the member. type()127 KernelTransformType type() const { return type_; } actor_id()128 int64_t actor_id() const { return actor_id_; } device_contexts()129 const std::vector<const DeviceContext *> &device_contexts() const { return device_contexts_; } output_data_nodes()130 const std::vector<AnfNodePtr> &output_data_nodes() const { return output_data_nodes_; } device_tensor_store_keys()131 const std::vector<std::pair<size_t, AnfNodePtr>> &device_tensor_store_keys() const { 132 return device_tensor_store_keys_; 133 } set_device_tensor_store_keys(const std::vector<std::pair<size_t,AnfNodePtr>> & device_tensor_store_keys)134 void set_device_tensor_store_keys(const std::vector<std::pair<size_t, AnfNodePtr>> &device_tensor_store_keys) { 135 device_tensor_store_keys_ = device_tensor_store_keys; 136 } input_data_arrow_aids()137 const std::vector<std::pair<AID, DataArrow *>> &input_data_arrow_aids() const { return input_data_arrow_aids_; } input_control_arrow_aids()138 const std::vector<std::pair<AID, ControlArrow *>> &input_control_arrow_aids() const { 139 return input_control_arrow_aids_; 140 } internal_parameters()141 const std::map<KernelWithIndex, std::vector<AnfNodeWeakPtr>> &internal_parameters() const { 142 return internal_parameters_; 143 } batch_output_data_arrows()144 const mindspore::HashMap<std::string, std::vector<DataArrowPtr>> &batch_output_data_arrows() const { 145 return batch_output_data_arrows_; 146 } parent_fusion_actor()147 const AbstractActor *parent_fusion_actor() const { return parent_fusion_actor_; } sub_actors()148 const mindspore::HashMap<std::string, std::shared_ptr<AbstractActor>> &sub_actors() const { return sub_actors_; } dependent_actors()149 const std::unordered_set<std::string> &dependent_actors() const { return dependent_actors_; } memory_alloc_insert_position()150 AbstractActor *memory_alloc_insert_position() const { return memory_alloc_insert_position_; } memory_free_insert_position()151 AbstractActor *memory_free_insert_position() const { return memory_free_insert_position_; } device_contexts()152 const std::vector<const DeviceContext *> &device_contexts() { return device_contexts_; } 153 154 protected: 155 friend class GraphScheduler; 156 friend class ControlNodeScheduler; 157 friend class AnyTypeGraphScheduler; 158 friend class SchedulerHelper; 159 160 // Check whether satisfy the actor running condition. 161 virtual bool CheckRunningCondition(const OpContext<DeviceTensor> *context) const; 162 // The actor run really when satisfy the actor running condition. Run(OpContext<DeviceTensor> * const context)163 virtual void Run(OpContext<DeviceTensor> *const context) {} 164 165 // Erase input data and input controls when finish actor running. 166 virtual void EraseInput(const OpContext<DeviceTensor> *context); 167 168 // Fetch input data from the device tensor store. 169 void FetchInputByTensorStore(std::vector<DeviceTensor *> *const input_device_tensors, 170 std::vector<KernelTensor *> *const input_kernel_tensors, 171 std::vector<abstract::AbstractBasePtr> *const input_kernel_tensors_for_infer, 172 std::vector<DeviceTensor *> *const memory_free_tensors, 173 OpContext<DeviceTensor> *const context) const; 174 175 // Init the member output_data_ and batch_output_data_ by output data arrows. 176 void InitOutputData(); 177 // Update the output data before send output data. UpdateOutputData(OpData<DeviceTensor> * const output_data,const DataArrowPtr & data_arrow,const AnfNodePtr & output_node,OpContext<DeviceTensor> * const context)178 virtual void UpdateOutputData(OpData<DeviceTensor> *const output_data, const DataArrowPtr &data_arrow, 179 const AnfNodePtr &output_node, OpContext<DeviceTensor> *const context) {} 180 // Send output to downstream actors to trigger running. 181 virtual void SendOutput(OpContext<DeviceTensor> *const context); 182 // Send recorder info to recorder actor. SendRecorderInfo(OpContext<DeviceTensor> * const context)183 virtual void SendRecorderInfo(OpContext<DeviceTensor> *const context) const {} 184 void SendOutputData(OpContext<DeviceTensor> *const context, const std::vector<AnfNodePtr> &output_data_nodes, 185 const std::vector<DataArrowPtr> &output_data_arrows, 186 const std::vector<std::pair<OpDataUniquePtr<DeviceTensor>, size_t>> &output_data_list, 187 const mindspore::HashMap<DataArrow *, size_t> &data_arrow_to_fusion_actor_indexs, 188 mindspore::HashMap<std::string, std::vector<OpData<DeviceTensor> *>> *batch_output_data); 189 // Fetch the sub actor in the fusion actor by the name. 190 AbstractActor *FetchSubActorInFusionActor(const std::string &sub_actor_name) const; 191 bool IsOutputAddressPersisted(const DeviceTensor *output_device_tensor, const KernelWithIndex &output_node); 192 193 KernelTransformType type_; 194 195 // The device interface. 196 std::vector<const DeviceContext *> device_contexts_; 197 198 // The id of recorder actor. Send message to it for recording info. 199 const AID *recorder_aid_; 200 201 // Auto increment id for actor. 202 int64_t actor_id_; 203 204 // The output_data_nodes_ and output_data_ corresponds to the output_data_arrows_ one by one. 205 std::vector<AnfNodePtr> output_data_nodes_; 206 // The second of pair indicates the output data flag. See constant prefixed with kOutputDataFalg for details. 207 std::vector<std::pair<OpDataUniquePtr<DeviceTensor>, size_t>> output_data_; 208 // Record the fusion output index for output data arrow. 209 mindspore::HashMap<DataArrow *, size_t> data_arrow_to_fusion_actor_indexs_; 210 // Used to send batch data in the message which RunBatchOpData needs, the key is the actor name of destination actor. 211 mindspore::HashMap<std::string, std::vector<OpData<DeviceTensor> *>> batch_output_data_; 212 mindspore::HashMap<std::string, std::vector<DataArrowPtr>> batch_output_data_arrows_; 213 214 // When there is recursion in the graph, the actor will send data to the same stack actor multiple times. Since 215 // messages are sent asynchronously between actors, there will be multiple messages that remain unprocessed in 216 // the channel. In order to prevent old data from being overwritten, it is necessary to allocate a new op data, 217 // and these op data will be uniformly cleared by the scheduler after the step ends. 218 std::vector<OpDataUniquePtr<DeviceTensor>> to_stack_data_; 219 220 // The dependent device tensor stores, the dependent expression is pair<index, AnfNode>. 221 // Index is the input position, AnfNode is the key of the device tensor store. 222 std::vector<std::pair<size_t, AnfNodePtr>> device_tensor_store_keys_; 223 // The device tensor stores which have the auto monad attribute. 224 std::set<AnfNodePtr> auto_monad_device_tensor_stores_; 225 226 // Map <output_node_with_index, internal_parameter> is used to update the shape of internal parameter node for 227 // inferring the dynamic shape information of the nodes located at the boundary of the graph partition, such as 228 // heterogeneous scenario and so on. 229 std::map<KernelWithIndex, std::vector<AnfNodeWeakPtr>> internal_parameters_; 230 231 // The dependent input actors. 232 std::vector<std::pair<AID, DataArrow *>> input_data_arrow_aids_; 233 std::vector<std::pair<AID, ControlArrow *>> input_control_arrow_aids_; 234 // The dependent inputs number. 235 size_t input_datas_num_; 236 size_t input_controls_num_; 237 238 // The dependent messages number of actor running. 239 int running_dependent_msg_num_; 240 241 // Indicates whether the actor is in fusion actor. 242 AbstractActor *parent_fusion_actor_; 243 244 // The sub actors in the fusion actor are not spawned in the ActorMgr, so they do not participate in message 245 // interaction, but only internal processing. 246 mindspore::HashMap<std::string, std::shared_ptr<AbstractActor>> sub_actors_; 247 248 // All actors that the actor depends on for execution, the dependent actors are expanded by the input data and input 249 // controls. For example, ActorA->ActorB->ActorC, the expanded dependent actors of ActorC are ActorA and ActorB. 250 std::unordered_set<std::string> dependent_actors_; 251 252 // The information used for integration of dynamic and static memory. 253 AbstractActor *memory_alloc_insert_position_; 254 AbstractActor *memory_free_insert_position_; 255 }; 256 257 using AbstractActorPtr = std::shared_ptr<AbstractActor>; 258 } // namespace runtime 259 } // namespace mindspore 260 261 #endif // MINDSPORE_CCSRC_RUNTIME_FRAMEWORK_ACTOR_ABSTRACT_ACTOR_H_ 262