• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright (c) 2022-2025 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include <algorithm>
17 #include <limits>
18 #include "coroutines/stackful_common.h"
19 #include "libpandabase/macros.h"
20 #include "libpandabase/os/time.h"
21 #include "runtime/coroutines/coroutine.h"
22 #include "runtime/coroutines/stackful_coroutine.h"
23 #include "runtime/coroutines/stackful_coroutine_manager.h"
24 #include "runtime/include/panda_vm.h"
25 #include "runtime/include/runtime.h"
26 #include "runtime/include/runtime_notification.h"
27 #include "runtime/include/thread_scopes.h"
28 
29 namespace ark {
30 
AllocCoroutineStack()31 uint8_t *StackfulCoroutineManager::AllocCoroutineStack()
32 {
33     Pool stackPool = PoolManager::GetMmapMemPool()->AllocPool<OSPagesAllocPolicy::NO_POLICY>(
34         coroStackSizeBytes_, SpaceType::SPACE_TYPE_NATIVE_STACKS, AllocatorType::NATIVE_STACKS_ALLOCATOR);
35     return static_cast<uint8_t *>(stackPool.GetMem());
36 }
37 
FreeCoroutineStack(uint8_t * stack)38 void StackfulCoroutineManager::FreeCoroutineStack(uint8_t *stack)
39 {
40     if (stack != nullptr) {
41         PoolManager::GetMmapMemPool()->FreePool(stack, coroStackSizeBytes_);
42     }
43 }
44 
CreateMainCoroAndWorkers(size_t howMany,Runtime * runtime,PandaVM * vm)45 void StackfulCoroutineManager::CreateMainCoroAndWorkers(size_t howMany, Runtime *runtime, PandaVM *vm)
46 {
47     auto *wMain = CreateWorker(runtime, vm, StackfulCoroutineWorker::ScheduleLoopType::FIBER, "[main] worker ");
48     ASSERT(wMain->GetId() == stackful_coroutines::MAIN_WORKER_ID);
49 
50     auto *mainCo = CreateMainCoroutine(runtime, vm);
51     wMain->AddRunningCoroutine(mainCo);
52     OnWorkerStartupImpl(wMain);
53 
54     CreateWorkersImpl(howMany, runtime, vm);
55 }
56 
CreateWorkers(size_t howMany,Runtime * runtime,PandaVM * vm)57 void StackfulCoroutineManager::CreateWorkers(size_t howMany, Runtime *runtime, PandaVM *vm)
58 {
59     os::memory::LockHolder lh(workersLock_);
60     CreateWorkersImpl(howMany, runtime, vm);
61 }
62 
FinalizeWorkers(size_t howMany,Runtime * runtime,PandaVM * vm)63 void StackfulCoroutineManager::FinalizeWorkers(size_t howMany, Runtime *runtime, PandaVM *vm)
64 {
65     struct EntrypointParam {
66         explicit EntrypointParam(size_t wCount, CoroutineManager *coroMan)
67             : wCount_(wCount), workerFinalizationEvent(coroMan)
68         {
69         }
70 
71         // NOLINTBEGIN(misc-non-private-member-variables-in-classes)
72         const size_t wCount_;
73         GenericEvent workerFinalizationEvent;
74         std::atomic<uint32_t> finalizedWorkersCount = 0;
75         // NOLINTEND(misc-non-private-member-variables-in-classes)
76     };
77 
78     auto wCountBeforeFinalization = GetActiveWorkersCount();
79     ASSERT(wCountBeforeFinalization > howMany);
80 
81     auto entrypointParam = EntrypointParam(howMany, this);
82     auto coroEntryPoint = [](void *param) {
83         auto *finalizee = Coroutine::GetCurrent()->GetContext<StackfulCoroutineContext>()->GetWorker();
84         finalizee->MigrateCoroutines();
85         finalizee->CompleteAllAffinedCoroutines();
86         finalizee->SetActive(false);
87         auto *entryParams = reinterpret_cast<EntrypointParam *>(param);
88         // Atomic with relaxed order reason: synchronization is not required
89         if (entryParams->finalizedWorkersCount.fetch_add(1, std::memory_order_relaxed) == entryParams->wCount_ - 1) {
90             entryParams->workerFinalizationEvent.Happen();
91         }
92     };
93 
94     for (auto i = 0U; i < howMany; i++) {
95         auto *finWorker = ChooseWorkerForFinalization();
96         auto *co = CreateNativeCoroutine(runtime, vm, coroEntryPoint, &entrypointParam, "[finalize coro] ",
97                                          Coroutine::Type::FINALIZER, CoroutinePriority::CRITICAL_PRIORITY);
98         finWorker->AddRunnableCoroutine(co);
99     }
100     entrypointParam.workerFinalizationEvent.Lock();
101 
102     Await(&entrypointParam.workerFinalizationEvent);
103 
104     os::memory::LockHolder lh(workersLock_);
105     while (activeWorkersCount_ != wCountBeforeFinalization - howMany) {
106         workersCv_.Wait(&workersLock_);
107     }
108     ASSERT(activeWorkersCount_ > 0);
109 }
110 
ChooseWorkerForFinalization()111 StackfulCoroutineWorker *StackfulCoroutineManager::ChooseWorkerForFinalization()
112 {
113     os::memory::LockHolder lh(workersLock_);
114     auto finWorkerIt = std::find_if(workers_.begin(), workers_.end(), [](auto &&worker) {
115         return !worker->IsMainWorker() && !worker->IsDisabledForCrossWorkersLaunch() && !worker->InExclusiveMode();
116     });
117     ASSERT(finWorkerIt != workers_.end());
118     (*finWorkerIt)->DisableForCrossWorkersLaunch();
119     return *finWorkerIt;
120 }
121 
CreateWorkersImpl(size_t howMany,Runtime * runtime,PandaVM * vm)122 void StackfulCoroutineManager::CreateWorkersImpl(size_t howMany, Runtime *runtime, PandaVM *vm)
123 {
124     if (howMany == 0) {
125         LOG(DEBUG, COROUTINES)
126             << "StackfulCoroutineManager::CreateWorkersImpl():creation of zero workers requested,skipping...";
127         return;
128     }
129     auto wCountBeforeCreation = activeWorkersCount_;
130     for (uint32_t i = 0; i < howMany; ++i) {
131         CreateWorker(runtime, vm, StackfulCoroutineWorker::ScheduleLoopType::THREAD, "worker ");
132     }
133     LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::CreateWorkers(): waiting for workers startup";
134     while (activeWorkersCount_ != howMany + wCountBeforeCreation) {
135         // NOTE(konstanting, #IAD5MH): need timed wait?..
136         workersCv_.Wait(&workersLock_);
137     }
138 }
139 
CreateWorker(Runtime * runtime,PandaVM * vm,StackfulCoroutineWorker::ScheduleLoopType wType,PandaString workerName)140 StackfulCoroutineWorker *StackfulCoroutineManager::CreateWorker(Runtime *runtime, PandaVM *vm,
141                                                                 StackfulCoroutineWorker::ScheduleLoopType wType,
142                                                                 PandaString workerName)
143 {
144     auto allocator = runtime->GetInternalAllocator();
145     auto workerId = AllocateWorkerId();
146     workerName += ToPandaString(workerId);
147     auto *worker = allocator->New<StackfulCoroutineWorker>(runtime, vm, this, wType, std::move(workerName), workerId);
148     ASSERT(worker != nullptr);
149     if (stats_.IsEnabled()) {
150         worker->GetPerfStats().Enable();
151     }
152     return worker;
153 }
154 
OnWorkerShutdown(StackfulCoroutineWorker * worker)155 void StackfulCoroutineManager::OnWorkerShutdown(StackfulCoroutineWorker *worker)
156 {
157     os::memory::LockHolder lock(workersLock_);
158     auto workerIt = std::find_if(workers_.begin(), workers_.end(), [worker](auto &&w) { return w == worker; });
159     workers_.erase(workerIt);
160     // We may have a problem related to the coroutine affinity mask aliasing (The ABA Problem) #23715:
161     // 1. Finalizing worker was available for the coroutine (the coroutine had a bit set in the affinity mask)
162     // 2. New specific worker (e.g. EAWorker) was created with the same Id
163     // 3. This worker becomes available for the coroutine, but it was not initially expected
164     ReleaseWorkerId(worker->GetId());
165     --activeWorkersCount_;
166     auto &workerStats = worker->GetPerfStats();
167     workerStats.Disable();
168     finalizedWorkerStats_.emplace_back(std::move(workerStats));
169     Runtime::GetCurrent()->GetInternalAllocator()->Delete(worker);
170     workersCv_.Signal();
171     LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::OnWorkerShutdown(): COMPLETED, workers left = "
172                            << activeWorkersCount_;
173 }
174 
OnWorkerStartup(StackfulCoroutineWorker * worker)175 void StackfulCoroutineManager::OnWorkerStartup(StackfulCoroutineWorker *worker)
176 {
177     os::memory::LockHolder lock(workersLock_);
178     OnWorkerStartupImpl(worker);
179 }
180 
OnWorkerStartupImpl(StackfulCoroutineWorker * worker)181 void StackfulCoroutineManager::OnWorkerStartupImpl(StackfulCoroutineWorker *worker)
182 {
183     workers_.push_back(worker);
184     ++activeWorkersCount_;
185     workersCv_.Signal();
186     LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::OnWorkerStartup(): COMPLETED, active workers = "
187                            << activeWorkersCount_;
188 }
189 
InitializeWorkerIdAllocator()190 void StackfulCoroutineManager::InitializeWorkerIdAllocator()
191 {
192     os::memory::LockHolder lh(workerIdLock_);
193     for (auto id = stackful_coroutines::MAIN_WORKER_ID; id != stackful_coroutines::MAX_WORKERS_COUNT; ++id) {
194         freeWorkerIds_.push_back(id);
195     }
196 }
197 
AllocateWorkerId()198 uint32_t StackfulCoroutineManager::AllocateWorkerId()
199 {
200     os::memory::LockHolder lh(workerIdLock_);
201     ASSERT(!freeWorkerIds_.empty());
202     auto workerId = freeWorkerIds_.front();
203     freeWorkerIds_.pop_front();
204     return workerId;
205 }
206 
ReleaseWorkerId(uint32_t workerId)207 void StackfulCoroutineManager::ReleaseWorkerId(uint32_t workerId)
208 {
209     os::memory::LockHolder lh(workerIdLock_);
210     freeWorkerIds_.push_back(workerId);
211     ASSERT(freeWorkerIds_.size() <= stackful_coroutines::MAX_WORKERS_COUNT);
212 }
213 
DisableCoroutineSwitch()214 void StackfulCoroutineManager::DisableCoroutineSwitch()
215 {
216     GetCurrentWorker()->DisableCoroutineSwitch();
217 }
218 
EnableCoroutineSwitch()219 void StackfulCoroutineManager::EnableCoroutineSwitch()
220 {
221     GetCurrentWorker()->EnableCoroutineSwitch();
222 }
223 
IsCoroutineSwitchDisabled()224 bool StackfulCoroutineManager::IsCoroutineSwitchDisabled()
225 {
226     return GetCurrentWorker()->IsCoroutineSwitchDisabled();
227 }
228 
Initialize(CoroutineManagerConfig config,Runtime * runtime,PandaVM * vm)229 void StackfulCoroutineManager::Initialize(CoroutineManagerConfig config, Runtime *runtime, PandaVM *vm)
230 {
231     // enable stats collection if needed
232     if (config.enablePerfStats) {
233         stats_.Enable();
234     }
235     ScopedCoroutineStats s(&stats_, CoroutineTimeStats::INIT);
236     // set feature flags
237     enableDrainQueueIface_ = config.enableDrainQueueIface;
238     enableMigration_ = config.enableMigration;
239     migrateAwakenedCoros_ = config.migrateAwakenedCoros;
240 
241     // set limits
242     coroStackSizeBytes_ = Runtime::GetCurrent()->GetOptions().GetCoroutineStackSizePages() * os::mem::GetPageSize();
243     if (coroStackSizeBytes_ != AlignUp(coroStackSizeBytes_, PANDA_POOL_ALIGNMENT_IN_BYTES)) {
244         size_t alignmentPages = PANDA_POOL_ALIGNMENT_IN_BYTES / os::mem::GetPageSize();
245         LOG(FATAL, COROUTINES) << "Coroutine stack size should be >= " << alignmentPages
246                                << " pages and should be aligned to " << alignmentPages << "-page boundary!";
247     }
248     size_t coroStackAreaSizeBytes = Runtime::GetCurrent()->GetOptions().GetCoroutinesStackMemLimit();
249     coroutineCountLimit_ = coroStackAreaSizeBytes / coroStackSizeBytes_;
250 
251     CalculateWorkerLimits(config, exclusiveWorkersLimit_, commonWorkersCount_);
252     ASSERT(commonWorkersCount_ + exclusiveWorkersLimit_ <= stackful_coroutines::MAX_WORKERS_COUNT);
253     InitializeWorkerIdAllocator();
254     {
255         os::memory::LockHolder lock(workersLock_);
256         CreateMainCoroAndWorkers(commonWorkersCount_ - 1, runtime, vm);  // 1 is for MAIN here
257         LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager(): successfully created and activated " << workers_.size()
258                                << " coroutine workers";
259         programCompletionEvent_ = Runtime::GetCurrent()->GetInternalAllocator()->New<GenericEvent>(this);
260     }
261     if (enableMigration_) {
262         StartManagerThread();
263     }
264 }
265 
Finalize()266 void StackfulCoroutineManager::Finalize()
267 {
268     os::memory::LockHolder lock(coroPoolLock_);
269 
270     auto allocator = Runtime::GetCurrent()->GetInternalAllocator();
271     allocator->Delete(programCompletionEvent_);
272     for (auto *co : coroutinePool_) {
273         co->DestroyInternalResources();
274         CoroutineManager::DestroyEntrypointfulCoroutine(co);
275     }
276     coroutinePool_.clear();
277 }
278 
AddToRegistry(Coroutine * co)279 void StackfulCoroutineManager::AddToRegistry(Coroutine *co)
280 {
281     co->GetVM()->GetGC()->OnThreadCreate(co);
282     coroutines_.insert(co);
283     coroutineCount_++;
284     if (co->GetType() != Coroutine::Type::MUTATOR) {
285         systemCoroutineCount_++;
286     }
287 }
288 
RemoveFromRegistry(Coroutine * co)289 void StackfulCoroutineManager::RemoveFromRegistry(Coroutine *co)
290 {
291     coroutines_.erase(co);
292     coroutineCount_--;
293     if (co->GetType() != Coroutine::Type::MUTATOR) {
294         systemCoroutineCount_--;
295     }
296 }
297 
RegisterCoroutine(Coroutine * co)298 void StackfulCoroutineManager::RegisterCoroutine(Coroutine *co)
299 {
300     os::memory::LockHolder lock(coroListLock_);
301     AddToRegistry(co);
302     // Propagate SUSPEND_REQUEST flag to the new coroutine to avoid the following situation:
303     // * Main coro holds read lock of the MutatorLock.
304     // * GC thread calls SuspendAll nad set SUSPEND_REQUEST flag to the main coro and
305     //   tries to acquire write lock of the MutatorLock.
306     // * Main coro creates a new coro and adds it to the coroutines_ list.
307     // * SUSPEND_REQUEST is not set in the new coroutine
308     // * New coro starts execution, acquires read lock of the MutatorLock and enters a long loop
309     // * Main coro checks SUSPEND_REQUEST flag and blocks
310     // * GC will not start becuase the new coro has no SUSPEND_REQUEST flag and it will never release the MutatorLock
311     //
312     // We need to propagate SUSPEND_REQUEST under the coroListLock_.
313     // It guarantees that the flag is already set for the current coro and we need to propagate it
314     // or GC will see the new coro in EnumerateAllThreads.
315 #ifndef ARK_HYBRID
316     if (Thread::GetCurrent() != nullptr && Coroutine::GetCurrent() != nullptr &&
317         Coroutine::GetCurrent()->IsSuspended() && !co->IsSuspended()) {
318         co->SuspendImpl(true);
319     }
320 #endif
321 }
322 
TerminateCoroutine(Coroutine * co)323 bool StackfulCoroutineManager::TerminateCoroutine(Coroutine *co)
324 {
325     if (co->HasManagedEntrypoint()) {
326         // profiling: start interval here, end in ctxswitch after finalization request is done
327         GetCurrentWorker()->GetPerfStats().StartInterval(CoroutineTimeStats::SCH_ALL);
328     } else {
329         // profiling: no need. MAIN and NATIVE EP coros are deleted from the SCHEDULER itself
330         LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::TerminateCoroutine(): terminating "
331                                << ((GetExistingWorkersCount() == 0) ? "MAIN..." : "NATIVE EP coro...");
332     }
333 
334     LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::TerminateCoroutine() started";
335     co->NativeCodeEnd();
336     co->UpdateStatus(ThreadStatus::TERMINATING);
337 
338     {
339         os::memory::LockHolder lList(coroListLock_);
340         RemoveFromRegistry(co);
341 #ifdef ARK_HYBRID
342         co->GetThreadHolder()->UnregisterCoroutine(co);
343 #endif
344         // We need collect TLAB metrics and clear TLAB before calling the manage thread destructor
345         // because of the possibility heap use after free. This happening when GC starts execute ResetYoungAllocator
346         // method which start iterate set of threads, collect TLAB metrics and clear TLAB. If thread was deleted from
347         // set but we haven't destroyed the thread yet, GC won't collect metrics and can complete TLAB
348         // deletion faster. And when we try to get the TLAB metrics in the destructor of managed thread, we will get
349         // heap use after free
350         co->CollectTLABMetrics();
351         co->ClearTLAB();
352         // DestroyInternalResources()/CleanupInternalResources() must be called in one critical section with
353         // RemoveFromRegistry (under core_list_lock_). This functions transfer cards from coro's post_barrier buffer to
354         // UpdateRemsetThread internally. Situation when cards still remain and UpdateRemsetThread cannot visit the
355         // coro (because it is already removed) must be impossible.
356         if (Runtime::GetOptions().IsUseCoroutinePool() && co->HasManagedEntrypoint()) {
357             co->CleanupInternalResources();
358         } else {
359             co->DestroyInternalResources();
360         }
361         co->UpdateStatus(ThreadStatus::FINISHED);
362     }
363     Runtime::GetCurrent()->GetNotificationManager()->ThreadEndEvent(co);
364 
365     if (co->HasManagedEntrypoint()) {
366         CheckProgramCompletion();
367         GetCurrentWorker()->RequestFinalization(co);
368     } else if (co->HasNativeEntrypoint()) {
369         GetCurrentWorker()->RequestFinalization(co);
370     } else {
371         // entrypointless and NOT native: e.g. MAIN
372         // (do nothing, as entrypointless coroutines should be destroyed manually)
373     }
374 
375     return false;
376 }
377 
GetActiveWorkersCount() const378 size_t StackfulCoroutineManager::GetActiveWorkersCount() const
379 {
380     os::memory::LockHolder lkWorkers(workersLock_);
381     return activeWorkersCount_;
382 }
383 
GetExistingWorkersCount() const384 size_t StackfulCoroutineManager::GetExistingWorkersCount() const
385 {
386     os::memory::LockHolder lkWorkers(workersLock_);
387     return workers_.size();
388 }
389 
CheckProgramCompletion()390 void StackfulCoroutineManager::CheckProgramCompletion()
391 {
392     os::memory::LockHolder lkCompletion(programCompletionLock_);
393 
394     size_t activeWorkerCoros = GetActiveWorkersCount();
395     if (coroutineCount_ <= 1 + activeWorkerCoros) {  // 1 here is for MAIN
396         LOG(DEBUG, COROUTINES)
397             << "StackfulCoroutineManager::CheckProgramCompletion(): all coroutines finished execution!";
398         // programCompletionEvent_ acts as a stackful-friendly cond var
399         programCompletionEvent_->Happen();
400     } else {
401         LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::CheckProgramCompletion(): still "
402                                << coroutineCount_ - 1 - activeWorkerCoros << " coroutines exist...";
403     }
404 }
405 
CreateCoroutineContext(bool coroHasEntrypoint)406 CoroutineContext *StackfulCoroutineManager::CreateCoroutineContext(bool coroHasEntrypoint)
407 {
408     return CreateCoroutineContextImpl(coroHasEntrypoint);
409 }
410 
DeleteCoroutineContext(CoroutineContext * ctx)411 void StackfulCoroutineManager::DeleteCoroutineContext(CoroutineContext *ctx)
412 {
413     FreeCoroutineStack(static_cast<StackfulCoroutineContext *>(ctx)->GetStackLoAddrPtr());
414     Runtime::GetCurrent()->GetInternalAllocator()->Delete(ctx);
415 }
416 
GetCoroutineCount()417 size_t StackfulCoroutineManager::GetCoroutineCount()
418 {
419     return coroutineCount_;
420 }
421 
GetCoroutineCountLimit()422 size_t StackfulCoroutineManager::GetCoroutineCountLimit()
423 {
424     return coroutineCountLimit_;
425 }
426 
Launch(CompletionEvent * completionEvent,Method * entrypoint,PandaVector<Value> && arguments,CoroutineLaunchMode mode,CoroutinePriority priority,bool abortFlag)427 bool StackfulCoroutineManager::Launch(CompletionEvent *completionEvent, Method *entrypoint,
428                                       PandaVector<Value> &&arguments, CoroutineLaunchMode mode,
429                                       CoroutinePriority priority, bool abortFlag)
430 {
431     auto epInfo = Coroutine::ManagedEntrypointInfo {completionEvent, entrypoint, std::move(arguments)};
432     return LaunchWithMode(std::move(epInfo), entrypoint->GetFullName(), mode, priority, false, abortFlag);
433 }
434 
LaunchImmediately(CompletionEvent * completionEvent,Method * entrypoint,PandaVector<Value> && arguments,CoroutineLaunchMode mode,CoroutinePriority priority,bool abortFlag)435 bool StackfulCoroutineManager::LaunchImmediately(CompletionEvent *completionEvent, Method *entrypoint,
436                                                  PandaVector<Value> &&arguments, CoroutineLaunchMode mode,
437                                                  CoroutinePriority priority, bool abortFlag)
438 {
439     auto epInfo = Coroutine::ManagedEntrypointInfo {completionEvent, entrypoint, std::move(arguments)};
440     return LaunchWithMode(std::move(epInfo), entrypoint->GetFullName(), mode, priority, true, abortFlag);
441 }
442 
LaunchNative(NativeEntrypointFunc epFunc,void * param,PandaString coroName,CoroutineLaunchMode mode,CoroutinePriority priority,bool abortFlag)443 bool StackfulCoroutineManager::LaunchNative(NativeEntrypointFunc epFunc, void *param, PandaString coroName,
444                                             CoroutineLaunchMode mode, CoroutinePriority priority, bool abortFlag)
445 {
446     auto epInfo = Coroutine::NativeEntrypointInfo {epFunc, param};
447     return LaunchWithMode(epInfo, std::move(coroName), mode, priority, false, abortFlag);
448 }
449 
Await(CoroutineEvent * awaitee)450 void StackfulCoroutineManager::Await(CoroutineEvent *awaitee)
451 {
452     ASSERT_NATIVE_CODE();
453     // profiling
454     ScopedCoroutineStats s(&GetCurrentWorker()->GetPerfStats(), CoroutineTimeStats::SCH_ALL);
455 
456     ASSERT(awaitee != nullptr);
457     [[maybe_unused]] auto *waiter = Coroutine::GetCurrent();
458     LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::Await started by " + waiter->GetName();
459 
460     if (awaitee->Happened()) {
461         awaitee->Unlock();
462         LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::Await finished (no await happened)";
463         return;
464     }
465 
466     GetCurrentWorker()->WaitForEvent(awaitee);
467     // NB: at this point the awaitee is likely already deleted
468     LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::Await finished by " + waiter->GetName();
469 }
470 
UnblockWaiters(CoroutineEvent * blocker)471 void StackfulCoroutineManager::UnblockWaiters(CoroutineEvent *blocker)
472 {
473     // profiling: this function can be called either independently or as a path of some other SCH sequence,
474     // hence using the "weak" stats collector
475     ScopedCoroutineStats s(&GetCurrentWorker()->GetPerfStats(), CoroutineTimeStats::SCH_ALL, true);
476 
477     os::memory::LockHolder lkWorkers(workersLock_);
478     ASSERT(blocker != nullptr);
479 #ifndef NDEBUG
480     {
481         os::memory::LockHolder lkBlocker(*blocker);
482         ASSERT(blocker->Happened());
483     }
484 #endif
485 
486     for (auto *w : workers_) {
487         w->UnblockWaiters(blocker);
488     }
489 }
490 
Schedule()491 void StackfulCoroutineManager::Schedule()
492 {
493     // profiling
494     ScopedCoroutineStats s(&GetCurrentWorker()->GetPerfStats(), CoroutineTimeStats::SCH_ALL);
495 
496     LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::Schedule() request from "
497                            << Coroutine::GetCurrent()->GetName();
498     GetCurrentWorker()->RequestSchedule();
499 }
500 
EnumerateThreadsImpl(const ThreadManager::Callback & cb,unsigned int incMask,unsigned int xorMask) const501 bool StackfulCoroutineManager::EnumerateThreadsImpl(const ThreadManager::Callback &cb, unsigned int incMask,
502                                                     unsigned int xorMask) const
503 {
504     os::memory::LockHolder lock(coroListLock_);
505     for (auto *t : coroutines_) {
506         if (!ApplyCallbackToThread(cb, t, incMask, xorMask)) {
507             return false;
508         }
509     }
510     return true;
511 }
512 
SuspendAllThreads()513 void StackfulCoroutineManager::SuspendAllThreads()
514 {
515     os::memory::LockHolder lock(coroListLock_);
516     LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::SuspendAllThreads started";
517     for (auto *t : coroutines_) {
518         t->SuspendImpl(true);
519     }
520     LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::SuspendAllThreads finished";
521 }
522 
ResumeAllThreads()523 void StackfulCoroutineManager::ResumeAllThreads()
524 {
525     os::memory::LockHolder lock(coroListLock_);
526     for (auto *t : coroutines_) {
527         t->ResumeImpl(true);
528     }
529 }
530 
IsRunningThreadExist()531 bool StackfulCoroutineManager::IsRunningThreadExist()
532 {
533     UNREACHABLE();
534     // NOTE(konstanting): correct implementation. Which coroutine do we consider running?
535     return false;
536 }
537 
WaitForDeregistration()538 void StackfulCoroutineManager::WaitForDeregistration()
539 {
540     // profiling: start interval here, end in ctxswitch (if needed)
541     GetCurrentWorker()->GetPerfStats().StartInterval(CoroutineTimeStats::SCH_ALL);
542     //
543     MainCoroutineCompleted();
544 }
545 
ReuseCoroutineInstance(Coroutine * co,EntrypointInfo && epInfo,PandaString name,CoroutinePriority priority)546 void StackfulCoroutineManager::ReuseCoroutineInstance(Coroutine *co, EntrypointInfo &&epInfo, PandaString name,
547                                                       CoroutinePriority priority)
548 {
549     auto *ctx = co->GetContext<CoroutineContext>();
550     co->ReInitialize(std::move(name), ctx, std::move(epInfo), priority);
551 }
552 
TryGetCoroutineFromPool()553 Coroutine *StackfulCoroutineManager::TryGetCoroutineFromPool()
554 {
555     os::memory::LockHolder lkPool(coroPoolLock_);
556     if (coroutinePool_.empty()) {
557         return nullptr;
558     }
559     Coroutine *co = coroutinePool_.back();
560     coroutinePool_.pop_back();
561     return co;
562 }
563 
ChooseWorkerForCoroutine(Coroutine * co)564 StackfulCoroutineWorker *StackfulCoroutineManager::ChooseWorkerForCoroutine(Coroutine *co)
565 {
566     ASSERT(co != nullptr);
567     auto maskValue = co->GetContext<StackfulCoroutineContext>()->GetAffinityMask();
568     return ChooseWorkerImpl(WorkerSelectionPolicy::LEAST_LOADED, maskValue);
569 }
570 
CalcAffinityMaskFromLaunchMode(CoroutineLaunchMode mode)571 stackful_coroutines::AffinityMask StackfulCoroutineManager::CalcAffinityMaskFromLaunchMode(CoroutineLaunchMode mode)
572 {
573     /**
574      * launch mode \ policy      DEFAULT                         NON_MAIN
575      *   DEFAULT                ->least busy, allow migration   ->least busy, allow migration, disallow <main>
576      *   SAME_WORKER            ->same, forbid migration        ->same, forbid migration
577      *   MAIN_WORKER            ->main, forbid migration        ->main, forbid migration
578      *   EXCLUSIVE              ->least busy, forbid migration  ->least busy, forbid migration, disallow <main>
579      */
580 
581     if (mode == CoroutineLaunchMode::SAME_WORKER) {
582         std::bitset<stackful_coroutines::MAX_WORKERS_COUNT> mask(stackful_coroutines::AFFINITY_MASK_NONE);
583         mask.set(GetCurrentWorker()->GetId());
584         return mask.to_ullong();
585     }
586 
587     if (mode == CoroutineLaunchMode::MAIN_WORKER) {
588         std::bitset<stackful_coroutines::MAX_WORKERS_COUNT> mask(stackful_coroutines::AFFINITY_MASK_NONE);
589         mask.set(stackful_coroutines::MAIN_WORKER_ID);
590         return mask.to_ullong();
591     }
592 
593     // CoroutineLaunchMode::EXCLUSIVE is not supported yet (but will be)
594     ASSERT(mode == CoroutineLaunchMode::DEFAULT);
595 
596     std::bitset<stackful_coroutines::MAX_WORKERS_COUNT> mask(stackful_coroutines::AFFINITY_MASK_FULL);
597     switch (GetSchedulingPolicy()) {
598         case CoroutineSchedulingPolicy::NON_MAIN_WORKER: {
599             mask.reset(stackful_coroutines::MAIN_WORKER_ID);
600             break;
601         }
602         default:
603         case CoroutineSchedulingPolicy::ANY_WORKER:
604             break;
605     }
606     return mask.to_ullong();
607 }
608 
GetCoroutineInstanceForLaunch(EntrypointInfo && epInfo,PandaString && coroName,CoroutinePriority priority,stackful_coroutines::AffinityMask affinityMask,bool abortFlag)609 Coroutine *StackfulCoroutineManager::GetCoroutineInstanceForLaunch(EntrypointInfo &&epInfo, PandaString &&coroName,
610                                                                    CoroutinePriority priority,
611                                                                    stackful_coroutines::AffinityMask affinityMask,
612                                                                    bool abortFlag)
613 {
614     Coroutine *co = nullptr;
615     if (Runtime::GetOptions().IsUseCoroutinePool()) {
616         co = TryGetCoroutineFromPool();
617     }
618     if (co != nullptr) {
619         ReuseCoroutineInstance(co, std::move(epInfo), std::move(coroName), priority);
620     } else {
621         co = CreateCoroutineInstance(std::move(epInfo), std::move(coroName), Coroutine::Type::MUTATOR, priority);
622     }
623     if (co == nullptr) {
624         LOG(DEBUG, COROUTINES)
625             << "StackfulCoroutineManager::GetCoroutineInstanceForLaunch: failed to create a coroutine!";
626         return co;
627     }
628     co->SetAbortFlag(abortFlag);
629     Runtime::GetCurrent()->GetNotificationManager()->ThreadStartEvent(co);
630     co->GetContext<StackfulCoroutineContext>()->SetAffinityMask(affinityMask);
631     return co;
632 }
633 
LaunchImpl(EntrypointInfo && epInfo,PandaString && coroName,CoroutineLaunchMode mode,CoroutinePriority priority,bool abortFlag)634 bool StackfulCoroutineManager::LaunchImpl(EntrypointInfo &&epInfo, PandaString &&coroName, CoroutineLaunchMode mode,
635                                           CoroutinePriority priority, bool abortFlag)
636 {
637 #ifndef NDEBUG
638     GetCurrentWorker()->PrintRunnables("LaunchImpl begin");
639 #endif
640     Coroutine *co = nullptr;
641     auto affinityMask = CalcAffinityMaskFromLaunchMode(mode);
642     co = GetCoroutineInstanceForLaunch(std::move(epInfo), std::move(coroName), priority, affinityMask, abortFlag);
643     if (co == nullptr) {
644         LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::LaunchImpl: failed to create a coroutine!";
645         return false;
646     }
647     {
648         os::memory::LockHolder lkWorkers(workersLock_);
649         auto *w = ChooseWorkerForCoroutine(co);
650         ASSERT(w != nullptr);
651         w->AddRunnableCoroutine(co);
652     }
653 #ifndef NDEBUG
654     GetCurrentWorker()->PrintRunnables("LaunchImpl end");
655 #endif
656     return true;
657 }
658 
LaunchImmediatelyImpl(EntrypointInfo && epInfo,PandaString && coroName,CoroutineLaunchMode mode,CoroutinePriority priority,bool abortFlag)659 bool StackfulCoroutineManager::LaunchImmediatelyImpl(EntrypointInfo &&epInfo, PandaString &&coroName,
660                                                      CoroutineLaunchMode mode, CoroutinePriority priority,
661                                                      bool abortFlag)
662 {
663     Coroutine *co = nullptr;
664     auto affinityMask = CalcAffinityMaskFromLaunchMode(mode);
665 
666     ASSERT(affinityMask == CalcAffinityMaskFromLaunchMode(CoroutineLaunchMode::SAME_WORKER));
667 
668     co = GetCoroutineInstanceForLaunch(std::move(epInfo), std::move(coroName), priority, affinityMask, abortFlag);
669     if (co == nullptr) {
670         LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::LaunchImmediatelyImpl: failed to create a coroutine!";
671         return false;
672     }
673     StackfulCoroutineWorker *w = nullptr;
674     {
675         os::memory::LockHolder lkWorkers(workersLock_);
676         w = ChooseWorkerForCoroutine(co);
677     }
678     ASSERT(w != nullptr);
679     // since we are going to switch the context, we have to close the interval
680     GetCurrentWorker()->GetPerfStats().FinishInterval(CoroutineTimeStats::LAUNCH);
681     co->SetImmediateLauncher(Coroutine::GetCurrent());
682     w->AddCreatedCoroutineAndSwitchToIt(co);
683     // resume the interval once we schedule the original coro again
684     GetCurrentWorker()->GetPerfStats().StartInterval(CoroutineTimeStats::LAUNCH);
685 
686     return true;
687 }
688 
LaunchWithMode(Coroutine::EntrypointInfo && epInfo,PandaString && coroName,CoroutineLaunchMode mode,CoroutinePriority priority,bool launchImmediately,bool abortFlag)689 bool StackfulCoroutineManager::LaunchWithMode(Coroutine::EntrypointInfo &&epInfo, PandaString &&coroName,
690                                               CoroutineLaunchMode mode, CoroutinePriority priority,
691                                               bool launchImmediately, bool abortFlag)
692 {
693     // profiling: scheduler and launch time
694     ScopedCoroutineStats sSch(&GetCurrentWorker()->GetPerfStats(), CoroutineTimeStats::SCH_ALL);
695     ScopedCoroutineStats sLaunch(&GetCurrentWorker()->GetPerfStats(), CoroutineTimeStats::LAUNCH);
696 
697     LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::LaunchWithMode started";
698 
699     auto *co = Coroutine::GetCurrent();
700     ASSERT(co != nullptr);
701     auto *w = co->GetContext<StackfulCoroutineContext>()->GetWorker();
702     mode = (mode == CoroutineLaunchMode::DEFAULT && w->InExclusiveMode()) ? CoroutineLaunchMode::SAME_WORKER : mode;
703     bool result = false;
704     if (launchImmediately) {
705         result = LaunchImmediatelyImpl(std::move(epInfo), std::move(coroName), mode, priority, abortFlag);
706     } else {
707         result = LaunchImpl(std::move(epInfo), std::move(coroName), mode, priority, abortFlag);
708     }
709     if (!result) {
710         // let's count all launch failures as "limit exceeded" for now.
711         // Later on we can think of throwing different errors for different reasons.
712         ThrowCoroutinesLimitExceedError(
713             "Unable to create a new coroutine: reached the limit for the number of existing coroutines.");
714     }
715 
716     LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::LaunchWithMode finished";
717     return result;
718 }
719 
DumpCoroutineStats() const720 void StackfulCoroutineManager::DumpCoroutineStats() const
721 {
722     LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager: dumping performance statistics...";
723     std::cout << "=== Coroutine statistics begin ===" << std::endl;
724     std::cout << stats_.GetFullStatistics(finalizedWorkerStats_);
725     std::cout << "=== Coroutine statistics end ===" << std::endl;
726     LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager: performance statistics dumped successfully.";
727 }
728 
ListUnhandledEventsOnProgramExit()729 void StackfulCoroutineManager::ListUnhandledEventsOnProgramExit()
730 {
731     auto *coro = Coroutine::GetCurrent();
732     ASSERT(coro != nullptr);
733     coro->ListUnhandledEventsOnProgramExit();
734 }
735 
WaitForNonMainCoroutinesCompletion()736 void StackfulCoroutineManager::WaitForNonMainCoroutinesCompletion()
737 {
738     os::memory::LockHolder lkCompletion(programCompletionLock_);
739     // It's neccessary to read activeWorkersCount before coroutineCount to avoid deadlock
740     do {
741         while (GetActiveWorkersCount() + 1 < coroutineCount_) {  // 1 is for MAIN
742             programCompletionEvent_->SetNotHappened();
743             programCompletionEvent_->Lock();
744             programCompletionLock_.Unlock();
745             GetCurrentWorker()->WaitForEvent(programCompletionEvent_);
746             LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::WaitForNonMainCoroutinesCompletion(): possibly "
747                                       "spurious wakeup from wait...";
748             // NOTE(konstanting, #IAD5MH): test for the spurious wakeup
749             programCompletionLock_.Lock();
750         }
751         ListUnhandledEventsOnProgramExit();
752     } while (GetActiveWorkersCount() + 1 < coroutineCount_);  // 1 is for MAIN
753     // coroutineCount_ < 1 + GetActiveWorkersCount() in case of concurrent EWorker destroy
754     // in this case coroutineCount_ >= 1 + GetActiveWorkersCount() - ExclusiveWorkersCount()
755     ASSERT(!(GetActiveWorkersCount() + 1 < coroutineCount_));
756 }
757 
MainCoroutineCompleted()758 void StackfulCoroutineManager::MainCoroutineCompleted()
759 {
760     // precondition: MAIN is already in the native mode
761     LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::MainCoroutineCompleted(): STARTED";
762     // block till only schedule loop coroutines are present
763     LOG(DEBUG, COROUTINES)
764         << "StackfulCoroutineManager::MainCoroutineCompleted(): waiting for other coroutines to complete";
765     WaitForNonMainCoroutinesCompletion();
766     if (enableMigration_) {
767         LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::MainCoroutineCompleted(): stop manager thread";
768         StopManagerThread();
769     }
770 
771     LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::MainCoroutineCompleted(): stopping workers";
772     {
773         os::memory::LockHolder lock(workersLock_);
774         for (auto *worker : workers_) {
775             worker->SetActive(false);
776         }
777         while (activeWorkersCount_ > 1) {  // 1 is for MAIN
778             // profiling: the SCH interval is expected to be started after the ctxswitch
779             GetCurrentWorker()->GetPerfStats().FinishInterval(CoroutineTimeStats::SCH_ALL);
780             // NOTE(konstanting, #IAD5MH): need timed wait?..
781             workersCv_.Wait(&workersLock_);
782             // profiling: we don't want to profile the sleeping state
783             GetCurrentWorker()->GetPerfStats().StartInterval(CoroutineTimeStats::SCH_ALL);
784         }
785     }
786     // Only system coroutines and current coro (MAIN) are left (1 is for MAIN)
787     ASSERT(activeCoroutines_ == systemCoroutineCount_ + 1);
788 
789     LOG(DEBUG, COROUTINES)
790         << "StackfulCoroutineManager::MainCoroutineCompleted(): stopping await loop on the main worker";
791     while (coroutineCount_ > 1) {
792         GetCurrentWorker()->FinalizeFiberScheduleLoop();
793     }
794     // profiling: the SCH interval is expected to be started after the ctxswitch
795     GetCurrentWorker()->GetPerfStats().FinishInterval(CoroutineTimeStats::SCH_ALL);
796 
797     OnWorkerShutdown(GetCurrentWorker());
798 
799 #ifndef NDEBUG
800     {
801         os::memory::LockHolder lkWorkers(workersLock_);
802         ASSERT(workers_.empty());
803         ASSERT(activeWorkersCount_ == 0);
804     }
805 #endif
806 
807     if (stats_.IsEnabled()) {
808         DumpCoroutineStats();
809     }
810     stats_.Disable();
811 
812     // We need to lock programCompletionLock_ here to call
813     // programCompletionLock_.Unlock() in ExclusiveWorker before runtime destruction
814     os::memory::LockHolder lkCompletion(programCompletionLock_);
815     GetCurrentContext()->MainThreadFinished();
816     // MAIN finished, all workers are deleted, no active coros remain
817     ASSERT(activeCoroutines_ == 0);
818     LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::MainCoroutineCompleted(): DONE";
819 }
820 
GetCurrentContext()821 StackfulCoroutineContext *StackfulCoroutineManager::GetCurrentContext()
822 {
823     auto *co = Coroutine::GetCurrent();
824     ASSERT(co != nullptr);
825     return co->GetContext<StackfulCoroutineContext>();
826 }
827 
GetCurrentWorker()828 StackfulCoroutineWorker *StackfulCoroutineManager::GetCurrentWorker()
829 {
830     return GetCurrentContext()->GetWorker();
831 }
832 
IsMainWorker(Coroutine * co) const833 bool StackfulCoroutineManager::IsMainWorker(Coroutine *co) const
834 {
835     auto *worker = co->GetContext<StackfulCoroutineContext>()->GetWorker();
836     return worker->IsMainWorker();
837 }
838 
DestroyEntrypointfulCoroutine(Coroutine * co)839 void StackfulCoroutineManager::DestroyEntrypointfulCoroutine(Coroutine *co)
840 {
841     if (Runtime::GetOptions().IsUseCoroutinePool() && co->HasManagedEntrypoint()) {
842         co->CleanUp();
843         os::memory::LockHolder lock(coroPoolLock_);
844         coroutinePool_.push_back(co);
845     } else {
846         CoroutineManager::DestroyEntrypointfulCoroutine(co);
847     }
848 }
849 
CreateCoroutineContextImpl(bool needStack)850 StackfulCoroutineContext *StackfulCoroutineManager::CreateCoroutineContextImpl(bool needStack)
851 {
852     uint8_t *stack = nullptr;
853     size_t stackSizeBytes = 0;
854     if (needStack) {
855         stack = AllocCoroutineStack();
856         if (stack == nullptr) {
857             return nullptr;
858         }
859         stackSizeBytes = coroStackSizeBytes_;
860     }
861     return Runtime::GetCurrent()->GetInternalAllocator()->New<StackfulCoroutineContext>(stack, stackSizeBytes);
862 }
863 
CreateNativeCoroutine(Runtime * runtime,PandaVM * vm,Coroutine::NativeEntrypointInfo::NativeEntrypointFunc entry,void * param,PandaString name,Coroutine::Type type,CoroutinePriority priority)864 Coroutine *StackfulCoroutineManager::CreateNativeCoroutine(Runtime *runtime, PandaVM *vm,
865                                                            Coroutine::NativeEntrypointInfo::NativeEntrypointFunc entry,
866                                                            void *param, PandaString name, Coroutine::Type type,
867                                                            CoroutinePriority priority)
868 {
869     if (GetCoroutineCount() >= GetCoroutineCountLimit()) {
870         // resource limit reached
871         return nullptr;
872     }
873     StackfulCoroutineContext *ctx = CreateCoroutineContextImpl(true);
874     if (ctx == nullptr) {
875         // do not proceed if we cannot create a context for the new coroutine
876         return nullptr;
877     }
878     auto *co = GetCoroutineFactory()(runtime, vm, std::move(name), ctx, Coroutine::NativeEntrypointInfo(entry, param),
879                                      type, priority);
880     ASSERT(co != nullptr);
881 
882     // Let's assume that even the "native" coroutine can eventually try to execute some managed code.
883     // In that case pre/post barrier buffers are necessary.
884     co->InitBuffers();
885     return co;
886 }
887 
DestroyNativeCoroutine(Coroutine * co)888 void StackfulCoroutineManager::DestroyNativeCoroutine(Coroutine *co)
889 {
890     DestroyEntrypointlessCoroutine(co);
891 }
892 
OnCoroBecameActive(Coroutine * co)893 void StackfulCoroutineManager::OnCoroBecameActive(Coroutine *co)
894 {
895     ASSERT(co->IsActive());
896     IncrementActiveCoroutines();
897     co->GetWorker()->OnCoroBecameActive(co);
898 }
899 
OnCoroBecameNonActive(Coroutine * co)900 void StackfulCoroutineManager::OnCoroBecameNonActive([[maybe_unused]] Coroutine *co)
901 {
902     ASSERT(!co->IsActive());
903     DecrementActiveCoroutines();
904 }
905 
OnNativeCallExit(Coroutine * co)906 void StackfulCoroutineManager::OnNativeCallExit(Coroutine *co)
907 {
908     if (IsDrainQueueInterfaceEnabled()) {
909         // A temporary hack for draining the coroutine queue on the current worker.
910         // Will stay there until we have the proper design for the execution model and
911         // the rules for interaction with the app framework.
912         LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::OnNativeCallExit(): START DRAINING COROQUEUE";
913 
914         // precondition: the event is handled for the current coroutine
915         ASSERT(co == Coroutine::GetCurrent());
916         auto *worker = GetCurrentWorker();
917         if (!worker->IsMainWorker() && !worker->InExclusiveMode()) {
918             return;
919         }
920         ScopedNativeCodeThread nativeCode(co);
921         while (worker->GetRunnablesCount(Coroutine::Type::MUTATOR) > 0) {
922             GetCurrentWorker()->RequestSchedule();
923         }
924 
925         LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::OnNativeCallExit(): STOP DRAINING COROQUEUE";
926     }
927 }
928 
IncrementActiveCoroutines()929 void StackfulCoroutineManager::IncrementActiveCoroutines()
930 {
931     activeCoroutines_++;
932 }
933 
DecrementActiveCoroutines()934 void StackfulCoroutineManager::DecrementActiveCoroutines()
935 {
936     [[maybe_unused]] uint32_t old = activeCoroutines_--;
937     ASSERT(old > 0);
938 }
939 
IsNoActiveMutatorsExceptCurrent()940 bool StackfulCoroutineManager::IsNoActiveMutatorsExceptCurrent()
941 {
942     // activeCoroutines_ == 1 means that only current or main mutator is left
943     // activeCoroutines_ == 0 means that main mutator is terminating
944     // or all coroutines are blocked (a deadlock in managed code happened)
945 
946     // NOTE(konstanting): need to reevaluate the necessity of locks here as
947     // atomics difference is somewhat confusing. Also, we may have concurrent access to them.
948     return (activeCoroutines_ - systemCoroutineCount_) <= 1;
949 }
950 
CreateExclusiveWorkerForThread(Runtime * runtime,PandaVM * vm)951 Coroutine *StackfulCoroutineManager::CreateExclusiveWorkerForThread(Runtime *runtime, PandaVM *vm)
952 {
953     ASSERT(Thread::GetCurrent() == nullptr);
954 
955     // actually we need this lock due to worker limit
956     os::memory::LockHolder eWorkerLock(eWorkerCreationLock_);
957 
958     if (IsExclusiveWorkersLimitReached()) {
959         LOG(DEBUG, COROUTINES) << "The program reached the limit of exclusive workers";
960         return nullptr;
961     }
962 
963     auto *eWorker = CreateWorker(runtime, vm, StackfulCoroutineWorker::ScheduleLoopType::FIBER, "[e-worker] ");
964     ASSERT(eWorker != nullptr);
965     eWorker->SetExclusiveMode(true);
966     eWorker->DisableForCrossWorkersLaunch();
967     auto *eCoro = CreateEntrypointlessCoroutine(runtime, vm, true, "[ea_coro] " + eWorker->GetName(),
968                                                 Coroutine::Type::MUTATOR, CoroutinePriority::MEDIUM_PRIORITY);
969     ASSERT(eCoro != nullptr);
970     eWorker->AddRunningCoroutine(eCoro);
971     OnWorkerStartup(eWorker);
972 
973     ASSERT(Coroutine::GetCurrent() == eCoro);
974     return eCoro;
975 }
976 
DestroyExclusiveWorker()977 bool StackfulCoroutineManager::DestroyExclusiveWorker()
978 {
979     auto *eWorker = GetCurrentWorker();
980     if (!eWorker->InExclusiveMode()) {
981         LOG(DEBUG, COROUTINES) << "Trying to destroy not exclusive worker";
982         return false;
983     }
984 
985     eWorker->CompleteAllAffinedCoroutines();
986 
987     eWorker->SetActive(false);
988     eWorker->FinalizeFiberScheduleLoop();
989 
990     CheckProgramCompletion();
991 
992     auto *eaCoro = Coroutine::GetCurrent();
993     programCompletionLock_.Lock();
994     DestroyEntrypointlessCoroutine(eaCoro);
995     Coroutine::SetCurrent(nullptr);
996 
997     OnWorkerShutdown(eWorker);
998     programCompletionLock_.Unlock();
999     return true;
1000 }
1001 
IsExclusiveWorkersLimitReached() const1002 bool StackfulCoroutineManager::IsExclusiveWorkersLimitReached() const
1003 {
1004     bool limitIsReached = GetActiveWorkersCount() - commonWorkersCount_ >= exclusiveWorkersLimit_;
1005     LOG_IF(limitIsReached, DEBUG, COROUTINES) << "The programm reached the limit of exclusive workers";
1006     return limitIsReached;
1007 }
1008 
ManagerThreadProc()1009 void StackfulCoroutineManager::ManagerThreadProc()
1010 {
1011     // calculate the time after 5 seconds.
1012     auto nextCheckTime = ark::os::time::GetClockTimeInMilli() + DETECTION_INTERVAL_VALUE;
1013     while (managerRunning_) {
1014         {
1015             os::memory::LockHolder lock(managerMutex_);
1016             managerCv_.TimedWait(&managerMutex_, DETECTION_INTERVAL_VALUE);  // wait 5 seconds.
1017             if (!managerRunning_) {
1018                 LOG(DEBUG, COROUTINES) << "The manager thread stops running.";
1019                 break;
1020             }
1021         }
1022         uint32_t count = migrateCount_.exchange(0);
1023         while (count > 0) {
1024             MigrateCoroutinesInward(count);
1025         }
1026         auto now = ark::os::time::GetClockTimeInMilli();
1027         if (now >= nextCheckTime) {
1028             CheckForBlockedWorkers();
1029             nextCheckTime = now + DETECTION_INTERVAL_VALUE;  // update to the next 5 second.
1030         }
1031     }
1032 }
1033 
CheckForBlockedWorkers()1034 void StackfulCoroutineManager::CheckForBlockedWorkers()
1035 {
1036     os::memory::LockHolder lock(workersLock_);
1037     ASSERT(!workers_.empty());
1038     for (auto *w : workers_) {
1039         // skip main worker and exclusive workers
1040         if (w->IsMainWorker() || w->InExclusiveMode()) {
1041             continue;
1042         }
1043         w->MigrateCorosOutwardsIfBlocked();
1044     }
1045 }
1046 
MigrateCoroutinesInward(uint32_t & count)1047 void StackfulCoroutineManager::MigrateCoroutinesInward(uint32_t &count)
1048 {
1049     auto affinityMask = CalcAffinityMaskFromLaunchMode(CoroutineLaunchMode::DEFAULT);
1050     os::memory::LockHolder lkWorkers(workersLock_);
1051     StackfulCoroutineWorker *from = ChooseWorkerImpl(WorkerSelectionPolicy::MOST_LOADED, affinityMask);
1052     if (from == nullptr || from->IsIdle()) {
1053         LOG(DEBUG, COROUTINES) << "no suitable worker.";
1054         count = 0;
1055         return;
1056     }
1057 
1058     for (auto *w : workers_) {
1059         // skip main worker and exclusive workers
1060         if (w->IsMainWorker() || w->InExclusiveMode()) {
1061             continue;
1062         }
1063         if (w->MigrateFrom(from)) {
1064             --count;
1065             return;
1066         }
1067     }
1068     count = 0;
1069 }
1070 
MigrateCoroutinesOutward(StackfulCoroutineWorker * from)1071 bool StackfulCoroutineManager::MigrateCoroutinesOutward(StackfulCoroutineWorker *from)
1072 {
1073     if (from->IsIdle()) {
1074         LOG(DEBUG, COROUTINES) << "The worker is idle, stop migration outward.";
1075         return false;
1076     }
1077     auto affinityMask = CalcAffinityMaskFromLaunchMode(CoroutineLaunchMode::DEFAULT);
1078     os::memory::LockHolder lkWorkers(workersLock_);
1079     StackfulCoroutineWorker *to = ChooseWorkerImpl(WorkerSelectionPolicy::LEAST_LOADED, affinityMask);
1080     if (to == nullptr || to == from) {
1081         LOG(DEBUG, COROUTINES) << "no suitable worker.";
1082         return false;
1083     }
1084     from->MigrateTo(to);
1085     return true;
1086 }
1087 
ChooseWorkerImpl(WorkerSelectionPolicy policy,size_t maskValue)1088 StackfulCoroutineWorker *StackfulCoroutineManager::ChooseWorkerImpl(WorkerSelectionPolicy policy, size_t maskValue)
1089 {
1090     std::bitset<stackful_coroutines::MAX_WORKERS_COUNT> affinityBits(maskValue);
1091     auto preferFirstOverSecond = [&affinityBits, &policy](const StackfulCoroutineWorker *first,
1092                                                           const StackfulCoroutineWorker *second) {
1093         if (!affinityBits.test(first->GetId())) {
1094             return false;
1095         }
1096         if (!affinityBits.test(second->GetId())) {
1097             return true;
1098         }
1099         // choosing the least loaded worker from the allowed worker set
1100         if (policy == WorkerSelectionPolicy::LEAST_LOADED) {
1101             return first->GetLoadFactor() < second->GetLoadFactor();
1102         }
1103         // choosing the most loaded worker from the allowed worker set
1104         return first->GetLoadFactor() > second->GetLoadFactor();
1105     };
1106 
1107     if (workers_.empty()) {
1108         LOG(DEBUG, COROUTINES) << "workers is empty.";
1109         return nullptr;
1110     }
1111 #ifndef NDEBUG
1112     LOG(DEBUG, COROUTINES) << "Evaluating load factors:";
1113     for (auto w : workers_) {
1114         LOG(DEBUG, COROUTINES) << w->GetName() << ": LF = " << w->GetLoadFactor();
1115     }
1116 #endif
1117     std::vector<StackfulCoroutineWorker *> suitableWorkers;
1118     // skip exclusive and finalizing workers
1119     std::copy_if(workers_.begin(), workers_.end(), std::back_inserter(suitableWorkers), [affinityBits](auto *w) {
1120         return !w->IsDisabledForCrossWorkersLaunch() || affinityBits.test(w->GetId());
1121     });
1122     ASSERT(!suitableWorkers.empty());
1123 
1124     auto wIt = std::min_element(suitableWorkers.begin(), suitableWorkers.end(), preferFirstOverSecond);
1125     LOG(DEBUG, COROUTINES) << "Chose worker: " << (*wIt)->GetName();
1126 
1127     return *wIt;
1128 }
1129 
TriggerMigration()1130 void StackfulCoroutineManager::TriggerMigration()
1131 {
1132     auto *worker = GetCurrentWorker();
1133     if (worker->IsMainWorker() || worker->InExclusiveMode()) {
1134         return;
1135     }
1136     if (!IsMigrationEnabled()) {
1137         LOG(DEBUG, COROUTINES) << "Migration is not supported.";
1138         return;
1139     }
1140     ++migrateCount_;
1141     LOG(DEBUG, COROUTINES) << "trigger migration.";
1142     os::memory::LockHolder lock(managerMutex_);
1143     managerCv_.Signal();
1144 }
1145 
MigrateAwakenedCoro(Coroutine * co)1146 void StackfulCoroutineManager::MigrateAwakenedCoro(Coroutine *co)
1147 {
1148     os::memory::LockHolder lkWorkers(workersLock_);
1149     auto *w = ChooseWorkerForCoroutine(co);
1150     ASSERT(w != nullptr);
1151     w->AddRunnableCoroutine(co);
1152 }
1153 
StartManagerThread()1154 void StackfulCoroutineManager::StartManagerThread()
1155 {
1156     // create a thread to detect worker blocking and perform coroutine migration
1157     managerRunning_ = true;
1158     managerThread_ = std::thread(&StackfulCoroutineManager::ManagerThreadProc, this);
1159     os::thread::SetThreadName(managerThread_.native_handle(), "managerThread");
1160 }
1161 
StopManagerThread()1162 void StackfulCoroutineManager::StopManagerThread()
1163 {
1164     if (!managerRunning_) {
1165         return;
1166     }
1167     {
1168         os::memory::LockHolder lock(managerMutex_);
1169         managerRunning_ = false;
1170         managerCv_.SignalAll();
1171     }
1172     if (managerThread_.joinable()) {
1173         managerThread_.join();
1174     }
1175 }
1176 
PreZygoteFork()1177 void StackfulCoroutineManager::PreZygoteFork()
1178 {
1179     WaitForNonMainCoroutinesCompletion();
1180     if (enableMigration_) {
1181         StopManagerThread();
1182     }
1183 
1184     FinalizeWorkers(commonWorkersCount_ - 1, Runtime::GetCurrent(), Runtime::GetCurrent()->GetPandaVM());
1185 }
1186 
PostZygoteFork()1187 void StackfulCoroutineManager::PostZygoteFork()
1188 {
1189     os::memory::LockHolder lh(workersLock_);
1190     Runtime *runtime = Runtime::GetCurrent();
1191     CreateWorkers(commonWorkersCount_ - 1, runtime, runtime->GetPandaVM());
1192     if (enableMigration_) {
1193         StartManagerThread();
1194     }
1195 }
1196 
CalculateWorkerLimits(const CoroutineManagerConfig & config,size_t & exclusiveWorkersLimit,size_t & commonWorkersLimit)1197 void StackfulCoroutineManager::CalculateWorkerLimits(const CoroutineManagerConfig &config,
1198                                                      size_t &exclusiveWorkersLimit, size_t &commonWorkersLimit)
1199 {
1200     // 1 is for MAIN
1201     size_t eWorkersLimit =
1202         std::min(stackful_coroutines::MAX_WORKERS_COUNT - 1, static_cast<size_t>(config.exclusiveWorkersLimit));
1203 
1204 #ifdef PANDA_ETS_INTEROP_JS
1205     // 2 is for taskpool execution engine eaworker
1206     bool res = Runtime::GetOptions().IsTaskpoolSupportInterop(plugins::LangToRuntimeType(panda_file::SourceLang::ETS));
1207     if (res) {
1208         eWorkersLimit += stackful_coroutines::TASKPOOL_EAWORKER_LIMIT;
1209     }
1210 #endif
1211     // create and activate workers
1212     size_t numberOfAvailableCores = std::max(std::thread::hardware_concurrency() / 4ULL, 2ULL);
1213 
1214     // workaround for issue #21582
1215     const size_t maxCommonWorkers =
1216         std::max(stackful_coroutines::MAX_WORKERS_COUNT - eWorkersLimit, static_cast<size_t>(2ULL));
1217 
1218     commonWorkersLimit = (config.workersCount == CoroutineManagerConfig::WORKERS_COUNT_AUTO)
1219                              ? std::min(numberOfAvailableCores, maxCommonWorkers)
1220                              : std::min(static_cast<size_t>(config.workersCount), maxCommonWorkers);
1221     if (config.workersCount == CoroutineManagerConfig::WORKERS_COUNT_AUTO) {
1222         LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager(): AUTO mode selected, will set number of coroutine "
1223                                   "common workers to number of CPUs / 4, but not less than 2 and no more than "
1224                                << maxCommonWorkers << " = " << commonWorkersLimit;
1225     }
1226     ASSERT(commonWorkersLimit > 0);
1227 
1228     exclusiveWorkersLimit = std::min(stackful_coroutines::MAX_WORKERS_COUNT - commonWorkersLimit, eWorkersLimit);
1229 
1230     LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager(): EWorkers limit is set to " << exclusiveWorkersLimit
1231                            << ", when suggested " << eWorkersLimit;
1232 }
1233 }  // namespace ark
1234