• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023-2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef PANDA_LIBPANDABASE_TASKMANAGER_TASK_MANAGER_H
17 #define PANDA_LIBPANDABASE_TASKMANAGER_TASK_MANAGER_H
18 
19 #include "libpandabase/taskmanager/task_queue.h"
20 #include "libpandabase/taskmanager/utils/wait_list.h"
21 #include "libpandabase/taskmanager/worker_thread.h"
22 #include "libpandabase/taskmanager/utils/task_time_stats.h"
23 #include <vector>
24 #include <map>
25 #include <queue>
26 
27 namespace ark::taskmanager {
28 /**
29  * Task Manager can register 3 queues with different type of tasks
30  *  - GC queue(ECMA)
31  *  - GC queue(ArkTS)
32  *  - JIT queue
33  */
34 class TaskScheduler {
35     using TaskPropertiesCounterMap = std::unordered_map<TaskProperties, size_t, TaskProperties::Hash>;
36 
37 public:
38     NO_COPY_SEMANTIC(TaskScheduler);
39     NO_MOVE_SEMANTIC(TaskScheduler);
40 
41     static constexpr uint64_t TASK_WAIT_TIMEOUT = 1U;
42 
43     using LocalTaskQueue = std::queue<Task>;
44 
45     /**
46      * @brief Creates an instance of TaskScheduler.
47      * @param threadsCount - number of worker that will be created be Task Manager
48      * @param taskStatsType - type of TaskStatistics that will be used in TaskScheduler
49      */
50     PANDA_PUBLIC_API static TaskScheduler *Create(size_t threadsCount,
51                                                   TaskTimeStatsType taskStatsType = TaskTimeStatsType::NO_STATISTICS);
52 
53     /**
54      * @brief Returns the pointer to TaskScheduler. If you use it before the Create or after Destroy methods, it
55      * will return nullptr.
56      */
57     [[nodiscard]] PANDA_PUBLIC_API static TaskScheduler *GetTaskScheduler();
58 
59     /// @brief Deletes the existed TaskScheduler. You should not use it if you didn't use Create before.
60     PANDA_PUBLIC_API static void Destroy();
61 
62     /// @brief Returns true if TaskScheduler outputs log info
63     PANDA_PUBLIC_API bool IsTaskLifetimeStatisticsUsed() const;
64 
65     TaskTimeStatsBase *GetTaskTimeStats() const;
66     TaskTimeStatsType GetTaskTimeStatsType() const;
67 
68     /**
69      * @brief Creates and starts workers with registered queues. After this method, you can not register new
70      * queues.
71      */
72     PANDA_PUBLIC_API void Initialize();
73 
74     /**
75      * @brief Method allocates, constructs and registers TaskQueue. If it already exists, method returns nullptr.
76      * @param taskType - TaskType of future TaskQueue.
77      * @param vmType - VMType of future TaskQueue.
78      * @param priority - value of priority:
79      * TaskQueueInterface::MIN_PRIORITY <= priority <= TaskQueueInterface::MIN_PRIORITY
80      * @tparam Allocator - allocator of Task that will be used in internal queues. By default is used
81      * std::allocator<Task>
82      */
83     template <class Allocator = std::allocator<Task>>
84     PANDA_PUBLIC_API TaskQueueInterface *CreateAndRegisterTaskQueue(
85         TaskType taskType, VMType vmType, uint8_t priority = TaskQueueInterface::DEFAULT_PRIORITY)
86     {
87         auto *queue = internal::TaskQueue<Allocator>::Create(taskType, vmType, priority);
88         if (UNLIKELY(queue == nullptr)) {
89             return nullptr;
90         }
91         auto id = RegisterQueue(queue);
92         if (UNLIKELY(id == INVALID_TASKQUEUE_ID)) {
93             internal::TaskQueue<Allocator>::Destroy(queue);
94             return nullptr;
95         }
96         return queue;
97     }
98     /**
99      * @brief Method Destroy and Unregister TaskQueue
100      * @param queue - TaskQueueInterface* of TaskQueue.
101      * @tparam Allocator - allocator of Task that will be used to deallocate TaskQueue. Use the same allocator as
102      * you have used in TaskScheduler::CreateAndRegisterTaskQueue method.
103      */
104     template <class Allocator = std::allocator<Task>>
UnregisterAndDestroyTaskQueue(TaskQueueInterface * queue)105     PANDA_PUBLIC_API void UnregisterAndDestroyTaskQueue(TaskQueueInterface *queue)
106     {
107         TaskQueueId id(queue->GetTaskType(), queue->GetVMType());
108         auto *schedulableQueue = taskQueues_[id];
109 
110         schedulableQueue->UnsetCallbacks();
111         taskQueues_.erase(id);
112         internal::TaskQueue<Allocator>::Destroy(schedulableQueue);
113     }
114 
115     /**
116      * @brief Fills @arg worker (local queues) with tasks. The number of tasks obtained depends on the max size of the
117      * worker's local queue and the number of workers. The algorithm strives to give the same number of tasks to all
118      * workers. If queues are empty and TaskScheduler is not destroying workers will wait.  If it's true, workers should
119      * finish after the execution of tasks.
120      * @param worker - pointer on worker that should be fill will tasks.
121      * @returns true if Worker should finish loop execution, otherwise returns false
122      */
123     bool FillWithTasks(WorkerThread *worker);
124 
125     /**
126      * @brief Method steal task from worker with the largest number of tasks and push it to gotten worker.
127      * @param worker: pointer to WorkerThread that should be fill with stollen task
128      */
129     void StealTaskFromOtherWorker(WorkerThread *taskReceiver);
130 
131     /// @brief Checks if task queues are empty
132     bool AreQueuesEmpty() const;
133 
134     /// @brief Checks if worker local queues are empty
135     bool AreWorkersEmpty() const;
136 
137     /**
138      * @brief Method increment counter of finished tasks and signal Finalize waiter
139      * @param counterMap - map from id to count of finished tasks
140      * @return count of executed tasks
141      */
142     size_t IncrementCounterOfExecutedTasks(const TaskPropertiesCounterMap &counterMap);
143 
144     /**
145      * @brief Executes tasks with specific properties. It will get them from queue or steal from workers.
146      * @param properties - TaskProperties of tasks needs to help
147      * @returns real count of tasks that was executed
148      */
149     PANDA_PUBLIC_API size_t HelpWorkersWithTasks(TaskProperties properties);
150 
151     /**
152      * @brief Method waits all tasks with specified properties. This method should be used only from Main Thread and
153      * only for finalization!
154      * @param properties - TaskProperties of tasks we will wait to be completed.
155      */
156     PANDA_PUBLIC_API void WaitForFinishAllTasksWithProperties(TaskProperties properties);
157 
158     /**
159      * @brief Adds the task to the wait list with timeout. After the timeout expires, the task will be added
160      * to its corresponding TaskQueue.
161      * @param task: instance of task
162      * @param time: waiting time in milliseconds
163      * @returns unique waiter id. It can be used to signal wait list to add task to TaskQueue
164      */
165     PANDA_PUBLIC_API WaiterId AddTaskToWaitListWithTimeout(Task &&task, uint64_t time);
166 
167     /**
168      * @brief Adds the task to the wait list.
169      * @param task: instance of task
170      * @returns unique waiter id. It can be used to signal wait list to add task to TaskQueue
171      * @see TaskScheduler::SignalWaitList
172      */
173     PANDA_PUBLIC_API WaiterId AddTaskToWaitList(Task &&task);
174 
175     /**
176      * @brief Signals wait list to add task in TaskQueue.
177      * @param waiterId: unique waiter id
178      * @see TaskScheduler::AddTaskToWaitListWithTimeout, TaskScheduler::AddTaskToWaitList
179      */
180     PANDA_PUBLIC_API void SignalWaitList(WaiterId waiterId);
181 
182     /// @brief This method indicates that workers can no longer wait for new tasks and be completed.
183     PANDA_PUBLIC_API void Finalize();
184 
185     PANDA_PUBLIC_API ~TaskScheduler();
186 
187 private:
188     explicit TaskScheduler(size_t workersCount, TaskTimeStatsType taskTimeStatsType);
189 
190     /**
191      * @brief Method get and execute tasks with specified properties. If there are no tasks with that properties method
192      * will return nullopt.
193      * @param properties - TaskProperties of task we want to get.
194      * @returns real count of gotten tasks
195      */
196     size_t GetAndExecuteSetOfTasksFromQueue(TaskProperties properties);
197 
198     /**
199      * @brief Method steal and execute one task from one Worker. Method will find worker the largest number of tasks,
200      * steal one from it and execute.
201      * @param properties - TaskProperties of tasks needs to help
202      * @returns 1 if stealing was done successfully
203      */
204     size_t StealAndExecuteOneTaskFromWorkers(TaskProperties properties);
205 
206     /**
207      * @brief Registers a queue that was created externally. It should be valid until all workers finish, and the
208      * queue should have a unique set of TaskType and VMType fields. You can not use this method after Initialize()
209      * method
210      * @param queue: pointer to a valid TaskQueue.
211      * @return TaskQueueId of queue that was added. If queue with same TaskType and VMType is already added, method
212      * returns INVALID_TASKQUEUE_ID
213      */
214     PANDA_PUBLIC_API TaskQueueId RegisterQueue(internal::SchedulableTaskQueueInterface *queue);
215 
216     /**
217      * @brief Method pops one task from internal queues based on priorities.
218      * @return if queue are empty, returns nullopt, otherwise returns task.
219      */
220     [[nodiscard]] std::optional<Task> GetNextTask() REQUIRES(popFromTaskQueuesLock_);
221 
222     /// @brief Checks if there are no tasks in queues and workers
223     bool AreNoMoreTasks() const;
224 
225     /**
226      * @brief Method puts tasks to @arg worker. Queue and count of tasks depends on selectedQueues_. After
227      * execution of the method selectedQueues_ will be empty.
228      * @param worker - pointer on worker that should be fill with tasks
229      * @param selectedQueue - count of selected tasks for all TaskQueueId
230      * @return count of task that was gotten by worker.
231      */
232     size_t PutTasksInWorker(WorkerThread *worker, TaskQueueId selectedQueue);
233 
234     /**
235      * @brief Method increment counter of new tasks and signal worker
236      * @param properties - TaskProperties of task from queue that execute the callback
237      * @param ivalue - the value by which the counter will be increased
238      */
239     void IncrementCounterOfAddedTasks(TaskProperties properties, size_t ivalue);
240 
241     /// @brief Method signals workers if it's needed
242     void SignalWorkers();
243 
244     void IncrementCountOfTasksInSystem(TaskProperties prop, size_t count);
245 
246     void DecrementCountOfTasksInSystem(TaskProperties prop, size_t count);
247 
248     size_t GetCountOfTasksInSystemWithTaskProperties(TaskProperties prop) const;
249 
250     size_t GetCountOfTasksInSystem() const;
251 
252     internal::SchedulableTaskQueueInterface *GetQueue(TaskQueueId id) const;
253 
254     void PutWaitTaskInLocalQueue(LocalTaskQueue &queue) REQUIRES(taskSchedulerStateLock_);
255 
256     void PutTaskInTaskQueues(LocalTaskQueue &queue);
257 
258     /**
259      * @brief Method waits until new tasks coming or finishing of Task Scheduler usage
260      * @return true if TaskScheduler have tasks to manager
261      */
262     bool WaitUntilNewTasks() REQUIRES(taskSchedulerStateLock_);
263 
264     static TaskScheduler *instance_;
265 
266     size_t workersCount_;
267 
268     /// Pointers to Worker Threads.
269     std::vector<WorkerThread *> workers_;
270 
271     /// Iterator for workers_ to balance stealing
272     size_t workersIterator_ = {0UL};
273 
274     /// pop_from_task_queues_lock_ is used to guard popping from queues
275     os::memory::Mutex popFromTaskQueuesLock_;
276 
277     /// Represents count of task that sleeps
278     std::atomic_size_t waitWorkersCount_ {0UL};
279 
280     /**
281      * Map from TaskType and VMType to queue.
282      * Can be changed only before Initialize methods.
283      * Since we can change the map only before creating the workers, we do not need to synchronize access after
284      * Initialize method
285      */
286     std::map<TaskQueueId, internal::SchedulableTaskQueueInterface *> taskQueues_;
287 
288     /// task_scheduler_state_lock_ is used to check state of task
289     os::memory::RecursiveMutex taskSchedulerStateLock_;
290 
291     /**
292      * queues_wait_cond_var_ is used when all registered queues are empty to wait until one of them will have a
293      * task
294      */
295     os::memory::ConditionVariable queuesWaitCondVar_ GUARDED_BY(taskSchedulerStateLock_);
296 
297     /**
298      * This cond var uses to wait for all tasks will be done.
299      * It is used in Finalize() method.
300      */
301     os::memory::ConditionVariable finishTasksCondVar_ GUARDED_BY(taskSchedulerStateLock_);
302 
303     /// start_ is true if we used Initialize method
304     std::atomic_bool start_ {false};
305 
306     /// finish_ is true when TaskScheduler finish Workers and TaskQueues
GUARDED_BY(taskSchedulerStateLock_)307     bool finish_ GUARDED_BY(taskSchedulerStateLock_) {false};
308 
309     /// newTasksCount_ represents count of new tasks
310     TaskPropertiesCounterMap newTasksCount_ GUARDED_BY(taskSchedulerStateLock_);
311 
312     std::atomic_size_t waitToFinish_ {0UL};
313 
314     std::atomic_bool disableHelpers_ {false};
315 
316     /**
317      * finishedTasksCount_ represents count of finished tasks;
318      * Task is finished if:
319      * - it was executed by Worker;
320      * - it was gotten by main thread;
321      */
322     TaskPropertiesCounterMap finishedTasksCount_ GUARDED_BY(taskSchedulerStateLock_);
323 
324     /**
325      * Represents count of tasks that exist in TaskScheduler system per TaskProperties. Task in system means that task
326      * was added but wasn't executed or popped.
327      */
328     std::unordered_map<TaskProperties, std::atomic_size_t, TaskProperties::Hash> countOfTasksInSystem_;
329 
330     TaskTimeStatsBase *taskTimeStats_ = nullptr;
331     TaskTimeStatsType taskTimeStatsType_;
332 
333     internal::TaskSelector selector_;
334 
335     WaitList<Task> waitList_ GUARDED_BY(taskSchedulerStateLock_);
336 };
337 
338 }  // namespace ark::taskmanager
339 
340 #endif  // PANDA_LIBPANDABASE_TASKMANAGER_TASK_MANAGER_H
341