• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 // Copyright (c) 2016-2019 Vinnie Falco (vinnie dot falco at gmail dot com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/boostorg/beast
8 //
9 
10 //------------------------------------------------------------------------------
11 //
12 // Example: WebSocket server, stackless coroutine
13 //
14 //------------------------------------------------------------------------------
15 
16 #include <boost/beast/core.hpp>
17 #include <boost/beast/websocket.hpp>
18 #include <boost/asio/coroutine.hpp>
19 #include <boost/asio/strand.hpp>
20 #include <boost/asio/dispatch.hpp>
21 #include <algorithm>
22 #include <cstdlib>
23 #include <functional>
24 #include <iostream>
25 #include <memory>
26 #include <string>
27 #include <thread>
28 #include <vector>
29 
30 namespace beast = boost::beast;         // from <boost/beast.hpp>
31 namespace http = beast::http;           // from <boost/beast/http.hpp>
32 namespace websocket = beast::websocket; // from <boost/beast/websocket.hpp>
33 namespace net = boost::asio;            // from <boost/asio.hpp>
34 using tcp = boost::asio::ip::tcp;       // from <boost/asio/ip/tcp.hpp>
35 
36 //------------------------------------------------------------------------------
37 
38 // Report a failure
39 void
fail(beast::error_code ec,char const * what)40 fail(beast::error_code ec, char const* what)
41 {
42     std::cerr << what << ": " << ec.message() << "\n";
43 }
44 
45 // Echoes back all received WebSocket messages
46 class session
47     : public boost::asio::coroutine
48     , public std::enable_shared_from_this<session>
49 {
50     websocket::stream<beast::tcp_stream> ws_;
51     beast::flat_buffer buffer_;
52 
53 public:
54     // Take ownership of the socket
55     explicit
session(tcp::socket socket)56     session(tcp::socket socket)
57         : ws_(std::move(socket))
58     {
59     }
60 
61     // Start the asynchronous operation
62     void
run()63     run()
64     {
65         // We need to be executing within a strand to perform async operations
66         // on the I/O objects in this session. Although not strictly necessary
67         // for single-threaded contexts, this example code is written to be
68         // thread-safe by default.
69         net::dispatch(ws_.get_executor(),
70                       beast::bind_front_handler(&session::loop,
71                                                 shared_from_this(),
72                                                 beast::error_code{},
73                                                 0));
74     }
75 
76     #include <boost/asio/yield.hpp>
77 
78     void
loop(beast::error_code ec,std::size_t bytes_transferred)79     loop(
80         beast::error_code ec,
81         std::size_t bytes_transferred)
82     {
83         boost::ignore_unused(bytes_transferred);
84         reenter(*this)
85         {
86             // Set suggested timeout settings for the websocket
87             ws_.set_option(
88                 websocket::stream_base::timeout::suggested(
89                     beast::role_type::server));
90 
91             // Set a decorator to change the Server of the handshake
92             ws_.set_option(websocket::stream_base::decorator(
93                 [](websocket::response_type& res)
94                 {
95                     res.set(http::field::server,
96                         std::string(BOOST_BEAST_VERSION_STRING) +
97                             " websocket-server-stackless");
98                 }));
99 
100             // Accept the websocket handshake
101             yield ws_.async_accept(
102                 std::bind(
103                     &session::loop,
104                     shared_from_this(),
105                     std::placeholders::_1,
106                     0));
107             if(ec)
108                 return fail(ec, "accept");
109 
110             for(;;)
111             {
112                 // Read a message into our buffer
113                 yield ws_.async_read(
114                     buffer_,
115                     std::bind(
116                         &session::loop,
117                         shared_from_this(),
118                         std::placeholders::_1,
119                         std::placeholders::_2));
120                 if(ec == websocket::error::closed)
121                 {
122                     // This indicates that the session was closed
123                     return;
124                 }
125                 if(ec)
126                     fail(ec, "read");
127 
128                 // Echo the message
129                 ws_.text(ws_.got_text());
130                 yield ws_.async_write(
131                     buffer_.data(),
132                     std::bind(
133                         &session::loop,
134                         shared_from_this(),
135                         std::placeholders::_1,
136                         std::placeholders::_2));
137                 if(ec)
138                     return fail(ec, "write");
139 
140                 // Clear the buffer
141                 buffer_.consume(buffer_.size());
142             }
143         }
144     }
145 
146     #include <boost/asio/unyield.hpp>
147 };
148 
149 //------------------------------------------------------------------------------
150 
151 // Accepts incoming connections and launches the sessions
152 class listener
153     : public boost::asio::coroutine
154     , public std::enable_shared_from_this<listener>
155 {
156     net::io_context& ioc_;
157     tcp::acceptor acceptor_;
158     tcp::socket socket_;
159 
160 public:
listener(net::io_context & ioc,tcp::endpoint endpoint)161     listener(
162         net::io_context& ioc,
163         tcp::endpoint endpoint)
164         : ioc_(ioc)
165         , acceptor_(net::make_strand(ioc))
166         , socket_(net::make_strand(ioc))
167     {
168         beast::error_code ec;
169 
170         // Open the acceptor
171         acceptor_.open(endpoint.protocol(), ec);
172         if(ec)
173         {
174             fail(ec, "open");
175             return;
176         }
177 
178         // Allow address reuse
179         acceptor_.set_option(net::socket_base::reuse_address(true), ec);
180         if(ec)
181         {
182             fail(ec, "set_option");
183             return;
184         }
185 
186         // Bind to the server address
187         acceptor_.bind(endpoint, ec);
188         if(ec)
189         {
190             fail(ec, "bind");
191             return;
192         }
193 
194         // Start listening for connections
195         acceptor_.listen(
196             net::socket_base::max_listen_connections, ec);
197         if(ec)
198         {
199             fail(ec, "listen");
200             return;
201         }
202     }
203 
204     // Start accepting incoming connections
205     void
run()206     run()
207     {
208         loop();
209     }
210 
211 private:
212 
213     #include <boost/asio/yield.hpp>
214 
215     void
loop(beast::error_code ec={})216     loop(beast::error_code ec = {})
217     {
218         reenter(*this)
219         {
220             for(;;)
221             {
222                 yield acceptor_.async_accept(
223                     socket_,
224                     std::bind(
225                         &listener::loop,
226                         shared_from_this(),
227                         std::placeholders::_1));
228                 if(ec)
229                 {
230                     fail(ec, "accept");
231                 }
232                 else
233                 {
234                     // Create the session and run it
235                     std::make_shared<session>(std::move(socket_))->run();
236                 }
237 
238                 // Make sure each session gets its own strand
239                 socket_ = tcp::socket(net::make_strand(ioc_));
240             }
241         }
242     }
243 
244     #include <boost/asio/unyield.hpp>
245 };
246 
247 //------------------------------------------------------------------------------
248 
main(int argc,char * argv[])249 int main(int argc, char* argv[])
250 {
251     // Check command line arguments.
252     if (argc != 4)
253     {
254         std::cerr <<
255             "Usage: websocket-server-stackless <address> <port> <threads>\n" <<
256             "Example:\n" <<
257             "    websocket-server-stackless 0.0.0.0 8080 1\n";
258         return EXIT_FAILURE;
259     }
260     auto const address = net::ip::make_address(argv[1]);
261     auto const port = static_cast<unsigned short>(std::atoi(argv[2]));
262     auto const threads = std::max<int>(1, std::atoi(argv[3]));
263 
264     // The io_context is required for all I/O
265     net::io_context ioc{threads};
266 
267     // Create and launch a listening port
268     std::make_shared<listener>(ioc, tcp::endpoint{address, port})->run();
269 
270     // Run the I/O service on the requested number of threads
271     std::vector<std::thread> v;
272     v.reserve(threads - 1);
273     for(auto i = threads - 1; i > 0; --i)
274         v.emplace_back(
275         [&ioc]
276         {
277             ioc.run();
278         });
279     ioc.run();
280 
281     return EXIT_SUCCESS;
282 }
283