• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright (c) 2022-2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
#include <algorithm>
#include <bitset>
#include <iostream>
#include <thread>
#include "libpandabase/macros.h"
#include "runtime/coroutines/coroutine.h"
#include "runtime/coroutines/stackful_coroutine.h"
#include "runtime/coroutines/stackful_coroutine_manager.h"
#include "runtime/include/panda_vm.h"
#include "runtime/include/runtime.h"
#include "runtime/include/runtime_notification.h"
#include "runtime/include/thread_scopes.h"
25 
26 namespace ark {
27 
AllocCoroutineStack()28 uint8_t *StackfulCoroutineManager::AllocCoroutineStack()
29 {
30     Pool stackPool = PoolManager::GetMmapMemPool()->AllocPool<OSPagesAllocPolicy::NO_POLICY>(
31         coroStackSizeBytes_, SpaceType::SPACE_TYPE_NATIVE_STACKS, AllocatorType::NATIVE_STACKS_ALLOCATOR);
32     return static_cast<uint8_t *>(stackPool.GetMem());
33 }
34 
FreeCoroutineStack(uint8_t * stack)35 void StackfulCoroutineManager::FreeCoroutineStack(uint8_t *stack)
36 {
37     if (stack != nullptr) {
38         PoolManager::GetMmapMemPool()->FreePool(stack, coroStackSizeBytes_);
39     }
40 }
41 
// Creates the MAIN worker (FIBER schedule loop) plus (howMany - 1) THREAD-backed workers,
// creates the main coroutine and binds it to the MAIN worker, then blocks until every worker
// has reported startup via OnWorkerStartup().
// NOTE(review): workersCv_.Wait(&workersLock_) below implies the caller must already hold
// workersLock_ (Initialize() does take it before calling) -- confirm this precondition.
void StackfulCoroutineManager::CreateWorkers(size_t howMany, Runtime *runtime, PandaVM *vm)
{
    auto allocator = Runtime::GetCurrent()->GetInternalAllocator();
    // propagate the manager-level stats switch to each freshly created worker
    bool isStatsEnabled = stats_.IsEnabled();

    auto *wMain =
        allocator->New<StackfulCoroutineWorker>(runtime, vm, this, StackfulCoroutineWorker::ScheduleLoopType::FIBER,
                                                "[main] worker 0", stackful_coroutines::MAIN_WORKER_ID);
    if (isStatsEnabled) {
        wMain->GetPerfStats().Enable();
    }
    workers_.push_back(wMain);
    // worker IDs must coincide with their indices in workers_
    ASSERT(workers_[stackful_coroutines::MAIN_WORKER_ID] == wMain);
    ASSERT(wMain->GetId() == stackful_coroutines::MAIN_WORKER_ID);

    // the remaining workers run their schedule loops on separate OS threads
    for (uint32_t i = 1; i < howMany; ++i) {
        auto *w = allocator->New<StackfulCoroutineWorker>(
            runtime, vm, this, StackfulCoroutineWorker::ScheduleLoopType::THREAD, "worker " + ToPandaString(i), i);
        if (isStatsEnabled) {
            w->GetPerfStats().Enable();
        }
        workers_.push_back(w);
        ASSERT(workers_[i] == w);
        ASSERT(w->GetId() == i);
    }

    auto *mainCo = CreateMainCoroutine(runtime, vm);
    mainCo->GetContext<StackfulCoroutineContext>()->SetWorker(wMain);
    Coroutine::SetCurrent(mainCo);
    activeWorkersCount_ = 1;  // 1 is for MAIN

    LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::CreateWorkers(): waiting for workers startup";
    // each THREAD worker increments activeWorkersCount_ and signals workersCv_ on startup
    while (activeWorkersCount_ < howMany) {
        // NOTE(konstanting, #I67QXC): need timed wait?..
        workersCv_.Wait(&workersLock_);
    }
}
79 
OnWorkerShutdown()80 void StackfulCoroutineManager::OnWorkerShutdown()
81 {
82     os::memory::LockHolder lock(workersLock_);
83     --activeWorkersCount_;
84     workersCv_.Signal();
85     LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::OnWorkerShutdown(): COMPLETED, workers left = "
86                            << activeWorkersCount_;
87 }
88 
OnWorkerStartup()89 void StackfulCoroutineManager::OnWorkerStartup()
90 {
91     os::memory::LockHolder lock(workersLock_);
92     ++activeWorkersCount_;
93     workersCv_.Signal();
94     LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::OnWorkerStartup(): COMPLETED, active workers = "
95                            << activeWorkersCount_;
96 }
97 
DisableCoroutineSwitch()98 void StackfulCoroutineManager::DisableCoroutineSwitch()
99 {
100     GetCurrentWorker()->DisableCoroutineSwitch();
101 }
102 
EnableCoroutineSwitch()103 void StackfulCoroutineManager::EnableCoroutineSwitch()
104 {
105     GetCurrentWorker()->EnableCoroutineSwitch();
106 }
107 
IsCoroutineSwitchDisabled()108 bool StackfulCoroutineManager::IsCoroutineSwitchDisabled()
109 {
110     return GetCurrentWorker()->IsCoroutineSwitchDisabled();
111 }
112 
Initialize(CoroutineManagerConfig config,Runtime * runtime,PandaVM * vm)113 void StackfulCoroutineManager::Initialize(CoroutineManagerConfig config, Runtime *runtime, PandaVM *vm)
114 {
115     // enable stats collection if needed
116     if (config.enablePerfStats) {
117         stats_.Enable();
118     }
119     ScopedCoroutineStats s(&stats_, CoroutineTimeStats::INIT);
120     // set limits
121     coroStackSizeBytes_ = Runtime::GetCurrent()->GetOptions().GetCoroutineStackSizePages() * os::mem::GetPageSize();
122     if (coroStackSizeBytes_ != AlignUp(coroStackSizeBytes_, PANDA_POOL_ALIGNMENT_IN_BYTES)) {
123         size_t alignmentPages = PANDA_POOL_ALIGNMENT_IN_BYTES / os::mem::GetPageSize();
124         LOG(FATAL, COROUTINES) << "Coroutine stack size should be >= " << alignmentPages
125                                << " pages and should be aligned to " << alignmentPages << "-page boundary!";
126     }
127     size_t coroStackAreaSizeBytes = Runtime::GetCurrent()->GetOptions().GetCoroutinesStackMemLimit();
128     coroutineCountLimit_ = coroStackAreaSizeBytes / coroStackSizeBytes_;
129     jsMode_ = config.emulateJs;
130 
131     // create and activate workers
132     size_t numberOfAvailableCores = std::max(std::thread::hardware_concurrency() / 4ULL, 2ULL);
133     size_t targetNumberOfWorkers =
134         (config.workersCount == CoroutineManagerConfig::WORKERS_COUNT_AUTO)
135             ? std::min(numberOfAvailableCores, stackful_coroutines::MAX_WORKERS_COUNT)
136             : std::min(static_cast<size_t>(config.workersCount), stackful_coroutines::MAX_WORKERS_COUNT);
137     if (config.workersCount == CoroutineManagerConfig::WORKERS_COUNT_AUTO) {
138         LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager(): AUTO mode selected, will set number of coroutine "
139                                   "workers to number of CPUs / 4, but not less than 2 = "
140                                << targetNumberOfWorkers;
141     }
142     os::memory::LockHolder lock(workersLock_);
143     CreateWorkers(targetNumberOfWorkers, runtime, vm);
144     LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager(): successfully created and activated " << workers_.size()
145                            << " coroutine workers";
146     programCompletionEvent_ = Runtime::GetCurrent()->GetInternalAllocator()->New<GenericEvent>(this);
147 }
148 
// Releases manager-owned resources: the program-completion event and all pooled
// (reusable) coroutine instances.
void StackfulCoroutineManager::Finalize()
{
    os::memory::LockHolder lock(coroPoolLock_);

    auto allocator = Runtime::GetCurrent()->GetInternalAllocator();
    allocator->Delete(programCompletionEvent_);
    // pooled coroutines were only CleanUp()-ed on release, so their internal
    // resources still need to be destroyed before final deletion
    for (auto *co : coroutinePool_) {
        co->DestroyInternalResources();
        CoroutineManager::DestroyEntrypointfulCoroutine(co);
    }
    coroutinePool_.clear();
}
161 
AddToRegistry(Coroutine * co)162 void StackfulCoroutineManager::AddToRegistry(Coroutine *co)
163 {
164     os::memory::LockHolder lock(coroListLock_);
165     co->GetVM()->GetGC()->OnThreadCreate(co);
166     coroutines_.insert(co);
167     coroutineCount_++;
168 }
169 
// Removes a coroutine from the registry set and decrements the live counter.
// NOTE(review): unlike AddToRegistry(), no lock is taken here; the caller
// (TerminateCoroutine()) holds coroListLock_ -- this appears to be a precondition.
void StackfulCoroutineManager::RemoveFromRegistry(Coroutine *co)
{
    coroutines_.erase(co);
    coroutineCount_--;
}
175 
RegisterCoroutine(Coroutine * co)176 void StackfulCoroutineManager::RegisterCoroutine(Coroutine *co)
177 {
178     AddToRegistry(co);
179 }
180 
// Terminates a coroutine: unregisters it, releases/recycles its internal resources
// and requests finalization from the scheduler where applicable. Returns false,
// meaning the coroutine object itself is not destroyed here (finalization is
// deferred to the worker, or done manually for entrypointless coroutines).
bool StackfulCoroutineManager::TerminateCoroutine(Coroutine *co)
{
    if (co->HasManagedEntrypoint()) {
        // profiling: start interval here, end in ctxswitch after finalization request is done
        GetCurrentWorker()->GetPerfStats().StartInterval(CoroutineTimeStats::SCH_ALL);
    } else {
        // profiling: no need. MAIN and NATIVE EP coros are deleted from the SCHEDULER itself
        LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::TerminateCoroutine(): terminating "
                               << ((GetExistingWorkersCount() == 0) ? "MAIN..." : "NATIVE EP coro...");
    }

    LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::TerminateCoroutine() started";
    co->NativeCodeEnd();
    co->UpdateStatus(ThreadStatus::TERMINATING);

    {
        os::memory::LockHolder lList(coroListLock_);
        RemoveFromRegistry(co);
        // We need collect TLAB metrics and clear TLAB before calling the manage thread destructor
        // because of the possibility heap use after free. This happening when GC starts execute ResetYoungAllocator
        // method which start iterate set of threads, collect TLAB metrics and clear TLAB. If thread was deleted from
        // set but we haven't destroyed the thread yet, GC won't collect metrics and can complete TLAB
        // deletion faster. And when we try to get the TLAB metrics in the destructor of managed thread, we will get
        // heap use after free
        co->CollectTLABMetrics();
        co->ClearTLAB();
        // DestroyInternalResources()/CleanupInternalResources() must be called in one critical section with
        // RemoveFromRegistry (under core_list_lock_). This functions transfer cards from coro's post_barrier buffer to
        // UpdateRemsetThread internally. Situation when cards still remain and UpdateRemsetThread cannot visit the
        // coro (because it is already removed) must be impossible.
        if (Runtime::GetOptions().IsUseCoroutinePool() && co->HasManagedEntrypoint()) {
            // pooled coros are recycled later, so only a light cleanup is needed here
            co->CleanupInternalResources();
        } else {
            co->DestroyInternalResources();
        }
        co->UpdateStatus(ThreadStatus::FINISHED);
    }
    Runtime::GetCurrent()->GetNotificationManager()->ThreadEndEvent(co);

    if (co->HasManagedEntrypoint()) {
        // a managed coro's exit may complete the program: check before finalizing
        CheckProgramCompletion();
        GetCurrentWorker()->RequestFinalization(co);
    } else if (co->HasNativeEntrypoint()) {
        GetCurrentWorker()->RequestFinalization(co);
    } else {
        // entrypointless and NOT native: e.g. MAIN
        // (do nothing, as entrypointless coroutines should be destroyed manually)
    }

    return false;
}
232 
GetActiveWorkersCount() const233 size_t StackfulCoroutineManager::GetActiveWorkersCount() const
234 {
235     os::memory::LockHolder lkWorkers(workersLock_);
236     return activeWorkersCount_;
237 }
238 
GetExistingWorkersCount() const239 size_t StackfulCoroutineManager::GetExistingWorkersCount() const
240 {
241     os::memory::LockHolder lkWorkers(workersLock_);
242     return workers_.size();
243 }
244 
// Fires programCompletionEvent_ once only the "system" coroutines remain:
// MAIN plus one schedule-loop coroutine per active worker.
void StackfulCoroutineManager::CheckProgramCompletion()
{
    os::memory::LockHolder lkCompletion(programCompletionLock_);

    size_t activeWorkerCoros = GetActiveWorkersCount();
    if (coroutineCount_ == 1 + activeWorkerCoros) {  // 1 here is for MAIN
        LOG(DEBUG, COROUTINES)
            << "StackfulCoroutineManager::CheckProgramCompletion(): all coroutines finished execution!";
        // programCompletionEvent_ acts as a stackful-friendly cond var
        programCompletionEvent_->Happen();
    } else {
        LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::CheckProgramCompletion(): still "
                               << coroutineCount_ - 1 - activeWorkerCoros << " coroutines exist...";
    }
}
260 
CreateCoroutineContext(bool coroHasEntrypoint)261 CoroutineContext *StackfulCoroutineManager::CreateCoroutineContext(bool coroHasEntrypoint)
262 {
263     return CreateCoroutineContextImpl(coroHasEntrypoint);
264 }
265 
DeleteCoroutineContext(CoroutineContext * ctx)266 void StackfulCoroutineManager::DeleteCoroutineContext(CoroutineContext *ctx)
267 {
268     FreeCoroutineStack(static_cast<StackfulCoroutineContext *>(ctx)->GetStackLoAddrPtr());
269     Runtime::GetCurrent()->GetInternalAllocator()->Delete(ctx);
270 }
271 
GetCoroutineCount()272 size_t StackfulCoroutineManager::GetCoroutineCount()
273 {
274     return coroutineCount_;
275 }
276 
GetCoroutineCountLimit()277 size_t StackfulCoroutineManager::GetCoroutineCountLimit()
278 {
279     return coroutineCountLimit_;
280 }
281 
// Creates (or reuses) a coroutine for the given managed entrypoint and schedules it
// on a worker chosen per the launch mode. Throws OOM and returns nullptr on failure.
Coroutine *StackfulCoroutineManager::Launch(CompletionEvent *completionEvent, Method *entrypoint,
                                            PandaVector<Value> &&arguments, CoroutineLaunchMode mode)
{
    // profiling: scheduler and launch time (scoped; intervals end when these destruct)
    ScopedCoroutineStats sSch(&GetCurrentWorker()->GetPerfStats(), CoroutineTimeStats::SCH_ALL);
    ScopedCoroutineStats sLaunch(&GetCurrentWorker()->GetPerfStats(), CoroutineTimeStats::LAUNCH);

    LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::Launch started";

    auto *result = LaunchImpl(completionEvent, entrypoint, std::move(arguments), mode);
    if (result == nullptr) {
        // coroutine limit reached or stack/context allocation failed
        ThrowOutOfMemoryError("Launch failed");
    }

    LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::Launch finished";
    return result;
}
299 
// Blocks the current coroutine until the awaitee event happens. If the event has
// already happened, returns immediately without a context switch.
void StackfulCoroutineManager::Await(CoroutineEvent *awaitee)
{
    // profiling
    ScopedCoroutineStats s(&GetCurrentWorker()->GetPerfStats(), CoroutineTimeStats::SCH_ALL);

    ASSERT(awaitee != nullptr);
    // [[maybe_unused]]: referenced only from LOG(DEBUG) statements
    [[maybe_unused]] auto *waiter = Coroutine::GetCurrent();
    LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::Await started by " + waiter->GetName();
    if (!GetCurrentWorker()->WaitForEvent(awaitee)) {
        LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::Await finished (no await happened)";
        return;
    }
    // NB: at this point the awaitee is likely already deleted
    LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::Await finished by " + waiter->GetName();
}
315 
// Unblocks, on every worker, the coroutines that are waiting on the given event.
// Precondition: the event has already happened (checked in debug builds).
void StackfulCoroutineManager::UnblockWaiters(CoroutineEvent *blocker)
{
    // profiling: this function can be called either independently or as a path of some other SCH sequence,
    // hence using the "weak" stats collector
    ScopedCoroutineStats s(&GetCurrentWorker()->GetPerfStats(), CoroutineTimeStats::SCH_ALL, true);

    os::memory::LockHolder lkWorkers(workersLock_);
    ASSERT(blocker != nullptr);
#ifndef NDEBUG
    {
        // the event's own lock is needed to read its state safely
        os::memory::LockHolder lkBlocker(*blocker);
        ASSERT(blocker->Happened());
    }
#endif

    for (auto *w : workers_) {
        w->UnblockWaiters(blocker);
    }
}
335 
Schedule()336 void StackfulCoroutineManager::Schedule()
337 {
338     // profiling
339     ScopedCoroutineStats s(&GetCurrentWorker()->GetPerfStats(), CoroutineTimeStats::SCH_ALL);
340 
341     LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::Schedule() request from "
342                            << Coroutine::GetCurrent()->GetName();
343     GetCurrentWorker()->RequestSchedule();
344 }
345 
EnumerateThreadsImpl(const ThreadManager::Callback & cb,unsigned int incMask,unsigned int xorMask) const346 bool StackfulCoroutineManager::EnumerateThreadsImpl(const ThreadManager::Callback &cb, unsigned int incMask,
347                                                     unsigned int xorMask) const
348 {
349     os::memory::LockHolder lock(coroListLock_);
350     for (auto *t : coroutines_) {
351         if (!ApplyCallbackToThread(cb, t, incMask, xorMask)) {
352             return false;
353         }
354     }
355     return true;
356 }
357 
SuspendAllThreads()358 void StackfulCoroutineManager::SuspendAllThreads()
359 {
360     os::memory::LockHolder lock(coroListLock_);
361     LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::SuspendAllThreads started";
362     for (auto *t : coroutines_) {
363         t->SuspendImpl(true);
364     }
365     LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::SuspendAllThreads finished";
366 }
367 
ResumeAllThreads()368 void StackfulCoroutineManager::ResumeAllThreads()
369 {
370     os::memory::LockHolder lock(coroListLock_);
371     for (auto *t : coroutines_) {
372         t->ResumeImpl(true);
373     }
374 }
375 
// Not implemented for stackful coroutines: aborts if ever called.
bool StackfulCoroutineManager::IsRunningThreadExist()
{
    UNREACHABLE();
    // NOTE(konstanting): correct implementation. Which coroutine do we consider running?
    return false;
}
382 
// Called on MAIN at program end: waits for all other coroutines and shuts down
// the workers (see MainCoroutineCompleted()).
void StackfulCoroutineManager::WaitForDeregistration()
{
    // profiling: start interval here, end in ctxswitch (if needed);
    // intentionally not a scoped object because the interval may be closed elsewhere
    GetCurrentWorker()->GetPerfStats().StartInterval(CoroutineTimeStats::SCH_ALL);
    //
    MainCoroutineCompleted();
}
390 
ReuseCoroutineInstance(Coroutine * co,CompletionEvent * completionEvent,Method * entrypoint,PandaVector<Value> && arguments,PandaString name)391 void StackfulCoroutineManager::ReuseCoroutineInstance(Coroutine *co, CompletionEvent *completionEvent,
392                                                       Method *entrypoint, PandaVector<Value> &&arguments,
393                                                       PandaString name)
394 {
395     auto *ctx = co->GetContext<CoroutineContext>();
396     co->ReInitialize(std::move(name), ctx,
397                      Coroutine::ManagedEntrypointInfo {completionEvent, entrypoint, std::move(arguments)});
398 }
399 
TryGetCoroutineFromPool()400 Coroutine *StackfulCoroutineManager::TryGetCoroutineFromPool()
401 {
402     os::memory::LockHolder lkPool(coroPoolLock_);
403     if (coroutinePool_.empty()) {
404         return nullptr;
405     }
406     Coroutine *co = coroutinePool_.back();
407     coroutinePool_.pop_back();
408     return co;
409 }
410 
// Picks the least loaded worker allowed by the coroutine's affinity mask.
StackfulCoroutineWorker *StackfulCoroutineManager::ChooseWorkerForCoroutine(Coroutine *co)
{
    ASSERT(co != nullptr);
    // currently this function does only the initial worker appointment
    // but eventually it will support coroutine migration too

    auto maskValue = co->GetContext<StackfulCoroutineContext>()->GetAffinityMask();
    std::bitset<stackful_coroutines::MAX_WORKERS_COUNT> affinityBits(maskValue);
    LOG(DEBUG, COROUTINES) << "Choosing worker for coro " << co->GetName() << " with affinity mask = " << affinityBits;

    // choosing the least loaded worker from the allowed worker set:
    // disallowed workers are ordered after allowed ones, so min_element never
    // picks a disallowed worker while an allowed one exists
    auto preferFirstOverSecond = [&affinityBits](const StackfulCoroutineWorker *first,
                                                 const StackfulCoroutineWorker *second) {
        if (!affinityBits.test(first->GetId())) {
            return false;
        }
        if (!affinityBits.test(second->GetId())) {
            return true;
        }
        return first->GetLoadFactor() < second->GetLoadFactor();
    };

    os::memory::LockHolder lkWorkers(workersLock_);
#ifndef NDEBUG
    LOG(DEBUG, COROUTINES) << "Evaluating load factors:";
    for (auto w : workers_) {
        LOG(DEBUG, COROUTINES) << w->GetName() << ": LF = " << w->GetLoadFactor();
    }
#endif
    // NB: workers_ is never empty here (MAIN always exists), so *wIt is safe
    auto wIt = std::min_element(workers_.begin(), workers_.end(), preferFirstOverSecond);
    LOG(DEBUG, COROUTINES) << "Chose worker: " << (*wIt)->GetName();
    return *wIt;
}
444 
// Translates a launch mode (plus the current scheduling policy) into a worker
// affinity bitmask for the new coroutine.
stackful_coroutines::AffinityMask StackfulCoroutineManager::CalcAffinityMaskFromLaunchMode(CoroutineLaunchMode mode)
{
    /**
     * launch mode \ policy      DEFAULT                         NON_MAIN
     *   DEFAULT                ->least busy, allow migration   ->least busy, allow migration, disallow <main>
     *   SAME_WORKER            ->same, forbid migration        ->same, forbid migration
     *   MAIN_WORKER            ->main, forbid migration        ->main, forbid migration
     *   EXCLUSIVE              ->least busy, forbid migration  ->least busy, forbid migration, disallow <main>
     */

    if (mode == CoroutineLaunchMode::SAME_WORKER) {
        // pin to the worker of the launching coroutine only
        std::bitset<stackful_coroutines::MAX_WORKERS_COUNT> mask(stackful_coroutines::AFFINITY_MASK_NONE);
        mask.set(GetCurrentWorker()->GetId());
        return mask.to_ullong();
    }

    if (mode == CoroutineLaunchMode::MAIN_WORKER) {
        // pin to the MAIN worker only
        std::bitset<stackful_coroutines::MAX_WORKERS_COUNT> mask(stackful_coroutines::AFFINITY_MASK_NONE);
        mask.set(stackful_coroutines::MAIN_WORKER_ID);
        return mask.to_ullong();
    }

    // CoroutineLaunchMode::EXCLUSIVE is not supported yet (but will be)
    ASSERT(mode == CoroutineLaunchMode::DEFAULT);

    // DEFAULT: allow every worker, minus MAIN when the policy forbids it
    std::bitset<stackful_coroutines::MAX_WORKERS_COUNT> mask(stackful_coroutines::AFFINITY_MASK_FULL);
    switch (GetSchedulingPolicy()) {
        case CoroutineSchedulingPolicy::NON_MAIN_WORKER: {
            mask.reset(stackful_coroutines::MAIN_WORKER_ID);
            break;
        }
        default:
        case CoroutineSchedulingPolicy::DEFAULT:
            break;
    }
    return mask.to_ullong();
}
482 
// Launch() backend: obtains a coroutine instance (from the pool when enabled,
// otherwise freshly created), assigns its affinity and hands it to the chosen
// worker's runnable queue. Returns nullptr if instance creation failed.
Coroutine *StackfulCoroutineManager::LaunchImpl(CompletionEvent *completionEvent, Method *entrypoint,
                                                PandaVector<Value> &&arguments, CoroutineLaunchMode mode)
{
#ifndef NDEBUG
    GetCurrentWorker()->PrintRunnables("LaunchImpl begin");
#endif
    auto coroName = entrypoint->GetFullName();

    Coroutine *co = nullptr;
    if (Runtime::GetOptions().IsUseCoroutinePool()) {
        co = TryGetCoroutineFromPool();
    }
    if (co != nullptr) {
        // recycle a pooled instance: keeps its existing context/stack
        ReuseCoroutineInstance(co, completionEvent, entrypoint, std::move(arguments), std::move(coroName));
    } else {
        co = CreateCoroutineInstance(completionEvent, entrypoint, std::move(arguments), std::move(coroName));
    }
    if (co == nullptr) {
        LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::LaunchImpl: failed to create a coroutine!";
        return co;
    }
    Runtime::GetCurrent()->GetNotificationManager()->ThreadStartEvent(co);

    // affinity must be set before choosing a worker, since the choice honors the mask
    auto affinityMask = CalcAffinityMaskFromLaunchMode(mode);
    co->GetContext<StackfulCoroutineContext>()->SetAffinityMask(affinityMask);
    auto *w = ChooseWorkerForCoroutine(co);
    w->AddRunnableCoroutine(co, IsJsMode());

#ifndef NDEBUG
    GetCurrentWorker()->PrintRunnables("LaunchImpl end");
#endif
    return co;
}
516 
DumpCoroutineStats() const517 void StackfulCoroutineManager::DumpCoroutineStats() const
518 {
519     LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager: dumping performance statistics...";
520     os::memory::LockHolder lock(workersLock_);
521     PandaVector<CoroutineWorkerStats *> wstats;
522     for (auto *worker : workers_) {
523         worker->GetPerfStats().Disable();
524         wstats.push_back(&worker->GetPerfStats());
525     }
526     std::cout << "=== Coroutine statistics begin ===" << std::endl;
527     std::cout << stats_.GetFullStatistics(std::move(wstats));
528     std::cout << "=== Coroutine statistics end ===" << std::endl;
529     LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager: performance statistics dumped successfully.";
530 }
531 
// Blocks MAIN until only the "system" coroutines remain (MAIN + one schedule-loop
// coroutine per active worker), using programCompletionEvent_ as a condvar-like
// primitive; re-checks the count on each (possibly spurious) wakeup.
void StackfulCoroutineManager::WaitForNonMainCoroutinesCompletion()
{
    os::memory::LockHolder lkCompletion(programCompletionLock_);
    auto *main = Coroutine::GetCurrent();
    while (coroutineCount_ > 1 + GetActiveWorkersCount()) {  // 1 is for MAIN
        programCompletionEvent_->SetNotHappened();
        // lock the event BEFORE dropping programCompletionLock_ so that a concurrent
        // Happen() cannot be missed between the check above and the wait below
        programCompletionEvent_->Lock();
        programCompletionLock_.Unlock();
        ScopedManagedCodeThread s(main);  // perf?
        GetCurrentWorker()->WaitForEvent(programCompletionEvent_);
        LOG(DEBUG, COROUTINES)
            << "StackfulCoroutineManager::WaitForNonMainCoroutinesCompletion(): possibly spurious wakeup from wait...";
        // NOTE(konstanting, #I67QXC): test for the spurious wakeup
        programCompletionLock_.Lock();
    }
    ASSERT(coroutineCount_ == (1 + GetActiveWorkersCount()));
}
549 
// Runtime shutdown sequence executed on MAIN: wait for user coroutines, deactivate
// the workers, drain the remaining schedule-loop coroutines, dump stats and
// finally delete the worker objects.
void StackfulCoroutineManager::MainCoroutineCompleted()
{
    // precondition: MAIN is already in the native mode
    LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::MainCoroutineCompleted(): STARTED";
    // block till only schedule loop coroutines are present
    LOG(DEBUG, COROUTINES)
        << "StackfulCoroutineManager::MainCoroutineCompleted(): waiting for other coroutines to complete";
    WaitForNonMainCoroutinesCompletion();
    // NOTE(konstanting, #I67QXC): correct state transitions for MAIN
    GetCurrentContext()->MainThreadFinished();
    GetCurrentContext()->EnterAwaitLoop();

    LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::MainCoroutineCompleted(): stopping workers";
    {
        os::memory::LockHolder lock(workersLock_);
        for (auto *worker : workers_) {
            worker->SetActive(false);
        }
        // workers signal workersCv_ from OnWorkerShutdown() as they stop
        while (activeWorkersCount_ > 1) {  // 1 is for MAIN
            // profiling: the SCH interval is expected to be started after the ctxswitch
            GetCurrentWorker()->GetPerfStats().FinishInterval(CoroutineTimeStats::SCH_ALL);
            // NOTE(konstanting, #I67QXC): need timed wait?..
            workersCv_.Wait(&workersLock_);
            // profiling: we don't want to profile the sleeping state
            GetCurrentWorker()->GetPerfStats().StartInterval(CoroutineTimeStats::SCH_ALL);
        }
    }

    LOG(DEBUG, COROUTINES)
        << "StackfulCoroutineManager::MainCoroutineCompleted(): stopping await loop on the main worker";
    // drain the remaining (schedule-loop) coroutines on the main worker
    while (coroutineCount_ > 1) {
        GetCurrentWorker()->FinalizeFiberScheduleLoop();
    }
    // profiling: the SCH interval is expected to be started after the ctxswitch
    GetCurrentWorker()->GetPerfStats().FinishInterval(CoroutineTimeStats::SCH_ALL);

    if (stats_.IsEnabled()) {
        DumpCoroutineStats();
    }
    stats_.Disable();

    LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::MainCoroutineCompleted(): deleting workers";
    {
        os::memory::LockHolder lkWorkers(workersLock_);
        for (auto *worker : workers_) {
            Runtime::GetCurrent()->GetInternalAllocator()->Delete(worker);
        }
        workers_.clear();
    }

    LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::MainCoroutineCompleted(): DONE";
}
602 
GetCurrentContext()603 StackfulCoroutineContext *StackfulCoroutineManager::GetCurrentContext()
604 {
605     auto *co = Coroutine::GetCurrent();
606     return co->GetContext<StackfulCoroutineContext>();
607 }
608 
GetCurrentWorker()609 StackfulCoroutineWorker *StackfulCoroutineManager::GetCurrentWorker()
610 {
611     return GetCurrentContext()->GetWorker();
612 }
613 
IsMainWorker(Coroutine * co) const614 bool StackfulCoroutineManager::IsMainWorker(Coroutine *co) const
615 {
616     auto *worker = co->GetContext<StackfulCoroutineContext>()->GetWorker();
617     return worker->GetId() == stackful_coroutines::MAIN_WORKER_ID;
618 }
619 
IsJsMode()620 bool StackfulCoroutineManager::IsJsMode()
621 {
622     return jsMode_;
623 }
624 
// Either recycles a finished coroutine into the reuse pool (when pooling is enabled
// and the coro has a managed entrypoint) or destroys it for real.
void StackfulCoroutineManager::DestroyEntrypointfulCoroutine(Coroutine *co)
{
    if (Runtime::GetOptions().IsUseCoroutinePool() && co->HasManagedEntrypoint()) {
        // light cleanup only; full destruction is deferred to Finalize()
        co->CleanUp();
        os::memory::LockHolder lock(coroPoolLock_);
        coroutinePool_.push_back(co);
    } else {
        CoroutineManager::DestroyEntrypointfulCoroutine(co);
    }
}
635 
CreateCoroutineContextImpl(bool needStack)636 StackfulCoroutineContext *StackfulCoroutineManager::CreateCoroutineContextImpl(bool needStack)
637 {
638     uint8_t *stack = nullptr;
639     size_t stackSizeBytes = 0;
640     if (needStack) {
641         stack = AllocCoroutineStack();
642         if (stack == nullptr) {
643             return nullptr;
644         }
645         stackSizeBytes = coroStackSizeBytes_;
646     }
647     return Runtime::GetCurrent()->GetInternalAllocator()->New<StackfulCoroutineContext>(stack, stackSizeBytes);
648 }
649 
// Creates a coroutine with a native (C++) entrypoint. Returns nullptr when the
// coroutine limit is reached or the context/stack allocation fails.
Coroutine *StackfulCoroutineManager::CreateNativeCoroutine(Runtime *runtime, PandaVM *vm,
                                                           Coroutine::NativeEntrypointInfo::NativeEntrypointFunc entry,
                                                           void *param, PandaString name)
{
    if (GetCoroutineCount() >= GetCoroutineCountLimit()) {
        // resource limit reached
        return nullptr;
    }
    StackfulCoroutineContext *ctx = CreateCoroutineContextImpl(true);
    if (ctx == nullptr) {
        // do not proceed if we cannot create a context for the new coroutine
        return nullptr;
    }
    auto *co = GetCoroutineFactory()(runtime, vm, std::move(name), ctx, Coroutine::NativeEntrypointInfo(entry, param));
    ASSERT(co != nullptr);

    // Let's assume that even the "native" coroutine can eventually try to execute some managed code.
    // In that case pre/post barrier buffers are necessary.
    co->InitBuffers();
    return co;
}
671 
DestroyNativeCoroutine(Coroutine * co)672 void StackfulCoroutineManager::DestroyNativeCoroutine(Coroutine *co)
673 {
674     DestroyEntrypointlessCoroutine(co);
675 }
676 
677 }  // namespace ark
678