1 // Copyright (c) 2010 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "net/proxy/multi_threaded_proxy_resolver.h"
6
7 #include "base/message_loop.h"
8 #include "base/string_util.h"
9 #include "base/stringprintf.h"
10 #include "base/threading/thread.h"
11 #include "base/threading/thread_restrictions.h"
12 #include "net/base/net_errors.h"
13 #include "net/base/net_log.h"
14 #include "net/proxy/proxy_info.h"
15
16 // TODO(eroman): Have the MultiThreadedProxyResolver clear its PAC script
17 // data when SetPacScript fails. That will reclaim memory when
18 // testing bogus scripts.
19
20 namespace net {
21
22 namespace {
23
24 class PurgeMemoryTask : public base::RefCountedThreadSafe<PurgeMemoryTask> {
25 public:
PurgeMemoryTask(ProxyResolver * resolver)26 explicit PurgeMemoryTask(ProxyResolver* resolver) : resolver_(resolver) {}
PurgeMemory()27 void PurgeMemory() { resolver_->PurgeMemory(); }
28 private:
29 friend class base::RefCountedThreadSafe<PurgeMemoryTask>;
~PurgeMemoryTask()30 ~PurgeMemoryTask() {}
31 ProxyResolver* resolver_;
32 };
33
34 } // namespace
35
36 // An "executor" is a job-runner for PAC requests. It encapsulates a worker
37 // thread and a synchronous ProxyResolver (which will be operated on said
38 // thread.)
39 class MultiThreadedProxyResolver::Executor
40 : public base::RefCountedThreadSafe<MultiThreadedProxyResolver::Executor > {
41 public:
42 // |coordinator| must remain valid throughout our lifetime. It is used to
43 // signal when the executor is ready to receive work by calling
44 // |coordinator->OnExecutorReady()|.
45 // The constructor takes ownership of |resolver|.
46 // |thread_number| is an identifier used when naming the worker thread.
47 Executor(MultiThreadedProxyResolver* coordinator,
48 ProxyResolver* resolver,
49 int thread_number);
50
51 // Submit a job to this executor.
52 void StartJob(Job* job);
53
54 // Callback for when a job has completed running on the executor's thread.
55 void OnJobCompleted(Job* job);
56
57 // Cleanup the executor. Cancels all outstanding work, and frees the thread
58 // and resolver.
59 void Destroy();
60
61 void PurgeMemory();
62
63 // Returns the outstanding job, or NULL.
outstanding_job() const64 Job* outstanding_job() const { return outstanding_job_.get(); }
65
resolver()66 ProxyResolver* resolver() { return resolver_.get(); }
67
thread_number() const68 int thread_number() const { return thread_number_; }
69
70 private:
71 friend class base::RefCountedThreadSafe<Executor>;
72 ~Executor();
73
74 MultiThreadedProxyResolver* coordinator_;
75 const int thread_number_;
76
77 // The currently active job for this executor (either a SetPacScript or
78 // GetProxyForURL task).
79 scoped_refptr<Job> outstanding_job_;
80
81 // The synchronous resolver implementation.
82 scoped_ptr<ProxyResolver> resolver_;
83
84 // The thread where |resolver_| is run on.
85 // Note that declaration ordering is important here. |thread_| needs to be
86 // destroyed *before* |resolver_|, in case |resolver_| is currently
87 // executing on |thread_|.
88 scoped_ptr<base::Thread> thread_;
89 };
90
91 // MultiThreadedProxyResolver::Job ---------------------------------------------
92
93 class MultiThreadedProxyResolver::Job
94 : public base::RefCountedThreadSafe<MultiThreadedProxyResolver::Job> {
95 public:
96 // Identifies the subclass of Job (only being used for debugging purposes).
97 enum Type {
98 TYPE_GET_PROXY_FOR_URL,
99 TYPE_SET_PAC_SCRIPT,
100 TYPE_SET_PAC_SCRIPT_INTERNAL,
101 };
102
Job(Type type,CompletionCallback * user_callback)103 Job(Type type, CompletionCallback* user_callback)
104 : type_(type),
105 user_callback_(user_callback),
106 executor_(NULL),
107 was_cancelled_(false) {
108 }
109
set_executor(Executor * executor)110 void set_executor(Executor* executor) {
111 executor_ = executor;
112 }
113
114 // The "executor" is the job runner that is scheduling this job. If
115 // this job has not been submitted to an executor yet, this will be
116 // NULL (and we know it hasn't started yet).
executor()117 Executor* executor() {
118 return executor_;
119 }
120
121 // Mark the job as having been cancelled.
Cancel()122 void Cancel() {
123 was_cancelled_ = true;
124 }
125
126 // Returns true if Cancel() has been called.
was_cancelled() const127 bool was_cancelled() const { return was_cancelled_; }
128
type() const129 Type type() const { return type_; }
130
131 // Returns true if this job still has a user callback. Some jobs
132 // do not have a user callback, because they were helper jobs
133 // scheduled internally (for example TYPE_SET_PAC_SCRIPT_INTERNAL).
134 //
135 // Otherwise jobs that correspond with user-initiated work will
136 // have a non-NULL callback up until the callback is run.
has_user_callback() const137 bool has_user_callback() const { return user_callback_ != NULL; }
138
139 // This method is called when the job is inserted into a wait queue
140 // because no executors were ready to accept it.
WaitingForThread()141 virtual void WaitingForThread() {}
142
143 // This method is called just before the job is posted to the work thread.
FinishedWaitingForThread()144 virtual void FinishedWaitingForThread() {}
145
146 // This method is called on the worker thread to do the job's work. On
147 // completion, implementors are expected to call OnJobCompleted() on
148 // |origin_loop|.
149 virtual void Run(MessageLoop* origin_loop) = 0;
150
151 protected:
OnJobCompleted()152 void OnJobCompleted() {
153 // |executor_| will be NULL if the executor has already been deleted.
154 if (executor_)
155 executor_->OnJobCompleted(this);
156 }
157
RunUserCallback(int result)158 void RunUserCallback(int result) {
159 DCHECK(has_user_callback());
160 CompletionCallback* callback = user_callback_;
161 // Null the callback so has_user_callback() will now return false.
162 user_callback_ = NULL;
163 callback->Run(result);
164 }
165
166 friend class base::RefCountedThreadSafe<MultiThreadedProxyResolver::Job>;
167
~Job()168 virtual ~Job() {}
169
170 private:
171 const Type type_;
172 CompletionCallback* user_callback_;
173 Executor* executor_;
174 bool was_cancelled_;
175 };
176
177 // MultiThreadedProxyResolver::SetPacScriptJob ---------------------------------
178
179 // Runs on the worker thread to call ProxyResolver::SetPacScript.
180 class MultiThreadedProxyResolver::SetPacScriptJob
181 : public MultiThreadedProxyResolver::Job {
182 public:
SetPacScriptJob(const scoped_refptr<ProxyResolverScriptData> & script_data,CompletionCallback * callback)183 SetPacScriptJob(const scoped_refptr<ProxyResolverScriptData>& script_data,
184 CompletionCallback* callback)
185 : Job(callback ? TYPE_SET_PAC_SCRIPT : TYPE_SET_PAC_SCRIPT_INTERNAL,
186 callback),
187 script_data_(script_data) {
188 }
189
190 // Runs on the worker thread.
Run(MessageLoop * origin_loop)191 virtual void Run(MessageLoop* origin_loop) {
192 ProxyResolver* resolver = executor()->resolver();
193 int rv = resolver->SetPacScript(script_data_, NULL);
194
195 DCHECK_NE(rv, ERR_IO_PENDING);
196 origin_loop->PostTask(
197 FROM_HERE,
198 NewRunnableMethod(this, &SetPacScriptJob::RequestComplete, rv));
199 }
200
201 private:
202 // Runs the completion callback on the origin thread.
RequestComplete(int result_code)203 void RequestComplete(int result_code) {
204 // The task may have been cancelled after it was started.
205 if (!was_cancelled() && has_user_callback()) {
206 RunUserCallback(result_code);
207 }
208 OnJobCompleted();
209 }
210
211 const scoped_refptr<ProxyResolverScriptData> script_data_;
212 };
213
214 // MultiThreadedProxyResolver::GetProxyForURLJob ------------------------------
215
216 class MultiThreadedProxyResolver::GetProxyForURLJob
217 : public MultiThreadedProxyResolver::Job {
218 public:
219 // |url| -- the URL of the query.
220 // |results| -- the structure to fill with proxy resolve results.
GetProxyForURLJob(const GURL & url,ProxyInfo * results,CompletionCallback * callback,const BoundNetLog & net_log)221 GetProxyForURLJob(const GURL& url,
222 ProxyInfo* results,
223 CompletionCallback* callback,
224 const BoundNetLog& net_log)
225 : Job(TYPE_GET_PROXY_FOR_URL, callback),
226 results_(results),
227 net_log_(net_log),
228 url_(url),
229 was_waiting_for_thread_(false) {
230 DCHECK(callback);
231 }
232
net_log()233 BoundNetLog* net_log() { return &net_log_; }
234
WaitingForThread()235 virtual void WaitingForThread() {
236 was_waiting_for_thread_ = true;
237 net_log_.BeginEvent(
238 NetLog::TYPE_WAITING_FOR_PROXY_RESOLVER_THREAD, NULL);
239 }
240
FinishedWaitingForThread()241 virtual void FinishedWaitingForThread() {
242 DCHECK(executor());
243
244 if (was_waiting_for_thread_) {
245 net_log_.EndEvent(
246 NetLog::TYPE_WAITING_FOR_PROXY_RESOLVER_THREAD, NULL);
247 }
248
249 net_log_.AddEvent(
250 NetLog::TYPE_SUBMITTED_TO_RESOLVER_THREAD,
251 make_scoped_refptr(new NetLogIntegerParameter(
252 "thread_number", executor()->thread_number())));
253 }
254
255 // Runs on the worker thread.
Run(MessageLoop * origin_loop)256 virtual void Run(MessageLoop* origin_loop) {
257 ProxyResolver* resolver = executor()->resolver();
258 int rv = resolver->GetProxyForURL(
259 url_, &results_buf_, NULL, NULL, net_log_);
260 DCHECK_NE(rv, ERR_IO_PENDING);
261
262 origin_loop->PostTask(
263 FROM_HERE,
264 NewRunnableMethod(this, &GetProxyForURLJob::QueryComplete, rv));
265 }
266
267 private:
268 // Runs the completion callback on the origin thread.
QueryComplete(int result_code)269 void QueryComplete(int result_code) {
270 // The Job may have been cancelled after it was started.
271 if (!was_cancelled()) {
272 if (result_code >= OK) { // Note: unit-tests use values > 0.
273 results_->Use(results_buf_);
274 }
275 RunUserCallback(result_code);
276 }
277 OnJobCompleted();
278 }
279
280 // Must only be used on the "origin" thread.
281 ProxyInfo* results_;
282
283 // Can be used on either "origin" or worker thread.
284 BoundNetLog net_log_;
285 const GURL url_;
286
287 // Usable from within DoQuery on the worker thread.
288 ProxyInfo results_buf_;
289
290 bool was_waiting_for_thread_;
291 };
292
293 // MultiThreadedProxyResolver::Executor ----------------------------------------
294
Executor(MultiThreadedProxyResolver * coordinator,ProxyResolver * resolver,int thread_number)295 MultiThreadedProxyResolver::Executor::Executor(
296 MultiThreadedProxyResolver* coordinator,
297 ProxyResolver* resolver,
298 int thread_number)
299 : coordinator_(coordinator),
300 thread_number_(thread_number),
301 resolver_(resolver) {
302 DCHECK(coordinator);
303 DCHECK(resolver);
304 // Start up the thread.
305 // Note that it is safe to pass a temporary C-String to Thread(), as it will
306 // make a copy.
307 std::string thread_name =
308 base::StringPrintf("PAC thread #%d", thread_number);
309 thread_.reset(new base::Thread(thread_name.c_str()));
310 CHECK(thread_->Start());
311 }
312
StartJob(Job * job)313 void MultiThreadedProxyResolver::Executor::StartJob(Job* job) {
314 DCHECK(!outstanding_job_);
315 outstanding_job_ = job;
316
317 // Run the job. Once it has completed (regardless of whether it was
318 // cancelled), it will invoke OnJobCompleted() on this thread.
319 job->set_executor(this);
320 job->FinishedWaitingForThread();
321 thread_->message_loop()->PostTask(
322 FROM_HERE,
323 NewRunnableMethod(job, &Job::Run, MessageLoop::current()));
324 }
325
OnJobCompleted(Job * job)326 void MultiThreadedProxyResolver::Executor::OnJobCompleted(Job* job) {
327 DCHECK_EQ(job, outstanding_job_.get());
328 outstanding_job_ = NULL;
329 coordinator_->OnExecutorReady(this);
330 }
331
Destroy()332 void MultiThreadedProxyResolver::Executor::Destroy() {
333 DCHECK(coordinator_);
334
335 // Give the resolver an opportunity to shutdown from THIS THREAD before
336 // joining on the resolver thread. This allows certain implementations
337 // to avoid deadlocks.
338 resolver_->Shutdown();
339
340 {
341 // See http://crbug.com/69710.
342 base::ThreadRestrictions::ScopedAllowIO allow_io;
343
344 // Join the worker thread.
345 thread_.reset();
346 }
347
348 // Cancel any outstanding job.
349 if (outstanding_job_) {
350 outstanding_job_->Cancel();
351 // Orphan the job (since this executor may be deleted soon).
352 outstanding_job_->set_executor(NULL);
353 }
354
355 // It is now safe to free the ProxyResolver, since all the tasks that
356 // were using it on the resolver thread have completed.
357 resolver_.reset();
358
359 // Null some stuff as a precaution.
360 coordinator_ = NULL;
361 outstanding_job_ = NULL;
362 }
363
PurgeMemory()364 void MultiThreadedProxyResolver::Executor::PurgeMemory() {
365 scoped_refptr<PurgeMemoryTask> helper(new PurgeMemoryTask(resolver_.get()));
366 thread_->message_loop()->PostTask(
367 FROM_HERE,
368 NewRunnableMethod(helper.get(), &PurgeMemoryTask::PurgeMemory));
369 }
370
~Executor()371 MultiThreadedProxyResolver::Executor::~Executor() {
372 // The important cleanup happens as part of Destroy(), which should always be
373 // called first.
374 DCHECK(!coordinator_) << "Destroy() was not called";
375 DCHECK(!thread_.get());
376 DCHECK(!resolver_.get());
377 DCHECK(!outstanding_job_);
378 }
379
380 // MultiThreadedProxyResolver --------------------------------------------------
381
MultiThreadedProxyResolver(ProxyResolverFactory * resolver_factory,size_t max_num_threads)382 MultiThreadedProxyResolver::MultiThreadedProxyResolver(
383 ProxyResolverFactory* resolver_factory,
384 size_t max_num_threads)
385 : ProxyResolver(resolver_factory->resolvers_expect_pac_bytes()),
386 resolver_factory_(resolver_factory),
387 max_num_threads_(max_num_threads) {
388 DCHECK_GE(max_num_threads, 1u);
389 }
390
~MultiThreadedProxyResolver()391 MultiThreadedProxyResolver::~MultiThreadedProxyResolver() {
392 // We will cancel all outstanding requests.
393 pending_jobs_.clear();
394 ReleaseAllExecutors();
395 }
396
GetProxyForURL(const GURL & url,ProxyInfo * results,CompletionCallback * callback,RequestHandle * request,const BoundNetLog & net_log)397 int MultiThreadedProxyResolver::GetProxyForURL(const GURL& url,
398 ProxyInfo* results,
399 CompletionCallback* callback,
400 RequestHandle* request,
401 const BoundNetLog& net_log) {
402 DCHECK(CalledOnValidThread());
403 DCHECK(callback);
404 DCHECK(current_script_data_.get())
405 << "Resolver is un-initialized. Must call SetPacScript() first!";
406
407 scoped_refptr<GetProxyForURLJob> job(
408 new GetProxyForURLJob(url, results, callback, net_log));
409
410 // Completion will be notified through |callback|, unless the caller cancels
411 // the request using |request|.
412 if (request)
413 *request = reinterpret_cast<RequestHandle>(job.get());
414
415 // If there is an executor that is ready to run this request, submit it!
416 Executor* executor = FindIdleExecutor();
417 if (executor) {
418 DCHECK_EQ(0u, pending_jobs_.size());
419 executor->StartJob(job);
420 return ERR_IO_PENDING;
421 }
422
423 // Otherwise queue this request. (We will schedule it to a thread once one
424 // becomes available).
425 job->WaitingForThread();
426 pending_jobs_.push_back(job);
427
428 // If we haven't already reached the thread limit, provision a new thread to
429 // drain the requests more quickly.
430 if (executors_.size() < max_num_threads_) {
431 executor = AddNewExecutor();
432 executor->StartJob(
433 new SetPacScriptJob(current_script_data_, NULL));
434 }
435
436 return ERR_IO_PENDING;
437 }
438
CancelRequest(RequestHandle req)439 void MultiThreadedProxyResolver::CancelRequest(RequestHandle req) {
440 DCHECK(CalledOnValidThread());
441 DCHECK(req);
442
443 Job* job = reinterpret_cast<Job*>(req);
444 DCHECK_EQ(Job::TYPE_GET_PROXY_FOR_URL, job->type());
445
446 if (job->executor()) {
447 // If the job was already submitted to the executor, just mark it
448 // as cancelled so the user callback isn't run on completion.
449 job->Cancel();
450 } else {
451 // Otherwise the job is just sitting in a queue.
452 PendingJobsQueue::iterator it =
453 std::find(pending_jobs_.begin(), pending_jobs_.end(), job);
454 DCHECK(it != pending_jobs_.end());
455 pending_jobs_.erase(it);
456 }
457 }
458
CancelSetPacScript()459 void MultiThreadedProxyResolver::CancelSetPacScript() {
460 DCHECK(CalledOnValidThread());
461 DCHECK_EQ(0u, pending_jobs_.size());
462 DCHECK_EQ(1u, executors_.size());
463 DCHECK_EQ(Job::TYPE_SET_PAC_SCRIPT,
464 executors_[0]->outstanding_job()->type());
465
466 // Defensively clear some data which shouldn't be getting used
467 // anymore.
468 current_script_data_ = NULL;
469
470 ReleaseAllExecutors();
471 }
472
PurgeMemory()473 void MultiThreadedProxyResolver::PurgeMemory() {
474 DCHECK(CalledOnValidThread());
475 for (ExecutorList::iterator it = executors_.begin();
476 it != executors_.end(); ++it) {
477 Executor* executor = *it;
478 executor->PurgeMemory();
479 }
480 }
481
SetPacScript(const scoped_refptr<ProxyResolverScriptData> & script_data,CompletionCallback * callback)482 int MultiThreadedProxyResolver::SetPacScript(
483 const scoped_refptr<ProxyResolverScriptData>& script_data,
484 CompletionCallback* callback) {
485 DCHECK(CalledOnValidThread());
486 DCHECK(callback);
487
488 // Save the script details, so we can provision new executors later.
489 current_script_data_ = script_data;
490
491 // The user should not have any outstanding requests when they call
492 // SetPacScript().
493 CheckNoOutstandingUserRequests();
494
495 // Destroy all of the current threads and their proxy resolvers.
496 ReleaseAllExecutors();
497
498 // Provision a new executor, and run the SetPacScript request. On completion
499 // notification will be sent through |callback|.
500 Executor* executor = AddNewExecutor();
501 executor->StartJob(new SetPacScriptJob(script_data, callback));
502 return ERR_IO_PENDING;
503 }
504
CheckNoOutstandingUserRequests() const505 void MultiThreadedProxyResolver::CheckNoOutstandingUserRequests() const {
506 DCHECK(CalledOnValidThread());
507 CHECK_EQ(0u, pending_jobs_.size());
508
509 for (ExecutorList::const_iterator it = executors_.begin();
510 it != executors_.end(); ++it) {
511 const Executor* executor = *it;
512 Job* job = executor->outstanding_job();
513 // The "has_user_callback()" is to exclude jobs for which the callback
514 // has already been invoked, or was not user-initiated (as in the case of
515 // lazy thread provisions). User-initiated jobs may !has_user_callback()
516 // when the callback has already been run. (Since we only clear the
517 // outstanding job AFTER the callback has been invoked, it is possible
518 // for a new request to be started from within the callback).
519 CHECK(!job || job->was_cancelled() || !job->has_user_callback());
520 }
521 }
522
ReleaseAllExecutors()523 void MultiThreadedProxyResolver::ReleaseAllExecutors() {
524 DCHECK(CalledOnValidThread());
525 for (ExecutorList::iterator it = executors_.begin();
526 it != executors_.end(); ++it) {
527 Executor* executor = *it;
528 executor->Destroy();
529 }
530 executors_.clear();
531 }
532
533 MultiThreadedProxyResolver::Executor*
FindIdleExecutor()534 MultiThreadedProxyResolver::FindIdleExecutor() {
535 DCHECK(CalledOnValidThread());
536 for (ExecutorList::iterator it = executors_.begin();
537 it != executors_.end(); ++it) {
538 Executor* executor = *it;
539 if (!executor->outstanding_job())
540 return executor;
541 }
542 return NULL;
543 }
544
545 MultiThreadedProxyResolver::Executor*
AddNewExecutor()546 MultiThreadedProxyResolver::AddNewExecutor() {
547 DCHECK(CalledOnValidThread());
548 DCHECK_LT(executors_.size(), max_num_threads_);
549 // The "thread number" is used to give the thread a unique name.
550 int thread_number = executors_.size();
551 ProxyResolver* resolver = resolver_factory_->CreateProxyResolver();
552 Executor* executor = new Executor(
553 this, resolver, thread_number);
554 executors_.push_back(make_scoped_refptr(executor));
555 return executor;
556 }
557
OnExecutorReady(Executor * executor)558 void MultiThreadedProxyResolver::OnExecutorReady(Executor* executor) {
559 DCHECK(CalledOnValidThread());
560 if (pending_jobs_.empty())
561 return;
562
563 // Get the next job to process (FIFO). Transfer it from the pending queue
564 // to the executor.
565 scoped_refptr<Job> job = pending_jobs_.front();
566 pending_jobs_.pop_front();
567 executor->StartJob(job);
568 }
569
570 } // namespace net
571