• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef FFRT_TASK_SCHEDULER_HPP
17 #define FFRT_TASK_SCHEDULER_HPP
18 #include "sched/task_runqueue.h"
19 #include "tm/task_base.h"
20 #include "util/spmc_queue.h"
21 
22 namespace ffrt {
23 extern int PLACE_HOLDER;
24 
25 enum class TaskSchedMode : uint8_t {
26     DEFAULT_TASK_SCHED_MODE = 0, // only use global queue
27     LOCAL_TASK_SCHED_MODE, // only use local queue and priority slot
28 };
29 
30 class TaskScheduler {
31 public:
32     TaskScheduler() = default;
~TaskScheduler()33     virtual ~TaskScheduler() {}
34 
PushTask(TaskBase * task)35     void PushTask(TaskBase *task)
36     {
37         if (GetTaskSchedMode() == TaskSchedMode::DEFAULT_TASK_SCHED_MODE) {
38             PushTaskGlobal(task);
39         } else if (GetTaskSchedMode() == TaskSchedMode::LOCAL_TASK_SCHED_MODE) {
40             PushTaskLocalOrPriority(task);
41         }
42     }
43 
44     TaskBase* PopTask();
45 
46     virtual void SetQos(QoS &q) = 0;
47 
48     int qos {0};
49 
50     int StealTask();
51     void RemoveLocalQueue(SpmcQueue* localQueue);
52     SpmcQueue* GetLocalQueue();
53     void** GetPriorityTask();
54     unsigned int** GetWorkerTick();
55 
56     // global_queue.size + totalLocalTaskCnt, not include the PriorityTaskCnt
GetTotalTaskCnt()57     uint64_t GetTotalTaskCnt()
58     {
59         uint64_t totalTaskCnt = GetGlobalTaskCnt();
60         for (auto &localQueue : localQueues) {
61             totalTaskCnt += localQueue.second->GetLength();
62         }
63         return totalTaskCnt;
64     }
65     // global_queue.size
66     virtual uint64_t GetGlobalTaskCnt() = 0;
67 
68     // thread_local local_queue.size, not totalLocalTaskCnt
GetLocalTaskCnt()69     inline uint64_t GetLocalTaskCnt()
70     {
71         return GetLocalQueue()->GetLength();
72     }
73     // thread_local priority.size, not totalPriorityTaskCnt
74     uint64_t GetPriorityTaskCnt();
75 
SetTaskSchedMode(const TaskSchedMode & mode)76     inline void SetTaskSchedMode(const TaskSchedMode& mode)
77     {
78         taskSchedMode = mode;
79     }
80 
GetTaskSchedMode()81     inline const TaskSchedMode& GetTaskSchedMode()
82     {
83         return taskSchedMode;
84     }
85 
GetWorkerLocalQueue(pid_t pid)86     inline SpmcQueue* GetWorkerLocalQueue(pid_t pid)
87     {
88         std::lock_guard lg(*GetMutex());
89         return localQueues[pid];
90     }
91 
92     virtual bool PushTaskGlobal(TaskBase* task, bool rtb = true) = 0;
93     virtual TaskBase* PopTaskGlobal() = 0;
94 
95     bool CancelUVWork(ffrt_executor_task_t* uvWork);
96     bool PushUVTaskToWaitingQueue(UVTask* task);
97     bool CheckUVTaskConcurrency(UVTask* task);
98     UVTask* PickWaitingUVTask();
99 
100     std::mutex* GetMutex();
101 
IsStealerActive()102     inline bool IsStealerActive()
103     {
104         return stealingInProgress.load(std::memory_order_relaxed);
105     }
106 
107 protected:
108     std::unordered_map<pid_t, SpmcQueue*> localQueues;
109     TaskSchedMode taskSchedMode = TaskSchedMode::DEFAULT_TASK_SCHED_MODE;
110 
111     void PushTaskLocalOrPriority(TaskBase* task);
112     TaskBase* PopTaskLocalOrPriority();
113 
114     // global queue -> local queue -> priority slot
115     virtual TaskBase* PopTaskHybridProcess() = 0;
116     bool PushTaskToPriorityStack(TaskBase *executorTask);
117 
GetUVTask(TaskBase * task)118     TaskBase* GetUVTask(TaskBase* task)
119     {
120         std::lock_guard<std::mutex> lg(uvMtx);
121         UVTask* uvTask = static_cast<UVTask*>(task);
122         auto it = cancelMap_.find(uvTask->uvWork);
123         if (it != cancelMap_.end()) {
124             uvTask->FreeMem();
125             // the task has been canceled, remove it
126             if (it->second == 1)
127                 cancelMap_.erase(it);
128             else
129                 it->second--;
130             return nullptr;
131         }
132 
133         uvTask->SetDequeued();
134         return task;
135     }
136 
137 private:
138     std::atomic<std::mutex*> mtx {nullptr};
139     std::mutex uvMtx;
140     std::unordered_map<ffrt_executor_task_t*, uint32_t> cancelMap_;
141     int uvTaskConcurrency_ = 0;
142     std::deque<UVTask*> uvTaskWaitingQueue_;
143     std::atomic<bool> stealingInProgress { false }; /* indicates whether a stealer is in progress or not */
144 };
145 
146 class SchedulerFactory {
147 public:
148     using AllocCB = std::function<TaskScheduler*()>;
149     using RecycleCB = std::function<void (TaskScheduler*)>;
150 
151     static SchedulerFactory& Instance();
152 
Alloc()153     static TaskScheduler *Alloc()
154     {
155         return Instance().alloc_();
156     }
157 
Recycle(TaskScheduler * schd)158     static void Recycle(TaskScheduler *schd)
159     {
160         Instance().recycle_(schd);
161     }
162 
RegistCb(const AllocCB & alloc,const RecycleCB & recycle)163     static void RegistCb(const AllocCB &alloc, const RecycleCB &recycle)
164     {
165         Instance().alloc_ = alloc;
166         Instance().recycle_ = recycle;
167     }
168 
169 private:
170     AllocCB alloc_;
171     RecycleCB recycle_;
172 };
173 
174 struct LocalQueue {
175     explicit LocalQueue(int qos, std::unordered_map<pid_t, SpmcQueue*> localQueues);
176     ~LocalQueue();
177     int qos {0};
178     SpmcQueue* localQueue;
179 };
180 } // namespace ffrt
181 
182 #endif
183