• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2022 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef MINDSPORE_CCSRC_RUNTIME_GRAPH_SCHEDULER_RPC_NODE_SCHEDULER_H_
18 #define MINDSPORE_CCSRC_RUNTIME_GRAPH_SCHEDULER_RPC_NODE_SCHEDULER_H_
19 
20 #include <map>
21 #include <string>
22 #include <memory>
23 #include "runtime/graph_scheduler/actor/actor_set.h"
24 #include "runtime/graph_scheduler/graph_compiler.h"
25 
26 namespace mindspore {
27 namespace runtime {
28 using mindspore::device::DeviceContext;
29 using mindspore::session::KernelGraph;
30 using mindspore::session::KernelWithIndex;
31 
32 // Scheduler for rpc actors, e.g., it adds inter-process arrows, generate router for actors, etc.
33 class RpcNodeScheduler {
34  public:
RpcNodeScheduler()35   RpcNodeScheduler() : op_context_(nullptr), rpc_actors_(nullptr) {}
36   ~RpcNodeScheduler() = default;
37 
38   // Build rpc actors and return rpc actor set.
39   RpcActorSetPtr Build(const ActorSet *actor_set);
40 
41   // Link rpc actors with inter-process arrows.
42   void Link(const ActorSet *actor_set) const;
43 
44   // This should be called by 'GraphScheduler::Scheduler()' method.
45   // Used to start servers for recv actors and create connections for send actors.
46   void Schedule(const ActorSet *actor_set) const;
47 
48   // Set op context to rpc actors.
49   void SetOpcontext(const RpcActorSetPtr &rpc_actors, OpContext<DeviceTensor> *const op_context);
50 
51   // Reset op context for rpc actors.
52   void ResetOpcontext(const RpcActorSetPtr &rpc_actors);
53 
54   // Clear resource of rpc actors. Finalize tcp clients/servers.
55   void Clear();
56 
57   // Abort rpc communication. This is usually called when the cluster exits with exception.
58   void Abort();
59 
60  private:
61   /**
62    * @description: Update reference counts of rpc actors's inputs and workspaces.
63    *               Because the memory of inputs and workspaces should not be released by the framework until rpc module
64    *               done sending or receiving.
65    * @param {RpcActorSetPtr} rpc_actor_set: The rpc actors set.
66    * @return {void}
67    */
68   void UpdateRpcActorRefCounts(RpcActorSetPtr rpc_actor_set) const;
69 
70   // Create new route table proxy.
71   ActorRouteTableProxyPtr CreateRouteTableProxy() const;
72 
73   OpContext<DeviceTensor> *op_context_;
74 
75   RpcActorSetPtr rpc_actors_;
76 };
77 
78 // The setter of op context for rpc actors.
79 class RpcActorOpContextSetter {
80  public:
RpcActorOpContextSetter(RpcNodeScheduler * rpc_node_scheduler,const RpcActorSetPtr & rpc_actors,OpContext<DeviceTensor> * const op_context)81   explicit RpcActorOpContextSetter(RpcNodeScheduler *rpc_node_scheduler, const RpcActorSetPtr &rpc_actors,
82                                    OpContext<DeviceTensor> *const op_context)
83       : rpc_node_scheduler_(rpc_node_scheduler), rpc_actors_(rpc_actors), op_context_(op_context) {
84     rpc_node_scheduler_->SetOpcontext(rpc_actors_, op_context_);
85   }
~RpcActorOpContextSetter()86   ~RpcActorOpContextSetter() {
87     try {
88       rpc_node_scheduler_->ResetOpcontext(rpc_actors_);
89     } catch (const std::exception &) {
90       MS_LOG(ERROR) << "Failed to reset op context.";
91     }
92   }
93 
94  private:
95   RpcNodeScheduler *rpc_node_scheduler_;
96   RpcActorSetPtr rpc_actors_;
97   OpContext<DeviceTensor> *op_context_;
98 };
99 
100 // This class is used to refresh the state of the rpc actor. For example, the mux recv actor receives requests for
101 // the service process. Currently, the requests are processed serially. Before each request (that is, the execution of
102 // an actor dag) begins, the state of the Recv actor needs to be refreshed. Make it in the ready state to continue with
103 // the next request.
104 class RpcActorStatusUpdater {
105  public:
106   static RpcActorStatusUpdater &GetInstance();
107 
108   // Set rpc actors which need to be update status.
109   void set_rpc_actors(const std::string &graph_name, const RpcActorSetPtr &rpc_actors);
110 
111   // Update rpc actors' status.
112   void UpdateRpcActorStatus(const std::string &graph_name);
113 
114   // Sent data should be flushed after each step.
115   void FlushRpcData(const std::string &graph_name);
116 
117  private:
118   RpcActorStatusUpdater() = default;
119   ~RpcActorStatusUpdater() = default;
120   DISABLE_COPY_AND_ASSIGN(RpcActorStatusUpdater);
121 
122   // Record graph's rpc actors which need to update status.
123   std::map<std::string, RpcActorSetWeakPtr> graph_to_rpc_actors_;
124 };
125 }  // namespace runtime
126 }  // namespace mindspore
127 
128 #endif  // MINDSPORE_CCSRC_RUNTIME_GRAPH_SCHEDULER_RPC_NODE_SCHEDULER_H_
129