• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2022 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef MINDSPORE_CCSRC_DISTRIBUTED_CLUSTER_TOPOLOGY_COMPUTE_GRAPH_NODE_H_
18 #define MINDSPORE_CCSRC_DISTRIBUTED_CLUSTER_TOPOLOGY_COMPUTE_GRAPH_NODE_H_
19 
#include <atomic>
#include <cstdint>
#include <functional>
#include <map>
#include <memory>
#include <shared_mutex>
#include <string>
#include <thread>
#include <vector>
#include "include/backend/distributed/cluster/topology/common.h"
#include "include/backend/distributed/rpc/tcp/tcp_client.h"
#include "include/backend/distributed/cluster/topology/node_base.h"
29 
30 namespace mindspore {
31 namespace distributed {
32 namespace cluster {
33 namespace topology {
34 // The ComputeGraphNode is a separate process representing a sub-graph of the distributed computation graph.
35 class BACKEND_EXPORT ComputeGraphNode : public NodeBase {
36  public:
ComputeGraphNode(const std::string & node_id,const std::string & role)37   ComputeGraphNode(const std::string &node_id, const std::string &role)
38       : NodeBase(node_id, role), client_ip_(""), authenticated_(false), enable_hb_(false) {}
39   ~ComputeGraphNode() override;
40 
41   bool Initialize() override;
42   bool Initialized() override;
43 
44   bool Finalize(bool force = false) override;
45 
46   // Send the specified message to the meta server node.
47   bool SendMessageToMSN(const std::string msg_name, const std::string &msg_body, bool sync = true);
48 
49   // Query the specified message from the meta server node according to the given message name.
50   // Returns nullptr if no message returned after timeout.
51   std::shared_ptr<std::string> RetrieveMessageFromMSN(const std::string &msg_name, uint32_t timeout = 5);
52 
53   // Write and read user defined metadata to the meta server node.
54   bool PutMetadata(const std::string &name, const std::string &value, bool sync = true);
55   bool PutMetadata(const std::string &name, const void *value, const size_t &size);
56 
57   std::string GetMetadata(const std::string &name, uint32_t timeout = 5);
58 
59   bool DeleteMetadata(const std::string &name, uint32_t timeout = 5);
60 
61   // Exchange metadata(name:value) between all the compute graph nodes.
62   // The transaction of the exchange process is guaranteed.
63   bool ExchangeMetadata(const std::string &biz, const size_t &rank_size, const std::vector<std::string> &names_prefix,
64                         const std::vector<std::string> &values, std::map<std::string, std::string> *results,
65                         uint32_t timeout = 90);
66 
67   // Get all the hostnames of compute graph nodes.
68   std::vector<std::string> GetHostNames(const std::string &role);
69 
70   void set_abnormal_callback(std::shared_ptr<std::function<void(void)>> abnormal_callback) override;
71 
72   // Return client ip of this cgn which is used for cluster building.
73   const std::string &client_ip() const;
74 
75  private:
76   // Send the register message to the meta server node when this node process startup.
77   bool Register();
78 
79   // Send the unregister message to the meta server node.
80   bool Unregister();
81 
82   // Send the heartbeat message to the meta server node.
83   bool Heartbeat();
84 
85   // Call the `Reconnect` function if the input func execution failed.
86   bool ReconnectIfNeeded(const std::function<bool(void)> &func, const std::string &error, size_t retry);
87   bool ReconnectWithTimeoutWindow(const std::function<bool(void)> &func, const std::string &error, size_t time_out);
88 
89   // Reconnect to the meta server node.
90   bool Reconnect();
91 
92   std::shared_ptr<std::string> RetrieveMessageFromMSN(const std::string &msg_name, const std::string &msg_body,
93                                                       uint32_t timeout = 5);
94 
95   // The meta server address used to synchronize metadata with other compute graph nodes.
96   MetaServerAddress meta_server_addr_;
97 
98   // The TCP client is used to send messages to meta server node.
99   std::unique_ptr<rpc::TCPClient> tcp_client_;
100 
101   // The TCP client used to send heartbeat to meta server.
102   std::unique_ptr<rpc::TCPClient> hb_client_;
103 
104   // Tcp client ip address of this cgn.
105   std::string client_ip_;
106 
107   // Incidate whether this node is authenticated by meta server node.
108   std::atomic<bool> authenticated_;
109 
110   // The heartbeat thread from compute graph node to meta server node.
111   std::thread heartbeat_;
112 
113   // Indicate whether the heartbeat thread is running.
114   bool enable_hb_;
115 
116   std::shared_ptr<std::function<void(void)>> abnormal_callback_;
117 
118   mutable std::shared_mutex exchange_meta_mutex_;
119 };
120 }  // namespace topology
121 }  // namespace cluster
122 }  // namespace distributed
123 }  // namespace mindspore
124 #endif  // MINDSPORE_CCSRC_DISTRIBUTED_CLUSTER_TOPOLOGY_COMPUTE_GRAPH_NODE_H_
125