1 /** 2 * Copyright (c) 2023-2024 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef PANDA_RUNTIME_COROUTINES_STACKFUL_COROUTINE_WORKER_H 17 #define PANDA_RUNTIME_COROUTINES_STACKFUL_COROUTINE_WORKER_H 18 19 #include "runtime/coroutines/coroutine.h" 20 #include "runtime/coroutines/coroutine_events.h" 21 #include "runtime/coroutines/stackful_common.h" 22 #include "runtime/coroutines/coroutine_stats.h" 23 24 namespace ark { 25 26 class StackfulCoroutineContext; 27 class StackfulCoroutineManager; 28 29 /** 30 * Represents a worker thread for stackful coroutines. 31 * Contains local part of the scheduling machinery (local coroutine queues, methods) 32 */ 33 class StackfulCoroutineWorker { 34 public: 35 enum class ScheduleLoopType { THREAD, FIBER }; 36 37 NO_COPY_SEMANTIC(StackfulCoroutineWorker); 38 NO_MOVE_SEMANTIC(StackfulCoroutineWorker); 39 40 /** 41 * @brief The worker constructor. Create the worker and its schedule loop. 42 * 43 * Notable parameters: 44 * @param type defines the schedule loop type for this worker: a separate thread or a coroutine ("FIBER") 45 */ 46 StackfulCoroutineWorker(Runtime *runtime, PandaVM *vm, StackfulCoroutineManager *coroManager, ScheduleLoopType type, 47 PandaString name, size_t id); 48 ~StackfulCoroutineWorker() = default; 49 50 /// @return false if the worker is stopped and does not schedule anything, otherwise true IsActive()51 bool IsActive() const 52 { 53 os::memory::LockHolder lock(runnablesLock_); 54 return active_; 55 } 56 57 /// enable or disable the worker SetActive(bool val)58 void SetActive(bool val) 59 { 60 os::memory::LockHolder lock(runnablesLock_); 61 active_ = val; 62 runnablesCv_.Signal(); 63 } 64 65 /// @return the moving average number of runnable coroutines in the queue GetLoadFactor()66 double GetLoadFactor() const 67 { 68 return loadFactor_; 69 } 70 GetName()71 PandaString GetName() const 72 { 73 return name_; 74 } 75 SetName(PandaString name)76 void SetName(PandaString name) 77 { 78 name_ = std::move(name); 79 } 80 GetId()81 size_t GetId() const 82 { 83 return id_; 84 } 85 86 /** 87 * @brief Adds a coroutine to the runnables queue 88 * @param new_coro coroutine to add 89 * @param prioritize if true, add to the beginning of the queue (otherwise to the end) 90 */ 91 void AddRunnableCoroutine(Coroutine *newCoro, bool prioritize = false); 92 93 /** 94 * @brief Block current coroutine till an event happens and switch context to the next ready one 95 * @param awaitee the event to wait 96 * @return false if the event is apready happened, true after the waiting is completed 97 */ 98 bool WaitForEvent(CoroutineEvent *awaitee) RELEASE(awaitee); 99 100 /** 101 * @brief Signal that an event has happened and unblock all the coroutines in the current worker that are waiting 102 * for this event 103 * @param blocker the event that has happened 104 */ 105 void UnblockWaiters(CoroutineEvent *blocker); 106 107 /** 108 * @brief Add a coroutine to the finalization queue for future destruction and schedule the next ready one for 109 * execution. Used by a coroutine being terminated for safe self-destruction. 110 * @param finalizee coroutine to finalize (the caller) 111 */ 112 void RequestFinalization(Coroutine *finalizee); 113 114 /// @brief schedule the next ready coroutine from the runnables queue for execution 115 void RequestSchedule(); 116 117 /// @brief call to delete the fake "schedule loop" coroutine 118 void FinalizeFiberScheduleLoop(); 119 120 /* debugging tools */ 121 // See CoroutineManager/StackfulCoroutineManager for details 122 void DisableCoroutineSwitch(); 123 void EnableCoroutineSwitch(); 124 bool IsCoroutineSwitchDisabled(); 125 126 #ifndef NDEBUG 127 void PrintRunnables(const PandaString &requester); 128 #endif 129 130 /* profiling tools */ GetPerfStats()131 CoroutineWorkerStats &GetPerfStats() 132 { 133 return stats_; 134 } 135 136 private: 137 /* schedule loop management */ 138 /// the EP for threaded schedule loops 139 void ThreadProc(); 140 /// the EP for fiber schedule loops 141 void ScheduleLoop(); 142 /// the schedule loop itself 143 void ScheduleLoopBody(); 144 /// the helper proxy function for the fiber schedule loop 145 static void ScheduleLoopProxy(void *worker); 146 147 /* runnables queue management */ 148 void PushToRunnableQueue(Coroutine *co, bool pushFront = false); 149 Coroutine *PopFromRunnableQueue(); 150 bool RunnableCoroutinesExist() const; 151 void WaitForRunnables() REQUIRES(runnablesLock_); 152 153 /* scheduling machinery from high level functions to elementary helpers */ 154 /** 155 * Schedule the next ready coroutine from the runnables queue for execution if present, otherwise wait till those 156 * appear, then pick the best suitable and schedule it. After that eventually the current coroutine will be 157 * scheduled for execution too. Upon that, the control flow will get back to this function and it will return. 158 */ 159 void RequestScheduleImpl(); 160 void BlockCurrentCoroAndScheduleNext() RELEASE(runnablesLock_) RELEASE(waitersLock_); 161 void SuspendCurrentCoroAndScheduleNext() RELEASE(runnablesLock_); 162 template <bool SUSPEND_AS_BLOCKED> 163 void SuspendCurrentCoroGeneric(); 164 void BlockCurrentCoro(); 165 void SuspendCurrentCoro(); 166 /* "the lesser evil": keep thread safety annotations but duplicate the function body */ 167 void ScheduleNextCoroUnlockRunnablesWaiters() RELEASE(runnablesLock_) RELEASE(waitersLock_); 168 void ScheduleNextCoroUnlockRunnables() RELEASE(runnablesLock_); 169 void ScheduleNextCoroUnlockNone(); 170 StackfulCoroutineContext *GetCurrentContext() const; 171 StackfulCoroutineContext *PrepareNextRunnableContextForSwitch(); 172 void SwitchCoroutineContext(StackfulCoroutineContext *from, StackfulCoroutineContext *to); 173 174 /* various helper functions */ 175 /// parse the finalization queue and destroy all coroutines from it 176 void FinalizeTerminatedCoros(); 177 /// recalculate the load factor 178 void UpdateLoadFactor() REQUIRES(runnablesLock_); 179 /** 180 * This checker is called on a coroutine switch attempt. Issues fatal error in case when coroutine switch is 181 * disabled. 182 */ 183 void EnsureCoroutineSwitchEnabled(); 184 185 Runtime *runtime_ = nullptr; 186 PandaVM *vm_ = nullptr; 187 StackfulCoroutineManager *coroManager_; 188 Coroutine *scheduleLoopCtx_ = nullptr; 189 bool active_ GUARDED_BY(runnablesLock_) = true; 190 os::thread::ThreadId threadId_; 191 192 // runnable coroutines-related members 193 mutable os::memory::RecursiveMutex runnablesLock_; 194 os::memory::ConditionVariable runnablesCv_; 195 PandaDeque<Coroutine *> runnables_ GUARDED_BY(runnablesLock_); 196 // blocked coros-related members: Coroutine AWAITS CoroutineEvent 197 mutable os::memory::Mutex waitersLock_; 198 PandaMultiMap<CoroutineEvent *, Coroutine *> waiters_ GUARDED_BY(waitersLock_); 199 // terminated coros (waiting for deletion) 200 PandaQueue<Coroutine *> finalizationQueue_; 201 202 /// the moving average number of coroutines in the runnable queue 203 std::atomic<double> loadFactor_ = 0; 204 205 /** 206 * This counter is incremented on DisableCoroutineSwitch calls and decremented on EnableCoroutineSwitch calls. 207 * The value 0 means that coroutine switch is ENABLED. 208 */ 209 uint32_t disableCoroSwitchCounter_ = 0; 210 211 // stats 212 CoroutineWorkerStats stats_; 213 214 PandaString name_; 215 stackful_coroutines::WorkerId id_ = stackful_coroutines::INVALID_WORKER_ID; 216 }; 217 218 } // namespace ark 219 220 #endif /* PANDA_RUNTIME_COROUTINES_STACKFUL_COROUTINE_WORKER_H */ 221