• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 // Copyright (c) 2016-2019 Vinnie Falco (vinnie dot falco at gmail dot com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/boostorg/beast
8 //
9 
10 //------------------------------------------------------------------------------
11 //
12 // wsload
13 //
14 //  Measure the performance of a WebSocket server
15 //
16 //------------------------------------------------------------------------------
17 
18 #include <boost/beast/core.hpp>
19 #include <boost/beast/websocket.hpp>
20 #include <boost/beast/_experimental/unit_test/dstream.hpp>
21 #include <boost/asio.hpp>
22 #include <atomic>
23 #include <chrono>
24 #include <cstdlib>
25 #include <functional>
26 #include <iostream>
27 #include <memory>
28 #include <mutex>
29 #include <random>
30 #include <thread>
31 #include <vector>
32 
33 namespace beast = boost::beast;         // from <boost/beast.hpp>
34 namespace http = beast::http;           // from <boost/beast/http.hpp>
35 namespace websocket = beast::websocket; // from <boost/beast/websocket.hpp>
36 namespace net = boost::asio;            // from <boost/asio.hpp>
37 using tcp = boost::asio::ip::tcp;       // from <boost/asio/ip/tcp.hpp>
38 
39 class test_buffer
40 {
41     char data_[4096];
42     net::const_buffer b_;
43 
44 public:
45     using const_iterator =
46         net::const_buffer const*;
47 
48     using value_type = net::const_buffer;
49 
test_buffer()50     test_buffer()
51         : b_(data_, sizeof(data_))
52     {
53         std::mt19937_64 rng;
54         std::uniform_int_distribution<unsigned short> dist;
55         for(auto& c : data_)
56             c = static_cast<unsigned char>(dist(rng));
57     }
58 
59     const_iterator
begin() const60     begin() const
61     {
62         return &b_;
63     }
64 
65     const_iterator
end() const66     end() const
67     {
68         return begin() + 1;
69     }
70 };
71 
72 class report
73 {
74     std::mutex m_;
75     std::size_t bytes_ = 0;
76     std::size_t messages_ = 0;
77 
78 public:
79     void
insert(std::size_t messages,std::size_t bytes)80     insert(std::size_t messages, std::size_t bytes)
81     {
82         std::lock_guard<std::mutex> lock(m_);
83         bytes_ += bytes;
84         messages_ += messages;
85     }
86 
87     std::size_t
bytes() const88     bytes() const
89     {
90         return bytes_;
91     }
92 
93     std::size_t
messages() const94     messages() const
95     {
96         return messages_;
97     }
98 };
99 
100 void
fail(beast::error_code ec,char const * what)101 fail(beast::error_code ec, char const* what)
102 {
103     std::cerr << what << ": " << ec.message() << "\n";
104 }
105 
106 class connection
107     : public std::enable_shared_from_this<connection>
108 {
109     websocket::stream<tcp::socket> ws_;
110     tcp::endpoint ep_;
111     std::size_t messages_;
112     report& rep_;
113     test_buffer const& tb_;
114     net::strand<
115         net::io_context::executor_type> strand_;
116     beast::flat_buffer buffer_;
117     std::mt19937_64 rng_;
118     std::size_t count_ = 0;
119     std::size_t bytes_ = 0;
120 
121 public:
connection(net::io_context & ioc,tcp::endpoint const & ep,std::size_t messages,bool deflate,report & rep,test_buffer const & tb)122     connection(
123         net::io_context& ioc,
124         tcp::endpoint const& ep,
125         std::size_t messages,
126         bool deflate,
127         report& rep,
128         test_buffer const& tb)
129         : ws_(ioc)
130         , ep_(ep)
131         , messages_(messages)
132         , rep_(rep)
133         , tb_(tb)
134         , strand_(ioc.get_executor())
135     {
136         websocket::permessage_deflate pmd;
137         pmd.client_enable = deflate;
138         ws_.set_option(pmd);
139         ws_.binary(true);
140         ws_.auto_fragment(false);
141         ws_.write_buffer_bytes(64 * 1024);
142     }
143 
~connection()144     ~connection()
145     {
146         rep_.insert(count_, bytes_);
147     }
148 
149     void
run()150     run()
151     {
152         ws_.next_layer().async_connect(ep_,
153             beast::bind_front_handler(
154                 &connection::on_connect,
155                 this->shared_from_this()));
156     }
157 
158 private:
159     void
on_connect(beast::error_code ec)160     on_connect(beast::error_code ec)
161     {
162         if(ec)
163             return fail(ec, "on_connect");
164 
165         ws_.async_handshake(
166             ep_.address().to_string() + ":" + std::to_string(ep_.port()),
167             "/",
168             beast::bind_front_handler(
169                 &connection::on_handshake,
170                 this->shared_from_this()));
171     }
172 
173     void
on_handshake(beast::error_code ec)174     on_handshake(beast::error_code ec)
175     {
176         if(ec)
177             return fail(ec, "handshake");
178 
179         do_write();
180     }
181 
182     void
do_write()183     do_write()
184     {
185         std::geometric_distribution<std::size_t> dist{
186             double(4) / beast::buffer_bytes(tb_)};
187         ws_.async_write_some(true,
188             beast::buffers_prefix(dist(rng_), tb_),
189             beast::bind_front_handler(
190                 &connection::on_write,
191                 this->shared_from_this()));
192     }
193 
194     void
on_write(beast::error_code ec,std::size_t)195     on_write(beast::error_code ec, std::size_t)
196     {
197         if(ec)
198             return fail(ec, "write");
199 
200         if(messages_--)
201             return do_read();
202 
203         ws_.async_close({},
204             beast::bind_front_handler(
205                 &connection::on_close,
206                 this->shared_from_this()));
207     }
208 
209     void
do_read()210     do_read()
211     {
212         ws_.async_read(buffer_,
213             beast::bind_front_handler(
214                 &connection::on_read,
215                 this->shared_from_this()));
216     }
217 
218     void
on_read(beast::error_code ec,std::size_t)219     on_read(beast::error_code ec, std::size_t)
220     {
221         if(ec)
222             return fail(ec, "read");
223 
224         ++count_;
225         bytes_ += buffer_.size();
226         buffer_.consume(buffer_.size());
227         do_write();
228     }
229 
230     void
on_close(beast::error_code ec)231     on_close(beast::error_code ec)
232     {
233         if(ec)
234             return fail(ec, "close");
235     }
236 };
237 
238 class timer
239 {
240     using clock_type =
241         std::chrono::system_clock;
242 
243     clock_type::time_point when_;
244 
245 public:
246     using duration =
247         clock_type::duration;
248 
timer()249     timer()
250         : when_(clock_type::now())
251     {
252     }
253 
254     duration
elapsed() const255     elapsed() const
256     {
257         return clock_type::now() - when_;
258     }
259 };
260 
261 inline
262 std::uint64_t
throughput(std::chrono::duration<double> const & elapsed,std::uint64_t items)263 throughput(
264     std::chrono::duration<double> const& elapsed,
265     std::uint64_t items)
266 {
267     using namespace std::chrono;
268     return static_cast<std::uint64_t>(
269         1 / (elapsed/items).count());
270 }
271 
272 int
main(int argc,char ** argv)273 main(int argc, char** argv)
274 {
275     beast::unit_test::dstream dout(std::cerr);
276 
277     try
278     {
279         // Check command line arguments.
280         if(argc != 8)
281         {
282             std::cerr <<
283                 "Usage: bench-wsload <address> <port> <trials> <messages> <workers> <threads> <compression:0|1>";
284             return EXIT_FAILURE;
285         }
286 
287         auto const address = net::ip::make_address(argv[1]);
288         auto const port    = static_cast<unsigned short>(std::atoi(argv[2]));
289         auto const trials  = static_cast<std::size_t>(std::atoi(argv[3]));
290         auto const messages= static_cast<std::size_t>(std::atoi(argv[4]));
291         auto const workers = static_cast<std::size_t>(std::atoi(argv[5]));
292         auto const threads = static_cast<std::size_t>(std::atoi(argv[6]));
293         auto const deflate = std::atoi(argv[7]) != 0;
294         auto const work = (messages + workers - 1) / workers;
295         test_buffer tb;
296         for(auto i = trials; i != 0; --i)
297         {
298             report rep;
299             net::io_context ioc{1};
300             for(auto j = workers; j; --j)
301             {
302                 auto sp =
303                 std::make_shared<connection>(
304                     ioc,
305                     tcp::endpoint{address, port},
306                     work,
307                     deflate,
308                     rep,
309                     tb);
310                 sp->run();
311             }
312             timer clock;
313             std::vector<std::thread> tv;
314             if(threads > 1)
315             {
316                 tv.reserve(threads);
317                 tv.emplace_back([&ioc]{ ioc.run(); });
318             }
319             ioc.run();
320             for(auto& t : tv)
321                 t.join();
322             auto const elapsed = clock.elapsed();
323             dout <<
324                 throughput(elapsed, rep.bytes()) << " bytes/s in " <<
325                 (std::chrono::duration_cast<
326                     std::chrono::milliseconds>(
327                     elapsed).count() / 1000.) << "ms and " <<
328                 rep.bytes() << " bytes" << std::endl;
329         }
330     }
331     catch(std::exception const& e)
332     {
333         std::cerr << "Error: " << e.what() << std::endl;
334         return EXIT_FAILURE;
335     }
336 
337     return EXIT_SUCCESS;
338 }
339