1 //
2 // Copyright (c) 2016-2019 Vinnie Falco (vinnie dot falco at gmail dot com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/boostorg/beast
8 //
9
10 //------------------------------------------------------------------------------
11 //
12 // Example: HTTP crawl (asynchronous)
13 //
14 //------------------------------------------------------------------------------
15
16 #include "urls_large_data.hpp"
17
18 #include <boost/beast/core.hpp>
19 #include <boost/beast/http.hpp>
20 #include <boost/beast/version.hpp>
21 #include <boost/asio/bind_executor.hpp>
22 #include <boost/asio/connect.hpp>
23 #include <boost/asio/ip/tcp.hpp>
24 #include <boost/asio/post.hpp>
25 #include <boost/asio/strand.hpp>
26 #include <atomic>
27 #include <chrono>
28 #include <cstdlib>
29 #include <functional>
30 #include <iomanip>
31 #include <iostream>
32 #include <memory>
33 #include <string>
34 #include <thread>
35 #include <vector>
36 #include <map>
37
38 namespace chrono = std::chrono; // from <chrono>
39 namespace beast = boost::beast; // from <boost/beast.hpp>
40 namespace http = beast::http; // from <boost/beast/http.hpp>
41 namespace net = boost::asio; // from <boost/asio.hpp>
42 using tcp = net::ip::tcp; // from <boost/asio/ip/tcp.hpp>
43
44 //------------------------------------------------------------------------------
45
46 // This structure aggregates statistics on all the sites
47 class crawl_report
48 {
49 net::strand<
50 net::io_context::executor_type> strand_;
51 std::atomic<std::size_t> index_;
52 std::vector<char const*> const& hosts_;
53 std::size_t count_ = 0;
54
55 public:
crawl_report(net::io_context & ioc)56 crawl_report(net::io_context& ioc)
57 : strand_(ioc.get_executor())
58 , index_(0)
59 , hosts_(urls_large_data())
60 {
61 }
62
63 // Run an aggregation function on the strand.
64 // This allows synchronization without a mutex.
65 template<class F>
66 void
aggregate(F const & f)67 aggregate(F const& f)
68 {
69 net::post(
70 strand_,
71 [&, f]
72 {
73 f(*this);
74 if(count_ % 100 == 0)
75 {
76 std::cerr <<
77 "Progress: " << count_ << " of " << hosts_.size() << "\n";
78 //std::cerr << *this;
79 }
80 ++count_;
81 });
82 }
83
84 // Returns the next host to check
85 char const*
get_host()86 get_host()
87 {
88 auto const n = index_++;
89 if(n >= hosts_.size())
90 return nullptr;
91 return hosts_[n];
92 }
93
94 // Counts the number of timer failures
95 std::size_t timer_failures = 0;
96
97 // Counts the number of name resolution failures
98 std::size_t resolve_failures = 0;
99
100 // Counts the number of connection failures
101 std::size_t connect_failures = 0;
102
103 // Counts the number of write failures
104 std::size_t write_failures = 0;
105
106 // Counts the number of read failures
107 std::size_t read_failures = 0;
108
109 // Counts the number of success reads
110 std::size_t success = 0;
111
112 // Counts the number received of each status code
113 std::map<unsigned, std::size_t> status_codes;
114 };
115
116 std::ostream&
operator <<(std::ostream & os,crawl_report const & report)117 operator<<(std::ostream& os, crawl_report const& report)
118 {
119 // Print the report
120 os <<
121 "Crawl report\n" <<
122 " Failure counts\n" <<
123 " Timer : " << report.timer_failures << "\n" <<
124 " Resolve : " << report.resolve_failures << "\n" <<
125 " Connect : " << report.connect_failures << "\n" <<
126 " Write : " << report.write_failures << "\n" <<
127 " Read : " << report.read_failures << "\n" <<
128 " Success : " << report.success << "\n" <<
129 " Status codes\n"
130 ;
131 for(auto const& result : report.status_codes)
132 os <<
133 " " << std::setw(3) << result.first << ": " << result.second <<
134 " (" << http::obsolete_reason(static_cast<http::status>(result.first)) << ")\n";
135 os.flush();
136 return os;
137 }
138
139 //------------------------------------------------------------------------------
140
141 // Performs HTTP GET requests and aggregates the results into a report
142 class worker : public std::enable_shared_from_this<worker>
143 {
144 enum
145 {
146 // Use a small timeout to keep things lively
147 timeout = 5
148 };
149
150 crawl_report& report_;
151 net::strand<net::io_context::executor_type> ex_;
152 tcp::resolver resolver_;
153 beast::tcp_stream stream_;
154 beast::flat_buffer buffer_; // (Must persist between reads)
155 http::request<http::empty_body> req_;
156 http::response<http::string_body> res_;
157
158 public:
159 worker(worker&&) = default;
160
161 // Resolver and socket require an io_context
worker(crawl_report & report,net::io_context & ioc)162 worker(
163 crawl_report& report,
164 net::io_context& ioc)
165 : report_(report)
166 , ex_(net::make_strand(ioc.get_executor()))
167 , resolver_(ex_)
168 , stream_(ex_)
169 {
170 // Set up the common fields of the request
171 req_.version(11);
172 req_.method(http::verb::get);
173 req_.target("/");
174 req_.set(http::field::user_agent, BOOST_BEAST_VERSION_STRING);
175 }
176
177 // Start the asynchronous operation
178 void
run()179 run()
180 {
181 do_get_host();
182 }
183
184 void
do_get_host()185 do_get_host()
186 {
187 // Grab another host
188 auto const host = report_.get_host();
189
190 // nullptr means no more work
191 if(! host)
192 return;
193
194 // The Host HTTP field is required
195 req_.set(http::field::host, host);
196
197 // Set up an HTTP GET request message
198 // Look up the domain name
199 resolver_.async_resolve(
200 host,
201 "http",
202 beast::bind_front_handler(
203 &worker::on_resolve,
204 shared_from_this()));
205 }
206
207 void
on_resolve(beast::error_code ec,tcp::resolver::results_type results)208 on_resolve(
209 beast::error_code ec,
210 tcp::resolver::results_type results)
211 {
212 if(ec)
213 {
214 report_.aggregate(
215 [](crawl_report& rep)
216 {
217 ++rep.resolve_failures;
218 });
219 return do_get_host();
220 }
221
222 // Set a timeout on the operation
223 stream_.expires_after(std::chrono::seconds(10));
224
225 // Make the connection on the IP address we get from a lookup
226 stream_.async_connect(
227 results,
228 beast::bind_front_handler(
229 &worker::on_connect,
230 shared_from_this()));
231 }
232
233 void
on_connect(beast::error_code ec,tcp::resolver::results_type::endpoint_type)234 on_connect(beast::error_code ec, tcp::resolver::results_type::endpoint_type)
235 {
236 if(ec)
237 {
238 report_.aggregate(
239 [](crawl_report& rep)
240 {
241 ++rep.connect_failures;
242 });
243 return do_get_host();
244 }
245
246 // Set a timeout on the operation
247 stream_.expires_after(std::chrono::seconds(10));
248
249 // Send the HTTP request to the remote host
250 http::async_write(
251 stream_,
252 req_,
253 beast::bind_front_handler(
254 &worker::on_write,
255 shared_from_this()));
256 }
257
258 void
on_write(beast::error_code ec,std::size_t bytes_transferred)259 on_write(
260 beast::error_code ec,
261 std::size_t bytes_transferred)
262 {
263 boost::ignore_unused(bytes_transferred);
264
265 if(ec)
266 {
267 report_.aggregate(
268 [](crawl_report& rep)
269 {
270 ++rep.write_failures;
271 });
272 return do_get_host();
273 }
274
275 // Receive the HTTP response
276 res_ = {};
277 http::async_read(
278 stream_,
279 buffer_,
280 res_,
281 beast::bind_front_handler(
282 &worker::on_read,
283 shared_from_this()));
284 }
285
286 void
on_read(beast::error_code ec,std::size_t bytes_transferred)287 on_read(
288 beast::error_code ec,
289 std::size_t bytes_transferred)
290 {
291 boost::ignore_unused(bytes_transferred);
292
293 if(ec)
294 {
295 report_.aggregate(
296 [](crawl_report& rep)
297 {
298 ++rep.read_failures;
299 });
300 return do_get_host();
301 }
302
303 auto const code = res_.result_int();
304 report_.aggregate(
305 [code](crawl_report& rep)
306 {
307 ++rep.success;
308 ++rep.status_codes[code];
309 });
310
311 // Gracefully close the socket
312 stream_.socket().shutdown(tcp::socket::shutdown_both, ec);
313 stream_.close();
314
315 // If we get here then the connection is closed gracefully
316
317 do_get_host();
318 }
319 };
320
// Stopwatch which measures the time elapsed since it was constructed.
class timer
{
    // A monotonic clock is used so that adjustments to the system's
    // wall-clock time cannot distort the measured interval or make it
    // negative.
    using clock_type = std::chrono::steady_clock;

