• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2021 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "actor/actor.h"
18 #include "actor/actormgr.h"
19 #include "actor/iomgr.h"
20 
21 namespace mindspore {
ActorBase()22 ActorBase::ActorBase() : mailbox(nullptr), id("", ActorMgr::GetActorMgrRef()->GetUrl()), actionFunctions() {}
23 
ActorBase(const std::string & name)24 ActorBase::ActorBase(const std::string &name)
25     : mailbox(nullptr), id(name, ActorMgr::GetActorMgrRef()->GetUrl()), actionFunctions() {}
26 
ActorBase(const std::string & name,ActorThreadPool * pool)27 ActorBase::ActorBase(const std::string &name, ActorThreadPool *pool)
28     : mailbox(nullptr), id(name, ActorMgr::GetActorMgrRef()->GetUrl()), actionFunctions(), pool_(pool) {}
29 
~ActorBase()30 ActorBase::~ActorBase() {}
31 
Spawn(const std::shared_ptr<ActorBase>,std::unique_ptr<MailBox> mailboxPtr)32 void ActorBase::Spawn(const std::shared_ptr<ActorBase>, std::unique_ptr<MailBox> mailboxPtr) {
33   // lock here or await(). and unlock at Quit() or at await.
34   waiterLock.lock();
35   this->mailbox = std::move(mailboxPtr);
36 }
37 
Await()38 void ActorBase::Await() {
39   std::string actorName = id.Name();
40   // lock here or at spawn(). and unlock here or at worker(). wait for the worker to finish.
41   MS_LOG(DEBUG) << "ACTOR is waiting for terminate to finish. a=" << actorName.c_str();
42 
43   waiterLock.lock();
44   waiterLock.unlock();
45 
46   // mailbox's hook may hold the actor reference, we need explicitly free the mailbox to avoid the memory leak. the
47   // details can refer to the comments in ActorMgr::Spawn
48   delete mailbox.release();
49   MS_LOG(DEBUG) << "ACTOR succeeded in waiting. a=" << actorName.c_str();
50 }
Terminate()51 void ActorBase::Terminate() {
52   bool flag = false;
53   if (terminating_.compare_exchange_strong(flag, true)) {
54     std::unique_ptr<MessageBase> msg(new (std::nothrow) MessageBase("Terminate", MessageBase::Type::KTERMINATE));
55     MINDRT_OOM_EXIT(msg);
56     (void)EnqueMessage(std::move(msg));
57   }
58 }
59 
HandlekMsg(const std::unique_ptr<MessageBase> & msg)60 void ActorBase::HandlekMsg(const std::unique_ptr<MessageBase> &msg) {
61   auto it = actionFunctions.find(msg->Name());
62   if (it != actionFunctions.end()) {
63     ActorFunction &func = it->second;
64     func(msg);
65   } else {
66     MS_LOG(WARNING) << "ACTOR can not find function for message, a=" << id.Name().c_str()
67                     << ",m=" << msg->Name().c_str();
68   }
69 }
EnqueMessage(std::unique_ptr<MessageBase> msg) const70 int ActorBase::EnqueMessage(std::unique_ptr<MessageBase> msg) const {
71   int ret = mailbox->EnqueueMessage(std::move(msg));
72   return ret;
73 }
74 
Quit()75 void ActorBase::Quit() {
76   Finalize();
77   // lock at spawn(), unlock here.
78   waiterLock.unlock();
79 }
80 
Run()81 void ActorBase::Run() {
82   auto msgHandler = [this](const std::unique_ptr<MessageBase> &msg) {
83     AddMsgRecord(msg->Name());
84     switch (msg->GetType()) {
85       case MessageBase::Type::KMSG:
86       case MessageBase::Type::KUDP: {
87         if (Filter(msg)) {
88           return ERRORCODE_SUCCESS;
89         }
90         this->HandlekMsg(msg);
91         return ERRORCODE_SUCCESS;
92       }
93       case MessageBase::Type::KHTTP: {
94         this->HandleHttp(msg);
95         return ERRORCODE_SUCCESS;
96       }
97       case MessageBase::Type::KASYNC: {
98         msg->Run(this);
99         return ERRORCODE_SUCCESS;
100       }
101       case MessageBase::Type::KLOCAL: {
102         this->HandleLocalMsg(msg);
103         return ERRORCODE_SUCCESS;
104       }
105       case MessageBase::Type::KTERMINATE: {
106         this->Quit();
107         return ACTOR_TERMINATED;
108       }
109       case MessageBase::Type::KEXIT: {
110         this->Exited(msg->From());
111         return ERRORCODE_SUCCESS;
112       }
113     }
114     return ERRORCODE_SUCCESS;
115   };
116 
117   if (this->mailbox->TakeAllMsgsEachTime()) {
118     while (auto msgs = mailbox->GetMsgs()) {
119       for (auto it = msgs->begin(); it != msgs->end(); ++it) {
120         std::unique_ptr<MessageBase> &msg = *it;
121         if (msg == nullptr) {
122           continue;
123         }
124         MS_LOG(DEBUG) << "dequeue message]actor=" << id.Name() << ",msg=" << msg->Name();
125         if (msgHandler(msg) == ACTOR_TERMINATED) {
126           return;
127         }
128       }
129       msgs->clear();
130     }
131   } else {
132     while (auto msg = mailbox->GetMsg()) {
133       if (msgHandler(msg) == ACTOR_TERMINATED) {
134         return;
135       }
136     }
137   }
138   return;
139 }
140 
Send(const AID & to,std::unique_ptr<MessageBase> msg)141 int ActorBase::Send(const AID &to, std::unique_ptr<MessageBase> msg) {
142   msg->SetFrom(id);
143   return ActorMgr::GetActorMgrRef()->Send(to, std::move(msg));
144 }
Send(const AID & to,std::string && name,std::string && strMsg,bool remoteLink,bool isExactNotRemote)145 int ActorBase::Send(const AID &to, std::string &&name, std::string &&strMsg, bool remoteLink, bool isExactNotRemote) {
146   std::unique_ptr<MessageBase> msg(
147     new (std::nothrow) MessageBase(this->id, to, std::move(name), std::move(strMsg), MessageBase::Type::KMSG));
148   MINDRT_OOM_EXIT(msg);
149   return ActorMgr::GetActorMgrRef()->Send(to, std::move(msg), remoteLink, isExactNotRemote);
150 }
151 
152 // register the message handle
Receive(const std::string & msgName,ActorFunction && func)153 void ActorBase::Receive(const std::string &msgName, ActorFunction &&func) {
154   if (actionFunctions.find(msgName) != actionFunctions.end()) {
155     MS_LOG(ERROR) << "ACTOR function's name conflicts, a=" << id.Name().c_str() << ",f=" << msgName.c_str();
156     MINDRT_EXIT("function's name conflicts");
157     return;
158   }
159   actionFunctions.emplace(msgName, std::move(func));
160   return;
161 }
162 
Link(const AID & to)163 int ActorBase::Link(const AID &to) {
164   auto io = ActorMgr::GetIOMgrRef(to);
165   if (io != nullptr) {
166     if (to.OK()) {
167       io->Link(this->GetAID(), to);
168       return ERRORCODE_SUCCESS;
169     } else {
170       return ACTOR_PARAMER_ERR;
171     }
172   } else {
173     return IO_NOT_FIND;
174   }
175 }
UnLink(const AID & to)176 int ActorBase::UnLink(const AID &to) {
177   auto io = ActorMgr::GetIOMgrRef(to);
178   if (io != nullptr) {
179     if (to.OK()) {
180       io->UnLink(to);
181       return ERRORCODE_SUCCESS;
182     } else {
183       return ACTOR_PARAMER_ERR;
184     }
185   } else {
186     return IO_NOT_FIND;
187   }
188 }
189 
Reconnect(const AID & to)190 int ActorBase::Reconnect(const AID &to) {
191   auto io = ActorMgr::GetIOMgrRef(to);
192   if (io != nullptr) {
193     if (to.OK()) {
194       io->Reconnect(this->GetAID(), to);
195       return ERRORCODE_SUCCESS;
196     } else {
197       return ACTOR_PARAMER_ERR;
198     }
199   } else {
200     return IO_NOT_FIND;
201   }
202 }
203 
GetOutBufSize(const AID & to)204 uint64_t ActorBase::GetOutBufSize(const AID &to) {
205   auto io = ActorMgr::GetIOMgrRef(to);
206   if (io != nullptr) {
207     return io->GetOutBufSize();
208   } else {
209     return 0;
210   }
211 }
212 
GetInBufSize(const AID & to)213 uint64_t ActorBase::GetInBufSize(const AID &to) {
214   auto io = ActorMgr::GetIOMgrRef(to);
215   if (io != nullptr) {
216     return io->GetInBufSize();
217   } else {
218     return 0;
219   }
220 }
221 
AddRuleUdp(const std::string & peer,int recordNum)222 int ActorBase::AddRuleUdp(const std::string &peer, int recordNum) {
223   const std::string udp = MINDRT_UDP;
224   auto io = ActorMgr::GetIOMgrRef(udp);
225   if (io != nullptr) {
226     return io->AddRuleUdp(peer, recordNum);
227   } else {
228     return 0;
229   }
230 }
231 
DelRuleUdp(const std::string & peer,bool outputLog)232 void ActorBase::DelRuleUdp(const std::string &peer, bool outputLog) {
233   const std::string udp = MINDRT_UDP;
234   auto io = ActorMgr::GetIOMgrRef(udp);
235   if (io != nullptr) {
236     io->DelRuleUdp(peer, outputLog);
237   }
238 }
239 }  // namespace mindspore
240