1 /**
2 * Copyright (c) 2022-2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include <thread>
17 #include "runtime/coroutines/coroutine.h"
18 #include "runtime/include/thread_scopes.h"
19 #include "libpandabase/os/mutex.h"
20 #include "runtime/include/runtime.h"
21 #include "runtime/include/runtime_notification.h"
22 #include "runtime/include/panda_vm.h"
23 #include "runtime/coroutines/threaded_coroutine.h"
24 #include "runtime/coroutines/threaded_coroutine_manager.h"
25
26 namespace ark {
27
Initialize(CoroutineManagerConfig config,Runtime * runtime,PandaVM * vm)28 void ThreadedCoroutineManager::Initialize(CoroutineManagerConfig config, Runtime *runtime, PandaVM *vm)
29 {
30 if (config.emulateJs) {
31 LOG(FATAL, COROUTINES) << "ThreadedCoroutineManager(): JS emulation is not supported!";
32 UNREACHABLE();
33 }
34 if (config.workersCount > 0) {
35 SetWorkersCount(static_cast<uint32_t>(config.workersCount));
36 LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager(): setting number of coroutine workers to "
37 << GetWorkersCount();
38 } else {
39 SetWorkersCount(std::thread::hardware_concurrency());
40 LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager(): setting number of coroutine workers to CPU count = "
41 << GetWorkersCount();
42 }
43
44 auto *mainCo = CreateMainCoroutine(runtime, vm);
45 Coroutine::SetCurrent(mainCo);
46 }
47
GetWorkersCount() const48 uint32_t ThreadedCoroutineManager::GetWorkersCount() const
49 {
50 return workersCount_;
51 }
52
SetWorkersCount(uint32_t n)53 void ThreadedCoroutineManager::SetWorkersCount(uint32_t n)
54 {
55 workersCount_ = n;
56 }
57
CreateCoroutineContext(bool coroHasEntrypoint)58 CoroutineContext *ThreadedCoroutineManager::CreateCoroutineContext([[maybe_unused]] bool coroHasEntrypoint)
59 {
60 auto alloc = Runtime::GetCurrent()->GetInternalAllocator();
61 return alloc->New<ThreadedCoroutineContext>();
62 }
63
DeleteCoroutineContext(CoroutineContext * ctx)64 void ThreadedCoroutineManager::DeleteCoroutineContext(CoroutineContext *ctx)
65 {
66 auto alloc = Runtime::GetCurrent()->GetInternalAllocator();
67 alloc->Delete(ctx);
68 }
69
GetCoroutineCount()70 size_t ThreadedCoroutineManager::GetCoroutineCount()
71 {
72 return coroutineCount_;
73 }
74
GetCoroutineCountLimit()75 size_t ThreadedCoroutineManager::GetCoroutineCountLimit()
76 {
77 return UINT_MAX;
78 }
79
AddToRegistry(Coroutine * co)80 void ThreadedCoroutineManager::AddToRegistry(Coroutine *co)
81 {
82 os::memory::LockHolder lock(coroListLock_);
83 co->GetVM()->GetGC()->OnThreadCreate(co);
84 coroutines_.insert(co);
85 coroutineCount_++;
86 }
87
RemoveFromRegistry(Coroutine * co)88 void ThreadedCoroutineManager::RemoveFromRegistry(Coroutine *co)
89 {
90 coroutines_.erase(co);
91 coroutineCount_--;
92 }
93
DeleteCoroutineInstance(Coroutine * co)94 void ThreadedCoroutineManager::DeleteCoroutineInstance(Coroutine *co)
95 {
96 auto *context = co->GetContext<CoroutineContext>();
97 Runtime::GetCurrent()->GetInternalAllocator()->Delete(co);
98 DeleteCoroutineContext(context);
99 }
100
RegisterCoroutine(Coroutine * co)101 void ThreadedCoroutineManager::RegisterCoroutine(Coroutine *co)
102 {
103 AddToRegistry(co);
104 }
105
TerminateCoroutine(Coroutine * co)106 bool ThreadedCoroutineManager::TerminateCoroutine(Coroutine *co)
107 {
108 LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::TerminateCoroutine() started";
109 co->NativeCodeEnd();
110 co->UpdateStatus(ThreadStatus::TERMINATING);
111
112 os::memory::LockHolder l(coroSwitchLock_);
113 if (co->HasManagedEntrypoint()) {
114 // entrypointless coros should be destroyed manually
115 if (RunnableCoroutinesExist()) {
116 ScheduleNextCoroutine();
117 } else {
118 --runningCorosCount_;
119 }
120 }
121
122 {
123 os::memory::LockHolder lList(coroListLock_);
124 RemoveFromRegistry(co);
125 // DestroyInternalResources() must be called in one critical section with
126 // RemoveFromRegistry (under core_list_lock_). This function transfers cards from coro's post_barrier buffer to
127 // UpdateRemsetThread internally. Situation when cards still remain and UpdateRemsetThread cannot visit the
128 // coro (because it is already removed) must be impossible.
129 co->DestroyInternalResources();
130 }
131 co->UpdateStatus(ThreadStatus::FINISHED);
132 Runtime::GetCurrent()->GetNotificationManager()->ThreadEndEvent(co);
133
134 if (!co->HasManagedEntrypoint()) {
135 // entrypointless coros should be destroyed manually
136 return false;
137 }
138
139 DeleteCoroutineInstance(co);
140
141 os::memory::LockHolder lk(cvAwaitAllMutex_);
142 cvAwaitAll_.Signal();
143 return true;
144 // NOTE(konstanting): issue debug notifications to runtime
145 }
146
Launch(CompletionEvent * completionEvent,Method * entrypoint,PandaVector<Value> && arguments,CoroutineLaunchMode mode)147 Coroutine *ThreadedCoroutineManager::Launch(CompletionEvent *completionEvent, Method *entrypoint,
148 PandaVector<Value> &&arguments, [[maybe_unused]] CoroutineLaunchMode mode)
149 {
150 LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::Launch started";
151
152 auto *result = LaunchImpl(completionEvent, entrypoint, std::move(arguments));
153 if (result == nullptr) {
154 ThrowOutOfMemoryError("Launch failed");
155 }
156
157 LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::Launch finished";
158 return result;
159 }
160
RegisterWaiter(Coroutine * waiter,CoroutineEvent * awaitee)161 bool ThreadedCoroutineManager::RegisterWaiter(Coroutine *waiter, CoroutineEvent *awaitee)
162 {
163 os::memory::LockHolder l(waitersLock_);
164 if (awaitee->Happened()) {
165 awaitee->Unlock();
166 return false;
167 }
168
169 awaitee->Unlock();
170 LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::RegisterAsAwaitee: " << waiter->GetName() << " AWAITS";
171 waiters_.insert({awaitee, waiter});
172 return true;
173 }
174
Await(CoroutineEvent * awaitee)175 void ThreadedCoroutineManager::Await(CoroutineEvent *awaitee)
176 {
177 ASSERT(awaitee != nullptr);
178 LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::Await started";
179
180 auto *waiter = Coroutine::GetCurrent();
181 auto *waiterCtx = waiter->GetContext<ThreadedCoroutineContext>();
182
183 ScopedNativeCodeThread n(waiter);
184 coroSwitchLock_.Lock();
185
186 if (!RegisterWaiter(waiter, awaitee)) {
187 LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::Await finished (no await happened)";
188 coroSwitchLock_.Unlock();
189 return;
190 }
191
192 waiter->RequestSuspend(true);
193 if (RunnableCoroutinesExist()) {
194 ScheduleNextCoroutine();
195 }
196 coroSwitchLock_.Unlock();
197 waiterCtx->WaitUntilResumed();
198
199 // NB: at this point the awaitee is already deleted
200 LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::Await finished";
201 }
202
UnblockWaiters(CoroutineEvent * blocker)203 void ThreadedCoroutineManager::UnblockWaiters(CoroutineEvent *blocker)
204 {
205 os::memory::LockHolder lh(coroSwitchLock_);
206 UnblockWaitersImpl(blocker);
207 }
208
UnblockWaitersImpl(CoroutineEvent * blocker)209 void ThreadedCoroutineManager::UnblockWaitersImpl(CoroutineEvent *blocker)
210 {
211 os::memory::LockHolder l(waitersLock_);
212 ASSERT(blocker != nullptr);
213 #ifndef NDEBUG
214 {
215 os::memory::LockHolder lkBlocker(*blocker);
216 ASSERT(blocker->Happened());
217 }
218 #endif
219 auto w = waiters_.find(blocker);
220 while (w != waiters_.end()) {
221 auto *coro = w->second;
222 waiters_.erase(w);
223 coro->RequestUnblock();
224 PushToRunnableQueue(coro);
225 w = waiters_.find(blocker);
226 }
227 }
228
Schedule()229 void ThreadedCoroutineManager::Schedule()
230 {
231 LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::Schedule() request from "
232 << Coroutine::GetCurrent()->GetName();
233 ScheduleImpl();
234 }
235
EnumerateThreadsImpl(const ThreadManager::Callback & cb,unsigned int incMask,unsigned int xorMask) const236 bool ThreadedCoroutineManager::EnumerateThreadsImpl(const ThreadManager::Callback &cb, unsigned int incMask,
237 unsigned int xorMask) const
238 {
239 os::memory::LockHolder lock(coroListLock_);
240 for (auto *t : coroutines_) {
241 if (!ApplyCallbackToThread(cb, t, incMask, xorMask)) {
242 return false;
243 }
244 }
245 return true;
246 }
247
SuspendAllThreads()248 void ThreadedCoroutineManager::SuspendAllThreads()
249 {
250 os::memory::LockHolder lList(coroListLock_);
251 LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::SuspendAllThreads started";
252 for (auto *t : coroutines_) {
253 t->SuspendImpl(true);
254 }
255 LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::SuspendAllThreads finished";
256 }
257
ResumeAllThreads()258 void ThreadedCoroutineManager::ResumeAllThreads()
259 {
260 os::memory::LockHolder lock(coroListLock_);
261 for (auto *t : coroutines_) {
262 t->ResumeImpl(true);
263 }
264 }
265
IsRunningThreadExist()266 bool ThreadedCoroutineManager::IsRunningThreadExist()
267 {
268 UNREACHABLE();
269 // NOTE(konstanting): correct implementation. Which coroutine do we consider running?
270 return false;
271 }
272
WaitForDeregistration()273 void ThreadedCoroutineManager::WaitForDeregistration()
274 {
275 MainCoroutineCompleted();
276 }
277
278 #ifndef NDEBUG
PrintRunnableQueue(const PandaString & requester)279 void ThreadedCoroutineManager::PrintRunnableQueue(const PandaString &requester)
280 {
281 LOG(DEBUG, COROUTINES) << "[" << requester << "] ";
282 for (auto *co : runnablesQueue_) {
283 LOG(DEBUG, COROUTINES) << co->GetName() << " <";
284 }
285 LOG(DEBUG, COROUTINES) << "X";
286 }
287 #endif
288
PushToRunnableQueue(Coroutine * co)289 void ThreadedCoroutineManager::PushToRunnableQueue(Coroutine *co)
290 {
291 runnablesQueue_.push_back(co);
292 }
293
RunnableCoroutinesExist()294 bool ThreadedCoroutineManager::RunnableCoroutinesExist()
295 {
296 return !runnablesQueue_.empty();
297 }
298
PopFromRunnableQueue()299 Coroutine *ThreadedCoroutineManager::PopFromRunnableQueue()
300 {
301 auto *co = runnablesQueue_.front();
302 runnablesQueue_.pop_front();
303 return co;
304 }
305
ScheduleNextCoroutine()306 void ThreadedCoroutineManager::ScheduleNextCoroutine()
307 {
308 Coroutine *nextCoroutine = PopFromRunnableQueue();
309 nextCoroutine->RequestResume();
310 }
311
ScheduleImpl()312 void ThreadedCoroutineManager::ScheduleImpl()
313 {
314 auto *currentCo = Coroutine::GetCurrent();
315 auto *currentCtx = currentCo->GetContext<ThreadedCoroutineContext>();
316 ScopedNativeCodeThread n(currentCo);
317
318 coroSwitchLock_.Lock();
319 if (RunnableCoroutinesExist()) {
320 currentCo->RequestSuspend(false);
321 PushToRunnableQueue(currentCo);
322 ScheduleNextCoroutine();
323
324 coroSwitchLock_.Unlock();
325 currentCtx->WaitUntilResumed();
326 } else {
327 coroSwitchLock_.Unlock();
328 }
329 }
330
LaunchImpl(CompletionEvent * completionEvent,Method * entrypoint,PandaVector<Value> && arguments,bool startSuspended)331 Coroutine *ThreadedCoroutineManager::LaunchImpl(CompletionEvent *completionEvent, Method *entrypoint,
332 PandaVector<Value> &&arguments, bool startSuspended)
333 {
334 os::memory::LockHolder l(coroSwitchLock_);
335 #ifndef NDEBUG
336 PrintRunnableQueue("LaunchImpl begin");
337 #endif
338 auto coroName = entrypoint->GetFullName();
339 Coroutine *co = CreateCoroutineInstance(completionEvent, entrypoint, std::move(arguments), std::move(coroName));
340 Runtime::GetCurrent()->GetNotificationManager()->ThreadStartEvent(co);
341 if (co == nullptr) {
342 LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::LaunchImpl: failed to create a coroutine!";
343 return co;
344 }
345 auto *ctx = co->GetContext<ThreadedCoroutineContext>();
346 if (startSuspended) {
347 ctx->WaitUntilInitialized();
348 if (runningCorosCount_ >= GetWorkersCount()) {
349 PushToRunnableQueue(co);
350 } else {
351 ++runningCorosCount_;
352 ctx->RequestResume();
353 }
354 } else {
355 ++runningCorosCount_;
356 }
357 #ifndef NDEBUG
358 PrintRunnableQueue("LaunchImpl end");
359 #endif
360 return co;
361 }
362
MainCoroutineCompleted()363 void ThreadedCoroutineManager::MainCoroutineCompleted()
364 {
365 LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::MainCoroutineCompleted() started";
366 auto *ctx = Coroutine::GetCurrent()->GetContext<ThreadedCoroutineContext>();
367 // firstly yield
368 {
369 os::memory::LockHolder l(coroSwitchLock_);
370 ctx->MainThreadFinished();
371 if (RunnableCoroutinesExist()) {
372 ScheduleNextCoroutine();
373 }
374 }
375 // then start awaiting for other coroutines to complete
376 os::memory::LockHolder lk(cvAwaitAllMutex_);
377 ctx->EnterAwaitLoop();
378 while (coroutineCount_ > 1) { // main coro runs till VM shutdown
379 cvAwaitAll_.Wait(&cvAwaitAllMutex_);
380 LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::MainCoroutineCompleted(): await_all(): still "
381 << coroutineCount_ << " coroutines exist...";
382 }
383 LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::MainCoroutineCompleted(): await_all() done";
384 }
385
Finalize()386 void ThreadedCoroutineManager::Finalize() {}
387
388 } // namespace ark
389