1 /**
2 * Copyright (c) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include <thread>
17 #include "runtime/coroutines/coroutine.h"
18 #include "runtime/include/thread_scopes.h"
19 #include "libpandabase/os/mutex.h"
20 #include "runtime/include/runtime.h"
21 #include "runtime/include/runtime_notification.h"
22 #include "runtime/include/panda_vm.h"
23 #include "runtime/coroutines/threaded_coroutine.h"
24 #include "runtime/coroutines/threaded_coroutine_manager.h"
25
26 namespace panda {
27
Initialize(CoroutineManagerConfig config,Runtime * runtime,PandaVM * vm)28 void ThreadedCoroutineManager::Initialize(CoroutineManagerConfig config, Runtime *runtime, PandaVM *vm)
29 {
30 if (config.emulateJs) {
31 LOG(FATAL, COROUTINES) << "ThreadedCoroutineManager(): JS emulation is not supported!";
32 UNREACHABLE();
33 }
34 if (config.workersCount > 0) {
35 SetWorkersCount(static_cast<uint32_t>(config.workersCount));
36 LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager(): setting number of coroutine workers to "
37 << GetWorkersCount();
38 } else {
39 SetWorkersCount(std::thread::hardware_concurrency());
40 LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager(): setting number of coroutine workers to CPU count = "
41 << GetWorkersCount();
42 }
43
44 auto *mainCo = CreateMainCoroutine(runtime, vm);
45 Coroutine::SetCurrent(mainCo);
46 }
47
GetWorkersCount() const48 uint32_t ThreadedCoroutineManager::GetWorkersCount() const
49 {
50 return workersCount_;
51 }
52
SetWorkersCount(uint32_t n)53 void ThreadedCoroutineManager::SetWorkersCount(uint32_t n)
54 {
55 workersCount_ = n;
56 }
57
CreateCoroutineContext(bool coroHasEntrypoint)58 CoroutineContext *ThreadedCoroutineManager::CreateCoroutineContext([[maybe_unused]] bool coroHasEntrypoint)
59 {
60 auto alloc = Runtime::GetCurrent()->GetInternalAllocator();
61 return alloc->New<ThreadedCoroutineContext>();
62 }
63
DeleteCoroutineContext(CoroutineContext * ctx)64 void ThreadedCoroutineManager::DeleteCoroutineContext(CoroutineContext *ctx)
65 {
66 auto alloc = Runtime::GetCurrent()->GetInternalAllocator();
67 alloc->Delete(ctx);
68 }
69
GetCoroutineCount()70 size_t ThreadedCoroutineManager::GetCoroutineCount()
71 {
72 return coroutineCount_;
73 }
74
GetCoroutineCountLimit()75 size_t ThreadedCoroutineManager::GetCoroutineCountLimit()
76 {
77 return UINT_MAX;
78 }
79
AddToRegistry(Coroutine * co)80 void ThreadedCoroutineManager::AddToRegistry(Coroutine *co)
81 {
82 os::memory::LockHolder lock(coroListLock_);
83 co->GetVM()->GetGC()->OnThreadCreate(co);
84 coroutines_.insert(co);
85 coroutineCount_++;
86 }
87
RemoveFromRegistry(Coroutine * co)88 void ThreadedCoroutineManager::RemoveFromRegistry(Coroutine *co)
89 {
90 coroutines_.erase(co);
91 coroutineCount_--;
92 }
93
DeleteCoroutineInstance(Coroutine * co)94 void ThreadedCoroutineManager::DeleteCoroutineInstance(Coroutine *co)
95 {
96 auto *context = co->GetContext<CoroutineContext>();
97 Runtime::GetCurrent()->GetInternalAllocator()->Delete(co);
98 DeleteCoroutineContext(context);
99 }
100
RegisterCoroutine(Coroutine * co)101 void ThreadedCoroutineManager::RegisterCoroutine(Coroutine *co)
102 {
103 AddToRegistry(co);
104 }
105
TerminateCoroutine(Coroutine * co)106 bool ThreadedCoroutineManager::TerminateCoroutine(Coroutine *co)
107 {
108 LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::TerminateCoroutine() started";
109 co->NativeCodeEnd();
110 co->UpdateStatus(ThreadStatus::TERMINATING);
111
112 os::memory::LockHolder l(coroSwitchLock_);
113 if (co->HasManagedEntrypoint()) {
114 // entrypointless coros should be destroyed manually
115 UnblockWaiters(co->GetCompletionEvent());
116 if (RunnableCoroutinesExist()) {
117 ScheduleNextCoroutine();
118 } else {
119 --runningCorosCount_;
120 }
121 }
122
123 {
124 os::memory::LockHolder lList(coroListLock_);
125 RemoveFromRegistry(co);
126 // DestroyInternalResources() must be called in one critical section with
127 // RemoveFromRegistry (under core_list_lock_). This function transfers cards from coro's post_barrier buffer to
128 // UpdateRemsetThread internally. Situation when cards still remain and UpdateRemsetThread cannot visit the
129 // coro (because it is already removed) must be impossible.
130 co->DestroyInternalResources();
131 }
132 co->UpdateStatus(ThreadStatus::FINISHED);
133 Runtime::GetCurrent()->GetNotificationManager()->ThreadEndEvent(co);
134
135 if (!co->HasManagedEntrypoint()) {
136 // entrypointless coros should be destroyed manually
137 return false;
138 }
139
140 DeleteCoroutineInstance(co);
141 cvAwaitAll_.Signal();
142 return true;
143 // NOTE(konstanting): issue debug notifications to runtime
144 }
145
Launch(CompletionEvent * completionEvent,Method * entrypoint,PandaVector<Value> && arguments,CoroutineAffinity affinity)146 Coroutine *ThreadedCoroutineManager::Launch(CompletionEvent *completionEvent, Method *entrypoint,
147 PandaVector<Value> &&arguments, [[maybe_unused]] CoroutineAffinity affinity)
148 {
149 LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::Launch started";
150
151 auto *result = LaunchImpl(completionEvent, entrypoint, std::move(arguments));
152 if (result == nullptr) {
153 ThrowOutOfMemoryError("Launch failed");
154 }
155
156 LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::Launch finished";
157 return result;
158 }
159
RegisterWaiter(Coroutine * waiter,CoroutineEvent * awaitee)160 bool ThreadedCoroutineManager::RegisterWaiter(Coroutine *waiter, CoroutineEvent *awaitee)
161 {
162 os::memory::LockHolder l(waitersLock_);
163 if (awaitee->Happened()) {
164 awaitee->Unlock();
165 return false;
166 }
167
168 awaitee->Unlock();
169 LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::RegisterAsAwaitee: " << waiter->GetName() << " AWAITS";
170 waiters_.insert({awaitee, waiter});
171 return true;
172 }
173
Await(CoroutineEvent * awaitee)174 void ThreadedCoroutineManager::Await(CoroutineEvent *awaitee)
175 {
176 ASSERT(awaitee != nullptr);
177 LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::Await started";
178
179 auto *waiter = Coroutine::GetCurrent();
180 auto *waiterCtx = waiter->GetContext<ThreadedCoroutineContext>();
181
182 ScopedNativeCodeThread n(waiter);
183 coroSwitchLock_.Lock();
184
185 if (!RegisterWaiter(waiter, awaitee)) {
186 LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::Await finished (no await happened)";
187 coroSwitchLock_.Unlock();
188 return;
189 }
190
191 waiter->RequestSuspend(true);
192 if (RunnableCoroutinesExist()) {
193 ScheduleNextCoroutine();
194 }
195 coroSwitchLock_.Unlock();
196 waiterCtx->WaitUntilResumed();
197
198 // NB: at this point the awaitee is already deleted
199 LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::Await finished";
200 }
201
UnblockWaiters(CoroutineEvent * blocker)202 void ThreadedCoroutineManager::UnblockWaiters(CoroutineEvent *blocker)
203 {
204 os::memory::LockHolder l(waitersLock_);
205 ASSERT(blocker != nullptr);
206 #ifndef NDEBUG
207 {
208 os::memory::LockHolder lkBlocker(*blocker);
209 ASSERT(blocker->Happened());
210 }
211 #endif
212 auto w = waiters_.find(blocker);
213 if (w != waiters_.end()) {
214 auto *coro = w->second;
215 waiters_.erase(w);
216 coro->RequestUnblock();
217 PushToRunnableQueue(coro);
218 }
219 }
220
Schedule()221 void ThreadedCoroutineManager::Schedule()
222 {
223 LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::Schedule() request from "
224 << Coroutine::GetCurrent()->GetName();
225 ScheduleImpl();
226 }
227
EnumerateThreadsImpl(const ThreadManager::Callback & cb,unsigned int incMask,unsigned int xorMask) const228 bool ThreadedCoroutineManager::EnumerateThreadsImpl(const ThreadManager::Callback &cb, unsigned int incMask,
229 unsigned int xorMask) const
230 {
231 os::memory::LockHolder lock(coroListLock_);
232 for (auto *t : coroutines_) {
233 if (!ApplyCallbackToThread(cb, t, incMask, xorMask)) {
234 return false;
235 }
236 }
237 return true;
238 }
239
SuspendAllThreads()240 void ThreadedCoroutineManager::SuspendAllThreads()
241 {
242 os::memory::LockHolder lList(coroListLock_);
243 LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::SuspendAllThreads started";
244 for (auto *t : coroutines_) {
245 t->SuspendImpl(true);
246 }
247 LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::SuspendAllThreads finished";
248 }
249
ResumeAllThreads()250 void ThreadedCoroutineManager::ResumeAllThreads()
251 {
252 os::memory::LockHolder lock(coroListLock_);
253 for (auto *t : coroutines_) {
254 t->ResumeImpl(true);
255 }
256 }
257
IsRunningThreadExist()258 bool ThreadedCoroutineManager::IsRunningThreadExist()
259 {
260 UNREACHABLE();
261 // NOTE(konstanting): correct implementation. Which coroutine do we consider running?
262 return false;
263 }
264
WaitForDeregistration()265 void ThreadedCoroutineManager::WaitForDeregistration()
266 {
267 MainCoroutineCompleted();
268 }
269
270 #ifndef NDEBUG
PrintRunnableQueue(const PandaString & requester)271 void ThreadedCoroutineManager::PrintRunnableQueue(const PandaString &requester)
272 {
273 LOG(DEBUG, COROUTINES) << "[" << requester << "] ";
274 for (auto *co : runnablesQueue_) {
275 LOG(DEBUG, COROUTINES) << co->GetName() << " <";
276 }
277 LOG(DEBUG, COROUTINES) << "X";
278 }
279 #endif
280
PushToRunnableQueue(Coroutine * co)281 void ThreadedCoroutineManager::PushToRunnableQueue(Coroutine *co)
282 {
283 runnablesQueue_.push_back(co);
284 }
285
RunnableCoroutinesExist()286 bool ThreadedCoroutineManager::RunnableCoroutinesExist()
287 {
288 return !runnablesQueue_.empty();
289 }
290
PopFromRunnableQueue()291 Coroutine *ThreadedCoroutineManager::PopFromRunnableQueue()
292 {
293 auto *co = runnablesQueue_.front();
294 runnablesQueue_.pop_front();
295 return co;
296 }
297
ScheduleNextCoroutine()298 void ThreadedCoroutineManager::ScheduleNextCoroutine()
299 {
300 Coroutine *nextCoroutine = PopFromRunnableQueue();
301 nextCoroutine->RequestResume();
302 }
303
ScheduleImpl()304 void ThreadedCoroutineManager::ScheduleImpl()
305 {
306 auto *currentCo = Coroutine::GetCurrent();
307 auto *currentCtx = currentCo->GetContext<ThreadedCoroutineContext>();
308 ScopedNativeCodeThread n(currentCo);
309
310 coroSwitchLock_.Lock();
311 if (RunnableCoroutinesExist()) {
312 currentCo->RequestSuspend(false);
313 PushToRunnableQueue(currentCo);
314 ScheduleNextCoroutine();
315
316 coroSwitchLock_.Unlock();
317 currentCtx->WaitUntilResumed();
318 } else {
319 coroSwitchLock_.Unlock();
320 }
321 }
322
LaunchImpl(CompletionEvent * completionEvent,Method * entrypoint,PandaVector<Value> && arguments,bool startSuspended)323 Coroutine *ThreadedCoroutineManager::LaunchImpl(CompletionEvent *completionEvent, Method *entrypoint,
324 PandaVector<Value> &&arguments, bool startSuspended)
325 {
326 os::memory::LockHolder l(coroSwitchLock_);
327 #ifndef NDEBUG
328 PrintRunnableQueue("LaunchImpl begin");
329 #endif
330 auto coroName = entrypoint->GetFullName();
331 Coroutine *co = CreateCoroutineInstance(completionEvent, entrypoint, std::move(arguments), std::move(coroName));
332 Runtime::GetCurrent()->GetNotificationManager()->ThreadStartEvent(co);
333 if (co == nullptr) {
334 LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::LaunchImpl: failed to create a coroutine!";
335 return co;
336 }
337 auto *ctx = co->GetContext<ThreadedCoroutineContext>();
338 if (startSuspended) {
339 ctx->WaitUntilInitialized();
340 if (runningCorosCount_ >= GetWorkersCount()) {
341 PushToRunnableQueue(co);
342 } else {
343 ++runningCorosCount_;
344 ctx->RequestResume();
345 }
346 } else {
347 ++runningCorosCount_;
348 }
349 #ifndef NDEBUG
350 PrintRunnableQueue("LaunchImpl end");
351 #endif
352 return co;
353 }
354
MainCoroutineCompleted()355 void ThreadedCoroutineManager::MainCoroutineCompleted()
356 {
357 LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::MainCoroutineCompleted() started";
358 auto *ctx = Coroutine::GetCurrent()->GetContext<ThreadedCoroutineContext>();
359 // firstly yield
360 {
361 os::memory::LockHolder l(coroSwitchLock_);
362 ctx->MainThreadFinished();
363 if (RunnableCoroutinesExist()) {
364 ScheduleNextCoroutine();
365 }
366 }
367 // then start awaiting for other coroutines to complete
368 os::memory::LockHolder lk(cvMutex_);
369 ctx->EnterAwaitLoop();
370 while (coroutineCount_ > 1) { // main coro runs till VM shutdown
371 cvAwaitAll_.Wait(&cvMutex_);
372 LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::MainCoroutineCompleted(): await_all(): still "
373 << coroutineCount_ << " coroutines exist...";
374 }
375 LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::MainCoroutineCompleted(): await_all() done";
376 }
377
Finalize()378 void ThreadedCoroutineManager::Finalize() {}
379
380 } // namespace panda
381