1 /** 2 * Copyright 2020 Huawei Technologies Co., Ltd 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #ifndef MINDSPORE_CCSRC_PS_CORE_SCHEDULER_NODE_H_ 18 #define MINDSPORE_CCSRC_PS_CORE_SCHEDULER_NODE_H_ 19 20 #include <atomic> 21 #include <cstdlib> 22 #include <iostream> 23 #include <memory> 24 #include <string> 25 #include <vector> 26 #include <thread> 27 #include <mutex> 28 #include <unordered_map> 29 30 #include "ps/core/cluster_config.h" 31 #include "ps/ps_context.h" 32 #include "ps/core/communicator/tcp_client.h" 33 #include "ps/core/communicator/tcp_server.h" 34 #include "ps/core/node_manager.h" 35 #include "ps/core/node.h" 36 #include "ps/core/communicator/request_process_result_code.h" 37 #include "ps/core/communicator/http_message_handler.h" 38 #include "ps/constants.h" 39 #include "ps/core/cluster_metadata.h" 40 #include "ps/core/communicator/http_server.h" 41 #include "ps/core/leader_scaler.h" 42 #include "ps/core/recovery_base.h" 43 #include "ps/core/instance_manager.h" 44 45 namespace mindspore { 46 namespace ps { 47 namespace core { 48 class SchedulerNode : public Node { 49 public: SchedulerNode()50 SchedulerNode() 51 : server_(nullptr), 52 scheduler_thread_(nullptr), 53 update_state_thread_(nullptr), 54 restful_thread_(nullptr), 55 http_server_(nullptr), 56 client_thread_(nullptr), 57 is_client_started_(false), 58 leader_scaler_(nullptr), 59 scheduler_recovery_(nullptr) {} 60 ~SchedulerNode() override; 61 62 typedef void (SchedulerNode::*ResponseHandler)(const std::shared_ptr<TcpServer> &server, 63 const std::shared_ptr<TcpConnection> &conn, 64 const std::shared_ptr<MessageMeta> &meta, const void *data, 65 size_t size); 66 67 bool Start(const uint32_t &timeout = PSContext::instance()->cluster_config().cluster_available_timeout) override; 68 bool Stop() override; 69 bool Finish(const uint32_t &timeout = kTimeoutInSeconds) override; 70 71 private: 72 void Initialize(); 73 74 void InitCommandHandler(); 75 void CreateTcpServer(); 76 void StartUpdateClusterStateTimer(); 77 const std::shared_ptr<TcpClient> &GetOrCreateClient(const NodeInfo &node_info); 78 79 void ProcessHeartbeat(const std::shared_ptr<TcpServer> &server, const std::shared_ptr<TcpConnection> &conn, 80 const std::shared_ptr<MessageMeta> &meta, const void *data, size_t size); 81 void ProcessRegister(const std::shared_ptr<TcpServer> &server, const std::shared_ptr<TcpConnection> &conn, 82 const std::shared_ptr<MessageMeta> &meta, const void *data, size_t size); 83 void ProcessFinish(const std::shared_ptr<TcpServer> &server, const std::shared_ptr<TcpConnection> &conn, 84 const std::shared_ptr<MessageMeta> &meta, const void *data, size_t size); 85 void ProcessFetchMetadata(const std::shared_ptr<TcpServer> &server, const std::shared_ptr<TcpConnection> &conn, 86 const std::shared_ptr<MessageMeta> &meta, const void *data, size_t size); 87 88 // Process scale_out_done messages from workers/servers 89 void ProcessScaleOutDone(const std::shared_ptr<TcpServer> &server, const std::shared_ptr<TcpConnection> &conn, 90 const std::shared_ptr<MessageMeta> &meta, const void *data, size_t size); 91 // Process scale_in_done messages from workers/servers 92 void ProcessScaleInDone(const std::shared_ptr<TcpServer> &server, const std::shared_ptr<TcpConnection> &conn, 93 const std::shared_ptr<MessageMeta> &meta, const void *data, size_t size); 94 95 // Process scale_in_done messages from workers/servers 96 void ProcessSendEvent(const std::shared_ptr<TcpServer> &server, const std::shared_ptr<TcpConnection> &conn, 97 const std::shared_ptr<MessageMeta> &meta, const void *data, size_t size); 98 99 // After scheduler collects all registered message, it actively sends finish to the node connected by the client. 100 void SendMetadata(const std::shared_ptr<TcpClient> &client, uint32_t rank_id); 101 // After scheduler collects all finish message, it actively sends finish to the node connected by the client. 102 void SendFinish(const std::shared_ptr<TcpClient> &client); 103 104 // After scheduler collects all scale_out_done message, it actively sends scale_out_done to the node connected by the 105 // client. 106 void SendScaleOutDone(const std::shared_ptr<TcpClient> &client); 107 108 // After scheduler collects all scale_in_done message, it actively sends scale_out_done to the node connected by the 109 // client. 110 void SendScaleInDone(const std::shared_ptr<TcpClient> &client); 111 // After scheduler receive SEND_EVENT message, it will broadcast the event to all other nodes. 112 void SendEvent(const std::shared_ptr<TcpClient> &client, const uint32_t &event); 113 114 // Handle the scale out http request, then delegate to the leader scaler to process scale out asynchronously. 115 void ProcessScaleOut(const std::shared_ptr<HttpMessageHandler> &resp); 116 117 // Handle the scale in http request, then delegate to the leader scaler to process scale in asynchronously. 118 void ProcessScaleIn(const std::shared_ptr<HttpMessageHandler> &resp); 119 120 // Handle the get nodes info http request Synchronously. 121 void ProcessGetNodesInfo(const std::shared_ptr<HttpMessageHandler> &resp); 122 123 // Handle the get cluster state http request Synchronously. 124 void ProcessGetClusterState(const std::shared_ptr<HttpMessageHandler> &resp); 125 126 // Handle the new instance http request Synchronously. 127 void ProcessNewInstance(const std::shared_ptr<HttpMessageHandler> &resp); 128 129 // Handle the query instance http request Synchronously. 130 void ProcessQueryInstance(const std::shared_ptr<HttpMessageHandler> &resp); 131 132 // Handle the enable FLS http request Synchronously. 133 void ProcessEnableFLS(const std::shared_ptr<HttpMessageHandler> &resp); 134 135 // Handle the disable FLS http request Synchronously. 136 void ProcessDisableFLS(const std::shared_ptr<HttpMessageHandler> &resp); 137 138 // check whether the cluster is in the ready state. 139 RequestProcessResult CheckIfClusterReady(); 140 141 // check whether the node id is legal. 142 RequestProcessResult CheckIfNodeIdLegal(const std::vector<std::string> &node_ids); 143 144 void StartRestfulServer(const std::string &address, std::uint16_t port, size_t thread_num = 10); 145 void StopRestfulServer(); 146 147 std::shared_ptr<TcpServer> server_; 148 std::unique_ptr<std::thread> scheduler_thread_; 149 std::unique_ptr<std::thread> update_state_thread_; 150 std::unordered_map<NodeCommand, ResponseHandler> handlers_; 151 152 NodeManager node_manager_; 153 154 // In this thread will start a http server. 155 std::unique_ptr<std::thread> restful_thread_; 156 std::shared_ptr<HttpServer> http_server_; 157 158 std::unordered_map<std::string, std::shared_ptr<TcpClient>> connected_nodes_; 159 160 std::unique_ptr<std::thread> client_thread_; 161 std::atomic<bool> is_client_started_; 162 163 std::unique_ptr<LeaderScaler> leader_scaler_; 164 165 std::unordered_map<std::string, OnRequestReceive> callbacks_; 166 167 // Used to persist and obtain metadata information for scheduler. 168 std::unique_ptr<RecoveryBase> scheduler_recovery_; 169 170 // The node id of scale in nodes. 171 std::vector<std::string> scale_in_node_ids_; 172 173 std::unique_ptr<InstanceManager> instance_manager_; 174 }; 175 } // namespace core 176 } // namespace ps 177 } // namespace mindspore 178 179 #endif // MINDSPORE_CCSRC_PS_CORE_SCHEDULER_NODE_H_ 180