1 /**
2 * Copyright (c) 2022-2025 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include <thread>
17 #include "runtime/coroutines/coroutine.h"
18 #include "runtime/include/thread_scopes.h"
19 #include "libpandabase/os/mutex.h"
20 #include "runtime/include/runtime.h"
21 #include "runtime/include/runtime_notification.h"
22 #include "runtime/include/panda_vm.h"
23 #include "runtime/coroutines/threaded_coroutine.h"
24 #include "runtime/coroutines/threaded_coroutine_manager.h"
25
26 namespace ark {
27
Initialize(CoroutineManagerConfig config,Runtime * runtime,PandaVM * vm)28 void ThreadedCoroutineManager::Initialize(CoroutineManagerConfig config, Runtime *runtime, PandaVM *vm)
29 {
30 // create and activate workers
31 size_t numberOfAvailableCores = std::max(std::thread::hardware_concurrency() / 4ULL, 2ULL);
32 size_t targetNumberOfWorkers = (config.workersCount == CoroutineManagerConfig::WORKERS_COUNT_AUTO)
33 ? numberOfAvailableCores
34 : config.workersCount;
35 if (config.workersCount == CoroutineManagerConfig::WORKERS_COUNT_AUTO) {
36 LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager(): AUTO mode selected, will set number of coroutine "
37 "workers to number of CPUs / 4, but not less than 2 = "
38 << targetNumberOfWorkers;
39 }
40 os::memory::LockHolder lock(workersLock_);
41 CreateWorkers(targetNumberOfWorkers, runtime, vm);
42 LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager(): successfully created and activated " << workers_.size()
43 << " coroutine workers";
44 }
45
GetActiveWorkersCount() const46 uint32_t ThreadedCoroutineManager::GetActiveWorkersCount() const
47 {
48 os::memory::LockHolder lkWorkers(workersLock_);
49 return activeWorkersCount_;
50 }
51
CreateWorkers(size_t howMany,Runtime * runtime,PandaVM * vm)52 void ThreadedCoroutineManager::CreateWorkers(size_t howMany, Runtime *runtime, PandaVM *vm)
53 {
54 auto allocator = runtime->GetInternalAllocator();
55 for (size_t id = 0; id < howMany; ++id) {
56 auto *w = allocator->New<CoroutineWorker>(runtime, vm);
57 workers_.push_back(w);
58 ASSERT(workers_[id] == w);
59 }
60 activeWorkersCount_ = howMany;
61
62 auto *mainCo = CreateMainCoroutine(runtime, vm);
63 mainCo->SetWorker(workers_[0]);
64 Coroutine::SetCurrent(mainCo);
65 }
66
CreateCoroutineContext(bool coroHasEntrypoint)67 CoroutineContext *ThreadedCoroutineManager::CreateCoroutineContext([[maybe_unused]] bool coroHasEntrypoint)
68 {
69 auto alloc = Runtime::GetCurrent()->GetInternalAllocator();
70 return alloc->New<ThreadedCoroutineContext>();
71 }
72
DeleteCoroutineContext(CoroutineContext * ctx)73 void ThreadedCoroutineManager::DeleteCoroutineContext(CoroutineContext *ctx)
74 {
75 auto alloc = Runtime::GetCurrent()->GetInternalAllocator();
76 alloc->Delete(ctx);
77 }
78
GetCoroutineCount()79 size_t ThreadedCoroutineManager::GetCoroutineCount()
80 {
81 return coroutineCount_;
82 }
83
GetCoroutineCountLimit()84 size_t ThreadedCoroutineManager::GetCoroutineCountLimit()
85 {
86 return UINT_MAX;
87 }
88
AddToRegistry(Coroutine * co)89 void ThreadedCoroutineManager::AddToRegistry(Coroutine *co)
90 {
91 co->GetVM()->GetGC()->OnThreadCreate(co);
92 coroutines_.insert(co);
93 coroutineCount_++;
94 }
95
RemoveFromRegistry(Coroutine * co)96 void ThreadedCoroutineManager::RemoveFromRegistry(Coroutine *co)
97 {
98 coroutines_.erase(co);
99 coroutineCount_--;
100 }
101
DeleteCoroutineInstance(Coroutine * co)102 void ThreadedCoroutineManager::DeleteCoroutineInstance(Coroutine *co)
103 {
104 auto *context = co->GetContext<CoroutineContext>();
105 Runtime::GetCurrent()->GetInternalAllocator()->Delete(co);
106 DeleteCoroutineContext(context);
107 }
108
RegisterCoroutine(Coroutine * co)109 void ThreadedCoroutineManager::RegisterCoroutine(Coroutine *co)
110 {
111 os::memory::LockHolder lock(coroListLock_);
112 AddToRegistry(co);
113 // Propagate SUSPEND_REQUEST flag to the new coroutine to avoid the following situation:
114 // * Main coro holds read lock of the MutatorLock.
115 // * GC thread calls SuspendAll nad set SUSPEND_REQUEST flag to the main coro and
116 // tries to acquire write lock of the MutatorLock.
117 // * Main coro creates a new coro and adds it to the coroutines_ list.
118 // * SUSPEND_REQUEST is not set in the new coroutine
119 // * New coro starts execution, acquires read lock of the MutatorLock and enters a long loop
120 // * Main coro checks SUSPEND_REQUEST flag and blocks
121 // * GC will not start becuase the new coro has no SUSPEND_REQUEST flag and it will never release the MutatorLock
122 //
123 // We need to propagate SUSPEND_REQUEST under the coroListLock_.
124 // It guarantees that the flag is already set for the current coro and we need to propagate it
125 // or GC will see the new coro in EnumerateAllThreads.
126 if (Thread::GetCurrent() != nullptr && Coroutine::GetCurrent() != nullptr &&
127 Coroutine::GetCurrent()->IsSuspended() && !co->IsSuspended()) {
128 co->SuspendImpl(true);
129 }
130 }
131
TerminateCoroutine(Coroutine * co)132 bool ThreadedCoroutineManager::TerminateCoroutine(Coroutine *co)
133 {
134 LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::TerminateCoroutine() started";
135 co->NativeCodeEnd();
136 co->UpdateStatus(ThreadStatus::TERMINATING);
137
138 os::memory::LockHolder l(coroSwitchLock_);
139 if (co->HasManagedEntrypoint()) {
140 // entrypointless coros should be destroyed manually
141 if (RunnableCoroutinesExist()) {
142 ScheduleNextCoroutine();
143 } else {
144 --runningCorosCount_;
145 }
146 }
147
148 {
149 os::memory::LockHolder lList(coroListLock_);
150 RemoveFromRegistry(co);
151 // We need collect TLAB metrics and clear TLAB before calling the manage thread destructor
152 // because of the possibility heap use after free. This happening when GC starts execute ResetYoungAllocator
153 // method which start iterate set of threads, collect TLAB metrics and clear TLAB. If thread was deleted from
154 // set but we haven't destroyed the thread yet, GC won't collect metrics and can complete TLAB
155 // deletion faster. And when we try to get the TLAB metrics in the destructor of managed thread, we will get
156 // heap use after free
157 co->CollectTLABMetrics();
158 co->ClearTLAB();
159 // DestroyInternalResources() must be called in one critical section with
160 // RemoveFromRegistry (under core_list_lock_). This function transfers cards from coro's post_barrier buffer to
161 // UpdateRemsetThread internally. Situation when cards still remain and UpdateRemsetThread cannot visit the
162 // coro (because it is already removed) must be impossible.
163 co->DestroyInternalResources();
164 }
165 co->UpdateStatus(ThreadStatus::FINISHED);
166 Runtime::GetCurrent()->GetNotificationManager()->ThreadEndEvent(co);
167
168 if (!co->HasManagedEntrypoint()) {
169 // entrypointless coros should be destroyed manually
170 return false;
171 }
172
173 DeleteCoroutineInstance(co);
174
175 os::memory::LockHolder lk(cvAwaitAllMutex_);
176 cvAwaitAll_.Signal();
177 return true;
178 // NOTE(konstanting): issue debug notifications to runtime
179 }
180
Launch(CompletionEvent * completionEvent,Method * entrypoint,PandaVector<Value> && arguments,CoroutineLaunchMode mode,CoroutinePriority priority,bool abortFlag)181 bool ThreadedCoroutineManager::Launch(CompletionEvent *completionEvent, Method *entrypoint,
182 PandaVector<Value> &&arguments, [[maybe_unused]] CoroutineLaunchMode mode,
183 CoroutinePriority priority, [[maybe_unused]] bool abortFlag)
184 {
185 LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::Launch started";
186 auto epInfo = Coroutine::ManagedEntrypointInfo {completionEvent, entrypoint, std::move(arguments)};
187 bool result = LaunchImpl(std::move(epInfo), entrypoint->GetFullName(), priority);
188 if (!result) {
189 // let's count all launch failures as "limit exceeded" for now.
190 // Later on we can think of throwing different errors for different reasons.
191 ThrowCoroutinesLimitExceedError(
192 "Unable to create a new coroutine: reached the limit for the number of existing coroutines.");
193 }
194
195 LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::Launch finished";
196 return result;
197 }
198
LaunchImmediately(CompletionEvent * completionEvent,Method * entrypoint,PandaVector<Value> && arguments,CoroutineLaunchMode mode,CoroutinePriority priority,bool abortFlag)199 bool ThreadedCoroutineManager::LaunchImmediately([[maybe_unused]] CompletionEvent *completionEvent,
200 [[maybe_unused]] Method *entrypoint,
201 [[maybe_unused]] PandaVector<Value> &&arguments,
202 [[maybe_unused]] CoroutineLaunchMode mode,
203 [[maybe_unused]] CoroutinePriority priority,
204 [[maybe_unused]] bool abortFlag)
205 {
206 LOG(FATAL, COROUTINES) << "ThreadedCoroutineManager::LaunchImmediately not supported";
207 return false;
208 }
209
LaunchNative(NativeEntrypointFunc epFunc,void * param,PandaString coroName,CoroutineLaunchMode mode,CoroutinePriority priority,bool abortFlag)210 bool ThreadedCoroutineManager::LaunchNative(NativeEntrypointFunc epFunc, void *param, PandaString coroName,
211 [[maybe_unused]] CoroutineLaunchMode mode, CoroutinePriority priority,
212 [[maybe_unused]] bool abortFlag)
213 {
214 LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::LaunchNative started";
215 auto epInfo = Coroutine::NativeEntrypointInfo {epFunc, param};
216 bool result = LaunchImpl(epInfo, std::move(coroName), priority);
217 if (!result) {
218 ThrowOutOfMemoryError("Launch failed");
219 }
220
221 LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::LaunchNative finished";
222 return result;
223 }
224
RegisterWaiter(Coroutine * waiter,CoroutineEvent * awaitee)225 bool ThreadedCoroutineManager::RegisterWaiter(Coroutine *waiter, CoroutineEvent *awaitee)
226 {
227 os::memory::LockHolder l(waitersLock_);
228 if (awaitee->Happened()) {
229 awaitee->Unlock();
230 return false;
231 }
232
233 awaitee->Unlock();
234 LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::RegisterAsAwaitee: " << waiter->GetName() << " AWAITS";
235 [[maybe_unused]] auto [_, inserted] = waiters_.insert({awaitee, waiter});
236 ASSERT(inserted);
237 return true;
238 }
239
Await(CoroutineEvent * awaitee)240 void ThreadedCoroutineManager::Await(CoroutineEvent *awaitee)
241 {
242 ASSERT(awaitee != nullptr);
243 ASSERT_NATIVE_CODE();
244 LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::Await started";
245
246 auto *waiter = Coroutine::GetCurrent();
247 ASSERT(waiter != nullptr);
248 auto *waiterCtx = waiter->GetContext<ThreadedCoroutineContext>();
249
250 coroSwitchLock_.Lock();
251
252 if (!RegisterWaiter(waiter, awaitee)) {
253 LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::Await finished (no await happened)";
254 coroSwitchLock_.Unlock();
255 return;
256 }
257
258 waiter->RequestSuspend(true);
259 if (RunnableCoroutinesExist()) {
260 ScheduleNextCoroutine();
261 }
262 coroSwitchLock_.Unlock();
263 waiterCtx->WaitUntilResumed();
264
265 // NB: at this point the awaitee is already deleted
266 LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::Await finished";
267 }
268
UnblockWaiters(CoroutineEvent * blocker)269 void ThreadedCoroutineManager::UnblockWaiters(CoroutineEvent *blocker)
270 {
271 os::memory::LockHolder lh(coroSwitchLock_);
272 UnblockWaitersImpl(blocker);
273 }
274
UnblockWaitersImpl(CoroutineEvent * blocker)275 void ThreadedCoroutineManager::UnblockWaitersImpl(CoroutineEvent *blocker)
276 {
277 os::memory::LockHolder l(waitersLock_);
278 ASSERT(blocker != nullptr);
279 #ifndef NDEBUG
280 {
281 os::memory::LockHolder lkBlocker(*blocker);
282 ASSERT(blocker->Happened());
283 }
284 #endif
285 auto w = waiters_.find(blocker);
286 if (w != waiters_.end()) {
287 auto *coro = w->second;
288 waiters_.erase(w);
289 coro->RequestUnblock();
290 PushToRunnableQueue(coro, coro->GetPriority());
291 w = waiters_.find(blocker);
292 }
293 }
294
Schedule()295 void ThreadedCoroutineManager::Schedule()
296 {
297 LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::Schedule() request from "
298 << Coroutine::GetCurrent()->GetName();
299 ScheduleImpl();
300 }
301
EnumerateThreadsImpl(const ThreadManager::Callback & cb,unsigned int incMask,unsigned int xorMask) const302 bool ThreadedCoroutineManager::EnumerateThreadsImpl(const ThreadManager::Callback &cb, unsigned int incMask,
303 unsigned int xorMask) const
304 {
305 os::memory::LockHolder lock(coroListLock_);
306 for (auto *t : coroutines_) {
307 if (!ApplyCallbackToThread(cb, t, incMask, xorMask)) {
308 return false;
309 }
310 }
311 return true;
312 }
313
SuspendAllThreads()314 void ThreadedCoroutineManager::SuspendAllThreads()
315 {
316 os::memory::LockHolder lList(coroListLock_);
317 LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::SuspendAllThreads started";
318 for (auto *t : coroutines_) {
319 t->SuspendImpl(true);
320 }
321 LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::SuspendAllThreads finished";
322 }
323
ResumeAllThreads()324 void ThreadedCoroutineManager::ResumeAllThreads()
325 {
326 os::memory::LockHolder lock(coroListLock_);
327 for (auto *t : coroutines_) {
328 t->ResumeImpl(true);
329 }
330 }
331
IsRunningThreadExist()332 bool ThreadedCoroutineManager::IsRunningThreadExist()
333 {
334 UNREACHABLE();
335 // NOTE(konstanting): correct implementation. Which coroutine do we consider running?
336 return false;
337 }
338
WaitForDeregistration()339 void ThreadedCoroutineManager::WaitForDeregistration()
340 {
341 MainCoroutineCompleted();
342 }
343
344 #ifndef NDEBUG
PrintRunnableQueue(const PandaString & requester)345 void ThreadedCoroutineManager::PrintRunnableQueue(const PandaString &requester)
346 {
347 LOG(DEBUG, COROUTINES) << "[" << requester << "] ";
348 runnablesQueue_.IterateOverCoroutines([](Coroutine *co) { LOG(DEBUG, COROUTINES) << co->GetName() << " <"; });
349 LOG(DEBUG, COROUTINES) << "X";
350 }
351 #endif
352
PushToRunnableQueue(Coroutine * co,CoroutinePriority priority)353 void ThreadedCoroutineManager::PushToRunnableQueue(Coroutine *co, CoroutinePriority priority)
354 {
355 runnablesQueue_.Push(co, priority);
356 }
357
RunnableCoroutinesExist()358 bool ThreadedCoroutineManager::RunnableCoroutinesExist()
359 {
360 return !runnablesQueue_.Empty();
361 }
362
PopFromRunnableQueue()363 Coroutine *ThreadedCoroutineManager::PopFromRunnableQueue()
364 {
365 auto [co, _] = runnablesQueue_.Pop();
366 return co;
367 }
368
ScheduleNextCoroutine()369 void ThreadedCoroutineManager::ScheduleNextCoroutine()
370 {
371 Coroutine *nextCoroutine = PopFromRunnableQueue();
372 nextCoroutine->RequestResume();
373 }
374
ScheduleImpl()375 void ThreadedCoroutineManager::ScheduleImpl()
376 {
377 ASSERT_NATIVE_CODE();
378 auto *currentCo = Coroutine::GetCurrent();
379 ASSERT(currentCo != nullptr);
380 auto *currentCtx = currentCo->GetContext<ThreadedCoroutineContext>();
381
382 coroSwitchLock_.Lock();
383 if (RunnableCoroutinesExist()) {
384 currentCo->RequestSuspend(false);
385 PushToRunnableQueue(currentCo, CoroutinePriority::LOW_PRIORITY);
386 ScheduleNextCoroutine();
387
388 coroSwitchLock_.Unlock();
389 currentCtx->WaitUntilResumed();
390 } else {
391 coroSwitchLock_.Unlock();
392 }
393 }
394
ChooseWorkerForCoroutine(Coroutine * co)395 CoroutineWorker *ThreadedCoroutineManager::ChooseWorkerForCoroutine([[maybe_unused]] Coroutine *co)
396 {
397 ASSERT(co != nullptr);
398 // Currently this function is a stub: we assign everything to the main worker and hope for the best.
399 // Later on, we will emulate the correct worker selection.
400 os::memory::LockHolder lkWorkers(workersLock_);
401 return workers_[0];
402 }
403
LaunchImpl(EntrypointInfo && epInfo,PandaString && coroName,CoroutinePriority priority,bool startSuspended)404 bool ThreadedCoroutineManager::LaunchImpl(EntrypointInfo &&epInfo, PandaString &&coroName, CoroutinePriority priority,
405 bool startSuspended)
406 {
407 os::memory::LockHolder l(coroSwitchLock_);
408 #ifndef NDEBUG
409 PrintRunnableQueue("LaunchImpl begin");
410 #endif
411 Coroutine *co = CreateCoroutineInstance(std::move(epInfo), std::move(coroName), Coroutine::Type::MUTATOR, priority);
412 Runtime::GetCurrent()->GetNotificationManager()->ThreadStartEvent(co);
413 if (co == nullptr) {
414 LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::LaunchImpl: failed to create a coroutine!";
415 return false;
416 }
417 // assign a worker
418 auto *w = ChooseWorkerForCoroutine(co);
419 co->SetWorker(w);
420 // run
421 auto *ctx = co->GetContext<ThreadedCoroutineContext>();
422 if (startSuspended) {
423 ctx->WaitUntilInitialized();
424 if (runningCorosCount_ >= GetActiveWorkersCount()) {
425 PushToRunnableQueue(co, co->GetPriority());
426 } else {
427 ++runningCorosCount_;
428 ctx->RequestResume();
429 }
430 } else {
431 ++runningCorosCount_;
432 }
433 #ifndef NDEBUG
434 PrintRunnableQueue("LaunchImpl end");
435 #endif
436 return true;
437 }
438
MainCoroutineCompleted()439 void ThreadedCoroutineManager::MainCoroutineCompleted()
440 {
441 LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::MainCoroutineCompleted() started";
442 ASSERT(Coroutine::GetCurrent() != nullptr);
443 auto *ctx = Coroutine::GetCurrent()->GetContext<ThreadedCoroutineContext>();
444 // firstly yield
445 {
446 os::memory::LockHolder l(coroSwitchLock_);
447 ctx->MainThreadFinished();
448 if (RunnableCoroutinesExist()) {
449 ScheduleNextCoroutine();
450 }
451 }
452 // then start awaiting for other coroutines to complete
453 os::memory::LockHolder lk(cvAwaitAllMutex_);
454 ctx->EnterAwaitLoop();
455 while (coroutineCount_ > 1) { // main coro runs till VM shutdown
456 cvAwaitAll_.Wait(&cvAwaitAllMutex_);
457 LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::MainCoroutineCompleted(): await_all(): still "
458 << coroutineCount_ << " coroutines exist...";
459 }
460 LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::MainCoroutineCompleted(): await_all() done";
461
462 LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::MainCoroutineCompleted(): deleting workers";
463 {
464 os::memory::LockHolder lkWorkers(workersLock_);
465 for (auto *worker : workers_) {
466 Runtime::GetCurrent()->GetInternalAllocator()->Delete(worker);
467 }
468 workers_.clear();
469 }
470 LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::MainCoroutineCompleted(): DONE";
471 }
472
Finalize()473 void ThreadedCoroutineManager::Finalize() {}
474
IsMainWorker(Coroutine * co) const475 bool ThreadedCoroutineManager::IsMainWorker(Coroutine *co) const
476 {
477 return co == GetMainThread();
478 }
479
480 } // namespace ark
481