1 /*
2 * Copyright (c) 2025 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "sched/task_scheduler.h"
17 #include <random>
18 #include "eu/execute_unit.h"
19 #include "util/ffrt_facade.h"
20
21 namespace {
// Size argument passed to SpmcQueue::Init() when a thread's local queue is created.
constexpr std::size_t LOCAL_QUEUE_SIZE = 128;
// PushTaskLocalOrPriority() only tries the local queue when rand() % INSERT_GLOBAL_QUEUE_FREQ is
// non-zero; the remaining draws fall through to the global queue.
constexpr int INSERT_GLOBAL_QUEUE_FREQ = 5;
// When a worker's tick counter is a multiple of this value, PopTaskLocalOrPriority() resets the
// counter and first tries PopTaskHybridProcess() before the priority slot / local queue.
constexpr int GLOBAL_INTERVAL = 60;
// Upper limit compared against uvTaskConcurrency_; at or above it UV tasks go to the waiting queue.
constexpr int UV_TASK_MAX_CONCURRENCY = 8;
// Once more than BUDGET consecutive tasks were taken from the priority slot, the slot is parked on
// PLACE_HOLDER instead of being cleared.
constexpr unsigned int BUDGET = 10;
27
InsertTask(void * task)28 void InsertTask(void *task)
29 {
30 ffrt::TaskBase *baseTask = reinterpret_cast<ffrt::TaskBase *>(task);
31 ffrt::FFRTFacade::GetSchedInstance()->GetScheduler(baseTask->qos_).PushTaskGlobal(baseTask);
32 }
33 } // namespace
34
35 namespace ffrt {
// Sentinel object: only its address is used in this file. PopTaskLocalOrPriority() stores
// &PLACE_HOLDER in the thread-local priority slot to keep the slot non-null (so
// PushTaskToPriorityStack() rejects new tasks) after the LIFO budget has been exceeded.
int PLACE_HOLDER = 0;
PopTask()37 TaskBase *TaskScheduler::PopTask()
38 {
39 TaskBase *task = nullptr;
40 if (GetTaskSchedMode() == TaskSchedMode::LOCAL_TASK_SCHED_MODE) {
41 task = PopTaskLocalOrPriority();
42 if (task == nullptr) {
43 StealTask();
44 bool stealSuccees = false;
45 if (GetLocalTaskCnt() > 0) {
46 stealSuccees = true;
47 task = reinterpret_cast<TaskBase *>(GetLocalQueue()->PopHead());
48 }
49 unsigned int *workerTickPtr = *GetWorkerTick();
50 if (stealSuccees && workerTickPtr != nullptr) {
51 *workerTickPtr = 1;
52 }
53 }
54 } else if (GetTaskSchedMode() == TaskSchedMode::DEFAULT_TASK_SCHED_MODE) {
55 task = PopTaskGlobal();
56 }
57 if (task) {
58 task->Pop();
59 }
60 return task;
61 }
62
GetLocalQueue()63 SpmcQueue *TaskScheduler::GetLocalQueue()
64 {
65 thread_local static LocalQueue localQueue(qos, this->localQueues);
66 return localQueue.localQueue;
67 }
68
GetPriorityTask()69 void **TaskScheduler::GetPriorityTask()
70 {
71 thread_local void *priorityTask{nullptr};
72 return &priorityTask;
73 }
74
GetWorkerTick()75 unsigned int **TaskScheduler::GetWorkerTick()
76 {
77 thread_local unsigned int *workerTick{nullptr};
78 return &workerTick;
79 }
80
StealTask()81 int TaskScheduler::StealTask()
82 {
83 std::lock_guard<std::mutex> lock(*GetMutex());
84 stealingInProgress = true;
85 std::unordered_map<pid_t, SpmcQueue *>::iterator iter = localQueues.begin();
86 while (iter != localQueues.end()) {
87 SpmcQueue* queue = iter->second;
88 unsigned int queueLen = queue->GetLength();
89 if (queue != GetLocalQueue() && queueLen > 0) {
90 unsigned int popLen = queue->PopHeadToAnotherQueue(*GetLocalQueue(), (queueLen + 1) / 2, InsertTask);
91 stealingInProgress = false;
92 return popLen;
93 }
94 iter++;
95 }
96 stealingInProgress = false;
97 return 0;
98 }
99
RemoveLocalQueue(SpmcQueue * localQueue)100 void TaskScheduler::RemoveLocalQueue(SpmcQueue *localQueue)
101 {
102 if (localQueue != nullptr) {
103 localQueues.erase(syscall(SYS_gettid));
104 }
105 }
106
GetPriorityTaskCnt()107 uint64_t TaskScheduler::GetPriorityTaskCnt()
108 {
109 if (*GetPriorityTask() != nullptr) {
110 return 1;
111 } else {
112 return 0;
113 }
114 }
115
PushTaskToPriorityStack(TaskBase * executorTask)116 bool TaskScheduler::PushTaskToPriorityStack(TaskBase *executorTask)
117 {
118 if (*GetPriorityTask() == nullptr) {
119 *GetPriorityTask() = reinterpret_cast<void *>(executorTask);
120 return true;
121 }
122 return false;
123 }
124
PushTaskLocalOrPriority(TaskBase * task)125 void TaskScheduler::PushTaskLocalOrPriority(TaskBase *task)
126 {
127 // in self-wakeup scenario, tasks are placed in local fifo to delay scheduling, implementing the yield function
128 bool selfWakeup = (ffrt::ExecuteCtx::Cur()->task == task);
129 if (!selfWakeup) {
130 if (PushTaskToPriorityStack(task)) {
131 return;
132 }
133
134 if ((rand() % INSERT_GLOBAL_QUEUE_FREQ > 0)) {
135 if (GetLocalQueue() != nullptr && GetLocalQueue()->PushTail(task) == 0) {
136 ffrt::FFRTFacade::GetEUInstance().NotifyTask<TaskNotifyType::TASK_LOCAL>(task->qos_);
137 return;
138 }
139 }
140 }
141 PushTaskGlobal(task);
142 }
143
PopTaskLocalOrPriority()144 TaskBase *TaskScheduler::PopTaskLocalOrPriority()
145 {
146 TaskBase *task = nullptr;
147 thread_local static unsigned int lifoCount = 0;
148 unsigned int *workerTickPtr = *GetWorkerTick();
149 if ((workerTickPtr != nullptr) && (*workerTickPtr % GLOBAL_INTERVAL == 0)) {
150 *workerTickPtr = 0;
151 task = PopTaskHybridProcess();
152 // the worker is not notified when the task attribute is set not to notify worker
153 if (NeedNotifyWorker(task)) {
154 FFRTFacade::GetEUInstance().NotifyTask<TaskNotifyType::TASK_PICKED>(qos);
155 }
156 if (task != nullptr) {
157 lifoCount = 0;
158 return task;
159 }
160 }
161 // preferentially pick up tasks from the priority unless the priority is empty or occupied
162 void **priorityTaskPtr = GetPriorityTask();
163 if (*priorityTaskPtr != nullptr) {
164 if (*priorityTaskPtr != &PLACE_HOLDER) {
165 lifoCount++;
166 task = reinterpret_cast<TaskBase *>(*priorityTaskPtr);
167 *priorityTaskPtr = (lifoCount > BUDGET) ? &PLACE_HOLDER : nullptr;
168 return task;
169 }
170 *priorityTaskPtr = nullptr;
171 }
172 lifoCount = 0;
173 return reinterpret_cast<TaskBase *>(GetLocalQueue()->PopHead());
174 }
175
PushUVTaskToWaitingQueue(UVTask * task)176 bool TaskScheduler::PushUVTaskToWaitingQueue(UVTask* task)
177 {
178 std::lock_guard lg(uvMtx);
179 if (uvTaskConcurrency_ >= UV_TASK_MAX_CONCURRENCY) {
180 uvTaskWaitingQueue_.push_back(task);
181 return true;
182 }
183
184 return false;
185 }
186
CheckUVTaskConcurrency(UVTask * task)187 bool TaskScheduler::CheckUVTaskConcurrency(UVTask* task)
188 {
189 std::lock_guard lg(uvMtx);
190 // the number of workers executing UV tasks has reached the upper limit.
191 // therefore, the current task is placed back to the head of the waiting queue (be preferentially obtained later).
192 if (uvTaskConcurrency_ >= UV_TASK_MAX_CONCURRENCY) {
193 uvTaskWaitingQueue_.push_front(task);
194 return false;
195 }
196
197 uvTaskConcurrency_++;
198 return true;
199 }
200
PickWaitingUVTask()201 UVTask* TaskScheduler::PickWaitingUVTask()
202 {
203 std::lock_guard lg(uvMtx);
204 if (uvTaskWaitingQueue_.empty()) {
205 if (uvTaskConcurrency_ > 0) {
206 uvTaskConcurrency_--;
207 }
208 return nullptr;
209 }
210
211 UVTask* task = uvTaskWaitingQueue_.front();
212 uvTaskWaitingQueue_.pop_front();
213 task->SetDequeued();
214 return task;
215 }
216
CancelUVWork(ffrt_executor_task_t * uvWork)217 bool TaskScheduler::CancelUVWork(ffrt_executor_task_t* uvWork)
218 {
219 std::lock_guard lg(uvMtx);
220 if (!reinterpret_cast<LinkedList*>(uvWork->wq)->InList()) {
221 FFRT_SYSEVENT_LOGW("the task has been picked, or has not been inserted");
222 return false;
223 }
224
225 auto iter = std::remove_if(uvTaskWaitingQueue_.begin(), uvTaskWaitingQueue_.end(), [uvWork](UVTask* task) {
226 if (task->uvWork == uvWork) {
227 return true;
228 }
229 return false;
230 });
231 if (iter != uvTaskWaitingQueue_.end()) {
232 uvTaskWaitingQueue_.erase(iter, uvTaskWaitingQueue_.end());
233 return true;
234 }
235
236 auto it = cancelMap_.find(uvWork);
237 if (it != cancelMap_.end()) {
238 it->second++;
239 } else {
240 cancelMap_[uvWork] = 1;
241 }
242 return true;
243 }
244
GetMutex()245 std::mutex* TaskScheduler::GetMutex()
246 {
247 /* We use acquire on load and release on store to enforce the
248 * happens-before relationship between the mutex implicit
249 * initialization and the publication of its address.
250 * i.e. if a thread reads the address of the mutex then
251 * it has been already initialized by the thread that published
252 * its address.
253 */
254 auto curMtx = mtx.load(std::memory_order_acquire);
255 if (curMtx == nullptr) {
256 curMtx = &FFRTFacade::GetEUInstance().GetWorkerGroup(qos).mutex;
257 mtx.store(curMtx, std::memory_order_release);
258 }
259 return curMtx;
260 }
261
Instance()262 SchedulerFactory &SchedulerFactory::Instance()
263 {
264 static SchedulerFactory fac;
265 return fac;
266 }
267
LocalQueue(int qos,std::unordered_map<pid_t,SpmcQueue * > localQueues)268 LocalQueue::LocalQueue(int qos, std::unordered_map<pid_t, SpmcQueue *> localQueues)
269 {
270 this->qos = qos;
271 localQueue = new SpmcQueue();
272 localQueue->Init(LOCAL_QUEUE_SIZE);
273 std::lock_guard<std::mutex> lock(*(FFRTFacade::GetSchedInstance()->GetScheduler(qos).GetMutex()));
274 localQueues.emplace(syscall(SYS_gettid), localQueue);
275 }
276
~LocalQueue()277 LocalQueue::~LocalQueue()
278 {
279 std::lock_guard<std::mutex> lock(*(FFRTFacade::GetSchedInstance()->GetScheduler(qos).GetMutex()));
280 if (FFRT_LIKELY(localQueue != nullptr)) {
281 FFRTFacade::GetSchedInstance()->GetScheduler(qos).RemoveLocalQueue(localQueue);
282 delete localQueue;
283 localQueue = nullptr;
284 }
285 }
286 } // namespace ffrt
287