• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "co_routine.h"
17 #include <cstdio>
18 #include <cstdlib>
19 #include <cstring>
20 #include <securec.h>
21 #include <string>
22 #include <sys/mman.h>
23 #include "util/cpu_boost_wrapper.h"
24 #include "ffrt_trace.h"
25 #include "dm/dependence_manager.h"
26 #include "core/entity.h"
27 #include "tm/queue_task.h"
28 #include "sched/scheduler.h"
29 #include "sync/sync.h"
30 #include "util/slab.h"
31 #include "sched/sched_deadline.h"
32 #include "sync/perf_counter.h"
33 #include "sync/io_poller.h"
34 #include "dfx/bbox/bbox.h"
35 #include "dfx/trace_record/ffrt_trace_record.h"
36 #include "co_routine_factory.h"
37 #include "util/ffrt_facade.h"
38 #ifdef FFRT_TASK_LOCAL_ENABLE
39 #include "pthread_ffrt.h"
40 #endif
41 #ifdef FFRT_ASYNC_STACKTRACE
42 #include "dfx/async_stack/ffrt_async_stack.h"
43 #ifdef FFRT_TASK_LOCAL_ENABLE
44 #include "pthread_ffrt.h"
45 #endif
46 #endif
47 
48 using namespace ffrt;
49 
CoStackCheck(CoRoutine * co)50 static inline void CoStackCheck(CoRoutine* co)
51 {
52     if (unlikely(co->stkMem.magic != STACK_MAGIC)) {
53         FFRT_LOGE("sp offset:%p.\n", co->stkMem.stk +
54             co->stkMem.size - co->ctx.regs[FFRT_REG_SP]);
55         FFRT_LOGE("stack over flow, check local variable in you tasks or use api 'ffrt_task_attr_set_stack_size'.\n");
56         if (ExecuteCtx::Cur()->task != nullptr) {
57             auto curTask = ExecuteCtx::Cur()->task;
58             FFRT_LOGE("task name[%s], gid[%lu], submit_tid[%d]",
59                 curTask->label.c_str(), curTask->gid, curTask->fromTid);
60         }
61         abort();
62     }
63 }
64 
// TLS key defined in another translation unit; only referenced here by the task-local TSD helpers.
extern pthread_key_t g_executeCtxTlsKey;
// TLS key under which each thread's CoRoutineEnv is stored (created once by MakeCoEnvTlsKey).
pthread_key_t g_coThreadTlsKey = 0;
// pthread_once guard ensuring g_coThreadTlsKey is created a single time (see GetCoEnv).
pthread_once_t g_coThreadTlsKeyOnce = PTHREAD_ONCE_INIT;
CoEnvDestructor(void * args)68 void CoEnvDestructor(void* args)
69 {
70     auto coEnv = static_cast<CoRoutineEnv*>(args);
71     if (coEnv) {
72         delete coEnv;
73     }
74 }
75 
MakeCoEnvTlsKey()76 void MakeCoEnvTlsKey()
77 {
78     pthread_key_create(&g_coThreadTlsKey, CoEnvDestructor);
79 }
80 
GetCoEnv()81 CoRoutineEnv* GetCoEnv()
82 {
83     CoRoutineEnv* coEnv = nullptr;
84     pthread_once(&g_coThreadTlsKeyOnce, MakeCoEnvTlsKey);
85 
86     void *curTls = pthread_getspecific(g_coThreadTlsKey);
87     if (curTls != nullptr) {
88         coEnv = reinterpret_cast<CoRoutineEnv *>(curTls);
89     } else {
90         coEnv = new CoRoutineEnv();
91         pthread_setspecific(g_coThreadTlsKey, coEnv);
92     }
93     return coEnv;
94 }
95 
96 #ifdef FFRT_TASK_LOCAL_ENABLE
97 namespace {
IsTaskLocalEnable(ffrt::CPUEUTask * task)98 bool IsTaskLocalEnable(ffrt::CPUEUTask* task)
99 {
100     if ((task->type != ffrt_normal_task) || (!task->taskLocal)) {
101         return false;
102     }
103 
104     if (task->tsd == nullptr) {
105         FFRT_LOGE("taskLocal enabled but task tsd invalid");
106         return false;
107     }
108 
109     return true;
110 }
111 
InitWorkerTsdValueToTask(void ** taskTsd)112 void InitWorkerTsdValueToTask(void** taskTsd)
113 {
114     const pthread_key_t updKeyMap[] = {g_executeCtxTlsKey, g_coThreadTlsKey};
115     auto threadTsd = pthread_gettsd();
116     for (const auto& key : updKeyMap) {
117         FFRT_UNLIKELY_COND_DO_ABORT(key <= 0, "FFRT abort: key[%u] invalid", key);
118         auto addr = threadTsd[key];
119         if (addr) {
120             taskTsd[key] = addr;
121         }
122     }
123 }
124 
SwitchTsdAddrToTask(ffrt::CPUEUTask * task)125 void SwitchTsdAddrToTask(ffrt::CPUEUTask* task)
126 {
127     auto threadTsd = pthread_gettsd();
128     task->threadTsd = threadTsd;
129     pthread_settsd(task->tsd);
130 }
131 
SwitchTsdToTask(ffrt::CPUEUTask * task)132 void SwitchTsdToTask(ffrt::CPUEUTask* task)
133 {
134     if (!IsTaskLocalEnable(task)) {
135         return;
136     }
137 
138     InitWorkerTsdValueToTask(task->tsd);
139 
140     SwitchTsdAddrToTask(task);
141 
142     task->runningTid.store(pthread_self());
143     FFRT_LOGD("switch tsd to task Success");
144 }
145 
SwitchTsdAddrToThread(ffrt::CPUEUTask * task)146 bool SwitchTsdAddrToThread(ffrt::CPUEUTask* task)
147 {
148     if (!task->threadTsd) {
149         return false;
150     }
151     pthread_settsd(task->threadTsd);
152     task->threadTsd = nullptr;
153     return true;
154 }
155 
UpdateWorkerTsdValueToThread(void ** taskTsd)156 void UpdateWorkerTsdValueToThread(void** taskTsd)
157 {
158     const pthread_key_t updKeyMap[] = {g_executeCtxTlsKey, g_coThreadTlsKey};
159     auto threadTsd = pthread_gettsd();
160     for (const auto& key : updKeyMap) {
161         FFRT_UNLIKELY_COND_DO_ABORT(key <= 0, "FFRT abort: key[%u] invalid", key);
162         auto threadVal = threadTsd[key];
163         auto taskVal = taskTsd[key];
164         if (!threadVal && taskVal) {
165             threadTsd[key] = taskVal;
166         } else {
167             FFRT_UNLIKELY_COND_DO_ABORT((threadVal && taskVal && (threadVal != taskVal)),
168                 "FFRT abort: mismatch key=[%u]", key);
169             FFRT_UNLIKELY_COND_DO_ABORT((threadVal && !taskVal),
170                 "FFRT abort: unexpected: thread exists but task not exists, key=[%u]", key);
171         }
172         taskTsd[key] = nullptr;
173     }
174 }
175 
SwitchTsdToThread(ffrt::CPUEUTask * task)176 void SwitchTsdToThread(ffrt::CPUEUTask* task)
177 {
178     if (!IsTaskLocalEnable(task)) {
179         return;
180     }
181 
182     if (!SwitchTsdAddrToThread(task)) {
183         return;
184     }
185 
186     UpdateWorkerTsdValueToThread(task->tsd);
187 
188     task->runningTid.store(0);
189     FFRT_LOGD("switch tsd to thread Success");
190 }
191 
TaskTsdRunDtors(ffrt::CPUEUTask * task)192 void TaskTsdRunDtors(ffrt::CPUEUTask* task)
193 {
194     SwitchTsdAddrToTask(task);
195     pthread_tsd_run_dtors();
196     SwitchTsdAddrToThread(task);
197 }
198 } // namespace
199 
TaskTsdDeconstruct(ffrt::CPUEUTask * task)200 void TaskTsdDeconstruct(ffrt::CPUEUTask* task)
201 {
202     if (!IsTaskLocalEnable(task)) {
203         return;
204     }
205 
206     TaskTsdRunDtors(task);
207     if (task->tsd != nullptr) {
208         free(task->tsd);
209         task->tsd = nullptr;
210         task->taskLocal = false;
211     }
212     FFRT_LOGD("tsd deconstruct done, task[%lu], name[%s]", task->gid, task->label.c_str());
213 }
214 #endif
215 
/*
 * Thin wrapper around co2_switch_context: transfer execution from context
 * `from` to context `to`.
 */
