//
// server.cpp
// ~~~~~~~~~~
//
// Copyright (c) 2003-2020 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//

#include <algorithm>
#include <cstdlib>
#include <deque>
#include <iostream>
#include <limits>
#include <set>
#include <string>
#include <boost/bind/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/asio/buffer.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/ip/udp.hpp>
#include <boost/asio/read_until.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/asio/write.hpp>

using boost::asio::steady_timer;
using boost::asio::ip::tcp;
using boost::asio::ip::udp;

//----------------------------------------------------------------------

// Abstract interface for an object that can receive messages. A channel
// stores subscribers and calls deliver() on each of them for every message.
class subscriber
{
public:
  // Virtual so that a concrete subscriber can be destroyed through a pointer
  // to this base class.
  virtual ~subscriber() {}

  // Hands one message to the subscriber.
  //   msg: the message text to deliver.
  virtual void deliver(const std::string& msg) = 0;
};

41 typedef boost::shared_ptr<subscriber> subscriber_ptr;
42 
43 //----------------------------------------------------------------------
44 
45 class channel
46 {
47 public:
join(subscriber_ptr subscriber)48   void join(subscriber_ptr subscriber)
49   {
50     subscribers_.insert(subscriber);
51   }
52 
leave(subscriber_ptr subscriber)53   void leave(subscriber_ptr subscriber)
54   {
55     subscribers_.erase(subscriber);
56   }
57 
deliver(const std::string & msg)58   void deliver(const std::string& msg)
59   {
60     std::for_each(subscribers_.begin(), subscribers_.end(),
61         boost::bind(&subscriber::deliver,
62           boost::placeholders::_1, boost::ref(msg)));
63   }
64 
65 private:
66   std::set<subscriber_ptr> subscribers_;
67 };
68 
69 //----------------------------------------------------------------------
70 
71 //
72 // This class manages socket timeouts by applying the concept of a deadline.
73 // Some asynchronous operations are given deadlines by which they must complete.
74 // Deadlines are enforced by two "actors" that persist for the lifetime of the
75 // session object, one for input and one for output:
76 //
77 //  +----------------+                     +----------------+
78 //  |                |                     |                |
79 //  | check_deadline |<---+                | check_deadline |<---+
80 //  |                |    | async_wait()   |                |    | async_wait()
81 //  +----------------+    |  on input      +----------------+    |  on output
82 //              |         |  deadline                  |         |  deadline
83 //              +---------+                            +---------+
84 //
85 // If either deadline actor determines that the corresponding deadline has
86 // expired, the socket is closed and any outstanding operations are cancelled.
87 //
88 // The input actor reads messages from the socket, where messages are delimited
89 // by the newline character:
90 //
91 //  +------------+
92 //  |            |
93 //  | start_read |<---+
94 //  |            |    |
95 //  +------------+    |
96 //          |         |
97 //  async_- |    +-------------+
98 //   read_- |    |             |
99 //  until() +--->| handle_read |
100 //               |             |
101 //               +-------------+
102 //
103 // The deadline for receiving a complete message is 30 seconds. If a non-empty
104 // message is received, it is delivered to all subscribers. If a heartbeat (a
105 // message that consists of a single newline character) is received, a heartbeat
106 // is enqueued for the client, provided there are no other messages waiting to
107 // be sent.
108 //
109 // The output actor is responsible for sending messages to the client:
110 //
111 //  +--------------+
112 //  |              |<---------------------+
113 //  | await_output |                      |
114 //  |              |<---+                 |
115 //  +--------------+    |                 |
116 //      |      |        | async_wait()    |
117 //      |      +--------+                 |
118 //      V                                 |
119 //  +-------------+               +--------------+
120 //  |             | async_write() |              |
121 //  | start_write |-------------->| handle_write |
122 //  |             |               |              |
123 //  +-------------+               +--------------+
124 //
125 // The output actor first waits for an output message to be enqueued. It does
126 // this by using a steady_timer as an asynchronous condition variable. The
127 // steady_timer will be signalled whenever the output queue is non-empty.
128 //
129 // Once a message is available, it is sent to the client. The deadline for
130 // sending a complete message is 30 seconds. After the message is successfully
131 // sent, the output actor again waits for the output queue to become non-empty.
132 //
133 class tcp_session
134   : public subscriber,
135     public boost::enable_shared_from_this<tcp_session>
136 {
137 public:
tcp_session(boost::asio::io_context & io_context,channel & ch)138   tcp_session(boost::asio::io_context& io_context, channel& ch)
139     : channel_(ch),
140       socket_(io_context),
141       input_deadline_(io_context),
142       non_empty_output_queue_(io_context),
143       output_deadline_(io_context)
144   {
145     input_deadline_.expires_at(steady_timer::time_point::max());
146     output_deadline_.expires_at(steady_timer::time_point::max());
147 
148     // The non_empty_output_queue_ steady_timer is set to the maximum time
149     // point whenever the output queue is empty. This ensures that the output
150     // actor stays asleep until a message is put into the queue.
151     non_empty_output_queue_.expires_at(steady_timer::time_point::max());
152   }
153 
socket()154   tcp::socket& socket()
155   {
156     return socket_;
157   }
158 
159   // Called by the server object to initiate the four actors.
start()160   void start()
161   {
162     channel_.join(shared_from_this());
163 
164     start_read();
165 
166     input_deadline_.async_wait(
167         boost::bind(&tcp_session::check_deadline,
168         shared_from_this(), &input_deadline_));
169 
170     await_output();
171 
172     output_deadline_.async_wait(
173         boost::bind(&tcp_session::check_deadline,
174         shared_from_this(), &output_deadline_));
175   }
176 
177 private:
stop()178   void stop()
179   {
180     channel_.leave(shared_from_this());
181 
182     boost::system::error_code ignored_ec;
183     socket_.close(ignored_ec);
184     input_deadline_.cancel();
185     non_empty_output_queue_.cancel();
186     output_deadline_.cancel();
187   }
188 
stopped() const189   bool stopped() const
190   {
191     return !socket_.is_open();
192   }
193 
deliver(const std::string & msg)194   void deliver(const std::string& msg)
195   {
196     output_queue_.push_back(msg + "\n");
197 
198     // Signal that the output queue contains messages. Modifying the expiry
199     // will wake the output actor, if it is waiting on the timer.
200     non_empty_output_queue_.expires_at(steady_timer::time_point::min());
201   }
202 
start_read()203   void start_read()
204   {
205     // Set a deadline for the read operation.
206     input_deadline_.expires_after(boost::asio::chrono::seconds(30));
207 
208     // Start an asynchronous operation to read a newline-delimited message.
209     boost::asio::async_read_until(socket_,
210         boost::asio::dynamic_buffer(input_buffer_), '\n',
211         boost::bind(&tcp_session::handle_read, shared_from_this(),
212           boost::placeholders::_1, boost::placeholders::_2));
213   }
214 
handle_read(const boost::system::error_code & ec,std::size_t n)215   void handle_read(const boost::system::error_code& ec, std::size_t n)
216   {
217     if (stopped())
218       return;
219 
220     if (!ec)
221     {
222       // Extract the newline-delimited message from the buffer.
223       std::string msg(input_buffer_.substr(0, n - 1));
224       input_buffer_.erase(0, n);
225 
226       if (!msg.empty())
227       {
228         channel_.deliver(msg);
229       }
230       else
231       {
232         // We received a heartbeat message from the client. If there's nothing
233         // else being sent or ready to be sent, send a heartbeat right back.
234         if (output_queue_.empty())
235         {
236           output_queue_.push_back("\n");
237 
238           // Signal that the output queue contains messages. Modifying the
239           // expiry will wake the output actor, if it is waiting on the timer.
240           non_empty_output_queue_.expires_at(steady_timer::time_point::min());
241         }
242       }
243 
244       start_read();
245     }
246     else
247     {
248       stop();
249     }
250   }
251 
await_output()252   void await_output()
253   {
254     if (stopped())
255       return;
256 
257     if (output_queue_.empty())
258     {
259       // There are no messages that are ready to be sent. The actor goes to
260       // sleep by waiting on the non_empty_output_queue_ timer. When a new
261       // message is added, the timer will be modified and the actor will wake.
262       non_empty_output_queue_.expires_at(steady_timer::time_point::max());
263       non_empty_output_queue_.async_wait(
264           boost::bind(&tcp_session::await_output, shared_from_this()));
265     }
266     else
267     {
268       start_write();
269     }
270   }
271 
start_write()272   void start_write()
273   {
274     // Set a deadline for the write operation.
275     output_deadline_.expires_after(boost::asio::chrono::seconds(30));
276 
277     // Start an asynchronous operation to send a message.
278     boost::asio::async_write(socket_,
279         boost::asio::buffer(output_queue_.front()),
280         boost::bind(&tcp_session::handle_write,
281           shared_from_this(), boost::placeholders::_1));
282   }
283 
handle_write(const boost::system::error_code & ec)284   void handle_write(const boost::system::error_code& ec)
285   {
286     if (stopped())
287       return;
288 
289     if (!ec)
290     {
291       output_queue_.pop_front();
292 
293       await_output();
294     }
295     else
296     {
297       stop();
298     }
299   }
300 
check_deadline(steady_timer * deadline)301   void check_deadline(steady_timer* deadline)
302   {
303     if (stopped())
304       return;
305 
306     // Check whether the deadline has passed. We compare the deadline against
307     // the current time since a new asynchronous operation may have moved the
308     // deadline before this actor had a chance to run.
309     if (deadline->expiry() <= steady_timer::clock_type::now())
310     {
311       // The deadline has passed. Stop the session. The other actors will
312       // terminate as soon as possible.
313       stop();
314     }
315     else
316     {
317       // Put the actor back to sleep.
318       deadline->async_wait(
319           boost::bind(&tcp_session::check_deadline,
320           shared_from_this(), deadline));
321     }
322   }
323 
324   channel& channel_;
325   tcp::socket socket_;
326   std::string input_buffer_;
327   steady_timer input_deadline_;
328   std::deque<std::string> output_queue_;
329   steady_timer non_empty_output_queue_;
330   steady_timer output_deadline_;
331 };
332 
333 typedef boost::shared_ptr<tcp_session> tcp_session_ptr;
334 
335 //----------------------------------------------------------------------
336 
337 class udp_broadcaster
338   : public subscriber
339 {
340 public:
udp_broadcaster(boost::asio::io_context & io_context,const udp::endpoint & broadcast_endpoint)341   udp_broadcaster(boost::asio::io_context& io_context,
342       const udp::endpoint& broadcast_endpoint)
343     : socket_(io_context)
344   {
345     socket_.connect(broadcast_endpoint);
346     socket_.set_option(udp::socket::broadcast(true));
347   }
348 
349 private:
deliver(const std::string & msg)350   void deliver(const std::string& msg)
351   {
352     boost::system::error_code ignored_ec;
353     socket_.send(boost::asio::buffer(msg), 0, ignored_ec);
354   }
355 
356   udp::socket socket_;
357 };
358 
359 //----------------------------------------------------------------------
360 
361 class server
362 {
363 public:
server(boost::asio::io_context & io_context,const tcp::endpoint & listen_endpoint,const udp::endpoint & broadcast_endpoint)364   server(boost::asio::io_context& io_context,
365       const tcp::endpoint& listen_endpoint,
366       const udp::endpoint& broadcast_endpoint)
367     : io_context_(io_context),
368       acceptor_(io_context, listen_endpoint)
369   {
370     subscriber_ptr bc(new udp_broadcaster(io_context_, broadcast_endpoint));
371     channel_.join(bc);
372 
373     start_accept();
374   }
375 
start_accept()376   void start_accept()
377   {
378     tcp_session_ptr new_session(new tcp_session(io_context_, channel_));
379 
380     acceptor_.async_accept(new_session->socket(),
381         boost::bind(&server::handle_accept,
382           this, new_session, boost::placeholders::_1));
383   }
384 
handle_accept(tcp_session_ptr session,const boost::system::error_code & ec)385   void handle_accept(tcp_session_ptr session,
386       const boost::system::error_code& ec)
387   {
388     if (!ec)
389     {
390       session->start();
391     }
392 
393     start_accept();
394   }
395 
396 private:
397   boost::asio::io_context& io_context_;
398   tcp::acceptor acceptor_;
399   channel channel_;
400 };
401 
402 //----------------------------------------------------------------------
403 
main(int argc,char * argv[])404 int main(int argc, char* argv[])
405 {
406   try
407   {
408     using namespace std; // For atoi.
409 
410     if (argc != 4)
411     {
412       std::cerr << "Usage: server <listen_port> <bcast_address> <bcast_port>\n";
413       return 1;
414     }
415 
416     boost::asio::io_context io_context;
417 
418     tcp::endpoint listen_endpoint(tcp::v4(), atoi(argv[1]));
419 
420     udp::endpoint broadcast_endpoint(
421         boost::asio::ip::make_address(argv[2]), atoi(argv[3]));
422 
423     server s(io_context, listen_endpoint, broadcast_endpoint);
424 
425     io_context.run();
426   }
427   catch (std::exception& e)
428   {
429     std::cerr << "Exception: " << e.what() << "\n";
430   }
431 
432   return 0;
433 }
434