//
// async_tcp_client.cpp
// ~~~~~~~~~~~~~~~~~~~~
//
// Copyright (c) 2003-2021 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//

#include <boost/asio/buffer.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/read_until.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/asio/write.hpp>
#include <chrono>
#include <cstddef>
#include <exception>
#include <functional>
#include <iostream>
#include <string>

using boost::asio::steady_timer;
using boost::asio::ip::tcp;
using std::placeholders::_1;
using std::placeholders::_2;

26 //
27 // This class manages socket timeouts by applying the concept of a deadline.
28 // Some asynchronous operations are given deadlines by which they must complete.
29 // Deadlines are enforced by an "actor" that persists for the lifetime of the
30 // client object:
31 //
32 //  +----------------+
33 //  |                |
34 //  | check_deadline |<---+
35 //  |                |    |
36 //  +----------------+    | async_wait()
37 //              |         |
38 //              +---------+
39 //
40 // If the deadline actor determines that the deadline has expired, the socket
41 // is closed and any outstanding operations are consequently cancelled.
42 //
43 // Connection establishment involves trying each endpoint in turn until a
44 // connection is successful, or the available endpoints are exhausted. If the
45 // deadline actor closes the socket, the connect actor is woken up and moves to
46 // the next endpoint.
47 //
48 //  +---------------+
49 //  |               |
50 //  | start_connect |<---+
51 //  |               |    |
52 //  +---------------+    |
53 //           |           |
54 //  async_-  |    +----------------+
55 // connect() |    |                |
56 //           +--->| handle_connect |
57 //                |                |
58 //                +----------------+
59 //                          :
60 // Once a connection is     :
61 // made, the connect        :
62 // actor forks in two -     :
63 //                          :
64 // an actor for reading     :       and an actor for
65 // inbound messages:        :       sending heartbeats:
66 //                          :
67 //  +------------+          :          +-------------+
68 //  |            |<- - - - -+- - - - ->|             |
69 //  | start_read |                     | start_write |<---+
70 //  |            |<---+                |             |    |
71 //  +------------+    |                +-------------+    | async_wait()
72 //          |         |                        |          |
73 //  async_- |    +-------------+       async_- |    +--------------+
74 //   read_- |    |             |       write() |    |              |
75 //  until() +--->| handle_read |               +--->| handle_write |
76 //               |             |                    |              |
77 //               +-------------+                    +--------------+
78 //
79 // The input actor reads messages from the socket, where messages are delimited
80 // by the newline character. The deadline for a complete message is 30 seconds.
81 //
82 // The heartbeat actor sends a heartbeat (a message that consists of a single
83 // newline character) every 10 seconds. In this example, no deadline is applied
84 // to message sending.
85 //
86 class client
87 {
88 public:
client(boost::asio::io_context & io_context)89   client(boost::asio::io_context& io_context)
90     : socket_(io_context),
91       deadline_(io_context),
92       heartbeat_timer_(io_context)
93   {
94   }
95 
96   // Called by the user of the client class to initiate the connection process.
97   // The endpoints will have been obtained using a tcp::resolver.
start(tcp::resolver::results_type endpoints)98   void start(tcp::resolver::results_type endpoints)
99   {
100     // Start the connect actor.
101     endpoints_ = endpoints;
102     start_connect(endpoints_.begin());
103 
104     // Start the deadline actor. You will note that we're not setting any
105     // particular deadline here. Instead, the connect and input actors will
106     // update the deadline prior to each asynchronous operation.
107     deadline_.async_wait(std::bind(&client::check_deadline, this));
108   }
109 
110   // This function terminates all the actors to shut down the connection. It
111   // may be called by the user of the client class, or by the class itself in
112   // response to graceful termination or an unrecoverable error.
stop()113   void stop()
114   {
115     stopped_ = true;
116     boost::system::error_code ignored_error;
117     socket_.close(ignored_error);
118     deadline_.cancel();
119     heartbeat_timer_.cancel();
120   }
121 
122 private:
start_connect(tcp::resolver::results_type::iterator endpoint_iter)123   void start_connect(tcp::resolver::results_type::iterator endpoint_iter)
124   {
125     if (endpoint_iter != endpoints_.end())
126     {
127       std::cout << "Trying " << endpoint_iter->endpoint() << "...\n";
128 
129       // Set a deadline for the connect operation.
130       deadline_.expires_after(std::chrono::seconds(60));
131 
132       // Start the asynchronous connect operation.
133       socket_.async_connect(endpoint_iter->endpoint(),
134           std::bind(&client::handle_connect,
135             this, _1, endpoint_iter));
136     }
137     else
138     {
139       // There are no more endpoints to try. Shut down the client.
140       stop();
141     }
142   }
143 
handle_connect(const boost::system::error_code & error,tcp::resolver::results_type::iterator endpoint_iter)144   void handle_connect(const boost::system::error_code& error,
145       tcp::resolver::results_type::iterator endpoint_iter)
146   {
147     if (stopped_)
148       return;
149 
150     // The async_connect() function automatically opens the socket at the start
151     // of the asynchronous operation. If the socket is closed at this time then
152     // the timeout handler must have run first.
153     if (!socket_.is_open())
154     {
155       std::cout << "Connect timed out\n";
156 
157       // Try the next available endpoint.
158       start_connect(++endpoint_iter);
159     }
160 
161     // Check if the connect operation failed before the deadline expired.
162     else if (error)
163     {
164       std::cout << "Connect error: " << error.message() << "\n";
165 
166       // We need to close the socket used in the previous connection attempt
167       // before starting a new one.
168       socket_.close();
169 
170       // Try the next available endpoint.
171       start_connect(++endpoint_iter);
172     }
173 
174     // Otherwise we have successfully established a connection.
175     else
176     {
177       std::cout << "Connected to " << endpoint_iter->endpoint() << "\n";
178 
179       // Start the input actor.
180       start_read();
181 
182       // Start the heartbeat actor.
183       start_write();
184     }
185   }
186 
start_read()187   void start_read()
188   {
189     // Set a deadline for the read operation.
190     deadline_.expires_after(std::chrono::seconds(30));
191 
192     // Start an asynchronous operation to read a newline-delimited message.
193     boost::asio::async_read_until(socket_,
194         boost::asio::dynamic_buffer(input_buffer_), '\n',
195         std::bind(&client::handle_read, this, _1, _2));
196   }
197 
handle_read(const boost::system::error_code & error,std::size_t n)198   void handle_read(const boost::system::error_code& error, std::size_t n)
199   {
200     if (stopped_)
201       return;
202 
203     if (!error)
204     {
205       // Extract the newline-delimited message from the buffer.
206       std::string line(input_buffer_.substr(0, n - 1));
207       input_buffer_.erase(0, n);
208 
209       // Empty messages are heartbeats and so ignored.
210       if (!line.empty())
211       {
212         std::cout << "Received: " << line << "\n";
213       }
214 
215       start_read();
216     }
217     else
218     {
219       std::cout << "Error on receive: " << error.message() << "\n";
220 
221       stop();
222     }
223   }
224 
start_write()225   void start_write()
226   {
227     if (stopped_)
228       return;
229 
230     // Start an asynchronous operation to send a heartbeat message.
231     boost::asio::async_write(socket_, boost::asio::buffer("\n", 1),
232         std::bind(&client::handle_write, this, _1));
233   }
234 
handle_write(const boost::system::error_code & error)235   void handle_write(const boost::system::error_code& error)
236   {
237     if (stopped_)
238       return;
239 
240     if (!error)
241     {
242       // Wait 10 seconds before sending the next heartbeat.
243       heartbeat_timer_.expires_after(std::chrono::seconds(10));
244       heartbeat_timer_.async_wait(std::bind(&client::start_write, this));
245     }
246     else
247     {
248       std::cout << "Error on heartbeat: " << error.message() << "\n";
249 
250       stop();
251     }
252   }
253 
check_deadline()254   void check_deadline()
255   {
256     if (stopped_)
257       return;
258 
259     // Check whether the deadline has passed. We compare the deadline against
260     // the current time since a new asynchronous operation may have moved the
261     // deadline before this actor had a chance to run.
262     if (deadline_.expiry() <= steady_timer::clock_type::now())
263     {
264       // The deadline has passed. The socket is closed so that any outstanding
265       // asynchronous operations are cancelled.
266       socket_.close();
267 
268       // There is no longer an active deadline. The expiry is set to the
269       // maximum time point so that the actor takes no action until a new
270       // deadline is set.
271       deadline_.expires_at(steady_timer::time_point::max());
272     }
273 
274     // Put the actor back to sleep.
275     deadline_.async_wait(std::bind(&client::check_deadline, this));
276   }
277 
278 private:
279   bool stopped_ = false;
280   tcp::resolver::results_type endpoints_;
281   tcp::socket socket_;
282   std::string input_buffer_;
283   steady_timer deadline_;
284   steady_timer heartbeat_timer_;
285 };
286 
main(int argc,char * argv[])287 int main(int argc, char* argv[])
288 {
289   try
290   {
291     if (argc != 3)
292     {
293       std::cerr << "Usage: client <host> <port>\n";
294       return 1;
295     }
296 
297     boost::asio::io_context io_context;
298     tcp::resolver r(io_context);
299     client c(io_context);
300 
301     c.start(r.resolve(argv[1], argv[2]));
302 
303     io_context.run();
304   }
305   catch (std::exception& e)
306   {
307     std::cerr << "Exception: " << e.what() << "\n";
308   }
309 
310   return 0;
311 }
312