// Copyright 2020 the V8 project authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "src/libplatform/default-job.h"

#include "src/base/bits.h"
#include "src/base/macros.h"

namespace v8 {
namespace platform {
namespace {

// Capped to allow assigning task_ids from a bitfield.
constexpr size_t kMaxWorkersPerJob = 32;

}  // namespace

DefaultJobState::JobDelegate::~JobDelegate() {
  static_assert(kInvalidTaskId >= kMaxWorkersPerJob,
                "kInvalidTaskId must be outside of the range of valid task_ids "
                "[0, kMaxWorkersPerJob)");
  if (task_id_ != kInvalidTaskId) outer_->ReleaseTaskId(task_id_);
}

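// Returns this worker's task id, lazily acquiring one from |outer_| on first
// use. The id is released again in ~JobDelegate() above.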
uint8_t DefaultJobState::JobDelegate::GetTaskId() {
  if (task_id_ == kInvalidTaskId) task_id_ = outer_->AcquireTaskId();
  return task_id_;
}

DefaultJobState::DefaultJobState(Platform* platform,
                                 std::unique_ptr<JobTask> job_task,
                                 TaskPriority priority,
                                 size_t num_worker_threads)
    : platform_(platform),
      job_task_(std::move(job_task)),
      priority_(priority),
      num_worker_threads_(std::min(num_worker_threads, kMaxWorkersPerJob)) {}

DefaultJobState::~DefaultJobState() { DCHECK_EQ(0U, active_workers_); }

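// Posts enough additional worker tasks to bring the number of active plus
// pending workers up to the (capped) max concurrency reported by the JobTask.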
void DefaultJobState::NotifyConcurrencyIncrease() {
  if (is_canceled_.load(std::memory_order_relaxed)) return;

  size_t num_tasks_to_post = 0;
  TaskPriority priority;
  {
    base::MutexGuard guard(&mutex_);
    const size_t max_concurrency = CappedMaxConcurrency(active_workers_);
    // Consider |pending_tasks_| to avoid posting too many tasks.
    if (max_concurrency > (active_workers_ + pending_tasks_)) {
      num_tasks_to_post = max_concurrency - active_workers_ - pending_tasks_;
      pending_tasks_ += num_tasks_to_post;
    }
    priority = priority_;
  }
  // Post additional worker tasks to reach |max_concurrency|.
  for (size_t i = 0; i < num_tasks_to_post; ++i) {
    CallOnWorkerThread(priority, std::make_unique<DefaultJobWorker>(
                                     shared_from_this(), job_task_.get()));
  }
}

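// Allocates the lowest free task id from a 32-bit bitfield without taking a
// lock, via a compare-and-swap loop.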
uint8_t DefaultJobState::AcquireTaskId() {
  static_assert(kMaxWorkersPerJob <= sizeof(assigned_task_ids_) * 8,
                "TaskId bitfield isn't big enough to fit kMaxWorkersPerJob.");
  uint32_t assigned_task_ids =
      assigned_task_ids_.load(std::memory_order_relaxed);
  DCHECK_LE(v8::base::bits::CountPopulation(assigned_task_ids) + 1,
            kMaxWorkersPerJob);
  uint32_t new_assigned_task_ids = 0;
  uint8_t task_id = 0;
  // memory_order_acquire on success, matched with memory_order_release in
  // ReleaseTaskId() so that operations done by previous threads that had
  // the same task_id become visible to the current thread.
  do {
    // Count trailing one bits. This is the id of the right-most 0-bit in
    // |assigned_task_ids|.
    task_id = v8::base::bits::CountTrailingZeros32(~assigned_task_ids);
    new_assigned_task_ids = assigned_task_ids | (uint32_t(1) << task_id);
  } while (!assigned_task_ids_.compare_exchange_weak(
      assigned_task_ids, new_assigned_task_ids, std::memory_order_acquire,
      std::memory_order_relaxed));
  return task_id;
}

void DefaultJobState::ReleaseTaskId(uint8_t task_id) {
  // memory_order_release to match AcquireTaskId().
  uint32_t previous_task_ids = assigned_task_ids_.fetch_and(
      ~(uint32_t(1) << task_id), std::memory_order_release);
  DCHECK(previous_task_ids & (uint32_t(1) << task_id));
  USE(previous_task_ids);
}

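// Contributes to the job on the calling thread: the joining thread counts as
// an extra worker and keeps running the JobTask until no work remains.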
void DefaultJobState::Join() {
  bool can_run = false;
  {
    base::MutexGuard guard(&mutex_);
    priority_ = TaskPriority::kUserBlocking;
    // Reserve a worker for the joining thread. GetMaxConcurrency() is ignored
    // here, but WaitForParticipationOpportunityLockRequired() waits for
    // workers to return if necessary so we don't exceed GetMaxConcurrency().
    num_worker_threads_ = platform_->NumberOfWorkerThreads() + 1;
    ++active_workers_;
    can_run = WaitForParticipationOpportunityLockRequired();
  }
  DefaultJobState::JobDelegate delegate(this, true);
  while (can_run) {
    job_task_->Run(&delegate);
    base::MutexGuard guard(&mutex_);
    can_run = WaitForParticipationOpportunityLockRequired();
  }
}

void DefaultJobState::CancelAndWait() {
  {
    base::MutexGuard guard(&mutex_);
    is_canceled_.store(true, std::memory_order_relaxed);
    while (active_workers_ > 0) {
      worker_released_condition_.Wait(&mutex_);
    }
  }
}

void DefaultJobState::CancelAndDetach() {
  base::MutexGuard guard(&mutex_);
  is_canceled_.store(true, std::memory_order_relaxed);
}

bool DefaultJobState::IsActive() {
  base::MutexGuard guard(&mutex_);
  return job_task_->GetMaxConcurrency(active_workers_) != 0 ||
         active_workers_ != 0;
}

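// Called by a worker task before its first Run(). Returns false if the worker
// should exit immediately because the job was canceled or concurrency is
// already saturated; otherwise registers the worker as active.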
bool DefaultJobState::CanRunFirstTask() {
  base::MutexGuard guard(&mutex_);
  --pending_tasks_;
  if (is_canceled_.load(std::memory_order_relaxed)) return false;
  if (active_workers_ >= std::min(job_task_->GetMaxConcurrency(active_workers_),
                                  num_worker_threads_)) {
    return false;
  }
  // Acquire current worker.
  ++active_workers_;
  return true;
}

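// Called by a worker task after each Run(). Returns true if the worker should
// call Run() again; returns false (releasing the worker) once the job is
// canceled or max concurrency has dropped below the active worker count.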
bool DefaultJobState::DidRunTask() {
  size_t num_tasks_to_post = 0;
  TaskPriority priority;
  {
    base::MutexGuard guard(&mutex_);
    const size_t max_concurrency = CappedMaxConcurrency(active_workers_ - 1);
    if (is_canceled_.load(std::memory_order_relaxed) ||
        active_workers_ > max_concurrency) {
      // Release current worker and notify.
      --active_workers_;
      worker_released_condition_.NotifyOne();
      return false;
    }
    // Consider |pending_tasks_| to avoid posting too many tasks.
    if (max_concurrency > active_workers_ + pending_tasks_) {
      num_tasks_to_post = max_concurrency - active_workers_ - pending_tasks_;
      pending_tasks_ += num_tasks_to_post;
    }
    priority = priority_;
  }
  // Post additional worker tasks to reach |max_concurrency| in the case that
  // max concurrency increased. This is not strictly necessary, since
  // NotifyConcurrencyIncrease() should eventually be invoked. However, some
  // users of PostJob() batch work and tend to call NotifyConcurrencyIncrease()
  // late. Posting here allows us to spawn new workers sooner.
  for (size_t i = 0; i < num_tasks_to_post; ++i) {
    CallOnWorkerThread(priority, std::make_unique<DefaultJobWorker>(
                                     shared_from_this(), job_task_.get()));
  }
  return true;
}

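// Blocks until the calling (joining) thread may participate without exceeding
// max concurrency, returning true. If the job instead runs out of work
// entirely, marks the job canceled and returns false. Must be called with
// |mutex_| held.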
bool DefaultJobState::WaitForParticipationOpportunityLockRequired() {
  size_t max_concurrency = CappedMaxConcurrency(active_workers_ - 1);
  while (active_workers_ > max_concurrency && active_workers_ > 1) {
    worker_released_condition_.Wait(&mutex_);
    max_concurrency = CappedMaxConcurrency(active_workers_ - 1);
  }
  if (active_workers_ <= max_concurrency) return true;
  DCHECK_EQ(1U, active_workers_);
  DCHECK_EQ(0U, max_concurrency);
  active_workers_ = 0;
  is_canceled_.store(true, std::memory_order_relaxed);
  return false;
}

size_t DefaultJobState::CappedMaxConcurrency(size_t worker_count) const {
  return std::min(job_task_->GetMaxConcurrency(worker_count),
                  num_worker_threads_);
}

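// Dispatches |task| to the platform's worker-thread pool that matches
// |priority|.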
void DefaultJobState::CallOnWorkerThread(TaskPriority priority,
                                         std::unique_ptr<Task> task) {
  switch (priority) {
    case TaskPriority::kBestEffort:
      return platform_->CallLowPriorityTaskOnWorkerThread(std::move(task));
    case TaskPriority::kUserVisible:
      return platform_->CallOnWorkerThread(std::move(task));
    case TaskPriority::kUserBlocking:
      return platform_->CallBlockingTaskOnWorkerThread(std::move(task));
  }
}

void DefaultJobState::UpdatePriority(TaskPriority priority) {
  base::MutexGuard guard(&mutex_);
  priority_ = priority;
}

DefaultJobHandle::DefaultJobHandle(std::shared_ptr<DefaultJobState> state)
    : state_(std::move(state)) {
  state_->NotifyConcurrencyIncrease();
}

DefaultJobHandle::~DefaultJobHandle() { DCHECK_EQ(nullptr, state_); }

void DefaultJobHandle::Join() {
  state_->Join();
  state_ = nullptr;
}

void DefaultJobHandle::Cancel() {
  state_->CancelAndWait();
  state_ = nullptr;
}

void DefaultJobHandle::CancelAndDetach() {
  state_->CancelAndDetach();
  state_ = nullptr;
}

bool DefaultJobHandle::IsActive() { return state_->IsActive(); }

void DefaultJobHandle::UpdatePriority(TaskPriority priority) {
  state_->UpdatePriority(priority);
}

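// Usage sketch: how a Platform implementation might wire PostJob() to these
// classes. Illustrative only; it assumes a |platform| pointer and a
// caller-provided |job_task|, and is not part of this file.
//
//   std::unique_ptr<JobHandle> PostJob(TaskPriority priority,
//                                      std::unique_ptr<JobTask> job_task) {
//     auto state = std::make_shared<DefaultJobState>(
//         platform, std::move(job_task), priority,
//         platform->NumberOfWorkerThreads());
//     return std::make_unique<DefaultJobHandle>(std::move(state));
//   }
//
// The returned handle must be Join()ed, Cancel()ed, or CancelAndDetach()ed
// before destruction, as enforced by the DCHECK in ~DefaultJobHandle().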
}  // namespace platform
}  // namespace v8