• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2022 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef MINDSPORE_CCSRC_RUNTIME_FRAMEWORK_ACTO_RPC_RPC_ACTOR_H_
18 #define MINDSPORE_CCSRC_RUNTIME_FRAMEWORK_ACTO_RPC_RPC_ACTOR_H_
19 
20 #include <set>
21 #include <vector>
22 #include <string>
23 #include <memory>
24 #include <utility>
25 #include "runtime/graph_scheduler/actor/kernel_actor.h"
26 #include "include/backend/distributed/cluster/cluster_context.h"
27 #include "distributed/cluster/actor_route_table_proxy.h"
28 #include "include/backend/distributed/rpc/tcp/tcp_client.h"
29 #include "include/backend/distributed/rpc/tcp/tcp_server.h"
30 #ifdef ENABLE_RDMA
31 #include "include/backend/distributed/rpc/rdma/rdma_client.h"
32 #include "include/backend/distributed/rpc/rdma/rdma_server.h"
33 #endif
34 #include "proto/rpc.pb.h"
35 #include "proto/topology.pb.h"
36 
37 namespace mindspore {
38 namespace runtime {
39 using distributed::kEnableRDMA;
40 using distributed::kMaxRetryPortNum;
41 using distributed::kRDMADevName;
42 using distributed::kRDMAIP;
43 using distributed::kRemoteFuncId;
44 using distributed::cluster::ActorRouteTableProxy;
45 using distributed::cluster::ActorRouteTableProxyPtr;
46 using distributed::cluster::ClusterContext;
47 using distributed::cluster::topology::ActorAddress;
48 using distributed::rpc::RPCClientBase;
49 using distributed::rpc::RPCServerBase;
50 using distributed::rpc::TCPClient;
51 using distributed::rpc::TCPServer;
52 using mindspore::device::KernelInfo;
53 #ifdef ENABLE_RDMA
54 using distributed::rpc::kURPCInited;
55 using distributed::rpc::RDMAClient;
56 using distributed::rpc::RDMAServer;
57 #endif
58 
// Separator placed between the source and destination node names of an inter-process edge.
constexpr char kInterProcessEdgeMark[] = "->";

// Magic header of rpc data which indicates that the message contains dynamic shape data.
constexpr char kRpcDynamicShapeData[] = "RPC_DYNAMIC_SHAPE_DATA";

// RpcDataPtr is the raw pointer type used when serializing and deserializing rpc message data.
using RpcDataPtr = char *;
67 
68 // RpcActor is used to do rpc with other processes in distributed execution.
69 // Besides data arrows and controlling arrows, RpcActor also has inter-process arrows which is in charge of remote
70 // communication with other processes. It supports both sync and async communication.
71 class RpcActor : public KernelActor {
72  public:
RpcActor(const std::string & name,const CNodePtr & kernel,const DeviceContext * device_context,const AID & memory_manager_aid,const AID * debug_aid,const AID * recorder_aid,GraphExecutionStrategy strategy,const std::set<size_t> & modifiable_ref_input_indexes,const std::set<size_t> & modifiable_ref_output_indexes,const KernelTransformType & type)73   explicit RpcActor(const std::string &name, const CNodePtr &kernel, const DeviceContext *device_context,
74                     const AID &memory_manager_aid, const AID *debug_aid, const AID *recorder_aid,
75                     GraphExecutionStrategy strategy, const std::set<size_t> &modifiable_ref_input_indexes,
76                     const std::set<size_t> &modifiable_ref_output_indexes, const KernelTransformType &type)
77       : KernelActor(name, kernel, device_context, memory_manager_aid, debug_aid, recorder_aid, strategy,
78                     modifiable_ref_input_indexes, modifiable_ref_output_indexes, type),
79         op_context_(nullptr),
80         input_inter_process_num_(0),
81         is_exception_thrown_(false) {}
82   ~RpcActor() override = default;
83 
84   // Normally, an actor's op_context is passed by its input actor, but rpc actors could be triggered by inter-process
85   // arrows which do not contain op_context. So we need to set op_context manually.
86   virtual void SetOpcontext(OpContext<DeviceTensor> *const op_context);
87 
88   // Reset op context. Because op context is recreated for each each sinked loop, this method should be called after
89   // each sinked loop is done in case rpc actors visit the invalid op context.
ResetOpcontext()90   virtual void ResetOpcontext() {}
91 
92   // Update rpc actor status, for example update ready status of recv actors.
UpdateStatus()93   virtual void UpdateStatus() {}
94 
95   // Flush data for rpc actor, for example flush sent data for send actors.
FlushData()96   virtual void FlushData() {}
97 
98   // Set the actor route proxy for rpc actors.
99   void set_actor_route_table_proxy(const ActorRouteTableProxyPtr &proxy);
100 
101   // Set the inter-process edge names for rpc actor.
102   void set_inter_process_edge_names(const std::vector<std::string> &edge_name);
103 
104   // Set some info which will be used for rpc routing.
SetRouteInfo(uint32_t peer_rank,const std::string & peer_role,const std::string & src_node_name,const std::string & dst_node_name)105   virtual void SetRouteInfo(uint32_t peer_rank, const std::string &peer_role, const std::string &src_node_name,
106                             const std::string &dst_node_name) {}
107 
108   // Clear resource of rpc actor.
Clear()109   virtual void Clear() {}
110 
111   /**
112    * @description: Stop rpc communication to avoid dead lock after exception is thrown.
113    * @return {void}
114    */
StopRpcAtException()115   virtual void StopRpcAtException() {}
116 
117  protected:
118   /**
119    * @description: Copy rpc data with size and update the input data's address with offset.
120    * @param {RpcDataPtr} *rpc_data: Destination data address which will be updated in this method.
121    * @param {void} *src_data: Source data address.
122    * @param {size_t} src_data_size: Source data size and the offset.
123    * @return {bool}: Whether data is successfully copied.
124    */
125   bool CopyRpcDataWithOffset(RpcDataPtr *rpc_data, const void *src_data, size_t src_data_size) const;
126 
127   // The op context to run rpc actor inter-process op. Set by method 'SetOpcontext'.
128   OpContext<DeviceTensor> *op_context_;
129 
130   // The inter-process edge names. It is also used as the actor id for route. Each edeg name is a string consists of
131   // source node name and destination node name. The format is "source node name"->"destination node name". For each
132   // inter-process edge, this is unique. Rpc actor with the same inter process edge name should not be in the same
133   // process.
134   std::vector<std::string> inter_process_edge_names_;
135 
136   // The node name of rpc actor's peers. They are not the name of send or recv nodes. Instead, they are the names of the
137   // nodes which use send node as output and recv node as input.
138   std::vector<std::string> rpc_input_node_name_;
139   std::vector<std::string> rpc_output_node_name_;
140 
141   // The iter-process inputs number. This should be the same as size of vector rpc_input_node_name_.
142   size_t input_inter_process_num_;
143 
144   // The inter-process inputs of each sequential number.
145   mindspore::HashMap<int, std::vector<std::string>> input_op_inter_process_;
146 
147   // The arrows represent inter-process communication.
148   std::vector<AID> inter_process_input_arrows_;
149   std::vector<AID> inter_process_output_arrows_;
150 
151   ActorRouteTableProxyPtr actor_route_table_proxy_;
152 
153   // The flag of whether any exception is thrown.
154   bool is_exception_thrown_;
155 
156  private:
157   friend class GraphScheduler;
158 };
159 
160 using RpcActorPtr = std::shared_ptr<RpcActor>;
161 }  // namespace runtime
162 }  // namespace mindspore
163 
164 #endif  // MINDSPORE_CCSRC_RUNTIME_FRAMEWORK_ACTO_RPC_RPC_ACTOR_H_
165