1 //
2 // Copyright (c) 2016-2019 Vinnie Falco (vinnie dot falco at gmail dot com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/boostorg/beast
8 //
9
10 //------------------------------------------------------------------------------
11 //
12 // Example: WebSocket server, stackless coroutine
13 //
14 //------------------------------------------------------------------------------
15
16 #include <boost/beast/core.hpp>
17 #include <boost/beast/websocket.hpp>
18 #include <boost/asio/coroutine.hpp>
19 #include <boost/asio/strand.hpp>
20 #include <boost/asio/dispatch.hpp>
21 #include <algorithm>
22 #include <cstdlib>
23 #include <functional>
24 #include <iostream>
25 #include <memory>
26 #include <string>
27 #include <thread>
28 #include <vector>
29
30 namespace beast = boost::beast; // from <boost/beast.hpp>
31 namespace http = beast::http; // from <boost/beast/http.hpp>
32 namespace websocket = beast::websocket; // from <boost/beast/websocket.hpp>
33 namespace net = boost::asio; // from <boost/asio.hpp>
34 using tcp = boost::asio::ip::tcp; // from <boost/asio/ip/tcp.hpp>
35
36 //------------------------------------------------------------------------------
37
38 // Report a failure
39 void
fail(beast::error_code ec,char const * what)40 fail(beast::error_code ec, char const* what)
41 {
42 std::cerr << what << ": " << ec.message() << "\n";
43 }
44
45 // Echoes back all received WebSocket messages
46 class session
47 : public boost::asio::coroutine
48 , public std::enable_shared_from_this<session>
49 {
50 websocket::stream<beast::tcp_stream> ws_;
51 beast::flat_buffer buffer_;
52
53 public:
54 // Take ownership of the socket
55 explicit
session(tcp::socket socket)56 session(tcp::socket socket)
57 : ws_(std::move(socket))
58 {
59 }
60
61 // Start the asynchronous operation
62 void
run()63 run()
64 {
65 // We need to be executing within a strand to perform async operations
66 // on the I/O objects in this session. Although not strictly necessary
67 // for single-threaded contexts, this example code is written to be
68 // thread-safe by default.
69 net::dispatch(ws_.get_executor(),
70 beast::bind_front_handler(&session::loop,
71 shared_from_this(),
72 beast::error_code{},
73 0));
74 }
75
76 #include <boost/asio/yield.hpp>
77
78 void
loop(beast::error_code ec,std::size_t bytes_transferred)79 loop(
80 beast::error_code ec,
81 std::size_t bytes_transferred)
82 {
83 boost::ignore_unused(bytes_transferred);
84 reenter(*this)
85 {
86 // Set suggested timeout settings for the websocket
87 ws_.set_option(
88 websocket::stream_base::timeout::suggested(
89 beast::role_type::server));
90
91 // Set a decorator to change the Server of the handshake
92 ws_.set_option(websocket::stream_base::decorator(
93 [](websocket::response_type& res)
94 {
95 res.set(http::field::server,
96 std::string(BOOST_BEAST_VERSION_STRING) +
97 " websocket-server-stackless");
98 }));
99
100 // Accept the websocket handshake
101 yield ws_.async_accept(
102 std::bind(
103 &session::loop,
104 shared_from_this(),
105 std::placeholders::_1,
106 0));
107 if(ec)
108 return fail(ec, "accept");
109
110 for(;;)
111 {
112 // Read a message into our buffer
113 yield ws_.async_read(
114 buffer_,
115 std::bind(
116 &session::loop,
117 shared_from_this(),
118 std::placeholders::_1,
119 std::placeholders::_2));
120 if(ec == websocket::error::closed)
121 {
122 // This indicates that the session was closed
123 return;
124 }
125 if(ec)
126 fail(ec, "read");
127
128 // Echo the message
129 ws_.text(ws_.got_text());
130 yield ws_.async_write(
131 buffer_.data(),
132 std::bind(
133 &session::loop,
134 shared_from_this(),
135 std::placeholders::_1,
136 std::placeholders::_2));
137 if(ec)
138 return fail(ec, "write");
139
140 // Clear the buffer
141 buffer_.consume(buffer_.size());
142 }
143 }
144 }
145
146 #include <boost/asio/unyield.hpp>
147 };
148
149 //------------------------------------------------------------------------------
150
151 // Accepts incoming connections and launches the sessions
152 class listener
153 : public boost::asio::coroutine
154 , public std::enable_shared_from_this<listener>
155 {
156 net::io_context& ioc_;
157 tcp::acceptor acceptor_;
158 tcp::socket socket_;
159
160 public:
listener(net::io_context & ioc,tcp::endpoint endpoint)161 listener(
162 net::io_context& ioc,
163 tcp::endpoint endpoint)
164 : ioc_(ioc)
165 , acceptor_(net::make_strand(ioc))
166 , socket_(net::make_strand(ioc))
167 {
168 beast::error_code ec;
169
170 // Open the acceptor
171 acceptor_.open(endpoint.protocol(), ec);
172 if(ec)
173 {
174 fail(ec, "open");
175 return;
176 }
177
178 // Allow address reuse
179 acceptor_.set_option(net::socket_base::reuse_address(true), ec);
180 if(ec)
181 {
182 fail(ec, "set_option");
183 return;
184 }
185
186 // Bind to the server address
187 acceptor_.bind(endpoint, ec);
188 if(ec)
189 {
190 fail(ec, "bind");
191 return;
192 }
193
194 // Start listening for connections
195 acceptor_.listen(
196 net::socket_base::max_listen_connections, ec);
197 if(ec)
198 {
199 fail(ec, "listen");
200 return;
201 }
202 }
203
204 // Start accepting incoming connections
205 void
run()206 run()
207 {
208 loop();
209 }
210
211 private:
212
213 #include <boost/asio/yield.hpp>
214
215 void
loop(beast::error_code ec={})216 loop(beast::error_code ec = {})
217 {
218 reenter(*this)
219 {
220 for(;;)
221 {
222 yield acceptor_.async_accept(
223 socket_,
224 std::bind(
225 &listener::loop,
226 shared_from_this(),
227 std::placeholders::_1));
228 if(ec)
229 {
230 fail(ec, "accept");
231 }
232 else
233 {
234 // Create the session and run it
235 std::make_shared<session>(std::move(socket_))->run();
236 }
237
238 // Make sure each session gets its own strand
239 socket_ = tcp::socket(net::make_strand(ioc_));
240 }
241 }
242 }
243
244 #include <boost/asio/unyield.hpp>
245 };
246
247 //------------------------------------------------------------------------------
248
main(int argc,char * argv[])249 int main(int argc, char* argv[])
250 {
251 // Check command line arguments.
252 if (argc != 4)
253 {
254 std::cerr <<
255 "Usage: websocket-server-stackless <address> <port> <threads>\n" <<
256 "Example:\n" <<
257 " websocket-server-stackless 0.0.0.0 8080 1\n";
258 return EXIT_FAILURE;
259 }
260 auto const address = net::ip::make_address(argv[1]);
261 auto const port = static_cast<unsigned short>(std::atoi(argv[2]));
262 auto const threads = std::max<int>(1, std::atoi(argv[3]));
263
264 // The io_context is required for all I/O
265 net::io_context ioc{threads};
266
267 // Create and launch a listening port
268 std::make_shared<listener>(ioc, tcp::endpoint{address, port})->run();
269
270 // Run the I/O service on the requested number of threads
271 std::vector<std::thread> v;
272 v.reserve(threads - 1);
273 for(auto i = threads - 1; i > 0; --i)
274 v.emplace_back(
275 [&ioc]
276 {
277 ioc.run();
278 });
279 ioc.run();
280
281 return EXIT_SUCCESS;
282 }
283