1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #ifndef FFRT_DYNAMIC_GRAPH_H
17 #define FFRT_DYNAMIC_GRAPH_H
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <mutex>
#include <new>
#include <shared_mutex>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>
#include "internal_inc/types.h"
#include "internal_inc/osal.h"
#include "core/task_ctx.h"
#include "core/version_ctx.h"
#include "sched/execute_ctx.h"
#include "sched/qos.h"
#include "dfx/trace/ffrt_trace.h"
#include "util/slab.h"
#include "sched/task_state.h"
#include "sched/scheduler.h"
#include "eu/execute_unit.h"
#include "entity.h"
#include "dfx/bbox/bbox.h"
36
37 namespace ffrt {
// Byte offset of MEMBER inside TYPE, obtained by taking the member's address through a
// null TYPE pointer and converting that address to size_t.
// onSubmit uses it to step back from a func_storage address to the enclosing TaskCtx.
#define OFFSETOF(TYPE, MEMBER) (reinterpret_cast<size_t>(&((reinterpret_cast<TYPE *>(0))->MEMBER)))

// Shorthand for the SimpleAllocator instantiated for TaskCtx.
using TaskCtxAllocator = SimpleAllocator<TaskCtx>;
41
outsDeDup(std::vector<const void * > & outsNoDup,const ffrt_deps_t * outs)42 inline bool outsDeDup(std::vector<const void *> &outsNoDup, const ffrt_deps_t *outs)
43 {
44 for (uint32_t i = 0; i < outs->len; i++) {
45 if (std::find(outsNoDup.begin(), outsNoDup.end(), outs->items[i].ptr) == outsNoDup.end()) {
46 if ((outs->items[i].type) == ffrt_dependence_task) {
47 FFRT_LOGE("handle can't be used as out dependence");
48 return false;
49 }
50 outsNoDup.push_back(outs->items[i].ptr);
51 }
52 }
53 return true;
54 }
55
insDeDup(std::vector<TaskCtx * > & in_handles,std::vector<const void * > & insNoDup,std::vector<const void * > & outsNoDup,const ffrt_deps_t * ins)56 inline void insDeDup(std::vector<TaskCtx*> &in_handles, std::vector<const void *> &insNoDup,
57 std::vector<const void *> &outsNoDup, const ffrt_deps_t *ins)
58 {
59 for (uint32_t i = 0; i < ins->len; i++) {
60 if (std::find(outsNoDup.begin(), outsNoDup.end(), ins->items[i].ptr) == outsNoDup.end()) {
61 if ((ins->items[i].type) == ffrt_dependence_task) {
62 ((ffrt::TaskCtx*)(ins->items[i].ptr))->IncDeleteRef();
63 in_handles.emplace_back((ffrt::TaskCtx*)(ins->items[i].ptr));
64 }
65 insNoDup.push_back(ins->items[i].ptr);
66 }
67 }
68 }
69
70 class DependenceManager {
71 public:
DependenceManager()72 DependenceManager() : criticalMutex_(Entity::Instance()->criticalMutex_)
73 {
74 // control construct sequences of singletons
75 SimpleAllocator<TaskCtx>::instance();
76 SimpleAllocator<VersionCtx>::instance();
77 FFRTScheduler::Instance();
78 ExecuteUnit::Instance();
79
80 TaskState::RegisterOps(TaskState::EXITED, [this](TaskCtx* task) { return this->onTaskDone(task), true; });
81 }
~DependenceManager()82 ~DependenceManager()
83 {
84 }
Instance()85 static inline DependenceManager* Instance()
86 {
87 static DependenceManager graph;
88 return &graph;
89 }
90
Root()91 static inline TaskCtx* Root()
92 {
93 // Within an ffrt process, different threads may have different QoS interval
94 thread_local static RootTaskCtxWrapper root_wrapper;
95 return root_wrapper.Root();
96 }
97
98 template <int WITH_HANDLE>
onSubmit(ffrt_task_handle_t & handle,ffrt_function_header_t * f,const ffrt_deps_t * ins,const ffrt_deps_t * outs,const task_attr_private * attr)99 void onSubmit(ffrt_task_handle_t &handle, ffrt_function_header_t *f, const ffrt_deps_t *ins,
100 const ffrt_deps_t *outs, const task_attr_private *attr)
101 {
102 FFRT_TRACE_SCOPE(1, onSubmit);
103 // 1 Init eu and scheduler
104 auto ctx = ExecuteCtx::Cur();
105 auto en = Entity::Instance();
106
107 // 2 Get current task's parent
108 auto parent = ctx->task ? ctx->task : DependenceManager::Root();
109
110 std::vector<const void*> insNoDup;
111 std::vector<const void*> outsNoDup;
112 std::vector<TaskCtx*> in_handles;
113 // signature去重:1)outs去重
114 if (outs) {
115 if (!outsDeDup(outsNoDup, outs)) {
116 FFRT_LOGE("onSubmit outsDeDup error");
117 return;
118 }
119 }
120
121 // signature去重:2)ins去重(不影响功能,skip);3)ins不和outs重复(当前不支持weak signature)
122 if (ins) {
123 insDeDup(in_handles, insNoDup, outsNoDup, ins);
124 }
125
126 // 2.1 Create task ctx
127 TaskCtx* task = nullptr;
128 {
129 FFRT_TRACE_SCOPE(1, CreateTask);
130 task = reinterpret_cast<TaskCtx*>(static_cast<uintptr_t>(
131 static_cast<size_t>(reinterpret_cast<uintptr_t>(f)) - OFFSETOF(TaskCtx, func_storage)));
132 new (task)TaskCtx(attr, parent, ++parent->childNum, nullptr);
133 }
134 FFRT_LOGD("submit task[%lu], name[%s]", task->gid, task->label.c_str());
135 #ifdef FFRT_BBOX_ENABLE
136 TaskSubmitCounterInc();
137 #endif
138 if (WITH_HANDLE != 0) {
139 task->IncDeleteRef();
140 handle = static_cast<ffrt_task_handle_t>(task);
141 outsNoDup.push_back(handle); // handle作为任务的输出signature
142 }
143 QoS qos = (attr == nullptr ? QoS() : QoS(attr->qos_));
144 task->SetQos(qos);
145 task->InitRelatedIntervals(parent);
146 /* The parent's number of subtasks to be completed increases by one,
147 * and decreases by one after the subtask is completed
148 */
149 task->IncChildRef();
150
151 std::vector<std::pair<VersionCtx*, NestType>> inDatas;
152 std::vector<std::pair<VersionCtx*, NestType>> outDatas;
153
154 if (!(insNoDup.empty() && outsNoDup.empty())) {
155 // 3 Put the submitted task into Entity
156 FFRT_TRACE_SCOPE(1, submitBeforeLock);
157 std::lock_guard<decltype(criticalMutex_)> lg(criticalMutex_);
158 FFRT_TRACE_SCOPE(1, submitAfterLock);
159
160 MapSignature2Deps(task, insNoDup, outsNoDup, inDatas, outDatas);
161
162 {
163 FFRT_TRACE_SCOPE(1, dealInDepend);
164 // 3.1 Process input dependencies
165 for (auto& i : std::as_const(inDatas)) {
166 i.first->AddConsumer(task, i.second);
167 }
168 }
169
170 {
171 FFRT_TRACE_SCOPE(1, dealOutDepend);
172 // 3.2 Process output dependencies
173 for (auto& o : std::as_const(outDatas)) {
174 o.first->AddProducer(task);
175 }
176 }
177 task->in_handles.swap(in_handles);
178 if (task->depRefCnt != 0) {
179 FFRT_BLOCK_TRACER(task->gid, dep);
180 return;
181 }
182 }
183
184 FFRT_LOGD("Submit completed, enter ready queue, task[%lu], name[%s]", task->gid, task->label.c_str());
185 task->UpdateState(TaskState::READY);
186 #ifdef FFRT_BBOX_ENABLE
187 TaskEnQueuCounterInc();
188 #endif
189 }
190
onSubmitUV(ffrt_executor_task_t * task,const task_attr_private * attr)191 void onSubmitUV(ffrt_executor_task_t* task, const task_attr_private* attr)
192 {
193 FFRT_TRACE_SCOPE(1, onSubmitUV);
194 #ifdef FFRT_BBOX_ENABLE
195 TaskSubmitCounterInc();
196 #endif
197 QoS qos = (attr == nullptr ? QoS() : QoS(attr->qos_));
198
199 LinkedList* node = (LinkedList *)(&task->wq);
200 FFRTScheduler* sch = FFRTScheduler::Instance();
201 if (!sch->InsertNode(node, qos)) {
202 FFRT_LOGE("Submit UV task failed!");
203 return;
204 }
205 #ifdef FFRT_BBOX_ENABLE
206 TaskEnQueuCounterInc();
207 #endif
208 }
209
onWait()210 void onWait()
211 {
212 auto ctx = ExecuteCtx::Cur();
213 auto task = ctx->task ? ctx->task : DependenceManager::Root();
214 if (!USE_COROUTINE || task->parent == nullptr) {
215 std::unique_lock<std::mutex> lck(task->lock);
216 task->MultiDepenceAdd(Denpence::CALL_DEPENCE);
217 FFRT_LOGD("onWait name:%s gid=%lu", task->label.c_str(), task->gid);
218 task->childWaitCond_.wait(lck, [task] { return task->childWaitRefCnt == 0; });
219 return;
220 }
221
222 auto childDepFun = [&](ffrt::TaskCtx* inTask) -> bool {
223 std::unique_lock<std::mutex> lck(inTask->lock);
224 if (inTask->childWaitRefCnt == 0) {
225 return false;
226 }
227 inTask->MultiDepenceAdd(Denpence::CALL_DEPENCE);
228 inTask->UpdateState(ffrt::TaskState::BLOCKED);
229 return true;
230 };
231 FFRT_BLOCK_TRACER(task->gid, chd);
232 CoWait(childDepFun);
233 }
234
235 #ifdef QOS_DEPENDENCY
236 void onWait(const ffrt_deps_t* deps, int64_t deadline = -1)
237 #else
238 void onWait(const ffrt_deps_t* deps)
239 #endif
240 {
241 auto ctx = ExecuteCtx::Cur();
242 auto task = ctx->task ? ctx->task : DependenceManager::Root();
243
244 auto dataDepFun = [&]() {
245 std::vector<VersionCtx*> waitDatas;
246 waitDatas.reserve(deps->len);
247 std::lock_guard<decltype(criticalMutex_)> lg(criticalMutex_);
248
249 for (uint32_t i = 0; i < deps->len; ++i) {
250 auto d = deps->items[i].ptr;
251 auto it = std::as_const(Entity::Instance()->vaMap).find(d);
252 if (it != Entity::Instance()->vaMap.end()) {
253 auto waitData = it->second;
254 // Find the VersionCtx of the parent task level
255 for (auto out : std::as_const(task->outs)) {
256 if (waitData->signature == out->signature) {
257 waitData = out;
258 break;
259 }
260 }
261 waitDatas.push_back(waitData);
262 }
263 }
264 #ifdef QOS_DEPENDENCY
265 if (deadline != -1) {
266 Scheduler::Instance()->onWait(waitDatas, deadline);
267 }
268 #endif
269 for (auto data : std::as_const(waitDatas)) {
270 data->AddDataWaitTaskByThis(task);
271 }
272 };
273
274 if (!USE_COROUTINE || task->parent == nullptr) {
275 dataDepFun();
276 std::unique_lock<std::mutex> lck(task->lock);
277 task->MultiDepenceAdd(Denpence::DATA_DEPENCE);
278 FFRT_LOGD("onWait name:%s gid=%lu", task->label.c_str(), task->gid);
279 task->dataWaitCond_.wait(lck, [task] { return task->dataWaitRefCnt == 0; });
280 return;
281 }
282
283 auto pendDataDepFun = [&](ffrt::TaskCtx* inTask) -> bool {
284 dataDepFun();
285 FFRT_LOGD("onWait name:%s gid=%lu", inTask->label.c_str(), inTask->gid);
286 std::unique_lock<std::mutex> lck(inTask->lock);
287 if (inTask->dataWaitRefCnt == 0) {
288 return false;
289 }
290 inTask->MultiDepenceAdd(Denpence::DATA_DEPENCE);
291 inTask->UpdateState(ffrt::TaskState::BLOCKED);
292 return true;
293 };
294 FFRT_BLOCK_TRACER(task->gid, dat);
295 CoWait(pendDataDepFun);
296 }
297
onTaskDone(TaskCtx * task)298 void onTaskDone(TaskCtx* task)
299 {
300 FFRT_LOGD("Task completed, task[%lu], name[%s]", task->gid, task->label.c_str());
301 #ifdef FFRT_BBOX_ENABLE
302 TaskDoneCounterInc();
303 #endif
304 FFRT_TRACE_SCOPE(1, ontaskDone);
305 task->DecChildRef();
306 if (!(task->ins.empty() && task->outs.empty())) {
307 std::lock_guard<decltype(criticalMutex_)> lg(criticalMutex_);
308 FFRT_TRACE_SCOPE(1, taskDoneAfterLock);
309
310 // Production data
311 for (auto out : std::as_const(task->outs)) {
312 out->onProduced();
313 }
314 // Consumption data
315 for (auto in : std::as_const(task->ins)) {
316 in->onConsumed(task);
317 }
318 for (auto in : std::as_const(task->in_handles)) {
319 in->DecDeleteRef();
320 }
321
322 // VersionCtx recycling
323 Entity::Instance()->RecycleVersion();
324 }
325
326 task->RecycleTask();
327 }
328
MapSignature2Deps(TaskCtx * task,const std::vector<const void * > & inDeps,const std::vector<const void * > & outDeps,std::vector<std::pair<VersionCtx *,NestType>> & inVersions,std::vector<std::pair<VersionCtx *,NestType>> & outVersions)329 void MapSignature2Deps(TaskCtx* task, const std::vector<const void*>& inDeps,
330 const std::vector<const void*>& outDeps, std::vector<std::pair<VersionCtx*, NestType>>& inVersions,
331 std::vector<std::pair<VersionCtx*, NestType>>& outVersions)
332 {
333 auto en = Entity::Instance();
334 // scene description:
335 for (auto signature : inDeps) {
336 VersionCtx* version = nullptr;
337 NestType type = NestType::DEFAULT;
338 // scene 1|2
339 for (auto parentOut : std::as_const(task->parent->outs)) {
340 if (parentOut->signature == signature) {
341 version = parentOut;
342 type = NestType::PARENTOUT;
343 goto add_inversion;
344 }
345 }
346 // scene 3
347 for (auto parentIn : std::as_const(task->parent->ins)) {
348 if (parentIn->signature == signature) {
349 version = parentIn;
350 type = NestType::PARENTIN;
351 goto add_inversion;
352 }
353 }
354 // scene 4
355 version = en->VA2Ctx(signature, task);
356 add_inversion:
357 inVersions.push_back({version, type});
358 }
359
360 for (auto signature : outDeps) {
361 VersionCtx* version = nullptr;
362 NestType type = NestType::DEFAULT;
363 // scene 5|6
364 for (auto parentOut : std::as_const(task->parent->outs)) {
365 if (parentOut->signature == signature) {
366 version = parentOut;
367 type = NestType::PARENTOUT;
368 goto add_outversion;
369 }
370 }
371 // scene 7
372 #ifndef FFRT_RELEASE
373 for (auto parentIn : std::as_const(task->parent->ins)) {
374 if (parentIn->signature == signature) {
375 FFRT_LOGE("parent's indep only cannot be child's outdep");
376 }
377 }
378 #endif
379 // scene 8
380 version = en->VA2Ctx(signature, task);
381 add_outversion:
382 outVersions.push_back({ version, type });
383 }
384 }
385
386 #ifdef MUTEX_PERF // Mutex Lock&Unlock Cycles Statistic
387 xx::mutex& criticalMutex_;
388 #else
389 fast_mutex& criticalMutex_;
390 #endif
391 };
392 } // namespace ffrt
393 #endif
394