• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2020 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_TCP_CLIENT_H_
18 #define MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_TCP_CLIENT_H_
19 
20 #include <event2/event.h>
21 #include <event2/bufferevent.h>
22 #include <event2/thread.h>
23 #include <event2/bufferevent_ssl.h>
24 
25 #include <functional>
26 #include <string>
27 #include <memory>
28 #include <vector>
29 #include <thread>
30 #include <mutex>
31 #include <atomic>
32 #include <condition_variable>
33 
34 #include "ps/core/cluster_config.h"
35 #include "utils/convert_utils_base.h"
36 #include "ps/core/comm_util.h"
37 #include "ps/core/communicator/ssl_client.h"
38 #include "ps/core/communicator/ssl_wrapper.h"
39 #include "ps/constants.h"
40 #include "ps/ps_context.h"
41 #include "ps/core/communicator/tcp_message_handler.h"
42 #include "ps/core/file_configuration.h"
43 
44 namespace mindspore {
45 namespace ps {
46 namespace core {
47 class TcpClient {
48  public:
49   using OnConnected = std::function<void()>;
50   using OnDisconnected = std::function<void()>;
51   using OnRead = std::function<void(const void *, size_t)>;
52   using OnTimeout = std::function<void()>;
53   using OnMessage =
54     std::function<void(const std::shared_ptr<MessageMeta> &, const Protos &, const void *, size_t size)>;
55   using OnTimer = std::function<void()>;
56 
57   explicit TcpClient(const std::string &address, std::uint16_t port, Configuration *const config);
58   virtual ~TcpClient();
59 
60   std::string GetServerAddress() const;
61   void set_disconnected_callback(const OnDisconnected &disconnected);
62   void set_connected_callback(const OnConnected &connected);
63   bool WaitConnected(
64     const uint32_t &connected_timeout = PSContext::instance()->cluster_config().cluster_available_timeout);
65   void Init();
66   void StartWithDelay(int seconds);
67   void Stop();
68   void Start();
69   void StartWithNoBlock();
70   void SetMessageCallback(const OnMessage &cb);
71   bool SendMessage(const CommMessage &message) const;
72   bool SendMessage(const std::shared_ptr<MessageMeta> &meta, const Protos &protos, const void *data, size_t size);
73   void set_timer_callback(const OnTimer &timer);
74   const event_base &eventbase() const;
75 
76  protected:
77   static void SetTcpNoDelay(const evutil_socket_t &fd);
78   static void TimeoutCallback(evutil_socket_t fd, std::int16_t what, void *arg);
79   static void ReadCallback(struct bufferevent *bev, void *ctx);
80   static void EventCallback(struct bufferevent *bev, std::int16_t events, void *ptr);
81   virtual void OnReadHandler(const void *buf, size_t num);
82   static void TimerCallback(evutil_socket_t fd, int16_t event, void *arg);
83   void NotifyConnected();
84   bool EstablishSSL();
85 
86  private:
87   OnMessage message_callback_;
88   TcpMessageHandler message_handler_;
89 
90   OnConnected connected_callback_;
91   OnDisconnected disconnected_callback_;
92   OnRead read_callback_;
93   OnTimeout timeout_callback_;
94   OnTimer on_timer_callback_;
95 
96   static event_base *event_base_;
97   static std::mutex event_base_mutex_;
98   static bool is_started_;
99 
100   std::mutex connection_mutex_;
101   std::condition_variable connection_cond_;
102   event *event_timeout_;
103   bufferevent *buffer_event_;
104 
105   std::string server_address_;
106   std::uint16_t server_port_;
107   std::atomic<bool> is_stop_;
108   std::atomic<bool> is_connected_;
109   // The Configuration file
110   Configuration *config_;
111 };
112 }  // namespace core
113 }  // namespace ps
114 }  // namespace mindspore
115 #endif  // MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_TCP_CLIENT_H_
116