1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "sdependence_manager.h"
17 #include "dfx/trace_record/ffrt_trace_record.h"
18 #include "util/worker_monitor.h"
19 #include "util/ffrt_facade.h"
20 #include "util/slab.h"
21 #include "tm/queue_task.h"
22 #include "tm/io_task.h"
23
24 #ifdef FFRT_ASYNC_STACKTRACE
25 #include "dfx/async_stack/ffrt_async_stack.h"
26 #endif
27
28 namespace ffrt {
29
SDependenceManager()30 SDependenceManager::SDependenceManager() : criticalMutex_(Entity::Instance()->criticalMutex_)
31 {
32 // control construct sequences of singletons
33 #ifdef FFRT_OH_TRACE_ENABLE
34 TraceAdapter::Instance();
35 #endif
36 SimpleAllocator<CPUEUTask>::Instance();
37 SimpleAllocator<SCPUEUTask>::Instance();
38 SimpleAllocator<QueueTask>::Instance();
39 SimpleAllocator<IOTask>::Instance();
40 SimpleAllocator<VersionCtx>::Instance();
41 SimpleAllocator<WaitUntilEntry>::Instance();
42 QSimpleAllocator<CoRoutine>::Instance(CoStackAttr::Instance()->size);
43 PollerProxy::Instance();
44 FFRTScheduler::Instance();
45 #ifdef FFRT_WORKER_MONITOR
46 WorkerMonitor::GetInstance();
47 #endif
48 ExecuteUnit::Instance();
49 TaskState::RegisterOps(TaskState::EXITED,
50 [this](CPUEUTask* task) { return this->onTaskDone(static_cast<SCPUEUTask*>(task)), true; });
51
52 #ifdef FFRT_OH_TRACE_ENABLE
53 _StartTrace(HITRACE_TAG_FFRT, "dm_init", -1); // init g_tagsProperty for ohos ffrt trace
54 _FinishTrace(HITRACE_TAG_FFRT);
55 #endif
56 DelayedWorker::GetInstance();
57 }
58
~SDependenceManager()59 SDependenceManager::~SDependenceManager()
60 {
61 }
62
RemoveRepeatedDeps(std::vector<CPUEUTask * > & in_handles,const ffrt_deps_t * ins,const ffrt_deps_t * outs,std::vector<const void * > & insNoDup,std::vector<const void * > & outsNoDup)63 void SDependenceManager::RemoveRepeatedDeps(std::vector<CPUEUTask*>& in_handles, const ffrt_deps_t* ins, const ffrt_deps_t* outs,
64 std::vector<const void *>& insNoDup, std::vector<const void *>& outsNoDup)
65 {
66 // signature去重:1)outs去重
67 if (outs) {
68 OutsDedup(outsNoDup, outs);
69 }
70
71 // signature去重:2)ins去重(不影响功能,skip);3)ins不和outs重复(当前不支持weak signature)
72 if (ins) {
73 InsDedup(in_handles, insNoDup, outsNoDup, ins);
74 }
75 }
76
onSubmit(bool has_handle,ffrt_task_handle_t & handle,ffrt_function_header_t * f,const ffrt_deps_t * ins,const ffrt_deps_t * outs,const task_attr_private * attr)77 void SDependenceManager::onSubmit(bool has_handle, ffrt_task_handle_t &handle, ffrt_function_header_t *f,
78 const ffrt_deps_t *ins, const ffrt_deps_t *outs, const task_attr_private *attr)
79 {
80 // 0 check outs handle
81 if (!CheckOutsHandle(outs)) {
82 FFRT_LOGE("outs contain handles error");
83 return;
84 }
85
86 // 1 Init eu and scheduler
87 auto ctx = ExecuteCtx::Cur();
88
89 // 2 Get current task's parent
90 auto parent = (ctx->task && ctx->task->type == ffrt_normal_task) ? ctx->task : DependenceManager::Root();
91
92 // 2.1 Create task ctx
93 SCPUEUTask* task = nullptr;
94 {
95 task = reinterpret_cast<SCPUEUTask*>(static_cast<uintptr_t>(
96 static_cast<size_t>(reinterpret_cast<uintptr_t>(f)) - OFFSETOF(SCPUEUTask, func_storage)));
97 new (task)SCPUEUTask(attr, parent, ++parent->childNum, QoS());
98 }
99 #ifdef ENABLE_HITRACE_CHAIN
100 if (HiTraceChainGetId().valid == HITRACE_ID_VALID) {
101 task->traceId_ = HiTraceChainCreateSpan();
102 }
103 #endif
104 FFRT_SUBMIT_MARKER(task->gid);
105 #ifdef FFRT_ASYNC_STACKTRACE
106 {
107 task->stackId = FFRTCollectAsyncStack();
108 }
109 #endif
110 QoS qos = (attr == nullptr ? QoS() : QoS(attr->qos_));
111 FFRTTraceRecord::TaskSubmit<ffrt_normal_task>(qos, &(task->createTime), &(task->fromTid));
112
113 std::vector<const void*> insNoDup;
114 std::vector<const void*> outsNoDup;
115 RemoveRepeatedDeps(task->in_handles, ins, outs, insNoDup, outsNoDup);
116
117 #ifdef FFRT_OH_WATCHDOG_ENABLE
118 if (attr != nullptr && IsValidTimeout(task->gid, attr->timeout_)) {
119 task->isWatchdogEnable = true;
120 AddTaskToWatchdog(task->gid);
121 SendTimeoutWatchdog(task->gid, attr->timeout_, attr->delay_);
122 }
123 #endif
124 if (has_handle) {
125 task->IncDeleteRef();
126 handle = static_cast<ffrt_task_handle_t>(task);
127 outsNoDup.push_back(handle); // handle作为任务的输出signature
128 }
129 task->SetQos(qos);
130 /* The parent's number of subtasks to be completed increases by one,
131 * and decreases by one after the subtask is completed
132 */
133 task->IncChildRef();
134
135 if (!(insNoDup.empty() && outsNoDup.empty())) {
136 std::vector<std::pair<VersionCtx*, NestType>> inDatas;
137 std::vector<std::pair<VersionCtx*, NestType>> outDatas;
138 // 3 Put the submitted task into Entity
139 std::lock_guard<decltype(criticalMutex_)> lg(criticalMutex_);
140
141 MapSignature2Deps(task, insNoDup, outsNoDup, inDatas, outDatas);
142
143 {
144 // 3.1 Process input dependencies
145 for (auto& i : std::as_const(inDatas)) {
146 i.first->AddConsumer(task, i.second);
147 }
148 }
149
150 {
151 // 3.2 Process output dependencies
152 for (auto& o : std::as_const(outDatas)) {
153 o.first->AddProducer(task);
154 }
155 }
156 if (task->dataRefCnt.submitDep != 0) {
157 FFRT_BLOCK_TRACER(task->gid, dep);
158 FFRT_TRACE_END();
159 return;
160 }
161 }
162
163 if (attr != nullptr) {
164 task->notifyWorker_ = attr->notifyWorker_;
165 }
166
167 task->UpdateState(TaskState::READY);
168 FFRTTraceRecord::TaskEnqueue<ffrt_normal_task>(qos);
169 FFRT_TRACE_END();
170 }
171
onWait()172 void SDependenceManager::onWait()
173 {
174 auto ctx = ExecuteCtx::Cur();
175 auto baseTask = (ctx->task && ctx->task->type == ffrt_normal_task) ? ctx->task : DependenceManager::Root();
176 auto task = static_cast<SCPUEUTask*>(baseTask);
177
178 if (ThreadWaitMode(task)) {
179 std::unique_lock<std::mutex> lck(task->mutex_);
180 task->MultiDepenceAdd(Denpence::CALL_DEPENCE);
181 FFRT_LOGD("onWait name:%s gid=%lu", task->label.c_str(), task->gid);
182 if (FFRT_UNLIKELY(LegacyMode(task))) {
183 task->blockType = BlockType::BLOCK_THREAD;
184 }
185 task->waitCond_.wait(lck, [task] { return task->childRefCnt == 0; });
186 return;
187 }
188
189 auto childDepFun = [&](ffrt::CPUEUTask* task) -> bool {
190 auto sTask = static_cast<SCPUEUTask*>(task);
191 std::unique_lock<std::mutex> lck(sTask->mutex_);
192 if (sTask->childRefCnt == 0) {
193 return false;
194 }
195 sTask->MultiDepenceAdd(Denpence::CALL_DEPENCE);
196 sTask->UpdateState(ffrt::TaskState::BLOCKED);
197 return true;
198 };
199 FFRT_BLOCK_TRACER(task->gid, chd);
200 CoWait(childDepFun);
201 }
202
203 #ifdef QOS_DEPENDENCY
onWait(const ffrt_deps_t * deps,int64_t deadline=-1)204 void SDependenceManager::onWait(const ffrt_deps_t* deps, int64_t deadline = -1)
205 #else
206 void SDependenceManager::onWait(const ffrt_deps_t* deps)
207 #endif
208 {
209 auto ctx = ExecuteCtx::Cur();
210 auto baseTask = (ctx->task && ctx->task->type == ffrt_normal_task) ? ctx->task : DependenceManager::Root();
211 auto task = static_cast<SCPUEUTask*>(baseTask);
212 task->dataRefCnt.waitDep = 0;
213
214 auto dataDepFun = [&]() {
215 std::vector<VersionCtx*> waitDatas;
216 waitDatas.reserve(deps->len);
217 std::lock_guard<decltype(criticalMutex_)> lg(criticalMutex_);
218
219 for (uint32_t i = 0; i < deps->len; ++i) {
220 auto d = deps->items[i].ptr;
221 auto it = std::as_const(Entity::Instance()->vaMap).find(d);
222 if (it != Entity::Instance()->vaMap.end()) {
223 auto waitData = it->second;
224 // Find the VersionCtx of the parent task level
225 for (auto out : std::as_const(task->outs)) {
226 if (waitData->signature == out->signature) {
227 waitData = out;
228 break;
229 }
230 }
231 waitDatas.push_back(waitData);
232 }
233 }
234 #ifdef QOS_DEPENDENCY
235 if (deadline != -1) {
236 Scheduler::Instance()->onWait(waitDatas, deadline);
237 }
238 #endif
239 for (auto data : std::as_const(waitDatas)) {
240 data->AddDataWaitTaskByThis(task);
241 }
242 };
243
244 if (ThreadWaitMode(task)) {
245 dataDepFun();
246 std::unique_lock<std::mutex> lck(task->mutex_);
247 task->MultiDepenceAdd(Denpence::DATA_DEPENCE);
248 FFRT_LOGD("onWait name:%s gid=%lu", task->label.c_str(), task->gid);
249 if (FFRT_UNLIKELY(LegacyMode(task))) {
250 task->blockType = BlockType::BLOCK_THREAD;
251 }
252 task->waitCond_.wait(lck, [task] { return task->dataRefCnt.waitDep == 0; });
253 return;
254 }
255
256 auto pendDataDepFun = [&](ffrt::CPUEUTask* task) -> bool {
257 auto sTask = static_cast<SCPUEUTask*>(task);
258 dataDepFun();
259 FFRT_LOGD("onWait name:%s gid=%lu", sTask->label.c_str(), sTask->gid);
260 std::unique_lock<std::mutex> lck(sTask->mutex_);
261 if (sTask->dataRefCnt.waitDep == 0) {
262 return false;
263 }
264 sTask->MultiDepenceAdd(Denpence::DATA_DEPENCE);
265 sTask->UpdateState(ffrt::TaskState::BLOCKED);
266 return true;
267 };
268 FFRT_BLOCK_TRACER(task->gid, dat);
269 CoWait(pendDataDepFun);
270 }
271
onExecResults(ffrt_task_handle_t handle)272 int SDependenceManager::onExecResults(ffrt_task_handle_t handle)
273 {
274 return 0;
275 }
276
onTaskDone(CPUEUTask * task)277 void SDependenceManager::onTaskDone(CPUEUTask* task)
278 {
279 auto sTask = static_cast<SCPUEUTask*>(task);
280 FFRTTraceRecord::TaskDone<ffrt_normal_task>(task->GetQos());
281 FFRTTraceRecord::TaskDone<ffrt_normal_task>(task->GetQos(), task);
282 FFRT_TRACE_SCOPE(1, ontaskDone);
283 sTask->DecChildRef();
284 if (!(sTask->ins.empty() && sTask->outs.empty())) {
285 std::lock_guard<decltype(criticalMutex_)> lg(criticalMutex_);
286 FFRT_TRACE_SCOPE(1, taskDoneAfterLock);
287
288 // Production data
289 for (auto out : std::as_const(sTask->outs)) {
290 out->onProduced();
291 }
292 // Consumption data
293 for (auto in : std::as_const(sTask->ins)) {
294 in->onConsumed(sTask);
295 }
296 for (auto in : std::as_const(sTask->in_handles)) {
297 in->DecDeleteRef();
298 }
299 // VersionCtx recycling
300 Entity::Instance()->RecycleVersion();
301 }
302 if (task->isWatchdogEnable) {
303 RemoveTaskFromWatchdog(task->gid);
304 }
305 sTask->RecycleTask();
306 }
307
MapSignature2Deps(SCPUEUTask * task,const std::vector<const void * > & inDeps,const std::vector<const void * > & outDeps,std::vector<std::pair<VersionCtx *,NestType>> & inVersions,std::vector<std::pair<VersionCtx *,NestType>> & outVersions)308 void SDependenceManager::MapSignature2Deps(SCPUEUTask* task, const std::vector<const void*>& inDeps,
309 const std::vector<const void*>& outDeps, std::vector<std::pair<VersionCtx*, NestType>>& inVersions,
310 std::vector<std::pair<VersionCtx*, NestType>>& outVersions)
311 {
312 auto en = Entity::Instance();
313 // scene description:
314 for (auto signature : inDeps) {
315 VersionCtx* version = nullptr;
316 NestType type = NestType::DEFAULT;
317 // scene 1|2
318 for (auto parentOut : std::as_const(static_cast<SCPUEUTask*>(task->parent)->outs)) {
319 if (parentOut->signature == signature) {
320 version = parentOut;
321 type = NestType::PARENTOUT;
322 goto add_inversion;
323 }
324 }
325 // scene 3
326 for (auto parentIn : std::as_const(static_cast<SCPUEUTask*>(task->parent)->ins)) {
327 if (parentIn->signature == signature) {
328 version = parentIn;
329 type = NestType::PARENTIN;
330 goto add_inversion;
331 }
332 }
333 // scene 4
334 version = en->VA2Ctx(signature, task);
335 add_inversion:
336 inVersions.push_back({version, type});
337 }
338
339 for (auto signature : outDeps) {
340 VersionCtx* version = nullptr;
341 NestType type = NestType::DEFAULT;
342 // scene 5|6
343 for (auto parentOut : std::as_const(static_cast<SCPUEUTask*>(task->parent)->outs)) {
344 if (parentOut->signature == signature) {
345 version = parentOut;
346 type = NestType::PARENTOUT;
347 goto add_outversion;
348 }
349 }
350 // scene 7
351 #ifndef FFRT_RELEASE
352 for (auto parentIn : std::as_const(static_cast<SCPUEUTask*>(task->parent)->ins)) {
353 if (parentIn->signature == signature) {
354 FFRT_LOGE("parent's indep only cannot be child's outdep");
355 }
356 }
357 #endif
358 // scene 8
359 version = en->VA2Ctx(signature, task);
360 add_outversion:
361 outVersions.push_back({version, type});
362 }
363 }
364
onSkip(ffrt_task_handle_t handle)365 int SDependenceManager::onSkip(ffrt_task_handle_t handle)
366 {
367 ffrt::CPUEUTask *task = static_cast<ffrt::CPUEUTask*>(handle);
368 auto exp = ffrt::SkipStatus::SUBMITTED;
369 if (__atomic_compare_exchange_n(&task->skipped, &exp, ffrt::SkipStatus::SKIPPED, 0, __ATOMIC_ACQUIRE,
370 __ATOMIC_RELAXED)) {
371 return ffrt_success;
372 }
373
374 FFRT_LOGE("skip task [%lu] failed", task->gid);
375 return ffrt_error;
376 }
377 } // namespace ffrt