• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023-2025 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef PANDA_LIBPANDABASE_TASKMANAGER_TASK_QUEUE_H
17 #define PANDA_LIBPANDABASE_TASKMANAGER_TASK_QUEUE_H
18 
19 #include <atomic>
20 #include "libpandabase/os/mutex.h"
21 #include "libpandabase/taskmanager/schedulable_task_queue_interface.h"
22 #include "taskmanager/utils/task_time_stats.h"
23 #include "libpandabase/taskmanager/task.h"
24 #include "libpandabase/taskmanager/utils/two_lock_queue.h"
25 
26 namespace ark::taskmanager::internal {
27 
28 /**
29  * @brief TaskQueue is a thread-safe queue for tasks. Queues can be registered in TaskScheduler and used to execute
30  * tasks on workers. Also, queues can notify other threads when a new task is pushed.
31  * @tparam Allocator - allocator of Task that will be used in internal queues. By default is used
32  * std::allocator<Task>
33  */
template <class Allocator = std::allocator<Task>>
class TaskQueue : public SchedulableTaskQueueInterface {
    /// Allocator rebound to Task; it is the allocator parameter of the internal queues (see InternalTaskQueue)
    using TaskAllocatorType = typename Allocator::template rebind<Task>::other;
    /// Allocator rebound to the queue type itself; Create and Destroy use it to obtain and release queue storage
    using TaskQueueAllocatorType = typename Allocator::template rebind<TaskQueue<TaskAllocatorType>>::other;
    /// Every instantiation is a friend: Create constructs TaskQueue<TaskAllocatorType> through its private constructor
    template <class OtherAllocator>
    friend class TaskQueue;

public:
    NO_COPY_SEMANTIC(TaskQueue);
    NO_MOVE_SEMANTIC(TaskQueue);

    /// Allocate and construct a queue with TaskQueueAllocatorType; the result is expected to be released by Destroy
    static PANDA_PUBLIC_API SchedulableTaskQueueInterface *Create(QueuePriority priority, TaskWaitList *waitList,
                                                                  TaskTimeStatsBase *taskTimeStats);
    /// Run the destructor of the queue and give its storage back to TaskQueueAllocatorType
    static PANDA_PUBLIC_API void Destroy(SchedulableTaskQueueInterface *queue);

    /// Increment the matching live-task counter and push a task built from runner into the matching internal queue.
    /// The returned value is the size of that internal queue read after the push
    PANDA_PUBLIC_API size_t AddForegroundTask(RunnerCallback runner) override;
    PANDA_PUBLIC_API size_t AddBackgroundTask(RunnerCallback runner) override;

    /// Increment the matching live-task counter and register the runner in the wait list instead of the queue.
    /// timeToWait is forwarded to TaskWaitList::AddValueToWait - presumably a timeout, confirm in TaskWaitList
    PANDA_PUBLIC_API WaiterId AddForegroundTaskInWaitList(RunnerCallback runtime, uint64_t timeToWait) override;
    PANDA_PUBLIC_API WaiterId AddBackgroundTaskInWaitList(RunnerCallback runtime, uint64_t timeToWait) override;

    /// Same as above, but without the timeToWait argument
    PANDA_PUBLIC_API WaiterId AddForegroundTaskInWaitList(RunnerCallback runtime) override;
    PANDA_PUBLIC_API WaiterId AddBackgroundTaskInWaitList(RunnerCallback runtime) override;

    /// Take the value stored in the wait list under id (if any) and post its runner into the queue
    PANDA_PUBLIC_API void SignalWaitList(WaiterId id) override;

    /// Emptiness checks of the internal queues (tasks that were already popped are not taken into account)
    [[nodiscard]] PANDA_PUBLIC_API bool IsEmpty() const override;
    [[nodiscard]] PANDA_PUBLIC_API bool HasForegroundTasks() const override;
    [[nodiscard]] PANDA_PUBLIC_API bool HasBackgroundTasks() const override;

    /// Sizes of the internal queues
    [[nodiscard]] PANDA_PUBLIC_API size_t Size() const override;
    [[nodiscard]] PANDA_PUBLIC_API size_t CountOfForegroundTasks() const override;
    [[nodiscard]] PANDA_PUBLIC_API size_t CountOfBackgroundTasks() const override;

    /// Pop one task and run it on the calling thread; return 1 if a task was run and 0 if nothing was popped
    PANDA_PUBLIC_API size_t ExecuteTask() override;
    PANDA_PUBLIC_API size_t ExecuteForegroundTask() override;
    PANDA_PUBLIC_API size_t ExecuteBackgroundTask() override;

    /// Block the calling thread until the corresponding live-task counter(s) become zero
    PANDA_PUBLIC_API void WaitTasks() override;
    PANDA_PUBLIC_API void WaitForegroundTasks() override;
    PANDA_PUBLIC_API void WaitBackgroundTasks() override;

    /// Non-blocking pop; the result is nullptr when nothing was popped. PopTask tries the foreground queue first
    [[nodiscard]] TaskPtr PopTask() override;
    [[nodiscard]] TaskPtr PopForegroundTask() override;
    [[nodiscard]] TaskPtr PopBackgroundTask() override;

    /// Pop up to size tasks (foreground queue is tried first) and pass each one to the matching callback;
    /// return the count of passed tasks
    size_t PopTasksToWorker(const AddTaskToWorkerFunc &addForegroundTaskFunc,
                            const AddTaskToWorkerFunc &addBackgroundTaskFunc, size_t size) override;
    /// Pass popped tasks of one kind to addTaskFunc; size is the upper bound requested by the caller
    size_t PopForegroundTasksToHelperThread(const AddTaskToHelperFunc &addTaskFunc, size_t size) override;
    size_t PopBackgroundTasksToHelperThread(const AddTaskToHelperFunc &addTaskFunc, size_t size) override;

    /// Live-task counters: incremented when a task is added (to the queue or to the wait list) and decremented by
    /// the task destruction callbacks
    size_t GetCountOfLiveTasks() const override;
    size_t GetCountOfLiveForegroundTasks() const override;
    size_t GetCountOfLiveBackgroundTasks() const override;

    /// Return the statistics pointer that was passed to the constructor
    TaskTimeStatsBase *GetTaskTimeStats() const override;

    /// Install / clear the callback that is invoked after every push into the internal queues
    void SetCallbacks(SignalWorkersCallback signalWorkersCallback) override;
    void UnsetCallbacks() override;

private:
    using InternalTaskQueue = TwoLockQueue<TaskPtr, TaskAllocatorType>;

    /// Create a task from runner, push it, call signalWorkersCallback_ if it is set, return the internal queue size.
    /// The live-task counter is NOT touched here: callers increment it beforehand
    PANDA_PUBLIC_API size_t AddForegroundTaskImpl(RunnerCallback &&runner);
    PANDA_PUBLIC_API size_t AddBackgroundTaskImpl(RunnerCallback &&runner);

    /// Atomically add one to the live-task counter and return the value it had before the increment
    PANDA_PUBLIC_API size_t IncrementCountOfLiveForegroundTasks();
    PANDA_PUBLIC_API size_t IncrementCountOfLiveBackgroundTasks();

    /// Passed to Task::Create; decrement the live-task counter and wake up waiters when it drops to zero
    static void OnForegroundTaskDestructionCallback(TaskQueueInterface *queue);
    static void OnBackgroundTaskDestructionCallback(TaskQueueInterface *queue);

    /// Private: queues are constructed only by Create
    TaskQueue(QueuePriority priority, TaskWaitList *waitList, TaskTimeStatsBase *taskTimeStats)
        : SchedulableTaskQueueInterface(priority), taskTimeStats_(taskTimeStats), waitList_(waitList)
    {
    }
    /// Both internal queues are expected to be empty at destruction time
    PANDA_PUBLIC_API ~TaskQueue() override
    {
        ASSERT(foregroundTaskQueue_.IsEmpty() && backgroundTaskQueue_.IsEmpty());
    }

    /// Callback invoked after a task is pushed (see AddForegroundTaskImpl / AddBackgroundTaskImpl); may be unset.
    /// NOTE(review): it is read and written without a lock in this file - confirm callers serialize
    /// SetCallbacks/UnsetCallbacks against task posting
    SignalWorkersCallback signalWorkersCallback_;
    TaskTimeStatsBase *taskTimeStats_ {nullptr};

    /// Mutex and condition variable used by Wait*Tasks; signaled by the task destruction callbacks
    os::memory::Mutex waitingMutex_;
    os::memory::ConditionVariable waitingCondVar_;

    /// foreground part of TaskQueue
    std::atomic_size_t foregroundLiveTasks_ {0};
    InternalTaskQueue foregroundTaskQueue_;
    /// background part of TaskQueue
    std::atomic_size_t backgroundLiveTasks_ {0};
    InternalTaskQueue backgroundTaskQueue_;

    /// Wait list used by Add*TaskInWaitList and SignalWaitList; it is dereferenced there without a null check
    TaskWaitList *waitList_ = nullptr;
};
131 
132 template <class Allocator>
Create(QueuePriority priority,TaskWaitList * waitList,TaskTimeStatsBase * taskTimeStats)133 inline SchedulableTaskQueueInterface *TaskQueue<Allocator>::Create(QueuePriority priority, TaskWaitList *waitList,
134                                                                    TaskTimeStatsBase *taskTimeStats)
135 {
136     TaskQueueAllocatorType allocator;
137     auto *mem = allocator.allocate(1U);
138     return new (mem) TaskQueue<TaskAllocatorType>(priority, waitList, taskTimeStats);
139 }
140 
141 template <class Allocator>
Destroy(SchedulableTaskQueueInterface * queue)142 inline void TaskQueue<Allocator>::Destroy(SchedulableTaskQueueInterface *queue)
143 {
144     TaskQueueAllocatorType allocator;
145     std::allocator_traits<TaskQueueAllocatorType>::destroy(allocator, queue);
146     allocator.deallocate(static_cast<TaskQueue<TaskAllocatorType> *>(queue), 1U);
147 }
148 
149 template <class Allocator>
OnForegroundTaskDestructionCallback(TaskQueueInterface * queue)150 inline void TaskQueue<Allocator>::OnForegroundTaskDestructionCallback(TaskQueueInterface *queue)
151 {
152     auto iQueue = reinterpret_cast<TaskQueue *>(queue);
153     // Atomic with relaxed order reason: all non-atomic and relaxed stores will be see after waitingMutex_ getting
154     auto aliveTasks = iQueue->foregroundLiveTasks_.fetch_sub(1U, std::memory_order_relaxed);
155     if (aliveTasks == 1U) {
156         os::memory::LockHolder lh(iQueue->waitingMutex_);
157         iQueue->waitingCondVar_.SignalAll();
158     }
159 }
160 
161 template <class Allocator>
OnBackgroundTaskDestructionCallback(TaskQueueInterface * queue)162 inline void TaskQueue<Allocator>::OnBackgroundTaskDestructionCallback(TaskQueueInterface *queue)
163 {
164     auto iQueue = reinterpret_cast<TaskQueue *>(queue);
165     // Atomic with relaxed order reason: all non-atomic and relaxed stores will be see after waitingMutex_ getting
166     auto aliveTasks = iQueue->backgroundLiveTasks_.fetch_sub(1U, std::memory_order_relaxed);
167     if (aliveTasks == 1U) {
168         os::memory::LockHolder lh(iQueue->waitingMutex_);
169         iQueue->waitingCondVar_.SignalAll();
170     }
171 }
172 
173 template <class Allocator>
AddForegroundTask(RunnerCallback runner)174 inline size_t TaskQueue<Allocator>::AddForegroundTask(RunnerCallback runner)
175 {
176     IncrementCountOfLiveForegroundTasks();
177     return AddForegroundTaskImpl(std::move(runner));
178 }
179 
180 template <class Allocator>
AddBackgroundTask(RunnerCallback runner)181 inline size_t TaskQueue<Allocator>::AddBackgroundTask(RunnerCallback runner)
182 {
183     IncrementCountOfLiveBackgroundTasks();
184     return AddBackgroundTaskImpl(std::move(runner));
185 }
186 
187 template <class Allocator>
AddForegroundTaskInWaitList(RunnerCallback runner,uint64_t timeToWait)188 inline PANDA_PUBLIC_API WaiterId TaskQueue<Allocator>::AddForegroundTaskInWaitList(RunnerCallback runner,
189                                                                                    uint64_t timeToWait)
190 {
191     IncrementCountOfLiveForegroundTasks();
192     auto waitListCallback = [this](RunnerCallback &&irunner) { AddForegroundTaskImpl(std::move(irunner)); };
193     return waitList_->AddValueToWait({std::move(runner), waitListCallback}, timeToWait);
194 }
195 
196 template <class Allocator>
AddBackgroundTaskInWaitList(RunnerCallback runner,uint64_t timeToWait)197 inline PANDA_PUBLIC_API WaiterId TaskQueue<Allocator>::AddBackgroundTaskInWaitList(RunnerCallback runner,
198                                                                                    uint64_t timeToWait)
199 {
200     IncrementCountOfLiveBackgroundTasks();
201     auto waitListCallback = [this](RunnerCallback &&irunner) { AddBackgroundTaskImpl(std::move(irunner)); };
202     return waitList_->AddValueToWait({std::move(runner), waitListCallback}, timeToWait);
203 }
204 
205 template <class Allocator>
AddForegroundTaskInWaitList(RunnerCallback runner)206 inline PANDA_PUBLIC_API WaiterId TaskQueue<Allocator>::AddForegroundTaskInWaitList(RunnerCallback runner)
207 {
208     IncrementCountOfLiveForegroundTasks();
209     auto waitListCallback = [this](RunnerCallback &&irunner) { AddForegroundTaskImpl(std::move(irunner)); };
210     return waitList_->AddValueToWait({std::move(runner), waitListCallback});
211 }
212 
213 template <class Allocator>
AddBackgroundTaskInWaitList(RunnerCallback runner)214 inline PANDA_PUBLIC_API WaiterId TaskQueue<Allocator>::AddBackgroundTaskInWaitList(RunnerCallback runner)
215 {
216     IncrementCountOfLiveBackgroundTasks();
217     auto waitListCallback = [this](RunnerCallback &&irunner) { AddBackgroundTaskImpl(std::move(irunner)); };
218     return waitList_->AddValueToWait({std::move(runner), waitListCallback});
219 }
220 
221 template <class Allocator>
SignalWaitList(WaiterId id)222 void TaskQueue<Allocator>::SignalWaitList(WaiterId id)
223 {
224     auto waitVal = waitList_->GetValueById(id);
225     if (!waitVal.has_value()) {
226         return;
227     }
228     auto [task, taskPoster] = std::move(waitVal.value());
229     taskPoster(std::move(task));
230 }
231 
232 template <class Allocator>
IsEmpty()233 inline bool TaskQueue<Allocator>::IsEmpty() const
234 {
235     return foregroundTaskQueue_.IsEmpty() && backgroundTaskQueue_.IsEmpty();
236 }
237 
238 template <class Allocator>
HasForegroundTasks()239 inline bool TaskQueue<Allocator>::HasForegroundTasks() const
240 {
241     return !foregroundTaskQueue_.IsEmpty();
242 }
243 
244 template <class Allocator>
HasBackgroundTasks()245 inline bool TaskQueue<Allocator>::HasBackgroundTasks() const
246 {
247     return !backgroundTaskQueue_.IsEmpty();
248 }
249 
250 template <class Allocator>
Size()251 inline size_t TaskQueue<Allocator>::Size() const
252 {
253     return foregroundTaskQueue_.Size() + backgroundTaskQueue_.Size();
254 }
255 
256 template <class Allocator>
CountOfForegroundTasks()257 inline size_t TaskQueue<Allocator>::CountOfForegroundTasks() const
258 {
259     return foregroundTaskQueue_.Size();
260 }
261 
262 template <class Allocator>
CountOfBackgroundTasks()263 inline size_t TaskQueue<Allocator>::CountOfBackgroundTasks() const
264 {
265     return backgroundTaskQueue_.Size();
266 }
267 
268 template <class Allocator>
ExecuteTask()269 inline size_t TaskQueue<Allocator>::ExecuteTask()
270 {
271     TaskPtr task = PopTask();
272     if (task == nullptr) {
273         return 0U;
274     }
275     task->RunTask();
276     return 1U;
277 }
278 
279 template <class Allocator>
ExecuteForegroundTask()280 inline size_t TaskQueue<Allocator>::ExecuteForegroundTask()
281 {
282     TaskPtr task = PopForegroundTask();
283     if (task == nullptr) {
284         return 0U;
285     }
286     task->RunTask();
287     return 1U;
288 }
289 
290 template <class Allocator>
ExecuteBackgroundTask()291 inline size_t TaskQueue<Allocator>::ExecuteBackgroundTask()
292 {
293     TaskPtr task = PopBackgroundTask();
294     if (task == nullptr) {
295         return 0U;
296     }
297     task->RunTask();
298     return 1U;
299 }
300 
301 template <class Allocator>
WaitTasks()302 inline void TaskQueue<Allocator>::WaitTasks()
303 {
304     os::memory::LockHolder lh(waitingMutex_);
305     while (GetCountOfLiveBackgroundTasks() != 0 || GetCountOfLiveForegroundTasks() != 0) {
306         waitingCondVar_.Wait(&waitingMutex_);
307     }
308 }
309 
310 template <class Allocator>
WaitForegroundTasks()311 inline void TaskQueue<Allocator>::WaitForegroundTasks()
312 {
313     os::memory::LockHolder lh(waitingMutex_);
314     while (GetCountOfLiveForegroundTasks() != 0) {
315         waitingCondVar_.Wait(&waitingMutex_);
316     }
317 }
318 
319 template <class Allocator>
WaitBackgroundTasks()320 inline void TaskQueue<Allocator>::WaitBackgroundTasks()
321 {
322     os::memory::LockHolder lh(waitingMutex_);
323     while (GetCountOfLiveBackgroundTasks() != 0) {
324         waitingCondVar_.Wait(&waitingMutex_);
325     }
326 }
327 
328 template <class Allocator>
PopTask()329 inline TaskPtr TaskQueue<Allocator>::PopTask()
330 {
331     TaskPtr task = nullptr;
332     if (foregroundTaskQueue_.TryPop(&task)) {
333         return task;
334     }
335     backgroundTaskQueue_.TryPop(&task);
336     return task;
337 }
338 
339 template <class Allocator>
PopForegroundTask()340 inline TaskPtr TaskQueue<Allocator>::PopForegroundTask()
341 {
342     TaskPtr task = nullptr;
343     foregroundTaskQueue_.TryPop(&task);
344     return task;
345 }
346 
347 template <class Allocator>
PopBackgroundTask()348 inline TaskPtr TaskQueue<Allocator>::PopBackgroundTask()
349 {
350     TaskPtr task = nullptr;
351     backgroundTaskQueue_.TryPop(&task);
352     return task;
353 }
354 
355 template <class Allocator>
356 // CC-OFFNXT(G.FUD.06) Splitting this function will degrade readability. Keyword "inline" needs to satisfy ODR rule.
PopTasksToWorker(const AddTaskToWorkerFunc & addForegroundTaskFunc,const AddTaskToWorkerFunc & addBackgroundTaskFunc,size_t size)357 inline size_t TaskQueue<Allocator>::PopTasksToWorker(const AddTaskToWorkerFunc &addForegroundTaskFunc,
358                                                      const AddTaskToWorkerFunc &addBackgroundTaskFunc, size_t size)
359 {
360     for (size_t i = 0; i < size; i++) {
361         TaskPtr task;
362         if (foregroundTaskQueue_.TryPop(&task)) {
363             addForegroundTaskFunc(std::move(task));
364             continue;
365         }
366         if (backgroundTaskQueue_.TryPop(&task)) {
367             addBackgroundTaskFunc(std::move(task));
368             continue;
369         }
370         return i;
371     }
372     return size;
373 }
374 
375 template <class Allocator>
PopForegroundTasksToHelperThread(const AddTaskToHelperFunc & addTaskFunc,size_t size)376 inline size_t TaskQueue<Allocator>::PopForegroundTasksToHelperThread(const AddTaskToHelperFunc &addTaskFunc,
377                                                                      size_t size)
378 {
379     for (size_t i = 0; i < size; i++) {
380         TaskPtr task;
381         if (foregroundTaskQueue_.TryPop(&task)) {
382             addTaskFunc(std::move(task));
383         }
384         return i;
385     }
386     return size;
387 }
388 
389 template <class Allocator>
PopBackgroundTasksToHelperThread(const AddTaskToHelperFunc & addTaskFunc,size_t size)390 inline size_t TaskQueue<Allocator>::PopBackgroundTasksToHelperThread(const AddTaskToHelperFunc &addTaskFunc,
391                                                                      size_t size)
392 {
393     for (size_t i = 0; i < size; i++) {
394         TaskPtr task;
395         if (backgroundTaskQueue_.TryPop(&task)) {
396             addTaskFunc(std::move(task));
397         }
398         return i;
399     }
400     return size;
401 }
402 
403 template <class Allocator>
GetCountOfLiveTasks()404 inline size_t TaskQueue<Allocator>::GetCountOfLiveTasks() const
405 {
406     return GetCountOfLiveForegroundTasks() + GetCountOfLiveBackgroundTasks();
407 }
408 
409 template <class Allocator>
GetCountOfLiveForegroundTasks()410 inline size_t TaskQueue<Allocator>::GetCountOfLiveForegroundTasks() const
411 {
412     // Atomic with relaxed order reason: no order dependency with another variables
413     return foregroundLiveTasks_.load(std::memory_order_relaxed);
414 }
415 
416 template <class Allocator>
GetCountOfLiveBackgroundTasks()417 inline size_t TaskQueue<Allocator>::GetCountOfLiveBackgroundTasks() const
418 {
419     // Atomic with relaxed order reason: no order dependency with another variables
420     return backgroundLiveTasks_.load(std::memory_order_relaxed);
421 }
422 
423 template <class Allocator>
GetTaskTimeStats()424 inline TaskTimeStatsBase *TaskQueue<Allocator>::GetTaskTimeStats() const
425 {
426     return taskTimeStats_;
427 }
428 
429 template <class Allocator>
SetCallbacks(SignalWorkersCallback signalWorkersCallback)430 inline void TaskQueue<Allocator>::SetCallbacks(SignalWorkersCallback signalWorkersCallback)
431 {
432     signalWorkersCallback_ = std::move(signalWorkersCallback);
433 }
434 
435 template <class Allocator>
UnsetCallbacks()436 inline void TaskQueue<Allocator>::UnsetCallbacks()
437 {
438     signalWorkersCallback_ = nullptr;
439 }
440 
441 template <class Allocator>
AddForegroundTaskImpl(RunnerCallback && runner)442 inline size_t TaskQueue<Allocator>::AddForegroundTaskImpl(RunnerCallback &&runner)
443 {
444     auto task = Task::Create(std::move(runner), this, OnForegroundTaskDestructionCallback);
445     foregroundTaskQueue_.Push(std::move(task));
446     if (signalWorkersCallback_ != nullptr) {
447         signalWorkersCallback_();
448     }
449     return foregroundTaskQueue_.Size();
450 }
451 
452 template <class Allocator>
AddBackgroundTaskImpl(RunnerCallback && runner)453 inline size_t TaskQueue<Allocator>::AddBackgroundTaskImpl(RunnerCallback &&runner)
454 {
455     auto task = Task::Create(std::move(runner), this, OnBackgroundTaskDestructionCallback);
456     backgroundTaskQueue_.Push(std::move(task));
457     if (signalWorkersCallback_ != nullptr) {
458         signalWorkersCallback_();
459     }
460     return backgroundTaskQueue_.Size();
461 }
462 
463 template <class Allocator>
IncrementCountOfLiveForegroundTasks()464 inline size_t TaskQueue<Allocator>::IncrementCountOfLiveForegroundTasks()
465 {
466     // Atomic with relaxed order reason: no order dependency with another variables
467     return foregroundLiveTasks_.fetch_add(1U, std::memory_order_relaxed);
468 }
469 template <class Allocator>
IncrementCountOfLiveBackgroundTasks()470 inline size_t TaskQueue<Allocator>::IncrementCountOfLiveBackgroundTasks()
471 {
472     // Atomic with relaxed order reason: no order dependency with another variables
473     return backgroundLiveTasks_.fetch_add(1U, std::memory_order_relaxed);
474 }
475 
476 }  // namespace ark::taskmanager::internal
477 
478 #endif  // PANDA_LIBPANDABASE_TASKMANAGER_TASK_QUEUE_H
479