1 /** 2 * Copyright (c) 2023-2025 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 #ifndef PANDA_RUNTIME_COROUTINES_STACKFUL_COROUTINE_MANAGER_H 16 #define PANDA_RUNTIME_COROUTINES_STACKFUL_COROUTINE_MANAGER_H 17 18 #include "runtime/coroutines/coroutine_manager.h" 19 #include "runtime/coroutines/stackful_common.h" 20 #include "runtime/coroutines/stackful_coroutine.h" 21 #include "runtime/coroutines/stackful_coroutine_worker.h" 22 #include "runtime/coroutines/coroutine_stats.h" 23 24 namespace ark { 25 26 /** 27 * @brief Stackful ("fiber"-based) coroutine manager implementation. 28 * 29 * In this implementation coroutines are user-level threads ("fibers") with manually allocated stacks. 30 * 31 * For interface function descriptions see CoroutineManager class declaration. 32 */ 33 class StackfulCoroutineManager : public CoroutineManager { 34 public: 35 NO_COPY_SEMANTIC(StackfulCoroutineManager); 36 NO_MOVE_SEMANTIC(StackfulCoroutineManager); 37 StackfulCoroutineManager(CoroutineFactory factory)38 explicit StackfulCoroutineManager(CoroutineFactory factory) : CoroutineManager(factory) {} 39 ~StackfulCoroutineManager() override = default; 40 41 /* CoroutineManager interfaces, see CoroutineManager class for the details */ 42 void Initialize(CoroutineManagerConfig config, Runtime *runtime, PandaVM *vm) override; 43 void Finalize() override; 44 void RegisterCoroutine(Coroutine *co) override; 45 bool TerminateCoroutine(Coroutine *co) override; 46 bool Launch(CompletionEvent *completionEvent, Method *entrypoint, PandaVector<Value> &&arguments, 47 CoroutineLaunchMode mode, CoroutinePriority priority, bool abortFlag) override; 48 bool LaunchImmediately(CompletionEvent *completionEvent, Method *entrypoint, PandaVector<Value> &&arguments, 49 CoroutineLaunchMode mode, CoroutinePriority priority, bool abortFlag) override; 50 bool LaunchNative(NativeEntrypointFunc epFunc, void *param, PandaString coroName, CoroutineLaunchMode mode, 51 CoroutinePriority priority, bool abortFlag) override; 52 void Schedule() override; 53 void Await(CoroutineEvent *awaitee) RELEASE(awaitee) override; 54 void UnblockWaiters(CoroutineEvent *blocker) override; 55 56 bool IsMainWorker(Coroutine *co) const override; 57 Coroutine *CreateExclusiveWorkerForThread(Runtime *runtime, PandaVM *vm) override; 58 bool DestroyExclusiveWorker() override; 59 bool IsExclusiveWorkersLimitReached() const override; 60 61 void CreateWorkers(size_t howMany, Runtime *runtime, PandaVM *vm) override; 62 void FinalizeWorkers(size_t howMany, Runtime *runtime, PandaVM *vm) override; 63 64 void PreZygoteFork() override; 65 void PostZygoteFork() override; 66 67 /* ThreadManager interfaces, see ThreadManager class for the details */ 68 void WaitForDeregistration() override; 69 void SuspendAllThreads() override; 70 void ResumeAllThreads() override; 71 bool IsRunningThreadExist() override; 72 73 /** 74 * @brief Creates a coroutine instance with a native function as an entry point 75 * @param entry native function to execute 76 * @param param param to pass to the EP 77 */ 78 Coroutine *CreateNativeCoroutine(Runtime *runtime, PandaVM *vm, 79 Coroutine::NativeEntrypointInfo::NativeEntrypointFunc entry, void *param, 80 PandaString name, Coroutine::Type type, CoroutinePriority priority); 81 /// destroy the "native" coroutine created earlier 82 void DestroyNativeCoroutine(Coroutine *co); 83 void DestroyEntrypointfulCoroutine(Coroutine *co) override; 84 85 /// get next free worker id 86 size_t GetNextFreeWorkerId(); 87 88 /* events */ 89 /// called when a coroutine worker thread ends its execution 90 void OnWorkerShutdown(StackfulCoroutineWorker *worker); 91 /// called when a coroutine worker thread starts its execution 92 void OnWorkerStartup(StackfulCoroutineWorker *worker); 93 /// Should be called when a coro makes the non_active->active transition (see the state diagram in coroutine.h) 94 void OnCoroBecameActive(Coroutine *co) override; 95 /** 96 * Should be called when a running coro is being blocked or terminated, i.e. makes 97 * the active->non_active transition (see the state diagram in coroutine.h) 98 */ 99 void OnCoroBecameNonActive(Coroutine *co) override; 100 /// Should be called at the end of the VM native interface call 101 void OnNativeCallExit(Coroutine *co) override; 102 103 /* debugging tools */ 104 /** 105 * For StackfulCoroutineManager implementation: a fatal error is issued if an attempt to switch coroutines on 106 * current worker is detected when coroutine switch is disabled. 107 */ 108 void DisableCoroutineSwitch() override; 109 void EnableCoroutineSwitch() override; 110 bool IsCoroutineSwitchDisabled() override; IsDrainQueueInterfaceEnabled()111 bool IsDrainQueueInterfaceEnabled() 112 { 113 return enableDrainQueueIface_; 114 } IsMigrationEnabled()115 bool IsMigrationEnabled() 116 { 117 return enableMigration_; 118 } IsMigrateAwakenedCorosEnabled()119 bool IsMigrateAwakenedCorosEnabled() 120 { 121 return migrateAwakenedCoros_; 122 } 123 124 /* profiling tools */ GetPerfStats()125 CoroutineStats &GetPerfStats() 126 { 127 return stats_; 128 } 129 130 /// migrate coroutines from the 'from' worker to other workers 131 bool MigrateCoroutinesOutward(StackfulCoroutineWorker *from); 132 /// trigger the managerThread to migrate 133 void TriggerMigration(); 134 /** 135 * @brief migrate the awakened coroutine to the worker with the lowest load 136 * @param co the awakened coroutine 137 */ 138 void MigrateAwakenedCoro(Coroutine *co); 139 140 protected: 141 bool EnumerateThreadsImpl(const ThreadManager::Callback &cb, unsigned int incMask, 142 unsigned int xorMask) const override; 143 CoroutineContext *CreateCoroutineContext(bool coroHasEntrypoint) override; 144 void DeleteCoroutineContext(CoroutineContext *ctx) override; 145 146 size_t GetCoroutineCount() override; 147 size_t GetCoroutineCountLimit() override; 148 149 StackfulCoroutineContext *GetCurrentContext(); 150 StackfulCoroutineWorker *GetCurrentWorker(); 151 152 /** 153 * @brief reuse a cached coroutine instance in case when coroutine pool is enabled 154 * see Coroutine::ReInitialize for details 155 */ 156 void ReuseCoroutineInstance(Coroutine *co, EntrypointInfo &&epInfo, PandaString name, CoroutinePriority priority); 157 158 private: 159 using WorkerId = uint32_t; 160 161 StackfulCoroutineContext *CreateCoroutineContextImpl(bool needStack); 162 StackfulCoroutineWorker *ChooseWorkerForCoroutine(Coroutine *co) REQUIRES(workersLock_); 163 stackful_coroutines::AffinityMask CalcAffinityMaskFromLaunchMode(CoroutineLaunchMode mode); 164 165 Coroutine *GetCoroutineInstanceForLaunch(EntrypointInfo &&epInfo, PandaString &&coroName, 166 CoroutinePriority priority, stackful_coroutines::AffinityMask affinityMask, 167 bool abortFlag); 168 bool LaunchImpl(EntrypointInfo &&epInfo, PandaString &&coroName, CoroutineLaunchMode mode, 169 CoroutinePriority priority, bool abortFlag); 170 bool LaunchImmediatelyImpl(EntrypointInfo &&epInfo, PandaString &&coroName, CoroutineLaunchMode mode, 171 CoroutinePriority priority, bool abortFlag); 172 bool LaunchWithMode(EntrypointInfo &&epInfo, PandaString &&coroName, CoroutineLaunchMode mode, 173 CoroutinePriority priority, bool launchImmediately, bool abortFlag); 174 /** 175 * Tries to extract a coroutine instance from the pool for further reuse, returns nullptr in case when it is not 176 * possible. 177 */ 178 Coroutine *TryGetCoroutineFromPool(); 179 180 /* workers API */ 181 void CreateWorkersImpl(size_t howMany, Runtime *runtime, PandaVM *vm) REQUIRES(workersLock_); 182 /** 183 * This method creates main worker and coroutine + the number of common workers 184 * @param howMany total number of common worker threads, NOT including MAIN 185 */ 186 void CreateMainCoroAndWorkers(size_t howMany, Runtime *runtime, PandaVM *vm) REQUIRES(workersLock_); 187 void OnWorkerStartupImpl(StackfulCoroutineWorker *worker) REQUIRES(workersLock_); 188 StackfulCoroutineWorker *CreateWorker(Runtime *runtime, PandaVM *vm, 189 StackfulCoroutineWorker::ScheduleLoopType wType, PandaString workerName); 190 191 /* coroutine registry management */ 192 void AddToRegistry(Coroutine *co) REQUIRES(coroListLock_); 193 void RemoveFromRegistry(Coroutine *co) REQUIRES(coroListLock_); 194 195 /// call to check if we are done executing managed code and set appropriate member flags 196 void CheckProgramCompletion(); 197 /// call when main coroutine is done executing its managed EP 198 void MainCoroutineCompleted(); 199 /// wait till all the non-main coroutines with managed EP finish execution 200 void WaitForNonMainCoroutinesCompletion(); 201 /// @return number of running worker loop coroutines 202 size_t GetActiveWorkersCount() const; 203 /// @return number of existing worker instances 204 size_t GetExistingWorkersCount() const; 205 /// dump coroutine stats to stdout 206 void DumpCoroutineStats() const; 207 208 /* resource management */ 209 uint8_t *AllocCoroutineStack(); 210 void FreeCoroutineStack(uint8_t *stack); 211 212 /// @return true if there is no active coroutines 213 bool IsNoActiveMutatorsExceptCurrent(); 214 /// Increment/decrement active corotuines count 215 void IncrementActiveCoroutines(); 216 void DecrementActiveCoroutines(); 217 218 /// list unhandled language specific events on program exit 219 void ListUnhandledEventsOnProgramExit(); 220 221 StackfulCoroutineWorker *ChooseWorkerForFinalization(); 222 223 void InitializeWorkerIdAllocator(); 224 WorkerId AllocateWorkerId(); 225 void ReleaseWorkerId(WorkerId workerId); 226 227 /** 228 * The EP for manager thread. The manager thread can only be created when coroutine migration is supported. 229 * This function cannot be called directly, and can only be called once. 230 */ 231 void ManagerThreadProc(); 232 /// manage the lifecycle of the manager thread 233 void StartManagerThread(); 234 void StopManagerThread(); 235 236 void CheckForBlockedWorkers(); 237 void MigrateCoroutinesInward(uint32_t &count); 238 StackfulCoroutineWorker *ChooseWorkerImpl(WorkerSelectionPolicy policy, size_t maskValue) REQUIRES(workersLock_); 239 240 /** 241 * @brief Calculate worker limits based on configuration 242 * @param config The coroutine manager configuration 243 * @param exclusiveWorkersLimit Output parameter for exclusive workers limit 244 * @param commonWorkersLimit Output parameter for common workers limit 245 */ 246 void CalculateWorkerLimits(const CoroutineManagerConfig &config, size_t &exclusiveWorkersLimit, 247 size_t &commonWorkersLimit); 248 249 // for thread safety with GC 250 mutable os::memory::Mutex coroListLock_; 251 // all registered coros 252 PandaSet<Coroutine *> coroutines_ GUARDED_BY(coroListLock_); 253 254 // worker threads-related members 255 PandaList<StackfulCoroutineWorker *> workers_ GUARDED_BY(workersLock_); 256 // allocator of worker ids 257 os::memory::Mutex workerIdLock_; 258 PandaList<WorkerId> freeWorkerIds_ GUARDED_BY(workerIdLock_); 259 size_t activeWorkersCount_ GUARDED_BY(workersLock_) = 0; 260 mutable os::memory::RecursiveMutex workersLock_; 261 mutable os::memory::ConditionVariable workersCv_; 262 263 // events that control program completion 264 os::memory::Mutex programCompletionLock_; 265 CoroutineEvent *programCompletionEvent_ = nullptr; 266 267 // various counters 268 std::atomic_uint32_t coroutineCount_ = 0; 269 size_t coroutineCountLimit_ = 0; 270 size_t exclusiveWorkersLimit_ = 0; 271 size_t commonWorkersCount_ = 0; 272 size_t coroStackSizeBytes_ = 0; 273 274 // active coroutines are runnable + running coroutines 275 std::atomic_uint32_t activeCoroutines_ = 0; 276 // NOTE(konstanting): make it a map once number of the coroutine types gets bigger 277 std::atomic_uint32_t systemCoroutineCount_ = 0; 278 279 /** 280 * @brief holds pointers to the cached coroutine instances in order to speedup coroutine creation and destruction. 281 * linked coroutinecontext instances are cached too (we just keep the cached coroutines linked to their contexts). 282 * used only in case when --use-coroutine-pool=true 283 */ 284 PandaVector<Coroutine *> coroutinePool_ GUARDED_BY(coroPoolLock_); 285 mutable os::memory::Mutex coroPoolLock_; 286 287 // stats 288 CoroutineStats stats_; 289 PandaVector<CoroutineWorkerStats> finalizedWorkerStats_; 290 291 os::memory::Mutex eWorkerCreationLock_; 292 293 // Feature flags, eventually will be refactored into some structure. 294 // Should we just store the initial CoroutineConfig?.. 295 bool enableDrainQueueIface_ = false; 296 // coroutine migration feature 297 bool enableMigration_ = false; 298 bool migrateAwakenedCoros_ = false; 299 300 // the number of migration triggers 301 std::atomic_uint32_t migrateCount_ = 0; 302 // manager thread infos 303 std::atomic_bool managerRunning_ = false; 304 std::thread managerThread_; 305 os::memory::Mutex managerMutex_; 306 os::memory::ConditionVariable managerCv_; 307 308 // the time interval between detecting worker blocking 309 static constexpr uint32_t DETECTION_INTERVAL_VALUE = 5000; 310 }; 311 312 } // namespace ark 313 314 #endif /* PANDA_RUNTIME_COROUTINES_STACKFUL_COROUTINE_MANAGER_H */ 315