1 /* 2 * Copyright (c) 2023-2024 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef PANDA_LIBPANDABASE_TASKMANAGER_TASK_QUEUE_H 17 #define PANDA_LIBPANDABASE_TASKMANAGER_TASK_QUEUE_H 18 19 #include "libpandabase/os/mutex.h" 20 #include "libpandabase/taskmanager/schedulable_task_queue_interface.h" 21 #include "libpandabase/taskmanager/utils/sp_sc_lock_free_queue.h" 22 23 namespace ark::taskmanager::internal { 24 25 /** 26 * @brief TaskQueue is a thread-safe queue for tasks. Queues can be registered in TaskScheduler and used to execute 27 * tasks on workers. Also, queues can notify other threads when a new task is pushed. 28 * @tparam Allocator - allocator of Task that will be used in internal queues. By default is used 29 * std::allocator<Task> 30 */ 31 template <class Allocator = std::allocator<Task>> 32 class TaskQueue : public SchedulableTaskQueueInterface { 33 using TaskAllocatorType = typename Allocator::template rebind<Task>::other; 34 using TaskQueueAllocatorType = typename Allocator::template rebind<TaskQueue<TaskAllocatorType>>::other; 35 template <class OtherAllocator> 36 friend class TaskQueue; 37 38 public: 39 NO_COPY_SEMANTIC(TaskQueue); 40 NO_MOVE_SEMANTIC(TaskQueue); 41 42 /** 43 * @brief The TaskQueue factory. Intended to be used by the TaskScheduler's CreateAndRegister method. 44 * @param task_type: TaskType of queue. 45 * @param vm_type: VMType of queue. 46 * @param priority: A number from 1 to 10 that determines the weight of the queue during the task selection process 47 * @return a pointer to the created queue. 48 */ Create(TaskType taskType,VMType vmType,uint8_t priority)49 static PANDA_PUBLIC_API SchedulableTaskQueueInterface *Create(TaskType taskType, VMType vmType, uint8_t priority) 50 { 51 TaskQueueAllocatorType allocator; 52 auto *mem = allocator.allocate(1U); 53 return new (mem) TaskQueue<TaskAllocatorType>(taskType, vmType, priority); 54 } 55 Destroy(SchedulableTaskQueueInterface * queue)56 static PANDA_PUBLIC_API void Destroy(SchedulableTaskQueueInterface *queue) 57 { 58 TaskQueueAllocatorType allocator; 59 std::allocator_traits<TaskQueueAllocatorType>::destroy(allocator, queue); 60 allocator.deallocate(static_cast<TaskQueue<TaskAllocatorType> *>(queue), 1U); 61 } 62 ~TaskQueue()63 PANDA_PUBLIC_API ~TaskQueue() override 64 { 65 ASSERT(AreInternalQueuesEmpty()); 66 } 67 68 /** 69 * @brief Adds task in task queue. Operation is thread-safe. 70 * @param task - task that will be added 71 * @return the size of queue after @arg task was added to it. 72 */ AddTask(Task && task)73 PANDA_PUBLIC_API size_t AddTask(Task &&task) override 74 { 75 auto properties = task.GetTaskProperties(); 76 ASSERT(properties.GetTaskType() == GetTaskType()); 77 ASSERT(properties.GetVMType() == GetVMType()); 78 // Send info about new added task 79 if (LIKELY(newTasksCallback_ != nullptr)) { 80 newTasksCallback_(properties, 1UL); 81 } 82 AddTaskWithoutNewTaskCallbackExecution(std::move(task)); 83 return Size(); 84 } 85 86 /** 87 * @brief The method adds a task to the queue without execution the new task callback. This method should only be 88 * used with tasks that have already triggered this callback. 89 * @param task: instance of Task 90 */ AddTaskWithoutNewTaskCallbackExecution(Task && task)91 void AddTaskWithoutNewTaskCallbackExecution(Task &&task) override 92 { 93 EventOnTaskAdding(&task); 94 // Push task in one of internal queues based on its TaskExecutionMode 95 PushTaskToInternalQueues(std::move(task)); 96 // Signal workers that should execute new task 97 if (signalWorkersCallback_ != nullptr) { 98 signalWorkersCallback_(); 99 } 100 } 101 102 /** 103 * @brief Pops task from task queue. Operation is thread-safe. The method will wait a new task if queue is empty 104 * and method WaitForQueueEmptyAndFinish has not been executed. Otherwise it will return std::nullopt. 105 * This method should be used only in TaskScheduler 106 */ PopTask()107 [[nodiscard]] std::optional<Task> PopTask() override 108 { 109 return PopTaskFromInternalQueues(); 110 } 111 112 /** 113 * @brief Pops task from task queue with specified execution mode. Operation is thread-safe. The method will wait 114 * a new task if queue with specified execution mode is empty and method WaitForQueueEmptyAndFinish has not been 115 * executed. Otherwise it will return std::nullopt. 116 * This method should be used only in TaskScheduler! 117 * @param mode - execution mode of task that we want to pop. 118 */ PopTask(TaskExecutionMode mode)119 [[nodiscard]] std::optional<Task> PopTask(TaskExecutionMode mode) override 120 { 121 if (UNLIKELY(!HasTaskWithExecutionMode(mode))) { 122 return std::nullopt; 123 } 124 auto *queue = &foregroundTaskQueue_; 125 if (UNLIKELY(mode != TaskExecutionMode::FOREGROUND)) { 126 queue = &backgroundTaskQueue_; 127 } 128 auto task = queue->Pop(); 129 return task; 130 } 131 132 /** 133 * @brief Method pops several tasks to worker. 134 * @param addTaskFunc - Functor that will be used to add popped tasks to worker 135 * @param size - Count of tasks you want to pop. If it is greater then count of tasks that are stored in queue, 136 * method will not wait and will pop all stored tasks. 137 * @return count of task that was added to worker 138 */ PopTasksToWorker(const AddTaskToWorkerFunc & addTaskFunc,size_t size)139 size_t PopTasksToWorker(const AddTaskToWorkerFunc &addTaskFunc, size_t size) override 140 { 141 if (UNLIKELY(AreInternalQueuesEmpty())) { 142 return 0; 143 } 144 size_t returnSize = 0; 145 for (; !AreInternalQueuesEmpty() && returnSize < size; returnSize++) { 146 addTaskFunc(PopTaskFromInternalQueues().value()); 147 } 148 return returnSize; 149 } 150 151 /** 152 * @brief Method pops several tasks to helper thread. Helper thread in TaskScheduler is the thread that uses 153 * HelpWorkersWithTasks method. 154 * @param addTaskFunc - Functor that will be used to add popped tasks to helper 155 * @param size - Count of tasks you want to pop. If it is greater then count of tasks that are stored in queue, 156 * method will not wait and will pop all stored tasks. 157 * @param mode - Execution mode of task you wast to pop 158 * @return count of task that was added to helper 159 */ PopTasksToHelperThread(const AddTaskToHelperFunc & addTaskFunc,size_t size,TaskExecutionMode mode)160 size_t PopTasksToHelperThread(const AddTaskToHelperFunc &addTaskFunc, size_t size, TaskExecutionMode mode) override 161 { 162 if (!HasTaskWithExecutionMode(mode)) { 163 return 0; 164 } 165 auto *queue = &foregroundTaskQueue_; 166 if (mode != TaskExecutionMode::FOREGROUND) { 167 queue = &backgroundTaskQueue_; 168 } 169 size_t returnSize = 0; 170 for (; HasTaskWithExecutionMode(mode) && returnSize < size; returnSize++) { 171 addTaskFunc(queue->Pop().value()); 172 } 173 return returnSize; 174 } 175 IsEmpty()176 [[nodiscard]] PANDA_PUBLIC_API bool IsEmpty() const override 177 { 178 return AreInternalQueuesEmpty(); 179 } 180 Size()181 [[nodiscard]] PANDA_PUBLIC_API size_t Size() const override 182 { 183 return SumSizeOfInternalQueues(); 184 } 185 186 /** 187 * @brief Method @returns true if queue does not have queue with specified execution mode 188 * @param mode - execution mode of tasks 189 */ HasTaskWithExecutionMode(TaskExecutionMode mode)190 [[nodiscard]] PANDA_PUBLIC_API bool HasTaskWithExecutionMode(TaskExecutionMode mode) const override 191 { 192 if (mode == TaskExecutionMode::FOREGROUND) { 193 return !foregroundTaskQueue_.IsEmpty(); 194 } 195 return !backgroundTaskQueue_.IsEmpty(); 196 } 197 CountOfTasksWithExecutionMode(TaskExecutionMode mode)198 [[nodiscard]] PANDA_PUBLIC_API size_t CountOfTasksWithExecutionMode(TaskExecutionMode mode) const override 199 { 200 if (mode == TaskExecutionMode::FOREGROUND) { 201 return foregroundTaskQueue_.Size(); 202 } 203 return backgroundTaskQueue_.Size(); 204 } 205 206 /** 207 * @brief This method saves the @arg callback. 208 * @param newTaskCallback - function that get count of inputted tasks and uses in AddTask method. 209 * @param signalWorkersCallback - function that should signal workers to return to work if it's needed 210 */ SetCallbacks(NewTasksCallback newTaskCallback,SignalWorkersCallback signalWorkersCallback)211 void SetCallbacks(NewTasksCallback newTaskCallback, SignalWorkersCallback signalWorkersCallback) override 212 { 213 newTasksCallback_ = std::move(newTaskCallback); 214 signalWorkersCallback_ = std::move(signalWorkersCallback); 215 } 216 217 /// @brief Removes callback function. This method should be used only in TaskScheduler! UnsetCallbacks()218 void UnsetCallbacks() override 219 { 220 newTasksCallback_ = nullptr; 221 signalWorkersCallback_ = nullptr; 222 } 223 224 private: 225 using InternalTaskQueue = SPSCLockFreeQueue<Task, TaskAllocatorType>; 226 TaskQueue(TaskType taskType,VMType vmType,uint8_t priority)227 TaskQueue(TaskType taskType, VMType vmType, uint8_t priority) 228 : SchedulableTaskQueueInterface(taskType, vmType, priority) 229 { 230 } 231 AreInternalQueuesEmpty()232 bool AreInternalQueuesEmpty() const 233 { 234 return foregroundTaskQueue_.IsEmpty() && backgroundTaskQueue_.IsEmpty(); 235 } 236 SumSizeOfInternalQueues()237 size_t SumSizeOfInternalQueues() const 238 { 239 return foregroundTaskQueue_.Size() + backgroundTaskQueue_.Size(); 240 } 241 PushTaskToInternalQueues(Task && task)242 void PushTaskToInternalQueues(Task &&task) 243 { 244 if (task.GetTaskProperties().GetTaskExecutionMode() == TaskExecutionMode::FOREGROUND) { 245 os::memory::LockHolder lockGuard(pushForegroundLock_); 246 foregroundTaskQueue_.Push(std::move(task)); 247 } else { 248 os::memory::LockHolder lockGuard(pushBackgroundLock_); 249 backgroundTaskQueue_.Push(std::move(task)); 250 } 251 } 252 PopTaskFromInternalQueues()253 std::optional<Task> PopTaskFromInternalQueues() 254 { 255 auto task = foregroundTaskQueue_.Pop(); 256 if (task.has_value()) { 257 return task; 258 } 259 return backgroundTaskQueue_.Pop(); 260 } 261 EventOnTaskAdding(Task * task)262 void EventOnTaskAdding(Task *task) 263 { 264 ASSERT(task != nullptr); 265 task->EventOnTaskAdding(); 266 } 267 268 /// subscriber_lock_ is used in case of calling new_tasks_callback_ 269 NewTasksCallback newTasksCallback_; 270 SignalWorkersCallback signalWorkersCallback_; 271 272 /// foreground part of TaskQueue 273 mutable os::memory::Mutex pushForegroundLock_; 274 InternalTaskQueue foregroundTaskQueue_; 275 276 /// background part of TaskQueue 277 mutable os::memory::Mutex pushBackgroundLock_; 278 InternalTaskQueue backgroundTaskQueue_; 279 }; 280 281 } // namespace ark::taskmanager::internal 282 283 #endif // PANDA_LIBPANDABASE_TASKMANAGER_TASK_QUEUE_H 284