1 /** 2 * Copyright 2022 Huawei Technologies Co., Ltd 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #ifndef MINDSPORE_CCSRC_RUNTIME_GRAPH_SCHEDULER_RPC_NODE_SCHEDULER_H_ 18 #define MINDSPORE_CCSRC_RUNTIME_GRAPH_SCHEDULER_RPC_NODE_SCHEDULER_H_ 19 20 #include <map> 21 #include <string> 22 #include <memory> 23 #include "runtime/graph_scheduler/actor/actor_set.h" 24 #include "runtime/graph_scheduler/graph_compiler.h" 25 26 namespace mindspore { 27 namespace runtime { 28 using mindspore::device::DeviceContext; 29 using mindspore::session::KernelGraph; 30 using mindspore::session::KernelWithIndex; 31 32 // Scheduler for rpc actors, e.g., it adds inter-process arrows, generate router for actors, etc. 33 class RpcNodeScheduler { 34 public: RpcNodeScheduler()35 RpcNodeScheduler() : op_context_(nullptr), rpc_actors_(nullptr) {} 36 ~RpcNodeScheduler() = default; 37 38 // Build rpc actors and return rpc actor set. 39 RpcActorSetPtr Build(const ActorSet *actor_set); 40 41 // Link rpc actors with inter-process arrows. 42 void Link(const ActorSet *actor_set) const; 43 44 // This should be called by 'GraphScheduler::Scheduler()' method. 45 // Used to start servers for recv actors and create connections for send actors. 46 void Schedule(const ActorSet *actor_set) const; 47 48 // Set op context to rpc actors. 49 void SetOpcontext(const RpcActorSetPtr &rpc_actors, OpContext<DeviceTensor> *const op_context); 50 51 // Reset op context for rpc actors. 52 void ResetOpcontext(const RpcActorSetPtr &rpc_actors); 53 54 // Clear resource of rpc actors. Finalize tcp clients/servers. 55 void Clear(); 56 57 // Abort rpc communication. This is usually called when the cluster exits with exception. 58 void Abort(); 59 60 private: 61 /** 62 * @description: Update reference counts of rpc actors's inputs and workspaces. 63 * Because the memory of inputs and workspaces should not be released by the framework until rpc module 64 * done sending or receiving. 65 * @param {RpcActorSetPtr} rpc_actor_set: The rpc actors set. 66 * @return {void} 67 */ 68 void UpdateRpcActorRefCounts(RpcActorSetPtr rpc_actor_set) const; 69 70 // Create new route table proxy. 71 ActorRouteTableProxyPtr CreateRouteTableProxy() const; 72 73 OpContext<DeviceTensor> *op_context_; 74 75 RpcActorSetPtr rpc_actors_; 76 }; 77 78 // The setter of op context for rpc actors. 79 class RpcActorOpContextSetter { 80 public: RpcActorOpContextSetter(RpcNodeScheduler * rpc_node_scheduler,const RpcActorSetPtr & rpc_actors,OpContext<DeviceTensor> * const op_context)81 explicit RpcActorOpContextSetter(RpcNodeScheduler *rpc_node_scheduler, const RpcActorSetPtr &rpc_actors, 82 OpContext<DeviceTensor> *const op_context) 83 : rpc_node_scheduler_(rpc_node_scheduler), rpc_actors_(rpc_actors), op_context_(op_context) { 84 rpc_node_scheduler_->SetOpcontext(rpc_actors_, op_context_); 85 } ~RpcActorOpContextSetter()86 ~RpcActorOpContextSetter() { 87 try { 88 rpc_node_scheduler_->ResetOpcontext(rpc_actors_); 89 } catch (const std::exception &) { 90 MS_LOG(ERROR) << "Failed to reset op context."; 91 } 92 } 93 94 private: 95 RpcNodeScheduler *rpc_node_scheduler_; 96 RpcActorSetPtr rpc_actors_; 97 OpContext<DeviceTensor> *op_context_; 98 }; 99 100 // This class is used to refresh the state of the rpc actor. For example, the mux recv actor receives requests for 101 // the service process. Currently, the requests are processed serially. Before each request (that is, the execution of 102 // an actor dag) begins, the state of the Recv actor needs to be refreshed. Make it in the ready state to continue with 103 // the next request. 104 class RpcActorStatusUpdater { 105 public: 106 static RpcActorStatusUpdater &GetInstance(); 107 108 // Set rpc actors which need to be update status. 109 void set_rpc_actors(const std::string &graph_name, const RpcActorSetPtr &rpc_actors); 110 111 // Update rpc actors' status. 112 void UpdateRpcActorStatus(const std::string &graph_name); 113 114 // Sent data should be flushed after each step. 115 void FlushRpcData(const std::string &graph_name); 116 117 private: 118 RpcActorStatusUpdater() = default; 119 ~RpcActorStatusUpdater() = default; 120 DISABLE_COPY_AND_ASSIGN(RpcActorStatusUpdater); 121 122 // Record graph's rpc actors which need to update status. 123 std::map<std::string, RpcActorSetWeakPtr> graph_to_rpc_actors_; 124 }; 125 } // namespace runtime 126 } // namespace mindspore 127 128 #endif // MINDSPORE_CCSRC_RUNTIME_GRAPH_SCHEDULER_RPC_NODE_SCHEDULER_H_ 129