1 /**
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "runtime/include/thread_scopes.h"
17 #include "runtime/coroutines/stackful_coroutine_manager.h"
18 #include "runtime/coroutines/stackful_coroutine.h"
19 #include "runtime/coroutines/stackful_coroutine_worker.h"
20
21 namespace panda {
22
StackfulCoroutineWorker(Runtime * runtime,PandaVM * vm,StackfulCoroutineManager * coroManager,ScheduleLoopType type,PandaString name)23 StackfulCoroutineWorker::StackfulCoroutineWorker(Runtime *runtime, PandaVM *vm, StackfulCoroutineManager *coroManager,
24 ScheduleLoopType type, PandaString name)
25 : runtime_(runtime),
26 vm_(vm),
27 coroManager_(coroManager),
28 id_(os::thread::GetCurrentThreadId()),
29 name_(std::move(name))
30 {
31 if (type == ScheduleLoopType::THREAD) {
32 std::thread t(&StackfulCoroutineWorker::ThreadProc, this);
33 os::thread::SetThreadName(t.native_handle(), name_.c_str());
34 t.detach();
35 // will create the schedule loop coroutine in the thread proc in order to set the stack protector correctly
36 } else {
37 scheduleLoopCtx_ =
38 coroManager->CreateNativeCoroutine(runtime, vm, ScheduleLoopProxy, this, "[fiber_sch] " + GetName());
39 PushToRunnableQueue(scheduleLoopCtx_);
40 }
41 }
42
AddRunnableCoroutine(Coroutine * newCoro,bool prioritize)43 void StackfulCoroutineWorker::AddRunnableCoroutine(Coroutine *newCoro, bool prioritize)
44 {
45 PushToRunnableQueue(newCoro, prioritize);
46 }
47
WaitForEvent(CoroutineEvent * awaitee)48 bool StackfulCoroutineWorker::WaitForEvent(CoroutineEvent *awaitee)
49 {
50 // precondition: this method is not called by the schedule loop coroutine
51
52 Coroutine *waiter = Coroutine::GetCurrent();
53 ASSERT(GetCurrentContext()->GetWorker() == this);
54 ASSERT(awaitee != nullptr);
55
56 if (awaitee->Happened()) {
57 awaitee->Unlock();
58 return false;
59 }
60
61 waitersLock_.Lock();
62 awaitee->Unlock();
63 LOG(DEBUG, COROUTINES) << "StackfulCoroutineWorker::AddWaitingCoroutine: " << waiter->GetName() << " AWAITS";
64 waiters_.insert({awaitee, waiter});
65
66 runnablesLock_.Lock();
67 ASSERT(RunnableCoroutinesExist());
68 ScopedNativeCodeThread n(Coroutine::GetCurrent());
69 // will unlock waiters_lock_ and switch ctx
70 BlockCurrentCoroAndScheduleNext();
71
72 return true;
73 }
74
UnblockWaiters(CoroutineEvent * blocker)75 void StackfulCoroutineWorker::UnblockWaiters(CoroutineEvent *blocker)
76 {
77 os::memory::LockHolder lock(waitersLock_);
78 auto w = waiters_.find(blocker);
79 if (w != waiters_.end()) {
80 auto *coro = w->second;
81 waiters_.erase(w);
82 coro->RequestUnblock();
83 PushToRunnableQueue(coro);
84 }
85 }
86
RequestFinalization(Coroutine * finalizee)87 void StackfulCoroutineWorker::RequestFinalization(Coroutine *finalizee)
88 {
89 // precondition: current coro and finalizee belong to the current worker
90 ASSERT(finalizee->GetContext<StackfulCoroutineContext>()->GetWorker() == this);
91 ASSERT(GetCurrentContext()->GetWorker() == this);
92
93 finalizationQueue_.push(finalizee);
94 ScheduleNextCoroUnlockNone();
95 }
96
RequestSchedule()97 void StackfulCoroutineWorker::RequestSchedule()
98 {
99 RequestScheduleImpl();
100 }
101
FinalizeFiberScheduleLoop()102 void StackfulCoroutineWorker::FinalizeFiberScheduleLoop()
103 {
104 ASSERT(GetCurrentContext()->GetWorker() == this);
105
106 // part of MAIN finalization sequence
107 if (RunnableCoroutinesExist()) {
108 // the schedule loop is still runnable
109 ASSERT(scheduleLoopCtx_->HasNativeEntrypoint());
110 runnablesLock_.Lock();
111 // sch loop only
112 ASSERT(runnables_.size() == 1);
113 SuspendCurrentCoroAndScheduleNext();
114 }
115 }
116
DisableCoroutineSwitch()117 void StackfulCoroutineWorker::DisableCoroutineSwitch()
118 {
119 ++disableCoroSwitchCounter_;
120 LOG(DEBUG, COROUTINES) << "Coroutine switch on " << GetName()
121 << " has been disabled! Recursive ctr = " << disableCoroSwitchCounter_;
122 }
123
EnableCoroutineSwitch()124 void StackfulCoroutineWorker::EnableCoroutineSwitch()
125 {
126 ASSERT(IsCoroutineSwitchDisabled());
127 --disableCoroSwitchCounter_;
128 LOG(DEBUG, COROUTINES) << "Coroutine switch on " << GetName()
129 << " has been enabled! Recursive ctr = " << disableCoroSwitchCounter_;
130 }
131
IsCoroutineSwitchDisabled()132 bool StackfulCoroutineWorker::IsCoroutineSwitchDisabled()
133 {
134 return disableCoroSwitchCounter_ > 0;
135 }
136
137 #ifndef NDEBUG
PrintRunnables(const PandaString & requester)138 void StackfulCoroutineWorker::PrintRunnables(const PandaString &requester)
139 {
140 os::memory::LockHolder lock(runnablesLock_);
141 LOG(DEBUG, COROUTINES) << "[" << requester << "] ";
142 for (auto *co : runnables_) {
143 LOG(DEBUG, COROUTINES) << co->GetName() << " <";
144 }
145 LOG(DEBUG, COROUTINES) << "X";
146 }
147 #endif
148
ThreadProc()149 void StackfulCoroutineWorker::ThreadProc()
150 {
151 id_ = os::thread::GetCurrentThreadId();
152 scheduleLoopCtx_ = coroManager_->CreateEntrypointlessCoroutine(runtime_, vm_, false, "[thr_sch] " + GetName());
153 scheduleLoopCtx_->GetContext<StackfulCoroutineContext>()->SetWorker(this);
154 Coroutine::SetCurrent(scheduleLoopCtx_);
155 scheduleLoopCtx_->RequestResume();
156 scheduleLoopCtx_->NativeCodeBegin();
157 coroManager_->OnWorkerStartup();
158
159 ScheduleLoopBody();
160
161 coroManager_->DestroyEntrypointlessCoroutine(scheduleLoopCtx_);
162 ASSERT(id_ == os::thread::GetCurrentThreadId());
163 coroManager_->OnWorkerShutdown();
164 }
165
ScheduleLoop()166 void StackfulCoroutineWorker::ScheduleLoop()
167 {
168 LOG(DEBUG, COROUTINES) << "[" << GetName() << "] Schedule loop called!";
169 ScheduleLoopBody();
170 }
171
ScheduleLoopBody()172 void StackfulCoroutineWorker::ScheduleLoopBody()
173 {
174 ScopedManagedCodeThread s(scheduleLoopCtx_);
175 while (IsActive()) {
176 RequestScheduleImpl();
177 os::memory::LockHolder lkRunnables(runnablesLock_);
178 UpdateLoadFactor();
179 }
180 }
181
ScheduleLoopProxy(void * worker)182 void StackfulCoroutineWorker::ScheduleLoopProxy(void *worker)
183 {
184 static_cast<StackfulCoroutineWorker *>(worker)->ScheduleLoop();
185 }
186
PushToRunnableQueue(Coroutine * co,bool pushFront)187 void StackfulCoroutineWorker::PushToRunnableQueue(Coroutine *co, bool pushFront)
188 {
189 os::memory::LockHolder lock(runnablesLock_);
190 co->GetContext<StackfulCoroutineContext>()->SetWorker(this);
191
192 if (pushFront) {
193 runnables_.push_front(co);
194 } else {
195 runnables_.push_back(co);
196 }
197 UpdateLoadFactor();
198
199 runnablesCv_.Signal();
200 }
201
PopFromRunnableQueue()202 Coroutine *StackfulCoroutineWorker::PopFromRunnableQueue()
203 {
204 os::memory::LockHolder lock(runnablesLock_);
205 ASSERT(!runnables_.empty());
206 auto *co = runnables_.front();
207 runnables_.pop_front();
208 UpdateLoadFactor();
209 return co;
210 }
211
RunnableCoroutinesExist() const212 bool StackfulCoroutineWorker::RunnableCoroutinesExist() const
213 {
214 os::memory::LockHolder lock(runnablesLock_);
215 return !runnables_.empty();
216 }
217
WaitForRunnables()218 void StackfulCoroutineWorker::WaitForRunnables()
219 {
220 // NOTE(konstanting): in case of work stealing, use timed wait and try periodically to steal some runnables
221 while (!RunnableCoroutinesExist() && IsActive()) {
222 runnablesCv_.Wait(
223 &runnablesLock_); // or timed wait? we may miss the signal in some cases (e.g. IsActive() change)...
224 if (!RunnableCoroutinesExist() && IsActive()) {
225 LOG(DEBUG, COROUTINES) << "StackfulCoroutineWorker::WaitForRunnables: spurious wakeup!";
226 } else {
227 LOG(DEBUG, COROUTINES) << "StackfulCoroutineWorker::WaitForRunnables: wakeup!";
228 }
229 }
230 }
231
RequestScheduleImpl()232 void StackfulCoroutineWorker::RequestScheduleImpl()
233 {
234 // precondition: called within the current worker, no cross-worker calls allowed
235 ASSERT(GetCurrentContext()->GetWorker() == this);
236 runnablesLock_.Lock();
237
238 // NOTE(konstanting): implement coro migration, work stealing, etc.
239 ScopedNativeCodeThread n(Coroutine::GetCurrent());
240 if (RunnableCoroutinesExist()) {
241 SuspendCurrentCoroAndScheduleNext();
242 } else {
243 LOG(DEBUG, COROUTINES) << "StackfulCoroutineWorker::RequestSchedule: No runnables, starting to wait...";
244 WaitForRunnables();
245 runnablesLock_.Unlock();
246 }
247 }
248
BlockCurrentCoroAndScheduleNext()249 void StackfulCoroutineWorker::BlockCurrentCoroAndScheduleNext()
250 {
251 // precondition: current coro is already added to the waiters_
252 BlockCurrentCoro();
253 // will transfer control to another coro...
254 ScheduleNextCoroUnlockRunnablesWaiters();
255 // ...this coro has been scheduled again: process finalization queue
256 FinalizeTerminatedCoros();
257 }
258
SuspendCurrentCoroAndScheduleNext()259 void StackfulCoroutineWorker::SuspendCurrentCoroAndScheduleNext()
260 {
261 SuspendCurrentCoro();
262 // will transfer control to another coro...
263 ScheduleNextCoroUnlockRunnables();
264 // ...this coro has been scheduled again: process finalization queue
265 FinalizeTerminatedCoros();
266 }
267
268 template <bool SUSPEND_AS_BLOCKED>
SuspendCurrentCoroGeneric()269 void StackfulCoroutineWorker::SuspendCurrentCoroGeneric()
270 {
271 auto *currentCoro = Coroutine::GetCurrent();
272 currentCoro->RequestSuspend(SUSPEND_AS_BLOCKED);
273 if constexpr (!SUSPEND_AS_BLOCKED) {
274 PushToRunnableQueue(currentCoro, false);
275 }
276 }
277
BlockCurrentCoro()278 void StackfulCoroutineWorker::BlockCurrentCoro()
279 {
280 SuspendCurrentCoroGeneric<true>();
281 }
282
SuspendCurrentCoro()283 void StackfulCoroutineWorker::SuspendCurrentCoro()
284 {
285 SuspendCurrentCoroGeneric<false>();
286 }
287
ScheduleNextCoroUnlockRunnablesWaiters()288 void StackfulCoroutineWorker::ScheduleNextCoroUnlockRunnablesWaiters()
289 {
290 // precondition: runnable coros are present
291 auto *currentCtx = GetCurrentContext();
292 auto *nextCtx = PrepareNextRunnableContextForSwitch();
293
294 runnablesLock_.Unlock();
295 waitersLock_.Unlock();
296
297 SwitchCoroutineContext(currentCtx, nextCtx);
298 }
299
ScheduleNextCoroUnlockRunnables()300 void StackfulCoroutineWorker::ScheduleNextCoroUnlockRunnables()
301 {
302 // precondition: runnable coros are present
303 auto *currentCtx = GetCurrentContext();
304 auto *nextCtx = PrepareNextRunnableContextForSwitch();
305
306 runnablesLock_.Unlock();
307
308 SwitchCoroutineContext(currentCtx, nextCtx);
309 }
310
ScheduleNextCoroUnlockNone()311 void StackfulCoroutineWorker::ScheduleNextCoroUnlockNone()
312 {
313 // precondition: runnable coros are present
314 auto *currentCtx = GetCurrentContext();
315 auto *nextCtx = PrepareNextRunnableContextForSwitch();
316 SwitchCoroutineContext(currentCtx, nextCtx);
317 }
318
GetCurrentContext() const319 StackfulCoroutineContext *StackfulCoroutineWorker::GetCurrentContext() const
320 {
321 auto *co = Coroutine::GetCurrent();
322 return co->GetContext<StackfulCoroutineContext>();
323 }
324
PrepareNextRunnableContextForSwitch()325 StackfulCoroutineContext *StackfulCoroutineWorker::PrepareNextRunnableContextForSwitch()
326 {
327 // precondition: runnable coros are present
328 auto *nextCtx = PopFromRunnableQueue()->GetContext<StackfulCoroutineContext>();
329 nextCtx->RequestResume();
330 Coroutine::SetCurrent(nextCtx->GetCoroutine());
331 return nextCtx;
332 }
333
SwitchCoroutineContext(StackfulCoroutineContext * from,StackfulCoroutineContext * to)334 void StackfulCoroutineWorker::SwitchCoroutineContext(StackfulCoroutineContext *from, StackfulCoroutineContext *to)
335 {
336 ASSERT(from != nullptr);
337 ASSERT(to != nullptr);
338 EnsureCoroutineSwitchEnabled();
339 from->SwitchTo(to);
340 }
341
FinalizeTerminatedCoros()342 void StackfulCoroutineWorker::FinalizeTerminatedCoros()
343 {
344 while (!finalizationQueue_.empty()) {
345 auto *f = finalizationQueue_.front();
346 finalizationQueue_.pop();
347 coroManager_->DestroyEntrypointfulCoroutine(f);
348 }
349 }
350
UpdateLoadFactor()351 void StackfulCoroutineWorker::UpdateLoadFactor()
352 {
353 loadFactor_ = (loadFactor_ + runnables_.size()) / 2U;
354 }
355
EnsureCoroutineSwitchEnabled()356 void StackfulCoroutineWorker::EnsureCoroutineSwitchEnabled()
357 {
358 if (IsCoroutineSwitchDisabled()) {
359 LOG(FATAL, COROUTINES) << "ERROR ERROR ERROR >>> Trying to switch coroutines on " << GetName()
360 << " when coroutine switch is DISABLED!!! <<< ERROR ERROR ERROR";
361 UNREACHABLE();
362 }
363 }
364
365 } // namespace panda
366