• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright (c) 2022-2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include <thread>
17 #include "runtime/coroutines/coroutine.h"
18 #include "runtime/include/thread_scopes.h"
19 #include "libpandabase/os/mutex.h"
20 #include "runtime/include/runtime.h"
21 #include "runtime/include/runtime_notification.h"
22 #include "runtime/include/panda_vm.h"
23 #include "runtime/coroutines/threaded_coroutine.h"
24 #include "runtime/coroutines/threaded_coroutine_manager.h"
25 
26 namespace ark {
27 
Initialize(CoroutineManagerConfig config,Runtime * runtime,PandaVM * vm)28 void ThreadedCoroutineManager::Initialize(CoroutineManagerConfig config, Runtime *runtime, PandaVM *vm)
29 {
30     if (config.emulateJs) {
31         LOG(FATAL, COROUTINES) << "ThreadedCoroutineManager(): JS emulation is not supported!";
32         UNREACHABLE();
33     }
34     if (config.workersCount > 0) {
35         SetWorkersCount(static_cast<uint32_t>(config.workersCount));
36         LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager(): setting number of coroutine workers to "
37                                << GetWorkersCount();
38     } else {
39         SetWorkersCount(std::thread::hardware_concurrency());
40         LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager(): setting number of coroutine workers to CPU count = "
41                                << GetWorkersCount();
42     }
43 
44     auto *mainCo = CreateMainCoroutine(runtime, vm);
45     Coroutine::SetCurrent(mainCo);
46 }
47 
GetWorkersCount() const48 uint32_t ThreadedCoroutineManager::GetWorkersCount() const
49 {
50     return workersCount_;
51 }
52 
SetWorkersCount(uint32_t n)53 void ThreadedCoroutineManager::SetWorkersCount(uint32_t n)
54 {
55     workersCount_ = n;
56 }
57 
CreateCoroutineContext(bool coroHasEntrypoint)58 CoroutineContext *ThreadedCoroutineManager::CreateCoroutineContext([[maybe_unused]] bool coroHasEntrypoint)
59 {
60     auto alloc = Runtime::GetCurrent()->GetInternalAllocator();
61     return alloc->New<ThreadedCoroutineContext>();
62 }
63 
DeleteCoroutineContext(CoroutineContext * ctx)64 void ThreadedCoroutineManager::DeleteCoroutineContext(CoroutineContext *ctx)
65 {
66     auto alloc = Runtime::GetCurrent()->GetInternalAllocator();
67     alloc->Delete(ctx);
68 }
69 
GetCoroutineCount()70 size_t ThreadedCoroutineManager::GetCoroutineCount()
71 {
72     return coroutineCount_;
73 }
74 
GetCoroutineCountLimit()75 size_t ThreadedCoroutineManager::GetCoroutineCountLimit()
76 {
77     return UINT_MAX;
78 }
79 
AddToRegistry(Coroutine * co)80 void ThreadedCoroutineManager::AddToRegistry(Coroutine *co)
81 {
82     os::memory::LockHolder lock(coroListLock_);
83     co->GetVM()->GetGC()->OnThreadCreate(co);
84     coroutines_.insert(co);
85     coroutineCount_++;
86 }
87 
RemoveFromRegistry(Coroutine * co)88 void ThreadedCoroutineManager::RemoveFromRegistry(Coroutine *co)
89 {
90     coroutines_.erase(co);
91     coroutineCount_--;
92 }
93 
DeleteCoroutineInstance(Coroutine * co)94 void ThreadedCoroutineManager::DeleteCoroutineInstance(Coroutine *co)
95 {
96     auto *context = co->GetContext<CoroutineContext>();
97     Runtime::GetCurrent()->GetInternalAllocator()->Delete(co);
98     DeleteCoroutineContext(context);
99 }
100 
RegisterCoroutine(Coroutine * co)101 void ThreadedCoroutineManager::RegisterCoroutine(Coroutine *co)
102 {
103     AddToRegistry(co);
104 }
105 
/**
 * Finishes a coroutine: moves it through TERMINATING and FINISHED, removes it from the registry, destroys its
 * internal resources and issues the thread-end notification. A coroutine with a managed entrypoint is also
 * deleted here, after which cvAwaitAll_ is signalled.
 *
 * @param co coroutine being terminated
 * @return true if the coroutine instance was deleted here; false for an entrypointless coroutine,
 *         which the caller has to destroy manually
 */
bool ThreadedCoroutineManager::TerminateCoroutine(Coroutine *co)
{
    LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::TerminateCoroutine() started";
    co->NativeCodeEnd();
    co->UpdateStatus(ThreadStatus::TERMINATING);

    // the holder lives at function scope, so coroSwitchLock_ stays taken for the rest of the function
    os::memory::LockHolder l(coroSwitchLock_);
    if (co->HasManagedEntrypoint()) {
        // entrypointless coros are skipped here: they should be destroyed manually
        if (RunnableCoroutinesExist()) {
            // hand over to the next queued coroutine; runningCorosCount_ stays unchanged
            ScheduleNextCoroutine();
        } else {
            // nothing to resume in place of this coroutine
            --runningCorosCount_;
        }
    }

    {
        os::memory::LockHolder lList(coroListLock_);
        RemoveFromRegistry(co);
        // DestroyInternalResources() must be called in one critical section with
        // RemoveFromRegistry (under coroListLock_). This function transfers cards from coro's post_barrier buffer to
        // UpdateRemsetThread internally. Situation when cards still remain and UpdateRemsetThread cannot visit the
        // coro (because it is already removed) must be impossible.
        co->DestroyInternalResources();
    }
    co->UpdateStatus(ThreadStatus::FINISHED);
    Runtime::GetCurrent()->GetNotificationManager()->ThreadEndEvent(co);

    if (!co->HasManagedEntrypoint()) {
        // entrypointless coros should be destroyed manually
        return false;
    }

    DeleteCoroutineInstance(co);

    // signal cvAwaitAll_, which MainCoroutineCompleted() waits on, now that one more coroutine is gone
    os::memory::LockHolder lk(cvAwaitAllMutex_);
    cvAwaitAll_.Signal();
    return true;
    // NOTE(konstanting): issue debug notifications to runtime
}
146 
Launch(CompletionEvent * completionEvent,Method * entrypoint,PandaVector<Value> && arguments,CoroutineLaunchMode mode)147 Coroutine *ThreadedCoroutineManager::Launch(CompletionEvent *completionEvent, Method *entrypoint,
148                                             PandaVector<Value> &&arguments, [[maybe_unused]] CoroutineLaunchMode mode)
149 {
150     LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::Launch started";
151 
152     auto *result = LaunchImpl(completionEvent, entrypoint, std::move(arguments));
153     if (result == nullptr) {
154         ThrowOutOfMemoryError("Launch failed");
155     }
156 
157     LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::Launch finished";
158     return result;
159 }
160 
RegisterWaiter(Coroutine * waiter,CoroutineEvent * awaitee)161 bool ThreadedCoroutineManager::RegisterWaiter(Coroutine *waiter, CoroutineEvent *awaitee)
162 {
163     os::memory::LockHolder l(waitersLock_);
164     if (awaitee->Happened()) {
165         awaitee->Unlock();
166         return false;
167     }
168 
169     awaitee->Unlock();
170     LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::RegisterAsAwaitee: " << waiter->GetName() << " AWAITS";
171     waiters_.insert({awaitee, waiter});
172     return true;
173 }
174 
/**
 * Makes the current coroutine wait for the given event.
 * Returns immediately when RegisterWaiter() reports that no waiting is needed. Otherwise the current
 * coroutine requests suspension, the next runnable coroutine (if any) is scheduled, and the call blocks in
 * WaitUntilResumed().
 * NOTE(review): RegisterWaiter() unlocks the awaitee, so the event is presumably locked by the caller - confirm.
 *
 * @param awaitee event to wait for; must not be nullptr
 */
void ThreadedCoroutineManager::Await(CoroutineEvent *awaitee)
{
    ASSERT(awaitee != nullptr);
    LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::Await started";

    auto *waiter = Coroutine::GetCurrent();
    auto *waiterCtx = waiter->GetContext<ThreadedCoroutineContext>();

    ScopedNativeCodeThread n(waiter);
    // locked manually: the lock has to be released before blocking in WaitUntilResumed() below
    coroSwitchLock_.Lock();

    if (!RegisterWaiter(waiter, awaitee)) {
        LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::Await finished (no await happened)";
        coroSwitchLock_.Unlock();
        return;
    }

    waiter->RequestSuspend(true);
    if (RunnableCoroutinesExist()) {
        ScheduleNextCoroutine();
    }
    coroSwitchLock_.Unlock();
    waiterCtx->WaitUntilResumed();

    // NB: at this point the awaitee is already deleted
    LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::Await finished";
}
202 
UnblockWaiters(CoroutineEvent * blocker)203 void ThreadedCoroutineManager::UnblockWaiters(CoroutineEvent *blocker)
204 {
205     os::memory::LockHolder lh(coroSwitchLock_);
206     UnblockWaitersImpl(blocker);
207 }
208 
UnblockWaitersImpl(CoroutineEvent * blocker)209 void ThreadedCoroutineManager::UnblockWaitersImpl(CoroutineEvent *blocker)
210 {
211     os::memory::LockHolder l(waitersLock_);
212     ASSERT(blocker != nullptr);
213 #ifndef NDEBUG
214     {
215         os::memory::LockHolder lkBlocker(*blocker);
216         ASSERT(blocker->Happened());
217     }
218 #endif
219     auto w = waiters_.find(blocker);
220     while (w != waiters_.end()) {
221         auto *coro = w->second;
222         waiters_.erase(w);
223         coro->RequestUnblock();
224         PushToRunnableQueue(coro);
225         w = waiters_.find(blocker);
226     }
227 }
228 
Schedule()229 void ThreadedCoroutineManager::Schedule()
230 {
231     LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::Schedule() request from "
232                            << Coroutine::GetCurrent()->GetName();
233     ScheduleImpl();
234 }
235 
EnumerateThreadsImpl(const ThreadManager::Callback & cb,unsigned int incMask,unsigned int xorMask) const236 bool ThreadedCoroutineManager::EnumerateThreadsImpl(const ThreadManager::Callback &cb, unsigned int incMask,
237                                                     unsigned int xorMask) const
238 {
239     os::memory::LockHolder lock(coroListLock_);
240     for (auto *t : coroutines_) {
241         if (!ApplyCallbackToThread(cb, t, incMask, xorMask)) {
242             return false;
243         }
244     }
245     return true;
246 }
247 
SuspendAllThreads()248 void ThreadedCoroutineManager::SuspendAllThreads()
249 {
250     os::memory::LockHolder lList(coroListLock_);
251     LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::SuspendAllThreads started";
252     for (auto *t : coroutines_) {
253         t->SuspendImpl(true);
254     }
255     LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::SuspendAllThreads finished";
256 }
257 
ResumeAllThreads()258 void ThreadedCoroutineManager::ResumeAllThreads()
259 {
260     os::memory::LockHolder lock(coroListLock_);
261     for (auto *t : coroutines_) {
262         t->ResumeImpl(true);
263     }
264 }
265 
IsRunningThreadExist()266 bool ThreadedCoroutineManager::IsRunningThreadExist()
267 {
268     UNREACHABLE();
269     // NOTE(konstanting): correct implementation. Which coroutine do we consider running?
270     return false;
271 }
272 
WaitForDeregistration()273 void ThreadedCoroutineManager::WaitForDeregistration()
274 {
275     MainCoroutineCompleted();
276 }
277 
278 #ifndef NDEBUG
PrintRunnableQueue(const PandaString & requester)279 void ThreadedCoroutineManager::PrintRunnableQueue(const PandaString &requester)
280 {
281     LOG(DEBUG, COROUTINES) << "[" << requester << "] ";
282     for (auto *co : runnablesQueue_) {
283         LOG(DEBUG, COROUTINES) << co->GetName() << " <";
284     }
285     LOG(DEBUG, COROUTINES) << "X";
286 }
287 #endif
288 
PushToRunnableQueue(Coroutine * co)289 void ThreadedCoroutineManager::PushToRunnableQueue(Coroutine *co)
290 {
291     runnablesQueue_.push_back(co);
292 }
293 
RunnableCoroutinesExist()294 bool ThreadedCoroutineManager::RunnableCoroutinesExist()
295 {
296     return !runnablesQueue_.empty();
297 }
298 
PopFromRunnableQueue()299 Coroutine *ThreadedCoroutineManager::PopFromRunnableQueue()
300 {
301     auto *co = runnablesQueue_.front();
302     runnablesQueue_.pop_front();
303     return co;
304 }
305 
ScheduleNextCoroutine()306 void ThreadedCoroutineManager::ScheduleNextCoroutine()
307 {
308     Coroutine *nextCoroutine = PopFromRunnableQueue();
309     nextCoroutine->RequestResume();
310 }
311 
ScheduleImpl()312 void ThreadedCoroutineManager::ScheduleImpl()
313 {
314     auto *currentCo = Coroutine::GetCurrent();
315     auto *currentCtx = currentCo->GetContext<ThreadedCoroutineContext>();
316     ScopedNativeCodeThread n(currentCo);
317 
318     coroSwitchLock_.Lock();
319     if (RunnableCoroutinesExist()) {
320         currentCo->RequestSuspend(false);
321         PushToRunnableQueue(currentCo);
322         ScheduleNextCoroutine();
323 
324         coroSwitchLock_.Unlock();
325         currentCtx->WaitUntilResumed();
326     } else {
327         coroSwitchLock_.Unlock();
328     }
329 }
330 
LaunchImpl(CompletionEvent * completionEvent,Method * entrypoint,PandaVector<Value> && arguments,bool startSuspended)331 Coroutine *ThreadedCoroutineManager::LaunchImpl(CompletionEvent *completionEvent, Method *entrypoint,
332                                                 PandaVector<Value> &&arguments, bool startSuspended)
333 {
334     os::memory::LockHolder l(coroSwitchLock_);
335 #ifndef NDEBUG
336     PrintRunnableQueue("LaunchImpl begin");
337 #endif
338     auto coroName = entrypoint->GetFullName();
339     Coroutine *co = CreateCoroutineInstance(completionEvent, entrypoint, std::move(arguments), std::move(coroName));
340     Runtime::GetCurrent()->GetNotificationManager()->ThreadStartEvent(co);
341     if (co == nullptr) {
342         LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::LaunchImpl: failed to create a coroutine!";
343         return co;
344     }
345     auto *ctx = co->GetContext<ThreadedCoroutineContext>();
346     if (startSuspended) {
347         ctx->WaitUntilInitialized();
348         if (runningCorosCount_ >= GetWorkersCount()) {
349             PushToRunnableQueue(co);
350         } else {
351             ++runningCorosCount_;
352             ctx->RequestResume();
353         }
354     } else {
355         ++runningCorosCount_;
356     }
357 #ifndef NDEBUG
358     PrintRunnableQueue("LaunchImpl end");
359 #endif
360     return co;
361 }
362 
/**
 * Called for the main coroutine: yields to the next runnable coroutine (if any) and then blocks until
 * coroutineCount_ drops to 1, i.e. until only one coroutine is left in the registry.
 * The wake-ups come from cvAwaitAll_, which TerminateCoroutine() signals after deleting a coroutine.
 */
void ThreadedCoroutineManager::MainCoroutineCompleted()
{
    LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::MainCoroutineCompleted() started";
    auto *ctx = Coroutine::GetCurrent()->GetContext<ThreadedCoroutineContext>();
    //  firstly yield
    {
        // coroSwitchLock_ is held only for this scope
        os::memory::LockHolder l(coroSwitchLock_);
        ctx->MainThreadFinished();
        if (RunnableCoroutinesExist()) {
            ScheduleNextCoroutine();
        }
    }
    // then start awaiting for other coroutines to complete
    os::memory::LockHolder lk(cvAwaitAllMutex_);
    ctx->EnterAwaitLoop();
    // the condition is re-checked after every wake-up
    while (coroutineCount_ > 1) {  // main coro runs till VM shutdown
        cvAwaitAll_.Wait(&cvAwaitAllMutex_);
        LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::MainCoroutineCompleted(): await_all(): still "
                               << coroutineCount_ << " coroutines exist...";
    }
    LOG(DEBUG, COROUTINES) << "ThreadedCoroutineManager::MainCoroutineCompleted(): await_all() done";
}
385 
Finalize()386 void ThreadedCoroutineManager::Finalize() {}
387 
388 }  // namespace ark
389