1 /* 2 * Copyright (c) 2023-2025 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef PANDA_LIBPANDABASE_TASKMANAGER_TASK_MANAGER_H 17 #define PANDA_LIBPANDABASE_TASKMANAGER_TASK_MANAGER_H 18 19 #include "libpandabase/taskmanager/task_queue.h" 20 #include "libpandabase/taskmanager/utils/wait_list.h" 21 #include "libpandabase/taskmanager/worker_thread.h" 22 #include "libpandabase/taskmanager/utils/task_time_stats.h" 23 #include <vector> 24 #include <map> 25 #include <queue> 26 27 namespace ark::taskmanager { 28 class TaskQueueInterface; 29 } // namespace ark::taskmanager 30 31 namespace ark::taskmanager::internal { 32 33 class TaskScheduler { 34 public: 35 TaskScheduler(size_t workersCount, TaskWaitList *waitList, TaskQueueSet *queueSet); 36 PANDA_PUBLIC_API ~TaskScheduler(); 37 NO_COPY_SEMANTIC(TaskScheduler); 38 NO_MOVE_SEMANTIC(TaskScheduler); 39 40 static constexpr uint64_t TASK_WAIT_TIMEOUT = 1U; 41 42 using LocalTaskQueue = std::queue<Task>; 43 44 /// @brief Returns true if TaskScheduler outputs log info 45 PANDA_PUBLIC_API bool IsTaskLifetimeStatisticsUsed() const; 46 47 /** 48 * @brief Fills @arg worker (local queues) with tasks. The number of tasks obtained depends on the max size of the 49 * worker's local queue and the number of workers. The algorithm strives to give the same number of tasks to all 50 * workers. If queues are empty and TaskScheduler is not destroying workers will wait. If it's true, workers should 51 * finish after the execution of tasks. 52 * @param worker - pointer on worker that should be fill will tasks. 53 * @returns true if Worker should finish loop execution, otherwise returns false 54 */ 55 bool FillWithTasks(WorkerThread *worker); 56 57 /** 58 * @brief Method steal task from worker with the largest number of tasks and push it to gotten worker. 59 * @param worker: pointer to WorkerThread that should be fill with stollen task 60 */ 61 size_t StealTaskFromOtherWorker(WorkerThread *taskReceiver); 62 63 /// @brief Checks if task queues are empty 64 bool AreQueuesEmpty() const; 65 66 /// @brief Checks if worker local queues are empty 67 bool AreWorkersEmpty() const; 68 69 /** 70 * @brief Executes tasks with specific properties. It will get them from queue or steal from workers. 71 * @param properties - TaskProperties of tasks needs to help 72 * @returns real count of tasks that was executed 73 */ 74 PANDA_PUBLIC_API size_t HelpWorkersWithTasks(TaskQueueInterface *queue); 75 76 /** 77 * @brief Adds the task to the wait list with timeout. After the timeout expires, the task will be added 78 * to its corresponding TaskQueue. 79 * @param task: instance of task 80 * @param time: waiting time in milliseconds 81 * @returns unique waiter id. It can be used to signal wait list to add task to TaskQueue 82 */ 83 PANDA_PUBLIC_API WaiterId AddTaskToWaitListWithTimeout(Task &&task, uint64_t time); 84 85 /** 86 * @brief Adds the task to the wait list. 87 * @param task: instance of task 88 * @returns unique waiter id. It can be used to signal wait list to add task to TaskQueue 89 * @see TaskScheduler::SignalWaitList 90 */ 91 PANDA_PUBLIC_API WaiterId AddTaskToWaitList(Task &&task); 92 93 /** 94 * @brief Signals wait list to add task in TaskQueue. 95 * @param waiterId: unique waiter id 96 * @see TaskScheduler::AddTaskToWaitListWithTimeout, TaskScheduler::AddTaskToWaitList 97 */ 98 PANDA_PUBLIC_API void SignalWaitList(WaiterId waiterId); 99 100 PANDA_PUBLIC_API size_t GetCountOfTasksInSystem() const; 101 /// @brief Method signals workers if it's needed 102 void SignalWorkers(); 103 104 PANDA_PUBLIC_API size_t GetCountOfWorkers() const; 105 PANDA_PUBLIC_API void SetCountOfWorkers(size_t count); 106 107 private: 108 /** 109 * @brief Method get and execute tasks with specified properties. If there are no tasks with that properties method 110 * will return nullopt. 111 * @param properties - TaskProperties of task we want to get. 112 * @returns real count of gotten tasks 113 */ 114 size_t GetAndExecuteSetOfTasksFromQueue(TaskQueueInterface *properties); 115 116 /** 117 * @brief Method steal and execute one task from one Worker. Method will find worker the largest number of tasks, 118 * steal one from it and execute. 119 * @param properties - TaskProperties of tasks needs to help 120 * @returns 1 if stealing was done successfully 121 */ 122 size_t StealAndExecuteOneTaskFromWorkers(TaskQueueInterface *properties); 123 124 size_t PutTasksInWorker(WorkerThread *worker, internal::SchedulableTaskQueueInterface *queue); 125 126 /// @brief Checks if there are no tasks in queues and workers 127 bool AreNoMoreTasks() const; 128 129 void PutWaitTaskInLocalQueue(LocalTaskQueue &queue) REQUIRES(taskSchedulerStateLock_); 130 131 void PutTaskInTaskQueues(LocalTaskQueue &queue); 132 133 /** 134 * @brief Method waits until new tasks coming or finishing of Task Scheduler usage 135 * @return true if TaskScheduler have tasks to manager 136 */ 137 bool WaitUntilNewTasks(WorkerThread *worker); 138 139 size_t ProcessWaitList(); 140 141 TaskWaitList *waitList_ = nullptr; 142 TaskQueueSet *queueSet_ = nullptr; 143 std::atomic_bool waitListIsProcessing_ {false}; 144 145 /// Pointers to Worker Threads. 146 std::array<std::atomic<WorkerThread *>, MAX_WORKER_COUNT> workers_ {}; 147 std::atomic_size_t workersCount_ = {0UL}; 148 149 /// Iterator for workers_ to balance stealing 150 size_t workersIterator_ = {0UL}; 151 152 /// Represents count of task that sleeps 153 std::atomic_size_t waitWorkersCount_ {0UL}; 154 155 /// task_scheduler_state_lock_ is used to check state of task 156 os::memory::RecursiveMutex mutable taskSchedulerStateLock_; 157 158 /** 159 * queues_wait_cond_var_ is used when all registered queues are empty to wait until one of them will have a 160 * task 161 */ 162 os::memory::ConditionVariable queuesWaitCondVar_ GUARDED_BY(taskSchedulerStateLock_); 163 164 /// start_ is true if we used Initialize method 165 std::atomic_bool start_ {false}; 166 167 /// finish_ is true when TaskScheduler finish Workers and TaskQueues GUARDED_BY(taskSchedulerStateLock_)168 bool finish_ GUARDED_BY(taskSchedulerStateLock_) {false}; 169 170 std::atomic_size_t waitToFinish_ {0UL}; 171 std::vector<WorkerThread *> workersToDelete_; 172 }; 173 174 } // namespace ark::taskmanager::internal 175 176 #endif // PANDA_LIBPANDABASE_TASKMANAGER_TASK_MANAGER_H 177