1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "sdependence_manager.h"
17 #include "util/worker_monitor.h"
18
19 namespace ffrt {
20
SDependenceManager()21 SDependenceManager::SDependenceManager() : criticalMutex_(Entity::Instance()->criticalMutex_)
22 {
23 #ifdef FFRT_OH_TRACE_ENABLE
24 TraceAdapter::Instance();
25 #endif
26 // control construct sequences of singletons
27 SimpleAllocator<CPUEUTask>::instance();
28 SimpleAllocator<VersionCtx>::instance();
29 #ifdef FFRT_IO_TASK_SCHEDULER
30 PollerProxy::Instance();
31 #endif
32 FFRTScheduler::Instance();
33 ExecuteUnit::Instance();
34 TaskState::RegisterOps(TaskState::EXITED,
35 [this](CPUEUTask* task) { return this->onTaskDone(static_cast<SCPUEUTask*>(task)), true; });
36 #ifdef FFRT_OH_TRACE_ENABLE
37 static WorkerMonitor workerMonitor;
38 _StartTrace(HITRACE_TAG_FFRT, "dm_init", -1); // init g_tagsProperty for ohos ffrt trace
39 _FinishTrace(HITRACE_TAG_FFRT);
40 #endif
41 }
42
~SDependenceManager()43 SDependenceManager::~SDependenceManager()
44 {
45 }
46
onSubmit(bool has_handle,ffrt_task_handle_t & handle,ffrt_function_header_t * f,const ffrt_deps_t * ins,const ffrt_deps_t * outs,const task_attr_private * attr)47 void SDependenceManager::onSubmit(bool has_handle, ffrt_task_handle_t &handle, ffrt_function_header_t *f,
48 const ffrt_deps_t *ins, const ffrt_deps_t *outs, const task_attr_private *attr)
49 {
50 FFRT_TRACE_SCOPE(1, onSubmit);
51 // 1 Init eu and scheduler
52 auto ctx = ExecuteCtx::Cur();
53 auto en = Entity::Instance();
54
55 // 2 Get current task's parent
56 auto parent = ctx->task ? ctx->task : DependenceManager::Root();
57
58 std::vector<const void*> insNoDup;
59 std::vector<const void*> outsNoDup;
60 std::vector<CPUEUTask*> in_handles;
61 // signature去重:1)outs去重
62 if (outs) {
63 if (!outsDeDup(outsNoDup, outs)) {
64 FFRT_LOGE("onSubmit outsDeDup error");
65 return;
66 }
67 }
68
69 // signature去重:2)ins去重(不影响功能,skip);3)ins不和outs重复(当前不支持weak signature)
70 if (ins) {
71 insDeDup(in_handles, insNoDup, outsNoDup, ins);
72 }
73
74 // 2.1 Create task ctx
75 SCPUEUTask* task = nullptr;
76 {
77 FFRT_TRACE_SCOPE(1, CreateTask);
78 task = reinterpret_cast<SCPUEUTask*>(static_cast<uintptr_t>(
79 static_cast<size_t>(reinterpret_cast<uintptr_t>(f)) - OFFSETOF(SCPUEUTask, func_storage)));
80 new (task)SCPUEUTask(attr, parent, ++parent->childNum, QoS());
81 }
82 FFRT_LOGD("submit task[%lu], name[%s]", task->gid, task->label.c_str());
83 #ifdef FFRT_BBOX_ENABLE
84 TaskSubmitCounterInc();
85 #endif
86 if (has_handle) {
87 task->IncDeleteRef();
88 handle = static_cast<ffrt_task_handle_t>(task);
89 outsNoDup.push_back(handle); // handle作为任务的输出signature
90 }
91 QoS qos = (attr == nullptr ? QoS() : QoS(attr->qos_map.m_qos));
92 task->SetQos(qos);
93
94 /* The parent's number of subtasks to be completed increases by one,
95 * and decreases by one after the subtask is completed
96 */
97 task->IncChildRef();
98
99 std::vector<std::pair<VersionCtx*, NestType>> inDatas;
100 std::vector<std::pair<VersionCtx*, NestType>> outDatas;
101
102 if (!(insNoDup.empty() && outsNoDup.empty())) {
103 // 3 Put the submitted task into Entity
104 FFRT_TRACE_SCOPE(1, submitBeforeLock);
105 std::lock_guard<decltype(criticalMutex_)> lg(criticalMutex_);
106 FFRT_TRACE_SCOPE(1, submitAfterLock);
107
108 MapSignature2Deps(task, insNoDup, outsNoDup, inDatas, outDatas);
109
110 {
111 FFRT_TRACE_SCOPE(1, dealInDepend);
112 // 3.1 Process input dependencies
113 for (auto& i : std::as_const(inDatas)) {
114 i.first->AddConsumer(task, i.second);
115 }
116 }
117
118 {
119 FFRT_TRACE_SCOPE(1, dealOutDepend);
120 // 3.2 Process output dependencies
121 for (auto& o : std::as_const(outDatas)) {
122 o.first->AddProducer(task);
123 }
124 }
125 task->in_handles.swap(in_handles);
126 if (task->depRefCnt != 0) {
127 FFRT_BLOCK_TRACER(task->gid, dep);
128 return;
129 }
130 }
131
132 FFRT_LOGD("Submit completed, enter ready queue, task[%lu], name[%s]", task->gid, task->label.c_str());
133 task->UpdateState(TaskState::READY);
134 #ifdef FFRT_BBOX_ENABLE
135 TaskEnQueuCounterInc();
136 #endif
137 }
138
onSubmitUV(ffrt_executor_task_t * task,const task_attr_private * attr)139 void SDependenceManager::onSubmitUV(ffrt_executor_task_t* task, const task_attr_private* attr)
140 {
141 FFRT_EXECUTOR_TASK_SUBMIT_MARKER(task);
142 FFRT_TRACE_SCOPE(1, onSubmitUV);
143 #ifdef FFRT_BBOX_ENABLE
144 TaskSubmitCounterInc();
145 #endif
146 QoS qos = (attr == nullptr ? QoS() : QoS(attr->qos_map.m_qos));
147
148 LinkedList* node = reinterpret_cast<LinkedList *>(&task->wq);
149 FFRTScheduler* sch = FFRTScheduler::Instance();
150 if (!sch->InsertNode(node, qos)) {
151 FFRT_LOGE("Submit UV task failed!");
152 return;
153 }
154
155 #ifdef FFRT_BBOX_ENABLE
156 TaskEnQueuCounterInc();
157 #endif
158 }
159
onWait()160 void SDependenceManager::onWait()
161 {
162 auto ctx = ExecuteCtx::Cur();
163 auto baseTask = ctx->task ? ctx->task : DependenceManager::Root();
164 auto task = static_cast<SCPUEUTask*>(baseTask);
165 bool legacyMode = task->coRoutine ? task->coRoutine->legacyMode : false;
166 if (!USE_COROUTINE || task->parent == nullptr || legacyMode) {
167 std::unique_lock<std::mutex> lck(task->lock);
168 task->MultiDepenceAdd(Denpence::CALL_DEPENCE);
169 FFRT_LOGD("onWait name:%s gid=%lu", task->label.c_str(), task->gid);
170 if (legacyMode) {
171 task->coRoutine->blockType = BlockType::BLOCK_THREAD;
172 }
173 task->childWaitCond_.wait(lck, [task] { return task->childWaitRefCnt == 0; });
174 return;
175 }
176
177 auto childDepFun = [&](ffrt::CPUEUTask* inTask) -> bool {
178 auto sTask = static_cast<SCPUEUTask*>(inTask);
179 std::unique_lock<std::mutex> lck(sTask->lock);
180 if (sTask->childWaitRefCnt == 0) {
181 return false;
182 }
183 sTask->MultiDepenceAdd(Denpence::CALL_DEPENCE);
184 sTask->UpdateState(ffrt::TaskState::BLOCKED);
185 return true;
186 };
187 FFRT_BLOCK_TRACER(task->gid, chd);
188 CoWait(childDepFun);
189 }
190
191 #ifdef QOS_DEPENDENCY
onWait(const ffrt_deps_t * deps,int64_t deadline=-1)192 void SDependenceManager::onWait(const ffrt_deps_t* deps, int64_t deadline = -1)
193 #else
194 void SDependenceManager::onWait(const ffrt_deps_t* deps)
195 #endif
196 {
197 auto ctx = ExecuteCtx::Cur();
198 auto baseTask = ctx->task ? ctx->task : DependenceManager::Root();
199 auto task = static_cast<SCPUEUTask*>(baseTask);
200
201 auto dataDepFun = [&]() {
202 std::vector<VersionCtx*> waitDatas;
203 waitDatas.reserve(deps->len);
204 std::lock_guard<decltype(criticalMutex_)> lg(criticalMutex_);
205
206 for (uint32_t i = 0; i < deps->len; ++i) {
207 auto d = deps->items[i].ptr;
208 auto it = std::as_const(Entity::Instance()->vaMap).find(d);
209 if (it != Entity::Instance()->vaMap.end()) {
210 auto waitData = it->second;
211 // Find the VersionCtx of the parent task level
212 for (auto out : std::as_const(task->outs)) {
213 if (waitData->signature == out->signature) {
214 waitData = out;
215 break;
216 }
217 }
218 waitDatas.push_back(waitData);
219 }
220 }
221 #ifdef QOS_DEPENDENCY
222 if (deadline != -1) {
223 Scheduler::Instance()->onWait(waitDatas, deadline);
224 }
225 #endif
226 for (auto data : std::as_const(waitDatas)) {
227 data->AddDataWaitTaskByThis(task);
228 }
229 };
230
231 bool legacyMode = task->coRoutine ? task->coRoutine->legacyMode : false;
232 if (!USE_COROUTINE || task->parent == nullptr || legacyMode) {
233 dataDepFun();
234 std::unique_lock<std::mutex> lck(task->lock);
235 task->MultiDepenceAdd(Denpence::DATA_DEPENCE);
236 FFRT_LOGD("onWait name:%s gid=%lu", task->label.c_str(), task->gid);
237 if (legacyMode) {
238 task->coRoutine->blockType = BlockType::BLOCK_THREAD;
239 }
240 task->dataWaitCond_.wait(lck, [task] { return task->dataWaitRefCnt == 0; });
241 return;
242 }
243
244 auto pendDataDepFun = [&](ffrt::CPUEUTask* inTask) -> bool {
245 auto sTask = static_cast<SCPUEUTask*>(inTask);
246 dataDepFun();
247 FFRT_LOGD("onWait name:%s gid=%lu", sTask->label.c_str(), sTask->gid);
248 std::unique_lock<std::mutex> lck(sTask->lock);
249 if (sTask->dataWaitRefCnt == 0) {
250 return false;
251 }
252 sTask->MultiDepenceAdd(Denpence::DATA_DEPENCE);
253 sTask->UpdateState(ffrt::TaskState::BLOCKED);
254 return true;
255 };
256 FFRT_BLOCK_TRACER(task->gid, dat);
257 CoWait(pendDataDepFun);
258 }
259
onTaskDone(CPUEUTask * task)260 void SDependenceManager::onTaskDone(CPUEUTask* task)
261 {
262 auto sTask = static_cast<SCPUEUTask*>(task);
263 FFRT_LOGD("Task completed, task[%lu], name[%s]", sTask->gid, sTask->label.c_str());
264 #ifdef FFRT_BBOX_ENABLE
265 TaskDoneCounterInc();
266 #endif
267 FFRT_TRACE_SCOPE(1, ontaskDone);
268 sTask->DecChildRef();
269 if (!(sTask->ins.empty() && sTask->outs.empty())) {
270 std::lock_guard<decltype(criticalMutex_)> lg(criticalMutex_);
271 FFRT_TRACE_SCOPE(1, taskDoneAfterLock);
272
273 // Production data
274 for (auto out : std::as_const(sTask->outs)) {
275 out->onProduced();
276 }
277 // Consumption data
278 for (auto in : std::as_const(sTask->ins)) {
279 in->onConsumed(sTask);
280 }
281 for (auto in : std::as_const(sTask->in_handles)) {
282 in->DecDeleteRef();
283 }
284
285 // VersionCtx recycling
286 Entity::Instance()->RecycleVersion();
287 }
288
289 sTask->RecycleTask();
290 }
291
MapSignature2Deps(SCPUEUTask * task,const std::vector<const void * > & inDeps,const std::vector<const void * > & outDeps,std::vector<std::pair<VersionCtx *,NestType>> & inVersions,std::vector<std::pair<VersionCtx *,NestType>> & outVersions)292 void SDependenceManager::MapSignature2Deps(SCPUEUTask* task, const std::vector<const void*>& inDeps,
293 const std::vector<const void*>& outDeps, std::vector<std::pair<VersionCtx*, NestType>>& inVersions,
294 std::vector<std::pair<VersionCtx*, NestType>>& outVersions)
295 {
296 auto en = Entity::Instance();
297 // scene description:
298 for (auto signature : inDeps) {
299 VersionCtx* version = nullptr;
300 NestType type = NestType::DEFAULT;
301 // scene 1|2
302 for (auto parentOut : std::as_const(static_cast<SCPUEUTask*>(task->parent)->outs)) {
303 if (parentOut->signature == signature) {
304 version = parentOut;
305 type = NestType::PARENTOUT;
306 goto add_inversion;
307 }
308 }
309 // scene 3
310 for (auto parentIn : std::as_const(static_cast<SCPUEUTask*>(task->parent)->ins)) {
311 if (parentIn->signature == signature) {
312 version = parentIn;
313 type = NestType::PARENTIN;
314 goto add_inversion;
315 }
316 }
317 // scene 4
318 version = en->VA2Ctx(signature, task);
319 add_inversion:
320 inVersions.push_back({version, type});
321 }
322
323 for (auto signature : outDeps) {
324 VersionCtx* version = nullptr;
325 NestType type = NestType::DEFAULT;
326 // scene 5|6
327 for (auto parentOut : std::as_const(static_cast<SCPUEUTask*>(task->parent)->outs)) {
328 if (parentOut->signature == signature) {
329 version = parentOut;
330 type = NestType::PARENTOUT;
331 goto add_outversion;
332 }
333 }
334 // scene 7
335 #ifndef FFRT_RELEASE
336 for (auto parentIn : std::as_const(static_cast<SCPUEUTask*>(task->parent)->ins)) {
337 if (parentIn->signature == signature) {
338 FFRT_LOGE("parent's indep only cannot be child's outdep");
339 }
340 }
341 #endif
342 // scene 8
343 version = en->VA2Ctx(signature, task);
344 add_outversion:
345 outVersions.push_back({version, type});
346 }
347 }
348 } // namespace ffrt