• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2020 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_TCP_SERVER_H_
18 #define MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_TCP_SERVER_H_
19 
20 #include <event2/buffer.h>
21 #include <event2/bufferevent.h>
22 #include <event2/event.h>
23 #include <event2/listener.h>
24 #include <event2/thread.h>
25 #include <event2/bufferevent_ssl.h>
26 #include <exception>
27 #include <functional>
28 #include <iostream>
29 #include <map>
30 #include <memory>
31 #include <mutex>
32 #include <string>
33 #include <vector>
34 #include <thread>
35 #include <atomic>
36 #include <utility>
37 
38 #include "ps/core/communicator/tcp_message_handler.h"
39 #include "ps/core/communicator/ssl_wrapper.h"
40 #include "ps/core/cluster_config.h"
41 #include "utils/convert_utils_base.h"
42 #include "ps/core/comm_util.h"
43 #include "include/backend/distributed/ps/constants.h"
44 #include "include/backend/distributed/ps/ps_context.h"
45 #include "ps/core/file_configuration.h"
46 
47 namespace mindspore {
48 namespace ps {
49 namespace core {
50 class TcpServer;
51 class TcpConnection {
52  public:
TcpConnection(struct bufferevent * bev,const evutil_socket_t & fd,TcpServer * server)53   explicit TcpConnection(struct bufferevent *bev, const evutil_socket_t &fd, TcpServer *server)
54       : buffer_event_(bev), fd_(fd), server_(server) {
55     MS_LOG(WARNING) << "TcpConnection is constructed! fd is " << fd;
56   }
57   TcpConnection(const TcpConnection &);
58   virtual ~TcpConnection();
59 
60   using Callback = std::function<void(const std::shared_ptr<CommMessage>)>;
61 
62   void InitConnection(const messageReceive &callback);
63   void SendMessage(const void *buffer, size_t num) const;
64   bool SendMessage(const std::shared_ptr<CommMessage> &message) const;
65   bool SendMessage(const std::shared_ptr<MessageMeta> &meta, const Protos &protos, const void *data, size_t size) const;
66   void OnReadHandler(const void *buffer, size_t numBytes);
67   const TcpServer *GetServer() const;
68   const evutil_socket_t &GetFd() const;
69   void set_callback(const Callback &callback);
70 
71  protected:
72   struct bufferevent *buffer_event_;
73   evutil_socket_t fd_;
74   TcpServer *server_;
75   TcpMessageHandler tcp_message_handler_;
76   Callback callback_;
77 };
78 
79 using OnServerReceiveMessage =
80   std::function<void(const std::shared_ptr<TcpConnection> &conn, const std::shared_ptr<MessageMeta> &meta,
81                      const Protos &protos, const void *data, size_t size)>;
82 
83 class TcpServer {
84  public:
85   using OnConnected = std::function<void(const TcpServer &, const TcpConnection &)>;
86   using OnDisconnected = std::function<void(const TcpServer &, const TcpConnection &)>;
87   using OnAccepted = std::function<std::shared_ptr<TcpConnection>(const TcpServer &)>;
88   using OnTimerOnce = std::function<void(const TcpServer &)>;
89   using OnTimer = std::function<void()>;
90 
91   TcpServer(const std::string &address, std::uint16_t port, Configuration *const config,
92             const std::pair<uint32_t, uint32_t> &port_range = {});
93   TcpServer(const TcpServer &server);
94   virtual ~TcpServer();
95 
96   void SetServerCallback(const OnConnected &client_conn, const OnDisconnected &client_disconn,
97                          const OnAccepted &client_accept);
98   void Init();
99   void Start();
100   void Stop();
101   void SendToAllClients(const char *data, size_t len);
102   void AddConnection(const evutil_socket_t &fd, std::shared_ptr<TcpConnection> connection);
103   void RemoveConnection(const evutil_socket_t &fd);
104   std::shared_ptr<TcpConnection> &GetConnectionByFd(const evutil_socket_t &fd);
105   OnServerReceiveMessage GetServerReceive() const;
106   void SetMessageCallback(const OnServerReceiveMessage &cb);
107   bool SendMessage(const std::shared_ptr<TcpConnection> &conn, const std::shared_ptr<CommMessage> &message);
108   bool SendMessage(const std::shared_ptr<TcpConnection> &conn, const std::shared_ptr<MessageMeta> &meta,
109                    const Protos &protos, const void *data, size_t sizee);
110   void SendMessage(const std::shared_ptr<CommMessage> &message);
111   uint16_t BoundPort() const;
112   std::string BoundIp() const;
113   int ConnectionNum() const;
114   const std::map<evutil_socket_t, std::shared_ptr<TcpConnection>> &Connections() const;
115 
116  protected:
117   static void ListenerCallback(struct evconnlistener *listener, evutil_socket_t socket, struct sockaddr *saddr,
118                                int socklen, void *server);
119   static void ListenerCallbackInner(evutil_socket_t socket, struct sockaddr *saddr, void *server);
120   static void SignalCallback(evutil_socket_t sig, std::int16_t events, void *server);
121   static void SignalCallbackInner(void *server);
122   static void ReadCallback(struct bufferevent *, void *connection);
123   static void ReadCallbackInner(struct bufferevent *, void *connection);
124   static void EventCallback(struct bufferevent *, std::int16_t events, void *server);
125   static void EventCallbackInner(struct bufferevent *, std::int16_t events, void *server);
126   static void SetTcpNoDelay(const evutil_socket_t &fd);
127   std::shared_ptr<TcpConnection> onCreateConnection(struct bufferevent *bev, const evutil_socket_t &fd);
128 
129   struct event_base *base_;
130   struct event *signal_event_;
131   struct evconnlistener *listener_;
132   std::string server_address_;
133   std::uint16_t server_port_;
134   std::atomic<bool> is_stop_;
135 
136   std::map<evutil_socket_t, std::shared_ptr<TcpConnection>> connections_;
137   OnConnected client_connection_;
138   OnDisconnected client_disconnection_;
139   OnAccepted client_accept_;
140   std::mutex connection_mutex_;
141   OnServerReceiveMessage message_callback_;
142   // The Configuration file
143   Configuration *config_;
144   int64_t max_connection_;
145   std::pair<uint32_t, uint32_t> port_range_;
146 };
147 }  // namespace core
148 }  // namespace ps
149 }  // namespace mindspore
150 #endif  // MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_TCP_SERVER_H_
151