• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023-2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef PANDA_LIBPANDABASE_TASKMANAGER_TASK_QUEUE_H
17 #define PANDA_LIBPANDABASE_TASKMANAGER_TASK_QUEUE_H
18 
19 #include "libpandabase/os/mutex.h"
20 #include "libpandabase/taskmanager/schedulable_task_queue_interface.h"
21 #include "libpandabase/taskmanager/utils/sp_sc_lock_free_queue.h"
22 
23 namespace ark::taskmanager::internal {
24 
25 /**
26  * @brief TaskQueue is a thread-safe queue for tasks. Queues can be registered in TaskScheduler and used to execute
27  * tasks on workers. Also, queues can notify other threads when a new task is pushed.
 * @tparam Allocator - allocator type that is rebound to Task and used by the internal queues. Defaults to
 * std::allocator<Task>.
30  */
31 template <class Allocator = std::allocator<Task>>
32 class TaskQueue : public SchedulableTaskQueueInterface {
33     using TaskAllocatorType = typename Allocator::template rebind<Task>::other;
34     using TaskQueueAllocatorType = typename Allocator::template rebind<TaskQueue<TaskAllocatorType>>::other;
35     template <class OtherAllocator>
36     friend class TaskQueue;
37 
38 public:
    // NOTE(review): presumably these macros delete the copy and move operations of TaskQueue - confirm against
    // the macro definitions, which are not visible in this file.
    NO_COPY_SEMANTIC(TaskQueue);
    NO_MOVE_SEMANTIC(TaskQueue);
41 
42     /**
43      * @brief The TaskQueue factory. Intended to be used by the TaskScheduler's CreateAndRegister method.
44      * @param task_type: TaskType of queue.
45      * @param vm_type: VMType of queue.
46      * @param priority: A number from 1 to 10 that determines the weight of the queue during the task selection process
47      * @return a pointer to the created queue.
48      */
Create(TaskType taskType,VMType vmType,uint8_t priority)49     static PANDA_PUBLIC_API SchedulableTaskQueueInterface *Create(TaskType taskType, VMType vmType, uint8_t priority)
50     {
51         TaskQueueAllocatorType allocator;
52         auto *mem = allocator.allocate(1U);
53         return new (mem) TaskQueue<TaskAllocatorType>(taskType, vmType, priority);
54     }
55 
Destroy(SchedulableTaskQueueInterface * queue)56     static PANDA_PUBLIC_API void Destroy(SchedulableTaskQueueInterface *queue)
57     {
58         TaskQueueAllocatorType allocator;
59         std::allocator_traits<TaskQueueAllocatorType>::destroy(allocator, queue);
60         allocator.deallocate(static_cast<TaskQueue<TaskAllocatorType> *>(queue), 1U);
61     }
62 
~TaskQueue()63     PANDA_PUBLIC_API ~TaskQueue() override
64     {
65         ASSERT(AreInternalQueuesEmpty());
66     }
67 
68     /**
69      * @brief Adds task in task queue. Operation is thread-safe.
70      * @param task - task that will be added
71      * @return the size of queue after @arg task was added to it.
72      */
AddTask(Task && task)73     PANDA_PUBLIC_API size_t AddTask(Task &&task) override
74     {
75         auto properties = task.GetTaskProperties();
76         ASSERT(properties.GetTaskType() == GetTaskType());
77         ASSERT(properties.GetVMType() == GetVMType());
78         // Send info about new added task
79         if (LIKELY(newTasksCallback_ != nullptr)) {
80             newTasksCallback_(properties, 1UL);
81         }
82         AddTaskWithoutNewTaskCallbackExecution(std::move(task));
83         return Size();
84     }
85 
86     /**
87      * @brief The method adds a task to the queue without execution the new task callback. This method should only be
88      * used with tasks that have already triggered this callback.
89      * @param task: instance of Task
90      */
AddTaskWithoutNewTaskCallbackExecution(Task && task)91     void AddTaskWithoutNewTaskCallbackExecution(Task &&task) override
92     {
93         EventOnTaskAdding(&task);
94         // Push task in one of internal queues based on its TaskExecutionMode
95         PushTaskToInternalQueues(std::move(task));
96         // Signal workers that should execute new task
97         if (signalWorkersCallback_ != nullptr) {
98             signalWorkersCallback_();
99         }
100     }
101 
102     /**
103      * @brief Pops task from task queue. Operation is thread-safe. The method will wait a new task if queue is empty
104      * and method WaitForQueueEmptyAndFinish has not been executed. Otherwise it will return std::nullopt.
105      * This method should be used only in TaskScheduler
106      */
PopTask()107     [[nodiscard]] std::optional<Task> PopTask() override
108     {
109         return PopTaskFromInternalQueues();
110     }
111 
112     /**
113      * @brief Pops task from task queue with specified execution mode. Operation is thread-safe. The method will wait
114      * a new task if queue with specified execution mode is empty and method WaitForQueueEmptyAndFinish has not been
115      * executed. Otherwise it will return std::nullopt.
116      * This method should be used only in TaskScheduler!
117      * @param mode - execution mode of task that we want to pop.
118      */
PopTask(TaskExecutionMode mode)119     [[nodiscard]] std::optional<Task> PopTask(TaskExecutionMode mode) override
120     {
121         if (UNLIKELY(!HasTaskWithExecutionMode(mode))) {
122             return std::nullopt;
123         }
124         auto *queue = &foregroundTaskQueue_;
125         if (UNLIKELY(mode != TaskExecutionMode::FOREGROUND)) {
126             queue = &backgroundTaskQueue_;
127         }
128         auto task = queue->Pop();
129         return task;
130     }
131 
132     /**
133      * @brief Method pops several tasks to worker.
134      * @param addTaskFunc - Functor that will be used to add popped tasks to worker
135      * @param size - Count of tasks you want to pop. If it is greater then count of tasks that are stored in queue,
136      * method will not wait and will pop all stored tasks.
137      * @return count of task that was added to worker
138      */
PopTasksToWorker(const AddTaskToWorkerFunc & addTaskFunc,size_t size)139     size_t PopTasksToWorker(const AddTaskToWorkerFunc &addTaskFunc, size_t size) override
140     {
141         if (UNLIKELY(AreInternalQueuesEmpty())) {
142             return 0;
143         }
144         size_t returnSize = 0;
145         for (; !AreInternalQueuesEmpty() && returnSize < size; returnSize++) {
146             addTaskFunc(PopTaskFromInternalQueues().value());
147         }
148         return returnSize;
149     }
150 
151     /**
152      * @brief Method pops several tasks to helper thread. Helper thread in TaskScheduler is the thread that uses
153      * HelpWorkersWithTasks method.
154      * @param addTaskFunc - Functor that will be used to add popped tasks to helper
155      * @param size - Count of tasks you want to pop. If it is greater then count of tasks that are stored in queue,
156      * method will not wait and will pop all stored tasks.
157      * @param mode - Execution mode of task you wast to pop
158      * @return count of task that was added to helper
159      */
PopTasksToHelperThread(const AddTaskToHelperFunc & addTaskFunc,size_t size,TaskExecutionMode mode)160     size_t PopTasksToHelperThread(const AddTaskToHelperFunc &addTaskFunc, size_t size, TaskExecutionMode mode) override
161     {
162         if (!HasTaskWithExecutionMode(mode)) {
163             return 0;
164         }
165         auto *queue = &foregroundTaskQueue_;
166         if (mode != TaskExecutionMode::FOREGROUND) {
167             queue = &backgroundTaskQueue_;
168         }
169         size_t returnSize = 0;
170         for (; HasTaskWithExecutionMode(mode) && returnSize < size; returnSize++) {
171             addTaskFunc(queue->Pop().value());
172         }
173         return returnSize;
174     }
175 
IsEmpty()176     [[nodiscard]] PANDA_PUBLIC_API bool IsEmpty() const override
177     {
178         return AreInternalQueuesEmpty();
179     }
180 
Size()181     [[nodiscard]] PANDA_PUBLIC_API size_t Size() const override
182     {
183         return SumSizeOfInternalQueues();
184     }
185 
186     /**
187      * @brief Method @returns true if queue does not have queue with specified execution mode
188      * @param mode - execution mode of tasks
189      */
HasTaskWithExecutionMode(TaskExecutionMode mode)190     [[nodiscard]] PANDA_PUBLIC_API bool HasTaskWithExecutionMode(TaskExecutionMode mode) const override
191     {
192         if (mode == TaskExecutionMode::FOREGROUND) {
193             return !foregroundTaskQueue_.IsEmpty();
194         }
195         return !backgroundTaskQueue_.IsEmpty();
196     }
197 
CountOfTasksWithExecutionMode(TaskExecutionMode mode)198     [[nodiscard]] PANDA_PUBLIC_API size_t CountOfTasksWithExecutionMode(TaskExecutionMode mode) const override
199     {
200         if (mode == TaskExecutionMode::FOREGROUND) {
201             return foregroundTaskQueue_.Size();
202         }
203         return backgroundTaskQueue_.Size();
204     }
205 
    /**
     * @brief This method saves both callbacks, replacing the previously stored ones.
     * @param newTaskCallback - function that AddTask calls with the properties of the added task and the count of
     * added tasks before the task is pushed.
     * @param signalWorkersCallback - function that should signal workers to return to work if it's needed. It is
     * called by AddTaskWithoutNewTaskCallbackExecution after the task was pushed.
     */
    void SetCallbacks(NewTasksCallback newTaskCallback, SignalWorkersCallback signalWorkersCallback) override
    {
        // Parameters are taken by value, so they are moved into the members
        newTasksCallback_ = std::move(newTaskCallback);
        signalWorkersCallback_ = std::move(signalWorkersCallback);
    }
216 
    /**
     * @brief Removes both callback functions by resetting them to nullptr. After that AddTask and
     * AddTaskWithoutNewTaskCallbackExecution skip the callback calls. This method should be used only in
     * TaskScheduler!
     */
    void UnsetCallbacks() override
    {
        newTasksCallback_ = nullptr;
        signalWorkersCallback_ = nullptr;
    }
223 
224 private:
    /// Type of both internal queues (foreground and background); elements are Task objects.
    // NOTE(review): presumably a single-producer/single-consumer queue, which would explain why pushes are
    // serialized by mutexes in PushTaskToInternalQueues - confirm against SPSCLockFreeQueue.
    using InternalTaskQueue = SPSCLockFreeQueue<Task, TaskAllocatorType>;
226 
    /**
     * @brief Private constructor, objects are built by the Create factory. All arguments are passed to the
     * SchedulableTaskQueueInterface constructor.
     * @param taskType: TaskType of the queue.
     * @param vmType: VMType of the queue.
     * @param priority: priority of the queue.
     */
    TaskQueue(TaskType taskType, VMType vmType, uint8_t priority)
        : SchedulableTaskQueueInterface(taskType, vmType, priority)
    {
    }
231 
AreInternalQueuesEmpty()232     bool AreInternalQueuesEmpty() const
233     {
234         return foregroundTaskQueue_.IsEmpty() && backgroundTaskQueue_.IsEmpty();
235     }
236 
SumSizeOfInternalQueues()237     size_t SumSizeOfInternalQueues() const
238     {
239         return foregroundTaskQueue_.Size() + backgroundTaskQueue_.Size();
240     }
241 
PushTaskToInternalQueues(Task && task)242     void PushTaskToInternalQueues(Task &&task)
243     {
244         if (task.GetTaskProperties().GetTaskExecutionMode() == TaskExecutionMode::FOREGROUND) {
245             os::memory::LockHolder lockGuard(pushForegroundLock_);
246             foregroundTaskQueue_.Push(std::move(task));
247         } else {
248             os::memory::LockHolder lockGuard(pushBackgroundLock_);
249             backgroundTaskQueue_.Push(std::move(task));
250         }
251     }
252 
PopTaskFromInternalQueues()253     std::optional<Task> PopTaskFromInternalQueues()
254     {
255         auto task = foregroundTaskQueue_.Pop();
256         if (task.has_value()) {
257             return task;
258         }
259         return backgroundTaskQueue_.Pop();
260     }
261 
    /**
     * @brief Forwards the adding event to the task by calling Task::EventOnTaskAdding.
     * @param task - pointer to the task that is being added, must not be nullptr
     */
    void EventOnTaskAdding(Task *task)
    {
        ASSERT(task != nullptr);
        task->EventOnTaskAdding();
    }
267 
    /// Callbacks stored by SetCallbacks and reset by UnsetCallbacks. AddTask and
    /// AddTaskWithoutNewTaskCallbackExecution read them without taking any lock of this class.
    NewTasksCallback newTasksCallback_;
    SignalWorkersCallback signalWorkersCallback_;

    /// foreground part of TaskQueue: the lock is held only around Push in PushTaskToInternalQueues
    mutable os::memory::Mutex pushForegroundLock_;
    InternalTaskQueue foregroundTaskQueue_;

    /// background part of TaskQueue: the lock is held only around Push in PushTaskToInternalQueues
    mutable os::memory::Mutex pushBackgroundLock_;
    InternalTaskQueue backgroundTaskQueue_;
279 };
280 
281 }  // namespace ark::taskmanager::internal
282 
283 #endif  // PANDA_LIBPANDABASE_TASKMANAGER_TASK_QUEUE_H
284