static inline void CoSwitch(CoCtx* from, CoCtx* to)
{
    co2_switch_context(from, to);
}
220 
/*
 * Leave the coroutine for good and switch to the scheduler context of the
 * thread env it is bound to (co->thEnv->schCtx).
 *
 * co           - the coroutine being left.
 * isNormalTask - when true (and FFRT_TASK_LOCAL_ENABLE is defined) the thread's
 *                TSD table is restored first via SwitchTsdToThread.
 *
 * The stack magic is checked before switching; CoStackCheck aborts on corruption.
 */
static inline void CoExit(CoRoutine* co, bool isNormalTask)
{
#ifdef FFRT_TASK_LOCAL_ENABLE
    if (isNormalTask) {
        SwitchTsdToThread(co->task);
    }
#endif
    CoStackCheck(co);
#ifdef ASAN_MODE
    /* co to thread start */
    __sanitizer_start_switch_fiber((void **)&co->asanFakeStack, co->asanFiberAddr, co->asanFiberSize);
    /* clear remaining shadow stack */
    __asan_handle_no_return();
#endif
    /* co switch to thread, and do not switch back again */
    CoSwitch(&co->ctx, &co->thEnv->schCtx);
}
238 
/*
 * Entry function installed into a coroutine context by CoCreat (via co2_init_context).
 * arg is the CoRoutine itself. Runs the bound task, marks the coroutine as
 * CO_UNINITIALIZED again, and leaves through CoExit.
 */
