1 //
2 // Copyright (c) 2016-2019 Vinnie Falco (vinnie dot falco at gmail dot com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/boostorg/beast
8 //
9
10 //------------------------------------------------------------------------------
11 //
12 // wsload
13 //
14 // Measure the performance of a WebSocket server
15 //
16 //------------------------------------------------------------------------------
17
18 #include <boost/beast/core.hpp>
19 #include <boost/beast/websocket.hpp>
20 #include <boost/beast/_experimental/unit_test/dstream.hpp>
21 #include <boost/asio.hpp>
22 #include <atomic>
23 #include <chrono>
24 #include <cstdlib>
25 #include <functional>
26 #include <iostream>
27 #include <memory>
28 #include <mutex>
29 #include <random>
30 #include <thread>
31 #include <vector>
32
33 namespace beast = boost::beast; // from <boost/beast.hpp>
34 namespace http = beast::http; // from <boost/beast/http.hpp>
35 namespace websocket = beast::websocket; // from <boost/beast/websocket.hpp>
36 namespace net = boost::asio; // from <boost/asio.hpp>
37 using tcp = boost::asio::ip::tcp; // from <boost/asio/ip/tcp.hpp>
38
39 class test_buffer
40 {
41 char data_[4096];
42 net::const_buffer b_;
43
44 public:
45 using const_iterator =
46 net::const_buffer const*;
47
48 using value_type = net::const_buffer;
49
test_buffer()50 test_buffer()
51 : b_(data_, sizeof(data_))
52 {
53 std::mt19937_64 rng;
54 std::uniform_int_distribution<unsigned short> dist;
55 for(auto& c : data_)
56 c = static_cast<unsigned char>(dist(rng));
57 }
58
59 const_iterator
begin() const60 begin() const
61 {
62 return &b_;
63 }
64
65 const_iterator
end() const66 end() const
67 {
68 return begin() + 1;
69 }
70 };
71
/** Accumulates message and byte totals reported by connections.

    `insert()` adds to the running totals; `messages()` and `bytes()`
    return the current totals. Every member function takes the same
    mutex, so the totals may be updated and read from different threads.
*/
class report
{
    // mutable so that the const accessors can lock it.
    mutable std::mutex m_;
    std::size_t bytes_ = 0;
    std::size_t messages_ = 0;

public:
    /** Add to the totals.

        @param messages Number of messages to add.
        @param bytes Number of bytes to add.
    */
    void
    insert(std::size_t messages, std::size_t bytes)
    {
        std::lock_guard<std::mutex> lock(m_);
        bytes_ += bytes;
        messages_ += messages;
    }

    /// Returns the total number of bytes inserted so far.
    std::size_t
    bytes() const
    {
        // Take the lock so a read never overlaps a write in insert().
        std::lock_guard<std::mutex> lock(m_);
        return bytes_;
    }

    /// Returns the total number of messages inserted so far.
    std::size_t
    messages() const
    {
        std::lock_guard<std::mutex> lock(m_);
        return messages_;
    }
};
99
100 void
fail(beast::error_code ec,char const * what)101 fail(beast::error_code ec, char const* what)
102 {
103 std::cerr << what << ": " << ec.message() << "\n";
104 }
105
106 class connection
107 : public std::enable_shared_from_this<connection>
108 {
109 websocket::stream<tcp::socket> ws_;
110 tcp::endpoint ep_;
111 std::size_t messages_;
112 report& rep_;
113 test_buffer const& tb_;
114 net::strand<
115 net::io_context::executor_type> strand_;
116 beast::flat_buffer buffer_;
117 std::mt19937_64 rng_;
118 std::size_t count_ = 0;
119 std::size_t bytes_ = 0;
120
121 public:
connection(net::io_context & ioc,tcp::endpoint const & ep,std::size_t messages,bool deflate,report & rep,test_buffer const & tb)122 connection(
123 net::io_context& ioc,
124 tcp::endpoint const& ep,
125 std::size_t messages,
126 bool deflate,
127 report& rep,
128 test_buffer const& tb)
129 : ws_(ioc)
130 , ep_(ep)
131 , messages_(messages)
132 , rep_(rep)
133 , tb_(tb)
134 , strand_(ioc.get_executor())
135 {
136 websocket::permessage_deflate pmd;
137 pmd.client_enable = deflate;
138 ws_.set_option(pmd);
139 ws_.binary(true);
140 ws_.auto_fragment(false);
141 ws_.write_buffer_bytes(64 * 1024);
142 }
143
~connection()144 ~connection()
145 {
146 rep_.insert(count_, bytes_);
147 }
148
149 void
run()150 run()
151 {
152 ws_.next_layer().async_connect(ep_,
153 beast::bind_front_handler(
154 &connection::on_connect,
155 this->shared_from_this()));
156 }
157
158 private:
159 void
on_connect(beast::error_code ec)160 on_connect(beast::error_code ec)
161 {
162 if(ec)
163 return fail(ec, "on_connect");
164
165 ws_.async_handshake(
166 ep_.address().to_string() + ":" + std::to_string(ep_.port()),
167 "/",
168 beast::bind_front_handler(
169 &connection::on_handshake,
170 this->shared_from_this()));
171 }
172
173 void
on_handshake(beast::error_code ec)174 on_handshake(beast::error_code ec)
175 {
176 if(ec)
177 return fail(ec, "handshake");
178
179 do_write();
180 }
181
182 void
do_write()183 do_write()
184 {
185 std::geometric_distribution<std::size_t> dist{
186 double(4) / beast::buffer_bytes(tb_)};
187 ws_.async_write_some(true,
188 beast::buffers_prefix(dist(rng_), tb_),
189 beast::bind_front_handler(
190 &connection::on_write,
191 this->shared_from_this()));
192 }
193
194 void
on_write(beast::error_code ec,std::size_t)195 on_write(beast::error_code ec, std::size_t)
196 {
197 if(ec)
198 return fail(ec, "write");
199
200 if(messages_--)
201 return do_read();
202
203 ws_.async_close({},
204 beast::bind_front_handler(
205 &connection::on_close,
206 this->shared_from_this()));
207 }
208
209 void
do_read()210 do_read()
211 {
212 ws_.async_read(buffer_,
213 beast::bind_front_handler(
214 &connection::on_read,
215 this->shared_from_this()));
216 }
217
218 void
on_read(beast::error_code ec,std::size_t)219 on_read(beast::error_code ec, std::size_t)
220 {
221 if(ec)
222 return fail(ec, "read");
223
224 ++count_;
225 bytes_ += buffer_.size();
226 buffer_.consume(buffer_.size());
227 do_write();
228 }
229
230 void
on_close(beast::error_code ec)231 on_close(beast::error_code ec)
232 {
233 if(ec)
234 return fail(ec, "close");
235 }
236 };
237
/** A stopwatch that starts running when it is constructed.

    `elapsed()` returns the time that has passed since construction.
*/
class timer
{
    // steady_clock never goes backwards and is unaffected by
    // adjustments to the system time, so a measured interval cannot be
    // negative or distorted by a wall-clock change.
    using clock_type =
        std::chrono::steady_clock;

