1 /** 2 * Copyright 2022 Huawei Technologies Co., Ltd 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #ifndef MINDSPORE_CCSRC_DISTRIBUTED_CLUSTER_TOPOLOGY_COMPUTE_GRAPH_NODE_H_ 18 #define MINDSPORE_CCSRC_DISTRIBUTED_CLUSTER_TOPOLOGY_COMPUTE_GRAPH_NODE_H_ 19 20 #include <string> 21 #include <memory> 22 #include <thread> 23 #include <vector> 24 #include <map> 25 #include <shared_mutex> 26 #include "include/backend/distributed/cluster/topology/common.h" 27 #include "include/backend/distributed/rpc/tcp/tcp_client.h" 28 #include "include/backend/distributed/cluster/topology/node_base.h" 29 30 namespace mindspore { 31 namespace distributed { 32 namespace cluster { 33 namespace topology { 34 // The ComputeGraphNode is a separate process representing a sub-graph of the distributed computation graph. 35 class BACKEND_EXPORT ComputeGraphNode : public NodeBase { 36 public: ComputeGraphNode(const std::string & node_id,const std::string & role)37 ComputeGraphNode(const std::string &node_id, const std::string &role) 38 : NodeBase(node_id, role), client_ip_(""), authenticated_(false), enable_hb_(false) {} 39 ~ComputeGraphNode() override; 40 41 bool Initialize() override; 42 bool Initialized() override; 43 44 bool Finalize(bool force = false) override; 45 46 // Send the specified message to the meta server node. 47 bool SendMessageToMSN(const std::string msg_name, const std::string &msg_body, bool sync = true); 48 49 // Query the specified message from the meta server node according to the given message name. 50 // Returns nullptr if no message returned after timeout. 51 std::shared_ptr<std::string> RetrieveMessageFromMSN(const std::string &msg_name, uint32_t timeout = 5); 52 53 // Write and read user defined metadata to the meta server node. 54 bool PutMetadata(const std::string &name, const std::string &value, bool sync = true); 55 bool PutMetadata(const std::string &name, const void *value, const size_t &size); 56 57 std::string GetMetadata(const std::string &name, uint32_t timeout = 5); 58 59 bool DeleteMetadata(const std::string &name, uint32_t timeout = 5); 60 61 // Exchange metadata(name:value) between all the compute graph nodes. 62 // The transaction of the exchange process is guaranteed. 63 bool ExchangeMetadata(const std::string &biz, const size_t &rank_size, const std::vector<std::string> &names_prefix, 64 const std::vector<std::string> &values, std::map<std::string, std::string> *results, 65 uint32_t timeout = 90); 66 67 // Get all the hostnames of compute graph nodes. 68 std::vector<std::string> GetHostNames(const std::string &role); 69 70 void set_abnormal_callback(std::shared_ptr<std::function<void(void)>> abnormal_callback) override; 71 72 // Return client ip of this cgn which is used for cluster building. 73 const std::string &client_ip() const; 74 75 private: 76 // Send the register message to the meta server node when this node process startup. 77 bool Register(); 78 79 // Send the unregister message to the meta server node. 80 bool Unregister(); 81 82 // Send the heartbeat message to the meta server node. 83 bool Heartbeat(); 84 85 // Call the `Reconnect` function if the input func execution failed. 86 bool ReconnectIfNeeded(const std::function<bool(void)> &func, const std::string &error, size_t retry); 87 bool ReconnectWithTimeoutWindow(const std::function<bool(void)> &func, const std::string &error, size_t time_out); 88 89 // Reconnect to the meta server node. 90 bool Reconnect(); 91 92 std::shared_ptr<std::string> RetrieveMessageFromMSN(const std::string &msg_name, const std::string &msg_body, 93 uint32_t timeout = 5); 94 95 // The meta server address used to synchronize metadata with other compute graph nodes. 96 MetaServerAddress meta_server_addr_; 97 98 // The TCP client is used to send messages to meta server node. 99 std::unique_ptr<rpc::TCPClient> tcp_client_; 100 101 // The TCP client used to send heartbeat to meta server. 102 std::unique_ptr<rpc::TCPClient> hb_client_; 103 104 // Tcp client ip address of this cgn. 105 std::string client_ip_; 106 107 // Incidate whether this node is authenticated by meta server node. 108 std::atomic<bool> authenticated_; 109 110 // The heartbeat thread from compute graph node to meta server node. 111 std::thread heartbeat_; 112 113 // Indicate whether the heartbeat thread is running. 114 bool enable_hb_; 115 116 std::shared_ptr<std::function<void(void)>> abnormal_callback_; 117 118 mutable std::shared_mutex exchange_meta_mutex_; 119 }; 120 } // namespace topology 121 } // namespace cluster 122 } // namespace distributed 123 } // namespace mindspore 124 #endif // MINDSPORE_CCSRC_DISTRIBUTED_CLUSTER_TOPOLOGY_COMPUTE_GRAPH_NODE_H_ 125