/**
 * Copyright (c) 2022-2025 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <algorithm>
#include <limits>
#include "coroutines/stackful_common.h"
#include "libpandabase/macros.h"
#include "libpandabase/os/time.h"
#include "runtime/coroutines/coroutine.h"
#include "runtime/coroutines/stackful_coroutine.h"
#include "runtime/coroutines/stackful_coroutine_manager.h"
#include "runtime/include/panda_vm.h"
#include "runtime/include/runtime.h"
#include "runtime/include/runtime_notification.h"
#include "runtime/include/thread_scopes.h"

namespace ark {

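// Coroutine stacks are carved out of the runtime's mmap-backed pool as
// SPACE_TYPE_NATIVE_STACKS allocations of coroStackSizeBytes_ each; a nullptr
// result means the pool could not satisfy the allocation (the caller,
// CreateCoroutineContextImpl(), checks for this).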
uint8_t *StackfulCoroutineManager::AllocCoroutineStack()
{
    Pool stackPool = PoolManager::GetMmapMemPool()->AllocPool<OSPagesAllocPolicy::NO_POLICY>(
        coroStackSizeBytes_, SpaceType::SPACE_TYPE_NATIVE_STACKS, AllocatorType::NATIVE_STACKS_ALLOCATOR);
    return static_cast<uint8_t *>(stackPool.GetMem());
}

void StackfulCoroutineManager::FreeCoroutineStack(uint8_t *stack)
{
    if (stack != nullptr) {
        PoolManager::GetMmapMemPool()->FreePool(stack, coroStackSizeBytes_);
    }
}

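// Bootstraps the scheduler: creates the MAIN worker (a fiber-based schedule
// loop on the current thread), creates the main coroutine and registers it as
// running on that worker, then spins up `howMany` additional thread-based
// workers.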
void StackfulCoroutineManager::CreateMainCoroAndWorkers(size_t howMany, Runtime *runtime, PandaVM *vm)
{
    auto *wMain = CreateWorker(runtime, vm, StackfulCoroutineWorker::ScheduleLoopType::FIBER, "[main] worker ");
    ASSERT(wMain->GetId() == stackful_coroutines::MAIN_WORKER_ID);

    auto *mainCo = CreateMainCoroutine(runtime, vm);
    wMain->AddRunningCoroutine(mainCo);
    OnWorkerStartupImpl(wMain);

    CreateWorkersImpl(howMany, runtime, vm);
}

void StackfulCoroutineManager::CreateWorkers(size_t howMany, Runtime *runtime, PandaVM *vm)
{
    os::memory::LockHolder lh(workersLock_);
    CreateWorkersImpl(howMany, runtime, vm);
}

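// Shuts down `howMany` workers by posting a CRITICAL_PRIORITY finalizer
// coroutine to each victim. Each finalizer migrates away the movable
// coroutines, completes the affined ones and deactivates its worker; the last
// finalizer to run signals workerFinalizationEvent, after which this function
// waits until activeWorkersCount_ reflects the shutdown.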
void StackfulCoroutineManager::FinalizeWorkers(size_t howMany, Runtime *runtime, PandaVM *vm)
{
    struct EntrypointParam {
        explicit EntrypointParam(size_t wCount, CoroutineManager *coroMan)
            : wCount_(wCount), workerFinalizationEvent(coroMan)
        {
        }

        // NOLINTBEGIN(misc-non-private-member-variables-in-classes)
        const size_t wCount_;
        GenericEvent workerFinalizationEvent;
        std::atomic<uint32_t> finalizedWorkersCount = 0;
        // NOLINTEND(misc-non-private-member-variables-in-classes)
    };

    auto wCountBeforeFinalization = GetActiveWorkersCount();
    ASSERT(wCountBeforeFinalization > howMany);

    auto entrypointParam = EntrypointParam(howMany, this);
    auto coroEntryPoint = [](void *param) {
        auto *finalizee = Coroutine::GetCurrent()->GetContext<StackfulCoroutineContext>()->GetWorker();
        finalizee->MigrateCoroutines();
        finalizee->CompleteAllAffinedCoroutines();
        finalizee->SetActive(false);
        auto *entryParams = reinterpret_cast<EntrypointParam *>(param);
        // Atomic with relaxed order reason: synchronization is not required
        if (entryParams->finalizedWorkersCount.fetch_add(1, std::memory_order_relaxed) == entryParams->wCount_ - 1) {
            entryParams->workerFinalizationEvent.Happen();
        }
    };

    for (auto i = 0U; i < howMany; i++) {
        auto *finWorker = ChooseWorkerForFinalization();
        auto *co = CreateNativeCoroutine(runtime, vm, coroEntryPoint, &entrypointParam, "[finalize coro] ",
                                         Coroutine::Type::FINALIZER, CoroutinePriority::CRITICAL_PRIORITY);
        finWorker->AddRunnableCoroutine(co);
    }
    entrypointParam.workerFinalizationEvent.Lock();

    Await(&entrypointParam.workerFinalizationEvent);

    os::memory::LockHolder lh(workersLock_);
    while (activeWorkersCount_ != wCountBeforeFinalization - howMany) {
        workersCv_.Wait(&workersLock_);
    }
    ASSERT(activeWorkersCount_ > 0);
}

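// Picks a victim worker for FinalizeWorkers(): any non-main, non-exclusive
// worker that is still open for cross-worker launches. The victim is
// immediately disabled for cross-worker launches so that no new coroutines
// (and no second finalizer) can be posted to it.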
StackfulCoroutineWorker *StackfulCoroutineManager::ChooseWorkerForFinalization()
{
    os::memory::LockHolder lh(workersLock_);
    auto finWorkerIt = std::find_if(workers_.begin(), workers_.end(), [](auto &&worker) {
        return !worker->IsMainWorker() && !worker->IsDisabledForCrossWorkersLaunch() && !worker->InExclusiveMode();
    });
    ASSERT(finWorkerIt != workers_.end());
    (*finWorkerIt)->DisableForCrossWorkersLaunch();
    return *finWorkerIt;
}

void StackfulCoroutineManager::CreateWorkersImpl(size_t howMany, Runtime *runtime, PandaVM *vm)
{
    if (howMany == 0) {
        LOG(DEBUG, COROUTINES)
            << "StackfulCoroutineManager::CreateWorkersImpl(): creation of zero workers requested, skipping...";
        return;
    }
    auto wCountBeforeCreation = activeWorkersCount_;
    for (uint32_t i = 0; i < howMany; ++i) {
        CreateWorker(runtime, vm, StackfulCoroutineWorker::ScheduleLoopType::THREAD, "worker ");
    }
    LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::CreateWorkersImpl(): waiting for workers startup";
    while (activeWorkersCount_ != howMany + wCountBeforeCreation) {
        // NOTE(konstanting, #IAD5MH): need timed wait?..
        workersCv_.Wait(&workersLock_);
    }
}

StackfulCoroutineWorker *StackfulCoroutineManager::CreateWorker(Runtime *runtime, PandaVM *vm,
                                                                StackfulCoroutineWorker::ScheduleLoopType wType,
                                                                PandaString workerName)
{
    auto allocator = runtime->GetInternalAllocator();
    auto workerId = AllocateWorkerId();
    workerName += ToPandaString(workerId);
    auto *worker = allocator->New<StackfulCoroutineWorker>(runtime, vm, this, wType, std::move(workerName), workerId);
    ASSERT(worker != nullptr);
    if (stats_.IsEnabled()) {
        worker->GetPerfStats().Enable();
    }
    return worker;
}

void StackfulCoroutineManager::OnWorkerShutdown(StackfulCoroutineWorker *worker)
{
    os::memory::LockHolder lock(workersLock_);
    auto workerIt = std::find_if(workers_.begin(), workers_.end(), [worker](auto &&w) { return w == worker; });
    workers_.erase(workerIt);
    // We may have a problem related to coroutine affinity mask aliasing (the ABA problem), see #23715:
    // 1. A finalizing worker was available for the coroutine (the coroutine had its bit set in the affinity mask)
    // 2. A new specific worker (e.g. an EAWorker) was created with the same Id
    // 3. This new worker becomes available for the coroutine, although that was not initially intended
    ReleaseWorkerId(worker->GetId());
    --activeWorkersCount_;
    auto &workerStats = worker->GetPerfStats();
    workerStats.Disable();
    finalizedWorkerStats_.emplace_back(std::move(workerStats));
    Runtime::GetCurrent()->GetInternalAllocator()->Delete(worker);
    workersCv_.Signal();
    LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::OnWorkerShutdown(): COMPLETED, workers left = "
                           << activeWorkersCount_;
}

void StackfulCoroutineManager::OnWorkerStartup(StackfulCoroutineWorker *worker)
{
    os::memory::LockHolder lock(workersLock_);
    OnWorkerStartupImpl(worker);
}

void StackfulCoroutineManager::OnWorkerStartupImpl(StackfulCoroutineWorker *worker)
{
    workers_.push_back(worker);
    ++activeWorkersCount_;
    workersCv_.Signal();
    LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::OnWorkerStartup(): COMPLETED, active workers = "
                           << activeWorkersCount_;
}

void StackfulCoroutineManager::InitializeWorkerIdAllocator()
{
    os::memory::LockHolder lh(workerIdLock_);
    for (auto id = stackful_coroutines::MAIN_WORKER_ID; id != stackful_coroutines::MAX_WORKERS_COUNT; ++id) {
        freeWorkerIds_.push_back(id);
    }
}

uint32_t StackfulCoroutineManager::AllocateWorkerId()
{
    os::memory::LockHolder lh(workerIdLock_);
    ASSERT(!freeWorkerIds_.empty());
    auto workerId = freeWorkerIds_.front();
    freeWorkerIds_.pop_front();
    return workerId;
}

void StackfulCoroutineManager::ReleaseWorkerId(uint32_t workerId)
{
    os::memory::LockHolder lh(workerIdLock_);
    freeWorkerIds_.push_back(workerId);
    ASSERT(freeWorkerIds_.size() <= stackful_coroutines::MAX_WORKERS_COUNT);
}

void StackfulCoroutineManager::DisableCoroutineSwitch()
{
    GetCurrentWorker()->DisableCoroutineSwitch();
}

void StackfulCoroutineManager::EnableCoroutineSwitch()
{
    GetCurrentWorker()->EnableCoroutineSwitch();
}

bool StackfulCoroutineManager::IsCoroutineSwitchDisabled()
{
    return GetCurrentWorker()->IsCoroutineSwitchDisabled();
}

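// One-time initialization: enables profiling if requested, copies the feature
// flags from the config, derives the coroutine stack size and coroutine count
// limit from the runtime options, computes the worker limits, creates the MAIN
// coroutine plus the common workers, and optionally starts the migration
// manager thread.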
void StackfulCoroutineManager::Initialize(CoroutineManagerConfig config, Runtime *runtime, PandaVM *vm)
{
    // enable stats collection if needed
    if (config.enablePerfStats) {
        stats_.Enable();
    }
    ScopedCoroutineStats s(&stats_, CoroutineTimeStats::INIT);
    // set feature flags
    enableDrainQueueIface_ = config.enableDrainQueueIface;
    enableMigration_ = config.enableMigration;
    migrateAwakenedCoros_ = config.migrateAwakenedCoros;

    // set limits
    coroStackSizeBytes_ = Runtime::GetCurrent()->GetOptions().GetCoroutineStackSizePages() * os::mem::GetPageSize();
    if (coroStackSizeBytes_ != AlignUp(coroStackSizeBytes_, PANDA_POOL_ALIGNMENT_IN_BYTES)) {
        size_t alignmentPages = PANDA_POOL_ALIGNMENT_IN_BYTES / os::mem::GetPageSize();
        LOG(FATAL, COROUTINES) << "Coroutine stack size should be >= " << alignmentPages
                               << " pages and should be aligned to " << alignmentPages << "-page boundary!";
    }
    size_t coroStackAreaSizeBytes = Runtime::GetCurrent()->GetOptions().GetCoroutinesStackMemLimit();
    coroutineCountLimit_ = coroStackAreaSizeBytes / coroStackSizeBytes_;

    CalculateWorkerLimits(config, exclusiveWorkersLimit_, commonWorkersCount_);
    ASSERT(commonWorkersCount_ + exclusiveWorkersLimit_ <= stackful_coroutines::MAX_WORKERS_COUNT);
    InitializeWorkerIdAllocator();
    {
        os::memory::LockHolder lock(workersLock_);
        CreateMainCoroAndWorkers(commonWorkersCount_ - 1, runtime, vm);  // 1 is for MAIN here
        LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager(): successfully created and activated " << workers_.size()
                               << " coroutine workers";
        programCompletionEvent_ = Runtime::GetCurrent()->GetInternalAllocator()->New<GenericEvent>(this);
    }
    if (enableMigration_) {
        StartManagerThread();
    }
}

void StackfulCoroutineManager::Finalize()
{
    os::memory::LockHolder lock(coroPoolLock_);

    auto allocator = Runtime::GetCurrent()->GetInternalAllocator();
    allocator->Delete(programCompletionEvent_);
    for (auto *co : coroutinePool_) {
        co->DestroyInternalResources();
        CoroutineManager::DestroyEntrypointfulCoroutine(co);
    }
    coroutinePool_.clear();
}

void StackfulCoroutineManager::AddToRegistry(Coroutine *co)
{
    co->GetVM()->GetGC()->OnThreadCreate(co);
    coroutines_.insert(co);
    coroutineCount_++;
    if (co->GetType() != Coroutine::Type::MUTATOR) {
        systemCoroutineCount_++;
    }
}

void StackfulCoroutineManager::RemoveFromRegistry(Coroutine *co)
{
    coroutines_.erase(co);
    coroutineCount_--;
    if (co->GetType() != Coroutine::Type::MUTATOR) {
        systemCoroutineCount_--;
    }
}

void StackfulCoroutineManager::RegisterCoroutine(Coroutine *co)
{
    os::memory::LockHolder lock(coroListLock_);
    AddToRegistry(co);
    // Propagate the SUSPEND_REQUEST flag to the new coroutine to avoid the following situation:
    // * Main coro holds the read lock of the MutatorLock.
    // * The GC thread calls SuspendAll, sets the SUSPEND_REQUEST flag for the main coro and
    //   tries to acquire the write lock of the MutatorLock.
    // * Main coro creates a new coro and adds it to the coroutines_ list.
    // * SUSPEND_REQUEST is not set in the new coroutine.
    // * The new coro starts execution, acquires the read lock of the MutatorLock and enters a long loop.
    // * Main coro checks the SUSPEND_REQUEST flag and blocks.
    // * GC will not start because the new coro has no SUSPEND_REQUEST flag and will never release the MutatorLock.
    //
    // We need to propagate SUSPEND_REQUEST under the coroListLock_.
    // It guarantees that either the flag is already set for the current coro and we need to propagate it,
    // or the GC will see the new coro in EnumerateAllThreads.
#ifndef ARK_HYBRID
    if (Thread::GetCurrent() != nullptr && Coroutine::GetCurrent() != nullptr &&
        Coroutine::GetCurrent()->IsSuspended() && !co->IsSuspended()) {
        co->SuspendImpl(true);
    }
#endif
}

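// Terminates a coroutine: removes it from the registry, collects TLAB metrics
// and releases internal resources while still under coroListLock_, and then
// hands the coroutine to the current worker for finalization (coroutines with
// a managed or native entrypoint) or leaves the destruction to the caller
// (entrypointless coroutines such as MAIN).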
bool StackfulCoroutineManager::TerminateCoroutine(Coroutine *co)
{
    if (co->HasManagedEntrypoint()) {
        // profiling: start interval here, end in ctxswitch after finalization request is done
        GetCurrentWorker()->GetPerfStats().StartInterval(CoroutineTimeStats::SCH_ALL);
    } else {
        // profiling: no need. MAIN and NATIVE EP coros are deleted from the SCHEDULER itself
        LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::TerminateCoroutine(): terminating "
                               << ((GetExistingWorkersCount() == 0) ? "MAIN..." : "NATIVE EP coro...");
    }

    LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::TerminateCoroutine() started";
    co->NativeCodeEnd();
    co->UpdateStatus(ThreadStatus::TERMINATING);

    {
        os::memory::LockHolder lList(coroListLock_);
        RemoveFromRegistry(co);
#ifdef ARK_HYBRID
        co->GetThreadHolder()->UnregisterCoroutine(co);
#endif
        // We need to collect the TLAB metrics and clear the TLAB before calling the managed thread destructor
        // to avoid a possible heap use-after-free. It can happen when the GC runs ResetYoungAllocator, which
        // iterates over the set of threads, collects the TLAB metrics and clears the TLABs. If the thread has
        // already been removed from the set but has not been destroyed yet, the GC will not collect its metrics
        // and may complete the TLAB deletion earlier. Then, when we read the TLAB metrics in the managed thread
        // destructor, we will access freed heap memory.
        co->CollectTLABMetrics();
        co->ClearTLAB();
        // DestroyInternalResources()/CleanupInternalResources() must be called in one critical section with
        // RemoveFromRegistry (under coroListLock_). These functions transfer cards from the coro's post_barrier
        // buffer to the UpdateRemsetThread internally. A situation when cards still remain but the
        // UpdateRemsetThread cannot visit the coro (because it is already removed) must be impossible.
        if (Runtime::GetOptions().IsUseCoroutinePool() && co->HasManagedEntrypoint()) {
            co->CleanupInternalResources();
        } else {
            co->DestroyInternalResources();
        }
        co->UpdateStatus(ThreadStatus::FINISHED);
    }
    Runtime::GetCurrent()->GetNotificationManager()->ThreadEndEvent(co);

    if (co->HasManagedEntrypoint()) {
        CheckProgramCompletion();
        GetCurrentWorker()->RequestFinalization(co);
    } else if (co->HasNativeEntrypoint()) {
        GetCurrentWorker()->RequestFinalization(co);
    } else {
        // entrypointless and NOT native: e.g. MAIN
        // (do nothing, as entrypointless coroutines should be destroyed manually)
    }

    return false;
}

size_t StackfulCoroutineManager::GetActiveWorkersCount() const
{
    os::memory::LockHolder lkWorkers(workersLock_);
    return activeWorkersCount_;
}

size_t StackfulCoroutineManager::GetExistingWorkersCount() const
{
    os::memory::LockHolder lkWorkers(workersLock_);
    return workers_.size();
}

void StackfulCoroutineManager::CheckProgramCompletion()
{
    os::memory::LockHolder lkCompletion(programCompletionLock_);

    size_t activeWorkerCoros = GetActiveWorkersCount();
    if (coroutineCount_ <= 1 + activeWorkerCoros) {  // 1 here is for MAIN
        LOG(DEBUG, COROUTINES)
            << "StackfulCoroutineManager::CheckProgramCompletion(): all coroutines finished execution!";
        // programCompletionEvent_ acts as a stackful-friendly cond var
        programCompletionEvent_->Happen();
    } else {
        LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::CheckProgramCompletion(): still "
                               << coroutineCount_ - 1 - activeWorkerCoros << " coroutines exist...";
    }
}

CoroutineContext *StackfulCoroutineManager::CreateCoroutineContext(bool coroHasEntrypoint)
{
    return CreateCoroutineContextImpl(coroHasEntrypoint);
}

void StackfulCoroutineManager::DeleteCoroutineContext(CoroutineContext *ctx)
{
    FreeCoroutineStack(static_cast<StackfulCoroutineContext *>(ctx)->GetStackLoAddrPtr());
    Runtime::GetCurrent()->GetInternalAllocator()->Delete(ctx);
}

size_t StackfulCoroutineManager::GetCoroutineCount()
{
    return coroutineCount_;
}

size_t StackfulCoroutineManager::GetCoroutineCountLimit()
{
    return coroutineCountLimit_;
}

bool StackfulCoroutineManager::Launch(CompletionEvent *completionEvent, Method *entrypoint,
                                      PandaVector<Value> &&arguments, CoroutineLaunchMode mode,
                                      CoroutinePriority priority, bool abortFlag)
{
    auto epInfo = Coroutine::ManagedEntrypointInfo {completionEvent, entrypoint, std::move(arguments)};
    return LaunchWithMode(std::move(epInfo), entrypoint->GetFullName(), mode, priority, false, abortFlag);
}

bool StackfulCoroutineManager::LaunchImmediately(CompletionEvent *completionEvent, Method *entrypoint,
                                                 PandaVector<Value> &&arguments, CoroutineLaunchMode mode,
                                                 CoroutinePriority priority, bool abortFlag)
{
    auto epInfo = Coroutine::ManagedEntrypointInfo {completionEvent, entrypoint, std::move(arguments)};
    return LaunchWithMode(std::move(epInfo), entrypoint->GetFullName(), mode, priority, true, abortFlag);
}

bool StackfulCoroutineManager::LaunchNative(NativeEntrypointFunc epFunc, void *param, PandaString coroName,
                                            CoroutineLaunchMode mode, CoroutinePriority priority, bool abortFlag)
{
    auto epInfo = Coroutine::NativeEntrypointInfo {epFunc, param};
    return LaunchWithMode(epInfo, std::move(coroName), mode, priority, false, abortFlag);
}

void StackfulCoroutineManager::Await(CoroutineEvent *awaitee)
{
    ASSERT_NATIVE_CODE();
    // profiling
    ScopedCoroutineStats s(&GetCurrentWorker()->GetPerfStats(), CoroutineTimeStats::SCH_ALL);

    ASSERT(awaitee != nullptr);
    [[maybe_unused]] auto *waiter = Coroutine::GetCurrent();
    LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::Await started by " + waiter->GetName();

    if (awaitee->Happened()) {
        awaitee->Unlock();
        LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::Await finished (no await happened)";
        return;
    }

    GetCurrentWorker()->WaitForEvent(awaitee);
    // NB: at this point the awaitee is likely already deleted
    LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::Await finished by " + waiter->GetName();
}

void StackfulCoroutineManager::UnblockWaiters(CoroutineEvent *blocker)
{
    // profiling: this function can be called either independently or as a part of some other SCH sequence,
    // hence using the "weak" stats collector
    ScopedCoroutineStats s(&GetCurrentWorker()->GetPerfStats(), CoroutineTimeStats::SCH_ALL, true);

    os::memory::LockHolder lkWorkers(workersLock_);
    ASSERT(blocker != nullptr);
#ifndef NDEBUG
    {
        os::memory::LockHolder lkBlocker(*blocker);
        ASSERT(blocker->Happened());
    }
#endif

    for (auto *w : workers_) {
        w->UnblockWaiters(blocker);
    }
}

void StackfulCoroutineManager::Schedule()
{
    // profiling
    ScopedCoroutineStats s(&GetCurrentWorker()->GetPerfStats(), CoroutineTimeStats::SCH_ALL);

    LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::Schedule() request from "
                           << Coroutine::GetCurrent()->GetName();
    GetCurrentWorker()->RequestSchedule();
}

bool StackfulCoroutineManager::EnumerateThreadsImpl(const ThreadManager::Callback &cb, unsigned int incMask,
                                                    unsigned int xorMask) const
{
    os::memory::LockHolder lock(coroListLock_);
    for (auto *t : coroutines_) {
        if (!ApplyCallbackToThread(cb, t, incMask, xorMask)) {
            return false;
        }
    }
    return true;
}

void StackfulCoroutineManager::SuspendAllThreads()
{
    os::memory::LockHolder lock(coroListLock_);
    LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::SuspendAllThreads started";
    for (auto *t : coroutines_) {
        t->SuspendImpl(true);
    }
    LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::SuspendAllThreads finished";
}

void StackfulCoroutineManager::ResumeAllThreads()
{
    os::memory::LockHolder lock(coroListLock_);
    for (auto *t : coroutines_) {
        t->ResumeImpl(true);
    }
}

bool StackfulCoroutineManager::IsRunningThreadExist()
{
    UNREACHABLE();
    // NOTE(konstanting): needs a correct implementation. Which coroutine do we consider running?
    return false;
}

void StackfulCoroutineManager::WaitForDeregistration()
{
    // profiling: start interval here, end in ctxswitch (if needed)
    GetCurrentWorker()->GetPerfStats().StartInterval(CoroutineTimeStats::SCH_ALL);
    //
    MainCoroutineCompleted();
}

void StackfulCoroutineManager::ReuseCoroutineInstance(Coroutine *co, EntrypointInfo &&epInfo, PandaString name,
                                                      CoroutinePriority priority)
{
    auto *ctx = co->GetContext<CoroutineContext>();
    co->ReInitialize(std::move(name), ctx, std::move(epInfo), priority);
}

Coroutine *StackfulCoroutineManager::TryGetCoroutineFromPool()
{
    os::memory::LockHolder lkPool(coroPoolLock_);
    if (coroutinePool_.empty()) {
        return nullptr;
    }
    Coroutine *co = coroutinePool_.back();
    coroutinePool_.pop_back();
    return co;
}

StackfulCoroutineWorker *StackfulCoroutineManager::ChooseWorkerForCoroutine(Coroutine *co)
{
    ASSERT(co != nullptr);
    auto maskValue = co->GetContext<StackfulCoroutineContext>()->GetAffinityMask();
    return ChooseWorkerImpl(WorkerSelectionPolicy::LEAST_LOADED, maskValue);
}

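// Example: with SAME_WORKER, a launch from worker 3 yields a mask with only
// bit 3 set, so ChooseWorkerImpl() can never place the coroutine elsewhere.
// DEFAULT yields a full mask (minus the MAIN bit under the NON_MAIN_WORKER
// policy), which also keeps the coroutine eligible for migration.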
stackful_coroutines::AffinityMask StackfulCoroutineManager::CalcAffinityMaskFromLaunchMode(CoroutineLaunchMode mode)
{
    /**
     * launch mode \ policy  DEFAULT                          NON_MAIN
     * DEFAULT               ->least busy, allow migration    ->least busy, allow migration, disallow <main>
     * SAME_WORKER           ->same, forbid migration         ->same, forbid migration
     * MAIN_WORKER           ->main, forbid migration         ->main, forbid migration
     * EXCLUSIVE             ->least busy, forbid migration   ->least busy, forbid migration, disallow <main>
     */

    if (mode == CoroutineLaunchMode::SAME_WORKER) {
        std::bitset<stackful_coroutines::MAX_WORKERS_COUNT> mask(stackful_coroutines::AFFINITY_MASK_NONE);
        mask.set(GetCurrentWorker()->GetId());
        return mask.to_ullong();
    }

    if (mode == CoroutineLaunchMode::MAIN_WORKER) {
        std::bitset<stackful_coroutines::MAX_WORKERS_COUNT> mask(stackful_coroutines::AFFINITY_MASK_NONE);
        mask.set(stackful_coroutines::MAIN_WORKER_ID);
        return mask.to_ullong();
    }

    // CoroutineLaunchMode::EXCLUSIVE is not supported yet (but will be)
    ASSERT(mode == CoroutineLaunchMode::DEFAULT);

    std::bitset<stackful_coroutines::MAX_WORKERS_COUNT> mask(stackful_coroutines::AFFINITY_MASK_FULL);
    switch (GetSchedulingPolicy()) {
        case CoroutineSchedulingPolicy::NON_MAIN_WORKER: {
            mask.reset(stackful_coroutines::MAIN_WORKER_ID);
            break;
        }
        default:
        case CoroutineSchedulingPolicy::ANY_WORKER:
            break;
    }
    return mask.to_ullong();
}

Coroutine *StackfulCoroutineManager::GetCoroutineInstanceForLaunch(EntrypointInfo &&epInfo, PandaString &&coroName,
                                                                   CoroutinePriority priority,
                                                                   stackful_coroutines::AffinityMask affinityMask,
                                                                   bool abortFlag)
{
    Coroutine *co = nullptr;
    if (Runtime::GetOptions().IsUseCoroutinePool()) {
        co = TryGetCoroutineFromPool();
    }
    if (co != nullptr) {
        ReuseCoroutineInstance(co, std::move(epInfo), std::move(coroName), priority);
    } else {
        co = CreateCoroutineInstance(std::move(epInfo), std::move(coroName), Coroutine::Type::MUTATOR, priority);
    }
    if (co == nullptr) {
        LOG(DEBUG, COROUTINES)
            << "StackfulCoroutineManager::GetCoroutineInstanceForLaunch: failed to create a coroutine!";
        return co;
    }
    co->SetAbortFlag(abortFlag);
    Runtime::GetCurrent()->GetNotificationManager()->ThreadStartEvent(co);
    co->GetContext<StackfulCoroutineContext>()->SetAffinityMask(affinityMask);
    return co;
}

bool StackfulCoroutineManager::LaunchImpl(EntrypointInfo &&epInfo, PandaString &&coroName, CoroutineLaunchMode mode,
                                          CoroutinePriority priority, bool abortFlag)
{
#ifndef NDEBUG
    GetCurrentWorker()->PrintRunnables("LaunchImpl begin");
#endif
    Coroutine *co = nullptr;
    auto affinityMask = CalcAffinityMaskFromLaunchMode(mode);
    co = GetCoroutineInstanceForLaunch(std::move(epInfo), std::move(coroName), priority, affinityMask, abortFlag);
    if (co == nullptr) {
        LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::LaunchImpl: failed to create a coroutine!";
        return false;
    }
    {
        os::memory::LockHolder lkWorkers(workersLock_);
        auto *w = ChooseWorkerForCoroutine(co);
        ASSERT(w != nullptr);
        w->AddRunnableCoroutine(co);
    }
#ifndef NDEBUG
    GetCurrentWorker()->PrintRunnables("LaunchImpl end");
#endif
    return true;
}

bool StackfulCoroutineManager::LaunchImmediatelyImpl(EntrypointInfo &&epInfo, PandaString &&coroName,
                                                     CoroutineLaunchMode mode, CoroutinePriority priority,
                                                     bool abortFlag)
{
    Coroutine *co = nullptr;
    auto affinityMask = CalcAffinityMaskFromLaunchMode(mode);

    ASSERT(affinityMask == CalcAffinityMaskFromLaunchMode(CoroutineLaunchMode::SAME_WORKER));

    co = GetCoroutineInstanceForLaunch(std::move(epInfo), std::move(coroName), priority, affinityMask, abortFlag);
    if (co == nullptr) {
        LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::LaunchImmediatelyImpl: failed to create a coroutine!";
        return false;
    }
    StackfulCoroutineWorker *w = nullptr;
    {
        os::memory::LockHolder lkWorkers(workersLock_);
        w = ChooseWorkerForCoroutine(co);
    }
    ASSERT(w != nullptr);
    // since we are going to switch the context, we have to close the interval
    GetCurrentWorker()->GetPerfStats().FinishInterval(CoroutineTimeStats::LAUNCH);
    co->SetImmediateLauncher(Coroutine::GetCurrent());
    w->AddCreatedCoroutineAndSwitchToIt(co);
    // resume the interval once we schedule the original coro again
    GetCurrentWorker()->GetPerfStats().StartInterval(CoroutineTimeStats::LAUNCH);

    return true;
}

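// Common launch path: forces SAME_WORKER mode when launching from a worker in
// exclusive mode, then delegates to LaunchImpl() (enqueue as runnable) or
// LaunchImmediatelyImpl() (switch to the new coroutine right away). A false
// result is reported as a coroutines-limit-exceeded error.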
bool StackfulCoroutineManager::LaunchWithMode(Coroutine::EntrypointInfo &&epInfo, PandaString &&coroName,
                                              CoroutineLaunchMode mode, CoroutinePriority priority,
                                              bool launchImmediately, bool abortFlag)
{
    // profiling: scheduler and launch time
    ScopedCoroutineStats sSch(&GetCurrentWorker()->GetPerfStats(), CoroutineTimeStats::SCH_ALL);
    ScopedCoroutineStats sLaunch(&GetCurrentWorker()->GetPerfStats(), CoroutineTimeStats::LAUNCH);

    LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::LaunchWithMode started";

    auto *co = Coroutine::GetCurrent();
    ASSERT(co != nullptr);
    auto *w = co->GetContext<StackfulCoroutineContext>()->GetWorker();
    mode = (mode == CoroutineLaunchMode::DEFAULT && w->InExclusiveMode()) ? CoroutineLaunchMode::SAME_WORKER : mode;
    bool result = false;
    if (launchImmediately) {
        result = LaunchImmediatelyImpl(std::move(epInfo), std::move(coroName), mode, priority, abortFlag);
    } else {
        result = LaunchImpl(std::move(epInfo), std::move(coroName), mode, priority, abortFlag);
    }
    if (!result) {
        // let's count all launch failures as "limit exceeded" for now.
        // Later on we can think of throwing different errors for different reasons.
        ThrowCoroutinesLimitExceedError(
            "Unable to create a new coroutine: reached the limit for the number of existing coroutines.");
    }

    LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::LaunchWithMode finished";
    return result;
}

void StackfulCoroutineManager::DumpCoroutineStats() const
{
    LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager: dumping performance statistics...";
    std::cout << "=== Coroutine statistics begin ===" << std::endl;
    std::cout << stats_.GetFullStatistics(finalizedWorkerStats_);
    std::cout << "=== Coroutine statistics end ===" << std::endl;
    LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager: performance statistics dumped successfully.";
}

void StackfulCoroutineManager::ListUnhandledEventsOnProgramExit()
{
    auto *coro = Coroutine::GetCurrent();
    ASSERT(coro != nullptr);
    coro->ListUnhandledEventsOnProgramExit();
}

void StackfulCoroutineManager::WaitForNonMainCoroutinesCompletion()
{
    os::memory::LockHolder lkCompletion(programCompletionLock_);
    // It's necessary to read activeWorkersCount before coroutineCount to avoid a deadlock
    do {
        while (GetActiveWorkersCount() + 1 < coroutineCount_) {  // 1 is for MAIN
            programCompletionEvent_->SetNotHappened();
            programCompletionEvent_->Lock();
            programCompletionLock_.Unlock();
            GetCurrentWorker()->WaitForEvent(programCompletionEvent_);
            LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::WaitForNonMainCoroutinesCompletion(): possibly "
                                      "spurious wakeup from wait...";
            // NOTE(konstanting, #IAD5MH): test for the spurious wakeup
            programCompletionLock_.Lock();
        }
        ListUnhandledEventsOnProgramExit();
    } while (GetActiveWorkersCount() + 1 < coroutineCount_);  // 1 is for MAIN
    // coroutineCount_ < 1 + GetActiveWorkersCount() in case of a concurrent EWorker destroy;
    // in this case coroutineCount_ >= 1 + GetActiveWorkersCount() - ExclusiveWorkersCount()
    ASSERT(!(GetActiveWorkersCount() + 1 < coroutineCount_));
}

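// Shutdown sequence for MAIN: wait until only MAIN and system coroutines are
// left, stop the migration manager thread, deactivate all workers and wait for
// them to finish, drain the remaining fiber schedule loops, then shut down the
// main worker itself and report the final statistics.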
void StackfulCoroutineManager::MainCoroutineCompleted()
{
    // precondition: MAIN is already in the native mode
    LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::MainCoroutineCompleted(): STARTED";
    // block till only schedule loop coroutines are present
    LOG(DEBUG, COROUTINES)
        << "StackfulCoroutineManager::MainCoroutineCompleted(): waiting for other coroutines to complete";
    WaitForNonMainCoroutinesCompletion();
    if (enableMigration_) {
        LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::MainCoroutineCompleted(): stop manager thread";
        StopManagerThread();
    }

    LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::MainCoroutineCompleted(): stopping workers";
    {
        os::memory::LockHolder lock(workersLock_);
        for (auto *worker : workers_) {
            worker->SetActive(false);
        }
        while (activeWorkersCount_ > 1) {  // 1 is for MAIN
            // profiling: the SCH interval is expected to be started after the ctxswitch
            GetCurrentWorker()->GetPerfStats().FinishInterval(CoroutineTimeStats::SCH_ALL);
            // NOTE(konstanting, #IAD5MH): need timed wait?..
            workersCv_.Wait(&workersLock_);
            // profiling: we don't want to profile the sleeping state
            GetCurrentWorker()->GetPerfStats().StartInterval(CoroutineTimeStats::SCH_ALL);
        }
    }
    // Only system coroutines and the current coro (MAIN) are left (1 is for MAIN)
    ASSERT(activeCoroutines_ == systemCoroutineCount_ + 1);

    LOG(DEBUG, COROUTINES)
        << "StackfulCoroutineManager::MainCoroutineCompleted(): stopping await loop on the main worker";
    while (coroutineCount_ > 1) {
        GetCurrentWorker()->FinalizeFiberScheduleLoop();
    }
    // profiling: the SCH interval is expected to be started after the ctxswitch
    GetCurrentWorker()->GetPerfStats().FinishInterval(CoroutineTimeStats::SCH_ALL);

    OnWorkerShutdown(GetCurrentWorker());

#ifndef NDEBUG
    {
        os::memory::LockHolder lkWorkers(workersLock_);
        ASSERT(workers_.empty());
        ASSERT(activeWorkersCount_ == 0);
    }
#endif

    if (stats_.IsEnabled()) {
        DumpCoroutineStats();
    }
    stats_.Disable();

    // We need to lock programCompletionLock_ here to call
    // programCompletionLock_.Unlock() in ExclusiveWorker before runtime destruction
    os::memory::LockHolder lkCompletion(programCompletionLock_);
    GetCurrentContext()->MainThreadFinished();
    // MAIN finished, all workers are deleted, no active coros remain
    ASSERT(activeCoroutines_ == 0);
    LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::MainCoroutineCompleted(): DONE";
}

StackfulCoroutineContext *StackfulCoroutineManager::GetCurrentContext()
{
    auto *co = Coroutine::GetCurrent();
    ASSERT(co != nullptr);
    return co->GetContext<StackfulCoroutineContext>();
}

StackfulCoroutineWorker *StackfulCoroutineManager::GetCurrentWorker()
{
    return GetCurrentContext()->GetWorker();
}

bool StackfulCoroutineManager::IsMainWorker(Coroutine *co) const
{
    auto *worker = co->GetContext<StackfulCoroutineContext>()->GetWorker();
    return worker->IsMainWorker();
}

void StackfulCoroutineManager::DestroyEntrypointfulCoroutine(Coroutine *co)
{
    if (Runtime::GetOptions().IsUseCoroutinePool() && co->HasManagedEntrypoint()) {
        co->CleanUp();
        os::memory::LockHolder lock(coroPoolLock_);
        coroutinePool_.push_back(co);
    } else {
        CoroutineManager::DestroyEntrypointfulCoroutine(co);
    }
}

StackfulCoroutineContext *StackfulCoroutineManager::CreateCoroutineContextImpl(bool needStack)
{
    uint8_t *stack = nullptr;
    size_t stackSizeBytes = 0;
    if (needStack) {
        stack = AllocCoroutineStack();
        if (stack == nullptr) {
            return nullptr;
        }
        stackSizeBytes = coroStackSizeBytes_;
    }
    return Runtime::GetCurrent()->GetInternalAllocator()->New<StackfulCoroutineContext>(stack, stackSizeBytes);
}

Coroutine *StackfulCoroutineManager::CreateNativeCoroutine(Runtime *runtime, PandaVM *vm,
                                                           Coroutine::NativeEntrypointInfo::NativeEntrypointFunc entry,
                                                           void *param, PandaString name, Coroutine::Type type,
                                                           CoroutinePriority priority)
{
    if (GetCoroutineCount() >= GetCoroutineCountLimit()) {
        // resource limit reached
        return nullptr;
    }
    StackfulCoroutineContext *ctx = CreateCoroutineContextImpl(true);
    if (ctx == nullptr) {
        // do not proceed if we cannot create a context for the new coroutine
        return nullptr;
    }
    auto *co = GetCoroutineFactory()(runtime, vm, std::move(name), ctx, Coroutine::NativeEntrypointInfo(entry, param),
                                     type, priority);
    ASSERT(co != nullptr);

    // Let's assume that even a "native" coroutine can eventually try to execute some managed code.
    // In that case pre/post barrier buffers are necessary.
    co->InitBuffers();
    return co;
}

void StackfulCoroutineManager::DestroyNativeCoroutine(Coroutine *co)
{
    DestroyEntrypointlessCoroutine(co);
}

void StackfulCoroutineManager::OnCoroBecameActive(Coroutine *co)
{
    ASSERT(co->IsActive());
    IncrementActiveCoroutines();
    co->GetWorker()->OnCoroBecameActive(co);
}

void StackfulCoroutineManager::OnCoroBecameNonActive([[maybe_unused]] Coroutine *co)
{
    ASSERT(!co->IsActive());
    DecrementActiveCoroutines();
}

void StackfulCoroutineManager::OnNativeCallExit(Coroutine *co)
{
    if (IsDrainQueueInterfaceEnabled()) {
        // A temporary hack for draining the coroutine queue on the current worker.
        // Will stay here until we have the proper design for the execution model and
        // the rules for interaction with the app framework.
        LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::OnNativeCallExit(): START DRAINING COROQUEUE";

        // precondition: the event is handled for the current coroutine
        ASSERT(co == Coroutine::GetCurrent());
        auto *worker = GetCurrentWorker();
        if (!worker->IsMainWorker() && !worker->InExclusiveMode()) {
            return;
        }
        ScopedNativeCodeThread nativeCode(co);
        while (worker->GetRunnablesCount(Coroutine::Type::MUTATOR) > 0) {
            GetCurrentWorker()->RequestSchedule();
        }

        LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager::OnNativeCallExit(): STOP DRAINING COROQUEUE";
    }
}

void StackfulCoroutineManager::IncrementActiveCoroutines()
{
    activeCoroutines_++;
}

void StackfulCoroutineManager::DecrementActiveCoroutines()
{
    [[maybe_unused]] uint32_t old = activeCoroutines_--;
    ASSERT(old > 0);
}

bool StackfulCoroutineManager::IsNoActiveMutatorsExceptCurrent()
{
    // activeCoroutines_ == 1 means that only the current or the main mutator is left
    // activeCoroutines_ == 0 means that the main mutator is terminating
    // or all coroutines are blocked (a deadlock in managed code happened)

    // NOTE(konstanting): need to reevaluate the necessity of locks here, as taking the difference
    // of two atomics is somewhat confusing. Also, we may have concurrent access to them.
    return (activeCoroutines_ - systemCoroutineCount_) <= 1;
}

Coroutine *StackfulCoroutineManager::CreateExclusiveWorkerForThread(Runtime *runtime, PandaVM *vm)
{
    ASSERT(Thread::GetCurrent() == nullptr);

    // actually we need this lock due to the worker limit
    os::memory::LockHolder eWorkerLock(eWorkerCreationLock_);

    if (IsExclusiveWorkersLimitReached()) {
        LOG(DEBUG, COROUTINES) << "The program reached the limit of exclusive workers";
        return nullptr;
    }

    auto *eWorker = CreateWorker(runtime, vm, StackfulCoroutineWorker::ScheduleLoopType::FIBER, "[e-worker] ");
    ASSERT(eWorker != nullptr);
    eWorker->SetExclusiveMode(true);
    eWorker->DisableForCrossWorkersLaunch();
    auto *eCoro = CreateEntrypointlessCoroutine(runtime, vm, true, "[ea_coro] " + eWorker->GetName(),
                                                Coroutine::Type::MUTATOR, CoroutinePriority::MEDIUM_PRIORITY);
    ASSERT(eCoro != nullptr);
    eWorker->AddRunningCoroutine(eCoro);
    OnWorkerStartup(eWorker);

    ASSERT(Coroutine::GetCurrent() == eCoro);
    return eCoro;
}

bool StackfulCoroutineManager::DestroyExclusiveWorker()
{
    auto *eWorker = GetCurrentWorker();
    if (!eWorker->InExclusiveMode()) {
        LOG(DEBUG, COROUTINES) << "Trying to destroy a non-exclusive worker";
        return false;
    }

    eWorker->CompleteAllAffinedCoroutines();

    eWorker->SetActive(false);
    eWorker->FinalizeFiberScheduleLoop();

    CheckProgramCompletion();

    auto *eaCoro = Coroutine::GetCurrent();
    programCompletionLock_.Lock();
    DestroyEntrypointlessCoroutine(eaCoro);
    Coroutine::SetCurrent(nullptr);

    OnWorkerShutdown(eWorker);
    programCompletionLock_.Unlock();
    return true;
}

bool StackfulCoroutineManager::IsExclusiveWorkersLimitReached() const
{
    bool limitIsReached = GetActiveWorkersCount() - commonWorkersCount_ >= exclusiveWorkersLimit_;
    LOG_IF(limitIsReached, DEBUG, COROUTINES) << "The program reached the limit of exclusive workers";
    return limitIsReached;
}

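// Background loop of the migration manager thread: it wakes up either on a
// TriggerMigration() signal (to pull coroutines inward from the most loaded
// worker) or once per detection interval (to push coroutines outward from
// workers whose OS thread appears to be blocked).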
void StackfulCoroutineManager::ManagerThreadProc()
{
    // compute the time of the next blocked-workers check (one detection interval from now)
    auto nextCheckTime = ark::os::time::GetClockTimeInMilli() + DETECTION_INTERVAL_VALUE;
    while (managerRunning_) {
        {
            os::memory::LockHolder lock(managerMutex_);
            managerCv_.TimedWait(&managerMutex_, DETECTION_INTERVAL_VALUE);  // wait for one detection interval
            if (!managerRunning_) {
                LOG(DEBUG, COROUTINES) << "The manager thread stops running.";
                break;
            }
        }
        uint32_t count = migrateCount_.exchange(0);
        while (count > 0) {
            MigrateCoroutinesInward(count);
        }
        auto now = ark::os::time::GetClockTimeInMilli();
        if (now >= nextCheckTime) {
            CheckForBlockedWorkers();
            nextCheckTime = now + DETECTION_INTERVAL_VALUE;  // schedule the next check
        }
    }
}

void StackfulCoroutineManager::CheckForBlockedWorkers()
{
    os::memory::LockHolder lock(workersLock_);
    ASSERT(!workers_.empty());
    for (auto *w : workers_) {
        // skip the main worker and exclusive workers
        if (w->IsMainWorker() || w->InExclusiveMode()) {
            continue;
        }
        w->MigrateCorosOutwardsIfBlocked();
    }
}

void StackfulCoroutineManager::MigrateCoroutinesInward(uint32_t &count)
{
    auto affinityMask = CalcAffinityMaskFromLaunchMode(CoroutineLaunchMode::DEFAULT);
    os::memory::LockHolder lkWorkers(workersLock_);
    StackfulCoroutineWorker *from = ChooseWorkerImpl(WorkerSelectionPolicy::MOST_LOADED, affinityMask);
    if (from == nullptr || from->IsIdle()) {
        LOG(DEBUG, COROUTINES) << "no suitable worker.";
        count = 0;
        return;
    }

    for (auto *w : workers_) {
        // skip the main worker and exclusive workers
        if (w->IsMainWorker() || w->InExclusiveMode()) {
            continue;
        }
        if (w->MigrateFrom(from)) {
            --count;
            return;
        }
    }
    count = 0;
}

bool StackfulCoroutineManager::MigrateCoroutinesOutward(StackfulCoroutineWorker *from)
{
    if (from->IsIdle()) {
        LOG(DEBUG, COROUTINES) << "The worker is idle, stop migration outward.";
        return false;
    }
    auto affinityMask = CalcAffinityMaskFromLaunchMode(CoroutineLaunchMode::DEFAULT);
    os::memory::LockHolder lkWorkers(workersLock_);
    StackfulCoroutineWorker *to = ChooseWorkerImpl(WorkerSelectionPolicy::LEAST_LOADED, affinityMask);
    if (to == nullptr || to == from) {
        LOG(DEBUG, COROUTINES) << "no suitable worker.";
        return false;
    }
    from->MigrateTo(to);
    return true;
}

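// Selects a worker by load factor among the workers permitted by the affinity
// mask: LEAST_LOADED picks the minimum load factor, MOST_LOADED the maximum.
// Workers outside the mask always lose the comparison, and workers disabled
// for cross-worker launches are considered only if the mask explicitly allows
// them.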
StackfulCoroutineWorker *StackfulCoroutineManager::ChooseWorkerImpl(WorkerSelectionPolicy policy, size_t maskValue)
{
    std::bitset<stackful_coroutines::MAX_WORKERS_COUNT> affinityBits(maskValue);
    auto preferFirstOverSecond = [&affinityBits, &policy](const StackfulCoroutineWorker *first,
                                                          const StackfulCoroutineWorker *second) {
        if (!affinityBits.test(first->GetId())) {
            return false;
        }
        if (!affinityBits.test(second->GetId())) {
            return true;
        }
        // choosing the least loaded worker from the allowed worker set
        if (policy == WorkerSelectionPolicy::LEAST_LOADED) {
            return first->GetLoadFactor() < second->GetLoadFactor();
        }
        // choosing the most loaded worker from the allowed worker set
        return first->GetLoadFactor() > second->GetLoadFactor();
    };

    if (workers_.empty()) {
        LOG(DEBUG, COROUTINES) << "the worker list is empty.";
        return nullptr;
    }
#ifndef NDEBUG
    LOG(DEBUG, COROUTINES) << "Evaluating load factors:";
    for (auto w : workers_) {
        LOG(DEBUG, COROUTINES) << w->GetName() << ": LF = " << w->GetLoadFactor();
    }
#endif
    std::vector<StackfulCoroutineWorker *> suitableWorkers;
    // skip exclusive and finalizing workers
    std::copy_if(workers_.begin(), workers_.end(), std::back_inserter(suitableWorkers), [affinityBits](auto *w) {
        return !w->IsDisabledForCrossWorkersLaunch() || affinityBits.test(w->GetId());
    });
    ASSERT(!suitableWorkers.empty());

    auto wIt = std::min_element(suitableWorkers.begin(), suitableWorkers.end(), preferFirstOverSecond);
    LOG(DEBUG, COROUTINES) << "Chose worker: " << (*wIt)->GetName();

    return *wIt;
}

void StackfulCoroutineManager::TriggerMigration()
{
    auto *worker = GetCurrentWorker();
    if (worker->IsMainWorker() || worker->InExclusiveMode()) {
        return;
    }
    if (!IsMigrationEnabled()) {
        LOG(DEBUG, COROUTINES) << "Migration is not supported.";
        return;
    }
    ++migrateCount_;
    LOG(DEBUG, COROUTINES) << "trigger migration.";
    os::memory::LockHolder lock(managerMutex_);
    managerCv_.Signal();
}

void StackfulCoroutineManager::MigrateAwakenedCoro(Coroutine *co)
{
    os::memory::LockHolder lkWorkers(workersLock_);
    auto *w = ChooseWorkerForCoroutine(co);
    ASSERT(w != nullptr);
    w->AddRunnableCoroutine(co);
}

void StackfulCoroutineManager::StartManagerThread()
{
    // create a thread to detect worker blocking and perform coroutine migration
    managerRunning_ = true;
    managerThread_ = std::thread(&StackfulCoroutineManager::ManagerThreadProc, this);
    os::thread::SetThreadName(managerThread_.native_handle(), "managerThread");
}

void StackfulCoroutineManager::StopManagerThread()
{
    if (!managerRunning_) {
        return;
    }
    {
        os::memory::LockHolder lock(managerMutex_);
        managerRunning_ = false;
        managerCv_.SignalAll();
    }
    if (managerThread_.joinable()) {
        managerThread_.join();
    }
}

void StackfulCoroutineManager::PreZygoteFork()
{
    WaitForNonMainCoroutinesCompletion();
    if (enableMigration_) {
        StopManagerThread();
    }

    FinalizeWorkers(commonWorkersCount_ - 1, Runtime::GetCurrent(), Runtime::GetCurrent()->GetPandaVM());
}

void StackfulCoroutineManager::PostZygoteFork()
{
    os::memory::LockHolder lh(workersLock_);
    Runtime *runtime = Runtime::GetCurrent();
    // workersLock_ is already held here, so go through CreateWorkersImpl() directly:
    // calling CreateWorkers() would try to re-acquire the lock
    CreateWorkersImpl(commonWorkersCount_ - 1, runtime, runtime->GetPandaVM());
    if (enableMigration_) {
        StartManagerThread();
    }
}

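// Computes how many exclusive and common workers the manager may create.
// The exclusive-worker limit comes from the config (optionally extended for
// taskpool interop), while the common-worker count is either taken from the
// config or derived from the CPU count, and both are clamped so that their
// sum never exceeds stackful_coroutines::MAX_WORKERS_COUNT.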
void StackfulCoroutineManager::CalculateWorkerLimits(const CoroutineManagerConfig &config,
                                                     size_t &exclusiveWorkersLimit, size_t &commonWorkersLimit)
{
    // 1 is for MAIN
    size_t eWorkersLimit =
        std::min(stackful_coroutines::MAX_WORKERS_COUNT - 1, static_cast<size_t>(config.exclusiveWorkersLimit));

#ifdef PANDA_ETS_INTEROP_JS
    // reserve extra slots for the taskpool execution engine EAWorkers
    bool res = Runtime::GetOptions().IsTaskpoolSupportInterop(plugins::LangToRuntimeType(panda_file::SourceLang::ETS));
    if (res) {
        eWorkersLimit += stackful_coroutines::TASKPOOL_EAWORKER_LIMIT;
    }
#endif
    // create and activate workers
    size_t numberOfAvailableCores = std::max(std::thread::hardware_concurrency() / 4ULL, 2ULL);

    // workaround for issue #21582
    const size_t maxCommonWorkers =
        std::max(stackful_coroutines::MAX_WORKERS_COUNT - eWorkersLimit, static_cast<size_t>(2ULL));

    commonWorkersLimit = (config.workersCount == CoroutineManagerConfig::WORKERS_COUNT_AUTO)
                             ? std::min(numberOfAvailableCores, maxCommonWorkers)
                             : std::min(static_cast<size_t>(config.workersCount), maxCommonWorkers);
    if (config.workersCount == CoroutineManagerConfig::WORKERS_COUNT_AUTO) {
        LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager(): AUTO mode selected, will set the number of coroutine "
                                  "common workers to the number of CPUs / 4, but not less than 2 and no more than "
                               << maxCommonWorkers << " = " << commonWorkersLimit;
    }
    ASSERT(commonWorkersLimit > 0);

    exclusiveWorkersLimit = std::min(stackful_coroutines::MAX_WORKERS_COUNT - commonWorkersLimit, eWorkersLimit);

    LOG(DEBUG, COROUTINES) << "StackfulCoroutineManager(): EWorkers limit is set to " << exclusiveWorkersLimit
                           << ", when " << eWorkersLimit << " was suggested";
}
}  // namespace ark