/**
 * Copyright 2021-2023 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include <map>
#include <list>
#include <string>
#include <memory>
#include <utility>
#include <vector>

#include "actor/actormgr.h"
#include "actor/iomgr.h"

namespace mindspore {
ActorMgr ActorMgr::actorMgr;
std::map<std::string, std::shared_ptr<IOMgr>> ActorMgr::ioMgrs;

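// Returns the IOMgr registered for the given protocol, or a shared null reference when none is registered.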
std::shared_ptr<IOMgr> &ActorMgr::GetIOMgrRef(const std::string &protocol) {
  auto it = ioMgrs.find(protocol);
  if (it != ioMgrs.end()) {
    return it->second;
  } else {
    MS_LOG(DEBUG) << "Can't find IOMgr of protocol " << protocol.c_str();
    static std::shared_ptr<IOMgr> nullIOMgr;
    return nullIOMgr;
  }
}
ActorMgr::ActorMgr() : actors(), procotols(), urls() {
  actors.clear();
  procotols.clear();
  urls.clear();
}

ActorMgr::~ActorMgr() {
  if (inner_pool_ != nullptr) {
    delete inner_pool_;
    inner_pool_ = nullptr;
  }
}

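// Initializes the actor manager once. When use_inner_pool is set, an inner ActorThreadPool is created:
// with actor threads only when max_thread_num <= actor_thread_num, otherwise with additional kernel
// threads bound to the reversed core list.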
int ActorMgr::Initialize(bool use_inner_pool, size_t actor_thread_num, size_t max_thread_num, size_t actor_queue_size,
                         const std::vector<int> &core_list) {
  MS_LOG(DEBUG) << "ActorMgr Initialize, use_inner_pool : " << use_inner_pool
                << ", actor_thread_num : " << actor_thread_num << ", max_thread_num : " << max_thread_num
                << ", actor_queue_size : " << actor_queue_size << ", core_list size : " << core_list.size();
  std::unique_lock lock(actorsMutex);
  if (initialized_) {
    MS_LOG(DEBUG) << "Actor Manager has been initialized before";
    return MINDRT_OK;
  }
  initialized_ = true;
  // Create the inner thread pool only when use_inner_pool is specified.
  if (use_inner_pool) {
    ActorThreadPool::set_actor_queue_size(actor_queue_size);
    if (max_thread_num <= actor_thread_num) {
      inner_pool_ = ActorThreadPool::CreateThreadPool(actor_thread_num);
      if (inner_pool_ == nullptr) {
        MS_LOG(ERROR) << "ActorMgr CreateThreadPool failed";
        return MINDRT_ERROR;
      }
    } else {
      // Reverse the core list to avoid binding CPU 0.
      std::vector<int> bind_list;
      for (size_t i = 0; i < core_list.size(); i++) {
        bind_list.push_back(core_list[core_list.size() - 1 - i]);
      }
      auto bind_mode = !bind_list.empty() ? BindMode::Power_Higher : BindMode::Power_NoBind;
      inner_pool_ = ActorThreadPool::CreateThreadPool(actor_thread_num, max_thread_num, bind_list, bind_mode);
      if (inner_pool_ == nullptr) {
        MS_LOG(ERROR) << "ActorMgr CreateThreadPool failed";
        return MINDRT_ERROR;
      }
      inner_pool_->SetActorThreadNum(actor_thread_num);
      inner_pool_->SetKernelThreadNum(max_thread_num - actor_thread_num);
    }
    if (inner_pool_ != nullptr) {
      inner_pool_->SetMaxSpinCount(kDefaultSpinCount);
      inner_pool_->SetSpinCountMaxValue();
      inner_pool_->SetKernelThreadMaxSpinCount(kDefaultKernelSpinCount);
      inner_pool_->SetWorkerIdMap();
    }
  }
  return MINDRT_OK;
}

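// Pushes the actor into the ready queue of its thread pool so that a worker thread can run it.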
void ActorMgr::SetActorReady(const ActorReference &actor) const {
  // Use the inner thread pool or an actor thread pool created externally; the actor's own pool takes priority.
  MINDRT_OOM_EXIT(actor);
  ActorThreadPool *pool = actor->pool_ ? actor->pool_ : inner_pool_;
  if (pool == nullptr) {
    MS_LOG(ERROR) << "ThreadPool is nullptr, " << actor->pool_ << ", " << inner_pool_
                  << ", actor: " << actor->GetAID().Name();
    return;
  }
  pool->PushActorToQueue(actor.get());
}

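// Returns the url registered for the given protocol, falling back to the first registered url, or an empty string.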
const std::string ActorMgr::GetUrl(const std::string &protocol) {
  auto it = procotols.find(protocol);
  if (it != procotols.end()) {
    return it->second;
  } else if (!procotols.empty()) {
    return procotols.begin()->second;
  } else {
    return "";
  }
}

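// Registers the url for the protocol and records its ip:port and protocol://ip:port variants
// (including the 127.0.0.1 equivalents) in the known-urls set.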
void ActorMgr::AddUrl(const std::string &protocol, const std::string &url) {
  procotols[protocol] = url;
  AID id("a@" + url);
  (void)urls.insert(id.GetIp() + ":" + std::to_string(id.GetPort()));
  (void)urls.insert(id.GetProtocol() + "://" + id.GetIp() + ":" + std::to_string(id.GetPort()));
  (void)urls.insert(std::string("127.0.0.1:") + std::to_string(id.GetPort()));
  (void)urls.insert(protocol + "://127.0.0.1:" + std::to_string(id.GetPort()));
}

void ActorMgr::AddIOMgr(const std::string &protocol, const std::shared_ptr<IOMgr> &ioMgr) { ioMgrs[protocol] = ioMgr; }

void ActorMgr::RemoveActor(const std::string &name) {
  actorsMutex.lock();
  (void)actors.erase(name);
  actorsMutex.unlock();
}

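// Snapshots all registered actors, sends each a terminate message, then waits for each actor's thread
// to finish before removing it from the registry.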
void ActorMgr::TerminateAll() {
  if (actors.empty()) {
    return;
  }
  // Copy all the actors under the lock.
  std::list<ActorReference> actorsWaiting;
  actorsMutex.lock();
  for (auto actorIt = actors.begin(); actorIt != actors.end(); ++actorIt) {
    actorsWaiting.push_back(actorIt->second);
  }
  actorsMutex.unlock();

  // Send a terminate message to every actor.
  for (auto actorIt = actorsWaiting.begin(); actorIt != actorsWaiting.end(); ++actorIt) {
    (*actorIt)->Terminate();
  }

  // Wait for each actor's thread to finish, then remove the actor.
  for (auto actorIt = actorsWaiting.begin(); actorIt != actorsWaiting.end(); ++actorIt) {
    (*actorIt)->Await();
    RemoveActor((*actorIt)->GetAID().Name());
  }
}

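// Shuts the runtime down: terminates all actors, finalizes every registered IOMgr, and releases the
// inner thread pool.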
void ActorMgr::Finalize() {
  this->TerminateAll();
  MS_LOG(INFO) << "mindrt Actors finish exiting.";

  // Stop all actor threads.
  MS_LOG(INFO) << "mindrt Threads finish exiting.";

  // Stop the iomgr threads.
  for (auto mgrIt = ioMgrs.begin(); mgrIt != ioMgrs.end(); ++mgrIt) {
    MS_LOG(INFO) << "finalize IOMgr=" << mgrIt->first.c_str();
    mgrIt->second->Finalize();
  }

  // Delete the actor thread pool if use_inner_pool was set.
  delete inner_pool_;
  inner_pool_ = nullptr;
  MS_LOG(INFO) << "mindrt IOMGRS finish exiting.";
}

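// Looks up an actor by name under a shared lock (an exclusive lock on iOS); returns nullptr when not found.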
ActorReference ActorMgr::GetActor(const AID &id) {
#ifndef MS_COMPILE_IOS
  actorsMutex.lock_shared();
#else
  actorsMutex.lock();
#endif
  const auto &actorIt = actors.find(id.Name());
  if (actorIt != actors.end()) {
    auto &result = actorIt->second;
#ifndef MS_COMPILE_IOS
    actorsMutex.unlock_shared();
#else
    actorsMutex.unlock();
#endif
    return result;
  } else {
#ifndef MS_COMPILE_IOS
    actorsMutex.unlock_shared();
#else
    actorsMutex.unlock();
#endif
    MS_LOG(DEBUG) << "can't find ACTOR with name=" << id.Name().c_str();
    return nullptr;
  }
}

int ActorMgr::EnqueueMessage(const mindspore::ActorReference actor, std::unique_ptr<mindspore::MessageBase> msg) {
  return actor->EnqueMessage(std::move(msg));
}

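// Delivers the message to a local actor by enqueueing it, or forwards it to the IOMgr matching the
// destination protocol when the target is remote. Only KMSG messages may be sent to remote actors.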
int ActorMgr::Send(const AID &to, std::unique_ptr<MessageBase> msg, bool remoteLink, bool isExactNotRemote) {
  // The destination is local.
  if (IsLocalAddres(to)) {
    auto actor = GetActor(to);
    if (actor != nullptr) {
      if (to.GetProtocol() == MINDRT_UDP && msg->GetType() == MessageBase::Type::KMSG) {
        msg->type = MessageBase::Type::KUDP;
      }
      return EnqueueMessage(actor, std::move(msg));
    } else {
      return ACTOR_NOT_FIND;
    }
  } else {
    // Send to a remote actor.
    if (msg->GetType() != MessageBase::Type::KMSG) {
      MS_LOG(ERROR) << "The msg is not KMSG, it can't be sent to remote=" << std::string(to).c_str();
      return ACTOR_PARAMER_ERR;
    }
    msg->SetTo(to);
    auto &io = ActorMgr::GetIOMgrRef(to);
    if (io != nullptr) {
      return io->Send(std::move(msg), remoteLink, isExactNotRemote);
    } else {
      MS_LOG(ERROR) << "The protocol is not supported:"
                    << "p=" << to.GetProtocol().c_str() << ",f=" << msg->From().Name().c_str()
                    << ",t=" << to.Name().c_str() << ",m=" << msg->Name().c_str();
      return IO_NOT_FIND;
    }
  }
}

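// Registers the actor under its AID name, which must be unique. With shareThread the actor gets a
// NonblockingMailBox whose notify hook schedules it on a thread pool; otherwise it gets a
// BlockingMailBox and is made ready immediately.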
AID ActorMgr::Spawn(const ActorReference &actor, bool shareThread) {
  actorsMutex.lock();
  if (actors.find(actor->GetAID().Name()) != actors.end()) {
    actorsMutex.unlock();
    MS_LOG(ERROR) << "The actor's name conflicts, name:" << actor->GetAID().Name().c_str();
    MINDRT_EXIT("Actor name conflicts.");
  }
  MS_LOG(DEBUG) << "ACTOR was spawned, a=" << actor->GetAID().Name().c_str();

  if (shareThread) {
    auto mailbox = std::make_unique<NonblockingMailBox>();
    auto hook = std::make_unique<std::function<void()>>([actor]() {
      auto actor_mgr = actor->get_actor_mgr();
      if (actor_mgr != nullptr) {
        actor_mgr->SetActorReady(actor);
      } else {
        ActorMgr::GetActorMgrRef()->SetActorReady(actor);
      }
    });
    // The mailbox holds this hook, the hook holds the actor reference, and the actor holds the mailbox.
    // This cycle would lead to a memory leak, so the mailbox must be freed explicitly when the actor is
    // terminated.
    mailbox->SetNotifyHook(std::move(hook));
    actor->Spawn(actor, std::move(mailbox));
  } else {
    auto mailbox = std::unique_ptr<MailBox>(new (std::nothrow) BlockingMailBox());
    actor->Spawn(actor, std::move(mailbox));
    ActorMgr::GetActorMgrRef()->SetActorReady(actor);
  }
  (void)this->actors.emplace(actor->GetAID().Name(), actor);
  actorsMutex.unlock();
  // Init() may take a long time, so it is called outside the lock.
  actor->Init();
  return actor->GetAID();
}

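// Resets an actor after fork: quits it, releases ownership of its mailbox without destroying it, and
// removes it from the registry.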
void ActorMgr::ResetActorAfterFork(const ActorReference &actor) {
  if (actor) {
    actor->Quit();
    if (actor->mailbox != nullptr) {
      (void)actor->mailbox.release();
    }

    RemoveActor(actor->GetAID().Name());
  }
}

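// Terminates a single actor by id, waits for its thread to finish, and removes it from the registry.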
void ActorMgr::Terminate(const AID &id) {
  auto actor = GetActor(id);
  if (actor != nullptr) {
    actor->Terminate();
    // Wait for the actor's thread to finish.
    actor->Await();
    RemoveActor(id.Name());
  }
}

void ActorMgr::Wait(const AID &id) {
  auto actor = GetActor(id);
  if (actor != nullptr) {
    actor->Await();
  }
}

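// Reinitializes the inner thread pool in the child process after fork.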
void ActorMgr::ChildAfterFork() {
  if (inner_pool_) {
    MS_LOG(DEBUG) << "ActorMgr reinitialize inner_pool_ after fork.";
    inner_pool_->ChildAfterFork();
  }
}

}  // end of namespace mindspore