• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2021 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
#include <list>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <utility>

#include "actor/actormgr.h"
#include "actor/iomgr.h"
24 
25 namespace mindspore {
// Out-of-class definition of the static ActorMgr instance declared in the class.
26 ActorMgr ActorMgr::actorMgr;
// Out-of-class definition of the static IOMgr table; entries are keyed by protocol string
// (written by AddIOMgr, read by GetIOMgrRef and Finalize).
27 std::map<std::string, std::shared_ptr<IOMgr>> ActorMgr::ioMgrs;
28 
GetIOMgrRef(const std::string & protocol)29 std::shared_ptr<IOMgr> &ActorMgr::GetIOMgrRef(const std::string &protocol) {
30   auto it = ioMgrs.find(protocol);
31   if (it != ioMgrs.end()) {
32     return it->second;
33   } else {
34     MS_LOG(DEBUG) << "Can't find IOMgr of protocol " << protocol.c_str();
35     static std::shared_ptr<IOMgr> nullIOMgr;
36     return nullIOMgr;
37   }
38 }
ActorMgr()39 ActorMgr::ActorMgr() : actors(), procotols(), urls() {
40   actors.clear();
41   procotols.clear();
42   urls.clear();
43 }
44 
~ActorMgr()45 ActorMgr::~ActorMgr() {
46   if (inner_pool_ != nullptr) {
47     delete inner_pool_;
48     inner_pool_ = nullptr;
49   }
50 }
51 
Initialize(bool use_inner_pool,size_t actor_thread_num,size_t max_thread_num)52 int ActorMgr::Initialize(bool use_inner_pool, size_t actor_thread_num, size_t max_thread_num) {
53   bool expected = false;
54   if (!initialized_.compare_exchange_strong(expected, true)) {
55     MS_LOG(DEBUG) << "Actor Manager has been initialized before";
56     return MINDRT_OK;
57   }
58   // create inner thread pool only when specified use_inner_pool
59   if (use_inner_pool) {
60     if (max_thread_num <= actor_thread_num) {
61       inner_pool_ = ActorThreadPool::CreateThreadPool(actor_thread_num);
62       if (inner_pool_ == nullptr) {
63         MS_LOG(ERROR) << "ActorMgr CreateThreadPool failed";
64         return MINDRT_ERROR;
65       }
66     } else {
67       inner_pool_ = ActorThreadPool::CreateThreadPool(actor_thread_num, max_thread_num, {});
68       if (inner_pool_ == nullptr) {
69         MS_LOG(ERROR) << "ActorMgr CreateThreadPool failed";
70         return MINDRT_ERROR;
71       }
72       inner_pool_->SetActorThreadNum(actor_thread_num);
73       inner_pool_->DisableOccupiedActorThread();
74       inner_pool_->SetKernelThreadNum(max_thread_num - actor_thread_num);
75     }
76     if (inner_pool_ != nullptr) {
77       inner_pool_->SetMaxSpinCount(kDefaultSpinCount);
78       inner_pool_->SetSpinCountMaxValue();
79     }
80   }
81   return MINDRT_OK;
82 }
83 
SetActorReady(const ActorReference & actor) const84 void ActorMgr::SetActorReady(const ActorReference &actor) const {
85   // use inner thread pool or actor thread pool created externally
86   // priority to use actor thread pool
87   MINDRT_OOM_EXIT(actor);
88   ActorThreadPool *pool = actor->pool_ ? actor->pool_ : inner_pool_;
89   if (pool == nullptr) {
90     MS_LOG(ERROR) << "ThreadPool is nullptr, " << actor->pool_ << ", " << inner_pool_
91                   << ", actor: " << actor->GetAID().Name();
92     return;
93   }
94   pool->PushActorToQueue(actor.get());
95 }
96 
GetUrl(const std::string & protocol)97 const std::string ActorMgr::GetUrl(const std::string &protocol) {
98   auto it = procotols.find(protocol);
99   if (it != procotols.end()) {
100     return it->second;
101   } else if (procotols.size() > 0) {
102     return procotols.begin()->second;
103   } else {
104     return "";
105   }
106 }
107 
AddUrl(const std::string & protocol,const std::string & url)108 void ActorMgr::AddUrl(const std::string &protocol, const std::string &url) {
109   procotols[protocol] = url;
110   AID id("a@" + url);
111   (void)urls.insert(id.GetIp() + ":" + std::to_string(id.GetPort()));
112   (void)urls.insert(id.GetProtocol() + "://" + id.GetIp() + ":" + std::to_string(id.GetPort()));
113   (void)urls.insert(std::string("127.0.0.1:") + std::to_string(id.GetPort()));
114   (void)urls.insert(protocol + "://127.0.0.1:" + std::to_string(id.GetPort()));
115 }
116 
AddIOMgr(const std::string & protocol,const std::shared_ptr<IOMgr> & ioMgr)117 void ActorMgr::AddIOMgr(const std::string &protocol, const std::shared_ptr<IOMgr> &ioMgr) { ioMgrs[protocol] = ioMgr; }
118 
RemoveActor(const std::string & name)119 void ActorMgr::RemoveActor(const std::string &name) {
120   actorsMutex.lock();
121   (void)actors.erase(name);
122   actorsMutex.unlock();
123 }
124 
TerminateAll()125 void ActorMgr::TerminateAll() {
126   // copy all the actors
127   std::list<ActorReference> actorsWaiting;
128   actorsMutex.lock();
129   for (auto actorIt = actors.begin(); actorIt != actors.end(); ++actorIt) {
130     actorsWaiting.push_back(actorIt->second);
131   }
132   actorsMutex.unlock();
133 
134   // send terminal msg to all actors.
135   for (auto actorIt = actorsWaiting.begin(); actorIt != actorsWaiting.end(); ++actorIt) {
136     (*actorIt)->Terminate();
137   }
138 
139   // wait actor's thread to finish.
140   for (auto actorIt = actorsWaiting.begin(); actorIt != actorsWaiting.end(); ++actorIt) {
141     (*actorIt)->Await();
142   }
143 }
144 
Finalize()145 void ActorMgr::Finalize() {
146   this->TerminateAll();
147   MS_LOG(INFO) << "mindrt Actors finish exiting.";
148 
149   // stop all actor threads;
150   MS_LOG(INFO) << "mindrt Threads finish exiting.";
151 
152   // stop iomgr thread
153   for (auto mgrIt = ioMgrs.begin(); mgrIt != ioMgrs.end(); ++mgrIt) {
154     MS_LOG(INFO) << "finalize IOMgr=" << mgrIt->first.c_str();
155     mgrIt->second->Finish();
156   }
157 
158   // delete actor thread pool if use_inner_pool
159   delete inner_pool_;
160   inner_pool_ = nullptr;
161   MS_LOG(INFO) << "mindrt IOMGRS finish exiting.";
162 }
163 
GetActor(const AID & id)164 ActorReference ActorMgr::GetActor(const AID &id) {
165 #ifndef MS_COMPILE_IOS
166   actorsMutex.lock_shared();
167 #else
168   actorsMutex.lock();
169 #endif
170   const auto &actorIt = actors.find(id.Name());
171   if (actorIt != actors.end()) {
172     auto &result = actorIt->second;
173 #ifndef MS_COMPILE_IOS
174     actorsMutex.unlock_shared();
175 #else
176     actorsMutex.unlock();
177 #endif
178     return result;
179   } else {
180 #ifndef MS_COMPILE_IOS
181     actorsMutex.unlock_shared();
182 #else
183     actorsMutex.unlock();
184 #endif
185     MS_LOG(DEBUG) << "can't find ACTOR with name=" << id.Name().c_str();
186     return nullptr;
187   }
188 }
189 
EnqueueMessage(const mindspore::ActorReference actor,std::unique_ptr<mindspore::MessageBase> msg)190 int ActorMgr::EnqueueMessage(const mindspore::ActorReference actor, std::unique_ptr<mindspore::MessageBase> msg) {
191   return actor->EnqueMessage(std::move(msg));
192 }
193 
Send(const AID & to,std::unique_ptr<MessageBase> msg,bool remoteLink,bool isExactNotRemote)194 int ActorMgr::Send(const AID &to, std::unique_ptr<MessageBase> msg, bool remoteLink, bool isExactNotRemote) {
195   // The destination is local
196   if (IsLocalAddres(to)) {
197     auto actor = GetActor(to);
198     if (actor != nullptr) {
199       if (to.GetProtocol() == MINDRT_UDP && msg->GetType() == MessageBase::Type::KMSG) {
200         msg->type = MessageBase::Type::KUDP;
201       }
202       return EnqueueMessage(actor, std::move(msg));
203     } else {
204       return ACTOR_NOT_FIND;
205     }
206   } else {
207     // send to remote actor
208     if (msg->GetType() != MessageBase::Type::KMSG) {
209       MS_LOG(ERROR) << "The msg is not KMSG,it can't send to remote=" << std::string(to).c_str();
210       return ACTOR_PARAMER_ERR;
211     } else {
212       // null
213     }
214     msg->SetTo(to);
215     auto &io = ActorMgr::GetIOMgrRef(to);
216     if (io != nullptr) {
217       return io->Send(std::move(msg), remoteLink, isExactNotRemote);
218     } else {
219       MS_LOG(ERROR) << "The protocol is not supported:"
220                     << "p=" << to.GetProtocol().c_str() << ",f=" << msg->From().Name().c_str()
221                     << ",t=" << to.Name().c_str() << ",m=" << msg->Name().c_str();
222       return IO_NOT_FIND;
223     }
224   }
225 }
226 
Spawn(const ActorReference & actor,bool shareThread)227 AID ActorMgr::Spawn(const ActorReference &actor, bool shareThread) {
228   actorsMutex.lock();
229   if (actors.find(actor->GetAID().Name()) != actors.end()) {
230     actorsMutex.unlock();
231     MS_LOG(ERROR) << "The actor's name conflicts,name:" << actor->GetAID().Name().c_str();
232     MINDRT_EXIT("Actor name conflicts.");
233   }
234   MS_LOG(DEBUG) << "ACTOR was spawned,a=" << actor->GetAID().Name().c_str();
235 
236   if (shareThread) {
237     auto mailbox = std::unique_ptr<MailBox>(new (std::nothrow) NonblockingMailBox());
238     auto hook = std::unique_ptr<std::function<void()>>(
239       new std::function<void()>([actor]() { ActorMgr::GetActorMgrRef()->SetActorReady(actor); }));
240     // the mailbox has this hook, the hook holds the actor reference, the actor has the mailbox. this is a cycle which
241     // will leads to memory leak. in order to fix this issue, we should explicitly free the mailbox when terminate the
242     // actor
243     mailbox->SetNotifyHook(std::move(hook));
244     actor->Spawn(actor, std::move(mailbox));
245 
246   } else {
247     auto mailbox = std::unique_ptr<MailBox>(new (std::nothrow) BlockingMailBox());
248     actor->Spawn(actor, std::move(mailbox));
249     ActorMgr::GetActorMgrRef()->SetActorReady(actor);
250   }
251 
252   (void)this->actors.emplace(actor->GetAID().Name(), actor);
253   actorsMutex.unlock();
254 
255   // long time
256   actor->Init();
257 
258   return actor->GetAID();
259 }
260 
Terminate(const AID & id)261 void ActorMgr::Terminate(const AID &id) {
262   auto actor = GetActor(id);
263   if (actor != nullptr) {
264     actor->Terminate();
265     // Wait actor's thread to finish.
266     actor->Await();
267     RemoveActor(id.Name());
268   }
269 }
270 
Wait(const AID & id)271 void ActorMgr::Wait(const AID &id) {
272   auto actor = GetActor(id);
273   if (actor != nullptr) {
274     actor->Await();
275   }
276 }
277 };  // end of namespace mindspore
278