1 /** 2 * Copyright 2022 Huawei Technologies Co., Ltd 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #ifndef MINDSPORE_CCSRC_RUNTIME_FRAMEWORK_ACTO_RPC_RPC_ACTOR_H_ 18 #define MINDSPORE_CCSRC_RUNTIME_FRAMEWORK_ACTO_RPC_RPC_ACTOR_H_ 19 20 #include <set> 21 #include <vector> 22 #include <string> 23 #include <memory> 24 #include <utility> 25 #include "runtime/graph_scheduler/actor/kernel_actor.h" 26 #include "include/backend/distributed/cluster/cluster_context.h" 27 #include "distributed/cluster/actor_route_table_proxy.h" 28 #include "include/backend/distributed/rpc/tcp/tcp_client.h" 29 #include "include/backend/distributed/rpc/tcp/tcp_server.h" 30 #ifdef ENABLE_RDMA 31 #include "include/backend/distributed/rpc/rdma/rdma_client.h" 32 #include "include/backend/distributed/rpc/rdma/rdma_server.h" 33 #endif 34 #include "proto/rpc.pb.h" 35 #include "proto/topology.pb.h" 36 37 namespace mindspore { 38 namespace runtime { 39 using distributed::kEnableRDMA; 40 using distributed::kMaxRetryPortNum; 41 using distributed::kRDMADevName; 42 using distributed::kRDMAIP; 43 using distributed::kRemoteFuncId; 44 using distributed::cluster::ActorRouteTableProxy; 45 using distributed::cluster::ActorRouteTableProxyPtr; 46 using distributed::cluster::ClusterContext; 47 using distributed::cluster::topology::ActorAddress; 48 using distributed::rpc::RPCClientBase; 49 using distributed::rpc::RPCServerBase; 50 using distributed::rpc::TCPClient; 51 using distributed::rpc::TCPServer; 52 using mindspore::device::KernelInfo; 53 #ifdef ENABLE_RDMA 54 using distributed::rpc::kURPCInited; 55 using distributed::rpc::RDMAClient; 56 using distributed::rpc::RDMAServer; 57 #endif 58 59 // The inter-process edge mark between two nodes. 60 constexpr char kInterProcessEdgeMark[] = "->"; 61 62 // The magic header of the rpc data which indicates this message contains dynamic shape data. 63 constexpr char kRpcDynamicShapeData[] = "RPC_DYNAMIC_SHAPE_DATA"; 64 65 // RpcDataPtr will be used for serializing and deserializing rpc message raw pointer data. 66 using RpcDataPtr = char *; 67 68 // RpcActor is used to do rpc with other processes in distributed execution. 69 // Besides data arrows and controlling arrows, RpcActor also has inter-process arrows which is in charge of remote 70 // communication with other processes. It supports both sync and async communication. 71 class RpcActor : public KernelActor { 72 public: RpcActor(const std::string & name,const CNodePtr & kernel,const DeviceContext * device_context,const AID & memory_manager_aid,const AID * debug_aid,const AID * recorder_aid,GraphExecutionStrategy strategy,const std::set<size_t> & modifiable_ref_input_indexes,const std::set<size_t> & modifiable_ref_output_indexes,const KernelTransformType & type)73 explicit RpcActor(const std::string &name, const CNodePtr &kernel, const DeviceContext *device_context, 74 const AID &memory_manager_aid, const AID *debug_aid, const AID *recorder_aid, 75 GraphExecutionStrategy strategy, const std::set<size_t> &modifiable_ref_input_indexes, 76 const std::set<size_t> &modifiable_ref_output_indexes, const KernelTransformType &type) 77 : KernelActor(name, kernel, device_context, memory_manager_aid, debug_aid, recorder_aid, strategy, 78 modifiable_ref_input_indexes, modifiable_ref_output_indexes, type), 79 op_context_(nullptr), 80 input_inter_process_num_(0), 81 is_exception_thrown_(false) {} 82 ~RpcActor() override = default; 83 84 // Normally, an actor's op_context is passed by its input actor, but rpc actors could be triggered by inter-process 85 // arrows which do not contain op_context. So we need to set op_context manually. 86 virtual void SetOpcontext(OpContext<DeviceTensor> *const op_context); 87 88 // Reset op context. Because op context is recreated for each each sinked loop, this method should be called after 89 // each sinked loop is done in case rpc actors visit the invalid op context. ResetOpcontext()90 virtual void ResetOpcontext() {} 91 92 // Update rpc actor status, for example update ready status of recv actors. UpdateStatus()93 virtual void UpdateStatus() {} 94 95 // Flush data for rpc actor, for example flush sent data for send actors. FlushData()96 virtual void FlushData() {} 97 98 // Set the actor route proxy for rpc actors. 99 void set_actor_route_table_proxy(const ActorRouteTableProxyPtr &proxy); 100 101 // Set the inter-process edge names for rpc actor. 102 void set_inter_process_edge_names(const std::vector<std::string> &edge_name); 103 104 // Set some info which will be used for rpc routing. SetRouteInfo(uint32_t peer_rank,const std::string & peer_role,const std::string & src_node_name,const std::string & dst_node_name)105 virtual void SetRouteInfo(uint32_t peer_rank, const std::string &peer_role, const std::string &src_node_name, 106 const std::string &dst_node_name) {} 107 108 // Clear resource of rpc actor. Clear()109 virtual void Clear() {} 110 111 /** 112 * @description: Stop rpc communication to avoid dead lock after exception is thrown. 113 * @return {void} 114 */ StopRpcAtException()115 virtual void StopRpcAtException() {} 116 117 protected: 118 /** 119 * @description: Copy rpc data with size and update the input data's address with offset. 120 * @param {RpcDataPtr} *rpc_data: Destination data address which will be updated in this method. 121 * @param {void} *src_data: Source data address. 122 * @param {size_t} src_data_size: Source data size and the offset. 123 * @return {bool}: Whether data is successfully copied. 124 */ 125 bool CopyRpcDataWithOffset(RpcDataPtr *rpc_data, const void *src_data, size_t src_data_size) const; 126 127 // The op context to run rpc actor inter-process op. Set by method 'SetOpcontext'. 128 OpContext<DeviceTensor> *op_context_; 129 130 // The inter-process edge names. It is also used as the actor id for route. Each edeg name is a string consists of 131 // source node name and destination node name. The format is "source node name"->"destination node name". For each 132 // inter-process edge, this is unique. Rpc actor with the same inter process edge name should not be in the same 133 // process. 134 std::vector<std::string> inter_process_edge_names_; 135 136 // The node name of rpc actor's peers. They are not the name of send or recv nodes. Instead, they are the names of the 137 // nodes which use send node as output and recv node as input. 138 std::vector<std::string> rpc_input_node_name_; 139 std::vector<std::string> rpc_output_node_name_; 140 141 // The iter-process inputs number. This should be the same as size of vector rpc_input_node_name_. 142 size_t input_inter_process_num_; 143 144 // The inter-process inputs of each sequential number. 145 mindspore::HashMap<int, std::vector<std::string>> input_op_inter_process_; 146 147 // The arrows represent inter-process communication. 148 std::vector<AID> inter_process_input_arrows_; 149 std::vector<AID> inter_process_output_arrows_; 150 151 ActorRouteTableProxyPtr actor_route_table_proxy_; 152 153 // The flag of whether any exception is thrown. 154 bool is_exception_thrown_; 155 156 private: 157 friend class GraphScheduler; 158 }; 159 160 using RpcActorPtr = std::shared_ptr<RpcActor>; 161 } // namespace runtime 162 } // namespace mindspore 163 164 #endif // MINDSPORE_CCSRC_RUNTIME_FRAMEWORK_ACTO_RPC_RPC_ACTOR_H_ 165