1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "co_routine.h"
17 #include <cstdio>
18 #include <cstdlib>
19 #include <cstring>
20 #include <securec.h>
21 #include <string>
22 #include <sys/mman.h>
23 #include "ffrt_trace.h"
24 #include "dm/dependence_manager.h"
25 #include "core/entity.h"
26 #include "tm/queue_task.h"
27 #include "sched/scheduler.h"
28 #include "sync/sync.h"
29 #include "util/slab.h"
30 #include "sched/sched_deadline.h"
31 #include "sync/perf_counter.h"
32 #include "sync/io_poller.h"
33 #include "dfx/bbox/bbox.h"
34 #include "dfx/trace_record/ffrt_trace_record.h"
35 #include "co_routine_factory.h"
36 #include "util/ffrt_facade.h"
37 #ifdef FFRT_TASK_LOCAL_ENABLE
38 #include "pthread_ffrt.h"
39 #endif
40 #ifdef FFRT_ASYNC_STACKTRACE
41 #include "dfx/async_stack/ffrt_async_stack.h"
42 #ifdef FFRT_TASK_LOCAL_ENABLE
43 #include "pthread_ffrt.h"
44 #endif
45 #endif
46
47 using namespace ffrt;
48
CoStackCheck(CoRoutine * co)49 static inline void CoStackCheck(CoRoutine* co)
50 {
51 if (unlikely(co->stkMem.magic != STACK_MAGIC)) {
52 FFRT_LOGE("sp offset:%p.\n", co->stkMem.stk +
53 co->stkMem.size - co->ctx.regs[REG_SP]);
54 FFRT_LOGE("stack over flow, check local variable in you tasks or use api 'ffrt_task_attr_set_stack_size'.\n");
55 }
56 }
57
// Declared here, defined outside this file; used together with g_coThreadTlsKey
// by the task-local TSD helpers.
extern pthread_key_t g_executeCtxTlsKey;
// Guard handed to pthread_once() so that MakeCoEnvTlsKey() runs a single time.
pthread_once_t g_coThreadTlsKeyOnce = PTHREAD_ONCE_INIT;
// Key of the thread-specific slot that holds the calling thread's CoRoutineEnv pointer.
pthread_key_t g_coThreadTlsKey = 0;
CoEnvDestructor(void * args)61 void CoEnvDestructor(void* args)
62 {
63 auto coEnv = static_cast<CoRoutineEnv*>(args);
64 if (coEnv) {
65 delete coEnv;
66 }
67 }
68
MakeCoEnvTlsKey()69 void MakeCoEnvTlsKey()
70 {
71 pthread_key_create(&g_coThreadTlsKey, CoEnvDestructor);
72 }
73
GetCoEnv()74 CoRoutineEnv* GetCoEnv()
75 {
76 CoRoutineEnv* coEnv = nullptr;
77 pthread_once(&g_coThreadTlsKeyOnce, MakeCoEnvTlsKey);
78
79 void *curTls = pthread_getspecific(g_coThreadTlsKey);
80 if (curTls != nullptr) {
81 coEnv = reinterpret_cast<CoRoutineEnv *>(curTls);
82 } else {
83 coEnv = new CoRoutineEnv();
84 pthread_setspecific(g_coThreadTlsKey, coEnv);
85 }
86 return coEnv;
87 }
88
89 #ifdef FFRT_TASK_LOCAL_ENABLE
90 namespace {
IsTaskLocalEnable(ffrt::CPUEUTask * task)91 bool IsTaskLocalEnable(ffrt::CPUEUTask* task)
92 {
93 if ((task->type != ffrt_normal_task) || (!task->taskLocal)) {
94 return false;
95 }
96
97 if (task->tsd == nullptr) {
98 FFRT_LOGE("taskLocal enabled but task tsd invalid");
99 return false;
100 }
101
102 return true;
103 }
104
InitWorkerTsdValueToTask(void ** taskTsd)105 void InitWorkerTsdValueToTask(void** taskTsd)
106 {
107 const pthread_key_t updKeyMap[] = {g_executeCtxTlsKey, g_coThreadTlsKey};
108 auto threadTsd = pthread_gettsd();
109 for (const auto& key : updKeyMap) {
110 FFRT_UNLIKELY_COND_DO_ABORT(key <= 0, "key[%d] invalid", key);
111 auto addr = threadTsd[key];
112 if (addr) {
113 taskTsd[key] = addr;
114 }
115 }
116 }
117
SwitchTsdAddrToTask(ffrt::CPUEUTask * task)118 void SwitchTsdAddrToTask(ffrt::CPUEUTask* task)
119 {
120 auto threadTsd = pthread_gettsd();
121 task->threadTsd = threadTsd;
122 pthread_settsd(task->tsd);
123 }
124
SwitchTsdToTask(ffrt::CPUEUTask * task)125 void SwitchTsdToTask(ffrt::CPUEUTask* task)
126 {
127 if (!IsTaskLocalEnable(task)) {
128 return;
129 }
130
131 InitWorkerTsdValueToTask(task->tsd);
132
133 SwitchTsdAddrToTask(task);
134
135 task->runningTid.store(pthread_self());
136 FFRT_LOGD("switch tsd to task Success");
137 }
138
SwitchTsdAddrToThread(ffrt::CPUEUTask * task)139 bool SwitchTsdAddrToThread(ffrt::CPUEUTask* task)
140 {
141 if (!task->threadTsd) {
142 return false;
143 }
144 pthread_settsd(task->threadTsd);
145 task->threadTsd = nullptr;
146 return true;
147 }
148
UpdateWorkerTsdValueToThread(void ** taskTsd)149 void UpdateWorkerTsdValueToThread(void** taskTsd)
150 {
151 const pthread_key_t updKeyMap[] = {g_executeCtxTlsKey, g_coThreadTlsKey};
152 auto threadTsd = pthread_gettsd();
153 for (const auto& key : updKeyMap) {
154 FFRT_UNLIKELY_COND_DO_ABORT(key <= 0, "key[%d] invalid", key);
155 auto threadVal = threadTsd[key];
156 auto taskVal = taskTsd[key];
157 if (!threadVal && taskVal) {
158 threadTsd[key] = taskVal;
159 } else {
160 FFRT_UNLIKELY_COND_DO_ABORT((threadVal && taskVal && (threadVal != taskVal)),
161 "mismatch key=[%d]", key);
162 FFRT_UNLIKELY_COND_DO_ABORT((threadVal && !taskVal),
163 "unexpected: thread exists but task not exists, key=[%d]", key);
164 }
165 taskTsd[key] = nullptr;
166 }
167 }
168
SwitchTsdToThread(ffrt::CPUEUTask * task)169 void SwitchTsdToThread(ffrt::CPUEUTask* task)
170 {
171 if (!IsTaskLocalEnable(task)) {
172 return;
173 }
174
175 if (!SwitchTsdAddrToThread(task)) {
176 return;
177 }
178
179 UpdateWorkerTsdValueToThread(task->tsd);
180
181 task->runningTid.store(0);
182 FFRT_LOGD("switch tsd to thread Success");
183 }
184
TaskTsdRunDtors(ffrt::CPUEUTask * task)185 void TaskTsdRunDtors(ffrt::CPUEUTask* task)
186 {
187 SwitchTsdAddrToTask(task);
188 pthread_tsd_run_dtors();
189 SwitchTsdAddrToThread(task);
190 }
191 } // namespace
192
TaskTsdDeconstruct(ffrt::CPUEUTask * task)193 void TaskTsdDeconstruct(ffrt::CPUEUTask* task)
194 {
195 if (!IsTaskLocalEnable(task)) {
196 return;
197 }
198
199 TaskTsdRunDtors(task);
200 if (task->tsd != nullptr) {
201 free(task->tsd);
202 task->tsd = nullptr;
203 task->taskLocal = false;
204 }
205 FFRT_LOGD("tsd deconstruct done, task[%lu], name[%s]", task->gid, task->label.c_str());
206 }
207 #endif
208
CoSwitch(CoCtx * from,CoCtx * to)209 static inline void CoSwitch(CoCtx* from, CoCtx* to)
210 {
211 co2_switch_context(from, to);
212 }
213
CoExit(CoRoutine * co,bool isNormalTask)214 static inline void CoExit(CoRoutine* co, bool isNormalTask)
215 {
216 #ifdef FFRT_TASK_LOCAL_ENABLE
217 if (isNormalTask) {
218 SwitchTsdToThread(co->task);
219 }
220 #endif
221 CoStackCheck(co);
222 #ifdef ASAN_MODE
223 /* co to thread start */
224 __sanitizer_start_switch_fiber((void **)&co->asanFakeStack, co->asanFiberAddr, co->asanFiberSize);
225 /* clear remaining shadow stack */
226 __asan_handle_no_return();
227 #endif
228 /* co switch to thread, and do not switch back again */
229 CoSwitch(&co->ctx, &co->thEnv->schCtx);
230 }
231
CoStartEntry(void * arg)232 static inline void CoStartEntry(void* arg)
233 {
234 CoRoutine* co = reinterpret_cast<CoRoutine*>(arg);
235 #ifdef ASAN_MODE
236 /* thread to co finish first */
237 __sanitizer_finish_switch_fiber(co->asanFakeStack, (const void**)&co->asanFiberAddr, &co->asanFiberSize);
238 #endif
239 ffrt::CPUEUTask* task = co->task;
240 bool isNormalTask = false;
241 switch (task->type) {
242 case ffrt_normal_task: {
243 isNormalTask = true;
244 task->Execute();
245 break;
246 }
247 case ffrt_queue_task: {
248 QueueTask* sTask = reinterpret_cast<QueueTask*>(task);
249 // Before the batch execution is complete, head node cannot be released.
250 sTask->IncDeleteRef();
251 sTask->Execute();
252 sTask->DecDeleteRef();
253 break;
254 }
255 default: {
256 FFRT_LOGE("CoStart unsupport task[%lu], type=%d, name[%s]", task->gid, task->type, task->label.c_str());
257 break;
258 }
259 }
260
261 co->status.store(static_cast<int>(CoStatus::CO_UNINITIALIZED));
262 CoExit(co, isNormalTask);
263 }
264
CoSetStackProt(CoRoutine * co,int prot)265 static void CoSetStackProt(CoRoutine* co, int prot)
266 {
267 /* set the attribute of the page table closest to the stack top in the user stack to read-only,
268 * and 1~2 page table space will be wasted
269 */
270 size_t p_size = getpagesize();
271 uint64_t mp = reinterpret_cast<uint64_t>(co->stkMem.stk);
272 mp = (mp + p_size - 1) / p_size * p_size;
273 int ret = mprotect(reinterpret_cast<void *>(static_cast<uintptr_t>(mp)), p_size, prot);
274 FFRT_UNLIKELY_COND_DO_ABORT(ret < 0, "coroutine size:%lu, mp:0x%lx, page_size:%zu,result:%d,prot:%d, err:%d,%s",
275 static_cast<unsigned long>(sizeof(struct CoRoutine)), static_cast<unsigned long>(mp),
276 p_size, ret, prot, errno, strerror(errno));
277 }
278
AllocNewCoRoutine(size_t stackSize)279 static inline CoRoutine* AllocNewCoRoutine(size_t stackSize)
280 {
281 std::size_t defaultStackSize = FFRTFacade::GetCSAInstance()->size;
282 CoRoutine* co = nullptr;
283 if (likely(stackSize == defaultStackSize)) {
284 co = ffrt::CoRoutineAllocMem(stackSize);
285 } else {
286 co = static_cast<CoRoutine*>(mmap(nullptr, stackSize,
287 PROT_READ | PROT_WRITE, MAP_ANONYMOUS | MAP_PRIVATE, -1, 0));
288 if (co == reinterpret_cast<CoRoutine*>(MAP_FAILED)) {
289 FFRT_LOGE("memory mmap failed.");
290 return nullptr;
291 }
292 }
293 if (!co) {
294 FFRT_LOGE("memory not enough");
295 return nullptr;
296 }
297 co->allocatedSize = stackSize;
298 co->stkMem.size = static_cast<uint64_t>(stackSize - sizeof(CoRoutine) + 8);
299 co->stkMem.magic = STACK_MAGIC;
300 if (FFRTFacade::GetCSAInstance()->type == CoStackProtectType::CO_STACK_STRONG_PROTECT) {
301 CoSetStackProt(co, PROT_READ);
302 }
303 co->status.store(static_cast<int>(CoStatus::CO_UNINITIALIZED));
304 return co;
305 }
306
CoMemFree(CoRoutine * co)307 static inline void CoMemFree(CoRoutine* co)
308 {
309 if (FFRTFacade::GetCSAInstance()->type == CoStackProtectType::CO_STACK_STRONG_PROTECT) {
310 CoSetStackProt(co, PROT_WRITE | PROT_READ);
311 }
312 std::size_t defaultStackSize = FFRTFacade::GetCSAInstance()->size;
313 if (likely(co->allocatedSize == defaultStackSize)) {
314 ffrt::CoRoutineFreeMem(co);
315 } else {
316 int ret = munmap(co, co->allocatedSize);
317 if (ret != 0) {
318 FFRT_LOGE("munmap failed with errno: %d", errno);
319 }
320 }
321 }
322
CoStackFree(void)323 void CoStackFree(void)
324 {
325 if (GetCoEnv()) {
326 if (GetCoEnv()->runningCo) {
327 CoMemFree(GetCoEnv()->runningCo);
328 GetCoEnv()->runningCo = nullptr;
329 }
330 }
331 }
332
CoWorkerExit(void)333 void CoWorkerExit(void)
334 {
335 CoStackFree();
336 }
337
BindNewCoRoutione(ffrt::CPUEUTask * task)338 static inline void BindNewCoRoutione(ffrt::CPUEUTask* task)
339 {
340 task->coRoutine = GetCoEnv()->runningCo;
341 task->coRoutine->task = task;
342 task->coRoutine->thEnv = GetCoEnv();
343 }
344
UnbindCoRoutione(ffrt::CPUEUTask * task)345 static inline void UnbindCoRoutione(ffrt::CPUEUTask* task)
346 {
347 task->coRoutine->task = nullptr;
348 task->coRoutine = nullptr;
349 }
350
CoAlloc(ffrt::CPUEUTask * task)351 static inline int CoAlloc(ffrt::CPUEUTask* task)
352 {
353 if (task->coRoutine) { // use allocated coroutine stack
354 if (GetCoEnv()->runningCo) { // free cached stack if it exist
355 CoMemFree(GetCoEnv()->runningCo);
356 }
357 GetCoEnv()->runningCo = task->coRoutine;
358 } else {
359 if (!GetCoEnv()->runningCo) { // if no cached stack, alloc one
360 GetCoEnv()->runningCo = AllocNewCoRoutine(task->stack_size);
361 } else { // exist cached stack
362 if (GetCoEnv()->runningCo->allocatedSize != task->stack_size) { // stack size not match, alloc one
363 CoMemFree(GetCoEnv()->runningCo); // free cached stack
364 GetCoEnv()->runningCo = AllocNewCoRoutine(task->stack_size);
365 }
366 }
367 }
368 return 0;
369 }
370
371 // call CoCreat when task creat
CoCreat(ffrt::CPUEUTask * task)372 static inline int CoCreat(ffrt::CPUEUTask* task)
373 {
374 CoAlloc(task);
375 if (GetCoEnv()->runningCo == nullptr) {
376 return -1;
377 }
378 BindNewCoRoutione(task);
379 auto co = task->coRoutine;
380 if (co->status.load() == static_cast<int>(CoStatus::CO_UNINITIALIZED)) {
381 co2_init_context(&co->ctx, CoStartEntry, static_cast<void*>(co), co->stkMem.stk, co->stkMem.size);
382 }
383 return 0;
384 }
385
CoSwitchInTransaction(ffrt::CPUEUTask * task)386 static inline void CoSwitchInTransaction(ffrt::CPUEUTask* task)
387 {
388 if (task->coRoutine->status == static_cast<int>(CoStatus::CO_NOT_FINISH)) {
389 for (auto& name : task->traceTag) {
390 FFRT_TRACE_BEGIN(name.c_str());
391 }
392 }
393 FFRT_FAKE_TRACE_MARKER(task->gid);
394 }
395
CoSwitchOutTransaction(ffrt::CPUEUTask * task)396 static inline void CoSwitchOutTransaction(ffrt::CPUEUTask* task)
397 {
398 FFRT_FAKE_TRACE_MARKER(task->gid);
399 int traceTagNum = static_cast<int>(task->traceTag.size());
400 for (int i = 0; i < traceTagNum; ++i) {
401 FFRT_TRACE_END();
402 }
403 }
404
CoBboxPreCheck(ffrt::CPUEUTask * task)405 static inline bool CoBboxPreCheck(ffrt::CPUEUTask* task)
406 {
407 if (task->coRoutine) {
408 int ret = task->coRoutine->status.exchange(static_cast<int>(CoStatus::CO_RUNNING));
409 if (ret == static_cast<int>(CoStatus::CO_RUNNING) && GetBboxEnableState() != 0) {
410 FFRT_LOGE("executed by worker suddenly, ignore backtrace");
411 return false;
412 }
413 }
414
415 return true;
416 }
417
418 // called by thread work
CoStart(ffrt::CPUEUTask * task,CoRoutineEnv * coRoutineEnv)419 int CoStart(ffrt::CPUEUTask* task, CoRoutineEnv* coRoutineEnv)
420 {
421 if (!CoBboxPreCheck(task)) {
422 return 0;
423 }
424
425 if (CoCreat(task) != 0) {
426 return -1;
427 }
428 auto co = task->coRoutine;
429
430 FFRTTraceRecord::TaskRun(task->GetQos(), task);
431
432 for (;;) {
433 ffrt::TaskLoadTracking::Begin(task);
434 #ifdef FFRT_ASYNC_STACKTRACE
435 FFRTSetStackId(task->stackId);
436 #endif
437 FFRT_TASK_BEGIN(task->label, task->gid);
438 if (task->type == ffrt_normal_task) {
439 task->UpdateState(ffrt::TaskState::RUNNING);
440 }
441 CoSwitchInTransaction(task);
442 #ifdef FFRT_TASK_LOCAL_ENABLE
443 SwitchTsdToTask(co->task);
444 #endif
445 #ifdef ASAN_MODE
446 /* thread to co start */
447 __sanitizer_start_switch_fiber((void **)&co->asanFakeStack, GetCoStackAddr(co), co->stkMem.size);
448 #endif
449 /* thread switch to co */
450 CoSwitch(&co->thEnv->schCtx, &co->ctx);
451 #ifdef ASAN_MODE
452 /* co to thread finish */
453 __sanitizer_finish_switch_fiber(co->asanFakeStack, (const void**)&co->asanFiberAddr, &co->asanFiberSize);
454 #endif
455 FFRT_TASK_END();
456 ffrt::TaskLoadTracking::End(task); // Todo: deal with CoWait()
457 CoStackCheck(co);
458
459 // 1. coroutine task done, exit normally, need to exec next coroutine task
460 if (co->isTaskDone) {
461 task->UpdateState(ffrt::TaskState::EXITED);
462 co->isTaskDone = false;
463 return 0;
464 }
465
466 // 2. couroutine task block, switch to thread
467 // need suspend the coroutine task or continue to execute the coroutine task.
468 auto pending = coRoutineEnv->pending;
469 if (pending == nullptr) {
470 return 0;
471 }
472 coRoutineEnv->pending = nullptr;
473 FFRTTraceRecord::TaskCoSwitchOut(task);
474 // Fast path: skip state transition
475 if ((*pending)(task)) {
476 // The ownership of the task belongs to other host(cv/mutex/epoll etc)
477 // And the task cannot be accessed any more.
478 return 0;
479 }
480 FFRT_WAKE_TRACER(task->gid); // fast path wk
481 coRoutineEnv->runningCo = co;
482 }
483 }
484
485 // called by thread work
CoYield(void)486 void CoYield(void)
487 {
488 CoRoutine* co = static_cast<CoRoutine*>(GetCoEnv()->runningCo);
489 co->status.store(static_cast<int>(CoStatus::CO_NOT_FINISH));
490 GetCoEnv()->runningCo = nullptr;
491 CoSwitchOutTransaction(co->task);
492 if (co->task->type == ffrt_normal_task) {
493 co->task->UpdateState(ffrt::TaskState::BLOCKED);
494 }
495 FFRT_BLOCK_MARKER(co->task->gid);
496 #ifdef FFRT_TASK_LOCAL_ENABLE
497 SwitchTsdToThread(co->task);
498 #endif
499 CoStackCheck(co);
500 #ifdef ASAN_MODE
501 /* co to thread start */
502 __sanitizer_start_switch_fiber((void **)&co->asanFakeStack, co->asanFiberAddr, co->asanFiberSize);
503 #endif
504 /* co switch to thread */
505 CoSwitch(&co->ctx, &GetCoEnv()->schCtx);
506 #ifdef ASAN_MODE
507 /* thread to co finish */
508 __sanitizer_finish_switch_fiber(co->asanFakeStack, (const void**)&co->asanFiberAddr, &co->asanFiberSize);
509 #else
510 while (GetBboxEnableState() != 0) {
511 if (GetBboxEnableState() != gettid()) {
512 BboxFreeze(); // freeze non-crash thread
513 return;
514 }
515 const int IGNORE_DEPTH = 3;
516 backtrace(IGNORE_DEPTH);
517 co->status.store(static_cast<int>(CoStatus::CO_NOT_FINISH)); // recovery to old state
518 CoExit(co, co->task->type == ffrt_normal_task);
519 }
520 #endif
521 }
522
CoWait(const std::function<bool (ffrt::CPUEUTask *)> & pred)523 void CoWait(const std::function<bool(ffrt::CPUEUTask*)>& pred)
524 {
525 GetCoEnv()->pending = &pred;
526 CoYield();
527 }
528
CoWake(ffrt::CPUEUTask * task,bool timeOut)529 void CoWake(ffrt::CPUEUTask* task, bool timeOut)
530 {
531 if (task == nullptr) {
532 FFRT_LOGE("task is nullptr");
533 return;
534 }
535 // Fast path: state transition without lock
536 task->wakeupTimeOut = timeOut;
537 FFRT_WAKE_TRACER(task->gid);
538 switch (task->type) {
539 case ffrt_normal_task: {
540 task->UpdateState(ffrt::TaskState::READY);
541 break;
542 }
543 case ffrt_queue_task: {
544 QueueTask* sTask = reinterpret_cast<QueueTask*>(task);
545 auto handle = sTask->GetHandler();
546 handle->TransferTask(sTask);
547 break;
548 }
549 default: {
550 FFRT_LOGE("CoWake unsupport task[%lu], type=%d, name[%s]", task->gid, task->type, task->label.c_str());
551 break;
552 }
553 }
554 }
555
Instance()556 CoRoutineFactory &CoRoutineFactory::Instance()
557 {
558 static CoRoutineFactory fac;
559 return fac;
560 }
561