    clock_type::time_point when_;

public:
    // Type returned by elapsed(); convert with std::chrono::duration_cast.
    using duration = clock_type::duration;

    // Starts timing at the moment of construction.
    timer()
        : when_(clock_type::now())
    {
    }

    // Returns the time elapsed since this timer was constructed.
    duration
    elapsed() const
    {
        return clock_type::now() - when_;
    }
};
341
main(int argc,char * argv[])342 int main(int argc, char* argv[])
343 {
344 // Check command line arguments.
345 if (argc != 2)
346 {
347 std::cerr <<
348 "Usage: http-crawl <threads>\n" <<
349 "Example:\n" <<
350 " http-crawl 100\n";
351 return EXIT_FAILURE;
352 }
353 auto const threads = std::max<int>(1, std::atoi(argv[1]));
354
355 // The io_context is used to aggregate the statistics
356 net::io_context ioc;
357
358 // The report holds the aggregated statistics
359 crawl_report report{ioc};
360
361 timer t;
362
363 // Create and launch the worker threads.
364 std::vector<std::thread> workers;
365 workers.reserve(threads + 1);
366 for(int i = 0; i < threads; ++i)
367 {
368 // Each worker will eventually add some data to the aggregated
369 // report. Outstanding work is tracked in each worker to
370 // represent the forthcoming delivery of this data by that
371 // worker.
372 auto reporting_work = net::require(
373 ioc.get_executor(),
374 net::execution::outstanding_work.tracked);
375
376 workers.emplace_back(
377 [&report, reporting_work] {
378 // We use a separate io_context for each worker because
379 // the asio resolver simulates asynchronous operation using
380 // a dedicated worker thread per io_context, and we want to
381 // do a lot of name resolutions in parallel.
382 net::io_context ioc;
383 std::make_shared<worker>(report, ioc)->run();
384 ioc.run();
385 });
386 }
387
388 // Add another thread to run the main io_context which
389 // is used to aggregate the statistics
390 workers.emplace_back(
391 [&ioc]
392 {
393 ioc.run();
394 });
395
396 // Now block until all threads exit
397 for(std::size_t i = 0; i < workers.size(); ++i)
398 workers[i].join();
399
400 std::cout <<
401 "Elapsed time: " << chrono::duration_cast<chrono::seconds>(t.elapsed()).count() << " seconds\n";
402 std::cout << report;
403
404 return EXIT_SUCCESS;
405 }
406