// Copyright (c) 2012 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
4
5 #include "net/proxy/multi_threaded_proxy_resolver.h"
6
7 #include "base/bind.h"
8 #include "base/bind_helpers.h"
9 #include "base/message_loop/message_loop_proxy.h"
10 #include "base/strings/string_util.h"
11 #include "base/strings/stringprintf.h"
12 #include "base/threading/thread.h"
13 #include "base/threading/thread_restrictions.h"
14 #include "net/base/net_errors.h"
15 #include "net/base/net_log.h"
16 #include "net/proxy/proxy_info.h"
17
// TODO(eroman): Have the MultiThreadedProxyResolver clear its PAC script
// data when SetPacScript fails. That would reclaim memory when testing
// bogus scripts.
21
22 namespace net {
23
24 // An "executor" is a job-runner for PAC requests. It encapsulates a worker
25 // thread and a synchronous ProxyResolver (which will be operated on said
26 // thread.)
27 class MultiThreadedProxyResolver::Executor
28 : public base::RefCountedThreadSafe<MultiThreadedProxyResolver::Executor > {
29 public:
30 // |coordinator| must remain valid throughout our lifetime. It is used to
31 // signal when the executor is ready to receive work by calling
32 // |coordinator->OnExecutorReady()|.
33 // The constructor takes ownership of |resolver|.
34 // |thread_number| is an identifier used when naming the worker thread.
35 Executor(MultiThreadedProxyResolver* coordinator,
36 ProxyResolver* resolver,
37 int thread_number);
38
39 // Submit a job to this executor.
40 void StartJob(Job* job);
41
42 // Callback for when a job has completed running on the executor's thread.
43 void OnJobCompleted(Job* job);
44
45 // Cleanup the executor. Cancels all outstanding work, and frees the thread
46 // and resolver.
47 void Destroy();
48
49 // Returns the outstanding job, or NULL.
outstanding_job() const50 Job* outstanding_job() const { return outstanding_job_.get(); }
51
resolver()52 ProxyResolver* resolver() { return resolver_.get(); }
53
thread_number() const54 int thread_number() const { return thread_number_; }
55
56 private:
57 friend class base::RefCountedThreadSafe<Executor>;
58 ~Executor();
59
60 MultiThreadedProxyResolver* coordinator_;
61 const int thread_number_;
62
63 // The currently active job for this executor (either a SetPacScript or
64 // GetProxyForURL task).
65 scoped_refptr<Job> outstanding_job_;
66
67 // The synchronous resolver implementation.
68 scoped_ptr<ProxyResolver> resolver_;
69
70 // The thread where |resolver_| is run on.
71 // Note that declaration ordering is important here. |thread_| needs to be
72 // destroyed *before* |resolver_|, in case |resolver_| is currently
73 // executing on |thread_|.
74 scoped_ptr<base::Thread> thread_;
75 };
76
77 // MultiThreadedProxyResolver::Job ---------------------------------------------
78
79 class MultiThreadedProxyResolver::Job
80 : public base::RefCountedThreadSafe<MultiThreadedProxyResolver::Job> {
81 public:
82 // Identifies the subclass of Job (only being used for debugging purposes).
83 enum Type {
84 TYPE_GET_PROXY_FOR_URL,
85 TYPE_SET_PAC_SCRIPT,
86 TYPE_SET_PAC_SCRIPT_INTERNAL,
87 };
88
Job(Type type,const CompletionCallback & callback)89 Job(Type type, const CompletionCallback& callback)
90 : type_(type),
91 callback_(callback),
92 executor_(NULL),
93 was_cancelled_(false) {
94 }
95
set_executor(Executor * executor)96 void set_executor(Executor* executor) {
97 executor_ = executor;
98 }
99
100 // The "executor" is the job runner that is scheduling this job. If
101 // this job has not been submitted to an executor yet, this will be
102 // NULL (and we know it hasn't started yet).
executor()103 Executor* executor() {
104 return executor_;
105 }
106
107 // Mark the job as having been cancelled.
Cancel()108 void Cancel() {
109 was_cancelled_ = true;
110 }
111
112 // Returns true if Cancel() has been called.
was_cancelled() const113 bool was_cancelled() const { return was_cancelled_; }
114
type() const115 Type type() const { return type_; }
116
117 // Returns true if this job still has a user callback. Some jobs
118 // do not have a user callback, because they were helper jobs
119 // scheduled internally (for example TYPE_SET_PAC_SCRIPT_INTERNAL).
120 //
121 // Otherwise jobs that correspond with user-initiated work will
122 // have a non-null callback up until the callback is run.
has_user_callback() const123 bool has_user_callback() const { return !callback_.is_null(); }
124
125 // This method is called when the job is inserted into a wait queue
126 // because no executors were ready to accept it.
WaitingForThread()127 virtual void WaitingForThread() {}
128
129 // This method is called just before the job is posted to the work thread.
FinishedWaitingForThread()130 virtual void FinishedWaitingForThread() {}
131
132 // This method is called on the worker thread to do the job's work. On
133 // completion, implementors are expected to call OnJobCompleted() on
134 // |origin_loop|.
135 virtual void Run(scoped_refptr<base::MessageLoopProxy> origin_loop) = 0;
136
137 protected:
OnJobCompleted()138 void OnJobCompleted() {
139 // |executor_| will be NULL if the executor has already been deleted.
140 if (executor_)
141 executor_->OnJobCompleted(this);
142 }
143
RunUserCallback(int result)144 void RunUserCallback(int result) {
145 DCHECK(has_user_callback());
146 CompletionCallback callback = callback_;
147 // Reset the callback so has_user_callback() will now return false.
148 callback_.Reset();
149 callback.Run(result);
150 }
151
152 friend class base::RefCountedThreadSafe<MultiThreadedProxyResolver::Job>;
153
~Job()154 virtual ~Job() {}
155
156 private:
157 const Type type_;
158 CompletionCallback callback_;
159 Executor* executor_;
160 bool was_cancelled_;
161 };
162
163 // MultiThreadedProxyResolver::SetPacScriptJob ---------------------------------
164
165 // Runs on the worker thread to call ProxyResolver::SetPacScript.
166 class MultiThreadedProxyResolver::SetPacScriptJob
167 : public MultiThreadedProxyResolver::Job {
168 public:
SetPacScriptJob(const scoped_refptr<ProxyResolverScriptData> & script_data,const CompletionCallback & callback)169 SetPacScriptJob(const scoped_refptr<ProxyResolverScriptData>& script_data,
170 const CompletionCallback& callback)
171 : Job(!callback.is_null() ? TYPE_SET_PAC_SCRIPT :
172 TYPE_SET_PAC_SCRIPT_INTERNAL,
173 callback),
174 script_data_(script_data) {
175 }
176
177 // Runs on the worker thread.
Run(scoped_refptr<base::MessageLoopProxy> origin_loop)178 virtual void Run(scoped_refptr<base::MessageLoopProxy> origin_loop) OVERRIDE {
179 ProxyResolver* resolver = executor()->resolver();
180 int rv = resolver->SetPacScript(script_data_, CompletionCallback());
181
182 DCHECK_NE(rv, ERR_IO_PENDING);
183 origin_loop->PostTask(
184 FROM_HERE,
185 base::Bind(&SetPacScriptJob::RequestComplete, this, rv));
186 }
187
188 protected:
~SetPacScriptJob()189 virtual ~SetPacScriptJob() {}
190
191 private:
192 // Runs the completion callback on the origin thread.
RequestComplete(int result_code)193 void RequestComplete(int result_code) {
194 // The task may have been cancelled after it was started.
195 if (!was_cancelled() && has_user_callback()) {
196 RunUserCallback(result_code);
197 }
198 OnJobCompleted();
199 }
200
201 const scoped_refptr<ProxyResolverScriptData> script_data_;
202 };
203
204 // MultiThreadedProxyResolver::GetProxyForURLJob ------------------------------
205
206 class MultiThreadedProxyResolver::GetProxyForURLJob
207 : public MultiThreadedProxyResolver::Job {
208 public:
209 // |url| -- the URL of the query.
210 // |results| -- the structure to fill with proxy resolve results.
GetProxyForURLJob(const GURL & url,ProxyInfo * results,const CompletionCallback & callback,const BoundNetLog & net_log)211 GetProxyForURLJob(const GURL& url,
212 ProxyInfo* results,
213 const CompletionCallback& callback,
214 const BoundNetLog& net_log)
215 : Job(TYPE_GET_PROXY_FOR_URL, callback),
216 results_(results),
217 net_log_(net_log),
218 url_(url),
219 was_waiting_for_thread_(false) {
220 DCHECK(!callback.is_null());
221 }
222
net_log()223 BoundNetLog* net_log() { return &net_log_; }
224
WaitingForThread()225 virtual void WaitingForThread() OVERRIDE {
226 was_waiting_for_thread_ = true;
227 net_log_.BeginEvent(NetLog::TYPE_WAITING_FOR_PROXY_RESOLVER_THREAD);
228 }
229
FinishedWaitingForThread()230 virtual void FinishedWaitingForThread() OVERRIDE {
231 DCHECK(executor());
232
233 if (was_waiting_for_thread_) {
234 net_log_.EndEvent(NetLog::TYPE_WAITING_FOR_PROXY_RESOLVER_THREAD);
235 }
236
237 net_log_.AddEvent(
238 NetLog::TYPE_SUBMITTED_TO_RESOLVER_THREAD,
239 NetLog::IntegerCallback("thread_number", executor()->thread_number()));
240 }
241
242 // Runs on the worker thread.
Run(scoped_refptr<base::MessageLoopProxy> origin_loop)243 virtual void Run(scoped_refptr<base::MessageLoopProxy> origin_loop) OVERRIDE {
244 ProxyResolver* resolver = executor()->resolver();
245 int rv = resolver->GetProxyForURL(
246 url_, &results_buf_, CompletionCallback(), NULL, net_log_);
247 DCHECK_NE(rv, ERR_IO_PENDING);
248
249 origin_loop->PostTask(
250 FROM_HERE,
251 base::Bind(&GetProxyForURLJob::QueryComplete, this, rv));
252 }
253
254 protected:
~GetProxyForURLJob()255 virtual ~GetProxyForURLJob() {}
256
257 private:
258 // Runs the completion callback on the origin thread.
QueryComplete(int result_code)259 void QueryComplete(int result_code) {
260 // The Job may have been cancelled after it was started.
261 if (!was_cancelled()) {
262 if (result_code >= OK) { // Note: unit-tests use values > 0.
263 results_->Use(results_buf_);
264 }
265 RunUserCallback(result_code);
266 }
267 OnJobCompleted();
268 }
269
270 // Must only be used on the "origin" thread.
271 ProxyInfo* results_;
272
273 // Can be used on either "origin" or worker thread.
274 BoundNetLog net_log_;
275 const GURL url_;
276
277 // Usable from within DoQuery on the worker thread.
278 ProxyInfo results_buf_;
279
280 bool was_waiting_for_thread_;
281 };
282
283 // MultiThreadedProxyResolver::Executor ----------------------------------------
284
Executor(MultiThreadedProxyResolver * coordinator,ProxyResolver * resolver,int thread_number)285 MultiThreadedProxyResolver::Executor::Executor(
286 MultiThreadedProxyResolver* coordinator,
287 ProxyResolver* resolver,
288 int thread_number)
289 : coordinator_(coordinator),
290 thread_number_(thread_number),
291 resolver_(resolver) {
292 DCHECK(coordinator);
293 DCHECK(resolver);
294 // Start up the thread.
295 thread_.reset(new base::Thread(base::StringPrintf("PAC thread #%d",
296 thread_number)));
297 CHECK(thread_->Start());
298 }
299
StartJob(Job * job)300 void MultiThreadedProxyResolver::Executor::StartJob(Job* job) {
301 DCHECK(!outstanding_job_.get());
302 outstanding_job_ = job;
303
304 // Run the job. Once it has completed (regardless of whether it was
305 // cancelled), it will invoke OnJobCompleted() on this thread.
306 job->set_executor(this);
307 job->FinishedWaitingForThread();
308 thread_->message_loop()->PostTask(
309 FROM_HERE,
310 base::Bind(&Job::Run, job, base::MessageLoopProxy::current()));
311 }
312
OnJobCompleted(Job * job)313 void MultiThreadedProxyResolver::Executor::OnJobCompleted(Job* job) {
314 DCHECK_EQ(job, outstanding_job_.get());
315 outstanding_job_ = NULL;
316 coordinator_->OnExecutorReady(this);
317 }
318
Destroy()319 void MultiThreadedProxyResolver::Executor::Destroy() {
320 DCHECK(coordinator_);
321
322 {
323 // See http://crbug.com/69710.
324 base::ThreadRestrictions::ScopedAllowIO allow_io;
325
326 // Join the worker thread.
327 thread_.reset();
328 }
329
330 // Cancel any outstanding job.
331 if (outstanding_job_.get()) {
332 outstanding_job_->Cancel();
333 // Orphan the job (since this executor may be deleted soon).
334 outstanding_job_->set_executor(NULL);
335 }
336
337 // It is now safe to free the ProxyResolver, since all the tasks that
338 // were using it on the resolver thread have completed.
339 resolver_.reset();
340
341 // Null some stuff as a precaution.
342 coordinator_ = NULL;
343 outstanding_job_ = NULL;
344 }
345
~Executor()346 MultiThreadedProxyResolver::Executor::~Executor() {
347 // The important cleanup happens as part of Destroy(), which should always be
348 // called first.
349 DCHECK(!coordinator_) << "Destroy() was not called";
350 DCHECK(!thread_.get());
351 DCHECK(!resolver_.get());
352 DCHECK(!outstanding_job_.get());
353 }
354
355 // MultiThreadedProxyResolver --------------------------------------------------
356
MultiThreadedProxyResolver(ProxyResolverFactory * resolver_factory,size_t max_num_threads)357 MultiThreadedProxyResolver::MultiThreadedProxyResolver(
358 ProxyResolverFactory* resolver_factory,
359 size_t max_num_threads)
360 : ProxyResolver(resolver_factory->resolvers_expect_pac_bytes()),
361 resolver_factory_(resolver_factory),
362 max_num_threads_(max_num_threads) {
363 DCHECK_GE(max_num_threads, 1u);
364 }
365
~MultiThreadedProxyResolver()366 MultiThreadedProxyResolver::~MultiThreadedProxyResolver() {
367 // We will cancel all outstanding requests.
368 pending_jobs_.clear();
369 ReleaseAllExecutors();
370 }
371
GetProxyForURL(const GURL & url,ProxyInfo * results,const CompletionCallback & callback,RequestHandle * request,const BoundNetLog & net_log)372 int MultiThreadedProxyResolver::GetProxyForURL(
373 const GURL& url, ProxyInfo* results, const CompletionCallback& callback,
374 RequestHandle* request, const BoundNetLog& net_log) {
375 DCHECK(CalledOnValidThread());
376 DCHECK(!callback.is_null());
377 DCHECK(current_script_data_.get())
378 << "Resolver is un-initialized. Must call SetPacScript() first!";
379
380 scoped_refptr<GetProxyForURLJob> job(
381 new GetProxyForURLJob(url, results, callback, net_log));
382
383 // Completion will be notified through |callback|, unless the caller cancels
384 // the request using |request|.
385 if (request)
386 *request = reinterpret_cast<RequestHandle>(job.get());
387
388 // If there is an executor that is ready to run this request, submit it!
389 Executor* executor = FindIdleExecutor();
390 if (executor) {
391 DCHECK_EQ(0u, pending_jobs_.size());
392 executor->StartJob(job.get());
393 return ERR_IO_PENDING;
394 }
395
396 // Otherwise queue this request. (We will schedule it to a thread once one
397 // becomes available).
398 job->WaitingForThread();
399 pending_jobs_.push_back(job);
400
401 // If we haven't already reached the thread limit, provision a new thread to
402 // drain the requests more quickly.
403 if (executors_.size() < max_num_threads_) {
404 executor = AddNewExecutor();
405 executor->StartJob(
406 new SetPacScriptJob(current_script_data_, CompletionCallback()));
407 }
408
409 return ERR_IO_PENDING;
410 }
411
CancelRequest(RequestHandle req)412 void MultiThreadedProxyResolver::CancelRequest(RequestHandle req) {
413 DCHECK(CalledOnValidThread());
414 DCHECK(req);
415
416 Job* job = reinterpret_cast<Job*>(req);
417 DCHECK_EQ(Job::TYPE_GET_PROXY_FOR_URL, job->type());
418
419 if (job->executor()) {
420 // If the job was already submitted to the executor, just mark it
421 // as cancelled so the user callback isn't run on completion.
422 job->Cancel();
423 } else {
424 // Otherwise the job is just sitting in a queue.
425 PendingJobsQueue::iterator it =
426 std::find(pending_jobs_.begin(), pending_jobs_.end(), job);
427 DCHECK(it != pending_jobs_.end());
428 pending_jobs_.erase(it);
429 }
430 }
431
GetLoadState(RequestHandle req) const432 LoadState MultiThreadedProxyResolver::GetLoadState(RequestHandle req) const {
433 DCHECK(CalledOnValidThread());
434 DCHECK(req);
435 return LOAD_STATE_RESOLVING_PROXY_FOR_URL;
436 }
437
CancelSetPacScript()438 void MultiThreadedProxyResolver::CancelSetPacScript() {
439 DCHECK(CalledOnValidThread());
440 DCHECK_EQ(0u, pending_jobs_.size());
441 DCHECK_EQ(1u, executors_.size());
442 DCHECK_EQ(Job::TYPE_SET_PAC_SCRIPT,
443 executors_[0]->outstanding_job()->type());
444
445 // Defensively clear some data which shouldn't be getting used
446 // anymore.
447 current_script_data_ = NULL;
448
449 ReleaseAllExecutors();
450 }
451
SetPacScript(const scoped_refptr<ProxyResolverScriptData> & script_data,const CompletionCallback & callback)452 int MultiThreadedProxyResolver::SetPacScript(
453 const scoped_refptr<ProxyResolverScriptData>& script_data,
454 const CompletionCallback&callback) {
455 DCHECK(CalledOnValidThread());
456 DCHECK(!callback.is_null());
457
458 // Save the script details, so we can provision new executors later.
459 current_script_data_ = script_data;
460
461 // The user should not have any outstanding requests when they call
462 // SetPacScript().
463 CheckNoOutstandingUserRequests();
464
465 // Destroy all of the current threads and their proxy resolvers.
466 ReleaseAllExecutors();
467
468 // Provision a new executor, and run the SetPacScript request. On completion
469 // notification will be sent through |callback|.
470 Executor* executor = AddNewExecutor();
471 executor->StartJob(new SetPacScriptJob(script_data, callback));
472 return ERR_IO_PENDING;
473 }
474
CheckNoOutstandingUserRequests() const475 void MultiThreadedProxyResolver::CheckNoOutstandingUserRequests() const {
476 DCHECK(CalledOnValidThread());
477 CHECK_EQ(0u, pending_jobs_.size());
478
479 for (ExecutorList::const_iterator it = executors_.begin();
480 it != executors_.end(); ++it) {
481 const Executor* executor = it->get();
482 Job* job = executor->outstanding_job();
483 // The "has_user_callback()" is to exclude jobs for which the callback
484 // has already been invoked, or was not user-initiated (as in the case of
485 // lazy thread provisions). User-initiated jobs may !has_user_callback()
486 // when the callback has already been run. (Since we only clear the
487 // outstanding job AFTER the callback has been invoked, it is possible
488 // for a new request to be started from within the callback).
489 CHECK(!job || job->was_cancelled() || !job->has_user_callback());
490 }
491 }
492
ReleaseAllExecutors()493 void MultiThreadedProxyResolver::ReleaseAllExecutors() {
494 DCHECK(CalledOnValidThread());
495 for (ExecutorList::iterator it = executors_.begin();
496 it != executors_.end(); ++it) {
497 Executor* executor = it->get();
498 executor->Destroy();
499 }
500 executors_.clear();
501 }
502
503 MultiThreadedProxyResolver::Executor*
FindIdleExecutor()504 MultiThreadedProxyResolver::FindIdleExecutor() {
505 DCHECK(CalledOnValidThread());
506 for (ExecutorList::iterator it = executors_.begin();
507 it != executors_.end(); ++it) {
508 Executor* executor = it->get();
509 if (!executor->outstanding_job())
510 return executor;
511 }
512 return NULL;
513 }
514
515 MultiThreadedProxyResolver::Executor*
AddNewExecutor()516 MultiThreadedProxyResolver::AddNewExecutor() {
517 DCHECK(CalledOnValidThread());
518 DCHECK_LT(executors_.size(), max_num_threads_);
519 // The "thread number" is used to give the thread a unique name.
520 int thread_number = executors_.size();
521 ProxyResolver* resolver = resolver_factory_->CreateProxyResolver();
522 Executor* executor = new Executor(
523 this, resolver, thread_number);
524 executors_.push_back(make_scoped_refptr(executor));
525 return executor;
526 }
527
OnExecutorReady(Executor * executor)528 void MultiThreadedProxyResolver::OnExecutorReady(Executor* executor) {
529 DCHECK(CalledOnValidThread());
530 if (pending_jobs_.empty())
531 return;
532
533 // Get the next job to process (FIFO). Transfer it from the pending queue
534 // to the executor.
535 scoped_refptr<Job> job = pending_jobs_.front();
536 pending_jobs_.pop_front();
537 executor->StartJob(job.get());
538 }
539
540 } // namespace net
541