1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "co_routine.h"
17 #include <cstdio>
18 #include <cstdlib>
19 #include <cstring>
20 #include <securec.h>
21 #include <string>
22 #include <sys/mman.h>
23 #include "util/cpu_boost_wrapper.h"
24 #include "ffrt_trace.h"
25 #include "dm/dependence_manager.h"
26 #include "core/entity.h"
27 #include "tm/queue_task.h"
28 #include "sched/scheduler.h"
29 #include "sync/sync.h"
30 #include "util/slab.h"
31 #include "sched/sched_deadline.h"
32 #include "sync/perf_counter.h"
33 #include "sync/io_poller.h"
34 #include "dfx/bbox/bbox.h"
35 #include "dfx/trace_record/ffrt_trace_record.h"
36 #include "co_routine_factory.h"
37 #include "util/ffrt_facade.h"
38 #ifdef FFRT_TASK_LOCAL_ENABLE
39 #include "pthread_ffrt.h"
40 #endif
41 #ifdef FFRT_ASYNC_STACKTRACE
42 #include "dfx/async_stack/ffrt_async_stack.h"
43 #ifdef FFRT_TASK_LOCAL_ENABLE
44 #include "pthread_ffrt.h"
45 #endif
46 #endif
47
48 using namespace ffrt;
49
CoStackCheck(CoRoutine * co)50 static inline void CoStackCheck(CoRoutine* co)
51 {
52 if (unlikely(co->stkMem.magic != STACK_MAGIC)) {
53 FFRT_LOGE("sp offset:%p.\n", co->stkMem.stk +
54 co->stkMem.size - co->ctx.regs[FFRT_REG_SP]);
55 FFRT_LOGE("stack over flow, check local variable in you tasks or use api 'ffrt_task_attr_set_stack_size'.\n");
56 if (ExecuteCtx::Cur()->task != nullptr) {
57 auto curTask = ExecuteCtx::Cur()->task;
58 FFRT_LOGE("task name[%s], gid[%lu], submit_tid[%d]",
59 curTask->label.c_str(), curTask->gid, curTask->fromTid);
60 }
61 abort();
62 }
63 }
64
65 extern pthread_key_t g_executeCtxTlsKey;
66 pthread_key_t g_coThreadTlsKey = 0;
67 pthread_once_t g_coThreadTlsKeyOnce = PTHREAD_ONCE_INIT;
CoEnvDestructor(void * args)68 void CoEnvDestructor(void* args)
69 {
70 auto coEnv = static_cast<CoRoutineEnv*>(args);
71 if (coEnv) {
72 delete coEnv;
73 }
74 }
75
MakeCoEnvTlsKey()76 void MakeCoEnvTlsKey()
77 {
78 pthread_key_create(&g_coThreadTlsKey, CoEnvDestructor);
79 }
80
GetCoEnv()81 CoRoutineEnv* GetCoEnv()
82 {
83 CoRoutineEnv* coEnv = nullptr;
84 pthread_once(&g_coThreadTlsKeyOnce, MakeCoEnvTlsKey);
85
86 void *curTls = pthread_getspecific(g_coThreadTlsKey);
87 if (curTls != nullptr) {
88 coEnv = reinterpret_cast<CoRoutineEnv *>(curTls);
89 } else {
90 coEnv = new CoRoutineEnv();
91 pthread_setspecific(g_coThreadTlsKey, coEnv);
92 }
93 return coEnv;
94 }
95
96 #ifdef FFRT_TASK_LOCAL_ENABLE
97 namespace {
IsTaskLocalEnable(ffrt::CPUEUTask * task)98 bool IsTaskLocalEnable(ffrt::CPUEUTask* task)
99 {
100 if ((task->type != ffrt_normal_task) || (!task->taskLocal)) {
101 return false;
102 }
103
104 if (task->tsd == nullptr) {
105 FFRT_LOGE("taskLocal enabled but task tsd invalid");
106 return false;
107 }
108
109 return true;
110 }
111
InitWorkerTsdValueToTask(void ** taskTsd)112 void InitWorkerTsdValueToTask(void** taskTsd)
113 {
114 const pthread_key_t updKeyMap[] = {g_executeCtxTlsKey, g_coThreadTlsKey};
115 auto threadTsd = pthread_gettsd();
116 for (const auto& key : updKeyMap) {
117 FFRT_UNLIKELY_COND_DO_ABORT(key <= 0, "FFRT abort: key[%u] invalid", key);
118 auto addr = threadTsd[key];
119 if (addr) {
120 taskTsd[key] = addr;
121 }
122 }
123 }
124
SwitchTsdAddrToTask(ffrt::CPUEUTask * task)125 void SwitchTsdAddrToTask(ffrt::CPUEUTask* task)
126 {
127 auto threadTsd = pthread_gettsd();
128 task->threadTsd = threadTsd;
129 pthread_settsd(task->tsd);
130 }
131
SwitchTsdToTask(ffrt::CPUEUTask * task)132 void SwitchTsdToTask(ffrt::CPUEUTask* task)
133 {
134 if (!IsTaskLocalEnable(task)) {
135 return;
136 }
137
138 InitWorkerTsdValueToTask(task->tsd);
139
140 SwitchTsdAddrToTask(task);
141
142 task->runningTid.store(pthread_self());
143 FFRT_LOGD("switch tsd to task Success");
144 }
145
SwitchTsdAddrToThread(ffrt::CPUEUTask * task)146 bool SwitchTsdAddrToThread(ffrt::CPUEUTask* task)
147 {
148 if (!task->threadTsd) {
149 return false;
150 }
151 pthread_settsd(task->threadTsd);
152 task->threadTsd = nullptr;
153 return true;
154 }
155
UpdateWorkerTsdValueToThread(void ** taskTsd)156 void UpdateWorkerTsdValueToThread(void** taskTsd)
157 {
158 const pthread_key_t updKeyMap[] = {g_executeCtxTlsKey, g_coThreadTlsKey};
159 auto threadTsd = pthread_gettsd();
160 for (const auto& key : updKeyMap) {
161 FFRT_UNLIKELY_COND_DO_ABORT(key <= 0, "FFRT abort: key[%u] invalid", key);
162 auto threadVal = threadTsd[key];
163 auto taskVal = taskTsd[key];
164 if (!threadVal && taskVal) {
165 threadTsd[key] = taskVal;
166 } else {
167 FFRT_UNLIKELY_COND_DO_ABORT((threadVal && taskVal && (threadVal != taskVal)),
168 "FFRT abort: mismatch key=[%u]", key);
169 FFRT_UNLIKELY_COND_DO_ABORT((threadVal && !taskVal),
170 "FFRT abort: unexpected: thread exists but task not exists, key=[%u]", key);
171 }
172 taskTsd[key] = nullptr;
173 }
174 }
175
SwitchTsdToThread(ffrt::CPUEUTask * task)176 void SwitchTsdToThread(ffrt::CPUEUTask* task)
177 {
178 if (!IsTaskLocalEnable(task)) {
179 return;
180 }
181
182 if (!SwitchTsdAddrToThread(task)) {
183 return;
184 }
185
186 UpdateWorkerTsdValueToThread(task->tsd);
187
188 task->runningTid.store(0);
189 FFRT_LOGD("switch tsd to thread Success");
190 }
191
TaskTsdRunDtors(ffrt::CPUEUTask * task)192 void TaskTsdRunDtors(ffrt::CPUEUTask* task)
193 {
194 SwitchTsdAddrToTask(task);
195 pthread_tsd_run_dtors();
196 SwitchTsdAddrToThread(task);
197 }
198 } // namespace
199
TaskTsdDeconstruct(ffrt::CPUEUTask * task)200 void TaskTsdDeconstruct(ffrt::CPUEUTask* task)
201 {
202 if (!IsTaskLocalEnable(task)) {
203 return;
204 }
205
206 TaskTsdRunDtors(task);
207 if (task->tsd != nullptr) {
208 free(task->tsd);
209 task->tsd = nullptr;
210 task->taskLocal = false;
211 }
212 FFRT_LOGD("tsd deconstruct done, task[%lu], name[%s]", task->gid, task->label.c_str());
213 }
214 #endif
215
CoSwitch(CoCtx * from,CoCtx * to)216 static inline void CoSwitch(CoCtx* from, CoCtx* to)
217 {
218 co2_switch_context(from, to);
219 }
220
CoExit(CoRoutine * co,bool isNormalTask)221 static inline void CoExit(CoRoutine* co, bool isNormalTask)
222 {
223 #ifdef FFRT_TASK_LOCAL_ENABLE
224 if (isNormalTask) {
225 SwitchTsdToThread(co->task);
226 }
227 #endif
228 CoStackCheck(co);
229 #ifdef ASAN_MODE
230 /* co to thread start */
231 __sanitizer_start_switch_fiber((void **)&co->asanFakeStack, co->asanFiberAddr, co->asanFiberSize);
232 /* clear remaining shadow stack */
233 __asan_handle_no_return();
234 #endif
235 /* co switch to thread, and do not switch back again */
236 CoSwitch(&co->ctx, &co->thEnv->schCtx);
237 }
238
CoStartEntry(void * arg)239 static inline void CoStartEntry(void* arg)
240 {
241 CoRoutine* co = reinterpret_cast<CoRoutine*>(arg);
242 #ifdef ASAN_MODE
243 /* thread to co finish first */
244 __sanitizer_finish_switch_fiber(co->asanFakeStack, (const void**)&co->asanFiberAddr, &co->asanFiberSize);
245 #endif
246 ffrt::CoTask* task = co->task;
247 bool isNormalTask = task->type == ffrt_normal_task;
248 task->Execute();
249 co->status.store(static_cast<int>(CoStatus::CO_UNINITIALIZED));
250 CoExit(co, isNormalTask);
251 }
252
CoSetStackProt(CoRoutine * co,int prot)253 static void CoSetStackProt(CoRoutine* co, int prot)
254 {
255 /* set the attribute of the page table closest to the stack top in the user stack to read-only,
256 * and 1~2 page table space will be wasted
257 */
258 size_t p_size = getpagesize();
259 uint64_t mp = reinterpret_cast<uint64_t>(co->stkMem.stk);
260 mp = (mp + p_size - 1) / p_size * p_size;
261 int ret = mprotect(reinterpret_cast<void *>(static_cast<uintptr_t>(mp)), p_size, prot);
262 FFRT_UNLIKELY_COND_DO_ABORT(ret < 0, "coroutine size:%lu, mp:0x%lx, page_size:%zu,result:%d,prot:%d, err:%d,%s",
263 static_cast<unsigned long>(sizeof(struct CoRoutine)), static_cast<unsigned long>(mp),
264 p_size, ret, prot, errno, strerror(errno));
265 }
266
AllocNewCoRoutine(size_t stackSize)267 static inline CoRoutine* AllocNewCoRoutine(size_t stackSize)
268 {
269 std::size_t defaultStackSize = FFRTFacade::GetCSAInstance()->size;
270 CoRoutine* co = nullptr;
271 if (likely(stackSize == defaultStackSize)) {
272 co = ffrt::CoRoutineAllocMem(stackSize);
273 } else {
274 co = static_cast<CoRoutine*>(mmap(nullptr, stackSize,
275 PROT_READ | PROT_WRITE, MAP_ANONYMOUS | MAP_PRIVATE, -1, 0));
276 if (co == reinterpret_cast<CoRoutine*>(MAP_FAILED)) {
277 FFRT_LOGE("memory mmap failed.");
278 return nullptr;
279 }
280 }
281 if (!co) {
282 FFRT_LOGE("memory not enough");
283 return nullptr;
284 }
285 co->allocatedSize = stackSize;
286 co->stkMem.size = static_cast<uint64_t>(stackSize - sizeof(CoRoutine) + 8);
287 co->stkMem.magic = STACK_MAGIC;
288 if (FFRTFacade::GetCSAInstance()->type == CoStackProtectType::CO_STACK_STRONG_PROTECT) {
289 CoSetStackProt(co, PROT_READ);
290 }
291 co->status.store(static_cast<int>(CoStatus::CO_UNINITIALIZED));
292 return co;
293 }
294
CoMemFree(CoRoutine * co)295 static inline void CoMemFree(CoRoutine* co)
296 {
297 if (FFRTFacade::GetCSAInstance()->type == CoStackProtectType::CO_STACK_STRONG_PROTECT) {
298 CoSetStackProt(co, PROT_WRITE | PROT_READ);
299 }
300 std::size_t defaultStackSize = FFRTFacade::GetCSAInstance()->size;
301 if (likely(co->allocatedSize == defaultStackSize)) {
302 ffrt::CoRoutineFreeMem(co);
303 } else {
304 int ret = munmap(co, co->allocatedSize);
305 if (ret != 0) {
306 FFRT_LOGE("munmap failed with errno: %d", errno);
307 }
308 }
309 }
310
CoStackFree(void)311 void CoStackFree(void)
312 {
313 if (GetCoEnv()) {
314 if (GetCoEnv()->runningCo) {
315 CoMemFree(GetCoEnv()->runningCo);
316 GetCoEnv()->runningCo = nullptr;
317 }
318 }
319 }
320
CoWorkerExit(void)321 void CoWorkerExit(void)
322 {
323 CoStackFree();
324 }
325
BindNewCoRoutione(ffrt::CPUEUTask * task)326 static inline void BindNewCoRoutione(ffrt::CPUEUTask* task)
327 {
328 task->coRoutine = GetCoEnv()->runningCo;
329 task->coRoutine->task = task;
330 task->coRoutine->thEnv = GetCoEnv();
331 }
332
UnbindCoRoutione(ffrt::CPUEUTask * task)333 static inline void UnbindCoRoutione(ffrt::CPUEUTask* task)
334 {
335 task->coRoutine->task = nullptr;
336 task->coRoutine = nullptr;
337 }
338
CoAlloc(ffrt::CPUEUTask * task)339 static inline int CoAlloc(ffrt::CPUEUTask* task)
340 {
341 if (task->coRoutine) { // use allocated coroutine stack
342 if (GetCoEnv()->runningCo) { // free cached stack if it exist
343 CoMemFree(GetCoEnv()->runningCo);
344 }
345 GetCoEnv()->runningCo = task->coRoutine;
346 } else {
347 if (!GetCoEnv()->runningCo) { // if no cached stack, alloc one
348 GetCoEnv()->runningCo = AllocNewCoRoutine(task->stack_size);
349 } else { // exist cached stack
350 if (GetCoEnv()->runningCo->allocatedSize != task->stack_size) { // stack size not match, alloc one
351 CoMemFree(GetCoEnv()->runningCo); // free cached stack
352 GetCoEnv()->runningCo = AllocNewCoRoutine(task->stack_size);
353 }
354 }
355 }
356 return 0;
357 }
358
359 // call CoCreat when task creat
CoCreat(ffrt::CPUEUTask * task)360 static inline int CoCreat(ffrt::CPUEUTask* task)
361 {
362 CoAlloc(task);
363 if (GetCoEnv()->runningCo == nullptr) { // retry once if alloc failed
364 CoAlloc(task);
365 if (GetCoEnv()->runningCo == nullptr) { // retry still failed
366 FFRT_LOGE("alloc co routine failed");
367 return -1;
368 }
369 }
370 BindNewCoRoutione(task);
371 auto co = task->coRoutine;
372 if (co->status.load() == static_cast<int>(CoStatus::CO_UNINITIALIZED)) {
373 co2_init_context(&co->ctx, CoStartEntry, static_cast<void*>(co), co->stkMem.stk, co->stkMem.size);
374 }
375 return 0;
376 }
377
CoSwitchInTransaction(ffrt::CPUEUTask * task)378 static inline void CoSwitchInTransaction(ffrt::CPUEUTask* task)
379 {
380 if (task->cpuBoostCtxId >= 0) {
381 CpuBoostRestore(task->cpuBoostCtxId);
382 }
383 }
384
CoSwitchOutTransaction(ffrt::CPUEUTask * task)385 static inline void CoSwitchOutTransaction(ffrt::CPUEUTask* task)
386 {
387 if (task->cpuBoostCtxId >= 0) {
388 CpuBoostSave(task->cpuBoostCtxId);
389 }
390 }
391
CoBboxPreCheck(ffrt::CPUEUTask * task)392 static inline bool CoBboxPreCheck(ffrt::CPUEUTask* task)
393 {
394 if (task->coRoutine) {
395 int ret = task->coRoutine->status.exchange(static_cast<int>(CoStatus::CO_RUNNING));
396 if (ret == static_cast<int>(CoStatus::CO_RUNNING) && GetBboxEnableState() != 0) {
397 FFRT_LOGE("executed by worker suddenly, ignore backtrace");
398 return false;
399 }
400 }
401
402 return true;
403 }
404
405 // called by thread work
CoStart(ffrt::CPUEUTask * task,CoRoutineEnv * coRoutineEnv)406 int CoStart(ffrt::CPUEUTask* task, CoRoutineEnv* coRoutineEnv)
407 {
408 if (!CoBboxPreCheck(task)) {
409 return 0;
410 }
411 if (CoCreat(task) != 0) {
412 return -1;
413 }
414 auto co = task->coRoutine;
415
416 FFRTTraceRecord::TaskRun(task->GetQos(), task);
417
418 for (;;) {
419 ffrt::TaskLoadTracking::Begin(task);
420 #ifdef FFRT_ASYNC_STACKTRACE
421 FFRTSetStackId(task->stackId);
422 #endif
423 #ifdef ENABLE_HITRACE_CHAIN
424 if (task->traceId_.valid == HITRACE_ID_VALID) {
425 HiTraceChainRestoreId(&task->traceId_);
426 }
427 #endif
428 FFRT_TASK_BEGIN(task->label, task->gid);
429 if (task->type == ffrt_normal_task) {
430 task->UpdateState(ffrt::TaskState::RUNNING);
431 }
432 CoSwitchInTransaction(task);
433 #ifdef FFRT_TASK_LOCAL_ENABLE
434 SwitchTsdToTask(co->task);
435 #endif
436 #ifdef ASAN_MODE
437 /* thread to co start */
438 __sanitizer_start_switch_fiber((void **)&co->asanFakeStack, GetCoStackAddr(co), co->stkMem.size);
439 #endif
440 /* thread switch to co */
441 CoSwitch(&co->thEnv->schCtx, &co->ctx);
442 #ifdef ASAN_MODE
443 /* co to thread finish */
444 __sanitizer_finish_switch_fiber(co->asanFakeStack, (const void **)&co->asanFiberAddr, &co->asanFiberSize);
445 #endif
446 FFRT_TASK_END();
447 #ifdef ENABLE_HITRACE_CHAIN
448 task->traceId_ = HiTraceChainGetId();
449 if (task->traceId_.valid == HITRACE_ID_VALID) {
450 HiTraceChainClearId();
451 }
452 #endif
453 ffrt::TaskLoadTracking::End(task); // Todo: deal with CoWait()
454 CoStackCheck(co);
455
456 // 1. coroutine task done, exit normally, need to exec next coroutine task
457 if (co->isTaskDone) {
458 task->UpdateState(ffrt::TaskState::EXITED);
459 co->isTaskDone = false;
460 return 0;
461 }
462
463 // 2. couroutine task block, switch to thread
464 // need suspend the coroutine task or continue to execute the coroutine task.
465 auto pending = coRoutineEnv->pending;
466 if (pending == nullptr) {
467 return 0;
468 }
469 coRoutineEnv->pending = nullptr;
470 FFRTTraceRecord::TaskCoSwitchOut(task);
471 // Fast path: skip state transition
472 if ((*pending)(task)) {
473 // The ownership of the task belongs to other host(cv/mutex/epoll etc)
474 // And the task cannot be accessed any more.
475 return 0;
476 }
477 FFRT_WAKE_TRACER(task->gid); // fast path wk
478 coRoutineEnv->runningCo = co;
479 }
480 }
481
482 // called by thread work
CoYield(void)483 void CoYield(void)
484 {
485 CoRoutine* co = static_cast<CoRoutine*>(GetCoEnv()->runningCo);
486 co->status.store(static_cast<int>(CoStatus::CO_NOT_FINISH));
487 GetCoEnv()->runningCo = nullptr;
488 CoSwitchOutTransaction(co->task);
489 if (co->task->type == ffrt_normal_task) {
490 co->task->UpdateState(ffrt::TaskState::BLOCKED);
491 }
492 FFRT_BLOCK_MARKER(co->task->gid);
493 #ifdef FFRT_TASK_LOCAL_ENABLE
494 SwitchTsdToThread(co->task);
495 #endif
496 CoStackCheck(co);
497 #ifdef ASAN_MODE
498 /* co to thread start */
499 __sanitizer_start_switch_fiber((void **)&co->asanFakeStack, co->asanFiberAddr, co->asanFiberSize);
500 #endif
501 /* co switch to thread */
502 CoSwitch(&co->ctx, &GetCoEnv()->schCtx);
503 #ifdef ASAN_MODE
504 /* thread to co finish */
505 __sanitizer_finish_switch_fiber(co->asanFakeStack, (const void**)&co->asanFiberAddr, &co->asanFiberSize);
506 #else
507 while (GetBboxEnableState() != 0) {
508 if (GetBboxEnableState() != gettid()) {
509 BboxFreeze(); // freeze non-crash thread
510 return;
511 }
512 const int IGNORE_DEPTH = 3;
513 backtrace(IGNORE_DEPTH);
514 co->status.store(static_cast<int>(CoStatus::CO_NOT_FINISH)); // recovery to old state
515 CoExit(co, co->task->type == ffrt_normal_task);
516 }
517 #endif
518 }
519
CoWait(const std::function<bool (ffrt::CPUEUTask *)> & pred)520 void CoWait(const std::function<bool(ffrt::CPUEUTask*)>& pred)
521 {
522 GetCoEnv()->pending = &pred;
523 CoYield();
524 }
525
CoWake(ffrt::CPUEUTask * task,CoWakeType type)526 void CoWake(ffrt::CPUEUTask* task, CoWakeType type)
527 {
528 if (task == nullptr) {
529 FFRT_LOGE("task is nullptr");
530 return;
531 }
532 // Fast path: state transition without lock
533 task->coWakeType = type;
534 FFRT_WAKE_TRACER(task->gid);
535 switch (task->type) {
536 case ffrt_normal_task: {
537 task->UpdateState(ffrt::TaskState::READY);
538 break;
539 }
540 case ffrt_queue_task: {
541 QueueTask* sTask = reinterpret_cast<QueueTask*>(task);
542 auto handle = sTask->GetHandler();
543 handle->TransferTask(sTask);
544 break;
545 }
546 default: {
547 FFRT_LOGE("CoWake unsupport task[%lu], type=%d, name[%s]", task->gid, task->type, task->label.c_str());
548 break;
549 }
550 }
551 }
552
Instance()553 CoRoutineFactory &CoRoutineFactory::Instance()
554 {
555 static CoRoutineFactory fac;
556 return fac;
557 }