1 /**
2 * Copyright (c) 2023-2025 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "libpandabase/os/time.h"
17 #include "runtime/include/thread_scopes.h"
18 #include "runtime/coroutines/stackful_coroutine_manager.h"
19 #include "runtime/coroutines/stackful_coroutine.h"
20 #include "runtime/coroutines/stackful_coroutine_worker.h"
21
22 namespace ark {
23
StackfulCoroutineWorker(Runtime * runtime,PandaVM * vm,StackfulCoroutineManager * coroManager,ScheduleLoopType type,PandaString name,size_t id)24 StackfulCoroutineWorker::StackfulCoroutineWorker(Runtime *runtime, PandaVM *vm, StackfulCoroutineManager *coroManager,
25 ScheduleLoopType type, PandaString name, size_t id)
26 : CoroutineWorker(runtime, vm),
27 coroManager_(coroManager),
28 threadId_(os::thread::GetCurrentThreadId()),
29 workerCompletionEvent_(coroManager),
30 stats_(name),
31 name_(std::move(name)),
32 id_(id)
33 {
34 ASSERT(id <= stackful_coroutines::MAX_WORKER_ID);
35 LOG(DEBUG, COROUTINES) << "Created a coroutine worker instance: id=" << id_ << " name=" << name_;
36 if (type == ScheduleLoopType::THREAD) {
37 std::thread t(&StackfulCoroutineWorker::ThreadProc, this);
38 os::thread::SetThreadName(t.native_handle(), name_.c_str());
39 t.detach();
40 // will create the schedule loop coroutine in the thread proc in order to set the stack protector correctly
41 } else {
42 scheduleLoopCtx_ = coroManager->CreateNativeCoroutine(GetRuntime(), GetPandaVM(), ScheduleLoopProxy, this,
43 "[fiber_sch] " + GetName(), Coroutine::Type::SCHEDULER,
44 CoroutinePriority::MEDIUM_PRIORITY);
45 AddRunnableCoroutine(scheduleLoopCtx_);
46 }
47 }
48
AddRunnableCoroutine(Coroutine * newCoro)49 void StackfulCoroutineWorker::AddRunnableCoroutine(Coroutine *newCoro)
50 {
51 ASSERT(newCoro != nullptr);
52 os::memory::LockHolder lock(runnablesLock_);
53 PushToRunnableQueue(newCoro, newCoro->GetPriority());
54 RegisterIncomingActiveCoroutine(newCoro);
55 }
56
AddRunningCoroutine(Coroutine * newCoro)57 void StackfulCoroutineWorker::AddRunningCoroutine(Coroutine *newCoro)
58 {
59 ASSERT(newCoro != nullptr);
60 RegisterIncomingActiveCoroutine(newCoro);
61 }
62
AddCreatedCoroutineAndSwitchToIt(Coroutine * newCoro)63 void StackfulCoroutineWorker::AddCreatedCoroutineAndSwitchToIt(Coroutine *newCoro)
64 {
65 // precondition: called within the current worker, no cross-worker calls allowed
66 ASSERT(GetCurrentContext()->GetWorker() == this);
67
68 auto *coro = Coroutine::GetCurrent();
69 ScopedNativeCodeThread n(coro);
70 coro->RequestSuspend(false);
71
72 auto *currentCtx = GetCurrentContext();
73 auto *nextCtx = newCoro->GetContext<StackfulCoroutineContext>();
74 nextCtx->RequestResume();
75 Coroutine::SetCurrent(newCoro);
76 RegisterIncomingActiveCoroutine(newCoro);
77
78 SwitchCoroutineContext(currentCtx, nextCtx);
79
80 // process finalization queue once this coro gets scheduled again
81 FinalizeTerminatedCoros();
82 }
83
WaitForEvent(CoroutineEvent * awaitee)84 void StackfulCoroutineWorker::WaitForEvent(CoroutineEvent *awaitee)
85 {
86 // precondition: this method is not called by the schedule loop coroutine
87
88 Coroutine *waiter = Coroutine::GetCurrent();
89 ASSERT(GetCurrentContext()->GetWorker() == this);
90 ASSERT(awaitee != nullptr);
91 ASSERT(!awaitee->Happened());
92 ASSERT(waiter->IsInNativeCode());
93
94 waitersLock_.Lock();
95 awaitee->Unlock();
96 LOG(DEBUG, COROUTINES) << "StackfulCoroutineWorker::AddWaitingCoroutine: " << waiter->GetName() << " AWAITS";
97 [[maybe_unused]] auto [_, inserted] = waiters_.insert({awaitee, waiter});
98 ASSERT(inserted);
99
100 runnablesLock_.Lock();
101 ASSERT(RunnableCoroutinesExist());
102 // will unlock waiters_lock_ and switch ctx.
103 // NB! If migration on await is enabled, current coro can migrate to another worker, so
104 // IsCrossWorkerCall() will become true after resume!
105 BlockCurrentCoroAndScheduleNext();
106 }
107
UnblockWaiters(CoroutineEvent * blocker)108 void StackfulCoroutineWorker::UnblockWaiters(CoroutineEvent *blocker)
109 {
110 bool canMigrateAwait = coroManager_->IsMigrateAwakenedCorosEnabled() && !IsMainWorker() && !InExclusiveMode();
111 Coroutine *unblockedCoro = nullptr;
112 {
113 os::memory::LockHolder lockW(waitersLock_);
114 auto w = waiters_.find(blocker);
115 if (w != waiters_.end()) {
116 unblockedCoro = w->second;
117 waiters_.erase(w);
118 if (!canMigrateAwait) {
119 os::memory::LockHolder lockR(runnablesLock_);
120 unblockedCoro->RequestUnblock();
121 PushToRunnableQueue(unblockedCoro, unblockedCoro->GetPriority());
122 } else {
123 // (wangyuzhong,#24880): in case of IsMigrateAwakenedCorosEnabled() we need to correctly issue the
124 // external scheduler request from the correct worker. Here the coroutine becomes Active on one
125 // worker(which causes the request to be sent) and then gets potentially transferred to another worker.
126 unblockedCoro->RequestUnblock();
127 }
128 }
129 }
130 if (unblockedCoro == nullptr) {
131 LOG(DEBUG, COROUTINES) << "The coroutine is nullptr.";
132 return;
133 }
134 if (canMigrateAwait) {
135 coroManager_->MigrateAwakenedCoro(unblockedCoro);
136 }
137 if (IsDisabledForCrossWorkersLaunch()) {
138 workerCompletionEvent_.Happen();
139 }
140 }
141
RequestFinalization(Coroutine * finalizee)142 void StackfulCoroutineWorker::RequestFinalization(Coroutine *finalizee)
143 {
144 // precondition: current coro and finalizee belong to the current worker
145 ASSERT(finalizee->GetWorker() == this);
146 ASSERT(GetCurrentContext()->GetWorker() == this);
147
148 finalizationQueue_.push(finalizee);
149 // finalizee will never be scheduled again
150 ScheduleNextCoroUnlockNone();
151 }
152
RequestSchedule()153 void StackfulCoroutineWorker::RequestSchedule()
154 {
155 RequestScheduleImpl();
156 }
157
FinalizeFiberScheduleLoop()158 void StackfulCoroutineWorker::FinalizeFiberScheduleLoop()
159 {
160 ASSERT(GetCurrentContext()->GetWorker() == this);
161
162 // part of MAIN finalization sequence
163 if (RunnableCoroutinesExist()) {
164 // the schedule loop is still runnable
165 ASSERT(scheduleLoopCtx_->HasNativeEntrypoint());
166 runnablesLock_.Lock();
167 // sch loop only
168 ASSERT(runnables_.Size() == 1);
169 SuspendCurrentCoroAndScheduleNext();
170 ASSERT(!IsCrossWorkerCall());
171 }
172 }
173
CompleteAllAffinedCoroutines()174 void StackfulCoroutineWorker::CompleteAllAffinedCoroutines()
175 {
176 ASSERT_NATIVE_CODE();
177 ASSERT(GetCurrentContext()->GetWorker() == this);
178 ASSERT(IsDisabledForCrossWorkersLaunch());
179
180 // CC-OFFNXT(G.FMT.04-CPP): project code style
181 auto lock = [](auto &&...locks) { ([&]() NO_THREAD_SAFETY_ANALYSIS { locks.Lock(); }(), ...); };
182 // CC-OFFNXT(G.FMT.04-CPP): project code style
183 auto unlock = [](auto &&...locks) { ([&]() NO_THREAD_SAFETY_ANALYSIS { locks.Unlock(); }(), ...); };
184
185 // CC-OFFNXT(G.CTL.03): false positive
186 while (true) {
187 lock(waitersLock_, runnablesLock_);
188 if (runnables_.Size() > 1) {
189 unlock(waitersLock_, runnablesLock_);
190 coroManager_->Schedule();
191 } else if (!waiters_.empty()) {
192 workerCompletionEvent_.SetNotHappened();
193 workerCompletionEvent_.Lock();
194 unlock(waitersLock_, runnablesLock_);
195 coroManager_->Await(&workerCompletionEvent_);
196 } else {
197 unlock(waitersLock_, runnablesLock_);
198 break;
199 }
200 }
201 }
202
DisableCoroutineSwitch()203 void StackfulCoroutineWorker::DisableCoroutineSwitch()
204 {
205 ++disableCoroSwitchCounter_;
206 LOG(DEBUG, COROUTINES) << "Coroutine switch on " << GetName()
207 << " has been disabled! Recursive ctr = " << disableCoroSwitchCounter_;
208 }
209
EnableCoroutineSwitch()210 void StackfulCoroutineWorker::EnableCoroutineSwitch()
211 {
212 ASSERT(IsCoroutineSwitchDisabled());
213 --disableCoroSwitchCounter_;
214 LOG(DEBUG, COROUTINES) << "Coroutine switch on " << GetName()
215 << " has been enabled! Recursive ctr = " << disableCoroSwitchCounter_;
216 }
217
IsCoroutineSwitchDisabled()218 bool StackfulCoroutineWorker::IsCoroutineSwitchDisabled()
219 {
220 return disableCoroSwitchCounter_ > 0;
221 }
222
223 #ifndef NDEBUG
PrintRunnables(const PandaString & requester)224 void StackfulCoroutineWorker::PrintRunnables(const PandaString &requester)
225 {
226 os::memory::LockHolder lock(runnablesLock_);
227 LOG(DEBUG, COROUTINES) << "[" << requester << "] ";
228 runnables_.IterateOverCoroutines([](Coroutine *co) { LOG(DEBUG, COROUTINES) << co->GetName() << " <"; });
229 LOG(DEBUG, COROUTINES) << "X";
230 }
231 #endif
232
GetRunnablesCount(Coroutine::Type type)233 size_t StackfulCoroutineWorker::GetRunnablesCount(Coroutine::Type type)
234 {
235 os::memory::LockHolder lock(runnablesLock_);
236 size_t runnablesCount = 0;
237 runnables_.IterateOverCoroutines([type, &runnablesCount](Coroutine *coro) {
238 if (coro->GetType() == type) {
239 runnablesCount++;
240 }
241 });
242 return runnablesCount;
243 }
244
ThreadProc()245 void StackfulCoroutineWorker::ThreadProc()
246 {
247 threadId_ = os::thread::GetCurrentThreadId();
248 scheduleLoopCtx_ =
249 coroManager_->CreateEntrypointlessCoroutine(GetRuntime(), GetPandaVM(), false, "[thr_sch] " + GetName(),
250 Coroutine::Type::SCHEDULER, CoroutinePriority::MEDIUM_PRIORITY);
251 Coroutine::SetCurrent(scheduleLoopCtx_);
252 scheduleLoopCtx_->RequestResume();
253 AddRunningCoroutine(scheduleLoopCtx_);
254 scheduleLoopCtx_->NativeCodeBegin();
255 coroManager_->OnWorkerStartup(this);
256
257 // profiling: start interval here, end in ctxswitch
258 stats_.StartInterval(CoroutineTimeStats::SCH_ALL);
259 ScheduleLoopBody();
260
261 coroManager_->DestroyEntrypointlessCoroutine(scheduleLoopCtx_);
262 ASSERT(threadId_ == os::thread::GetCurrentThreadId());
263 coroManager_->OnWorkerShutdown(this);
264 }
265
ScheduleLoop()266 void StackfulCoroutineWorker::ScheduleLoop()
267 {
268 LOG(DEBUG, COROUTINES) << "[" << GetName() << "] Schedule loop called!";
269 ScheduleLoopBody();
270 }
271
ScheduleLoopBody()272 void StackfulCoroutineWorker::ScheduleLoopBody()
273 {
274 while (IsActive()) {
275 RequestScheduleImpl();
276 os::memory::LockHolder lkRunnables(runnablesLock_);
277 UpdateLoadFactor();
278 }
279 }
280
ScheduleLoopProxy(void * worker)281 void StackfulCoroutineWorker::ScheduleLoopProxy(void *worker)
282 {
283 static_cast<StackfulCoroutineWorker *>(worker)->ScheduleLoop();
284 }
285
PushToRunnableQueue(Coroutine * co,CoroutinePriority priority)286 void StackfulCoroutineWorker::PushToRunnableQueue(Coroutine *co, CoroutinePriority priority)
287 {
288 runnables_.Push(co, priority);
289 UpdateLoadFactor();
290 runnablesCv_.Signal();
291 }
292
PopFromRunnableQueue()293 Coroutine *StackfulCoroutineWorker::PopFromRunnableQueue()
294 {
295 os::memory::LockHolder lock(runnablesLock_);
296 ASSERT(!runnables_.Empty());
297 auto [co, _] = runnables_.Pop();
298 UpdateLoadFactor();
299 return co;
300 }
301
RunnableCoroutinesExist() const302 bool StackfulCoroutineWorker::RunnableCoroutinesExist() const
303 {
304 os::memory::LockHolder lock(runnablesLock_);
305 return !runnables_.Empty();
306 }
307
WaitForRunnables()308 void StackfulCoroutineWorker::WaitForRunnables()
309 {
310 // NOTE(konstanting): in case of work stealing, use timed wait and try periodically to steal some runnables
311 while (!RunnableCoroutinesExist() && IsActive()) {
312 // profiling: no need to profile the SLEEPING state, closing the interval
313 stats_.FinishInterval(CoroutineTimeStats::SCH_ALL);
314 runnablesCv_.Wait(
315 &runnablesLock_); // or timed wait? we may miss the signal in some cases (e.g. IsActive() change)...
316 // profiling: reopening the interval after the sleep
317 stats_.StartInterval(CoroutineTimeStats::SCH_ALL);
318 if (!RunnableCoroutinesExist() && IsActive()) {
319 LOG(DEBUG, COROUTINES) << "StackfulCoroutineWorker::WaitForRunnables: spurious wakeup!";
320 } else {
321 LOG(DEBUG, COROUTINES) << "StackfulCoroutineWorker::WaitForRunnables: wakeup!";
322 }
323 }
324 }
325
RegisterIncomingActiveCoroutine(Coroutine * newCoro)326 void StackfulCoroutineWorker::RegisterIncomingActiveCoroutine(Coroutine *newCoro)
327 {
328 ASSERT(newCoro != nullptr);
329 newCoro->SetWorker(this);
330 auto canMigrate = newCoro->GetContext<StackfulCoroutineContext>()->IsMigrationAllowed();
331 newCoro->LinkToExternalHolder(IsMainWorker() && !canMigrate);
332 }
333
RequestScheduleImpl()334 void StackfulCoroutineWorker::RequestScheduleImpl()
335 {
336 // precondition: called within the current worker, no cross-worker calls allowed
337 ASSERT(GetCurrentContext()->GetWorker() == this);
338 ASSERT_NATIVE_CODE();
339 runnablesLock_.Lock();
340
341 // NOTE(konstanting): implement coro migration, work stealing, etc.
342 if (RunnableCoroutinesExist()) {
343 SuspendCurrentCoroAndScheduleNext();
344 ASSERT(!IsCrossWorkerCall() || (Coroutine::GetCurrent()->GetType() == Coroutine::Type::MUTATOR));
345 } else {
346 coroManager_->TriggerMigration();
347 LOG(DEBUG, COROUTINES) << "StackfulCoroutineWorker::RequestSchedule: No runnables, starting to wait...";
348 WaitForRunnables();
349 runnablesLock_.Unlock();
350 }
351 }
352
BlockCurrentCoroAndScheduleNext()353 void StackfulCoroutineWorker::BlockCurrentCoroAndScheduleNext()
354 {
355 // precondition: current coro is already added to the waiters_
356 BlockCurrentCoro();
357 // will transfer control to another coro... Can change current coroutine's host worker!
358 ScheduleNextCoroUnlockRunnablesWaiters();
359 // ...this coro has been scheduled again: process finalization queue
360 if (!IsCrossWorkerCall()) {
361 FinalizeTerminatedCoros();
362 } else {
363 // migration happened!
364 }
365 }
366
SuspendCurrentCoroAndScheduleNext()367 void StackfulCoroutineWorker::SuspendCurrentCoroAndScheduleNext()
368 {
369 // will transfer control to another coro... Can change current coroutine's host worker!
370 SuspendCurrentCoro();
371 // will transfer control to another coro...
372 ScheduleNextCoroUnlockRunnables();
373 // ...this coro has been scheduled again: process finalization queue
374 if (!IsCrossWorkerCall()) {
375 FinalizeTerminatedCoros();
376 } else {
377 // migration happened!
378 }
379 }
380
381 template <bool SUSPEND_AS_BLOCKED>
SuspendCurrentCoroGeneric()382 void StackfulCoroutineWorker::SuspendCurrentCoroGeneric()
383 {
384 auto *currentCoro = Coroutine::GetCurrent();
385 ASSERT(currentCoro != nullptr);
386 currentCoro->RequestSuspend(SUSPEND_AS_BLOCKED);
387 if constexpr (!SUSPEND_AS_BLOCKED) {
388 os::memory::LockHolder lock(runnablesLock_);
389 PushToRunnableQueue(currentCoro, currentCoro->GetPriority());
390 }
391 }
392
BlockCurrentCoro()393 void StackfulCoroutineWorker::BlockCurrentCoro()
394 {
395 SuspendCurrentCoroGeneric<true>();
396 }
397
SuspendCurrentCoro()398 void StackfulCoroutineWorker::SuspendCurrentCoro()
399 {
400 SuspendCurrentCoroGeneric<false>();
401 }
402
ScheduleNextCoroUnlockRunnablesWaiters()403 void StackfulCoroutineWorker::ScheduleNextCoroUnlockRunnablesWaiters()
404 {
405 // precondition: runnable coros are present
406 auto *currentCtx = GetCurrentContext();
407 auto *nextCtx = PrepareNextRunnableContextForSwitch();
408
409 runnablesLock_.Unlock();
410 waitersLock_.Unlock();
411
412 SwitchCoroutineContext(currentCtx, nextCtx);
413 }
414
ScheduleNextCoroUnlockRunnables()415 void StackfulCoroutineWorker::ScheduleNextCoroUnlockRunnables()
416 {
417 // precondition: runnable coros are present
418 auto *currentCtx = GetCurrentContext();
419 auto *nextCtx = PrepareNextRunnableContextForSwitch();
420
421 runnablesLock_.Unlock();
422
423 SwitchCoroutineContext(currentCtx, nextCtx);
424 }
425
ScheduleNextCoroUnlockNone()426 void StackfulCoroutineWorker::ScheduleNextCoroUnlockNone()
427 {
428 // precondition: runnable coros are present
429 auto *currentCtx = GetCurrentContext();
430 auto *nextCtx = PrepareNextRunnableContextForSwitch();
431 SwitchCoroutineContext(currentCtx, nextCtx);
432 }
433
GetCurrentContext() const434 StackfulCoroutineContext *StackfulCoroutineWorker::GetCurrentContext() const
435 {
436 auto *co = Coroutine::GetCurrent();
437 ASSERT(co != nullptr);
438 return co->GetContext<StackfulCoroutineContext>();
439 }
440
PrepareNextRunnableContextForSwitch()441 StackfulCoroutineContext *StackfulCoroutineWorker::PrepareNextRunnableContextForSwitch()
442 {
443 // precondition: runnable coros are present
444 ASSERT(Coroutine::GetCurrent() != nullptr);
445 auto *il = Coroutine::GetCurrent()->ReleaseImmediateLauncher();
446 auto *nextCtx = il != nullptr ? il->GetContext<StackfulCoroutineContext>()
447 : PopFromRunnableQueue()->GetContext<StackfulCoroutineContext>();
448 nextCtx->RequestResume();
449 Coroutine::SetCurrent(nextCtx->GetCoroutine());
450 return nextCtx;
451 }
452
SwitchCoroutineContext(StackfulCoroutineContext * from,StackfulCoroutineContext * to)453 void StackfulCoroutineWorker::SwitchCoroutineContext(StackfulCoroutineContext *from, StackfulCoroutineContext *to)
454 {
455 ASSERT(from != nullptr);
456 ASSERT(to != nullptr);
457 EnsureCoroutineSwitchEnabled();
458 LOG(DEBUG, COROUTINES) << "Ctx switch: " << from->GetCoroutine()->GetName() << " --> "
459 << to->GetCoroutine()->GetName();
460 stats_.FinishInterval(CoroutineTimeStats::SCH_ALL);
461 OnContextSwitch();
462 stats_.StartInterval(CoroutineTimeStats::CTX_SWITCH);
463 from->SwitchTo(to);
464 if (IsCrossWorkerCall()) {
465 ASSERT(Coroutine::GetCurrent()->GetType() == Coroutine::Type::MUTATOR);
466 // Here this != current coroutine's worker. The rest of this function will be executed CONCURRENTLY!
467 // NOTE(konstanting): need to correctly handle stats_ update here
468 return;
469 }
470 stats_.FinishInterval(CoroutineTimeStats::CTX_SWITCH);
471 stats_.StartInterval(CoroutineTimeStats::SCH_ALL);
472 }
473
FinalizeTerminatedCoros()474 void StackfulCoroutineWorker::FinalizeTerminatedCoros()
475 {
476 while (!finalizationQueue_.empty()) {
477 auto *f = finalizationQueue_.front();
478 finalizationQueue_.pop();
479 coroManager_->DestroyEntrypointfulCoroutine(f);
480 }
481 }
482
UpdateLoadFactor()483 void StackfulCoroutineWorker::UpdateLoadFactor()
484 {
485 loadFactor_ = (loadFactor_ + runnables_.Size()) / 2U;
486 }
487
EnsureCoroutineSwitchEnabled()488 void StackfulCoroutineWorker::EnsureCoroutineSwitchEnabled()
489 {
490 if (IsCoroutineSwitchDisabled()) {
491 LOG(FATAL, COROUTINES) << "ERROR ERROR ERROR >>> Trying to switch coroutines on " << GetName()
492 << " when coroutine switch is DISABLED!!! <<< ERROR ERROR ERROR";
493 UNREACHABLE();
494 }
495 }
496
MigrateTo(StackfulCoroutineWorker * to)497 void StackfulCoroutineWorker::MigrateTo(StackfulCoroutineWorker *to)
498 {
499 os::memory::LockHolder fromLock(runnablesLock_);
500 size_t migrateCount = runnables_.Size() / 2; // migrate up to half of runnable coroutines
501 if (migrateCount == 0) {
502 LOG(DEBUG, COROUTINES) << "The blocked worker does not have runnable coroutines.";
503 return;
504 }
505
506 os::memory::LockHolder toLock(to->runnablesLock_);
507 MigrateCoroutinesImpl(to, migrateCount);
508 }
509
MigrateFrom(StackfulCoroutineWorker * from)510 bool StackfulCoroutineWorker::MigrateFrom(StackfulCoroutineWorker *from)
511 {
512 os::memory::LockHolder toLock(runnablesLock_);
513 if (!IsIdle()) {
514 LOG(DEBUG, COROUTINES) << "The worker is not idle.";
515 return false;
516 }
517
518 os::memory::LockHolder fromLock(from->runnablesLock_);
519 size_t migrateCount = from->runnables_.Size() / 2; // migrate up to half of runnable coroutines
520 if (migrateCount == 0) {
521 LOG(DEBUG, COROUTINES) << "The target worker does not have runnable coroutines.";
522 return true;
523 }
524
525 from->MigrateCoroutinesImpl(this, migrateCount);
526 return true;
527 }
528
MigrateCoroutinesImpl(StackfulCoroutineWorker * to,size_t migrateCount)529 void StackfulCoroutineWorker::MigrateCoroutinesImpl(StackfulCoroutineWorker *to, size_t migrateCount)
530 {
531 using CIterator = PriorityQueue::CIterator;
532 PandaVector<CIterator> migratedCoros;
533 runnables_.VisitCoroutines([&migratedCoros, &migrateCount, this, to](auto begin, auto end) {
534 for (; migrateCount > 0 && begin != end; ++begin) {
535 // not migrate SCHEDULER coroutine and FINALIZER coroutine
536 if ((*begin)->GetType() != Coroutine::Type::MUTATOR) {
537 continue;
538 }
539 auto maskValue = (*begin)->template GetContext<StackfulCoroutineContext>()->GetAffinityMask();
540 std::bitset<stackful_coroutines::MAX_WORKERS_COUNT> affinityBits(maskValue);
541 if (affinityBits.test(to->GetId())) {
542 LOG(DEBUG, COROUTINES) << "migrate coro " << (*begin)->GetCoroutineId() << " from " << GetId() << " to "
543 << to->GetId();
544 to->AddRunnableCoroutine(*(*begin));
545 migratedCoros.push_back(begin);
546 --migrateCount;
547 }
548 }
549 });
550 runnables_.RemoveCoroutines(migratedCoros);
551 }
552
IsIdle()553 bool StackfulCoroutineWorker::IsIdle()
554 {
555 return GetRunnablesCount(Coroutine::Type::MUTATOR) == 0;
556 }
557
MigrateCorosOutwardsIfBlocked()558 void StackfulCoroutineWorker::MigrateCorosOutwardsIfBlocked()
559 {
560 if (!IsPotentiallyBlocked()) {
561 return;
562 }
563 coroManager_->MigrateCoroutinesOutward(this);
564 }
565
IsPotentiallyBlocked()566 bool StackfulCoroutineWorker::IsPotentiallyBlocked()
567 {
568 os::memory::LockHolder lock(runnablesLock_);
569 if (runnables_.Empty() || lastCtxSwitchTimeMillis_ == 0) {
570 return false;
571 }
572 if ((ark::os::time::GetClockTimeInMilli() - lastCtxSwitchTimeMillis_) >= MAX_EXECUTION_DURATION) {
573 LOG(DEBUG, COROUTINES) << "The current coroutine has been executed more than 6s.";
574 return true;
575 }
576 return false;
577 }
578
OnContextSwitch()579 void StackfulCoroutineWorker::OnContextSwitch()
580 {
581 lastCtxSwitchTimeMillis_ = ark::os::time::GetClockTimeInMilli();
582 }
583
584 } // namespace ark
585