• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2020 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef MINDSPORE_CCSRC_PS_CORE_SCHEDULER_NODE_H_
18 #define MINDSPORE_CCSRC_PS_CORE_SCHEDULER_NODE_H_
19 
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <cstdlib>
#include <iostream>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <unordered_map>
#include <vector>

#include "ps/core/cluster_config.h"
#include "ps/ps_context.h"
#include "ps/core/communicator/tcp_client.h"
#include "ps/core/communicator/tcp_server.h"
#include "ps/core/node_manager.h"
#include "ps/core/node.h"
#include "ps/core/communicator/request_process_result_code.h"
#include "ps/core/communicator/http_message_handler.h"
#include "ps/constants.h"
#include "ps/core/cluster_metadata.h"
#include "ps/core/communicator/http_server.h"
#include "ps/core/leader_scaler.h"
#include "ps/core/recovery_base.h"
#include "ps/core/instance_manager.h"
44 
45 namespace mindspore {
46 namespace ps {
47 namespace core {
48 class SchedulerNode : public Node {
49  public:
SchedulerNode()50   SchedulerNode()
51       : server_(nullptr),
52         scheduler_thread_(nullptr),
53         update_state_thread_(nullptr),
54         restful_thread_(nullptr),
55         http_server_(nullptr),
56         client_thread_(nullptr),
57         is_client_started_(false),
58         leader_scaler_(nullptr),
59         scheduler_recovery_(nullptr) {}
60   ~SchedulerNode() override;
61 
62   typedef void (SchedulerNode::*ResponseHandler)(const std::shared_ptr<TcpServer> &server,
63                                                  const std::shared_ptr<TcpConnection> &conn,
64                                                  const std::shared_ptr<MessageMeta> &meta, const void *data,
65                                                  size_t size);
66 
67   bool Start(const uint32_t &timeout = PSContext::instance()->cluster_config().cluster_available_timeout) override;
68   bool Stop() override;
69   bool Finish(const uint32_t &timeout = kTimeoutInSeconds) override;
70 
71  private:
72   void Initialize();
73 
74   void InitCommandHandler();
75   void CreateTcpServer();
76   void StartUpdateClusterStateTimer();
77   const std::shared_ptr<TcpClient> &GetOrCreateClient(const NodeInfo &node_info);
78 
79   void ProcessHeartbeat(const std::shared_ptr<TcpServer> &server, const std::shared_ptr<TcpConnection> &conn,
80                         const std::shared_ptr<MessageMeta> &meta, const void *data, size_t size);
81   void ProcessRegister(const std::shared_ptr<TcpServer> &server, const std::shared_ptr<TcpConnection> &conn,
82                        const std::shared_ptr<MessageMeta> &meta, const void *data, size_t size);
83   void ProcessFinish(const std::shared_ptr<TcpServer> &server, const std::shared_ptr<TcpConnection> &conn,
84                      const std::shared_ptr<MessageMeta> &meta, const void *data, size_t size);
85   void ProcessFetchMetadata(const std::shared_ptr<TcpServer> &server, const std::shared_ptr<TcpConnection> &conn,
86                             const std::shared_ptr<MessageMeta> &meta, const void *data, size_t size);
87 
88   // Process scale_out_done messages from workers/servers
89   void ProcessScaleOutDone(const std::shared_ptr<TcpServer> &server, const std::shared_ptr<TcpConnection> &conn,
90                            const std::shared_ptr<MessageMeta> &meta, const void *data, size_t size);
91   // Process scale_in_done messages from workers/servers
92   void ProcessScaleInDone(const std::shared_ptr<TcpServer> &server, const std::shared_ptr<TcpConnection> &conn,
93                           const std::shared_ptr<MessageMeta> &meta, const void *data, size_t size);
94 
95   // Process scale_in_done messages from workers/servers
96   void ProcessSendEvent(const std::shared_ptr<TcpServer> &server, const std::shared_ptr<TcpConnection> &conn,
97                         const std::shared_ptr<MessageMeta> &meta, const void *data, size_t size);
98 
99   // After scheduler collects all registered message, it actively sends finish to the node connected by the client.
100   void SendMetadata(const std::shared_ptr<TcpClient> &client, uint32_t rank_id);
101   // After scheduler collects all finish message, it actively sends finish to the node connected by the client.
102   void SendFinish(const std::shared_ptr<TcpClient> &client);
103 
104   // After scheduler collects all scale_out_done message, it actively sends scale_out_done to the node connected by the
105   // client.
106   void SendScaleOutDone(const std::shared_ptr<TcpClient> &client);
107 
108   // After scheduler collects all scale_in_done message, it actively sends scale_out_done to the node connected by the
109   // client.
110   void SendScaleInDone(const std::shared_ptr<TcpClient> &client);
111   // After scheduler receive SEND_EVENT message, it will broadcast the event to all other nodes.
112   void SendEvent(const std::shared_ptr<TcpClient> &client, const uint32_t &event);
113 
114   // Handle the scale out http request, then delegate to the leader scaler to process scale out asynchronously.
115   void ProcessScaleOut(const std::shared_ptr<HttpMessageHandler> &resp);
116 
117   // Handle the scale in http request, then delegate to the leader scaler to process scale in asynchronously.
118   void ProcessScaleIn(const std::shared_ptr<HttpMessageHandler> &resp);
119 
120   // Handle the get nodes info http request Synchronously.
121   void ProcessGetNodesInfo(const std::shared_ptr<HttpMessageHandler> &resp);
122 
123   // Handle the get cluster state http request Synchronously.
124   void ProcessGetClusterState(const std::shared_ptr<HttpMessageHandler> &resp);
125 
126   // Handle the new instance http request Synchronously.
127   void ProcessNewInstance(const std::shared_ptr<HttpMessageHandler> &resp);
128 
129   // Handle the query instance http request Synchronously.
130   void ProcessQueryInstance(const std::shared_ptr<HttpMessageHandler> &resp);
131 
132   // Handle the enable FLS http request Synchronously.
133   void ProcessEnableFLS(const std::shared_ptr<HttpMessageHandler> &resp);
134 
135   // Handle the disable FLS http request Synchronously.
136   void ProcessDisableFLS(const std::shared_ptr<HttpMessageHandler> &resp);
137 
138   // check whether the cluster is in the ready state.
139   RequestProcessResult CheckIfClusterReady();
140 
141   // check whether the node id is legal.
142   RequestProcessResult CheckIfNodeIdLegal(const std::vector<std::string> &node_ids);
143 
144   void StartRestfulServer(const std::string &address, std::uint16_t port, size_t thread_num = 10);
145   void StopRestfulServer();
146 
147   std::shared_ptr<TcpServer> server_;
148   std::unique_ptr<std::thread> scheduler_thread_;
149   std::unique_ptr<std::thread> update_state_thread_;
150   std::unordered_map<NodeCommand, ResponseHandler> handlers_;
151 
152   NodeManager node_manager_;
153 
154   // In this thread will start a http server.
155   std::unique_ptr<std::thread> restful_thread_;
156   std::shared_ptr<HttpServer> http_server_;
157 
158   std::unordered_map<std::string, std::shared_ptr<TcpClient>> connected_nodes_;
159 
160   std::unique_ptr<std::thread> client_thread_;
161   std::atomic<bool> is_client_started_;
162 
163   std::unique_ptr<LeaderScaler> leader_scaler_;
164 
165   std::unordered_map<std::string, OnRequestReceive> callbacks_;
166 
167   // Used to persist and obtain metadata information for scheduler.
168   std::unique_ptr<RecoveryBase> scheduler_recovery_;
169 
170   // The node id of scale in nodes.
171   std::vector<std::string> scale_in_node_ids_;
172 
173   std::unique_ptr<InstanceManager> instance_manager_;
174 };
175 }  // namespace core
176 }  // namespace ps
177 }  // namespace mindspore
178 
179 #endif  // MINDSPORE_CCSRC_PS_CORE_SCHEDULER_NODE_H_
180