• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright (c) 2023-2025 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef PANDA_RUNTIME_COROUTINES_STACKFUL_COROUTINE_WORKER_H
17 #define PANDA_RUNTIME_COROUTINES_STACKFUL_COROUTINE_WORKER_H
18 
19 #include "runtime/coroutines/coroutine.h"
20 #include "runtime/coroutines/coroutine_worker.h"
21 #include "runtime/coroutines/coroutine_events.h"
22 #include "runtime/coroutines/stackful_common.h"
23 #include "runtime/coroutines/coroutine_stats.h"
24 #include "runtime/coroutines/priority_queue.h"
25 #include "runtime/include/external_callback_poster.h"
26 
27 namespace ark {
28 
29 class StackfulCoroutineContext;
30 class StackfulCoroutineManager;
31 
32 /**
33  * Represents a worker thread for stackful coroutines.
34  * Contains local part of the scheduling machinery (local coroutine queues, methods)
35  */
36 class StackfulCoroutineWorker : public CoroutineWorker {
37 public:
38     enum class ScheduleLoopType { THREAD, FIBER };
39 
40     NO_COPY_SEMANTIC(StackfulCoroutineWorker);
41     NO_MOVE_SEMANTIC(StackfulCoroutineWorker);
42 
43     /**
44      * @brief The worker constructor. Create the worker and its schedule loop.
45      *
46      * Notable parameters:
47      * @param type defines the schedule loop type for this worker: a separate thread or a coroutine ("FIBER")
48      */
49     StackfulCoroutineWorker(Runtime *runtime, PandaVM *vm, StackfulCoroutineManager *coroManager, ScheduleLoopType type,
50                             PandaString name, size_t id);
51     ~StackfulCoroutineWorker() override = default;
52 
53     /// @return false if the worker is stopped and does not schedule anything, otherwise true
IsActive()54     bool IsActive() const
55     {
56         os::memory::LockHolder lock(runnablesLock_);
57         return active_;
58     }
59 
60     /// enable or disable the worker
SetActive(bool val)61     void SetActive(bool val)
62     {
63         os::memory::LockHolder lock(runnablesLock_);
64         active_ = val;
65         runnablesCv_.Signal();
66     }
67 
68     /// @return the moving average number of runnable coroutines in the queue
GetLoadFactor()69     double GetLoadFactor() const
70     {
71         return loadFactor_;
72     }
73 
GetName()74     PandaString GetName() const
75     {
76         return name_;
77     }
78 
SetName(PandaString name)79     void SetName(PandaString name)
80     {
81         name_ = std::move(name);
82     }
83 
GetId()84     size_t GetId() const
85     {
86         return id_;
87     }
88 
IsMainWorker()89     bool IsMainWorker() const
90     {
91         return id_ == stackful_coroutines::MAIN_WORKER_ID;
92     }
93 
DisableForCrossWorkersLaunch()94     void DisableForCrossWorkersLaunch()
95     {
96         isDisabledForCrossWorkersLaunch_ = true;
97     }
98 
IsDisabledForCrossWorkersLaunch()99     bool IsDisabledForCrossWorkersLaunch() const
100     {
101         return isDisabledForCrossWorkersLaunch_;
102     }
103 
    /**
     * @brief Adds a coroutine to the runnables queue. Any new incoming RUNNABLE coroutine should be added
     * via this interface! And vice versa: no intra-worker queue transitions should be done via this
     * interface.
     * @param newCoro coroutine to add
     */
111     void AddRunnableCoroutine(Coroutine *newCoro);
112 
113     /**
114      * @brief Registers the RUNNING coroutine in the worker's structures.
115      * Any incoming running coroutine should be added via this interface! And vice versa: no
116      * intra-worker queue transitions should be done via this interface.
117      * @param newCoro coroutine to add
118      */
119     void AddRunningCoroutine(Coroutine *newCoro);
120 
121     /**
122      * @brief Add a new created coroutine and switch to it immediately.
123      * @param newCoro coroutine to add
124      */
125     void AddCreatedCoroutineAndSwitchToIt(Coroutine *newCoro);
126 
127     /**
128      * @brief Block current coroutine till an event happens and switch context to the next ready one
129      * @param awaitee the event to wait
130      */
131     void WaitForEvent(CoroutineEvent *awaitee) RELEASE(awaitee);
132 
133     /**
134      * @brief Signal that an event has happened and unblock all the coroutines in the current worker that are waiting
135      * for this event
136      * @param blocker the event that has happened
137      */
138     void UnblockWaiters(CoroutineEvent *blocker);
139 
140     /**
141      * @brief Add a coroutine to the finalization queue for future destruction and schedule the next ready one for
142      * execution. Used by a coroutine being terminated for safe self-destruction.
143      * @param finalizee coroutine to finalize (the caller)
144      */
145     void RequestFinalization(Coroutine *finalizee);
146 
147     /// @brief schedule the next ready coroutine from the runnables queue for execution
148     void RequestSchedule();
149 
150     /// @brief call to delete the fake "schedule loop" coroutine
151     void FinalizeFiberScheduleLoop();
152 
153     /// @brief schedule runnable coroutines and wait for blocked coroutines
154     void CompleteAllAffinedCoroutines() NO_THREAD_SAFETY_ANALYSIS;
155 
156     /* debugging tools */
157     // See CoroutineManager/StackfulCoroutineManager for details
158     void DisableCoroutineSwitch();
159     void EnableCoroutineSwitch();
160     bool IsCoroutineSwitchDisabled();
161 
    /// @brief Migrate all non-affined coroutines away from this worker
MigrateCoroutines()163     void MigrateCoroutines() {}
164 
165 #ifndef NDEBUG
166     void PrintRunnables(const PandaString &requester);
167 #endif
    /**
     * @brief Returns the number of coroutines in the runnables queue that belong to a certain type.
     * Since another worker can concurrently add more coroutines before or after the call, the returned value
     * should be used only for debugging purposes AND ONLY IF YOU KNOW EXACTLY WHAT YOU ARE DOING.
     * @param type the type of coroutines to count
     */
174     size_t GetRunnablesCount(Coroutine::Type type);
175 
176     /* profiling tools */
GetPerfStats()177     CoroutineWorkerStats &GetPerfStats()
178     {
179         return stats_;
180     }
181 
182     /// @brief get exclusive status of worker
InExclusiveMode()183     bool InExclusiveMode() const
184     {
185         // Atomic with relaxed order reason: sync is not needed here
186         return inExclusiveMode_.load(std::memory_order_relaxed);
187     }
188 
189     /// @brief set exclusive status of worker
SetExclusiveMode(bool inExclusiveMode)190     void SetExclusiveMode(bool inExclusiveMode)
191     {
192         // Atomic with relaxed order reason: sync is not needed here
193         inExclusiveMode_.store(inExclusiveMode, std::memory_order_relaxed);
194     }
195 
196     /// migrate coroutines from this to the 'to' worker
197     void MigrateTo(StackfulCoroutineWorker *to);
198     /// migrate coroutines from the 'from' worker to this
199     bool MigrateFrom(StackfulCoroutineWorker *from);
200     /// check whether the worker is idle
201     bool IsIdle();
202     /// migrate the coroutines of the blocked worker to other workers
203     void MigrateCorosOutwardsIfBlocked();
204 
205 private:
206     /* schedule loop management */
207     /// the EP for threaded schedule loops
208     void ThreadProc();
209     /// the EP for fiber schedule loops
210     void ScheduleLoop();
211     /// the schedule loop itself
212     void ScheduleLoopBody();
213     /// the helper proxy function for the fiber schedule loop
214     static void ScheduleLoopProxy(void *worker);
215 
216     /* runnables queue management */
217     void PushToRunnableQueue(Coroutine *co, CoroutinePriority priority) REQUIRES(runnablesLock_);
218     Coroutine *PopFromRunnableQueue();
219     bool RunnableCoroutinesExist() const;
220     void WaitForRunnables() REQUIRES(runnablesLock_);
221     /**
222      * @brief Register a new active (= runnable or running) coroutine on this worker.
223      * @param newCoro the incoming coroutine to register
224      */
225     void RegisterIncomingActiveCoroutine(Coroutine *newCoro);
226 
227     /* scheduling machinery from high level functions to elementary helpers */
228     /**
229      * Schedule the next ready coroutine from the runnables queue for execution if present, otherwise wait till those
230      * appear, then pick the best suitable and schedule it. After that eventually the current coroutine will be
231      * scheduled for execution too. Upon that, the control flow will get back to this function and it will return.
232      */
233     void RequestScheduleImpl();
234     void BlockCurrentCoroAndScheduleNext() RELEASE(runnablesLock_) RELEASE(waitersLock_);
235     void SuspendCurrentCoroAndScheduleNext() RELEASE(runnablesLock_);
236     template <bool SUSPEND_AS_BLOCKED>
237     void SuspendCurrentCoroGeneric();
238     void BlockCurrentCoro();
239     void SuspendCurrentCoro();
240     /* "the lesser evil": keep thread safety annotations but duplicate the function body */
241     void ScheduleNextCoroUnlockRunnablesWaiters() RELEASE(runnablesLock_) RELEASE(waitersLock_);
242     void ScheduleNextCoroUnlockRunnables() RELEASE(runnablesLock_);
243     void ScheduleNextCoroUnlockNone();
244     StackfulCoroutineContext *GetCurrentContext() const;
245     StackfulCoroutineContext *PrepareNextRunnableContextForSwitch();
246     void SwitchCoroutineContext(StackfulCoroutineContext *from, StackfulCoroutineContext *to);
247 
248     /* various helper functions */
249     /// parse the finalization queue and destroy all coroutines from it
250     void FinalizeTerminatedCoros();
251     /// recalculate the load factor
252     void UpdateLoadFactor() REQUIRES(runnablesLock_);
253     /**
254      * This checker is called on a coroutine switch attempt. Issues fatal error in case when coroutine switch is
255      * disabled.
256      */
257     void EnsureCoroutineSwitchEnabled();
258 
259     /// @return true if current method is called from another worker instance
IsCrossWorkerCall()260     bool IsCrossWorkerCall()
261     {
262         ASSERT(Coroutine::GetCurrent() != nullptr);
263         return (this != Coroutine::GetCurrent()->GetWorker());
264     }
265 
    /// check whether this worker may have been blocked
267     bool IsPotentiallyBlocked();
268     void MigrateCoroutinesImpl(StackfulCoroutineWorker *to, size_t migrateCount) REQUIRES(runnablesLock_);
269 
270     /// called when the coroutineContext is switched
271     void OnContextSwitch();
272 
273     StackfulCoroutineManager *coroManager_;
274     Coroutine *scheduleLoopCtx_ = nullptr;
275     bool active_ GUARDED_BY(runnablesLock_) = true;
276     os::thread::ThreadId threadId_;
277 
278     // runnable coroutines-related members
279     mutable os::memory::RecursiveMutex runnablesLock_;
280     os::memory::ConditionVariable runnablesCv_;
281     PriorityQueue runnables_ GUARDED_BY(runnablesLock_);
282     // blocked coros-related members: Coroutine AWAITS CoroutineEvent
283     mutable os::memory::Mutex waitersLock_;
284     PandaMap<CoroutineEvent *, Coroutine *> waiters_ GUARDED_BY(waitersLock_);
285     // terminated coros (waiting for deletion)
286     PandaQueue<Coroutine *> finalizationQueue_;
287 
288     /// the moving average number of coroutines in the runnable queue
289     std::atomic<double> loadFactor_ = 0;
290 
291     // the timestamp of the last coroutine context switch
292     std::atomic<uint64_t> lastCtxSwitchTimeMillis_ = 0;
293 
294     /**
295      * If worker is in exclusive mode, it means that:
296      * 1. launch/transition of coroutine from other workers to e-worker is disabled
297      * 2. child e-worker coroutines will be scheduled on the same worker
298      */
299     std::atomic<bool> inExclusiveMode_ = false;
300     GenericEvent workerCompletionEvent_;
301     std::atomic<bool> isDisabledForCrossWorkersLaunch_ = false;
302 
303     /**
304      * This counter is incremented on DisableCoroutineSwitch calls and decremented on EnableCoroutineSwitch calls.
305      * The value 0 means that coroutine switch is ENABLED.
306      */
307     uint32_t disableCoroSwitchCounter_ = 0;
308 
309     // stats
310     CoroutineWorkerStats stats_;
311 
312     PandaString name_;
313     stackful_coroutines::WorkerId id_ = stackful_coroutines::INVALID_WORKER_ID;
314 
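    // the maximum continuous execution time of a coroutine
    // NOTE(review): the unit is not stated here; presumably milliseconds (cf. lastCtxSwitchTimeMillis_) - confirm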
316     static constexpr uint32_t MAX_EXECUTION_DURATION = 6000;
317 };
318 
319 }  // namespace ark
320 
321 #endif /* PANDA_RUNTIME_COROUTINES_STACKFUL_COROUTINE_WORKER_H */
322