/**
 * Copyright 2022 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef MINDSPORE_CCSRC_DISTRIBUTED_CLUSTER_TOPOLOGY_META_SERVER_NODE_H_
#define MINDSPORE_CCSRC_DISTRIBUTED_CLUSTER_TOPOLOGY_META_SERVER_NODE_H_

#include <arpa/inet.h>
#include <string>
#include <memory>
#include <map>
#include <thread>
#include <shared_mutex>
#include <unordered_map>
#include <atomic>
#include <functional>
#include <sstream>
#include "include/backend/distributed/rpc/tcp/tcp_server.h"
#include "distributed/recovery/configuration.h"
#include "include/backend/distributed/cluster/topology/node_base.h"

namespace mindspore {
namespace distributed {
namespace cluster {
namespace topology {
// Indicates the state of a compute graph node.
enum class NodeState {
  // This node is newly created and unauthenticated.
  kNew = 0,

  // This node has finished registration with the meta server.
  kRegistered,

  // This node has finished unregistration from the meta server.
  kUnregistered,

  // This node has timed out because no heartbeat message arrived within `kNodeTimeout`.
  kTimeout
};

// Records the metadata and state of a compute graph node.
struct NodeInfo {
  explicit NodeInfo(const std::string &id) { node_id = id; }
  std::string node_id;

  // The local host name of this cluster node.
  std::string host_name;

  // The host ip of this node in the cluster. Nodes use this address to establish network connections with each other.
  std::string host_ip;

  // The role name of this cluster node.
  std::string role;

  // The rank id of this cluster node (only for compute graph nodes).
  uint32_t rank_id{0};

  // The timestamp of the last heartbeat, used to judge the health state of the node.
  time_t last_update{0};

  // The current state of the node.
  NodeState state{NodeState::kNew};
};

// Converts a decimal integer to a lowercase hexadecimal string, left-padded with '0' to `width` characters.
inline std::string Dec2Hex(int i, uint32_t width) {
  std::string temp;
  std::stringstream ss;
  ss << std::hex << i;
  ss >> temp;
  if (width > temp.size()) {
    return std::string((width - temp.size()), '0') + temp;
  }
  return temp;
}
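// For illustration: Dec2Hex(10, 2) yields "0a" and Dec2Hex(255, 2) yields "ff"; if the hex representation is
// already wider than `width`, it is returned without padding.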

// Encodes an IPv4 or IPv6 address string as a fixed-width lowercase hex string (two hex characters per byte, in
// network byte order) so that addresses can be compared lexicographically.
inline std::string GenerateIpInOrder(const std::string &ip) {
  rpc::SocketAddress addr;
  std::string ordered_ip = "";
  uint32_t dec_2_hex_width = 2;
  int result = inet_pton(AF_INET, ip.c_str(), &addr.saIn.sin_addr);
  if (result > 0) {
    for (size_t i = 0; i < sizeof(addr.saIn.sin_addr.s_addr) / sizeof(unsigned char); i++) {
      ordered_ip += Dec2Hex(*(reinterpret_cast<unsigned char *>(&addr.saIn.sin_addr.s_addr) + i), dec_2_hex_width);
    }
    return ordered_ip;
  }

  result = inet_pton(AF_INET6, ip.c_str(), &addr.saIn6.sin6_addr);
  if (result > 0) {
    size_t ipv6_len = 16;
    for (size_t i = 0; i < ipv6_len; i++) {
      ordered_ip += Dec2Hex(addr.saIn6.sin6_addr.s6_addr[i], dec_2_hex_width);
    }
    return ordered_ip;
  }

  MS_LOG(EXCEPTION) << "Failed to parse ip address, result: " << result << ", ip: " << ip;
}
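// For illustration: GenerateIpInOrder("10.0.0.1") yields "0a000001". An address that cannot be parsed as either
// IPv4 or IPv6 raises an exception.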

// The key of a node consists of its host ip and node id.
// It is used to sort nodes and assign global rank ids.
struct NodeKey {
  std::string host_ip;
  std::string node_id;

  bool operator<(const NodeKey &node_key) const {
    auto this_ordered_ip = GenerateIpInOrder(host_ip);
    auto other_ordered_ip = GenerateIpInOrder(node_key.host_ip);
    if (this_ordered_ip != other_ordered_ip) {
      return this_ordered_ip < other_ordered_ip;
    }
    return node_id < node_key.node_id;
  }
  bool operator==(const NodeKey &node_key) const {
    return (node_id == node_key.node_id) && (host_ip == node_key.host_ip);
  }
};
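// For illustration (a sketch, not the storage used by MetaServerNode): an ordered container keyed by NodeKey,
// e.g. std::map<NodeKey, std::shared_ptr<NodeInfo>>, iterates nodes sorted by host ip first and node id second,
// which is the order used when reassigning rank ids.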

// The MetaServerNode is a separate process representing the meta server node, which stores all the metadata and
// status of compute graph nodes.
class MetaServerNode : public NodeBase {
 public:
  explicit MetaServerNode(const std::string &node_id, const std::string &role, const size_t &node_num,
                          uint64_t node_timeout = kDefaultNodeTimeout)
      : NodeBase(node_id, role), total_node_num_(node_num), abnormal_node_num_(0), enable_monitor_(true) {}
  ~MetaServerNode() override;

  bool Initialize() override;
  bool Initialized() override;

  bool Finalize(bool force = false) override;

  // Get the current topology state.
  TopoState TopologyState() const;

  // Get the number of alive compute graph nodes.
  size_t GetAliveNodeNum();

  // Register the message handler for the user-defined message specified by the `name` parameter.
  bool RegisterMessageHandler(const std::string &name,
                              const std::shared_ptr<std::function<std::string(const std::string &)>> &handler);
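  // Illustrative registration (a sketch; the message name and handler body are hypothetical):
  //   auto handler = std::make_shared<std::function<std::string(const std::string &)>>(
  //     [](const std::string &request) { return request; });
  //   (void)meta_server_node.RegisterMessageHandler("echo", handler);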

 private:
  // Set metadata for this cluster.
  void SetMetaData();

  // Create and init the tcp server.
  bool InitTCPServer();

  // Handle the message received by the tcp server.
  MessageBase *const HandleMessage(MessageBase *const message);

  // Process the received register message sent from compute graph nodes.
  MessageBase *const ProcessRegister(MessageBase *const message);

  // Process the received unregister message sent from compute graph nodes.
  MessageBase *const ProcessUnregister(MessageBase *const message);

  // Process the received heartbeat message sent from compute graph nodes.
  MessageBase *const ProcessHeartbeat(MessageBase *const message);

  // Process user-defined metadata writing and reading requests.
  MessageBase *const ProcessWriteMetadata(MessageBase *const message);
  MessageBase *const ProcessReadMetadata(MessageBase *const message);
  MessageBase *const ProcessDeleteMetadata(MessageBase *const message);

  // Gather the host names of all registered compute graph nodes.
  MessageBase *const ProcessGetHostNames(MessageBase *const message);

  // Maintain the state (of type `TopoState`) of this cluster topology.
  void UpdateTopoState();

  // Try to transition the state of the cluster to initialized.
  bool TransitionToInitialized();

  // Assign a port range to each compute graph node for its rpc servers to bind.
  void AssignPortRange();

  // Recover metadata from the configuration if recovery is enabled.
  bool Recovery();

  // Allocate a new valid rank id for a newly registered compute graph node.
  uint32_t AllocateRankId(const std::string &role);

  // Check whether the newly registered node's rank id is valid. If not, the meta server node rejects the register
  // request.
  bool CheckRankIdValidation(const std::string &node_id, const std::string &role, uint32_t rank_id,
                             const std::string &host_ip, std::string *reject_reason);

  // Reassign node ranks. This method should be called only after the cluster is successfully built. It sorts all
  // nodes by their node ip and node id, then assigns their rank ids.
  void ReassignNodeRank();

  // Persist the required metadata of the cluster into storage through the configuration.
  bool Persist();

  // The meta server address used to manage the tcp server.
  MetaServerAddress meta_server_addr_;

  // The tcp server used to process messages sent from compute graph nodes.
  std::unique_ptr<rpc::TCPServer> tcp_server_;

  // All the handlers for processing the system messages of compute graph nodes.
  // `System` means the built-in messages used for cluster topology construction.
  std::map<MessageName, MessageHandler> system_msg_handlers_;

  // All the handlers for processing the user-defined messages of compute graph nodes.
  // `User-defined` means that this kind of message is defined by the user and has a customized message handler.
  std::map<std::string, std::shared_ptr<std::function<std::string(const std::string &)>>> message_handlers_;

  // Stores the registered compute graph nodes.
  std::map<std::string, std::shared_ptr<NodeInfo>> nodes_;

  mutable std::shared_mutex nodes_mutex_;

  // The expected total number of compute graph nodes.
  size_t total_node_num_;

  // The total number of abnormal (e.g. timed out) compute graph nodes.
  size_t abnormal_node_num_;

  // The monitor thread for updating the topo state.
  std::thread topo_monitor_;

  // The switch for the topo monitor thread.
  std::atomic<bool> enable_monitor_;

  // The metadata written and read by users.
  std::map<std::string, std::string> metadata_;

  mutable std::shared_mutex meta_mutex_;

  // A key-value metadata configuration used for failover recovery if enabled.
  std::unique_ptr<recovery::Configuration> configuration_;

  // The next valid rank id for compute graph nodes.
  // Note that each role (group) has its own rank ids.
  std::map<std::string, std::atomic<uint32_t>> next_rank_ids_;
  // The expected number of nodes for each role.
  std::map<std::string, uint32_t> role_expect_num_;
  mutable std::shared_mutex rank_mutex_;
};
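
// Illustrative lifecycle (a sketch; the node id, role name and node number are hypothetical):
//   MetaServerNode msn("meta_server_node_0", "MS", /*node_num=*/4);
//   if (msn.Initialize()) {
//     // ... the meta server now accepts register/unregister/heartbeat/metadata messages ...
//     (void)msn.Finalize();
//   }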
}  // namespace topology
}  // namespace cluster
}  // namespace distributed
}  // namespace mindspore
#endif  // MINDSPORE_CCSRC_DISTRIBUTED_CLUSTER_TOPOLOGY_META_SERVER_NODE_H_