• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1  //
2  // server.cpp
3  // ~~~~~~~~~~
4  //
5  // Copyright (c) 2003-2020 Christopher M. Kohlhoff (chris at kohlhoff dot com)
6  //
7  // Distributed under the Boost Software License, Version 1.0. (See accompanying
8  // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
9  //
10  
11  #include <boost/asio.hpp>
12  #include <boost/bind/bind.hpp>
13  #include <boost/shared_ptr.hpp>
14  #include <cmath>
15  #include <cstdlib>
16  #include <exception>
17  #include <iostream>
18  #include <set>
19  #include "protocol.hpp"
20  
21  using boost::asio::ip::tcp;
22  using boost::asio::ip::udp;
23  
24  typedef boost::shared_ptr<tcp::socket> tcp_socket_ptr;
25  typedef boost::shared_ptr<boost::asio::steady_timer> timer_ptr;
26  typedef boost::shared_ptr<control_request> control_request_ptr;
27  
28  class server
29  {
30  public:
31    // Construct the server to wait for incoming control connections.
server(boost::asio::io_context & io_context,unsigned short port)32    server(boost::asio::io_context& io_context, unsigned short port)
33      : acceptor_(io_context, tcp::endpoint(tcp::v4(), port)),
34        timer_(io_context),
35        udp_socket_(io_context, udp::endpoint(udp::v4(), 0)),
36        next_frame_number_(1)
37    {
38      // Start waiting for a new control connection.
39      tcp_socket_ptr new_socket(new tcp::socket(acceptor_.get_executor()));
40      acceptor_.async_accept(*new_socket,
41          boost::bind(&server::handle_accept, this,
42            boost::asio::placeholders::error, new_socket));
43  
44      // Start the timer used to generate outgoing frames.
45      timer_.expires_after(boost::asio::chrono::milliseconds(100));
46      timer_.async_wait(boost::bind(&server::handle_timer, this));
47    }
48  
49    // Handle a new control connection.
handle_accept(const boost::system::error_code & ec,tcp_socket_ptr socket)50    void handle_accept(const boost::system::error_code& ec, tcp_socket_ptr socket)
51    {
52      if (!ec)
53      {
54        // Start receiving control requests on the connection.
55        control_request_ptr request(new control_request);
56        boost::asio::async_read(*socket, request->to_buffers(),
57            boost::bind(&server::handle_control_request, this,
58              boost::asio::placeholders::error, socket, request));
59      }
60  
61      // Start waiting for a new control connection.
62      tcp_socket_ptr new_socket(new tcp::socket(acceptor_.get_executor()));
63      acceptor_.async_accept(*new_socket,
64          boost::bind(&server::handle_accept, this,
65            boost::asio::placeholders::error, new_socket));
66    }
67  
68    // Handle a new control request.
handle_control_request(const boost::system::error_code & ec,tcp_socket_ptr socket,control_request_ptr request)69    void handle_control_request(const boost::system::error_code& ec,
70        tcp_socket_ptr socket, control_request_ptr request)
71    {
72      if (!ec)
73      {
74        // Delay handling of the control request to simulate network latency.
75        timer_ptr delay_timer(
76            new boost::asio::steady_timer(acceptor_.get_executor()));
77        delay_timer->expires_after(boost::asio::chrono::seconds(2));
78        delay_timer->async_wait(
79            boost::bind(&server::handle_control_request_timer, this,
80              socket, request, delay_timer));
81      }
82    }
83  
handle_control_request_timer(tcp_socket_ptr socket,control_request_ptr request,timer_ptr)84    void handle_control_request_timer(tcp_socket_ptr socket,
85        control_request_ptr request, timer_ptr /*delay_timer*/)
86    {
87      // Determine what address this client is connected from, since
88      // subscriptions must be stored on the server as a complete endpoint, not
89      // just a port. We use the non-throwing overload of remote_endpoint() since
90      // it may fail if the socket is no longer connected.
91      boost::system::error_code ec;
92      tcp::endpoint remote_endpoint = socket->remote_endpoint(ec);
93      if (!ec)
94      {
95        // Remove old port subscription, if any.
96        if (unsigned short old_port = request->old_port())
97        {
98          udp::endpoint old_endpoint(remote_endpoint.address(), old_port);
99          subscribers_.erase(old_endpoint);
100          std::cout << "Removing subscription " << old_endpoint << std::endl;
101        }
102  
103        // Add new port subscription, if any.
104        if (unsigned short new_port = request->new_port())
105        {
106          udp::endpoint new_endpoint(remote_endpoint.address(), new_port);
107          subscribers_.insert(new_endpoint);
108          std::cout << "Adding subscription " << new_endpoint << std::endl;
109        }
110      }
111  
112      // Wait for next control request on this connection.
113      boost::asio::async_read(*socket, request->to_buffers(),
114          boost::bind(&server::handle_control_request, this,
115            boost::asio::placeholders::error, socket, request));
116    }
117  
118    // Every time the timer fires we will generate a new frame and send it to all
119    // subscribers.
handle_timer()120    void handle_timer()
121    {
122      // Generate payload.
123      double x = next_frame_number_ * 0.2;
124      double y = std::sin(x);
125      int char_index = static_cast<int>((y + 1.0) * (frame::payload_size / 2));
126      std::string payload;
127      for (int i = 0; i < frame::payload_size; ++i)
128        payload += (i == char_index ? '*' : '.');
129  
130      // Create the frame to be sent to all subscribers.
131      frame f(next_frame_number_++, payload);
132  
133      // Send frame to all subscribers. We can use synchronous calls here since
134      // UDP send operations typically do not block.
135      std::set<udp::endpoint>::iterator j;
136      for (j = subscribers_.begin(); j != subscribers_.end(); ++j)
137      {
138        boost::system::error_code ec;
139        udp_socket_.send_to(f.to_buffers(), *j, 0, ec);
140      }
141  
142      // Wait for next timeout.
143      timer_.expires_after(boost::asio::chrono::milliseconds(100));
144      timer_.async_wait(boost::bind(&server::handle_timer, this));
145    }
146  
147  private:
148    // The acceptor used to accept incoming control connections.
149    tcp::acceptor acceptor_;
150  
151    // The timer used for generating data.
152    boost::asio::steady_timer timer_;
153  
154    // The socket used to send data to subscribers.
155    udp::socket udp_socket_;
156  
157    // The next frame number.
158    unsigned long next_frame_number_;
159  
160    // The set of endpoints that are subscribed.
161    std::set<udp::endpoint> subscribers_;
162  };
163  
main(int argc,char * argv[])164  int main(int argc, char* argv[])
165  {
166    try
167    {
168      if (argc != 2)
169      {
170        std::cerr << "Usage: server <port>\n";
171        return 1;
172      }
173  
174      boost::asio::io_context io_context;
175  
176      using namespace std; // For atoi.
177      server s(io_context, atoi(argv[1]));
178  
179      io_context.run();
180    }
181    catch (std::exception& e)
182    {
183      std::cerr << "Exception: " << e.what() << std::endl;
184    }
185  
186    return 0;
187  }
188