1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "sdependence_manager.h"
17 #include "dfx/trace_record/ffrt_trace_record.h"
18 #include "util/worker_monitor.h"
19 #include "util/ffrt_facade.h"
20 #include "util/slab.h"
21
22 #ifdef FFRT_ASYNC_STACKTRACE
23 #include "dfx/async_stack/ffrt_async_stack.h"
24 #endif
25
26 #ifdef FFRT_ENABLE_HITRACE_CHAIN
27 #include "dfx/trace/ffrt_trace_chain.h"
28 #endif
29
30 namespace ffrt {
31
SDependenceManager()32 SDependenceManager::SDependenceManager() : criticalMutex_(Entity::Instance()->criticalMutex_)
33 {
34 SimpleAllocator<SCPUEUTask>::Instance();
35 #ifdef FFRT_OH_TRACE_ENABLE
36 _StartTrace(HITRACE_TAG_FFRT, "dm_init", -1); // init g_tagsProperty for ohos ffrt trace
37 _FinishTrace(HITRACE_TAG_FFRT);
38 #endif
39 FFRT_LOGD("Construction completed.");
40 }
41
~SDependenceManager()42 SDependenceManager::~SDependenceManager()
43 {
44 FFRT_LOGD("Destruction completed.");
45 }
46
RemoveRepeatedDeps(std::vector<CPUEUTask * > & in_handles,const ffrt_deps_t * ins,const ffrt_deps_t * outs,std::vector<const void * > & insNoDup,std::vector<const void * > & outsNoDup)47 void SDependenceManager::RemoveRepeatedDeps(std::vector<CPUEUTask*>& in_handles, const ffrt_deps_t* ins, const ffrt_deps_t* outs,
48 std::vector<const void *>& insNoDup, std::vector<const void *>& outsNoDup)
49 {
50 // signature去重:1)outs去重
51 if (outs) {
52 OutsDedup(outsNoDup, outs);
53 }
54
55 // signature去重:2)ins去重(不影响功能,skip);3)ins不和outs重复(当前不支持weak signature)
56 if (ins) {
57 InsDedup(in_handles, insNoDup, outsNoDup, ins);
58 }
59 }
60
onSubmit(bool has_handle,ffrt_task_handle_t & handle,ffrt_function_header_t * f,const ffrt_deps_t * ins,const ffrt_deps_t * outs,const task_attr_private * attr)61 void SDependenceManager::onSubmit(bool has_handle, ffrt_task_handle_t &handle, ffrt_function_header_t *f,
62 const ffrt_deps_t *ins, const ffrt_deps_t *outs, const task_attr_private *attr)
63 {
64 // 0 check outs handle
65 if (!CheckOutsHandle(outs)) {
66 FFRT_SYSEVENT_LOGE("outs contain handles error");
67 return;
68 }
69
70 // 1 Init eu and scheduler
71 auto ctx = ExecuteCtx::Cur();
72
73 // 2 Get current task's parent
74 auto parent = (ctx->task && ctx->task->type == ffrt_normal_task) ?
75 static_cast<CPUEUTask*>(ctx->task) : DependenceManager::Root();
76
77 // 2.1 Create task ctx
78 SCPUEUTask* task = nullptr;
79 {
80 task = reinterpret_cast<SCPUEUTask*>(static_cast<uintptr_t>(
81 static_cast<size_t>(reinterpret_cast<uintptr_t>(f)) - OFFSETOF(SCPUEUTask, func_storage)));
82 new (task)SCPUEUTask(attr, parent, ++parent->childNum);
83 }
84 #ifdef FFRT_ENABLE_HITRACE_CHAIN
85 if (TraceChainAdapter::Instance().HiTraceChainGetId().valid == HITRACE_ID_VALID) {
86 task->traceId_ = TraceChainAdapter::Instance().HiTraceChainCreateSpan();
87 }
88 #endif
89 FFRT_SUBMIT_MARKER(task->gid);
90 #ifdef FFRT_ASYNC_STACKTRACE
91 {
92 task->stackId = FFRTCollectAsyncStack();
93 }
94 #endif
95 QoS qos = (attr == nullptr ? QoS() : QoS(attr->qos_));
96 task->SetQos(qos);
97 task->Prepare();
98
99 std::vector<CPUEUTask*> inHandles;
100 std::vector<const void*> insNoDup;
101 std::vector<const void*> outsNoDup;
102 RemoveRepeatedDeps(inHandles, ins, outs, insNoDup, outsNoDup);
103 task->SetInHandles(inHandles);
104
105 #ifdef FFRT_OH_WATCHDOG_ENABLE
106 if (attr != nullptr && IsValidTimeout(task->gid, attr->timeout_)) {
107 task->isWatchdogEnable = true;
108 AddTaskToWatchdog(task->gid);
109 SendTimeoutWatchdog(task->gid, attr->timeout_, attr->delay_);
110 }
111 #endif
112 if (has_handle) {
113 task->IncDeleteRef();
114 handle = static_cast<ffrt_task_handle_t>(task);
115 outsNoDup.push_back(handle); // handle作为任务的输出signature
116 }
117
118 /* The parent's number of subtasks to be completed increases by one,
119 * and decreases by one after the subtask is completed
120 */
121 task->IncChildRef();
122
123 if (!(insNoDup.empty() && outsNoDup.empty())) {
124 std::vector<std::pair<VersionCtx*, NestType>> inDatas;
125 std::vector<std::pair<VersionCtx*, NestType>> outDatas;
126 // 3 Put the submitted task into Entity
127 std::lock_guard<decltype(criticalMutex_)> lg(criticalMutex_);
128
129 MapSignature2Deps(task, insNoDup, outsNoDup, inDatas, outDatas);
130
131 {
132 // 3.1 Process input dependencies
133 for (auto& i : std::as_const(inDatas)) {
134 i.first->AddConsumer(task, i.second);
135 }
136 }
137
138 {
139 // 3.2 Process output dependencies
140 for (auto& o : std::as_const(outDatas)) {
141 o.first->AddProducer(task);
142 }
143 }
144 if (task->dataRefCnt.submitDep != 0) {
145 FFRT_BLOCK_TRACER(task->gid, dep);
146 FFRT_TRACE_END();
147 return;
148 }
149 }
150
151 task->Ready();
152 FFRT_TRACE_END();
153 }
154
onWait()155 void SDependenceManager::onWait()
156 {
157 auto ctx = ExecuteCtx::Cur();
158 auto baseTask = (ctx->task && ctx->task->type == ffrt_normal_task) ? ctx->task : DependenceManager::Root();
159 auto task = static_cast<SCPUEUTask*>(baseTask);
160
161 if (task->Block() == BlockType::BLOCK_THREAD) {
162 std::unique_lock<std::mutex> lck(task->mutex_);
163 task->MultiDependenceAdd(Dependence::CALL_DEPENDENCE);
164 FFRT_LOGD("onWait name:%s gid=%lu", task->GetLabel().c_str(), task->gid);
165 task->waitCond_.wait(lck, [task] { return task->childRefCnt == 0; });
166 task->Wake();
167 return;
168 }
169
170 auto childDepFun = [](CoTask* task) -> bool {
171 auto sTask = static_cast<SCPUEUTask*>(task);
172 std::lock_guard<std::mutex> lck(sTask->mutex_);
173 if (sTask->childRefCnt == 0) {
174 return false;
175 }
176 sTask->MultiDependenceAdd(Dependence::CALL_DEPENDENCE);
177 return true;
178 };
179 FFRT_BLOCK_TRACER(task->gid, chd);
180 CoWait(childDepFun);
181 }
182
onWait(const ffrt_deps_t * deps)183 void SDependenceManager::onWait(const ffrt_deps_t* deps)
184 {
185 auto ctx = ExecuteCtx::Cur();
186 auto baseTask = (ctx->task && ctx->task->type == ffrt_normal_task) ? ctx->task : DependenceManager::Root();
187 auto task = static_cast<SCPUEUTask*>(baseTask);
188 task->dataRefCnt.waitDep = 0;
189
190 auto dataDepFun = [&]() {
191 std::vector<VersionCtx*> waitDatas;
192 waitDatas.reserve(deps->len);
193 std::lock_guard<decltype(criticalMutex_)> lg(criticalMutex_);
194
195 for (uint32_t i = 0; i < deps->len; ++i) {
196 auto d = deps->items[i].ptr;
197 auto it = std::as_const(Entity::Instance()->vaMap).find(d);
198 if (it != Entity::Instance()->vaMap.end()) {
199 auto waitData = it->second;
200 // Find the VersionCtx of the parent task level
201 for (auto out : std::as_const(task->outs)) {
202 if (waitData->signature == out->signature) {
203 waitData = out;
204 break;
205 }
206 }
207 waitDatas.push_back(waitData);
208 }
209 }
210 for (auto data : std::as_const(waitDatas)) {
211 data->AddDataWaitTaskByThis(task);
212 }
213 };
214
215 if (task->Block() == BlockType::BLOCK_THREAD) {
216 dataDepFun();
217 std::unique_lock<std::mutex> lck(task->mutex_);
218 task->MultiDependenceAdd(Dependence::DATA_DEPENDENCE);
219 FFRT_LOGD("onWait name:%s gid=%lu", task->GetLabel().c_str(), task->gid);
220 task->waitCond_.wait(lck, [task] { return task->dataRefCnt.waitDep == 0; });
221 task->Wake();
222 return;
223 }
224
225 auto pendDataDepFun = [dataDepFun](ffrt::CoTask* task) -> bool {
226 auto sTask = static_cast<SCPUEUTask*>(task);
227 dataDepFun();
228 FFRT_LOGD("onWait name:%s gid=%lu", sTask->GetLabel().c_str(), sTask->gid);
229 std::lock_guard<std::mutex> lck(sTask->mutex_);
230 if (sTask->dataRefCnt.waitDep == 0) {
231 return false;
232 }
233 sTask->MultiDependenceAdd(Dependence::DATA_DEPENDENCE);
234 return true;
235 };
236 FFRT_BLOCK_TRACER(task->gid, dat);
237 CoWait(pendDataDepFun);
238 }
239
onExecResults(ffrt_task_handle_t handle)240 int SDependenceManager::onExecResults(ffrt_task_handle_t handle)
241 {
242 return 0;
243 }
244
onTaskDone(CPUEUTask * task)245 void SDependenceManager::onTaskDone(CPUEUTask* task)
246 {
247 auto sTask = static_cast<SCPUEUTask*>(task);
248 FFRTTraceRecord::TaskDone<ffrt_normal_task>(task->GetQos());
249 FFRTTraceRecord::TaskDone<ffrt_normal_task>(task->GetQos(), task);
250 FFRT_TRACE_SCOPE(1, ontaskDone);
251 if (!(sTask->ins.empty() && sTask->outs.empty())) {
252 std::lock_guard<decltype(criticalMutex_)> lg(criticalMutex_);
253 FFRT_TRACE_SCOPE(1, taskDoneAfterLock);
254
255 // Production data
256 for (auto out : std::as_const(sTask->outs)) {
257 out->onProduced();
258 }
259 // Consumption data
260 for (auto in : std::as_const(sTask->ins)) {
261 in->onConsumed(sTask);
262 }
263 for (auto in : sTask->GetInHandles()) {
264 in->DecDeleteRef();
265 }
266 // VersionCtx recycling
267 Entity::Instance()->RecycleVersion();
268 }
269 if (task->isWatchdogEnable) {
270 RemoveTaskFromWatchdog(task->gid);
271 }
272 // Note that `DecChildRef` is going to decrement the `childRefCnt`
273 // of the parent task. And if the parent happens to be
274 // root it may get deleted in `~RootTaskCtxWrapper`.
275 // Hence, following this call there should no longer
276 // be references or accesses to the root task object
277 // or any of its members. E.g. calling this
278 // before out->onProduced can lead to access
279 // of freed memory on wait condition notification
280 // of the parent task.
281 sTask->DecChildRef();
282 sTask->Finish();
283 }
284
MapSignature2Deps(SCPUEUTask * task,const std::vector<const void * > & inDeps,const std::vector<const void * > & outDeps,std::vector<std::pair<VersionCtx *,NestType>> & inVersions,std::vector<std::pair<VersionCtx *,NestType>> & outVersions)285 void SDependenceManager::MapSignature2Deps(SCPUEUTask* task, const std::vector<const void*>& inDeps,
286 const std::vector<const void*>& outDeps, std::vector<std::pair<VersionCtx*, NestType>>& inVersions,
287 std::vector<std::pair<VersionCtx*, NestType>>& outVersions)
288 {
289 auto en = Entity::Instance();
290 // scene description:
291 for (auto signature : inDeps) {
292 VersionCtx* version = nullptr;
293 NestType type = NestType::DEFAULT;
294 // scene 1|2
295 for (auto parentOut : std::as_const(static_cast<SCPUEUTask*>(task->parent)->outs)) {
296 if (parentOut->signature == signature) {
297 version = parentOut;
298 type = NestType::PARENTOUT;
299 goto add_inversion;
300 }
301 }
302 // scene 3
303 for (auto parentIn : std::as_const(static_cast<SCPUEUTask*>(task->parent)->ins)) {
304 if (parentIn->signature == signature) {
305 version = parentIn;
306 type = NestType::PARENTIN;
307 goto add_inversion;
308 }
309 }
310 // scene 4
311 version = en->VA2Ctx(signature, task);
312 add_inversion:
313 inVersions.push_back({version, type});
314 }
315
316 for (auto signature : outDeps) {
317 VersionCtx* version = nullptr;
318 NestType type = NestType::DEFAULT;
319 // scene 5|6
320 for (auto parentOut : std::as_const(static_cast<SCPUEUTask*>(task->parent)->outs)) {
321 if (parentOut->signature == signature) {
322 version = parentOut;
323 type = NestType::PARENTOUT;
324 goto add_outversion;
325 }
326 }
327 // scene 7
328 #ifndef FFRT_RELEASE
329 for (auto parentIn : std::as_const(static_cast<SCPUEUTask*>(task->parent)->ins)) {
330 if (parentIn->signature == signature) {
331 FFRT_SYSEVENT_LOGE("parent's indep only cannot be child's outdep");
332 }
333 }
334 #endif
335 // scene 8
336 version = en->VA2Ctx(signature, task);
337 add_outversion:
338 outVersions.push_back({version, type});
339 }
340 }
341 } // namespace ffrt
342