• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2021 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef MINDSPORE_CORE_MINDRT_INCLUDE_ACTOR_ACTOR_H
18 #define MINDSPORE_CORE_MINDRT_INCLUDE_ACTOR_ACTOR_H
19 
20 #include <functional>
21 #include <map>
22 #include <memory>
23 #include <mutex>
24 #include <string>
25 #include <utility>
26 #include "thread/hqueue.h"
27 #include "actor/msg.h"
28 #include "actor/mailbox.h"
29 
30 namespace mindspore {
31 class ActorMgr;
32 class ActorWorker;
33 class ActorThreadPool;
34 
35 // should be at least greater than 1
36 constexpr uint32_t MAX_ACTOR_RECORD_SIZE = 3;
37 
38 class ActorBase {
39  public:
GetAID()40   inline const AID &GetAID() const { return id; }
41 
AddMsgRecord(const std::string & msgName)42   inline void AddMsgRecord(const std::string &msgName) {
43     recordNextPoint++;
44     uint32_t startPoint = recordNextPoint % MAX_ACTOR_RECORD_SIZE;
45     msgRecords[startPoint] = msgName;
46   }
47 
PrintMsgRecord()48   inline void PrintMsgRecord() {
49     uint32_t startPoint = recordNextPoint % MAX_ACTOR_RECORD_SIZE;
50     for (uint32_t i = 0; i < MAX_ACTOR_RECORD_SIZE; i++) {
51       MS_LOG(DEBUG) << "Actor message dumps:"
52                     << "actor:" << id.Name().c_str() << " msg:" << msgRecords[startPoint].c_str();
53       startPoint = (startPoint + MAX_ACTOR_RECORD_SIZE - 1) % MAX_ACTOR_RECORD_SIZE;
54     }
55   }
56 
57   ActorBase();
58   explicit ActorBase(const std::string &name);
59   explicit ActorBase(const std::string &name, ActorThreadPool *pool);
60   virtual ~ActorBase();
61 
62   // send  MessageBase message to the  actor.
63   int Send(const AID &to, std::unique_ptr<MessageBase> msg);
64 
65   // send string message to the actor
66   int Send(const AID &to, std::string &&name, std::string &&msg, bool remoteLink = false,
67            bool isExactNotRemote = false);
68 
69   // get output buffer size for flow control
70   uint64_t GetOutBufSize(const AID &to);
71 
72   // get input buffer size for flow control
73   uint64_t GetInBufSize(const AID &to);
74 
75   // set record send/receive message package size
76   int AddRuleUdp(const std::string &peer, int recordNum);
77 
78   // delete the send/receive message package size
79   void DelRuleUdp(const std::string &peer, bool outputLog);
80 
set_thread_pool(ActorThreadPool * pool)81   void set_thread_pool(ActorThreadPool *pool) { pool_ = pool; }
82 
83   // Judge if actor running by the received message number, the default is true.
IsActive(int msg_num)84   virtual bool IsActive(int msg_num) { return true; }
85 
86  protected:
87   using ActorFunction = std::function<void(const std::unique_ptr<MessageBase> &msg)>;
88 
89   // install KMSG handler . This method will be called before the actor start to run.
Init()90   virtual void Init() {}
91 
92   // This method will be called before the actor start to terminate.
Finalize()93   virtual void Finalize() {}
94 
95   // KHTTPMsg handler
HandleHttp(const std::unique_ptr<MessageBase> & msg)96   virtual void HandleHttp(const std::unique_ptr<MessageBase> &msg) {
97     MS_LOG(ERROR) << "ACTOR (" << id.Name().c_str() << ") HandleHttp() is not implemented";
98   }
99 
100   // KLOCALMsg handler
HandleLocalMsg(const std::unique_ptr<MessageBase> & msg)101   virtual void HandleLocalMsg(const std::unique_ptr<MessageBase> &msg) {
102     MS_LOG(ERROR) << "ACTOR (" << id.Name().c_str() << ") HandleLocalMsg() is not implemented.";
103   }
104 
105   // The link is closed.
Exited(const AID & actor)106   virtual void Exited(const AID &actor) {
107     MS_LOG(ERROR) << "ACTOR (" << id.Name().c_str() << ") Exited() is not implemented. ";
108   }
109 
110   // Filter the KMSG
Filter(const std::unique_ptr<MessageBase> & msg)111   virtual bool Filter(const std::unique_ptr<MessageBase> &msg) { return false; }
112 
113   // register the message handle
114   void Receive(const std::string &msgName, ActorFunction &&func);
115 
116   // register the message handle. It will be discarded.
117   template <typename T>
Receive(const std::string & msgName,void (T::* method)(mindspore::AID,std::string &&,std::string &&))118   void Receive(const std::string &msgName, void (T::*method)(mindspore::AID, std::string &&, std::string &&)) {
119     ActorFunction func = std::bind(&BehaviorBase1<T>, static_cast<T *>(this), method, std::placeholders::_1);
120     Receive(msgName, std::move(func));
121   }
122 
123   // register the message handle
124   template <typename T>
Receive(const std::string & msgName,void (T::* method)(const mindspore::AID &,std::string &&,std::string &&))125   void Receive(const std::string &msgName, void (T::*method)(const mindspore::AID &, std::string &&, std::string &&)) {
126     ActorFunction func = std::bind(&BehaviorBase<T>, static_cast<T *>(this), method, std::placeholders::_1);
127     Receive(msgName, std::move(func));
128     return;
129   }
130 
131   // register the message handle, for kmsg-udp message
132   template <typename T>
ReceiveUdp(const std::string & msgName,void (T::* method)(const mindspore::AID &,std::string &&,std::string &&))133   void ReceiveUdp(const std::string &msgName,
134                   void (T::*method)(const mindspore::AID &, std::string &&, std::string &&)) {
135     ActorFunction func = std::bind(&BehaviorBaseForUdp<T>, static_cast<T *>(this), method, std::placeholders::_1);
136     Receive(msgName, std::move(func));
137     return;
138   }
139 
140   // Link the remote actor
141   int Link(const AID &to);
142 
143   // Unlink the remote actor
144   int UnLink(const AID &to);
145 
146   // Reconnect to the remote actor
147   int Reconnect(const AID &to);
148 
149   void Terminate();
150   void Await();
151 
152  private:
153   friend class ActorMgr;
154   friend class ActorWorker;
155 
156   // KMSG Msg Handler
157   virtual void HandlekMsg(const std::unique_ptr<MessageBase> &msg);
158 
159   template <typename T>
BehaviorBase(T * t,void (T::* method)(const mindspore::AID &,std::string &&,std::string &&),const std::unique_ptr<MessageBase> & msg)160   static void BehaviorBase(T *t, void (T::*method)(const mindspore::AID &, std::string &&, std::string &&),
161                            const std::unique_ptr<MessageBase> &msg) {
162     MINDRT_OOM_EXIT(msg);
163     if (msg->type != MessageBase::Type::KMSG) {
164       MS_LOG(ERROR) << "Drop non-tcp message: from:" << std::string(msg->from).c_str()
165                     << ",to:" << std::string(msg->to).c_str() << ",name:" << msg->name.c_str();
166       return;
167     }
168     (t->*method)(msg->from, std::move(msg->name), std::move(msg->body));
169   }
170 
171   // register the message handle. It will be discarded.
172   template <typename T>
BehaviorBase1(T * t,void (T::* method)(mindspore::AID,std::string &&,std::string &&),const std::unique_ptr<MessageBase> & msg)173   static void BehaviorBase1(T *t, void (T::*method)(mindspore::AID, std::string &&, std::string &&),
174                             const std::unique_ptr<MessageBase> &msg) {
175     MINDRT_OOM_EXIT(msg);
176     if (msg->type != MessageBase::Type::KMSG) {
177       MS_LOG(ERROR) << "Drop non-tcp message:  from:" << std::string(msg->from).c_str()
178                     << ",to:" << std::string(msg->to).c_str() << ",name:" << msg->name.c_str();
179       return;
180     }
181     (t->*method)(msg->from, std::move(msg->name), std::move(msg->body));
182   }
183 
184   // register the udp message handle. Use this closure function to drop non-udp messages
185   template <typename T>
BehaviorBaseForUdp(T * t,void (T::* method)(const mindspore::AID &,std::string &&,std::string &&),const std::unique_ptr<MessageBase> & msg)186   static void BehaviorBaseForUdp(T *t, void (T::*method)(const mindspore::AID &, std::string &&, std::string &&),
187                                  const std::unique_ptr<MessageBase> &msg) {
188     MINDRT_OOM_EXIT(msg);
189     if (msg->type != MessageBase::Type::KUDP) {
190       MS_LOG(ERROR) << "Drop non-udp message:  from:" << std::string(msg->from).c_str()
191                     << ",to:" << std::string(msg->to).c_str() << ",name:" << msg->name.c_str();
192       return;
193     }
194     (t->*method)(msg->from, std::move(msg->name), std::move(msg->body));
195   }
196 
197   void Run();
198   void Quit();
199   int EnqueMessage(std::unique_ptr<MessageBase> msg) const;
200 
201   void Spawn(const std::shared_ptr<ActorBase>, std::unique_ptr<MailBox> mailbox);
202 
203   std::unique_ptr<MailBox> mailbox;
204   std::atomic_bool terminating_ = false;
205 
206   AID id;
207   std::map<std::string, ActorFunction> actionFunctions;
208   std::mutex waiterLock;
209 
210   std::string msgRecords[MAX_ACTOR_RECORD_SIZE];
211   uint32_t recordNextPoint = 0;
212 
213   ActorThreadPool *pool_{nullptr};
214 };
215 
216 using ActorReference = std::shared_ptr<ActorBase>;
217 };  // namespace mindspore
218 #endif
219