1 // Copyright 2020 the V8 project authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "src/libplatform/default-job.h"
6
7 #include "src/base/bits.h"
8 #include "src/base/macros.h"
9
10 namespace v8 {
11 namespace platform {
namespace {

// Capped to allow assigning task_ids from a bitfield: task ids are tracked in
// a 32-bit atomic bitfield (see AcquireTaskId/ReleaseTaskId), so at most 32
// workers can hold a task id concurrently.
constexpr size_t kMaxWorkersPerJob = 32;

}  // namespace
18
~JobDelegate()19 DefaultJobState::JobDelegate::~JobDelegate() {
20 static_assert(kInvalidTaskId >= kMaxWorkersPerJob,
21 "kInvalidTaskId must be outside of the range of valid task_ids "
22 "[0, kMaxWorkersPerJob)");
23 if (task_id_ != kInvalidTaskId) outer_->ReleaseTaskId(task_id_);
24 }
25
GetTaskId()26 uint8_t DefaultJobState::JobDelegate::GetTaskId() {
27 if (task_id_ == kInvalidTaskId) task_id_ = outer_->AcquireTaskId();
28 return task_id_;
29 }
30
// Constructs the state shared by all workers of one job. |num_worker_threads|
// is clamped to kMaxWorkersPerJob so that task ids always fit in the 32-bit
// assigned_task_ids_ bitfield.
DefaultJobState::DefaultJobState(Platform* platform,
                                 std::unique_ptr<JobTask> job_task,
                                 TaskPriority priority,
                                 size_t num_worker_threads)
    : platform_(platform),
      job_task_(std::move(job_task)),
      priority_(priority),
      num_worker_threads_(std::min(num_worker_threads, kMaxWorkersPerJob)) {}
39
~DefaultJobState()40 DefaultJobState::~DefaultJobState() { DCHECK_EQ(0U, active_workers_); }
41
NotifyConcurrencyIncrease()42 void DefaultJobState::NotifyConcurrencyIncrease() {
43 if (is_canceled_.load(std::memory_order_relaxed)) return;
44
45 size_t num_tasks_to_post = 0;
46 TaskPriority priority;
47 {
48 base::MutexGuard guard(&mutex_);
49 const size_t max_concurrency = CappedMaxConcurrency(active_workers_);
50 // Consider |pending_tasks_| to avoid posting too many tasks.
51 if (max_concurrency > (active_workers_ + pending_tasks_)) {
52 num_tasks_to_post = max_concurrency - active_workers_ - pending_tasks_;
53 pending_tasks_ += num_tasks_to_post;
54 }
55 priority = priority_;
56 }
57 // Post additional worker tasks to reach |max_concurrency|.
58 for (size_t i = 0; i < num_tasks_to_post; ++i) {
59 CallOnWorkerThread(priority, std::make_unique<DefaultJobWorker>(
60 shared_from_this(), job_task_.get()));
61 }
62 }
63
// Atomically claims the lowest free task id by setting its bit in
// |assigned_task_ids_|. Lock-free: retries the CAS until it wins.
uint8_t DefaultJobState::AcquireTaskId() {
  static_assert(kMaxWorkersPerJob <= sizeof(assigned_task_ids_) * 8,
                "TaskId bitfield isn't big enough to fit kMaxWorkersPerJob.");
  uint32_t assigned_task_ids =
      assigned_task_ids_.load(std::memory_order_relaxed);
  // There must always be at least one free id, since the number of concurrent
  // workers is capped at kMaxWorkersPerJob.
  DCHECK_LE(v8::base::bits::CountPopulation(assigned_task_ids) + 1,
            kMaxWorkersPerJob);
  uint32_t new_assigned_task_ids = 0;
  uint8_t task_id = 0;
  // memory_order_acquire on success, matched with memory_order_release in
  // ReleaseTaskId() so that operations done by previous threads that had
  // the same task_id become visible to the current thread.
  do {
    // Count trailing one bits. This is the id of the right-most 0-bit in
    // |assigned_task_ids|.
    task_id = v8::base::bits::CountTrailingZeros32(~assigned_task_ids);
    new_assigned_task_ids = assigned_task_ids | (uint32_t(1) << task_id);
    // On failure, compare_exchange_weak reloads |assigned_task_ids| (relaxed)
    // and we recompute against the fresh value.
  } while (!assigned_task_ids_.compare_exchange_weak(
      assigned_task_ids, new_assigned_task_ids, std::memory_order_acquire,
      std::memory_order_relaxed));
  return task_id;
}
86
ReleaseTaskId(uint8_t task_id)87 void DefaultJobState::ReleaseTaskId(uint8_t task_id) {
88 // memory_order_release to match AcquireTaskId().
89 uint32_t previous_task_ids = assigned_task_ids_.fetch_and(
90 ~(uint32_t(1) << task_id), std::memory_order_release);
91 DCHECK(previous_task_ids & (uint32_t(1) << task_id));
92 USE(previous_task_ids);
93 }
94
// Contributes the calling thread to the job and blocks until the job reports
// no more work (GetMaxConcurrency() == 0) and all other workers are done.
void DefaultJobState::Join() {
  bool can_run = false;
  {
    base::MutexGuard guard(&mutex_);
    // The joining thread breaks priority inheritance: run everything at
    // blocking priority from now on.
    priority_ = TaskPriority::kUserBlocking;
    // Reserve a worker for the joining thread. GetMaxConcurrency() is ignored
    // here, but WaitForParticipationOpportunityLockRequired() waits for
    // workers to return if necessary so we don't exceed GetMaxConcurrency().
    num_worker_threads_ = platform_->NumberOfWorkerThreads() + 1;
    ++active_workers_;
    can_run = WaitForParticipationOpportunityLockRequired();
  }
  // NOTE(review): the second ctor argument presumably marks this delegate as
  // the joining thread's — confirm against JobDelegate's declaration.
  DefaultJobState::JobDelegate delegate(this, true);
  while (can_run) {
    // Run outside the lock; re-take it only to re-evaluate whether this
    // thread may keep participating.
    job_task_->Run(&delegate);
    base::MutexGuard guard(&mutex_);
    can_run = WaitForParticipationOpportunityLockRequired();
  }
}
114
CancelAndWait()115 void DefaultJobState::CancelAndWait() {
116 {
117 base::MutexGuard guard(&mutex_);
118 is_canceled_.store(true, std::memory_order_relaxed);
119 while (active_workers_ > 0) {
120 worker_released_condition_.Wait(&mutex_);
121 }
122 }
123 }
124
// Marks the job canceled without waiting; in-flight workers observe the flag
// and bail out on their own.
void DefaultJobState::CancelAndDetach() {
  is_canceled_.store(true, std::memory_order_relaxed);
}
128
IsActive()129 bool DefaultJobState::IsActive() {
130 base::MutexGuard guard(&mutex_);
131 return job_task_->GetMaxConcurrency(active_workers_) != 0 ||
132 active_workers_ != 0;
133 }
134
CanRunFirstTask()135 bool DefaultJobState::CanRunFirstTask() {
136 base::MutexGuard guard(&mutex_);
137 --pending_tasks_;
138 if (is_canceled_.load(std::memory_order_relaxed)) return false;
139 if (active_workers_ >= std::min(job_task_->GetMaxConcurrency(active_workers_),
140 num_worker_threads_)) {
141 return false;
142 }
143 // Acquire current worker.
144 ++active_workers_;
145 return true;
146 }
147
// Called by a worker after each JobTask::Run(). Returns true if the worker
// should run the task again, false if it must return (canceled or over
// capacity). May also post extra workers if max concurrency grew.
bool DefaultJobState::DidRunTask() {
  size_t num_tasks_to_post = 0;
  TaskPriority priority;
  {
    base::MutexGuard guard(&mutex_);
    // Pass active_workers_ - 1: this worker would not count against the limit
    // if it keeps running.
    const size_t max_concurrency = CappedMaxConcurrency(active_workers_ - 1);
    if (is_canceled_.load(std::memory_order_relaxed) ||
        active_workers_ > max_concurrency) {
      // Release current worker and notify (CancelAndWait()/Join() may be
      // blocked on worker_released_condition_).
      --active_workers_;
      worker_released_condition_.NotifyOne();
      return false;
    }
    // Consider |pending_tasks_| to avoid posting too many tasks.
    if (max_concurrency > active_workers_ + pending_tasks_) {
      num_tasks_to_post = max_concurrency - active_workers_ - pending_tasks_;
      pending_tasks_ += num_tasks_to_post;
    }
    priority = priority_;
  }
  // Post additional worker tasks to reach |max_concurrency| in the case that
  // max concurrency increased. This is not strictly necessary, since
  // NotifyConcurrencyIncrease() should eventually be invoked. However, some
  // users of PostJob() batch work and tend to call NotifyConcurrencyIncrease()
  // late. Posting here allows us to spawn new workers sooner.
  for (size_t i = 0; i < num_tasks_to_post; ++i) {
    CallOnWorkerThread(priority, std::make_unique<DefaultJobWorker>(
                                     shared_from_this(), job_task_.get()));
  }
  return true;
}
179
// Blocks (with |mutex_| held, dropping it while waiting) until the calling
// worker may participate, i.e. the worker count no longer exceeds max
// concurrency. Returns true if the caller can run; false if the job is done,
// in which case the job is marked canceled and the caller is released.
bool DefaultJobState::WaitForParticipationOpportunityLockRequired() {
  // active_workers_ - 1: the caller itself should not count against the cap.
  size_t max_concurrency = CappedMaxConcurrency(active_workers_ - 1);
  while (active_workers_ > max_concurrency && active_workers_ > 1) {
    worker_released_condition_.Wait(&mutex_);
    // Re-query after each wakeup: GetMaxConcurrency() may have changed.
    max_concurrency = CappedMaxConcurrency(active_workers_ - 1);
  }
  if (active_workers_ <= max_concurrency) return true;
  // The caller is the last worker and there is no work left: finish the job.
  DCHECK_EQ(1U, active_workers_);
  DCHECK_EQ(0U, max_concurrency);
  active_workers_ = 0;
  is_canceled_.store(true, std::memory_order_relaxed);
  return false;
}
193
CappedMaxConcurrency(size_t worker_count) const194 size_t DefaultJobState::CappedMaxConcurrency(size_t worker_count) const {
195 return std::min(job_task_->GetMaxConcurrency(worker_count),
196 num_worker_threads_);
197 }
198
CallOnWorkerThread(TaskPriority priority,std::unique_ptr<Task> task)199 void DefaultJobState::CallOnWorkerThread(TaskPriority priority,
200 std::unique_ptr<Task> task) {
201 switch (priority) {
202 case TaskPriority::kBestEffort:
203 return platform_->CallLowPriorityTaskOnWorkerThread(std::move(task));
204 case TaskPriority::kUserVisible:
205 return platform_->CallOnWorkerThread(std::move(task));
206 case TaskPriority::kUserBlocking:
207 return platform_->CallBlockingTaskOnWorkerThread(std::move(task));
208 }
209 }
210
// Changes the priority used for subsequently posted worker tasks. Does not
// affect tasks already posted.
void DefaultJobState::UpdatePriority(TaskPriority priority) {
  base::MutexGuard guard(&mutex_);
  priority_ = priority;
}
215
// Takes shared ownership of the job state and immediately posts the initial
// batch of worker tasks.
DefaultJobHandle::DefaultJobHandle(std::shared_ptr<DefaultJobState> state)
    : state_(std::move(state)) {
  state_->NotifyConcurrencyIncrease();
}
220
~DefaultJobHandle()221 DefaultJobHandle::~DefaultJobHandle() { DCHECK_EQ(nullptr, state_); }
222
Join()223 void DefaultJobHandle::Join() {
224 state_->Join();
225 state_ = nullptr;
226 }
Cancel()227 void DefaultJobHandle::Cancel() {
228 state_->CancelAndWait();
229 state_ = nullptr;
230 }
231
CancelAndDetach()232 void DefaultJobHandle::CancelAndDetach() {
233 state_->CancelAndDetach();
234 state_ = nullptr;
235 }
236
IsActive()237 bool DefaultJobHandle::IsActive() { return state_->IsActive(); }
238
// Forwards the new priority to the shared job state.
void DefaultJobHandle::UpdatePriority(TaskPriority priority) {
  state_->UpdatePriority(priority);
}
242
243 } // namespace platform
244 } // namespace v8
245