• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright (c) 2022-2025 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include <thread>
17 #include "runtime/coroutines/coroutine.h"
18 #include "runtime/include/thread_scopes.h"
19 #include "libpandabase/os/mutex.h"
20 #include "runtime/include/runtime.h"
21 #include "runtime/include/runtime_notification.h"
22 #include "runtime/include/panda_vm.h"
23 #include "runtime/coroutines/threaded_coroutine.h"
24 #include "runtime/coroutines/threaded_coroutine_manager.h"
25 
26 namespace ark {
27 
Initialize(CoroutineManagerConfig config,Runtime * runtime,PandaVM * vm)28 void ThreadedCoroutineManager::Initialize(CoroutineManagerConfig config, Runtime *runtime, PandaVM *vm)
29 {
30     // create and activate workers
31     size_t numberOfAvailableCores = std::max(std::thread::hardware_concurrency() / 4ULL, 2ULL);
32     size_t targetNumberOfWorkers = (config.workersCount == CoroutineManagerConfig::WORKERS_COUNT_AUTO)
33                                        ? numberOfAvailableCores
34                                        : config.workersCount;
35     if (config.workersCount == CoroutineManagerConfig::WORKERS_COUNT_AUTO) {
36         LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager(): AUTO mode selected, will set number of coroutine "
37                                   "workers to number of CPUs / 4, but not less than 2 = "
38                                << targetNumberOfWorkers;
39     }
40     os::memory::LockHolder lock(workersLock_);
41     CreateWorkers(targetNumberOfWorkers, runtime, vm);
42     LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager(): successfully created and activated " << workers_.size()
43                            << " coroutine workers";
44 }
45 
GetActiveWorkersCount() const46 uint32_t ThreadedCoroutineManager::GetActiveWorkersCount() const
47 {
48     os::memory::LockHolder lkWorkers(workersLock_);
49     return activeWorkersCount_;
50 }
51 
CreateWorkers(size_t howMany,Runtime * runtime,PandaVM * vm)52 void ThreadedCoroutineManager::CreateWorkers(size_t howMany, Runtime *runtime, PandaVM *vm)
53 {
54     auto allocator = runtime->GetInternalAllocator();
55     for (size_t id = 0; id < howMany; ++id) {
56         auto *w = allocator->New<CoroutineWorker>(runtime, vm);
57         workers_.push_back(w);
58         ASSERT(workers_[id] == w);
59     }
60     activeWorkersCount_ = howMany;
61 
62     auto *mainCo = CreateMainCoroutine(runtime, vm);
63     mainCo->SetWorker(workers_[0]);
64     Coroutine::SetCurrent(mainCo);
65 }
66 
CreateCoroutineContext(bool coroHasEntrypoint)67 CoroutineContext *ThreadedCoroutineManager::CreateCoroutineContext([[maybe_unused]] bool coroHasEntrypoint)
68 {
69     auto alloc = Runtime::GetCurrent()->GetInternalAllocator();
70     return alloc->New<ThreadedCoroutineContext>();
71 }
72 
DeleteCoroutineContext(CoroutineContext * ctx)73 void ThreadedCoroutineManager::DeleteCoroutineContext(CoroutineContext *ctx)
74 {
75     auto alloc = Runtime::GetCurrent()->GetInternalAllocator();
76     alloc->Delete(ctx);
77 }
78 
GetCoroutineCount()79 size_t ThreadedCoroutineManager::GetCoroutineCount()
80 {
81     return coroutineCount_;
82 }
83 
GetCoroutineCountLimit()84 size_t ThreadedCoroutineManager::GetCoroutineCountLimit()
85 {
86     return UINT_MAX;
87 }
88 
AddToRegistry(Coroutine * co)89 void ThreadedCoroutineManager::AddToRegistry(Coroutine *co)
90 {
91     co->GetVM()->GetGC()->OnThreadCreate(co);
92     coroutines_.insert(co);
93     coroutineCount_++;
94 }
95 
RemoveFromRegistry(Coroutine * co)96 void ThreadedCoroutineManager::RemoveFromRegistry(Coroutine *co)
97 {
98     coroutines_.erase(co);
99     coroutineCount_--;
100 }
101 
DeleteCoroutineInstance(Coroutine * co)102 void ThreadedCoroutineManager::DeleteCoroutineInstance(Coroutine *co)
103 {
104     auto *context = co->GetContext<CoroutineContext>();
105     Runtime::GetCurrent()->GetInternalAllocator()->Delete(co);
106     DeleteCoroutineContext(context);
107 }
108 
RegisterCoroutine(Coroutine * co)109 void ThreadedCoroutineManager::RegisterCoroutine(Coroutine *co)
110 {
111     os::memory::LockHolder lock(coroListLock_);
112     AddToRegistry(co);
113     // Propagate SUSPEND_REQUEST flag to the new coroutine to avoid the following situation:
114     // * Main coro holds read lock of the MutatorLock.
115     // * GC thread calls SuspendAll nad set SUSPEND_REQUEST flag to the main coro and
116     //   tries to acquire write lock of the MutatorLock.
117     // * Main coro creates a new coro and adds it to the coroutines_ list.
118     // * SUSPEND_REQUEST is not set in the new coroutine
119     // * New coro starts execution, acquires read lock of the MutatorLock and enters a long loop
120     // * Main coro checks SUSPEND_REQUEST flag and blocks
121     // * GC will not start becuase the new coro has no SUSPEND_REQUEST flag and it will never release the MutatorLock
122     //
123     // We need to propagate SUSPEND_REQUEST under the coroListLock_.
124     // It guarantees that the flag is already set for the current coro and we need to propagate it
125     // or GC will see the new coro in EnumerateAllThreads.
126     if (Thread::GetCurrent() != nullptr && Coroutine::GetCurrent() != nullptr &&
127         Coroutine::GetCurrent()->IsSuspended() && !co->IsSuspended()) {
128         co->SuspendImpl(true);
129     }
130 }
131 
TerminateCoroutine(Coroutine * co)132 bool ThreadedCoroutineManager::TerminateCoroutine(Coroutine *co)
133 {
134     LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::TerminateCoroutine() started";
135     co->NativeCodeEnd();
136     co->UpdateStatus(ThreadStatus::TERMINATING);
137 
138     os::memory::LockHolder l(coroSwitchLock_);
139     if (co->HasManagedEntrypoint()) {
140         // entrypointless coros should be destroyed manually
141         if (RunnableCoroutinesExist()) {
142             ScheduleNextCoroutine();
143         } else {
144             --runningCorosCount_;
145         }
146     }
147 
148     {
149         os::memory::LockHolder lList(coroListLock_);
150         RemoveFromRegistry(co);
151         // We need collect TLAB metrics and clear TLAB before calling the manage thread destructor
152         // because of the possibility heap use after free. This happening when GC starts execute ResetYoungAllocator
153         // method which start iterate set of threads, collect TLAB metrics and clear TLAB. If thread was deleted from
154         // set but we haven't destroyed the thread yet, GC won't collect metrics and can complete TLAB
155         // deletion faster. And when we try to get the TLAB metrics in the destructor of managed thread, we will get
156         // heap use after free
157         co->CollectTLABMetrics();
158         co->ClearTLAB();
159         // DestroyInternalResources() must be called in one critical section with
160         // RemoveFromRegistry (under core_list_lock_). This function transfers cards from coro's post_barrier buffer to
161         // UpdateRemsetThread internally. Situation when cards still remain and UpdateRemsetThread cannot visit the
162         // coro (because it is already removed) must be impossible.
163         co->DestroyInternalResources();
164     }
165     co->UpdateStatus(ThreadStatus::FINISHED);
166     Runtime::GetCurrent()->GetNotificationManager()->ThreadEndEvent(co);
167 
168     if (!co->HasManagedEntrypoint()) {
169         // entrypointless coros should be destroyed manually
170         return false;
171     }
172 
173     DeleteCoroutineInstance(co);
174 
175     os::memory::LockHolder lk(cvAwaitAllMutex_);
176     cvAwaitAll_.Signal();
177     return true;
178     // NOTE(konstanting): issue debug notifications to runtime
179 }
180 
Launch(CompletionEvent * completionEvent,Method * entrypoint,PandaVector<Value> && arguments,CoroutineLaunchMode mode,CoroutinePriority priority,bool abortFlag)181 bool ThreadedCoroutineManager::Launch(CompletionEvent *completionEvent, Method *entrypoint,
182                                       PandaVector<Value> &&arguments, [[maybe_unused]] CoroutineLaunchMode mode,
183                                       CoroutinePriority priority, [[maybe_unused]] bool abortFlag)
184 {
185     LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::Launch started";
186     auto epInfo = Coroutine::ManagedEntrypointInfo {completionEvent, entrypoint, std::move(arguments)};
187     bool result = LaunchImpl(std::move(epInfo), entrypoint->GetFullName(), priority);
188     if (!result) {
189         // let's count all launch failures as "limit exceeded" for now.
190         // Later on we can think of throwing different errors for different reasons.
191         ThrowCoroutinesLimitExceedError(
192             "Unable to create a new coroutine: reached the limit for the number of existing coroutines.");
193     }
194 
195     LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::Launch finished";
196     return result;
197 }
198 
LaunchImmediately(CompletionEvent * completionEvent,Method * entrypoint,PandaVector<Value> && arguments,CoroutineLaunchMode mode,CoroutinePriority priority,bool abortFlag)199 bool ThreadedCoroutineManager::LaunchImmediately([[maybe_unused]] CompletionEvent *completionEvent,
200                                                  [[maybe_unused]] Method *entrypoint,
201                                                  [[maybe_unused]] PandaVector<Value> &&arguments,
202                                                  [[maybe_unused]] CoroutineLaunchMode mode,
203                                                  [[maybe_unused]] CoroutinePriority priority,
204                                                  [[maybe_unused]] bool abortFlag)
205 {
206     LOG(FATAL, COROUTINES) << "ThreadedCoroutineManager::LaunchImmediately not supported";
207     return false;
208 }
209 
LaunchNative(NativeEntrypointFunc epFunc,void * param,PandaString coroName,CoroutineLaunchMode mode,CoroutinePriority priority,bool abortFlag)210 bool ThreadedCoroutineManager::LaunchNative(NativeEntrypointFunc epFunc, void *param, PandaString coroName,
211                                             [[maybe_unused]] CoroutineLaunchMode mode, CoroutinePriority priority,
212                                             [[maybe_unused]] bool abortFlag)
213 {
214     LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::LaunchNative started";
215     auto epInfo = Coroutine::NativeEntrypointInfo {epFunc, param};
216     bool result = LaunchImpl(epInfo, std::move(coroName), priority);
217     if (!result) {
218         ThrowOutOfMemoryError("Launch failed");
219     }
220 
221     LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::LaunchNative finished";
222     return result;
223 }
224 
RegisterWaiter(Coroutine * waiter,CoroutineEvent * awaitee)225 bool ThreadedCoroutineManager::RegisterWaiter(Coroutine *waiter, CoroutineEvent *awaitee)
226 {
227     os::memory::LockHolder l(waitersLock_);
228     if (awaitee->Happened()) {
229         awaitee->Unlock();
230         return false;
231     }
232 
233     awaitee->Unlock();
234     LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::RegisterAsAwaitee: " << waiter->GetName() << " AWAITS";
235     [[maybe_unused]] auto [_, inserted] = waiters_.insert({awaitee, waiter});
236     ASSERT(inserted);
237     return true;
238 }
239 
Await(CoroutineEvent * awaitee)240 void ThreadedCoroutineManager::Await(CoroutineEvent *awaitee)
241 {
242     ASSERT(awaitee != nullptr);
243     ASSERT_NATIVE_CODE();
244     LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::Await started";
245 
246     auto *waiter = Coroutine::GetCurrent();
247     ASSERT(waiter != nullptr);
248     auto *waiterCtx = waiter->GetContext<ThreadedCoroutineContext>();
249 
250     coroSwitchLock_.Lock();
251 
252     if (!RegisterWaiter(waiter, awaitee)) {
253         LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::Await finished (no await happened)";
254         coroSwitchLock_.Unlock();
255         return;
256     }
257 
258     waiter->RequestSuspend(true);
259     if (RunnableCoroutinesExist()) {
260         ScheduleNextCoroutine();
261     }
262     coroSwitchLock_.Unlock();
263     waiterCtx->WaitUntilResumed();
264 
265     // NB: at this point the awaitee is already deleted
266     LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::Await finished";
267 }
268 
UnblockWaiters(CoroutineEvent * blocker)269 void ThreadedCoroutineManager::UnblockWaiters(CoroutineEvent *blocker)
270 {
271     os::memory::LockHolder lh(coroSwitchLock_);
272     UnblockWaitersImpl(blocker);
273 }
274 
UnblockWaitersImpl(CoroutineEvent * blocker)275 void ThreadedCoroutineManager::UnblockWaitersImpl(CoroutineEvent *blocker)
276 {
277     os::memory::LockHolder l(waitersLock_);
278     ASSERT(blocker != nullptr);
279 #ifndef NDEBUG
280     {
281         os::memory::LockHolder lkBlocker(*blocker);
282         ASSERT(blocker->Happened());
283     }
284 #endif
285     auto w = waiters_.find(blocker);
286     if (w != waiters_.end()) {
287         auto *coro = w->second;
288         waiters_.erase(w);
289         coro->RequestUnblock();
290         PushToRunnableQueue(coro, coro->GetPriority());
291         w = waiters_.find(blocker);
292     }
293 }
294 
Schedule()295 void ThreadedCoroutineManager::Schedule()
296 {
297     LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::Schedule() request from "
298                            << Coroutine::GetCurrent()->GetName();
299     ScheduleImpl();
300 }
301 
EnumerateThreadsImpl(const ThreadManager::Callback & cb,unsigned int incMask,unsigned int xorMask) const302 bool ThreadedCoroutineManager::EnumerateThreadsImpl(const ThreadManager::Callback &cb, unsigned int incMask,
303                                                     unsigned int xorMask) const
304 {
305     os::memory::LockHolder lock(coroListLock_);
306     for (auto *t : coroutines_) {
307         if (!ApplyCallbackToThread(cb, t, incMask, xorMask)) {
308             return false;
309         }
310     }
311     return true;
312 }
313 
SuspendAllThreads()314 void ThreadedCoroutineManager::SuspendAllThreads()
315 {
316     os::memory::LockHolder lList(coroListLock_);
317     LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::SuspendAllThreads started";
318     for (auto *t : coroutines_) {
319         t->SuspendImpl(true);
320     }
321     LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::SuspendAllThreads finished";
322 }
323 
ResumeAllThreads()324 void ThreadedCoroutineManager::ResumeAllThreads()
325 {
326     os::memory::LockHolder lock(coroListLock_);
327     for (auto *t : coroutines_) {
328         t->ResumeImpl(true);
329     }
330 }
331 
IsRunningThreadExist()332 bool ThreadedCoroutineManager::IsRunningThreadExist()
333 {
334     UNREACHABLE();
335     // NOTE(konstanting): correct implementation. Which coroutine do we consider running?
336     return false;
337 }
338 
WaitForDeregistration()339 void ThreadedCoroutineManager::WaitForDeregistration()
340 {
341     MainCoroutineCompleted();
342 }
343 
344 #ifndef NDEBUG
PrintRunnableQueue(const PandaString & requester)345 void ThreadedCoroutineManager::PrintRunnableQueue(const PandaString &requester)
346 {
347     LOG(DEBUG, COROUTINES) << "[" << requester << "] ";
348     runnablesQueue_.IterateOverCoroutines([](Coroutine *co) { LOG(DEBUG, COROUTINES) << co->GetName() << " <"; });
349     LOG(DEBUG, COROUTINES) << "X";
350 }
351 #endif
352 
PushToRunnableQueue(Coroutine * co,CoroutinePriority priority)353 void ThreadedCoroutineManager::PushToRunnableQueue(Coroutine *co, CoroutinePriority priority)
354 {
355     runnablesQueue_.Push(co, priority);
356 }
357 
RunnableCoroutinesExist()358 bool ThreadedCoroutineManager::RunnableCoroutinesExist()
359 {
360     return !runnablesQueue_.Empty();
361 }
362 
PopFromRunnableQueue()363 Coroutine *ThreadedCoroutineManager::PopFromRunnableQueue()
364 {
365     auto [co, _] = runnablesQueue_.Pop();
366     return co;
367 }
368 
ScheduleNextCoroutine()369 void ThreadedCoroutineManager::ScheduleNextCoroutine()
370 {
371     Coroutine *nextCoroutine = PopFromRunnableQueue();
372     nextCoroutine->RequestResume();
373 }
374 
ScheduleImpl()375 void ThreadedCoroutineManager::ScheduleImpl()
376 {
377     ASSERT_NATIVE_CODE();
378     auto *currentCo = Coroutine::GetCurrent();
379     ASSERT(currentCo != nullptr);
380     auto *currentCtx = currentCo->GetContext<ThreadedCoroutineContext>();
381 
382     coroSwitchLock_.Lock();
383     if (RunnableCoroutinesExist()) {
384         currentCo->RequestSuspend(false);
385         PushToRunnableQueue(currentCo, CoroutinePriority::LOW_PRIORITY);
386         ScheduleNextCoroutine();
387 
388         coroSwitchLock_.Unlock();
389         currentCtx->WaitUntilResumed();
390     } else {
391         coroSwitchLock_.Unlock();
392     }
393 }
394 
ChooseWorkerForCoroutine(Coroutine * co)395 CoroutineWorker *ThreadedCoroutineManager::ChooseWorkerForCoroutine([[maybe_unused]] Coroutine *co)
396 {
397     ASSERT(co != nullptr);
398     // Currently this function is a stub: we assign everything to the main worker and hope for the best.
399     // Later on, we will emulate the correct worker selection.
400     os::memory::LockHolder lkWorkers(workersLock_);
401     return workers_[0];
402 }
403 
LaunchImpl(EntrypointInfo && epInfo,PandaString && coroName,CoroutinePriority priority,bool startSuspended)404 bool ThreadedCoroutineManager::LaunchImpl(EntrypointInfo &&epInfo, PandaString &&coroName, CoroutinePriority priority,
405                                           bool startSuspended)
406 {
407     os::memory::LockHolder l(coroSwitchLock_);
408 #ifndef NDEBUG
409     PrintRunnableQueue("LaunchImpl begin");
410 #endif
411     Coroutine *co = CreateCoroutineInstance(std::move(epInfo), std::move(coroName), Coroutine::Type::MUTATOR, priority);
412     Runtime::GetCurrent()->GetNotificationManager()->ThreadStartEvent(co);
413     if (co == nullptr) {
414         LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::LaunchImpl: failed to create a coroutine!";
415         return false;
416     }
417     // assign a worker
418     auto *w = ChooseWorkerForCoroutine(co);
419     co->SetWorker(w);
420     // run
421     auto *ctx = co->GetContext<ThreadedCoroutineContext>();
422     if (startSuspended) {
423         ctx->WaitUntilInitialized();
424         if (runningCorosCount_ >= GetActiveWorkersCount()) {
425             PushToRunnableQueue(co, co->GetPriority());
426         } else {
427             ++runningCorosCount_;
428             ctx->RequestResume();
429         }
430     } else {
431         ++runningCorosCount_;
432     }
433 #ifndef NDEBUG
434     PrintRunnableQueue("LaunchImpl end");
435 #endif
436     return true;
437 }
438 
MainCoroutineCompleted()439 void ThreadedCoroutineManager::MainCoroutineCompleted()
440 {
441     LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::MainCoroutineCompleted() started";
442     ASSERT(Coroutine::GetCurrent() != nullptr);
443     auto *ctx = Coroutine::GetCurrent()->GetContext<ThreadedCoroutineContext>();
444     //  firstly yield
445     {
446         os::memory::LockHolder l(coroSwitchLock_);
447         ctx->MainThreadFinished();
448         if (RunnableCoroutinesExist()) {
449             ScheduleNextCoroutine();
450         }
451     }
452     // then start awaiting for other coroutines to complete
453     os::memory::LockHolder lk(cvAwaitAllMutex_);
454     ctx->EnterAwaitLoop();
455     while (coroutineCount_ > 1) {  // main coro runs till VM shutdown
456         cvAwaitAll_.Wait(&cvAwaitAllMutex_);
457         LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::MainCoroutineCompleted(): await_all(): still "
458                                << coroutineCount_ << " coroutines exist...";
459     }
460     LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::MainCoroutineCompleted(): await_all() done";
461 
462     LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::MainCoroutineCompleted(): deleting workers";
463     {
464         os::memory::LockHolder lkWorkers(workersLock_);
465         for (auto *worker : workers_) {
466             Runtime::GetCurrent()->GetInternalAllocator()->Delete(worker);
467         }
468         workers_.clear();
469     }
470     LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::MainCoroutineCompleted(): DONE";
471 }
472 
Finalize()473 void ThreadedCoroutineManager::Finalize() {}
474 
IsMainWorker(Coroutine * co) const475 bool ThreadedCoroutineManager::IsMainWorker(Coroutine *co) const
476 {
477     return co == GetMainThread();
478 }
479 
480 }  // namespace ark
481