1 /** 2 * Copyright 2022 Huawei Technologies Co., Ltd 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #ifndef MINDSPORE_CCSRC_DISTRIBUTED_CLUSTER_TOPOLOGY_NODE_BASE_H_ 18 #define MINDSPORE_CCSRC_DISTRIBUTED_CLUSTER_TOPOLOGY_NODE_BASE_H_ 19 20 #include <chrono> 21 #include <string> 22 #include <memory> 23 #include "include/backend/distributed/cluster/topology/common.h" 24 #include "include/backend/distributed/cluster/topology/utils.h" 25 26 namespace mindspore { 27 namespace distributed { 28 namespace cluster { 29 namespace topology { 30 // A node represents a separate process which is one node of the distributed computation graph or the meta-server 31 // process. The node abstraction is for the dynamic networking of the distributed computation graph, allowing 32 // distributed computation graphs to communicate with each other during runtime and automatic recovery of node 33 // processes. 34 class NodeBase { 35 public: NodeBase(const std::string & node_id,const std::string & role)36 explicit NodeBase(const std::string &node_id, const std::string &role) 37 : node_id_(node_id), 38 rank_id_(-1), 39 role_(role), 40 finalized_(false), 41 start_time_(Now()), 42 topo_state_(TopoState::kInitializing) { 43 std::string env_topo_timeout = common::GetEnv(kEnvTopoTimeOut); 44 int int_topo_timeout = env_topo_timeout.empty() ? kDefaultTopoTimeOut : std::stoi(env_topo_timeout); 45 topo_timeout_ = (int_topo_timeout < 0) ? UINT64_MAX : int_topo_timeout; 46 MS_LOG(INFO) << "Cluster topo timeout is " << topo_timeout_ << " seconds."; 47 48 std::string env_node_timeout = common::GetEnv(kEnvNodeTimeOut); 49 int int_node_timeout = env_node_timeout.empty() ? kDefaultNodeTimeout : std::stoi(env_node_timeout); 50 node_timeout_ = (int_node_timeout < 0) ? UINT64_MAX : int_node_timeout; 51 MS_LOG(INFO) << "Node timeout after exception is " << node_timeout_ << " seconds."; 52 53 // If set MS_DISABLE_HEARTBEAT to 1, disable heartbeat after cluster is built. 54 disable_heartbeat_ = (common::GetEnv("MS_DISABLE_HEARTBEAT") == "1"); 55 if (disable_heartbeat_) { 56 MS_LOG(WARNING) 57 << "The heartbeat feature between cluster nodes is disabled! The scheduler won't detect timeout nodes."; 58 } 59 } 60 virtual ~NodeBase() = default; 61 62 // Prepare the resources hold in this node. 63 virtual bool Initialize() = 0; 64 65 // Returns whether all the initialization work has been completed. 66 virtual bool Initialized() = 0; 67 68 // Release the resources hold in this node. 69 // If the parameter force is set to true, this node will be finalized without waiting for unregister of all the 70 // compute graph node. 71 virtual bool Finalize(bool force = false) = 0; 72 73 // Set the callback which will be called when the state of the cluster is abnormal. set_abnormal_callback(std::shared_ptr<std::function<void (void)>> abnormal_callback)74 virtual void set_abnormal_callback(std::shared_ptr<std::function<void(void)>> abnormal_callback) {} 75 node_id()76 std::string node_id() const { return node_id_; } 77 set_rank_id(uint32_t rank_id)78 void set_rank_id(uint32_t rank_id) { rank_id_ = rank_id; } rank_id()79 uint32_t rank_id() const { return rank_id_; } 80 role()81 std::string role() const { return role_; } 82 topo_timeout()83 size_t topo_timeout() const { return topo_timeout_; } node_timeout()84 size_t node_timeout() const { return node_timeout_; } 85 86 protected: 87 // Each node process has a unique node id which is immutable during the life cycle of this node. 88 // The node id is used for identify authentication during networking and process recovery. 89 std::string node_id_; 90 91 // The rank id of this compute graph node process in the cluster. 92 // The rank id is assigned by meta server node and starts from 0 to (node_num - 1). 93 uint32_t rank_id_; 94 95 // The role name of this node specified by the environment variable. 96 std::string role_; 97 98 // Indicates whether the finalize method of this node has been called. 99 bool finalized_; 100 101 // The start time of this meta server node. 102 std::chrono::high_resolution_clock::time_point start_time_; 103 104 // The state of the topology consisting of compute graph nodes. 105 TopoState topo_state_; 106 107 // Cluster building time out window in second. 108 size_t topo_timeout_; 109 110 // The timeout(second) window for heartbeat from compute graph node to meta server. 111 size_t node_timeout_; 112 113 // Whether heartbeat is disabled. If it is, the scheduler won't detect timed out node. It's caller's job to handle the 114 // exception in this cluster. 115 bool disable_heartbeat_; 116 }; 117 } // namespace topology 118 } // namespace cluster 119 } // namespace distributed 120 } // namespace mindspore 121 #endif // MINDSPORE_CCSRC_DISTRIBUTED_CLUSTER_TOPOLOGY_NODE_BASE_H_ 122