/**
 * Copyright (c) 2022-2024 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <algorithm>
#include <bitset>
#include <iostream>
#include <thread>
#include "runtime/coroutines/coroutine.h"
#include "runtime/coroutines/stackful_coroutine.h"
#include "runtime/include/thread_scopes.h"
#include "libpandabase/macros.h"
#include "runtime/include/runtime.h"
#include "runtime/include/runtime_notification.h"
#include "runtime/include/panda_vm.h"
#include "runtime/coroutines/stackful_coroutine_manager.h"

namespace ark {

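/**
 * Coroutine stacks come from the mmap-based pool manager rather than from the
 * regular internal allocator: each stack occupies a dedicated pool in the
 * SPACE_TYPE_NATIVE_STACKS space, so an allocation failure surfaces as a null
 * pool pointer (handled by the callers).
 */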
uint8_t *StackfulCoroutineManager::AllocCoroutineStack()
{
    Pool stackPool = PoolManager::GetMmapMemPool()->AllocPool<OSPagesAllocPolicy::NO_POLICY>(
        coroStackSizeBytes_, SpaceType::SPACE_TYPE_NATIVE_STACKS, AllocatorType::NATIVE_STACKS_ALLOCATOR);
    return static_cast<uint8_t *>(stackPool.GetMem());
}

void StackfulCoroutineManager::FreeCoroutineStack(uint8_t *stack)
{
    if (stack != nullptr) {
        PoolManager::GetMmapMemPool()->FreePool(stack, coroStackSizeBytes_);
    }
}

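/**
 * Creates the coroutine workers. Worker 0 hosts the MAIN coroutine and runs
 * its schedule loop as a fiber on the current OS thread, while each of the
 * remaining workers runs its schedule loop on a dedicated OS thread. Expects
 * workersLock_ to be held by the caller and blocks on workersCv_ until all
 * spawned workers have reported startup via OnWorkerStartup().
 */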
void StackfulCoroutineManager::CreateWorkers(size_t howMany, Runtime *runtime, PandaVM *vm)
{
    auto allocator = Runtime::GetCurrent()->GetInternalAllocator();
    bool isStatsEnabled = stats_.IsEnabled();

    auto *wMain =
        allocator->New<StackfulCoroutineWorker>(runtime, vm, this, StackfulCoroutineWorker::ScheduleLoopType::FIBER,
                                                "[main] worker 0", stackful_coroutines::MAIN_WORKER_ID);
    if (isStatsEnabled) {
        wMain->GetPerfStats().Enable();
    }
    workers_.push_back(wMain);
    ASSERT(workers_[stackful_coroutines::MAIN_WORKER_ID] == wMain);
    ASSERT(wMain->GetId() == stackful_coroutines::MAIN_WORKER_ID);

    for (uint32_t i = 1; i < howMany; ++i) {
        auto *w = allocator->New<StackfulCoroutineWorker>(
            runtime, vm, this, StackfulCoroutineWorker::ScheduleLoopType::THREAD, "worker " + ToPandaString(i), i);
        if (isStatsEnabled) {
            w->GetPerfStats().Enable();
        }
        workers_.push_back(w);
        ASSERT(workers_[i] == w);
        ASSERT(w->GetId() == i);
    }

    auto *mainCo = CreateMainCoroutine(runtime, vm);
    mainCo->GetContext<StackfulCoroutineContext>()->SetWorker(wMain);
    Coroutine::SetCurrent(mainCo);
    activeWorkersCount_ = 1; // 1 is for MAIN

    LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::CreateWorkers(): waiting for workers startup";
    while (activeWorkersCount_ < howMany) {
        // NOTE(konstanting, #I67QXC): need timed wait?..
        workersCv_.Wait(&workersLock_);
    }
}

void StackfulCoroutineManager::OnWorkerShutdown()
{
    os::memory::LockHolder lock(workersLock_);
    --activeWorkersCount_;
    workersCv_.Signal();
    LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::OnWorkerShutdown(): COMPLETED, workers left = "
                           << activeWorkersCount_;
}

void StackfulCoroutineManager::OnWorkerStartup()
{
    os::memory::LockHolder lock(workersLock_);
    ++activeWorkersCount_;
    workersCv_.Signal();
    LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::OnWorkerStartup(): COMPLETED, active workers = "
                           << activeWorkersCount_;
}

void StackfulCoroutineManager::DisableCoroutineSwitch()
{
    GetCurrentWorker()->DisableCoroutineSwitch();
}

void StackfulCoroutineManager::EnableCoroutineSwitch()
{
    GetCurrentWorker()->EnableCoroutineSwitch();
}

bool StackfulCoroutineManager::IsCoroutineSwitchDisabled()
{
    return GetCurrentWorker()->IsCoroutineSwitchDisabled();
}

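/**
 * One-time initialization of the coroutine subsystem: computes the stack size
 * limits, spawns the workers together with the MAIN coroutine, and allocates
 * the program completion event.
 *
 * The coroutine count limit is derived from two runtime options. Purely as an
 * illustration (the option values below are hypothetical): with
 * coroutine-stack-size-pages = 64, 4 KiB pages and a 16 MiB stack memory
 * limit, coroStackSizeBytes_ = 64 * 4096 = 256 KiB and coroutineCountLimit_ =
 * 16 MiB / 256 KiB = 64 simultaneously existing coroutines.
 */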
void StackfulCoroutineManager::Initialize(CoroutineManagerConfig config, Runtime *runtime, PandaVM *vm)
{
    // enable stats collection if needed
    if (config.enablePerfStats) {
        stats_.Enable();
    }
    ScopedCoroutineStats s(&stats_, CoroutineTimeStats::INIT);
    // set limits
    coroStackSizeBytes_ = Runtime::GetCurrent()->GetOptions().GetCoroutineStackSizePages() * os::mem::GetPageSize();
    if (coroStackSizeBytes_ != AlignUp(coroStackSizeBytes_, PANDA_POOL_ALIGNMENT_IN_BYTES)) {
        size_t alignmentPages = PANDA_POOL_ALIGNMENT_IN_BYTES / os::mem::GetPageSize();
        LOG(FATAL, COROUTINES) << "Coroutine stack size should be >= " << alignmentPages
                               << " pages and should be aligned to " << alignmentPages << "-page boundary!";
    }
    size_t coroStackAreaSizeBytes = Runtime::GetCurrent()->GetOptions().GetCoroutinesStackMemLimit();
    coroutineCountLimit_ = coroStackAreaSizeBytes / coroStackSizeBytes_;
    jsMode_ = config.emulateJs;

    // create and activate workers
    size_t numberOfAvailableCores = std::max(std::thread::hardware_concurrency() / 4ULL, 2ULL);
    size_t targetNumberOfWorkers =
        (config.workersCount == CoroutineManagerConfig::WORKERS_COUNT_AUTO)
            ? std::min(numberOfAvailableCores, stackful_coroutines::MAX_WORKERS_COUNT)
            : std::min(static_cast<size_t>(config.workersCount), stackful_coroutines::MAX_WORKERS_COUNT);
    if (config.workersCount == CoroutineManagerConfig::WORKERS_COUNT_AUTO) {
        LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager(): AUTO mode selected, will set number of coroutine "
                                  "workers to number of CPUs / 4, but not less than 2 = "
                               << targetNumberOfWorkers;
    }
    os::memory::LockHolder lock(workersLock_);
    CreateWorkers(targetNumberOfWorkers, runtime, vm);
    LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager(): successfully created and activated " << workers_.size()
                           << " coroutine workers";
    programCompletionEvent_ = Runtime::GetCurrent()->GetInternalAllocator()->New<GenericEvent>(this);
}

void StackfulCoroutineManager::Finalize()
{
    os::memory::LockHolder lock(coroPoolLock_);

    auto allocator = Runtime::GetCurrent()->GetInternalAllocator();
    allocator->Delete(programCompletionEvent_);
    for (auto *co : coroutinePool_) {
        co->DestroyInternalResources();
        CoroutineManager::DestroyEntrypointfulCoroutine(co);
    }
    coroutinePool_.clear();
}

void StackfulCoroutineManager::AddToRegistry(Coroutine *co)
{
    os::memory::LockHolder lock(coroListLock_);
    co->GetVM()->GetGC()->OnThreadCreate(co);
    coroutines_.insert(co);
    coroutineCount_++;
}

void StackfulCoroutineManager::RemoveFromRegistry(Coroutine *co)
{
    coroutines_.erase(co);
    coroutineCount_--;
}

void StackfulCoroutineManager::RegisterCoroutine(Coroutine *co)
{
    AddToRegistry(co);
}

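/**
 * Common termination path for every coroutine: leaves native code, updates
 * the status, deregisters the coroutine and releases its GC-related
 * resources. Entrypointful coroutines are then handed to the current worker
 * for finalization, while entrypointless ones (e.g. MAIN) are destroyed
 * manually elsewhere. Always returns false: destruction never happens
 * directly in this method.
 */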
bool StackfulCoroutineManager::TerminateCoroutine(Coroutine *co)
{
    if (co->HasManagedEntrypoint()) {
        // profiling: start interval here, end in ctxswitch after finalization request is done
        GetCurrentWorker()->GetPerfStats().StartInterval(CoroutineTimeStats::SCH_ALL);
    } else {
        // profiling: no need. MAIN and NATIVE EP coros are deleted from the SCHEDULER itself
        LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::TerminateCoroutine(): terminating "
                               << ((GetExistingWorkersCount() == 0) ? "MAIN..." : "NATIVE EP coro...");
    }

    LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::TerminateCoroutine() started";
    co->NativeCodeEnd();
    co->UpdateStatus(ThreadStatus::TERMINATING);

    {
        os::memory::LockHolder lList(coroListLock_);
        RemoveFromRegistry(co);
        // We need to collect the TLAB metrics and clear the TLAB before calling the managed thread destructor,
        // otherwise a heap-use-after-free is possible: the GC's ResetYoungAllocator method iterates over the set
        // of threads, collecting TLAB metrics and clearing TLABs. If this coroutine has already been removed from
        // the set but not destroyed yet, the GC skips it and may complete the TLAB deletion earlier, so reading
        // the TLAB metrics later, in the managed thread destructor, would access freed memory.
        co->CollectTLABMetrics();
        co->ClearTLAB();
        // DestroyInternalResources()/CleanupInternalResources() must be called in the same critical section as
        // RemoveFromRegistry() (under coroListLock_). These functions internally transfer cards from the
        // coroutine's post-barrier buffer to the UpdateRemsetThread. A state where cards still remain but the
        // UpdateRemsetThread can no longer visit the coroutine (because it has already been removed) must be
        // impossible.
        if (Runtime::GetOptions().IsUseCoroutinePool() && co->HasManagedEntrypoint()) {
            co->CleanupInternalResources();
        } else {
            co->DestroyInternalResources();
        }
        co->UpdateStatus(ThreadStatus::FINISHED);
    }
    Runtime::GetCurrent()->GetNotificationManager()->ThreadEndEvent(co);

    if (co->HasManagedEntrypoint()) {
        CheckProgramCompletion();
        GetCurrentWorker()->RequestFinalization(co);
    } else if (co->HasNativeEntrypoint()) {
        GetCurrentWorker()->RequestFinalization(co);
    } else {
        // entrypointless and NOT native: e.g. MAIN
        // (do nothing, as entrypointless coroutines should be destroyed manually)
    }

    return false;
}

size_t StackfulCoroutineManager::GetActiveWorkersCount() const
{
    os::memory::LockHolder lkWorkers(workersLock_);
    return activeWorkersCount_;
}

size_t StackfulCoroutineManager::GetExistingWorkersCount() const
{
    os::memory::LockHolder lkWorkers(workersLock_);
    return workers_.size();
}

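/**
 * Coroutine accounting note: coroutineCount_ includes the MAIN coroutine plus
 * one schedule-loop coroutine per active worker, so "all user coroutines have
 * finished" is equivalent to coroutineCount_ == 1 + GetActiveWorkersCount().
 * When that holds, the program completion event is fired to wake up MAIN.
 */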
void StackfulCoroutineManager::CheckProgramCompletion()
{
    os::memory::LockHolder lkCompletion(programCompletionLock_);

    size_t activeWorkerCoros = GetActiveWorkersCount();
    if (coroutineCount_ == 1 + activeWorkerCoros) { // 1 here is for MAIN
        LOG(DEBUG, COROUTINES)
            << "StackfulCoroutineManager::CheckProgramCompletion(): all coroutines finished execution!";
        // programCompletionEvent_ acts as a stackful-friendly cond var
        programCompletionEvent_->Happen();
    } else {
        LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::CheckProgramCompletion(): still "
                               << coroutineCount_ - 1 - activeWorkerCoros << " coroutines exist...";
    }
}

CoroutineContext *StackfulCoroutineManager::CreateCoroutineContext(bool coroHasEntrypoint)
{
    return CreateCoroutineContextImpl(coroHasEntrypoint);
}

void StackfulCoroutineManager::DeleteCoroutineContext(CoroutineContext *ctx)
{
    FreeCoroutineStack(static_cast<StackfulCoroutineContext *>(ctx)->GetStackLoAddrPtr());
    Runtime::GetCurrent()->GetInternalAllocator()->Delete(ctx);
}

size_t StackfulCoroutineManager::GetCoroutineCount()
{
    return coroutineCount_;
}

size_t StackfulCoroutineManager::GetCoroutineCountLimit()
{
    return coroutineCountLimit_;
}

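/**
 * Public entry point for launching a coroutine with a managed entrypoint.
 * This is a thin profiling wrapper around LaunchImpl(); if the coroutine
 * cannot be created (e.g. the coroutine count limit has been reached or the
 * stack pool is exhausted), an OutOfMemoryError is thrown in the caller's
 * context.
 */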
Coroutine *StackfulCoroutineManager::Launch(CompletionEvent *completionEvent, Method *entrypoint,
                                            PandaVector<Value> &&arguments, CoroutineLaunchMode mode)
{
    // profiling: scheduler and launch time
    ScopedCoroutineStats sSch(&GetCurrentWorker()->GetPerfStats(), CoroutineTimeStats::SCH_ALL);
    ScopedCoroutineStats sLaunch(&GetCurrentWorker()->GetPerfStats(), CoroutineTimeStats::LAUNCH);

    LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::Launch started";

    auto *result = LaunchImpl(completionEvent, entrypoint, std::move(arguments), mode);
    if (result == nullptr) {
        ThrowOutOfMemoryError("Launch failed");
    }

    LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::Launch finished";
    return result;
}

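/**
 * Blocks the current coroutine until the awaitee event happens. If the event
 * has already happened by the time the worker checks it, WaitForEvent()
 * returns false and no context switch takes place. When an actual await did
 * happen, the awaitee may already have been deleted by the time the waiter
 * resumes, so it must not be dereferenced afterwards.
 */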
void StackfulCoroutineManager::Await(CoroutineEvent *awaitee)
{
    // profiling
    ScopedCoroutineStats s(&GetCurrentWorker()->GetPerfStats(), CoroutineTimeStats::SCH_ALL);

    ASSERT(awaitee != nullptr);
    [[maybe_unused]] auto *waiter = Coroutine::GetCurrent();
    LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::Await started by " + waiter->GetName();
    if (!GetCurrentWorker()->WaitForEvent(awaitee)) {
        LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::Await finished (no await happened)";
        return;
    }
    // NB: at this point the awaitee is likely already deleted
    LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::Await finished by " + waiter->GetName();
}

void StackfulCoroutineManager::UnblockWaiters(CoroutineEvent *blocker)
{
    // profiling: this function can be called either independently or as a part of some other SCH sequence,
    // hence using the "weak" stats collector
    ScopedCoroutineStats s(&GetCurrentWorker()->GetPerfStats(), CoroutineTimeStats::SCH_ALL, true);

    os::memory::LockHolder lkWorkers(workersLock_);
    ASSERT(blocker != nullptr);
#ifndef NDEBUG
    {
        os::memory::LockHolder lkBlocker(*blocker);
        ASSERT(blocker->Happened());
    }
#endif

    for (auto *w : workers_) {
        w->UnblockWaiters(blocker);
    }
}

void StackfulCoroutineManager::Schedule()
{
    // profiling
    ScopedCoroutineStats s(&GetCurrentWorker()->GetPerfStats(), CoroutineTimeStats::SCH_ALL);

    LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::Schedule() request from "
                           << Coroutine::GetCurrent()->GetName();
    GetCurrentWorker()->RequestSchedule();
}

bool StackfulCoroutineManager::EnumerateThreadsImpl(const ThreadManager::Callback &cb, unsigned int incMask,
                                                    unsigned int xorMask) const
{
    os::memory::LockHolder lock(coroListLock_);
    for (auto *t : coroutines_) {
        if (!ApplyCallbackToThread(cb, t, incMask, xorMask)) {
            return false;
        }
    }
    return true;
}

void StackfulCoroutineManager::SuspendAllThreads()
{
    os::memory::LockHolder lock(coroListLock_);
    LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::SuspendAllThreads started";
    for (auto *t : coroutines_) {
        t->SuspendImpl(true);
    }
    LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::SuspendAllThreads finished";
}

void StackfulCoroutineManager::ResumeAllThreads()
{
    os::memory::LockHolder lock(coroListLock_);
    for (auto *t : coroutines_) {
        t->ResumeImpl(true);
    }
}

bool StackfulCoroutineManager::IsRunningThreadExist()
{
    UNREACHABLE();
    // NOTE(konstanting): correct implementation. Which coroutine do we consider running?
    return false;
}

void StackfulCoroutineManager::WaitForDeregistration()
{
    // profiling: start interval here, end in ctxswitch (if needed)
    GetCurrentWorker()->GetPerfStats().StartInterval(CoroutineTimeStats::SCH_ALL);
    MainCoroutineCompleted();
}

void StackfulCoroutineManager::ReuseCoroutineInstance(Coroutine *co, CompletionEvent *completionEvent,
                                                      Method *entrypoint, PandaVector<Value> &&arguments,
                                                      PandaString name)
{
    auto *ctx = co->GetContext<CoroutineContext>();
    co->ReInitialize(std::move(name), ctx,
                     Coroutine::ManagedEntrypointInfo {completionEvent, entrypoint, std::move(arguments)});
}

Coroutine *StackfulCoroutineManager::TryGetCoroutineFromPool()
{
    os::memory::LockHolder lkPool(coroPoolLock_);
    if (coroutinePool_.empty()) {
        return nullptr;
    }
    Coroutine *co = coroutinePool_.back();
    coroutinePool_.pop_back();
    return co;
}

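/**
 * Selects the target worker for a freshly launched coroutine: among the
 * workers enabled in the coroutine's affinity mask, the one with the lowest
 * load factor wins. A purely illustrative example (hypothetical values): with
 * four workers, mask 0b0110 and load factors {w0: 1, w1: 3, w2: 2, w3: 0},
 * only w1 and w2 are eligible, and w2 is chosen as the less loaded of the
 * two.
 */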
StackfulCoroutineWorker *StackfulCoroutineManager::ChooseWorkerForCoroutine(Coroutine *co)
{
    ASSERT(co != nullptr);
    // currently this function does only the initial worker appointment
    // but eventually it will support coroutine migration too

    auto maskValue = co->GetContext<StackfulCoroutineContext>()->GetAffinityMask();
    std::bitset<stackful_coroutines::MAX_WORKERS_COUNT> affinityBits(maskValue);
    LOG(DEBUG, COROUTINES) << "Choosing worker for coro " << co->GetName() << " with affinity mask = " << affinityBits;

    // choosing the least loaded worker from the allowed worker set
    auto preferFirstOverSecond = [&affinityBits](const StackfulCoroutineWorker *first,
                                                 const StackfulCoroutineWorker *second) {
        if (!affinityBits.test(first->GetId())) {
            return false;
        }
        if (!affinityBits.test(second->GetId())) {
            return true;
        }
        return first->GetLoadFactor() < second->GetLoadFactor();
    };

    os::memory::LockHolder lkWorkers(workersLock_);
#ifndef NDEBUG
    LOG(DEBUG, COROUTINES) << "Evaluating load factors:";
    for (auto w : workers_) {
        LOG(DEBUG, COROUTINES) << w->GetName() << ": LF = " << w->GetLoadFactor();
    }
#endif
    auto wIt = std::min_element(workers_.begin(), workers_.end(), preferFirstOverSecond);
    LOG(DEBUG, COROUTINES) << "Chose worker: " << (*wIt)->GetName();
    return *wIt;
}

stackful_coroutines::AffinityMask StackfulCoroutineManager::CalcAffinityMaskFromLaunchMode(CoroutineLaunchMode mode)
{
    /**
     * launch mode \ policy  DEFAULT                          NON_MAIN
     * DEFAULT               ->least busy, allow migration    ->least busy, allow migration, disallow <main>
     * SAME_WORKER           ->same, forbid migration         ->same, forbid migration
     * MAIN_WORKER           ->main, forbid migration         ->main, forbid migration
     * EXCLUSIVE             ->least busy, forbid migration   ->least busy, forbid migration, disallow <main>
     */

    if (mode == CoroutineLaunchMode::SAME_WORKER) {
        std::bitset<stackful_coroutines::MAX_WORKERS_COUNT> mask(stackful_coroutines::AFFINITY_MASK_NONE);
        mask.set(GetCurrentWorker()->GetId());
        return mask.to_ullong();
    }

    if (mode == CoroutineLaunchMode::MAIN_WORKER) {
        std::bitset<stackful_coroutines::MAX_WORKERS_COUNT> mask(stackful_coroutines::AFFINITY_MASK_NONE);
        mask.set(stackful_coroutines::MAIN_WORKER_ID);
        return mask.to_ullong();
    }

    // CoroutineLaunchMode::EXCLUSIVE is not supported yet (but will be)
    ASSERT(mode == CoroutineLaunchMode::DEFAULT);

    std::bitset<stackful_coroutines::MAX_WORKERS_COUNT> mask(stackful_coroutines::AFFINITY_MASK_FULL);
    switch (GetSchedulingPolicy()) {
        case CoroutineSchedulingPolicy::NON_MAIN_WORKER: {
            mask.reset(stackful_coroutines::MAIN_WORKER_ID);
            break;
        }
        default:
        case CoroutineSchedulingPolicy::DEFAULT:
            break;
    }
    return mask.to_ullong();
}

Coroutine *StackfulCoroutineManager::LaunchImpl(CompletionEvent *completionEvent, Method *entrypoint,
                                                PandaVector<Value> &&arguments, CoroutineLaunchMode mode)
{
#ifndef NDEBUG
    GetCurrentWorker()->PrintRunnables("LaunchImpl begin");
#endif
    auto coroName = entrypoint->GetFullName();

    Coroutine *co = nullptr;
    if (Runtime::GetOptions().IsUseCoroutinePool()) {
        co = TryGetCoroutineFromPool();
    }
    if (co != nullptr) {
        ReuseCoroutineInstance(co, completionEvent, entrypoint, std::move(arguments), std::move(coroName));
    } else {
        co = CreateCoroutineInstance(completionEvent, entrypoint, std::move(arguments), std::move(coroName));
    }
    if (co == nullptr) {
        LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::LaunchImpl: failed to create a coroutine!";
        return co;
    }
    Runtime::GetCurrent()->GetNotificationManager()->ThreadStartEvent(co);

    auto affinityMask = CalcAffinityMaskFromLaunchMode(mode);
    co->GetContext<StackfulCoroutineContext>()->SetAffinityMask(affinityMask);
    auto *w = ChooseWorkerForCoroutine(co);
    w->AddRunnableCoroutine(co, IsJsMode());

#ifndef NDEBUG
    GetCurrentWorker()->PrintRunnables("LaunchImpl end");
#endif
    return co;
}

void StackfulCoroutineManager::DumpCoroutineStats() const
{
    LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager: dumping performance statistics...";
    os::memory::LockHolder lock(workersLock_);
    PandaVector<CoroutineWorkerStats *> wstats;
    for (auto *worker : workers_) {
        worker->GetPerfStats().Disable();
        wstats.push_back(&worker->GetPerfStats());
    }
    std::cout << "=== Coroutine statistics begin ===" << std::endl;
    std::cout << stats_.GetFullStatistics(std::move(wstats));
    std::cout << "=== Coroutine statistics end ===" << std::endl;
    LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager: performance statistics dumped successfully.";
}

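/**
 * Blocks MAIN until only MAIN itself and the workers' schedule-loop
 * coroutines remain. The ordering below is what prevents a lost wakeup: the
 * completion event is re-armed and locked while programCompletionLock_ is
 * still held, and only then is the lock dropped, so CheckProgramCompletion()
 * cannot signal completion between the count check and the wait. Wakeups may
 * still be spurious, hence the count is re-checked in a loop.
 */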
void StackfulCoroutineManager::WaitForNonMainCoroutinesCompletion()
{
    os::memory::LockHolder lkCompletion(programCompletionLock_);
    auto *main = Coroutine::GetCurrent();
    while (coroutineCount_ > 1 + GetActiveWorkersCount()) { // 1 is for MAIN
        programCompletionEvent_->SetNotHappened();
        programCompletionEvent_->Lock();
        programCompletionLock_.Unlock();
        ScopedManagedCodeThread s(main); // perf?
        GetCurrentWorker()->WaitForEvent(programCompletionEvent_);
        LOG(DEBUG, COROUTINES)
            << "StackfulCoroutineManager::WaitForNonMainCoroutinesCompletion(): possibly spurious wakeup from wait...";
        // NOTE(konstanting, #I67QXC): test for the spurious wakeup
        programCompletionLock_.Lock();
    }
    ASSERT(coroutineCount_ == (1 + GetActiveWorkersCount()));
}

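/**
 * The shutdown sequence for the whole coroutine subsystem, executed on MAIN
 * (which is expected to be in native mode already):
 * 1) wait until all user coroutines finish;
 * 2) move MAIN into the await loop state;
 * 3) deactivate the workers and wait until only MAIN's worker remains active;
 * 4) drain the remaining schedule-loop coroutines on the main worker;
 * 5) dump the performance statistics (if enabled) and delete the workers.
 */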
void StackfulCoroutineManager::MainCoroutineCompleted()
{
    // precondition: MAIN is already in the native mode
    LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::MainCoroutineCompleted(): STARTED";
    // block till only schedule loop coroutines are present
    LOG(DEBUG, COROUTINES)
        << "StackfulCoroutineManager::MainCoroutineCompleted(): waiting for other coroutines to complete";
    WaitForNonMainCoroutinesCompletion();
    // NOTE(konstanting, #I67QXC): correct state transitions for MAIN
    GetCurrentContext()->MainThreadFinished();
    GetCurrentContext()->EnterAwaitLoop();

    LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::MainCoroutineCompleted(): stopping workers";
    {
        os::memory::LockHolder lock(workersLock_);
        for (auto *worker : workers_) {
            worker->SetActive(false);
        }
        while (activeWorkersCount_ > 1) { // 1 is for MAIN
            // profiling: the SCH interval is expected to be started after the ctxswitch
            GetCurrentWorker()->GetPerfStats().FinishInterval(CoroutineTimeStats::SCH_ALL);
            // NOTE(konstanting, #I67QXC): need timed wait?..
            workersCv_.Wait(&workersLock_);
            // profiling: we don't want to profile the sleeping state
            GetCurrentWorker()->GetPerfStats().StartInterval(CoroutineTimeStats::SCH_ALL);
        }
    }

    LOG(DEBUG, COROUTINES)
        << "StackfulCoroutineManager::MainCoroutineCompleted(): stopping await loop on the main worker";
    while (coroutineCount_ > 1) {
        GetCurrentWorker()->FinalizeFiberScheduleLoop();
    }
    // profiling: the SCH interval is expected to be started after the ctxswitch
    GetCurrentWorker()->GetPerfStats().FinishInterval(CoroutineTimeStats::SCH_ALL);

    if (stats_.IsEnabled()) {
        DumpCoroutineStats();
    }
    stats_.Disable();

    LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::MainCoroutineCompleted(): deleting workers";
    {
        os::memory::LockHolder lkWorkers(workersLock_);
        for (auto *worker : workers_) {
            Runtime::GetCurrent()->GetInternalAllocator()->Delete(worker);
        }
        workers_.clear();
    }

    LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::MainCoroutineCompleted(): DONE";
}

StackfulCoroutineContext *StackfulCoroutineManager::GetCurrentContext()
{
    auto *co = Coroutine::GetCurrent();
    return co->GetContext<StackfulCoroutineContext>();
}

StackfulCoroutineWorker *StackfulCoroutineManager::GetCurrentWorker()
{
    return GetCurrentContext()->GetWorker();
}

bool StackfulCoroutineManager::IsMainWorker(Coroutine *co) const
{
    auto *worker = co->GetContext<StackfulCoroutineContext>()->GetWorker();
    return worker->GetId() == stackful_coroutines::MAIN_WORKER_ID;
}

bool StackfulCoroutineManager::IsJsMode()
{
    return jsMode_;
}

void StackfulCoroutineManager::DestroyEntrypointfulCoroutine(Coroutine *co)
{
    if (Runtime::GetOptions().IsUseCoroutinePool() && co->HasManagedEntrypoint()) {
        co->CleanUp();
        os::memory::LockHolder lock(coroPoolLock_);
        coroutinePool_.push_back(co);
    } else {
        CoroutineManager::DestroyEntrypointfulCoroutine(co);
    }
}

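/**
 * Allocates a StackfulCoroutineContext together with its stack, if one is
 * needed. MAIN is created without an entrypoint, so CreateCoroutineContext()
 * passes needStack == false for it and no pool stack is consumed (MAIN keeps
 * running on the native stack of the thread that created it).
 */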
StackfulCoroutineContext *StackfulCoroutineManager::CreateCoroutineContextImpl(bool needStack)
{
    uint8_t *stack = nullptr;
    size_t stackSizeBytes = 0;
    if (needStack) {
        stack = AllocCoroutineStack();
        if (stack == nullptr) {
            return nullptr;
        }
        stackSizeBytes = coroStackSizeBytes_;
    }
    return Runtime::GetCurrent()->GetInternalAllocator()->New<StackfulCoroutineContext>(stack, stackSizeBytes);
}

Coroutine *StackfulCoroutineManager::CreateNativeCoroutine(Runtime *runtime, PandaVM *vm,
                                                           Coroutine::NativeEntrypointInfo::NativeEntrypointFunc entry,
                                                           void *param, PandaString name)
{
    if (GetCoroutineCount() >= GetCoroutineCountLimit()) {
        // resource limit reached
        return nullptr;
    }
    StackfulCoroutineContext *ctx = CreateCoroutineContextImpl(true);
    if (ctx == nullptr) {
        // do not proceed if we cannot create a context for the new coroutine
        return nullptr;
    }
    auto *co = GetCoroutineFactory()(runtime, vm, std::move(name), ctx, Coroutine::NativeEntrypointInfo(entry, param));
    ASSERT(co != nullptr);

    // Let's assume that even the "native" coroutine can eventually try to execute some managed code.
    // In that case pre/post barrier buffers are necessary.
    co->InitBuffers();
    return co;
}

void StackfulCoroutineManager::DestroyNativeCoroutine(Coroutine *co)
{
    DestroyEntrypointlessCoroutine(co);
}

} // namespace ark