//
// server.cpp
// ~~~~~~~~~~
//
// Copyright (c) 2003-2020 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//

#include <algorithm>
#include <cstdlib>
#include <deque>
#include <iostream>
#include <memory>
#include <set>
#include <string>
#include <utility>
#include <boost/asio/buffer.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/ip/udp.hpp>
#include <boost/asio/read_until.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/asio/write.hpp>

using boost::asio::steady_timer;
using boost::asio::ip::tcp;
using boost::asio::ip::udp;

//----------------------------------------------------------------------

// Interface for anything that can receive messages published on a channel.
// Implementations are held and destroyed through subscriber_ptr, hence the
// virtual destructor.
class subscriber
{
public:
  virtual ~subscriber() = default;

  // Hands a single message to the subscriber. What happens to the message
  // (queueing, sending, ...) is up to the implementation.
  virtual void deliver(const std::string& msg) = 0;
};

// Shared-ownership handle through which subscribers are stored and passed.
typedef std::shared_ptr<subscriber> subscriber_ptr;

//----------------------------------------------------------------------

43 class channel
44 {
45 public:
join(subscriber_ptr subscriber)46   void join(subscriber_ptr subscriber)
47   {
48     subscribers_.insert(subscriber);
49   }
50 
leave(subscriber_ptr subscriber)51   void leave(subscriber_ptr subscriber)
52   {
53     subscribers_.erase(subscriber);
54   }
55 
deliver(const std::string & msg)56   void deliver(const std::string& msg)
57   {
58     for (const auto& s : subscribers_)
59     {
60       s->deliver(msg);
61     }
62   }
63 
64 private:
65   std::set<subscriber_ptr> subscribers_;
66 };

//----------------------------------------------------------------------

