• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1  //
2  // async_tcp_client.cpp
3  // ~~~~~~~~~~~~~~~~~~~~
4  //
5  // Copyright (c) 2003-2020 Christopher M. Kohlhoff (chris at kohlhoff dot com)
6  //
7  // Distributed under the Boost Software License, Version 1.0. (See accompanying
8  // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
9  //
10  
11  #include <boost/asio/buffer.hpp>
12  #include <boost/asio/io_context.hpp>
13  #include <boost/asio/ip/tcp.hpp>
14  #include <boost/asio/read_until.hpp>
15  #include <boost/asio/steady_timer.hpp>
16  #include <boost/asio/write.hpp>
17  #include <boost/bind/bind.hpp>
18  #include <iostream>
19  #include <string>
20  
21  using boost::asio::steady_timer;
22  using boost::asio::ip::tcp;
23  
24  //
25  // This class manages socket timeouts by applying the concept of a deadline.
26  // Some asynchronous operations are given deadlines by which they must complete.
27  // Deadlines are enforced by an "actor" that persists for the lifetime of the
28  // client object:
29  //
30  //  +----------------+
31  //  |                |
32  //  | check_deadline |<---+
33  //  |                |    |
34  //  +----------------+    | async_wait()
35  //              |         |
36  //              +---------+
37  //
38  // If the deadline actor determines that the deadline has expired, the socket
39  // is closed and any outstanding operations are consequently cancelled.
40  //
41  // Connection establishment involves trying each endpoint in turn until a
42  // connection is successful, or the available endpoints are exhausted. If the
43  // deadline actor closes the socket, the connect actor is woken up and moves to
44  // the next endpoint.
45  //
46  //  +---------------+
47  //  |               |
48  //  | start_connect |<---+
49  //  |               |    |
50  //  +---------------+    |
51  //           |           |
52  //  async_-  |    +----------------+
53  // connect() |    |                |
54  //           +--->| handle_connect |
55  //                |                |
56  //                +----------------+
57  //                          :
58  // Once a connection is     :
59  // made, the connect        :
60  // actor forks in two -     :
61  //                          :
62  // an actor for reading     :       and an actor for
63  // inbound messages:        :       sending heartbeats:
64  //                          :
65  //  +------------+          :          +-------------+
66  //  |            |<- - - - -+- - - - ->|             |
67  //  | start_read |                     | start_write |<---+
68  //  |            |<---+                |             |    |
69  //  +------------+    |                +-------------+    | async_wait()
70  //          |         |                        |          |
71  //  async_- |    +-------------+       async_- |    +--------------+
72  //   read_- |    |             |       write() |    |              |
73  //  until() +--->| handle_read |               +--->| handle_write |
74  //               |             |                    |              |
75  //               +-------------+                    +--------------+
76  //
77  // The input actor reads messages from the socket, where messages are delimited
78  // by the newline character. The deadline for a complete message is 30 seconds.
79  //
80  // The heartbeat actor sends a heartbeat (a message that consists of a single
81  // newline character) every 10 seconds. In this example, no deadline is applied
82  // to message sending.
83  //
84  class client
85  {
86  public:
client(boost::asio::io_context & io_context)87    client(boost::asio::io_context& io_context)
88      : stopped_(false),
89        socket_(io_context),
90        deadline_(io_context),
91        heartbeat_timer_(io_context)
92    {
93    }
94  
95    // Called by the user of the client class to initiate the connection process.
96    // The endpoints will have been obtained using a tcp::resolver.
start(tcp::resolver::results_type endpoints)97    void start(tcp::resolver::results_type endpoints)
98    {
99      // Start the connect actor.
100      endpoints_ = endpoints;
101      start_connect(endpoints_.begin());
102  
103      // Start the deadline actor. You will note that we're not setting any
104      // particular deadline here. Instead, the connect and input actors will
105      // update the deadline prior to each asynchronous operation.
106      deadline_.async_wait(boost::bind(&client::check_deadline, this));
107    }
108  
109    // This function terminates all the actors to shut down the connection. It
110    // may be called by the user of the client class, or by the class itself in
111    // response to graceful termination or an unrecoverable error.
stop()112    void stop()
113    {
114      stopped_ = true;
115      boost::system::error_code ignored_ec;
116      socket_.close(ignored_ec);
117      deadline_.cancel();
118      heartbeat_timer_.cancel();
119    }
120  
121  private:
start_connect(tcp::resolver::results_type::iterator endpoint_iter)122    void start_connect(tcp::resolver::results_type::iterator endpoint_iter)
123    {
124      if (endpoint_iter != endpoints_.end())
125      {
126        std::cout << "Trying " << endpoint_iter->endpoint() << "...\n";
127  
128        // Set a deadline for the connect operation.
129        deadline_.expires_after(boost::asio::chrono::seconds(60));
130  
131        // Start the asynchronous connect operation.
132        socket_.async_connect(endpoint_iter->endpoint(),
133            boost::bind(&client::handle_connect, this,
134              boost::placeholders::_1, endpoint_iter));
135      }
136      else
137      {
138        // There are no more endpoints to try. Shut down the client.
139        stop();
140      }
141    }
142  
handle_connect(const boost::system::error_code & ec,tcp::resolver::results_type::iterator endpoint_iter)143    void handle_connect(const boost::system::error_code& ec,
144        tcp::resolver::results_type::iterator endpoint_iter)
145    {
146      if (stopped_)
147        return;
148  
149      // The async_connect() function automatically opens the socket at the start
150      // of the asynchronous operation. If the socket is closed at this time then
151      // the timeout handler must have run first.
152      if (!socket_.is_open())
153      {
154        std::cout << "Connect timed out\n";
155  
156        // Try the next available endpoint.
157        start_connect(++endpoint_iter);
158      }
159  
160      // Check if the connect operation failed before the deadline expired.
161      else if (ec)
162      {
163        std::cout << "Connect error: " << ec.message() << "\n";
164  
165        // We need to close the socket used in the previous connection attempt
166        // before starting a new one.
167        socket_.close();
168  
169        // Try the next available endpoint.
170        start_connect(++endpoint_iter);
171      }
172  
173      // Otherwise we have successfully established a connection.
174      else
175      {
176        std::cout << "Connected to " << endpoint_iter->endpoint() << "\n";
177  
178        // Start the input actor.
179        start_read();
180  
181        // Start the heartbeat actor.
182        start_write();
183      }
184    }
185  
start_read()186    void start_read()
187    {
188      // Set a deadline for the read operation.
189      deadline_.expires_after(boost::asio::chrono::seconds(30));
190  
191      // Start an asynchronous operation to read a newline-delimited message.
192      boost::asio::async_read_until(socket_,
193          boost::asio::dynamic_buffer(input_buffer_), '\n',
194          boost::bind(&client::handle_read, this,
195            boost::placeholders::_1, boost::placeholders::_2));
196    }
197  
handle_read(const boost::system::error_code & ec,std::size_t n)198    void handle_read(const boost::system::error_code& ec, std::size_t n)
199    {
200      if (stopped_)
201        return;
202  
203      if (!ec)
204      {
205        // Extract the newline-delimited message from the buffer.
206        std::string line(input_buffer_.substr(0, n - 1));
207        input_buffer_.erase(0, n);
208  
209        // Empty messages are heartbeats and so ignored.
210        if (!line.empty())
211        {
212          std::cout << "Received: " << line << "\n";
213        }
214  
215        start_read();
216      }
217      else
218      {
219        std::cout << "Error on receive: " << ec.message() << "\n";
220  
221        stop();
222      }
223    }
224  
start_write()225    void start_write()
226    {
227      if (stopped_)
228        return;
229  
230      // Start an asynchronous operation to send a heartbeat message.
231      boost::asio::async_write(socket_, boost::asio::buffer("\n", 1),
232          boost::bind(&client::handle_write, this, boost::placeholders::_1));
233    }
234  
handle_write(const boost::system::error_code & ec)235    void handle_write(const boost::system::error_code& ec)
236    {
237      if (stopped_)
238        return;
239  
240      if (!ec)
241      {
242        // Wait 10 seconds before sending the next heartbeat.
243        heartbeat_timer_.expires_after(boost::asio::chrono::seconds(10));
244        heartbeat_timer_.async_wait(boost::bind(&client::start_write, this));
245      }
246      else
247      {
248        std::cout << "Error on heartbeat: " << ec.message() << "\n";
249  
250        stop();
251      }
252    }
253  
check_deadline()254    void check_deadline()
255    {
256      if (stopped_)
257        return;
258  
259      // Check whether the deadline has passed. We compare the deadline against
260      // the current time since a new asynchronous operation may have moved the
261      // deadline before this actor had a chance to run.
262      if (deadline_.expiry() <= steady_timer::clock_type::now())
263      {
264        // The deadline has passed. The socket is closed so that any outstanding
265        // asynchronous operations are cancelled.
266        socket_.close();
267  
268        // There is no longer an active deadline. The expiry is set to the
269        // maximum time point so that the actor takes no action until a new
270        // deadline is set.
271        deadline_.expires_at(steady_timer::time_point::max());
272      }
273  
274      // Put the actor back to sleep.
275      deadline_.async_wait(boost::bind(&client::check_deadline, this));
276    }
277  
278  private:
279    bool stopped_;
280    tcp::resolver::results_type endpoints_;
281    tcp::socket socket_;
282    std::string input_buffer_;
283    steady_timer deadline_;
284    steady_timer heartbeat_timer_;
285  };
286  
main(int argc,char * argv[])287  int main(int argc, char* argv[])
288  {
289    try
290    {
291      if (argc != 3)
292      {
293        std::cerr << "Usage: client <host> <port>\n";
294        return 1;
295      }
296  
297      boost::asio::io_context io_context;
298      tcp::resolver r(io_context);
299      client c(io_context);
300  
301      c.start(r.resolve(argv[1], argv[2]));
302  
303      io_context.run();
304    }
305    catch (std::exception& e)
306    {
307      std::cerr << "Exception: " << e.what() << "\n";
308    }
309  
310    return 0;
311  }
312