1 // Copyright 2012 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "net/proxy_resolution/multi_threaded_proxy_resolver.h"
6
7 #include <memory>
8 #include <utility>
9 #include <vector>
10
11 #include "base/containers/circular_deque.h"
12 #include "base/functional/bind.h"
13 #include "base/functional/callback_helpers.h"
14 #include "base/location.h"
15 #include "base/memory/raw_ptr.h"
16 #include "base/strings/string_util.h"
17 #include "base/strings/stringprintf.h"
18 #include "base/task/single_thread_task_runner.h"
19 #include "base/threading/thread.h"
20 #include "base/threading/thread_checker.h"
21 #include "base/threading/thread_restrictions.h"
22 #include "net/base/net_errors.h"
23 #include "net/base/network_anonymization_key.h"
24 #include "net/log/net_log.h"
25 #include "net/log/net_log_event_type.h"
26 #include "net/log/net_log_with_source.h"
27 #include "net/proxy_resolution/proxy_info.h"
28 #include "net/proxy_resolution/proxy_resolver.h"
29
30 namespace net {
31
32 class NetworkAnonymizationKey;
33
34 // http://crbug.com/69710
35 class MultiThreadedProxyResolverScopedAllowJoinOnIO
36 : public base::ScopedAllowBaseSyncPrimitivesOutsideBlockingScope {};
37
38 namespace {
39 class Job;
40
41 // An "executor" is a job-runner for PAC requests. It encapsulates a worker
42 // thread and a synchronous ProxyResolver (which will be operated on said
43 // thread.)
44 class Executor : public base::RefCountedThreadSafe<Executor> {
45 public:
46 class Coordinator {
47 public:
48 virtual void OnExecutorReady(Executor* executor) = 0;
49
50 protected:
51 virtual ~Coordinator() = default;
52 };
53
54 // |coordinator| must remain valid throughout our lifetime. It is used to
55 // signal when the executor is ready to receive work by calling
56 // |coordinator->OnExecutorReady()|.
57 // |thread_number| is an identifier used when naming the worker thread.
58 Executor(Coordinator* coordinator, int thread_number);
59
60 // Submit a job to this executor.
61 void StartJob(scoped_refptr<Job> job);
62
63 // Callback for when a job has completed running on the executor's thread.
64 void OnJobCompleted(Job* job);
65
66 // Cleanup the executor. Cancels all outstanding work, and frees the thread
67 // and resolver.
68 void Destroy();
69
70 // Returns the outstanding job, or NULL.
outstanding_job() const71 Job* outstanding_job() const { return outstanding_job_.get(); }
72
resolver()73 ProxyResolver* resolver() { return resolver_.get(); }
74
thread_number() const75 int thread_number() const { return thread_number_; }
76
set_resolver(std::unique_ptr<ProxyResolver> resolver)77 void set_resolver(std::unique_ptr<ProxyResolver> resolver) {
78 resolver_ = std::move(resolver);
79 }
80
set_coordinator(Coordinator * coordinator)81 void set_coordinator(Coordinator* coordinator) {
82 DCHECK(coordinator);
83 DCHECK(coordinator_);
84 coordinator_ = coordinator;
85 }
86
87 private:
88 friend class base::RefCountedThreadSafe<Executor>;
89 ~Executor();
90
91 raw_ptr<Coordinator> coordinator_;
92 const int thread_number_;
93
94 // The currently active job for this executor (either a CreateProxyResolver or
95 // GetProxyForURL task).
96 scoped_refptr<Job> outstanding_job_;
97
98 // The synchronous resolver implementation.
99 std::unique_ptr<ProxyResolver> resolver_;
100
101 // The thread where |resolver_| is run on.
102 // Note that declaration ordering is important here. |thread_| needs to be
103 // destroyed *before* |resolver_|, in case |resolver_| is currently
104 // executing on |thread_|.
105 std::unique_ptr<base::Thread> thread_;
106 };
107
108 class MultiThreadedProxyResolver : public ProxyResolver,
109 public Executor::Coordinator {
110 public:
111 // Creates an asynchronous ProxyResolver that runs requests on up to
112 // |max_num_threads|.
113 //
114 // For each thread that is created, an accompanying synchronous ProxyResolver
115 // will be provisioned using |resolver_factory|. All methods on these
116 // ProxyResolvers will be called on the one thread.
117 MultiThreadedProxyResolver(
118 std::unique_ptr<ProxyResolverFactory> resolver_factory,
119 size_t max_num_threads,
120 const scoped_refptr<PacFileData>& script_data,
121 scoped_refptr<Executor> executor);
122
123 ~MultiThreadedProxyResolver() override;
124
125 // ProxyResolver implementation:
126 int GetProxyForURL(const GURL& url,
127 const NetworkAnonymizationKey& network_anonymization_key,
128 ProxyInfo* results,
129 CompletionOnceCallback callback,
130 std::unique_ptr<Request>* request,
131 const NetLogWithSource& net_log) override;
132
133 private:
134 class GetProxyForURLJob;
135 class RequestImpl;
136 // FIFO queue of pending jobs waiting to be started.
137 // TODO(eroman): Make this priority queue.
138 using PendingJobsQueue = base::circular_deque<scoped_refptr<Job>>;
139 using ExecutorList = std::vector<scoped_refptr<Executor>>;
140
141 // Returns an idle worker thread which is ready to receive GetProxyForURL()
142 // requests. If all threads are occupied, returns NULL.
143 Executor* FindIdleExecutor();
144
145 // Creates a new worker thread, and appends it to |executors_|.
146 void AddNewExecutor();
147
148 // Starts the next job from |pending_jobs_| if possible.
149 void OnExecutorReady(Executor* executor) override;
150
151 const std::unique_ptr<ProxyResolverFactory> resolver_factory_;
152 const size_t max_num_threads_;
153 PendingJobsQueue pending_jobs_;
154 ExecutorList executors_;
155 scoped_refptr<PacFileData> script_data_;
156
157 THREAD_CHECKER(thread_checker_);
158 };
159
160 // Job ---------------------------------------------
161
162 class Job : public base::RefCountedThreadSafe<Job> {
163 public:
164 Job() = default;
165
set_executor(Executor * executor)166 void set_executor(Executor* executor) {
167 executor_ = executor;
168 }
169
170 // The "executor" is the job runner that is scheduling this job. If
171 // this job has not been submitted to an executor yet, this will be
172 // NULL (and we know it hasn't started yet).
executor()173 Executor* executor() {
174 return executor_;
175 }
176
177 // Mark the job as having been cancelled.
Cancel()178 virtual void Cancel() { was_cancelled_ = true; }
179
180 // Returns true if Cancel() has been called.
was_cancelled() const181 bool was_cancelled() const { return was_cancelled_; }
182
183 // This method is called when the job is inserted into a wait queue
184 // because no executors were ready to accept it.
WaitingForThread()185 virtual void WaitingForThread() {}
186
187 // This method is called just before the job is posted to the work thread.
FinishedWaitingForThread()188 virtual void FinishedWaitingForThread() {}
189
190 // This method is called on the worker thread to do the job's work. On
191 // completion, implementors are expected to call OnJobCompleted() on
192 // |origin_runner|.
193 virtual void Run(
194 scoped_refptr<base::SingleThreadTaskRunner> origin_runner) = 0;
195
196 protected:
OnJobCompleted()197 void OnJobCompleted() {
198 // |executor_| will be NULL if the executor has already been deleted.
199 if (executor_)
200 executor_->OnJobCompleted(this);
201 }
202
203 friend class base::RefCountedThreadSafe<Job>;
204
205 virtual ~Job() = default;
206
207 private:
208 raw_ptr<Executor> executor_ = nullptr;
209 bool was_cancelled_ = false;
210 };
211
212 class MultiThreadedProxyResolver::RequestImpl : public ProxyResolver::Request {
213 public:
RequestImpl(scoped_refptr<Job> job)214 explicit RequestImpl(scoped_refptr<Job> job) : job_(std::move(job)) {}
215
~RequestImpl()216 ~RequestImpl() override { job_->Cancel(); }
217
GetLoadState()218 LoadState GetLoadState() override {
219 return LOAD_STATE_RESOLVING_PROXY_FOR_URL;
220 }
221
222 private:
223 scoped_refptr<Job> job_;
224 };
225
226 // CreateResolverJob -----------------------------------------------------------
227
228 // Runs on the worker thread to call ProxyResolverFactory::CreateProxyResolver.
229 class CreateResolverJob : public Job {
230 public:
CreateResolverJob(const scoped_refptr<PacFileData> & script_data,ProxyResolverFactory * factory)231 CreateResolverJob(const scoped_refptr<PacFileData>& script_data,
232 ProxyResolverFactory* factory)
233 : script_data_(script_data), factory_(factory) {}
234
235 // Runs on the worker thread.
Run(scoped_refptr<base::SingleThreadTaskRunner> origin_runner)236 void Run(scoped_refptr<base::SingleThreadTaskRunner> origin_runner) override {
237 std::unique_ptr<ProxyResolverFactory::Request> request;
238 int rv = factory_->CreateProxyResolver(script_data_, &resolver_,
239 CompletionOnceCallback(), &request);
240
241 DCHECK_NE(rv, ERR_IO_PENDING);
242 origin_runner->PostTask(
243 FROM_HERE,
244 base::BindOnce(&CreateResolverJob::RequestComplete, this, rv));
245 }
246
247 protected:
248 ~CreateResolverJob() override = default;
249
Cancel()250 void Cancel() override {
251 // Needed to prevent warnings danging warnings about `factory_`. The
252 // executor ensures that the thread has joined, but there may still be a
253 // pending RequestComplete() that still owns a reference to `this` after the
254 // factory and executor have been destroyed.
255 factory_ = nullptr;
256 Job::Cancel();
257 }
258
259 private:
260 // Runs the completion callback on the origin thread.
RequestComplete(int result_code)261 void RequestComplete(int result_code) {
262 // The task may have been cancelled after it was started.
263 if (!was_cancelled()) {
264 DCHECK(executor());
265 executor()->set_resolver(std::move(resolver_));
266 }
267 OnJobCompleted();
268 }
269
270 const scoped_refptr<PacFileData> script_data_;
271 raw_ptr<ProxyResolverFactory> factory_;
272 std::unique_ptr<ProxyResolver> resolver_;
273 };
274
275 // MultiThreadedProxyResolver::GetProxyForURLJob ------------------------------
276
277 class MultiThreadedProxyResolver::GetProxyForURLJob : public Job {
278 public:
279 // |url| -- the URL of the query.
280 // |results| -- the structure to fill with proxy resolve results.
GetProxyForURLJob(const GURL & url,const NetworkAnonymizationKey & network_anonymization_key,ProxyInfo * results,CompletionOnceCallback callback,const NetLogWithSource & net_log)281 GetProxyForURLJob(const GURL& url,
282 const NetworkAnonymizationKey& network_anonymization_key,
283 ProxyInfo* results,
284 CompletionOnceCallback callback,
285 const NetLogWithSource& net_log)
286 : callback_(std::move(callback)),
287 results_(results),
288 net_log_(net_log),
289 url_(url),
290 network_anonymization_key_(network_anonymization_key) {
291 DCHECK(callback_);
292 }
293
net_log()294 NetLogWithSource* net_log() { return &net_log_; }
295
WaitingForThread()296 void WaitingForThread() override {
297 was_waiting_for_thread_ = true;
298 net_log_.BeginEvent(NetLogEventType::WAITING_FOR_PROXY_RESOLVER_THREAD);
299 }
300
FinishedWaitingForThread()301 void FinishedWaitingForThread() override {
302 DCHECK(executor());
303
304 if (was_waiting_for_thread_) {
305 net_log_.EndEvent(NetLogEventType::WAITING_FOR_PROXY_RESOLVER_THREAD);
306 }
307
308 net_log_.AddEventWithIntParams(
309 NetLogEventType::SUBMITTED_TO_RESOLVER_THREAD, "thread_number",
310 executor()->thread_number());
311 }
312
313 // Runs on the worker thread.
Run(scoped_refptr<base::SingleThreadTaskRunner> origin_runner)314 void Run(scoped_refptr<base::SingleThreadTaskRunner> origin_runner) override {
315 ProxyResolver* resolver = executor()->resolver();
316 DCHECK(resolver);
317 int rv = resolver->GetProxyForURL(url_, network_anonymization_key_,
318 &results_buf_, CompletionOnceCallback(),
319 nullptr, net_log_);
320 DCHECK_NE(rv, ERR_IO_PENDING);
321
322 origin_runner->PostTask(
323 FROM_HERE, base::BindOnce(&GetProxyForURLJob::QueryComplete, this, rv));
324 }
325
Cancel()326 void Cancel() override {
327 // Needed to prevent warnings danging warnings about `results_`. The
328 // executor ensures that the thread has joined, but there may still be a
329 // pending QueryComplete() that still owns a reference to `this` after the
330 // factory and executor have been destroyed.
331 results_ = nullptr;
332 Job::Cancel();
333 }
334
335 protected:
336 ~GetProxyForURLJob() override = default;
337
338 private:
339 // Runs the completion callback on the origin thread.
QueryComplete(int result_code)340 void QueryComplete(int result_code) {
341 // The Job may have been cancelled after it was started.
342 if (!was_cancelled()) {
343 if (result_code >= OK) { // Note: unit-tests use values > 0.
344 results_->Use(results_buf_);
345 }
346 std::move(callback_).Run(result_code);
347 }
348 OnJobCompleted();
349 }
350
351 CompletionOnceCallback callback_;
352
353 // Must only be used on the "origin" thread.
354 raw_ptr<ProxyInfo> results_;
355
356 // Can be used on either "origin" or worker thread.
357 NetLogWithSource net_log_;
358
359 const GURL url_;
360 const NetworkAnonymizationKey network_anonymization_key_;
361
362 // Usable from within DoQuery on the worker thread.
363 ProxyInfo results_buf_;
364
365 bool was_waiting_for_thread_ = false;
366 };
367
368 // Executor ----------------------------------------
369
Executor(Executor::Coordinator * coordinator,int thread_number)370 Executor::Executor(Executor::Coordinator* coordinator, int thread_number)
371 : coordinator_(coordinator), thread_number_(thread_number) {
372 DCHECK(coordinator);
373 // Start up the thread.
374 thread_ = std::make_unique<base::Thread>(
375 base::StringPrintf("PAC thread #%d", thread_number));
376 CHECK(thread_->Start());
377 }
378
StartJob(scoped_refptr<Job> job)379 void Executor::StartJob(scoped_refptr<Job> job) {
380 DCHECK(!outstanding_job_.get());
381 outstanding_job_ = job;
382
383 // Run the job. Once it has completed (regardless of whether it was
384 // cancelled), it will invoke OnJobCompleted() on this thread.
385 job->set_executor(this);
386 job->FinishedWaitingForThread();
387 thread_->task_runner()->PostTask(
388 FROM_HERE,
389 base::BindOnce(&Job::Run, job,
390 base::SingleThreadTaskRunner::GetCurrentDefault()));
391 }
392
OnJobCompleted(Job * job)393 void Executor::OnJobCompleted(Job* job) {
394 DCHECK_EQ(job, outstanding_job_.get());
395 outstanding_job_ = nullptr;
396 coordinator_->OnExecutorReady(this);
397 }
398
Destroy()399 void Executor::Destroy() {
400 DCHECK(coordinator_);
401
402 {
403 // TODO(http://crbug.com/69710): Use ThreadPool instead of creating a
404 // base::Thread.
405 MultiThreadedProxyResolverScopedAllowJoinOnIO allow_thread_join;
406
407 // Join the worker thread.
408 thread_.reset();
409 }
410
411 // Cancel any outstanding job.
412 if (outstanding_job_.get()) {
413 outstanding_job_->Cancel();
414 // Orphan the job (since this executor may be deleted soon).
415 outstanding_job_->set_executor(nullptr);
416 }
417
418 // It is now safe to free the ProxyResolver, since all the tasks that
419 // were using it on the resolver thread have completed.
420 resolver_.reset();
421
422 // Null some stuff as a precaution.
423 coordinator_ = nullptr;
424 outstanding_job_ = nullptr;
425 }
426
~Executor()427 Executor::~Executor() {
428 // The important cleanup happens as part of Destroy(), which should always be
429 // called first.
430 DCHECK(!coordinator_) << "Destroy() was not called";
431 DCHECK(!thread_.get());
432 DCHECK(!resolver_.get());
433 DCHECK(!outstanding_job_.get());
434 }
435
436 // MultiThreadedProxyResolver --------------------------------------------------
437
MultiThreadedProxyResolver(std::unique_ptr<ProxyResolverFactory> resolver_factory,size_t max_num_threads,const scoped_refptr<PacFileData> & script_data,scoped_refptr<Executor> executor)438 MultiThreadedProxyResolver::MultiThreadedProxyResolver(
439 std::unique_ptr<ProxyResolverFactory> resolver_factory,
440 size_t max_num_threads,
441 const scoped_refptr<PacFileData>& script_data,
442 scoped_refptr<Executor> executor)
443 : resolver_factory_(std::move(resolver_factory)),
444 max_num_threads_(max_num_threads),
445 script_data_(script_data) {
446 DCHECK(script_data_);
447 executor->set_coordinator(this);
448 executors_.push_back(executor);
449 }
450
~MultiThreadedProxyResolver()451 MultiThreadedProxyResolver::~MultiThreadedProxyResolver() {
452 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
453 // We will cancel all outstanding requests.
454 pending_jobs_.clear();
455
456 for (auto& executor : executors_) {
457 executor->Destroy();
458 }
459 }
460
GetProxyForURL(const GURL & url,const NetworkAnonymizationKey & network_anonymization_key,ProxyInfo * results,CompletionOnceCallback callback,std::unique_ptr<Request> * request,const NetLogWithSource & net_log)461 int MultiThreadedProxyResolver::GetProxyForURL(
462 const GURL& url,
463 const NetworkAnonymizationKey& network_anonymization_key,
464 ProxyInfo* results,
465 CompletionOnceCallback callback,
466 std::unique_ptr<Request>* request,
467 const NetLogWithSource& net_log) {
468 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
469 DCHECK(!callback.is_null());
470
471 auto job = base::MakeRefCounted<GetProxyForURLJob>(
472 url, network_anonymization_key, results, std::move(callback), net_log);
473
474 // Completion will be notified through |callback|, unless the caller cancels
475 // the request using |request|.
476 if (request)
477 *request = std::make_unique<RequestImpl>(job);
478
479 // If there is an executor that is ready to run this request, submit it!
480 Executor* executor = FindIdleExecutor();
481 if (executor) {
482 DCHECK_EQ(0u, pending_jobs_.size());
483 executor->StartJob(job);
484 return ERR_IO_PENDING;
485 }
486
487 // Otherwise queue this request. (We will schedule it to a thread once one
488 // becomes available).
489 job->WaitingForThread();
490 pending_jobs_.push_back(job);
491
492 // If we haven't already reached the thread limit, provision a new thread to
493 // drain the requests more quickly.
494 if (executors_.size() < max_num_threads_)
495 AddNewExecutor();
496
497 return ERR_IO_PENDING;
498 }
499
FindIdleExecutor()500 Executor* MultiThreadedProxyResolver::FindIdleExecutor() {
501 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
502 for (auto& executor : executors_) {
503 if (!executor->outstanding_job())
504 return executor.get();
505 }
506 return nullptr;
507 }
508
AddNewExecutor()509 void MultiThreadedProxyResolver::AddNewExecutor() {
510 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
511 DCHECK_LT(executors_.size(), max_num_threads_);
512 // The "thread number" is used to give the thread a unique name.
513 int thread_number = executors_.size();
514
515 auto executor = base::MakeRefCounted<Executor>(this, thread_number);
516 executor->StartJob(base::MakeRefCounted<CreateResolverJob>(
517 script_data_, resolver_factory_.get()));
518 executors_.push_back(std::move(executor));
519 }
520
OnExecutorReady(Executor * executor)521 void MultiThreadedProxyResolver::OnExecutorReady(Executor* executor) {
522 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
523 while (!pending_jobs_.empty()) {
524 scoped_refptr<Job> job = pending_jobs_.front();
525 pending_jobs_.pop_front();
526 if (!job->was_cancelled()) {
527 executor->StartJob(std::move(job));
528 return;
529 }
530 }
531 }
532
533 } // namespace
534
535 class MultiThreadedProxyResolverFactory::Job
536 : public ProxyResolverFactory::Request,
537 public Executor::Coordinator {
538 public:
Job(MultiThreadedProxyResolverFactory * factory,const scoped_refptr<PacFileData> & script_data,std::unique_ptr<ProxyResolver> * resolver,std::unique_ptr<ProxyResolverFactory> resolver_factory,size_t max_num_threads,CompletionOnceCallback callback)539 Job(MultiThreadedProxyResolverFactory* factory,
540 const scoped_refptr<PacFileData>& script_data,
541 std::unique_ptr<ProxyResolver>* resolver,
542 std::unique_ptr<ProxyResolverFactory> resolver_factory,
543 size_t max_num_threads,
544 CompletionOnceCallback callback)
545 : factory_(factory),
546 resolver_out_(resolver),
547 resolver_factory_(std::move(resolver_factory)),
548 max_num_threads_(max_num_threads),
549 script_data_(script_data),
550 executor_(base::MakeRefCounted<Executor>(this, 0)),
551 callback_(std::move(callback)) {
552 executor_->StartJob(base::MakeRefCounted<CreateResolverJob>(
553 script_data_, resolver_factory_.get()));
554 }
555
~Job()556 ~Job() override {
557 if (factory_) {
558 executor_->Destroy();
559 factory_->RemoveJob(this);
560 }
561 }
562
FactoryDestroyed()563 void FactoryDestroyed() {
564 executor_->Destroy();
565 executor_ = nullptr;
566 factory_ = nullptr;
567 resolver_out_ = nullptr;
568 }
569
570 private:
OnExecutorReady(Executor * executor)571 void OnExecutorReady(Executor* executor) override {
572 int error = OK;
573 if (executor->resolver()) {
574 *resolver_out_ = std::make_unique<MultiThreadedProxyResolver>(
575 std::move(resolver_factory_), max_num_threads_,
576 std::move(script_data_), executor_);
577 } else {
578 error = ERR_PAC_SCRIPT_FAILED;
579 executor_->Destroy();
580 }
581 factory_->RemoveJob(this);
582 factory_ = nullptr;
583 std::move(callback_).Run(error);
584 }
585
586 raw_ptr<MultiThreadedProxyResolverFactory> factory_;
587 raw_ptr<std::unique_ptr<ProxyResolver>> resolver_out_;
588 std::unique_ptr<ProxyResolverFactory> resolver_factory_;
589 const size_t max_num_threads_;
590 scoped_refptr<PacFileData> script_data_;
591 scoped_refptr<Executor> executor_;
592 CompletionOnceCallback callback_;
593 };
594
MultiThreadedProxyResolverFactory(size_t max_num_threads,bool factory_expects_bytes)595 MultiThreadedProxyResolverFactory::MultiThreadedProxyResolverFactory(
596 size_t max_num_threads,
597 bool factory_expects_bytes)
598 : ProxyResolverFactory(factory_expects_bytes),
599 max_num_threads_(max_num_threads) {
600 DCHECK_GE(max_num_threads, 1u);
601 }
602
~MultiThreadedProxyResolverFactory()603 MultiThreadedProxyResolverFactory::~MultiThreadedProxyResolverFactory() {
604 for (Job* job : jobs_) {
605 job->FactoryDestroyed();
606 }
607 }
608
CreateProxyResolver(const scoped_refptr<PacFileData> & pac_script,std::unique_ptr<ProxyResolver> * resolver,CompletionOnceCallback callback,std::unique_ptr<Request> * request)609 int MultiThreadedProxyResolverFactory::CreateProxyResolver(
610 const scoped_refptr<PacFileData>& pac_script,
611 std::unique_ptr<ProxyResolver>* resolver,
612 CompletionOnceCallback callback,
613 std::unique_ptr<Request>* request) {
614 auto job = std::make_unique<Job>(this, pac_script, resolver,
615 CreateProxyResolverFactory(),
616 max_num_threads_, std::move(callback));
617 jobs_.insert(job.get());
618 *request = std::move(job);
619 return ERR_IO_PENDING;
620 }
621
RemoveJob(MultiThreadedProxyResolverFactory::Job * job)622 void MultiThreadedProxyResolverFactory::RemoveJob(
623 MultiThreadedProxyResolverFactory::Job* job) {
624 size_t erased = jobs_.erase(job);
625 DCHECK_EQ(1u, erased);
626 }
627
628 } // namespace net
629