• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef FFRT_DYNAMIC_GRAPH_H
17 #define FFRT_DYNAMIC_GRAPH_H
18 #include <unordered_map>
19 #include <vector>
20 #include <string>
21 #include <mutex>
22 #include <shared_mutex>
23 #include "internal_inc/types.h"
24 #include "internal_inc/osal.h"
25 #include "core/task_ctx.h"
26 #include "core/version_ctx.h"
27 #include "sched/execute_ctx.h"
28 #include "sched/qos.h"
29 #include "dfx/trace/ffrt_trace.h"
30 #include "util/slab.h"
31 #include "sched/task_state.h"
32 #include "sched/scheduler.h"
33 #include "eu/execute_unit.h"
34 #include "entity.h"
35 #include "dfx/bbox/bbox.h"
36 
37 namespace ffrt {
38 #define OFFSETOF(TYPE, MEMBER) (reinterpret_cast<size_t>(&((reinterpret_cast<TYPE *>(0))->MEMBER)))
39 
40 using TaskCtxAllocator = SimpleAllocator<TaskCtx>;
41 
outsDeDup(std::vector<const void * > & outsNoDup,const ffrt_deps_t * outs)42 inline bool outsDeDup(std::vector<const void *> &outsNoDup, const ffrt_deps_t *outs)
43 {
44     for (uint32_t i = 0; i < outs->len; i++) {
45         if (std::find(outsNoDup.begin(), outsNoDup.end(), outs->items[i].ptr) == outsNoDup.end()) {
46             if ((outs->items[i].type) == ffrt_dependence_task) {
47                 FFRT_LOGE("handle can't be used as out dependence");
48                 return false;
49             }
50             outsNoDup.push_back(outs->items[i].ptr);
51         }
52     }
53     return true;
54 }
55 
insDeDup(std::vector<TaskCtx * > & in_handles,std::vector<const void * > & insNoDup,std::vector<const void * > & outsNoDup,const ffrt_deps_t * ins)56 inline void insDeDup(std::vector<TaskCtx*> &in_handles, std::vector<const void *> &insNoDup,
57     std::vector<const void *> &outsNoDup, const ffrt_deps_t *ins)
58 {
59     for (uint32_t i = 0; i < ins->len; i++) {
60         if (std::find(outsNoDup.begin(), outsNoDup.end(), ins->items[i].ptr) == outsNoDup.end()) {
61             if ((ins->items[i].type) == ffrt_dependence_task) {
62                 ((ffrt::TaskCtx*)(ins->items[i].ptr))->IncDeleteRef();
63                 in_handles.emplace_back((ffrt::TaskCtx*)(ins->items[i].ptr));
64             }
65             insNoDup.push_back(ins->items[i].ptr);
66         }
67     }
68 }
69 
70 class DependenceManager {
71 public:
DependenceManager()72     DependenceManager() : criticalMutex_(Entity::Instance()->criticalMutex_)
73     {
74         // control construct sequences of singletons
75         SimpleAllocator<TaskCtx>::instance();
76         SimpleAllocator<VersionCtx>::instance();
77         FFRTScheduler::Instance();
78         ExecuteUnit::Instance();
79 
80         TaskState::RegisterOps(TaskState::EXITED, [this](TaskCtx* task) { return this->onTaskDone(task), true; });
81     }
~DependenceManager()82     ~DependenceManager()
83     {
84     }
Instance()85     static inline DependenceManager* Instance()
86     {
87         static DependenceManager graph;
88         return &graph;
89     }
90 
Root()91     static inline TaskCtx* Root()
92     {
93         // Within an ffrt process, different threads may have different QoS interval
94         thread_local static RootTaskCtxWrapper root_wrapper;
95         return root_wrapper.Root();
96     }
97 
98     template <int WITH_HANDLE>
onSubmit(ffrt_task_handle_t & handle,ffrt_function_header_t * f,const ffrt_deps_t * ins,const ffrt_deps_t * outs,const task_attr_private * attr)99     void onSubmit(ffrt_task_handle_t &handle, ffrt_function_header_t *f, const ffrt_deps_t *ins,
100         const ffrt_deps_t *outs, const task_attr_private *attr)
101     {
102         FFRT_TRACE_SCOPE(1, onSubmit);
103         // 1 Init eu and scheduler
104         auto ctx = ExecuteCtx::Cur();
105         auto en = Entity::Instance();
106 
107         // 2 Get current task's parent
108         auto parent = ctx->task ? ctx->task : DependenceManager::Root();
109 
110         std::vector<const void*> insNoDup;
111         std::vector<const void*> outsNoDup;
112         std::vector<TaskCtx*> in_handles;
113         // signature去重:1)outs去重
114         if (outs) {
115             if (!outsDeDup(outsNoDup, outs)) {
116                 FFRT_LOGE("onSubmit outsDeDup error");
117                 return;
118             }
119         }
120 
121         // signature去重:2)ins去重(不影响功能,skip);3)ins不和outs重复(当前不支持weak signature)
122         if (ins) {
123             insDeDup(in_handles, insNoDup, outsNoDup, ins);
124         }
125 
126         // 2.1 Create task ctx
127         TaskCtx* task = nullptr;
128         {
129             FFRT_TRACE_SCOPE(1, CreateTask);
130             task = reinterpret_cast<TaskCtx*>(static_cast<uintptr_t>(
131                 static_cast<size_t>(reinterpret_cast<uintptr_t>(f)) - OFFSETOF(TaskCtx, func_storage)));
132             new (task)TaskCtx(attr, parent, ++parent->childNum, nullptr);
133         }
134         FFRT_LOGD("submit task[%lu], name[%s]", task->gid, task->label.c_str());
135 #ifdef FFRT_BBOX_ENABLE
136         TaskSubmitCounterInc();
137 #endif
138         if (WITH_HANDLE != 0) {
139             task->IncDeleteRef();
140             handle = static_cast<ffrt_task_handle_t>(task);
141             outsNoDup.push_back(handle); // handle作为任务的输出signature
142         }
143         QoS qos = (attr == nullptr ? QoS() : QoS(attr->qos_));
144         task->SetQos(qos);
145         task->InitRelatedIntervals(parent);
146         /* The parent's number of subtasks to be completed increases by one,
147          * and decreases by one after the subtask is completed
148          */
149         task->IncChildRef();
150 
151         std::vector<std::pair<VersionCtx*, NestType>> inDatas;
152         std::vector<std::pair<VersionCtx*, NestType>> outDatas;
153 
154         if (!(insNoDup.empty() && outsNoDup.empty())) {
155             // 3 Put the submitted task into Entity
156             FFRT_TRACE_SCOPE(1, submitBeforeLock);
157             std::lock_guard<decltype(criticalMutex_)> lg(criticalMutex_);
158             FFRT_TRACE_SCOPE(1, submitAfterLock);
159 
160             MapSignature2Deps(task, insNoDup, outsNoDup, inDatas, outDatas);
161 
162             {
163                 FFRT_TRACE_SCOPE(1, dealInDepend);
164                 // 3.1 Process input dependencies
165                 for (auto& i : std::as_const(inDatas)) {
166                     i.first->AddConsumer(task, i.second);
167                 }
168             }
169 
170             {
171                 FFRT_TRACE_SCOPE(1, dealOutDepend);
172                 // 3.2 Process output dependencies
173                 for (auto& o : std::as_const(outDatas)) {
174                     o.first->AddProducer(task);
175                 }
176             }
177             task->in_handles.swap(in_handles);
178             if (task->depRefCnt != 0) {
179                 FFRT_BLOCK_TRACER(task->gid, dep);
180                 return;
181             }
182         }
183 
184         FFRT_LOGD("Submit completed, enter ready queue, task[%lu], name[%s]", task->gid, task->label.c_str());
185         task->UpdateState(TaskState::READY);
186 #ifdef FFRT_BBOX_ENABLE
187         TaskEnQueuCounterInc();
188 #endif
189     }
190 
onSubmitUV(ffrt_executor_task_t * task,const task_attr_private * attr)191     void onSubmitUV(ffrt_executor_task_t* task, const task_attr_private* attr)
192     {
193         FFRT_TRACE_SCOPE(1, onSubmitUV);
194 #ifdef FFRT_BBOX_ENABLE
195         TaskSubmitCounterInc();
196 #endif
197         QoS qos = (attr == nullptr ? QoS() : QoS(attr->qos_));
198 
199         LinkedList* node = (LinkedList *)(&task->wq);
200         FFRTScheduler* sch = FFRTScheduler::Instance();
201         if (!sch->InsertNode(node, qos)) {
202             FFRT_LOGE("Submit UV task failed!");
203             return;
204         }
205 #ifdef FFRT_BBOX_ENABLE
206         TaskEnQueuCounterInc();
207 #endif
208     }
209 
onWait()210     void onWait()
211     {
212         auto ctx = ExecuteCtx::Cur();
213         auto task = ctx->task ? ctx->task : DependenceManager::Root();
214         if (!USE_COROUTINE || task->parent == nullptr) {
215             std::unique_lock<std::mutex> lck(task->lock);
216             task->MultiDepenceAdd(Denpence::CALL_DEPENCE);
217             FFRT_LOGD("onWait name:%s gid=%lu", task->label.c_str(), task->gid);
218             task->childWaitCond_.wait(lck, [task] { return task->childWaitRefCnt == 0; });
219             return;
220         }
221 
222         auto childDepFun = [&](ffrt::TaskCtx* inTask) -> bool {
223             std::unique_lock<std::mutex> lck(inTask->lock);
224             if (inTask->childWaitRefCnt == 0) {
225                 return false;
226             }
227             inTask->MultiDepenceAdd(Denpence::CALL_DEPENCE);
228             inTask->UpdateState(ffrt::TaskState::BLOCKED);
229             return true;
230         };
231         FFRT_BLOCK_TRACER(task->gid, chd);
232         CoWait(childDepFun);
233     }
234 
235 #ifdef QOS_DEPENDENCY
236     void onWait(const ffrt_deps_t* deps, int64_t deadline = -1)
237 #else
238     void onWait(const ffrt_deps_t* deps)
239 #endif
240     {
241         auto ctx = ExecuteCtx::Cur();
242         auto task = ctx->task ? ctx->task : DependenceManager::Root();
243 
244         auto dataDepFun = [&]() {
245             std::vector<VersionCtx*> waitDatas;
246             waitDatas.reserve(deps->len);
247             std::lock_guard<decltype(criticalMutex_)> lg(criticalMutex_);
248 
249             for (uint32_t i = 0; i < deps->len; ++i) {
250                 auto d = deps->items[i].ptr;
251                 auto it = std::as_const(Entity::Instance()->vaMap).find(d);
252                 if (it != Entity::Instance()->vaMap.end()) {
253                     auto waitData = it->second;
254                     // Find the VersionCtx of the parent task level
255                     for (auto out : std::as_const(task->outs)) {
256                         if (waitData->signature == out->signature) {
257                             waitData = out;
258                             break;
259                         }
260                     }
261                     waitDatas.push_back(waitData);
262                 }
263             }
264 #ifdef QOS_DEPENDENCY
265             if (deadline != -1) {
266                 Scheduler::Instance()->onWait(waitDatas, deadline);
267             }
268 #endif
269             for (auto data : std::as_const(waitDatas)) {
270                 data->AddDataWaitTaskByThis(task);
271             }
272         };
273 
274         if (!USE_COROUTINE || task->parent == nullptr) {
275             dataDepFun();
276             std::unique_lock<std::mutex> lck(task->lock);
277             task->MultiDepenceAdd(Denpence::DATA_DEPENCE);
278             FFRT_LOGD("onWait name:%s gid=%lu", task->label.c_str(), task->gid);
279             task->dataWaitCond_.wait(lck, [task] { return task->dataWaitRefCnt == 0; });
280             return;
281         }
282 
283         auto pendDataDepFun = [&](ffrt::TaskCtx* inTask) -> bool {
284             dataDepFun();
285             FFRT_LOGD("onWait name:%s gid=%lu", inTask->label.c_str(), inTask->gid);
286             std::unique_lock<std::mutex> lck(inTask->lock);
287             if (inTask->dataWaitRefCnt == 0) {
288                 return false;
289             }
290             inTask->MultiDepenceAdd(Denpence::DATA_DEPENCE);
291             inTask->UpdateState(ffrt::TaskState::BLOCKED);
292             return true;
293         };
294         FFRT_BLOCK_TRACER(task->gid, dat);
295         CoWait(pendDataDepFun);
296     }
297 
onTaskDone(TaskCtx * task)298     void onTaskDone(TaskCtx* task)
299     {
300         FFRT_LOGD("Task completed, task[%lu], name[%s]", task->gid, task->label.c_str());
301 #ifdef FFRT_BBOX_ENABLE
302         TaskDoneCounterInc();
303 #endif
304         FFRT_TRACE_SCOPE(1, ontaskDone);
305         task->DecChildRef();
306         if (!(task->ins.empty() && task->outs.empty())) {
307             std::lock_guard<decltype(criticalMutex_)> lg(criticalMutex_);
308             FFRT_TRACE_SCOPE(1, taskDoneAfterLock);
309 
310             // Production data
311             for (auto out : std::as_const(task->outs)) {
312                 out->onProduced();
313             }
314             // Consumption data
315             for (auto in : std::as_const(task->ins)) {
316                 in->onConsumed(task);
317             }
318             for (auto in : std::as_const(task->in_handles)) {
319                 in->DecDeleteRef();
320             }
321 
322             // VersionCtx recycling
323             Entity::Instance()->RecycleVersion();
324         }
325 
326         task->RecycleTask();
327     }
328 
MapSignature2Deps(TaskCtx * task,const std::vector<const void * > & inDeps,const std::vector<const void * > & outDeps,std::vector<std::pair<VersionCtx *,NestType>> & inVersions,std::vector<std::pair<VersionCtx *,NestType>> & outVersions)329     void MapSignature2Deps(TaskCtx* task, const std::vector<const void*>& inDeps,
330         const std::vector<const void*>& outDeps, std::vector<std::pair<VersionCtx*, NestType>>& inVersions,
331         std::vector<std::pair<VersionCtx*, NestType>>& outVersions)
332     {
333         auto en = Entity::Instance();
334         // scene description:
335         for (auto signature : inDeps) {
336             VersionCtx* version = nullptr;
337             NestType type = NestType::DEFAULT;
338             // scene 1|2
339             for (auto parentOut : std::as_const(task->parent->outs)) {
340                 if (parentOut->signature == signature) {
341                     version = parentOut;
342                     type = NestType::PARENTOUT;
343                     goto add_inversion;
344                 }
345             }
346             // scene 3
347             for (auto parentIn : std::as_const(task->parent->ins)) {
348                 if (parentIn->signature == signature) {
349                     version = parentIn;
350                     type = NestType::PARENTIN;
351                     goto add_inversion;
352                 }
353             }
354             // scene 4
355             version = en->VA2Ctx(signature, task);
356         add_inversion:
357             inVersions.push_back({version, type});
358         }
359 
360         for (auto signature : outDeps) {
361             VersionCtx* version = nullptr;
362             NestType type = NestType::DEFAULT;
363             // scene 5|6
364             for (auto parentOut : std::as_const(task->parent->outs)) {
365                 if (parentOut->signature == signature) {
366                     version = parentOut;
367                     type = NestType::PARENTOUT;
368                     goto add_outversion;
369                 }
370             }
371             // scene 7
372 #ifndef FFRT_RELEASE
373             for (auto parentIn : std::as_const(task->parent->ins)) {
374                 if (parentIn->signature == signature) {
375                     FFRT_LOGE("parent's indep only cannot be child's outdep");
376                 }
377             }
378 #endif
379             // scene 8
380             version = en->VA2Ctx(signature, task);
381         add_outversion:
382             outVersions.push_back({ version, type });
383         }
384     }
385 
386 #ifdef MUTEX_PERF // Mutex Lock&Unlock Cycles Statistic
387     xx::mutex& criticalMutex_;
388 #else
389     fast_mutex& criticalMutex_;
390 #endif
391 };
392 } // namespace ffrt
393 #endif
394