• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef PANDA_LIBPANDABASE_TASKMANAGER_TASK_QUEUE_H
17 #define PANDA_LIBPANDABASE_TASKMANAGER_TASK_QUEUE_H
18 
19 #include "libpandabase/os/mutex.h"
20 #include "libpandabase/taskmanager/schedulable_task_queue_interface.h"
21 
22 namespace panda::taskmanager::internal {
23 
/**
 * @brief TaskQueue is a thread-safe queue for tasks. Queues can be registered in TaskScheduler and used to execute
 * tasks on workers. Also, queues can notify other threads when a new task is pushed.
 * @tparam Allocator - allocator of Task that will be used in internal queues. std::allocator<Task> is used by
 * default.
 */
30 template <class Allocator = std::allocator<Task>>
31 class TaskQueue : public SchedulableTaskQueueInterface {
32     using TaskAllocatorType = typename Allocator::template rebind<Task>::other;
33     using TaskQueueAllocatorType = typename Allocator::template rebind<TaskQueue<TaskAllocatorType>>::other;
34     template <class OtherAllocator>
35     friend class TaskQueue;
36 
37 public:
38     NO_COPY_SEMANTIC(TaskQueue);
39     NO_MOVE_SEMANTIC(TaskQueue);
40 
41     /**
42      * @brief The TaskQueue factory. Intended to be used by the TaskScheduler's CreateAndRegister method.
43      * @param task_type: TaskType of queue.
44      * @param vm_type: VMType of queue.
45      * @param priority: A number from 1 to 10 that determines the weight of the queue during the task selection process
46      * @return a pointer to the created queue.
47      */
Create(TaskType taskType,VMType vmType,uint8_t priority)48     static PANDA_PUBLIC_API SchedulableTaskQueueInterface *Create(TaskType taskType, VMType vmType, uint8_t priority)
49     {
50         TaskQueueAllocatorType allocator;
51         auto *mem = allocator.allocate(sizeof(TaskQueue<TaskAllocatorType>));
52         return new (mem) TaskQueue<TaskAllocatorType>(taskType, vmType, priority);
53     }
54 
Destroy(SchedulableTaskQueueInterface * queue)55     static PANDA_PUBLIC_API void Destroy(SchedulableTaskQueueInterface *queue)
56     {
57         TaskQueueAllocatorType allocator;
58         std::allocator_traits<TaskQueueAllocatorType>::destroy(allocator, queue);
59         allocator.deallocate(static_cast<TaskQueue<TaskAllocatorType> *>(queue), sizeof(TaskQueue<TaskAllocatorType>));
60     }
61 
~TaskQueue()62     PANDA_PUBLIC_API ~TaskQueue() override
63     {
64         WaitForEmpty();
65     }
66 
67     /**
68      * @brief Adds task in task queue. Operation is thread-safe.
69      * @param task - task that will be added
70      * @return the size of queue after @arg task was added to it.
71      */
AddTask(Task && task)72     PANDA_PUBLIC_API size_t AddTask(Task &&task) override
73     {
74         ASSERT(task.GetTaskProperties().GetTaskType() == GetTaskType());
75         ASSERT(task.GetTaskProperties().GetVMType() == GetVMType());
76         os::memory::LockHolder pushLockHolder(pushPopLock_);
77         auto properties = task.GetTaskProperties();
78         size_t size = 0;
79         {
80             os::memory::LockHolder taskQueueStateLockHolder(taskQueueStateLock_);
81             PushTaskToInternalQueues(std::move(task));
82             pushWaitCondVar_.Signal();
83             size = SumSizeOfInternalQueues();
84         }
85         os::memory::LockHolder subscriberLockHolder(subscriberLock_);
86         // Notify subscriber about new task
87         if (newTasksCallback_ != nullptr) {
88             newTasksCallback_(properties, 1UL, size == 1UL);
89         }
90         return size;
91     }
92 
93     /**
94      * @brief Pops task from task queue. Operation is thread-safe. The method will wait new task if queue is empty and
95      * method WaitForQueueEmptyAndFinish has not been executed. Otherwise it will return std::nullopt.
96      * This method should be used only in TaskScheduler
97      */
PopTask()98     [[nodiscard]] std::optional<Task> PopTask() override
99     {
100         os::memory::LockHolder popLockHolder(pushPopLock_);
101         while (IsEmpty()) {
102             if (finish_) {
103                 return std::nullopt;
104             }
105             pushWaitCondVar_.Wait(&pushPopLock_);
106         }
107         os::memory::LockHolder taskQueueStateLockHolder(taskQueueStateLock_);
108         auto task = PopTaskFromInternalQueues();
109         finishCondVar_.Signal();
110         return std::make_optional(std::move(task));
111     }
112 
113     /**
114      * @brief Pops task from task queue with specified execution mode. Operation is thread-safe. The method will wait
115      * new task if queue with specified execution mode is empty and method WaitForQueueEmptyAndFinish has not been
116      * executed. Otherwise it will return std::nullopt.
117      * This method should be used only in TaskScheduler!
118      * @param mode - execution mode of task that we want to pop.
119      */
PopTask(TaskExecutionMode mode)120     [[nodiscard]] std::optional<Task> PopTask(TaskExecutionMode mode) override
121     {
122         os::memory::LockHolder popLockHolder(pushPopLock_);
123         auto *queue = &foregroundTaskQueue_;
124         if (mode != TaskExecutionMode::FOREGROUND) {
125             queue = &backgroundTaskQueue_;
126         }
127         while (!HasTaskWithExecutionMode(mode)) {
128             if (finish_) {
129                 return std::nullopt;
130             }
131             pushWaitCondVar_.Wait(&pushPopLock_);
132         }
133         os::memory::LockHolder taskQueueStateLockHolder(taskQueueStateLock_);
134         auto task = PopTaskFromQueue(*queue);
135         finishCondVar_.Signal();
136         return std::make_optional(std::move(task));
137     }
138 
139     /**
140      * @brief Method pops several tasks to worker.
141      * @param add_task_func - Functor that will be used to add popped tasks to worker
142      * @param size - Count of tasks you want to pop. If it is greater then count of tasks that are stored in queue,
143      * method will not wait and will pop all stored tasks.
144      * @return count of task that was added to worker
145      */
PopTasksToWorker(AddTaskToWorkerFunc addTaskFunc,size_t size)146     size_t PopTasksToWorker(AddTaskToWorkerFunc addTaskFunc, size_t size) override
147     {
148         os::memory::LockHolder popLockHolder(pushPopLock_);
149         os::memory::LockHolder taskQueueStateLockHolder(taskQueueStateLock_);
150         size = (SumSizeOfInternalQueues() < size) ? (SumSizeOfInternalQueues()) : (size);
151         for (size_t i = 0; i < size; i++) {
152             addTaskFunc(PopTaskFromInternalQueues());
153         }
154         finishCondVar_.Signal();
155         return size;
156     }
157 
IsEmpty()158     [[nodiscard]] PANDA_PUBLIC_API bool IsEmpty() const override
159     {
160         os::memory::LockHolder lockHolder(taskQueueStateLock_);
161         return AreInternalQueuesEmpty();
162     }
163 
Size()164     [[nodiscard]] PANDA_PUBLIC_API size_t Size() const override
165     {
166         os::memory::LockHolder lockHolder(taskQueueStateLock_);
167         return SumSizeOfInternalQueues();
168     }
169 
170     /**
171      * @brief Method @returns true if queue does not have queue with specified execution mode
172      * @param mode - execution mode of tasks
173      */
HasTaskWithExecutionMode(TaskExecutionMode mode)174     [[nodiscard]] PANDA_PUBLIC_API bool HasTaskWithExecutionMode(TaskExecutionMode mode) const override
175     {
176         os::memory::LockHolder lockHolder(taskQueueStateLock_);
177         if (mode == TaskExecutionMode::FOREGROUND) {
178             return !foregroundTaskQueue_.empty();
179         }
180         return !backgroundTaskQueue_.empty();
181     }
182 
183     /**
184      * @brief This method saves the @arg callback. It will be called after adding new task in AddTask method.
185      * This method should be used only in TaskScheduler!
186      * @param callback - function that get count of inputted tasks.
187      */
SetNewTasksCallback(NewTasksCallback callback)188     void SetNewTasksCallback(NewTasksCallback callback) override
189     {
190         os::memory::LockHolder subscriberLockHolder(subscriberLock_);
191         newTasksCallback_ = std::move(callback);
192     }
193 
194     /// @brief Removes callback function. This method should be used only in TaskScheduler!
UnsetNewTasksCallback()195     void UnsetNewTasksCallback() override
196     {
197         os::memory::LockHolder lockHolder(subscriberLock_);
198         newTasksCallback_ = nullptr;
199     }
200 
201     /**
202      * @brief Method waits until internal queue will be empty and finalize using of TaskQueue
203      * After this method PopTask will not wait for new tasks.
204      */
WaitForQueueEmptyAndFinish()205     void WaitForQueueEmptyAndFinish() override
206     {
207         WaitForEmpty();
208     }
209 
210 private:
WaitForEmpty()211     void WaitForEmpty()
212     {
213         {
214             os::memory::LockHolder lockHolder(taskQueueStateLock_);
215             while (!AreInternalQueuesEmpty()) {
216                 finishCondVar_.Wait(&taskQueueStateLock_);
217             }
218         }
219         os::memory::LockHolder pushPopLockHolder(pushPopLock_);
220         finish_ = true;
221         pushWaitCondVar_.SignalAll();
222     }
223 
224     using InternalTaskQueue = std::queue<Task, std::deque<Task, TaskAllocatorType>>;
225 
TaskQueue(TaskType taskType,VMType vmType,uint8_t priority)226     TaskQueue(TaskType taskType, VMType vmType, uint8_t priority)
227         : SchedulableTaskQueueInterface(taskType, vmType, priority),
228           foregroundTaskQueue_(TaskAllocatorType()),
229           backgroundTaskQueue_(TaskAllocatorType())
230     {
231     }
232 
AreInternalQueuesEmpty()233     bool AreInternalQueuesEmpty() const REQUIRES(taskQueueStateLock_)
234     {
235         return foregroundTaskQueue_.empty() && backgroundTaskQueue_.empty();
236     }
237 
SumSizeOfInternalQueues()238     size_t SumSizeOfInternalQueues() const REQUIRES(taskQueueStateLock_)
239     {
240         return foregroundTaskQueue_.size() + backgroundTaskQueue_.size();
241     }
242 
PushTaskToInternalQueues(Task && task)243     void PushTaskToInternalQueues(Task &&task) REQUIRES(taskQueueStateLock_)
244     {
245         if (task.GetTaskProperties().GetTaskExecutionMode() == TaskExecutionMode::FOREGROUND) {
246             foregroundTaskQueue_.push(std::move(task));
247         } else {
248             backgroundTaskQueue_.push(std::move(task));
249         }
250     }
251 
PopTaskFromInternalQueues()252     Task PopTaskFromInternalQueues() REQUIRES(taskQueueStateLock_)
253     {
254         if (!foregroundTaskQueue_.empty()) {
255             return PopTaskFromQueue(foregroundTaskQueue_);
256         }
257         return PopTaskFromQueue(backgroundTaskQueue_);
258     }
259 
PopTaskFromQueue(InternalTaskQueue & queue)260     Task PopTaskFromQueue(InternalTaskQueue &queue) REQUIRES(taskQueueStateLock_)
261     {
262         auto task = std::move(queue.front());
263         queue.pop();
264         return task;
265     }
266 
267     /// push_pop_lock_ is used in push and pop operations as first guarder
268     mutable os::memory::Mutex pushPopLock_;
269 
270     /// task_queue_state_lock_ is used in case of interaction with internal queues.
271     mutable os::memory::Mutex taskQueueStateLock_;
272 
273     os::memory::ConditionVariable pushWaitCondVar_ GUARDED_BY(pushPopLock_);
274     os::memory::ConditionVariable finishCondVar_ GUARDED_BY(taskQueueStateLock_);
275 
276     /// subscriber_lock_ is used in case of calling new_tasks_callback_
277     os::memory::Mutex subscriberLock_;
278     NewTasksCallback newTasksCallback_ GUARDED_BY(subscriberLock_);
279 
GUARDED_BY(pushPopLock_)280     bool finish_ GUARDED_BY(pushPopLock_) {false};
281 
282     /**
283      * foreground_task_queue_ is queue that contains task with ExecutionMode::FOREGROUND. If method PopTask() is used,
284      * foreground_task_queue_ will be checked first and if it's not empty, Task will be gotten from it.
285      */
286     InternalTaskQueue foregroundTaskQueue_ GUARDED_BY(taskQueueStateLock_);
287     /**
288      * background_task_queue_ is queue that contains task with ExecutionMode::BACKGROUND. If method PopTask() is used,
289      * background_task_queue_ will be popped only if foreground_task_queue_ is empty.
290      */
291     InternalTaskQueue backgroundTaskQueue_ GUARDED_BY(taskQueueStateLock_);
292 };
293 
294 }  // namespace panda::taskmanager::internal
295 
296 #endif  // PANDA_LIBPANDABASE_TASKMANAGER_TASK_QUEUE_H
297