• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "sdependence_manager.h"
17 #include "dfx/trace_record/ffrt_trace_record.h"
18 #include "util/worker_monitor.h"
19 #include "util/ffrt_facade.h"
20 #include "util/slab.h"
21 
22 #ifdef FFRT_ASYNC_STACKTRACE
23 #include "dfx/async_stack/ffrt_async_stack.h"
24 #endif
25 
26 #ifdef FFRT_ENABLE_HITRACE_CHAIN
27 #include "dfx/trace/ffrt_trace_chain.h"
28 #endif
29 
30 namespace ffrt {
31 
SDependenceManager()32 SDependenceManager::SDependenceManager() : criticalMutex_(Entity::Instance()->criticalMutex_)
33 {
34     SimpleAllocator<SCPUEUTask>::Instance();
35 #ifdef FFRT_OH_TRACE_ENABLE
36     _StartTrace(HITRACE_TAG_FFRT, "dm_init", -1); // init g_tagsProperty for ohos ffrt trace
37     _FinishTrace(HITRACE_TAG_FFRT);
38 #endif
39     FFRT_LOGD("Construction completed.");
40 }
41 
~SDependenceManager()42 SDependenceManager::~SDependenceManager()
43 {
44     FFRT_LOGD("Destruction completed.");
45 }
46 
RemoveRepeatedDeps(std::vector<CPUEUTask * > & in_handles,const ffrt_deps_t * ins,const ffrt_deps_t * outs,std::vector<const void * > & insNoDup,std::vector<const void * > & outsNoDup)47 void SDependenceManager::RemoveRepeatedDeps(std::vector<CPUEUTask*>& in_handles, const ffrt_deps_t* ins, const ffrt_deps_t* outs,
48     std::vector<const void *>& insNoDup, std::vector<const void *>& outsNoDup)
49 {
50     // signature去重:1)outs去重
51     if (outs) {
52         OutsDedup(outsNoDup, outs);
53     }
54 
55     // signature去重:2)ins去重(不影响功能,skip);3)ins不和outs重复(当前不支持weak signature)
56     if (ins) {
57         InsDedup(in_handles, insNoDup, outsNoDup, ins);
58     }
59 }
60 
onSubmit(bool has_handle,ffrt_task_handle_t & handle,ffrt_function_header_t * f,const ffrt_deps_t * ins,const ffrt_deps_t * outs,const task_attr_private * attr)61 void SDependenceManager::onSubmit(bool has_handle, ffrt_task_handle_t &handle, ffrt_function_header_t *f,
62     const ffrt_deps_t *ins, const ffrt_deps_t *outs, const task_attr_private *attr)
63 {
64     // 0 check outs handle
65     if (!CheckOutsHandle(outs)) {
66         FFRT_SYSEVENT_LOGE("outs contain handles error");
67         return;
68     }
69 
70     // 1 Init eu and scheduler
71     auto ctx = ExecuteCtx::Cur();
72 
73     // 2 Get current task's parent
74     auto parent = (ctx->task && ctx->task->type == ffrt_normal_task) ?
75         static_cast<CPUEUTask*>(ctx->task) : DependenceManager::Root();
76 
77     // 2.1 Create task ctx
78     SCPUEUTask* task = nullptr;
79     {
80         task = reinterpret_cast<SCPUEUTask*>(static_cast<uintptr_t>(
81             static_cast<size_t>(reinterpret_cast<uintptr_t>(f)) - OFFSETOF(SCPUEUTask, func_storage)));
82         new (task)SCPUEUTask(attr, parent, ++parent->childNum);
83     }
84 #ifdef FFRT_ENABLE_HITRACE_CHAIN
85     if (TraceChainAdapter::Instance().HiTraceChainGetId().valid == HITRACE_ID_VALID) {
86         task->traceId_ = TraceChainAdapter::Instance().HiTraceChainCreateSpan();
87     }
88 #endif
89     FFRT_SUBMIT_MARKER(task->gid);
90 #ifdef FFRT_ASYNC_STACKTRACE
91     {
92         task->stackId = FFRTCollectAsyncStack();
93     }
94 #endif
95     QoS qos = (attr == nullptr ? QoS() : QoS(attr->qos_));
96     task->SetQos(qos);
97     task->Prepare();
98 
99     std::vector<CPUEUTask*> inHandles;
100     std::vector<const void*> insNoDup;
101     std::vector<const void*> outsNoDup;
102     RemoveRepeatedDeps(inHandles, ins, outs, insNoDup, outsNoDup);
103     task->SetInHandles(inHandles);
104 
105 #ifdef FFRT_OH_WATCHDOG_ENABLE
106     if (attr != nullptr && IsValidTimeout(task->gid, attr->timeout_)) {
107         task->isWatchdogEnable = true;
108         AddTaskToWatchdog(task->gid);
109         SendTimeoutWatchdog(task->gid, attr->timeout_, attr->delay_);
110     }
111 #endif
112     if (has_handle) {
113         task->IncDeleteRef();
114         handle = static_cast<ffrt_task_handle_t>(task);
115         outsNoDup.push_back(handle); // handle作为任务的输出signature
116     }
117 
118     /* The parent's number of subtasks to be completed increases by one,
119         * and decreases by one after the subtask is completed
120         */
121     task->IncChildRef();
122 
123     if (!(insNoDup.empty() && outsNoDup.empty())) {
124         std::vector<std::pair<VersionCtx*, NestType>> inDatas;
125         std::vector<std::pair<VersionCtx*, NestType>> outDatas;
126         // 3 Put the submitted task into Entity
127         std::lock_guard<decltype(criticalMutex_)> lg(criticalMutex_);
128 
129         MapSignature2Deps(task, insNoDup, outsNoDup, inDatas, outDatas);
130 
131         {
132             // 3.1 Process input dependencies
133             for (auto& i : std::as_const(inDatas)) {
134                 i.first->AddConsumer(task, i.second);
135             }
136         }
137 
138         {
139             // 3.2 Process output dependencies
140             for (auto& o : std::as_const(outDatas)) {
141                 o.first->AddProducer(task);
142             }
143         }
144         if (task->dataRefCnt.submitDep != 0) {
145             FFRT_BLOCK_TRACER(task->gid, dep);
146             FFRT_TRACE_END();
147             return;
148         }
149     }
150 
151     task->Ready();
152     FFRT_TRACE_END();
153 }
154 
onWait()155 void SDependenceManager::onWait()
156 {
157     auto ctx = ExecuteCtx::Cur();
158     auto baseTask = (ctx->task && ctx->task->type == ffrt_normal_task) ? ctx->task : DependenceManager::Root();
159     auto task = static_cast<SCPUEUTask*>(baseTask);
160 
161     if (task->Block() == BlockType::BLOCK_THREAD) {
162         std::unique_lock<std::mutex> lck(task->mutex_);
163         task->MultiDependenceAdd(Dependence::CALL_DEPENDENCE);
164         FFRT_LOGD("onWait name:%s gid=%lu", task->GetLabel().c_str(), task->gid);
165         task->waitCond_.wait(lck, [task] { return task->childRefCnt == 0; });
166         task->Wake();
167         return;
168     }
169 
170     auto childDepFun = [](CoTask* task) -> bool {
171         auto sTask = static_cast<SCPUEUTask*>(task);
172         std::lock_guard<std::mutex> lck(sTask->mutex_);
173         if (sTask->childRefCnt == 0) {
174             return false;
175         }
176         sTask->MultiDependenceAdd(Dependence::CALL_DEPENDENCE);
177         return true;
178     };
179     FFRT_BLOCK_TRACER(task->gid, chd);
180     CoWait(childDepFun);
181 }
182 
onWait(const ffrt_deps_t * deps)183 void SDependenceManager::onWait(const ffrt_deps_t* deps)
184 {
185     auto ctx = ExecuteCtx::Cur();
186     auto baseTask = (ctx->task && ctx->task->type == ffrt_normal_task) ? ctx->task : DependenceManager::Root();
187     auto task = static_cast<SCPUEUTask*>(baseTask);
188     task->dataRefCnt.waitDep = 0;
189 
190     auto dataDepFun = [&]() {
191         std::vector<VersionCtx*> waitDatas;
192         waitDatas.reserve(deps->len);
193         std::lock_guard<decltype(criticalMutex_)> lg(criticalMutex_);
194 
195         for (uint32_t i = 0; i < deps->len; ++i) {
196             auto d = deps->items[i].ptr;
197             auto it = std::as_const(Entity::Instance()->vaMap).find(d);
198             if (it != Entity::Instance()->vaMap.end()) {
199                 auto waitData = it->second;
200                 // Find the VersionCtx of the parent task level
201                 for (auto out : std::as_const(task->outs)) {
202                     if (waitData->signature == out->signature) {
203                         waitData = out;
204                         break;
205                     }
206                 }
207                 waitDatas.push_back(waitData);
208             }
209         }
210         for (auto data : std::as_const(waitDatas)) {
211             data->AddDataWaitTaskByThis(task);
212         }
213     };
214 
215     if (task->Block() == BlockType::BLOCK_THREAD) {
216         dataDepFun();
217         std::unique_lock<std::mutex> lck(task->mutex_);
218         task->MultiDependenceAdd(Dependence::DATA_DEPENDENCE);
219         FFRT_LOGD("onWait name:%s gid=%lu", task->GetLabel().c_str(), task->gid);
220         task->waitCond_.wait(lck, [task] { return task->dataRefCnt.waitDep == 0; });
221         task->Wake();
222         return;
223     }
224 
225     auto pendDataDepFun = [dataDepFun](ffrt::CoTask* task) -> bool {
226         auto sTask = static_cast<SCPUEUTask*>(task);
227         dataDepFun();
228         FFRT_LOGD("onWait name:%s gid=%lu", sTask->GetLabel().c_str(), sTask->gid);
229         std::lock_guard<std::mutex> lck(sTask->mutex_);
230         if (sTask->dataRefCnt.waitDep == 0) {
231             return false;
232         }
233         sTask->MultiDependenceAdd(Dependence::DATA_DEPENDENCE);
234         return true;
235     };
236     FFRT_BLOCK_TRACER(task->gid, dat);
237     CoWait(pendDataDepFun);
238 }
239 
onExecResults(ffrt_task_handle_t handle)240 int SDependenceManager::onExecResults(ffrt_task_handle_t handle)
241 {
242     return 0;
243 }
244 
onTaskDone(CPUEUTask * task)245 void SDependenceManager::onTaskDone(CPUEUTask* task)
246 {
247     auto sTask = static_cast<SCPUEUTask*>(task);
248     FFRTTraceRecord::TaskDone<ffrt_normal_task>(task->GetQos());
249     FFRTTraceRecord::TaskDone<ffrt_normal_task>(task->GetQos(),  task);
250     FFRT_TRACE_SCOPE(1, ontaskDone);
251     if (!(sTask->ins.empty() && sTask->outs.empty())) {
252         std::lock_guard<decltype(criticalMutex_)> lg(criticalMutex_);
253         FFRT_TRACE_SCOPE(1, taskDoneAfterLock);
254 
255         // Production data
256         for (auto out : std::as_const(sTask->outs)) {
257             out->onProduced();
258         }
259         // Consumption data
260         for (auto in : std::as_const(sTask->ins)) {
261             in->onConsumed(sTask);
262         }
263         for (auto in : sTask->GetInHandles()) {
264             in->DecDeleteRef();
265         }
266         // VersionCtx recycling
267         Entity::Instance()->RecycleVersion();
268     }
269     if (task->isWatchdogEnable) {
270         RemoveTaskFromWatchdog(task->gid);
271     }
272     // Note that `DecChildRef` is going to decrement the `childRefCnt`
273     // of the parent task. And if the parent happens to be
274     // root it may get deleted in `~RootTaskCtxWrapper`.
275     // Hence, following this call there should no longer
276     // be references or accesses to the root task object
277     // or any of its members. E.g. calling this
278     // before out->onProduced can lead to access
279     // of freed memory on wait condition notification
280     // of the parent task.
281     sTask->DecChildRef();
282     sTask->Finish();
283 }
284 
MapSignature2Deps(SCPUEUTask * task,const std::vector<const void * > & inDeps,const std::vector<const void * > & outDeps,std::vector<std::pair<VersionCtx *,NestType>> & inVersions,std::vector<std::pair<VersionCtx *,NestType>> & outVersions)285 void SDependenceManager::MapSignature2Deps(SCPUEUTask* task, const std::vector<const void*>& inDeps,
286     const std::vector<const void*>& outDeps, std::vector<std::pair<VersionCtx*, NestType>>& inVersions,
287     std::vector<std::pair<VersionCtx*, NestType>>& outVersions)
288 {
289     auto en = Entity::Instance();
290     // scene description:
291     for (auto signature : inDeps) {
292         VersionCtx* version = nullptr;
293         NestType type = NestType::DEFAULT;
294         // scene 1|2
295         for (auto parentOut : std::as_const(static_cast<SCPUEUTask*>(task->parent)->outs)) {
296             if (parentOut->signature == signature) {
297                 version = parentOut;
298                 type = NestType::PARENTOUT;
299                 goto add_inversion;
300             }
301         }
302         // scene 3
303         for (auto parentIn : std::as_const(static_cast<SCPUEUTask*>(task->parent)->ins)) {
304             if (parentIn->signature == signature) {
305                 version = parentIn;
306                 type = NestType::PARENTIN;
307                 goto add_inversion;
308             }
309         }
310         // scene 4
311         version = en->VA2Ctx(signature, task);
312     add_inversion:
313         inVersions.push_back({version, type});
314     }
315 
316     for (auto signature : outDeps) {
317         VersionCtx* version = nullptr;
318         NestType type = NestType::DEFAULT;
319         // scene 5|6
320         for (auto parentOut : std::as_const(static_cast<SCPUEUTask*>(task->parent)->outs)) {
321             if (parentOut->signature == signature) {
322                 version = parentOut;
323                 type = NestType::PARENTOUT;
324                 goto add_outversion;
325             }
326         }
327         // scene 7
328 #ifndef FFRT_RELEASE
329         for (auto parentIn : std::as_const(static_cast<SCPUEUTask*>(task->parent)->ins)) {
330             if (parentIn->signature == signature) {
331                 FFRT_SYSEVENT_LOGE("parent's indep only cannot be child's outdep");
332             }
333         }
334 #endif
335         // scene 8
336         version = en->VA2Ctx(signature, task);
337     add_outversion:
338         outVersions.push_back({version, type});
339     }
340 }
341 } // namespace ffrt
342