static inline void CoStartEntry(void* arg)
{
    CoRoutine* co = reinterpret_cast<CoRoutine*>(arg);
#ifdef ASAN_MODE
    /* thread to co finish first */
    __sanitizer_finish_switch_fiber(co->asanFakeStack, (const void**)&co->asanFiberAddr, &co->asanFiberSize);
#endif
    ffrt::CoTask* task = co->task;
    // task type is sampled before Execute() runs
    // NOTE(review): presumably because the task state may change during Execute() — confirm
    bool isNormalTask = task->type == ffrt_normal_task;
    task->Execute();
    // CoCreat only re-initializes the context of a coroutine in CO_UNINITIALIZED state
    co->status.store(static_cast<int>(CoStatus::CO_UNINITIALIZED));
    CoExit(co, isNormalTask);
}
252 
CoSetStackProt(CoRoutine * co,int prot)253 static void CoSetStackProt(CoRoutine* co, int prot)
254 {
255     /* set the attribute of the page table closest to the stack top in the user stack to read-only,
256      * and 1~2 page table space will be wasted
257      */
258     size_t p_size = getpagesize();
259     uint64_t mp = reinterpret_cast<uint64_t>(co->stkMem.stk);
260     mp = (mp + p_size - 1) / p_size * p_size;
261     int ret = mprotect(reinterpret_cast<void *>(static_cast<uintptr_t>(mp)), p_size, prot);
262     FFRT_UNLIKELY_COND_DO_ABORT(ret < 0, "coroutine size:%lu, mp:0x%lx, page_size:%zu,result:%d,prot:%d, err:%d,%s",
263                                 static_cast<unsigned long>(sizeof(struct CoRoutine)), static_cast<unsigned long>(mp),
264                                 p_size, ret, prot, errno, strerror(errno));
265 }
266 
AllocNewCoRoutine(size_t stackSize)267 static inline CoRoutine* AllocNewCoRoutine(size_t stackSize)
268 {
269     std::size_t defaultStackSize = FFRTFacade::GetCSAInstance()->size;
270     CoRoutine* co = nullptr;
271     if (likely(stackSize == defaultStackSize)) {
272         co = ffrt::CoRoutineAllocMem(stackSize);
273     } else {
274         co = static_cast<CoRoutine*>(mmap(nullptr, stackSize,
275             PROT_READ | PROT_WRITE,  MAP_ANONYMOUS | MAP_PRIVATE, -1, 0));
276         if (co == reinterpret_cast<CoRoutine*>(MAP_FAILED)) {
277             FFRT_LOGE("memory mmap failed.");
278             return nullptr;
279         }
280     }
281     if (!co) {
282         FFRT_LOGE("memory not enough");
283         return nullptr;
284     }
285     co->allocatedSize = stackSize;
286     co->stkMem.size = static_cast<uint64_t>(stackSize - sizeof(CoRoutine) + 8);
287     co->stkMem.magic = STACK_MAGIC;
288     if (FFRTFacade::GetCSAInstance()->type == CoStackProtectType::CO_STACK_STRONG_PROTECT) {
289         CoSetStackProt(co, PROT_READ);
290     }
291     co->status.store(static_cast<int>(CoStatus::CO_UNINITIALIZED));
292     return co;
293 }
294 
CoMemFree(CoRoutine * co)295 static inline void CoMemFree(CoRoutine* co)
296 {
297     if (FFRTFacade::GetCSAInstance()->type == CoStackProtectType::CO_STACK_STRONG_PROTECT) {
298         CoSetStackProt(co, PROT_WRITE | PROT_READ);
299     }
300     std::size_t defaultStackSize = FFRTFacade::GetCSAInstance()->size;
301     if (likely(co->allocatedSize == defaultStackSize)) {
302         ffrt::CoRoutineFreeMem(co);
303     } else {
304         int ret = munmap(co, co->allocatedSize);
305         if (ret != 0) {
306             FFRT_LOGE("munmap failed with errno: %d", errno);
307         }
308     }
309 }
310 
CoStackFree(void)311 void CoStackFree(void)
312 {
313     if (GetCoEnv()) {
314         if (GetCoEnv()->runningCo) {
315             CoMemFree(GetCoEnv()->runningCo);
316             GetCoEnv()->runningCo = nullptr;
317         }
318     }
319 }
320 
/* Worker-exit hook: releases the coroutine cached in the calling thread's env via CoStackFree. */
void CoWorkerExit(void)
{
    CoStackFree();
}
325 
BindNewCoRoutione(ffrt::CPUEUTask * task)326 static inline void BindNewCoRoutione(ffrt::CPUEUTask* task)
327 {
328     task->coRoutine = GetCoEnv()->runningCo;
329     task->coRoutine->task = task;
330     task->coRoutine->thEnv = GetCoEnv();
331 }
332 
UnbindCoRoutione(ffrt::CPUEUTask * task)333 static inline void UnbindCoRoutione(ffrt::CPUEUTask* task)
334 {
335     task->coRoutine->task = nullptr;
336     task->coRoutine = nullptr;
337 }
338 
CoAlloc(ffrt::CPUEUTask * task)339 static inline int CoAlloc(ffrt::CPUEUTask* task)
340 {
341     if (task->coRoutine) { // use allocated coroutine stack
342         if (GetCoEnv()->runningCo) { // free cached stack if it exist
343             CoMemFree(GetCoEnv()->runningCo);
344         }
345         GetCoEnv()->runningCo = task->coRoutine;
346     } else {
347         if (!GetCoEnv()->runningCo) { // if no cached stack, alloc one
348             GetCoEnv()->runningCo = AllocNewCoRoutine(task->stack_size);
349         } else { // exist cached stack
350             if (GetCoEnv()->runningCo->allocatedSize != task->stack_size) { // stack size not match, alloc one
351                 CoMemFree(GetCoEnv()->runningCo); // free cached stack
352                 GetCoEnv()->runningCo = AllocNewCoRoutine(task->stack_size);
353             }
354         }
355     }
356     return 0;
357 }
358 
359 // call CoCreat when task creat
CoCreat(ffrt::CPUEUTask * task)360 static inline int CoCreat(ffrt::CPUEUTask* task)
361 {
362     CoAlloc(task);
363     if (GetCoEnv()->runningCo == nullptr) { // retry once if alloc failed
364         CoAlloc(task);
365         if (GetCoEnv()->runningCo == nullptr) { // retry still failed
366             FFRT_LOGE("alloc co routine failed");
367             return -1;
368         }
369     }
370     BindNewCoRoutione(task);
371     auto co = task->coRoutine;
372     if (co->status.load() == static_cast<int>(CoStatus::CO_UNINITIALIZED)) {
373         co2_init_context(&co->ctx, CoStartEntry, static_cast<void*>(co), co->stkMem.stk, co->stkMem.size);
374     }
375     return 0;
376 }
377 
CoSwitchInTransaction(ffrt::CPUEUTask * task)378 static inline void CoSwitchInTransaction(ffrt::CPUEUTask* task)
379 {
380     if (task->cpuBoostCtxId >= 0) {
381         CpuBoostRestore(task->cpuBoostCtxId);
382     }
383 }
384 
CoSwitchOutTransaction(ffrt::CPUEUTask * task)385 static inline void CoSwitchOutTransaction(ffrt::CPUEUTask* task)
386 {
387     if (task->cpuBoostCtxId >= 0) {
388         CpuBoostSave(task->cpuBoostCtxId);
389     }
390 }
391 
CoBboxPreCheck(ffrt::CPUEUTask * task)392 static inline bool CoBboxPreCheck(ffrt::CPUEUTask* task)
393 {
394     if (task->coRoutine) {
395         int ret = task->coRoutine->status.exchange(static_cast<int>(CoStatus::CO_RUNNING));
396         if (ret == static_cast<int>(CoStatus::CO_RUNNING) && GetBboxEnableState() != 0) {
397             FFRT_LOGE("executed by worker suddenly, ignore backtrace");
398             return false;
399         }
400     }
401 
402     return true;
403 }
404 
// called by thread work
/*
 * Run `task` on a coroutine from the calling worker thread.
 *
 * task         - the task to execute; a coroutine is created/bound for it by CoCreat.
 * coRoutineEnv - the env whose `pending` predicate and `runningCo` slot are used
 *                after the coroutine switches back.
 *
 * Returns -1 when CoCreat fails, 0 in all other cases (including when
 * CoBboxPreCheck rejects the task).
 *
 * Each loop iteration switches into the coroutine and, once control comes back,
 * either returns (task done, no pending predicate, or the predicate took
 * ownership of the task) or loops to resume the same coroutine immediately.
 */
