• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef FFRT_SCHEDULER_HPP
17 #define FFRT_SCHEDULER_HPP
18 #include <list>
19 #include <vector>
20 #include <string>
21 #include <map>
22 #include <mutex>
23 #include <atomic>
24 #include <array>
25 #include "internal_inc/types.h"
26 #include "core/entity.h"
27 #include "eu/execute_unit.h"
28 #include "sync/sync.h"
29 #include "sched/task_scheduler.h"
30 #include "eu/worker_thread.h"
31 #include "tm/cpu_task.h"
32 #include "util/cb_func.h"
33 #include "dfx/bbox/bbox.h"
34 
35 namespace ffrt {
36 class FFRTScheduler {
37 public:
38     FFRTScheduler(const FFRTScheduler&) = delete;
39     FFRTScheduler& operator=(const FFRTScheduler&) = delete;
~FFRTScheduler()40     ~FFRTScheduler()
41     {
42         for (int i = 0; i < QoS::Max(); i++) {
43             SchedulerFactory::Recycle(fifoQue[i]);
44         }
45     }
46 
47     // 获取调度器的单例
48     static FFRTScheduler* Instance();
49     static void RegistInsCb(SingleInsCB<FFRTScheduler>::Instance &&cb);
50 
51 #ifdef QOS_DEPENDENCY
onWait(const std::vector<VersionCtx * > & waitDatas,int64_t deadline)52     void onWait(const std::vector<VersionCtx*>& waitDatas, int64_t deadline)
53     {
54         for (auto data : waitDatas) {
55             if (!data->childVersions.empty()) {
56                 auto waitVersion = data->childVersions.back();
57                 if (waitVersion->status != DataStatus::IDLE) { // 数据已经被生产出来
58                     continue;
59                 }
60                 FFRT_LOGD("wait task=%p deadline=%ld", waitVersion->myProducer, deadline);
61                 updateTask(waitVersion->myProducer, deadline);
62                 updateVersion(waitVersion->preVersion, deadline);
63             }
64         }
65     }
66 #endif
67 
GetScheduler(const QoS & qos)68     TaskScheduler& GetScheduler(const QoS& qos)
69     {
70         return *fifoQue[static_cast<size_t>(qos)];
71     }
72 
73 #ifdef FFRT_IO_TASK_SCHEDULER
PushTask(CPUEUTask * task)74     void PushTask(CPUEUTask* task)
75     {
76         int level = task->qos();
77         auto lock = ExecuteUnit::Instance().GetSleepCtl(level);
78         lock->lock();
79         fifoQue[static_cast<size_t>(level)]->WakeupTask(task);
80         lock->unlock();
81         ExecuteUnit::Instance().NotifyTaskAdded(level);
82     }
83 #endif
84 
WakeupTask(CPUEUTask * task)85     bool WakeupTask(CPUEUTask* task)
86     {
87         int qos_level = static_cast<int>(qos_default);
88         if (task != nullptr) {
89             qos_level = task->qos();
90             if (qos_level == qos_inherit) {
91                 return false;
92             }
93         }
94         QoS _qos = QoS(qos_level);
95         int level = _qos();
96         auto lock = ExecuteUnit::Instance().GetSleepCtl(level);
97         lock->lock();
98         fifoQue[static_cast<size_t>(level)]->WakeupTask(task);
99         lock->unlock();
100         FFRT_LOGD("qos[%d] task[%lu] entered q", level, task->gid);
101         ExecuteUnit::Instance().NotifyTaskAdded(level);
102         return true;
103     }
104 
InsertNode(LinkedList * node,const QoS qos)105     bool InsertNode(LinkedList* node, const QoS qos)
106     {
107         int qos_level = qos();
108         if (qos_level == qos_inherit) {
109             return false;
110         }
111 
112         auto lock = ExecuteUnit::Instance().GetSleepCtl(qos_level);
113         lock->lock();
114         fifoQue[static_cast<size_t>(qos_level)]->WakeupNode(node);
115         lock->unlock();
116 
117 #ifdef FFRT_IO_TASK_SCHEDULER
118         ffrt_executor_task_t* task = reinterpret_cast<ffrt_executor_task_t*>(reinterpret_cast<char*>(node) -
119             offsetof(ffrt_executor_task_t, wq));
120         if (task->type == ffrt_io_task) {
121             ExecuteUnit::Instance().NotifyLocalTaskAdded(qos_level);
122             return true;
123         }
124 #endif
125 
126         ExecuteUnit::Instance().NotifyTaskAdded(qos_level);
127         return true;
128     }
129 
RemoveNode(LinkedList * node,const QoS qos)130     bool RemoveNode(LinkedList* node, const QoS qos)
131     {
132         int qos_level = qos();
133         if (qos_level == qos_inherit) {
134             return false;
135         }
136         auto lock = ExecuteUnit::Instance().GetSleepCtl(qos_level);
137         lock->lock();
138         if (!node->InList()) {
139             lock->unlock();
140             return false;
141         }
142         fifoQue[static_cast<size_t>(qos_level)]->RemoveNode(node);
143         lock->unlock();
144 #ifdef FFRT_BBOX_ENABLE
145         TaskFinishCounterInc();
146 #endif
147         return true;
148     }
149 
150 protected:
FFRTScheduler()151     FFRTScheduler()
152     {
153         TaskState::RegisterOps(TaskState::READY, std::bind(&FFRTScheduler::WakeupTask, this, std::placeholders::_1));
154         for (int i = 0; i < QoS::Max(); i++) {
155             fifoQue[i] = SchedulerFactory::Alloc();
156         }
157     }
158 
159 private:
160     std::array<TaskScheduler*, QoS::Max()> fifoQue;
161 
162 #ifdef QOS_DEPENDENCY
resetDeadline(CPUEUTask * task,int64_t deadline)163     void resetDeadline(CPUEUTask* task, int64_t deadline)
164     {
165         auto it = std::find_if(readyTasks.begin(), readyTasks.end(), [task](auto& p) { return p.second == task; });
166         if (it == readyTasks.end()) {
167             return;
168         }
169         auto node = readyTasks.extract(it);
170         task->qos.deadline.relative += deadline - task->qos.deadline.absolute;
171         task->qos.deadline.absolute = deadline;
172         readyTasks.insert(std::move(node));
173     }
updateTask(CPUEUTask * task,int64_t deadline)174     void updateTask(CPUEUTask* task, int64_t deadline)
175     {
176         if (task == nullptr) {
177             return;
178         }
179         resetDeadline(task, deadline);
180         onWait(task->ins, deadline);
181         for (auto data : task->outs) {
182             updateVersion(data, deadline);
183         }
184         updateChildTask(task, deadline);
185     }
updateChildTask(CPUEUTask * task,int64_t deadline)186     void updateChildTask(CPUEUTask* task, int64_t deadline)
187     {
188         (void)task;
189         (void)deadline;
190     }
updateVersion(VersionCtx * data,int64_t deadline)191     void updateVersion(VersionCtx* data, int64_t deadline)
192     {
193         if (data == nullptr) {
194             return;
195         }
196         updateTask(data->myProducer, deadline);
197         for (auto task : data->consumers) {
198             updateTask(task, deadline);
199         }
200         updateVersion(data->preVersion, deadline);
201     }
202 #endif
203 };
204 
205 class SFFRTScheduler : public FFRTScheduler {
206 public:
Instance()207     static FFRTScheduler& Instance()
208     {
209         static SFFRTScheduler ins;
210         return ins;
211     }
212 private:
SFFRTScheduler()213     SFFRTScheduler()
214     {
215     }
216 };
217 } // namespace ffrt
218 #endif
219