• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2010 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "net/proxy/multi_threaded_proxy_resolver.h"
6 
7 #include "base/message_loop.h"
8 #include "base/string_util.h"
9 #include "base/stringprintf.h"
10 #include "base/threading/thread.h"
11 #include "base/threading/thread_restrictions.h"
12 #include "net/base/net_errors.h"
13 #include "net/base/net_log.h"
14 #include "net/proxy/proxy_info.h"
15 
16 // TODO(eroman): Have the MultiThreadedProxyResolver clear its PAC script
17 //               data when SetPacScript fails. That will reclaim memory when
18 //               testing bogus scripts.
19 
20 namespace net {
21 
22 namespace {
23 
24 class PurgeMemoryTask : public base::RefCountedThreadSafe<PurgeMemoryTask> {
25  public:
PurgeMemoryTask(ProxyResolver * resolver)26   explicit PurgeMemoryTask(ProxyResolver* resolver) : resolver_(resolver) {}
PurgeMemory()27   void PurgeMemory() { resolver_->PurgeMemory(); }
28  private:
29   friend class base::RefCountedThreadSafe<PurgeMemoryTask>;
~PurgeMemoryTask()30   ~PurgeMemoryTask() {}
31   ProxyResolver* resolver_;
32 };
33 
34 }  // namespace
35 
36 // An "executor" is a job-runner for PAC requests. It encapsulates a worker
37 // thread and a synchronous ProxyResolver (which will be operated on said
38 // thread.)
39 class MultiThreadedProxyResolver::Executor
40     : public base::RefCountedThreadSafe<MultiThreadedProxyResolver::Executor > {
41  public:
42   // |coordinator| must remain valid throughout our lifetime. It is used to
43   // signal when the executor is ready to receive work by calling
44   // |coordinator->OnExecutorReady()|.
45   // The constructor takes ownership of |resolver|.
46   // |thread_number| is an identifier used when naming the worker thread.
47   Executor(MultiThreadedProxyResolver* coordinator,
48            ProxyResolver* resolver,
49            int thread_number);
50 
51   // Submit a job to this executor.
52   void StartJob(Job* job);
53 
54   // Callback for when a job has completed running on the executor's thread.
55   void OnJobCompleted(Job* job);
56 
57   // Cleanup the executor. Cancels all outstanding work, and frees the thread
58   // and resolver.
59   void Destroy();
60 
61   void PurgeMemory();
62 
63   // Returns the outstanding job, or NULL.
outstanding_job() const64   Job* outstanding_job() const { return outstanding_job_.get(); }
65 
resolver()66   ProxyResolver* resolver() { return resolver_.get(); }
67 
thread_number() const68   int thread_number() const { return thread_number_; }
69 
70  private:
71   friend class base::RefCountedThreadSafe<Executor>;
72   ~Executor();
73 
74   MultiThreadedProxyResolver* coordinator_;
75   const int thread_number_;
76 
77   // The currently active job for this executor (either a SetPacScript or
78   // GetProxyForURL task).
79   scoped_refptr<Job> outstanding_job_;
80 
81   // The synchronous resolver implementation.
82   scoped_ptr<ProxyResolver> resolver_;
83 
84   // The thread where |resolver_| is run on.
85   // Note that declaration ordering is important here. |thread_| needs to be
86   // destroyed *before* |resolver_|, in case |resolver_| is currently
87   // executing on |thread_|.
88   scoped_ptr<base::Thread> thread_;
89 };
90 
91 // MultiThreadedProxyResolver::Job ---------------------------------------------
92 
93 class MultiThreadedProxyResolver::Job
94     : public base::RefCountedThreadSafe<MultiThreadedProxyResolver::Job> {
95  public:
96   // Identifies the subclass of Job (only being used for debugging purposes).
97   enum Type {
98     TYPE_GET_PROXY_FOR_URL,
99     TYPE_SET_PAC_SCRIPT,
100     TYPE_SET_PAC_SCRIPT_INTERNAL,
101   };
102 
Job(Type type,CompletionCallback * user_callback)103   Job(Type type, CompletionCallback* user_callback)
104       : type_(type),
105         user_callback_(user_callback),
106         executor_(NULL),
107         was_cancelled_(false) {
108   }
109 
set_executor(Executor * executor)110   void set_executor(Executor* executor) {
111     executor_ = executor;
112   }
113 
114   // The "executor" is the job runner that is scheduling this job. If
115   // this job has not been submitted to an executor yet, this will be
116   // NULL (and we know it hasn't started yet).
executor()117   Executor* executor() {
118     return executor_;
119   }
120 
121   // Mark the job as having been cancelled.
Cancel()122   void Cancel() {
123     was_cancelled_ = true;
124   }
125 
126   // Returns true if Cancel() has been called.
was_cancelled() const127   bool was_cancelled() const { return was_cancelled_; }
128 
type() const129   Type type() const { return type_; }
130 
131   // Returns true if this job still has a user callback. Some jobs
132   // do not have a user callback, because they were helper jobs
133   // scheduled internally (for example TYPE_SET_PAC_SCRIPT_INTERNAL).
134   //
135   // Otherwise jobs that correspond with user-initiated work will
136   // have a non-NULL callback up until the callback is run.
has_user_callback() const137   bool has_user_callback() const { return user_callback_ != NULL; }
138 
139   // This method is called when the job is inserted into a wait queue
140   // because no executors were ready to accept it.
WaitingForThread()141   virtual void WaitingForThread() {}
142 
143   // This method is called just before the job is posted to the work thread.
FinishedWaitingForThread()144   virtual void FinishedWaitingForThread() {}
145 
146   // This method is called on the worker thread to do the job's work. On
147   // completion, implementors are expected to call OnJobCompleted() on
148   // |origin_loop|.
149   virtual void Run(MessageLoop* origin_loop) = 0;
150 
151  protected:
OnJobCompleted()152   void OnJobCompleted() {
153     // |executor_| will be NULL if the executor has already been deleted.
154     if (executor_)
155       executor_->OnJobCompleted(this);
156   }
157 
RunUserCallback(int result)158   void RunUserCallback(int result) {
159     DCHECK(has_user_callback());
160     CompletionCallback* callback = user_callback_;
161     // Null the callback so has_user_callback() will now return false.
162     user_callback_ = NULL;
163     callback->Run(result);
164   }
165 
166   friend class base::RefCountedThreadSafe<MultiThreadedProxyResolver::Job>;
167 
~Job()168   virtual ~Job() {}
169 
170  private:
171   const Type type_;
172   CompletionCallback* user_callback_;
173   Executor* executor_;
174   bool was_cancelled_;
175 };
176 
177 // MultiThreadedProxyResolver::SetPacScriptJob ---------------------------------
178 
179 // Runs on the worker thread to call ProxyResolver::SetPacScript.
180 class MultiThreadedProxyResolver::SetPacScriptJob
181     : public MultiThreadedProxyResolver::Job {
182  public:
SetPacScriptJob(const scoped_refptr<ProxyResolverScriptData> & script_data,CompletionCallback * callback)183   SetPacScriptJob(const scoped_refptr<ProxyResolverScriptData>& script_data,
184                   CompletionCallback* callback)
185     : Job(callback ? TYPE_SET_PAC_SCRIPT : TYPE_SET_PAC_SCRIPT_INTERNAL,
186           callback),
187       script_data_(script_data) {
188   }
189 
190   // Runs on the worker thread.
Run(MessageLoop * origin_loop)191   virtual void Run(MessageLoop* origin_loop) {
192     ProxyResolver* resolver = executor()->resolver();
193     int rv = resolver->SetPacScript(script_data_, NULL);
194 
195     DCHECK_NE(rv, ERR_IO_PENDING);
196     origin_loop->PostTask(
197         FROM_HERE,
198         NewRunnableMethod(this, &SetPacScriptJob::RequestComplete, rv));
199   }
200 
201  private:
202   // Runs the completion callback on the origin thread.
RequestComplete(int result_code)203   void RequestComplete(int result_code) {
204     // The task may have been cancelled after it was started.
205     if (!was_cancelled() && has_user_callback()) {
206       RunUserCallback(result_code);
207     }
208     OnJobCompleted();
209   }
210 
211   const scoped_refptr<ProxyResolverScriptData> script_data_;
212 };
213 
214 // MultiThreadedProxyResolver::GetProxyForURLJob ------------------------------
215 
216 class MultiThreadedProxyResolver::GetProxyForURLJob
217     : public MultiThreadedProxyResolver::Job {
218  public:
219   // |url|         -- the URL of the query.
220   // |results|     -- the structure to fill with proxy resolve results.
GetProxyForURLJob(const GURL & url,ProxyInfo * results,CompletionCallback * callback,const BoundNetLog & net_log)221   GetProxyForURLJob(const GURL& url,
222                     ProxyInfo* results,
223                     CompletionCallback* callback,
224                     const BoundNetLog& net_log)
225       : Job(TYPE_GET_PROXY_FOR_URL, callback),
226         results_(results),
227         net_log_(net_log),
228         url_(url),
229         was_waiting_for_thread_(false) {
230     DCHECK(callback);
231   }
232 
net_log()233   BoundNetLog* net_log() { return &net_log_; }
234 
WaitingForThread()235   virtual void WaitingForThread() {
236     was_waiting_for_thread_ = true;
237     net_log_.BeginEvent(
238         NetLog::TYPE_WAITING_FOR_PROXY_RESOLVER_THREAD, NULL);
239   }
240 
FinishedWaitingForThread()241   virtual void FinishedWaitingForThread() {
242     DCHECK(executor());
243 
244     if (was_waiting_for_thread_) {
245       net_log_.EndEvent(
246           NetLog::TYPE_WAITING_FOR_PROXY_RESOLVER_THREAD, NULL);
247     }
248 
249     net_log_.AddEvent(
250         NetLog::TYPE_SUBMITTED_TO_RESOLVER_THREAD,
251         make_scoped_refptr(new NetLogIntegerParameter(
252             "thread_number", executor()->thread_number())));
253   }
254 
255   // Runs on the worker thread.
Run(MessageLoop * origin_loop)256   virtual void Run(MessageLoop* origin_loop) {
257     ProxyResolver* resolver = executor()->resolver();
258     int rv = resolver->GetProxyForURL(
259         url_, &results_buf_, NULL, NULL, net_log_);
260     DCHECK_NE(rv, ERR_IO_PENDING);
261 
262     origin_loop->PostTask(
263         FROM_HERE,
264         NewRunnableMethod(this, &GetProxyForURLJob::QueryComplete, rv));
265   }
266 
267  private:
268   // Runs the completion callback on the origin thread.
QueryComplete(int result_code)269   void QueryComplete(int result_code) {
270     // The Job may have been cancelled after it was started.
271     if (!was_cancelled()) {
272       if (result_code >= OK) {  // Note: unit-tests use values > 0.
273         results_->Use(results_buf_);
274       }
275       RunUserCallback(result_code);
276     }
277     OnJobCompleted();
278   }
279 
280   // Must only be used on the "origin" thread.
281   ProxyInfo* results_;
282 
283   // Can be used on either "origin" or worker thread.
284   BoundNetLog net_log_;
285   const GURL url_;
286 
287   // Usable from within DoQuery on the worker thread.
288   ProxyInfo results_buf_;
289 
290   bool was_waiting_for_thread_;
291 };
292 
293 // MultiThreadedProxyResolver::Executor ----------------------------------------
294 
Executor(MultiThreadedProxyResolver * coordinator,ProxyResolver * resolver,int thread_number)295 MultiThreadedProxyResolver::Executor::Executor(
296     MultiThreadedProxyResolver* coordinator,
297     ProxyResolver* resolver,
298     int thread_number)
299     : coordinator_(coordinator),
300       thread_number_(thread_number),
301       resolver_(resolver) {
302   DCHECK(coordinator);
303   DCHECK(resolver);
304   // Start up the thread.
305   // Note that it is safe to pass a temporary C-String to Thread(), as it will
306   // make a copy.
307   std::string thread_name =
308       base::StringPrintf("PAC thread #%d", thread_number);
309   thread_.reset(new base::Thread(thread_name.c_str()));
310   CHECK(thread_->Start());
311 }
312 
StartJob(Job * job)313 void MultiThreadedProxyResolver::Executor::StartJob(Job* job) {
314   DCHECK(!outstanding_job_);
315   outstanding_job_ = job;
316 
317   // Run the job. Once it has completed (regardless of whether it was
318   // cancelled), it will invoke OnJobCompleted() on this thread.
319   job->set_executor(this);
320   job->FinishedWaitingForThread();
321   thread_->message_loop()->PostTask(
322       FROM_HERE,
323       NewRunnableMethod(job, &Job::Run, MessageLoop::current()));
324 }
325 
OnJobCompleted(Job * job)326 void MultiThreadedProxyResolver::Executor::OnJobCompleted(Job* job) {
327   DCHECK_EQ(job, outstanding_job_.get());
328   outstanding_job_ = NULL;
329   coordinator_->OnExecutorReady(this);
330 }
331 
Destroy()332 void MultiThreadedProxyResolver::Executor::Destroy() {
333   DCHECK(coordinator_);
334 
335   // Give the resolver an opportunity to shutdown from THIS THREAD before
336   // joining on the resolver thread. This allows certain implementations
337   // to avoid deadlocks.
338   resolver_->Shutdown();
339 
340   {
341     // See http://crbug.com/69710.
342     base::ThreadRestrictions::ScopedAllowIO allow_io;
343 
344     // Join the worker thread.
345     thread_.reset();
346   }
347 
348   // Cancel any outstanding job.
349   if (outstanding_job_) {
350     outstanding_job_->Cancel();
351     // Orphan the job (since this executor may be deleted soon).
352     outstanding_job_->set_executor(NULL);
353   }
354 
355   // It is now safe to free the ProxyResolver, since all the tasks that
356   // were using it on the resolver thread have completed.
357   resolver_.reset();
358 
359   // Null some stuff as a precaution.
360   coordinator_ = NULL;
361   outstanding_job_ = NULL;
362 }
363 
PurgeMemory()364 void MultiThreadedProxyResolver::Executor::PurgeMemory() {
365   scoped_refptr<PurgeMemoryTask> helper(new PurgeMemoryTask(resolver_.get()));
366   thread_->message_loop()->PostTask(
367       FROM_HERE,
368       NewRunnableMethod(helper.get(), &PurgeMemoryTask::PurgeMemory));
369 }
370 
~Executor()371 MultiThreadedProxyResolver::Executor::~Executor() {
372   // The important cleanup happens as part of Destroy(), which should always be
373   // called first.
374   DCHECK(!coordinator_) << "Destroy() was not called";
375   DCHECK(!thread_.get());
376   DCHECK(!resolver_.get());
377   DCHECK(!outstanding_job_);
378 }
379 
380 // MultiThreadedProxyResolver --------------------------------------------------
381 
MultiThreadedProxyResolver(ProxyResolverFactory * resolver_factory,size_t max_num_threads)382 MultiThreadedProxyResolver::MultiThreadedProxyResolver(
383     ProxyResolverFactory* resolver_factory,
384     size_t max_num_threads)
385     : ProxyResolver(resolver_factory->resolvers_expect_pac_bytes()),
386       resolver_factory_(resolver_factory),
387       max_num_threads_(max_num_threads) {
388   DCHECK_GE(max_num_threads, 1u);
389 }
390 
~MultiThreadedProxyResolver()391 MultiThreadedProxyResolver::~MultiThreadedProxyResolver() {
392   // We will cancel all outstanding requests.
393   pending_jobs_.clear();
394   ReleaseAllExecutors();
395 }
396 
GetProxyForURL(const GURL & url,ProxyInfo * results,CompletionCallback * callback,RequestHandle * request,const BoundNetLog & net_log)397 int MultiThreadedProxyResolver::GetProxyForURL(const GURL& url,
398                                                ProxyInfo* results,
399                                                CompletionCallback* callback,
400                                                RequestHandle* request,
401                                                const BoundNetLog& net_log) {
402   DCHECK(CalledOnValidThread());
403   DCHECK(callback);
404   DCHECK(current_script_data_.get())
405       << "Resolver is un-initialized. Must call SetPacScript() first!";
406 
407   scoped_refptr<GetProxyForURLJob> job(
408       new GetProxyForURLJob(url, results, callback, net_log));
409 
410   // Completion will be notified through |callback|, unless the caller cancels
411   // the request using |request|.
412   if (request)
413     *request = reinterpret_cast<RequestHandle>(job.get());
414 
415   // If there is an executor that is ready to run this request, submit it!
416   Executor* executor = FindIdleExecutor();
417   if (executor) {
418     DCHECK_EQ(0u, pending_jobs_.size());
419     executor->StartJob(job);
420     return ERR_IO_PENDING;
421   }
422 
423   // Otherwise queue this request. (We will schedule it to a thread once one
424   // becomes available).
425   job->WaitingForThread();
426   pending_jobs_.push_back(job);
427 
428   // If we haven't already reached the thread limit, provision a new thread to
429   // drain the requests more quickly.
430   if (executors_.size() < max_num_threads_) {
431     executor = AddNewExecutor();
432     executor->StartJob(
433         new SetPacScriptJob(current_script_data_, NULL));
434   }
435 
436   return ERR_IO_PENDING;
437 }
438 
CancelRequest(RequestHandle req)439 void MultiThreadedProxyResolver::CancelRequest(RequestHandle req) {
440   DCHECK(CalledOnValidThread());
441   DCHECK(req);
442 
443   Job* job = reinterpret_cast<Job*>(req);
444   DCHECK_EQ(Job::TYPE_GET_PROXY_FOR_URL, job->type());
445 
446   if (job->executor()) {
447     // If the job was already submitted to the executor, just mark it
448     // as cancelled so the user callback isn't run on completion.
449     job->Cancel();
450   } else {
451     // Otherwise the job is just sitting in a queue.
452     PendingJobsQueue::iterator it =
453         std::find(pending_jobs_.begin(), pending_jobs_.end(), job);
454     DCHECK(it != pending_jobs_.end());
455     pending_jobs_.erase(it);
456   }
457 }
458 
CancelSetPacScript()459 void MultiThreadedProxyResolver::CancelSetPacScript() {
460   DCHECK(CalledOnValidThread());
461   DCHECK_EQ(0u, pending_jobs_.size());
462   DCHECK_EQ(1u, executors_.size());
463   DCHECK_EQ(Job::TYPE_SET_PAC_SCRIPT,
464             executors_[0]->outstanding_job()->type());
465 
466   // Defensively clear some data which shouldn't be getting used
467   // anymore.
468   current_script_data_ = NULL;
469 
470   ReleaseAllExecutors();
471 }
472 
PurgeMemory()473 void MultiThreadedProxyResolver::PurgeMemory() {
474   DCHECK(CalledOnValidThread());
475   for (ExecutorList::iterator it = executors_.begin();
476        it != executors_.end(); ++it) {
477     Executor* executor = *it;
478     executor->PurgeMemory();
479   }
480 }
481 
SetPacScript(const scoped_refptr<ProxyResolverScriptData> & script_data,CompletionCallback * callback)482 int MultiThreadedProxyResolver::SetPacScript(
483     const scoped_refptr<ProxyResolverScriptData>& script_data,
484     CompletionCallback* callback) {
485   DCHECK(CalledOnValidThread());
486   DCHECK(callback);
487 
488   // Save the script details, so we can provision new executors later.
489   current_script_data_ = script_data;
490 
491   // The user should not have any outstanding requests when they call
492   // SetPacScript().
493   CheckNoOutstandingUserRequests();
494 
495   // Destroy all of the current threads and their proxy resolvers.
496   ReleaseAllExecutors();
497 
498   // Provision a new executor, and run the SetPacScript request. On completion
499   // notification will be sent through |callback|.
500   Executor* executor = AddNewExecutor();
501   executor->StartJob(new SetPacScriptJob(script_data, callback));
502   return ERR_IO_PENDING;
503 }
504 
CheckNoOutstandingUserRequests() const505 void MultiThreadedProxyResolver::CheckNoOutstandingUserRequests() const {
506   DCHECK(CalledOnValidThread());
507   CHECK_EQ(0u, pending_jobs_.size());
508 
509   for (ExecutorList::const_iterator it = executors_.begin();
510        it != executors_.end(); ++it) {
511     const Executor* executor = *it;
512     Job* job = executor->outstanding_job();
513     // The "has_user_callback()" is to exclude jobs for which the callback
514     // has already been invoked, or was not user-initiated (as in the case of
515     // lazy thread provisions). User-initiated jobs may !has_user_callback()
516     // when the callback has already been run. (Since we only clear the
517     // outstanding job AFTER the callback has been invoked, it is possible
518     // for a new request to be started from within the callback).
519     CHECK(!job || job->was_cancelled() || !job->has_user_callback());
520   }
521 }
522 
ReleaseAllExecutors()523 void MultiThreadedProxyResolver::ReleaseAllExecutors() {
524   DCHECK(CalledOnValidThread());
525   for (ExecutorList::iterator it = executors_.begin();
526        it != executors_.end(); ++it) {
527     Executor* executor = *it;
528     executor->Destroy();
529   }
530   executors_.clear();
531 }
532 
533 MultiThreadedProxyResolver::Executor*
FindIdleExecutor()534 MultiThreadedProxyResolver::FindIdleExecutor() {
535   DCHECK(CalledOnValidThread());
536   for (ExecutorList::iterator it = executors_.begin();
537        it != executors_.end(); ++it) {
538     Executor* executor = *it;
539     if (!executor->outstanding_job())
540       return executor;
541   }
542   return NULL;
543 }
544 
545 MultiThreadedProxyResolver::Executor*
AddNewExecutor()546 MultiThreadedProxyResolver::AddNewExecutor() {
547   DCHECK(CalledOnValidThread());
548   DCHECK_LT(executors_.size(), max_num_threads_);
549   // The "thread number" is used to give the thread a unique name.
550   int thread_number = executors_.size();
551   ProxyResolver* resolver = resolver_factory_->CreateProxyResolver();
552   Executor* executor = new Executor(
553       this, resolver, thread_number);
554   executors_.push_back(make_scoped_refptr(executor));
555   return executor;
556 }
557 
OnExecutorReady(Executor * executor)558 void MultiThreadedProxyResolver::OnExecutorReady(Executor* executor) {
559   DCHECK(CalledOnValidThread());
560   if (pending_jobs_.empty())
561     return;
562 
563   // Get the next job to process (FIFO). Transfer it from the pending queue
564   // to the executor.
565   scoped_refptr<Job> job = pending_jobs_.front();
566   pending_jobs_.pop_front();
567   executor->StartJob(job);
568 }
569 
570 }  // namespace net
571