• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2018 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "net/base/prioritized_task_runner.h"
6 
7 #include <algorithm>
8 
9 #include "base/functional/bind.h"
10 #include "base/task/task_runner.h"
11 #include "base/task/thread_pool.h"
12 
13 namespace net {
14 
Job(const base::Location & from_here,base::OnceClosure task,base::OnceClosure reply,uint32_t priority,uint32_t task_count)15 PrioritizedTaskRunner::Job::Job(const base::Location& from_here,
16                                 base::OnceClosure task,
17                                 base::OnceClosure reply,
18                                 uint32_t priority,
19                                 uint32_t task_count)
20     : from_here(from_here),
21       task(std::move(task)),
22       reply(std::move(reply)),
23       priority(priority),
24       task_count(task_count) {}
25 
26 PrioritizedTaskRunner::Job::Job() = default;
27 
28 PrioritizedTaskRunner::Job::~Job() = default;
29 PrioritizedTaskRunner::Job::Job(Job&& other) = default;
30 PrioritizedTaskRunner::Job& PrioritizedTaskRunner::Job::operator=(Job&& other) =
31     default;
32 
PrioritizedTaskRunner(const base::TaskTraits & task_traits)33 PrioritizedTaskRunner::PrioritizedTaskRunner(
34     const base::TaskTraits& task_traits)
35     : task_traits_(task_traits) {}
36 
PostTaskAndReply(const base::Location & from_here,base::OnceClosure task,base::OnceClosure reply,uint32_t priority)37 void PrioritizedTaskRunner::PostTaskAndReply(const base::Location& from_here,
38                                              base::OnceClosure task,
39                                              base::OnceClosure reply,
40                                              uint32_t priority) {
41   Job job(from_here, std::move(task), std::move(reply), priority,
42           task_count_++);
43   {
44     base::AutoLock lock(task_job_heap_lock_);
45     task_job_heap_.push_back(std::move(job));
46     std::push_heap(task_job_heap_.begin(), task_job_heap_.end(), JobComparer());
47   }
48 
49   scoped_refptr<base::TaskRunner> task_runner;
50   if (task_runner_for_testing_) {
51     task_runner = task_runner_for_testing_;
52   } else {
53     task_runner = base::ThreadPool::CreateSequencedTaskRunner(task_traits_);
54   }
55 
56   task_runner->PostTaskAndReply(
57       from_here,
58       base::BindOnce(&PrioritizedTaskRunner::RunTaskAndPostReply, this),
59       base::BindOnce(&PrioritizedTaskRunner::RunReply, this));
60 }
61 
62 PrioritizedTaskRunner::~PrioritizedTaskRunner() = default;
63 
RunTaskAndPostReply()64 void PrioritizedTaskRunner::RunTaskAndPostReply() {
65   // Find the next job to run.
66   Job job;
67   {
68     base::AutoLock lock(task_job_heap_lock_);
69     std::pop_heap(task_job_heap_.begin(), task_job_heap_.end(), JobComparer());
70     job = std::move(task_job_heap_.back());
71     task_job_heap_.pop_back();
72   }
73 
74   std::move(job.task).Run();
75 
76   // Add the job to the reply priority queue.
77   base::AutoLock reply_lock(reply_job_heap_lock_);
78   reply_job_heap_.push_back(std::move(job));
79   std::push_heap(reply_job_heap_.begin(), reply_job_heap_.end(), JobComparer());
80 }
81 
RunReply()82 void PrioritizedTaskRunner::RunReply() {
83   // Find the next job to run.
84   Job job;
85   {
86     base::AutoLock lock(reply_job_heap_lock_);
87     std::pop_heap(reply_job_heap_.begin(), reply_job_heap_.end(),
88                   JobComparer());
89     job = std::move(reply_job_heap_.back());
90     reply_job_heap_.pop_back();
91   }
92 
93   // Run the job.
94   std::move(job.reply).Run();
95 }
96 
97 }  // namespace net
98