int CoStart(ffrt::CPUEUTask* task, CoRoutineEnv* coRoutineEnv)
{
    if (!CoBboxPreCheck(task)) {
        return 0;
    }
    if (CoCreat(task) != 0) {
        return -1;
    }
    auto co = task->coRoutine;

    FFRTTraceRecord::TaskRun(task->GetQos(), task);

    for (;;) {
        ffrt::TaskLoadTracking::Begin(task);
#ifdef FFRT_ASYNC_STACKTRACE
        FFRTSetStackId(task->stackId);
#endif
#ifdef ENABLE_HITRACE_CHAIN
        // restore the trace id saved when the task last switched out
        if (task->traceId_.valid == HITRACE_ID_VALID) {
            HiTraceChainRestoreId(&task->traceId_);
        }
#endif
        FFRT_TASK_BEGIN(task->label, task->gid);
        if (task->type == ffrt_normal_task) {
            task->UpdateState(ffrt::TaskState::RUNNING);
        }
        CoSwitchInTransaction(task);
#ifdef FFRT_TASK_LOCAL_ENABLE
        SwitchTsdToTask(co->task);
#endif
#ifdef ASAN_MODE
        /* thread to co start */
        __sanitizer_start_switch_fiber((void **)&co->asanFakeStack, GetCoStackAddr(co), co->stkMem.size);
#endif
        /* thread switch to co */
        CoSwitch(&co->thEnv->schCtx, &co->ctx);
        // execution continues here once the coroutine switches back to schCtx (CoYield / CoExit)
#ifdef ASAN_MODE
        /* co to thread finish */
		__sanitizer_finish_switch_fiber(co->asanFakeStack, (const void **)&co->asanFiberAddr, &co->asanFiberSize);
#endif
        FFRT_TASK_END();
#ifdef ENABLE_HITRACE_CHAIN
        // save the task's trace id and clear it from the thread
        task->traceId_ = HiTraceChainGetId();
        if (task->traceId_.valid == HITRACE_ID_VALID) {
            HiTraceChainClearId();
        }
#endif
        ffrt::TaskLoadTracking::End(task); // Todo: deal with CoWait()
        CoStackCheck(co);

        // 1. coroutine task done, exit normally, need to exec next coroutine task
        // (isTaskDone is set outside this function; it is reset here)
        if (co->isTaskDone) {
            task->UpdateState(ffrt::TaskState::EXITED);
            co->isTaskDone = false;
            return 0;
        }

        // 2. coroutine task block, switch to thread
        // need suspend the coroutine task or continue to execute the coroutine task.
        auto pending = coRoutineEnv->pending;
        if (pending == nullptr) {
            return 0;
        }
        // consume the predicate installed by CoWait so it runs only once
        coRoutineEnv->pending = nullptr;
        FFRTTraceRecord::TaskCoSwitchOut(task);
        // Fast path: skip state transition
        if ((*pending)(task)) {
            // The ownership of the task belongs to other host(cv/mutex/epoll etc)
            // And the task cannot be accessed any more.
            return 0;
        }
        FFRT_WAKE_TRACER(task->gid); // fast path wk
        // predicate returned false: re-install the coroutine (CoYield cleared runningCo) and resume it
        coRoutineEnv->runningCo = co;
    }
}
481 
/*
 * Suspend the coroutine currently recorded in the calling thread's env (runningCo)
 * and switch to that env's scheduler context.
 *
 * Before switching: the coroutine is marked CO_NOT_FINISH, runningCo is cleared,
 * the CPU boost context is saved, a normal task is moved to BLOCKED, and (with
 * FFRT_TASK_LOCAL_ENABLE) the thread's TSD table is restored.
 *
 * Code after CoSwitch runs when the coroutine is switched into again. In non-ASAN
 * builds it then handles the bbox state: a thread other than the one recorded in
 * GetBboxEnableState() is frozen, otherwise a backtrace is taken and the coroutine
 * leaves through CoExit.
 */
