• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2020 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_TCP_CLIENT_H_
18 #define MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_TCP_CLIENT_H_
19 
20 #include <event2/event.h>
21 #include <event2/bufferevent.h>
22 #include <event2/thread.h>
23 #include <event2/bufferevent_ssl.h>
24 #include <functional>
25 #include <string>
26 #include <memory>
27 #include <vector>
28 #include <thread>
29 #include <mutex>
30 #include <atomic>
31 #include <condition_variable>
32 #include "ps/core/cluster_config.h"
33 #include "utils/convert_utils_base.h"
34 #include "ps/core/comm_util.h"
35 #include "ps/core/communicator/ssl_client.h"
36 #include "ps/core/communicator/ssl_wrapper.h"
37 #include "include/backend/distributed/ps/constants.h"
38 #include "include/backend/distributed/ps/ps_context.h"
39 #include "ps/core/communicator/tcp_message_handler.h"
40 #include "ps/core/file_configuration.h"
41 
42 namespace mindspore {
43 namespace ps {
44 namespace core {
45 class TcpClient {
46  public:
47   using OnConnected = std::function<void()>;
48   using OnDisconnected = std::function<void()>;
49   using OnRead = std::function<void(const void *, size_t)>;
50   using OnTimeout = std::function<void()>;
51   using OnMessage =
52     std::function<void(const std::shared_ptr<MessageMeta> &, const Protos &, const void *, size_t size)>;
53   using OnTimer = std::function<void()>;
54 
55   explicit TcpClient(const std::string &address, std::uint16_t port, NodeRole peer_role);
56   virtual ~TcpClient();
57 
58   std::string GetServerAddress() const;
59   void set_disconnected_callback(const OnDisconnected &disconnected);
60   void set_connected_callback(const OnConnected &connected);
61   bool WaitConnected(
62     const uint32_t &connected_timeout = PSContext::instance()->cluster_config().cluster_available_timeout);
63   void Init();
64   void StartWithDelay(int seconds);
65   void Stop();
66   void Start();
67   void StartWithNoBlock();
68   void SetMessageCallback(const OnMessage &cb);
69   bool SendMessage(const CommMessage &message) const;
70   bool SendMessage(const std::shared_ptr<MessageMeta> &meta, const Protos &protos, const void *data, size_t size);
71   void set_timer_callback(const OnTimer &timer);
72   const event_base &eventbase() const;
connection_status()73   int connection_status() { return connection_status_; }
is_started()74   static bool is_started() { return is_started_; }
75 
76  protected:
77   static void SetTcpNoDelay(const evutil_socket_t &fd);
78   static void TimeoutCallback(evutil_socket_t fd, std::int16_t what, void *arg);
79   static void ReadCallback(struct bufferevent *bev, void *ctx);
80   void ReadCallbackInner(struct bufferevent *bev);
81   static void EventCallback(struct bufferevent *bev, std::int16_t events, void *ptr);
82   void EventCallbackInner(struct bufferevent *bev, std::int16_t events);
83   virtual void OnReadHandler(const void *buf, size_t num);
84   static void TimerCallback(evutil_socket_t fd, int16_t event, void *arg);
85   void NotifyConnected();
86   bool EstablishSSL();
87 
88   std::string PeerRoleName() const;
89 
90  private:
91   OnMessage message_callback_;
92   TcpMessageHandler message_handler_;
93 
94   OnConnected connected_callback_;
95   OnDisconnected disconnected_callback_;
96   OnRead read_callback_;
97   OnTimeout timeout_callback_;
98   OnTimer on_timer_callback_;
99 
100   static event_base *event_base_;
101   static std::mutex event_base_mutex_;
102   static bool is_started_;
103 
104   std::mutex connection_mutex_;
105   std::condition_variable connection_cond_;
106   event *event_timeout_;
107   bufferevent *buffer_event_;
108 
109   std::string server_address_;
110   std::uint16_t server_port_;
111   NodeRole peer_role_;
112   // -1:disconnected, 0:connecting, 1:connected
113   std::atomic<int> connection_status_;
114   // The Configuration file
115   Configuration *config_;
116 };
117 }  // namespace core
118 }  // namespace ps
119 }  // namespace mindspore
120 #endif  // MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_TCP_CLIENT_H_
121