• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright (c) 2023-2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "runtime/include/thread_scopes.h"
17 #include "runtime/coroutines/stackful_coroutine_manager.h"
18 #include "runtime/coroutines/stackful_coroutine.h"
19 #include "runtime/coroutines/stackful_coroutine_worker.h"
20 
21 namespace ark {
22 
StackfulCoroutineWorker(Runtime * runtime,PandaVM * vm,StackfulCoroutineManager * coroManager,ScheduleLoopType type,PandaString name,size_t id)23 StackfulCoroutineWorker::StackfulCoroutineWorker(Runtime *runtime, PandaVM *vm, StackfulCoroutineManager *coroManager,
24                                                  ScheduleLoopType type, PandaString name, size_t id)
25     : runtime_(runtime),
26       vm_(vm),
27       coroManager_(coroManager),
28       threadId_(os::thread::GetCurrentThreadId()),
29       stats_(name),
30       name_(std::move(name)),
31       id_(id)
32 {
33     ASSERT(id <= stackful_coroutines::MAX_WORKER_ID);
34     LOG(DEBUG, COROUTINES) << "Created a coroutine worker instance: id=" << id_ << " name=" << name_;
35     if (type == ScheduleLoopType::THREAD) {
36         std::thread t(&StackfulCoroutineWorker::ThreadProc, this);
37         os::thread::SetThreadName(t.native_handle(), name_.c_str());
38         t.detach();
39         // will create the schedule loop coroutine in the thread proc in order to set the stack protector correctly
40     } else {
41         scheduleLoopCtx_ =
42             coroManager->CreateNativeCoroutine(runtime, vm, ScheduleLoopProxy, this, "[fiber_sch] " + GetName());
43         PushToRunnableQueue(scheduleLoopCtx_);
44     }
45 }
46 
AddRunnableCoroutine(Coroutine * newCoro,bool prioritize)47 void StackfulCoroutineWorker::AddRunnableCoroutine(Coroutine *newCoro, bool prioritize)
48 {
49     PushToRunnableQueue(newCoro, prioritize);
50 }
51 
WaitForEvent(CoroutineEvent * awaitee)52 bool StackfulCoroutineWorker::WaitForEvent(CoroutineEvent *awaitee)
53 {
54     // precondition: this method is not called by the schedule loop coroutine
55 
56     Coroutine *waiter = Coroutine::GetCurrent();
57     ASSERT(GetCurrentContext()->GetWorker() == this);
58     ASSERT(awaitee != nullptr);
59 
60     if (awaitee->Happened()) {
61         awaitee->Unlock();
62         return false;
63     }
64 
65     waitersLock_.Lock();
66     awaitee->Unlock();
67     LOG(DEBUG, COROUTINES) << "StackfulCoroutineWorker::AddWaitingCoroutine: " << waiter->GetName() << " AWAITS";
68     waiters_.insert({awaitee, waiter});
69 
70     runnablesLock_.Lock();
71     ASSERT(RunnableCoroutinesExist());
72     ScopedNativeCodeThread n(Coroutine::GetCurrent());
73     // will unlock waiters_lock_ and switch ctx
74     BlockCurrentCoroAndScheduleNext();
75 
76     return true;
77 }
78 
UnblockWaiters(CoroutineEvent * blocker)79 void StackfulCoroutineWorker::UnblockWaiters(CoroutineEvent *blocker)
80 {
81     os::memory::LockHolder lock(waitersLock_);
82     auto w = waiters_.find(blocker);
83     while (w != waiters_.end()) {
84         auto *coro = w->second;
85         waiters_.erase(w);
86         coro->RequestUnblock();
87         PushToRunnableQueue(coro);
88         w = waiters_.find(blocker);
89     }
90 }
91 
RequestFinalization(Coroutine * finalizee)92 void StackfulCoroutineWorker::RequestFinalization(Coroutine *finalizee)
93 {
94     // precondition: current coro and finalizee belong to the current worker
95     ASSERT(finalizee->GetContext<StackfulCoroutineContext>()->GetWorker() == this);
96     ASSERT(GetCurrentContext()->GetWorker() == this);
97 
98     finalizationQueue_.push(finalizee);
99     ScheduleNextCoroUnlockNone();
100 }
101 
RequestSchedule()102 void StackfulCoroutineWorker::RequestSchedule()
103 {
104     RequestScheduleImpl();
105 }
106 
FinalizeFiberScheduleLoop()107 void StackfulCoroutineWorker::FinalizeFiberScheduleLoop()
108 {
109     ASSERT(GetCurrentContext()->GetWorker() == this);
110 
111     // part of MAIN finalization sequence
112     if (RunnableCoroutinesExist()) {
113         // the schedule loop is still runnable
114         ASSERT(scheduleLoopCtx_->HasNativeEntrypoint());
115         runnablesLock_.Lock();
116         // sch loop only
117         ASSERT(runnables_.size() == 1);
118         SuspendCurrentCoroAndScheduleNext();
119     }
120 }
121 
DisableCoroutineSwitch()122 void StackfulCoroutineWorker::DisableCoroutineSwitch()
123 {
124     ++disableCoroSwitchCounter_;
125     LOG(DEBUG, COROUTINES) << "Coroutine switch on " << GetName()
126                            << " has been disabled! Recursive ctr = " << disableCoroSwitchCounter_;
127 }
128 
EnableCoroutineSwitch()129 void StackfulCoroutineWorker::EnableCoroutineSwitch()
130 {
131     ASSERT(IsCoroutineSwitchDisabled());
132     --disableCoroSwitchCounter_;
133     LOG(DEBUG, COROUTINES) << "Coroutine switch on " << GetName()
134                            << " has been enabled! Recursive ctr = " << disableCoroSwitchCounter_;
135 }
136 
IsCoroutineSwitchDisabled()137 bool StackfulCoroutineWorker::IsCoroutineSwitchDisabled()
138 {
139     return disableCoroSwitchCounter_ > 0;
140 }
141 
142 #ifndef NDEBUG
PrintRunnables(const PandaString & requester)143 void StackfulCoroutineWorker::PrintRunnables(const PandaString &requester)
144 {
145     os::memory::LockHolder lock(runnablesLock_);
146     LOG(DEBUG, COROUTINES) << "[" << requester << "] ";
147     for (auto *co : runnables_) {
148         LOG(DEBUG, COROUTINES) << co->GetName() << " <";
149     }
150     LOG(DEBUG, COROUTINES) << "X";
151 }
152 #endif
153 
ThreadProc()154 void StackfulCoroutineWorker::ThreadProc()
155 {
156     threadId_ = os::thread::GetCurrentThreadId();
157     scheduleLoopCtx_ = coroManager_->CreateEntrypointlessCoroutine(runtime_, vm_, false, "[thr_sch] " + GetName());
158     scheduleLoopCtx_->GetContext<StackfulCoroutineContext>()->SetWorker(this);
159     Coroutine::SetCurrent(scheduleLoopCtx_);
160     scheduleLoopCtx_->RequestResume();
161     scheduleLoopCtx_->NativeCodeBegin();
162     coroManager_->OnWorkerStartup();
163 
164     // profiling: start interval here, end in ctxswitch
165     stats_.StartInterval(CoroutineTimeStats::SCH_ALL);
166     ScheduleLoopBody();
167 
168     coroManager_->DestroyEntrypointlessCoroutine(scheduleLoopCtx_);
169     ASSERT(threadId_ == os::thread::GetCurrentThreadId());
170     coroManager_->OnWorkerShutdown();
171 }
172 
ScheduleLoop()173 void StackfulCoroutineWorker::ScheduleLoop()
174 {
175     LOG(DEBUG, COROUTINES) << "[" << GetName() << "] Schedule loop called!";
176     ScheduleLoopBody();
177 }
178 
ScheduleLoopBody()179 void StackfulCoroutineWorker::ScheduleLoopBody()
180 {
181     ScopedManagedCodeThread s(scheduleLoopCtx_);
182     while (IsActive()) {
183         RequestScheduleImpl();
184         os::memory::LockHolder lkRunnables(runnablesLock_);
185         UpdateLoadFactor();
186     }
187 }
188 
ScheduleLoopProxy(void * worker)189 void StackfulCoroutineWorker::ScheduleLoopProxy(void *worker)
190 {
191     static_cast<StackfulCoroutineWorker *>(worker)->ScheduleLoop();
192 }
193 
PushToRunnableQueue(Coroutine * co,bool pushFront)194 void StackfulCoroutineWorker::PushToRunnableQueue(Coroutine *co, bool pushFront)
195 {
196     os::memory::LockHolder lock(runnablesLock_);
197     co->GetContext<StackfulCoroutineContext>()->SetWorker(this);
198 
199     if (pushFront) {
200         runnables_.push_front(co);
201     } else {
202         runnables_.push_back(co);
203     }
204     UpdateLoadFactor();
205 
206     runnablesCv_.Signal();
207 }
208 
PopFromRunnableQueue()209 Coroutine *StackfulCoroutineWorker::PopFromRunnableQueue()
210 {
211     os::memory::LockHolder lock(runnablesLock_);
212     ASSERT(!runnables_.empty());
213     auto *co = runnables_.front();
214     runnables_.pop_front();
215     UpdateLoadFactor();
216     return co;
217 }
218 
RunnableCoroutinesExist() const219 bool StackfulCoroutineWorker::RunnableCoroutinesExist() const
220 {
221     os::memory::LockHolder lock(runnablesLock_);
222     return !runnables_.empty();
223 }
224 
WaitForRunnables()225 void StackfulCoroutineWorker::WaitForRunnables()
226 {
227     // NOTE(konstanting): in case of work stealing, use timed wait and try periodically to steal some runnables
228     while (!RunnableCoroutinesExist() && IsActive()) {
229         // profiling: no need to profile the SLEEPING state, closing the interval
230         stats_.FinishInterval(CoroutineTimeStats::SCH_ALL);
231         runnablesCv_.Wait(
232             &runnablesLock_);  // or timed wait? we may miss the signal in some cases (e.g. IsActive() change)...
233         // profiling: reopening the interval after the sleep
234         stats_.StartInterval(CoroutineTimeStats::SCH_ALL);
235         if (!RunnableCoroutinesExist() && IsActive()) {
236             LOG(DEBUG, COROUTINES) << "StackfulCoroutineWorker::WaitForRunnables: spurious wakeup!";
237         } else {
238             LOG(DEBUG, COROUTINES) << "StackfulCoroutineWorker::WaitForRunnables: wakeup!";
239         }
240     }
241 }
242 
RequestScheduleImpl()243 void StackfulCoroutineWorker::RequestScheduleImpl()
244 {
245     // precondition: called within the current worker, no cross-worker calls allowed
246     ASSERT(GetCurrentContext()->GetWorker() == this);
247     runnablesLock_.Lock();
248 
249     // NOTE(konstanting): implement coro migration, work stealing, etc.
250     ScopedNativeCodeThread n(Coroutine::GetCurrent());
251     if (RunnableCoroutinesExist()) {
252         SuspendCurrentCoroAndScheduleNext();
253     } else {
254         LOG(DEBUG, COROUTINES) << "StackfulCoroutineWorker::RequestSchedule: No runnables, starting to wait...";
255         WaitForRunnables();
256         runnablesLock_.Unlock();
257     }
258 }
259 
BlockCurrentCoroAndScheduleNext()260 void StackfulCoroutineWorker::BlockCurrentCoroAndScheduleNext()
261 {
262     // precondition: current coro is already added to the waiters_
263     BlockCurrentCoro();
264     // will transfer control to another coro...
265     ScheduleNextCoroUnlockRunnablesWaiters();
266     // ...this coro has been scheduled again: process finalization queue
267     FinalizeTerminatedCoros();
268 }
269 
SuspendCurrentCoroAndScheduleNext()270 void StackfulCoroutineWorker::SuspendCurrentCoroAndScheduleNext()
271 {
272     SuspendCurrentCoro();
273     // will transfer control to another coro...
274     ScheduleNextCoroUnlockRunnables();
275     // ...this coro has been scheduled again: process finalization queue
276     FinalizeTerminatedCoros();
277 }
278 
279 template <bool SUSPEND_AS_BLOCKED>
SuspendCurrentCoroGeneric()280 void StackfulCoroutineWorker::SuspendCurrentCoroGeneric()
281 {
282     auto *currentCoro = Coroutine::GetCurrent();
283     currentCoro->RequestSuspend(SUSPEND_AS_BLOCKED);
284     if constexpr (!SUSPEND_AS_BLOCKED) {
285         PushToRunnableQueue(currentCoro, false);
286     }
287 }
288 
BlockCurrentCoro()289 void StackfulCoroutineWorker::BlockCurrentCoro()
290 {
291     SuspendCurrentCoroGeneric<true>();
292 }
293 
SuspendCurrentCoro()294 void StackfulCoroutineWorker::SuspendCurrentCoro()
295 {
296     SuspendCurrentCoroGeneric<false>();
297 }
298 
ScheduleNextCoroUnlockRunnablesWaiters()299 void StackfulCoroutineWorker::ScheduleNextCoroUnlockRunnablesWaiters()
300 {
301     // precondition: runnable coros are present
302     auto *currentCtx = GetCurrentContext();
303     auto *nextCtx = PrepareNextRunnableContextForSwitch();
304 
305     runnablesLock_.Unlock();
306     waitersLock_.Unlock();
307 
308     SwitchCoroutineContext(currentCtx, nextCtx);
309 }
310 
ScheduleNextCoroUnlockRunnables()311 void StackfulCoroutineWorker::ScheduleNextCoroUnlockRunnables()
312 {
313     // precondition: runnable coros are present
314     auto *currentCtx = GetCurrentContext();
315     auto *nextCtx = PrepareNextRunnableContextForSwitch();
316 
317     runnablesLock_.Unlock();
318 
319     SwitchCoroutineContext(currentCtx, nextCtx);
320 }
321 
ScheduleNextCoroUnlockNone()322 void StackfulCoroutineWorker::ScheduleNextCoroUnlockNone()
323 {
324     // precondition: runnable coros are present
325     auto *currentCtx = GetCurrentContext();
326     auto *nextCtx = PrepareNextRunnableContextForSwitch();
327     SwitchCoroutineContext(currentCtx, nextCtx);
328 }
329 
GetCurrentContext() const330 StackfulCoroutineContext *StackfulCoroutineWorker::GetCurrentContext() const
331 {
332     auto *co = Coroutine::GetCurrent();
333     return co->GetContext<StackfulCoroutineContext>();
334 }
335 
PrepareNextRunnableContextForSwitch()336 StackfulCoroutineContext *StackfulCoroutineWorker::PrepareNextRunnableContextForSwitch()
337 {
338     // precondition: runnable coros are present
339     auto *nextCtx = PopFromRunnableQueue()->GetContext<StackfulCoroutineContext>();
340     nextCtx->RequestResume();
341     Coroutine::SetCurrent(nextCtx->GetCoroutine());
342     return nextCtx;
343 }
344 
SwitchCoroutineContext(StackfulCoroutineContext * from,StackfulCoroutineContext * to)345 void StackfulCoroutineWorker::SwitchCoroutineContext(StackfulCoroutineContext *from, StackfulCoroutineContext *to)
346 {
347     ASSERT(from != nullptr);
348     ASSERT(to != nullptr);
349     EnsureCoroutineSwitchEnabled();
350     LOG(DEBUG, COROUTINES) << "Ctx switch: " << from->GetCoroutine()->GetName() << " --> "
351                            << to->GetCoroutine()->GetName();
352     stats_.FinishInterval(CoroutineTimeStats::SCH_ALL);
353     stats_.StartInterval(CoroutineTimeStats::CTX_SWITCH);
354     from->SwitchTo(to);
355     stats_.FinishInterval(CoroutineTimeStats::CTX_SWITCH);
356     stats_.StartInterval(CoroutineTimeStats::SCH_ALL);
357 }
358 
FinalizeTerminatedCoros()359 void StackfulCoroutineWorker::FinalizeTerminatedCoros()
360 {
361     while (!finalizationQueue_.empty()) {
362         auto *f = finalizationQueue_.front();
363         finalizationQueue_.pop();
364         coroManager_->DestroyEntrypointfulCoroutine(f);
365     }
366 }
367 
UpdateLoadFactor()368 void StackfulCoroutineWorker::UpdateLoadFactor()
369 {
370     loadFactor_ = (loadFactor_ + runnables_.size()) / 2U;
371 }
372 
EnsureCoroutineSwitchEnabled()373 void StackfulCoroutineWorker::EnsureCoroutineSwitchEnabled()
374 {
375     if (IsCoroutineSwitchDisabled()) {
376         LOG(FATAL, COROUTINES) << "ERROR ERROR ERROR >>> Trying to switch coroutines on " << GetName()
377                                << " when coroutine switch is DISABLED!!! <<< ERROR ERROR ERROR";
378         UNREACHABLE();
379     }
380 }
381 
382 }  // namespace ark
383