1 /**
2 * Copyright (c) 2022-2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include <thread>
17 #include "runtime/coroutines/coroutine.h"
18 #include "runtime/include/thread_scopes.h"
19 #include "libpandabase/os/mutex.h"
20 #include "runtime/include/runtime.h"
21 #include "runtime/include/runtime_notification.h"
22 #include "runtime/include/panda_vm.h"
23 #include "runtime/coroutines/threaded_coroutine.h"
24 #include "runtime/coroutines/threaded_coroutine_manager.h"
25
26 namespace ark {
27
Initialize(CoroutineManagerConfig config,Runtime * runtime,PandaVM * vm)28 void ThreadedCoroutineManager::Initialize(CoroutineManagerConfig config, Runtime *runtime, PandaVM *vm)
29 {
30 if (config.emulateJs) {
31 LOG(FATAL, COROUTINES) << "ThreadedCoroutineManager(): JS emulation is not supported!";
32 UNREACHABLE();
33 }
34 if (config.workersCount > 0) {
35 SetWorkersCount(static_cast<uint32_t>(config.workersCount));
36 LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager(): setting number of coroutine workers to "
37 << GetWorkersCount();
38 } else {
39 SetWorkersCount(std::thread::hardware_concurrency());
40 LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager(): setting number of coroutine workers to CPU count = "
41 << GetWorkersCount();
42 }
43
44 auto *mainCo = CreateMainCoroutine(runtime, vm);
45 Coroutine::SetCurrent(mainCo);
46 }
47
GetWorkersCount() const48 uint32_t ThreadedCoroutineManager::GetWorkersCount() const
49 {
50 return workersCount_;
51 }
52
SetWorkersCount(uint32_t n)53 void ThreadedCoroutineManager::SetWorkersCount(uint32_t n)
54 {
55 workersCount_ = n;
56 }
57
CreateCoroutineContext(bool coroHasEntrypoint)58 CoroutineContext *ThreadedCoroutineManager::CreateCoroutineContext([[maybe_unused]] bool coroHasEntrypoint)
59 {
60 auto alloc = Runtime::GetCurrent()->GetInternalAllocator();
61 return alloc->New<ThreadedCoroutineContext>();
62 }
63
DeleteCoroutineContext(CoroutineContext * ctx)64 void ThreadedCoroutineManager::DeleteCoroutineContext(CoroutineContext *ctx)
65 {
66 auto alloc = Runtime::GetCurrent()->GetInternalAllocator();
67 alloc->Delete(ctx);
68 }
69
GetCoroutineCount()70 size_t ThreadedCoroutineManager::GetCoroutineCount()
71 {
72 return coroutineCount_;
73 }
74
GetCoroutineCountLimit()75 size_t ThreadedCoroutineManager::GetCoroutineCountLimit()
76 {
77 return UINT_MAX;
78 }
79
AddToRegistry(Coroutine * co)80 void ThreadedCoroutineManager::AddToRegistry(Coroutine *co)
81 {
82 os::memory::LockHolder lock(coroListLock_);
83 co->GetVM()->GetGC()->OnThreadCreate(co);
84 coroutines_.insert(co);
85 coroutineCount_++;
86 }
87
RemoveFromRegistry(Coroutine * co)88 void ThreadedCoroutineManager::RemoveFromRegistry(Coroutine *co)
89 {
90 coroutines_.erase(co);
91 coroutineCount_--;
92 }
93
DeleteCoroutineInstance(Coroutine * co)94 void ThreadedCoroutineManager::DeleteCoroutineInstance(Coroutine *co)
95 {
96 auto *context = co->GetContext<CoroutineContext>();
97 Runtime::GetCurrent()->GetInternalAllocator()->Delete(co);
98 DeleteCoroutineContext(context);
99 }
100
RegisterCoroutine(Coroutine * co)101 void ThreadedCoroutineManager::RegisterCoroutine(Coroutine *co)
102 {
103 AddToRegistry(co);
104 }
105
TerminateCoroutine(Coroutine * co)106 bool ThreadedCoroutineManager::TerminateCoroutine(Coroutine *co)
107 {
108 LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::TerminateCoroutine() started";
109 co->NativeCodeEnd();
110 co->UpdateStatus(ThreadStatus::TERMINATING);
111
112 os::memory::LockHolder l(coroSwitchLock_);
113 if (co->HasManagedEntrypoint()) {
114 // entrypointless coros should be destroyed manually
115 if (RunnableCoroutinesExist()) {
116 ScheduleNextCoroutine();
117 } else {
118 --runningCorosCount_;
119 }
120 }
121
122 {
123 os::memory::LockHolder lList(coroListLock_);
124 RemoveFromRegistry(co);
125 // We need collect TLAB metrics and clear TLAB before calling the manage thread destructor
126 // because of the possibility heap use after free. This happening when GC starts execute ResetYoungAllocator
127 // method which start iterate set of threads, collect TLAB metrics and clear TLAB. If thread was deleted from
128 // set but we haven't destroyed the thread yet, GC won't collect metrics and can complete TLAB
129 // deletion faster. And when we try to get the TLAB metrics in the destructor of managed thread, we will get
130 // heap use after free
131 co->CollectTLABMetrics();
132 co->ClearTLAB();
133 // DestroyInternalResources() must be called in one critical section with
134 // RemoveFromRegistry (under core_list_lock_). This function transfers cards from coro's post_barrier buffer to
135 // UpdateRemsetThread internally. Situation when cards still remain and UpdateRemsetThread cannot visit the
136 // coro (because it is already removed) must be impossible.
137 co->DestroyInternalResources();
138 }
139 co->UpdateStatus(ThreadStatus::FINISHED);
140 Runtime::GetCurrent()->GetNotificationManager()->ThreadEndEvent(co);
141
142 if (!co->HasManagedEntrypoint()) {
143 // entrypointless coros should be destroyed manually
144 return false;
145 }
146
147 DeleteCoroutineInstance(co);
148
149 os::memory::LockHolder lk(cvAwaitAllMutex_);
150 cvAwaitAll_.Signal();
151 return true;
152 // NOTE(konstanting): issue debug notifications to runtime
153 }
154
Launch(CompletionEvent * completionEvent,Method * entrypoint,PandaVector<Value> && arguments,CoroutineLaunchMode mode)155 Coroutine *ThreadedCoroutineManager::Launch(CompletionEvent *completionEvent, Method *entrypoint,
156 PandaVector<Value> &&arguments, [[maybe_unused]] CoroutineLaunchMode mode)
157 {
158 LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::Launch started";
159
160 auto *result = LaunchImpl(completionEvent, entrypoint, std::move(arguments));
161 if (result == nullptr) {
162 ThrowOutOfMemoryError("Launch failed");
163 }
164
165 LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::Launch finished";
166 return result;
167 }
168
RegisterWaiter(Coroutine * waiter,CoroutineEvent * awaitee)169 bool ThreadedCoroutineManager::RegisterWaiter(Coroutine *waiter, CoroutineEvent *awaitee)
170 {
171 os::memory::LockHolder l(waitersLock_);
172 if (awaitee->Happened()) {
173 awaitee->Unlock();
174 return false;
175 }
176
177 awaitee->Unlock();
178 LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::RegisterAsAwaitee: " << waiter->GetName() << " AWAITS";
179 waiters_.insert({awaitee, waiter});
180 return true;
181 }
182
Await(CoroutineEvent * awaitee)183 void ThreadedCoroutineManager::Await(CoroutineEvent *awaitee)
184 {
185 ASSERT(awaitee != nullptr);
186 LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::Await started";
187
188 auto *waiter = Coroutine::GetCurrent();
189 auto *waiterCtx = waiter->GetContext<ThreadedCoroutineContext>();
190
191 ScopedNativeCodeThread n(waiter);
192 coroSwitchLock_.Lock();
193
194 if (!RegisterWaiter(waiter, awaitee)) {
195 LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::Await finished (no await happened)";
196 coroSwitchLock_.Unlock();
197 return;
198 }
199
200 waiter->RequestSuspend(true);
201 if (RunnableCoroutinesExist()) {
202 ScheduleNextCoroutine();
203 }
204 coroSwitchLock_.Unlock();
205 waiterCtx->WaitUntilResumed();
206
207 // NB: at this point the awaitee is already deleted
208 LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::Await finished";
209 }
210
UnblockWaiters(CoroutineEvent * blocker)211 void ThreadedCoroutineManager::UnblockWaiters(CoroutineEvent *blocker)
212 {
213 os::memory::LockHolder lh(coroSwitchLock_);
214 UnblockWaitersImpl(blocker);
215 }
216
UnblockWaitersImpl(CoroutineEvent * blocker)217 void ThreadedCoroutineManager::UnblockWaitersImpl(CoroutineEvent *blocker)
218 {
219 os::memory::LockHolder l(waitersLock_);
220 ASSERT(blocker != nullptr);
221 #ifndef NDEBUG
222 {
223 os::memory::LockHolder lkBlocker(*blocker);
224 ASSERT(blocker->Happened());
225 }
226 #endif
227 auto w = waiters_.find(blocker);
228 while (w != waiters_.end()) {
229 auto *coro = w->second;
230 waiters_.erase(w);
231 coro->RequestUnblock();
232 PushToRunnableQueue(coro);
233 w = waiters_.find(blocker);
234 }
235 }
236
Schedule()237 void ThreadedCoroutineManager::Schedule()
238 {
239 LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::Schedule() request from "
240 << Coroutine::GetCurrent()->GetName();
241 ScheduleImpl();
242 }
243
EnumerateThreadsImpl(const ThreadManager::Callback & cb,unsigned int incMask,unsigned int xorMask) const244 bool ThreadedCoroutineManager::EnumerateThreadsImpl(const ThreadManager::Callback &cb, unsigned int incMask,
245 unsigned int xorMask) const
246 {
247 os::memory::LockHolder lock(coroListLock_);
248 for (auto *t : coroutines_) {
249 if (!ApplyCallbackToThread(cb, t, incMask, xorMask)) {
250 return false;
251 }
252 }
253 return true;
254 }
255
SuspendAllThreads()256 void ThreadedCoroutineManager::SuspendAllThreads()
257 {
258 os::memory::LockHolder lList(coroListLock_);
259 LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::SuspendAllThreads started";
260 for (auto *t : coroutines_) {
261 t->SuspendImpl(true);
262 }
263 LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::SuspendAllThreads finished";
264 }
265
ResumeAllThreads()266 void ThreadedCoroutineManager::ResumeAllThreads()
267 {
268 os::memory::LockHolder lock(coroListLock_);
269 for (auto *t : coroutines_) {
270 t->ResumeImpl(true);
271 }
272 }
273
IsRunningThreadExist()274 bool ThreadedCoroutineManager::IsRunningThreadExist()
275 {
276 UNREACHABLE();
277 // NOTE(konstanting): correct implementation. Which coroutine do we consider running?
278 return false;
279 }
280
WaitForDeregistration()281 void ThreadedCoroutineManager::WaitForDeregistration()
282 {
283 MainCoroutineCompleted();
284 }
285
286 #ifndef NDEBUG
PrintRunnableQueue(const PandaString & requester)287 void ThreadedCoroutineManager::PrintRunnableQueue(const PandaString &requester)
288 {
289 LOG(DEBUG, COROUTINES) << "[" << requester << "] ";
290 for (auto *co : runnablesQueue_) {
291 LOG(DEBUG, COROUTINES) << co->GetName() << " <";
292 }
293 LOG(DEBUG, COROUTINES) << "X";
294 }
295 #endif
296
PushToRunnableQueue(Coroutine * co)297 void ThreadedCoroutineManager::PushToRunnableQueue(Coroutine *co)
298 {
299 runnablesQueue_.push_back(co);
300 }
301
RunnableCoroutinesExist()302 bool ThreadedCoroutineManager::RunnableCoroutinesExist()
303 {
304 return !runnablesQueue_.empty();
305 }
306
PopFromRunnableQueue()307 Coroutine *ThreadedCoroutineManager::PopFromRunnableQueue()
308 {
309 auto *co = runnablesQueue_.front();
310 runnablesQueue_.pop_front();
311 return co;
312 }
313
ScheduleNextCoroutine()314 void ThreadedCoroutineManager::ScheduleNextCoroutine()
315 {
316 Coroutine *nextCoroutine = PopFromRunnableQueue();
317 nextCoroutine->RequestResume();
318 }
319
ScheduleImpl()320 void ThreadedCoroutineManager::ScheduleImpl()
321 {
322 auto *currentCo = Coroutine::GetCurrent();
323 auto *currentCtx = currentCo->GetContext<ThreadedCoroutineContext>();
324 ScopedNativeCodeThread n(currentCo);
325
326 coroSwitchLock_.Lock();
327 if (RunnableCoroutinesExist()) {
328 currentCo->RequestSuspend(false);
329 PushToRunnableQueue(currentCo);
330 ScheduleNextCoroutine();
331
332 coroSwitchLock_.Unlock();
333 currentCtx->WaitUntilResumed();
334 } else {
335 coroSwitchLock_.Unlock();
336 }
337 }
338
LaunchImpl(CompletionEvent * completionEvent,Method * entrypoint,PandaVector<Value> && arguments,bool startSuspended)339 Coroutine *ThreadedCoroutineManager::LaunchImpl(CompletionEvent *completionEvent, Method *entrypoint,
340 PandaVector<Value> &&arguments, bool startSuspended)
341 {
342 os::memory::LockHolder l(coroSwitchLock_);
343 #ifndef NDEBUG
344 PrintRunnableQueue("LaunchImpl begin");
345 #endif
346 auto coroName = entrypoint->GetFullName();
347 Coroutine *co = CreateCoroutineInstance(completionEvent, entrypoint, std::move(arguments), std::move(coroName));
348 Runtime::GetCurrent()->GetNotificationManager()->ThreadStartEvent(co);
349 if (co == nullptr) {
350 LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::LaunchImpl: failed to create a coroutine!";
351 return co;
352 }
353 auto *ctx = co->GetContext<ThreadedCoroutineContext>();
354 if (startSuspended) {
355 ctx->WaitUntilInitialized();
356 if (runningCorosCount_ >= GetWorkersCount()) {
357 PushToRunnableQueue(co);
358 } else {
359 ++runningCorosCount_;
360 ctx->RequestResume();
361 }
362 } else {
363 ++runningCorosCount_;
364 }
365 #ifndef NDEBUG
366 PrintRunnableQueue("LaunchImpl end");
367 #endif
368 return co;
369 }
370
MainCoroutineCompleted()371 void ThreadedCoroutineManager::MainCoroutineCompleted()
372 {
373 LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::MainCoroutineCompleted() started";
374 auto *ctx = Coroutine::GetCurrent()->GetContext<ThreadedCoroutineContext>();
375 // firstly yield
376 {
377 os::memory::LockHolder l(coroSwitchLock_);
378 ctx->MainThreadFinished();
379 if (RunnableCoroutinesExist()) {
380 ScheduleNextCoroutine();
381 }
382 }
383 // then start awaiting for other coroutines to complete
384 os::memory::LockHolder lk(cvAwaitAllMutex_);
385 ctx->EnterAwaitLoop();
386 while (coroutineCount_ > 1) { // main coro runs till VM shutdown
387 cvAwaitAll_.Wait(&cvAwaitAllMutex_);
388 LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::MainCoroutineCompleted(): await_all(): still "
389 << coroutineCount_ << " coroutines exist...";
390 }
391 LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::MainCoroutineCompleted(): await_all() done";
392 }
393
Finalize()394 void ThreadedCoroutineManager::Finalize() {}
395
IsMainWorker(Coroutine * co) const396 bool ThreadedCoroutineManager::IsMainWorker(Coroutine *co) const
397 {
398 return co == GetMainThread();
399 }
400
401 } // namespace ark
402