1 /** 2 * Copyright 2021 Huawei Technologies Co., Ltd 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #ifndef MINDSPORE_CORE_MINDRT_SRC_ACTOR_ACTORMGR_H 18 #define MINDSPORE_CORE_MINDRT_SRC_ACTOR_ACTORMGR_H 19 20 #include <atomic> 21 #include <set> 22 #include <utility> 23 #include <map> 24 #include <memory> 25 #include <string> 26 #ifndef MS_COMPILE_IOS 27 #include <shared_mutex> 28 #endif 29 #include "actor/actor.h" 30 #include "thread/actor_threadpool.h" 31 #include "thread/hqueue.h" 32 33 namespace mindspore { 34 35 class ActorBase; 36 class IOMgr; 37 38 class ActorMgr { 39 public: GetActorMgrRef()40 static inline ActorMgr *GetActorMgrRef() { return &actorMgr; } 41 42 static std::shared_ptr<IOMgr> &GetIOMgrRef(const std::string &protocol = "tcp"); 43 GetIOMgrRef(const AID & to)44 static inline std::shared_ptr<IOMgr> &GetIOMgrRef(const AID &to) { return GetIOMgrRef(to.GetProtocol()); } 45 Receive(std::unique_ptr<MessageBase> msg)46 static void Receive(std::unique_ptr<MessageBase> msg) { 47 auto to = msg->To().Name(); 48 (void)ActorMgr::GetActorMgrRef()->Send(AID(to), std::move(msg)); 49 } 50 GetActorThreadPool()51 ActorThreadPool *GetActorThreadPool() const { return inner_pool_; } 52 53 ActorMgr(); 54 ~ActorMgr(); 55 56 void Finalize(); 57 // initialize actor manager resource, do not create inner thread pool by default 58 int Initialize(bool use_inner_pool = false, size_t actor_thread_num = 1, size_t max_thread_num = 1); 59 60 void RemoveActor(const std::string &name); 61 ActorReference GetActor(const AID &id); 62 const std::string GetUrl(const std::string &protocol = "tcp"); 63 void AddUrl(const std::string &protocol, const std::string &url); 64 void AddIOMgr(const std::string &protocol, const std::shared_ptr<IOMgr> &ioMgr); 65 int Send(const AID &to, std::unique_ptr<MessageBase> msg, bool remoteLink = false, bool isExactNotRemote = false); 66 AID Spawn(const ActorReference &actor, bool shareThread = true); 67 void Terminate(const AID &id); 68 void TerminateAll(); 69 void Wait(const AID &pid); GetDelegate()70 inline const std::string &GetDelegate() const { return delegate; } 71 SetDelegate(const std::string & d)72 inline void SetDelegate(const std::string &d) { delegate = d; } 73 void SetActorReady(const ActorReference &actor) const; 74 75 private: IsLocalAddres(const AID & id)76 inline bool IsLocalAddres(const AID &id) { 77 if (id.Url() == "" || id.Url().empty() || urls.find(id.Url()) != urls.end()) { 78 return true; 79 } else { 80 return false; 81 } 82 } 83 int EnqueueMessage(const ActorReference actor, std::unique_ptr<MessageBase> msg); 84 // in order to avoid being initialized many times 85 std::atomic_bool initialized_{false}; 86 87 // actor manager support running on inner thread pool, 88 // or running on other thread pool created independently externally 89 ActorThreadPool *inner_pool_{nullptr}; 90 91 // Map of all local spawned and running processes. 92 std::map<std::string, ActorReference> actors; 93 #ifndef MS_COMPILE_IOS 94 std::shared_mutex actorsMutex; 95 #else 96 std::mutex actorsMutex; 97 #endif 98 std::map<std::string, std::string> procotols; 99 std::set<std::string> urls; 100 std::string delegate; 101 static ActorMgr actorMgr; 102 static std::map<std::string, std::shared_ptr<IOMgr> > ioMgrs; 103 }; // end of class ActorMgr 104 105 }; // end of namespace mindspore 106 #endif 107