• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2022 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef MINDSPORE_CCSRC_DISTRIBUTED_CLUSTER_TOPOLOGY_META_SERVER_NODE_H_
18 #define MINDSPORE_CCSRC_DISTRIBUTED_CLUSTER_TOPOLOGY_META_SERVER_NODE_H_
19 
#include <arpa/inet.h>
#include <atomic>
#include <cstdint>
#include <ctime>
#include <functional>
#include <map>
#include <memory>
#include <shared_mutex>
#include <sstream>
#include <string>
#include <thread>
#include <unordered_map>
#include "include/backend/distributed/rpc/tcp/tcp_server.h"
#include "distributed/recovery/configuration.h"
#include "include/backend/distributed/cluster/topology/node_base.h"
29 
30 namespace mindspore {
31 namespace distributed {
32 namespace cluster {
33 namespace topology {
// Indicates the lifecycle state of a compute graph node as tracked by the meta server.
enum class NodeState {
  // This node is newly created and unauthenticated.
  kNew = 0,

  // This node has finished registration from meta server.
  kRegistered,

  // This node has finished unregistration from meta server.
  kUnregistered,

  // This node has timed out because there's no heartbeat message after `kNodeTimeout`.
  kTimeout
};
48 
49 // Record the state of the compute graph node.
50 struct NodeInfo {
NodeInfoNodeInfo51   explicit NodeInfo(const std::string &id) { node_id = id; }
52   std::string node_id;
53 
54   // The local host name of this cluster node.
55   std::string host_name;
56 
57   // The host ip of this node in the cluster. Nodes use this address to create network communication with each other.
58   std::string host_ip;
59 
60   // The role name of this cluster node.
61   std::string role;
62 
63   // The rank id of this cluster node(only for compute graph node).
64   uint32_t rank_id{0};
65 
66   // The timestamp of last heartbeat.
67   // This timestamp is considered the health state of the node.
68   time_t last_update{0};
69 
70   // Maintain the state of the node.
71   NodeState state{NodeState::kNew};
72 };
73 
// Formats `i` as a lowercase hexadecimal string, left-padded with '0' up to `width` characters.
// Negative values are rendered via their unsigned (two's-complement) representation, which matches
// streaming an int with std::hex. If the hex form is longer than `width`, it is returned unpadded.
inline std::string Dec2Hex(int i, uint32_t width) {
  static const char kHexDigits[] = "0123456789abcdef";
  // Direct digit extraction instead of std::stringstream: this header never included <sstream>,
  // and a stream round-trip is needless overhead for a conversion of at most 8 digits.
  auto value = static_cast<unsigned int>(i);
  std::string temp;
  do {
    temp.insert(temp.begin(), kHexDigits[value & 0xFu]);
    value >>= 4u;
  } while (value != 0);
  if (width > temp.size()) {
    return std::string((width - temp.size()), '0') + temp;
  }
  return temp;
}
84 
GenerateIpInOrder(const std::string & ip)85 inline std::string GenerateIpInOrder(const std::string &ip) {
86   rpc::SocketAddress addr;
87   std::string ordered_ip = "";
88   uint32_t dec_2_hex_width = 2;
89   int result = inet_pton(AF_INET, ip.c_str(), &addr.saIn.sin_addr);
90   if (result > 0) {
91     for (size_t i = 0; i < sizeof(addr.saIn.sin_addr.s_addr) / sizeof(unsigned char); i++) {
92       ordered_ip += Dec2Hex(*(reinterpret_cast<unsigned char *>(&addr.saIn.sin_addr.s_addr) + i), dec_2_hex_width);
93     }
94     return ordered_ip;
95   }
96 
97   result = inet_pton(AF_INET6, ip.c_str(), &addr.saIn6.sin6_addr);
98   if (result > 0) {
99     size_t ipv6_len = 16;
100     for (size_t i = 0; i < ipv6_len; i++) {
101       ordered_ip += Dec2Hex(addr.saIn6.sin6_addr.s6_addr[i], dec_2_hex_width);
102     }
103     return ordered_ip;
104   }
105 
106   MS_LOG(EXCEPTION) << "Parse ip failed, result: " << result << ", ip:" << ip;
107 }
108 
109 // The key of nodes consists of node's ip and id.
110 // This is used for sorting nodes and assign global rank ids.
111 struct NodeKey {
112   std::string host_ip;
113   std::string node_id;
114 
115   bool operator<(const NodeKey &node_key) const {
116     auto this_host_ordered_ip = GenerateIpInOrder(host_ip);
117     auto host_ordered_ip = GenerateIpInOrder(node_key.host_ip);
118     if (this_host_ordered_ip < host_ordered_ip) {
119       return true;
120     } else if (this_host_ordered_ip > host_ordered_ip) {
121       return false;
122     } else {
123       if (node_id < node_key.node_id) {
124         return true;
125       } else {
126         return false;
127       }
128     }
129   }
130   bool operator==(const NodeKey &node_key) const {
131     return (node_id == node_key.node_id) && (host_ip == node_key.host_ip);
132   }
133 };
134 
// The MetaServerNode is a separate process representing the meta server node which stores all the metadata and status
// of computation graph nodes.
class MetaServerNode : public NodeBase {
 public:
  // NOTE(review): `node_timeout` is accepted but neither stored nor forwarded to NodeBase in this
  // initializer — confirm it is consumed elsewhere and not silently ignored.
  explicit MetaServerNode(const std::string &node_id, const std::string &role, const size_t &node_num,
                          uint64_t node_timeout = kDefaultNodeTimeout)
      : NodeBase(node_id, role), total_node_num_(node_num), abnormal_node_num_(0), enable_monitor_(true) {}
  ~MetaServerNode() override;

  bool Initialize() override;
  bool Initialized() override;

  bool Finalize(bool force = false) override;

  // Get the current topology state.
  TopoState TopologyState() const;

  // Get the number of alive compute graph node.
  size_t GetAliveNodeNum();

  // Register the message handler for the user defined message which is specified by the `name` parameter.
  bool RegisterMessageHandler(const std::string &name,
                              const std::shared_ptr<std::function<std::string(const std::string &)>> &handler);

 private:
  // Set metadata for this cluster.
  void SetMetaData();

  // Create and init the tcp server.
  bool InitTCPServer();

  // Handle the message received by the tcp server.
  MessageBase *const HandleMessage(MessageBase *const message);

  // Process the received register message sent from compute graph nodes.
  MessageBase *const ProcessRegister(MessageBase *const message);

  // Process the received unregister message sent from compute graph nodes.
  MessageBase *const ProcessUnregister(MessageBase *const message);

  // Process the received heartbeat message sent from compute graph nodes.
  MessageBase *const ProcessHeartbeat(MessageBase *const message);

  // Process user-defined metadata writing and reading requests.
  MessageBase *const ProcessWriteMetadata(MessageBase *const message);
  MessageBase *const ProcessReadMetadata(MessageBase *const message);
  MessageBase *const ProcessDeleteMetadata(MessageBase *const message);

  // Gather all the hostname of registered compute graph nodes.
  MessageBase *const ProcessGetHostNames(MessageBase *const message);

  // Maintain the state which is type of `TopoState` of this cluster topology.
  void UpdateTopoState();

  // Try to transition the state of cluster to be initialized.
  bool TransitionToInitialized();

  // For each computing graph node, port range should be assigned by meta server node for rpc servers to bind.
  void AssignPortRange();

  // Recover metadata from the configuration if recovery is enabled.
  bool Recovery();

  // Allocate a new valid rank id for new registered compute graph node.
  uint32_t AllocateRankId(const std::string &role);

  // Check newly registered node's rank id is valid. If not, msn should reject this register request.
  bool CheckRankIdValidation(const std::string &node_id, const std::string &role, uint32_t rank_id,
                             const std::string &host_ip, std::string *reject_reason);

  // Reassign node ranks. This method should be called only after cluster is successfully built. It sorts all nodes with
  // their node ip and node id, then assign their rank ids.
  void ReassignNodeRank();

  // Persist the required metadata of cluster into storage through configuration.
  bool Persist();

  // The meta server address used to manage the tcp server.
  MetaServerAddress meta_server_addr_;

  // The TCP server is used to process messages sent from compute graph nodes.
  std::unique_ptr<rpc::TCPServer> tcp_server_;

  // All the handlers for compute graph node's system messages processing.
  // The `system` means the built-in messages used for cluster topology construction.
  std::map<MessageName, MessageHandler> system_msg_handlers_;

  // All the handlers for compute graph node's user-defined messages processing.
  // The `user-defined` means that this kind of message is user defined and has customized message handler.
  std::map<std::string, std::shared_ptr<std::function<std::string(const std::string &)>>> message_handlers_;

  // Stores the registered compute graph nodes, keyed by node id.
  std::map<std::string, std::shared_ptr<NodeInfo>> nodes_;

  // Guards `nodes_` (readers take shared locks, writers exclusive).
  mutable std::shared_mutex nodes_mutex_;

  // The total legal number of compute graph nodes.
  size_t total_node_num_;

  // The total number of abnormal(eg. timeout) compute graph nodes.
  size_t abnormal_node_num_;

  // The monitor thread for update the topo state.
  std::thread topo_monitor_;

  // The switch for the topo monitor thread.
  std::atomic<bool> enable_monitor_;

  // The metadata written and read by users.
  std::map<std::string, std::string> metadata_;

  // Guards `metadata_`.
  mutable std::shared_mutex meta_mutex_;

  // A key-value pairs metadata config used for failover recovery if enabled.
  std::unique_ptr<recovery::Configuration> configuration_;

  // The next valid rank id for compute graph nodes.
  // Note that each role(group) has it's own rank id.
  std::map<std::string, std::atomic<uint32_t>> next_rank_ids_;
  // The expected node number for each role.
  std::map<std::string, uint32_t> role_expect_num_;
  // Guards `next_rank_ids_` and `role_expect_num_`.
  mutable std::shared_mutex rank_mutex_;
};
258 }  // namespace topology
259 }  // namespace cluster
260 }  // namespace distributed
261 }  // namespace mindspore
262 #endif  // MINDSPORE_CCSRC_DISTRIBUTED_CLUSTER_TOPOLOGY_META_SERVER_NODE_H_
263