• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2021 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef MINDSPORE_CORE_MINDRT_SRC_ACTOR_ACTORMGR_H
18 #define MINDSPORE_CORE_MINDRT_SRC_ACTOR_ACTORMGR_H
19 
#include <atomic>
#include <map>
#include <memory>
#include <mutex>
#include <set>
#ifndef MS_COMPILE_IOS
#include <shared_mutex>
#endif
#include <string>
#include <utility>
#include <vector>
#include "actor/actor.h"
#include "thread/actor_threadpool.h"
#include "thread/hqueue.h"
33 
34 namespace mindspore {
35 class ActorBase;
36 class IOMgr;
37 class MS_CORE_API ActorMgr {
38  public:
GetActorMgrRef()39   static inline ActorMgr *GetActorMgrRef() { return &actorMgr; }
40 
41   static std::shared_ptr<IOMgr> &GetIOMgrRef(const std::string &protocol = "tcp");
42 
GetIOMgrRef(const AID & to)43   static inline std::shared_ptr<IOMgr> &GetIOMgrRef(const AID &to) { return GetIOMgrRef(to.GetProtocol()); }
44 
Receive(std::unique_ptr<MessageBase> msg)45   static void Receive(std::unique_ptr<MessageBase> msg) {
46     auto to = msg->To().Name();
47     (void)ActorMgr::GetActorMgrRef()->Send(AID(to), std::move(msg));
48   }
49 
GetActorThreadPool()50   ActorThreadPool *GetActorThreadPool() const { return inner_pool_; }
51 
52   ActorMgr();
53   ~ActorMgr();
54 
55   void Finalize();
56   // initialize actor manager resource, do not create inner thread pool by default
57   int Initialize(bool use_inner_pool = false, size_t actor_thread_num = 1, size_t max_thread_num = 1,
58                  size_t actor_queue_size = kMaxHqueueSize, const std::vector<int> &core_list = {});
59 
60   void RemoveActor(const std::string &name);
61   ActorReference GetActor(const AID &id);
62   const std::string GetUrl(const std::string &protocol = "tcp");
63   void AddUrl(const std::string &protocol, const std::string &url);
64   void AddIOMgr(const std::string &protocol, const std::shared_ptr<IOMgr> &ioMgr);
65   int Send(const AID &to, std::unique_ptr<MessageBase> msg, bool remoteLink = false, bool isExactNotRemote = false);
66   AID Spawn(const ActorReference &actor, bool shareThread = true);
67   void Terminate(const AID &id);
68   void TerminateAll();
69   void Wait(const AID &pid);
GetDelegate()70   inline const std::string &GetDelegate() const { return delegate; }
71 
SetDelegate(const std::string & d)72   inline void SetDelegate(const std::string &d) { delegate = d; }
73   void SetActorReady(const ActorReference &actor) const;
74 
75   void ChildAfterFork();
76 
77   // Quit and reset mailbox for actors after process fork, and prepare to spawn.
78   void ResetActorAfterFork(const ActorReference &actor);
79 
80  private:
IsLocalAddres(const AID & id)81   inline bool IsLocalAddres(const AID &id) {
82     if (id.Url() == "" || id.Url().empty() || urls.find(id.Url()) != urls.end()) {
83       return true;
84     } else {
85       return false;
86     }
87   }
88   int EnqueueMessage(const ActorReference actor, std::unique_ptr<MessageBase> msg);
89   // in order to avoid being initialized many times
90   bool initialized_{false};
91 
92   // actor manager support running on inner thread pool,
93   // or running on other thread pool created independently externally
94   ActorThreadPool *inner_pool_{nullptr};
95 
96   // Map of all local spawned and running processes.
97   std::map<std::string, ActorReference> actors;
98 #ifndef MS_COMPILE_IOS
99   std::shared_mutex actorsMutex;
100 #else
101   std::mutex actorsMutex;
102 #endif
103   std::map<std::string, std::string> procotols;
104   std::set<std::string> urls;
105   std::string delegate;
106   static ActorMgr actorMgr;
107   static std::map<std::string, std::shared_ptr<IOMgr> > ioMgrs;
108 };  // end of class ActorMgr
109 };  // end of namespace mindspore
110 #endif
111