1 // Copyright 2018 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "net/base/prioritized_task_runner.h"
6
7 #include <algorithm>
8
9 #include "base/functional/bind.h"
10 #include "base/task/task_runner.h"
11 #include "base/task/thread_pool.h"
12
13 namespace net {
14
Job(const base::Location & from_here,base::OnceClosure task,base::OnceClosure reply,uint32_t priority,uint32_t task_count)15 PrioritizedTaskRunner::Job::Job(const base::Location& from_here,
16 base::OnceClosure task,
17 base::OnceClosure reply,
18 uint32_t priority,
19 uint32_t task_count)
20 : from_here(from_here),
21 task(std::move(task)),
22 reply(std::move(reply)),
23 priority(priority),
24 task_count(task_count) {}
25
26 PrioritizedTaskRunner::Job::Job() = default;
27
28 PrioritizedTaskRunner::Job::~Job() = default;
29 PrioritizedTaskRunner::Job::Job(Job&& other) = default;
30 PrioritizedTaskRunner::Job& PrioritizedTaskRunner::Job::operator=(Job&& other) =
31 default;
32
PrioritizedTaskRunner(const base::TaskTraits & task_traits)33 PrioritizedTaskRunner::PrioritizedTaskRunner(
34 const base::TaskTraits& task_traits)
35 : task_traits_(task_traits) {}
36
PostTaskAndReply(const base::Location & from_here,base::OnceClosure task,base::OnceClosure reply,uint32_t priority)37 void PrioritizedTaskRunner::PostTaskAndReply(const base::Location& from_here,
38 base::OnceClosure task,
39 base::OnceClosure reply,
40 uint32_t priority) {
41 Job job(from_here, std::move(task), std::move(reply), priority,
42 task_count_++);
43 {
44 base::AutoLock lock(task_job_heap_lock_);
45 task_job_heap_.push_back(std::move(job));
46 std::push_heap(task_job_heap_.begin(), task_job_heap_.end(), JobComparer());
47 }
48
49 scoped_refptr<base::TaskRunner> task_runner;
50 if (task_runner_for_testing_) {
51 task_runner = task_runner_for_testing_;
52 } else {
53 task_runner = base::ThreadPool::CreateSequencedTaskRunner(task_traits_);
54 }
55
56 task_runner->PostTaskAndReply(
57 from_here,
58 base::BindOnce(&PrioritizedTaskRunner::RunTaskAndPostReply, this),
59 base::BindOnce(&PrioritizedTaskRunner::RunReply, this));
60 }
61
62 PrioritizedTaskRunner::~PrioritizedTaskRunner() = default;
63
RunTaskAndPostReply()64 void PrioritizedTaskRunner::RunTaskAndPostReply() {
65 // Find the next job to run.
66 Job job;
67 {
68 base::AutoLock lock(task_job_heap_lock_);
69 std::pop_heap(task_job_heap_.begin(), task_job_heap_.end(), JobComparer());
70 job = std::move(task_job_heap_.back());
71 task_job_heap_.pop_back();
72 }
73
74 std::move(job.task).Run();
75
76 // Add the job to the reply priority queue.
77 base::AutoLock reply_lock(reply_job_heap_lock_);
78 reply_job_heap_.push_back(std::move(job));
79 std::push_heap(reply_job_heap_.begin(), reply_job_heap_.end(), JobComparer());
80 }
81
RunReply()82 void PrioritizedTaskRunner::RunReply() {
83 // Find the next job to run.
84 Job job;
85 {
86 base::AutoLock lock(reply_job_heap_lock_);
87 std::pop_heap(reply_job_heap_.begin(), reply_job_heap_.end(),
88 JobComparer());
89 job = std::move(reply_job_heap_.back());
90 reply_job_heap_.pop_back();
91 }
92
93 // Run the job.
94 std::move(job.reply).Run();
95 }
96
97 } // namespace net
98