• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023-2025 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef PANDA_LIBPANDABASE_TASKMANAGER_TASK_MANAGER_H
17 #define PANDA_LIBPANDABASE_TASKMANAGER_TASK_MANAGER_H
18 
19 #include "libpandabase/taskmanager/task_queue.h"
20 #include "libpandabase/taskmanager/utils/wait_list.h"
21 #include "libpandabase/taskmanager/worker_thread.h"
22 #include "libpandabase/taskmanager/utils/task_time_stats.h"
23 #include <vector>
24 #include <map>
25 #include <queue>
26 
27 namespace ark::taskmanager {
28 class TaskQueueInterface;
29 }  // namespace ark::taskmanager
30 
31 namespace ark::taskmanager::internal {
32 
33 class TaskScheduler {
34 public:
35     TaskScheduler(size_t workersCount, TaskWaitList *waitList, TaskQueueSet *queueSet);
36     PANDA_PUBLIC_API ~TaskScheduler();
37     NO_COPY_SEMANTIC(TaskScheduler);
38     NO_MOVE_SEMANTIC(TaskScheduler);
39 
40     static constexpr uint64_t TASK_WAIT_TIMEOUT = 1U;
41 
42     using LocalTaskQueue = std::queue<Task>;
43 
44     /// @brief Returns true if TaskScheduler outputs log info
45     PANDA_PUBLIC_API bool IsTaskLifetimeStatisticsUsed() const;
46 
47     /**
48      * @brief Fills @arg worker (local queues) with tasks. The number of tasks obtained depends on the max size of the
49      * worker's local queue and the number of workers. The algorithm strives to give the same number of tasks to all
50      * workers. If queues are empty and TaskScheduler is not destroying workers will wait.  If it's true, workers should
51      * finish after the execution of tasks.
52      * @param worker - pointer on worker that should be fill will tasks.
53      * @returns true if Worker should finish loop execution, otherwise returns false
54      */
55     bool FillWithTasks(WorkerThread *worker);
56 
57     /**
58      * @brief Method steal task from worker with the largest number of tasks and push it to gotten worker.
59      * @param worker: pointer to WorkerThread that should be fill with stollen task
60      */
61     size_t StealTaskFromOtherWorker(WorkerThread *taskReceiver);
62 
63     /// @brief Checks if task queues are empty
64     bool AreQueuesEmpty() const;
65 
66     /// @brief Checks if worker local queues are empty
67     bool AreWorkersEmpty() const;
68 
69     /**
70      * @brief Executes tasks with specific properties. It will get them from queue or steal from workers.
71      * @param properties - TaskProperties of tasks needs to help
72      * @returns real count of tasks that was executed
73      */
74     PANDA_PUBLIC_API size_t HelpWorkersWithTasks(TaskQueueInterface *queue);
75 
76     /**
77      * @brief Adds the task to the wait list with timeout. After the timeout expires, the task will be added
78      * to its corresponding TaskQueue.
79      * @param task: instance of task
80      * @param time: waiting time in milliseconds
81      * @returns unique waiter id. It can be used to signal wait list to add task to TaskQueue
82      */
83     PANDA_PUBLIC_API WaiterId AddTaskToWaitListWithTimeout(Task &&task, uint64_t time);
84 
85     /**
86      * @brief Adds the task to the wait list.
87      * @param task: instance of task
88      * @returns unique waiter id. It can be used to signal wait list to add task to TaskQueue
89      * @see TaskScheduler::SignalWaitList
90      */
91     PANDA_PUBLIC_API WaiterId AddTaskToWaitList(Task &&task);
92 
93     /**
94      * @brief Signals wait list to add task in TaskQueue.
95      * @param waiterId: unique waiter id
96      * @see TaskScheduler::AddTaskToWaitListWithTimeout, TaskScheduler::AddTaskToWaitList
97      */
98     PANDA_PUBLIC_API void SignalWaitList(WaiterId waiterId);
99 
100     PANDA_PUBLIC_API size_t GetCountOfTasksInSystem() const;
101     /// @brief Method signals workers if it's needed
102     void SignalWorkers();
103 
104     PANDA_PUBLIC_API size_t GetCountOfWorkers() const;
105     PANDA_PUBLIC_API void SetCountOfWorkers(size_t count);
106 
107 private:
108     /**
109      * @brief Method get and execute tasks with specified properties. If there are no tasks with that properties method
110      * will return nullopt.
111      * @param properties - TaskProperties of task we want to get.
112      * @returns real count of gotten tasks
113      */
114     size_t GetAndExecuteSetOfTasksFromQueue(TaskQueueInterface *properties);
115 
116     /**
117      * @brief Method steal and execute one task from one Worker. Method will find worker the largest number of tasks,
118      * steal one from it and execute.
119      * @param properties - TaskProperties of tasks needs to help
120      * @returns 1 if stealing was done successfully
121      */
122     size_t StealAndExecuteOneTaskFromWorkers(TaskQueueInterface *properties);
123 
124     size_t PutTasksInWorker(WorkerThread *worker, internal::SchedulableTaskQueueInterface *queue);
125 
126     /// @brief Checks if there are no tasks in queues and workers
127     bool AreNoMoreTasks() const;
128 
129     void PutWaitTaskInLocalQueue(LocalTaskQueue &queue) REQUIRES(taskSchedulerStateLock_);
130 
131     void PutTaskInTaskQueues(LocalTaskQueue &queue);
132 
133     /**
134      * @brief Method waits until new tasks coming or finishing of Task Scheduler usage
135      * @return true if TaskScheduler have tasks to manager
136      */
137     bool WaitUntilNewTasks(WorkerThread *worker);
138 
139     size_t ProcessWaitList();
140 
141     TaskWaitList *waitList_ = nullptr;
142     TaskQueueSet *queueSet_ = nullptr;
143     std::atomic_bool waitListIsProcessing_ {false};
144 
145     /// Pointers to Worker Threads.
146     std::array<std::atomic<WorkerThread *>, MAX_WORKER_COUNT> workers_ {};
147     std::atomic_size_t workersCount_ = {0UL};
148 
149     /// Iterator for workers_ to balance stealing
150     size_t workersIterator_ = {0UL};
151 
152     /// Represents count of task that sleeps
153     std::atomic_size_t waitWorkersCount_ {0UL};
154 
155     /// task_scheduler_state_lock_ is used to check state of task
156     os::memory::RecursiveMutex mutable taskSchedulerStateLock_;
157 
158     /**
159      * queues_wait_cond_var_ is used when all registered queues are empty to wait until one of them will have a
160      * task
161      */
162     os::memory::ConditionVariable queuesWaitCondVar_ GUARDED_BY(taskSchedulerStateLock_);
163 
164     /// start_ is true if we used Initialize method
165     std::atomic_bool start_ {false};
166 
167     /// finish_ is true when TaskScheduler finish Workers and TaskQueues
GUARDED_BY(taskSchedulerStateLock_)168     bool finish_ GUARDED_BY(taskSchedulerStateLock_) {false};
169 
170     std::atomic_size_t waitToFinish_ {0UL};
171     std::vector<WorkerThread *> workersToDelete_;
172 };
173 
174 }  // namespace ark::taskmanager::internal
175 
176 #endif  // PANDA_LIBPANDABASE_TASKMANAGER_TASK_MANAGER_H
177