1 /**
2 * Copyright (c) 2023-2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "runtime/include/thread_scopes.h"
17 #include "runtime/coroutines/stackful_coroutine_manager.h"
18 #include "runtime/coroutines/stackful_coroutine.h"
19 #include "runtime/coroutines/stackful_coroutine_worker.h"
20
21 namespace ark {
22
StackfulCoroutineWorker(Runtime * runtime,PandaVM * vm,StackfulCoroutineManager * coroManager,ScheduleLoopType type,PandaString name,size_t id)23 StackfulCoroutineWorker::StackfulCoroutineWorker(Runtime *runtime, PandaVM *vm, StackfulCoroutineManager *coroManager,
24 ScheduleLoopType type, PandaString name, size_t id)
25 : runtime_(runtime),
26 vm_(vm),
27 coroManager_(coroManager),
28 threadId_(os::thread::GetCurrentThreadId()),
29 stats_(name),
30 name_(std::move(name)),
31 id_(id)
32 {
33 ASSERT(id <= stackful_coroutines::MAX_WORKER_ID);
34 LOG(DEBUG, COROUTINES) << "Created a coroutine worker instance: id=" << id_ << " name=" << name_;
35 if (type == ScheduleLoopType::THREAD) {
36 std::thread t(&StackfulCoroutineWorker::ThreadProc, this);
37 os::thread::SetThreadName(t.native_handle(), name_.c_str());
38 t.detach();
39 // will create the schedule loop coroutine in the thread proc in order to set the stack protector correctly
40 } else {
41 scheduleLoopCtx_ =
42 coroManager->CreateNativeCoroutine(runtime, vm, ScheduleLoopProxy, this, "[fiber_sch] " + GetName());
43 PushToRunnableQueue(scheduleLoopCtx_);
44 }
45 }
46
AddRunnableCoroutine(Coroutine * newCoro,bool prioritize)47 void StackfulCoroutineWorker::AddRunnableCoroutine(Coroutine *newCoro, bool prioritize)
48 {
49 PushToRunnableQueue(newCoro, prioritize);
50 }
51
WaitForEvent(CoroutineEvent * awaitee)52 bool StackfulCoroutineWorker::WaitForEvent(CoroutineEvent *awaitee)
53 {
54 // precondition: this method is not called by the schedule loop coroutine
55
56 Coroutine *waiter = Coroutine::GetCurrent();
57 ASSERT(GetCurrentContext()->GetWorker() == this);
58 ASSERT(awaitee != nullptr);
59
60 if (awaitee->Happened()) {
61 awaitee->Unlock();
62 return false;
63 }
64
65 waitersLock_.Lock();
66 awaitee->Unlock();
67 LOG(DEBUG, COROUTINES) << "StackfulCoroutineWorker::AddWaitingCoroutine: " << waiter->GetName() << " AWAITS";
68 waiters_.insert({awaitee, waiter});
69
70 runnablesLock_.Lock();
71 ASSERT(RunnableCoroutinesExist());
72 ScopedNativeCodeThread n(Coroutine::GetCurrent());
73 // will unlock waiters_lock_ and switch ctx
74 BlockCurrentCoroAndScheduleNext();
75
76 return true;
77 }
78
UnblockWaiters(CoroutineEvent * blocker)79 void StackfulCoroutineWorker::UnblockWaiters(CoroutineEvent *blocker)
80 {
81 os::memory::LockHolder lock(waitersLock_);
82 auto w = waiters_.find(blocker);
83 while (w != waiters_.end()) {
84 auto *coro = w->second;
85 waiters_.erase(w);
86 coro->RequestUnblock();
87 PushToRunnableQueue(coro);
88 w = waiters_.find(blocker);
89 }
90 }
91
RequestFinalization(Coroutine * finalizee)92 void StackfulCoroutineWorker::RequestFinalization(Coroutine *finalizee)
93 {
94 // precondition: current coro and finalizee belong to the current worker
95 ASSERT(finalizee->GetContext<StackfulCoroutineContext>()->GetWorker() == this);
96 ASSERT(GetCurrentContext()->GetWorker() == this);
97
98 finalizationQueue_.push(finalizee);
99 ScheduleNextCoroUnlockNone();
100 }
101
RequestSchedule()102 void StackfulCoroutineWorker::RequestSchedule()
103 {
104 RequestScheduleImpl();
105 }
106
FinalizeFiberScheduleLoop()107 void StackfulCoroutineWorker::FinalizeFiberScheduleLoop()
108 {
109 ASSERT(GetCurrentContext()->GetWorker() == this);
110
111 // part of MAIN finalization sequence
112 if (RunnableCoroutinesExist()) {
113 // the schedule loop is still runnable
114 ASSERT(scheduleLoopCtx_->HasNativeEntrypoint());
115 runnablesLock_.Lock();
116 // sch loop only
117 ASSERT(runnables_.size() == 1);
118 SuspendCurrentCoroAndScheduleNext();
119 }
120 }
121
DisableCoroutineSwitch()122 void StackfulCoroutineWorker::DisableCoroutineSwitch()
123 {
124 ++disableCoroSwitchCounter_;
125 LOG(DEBUG, COROUTINES) << "Coroutine switch on " << GetName()
126 << " has been disabled! Recursive ctr = " << disableCoroSwitchCounter_;
127 }
128
EnableCoroutineSwitch()129 void StackfulCoroutineWorker::EnableCoroutineSwitch()
130 {
131 ASSERT(IsCoroutineSwitchDisabled());
132 --disableCoroSwitchCounter_;
133 LOG(DEBUG, COROUTINES) << "Coroutine switch on " << GetName()
134 << " has been enabled! Recursive ctr = " << disableCoroSwitchCounter_;
135 }
136
IsCoroutineSwitchDisabled()137 bool StackfulCoroutineWorker::IsCoroutineSwitchDisabled()
138 {
139 return disableCoroSwitchCounter_ > 0;
140 }
141
142 #ifndef NDEBUG
PrintRunnables(const PandaString & requester)143 void StackfulCoroutineWorker::PrintRunnables(const PandaString &requester)
144 {
145 os::memory::LockHolder lock(runnablesLock_);
146 LOG(DEBUG, COROUTINES) << "[" << requester << "] ";
147 for (auto *co : runnables_) {
148 LOG(DEBUG, COROUTINES) << co->GetName() << " <";
149 }
150 LOG(DEBUG, COROUTINES) << "X";
151 }
152 #endif
153
ThreadProc()154 void StackfulCoroutineWorker::ThreadProc()
155 {
156 threadId_ = os::thread::GetCurrentThreadId();
157 scheduleLoopCtx_ = coroManager_->CreateEntrypointlessCoroutine(runtime_, vm_, false, "[thr_sch] " + GetName());
158 scheduleLoopCtx_->GetContext<StackfulCoroutineContext>()->SetWorker(this);
159 Coroutine::SetCurrent(scheduleLoopCtx_);
160 scheduleLoopCtx_->RequestResume();
161 scheduleLoopCtx_->NativeCodeBegin();
162 coroManager_->OnWorkerStartup();
163
164 // profiling: start interval here, end in ctxswitch
165 stats_.StartInterval(CoroutineTimeStats::SCH_ALL);
166 ScheduleLoopBody();
167
168 coroManager_->DestroyEntrypointlessCoroutine(scheduleLoopCtx_);
169 ASSERT(threadId_ == os::thread::GetCurrentThreadId());
170 coroManager_->OnWorkerShutdown();
171 }
172
ScheduleLoop()173 void StackfulCoroutineWorker::ScheduleLoop()
174 {
175 LOG(DEBUG, COROUTINES) << "[" << GetName() << "] Schedule loop called!";
176 ScheduleLoopBody();
177 }
178
ScheduleLoopBody()179 void StackfulCoroutineWorker::ScheduleLoopBody()
180 {
181 ScopedManagedCodeThread s(scheduleLoopCtx_);
182 while (IsActive()) {
183 RequestScheduleImpl();
184 os::memory::LockHolder lkRunnables(runnablesLock_);
185 UpdateLoadFactor();
186 }
187 }
188
ScheduleLoopProxy(void * worker)189 void StackfulCoroutineWorker::ScheduleLoopProxy(void *worker)
190 {
191 static_cast<StackfulCoroutineWorker *>(worker)->ScheduleLoop();
192 }
193
PushToRunnableQueue(Coroutine * co,bool pushFront)194 void StackfulCoroutineWorker::PushToRunnableQueue(Coroutine *co, bool pushFront)
195 {
196 os::memory::LockHolder lock(runnablesLock_);
197 co->GetContext<StackfulCoroutineContext>()->SetWorker(this);
198
199 if (pushFront) {
200 runnables_.push_front(co);
201 } else {
202 runnables_.push_back(co);
203 }
204 UpdateLoadFactor();
205
206 runnablesCv_.Signal();
207 }
208
PopFromRunnableQueue()209 Coroutine *StackfulCoroutineWorker::PopFromRunnableQueue()
210 {
211 os::memory::LockHolder lock(runnablesLock_);
212 ASSERT(!runnables_.empty());
213 auto *co = runnables_.front();
214 runnables_.pop_front();
215 UpdateLoadFactor();
216 return co;
217 }
218
RunnableCoroutinesExist() const219 bool StackfulCoroutineWorker::RunnableCoroutinesExist() const
220 {
221 os::memory::LockHolder lock(runnablesLock_);
222 return !runnables_.empty();
223 }
224
WaitForRunnables()225 void StackfulCoroutineWorker::WaitForRunnables()
226 {
227 // NOTE(konstanting): in case of work stealing, use timed wait and try periodically to steal some runnables
228 while (!RunnableCoroutinesExist() && IsActive()) {
229 // profiling: no need to profile the SLEEPING state, closing the interval
230 stats_.FinishInterval(CoroutineTimeStats::SCH_ALL);
231 runnablesCv_.Wait(
232 &runnablesLock_); // or timed wait? we may miss the signal in some cases (e.g. IsActive() change)...
233 // profiling: reopening the interval after the sleep
234 stats_.StartInterval(CoroutineTimeStats::SCH_ALL);
235 if (!RunnableCoroutinesExist() && IsActive()) {
236 LOG(DEBUG, COROUTINES) << "StackfulCoroutineWorker::WaitForRunnables: spurious wakeup!";
237 } else {
238 LOG(DEBUG, COROUTINES) << "StackfulCoroutineWorker::WaitForRunnables: wakeup!";
239 }
240 }
241 }
242
RequestScheduleImpl()243 void StackfulCoroutineWorker::RequestScheduleImpl()
244 {
245 // precondition: called within the current worker, no cross-worker calls allowed
246 ASSERT(GetCurrentContext()->GetWorker() == this);
247 runnablesLock_.Lock();
248
249 // NOTE(konstanting): implement coro migration, work stealing, etc.
250 ScopedNativeCodeThread n(Coroutine::GetCurrent());
251 if (RunnableCoroutinesExist()) {
252 SuspendCurrentCoroAndScheduleNext();
253 } else {
254 LOG(DEBUG, COROUTINES) << "StackfulCoroutineWorker::RequestSchedule: No runnables, starting to wait...";
255 WaitForRunnables();
256 runnablesLock_.Unlock();
257 }
258 }
259
BlockCurrentCoroAndScheduleNext()260 void StackfulCoroutineWorker::BlockCurrentCoroAndScheduleNext()
261 {
262 // precondition: current coro is already added to the waiters_
263 BlockCurrentCoro();
264 // will transfer control to another coro...
265 ScheduleNextCoroUnlockRunnablesWaiters();
266 // ...this coro has been scheduled again: process finalization queue
267 FinalizeTerminatedCoros();
268 }
269
SuspendCurrentCoroAndScheduleNext()270 void StackfulCoroutineWorker::SuspendCurrentCoroAndScheduleNext()
271 {
272 SuspendCurrentCoro();
273 // will transfer control to another coro...
274 ScheduleNextCoroUnlockRunnables();
275 // ...this coro has been scheduled again: process finalization queue
276 FinalizeTerminatedCoros();
277 }
278
279 template <bool SUSPEND_AS_BLOCKED>
SuspendCurrentCoroGeneric()280 void StackfulCoroutineWorker::SuspendCurrentCoroGeneric()
281 {
282 auto *currentCoro = Coroutine::GetCurrent();
283 currentCoro->RequestSuspend(SUSPEND_AS_BLOCKED);
284 if constexpr (!SUSPEND_AS_BLOCKED) {
285 PushToRunnableQueue(currentCoro, false);
286 }
287 }
288
BlockCurrentCoro()289 void StackfulCoroutineWorker::BlockCurrentCoro()
290 {
291 SuspendCurrentCoroGeneric<true>();
292 }
293
SuspendCurrentCoro()294 void StackfulCoroutineWorker::SuspendCurrentCoro()
295 {
296 SuspendCurrentCoroGeneric<false>();
297 }
298
ScheduleNextCoroUnlockRunnablesWaiters()299 void StackfulCoroutineWorker::ScheduleNextCoroUnlockRunnablesWaiters()
300 {
301 // precondition: runnable coros are present
302 auto *currentCtx = GetCurrentContext();
303 auto *nextCtx = PrepareNextRunnableContextForSwitch();
304
305 runnablesLock_.Unlock();
306 waitersLock_.Unlock();
307
308 SwitchCoroutineContext(currentCtx, nextCtx);
309 }
310
ScheduleNextCoroUnlockRunnables()311 void StackfulCoroutineWorker::ScheduleNextCoroUnlockRunnables()
312 {
313 // precondition: runnable coros are present
314 auto *currentCtx = GetCurrentContext();
315 auto *nextCtx = PrepareNextRunnableContextForSwitch();
316
317 runnablesLock_.Unlock();
318
319 SwitchCoroutineContext(currentCtx, nextCtx);
320 }
321
ScheduleNextCoroUnlockNone()322 void StackfulCoroutineWorker::ScheduleNextCoroUnlockNone()
323 {
324 // precondition: runnable coros are present
325 auto *currentCtx = GetCurrentContext();
326 auto *nextCtx = PrepareNextRunnableContextForSwitch();
327 SwitchCoroutineContext(currentCtx, nextCtx);
328 }
329
GetCurrentContext() const330 StackfulCoroutineContext *StackfulCoroutineWorker::GetCurrentContext() const
331 {
332 auto *co = Coroutine::GetCurrent();
333 return co->GetContext<StackfulCoroutineContext>();
334 }
335
PrepareNextRunnableContextForSwitch()336 StackfulCoroutineContext *StackfulCoroutineWorker::PrepareNextRunnableContextForSwitch()
337 {
338 // precondition: runnable coros are present
339 auto *nextCtx = PopFromRunnableQueue()->GetContext<StackfulCoroutineContext>();
340 nextCtx->RequestResume();
341 Coroutine::SetCurrent(nextCtx->GetCoroutine());
342 return nextCtx;
343 }
344
SwitchCoroutineContext(StackfulCoroutineContext * from,StackfulCoroutineContext * to)345 void StackfulCoroutineWorker::SwitchCoroutineContext(StackfulCoroutineContext *from, StackfulCoroutineContext *to)
346 {
347 ASSERT(from != nullptr);
348 ASSERT(to != nullptr);
349 EnsureCoroutineSwitchEnabled();
350 LOG(DEBUG, COROUTINES) << "Ctx switch: " << from->GetCoroutine()->GetName() << " --> "
351 << to->GetCoroutine()->GetName();
352 stats_.FinishInterval(CoroutineTimeStats::SCH_ALL);
353 stats_.StartInterval(CoroutineTimeStats::CTX_SWITCH);
354 from->SwitchTo(to);
355 stats_.FinishInterval(CoroutineTimeStats::CTX_SWITCH);
356 stats_.StartInterval(CoroutineTimeStats::SCH_ALL);
357 }
358
FinalizeTerminatedCoros()359 void StackfulCoroutineWorker::FinalizeTerminatedCoros()
360 {
361 while (!finalizationQueue_.empty()) {
362 auto *f = finalizationQueue_.front();
363 finalizationQueue_.pop();
364 coroManager_->DestroyEntrypointfulCoroutine(f);
365 }
366 }
367
UpdateLoadFactor()368 void StackfulCoroutineWorker::UpdateLoadFactor()
369 {
370 loadFactor_ = (loadFactor_ + runnables_.size()) / 2U;
371 }
372
EnsureCoroutineSwitchEnabled()373 void StackfulCoroutineWorker::EnsureCoroutineSwitchEnabled()
374 {
375 if (IsCoroutineSwitchDisabled()) {
376 LOG(FATAL, COROUTINES) << "ERROR ERROR ERROR >>> Trying to switch coroutines on " << GetName()
377 << " when coroutine switch is DISABLED!!! <<< ERROR ERROR ERROR";
378 UNREACHABLE();
379 }
380 }
381
382 } // namespace ark
383