//
// Copyright (c) 2016-2019 Vinnie Falco (vinnie dot falco at gmail dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/boostorg/beast
//

//------------------------------------------------------------------------------
//
// Example: HTTP crawl (asynchronous)
//
//------------------------------------------------------------------------------

#include "urls_large_data.hpp"

#include <boost/beast/core.hpp>
#include <boost/beast/http.hpp>
#include <boost/beast/version.hpp>
#include <boost/asio/bind_executor.hpp>
#include <boost/asio/connect.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/post.hpp>
#include <boost/asio/strand.hpp>
#include <algorithm>
#include <atomic>
#include <chrono>
#include <cstdlib>
#include <functional>
#include <iomanip>
#include <iostream>
#include <map>
#include <memory>
#include <string>
#include <thread>
#include <vector>

namespace chrono = std::chrono;         // from <chrono>
namespace beast = boost::beast;         // from <boost/beast.hpp>
namespace http = beast::http;           // from <boost/beast/http.hpp>
namespace net = boost::asio;            // from <boost/asio.hpp>
using tcp = net::ip::tcp;               // from <boost/asio/ip/tcp.hpp>

//------------------------------------------------------------------------------

// This structure aggregates statistics on all the sites
class crawl_report
{
    net::strand<
        net::io_context::executor_type> strand_;
    std::atomic<std::size_t> index_;
    std::vector<char const*> const& hosts_;
    std::size_t count_ = 0;

public:
    crawl_report(net::io_context& ioc)
        : strand_(ioc.get_executor())
        , index_(0)
        , hosts_(urls_large_data())
    {
    }

    // Run an aggregation function on the strand.
    // This allows synchronization without a mutex.
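    // Because every aggregation lambda is posted onto this one strand,
    // updates to count_ and the statistics members are serialized, even
    // though workers call aggregate() from many different threads.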
    template<class F>
    void
    aggregate(F const& f)
    {
        net::post(
            strand_,
            [&, f]
            {
                f(*this);
                if(count_ % 100 == 0)
                {
                    std::cerr <<
                        "Progress: " << count_ << " of " << hosts_.size() << "\n";
                    //std::cerr << *this;
                }
                ++count_;
            });
    }

    // Returns the next host to check
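    // index_ is atomic, so workers running on different threads can call
    // this concurrently without locking. Once the list is exhausted,
    // nullptr is returned and the caller stops.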
    char const*
    get_host()
    {
        auto const n = index_++;
        if(n >= hosts_.size())
            return nullptr;
        return hosts_[n];
    }

    // Counts the number of timer failures
    std::size_t timer_failures = 0;

    // Counts the number of name resolution failures
    std::size_t resolve_failures = 0;

    // Counts the number of connection failures
    std::size_t connect_failures = 0;

    // Counts the number of write failures
    std::size_t write_failures = 0;

    // Counts the number of read failures
    std::size_t read_failures = 0;

    // Counts the number of successful reads
    std::size_t success = 0;

    // Counts how many times each status code was received
    std::map<unsigned, std::size_t> status_codes;
};

std::ostream&
operator<<(std::ostream& os, crawl_report const& report)
{
    // Print the report
    os <<
        "Crawl report\n" <<
        "   Failure counts\n" <<
        "       Timer   : " << report.timer_failures << "\n" <<
        "       Resolve : " << report.resolve_failures << "\n" <<
        "       Connect : " << report.connect_failures << "\n" <<
        "       Write   : " << report.write_failures << "\n" <<
        "       Read    : " << report.read_failures << "\n" <<
        "       Success : " << report.success << "\n" <<
        "   Status codes\n"
        ;
    for(auto const& result : report.status_codes)
        os <<
        "       " << std::setw(3) << result.first << ": " << result.second <<
                    " (" << http::obsolete_reason(static_cast<http::status>(result.first)) << ")\n";
    os.flush();
    return os;
}

//------------------------------------------------------------------------------

// Performs HTTP GET requests and aggregates the results into a report
class worker : public std::enable_shared_from_this<worker>
{
    enum
    {
        // Use a small timeout to keep things lively
        timeout = 5
    };

    crawl_report& report_;
    net::strand<net::io_context::executor_type> ex_;
    tcp::resolver resolver_;
    beast::tcp_stream stream_;
    beast::flat_buffer buffer_; // (Must persist between reads)
    http::request<http::empty_body> req_;
    http::response<http::string_body> res_;

public:
    worker(worker&&) = default;

    // Resolver and socket require an io_context
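    // Both are constructed on this worker's own strand (ex_), so the
    // worker's completion handlers never run concurrently with one another.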
    worker(
        crawl_report& report,
        net::io_context& ioc)
        : report_(report)
        , ex_(net::make_strand(ioc.get_executor()))
        , resolver_(ex_)
        , stream_(ex_)
    {
        // Set up the common fields of the request
        req_.version(11);
        req_.method(http::verb::get);
        req_.target("/");
        req_.set(http::field::user_agent, BOOST_BEAST_VERSION_STRING);
    }

    // Start the asynchronous operation
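    // Each worker runs a simple loop: fetch the next host, resolve it,
    // connect, send the GET request, read the response, record the result,
    // then start over, until get_host() returns nullptr.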
    void
    run()
    {
        do_get_host();
    }

    void
    do_get_host()
    {
        // Grab another host
        auto const host = report_.get_host();

        // nullptr means no more work
        if(! host)
            return;

        // The Host HTTP field is required
        req_.set(http::field::host, host);

        // Set up an HTTP GET request message
        // Look up the domain name
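        // bind_front_handler binds shared_from_this() as the object the
        // member function is called on; holding that shared_ptr keeps this
        // worker alive until the completion handler runs.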
        resolver_.async_resolve(
            host,
            "http",
            beast::bind_front_handler(
                &worker::on_resolve,
                shared_from_this()));
    }

    void
    on_resolve(
        beast::error_code ec,
        tcp::resolver::results_type results)
    {
        if(ec)
        {
            report_.aggregate(
            [](crawl_report& rep)
            {
                ++rep.resolve_failures;
            });
            return do_get_host();
        }

        // Set a timeout on the operation
        stream_.expires_after(std::chrono::seconds(timeout));

        // Make the connection on the IP address we get from a lookup
        stream_.async_connect(
            results,
            beast::bind_front_handler(
                &worker::on_connect,
                shared_from_this()));
    }

    void
    on_connect(beast::error_code ec, tcp::resolver::results_type::endpoint_type)
    {
        if(ec)
        {
            report_.aggregate(
            [](crawl_report& rep)
            {
                ++rep.connect_failures;
            });
            return do_get_host();
        }

        // Set a timeout on the operation
        stream_.expires_after(std::chrono::seconds(timeout));

        // Send the HTTP request to the remote host
        http::async_write(
            stream_,
            req_,
            beast::bind_front_handler(
                &worker::on_write,
                shared_from_this()));
    }

    void
    on_write(
        beast::error_code ec,
        std::size_t bytes_transferred)
    {
        boost::ignore_unused(bytes_transferred);

        if(ec)
        {
            report_.aggregate(
            [](crawl_report& rep)
            {
                ++rep.write_failures;
            });
            return do_get_host();
        }

        // Receive the HTTP response
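        // (The response object is reused for every host, so reset it before
        // starting a fresh read.)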
        res_ = {};
        http::async_read(
            stream_,
            buffer_,
            res_,
            beast::bind_front_handler(
                &worker::on_read,
                shared_from_this()));
    }

    void
    on_read(
        beast::error_code ec,
        std::size_t bytes_transferred)
    {
        boost::ignore_unused(bytes_transferred);

        if(ec)
        {
            report_.aggregate(
            [](crawl_report& rep)
            {
                ++rep.read_failures;
            });
            return do_get_host();
        }

        auto const code = res_.result_int();
        report_.aggregate(
        [code](crawl_report& rep)
        {
            ++rep.success;
            ++rep.status_codes[code];
        });

        // Gracefully close the socket
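        // Any error from shutdown is ignored here; some servers close their
        // end of the connection first, and that is not counted as a failure.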
        stream_.socket().shutdown(tcp::socket::shutdown_both, ec);
        stream_.close();

        // If we get here then the connection is closed gracefully

        do_get_host();
    }
};

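// Measures wall-clock time from construction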
class timer
{
    using clock_type = chrono::system_clock;

    clock_type::time_point when_;

public:
    using duration = clock_type::duration;

    timer()
        : when_(clock_type::now())
    {
    }

    duration
    elapsed() const
    {
        return clock_type::now() - when_;
    }
};

int main(int argc, char* argv[])
{
    // Check command line arguments.
    if (argc != 2)
    {
        std::cerr <<
            "Usage: http-crawl <threads>\n" <<
            "Example:\n" <<
            "    http-crawl 100\n";
        return EXIT_FAILURE;
    }
    auto const threads = std::max<int>(1, std::atoi(argv[1]));

    // The io_context is used to aggregate the statistics
    net::io_context ioc;

    // The report holds the aggregated statistics
    crawl_report report{ioc};

    timer t;

    // Create and launch the worker threads.
    std::vector<std::thread> workers;
    workers.reserve(threads + 1);
    for(int i = 0; i < threads; ++i)
    {
        // Each worker will eventually add some data to the aggregated
        // report. Outstanding work is tracked in each worker to
        // represent the forthcoming delivery of this data by that
        // worker.
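        // Each worker thread's lambda below holds a copy of this tracked-work
        // executor. When a worker thread finishes and its copy is destroyed,
        // that work is released; once all workers are done the main
        // io_context runs out of work and its run() returns.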
        auto reporting_work = net::require(
            ioc.get_executor(),
            net::execution::outstanding_work.tracked);

        workers.emplace_back(
            [&report, reporting_work] {
                // We use a separate io_context for each worker because
                // the asio resolver simulates asynchronous operation using
                // a dedicated worker thread per io_context, and we want to
                // do a lot of name resolutions in parallel.
                net::io_context ioc;
                std::make_shared<worker>(report, ioc)->run();
                ioc.run();
            });
    }

    // Add another thread to run the main io_context which
    // is used to aggregate the statistics
    workers.emplace_back(
    [&ioc]
    {
        ioc.run();
    });

    // Now block until all threads exit
    for(std::size_t i = 0; i < workers.size(); ++i)
        workers[i].join();

    std::cout <<
        "Elapsed time:    " << chrono::duration_cast<chrono::seconds>(t.elapsed()).count() << " seconds\n";
    std::cout << report;

    return EXIT_SUCCESS;
}