1 //
2 // Copyright (c) 2016-2019 Vinnie Falco (vinnie dot falco at gmail dot com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/boostorg/beast
8 //
9
10 //------------------------------------------------------------------------------
11 //
12 // Example: WebSocket server, fast
13 //
14 //------------------------------------------------------------------------------
15
16 /* This server contains the following ports:
17
18 Synchronous <base port + 0>
19 Asynchronous <base port + 1>
20 Coroutine <base port + 2>
21
22 This program is optimized for the Autobahn|Testsuite
23 benchmarking and WebSocket compliants testing program.
24
25 See:
26 https://github.com/crossbario/autobahn-testsuite
27 */
28
29 #include <boost/beast/core.hpp>
30 #include <boost/beast/http.hpp>
31 #include <boost/beast/version.hpp>
32 #include <boost/beast/websocket.hpp>
33 #include <boost/asio/spawn.hpp>
34 #include <boost/asio/strand.hpp>
35 #include <algorithm>
36 #include <cstdlib>
37 #include <functional>
38 #include <iostream>
39 #include <memory>
40 #include <string>
41 #include <thread>
42 #include <vector>
43
44 namespace beast = boost::beast; // from <boost/beast.hpp>
45 namespace http = beast::http; // from <boost/beast/http.hpp>
46 namespace websocket = beast::websocket; // from <boost/beast/websocket.hpp>
47 namespace net = boost::asio; // from <boost/asio.hpp>
48 using tcp = boost::asio::ip::tcp; // from <boost/asio/ip/tcp.hpp>
49
50 //------------------------------------------------------------------------------
51
52 // Report a failure
53 void
fail(beast::error_code ec,char const * what)54 fail(beast::error_code ec, char const* what)
55 {
56 std::cerr << (std::string(what) + ": " + ec.message() + "\n");
57 }
58
59 // Adjust settings on the stream
60 template<class NextLayer>
61 void
setup_stream(websocket::stream<NextLayer> & ws)62 setup_stream(websocket::stream<NextLayer>& ws)
63 {
64 // These values are tuned for Autobahn|Testsuite, and
65 // should also be generally helpful for increased performance.
66
67 websocket::permessage_deflate pmd;
68 pmd.client_enable = true;
69 pmd.server_enable = true;
70 pmd.compLevel = 3;
71 ws.set_option(pmd);
72
73 ws.auto_fragment(false);
74
75 // Autobahn|Testsuite needs this
76 ws.read_message_max(64 * 1024 * 1024);
77 }
78
79 //------------------------------------------------------------------------------
80
81 void
do_sync_session(websocket::stream<beast::tcp_stream> & ws)82 do_sync_session(websocket::stream<beast::tcp_stream>& ws)
83 {
84 beast::error_code ec;
85
86 setup_stream(ws);
87
88 // Set a decorator to change the Server of the handshake
89 ws.set_option(websocket::stream_base::decorator(
90 [](websocket::response_type& res)
91 {
92 res.set(http::field::server, std::string(
93 BOOST_BEAST_VERSION_STRING) + "-Sync");
94 }));
95
96 ws.accept(ec);
97 if(ec)
98 return fail(ec, "accept");
99
100 for(;;)
101 {
102 beast::flat_buffer buffer;
103
104 ws.read(buffer, ec);
105 if(ec == websocket::error::closed)
106 break;
107 if(ec)
108 return fail(ec, "read");
109 ws.text(ws.got_text());
110 ws.write(buffer.data(), ec);
111 if(ec)
112 return fail(ec, "write");
113 }
114 }
115
116 void
do_sync_listen(net::io_context & ioc,tcp::endpoint endpoint)117 do_sync_listen(
118 net::io_context& ioc,
119 tcp::endpoint endpoint)
120 {
121 beast::error_code ec;
122 tcp::acceptor acceptor{ioc, endpoint};
123 for(;;)
124 {
125 tcp::socket socket{ioc};
126
127 acceptor.accept(socket, ec);
128 if(ec)
129 return fail(ec, "accept");
130
131 std::thread(std::bind(
132 &do_sync_session,
133 websocket::stream<beast::tcp_stream>(
134 std::move(socket)))).detach();
135 }
136 }
137
138 //------------------------------------------------------------------------------
139
140 // Echoes back all received WebSocket messages
141 class async_session : public std::enable_shared_from_this<async_session>
142 {
143 websocket::stream<beast::tcp_stream> ws_;
144 beast::flat_buffer buffer_;
145
146 public:
147 // Take ownership of the socket
148 explicit
async_session(tcp::socket && socket)149 async_session(tcp::socket&& socket)
150 : ws_(std::move(socket))
151 {
152 setup_stream(ws_);
153 }
154
155 // Start the asynchronous operation
156 void
run()157 run()
158 {
159 // Set suggested timeout settings for the websocket
160 ws_.set_option(
161 websocket::stream_base::timeout::suggested(
162 beast::role_type::server));
163
164 // Set a decorator to change the Server of the handshake
165 ws_.set_option(websocket::stream_base::decorator(
166 [](websocket::response_type& res)
167 {
168 res.set(http::field::server, std::string(
169 BOOST_BEAST_VERSION_STRING) + "-Async");
170 }));
171
172 // Accept the websocket handshake
173 ws_.async_accept(
174 beast::bind_front_handler(
175 &async_session::on_accept,
176 shared_from_this()));
177 }
178
179 void
on_accept(beast::error_code ec)180 on_accept(beast::error_code ec)
181 {
182 if(ec)
183 return fail(ec, "accept");
184
185 // Read a message
186 do_read();
187 }
188
189 void
do_read()190 do_read()
191 {
192 // Read a message into our buffer
193 ws_.async_read(
194 buffer_,
195 beast::bind_front_handler(
196 &async_session::on_read,
197 shared_from_this()));
198 }
199
200 void
on_read(beast::error_code ec,std::size_t bytes_transferred)201 on_read(
202 beast::error_code ec,
203 std::size_t bytes_transferred)
204 {
205 boost::ignore_unused(bytes_transferred);
206
207 // This indicates that the async_session was closed
208 if(ec == websocket::error::closed)
209 return;
210
211 if(ec)
212 fail(ec, "read");
213
214 // Echo the message
215 ws_.text(ws_.got_text());
216 ws_.async_write(
217 buffer_.data(),
218 beast::bind_front_handler(
219 &async_session::on_write,
220 shared_from_this()));
221 }
222
223 void
on_write(beast::error_code ec,std::size_t bytes_transferred)224 on_write(
225 beast::error_code ec,
226 std::size_t bytes_transferred)
227 {
228 boost::ignore_unused(bytes_transferred);
229
230 if(ec)
231 return fail(ec, "write");
232
233 // Clear the buffer
234 buffer_.consume(buffer_.size());
235
236 // Do another read
237 do_read();
238 }
239 };
240
241 // Accepts incoming connections and launches the sessions
242 class async_listener : public std::enable_shared_from_this<async_listener>
243 {
244 net::io_context& ioc_;
245 tcp::acceptor acceptor_;
246
247 public:
async_listener(net::io_context & ioc,tcp::endpoint endpoint)248 async_listener(
249 net::io_context& ioc,
250 tcp::endpoint endpoint)
251 : ioc_(ioc)
252 , acceptor_(net::make_strand(ioc))
253 {
254 beast::error_code ec;
255
256 // Open the acceptor
257 acceptor_.open(endpoint.protocol(), ec);
258 if(ec)
259 {
260 fail(ec, "open");
261 return;
262 }
263
264 // Allow address reuse
265 acceptor_.set_option(net::socket_base::reuse_address(true), ec);
266 if(ec)
267 {
268 fail(ec, "set_option");
269 return;
270 }
271
272 // Bind to the server address
273 acceptor_.bind(endpoint, ec);
274 if(ec)
275 {
276 fail(ec, "bind");
277 return;
278 }
279
280 // Start listening for connections
281 acceptor_.listen(
282 net::socket_base::max_listen_connections, ec);
283 if(ec)
284 {
285 fail(ec, "listen");
286 return;
287 }
288 }
289
290 // Start accepting incoming connections
291 void
run()292 run()
293 {
294 do_accept();
295 }
296
297 private:
298 void
do_accept()299 do_accept()
300 {
301 // The new connection gets its own strand
302 acceptor_.async_accept(
303 net::make_strand(ioc_),
304 beast::bind_front_handler(
305 &async_listener::on_accept,
306 shared_from_this()));
307 }
308
309 void
on_accept(beast::error_code ec,tcp::socket socket)310 on_accept(beast::error_code ec, tcp::socket socket)
311 {
312 if(ec)
313 {
314 fail(ec, "accept");
315 }
316 else
317 {
318 // Create the async_session and run it
319 std::make_shared<async_session>(std::move(socket))->run();
320 }
321
322 // Accept another connection
323 do_accept();
324 }
325 };
326
327 //------------------------------------------------------------------------------
328
329 void
do_coro_session(websocket::stream<beast::tcp_stream> & ws,net::yield_context yield)330 do_coro_session(
331 websocket::stream<beast::tcp_stream>& ws,
332 net::yield_context yield)
333 {
334 beast::error_code ec;
335
336 setup_stream(ws);
337
338 // Set suggested timeout settings for the websocket
339 ws.set_option(
340 websocket::stream_base::timeout::suggested(
341 beast::role_type::server));
342
343 // Set a decorator to change the Server of the handshake
344 ws.set_option(websocket::stream_base::decorator(
345 [](websocket::response_type& res)
346 {
347 res.set(http::field::server, std::string(
348 BOOST_BEAST_VERSION_STRING) + "-Fiber");
349 }));
350
351 ws.async_accept(yield[ec]);
352 if(ec)
353 return fail(ec, "accept");
354
355 for(;;)
356 {
357 beast::flat_buffer buffer;
358
359 ws.async_read(buffer, yield[ec]);
360 if(ec == websocket::error::closed)
361 break;
362 if(ec)
363 return fail(ec, "read");
364
365 ws.text(ws.got_text());
366 ws.async_write(buffer.data(), yield[ec]);
367 if(ec)
368 return fail(ec, "write");
369 }
370 }
371
372 void
do_coro_listen(net::io_context & ioc,tcp::endpoint endpoint,net::yield_context yield)373 do_coro_listen(
374 net::io_context& ioc,
375 tcp::endpoint endpoint,
376 net::yield_context yield)
377 {
378 beast::error_code ec;
379
380 tcp::acceptor acceptor(ioc);
381 acceptor.open(endpoint.protocol(), ec);
382 if(ec)
383 return fail(ec, "open");
384
385 acceptor.set_option(net::socket_base::reuse_address(true), ec);
386 if(ec)
387 return fail(ec, "set_option");
388
389 acceptor.bind(endpoint, ec);
390 if(ec)
391 return fail(ec, "bind");
392
393 acceptor.listen(net::socket_base::max_listen_connections, ec);
394 if(ec)
395 return fail(ec, "listen");
396
397 for(;;)
398 {
399 tcp::socket socket(ioc);
400
401 acceptor.async_accept(socket, yield[ec]);
402 if(ec)
403 {
404 fail(ec, "accept");
405 continue;
406 }
407
408 boost::asio::spawn(
409 acceptor.get_executor(),
410 std::bind(
411 &do_coro_session,
412 websocket::stream<
413 beast::tcp_stream>(std::move(socket)),
414 std::placeholders::_1));
415 }
416 }
417
418 //------------------------------------------------------------------------------
419
main(int argc,char * argv[])420 int main(int argc, char* argv[])
421 {
422 // Check command line arguments.
423 if (argc != 4)
424 {
425 std::cerr <<
426 "Usage: websocket-server-fast <address> <starting-port> <threads>\n" <<
427 "Example:\n"
428 " websocket-server-fast 0.0.0.0 8080 1\n"
429 " Connect to:\n"
430 " starting-port+0 for synchronous,\n"
431 " starting-port+1 for asynchronous,\n"
432 " starting-port+2 for coroutine.\n";
433 return EXIT_FAILURE;
434 }
435 auto const address = net::ip::make_address(argv[1]);
436 auto const port = static_cast<unsigned short>(std::atoi(argv[2]));
437 auto const threads = std::max<int>(1, std::atoi(argv[3]));
438
439 // The io_context is required for all I/O
440 net::io_context ioc{threads};
441
442 // Create sync port
443 std::thread(beast::bind_front_handler(
444 &do_sync_listen,
445 std::ref(ioc),
446 tcp::endpoint{
447 address,
448 static_cast<unsigned short>(port + 0u)}
449 )).detach();
450
451 // Create async port
452 std::make_shared<async_listener>(
453 ioc,
454 tcp::endpoint{
455 address,
456 static_cast<unsigned short>(port + 1u)})->run();
457
458 // Create coro port
459 boost::asio::spawn(ioc,
460 std::bind(
461 &do_coro_listen,
462 std::ref(ioc),
463 tcp::endpoint{
464 address,
465 static_cast<unsigned short>(port + 2u)},
466 std::placeholders::_1));
467
468 // Run the I/O service on the requested number of threads
469 std::vector<std::thread> v;
470 v.reserve(threads - 1);
471 for(auto i = threads - 1; i > 0; --i)
472 v.emplace_back(
473 [&ioc]
474 {
475 ioc.run();
476 });
477 ioc.run();
478
479 return EXIT_SUCCESS;
480 }
481