• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "net/proxy/multi_threaded_proxy_resolver.h"
6 
7 #include "base/bind.h"
8 #include "base/bind_helpers.h"
9 #include "base/message_loop/message_loop_proxy.h"
10 #include "base/strings/string_util.h"
11 #include "base/strings/stringprintf.h"
12 #include "base/threading/thread.h"
13 #include "base/threading/thread_restrictions.h"
14 #include "net/base/net_errors.h"
15 #include "net/base/net_log.h"
16 #include "net/proxy/proxy_info.h"
17 
18 // TODO(eroman): Have the MultiThreadedProxyResolver clear its PAC script
19 //               data when SetPacScript fails. That will reclaim memory when
20 //               testing bogus scripts.
21 
22 namespace net {
23 
24 // An "executor" is a job-runner for PAC requests. It encapsulates a worker
25 // thread and a synchronous ProxyResolver (which will be operated on said
26 // thread.)
27 class MultiThreadedProxyResolver::Executor
28     : public base::RefCountedThreadSafe<MultiThreadedProxyResolver::Executor > {
29  public:
30   // |coordinator| must remain valid throughout our lifetime. It is used to
31   // signal when the executor is ready to receive work by calling
32   // |coordinator->OnExecutorReady()|.
33   // The constructor takes ownership of |resolver|.
34   // |thread_number| is an identifier used when naming the worker thread.
35   Executor(MultiThreadedProxyResolver* coordinator,
36            ProxyResolver* resolver,
37            int thread_number);
38 
39   // Submit a job to this executor.
40   void StartJob(Job* job);
41 
42   // Callback for when a job has completed running on the executor's thread.
43   void OnJobCompleted(Job* job);
44 
45   // Cleanup the executor. Cancels all outstanding work, and frees the thread
46   // and resolver.
47   void Destroy();
48 
49   // Returns the outstanding job, or NULL.
outstanding_job() const50   Job* outstanding_job() const { return outstanding_job_.get(); }
51 
resolver()52   ProxyResolver* resolver() { return resolver_.get(); }
53 
thread_number() const54   int thread_number() const { return thread_number_; }
55 
56  private:
57   friend class base::RefCountedThreadSafe<Executor>;
58   ~Executor();
59 
60   MultiThreadedProxyResolver* coordinator_;
61   const int thread_number_;
62 
63   // The currently active job for this executor (either a SetPacScript or
64   // GetProxyForURL task).
65   scoped_refptr<Job> outstanding_job_;
66 
67   // The synchronous resolver implementation.
68   scoped_ptr<ProxyResolver> resolver_;
69 
70   // The thread where |resolver_| is run on.
71   // Note that declaration ordering is important here. |thread_| needs to be
72   // destroyed *before* |resolver_|, in case |resolver_| is currently
73   // executing on |thread_|.
74   scoped_ptr<base::Thread> thread_;
75 };
76 
77 // MultiThreadedProxyResolver::Job ---------------------------------------------
78 
79 class MultiThreadedProxyResolver::Job
80     : public base::RefCountedThreadSafe<MultiThreadedProxyResolver::Job> {
81  public:
82   // Identifies the subclass of Job (only being used for debugging purposes).
83   enum Type {
84     TYPE_GET_PROXY_FOR_URL,
85     TYPE_SET_PAC_SCRIPT,
86     TYPE_SET_PAC_SCRIPT_INTERNAL,
87   };
88 
Job(Type type,const CompletionCallback & callback)89   Job(Type type, const CompletionCallback& callback)
90       : type_(type),
91         callback_(callback),
92         executor_(NULL),
93         was_cancelled_(false) {
94   }
95 
set_executor(Executor * executor)96   void set_executor(Executor* executor) {
97     executor_ = executor;
98   }
99 
100   // The "executor" is the job runner that is scheduling this job. If
101   // this job has not been submitted to an executor yet, this will be
102   // NULL (and we know it hasn't started yet).
executor()103   Executor* executor() {
104     return executor_;
105   }
106 
107   // Mark the job as having been cancelled.
Cancel()108   void Cancel() {
109     was_cancelled_ = true;
110   }
111 
112   // Returns true if Cancel() has been called.
was_cancelled() const113   bool was_cancelled() const { return was_cancelled_; }
114 
type() const115   Type type() const { return type_; }
116 
117   // Returns true if this job still has a user callback. Some jobs
118   // do not have a user callback, because they were helper jobs
119   // scheduled internally (for example TYPE_SET_PAC_SCRIPT_INTERNAL).
120   //
121   // Otherwise jobs that correspond with user-initiated work will
122   // have a non-null callback up until the callback is run.
has_user_callback() const123   bool has_user_callback() const { return !callback_.is_null(); }
124 
125   // This method is called when the job is inserted into a wait queue
126   // because no executors were ready to accept it.
WaitingForThread()127   virtual void WaitingForThread() {}
128 
129   // This method is called just before the job is posted to the work thread.
FinishedWaitingForThread()130   virtual void FinishedWaitingForThread() {}
131 
132   // This method is called on the worker thread to do the job's work. On
133   // completion, implementors are expected to call OnJobCompleted() on
134   // |origin_loop|.
135   virtual void Run(scoped_refptr<base::MessageLoopProxy> origin_loop) = 0;
136 
137  protected:
OnJobCompleted()138   void OnJobCompleted() {
139     // |executor_| will be NULL if the executor has already been deleted.
140     if (executor_)
141       executor_->OnJobCompleted(this);
142   }
143 
RunUserCallback(int result)144   void RunUserCallback(int result) {
145     DCHECK(has_user_callback());
146     CompletionCallback callback = callback_;
147     // Reset the callback so has_user_callback() will now return false.
148     callback_.Reset();
149     callback.Run(result);
150   }
151 
152   friend class base::RefCountedThreadSafe<MultiThreadedProxyResolver::Job>;
153 
~Job()154   virtual ~Job() {}
155 
156  private:
157   const Type type_;
158   CompletionCallback callback_;
159   Executor* executor_;
160   bool was_cancelled_;
161 };
162 
163 // MultiThreadedProxyResolver::SetPacScriptJob ---------------------------------
164 
165 // Runs on the worker thread to call ProxyResolver::SetPacScript.
166 class MultiThreadedProxyResolver::SetPacScriptJob
167     : public MultiThreadedProxyResolver::Job {
168  public:
SetPacScriptJob(const scoped_refptr<ProxyResolverScriptData> & script_data,const CompletionCallback & callback)169   SetPacScriptJob(const scoped_refptr<ProxyResolverScriptData>& script_data,
170                   const CompletionCallback& callback)
171     : Job(!callback.is_null() ? TYPE_SET_PAC_SCRIPT :
172                                 TYPE_SET_PAC_SCRIPT_INTERNAL,
173           callback),
174       script_data_(script_data) {
175   }
176 
177   // Runs on the worker thread.
Run(scoped_refptr<base::MessageLoopProxy> origin_loop)178   virtual void Run(scoped_refptr<base::MessageLoopProxy> origin_loop) OVERRIDE {
179     ProxyResolver* resolver = executor()->resolver();
180     int rv = resolver->SetPacScript(script_data_, CompletionCallback());
181 
182     DCHECK_NE(rv, ERR_IO_PENDING);
183     origin_loop->PostTask(
184         FROM_HERE,
185         base::Bind(&SetPacScriptJob::RequestComplete, this, rv));
186   }
187 
188  protected:
~SetPacScriptJob()189   virtual ~SetPacScriptJob() {}
190 
191  private:
192   // Runs the completion callback on the origin thread.
RequestComplete(int result_code)193   void RequestComplete(int result_code) {
194     // The task may have been cancelled after it was started.
195     if (!was_cancelled() && has_user_callback()) {
196       RunUserCallback(result_code);
197     }
198     OnJobCompleted();
199   }
200 
201   const scoped_refptr<ProxyResolverScriptData> script_data_;
202 };
203 
204 // MultiThreadedProxyResolver::GetProxyForURLJob ------------------------------
205 
206 class MultiThreadedProxyResolver::GetProxyForURLJob
207     : public MultiThreadedProxyResolver::Job {
208  public:
209   // |url|         -- the URL of the query.
210   // |results|     -- the structure to fill with proxy resolve results.
GetProxyForURLJob(const GURL & url,ProxyInfo * results,const CompletionCallback & callback,const BoundNetLog & net_log)211   GetProxyForURLJob(const GURL& url,
212                     ProxyInfo* results,
213                     const CompletionCallback& callback,
214                     const BoundNetLog& net_log)
215       : Job(TYPE_GET_PROXY_FOR_URL, callback),
216         results_(results),
217         net_log_(net_log),
218         url_(url),
219         was_waiting_for_thread_(false) {
220     DCHECK(!callback.is_null());
221   }
222 
net_log()223   BoundNetLog* net_log() { return &net_log_; }
224 
WaitingForThread()225   virtual void WaitingForThread() OVERRIDE {
226     was_waiting_for_thread_ = true;
227     net_log_.BeginEvent(NetLog::TYPE_WAITING_FOR_PROXY_RESOLVER_THREAD);
228   }
229 
FinishedWaitingForThread()230   virtual void FinishedWaitingForThread() OVERRIDE {
231     DCHECK(executor());
232 
233     if (was_waiting_for_thread_) {
234       net_log_.EndEvent(NetLog::TYPE_WAITING_FOR_PROXY_RESOLVER_THREAD);
235     }
236 
237     net_log_.AddEvent(
238         NetLog::TYPE_SUBMITTED_TO_RESOLVER_THREAD,
239         NetLog::IntegerCallback("thread_number", executor()->thread_number()));
240   }
241 
242   // Runs on the worker thread.
Run(scoped_refptr<base::MessageLoopProxy> origin_loop)243   virtual void Run(scoped_refptr<base::MessageLoopProxy> origin_loop) OVERRIDE {
244     ProxyResolver* resolver = executor()->resolver();
245     int rv = resolver->GetProxyForURL(
246         url_, &results_buf_, CompletionCallback(), NULL, net_log_);
247     DCHECK_NE(rv, ERR_IO_PENDING);
248 
249     origin_loop->PostTask(
250         FROM_HERE,
251         base::Bind(&GetProxyForURLJob::QueryComplete, this, rv));
252   }
253 
254  protected:
~GetProxyForURLJob()255   virtual ~GetProxyForURLJob() {}
256 
257  private:
258   // Runs the completion callback on the origin thread.
QueryComplete(int result_code)259   void QueryComplete(int result_code) {
260     // The Job may have been cancelled after it was started.
261     if (!was_cancelled()) {
262       if (result_code >= OK) {  // Note: unit-tests use values > 0.
263         results_->Use(results_buf_);
264       }
265       RunUserCallback(result_code);
266     }
267     OnJobCompleted();
268   }
269 
270   // Must only be used on the "origin" thread.
271   ProxyInfo* results_;
272 
273   // Can be used on either "origin" or worker thread.
274   BoundNetLog net_log_;
275   const GURL url_;
276 
277   // Usable from within DoQuery on the worker thread.
278   ProxyInfo results_buf_;
279 
280   bool was_waiting_for_thread_;
281 };
282 
283 // MultiThreadedProxyResolver::Executor ----------------------------------------
284 
Executor(MultiThreadedProxyResolver * coordinator,ProxyResolver * resolver,int thread_number)285 MultiThreadedProxyResolver::Executor::Executor(
286     MultiThreadedProxyResolver* coordinator,
287     ProxyResolver* resolver,
288     int thread_number)
289     : coordinator_(coordinator),
290       thread_number_(thread_number),
291       resolver_(resolver) {
292   DCHECK(coordinator);
293   DCHECK(resolver);
294   // Start up the thread.
295   thread_.reset(new base::Thread(base::StringPrintf("PAC thread #%d",
296                                                     thread_number)));
297   CHECK(thread_->Start());
298 }
299 
StartJob(Job * job)300 void MultiThreadedProxyResolver::Executor::StartJob(Job* job) {
301   DCHECK(!outstanding_job_.get());
302   outstanding_job_ = job;
303 
304   // Run the job. Once it has completed (regardless of whether it was
305   // cancelled), it will invoke OnJobCompleted() on this thread.
306   job->set_executor(this);
307   job->FinishedWaitingForThread();
308   thread_->message_loop()->PostTask(
309       FROM_HERE,
310       base::Bind(&Job::Run, job, base::MessageLoopProxy::current()));
311 }
312 
OnJobCompleted(Job * job)313 void MultiThreadedProxyResolver::Executor::OnJobCompleted(Job* job) {
314   DCHECK_EQ(job, outstanding_job_.get());
315   outstanding_job_ = NULL;
316   coordinator_->OnExecutorReady(this);
317 }
318 
Destroy()319 void MultiThreadedProxyResolver::Executor::Destroy() {
320   DCHECK(coordinator_);
321 
322   {
323     // See http://crbug.com/69710.
324     base::ThreadRestrictions::ScopedAllowIO allow_io;
325 
326     // Join the worker thread.
327     thread_.reset();
328   }
329 
330   // Cancel any outstanding job.
331   if (outstanding_job_.get()) {
332     outstanding_job_->Cancel();
333     // Orphan the job (since this executor may be deleted soon).
334     outstanding_job_->set_executor(NULL);
335   }
336 
337   // It is now safe to free the ProxyResolver, since all the tasks that
338   // were using it on the resolver thread have completed.
339   resolver_.reset();
340 
341   // Null some stuff as a precaution.
342   coordinator_ = NULL;
343   outstanding_job_ = NULL;
344 }
345 
~Executor()346 MultiThreadedProxyResolver::Executor::~Executor() {
347   // The important cleanup happens as part of Destroy(), which should always be
348   // called first.
349   DCHECK(!coordinator_) << "Destroy() was not called";
350   DCHECK(!thread_.get());
351   DCHECK(!resolver_.get());
352   DCHECK(!outstanding_job_.get());
353 }
354 
355 // MultiThreadedProxyResolver --------------------------------------------------
356 
MultiThreadedProxyResolver(ProxyResolverFactory * resolver_factory,size_t max_num_threads)357 MultiThreadedProxyResolver::MultiThreadedProxyResolver(
358     ProxyResolverFactory* resolver_factory,
359     size_t max_num_threads)
360     : ProxyResolver(resolver_factory->resolvers_expect_pac_bytes()),
361       resolver_factory_(resolver_factory),
362       max_num_threads_(max_num_threads) {
363   DCHECK_GE(max_num_threads, 1u);
364 }
365 
~MultiThreadedProxyResolver()366 MultiThreadedProxyResolver::~MultiThreadedProxyResolver() {
367   // We will cancel all outstanding requests.
368   pending_jobs_.clear();
369   ReleaseAllExecutors();
370 }
371 
GetProxyForURL(const GURL & url,ProxyInfo * results,const CompletionCallback & callback,RequestHandle * request,const BoundNetLog & net_log)372 int MultiThreadedProxyResolver::GetProxyForURL(
373     const GURL& url, ProxyInfo* results, const CompletionCallback& callback,
374     RequestHandle* request, const BoundNetLog& net_log) {
375   DCHECK(CalledOnValidThread());
376   DCHECK(!callback.is_null());
377   DCHECK(current_script_data_.get())
378       << "Resolver is un-initialized. Must call SetPacScript() first!";
379 
380   scoped_refptr<GetProxyForURLJob> job(
381       new GetProxyForURLJob(url, results, callback, net_log));
382 
383   // Completion will be notified through |callback|, unless the caller cancels
384   // the request using |request|.
385   if (request)
386     *request = reinterpret_cast<RequestHandle>(job.get());
387 
388   // If there is an executor that is ready to run this request, submit it!
389   Executor* executor = FindIdleExecutor();
390   if (executor) {
391     DCHECK_EQ(0u, pending_jobs_.size());
392     executor->StartJob(job.get());
393     return ERR_IO_PENDING;
394   }
395 
396   // Otherwise queue this request. (We will schedule it to a thread once one
397   // becomes available).
398   job->WaitingForThread();
399   pending_jobs_.push_back(job);
400 
401   // If we haven't already reached the thread limit, provision a new thread to
402   // drain the requests more quickly.
403   if (executors_.size() < max_num_threads_) {
404     executor = AddNewExecutor();
405     executor->StartJob(
406         new SetPacScriptJob(current_script_data_, CompletionCallback()));
407   }
408 
409   return ERR_IO_PENDING;
410 }
411 
CancelRequest(RequestHandle req)412 void MultiThreadedProxyResolver::CancelRequest(RequestHandle req) {
413   DCHECK(CalledOnValidThread());
414   DCHECK(req);
415 
416   Job* job = reinterpret_cast<Job*>(req);
417   DCHECK_EQ(Job::TYPE_GET_PROXY_FOR_URL, job->type());
418 
419   if (job->executor()) {
420     // If the job was already submitted to the executor, just mark it
421     // as cancelled so the user callback isn't run on completion.
422     job->Cancel();
423   } else {
424     // Otherwise the job is just sitting in a queue.
425     PendingJobsQueue::iterator it =
426         std::find(pending_jobs_.begin(), pending_jobs_.end(), job);
427     DCHECK(it != pending_jobs_.end());
428     pending_jobs_.erase(it);
429   }
430 }
431 
GetLoadState(RequestHandle req) const432 LoadState MultiThreadedProxyResolver::GetLoadState(RequestHandle req) const {
433   DCHECK(CalledOnValidThread());
434   DCHECK(req);
435   return LOAD_STATE_RESOLVING_PROXY_FOR_URL;
436 }
437 
CancelSetPacScript()438 void MultiThreadedProxyResolver::CancelSetPacScript() {
439   DCHECK(CalledOnValidThread());
440   DCHECK_EQ(0u, pending_jobs_.size());
441   DCHECK_EQ(1u, executors_.size());
442   DCHECK_EQ(Job::TYPE_SET_PAC_SCRIPT,
443             executors_[0]->outstanding_job()->type());
444 
445   // Defensively clear some data which shouldn't be getting used
446   // anymore.
447   current_script_data_ = NULL;
448 
449   ReleaseAllExecutors();
450 }
451 
SetPacScript(const scoped_refptr<ProxyResolverScriptData> & script_data,const CompletionCallback & callback)452 int MultiThreadedProxyResolver::SetPacScript(
453     const scoped_refptr<ProxyResolverScriptData>& script_data,
454     const CompletionCallback&callback) {
455   DCHECK(CalledOnValidThread());
456   DCHECK(!callback.is_null());
457 
458   // Save the script details, so we can provision new executors later.
459   current_script_data_ = script_data;
460 
461   // The user should not have any outstanding requests when they call
462   // SetPacScript().
463   CheckNoOutstandingUserRequests();
464 
465   // Destroy all of the current threads and their proxy resolvers.
466   ReleaseAllExecutors();
467 
468   // Provision a new executor, and run the SetPacScript request. On completion
469   // notification will be sent through |callback|.
470   Executor* executor = AddNewExecutor();
471   executor->StartJob(new SetPacScriptJob(script_data, callback));
472   return ERR_IO_PENDING;
473 }
474 
CheckNoOutstandingUserRequests() const475 void MultiThreadedProxyResolver::CheckNoOutstandingUserRequests() const {
476   DCHECK(CalledOnValidThread());
477   CHECK_EQ(0u, pending_jobs_.size());
478 
479   for (ExecutorList::const_iterator it = executors_.begin();
480        it != executors_.end(); ++it) {
481     const Executor* executor = it->get();
482     Job* job = executor->outstanding_job();
483     // The "has_user_callback()" is to exclude jobs for which the callback
484     // has already been invoked, or was not user-initiated (as in the case of
485     // lazy thread provisions). User-initiated jobs may !has_user_callback()
486     // when the callback has already been run. (Since we only clear the
487     // outstanding job AFTER the callback has been invoked, it is possible
488     // for a new request to be started from within the callback).
489     CHECK(!job || job->was_cancelled() || !job->has_user_callback());
490   }
491 }
492 
ReleaseAllExecutors()493 void MultiThreadedProxyResolver::ReleaseAllExecutors() {
494   DCHECK(CalledOnValidThread());
495   for (ExecutorList::iterator it = executors_.begin();
496        it != executors_.end(); ++it) {
497     Executor* executor = it->get();
498     executor->Destroy();
499   }
500   executors_.clear();
501 }
502 
503 MultiThreadedProxyResolver::Executor*
FindIdleExecutor()504 MultiThreadedProxyResolver::FindIdleExecutor() {
505   DCHECK(CalledOnValidThread());
506   for (ExecutorList::iterator it = executors_.begin();
507        it != executors_.end(); ++it) {
508     Executor* executor = it->get();
509     if (!executor->outstanding_job())
510       return executor;
511   }
512   return NULL;
513 }
514 
515 MultiThreadedProxyResolver::Executor*
AddNewExecutor()516 MultiThreadedProxyResolver::AddNewExecutor() {
517   DCHECK(CalledOnValidThread());
518   DCHECK_LT(executors_.size(), max_num_threads_);
519   // The "thread number" is used to give the thread a unique name.
520   int thread_number = executors_.size();
521   ProxyResolver* resolver = resolver_factory_->CreateProxyResolver();
522   Executor* executor = new Executor(
523       this, resolver, thread_number);
524   executors_.push_back(make_scoped_refptr(executor));
525   return executor;
526 }
527 
OnExecutorReady(Executor * executor)528 void MultiThreadedProxyResolver::OnExecutorReady(Executor* executor) {
529   DCHECK(CalledOnValidThread());
530   if (pending_jobs_.empty())
531     return;
532 
533   // Get the next job to process (FIFO). Transfer it from the pending queue
534   // to the executor.
535   scoped_refptr<Job> job = pending_jobs_.front();
536   pending_jobs_.pop_front();
537   executor->StartJob(job.get());
538 }
539 
540 }  // namespace net
541