1 /* 2 * Copyright (c) 2023-2024 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef PANDA_LIBPANDABASE_TASKMANAGER_TASK_MANAGER_H 17 #define PANDA_LIBPANDABASE_TASKMANAGER_TASK_MANAGER_H 18 19 #include "libpandabase/taskmanager/task_queue.h" 20 #include "libpandabase/taskmanager/utils/wait_list.h" 21 #include "libpandabase/taskmanager/worker_thread.h" 22 #include "libpandabase/taskmanager/utils/task_time_stats.h" 23 #include <vector> 24 #include <map> 25 #include <queue> 26 27 namespace ark::taskmanager { 28 /** 29 * Task Manager can register 3 queues with different type of tasks 30 * - GC queue(ECMA) 31 * - GC queue(ArkTS) 32 * - JIT queue 33 */ 34 class TaskScheduler { 35 using TaskPropertiesCounterMap = std::unordered_map<TaskProperties, size_t, TaskProperties::Hash>; 36 37 public: 38 NO_COPY_SEMANTIC(TaskScheduler); 39 NO_MOVE_SEMANTIC(TaskScheduler); 40 41 static constexpr uint64_t TASK_WAIT_TIMEOUT = 1U; 42 43 using LocalTaskQueue = std::queue<Task>; 44 45 /** 46 * @brief Creates an instance of TaskScheduler. 47 * @param threadsCount - number of worker that will be created be Task Manager 48 * @param taskStatsType - type of TaskStatistics that will be used in TaskScheduler 49 */ 50 PANDA_PUBLIC_API static TaskScheduler *Create(size_t threadsCount, 51 TaskTimeStatsType taskStatsType = TaskTimeStatsType::NO_STATISTICS); 52 53 /** 54 * @brief Returns the pointer to TaskScheduler. If you use it before the Create or after Destroy methods, it 55 * will return nullptr. 56 */ 57 [[nodiscard]] PANDA_PUBLIC_API static TaskScheduler *GetTaskScheduler(); 58 59 /// @brief Deletes the existed TaskScheduler. You should not use it if you didn't use Create before. 60 PANDA_PUBLIC_API static void Destroy(); 61 62 /// @brief Returns true if TaskScheduler outputs log info 63 PANDA_PUBLIC_API bool IsTaskLifetimeStatisticsUsed() const; 64 65 TaskTimeStatsBase *GetTaskTimeStats() const; 66 TaskTimeStatsType GetTaskTimeStatsType() const; 67 68 /** 69 * @brief Creates and starts workers with registered queues. After this method, you can not register new 70 * queues. 71 */ 72 PANDA_PUBLIC_API void Initialize(); 73 74 /** 75 * @brief Method allocates, constructs and registers TaskQueue. If it already exists, method returns nullptr. 76 * @param taskType - TaskType of future TaskQueue. 77 * @param vmType - VMType of future TaskQueue. 78 * @param priority - value of priority: 79 * TaskQueueInterface::MIN_PRIORITY <= priority <= TaskQueueInterface::MIN_PRIORITY 80 * @tparam Allocator - allocator of Task that will be used in internal queues. By default is used 81 * std::allocator<Task> 82 */ 83 template <class Allocator = std::allocator<Task>> 84 PANDA_PUBLIC_API TaskQueueInterface *CreateAndRegisterTaskQueue( 85 TaskType taskType, VMType vmType, uint8_t priority = TaskQueueInterface::DEFAULT_PRIORITY) 86 { 87 auto *queue = internal::TaskQueue<Allocator>::Create(taskType, vmType, priority); 88 if (UNLIKELY(queue == nullptr)) { 89 return nullptr; 90 } 91 auto id = RegisterQueue(queue); 92 if (UNLIKELY(id == INVALID_TASKQUEUE_ID)) { 93 internal::TaskQueue<Allocator>::Destroy(queue); 94 return nullptr; 95 } 96 return queue; 97 } 98 /** 99 * @brief Method Destroy and Unregister TaskQueue 100 * @param queue - TaskQueueInterface* of TaskQueue. 101 * @tparam Allocator - allocator of Task that will be used to deallocate TaskQueue. Use the same allocator as 102 * you have used in TaskScheduler::CreateAndRegisterTaskQueue method. 103 */ 104 template <class Allocator = std::allocator<Task>> UnregisterAndDestroyTaskQueue(TaskQueueInterface * queue)105 PANDA_PUBLIC_API void UnregisterAndDestroyTaskQueue(TaskQueueInterface *queue) 106 { 107 TaskQueueId id(queue->GetTaskType(), queue->GetVMType()); 108 auto *schedulableQueue = taskQueues_[id]; 109 110 schedulableQueue->UnsetCallbacks(); 111 taskQueues_.erase(id); 112 internal::TaskQueue<Allocator>::Destroy(schedulableQueue); 113 } 114 115 /** 116 * @brief Fills @arg worker (local queues) with tasks. The number of tasks obtained depends on the max size of the 117 * worker's local queue and the number of workers. The algorithm strives to give the same number of tasks to all 118 * workers. If queues are empty and TaskScheduler is not destroying workers will wait. If it's true, workers should 119 * finish after the execution of tasks. 120 * @param worker - pointer on worker that should be fill will tasks. 121 * @returns true if Worker should finish loop execution, otherwise returns false 122 */ 123 bool FillWithTasks(WorkerThread *worker); 124 125 /** 126 * @brief Method steal task from worker with the largest number of tasks and push it to gotten worker. 127 * @param worker: pointer to WorkerThread that should be fill with stollen task 128 */ 129 void StealTaskFromOtherWorker(WorkerThread *taskReceiver); 130 131 /// @brief Checks if task queues are empty 132 bool AreQueuesEmpty() const; 133 134 /// @brief Checks if worker local queues are empty 135 bool AreWorkersEmpty() const; 136 137 /** 138 * @brief Method increment counter of finished tasks and signal Finalize waiter 139 * @param counterMap - map from id to count of finished tasks 140 * @return count of executed tasks 141 */ 142 size_t IncrementCounterOfExecutedTasks(const TaskPropertiesCounterMap &counterMap); 143 144 /** 145 * @brief Executes tasks with specific properties. It will get them from queue or steal from workers. 146 * @param properties - TaskProperties of tasks needs to help 147 * @returns real count of tasks that was executed 148 */ 149 PANDA_PUBLIC_API size_t HelpWorkersWithTasks(TaskProperties properties); 150 151 /** 152 * @brief Method waits all tasks with specified properties. This method should be used only from Main Thread and 153 * only for finalization! 154 * @param properties - TaskProperties of tasks we will wait to be completed. 155 */ 156 PANDA_PUBLIC_API void WaitForFinishAllTasksWithProperties(TaskProperties properties); 157 158 /** 159 * @brief Adds the task to the wait list with timeout. After the timeout expires, the task will be added 160 * to its corresponding TaskQueue. 161 * @param task: instance of task 162 * @param time: waiting time in milliseconds 163 * @returns unique waiter id. It can be used to signal wait list to add task to TaskQueue 164 */ 165 PANDA_PUBLIC_API WaiterId AddTaskToWaitListWithTimeout(Task &&task, uint64_t time); 166 167 /** 168 * @brief Adds the task to the wait list. 169 * @param task: instance of task 170 * @returns unique waiter id. It can be used to signal wait list to add task to TaskQueue 171 * @see TaskScheduler::SignalWaitList 172 */ 173 PANDA_PUBLIC_API WaiterId AddTaskToWaitList(Task &&task); 174 175 /** 176 * @brief Signals wait list to add task in TaskQueue. 177 * @param waiterId: unique waiter id 178 * @see TaskScheduler::AddTaskToWaitListWithTimeout, TaskScheduler::AddTaskToWaitList 179 */ 180 PANDA_PUBLIC_API void SignalWaitList(WaiterId waiterId); 181 182 /// @brief This method indicates that workers can no longer wait for new tasks and be completed. 183 PANDA_PUBLIC_API void Finalize(); 184 185 PANDA_PUBLIC_API ~TaskScheduler(); 186 187 private: 188 explicit TaskScheduler(size_t workersCount, TaskTimeStatsType taskTimeStatsType); 189 190 /** 191 * @brief Method get and execute tasks with specified properties. If there are no tasks with that properties method 192 * will return nullopt. 193 * @param properties - TaskProperties of task we want to get. 194 * @returns real count of gotten tasks 195 */ 196 size_t GetAndExecuteSetOfTasksFromQueue(TaskProperties properties); 197 198 /** 199 * @brief Method steal and execute one task from one Worker. Method will find worker the largest number of tasks, 200 * steal one from it and execute. 201 * @param properties - TaskProperties of tasks needs to help 202 * @returns 1 if stealing was done successfully 203 */ 204 size_t StealAndExecuteOneTaskFromWorkers(TaskProperties properties); 205 206 /** 207 * @brief Registers a queue that was created externally. It should be valid until all workers finish, and the 208 * queue should have a unique set of TaskType and VMType fields. You can not use this method after Initialize() 209 * method 210 * @param queue: pointer to a valid TaskQueue. 211 * @return TaskQueueId of queue that was added. If queue with same TaskType and VMType is already added, method 212 * returns INVALID_TASKQUEUE_ID 213 */ 214 PANDA_PUBLIC_API TaskQueueId RegisterQueue(internal::SchedulableTaskQueueInterface *queue); 215 216 /** 217 * @brief Method pops one task from internal queues based on priorities. 218 * @return if queue are empty, returns nullopt, otherwise returns task. 219 */ 220 [[nodiscard]] std::optional<Task> GetNextTask() REQUIRES(popFromTaskQueuesLock_); 221 222 /// @brief Checks if there are no tasks in queues and workers 223 bool AreNoMoreTasks() const; 224 225 /** 226 * @brief Method puts tasks to @arg worker. Queue and count of tasks depends on selectedQueues_. After 227 * execution of the method selectedQueues_ will be empty. 228 * @param worker - pointer on worker that should be fill with tasks 229 * @param selectedQueue - count of selected tasks for all TaskQueueId 230 * @return count of task that was gotten by worker. 231 */ 232 size_t PutTasksInWorker(WorkerThread *worker, TaskQueueId selectedQueue); 233 234 /** 235 * @brief Method increment counter of new tasks and signal worker 236 * @param properties - TaskProperties of task from queue that execute the callback 237 * @param ivalue - the value by which the counter will be increased 238 */ 239 void IncrementCounterOfAddedTasks(TaskProperties properties, size_t ivalue); 240 241 /// @brief Method signals workers if it's needed 242 void SignalWorkers(); 243 244 void IncrementCountOfTasksInSystem(TaskProperties prop, size_t count); 245 246 void DecrementCountOfTasksInSystem(TaskProperties prop, size_t count); 247 248 size_t GetCountOfTasksInSystemWithTaskProperties(TaskProperties prop) const; 249 250 size_t GetCountOfTasksInSystem() const; 251 252 internal::SchedulableTaskQueueInterface *GetQueue(TaskQueueId id) const; 253 254 void PutWaitTaskInLocalQueue(LocalTaskQueue &queue) REQUIRES(taskSchedulerStateLock_); 255 256 void PutTaskInTaskQueues(LocalTaskQueue &queue); 257 258 /** 259 * @brief Method waits until new tasks coming or finishing of Task Scheduler usage 260 * @return true if TaskScheduler have tasks to manager 261 */ 262 bool WaitUntilNewTasks() REQUIRES(taskSchedulerStateLock_); 263 264 static TaskScheduler *instance_; 265 266 size_t workersCount_; 267 268 /// Pointers to Worker Threads. 269 std::vector<WorkerThread *> workers_; 270 271 /// Iterator for workers_ to balance stealing 272 size_t workersIterator_ = {0UL}; 273 274 /// pop_from_task_queues_lock_ is used to guard popping from queues 275 os::memory::Mutex popFromTaskQueuesLock_; 276 277 /// Represents count of task that sleeps 278 std::atomic_size_t waitWorkersCount_ {0UL}; 279 280 /** 281 * Map from TaskType and VMType to queue. 282 * Can be changed only before Initialize methods. 283 * Since we can change the map only before creating the workers, we do not need to synchronize access after 284 * Initialize method 285 */ 286 std::map<TaskQueueId, internal::SchedulableTaskQueueInterface *> taskQueues_; 287 288 /// task_scheduler_state_lock_ is used to check state of task 289 os::memory::RecursiveMutex taskSchedulerStateLock_; 290 291 /** 292 * queues_wait_cond_var_ is used when all registered queues are empty to wait until one of them will have a 293 * task 294 */ 295 os::memory::ConditionVariable queuesWaitCondVar_ GUARDED_BY(taskSchedulerStateLock_); 296 297 /** 298 * This cond var uses to wait for all tasks will be done. 299 * It is used in Finalize() method. 300 */ 301 os::memory::ConditionVariable finishTasksCondVar_ GUARDED_BY(taskSchedulerStateLock_); 302 303 /// start_ is true if we used Initialize method 304 std::atomic_bool start_ {false}; 305 306 /// finish_ is true when TaskScheduler finish Workers and TaskQueues GUARDED_BY(taskSchedulerStateLock_)307 bool finish_ GUARDED_BY(taskSchedulerStateLock_) {false}; 308 309 /// newTasksCount_ represents count of new tasks 310 TaskPropertiesCounterMap newTasksCount_ GUARDED_BY(taskSchedulerStateLock_); 311 312 std::atomic_size_t waitToFinish_ {0UL}; 313 314 std::atomic_bool disableHelpers_ {false}; 315 316 /** 317 * finishedTasksCount_ represents count of finished tasks; 318 * Task is finished if: 319 * - it was executed by Worker; 320 * - it was gotten by main thread; 321 */ 322 TaskPropertiesCounterMap finishedTasksCount_ GUARDED_BY(taskSchedulerStateLock_); 323 324 /** 325 * Represents count of tasks that exist in TaskScheduler system per TaskProperties. Task in system means that task 326 * was added but wasn't executed or popped. 327 */ 328 std::unordered_map<TaskProperties, std::atomic_size_t, TaskProperties::Hash> countOfTasksInSystem_; 329 330 TaskTimeStatsBase *taskTimeStats_ = nullptr; 331 TaskTimeStatsType taskTimeStatsType_; 332 333 internal::TaskSelector selector_; 334 335 WaitList<Task> waitList_ GUARDED_BY(taskSchedulerStateLock_); 336 }; 337 338 } // namespace ark::taskmanager 339 340 #endif // PANDA_LIBPANDABASE_TASKMANAGER_TASK_MANAGER_H 341