/**
 * Copyright (c) 2023-2025 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "libpandabase/os/time.h"
#include "runtime/include/thread_scopes.h"
#include "runtime/coroutines/stackful_coroutine_manager.h"
#include "runtime/coroutines/stackful_coroutine.h"
#include "runtime/coroutines/stackful_coroutine_worker.h"

namespace ark {

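// A worker either owns a dedicated OS thread that runs the schedule loop (ScheduleLoopType::THREAD) or runs the
// schedule loop as a native "fiber" coroutine created on the caller's thread (the else branch below).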
StackfulCoroutineWorker::StackfulCoroutineWorker(Runtime *runtime, PandaVM *vm, StackfulCoroutineManager *coroManager,
                                                 ScheduleLoopType type, PandaString name, size_t id)
    : CoroutineWorker(runtime, vm),
      coroManager_(coroManager),
      threadId_(os::thread::GetCurrentThreadId()),
      workerCompletionEvent_(coroManager),
      stats_(name),
      name_(std::move(name)),
      id_(id)
{
    ASSERT(id <= stackful_coroutines::MAX_WORKER_ID);
    LOG(DEBUG, COROUTINES) << "Created a coroutine worker instance: id=" << id_ << " name=" << name_;
    if (type == ScheduleLoopType::THREAD) {
        std::thread t(&StackfulCoroutineWorker::ThreadProc, this);
        os::thread::SetThreadName(t.native_handle(), name_.c_str());
        t.detach();
        // will create the schedule loop coroutine in the thread proc in order to set the stack protector correctly
    } else {
        scheduleLoopCtx_ = coroManager->CreateNativeCoroutine(GetRuntime(), GetPandaVM(), ScheduleLoopProxy, this,
                                                              "[fiber_sch] " + GetName(), Coroutine::Type::SCHEDULER,
                                                              CoroutinePriority::MEDIUM_PRIORITY);
        AddRunnableCoroutine(scheduleLoopCtx_);
    }
}

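// Makes the coroutine schedulable on this worker: pushes it to the runnable priority queue under runnablesLock_
// and binds it to this worker via RegisterIncomingActiveCoroutine().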
void StackfulCoroutineWorker::AddRunnableCoroutine(Coroutine *newCoro)
{
    ASSERT(newCoro != nullptr);
    os::memory::LockHolder lock(runnablesLock_);
    PushToRunnableQueue(newCoro, newCoro->GetPriority());
    RegisterIncomingActiveCoroutine(newCoro);
}

void StackfulCoroutineWorker::AddRunningCoroutine(Coroutine *newCoro)
{
    ASSERT(newCoro != nullptr);
    RegisterIncomingActiveCoroutine(newCoro);
}

void StackfulCoroutineWorker::AddCreatedCoroutineAndSwitchToIt(Coroutine *newCoro)
{
    // precondition: called within the current worker, no cross-worker calls allowed
    ASSERT(GetCurrentContext()->GetWorker() == this);

    auto *coro = Coroutine::GetCurrent();
    ScopedNativeCodeThread n(coro);
    coro->RequestSuspend(false);

    auto *currentCtx = GetCurrentContext();
    auto *nextCtx = newCoro->GetContext<StackfulCoroutineContext>();
    nextCtx->RequestResume();
    Coroutine::SetCurrent(newCoro);
    RegisterIncomingActiveCoroutine(newCoro);

    SwitchCoroutineContext(currentCtx, nextCtx);

    // process finalization queue once this coro gets scheduled again
    FinalizeTerminatedCoros();
}

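// Parks the current coroutine until the given event happens: the coroutine is recorded in waiters_ keyed by the
// event, marked as blocked, and the worker switches to the next runnable coroutine. The awaitee is expected to be
// locked by the caller; it is unlocked here once the waiter has been registered.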
void StackfulCoroutineWorker::WaitForEvent(CoroutineEvent *awaitee)
{
    // precondition: this method is not called by the schedule loop coroutine

    Coroutine *waiter = Coroutine::GetCurrent();
    ASSERT(GetCurrentContext()->GetWorker() == this);
    ASSERT(awaitee != nullptr);
    ASSERT(!awaitee->Happened());
    ASSERT(waiter->IsInNativeCode());

    waitersLock_.Lock();
    awaitee->Unlock();
    LOG(DEBUG, COROUTINES) << "StackfulCoroutineWorker::AddWaitingCoroutine: " << waiter->GetName() << " AWAITS";
    [[maybe_unused]] auto [_, inserted] = waiters_.insert({awaitee, waiter});
    ASSERT(inserted);

    runnablesLock_.Lock();
    ASSERT(RunnableCoroutinesExist());
    // will unlock runnablesLock_ and waitersLock_ and switch context.
    // NB! If migration on await is enabled, the current coro can migrate to another worker, so
    // IsCrossWorkerCall() will become true after resume!
    BlockCurrentCoroAndScheduleNext();
}

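// Wakes up the single coroutine (if any) that waits for the given event: it is either pushed back to this worker's
// runnable queue or, when migration of awakened coroutines is enabled, handed over to the manager so it can be
// moved to another worker.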
void StackfulCoroutineWorker::UnblockWaiters(CoroutineEvent *blocker)
{
    bool canMigrateAwait = coroManager_->IsMigrateAwakenedCorosEnabled() && !IsMainWorker() && !InExclusiveMode();
    Coroutine *unblockedCoro = nullptr;
    {
        os::memory::LockHolder lockW(waitersLock_);
        auto w = waiters_.find(blocker);
        if (w != waiters_.end()) {
            unblockedCoro = w->second;
            waiters_.erase(w);
            if (!canMigrateAwait) {
                os::memory::LockHolder lockR(runnablesLock_);
                unblockedCoro->RequestUnblock();
                PushToRunnableQueue(unblockedCoro, unblockedCoro->GetPriority());
            } else {
                // (wangyuzhong,#24880): in case of IsMigrateAwakenedCorosEnabled() we need to correctly issue the
                // external scheduler request from the correct worker. Here the coroutine becomes Active on one
                // worker (which causes the request to be sent) and then gets potentially transferred to another
                // worker.
                unblockedCoro->RequestUnblock();
            }
        }
    }
    if (unblockedCoro == nullptr) {
        LOG(DEBUG, COROUTINES) << "The coroutine is nullptr.";
        return;
    }
    if (canMigrateAwait) {
        coroManager_->MigrateAwakenedCoro(unblockedCoro);
    }
    if (IsDisabledForCrossWorkersLaunch()) {
        workerCompletionEvent_.Happen();
    }
}

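// Queues a terminated coroutine for destruction and switches away from it for the last time; the queued coroutines
// are actually destroyed later in FinalizeTerminatedCoros(), on the stack of whichever coroutine resumes next.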
void StackfulCoroutineWorker::RequestFinalization(Coroutine *finalizee)
{
    // precondition: current coro and finalizee belong to the current worker
    ASSERT(finalizee->GetWorker() == this);
    ASSERT(GetCurrentContext()->GetWorker() == this);

    finalizationQueue_.push(finalizee);
    // finalizee will never be scheduled again
    ScheduleNextCoroUnlockNone();
}

void StackfulCoroutineWorker::RequestSchedule()
{
    RequestScheduleImpl();
}

void StackfulCoroutineWorker::FinalizeFiberScheduleLoop()
{
    ASSERT(GetCurrentContext()->GetWorker() == this);

    // part of MAIN finalization sequence
    if (RunnableCoroutinesExist()) {
        // the schedule loop is still runnable
        ASSERT(scheduleLoopCtx_->HasNativeEntrypoint());
        runnablesLock_.Lock();
        // only the schedule loop coroutine is left in the runnable queue
        ASSERT(runnables_.Size() == 1);
        SuspendCurrentCoroAndScheduleNext();
        ASSERT(!IsCrossWorkerCall());
    }
}

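// Drains this worker before shutdown: while other runnable coroutines exist, keep yielding to them; while blocked
// coroutines remain in waiters_, await the worker completion event (signalled from UnblockWaiters()); stop once
// nothing but the schedule loop remains runnable and no waiters are left.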
void StackfulCoroutineWorker::CompleteAllAffinedCoroutines()
{
    ASSERT_NATIVE_CODE();
    ASSERT(GetCurrentContext()->GetWorker() == this);
    ASSERT(IsDisabledForCrossWorkersLaunch());

    // CC-OFFNXT(G.FMT.04-CPP): project code style
    auto lock = [](auto &&...locks) { ([&]() NO_THREAD_SAFETY_ANALYSIS { locks.Lock(); }(), ...); };
    // CC-OFFNXT(G.FMT.04-CPP): project code style
    auto unlock = [](auto &&...locks) { ([&]() NO_THREAD_SAFETY_ANALYSIS { locks.Unlock(); }(), ...); };

    // CC-OFFNXT(G.CTL.03): false positive
    while (true) {
        lock(waitersLock_, runnablesLock_);
        if (runnables_.Size() > 1) {
            unlock(waitersLock_, runnablesLock_);
            coroManager_->Schedule();
        } else if (!waiters_.empty()) {
            workerCompletionEvent_.SetNotHappened();
            workerCompletionEvent_.Lock();
            unlock(waitersLock_, runnablesLock_);
            coroManager_->Await(&workerCompletionEvent_);
        } else {
            unlock(waitersLock_, runnablesLock_);
            break;
        }
    }
}

void StackfulCoroutineWorker::DisableCoroutineSwitch()
{
    ++disableCoroSwitchCounter_;
    LOG(DEBUG, COROUTINES) << "Coroutine switch on " << GetName()
                           << " has been disabled! Recursive ctr = " << disableCoroSwitchCounter_;
}

void StackfulCoroutineWorker::EnableCoroutineSwitch()
{
    ASSERT(IsCoroutineSwitchDisabled());
    --disableCoroSwitchCounter_;
    LOG(DEBUG, COROUTINES) << "Coroutine switch on " << GetName()
                           << " has been enabled! Recursive ctr = " << disableCoroSwitchCounter_;
}

bool StackfulCoroutineWorker::IsCoroutineSwitchDisabled()
{
    return disableCoroSwitchCounter_ > 0;
}

#ifndef NDEBUG
void StackfulCoroutineWorker::PrintRunnables(const PandaString &requester)
{
    os::memory::LockHolder lock(runnablesLock_);
    LOG(DEBUG, COROUTINES) << "[" << requester << "] ";
    runnables_.IterateOverCoroutines([](Coroutine *co) { LOG(DEBUG, COROUTINES) << co->GetName() << " <"; });
    LOG(DEBUG, COROUTINES) << "X";
}
#endif

size_t StackfulCoroutineWorker::GetRunnablesCount(Coroutine::Type type)
{
    os::memory::LockHolder lock(runnablesLock_);
    size_t runnablesCount = 0;
    runnables_.IterateOverCoroutines([type, &runnablesCount](Coroutine *coro) {
        if (coro->GetType() == type) {
            runnablesCount++;
        }
    });
    return runnablesCount;
}

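// Entry point of the dedicated scheduler thread (ScheduleLoopType::THREAD): creates the entrypoint-less scheduler
// coroutine for this thread, runs the schedule loop until the worker becomes inactive, then destroys the scheduler
// coroutine and notifies the manager about the shutdown.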
void StackfulCoroutineWorker::ThreadProc()
{
    threadId_ = os::thread::GetCurrentThreadId();
    scheduleLoopCtx_ =
        coroManager_->CreateEntrypointlessCoroutine(GetRuntime(), GetPandaVM(), false, "[thr_sch] " + GetName(),
                                                    Coroutine::Type::SCHEDULER, CoroutinePriority::MEDIUM_PRIORITY);
    Coroutine::SetCurrent(scheduleLoopCtx_);
    scheduleLoopCtx_->RequestResume();
    AddRunningCoroutine(scheduleLoopCtx_);
    scheduleLoopCtx_->NativeCodeBegin();
    coroManager_->OnWorkerStartup(this);

    // profiling: start interval here, end in ctxswitch
    stats_.StartInterval(CoroutineTimeStats::SCH_ALL);
    ScheduleLoopBody();

    coroManager_->DestroyEntrypointlessCoroutine(scheduleLoopCtx_);
    ASSERT(threadId_ == os::thread::GetCurrentThreadId());
    coroManager_->OnWorkerShutdown(this);
}

void StackfulCoroutineWorker::ScheduleLoop()
{
    LOG(DEBUG, COROUTINES) << "[" << GetName() << "] Schedule loop called!";
    ScheduleLoopBody();
}

void StackfulCoroutineWorker::ScheduleLoopBody()
{
    while (IsActive()) {
        RequestScheduleImpl();
        os::memory::LockHolder lkRunnables(runnablesLock_);
        UpdateLoadFactor();
    }
}

void StackfulCoroutineWorker::ScheduleLoopProxy(void *worker)
{
    static_cast<StackfulCoroutineWorker *>(worker)->ScheduleLoop();
}

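// Runnable queue helpers. PushToRunnableQueue() expects runnablesLock_ to be held by the caller; it also refreshes
// the load factor and signals the schedule loop that may be sleeping in WaitForRunnables().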
void StackfulCoroutineWorker::PushToRunnableQueue(Coroutine *co, CoroutinePriority priority)
{
    runnables_.Push(co, priority);
    UpdateLoadFactor();
    runnablesCv_.Signal();
}

Coroutine *StackfulCoroutineWorker::PopFromRunnableQueue()
{
    os::memory::LockHolder lock(runnablesLock_);
    ASSERT(!runnables_.Empty());
    auto [co, _] = runnables_.Pop();
    UpdateLoadFactor();
    return co;
}

bool StackfulCoroutineWorker::RunnableCoroutinesExist() const
{
    os::memory::LockHolder lock(runnablesLock_);
    return !runnables_.Empty();
}

void StackfulCoroutineWorker::WaitForRunnables()
{
    // NOTE(konstanting): in case of work stealing, use timed wait and try periodically to steal some runnables
    while (!RunnableCoroutinesExist() && IsActive()) {
        // profiling: no need to profile the SLEEPING state, closing the interval
        stats_.FinishInterval(CoroutineTimeStats::SCH_ALL);
        runnablesCv_.Wait(
            &runnablesLock_);  // or timed wait? we may miss the signal in some cases (e.g. IsActive() change)...
        // profiling: reopening the interval after the sleep
        stats_.StartInterval(CoroutineTimeStats::SCH_ALL);
        if (!RunnableCoroutinesExist() && IsActive()) {
            LOG(DEBUG, COROUTINES) << "StackfulCoroutineWorker::WaitForRunnables: spurious wakeup!";
        } else {
            LOG(DEBUG, COROUTINES) << "StackfulCoroutineWorker::WaitForRunnables: wakeup!";
        }
    }
}

void StackfulCoroutineWorker::RegisterIncomingActiveCoroutine(Coroutine *newCoro)
{
    ASSERT(newCoro != nullptr);
    newCoro->SetWorker(this);
    auto canMigrate = newCoro->GetContext<StackfulCoroutineContext>()->IsMigrationAllowed();
    newCoro->LinkToExternalHolder(IsMainWorker() && !canMigrate);
}

void StackfulCoroutineWorker::RequestScheduleImpl()
{
    // precondition: called within the current worker, no cross-worker calls allowed
    ASSERT(GetCurrentContext()->GetWorker() == this);
    ASSERT_NATIVE_CODE();
    runnablesLock_.Lock();

    // NOTE(konstanting): implement coro migration, work stealing, etc.
    if (RunnableCoroutinesExist()) {
        SuspendCurrentCoroAndScheduleNext();
        ASSERT(!IsCrossWorkerCall() || (Coroutine::GetCurrent()->GetType() == Coroutine::Type::MUTATOR));
    } else {
        coroManager_->TriggerMigration();
        LOG(DEBUG, COROUTINES) << "StackfulCoroutineWorker::RequestSchedule: No runnables, starting to wait...";
        WaitForRunnables();
        runnablesLock_.Unlock();
    }
}

void StackfulCoroutineWorker::BlockCurrentCoroAndScheduleNext()
{
    // precondition: current coro is already added to the waiters_
    BlockCurrentCoro();
    // will transfer control to another coro... Can change current coroutine's host worker!
    ScheduleNextCoroUnlockRunnablesWaiters();
    // ...this coro has been scheduled again: process finalization queue
    if (!IsCrossWorkerCall()) {
        FinalizeTerminatedCoros();
    } else {
        // migration happened!
    }
}

void StackfulCoroutineWorker::SuspendCurrentCoroAndScheduleNext()
{
    SuspendCurrentCoro();
    // will transfer control to another coro... Can change current coroutine's host worker!
    ScheduleNextCoroUnlockRunnables();
    // ...this coro has been scheduled again: process finalization queue
    if (!IsCrossWorkerCall()) {
        FinalizeTerminatedCoros();
    } else {
        // migration happened!
    }
}

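// Common suspend helper: marks the current coroutine as suspended. When not suspending as blocked, the coroutine is
// pushed back to the runnable queue; blocked coroutines are tracked in waiters_ by the caller instead.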
template <bool SUSPEND_AS_BLOCKED>
void StackfulCoroutineWorker::SuspendCurrentCoroGeneric()
{
    auto *currentCoro = Coroutine::GetCurrent();
    ASSERT(currentCoro != nullptr);
    currentCoro->RequestSuspend(SUSPEND_AS_BLOCKED);
    if constexpr (!SUSPEND_AS_BLOCKED) {
        os::memory::LockHolder lock(runnablesLock_);
        PushToRunnableQueue(currentCoro, currentCoro->GetPriority());
    }
}

void StackfulCoroutineWorker::BlockCurrentCoro()
{
    SuspendCurrentCoroGeneric<true>();
}

void StackfulCoroutineWorker::SuspendCurrentCoro()
{
    SuspendCurrentCoroGeneric<false>();
}

void StackfulCoroutineWorker::ScheduleNextCoroUnlockRunnablesWaiters()
{
    // precondition: runnable coros are present
    auto *currentCtx = GetCurrentContext();
    auto *nextCtx = PrepareNextRunnableContextForSwitch();

    runnablesLock_.Unlock();
    waitersLock_.Unlock();

    SwitchCoroutineContext(currentCtx, nextCtx);
}

void StackfulCoroutineWorker::ScheduleNextCoroUnlockRunnables()
{
    // precondition: runnable coros are present
    auto *currentCtx = GetCurrentContext();
    auto *nextCtx = PrepareNextRunnableContextForSwitch();

    runnablesLock_.Unlock();

    SwitchCoroutineContext(currentCtx, nextCtx);
}

void StackfulCoroutineWorker::ScheduleNextCoroUnlockNone()
{
    // precondition: runnable coros are present
    auto *currentCtx = GetCurrentContext();
    auto *nextCtx = PrepareNextRunnableContextForSwitch();
    SwitchCoroutineContext(currentCtx, nextCtx);
}

StackfulCoroutineContext *StackfulCoroutineWorker::GetCurrentContext() const
{
    auto *co = Coroutine::GetCurrent();
    ASSERT(co != nullptr);
    return co->GetContext<StackfulCoroutineContext>();
}

StackfulCoroutineContext *StackfulCoroutineWorker::PrepareNextRunnableContextForSwitch()
{
    // precondition: runnable coros are present
    ASSERT(Coroutine::GetCurrent() != nullptr);
    auto *il = Coroutine::GetCurrent()->ReleaseImmediateLauncher();
    auto *nextCtx = il != nullptr ? il->GetContext<StackfulCoroutineContext>()
                                  : PopFromRunnableQueue()->GetContext<StackfulCoroutineContext>();
    nextCtx->RequestResume();
    Coroutine::SetCurrent(nextCtx->GetCoroutine());
    return nextCtx;
}

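// Performs the actual stack switch between two coroutine contexts and maintains the profiling intervals. If the
// resumed coroutine comes back on a different worker (cross-worker call), the tail of this function runs
// concurrently with that worker, so the stats update is skipped there.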
void StackfulCoroutineWorker::SwitchCoroutineContext(StackfulCoroutineContext *from, StackfulCoroutineContext *to)
{
    ASSERT(from != nullptr);
    ASSERT(to != nullptr);
    EnsureCoroutineSwitchEnabled();
    LOG(DEBUG, COROUTINES) << "Ctx switch: " << from->GetCoroutine()->GetName() << " --> "
                           << to->GetCoroutine()->GetName();
    stats_.FinishInterval(CoroutineTimeStats::SCH_ALL);
    OnContextSwitch();
    stats_.StartInterval(CoroutineTimeStats::CTX_SWITCH);
    from->SwitchTo(to);
    if (IsCrossWorkerCall()) {
        ASSERT(Coroutine::GetCurrent()->GetType() == Coroutine::Type::MUTATOR);
        // Here this != current coroutine's worker. The rest of this function will be executed CONCURRENTLY!
        // NOTE(konstanting): need to correctly handle stats_ update here
        return;
    }
    stats_.FinishInterval(CoroutineTimeStats::CTX_SWITCH);
    stats_.StartInterval(CoroutineTimeStats::SCH_ALL);
}

void StackfulCoroutineWorker::FinalizeTerminatedCoros()
{
    while (!finalizationQueue_.empty()) {
        auto *f = finalizationQueue_.front();
        finalizationQueue_.pop();
        coroManager_->DestroyEntrypointfulCoroutine(f);
    }
}

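// Keeps an exponentially decaying average of the runnable queue length:
// loadFactor_ = (loadFactor_ + runnables_.Size()) / 2, so the most recent queue size has weight 1/2, the previous
// one 1/4, and so on. Expected to be called with runnablesLock_ held.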
void StackfulCoroutineWorker::UpdateLoadFactor()
{
    loadFactor_ = (loadFactor_ + runnables_.Size()) / 2U;
}

void StackfulCoroutineWorker::EnsureCoroutineSwitchEnabled()
{
    if (IsCoroutineSwitchDisabled()) {
        LOG(FATAL, COROUTINES) << "ERROR ERROR ERROR >>> Trying to switch coroutines on " << GetName()
                               << " when coroutine switch is DISABLED!!! <<< ERROR ERROR ERROR";
        UNREACHABLE();
    }
}

void StackfulCoroutineWorker::MigrateTo(StackfulCoroutineWorker *to)
{
    os::memory::LockHolder fromLock(runnablesLock_);
    size_t migrateCount = runnables_.Size() / 2;  // migrate up to half of runnable coroutines
    if (migrateCount == 0) {
        LOG(DEBUG, COROUTINES) << "The blocked worker does not have runnable coroutines.";
        return;
    }

    os::memory::LockHolder toLock(to->runnablesLock_);
    MigrateCoroutinesImpl(to, migrateCount);
}

bool StackfulCoroutineWorker::MigrateFrom(StackfulCoroutineWorker *from)
{
    os::memory::LockHolder toLock(runnablesLock_);
    if (!IsIdle()) {
        LOG(DEBUG, COROUTINES) << "The worker is not idle.";
        return false;
    }

    os::memory::LockHolder fromLock(from->runnablesLock_);
    size_t migrateCount = from->runnables_.Size() / 2;  // migrate up to half of runnable coroutines
    if (migrateCount == 0) {
        LOG(DEBUG, COROUTINES) << "The target worker does not have runnable coroutines.";
        return true;
    }

    from->MigrateCoroutinesImpl(this, migrateCount);
    return true;
}

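// Moves up to migrateCount runnable coroutines from this worker to the target one. Only MUTATOR coroutines whose
// affinity mask allows the target worker are migrated; the callers (MigrateTo/MigrateFrom) take both workers'
// runnablesLock_ before invoking it.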
void StackfulCoroutineWorker::MigrateCoroutinesImpl(StackfulCoroutineWorker *to, size_t migrateCount)
{
    using CIterator = PriorityQueue::CIterator;
    PandaVector<CIterator> migratedCoros;
    runnables_.VisitCoroutines([&migratedCoros, &migrateCount, this, to](auto begin, auto end) {
        for (; migrateCount > 0 && begin != end; ++begin) {
            // do not migrate SCHEDULER and FINALIZER coroutines
            if ((*begin)->GetType() != Coroutine::Type::MUTATOR) {
                continue;
            }
            auto maskValue = (*begin)->template GetContext<StackfulCoroutineContext>()->GetAffinityMask();
            std::bitset<stackful_coroutines::MAX_WORKERS_COUNT> affinityBits(maskValue);
            if (affinityBits.test(to->GetId())) {
                LOG(DEBUG, COROUTINES) << "migrate coro " << (*begin)->GetCoroutineId() << " from " << GetId() << " to "
                                       << to->GetId();
                to->AddRunnableCoroutine(*(*begin));
                migratedCoros.push_back(begin);
                --migrateCount;
            }
        }
    });
    runnables_.RemoveCoroutines(migratedCoros);
}

bool StackfulCoroutineWorker::IsIdle()
{
    return GetRunnablesCount(Coroutine::Type::MUTATOR) == 0;
}

void StackfulCoroutineWorker::MigrateCorosOutwardsIfBlocked()
{
    if (!IsPotentiallyBlocked()) {
        return;
    }
    coroManager_->MigrateCoroutinesOutward(this);
}

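// Heuristic used by the migration machinery: the worker is considered potentially blocked when it still has
// runnable coroutines but has not performed a context switch for MAX_EXECUTION_DURATION (6 s, judging by the log
// message below) since the last recorded switch.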
bool StackfulCoroutineWorker::IsPotentiallyBlocked()
{
    os::memory::LockHolder lock(runnablesLock_);
    if (runnables_.Empty() || lastCtxSwitchTimeMillis_ == 0) {
        return false;
    }
    if ((ark::os::time::GetClockTimeInMilli() - lastCtxSwitchTimeMillis_) >= MAX_EXECUTION_DURATION) {
        LOG(DEBUG, COROUTINES) << "The current coroutine has been executing for more than 6s.";
        return true;
    }
    return false;
}

void StackfulCoroutineWorker::OnContextSwitch()
{
    lastCtxSwitchTimeMillis_ = ark::os::time::GetClockTimeInMilli();
}

}  // namespace ark