• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 // Copyright (c) 2016-2019 Vinnie Falco (vinnie dot falco at gmail dot com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/boostorg/beast
8 //
9 
10 //------------------------------------------------------------------------------
11 //
12 // Example: WebSocket server, fast
13 //
14 //------------------------------------------------------------------------------
15 
16 /*  This server contains the following ports:
17 
18     Synchronous     <base port + 0>
19     Asynchronous    <base port + 1>
20     Coroutine       <base port + 2>
21 
22     This program is optimized for the Autobahn|Testsuite
23     benchmarking and WebSocket compliance testing program.
24 
25     See:
26     https://github.com/crossbario/autobahn-testsuite
27 */
28 
29 #include <boost/beast/core.hpp>
30 #include <boost/beast/http.hpp>
31 #include <boost/beast/version.hpp>
32 #include <boost/beast/websocket.hpp>
33 #include <boost/asio/spawn.hpp>
34 #include <boost/asio/strand.hpp>
35 #include <algorithm>
36 #include <cstdlib>
37 #include <functional>
38 #include <iostream>
39 #include <memory>
40 #include <string>
41 #include <thread>
42 #include <vector>
43 
44 namespace beast = boost::beast;         // from <boost/beast.hpp>
45 namespace http = beast::http;           // from <boost/beast/http.hpp>
46 namespace websocket = beast::websocket; // from <boost/beast/websocket.hpp>
47 namespace net = boost::asio;            // from <boost/asio.hpp>
48 using tcp = boost::asio::ip::tcp;       // from <boost/asio/ip/tcp.hpp>
49 
50 //------------------------------------------------------------------------------
51 
52 // Report a failure
53 void
fail(beast::error_code ec,char const * what)54 fail(beast::error_code ec, char const* what)
55 {
56     std::cerr << (std::string(what) + ": " + ec.message() + "\n");
57 }
58 
59 // Adjust settings on the stream
60 template<class NextLayer>
61 void
setup_stream(websocket::stream<NextLayer> & ws)62 setup_stream(websocket::stream<NextLayer>& ws)
63 {
64     // These values are tuned for Autobahn|Testsuite, and
65     // should also be generally helpful for increased performance.
66 
67     websocket::permessage_deflate pmd;
68     pmd.client_enable = true;
69     pmd.server_enable = true;
70     pmd.compLevel = 3;
71     ws.set_option(pmd);
72 
73     ws.auto_fragment(false);
74 
75     // Autobahn|Testsuite needs this
76     ws.read_message_max(64 * 1024 * 1024);
77 }
78 
79 //------------------------------------------------------------------------------
80 
81 void
do_sync_session(websocket::stream<beast::tcp_stream> & ws)82 do_sync_session(websocket::stream<beast::tcp_stream>& ws)
83 {
84     beast::error_code ec;
85 
86     setup_stream(ws);
87 
88     // Set a decorator to change the Server of the handshake
89     ws.set_option(websocket::stream_base::decorator(
90         [](websocket::response_type& res)
91         {
92             res.set(http::field::server, std::string(
93                 BOOST_BEAST_VERSION_STRING) + "-Sync");
94         }));
95 
96     ws.accept(ec);
97     if(ec)
98         return fail(ec, "accept");
99 
100     for(;;)
101     {
102         beast::flat_buffer buffer;
103 
104         ws.read(buffer, ec);
105         if(ec == websocket::error::closed)
106             break;
107         if(ec)
108             return fail(ec, "read");
109         ws.text(ws.got_text());
110         ws.write(buffer.data(), ec);
111         if(ec)
112             return fail(ec, "write");
113     }
114 }
115 
116 void
do_sync_listen(net::io_context & ioc,tcp::endpoint endpoint)117 do_sync_listen(
118     net::io_context& ioc,
119     tcp::endpoint endpoint)
120 {
121     beast::error_code ec;
122     tcp::acceptor acceptor{ioc, endpoint};
123     for(;;)
124     {
125         tcp::socket socket{ioc};
126 
127         acceptor.accept(socket, ec);
128         if(ec)
129             return fail(ec, "accept");
130 
131         std::thread(std::bind(
132             &do_sync_session,
133             websocket::stream<beast::tcp_stream>(
134                 std::move(socket)))).detach();
135     }
136 }
137 
138 //------------------------------------------------------------------------------
139 
140 // Echoes back all received WebSocket messages
141 class async_session : public std::enable_shared_from_this<async_session>
142 {
143     websocket::stream<beast::tcp_stream> ws_;
144     beast::flat_buffer buffer_;
145 
146 public:
147     // Take ownership of the socket
148     explicit
async_session(tcp::socket && socket)149     async_session(tcp::socket&& socket)
150         : ws_(std::move(socket))
151     {
152         setup_stream(ws_);
153     }
154 
155     // Start the asynchronous operation
156     void
run()157     run()
158     {
159         // Set suggested timeout settings for the websocket
160         ws_.set_option(
161             websocket::stream_base::timeout::suggested(
162                 beast::role_type::server));
163 
164         // Set a decorator to change the Server of the handshake
165         ws_.set_option(websocket::stream_base::decorator(
166             [](websocket::response_type& res)
167             {
168                 res.set(http::field::server, std::string(
169                     BOOST_BEAST_VERSION_STRING) + "-Async");
170             }));
171 
172         // Accept the websocket handshake
173         ws_.async_accept(
174             beast::bind_front_handler(
175                 &async_session::on_accept,
176                 shared_from_this()));
177     }
178 
179     void
on_accept(beast::error_code ec)180     on_accept(beast::error_code ec)
181     {
182         if(ec)
183             return fail(ec, "accept");
184 
185         // Read a message
186         do_read();
187     }
188 
189     void
do_read()190     do_read()
191     {
192         // Read a message into our buffer
193         ws_.async_read(
194             buffer_,
195             beast::bind_front_handler(
196                 &async_session::on_read,
197                 shared_from_this()));
198     }
199 
200     void
on_read(beast::error_code ec,std::size_t bytes_transferred)201     on_read(
202         beast::error_code ec,
203         std::size_t bytes_transferred)
204     {
205         boost::ignore_unused(bytes_transferred);
206 
207         // This indicates that the async_session was closed
208         if(ec == websocket::error::closed)
209             return;
210 
211         if(ec)
212             fail(ec, "read");
213 
214         // Echo the message
215         ws_.text(ws_.got_text());
216         ws_.async_write(
217             buffer_.data(),
218             beast::bind_front_handler(
219                 &async_session::on_write,
220                 shared_from_this()));
221     }
222 
223     void
on_write(beast::error_code ec,std::size_t bytes_transferred)224     on_write(
225         beast::error_code ec,
226         std::size_t bytes_transferred)
227     {
228         boost::ignore_unused(bytes_transferred);
229 
230         if(ec)
231             return fail(ec, "write");
232 
233         // Clear the buffer
234         buffer_.consume(buffer_.size());
235 
236         // Do another read
237         do_read();
238     }
239 };
240 
241 // Accepts incoming connections and launches the sessions
242 class async_listener : public std::enable_shared_from_this<async_listener>
243 {
244     net::io_context& ioc_;
245     tcp::acceptor acceptor_;
246 
247 public:
async_listener(net::io_context & ioc,tcp::endpoint endpoint)248     async_listener(
249         net::io_context& ioc,
250         tcp::endpoint endpoint)
251         : ioc_(ioc)
252         , acceptor_(net::make_strand(ioc))
253     {
254         beast::error_code ec;
255 
256         // Open the acceptor
257         acceptor_.open(endpoint.protocol(), ec);
258         if(ec)
259         {
260             fail(ec, "open");
261             return;
262         }
263 
264         // Allow address reuse
265         acceptor_.set_option(net::socket_base::reuse_address(true), ec);
266         if(ec)
267         {
268             fail(ec, "set_option");
269             return;
270         }
271 
272         // Bind to the server address
273         acceptor_.bind(endpoint, ec);
274         if(ec)
275         {
276             fail(ec, "bind");
277             return;
278         }
279 
280         // Start listening for connections
281         acceptor_.listen(
282             net::socket_base::max_listen_connections, ec);
283         if(ec)
284         {
285             fail(ec, "listen");
286             return;
287         }
288     }
289 
290     // Start accepting incoming connections
291     void
run()292     run()
293     {
294         do_accept();
295     }
296 
297 private:
298     void
do_accept()299     do_accept()
300     {
301         // The new connection gets its own strand
302         acceptor_.async_accept(
303             net::make_strand(ioc_),
304             beast::bind_front_handler(
305                 &async_listener::on_accept,
306                 shared_from_this()));
307     }
308 
309     void
on_accept(beast::error_code ec,tcp::socket socket)310     on_accept(beast::error_code ec, tcp::socket socket)
311     {
312         if(ec)
313         {
314             fail(ec, "accept");
315         }
316         else
317         {
318             // Create the async_session and run it
319             std::make_shared<async_session>(std::move(socket))->run();
320         }
321 
322         // Accept another connection
323         do_accept();
324     }
325 };
326 
327 //------------------------------------------------------------------------------
328 
329 void
do_coro_session(websocket::stream<beast::tcp_stream> & ws,net::yield_context yield)330 do_coro_session(
331     websocket::stream<beast::tcp_stream>& ws,
332     net::yield_context yield)
333 {
334     beast::error_code ec;
335 
336     setup_stream(ws);
337 
338     // Set suggested timeout settings for the websocket
339     ws.set_option(
340         websocket::stream_base::timeout::suggested(
341             beast::role_type::server));
342 
343     // Set a decorator to change the Server of the handshake
344     ws.set_option(websocket::stream_base::decorator(
345         [](websocket::response_type& res)
346         {
347             res.set(http::field::server, std::string(
348                 BOOST_BEAST_VERSION_STRING) + "-Fiber");
349         }));
350 
351     ws.async_accept(yield[ec]);
352     if(ec)
353         return fail(ec, "accept");
354 
355     for(;;)
356     {
357         beast::flat_buffer buffer;
358 
359         ws.async_read(buffer, yield[ec]);
360         if(ec == websocket::error::closed)
361             break;
362         if(ec)
363             return fail(ec, "read");
364 
365         ws.text(ws.got_text());
366         ws.async_write(buffer.data(), yield[ec]);
367         if(ec)
368             return fail(ec, "write");
369     }
370 }
371 
372 void
do_coro_listen(net::io_context & ioc,tcp::endpoint endpoint,net::yield_context yield)373 do_coro_listen(
374     net::io_context& ioc,
375     tcp::endpoint endpoint,
376     net::yield_context yield)
377 {
378     beast::error_code ec;
379 
380     tcp::acceptor acceptor(ioc);
381     acceptor.open(endpoint.protocol(), ec);
382     if(ec)
383         return fail(ec, "open");
384 
385     acceptor.set_option(net::socket_base::reuse_address(true), ec);
386     if(ec)
387         return fail(ec, "set_option");
388 
389     acceptor.bind(endpoint, ec);
390     if(ec)
391         return fail(ec, "bind");
392 
393     acceptor.listen(net::socket_base::max_listen_connections, ec);
394     if(ec)
395         return fail(ec, "listen");
396 
397     for(;;)
398     {
399         tcp::socket socket(ioc);
400 
401         acceptor.async_accept(socket, yield[ec]);
402         if(ec)
403         {
404             fail(ec, "accept");
405             continue;
406         }
407 
408         boost::asio::spawn(
409             acceptor.get_executor(),
410             std::bind(
411                 &do_coro_session,
412                 websocket::stream<
413                     beast::tcp_stream>(std::move(socket)),
414                 std::placeholders::_1));
415     }
416 }
417 
418 //------------------------------------------------------------------------------
419 
main(int argc,char * argv[])420 int main(int argc, char* argv[])
421 {
422     // Check command line arguments.
423     if (argc != 4)
424     {
425         std::cerr <<
426             "Usage: websocket-server-fast <address> <starting-port> <threads>\n" <<
427             "Example:\n"
428             "    websocket-server-fast 0.0.0.0 8080 1\n"
429             "  Connect to:\n"
430             "    starting-port+0 for synchronous,\n"
431             "    starting-port+1 for asynchronous,\n"
432             "    starting-port+2 for coroutine.\n";
433         return EXIT_FAILURE;
434     }
435     auto const address = net::ip::make_address(argv[1]);
436     auto const port = static_cast<unsigned short>(std::atoi(argv[2]));
437     auto const threads = std::max<int>(1, std::atoi(argv[3]));
438 
439     // The io_context is required for all I/O
440     net::io_context ioc{threads};
441 
442     // Create sync port
443     std::thread(beast::bind_front_handler(
444         &do_sync_listen,
445         std::ref(ioc),
446         tcp::endpoint{
447             address,
448             static_cast<unsigned short>(port + 0u)}
449                 )).detach();
450 
451     // Create async port
452     std::make_shared<async_listener>(
453         ioc,
454         tcp::endpoint{
455             address,
456             static_cast<unsigned short>(port + 1u)})->run();
457 
458     // Create coro port
459     boost::asio::spawn(ioc,
460         std::bind(
461             &do_coro_listen,
462             std::ref(ioc),
463             tcp::endpoint{
464                 address,
465                 static_cast<unsigned short>(port + 2u)},
466             std::placeholders::_1));
467 
468     // Run the I/O service on the requested number of threads
469     std::vector<std::thread> v;
470     v.reserve(threads - 1);
471     for(auto i = threads - 1; i > 0; --i)
472         v.emplace_back(
473         [&ioc]
474         {
475             ioc.run();
476         });
477     ioc.run();
478 
479     return EXIT_SUCCESS;
480 }
481