/*
 * Copyright (c) 2023-2024 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
15 
16 #ifndef PANDA_LIBPANDABASE_TASKMANAGER_WORKER_THREAD_H
17 #define PANDA_LIBPANDABASE_TASKMANAGER_WORKER_THREAD_H
18 
#include "libpandabase/taskmanager/schedulable_task_queue_interface.h"
#include "libpandabase/taskmanager/utils/worker_thread_local_queue.h"
#include "libpandabase/taskmanager/utils/task_selector.h"
#include "libpandabase/os/mutex.h"
#include "libpandabase/os/thread.h"
#include <cstddef>
#include <functional>
#include <optional>
#include <string>
#include <thread>
#include <tuple>
#include <type_traits>
#include <unordered_map>
#include <utility>
#include <vector>
25 
26 namespace ark::taskmanager {
27 
28 using TaskPropertiesCounterMap = std::unordered_map<TaskProperties, size_t, TaskProperties::Hash>;
29 
30 class TaskScheduler;
31 
32 class WorkerThread {
33 public:
34     NO_COPY_SEMANTIC(WorkerThread);
35     NO_MOVE_SEMANTIC(WorkerThread);
36 
37     static constexpr size_t WORKER_QUEUE_SIZE = 4UL;
38 
39     /// @brief functor that should add task in worker
40     using AddTaskToWorkerFunc = std::function<void(Task &&)>;
41 
42     explicit WorkerThread(const std::string &name);
43     ~WorkerThread();
44 
45     /**
46      * @brief Adds task in internal queues.
47      * @param task - task that will be added in internal queues
48      */
49     void AddTask(Task &&task);
50 
51     /// @brief Returns true if all internal queues are empty
52     bool IsEmpty() const;
53 
54     /// @brief Returns count of tasks in local queue
55     size_t Size() const;
56 
57     /// @brief Returns count of task in local queue with specified properties
58     size_t CountOfTasksWithProperties(TaskProperties properties) const;
59 
60     /// @brief Waits for worker finish
61     void Join();
62 
63     /**
64      * @brief register all workers in local queue
65      * @param workers - ref to vector with all workers
66      */
67     void RegisterAllWorkersInLocalQueue(const std::vector<WorkerThread *> &workers);
68 
69     std::string GetWorkerName() const;
70 
71     /**
72      * @brief method returns id of worker to pop tasks.
73      * @param worker: pointer on WorkerThread which id you want to get. It should be added with
74      * RegisterAllWorkersInLocalQueue(...) method
75      */
76     size_t GetLocalWorkerQueuePopId(WorkerThread *worker) const;
77 
78     /// @brief method returns id of TaskScheduler.
79     size_t GetLocalWorkerQueueSchedulerPopId() const;
80 
81     /// @brief method starts WorkerLoop. All workers should be registered before Start executing
82     void Start();
83 
84     void SetStolenTask(Task &&stolenTask);
85 
86     /**
87      * @brief Fills with tasks other WorkerThread.
88      * @tparam Properties: variadic template class of TaskProperties that represent using of properties to pop
89      * @param addTaskFunc - functor that should add new task in other WorkerThread
90      * @param prop - TaskProperties of task is wanted to be fill with
91      * @param taskCount - Count of tasks wanted to pop
92      * @param id: id worker got after registration
93      * @returns count of tasks that was added
94      */
95     template <class... Properties>
GiveTasksToAnotherWorker(const AddTaskToWorkerFunc & addTaskFunc,size_t taskCount,size_t id,Properties...prop)96     size_t PANDA_PUBLIC_API GiveTasksToAnotherWorker(const AddTaskToWorkerFunc &addTaskFunc, size_t taskCount,
97                                                      size_t id, Properties... prop)
98     {
99         static_assert(sizeof...(prop) < 2UL, "it's possible to have only one prop arg or no one at all");
100 
101         size_t count = 0;
102         for (; count != taskCount; count++) {
103             // Try to pop task
104             std::optional<Task> task;
105             if constexpr (sizeof...(prop) == 0) {
106                 task = localQueue_.Pop(id);
107             } else {
108                 static_assert(std::is_same<std::tuple_element_t<0U, std::tuple<Properties...>>, TaskProperties>::value);
109                 task = localQueue_.Pop(id, std::get<TaskProperties>(std::tuple(prop...)));
110             }
111             // If pop task returned nullopt need to finish execution
112             if (UNLIKELY(!task.has_value())) {
113                 break;
114             }
115             addTaskFunc(std::move(task.value()));
116         }
117         return count;
118     }
119 
120     void TryDeleteRetiredPtrs();
121 
122 private:
123     void ExecuteTask(Task *task);
124 
125     /// @brief Main workers algorithm
126     void WorkerLoop();
127 
128     /**
129      * @brief pops and executes all tasks from internal queue.
130      * Also counts executed tasks in finishedTasksCounterMap_.
131      */
132     size_t ExecuteTasksFromLocalQueue();
133 
134     /// @brief method executes task from stolenTask_ field
135     void ExecuteStolenTask();
136 
137     /**
138      * @brief method wait for starting
139      * @see Start
140      */
141     void WaitForStart();
142 
143     std::thread *thread_ = nullptr;
144     TaskScheduler *scheduler_ = nullptr;
145     std::string name_;
146 
147     bool start_ {false};
148     os::memory::Mutex startWaitLock_;
149     os::memory::ConditionVariable startWaitCondVar_ GUARDED_BY(startWaitLock_);
150 
151     /**
152      * @brief Here should be saved task that was stolen from other workers local queue
153      * @see SetStolenTask
154      * @see ExecuteStolenTask
155      */
156     Task stolenTask_;
157 
158     /**
159      * @brief finishedTasksCounterMap_: map that consider info about executed tasks in one WorkerLoop iteration. Is used
160      * to notify TaskStatistics.
161      * @see ExecuteTasksFromLocalQueue
162      */
163     TaskPropertiesCounterMap finishedTasksCounterMap_;
164 
165     /**
166      * @brief localQueue_ is set of lock-free queues with registration of consumers. Registration uses for correct
167      * memory free.
168      * @see RegisterAllWorkersInLocalQueue
169      * @see GetLocalWorkerQueuePopId
170      * @see perWorkerPopId_
171      */
172     internal::WorkerThreadLocalQueue<WORKER_QUEUE_SIZE> localQueue_;
173 
174     /**
175      * @brief perWorkerPopId_: is map that uses when one worker wants to pop task from other worker's local queue.
176      * @see RegisterAllWorkersInLocalQueue
177      * @see GiveTasksToAnotherWorker
178      */
179     std::unordered_map<WorkerThread *, size_t> perWorkerPopId_;
180 
181     /**
182      * @brief schedulerPopId_: specific id for TaskScheduler. TaskScheduler uses it when Helper tries to steal task.
183      * @see GetLocalWorkerQueueSchedulerPopId
184      * @see TaskScheduler::StealTaskFromOtherWorker
185      */
186     size_t schedulerPopId_ {0};
187 
188     size_t countOfExecutedTask_ {0};
189 };
190 
191 }  // namespace ark::taskmanager
192 
193 #endif  // PANDA_LIBPANDABASE_TASKMANAGER_WORKER_THREAD_H
194