1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "eu/co_routine.h"
17 #include <cstdio>
18 #include <cstdlib>
19 #include <cstring>
20 #include <securec.h>
21 #include <string>
22 #include <sys/mman.h>
23 #include "util/cpu_boost_wrapper.h"
24 #include "dfx/trace/ffrt_trace.h"
25 #include "dm/dependence_manager.h"
26 #include "core/entity.h"
27 #include "tm/queue_task.h"
28 #include "sched/scheduler.h"
29 #include "sync/sync.h"
30 #include "util/slab.h"
31 #include "sched/sched_deadline.h"
32 #include "sync/perf_counter.h"
33 #include "dfx/bbox/bbox.h"
34 #include "dfx/trace_record/ffrt_trace_record.h"
35 #include "eu/co_routine_factory.h"
36 #include "util/ffrt_facade.h"
37 #ifdef FFRT_TASK_LOCAL_ENABLE
38 #include "pthread_ffrt.h"
39 #endif
40 #ifdef FFRT_ASYNC_STACKTRACE
41 #include "dfx/async_stack/ffrt_async_stack.h"
42 #ifdef FFRT_TASK_LOCAL_ENABLE
43 #include "pthread_ffrt.h"
44 #endif
45 #endif
46
47 #ifdef FFRT_ENABLE_HITRACE_CHAIN
48 #include "dfx/trace/ffrt_trace_chain.h"
49 #endif
50
51 using namespace ffrt;
52
CoStackCheck(CoRoutine * co)53 static inline void CoStackCheck(CoRoutine* co)
54 {
55 if (unlikely(co->stkMem.magic != STACK_MAGIC)) {
56 FFRT_SYSEVENT_LOGE("sp offset:%llx.\n", co->stkMem.stk +
57 co->stkMem.size - co->ctx.storage[FFRT_REG_SP]);
58 FFRT_SYSEVENT_LOGE("stack over flow, check local variable in you tasks"
59 " or use api 'ffrt_task_attr_set_stack_size'.\n");
60 if (ExecuteCtx::Cur()->task != nullptr) {
61 auto curTask = ExecuteCtx::Cur()->task;
62 FFRT_SYSEVENT_LOGE("task name[%s], gid[%lu], submit_tid[%d]",
63 curTask->GetLabel().c_str(), curTask->gid, curTask->fromTid);
64 }
65 abort();
66 }
67 }
68
// Declared here, defined elsewhere. NOTE(review): judging by the name this is the TLS key of the
// per-thread ExecuteCtx - confirm at its definition.
extern pthread_key_t g_executeCtxTlsKey;
// TLS key under which GetCoEnv() stores each thread's CoRoutineEnv; filled in by MakeCoEnvTlsKey().
pthread_key_t g_coThreadTlsKey = 0;
// Once-control passed to pthread_once() so that MakeCoEnvTlsKey() runs a single time.
pthread_once_t g_coThreadTlsKeyOnce = PTHREAD_ONCE_INIT;
CoEnvDestructor(void * args)72 void CoEnvDestructor(void* args)
73 {
74 auto coEnv = static_cast<CoRoutineEnv*>(args);
75 if (coEnv) {
76 delete coEnv;
77 }
78 }
79
MakeCoEnvTlsKey()80 void MakeCoEnvTlsKey()
81 {
82 pthread_key_create(&g_coThreadTlsKey, CoEnvDestructor);
83 }
84
GetCoEnv()85 static inline CoRoutineEnv* GetCoEnv()
86 {
87 CoRoutineEnv* coEnv = nullptr;
88 pthread_once(&g_coThreadTlsKeyOnce, MakeCoEnvTlsKey);
89
90 void *curTls = pthread_getspecific(g_coThreadTlsKey);
91 if (curTls != nullptr) {
92 coEnv = reinterpret_cast<CoRoutineEnv *>(curTls);
93 } else {
94 coEnv = new CoRoutineEnv();
95 pthread_setspecific(g_coThreadTlsKey, coEnv);
96 }
97 return coEnv;
98 }
99
GetCoRoutineEnv()100 CoRoutineEnv* GetCoRoutineEnv()
101 {
102 return GetCoEnv();
103 }
104
105 #ifdef FFRT_TASK_LOCAL_ENABLE
106 namespace {
IsTaskLocalEnable(ffrt::CoTask * task)107 bool IsTaskLocalEnable(ffrt::CoTask* task)
108 {
109 if ((task->type != ffrt_normal_task)) {
110 return false;
111 }
112
113 TaskLocalAttr* attr = static_cast<CPUEUTask*>(task)->tlsAttr;
114
115 if (attr == nullptr || !attr->taskLocal) {
116 return false;
117 }
118
119 if (attr->tsd == nullptr) {
120 FFRT_SYSEVENT_LOGE("taskLocal enabled but task tsd invalid");
121 return false;
122 }
123
124 return true;
125 }
126
InitWorkerTsdValueToTask(void ** taskTsd)127 void InitWorkerTsdValueToTask(void** taskTsd)
128 {
129 const pthread_key_t updKeyMap[] = {g_executeCtxTlsKey, g_coThreadTlsKey};
130 auto threadTsd = pthread_gettsd();
131 for (const auto& key : updKeyMap) {
132 FFRT_UNLIKELY_COND_DO_ABORT(key <= 0, "FFRT abort: key[%u] invalid", key);
133 auto addr = threadTsd[key];
134 if (addr) {
135 taskTsd[key] = addr;
136 }
137 }
138 }
139
SwitchTsdAddrToTask(ffrt::CPUEUTask * task)140 void SwitchTsdAddrToTask(ffrt::CPUEUTask* task)
141 {
142 auto threadTsd = pthread_gettsd();
143 task->tlsAttr->threadTsd = threadTsd;
144 pthread_settsd(task->tlsAttr->tsd);
145 }
146
SwitchTsdToTask(ffrt::CoTask * task)147 void SwitchTsdToTask(ffrt::CoTask* task)
148 {
149 if (!IsTaskLocalEnable(task)) {
150 return;
151 }
152
153 CPUEUTask* cpuTask = static_cast<CPUEUTask*>(task);
154
155 InitWorkerTsdValueToTask(cpuTask->tlsAttr->tsd);
156
157 SwitchTsdAddrToTask(cpuTask);
158
159 cpuTask->runningTid.store(pthread_self());
160 FFRT_LOGD("switch tsd to task Success");
161 }
162
SwitchTsdAddrToThread(ffrt::CPUEUTask * task)163 bool SwitchTsdAddrToThread(ffrt::CPUEUTask* task)
164 {
165 if (!task->tlsAttr->threadTsd) {
166 FFRT_SYSEVENT_LOGE("threadTsd is null");
167 return false;
168 }
169 pthread_settsd(task->tlsAttr->threadTsd);
170 task->tlsAttr->threadTsd = nullptr;
171 return true;
172 }
173
UpdateWorkerTsdValueToThread(void ** taskTsd)174 void UpdateWorkerTsdValueToThread(void** taskTsd)
175 {
176 const pthread_key_t updKeyMap[] = {g_executeCtxTlsKey, g_coThreadTlsKey};
177 auto threadTsd = pthread_gettsd();
178 for (const auto& key : updKeyMap) {
179 FFRT_UNLIKELY_COND_DO_ABORT(key <= 0, "FFRT abort: key[%u] invalid", key);
180 auto threadVal = threadTsd[key];
181 auto taskVal = taskTsd[key];
182 if (!threadVal && taskVal) {
183 threadTsd[key] = taskVal;
184 } else {
185 FFRT_UNLIKELY_COND_DO_ABORT((threadVal && taskVal && (threadVal != taskVal)),
186 "FFRT abort: mismatch key=[%u]", key);
187 FFRT_UNLIKELY_COND_DO_ABORT((threadVal && !taskVal),
188 "FFRT abort: unexpected: thread exists but task not exists, key=[%u]", key);
189 }
190 taskTsd[key] = nullptr;
191 }
192 }
193
SwitchTsdToThread(ffrt::CoTask * task)194 void SwitchTsdToThread(ffrt::CoTask* task)
195 {
196 if (!IsTaskLocalEnable(task)) {
197 return;
198 }
199
200 CPUEUTask* cpuTask = static_cast<CPUEUTask*>(task);
201
202 if (!SwitchTsdAddrToThread(cpuTask)) {
203 return;
204 }
205
206 UpdateWorkerTsdValueToThread(cpuTask->tlsAttr->tsd);
207
208 cpuTask->runningTid.store(0);
209 FFRT_LOGD("switch tsd to thread Success");
210 }
211
TaskTsdRunDtors(ffrt::CPUEUTask * task)212 void TaskTsdRunDtors(ffrt::CPUEUTask* task)
213 {
214 SwitchTsdAddrToTask(task);
215 pthread_tsd_run_dtors();
216 SwitchTsdAddrToThread(task);
217 }
218 } // namespace
219
TaskTsdDeconstruct(ffrt::CPUEUTask * task)220 void TaskTsdDeconstruct(ffrt::CPUEUTask* task)
221 {
222 if (!IsTaskLocalEnable(task)) {
223 return;
224 }
225
226 TaskTsdRunDtors(task);
227 if (task->tlsAttr->tsd != nullptr) {
228 free(task->tlsAttr->tsd);
229 task->tlsAttr->tsd = nullptr;
230 task->tlsAttr->taskLocal = false;
231 }
232 FFRT_LOGD("tsd deconstruct done, task[%lu], name[%s]", task->gid, task->GetLabel().c_str());
233 }
234 #endif
235
CoSwitch(CoCtx * from,CoCtx * to)236 static inline void CoSwitch(CoCtx* from, CoCtx* to)
237 {
238 co2_switch_context(from, to);
239 }
240
CoExit(CoRoutine * co,bool isNormalTask)241 static inline void CoExit(CoRoutine* co, bool isNormalTask)
242 {
243 #ifdef FFRT_ENABLE_HITRACE_CHAIN
244 TraceChainAdapter::Instance().HiTraceChainClearId();
245 #endif
246 #ifdef FFRT_TASK_LOCAL_ENABLE
247 if (isNormalTask) {
248 SwitchTsdToThread(co->task);
249 }
250 #endif
251 CoStackCheck(co);
252 #ifdef ASAN_MODE
253 /* co to thread start */
254 __sanitizer_start_switch_fiber((void **)&co->asanFakeStack, co->asanFiberAddr, co->asanFiberSize);
255 /* clear remaining shadow stack */
256 __asan_handle_no_return();
257 #endif
258 /* co switch to thread, and do not switch back again */
259 CoSwitch(&co->ctx, &co->thEnv->schCtx);
260 }
261
CoStartEntry(void * arg)262 static inline void CoStartEntry(void* arg)
263 {
264 CoRoutine* co = reinterpret_cast<CoRoutine*>(arg);
265 #ifdef ASAN_MODE
266 /* thread to co finish first */
267 __sanitizer_finish_switch_fiber(co->asanFakeStack, (const void**)&co->asanFiberAddr, &co->asanFiberSize);
268 #endif
269 ffrt::CoTask* task = co->task;
270 bool isNormalTask = task->type == ffrt_normal_task;
271 task->Execute();
272 co->status.store(static_cast<int>(CoStatus::CO_UNINITIALIZED));
273 CoExit(co, isNormalTask);
274 }
275
CoSetStackProt(CoRoutine * co,int prot)276 static void CoSetStackProt(CoRoutine* co, int prot)
277 {
278 /* set the attribute of the page table closest to the stack top in the user stack to read-only,
279 * and 1~2 page table space will be wasted
280 */
281 size_t p_size = getpagesize();
282 uint64_t mp = reinterpret_cast<uint64_t>(co->stkMem.stk);
283 mp = (mp + p_size - 1) / p_size * p_size;
284 int ret = mprotect(reinterpret_cast<void *>(static_cast<uintptr_t>(mp)), p_size, prot);
285 FFRT_UNLIKELY_COND_DO_ABORT(ret < 0, "coroutine size:%lu, mp:0x%lx, page_size:%zu,result:%d,prot:%d, err:%d,%s",
286 static_cast<unsigned long>(sizeof(struct CoRoutine)), static_cast<unsigned long>(mp),
287 p_size, ret, prot, errno, strerror(errno));
288 }
289
AllocNewCoRoutine(size_t stackSize)290 static inline CoRoutine* AllocNewCoRoutine(size_t stackSize)
291 {
292 std::size_t defaultStackSize = FFRTFacade::GetCSAInstance()->size;
293 CoRoutine* co = nullptr;
294 if (likely(stackSize == defaultStackSize)) {
295 co = ffrt::CoRoutineAllocMem(stackSize);
296 } else {
297 co = static_cast<CoRoutine*>(mmap(nullptr, stackSize,
298 PROT_READ | PROT_WRITE, MAP_ANONYMOUS | MAP_PRIVATE, -1, 0));
299 if (co == reinterpret_cast<CoRoutine*>(MAP_FAILED)) {
300 FFRT_SYSEVENT_LOGE("memory mmap failed.");
301 return nullptr;
302 }
303 }
304 if (!co) {
305 FFRT_SYSEVENT_LOGE("memory not enough");
306 return nullptr;
307 }
308 new (co)CoRoutine();
309 co->allocatedSize = stackSize;
310 co->stkMem.size = static_cast<uint64_t>(stackSize - sizeof(CoRoutine) + 8);
311 co->stkMem.magic = STACK_MAGIC;
312 if (FFRTFacade::GetCSAInstance()->type == CoStackProtectType::CO_STACK_STRONG_PROTECT) {
313 CoSetStackProt(co, PROT_READ);
314 }
315 return co;
316 }
317
CoMemFree(CoRoutine * co)318 static inline void CoMemFree(CoRoutine* co)
319 {
320 if (FFRTFacade::GetCSAInstance()->type == CoStackProtectType::CO_STACK_STRONG_PROTECT) {
321 CoSetStackProt(co, PROT_WRITE | PROT_READ);
322 }
323 std::size_t defaultStackSize = FFRTFacade::GetCSAInstance()->size;
324 #if defined(TSAN_MODE)
325 assert(co->ctx.tsanFiber != nullptr);
326 __tsan_destroy_fiber(co->ctx.tsanFiber);
327 #endif
328 if (likely(co->allocatedSize == defaultStackSize)) {
329 ffrt::CoRoutineFreeMem(co);
330 } else {
331 int ret = munmap(co, co->allocatedSize);
332 if (ret != 0) {
333 FFRT_SYSEVENT_LOGE("munmap failed with errno: %d", errno);
334 }
335 }
336 }
337
CoStackFree(void)338 void CoStackFree(void)
339 {
340 CoRoutineEnv* coEnv = GetCoEnv();
341 if (coEnv != nullptr && coEnv->runningCo != nullptr) {
342 CoMemFree(coEnv->runningCo);
343 coEnv->runningCo = nullptr;
344 }
345 }
346
CoWorkerExit(void)347 void CoWorkerExit(void)
348 {
349 CoStackFree();
350 }
351
BindNewCoRoutione(ffrt::CoTask * task)352 static inline void BindNewCoRoutione(ffrt::CoTask* task)
353 {
354 CoRoutineEnv* coEnv = GetCoEnv();
355 task->coRoutine = coEnv->runningCo;
356 task->coRoutine->task = task;
357 task->coRoutine->thEnv = coEnv;
358 }
359
CoAlloc(ffrt::CoTask * task)360 static inline int CoAlloc(ffrt::CoTask* task)
361 {
362 CoRoutineEnv* coEnv = GetCoEnv();
363 if (task->coRoutine) { // use allocated coroutine stack
364 if (coEnv->runningCo) { // free cached stack if it exist
365 CoMemFree(coEnv->runningCo);
366 }
367 coEnv->runningCo = task->coRoutine;
368 } else {
369 if (!coEnv->runningCo) { // if no cached stack, alloc one
370 coEnv->runningCo = AllocNewCoRoutine(task->stack_size);
371 } else { // exist cached stack
372 if (coEnv->runningCo->allocatedSize != task->stack_size) { // stack size not match, alloc one
373 CoMemFree(coEnv->runningCo); // free cached stack
374 coEnv->runningCo = AllocNewCoRoutine(task->stack_size);
375 }
376 }
377 }
378 return 0;
379 }
380
381 // call CoCreate when task create
CoCreate(ffrt::CoTask * task)382 static inline int CoCreate(ffrt::CoTask* task)
383 {
384 CoAlloc(task);
385 if (GetCoEnv()->runningCo == nullptr) { // retry once if alloc failed
386 CoAlloc(task);
387 if (GetCoEnv()->runningCo == nullptr) { // retry still failed
388 FFRT_LOGE("alloc co routine failed");
389 return -1;
390 }
391 }
392 BindNewCoRoutione(task);
393 auto co = task->coRoutine;
394 if (co->status.load() == static_cast<int>(CoStatus::CO_UNINITIALIZED)) {
395 ffrt_fiber_init(&co->ctx, CoStartEntry, static_cast<void*>(co), co->stkMem.stk, co->stkMem.size);
396 }
397 return 0;
398 }
399
CoSwitchInTransaction(ffrt::CoTask * task)400 static inline void CoSwitchInTransaction(ffrt::CoTask* task)
401 {
402 if (task->cpuBoostCtxId >= 0) {
403 CpuBoostRestore(task->cpuBoostCtxId);
404 }
405 }
406
CoSwitchOutTransaction(ffrt::CoTask * task)407 static inline void CoSwitchOutTransaction(ffrt::CoTask* task)
408 {
409 if (task->cpuBoostCtxId >= 0) {
410 CpuBoostSave(task->cpuBoostCtxId);
411 }
412 }
413
CoBboxPreCheck(ffrt::CoTask * task)414 static inline bool CoBboxPreCheck(ffrt::CoTask* task)
415 {
416 if (task->coRoutine) {
417 int ret = task->coRoutine->status.exchange(static_cast<int>(CoStatus::CO_RUNNING));
418 if (ret == static_cast<int>(CoStatus::CO_RUNNING) && GetBboxEnableState() != 0) {
419 FFRT_SYSEVENT_LOGE("executed by worker suddenly, ignore backtrace");
420 return false;
421 }
422 }
423
424 return true;
425 }
426
427 // called by thread work
CoStart(ffrt::CoTask * task,CoRoutineEnv * coRoutineEnv)428 int CoStart(ffrt::CoTask* task, CoRoutineEnv* coRoutineEnv)
429 {
430 if (!CoBboxPreCheck(task)) {
431 return 0;
432 }
433
434 if (CoCreate(task) != 0) {
435 return -1;
436 }
437 auto co = task->coRoutine;
438
439 FFRTTraceRecord::TaskRun(task->GetQos(), task);
440 for (;;) {
441 ffrt::TaskLoadTracking::Begin(task);
442 #ifdef FFRT_ASYNC_STACKTRACE
443 FFRTSetStackId(task->stackId);
444 #endif
445 FFRT_TASK_BEGIN(task->label, task->gid);
446 CoSwitchInTransaction(task);
447 #ifdef FFRT_TASK_LOCAL_ENABLE
448 SwitchTsdToTask(co->task);
449 #endif
450 #ifdef FFRT_ENABLE_HITRACE_CHAIN
451 if (task->traceId_.valid == HITRACE_ID_VALID) {
452 TraceChainAdapter::Instance().HiTraceChainRestoreId(&task->traceId_);
453 }
454 #endif
455 #ifdef ASAN_MODE
456 /* thread to co start */
457 __sanitizer_start_switch_fiber((void **)&co->asanFakeStack, GetCoStackAddr(co), co->stkMem.size);
458 #endif
459 // mark tasks as EXECUTING which waked by CoWakeFunc that blocked by CoWait before
460 task->SetStatus(TaskStatus::EXECUTING);
461 /* thread switch to co */
462 CoSwitch(&co->thEnv->schCtx, &co->ctx);
463 #ifdef ASAN_MODE
464 /* co to thread finish */
465 __sanitizer_finish_switch_fiber(co->asanFakeStack, (const void**)&co->asanFiberAddr, &co->asanFiberSize);
466 #endif
467 FFRT_TASK_END();
468 #ifdef FFRT_ENABLE_HITRACE_CHAIN
469 if (co->status.load() != static_cast<int>(CoStatus::CO_UNINITIALIZED)) {
470 task->traceId_ = TraceChainAdapter::Instance().HiTraceChainGetId();
471 if (task->traceId_.valid == HITRACE_ID_VALID) {
472 TraceChainAdapter::Instance().HiTraceChainClearId();
473 }
474 }
475 #endif
476 ffrt::TaskLoadTracking::End(task); // Todo: deal with CoWait()
477 CoStackCheck(co);
478
479 // 1. coroutine task done, exit normally, need to exec next coroutine task
480 if (co->isTaskDone) {
481 ffrt::FFRTFacade::GetDMInstance().onTaskDone(static_cast<CPUEUTask*>(task));
482 co->isTaskDone = false;
483 return 0;
484 }
485
486 // 2. coroutine task block, switch to thread
487 // need suspend the coroutine task or continue to execute the coroutine task.
488 auto pending = coRoutineEnv->pending;
489 if (pending == nullptr) {
490 return 0;
491 }
492 coRoutineEnv->pending = nullptr;
493 FFRTTraceRecord::TaskCoSwitchOut(task);
494 // Fast path: skip state transition
495 if ((*pending)(task)) {
496 // The ownership of the task belongs to other host(cv/mutex/epoll etc)
497 // And the task cannot be accessed any more.
498 return 0;
499 }
500 FFRT_WAKE_TRACER(task->gid); // fast path wk
501 coRoutineEnv->runningCo = co;
502 }
503 }
504
505 // called by thread work
CoYield(void)506 void CoYield(void)
507 {
508 CoRoutineEnv* coEnv = GetCoEnv();
509 CoRoutine* co = static_cast<CoRoutine*>(coEnv->runningCo);
510 co->status.store(static_cast<int>(CoStatus::CO_NOT_FINISH));
511 coEnv->runningCo = nullptr;
512 CoSwitchOutTransaction(co->task);
513 FFRT_BLOCK_MARKER(co->task->gid);
514 #ifdef FFRT_TASK_LOCAL_ENABLE
515 SwitchTsdToThread(co->task);
516 #endif
517 CoStackCheck(co);
518 #ifdef ASAN_MODE
519 /* co to thread start */
520 __sanitizer_start_switch_fiber((void **)&co->asanFakeStack, co->asanFiberAddr, co->asanFiberSize);
521 #endif
522 /* co switch to thread */
523 CoSwitch(&co->ctx, &GetCoEnv()->schCtx);
524 #ifdef ASAN_MODE
525 /* thread to co finish */
526 __sanitizer_finish_switch_fiber(co->asanFakeStack, (const void**)&co->asanFiberAddr, &co->asanFiberSize);
527 #else
528 while (GetBboxEnableState() != 0) {
529 if (GetBboxEnableState() != gettid()) {
530 BboxFreeze(); // freeze non-crash thread
531 return;
532 }
533 const int IGNORE_DEPTH = 3;
534 backtrace(IGNORE_DEPTH);
535 co->status.store(static_cast<int>(CoStatus::CO_NOT_FINISH)); // recovery to old state
536 CoExit(co, co->task->type == ffrt_normal_task);
537 }
538 #endif
539 }
540
CoWait(const std::function<bool (ffrt::CoTask *)> & pred)541 void CoWait(const std::function<bool(ffrt::CoTask*)>& pred)
542 {
543 GetCoEnv()->pending = &pred;
544 CoYield();
545 }
546
CoWake(ffrt::CoTask * task,CoWakeType type)547 void CoWake(ffrt::CoTask* task, CoWakeType type)
548 {
549 if (task == nullptr) {
550 FFRT_SYSEVENT_LOGE("task is nullptr");
551 return;
552 }
553 // Fast path: state transition without lock
554 task->coWakeType = type;
555 FFRT_WAKE_TRACER(task->gid);
556 switch (task->type) {
557 case ffrt_normal_task: {
558 task->Ready();
559 break;
560 }
561 case ffrt_queue_task: {
562 QueueTask* sTask = reinterpret_cast<QueueTask*>(task);
563 auto handle = sTask->GetHandler();
564 handle->TransferTask(sTask);
565 break;
566 }
567 default: {
568 FFRT_LOGE("CoWake unsupport task[%lu], type=%d, name[%s]", task->gid, task->type, task->GetLabel().c_str());
569 break;
570 }
571 }
572 }
573
Instance()574 CoRoutineFactory &CoRoutineFactory::Instance()
575 {
576 static CoRoutineFactory fac;
577 return fac;
578 }
579