• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2019 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "base/task/post_job.h"
6 
7 #include "base/feature_list.h"
8 #include "base/task/scoped_set_task_priority_for_current_thread.h"
9 #include "base/task/task_features.h"
10 #include "base/task/thread_pool/job_task_source.h"
11 #include "base/task/thread_pool/job_task_source_old.h"
12 #include "base/task/thread_pool/pooled_task_runner_delegate.h"
13 #include "base/task/thread_pool/thread_pool_impl.h"
14 #include "base/task/thread_pool/thread_pool_instance.h"
15 
16 namespace base {
17 
18 namespace {
19 
CreateJobTaskSource(const Location & from_here,const TaskTraits & traits,RepeatingCallback<void (JobDelegate *)> worker_task,MaxConcurrencyCallback max_concurrency_callback)20 scoped_refptr<internal::JobTaskSource> CreateJobTaskSource(
21     const Location& from_here,
22     const TaskTraits& traits,
23     RepeatingCallback<void(JobDelegate*)> worker_task,
24     MaxConcurrencyCallback max_concurrency_callback) {
25   DCHECK(ThreadPoolInstance::Get())
26       << "Hint: if this is in a unit test, you're likely merely missing a "
27          "base::test::TaskEnvironment member in your fixture.\n";
28 
29   return internal::CreateJobTaskSource(
30       from_here, traits, std::move(worker_task),
31       std::move(max_concurrency_callback),
32       static_cast<internal::ThreadPoolImpl*>(ThreadPoolInstance::Get()));
33 }
34 
35 }  // namespace
36 
JobDelegate(internal::JobTaskSource * task_source,internal::PooledTaskRunnerDelegate * pooled_task_runner_delegate)37 JobDelegate::JobDelegate(
38     internal::JobTaskSource* task_source,
39     internal::PooledTaskRunnerDelegate* pooled_task_runner_delegate)
40     : task_source_(task_source),
41       pooled_task_runner_delegate_(pooled_task_runner_delegate) {
42   DCHECK(task_source_);
43 }
44 
~JobDelegate()45 JobDelegate::~JobDelegate() {
46   if (task_id_ != kInvalidTaskId)
47     task_source_->ReleaseTaskId(task_id_);
48 }
49 
ShouldYield()50 bool JobDelegate::ShouldYield() {
51 #if DCHECK_IS_ON()
52   // ShouldYield() shouldn't be called again after returning true.
53   DCHECK(!last_should_yield_);
54 #endif  // DCHECK_IS_ON()
55   const bool should_yield =
56       task_source_->ShouldYield() ||
57       (pooled_task_runner_delegate_ &&
58        pooled_task_runner_delegate_->ShouldYield(task_source_));
59 
60 #if DCHECK_IS_ON()
61   last_should_yield_ = should_yield;
62 #endif  // DCHECK_IS_ON()
63   return should_yield;
64 }
65 
YieldIfNeeded()66 void JobDelegate::YieldIfNeeded() {
67   // TODO(crbug.com/839091): Implement this.
68 }
69 
NotifyConcurrencyIncrease()70 void JobDelegate::NotifyConcurrencyIncrease() {
71   task_source_->NotifyConcurrencyIncrease();
72 }
73 
GetTaskId()74 uint8_t JobDelegate::GetTaskId() {
75   if (task_id_ == kInvalidTaskId)
76     task_id_ = task_source_->AcquireTaskId();
77   return task_id_;
78 }
79 
80 JobHandle::JobHandle() = default;
81 
JobHandle(scoped_refptr<internal::JobTaskSource> task_source)82 JobHandle::JobHandle(scoped_refptr<internal::JobTaskSource> task_source)
83     : task_source_(std::move(task_source)) {}
84 
~JobHandle()85 JobHandle::~JobHandle() {
86   DCHECK(!task_source_)
87       << "The Job must be cancelled, detached or joined before its "
88          "JobHandle is destroyed.";
89 }
90 
91 JobHandle::JobHandle(JobHandle&&) = default;
92 
operator =(JobHandle && other)93 JobHandle& JobHandle::operator=(JobHandle&& other) {
94   DCHECK(!task_source_)
95       << "The Job must be cancelled, detached or joined before its "
96          "JobHandle is re-assigned.";
97   task_source_ = std::move(other.task_source_);
98   return *this;
99 }
100 
IsActive() const101 bool JobHandle::IsActive() const {
102   return task_source_->IsActive();
103 }
104 
UpdatePriority(TaskPriority new_priority)105 void JobHandle::UpdatePriority(TaskPriority new_priority) {
106   if (!internal::PooledTaskRunnerDelegate::MatchesCurrentDelegate(
107           task_source_->GetDelegate())) {
108     return;
109   }
110   task_source_->GetDelegate()->UpdateJobPriority(task_source_, new_priority);
111 }
112 
NotifyConcurrencyIncrease()113 void JobHandle::NotifyConcurrencyIncrease() {
114   if (!internal::PooledTaskRunnerDelegate::MatchesCurrentDelegate(
115           task_source_->GetDelegate())) {
116     return;
117   }
118   task_source_->NotifyConcurrencyIncrease();
119 }
120 
Join()121 void JobHandle::Join() {
122   DCHECK(internal::PooledTaskRunnerDelegate::MatchesCurrentDelegate(
123       task_source_->GetDelegate()));
124   DCHECK_GE(internal::GetTaskPriorityForCurrentThread(),
125             task_source_->priority_racy())
126       << "Join may not be called on Job with higher priority than the current "
127          "thread.";
128   UpdatePriority(internal::GetTaskPriorityForCurrentThread());
129 
130   // Ensure that the job is queued if it has remaining concurrency. This is
131   // necessary to support CreateJob(...).Join().
132   task_source_->NotifyConcurrencyIncrease();
133 
134   bool must_run = task_source_->WillJoin();
135   while (must_run)
136     must_run = task_source_->RunJoinTask();
137   // Remove |task_source_| from the ThreadPool to prevent access to
138   // |max_concurrency_callback| after Join().
139   task_source_->GetDelegate()->RemoveJobTaskSource(task_source_);
140   task_source_ = nullptr;
141 }
142 
Cancel()143 void JobHandle::Cancel() {
144   DCHECK(internal::PooledTaskRunnerDelegate::MatchesCurrentDelegate(
145       task_source_->GetDelegate()));
146   task_source_->Cancel();
147   bool must_run = task_source_->WillJoin();
148   DCHECK(!must_run);
149   // Remove |task_source_| from the ThreadPool to prevent access to
150   // |max_concurrency_callback| after Join().
151   task_source_->GetDelegate()->RemoveJobTaskSource(task_source_);
152   task_source_ = nullptr;
153 }
154 
CancelAndDetach()155 void JobHandle::CancelAndDetach() {
156   task_source_->Cancel();
157   Detach();
158 }
159 
Detach()160 void JobHandle::Detach() {
161   DCHECK(task_source_);
162   task_source_ = nullptr;
163 }
164 
PostJob(const Location & from_here,const TaskTraits & traits,RepeatingCallback<void (JobDelegate *)> worker_task,MaxConcurrencyCallback max_concurrency_callback)165 JobHandle PostJob(const Location& from_here,
166                   const TaskTraits& traits,
167                   RepeatingCallback<void(JobDelegate*)> worker_task,
168                   MaxConcurrencyCallback max_concurrency_callback) {
169   auto task_source =
170       CreateJobTaskSource(from_here, traits, std::move(worker_task),
171                           std::move(max_concurrency_callback));
172   task_source->NotifyConcurrencyIncrease();
173   return internal::JobTaskSource::CreateJobHandle(std::move(task_source));
174 }
175 
CreateJob(const Location & from_here,const TaskTraits & traits,RepeatingCallback<void (JobDelegate *)> worker_task,MaxConcurrencyCallback max_concurrency_callback)176 JobHandle CreateJob(const Location& from_here,
177                     const TaskTraits& traits,
178                     RepeatingCallback<void(JobDelegate*)> worker_task,
179                     MaxConcurrencyCallback max_concurrency_callback) {
180   auto task_source =
181       CreateJobTaskSource(from_here, traits, std::move(worker_task),
182                           std::move(max_concurrency_callback));
183   return internal::JobTaskSource::CreateJobHandle(std::move(task_source));
184 }
185 
186 namespace internal {
187 
CreateJobTaskSource(const Location & from_here,const TaskTraits & traits,RepeatingCallback<void (JobDelegate *)> worker_task,MaxConcurrencyCallback max_concurrency_callback,PooledTaskRunnerDelegate * delegate)188 scoped_refptr<JobTaskSource> CreateJobTaskSource(
189     const Location& from_here,
190     const TaskTraits& traits,
191     RepeatingCallback<void(JobDelegate*)> worker_task,
192     MaxConcurrencyCallback max_concurrency_callback,
193     PooledTaskRunnerDelegate* delegate) {
194   if (base::FeatureList::IsEnabled(kUseNewJobImplementation)) {
195     return base::MakeRefCounted<internal::JobTaskSourceNew>(
196         from_here, traits, std::move(worker_task),
197         std::move(max_concurrency_callback), delegate);
198   }
199 
200   return base::MakeRefCounted<internal::JobTaskSourceOld>(
201       from_here, traits, std::move(worker_task),
202       std::move(max_concurrency_callback), delegate);
203 }
204 
205 }  // namespace internal
206 
207 }  // namespace base
208