• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright (c) 2023-2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "runtime/coroutines/coroutine.h"
17 #include "runtime/coroutines/coroutine_context.h"
18 #include "runtime/coroutines/coroutine_manager.h"
19 #include "runtime/coroutines/coroutine_events.h"
20 #include "runtime/include/panda_vm.h"
21 #include "runtime/coroutines/coro_callback_queue.h"
22 
23 namespace ark {
24 
Create(Runtime * runtime,PandaVM * vm,PandaString name,CoroutineContext * context,std::optional<EntrypointInfo> && epInfo)25 Coroutine *Coroutine::Create(Runtime *runtime, PandaVM *vm, PandaString name, CoroutineContext *context,
26                              std::optional<EntrypointInfo> &&epInfo)
27 {
28     mem::InternalAllocatorPtr allocator = runtime->GetInternalAllocator();
29     auto *callbackQueue = allocator->New<CoroCallbackQueue>();
30     auto *co = allocator->New<Coroutine>(os::thread::GetCurrentThreadId(), allocator, vm,
31                                          ark::panda_file::SourceLang::PANDA_ASSEMBLY, std::move(name), context,
32                                          callbackQueue, std::move(epInfo));
33     co->Initialize();
34     return co;
35 }
36 
Coroutine(ThreadId id,mem::InternalAllocatorPtr allocator,PandaVM * vm,ark::panda_file::SourceLang threadLang,PandaString name,CoroutineContext * context,CallbackQueue * queue,std::optional<EntrypointInfo> && epInfo)37 Coroutine::Coroutine(ThreadId id, mem::InternalAllocatorPtr allocator, PandaVM *vm,
38                      ark::panda_file::SourceLang threadLang, PandaString name, CoroutineContext *context,
39                      CallbackQueue *queue, std::optional<EntrypointInfo> &&epInfo)
40     : ManagedThread(id, allocator, vm, Thread::ThreadType::THREAD_TYPE_TASK, threadLang),
41       name_(std::move(name)),
42       context_(context),
43       startSuspended_(epInfo.has_value()),
44       callbackQueue_(queue),
45       callbacksEvent_(static_cast<CoroutineManager *>(vm->GetThreadManager()))
46 {
47     ASSERT(vm != nullptr);
48     ASSERT(context != nullptr);
49     SetEntrypointData(std::move(epInfo));
50     coroutineId_ = static_cast<CoroutineManager *>(GetVM()->GetThreadManager())->AllocateCoroutineId();
51 }
52 
~Coroutine()53 Coroutine::~Coroutine()
54 {
55     callbackQueue_->Destroy();
56     static_cast<CoroutineManager *>(GetVM()->GetThreadManager())->FreeCoroutineId(coroutineId_);
57 }
58 
ReInitialize(PandaString name,CoroutineContext * context,std::optional<EntrypointInfo> && epInfo)59 void Coroutine::ReInitialize(PandaString name, CoroutineContext *context, std::optional<EntrypointInfo> &&epInfo)
60 {
61     ASSERT(context != nullptr);
62     name_ = std::move(name);
63     startSuspended_ = epInfo.has_value();
64     context_ = context;
65 
66     SetEntrypointData(std::move(epInfo));
67     context_->AttachToCoroutine(this);
68 }
69 
SetEntrypointData(std::optional<EntrypointInfo> && epInfo)70 void Coroutine::SetEntrypointData(std::optional<EntrypointInfo> &&epInfo)
71 {
72     if (epInfo.has_value()) {
73         auto &info = epInfo.value();
74         if (std::holds_alternative<ManagedEntrypointInfo>(info)) {
75             auto &managedEp = std::get<ManagedEntrypointInfo>(info);
76             entrypoint_.emplace<ManagedEntrypointData>(managedEp.completionEvent, managedEp.entrypoint,
77                                                        std::move(managedEp.arguments));
78         } else if (std::holds_alternative<NativeEntrypointInfo>(info)) {
79             auto &nativeEp = std::get<NativeEntrypointInfo>(info);
80             entrypoint_ = NativeEntrypointData(nativeEp.entrypoint, nativeEp.param);
81         }
82     }
83 }
84 
CleanUp()85 void Coroutine::CleanUp()
86 {
87     ManagedThread::CleanUp();
88     name_ = "";
89     entrypoint_ = std::monostate();
90     startSuspended_ = false;
91     context_->CleanUp();
92 }
93 
~ManagedEntrypointData()94 Coroutine::ManagedEntrypointData::~ManagedEntrypointData()
95 {
96     // delete the event as it is owned by the ManagedEntrypointData instance
97     Runtime::GetCurrent()->GetInternalAllocator()->Delete(completionEvent);
98 }
99 
GetName() const100 PandaString Coroutine::GetName() const
101 {
102     return name_;
103 }
104 
GetCoroutineStatus() const105 Coroutine::Status Coroutine::GetCoroutineStatus() const
106 {
107     return context_->GetStatus();
108 }
109 
SetCoroutineStatus(Coroutine::Status newStatus)110 void Coroutine::SetCoroutineStatus(Coroutine::Status newStatus)
111 {
112     context_->SetStatus(newStatus);
113 }
114 
Destroy()115 void Coroutine::Destroy()
116 {
117     context_->Destroy();
118 }
119 
Initialize()120 void Coroutine::Initialize()
121 {
122     context_->AttachToCoroutine(this);
123     InitForStackOverflowCheck(ManagedThread::STACK_OVERFLOW_RESERVED_SIZE,
124                               ManagedThread::STACK_OVERFLOW_PROTECTED_SIZE);
125 }
126 
RetrieveStackInfo(void * & stackAddr,size_t & stackSize,size_t & guardSize)127 bool Coroutine::RetrieveStackInfo(void *&stackAddr, size_t &stackSize, size_t &guardSize)
128 {
129     if (HasManagedEntrypoint() || HasNativeEntrypoint()) {
130         // has EP and separate native context for its execution
131         return context_->RetrieveStackInfo(stackAddr, stackSize, guardSize);
132     }
133     // does not have EP, executes on OS-provided context and stack
134     return ManagedThread::RetrieveStackInfo(stackAddr, stackSize, guardSize);
135 }
136 
RequestSuspend(bool getsBlocked)137 void Coroutine::RequestSuspend(bool getsBlocked)
138 {
139     context_->RequestSuspend(getsBlocked);
140 }
141 
RequestResume()142 void Coroutine::RequestResume()
143 {
144     context_->RequestResume();
145 }
146 
RequestUnblock()147 void Coroutine::RequestUnblock()
148 {
149     context_->RequestUnblock();
150 }
151 
RequestCompletion(Value returnValue)152 void Coroutine::RequestCompletion([[maybe_unused]] Value returnValue)
153 {
154     auto *e = GetCompletionEvent();
155     e->Happen();
156 }
157 
AcceptAnnouncedCallbacks(PandaList<PandaUniquePtr<Callback>> callbacks)158 void Coroutine::AcceptAnnouncedCallbacks(PandaList<PandaUniquePtr<Callback>> callbacks)
159 {
160     os::memory::LockHolder lh(preparingCallbacksLock_);
161     ASSERT(preparingCallbacks_ >= callbacks.size());
162     auto readyCallbacks = callbacks.size();
163     ASSERT(callbackQueue_ != nullptr);
164     callbackQueue_->PostSequence(std::move(callbacks));
165     // Atomic with relaxed order reason: mutex synchronization
166     preparingCallbacks_.fetch_sub(readyCallbacks, std::memory_order_relaxed);
167     auto *event = awaiteeEvent_.exchange(nullptr);
168     event = (event == nullptr) ? &callbacksEvent_ : event;
169     event->Happen();
170 }
171 
WaitTillAnnouncedCallbacksAreDelivered()172 void Coroutine::WaitTillAnnouncedCallbacksAreDelivered()
173 {
174     preparingCallbacksLock_.Lock();
175     // Atomic with relaxed order reason: mutex synchronization
176     if (preparingCallbacks_.load(std::memory_order_relaxed) == 0) {
177         preparingCallbacksLock_.Unlock();
178         return;
179     }
180     callbacksEvent_.SetNotHappened();
181     callbacksEvent_.Lock();
182     preparingCallbacksLock_.Unlock();
183     auto *coroManager = static_cast<CoroutineManager *>(GetVM()->GetThreadManager());
184     coroManager->Await(&callbacksEvent_);
185 }
186 
ProcessPresentAndAnnouncedCallbacks()187 void Coroutine::ProcessPresentAndAnnouncedCallbacks()
188 {
189     ASSERT(callbackQueue_ != nullptr);
190     do {
191         ProcessPresentCallbacks();
192         WaitTillAnnouncedCallbacksAreDelivered();
193     } while (!callbackQueue_->IsEmpty());
194     ASSERT(preparingCallbacks_ == 0);
195 }
196 
TryEnterAwaitModeAndLockAwaitee(CoroutineEvent * event)197 bool Coroutine::TryEnterAwaitModeAndLockAwaitee(CoroutineEvent *event)
198 {
199     ASSERT(this == Coroutine::GetCurrent());
200     ASSERT(awaiteeEvent_ == nullptr);
201     ASSERT(event != nullptr);
202     os::memory::LockHolder lh(preparingCallbacksLock_);
203     if (!callbackQueue_->IsEmpty()) {
204         return false;
205     }
206 
207     if (preparingCallbacks_ != 0) {
208         awaiteeEvent_ = event;
209     }
210     event->SetNotHappened();
211     event->Lock();
212     return true;
213 }
214 
operator <<(std::ostream & os,Coroutine::Status status)215 std::ostream &operator<<(std::ostream &os, Coroutine::Status status)
216 {
217     switch (status) {
218         case Coroutine::Status::CREATED:
219             os << "CREATED";
220             break;
221         case Coroutine::Status::RUNNABLE:
222             os << "RUNNABLE";
223             break;
224         case Coroutine::Status::RUNNING:
225             os << "RUNNING";
226             break;
227         case Coroutine::Status::BLOCKED:
228             os << "BLOCKED";
229             break;
230         case Coroutine::Status::TERMINATING:
231             os << "TERMINATING";
232             break;
233         case Coroutine::Status::AWAIT_LOOP:
234             os << "AWAIT_LOOP";
235             break;
236         default:
237             break;
238     }
239     return os;
240 }
241 
242 }  // namespace ark
243