1 /** 2 * Copyright 2020 Huawei Technologies Co., Ltd 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #ifndef MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_TCP_CLIENT_H_ 18 #define MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_TCP_CLIENT_H_ 19 20 #include <event2/event.h> 21 #include <event2/bufferevent.h> 22 #include <event2/thread.h> 23 #include <event2/bufferevent_ssl.h> 24 #include <functional> 25 #include <string> 26 #include <memory> 27 #include <vector> 28 #include <thread> 29 #include <mutex> 30 #include <atomic> 31 #include <condition_variable> 32 #include "ps/core/cluster_config.h" 33 #include "utils/convert_utils_base.h" 34 #include "ps/core/comm_util.h" 35 #include "ps/core/communicator/ssl_client.h" 36 #include "ps/core/communicator/ssl_wrapper.h" 37 #include "include/backend/distributed/ps/constants.h" 38 #include "include/backend/distributed/ps/ps_context.h" 39 #include "ps/core/communicator/tcp_message_handler.h" 40 #include "ps/core/file_configuration.h" 41 42 namespace mindspore { 43 namespace ps { 44 namespace core { 45 class TcpClient { 46 public: 47 using OnConnected = std::function<void()>; 48 using OnDisconnected = std::function<void()>; 49 using OnRead = std::function<void(const void *, size_t)>; 50 using OnTimeout = std::function<void()>; 51 using OnMessage = 52 std::function<void(const std::shared_ptr<MessageMeta> &, const Protos &, const void *, size_t size)>; 53 using OnTimer = std::function<void()>; 54 55 explicit TcpClient(const std::string &address, std::uint16_t port, NodeRole peer_role); 56 virtual ~TcpClient(); 57 58 std::string GetServerAddress() const; 59 void set_disconnected_callback(const OnDisconnected &disconnected); 60 void set_connected_callback(const OnConnected &connected); 61 bool WaitConnected( 62 const uint32_t &connected_timeout = PSContext::instance()->cluster_config().cluster_available_timeout); 63 void Init(); 64 void StartWithDelay(int seconds); 65 void Stop(); 66 void Start(); 67 void StartWithNoBlock(); 68 void SetMessageCallback(const OnMessage &cb); 69 bool SendMessage(const CommMessage &message) const; 70 bool SendMessage(const std::shared_ptr<MessageMeta> &meta, const Protos &protos, const void *data, size_t size); 71 void set_timer_callback(const OnTimer &timer); 72 const event_base &eventbase() const; connection_status()73 int connection_status() { return connection_status_; } is_started()74 static bool is_started() { return is_started_; } 75 76 protected: 77 static void SetTcpNoDelay(const evutil_socket_t &fd); 78 static void TimeoutCallback(evutil_socket_t fd, std::int16_t what, void *arg); 79 static void ReadCallback(struct bufferevent *bev, void *ctx); 80 void ReadCallbackInner(struct bufferevent *bev); 81 static void EventCallback(struct bufferevent *bev, std::int16_t events, void *ptr); 82 void EventCallbackInner(struct bufferevent *bev, std::int16_t events); 83 virtual void OnReadHandler(const void *buf, size_t num); 84 static void TimerCallback(evutil_socket_t fd, int16_t event, void *arg); 85 void NotifyConnected(); 86 bool EstablishSSL(); 87 88 std::string PeerRoleName() const; 89 90 private: 91 OnMessage message_callback_; 92 TcpMessageHandler message_handler_; 93 94 OnConnected connected_callback_; 95 OnDisconnected disconnected_callback_; 96 OnRead read_callback_; 97 OnTimeout timeout_callback_; 98 OnTimer on_timer_callback_; 99 100 static event_base *event_base_; 101 static std::mutex event_base_mutex_; 102 static bool is_started_; 103 104 std::mutex connection_mutex_; 105 std::condition_variable connection_cond_; 106 event *event_timeout_; 107 bufferevent *buffer_event_; 108 109 std::string server_address_; 110 std::uint16_t server_port_; 111 NodeRole peer_role_; 112 // -1:disconnected, 0:connecting, 1:connected 113 std::atomic<int> connection_status_; 114 // The Configuration file 115 Configuration *config_; 116 }; 117 } // namespace core 118 } // namespace ps 119 } // namespace mindspore 120 #endif // MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_TCP_CLIENT_H_ 121