1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
#include "net/proxy/multi_threaded_proxy_resolver.h"

#include <algorithm>

#include "base/bind.h"
#include "base/bind_helpers.h"
#include "base/message_loop/message_loop_proxy.h"
#include "base/metrics/histogram.h"
#include "base/strings/string_util.h"
#include "base/strings/stringprintf.h"
#include "base/threading/thread.h"
#include "base/threading/thread_restrictions.h"
#include "net/base/net_errors.h"
#include "net/base/net_log.h"
#include "net/proxy/proxy_info.h"
18
19 // TODO(eroman): Have the MultiThreadedProxyResolver clear its PAC script
20 // data when SetPacScript fails. That will reclaim memory when
21 // testing bogus scripts.
22
23 namespace net {
24
25 // An "executor" is a job-runner for PAC requests. It encapsulates a worker
26 // thread and a synchronous ProxyResolver (which will be operated on said
27 // thread.)
28 class MultiThreadedProxyResolver::Executor
29 : public base::RefCountedThreadSafe<MultiThreadedProxyResolver::Executor > {
30 public:
31 // |coordinator| must remain valid throughout our lifetime. It is used to
32 // signal when the executor is ready to receive work by calling
33 // |coordinator->OnExecutorReady()|.
34 // The constructor takes ownership of |resolver|.
35 // |thread_number| is an identifier used when naming the worker thread.
36 Executor(MultiThreadedProxyResolver* coordinator,
37 ProxyResolver* resolver,
38 int thread_number);
39
40 // Submit a job to this executor.
41 void StartJob(Job* job);
42
43 // Callback for when a job has completed running on the executor's thread.
44 void OnJobCompleted(Job* job);
45
46 // Cleanup the executor. Cancels all outstanding work, and frees the thread
47 // and resolver.
48 void Destroy();
49
50 // Returns the outstanding job, or NULL.
outstanding_job() const51 Job* outstanding_job() const { return outstanding_job_.get(); }
52
resolver()53 ProxyResolver* resolver() { return resolver_.get(); }
54
thread_number() const55 int thread_number() const { return thread_number_; }
56
57 private:
58 friend class base::RefCountedThreadSafe<Executor>;
59 ~Executor();
60
61 MultiThreadedProxyResolver* coordinator_;
62 const int thread_number_;
63
64 // The currently active job for this executor (either a SetPacScript or
65 // GetProxyForURL task).
66 scoped_refptr<Job> outstanding_job_;
67
68 // The synchronous resolver implementation.
69 scoped_ptr<ProxyResolver> resolver_;
70
71 // The thread where |resolver_| is run on.
72 // Note that declaration ordering is important here. |thread_| needs to be
73 // destroyed *before* |resolver_|, in case |resolver_| is currently
74 // executing on |thread_|.
75 scoped_ptr<base::Thread> thread_;
76 };
77
78 // MultiThreadedProxyResolver::Job ---------------------------------------------
79
80 class MultiThreadedProxyResolver::Job
81 : public base::RefCountedThreadSafe<MultiThreadedProxyResolver::Job> {
82 public:
83 // Identifies the subclass of Job (only being used for debugging purposes).
84 enum Type {
85 TYPE_GET_PROXY_FOR_URL,
86 TYPE_SET_PAC_SCRIPT,
87 TYPE_SET_PAC_SCRIPT_INTERNAL,
88 };
89
Job(Type type,const CompletionCallback & callback)90 Job(Type type, const CompletionCallback& callback)
91 : type_(type),
92 callback_(callback),
93 executor_(NULL),
94 was_cancelled_(false) {
95 }
96
set_executor(Executor * executor)97 void set_executor(Executor* executor) {
98 executor_ = executor;
99 }
100
101 // The "executor" is the job runner that is scheduling this job. If
102 // this job has not been submitted to an executor yet, this will be
103 // NULL (and we know it hasn't started yet).
executor()104 Executor* executor() {
105 return executor_;
106 }
107
108 // Mark the job as having been cancelled.
Cancel()109 void Cancel() {
110 was_cancelled_ = true;
111 }
112
113 // Returns true if Cancel() has been called.
was_cancelled() const114 bool was_cancelled() const { return was_cancelled_; }
115
type() const116 Type type() const { return type_; }
117
118 // Returns true if this job still has a user callback. Some jobs
119 // do not have a user callback, because they were helper jobs
120 // scheduled internally (for example TYPE_SET_PAC_SCRIPT_INTERNAL).
121 //
122 // Otherwise jobs that correspond with user-initiated work will
123 // have a non-null callback up until the callback is run.
has_user_callback() const124 bool has_user_callback() const { return !callback_.is_null(); }
125
126 // This method is called when the job is inserted into a wait queue
127 // because no executors were ready to accept it.
WaitingForThread()128 virtual void WaitingForThread() {}
129
130 // This method is called just before the job is posted to the work thread.
FinishedWaitingForThread()131 virtual void FinishedWaitingForThread() {}
132
133 // This method is called on the worker thread to do the job's work. On
134 // completion, implementors are expected to call OnJobCompleted() on
135 // |origin_loop|.
136 virtual void Run(scoped_refptr<base::MessageLoopProxy> origin_loop) = 0;
137
138 protected:
OnJobCompleted()139 void OnJobCompleted() {
140 // |executor_| will be NULL if the executor has already been deleted.
141 if (executor_)
142 executor_->OnJobCompleted(this);
143 }
144
RunUserCallback(int result)145 void RunUserCallback(int result) {
146 DCHECK(has_user_callback());
147 CompletionCallback callback = callback_;
148 // Reset the callback so has_user_callback() will now return false.
149 callback_.Reset();
150 callback.Run(result);
151 }
152
153 friend class base::RefCountedThreadSafe<MultiThreadedProxyResolver::Job>;
154
~Job()155 virtual ~Job() {}
156
157 private:
158 const Type type_;
159 CompletionCallback callback_;
160 Executor* executor_;
161 bool was_cancelled_;
162 };
163
164 // MultiThreadedProxyResolver::SetPacScriptJob ---------------------------------
165
166 // Runs on the worker thread to call ProxyResolver::SetPacScript.
167 class MultiThreadedProxyResolver::SetPacScriptJob
168 : public MultiThreadedProxyResolver::Job {
169 public:
SetPacScriptJob(const scoped_refptr<ProxyResolverScriptData> & script_data,const CompletionCallback & callback)170 SetPacScriptJob(const scoped_refptr<ProxyResolverScriptData>& script_data,
171 const CompletionCallback& callback)
172 : Job(!callback.is_null() ? TYPE_SET_PAC_SCRIPT :
173 TYPE_SET_PAC_SCRIPT_INTERNAL,
174 callback),
175 script_data_(script_data) {
176 }
177
178 // Runs on the worker thread.
Run(scoped_refptr<base::MessageLoopProxy> origin_loop)179 virtual void Run(scoped_refptr<base::MessageLoopProxy> origin_loop) OVERRIDE {
180 ProxyResolver* resolver = executor()->resolver();
181 int rv = resolver->SetPacScript(script_data_, CompletionCallback());
182
183 DCHECK_NE(rv, ERR_IO_PENDING);
184 origin_loop->PostTask(
185 FROM_HERE,
186 base::Bind(&SetPacScriptJob::RequestComplete, this, rv));
187 }
188
189 protected:
~SetPacScriptJob()190 virtual ~SetPacScriptJob() {}
191
192 private:
193 // Runs the completion callback on the origin thread.
RequestComplete(int result_code)194 void RequestComplete(int result_code) {
195 // The task may have been cancelled after it was started.
196 if (!was_cancelled() && has_user_callback()) {
197 RunUserCallback(result_code);
198 }
199 OnJobCompleted();
200 }
201
202 const scoped_refptr<ProxyResolverScriptData> script_data_;
203 };
204
205 // MultiThreadedProxyResolver::GetProxyForURLJob ------------------------------
206
207 class MultiThreadedProxyResolver::GetProxyForURLJob
208 : public MultiThreadedProxyResolver::Job {
209 public:
210 // |url| -- the URL of the query.
211 // |results| -- the structure to fill with proxy resolve results.
GetProxyForURLJob(const GURL & url,ProxyInfo * results,const CompletionCallback & callback,const BoundNetLog & net_log)212 GetProxyForURLJob(const GURL& url,
213 ProxyInfo* results,
214 const CompletionCallback& callback,
215 const BoundNetLog& net_log)
216 : Job(TYPE_GET_PROXY_FOR_URL, callback),
217 results_(results),
218 net_log_(net_log),
219 url_(url),
220 was_waiting_for_thread_(false) {
221 DCHECK(!callback.is_null());
222 start_time_ = base::TimeTicks::Now();
223 }
224
net_log()225 BoundNetLog* net_log() { return &net_log_; }
226
WaitingForThread()227 virtual void WaitingForThread() OVERRIDE {
228 was_waiting_for_thread_ = true;
229 net_log_.BeginEvent(NetLog::TYPE_WAITING_FOR_PROXY_RESOLVER_THREAD);
230 }
231
FinishedWaitingForThread()232 virtual void FinishedWaitingForThread() OVERRIDE {
233 DCHECK(executor());
234
235 submitted_to_thread_time_ = base::TimeTicks::Now();
236
237 if (was_waiting_for_thread_) {
238 net_log_.EndEvent(NetLog::TYPE_WAITING_FOR_PROXY_RESOLVER_THREAD);
239 }
240
241 net_log_.AddEvent(
242 NetLog::TYPE_SUBMITTED_TO_RESOLVER_THREAD,
243 NetLog::IntegerCallback("thread_number", executor()->thread_number()));
244 }
245
246 // Runs on the worker thread.
Run(scoped_refptr<base::MessageLoopProxy> origin_loop)247 virtual void Run(scoped_refptr<base::MessageLoopProxy> origin_loop) OVERRIDE {
248 ProxyResolver* resolver = executor()->resolver();
249 int rv = resolver->GetProxyForURL(
250 url_, &results_buf_, CompletionCallback(), NULL, net_log_);
251 DCHECK_NE(rv, ERR_IO_PENDING);
252
253 origin_loop->PostTask(
254 FROM_HERE,
255 base::Bind(&GetProxyForURLJob::QueryComplete, this, rv));
256 }
257
258 protected:
~GetProxyForURLJob()259 virtual ~GetProxyForURLJob() {}
260
261 private:
262 // Runs the completion callback on the origin thread.
QueryComplete(int result_code)263 void QueryComplete(int result_code) {
264 // The Job may have been cancelled after it was started.
265 if (!was_cancelled()) {
266 RecordPerformanceMetrics();
267 if (result_code >= OK) { // Note: unit-tests use values > 0.
268 results_->Use(results_buf_);
269 }
270 RunUserCallback(result_code);
271 }
272 OnJobCompleted();
273 }
274
RecordPerformanceMetrics()275 void RecordPerformanceMetrics() {
276 DCHECK(!was_cancelled());
277
278 base::TimeTicks now = base::TimeTicks::Now();
279
280 // Log the total time the request took to complete.
281 UMA_HISTOGRAM_MEDIUM_TIMES("Net.MTPR_GetProxyForUrl_Time",
282 now - start_time_);
283
284 // Log the time the request was stalled waiting for a thread to free up.
285 UMA_HISTOGRAM_MEDIUM_TIMES("Net.MTPR_GetProxyForUrl_Thread_Wait_Time",
286 submitted_to_thread_time_ - start_time_);
287 }
288
289 // Must only be used on the "origin" thread.
290 ProxyInfo* results_;
291
292 // Can be used on either "origin" or worker thread.
293 BoundNetLog net_log_;
294 const GURL url_;
295
296 // Usable from within DoQuery on the worker thread.
297 ProxyInfo results_buf_;
298
299 base::TimeTicks start_time_;
300 base::TimeTicks submitted_to_thread_time_;
301
302 bool was_waiting_for_thread_;
303 };
304
305 // MultiThreadedProxyResolver::Executor ----------------------------------------
306
Executor(MultiThreadedProxyResolver * coordinator,ProxyResolver * resolver,int thread_number)307 MultiThreadedProxyResolver::Executor::Executor(
308 MultiThreadedProxyResolver* coordinator,
309 ProxyResolver* resolver,
310 int thread_number)
311 : coordinator_(coordinator),
312 thread_number_(thread_number),
313 resolver_(resolver) {
314 DCHECK(coordinator);
315 DCHECK(resolver);
316 // Start up the thread.
317 thread_.reset(new base::Thread(base::StringPrintf("PAC thread #%d",
318 thread_number)));
319 CHECK(thread_->Start());
320 }
321
StartJob(Job * job)322 void MultiThreadedProxyResolver::Executor::StartJob(Job* job) {
323 DCHECK(!outstanding_job_.get());
324 outstanding_job_ = job;
325
326 // Run the job. Once it has completed (regardless of whether it was
327 // cancelled), it will invoke OnJobCompleted() on this thread.
328 job->set_executor(this);
329 job->FinishedWaitingForThread();
330 thread_->message_loop()->PostTask(
331 FROM_HERE,
332 base::Bind(&Job::Run, job, base::MessageLoopProxy::current()));
333 }
334
OnJobCompleted(Job * job)335 void MultiThreadedProxyResolver::Executor::OnJobCompleted(Job* job) {
336 DCHECK_EQ(job, outstanding_job_.get());
337 outstanding_job_ = NULL;
338 coordinator_->OnExecutorReady(this);
339 }
340
Destroy()341 void MultiThreadedProxyResolver::Executor::Destroy() {
342 DCHECK(coordinator_);
343
344 {
345 // See http://crbug.com/69710.
346 base::ThreadRestrictions::ScopedAllowIO allow_io;
347
348 // Join the worker thread.
349 thread_.reset();
350 }
351
352 // Cancel any outstanding job.
353 if (outstanding_job_.get()) {
354 outstanding_job_->Cancel();
355 // Orphan the job (since this executor may be deleted soon).
356 outstanding_job_->set_executor(NULL);
357 }
358
359 // It is now safe to free the ProxyResolver, since all the tasks that
360 // were using it on the resolver thread have completed.
361 resolver_.reset();
362
363 // Null some stuff as a precaution.
364 coordinator_ = NULL;
365 outstanding_job_ = NULL;
366 }
367
~Executor()368 MultiThreadedProxyResolver::Executor::~Executor() {
369 // The important cleanup happens as part of Destroy(), which should always be
370 // called first.
371 DCHECK(!coordinator_) << "Destroy() was not called";
372 DCHECK(!thread_.get());
373 DCHECK(!resolver_.get());
374 DCHECK(!outstanding_job_.get());
375 }
376
377 // MultiThreadedProxyResolver --------------------------------------------------
378
MultiThreadedProxyResolver(ProxyResolverFactory * resolver_factory,size_t max_num_threads)379 MultiThreadedProxyResolver::MultiThreadedProxyResolver(
380 ProxyResolverFactory* resolver_factory,
381 size_t max_num_threads)
382 : ProxyResolver(resolver_factory->resolvers_expect_pac_bytes()),
383 resolver_factory_(resolver_factory),
384 max_num_threads_(max_num_threads) {
385 DCHECK_GE(max_num_threads, 1u);
386 }
387
~MultiThreadedProxyResolver()388 MultiThreadedProxyResolver::~MultiThreadedProxyResolver() {
389 // We will cancel all outstanding requests.
390 pending_jobs_.clear();
391 ReleaseAllExecutors();
392 }
393
GetProxyForURL(const GURL & url,ProxyInfo * results,const CompletionCallback & callback,RequestHandle * request,const BoundNetLog & net_log)394 int MultiThreadedProxyResolver::GetProxyForURL(
395 const GURL& url, ProxyInfo* results, const CompletionCallback& callback,
396 RequestHandle* request, const BoundNetLog& net_log) {
397 DCHECK(CalledOnValidThread());
398 DCHECK(!callback.is_null());
399 DCHECK(current_script_data_.get())
400 << "Resolver is un-initialized. Must call SetPacScript() first!";
401
402 scoped_refptr<GetProxyForURLJob> job(
403 new GetProxyForURLJob(url, results, callback, net_log));
404
405 // Completion will be notified through |callback|, unless the caller cancels
406 // the request using |request|.
407 if (request)
408 *request = reinterpret_cast<RequestHandle>(job.get());
409
410 // If there is an executor that is ready to run this request, submit it!
411 Executor* executor = FindIdleExecutor();
412 if (executor) {
413 DCHECK_EQ(0u, pending_jobs_.size());
414 executor->StartJob(job.get());
415 return ERR_IO_PENDING;
416 }
417
418 // Otherwise queue this request. (We will schedule it to a thread once one
419 // becomes available).
420 job->WaitingForThread();
421 pending_jobs_.push_back(job);
422
423 // If we haven't already reached the thread limit, provision a new thread to
424 // drain the requests more quickly.
425 if (executors_.size() < max_num_threads_) {
426 executor = AddNewExecutor();
427 executor->StartJob(
428 new SetPacScriptJob(current_script_data_, CompletionCallback()));
429 }
430
431 return ERR_IO_PENDING;
432 }
433
CancelRequest(RequestHandle req)434 void MultiThreadedProxyResolver::CancelRequest(RequestHandle req) {
435 DCHECK(CalledOnValidThread());
436 DCHECK(req);
437
438 Job* job = reinterpret_cast<Job*>(req);
439 DCHECK_EQ(Job::TYPE_GET_PROXY_FOR_URL, job->type());
440
441 if (job->executor()) {
442 // If the job was already submitted to the executor, just mark it
443 // as cancelled so the user callback isn't run on completion.
444 job->Cancel();
445 } else {
446 // Otherwise the job is just sitting in a queue.
447 PendingJobsQueue::iterator it =
448 std::find(pending_jobs_.begin(), pending_jobs_.end(), job);
449 DCHECK(it != pending_jobs_.end());
450 pending_jobs_.erase(it);
451 }
452 }
453
GetLoadState(RequestHandle req) const454 LoadState MultiThreadedProxyResolver::GetLoadState(RequestHandle req) const {
455 DCHECK(CalledOnValidThread());
456 DCHECK(req);
457 return LOAD_STATE_RESOLVING_PROXY_FOR_URL;
458 }
459
CancelSetPacScript()460 void MultiThreadedProxyResolver::CancelSetPacScript() {
461 DCHECK(CalledOnValidThread());
462 DCHECK_EQ(0u, pending_jobs_.size());
463 DCHECK_EQ(1u, executors_.size());
464 DCHECK_EQ(Job::TYPE_SET_PAC_SCRIPT,
465 executors_[0]->outstanding_job()->type());
466
467 // Defensively clear some data which shouldn't be getting used
468 // anymore.
469 current_script_data_ = NULL;
470
471 ReleaseAllExecutors();
472 }
473
SetPacScript(const scoped_refptr<ProxyResolverScriptData> & script_data,const CompletionCallback & callback)474 int MultiThreadedProxyResolver::SetPacScript(
475 const scoped_refptr<ProxyResolverScriptData>& script_data,
476 const CompletionCallback&callback) {
477 DCHECK(CalledOnValidThread());
478 DCHECK(!callback.is_null());
479
480 // Save the script details, so we can provision new executors later.
481 current_script_data_ = script_data;
482
483 // The user should not have any outstanding requests when they call
484 // SetPacScript().
485 CheckNoOutstandingUserRequests();
486
487 // Destroy all of the current threads and their proxy resolvers.
488 ReleaseAllExecutors();
489
490 // Provision a new executor, and run the SetPacScript request. On completion
491 // notification will be sent through |callback|.
492 Executor* executor = AddNewExecutor();
493 executor->StartJob(new SetPacScriptJob(script_data, callback));
494 return ERR_IO_PENDING;
495 }
496
CheckNoOutstandingUserRequests() const497 void MultiThreadedProxyResolver::CheckNoOutstandingUserRequests() const {
498 DCHECK(CalledOnValidThread());
499 CHECK_EQ(0u, pending_jobs_.size());
500
501 for (ExecutorList::const_iterator it = executors_.begin();
502 it != executors_.end(); ++it) {
503 const Executor* executor = it->get();
504 Job* job = executor->outstanding_job();
505 // The "has_user_callback()" is to exclude jobs for which the callback
506 // has already been invoked, or was not user-initiated (as in the case of
507 // lazy thread provisions). User-initiated jobs may !has_user_callback()
508 // when the callback has already been run. (Since we only clear the
509 // outstanding job AFTER the callback has been invoked, it is possible
510 // for a new request to be started from within the callback).
511 CHECK(!job || job->was_cancelled() || !job->has_user_callback());
512 }
513 }
514
ReleaseAllExecutors()515 void MultiThreadedProxyResolver::ReleaseAllExecutors() {
516 DCHECK(CalledOnValidThread());
517 for (ExecutorList::iterator it = executors_.begin();
518 it != executors_.end(); ++it) {
519 Executor* executor = it->get();
520 executor->Destroy();
521 }
522 executors_.clear();
523 }
524
525 MultiThreadedProxyResolver::Executor*
FindIdleExecutor()526 MultiThreadedProxyResolver::FindIdleExecutor() {
527 DCHECK(CalledOnValidThread());
528 for (ExecutorList::iterator it = executors_.begin();
529 it != executors_.end(); ++it) {
530 Executor* executor = it->get();
531 if (!executor->outstanding_job())
532 return executor;
533 }
534 return NULL;
535 }
536
537 MultiThreadedProxyResolver::Executor*
AddNewExecutor()538 MultiThreadedProxyResolver::AddNewExecutor() {
539 DCHECK(CalledOnValidThread());
540 DCHECK_LT(executors_.size(), max_num_threads_);
541 // The "thread number" is used to give the thread a unique name.
542 int thread_number = executors_.size();
543 ProxyResolver* resolver = resolver_factory_->CreateProxyResolver();
544 Executor* executor = new Executor(
545 this, resolver, thread_number);
546 executors_.push_back(make_scoped_refptr(executor));
547 return executor;
548 }
549
OnExecutorReady(Executor * executor)550 void MultiThreadedProxyResolver::OnExecutorReady(Executor* executor) {
551 DCHECK(CalledOnValidThread());
552 if (pending_jobs_.empty())
553 return;
554
555 // Get the next job to process (FIFO). Transfer it from the pending queue
556 // to the executor.
557 scoped_refptr<Job> job = pending_jobs_.front();
558 pending_jobs_.pop_front();
559 executor->StartJob(job.get());
560 }
561
562 } // namespace net
563