//
//
// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//
18 #ifndef GRPC_SRC_CORE_LIB_EVENT_ENGINE_THREAD_POOL_WORK_STEALING_THREAD_POOL_H
19 #define GRPC_SRC_CORE_LIB_EVENT_ENGINE_THREAD_POOL_WORK_STEALING_THREAD_POOL_H
20 
21 #include <grpc/event_engine/event_engine.h>
22 #include <grpc/support/port_platform.h>
23 #include <grpc/support/thd_id.h>
24 #include <stddef.h>
25 #include <stdint.h>
26 
27 #include <atomic>
28 #include <memory>
29 
30 #include "absl/base/thread_annotations.h"
31 #include "absl/container/flat_hash_set.h"
32 #include "absl/functional/any_invocable.h"
33 #include "src/core/lib/event_engine/thread_pool/thread_count.h"
34 #include "src/core/lib/event_engine/thread_pool/thread_pool.h"
35 #include "src/core/lib/event_engine/work_queue/basic_work_queue.h"
36 #include "src/core/lib/event_engine/work_queue/work_queue.h"
37 #include "src/core/util/backoff.h"
38 #include "src/core/util/notification.h"
39 #include "src/core/util/sync.h"
40 #include "src/core/util/time.h"
41 
42 namespace grpc_event_engine {
43 namespace experimental {
44 
45 class WorkStealingThreadPool final : public ThreadPool {
46  public:
47   explicit WorkStealingThreadPool(size_t reserve_threads);
48   // Asserts Quiesce was called.
49   ~WorkStealingThreadPool() override;
50   // Shut down the pool, and wait for all threads to exit.
51   // This method is safe to call from within a ThreadPool thread.
52   void Quiesce() override;
53   // Run must not be called after Quiesce completes
54   void Run(absl::AnyInvocable<void()> callback) override;
55   void Run(EventEngine::Closure* closure) override;
56 
57   // Forkable
58   // These methods are exposed on the public object to allow for testing.
59   void PrepareFork() override;
60   void PostforkParent() override;
61   void PostforkChild() override;
62 
63  private:
64   // A basic communication mechanism to signal waiting threads that work is
65   // available.
66   class WorkSignal {
67    public:
68     void Signal();
69     void SignalAll();
70     // Returns whether a timeout occurred.
71     bool WaitWithTimeout(grpc_core::Duration time);
72 
73    private:
74     grpc_core::Mutex mu_;
75     grpc_core::CondVar cv_ ABSL_GUARDED_BY(mu_);
76   };
77 
78   // A pool of WorkQueues that participate in work stealing.
79   //
80   // Every worker thread registers and unregisters its thread-local thread pool
81   // here, and steals closures from other threads when work is otherwise
82   // unavailable.
83   class TheftRegistry {
84    public:
85     // Allow any member of the registry to steal from the provided queue.
86     void Enroll(WorkQueue* queue) ABSL_LOCKS_EXCLUDED(mu_);
87     // Disallow work stealing from the provided queue.
88     void Unenroll(WorkQueue* queue) ABSL_LOCKS_EXCLUDED(mu_);
89     // Returns one closure from another thread, or nullptr if none are
90     // available.
91     EventEngine::Closure* StealOne() ABSL_LOCKS_EXCLUDED(mu_);
92 
93    private:
94     grpc_core::Mutex mu_;
95     absl::flat_hash_set<WorkQueue*> queues_ ABSL_GUARDED_BY(mu_);
96   };
97 
98   // An implementation of the ThreadPool
99   // This object is held as a shared_ptr between the owning ThreadPool and each
100   // worker thread. This design allows a ThreadPool worker thread to be the last
101   // owner of the ThreadPool itself.
102   class WorkStealingThreadPoolImpl
103       : public std::enable_shared_from_this<WorkStealingThreadPoolImpl> {
104    public:
105     explicit WorkStealingThreadPoolImpl(size_t reserve_threads);
106     // Start all threads.
107     void Start();
108     // Add a closure to a work queue, preferably a thread-local queue if
109     // available, otherwise the global queue.
110     void Run(EventEngine::Closure* closure);
111     // Start a new thread.
112     // The reason argument determines whether thread creation is rate-limited;
113     // threads created to populate the initial pool are not rate-limited, but
114     // all others thread creation scenarios are rate-limited.
115     void StartThread();
116     // Shut down the pool, and wait for all threads to exit.
117     // This method is safe to call from within a ThreadPool thread.
118     void Quiesce();
119     // Sets a throttled state.
120     // After the initial pool has been created, if the pool is backlogged when
121     // a new thread has started, it is rate limited.
122     // Returns the previous throttling state.
123     bool SetThrottled(bool throttle);
124     // Set the shutdown flag.
125     void SetShutdown(bool is_shutdown);
126     // Set the forking flag.
127     void SetForking(bool is_forking);
128     // Forkable
129     // Ensures that the thread pool is empty before forking.
130     // Postfork parent and child have the same behavior.
131     void PrepareFork();
132     void Postfork();
133     // Thread ID tracking
134     void TrackThread(gpr_thd_id tid);
135     void UntrackThread(gpr_thd_id tid);
136     // Accessor methods
137     bool IsShutdown();
138     bool IsForking();
139     bool IsQuiesced();
reserve_threads()140     size_t reserve_threads() { return reserve_threads_; }
busy_thread_count()141     BusyThreadCount* busy_thread_count() { return &busy_thread_count_; }
living_thread_count()142     LivingThreadCount* living_thread_count() { return &living_thread_count_; }
theft_registry()143     TheftRegistry* theft_registry() { return &theft_registry_; }
queue()144     WorkQueue* queue() { return &queue_; }
work_signal()145     WorkSignal* work_signal() { return &work_signal_; }
146 
147    private:
148     // Lifeguard monitors the pool and keeps it healthy.
149     // It has two main responsibilities:
150     //  * scale the pool to match demand.
151     //  * distribute work to worker threads if the global queue is backing up
152     //    and there are threads that can accept work.
153     class Lifeguard {
154      public:
155       explicit Lifeguard(WorkStealingThreadPoolImpl* pool);
156       ~Lifeguard();
157 
158      private:
159       // The main body of the lifeguard thread.
160       void LifeguardMain();
161       // Starts a new thread if the pool is backlogged
162       void MaybeStartNewThread();
163 
164       WorkStealingThreadPoolImpl* pool_;
165       grpc_core::BackOff backoff_;
166       // Used for signaling that the lifeguard thread has stopped running.
167       std::unique_ptr<grpc_core::Notification> lifeguard_should_shut_down_;
168       std::unique_ptr<grpc_core::Notification> lifeguard_is_shut_down_;
169       std::atomic<bool> lifeguard_running_{false};
170     };
171 
172     void DumpStacksAndCrash();
173 
174     const size_t reserve_threads_;
175     BusyThreadCount busy_thread_count_;
176     LivingThreadCount living_thread_count_;
177     TheftRegistry theft_registry_;
178     BasicWorkQueue queue_;
179     // Track shutdown and fork bits separately.
180     // It's possible for a ThreadPool to initiate shut down while fork handlers
181     // are running, and similarly possible for a fork event to occur during
182     // shutdown.
183     std::atomic<bool> shutdown_{false};
184     std::atomic<bool> forking_{false};
185     std::atomic<bool> quiesced_{false};
186     std::atomic<uint64_t> last_started_thread_{0};
187     // After pool creation we use this to rate limit creation of threads to one
188     // at a time.
189     std::atomic<bool> throttled_{false};
190     WorkSignal work_signal_;
191     grpc_core::Mutex lifeguard_ptr_mu_;
192     std::unique_ptr<Lifeguard> lifeguard_ ABSL_GUARDED_BY(lifeguard_ptr_mu_);
193     // Set of threads for verbose failure debugging
194     grpc_core::Mutex thd_set_mu_;
195     absl::flat_hash_set<gpr_thd_id> thds_ ABSL_GUARDED_BY(thd_set_mu_);
196   };
197 
198   class ThreadState {
199    public:
200     explicit ThreadState(std::shared_ptr<WorkStealingThreadPoolImpl> pool);
201     void ThreadBody();
202     void SleepIfRunning();
203     bool Step();
204     // After the pool is shut down, ensure all local and global callbacks are
205     // executed before quitting the thread.
206     void FinishDraining();
207 
208    private:
209     // pool_ must be the first member so that it is alive when the thread count
210     // is decremented at time of destruction. This is necessary when this thread
211     // state holds the last shared_ptr keeping the pool alive.
212     std::shared_ptr<WorkStealingThreadPoolImpl> pool_;
213     // auto_thread_counter_ must be declared after pool_, so that the thread
214     // count is decremented after all other pool state is cleaned up.
215     LivingThreadCount::AutoThreadCounter auto_thread_counter_;
216     grpc_core::BackOff backoff_;
217     size_t busy_count_idx_;
218   };
219 
220   const std::shared_ptr<WorkStealingThreadPoolImpl> pool_;
221 };
222 
223 }  // namespace experimental
224 }  // namespace grpc_event_engine
225 
226 #endif  // GRPC_SRC_CORE_LIB_EVENT_ENGINE_THREAD_POOL_WORK_STEALING_THREAD_POOL_H
227