• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "net/proxy/multi_threaded_proxy_resolver.h"
6 
7 #include "base/bind.h"
8 #include "base/bind_helpers.h"
9 #include "base/message_loop/message_loop_proxy.h"
10 #include "base/metrics/histogram.h"
11 #include "base/strings/string_util.h"
12 #include "base/strings/stringprintf.h"
13 #include "base/threading/thread.h"
14 #include "base/threading/thread_restrictions.h"
15 #include "net/base/net_errors.h"
16 #include "net/base/net_log.h"
17 #include "net/proxy/proxy_info.h"
18 
19 // TODO(eroman): Have the MultiThreadedProxyResolver clear its PAC script
20 //               data when SetPacScript fails. That will reclaim memory when
21 //               testing bogus scripts.
22 
23 namespace net {
24 
25 // An "executor" is a job-runner for PAC requests. It encapsulates a worker
26 // thread and a synchronous ProxyResolver (which will be operated on said
27 // thread.)
28 class MultiThreadedProxyResolver::Executor
29     : public base::RefCountedThreadSafe<MultiThreadedProxyResolver::Executor > {
30  public:
31   // |coordinator| must remain valid throughout our lifetime. It is used to
32   // signal when the executor is ready to receive work by calling
33   // |coordinator->OnExecutorReady()|.
34   // The constructor takes ownership of |resolver|.
35   // |thread_number| is an identifier used when naming the worker thread.
36   Executor(MultiThreadedProxyResolver* coordinator,
37            ProxyResolver* resolver,
38            int thread_number);
39 
40   // Submit a job to this executor.
41   void StartJob(Job* job);
42 
43   // Callback for when a job has completed running on the executor's thread.
44   void OnJobCompleted(Job* job);
45 
46   // Cleanup the executor. Cancels all outstanding work, and frees the thread
47   // and resolver.
48   void Destroy();
49 
50   // Returns the outstanding job, or NULL.
outstanding_job() const51   Job* outstanding_job() const { return outstanding_job_.get(); }
52 
resolver()53   ProxyResolver* resolver() { return resolver_.get(); }
54 
thread_number() const55   int thread_number() const { return thread_number_; }
56 
57  private:
58   friend class base::RefCountedThreadSafe<Executor>;
59   ~Executor();
60 
61   MultiThreadedProxyResolver* coordinator_;
62   const int thread_number_;
63 
64   // The currently active job for this executor (either a SetPacScript or
65   // GetProxyForURL task).
66   scoped_refptr<Job> outstanding_job_;
67 
68   // The synchronous resolver implementation.
69   scoped_ptr<ProxyResolver> resolver_;
70 
71   // The thread where |resolver_| is run on.
72   // Note that declaration ordering is important here. |thread_| needs to be
73   // destroyed *before* |resolver_|, in case |resolver_| is currently
74   // executing on |thread_|.
75   scoped_ptr<base::Thread> thread_;
76 };
77 
78 // MultiThreadedProxyResolver::Job ---------------------------------------------
79 
80 class MultiThreadedProxyResolver::Job
81     : public base::RefCountedThreadSafe<MultiThreadedProxyResolver::Job> {
82  public:
83   // Identifies the subclass of Job (only being used for debugging purposes).
84   enum Type {
85     TYPE_GET_PROXY_FOR_URL,
86     TYPE_SET_PAC_SCRIPT,
87     TYPE_SET_PAC_SCRIPT_INTERNAL,
88   };
89 
Job(Type type,const CompletionCallback & callback)90   Job(Type type, const CompletionCallback& callback)
91       : type_(type),
92         callback_(callback),
93         executor_(NULL),
94         was_cancelled_(false) {
95   }
96 
set_executor(Executor * executor)97   void set_executor(Executor* executor) {
98     executor_ = executor;
99   }
100 
101   // The "executor" is the job runner that is scheduling this job. If
102   // this job has not been submitted to an executor yet, this will be
103   // NULL (and we know it hasn't started yet).
executor()104   Executor* executor() {
105     return executor_;
106   }
107 
108   // Mark the job as having been cancelled.
Cancel()109   void Cancel() {
110     was_cancelled_ = true;
111   }
112 
113   // Returns true if Cancel() has been called.
was_cancelled() const114   bool was_cancelled() const { return was_cancelled_; }
115 
type() const116   Type type() const { return type_; }
117 
118   // Returns true if this job still has a user callback. Some jobs
119   // do not have a user callback, because they were helper jobs
120   // scheduled internally (for example TYPE_SET_PAC_SCRIPT_INTERNAL).
121   //
122   // Otherwise jobs that correspond with user-initiated work will
123   // have a non-null callback up until the callback is run.
has_user_callback() const124   bool has_user_callback() const { return !callback_.is_null(); }
125 
126   // This method is called when the job is inserted into a wait queue
127   // because no executors were ready to accept it.
WaitingForThread()128   virtual void WaitingForThread() {}
129 
130   // This method is called just before the job is posted to the work thread.
FinishedWaitingForThread()131   virtual void FinishedWaitingForThread() {}
132 
133   // This method is called on the worker thread to do the job's work. On
134   // completion, implementors are expected to call OnJobCompleted() on
135   // |origin_loop|.
136   virtual void Run(scoped_refptr<base::MessageLoopProxy> origin_loop) = 0;
137 
138  protected:
OnJobCompleted()139   void OnJobCompleted() {
140     // |executor_| will be NULL if the executor has already been deleted.
141     if (executor_)
142       executor_->OnJobCompleted(this);
143   }
144 
RunUserCallback(int result)145   void RunUserCallback(int result) {
146     DCHECK(has_user_callback());
147     CompletionCallback callback = callback_;
148     // Reset the callback so has_user_callback() will now return false.
149     callback_.Reset();
150     callback.Run(result);
151   }
152 
153   friend class base::RefCountedThreadSafe<MultiThreadedProxyResolver::Job>;
154 
~Job()155   virtual ~Job() {}
156 
157  private:
158   const Type type_;
159   CompletionCallback callback_;
160   Executor* executor_;
161   bool was_cancelled_;
162 };
163 
164 // MultiThreadedProxyResolver::SetPacScriptJob ---------------------------------
165 
166 // Runs on the worker thread to call ProxyResolver::SetPacScript.
167 class MultiThreadedProxyResolver::SetPacScriptJob
168     : public MultiThreadedProxyResolver::Job {
169  public:
SetPacScriptJob(const scoped_refptr<ProxyResolverScriptData> & script_data,const CompletionCallback & callback)170   SetPacScriptJob(const scoped_refptr<ProxyResolverScriptData>& script_data,
171                   const CompletionCallback& callback)
172     : Job(!callback.is_null() ? TYPE_SET_PAC_SCRIPT :
173                                 TYPE_SET_PAC_SCRIPT_INTERNAL,
174           callback),
175       script_data_(script_data) {
176   }
177 
178   // Runs on the worker thread.
Run(scoped_refptr<base::MessageLoopProxy> origin_loop)179   virtual void Run(scoped_refptr<base::MessageLoopProxy> origin_loop) OVERRIDE {
180     ProxyResolver* resolver = executor()->resolver();
181     int rv = resolver->SetPacScript(script_data_, CompletionCallback());
182 
183     DCHECK_NE(rv, ERR_IO_PENDING);
184     origin_loop->PostTask(
185         FROM_HERE,
186         base::Bind(&SetPacScriptJob::RequestComplete, this, rv));
187   }
188 
189  protected:
~SetPacScriptJob()190   virtual ~SetPacScriptJob() {}
191 
192  private:
193   // Runs the completion callback on the origin thread.
RequestComplete(int result_code)194   void RequestComplete(int result_code) {
195     // The task may have been cancelled after it was started.
196     if (!was_cancelled() && has_user_callback()) {
197       RunUserCallback(result_code);
198     }
199     OnJobCompleted();
200   }
201 
202   const scoped_refptr<ProxyResolverScriptData> script_data_;
203 };
204 
205 // MultiThreadedProxyResolver::GetProxyForURLJob ------------------------------
206 
207 class MultiThreadedProxyResolver::GetProxyForURLJob
208     : public MultiThreadedProxyResolver::Job {
209  public:
210   // |url|         -- the URL of the query.
211   // |results|     -- the structure to fill with proxy resolve results.
GetProxyForURLJob(const GURL & url,ProxyInfo * results,const CompletionCallback & callback,const BoundNetLog & net_log)212   GetProxyForURLJob(const GURL& url,
213                     ProxyInfo* results,
214                     const CompletionCallback& callback,
215                     const BoundNetLog& net_log)
216       : Job(TYPE_GET_PROXY_FOR_URL, callback),
217         results_(results),
218         net_log_(net_log),
219         url_(url),
220         was_waiting_for_thread_(false) {
221     DCHECK(!callback.is_null());
222     start_time_ = base::TimeTicks::Now();
223   }
224 
net_log()225   BoundNetLog* net_log() { return &net_log_; }
226 
WaitingForThread()227   virtual void WaitingForThread() OVERRIDE {
228     was_waiting_for_thread_ = true;
229     net_log_.BeginEvent(NetLog::TYPE_WAITING_FOR_PROXY_RESOLVER_THREAD);
230   }
231 
FinishedWaitingForThread()232   virtual void FinishedWaitingForThread() OVERRIDE {
233     DCHECK(executor());
234 
235     submitted_to_thread_time_ = base::TimeTicks::Now();
236 
237     if (was_waiting_for_thread_) {
238       net_log_.EndEvent(NetLog::TYPE_WAITING_FOR_PROXY_RESOLVER_THREAD);
239     }
240 
241     net_log_.AddEvent(
242         NetLog::TYPE_SUBMITTED_TO_RESOLVER_THREAD,
243         NetLog::IntegerCallback("thread_number", executor()->thread_number()));
244   }
245 
246   // Runs on the worker thread.
Run(scoped_refptr<base::MessageLoopProxy> origin_loop)247   virtual void Run(scoped_refptr<base::MessageLoopProxy> origin_loop) OVERRIDE {
248     ProxyResolver* resolver = executor()->resolver();
249     int rv = resolver->GetProxyForURL(
250         url_, &results_buf_, CompletionCallback(), NULL, net_log_);
251     DCHECK_NE(rv, ERR_IO_PENDING);
252 
253     origin_loop->PostTask(
254         FROM_HERE,
255         base::Bind(&GetProxyForURLJob::QueryComplete, this, rv));
256   }
257 
258  protected:
~GetProxyForURLJob()259   virtual ~GetProxyForURLJob() {}
260 
261  private:
262   // Runs the completion callback on the origin thread.
QueryComplete(int result_code)263   void QueryComplete(int result_code) {
264     // The Job may have been cancelled after it was started.
265     if (!was_cancelled()) {
266       RecordPerformanceMetrics();
267       if (result_code >= OK) {  // Note: unit-tests use values > 0.
268         results_->Use(results_buf_);
269       }
270       RunUserCallback(result_code);
271     }
272     OnJobCompleted();
273   }
274 
RecordPerformanceMetrics()275   void RecordPerformanceMetrics() {
276     DCHECK(!was_cancelled());
277 
278     base::TimeTicks now = base::TimeTicks::Now();
279 
280     // Log the total time the request took to complete.
281     UMA_HISTOGRAM_MEDIUM_TIMES("Net.MTPR_GetProxyForUrl_Time",
282                                now - start_time_);
283 
284     // Log the time the request was stalled waiting for a thread to free up.
285     UMA_HISTOGRAM_MEDIUM_TIMES("Net.MTPR_GetProxyForUrl_Thread_Wait_Time",
286                                submitted_to_thread_time_ - start_time_);
287   }
288 
289   // Must only be used on the "origin" thread.
290   ProxyInfo* results_;
291 
292   // Can be used on either "origin" or worker thread.
293   BoundNetLog net_log_;
294   const GURL url_;
295 
296   // Usable from within DoQuery on the worker thread.
297   ProxyInfo results_buf_;
298 
299   base::TimeTicks start_time_;
300   base::TimeTicks submitted_to_thread_time_;
301 
302   bool was_waiting_for_thread_;
303 };
304 
305 // MultiThreadedProxyResolver::Executor ----------------------------------------
306 
Executor(MultiThreadedProxyResolver * coordinator,ProxyResolver * resolver,int thread_number)307 MultiThreadedProxyResolver::Executor::Executor(
308     MultiThreadedProxyResolver* coordinator,
309     ProxyResolver* resolver,
310     int thread_number)
311     : coordinator_(coordinator),
312       thread_number_(thread_number),
313       resolver_(resolver) {
314   DCHECK(coordinator);
315   DCHECK(resolver);
316   // Start up the thread.
317   thread_.reset(new base::Thread(base::StringPrintf("PAC thread #%d",
318                                                     thread_number)));
319   CHECK(thread_->Start());
320 }
321 
StartJob(Job * job)322 void MultiThreadedProxyResolver::Executor::StartJob(Job* job) {
323   DCHECK(!outstanding_job_.get());
324   outstanding_job_ = job;
325 
326   // Run the job. Once it has completed (regardless of whether it was
327   // cancelled), it will invoke OnJobCompleted() on this thread.
328   job->set_executor(this);
329   job->FinishedWaitingForThread();
330   thread_->message_loop()->PostTask(
331       FROM_HERE,
332       base::Bind(&Job::Run, job, base::MessageLoopProxy::current()));
333 }
334 
OnJobCompleted(Job * job)335 void MultiThreadedProxyResolver::Executor::OnJobCompleted(Job* job) {
336   DCHECK_EQ(job, outstanding_job_.get());
337   outstanding_job_ = NULL;
338   coordinator_->OnExecutorReady(this);
339 }
340 
Destroy()341 void MultiThreadedProxyResolver::Executor::Destroy() {
342   DCHECK(coordinator_);
343 
344   {
345     // See http://crbug.com/69710.
346     base::ThreadRestrictions::ScopedAllowIO allow_io;
347 
348     // Join the worker thread.
349     thread_.reset();
350   }
351 
352   // Cancel any outstanding job.
353   if (outstanding_job_.get()) {
354     outstanding_job_->Cancel();
355     // Orphan the job (since this executor may be deleted soon).
356     outstanding_job_->set_executor(NULL);
357   }
358 
359   // It is now safe to free the ProxyResolver, since all the tasks that
360   // were using it on the resolver thread have completed.
361   resolver_.reset();
362 
363   // Null some stuff as a precaution.
364   coordinator_ = NULL;
365   outstanding_job_ = NULL;
366 }
367 
~Executor()368 MultiThreadedProxyResolver::Executor::~Executor() {
369   // The important cleanup happens as part of Destroy(), which should always be
370   // called first.
371   DCHECK(!coordinator_) << "Destroy() was not called";
372   DCHECK(!thread_.get());
373   DCHECK(!resolver_.get());
374   DCHECK(!outstanding_job_.get());
375 }
376 
377 // MultiThreadedProxyResolver --------------------------------------------------
378 
MultiThreadedProxyResolver(ProxyResolverFactory * resolver_factory,size_t max_num_threads)379 MultiThreadedProxyResolver::MultiThreadedProxyResolver(
380     ProxyResolverFactory* resolver_factory,
381     size_t max_num_threads)
382     : ProxyResolver(resolver_factory->resolvers_expect_pac_bytes()),
383       resolver_factory_(resolver_factory),
384       max_num_threads_(max_num_threads) {
385   DCHECK_GE(max_num_threads, 1u);
386 }
387 
~MultiThreadedProxyResolver()388 MultiThreadedProxyResolver::~MultiThreadedProxyResolver() {
389   // We will cancel all outstanding requests.
390   pending_jobs_.clear();
391   ReleaseAllExecutors();
392 }
393 
GetProxyForURL(const GURL & url,ProxyInfo * results,const CompletionCallback & callback,RequestHandle * request,const BoundNetLog & net_log)394 int MultiThreadedProxyResolver::GetProxyForURL(
395     const GURL& url, ProxyInfo* results, const CompletionCallback& callback,
396     RequestHandle* request, const BoundNetLog& net_log) {
397   DCHECK(CalledOnValidThread());
398   DCHECK(!callback.is_null());
399   DCHECK(current_script_data_.get())
400       << "Resolver is un-initialized. Must call SetPacScript() first!";
401 
402   scoped_refptr<GetProxyForURLJob> job(
403       new GetProxyForURLJob(url, results, callback, net_log));
404 
405   // Completion will be notified through |callback|, unless the caller cancels
406   // the request using |request|.
407   if (request)
408     *request = reinterpret_cast<RequestHandle>(job.get());
409 
410   // If there is an executor that is ready to run this request, submit it!
411   Executor* executor = FindIdleExecutor();
412   if (executor) {
413     DCHECK_EQ(0u, pending_jobs_.size());
414     executor->StartJob(job.get());
415     return ERR_IO_PENDING;
416   }
417 
418   // Otherwise queue this request. (We will schedule it to a thread once one
419   // becomes available).
420   job->WaitingForThread();
421   pending_jobs_.push_back(job);
422 
423   // If we haven't already reached the thread limit, provision a new thread to
424   // drain the requests more quickly.
425   if (executors_.size() < max_num_threads_) {
426     executor = AddNewExecutor();
427     executor->StartJob(
428         new SetPacScriptJob(current_script_data_, CompletionCallback()));
429   }
430 
431   return ERR_IO_PENDING;
432 }
433 
CancelRequest(RequestHandle req)434 void MultiThreadedProxyResolver::CancelRequest(RequestHandle req) {
435   DCHECK(CalledOnValidThread());
436   DCHECK(req);
437 
438   Job* job = reinterpret_cast<Job*>(req);
439   DCHECK_EQ(Job::TYPE_GET_PROXY_FOR_URL, job->type());
440 
441   if (job->executor()) {
442     // If the job was already submitted to the executor, just mark it
443     // as cancelled so the user callback isn't run on completion.
444     job->Cancel();
445   } else {
446     // Otherwise the job is just sitting in a queue.
447     PendingJobsQueue::iterator it =
448         std::find(pending_jobs_.begin(), pending_jobs_.end(), job);
449     DCHECK(it != pending_jobs_.end());
450     pending_jobs_.erase(it);
451   }
452 }
453 
GetLoadState(RequestHandle req) const454 LoadState MultiThreadedProxyResolver::GetLoadState(RequestHandle req) const {
455   DCHECK(CalledOnValidThread());
456   DCHECK(req);
457   return LOAD_STATE_RESOLVING_PROXY_FOR_URL;
458 }
459 
CancelSetPacScript()460 void MultiThreadedProxyResolver::CancelSetPacScript() {
461   DCHECK(CalledOnValidThread());
462   DCHECK_EQ(0u, pending_jobs_.size());
463   DCHECK_EQ(1u, executors_.size());
464   DCHECK_EQ(Job::TYPE_SET_PAC_SCRIPT,
465             executors_[0]->outstanding_job()->type());
466 
467   // Defensively clear some data which shouldn't be getting used
468   // anymore.
469   current_script_data_ = NULL;
470 
471   ReleaseAllExecutors();
472 }
473 
SetPacScript(const scoped_refptr<ProxyResolverScriptData> & script_data,const CompletionCallback & callback)474 int MultiThreadedProxyResolver::SetPacScript(
475     const scoped_refptr<ProxyResolverScriptData>& script_data,
476     const CompletionCallback&callback) {
477   DCHECK(CalledOnValidThread());
478   DCHECK(!callback.is_null());
479 
480   // Save the script details, so we can provision new executors later.
481   current_script_data_ = script_data;
482 
483   // The user should not have any outstanding requests when they call
484   // SetPacScript().
485   CheckNoOutstandingUserRequests();
486 
487   // Destroy all of the current threads and their proxy resolvers.
488   ReleaseAllExecutors();
489 
490   // Provision a new executor, and run the SetPacScript request. On completion
491   // notification will be sent through |callback|.
492   Executor* executor = AddNewExecutor();
493   executor->StartJob(new SetPacScriptJob(script_data, callback));
494   return ERR_IO_PENDING;
495 }
496 
CheckNoOutstandingUserRequests() const497 void MultiThreadedProxyResolver::CheckNoOutstandingUserRequests() const {
498   DCHECK(CalledOnValidThread());
499   CHECK_EQ(0u, pending_jobs_.size());
500 
501   for (ExecutorList::const_iterator it = executors_.begin();
502        it != executors_.end(); ++it) {
503     const Executor* executor = it->get();
504     Job* job = executor->outstanding_job();
505     // The "has_user_callback()" is to exclude jobs for which the callback
506     // has already been invoked, or was not user-initiated (as in the case of
507     // lazy thread provisions). User-initiated jobs may !has_user_callback()
508     // when the callback has already been run. (Since we only clear the
509     // outstanding job AFTER the callback has been invoked, it is possible
510     // for a new request to be started from within the callback).
511     CHECK(!job || job->was_cancelled() || !job->has_user_callback());
512   }
513 }
514 
ReleaseAllExecutors()515 void MultiThreadedProxyResolver::ReleaseAllExecutors() {
516   DCHECK(CalledOnValidThread());
517   for (ExecutorList::iterator it = executors_.begin();
518        it != executors_.end(); ++it) {
519     Executor* executor = it->get();
520     executor->Destroy();
521   }
522   executors_.clear();
523 }
524 
525 MultiThreadedProxyResolver::Executor*
FindIdleExecutor()526 MultiThreadedProxyResolver::FindIdleExecutor() {
527   DCHECK(CalledOnValidThread());
528   for (ExecutorList::iterator it = executors_.begin();
529        it != executors_.end(); ++it) {
530     Executor* executor = it->get();
531     if (!executor->outstanding_job())
532       return executor;
533   }
534   return NULL;
535 }
536 
537 MultiThreadedProxyResolver::Executor*
AddNewExecutor()538 MultiThreadedProxyResolver::AddNewExecutor() {
539   DCHECK(CalledOnValidThread());
540   DCHECK_LT(executors_.size(), max_num_threads_);
541   // The "thread number" is used to give the thread a unique name.
542   int thread_number = executors_.size();
543   ProxyResolver* resolver = resolver_factory_->CreateProxyResolver();
544   Executor* executor = new Executor(
545       this, resolver, thread_number);
546   executors_.push_back(make_scoped_refptr(executor));
547   return executor;
548 }
549 
OnExecutorReady(Executor * executor)550 void MultiThreadedProxyResolver::OnExecutorReady(Executor* executor) {
551   DCHECK(CalledOnValidThread());
552   if (pending_jobs_.empty())
553     return;
554 
555   // Get the next job to process (FIFO). Transfer it from the pending queue
556   // to the executor.
557   scoped_refptr<Job> job = pending_jobs_.front();
558   pending_jobs_.pop_front();
559   executor->StartJob(job.get());
560 }
561 
562 }  // namespace net
563