void CoYield(void)
{
    CoRoutine* co = static_cast<CoRoutine*>(GetCoEnv()->runningCo);
    co->status.store(static_cast<int>(CoStatus::CO_NOT_FINISH));
    GetCoEnv()->runningCo = nullptr;
    CoSwitchOutTransaction(co->task);
    if (co->task->type == ffrt_normal_task) {
        co->task->UpdateState(ffrt::TaskState::BLOCKED);
    }
    FFRT_BLOCK_MARKER(co->task->gid);
#ifdef FFRT_TASK_LOCAL_ENABLE
    SwitchTsdToThread(co->task);
#endif
    CoStackCheck(co);
#ifdef ASAN_MODE
    /* co to thread start */
    __sanitizer_start_switch_fiber((void **)&co->asanFakeStack, co->asanFiberAddr, co->asanFiberSize);
#endif
    /* co switch to thread */
    CoSwitch(&co->ctx, &GetCoEnv()->schCtx);
#ifdef ASAN_MODE
    /* thread to co finish */
    __sanitizer_finish_switch_fiber(co->asanFakeStack, (const void**)&co->asanFiberAddr, &co->asanFiberSize);
#else
    while (GetBboxEnableState() != 0) {
        if (GetBboxEnableState() != gettid()) {
            BboxFreeze(); // freeze non-crash thread
            return;
        }
        const int IGNORE_DEPTH = 3;
        backtrace(IGNORE_DEPTH);
        co->status.store(static_cast<int>(CoStatus::CO_NOT_FINISH)); // recovery to old state
        // CoExit switches to the scheduler context ("do not switch back again")
        CoExit(co, co->task->type == ffrt_normal_task);
    }
#endif
}
519 
CoWait(const std::function<bool (ffrt::CPUEUTask *)> & pred)520 void CoWait(const std::function<bool(ffrt::CPUEUTask*)>& pred)
521 {
522     GetCoEnv()->pending = &pred;
523     CoYield();
524 }
525 
CoWake(ffrt::CPUEUTask * task,CoWakeType type)526 void CoWake(ffrt::CPUEUTask* task, CoWakeType type)
527 {
528     if (task == nullptr) {
529         FFRT_LOGE("task is nullptr");
530         return;
531     }
532     // Fast path: state transition without lock
533     task->coWakeType = type;
534     FFRT_WAKE_TRACER(task->gid);
535     switch (task->type) {
536         case ffrt_normal_task: {
537             task->UpdateState(ffrt::TaskState::READY);
538             break;
539         }
540         case ffrt_queue_task: {
541             QueueTask* sTask = reinterpret_cast<QueueTask*>(task);
542             auto handle = sTask->GetHandler();
543             handle->TransferTask(sTask);
544             break;
545         }
546         default: {
547             FFRT_LOGE("CoWake unsupport task[%lu], type=%d, name[%s]", task->gid, task->type, task->label.c_str());
548             break;
549         }
550     }
551 }
552 
/* Return the process-wide CoRoutineFactory, a function-local static constructed on first call. */
CoRoutineFactory &CoRoutineFactory::Instance()
{
    static CoRoutineFactory fac;
    return fac;
}