1 /** 2 * Copyright 2021 Huawei Technologies Co., Ltd 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #ifndef MINDSPORE_CORE_MINDRT_SRC_ACTOR_ACTORMGR_H 18 #define MINDSPORE_CORE_MINDRT_SRC_ACTOR_ACTORMGR_H 19 20 #include <atomic> 21 #include <set> 22 #include <utility> 23 #include <map> 24 #include <memory> 25 #include <string> 26 #ifndef MS_COMPILE_IOS 27 #include <shared_mutex> 28 #endif 29 #include <vector> 30 #include "actor/actor.h" 31 #include "thread/actor_threadpool.h" 32 #include "thread/hqueue.h" 33 34 namespace mindspore { 35 class ActorBase; 36 class IOMgr; 37 class MS_CORE_API ActorMgr { 38 public: GetActorMgrRef()39 static inline ActorMgr *GetActorMgrRef() { return &actorMgr; } 40 41 static std::shared_ptr<IOMgr> &GetIOMgrRef(const std::string &protocol = "tcp"); 42 GetIOMgrRef(const AID & to)43 static inline std::shared_ptr<IOMgr> &GetIOMgrRef(const AID &to) { return GetIOMgrRef(to.GetProtocol()); } 44 Receive(std::unique_ptr<MessageBase> msg)45 static void Receive(std::unique_ptr<MessageBase> msg) { 46 auto to = msg->To().Name(); 47 (void)ActorMgr::GetActorMgrRef()->Send(AID(to), std::move(msg)); 48 } 49 GetActorThreadPool()50 ActorThreadPool *GetActorThreadPool() const { return inner_pool_; } 51 52 ActorMgr(); 53 ~ActorMgr(); 54 55 void Finalize(); 56 // initialize actor manager resource, do not create inner thread pool by default 57 int Initialize(bool use_inner_pool = false, size_t actor_thread_num = 1, size_t max_thread_num = 1, 58 size_t actor_queue_size = kMaxHqueueSize, const std::vector<int> &core_list = {}); 59 60 void RemoveActor(const std::string &name); 61 ActorReference GetActor(const AID &id); 62 const std::string GetUrl(const std::string &protocol = "tcp"); 63 void AddUrl(const std::string &protocol, const std::string &url); 64 void AddIOMgr(const std::string &protocol, const std::shared_ptr<IOMgr> &ioMgr); 65 int Send(const AID &to, std::unique_ptr<MessageBase> msg, bool remoteLink = false, bool isExactNotRemote = false); 66 AID Spawn(const ActorReference &actor, bool shareThread = true); 67 void Terminate(const AID &id); 68 void TerminateAll(); 69 void Wait(const AID &pid); GetDelegate()70 inline const std::string &GetDelegate() const { return delegate; } 71 SetDelegate(const std::string & d)72 inline void SetDelegate(const std::string &d) { delegate = d; } 73 void SetActorReady(const ActorReference &actor) const; 74 75 void ChildAfterFork(); 76 77 // Quit and reset mailbox for actors after process fork, and prepare to spawn. 78 void ResetActorAfterFork(const ActorReference &actor); 79 80 private: IsLocalAddres(const AID & id)81 inline bool IsLocalAddres(const AID &id) { 82 if (id.Url() == "" || id.Url().empty() || urls.find(id.Url()) != urls.end()) { 83 return true; 84 } else { 85 return false; 86 } 87 } 88 int EnqueueMessage(const ActorReference actor, std::unique_ptr<MessageBase> msg); 89 // in order to avoid being initialized many times 90 bool initialized_{false}; 91 92 // actor manager support running on inner thread pool, 93 // or running on other thread pool created independently externally 94 ActorThreadPool *inner_pool_{nullptr}; 95 96 // Map of all local spawned and running processes. 97 std::map<std::string, ActorReference> actors; 98 #ifndef MS_COMPILE_IOS 99 std::shared_mutex actorsMutex; 100 #else 101 std::mutex actorsMutex; 102 #endif 103 std::map<std::string, std::string> procotols; 104 std::set<std::string> urls; 105 std::string delegate; 106 static ActorMgr actorMgr; 107 static std::map<std::string, std::shared_ptr<IOMgr> > ioMgrs; 108 }; // end of class ActorMgr 109 }; // end of namespace mindspore 110 #endif 111