1 //
2 // Copyright (c) 2016-2019 Vinnie Falco (vinnie dot falco at gmail dot com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/boostorg/beast
8 //
9
10 #include "snippets.hpp"
11
12 #include <boost/beast/_experimental/unit_test/suite.hpp>
13 #include <boost/beast/_experimental/test/stream.hpp>
14 #include <boost/beast/core/async_base.hpp>
15 #include <boost/beast/core/buffers_prefix.hpp>
16 #include <boost/beast/core/error.hpp>
17 #include <boost/beast/core/flat_buffer.hpp>
18 #include <boost/beast/core/stream_traits.hpp>
19 #include <boost/beast/core/tcp_stream.hpp>
20 #include <boost/beast/http.hpp>
21 #include <boost/beast/ssl/ssl_stream.hpp>
22 #include <boost/asio/buffer.hpp>
23 #include <boost/asio/read.hpp>
24 #include <boost/asio/spawn.hpp>
25 #include <cstdlib>
26 #include <utility>
27 #include <string>
28
29 namespace boost {
30 namespace beast {
31
32 namespace {
33
34 struct handler_type
35 {
36 template<class... Args>
operator ()boost::beast::__anon17063ce90111::handler_type37 void operator()(Args&&...)
38 {
39 }
40 };
41
42 void
core_3_timeouts_snippets()43 core_3_timeouts_snippets()
44 {
45 handler_type handler;
46
47 #include "snippets.ipp"
48
49 {
50 //[code_core_3_timeouts_1
51
52 // `ioc` will be used to dispatch completion handlers
53 tcp_stream stream(ioc);
54
55 //]
56 }
57
58 {
59 //[code_core_3_timeouts_2
60
61 // The resolver is used to look up the IP addresses for a domain name
62 net::ip::tcp::resolver resolver(ioc);
63
64 // The stream will use the same executor as the resolver
65 tcp_stream stream(resolver.get_executor());
66
67 //]
68 }
69
70 {
71 //[code_core_3_timeouts_3
72
73 // The strand will be used to invoke all completion handlers
74 tcp_stream stream(net::make_strand(ioc));
75
76 //]
77
78 net::ip::tcp::resolver resolver(ioc);
79
80 //[code_core_3_timeouts_4
81
82 // Set the logical operation timer to 30 seconds
83 stream.expires_after (std::chrono::seconds(30));
84
85 // If the connection is not established within 30 seconds,
86 // the operation will be canceled and the handler will receive
87 // error::timeout as the error code.
88
89 stream.async_connect(resolver.resolve("www.example.com", "http"),
90 [](error_code ec, net::ip::tcp::endpoint ep)
91 {
92 if(ec == error::timeout)
93 std::cerr << "async_connect took too long\n";
94 else if(! ec)
95 std::cout << "Connected to " << ep << "\n";
96 }
97 );
98
99 // The timer is still running. If we don't want the next
100 // operation to time out 30 seconds relative to the previous
101 // call to `expires_after`, we need to turn it off before
102 // starting another asynchronous operation.
103
104 stream.expires_never();
105
106 //]
107 }
108
109 {
110 //[code_core_3_timeouts_5
111
112 // The acceptor is used to listen and accept incoming connections.
113 // We construct the acceptor to use a new strand, and listen
114 // on the loopback address with an operating-system assigned port.
115
116 net::ip::tcp::acceptor acceptor(net::make_strand(ioc));
117 acceptor.bind(net::ip::tcp::endpoint(net::ip::make_address_v4("127.0.0.1"), 0));
118 acceptor.listen(0);
119
120 // This blocks until a new incoming connection is established.
121 // Upon success, the function returns a new socket which is
122 // connected to the peer. The socket will have its own executor,
123 // which in the call below is a new strand for the I/O context.
124
125 net::ip::tcp::socket s = acceptor.accept(net::make_strand(ioc));
126
127 // Construct a new tcp_stream from the connected socket.
128 // The stream will use the strand created when the connection
129 // was accepted.
130
131 tcp_stream stream(std::move(s));
132 //]
133 }
134
135 {
136 tcp_stream stream(ioc);
137
138 //[code_core_3_timeouts_6
139
140 std::string s;
141
142 // Set the logical operation timer to 30 seconds.
143 stream.expires_after (std::chrono::seconds(30));
144
145 // Read a line from the stream into the string.
146 net::async_read_until(stream, net::dynamic_buffer(s), '\n',
147 [&s, &stream](error_code ec, std::size_t bytes_transferred)
148 {
149 if(ec)
150 return;
151
152 // read_until can read past the '\n', these will end up in
153 // our buffer but we don't want to echo those extra received
154 // bytes. `bytes_transferred` will be the number of bytes
155 // up to and including the '\n'. We use `buffers_prefix` so
156 // that extra data is not written.
157
158 net::async_write(stream, buffers_prefix(bytes_transferred, net::buffer(s)),
159 [&s](error_code ec, std::size_t bytes_transferred)
160 {
161 // Consume the line from the buffer
162 s.erase(s.begin(), s.begin() + bytes_transferred);
163
164 if(ec)
165 std::cerr << "Error: " << ec.message() << "\n";
166 });
167 });
168 //]
169 }
170
171 {
172 tcp_stream stream(ioc);
173
174 //[code_core_3_timeouts_7
175
176 std::string s1;
177 std::string s2;
178
179 // Set the logical operation timer to 15 seconds.
180 stream.expires_after (std::chrono::seconds(15));
181
182 // Read another line from the stream into our dynamic buffer.
183 // The operation will time out after 15 seconds.
184
185 net::async_read_until(stream, net::dynamic_buffer(s1), '\n', handler);
186
187 // Set the logical operation timer to 30 seconds.
188 stream.expires_after (std::chrono::seconds(30));
189
190 // Write the contents of the other buffer.
191 // This operation will time out after 30 seconds.
192
193 net::async_write(stream, net::buffer(s2), handler);
194
195 //]
196 }
197
198 {
199 //[code_core_3_timeouts_8
200
201 // To declare a stream with a rate policy, it is necessary to
202 // write out all of the template parameter types.
203 //
204 // `simple_rate_policy` is default constructible, but
205 // if the choice of RatePolicy is not DefaultConstructible,
206 // an instance of the type may be passed to the constructor.
207
208 basic_stream<net::ip::tcp, net::any_io_executor, simple_rate_policy> stream(ioc);
209
210 // The policy object, which is default constructed, or
211 // decay-copied upon construction, is attached to the stream
212 // and may be accessed through the function `rate_policy`.
213 //
214 // Here we set individual rate limits for reading and writing
215
216 stream.rate_policy().read_limit(10000); // bytes per second
217
218 stream.rate_policy().write_limit(850000); // bytes per second
219 //]
220 }
221 }
222
223 //[code_core_3_timeouts_1f
224
225 /** This function echoes back received lines from a peer, with a timeout.
226
227 The algorithm terminates upon any error (including timeout).
228 */
229 template <class Protocol, class Executor>
do_async_echo(basic_stream<Protocol,Executor> & stream)230 void do_async_echo (basic_stream<Protocol, Executor>& stream)
231 {
232 // This object will hold our state when reading the line.
233
234 struct echo_line
235 {
236 basic_stream<Protocol, Executor>& stream;
237
238 // The shared pointer is used to extend the lifetime of the
239 // string until the last asynchronous operation completes.
240 std::shared_ptr<std::string> s;
241
242 // This starts a new operation to read and echo a line
243 void operator()()
244 {
245 // If a line is not sent and received within 30 seconds, then
246 // the connection will be closed and this algorithm will terminate.
247
248 stream.expires_after(std::chrono::seconds(30));
249
250 // Read a line from the stream into our dynamic buffer, with a timeout
251 net::async_read_until(stream, net::dynamic_buffer(*s), '\n', std::move(*this));
252 }
253
254 // This function is called when the read completes
255 void operator()(error_code ec, std::size_t bytes_transferred)
256 {
257 if(ec)
258 return;
259
260 net::async_write(stream, buffers_prefix(bytes_transferred, net::buffer(*s)),
261 [this](error_code ec, std::size_t bytes_transferred)
262 {
263 s->erase(s->begin(), s->begin() + bytes_transferred);
264
265 if(! ec)
266 {
267 // Run this algorithm again
268 echo_line{stream, std::move(s)}();
269 }
270 else
271 {
272 std::cerr << "Error: " << ec.message() << "\n";
273 }
274 });
275 }
276 };
277
278 // Create the operation and run it
279 echo_line{stream, std::make_shared<std::string>()}();
280 }
281
282 //]
283
284 //[code_core_3_timeouts_2f
285
286 /** Request an HTTP resource from a TLS host and return it as a string, with a timeout.
287
288 This example uses fibers (stackful coroutines) and its own I/O context.
289 */
290 std::string
https_get(std::string const & host,std::string const & target,error_code & ec)291 https_get (std::string const& host, std::string const& target, error_code& ec)
292 {
293 // It is the responsibility of the algorithm to clear the error first.
294 ec = {};
295
296 // We use our own I/O context, to make this function blocking.
297 net::io_context ioc;
298
299 // This context is used to hold client and server certificates.
300 // We do not perform certificate verification in this example.
301
302 net::ssl::context ctx(net::ssl::context::tlsv12);
303
304 // This string will hold the body of the HTTP response, if any.
305 std::string result;
306
307 // Note that Networking TS does not come with spawn. This function
308 // launches a "fiber" which is a coroutine that has its own separately
309 // allocated stack.
310
311 boost::asio::spawn(ioc,
312 [&](boost::asio::yield_context yield)
313 {
314 // We use the Beast ssl_stream wrapped around a beast tcp_stream.
315 ssl_stream<tcp_stream> stream(ioc, ctx);
316
317 // The resolver will be used to look up the IP addresses for the host name
318 net::ip::tcp::resolver resolver(ioc);
319
320 // First, look up the name. Networking has its own timeout for this.
321 // The `yield` object is a CompletionToken which specializes the
322 // `net::async_result` customization point to make the fiber work.
323 //
324 // This call will appear to "block" until the operation completes.
325 // It isn't really blocking. Instead, the fiber implementation saves
326 // the call stack and suspends the function until the asynchronous
327 // operation is complete. Then it restores the call stack, and resumes
328 // the function to the statement following the async_resolve. This
329 // allows an asynchronous algorithm to be expressed synchronously.
330
331 auto const endpoints = resolver.async_resolve(host, "https", {}, yield[ec]);
332 if(ec)
333 return;
334
335 // The function `get_lowest_layer` retrieves the "bottom most" object
336 // in the stack of stream layers. In this case it will be the tcp_stream.
337 // This timeout will apply to all subsequent operations collectively.
338 // That is to say, they must all complete within the same 30 second
339 // window.
340
341 get_lowest_layer(stream).expires_after(std::chrono::seconds(30));
342
343 // `tcp_stream` range connect algorithms are member functions, unlike net::
344 get_lowest_layer(stream).async_connect(endpoints, yield[ec]);
345 if(ec)
346 return;
347
348 // Perform the TLS handshake
349 stream.async_handshake(net::ssl::stream_base::client, yield[ec]);
350 if(ec)
351 return;
352
353 // Send an HTTP GET request for the target
354 {
355 http::request<http::empty_body> req;
356 req.method(http::verb::get);
357 req.target(target);
358 req.version(11);
359 req.set(http::field::host, host);
360 req.set(http::field::user_agent, "Beast");
361 http::async_write(stream, req, yield[ec]);
362 if(ec)
363 return;
364 }
365
366 // Now read the response
367 flat_buffer buffer;
368 http::response<http::string_body> res;
369 http::async_read(stream, buffer, res, yield[ec]);
370 if(ec)
371 return;
372
373 // Try to perform the TLS shutdown handshake
374 stream.async_shutdown(yield[ec]);
375
376 // `net::ssl::error::stream_truncated`, also known as an SSL "short read",
377 // indicates the peer closed the connection without performing the
378 // required closing handshake (for example, Google does this to
379 // improve performance). Generally this can be a security issue,
380 // but if your communication protocol is self-terminated (as
381 // it is with both HTTP and WebSocket) then you may simply
382 // ignore the lack of close_notify:
383 //
384 // https://github.com/boostorg/beast/issues/38
385 //
386 // https://security.stackexchange.com/questions/91435/how-to-handle-a-malicious-ssl-tls-shutdown
387 //
388 // When a short read would cut off the end of an HTTP message,
389 // Beast returns the error beast::http::error::partial_message.
390 // Therefore, if we see a short read here, it has occurred
391 // after the message has been completed, so it is safe to ignore it.
392
393 if(ec == net::ssl::error::stream_truncated)
394 ec = {};
395 else if(ec)
396 return;
397
398 // Set the string to return to the caller
399 result = std::move(res.body());
400 });
401
402 // `run` will dispatch completion handlers, and block until there is
403 // no more "work" remaining. When this call returns, the operations
404 // are complete and we can give the caller the result.
405 ioc.run();
406
407 return result;
408 }
409
410 //]
411
412 //[code_core_3_timeouts_3f
413
414 class window
415 {
416 std::size_t value_ = 0;
417
418 // The size of the exponential window, in seconds.
419 // This should be a power of two.
420
421 static std::size_t constexpr Window = 4;
422
423 public:
424 /** Returns the number of elapsed seconds since the given time, and adjusts the time.
425
426 This function returns the number of elapsed seconds since the
427 specified time point, rounding down. It also moves the specified
428 time point forward by the number of elapsed seconds.
429
430 @param since The time point from which to calculate elapsed time.
431 The function will modify the value, by adding the number of elapsed
432 seconds to it.
433
434 @return The number of elapsed seconds.
435 */
436 template<class Clock, class Duration>
437 static
438 std::chrono::seconds
get_elapsed(std::chrono::time_point<Clock,Duration> & since)439 get_elapsed(std::chrono::time_point<Clock, Duration>& since) noexcept
440 {
441 auto const elapsed = std::chrono::duration_cast<
442 std::chrono::seconds>(Clock::now() - since);
443 since += elapsed;
444 return elapsed;
445 }
446
447 /// Returns the current value, after adding the given sample.
448 std::size_t
update(std::size_t sample,std::chrono::seconds elapsed)449 update(std::size_t sample, std::chrono::seconds elapsed) noexcept
450 {
451 // Apply exponential decay.
452 //
453 // This formula is fast (no division or multiplication) but inaccurate.
454 // It overshoots by `n*(1-a)/(1-a^n), where a=(window-1)/window`.
455 // Could be good enough for a rough approximation, but if relying
456 // on this for production please perform tests!
457
458 auto count = elapsed.count();
459 while(count--)
460 value_ -= (value_ + Window - 1) / Window;
461 value_ += sample;
462 return value_ / Window;
463 }
464 /// Returns the current value
465 std::size_t
value() const466 value() const noexcept
467 {
468 return value_ / Window;
469 }
470 };
471
472 //]
473
474 //[code_core_3_timeouts_4f
475
476 /** A RatePolicy to measure instantaneous throughput.
477
478 This measures the rate of transfer for reading and writing
479 using a simple exponential decay function.
480 */
481 class rate_gauge
482 {
483 // The clock used to measure elapsed time
484 using clock_type = std::chrono::steady_clock;
485
486 // This implements an exponential smoothing window function.
487 // The value `Seconds` is the size of the window in seconds.
488
489 clock_type::time_point when_;
490 std::size_t read_bytes_ = 0;
491 std::size_t write_bytes_ = 0;
492 window read_window_;
493 window write_window_;
494
495 // Friending this type allows us to mark the
496 // member functions required by RatePolicy as private.
497 friend class rate_policy_access;
498
499 // Returns the number of bytes available to read currently
500 // Required by RatePolicy
501 std::size_t
available_read_bytes() const502 available_read_bytes() const noexcept
503 {
504 // no limit
505 return (std::numeric_limits<std::size_t>::max)();
506 }
507
508 // Returns the number of bytes available to write currently
509 // Required by RatePolicy
510 std::size_t
available_write_bytes() const511 available_write_bytes() const noexcept
512 {
513 // no limit
514 return (std::numeric_limits<std::size_t>::max)();
515 }
516
517 // Called every time bytes are read
518 // Required by RatePolicy
519 void
transfer_read_bytes(std::size_t n)520 transfer_read_bytes(std::size_t n) noexcept
521 {
522 // Add this to our running total of bytes read
523 read_bytes_ += n;
524 }
525
526 // Called every time bytes are written
527 // Required by RatePolicy
528 void
transfer_write_bytes(std::size_t n)529 transfer_write_bytes(std::size_t n) noexcept
530 {
531 // Add this to our running total of bytes written
532 write_bytes_ += n;
533 }
534
535 // Called approximately once per second
536 // Required by RatePolicy
537 void
on_timer()538 on_timer()
539 {
540 // Calculate elapsed time in seconds, and adjust our time point
541 auto const elapsed = window::get_elapsed(when_);
542
543 // Skip the update when elapsed==0,
544 // otherwise the measurement will have jitter
545 if(elapsed.count() == 0)
546 return;
547
548 // Add our samples and apply exponential decay
549 read_window_.update(read_bytes_, elapsed);
550 write_window_.update(write_bytes_, elapsed);
551
552 // Reset our counts of bytes transferred
553 read_bytes_ = 0;
554 write_bytes_ = 0;
555 }
556
557 public:
rate_gauge()558 rate_gauge()
559 : when_(clock_type::now())
560 {
561 }
562
563 /// Returns the current rate of reading in bytes per second
564 std::size_t
read_bytes_per_second() const565 read_bytes_per_second() const noexcept
566 {
567 return read_window_.value();
568 }
569
570 /// Returns the current rate of writing in bytes per second
571 std::size_t
write_bytes_per_second() const572 write_bytes_per_second() const noexcept
573 {
574 return write_window_.value();
575 }
576 };
577
578 //]
579
580 void
core_3_timeouts_snippets2()581 core_3_timeouts_snippets2()
582 {
583 #include "snippets.ipp"
584
585 {
586 //[code_core_3_timeouts_9
587
588 // This stream will use our new rate_gauge policy
589 basic_stream<net::ip::tcp, net::any_io_executor, rate_gauge> stream(ioc);
590
591 //...
592
593 // Print the current rates
594 std::cout <<
595 stream.rate_policy().read_bytes_per_second() << " bytes/second read\n" <<
596 stream.rate_policy().write_bytes_per_second() << " bytes/second written\n";
597 //]
598 }
599 }
600
601 } // (anon)
602
603 template class basic_stream<net::ip::tcp, net::any_io_executor, rate_gauge>;
604
605 struct core_3_timeouts_test
606 : public beast::unit_test::suite
607 {
608 void
testWindowboost::beast::core_3_timeouts_test609 testWindow()
610 {
611 window w;
612 std::size_t v0 = w.value();
613 std::size_t const N = 100000;
614 for(std::size_t n = 1; n <= 2; ++n)
615 {
616 for(std::size_t i = 0;;++i)
617 {
618 auto const v = w.update(n * N, std::chrono::seconds(n));
619 if(v == v0)
620 {
621 BEAST_PASS();
622 #if 0
623 log <<
624 "update(" << n*N << ", " << n <<
625 ") converged to " << w.value() <<
626 " in " << i << std::endl;
627 #endif
628 break;
629 }
630 if(i > 1000)
631 {
632 BEAST_FAIL();
633 break;
634 }
635 v0 = v;
636 }
637 }
638 }
639
640 void
runboost::beast::core_3_timeouts_test641 run() override
642 {
643 testWindow();
644
645 BEAST_EXPECT(&core_3_timeouts_snippets);
646 BEAST_EXPECT(&core_3_timeouts_snippets2);
647 BEAST_EXPECT((&do_async_echo<net::ip::tcp, net::io_context::executor_type>));
648 BEAST_EXPECT(&https_get);
649 }
650 };
651
652 BEAST_DEFINE_TESTSUITE(beast,doc,core_3_timeouts);
653
654 } // beast
655 } // boost
656