1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "co_routine.h"
17 #include <cstdio>
18 #include <cstdlib>
19 #include <cstring>
20 #include <string>
21 #include <sys/mman.h>
22 #include "ffrt_trace.h"
23 #include "dm/dependence_manager.h"
24 #include "core/entity.h"
25 #include "sched/scheduler.h"
26 #include "sync/sync.h"
27 #include "util/slab.h"
28 #include "sched/sched_deadline.h"
29 #include "sync/perf_counter.h"
30 #include "sync/io_poller.h"
31 #include "dfx/bbox/bbox.h"
32 #include "co_routine_factory.h"
33
// Per-thread coroutine environment. It starts out null, is allocated by CoInitThreadEnv()
// the first time CoAlloc() runs on a thread, and is released again by CoWorkerExit().
static thread_local CoRoutineEnv* g_CoThreadEnv = nullptr;
35
CoSwitch(CoCtx * from,CoCtx * to)36 static inline void CoSwitch(CoCtx* from, CoCtx* to)
37 {
38 co2_switch_context(from, to);
39 }
40
CoExit(CoRoutine * co)41 static inline void CoExit(CoRoutine* co)
42 {
43 CoSwitch(&co->ctx, &co->thEnv->schCtx);
44 }
45
CoStartEntry(void * arg)46 static inline void CoStartEntry(void* arg)
47 {
48 CoRoutine* co = reinterpret_cast<CoRoutine*>(arg);
49 {
50 FFRT_LOGD("Execute func() task[%lu], name[%s]", co->task->gid, co->task->label.c_str());
51 auto f = reinterpret_cast<ffrt_function_header_t*>(co->task->func_storage);
52 auto exp = ffrt::SkipStatus::SUBMITTED;
53 if (likely(__atomic_compare_exchange_n(&co->task->skipped, &exp, ffrt::SkipStatus::EXECUTED, 0,
54 __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))) {
55 f->exec(f);
56 }
57 f->destroy(f);
58 }
59 FFRT_TASKDONE_MARKER(co->task->gid);
60 co->task->UpdateState(ffrt::TaskState::EXITED);
61 co->status.store(static_cast<int>(CoStatus::CO_UNINITIALIZED));
62 CoExit(co);
63 }
64
CoInitThreadEnv(void)65 static inline void CoInitThreadEnv(void)
66 {
67 g_CoThreadEnv = reinterpret_cast<CoRoutineEnv*>(calloc(1, sizeof(CoRoutineEnv)));
68 CoRoutineEnv* t = g_CoThreadEnv;
69 if (!t) {
70 abort();
71 }
72 }
73
CoSetStackProt(CoRoutine * co,int prot)74 static void CoSetStackProt(CoRoutine* co, int prot)
75 {
76 /* set the attribute of the page table closest to the stack top in the user stack to read-only,
77 * and 1~2 page table space will be wasted
78 */
79 size_t p_size = getpagesize();
80 if (co->stkMem.size < p_size * 3) {
81 abort();
82 }
83
84 uint64_t mp = reinterpret_cast<uint64_t>(co->stkMem.stk);
85 mp = (mp + p_size - 1) / p_size * p_size;
86 int ret = mprotect(reinterpret_cast<void *>(static_cast<uintptr_t>(mp)), p_size, prot);
87 if (ret < 0) {
88 printf("coroutine size:%lu, mp:0x%lx, page_size:%zu,result:%d,prot:%d, err:%d,%s.\n",
89 static_cast<unsigned long>(sizeof(struct CoRoutine)), static_cast<unsigned long>(mp),
90 p_size, ret, prot, errno, strerror(errno));
91 abort();
92 }
93 }
94
AllocNewCoRoutine(void)95 static inline CoRoutine* AllocNewCoRoutine(void)
96 {
97 std::size_t stack_size = CoStackAttr::Instance()->size;
98 CoRoutine* co = ffrt::CoRoutineAllocMem(stack_size);
99 if (co == nullptr) {
100 abort();
101 }
102
103 co->stkMem.size = static_cast<uint64_t>(CoStackAttr::Instance()->size - sizeof(CoRoutine) + 8);
104 co->stkMem.magic = STACK_MAGIC;
105 if (CoStackAttr::Instance()->type == CoStackProtectType::CO_STACK_STRONG_PROTECT) {
106 CoSetStackProt(co, PROT_READ);
107 }
108 co->status.store(static_cast<int>(CoStatus::CO_UNINITIALIZED));
109 return co;
110 }
111
CoMemFree(CoRoutine * co)112 static inline void CoMemFree(CoRoutine* co)
113 {
114 if (CoStackAttr::Instance()->type == CoStackProtectType::CO_STACK_STRONG_PROTECT) {
115 CoSetStackProt(co, PROT_WRITE | PROT_READ);
116 }
117 ffrt::CoRoutineFreeMem(co);
118 }
119
CoStackFree(void)120 void CoStackFree(void)
121 {
122 if (g_CoThreadEnv) {
123 if (g_CoThreadEnv->runningCo) {
124 CoMemFree(g_CoThreadEnv->runningCo);
125 g_CoThreadEnv->runningCo = nullptr;
126 }
127 }
128 }
129
CoWorkerExit(void)130 void CoWorkerExit(void)
131 {
132 if (g_CoThreadEnv) {
133 if (g_CoThreadEnv->runningCo) {
134 CoMemFree(g_CoThreadEnv->runningCo);
135 g_CoThreadEnv->runningCo = nullptr;
136 }
137 ::free(g_CoThreadEnv);
138 g_CoThreadEnv = nullptr;
139 }
140 }
141
BindNewCoRoutione(ffrt::CPUEUTask * task)142 static inline void BindNewCoRoutione(ffrt::CPUEUTask* task)
143 {
144 task->coRoutine = g_CoThreadEnv->runningCo;
145 task->coRoutine->task = task;
146 task->coRoutine->thEnv = g_CoThreadEnv;
147 }
148
UnbindCoRoutione(ffrt::CPUEUTask * task)149 static inline void UnbindCoRoutione(ffrt::CPUEUTask* task)
150 {
151 task->coRoutine->task = nullptr;
152 task->coRoutine = nullptr;
153 }
154
CoAlloc(ffrt::CPUEUTask * task)155 static inline int CoAlloc(ffrt::CPUEUTask* task)
156 {
157 if (!g_CoThreadEnv) {
158 CoInitThreadEnv();
159 }
160 if (task->coRoutine) {
161 if (g_CoThreadEnv->runningCo) {
162 CoMemFree(g_CoThreadEnv->runningCo);
163 }
164 g_CoThreadEnv->runningCo = task->coRoutine;
165 } else {
166 if (!g_CoThreadEnv->runningCo) {
167 g_CoThreadEnv->runningCo = AllocNewCoRoutine();
168 }
169 }
170 BindNewCoRoutione(task);
171 return 0;
172 }
173
174 // call CoCreat when task creat
CoCreat(ffrt::CPUEUTask * task)175 static inline int CoCreat(ffrt::CPUEUTask* task)
176 {
177 CoAlloc(task);
178 auto co = task->coRoutine;
179 if (co->status.load() == static_cast<int>(CoStatus::CO_UNINITIALIZED)) {
180 co2_init_context(&co->ctx, CoStartEntry, static_cast<void*>(co), co->stkMem.stk, co->stkMem.size);
181 }
182 return 0;
183 }
184
CoStackCheck(CoRoutine * co)185 static inline void CoStackCheck(CoRoutine* co)
186 {
187 if (co->stkMem.magic != STACK_MAGIC) {
188 FFRT_LOGE("sp offset:%lu.\n", (uint64_t)co->stkMem.stk +
189 co->stkMem.size - co->ctx.regs[FFRT_REG_SP]);
190 FFRT_LOGE("stack over flow, check local variable in you tasks or use api 'ffrt_set_co_stack_attribute'.\n");
191 abort();
192 }
193 }
194
CoSwitchInTrace(ffrt::CPUEUTask * task)195 static inline void CoSwitchInTrace(ffrt::CPUEUTask* task)
196 {
197 if (task->coRoutine->status == static_cast<int>(CoStatus::CO_NOT_FINISH)) {
198 for (auto name : task->traceTag) {
199 FFRT_TRACE_BEGIN(name.c_str());
200 }
201 }
202 FFRT_FAKE_TRACE_MARKER(task->gid);
203 }
204
CoSwitchOutTrace(ffrt::CPUEUTask * task)205 static inline void CoSwitchOutTrace(ffrt::CPUEUTask* task)
206 {
207 FFRT_FAKE_TRACE_MARKER(task->gid);
208 int traceTagNum = static_cast<int>(task->traceTag.size());
209 for (int i = 0; i < traceTagNum; ++i) {
210 FFRT_TRACE_END();
211 }
212 }
213
214 // called by thread work
CoStart(ffrt::CPUEUTask * task)215 void CoStart(ffrt::CPUEUTask* task)
216 {
217 if (task->coRoutine) {
218 int ret = task->coRoutine->status.exchange(static_cast<int>(CoStatus::CO_RUNNING));
219 if (ret == static_cast<int>(CoStatus::CO_RUNNING) && GetBboxEnableState() != 0) {
220 FFRT_BBOX_LOG("executed by worker suddenly, ignore backtrace");
221 return;
222 }
223 }
224 CoCreat(task);
225 auto co = task->coRoutine;
226
227 #ifdef FFRT_BBOX_ENABLE
228 TaskRunCounterInc();
229 #endif
230
231 for (;;) {
232 FFRT_LOGD("Costart task[%lu], name[%s]", task->gid, task->label.c_str());
233 ffrt::TaskLoadTracking::Begin(task);
234 FFRT_TASK_BEGIN(task->label, task->gid);
235 CoSwitchInTrace(task);
236
237 CoSwitch(&co->thEnv->schCtx, &co->ctx);
238 FFRT_TASK_END();
239 ffrt::TaskLoadTracking::End(task); // Todo: deal with CoWait()
240 CoStackCheck(co);
241 auto pending = g_CoThreadEnv->pending;
242 if (pending == nullptr) {
243 #ifdef FFRT_BBOX_ENABLE
244 TaskFinishCounterInc();
245 #endif
246 break;
247 }
248 g_CoThreadEnv->pending = nullptr;
249 // Fast path: skip state transition
250 if ((*pending)(task)) {
251 FFRT_LOGD("Cowait task[%lu], name[%s]", task->gid, task->label.c_str());
252 #ifdef FFRT_BBOX_ENABLE
253 TaskSwitchCounterInc();
254 #endif
255 return;
256 }
257 FFRT_WAKE_TRACER(task->gid); // fast path wk
258 g_CoThreadEnv->runningCo = co;
259 }
260 }
261
262 // called by thread work
CoYield(void)263 void CoYield(void)
264 {
265 CoRoutine* co = static_cast<CoRoutine*>(g_CoThreadEnv->runningCo);
266 co->status.store(static_cast<int>(CoStatus::CO_NOT_FINISH));
267 g_CoThreadEnv->runningCo = nullptr;
268 CoSwitchOutTrace(co->task);
269 FFRT_BLOCK_MARKER(co->task->gid);
270 CoSwitch(&co->ctx, &g_CoThreadEnv->schCtx);
271 while (GetBboxEnableState() != 0) {
272 if (GetBboxEnableState() != gettid()) {
273 BboxFreeze(); // freeze non-crash thread
274 return;
275 }
276 const int IGNORE_DEPTH = 3;
277 backtrace(IGNORE_DEPTH);
278 co->status.store(static_cast<int>(CoStatus::CO_NOT_FINISH)); // recovery to old state
279 CoExit(co);
280 }
281 }
282
CoWait(const std::function<bool (ffrt::CPUEUTask *)> & pred)283 void CoWait(const std::function<bool(ffrt::CPUEUTask*)>& pred)
284 {
285 g_CoThreadEnv->pending = &pred;
286 CoYield();
287 }
288
CoWake(ffrt::CPUEUTask * task,bool timeOut)289 void CoWake(ffrt::CPUEUTask* task, bool timeOut)
290 {
291 if (task == nullptr) {
292 FFRT_LOGE("task is nullptr");
293 return;
294 }
295 // Fast path: state transition without lock
296 FFRT_LOGD("Cowake task[%lu], name[%s], timeOut[%d]", task->gid, task->label.c_str(), timeOut);
297 task->wakeupTimeOut = timeOut;
298 FFRT_WAKE_TRACER(task->gid);
299 task->UpdateState(ffrt::TaskState::READY);
300 }
301