• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2019 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "base/task/post_job.h"
6 
7 #include "base/task/scoped_set_task_priority_for_current_thread.h"
8 #include "base/task/thread_pool/job_task_source.h"
9 #include "base/task/thread_pool/pooled_task_runner_delegate.h"
10 #include "base/task/thread_pool/thread_pool_impl.h"
11 #include "base/task/thread_pool/thread_pool_instance.h"
12 
13 namespace base {
14 
15 namespace {
16 
CreateJobTaskSource(const Location & from_here,const TaskTraits & traits,RepeatingCallback<void (JobDelegate *)> worker_task,MaxConcurrencyCallback max_concurrency_callback)17 scoped_refptr<internal::JobTaskSource> CreateJobTaskSource(
18     const Location& from_here,
19     const TaskTraits& traits,
20     RepeatingCallback<void(JobDelegate*)> worker_task,
21     MaxConcurrencyCallback max_concurrency_callback) {
22   DCHECK(ThreadPoolInstance::Get())
23       << "Hint: if this is in a unit test, you're likely merely missing a "
24          "base::test::TaskEnvironment member in your fixture.\n";
25 
26   return base::MakeRefCounted<internal::JobTaskSource>(
27       from_here, traits, std::move(worker_task),
28       std::move(max_concurrency_callback),
29       static_cast<internal::ThreadPoolImpl*>(ThreadPoolInstance::Get()));
30 }
31 
32 }  // namespace
33 
JobDelegate(internal::JobTaskSource * task_source,internal::PooledTaskRunnerDelegate * pooled_task_runner_delegate)34 JobDelegate::JobDelegate(
35     internal::JobTaskSource* task_source,
36     internal::PooledTaskRunnerDelegate* pooled_task_runner_delegate)
37     : task_source_(task_source),
38       pooled_task_runner_delegate_(pooled_task_runner_delegate) {
39   DCHECK(task_source_);
40 }
41 
~JobDelegate()42 JobDelegate::~JobDelegate() {
43   if (task_id_ != kInvalidTaskId)
44     task_source_->ReleaseTaskId(task_id_);
45 }
46 
ShouldYield()47 bool JobDelegate::ShouldYield() {
48 #if DCHECK_IS_ON()
49   // ShouldYield() shouldn't be called again after returning true.
50   DCHECK(!last_should_yield_);
51 #endif  // DCHECK_IS_ON()
52   const bool should_yield =
53       task_source_->ShouldYield() ||
54       (pooled_task_runner_delegate_ &&
55        pooled_task_runner_delegate_->ShouldYield(task_source_));
56 
57 #if DCHECK_IS_ON()
58   last_should_yield_ = should_yield;
59 #endif  // DCHECK_IS_ON()
60   return should_yield;
61 }
62 
YieldIfNeeded()63 void JobDelegate::YieldIfNeeded() {
64   // TODO(crbug.com/839091): Implement this.
65 }
66 
NotifyConcurrencyIncrease()67 void JobDelegate::NotifyConcurrencyIncrease() {
68   task_source_->NotifyConcurrencyIncrease();
69 }
70 
GetTaskId()71 uint8_t JobDelegate::GetTaskId() {
72   if (task_id_ == kInvalidTaskId)
73     task_id_ = task_source_->AcquireTaskId();
74   return task_id_;
75 }
76 
77 JobHandle::JobHandle() = default;
78 
JobHandle(scoped_refptr<internal::JobTaskSource> task_source)79 JobHandle::JobHandle(scoped_refptr<internal::JobTaskSource> task_source)
80     : task_source_(std::move(task_source)) {}
81 
~JobHandle()82 JobHandle::~JobHandle() {
83   DCHECK(!task_source_)
84       << "The Job must be cancelled, detached or joined before its "
85          "JobHandle is destroyed.";
86 }
87 
88 JobHandle::JobHandle(JobHandle&&) = default;
89 
operator =(JobHandle && other)90 JobHandle& JobHandle::operator=(JobHandle&& other) {
91   DCHECK(!task_source_)
92       << "The Job must be cancelled, detached or joined before its "
93          "JobHandle is re-assigned.";
94   task_source_ = std::move(other.task_source_);
95   return *this;
96 }
97 
IsActive() const98 bool JobHandle::IsActive() const {
99   return task_source_->IsActive();
100 }
101 
UpdatePriority(TaskPriority new_priority)102 void JobHandle::UpdatePriority(TaskPriority new_priority) {
103   if (!internal::PooledTaskRunnerDelegate::MatchesCurrentDelegate(
104           task_source_->delegate())) {
105     return;
106   }
107   task_source_->delegate()->UpdateJobPriority(task_source_, new_priority);
108 }
109 
NotifyConcurrencyIncrease()110 void JobHandle::NotifyConcurrencyIncrease() {
111   if (!internal::PooledTaskRunnerDelegate::MatchesCurrentDelegate(
112           task_source_->delegate())) {
113     return;
114   }
115   task_source_->NotifyConcurrencyIncrease();
116 }
117 
Join()118 void JobHandle::Join() {
119   DCHECK(internal::PooledTaskRunnerDelegate::MatchesCurrentDelegate(
120       task_source_->delegate()));
121   DCHECK_GE(internal::GetTaskPriorityForCurrentThread(),
122             task_source_->priority_racy())
123       << "Join may not be called on Job with higher priority than the current "
124          "thread.";
125   UpdatePriority(internal::GetTaskPriorityForCurrentThread());
126   if (task_source_->GetRemainingConcurrency() != 0) {
127     // Make sure the task source is in the queue if not enough workers are
128     // contributing. This is necessary for CreateJob(...).Join(). This is a
129     // noop if the task source was already in the queue.
130     task_source_->delegate()->EnqueueJobTaskSource(task_source_);
131   }
132   bool must_run = task_source_->WillJoin();
133   while (must_run)
134     must_run = task_source_->RunJoinTask();
135   // Remove |task_source_| from the ThreadPool to prevent access to
136   // |max_concurrency_callback| after Join().
137   task_source_->delegate()->RemoveJobTaskSource(task_source_);
138   task_source_ = nullptr;
139 }
140 
Cancel()141 void JobHandle::Cancel() {
142   DCHECK(internal::PooledTaskRunnerDelegate::MatchesCurrentDelegate(
143       task_source_->delegate()));
144   task_source_->Cancel();
145   bool must_run = task_source_->WillJoin();
146   DCHECK(!must_run);
147   // Remove |task_source_| from the ThreadPool to prevent access to
148   // |max_concurrency_callback| after Join().
149   task_source_->delegate()->RemoveJobTaskSource(task_source_);
150   task_source_ = nullptr;
151 }
152 
CancelAndDetach()153 void JobHandle::CancelAndDetach() {
154   task_source_->Cancel();
155   Detach();
156 }
157 
Detach()158 void JobHandle::Detach() {
159   DCHECK(task_source_);
160   task_source_ = nullptr;
161 }
162 
PostJob(const Location & from_here,const TaskTraits & traits,RepeatingCallback<void (JobDelegate *)> worker_task,MaxConcurrencyCallback max_concurrency_callback)163 JobHandle PostJob(const Location& from_here,
164                   const TaskTraits& traits,
165                   RepeatingCallback<void(JobDelegate*)> worker_task,
166                   MaxConcurrencyCallback max_concurrency_callback) {
167   auto task_source =
168       CreateJobTaskSource(from_here, traits, std::move(worker_task),
169                           std::move(max_concurrency_callback));
170   const bool queued =
171       static_cast<internal::ThreadPoolImpl*>(ThreadPoolInstance::Get())
172           ->EnqueueJobTaskSource(task_source);
173   if (queued)
174     return internal::JobTaskSource::CreateJobHandle(std::move(task_source));
175   return JobHandle();
176 }
177 
CreateJob(const Location & from_here,const TaskTraits & traits,RepeatingCallback<void (JobDelegate *)> worker_task,MaxConcurrencyCallback max_concurrency_callback)178 JobHandle CreateJob(const Location& from_here,
179                     const TaskTraits& traits,
180                     RepeatingCallback<void(JobDelegate*)> worker_task,
181                     MaxConcurrencyCallback max_concurrency_callback) {
182   auto task_source =
183       CreateJobTaskSource(from_here, traits, std::move(worker_task),
184                           std::move(max_concurrency_callback));
185   return internal::JobTaskSource::CreateJobHandle(std::move(task_source));
186 }
187 
188 }  // namespace base
189