1 /**
2 * Copyright (c) 2023-2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "runtime/coroutines/coroutine.h"
17 #include "runtime/coroutines/coroutine_context.h"
18 #include "runtime/coroutines/coroutine_manager.h"
19 #include "runtime/coroutines/coroutine_events.h"
20 #include "runtime/include/panda_vm.h"
21 #include "runtime/coroutines/coro_callback_queue.h"
22
23 namespace ark {
24
Create(Runtime * runtime,PandaVM * vm,PandaString name,CoroutineContext * context,std::optional<EntrypointInfo> && epInfo)25 Coroutine *Coroutine::Create(Runtime *runtime, PandaVM *vm, PandaString name, CoroutineContext *context,
26 std::optional<EntrypointInfo> &&epInfo)
27 {
28 mem::InternalAllocatorPtr allocator = runtime->GetInternalAllocator();
29 auto *callbackQueue = allocator->New<CoroCallbackQueue>();
30 auto *co = allocator->New<Coroutine>(os::thread::GetCurrentThreadId(), allocator, vm,
31 ark::panda_file::SourceLang::PANDA_ASSEMBLY, std::move(name), context,
32 callbackQueue, std::move(epInfo));
33 co->Initialize();
34 return co;
35 }
36
Coroutine(ThreadId id,mem::InternalAllocatorPtr allocator,PandaVM * vm,ark::panda_file::SourceLang threadLang,PandaString name,CoroutineContext * context,CallbackQueue * queue,std::optional<EntrypointInfo> && epInfo)37 Coroutine::Coroutine(ThreadId id, mem::InternalAllocatorPtr allocator, PandaVM *vm,
38 ark::panda_file::SourceLang threadLang, PandaString name, CoroutineContext *context,
39 CallbackQueue *queue, std::optional<EntrypointInfo> &&epInfo)
40 : ManagedThread(id, allocator, vm, Thread::ThreadType::THREAD_TYPE_TASK, threadLang),
41 name_(std::move(name)),
42 context_(context),
43 startSuspended_(epInfo.has_value()),
44 callbackQueue_(queue),
45 callbacksEvent_(static_cast<CoroutineManager *>(vm->GetThreadManager()))
46 {
47 ASSERT(vm != nullptr);
48 ASSERT(context != nullptr);
49 SetEntrypointData(std::move(epInfo));
50 coroutineId_ = static_cast<CoroutineManager *>(GetVM()->GetThreadManager())->AllocateCoroutineId();
51 }
52
~Coroutine()53 Coroutine::~Coroutine()
54 {
55 callbackQueue_->Destroy();
56 static_cast<CoroutineManager *>(GetVM()->GetThreadManager())->FreeCoroutineId(coroutineId_);
57 }
58
ReInitialize(PandaString name,CoroutineContext * context,std::optional<EntrypointInfo> && epInfo)59 void Coroutine::ReInitialize(PandaString name, CoroutineContext *context, std::optional<EntrypointInfo> &&epInfo)
60 {
61 ASSERT(context != nullptr);
62 name_ = std::move(name);
63 startSuspended_ = epInfo.has_value();
64 context_ = context;
65
66 SetEntrypointData(std::move(epInfo));
67 context_->AttachToCoroutine(this);
68 }
69
SetEntrypointData(std::optional<EntrypointInfo> && epInfo)70 void Coroutine::SetEntrypointData(std::optional<EntrypointInfo> &&epInfo)
71 {
72 if (epInfo.has_value()) {
73 auto &info = epInfo.value();
74 if (std::holds_alternative<ManagedEntrypointInfo>(info)) {
75 auto &managedEp = std::get<ManagedEntrypointInfo>(info);
76 entrypoint_.emplace<ManagedEntrypointData>(managedEp.completionEvent, managedEp.entrypoint,
77 std::move(managedEp.arguments));
78 } else if (std::holds_alternative<NativeEntrypointInfo>(info)) {
79 auto &nativeEp = std::get<NativeEntrypointInfo>(info);
80 entrypoint_ = NativeEntrypointData(nativeEp.entrypoint, nativeEp.param);
81 }
82 }
83 }
84
CleanUp()85 void Coroutine::CleanUp()
86 {
87 ManagedThread::CleanUp();
88 name_ = "";
89 entrypoint_ = std::monostate();
90 startSuspended_ = false;
91 context_->CleanUp();
92 }
93
~ManagedEntrypointData()94 Coroutine::ManagedEntrypointData::~ManagedEntrypointData()
95 {
96 // delete the event as it is owned by the ManagedEntrypointData instance
97 Runtime::GetCurrent()->GetInternalAllocator()->Delete(completionEvent);
98 }
99
GetName() const100 PandaString Coroutine::GetName() const
101 {
102 return name_;
103 }
104
GetCoroutineStatus() const105 Coroutine::Status Coroutine::GetCoroutineStatus() const
106 {
107 return context_->GetStatus();
108 }
109
SetCoroutineStatus(Coroutine::Status newStatus)110 void Coroutine::SetCoroutineStatus(Coroutine::Status newStatus)
111 {
112 context_->SetStatus(newStatus);
113 }
114
Destroy()115 void Coroutine::Destroy()
116 {
117 context_->Destroy();
118 }
119
Initialize()120 void Coroutine::Initialize()
121 {
122 context_->AttachToCoroutine(this);
123 InitForStackOverflowCheck(ManagedThread::STACK_OVERFLOW_RESERVED_SIZE,
124 ManagedThread::STACK_OVERFLOW_PROTECTED_SIZE);
125 }
126
RetrieveStackInfo(void * & stackAddr,size_t & stackSize,size_t & guardSize)127 bool Coroutine::RetrieveStackInfo(void *&stackAddr, size_t &stackSize, size_t &guardSize)
128 {
129 if (HasManagedEntrypoint() || HasNativeEntrypoint()) {
130 // has EP and separate native context for its execution
131 return context_->RetrieveStackInfo(stackAddr, stackSize, guardSize);
132 }
133 // does not have EP, executes on OS-provided context and stack
134 return ManagedThread::RetrieveStackInfo(stackAddr, stackSize, guardSize);
135 }
136
RequestSuspend(bool getsBlocked)137 void Coroutine::RequestSuspend(bool getsBlocked)
138 {
139 context_->RequestSuspend(getsBlocked);
140 }
141
RequestResume()142 void Coroutine::RequestResume()
143 {
144 context_->RequestResume();
145 }
146
RequestUnblock()147 void Coroutine::RequestUnblock()
148 {
149 context_->RequestUnblock();
150 }
151
RequestCompletion(Value returnValue)152 void Coroutine::RequestCompletion([[maybe_unused]] Value returnValue)
153 {
154 auto *e = GetCompletionEvent();
155 e->Happen();
156 }
157
AcceptAnnouncedCallbacks(PandaList<PandaUniquePtr<Callback>> callbacks)158 void Coroutine::AcceptAnnouncedCallbacks(PandaList<PandaUniquePtr<Callback>> callbacks)
159 {
160 os::memory::LockHolder lh(preparingCallbacksLock_);
161 ASSERT(preparingCallbacks_ >= callbacks.size());
162 auto readyCallbacks = callbacks.size();
163 ASSERT(callbackQueue_ != nullptr);
164 callbackQueue_->PostSequence(std::move(callbacks));
165 // Atomic with relaxed order reason: mutex synchronization
166 preparingCallbacks_.fetch_sub(readyCallbacks, std::memory_order_relaxed);
167 auto *event = awaiteeEvent_.exchange(nullptr);
168 event = (event == nullptr) ? &callbacksEvent_ : event;
169 event->Happen();
170 }
171
WaitTillAnnouncedCallbacksAreDelivered()172 void Coroutine::WaitTillAnnouncedCallbacksAreDelivered()
173 {
174 preparingCallbacksLock_.Lock();
175 // Atomic with relaxed order reason: mutex synchronization
176 if (preparingCallbacks_.load(std::memory_order_relaxed) == 0) {
177 preparingCallbacksLock_.Unlock();
178 return;
179 }
180 callbacksEvent_.SetNotHappened();
181 callbacksEvent_.Lock();
182 preparingCallbacksLock_.Unlock();
183 auto *coroManager = static_cast<CoroutineManager *>(GetVM()->GetThreadManager());
184 coroManager->Await(&callbacksEvent_);
185 }
186
ProcessPresentAndAnnouncedCallbacks()187 void Coroutine::ProcessPresentAndAnnouncedCallbacks()
188 {
189 ASSERT(callbackQueue_ != nullptr);
190 do {
191 ProcessPresentCallbacks();
192 WaitTillAnnouncedCallbacksAreDelivered();
193 } while (!callbackQueue_->IsEmpty());
194 ASSERT(preparingCallbacks_ == 0);
195 }
196
TryEnterAwaitModeAndLockAwaitee(CoroutineEvent * event)197 bool Coroutine::TryEnterAwaitModeAndLockAwaitee(CoroutineEvent *event)
198 {
199 ASSERT(this == Coroutine::GetCurrent());
200 ASSERT(awaiteeEvent_ == nullptr);
201 ASSERT(event != nullptr);
202 os::memory::LockHolder lh(preparingCallbacksLock_);
203 if (!callbackQueue_->IsEmpty()) {
204 return false;
205 }
206
207 if (preparingCallbacks_ != 0) {
208 awaiteeEvent_ = event;
209 }
210 event->SetNotHappened();
211 event->Lock();
212 return true;
213 }
214
operator <<(std::ostream & os,Coroutine::Status status)215 std::ostream &operator<<(std::ostream &os, Coroutine::Status status)
216 {
217 switch (status) {
218 case Coroutine::Status::CREATED:
219 os << "CREATED";
220 break;
221 case Coroutine::Status::RUNNABLE:
222 os << "RUNNABLE";
223 break;
224 case Coroutine::Status::RUNNING:
225 os << "RUNNING";
226 break;
227 case Coroutine::Status::BLOCKED:
228 os << "BLOCKED";
229 break;
230 case Coroutine::Status::TERMINATING:
231 os << "TERMINATING";
232 break;
233 case Coroutine::Status::AWAIT_LOOP:
234 os << "AWAIT_LOOP";
235 break;
236 default:
237 break;
238 }
239 return os;
240 }
241
242 } // namespace ark
243