• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2025 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "sched/task_scheduler.h"
17 #include <random>
18 #include "eu/execute_unit.h"
19 #include "util/ffrt_facade.h"
20 
21 namespace {
22 constexpr std::size_t LOCAL_QUEUE_SIZE = 128;
23 constexpr int INSERT_GLOBAL_QUEUE_FREQ = 5;
24 constexpr int GLOBAL_INTERVAL = 60;
25 constexpr int UV_TASK_MAX_CONCURRENCY = 8;
26 constexpr unsigned int BUDGET = 10;
27 
InsertTask(void * task)28 void InsertTask(void *task)
29 {
30     ffrt::TaskBase *baseTask = reinterpret_cast<ffrt::TaskBase *>(task);
31     ffrt::FFRTFacade::GetSchedInstance()->GetScheduler(baseTask->qos_).PushTaskGlobal(baseTask);
32 }
33 } // namespace
34 
35 namespace ffrt {
36 int PLACE_HOLDER = 0;
PopTask()37 TaskBase *TaskScheduler::PopTask()
38 {
39     TaskBase *task = nullptr;
40     if (GetTaskSchedMode() == TaskSchedMode::LOCAL_TASK_SCHED_MODE) {
41         task = PopTaskLocalOrPriority();
42         if (task == nullptr) {
43             StealTask();
44             bool stealSuccees = false;
45             if (GetLocalTaskCnt() > 0) {
46                 stealSuccees = true;
47                 task = reinterpret_cast<TaskBase *>(GetLocalQueue()->PopHead());
48             }
49             unsigned int *workerTickPtr = *GetWorkerTick();
50             if (stealSuccees && workerTickPtr != nullptr) {
51                 *workerTickPtr = 1;
52             }
53         }
54     } else if (GetTaskSchedMode() == TaskSchedMode::DEFAULT_TASK_SCHED_MODE) {
55         task = PopTaskGlobal();
56     }
57     if (task) {
58         task->Pop();
59     }
60     return task;
61 }
62 
GetLocalQueue()63 SpmcQueue *TaskScheduler::GetLocalQueue()
64 {
65     thread_local static LocalQueue localQueue(qos, this->localQueues);
66     return localQueue.localQueue;
67 }
68 
GetPriorityTask()69 void **TaskScheduler::GetPriorityTask()
70 {
71     thread_local void *priorityTask{nullptr};
72     return &priorityTask;
73 }
74 
GetWorkerTick()75 unsigned int **TaskScheduler::GetWorkerTick()
76 {
77     thread_local unsigned int *workerTick{nullptr};
78     return &workerTick;
79 }
80 
StealTask()81 int TaskScheduler::StealTask()
82 {
83     std::lock_guard<std::mutex> lock(*GetMutex());
84     stealingInProgress = true;
85     std::unordered_map<pid_t, SpmcQueue *>::iterator iter = localQueues.begin();
86     while (iter != localQueues.end()) {
87         SpmcQueue* queue = iter->second;
88         unsigned int queueLen = queue->GetLength();
89         if (queue != GetLocalQueue() && queueLen > 0) {
90             unsigned int popLen = queue->PopHeadToAnotherQueue(*GetLocalQueue(), (queueLen + 1) / 2, InsertTask);
91             stealingInProgress = false;
92             return popLen;
93         }
94         iter++;
95     }
96     stealingInProgress = false;
97     return 0;
98 }
99 
RemoveLocalQueue(SpmcQueue * localQueue)100 void TaskScheduler::RemoveLocalQueue(SpmcQueue *localQueue)
101 {
102     if (localQueue != nullptr) {
103         localQueues.erase(syscall(SYS_gettid));
104     }
105 }
106 
GetPriorityTaskCnt()107 uint64_t TaskScheduler::GetPriorityTaskCnt()
108 {
109     if (*GetPriorityTask() != nullptr) {
110         return 1;
111     } else {
112         return 0;
113     }
114 }
115 
PushTaskToPriorityStack(TaskBase * executorTask)116 bool TaskScheduler::PushTaskToPriorityStack(TaskBase *executorTask)
117 {
118     if (*GetPriorityTask() == nullptr) {
119         *GetPriorityTask() = reinterpret_cast<void *>(executorTask);
120         return true;
121     }
122     return false;
123 }
124 
PushTaskLocalOrPriority(TaskBase * task)125 void TaskScheduler::PushTaskLocalOrPriority(TaskBase *task)
126 {
127     // in self-wakeup scenario, tasks are placed in local fifo to delay scheduling, implementing the yield function
128     bool selfWakeup = (ffrt::ExecuteCtx::Cur()->task == task);
129     if (!selfWakeup) {
130         if (PushTaskToPriorityStack(task)) {
131             return;
132         }
133 
134         if ((rand() % INSERT_GLOBAL_QUEUE_FREQ > 0)) {
135             if (GetLocalQueue() != nullptr && GetLocalQueue()->PushTail(task) == 0) {
136                 ffrt::FFRTFacade::GetEUInstance().NotifyTask<TaskNotifyType::TASK_LOCAL>(task->qos_);
137                 return;
138             }
139         }
140     }
141     PushTaskGlobal(task);
142 }
143 
PopTaskLocalOrPriority()144 TaskBase *TaskScheduler::PopTaskLocalOrPriority()
145 {
146     TaskBase *task = nullptr;
147     thread_local static unsigned int lifoCount = 0;
148     unsigned int *workerTickPtr = *GetWorkerTick();
149     if ((workerTickPtr != nullptr) && (*workerTickPtr % GLOBAL_INTERVAL == 0)) {
150         *workerTickPtr = 0;
151         task = PopTaskHybridProcess();
152         // the worker is not notified when the task attribute is set not to notify worker
153         if (NeedNotifyWorker(task)) {
154             FFRTFacade::GetEUInstance().NotifyTask<TaskNotifyType::TASK_PICKED>(qos);
155         }
156         if (task != nullptr) {
157             lifoCount = 0;
158             return task;
159         }
160     }
161     // preferentially pick up tasks from the priority unless the priority is empty or occupied
162     void **priorityTaskPtr = GetPriorityTask();
163     if (*priorityTaskPtr != nullptr) {
164         if (*priorityTaskPtr != &PLACE_HOLDER) {
165             lifoCount++;
166             task = reinterpret_cast<TaskBase *>(*priorityTaskPtr);
167             *priorityTaskPtr = (lifoCount > BUDGET) ? &PLACE_HOLDER : nullptr;
168             return task;
169         }
170         *priorityTaskPtr = nullptr;
171     }
172     lifoCount = 0;
173     return reinterpret_cast<TaskBase *>(GetLocalQueue()->PopHead());
174 }
175 
PushUVTaskToWaitingQueue(UVTask * task)176 bool TaskScheduler::PushUVTaskToWaitingQueue(UVTask* task)
177 {
178     std::lock_guard lg(uvMtx);
179     if (uvTaskConcurrency_ >= UV_TASK_MAX_CONCURRENCY) {
180         uvTaskWaitingQueue_.push_back(task);
181         return true;
182     }
183 
184     return false;
185 }
186 
CheckUVTaskConcurrency(UVTask * task)187 bool TaskScheduler::CheckUVTaskConcurrency(UVTask* task)
188 {
189     std::lock_guard lg(uvMtx);
190     // the number of workers executing UV tasks has reached the upper limit.
191     // therefore, the current task is placed back to the head of the waiting queue (be preferentially obtained later).
192     if (uvTaskConcurrency_ >= UV_TASK_MAX_CONCURRENCY) {
193         uvTaskWaitingQueue_.push_front(task);
194         return false;
195     }
196 
197     uvTaskConcurrency_++;
198     return true;
199 }
200 
PickWaitingUVTask()201 UVTask* TaskScheduler::PickWaitingUVTask()
202 {
203     std::lock_guard lg(uvMtx);
204     if (uvTaskWaitingQueue_.empty()) {
205         if (uvTaskConcurrency_ > 0) {
206             uvTaskConcurrency_--;
207         }
208         return nullptr;
209     }
210 
211     UVTask* task = uvTaskWaitingQueue_.front();
212     uvTaskWaitingQueue_.pop_front();
213     task->SetDequeued();
214     return task;
215 }
216 
CancelUVWork(ffrt_executor_task_t * uvWork)217 bool TaskScheduler::CancelUVWork(ffrt_executor_task_t* uvWork)
218 {
219     std::lock_guard lg(uvMtx);
220     if (!reinterpret_cast<LinkedList*>(uvWork->wq)->InList()) {
221         FFRT_SYSEVENT_LOGW("the task has been picked, or has not been inserted");
222         return false;
223     }
224 
225     auto iter = std::remove_if(uvTaskWaitingQueue_.begin(), uvTaskWaitingQueue_.end(), [uvWork](UVTask* task) {
226         if (task->uvWork == uvWork) {
227             return true;
228         }
229         return false;
230     });
231     if (iter != uvTaskWaitingQueue_.end()) {
232         uvTaskWaitingQueue_.erase(iter, uvTaskWaitingQueue_.end());
233         return true;
234     }
235 
236     auto it = cancelMap_.find(uvWork);
237     if (it != cancelMap_.end()) {
238         it->second++;
239     } else {
240         cancelMap_[uvWork] = 1;
241     }
242     return true;
243 }
244 
GetMutex()245 std::mutex* TaskScheduler::GetMutex()
246 {
247     /* We use acquire on load and release on store to enforce the
248      * happens-before relationship between the mutex implicit
249      * initialization and the publication of its address.
250      * i.e. if a thread reads the address of the mutex then
251      * it has been already initialized by the thread that published
252      * its address.
253      */
254     auto curMtx = mtx.load(std::memory_order_acquire);
255     if (curMtx == nullptr) {
256         curMtx = &FFRTFacade::GetEUInstance().GetWorkerGroup(qos).mutex;
257         mtx.store(curMtx, std::memory_order_release);
258     }
259     return curMtx;
260 }
261 
Instance()262 SchedulerFactory &SchedulerFactory::Instance()
263 {
264     static SchedulerFactory fac;
265     return fac;
266 }
267 
LocalQueue(int qos,std::unordered_map<pid_t,SpmcQueue * > localQueues)268 LocalQueue::LocalQueue(int qos, std::unordered_map<pid_t, SpmcQueue *> localQueues)
269 {
270     this->qos = qos;
271     localQueue = new SpmcQueue();
272     localQueue->Init(LOCAL_QUEUE_SIZE);
273     std::lock_guard<std::mutex> lock(*(FFRTFacade::GetSchedInstance()->GetScheduler(qos).GetMutex()));
274     localQueues.emplace(syscall(SYS_gettid), localQueue);
275 }
276 
~LocalQueue()277 LocalQueue::~LocalQueue()
278 {
279     std::lock_guard<std::mutex> lock(*(FFRTFacade::GetSchedInstance()->GetScheduler(qos).GetMutex()));
280     if (FFRT_LIKELY(localQueue != nullptr)) {
281         FFRTFacade::GetSchedInstance()->GetScheduler(qos).RemoveLocalQueue(localQueue);
282         delete localQueue;
283         localQueue = nullptr;
284     }
285 }
286 } // namespace ffrt
287