//
// This class manages socket timeouts by applying the concept of a deadline.
// Some asynchronous operations are given deadlines by which they must complete.
// Deadlines are enforced by two "actors" that persist for the lifetime of the
// session object, one for input and one for output:
//
//  +----------------+                      +----------------+
//  |                |                      |                |
//  | check_deadline |<-------+             | check_deadline |<-------+
//  |                |        |             |                |        |
//  +----------------+        |             +----------------+        |
//               |            |                          |            |
//  async_wait() |    +----------------+    async_wait() |    +----------------+
//   on input    |    |     lambda     |     on output   |    |     lambda     |
//   deadline    +--->|       in       |     deadline    +--->|       in       |
//                    | check_deadline |                      | check_deadline |
//                    +----------------+                      +----------------+
//
// If either deadline actor determines that the corresponding deadline has
// expired, the socket is closed and any outstanding operations are cancelled.
//
// The input actor reads messages from the socket, where messages are delimited
// by the newline character:
//
//  +-------------+
//  |             |
//  |  read_line  |<----+
//  |             |     |
//  +-------------+     |
//          |           |
//  async_- |    +-------------+
//   read_- |    |   lambda    |
//  until() +--->|     in      |
//               |  read_line  |
//               +-------------+
//
// The deadline for receiving a complete message is 30 seconds. If a non-empty
// message is received, it is delivered to all subscribers. If a heartbeat (a
// message that consists of a single newline character) is received, a heartbeat
// is enqueued for the client, provided there are no other messages waiting to
// be sent.
//
// The output actor is responsible for sending messages to the client:
//
//  +----------------+
//  |                |<---------------------+
//  |  await_output  |                      |
//  |                |<-------+             |
//  +----------------+        |             |
//    |            |          |             |
//    |    async_- |  +----------------+    |
//    |     wait() |  |     lambda     |    |
//    |            +->|       in       |    |
//    |               |  await_output  |    |
//    |               +----------------+    |
//    V                                     |
//  +--------------+               +--------------+
//  |              | async_write() |    lambda    |
//  |  write_line  |-------------->|      in      |
//  |              |               |  write_line  |
//  +--------------+               +--------------+
//
// The output actor first waits for an output message to be enqueued. It does
// this by using a steady_timer as an asynchronous condition variable. The
// steady_timer will be signalled whenever the output queue is non-empty.
//
// Once a message is available, it is sent to the client. The deadline for
// sending a complete message is 30 seconds. After the message is successfully
// sent, the output actor again waits for the output queue to become non-empty.
//
140 class tcp_session
141   : public subscriber,
142     public std::enable_shared_from_this<tcp_session>
143 {
144 public:
tcp_session(tcp::socket socket,channel & ch)145   tcp_session(tcp::socket socket, channel& ch)
146     : channel_(ch),
147       socket_(std::move(socket))
148   {
149     input_deadline_.expires_at(steady_timer::time_point::max());
150     output_deadline_.expires_at(steady_timer::time_point::max());
151 
152     // The non_empty_output_queue_ steady_timer is set to the maximum time
153     // point whenever the output queue is empty. This ensures that the output
154     // actor stays asleep until a message is put into the queue.
155     non_empty_output_queue_.expires_at(steady_timer::time_point::max());
156   }
157 
158   // Called by the server object to initiate the four actors.
start()159   void start()
160   {
161     channel_.join(shared_from_this());
162 
163     read_line();
164     check_deadline(input_deadline_);
165 
166     await_output();
167     check_deadline(output_deadline_);
168   }
169 
170 private:
stop()171   void stop()
172   {
173     channel_.leave(shared_from_this());
174 
175     boost::system::error_code ignored_error;
176     socket_.close(ignored_error);
177     input_deadline_.cancel();
178     non_empty_output_queue_.cancel();
179     output_deadline_.cancel();
180   }
181 
stopped() const182   bool stopped() const
183   {
184     return !socket_.is_open();
185   }
186 
deliver(const std::string & msg)187   void deliver(const std::string& msg) override
188   {
189     output_queue_.push_back(msg + "\n");
190 
191     // Signal that the output queue contains messages. Modifying the expiry
192     // will wake the output actor, if it is waiting on the timer.
193     non_empty_output_queue_.expires_at(steady_timer::time_point::min());
194   }
195 
read_line()196   void read_line()
197   {
198     // Set a deadline for the read operation.
199     input_deadline_.expires_after(std::chrono::seconds(30));
200 
201     // Start an asynchronous operation to read a newline-delimited message.
202     auto self(shared_from_this());
203     boost::asio::async_read_until(socket_,
204         boost::asio::dynamic_buffer(input_buffer_), '\n',
205         [this, self](const boost::system::error_code& error, std::size_t n)
206         {
207           // Check if the session was stopped while the operation was pending.
208           if (stopped())
209             return;
210 
211           if (!error)
212           {
213             // Extract the newline-delimited message from the buffer.
214             std::string msg(input_buffer_.substr(0, n - 1));
215             input_buffer_.erase(0, n);
216 
217             if (!msg.empty())
218             {
219               channel_.deliver(msg);
220             }
221             else
222             {
223 
224               // We received a heartbeat message from the client. If there's
225               // nothing else being sent or ready to be sent, send a heartbeat
226               // right back.
227               if (output_queue_.empty())
228               {
229                 output_queue_.push_back("\n");
230 
231                 // Signal that the output queue contains messages. Modifying
232                 // the expiry will wake the output actor, if it is waiting on
233                 // the timer.
234                 non_empty_output_queue_.expires_at(
235                     steady_timer::time_point::min());
236               }
237             }
238 
239             read_line();
240           }
241           else
242           {
243             stop();
244           }
245         });
246   }
247 
await_output()248   void await_output()
249   {
250     auto self(shared_from_this());
251     non_empty_output_queue_.async_wait(
252         [this, self](const boost::system::error_code& /*error*/)
253         {
254           // Check if the session was stopped while the operation was pending.
255           if (stopped())
256             return;
257 
258           if (output_queue_.empty())
259           {
260             // There are no messages that are ready to be sent. The actor goes
261             // to sleep by waiting on the non_empty_output_queue_ timer. When a
262             // new message is added, the timer will be modified and the actor
263             // will wake.
264             non_empty_output_queue_.expires_at(steady_timer::time_point::max());
265             await_output();
266           }
267           else
268           {
269             write_line();
270           }
271         });
272   }
273 
write_line()274   void write_line()
275   {
276     // Set a deadline for the write operation.
277     output_deadline_.expires_after(std::chrono::seconds(30));
278 
279     // Start an asynchronous operation to send a message.
280     auto self(shared_from_this());
281     boost::asio::async_write(socket_,
282         boost::asio::buffer(output_queue_.front()),
283         [this, self](const boost::system::error_code& error, std::size_t /*n*/)
284         {
285           // Check if the session was stopped while the operation was pending.
286           if (stopped())
287             return;
288 
289           if (!error)
290           {
291             output_queue_.pop_front();
292 
293             await_output();
294           }
295           else
296           {
297             stop();
298           }
299         });
300   }
301 
check_deadline(steady_timer & deadline)302   void check_deadline(steady_timer& deadline)
303   {
304     auto self(shared_from_this());
305     deadline.async_wait(
306         [this, self, &deadline](const boost::system::error_code& /*error*/)
307         {
308           // Check if the session was stopped while the operation was pending.
309           if (stopped())
310             return;
311 
312           // Check whether the deadline has passed. We compare the deadline
313           // against the current time since a new asynchronous operation may
314           // have moved the deadline before this actor had a chance to run.
315           if (deadline.expiry() <= steady_timer::clock_type::now())
316           {
317             // The deadline has passed. Stop the session. The other actors will
318             // terminate as soon as possible.
319             stop();
320           }
321           else
322           {
323             // Put the actor back to sleep.
324             check_deadline(deadline);
325           }
326         });
327   }
328 
329   channel& channel_;
330   tcp::socket socket_;
331   std::string input_buffer_;
332   steady_timer input_deadline_{socket_.get_executor()};
333   std::deque<std::string> output_queue_;
334   steady_timer non_empty_output_queue_{socket_.get_executor()};
335   steady_timer output_deadline_{socket_.get_executor()};
336 };
337 
338 typedef std::shared_ptr<tcp_session> tcp_session_ptr;

