1 /** 2 * Copyright (c) 2023-2025 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef PANDA_RUNTIME_COROUTINES_STACKFUL_COROUTINE_WORKER_H 17 #define PANDA_RUNTIME_COROUTINES_STACKFUL_COROUTINE_WORKER_H 18 19 #include "runtime/coroutines/coroutine.h" 20 #include "runtime/coroutines/coroutine_worker.h" 21 #include "runtime/coroutines/coroutine_events.h" 22 #include "runtime/coroutines/stackful_common.h" 23 #include "runtime/coroutines/coroutine_stats.h" 24 #include "runtime/coroutines/priority_queue.h" 25 #include "runtime/include/external_callback_poster.h" 26 27 namespace ark { 28 29 class StackfulCoroutineContext; 30 class StackfulCoroutineManager; 31 32 /** 33 * Represents a worker thread for stackful coroutines. 34 * Contains local part of the scheduling machinery (local coroutine queues, methods) 35 */ 36 class StackfulCoroutineWorker : public CoroutineWorker { 37 public: 38 enum class ScheduleLoopType { THREAD, FIBER }; 39 40 NO_COPY_SEMANTIC(StackfulCoroutineWorker); 41 NO_MOVE_SEMANTIC(StackfulCoroutineWorker); 42 43 /** 44 * @brief The worker constructor. Create the worker and its schedule loop. 45 * 46 * Notable parameters: 47 * @param type defines the schedule loop type for this worker: a separate thread or a coroutine ("FIBER") 48 */ 49 StackfulCoroutineWorker(Runtime *runtime, PandaVM *vm, StackfulCoroutineManager *coroManager, ScheduleLoopType type, 50 PandaString name, size_t id); 51 ~StackfulCoroutineWorker() override = default; 52 53 /// @return false if the worker is stopped and does not schedule anything, otherwise true IsActive()54 bool IsActive() const 55 { 56 os::memory::LockHolder lock(runnablesLock_); 57 return active_; 58 } 59 60 /// enable or disable the worker SetActive(bool val)61 void SetActive(bool val) 62 { 63 os::memory::LockHolder lock(runnablesLock_); 64 active_ = val; 65 runnablesCv_.Signal(); 66 } 67 68 /// @return the moving average number of runnable coroutines in the queue GetLoadFactor()69 double GetLoadFactor() const 70 { 71 return loadFactor_; 72 } 73 GetName()74 PandaString GetName() const 75 { 76 return name_; 77 } 78 SetName(PandaString name)79 void SetName(PandaString name) 80 { 81 name_ = std::move(name); 82 } 83 GetId()84 size_t GetId() const 85 { 86 return id_; 87 } 88 IsMainWorker()89 bool IsMainWorker() const 90 { 91 return id_ == stackful_coroutines::MAIN_WORKER_ID; 92 } 93 DisableForCrossWorkersLaunch()94 void DisableForCrossWorkersLaunch() 95 { 96 isDisabledForCrossWorkersLaunch_ = true; 97 } 98 IsDisabledForCrossWorkersLaunch()99 bool IsDisabledForCrossWorkersLaunch() const 100 { 101 return isDisabledForCrossWorkersLaunch_; 102 } 103 104 /** 105 * @brief Adds a coroutine to the runnables queue. Any new incoming RUNNABLE coroutine should be added 106 * via this interface! And vice versa: no intra-worker queue transitions should be done via this 107 * interface 108 * @param newCoro coroutine to add 109 * @param prioritize if true, add to the beginning of the queue (otherwise to the end) 110 */ 111 void AddRunnableCoroutine(Coroutine *newCoro); 112 113 /** 114 * @brief Registers the RUNNING coroutine in the worker's structures. 115 * Any incoming running coroutine should be added via this interface! And vice versa: no 116 * intra-worker queue transitions should be done via this interface. 117 * @param newCoro coroutine to add 118 */ 119 void AddRunningCoroutine(Coroutine *newCoro); 120 121 /** 122 * @brief Add a new created coroutine and switch to it immediately. 123 * @param newCoro coroutine to add 124 */ 125 void AddCreatedCoroutineAndSwitchToIt(Coroutine *newCoro); 126 127 /** 128 * @brief Block current coroutine till an event happens and switch context to the next ready one 129 * @param awaitee the event to wait 130 */ 131 void WaitForEvent(CoroutineEvent *awaitee) RELEASE(awaitee); 132 133 /** 134 * @brief Signal that an event has happened and unblock all the coroutines in the current worker that are waiting 135 * for this event 136 * @param blocker the event that has happened 137 */ 138 void UnblockWaiters(CoroutineEvent *blocker); 139 140 /** 141 * @brief Add a coroutine to the finalization queue for future destruction and schedule the next ready one for 142 * execution. Used by a coroutine being terminated for safe self-destruction. 143 * @param finalizee coroutine to finalize (the caller) 144 */ 145 void RequestFinalization(Coroutine *finalizee); 146 147 /// @brief schedule the next ready coroutine from the runnables queue for execution 148 void RequestSchedule(); 149 150 /// @brief call to delete the fake "schedule loop" coroutine 151 void FinalizeFiberScheduleLoop(); 152 153 /// @brief schedule runnable coroutines and wait for blocked coroutines 154 void CompleteAllAffinedCoroutines() NO_THREAD_SAFETY_ANALYSIS; 155 156 /* debugging tools */ 157 // See CoroutineManager/StackfulCoroutineManager for details 158 void DisableCoroutineSwitch(); 159 void EnableCoroutineSwitch(); 160 bool IsCoroutineSwitchDisabled(); 161 162 /// @brief Migrate all not affinned coroutines from worker MigrateCoroutines()163 void MigrateCoroutines() {} 164 165 #ifndef NDEBUG 166 void PrintRunnables(const PandaString &requester); 167 #endif 168 /** 169 * @brief Returns the number of coroutines in the runnables queue that belong to a certain type. 170 * Since another worker can concurrently add more coroutines before or after the call, the returned value 171 * should be used only for debugging purposes AND ONLY IF YOU KNOW WHAT EXACTLY ARE YOU DOING. 172 * @param type the type of coroutines to count 173 */ 174 size_t GetRunnablesCount(Coroutine::Type type); 175 176 /* profiling tools */ GetPerfStats()177 CoroutineWorkerStats &GetPerfStats() 178 { 179 return stats_; 180 } 181 182 /// @brief get exclusive status of worker InExclusiveMode()183 bool InExclusiveMode() const 184 { 185 // Atomic with relaxed order reason: sync is not needed here 186 return inExclusiveMode_.load(std::memory_order_relaxed); 187 } 188 189 /// @brief set exclusive status of worker SetExclusiveMode(bool inExclusiveMode)190 void SetExclusiveMode(bool inExclusiveMode) 191 { 192 // Atomic with relaxed order reason: sync is not needed here 193 inExclusiveMode_.store(inExclusiveMode, std::memory_order_relaxed); 194 } 195 196 /// migrate coroutines from this to the 'to' worker 197 void MigrateTo(StackfulCoroutineWorker *to); 198 /// migrate coroutines from the 'from' worker to this 199 bool MigrateFrom(StackfulCoroutineWorker *from); 200 /// check whether the worker is idle 201 bool IsIdle(); 202 /// migrate the coroutines of the blocked worker to other workers 203 void MigrateCorosOutwardsIfBlocked(); 204 205 private: 206 /* schedule loop management */ 207 /// the EP for threaded schedule loops 208 void ThreadProc(); 209 /// the EP for fiber schedule loops 210 void ScheduleLoop(); 211 /// the schedule loop itself 212 void ScheduleLoopBody(); 213 /// the helper proxy function for the fiber schedule loop 214 static void ScheduleLoopProxy(void *worker); 215 216 /* runnables queue management */ 217 void PushToRunnableQueue(Coroutine *co, CoroutinePriority priority) REQUIRES(runnablesLock_); 218 Coroutine *PopFromRunnableQueue(); 219 bool RunnableCoroutinesExist() const; 220 void WaitForRunnables() REQUIRES(runnablesLock_); 221 /** 222 * @brief Register a new active (= runnable or running) coroutine on this worker. 223 * @param newCoro the incoming coroutine to register 224 */ 225 void RegisterIncomingActiveCoroutine(Coroutine *newCoro); 226 227 /* scheduling machinery from high level functions to elementary helpers */ 228 /** 229 * Schedule the next ready coroutine from the runnables queue for execution if present, otherwise wait till those 230 * appear, then pick the best suitable and schedule it. After that eventually the current coroutine will be 231 * scheduled for execution too. Upon that, the control flow will get back to this function and it will return. 232 */ 233 void RequestScheduleImpl(); 234 void BlockCurrentCoroAndScheduleNext() RELEASE(runnablesLock_) RELEASE(waitersLock_); 235 void SuspendCurrentCoroAndScheduleNext() RELEASE(runnablesLock_); 236 template <bool SUSPEND_AS_BLOCKED> 237 void SuspendCurrentCoroGeneric(); 238 void BlockCurrentCoro(); 239 void SuspendCurrentCoro(); 240 /* "the lesser evil": keep thread safety annotations but duplicate the function body */ 241 void ScheduleNextCoroUnlockRunnablesWaiters() RELEASE(runnablesLock_) RELEASE(waitersLock_); 242 void ScheduleNextCoroUnlockRunnables() RELEASE(runnablesLock_); 243 void ScheduleNextCoroUnlockNone(); 244 StackfulCoroutineContext *GetCurrentContext() const; 245 StackfulCoroutineContext *PrepareNextRunnableContextForSwitch(); 246 void SwitchCoroutineContext(StackfulCoroutineContext *from, StackfulCoroutineContext *to); 247 248 /* various helper functions */ 249 /// parse the finalization queue and destroy all coroutines from it 250 void FinalizeTerminatedCoros(); 251 /// recalculate the load factor 252 void UpdateLoadFactor() REQUIRES(runnablesLock_); 253 /** 254 * This checker is called on a coroutine switch attempt. Issues fatal error in case when coroutine switch is 255 * disabled. 256 */ 257 void EnsureCoroutineSwitchEnabled(); 258 259 /// @return true if current method is called from another worker instance IsCrossWorkerCall()260 bool IsCrossWorkerCall() 261 { 262 ASSERT(Coroutine::GetCurrent() != nullptr); 263 return (this != Coroutine::GetCurrent()->GetWorker()); 264 } 265 266 /// check if this may have been blocked 267 bool IsPotentiallyBlocked(); 268 void MigrateCoroutinesImpl(StackfulCoroutineWorker *to, size_t migrateCount) REQUIRES(runnablesLock_); 269 270 /// called when the coroutineContext is switched 271 void OnContextSwitch(); 272 273 StackfulCoroutineManager *coroManager_; 274 Coroutine *scheduleLoopCtx_ = nullptr; 275 bool active_ GUARDED_BY(runnablesLock_) = true; 276 os::thread::ThreadId threadId_; 277 278 // runnable coroutines-related members 279 mutable os::memory::RecursiveMutex runnablesLock_; 280 os::memory::ConditionVariable runnablesCv_; 281 PriorityQueue runnables_ GUARDED_BY(runnablesLock_); 282 // blocked coros-related members: Coroutine AWAITS CoroutineEvent 283 mutable os::memory::Mutex waitersLock_; 284 PandaMap<CoroutineEvent *, Coroutine *> waiters_ GUARDED_BY(waitersLock_); 285 // terminated coros (waiting for deletion) 286 PandaQueue<Coroutine *> finalizationQueue_; 287 288 /// the moving average number of coroutines in the runnable queue 289 std::atomic<double> loadFactor_ = 0; 290 291 // the timestamp of the last coroutine context switch 292 std::atomic<uint64_t> lastCtxSwitchTimeMillis_ = 0; 293 294 /** 295 * If worker is in exclusive mode, it means that: 296 * 1. launch/transition of coroutine from other workers to e-worker is disabled 297 * 2. child e-worker coroutines will be scheduled on the same worker 298 */ 299 std::atomic<bool> inExclusiveMode_ = false; 300 GenericEvent workerCompletionEvent_; 301 std::atomic<bool> isDisabledForCrossWorkersLaunch_ = false; 302 303 /** 304 * This counter is incremented on DisableCoroutineSwitch calls and decremented on EnableCoroutineSwitch calls. 305 * The value 0 means that coroutine switch is ENABLED. 306 */ 307 uint32_t disableCoroSwitchCounter_ = 0; 308 309 // stats 310 CoroutineWorkerStats stats_; 311 312 PandaString name_; 313 stackful_coroutines::WorkerId id_ = stackful_coroutines::INVALID_WORKER_ID; 314 315 // the maximum continuous execution time of a coroutine 316 static constexpr uint32_t MAX_EXECUTION_DURATION = 6000; 317 }; 318 319 } // namespace ark 320 321 #endif /* PANDA_RUNTIME_COROUTINES_STACKFUL_COROUTINE_WORKER_H */ 322