1 //
2 // server.cpp
3 // ~~~~~~~~~~
4 //
5 // Copyright (c) 2003-2020 Christopher M. Kohlhoff (chris at kohlhoff dot com)
6 //
7 // Distributed under the Boost Software License, Version 1.0. (See accompanying
8 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
9 //
10
11 #include <algorithm>
12 #include <cstdlib>
13 #include <deque>
14 #include <iostream>
15 #include <memory>
16 #include <set>
17 #include <string>
18 #include <boost/asio/buffer.hpp>
19 #include <boost/asio/io_context.hpp>
20 #include <boost/asio/ip/tcp.hpp>
21 #include <boost/asio/ip/udp.hpp>
22 #include <boost/asio/read_until.hpp>
23 #include <boost/asio/steady_timer.hpp>
24 #include <boost/asio/write.hpp>
25
26 using boost::asio::steady_timer;
27 using boost::asio::ip::tcp;
28 using boost::asio::ip::udp;
29
30 //----------------------------------------------------------------------
31
32 class subscriber
33 {
34 public:
35 virtual ~subscriber() = default;
36 virtual void deliver(const std::string& msg) = 0;
37 };
38
39 typedef std::shared_ptr<subscriber> subscriber_ptr;
40
41 //----------------------------------------------------------------------
42
43 class channel
44 {
45 public:
join(subscriber_ptr subscriber)46 void join(subscriber_ptr subscriber)
47 {
48 subscribers_.insert(subscriber);
49 }
50
leave(subscriber_ptr subscriber)51 void leave(subscriber_ptr subscriber)
52 {
53 subscribers_.erase(subscriber);
54 }
55
deliver(const std::string & msg)56 void deliver(const std::string& msg)
57 {
58 for (const auto& s : subscribers_)
59 {
60 s->deliver(msg);
61 }
62 }
63
64 private:
65 std::set<subscriber_ptr> subscribers_;
66 };
67
68 //----------------------------------------------------------------------
69
70 //
71 // This class manages socket timeouts by applying the concept of a deadline.
72 // Some asynchronous operations are given deadlines by which they must complete.
73 // Deadlines are enforced by two "actors" that persist for the lifetime of the
74 // session object, one for input and one for output:
75 //
76 // +----------------+ +----------------+
77 // | | | |
78 // | check_deadline |<-------+ | check_deadline |<-------+
79 // | | | | | |
80 // +----------------+ | +----------------+ |
81 // | | | |
82 // async_wait() | +----------------+ async_wait() | +----------------+
83 // on input | | lambda | on output | | lambda |
84 // deadline +--->| in | deadline +--->| in |
85 // | check_deadline | | check_deadline |
86 // +----------------+ +----------------+
87 //
88 // If either deadline actor determines that the corresponding deadline has
89 // expired, the socket is closed and any outstanding operations are cancelled.
90 //
91 // The input actor reads messages from the socket, where messages are delimited
92 // by the newline character:
93 //
94 // +-------------+
95 // | |
96 // | read_line |<----+
97 // | | |
98 // +-------------+ |
99 // | |
100 // async_- | +-------------+
101 // read_- | | lambda |
102 // until() +--->| in |
103 // | read_line |
104 // +-------------+
105 //
106 // The deadline for receiving a complete message is 30 seconds. If a non-empty
107 // message is received, it is delivered to all subscribers. If a heartbeat (a
108 // message that consists of a single newline character) is received, a heartbeat
109 // is enqueued for the client, provided there are no other messages waiting to
110 // be sent.
111 //
112 // The output actor is responsible for sending messages to the client:
113 //
114 // +----------------+
115 // | |<---------------------+
116 // | await_output | |
117 // | |<-------+ |
118 // +----------------+ | |
119 // | | | |
120 // | async_- | +----------------+ |
121 // | wait() | | lambda | |
122 // | +->| in | |
123 // | | await_output | |
124 // | +----------------+ |
125 // V |
126 // +--------------+ +--------------+
127 // | | async_write() | lambda |
128 // | write_line |-------------->| in |
129 // | | | write_line |
130 // +--------------+ +--------------+
131 //
132 // The output actor first waits for an output message to be enqueued. It does
133 // this by using a steady_timer as an asynchronous condition variable. The
134 // steady_timer will be signalled whenever the output queue is non-empty.
135 //
136 // Once a message is available, it is sent to the client. The deadline for
137 // sending a complete message is 30 seconds. After the message is successfully
138 // sent, the output actor again waits for the output queue to become non-empty.
139 //
140 class tcp_session
141 : public subscriber,
142 public std::enable_shared_from_this<tcp_session>
143 {
144 public:
tcp_session(tcp::socket socket,channel & ch)145 tcp_session(tcp::socket socket, channel& ch)
146 : channel_(ch),
147 socket_(std::move(socket))
148 {
149 input_deadline_.expires_at(steady_timer::time_point::max());
150 output_deadline_.expires_at(steady_timer::time_point::max());
151
152 // The non_empty_output_queue_ steady_timer is set to the maximum time
153 // point whenever the output queue is empty. This ensures that the output
154 // actor stays asleep until a message is put into the queue.
155 non_empty_output_queue_.expires_at(steady_timer::time_point::max());
156 }
157
158 // Called by the server object to initiate the four actors.
start()159 void start()
160 {
161 channel_.join(shared_from_this());
162
163 read_line();
164 check_deadline(input_deadline_);
165
166 await_output();
167 check_deadline(output_deadline_);
168 }
169
170 private:
stop()171 void stop()
172 {
173 channel_.leave(shared_from_this());
174
175 boost::system::error_code ignored_error;
176 socket_.close(ignored_error);
177 input_deadline_.cancel();
178 non_empty_output_queue_.cancel();
179 output_deadline_.cancel();
180 }
181
stopped() const182 bool stopped() const
183 {
184 return !socket_.is_open();
185 }
186
deliver(const std::string & msg)187 void deliver(const std::string& msg) override
188 {
189 output_queue_.push_back(msg + "\n");
190
191 // Signal that the output queue contains messages. Modifying the expiry
192 // will wake the output actor, if it is waiting on the timer.
193 non_empty_output_queue_.expires_at(steady_timer::time_point::min());
194 }
195
read_line()196 void read_line()
197 {
198 // Set a deadline for the read operation.
199 input_deadline_.expires_after(std::chrono::seconds(30));
200
201 // Start an asynchronous operation to read a newline-delimited message.
202 auto self(shared_from_this());
203 boost::asio::async_read_until(socket_,
204 boost::asio::dynamic_buffer(input_buffer_), '\n',
205 [this, self](const boost::system::error_code& error, std::size_t n)
206 {
207 // Check if the session was stopped while the operation was pending.
208 if (stopped())
209 return;
210
211 if (!error)
212 {
213 // Extract the newline-delimited message from the buffer.
214 std::string msg(input_buffer_.substr(0, n - 1));
215 input_buffer_.erase(0, n);
216
217 if (!msg.empty())
218 {
219 channel_.deliver(msg);
220 }
221 else
222 {
223
224 // We received a heartbeat message from the client. If there's
225 // nothing else being sent or ready to be sent, send a heartbeat
226 // right back.
227 if (output_queue_.empty())
228 {
229 output_queue_.push_back("\n");
230
231 // Signal that the output queue contains messages. Modifying
232 // the expiry will wake the output actor, if it is waiting on
233 // the timer.
234 non_empty_output_queue_.expires_at(
235 steady_timer::time_point::min());
236 }
237 }
238
239 read_line();
240 }
241 else
242 {
243 stop();
244 }
245 });
246 }
247
await_output()248 void await_output()
249 {
250 auto self(shared_from_this());
251 non_empty_output_queue_.async_wait(
252 [this, self](const boost::system::error_code& /*error*/)
253 {
254 // Check if the session was stopped while the operation was pending.
255 if (stopped())
256 return;
257
258 if (output_queue_.empty())
259 {
260 // There are no messages that are ready to be sent. The actor goes
261 // to sleep by waiting on the non_empty_output_queue_ timer. When a
262 // new message is added, the timer will be modified and the actor
263 // will wake.
264 non_empty_output_queue_.expires_at(steady_timer::time_point::max());
265 await_output();
266 }
267 else
268 {
269 write_line();
270 }
271 });
272 }
273
write_line()274 void write_line()
275 {
276 // Set a deadline for the write operation.
277 output_deadline_.expires_after(std::chrono::seconds(30));
278
279 // Start an asynchronous operation to send a message.
280 auto self(shared_from_this());
281 boost::asio::async_write(socket_,
282 boost::asio::buffer(output_queue_.front()),
283 [this, self](const boost::system::error_code& error, std::size_t /*n*/)
284 {
285 // Check if the session was stopped while the operation was pending.
286 if (stopped())
287 return;
288
289 if (!error)
290 {
291 output_queue_.pop_front();
292
293 await_output();
294 }
295 else
296 {
297 stop();
298 }
299 });
300 }
301
check_deadline(steady_timer & deadline)302 void check_deadline(steady_timer& deadline)
303 {
304 auto self(shared_from_this());
305 deadline.async_wait(
306 [this, self, &deadline](const boost::system::error_code& /*error*/)
307 {
308 // Check if the session was stopped while the operation was pending.
309 if (stopped())
310 return;
311
312 // Check whether the deadline has passed. We compare the deadline
313 // against the current time since a new asynchronous operation may
314 // have moved the deadline before this actor had a chance to run.
315 if (deadline.expiry() <= steady_timer::clock_type::now())
316 {
317 // The deadline has passed. Stop the session. The other actors will
318 // terminate as soon as possible.
319 stop();
320 }
321 else
322 {
323 // Put the actor back to sleep.
324 check_deadline(deadline);
325 }
326 });
327 }
328
329 channel& channel_;
330 tcp::socket socket_;
331 std::string input_buffer_;
332 steady_timer input_deadline_{socket_.get_executor()};
333 std::deque<std::string> output_queue_;
334 steady_timer non_empty_output_queue_{socket_.get_executor()};
335 steady_timer output_deadline_{socket_.get_executor()};
336 };
337
338 typedef std::shared_ptr<tcp_session> tcp_session_ptr;
339
340 //----------------------------------------------------------------------
341
342 class udp_broadcaster
343 : public subscriber
344 {
345 public:
udp_broadcaster(boost::asio::io_context & io_context,const udp::endpoint & broadcast_endpoint)346 udp_broadcaster(boost::asio::io_context& io_context,
347 const udp::endpoint& broadcast_endpoint)
348 : socket_(io_context)
349 {
350 socket_.connect(broadcast_endpoint);
351 socket_.set_option(udp::socket::broadcast(true));
352 }
353
354 private:
deliver(const std::string & msg)355 void deliver(const std::string& msg)
356 {
357 boost::system::error_code ignored_error;
358 socket_.send(boost::asio::buffer(msg), 0, ignored_error);
359 }
360
361 udp::socket socket_;
362 };
363
364 //----------------------------------------------------------------------
365
366 class server
367 {
368 public:
server(boost::asio::io_context & io_context,const tcp::endpoint & listen_endpoint,const udp::endpoint & broadcast_endpoint)369 server(boost::asio::io_context& io_context,
370 const tcp::endpoint& listen_endpoint,
371 const udp::endpoint& broadcast_endpoint)
372 : io_context_(io_context),
373 acceptor_(io_context, listen_endpoint)
374 {
375 channel_.join(
376 std::make_shared<udp_broadcaster>(
377 io_context_, broadcast_endpoint));
378
379 accept();
380 }
381
382 private:
accept()383 void accept()
384 {
385 acceptor_.async_accept(
386 [this](const boost::system::error_code& error, tcp::socket socket)
387 {
388 if (!error)
389 {
390 std::make_shared<tcp_session>(std::move(socket), channel_)->start();
391 }
392
393 accept();
394 });
395 }
396
397 boost::asio::io_context& io_context_;
398 tcp::acceptor acceptor_;
399 channel channel_;
400 };
401
402 //----------------------------------------------------------------------
403
main(int argc,char * argv[])404 int main(int argc, char* argv[])
405 {
406 try
407 {
408 using namespace std; // For atoi.
409
410 if (argc != 4)
411 {
412 std::cerr << "Usage: server <listen_port> <bcast_address> <bcast_port>\n";
413 return 1;
414 }
415
416 boost::asio::io_context io_context;
417
418 tcp::endpoint listen_endpoint(tcp::v4(), atoi(argv[1]));
419
420 udp::endpoint broadcast_endpoint(
421 boost::asio::ip::make_address(argv[2]), atoi(argv[3]));
422
423 server s(io_context, listen_endpoint, broadcast_endpoint);
424
425 io_context.run();
426 }
427 catch (std::exception& e)
428 {
429 std::cerr << "Exception: " << e.what() << "\n";
430 }
431
432 return 0;
433 }
434