    clock_type::time_point when_;

public:
    using duration =
        clock_type::duration;

    /// Records the current time as the starting point.
    timer()
        : when_(clock_type::now())
    {
    }

    /// Returns the time elapsed since construction.
    duration
    elapsed() const
    {
        return clock_type::now() - when_;
    }
};
260
/** Compute a rate in items per second.

    @param elapsed The length of the measured interval, in seconds.
    @param items The number of items processed during the interval.

    @return The number of items per second, truncated to an integer.
    Returns 0 when `items` is 0 or when `elapsed` is not positive.
*/
inline
std::uint64_t
throughput(
    std::chrono::duration<double> const& elapsed,
    std::uint64_t items)
{
    // Without this guard the expression below can produce an infinite,
    // NaN or negative value, and converting any of those to an unsigned
    // integer is undefined. The negated comparison also rejects NaN.
    if(items == 0 || !(elapsed.count() > 0))
        return 0;
    return static_cast<std::uint64_t>(
        1 / (elapsed / items).count());
}
271
272 int
main(int argc,char ** argv)273 main(int argc, char** argv)
274 {
275 beast::unit_test::dstream dout(std::cerr);
276
277 try
278 {
279 // Check command line arguments.
280 if(argc != 8)
281 {
282 std::cerr <<
283 "Usage: bench-wsload <address> <port> <trials> <messages> <workers> <threads> <compression:0|1>";
284 return EXIT_FAILURE;
285 }
286
287 auto const address = net::ip::make_address(argv[1]);
288 auto const port = static_cast<unsigned short>(std::atoi(argv[2]));
289 auto const trials = static_cast<std::size_t>(std::atoi(argv[3]));
290 auto const messages= static_cast<std::size_t>(std::atoi(argv[4]));
291 auto const workers = static_cast<std::size_t>(std::atoi(argv[5]));
292 auto const threads = static_cast<std::size_t>(std::atoi(argv[6]));
293 auto const deflate = std::atoi(argv[7]) != 0;
294 auto const work = (messages + workers - 1) / workers;
295 test_buffer tb;
296 for(auto i = trials; i != 0; --i)
297 {
298 report rep;
299 net::io_context ioc{1};
300 for(auto j = workers; j; --j)
301 {
302 auto sp =
303 std::make_shared<connection>(
304 ioc,
305 tcp::endpoint{address, port},
306 work,
307 deflate,
308 rep,
309 tb);
310 sp->run();
311 }
312 timer clock;
313 std::vector<std::thread> tv;
314 if(threads > 1)
315 {
316 tv.reserve(threads);
317 tv.emplace_back([&ioc]{ ioc.run(); });
318 }
319 ioc.run();
320 for(auto& t : tv)
321 t.join();
322 auto const elapsed = clock.elapsed();
323 dout <<
324 throughput(elapsed, rep.bytes()) << " bytes/s in " <<
325 (std::chrono::duration_cast<
326 std::chrono::milliseconds>(
327 elapsed).count() / 1000.) << "ms and " <<
328 rep.bytes() << " bytes" << std::endl;
329 }
330 }
331 catch(std::exception const& e)
332 {
333 std::cerr << "Error: " << e.what() << std::endl;
334 return EXIT_FAILURE;
335 }
336
337 return EXIT_SUCCESS;
338 }
339