• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2022 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef MINDSPORE_CCSRC_RUNTIME_FRAMEWORK_ACTOR_RPC_MUX_RECV_ACTOR_H_
18 #define MINDSPORE_CCSRC_RUNTIME_FRAMEWORK_ACTOR_RPC_MUX_RECV_ACTOR_H_
19 
20 #include <string>
21 #include <memory>
22 #include <set>
23 
24 #include "runtime/graph_scheduler/actor/rpc/recv_actor.h"
25 
26 namespace mindspore {
27 namespace runtime {
28 // MuxRecvActor inherits from RecvActor and it's used to receive data from other processes.
29 // MuxRecvActor(Multiplexed Recv Actor) can receive data from different processes, for example, when responding to
30 // requests as a service, it could receive requests from different processes and process them serially.
31 class MuxRecvActor : public RecvActor {
32  public:
MuxRecvActor(const std::string & name,const CNodePtr & kernel,const DeviceContext * device_context,const AID & memory_manager_aid,const AID * debug_aid,const AID * recorder_aid,GraphExecutionStrategy strategy,const std::set<size_t> & modifiable_ref_input_indexes,const std::set<size_t> & modifiable_ref_output_indexes)33   explicit MuxRecvActor(const std::string &name, const CNodePtr &kernel, const DeviceContext *device_context,
34                         const AID &memory_manager_aid, const AID *debug_aid, const AID *recorder_aid,
35                         GraphExecutionStrategy strategy, const std::set<size_t> &modifiable_ref_input_indexes,
36                         const std::set<size_t> &modifiable_ref_output_indexes)
37       : RecvActor(name, kernel, device_context, memory_manager_aid, debug_aid, recorder_aid, strategy,
38                   modifiable_ref_input_indexes, modifiable_ref_output_indexes) {}
39   ~MuxRecvActor() override = default;
40 
41   // Get the from actor aid of received message.
from_actor_aid()42   const AID &from_actor_aid() const { return from_actor_aid_; }
43 
44   // Clear resource of mux recv actor.
45   void Clear() override;
46 
47   // Stop mux recv actor gracefully.
48   void Finalize() override;
49 
50   // Stop rpc communication to avoid dead lock after exception is thrown.
51   void StopRpcAtException() override;
52 
53  private:
54   // Set the message handler of the server.
55   void SetMessageHandler() override;
56 
57   // The message callback of the tcp server.
58   MessageBase *HandleMessage(MessageBase *const msg);
59 
60   // Parse finalize command message from received message.
61   void ParseFinalizeReqData(size_t data_len, const MessageBase *const msg, bool *need_finalize) override;
62 
63   // Record the from actor aid when receive a message;
64   AID from_actor_aid_;
65 
66   // Whether the actor is finalized_
67   std::atomic_bool finalized_{false};
68 
69   uint32_t recv_finalize_msg_cnt_{0};
70 };
71 
72 using MuxRecvActorPtr = std::shared_ptr<MuxRecvActor>;
73 }  // namespace runtime
74 }  // namespace mindspore
75 
76 #endif  // MINDSPORE_CCSRC_RUNTIME_FRAMEWORK_ACTOR_RPC_MUX_RECV_ACTOR_H_
77