1 // 2 // async_tcp_client.cpp 3 // ~~~~~~~~~~~~~~~~~~~~ 4 // 5 // Copyright (c) 2003-2020 Christopher M. Kohlhoff (chris at kohlhoff dot com) 6 // 7 // Distributed under the Boost Software License, Version 1.0. (See accompanying 8 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 9 // 10 11 #include <boost/asio/buffer.hpp> 12 #include <boost/asio/io_context.hpp> 13 #include <boost/asio/ip/tcp.hpp> 14 #include <boost/asio/read_until.hpp> 15 #include <boost/asio/steady_timer.hpp> 16 #include <boost/asio/write.hpp> 17 #include <boost/bind/bind.hpp> 18 #include <iostream> 19 #include <string> 20 21 using boost::asio::steady_timer; 22 using boost::asio::ip::tcp; 23 24 // 25 // This class manages socket timeouts by applying the concept of a deadline. 26 // Some asynchronous operations are given deadlines by which they must complete. 27 // Deadlines are enforced by an "actor" that persists for the lifetime of the 28 // client object: 29 // 30 // +----------------+ 31 // | | 32 // | check_deadline |<---+ 33 // | | | 34 // +----------------+ | async_wait() 35 // | | 36 // +---------+ 37 // 38 // If the deadline actor determines that the deadline has expired, the socket 39 // is closed and any outstanding operations are consequently cancelled. 40 // 41 // Connection establishment involves trying each endpoint in turn until a 42 // connection is successful, or the available endpoints are exhausted. If the 43 // deadline actor closes the socket, the connect actor is woken up and moves to 44 // the next endpoint. 45 // 46 // +---------------+ 47 // | | 48 // | start_connect |<---+ 49 // | | | 50 // +---------------+ | 51 // | | 52 // async_- | +----------------+ 53 // connect() | | | 54 // +--->| handle_connect | 55 // | | 56 // +----------------+ 57 // : 58 // Once a connection is : 59 // made, the connect : 60 // actor forks in two - : 61 // : 62 // an actor for reading : and an actor for 63 // inbound messages: : sending heartbeats: 64 // : 65 // +------------+ : +-------------+ 66 // | |<- - - - -+- - - - ->| | 67 // | start_read | | start_write |<---+ 68 // | |<---+ | | | 69 // +------------+ | +-------------+ | async_wait() 70 // | | | | 71 // async_- | +-------------+ async_- | +--------------+ 72 // read_- | | | write() | | | 73 // until() +--->| handle_read | +--->| handle_write | 74 // | | | | 75 // +-------------+ +--------------+ 76 // 77 // The input actor reads messages from the socket, where messages are delimited 78 // by the newline character. The deadline for a complete message is 30 seconds. 79 // 80 // The heartbeat actor sends a heartbeat (a message that consists of a single 81 // newline character) every 10 seconds. In this example, no deadline is applied 82 // to message sending. 83 // 84 class client 85 { 86 public: client(boost::asio::io_context & io_context)87 client(boost::asio::io_context& io_context) 88 : stopped_(false), 89 socket_(io_context), 90 deadline_(io_context), 91 heartbeat_timer_(io_context) 92 { 93 } 94 95 // Called by the user of the client class to initiate the connection process. 96 // The endpoints will have been obtained using a tcp::resolver. start(tcp::resolver::results_type endpoints)97 void start(tcp::resolver::results_type endpoints) 98 { 99 // Start the connect actor. 100 endpoints_ = endpoints; 101 start_connect(endpoints_.begin()); 102 103 // Start the deadline actor. You will note that we're not setting any 104 // particular deadline here. Instead, the connect and input actors will 105 // update the deadline prior to each asynchronous operation. 106 deadline_.async_wait(boost::bind(&client::check_deadline, this)); 107 } 108 109 // This function terminates all the actors to shut down the connection. It 110 // may be called by the user of the client class, or by the class itself in 111 // response to graceful termination or an unrecoverable error. stop()112 void stop() 113 { 114 stopped_ = true; 115 boost::system::error_code ignored_ec; 116 socket_.close(ignored_ec); 117 deadline_.cancel(); 118 heartbeat_timer_.cancel(); 119 } 120 121 private: start_connect(tcp::resolver::results_type::iterator endpoint_iter)122 void start_connect(tcp::resolver::results_type::iterator endpoint_iter) 123 { 124 if (endpoint_iter != endpoints_.end()) 125 { 126 std::cout << "Trying " << endpoint_iter->endpoint() << "...\n"; 127 128 // Set a deadline for the connect operation. 129 deadline_.expires_after(boost::asio::chrono::seconds(60)); 130 131 // Start the asynchronous connect operation. 132 socket_.async_connect(endpoint_iter->endpoint(), 133 boost::bind(&client::handle_connect, this, 134 boost::placeholders::_1, endpoint_iter)); 135 } 136 else 137 { 138 // There are no more endpoints to try. Shut down the client. 139 stop(); 140 } 141 } 142 handle_connect(const boost::system::error_code & ec,tcp::resolver::results_type::iterator endpoint_iter)143 void handle_connect(const boost::system::error_code& ec, 144 tcp::resolver::results_type::iterator endpoint_iter) 145 { 146 if (stopped_) 147 return; 148 149 // The async_connect() function automatically opens the socket at the start 150 // of the asynchronous operation. If the socket is closed at this time then 151 // the timeout handler must have run first. 152 if (!socket_.is_open()) 153 { 154 std::cout << "Connect timed out\n"; 155 156 // Try the next available endpoint. 157 start_connect(++endpoint_iter); 158 } 159 160 // Check if the connect operation failed before the deadline expired. 161 else if (ec) 162 { 163 std::cout << "Connect error: " << ec.message() << "\n"; 164 165 // We need to close the socket used in the previous connection attempt 166 // before starting a new one. 167 socket_.close(); 168 169 // Try the next available endpoint. 170 start_connect(++endpoint_iter); 171 } 172 173 // Otherwise we have successfully established a connection. 174 else 175 { 176 std::cout << "Connected to " << endpoint_iter->endpoint() << "\n"; 177 178 // Start the input actor. 179 start_read(); 180 181 // Start the heartbeat actor. 182 start_write(); 183 } 184 } 185 start_read()186 void start_read() 187 { 188 // Set a deadline for the read operation. 189 deadline_.expires_after(boost::asio::chrono::seconds(30)); 190 191 // Start an asynchronous operation to read a newline-delimited message. 192 boost::asio::async_read_until(socket_, 193 boost::asio::dynamic_buffer(input_buffer_), '\n', 194 boost::bind(&client::handle_read, this, 195 boost::placeholders::_1, boost::placeholders::_2)); 196 } 197 handle_read(const boost::system::error_code & ec,std::size_t n)198 void handle_read(const boost::system::error_code& ec, std::size_t n) 199 { 200 if (stopped_) 201 return; 202 203 if (!ec) 204 { 205 // Extract the newline-delimited message from the buffer. 206 std::string line(input_buffer_.substr(0, n - 1)); 207 input_buffer_.erase(0, n); 208 209 // Empty messages are heartbeats and so ignored. 210 if (!line.empty()) 211 { 212 std::cout << "Received: " << line << "\n"; 213 } 214 215 start_read(); 216 } 217 else 218 { 219 std::cout << "Error on receive: " << ec.message() << "\n"; 220 221 stop(); 222 } 223 } 224 start_write()225 void start_write() 226 { 227 if (stopped_) 228 return; 229 230 // Start an asynchronous operation to send a heartbeat message. 231 boost::asio::async_write(socket_, boost::asio::buffer("\n", 1), 232 boost::bind(&client::handle_write, this, boost::placeholders::_1)); 233 } 234 handle_write(const boost::system::error_code & ec)235 void handle_write(const boost::system::error_code& ec) 236 { 237 if (stopped_) 238 return; 239 240 if (!ec) 241 { 242 // Wait 10 seconds before sending the next heartbeat. 243 heartbeat_timer_.expires_after(boost::asio::chrono::seconds(10)); 244 heartbeat_timer_.async_wait(boost::bind(&client::start_write, this)); 245 } 246 else 247 { 248 std::cout << "Error on heartbeat: " << ec.message() << "\n"; 249 250 stop(); 251 } 252 } 253 check_deadline()254 void check_deadline() 255 { 256 if (stopped_) 257 return; 258 259 // Check whether the deadline has passed. We compare the deadline against 260 // the current time since a new asynchronous operation may have moved the 261 // deadline before this actor had a chance to run. 262 if (deadline_.expiry() <= steady_timer::clock_type::now()) 263 { 264 // The deadline has passed. The socket is closed so that any outstanding 265 // asynchronous operations are cancelled. 266 socket_.close(); 267 268 // There is no longer an active deadline. The expiry is set to the 269 // maximum time point so that the actor takes no action until a new 270 // deadline is set. 271 deadline_.expires_at(steady_timer::time_point::max()); 272 } 273 274 // Put the actor back to sleep. 275 deadline_.async_wait(boost::bind(&client::check_deadline, this)); 276 } 277 278 private: 279 bool stopped_; 280 tcp::resolver::results_type endpoints_; 281 tcp::socket socket_; 282 std::string input_buffer_; 283 steady_timer deadline_; 284 steady_timer heartbeat_timer_; 285 }; 286 main(int argc,char * argv[])287 int main(int argc, char* argv[]) 288 { 289 try 290 { 291 if (argc != 3) 292 { 293 std::cerr << "Usage: client <host> <port>\n"; 294 return 1; 295 } 296 297 boost::asio::io_context io_context; 298 tcp::resolver r(io_context); 299 client c(io_context); 300 301 c.start(r.resolve(argv[1], argv[2])); 302 303 io_context.run(); 304 } 305 catch (std::exception& e) 306 { 307 std::cerr << "Exception: " << e.what() << "\n"; 308 } 309 310 return 0; 311 } 312