• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "sdependence_manager.h"
17 #include "util/worker_monitor.h"
18 
19 namespace ffrt {
20 
SDependenceManager()21 SDependenceManager::SDependenceManager() : criticalMutex_(Entity::Instance()->criticalMutex_)
22 {
23 #ifdef FFRT_OH_TRACE_ENABLE
24     TraceAdapter::Instance();
25 #endif
26     // control construct sequences of singletons
27     SimpleAllocator<CPUEUTask>::instance();
28     SimpleAllocator<VersionCtx>::instance();
29 #ifdef FFRT_IO_TASK_SCHEDULER
30     PollerProxy::Instance();
31 #endif
32     FFRTScheduler::Instance();
33     ExecuteUnit::Instance();
34     TaskState::RegisterOps(TaskState::EXITED,
35         [this](CPUEUTask* task) { return this->onTaskDone(static_cast<SCPUEUTask*>(task)), true; });
36 #ifdef FFRT_OH_TRACE_ENABLE
37         static WorkerMonitor workerMonitor;
38         _StartTrace(HITRACE_TAG_FFRT, "dm_init", -1); // init g_tagsProperty for ohos ffrt trace
39         _FinishTrace(HITRACE_TAG_FFRT);
40 #endif
41 }
42 
~SDependenceManager()43 SDependenceManager::~SDependenceManager()
44 {
45 }
46 
onSubmit(bool has_handle,ffrt_task_handle_t & handle,ffrt_function_header_t * f,const ffrt_deps_t * ins,const ffrt_deps_t * outs,const task_attr_private * attr)47 void SDependenceManager::onSubmit(bool has_handle, ffrt_task_handle_t &handle, ffrt_function_header_t *f,
48     const ffrt_deps_t *ins, const ffrt_deps_t *outs, const task_attr_private *attr)
49 {
50     FFRT_TRACE_SCOPE(1, onSubmit);
51     // 1 Init eu and scheduler
52     auto ctx = ExecuteCtx::Cur();
53     auto en = Entity::Instance();
54 
55     // 2 Get current task's parent
56     auto parent = ctx->task ? ctx->task : DependenceManager::Root();
57 
58     std::vector<const void*> insNoDup;
59     std::vector<const void*> outsNoDup;
60     std::vector<CPUEUTask*> in_handles;
61     // signature去重:1)outs去重
62     if (outs) {
63         if (!outsDeDup(outsNoDup, outs)) {
64             FFRT_LOGE("onSubmit outsDeDup error");
65             return;
66         }
67     }
68 
69     // signature去重:2)ins去重(不影响功能,skip);3)ins不和outs重复(当前不支持weak signature)
70     if (ins) {
71         insDeDup(in_handles, insNoDup, outsNoDup, ins);
72     }
73 
74     // 2.1 Create task ctx
75     SCPUEUTask* task = nullptr;
76     {
77         FFRT_TRACE_SCOPE(1, CreateTask);
78         task = reinterpret_cast<SCPUEUTask*>(static_cast<uintptr_t>(
79             static_cast<size_t>(reinterpret_cast<uintptr_t>(f)) - OFFSETOF(SCPUEUTask, func_storage)));
80         new (task)SCPUEUTask(attr, parent, ++parent->childNum, QoS());
81     }
82     FFRT_LOGD("submit task[%lu], name[%s]", task->gid, task->label.c_str());
83 #ifdef FFRT_BBOX_ENABLE
84     TaskSubmitCounterInc();
85 #endif
86     if (has_handle) {
87         task->IncDeleteRef();
88         handle = static_cast<ffrt_task_handle_t>(task);
89         outsNoDup.push_back(handle); // handle作为任务的输出signature
90     }
91     QoS qos = (attr == nullptr ? QoS() : QoS(attr->qos_map.m_qos));
92     task->SetQos(qos);
93 
94     /* The parent's number of subtasks to be completed increases by one,
95         * and decreases by one after the subtask is completed
96         */
97     task->IncChildRef();
98 
99     std::vector<std::pair<VersionCtx*, NestType>> inDatas;
100     std::vector<std::pair<VersionCtx*, NestType>> outDatas;
101 
102     if (!(insNoDup.empty() && outsNoDup.empty())) {
103         // 3 Put the submitted task into Entity
104         FFRT_TRACE_SCOPE(1, submitBeforeLock);
105         std::lock_guard<decltype(criticalMutex_)> lg(criticalMutex_);
106         FFRT_TRACE_SCOPE(1, submitAfterLock);
107 
108         MapSignature2Deps(task, insNoDup, outsNoDup, inDatas, outDatas);
109 
110         {
111             FFRT_TRACE_SCOPE(1, dealInDepend);
112             // 3.1 Process input dependencies
113             for (auto& i : std::as_const(inDatas)) {
114                 i.first->AddConsumer(task, i.second);
115             }
116         }
117 
118         {
119             FFRT_TRACE_SCOPE(1, dealOutDepend);
120             // 3.2 Process output dependencies
121             for (auto& o : std::as_const(outDatas)) {
122                 o.first->AddProducer(task);
123             }
124         }
125         task->in_handles.swap(in_handles);
126         if (task->depRefCnt != 0) {
127             FFRT_BLOCK_TRACER(task->gid, dep);
128             return;
129         }
130     }
131 
132     FFRT_LOGD("Submit completed, enter ready queue, task[%lu], name[%s]", task->gid, task->label.c_str());
133     task->UpdateState(TaskState::READY);
134 #ifdef FFRT_BBOX_ENABLE
135     TaskEnQueuCounterInc();
136 #endif
137 }
138 
onSubmitUV(ffrt_executor_task_t * task,const task_attr_private * attr)139 void SDependenceManager::onSubmitUV(ffrt_executor_task_t* task, const task_attr_private* attr)
140 {
141     FFRT_EXECUTOR_TASK_SUBMIT_MARKER(task);
142     FFRT_TRACE_SCOPE(1, onSubmitUV);
143 #ifdef FFRT_BBOX_ENABLE
144     TaskSubmitCounterInc();
145 #endif
146     QoS qos = (attr == nullptr ? QoS() : QoS(attr->qos_map.m_qos));
147 
148     LinkedList* node = reinterpret_cast<LinkedList *>(&task->wq);
149     FFRTScheduler* sch = FFRTScheduler::Instance();
150     if (!sch->InsertNode(node, qos)) {
151         FFRT_LOGE("Submit UV task failed!");
152         return;
153     }
154 
155 #ifdef FFRT_BBOX_ENABLE
156     TaskEnQueuCounterInc();
157 #endif
158 }
159 
onWait()160 void SDependenceManager::onWait()
161 {
162     auto ctx = ExecuteCtx::Cur();
163     auto baseTask = ctx->task ? ctx->task : DependenceManager::Root();
164     auto task = static_cast<SCPUEUTask*>(baseTask);
165     bool legacyMode = task->coRoutine ? task->coRoutine->legacyMode : false;
166     if (!USE_COROUTINE || task->parent == nullptr || legacyMode) {
167         std::unique_lock<std::mutex> lck(task->lock);
168         task->MultiDepenceAdd(Denpence::CALL_DEPENCE);
169         FFRT_LOGD("onWait name:%s gid=%lu", task->label.c_str(), task->gid);
170         if (legacyMode) {
171             task->coRoutine->blockType = BlockType::BLOCK_THREAD;
172         }
173         task->childWaitCond_.wait(lck, [task] { return task->childWaitRefCnt == 0; });
174         return;
175     }
176 
177     auto childDepFun = [&](ffrt::CPUEUTask* inTask) -> bool {
178         auto sTask = static_cast<SCPUEUTask*>(inTask);
179         std::unique_lock<std::mutex> lck(sTask->lock);
180         if (sTask->childWaitRefCnt == 0) {
181             return false;
182         }
183         sTask->MultiDepenceAdd(Denpence::CALL_DEPENCE);
184         sTask->UpdateState(ffrt::TaskState::BLOCKED);
185         return true;
186     };
187     FFRT_BLOCK_TRACER(task->gid, chd);
188     CoWait(childDepFun);
189 }
190 
191 #ifdef QOS_DEPENDENCY
onWait(const ffrt_deps_t * deps,int64_t deadline=-1)192 void SDependenceManager::onWait(const ffrt_deps_t* deps, int64_t deadline = -1)
193 #else
194 void SDependenceManager::onWait(const ffrt_deps_t* deps)
195 #endif
196 {
197     auto ctx = ExecuteCtx::Cur();
198     auto baseTask = ctx->task ? ctx->task : DependenceManager::Root();
199     auto task = static_cast<SCPUEUTask*>(baseTask);
200 
201     auto dataDepFun = [&]() {
202         std::vector<VersionCtx*> waitDatas;
203         waitDatas.reserve(deps->len);
204         std::lock_guard<decltype(criticalMutex_)> lg(criticalMutex_);
205 
206         for (uint32_t i = 0; i < deps->len; ++i) {
207             auto d = deps->items[i].ptr;
208             auto it = std::as_const(Entity::Instance()->vaMap).find(d);
209             if (it != Entity::Instance()->vaMap.end()) {
210                 auto waitData = it->second;
211                 // Find the VersionCtx of the parent task level
212                 for (auto out : std::as_const(task->outs)) {
213                     if (waitData->signature == out->signature) {
214                         waitData = out;
215                         break;
216                     }
217                 }
218                 waitDatas.push_back(waitData);
219             }
220         }
221 #ifdef QOS_DEPENDENCY
222         if (deadline != -1) {
223             Scheduler::Instance()->onWait(waitDatas, deadline);
224         }
225 #endif
226         for (auto data : std::as_const(waitDatas)) {
227             data->AddDataWaitTaskByThis(task);
228         }
229     };
230 
231     bool legacyMode = task->coRoutine ? task->coRoutine->legacyMode : false;
232     if (!USE_COROUTINE || task->parent == nullptr || legacyMode) {
233         dataDepFun();
234         std::unique_lock<std::mutex> lck(task->lock);
235         task->MultiDepenceAdd(Denpence::DATA_DEPENCE);
236         FFRT_LOGD("onWait name:%s gid=%lu", task->label.c_str(), task->gid);
237         if (legacyMode) {
238             task->coRoutine->blockType = BlockType::BLOCK_THREAD;
239         }
240         task->dataWaitCond_.wait(lck, [task] { return task->dataWaitRefCnt == 0; });
241         return;
242     }
243 
244     auto pendDataDepFun = [&](ffrt::CPUEUTask* inTask) -> bool {
245         auto sTask = static_cast<SCPUEUTask*>(inTask);
246         dataDepFun();
247         FFRT_LOGD("onWait name:%s gid=%lu", sTask->label.c_str(), sTask->gid);
248         std::unique_lock<std::mutex> lck(sTask->lock);
249         if (sTask->dataWaitRefCnt == 0) {
250             return false;
251         }
252         sTask->MultiDepenceAdd(Denpence::DATA_DEPENCE);
253         sTask->UpdateState(ffrt::TaskState::BLOCKED);
254         return true;
255     };
256     FFRT_BLOCK_TRACER(task->gid, dat);
257     CoWait(pendDataDepFun);
258 }
259 
onTaskDone(CPUEUTask * task)260 void SDependenceManager::onTaskDone(CPUEUTask* task)
261 {
262     auto sTask = static_cast<SCPUEUTask*>(task);
263     FFRT_LOGD("Task completed, task[%lu], name[%s]", sTask->gid, sTask->label.c_str());
264 #ifdef FFRT_BBOX_ENABLE
265     TaskDoneCounterInc();
266 #endif
267     FFRT_TRACE_SCOPE(1, ontaskDone);
268     sTask->DecChildRef();
269     if (!(sTask->ins.empty() && sTask->outs.empty())) {
270         std::lock_guard<decltype(criticalMutex_)> lg(criticalMutex_);
271         FFRT_TRACE_SCOPE(1, taskDoneAfterLock);
272 
273         // Production data
274         for (auto out : std::as_const(sTask->outs)) {
275             out->onProduced();
276         }
277         // Consumption data
278         for (auto in : std::as_const(sTask->ins)) {
279             in->onConsumed(sTask);
280         }
281         for (auto in : std::as_const(sTask->in_handles)) {
282             in->DecDeleteRef();
283         }
284 
285         // VersionCtx recycling
286         Entity::Instance()->RecycleVersion();
287     }
288 
289     sTask->RecycleTask();
290 }
291 
MapSignature2Deps(SCPUEUTask * task,const std::vector<const void * > & inDeps,const std::vector<const void * > & outDeps,std::vector<std::pair<VersionCtx *,NestType>> & inVersions,std::vector<std::pair<VersionCtx *,NestType>> & outVersions)292 void SDependenceManager::MapSignature2Deps(SCPUEUTask* task, const std::vector<const void*>& inDeps,
293     const std::vector<const void*>& outDeps, std::vector<std::pair<VersionCtx*, NestType>>& inVersions,
294     std::vector<std::pair<VersionCtx*, NestType>>& outVersions)
295 {
296     auto en = Entity::Instance();
297     // scene description:
298     for (auto signature : inDeps) {
299         VersionCtx* version = nullptr;
300         NestType type = NestType::DEFAULT;
301         // scene 1|2
302         for (auto parentOut : std::as_const(static_cast<SCPUEUTask*>(task->parent)->outs)) {
303             if (parentOut->signature == signature) {
304                 version = parentOut;
305                 type = NestType::PARENTOUT;
306                 goto add_inversion;
307             }
308         }
309         // scene 3
310         for (auto parentIn : std::as_const(static_cast<SCPUEUTask*>(task->parent)->ins)) {
311             if (parentIn->signature == signature) {
312                 version = parentIn;
313                 type = NestType::PARENTIN;
314                 goto add_inversion;
315             }
316         }
317         // scene 4
318         version = en->VA2Ctx(signature, task);
319     add_inversion:
320         inVersions.push_back({version, type});
321     }
322 
323     for (auto signature : outDeps) {
324         VersionCtx* version = nullptr;
325         NestType type = NestType::DEFAULT;
326         // scene 5|6
327         for (auto parentOut : std::as_const(static_cast<SCPUEUTask*>(task->parent)->outs)) {
328             if (parentOut->signature == signature) {
329                 version = parentOut;
330                 type = NestType::PARENTOUT;
331                 goto add_outversion;
332             }
333         }
334         // scene 7
335 #ifndef FFRT_RELEASE
336         for (auto parentIn : std::as_const(static_cast<SCPUEUTask*>(task->parent)->ins)) {
337             if (parentIn->signature == signature) {
338                 FFRT_LOGE("parent's indep only cannot be child's outdep");
339             }
340         }
341 #endif
342         // scene 8
343         version = en->VA2Ctx(signature, task);
344     add_outversion:
345         outVersions.push_back({version, type});
346     }
347 }
348 } // namespace ffrt