• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "co_routine.h"
17 #include <cstdio>
18 #include <cstdlib>
19 #include <cstring>
20 #include <securec.h>
21 #include <string>
22 #include <sys/mman.h>
23 #include "ffrt_trace.h"
24 #include "dm/dependence_manager.h"
25 #include "core/entity.h"
26 #include "tm/queue_task.h"
27 #include "sched/scheduler.h"
28 #include "sync/sync.h"
29 #include "util/slab.h"
30 #include "sched/sched_deadline.h"
31 #include "sync/perf_counter.h"
32 #include "sync/io_poller.h"
33 #include "dfx/bbox/bbox.h"
34 #include "dfx/trace_record/ffrt_trace_record.h"
35 #include "co_routine_factory.h"
36 #include "util/ffrt_facade.h"
37 #ifdef FFRT_TASK_LOCAL_ENABLE
38 #include "pthread_ffrt.h"
39 #endif
40 #ifdef FFRT_ASYNC_STACKTRACE
41 #include "dfx/async_stack/ffrt_async_stack.h"
42 #ifdef FFRT_TASK_LOCAL_ENABLE
43 #include "pthread_ffrt.h"
44 #endif
45 #endif
46 
47 using namespace ffrt;
48 
CoStackCheck(CoRoutine * co)49 static inline void CoStackCheck(CoRoutine* co)
50 {
51     if (unlikely(co->stkMem.magic != STACK_MAGIC)) {
52         FFRT_LOGE("sp offset:%p.\n", co->stkMem.stk +
53             co->stkMem.size - co->ctx.regs[REG_SP]);
54         FFRT_LOGE("stack over flow, check local variable in you tasks or use api 'ffrt_task_attr_set_stack_size'.\n");
55     }
56 }
57 
58 extern pthread_key_t g_executeCtxTlsKey;
59 pthread_key_t g_coThreadTlsKey = 0;
60 pthread_once_t g_coThreadTlsKeyOnce = PTHREAD_ONCE_INIT;
CoEnvDestructor(void * args)61 void CoEnvDestructor(void* args)
62 {
63     auto coEnv = static_cast<CoRoutineEnv*>(args);
64     if (coEnv) {
65         delete coEnv;
66     }
67 }
68 
MakeCoEnvTlsKey()69 void MakeCoEnvTlsKey()
70 {
71     pthread_key_create(&g_coThreadTlsKey, CoEnvDestructor);
72 }
73 
GetCoEnv()74 CoRoutineEnv* GetCoEnv()
75 {
76     CoRoutineEnv* coEnv = nullptr;
77     pthread_once(&g_coThreadTlsKeyOnce, MakeCoEnvTlsKey);
78 
79     void *curTls = pthread_getspecific(g_coThreadTlsKey);
80     if (curTls != nullptr) {
81         coEnv = reinterpret_cast<CoRoutineEnv *>(curTls);
82     } else {
83         coEnv = new CoRoutineEnv();
84         pthread_setspecific(g_coThreadTlsKey, coEnv);
85     }
86     return coEnv;
87 }
88 
89 #ifdef FFRT_TASK_LOCAL_ENABLE
90 namespace {
IsTaskLocalEnable(ffrt::CPUEUTask * task)91 bool IsTaskLocalEnable(ffrt::CPUEUTask* task)
92 {
93     if ((task->type != ffrt_normal_task) || (!task->taskLocal)) {
94         return false;
95     }
96 
97     if (task->tsd == nullptr) {
98         FFRT_LOGE("taskLocal enabled but task tsd invalid");
99         return false;
100     }
101 
102     return true;
103 }
104 
InitWorkerTsdValueToTask(void ** taskTsd)105 void InitWorkerTsdValueToTask(void** taskTsd)
106 {
107     const pthread_key_t updKeyMap[] = {g_executeCtxTlsKey, g_coThreadTlsKey};
108     auto threadTsd = pthread_gettsd();
109     for (const auto& key : updKeyMap) {
110         FFRT_UNLIKELY_COND_DO_ABORT(key <= 0, "key[%d] invalid", key);
111         auto addr = threadTsd[key];
112         if (addr) {
113             taskTsd[key] = addr;
114         }
115     }
116 }
117 
SwitchTsdAddrToTask(ffrt::CPUEUTask * task)118 void SwitchTsdAddrToTask(ffrt::CPUEUTask* task)
119 {
120     auto threadTsd = pthread_gettsd();
121     task->threadTsd = threadTsd;
122     pthread_settsd(task->tsd);
123 }
124 
SwitchTsdToTask(ffrt::CPUEUTask * task)125 void SwitchTsdToTask(ffrt::CPUEUTask* task)
126 {
127     if (!IsTaskLocalEnable(task)) {
128         return;
129     }
130 
131     InitWorkerTsdValueToTask(task->tsd);
132 
133     SwitchTsdAddrToTask(task);
134 
135     task->runningTid.store(pthread_self());
136     FFRT_LOGD("switch tsd to task Success");
137 }
138 
SwitchTsdAddrToThread(ffrt::CPUEUTask * task)139 bool SwitchTsdAddrToThread(ffrt::CPUEUTask* task)
140 {
141     if (!task->threadTsd) {
142         return false;
143     }
144     pthread_settsd(task->threadTsd);
145     task->threadTsd = nullptr;
146     return true;
147 }
148 
UpdateWorkerTsdValueToThread(void ** taskTsd)149 void UpdateWorkerTsdValueToThread(void** taskTsd)
150 {
151     const pthread_key_t updKeyMap[] = {g_executeCtxTlsKey, g_coThreadTlsKey};
152     auto threadTsd = pthread_gettsd();
153     for (const auto& key : updKeyMap) {
154         FFRT_UNLIKELY_COND_DO_ABORT(key <= 0, "key[%d] invalid", key);
155         auto threadVal = threadTsd[key];
156         auto taskVal = taskTsd[key];
157         if (!threadVal && taskVal) {
158             threadTsd[key] = taskVal;
159         } else {
160             FFRT_UNLIKELY_COND_DO_ABORT((threadVal && taskVal && (threadVal != taskVal)),
161                 "mismatch key=[%d]", key);
162             FFRT_UNLIKELY_COND_DO_ABORT((threadVal && !taskVal),
163                 "unexpected: thread exists but task not exists, key=[%d]", key);
164         }
165         taskTsd[key] = nullptr;
166     }
167 }
168 
SwitchTsdToThread(ffrt::CPUEUTask * task)169 void SwitchTsdToThread(ffrt::CPUEUTask* task)
170 {
171     if (!IsTaskLocalEnable(task)) {
172         return;
173     }
174 
175     if (!SwitchTsdAddrToThread(task)) {
176         return;
177     }
178 
179     UpdateWorkerTsdValueToThread(task->tsd);
180 
181     task->runningTid.store(0);
182     FFRT_LOGD("switch tsd to thread Success");
183 }
184 
TaskTsdRunDtors(ffrt::CPUEUTask * task)185 void TaskTsdRunDtors(ffrt::CPUEUTask* task)
186 {
187     SwitchTsdAddrToTask(task);
188     pthread_tsd_run_dtors();
189     SwitchTsdAddrToThread(task);
190 }
191 } // namespace
192 
TaskTsdDeconstruct(ffrt::CPUEUTask * task)193 void TaskTsdDeconstruct(ffrt::CPUEUTask* task)
194 {
195     if (!IsTaskLocalEnable(task)) {
196         return;
197     }
198 
199     TaskTsdRunDtors(task);
200     if (task->tsd != nullptr) {
201         free(task->tsd);
202         task->tsd = nullptr;
203         task->taskLocal = false;
204     }
205     FFRT_LOGD("tsd deconstruct done, task[%lu], name[%s]", task->gid, task->label.c_str());
206 }
207 #endif
208 
CoSwitch(CoCtx * from,CoCtx * to)209 static inline void CoSwitch(CoCtx* from, CoCtx* to)
210 {
211     co2_switch_context(from, to);
212 }
213 
CoExit(CoRoutine * co,bool isNormalTask)214 static inline void CoExit(CoRoutine* co, bool isNormalTask)
215 {
216 #ifdef FFRT_TASK_LOCAL_ENABLE
217     if (isNormalTask) {
218         SwitchTsdToThread(co->task);
219     }
220 #endif
221     CoStackCheck(co);
222 #ifdef ASAN_MODE
223     /* co to thread start */
224     __sanitizer_start_switch_fiber((void **)&co->asanFakeStack, co->asanFiberAddr, co->asanFiberSize);
225     /* clear remaining shadow stack */
226     __asan_handle_no_return();
227 #endif
228     /* co switch to thread, and do not switch back again */
229     CoSwitch(&co->ctx, &co->thEnv->schCtx);
230 }
231 
CoStartEntry(void * arg)232 static inline void CoStartEntry(void* arg)
233 {
234     CoRoutine* co = reinterpret_cast<CoRoutine*>(arg);
235 #ifdef ASAN_MODE
236     /* thread to co finish first */
237     __sanitizer_finish_switch_fiber(co->asanFakeStack, (const void**)&co->asanFiberAddr, &co->asanFiberSize);
238 #endif
239     ffrt::CPUEUTask* task = co->task;
240     bool isNormalTask = false;
241     switch (task->type) {
242         case ffrt_normal_task: {
243             isNormalTask = true;
244             task->Execute();
245             break;
246         }
247         case ffrt_queue_task: {
248             QueueTask* sTask = reinterpret_cast<QueueTask*>(task);
249             // Before the batch execution is complete, head node cannot be released.
250             sTask->IncDeleteRef();
251             sTask->Execute();
252             sTask->DecDeleteRef();
253             break;
254         }
255         default: {
256             FFRT_LOGE("CoStart unsupport task[%lu], type=%d, name[%s]", task->gid, task->type, task->label.c_str());
257             break;
258         }
259     }
260 
261     co->status.store(static_cast<int>(CoStatus::CO_UNINITIALIZED));
262     CoExit(co, isNormalTask);
263 }
264 
CoSetStackProt(CoRoutine * co,int prot)265 static void CoSetStackProt(CoRoutine* co, int prot)
266 {
267     /* set the attribute of the page table closest to the stack top in the user stack to read-only,
268      * and 1~2 page table space will be wasted
269      */
270     size_t p_size = getpagesize();
271     uint64_t mp = reinterpret_cast<uint64_t>(co->stkMem.stk);
272     mp = (mp + p_size - 1) / p_size * p_size;
273     int ret = mprotect(reinterpret_cast<void *>(static_cast<uintptr_t>(mp)), p_size, prot);
274     FFRT_UNLIKELY_COND_DO_ABORT(ret < 0, "coroutine size:%lu, mp:0x%lx, page_size:%zu,result:%d,prot:%d, err:%d,%s",
275                                 static_cast<unsigned long>(sizeof(struct CoRoutine)), static_cast<unsigned long>(mp),
276                                 p_size, ret, prot, errno, strerror(errno));
277 }
278 
AllocNewCoRoutine(size_t stackSize)279 static inline CoRoutine* AllocNewCoRoutine(size_t stackSize)
280 {
281     std::size_t defaultStackSize = FFRTFacade::GetCSAInstance()->size;
282     CoRoutine* co = nullptr;
283     if (likely(stackSize == defaultStackSize)) {
284         co = ffrt::CoRoutineAllocMem(stackSize);
285     } else {
286         co = static_cast<CoRoutine*>(mmap(nullptr, stackSize,
287             PROT_READ | PROT_WRITE,  MAP_ANONYMOUS | MAP_PRIVATE, -1, 0));
288         if (co == reinterpret_cast<CoRoutine*>(MAP_FAILED)) {
289             FFRT_LOGE("memory mmap failed.");
290             return nullptr;
291         }
292     }
293     if (!co) {
294         FFRT_LOGE("memory not enough");
295         return nullptr;
296     }
297     co->allocatedSize = stackSize;
298     co->stkMem.size = static_cast<uint64_t>(stackSize - sizeof(CoRoutine) + 8);
299     co->stkMem.magic = STACK_MAGIC;
300     if (FFRTFacade::GetCSAInstance()->type == CoStackProtectType::CO_STACK_STRONG_PROTECT) {
301         CoSetStackProt(co, PROT_READ);
302     }
303     co->status.store(static_cast<int>(CoStatus::CO_UNINITIALIZED));
304     return co;
305 }
306 
CoMemFree(CoRoutine * co)307 static inline void CoMemFree(CoRoutine* co)
308 {
309     if (FFRTFacade::GetCSAInstance()->type == CoStackProtectType::CO_STACK_STRONG_PROTECT) {
310         CoSetStackProt(co, PROT_WRITE | PROT_READ);
311     }
312     std::size_t defaultStackSize = FFRTFacade::GetCSAInstance()->size;
313     if (likely(co->allocatedSize == defaultStackSize)) {
314         ffrt::CoRoutineFreeMem(co);
315     } else {
316         int ret = munmap(co, co->allocatedSize);
317         if (ret != 0) {
318             FFRT_LOGE("munmap failed with errno: %d", errno);
319         }
320     }
321 }
322 
CoStackFree(void)323 void CoStackFree(void)
324 {
325     if (GetCoEnv()) {
326         if (GetCoEnv()->runningCo) {
327             CoMemFree(GetCoEnv()->runningCo);
328             GetCoEnv()->runningCo = nullptr;
329         }
330     }
331 }
332 
CoWorkerExit(void)333 void CoWorkerExit(void)
334 {
335     CoStackFree();
336 }
337 
BindNewCoRoutione(ffrt::CPUEUTask * task)338 static inline void BindNewCoRoutione(ffrt::CPUEUTask* task)
339 {
340     task->coRoutine = GetCoEnv()->runningCo;
341     task->coRoutine->task = task;
342     task->coRoutine->thEnv = GetCoEnv();
343 }
344 
UnbindCoRoutione(ffrt::CPUEUTask * task)345 static inline void UnbindCoRoutione(ffrt::CPUEUTask* task)
346 {
347     task->coRoutine->task = nullptr;
348     task->coRoutine = nullptr;
349 }
350 
CoAlloc(ffrt::CPUEUTask * task)351 static inline int CoAlloc(ffrt::CPUEUTask* task)
352 {
353     if (task->coRoutine) { // use allocated coroutine stack
354         if (GetCoEnv()->runningCo) { // free cached stack if it exist
355             CoMemFree(GetCoEnv()->runningCo);
356         }
357         GetCoEnv()->runningCo = task->coRoutine;
358     } else {
359         if (!GetCoEnv()->runningCo) { // if no cached stack, alloc one
360             GetCoEnv()->runningCo = AllocNewCoRoutine(task->stack_size);
361         } else { // exist cached stack
362             if (GetCoEnv()->runningCo->allocatedSize != task->stack_size) { // stack size not match, alloc one
363                 CoMemFree(GetCoEnv()->runningCo); // free cached stack
364                 GetCoEnv()->runningCo = AllocNewCoRoutine(task->stack_size);
365             }
366         }
367     }
368     return 0;
369 }
370 
371 // call CoCreat when task creat
CoCreat(ffrt::CPUEUTask * task)372 static inline int CoCreat(ffrt::CPUEUTask* task)
373 {
374     CoAlloc(task);
375     if (GetCoEnv()->runningCo == nullptr) {
376         return -1;
377     }
378     BindNewCoRoutione(task);
379     auto co = task->coRoutine;
380     if (co->status.load() == static_cast<int>(CoStatus::CO_UNINITIALIZED)) {
381         co2_init_context(&co->ctx, CoStartEntry, static_cast<void*>(co), co->stkMem.stk, co->stkMem.size);
382     }
383     return 0;
384 }
385 
CoSwitchInTransaction(ffrt::CPUEUTask * task)386 static inline void CoSwitchInTransaction(ffrt::CPUEUTask* task)
387 {
388     if (task->coRoutine->status == static_cast<int>(CoStatus::CO_NOT_FINISH)) {
389         for (auto& name : task->traceTag) {
390             FFRT_TRACE_BEGIN(name.c_str());
391         }
392     }
393     FFRT_FAKE_TRACE_MARKER(task->gid);
394 }
395 
CoSwitchOutTransaction(ffrt::CPUEUTask * task)396 static inline void CoSwitchOutTransaction(ffrt::CPUEUTask* task)
397 {
398     FFRT_FAKE_TRACE_MARKER(task->gid);
399     int traceTagNum = static_cast<int>(task->traceTag.size());
400     for (int i = 0; i < traceTagNum; ++i) {
401         FFRT_TRACE_END();
402     }
403 }
404 
CoBboxPreCheck(ffrt::CPUEUTask * task)405 static inline bool CoBboxPreCheck(ffrt::CPUEUTask* task)
406 {
407     if (task->coRoutine) {
408         int ret = task->coRoutine->status.exchange(static_cast<int>(CoStatus::CO_RUNNING));
409         if (ret == static_cast<int>(CoStatus::CO_RUNNING) && GetBboxEnableState() != 0) {
410             FFRT_LOGE("executed by worker suddenly, ignore backtrace");
411             return false;
412         }
413     }
414 
415     return true;
416 }
417 
418 // called by thread work
CoStart(ffrt::CPUEUTask * task,CoRoutineEnv * coRoutineEnv)419 int CoStart(ffrt::CPUEUTask* task, CoRoutineEnv* coRoutineEnv)
420 {
421     if (!CoBboxPreCheck(task)) {
422         return 0;
423     }
424 
425     if (CoCreat(task) != 0) {
426         return -1;
427     }
428     auto co = task->coRoutine;
429 
430     FFRTTraceRecord::TaskRun(task->GetQos(), task);
431 
432     for (;;) {
433         ffrt::TaskLoadTracking::Begin(task);
434 #ifdef FFRT_ASYNC_STACKTRACE
435         FFRTSetStackId(task->stackId);
436 #endif
437         FFRT_TASK_BEGIN(task->label, task->gid);
438         if (task->type == ffrt_normal_task) {
439             task->UpdateState(ffrt::TaskState::RUNNING);
440         }
441         CoSwitchInTransaction(task);
442 #ifdef FFRT_TASK_LOCAL_ENABLE
443         SwitchTsdToTask(co->task);
444 #endif
445 #ifdef ASAN_MODE
446         /* thread to co start */
447         __sanitizer_start_switch_fiber((void **)&co->asanFakeStack, GetCoStackAddr(co), co->stkMem.size);
448 #endif
449         /* thread switch to co */
450         CoSwitch(&co->thEnv->schCtx, &co->ctx);
451 #ifdef ASAN_MODE
452         /* co to thread finish */
453         __sanitizer_finish_switch_fiber(co->asanFakeStack, (const void**)&co->asanFiberAddr, &co->asanFiberSize);
454 #endif
455         FFRT_TASK_END();
456         ffrt::TaskLoadTracking::End(task); // Todo: deal with CoWait()
457         CoStackCheck(co);
458 
459         // 1. coroutine task done, exit normally, need to exec next coroutine task
460         if (co->isTaskDone) {
461             task->UpdateState(ffrt::TaskState::EXITED);
462             co->isTaskDone = false;
463             return 0;
464         }
465 
466         // 2. couroutine task block, switch to thread
467         // need suspend the coroutine task or continue to execute the coroutine task.
468         auto pending = coRoutineEnv->pending;
469         if (pending == nullptr) {
470             return 0;
471         }
472         coRoutineEnv->pending = nullptr;
473         FFRTTraceRecord::TaskCoSwitchOut(task);
474         // Fast path: skip state transition
475         if ((*pending)(task)) {
476             // The ownership of the task belongs to other host(cv/mutex/epoll etc)
477             // And the task cannot be accessed any more.
478             return 0;
479         }
480         FFRT_WAKE_TRACER(task->gid); // fast path wk
481         coRoutineEnv->runningCo = co;
482     }
483 }
484 
485 // called by thread work
CoYield(void)486 void CoYield(void)
487 {
488     CoRoutine* co = static_cast<CoRoutine*>(GetCoEnv()->runningCo);
489     co->status.store(static_cast<int>(CoStatus::CO_NOT_FINISH));
490     GetCoEnv()->runningCo = nullptr;
491     CoSwitchOutTransaction(co->task);
492     if (co->task->type == ffrt_normal_task) {
493         co->task->UpdateState(ffrt::TaskState::BLOCKED);
494     }
495     FFRT_BLOCK_MARKER(co->task->gid);
496 #ifdef FFRT_TASK_LOCAL_ENABLE
497     SwitchTsdToThread(co->task);
498 #endif
499     CoStackCheck(co);
500 #ifdef ASAN_MODE
501     /* co to thread start */
502     __sanitizer_start_switch_fiber((void **)&co->asanFakeStack, co->asanFiberAddr, co->asanFiberSize);
503 #endif
504     /* co switch to thread */
505     CoSwitch(&co->ctx, &GetCoEnv()->schCtx);
506 #ifdef ASAN_MODE
507     /* thread to co finish */
508     __sanitizer_finish_switch_fiber(co->asanFakeStack, (const void**)&co->asanFiberAddr, &co->asanFiberSize);
509 #else
510     while (GetBboxEnableState() != 0) {
511         if (GetBboxEnableState() != gettid()) {
512             BboxFreeze(); // freeze non-crash thread
513             return;
514         }
515         const int IGNORE_DEPTH = 3;
516         backtrace(IGNORE_DEPTH);
517         co->status.store(static_cast<int>(CoStatus::CO_NOT_FINISH)); // recovery to old state
518         CoExit(co, co->task->type == ffrt_normal_task);
519     }
520 #endif
521 }
522 
CoWait(const std::function<bool (ffrt::CPUEUTask *)> & pred)523 void CoWait(const std::function<bool(ffrt::CPUEUTask*)>& pred)
524 {
525     GetCoEnv()->pending = &pred;
526     CoYield();
527 }
528 
CoWake(ffrt::CPUEUTask * task,bool timeOut)529 void CoWake(ffrt::CPUEUTask* task, bool timeOut)
530 {
531     if (task == nullptr) {
532         FFRT_LOGE("task is nullptr");
533         return;
534     }
535     // Fast path: state transition without lock
536     task->wakeupTimeOut = timeOut;
537     FFRT_WAKE_TRACER(task->gid);
538     switch (task->type) {
539         case ffrt_normal_task: {
540             task->UpdateState(ffrt::TaskState::READY);
541             break;
542         }
543         case ffrt_queue_task: {
544             QueueTask* sTask = reinterpret_cast<QueueTask*>(task);
545             auto handle = sTask->GetHandler();
546             handle->TransferTask(sTask);
547             break;
548         }
549         default: {
550             FFRT_LOGE("CoWake unsupport task[%lu], type=%d, name[%s]", task->gid, task->type, task->label.c_str());
551             break;
552         }
553     }
554 }
555 
Instance()556 CoRoutineFactory &CoRoutineFactory::Instance()
557 {
558     static CoRoutineFactory fac;
559     return fac;
560 }
561