• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/**
 * Copyright (c) 2022 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
15 
16 #include <thread>
17 #include "runtime/coroutines/coroutine.h"
18 #include "runtime/include/thread_scopes.h"
19 #include "libpandabase/os/mutex.h"
20 #include "runtime/include/runtime.h"
21 #include "runtime/include/runtime_notification.h"
22 #include "runtime/include/panda_vm.h"
23 #include "runtime/coroutines/threaded_coroutine.h"
24 #include "runtime/coroutines/threaded_coroutine_manager.h"
25 
26 namespace panda {
27 
Initialize(CoroutineManagerConfig config,Runtime * runtime,PandaVM * vm)28 void ThreadedCoroutineManager::Initialize(CoroutineManagerConfig config, Runtime *runtime, PandaVM *vm)
29 {
30     if (config.emulateJs) {
31         LOG(FATAL, COROUTINES) << "ThreadedCoroutineManager(): JS emulation is not supported!";
32         UNREACHABLE();
33     }
34     if (config.workersCount > 0) {
35         SetWorkersCount(static_cast<uint32_t>(config.workersCount));
36         LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager(): setting number of coroutine workers to "
37                                << GetWorkersCount();
38     } else {
39         SetWorkersCount(std::thread::hardware_concurrency());
40         LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager(): setting number of coroutine workers to CPU count = "
41                                << GetWorkersCount();
42     }
43 
44     auto *mainCo = CreateMainCoroutine(runtime, vm);
45     Coroutine::SetCurrent(mainCo);
46 }
47 
GetWorkersCount() const48 uint32_t ThreadedCoroutineManager::GetWorkersCount() const
49 {
50     return workersCount_;
51 }
52 
SetWorkersCount(uint32_t n)53 void ThreadedCoroutineManager::SetWorkersCount(uint32_t n)
54 {
55     workersCount_ = n;
56 }
57 
CreateCoroutineContext(bool coroHasEntrypoint)58 CoroutineContext *ThreadedCoroutineManager::CreateCoroutineContext([[maybe_unused]] bool coroHasEntrypoint)
59 {
60     auto alloc = Runtime::GetCurrent()->GetInternalAllocator();
61     return alloc->New<ThreadedCoroutineContext>();
62 }
63 
DeleteCoroutineContext(CoroutineContext * ctx)64 void ThreadedCoroutineManager::DeleteCoroutineContext(CoroutineContext *ctx)
65 {
66     auto alloc = Runtime::GetCurrent()->GetInternalAllocator();
67     alloc->Delete(ctx);
68 }
69 
GetCoroutineCount()70 size_t ThreadedCoroutineManager::GetCoroutineCount()
71 {
72     return coroutineCount_;
73 }
74 
GetCoroutineCountLimit()75 size_t ThreadedCoroutineManager::GetCoroutineCountLimit()
76 {
77     return UINT_MAX;
78 }
79 
AddToRegistry(Coroutine * co)80 void ThreadedCoroutineManager::AddToRegistry(Coroutine *co)
81 {
82     os::memory::LockHolder lock(coroListLock_);
83     co->GetVM()->GetGC()->OnThreadCreate(co);
84     coroutines_.insert(co);
85     coroutineCount_++;
86 }
87 
RemoveFromRegistry(Coroutine * co)88 void ThreadedCoroutineManager::RemoveFromRegistry(Coroutine *co)
89 {
90     coroutines_.erase(co);
91     coroutineCount_--;
92 }
93 
DeleteCoroutineInstance(Coroutine * co)94 void ThreadedCoroutineManager::DeleteCoroutineInstance(Coroutine *co)
95 {
96     auto *context = co->GetContext<CoroutineContext>();
97     Runtime::GetCurrent()->GetInternalAllocator()->Delete(co);
98     DeleteCoroutineContext(context);
99 }
100 
RegisterCoroutine(Coroutine * co)101 void ThreadedCoroutineManager::RegisterCoroutine(Coroutine *co)
102 {
103     AddToRegistry(co);
104 }
105 
TerminateCoroutine(Coroutine * co)106 bool ThreadedCoroutineManager::TerminateCoroutine(Coroutine *co)
107 {
108     LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::TerminateCoroutine() started";
109     co->NativeCodeEnd();
110     co->UpdateStatus(ThreadStatus::TERMINATING);
111 
112     os::memory::LockHolder l(coroSwitchLock_);
113     if (co->HasManagedEntrypoint()) {
114         // entrypointless coros should be destroyed manually
115         UnblockWaiters(co->GetCompletionEvent());
116         if (RunnableCoroutinesExist()) {
117             ScheduleNextCoroutine();
118         } else {
119             --runningCorosCount_;
120         }
121     }
122 
123     {
124         os::memory::LockHolder lList(coroListLock_);
125         RemoveFromRegistry(co);
126         // DestroyInternalResources() must be called in one critical section with
127         // RemoveFromRegistry (under core_list_lock_). This function transfers cards from coro's post_barrier buffer to
128         // UpdateRemsetThread internally. Situation when cards still remain and UpdateRemsetThread cannot visit the
129         // coro (because it is already removed) must be impossible.
130         co->DestroyInternalResources();
131     }
132     co->UpdateStatus(ThreadStatus::FINISHED);
133     Runtime::GetCurrent()->GetNotificationManager()->ThreadEndEvent(co);
134 
135     if (!co->HasManagedEntrypoint()) {
136         // entrypointless coros should be destroyed manually
137         return false;
138     }
139 
140     DeleteCoroutineInstance(co);
141     cvAwaitAll_.Signal();
142     return true;
143     // NOTE(konstanting): issue debug notifications to runtime
144 }
145 
Launch(CompletionEvent * completionEvent,Method * entrypoint,PandaVector<Value> && arguments,CoroutineAffinity affinity)146 Coroutine *ThreadedCoroutineManager::Launch(CompletionEvent *completionEvent, Method *entrypoint,
147                                             PandaVector<Value> &&arguments, [[maybe_unused]] CoroutineAffinity affinity)
148 {
149     LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::Launch started";
150 
151     auto *result = LaunchImpl(completionEvent, entrypoint, std::move(arguments));
152     if (result == nullptr) {
153         ThrowOutOfMemoryError("Launch failed");
154     }
155 
156     LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::Launch finished";
157     return result;
158 }
159 
RegisterWaiter(Coroutine * waiter,CoroutineEvent * awaitee)160 bool ThreadedCoroutineManager::RegisterWaiter(Coroutine *waiter, CoroutineEvent *awaitee)
161 {
162     os::memory::LockHolder l(waitersLock_);
163     if (awaitee->Happened()) {
164         awaitee->Unlock();
165         return false;
166     }
167 
168     awaitee->Unlock();
169     LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::RegisterAsAwaitee: " << waiter->GetName() << " AWAITS";
170     waiters_.insert({awaitee, waiter});
171     return true;
172 }
173 
Await(CoroutineEvent * awaitee)174 void ThreadedCoroutineManager::Await(CoroutineEvent *awaitee)
175 {
176     ASSERT(awaitee != nullptr);
177     LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::Await started";
178 
179     auto *waiter = Coroutine::GetCurrent();
180     auto *waiterCtx = waiter->GetContext<ThreadedCoroutineContext>();
181 
182     ScopedNativeCodeThread n(waiter);
183     coroSwitchLock_.Lock();
184 
185     if (!RegisterWaiter(waiter, awaitee)) {
186         LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::Await finished (no await happened)";
187         coroSwitchLock_.Unlock();
188         return;
189     }
190 
191     waiter->RequestSuspend(true);
192     if (RunnableCoroutinesExist()) {
193         ScheduleNextCoroutine();
194     }
195     coroSwitchLock_.Unlock();
196     waiterCtx->WaitUntilResumed();
197 
198     // NB: at this point the awaitee is already deleted
199     LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::Await finished";
200 }
201 
UnblockWaiters(CoroutineEvent * blocker)202 void ThreadedCoroutineManager::UnblockWaiters(CoroutineEvent *blocker)
203 {
204     os::memory::LockHolder l(waitersLock_);
205     ASSERT(blocker != nullptr);
206 #ifndef NDEBUG
207     {
208         os::memory::LockHolder lkBlocker(*blocker);
209         ASSERT(blocker->Happened());
210     }
211 #endif
212     auto w = waiters_.find(blocker);
213     if (w != waiters_.end()) {
214         auto *coro = w->second;
215         waiters_.erase(w);
216         coro->RequestUnblock();
217         PushToRunnableQueue(coro);
218     }
219 }
220 
Schedule()221 void ThreadedCoroutineManager::Schedule()
222 {
223     LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::Schedule() request from "
224                            << Coroutine::GetCurrent()->GetName();
225     ScheduleImpl();
226 }
227 
EnumerateThreadsImpl(const ThreadManager::Callback & cb,unsigned int incMask,unsigned int xorMask) const228 bool ThreadedCoroutineManager::EnumerateThreadsImpl(const ThreadManager::Callback &cb, unsigned int incMask,
229                                                     unsigned int xorMask) const
230 {
231     os::memory::LockHolder lock(coroListLock_);
232     for (auto *t : coroutines_) {
233         if (!ApplyCallbackToThread(cb, t, incMask, xorMask)) {
234             return false;
235         }
236     }
237     return true;
238 }
239 
SuspendAllThreads()240 void ThreadedCoroutineManager::SuspendAllThreads()
241 {
242     os::memory::LockHolder lList(coroListLock_);
243     LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::SuspendAllThreads started";
244     for (auto *t : coroutines_) {
245         t->SuspendImpl(true);
246     }
247     LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::SuspendAllThreads finished";
248 }
249 
ResumeAllThreads()250 void ThreadedCoroutineManager::ResumeAllThreads()
251 {
252     os::memory::LockHolder lock(coroListLock_);
253     for (auto *t : coroutines_) {
254         t->ResumeImpl(true);
255     }
256 }
257 
IsRunningThreadExist()258 bool ThreadedCoroutineManager::IsRunningThreadExist()
259 {
260     UNREACHABLE();
261     // NOTE(konstanting): correct implementation. Which coroutine do we consider running?
262     return false;
263 }
264 
WaitForDeregistration()265 void ThreadedCoroutineManager::WaitForDeregistration()
266 {
267     MainCoroutineCompleted();
268 }
269 
270 #ifndef NDEBUG
PrintRunnableQueue(const PandaString & requester)271 void ThreadedCoroutineManager::PrintRunnableQueue(const PandaString &requester)
272 {
273     LOG(DEBUG, COROUTINES) << "[" << requester << "] ";
274     for (auto *co : runnablesQueue_) {
275         LOG(DEBUG, COROUTINES) << co->GetName() << " <";
276     }
277     LOG(DEBUG, COROUTINES) << "X";
278 }
279 #endif
280 
PushToRunnableQueue(Coroutine * co)281 void ThreadedCoroutineManager::PushToRunnableQueue(Coroutine *co)
282 {
283     runnablesQueue_.push_back(co);
284 }
285 
RunnableCoroutinesExist()286 bool ThreadedCoroutineManager::RunnableCoroutinesExist()
287 {
288     return !runnablesQueue_.empty();
289 }
290 
PopFromRunnableQueue()291 Coroutine *ThreadedCoroutineManager::PopFromRunnableQueue()
292 {
293     auto *co = runnablesQueue_.front();
294     runnablesQueue_.pop_front();
295     return co;
296 }
297 
ScheduleNextCoroutine()298 void ThreadedCoroutineManager::ScheduleNextCoroutine()
299 {
300     Coroutine *nextCoroutine = PopFromRunnableQueue();
301     nextCoroutine->RequestResume();
302 }
303 
ScheduleImpl()304 void ThreadedCoroutineManager::ScheduleImpl()
305 {
306     auto *currentCo = Coroutine::GetCurrent();
307     auto *currentCtx = currentCo->GetContext<ThreadedCoroutineContext>();
308     ScopedNativeCodeThread n(currentCo);
309 
310     coroSwitchLock_.Lock();
311     if (RunnableCoroutinesExist()) {
312         currentCo->RequestSuspend(false);
313         PushToRunnableQueue(currentCo);
314         ScheduleNextCoroutine();
315 
316         coroSwitchLock_.Unlock();
317         currentCtx->WaitUntilResumed();
318     } else {
319         coroSwitchLock_.Unlock();
320     }
321 }
322 
LaunchImpl(CompletionEvent * completionEvent,Method * entrypoint,PandaVector<Value> && arguments,bool startSuspended)323 Coroutine *ThreadedCoroutineManager::LaunchImpl(CompletionEvent *completionEvent, Method *entrypoint,
324                                                 PandaVector<Value> &&arguments, bool startSuspended)
325 {
326     os::memory::LockHolder l(coroSwitchLock_);
327 #ifndef NDEBUG
328     PrintRunnableQueue("LaunchImpl begin");
329 #endif
330     auto coroName = entrypoint->GetFullName();
331     Coroutine *co = CreateCoroutineInstance(completionEvent, entrypoint, std::move(arguments), std::move(coroName));
332     Runtime::GetCurrent()->GetNotificationManager()->ThreadStartEvent(co);
333     if (co == nullptr) {
334         LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::LaunchImpl: failed to create a coroutine!";
335         return co;
336     }
337     auto *ctx = co->GetContext<ThreadedCoroutineContext>();
338     if (startSuspended) {
339         ctx->WaitUntilInitialized();
340         if (runningCorosCount_ >= GetWorkersCount()) {
341             PushToRunnableQueue(co);
342         } else {
343             ++runningCorosCount_;
344             ctx->RequestResume();
345         }
346     } else {
347         ++runningCorosCount_;
348     }
349 #ifndef NDEBUG
350     PrintRunnableQueue("LaunchImpl end");
351 #endif
352     return co;
353 }
354 
MainCoroutineCompleted()355 void ThreadedCoroutineManager::MainCoroutineCompleted()
356 {
357     LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::MainCoroutineCompleted() started";
358     auto *ctx = Coroutine::GetCurrent()->GetContext<ThreadedCoroutineContext>();
359     //  firstly yield
360     {
361         os::memory::LockHolder l(coroSwitchLock_);
362         ctx->MainThreadFinished();
363         if (RunnableCoroutinesExist()) {
364             ScheduleNextCoroutine();
365         }
366     }
367     // then start awaiting for other coroutines to complete
368     os::memory::LockHolder lk(cvMutex_);
369     ctx->EnterAwaitLoop();
370     while (coroutineCount_ > 1) {  // main coro runs till VM shutdown
371         cvAwaitAll_.Wait(&cvMutex_);
372         LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::MainCoroutineCompleted(): await_all(): still "
373                                << coroutineCount_ << " coroutines exist...";
374     }
375     LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::MainCoroutineCompleted(): await_all() done";
376 }
377 
Finalize()378 void ThreadedCoroutineManager::Finalize() {}
379 
380 }  // namespace panda
381