• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2021-2023 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
#include <list>
#include <map>
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <string>
#include <utility>
#include <vector>
22 
23 #include "actor/actormgr.h"
24 #include "actor/iomgr.h"
25 
namespace mindspore {
// Process-wide singleton instance of the actor manager.
ActorMgr ActorMgr::actorMgr;
// Registry of IO managers keyed by protocol name (e.g. the protocol part of
// an AID such as "tcp"/"udp" — see Send() and GetIOMgrRef()).
std::map<std::string, std::shared_ptr<IOMgr>> ActorMgr::ioMgrs;
29 
GetIOMgrRef(const std::string & protocol)30 std::shared_ptr<IOMgr> &ActorMgr::GetIOMgrRef(const std::string &protocol) {
31   auto it = ioMgrs.find(protocol);
32   if (it != ioMgrs.end()) {
33     return it->second;
34   } else {
35     MS_LOG(DEBUG) << "Can't find IOMgr of protocol " << protocol.c_str();
36     static std::shared_ptr<IOMgr> nullIOMgr;
37     return nullIOMgr;
38   }
39 }
ActorMgr()40 ActorMgr::ActorMgr() : actors(), procotols(), urls() {
41   actors.clear();
42   procotols.clear();
43   urls.clear();
44 }
45 
~ActorMgr()46 ActorMgr::~ActorMgr() {
47   if (inner_pool_ != nullptr) {
48     delete inner_pool_;
49     inner_pool_ = nullptr;
50   }
51 }
52 
Initialize(bool use_inner_pool,size_t actor_thread_num,size_t max_thread_num,size_t actor_queue_size,const std::vector<int> & core_list)53 int ActorMgr::Initialize(bool use_inner_pool, size_t actor_thread_num, size_t max_thread_num, size_t actor_queue_size,
54                          const std::vector<int> &core_list) {
55   MS_LOG(DEBUG) << "ActorMgr Initialize, use_inner_pool : " << use_inner_pool
56                 << ", actor_thread_num : " << actor_thread_num << ", max_thread_num : " << max_thread_num
57                 << ", actor_queue_size : " << actor_queue_size << ", core_list size : " << core_list.size();
58   std::unique_lock lock(actorsMutex);
59   if (initialized_) {
60     MS_LOG(DEBUG) << "Actor Manager has been initialized before";
61     return MINDRT_OK;
62   }
63   initialized_ = true;
64   // create inner thread pool only when specified use_inner_pool
65   if (use_inner_pool) {
66     ActorThreadPool::set_actor_queue_size(actor_queue_size);
67     if (max_thread_num <= actor_thread_num) {
68       inner_pool_ = ActorThreadPool::CreateThreadPool(actor_thread_num);
69       if (inner_pool_ == nullptr) {
70         MS_LOG(ERROR) << "ActorMgr CreateThreadPool failed";
71         return MINDRT_ERROR;
72       }
73     } else {
74       // Reverse core list to avoid bind cpu 0.
75       std::vector<int> bind_list;
76       for (size_t i = 0; i < core_list.size(); i++) {
77         bind_list.push_back(core_list[core_list.size() - 1 - i]);
78       }
79       auto bind_mode = !bind_list.empty() ? BindMode::Power_Higher : BindMode::Power_NoBind;
80       inner_pool_ = ActorThreadPool::CreateThreadPool(actor_thread_num, max_thread_num, bind_list, bind_mode);
81       if (inner_pool_ == nullptr) {
82         MS_LOG(ERROR) << "ActorMgr CreateThreadPool failed";
83         return MINDRT_ERROR;
84       }
85       inner_pool_->SetActorThreadNum(actor_thread_num);
86       inner_pool_->SetKernelThreadNum(max_thread_num - actor_thread_num);
87     }
88     if (inner_pool_ != nullptr) {
89       inner_pool_->SetMaxSpinCount(kDefaultSpinCount);
90       inner_pool_->SetSpinCountMaxValue();
91       inner_pool_->SetKernelThreadMaxSpinCount(kDefaultKernelSpinCount);
92       inner_pool_->SetWorkerIdMap();
93     }
94   }
95   return MINDRT_OK;
96 }
97 
SetActorReady(const ActorReference & actor) const98 void ActorMgr::SetActorReady(const ActorReference &actor) const {
99   // use inner thread pool or actor thread pool created externally
100   // priority to use actor thread pool
101   MINDRT_OOM_EXIT(actor);
102   ActorThreadPool *pool = actor->pool_ ? actor->pool_ : inner_pool_;
103   if (pool == nullptr) {
104     MS_LOG(ERROR) << "ThreadPool is nullptr, " << actor->pool_ << ", " << inner_pool_
105                   << ", actor: " << actor->GetAID().Name();
106     return;
107   }
108   pool->PushActorToQueue(actor.get());
109 }
110 
GetUrl(const std::string & protocol)111 const std::string ActorMgr::GetUrl(const std::string &protocol) {
112   auto it = procotols.find(protocol);
113   if (it != procotols.end()) {
114     return it->second;
115   } else if (procotols.size() > 0) {
116     return procotols.begin()->second;
117   } else {
118     return "";
119   }
120 }
121 
AddUrl(const std::string & protocol,const std::string & url)122 void ActorMgr::AddUrl(const std::string &protocol, const std::string &url) {
123   procotols[protocol] = url;
124   AID id("a@" + url);
125   (void)urls.insert(id.GetIp() + ":" + std::to_string(id.GetPort()));
126   (void)urls.insert(id.GetProtocol() + "://" + id.GetIp() + ":" + std::to_string(id.GetPort()));
127   (void)urls.insert(std::string("127.0.0.1:") + std::to_string(id.GetPort()));
128   (void)urls.insert(protocol + "://127.0.0.1:" + std::to_string(id.GetPort()));
129 }
130 
AddIOMgr(const std::string & protocol,const std::shared_ptr<IOMgr> & ioMgr)131 void ActorMgr::AddIOMgr(const std::string &protocol, const std::shared_ptr<IOMgr> &ioMgr) { ioMgrs[protocol] = ioMgr; }
132 
RemoveActor(const std::string & name)133 void ActorMgr::RemoveActor(const std::string &name) {
134   actorsMutex.lock();
135   (void)actors.erase(name);
136   actorsMutex.unlock();
137 }
138 
TerminateAll()139 void ActorMgr::TerminateAll() {
140   if (actors.empty()) {
141     return;
142   }
143   // copy all the actors
144   std::list<ActorReference> actorsWaiting;
145   actorsMutex.lock();
146   for (auto actorIt = actors.begin(); actorIt != actors.end(); ++actorIt) {
147     actorsWaiting.push_back(actorIt->second);
148   }
149   actorsMutex.unlock();
150 
151   // send terminal msg to all actors.
152   for (auto actorIt = actorsWaiting.begin(); actorIt != actorsWaiting.end(); ++actorIt) {
153     (*actorIt)->Terminate();
154   }
155 
156   // wait actor's thread to finish and remove actor.
157   for (auto actorIt = actorsWaiting.begin(); actorIt != actorsWaiting.end(); ++actorIt) {
158     (*actorIt)->Await();
159     RemoveActor((*actorIt)->GetAID().Name());
160   }
161 }
162 
Finalize()163 void ActorMgr::Finalize() {
164   this->TerminateAll();
165   MS_LOG(INFO) << "mindrt Actors finish exiting.";
166 
167   // stop all actor threads;
168   MS_LOG(INFO) << "mindrt Threads finish exiting.";
169 
170   // stop iomgr thread
171   for (auto mgrIt = ioMgrs.begin(); mgrIt != ioMgrs.end(); ++mgrIt) {
172     MS_LOG(INFO) << "finalize IOMgr=" << mgrIt->first.c_str();
173     mgrIt->second->Finalize();
174   }
175 
176   // delete actor thread pool if use_inner_pool
177   delete inner_pool_;
178   inner_pool_ = nullptr;
179   MS_LOG(INFO) << "mindrt IOMGRS finish exiting.";
180 }
181 
GetActor(const AID & id)182 ActorReference ActorMgr::GetActor(const AID &id) {
183 #ifndef MS_COMPILE_IOS
184   actorsMutex.lock_shared();
185 #else
186   actorsMutex.lock();
187 #endif
188   const auto &actorIt = actors.find(id.Name());
189   if (actorIt != actors.end()) {
190     auto &result = actorIt->second;
191 #ifndef MS_COMPILE_IOS
192     actorsMutex.unlock_shared();
193 #else
194     actorsMutex.unlock();
195 #endif
196     return result;
197   } else {
198 #ifndef MS_COMPILE_IOS
199     actorsMutex.unlock_shared();
200 #else
201     actorsMutex.unlock();
202 #endif
203     MS_LOG(DEBUG) << "can't find ACTOR with name=" << id.Name().c_str();
204     return nullptr;
205   }
206 }
207 
EnqueueMessage(const mindspore::ActorReference actor,std::unique_ptr<mindspore::MessageBase> msg)208 int ActorMgr::EnqueueMessage(const mindspore::ActorReference actor, std::unique_ptr<mindspore::MessageBase> msg) {
209   return actor->EnqueMessage(std::move(msg));
210 }
211 
Send(const AID & to,std::unique_ptr<MessageBase> msg,bool remoteLink,bool isExactNotRemote)212 int ActorMgr::Send(const AID &to, std::unique_ptr<MessageBase> msg, bool remoteLink, bool isExactNotRemote) {
213   // The destination is local
214   if (IsLocalAddres(to)) {
215     auto actor = GetActor(to);
216     if (actor != nullptr) {
217       if (to.GetProtocol() == MINDRT_UDP && msg->GetType() == MessageBase::Type::KMSG) {
218         msg->type = MessageBase::Type::KUDP;
219       }
220       return EnqueueMessage(actor, std::move(msg));
221     } else {
222       return ACTOR_NOT_FIND;
223     }
224   } else {
225     // send to remote actor
226     if (msg->GetType() != MessageBase::Type::KMSG) {
227       MS_LOG(ERROR) << "The msg is not KMSG,it can't send to remote=" << std::string(to).c_str();
228       return ACTOR_PARAMER_ERR;
229     } else {
230       // null
231     }
232     msg->SetTo(to);
233     auto &io = ActorMgr::GetIOMgrRef(to);
234     if (io != nullptr) {
235       return io->Send(std::move(msg), remoteLink, isExactNotRemote);
236     } else {
237       MS_LOG(ERROR) << "The protocol is not supported:"
238                     << "p=" << to.GetProtocol().c_str() << ",f=" << msg->From().Name().c_str()
239                     << ",t=" << to.Name().c_str() << ",m=" << msg->Name().c_str();
240       return IO_NOT_FIND;
241     }
242   }
243 }
244 
// Register `actor` under its AID name and start it.
//
// @param actor       the actor to spawn; its name must be unique — a name
//                    conflict is fatal (MINDRT_EXIT).
// @param shareThread when true the actor runs on a shared thread pool via a
//                    non-blocking mailbox + notify hook; when false it gets a
//                    blocking mailbox and is made ready immediately.
// @return the spawned actor's AID.
AID ActorMgr::Spawn(const ActorReference &actor, bool shareThread) {
  actorsMutex.lock();
  if (actors.find(actor->GetAID().Name()) != actors.end()) {
    // Unlock before exiting so the process doesn't die holding the mutex.
    actorsMutex.unlock();
    MS_LOG(ERROR) << "The actor's name conflicts,name:" << actor->GetAID().Name().c_str();
    MINDRT_EXIT("Actor name conflicts.");
  }
  MS_LOG(DEBUG) << "ACTOR was spawned,a=" << actor->GetAID().Name().c_str();

  if (shareThread) {
    auto mailbox = std::make_unique<NonblockingMailBox>();
    // Notify hook: when a message arrives, hand the actor to its thread pool.
    // Prefer the actor's own manager; fall back to the global singleton.
    auto hook = std::make_unique<std::function<void()>>([actor]() {
      auto actor_mgr = actor->get_actor_mgr();
      if (actor_mgr != nullptr) {
        actor_mgr->SetActorReady(actor);
      } else {
        ActorMgr::GetActorMgrRef()->SetActorReady(actor);
      }
    });
    // the mailbox has this hook, the hook holds the actor reference, the actor has the mailbox. this is a cycle which
    // will leads to memory leak. in order to fix this issue, we should explicitly free the mailbox when terminate the
    // actor
    mailbox->SetNotifyHook(std::move(hook));
    actor->Spawn(actor, std::move(mailbox));
  } else {
    // Dedicated-thread actor: blocking mailbox, made ready right away.
    auto mailbox = std::unique_ptr<MailBox>(new (std::nothrow) BlockingMailBox());
    actor->Spawn(actor, std::move(mailbox));
    ActorMgr::GetActorMgrRef()->SetActorReady(actor);
  }
  (void)this->actors.emplace(actor->GetAID().Name(), actor);
  actorsMutex.unlock();
  // long time
  actor->Init();
  return actor->GetAID();
}
280 
ResetActorAfterFork(const ActorReference & actor)281 void ActorMgr::ResetActorAfterFork(const ActorReference &actor) {
282   if (actor) {
283     actor->Quit();
284     if (actor->mailbox != nullptr) {
285       (void)actor->mailbox.release();
286     }
287 
288     RemoveActor(actor->GetAID().Name());
289   }
290 }
291 
Terminate(const AID & id)292 void ActorMgr::Terminate(const AID &id) {
293   auto actor = GetActor(id);
294   if (actor != nullptr) {
295     actor->Terminate();
296     // Wait actor's thread to finish.
297     actor->Await();
298     RemoveActor(id.Name());
299   }
300 }
301 
Wait(const AID & id)302 void ActorMgr::Wait(const AID &id) {
303   auto actor = GetActor(id);
304   if (actor != nullptr) {
305     actor->Await();
306   }
307 }
308 
ChildAfterFork()309 void ActorMgr::ChildAfterFork() {
310   if (inner_pool_) {
311     MS_LOG(DEBUG) << "ActorMgr reinitialize inner_pool_ after fork.";
312     inner_pool_->ChildAfterFork();
313   }
314 }
315 
316 };  // end of namespace mindspore
317