1 /** 2 * Copyright 2020 Huawei Technologies Co., Ltd 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #ifndef MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_TCP_SERVER_H_ 18 #define MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_TCP_SERVER_H_ 19 20 #include <event2/buffer.h> 21 #include <event2/bufferevent.h> 22 #include <event2/event.h> 23 #include <event2/listener.h> 24 #include <event2/thread.h> 25 #include <event2/bufferevent_ssl.h> 26 #include <exception> 27 #include <functional> 28 #include <iostream> 29 #include <map> 30 #include <memory> 31 #include <mutex> 32 #include <string> 33 #include <vector> 34 #include <thread> 35 #include <atomic> 36 #include <utility> 37 38 #include "ps/core/communicator/tcp_message_handler.h" 39 #include "ps/core/communicator/ssl_wrapper.h" 40 #include "ps/core/cluster_config.h" 41 #include "utils/convert_utils_base.h" 42 #include "ps/core/comm_util.h" 43 #include "include/backend/distributed/ps/constants.h" 44 #include "include/backend/distributed/ps/ps_context.h" 45 #include "ps/core/file_configuration.h" 46 47 namespace mindspore { 48 namespace ps { 49 namespace core { 50 class TcpServer; 51 class TcpConnection { 52 public: TcpConnection(struct bufferevent * bev,const evutil_socket_t & fd,TcpServer * server)53 explicit TcpConnection(struct bufferevent *bev, const evutil_socket_t &fd, TcpServer *server) 54 : buffer_event_(bev), fd_(fd), server_(server) { 55 MS_LOG(WARNING) << "TcpConnection is constructed! fd is " << fd; 56 } 57 TcpConnection(const TcpConnection &); 58 virtual ~TcpConnection(); 59 60 using Callback = std::function<void(const std::shared_ptr<CommMessage>)>; 61 62 void InitConnection(const messageReceive &callback); 63 void SendMessage(const void *buffer, size_t num) const; 64 bool SendMessage(const std::shared_ptr<CommMessage> &message) const; 65 bool SendMessage(const std::shared_ptr<MessageMeta> &meta, const Protos &protos, const void *data, size_t size) const; 66 void OnReadHandler(const void *buffer, size_t numBytes); 67 const TcpServer *GetServer() const; 68 const evutil_socket_t &GetFd() const; 69 void set_callback(const Callback &callback); 70 71 protected: 72 struct bufferevent *buffer_event_; 73 evutil_socket_t fd_; 74 TcpServer *server_; 75 TcpMessageHandler tcp_message_handler_; 76 Callback callback_; 77 }; 78 79 using OnServerReceiveMessage = 80 std::function<void(const std::shared_ptr<TcpConnection> &conn, const std::shared_ptr<MessageMeta> &meta, 81 const Protos &protos, const void *data, size_t size)>; 82 83 class TcpServer { 84 public: 85 using OnConnected = std::function<void(const TcpServer &, const TcpConnection &)>; 86 using OnDisconnected = std::function<void(const TcpServer &, const TcpConnection &)>; 87 using OnAccepted = std::function<std::shared_ptr<TcpConnection>(const TcpServer &)>; 88 using OnTimerOnce = std::function<void(const TcpServer &)>; 89 using OnTimer = std::function<void()>; 90 91 TcpServer(const std::string &address, std::uint16_t port, Configuration *const config, 92 const std::pair<uint32_t, uint32_t> &port_range = {}); 93 TcpServer(const TcpServer &server); 94 virtual ~TcpServer(); 95 96 void SetServerCallback(const OnConnected &client_conn, const OnDisconnected &client_disconn, 97 const OnAccepted &client_accept); 98 void Init(); 99 void Start(); 100 void Stop(); 101 void SendToAllClients(const char *data, size_t len); 102 void AddConnection(const evutil_socket_t &fd, std::shared_ptr<TcpConnection> connection); 103 void RemoveConnection(const evutil_socket_t &fd); 104 std::shared_ptr<TcpConnection> &GetConnectionByFd(const evutil_socket_t &fd); 105 OnServerReceiveMessage GetServerReceive() const; 106 void SetMessageCallback(const OnServerReceiveMessage &cb); 107 bool SendMessage(const std::shared_ptr<TcpConnection> &conn, const std::shared_ptr<CommMessage> &message); 108 bool SendMessage(const std::shared_ptr<TcpConnection> &conn, const std::shared_ptr<MessageMeta> &meta, 109 const Protos &protos, const void *data, size_t sizee); 110 void SendMessage(const std::shared_ptr<CommMessage> &message); 111 uint16_t BoundPort() const; 112 std::string BoundIp() const; 113 int ConnectionNum() const; 114 const std::map<evutil_socket_t, std::shared_ptr<TcpConnection>> &Connections() const; 115 116 protected: 117 static void ListenerCallback(struct evconnlistener *listener, evutil_socket_t socket, struct sockaddr *saddr, 118 int socklen, void *server); 119 static void ListenerCallbackInner(evutil_socket_t socket, struct sockaddr *saddr, void *server); 120 static void SignalCallback(evutil_socket_t sig, std::int16_t events, void *server); 121 static void SignalCallbackInner(void *server); 122 static void ReadCallback(struct bufferevent *, void *connection); 123 static void ReadCallbackInner(struct bufferevent *, void *connection); 124 static void EventCallback(struct bufferevent *, std::int16_t events, void *server); 125 static void EventCallbackInner(struct bufferevent *, std::int16_t events, void *server); 126 static void SetTcpNoDelay(const evutil_socket_t &fd); 127 std::shared_ptr<TcpConnection> onCreateConnection(struct bufferevent *bev, const evutil_socket_t &fd); 128 129 struct event_base *base_; 130 struct event *signal_event_; 131 struct evconnlistener *listener_; 132 std::string server_address_; 133 std::uint16_t server_port_; 134 std::atomic<bool> is_stop_; 135 136 std::map<evutil_socket_t, std::shared_ptr<TcpConnection>> connections_; 137 OnConnected client_connection_; 138 OnDisconnected client_disconnection_; 139 OnAccepted client_accept_; 140 std::mutex connection_mutex_; 141 OnServerReceiveMessage message_callback_; 142 // The Configuration file 143 Configuration *config_; 144 int64_t max_connection_; 145 std::pair<uint32_t, uint32_t> port_range_; 146 }; 147 } // namespace core 148 } // namespace ps 149 } // namespace mindspore 150 #endif // MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_TCP_SERVER_H_ 151