• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2022 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef MINDSPORE_CCSRC_DISTRIBUTED_CLUSTER_TOPOLOGY_NODE_BASE_H_
18 #define MINDSPORE_CCSRC_DISTRIBUTED_CLUSTER_TOPOLOGY_NODE_BASE_H_
19 
#include <chrono>
#include <cstdint>
#include <functional>
#include <memory>
#include <stdexcept>
#include <string>
#include "include/backend/distributed/cluster/topology/common.h"
#include "include/backend/distributed/cluster/topology/utils.h"
25 
26 namespace mindspore {
27 namespace distributed {
28 namespace cluster {
29 namespace topology {
30 // A node represents a separate process which is one node of the distributed computation graph or the meta-server
31 // process. The node abstraction is for the dynamic networking of the distributed computation graph, allowing
32 // distributed computation graphs to communicate with each other during runtime and automatic recovery of node
33 // processes.
34 class NodeBase {
35  public:
NodeBase(const std::string & node_id,const std::string & role)36   explicit NodeBase(const std::string &node_id, const std::string &role)
37       : node_id_(node_id),
38         rank_id_(-1),
39         role_(role),
40         finalized_(false),
41         start_time_(Now()),
42         topo_state_(TopoState::kInitializing) {
43     std::string env_topo_timeout = common::GetEnv(kEnvTopoTimeOut);
44     int int_topo_timeout = env_topo_timeout.empty() ? kDefaultTopoTimeOut : std::stoi(env_topo_timeout);
45     topo_timeout_ = (int_topo_timeout < 0) ? UINT64_MAX : int_topo_timeout;
46     MS_LOG(INFO) << "Cluster topo timeout is " << topo_timeout_ << " seconds.";
47 
48     std::string env_node_timeout = common::GetEnv(kEnvNodeTimeOut);
49     int int_node_timeout = env_node_timeout.empty() ? kDefaultNodeTimeout : std::stoi(env_node_timeout);
50     node_timeout_ = (int_node_timeout < 0) ? UINT64_MAX : int_node_timeout;
51     MS_LOG(INFO) << "Node timeout after exception is " << node_timeout_ << " seconds.";
52 
53     // If set MS_DISABLE_HEARTBEAT to 1, disable heartbeat after cluster is built.
54     disable_heartbeat_ = (common::GetEnv("MS_DISABLE_HEARTBEAT") == "1");
55     if (disable_heartbeat_) {
56       MS_LOG(WARNING)
57         << "The heartbeat feature between cluster nodes is disabled! The scheduler won't detect timeout nodes.";
58     }
59   }
60   virtual ~NodeBase() = default;
61 
62   // Prepare the resources hold in this node.
63   virtual bool Initialize() = 0;
64 
65   // Returns whether all the initialization work has been completed.
66   virtual bool Initialized() = 0;
67 
68   // Release the resources hold in this node.
69   // If the parameter force is set to true, this node will be finalized without waiting for unregister of all the
70   // compute graph node.
71   virtual bool Finalize(bool force = false) = 0;
72 
73   // Set the callback which will be called when the state of the cluster is abnormal.
set_abnormal_callback(std::shared_ptr<std::function<void (void)>> abnormal_callback)74   virtual void set_abnormal_callback(std::shared_ptr<std::function<void(void)>> abnormal_callback) {}
75 
node_id()76   std::string node_id() const { return node_id_; }
77 
set_rank_id(uint32_t rank_id)78   void set_rank_id(uint32_t rank_id) { rank_id_ = rank_id; }
rank_id()79   uint32_t rank_id() const { return rank_id_; }
80 
role()81   std::string role() const { return role_; }
82 
topo_timeout()83   size_t topo_timeout() const { return topo_timeout_; }
node_timeout()84   size_t node_timeout() const { return node_timeout_; }
85 
86  protected:
87   // Each node process has a unique node id which is immutable during the life cycle of this node.
88   // The node id is used for identify authentication during networking and process recovery.
89   std::string node_id_;
90 
91   // The rank id of this compute graph node process in the cluster.
92   // The rank id is assigned by meta server node and starts from 0 to (node_num - 1).
93   uint32_t rank_id_;
94 
95   // The role name of this node specified by the environment variable.
96   std::string role_;
97 
98   // Indicates whether the finalize method of this node has been called.
99   bool finalized_;
100 
101   // The start time of this meta server node.
102   std::chrono::high_resolution_clock::time_point start_time_;
103 
104   // The state of the topology consisting of compute graph nodes.
105   TopoState topo_state_;
106 
107   // Cluster building time out window in second.
108   size_t topo_timeout_;
109 
110   // The timeout(second) window for heartbeat from compute graph node to meta server.
111   size_t node_timeout_;
112 
113   // Whether heartbeat is disabled. If it is, the scheduler won't detect timed out node. It's caller's job to handle the
114   // exception in this cluster.
115   bool disable_heartbeat_;
116 };
117 }  // namespace topology
118 }  // namespace cluster
119 }  // namespace distributed
120 }  // namespace mindspore
121 #endif  // MINDSPORE_CCSRC_DISTRIBUTED_CLUSTER_TOPOLOGY_NODE_BASE_H_
122