1 //
2 // server.cpp
3 // ~~~~~~~~~~
4 //
5 // Copyright (c) 2003-2020 Christopher M. Kohlhoff (chris at kohlhoff dot com)
6 //
7 // Distributed under the Boost Software License, Version 1.0. (See accompanying
8 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
9 //
10
11 #include <algorithm>
12 #include <cstdlib>
13 #include <deque>
14 #include <iostream>
15 #include <set>
16 #include <string>
17 #include <boost/bind/bind.hpp>
18 #include <boost/shared_ptr.hpp>
19 #include <boost/enable_shared_from_this.hpp>
20 #include <boost/asio/buffer.hpp>
21 #include <boost/asio/io_context.hpp>
22 #include <boost/asio/ip/tcp.hpp>
23 #include <boost/asio/ip/udp.hpp>
24 #include <boost/asio/read_until.hpp>
25 #include <boost/asio/steady_timer.hpp>
26 #include <boost/asio/write.hpp>
27
28 using boost::asio::steady_timer;
29 using boost::asio::ip::tcp;
30 using boost::asio::ip::udp;
31
32 //----------------------------------------------------------------------
33
34 class subscriber
35 {
36 public:
~subscriber()37 virtual ~subscriber() {}
38 virtual void deliver(const std::string& msg) = 0;
39 };
40
41 typedef boost::shared_ptr<subscriber> subscriber_ptr;
42
43 //----------------------------------------------------------------------
44
45 class channel
46 {
47 public:
join(subscriber_ptr subscriber)48 void join(subscriber_ptr subscriber)
49 {
50 subscribers_.insert(subscriber);
51 }
52
leave(subscriber_ptr subscriber)53 void leave(subscriber_ptr subscriber)
54 {
55 subscribers_.erase(subscriber);
56 }
57
deliver(const std::string & msg)58 void deliver(const std::string& msg)
59 {
60 std::for_each(subscribers_.begin(), subscribers_.end(),
61 boost::bind(&subscriber::deliver,
62 boost::placeholders::_1, boost::ref(msg)));
63 }
64
65 private:
66 std::set<subscriber_ptr> subscribers_;
67 };
68
69 //----------------------------------------------------------------------
70
71 //
72 // This class manages socket timeouts by applying the concept of a deadline.
73 // Some asynchronous operations are given deadlines by which they must complete.
74 // Deadlines are enforced by two "actors" that persist for the lifetime of the
75 // session object, one for input and one for output:
76 //
77 // +----------------+ +----------------+
78 // | | | |
79 // | check_deadline |<---+ | check_deadline |<---+
80 // | | | async_wait() | | | async_wait()
81 // +----------------+ | on input +----------------+ | on output
82 // | | deadline | | deadline
83 // +---------+ +---------+
84 //
85 // If either deadline actor determines that the corresponding deadline has
86 // expired, the socket is closed and any outstanding operations are cancelled.
87 //
88 // The input actor reads messages from the socket, where messages are delimited
89 // by the newline character:
90 //
91 // +------------+
92 // | |
93 // | start_read |<---+
94 // | | |
95 // +------------+ |
96 // | |
97 // async_- | +-------------+
98 // read_- | | |
99 // until() +--->| handle_read |
100 // | |
101 // +-------------+
102 //
103 // The deadline for receiving a complete message is 30 seconds. If a non-empty
104 // message is received, it is delivered to all subscribers. If a heartbeat (a
105 // message that consists of a single newline character) is received, a heartbeat
106 // is enqueued for the client, provided there are no other messages waiting to
107 // be sent.
108 //
109 // The output actor is responsible for sending messages to the client:
110 //
111 // +--------------+
112 // | |<---------------------+
113 // | await_output | |
114 // | |<---+ |
115 // +--------------+ | |
116 // | | | async_wait() |
117 // | +--------+ |
118 // V |
119 // +-------------+ +--------------+
120 // | | async_write() | |
121 // | start_write |-------------->| handle_write |
122 // | | | |
123 // +-------------+ +--------------+
124 //
125 // The output actor first waits for an output message to be enqueued. It does
126 // this by using a steady_timer as an asynchronous condition variable. The
127 // steady_timer will be signalled whenever the output queue is non-empty.
128 //
129 // Once a message is available, it is sent to the client. The deadline for
130 // sending a complete message is 30 seconds. After the message is successfully
131 // sent, the output actor again waits for the output queue to become non-empty.
132 //
133 class tcp_session
134 : public subscriber,
135 public boost::enable_shared_from_this<tcp_session>
136 {
137 public:
tcp_session(boost::asio::io_context & io_context,channel & ch)138 tcp_session(boost::asio::io_context& io_context, channel& ch)
139 : channel_(ch),
140 socket_(io_context),
141 input_deadline_(io_context),
142 non_empty_output_queue_(io_context),
143 output_deadline_(io_context)
144 {
145 input_deadline_.expires_at(steady_timer::time_point::max());
146 output_deadline_.expires_at(steady_timer::time_point::max());
147
148 // The non_empty_output_queue_ steady_timer is set to the maximum time
149 // point whenever the output queue is empty. This ensures that the output
150 // actor stays asleep until a message is put into the queue.
151 non_empty_output_queue_.expires_at(steady_timer::time_point::max());
152 }
153
socket()154 tcp::socket& socket()
155 {
156 return socket_;
157 }
158
159 // Called by the server object to initiate the four actors.
start()160 void start()
161 {
162 channel_.join(shared_from_this());
163
164 start_read();
165
166 input_deadline_.async_wait(
167 boost::bind(&tcp_session::check_deadline,
168 shared_from_this(), &input_deadline_));
169
170 await_output();
171
172 output_deadline_.async_wait(
173 boost::bind(&tcp_session::check_deadline,
174 shared_from_this(), &output_deadline_));
175 }
176
177 private:
stop()178 void stop()
179 {
180 channel_.leave(shared_from_this());
181
182 boost::system::error_code ignored_ec;
183 socket_.close(ignored_ec);
184 input_deadline_.cancel();
185 non_empty_output_queue_.cancel();
186 output_deadline_.cancel();
187 }
188
stopped() const189 bool stopped() const
190 {
191 return !socket_.is_open();
192 }
193
deliver(const std::string & msg)194 void deliver(const std::string& msg)
195 {
196 output_queue_.push_back(msg + "\n");
197
198 // Signal that the output queue contains messages. Modifying the expiry
199 // will wake the output actor, if it is waiting on the timer.
200 non_empty_output_queue_.expires_at(steady_timer::time_point::min());
201 }
202
start_read()203 void start_read()
204 {
205 // Set a deadline for the read operation.
206 input_deadline_.expires_after(boost::asio::chrono::seconds(30));
207
208 // Start an asynchronous operation to read a newline-delimited message.
209 boost::asio::async_read_until(socket_,
210 boost::asio::dynamic_buffer(input_buffer_), '\n',
211 boost::bind(&tcp_session::handle_read, shared_from_this(),
212 boost::placeholders::_1, boost::placeholders::_2));
213 }
214
handle_read(const boost::system::error_code & ec,std::size_t n)215 void handle_read(const boost::system::error_code& ec, std::size_t n)
216 {
217 if (stopped())
218 return;
219
220 if (!ec)
221 {
222 // Extract the newline-delimited message from the buffer.
223 std::string msg(input_buffer_.substr(0, n - 1));
224 input_buffer_.erase(0, n);
225
226 if (!msg.empty())
227 {
228 channel_.deliver(msg);
229 }
230 else
231 {
232 // We received a heartbeat message from the client. If there's nothing
233 // else being sent or ready to be sent, send a heartbeat right back.
234 if (output_queue_.empty())
235 {
236 output_queue_.push_back("\n");
237
238 // Signal that the output queue contains messages. Modifying the
239 // expiry will wake the output actor, if it is waiting on the timer.
240 non_empty_output_queue_.expires_at(steady_timer::time_point::min());
241 }
242 }
243
244 start_read();
245 }
246 else
247 {
248 stop();
249 }
250 }
251
await_output()252 void await_output()
253 {
254 if (stopped())
255 return;
256
257 if (output_queue_.empty())
258 {
259 // There are no messages that are ready to be sent. The actor goes to
260 // sleep by waiting on the non_empty_output_queue_ timer. When a new
261 // message is added, the timer will be modified and the actor will wake.
262 non_empty_output_queue_.expires_at(steady_timer::time_point::max());
263 non_empty_output_queue_.async_wait(
264 boost::bind(&tcp_session::await_output, shared_from_this()));
265 }
266 else
267 {
268 start_write();
269 }
270 }
271
start_write()272 void start_write()
273 {
274 // Set a deadline for the write operation.
275 output_deadline_.expires_after(boost::asio::chrono::seconds(30));
276
277 // Start an asynchronous operation to send a message.
278 boost::asio::async_write(socket_,
279 boost::asio::buffer(output_queue_.front()),
280 boost::bind(&tcp_session::handle_write,
281 shared_from_this(), boost::placeholders::_1));
282 }
283
handle_write(const boost::system::error_code & ec)284 void handle_write(const boost::system::error_code& ec)
285 {
286 if (stopped())
287 return;
288
289 if (!ec)
290 {
291 output_queue_.pop_front();
292
293 await_output();
294 }
295 else
296 {
297 stop();
298 }
299 }
300
check_deadline(steady_timer * deadline)301 void check_deadline(steady_timer* deadline)
302 {
303 if (stopped())
304 return;
305
306 // Check whether the deadline has passed. We compare the deadline against
307 // the current time since a new asynchronous operation may have moved the
308 // deadline before this actor had a chance to run.
309 if (deadline->expiry() <= steady_timer::clock_type::now())
310 {
311 // The deadline has passed. Stop the session. The other actors will
312 // terminate as soon as possible.
313 stop();
314 }
315 else
316 {
317 // Put the actor back to sleep.
318 deadline->async_wait(
319 boost::bind(&tcp_session::check_deadline,
320 shared_from_this(), deadline));
321 }
322 }
323
324 channel& channel_;
325 tcp::socket socket_;
326 std::string input_buffer_;
327 steady_timer input_deadline_;
328 std::deque<std::string> output_queue_;
329 steady_timer non_empty_output_queue_;
330 steady_timer output_deadline_;
331 };
332
333 typedef boost::shared_ptr<tcp_session> tcp_session_ptr;
334
335 //----------------------------------------------------------------------
336
337 class udp_broadcaster
338 : public subscriber
339 {
340 public:
udp_broadcaster(boost::asio::io_context & io_context,const udp::endpoint & broadcast_endpoint)341 udp_broadcaster(boost::asio::io_context& io_context,
342 const udp::endpoint& broadcast_endpoint)
343 : socket_(io_context)
344 {
345 socket_.connect(broadcast_endpoint);
346 socket_.set_option(udp::socket::broadcast(true));
347 }
348
349 private:
deliver(const std::string & msg)350 void deliver(const std::string& msg)
351 {
352 boost::system::error_code ignored_ec;
353 socket_.send(boost::asio::buffer(msg), 0, ignored_ec);
354 }
355
356 udp::socket socket_;
357 };
358
359 //----------------------------------------------------------------------
360
361 class server
362 {
363 public:
server(boost::asio::io_context & io_context,const tcp::endpoint & listen_endpoint,const udp::endpoint & broadcast_endpoint)364 server(boost::asio::io_context& io_context,
365 const tcp::endpoint& listen_endpoint,
366 const udp::endpoint& broadcast_endpoint)
367 : io_context_(io_context),
368 acceptor_(io_context, listen_endpoint)
369 {
370 subscriber_ptr bc(new udp_broadcaster(io_context_, broadcast_endpoint));
371 channel_.join(bc);
372
373 start_accept();
374 }
375
start_accept()376 void start_accept()
377 {
378 tcp_session_ptr new_session(new tcp_session(io_context_, channel_));
379
380 acceptor_.async_accept(new_session->socket(),
381 boost::bind(&server::handle_accept,
382 this, new_session, boost::placeholders::_1));
383 }
384
handle_accept(tcp_session_ptr session,const boost::system::error_code & ec)385 void handle_accept(tcp_session_ptr session,
386 const boost::system::error_code& ec)
387 {
388 if (!ec)
389 {
390 session->start();
391 }
392
393 start_accept();
394 }
395
396 private:
397 boost::asio::io_context& io_context_;
398 tcp::acceptor acceptor_;
399 channel channel_;
400 };
401
402 //----------------------------------------------------------------------
403
main(int argc,char * argv[])404 int main(int argc, char* argv[])
405 {
406 try
407 {
408 using namespace std; // For atoi.
409
410 if (argc != 4)
411 {
412 std::cerr << "Usage: server <listen_port> <bcast_address> <bcast_port>\n";
413 return 1;
414 }
415
416 boost::asio::io_context io_context;
417
418 tcp::endpoint listen_endpoint(tcp::v4(), atoi(argv[1]));
419
420 udp::endpoint broadcast_endpoint(
421 boost::asio::ip::make_address(argv[2]), atoi(argv[3]));
422
423 server s(io_context, listen_endpoint, broadcast_endpoint);
424
425 io_context.run();
426 }
427 catch (std::exception& e)
428 {
429 std::cerr << "Exception: " << e.what() << "\n";
430 }
431
432 return 0;
433 }
434