• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/**
 * Copyright (c) 2023-2025 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
15 #ifndef PANDA_RUNTIME_COROUTINES_STACKFUL_COROUTINE_MANAGER_H
16 #define PANDA_RUNTIME_COROUTINES_STACKFUL_COROUTINE_MANAGER_H
17 
18 #include "runtime/coroutines/coroutine_manager.h"
19 #include "runtime/coroutines/stackful_common.h"
20 #include "runtime/coroutines/stackful_coroutine.h"
21 #include "runtime/coroutines/stackful_coroutine_worker.h"
22 #include "runtime/coroutines/coroutine_stats.h"
23 
24 namespace ark {
25 
/**
 * @brief Stackful ("fiber"-based) coroutine manager implementation.
 *
 * In this implementation coroutines are user-level threads ("fibers") with manually allocated stacks.
 *
 * For interface function descriptions see CoroutineManager class declaration.
 */
33 class StackfulCoroutineManager : public CoroutineManager {
34 public:
35     NO_COPY_SEMANTIC(StackfulCoroutineManager);
36     NO_MOVE_SEMANTIC(StackfulCoroutineManager);
37 
StackfulCoroutineManager(CoroutineFactory factory)38     explicit StackfulCoroutineManager(CoroutineFactory factory) : CoroutineManager(factory) {}
39     ~StackfulCoroutineManager() override = default;
40 
41     /* CoroutineManager interfaces, see CoroutineManager class for the details */
42     void Initialize(CoroutineManagerConfig config, Runtime *runtime, PandaVM *vm) override;
43     void Finalize() override;
44     void RegisterCoroutine(Coroutine *co) override;
45     bool TerminateCoroutine(Coroutine *co) override;
46     bool Launch(CompletionEvent *completionEvent, Method *entrypoint, PandaVector<Value> &&arguments,
47                 CoroutineLaunchMode mode, CoroutinePriority priority, bool abortFlag) override;
48     bool LaunchImmediately(CompletionEvent *completionEvent, Method *entrypoint, PandaVector<Value> &&arguments,
49                            CoroutineLaunchMode mode, CoroutinePriority priority, bool abortFlag) override;
50     bool LaunchNative(NativeEntrypointFunc epFunc, void *param, PandaString coroName, CoroutineLaunchMode mode,
51                       CoroutinePriority priority, bool abortFlag) override;
52     void Schedule() override;
53     void Await(CoroutineEvent *awaitee) RELEASE(awaitee) override;
54     void UnblockWaiters(CoroutineEvent *blocker) override;
55 
56     bool IsMainWorker(Coroutine *co) const override;
57     Coroutine *CreateExclusiveWorkerForThread(Runtime *runtime, PandaVM *vm) override;
58     bool DestroyExclusiveWorker() override;
59     bool IsExclusiveWorkersLimitReached() const override;
60 
61     void CreateWorkers(size_t howMany, Runtime *runtime, PandaVM *vm) override;
62     void FinalizeWorkers(size_t howMany, Runtime *runtime, PandaVM *vm) override;
63 
64     void PreZygoteFork() override;
65     void PostZygoteFork() override;
66 
67     /* ThreadManager interfaces, see ThreadManager class for the details */
68     void WaitForDeregistration() override;
69     void SuspendAllThreads() override;
70     void ResumeAllThreads() override;
71     bool IsRunningThreadExist() override;
72 
73     /**
74      * @brief Creates a coroutine instance with a native function as an entry point
75      * @param entry native function to execute
76      * @param param param to pass to the EP
77      */
78     Coroutine *CreateNativeCoroutine(Runtime *runtime, PandaVM *vm,
79                                      Coroutine::NativeEntrypointInfo::NativeEntrypointFunc entry, void *param,
80                                      PandaString name, Coroutine::Type type, CoroutinePriority priority);
81     /// destroy the "native" coroutine created earlier
82     void DestroyNativeCoroutine(Coroutine *co);
83     void DestroyEntrypointfulCoroutine(Coroutine *co) override;
84 
85     /// get next free worker id
86     size_t GetNextFreeWorkerId();
87 
88     /* events */
89     /// called when a coroutine worker thread ends its execution
90     void OnWorkerShutdown(StackfulCoroutineWorker *worker);
91     /// called when a coroutine worker thread starts its execution
92     void OnWorkerStartup(StackfulCoroutineWorker *worker);
93     /// Should be called when a coro makes the non_active->active transition (see the state diagram in coroutine.h)
94     void OnCoroBecameActive(Coroutine *co) override;
95     /**
96      * Should be called when a running coro is being blocked or terminated, i.e. makes
97      * the active->non_active transition (see the state diagram in coroutine.h)
98      */
99     void OnCoroBecameNonActive(Coroutine *co) override;
100     /// Should be called at the end of the VM native interface call
101     void OnNativeCallExit(Coroutine *co) override;
102 
103     /* debugging tools */
104     /**
105      * For StackfulCoroutineManager implementation: a fatal error is issued if an attempt to switch coroutines on
106      * current worker is detected when coroutine switch is disabled.
107      */
108     void DisableCoroutineSwitch() override;
109     void EnableCoroutineSwitch() override;
110     bool IsCoroutineSwitchDisabled() override;
IsDrainQueueInterfaceEnabled()111     bool IsDrainQueueInterfaceEnabled()
112     {
113         return enableDrainQueueIface_;
114     }
IsMigrationEnabled()115     bool IsMigrationEnabled()
116     {
117         return enableMigration_;
118     }
IsMigrateAwakenedCorosEnabled()119     bool IsMigrateAwakenedCorosEnabled()
120     {
121         return migrateAwakenedCoros_;
122     }
123 
124     /* profiling tools */
GetPerfStats()125     CoroutineStats &GetPerfStats()
126     {
127         return stats_;
128     }
129 
130     /// migrate coroutines from the 'from' worker to other workers
131     bool MigrateCoroutinesOutward(StackfulCoroutineWorker *from);
132     /// trigger the managerThread to migrate
133     void TriggerMigration();
134     /**
135      * @brief migrate the awakened coroutine to the worker with the lowest load
136      * @param co the awakened coroutine
137      */
138     void MigrateAwakenedCoro(Coroutine *co);
139 
140 protected:
141     bool EnumerateThreadsImpl(const ThreadManager::Callback &cb, unsigned int incMask,
142                               unsigned int xorMask) const override;
143     CoroutineContext *CreateCoroutineContext(bool coroHasEntrypoint) override;
144     void DeleteCoroutineContext(CoroutineContext *ctx) override;
145 
146     size_t GetCoroutineCount() override;
147     size_t GetCoroutineCountLimit() override;
148 
149     StackfulCoroutineContext *GetCurrentContext();
150     StackfulCoroutineWorker *GetCurrentWorker();
151 
152     /**
153      * @brief reuse a cached coroutine instance in case when coroutine pool is enabled
154      * see Coroutine::ReInitialize for details
155      */
156     void ReuseCoroutineInstance(Coroutine *co, EntrypointInfo &&epInfo, PandaString name, CoroutinePriority priority);
157 
158 private:
159     using WorkerId = uint32_t;
160 
161     StackfulCoroutineContext *CreateCoroutineContextImpl(bool needStack);
162     StackfulCoroutineWorker *ChooseWorkerForCoroutine(Coroutine *co) REQUIRES(workersLock_);
163     stackful_coroutines::AffinityMask CalcAffinityMaskFromLaunchMode(CoroutineLaunchMode mode);
164 
165     Coroutine *GetCoroutineInstanceForLaunch(EntrypointInfo &&epInfo, PandaString &&coroName,
166                                              CoroutinePriority priority, stackful_coroutines::AffinityMask affinityMask,
167                                              bool abortFlag);
168     bool LaunchImpl(EntrypointInfo &&epInfo, PandaString &&coroName, CoroutineLaunchMode mode,
169                     CoroutinePriority priority, bool abortFlag);
170     bool LaunchImmediatelyImpl(EntrypointInfo &&epInfo, PandaString &&coroName, CoroutineLaunchMode mode,
171                                CoroutinePriority priority, bool abortFlag);
172     bool LaunchWithMode(EntrypointInfo &&epInfo, PandaString &&coroName, CoroutineLaunchMode mode,
173                         CoroutinePriority priority, bool launchImmediately, bool abortFlag);
174     /**
175      * Tries to extract a coroutine instance from the pool for further reuse, returns nullptr in case when it is not
176      * possible.
177      */
178     Coroutine *TryGetCoroutineFromPool();
179 
180     /* workers API */
181     void CreateWorkersImpl(size_t howMany, Runtime *runtime, PandaVM *vm) REQUIRES(workersLock_);
182     /**
183      * This method creates main worker and coroutine + the number of common workers
184      * @param howMany total number of common worker threads, NOT including MAIN
185      */
186     void CreateMainCoroAndWorkers(size_t howMany, Runtime *runtime, PandaVM *vm) REQUIRES(workersLock_);
187     void OnWorkerStartupImpl(StackfulCoroutineWorker *worker) REQUIRES(workersLock_);
188     StackfulCoroutineWorker *CreateWorker(Runtime *runtime, PandaVM *vm,
189                                           StackfulCoroutineWorker::ScheduleLoopType wType, PandaString workerName);
190 
191     /* coroutine registry management */
192     void AddToRegistry(Coroutine *co) REQUIRES(coroListLock_);
193     void RemoveFromRegistry(Coroutine *co) REQUIRES(coroListLock_);
194 
195     /// call to check if we are done executing managed code and set appropriate member flags
196     void CheckProgramCompletion();
197     /// call when main coroutine is done executing its managed EP
198     void MainCoroutineCompleted();
199     /// wait till all the non-main coroutines with managed EP finish execution
200     void WaitForNonMainCoroutinesCompletion();
201     /// @return number of running worker loop coroutines
202     size_t GetActiveWorkersCount() const;
203     /// @return number of existing worker instances
204     size_t GetExistingWorkersCount() const;
205     /// dump coroutine stats to stdout
206     void DumpCoroutineStats() const;
207 
208     /* resource management */
209     uint8_t *AllocCoroutineStack();
210     void FreeCoroutineStack(uint8_t *stack);
211 
212     /// @return true if there is no active coroutines
213     bool IsNoActiveMutatorsExceptCurrent();
214     /// Increment/decrement active corotuines count
215     void IncrementActiveCoroutines();
216     void DecrementActiveCoroutines();
217 
218     /// list unhandled language specific events on program exit
219     void ListUnhandledEventsOnProgramExit();
220 
221     StackfulCoroutineWorker *ChooseWorkerForFinalization();
222 
223     void InitializeWorkerIdAllocator();
224     WorkerId AllocateWorkerId();
225     void ReleaseWorkerId(WorkerId workerId);
226 
227     /**
228      * The EP for manager thread. The manager thread can only be created when coroutine migration is supported.
229      * This function cannot be called directly, and can only be called once.
230      */
231     void ManagerThreadProc();
232     /// manage the lifecycle of the manager thread
233     void StartManagerThread();
234     void StopManagerThread();
235 
236     void CheckForBlockedWorkers();
237     void MigrateCoroutinesInward(uint32_t &count);
238     StackfulCoroutineWorker *ChooseWorkerImpl(WorkerSelectionPolicy policy, size_t maskValue) REQUIRES(workersLock_);
239 
240     /**
241      * @brief Calculate worker limits based on configuration
242      * @param config The coroutine manager configuration
243      * @param exclusiveWorkersLimit Output parameter for exclusive workers limit
244      * @param commonWorkersLimit Output parameter for common workers limit
245      */
246     void CalculateWorkerLimits(const CoroutineManagerConfig &config, size_t &exclusiveWorkersLimit,
247                                size_t &commonWorkersLimit);
248 
249     // for thread safety with GC
250     mutable os::memory::Mutex coroListLock_;
251     // all registered coros
252     PandaSet<Coroutine *> coroutines_ GUARDED_BY(coroListLock_);
253 
254     // worker threads-related members
255     PandaList<StackfulCoroutineWorker *> workers_ GUARDED_BY(workersLock_);
256     // allocator of worker ids
257     os::memory::Mutex workerIdLock_;
258     PandaList<WorkerId> freeWorkerIds_ GUARDED_BY(workerIdLock_);
259     size_t activeWorkersCount_ GUARDED_BY(workersLock_) = 0;
260     mutable os::memory::RecursiveMutex workersLock_;
261     mutable os::memory::ConditionVariable workersCv_;
262 
263     // events that control program completion
264     os::memory::Mutex programCompletionLock_;
265     CoroutineEvent *programCompletionEvent_ = nullptr;
266 
267     // various counters
268     std::atomic_uint32_t coroutineCount_ = 0;
269     size_t coroutineCountLimit_ = 0;
270     size_t exclusiveWorkersLimit_ = 0;
271     size_t commonWorkersCount_ = 0;
272     size_t coroStackSizeBytes_ = 0;
273 
274     // active coroutines are runnable + running coroutines
275     std::atomic_uint32_t activeCoroutines_ = 0;
276     // NOTE(konstanting): make it a map once number of the coroutine types gets bigger
277     std::atomic_uint32_t systemCoroutineCount_ = 0;
278 
279     /**
280      * @brief holds pointers to the cached coroutine instances in order to speedup coroutine creation and destruction.
281      * linked coroutinecontext instances are cached too (we just keep the cached coroutines linked to their contexts).
282      * used only in case when --use-coroutine-pool=true
283      */
284     PandaVector<Coroutine *> coroutinePool_ GUARDED_BY(coroPoolLock_);
285     mutable os::memory::Mutex coroPoolLock_;
286 
287     // stats
288     CoroutineStats stats_;
289     PandaVector<CoroutineWorkerStats> finalizedWorkerStats_;
290 
291     os::memory::Mutex eWorkerCreationLock_;
292 
293     // Feature flags, eventually will be refactored into some structure.
294     // Should we just store the initial CoroutineConfig?..
295     bool enableDrainQueueIface_ = false;
296     // coroutine migration feature
297     bool enableMigration_ = false;
298     bool migrateAwakenedCoros_ = false;
299 
300     // the number of migration triggers
301     std::atomic_uint32_t migrateCount_ = 0;
302     // manager thread infos
303     std::atomic_bool managerRunning_ = false;
304     std::thread managerThread_;
305     os::memory::Mutex managerMutex_;
306     os::memory::ConditionVariable managerCv_;
307 
308     // the time interval between detecting worker blocking
309     static constexpr uint32_t DETECTION_INTERVAL_VALUE = 5000;
310 };
311 
312 }  // namespace ark
313 
314 #endif /* PANDA_RUNTIME_COROUTINES_STACKFUL_COROUTINE_MANAGER_H */
315