• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "runtime/include/thread_scopes.h"
17 #include "runtime/coroutines/stackful_coroutine_manager.h"
18 #include "runtime/coroutines/stackful_coroutine.h"
19 #include "runtime/coroutines/stackful_coroutine_worker.h"
20 
21 namespace panda {
22 
StackfulCoroutineWorker(Runtime * runtime,PandaVM * vm,StackfulCoroutineManager * coroManager,ScheduleLoopType type,PandaString name)23 StackfulCoroutineWorker::StackfulCoroutineWorker(Runtime *runtime, PandaVM *vm, StackfulCoroutineManager *coroManager,
24                                                  ScheduleLoopType type, PandaString name)
25     : runtime_(runtime),
26       vm_(vm),
27       coroManager_(coroManager),
28       id_(os::thread::GetCurrentThreadId()),
29       name_(std::move(name))
30 {
31     if (type == ScheduleLoopType::THREAD) {
32         std::thread t(&StackfulCoroutineWorker::ThreadProc, this);
33         os::thread::SetThreadName(t.native_handle(), name_.c_str());
34         t.detach();
35         // will create the schedule loop coroutine in the thread proc in order to set the stack protector correctly
36     } else {
37         scheduleLoopCtx_ =
38             coroManager->CreateNativeCoroutine(runtime, vm, ScheduleLoopProxy, this, "[fiber_sch] " + GetName());
39         PushToRunnableQueue(scheduleLoopCtx_);
40     }
41 }
42 
AddRunnableCoroutine(Coroutine * newCoro,bool prioritize)43 void StackfulCoroutineWorker::AddRunnableCoroutine(Coroutine *newCoro, bool prioritize)
44 {
45     PushToRunnableQueue(newCoro, prioritize);
46 }
47 
WaitForEvent(CoroutineEvent * awaitee)48 bool StackfulCoroutineWorker::WaitForEvent(CoroutineEvent *awaitee)
49 {
50     // precondition: this method is not called by the schedule loop coroutine
51 
52     Coroutine *waiter = Coroutine::GetCurrent();
53     ASSERT(GetCurrentContext()->GetWorker() == this);
54     ASSERT(awaitee != nullptr);
55 
56     if (awaitee->Happened()) {
57         awaitee->Unlock();
58         return false;
59     }
60 
61     waitersLock_.Lock();
62     awaitee->Unlock();
63     LOG(DEBUG, COROUTINES) << "StackfulCoroutineWorker::AddWaitingCoroutine: " << waiter->GetName() << " AWAITS";
64     waiters_.insert({awaitee, waiter});
65 
66     runnablesLock_.Lock();
67     ASSERT(RunnableCoroutinesExist());
68     ScopedNativeCodeThread n(Coroutine::GetCurrent());
69     // will unlock waiters_lock_ and switch ctx
70     BlockCurrentCoroAndScheduleNext();
71 
72     return true;
73 }
74 
UnblockWaiters(CoroutineEvent * blocker)75 void StackfulCoroutineWorker::UnblockWaiters(CoroutineEvent *blocker)
76 {
77     os::memory::LockHolder lock(waitersLock_);
78     auto w = waiters_.find(blocker);
79     if (w != waiters_.end()) {
80         auto *coro = w->second;
81         waiters_.erase(w);
82         coro->RequestUnblock();
83         PushToRunnableQueue(coro);
84     }
85 }
86 
RequestFinalization(Coroutine * finalizee)87 void StackfulCoroutineWorker::RequestFinalization(Coroutine *finalizee)
88 {
89     // precondition: current coro and finalizee belong to the current worker
90     ASSERT(finalizee->GetContext<StackfulCoroutineContext>()->GetWorker() == this);
91     ASSERT(GetCurrentContext()->GetWorker() == this);
92 
93     finalizationQueue_.push(finalizee);
94     ScheduleNextCoroUnlockNone();
95 }
96 
RequestSchedule()97 void StackfulCoroutineWorker::RequestSchedule()
98 {
99     RequestScheduleImpl();
100 }
101 
FinalizeFiberScheduleLoop()102 void StackfulCoroutineWorker::FinalizeFiberScheduleLoop()
103 {
104     ASSERT(GetCurrentContext()->GetWorker() == this);
105 
106     // part of MAIN finalization sequence
107     if (RunnableCoroutinesExist()) {
108         // the schedule loop is still runnable
109         ASSERT(scheduleLoopCtx_->HasNativeEntrypoint());
110         runnablesLock_.Lock();
111         // sch loop only
112         ASSERT(runnables_.size() == 1);
113         SuspendCurrentCoroAndScheduleNext();
114     }
115 }
116 
DisableCoroutineSwitch()117 void StackfulCoroutineWorker::DisableCoroutineSwitch()
118 {
119     ++disableCoroSwitchCounter_;
120     LOG(DEBUG, COROUTINES) << "Coroutine switch on " << GetName()
121                            << " has been disabled! Recursive ctr = " << disableCoroSwitchCounter_;
122 }
123 
EnableCoroutineSwitch()124 void StackfulCoroutineWorker::EnableCoroutineSwitch()
125 {
126     ASSERT(IsCoroutineSwitchDisabled());
127     --disableCoroSwitchCounter_;
128     LOG(DEBUG, COROUTINES) << "Coroutine switch on " << GetName()
129                            << " has been enabled! Recursive ctr = " << disableCoroSwitchCounter_;
130 }
131 
IsCoroutineSwitchDisabled()132 bool StackfulCoroutineWorker::IsCoroutineSwitchDisabled()
133 {
134     return disableCoroSwitchCounter_ > 0;
135 }
136 
137 #ifndef NDEBUG
PrintRunnables(const PandaString & requester)138 void StackfulCoroutineWorker::PrintRunnables(const PandaString &requester)
139 {
140     os::memory::LockHolder lock(runnablesLock_);
141     LOG(DEBUG, COROUTINES) << "[" << requester << "] ";
142     for (auto *co : runnables_) {
143         LOG(DEBUG, COROUTINES) << co->GetName() << " <";
144     }
145     LOG(DEBUG, COROUTINES) << "X";
146 }
147 #endif
148 
ThreadProc()149 void StackfulCoroutineWorker::ThreadProc()
150 {
151     id_ = os::thread::GetCurrentThreadId();
152     scheduleLoopCtx_ = coroManager_->CreateEntrypointlessCoroutine(runtime_, vm_, false, "[thr_sch] " + GetName());
153     scheduleLoopCtx_->GetContext<StackfulCoroutineContext>()->SetWorker(this);
154     Coroutine::SetCurrent(scheduleLoopCtx_);
155     scheduleLoopCtx_->RequestResume();
156     scheduleLoopCtx_->NativeCodeBegin();
157     coroManager_->OnWorkerStartup();
158 
159     ScheduleLoopBody();
160 
161     coroManager_->DestroyEntrypointlessCoroutine(scheduleLoopCtx_);
162     ASSERT(id_ == os::thread::GetCurrentThreadId());
163     coroManager_->OnWorkerShutdown();
164 }
165 
ScheduleLoop()166 void StackfulCoroutineWorker::ScheduleLoop()
167 {
168     LOG(DEBUG, COROUTINES) << "[" << GetName() << "] Schedule loop called!";
169     ScheduleLoopBody();
170 }
171 
ScheduleLoopBody()172 void StackfulCoroutineWorker::ScheduleLoopBody()
173 {
174     ScopedManagedCodeThread s(scheduleLoopCtx_);
175     while (IsActive()) {
176         RequestScheduleImpl();
177         os::memory::LockHolder lkRunnables(runnablesLock_);
178         UpdateLoadFactor();
179     }
180 }
181 
ScheduleLoopProxy(void * worker)182 void StackfulCoroutineWorker::ScheduleLoopProxy(void *worker)
183 {
184     static_cast<StackfulCoroutineWorker *>(worker)->ScheduleLoop();
185 }
186 
PushToRunnableQueue(Coroutine * co,bool pushFront)187 void StackfulCoroutineWorker::PushToRunnableQueue(Coroutine *co, bool pushFront)
188 {
189     os::memory::LockHolder lock(runnablesLock_);
190     co->GetContext<StackfulCoroutineContext>()->SetWorker(this);
191 
192     if (pushFront) {
193         runnables_.push_front(co);
194     } else {
195         runnables_.push_back(co);
196     }
197     UpdateLoadFactor();
198 
199     runnablesCv_.Signal();
200 }
201 
PopFromRunnableQueue()202 Coroutine *StackfulCoroutineWorker::PopFromRunnableQueue()
203 {
204     os::memory::LockHolder lock(runnablesLock_);
205     ASSERT(!runnables_.empty());
206     auto *co = runnables_.front();
207     runnables_.pop_front();
208     UpdateLoadFactor();
209     return co;
210 }
211 
RunnableCoroutinesExist() const212 bool StackfulCoroutineWorker::RunnableCoroutinesExist() const
213 {
214     os::memory::LockHolder lock(runnablesLock_);
215     return !runnables_.empty();
216 }
217 
WaitForRunnables()218 void StackfulCoroutineWorker::WaitForRunnables()
219 {
220     // NOTE(konstanting): in case of work stealing, use timed wait and try periodically to steal some runnables
221     while (!RunnableCoroutinesExist() && IsActive()) {
222         runnablesCv_.Wait(
223             &runnablesLock_);  // or timed wait? we may miss the signal in some cases (e.g. IsActive() change)...
224         if (!RunnableCoroutinesExist() && IsActive()) {
225             LOG(DEBUG, COROUTINES) << "StackfulCoroutineWorker::WaitForRunnables: spurious wakeup!";
226         } else {
227             LOG(DEBUG, COROUTINES) << "StackfulCoroutineWorker::WaitForRunnables: wakeup!";
228         }
229     }
230 }
231 
RequestScheduleImpl()232 void StackfulCoroutineWorker::RequestScheduleImpl()
233 {
234     // precondition: called within the current worker, no cross-worker calls allowed
235     ASSERT(GetCurrentContext()->GetWorker() == this);
236     runnablesLock_.Lock();
237 
238     // NOTE(konstanting): implement coro migration, work stealing, etc.
239     ScopedNativeCodeThread n(Coroutine::GetCurrent());
240     if (RunnableCoroutinesExist()) {
241         SuspendCurrentCoroAndScheduleNext();
242     } else {
243         LOG(DEBUG, COROUTINES) << "StackfulCoroutineWorker::RequestSchedule: No runnables, starting to wait...";
244         WaitForRunnables();
245         runnablesLock_.Unlock();
246     }
247 }
248 
BlockCurrentCoroAndScheduleNext()249 void StackfulCoroutineWorker::BlockCurrentCoroAndScheduleNext()
250 {
251     // precondition: current coro is already added to the waiters_
252     BlockCurrentCoro();
253     // will transfer control to another coro...
254     ScheduleNextCoroUnlockRunnablesWaiters();
255     // ...this coro has been scheduled again: process finalization queue
256     FinalizeTerminatedCoros();
257 }
258 
SuspendCurrentCoroAndScheduleNext()259 void StackfulCoroutineWorker::SuspendCurrentCoroAndScheduleNext()
260 {
261     SuspendCurrentCoro();
262     // will transfer control to another coro...
263     ScheduleNextCoroUnlockRunnables();
264     // ...this coro has been scheduled again: process finalization queue
265     FinalizeTerminatedCoros();
266 }
267 
268 template <bool SUSPEND_AS_BLOCKED>
SuspendCurrentCoroGeneric()269 void StackfulCoroutineWorker::SuspendCurrentCoroGeneric()
270 {
271     auto *currentCoro = Coroutine::GetCurrent();
272     currentCoro->RequestSuspend(SUSPEND_AS_BLOCKED);
273     if constexpr (!SUSPEND_AS_BLOCKED) {
274         PushToRunnableQueue(currentCoro, false);
275     }
276 }
277 
BlockCurrentCoro()278 void StackfulCoroutineWorker::BlockCurrentCoro()
279 {
280     SuspendCurrentCoroGeneric<true>();
281 }
282 
SuspendCurrentCoro()283 void StackfulCoroutineWorker::SuspendCurrentCoro()
284 {
285     SuspendCurrentCoroGeneric<false>();
286 }
287 
ScheduleNextCoroUnlockRunnablesWaiters()288 void StackfulCoroutineWorker::ScheduleNextCoroUnlockRunnablesWaiters()
289 {
290     // precondition: runnable coros are present
291     auto *currentCtx = GetCurrentContext();
292     auto *nextCtx = PrepareNextRunnableContextForSwitch();
293 
294     runnablesLock_.Unlock();
295     waitersLock_.Unlock();
296 
297     SwitchCoroutineContext(currentCtx, nextCtx);
298 }
299 
ScheduleNextCoroUnlockRunnables()300 void StackfulCoroutineWorker::ScheduleNextCoroUnlockRunnables()
301 {
302     // precondition: runnable coros are present
303     auto *currentCtx = GetCurrentContext();
304     auto *nextCtx = PrepareNextRunnableContextForSwitch();
305 
306     runnablesLock_.Unlock();
307 
308     SwitchCoroutineContext(currentCtx, nextCtx);
309 }
310 
ScheduleNextCoroUnlockNone()311 void StackfulCoroutineWorker::ScheduleNextCoroUnlockNone()
312 {
313     // precondition: runnable coros are present
314     auto *currentCtx = GetCurrentContext();
315     auto *nextCtx = PrepareNextRunnableContextForSwitch();
316     SwitchCoroutineContext(currentCtx, nextCtx);
317 }
318 
GetCurrentContext() const319 StackfulCoroutineContext *StackfulCoroutineWorker::GetCurrentContext() const
320 {
321     auto *co = Coroutine::GetCurrent();
322     return co->GetContext<StackfulCoroutineContext>();
323 }
324 
PrepareNextRunnableContextForSwitch()325 StackfulCoroutineContext *StackfulCoroutineWorker::PrepareNextRunnableContextForSwitch()
326 {
327     // precondition: runnable coros are present
328     auto *nextCtx = PopFromRunnableQueue()->GetContext<StackfulCoroutineContext>();
329     nextCtx->RequestResume();
330     Coroutine::SetCurrent(nextCtx->GetCoroutine());
331     return nextCtx;
332 }
333 
SwitchCoroutineContext(StackfulCoroutineContext * from,StackfulCoroutineContext * to)334 void StackfulCoroutineWorker::SwitchCoroutineContext(StackfulCoroutineContext *from, StackfulCoroutineContext *to)
335 {
336     ASSERT(from != nullptr);
337     ASSERT(to != nullptr);
338     EnsureCoroutineSwitchEnabled();
339     from->SwitchTo(to);
340 }
341 
FinalizeTerminatedCoros()342 void StackfulCoroutineWorker::FinalizeTerminatedCoros()
343 {
344     while (!finalizationQueue_.empty()) {
345         auto *f = finalizationQueue_.front();
346         finalizationQueue_.pop();
347         coroManager_->DestroyEntrypointfulCoroutine(f);
348     }
349 }
350 
UpdateLoadFactor()351 void StackfulCoroutineWorker::UpdateLoadFactor()
352 {
353     loadFactor_ = (loadFactor_ + runnables_.size()) / 2U;
354 }
355 
EnsureCoroutineSwitchEnabled()356 void StackfulCoroutineWorker::EnsureCoroutineSwitchEnabled()
357 {
358     if (IsCoroutineSwitchDisabled()) {
359         LOG(FATAL, COROUTINES) << "ERROR ERROR ERROR >>> Trying to switch coroutines on " << GetName()
360                                << " when coroutine switch is DISABLED!!! <<< ERROR ERROR ERROR";
361         UNREACHABLE();
362     }
363 }
364 
365 }  // namespace panda
366