//
// async_tcp_client.cpp
// ~~~~~~~~~~~~~~~~~~~~
//
// Copyright (c) 2003-2021 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//

#include <boost/asio/buffer.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/read_until.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/asio/write.hpp>
#include <boost/bind/bind.hpp>
#include <iostream>
#include <string>
#include <utility>

using boost::asio::steady_timer;
using boost::asio::ip::tcp;

//
// This class manages socket timeouts by applying the concept of a deadline.
// Some asynchronous operations are given deadlines by which they must complete.
// Deadlines are enforced by an "actor" that persists for the lifetime of the
// client object:
//
//  +----------------+
//  |                |
//  | check_deadline |<---+
//  |                |    |
//  +----------------+    | async_wait()
//              |         |
//              +---------+
//
// If the deadline actor determines that the deadline has expired, the socket
// is closed and any outstanding operations are consequently cancelled.
//
// Connection establishment involves trying each endpoint in turn until a
// connection is successful, or the available endpoints are exhausted. If the
// deadline actor closes the socket, the connect actor is woken up and moves to
// the next endpoint.
//
//  +---------------+
//  |               |
//  | start_connect |<---+
//  |               |    |
//  +---------------+    |
//           |           |
//  async_-  |    +----------------+
// connect() |    |                |
//           +--->| handle_connect |
//                |                |
//                +----------------+
//                          :
// Once a connection is     :
// made, the connect        :
// actor forks in two -     :
//                          :
// an actor for reading     :       and an actor for
// inbound messages:        :       sending heartbeats:
//                          :
//  +------------+          :          +-------------+
//  |            |<- - - - -+- - - - ->|             |
//  | start_read |                     | start_write |<---+
//  |            |<---+                |             |    |
//  +------------+    |                +-------------+    | async_wait()
//          |         |                        |          |
//  async_- |    +-------------+       async_- |    +--------------+
//   read_- |    |             |       write() |    |              |
//  until() +--->| handle_read |               +--->| handle_write |
//               |             |                    |              |
//               +-------------+                    +--------------+
//
// The input actor reads messages from the socket, where messages are delimited
// by the newline character. The deadline for a complete message is 30 seconds.
//
// The heartbeat actor sends a heartbeat (a message that consists of a single
// newline character) every 10 seconds. In this example, no deadline is applied
// to message sending.
//
84 class client
85 {
86 public:
client(boost::asio::io_context & io_context)87   client(boost::asio::io_context& io_context)
88     : stopped_(false),
89       socket_(io_context),
90       deadline_(io_context),
91       heartbeat_timer_(io_context)
92   {
93   }
94 
95   // Called by the user of the client class to initiate the connection process.
96   // The endpoints will have been obtained using a tcp::resolver.
start(tcp::resolver::results_type endpoints)97   void start(tcp::resolver::results_type endpoints)
98   {
99     // Start the connect actor.
100     endpoints_ = endpoints;
101     start_connect(endpoints_.begin());
102 
103     // Start the deadline actor. You will note that we're not setting any
104     // particular deadline here. Instead, the connect and input actors will
105     // update the deadline prior to each asynchronous operation.
106     deadline_.async_wait(boost::bind(&client::check_deadline, this));
107   }
108 
109   // This function terminates all the actors to shut down the connection. It
110   // may be called by the user of the client class, or by the class itself in
111   // response to graceful termination or an unrecoverable error.
stop()112   void stop()
113   {
114     stopped_ = true;
115     boost::system::error_code ignored_ec;
116     socket_.close(ignored_ec);
117     deadline_.cancel();
118     heartbeat_timer_.cancel();
119   }
120 
121 private:
start_connect(tcp::resolver::results_type::iterator endpoint_iter)122   void start_connect(tcp::resolver::results_type::iterator endpoint_iter)
123   {
124     if (endpoint_iter != endpoints_.end())
125     {
126       std::cout << "Trying " << endpoint_iter->endpoint() << "...\n";
127 
128       // Set a deadline for the connect operation.
129       deadline_.expires_after(boost::asio::chrono::seconds(60));
130 
131       // Start the asynchronous connect operation.
132       socket_.async_connect(endpoint_iter->endpoint(),
133           boost::bind(&client::handle_connect, this,
134             boost::placeholders::_1, endpoint_iter));
135     }
136     else
137     {
138       // There are no more endpoints to try. Shut down the client.
139       stop();
140     }
141   }
142 
handle_connect(const boost::system::error_code & ec,tcp::resolver::results_type::iterator endpoint_iter)143   void handle_connect(const boost::system::error_code& ec,
144       tcp::resolver::results_type::iterator endpoint_iter)
145   {
146     if (stopped_)
147       return;
148 
149     // The async_connect() function automatically opens the socket at the start
150     // of the asynchronous operation. If the socket is closed at this time then
151     // the timeout handler must have run first.
152     if (!socket_.is_open())
153     {
154       std::cout << "Connect timed out\n";
155 
156       // Try the next available endpoint.
157       start_connect(++endpoint_iter);
158     }
159 
160     // Check if the connect operation failed before the deadline expired.
161     else if (ec)
162     {
163       std::cout << "Connect error: " << ec.message() << "\n";
164 
165       // We need to close the socket used in the previous connection attempt
166       // before starting a new one.
167       socket_.close();
168 
169       // Try the next available endpoint.
170       start_connect(++endpoint_iter);
171     }
172 
173     // Otherwise we have successfully established a connection.
174     else
175     {
176       std::cout << "Connected to " << endpoint_iter->endpoint() << "\n";
177 
178       // Start the input actor.
179       start_read();
180 
181       // Start the heartbeat actor.
182       start_write();
183     }
184   }
185 
start_read()186   void start_read()
187   {
188     // Set a deadline for the read operation.
189     deadline_.expires_after(boost::asio::chrono::seconds(30));
190 
191     // Start an asynchronous operation to read a newline-delimited message.
192     boost::asio::async_read_until(socket_,
193         boost::asio::dynamic_buffer(input_buffer_), '\n',
194         boost::bind(&client::handle_read, this,
195           boost::placeholders::_1, boost::placeholders::_2));
196   }
197 
handle_read(const boost::system::error_code & ec,std::size_t n)198   void handle_read(const boost::system::error_code& ec, std::size_t n)
199   {
200     if (stopped_)
201       return;
202 
203     if (!ec)
204     {
205       // Extract the newline-delimited message from the buffer.
206       std::string line(input_buffer_.substr(0, n - 1));
207       input_buffer_.erase(0, n);
208 
209       // Empty messages are heartbeats and so ignored.
210       if (!line.empty())
211       {
212         std::cout << "Received: " << line << "\n";
213       }
214 
215       start_read();
216     }
217     else
218     {
219       std::cout << "Error on receive: " << ec.message() << "\n";
220 
221       stop();
222     }
223   }
224 
start_write()225   void start_write()
226   {
227     if (stopped_)
228       return;
229 
230     // Start an asynchronous operation to send a heartbeat message.
231     boost::asio::async_write(socket_, boost::asio::buffer("\n", 1),
232         boost::bind(&client::handle_write, this, boost::placeholders::_1));
233   }
234 
handle_write(const boost::system::error_code & ec)235   void handle_write(const boost::system::error_code& ec)
236   {
237     if (stopped_)
238       return;
239 
240     if (!ec)
241     {
242       // Wait 10 seconds before sending the next heartbeat.
243       heartbeat_timer_.expires_after(boost::asio::chrono::seconds(10));
244       heartbeat_timer_.async_wait(boost::bind(&client::start_write, this));
245     }
246     else
247     {
248       std::cout << "Error on heartbeat: " << ec.message() << "\n";
249 
250       stop();
251     }
252   }
253 
check_deadline()254   void check_deadline()
255   {
256     if (stopped_)
257       return;
258 
259     // Check whether the deadline has passed. We compare the deadline against
260     // the current time since a new asynchronous operation may have moved the
261     // deadline before this actor had a chance to run.
262     if (deadline_.expiry() <= steady_timer::clock_type::now())
263     {
264       // The deadline has passed. The socket is closed so that any outstanding
265       // asynchronous operations are cancelled.
266       socket_.close();
267 
268       // There is no longer an active deadline. The expiry is set to the
269       // maximum time point so that the actor takes no action until a new
270       // deadline is set.
271       deadline_.expires_at(steady_timer::time_point::max());
272     }
273 
274     // Put the actor back to sleep.
275     deadline_.async_wait(boost::bind(&client::check_deadline, this));
276   }
277 
278 private:
279   bool stopped_;
280   tcp::resolver::results_type endpoints_;
281   tcp::socket socket_;
282   std::string input_buffer_;
283   steady_timer deadline_;
284   steady_timer heartbeat_timer_;
285 };
286 
main(int argc,char * argv[])287 int main(int argc, char* argv[])
288 {
289   try
290   {
291     if (argc != 3)
292     {
293       std::cerr << "Usage: client <host> <port>\n";
294       return 1;
295     }
296 
297     boost::asio::io_context io_context;
298     tcp::resolver r(io_context);
299     client c(io_context);
300 
301     c.start(r.resolve(argv[1], argv[2]));
302 
303     io_context.run();
304   }
305   catch (std::exception& e)
306   {
307     std::cerr << "Exception: " << e.what() << "\n";
308   }
309 
310   return 0;
311 }
312