• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2021 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef MINDSPORE_CCSRC_RUNTIME_FRAMEWORK_ACTOR_ABSTRACT_ACTOR_H_
18 #define MINDSPORE_CCSRC_RUNTIME_FRAMEWORK_ACTOR_ABSTRACT_ACTOR_H_
19 
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <map>
#include <memory>
#include <mutex>
#include <set>
#include <string>
#include <unordered_set>
#include <utility>
#include <vector>

#include "mindrt/include/actor/op_actor.h"
#include "runtime/graph_scheduler/actor/actor_common.h"
#include "runtime/graph_scheduler/device_tensor_store.h"
#include "runtime/graph_scheduler/device_tensor_copy_store.h"
#include "runtime/hardware/device_context.h"
33 
34 namespace mindspore {
35 namespace runtime {
36 using mindspore::device::DeviceContext;
37 using mindspore::kernel::KernelTensor;
38 
// Output data flags, stored as the second element of each entry in AbstractActor::output_data_.
// Every non-zero flag occupies its own bit, so the flags can be tested/combined as a bit mask
// (presumably how the .cc uses them — confirm there).

// No flag set.
constexpr size_t kOutputDataFlagInit = 0;
// The destination of the output data is a stack actor, so the output data cannot be reused.
constexpr size_t kOutputDataFlagToStack = size_t{1} << 0;
// The output data is batch data; it is sent in batches to improve sending performance.
constexpr size_t kOutputDataFlagBatch = size_t{1} << 1;
// The output data is the last data of its batch.
constexpr size_t kOutputDataFlagLastBatch = size_t{1} << 2;
// The destination is an actor inside the same fusion actor, so the synchronous sending interface is used.
constexpr size_t kOutputDataFlagBetweenFusion = size_t{1} << 3;
// The destination is a fusion actor, so the fusion output index must be used.
constexpr size_t kOutputDataFlagToFusion = size_t{1} << 4;
51 
52 // Counter for callback.
53 class CallbackCounter {
54  public:
55   CallbackCounter() = default;
56   ~CallbackCounter() = default;
57 
58   CallbackCounter(const CallbackCounter &) = delete;
59   CallbackCounter &operator=(const CallbackCounter &) = delete;
60 
Counter()61   size_t Counter() { return counter_.load(); }
Increase()62   size_t Increase() { return ++counter_; }
Decrease()63   size_t Decrease() { return --counter_; }
64 
expired()65   bool expired() const { return expired_.load(); }
set_expired(bool expired)66   void set_expired(bool expired) { expired_ = expired; }
67 
Wait()68   void Wait() {
69     std::unique_lock<std::mutex> locker(lock_);
70     MS_LOG(DEBUG) << "Wait for callback execution start.";
71     while (!cv_.wait_for(locker, std::chrono::seconds(1), [&]() { return counter_.load() == 0; })) {
72       MS_LOG(DEBUG) << "Wait cycle.";
73     }
74   }
75 
Notify()76   void Notify() {
77     if (counter_.load() == 0) {
78       std::unique_lock<std::mutex> locker(lock_);
79       cv_.notify_all();
80     }
81   }
82 
83   std::atomic<size_t> reserved_memory_size_{0};
84 
85  private:
86   std::atomic<size_t> counter_{0};
87   std::mutex lock_;
88   std::condition_variable cv_;
89   // Callback executed within async thread, this help to indicate that actor is expired.
90   std::atomic<bool> expired_{false};
91 };
92 using CallbackCounterPtr = std::shared_ptr<CallbackCounter>;
93 
94 // The abstract common attributes of actors. The actor inheritance relationship:  OpActor --> AbstractActor -->
95 // MemoryAwareActor --> DebugAwareActor --> KernelActor/DataSourceActor/CopyActor/LoopCountActor/OutputActor.
96 class AbstractActor : public OpActor<DeviceTensor> {
97  public:
AbstractActor(const std::string & name,KernelTransformType type,const AID * recorder_aid)98   explicit AbstractActor(const std::string &name, KernelTransformType type, const AID *recorder_aid)
99       : OpActor(name),
100         type_(type),
101         recorder_aid_(recorder_aid),
102         input_datas_num_(0),
103         input_controls_num_(0),
104         running_dependent_msg_num_(0),
105         parent_fusion_actor_{nullptr},
106         memory_alloc_insert_position_{nullptr},
107         memory_free_insert_position_{nullptr} {
108     static std::atomic<int64_t> gActorId;
109     actor_id_ = ++gActorId;
110   }
111   ~AbstractActor() override = default;
112 
IsActive(int msg_num)113   bool IsActive(int msg_num) override { return msg_num >= running_dependent_msg_num_ ? true : false; }
114 
115   // The actor run when receive the input data.
116   void RunOpData(OpData<DeviceTensor> *const input_data, OpContext<DeviceTensor> *const context) override;
117   // The actor run when receive the input control.
118   void RunOpControl(AID *const input_control, OpContext<DeviceTensor> *const context) override;
119   // The actor run when receive the batch input data.
120   void RunBatchOpData(std::vector<OpData<DeviceTensor> *> *const batch_input_data,
121                       OpContext<DeviceTensor> *const context);
122 
123   // Get the position of node in the actor.
FetchNodePosition(const KernelWithIndex & node)124   virtual size_t FetchNodePosition(const KernelWithIndex &node) const { return 0; }
125 
126   // Get the member.
type()127   KernelTransformType type() const { return type_; }
actor_id()128   int64_t actor_id() const { return actor_id_; }
device_contexts()129   const std::vector<const DeviceContext *> &device_contexts() const { return device_contexts_; }
output_data_nodes()130   const std::vector<AnfNodePtr> &output_data_nodes() const { return output_data_nodes_; }
device_tensor_store_keys()131   const std::vector<std::pair<size_t, AnfNodePtr>> &device_tensor_store_keys() const {
132     return device_tensor_store_keys_;
133   }
set_device_tensor_store_keys(const std::vector<std::pair<size_t,AnfNodePtr>> & device_tensor_store_keys)134   void set_device_tensor_store_keys(const std::vector<std::pair<size_t, AnfNodePtr>> &device_tensor_store_keys) {
135     device_tensor_store_keys_ = device_tensor_store_keys;
136   }
input_data_arrow_aids()137   const std::vector<std::pair<AID, DataArrow *>> &input_data_arrow_aids() const { return input_data_arrow_aids_; }
input_control_arrow_aids()138   const std::vector<std::pair<AID, ControlArrow *>> &input_control_arrow_aids() const {
139     return input_control_arrow_aids_;
140   }
internal_parameters()141   const std::map<KernelWithIndex, std::vector<AnfNodeWeakPtr>> &internal_parameters() const {
142     return internal_parameters_;
143   }
batch_output_data_arrows()144   const mindspore::HashMap<std::string, std::vector<DataArrowPtr>> &batch_output_data_arrows() const {
145     return batch_output_data_arrows_;
146   }
parent_fusion_actor()147   const AbstractActor *parent_fusion_actor() const { return parent_fusion_actor_; }
sub_actors()148   const mindspore::HashMap<std::string, std::shared_ptr<AbstractActor>> &sub_actors() const { return sub_actors_; }
dependent_actors()149   const std::unordered_set<std::string> &dependent_actors() const { return dependent_actors_; }
memory_alloc_insert_position()150   AbstractActor *memory_alloc_insert_position() const { return memory_alloc_insert_position_; }
memory_free_insert_position()151   AbstractActor *memory_free_insert_position() const { return memory_free_insert_position_; }
device_contexts()152   const std::vector<const DeviceContext *> &device_contexts() { return device_contexts_; }
153 
154  protected:
155   friend class GraphScheduler;
156   friend class ControlNodeScheduler;
157   friend class AnyTypeGraphScheduler;
158   friend class SchedulerHelper;
159 
160   // Check whether satisfy the actor running condition.
161   virtual bool CheckRunningCondition(const OpContext<DeviceTensor> *context) const;
162   // The actor run really when satisfy the actor running condition.
Run(OpContext<DeviceTensor> * const context)163   virtual void Run(OpContext<DeviceTensor> *const context) {}
164 
165   // Erase input data and input controls when finish actor running.
166   virtual void EraseInput(const OpContext<DeviceTensor> *context);
167 
168   // Fetch input data from the device tensor store.
169   void FetchInputByTensorStore(std::vector<DeviceTensor *> *const input_device_tensors,
170                                std::vector<KernelTensor *> *const input_kernel_tensors,
171                                std::vector<abstract::AbstractBasePtr> *const input_kernel_tensors_for_infer,
172                                std::vector<DeviceTensor *> *const memory_free_tensors,
173                                OpContext<DeviceTensor> *const context) const;
174 
175   // Init the member output_data_ and batch_output_data_ by output data arrows.
176   void InitOutputData();
177   // Update the output data before send output data.
UpdateOutputData(OpData<DeviceTensor> * const output_data,const DataArrowPtr & data_arrow,const AnfNodePtr & output_node,OpContext<DeviceTensor> * const context)178   virtual void UpdateOutputData(OpData<DeviceTensor> *const output_data, const DataArrowPtr &data_arrow,
179                                 const AnfNodePtr &output_node, OpContext<DeviceTensor> *const context) {}
180   // Send output to downstream actors to trigger running.
181   virtual void SendOutput(OpContext<DeviceTensor> *const context);
182   // Send recorder info to recorder actor.
SendRecorderInfo(OpContext<DeviceTensor> * const context)183   virtual void SendRecorderInfo(OpContext<DeviceTensor> *const context) const {}
184   void SendOutputData(OpContext<DeviceTensor> *const context, const std::vector<AnfNodePtr> &output_data_nodes,
185                       const std::vector<DataArrowPtr> &output_data_arrows,
186                       const std::vector<std::pair<OpDataUniquePtr<DeviceTensor>, size_t>> &output_data_list,
187                       const mindspore::HashMap<DataArrow *, size_t> &data_arrow_to_fusion_actor_indexs,
188                       mindspore::HashMap<std::string, std::vector<OpData<DeviceTensor> *>> *batch_output_data);
189   // Fetch the sub actor in the fusion actor by the name.
190   AbstractActor *FetchSubActorInFusionActor(const std::string &sub_actor_name) const;
191   bool IsOutputAddressPersisted(const DeviceTensor *output_device_tensor, const KernelWithIndex &output_node);
192 
193   KernelTransformType type_;
194 
195   // The device interface.
196   std::vector<const DeviceContext *> device_contexts_;
197 
198   // The id of recorder actor. Send message to it for recording info.
199   const AID *recorder_aid_;
200 
201   // Auto increment id for actor.
202   int64_t actor_id_;
203 
204   // The output_data_nodes_ and output_data_ corresponds to the output_data_arrows_ one by one.
205   std::vector<AnfNodePtr> output_data_nodes_;
206   // The second of pair indicates the output data flag. See constant prefixed with kOutputDataFalg for details.
207   std::vector<std::pair<OpDataUniquePtr<DeviceTensor>, size_t>> output_data_;
208   // Record the fusion output index for output data arrow.
209   mindspore::HashMap<DataArrow *, size_t> data_arrow_to_fusion_actor_indexs_;
210   // Used to send batch data in the message which RunBatchOpData needs, the key is the actor name of destination actor.
211   mindspore::HashMap<std::string, std::vector<OpData<DeviceTensor> *>> batch_output_data_;
212   mindspore::HashMap<std::string, std::vector<DataArrowPtr>> batch_output_data_arrows_;
213 
214   // When there is recursion in the graph, the actor will send data to the same stack actor multiple times. Since
215   // messages are sent asynchronously between actors, there will be multiple messages that remain unprocessed in
216   // the channel. In order to prevent old data from being overwritten, it is necessary to allocate a new op data,
217   // and these op data will be uniformly cleared by the scheduler after the step ends.
218   std::vector<OpDataUniquePtr<DeviceTensor>> to_stack_data_;
219 
220   // The dependent device tensor stores, the dependent expression is pair<index, AnfNode>.
221   // Index is the input position, AnfNode is the key of the device tensor store.
222   std::vector<std::pair<size_t, AnfNodePtr>> device_tensor_store_keys_;
223   // The device tensor stores which have the auto monad attribute.
224   std::set<AnfNodePtr> auto_monad_device_tensor_stores_;
225 
226   // Map <output_node_with_index, internal_parameter> is used to update the shape of internal parameter node for
227   // inferring the dynamic shape information of the nodes located at the boundary of the graph partition, such as
228   // heterogeneous scenario and so on.
229   std::map<KernelWithIndex, std::vector<AnfNodeWeakPtr>> internal_parameters_;
230 
231   // The dependent input actors.
232   std::vector<std::pair<AID, DataArrow *>> input_data_arrow_aids_;
233   std::vector<std::pair<AID, ControlArrow *>> input_control_arrow_aids_;
234   // The dependent inputs number.
235   size_t input_datas_num_;
236   size_t input_controls_num_;
237 
238   // The dependent messages number of actor running.
239   int running_dependent_msg_num_;
240 
241   // Indicates whether the actor is in fusion actor.
242   AbstractActor *parent_fusion_actor_;
243 
244   // The sub actors in the fusion actor are not spawned in the ActorMgr, so they do not participate in message
245   // interaction, but only internal processing.
246   mindspore::HashMap<std::string, std::shared_ptr<AbstractActor>> sub_actors_;
247 
248   // All actors that the actor depends on for execution, the dependent actors are expanded by the input data and input
249   // controls. For example, ActorA->ActorB->ActorC, the expanded dependent actors of ActorC are ActorA and ActorB.
250   std::unordered_set<std::string> dependent_actors_;
251 
252   // The information used for integration of dynamic and static memory.
253   AbstractActor *memory_alloc_insert_position_;
254   AbstractActor *memory_free_insert_position_;
255 };
256 
257 using AbstractActorPtr = std::shared_ptr<AbstractActor>;
258 }  // namespace runtime
259 }  // namespace mindspore
260 
261 #endif  // MINDSPORE_CCSRC_RUNTIME_FRAMEWORK_ACTOR_ABSTRACT_ACTOR_H_
262