1 /* 2 * Copyright (c) 2023-2024 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef PANDA_LIBPANDABASE_TASKMANAGER_WORKER_THREAD_H 17 #define PANDA_LIBPANDABASE_TASKMANAGER_WORKER_THREAD_H 18 19 #include "libpandabase/taskmanager/schedulable_task_queue_interface.h" 20 #include "libpandabase/taskmanager/utils/worker_thread_local_queue.h" 21 #include "libpandabase/taskmanager/utils/task_selector.h" 22 #include "libpandabase/os/mutex.h" 23 #include "libpandabase/os/thread.h" 24 #include <thread> 25 26 namespace ark::taskmanager { 27 28 using TaskPropertiesCounterMap = std::unordered_map<TaskProperties, size_t, TaskProperties::Hash>; 29 30 class TaskScheduler; 31 32 class WorkerThread { 33 public: 34 NO_COPY_SEMANTIC(WorkerThread); 35 NO_MOVE_SEMANTIC(WorkerThread); 36 37 static constexpr size_t WORKER_QUEUE_SIZE = 4UL; 38 39 /// @brief functor that should add task in worker 40 using AddTaskToWorkerFunc = std::function<void(Task &&)>; 41 42 explicit WorkerThread(const std::string &name); 43 ~WorkerThread(); 44 45 /** 46 * @brief Adds task in internal queues. 47 * @param task - task that will be added in internal queues 48 */ 49 void AddTask(Task &&task); 50 51 /// @brief Returns true if all internal queues are empty 52 bool IsEmpty() const; 53 54 /// @brief Returns count of tasks in local queue 55 size_t Size() const; 56 57 /// @brief Returns count of task in local queue with specified properties 58 size_t CountOfTasksWithProperties(TaskProperties properties) const; 59 60 /// @brief Waits for worker finish 61 void Join(); 62 63 /** 64 * @brief register all workers in local queue 65 * @param workers - ref to vector with all workers 66 */ 67 void RegisterAllWorkersInLocalQueue(const std::vector<WorkerThread *> &workers); 68 69 std::string GetWorkerName() const; 70 71 /** 72 * @brief method returns id of worker to pop tasks. 73 * @param worker: pointer on WorkerThread which id you want to get. It should be added with 74 * RegisterAllWorkersInLocalQueue(...) method 75 */ 76 size_t GetLocalWorkerQueuePopId(WorkerThread *worker) const; 77 78 /// @brief method returns id of TaskScheduler. 79 size_t GetLocalWorkerQueueSchedulerPopId() const; 80 81 /// @brief method starts WorkerLoop. All workers should be registered before Start executing 82 void Start(); 83 84 void SetStolenTask(Task &&stolenTask); 85 86 /** 87 * @brief Fills with tasks other WorkerThread. 88 * @tparam Properties: variadic template class of TaskProperties that represent using of properties to pop 89 * @param addTaskFunc - functor that should add new task in other WorkerThread 90 * @param prop - TaskProperties of task is wanted to be fill with 91 * @param taskCount - Count of tasks wanted to pop 92 * @param id: id worker got after registration 93 * @returns count of tasks that was added 94 */ 95 template <class... Properties> GiveTasksToAnotherWorker(const AddTaskToWorkerFunc & addTaskFunc,size_t taskCount,size_t id,Properties...prop)96 size_t PANDA_PUBLIC_API GiveTasksToAnotherWorker(const AddTaskToWorkerFunc &addTaskFunc, size_t taskCount, 97 size_t id, Properties... prop) 98 { 99 static_assert(sizeof...(prop) < 2UL, "it's possible to have only one prop arg or no one at all"); 100 101 size_t count = 0; 102 for (; count != taskCount; count++) { 103 // Try to pop task 104 std::optional<Task> task; 105 if constexpr (sizeof...(prop) == 0) { 106 task = localQueue_.Pop(id); 107 } else { 108 static_assert(std::is_same<std::tuple_element_t<0U, std::tuple<Properties...>>, TaskProperties>::value); 109 task = localQueue_.Pop(id, std::get<TaskProperties>(std::tuple(prop...))); 110 } 111 // If pop task returned nullopt need to finish execution 112 if (UNLIKELY(!task.has_value())) { 113 break; 114 } 115 addTaskFunc(std::move(task.value())); 116 } 117 return count; 118 } 119 120 void TryDeleteRetiredPtrs(); 121 122 private: 123 void ExecuteTask(Task *task); 124 125 /// @brief Main workers algorithm 126 void WorkerLoop(); 127 128 /** 129 * @brief pops and executes all tasks from internal queue. 130 * Also counts executed tasks in finishedTasksCounterMap_. 131 */ 132 size_t ExecuteTasksFromLocalQueue(); 133 134 /// @brief method executes task from stolenTask_ field 135 void ExecuteStolenTask(); 136 137 /** 138 * @brief method wait for starting 139 * @see Start 140 */ 141 void WaitForStart(); 142 143 std::thread *thread_ = nullptr; 144 TaskScheduler *scheduler_ = nullptr; 145 std::string name_; 146 147 bool start_ {false}; 148 os::memory::Mutex startWaitLock_; 149 os::memory::ConditionVariable startWaitCondVar_ GUARDED_BY(startWaitLock_); 150 151 /** 152 * @brief Here should be saved task that was stolen from other workers local queue 153 * @see SetStolenTask 154 * @see ExecuteStolenTask 155 */ 156 Task stolenTask_; 157 158 /** 159 * @brief finishedTasksCounterMap_: map that consider info about executed tasks in one WorkerLoop iteration. Is used 160 * to notify TaskStatistics. 161 * @see ExecuteTasksFromLocalQueue 162 */ 163 TaskPropertiesCounterMap finishedTasksCounterMap_; 164 165 /** 166 * @brief localQueue_ is set of lock-free queues with registration of consumers. Registration uses for correct 167 * memory free. 168 * @see RegisterAllWorkersInLocalQueue 169 * @see GetLocalWorkerQueuePopId 170 * @see perWorkerPopId_ 171 */ 172 internal::WorkerThreadLocalQueue<WORKER_QUEUE_SIZE> localQueue_; 173 174 /** 175 * @brief perWorkerPopId_: is map that uses when one worker wants to pop task from other worker's local queue. 176 * @see RegisterAllWorkersInLocalQueue 177 * @see GiveTasksToAnotherWorker 178 */ 179 std::unordered_map<WorkerThread *, size_t> perWorkerPopId_; 180 181 /** 182 * @brief schedulerPopId_: specific id for TaskScheduler. TaskScheduler uses it when Helper tries to steal task. 183 * @see GetLocalWorkerQueueSchedulerPopId 184 * @see TaskScheduler::StealTaskFromOtherWorker 185 */ 186 size_t schedulerPopId_ {0}; 187 188 size_t countOfExecutedTask_ {0}; 189 }; 190 191 } // namespace ark::taskmanager 192 193 #endif // PANDA_LIBPANDABASE_TASKMANAGER_WORKER_THREAD_H 194