1 /**
2 * Copyright 2021 Huawei Technologies Co., Ltd
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "actor/actor.h"
18 #include "actor/actormgr.h"
19 #include "actor/iomgr.h"
20
21 namespace mindspore {
ActorBase()22 ActorBase::ActorBase() : mailbox(nullptr), id("", ActorMgr::GetActorMgrRef()->GetUrl()), actionFunctions() {}
23
ActorBase(const std::string & name)24 ActorBase::ActorBase(const std::string &name)
25 : mailbox(nullptr), id(name, ActorMgr::GetActorMgrRef()->GetUrl()), actionFunctions() {}
26
ActorBase(const std::string & name,ActorThreadPool * pool)27 ActorBase::ActorBase(const std::string &name, ActorThreadPool *pool)
28 : mailbox(nullptr), id(name, ActorMgr::GetActorMgrRef()->GetUrl()), actionFunctions(), pool_(pool) {}
29
~ActorBase()30 ActorBase::~ActorBase() {}
31
Spawn(const std::shared_ptr<ActorBase>,std::unique_ptr<MailBox> mailboxPtr)32 void ActorBase::Spawn(const std::shared_ptr<ActorBase>, std::unique_ptr<MailBox> mailboxPtr) {
33 // lock here or await(). and unlock at Quit() or at await.
34 waiterLock.lock();
35 this->mailbox = std::move(mailboxPtr);
36 }
37
Await()38 void ActorBase::Await() {
39 std::string actorName = id.Name();
40 // lock here or at spawn(). and unlock here or at worker(). wait for the worker to finish.
41 MS_LOG(DEBUG) << "ACTOR is waiting for terminate to finish. a=" << actorName.c_str();
42
43 waiterLock.lock();
44 waiterLock.unlock();
45
46 // mailbox's hook may hold the actor reference, we need explicitly free the mailbox to avoid the memory leak. the
47 // details can refer to the comments in ActorMgr::Spawn
48 delete mailbox.release();
49 MS_LOG(DEBUG) << "ACTOR succeeded in waiting. a=" << actorName.c_str();
50 }
Terminate()51 void ActorBase::Terminate() {
52 bool flag = false;
53 if (terminating_.compare_exchange_strong(flag, true)) {
54 std::unique_ptr<MessageBase> msg(new (std::nothrow) MessageBase("Terminate", MessageBase::Type::KTERMINATE));
55 MINDRT_OOM_EXIT(msg);
56 (void)EnqueMessage(std::move(msg));
57 }
58 }
59
HandlekMsg(const std::unique_ptr<MessageBase> & msg)60 void ActorBase::HandlekMsg(const std::unique_ptr<MessageBase> &msg) {
61 auto it = actionFunctions.find(msg->Name());
62 if (it != actionFunctions.end()) {
63 ActorFunction &func = it->second;
64 func(msg);
65 } else {
66 MS_LOG(WARNING) << "ACTOR can not find function for message, a=" << id.Name().c_str()
67 << ",m=" << msg->Name().c_str();
68 }
69 }
EnqueMessage(std::unique_ptr<MessageBase> msg) const70 int ActorBase::EnqueMessage(std::unique_ptr<MessageBase> msg) const {
71 int ret = mailbox->EnqueueMessage(std::move(msg));
72 return ret;
73 }
74
Quit()75 void ActorBase::Quit() {
76 Finalize();
77 // lock at spawn(), unlock here.
78 waiterLock.unlock();
79 }
80
Run()81 void ActorBase::Run() {
82 auto msgHandler = [this](const std::unique_ptr<MessageBase> &msg) {
83 AddMsgRecord(msg->Name());
84 switch (msg->GetType()) {
85 case MessageBase::Type::KMSG:
86 case MessageBase::Type::KUDP: {
87 if (Filter(msg)) {
88 return ERRORCODE_SUCCESS;
89 }
90 this->HandlekMsg(msg);
91 return ERRORCODE_SUCCESS;
92 }
93 case MessageBase::Type::KHTTP: {
94 this->HandleHttp(msg);
95 return ERRORCODE_SUCCESS;
96 }
97 case MessageBase::Type::KASYNC: {
98 msg->Run(this);
99 return ERRORCODE_SUCCESS;
100 }
101 case MessageBase::Type::KLOCAL: {
102 this->HandleLocalMsg(msg);
103 return ERRORCODE_SUCCESS;
104 }
105 case MessageBase::Type::KTERMINATE: {
106 this->Quit();
107 return ACTOR_TERMINATED;
108 }
109 case MessageBase::Type::KEXIT: {
110 this->Exited(msg->From());
111 return ERRORCODE_SUCCESS;
112 }
113 }
114 return ERRORCODE_SUCCESS;
115 };
116
117 if (this->mailbox->TakeAllMsgsEachTime()) {
118 while (auto msgs = mailbox->GetMsgs()) {
119 for (auto it = msgs->begin(); it != msgs->end(); ++it) {
120 std::unique_ptr<MessageBase> &msg = *it;
121 if (msg == nullptr) {
122 continue;
123 }
124 MS_LOG(DEBUG) << "dequeue message]actor=" << id.Name() << ",msg=" << msg->Name();
125 if (msgHandler(msg) == ACTOR_TERMINATED) {
126 return;
127 }
128 }
129 msgs->clear();
130 }
131 } else {
132 while (auto msg = mailbox->GetMsg()) {
133 if (msgHandler(msg) == ACTOR_TERMINATED) {
134 return;
135 }
136 }
137 }
138 return;
139 }
140
Send(const AID & to,std::unique_ptr<MessageBase> msg)141 int ActorBase::Send(const AID &to, std::unique_ptr<MessageBase> msg) {
142 msg->SetFrom(id);
143 return ActorMgr::GetActorMgrRef()->Send(to, std::move(msg));
144 }
Send(const AID & to,std::string && name,std::string && strMsg,bool remoteLink,bool isExactNotRemote)145 int ActorBase::Send(const AID &to, std::string &&name, std::string &&strMsg, bool remoteLink, bool isExactNotRemote) {
146 std::unique_ptr<MessageBase> msg(
147 new (std::nothrow) MessageBase(this->id, to, std::move(name), std::move(strMsg), MessageBase::Type::KMSG));
148 MINDRT_OOM_EXIT(msg);
149 return ActorMgr::GetActorMgrRef()->Send(to, std::move(msg), remoteLink, isExactNotRemote);
150 }
151
152 // register the message handle
Receive(const std::string & msgName,ActorFunction && func)153 void ActorBase::Receive(const std::string &msgName, ActorFunction &&func) {
154 if (actionFunctions.find(msgName) != actionFunctions.end()) {
155 MS_LOG(ERROR) << "ACTOR function's name conflicts, a=" << id.Name().c_str() << ",f=" << msgName.c_str();
156 MINDRT_EXIT("function's name conflicts");
157 return;
158 }
159 actionFunctions.emplace(msgName, std::move(func));
160 return;
161 }
162
Link(const AID & to)163 int ActorBase::Link(const AID &to) {
164 auto io = ActorMgr::GetIOMgrRef(to);
165 if (io != nullptr) {
166 if (to.OK()) {
167 io->Link(this->GetAID(), to);
168 return ERRORCODE_SUCCESS;
169 } else {
170 return ACTOR_PARAMER_ERR;
171 }
172 } else {
173 return IO_NOT_FIND;
174 }
175 }
UnLink(const AID & to)176 int ActorBase::UnLink(const AID &to) {
177 auto io = ActorMgr::GetIOMgrRef(to);
178 if (io != nullptr) {
179 if (to.OK()) {
180 io->UnLink(to);
181 return ERRORCODE_SUCCESS;
182 } else {
183 return ACTOR_PARAMER_ERR;
184 }
185 } else {
186 return IO_NOT_FIND;
187 }
188 }
189
Reconnect(const AID & to)190 int ActorBase::Reconnect(const AID &to) {
191 auto io = ActorMgr::GetIOMgrRef(to);
192 if (io != nullptr) {
193 if (to.OK()) {
194 io->Reconnect(this->GetAID(), to);
195 return ERRORCODE_SUCCESS;
196 } else {
197 return ACTOR_PARAMER_ERR;
198 }
199 } else {
200 return IO_NOT_FIND;
201 }
202 }
203
GetOutBufSize(const AID & to)204 uint64_t ActorBase::GetOutBufSize(const AID &to) {
205 auto io = ActorMgr::GetIOMgrRef(to);
206 if (io != nullptr) {
207 return io->GetOutBufSize();
208 } else {
209 return 0;
210 }
211 }
212
GetInBufSize(const AID & to)213 uint64_t ActorBase::GetInBufSize(const AID &to) {
214 auto io = ActorMgr::GetIOMgrRef(to);
215 if (io != nullptr) {
216 return io->GetInBufSize();
217 } else {
218 return 0;
219 }
220 }
221
AddRuleUdp(const std::string & peer,int recordNum)222 int ActorBase::AddRuleUdp(const std::string &peer, int recordNum) {
223 const std::string udp = MINDRT_UDP;
224 auto io = ActorMgr::GetIOMgrRef(udp);
225 if (io != nullptr) {
226 return io->AddRuleUdp(peer, recordNum);
227 } else {
228 return 0;
229 }
230 }
231
DelRuleUdp(const std::string & peer,bool outputLog)232 void ActorBase::DelRuleUdp(const std::string &peer, bool outputLog) {
233 const std::string udp = MINDRT_UDP;
234 auto io = ActorMgr::GetIOMgrRef(udp);
235 if (io != nullptr) {
236 io->DelRuleUdp(peer, outputLog);
237 }
238 }
239 } // namespace mindspore
240