• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2021-2023 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef MINDSPORE_CORE_MINDRT_INCLUDE_ACTOR_ACTOR_H
18 #define MINDSPORE_CORE_MINDRT_INCLUDE_ACTOR_ACTOR_H
19 
20 #include <functional>
21 #include <map>
22 #include <memory>
23 #include <mutex>
24 #include <string>
25 #include <utility>
26 #include "thread/hqueue.h"
27 #include "thread/semaphore.h"
28 #include "actor/msg.h"
29 #include "actor/mailbox.h"
30 
31 namespace mindspore {
32 class ActorMgr;
33 class ActorWorker;
34 class ActorThreadPool;
35 
36 // should be at least greater than 1
37 constexpr uint32_t MAX_ACTOR_RECORD_SIZE = 3;
38 
39 class MS_CORE_API ActorBase {
40  public:
GetAID()41   inline const AID &GetAID() const { return id; }
42 
AddMsgRecord(const std::string & msgName)43   inline void AddMsgRecord(const std::string &msgName) {
44     recordNextPoint++;
45     uint32_t startPoint = recordNextPoint % MAX_ACTOR_RECORD_SIZE;
46     msgRecords[startPoint] = msgName;
47   }
48 
PrintMsgRecord()49   inline void PrintMsgRecord() {
50     uint32_t startPoint = recordNextPoint % MAX_ACTOR_RECORD_SIZE;
51     for (uint32_t i = 0; i < MAX_ACTOR_RECORD_SIZE; i++) {
52       MS_LOG(DEBUG) << "Actor message dumps:"
53                     << "actor:" << id.Name().c_str() << " msg:" << msgRecords[startPoint].c_str();
54       startPoint = (startPoint + MAX_ACTOR_RECORD_SIZE - 1) % MAX_ACTOR_RECORD_SIZE;
55     }
56   }
57 
58   ActorBase();
59   explicit ActorBase(const std::string &name);
60   explicit ActorBase(const std::string &name, ActorThreadPool *pool);
61   virtual ~ActorBase();
62 
63   // send  MessageBase message to the  actor.
64   int Send(const AID &to, std::unique_ptr<MessageBase> msg);
65 
66   // send string message to the actor
67   int Send(const AID &to, std::string &&name, std::string &&msg, bool remoteLink = false,
68            bool isExactNotRemote = false);
69 
70   // get output buffer size for flow control
71   uint64_t GetOutBufSize(const AID &to);
72 
73   // get input buffer size for flow control
74   uint64_t GetInBufSize(const AID &to);
75 
76   // set record send/receive message package size
77   int AddRuleUdp(const std::string &peer, int recordNum);
78 
79   // delete the send/receive message package size
80   void DelRuleUdp(const std::string &peer, bool outputLog);
81 
set_thread_pool(ActorThreadPool * pool)82   void set_thread_pool(ActorThreadPool *pool) { pool_ = pool; }
83 
84   // Judge if actor running by the received message number, the default is true.
IsActive(int msg_num)85   virtual bool IsActive(int msg_num) { return true; }
86 
set_actor_mgr(const std::shared_ptr<ActorMgr> & mgr)87   inline void set_actor_mgr(const std::shared_ptr<ActorMgr> &mgr) { actor_mgr_ = mgr; }
get_actor_mgr()88   inline std::shared_ptr<ActorMgr> get_actor_mgr() const { return actor_mgr_; }
89 
90  protected:
91   using ActorFunction = std::function<void(const std::unique_ptr<MessageBase> &msg)>;
92 
93   // install KMSG handler . This method will be called before the actor start to run.
Init()94   virtual void Init() {}
95 
96   // This method will be called before the actor start to terminate.
Finalize()97   virtual void Finalize() {}
98 
99   // KHTTPMsg handler
HandleHttp(const std::unique_ptr<MessageBase> & msg)100   virtual void HandleHttp(const std::unique_ptr<MessageBase> &msg) {
101     MS_LOG(ERROR) << "ACTOR (" << id.Name().c_str() << ") HandleHttp() is not implemented";
102   }
103 
104   // KLOCALMsg handler
HandleLocalMsg(const std::unique_ptr<MessageBase> & msg)105   virtual void HandleLocalMsg(const std::unique_ptr<MessageBase> &msg) {
106     MS_LOG(ERROR) << "ACTOR (" << id.Name().c_str() << ") HandleLocalMsg() is not implemented.";
107   }
108 
109   // The link is closed.
Exited(const AID & actor)110   virtual void Exited(const AID &actor) {
111     MS_LOG(ERROR) << "ACTOR (" << id.Name().c_str() << ") Exited() is not implemented. ";
112   }
113 
114   // Filter the KMSG
Filter(const std::unique_ptr<MessageBase> & msg)115   virtual bool Filter(const std::unique_ptr<MessageBase> &msg) { return false; }
116 
117   // register the message handle
118   void Receive(const std::string &msgName, ActorFunction &&func);
119 
120   // register the message handle. It will be discarded.
121   template <typename T>
Receive(const std::string & msgName,void (T::* method)(mindspore::AID,std::string &&,std::string &&))122   void Receive(const std::string &msgName, void (T::*method)(mindspore::AID, std::string &&, std::string &&)) {
123     ActorFunction func = std::bind(&BehaviorBase1<T>, static_cast<T *>(this), method, std::placeholders::_1);
124     Receive(msgName, std::move(func));
125   }
126 
127   // register the message handle
128   template <typename T>
Receive(const std::string & msgName,void (T::* method)(const mindspore::AID &,std::string &&,std::string &&))129   void Receive(const std::string &msgName, void (T::*method)(const mindspore::AID &, std::string &&, std::string &&)) {
130     ActorFunction func = std::bind(&BehaviorBase<T>, static_cast<T *>(this), method, std::placeholders::_1);
131     Receive(msgName, std::move(func));
132     return;
133   }
134 
135   // register the message handle, for kmsg-udp message
136   template <typename T>
ReceiveUdp(const std::string & msgName,void (T::* method)(const mindspore::AID &,std::string &&,std::string &&))137   void ReceiveUdp(const std::string &msgName,
138                   void (T::*method)(const mindspore::AID &, std::string &&, std::string &&)) {
139     ActorFunction func = std::bind(&BehaviorBaseForUdp<T>, static_cast<T *>(this), method, std::placeholders::_1);
140     Receive(msgName, std::move(func));
141     return;
142   }
143 
144   // Link the remote actor
145   int Link(const AID &to);
146 
147   // Unlink the remote actor
148   int UnLink(const AID &to);
149 
150   // Reconnect to the remote actor
151   int Reconnect(const AID &to);
152 
153   void Terminate();
154   void Await();
155 
156  private:
157   friend class ActorMgr;
158   friend class ActorWorker;
159   friend class ParallelWorker;
160 
161   // KMSG Msg Handler
162   virtual void HandlekMsg(const std::unique_ptr<MessageBase> &msg);
163 
164   template <typename T>
BehaviorBase(T * t,void (T::* method)(const mindspore::AID &,std::string &&,std::string &&),const std::unique_ptr<MessageBase> & msg)165   static void BehaviorBase(T *t, void (T::*method)(const mindspore::AID &, std::string &&, std::string &&),
166                            const std::unique_ptr<MessageBase> &msg) {
167     MINDRT_OOM_EXIT(msg);
168     if (msg->type != MessageBase::Type::KMSG) {
169       MS_LOG(ERROR) << "Drop non-tcp message: from:" << std::string(msg->from).c_str()
170                     << ",to:" << std::string(msg->to).c_str() << ",name:" << msg->name.c_str();
171       return;
172     }
173     (t->*method)(msg->from, std::move(msg->name), std::move(msg->body));
174   }
175 
176   // register the message handle. It will be discarded.
177   template <typename T>
BehaviorBase1(T * t,void (T::* method)(mindspore::AID,std::string &&,std::string &&),const std::unique_ptr<MessageBase> & msg)178   static void BehaviorBase1(T *t, void (T::*method)(mindspore::AID, std::string &&, std::string &&),
179                             const std::unique_ptr<MessageBase> &msg) {
180     MINDRT_OOM_EXIT(msg);
181     if (msg->type != MessageBase::Type::KMSG) {
182       MS_LOG(ERROR) << "Drop non-tcp message:  from:" << std::string(msg->from).c_str()
183                     << ",to:" << std::string(msg->to).c_str() << ",name:" << msg->name.c_str();
184       return;
185     }
186     (t->*method)(msg->from, std::move(msg->name), std::move(msg->body));
187   }
188 
189   // register the udp message handle. Use this closure function to drop non-udp messages
190   template <typename T>
BehaviorBaseForUdp(T * t,void (T::* method)(const mindspore::AID &,std::string &&,std::string &&),const std::unique_ptr<MessageBase> & msg)191   static void BehaviorBaseForUdp(T *t, void (T::*method)(const mindspore::AID &, std::string &&, std::string &&),
192                                  const std::unique_ptr<MessageBase> &msg) {
193     MINDRT_OOM_EXIT(msg);
194     if (msg->type != MessageBase::Type::KUDP) {
195       MS_LOG(ERROR) << "Drop non-udp message:  from:" << std::string(msg->from).c_str()
196                     << ",to:" << std::string(msg->to).c_str() << ",name:" << msg->name.c_str();
197       return;
198     }
199     (t->*method)(msg->from, std::move(msg->name), std::move(msg->body));
200   }
201 
202   void Run();
203   void Quit();
204   int EnqueMessage(std::unique_ptr<MessageBase> msg) const;
205 
206   void Spawn(const std::shared_ptr<ActorBase>, std::unique_ptr<MailBox> mailbox);
207 
208   std::unique_ptr<MailBox> mailbox;
209   std::atomic_bool terminating_{false};
210 
211   AID id;
212   std::map<std::string, ActorFunction> actionFunctions;
213   Semaphore waiterLock{1};
214   std::string msgRecords[MAX_ACTOR_RECORD_SIZE];
215   uint32_t recordNextPoint = 0;
216 
217   ActorThreadPool *pool_{nullptr};
218   std::shared_ptr<ActorMgr> actor_mgr_;
219 };
220 using ActorReference = std::shared_ptr<ActorBase>;
221 };  // namespace mindspore
222 #endif
223