• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "sdependence_manager.h"
17 #include "dfx/trace_record/ffrt_trace_record.h"
18 #include "util/worker_monitor.h"
19 #include "util/ffrt_facade.h"
20 #include "util/slab.h"
21 #include "tm/queue_task.h"
22 #include "tm/io_task.h"
23 
24 #ifdef FFRT_ASYNC_STACKTRACE
25 #include "dfx/async_stack/ffrt_async_stack.h"
26 #endif
27 
28 namespace ffrt {
29 
SDependenceManager()30 SDependenceManager::SDependenceManager() : criticalMutex_(Entity::Instance()->criticalMutex_)
31 {
32     // control construct sequences of singletons
33 #ifdef FFRT_OH_TRACE_ENABLE
34     TraceAdapter::Instance();
35 #endif
36     SimpleAllocator<CPUEUTask>::Instance();
37     SimpleAllocator<SCPUEUTask>::Instance();
38     SimpleAllocator<QueueTask>::Instance();
39     SimpleAllocator<IOTask>::Instance();
40     SimpleAllocator<VersionCtx>::Instance();
41     SimpleAllocator<WaitUntilEntry>::Instance();
42     QSimpleAllocator<CoRoutine>::Instance(CoStackAttr::Instance()->size);
43     PollerProxy::Instance();
44     FFRTScheduler::Instance();
45 #ifdef FFRT_WORKER_MONITOR
46     WorkerMonitor::GetInstance();
47 #endif
48     ExecuteUnit::Instance();
49     TaskState::RegisterOps(TaskState::EXITED,
50         [this](CPUEUTask* task) { return this->onTaskDone(static_cast<SCPUEUTask*>(task)), true; });
51 
52 #ifdef FFRT_OH_TRACE_ENABLE
53     _StartTrace(HITRACE_TAG_FFRT, "dm_init", -1); // init g_tagsProperty for ohos ffrt trace
54     _FinishTrace(HITRACE_TAG_FFRT);
55 #endif
56     DelayedWorker::GetInstance();
57 }
58 
~SDependenceManager()59 SDependenceManager::~SDependenceManager()
60 {
61 }
62 
RemoveRepeatedDeps(std::vector<CPUEUTask * > & in_handles,const ffrt_deps_t * ins,const ffrt_deps_t * outs,std::vector<const void * > & insNoDup,std::vector<const void * > & outsNoDup)63 void SDependenceManager::RemoveRepeatedDeps(std::vector<CPUEUTask*>& in_handles, const ffrt_deps_t* ins, const ffrt_deps_t* outs,
64     std::vector<const void *>& insNoDup, std::vector<const void *>& outsNoDup)
65 {
66     // signature去重:1)outs去重
67     if (outs) {
68         OutsDedup(outsNoDup, outs);
69     }
70 
71     // signature去重:2)ins去重(不影响功能,skip);3)ins不和outs重复(当前不支持weak signature)
72     if (ins) {
73         InsDedup(in_handles, insNoDup, outsNoDup, ins);
74     }
75 }
76 
onSubmit(bool has_handle,ffrt_task_handle_t & handle,ffrt_function_header_t * f,const ffrt_deps_t * ins,const ffrt_deps_t * outs,const task_attr_private * attr)77 void SDependenceManager::onSubmit(bool has_handle, ffrt_task_handle_t &handle, ffrt_function_header_t *f,
78     const ffrt_deps_t *ins, const ffrt_deps_t *outs, const task_attr_private *attr)
79 {
80     // 0 check outs handle
81     if (!CheckOutsHandle(outs)) {
82         FFRT_LOGE("outs contain handles error");
83         return;
84     }
85 
86     // 1 Init eu and scheduler
87     auto ctx = ExecuteCtx::Cur();
88 
89     // 2 Get current task's parent
90     auto parent = (ctx->task && ctx->task->type == ffrt_normal_task) ? ctx->task : DependenceManager::Root();
91 
92     // 2.1 Create task ctx
93     SCPUEUTask* task = nullptr;
94     {
95         task = reinterpret_cast<SCPUEUTask*>(static_cast<uintptr_t>(
96             static_cast<size_t>(reinterpret_cast<uintptr_t>(f)) - OFFSETOF(SCPUEUTask, func_storage)));
97         new (task)SCPUEUTask(attr, parent, ++parent->childNum, QoS());
98     }
99 #ifdef ENABLE_HITRACE_CHAIN
100     if (HiTraceChainGetId().valid == HITRACE_ID_VALID) {
101         task->traceId_ = HiTraceChainCreateSpan();
102     }
103 #endif
104     FFRT_SUBMIT_MARKER(task->gid);
105 #ifdef FFRT_ASYNC_STACKTRACE
106     {
107         task->stackId = FFRTCollectAsyncStack();
108     }
109 #endif
110     QoS qos = (attr == nullptr ? QoS() : QoS(attr->qos_));
111     FFRTTraceRecord::TaskSubmit<ffrt_normal_task>(qos, &(task->createTime), &(task->fromTid));
112 
113     std::vector<const void*> insNoDup;
114     std::vector<const void*> outsNoDup;
115     RemoveRepeatedDeps(task->in_handles, ins, outs, insNoDup, outsNoDup);
116 
117 #ifdef FFRT_OH_WATCHDOG_ENABLE
118     if (attr != nullptr && IsValidTimeout(task->gid, attr->timeout_)) {
119         task->isWatchdogEnable = true;
120         AddTaskToWatchdog(task->gid);
121         SendTimeoutWatchdog(task->gid, attr->timeout_, attr->delay_);
122     }
123 #endif
124     if (has_handle) {
125         task->IncDeleteRef();
126         handle = static_cast<ffrt_task_handle_t>(task);
127         outsNoDup.push_back(handle); // handle作为任务的输出signature
128     }
129     task->SetQos(qos);
130     /* The parent's number of subtasks to be completed increases by one,
131         * and decreases by one after the subtask is completed
132         */
133     task->IncChildRef();
134 
135     if (!(insNoDup.empty() && outsNoDup.empty())) {
136         std::vector<std::pair<VersionCtx*, NestType>> inDatas;
137         std::vector<std::pair<VersionCtx*, NestType>> outDatas;
138         // 3 Put the submitted task into Entity
139         std::lock_guard<decltype(criticalMutex_)> lg(criticalMutex_);
140 
141         MapSignature2Deps(task, insNoDup, outsNoDup, inDatas, outDatas);
142 
143         {
144             // 3.1 Process input dependencies
145             for (auto& i : std::as_const(inDatas)) {
146                 i.first->AddConsumer(task, i.second);
147             }
148         }
149 
150         {
151             // 3.2 Process output dependencies
152             for (auto& o : std::as_const(outDatas)) {
153                 o.first->AddProducer(task);
154             }
155         }
156         if (task->dataRefCnt.submitDep != 0) {
157             FFRT_BLOCK_TRACER(task->gid, dep);
158             FFRT_TRACE_END();
159             return;
160         }
161     }
162 
163     if (attr != nullptr) {
164         task->notifyWorker_ = attr->notifyWorker_;
165     }
166 
167     task->UpdateState(TaskState::READY);
168     FFRTTraceRecord::TaskEnqueue<ffrt_normal_task>(qos);
169     FFRT_TRACE_END();
170 }
171 
onWait()172 void SDependenceManager::onWait()
173 {
174     auto ctx = ExecuteCtx::Cur();
175     auto baseTask = (ctx->task && ctx->task->type == ffrt_normal_task) ? ctx->task : DependenceManager::Root();
176     auto task = static_cast<SCPUEUTask*>(baseTask);
177 
178     if (ThreadWaitMode(task)) {
179         std::unique_lock<std::mutex> lck(task->mutex_);
180         task->MultiDepenceAdd(Denpence::CALL_DEPENCE);
181         FFRT_LOGD("onWait name:%s gid=%lu", task->label.c_str(), task->gid);
182         if (FFRT_UNLIKELY(LegacyMode(task))) {
183             task->blockType = BlockType::BLOCK_THREAD;
184         }
185         task->waitCond_.wait(lck, [task] { return task->childRefCnt == 0; });
186         return;
187     }
188 
189     auto childDepFun = [&](ffrt::CPUEUTask* task) -> bool {
190         auto sTask = static_cast<SCPUEUTask*>(task);
191         std::unique_lock<std::mutex> lck(sTask->mutex_);
192         if (sTask->childRefCnt == 0) {
193             return false;
194         }
195         sTask->MultiDepenceAdd(Denpence::CALL_DEPENCE);
196         sTask->UpdateState(ffrt::TaskState::BLOCKED);
197         return true;
198     };
199     FFRT_BLOCK_TRACER(task->gid, chd);
200     CoWait(childDepFun);
201 }
202 
203 #ifdef QOS_DEPENDENCY
onWait(const ffrt_deps_t * deps,int64_t deadline=-1)204 void SDependenceManager::onWait(const ffrt_deps_t* deps, int64_t deadline = -1)
205 #else
206 void SDependenceManager::onWait(const ffrt_deps_t* deps)
207 #endif
208 {
209     auto ctx = ExecuteCtx::Cur();
210     auto baseTask = (ctx->task && ctx->task->type == ffrt_normal_task) ? ctx->task : DependenceManager::Root();
211     auto task = static_cast<SCPUEUTask*>(baseTask);
212     task->dataRefCnt.waitDep = 0;
213 
214     auto dataDepFun = [&]() {
215         std::vector<VersionCtx*> waitDatas;
216         waitDatas.reserve(deps->len);
217         std::lock_guard<decltype(criticalMutex_)> lg(criticalMutex_);
218 
219         for (uint32_t i = 0; i < deps->len; ++i) {
220             auto d = deps->items[i].ptr;
221             auto it = std::as_const(Entity::Instance()->vaMap).find(d);
222             if (it != Entity::Instance()->vaMap.end()) {
223                 auto waitData = it->second;
224                 // Find the VersionCtx of the parent task level
225                 for (auto out : std::as_const(task->outs)) {
226                     if (waitData->signature == out->signature) {
227                         waitData = out;
228                         break;
229                     }
230                 }
231                 waitDatas.push_back(waitData);
232             }
233         }
234 #ifdef QOS_DEPENDENCY
235         if (deadline != -1) {
236             Scheduler::Instance()->onWait(waitDatas, deadline);
237         }
238 #endif
239         for (auto data : std::as_const(waitDatas)) {
240             data->AddDataWaitTaskByThis(task);
241         }
242     };
243 
244     if (ThreadWaitMode(task)) {
245         dataDepFun();
246         std::unique_lock<std::mutex> lck(task->mutex_);
247         task->MultiDepenceAdd(Denpence::DATA_DEPENCE);
248         FFRT_LOGD("onWait name:%s gid=%lu", task->label.c_str(), task->gid);
249         if (FFRT_UNLIKELY(LegacyMode(task))) {
250             task->blockType = BlockType::BLOCK_THREAD;
251         }
252         task->waitCond_.wait(lck, [task] { return task->dataRefCnt.waitDep == 0; });
253         return;
254     }
255 
256     auto pendDataDepFun = [&](ffrt::CPUEUTask* task) -> bool {
257         auto sTask = static_cast<SCPUEUTask*>(task);
258         dataDepFun();
259         FFRT_LOGD("onWait name:%s gid=%lu", sTask->label.c_str(), sTask->gid);
260         std::unique_lock<std::mutex> lck(sTask->mutex_);
261         if (sTask->dataRefCnt.waitDep == 0) {
262             return false;
263         }
264         sTask->MultiDepenceAdd(Denpence::DATA_DEPENCE);
265         sTask->UpdateState(ffrt::TaskState::BLOCKED);
266         return true;
267     };
268     FFRT_BLOCK_TRACER(task->gid, dat);
269     CoWait(pendDataDepFun);
270 }
271 
onExecResults(ffrt_task_handle_t handle)272 int SDependenceManager::onExecResults(ffrt_task_handle_t handle)
273 {
274     return 0;
275 }
276 
onTaskDone(CPUEUTask * task)277 void SDependenceManager::onTaskDone(CPUEUTask* task)
278 {
279     auto sTask = static_cast<SCPUEUTask*>(task);
280     FFRTTraceRecord::TaskDone<ffrt_normal_task>(task->GetQos());
281     FFRTTraceRecord::TaskDone<ffrt_normal_task>(task->GetQos(),  task);
282     FFRT_TRACE_SCOPE(1, ontaskDone);
283     sTask->DecChildRef();
284     if (!(sTask->ins.empty() && sTask->outs.empty())) {
285         std::lock_guard<decltype(criticalMutex_)> lg(criticalMutex_);
286         FFRT_TRACE_SCOPE(1, taskDoneAfterLock);
287 
288         // Production data
289         for (auto out : std::as_const(sTask->outs)) {
290             out->onProduced();
291         }
292         // Consumption data
293         for (auto in : std::as_const(sTask->ins)) {
294             in->onConsumed(sTask);
295         }
296         for (auto in : std::as_const(sTask->in_handles)) {
297             in->DecDeleteRef();
298         }
299         // VersionCtx recycling
300         Entity::Instance()->RecycleVersion();
301     }
302     if (task->isWatchdogEnable) {
303         RemoveTaskFromWatchdog(task->gid);
304     }
305     sTask->RecycleTask();
306 }
307 
MapSignature2Deps(SCPUEUTask * task,const std::vector<const void * > & inDeps,const std::vector<const void * > & outDeps,std::vector<std::pair<VersionCtx *,NestType>> & inVersions,std::vector<std::pair<VersionCtx *,NestType>> & outVersions)308 void SDependenceManager::MapSignature2Deps(SCPUEUTask* task, const std::vector<const void*>& inDeps,
309     const std::vector<const void*>& outDeps, std::vector<std::pair<VersionCtx*, NestType>>& inVersions,
310     std::vector<std::pair<VersionCtx*, NestType>>& outVersions)
311 {
312     auto en = Entity::Instance();
313     // scene description:
314     for (auto signature : inDeps) {
315         VersionCtx* version = nullptr;
316         NestType type = NestType::DEFAULT;
317         // scene 1|2
318         for (auto parentOut : std::as_const(static_cast<SCPUEUTask*>(task->parent)->outs)) {
319             if (parentOut->signature == signature) {
320                 version = parentOut;
321                 type = NestType::PARENTOUT;
322                 goto add_inversion;
323             }
324         }
325         // scene 3
326         for (auto parentIn : std::as_const(static_cast<SCPUEUTask*>(task->parent)->ins)) {
327             if (parentIn->signature == signature) {
328                 version = parentIn;
329                 type = NestType::PARENTIN;
330                 goto add_inversion;
331             }
332         }
333         // scene 4
334         version = en->VA2Ctx(signature, task);
335     add_inversion:
336         inVersions.push_back({version, type});
337     }
338 
339     for (auto signature : outDeps) {
340         VersionCtx* version = nullptr;
341         NestType type = NestType::DEFAULT;
342         // scene 5|6
343         for (auto parentOut : std::as_const(static_cast<SCPUEUTask*>(task->parent)->outs)) {
344             if (parentOut->signature == signature) {
345                 version = parentOut;
346                 type = NestType::PARENTOUT;
347                 goto add_outversion;
348             }
349         }
350         // scene 7
351 #ifndef FFRT_RELEASE
352         for (auto parentIn : std::as_const(static_cast<SCPUEUTask*>(task->parent)->ins)) {
353             if (parentIn->signature == signature) {
354                 FFRT_LOGE("parent's indep only cannot be child's outdep");
355             }
356         }
357 #endif
358         // scene 8
359         version = en->VA2Ctx(signature, task);
360     add_outversion:
361         outVersions.push_back({version, type});
362     }
363 }
364 
onSkip(ffrt_task_handle_t handle)365 int SDependenceManager::onSkip(ffrt_task_handle_t handle)
366 {
367     ffrt::CPUEUTask *task = static_cast<ffrt::CPUEUTask*>(handle);
368     auto exp = ffrt::SkipStatus::SUBMITTED;
369     if (__atomic_compare_exchange_n(&task->skipped, &exp, ffrt::SkipStatus::SKIPPED, 0, __ATOMIC_ACQUIRE,
370         __ATOMIC_RELAXED)) {
371         return ffrt_success;
372     }
373 
374     FFRT_LOGE("skip task [%lu] failed", task->gid);
375     return ffrt_error;
376 }
377 } // namespace ffrt