/**
 * Copyright (c) 2022-2024 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <algorithm>
#include <bitset>
#include <cstdint>
#include <iostream>
#include <thread>
#include "runtime/coroutines/coroutine.h"
#include "runtime/coroutines/stackful_coroutine.h"
#include "runtime/include/thread_scopes.h"
#include "libpandabase/macros.h"
#include "runtime/include/runtime.h"
#include "runtime/include/runtime_notification.h"
#include "runtime/include/panda_vm.h"
#include "runtime/coroutines/stackful_coroutine_manager.h"

namespace ark {

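// Coroutine stacks are carved out of the mmap memory pool (SPACE_TYPE_NATIVE_STACKS). A failed
// allocation yields a nullptr stack pointer, which callers such as CreateCoroutineContextImpl()
// check for explicitly.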
uint8_t *StackfulCoroutineManager::AllocCoroutineStack()
{
    Pool stackPool = PoolManager::GetMmapMemPool()->AllocPool<OSPagesAllocPolicy::NO_POLICY>(
        coroStackSizeBytes_, SpaceType::SPACE_TYPE_NATIVE_STACKS, AllocatorType::NATIVE_STACKS_ALLOCATOR);
    return static_cast<uint8_t *>(stackPool.GetMem());
}

void StackfulCoroutineManager::FreeCoroutineStack(uint8_t *stack)
{
    if (stack != nullptr) {
        PoolManager::GetMmapMemPool()->FreePool(stack, coroStackSizeBytes_);
    }
}

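// Creates the worker pool: worker 0 runs a FIBER schedule loop on the current thread and hosts the
// MAIN coroutine, while every other worker gets a dedicated schedule-loop THREAD. The caller must
// hold workersLock_; the call blocks until each worker has reported startup via OnWorkerStartup().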
void StackfulCoroutineManager::CreateWorkers(size_t howMany, Runtime *runtime, PandaVM *vm)
{
    auto allocator = Runtime::GetCurrent()->GetInternalAllocator();
    bool isStatsEnabled = stats_.IsEnabled();

    auto *wMain =
        allocator->New<StackfulCoroutineWorker>(runtime, vm, this, StackfulCoroutineWorker::ScheduleLoopType::FIBER,
                                                "[main] worker 0", stackful_coroutines::MAIN_WORKER_ID);
    if (isStatsEnabled) {
        wMain->GetPerfStats().Enable();
    }
    workers_.push_back(wMain);
    ASSERT(workers_[stackful_coroutines::MAIN_WORKER_ID] == wMain);
    ASSERT(wMain->GetId() == stackful_coroutines::MAIN_WORKER_ID);

    for (uint32_t i = 1; i < howMany; ++i) {
        auto *w = allocator->New<StackfulCoroutineWorker>(
            runtime, vm, this, StackfulCoroutineWorker::ScheduleLoopType::THREAD, "worker " + ToPandaString(i), i);
        if (isStatsEnabled) {
            w->GetPerfStats().Enable();
        }
        workers_.push_back(w);
        ASSERT(workers_[i] == w);
        ASSERT(w->GetId() == i);
    }

    auto *mainCo = CreateMainCoroutine(runtime, vm);
    mainCo->GetContext<StackfulCoroutineContext>()->SetWorker(wMain);
    Coroutine::SetCurrent(mainCo);
    activeWorkersCount_ = 1;  // 1 is for MAIN

    LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::CreateWorkers(): waiting for workers startup";
    while (activeWorkersCount_ < howMany) {
        // NOTE(konstanting, #I67QXC): need timed wait?..
        workersCv_.Wait(&workersLock_);
    }
}

void StackfulCoroutineManager::OnWorkerShutdown()
{
    os::memory::LockHolder lock(workersLock_);
    --activeWorkersCount_;
    workersCv_.Signal();
    LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::OnWorkerShutdown(): COMPLETED, workers left = "
                           << activeWorkersCount_;
}

void StackfulCoroutineManager::OnWorkerStartup()
{
    os::memory::LockHolder lock(workersLock_);
    ++activeWorkersCount_;
    workersCv_.Signal();
    LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::OnWorkerStartup(): COMPLETED, active workers = "
                           << activeWorkersCount_;
}

void StackfulCoroutineManager::DisableCoroutineSwitch()
{
    GetCurrentWorker()->DisableCoroutineSwitch();
}

void StackfulCoroutineManager::EnableCoroutineSwitch()
{
    GetCurrentWorker()->EnableCoroutineSwitch();
}

bool StackfulCoroutineManager::IsCoroutineSwitchDisabled()
{
    return GetCurrentWorker()->IsCoroutineSwitchDisabled();
}

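// Initialize() derives the coroutine stack size and the coroutine count limit from the runtime
// options, creates and activates the workers under workersLock_, and then allocates
// programCompletionEvent_, which is used later for program-completion tracking.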
void StackfulCoroutineManager::Initialize(CoroutineManagerConfig config, Runtime *runtime, PandaVM *vm)
{
    // enable stats collection if needed
    if (config.enablePerfStats) {
        stats_.Enable();
    }
    ScopedCoroutineStats s(&stats_, CoroutineTimeStats::INIT);
    // set limits
    coroStackSizeBytes_ = Runtime::GetCurrent()->GetOptions().GetCoroutineStackSizePages() * os::mem::GetPageSize();
    if (coroStackSizeBytes_ != AlignUp(coroStackSizeBytes_, PANDA_POOL_ALIGNMENT_IN_BYTES)) {
        size_t alignmentPages = PANDA_POOL_ALIGNMENT_IN_BYTES / os::mem::GetPageSize();
        LOG(FATAL, COROUTINES) << "Coroutine stack size should be >= " << alignmentPages
                               << " pages and should be aligned to " << alignmentPages << "-page boundary!";
    }
    size_t coroStackAreaSizeBytes = Runtime::GetCurrent()->GetOptions().GetCoroutinesStackMemLimit();
    coroutineCountLimit_ = coroStackAreaSizeBytes / coroStackSizeBytes_;
    jsMode_ = config.emulateJs;

    // create and activate workers
    size_t numberOfAvailableCores = std::max(std::thread::hardware_concurrency() / 4ULL, 2ULL);
    size_t targetNumberOfWorkers =
        (config.workersCount == CoroutineManagerConfig::WORKERS_COUNT_AUTO)
            ? std::min(numberOfAvailableCores, stackful_coroutines::MAX_WORKERS_COUNT)
            : std::min(static_cast<size_t>(config.workersCount), stackful_coroutines::MAX_WORKERS_COUNT);
    if (config.workersCount == CoroutineManagerConfig::WORKERS_COUNT_AUTO) {
        LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager(): AUTO mode selected, will set number of coroutine "
                                  "workers to number of CPUs / 4, but not less than 2 = "
                               << targetNumberOfWorkers;
    }
    os::memory::LockHolder lock(workersLock_);
    CreateWorkers(targetNumberOfWorkers, runtime, vm);
    LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager(): successfully created and activated " << workers_.size()
                           << " coroutine workers";
    programCompletionEvent_ = Runtime::GetCurrent()->GetInternalAllocator()->New<GenericEvent>(this);
}

void StackfulCoroutineManager::Finalize()
{
    os::memory::LockHolder lock(coroPoolLock_);

    auto allocator = Runtime::GetCurrent()->GetInternalAllocator();
    allocator->Delete(programCompletionEvent_);
    for (auto *co : coroutinePool_) {
        co->DestroyInternalResources();
        CoroutineManager::DestroyEntrypointfulCoroutine(co);
    }
    coroutinePool_.clear();
}

void StackfulCoroutineManager::AddToRegistry(Coroutine *co)
{
    os::memory::LockHolder lock(coroListLock_);
    co->GetVM()->GetGC()->OnThreadCreate(co);
    coroutines_.insert(co);
    coroutineCount_++;
}

void StackfulCoroutineManager::RemoveFromRegistry(Coroutine *co)
{
    coroutines_.erase(co);
    coroutineCount_--;
}

void StackfulCoroutineManager::RegisterCoroutine(Coroutine *co)
{
    AddToRegistry(co);
}

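// Unregisters a finished coroutine and releases or recycles its internal resources. Coroutines
// with a managed or native entrypoint are then handed to the current worker for finalization; for
// managed-EP coroutines the program-completion condition is checked first. Entrypointless
// coroutines (e.g. MAIN) are destroyed manually elsewhere (see the in-function comments).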
bool StackfulCoroutineManager::TerminateCoroutine(Coroutine *co)
{
    if (co->HasManagedEntrypoint()) {
        // profiling: start interval here, end in ctxswitch after finalization request is done
        GetCurrentWorker()->GetPerfStats().StartInterval(CoroutineTimeStats::SCH_ALL);
    } else {
        // profiling: no need. MAIN and NATIVE EP coros are deleted from the SCHEDULER itself
        LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::TerminateCoroutine(): terminating "
                               << ((GetExistingWorkersCount() == 0) ? "MAIN..." : "NATIVE EP coro...");
    }

    LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::TerminateCoroutine() started";
    co->NativeCodeEnd();
    co->UpdateStatus(ThreadStatus::TERMINATING);

    {
        os::memory::LockHolder lList(coroListLock_);
        RemoveFromRegistry(co);
        // DestroyInternalResources()/CleanupInternalResources() must be called in the same critical section as
        // RemoveFromRegistry() (under coroListLock_). These functions internally transfer cards from the coro's
        // post_barrier buffer to the UpdateRemsetThread. A situation where cards still remain but the
        // UpdateRemsetThread cannot visit the coro (because it has already been removed) must be impossible.
        if (Runtime::GetOptions().IsUseCoroutinePool() && co->HasManagedEntrypoint()) {
            co->CleanupInternalResources();
        } else {
            co->DestroyInternalResources();
        }
        co->UpdateStatus(ThreadStatus::FINISHED);
    }
    Runtime::GetCurrent()->GetNotificationManager()->ThreadEndEvent(co);

    if (co->HasManagedEntrypoint()) {
        CheckProgramCompletion();
        GetCurrentWorker()->RequestFinalization(co);
    } else if (co->HasNativeEntrypoint()) {
        GetCurrentWorker()->RequestFinalization(co);
    } else {
        // entrypointless and NOT native: e.g. MAIN
        // (do nothing, as entrypointless coroutines should be destroyed manually)
    }

    return false;
}

size_t StackfulCoroutineManager::GetActiveWorkersCount() const
{
    os::memory::LockHolder lkWorkers(workersLock_);
    return activeWorkersCount_;
}

size_t StackfulCoroutineManager::GetExistingWorkersCount() const
{
    os::memory::LockHolder lkWorkers(workersLock_);
    return workers_.size();
}

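// The completion condition assumes that each active worker accounts for exactly one registered
// coroutine (its schedule-loop coroutine), so the program is complete when coroutineCount_ equals
// 1 (MAIN) + GetActiveWorkersCount(). programCompletionEvent_ then wakes up
// WaitForNonMainCoroutinesCompletion().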
void StackfulCoroutineManager::CheckProgramCompletion()
{
    os::memory::LockHolder lkCompletion(programCompletionLock_);

    size_t activeWorkerCoros = GetActiveWorkersCount();
    if (coroutineCount_ == 1 + activeWorkerCoros) {  // 1 here is for MAIN
        LOG(DEBUG, COROUTINES)
            << "StackfulCoroutineManager::CheckProgramCompletion(): all coroutines finished execution!";
        // programCompletionEvent_ acts as a stackful-friendly cond var
        programCompletionEvent_->Happen();
    } else {
        LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::CheckProgramCompletion(): still "
                               << coroutineCount_ - 1 - activeWorkerCoros << " coroutines exist...";
    }
}

CoroutineContext *StackfulCoroutineManager::CreateCoroutineContext(bool coroHasEntrypoint)
{
    return CreateCoroutineContextImpl(coroHasEntrypoint);
}

void StackfulCoroutineManager::DeleteCoroutineContext(CoroutineContext *ctx)
{
    FreeCoroutineStack(static_cast<StackfulCoroutineContext *>(ctx)->GetStackLoAddrPtr());
    Runtime::GetCurrent()->GetInternalAllocator()->Delete(ctx);
}

size_t StackfulCoroutineManager::GetCoroutineCount()
{
    return coroutineCount_;
}

size_t StackfulCoroutineManager::GetCoroutineCountLimit()
{
    return coroutineCountLimit_;
}

Coroutine *StackfulCoroutineManager::Launch(CompletionEvent *completionEvent, Method *entrypoint,
                                            PandaVector<Value> &&arguments, CoroutineLaunchMode mode)
{
    // profiling: scheduler and launch time
    ScopedCoroutineStats sSch(&GetCurrentWorker()->GetPerfStats(), CoroutineTimeStats::SCH_ALL);
    ScopedCoroutineStats sLaunch(&GetCurrentWorker()->GetPerfStats(), CoroutineTimeStats::LAUNCH);

    LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::Launch started";

    auto *result = LaunchImpl(completionEvent, entrypoint, std::move(arguments), mode);
    if (result == nullptr) {
        ThrowOutOfMemoryError("Launch failed");
    }

    LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::Launch finished";
    return result;
}

void StackfulCoroutineManager::Await(CoroutineEvent *awaitee)
{
    // profiling
    ScopedCoroutineStats s(&GetCurrentWorker()->GetPerfStats(), CoroutineTimeStats::SCH_ALL);

    ASSERT(awaitee != nullptr);
    [[maybe_unused]] auto *waiter = Coroutine::GetCurrent();
    LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::Await started by " + waiter->GetName();
    if (!GetCurrentWorker()->WaitForEvent(awaitee)) {
        LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::Await finished (no await happened)";
        return;
    }
    // NB: at this point the awaitee is likely already deleted
    LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::Await finished by " + waiter->GetName();
}

void StackfulCoroutineManager::UnblockWaiters(CoroutineEvent *blocker)
{
    // profiling: this function can be called either independently or as part of some other SCH sequence,
    // hence the "weak" stats collector is used
    ScopedCoroutineStats s(&GetCurrentWorker()->GetPerfStats(), CoroutineTimeStats::SCH_ALL, true);

    os::memory::LockHolder lkWorkers(workersLock_);
    ASSERT(blocker != nullptr);
#ifndef NDEBUG
    {
        os::memory::LockHolder lkBlocker(*blocker);
        ASSERT(blocker->Happened());
    }
#endif

    for (auto *w : workers_) {
        w->UnblockWaiters(blocker);
    }
}

void StackfulCoroutineManager::Schedule()
{
    // profiling
    ScopedCoroutineStats s(&GetCurrentWorker()->GetPerfStats(), CoroutineTimeStats::SCH_ALL);

    LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::Schedule() request from "
                           << Coroutine::GetCurrent()->GetName();
    GetCurrentWorker()->RequestSchedule();
}

bool StackfulCoroutineManager::EnumerateThreadsImpl(const ThreadManager::Callback &cb, unsigned int incMask,
                                                    unsigned int xorMask) const
{
    os::memory::LockHolder lock(coroListLock_);
    for (auto *t : coroutines_) {
        if (!ApplyCallbackToThread(cb, t, incMask, xorMask)) {
            return false;
        }
    }
    return true;
}

void StackfulCoroutineManager::SuspendAllThreads()
{
    os::memory::LockHolder lock(coroListLock_);
    LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::SuspendAllThreads started";
    for (auto *t : coroutines_) {
        t->SuspendImpl(true);
    }
    LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::SuspendAllThreads finished";
}

void StackfulCoroutineManager::ResumeAllThreads()
{
    os::memory::LockHolder lock(coroListLock_);
    for (auto *t : coroutines_) {
        t->ResumeImpl(true);
    }
}

bool StackfulCoroutineManager::IsRunningThreadExist()
{
    UNREACHABLE();
    // NOTE(konstanting): correct implementation. Which coroutine do we consider running?
    return false;
}

void StackfulCoroutineManager::WaitForDeregistration()
{
    // profiling: start interval here, end in ctxswitch (if needed)
    GetCurrentWorker()->GetPerfStats().StartInterval(CoroutineTimeStats::SCH_ALL);
    //
    MainCoroutineCompleted();
}

void StackfulCoroutineManager::ReuseCoroutineInstance(Coroutine *co, CompletionEvent *completionEvent,
                                                      Method *entrypoint, PandaVector<Value> &&arguments,
                                                      PandaString name)
{
    auto *ctx = co->GetContext<CoroutineContext>();
    co->ReInitialize(std::move(name), ctx,
                     Coroutine::ManagedEntrypointInfo {completionEvent, entrypoint, std::move(arguments)});
}

Coroutine *StackfulCoroutineManager::TryGetCoroutineFromPool()
{
    os::memory::LockHolder lkPool(coroPoolLock_);
    if (coroutinePool_.empty()) {
        return nullptr;
    }
    Coroutine *co = coroutinePool_.back();
    coroutinePool_.pop_back();
    return co;
}

StackfulCoroutineWorker *StackfulCoroutineManager::ChooseWorkerForCoroutine(Coroutine *co)
{
    ASSERT(co != nullptr);
    // currently this function performs only the initial worker assignment,
    // but eventually it will support coroutine migration too

    auto maskValue = co->GetContext<StackfulCoroutineContext>()->GetAffinityMask();
    std::bitset<stackful_coroutines::MAX_WORKERS_COUNT> affinityBits(maskValue);
    LOG(DEBUG, COROUTINES) << "Choosing worker for coro " << co->GetName() << " with affinity mask = " << affinityBits;

    // choosing the least loaded worker from the allowed worker set
    auto preferFirstOverSecond = [&affinityBits](const StackfulCoroutineWorker *first,
                                                 const StackfulCoroutineWorker *second) {
        if (!affinityBits.test(first->GetId())) {
            return false;
        }
        if (!affinityBits.test(second->GetId())) {
            return true;
        }
        return first->GetLoadFactor() < second->GetLoadFactor();
    };

    os::memory::LockHolder lkWorkers(workersLock_);
#ifndef NDEBUG
    LOG(DEBUG, COROUTINES) << "Evaluating load factors:";
    for (auto w : workers_) {
        LOG(DEBUG, COROUTINES) << w->GetName() << ": LF = " << w->GetLoadFactor();
    }
#endif
    auto wIt = std::min_element(workers_.begin(), workers_.end(), preferFirstOverSecond);
    LOG(DEBUG, COROUTINES) << "Chose worker: " << (*wIt)->GetName();
    return *wIt;
}

stackful_coroutines::AffinityMask StackfulCoroutineManager::CalcAffinityMaskFromLaunchMode(CoroutineLaunchMode mode)
{
    /**
     * launch mode \ policy      DEFAULT                         NON_MAIN
     *   DEFAULT                ->least busy, allow migration   ->least busy, allow migration, disallow <main>
     *   SAME_WORKER            ->same, forbid migration        ->same, forbid migration
     *   EXCLUSIVE              ->least busy, forbid migration  ->least busy, forbid migration, disallow <main>
     */

    if (mode == CoroutineLaunchMode::SAME_WORKER) {
        std::bitset<stackful_coroutines::MAX_WORKERS_COUNT> mask(stackful_coroutines::AFFINITY_MASK_NONE);
        mask.set(GetCurrentWorker()->GetId());
        return mask.to_ullong();
    }

    // CoroutineLaunchMode::EXCLUSIVE is not supported yet (but will be)
    ASSERT(mode == CoroutineLaunchMode::DEFAULT);

    std::bitset<stackful_coroutines::MAX_WORKERS_COUNT> mask(stackful_coroutines::AFFINITY_MASK_FULL);
    switch (GetSchedulingPolicy()) {
        case CoroutineSchedulingPolicy::NON_MAIN_WORKER: {
            mask.reset(stackful_coroutines::MAIN_WORKER_ID);
            break;
        }
        default:
        case CoroutineSchedulingPolicy::DEFAULT:
            break;
    }
    return mask.to_ullong();
}

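// LaunchImpl() reuses an idle instance from the coroutine pool when the coroutine pool runtime
// option (IsUseCoroutinePool()) is enabled, otherwise it creates a fresh one. The coroutine then
// gets an affinity mask derived from the launch mode and is queued on the worker returned by
// ChooseWorkerForCoroutine().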
Coroutine *StackfulCoroutineManager::LaunchImpl(CompletionEvent *completionEvent, Method *entrypoint,
                                                PandaVector<Value> &&arguments, CoroutineLaunchMode mode)
{
#ifndef NDEBUG
    GetCurrentWorker()->PrintRunnables("LaunchImpl begin");
#endif
    auto coroName = entrypoint->GetFullName();

    Coroutine *co = nullptr;
    if (Runtime::GetOptions().IsUseCoroutinePool()) {
        co = TryGetCoroutineFromPool();
    }
    if (co != nullptr) {
        ReuseCoroutineInstance(co, completionEvent, entrypoint, std::move(arguments), std::move(coroName));
    } else {
        co = CreateCoroutineInstance(completionEvent, entrypoint, std::move(arguments), std::move(coroName));
    }
    if (co == nullptr) {
        LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::LaunchImpl: failed to create a coroutine!";
        return co;
    }
    Runtime::GetCurrent()->GetNotificationManager()->ThreadStartEvent(co);

    auto affinityMask = CalcAffinityMaskFromLaunchMode(mode);
    co->GetContext<StackfulCoroutineContext>()->SetAffinityMask(affinityMask);
    auto *w = ChooseWorkerForCoroutine(co);
    w->AddRunnableCoroutine(co, IsJsMode());

#ifndef NDEBUG
    GetCurrentWorker()->PrintRunnables("LaunchImpl end");
#endif
    return co;
}

void StackfulCoroutineManager::DumpCoroutineStats() const
{
    LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager: dumping performance statistics...";
    os::memory::LockHolder lock(workersLock_);
    PandaVector<CoroutineWorkerStats *> wstats;
    for (auto *worker : workers_) {
        worker->GetPerfStats().Disable();
        wstats.push_back(&worker->GetPerfStats());
    }
    std::cout << "=== Coroutine statistics begin ===" << std::endl;
    std::cout << stats_.GetFullStatistics(std::move(wstats));
    std::cout << "=== Coroutine statistics end ===" << std::endl;
    LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager: performance statistics dumped successfully.";
}

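// Blocks the MAIN coroutine until only MAIN and the per-worker schedule-loop coroutines remain.
// programCompletionLock_ is released while waiting on programCompletionEvent_ and re-acquired
// before the condition is re-checked, so a spurious wakeup simply loops again.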
void StackfulCoroutineManager::WaitForNonMainCoroutinesCompletion()
{
    os::memory::LockHolder lkCompletion(programCompletionLock_);
    auto *main = Coroutine::GetCurrent();
    while (coroutineCount_ > 1 + GetActiveWorkersCount()) {  // 1 is for MAIN
        programCompletionEvent_->SetNotHappened();
        programCompletionEvent_->Lock();
        programCompletionLock_.Unlock();
        ScopedManagedCodeThread s(main);  // perf?
        GetCurrentWorker()->WaitForEvent(programCompletionEvent_);
        LOG(DEBUG, COROUTINES)
            << "StackfulCoroutineManager::WaitForNonMainCoroutinesCompletion(): possibly spurious wakeup from wait...";
        // NOTE(konstanting, #I67QXC): test for the spurious wakeup
        programCompletionLock_.Lock();
    }
    ASSERT(coroutineCount_ == (1 + GetActiveWorkersCount()));
}

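// Shutdown sequence for the MAIN coroutine: wait for user coroutines to finish, mark the main
// context as finished, deactivate all workers and wait until only the main worker remains active,
// drain the remaining schedule-loop coroutines on the main worker, dump the perf statistics if
// enabled, and finally delete the workers.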
void StackfulCoroutineManager::MainCoroutineCompleted()
{
    // precondition: MAIN is already in the native mode
    LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::MainCoroutineCompleted(): STARTED";
    // block till only schedule loop coroutines are present
    LOG(DEBUG, COROUTINES)
        << "StackfulCoroutineManager::MainCoroutineCompleted(): waiting for other coroutines to complete";
    WaitForNonMainCoroutinesCompletion();
    // NOTE(konstanting, #I67QXC): correct state transitions for MAIN
    GetCurrentContext()->MainThreadFinished();
    GetCurrentContext()->EnterAwaitLoop();

    LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::MainCoroutineCompleted(): stopping workers";
    {
        os::memory::LockHolder lock(workersLock_);
        for (auto *worker : workers_) {
            worker->SetActive(false);
        }
        while (activeWorkersCount_ > 1) {  // 1 is for MAIN
            // profiling: the SCH interval is expected to be started after the ctxswitch
            GetCurrentWorker()->GetPerfStats().FinishInterval(CoroutineTimeStats::SCH_ALL);
            // NOTE(konstanting, #I67QXC): need timed wait?..
            workersCv_.Wait(&workersLock_);
            // profiling: we don't want to profile the sleeping state
            GetCurrentWorker()->GetPerfStats().StartInterval(CoroutineTimeStats::SCH_ALL);
        }
    }

    LOG(DEBUG, COROUTINES)
        << "StackfulCoroutineManager::MainCoroutineCompleted(): stopping await loop on the main worker";
    while (coroutineCount_ > 1) {
        GetCurrentWorker()->FinalizeFiberScheduleLoop();
    }
    // profiling: the SCH interval is expected to be started after the ctxswitch
    GetCurrentWorker()->GetPerfStats().FinishInterval(CoroutineTimeStats::SCH_ALL);

    if (stats_.IsEnabled()) {
        DumpCoroutineStats();
    }
    stats_.Disable();

    LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::MainCoroutineCompleted(): deleting workers";
    {
        os::memory::LockHolder lkWorkers(workersLock_);
        for (auto *worker : workers_) {
            Runtime::GetCurrent()->GetInternalAllocator()->Delete(worker);
        }
        workers_.clear();
    }

    LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::MainCoroutineCompleted(): DONE";
}

StackfulCoroutineContext *StackfulCoroutineManager::GetCurrentContext()
{
    auto *co = Coroutine::GetCurrent();
    return co->GetContext<StackfulCoroutineContext>();
}

StackfulCoroutineWorker *StackfulCoroutineManager::GetCurrentWorker()
{
    return GetCurrentContext()->GetWorker();
}

bool StackfulCoroutineManager::IsJsMode()
{
    return jsMode_;
}

void StackfulCoroutineManager::DestroyEntrypointfulCoroutine(Coroutine *co)
{
    if (Runtime::GetOptions().IsUseCoroutinePool() && co->HasManagedEntrypoint()) {
        co->CleanUp();
        os::memory::LockHolder lock(coroPoolLock_);
        coroutinePool_.push_back(co);
    } else {
        CoroutineManager::DestroyEntrypointfulCoroutine(co);
    }
}

StackfulCoroutineContext *StackfulCoroutineManager::CreateCoroutineContextImpl(bool needStack)
{
    uint8_t *stack = nullptr;
    size_t stackSizeBytes = 0;
    if (needStack) {
        stack = AllocCoroutineStack();
        if (stack == nullptr) {
            return nullptr;
        }
        stackSizeBytes = coroStackSizeBytes_;
    }
    return Runtime::GetCurrent()->GetInternalAllocator()->New<StackfulCoroutineContext>(stack, stackSizeBytes);
}

Coroutine *StackfulCoroutineManager::CreateNativeCoroutine(Runtime *runtime, PandaVM *vm,
                                                           Coroutine::NativeEntrypointInfo::NativeEntrypointFunc entry,
                                                           void *param, PandaString name)
{
    if (GetCoroutineCount() >= GetCoroutineCountLimit()) {
        // resource limit reached
        return nullptr;
    }
    StackfulCoroutineContext *ctx = CreateCoroutineContextImpl(true);
    if (ctx == nullptr) {
        // do not proceed if we cannot create a context for the new coroutine
        return nullptr;
    }
    auto *co = GetCoroutineFactory()(runtime, vm, std::move(name), ctx, Coroutine::NativeEntrypointInfo(entry, param));
    ASSERT(co != nullptr);

    // Let's assume that even the "native" coroutine can eventually try to execute some managed code.
    // In that case pre/post barrier buffers are necessary.
    co->InitBuffers();
    return co;
}

void StackfulCoroutineManager::DestroyNativeCoroutine(Coroutine *co)
{
    DestroyEntrypointlessCoroutine(co);
}

}  // namespace ark