1 /*
2 * Copyright (c) 2023-2025 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #ifndef PANDA_LIBPANDABASE_TASKMANAGER_TASK_QUEUE_H
17 #define PANDA_LIBPANDABASE_TASKMANAGER_TASK_QUEUE_H
18
19 #include <atomic>
20 #include "libpandabase/os/mutex.h"
21 #include "libpandabase/taskmanager/schedulable_task_queue_interface.h"
22 #include "taskmanager/utils/task_time_stats.h"
23 #include "libpandabase/taskmanager/task.h"
24 #include "libpandabase/taskmanager/utils/two_lock_queue.h"
25
26 namespace ark::taskmanager::internal {
27
28 /**
29 * @brief TaskQueue is a thread-safe queue for tasks. Queues can be registered in TaskScheduler and used to execute
30 * tasks on workers. Also, queues can notify other threads when a new task is pushed.
31 * @tparam Allocator - allocator of Task that will be used in internal queues. By default is used
32 * std::allocator<Task>
33 */
34 template <class Allocator = std::allocator<Task>>
35 class TaskQueue : public SchedulableTaskQueueInterface {
36 using TaskAllocatorType = typename Allocator::template rebind<Task>::other;
37 using TaskQueueAllocatorType = typename Allocator::template rebind<TaskQueue<TaskAllocatorType>>::other;
38 template <class OtherAllocator>
39 friend class TaskQueue;
40
41 public:
42 NO_COPY_SEMANTIC(TaskQueue);
43 NO_MOVE_SEMANTIC(TaskQueue);
44
45 static PANDA_PUBLIC_API SchedulableTaskQueueInterface *Create(QueuePriority priority, TaskWaitList *waitList,
46 TaskTimeStatsBase *taskTimeStats);
47 static PANDA_PUBLIC_API void Destroy(SchedulableTaskQueueInterface *queue);
48
49 PANDA_PUBLIC_API size_t AddForegroundTask(RunnerCallback runner) override;
50 PANDA_PUBLIC_API size_t AddBackgroundTask(RunnerCallback runner) override;
51
52 PANDA_PUBLIC_API WaiterId AddForegroundTaskInWaitList(RunnerCallback runtime, uint64_t timeToWait) override;
53 PANDA_PUBLIC_API WaiterId AddBackgroundTaskInWaitList(RunnerCallback runtime, uint64_t timeToWait) override;
54
55 PANDA_PUBLIC_API WaiterId AddForegroundTaskInWaitList(RunnerCallback runtime) override;
56 PANDA_PUBLIC_API WaiterId AddBackgroundTaskInWaitList(RunnerCallback runtime) override;
57
58 PANDA_PUBLIC_API void SignalWaitList(WaiterId id) override;
59
60 [[nodiscard]] PANDA_PUBLIC_API bool IsEmpty() const override;
61 [[nodiscard]] PANDA_PUBLIC_API bool HasForegroundTasks() const override;
62 [[nodiscard]] PANDA_PUBLIC_API bool HasBackgroundTasks() const override;
63
64 [[nodiscard]] PANDA_PUBLIC_API size_t Size() const override;
65 [[nodiscard]] PANDA_PUBLIC_API size_t CountOfForegroundTasks() const override;
66 [[nodiscard]] PANDA_PUBLIC_API size_t CountOfBackgroundTasks() const override;
67
68 PANDA_PUBLIC_API size_t ExecuteTask() override;
69 PANDA_PUBLIC_API size_t ExecuteForegroundTask() override;
70 PANDA_PUBLIC_API size_t ExecuteBackgroundTask() override;
71
72 PANDA_PUBLIC_API void WaitTasks() override;
73 PANDA_PUBLIC_API void WaitForegroundTasks() override;
74 PANDA_PUBLIC_API void WaitBackgroundTasks() override;
75
76 [[nodiscard]] TaskPtr PopTask() override;
77 [[nodiscard]] TaskPtr PopForegroundTask() override;
78 [[nodiscard]] TaskPtr PopBackgroundTask() override;
79
80 size_t PopTasksToWorker(const AddTaskToWorkerFunc &addForegroundTaskFunc,
81 const AddTaskToWorkerFunc &addBackgroundTaskFunc, size_t size) override;
82 size_t PopForegroundTasksToHelperThread(const AddTaskToHelperFunc &addTaskFunc, size_t size) override;
83 size_t PopBackgroundTasksToHelperThread(const AddTaskToHelperFunc &addTaskFunc, size_t size) override;
84
85 size_t GetCountOfLiveTasks() const override;
86 size_t GetCountOfLiveForegroundTasks() const override;
87 size_t GetCountOfLiveBackgroundTasks() const override;
88
89 TaskTimeStatsBase *GetTaskTimeStats() const override;
90
91 void SetCallbacks(SignalWorkersCallback signalWorkersCallback) override;
92 void UnsetCallbacks() override;
93
94 private:
95 using InternalTaskQueue = TwoLockQueue<TaskPtr, TaskAllocatorType>;
96
97 PANDA_PUBLIC_API size_t AddForegroundTaskImpl(RunnerCallback &&runner);
98 PANDA_PUBLIC_API size_t AddBackgroundTaskImpl(RunnerCallback &&runner);
99
100 PANDA_PUBLIC_API size_t IncrementCountOfLiveForegroundTasks();
101 PANDA_PUBLIC_API size_t IncrementCountOfLiveBackgroundTasks();
102
103 static void OnForegroundTaskDestructionCallback(TaskQueueInterface *queue);
104 static void OnBackgroundTaskDestructionCallback(TaskQueueInterface *queue);
105
TaskQueue(QueuePriority priority,TaskWaitList * waitList,TaskTimeStatsBase * taskTimeStats)106 TaskQueue(QueuePriority priority, TaskWaitList *waitList, TaskTimeStatsBase *taskTimeStats)
107 : SchedulableTaskQueueInterface(priority), taskTimeStats_(taskTimeStats), waitList_(waitList)
108 {
109 }
~TaskQueue()110 PANDA_PUBLIC_API ~TaskQueue() override
111 {
112 ASSERT(foregroundTaskQueue_.IsEmpty() && backgroundTaskQueue_.IsEmpty());
113 }
114
115 /// subscriber_lock_ is used in case of calling new_tasks_callback_
116 SignalWorkersCallback signalWorkersCallback_;
117 TaskTimeStatsBase *taskTimeStats_ {nullptr};
118
119 os::memory::Mutex waitingMutex_;
120 os::memory::ConditionVariable waitingCondVar_;
121
122 /// foreground part of TaskQueue
123 std::atomic_size_t foregroundLiveTasks_ {0};
124 InternalTaskQueue foregroundTaskQueue_;
125 /// background part of TaskQueue
126 std::atomic_size_t backgroundLiveTasks_ {0};
127 InternalTaskQueue backgroundTaskQueue_;
128
129 TaskWaitList *waitList_ = nullptr;
130 };
131
132 template <class Allocator>
Create(QueuePriority priority,TaskWaitList * waitList,TaskTimeStatsBase * taskTimeStats)133 inline SchedulableTaskQueueInterface *TaskQueue<Allocator>::Create(QueuePriority priority, TaskWaitList *waitList,
134 TaskTimeStatsBase *taskTimeStats)
135 {
136 TaskQueueAllocatorType allocator;
137 auto *mem = allocator.allocate(1U);
138 return new (mem) TaskQueue<TaskAllocatorType>(priority, waitList, taskTimeStats);
139 }
140
141 template <class Allocator>
Destroy(SchedulableTaskQueueInterface * queue)142 inline void TaskQueue<Allocator>::Destroy(SchedulableTaskQueueInterface *queue)
143 {
144 TaskQueueAllocatorType allocator;
145 std::allocator_traits<TaskQueueAllocatorType>::destroy(allocator, queue);
146 allocator.deallocate(static_cast<TaskQueue<TaskAllocatorType> *>(queue), 1U);
147 }
148
149 template <class Allocator>
OnForegroundTaskDestructionCallback(TaskQueueInterface * queue)150 inline void TaskQueue<Allocator>::OnForegroundTaskDestructionCallback(TaskQueueInterface *queue)
151 {
152 auto iQueue = reinterpret_cast<TaskQueue *>(queue);
153 // Atomic with relaxed order reason: all non-atomic and relaxed stores will be see after waitingMutex_ getting
154 auto aliveTasks = iQueue->foregroundLiveTasks_.fetch_sub(1U, std::memory_order_relaxed);
155 if (aliveTasks == 1U) {
156 os::memory::LockHolder lh(iQueue->waitingMutex_);
157 iQueue->waitingCondVar_.SignalAll();
158 }
159 }
160
161 template <class Allocator>
OnBackgroundTaskDestructionCallback(TaskQueueInterface * queue)162 inline void TaskQueue<Allocator>::OnBackgroundTaskDestructionCallback(TaskQueueInterface *queue)
163 {
164 auto iQueue = reinterpret_cast<TaskQueue *>(queue);
165 // Atomic with relaxed order reason: all non-atomic and relaxed stores will be see after waitingMutex_ getting
166 auto aliveTasks = iQueue->backgroundLiveTasks_.fetch_sub(1U, std::memory_order_relaxed);
167 if (aliveTasks == 1U) {
168 os::memory::LockHolder lh(iQueue->waitingMutex_);
169 iQueue->waitingCondVar_.SignalAll();
170 }
171 }
172
173 template <class Allocator>
AddForegroundTask(RunnerCallback runner)174 inline size_t TaskQueue<Allocator>::AddForegroundTask(RunnerCallback runner)
175 {
176 IncrementCountOfLiveForegroundTasks();
177 return AddForegroundTaskImpl(std::move(runner));
178 }
179
180 template <class Allocator>
AddBackgroundTask(RunnerCallback runner)181 inline size_t TaskQueue<Allocator>::AddBackgroundTask(RunnerCallback runner)
182 {
183 IncrementCountOfLiveBackgroundTasks();
184 return AddBackgroundTaskImpl(std::move(runner));
185 }
186
187 template <class Allocator>
AddForegroundTaskInWaitList(RunnerCallback runner,uint64_t timeToWait)188 inline PANDA_PUBLIC_API WaiterId TaskQueue<Allocator>::AddForegroundTaskInWaitList(RunnerCallback runner,
189 uint64_t timeToWait)
190 {
191 IncrementCountOfLiveForegroundTasks();
192 auto waitListCallback = [this](RunnerCallback &&irunner) { AddForegroundTaskImpl(std::move(irunner)); };
193 return waitList_->AddValueToWait({std::move(runner), waitListCallback}, timeToWait);
194 }
195
196 template <class Allocator>
AddBackgroundTaskInWaitList(RunnerCallback runner,uint64_t timeToWait)197 inline PANDA_PUBLIC_API WaiterId TaskQueue<Allocator>::AddBackgroundTaskInWaitList(RunnerCallback runner,
198 uint64_t timeToWait)
199 {
200 IncrementCountOfLiveBackgroundTasks();
201 auto waitListCallback = [this](RunnerCallback &&irunner) { AddBackgroundTaskImpl(std::move(irunner)); };
202 return waitList_->AddValueToWait({std::move(runner), waitListCallback}, timeToWait);
203 }
204
205 template <class Allocator>
AddForegroundTaskInWaitList(RunnerCallback runner)206 inline PANDA_PUBLIC_API WaiterId TaskQueue<Allocator>::AddForegroundTaskInWaitList(RunnerCallback runner)
207 {
208 IncrementCountOfLiveForegroundTasks();
209 auto waitListCallback = [this](RunnerCallback &&irunner) { AddForegroundTaskImpl(std::move(irunner)); };
210 return waitList_->AddValueToWait({std::move(runner), waitListCallback});
211 }
212
213 template <class Allocator>
AddBackgroundTaskInWaitList(RunnerCallback runner)214 inline PANDA_PUBLIC_API WaiterId TaskQueue<Allocator>::AddBackgroundTaskInWaitList(RunnerCallback runner)
215 {
216 IncrementCountOfLiveBackgroundTasks();
217 auto waitListCallback = [this](RunnerCallback &&irunner) { AddBackgroundTaskImpl(std::move(irunner)); };
218 return waitList_->AddValueToWait({std::move(runner), waitListCallback});
219 }
220
221 template <class Allocator>
SignalWaitList(WaiterId id)222 void TaskQueue<Allocator>::SignalWaitList(WaiterId id)
223 {
224 auto waitVal = waitList_->GetValueById(id);
225 if (!waitVal.has_value()) {
226 return;
227 }
228 auto [task, taskPoster] = std::move(waitVal.value());
229 taskPoster(std::move(task));
230 }
231
232 template <class Allocator>
IsEmpty()233 inline bool TaskQueue<Allocator>::IsEmpty() const
234 {
235 return foregroundTaskQueue_.IsEmpty() && backgroundTaskQueue_.IsEmpty();
236 }
237
238 template <class Allocator>
HasForegroundTasks()239 inline bool TaskQueue<Allocator>::HasForegroundTasks() const
240 {
241 return !foregroundTaskQueue_.IsEmpty();
242 }
243
244 template <class Allocator>
HasBackgroundTasks()245 inline bool TaskQueue<Allocator>::HasBackgroundTasks() const
246 {
247 return !backgroundTaskQueue_.IsEmpty();
248 }
249
250 template <class Allocator>
Size()251 inline size_t TaskQueue<Allocator>::Size() const
252 {
253 return foregroundTaskQueue_.Size() + backgroundTaskQueue_.Size();
254 }
255
256 template <class Allocator>
CountOfForegroundTasks()257 inline size_t TaskQueue<Allocator>::CountOfForegroundTasks() const
258 {
259 return foregroundTaskQueue_.Size();
260 }
261
262 template <class Allocator>
CountOfBackgroundTasks()263 inline size_t TaskQueue<Allocator>::CountOfBackgroundTasks() const
264 {
265 return backgroundTaskQueue_.Size();
266 }
267
268 template <class Allocator>
ExecuteTask()269 inline size_t TaskQueue<Allocator>::ExecuteTask()
270 {
271 TaskPtr task = PopTask();
272 if (task == nullptr) {
273 return 0U;
274 }
275 task->RunTask();
276 return 1U;
277 }
278
279 template <class Allocator>
ExecuteForegroundTask()280 inline size_t TaskQueue<Allocator>::ExecuteForegroundTask()
281 {
282 TaskPtr task = PopForegroundTask();
283 if (task == nullptr) {
284 return 0U;
285 }
286 task->RunTask();
287 return 1U;
288 }
289
290 template <class Allocator>
ExecuteBackgroundTask()291 inline size_t TaskQueue<Allocator>::ExecuteBackgroundTask()
292 {
293 TaskPtr task = PopBackgroundTask();
294 if (task == nullptr) {
295 return 0U;
296 }
297 task->RunTask();
298 return 1U;
299 }
300
301 template <class Allocator>
WaitTasks()302 inline void TaskQueue<Allocator>::WaitTasks()
303 {
304 os::memory::LockHolder lh(waitingMutex_);
305 while (GetCountOfLiveBackgroundTasks() != 0 || GetCountOfLiveForegroundTasks() != 0) {
306 waitingCondVar_.Wait(&waitingMutex_);
307 }
308 }
309
310 template <class Allocator>
WaitForegroundTasks()311 inline void TaskQueue<Allocator>::WaitForegroundTasks()
312 {
313 os::memory::LockHolder lh(waitingMutex_);
314 while (GetCountOfLiveForegroundTasks() != 0) {
315 waitingCondVar_.Wait(&waitingMutex_);
316 }
317 }
318
319 template <class Allocator>
WaitBackgroundTasks()320 inline void TaskQueue<Allocator>::WaitBackgroundTasks()
321 {
322 os::memory::LockHolder lh(waitingMutex_);
323 while (GetCountOfLiveBackgroundTasks() != 0) {
324 waitingCondVar_.Wait(&waitingMutex_);
325 }
326 }
327
328 template <class Allocator>
PopTask()329 inline TaskPtr TaskQueue<Allocator>::PopTask()
330 {
331 TaskPtr task = nullptr;
332 if (foregroundTaskQueue_.TryPop(&task)) {
333 return task;
334 }
335 backgroundTaskQueue_.TryPop(&task);
336 return task;
337 }
338
339 template <class Allocator>
PopForegroundTask()340 inline TaskPtr TaskQueue<Allocator>::PopForegroundTask()
341 {
342 TaskPtr task = nullptr;
343 foregroundTaskQueue_.TryPop(&task);
344 return task;
345 }
346
347 template <class Allocator>
PopBackgroundTask()348 inline TaskPtr TaskQueue<Allocator>::PopBackgroundTask()
349 {
350 TaskPtr task = nullptr;
351 backgroundTaskQueue_.TryPop(&task);
352 return task;
353 }
354
355 template <class Allocator>
356 // CC-OFFNXT(G.FUD.06) Splitting this function will degrade readability. Keyword "inline" needs to satisfy ODR rule.
PopTasksToWorker(const AddTaskToWorkerFunc & addForegroundTaskFunc,const AddTaskToWorkerFunc & addBackgroundTaskFunc,size_t size)357 inline size_t TaskQueue<Allocator>::PopTasksToWorker(const AddTaskToWorkerFunc &addForegroundTaskFunc,
358 const AddTaskToWorkerFunc &addBackgroundTaskFunc, size_t size)
359 {
360 for (size_t i = 0; i < size; i++) {
361 TaskPtr task;
362 if (foregroundTaskQueue_.TryPop(&task)) {
363 addForegroundTaskFunc(std::move(task));
364 continue;
365 }
366 if (backgroundTaskQueue_.TryPop(&task)) {
367 addBackgroundTaskFunc(std::move(task));
368 continue;
369 }
370 return i;
371 }
372 return size;
373 }
374
375 template <class Allocator>
PopForegroundTasksToHelperThread(const AddTaskToHelperFunc & addTaskFunc,size_t size)376 inline size_t TaskQueue<Allocator>::PopForegroundTasksToHelperThread(const AddTaskToHelperFunc &addTaskFunc,
377 size_t size)
378 {
379 for (size_t i = 0; i < size; i++) {
380 TaskPtr task;
381 if (foregroundTaskQueue_.TryPop(&task)) {
382 addTaskFunc(std::move(task));
383 }
384 return i;
385 }
386 return size;
387 }
388
389 template <class Allocator>
PopBackgroundTasksToHelperThread(const AddTaskToHelperFunc & addTaskFunc,size_t size)390 inline size_t TaskQueue<Allocator>::PopBackgroundTasksToHelperThread(const AddTaskToHelperFunc &addTaskFunc,
391 size_t size)
392 {
393 for (size_t i = 0; i < size; i++) {
394 TaskPtr task;
395 if (backgroundTaskQueue_.TryPop(&task)) {
396 addTaskFunc(std::move(task));
397 }
398 return i;
399 }
400 return size;
401 }
402
403 template <class Allocator>
GetCountOfLiveTasks()404 inline size_t TaskQueue<Allocator>::GetCountOfLiveTasks() const
405 {
406 return GetCountOfLiveForegroundTasks() + GetCountOfLiveBackgroundTasks();
407 }
408
409 template <class Allocator>
GetCountOfLiveForegroundTasks()410 inline size_t TaskQueue<Allocator>::GetCountOfLiveForegroundTasks() const
411 {
412 // Atomic with relaxed order reason: no order dependency with another variables
413 return foregroundLiveTasks_.load(std::memory_order_relaxed);
414 }
415
416 template <class Allocator>
GetCountOfLiveBackgroundTasks()417 inline size_t TaskQueue<Allocator>::GetCountOfLiveBackgroundTasks() const
418 {
419 // Atomic with relaxed order reason: no order dependency with another variables
420 return backgroundLiveTasks_.load(std::memory_order_relaxed);
421 }
422
423 template <class Allocator>
GetTaskTimeStats()424 inline TaskTimeStatsBase *TaskQueue<Allocator>::GetTaskTimeStats() const
425 {
426 return taskTimeStats_;
427 }
428
429 template <class Allocator>
SetCallbacks(SignalWorkersCallback signalWorkersCallback)430 inline void TaskQueue<Allocator>::SetCallbacks(SignalWorkersCallback signalWorkersCallback)
431 {
432 signalWorkersCallback_ = std::move(signalWorkersCallback);
433 }
434
435 template <class Allocator>
UnsetCallbacks()436 inline void TaskQueue<Allocator>::UnsetCallbacks()
437 {
438 signalWorkersCallback_ = nullptr;
439 }
440
441 template <class Allocator>
AddForegroundTaskImpl(RunnerCallback && runner)442 inline size_t TaskQueue<Allocator>::AddForegroundTaskImpl(RunnerCallback &&runner)
443 {
444 auto task = Task::Create(std::move(runner), this, OnForegroundTaskDestructionCallback);
445 foregroundTaskQueue_.Push(std::move(task));
446 if (signalWorkersCallback_ != nullptr) {
447 signalWorkersCallback_();
448 }
449 return foregroundTaskQueue_.Size();
450 }
451
452 template <class Allocator>
AddBackgroundTaskImpl(RunnerCallback && runner)453 inline size_t TaskQueue<Allocator>::AddBackgroundTaskImpl(RunnerCallback &&runner)
454 {
455 auto task = Task::Create(std::move(runner), this, OnBackgroundTaskDestructionCallback);
456 backgroundTaskQueue_.Push(std::move(task));
457 if (signalWorkersCallback_ != nullptr) {
458 signalWorkersCallback_();
459 }
460 return backgroundTaskQueue_.Size();
461 }
462
463 template <class Allocator>
IncrementCountOfLiveForegroundTasks()464 inline size_t TaskQueue<Allocator>::IncrementCountOfLiveForegroundTasks()
465 {
466 // Atomic with relaxed order reason: no order dependency with another variables
467 return foregroundLiveTasks_.fetch_add(1U, std::memory_order_relaxed);
468 }
469 template <class Allocator>
IncrementCountOfLiveBackgroundTasks()470 inline size_t TaskQueue<Allocator>::IncrementCountOfLiveBackgroundTasks()
471 {
472 // Atomic with relaxed order reason: no order dependency with another variables
473 return backgroundLiveTasks_.fetch_add(1U, std::memory_order_relaxed);
474 }
475
476 } // namespace ark::taskmanager::internal
477
478 #endif // PANDA_LIBPANDABASE_TASKMANAGER_TASK_QUEUE_H
479