1 // 2 // server.cpp 3 // ~~~~~~~~~~ 4 // 5 // Copyright (c) 2003-2020 Christopher M. Kohlhoff (chris at kohlhoff dot com) 6 // 7 // Distributed under the Boost Software License, Version 1.0. (See accompanying 8 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 9 // 10 11 #include <boost/asio.hpp> 12 #include <boost/bind/bind.hpp> 13 #include <boost/shared_ptr.hpp> 14 #include <cmath> 15 #include <cstdlib> 16 #include <exception> 17 #include <iostream> 18 #include <set> 19 #include "protocol.hpp" 20 21 using boost::asio::ip::tcp; 22 using boost::asio::ip::udp; 23 24 typedef boost::shared_ptr<tcp::socket> tcp_socket_ptr; 25 typedef boost::shared_ptr<boost::asio::steady_timer> timer_ptr; 26 typedef boost::shared_ptr<control_request> control_request_ptr; 27 28 class server 29 { 30 public: 31 // Construct the server to wait for incoming control connections. server(boost::asio::io_context & io_context,unsigned short port)32 server(boost::asio::io_context& io_context, unsigned short port) 33 : acceptor_(io_context, tcp::endpoint(tcp::v4(), port)), 34 timer_(io_context), 35 udp_socket_(io_context, udp::endpoint(udp::v4(), 0)), 36 next_frame_number_(1) 37 { 38 // Start waiting for a new control connection. 39 tcp_socket_ptr new_socket(new tcp::socket(acceptor_.get_executor())); 40 acceptor_.async_accept(*new_socket, 41 boost::bind(&server::handle_accept, this, 42 boost::asio::placeholders::error, new_socket)); 43 44 // Start the timer used to generate outgoing frames. 45 timer_.expires_after(boost::asio::chrono::milliseconds(100)); 46 timer_.async_wait(boost::bind(&server::handle_timer, this)); 47 } 48 49 // Handle a new control connection. handle_accept(const boost::system::error_code & ec,tcp_socket_ptr socket)50 void handle_accept(const boost::system::error_code& ec, tcp_socket_ptr socket) 51 { 52 if (!ec) 53 { 54 // Start receiving control requests on the connection. 55 control_request_ptr request(new control_request); 56 boost::asio::async_read(*socket, request->to_buffers(), 57 boost::bind(&server::handle_control_request, this, 58 boost::asio::placeholders::error, socket, request)); 59 } 60 61 // Start waiting for a new control connection. 62 tcp_socket_ptr new_socket(new tcp::socket(acceptor_.get_executor())); 63 acceptor_.async_accept(*new_socket, 64 boost::bind(&server::handle_accept, this, 65 boost::asio::placeholders::error, new_socket)); 66 } 67 68 // Handle a new control request. handle_control_request(const boost::system::error_code & ec,tcp_socket_ptr socket,control_request_ptr request)69 void handle_control_request(const boost::system::error_code& ec, 70 tcp_socket_ptr socket, control_request_ptr request) 71 { 72 if (!ec) 73 { 74 // Delay handling of the control request to simulate network latency. 75 timer_ptr delay_timer( 76 new boost::asio::steady_timer(acceptor_.get_executor())); 77 delay_timer->expires_after(boost::asio::chrono::seconds(2)); 78 delay_timer->async_wait( 79 boost::bind(&server::handle_control_request_timer, this, 80 socket, request, delay_timer)); 81 } 82 } 83 handle_control_request_timer(tcp_socket_ptr socket,control_request_ptr request,timer_ptr)84 void handle_control_request_timer(tcp_socket_ptr socket, 85 control_request_ptr request, timer_ptr /*delay_timer*/) 86 { 87 // Determine what address this client is connected from, since 88 // subscriptions must be stored on the server as a complete endpoint, not 89 // just a port. We use the non-throwing overload of remote_endpoint() since 90 // it may fail if the socket is no longer connected. 91 boost::system::error_code ec; 92 tcp::endpoint remote_endpoint = socket->remote_endpoint(ec); 93 if (!ec) 94 { 95 // Remove old port subscription, if any. 96 if (unsigned short old_port = request->old_port()) 97 { 98 udp::endpoint old_endpoint(remote_endpoint.address(), old_port); 99 subscribers_.erase(old_endpoint); 100 std::cout << "Removing subscription " << old_endpoint << std::endl; 101 } 102 103 // Add new port subscription, if any. 104 if (unsigned short new_port = request->new_port()) 105 { 106 udp::endpoint new_endpoint(remote_endpoint.address(), new_port); 107 subscribers_.insert(new_endpoint); 108 std::cout << "Adding subscription " << new_endpoint << std::endl; 109 } 110 } 111 112 // Wait for next control request on this connection. 113 boost::asio::async_read(*socket, request->to_buffers(), 114 boost::bind(&server::handle_control_request, this, 115 boost::asio::placeholders::error, socket, request)); 116 } 117 118 // Every time the timer fires we will generate a new frame and send it to all 119 // subscribers. handle_timer()120 void handle_timer() 121 { 122 // Generate payload. 123 double x = next_frame_number_ * 0.2; 124 double y = std::sin(x); 125 int char_index = static_cast<int>((y + 1.0) * (frame::payload_size / 2)); 126 std::string payload; 127 for (int i = 0; i < frame::payload_size; ++i) 128 payload += (i == char_index ? '*' : '.'); 129 130 // Create the frame to be sent to all subscribers. 131 frame f(next_frame_number_++, payload); 132 133 // Send frame to all subscribers. We can use synchronous calls here since 134 // UDP send operations typically do not block. 135 std::set<udp::endpoint>::iterator j; 136 for (j = subscribers_.begin(); j != subscribers_.end(); ++j) 137 { 138 boost::system::error_code ec; 139 udp_socket_.send_to(f.to_buffers(), *j, 0, ec); 140 } 141 142 // Wait for next timeout. 143 timer_.expires_after(boost::asio::chrono::milliseconds(100)); 144 timer_.async_wait(boost::bind(&server::handle_timer, this)); 145 } 146 147 private: 148 // The acceptor used to accept incoming control connections. 149 tcp::acceptor acceptor_; 150 151 // The timer used for generating data. 152 boost::asio::steady_timer timer_; 153 154 // The socket used to send data to subscribers. 155 udp::socket udp_socket_; 156 157 // The next frame number. 158 unsigned long next_frame_number_; 159 160 // The set of endpoints that are subscribed. 161 std::set<udp::endpoint> subscribers_; 162 }; 163 main(int argc,char * argv[])164 int main(int argc, char* argv[]) 165 { 166 try 167 { 168 if (argc != 2) 169 { 170 std::cerr << "Usage: server <port>\n"; 171 return 1; 172 } 173 174 boost::asio::io_context io_context; 175 176 using namespace std; // For atoi. 177 server s(io_context, atoi(argv[1])); 178 179 io_context.run(); 180 } 181 catch (std::exception& e) 182 { 183 std::cerr << "Exception: " << e.what() << std::endl; 184 } 185 186 return 0; 187 } 188