/**
 * Copyright 2020 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef MINDSPORE_CCSRC_PS_CORE_NODE_MANAGER_H_
#define MINDSPORE_CCSRC_PS_CORE_NODE_MANAGER_H_

#include <algorithm>
#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <cstdlib>
#include <deque>
#include <functional>
#include <iostream>
#include <map>
#include <memory>
#include <mutex>
#include <set>
#include <string>
#include <thread>
#include <unordered_map>
#include <unordered_set>
#include <vector>

#include "ps/core/node.h"
#include "utils/log_adapter.h"
#include "utils/convert_utils_base.h"
#include "ps/core/cluster_metadata.h"

41 namespace mindspore {
42 namespace ps {
43 namespace core {
44 class NodeManager {
45  public:
NodeManager()46   NodeManager()
47       : initial_total_node_num_(0),
48         total_node_num_(-1),
49         current_node_num_(-1),
50         next_worker_rank_id_(-1),
51         next_server_rank_id_(-1),
52         meta_data_(nullptr),
53         node_state_(NodeState::NODE_STARTING),
54         cluster_state_(ClusterState::ClUSTER_STARTING) {}
55   virtual ~NodeManager() = default;
56 
57   // When initializing nodes, the initial number of nodes will be assigned to the total number of nodes.
58   void InitNode();
59   uint32_t NextRankId(const RegisterMessage &register_message, const std::shared_ptr<MessageMeta> &meta);
60 
61   void UpdateHeartbeat(const std::string &node_id);
62   std::vector<ServersMeta> FetchServersMeta();
63   // Fetch metadata information of all nodes.
64   std::vector<ServersMeta> FetchAllNodesMeta();
65 
66   void UpdateCluster();
67   void CheckClusterTimeout();
68   void AddFinishNode(const std::string &finish_message);
69 
70   // After the scheduler receives the scale_out_done node, it will save this node.
71   void AddScaleOutDoneNode(const std::string &node_id);
72   // After the scheduler receives the scale_in_done node, it will save this node.
73   void AddScaleInDoneNode(const std::string &node_id);
74 
75   // When workers and servers registered to scheduler, the scheduler will collect the number of registered
76   // nodes and Determine whether the registered number of worker and server is equal to total_node_num_.
77   bool IsAllNodesRegistered() const;
78   // When workers and servers send a finish message to the scheduler, the scheduler will collect the number of
79   // finish nodes and Determine whether the finished nodes are equal to total_node_num_.
80   bool IsAllNodesFinished() const;
81 
82   // When workers and servers send a scale_out_done message to the scheduler, the scheduler will collect the number of
83   // nodes and Determine whether the nodes are equal to total_node_num_.
84   bool IsAllNodesScaleOutDone() const;
85   // When workers and servers send a scale_in_done message to the scheduler, the scheduler will collect the number of
86   // nodes and Determine whether the nodes are equal to total_node_num_.
87   bool IsAllNodesScaleInDone() const;
88 
89   const std::unordered_map<std::string, NodeInfo> &nodes_info() const;
90   const std::unordered_map<std::string, NodeInfo> &registered_nodes_info() const;
91   // After all the nodes are registered successfully, the nodes info can be updated.
92   void UpdateNodesInfo();
93 
94   void set_total_node_num(const int32_t &node_num);
95   const int32_t &total_node_num() const;
96   void set_worker_num(const int32_t &worker_num);
97   void set_server_num(const int32_t &server_num);
98   int32_t worker_num() const;
99   int32_t server_num() const;
100 
101   void UpdateNodeState(const NodeState &state);
102   void UpdateClusterState(const ClusterState &state);
103   NodeState GetNodeState();
104   ClusterState GetClusterState();
105 
106   // When the scheduler receives the scale out or scale in message, the metadata needs to be reset, because all nodes
107   // will re-register.
108   void ResetMetadata(const std::vector<std::string> &scale_in_nodes = {});
109 
110   bool IsWorkerOrServer0();
111 
112   // Determine whether the node id has been registered.
113   bool IsNodeRegistered(const std::string &node_id);
114 
115  private:
116   std::mutex node_mutex_;
117   std::mutex cluster_mutex_;
118 
119   uint32_t initial_total_node_num_;
120   int32_t total_node_num_;
121   int32_t current_node_num_;
122 
123   std::atomic<int> next_worker_rank_id_;
124   std::atomic<int> next_server_rank_id_;
125 
126   // Whenever a node is registered, it will be stored in this map.
127   std::unordered_map<std::string, NodeInfo> registered_nodes_info_;
128   // When all nodes are registered successfully, then all nodes info will be stored in this map. In other words, the
129   // nodes_info_ is a snapshot of the registered_nodes_info_.
130   std::unordered_map<std::string, NodeInfo> nodes_info_;
131   std::mutex assign_rank_id_mutex_;
132   std::mutex heartbeat_mutex_;
133 
134   std::unordered_map<std::string, timeval> heartbeats_;
135   // timeout nodes
136   std::unordered_map<std::string, NodeInfo> timeout_nodes_info_;
137   std::unordered_set<std::string> finish_nodes_id_;
138 
139   // The scheduler aggregates scale_out_done messages from workers/servers
140   std::unordered_set<std::string> scale_out_done_nodes_id_;
141   // The scheduler aggregates scale_in_done messages from workers/servers
142   std::unordered_set<std::string> scale_in_done_nodes_id_;
143 
144   // Cluster metadata information can be dynamically changed
145   std::unique_ptr<ClusterMetadata> meta_data_;
146 
147   NodeState node_state_;
148   ClusterState cluster_state_;
149 };
150 }  // namespace core
151 }  // namespace ps
152 }  // namespace mindspore
#endif  // MINDSPORE_CCSRC_PS_CORE_NODE_MANAGER_H_