1 /**
2 * Copyright (c) 2022-2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include <algorithm>
17 #include "runtime/coroutines/coroutine.h"
18 #include "runtime/coroutines/stackful_coroutine.h"
19 #include "runtime/include/thread_scopes.h"
20 #include "libpandabase/macros.h"
21 #include "runtime/include/runtime.h"
22 #include "runtime/include/runtime_notification.h"
23 #include "runtime/include/panda_vm.h"
24 #include "runtime/coroutines/stackful_coroutine_manager.h"
25
26 namespace ark {
27
AllocCoroutineStack()28 uint8_t *StackfulCoroutineManager::AllocCoroutineStack()
29 {
30 Pool stackPool = PoolManager::GetMmapMemPool()->AllocPool<OSPagesAllocPolicy::NO_POLICY>(
31 coroStackSizeBytes_, SpaceType::SPACE_TYPE_NATIVE_STACKS, AllocatorType::NATIVE_STACKS_ALLOCATOR);
32 return static_cast<uint8_t *>(stackPool.GetMem());
33 }
34
FreeCoroutineStack(uint8_t * stack)35 void StackfulCoroutineManager::FreeCoroutineStack(uint8_t *stack)
36 {
37 if (stack != nullptr) {
38 PoolManager::GetMmapMemPool()->FreePool(stack, coroStackSizeBytes_);
39 }
40 }
41
CreateWorkers(size_t howMany,Runtime * runtime,PandaVM * vm)42 void StackfulCoroutineManager::CreateWorkers(size_t howMany, Runtime *runtime, PandaVM *vm)
43 {
44 auto allocator = Runtime::GetCurrent()->GetInternalAllocator();
45 bool isStatsEnabled = stats_.IsEnabled();
46
47 auto *wMain =
48 allocator->New<StackfulCoroutineWorker>(runtime, vm, this, StackfulCoroutineWorker::ScheduleLoopType::FIBER,
49 "[main] worker 0", stackful_coroutines::MAIN_WORKER_ID);
50 if (isStatsEnabled) {
51 wMain->GetPerfStats().Enable();
52 }
53 workers_.push_back(wMain);
54 ASSERT(workers_[stackful_coroutines::MAIN_WORKER_ID] == wMain);
55 ASSERT(wMain->GetId() == stackful_coroutines::MAIN_WORKER_ID);
56
57 for (uint32_t i = 1; i < howMany; ++i) {
58 auto *w = allocator->New<StackfulCoroutineWorker>(
59 runtime, vm, this, StackfulCoroutineWorker::ScheduleLoopType::THREAD, "worker " + ToPandaString(i), i);
60 if (isStatsEnabled) {
61 w->GetPerfStats().Enable();
62 }
63 workers_.push_back(w);
64 ASSERT(workers_[i] == w);
65 ASSERT(w->GetId() == i);
66 }
67
68 auto *mainCo = CreateMainCoroutine(runtime, vm);
69 mainCo->GetContext<StackfulCoroutineContext>()->SetWorker(wMain);
70 Coroutine::SetCurrent(mainCo);
71 activeWorkersCount_ = 1; // 1 is for MAIN
72
73 LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::CreateWorkers(): waiting for workers startup";
74 while (activeWorkersCount_ < howMany) {
75 // NOTE(konstanting, #I67QXC): need timed wait?..
76 workersCv_.Wait(&workersLock_);
77 }
78 }
79
OnWorkerShutdown()80 void StackfulCoroutineManager::OnWorkerShutdown()
81 {
82 os::memory::LockHolder lock(workersLock_);
83 --activeWorkersCount_;
84 workersCv_.Signal();
85 LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::OnWorkerShutdown(): COMPLETED, workers left = "
86 << activeWorkersCount_;
87 }
88
OnWorkerStartup()89 void StackfulCoroutineManager::OnWorkerStartup()
90 {
91 os::memory::LockHolder lock(workersLock_);
92 ++activeWorkersCount_;
93 workersCv_.Signal();
94 LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::OnWorkerStartup(): COMPLETED, active workers = "
95 << activeWorkersCount_;
96 }
97
/// Forbids coroutine switching on the current worker until the matching
/// EnableCoroutineSwitch() call. Delegates to the worker.
void StackfulCoroutineManager::DisableCoroutineSwitch()
{
    GetCurrentWorker()->DisableCoroutineSwitch();
}
102
/// Re-allows coroutine switching on the current worker (pairs with
/// DisableCoroutineSwitch()). Delegates to the worker.
void StackfulCoroutineManager::EnableCoroutineSwitch()
{
    GetCurrentWorker()->EnableCoroutineSwitch();
}
107
/// @return true if coroutine switching is currently disabled on this worker
bool StackfulCoroutineManager::IsCoroutineSwitchDisabled()
{
    return GetCurrentWorker()->IsCoroutineSwitchDisabled();
}
112
/// One-time initialization: configures profiling, derives stack-size and
/// coroutine-count limits from runtime options, creates/activates the worker set
/// and allocates the program completion event. The sequence is order-sensitive:
/// stats must be enabled before the INIT interval, and workersLock_ must be held
/// across CreateWorkers() (it is released only when all workers have started).
void StackfulCoroutineManager::Initialize(CoroutineManagerConfig config, Runtime *runtime, PandaVM *vm)
{
    // enable stats collection if needed
    if (config.enablePerfStats) {
        stats_.Enable();
    }
    ScopedCoroutineStats s(&stats_, CoroutineTimeStats::INIT);
    // set limits
    coroStackSizeBytes_ = Runtime::GetCurrent()->GetOptions().GetCoroutineStackSizePages() * os::mem::GetPageSize();
    if (coroStackSizeBytes_ != AlignUp(coroStackSizeBytes_, PANDA_POOL_ALIGNMENT_IN_BYTES)) {
        size_t alignmentPages = PANDA_POOL_ALIGNMENT_IN_BYTES / os::mem::GetPageSize();
        LOG(FATAL, COROUTINES) << "Coroutine stack size should be >= " << alignmentPages
                               << " pages and should be aligned to " << alignmentPages << "-page boundary!";
    }
    size_t coroStackAreaSizeBytes = Runtime::GetCurrent()->GetOptions().GetCoroutinesStackMemLimit();
    // the coroutine count limit is derived from the total stack memory budget
    coroutineCountLimit_ = coroStackAreaSizeBytes / coroStackSizeBytes_;
    jsMode_ = config.emulateJs;

    // create and activate workers
    // AUTO mode target: number of CPUs / 4, but not less than 2
    size_t numberOfAvailableCores = std::max(std::thread::hardware_concurrency() / 4ULL, 2ULL);
    size_t targetNumberOfWorkers =
        (config.workersCount == CoroutineManagerConfig::WORKERS_COUNT_AUTO)
            ? std::min(numberOfAvailableCores, stackful_coroutines::MAX_WORKERS_COUNT)
            : std::min(static_cast<size_t>(config.workersCount), stackful_coroutines::MAX_WORKERS_COUNT);
    if (config.workersCount == CoroutineManagerConfig::WORKERS_COUNT_AUTO) {
        LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager(): AUTO mode selected, will set number of coroutine "
                                  "workers to number of CPUs / 4, but not less than 2 = "
                               << targetNumberOfWorkers;
    }
    // the lock is held across CreateWorkers(), which waits on workersCv_ internally
    os::memory::LockHolder lock(workersLock_);
    CreateWorkers(targetNumberOfWorkers, runtime, vm);
    LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager(): successfully created and activated " << workers_.size()
                           << " coroutine workers";
    programCompletionEvent_ = Runtime::GetCurrent()->GetInternalAllocator()->New<GenericEvent>(this);
}
148
Finalize()149 void StackfulCoroutineManager::Finalize()
150 {
151 os::memory::LockHolder lock(coroPoolLock_);
152
153 auto allocator = Runtime::GetCurrent()->GetInternalAllocator();
154 allocator->Delete(programCompletionEvent_);
155 for (auto *co : coroutinePool_) {
156 co->DestroyInternalResources();
157 CoroutineManager::DestroyEntrypointfulCoroutine(co);
158 }
159 coroutinePool_.clear();
160 }
161
AddToRegistry(Coroutine * co)162 void StackfulCoroutineManager::AddToRegistry(Coroutine *co)
163 {
164 os::memory::LockHolder lock(coroListLock_);
165 co->GetVM()->GetGC()->OnThreadCreate(co);
166 coroutines_.insert(co);
167 coroutineCount_++;
168 }
169
/// Removes a coroutine from the registry and decrements the coroutine counter.
/// Precondition: the caller holds coroListLock_ (see TerminateCoroutine()).
void StackfulCoroutineManager::RemoveFromRegistry(Coroutine *co)
{
    coroutines_.erase(co);
    coroutineCount_--;
}
175
/// Public registration entry point; simply delegates to AddToRegistry().
void StackfulCoroutineManager::RegisterCoroutine(Coroutine *co)
{
    AddToRegistry(co);
}
180
/// Terminates the given coroutine: transitions its status, removes it from the
/// registry, releases (or recycles) its internal resources and, for coroutines
/// with an entrypoint, requests finalization on the current worker.
/// @return always false here: the coroutine is never destroyed synchronously by
///         this function (entrypointless coroutines like MAIN are destroyed
///         manually elsewhere).
bool StackfulCoroutineManager::TerminateCoroutine(Coroutine *co)
{
    if (co->HasManagedEntrypoint()) {
        // profiling: start interval here, end in ctxswitch after finalization request is done
        GetCurrentWorker()->GetPerfStats().StartInterval(CoroutineTimeStats::SCH_ALL);
    } else {
        // profiling: no need. MAIN and NATIVE EP coros are deleted from the SCHEDULER itself
        LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::TerminateCoroutine(): terminating "
                               << ((GetExistingWorkersCount() == 0) ? "MAIN..." : "NATIVE EP coro...");
    }

    LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::TerminateCoroutine() started";
    co->NativeCodeEnd();
    co->UpdateStatus(ThreadStatus::TERMINATING);

    {
        os::memory::LockHolder lList(coroListLock_);
        RemoveFromRegistry(co);
        // DestroyInternalResources()/CleanupInternalResources() must be called in one critical section with
        // RemoveFromRegistry (under core_list_lock_). This functions transfer cards from coro's post_barrier buffer to
        // UpdateRemsetThread internally. Situation when cards still remain and UpdateRemsetThread cannot visit the
        // coro (because it is already removed) must be impossible.
        if (Runtime::GetOptions().IsUseCoroutinePool() && co->HasManagedEntrypoint()) {
            // the instance will be recycled via the coroutine pool: keep it, only clean up
            co->CleanupInternalResources();
        } else {
            co->DestroyInternalResources();
        }
        co->UpdateStatus(ThreadStatus::FINISHED);
    }
    Runtime::GetCurrent()->GetNotificationManager()->ThreadEndEvent(co);

    if (co->HasManagedEntrypoint()) {
        // may fire programCompletionEvent_ if this was the last "user" coroutine
        CheckProgramCompletion();
        GetCurrentWorker()->RequestFinalization(co);
    } else if (co->HasNativeEntrypoint()) {
        GetCurrentWorker()->RequestFinalization(co);
    } else {
        // entrypointless and NOT native: e.g. MAIN
        // (do nothing, as entrypointless coroutines should be destroyed manually)
    }

    return false;
}
224
GetActiveWorkersCount() const225 size_t StackfulCoroutineManager::GetActiveWorkersCount() const
226 {
227 os::memory::LockHolder lkWorkers(workersLock_);
228 return activeWorkersCount_;
229 }
230
GetExistingWorkersCount() const231 size_t StackfulCoroutineManager::GetExistingWorkersCount() const
232 {
233 os::memory::LockHolder lkWorkers(workersLock_);
234 return workers_.size();
235 }
236
/// Fires programCompletionEvent_ when only MAIN and the per-worker schedule-loop
/// coroutines remain. Takes programCompletionLock_ first, then (indirectly)
/// workersLock_ via GetActiveWorkersCount() — keep this lock order consistent
/// with WaitForNonMainCoroutinesCompletion().
void StackfulCoroutineManager::CheckProgramCompletion()
{
    os::memory::LockHolder lkCompletion(programCompletionLock_);

    size_t activeWorkerCoros = GetActiveWorkersCount();
    if (coroutineCount_ == 1 + activeWorkerCoros) {  // 1 here is for MAIN
        LOG(DEBUG, COROUTINES)
            << "StackfulCoroutineManager::CheckProgramCompletion(): all coroutines finished execution!";
        // programCompletionEvent_ acts as a stackful-friendly cond var
        programCompletionEvent_->Happen();
    } else {
        LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::CheckProgramCompletion(): still "
                               << coroutineCount_ - 1 - activeWorkerCoros << " coroutines exist...";
    }
}
252
/// Creates a context for a new coroutine; a stack is allocated only when the
/// coroutine has an entrypoint. Delegates to CreateCoroutineContextImpl().
CoroutineContext *StackfulCoroutineManager::CreateCoroutineContext(bool coroHasEntrypoint)
{
    return CreateCoroutineContextImpl(coroHasEntrypoint);
}
257
DeleteCoroutineContext(CoroutineContext * ctx)258 void StackfulCoroutineManager::DeleteCoroutineContext(CoroutineContext *ctx)
259 {
260 FreeCoroutineStack(static_cast<StackfulCoroutineContext *>(ctx)->GetStackLoAddrPtr());
261 Runtime::GetCurrent()->GetInternalAllocator()->Delete(ctx);
262 }
263
/// @return the current number of registered coroutines (read without taking
///         coroListLock_ — a momentarily stale value is acceptable to callers)
size_t StackfulCoroutineManager::GetCoroutineCount()
{
    return coroutineCount_;
}
268
/// @return the maximum number of coroutines, derived in Initialize() from the
///         stack memory budget divided by the per-coroutine stack size
size_t StackfulCoroutineManager::GetCoroutineCountLimit()
{
    return coroutineCountLimit_;
}
273
Launch(CompletionEvent * completionEvent,Method * entrypoint,PandaVector<Value> && arguments,CoroutineLaunchMode mode)274 Coroutine *StackfulCoroutineManager::Launch(CompletionEvent *completionEvent, Method *entrypoint,
275 PandaVector<Value> &&arguments, CoroutineLaunchMode mode)
276 {
277 // profiling: scheduler and launch time
278 ScopedCoroutineStats sSch(&GetCurrentWorker()->GetPerfStats(), CoroutineTimeStats::SCH_ALL);
279 ScopedCoroutineStats sLaunch(&GetCurrentWorker()->GetPerfStats(), CoroutineTimeStats::LAUNCH);
280
281 LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::Launch started";
282
283 auto *result = LaunchImpl(completionEvent, entrypoint, std::move(arguments), mode);
284 if (result == nullptr) {
285 ThrowOutOfMemoryError("Launch failed");
286 }
287
288 LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::Launch finished";
289 return result;
290 }
291
/// Blocks the current coroutine until the awaitee event happens. If the event
/// has already happened, returns immediately without a context switch.
/// @param awaitee event to wait for; may be deleted by its owner once it fires,
///        so it must not be touched after WaitForEvent() returns
void StackfulCoroutineManager::Await(CoroutineEvent *awaitee)
{
    // profiling
    ScopedCoroutineStats s(&GetCurrentWorker()->GetPerfStats(), CoroutineTimeStats::SCH_ALL);

    ASSERT(awaitee != nullptr);
    // [[maybe_unused]]: only referenced from DEBUG logging below
    [[maybe_unused]] auto *waiter = Coroutine::GetCurrent();
    LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::Await started by " + waiter->GetName();
    if (!GetCurrentWorker()->WaitForEvent(awaitee)) {
        LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::Await finished (no await happened)";
        return;
    }
    // NB: at this point the awaitee is likely already deleted
    LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::Await finished by " + waiter->GetName();
}
307
UnblockWaiters(CoroutineEvent * blocker)308 void StackfulCoroutineManager::UnblockWaiters(CoroutineEvent *blocker)
309 {
310 // profiling: this function can be called either independently or as a path of some other SCH sequence,
311 // hence using the "weak" stats collector
312 ScopedCoroutineStats s(&GetCurrentWorker()->GetPerfStats(), CoroutineTimeStats::SCH_ALL, true);
313
314 os::memory::LockHolder lkWorkers(workersLock_);
315 ASSERT(blocker != nullptr);
316 #ifndef NDEBUG
317 {
318 os::memory::LockHolder lkBlocker(*blocker);
319 ASSERT(blocker->Happened());
320 }
321 #endif
322
323 for (auto *w : workers_) {
324 w->UnblockWaiters(blocker);
325 }
326 }
327
Schedule()328 void StackfulCoroutineManager::Schedule()
329 {
330 // profiling
331 ScopedCoroutineStats s(&GetCurrentWorker()->GetPerfStats(), CoroutineTimeStats::SCH_ALL);
332
333 LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::Schedule() request from "
334 << Coroutine::GetCurrent()->GetName();
335 GetCurrentWorker()->RequestSchedule();
336 }
337
EnumerateThreadsImpl(const ThreadManager::Callback & cb,unsigned int incMask,unsigned int xorMask) const338 bool StackfulCoroutineManager::EnumerateThreadsImpl(const ThreadManager::Callback &cb, unsigned int incMask,
339 unsigned int xorMask) const
340 {
341 os::memory::LockHolder lock(coroListLock_);
342 for (auto *t : coroutines_) {
343 if (!ApplyCallbackToThread(cb, t, incMask, xorMask)) {
344 return false;
345 }
346 }
347 return true;
348 }
349
SuspendAllThreads()350 void StackfulCoroutineManager::SuspendAllThreads()
351 {
352 os::memory::LockHolder lock(coroListLock_);
353 LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::SuspendAllThreads started";
354 for (auto *t : coroutines_) {
355 t->SuspendImpl(true);
356 }
357 LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::SuspendAllThreads finished";
358 }
359
ResumeAllThreads()360 void StackfulCoroutineManager::ResumeAllThreads()
361 {
362 os::memory::LockHolder lock(coroListLock_);
363 for (auto *t : coroutines_) {
364 t->ResumeImpl(true);
365 }
366 }
367
/// Not implemented for the stackful coroutine model: it is unclear which
/// coroutine should count as "running". Calling this is a hard error.
bool StackfulCoroutineManager::IsRunningThreadExist()
{
    UNREACHABLE();
    // NOTE(konstanting): correct implementation. Which coroutine do we consider running?
    return false;
}
374
/// Blocks MAIN until all other coroutines are done and the workers are torn
/// down. The SCH_ALL interval started here is finished inside the context
/// switch machinery / MainCoroutineCompleted(), not in this function.
void StackfulCoroutineManager::WaitForDeregistration()
{
    // profiling: start interval here, end in ctxswitch (if needed)
    GetCurrentWorker()->GetPerfStats().StartInterval(CoroutineTimeStats::SCH_ALL);
    //
    MainCoroutineCompleted();
}
382
ReuseCoroutineInstance(Coroutine * co,CompletionEvent * completionEvent,Method * entrypoint,PandaVector<Value> && arguments,PandaString name)383 void StackfulCoroutineManager::ReuseCoroutineInstance(Coroutine *co, CompletionEvent *completionEvent,
384 Method *entrypoint, PandaVector<Value> &&arguments,
385 PandaString name)
386 {
387 auto *ctx = co->GetContext<CoroutineContext>();
388 co->ReInitialize(std::move(name), ctx,
389 Coroutine::ManagedEntrypointInfo {completionEvent, entrypoint, std::move(arguments)});
390 }
391
TryGetCoroutineFromPool()392 Coroutine *StackfulCoroutineManager::TryGetCoroutineFromPool()
393 {
394 os::memory::LockHolder lkPool(coroPoolLock_);
395 if (coroutinePool_.empty()) {
396 return nullptr;
397 }
398 Coroutine *co = coroutinePool_.back();
399 coroutinePool_.pop_back();
400 return co;
401 }
402
/// Picks the least-loaded worker allowed by the coroutine's affinity mask.
/// NOTE(review): if the mask allows no worker at all, min_element still returns
/// some worker (the comparator then treats all candidates as equivalent) —
/// presumably callers never produce an empty mask; confirm against
/// CalcAffinityMaskFromLaunchMode().
StackfulCoroutineWorker *StackfulCoroutineManager::ChooseWorkerForCoroutine(Coroutine *co)
{
    ASSERT(co != nullptr);
    // currently this function does only the initial worker appointment
    // but eventually it will support coroutine migration too

    auto maskValue = co->GetContext<StackfulCoroutineContext>()->GetAffinityMask();
    std::bitset<stackful_coroutines::MAX_WORKERS_COUNT> affinityBits(maskValue);
    LOG(DEBUG, COROUTINES) << "Choosing worker for coro " << co->GetName() << " with affinity mask = " << affinityBits;

    // choosing the least loaded worker from the allowed worker set
    // (workers outside the mask compare "greater" than any allowed one)
    auto preferFirstOverSecond = [&affinityBits](const StackfulCoroutineWorker *first,
                                                 const StackfulCoroutineWorker *second) {
        if (!affinityBits.test(first->GetId())) {
            return false;
        }
        if (!affinityBits.test(second->GetId())) {
            return true;
        }
        return first->GetLoadFactor() < second->GetLoadFactor();
    };

    os::memory::LockHolder lkWorkers(workersLock_);
#ifndef NDEBUG
    LOG(DEBUG, COROUTINES) << "Evaluating load factors:";
    for (auto w : workers_) {
        LOG(DEBUG, COROUTINES) << w->GetName() << ": LF = " << w->GetLoadFactor();
    }
#endif
    auto wIt = std::min_element(workers_.begin(), workers_.end(), preferFirstOverSecond);
    LOG(DEBUG, COROUTINES) << "Chose worker: " << (*wIt)->GetName();
    return *wIt;
}
436
CalcAffinityMaskFromLaunchMode(CoroutineLaunchMode mode)437 stackful_coroutines::AffinityMask StackfulCoroutineManager::CalcAffinityMaskFromLaunchMode(CoroutineLaunchMode mode)
438 {
439 /**
440 * launch mode \ policy DEFAULT NON_MAIN
441 * DEFAULT ->least busy, allow migration ->least busy, allow migration, disallow <main>
442 * SAME_WORKER ->same, forbid migration ->same, forbid migration
443 * EXCLUSIVE ->least busy, forbid migration ->least busy, forbid migration, disallow <main>
444 */
445
446 if (mode == CoroutineLaunchMode::SAME_WORKER) {
447 std::bitset<stackful_coroutines::MAX_WORKERS_COUNT> mask(stackful_coroutines::AFFINITY_MASK_NONE);
448 mask.set(GetCurrentWorker()->GetId());
449 return mask.to_ullong();
450 }
451
452 // CoroutineLaunchMode::EXCLUSIVE is not supported yet (but will be)
453 ASSERT(mode == CoroutineLaunchMode::DEFAULT);
454
455 std::bitset<stackful_coroutines::MAX_WORKERS_COUNT> mask(stackful_coroutines::AFFINITY_MASK_FULL);
456 switch (GetSchedulingPolicy()) {
457 case CoroutineSchedulingPolicy::NON_MAIN_WORKER: {
458 mask.reset(stackful_coroutines::MAIN_WORKER_ID);
459 break;
460 }
461 default:
462 case CoroutineSchedulingPolicy::DEFAULT:
463 break;
464 }
465 return mask.to_ullong();
466 }
467
/// Creates (or reuses from the pool) a coroutine for the given entrypoint,
/// assigns it an affinity mask derived from the launch mode, appoints a worker
/// and enqueues it as runnable.
/// @return the new coroutine, or nullptr if creation failed
Coroutine *StackfulCoroutineManager::LaunchImpl(CompletionEvent *completionEvent, Method *entrypoint,
                                                PandaVector<Value> &&arguments, CoroutineLaunchMode mode)
{
#ifndef NDEBUG
    GetCurrentWorker()->PrintRunnables("LaunchImpl begin");
#endif
    auto coroName = entrypoint->GetFullName();

    Coroutine *co = nullptr;
    // try to recycle a pooled instance first (saves a stack allocation)
    if (Runtime::GetOptions().IsUseCoroutinePool()) {
        co = TryGetCoroutineFromPool();
    }
    if (co != nullptr) {
        ReuseCoroutineInstance(co, completionEvent, entrypoint, std::move(arguments), std::move(coroName));
    } else {
        co = CreateCoroutineInstance(completionEvent, entrypoint, std::move(arguments), std::move(coroName));
    }
    if (co == nullptr) {
        LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::LaunchImpl: failed to create a coroutine!";
        return co;
    }
    Runtime::GetCurrent()->GetNotificationManager()->ThreadStartEvent(co);

    // the affinity mask must be set before the worker is chosen
    auto affinityMask = CalcAffinityMaskFromLaunchMode(mode);
    co->GetContext<StackfulCoroutineContext>()->SetAffinityMask(affinityMask);
    auto *w = ChooseWorkerForCoroutine(co);
    w->AddRunnableCoroutine(co, IsJsMode());

#ifndef NDEBUG
    GetCurrentWorker()->PrintRunnables("LaunchImpl end");
#endif
    return co;
}
501
DumpCoroutineStats() const502 void StackfulCoroutineManager::DumpCoroutineStats() const
503 {
504 LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager: dumping performance statistics...";
505 os::memory::LockHolder lock(workersLock_);
506 PandaVector<CoroutineWorkerStats *> wstats;
507 for (auto *worker : workers_) {
508 worker->GetPerfStats().Disable();
509 wstats.push_back(&worker->GetPerfStats());
510 }
511 std::cout << "=== Coroutine statistics begin ===" << std::endl;
512 std::cout << stats_.GetFullStatistics(std::move(wstats));
513 std::cout << "=== Coroutine statistics end ===" << std::endl;
514 LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager: performance statistics dumped successfully.";
515 }
516
/// Blocks the MAIN coroutine until only MAIN plus the per-worker schedule-loop
/// coroutines remain (coroutineCount_ == 1 + active workers).
/// programCompletionLock_ is manually released around the event wait so that
/// terminating coroutines can enter CheckProgramCompletion() and fire the
/// event; the surrounding LockHolder still guarantees release on exit.
void StackfulCoroutineManager::WaitForNonMainCoroutinesCompletion()
{
    os::memory::LockHolder lkCompletion(programCompletionLock_);
    auto *main = Coroutine::GetCurrent();
    while (coroutineCount_ > 1 + GetActiveWorkersCount()) {  // 1 is for MAIN
        // re-arm the event, lock it, then drop the completion lock before waiting
        programCompletionEvent_->SetNotHappened();
        programCompletionEvent_->Lock();
        programCompletionLock_.Unlock();
        ScopedManagedCodeThread s(main);  // perf?
        GetCurrentWorker()->WaitForEvent(programCompletionEvent_);
        LOG(DEBUG, COROUTINES)
            << "StackfulCoroutineManager::WaitForNonMainCoroutinesCompletion(): possibly spurious wakeup from wait...";
        // NOTE(konstanting, #I67QXC): test for the spurious wakeup
        programCompletionLock_.Lock();
    }
    ASSERT(coroutineCount_ == (1 + GetActiveWorkersCount()));
}
534
/// Orderly VM shutdown driven by MAIN:
///   1. wait until only schedule-loop coroutines remain,
///   2. transition MAIN and enter the await loop,
///   3. deactivate all workers and wait for their shutdown,
///   4. finalize the remaining fiber schedule loops on the main worker,
///   5. dump/disable stats and delete the worker objects.
/// The order of these steps is load-bearing; do not reorder.
void StackfulCoroutineManager::MainCoroutineCompleted()
{
    // precondition: MAIN is already in the native mode
    LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::MainCoroutineCompleted(): STARTED";
    // block till only schedule loop coroutines are present
    LOG(DEBUG, COROUTINES)
        << "StackfulCoroutineManager::MainCoroutineCompleted(): waiting for other coroutines to complete";
    WaitForNonMainCoroutinesCompletion();
    // NOTE(konstanting, #I67QXC): correct state transitions for MAIN
    GetCurrentContext()->MainThreadFinished();
    GetCurrentContext()->EnterAwaitLoop();

    LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::MainCoroutineCompleted(): stopping workers";
    {
        os::memory::LockHolder lock(workersLock_);
        for (auto *worker : workers_) {
            worker->SetActive(false);
        }
        while (activeWorkersCount_ > 1) {  // 1 is for MAIN
            // profiling: the SCH interval is expected to be started after the ctxswitch
            GetCurrentWorker()->GetPerfStats().FinishInterval(CoroutineTimeStats::SCH_ALL);
            // NOTE(konstanting, #I67QXC): need timed wait?..
            workersCv_.Wait(&workersLock_);
            // profiling: we don't want to profile the sleeping state
            GetCurrentWorker()->GetPerfStats().StartInterval(CoroutineTimeStats::SCH_ALL);
        }
    }

    LOG(DEBUG, COROUTINES)
        << "StackfulCoroutineManager::MainCoroutineCompleted(): stopping await loop on the main worker";
    while (coroutineCount_ > 1) {
        GetCurrentWorker()->FinalizeFiberScheduleLoop();
    }
    // profiling: the SCH interval is expected to be started after the ctxswitch
    GetCurrentWorker()->GetPerfStats().FinishInterval(CoroutineTimeStats::SCH_ALL);

    if (stats_.IsEnabled()) {
        DumpCoroutineStats();
    }
    stats_.Disable();

    LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::MainCoroutineCompleted(): deleting workers";
    {
        os::memory::LockHolder lkWorkers(workersLock_);
        for (auto *worker : workers_) {
            Runtime::GetCurrent()->GetInternalAllocator()->Delete(worker);
        }
        workers_.clear();
    }

    LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::MainCoroutineCompleted(): DONE";
}
587
GetCurrentContext()588 StackfulCoroutineContext *StackfulCoroutineManager::GetCurrentContext()
589 {
590 auto *co = Coroutine::GetCurrent();
591 return co->GetContext<StackfulCoroutineContext>();
592 }
593
/// @return the worker that hosts the currently running coroutine
StackfulCoroutineWorker *StackfulCoroutineManager::GetCurrentWorker()
{
    return GetCurrentContext()->GetWorker();
}
598
/// @return true if JS emulation mode was requested at initialization
bool StackfulCoroutineManager::IsJsMode()
{
    return jsMode_;
}
603
DestroyEntrypointfulCoroutine(Coroutine * co)604 void StackfulCoroutineManager::DestroyEntrypointfulCoroutine(Coroutine *co)
605 {
606 if (Runtime::GetOptions().IsUseCoroutinePool() && co->HasManagedEntrypoint()) {
607 co->CleanUp();
608 os::memory::LockHolder lock(coroPoolLock_);
609 coroutinePool_.push_back(co);
610 } else {
611 CoroutineManager::DestroyEntrypointfulCoroutine(co);
612 }
613 }
614
CreateCoroutineContextImpl(bool needStack)615 StackfulCoroutineContext *StackfulCoroutineManager::CreateCoroutineContextImpl(bool needStack)
616 {
617 uint8_t *stack = nullptr;
618 size_t stackSizeBytes = 0;
619 if (needStack) {
620 stack = AllocCoroutineStack();
621 if (stack == nullptr) {
622 return nullptr;
623 }
624 stackSizeBytes = coroStackSizeBytes_;
625 }
626 return Runtime::GetCurrent()->GetInternalAllocator()->New<StackfulCoroutineContext>(stack, stackSizeBytes);
627 }
628
/// Creates a coroutine with a native (non-managed) entrypoint.
/// @return the new coroutine, or nullptr when the coroutine limit is reached
///         or the context/stack could not be allocated
Coroutine *StackfulCoroutineManager::CreateNativeCoroutine(Runtime *runtime, PandaVM *vm,
                                                           Coroutine::NativeEntrypointInfo::NativeEntrypointFunc entry,
                                                           void *param, PandaString name)
{
    if (GetCoroutineCount() >= GetCoroutineCountLimit()) {
        // resource limit reached
        return nullptr;
    }
    StackfulCoroutineContext *ctx = CreateCoroutineContextImpl(true);
    if (ctx == nullptr) {
        // do not proceed if we cannot create a context for the new coroutine
        return nullptr;
    }
    auto *co = GetCoroutineFactory()(runtime, vm, std::move(name), ctx, Coroutine::NativeEntrypointInfo(entry, param));
    ASSERT(co != nullptr);

    // Let's assume that even the "native" coroutine can eventually try to execute some managed code.
    // In that case pre/post barrier buffers are necessary.
    co->InitBuffers();
    return co;
}
650
/// Destroys a native-entrypoint coroutine; these are treated like
/// entrypointless ones for destruction purposes.
void StackfulCoroutineManager::DestroyNativeCoroutine(Coroutine *co)
{
    DestroyEntrypointlessCoroutine(co);
}
655
656 } // namespace ark
657