1 /** 2 * Copyright 2021 Huawei Technologies Co., Ltd 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #ifndef MINDSPORE_CORE_MINDRT_INCLUDE_ACTOR_ACTOR_H 18 #define MINDSPORE_CORE_MINDRT_INCLUDE_ACTOR_ACTOR_H 19 20 #include <functional> 21 #include <map> 22 #include <memory> 23 #include <mutex> 24 #include <string> 25 #include <utility> 26 #include "thread/hqueue.h" 27 #include "actor/msg.h" 28 #include "actor/mailbox.h" 29 30 namespace mindspore { 31 class ActorMgr; 32 class ActorWorker; 33 class ActorThreadPool; 34 35 // should be at least greater than 1 36 constexpr uint32_t MAX_ACTOR_RECORD_SIZE = 3; 37 38 class ActorBase { 39 public: GetAID()40 inline const AID &GetAID() const { return id; } 41 AddMsgRecord(const std::string & msgName)42 inline void AddMsgRecord(const std::string &msgName) { 43 recordNextPoint++; 44 uint32_t startPoint = recordNextPoint % MAX_ACTOR_RECORD_SIZE; 45 msgRecords[startPoint] = msgName; 46 } 47 PrintMsgRecord()48 inline void PrintMsgRecord() { 49 uint32_t startPoint = recordNextPoint % MAX_ACTOR_RECORD_SIZE; 50 for (uint32_t i = 0; i < MAX_ACTOR_RECORD_SIZE; i++) { 51 MS_LOG(DEBUG) << "Actor message dumps:" 52 << "actor:" << id.Name().c_str() << " msg:" << msgRecords[startPoint].c_str(); 53 startPoint = (startPoint + MAX_ACTOR_RECORD_SIZE - 1) % MAX_ACTOR_RECORD_SIZE; 54 } 55 } 56 57 ActorBase(); 58 explicit ActorBase(const std::string &name); 59 explicit ActorBase(const std::string &name, ActorThreadPool *pool); 60 virtual ~ActorBase(); 61 62 // send MessageBase message to the actor. 63 int Send(const AID &to, std::unique_ptr<MessageBase> msg); 64 65 // send string message to the actor 66 int Send(const AID &to, std::string &&name, std::string &&msg, bool remoteLink = false, 67 bool isExactNotRemote = false); 68 69 // get output buffer size for flow control 70 uint64_t GetOutBufSize(const AID &to); 71 72 // get input buffer size for flow control 73 uint64_t GetInBufSize(const AID &to); 74 75 // set record send/receive message package size 76 int AddRuleUdp(const std::string &peer, int recordNum); 77 78 // delete the send/receive message package size 79 void DelRuleUdp(const std::string &peer, bool outputLog); 80 set_thread_pool(ActorThreadPool * pool)81 void set_thread_pool(ActorThreadPool *pool) { pool_ = pool; } 82 83 // Judge if actor running by the received message number, the default is true. IsActive(int msg_num)84 virtual bool IsActive(int msg_num) { return true; } 85 86 protected: 87 using ActorFunction = std::function<void(const std::unique_ptr<MessageBase> &msg)>; 88 89 // install KMSG handler . This method will be called before the actor start to run. Init()90 virtual void Init() {} 91 92 // This method will be called before the actor start to terminate. Finalize()93 virtual void Finalize() {} 94 95 // KHTTPMsg handler HandleHttp(const std::unique_ptr<MessageBase> & msg)96 virtual void HandleHttp(const std::unique_ptr<MessageBase> &msg) { 97 MS_LOG(ERROR) << "ACTOR (" << id.Name().c_str() << ") HandleHttp() is not implemented"; 98 } 99 100 // KLOCALMsg handler HandleLocalMsg(const std::unique_ptr<MessageBase> & msg)101 virtual void HandleLocalMsg(const std::unique_ptr<MessageBase> &msg) { 102 MS_LOG(ERROR) << "ACTOR (" << id.Name().c_str() << ") HandleLocalMsg() is not implemented."; 103 } 104 105 // The link is closed. Exited(const AID & actor)106 virtual void Exited(const AID &actor) { 107 MS_LOG(ERROR) << "ACTOR (" << id.Name().c_str() << ") Exited() is not implemented. "; 108 } 109 110 // Filter the KMSG Filter(const std::unique_ptr<MessageBase> & msg)111 virtual bool Filter(const std::unique_ptr<MessageBase> &msg) { return false; } 112 113 // register the message handle 114 void Receive(const std::string &msgName, ActorFunction &&func); 115 116 // register the message handle. It will be discarded. 117 template <typename T> Receive(const std::string & msgName,void (T::* method)(mindspore::AID,std::string &&,std::string &&))118 void Receive(const std::string &msgName, void (T::*method)(mindspore::AID, std::string &&, std::string &&)) { 119 ActorFunction func = std::bind(&BehaviorBase1<T>, static_cast<T *>(this), method, std::placeholders::_1); 120 Receive(msgName, std::move(func)); 121 } 122 123 // register the message handle 124 template <typename T> Receive(const std::string & msgName,void (T::* method)(const mindspore::AID &,std::string &&,std::string &&))125 void Receive(const std::string &msgName, void (T::*method)(const mindspore::AID &, std::string &&, std::string &&)) { 126 ActorFunction func = std::bind(&BehaviorBase<T>, static_cast<T *>(this), method, std::placeholders::_1); 127 Receive(msgName, std::move(func)); 128 return; 129 } 130 131 // register the message handle, for kmsg-udp message 132 template <typename T> ReceiveUdp(const std::string & msgName,void (T::* method)(const mindspore::AID &,std::string &&,std::string &&))133 void ReceiveUdp(const std::string &msgName, 134 void (T::*method)(const mindspore::AID &, std::string &&, std::string &&)) { 135 ActorFunction func = std::bind(&BehaviorBaseForUdp<T>, static_cast<T *>(this), method, std::placeholders::_1); 136 Receive(msgName, std::move(func)); 137 return; 138 } 139 140 // Link the remote actor 141 int Link(const AID &to); 142 143 // Unlink the remote actor 144 int UnLink(const AID &to); 145 146 // Reconnect to the remote actor 147 int Reconnect(const AID &to); 148 149 void Terminate(); 150 void Await(); 151 152 private: 153 friend class ActorMgr; 154 friend class ActorWorker; 155 156 // KMSG Msg Handler 157 virtual void HandlekMsg(const std::unique_ptr<MessageBase> &msg); 158 159 template <typename T> BehaviorBase(T * t,void (T::* method)(const mindspore::AID &,std::string &&,std::string &&),const std::unique_ptr<MessageBase> & msg)160 static void BehaviorBase(T *t, void (T::*method)(const mindspore::AID &, std::string &&, std::string &&), 161 const std::unique_ptr<MessageBase> &msg) { 162 MINDRT_OOM_EXIT(msg); 163 if (msg->type != MessageBase::Type::KMSG) { 164 MS_LOG(ERROR) << "Drop non-tcp message: from:" << std::string(msg->from).c_str() 165 << ",to:" << std::string(msg->to).c_str() << ",name:" << msg->name.c_str(); 166 return; 167 } 168 (t->*method)(msg->from, std::move(msg->name), std::move(msg->body)); 169 } 170 171 // register the message handle. It will be discarded. 172 template <typename T> BehaviorBase1(T * t,void (T::* method)(mindspore::AID,std::string &&,std::string &&),const std::unique_ptr<MessageBase> & msg)173 static void BehaviorBase1(T *t, void (T::*method)(mindspore::AID, std::string &&, std::string &&), 174 const std::unique_ptr<MessageBase> &msg) { 175 MINDRT_OOM_EXIT(msg); 176 if (msg->type != MessageBase::Type::KMSG) { 177 MS_LOG(ERROR) << "Drop non-tcp message: from:" << std::string(msg->from).c_str() 178 << ",to:" << std::string(msg->to).c_str() << ",name:" << msg->name.c_str(); 179 return; 180 } 181 (t->*method)(msg->from, std::move(msg->name), std::move(msg->body)); 182 } 183 184 // register the udp message handle. Use this closure function to drop non-udp messages 185 template <typename T> BehaviorBaseForUdp(T * t,void (T::* method)(const mindspore::AID &,std::string &&,std::string &&),const std::unique_ptr<MessageBase> & msg)186 static void BehaviorBaseForUdp(T *t, void (T::*method)(const mindspore::AID &, std::string &&, std::string &&), 187 const std::unique_ptr<MessageBase> &msg) { 188 MINDRT_OOM_EXIT(msg); 189 if (msg->type != MessageBase::Type::KUDP) { 190 MS_LOG(ERROR) << "Drop non-udp message: from:" << std::string(msg->from).c_str() 191 << ",to:" << std::string(msg->to).c_str() << ",name:" << msg->name.c_str(); 192 return; 193 } 194 (t->*method)(msg->from, std::move(msg->name), std::move(msg->body)); 195 } 196 197 void Run(); 198 void Quit(); 199 int EnqueMessage(std::unique_ptr<MessageBase> msg) const; 200 201 void Spawn(const std::shared_ptr<ActorBase>, std::unique_ptr<MailBox> mailbox); 202 203 std::unique_ptr<MailBox> mailbox; 204 std::atomic_bool terminating_ = false; 205 206 AID id; 207 std::map<std::string, ActorFunction> actionFunctions; 208 std::mutex waiterLock; 209 210 std::string msgRecords[MAX_ACTOR_RECORD_SIZE]; 211 uint32_t recordNextPoint = 0; 212 213 ActorThreadPool *pool_{nullptr}; 214 }; 215 216 using ActorReference = std::shared_ptr<ActorBase>; 217 }; // namespace mindspore 218 #endif 219