1 // 2 // 3 // Copyright 2015 gRPC authors. 4 // 5 // Licensed under the Apache License, Version 2.0 (the "License"); 6 // you may not use this file except in compliance with the License. 7 // You may obtain a copy of the License at 8 // 9 // http://www.apache.org/licenses/LICENSE-2.0 10 // 11 // Unless required by applicable law or agreed to in writing, software 12 // distributed under the License is distributed on an "AS IS" BASIS, 13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 // See the License for the specific language governing permissions and 15 // limitations under the License. 16 // 17 // 18 #ifndef GRPC_SRC_CORE_LIB_EVENT_ENGINE_THREAD_POOL_WORK_STEALING_THREAD_POOL_H 19 #define GRPC_SRC_CORE_LIB_EVENT_ENGINE_THREAD_POOL_WORK_STEALING_THREAD_POOL_H 20 21 #include <grpc/event_engine/event_engine.h> 22 #include <grpc/support/port_platform.h> 23 #include <grpc/support/thd_id.h> 24 #include <stddef.h> 25 #include <stdint.h> 26 27 #include <atomic> 28 #include <memory> 29 30 #include "absl/base/thread_annotations.h" 31 #include "absl/container/flat_hash_set.h" 32 #include "absl/functional/any_invocable.h" 33 #include "src/core/lib/event_engine/thread_pool/thread_count.h" 34 #include "src/core/lib/event_engine/thread_pool/thread_pool.h" 35 #include "src/core/lib/event_engine/work_queue/basic_work_queue.h" 36 #include "src/core/lib/event_engine/work_queue/work_queue.h" 37 #include "src/core/util/backoff.h" 38 #include "src/core/util/notification.h" 39 #include "src/core/util/sync.h" 40 #include "src/core/util/time.h" 41 42 namespace grpc_event_engine { 43 namespace experimental { 44 45 class WorkStealingThreadPool final : public ThreadPool { 46 public: 47 explicit WorkStealingThreadPool(size_t reserve_threads); 48 // Asserts Quiesce was called. 49 ~WorkStealingThreadPool() override; 50 // Shut down the pool, and wait for all threads to exit. 51 // This method is safe to call from within a ThreadPool thread. 52 void Quiesce() override; 53 // Run must not be called after Quiesce completes 54 void Run(absl::AnyInvocable<void()> callback) override; 55 void Run(EventEngine::Closure* closure) override; 56 57 // Forkable 58 // These methods are exposed on the public object to allow for testing. 59 void PrepareFork() override; 60 void PostforkParent() override; 61 void PostforkChild() override; 62 63 private: 64 // A basic communication mechanism to signal waiting threads that work is 65 // available. 66 class WorkSignal { 67 public: 68 void Signal(); 69 void SignalAll(); 70 // Returns whether a timeout occurred. 71 bool WaitWithTimeout(grpc_core::Duration time); 72 73 private: 74 grpc_core::Mutex mu_; 75 grpc_core::CondVar cv_ ABSL_GUARDED_BY(mu_); 76 }; 77 78 // A pool of WorkQueues that participate in work stealing. 79 // 80 // Every worker thread registers and unregisters its thread-local thread pool 81 // here, and steals closures from other threads when work is otherwise 82 // unavailable. 83 class TheftRegistry { 84 public: 85 // Allow any member of the registry to steal from the provided queue. 86 void Enroll(WorkQueue* queue) ABSL_LOCKS_EXCLUDED(mu_); 87 // Disallow work stealing from the provided queue. 88 void Unenroll(WorkQueue* queue) ABSL_LOCKS_EXCLUDED(mu_); 89 // Returns one closure from another thread, or nullptr if none are 90 // available. 91 EventEngine::Closure* StealOne() ABSL_LOCKS_EXCLUDED(mu_); 92 93 private: 94 grpc_core::Mutex mu_; 95 absl::flat_hash_set<WorkQueue*> queues_ ABSL_GUARDED_BY(mu_); 96 }; 97 98 // An implementation of the ThreadPool 99 // This object is held as a shared_ptr between the owning ThreadPool and each 100 // worker thread. This design allows a ThreadPool worker thread to be the last 101 // owner of the ThreadPool itself. 102 class WorkStealingThreadPoolImpl 103 : public std::enable_shared_from_this<WorkStealingThreadPoolImpl> { 104 public: 105 explicit WorkStealingThreadPoolImpl(size_t reserve_threads); 106 // Start all threads. 107 void Start(); 108 // Add a closure to a work queue, preferably a thread-local queue if 109 // available, otherwise the global queue. 110 void Run(EventEngine::Closure* closure); 111 // Start a new thread. 112 // The reason argument determines whether thread creation is rate-limited; 113 // threads created to populate the initial pool are not rate-limited, but 114 // all others thread creation scenarios are rate-limited. 115 void StartThread(); 116 // Shut down the pool, and wait for all threads to exit. 117 // This method is safe to call from within a ThreadPool thread. 118 void Quiesce(); 119 // Sets a throttled state. 120 // After the initial pool has been created, if the pool is backlogged when 121 // a new thread has started, it is rate limited. 122 // Returns the previous throttling state. 123 bool SetThrottled(bool throttle); 124 // Set the shutdown flag. 125 void SetShutdown(bool is_shutdown); 126 // Set the forking flag. 127 void SetForking(bool is_forking); 128 // Forkable 129 // Ensures that the thread pool is empty before forking. 130 // Postfork parent and child have the same behavior. 131 void PrepareFork(); 132 void Postfork(); 133 // Thread ID tracking 134 void TrackThread(gpr_thd_id tid); 135 void UntrackThread(gpr_thd_id tid); 136 // Accessor methods 137 bool IsShutdown(); 138 bool IsForking(); 139 bool IsQuiesced(); reserve_threads()140 size_t reserve_threads() { return reserve_threads_; } busy_thread_count()141 BusyThreadCount* busy_thread_count() { return &busy_thread_count_; } living_thread_count()142 LivingThreadCount* living_thread_count() { return &living_thread_count_; } theft_registry()143 TheftRegistry* theft_registry() { return &theft_registry_; } queue()144 WorkQueue* queue() { return &queue_; } work_signal()145 WorkSignal* work_signal() { return &work_signal_; } 146 147 private: 148 // Lifeguard monitors the pool and keeps it healthy. 149 // It has two main responsibilities: 150 // * scale the pool to match demand. 151 // * distribute work to worker threads if the global queue is backing up 152 // and there are threads that can accept work. 153 class Lifeguard { 154 public: 155 explicit Lifeguard(WorkStealingThreadPoolImpl* pool); 156 ~Lifeguard(); 157 158 private: 159 // The main body of the lifeguard thread. 160 void LifeguardMain(); 161 // Starts a new thread if the pool is backlogged 162 void MaybeStartNewThread(); 163 164 WorkStealingThreadPoolImpl* pool_; 165 grpc_core::BackOff backoff_; 166 // Used for signaling that the lifeguard thread has stopped running. 167 std::unique_ptr<grpc_core::Notification> lifeguard_should_shut_down_; 168 std::unique_ptr<grpc_core::Notification> lifeguard_is_shut_down_; 169 std::atomic<bool> lifeguard_running_{false}; 170 }; 171 172 void DumpStacksAndCrash(); 173 174 const size_t reserve_threads_; 175 BusyThreadCount busy_thread_count_; 176 LivingThreadCount living_thread_count_; 177 TheftRegistry theft_registry_; 178 BasicWorkQueue queue_; 179 // Track shutdown and fork bits separately. 180 // It's possible for a ThreadPool to initiate shut down while fork handlers 181 // are running, and similarly possible for a fork event to occur during 182 // shutdown. 183 std::atomic<bool> shutdown_{false}; 184 std::atomic<bool> forking_{false}; 185 std::atomic<bool> quiesced_{false}; 186 std::atomic<uint64_t> last_started_thread_{0}; 187 // After pool creation we use this to rate limit creation of threads to one 188 // at a time. 189 std::atomic<bool> throttled_{false}; 190 WorkSignal work_signal_; 191 grpc_core::Mutex lifeguard_ptr_mu_; 192 std::unique_ptr<Lifeguard> lifeguard_ ABSL_GUARDED_BY(lifeguard_ptr_mu_); 193 // Set of threads for verbose failure debugging 194 grpc_core::Mutex thd_set_mu_; 195 absl::flat_hash_set<gpr_thd_id> thds_ ABSL_GUARDED_BY(thd_set_mu_); 196 }; 197 198 class ThreadState { 199 public: 200 explicit ThreadState(std::shared_ptr<WorkStealingThreadPoolImpl> pool); 201 void ThreadBody(); 202 void SleepIfRunning(); 203 bool Step(); 204 // After the pool is shut down, ensure all local and global callbacks are 205 // executed before quitting the thread. 206 void FinishDraining(); 207 208 private: 209 // pool_ must be the first member so that it is alive when the thread count 210 // is decremented at time of destruction. This is necessary when this thread 211 // state holds the last shared_ptr keeping the pool alive. 212 std::shared_ptr<WorkStealingThreadPoolImpl> pool_; 213 // auto_thread_counter_ must be declared after pool_, so that the thread 214 // count is decremented after all other pool state is cleaned up. 215 LivingThreadCount::AutoThreadCounter auto_thread_counter_; 216 grpc_core::BackOff backoff_; 217 size_t busy_count_idx_; 218 }; 219 220 const std::shared_ptr<WorkStealingThreadPoolImpl> pool_; 221 }; 222 223 } // namespace experimental 224 } // namespace grpc_event_engine 225 226 #endif // GRPC_SRC_CORE_LIB_EVENT_ENGINE_THREAD_POOL_WORK_STEALING_THREAD_POOL_H 227