1 // Copyright 2019 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "base/task/post_job.h"
6
7 #include "base/feature_list.h"
8 #include "base/task/scoped_set_task_priority_for_current_thread.h"
9 #include "base/task/task_features.h"
10 #include "base/task/thread_pool/job_task_source.h"
11 #include "base/task/thread_pool/job_task_source_old.h"
12 #include "base/task/thread_pool/pooled_task_runner_delegate.h"
13 #include "base/task/thread_pool/thread_pool_impl.h"
14 #include "base/task/thread_pool/thread_pool_instance.h"
15
16 namespace base {
17
18 namespace {
19
CreateJobTaskSource(const Location & from_here,const TaskTraits & traits,RepeatingCallback<void (JobDelegate *)> worker_task,MaxConcurrencyCallback max_concurrency_callback)20 scoped_refptr<internal::JobTaskSource> CreateJobTaskSource(
21 const Location& from_here,
22 const TaskTraits& traits,
23 RepeatingCallback<void(JobDelegate*)> worker_task,
24 MaxConcurrencyCallback max_concurrency_callback) {
25 DCHECK(ThreadPoolInstance::Get())
26 << "Hint: if this is in a unit test, you're likely merely missing a "
27 "base::test::TaskEnvironment member in your fixture.\n";
28
29 return internal::CreateJobTaskSource(
30 from_here, traits, std::move(worker_task),
31 std::move(max_concurrency_callback),
32 static_cast<internal::ThreadPoolImpl*>(ThreadPoolInstance::Get()));
33 }
34
35 } // namespace
36
JobDelegate(internal::JobTaskSource * task_source,internal::PooledTaskRunnerDelegate * pooled_task_runner_delegate)37 JobDelegate::JobDelegate(
38 internal::JobTaskSource* task_source,
39 internal::PooledTaskRunnerDelegate* pooled_task_runner_delegate)
40 : task_source_(task_source),
41 pooled_task_runner_delegate_(pooled_task_runner_delegate) {
42 DCHECK(task_source_);
43 }
44
~JobDelegate()45 JobDelegate::~JobDelegate() {
46 if (task_id_ != kInvalidTaskId)
47 task_source_->ReleaseTaskId(task_id_);
48 }
49
ShouldYield()50 bool JobDelegate::ShouldYield() {
51 #if DCHECK_IS_ON()
52 // ShouldYield() shouldn't be called again after returning true.
53 DCHECK(!last_should_yield_);
54 #endif // DCHECK_IS_ON()
55 const bool should_yield =
56 task_source_->ShouldYield() ||
57 (pooled_task_runner_delegate_ &&
58 pooled_task_runner_delegate_->ShouldYield(task_source_));
59
60 #if DCHECK_IS_ON()
61 last_should_yield_ = should_yield;
62 #endif // DCHECK_IS_ON()
63 return should_yield;
64 }
65
YieldIfNeeded()66 void JobDelegate::YieldIfNeeded() {
67 // TODO(crbug.com/839091): Implement this.
68 }
69
NotifyConcurrencyIncrease()70 void JobDelegate::NotifyConcurrencyIncrease() {
71 task_source_->NotifyConcurrencyIncrease();
72 }
73
GetTaskId()74 uint8_t JobDelegate::GetTaskId() {
75 if (task_id_ == kInvalidTaskId)
76 task_id_ = task_source_->AcquireTaskId();
77 return task_id_;
78 }
79
80 JobHandle::JobHandle() = default;
81
JobHandle(scoped_refptr<internal::JobTaskSource> task_source)82 JobHandle::JobHandle(scoped_refptr<internal::JobTaskSource> task_source)
83 : task_source_(std::move(task_source)) {}
84
~JobHandle()85 JobHandle::~JobHandle() {
86 DCHECK(!task_source_)
87 << "The Job must be cancelled, detached or joined before its "
88 "JobHandle is destroyed.";
89 }
90
91 JobHandle::JobHandle(JobHandle&&) = default;
92
operator =(JobHandle && other)93 JobHandle& JobHandle::operator=(JobHandle&& other) {
94 DCHECK(!task_source_)
95 << "The Job must be cancelled, detached or joined before its "
96 "JobHandle is re-assigned.";
97 task_source_ = std::move(other.task_source_);
98 return *this;
99 }
100
IsActive() const101 bool JobHandle::IsActive() const {
102 return task_source_->IsActive();
103 }
104
UpdatePriority(TaskPriority new_priority)105 void JobHandle::UpdatePriority(TaskPriority new_priority) {
106 if (!internal::PooledTaskRunnerDelegate::MatchesCurrentDelegate(
107 task_source_->GetDelegate())) {
108 return;
109 }
110 task_source_->GetDelegate()->UpdateJobPriority(task_source_, new_priority);
111 }
112
NotifyConcurrencyIncrease()113 void JobHandle::NotifyConcurrencyIncrease() {
114 if (!internal::PooledTaskRunnerDelegate::MatchesCurrentDelegate(
115 task_source_->GetDelegate())) {
116 return;
117 }
118 task_source_->NotifyConcurrencyIncrease();
119 }
120
Join()121 void JobHandle::Join() {
122 DCHECK(internal::PooledTaskRunnerDelegate::MatchesCurrentDelegate(
123 task_source_->GetDelegate()));
124 DCHECK_GE(internal::GetTaskPriorityForCurrentThread(),
125 task_source_->priority_racy())
126 << "Join may not be called on Job with higher priority than the current "
127 "thread.";
128 UpdatePriority(internal::GetTaskPriorityForCurrentThread());
129
130 // Ensure that the job is queued if it has remaining concurrency. This is
131 // necessary to support CreateJob(...).Join().
132 task_source_->NotifyConcurrencyIncrease();
133
134 bool must_run = task_source_->WillJoin();
135 while (must_run)
136 must_run = task_source_->RunJoinTask();
137 // Remove |task_source_| from the ThreadPool to prevent access to
138 // |max_concurrency_callback| after Join().
139 task_source_->GetDelegate()->RemoveJobTaskSource(task_source_);
140 task_source_ = nullptr;
141 }
142
Cancel()143 void JobHandle::Cancel() {
144 DCHECK(internal::PooledTaskRunnerDelegate::MatchesCurrentDelegate(
145 task_source_->GetDelegate()));
146 task_source_->Cancel();
147 bool must_run = task_source_->WillJoin();
148 DCHECK(!must_run);
149 // Remove |task_source_| from the ThreadPool to prevent access to
150 // |max_concurrency_callback| after Join().
151 task_source_->GetDelegate()->RemoveJobTaskSource(task_source_);
152 task_source_ = nullptr;
153 }
154
CancelAndDetach()155 void JobHandle::CancelAndDetach() {
156 task_source_->Cancel();
157 Detach();
158 }
159
Detach()160 void JobHandle::Detach() {
161 DCHECK(task_source_);
162 task_source_ = nullptr;
163 }
164
PostJob(const Location & from_here,const TaskTraits & traits,RepeatingCallback<void (JobDelegate *)> worker_task,MaxConcurrencyCallback max_concurrency_callback)165 JobHandle PostJob(const Location& from_here,
166 const TaskTraits& traits,
167 RepeatingCallback<void(JobDelegate*)> worker_task,
168 MaxConcurrencyCallback max_concurrency_callback) {
169 auto task_source =
170 CreateJobTaskSource(from_here, traits, std::move(worker_task),
171 std::move(max_concurrency_callback));
172 task_source->NotifyConcurrencyIncrease();
173 return internal::JobTaskSource::CreateJobHandle(std::move(task_source));
174 }
175
CreateJob(const Location & from_here,const TaskTraits & traits,RepeatingCallback<void (JobDelegate *)> worker_task,MaxConcurrencyCallback max_concurrency_callback)176 JobHandle CreateJob(const Location& from_here,
177 const TaskTraits& traits,
178 RepeatingCallback<void(JobDelegate*)> worker_task,
179 MaxConcurrencyCallback max_concurrency_callback) {
180 auto task_source =
181 CreateJobTaskSource(from_here, traits, std::move(worker_task),
182 std::move(max_concurrency_callback));
183 return internal::JobTaskSource::CreateJobHandle(std::move(task_source));
184 }
185
186 namespace internal {
187
CreateJobTaskSource(const Location & from_here,const TaskTraits & traits,RepeatingCallback<void (JobDelegate *)> worker_task,MaxConcurrencyCallback max_concurrency_callback,PooledTaskRunnerDelegate * delegate)188 scoped_refptr<JobTaskSource> CreateJobTaskSource(
189 const Location& from_here,
190 const TaskTraits& traits,
191 RepeatingCallback<void(JobDelegate*)> worker_task,
192 MaxConcurrencyCallback max_concurrency_callback,
193 PooledTaskRunnerDelegate* delegate) {
194 if (base::FeatureList::IsEnabled(kUseNewJobImplementation)) {
195 return base::MakeRefCounted<internal::JobTaskSourceNew>(
196 from_here, traits, std::move(worker_task),
197 std::move(max_concurrency_callback), delegate);
198 }
199
200 return base::MakeRefCounted<internal::JobTaskSourceOld>(
201 from_here, traits, std::move(worker_task),
202 std::move(max_concurrency_callback), delegate);
203 }
204
205 } // namespace internal
206
207 } // namespace base
208