1 /** 2 * Copyright 2021-2023 Huawei Technologies Co., Ltd 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #ifndef MINDSPORE_CORE_MINDRT_INCLUDE_ACTOR_ACTOR_H 18 #define MINDSPORE_CORE_MINDRT_INCLUDE_ACTOR_ACTOR_H 19 20 #include <functional> 21 #include <map> 22 #include <memory> 23 #include <mutex> 24 #include <string> 25 #include <utility> 26 #include "thread/hqueue.h" 27 #include "thread/semaphore.h" 28 #include "actor/msg.h" 29 #include "actor/mailbox.h" 30 31 namespace mindspore { 32 class ActorMgr; 33 class ActorWorker; 34 class ActorThreadPool; 35 36 // should be at least greater than 1 37 constexpr uint32_t MAX_ACTOR_RECORD_SIZE = 3; 38 39 class MS_CORE_API ActorBase { 40 public: GetAID()41 inline const AID &GetAID() const { return id; } 42 AddMsgRecord(const std::string & msgName)43 inline void AddMsgRecord(const std::string &msgName) { 44 recordNextPoint++; 45 uint32_t startPoint = recordNextPoint % MAX_ACTOR_RECORD_SIZE; 46 msgRecords[startPoint] = msgName; 47 } 48 PrintMsgRecord()49 inline void PrintMsgRecord() { 50 uint32_t startPoint = recordNextPoint % MAX_ACTOR_RECORD_SIZE; 51 for (uint32_t i = 0; i < MAX_ACTOR_RECORD_SIZE; i++) { 52 MS_LOG(DEBUG) << "Actor message dumps:" 53 << "actor:" << id.Name().c_str() << " msg:" << msgRecords[startPoint].c_str(); 54 startPoint = (startPoint + MAX_ACTOR_RECORD_SIZE - 1) % MAX_ACTOR_RECORD_SIZE; 55 } 56 } 57 58 ActorBase(); 59 explicit ActorBase(const std::string &name); 60 explicit ActorBase(const std::string &name, ActorThreadPool *pool); 61 virtual ~ActorBase(); 62 63 // send MessageBase message to the actor. 64 int Send(const AID &to, std::unique_ptr<MessageBase> msg); 65 66 // send string message to the actor 67 int Send(const AID &to, std::string &&name, std::string &&msg, bool remoteLink = false, 68 bool isExactNotRemote = false); 69 70 // get output buffer size for flow control 71 uint64_t GetOutBufSize(const AID &to); 72 73 // get input buffer size for flow control 74 uint64_t GetInBufSize(const AID &to); 75 76 // set record send/receive message package size 77 int AddRuleUdp(const std::string &peer, int recordNum); 78 79 // delete the send/receive message package size 80 void DelRuleUdp(const std::string &peer, bool outputLog); 81 set_thread_pool(ActorThreadPool * pool)82 void set_thread_pool(ActorThreadPool *pool) { pool_ = pool; } 83 84 // Judge if actor running by the received message number, the default is true. IsActive(int msg_num)85 virtual bool IsActive(int msg_num) { return true; } 86 set_actor_mgr(const std::shared_ptr<ActorMgr> & mgr)87 inline void set_actor_mgr(const std::shared_ptr<ActorMgr> &mgr) { actor_mgr_ = mgr; } get_actor_mgr()88 inline std::shared_ptr<ActorMgr> get_actor_mgr() const { return actor_mgr_; } 89 90 protected: 91 using ActorFunction = std::function<void(const std::unique_ptr<MessageBase> &msg)>; 92 93 // install KMSG handler . This method will be called before the actor start to run. Init()94 virtual void Init() {} 95 96 // This method will be called before the actor start to terminate. Finalize()97 virtual void Finalize() {} 98 99 // KHTTPMsg handler HandleHttp(const std::unique_ptr<MessageBase> & msg)100 virtual void HandleHttp(const std::unique_ptr<MessageBase> &msg) { 101 MS_LOG(ERROR) << "ACTOR (" << id.Name().c_str() << ") HandleHttp() is not implemented"; 102 } 103 104 // KLOCALMsg handler HandleLocalMsg(const std::unique_ptr<MessageBase> & msg)105 virtual void HandleLocalMsg(const std::unique_ptr<MessageBase> &msg) { 106 MS_LOG(ERROR) << "ACTOR (" << id.Name().c_str() << ") HandleLocalMsg() is not implemented."; 107 } 108 109 // The link is closed. Exited(const AID & actor)110 virtual void Exited(const AID &actor) { 111 MS_LOG(ERROR) << "ACTOR (" << id.Name().c_str() << ") Exited() is not implemented. "; 112 } 113 114 // Filter the KMSG Filter(const std::unique_ptr<MessageBase> & msg)115 virtual bool Filter(const std::unique_ptr<MessageBase> &msg) { return false; } 116 117 // register the message handle 118 void Receive(const std::string &msgName, ActorFunction &&func); 119 120 // register the message handle. It will be discarded. 121 template <typename T> Receive(const std::string & msgName,void (T::* method)(mindspore::AID,std::string &&,std::string &&))122 void Receive(const std::string &msgName, void (T::*method)(mindspore::AID, std::string &&, std::string &&)) { 123 ActorFunction func = std::bind(&BehaviorBase1<T>, static_cast<T *>(this), method, std::placeholders::_1); 124 Receive(msgName, std::move(func)); 125 } 126 127 // register the message handle 128 template <typename T> Receive(const std::string & msgName,void (T::* method)(const mindspore::AID &,std::string &&,std::string &&))129 void Receive(const std::string &msgName, void (T::*method)(const mindspore::AID &, std::string &&, std::string &&)) { 130 ActorFunction func = std::bind(&BehaviorBase<T>, static_cast<T *>(this), method, std::placeholders::_1); 131 Receive(msgName, std::move(func)); 132 return; 133 } 134 135 // register the message handle, for kmsg-udp message 136 template <typename T> ReceiveUdp(const std::string & msgName,void (T::* method)(const mindspore::AID &,std::string &&,std::string &&))137 void ReceiveUdp(const std::string &msgName, 138 void (T::*method)(const mindspore::AID &, std::string &&, std::string &&)) { 139 ActorFunction func = std::bind(&BehaviorBaseForUdp<T>, static_cast<T *>(this), method, std::placeholders::_1); 140 Receive(msgName, std::move(func)); 141 return; 142 } 143 144 // Link the remote actor 145 int Link(const AID &to); 146 147 // Unlink the remote actor 148 int UnLink(const AID &to); 149 150 // Reconnect to the remote actor 151 int Reconnect(const AID &to); 152 153 void Terminate(); 154 void Await(); 155 156 private: 157 friend class ActorMgr; 158 friend class ActorWorker; 159 friend class ParallelWorker; 160 161 // KMSG Msg Handler 162 virtual void HandlekMsg(const std::unique_ptr<MessageBase> &msg); 163 164 template <typename T> BehaviorBase(T * t,void (T::* method)(const mindspore::AID &,std::string &&,std::string &&),const std::unique_ptr<MessageBase> & msg)165 static void BehaviorBase(T *t, void (T::*method)(const mindspore::AID &, std::string &&, std::string &&), 166 const std::unique_ptr<MessageBase> &msg) { 167 MINDRT_OOM_EXIT(msg); 168 if (msg->type != MessageBase::Type::KMSG) { 169 MS_LOG(ERROR) << "Drop non-tcp message: from:" << std::string(msg->from).c_str() 170 << ",to:" << std::string(msg->to).c_str() << ",name:" << msg->name.c_str(); 171 return; 172 } 173 (t->*method)(msg->from, std::move(msg->name), std::move(msg->body)); 174 } 175 176 // register the message handle. It will be discarded. 177 template <typename T> BehaviorBase1(T * t,void (T::* method)(mindspore::AID,std::string &&,std::string &&),const std::unique_ptr<MessageBase> & msg)178 static void BehaviorBase1(T *t, void (T::*method)(mindspore::AID, std::string &&, std::string &&), 179 const std::unique_ptr<MessageBase> &msg) { 180 MINDRT_OOM_EXIT(msg); 181 if (msg->type != MessageBase::Type::KMSG) { 182 MS_LOG(ERROR) << "Drop non-tcp message: from:" << std::string(msg->from).c_str() 183 << ",to:" << std::string(msg->to).c_str() << ",name:" << msg->name.c_str(); 184 return; 185 } 186 (t->*method)(msg->from, std::move(msg->name), std::move(msg->body)); 187 } 188 189 // register the udp message handle. Use this closure function to drop non-udp messages 190 template <typename T> BehaviorBaseForUdp(T * t,void (T::* method)(const mindspore::AID &,std::string &&,std::string &&),const std::unique_ptr<MessageBase> & msg)191 static void BehaviorBaseForUdp(T *t, void (T::*method)(const mindspore::AID &, std::string &&, std::string &&), 192 const std::unique_ptr<MessageBase> &msg) { 193 MINDRT_OOM_EXIT(msg); 194 if (msg->type != MessageBase::Type::KUDP) { 195 MS_LOG(ERROR) << "Drop non-udp message: from:" << std::string(msg->from).c_str() 196 << ",to:" << std::string(msg->to).c_str() << ",name:" << msg->name.c_str(); 197 return; 198 } 199 (t->*method)(msg->from, std::move(msg->name), std::move(msg->body)); 200 } 201 202 void Run(); 203 void Quit(); 204 int EnqueMessage(std::unique_ptr<MessageBase> msg) const; 205 206 void Spawn(const std::shared_ptr<ActorBase>, std::unique_ptr<MailBox> mailbox); 207 208 std::unique_ptr<MailBox> mailbox; 209 std::atomic_bool terminating_{false}; 210 211 AID id; 212 std::map<std::string, ActorFunction> actionFunctions; 213 Semaphore waiterLock{1}; 214 std::string msgRecords[MAX_ACTOR_RECORD_SIZE]; 215 uint32_t recordNextPoint = 0; 216 217 ActorThreadPool *pool_{nullptr}; 218 std::shared_ptr<ActorMgr> actor_mgr_; 219 }; 220 using ActorReference = std::shared_ptr<ActorBase>; 221 }; // namespace mindspore 222 #endif 223