• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023-2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "libpandabase/taskmanager/task_scheduler.h"
17 #include "libpandabase/utils/logger.h"
18 
19 namespace ark::taskmanager {
20 
21 TaskScheduler *TaskScheduler::instance_ = nullptr;
22 
TaskScheduler(size_t workersCount,TaskTimeStatsType taskTimeStatsType)23 TaskScheduler::TaskScheduler(size_t workersCount, TaskTimeStatsType taskTimeStatsType)
24     : workersCount_(workersCount), taskTimeStatsType_(taskTimeStatsType), selector_(taskQueues_)
25 {
26     switch (taskTimeStatsType) {
27         case TaskTimeStatsType::LIGHT_STATISTICS:
28             taskTimeStats_ = new internal::LightTaskTimeTimeStats(workersCount);
29             break;
30         case TaskTimeStatsType::NO_STATISTICS:
31             break;
32         default:
33             UNREACHABLE();
34     }
35 }
36 
37 /* static */
Create(size_t threadsCount,TaskTimeStatsType taskTimeStatsType)38 TaskScheduler *TaskScheduler::Create(size_t threadsCount, TaskTimeStatsType taskTimeStatsType)
39 {
40     ASSERT(instance_ == nullptr);
41     ASSERT(threadsCount > 0);
42     instance_ = new TaskScheduler(threadsCount, taskTimeStatsType);
43     return instance_;
44 }
45 
46 /* static */
GetTaskScheduler()47 TaskScheduler *TaskScheduler::GetTaskScheduler()
48 {
49     return instance_;
50 }
51 
52 /* static */
Destroy()53 void TaskScheduler::Destroy()
54 {
55     ASSERT(instance_ != nullptr);
56     delete instance_;
57     instance_ = nullptr;
58 }
59 
RegisterQueue(internal::SchedulableTaskQueueInterface * queue)60 TaskQueueId TaskScheduler::RegisterQueue(internal::SchedulableTaskQueueInterface *queue)
61 {
62     os::memory::LockHolder lockHolder(taskSchedulerStateLock_);
63     ASSERT(!start_);
64     LOG(DEBUG, TASK_MANAGER) << "TaskScheduler: Register task queue with {" << queue->GetTaskType() << ", "
65                              << queue->GetVMType() << "}";
66     TaskQueueId id(queue->GetTaskType(), queue->GetVMType());
67     if (UNLIKELY(taskQueues_.find(id) != taskQueues_.end())) {
68         return INVALID_TASKQUEUE_ID;
69     }
70     taskQueues_[id] = queue;
71     queue->SetCallbacks(
72         [this](TaskProperties properties, size_t count) { this->IncrementCounterOfAddedTasks(properties, count); },
73         [this]() { this->SignalWorkers(); });
74 
75     // init countOfTasksIsSystem_ for possible task from registered queue
76     countOfTasksInSystem_[{queue->GetTaskType(), queue->GetVMType(), TaskExecutionMode::FOREGROUND}] = 0U;
77     countOfTasksInSystem_[{queue->GetTaskType(), queue->GetVMType(), TaskExecutionMode::BACKGROUND}] = 0U;
78 
79     return id;
80 }
81 
Initialize()82 void TaskScheduler::Initialize()
83 {
84     ASSERT(!start_);
85     selector_.Init();
86     start_ = true;
87     LOG(DEBUG, TASK_MANAGER) << "TaskScheduler: creates " << workersCount_ << " threads";
88     // Starts all workers
89     for (size_t i = 0; i < workersCount_; i++) {
90         workers_.push_back(new WorkerThread("TSWorker_" + std::to_string(i + 1UL)));
91         LOG(DEBUG, TASK_MANAGER) << "TaskScheduler: created thread with name " << workers_.back()->GetWorkerName();
92     }
93     // Set names of workers and get them info about other ones
94     for (auto *worker : workers_) {
95         worker->RegisterAllWorkersInLocalQueue(workers_);
96     }
97     // Start worker loop executing
98     for (auto *worker : workers_) {
99         worker->Start();
100     }
101     // Atomic with release order reason: other thread should see last value
102     disableHelpers_.store(false, std::memory_order_release);
103 }
104 
StealTaskFromOtherWorker(WorkerThread * taskReceiver)105 void TaskScheduler::StealTaskFromOtherWorker(WorkerThread *taskReceiver)
106 {
107     // WorkerThread tries to find Worker with the most tasks
108     auto chosenWorker =
109         *std::max_element(workers_.begin(), workers_.end(),
110                           [](const WorkerThread *lv, const WorkerThread *rv) { return lv->Size() < rv->Size(); });
111     if (chosenWorker->Size() == 0) {
112         return;
113     }
114     // If worker was successfully found, steals task from its local queue
115     chosenWorker->GiveTasksToAnotherWorker(
116         [taskReceiver](Task &&task) { taskReceiver->SetStolenTask(std::move(task)); }, 1UL,
117         chosenWorker->GetLocalWorkerQueuePopId(taskReceiver));
118 }
119 
FillWithTasks(WorkerThread * worker)120 bool TaskScheduler::FillWithTasks(WorkerThread *worker)
121 {
122     ASSERT(start_);
123     std::queue<Task> readyTasks;
124     {
125         os::memory::LockHolder lockHolder(taskSchedulerStateLock_);
126         // We increment counter of waiters to signal them in future
127         if (AreQueuesEmpty() && !waitList_.HaveReadyValue()) {
128             if (!WaitUntilNewTasks()) {
129                 return true;
130             }
131         }
132         if (!waitList_.HaveReadyValue()) {
133             // Use selector to choose next queue to pop task
134             auto selectedQueue = selector_.SelectQueue();
135             ASSERT(selectedQueue != INVALID_TASKQUEUE_ID);
136             // Getting task from selected queue
137             PutTasksInWorker(worker, selectedQueue);
138             return false;
139         }
140         // Wait list have task that should be added in queues. So worker firstly should add this task in local queue
141         // under lock-holder. Next in should add them in TaskQueues
142         PutWaitTaskInLocalQueue(readyTasks);
143     }
144     // Worker puts tasks to TaskQueues without lock-holder to avoid unnecessary waiting of other workers
145     PutTaskInTaskQueues(readyTasks);
146     return false;
147 }
148 
WaitUntilNewTasks()149 bool TaskScheduler::WaitUntilNewTasks()
150 {
151     // Atomic with acq_rel order reason: sync for counter
152     waitWorkersCount_.fetch_add(1, std::memory_order_acq_rel);
153     while (AreQueuesEmpty() && !waitList_.HaveReadyValue() && !finish_) {
154         queuesWaitCondVar_.TimedWait(&taskSchedulerStateLock_, TASK_WAIT_TIMEOUT);
155     }
156     // Atomic with acq_rel order reason: sync for counter
157     waitWorkersCount_.fetch_sub(1, std::memory_order_acq_rel);
158     return !finish_;
159 }
160 
PutTasksInWorker(WorkerThread * worker,TaskQueueId selectedQueue)161 size_t TaskScheduler::PutTasksInWorker(WorkerThread *worker, TaskQueueId selectedQueue)
162 {
163     auto addTaskFunc = [worker](Task &&task) { worker->AddTask(std::move(task)); };
164     auto queue = taskQueues_[selectedQueue];
165 
166     // Now we calc how many task we want to get from queue. If there are few tasks, then we want them to be evenly
167     // distributed among the workers.
168     size_t size = queue->Size();
169     size_t countToGet = size / workers_.size();
170     countToGet = (countToGet >= WorkerThread::WORKER_QUEUE_SIZE) ? WorkerThread::WORKER_QUEUE_SIZE
171                  : (size % workers_.size() == 0)                 ? countToGet
172                                                                  : countToGet + 1;
173     // Firstly we use method to delete retired ptrs
174     worker->TryDeleteRetiredPtrs();
175     // Execute popping task form queue
176     size_t queueTaskCount = queue->PopTasksToWorker(addTaskFunc, countToGet);
177     LOG(DEBUG, TASK_MANAGER) << worker->GetWorkerName() << ": get tasks " << queueTaskCount << "; ";
178     return queueTaskCount;
179 }
180 
AreQueuesEmpty() const181 bool TaskScheduler::AreQueuesEmpty() const
182 {
183     for ([[maybe_unused]] const auto &[id, queue] : taskQueues_) {
184         ASSERT(queue != nullptr);
185         if (!queue->IsEmpty()) {
186             return false;
187         }
188     }
189     return true;
190 }
191 
AreWorkersEmpty() const192 bool TaskScheduler::AreWorkersEmpty() const
193 {
194     for (auto *worker : workers_) {
195         if (!worker->IsEmpty()) {
196             return false;
197         }
198     }
199     return true;
200 }
201 
AreNoMoreTasks() const202 bool TaskScheduler::AreNoMoreTasks() const
203 {
204     return GetCountOfTasksInSystem() == 0;
205 }
206 
HelpWorkersWithTasks(TaskProperties properties)207 size_t TaskScheduler::HelpWorkersWithTasks(TaskProperties properties)
208 {
209     // Atomic with acquire order reason: getting correct value
210     if (disableHelpers_.load(std::memory_order_acquire)) {
211         return 0;
212     }
213     size_t executedTasksCount = 0;
214     auto *queue = GetQueue({properties.GetTaskType(), properties.GetVMType()});
215     if (queue->HasTaskWithExecutionMode(properties.GetTaskExecutionMode())) {
216         executedTasksCount = GetAndExecuteSetOfTasksFromQueue(properties);
217     } else if (LIKELY(!workers_.empty())) {
218         executedTasksCount = StealAndExecuteOneTaskFromWorkers(properties);
219     }
220     if (UNLIKELY(executedTasksCount == 0)) {
221         LOG(DEBUG, TASK_MANAGER) << "Helper: got no tasks;";
222         return 0;
223     }
224     LOG(DEBUG, TASK_MANAGER) << "Helper: executed tasks: " << executedTasksCount << ";";
225     DecrementCountOfTasksInSystem(properties, executedTasksCount);
226 
227     // Atomic with acquire order reason: get correct value
228     auto waitToFinish = waitToFinish_.load(std::memory_order_acquire);
229     if (waitToFinish > 0 && GetCountOfTasksInSystemWithTaskProperties(properties) == 0) {
230         os::memory::LockHolder taskManagerLockHolder(taskSchedulerStateLock_);
231         finishTasksCondVar_.SignalAll();
232     }
233     return executedTasksCount;
234 }
235 
GetAndExecuteSetOfTasksFromQueue(TaskProperties properties)236 size_t TaskScheduler::GetAndExecuteSetOfTasksFromQueue(TaskProperties properties)
237 {
238     auto *queue = GetQueue({properties.GetTaskType(), properties.GetVMType()});
239     if (queue->IsEmpty()) {
240         return 0;
241     }
242 
243     std::queue<Task> taskQueue;
244     size_t realCount = 0;
245     {
246         os::memory::LockHolder lockHolder(taskSchedulerStateLock_);
247         size_t size = queue->CountOfTasksWithExecutionMode(properties.GetTaskExecutionMode());
248         size_t countToGet = size / (workers_.size() + 1);
249         countToGet = (countToGet >= WorkerThread::WORKER_QUEUE_SIZE) ? WorkerThread::WORKER_QUEUE_SIZE
250                      : (size % (workers_.size() + 1) == 0)           ? countToGet
251                                                                      : countToGet + 1;
252         realCount = queue->PopTasksToHelperThread([&taskQueue](Task &&task) { taskQueue.push(std::move(task)); },
253                                                   countToGet, properties.GetTaskExecutionMode());
254     }
255     while (!taskQueue.empty()) {
256         taskQueue.front().RunTask();
257         taskQueue.pop();
258     }
259     return realCount;
260 }
261 
StealAndExecuteOneTaskFromWorkers(TaskProperties properties)262 size_t TaskScheduler::StealAndExecuteOneTaskFromWorkers(TaskProperties properties)
263 {
264     ASSERT(!workers_.empty());
265     std::queue<Task> taskQueue;
266     auto addTaskToQueue = [&taskQueue](Task &&task) { taskQueue.push(std::move(task)); };
267     auto chosenWorker = *std::max_element(
268         workers_.begin(), workers_.end(), [&properties](const WorkerThread *lv, const WorkerThread *rv) {
269             return lv->CountOfTasksWithProperties(properties) < rv->CountOfTasksWithProperties(properties);
270         });
271     if (chosenWorker->CountOfTasksWithProperties(properties) == 0) {
272         return 0;
273     }
274 
275     auto stolen = chosenWorker->GiveTasksToAnotherWorker(addTaskToQueue, 1UL,
276                                                          chosenWorker->GetLocalWorkerQueueSchedulerPopId(), properties);
277 
278     while (!taskQueue.empty()) {
279         taskQueue.front().RunTask();
280         taskQueue.pop();
281     }
282     return stolen;
283 }
284 
WaitForFinishAllTasksWithProperties(TaskProperties properties)285 void TaskScheduler::WaitForFinishAllTasksWithProperties(TaskProperties properties)
286 {
287     os::memory::LockHolder lockHolder(taskSchedulerStateLock_);
288     // Atomic with acq_rel order reason: other thread should see correct value
289     waitToFinish_.fetch_add(1, std::memory_order_acq_rel);
290     while (GetCountOfTasksInSystemWithTaskProperties(properties) != 0) {
291         finishTasksCondVar_.Wait(&taskSchedulerStateLock_);
292     }
293     // Atomic with acq_rel order reason: other thread should see correct value
294     waitToFinish_.fetch_sub(1, std::memory_order_acq_rel);
295 }
296 
Finalize()297 void TaskScheduler::Finalize()
298 {
299     ASSERT(start_);
300     {
301         // Wait all tasks will be done
302         os::memory::LockHolder lockHolder(taskSchedulerStateLock_);
303         // Atomic with acq_rel order reason: other thread should
304         // see correct value
305         waitToFinish_.fetch_add(1, std::memory_order_acq_rel);
306         while (!AreNoMoreTasks()) {
307             finishTasksCondVar_.Wait(&taskSchedulerStateLock_);
308         }
309         finish_ = true;
310         // Atomic with release order reason: other thread should see last value
311         disableHelpers_.store(true, std::memory_order_release);
312         // Atomic with acq_rel order reason: other thread should
313         // see correct value
314         waitToFinish_.fetch_sub(1, std::memory_order_acq_rel);
315         queuesWaitCondVar_.SignalAll();
316     }
317     for (auto *worker : workers_) {
318         worker->Join();
319     }
320     for (auto *worker : workers_) {
321         delete worker;
322     }
323 
324     if (IsTaskLifetimeStatisticsUsed()) {
325         for (const auto &line : taskTimeStats_->GetTaskStatistics()) {
326             LOG(INFO, TASK_MANAGER) << line;
327         }
328     }
329     LOG(DEBUG, TASK_MANAGER) << "TaskScheduler: Finalized";
330 }
331 
IncrementCounterOfAddedTasks(TaskProperties properties,size_t ivalue)332 void TaskScheduler::IncrementCounterOfAddedTasks(TaskProperties properties, size_t ivalue)
333 {
334     IncrementCountOfTasksInSystem(properties, ivalue);
335 }
336 
IncrementCounterOfExecutedTasks(const TaskPropertiesCounterMap & counterMap)337 size_t TaskScheduler::IncrementCounterOfExecutedTasks(const TaskPropertiesCounterMap &counterMap)
338 {
339     size_t countOfTasks = 0;
340     for (const auto &[properties, count] : counterMap) {
341         countOfTasks += count;
342         DecrementCountOfTasksInSystem(properties, count);
343         // Atomic with acquire order reason: get correct value
344         auto waitToFinish = waitToFinish_.load(std::memory_order_acquire);
345         if (waitToFinish > 0 && GetCountOfTasksInSystemWithTaskProperties(properties) == 0) {
346             os::memory::LockHolder outsideLockHolder(taskSchedulerStateLock_);
347             finishTasksCondVar_.SignalAll();
348         }
349     }
350     return countOfTasks;
351 }
352 
SignalWorkers()353 void TaskScheduler::SignalWorkers()
354 {
355     // Atomic with acquire order reason: get correct value
356     if (waitWorkersCount_.load(std::memory_order_acquire) > 0) {
357         os::memory::LockHolder outsideLockHolder(taskSchedulerStateLock_);
358         queuesWaitCondVar_.Signal();
359     }
360 }
361 
GetQueue(TaskQueueId id) const362 internal::SchedulableTaskQueueInterface *TaskScheduler::GetQueue(TaskQueueId id) const
363 {
364     internal::SchedulableTaskQueueInterface *queue = nullptr;
365     auto taskQueuesIterator = taskQueues_.find(id);
366     if (taskQueuesIterator == taskQueues_.end()) {
367         LOG(FATAL, COMMON) << "Attempt to take a task from a non-existent queue";
368     }
369     std::tie(std::ignore, queue) = *taskQueuesIterator;
370     return queue;
371 }
372 
AddTaskToWaitListWithTimeout(Task && task,uint64_t time)373 WaiterId TaskScheduler::AddTaskToWaitListWithTimeout(Task &&task, uint64_t time)
374 {
375     os::memory::LockHolder lockHolder(taskSchedulerStateLock_);
376     this->IncrementCounterOfAddedTasks(task.GetTaskProperties(), 1U);
377     return waitList_.AddValueToWait(std::move(task), time);
378 }
379 
AddTaskToWaitList(Task && task)380 WaiterId TaskScheduler::AddTaskToWaitList(Task &&task)
381 {
382     // Use adding with max time as possible, wait list will understand that it should set max possible time
383     return AddTaskToWaitListWithTimeout(std::move(task), std::numeric_limits<uint64_t>().max());
384 }
385 
PutWaitTaskInLocalQueue(LocalTaskQueue & queue)386 void TaskScheduler::PutWaitTaskInLocalQueue(LocalTaskQueue &queue)
387 {
388     for (auto task = waitList_.GetReadyValue(); task.has_value(); task = waitList_.GetReadyValue()) {
389         queue.push(std::move(task.value()));
390     }
391 }
392 
PutTaskInTaskQueues(LocalTaskQueue & queue)393 void TaskScheduler::PutTaskInTaskQueues(LocalTaskQueue &queue)
394 {
395     while (!queue.empty()) {
396         Task task = std::move(queue.front());
397         queue.pop();
398         auto prop = task.GetTaskProperties();
399         auto *taskQueue = GetQueue({prop.GetTaskType(), prop.GetVMType()});
400         taskQueue->AddTaskWithoutNewTaskCallbackExecution(std::move(task));
401     }
402 }
403 
SignalWaitList(WaiterId waiterId)404 void TaskScheduler::SignalWaitList(WaiterId waiterId)
405 {
406     std::optional<Task> task;
407     {
408         os::memory::LockHolder lockHolder(taskSchedulerStateLock_);
409         task = waitList_.GetValueById(waiterId);
410     }
411     if (!task.has_value()) {
412         return;
413     }
414     auto prop = task->GetTaskProperties();
415     auto *queue = GetQueue({prop.GetTaskType(), prop.GetVMType()});
416     queue->AddTaskWithoutNewTaskCallbackExecution(std::move(task.value()));
417 }
418 
~TaskScheduler()419 TaskScheduler::~TaskScheduler()
420 {
421     // We can delete TaskScheduler if it wasn't started or it was finished
422     ASSERT(start_ == finish_);
423     // Check if all task queue was deleted
424     ASSERT(taskQueues_.empty());
425     delete taskTimeStats_;
426 }
427 
IncrementCountOfTasksInSystem(TaskProperties prop,size_t count)428 void TaskScheduler::IncrementCountOfTasksInSystem(TaskProperties prop, size_t count)
429 {
430     // Atomic with acq_rel order reason: fast add count to countOfTasksInSystem_[prop]
431     countOfTasksInSystem_[prop].fetch_add(count, std::memory_order_acq_rel);
432 }
433 
DecrementCountOfTasksInSystem(TaskProperties prop,size_t count)434 void TaskScheduler::DecrementCountOfTasksInSystem(TaskProperties prop, size_t count)
435 {
436     // Atomic with acq_rel order reason: fast sub count to countOfTasksInSystem_[prop]
437     countOfTasksInSystem_[prop].fetch_sub(count, std::memory_order_acq_rel);
438 }
439 
GetCountOfTasksInSystemWithTaskProperties(TaskProperties prop) const440 size_t TaskScheduler::GetCountOfTasksInSystemWithTaskProperties(TaskProperties prop) const
441 {
442     // Atomic with acquire order reason: need to sync with all prev fetch_adds and fetch_subs
443     return countOfTasksInSystem_.at(prop).load(std::memory_order_acquire);
444 }
445 
GetCountOfTasksInSystem() const446 size_t TaskScheduler::GetCountOfTasksInSystem() const
447 {
448     size_t sumCount = 0;
449     for ([[maybe_unused]] const auto &[prop, counter] : countOfTasksInSystem_) {
450         // Atomic with acquire order reason: need to sync with all prev fetch_adds and fetch_subs
451         sumCount += counter.load(std::memory_order_acquire);
452     }
453     return sumCount;
454 }
455 
IsTaskLifetimeStatisticsUsed() const456 bool TaskScheduler::IsTaskLifetimeStatisticsUsed() const
457 {
458     return taskTimeStatsType_ != TaskTimeStatsType::NO_STATISTICS;
459 }
460 
GetTaskTimeStats() const461 TaskTimeStatsBase *TaskScheduler::GetTaskTimeStats() const
462 {
463     return taskTimeStats_;
464 }
465 
466 }  // namespace ark::taskmanager
467