//----------------------------------------------------------------------

342 class udp_broadcaster
343   : public subscriber
344 {
345 public:
udp_broadcaster(boost::asio::io_context & io_context,const udp::endpoint & broadcast_endpoint)346   udp_broadcaster(boost::asio::io_context& io_context,
347       const udp::endpoint& broadcast_endpoint)
348     : socket_(io_context)
349   {
350     socket_.connect(broadcast_endpoint);
351     socket_.set_option(udp::socket::broadcast(true));
352   }
353 
354 private:
deliver(const std::string & msg)355   void deliver(const std::string& msg)
356   {
357     boost::system::error_code ignored_error;
358     socket_.send(boost::asio::buffer(msg), 0, ignored_error);
359   }
360 
361   udp::socket socket_;
362 };

//----------------------------------------------------------------------

366 class server
367 {
368 public:
server(boost::asio::io_context & io_context,const tcp::endpoint & listen_endpoint,const udp::endpoint & broadcast_endpoint)369   server(boost::asio::io_context& io_context,
370       const tcp::endpoint& listen_endpoint,
371       const udp::endpoint& broadcast_endpoint)
372     : io_context_(io_context),
373       acceptor_(io_context, listen_endpoint)
374   {
375     channel_.join(
376         std::make_shared<udp_broadcaster>(
377           io_context_, broadcast_endpoint));
378 
379     accept();
380   }
381 
382 private:
accept()383   void accept()
384   {
385     acceptor_.async_accept(
386         [this](const boost::system::error_code& error, tcp::socket socket)
387         {
388           if (!error)
389           {
390             std::make_shared<tcp_session>(std::move(socket), channel_)->start();
391           }
392 
393           accept();
394         });
395   }
396 
397   boost::asio::io_context& io_context_;
398   tcp::acceptor acceptor_;
399   channel channel_;
400 };

//----------------------------------------------------------------------

main(int argc,char * argv[])404 int main(int argc, char* argv[])
405 {
406   try
407   {
408     using namespace std; // For atoi.
409 
410     if (argc != 4)
411     {
412       std::cerr << "Usage: server <listen_port> <bcast_address> <bcast_port>\n";
413       return 1;
414     }
415 
416     boost::asio::io_context io_context;
417 
418     tcp::endpoint listen_endpoint(tcp::v4(), atoi(argv[1]));
419 
420     udp::endpoint broadcast_endpoint(
421         boost::asio::ip::make_address(argv[2]), atoi(argv[3]));
422 
423     server s(io_context, listen_endpoint, broadcast_endpoint);
424 
425     io_context.run();
426   }
427   catch (std::exception& e)
428   {
429     std::cerr << "Exception: " << e.what() << "\n";
430   }
431 
432   return 0;
433 }
434