1 /* 2 * Copyright (c) 2023 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef PANDA_LIBPANDABASE_TASKMANAGER_TASK_QUEUE_H 17 #define PANDA_LIBPANDABASE_TASKMANAGER_TASK_QUEUE_H 18 19 #include "libpandabase/os/mutex.h" 20 #include "libpandabase/taskmanager/schedulable_task_queue_interface.h" 21 22 namespace panda::taskmanager::internal { 23 24 /** 25 * @brief TaskQueue is a thread-safe queue for tasks. Queues can be registered in TaskScheduler and used to execute 26 * tasks on workers. Also, queues can notify other threads when a new task is pushed. 27 * @tparam Allocator - allocator of Task that will be used in internal queues. By default is used 28 * std::allocator<Task> 29 */ 30 template <class Allocator = std::allocator<Task>> 31 class TaskQueue : public SchedulableTaskQueueInterface { 32 using TaskAllocatorType = typename Allocator::template rebind<Task>::other; 33 using TaskQueueAllocatorType = typename Allocator::template rebind<TaskQueue<TaskAllocatorType>>::other; 34 template <class OtherAllocator> 35 friend class TaskQueue; 36 37 public: 38 NO_COPY_SEMANTIC(TaskQueue); 39 NO_MOVE_SEMANTIC(TaskQueue); 40 41 /** 42 * @brief The TaskQueue factory. Intended to be used by the TaskScheduler's CreateAndRegister method. 43 * @param task_type: TaskType of queue. 44 * @param vm_type: VMType of queue. 45 * @param priority: A number from 1 to 10 that determines the weight of the queue during the task selection process 46 * @return a pointer to the created queue. 47 */ Create(TaskType taskType,VMType vmType,uint8_t priority)48 static PANDA_PUBLIC_API SchedulableTaskQueueInterface *Create(TaskType taskType, VMType vmType, uint8_t priority) 49 { 50 TaskQueueAllocatorType allocator; 51 auto *mem = allocator.allocate(sizeof(TaskQueue<TaskAllocatorType>)); 52 return new (mem) TaskQueue<TaskAllocatorType>(taskType, vmType, priority); 53 } 54 Destroy(SchedulableTaskQueueInterface * queue)55 static PANDA_PUBLIC_API void Destroy(SchedulableTaskQueueInterface *queue) 56 { 57 TaskQueueAllocatorType allocator; 58 std::allocator_traits<TaskQueueAllocatorType>::destroy(allocator, queue); 59 allocator.deallocate(static_cast<TaskQueue<TaskAllocatorType> *>(queue), sizeof(TaskQueue<TaskAllocatorType>)); 60 } 61 ~TaskQueue()62 PANDA_PUBLIC_API ~TaskQueue() override 63 { 64 WaitForEmpty(); 65 } 66 67 /** 68 * @brief Adds task in task queue. Operation is thread-safe. 69 * @param task - task that will be added 70 * @return the size of queue after @arg task was added to it. 71 */ AddTask(Task && task)72 PANDA_PUBLIC_API size_t AddTask(Task &&task) override 73 { 74 ASSERT(task.GetTaskProperties().GetTaskType() == GetTaskType()); 75 ASSERT(task.GetTaskProperties().GetVMType() == GetVMType()); 76 os::memory::LockHolder pushLockHolder(pushPopLock_); 77 auto properties = task.GetTaskProperties(); 78 size_t size = 0; 79 { 80 os::memory::LockHolder taskQueueStateLockHolder(taskQueueStateLock_); 81 PushTaskToInternalQueues(std::move(task)); 82 pushWaitCondVar_.Signal(); 83 size = SumSizeOfInternalQueues(); 84 } 85 os::memory::LockHolder subscriberLockHolder(subscriberLock_); 86 // Notify subscriber about new task 87 if (newTasksCallback_ != nullptr) { 88 newTasksCallback_(properties, 1UL, size == 1UL); 89 } 90 return size; 91 } 92 93 /** 94 * @brief Pops task from task queue. Operation is thread-safe. The method will wait new task if queue is empty and 95 * method WaitForQueueEmptyAndFinish has not been executed. Otherwise it will return std::nullopt. 96 * This method should be used only in TaskScheduler 97 */ PopTask()98 [[nodiscard]] std::optional<Task> PopTask() override 99 { 100 os::memory::LockHolder popLockHolder(pushPopLock_); 101 while (IsEmpty()) { 102 if (finish_) { 103 return std::nullopt; 104 } 105 pushWaitCondVar_.Wait(&pushPopLock_); 106 } 107 os::memory::LockHolder taskQueueStateLockHolder(taskQueueStateLock_); 108 auto task = PopTaskFromInternalQueues(); 109 finishCondVar_.Signal(); 110 return std::make_optional(std::move(task)); 111 } 112 113 /** 114 * @brief Pops task from task queue with specified execution mode. Operation is thread-safe. The method will wait 115 * new task if queue with specified execution mode is empty and method WaitForQueueEmptyAndFinish has not been 116 * executed. Otherwise it will return std::nullopt. 117 * This method should be used only in TaskScheduler! 118 * @param mode - execution mode of task that we want to pop. 119 */ PopTask(TaskExecutionMode mode)120 [[nodiscard]] std::optional<Task> PopTask(TaskExecutionMode mode) override 121 { 122 os::memory::LockHolder popLockHolder(pushPopLock_); 123 auto *queue = &foregroundTaskQueue_; 124 if (mode != TaskExecutionMode::FOREGROUND) { 125 queue = &backgroundTaskQueue_; 126 } 127 while (!HasTaskWithExecutionMode(mode)) { 128 if (finish_) { 129 return std::nullopt; 130 } 131 pushWaitCondVar_.Wait(&pushPopLock_); 132 } 133 os::memory::LockHolder taskQueueStateLockHolder(taskQueueStateLock_); 134 auto task = PopTaskFromQueue(*queue); 135 finishCondVar_.Signal(); 136 return std::make_optional(std::move(task)); 137 } 138 139 /** 140 * @brief Method pops several tasks to worker. 141 * @param add_task_func - Functor that will be used to add popped tasks to worker 142 * @param size - Count of tasks you want to pop. If it is greater then count of tasks that are stored in queue, 143 * method will not wait and will pop all stored tasks. 144 * @return count of task that was added to worker 145 */ PopTasksToWorker(AddTaskToWorkerFunc addTaskFunc,size_t size)146 size_t PopTasksToWorker(AddTaskToWorkerFunc addTaskFunc, size_t size) override 147 { 148 os::memory::LockHolder popLockHolder(pushPopLock_); 149 os::memory::LockHolder taskQueueStateLockHolder(taskQueueStateLock_); 150 size = (SumSizeOfInternalQueues() < size) ? (SumSizeOfInternalQueues()) : (size); 151 for (size_t i = 0; i < size; i++) { 152 addTaskFunc(PopTaskFromInternalQueues()); 153 } 154 finishCondVar_.Signal(); 155 return size; 156 } 157 IsEmpty()158 [[nodiscard]] PANDA_PUBLIC_API bool IsEmpty() const override 159 { 160 os::memory::LockHolder lockHolder(taskQueueStateLock_); 161 return AreInternalQueuesEmpty(); 162 } 163 Size()164 [[nodiscard]] PANDA_PUBLIC_API size_t Size() const override 165 { 166 os::memory::LockHolder lockHolder(taskQueueStateLock_); 167 return SumSizeOfInternalQueues(); 168 } 169 170 /** 171 * @brief Method @returns true if queue does not have queue with specified execution mode 172 * @param mode - execution mode of tasks 173 */ HasTaskWithExecutionMode(TaskExecutionMode mode)174 [[nodiscard]] PANDA_PUBLIC_API bool HasTaskWithExecutionMode(TaskExecutionMode mode) const override 175 { 176 os::memory::LockHolder lockHolder(taskQueueStateLock_); 177 if (mode == TaskExecutionMode::FOREGROUND) { 178 return !foregroundTaskQueue_.empty(); 179 } 180 return !backgroundTaskQueue_.empty(); 181 } 182 183 /** 184 * @brief This method saves the @arg callback. It will be called after adding new task in AddTask method. 185 * This method should be used only in TaskScheduler! 186 * @param callback - function that get count of inputted tasks. 187 */ SetNewTasksCallback(NewTasksCallback callback)188 void SetNewTasksCallback(NewTasksCallback callback) override 189 { 190 os::memory::LockHolder subscriberLockHolder(subscriberLock_); 191 newTasksCallback_ = std::move(callback); 192 } 193 194 /// @brief Removes callback function. This method should be used only in TaskScheduler! UnsetNewTasksCallback()195 void UnsetNewTasksCallback() override 196 { 197 os::memory::LockHolder lockHolder(subscriberLock_); 198 newTasksCallback_ = nullptr; 199 } 200 201 /** 202 * @brief Method waits until internal queue will be empty and finalize using of TaskQueue 203 * After this method PopTask will not wait for new tasks. 204 */ WaitForQueueEmptyAndFinish()205 void WaitForQueueEmptyAndFinish() override 206 { 207 WaitForEmpty(); 208 } 209 210 private: WaitForEmpty()211 void WaitForEmpty() 212 { 213 { 214 os::memory::LockHolder lockHolder(taskQueueStateLock_); 215 while (!AreInternalQueuesEmpty()) { 216 finishCondVar_.Wait(&taskQueueStateLock_); 217 } 218 } 219 os::memory::LockHolder pushPopLockHolder(pushPopLock_); 220 finish_ = true; 221 pushWaitCondVar_.SignalAll(); 222 } 223 224 using InternalTaskQueue = std::queue<Task, std::deque<Task, TaskAllocatorType>>; 225 TaskQueue(TaskType taskType,VMType vmType,uint8_t priority)226 TaskQueue(TaskType taskType, VMType vmType, uint8_t priority) 227 : SchedulableTaskQueueInterface(taskType, vmType, priority), 228 foregroundTaskQueue_(TaskAllocatorType()), 229 backgroundTaskQueue_(TaskAllocatorType()) 230 { 231 } 232 AreInternalQueuesEmpty()233 bool AreInternalQueuesEmpty() const REQUIRES(taskQueueStateLock_) 234 { 235 return foregroundTaskQueue_.empty() && backgroundTaskQueue_.empty(); 236 } 237 SumSizeOfInternalQueues()238 size_t SumSizeOfInternalQueues() const REQUIRES(taskQueueStateLock_) 239 { 240 return foregroundTaskQueue_.size() + backgroundTaskQueue_.size(); 241 } 242 PushTaskToInternalQueues(Task && task)243 void PushTaskToInternalQueues(Task &&task) REQUIRES(taskQueueStateLock_) 244 { 245 if (task.GetTaskProperties().GetTaskExecutionMode() == TaskExecutionMode::FOREGROUND) { 246 foregroundTaskQueue_.push(std::move(task)); 247 } else { 248 backgroundTaskQueue_.push(std::move(task)); 249 } 250 } 251 PopTaskFromInternalQueues()252 Task PopTaskFromInternalQueues() REQUIRES(taskQueueStateLock_) 253 { 254 if (!foregroundTaskQueue_.empty()) { 255 return PopTaskFromQueue(foregroundTaskQueue_); 256 } 257 return PopTaskFromQueue(backgroundTaskQueue_); 258 } 259 PopTaskFromQueue(InternalTaskQueue & queue)260 Task PopTaskFromQueue(InternalTaskQueue &queue) REQUIRES(taskQueueStateLock_) 261 { 262 auto task = std::move(queue.front()); 263 queue.pop(); 264 return task; 265 } 266 267 /// push_pop_lock_ is used in push and pop operations as first guarder 268 mutable os::memory::Mutex pushPopLock_; 269 270 /// task_queue_state_lock_ is used in case of interaction with internal queues. 271 mutable os::memory::Mutex taskQueueStateLock_; 272 273 os::memory::ConditionVariable pushWaitCondVar_ GUARDED_BY(pushPopLock_); 274 os::memory::ConditionVariable finishCondVar_ GUARDED_BY(taskQueueStateLock_); 275 276 /// subscriber_lock_ is used in case of calling new_tasks_callback_ 277 os::memory::Mutex subscriberLock_; 278 NewTasksCallback newTasksCallback_ GUARDED_BY(subscriberLock_); 279 GUARDED_BY(pushPopLock_)280 bool finish_ GUARDED_BY(pushPopLock_) {false}; 281 282 /** 283 * foreground_task_queue_ is queue that contains task with ExecutionMode::FOREGROUND. If method PopTask() is used, 284 * foreground_task_queue_ will be checked first and if it's not empty, Task will be gotten from it. 285 */ 286 InternalTaskQueue foregroundTaskQueue_ GUARDED_BY(taskQueueStateLock_); 287 /** 288 * background_task_queue_ is queue that contains task with ExecutionMode::BACKGROUND. If method PopTask() is used, 289 * background_task_queue_ will be popped only if foreground_task_queue_ is empty. 290 */ 291 InternalTaskQueue backgroundTaskQueue_ GUARDED_BY(taskQueueStateLock_); 292 }; 293 294 } // namespace panda::taskmanager::internal 295 296 #endif // PANDA_LIBPANDABASE_TASKMANAGER_TASK_QUEUE_H 297