1 /** 2 * Copyright 2020 Huawei Technologies Co., Ltd 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #ifndef MINDSPORE_CCSRC_PS_CORE_NODE_MANAGER_H_ 18 #define MINDSPORE_CCSRC_PS_CORE_NODE_MANAGER_H_ 19 20 #include <atomic> 21 #include <cstdlib> 22 #include <functional> 23 #include <iostream> 24 #include <map> 25 #include <memory> 26 #include <set> 27 #include <string> 28 #include <thread> 29 #include <unordered_map> 30 #include <vector> 31 #include <condition_variable> 32 #include <unordered_set> 33 #include <deque> 34 #include <algorithm> 35 36 #include "ps/core/node.h" 37 #include "utils/log_adapter.h" 38 #include "utils/convert_utils_base.h" 39 #include "ps/core/cluster_metadata.h" 40 41 namespace mindspore { 42 namespace ps { 43 namespace core { 44 class NodeManager { 45 public: NodeManager()46 NodeManager() 47 : initial_total_node_num_(0), 48 total_node_num_(-1), 49 current_node_num_(-1), 50 next_worker_rank_id_(-1), 51 next_server_rank_id_(-1), 52 meta_data_(nullptr), 53 node_state_(NodeState::NODE_STARTING), 54 cluster_state_(ClusterState::ClUSTER_STARTING) {} 55 virtual ~NodeManager() = default; 56 57 // When initializing nodes, the initial number of nodes will be assigned to the total number of nodes. 58 void InitNode(); 59 uint32_t NextRankId(const RegisterMessage ®ister_message, const std::shared_ptr<MessageMeta> &meta); 60 61 void UpdateHeartbeat(const std::string &node_id); 62 std::vector<ServersMeta> FetchServersMeta(); 63 // Fetch metadata information of all nodes. 64 std::vector<ServersMeta> FetchAllNodesMeta(); 65 66 void UpdateCluster(); 67 void CheckClusterTimeout(); 68 void AddFinishNode(const std::string &finish_message); 69 70 // After the scheduler receives the scale_out_done node, it will save this node. 71 void AddScaleOutDoneNode(const std::string &node_id); 72 // After the scheduler receives the scale_in_done node, it will save this node. 73 void AddScaleInDoneNode(const std::string &node_id); 74 75 // When workers and servers registered to scheduler, the scheduler will collect the number of registered 76 // nodes and Determine whether the registered number of worker and server is equal to total_node_num_. 77 bool IsAllNodesRegistered() const; 78 // When workers and servers send a finish message to the scheduler, the scheduler will collect the number of 79 // finish nodes and Determine whether the finished nodes are equal to total_node_num_. 80 bool IsAllNodesFinished() const; 81 82 // When workers and servers send a scale_out_done message to the scheduler, the scheduler will collect the number of 83 // nodes and Determine whether the nodes are equal to total_node_num_. 84 bool IsAllNodesScaleOutDone() const; 85 // When workers and servers send a scale_in_done message to the scheduler, the scheduler will collect the number of 86 // nodes and Determine whether the nodes are equal to total_node_num_. 87 bool IsAllNodesScaleInDone() const; 88 89 const std::unordered_map<std::string, NodeInfo> &nodes_info() const; 90 const std::unordered_map<std::string, NodeInfo> ®istered_nodes_info() const; 91 // After all the nodes are registered successfully, the nodes info can be updated. 92 void UpdateNodesInfo(); 93 94 void set_total_node_num(const int32_t &node_num); 95 const int32_t &total_node_num() const; 96 void set_worker_num(const int32_t &worker_num); 97 void set_server_num(const int32_t &server_num); 98 int32_t worker_num() const; 99 int32_t server_num() const; 100 101 void UpdateNodeState(const NodeState &state); 102 void UpdateClusterState(const ClusterState &state); 103 NodeState GetNodeState(); 104 ClusterState GetClusterState(); 105 106 // When the scheduler receives the scale out or scale in message, the metadata needs to be reset, because all nodes 107 // will re-register. 108 void ResetMetadata(const std::vector<std::string> &scale_in_nodes = {}); 109 110 bool IsWorkerOrServer0(); 111 112 // Determine whether the node id has been registered. 113 bool IsNodeRegistered(const std::string &node_id); 114 115 private: 116 std::mutex node_mutex_; 117 std::mutex cluster_mutex_; 118 119 uint32_t initial_total_node_num_; 120 int32_t total_node_num_; 121 int32_t current_node_num_; 122 123 std::atomic<int> next_worker_rank_id_; 124 std::atomic<int> next_server_rank_id_; 125 126 // Whenever a node is registered, it will be stored in this map. 127 std::unordered_map<std::string, NodeInfo> registered_nodes_info_; 128 // When all nodes are registered successfully, then all nodes info will be stored in this map. In other words, the 129 // nodes_info_ is a snapshot of the registered_nodes_info_. 130 std::unordered_map<std::string, NodeInfo> nodes_info_; 131 std::mutex assign_rank_id_mutex_; 132 std::mutex heartbeat_mutex_; 133 134 std::unordered_map<std::string, timeval> heartbeats_; 135 // timeout nodes 136 std::unordered_map<std::string, NodeInfo> timeout_nodes_info_; 137 std::unordered_set<std::string> finish_nodes_id_; 138 139 // The scheduler aggregates scale_out_done messages from workers/servers 140 std::unordered_set<std::string> scale_out_done_nodes_id_; 141 // The scheduler aggregates scale_in_done messages from workers/servers 142 std::unordered_set<std::string> scale_in_done_nodes_id_; 143 144 // Cluster metadata information can be dynamically changed 145 std::unique_ptr<ClusterMetadata> meta_data_; 146 147 NodeState node_state_; 148 ClusterState cluster_state_; 149 }; 150 } // namespace core 151 } // namespace ps 152 } // namespace mindspore 153 #endif // MINDSPORE_CCSRC_PS_CORE_NODE_MANAGER_H_ 154