1 /**
2 * Copyright 2021 Huawei Technologies Co., Ltd
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
#include <list>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <utility>
21
22 #include "actor/actormgr.h"
23 #include "actor/iomgr.h"
24
25 namespace mindspore {
26 ActorMgr ActorMgr::actorMgr;
27 std::map<std::string, std::shared_ptr<IOMgr>> ActorMgr::ioMgrs;
28
GetIOMgrRef(const std::string & protocol)29 std::shared_ptr<IOMgr> &ActorMgr::GetIOMgrRef(const std::string &protocol) {
30 auto it = ioMgrs.find(protocol);
31 if (it != ioMgrs.end()) {
32 return it->second;
33 } else {
34 MS_LOG(DEBUG) << "Can't find IOMgr of protocol " << protocol.c_str();
35 static std::shared_ptr<IOMgr> nullIOMgr;
36 return nullIOMgr;
37 }
38 }
ActorMgr()39 ActorMgr::ActorMgr() : actors(), procotols(), urls() {
40 actors.clear();
41 procotols.clear();
42 urls.clear();
43 }
44
~ActorMgr()45 ActorMgr::~ActorMgr() {
46 if (inner_pool_ != nullptr) {
47 delete inner_pool_;
48 inner_pool_ = nullptr;
49 }
50 }
51
Initialize(bool use_inner_pool,size_t actor_thread_num,size_t max_thread_num)52 int ActorMgr::Initialize(bool use_inner_pool, size_t actor_thread_num, size_t max_thread_num) {
53 bool expected = false;
54 if (!initialized_.compare_exchange_strong(expected, true)) {
55 MS_LOG(DEBUG) << "Actor Manager has been initialized before";
56 return MINDRT_OK;
57 }
58 // create inner thread pool only when specified use_inner_pool
59 if (use_inner_pool) {
60 if (max_thread_num <= actor_thread_num) {
61 inner_pool_ = ActorThreadPool::CreateThreadPool(actor_thread_num);
62 if (inner_pool_ == nullptr) {
63 MS_LOG(ERROR) << "ActorMgr CreateThreadPool failed";
64 return MINDRT_ERROR;
65 }
66 } else {
67 inner_pool_ = ActorThreadPool::CreateThreadPool(actor_thread_num, max_thread_num, {});
68 if (inner_pool_ == nullptr) {
69 MS_LOG(ERROR) << "ActorMgr CreateThreadPool failed";
70 return MINDRT_ERROR;
71 }
72 inner_pool_->SetActorThreadNum(actor_thread_num);
73 inner_pool_->DisableOccupiedActorThread();
74 inner_pool_->SetKernelThreadNum(max_thread_num - actor_thread_num);
75 }
76 if (inner_pool_ != nullptr) {
77 inner_pool_->SetMaxSpinCount(kDefaultSpinCount);
78 inner_pool_->SetSpinCountMaxValue();
79 }
80 }
81 return MINDRT_OK;
82 }
83
SetActorReady(const ActorReference & actor) const84 void ActorMgr::SetActorReady(const ActorReference &actor) const {
85 // use inner thread pool or actor thread pool created externally
86 // priority to use actor thread pool
87 MINDRT_OOM_EXIT(actor);
88 ActorThreadPool *pool = actor->pool_ ? actor->pool_ : inner_pool_;
89 if (pool == nullptr) {
90 MS_LOG(ERROR) << "ThreadPool is nullptr, " << actor->pool_ << ", " << inner_pool_
91 << ", actor: " << actor->GetAID().Name();
92 return;
93 }
94 pool->PushActorToQueue(actor.get());
95 }
96
GetUrl(const std::string & protocol)97 const std::string ActorMgr::GetUrl(const std::string &protocol) {
98 auto it = procotols.find(protocol);
99 if (it != procotols.end()) {
100 return it->second;
101 } else if (procotols.size() > 0) {
102 return procotols.begin()->second;
103 } else {
104 return "";
105 }
106 }
107
AddUrl(const std::string & protocol,const std::string & url)108 void ActorMgr::AddUrl(const std::string &protocol, const std::string &url) {
109 procotols[protocol] = url;
110 AID id("a@" + url);
111 (void)urls.insert(id.GetIp() + ":" + std::to_string(id.GetPort()));
112 (void)urls.insert(id.GetProtocol() + "://" + id.GetIp() + ":" + std::to_string(id.GetPort()));
113 (void)urls.insert(std::string("127.0.0.1:") + std::to_string(id.GetPort()));
114 (void)urls.insert(protocol + "://127.0.0.1:" + std::to_string(id.GetPort()));
115 }
116
AddIOMgr(const std::string & protocol,const std::shared_ptr<IOMgr> & ioMgr)117 void ActorMgr::AddIOMgr(const std::string &protocol, const std::shared_ptr<IOMgr> &ioMgr) { ioMgrs[protocol] = ioMgr; }
118
RemoveActor(const std::string & name)119 void ActorMgr::RemoveActor(const std::string &name) {
120 actorsMutex.lock();
121 (void)actors.erase(name);
122 actorsMutex.unlock();
123 }
124
TerminateAll()125 void ActorMgr::TerminateAll() {
126 // copy all the actors
127 std::list<ActorReference> actorsWaiting;
128 actorsMutex.lock();
129 for (auto actorIt = actors.begin(); actorIt != actors.end(); ++actorIt) {
130 actorsWaiting.push_back(actorIt->second);
131 }
132 actorsMutex.unlock();
133
134 // send terminal msg to all actors.
135 for (auto actorIt = actorsWaiting.begin(); actorIt != actorsWaiting.end(); ++actorIt) {
136 (*actorIt)->Terminate();
137 }
138
139 // wait actor's thread to finish.
140 for (auto actorIt = actorsWaiting.begin(); actorIt != actorsWaiting.end(); ++actorIt) {
141 (*actorIt)->Await();
142 }
143 }
144
Finalize()145 void ActorMgr::Finalize() {
146 this->TerminateAll();
147 MS_LOG(INFO) << "mindrt Actors finish exiting.";
148
149 // stop all actor threads;
150 MS_LOG(INFO) << "mindrt Threads finish exiting.";
151
152 // stop iomgr thread
153 for (auto mgrIt = ioMgrs.begin(); mgrIt != ioMgrs.end(); ++mgrIt) {
154 MS_LOG(INFO) << "finalize IOMgr=" << mgrIt->first.c_str();
155 mgrIt->second->Finish();
156 }
157
158 // delete actor thread pool if use_inner_pool
159 delete inner_pool_;
160 inner_pool_ = nullptr;
161 MS_LOG(INFO) << "mindrt IOMGRS finish exiting.";
162 }
163
GetActor(const AID & id)164 ActorReference ActorMgr::GetActor(const AID &id) {
165 #ifndef MS_COMPILE_IOS
166 actorsMutex.lock_shared();
167 #else
168 actorsMutex.lock();
169 #endif
170 const auto &actorIt = actors.find(id.Name());
171 if (actorIt != actors.end()) {
172 auto &result = actorIt->second;
173 #ifndef MS_COMPILE_IOS
174 actorsMutex.unlock_shared();
175 #else
176 actorsMutex.unlock();
177 #endif
178 return result;
179 } else {
180 #ifndef MS_COMPILE_IOS
181 actorsMutex.unlock_shared();
182 #else
183 actorsMutex.unlock();
184 #endif
185 MS_LOG(DEBUG) << "can't find ACTOR with name=" << id.Name().c_str();
186 return nullptr;
187 }
188 }
189
EnqueueMessage(const mindspore::ActorReference actor,std::unique_ptr<mindspore::MessageBase> msg)190 int ActorMgr::EnqueueMessage(const mindspore::ActorReference actor, std::unique_ptr<mindspore::MessageBase> msg) {
191 return actor->EnqueMessage(std::move(msg));
192 }
193
Send(const AID & to,std::unique_ptr<MessageBase> msg,bool remoteLink,bool isExactNotRemote)194 int ActorMgr::Send(const AID &to, std::unique_ptr<MessageBase> msg, bool remoteLink, bool isExactNotRemote) {
195 // The destination is local
196 if (IsLocalAddres(to)) {
197 auto actor = GetActor(to);
198 if (actor != nullptr) {
199 if (to.GetProtocol() == MINDRT_UDP && msg->GetType() == MessageBase::Type::KMSG) {
200 msg->type = MessageBase::Type::KUDP;
201 }
202 return EnqueueMessage(actor, std::move(msg));
203 } else {
204 return ACTOR_NOT_FIND;
205 }
206 } else {
207 // send to remote actor
208 if (msg->GetType() != MessageBase::Type::KMSG) {
209 MS_LOG(ERROR) << "The msg is not KMSG,it can't send to remote=" << std::string(to).c_str();
210 return ACTOR_PARAMER_ERR;
211 } else {
212 // null
213 }
214 msg->SetTo(to);
215 auto &io = ActorMgr::GetIOMgrRef(to);
216 if (io != nullptr) {
217 return io->Send(std::move(msg), remoteLink, isExactNotRemote);
218 } else {
219 MS_LOG(ERROR) << "The protocol is not supported:"
220 << "p=" << to.GetProtocol().c_str() << ",f=" << msg->From().Name().c_str()
221 << ",t=" << to.Name().c_str() << ",m=" << msg->Name().c_str();
222 return IO_NOT_FIND;
223 }
224 }
225 }
226
Spawn(const ActorReference & actor,bool shareThread)227 AID ActorMgr::Spawn(const ActorReference &actor, bool shareThread) {
228 actorsMutex.lock();
229 if (actors.find(actor->GetAID().Name()) != actors.end()) {
230 actorsMutex.unlock();
231 MS_LOG(ERROR) << "The actor's name conflicts,name:" << actor->GetAID().Name().c_str();
232 MINDRT_EXIT("Actor name conflicts.");
233 }
234 MS_LOG(DEBUG) << "ACTOR was spawned,a=" << actor->GetAID().Name().c_str();
235
236 if (shareThread) {
237 auto mailbox = std::unique_ptr<MailBox>(new (std::nothrow) NonblockingMailBox());
238 auto hook = std::unique_ptr<std::function<void()>>(
239 new std::function<void()>([actor]() { ActorMgr::GetActorMgrRef()->SetActorReady(actor); }));
240 // the mailbox has this hook, the hook holds the actor reference, the actor has the mailbox. this is a cycle which
241 // will leads to memory leak. in order to fix this issue, we should explicitly free the mailbox when terminate the
242 // actor
243 mailbox->SetNotifyHook(std::move(hook));
244 actor->Spawn(actor, std::move(mailbox));
245
246 } else {
247 auto mailbox = std::unique_ptr<MailBox>(new (std::nothrow) BlockingMailBox());
248 actor->Spawn(actor, std::move(mailbox));
249 ActorMgr::GetActorMgrRef()->SetActorReady(actor);
250 }
251
252 (void)this->actors.emplace(actor->GetAID().Name(), actor);
253 actorsMutex.unlock();
254
255 // long time
256 actor->Init();
257
258 return actor->GetAID();
259 }
260
Terminate(const AID & id)261 void ActorMgr::Terminate(const AID &id) {
262 auto actor = GetActor(id);
263 if (actor != nullptr) {
264 actor->Terminate();
265 // Wait actor's thread to finish.
266 actor->Await();
267 RemoveActor(id.Name());
268 }
269 }
270
Wait(const AID & id)271 void ActorMgr::Wait(const AID &id) {
272 auto actor = GetActor(id);
273 if (actor != nullptr) {
274 actor->Await();
275 }
276 }
277 }; // end of namespace mindspore
278