1 /**
2 * Copyright 2021-2023 Huawei Technologies Co., Ltd
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "actor/actor.h"
18 #include "actor/actormgr.h"
19 #include "actor/iomgr.h"
20
21 namespace mindspore {
ActorBase()22 ActorBase::ActorBase() : mailbox(nullptr), id("", ActorMgr::GetActorMgrRef()->GetUrl()), actionFunctions() {}
23
ActorBase(const std::string & name)24 ActorBase::ActorBase(const std::string &name)
25 : mailbox(nullptr), id(name, ActorMgr::GetActorMgrRef()->GetUrl()), actionFunctions() {}
26
ActorBase(const std::string & name,ActorThreadPool * pool)27 ActorBase::ActorBase(const std::string &name, ActorThreadPool *pool)
28 : mailbox(nullptr), id(name, ActorMgr::GetActorMgrRef()->GetUrl()), actionFunctions(), pool_(pool) {}
29
~ActorBase()30 ActorBase::~ActorBase() {}
31
Spawn(const std::shared_ptr<ActorBase>,std::unique_ptr<MailBox> mailboxPtr)32 void ActorBase::Spawn(const std::shared_ptr<ActorBase>, std::unique_ptr<MailBox> mailboxPtr) {
33 // lock here or await(). and unlock at Quit() or at await.
34 waiterLock.Wait();
35 this->mailbox = std::move(mailboxPtr);
36 }
37
Await()38 void ActorBase::Await() {
39 std::string actorName = id.Name();
40 // lock here or at spawn(). and unlock here or at worker(). wait for the worker to finish.
41 MS_LOG(DEBUG) << "ACTOR is waiting for terminate to finish. a=" << actorName.c_str();
42 waiterLock.Wait();
43 waiterLock.Signal();
44
45 // mailbox's hook may hold the actor reference, we need explicitly free the mailbox to avoid the memory leak. the
46 // details can refer to the comments in ActorMgr::Spawn
47 delete mailbox.release();
48 MS_LOG(DEBUG) << "ACTOR succeeded in waiting. a=" << actorName.c_str();
49 }
Terminate()50 void ActorBase::Terminate() {
51 bool flag = false;
52 if (terminating_.compare_exchange_strong(flag, true)) {
53 std::unique_ptr<MessageBase> msg(new (std::nothrow) MessageBase("Terminate", MessageBase::Type::KTERMINATE));
54 MINDRT_OOM_EXIT(msg);
55 (void)EnqueMessage(std::move(msg));
56 }
57 }
58
HandlekMsg(const std::unique_ptr<MessageBase> & msg)59 void ActorBase::HandlekMsg(const std::unique_ptr<MessageBase> &msg) {
60 auto it = actionFunctions.find(msg->Name());
61 if (it != actionFunctions.end()) {
62 ActorFunction &func = it->second;
63 func(msg);
64 } else {
65 MS_LOG(WARNING) << "ACTOR can not find function for message, a=" << id.Name().c_str()
66 << ",m=" << msg->Name().c_str();
67 }
68 }
EnqueMessage(std::unique_ptr<MessageBase> msg) const69 int ActorBase::EnqueMessage(std::unique_ptr<MessageBase> msg) const {
70 int ret = mailbox->EnqueueMessage(std::move(msg));
71 return ret;
72 }
73
Quit()74 void ActorBase::Quit() {
75 Finalize();
76 // lock at spawn(), unlock here.
77 waiterLock.Signal();
78 }
79
Run()80 void ActorBase::Run() {
81 auto msgHandler = [this](const std::unique_ptr<MessageBase> &msg) {
82 switch (msg->GetType()) {
83 case MessageBase::Type::KMSG:
84 case MessageBase::Type::KUDP: {
85 if (Filter(msg)) {
86 return ERRORCODE_SUCCESS;
87 }
88 this->HandlekMsg(msg);
89 return ERRORCODE_SUCCESS;
90 }
91 case MessageBase::Type::KHTTP: {
92 this->HandleHttp(msg);
93 return ERRORCODE_SUCCESS;
94 }
95 case MessageBase::Type::KASYNC: {
96 msg->Run(this);
97 return ERRORCODE_SUCCESS;
98 }
99 case MessageBase::Type::KLOCAL: {
100 this->HandleLocalMsg(msg);
101 return ERRORCODE_SUCCESS;
102 }
103 case MessageBase::Type::KTERMINATE: {
104 this->Quit();
105 return ACTOR_TERMINATED;
106 }
107 case MessageBase::Type::KEXIT: {
108 this->Exited(msg->From());
109 return ERRORCODE_SUCCESS;
110 }
111 }
112 return ERRORCODE_SUCCESS;
113 };
114
115 if (this->mailbox->TakeAllMsgsEachTime()) {
116 while (auto msgs = mailbox->GetMsgs()) {
117 for (auto it = msgs->begin(); it != msgs->end(); ++it) {
118 std::unique_ptr<MessageBase> &msg = *it;
119 if (msg == nullptr) {
120 continue;
121 }
122 if (msgHandler(msg) == ACTOR_TERMINATED) {
123 return;
124 }
125 msg.reset(nullptr);
126 }
127 msgs->clear();
128 }
129 } else {
130 while (auto msg = mailbox->GetMsg()) {
131 if (msgHandler(msg) == ACTOR_TERMINATED) {
132 return;
133 }
134 }
135 }
136 return;
137 }
138
Send(const AID & to,std::unique_ptr<MessageBase> msg)139 int ActorBase::Send(const AID &to, std::unique_ptr<MessageBase> msg) {
140 msg->SetFrom(id);
141 return ActorMgr::GetActorMgrRef()->Send(to, std::move(msg));
142 }
Send(const AID & to,std::string && name,std::string && strMsg,bool remoteLink,bool isExactNotRemote)143 int ActorBase::Send(const AID &to, std::string &&name, std::string &&strMsg, bool remoteLink, bool isExactNotRemote) {
144 std::unique_ptr<MessageBase> msg(
145 new (std::nothrow) MessageBase(this->id, to, std::move(name), std::move(strMsg), MessageBase::Type::KMSG));
146 MINDRT_OOM_EXIT(msg);
147 return ActorMgr::GetActorMgrRef()->Send(to, std::move(msg), remoteLink, isExactNotRemote);
148 }
149
150 // register the message handle
Receive(const std::string & msgName,ActorFunction && func)151 void ActorBase::Receive(const std::string &msgName, ActorFunction &&func) {
152 if (actionFunctions.find(msgName) != actionFunctions.end()) {
153 MS_LOG(ERROR) << "ACTOR function's name conflicts, a=" << id.Name().c_str() << ",f=" << msgName.c_str();
154 MINDRT_EXIT("function's name conflicts");
155 return;
156 }
157 actionFunctions.emplace(msgName, std::move(func));
158 return;
159 }
160
Link(const AID & to)161 int ActorBase::Link(const AID &to) {
162 auto io = ActorMgr::GetIOMgrRef(to);
163 if (io != nullptr) {
164 if (to.OK()) {
165 io->Link(this->GetAID(), to);
166 return ERRORCODE_SUCCESS;
167 } else {
168 return ACTOR_PARAMER_ERR;
169 }
170 } else {
171 return IO_NOT_FIND;
172 }
173 }
UnLink(const AID & to)174 int ActorBase::UnLink(const AID &to) {
175 auto io = ActorMgr::GetIOMgrRef(to);
176 if (io != nullptr) {
177 if (to.OK()) {
178 io->UnLink(to);
179 return ERRORCODE_SUCCESS;
180 } else {
181 return ACTOR_PARAMER_ERR;
182 }
183 } else {
184 return IO_NOT_FIND;
185 }
186 }
187
Reconnect(const AID & to)188 int ActorBase::Reconnect(const AID &to) {
189 auto io = ActorMgr::GetIOMgrRef(to);
190 if (io != nullptr) {
191 if (to.OK()) {
192 io->Reconnect(this->GetAID(), to);
193 return ERRORCODE_SUCCESS;
194 } else {
195 return ACTOR_PARAMER_ERR;
196 }
197 } else {
198 return IO_NOT_FIND;
199 }
200 }
201
GetOutBufSize(const AID & to)202 uint64_t ActorBase::GetOutBufSize(const AID &to) {
203 auto io = ActorMgr::GetIOMgrRef(to);
204 if (io != nullptr) {
205 return io->GetOutBufSize();
206 } else {
207 return 0;
208 }
209 }
210
GetInBufSize(const AID & to)211 uint64_t ActorBase::GetInBufSize(const AID &to) {
212 auto io = ActorMgr::GetIOMgrRef(to);
213 if (io != nullptr) {
214 return io->GetInBufSize();
215 } else {
216 return 0;
217 }
218 }
219
AddRuleUdp(const std::string & peer,int recordNum)220 int ActorBase::AddRuleUdp(const std::string &peer, int recordNum) {
221 const std::string udp = MINDRT_UDP;
222 auto io = ActorMgr::GetIOMgrRef(udp);
223 if (io != nullptr) {
224 return io->AddRuleUdp(peer, recordNum);
225 } else {
226 return 0;
227 }
228 }
229
DelRuleUdp(const std::string & peer,bool outputLog)230 void ActorBase::DelRuleUdp(const std::string &peer, bool outputLog) {
231 const std::string udp = MINDRT_UDP;
232 auto io = ActorMgr::GetIOMgrRef(udp);
233 if (io != nullptr) {
234 io->DelRuleUdp(peer, outputLog);
235 }
236 }
237 } // namespace mindspore
238