• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright (c) 2022-2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include <thread>
17 #include "runtime/coroutines/coroutine.h"
18 #include "runtime/include/thread_scopes.h"
19 #include "libpandabase/os/mutex.h"
20 #include "runtime/include/runtime.h"
21 #include "runtime/include/runtime_notification.h"
22 #include "runtime/include/panda_vm.h"
23 #include "runtime/coroutines/threaded_coroutine.h"
24 #include "runtime/coroutines/threaded_coroutine_manager.h"
25 
26 namespace ark {
27 
Initialize(CoroutineManagerConfig config,Runtime * runtime,PandaVM * vm)28 void ThreadedCoroutineManager::Initialize(CoroutineManagerConfig config, Runtime *runtime, PandaVM *vm)
29 {
30     if (config.emulateJs) {
31         LOG(FATAL, COROUTINES) << "ThreadedCoroutineManager(): JS emulation is not supported!";
32         UNREACHABLE();
33     }
34     if (config.workersCount > 0) {
35         SetWorkersCount(static_cast<uint32_t>(config.workersCount));
36         LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager(): setting number of coroutine workers to "
37                                << GetWorkersCount();
38     } else {
39         SetWorkersCount(std::thread::hardware_concurrency());
40         LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager(): setting number of coroutine workers to CPU count = "
41                                << GetWorkersCount();
42     }
43 
44     auto *mainCo = CreateMainCoroutine(runtime, vm);
45     Coroutine::SetCurrent(mainCo);
46 }
47 
GetWorkersCount() const48 uint32_t ThreadedCoroutineManager::GetWorkersCount() const
49 {
50     return workersCount_;
51 }
52 
SetWorkersCount(uint32_t n)53 void ThreadedCoroutineManager::SetWorkersCount(uint32_t n)
54 {
55     workersCount_ = n;
56 }
57 
CreateCoroutineContext(bool coroHasEntrypoint)58 CoroutineContext *ThreadedCoroutineManager::CreateCoroutineContext([[maybe_unused]] bool coroHasEntrypoint)
59 {
60     auto alloc = Runtime::GetCurrent()->GetInternalAllocator();
61     return alloc->New<ThreadedCoroutineContext>();
62 }
63 
DeleteCoroutineContext(CoroutineContext * ctx)64 void ThreadedCoroutineManager::DeleteCoroutineContext(CoroutineContext *ctx)
65 {
66     auto alloc = Runtime::GetCurrent()->GetInternalAllocator();
67     alloc->Delete(ctx);
68 }
69 
GetCoroutineCount()70 size_t ThreadedCoroutineManager::GetCoroutineCount()
71 {
72     return coroutineCount_;
73 }
74 
GetCoroutineCountLimit()75 size_t ThreadedCoroutineManager::GetCoroutineCountLimit()
76 {
77     return UINT_MAX;
78 }
79 
AddToRegistry(Coroutine * co)80 void ThreadedCoroutineManager::AddToRegistry(Coroutine *co)
81 {
82     os::memory::LockHolder lock(coroListLock_);
83     co->GetVM()->GetGC()->OnThreadCreate(co);
84     coroutines_.insert(co);
85     coroutineCount_++;
86 }
87 
RemoveFromRegistry(Coroutine * co)88 void ThreadedCoroutineManager::RemoveFromRegistry(Coroutine *co)
89 {
90     coroutines_.erase(co);
91     coroutineCount_--;
92 }
93 
DeleteCoroutineInstance(Coroutine * co)94 void ThreadedCoroutineManager::DeleteCoroutineInstance(Coroutine *co)
95 {
96     auto *context = co->GetContext<CoroutineContext>();
97     Runtime::GetCurrent()->GetInternalAllocator()->Delete(co);
98     DeleteCoroutineContext(context);
99 }
100 
RegisterCoroutine(Coroutine * co)101 void ThreadedCoroutineManager::RegisterCoroutine(Coroutine *co)
102 {
103     AddToRegistry(co);
104 }
105 
TerminateCoroutine(Coroutine * co)106 bool ThreadedCoroutineManager::TerminateCoroutine(Coroutine *co)
107 {
108     LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::TerminateCoroutine() started";
109     co->NativeCodeEnd();
110     co->UpdateStatus(ThreadStatus::TERMINATING);
111 
112     os::memory::LockHolder l(coroSwitchLock_);
113     if (co->HasManagedEntrypoint()) {
114         // entrypointless coros should be destroyed manually
115         if (RunnableCoroutinesExist()) {
116             ScheduleNextCoroutine();
117         } else {
118             --runningCorosCount_;
119         }
120     }
121 
122     {
123         os::memory::LockHolder lList(coroListLock_);
124         RemoveFromRegistry(co);
125         // We need collect TLAB metrics and clear TLAB before calling the manage thread destructor
126         // because of the possibility heap use after free. This happening when GC starts execute ResetYoungAllocator
127         // method which start iterate set of threads, collect TLAB metrics and clear TLAB. If thread was deleted from
128         // set but we haven't destroyed the thread yet, GC won't collect metrics and can complete TLAB
129         // deletion faster. And when we try to get the TLAB metrics in the destructor of managed thread, we will get
130         // heap use after free
131         co->CollectTLABMetrics();
132         co->ClearTLAB();
133         // DestroyInternalResources() must be called in one critical section with
134         // RemoveFromRegistry (under core_list_lock_). This function transfers cards from coro's post_barrier buffer to
135         // UpdateRemsetThread internally. Situation when cards still remain and UpdateRemsetThread cannot visit the
136         // coro (because it is already removed) must be impossible.
137         co->DestroyInternalResources();
138     }
139     co->UpdateStatus(ThreadStatus::FINISHED);
140     Runtime::GetCurrent()->GetNotificationManager()->ThreadEndEvent(co);
141 
142     if (!co->HasManagedEntrypoint()) {
143         // entrypointless coros should be destroyed manually
144         return false;
145     }
146 
147     DeleteCoroutineInstance(co);
148 
149     os::memory::LockHolder lk(cvAwaitAllMutex_);
150     cvAwaitAll_.Signal();
151     return true;
152     // NOTE(konstanting): issue debug notifications to runtime
153 }
154 
Launch(CompletionEvent * completionEvent,Method * entrypoint,PandaVector<Value> && arguments,CoroutineLaunchMode mode)155 Coroutine *ThreadedCoroutineManager::Launch(CompletionEvent *completionEvent, Method *entrypoint,
156                                             PandaVector<Value> &&arguments, [[maybe_unused]] CoroutineLaunchMode mode)
157 {
158     LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::Launch started";
159 
160     auto *result = LaunchImpl(completionEvent, entrypoint, std::move(arguments));
161     if (result == nullptr) {
162         ThrowOutOfMemoryError("Launch failed");
163     }
164 
165     LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::Launch finished";
166     return result;
167 }
168 
RegisterWaiter(Coroutine * waiter,CoroutineEvent * awaitee)169 bool ThreadedCoroutineManager::RegisterWaiter(Coroutine *waiter, CoroutineEvent *awaitee)
170 {
171     os::memory::LockHolder l(waitersLock_);
172     if (awaitee->Happened()) {
173         awaitee->Unlock();
174         return false;
175     }
176 
177     awaitee->Unlock();
178     LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::RegisterAsAwaitee: " << waiter->GetName() << " AWAITS";
179     waiters_.insert({awaitee, waiter});
180     return true;
181 }
182 
Await(CoroutineEvent * awaitee)183 void ThreadedCoroutineManager::Await(CoroutineEvent *awaitee)
184 {
185     ASSERT(awaitee != nullptr);
186     LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::Await started";
187 
188     auto *waiter = Coroutine::GetCurrent();
189     auto *waiterCtx = waiter->GetContext<ThreadedCoroutineContext>();
190 
191     ScopedNativeCodeThread n(waiter);
192     coroSwitchLock_.Lock();
193 
194     if (!RegisterWaiter(waiter, awaitee)) {
195         LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::Await finished (no await happened)";
196         coroSwitchLock_.Unlock();
197         return;
198     }
199 
200     waiter->RequestSuspend(true);
201     if (RunnableCoroutinesExist()) {
202         ScheduleNextCoroutine();
203     }
204     coroSwitchLock_.Unlock();
205     waiterCtx->WaitUntilResumed();
206 
207     // NB: at this point the awaitee is already deleted
208     LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::Await finished";
209 }
210 
UnblockWaiters(CoroutineEvent * blocker)211 void ThreadedCoroutineManager::UnblockWaiters(CoroutineEvent *blocker)
212 {
213     os::memory::LockHolder lh(coroSwitchLock_);
214     UnblockWaitersImpl(blocker);
215 }
216 
UnblockWaitersImpl(CoroutineEvent * blocker)217 void ThreadedCoroutineManager::UnblockWaitersImpl(CoroutineEvent *blocker)
218 {
219     os::memory::LockHolder l(waitersLock_);
220     ASSERT(blocker != nullptr);
221 #ifndef NDEBUG
222     {
223         os::memory::LockHolder lkBlocker(*blocker);
224         ASSERT(blocker->Happened());
225     }
226 #endif
227     auto w = waiters_.find(blocker);
228     while (w != waiters_.end()) {
229         auto *coro = w->second;
230         waiters_.erase(w);
231         coro->RequestUnblock();
232         PushToRunnableQueue(coro);
233         w = waiters_.find(blocker);
234     }
235 }
236 
Schedule()237 void ThreadedCoroutineManager::Schedule()
238 {
239     LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::Schedule() request from "
240                            << Coroutine::GetCurrent()->GetName();
241     ScheduleImpl();
242 }
243 
EnumerateThreadsImpl(const ThreadManager::Callback & cb,unsigned int incMask,unsigned int xorMask) const244 bool ThreadedCoroutineManager::EnumerateThreadsImpl(const ThreadManager::Callback &cb, unsigned int incMask,
245                                                     unsigned int xorMask) const
246 {
247     os::memory::LockHolder lock(coroListLock_);
248     for (auto *t : coroutines_) {
249         if (!ApplyCallbackToThread(cb, t, incMask, xorMask)) {
250             return false;
251         }
252     }
253     return true;
254 }
255 
SuspendAllThreads()256 void ThreadedCoroutineManager::SuspendAllThreads()
257 {
258     os::memory::LockHolder lList(coroListLock_);
259     LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::SuspendAllThreads started";
260     for (auto *t : coroutines_) {
261         t->SuspendImpl(true);
262     }
263     LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::SuspendAllThreads finished";
264 }
265 
ResumeAllThreads()266 void ThreadedCoroutineManager::ResumeAllThreads()
267 {
268     os::memory::LockHolder lock(coroListLock_);
269     for (auto *t : coroutines_) {
270         t->ResumeImpl(true);
271     }
272 }
273 
IsRunningThreadExist()274 bool ThreadedCoroutineManager::IsRunningThreadExist()
275 {
276     UNREACHABLE();
277     // NOTE(konstanting): correct implementation. Which coroutine do we consider running?
278     return false;
279 }
280 
WaitForDeregistration()281 void ThreadedCoroutineManager::WaitForDeregistration()
282 {
283     MainCoroutineCompleted();
284 }
285 
286 #ifndef NDEBUG
PrintRunnableQueue(const PandaString & requester)287 void ThreadedCoroutineManager::PrintRunnableQueue(const PandaString &requester)
288 {
289     LOG(DEBUG, COROUTINES) << "[" << requester << "] ";
290     for (auto *co : runnablesQueue_) {
291         LOG(DEBUG, COROUTINES) << co->GetName() << " <";
292     }
293     LOG(DEBUG, COROUTINES) << "X";
294 }
295 #endif
296 
PushToRunnableQueue(Coroutine * co)297 void ThreadedCoroutineManager::PushToRunnableQueue(Coroutine *co)
298 {
299     runnablesQueue_.push_back(co);
300 }
301 
RunnableCoroutinesExist()302 bool ThreadedCoroutineManager::RunnableCoroutinesExist()
303 {
304     return !runnablesQueue_.empty();
305 }
306 
PopFromRunnableQueue()307 Coroutine *ThreadedCoroutineManager::PopFromRunnableQueue()
308 {
309     auto *co = runnablesQueue_.front();
310     runnablesQueue_.pop_front();
311     return co;
312 }
313 
ScheduleNextCoroutine()314 void ThreadedCoroutineManager::ScheduleNextCoroutine()
315 {
316     Coroutine *nextCoroutine = PopFromRunnableQueue();
317     nextCoroutine->RequestResume();
318 }
319 
ScheduleImpl()320 void ThreadedCoroutineManager::ScheduleImpl()
321 {
322     auto *currentCo = Coroutine::GetCurrent();
323     auto *currentCtx = currentCo->GetContext<ThreadedCoroutineContext>();
324     ScopedNativeCodeThread n(currentCo);
325 
326     coroSwitchLock_.Lock();
327     if (RunnableCoroutinesExist()) {
328         currentCo->RequestSuspend(false);
329         PushToRunnableQueue(currentCo);
330         ScheduleNextCoroutine();
331 
332         coroSwitchLock_.Unlock();
333         currentCtx->WaitUntilResumed();
334     } else {
335         coroSwitchLock_.Unlock();
336     }
337 }
338 
LaunchImpl(CompletionEvent * completionEvent,Method * entrypoint,PandaVector<Value> && arguments,bool startSuspended)339 Coroutine *ThreadedCoroutineManager::LaunchImpl(CompletionEvent *completionEvent, Method *entrypoint,
340                                                 PandaVector<Value> &&arguments, bool startSuspended)
341 {
342     os::memory::LockHolder l(coroSwitchLock_);
343 #ifndef NDEBUG
344     PrintRunnableQueue("LaunchImpl begin");
345 #endif
346     auto coroName = entrypoint->GetFullName();
347     Coroutine *co = CreateCoroutineInstance(completionEvent, entrypoint, std::move(arguments), std::move(coroName));
348     Runtime::GetCurrent()->GetNotificationManager()->ThreadStartEvent(co);
349     if (co == nullptr) {
350         LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::LaunchImpl: failed to create a coroutine!";
351         return co;
352     }
353     auto *ctx = co->GetContext<ThreadedCoroutineContext>();
354     if (startSuspended) {
355         ctx->WaitUntilInitialized();
356         if (runningCorosCount_ >= GetWorkersCount()) {
357             PushToRunnableQueue(co);
358         } else {
359             ++runningCorosCount_;
360             ctx->RequestResume();
361         }
362     } else {
363         ++runningCorosCount_;
364     }
365 #ifndef NDEBUG
366     PrintRunnableQueue("LaunchImpl end");
367 #endif
368     return co;
369 }
370 
MainCoroutineCompleted()371 void ThreadedCoroutineManager::MainCoroutineCompleted()
372 {
373     LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::MainCoroutineCompleted() started";
374     auto *ctx = Coroutine::GetCurrent()->GetContext<ThreadedCoroutineContext>();
375     //  firstly yield
376     {
377         os::memory::LockHolder l(coroSwitchLock_);
378         ctx->MainThreadFinished();
379         if (RunnableCoroutinesExist()) {
380             ScheduleNextCoroutine();
381         }
382     }
383     // then start awaiting for other coroutines to complete
384     os::memory::LockHolder lk(cvAwaitAllMutex_);
385     ctx->EnterAwaitLoop();
386     while (coroutineCount_ > 1) {  // main coro runs till VM shutdown
387         cvAwaitAll_.Wait(&cvAwaitAllMutex_);
388         LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::MainCoroutineCompleted(): await_all(): still "
389                                << coroutineCount_ << " coroutines exist...";
390     }
391     LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::MainCoroutineCompleted(): await_all() done";
392 }
393 
Finalize()394 void ThreadedCoroutineManager::Finalize() {}
395 
IsMainWorker(Coroutine * co) const396 bool ThreadedCoroutineManager::IsMainWorker(Coroutine *co) const
397 {
398     return co == GetMainThread();
399 }
400 
401 }  // namespace ark
402