1 /** 2 * Copyright 2022 Huawei Technologies Co., Ltd 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #ifndef MINDSPORE_CCSRC_RUNTIME_FRAMEWORK_ACTOR_RPC_MUX_RECV_ACTOR_H_ 18 #define MINDSPORE_CCSRC_RUNTIME_FRAMEWORK_ACTOR_RPC_MUX_RECV_ACTOR_H_ 19 20 #include <string> 21 #include <memory> 22 #include <set> 23 24 #include "runtime/graph_scheduler/actor/rpc/recv_actor.h" 25 26 namespace mindspore { 27 namespace runtime { 28 // MuxRecvActor inherits from RecvActor and it's used to receive data from other processes. 29 // MuxRecvActor(Multiplexed Recv Actor) can receive data from different processes, for example, when responding to 30 // requests as a service, it could receive requests from different processes and process them serially. 31 class MuxRecvActor : public RecvActor { 32 public: MuxRecvActor(const std::string & name,const CNodePtr & kernel,const DeviceContext * device_context,const AID & memory_manager_aid,const AID * debug_aid,const AID * recorder_aid,GraphExecutionStrategy strategy,const std::set<size_t> & modifiable_ref_input_indexes,const std::set<size_t> & modifiable_ref_output_indexes)33 explicit MuxRecvActor(const std::string &name, const CNodePtr &kernel, const DeviceContext *device_context, 34 const AID &memory_manager_aid, const AID *debug_aid, const AID *recorder_aid, 35 GraphExecutionStrategy strategy, const std::set<size_t> &modifiable_ref_input_indexes, 36 const std::set<size_t> &modifiable_ref_output_indexes) 37 : RecvActor(name, kernel, device_context, memory_manager_aid, debug_aid, recorder_aid, strategy, 38 modifiable_ref_input_indexes, modifiable_ref_output_indexes) {} 39 ~MuxRecvActor() override = default; 40 41 // Get the from actor aid of received message. from_actor_aid()42 const AID &from_actor_aid() const { return from_actor_aid_; } 43 44 // Clear resource of mux recv actor. 45 void Clear() override; 46 47 // Stop mux recv actor gracefully. 48 void Finalize() override; 49 50 // Stop rpc communication to avoid dead lock after exception is thrown. 51 void StopRpcAtException() override; 52 53 private: 54 // Set the message handler of the server. 55 void SetMessageHandler() override; 56 57 // The message callback of the tcp server. 58 MessageBase *HandleMessage(MessageBase *const msg); 59 60 // Parse finalize command message from received message. 61 void ParseFinalizeReqData(size_t data_len, const MessageBase *const msg, bool *need_finalize) override; 62 63 // Record the from actor aid when receive a message; 64 AID from_actor_aid_; 65 66 // Whether the actor is finalized_ 67 std::atomic_bool finalized_{false}; 68 69 uint32_t recv_finalize_msg_cnt_{0}; 70 }; 71 72 using MuxRecvActorPtr = std::shared_ptr<MuxRecvActor>; 73 } // namespace runtime 74 } // namespace mindspore 75 76 #endif // MINDSPORE_CCSRC_RUNTIME_FRAMEWORK_ACTOR_RPC_MUX_RECV_ACTOR_H_ 77