• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/**
 * Copyright 2021-2023 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16 
17 #include "actor/actor.h"
18 #include "actor/actormgr.h"
19 #include "actor/iomgr.h"
20 
21 namespace mindspore {
ActorBase()22 ActorBase::ActorBase() : mailbox(nullptr), id("", ActorMgr::GetActorMgrRef()->GetUrl()), actionFunctions() {}
23 
ActorBase(const std::string & name)24 ActorBase::ActorBase(const std::string &name)
25     : mailbox(nullptr), id(name, ActorMgr::GetActorMgrRef()->GetUrl()), actionFunctions() {}
26 
ActorBase(const std::string & name,ActorThreadPool * pool)27 ActorBase::ActorBase(const std::string &name, ActorThreadPool *pool)
28     : mailbox(nullptr), id(name, ActorMgr::GetActorMgrRef()->GetUrl()), actionFunctions(), pool_(pool) {}
29 
~ActorBase()30 ActorBase::~ActorBase() {}
31 
Spawn(const std::shared_ptr<ActorBase>,std::unique_ptr<MailBox> mailboxPtr)32 void ActorBase::Spawn(const std::shared_ptr<ActorBase>, std::unique_ptr<MailBox> mailboxPtr) {
33   // lock here or await(). and unlock at Quit() or at await.
34   waiterLock.Wait();
35   this->mailbox = std::move(mailboxPtr);
36 }
37 
Await()38 void ActorBase::Await() {
39   std::string actorName = id.Name();
40   // lock here or at spawn(). and unlock here or at worker(). wait for the worker to finish.
41   MS_LOG(DEBUG) << "ACTOR is waiting for terminate to finish. a=" << actorName.c_str();
42   waiterLock.Wait();
43   waiterLock.Signal();
44 
45   // mailbox's hook may hold the actor reference, we need explicitly free the mailbox to avoid the memory leak. the
46   // details can refer to the comments in ActorMgr::Spawn
47   delete mailbox.release();
48   MS_LOG(DEBUG) << "ACTOR succeeded in waiting. a=" << actorName.c_str();
49 }
Terminate()50 void ActorBase::Terminate() {
51   bool flag = false;
52   if (terminating_.compare_exchange_strong(flag, true)) {
53     std::unique_ptr<MessageBase> msg(new (std::nothrow) MessageBase("Terminate", MessageBase::Type::KTERMINATE));
54     MINDRT_OOM_EXIT(msg);
55     (void)EnqueMessage(std::move(msg));
56   }
57 }
58 
HandlekMsg(const std::unique_ptr<MessageBase> & msg)59 void ActorBase::HandlekMsg(const std::unique_ptr<MessageBase> &msg) {
60   auto it = actionFunctions.find(msg->Name());
61   if (it != actionFunctions.end()) {
62     ActorFunction &func = it->second;
63     func(msg);
64   } else {
65     MS_LOG(WARNING) << "ACTOR can not find function for message, a=" << id.Name().c_str()
66                     << ",m=" << msg->Name().c_str();
67   }
68 }
EnqueMessage(std::unique_ptr<MessageBase> msg) const69 int ActorBase::EnqueMessage(std::unique_ptr<MessageBase> msg) const {
70   int ret = mailbox->EnqueueMessage(std::move(msg));
71   return ret;
72 }
73 
Quit()74 void ActorBase::Quit() {
75   Finalize();
76   // lock at spawn(), unlock here.
77   waiterLock.Signal();
78 }
79 
Run()80 void ActorBase::Run() {
81   auto msgHandler = [this](const std::unique_ptr<MessageBase> &msg) {
82     switch (msg->GetType()) {
83       case MessageBase::Type::KMSG:
84       case MessageBase::Type::KUDP: {
85         if (Filter(msg)) {
86           return ERRORCODE_SUCCESS;
87         }
88         this->HandlekMsg(msg);
89         return ERRORCODE_SUCCESS;
90       }
91       case MessageBase::Type::KHTTP: {
92         this->HandleHttp(msg);
93         return ERRORCODE_SUCCESS;
94       }
95       case MessageBase::Type::KASYNC: {
96         msg->Run(this);
97         return ERRORCODE_SUCCESS;
98       }
99       case MessageBase::Type::KLOCAL: {
100         this->HandleLocalMsg(msg);
101         return ERRORCODE_SUCCESS;
102       }
103       case MessageBase::Type::KTERMINATE: {
104         this->Quit();
105         return ACTOR_TERMINATED;
106       }
107       case MessageBase::Type::KEXIT: {
108         this->Exited(msg->From());
109         return ERRORCODE_SUCCESS;
110       }
111     }
112     return ERRORCODE_SUCCESS;
113   };
114 
115   if (this->mailbox->TakeAllMsgsEachTime()) {
116     while (auto msgs = mailbox->GetMsgs()) {
117       for (auto it = msgs->begin(); it != msgs->end(); ++it) {
118         std::unique_ptr<MessageBase> &msg = *it;
119         if (msg == nullptr) {
120           continue;
121         }
122         if (msgHandler(msg) == ACTOR_TERMINATED) {
123           return;
124         }
125         msg.reset(nullptr);
126       }
127       msgs->clear();
128     }
129   } else {
130     while (auto msg = mailbox->GetMsg()) {
131       if (msgHandler(msg) == ACTOR_TERMINATED) {
132         return;
133       }
134     }
135   }
136   return;
137 }
138 
Send(const AID & to,std::unique_ptr<MessageBase> msg)139 int ActorBase::Send(const AID &to, std::unique_ptr<MessageBase> msg) {
140   msg->SetFrom(id);
141   return ActorMgr::GetActorMgrRef()->Send(to, std::move(msg));
142 }
Send(const AID & to,std::string && name,std::string && strMsg,bool remoteLink,bool isExactNotRemote)143 int ActorBase::Send(const AID &to, std::string &&name, std::string &&strMsg, bool remoteLink, bool isExactNotRemote) {
144   std::unique_ptr<MessageBase> msg(
145     new (std::nothrow) MessageBase(this->id, to, std::move(name), std::move(strMsg), MessageBase::Type::KMSG));
146   MINDRT_OOM_EXIT(msg);
147   return ActorMgr::GetActorMgrRef()->Send(to, std::move(msg), remoteLink, isExactNotRemote);
148 }
149 
150 // register the message handle
Receive(const std::string & msgName,ActorFunction && func)151 void ActorBase::Receive(const std::string &msgName, ActorFunction &&func) {
152   if (actionFunctions.find(msgName) != actionFunctions.end()) {
153     MS_LOG(ERROR) << "ACTOR function's name conflicts, a=" << id.Name().c_str() << ",f=" << msgName.c_str();
154     MINDRT_EXIT("function's name conflicts");
155     return;
156   }
157   actionFunctions.emplace(msgName, std::move(func));
158   return;
159 }
160 
Link(const AID & to)161 int ActorBase::Link(const AID &to) {
162   auto io = ActorMgr::GetIOMgrRef(to);
163   if (io != nullptr) {
164     if (to.OK()) {
165       io->Link(this->GetAID(), to);
166       return ERRORCODE_SUCCESS;
167     } else {
168       return ACTOR_PARAMER_ERR;
169     }
170   } else {
171     return IO_NOT_FIND;
172   }
173 }
UnLink(const AID & to)174 int ActorBase::UnLink(const AID &to) {
175   auto io = ActorMgr::GetIOMgrRef(to);
176   if (io != nullptr) {
177     if (to.OK()) {
178       io->UnLink(to);
179       return ERRORCODE_SUCCESS;
180     } else {
181       return ACTOR_PARAMER_ERR;
182     }
183   } else {
184     return IO_NOT_FIND;
185   }
186 }
187 
Reconnect(const AID & to)188 int ActorBase::Reconnect(const AID &to) {
189   auto io = ActorMgr::GetIOMgrRef(to);
190   if (io != nullptr) {
191     if (to.OK()) {
192       io->Reconnect(this->GetAID(), to);
193       return ERRORCODE_SUCCESS;
194     } else {
195       return ACTOR_PARAMER_ERR;
196     }
197   } else {
198     return IO_NOT_FIND;
199   }
200 }
201 
GetOutBufSize(const AID & to)202 uint64_t ActorBase::GetOutBufSize(const AID &to) {
203   auto io = ActorMgr::GetIOMgrRef(to);
204   if (io != nullptr) {
205     return io->GetOutBufSize();
206   } else {
207     return 0;
208   }
209 }
210 
GetInBufSize(const AID & to)211 uint64_t ActorBase::GetInBufSize(const AID &to) {
212   auto io = ActorMgr::GetIOMgrRef(to);
213   if (io != nullptr) {
214     return io->GetInBufSize();
215   } else {
216     return 0;
217   }
218 }
219 
AddRuleUdp(const std::string & peer,int recordNum)220 int ActorBase::AddRuleUdp(const std::string &peer, int recordNum) {
221   const std::string udp = MINDRT_UDP;
222   auto io = ActorMgr::GetIOMgrRef(udp);
223   if (io != nullptr) {
224     return io->AddRuleUdp(peer, recordNum);
225   } else {
226     return 0;
227   }
228 }
229 
DelRuleUdp(const std::string & peer,bool outputLog)230 void ActorBase::DelRuleUdp(const std::string &peer, bool outputLog) {
231   const std::string udp = MINDRT_UDP;
232   auto io = ActorMgr::GetIOMgrRef(udp);
233   if (io != nullptr) {
234     io->DelRuleUdp(peer, outputLog);
235   }
236 }
237 }  // namespace mindspore
238