• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 #include "src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.h"
19 
20 #include <grpc/support/port_platform.h>
21 #include <grpc/support/thd_id.h>
22 #include <inttypes.h>
23 
24 #include <atomic>
25 #include <chrono>
26 #include <cstddef>
27 #include <memory>
28 #include <utility>
29 
30 #include "absl/functional/any_invocable.h"
31 #include "absl/log/check.h"
32 #include "absl/log/log.h"
33 #include "absl/time/clock.h"
34 #include "absl/time/time.h"
35 #include "absl/types/optional.h"
36 #include "src/core/lib/debug/trace.h"
37 #include "src/core/lib/event_engine/common_closures.h"
38 #include "src/core/lib/event_engine/thread_local.h"
39 #include "src/core/lib/event_engine/work_queue/basic_work_queue.h"
40 #include "src/core/lib/event_engine/work_queue/work_queue.h"
41 #include "src/core/util/backoff.h"
42 #include "src/core/util/crash.h"
43 #include "src/core/util/env.h"
44 #include "src/core/util/examine_stack.h"
45 #include "src/core/util/thd.h"
46 #include "src/core/util/time.h"
47 
48 #ifdef GPR_POSIX_SYNC
49 #include <csignal>
50 #elif defined(GPR_WINDOWS)
51 #include <signal.h>
52 #endif
53 
54 // IWYU pragma: no_include <ratio>
55 
56 // ## Thread Pool Fork-handling
57 //
58 // Thread-safety needs special attention with regard to fork() calls. The
59 // Forkable system employs a pre- and post- fork callback system that does not
60 // guarantee any ordering of execution. On fork() events, the thread pool does
61 // the following:
62 //
63 // On pre-fork:
64 // * the WorkStealingThreadPool triggers all threads to exit,
65 // * all queued work is saved, and
66 // * all threads are shut down, including the Lifeguard thread.
67 //
68 // On post-fork:
69 //  * all threads are restarted, including the Lifeguard thread, and
70 //  * all previously-saved work is enqueued for execution.
71 //
72 // However, the queue may get into trouble if one thread is attempting to
73 // restart the thread pool while another thread is shutting it down. For that
74 // reason, Quiesce and Start must be thread-safe, and Quiesce must wait for the
75 // pool to be in a fully started state before it is allowed to continue.
76 // Consider this potential ordering of events between Start and Quiesce:
77 //
78 //     ┌──────────┐
79 //     │ Thread 1 │
80 //     └────┬─────┘  ┌──────────┐
81 //          │        │ Thread 2 │
82 //          ▼        └────┬─────┘
83 //        Start()         │
84 //          │             ▼
85 //          │        Quiesce()
86 //          │        Wait for worker threads to exit
87 //          │        Wait for the lifeguard thread to exit
88 //          ▼
89 //        Start the Lifeguard thread
90 //        Start the worker threads
91 //
92 // Thread 2 will find no worker threads, and it will then want to wait on a
93 // non-existent Lifeguard thread to finish. Trying a simple
94 // `lifeguard_thread_.Join()` leads to memory access errors. This implementation
95 // uses Notifications to coordinate startup and shutdown states.
96 //
97 // ## Debugging
98 //
99 // Set the environment variable GRPC_THREAD_POOL_VERBOSE_FAILURES=anything to
100 // enable advanced debugging. When the pool takes too long to quiesce, a
101 // backtrace will be printed for every running thread, and the process will
102 // abort.
103 
104 namespace grpc_event_engine {
105 namespace experimental {
106 
107 namespace {
108 // TODO(ctiller): grpc_core::Timestamp, Duration have very specific contracts
109 // around time caching and around when the underlying gpr_now call can be
110 // substituted out.
111 // We should probably move all usage here to std::chrono to avoid weird bugs in
112 // the future.
113 
114 // Maximum amount of time an extra thread is allowed to idle before being
115 // reclaimed.
116 constexpr auto kIdleThreadLimit = std::chrono::seconds(20);
117 // Rate at which "Waiting for ..." logs should be printed while quiescing.
118 constexpr size_t kBlockingQuiesceLogRateSeconds = 3;
119 // Minimum time between thread creations.
120 constexpr grpc_core::Duration kTimeBetweenThrottledThreadStarts =
121     grpc_core::Duration::Seconds(1);
122 // Minimum time a worker thread should sleep between checking for new work. Used
123 // in backoff calculations to reduce vigilance when the pool is calm.
124 constexpr grpc_core::Duration kWorkerThreadMinSleepBetweenChecks{
125     grpc_core::Duration::Milliseconds(15)};
126 // Maximum time a worker thread should sleep between checking for new work.
127 constexpr grpc_core::Duration kWorkerThreadMaxSleepBetweenChecks{
128     grpc_core::Duration::Seconds(3)};
129 // Minimum time the lifeguard thread should sleep between checks. Used in
130 // backoff calculations to reduce vigilance when the pool is calm.
131 constexpr grpc_core::Duration kLifeguardMinSleepBetweenChecks{
132     grpc_core::Duration::Milliseconds(15)};
133 // Maximum time the lifeguard thread should sleep between checking for new work.
134 constexpr grpc_core::Duration kLifeguardMaxSleepBetweenChecks{
135     grpc_core::Duration::Seconds(1)};
136 constexpr grpc_core::Duration kBlockUntilThreadCountTimeout{
137     grpc_core::Duration::Seconds(60)};
138 
139 #ifdef GPR_POSIX_SYNC
140 const bool g_log_verbose_failures =
141     grpc_core::GetEnv("GRPC_THREAD_POOL_VERBOSE_FAILURES").has_value();
142 constexpr int kDumpStackSignal = SIGUSR1;
143 #elif defined(GPR_WINDOWS)
144 const bool g_log_verbose_failures =
145     grpc_core::GetEnv("GRPC_THREAD_POOL_VERBOSE_FAILURES").has_value();
146 constexpr int kDumpStackSignal = SIGTERM;
147 #else
148 constexpr bool g_log_verbose_failures = false;
149 constexpr int kDumpStackSignal = -1;
150 #endif
151 
152 std::atomic<size_t> g_reported_dump_count{0};
153 
DumpSignalHandler(int)154 void DumpSignalHandler(int /* sig */) {
155   const auto trace = grpc_core::GetCurrentStackTrace();
156   if (!trace.has_value()) {
157     LOG(ERROR) << "DumpStack::" << gpr_thd_currentid()
158                << ": Stack trace not available";
159   } else {
160     LOG(ERROR) << "DumpStack::" << gpr_thd_currentid() << ": " << trace.value();
161   }
162   g_reported_dump_count.fetch_add(1);
163   grpc_core::Thread::Kill(gpr_thd_currentid());
164 }
165 
166 }  // namespace
167 
168 thread_local WorkQueue* g_local_queue = nullptr;
169 
170 // -------- WorkStealingThreadPool --------
171 
WorkStealingThreadPool(size_t reserve_threads)172 WorkStealingThreadPool::WorkStealingThreadPool(size_t reserve_threads)
173     : pool_{std::make_shared<WorkStealingThreadPoolImpl>(reserve_threads)} {
174   if (g_log_verbose_failures) {
175     GRPC_TRACE_LOG(event_engine, INFO)
176         << "WorkStealingThreadPool verbose failures are enabled";
177   }
178   pool_->Start();
179 }
180 
Quiesce()181 void WorkStealingThreadPool::Quiesce() { pool_->Quiesce(); }
182 
~WorkStealingThreadPool()183 WorkStealingThreadPool::~WorkStealingThreadPool() {
184   CHECK(pool_->IsQuiesced());
185 }
186 
Run(absl::AnyInvocable<void ()> callback)187 void WorkStealingThreadPool::Run(absl::AnyInvocable<void()> callback) {
188   Run(SelfDeletingClosure::Create(std::move(callback)));
189 }
190 
Run(EventEngine::Closure * closure)191 void WorkStealingThreadPool::Run(EventEngine::Closure* closure) {
192   pool_->Run(closure);
193 }
194 
195 // -------- WorkStealingThreadPool::TheftRegistry --------
196 
Enroll(WorkQueue * queue)197 void WorkStealingThreadPool::TheftRegistry::Enroll(WorkQueue* queue) {
198   grpc_core::MutexLock lock(&mu_);
199   queues_.emplace(queue);
200 }
201 
Unenroll(WorkQueue * queue)202 void WorkStealingThreadPool::TheftRegistry::Unenroll(WorkQueue* queue) {
203   grpc_core::MutexLock lock(&mu_);
204   queues_.erase(queue);
205 }
206 
StealOne()207 EventEngine::Closure* WorkStealingThreadPool::TheftRegistry::StealOne() {
208   grpc_core::MutexLock lock(&mu_);
209   EventEngine::Closure* closure;
210   for (auto* queue : queues_) {
211     closure = queue->PopMostRecent();
212     if (closure != nullptr) return closure;
213   }
214   return nullptr;
215 }
216 
PrepareFork()217 void WorkStealingThreadPool::PrepareFork() { pool_->PrepareFork(); }
218 
PostforkParent()219 void WorkStealingThreadPool::PostforkParent() { pool_->Postfork(); }
220 
PostforkChild()221 void WorkStealingThreadPool::PostforkChild() { pool_->Postfork(); }
222 
223 // -------- WorkStealingThreadPool::WorkStealingThreadPoolImpl --------
224 
WorkStealingThreadPoolImpl(size_t reserve_threads)225 WorkStealingThreadPool::WorkStealingThreadPoolImpl::WorkStealingThreadPoolImpl(
226     size_t reserve_threads)
227     : reserve_threads_(reserve_threads), queue_(this) {}
228 
Start()229 void WorkStealingThreadPool::WorkStealingThreadPoolImpl::Start() {
230   for (size_t i = 0; i < reserve_threads_; i++) {
231     StartThread();
232   }
233   grpc_core::MutexLock lock(&lifeguard_ptr_mu_);
234   lifeguard_ = std::make_unique<Lifeguard>(this);
235 }
236 
Run(EventEngine::Closure * closure)237 void WorkStealingThreadPool::WorkStealingThreadPoolImpl::Run(
238     EventEngine::Closure* closure) {
239   CHECK(!IsQuiesced());
240   if (g_local_queue != nullptr && g_local_queue->owner() == this) {
241     g_local_queue->Add(closure);
242   } else {
243     queue_.Add(closure);
244   }
245   // Signal a worker in any case, even if work was added to a local queue. This
246   // improves performance on 32-core streaming benchmarks with small payloads.
247   work_signal_.Signal();
248 }
249 
StartThread()250 void WorkStealingThreadPool::WorkStealingThreadPoolImpl::StartThread() {
251   last_started_thread_.store(
252       grpc_core::Timestamp::Now().milliseconds_after_process_epoch(),
253       std::memory_order_relaxed);
254   grpc_core::Thread(
255       "event_engine",
256       [](void* arg) {
257         ThreadState* worker = static_cast<ThreadState*>(arg);
258         worker->ThreadBody();
259         delete worker;
260       },
261       new ThreadState(shared_from_this()), nullptr,
262       grpc_core::Thread::Options().set_tracked(false).set_joinable(false))
263       .Start();
264 }
265 
Quiesce()266 void WorkStealingThreadPool::WorkStealingThreadPoolImpl::Quiesce() {
267   SetShutdown(true);
268   // Wait until all threads have exited.
269   // Note that if this is a threadpool thread then we won't exit this thread
270   // until all other threads have exited, so we need to wait for just one thread
271   // running instead of zero.
272   bool is_threadpool_thread = g_local_queue != nullptr;
273   work_signal()->SignalAll();
274   auto threads_were_shut_down = living_thread_count_.BlockUntilThreadCount(
275       is_threadpool_thread ? 1 : 0, "shutting down",
276       g_log_verbose_failures ? kBlockUntilThreadCountTimeout
277                              : grpc_core::Duration::Infinity());
278   if (!threads_were_shut_down.ok() && g_log_verbose_failures) {
279     DumpStacksAndCrash();
280   }
281   CHECK(queue_.Empty());
282   quiesced_.store(true, std::memory_order_relaxed);
283   grpc_core::MutexLock lock(&lifeguard_ptr_mu_);
284   lifeguard_.reset();
285 }
286 
SetThrottled(bool throttled)287 bool WorkStealingThreadPool::WorkStealingThreadPoolImpl::SetThrottled(
288     bool throttled) {
289   return throttled_.exchange(throttled, std::memory_order_relaxed);
290 }
291 
SetShutdown(bool is_shutdown)292 void WorkStealingThreadPool::WorkStealingThreadPoolImpl::SetShutdown(
293     bool is_shutdown) {
294   auto was_shutdown = shutdown_.exchange(is_shutdown);
295   CHECK(is_shutdown != was_shutdown);
296   work_signal_.SignalAll();
297 }
298 
SetForking(bool is_forking)299 void WorkStealingThreadPool::WorkStealingThreadPoolImpl::SetForking(
300     bool is_forking) {
301   auto was_forking = forking_.exchange(is_forking);
302   CHECK(is_forking != was_forking);
303 }
304 
IsForking()305 bool WorkStealingThreadPool::WorkStealingThreadPoolImpl::IsForking() {
306   return forking_.load(std::memory_order_relaxed);
307 }
308 
IsShutdown()309 bool WorkStealingThreadPool::WorkStealingThreadPoolImpl::IsShutdown() {
310   return shutdown_.load(std::memory_order_relaxed);
311 }
312 
IsQuiesced()313 bool WorkStealingThreadPool::WorkStealingThreadPoolImpl::IsQuiesced() {
314   return quiesced_.load(std::memory_order_relaxed);
315 }
316 
PrepareFork()317 void WorkStealingThreadPool::WorkStealingThreadPoolImpl::PrepareFork() {
318   GRPC_TRACE_LOG(event_engine, INFO)
319       << "WorkStealingThreadPoolImpl::PrepareFork";
320   SetForking(true);
321   work_signal_.SignalAll();
322   auto threads_were_shut_down = living_thread_count_.BlockUntilThreadCount(
323       0, "forking", kBlockUntilThreadCountTimeout);
324   if (!threads_were_shut_down.ok() && g_log_verbose_failures) {
325     DumpStacksAndCrash();
326   }
327   grpc_core::MutexLock lock(&lifeguard_ptr_mu_);
328   lifeguard_.reset();
329 }
330 
Postfork()331 void WorkStealingThreadPool::WorkStealingThreadPoolImpl::Postfork() {
332   SetForking(false);
333   Start();
334 }
335 
TrackThread(gpr_thd_id tid)336 void WorkStealingThreadPool::WorkStealingThreadPoolImpl::TrackThread(
337     gpr_thd_id tid) {
338   grpc_core::MutexLock lock(&thd_set_mu_);
339   thds_.insert(tid);
340 }
341 
UntrackThread(gpr_thd_id tid)342 void WorkStealingThreadPool::WorkStealingThreadPoolImpl::UntrackThread(
343     gpr_thd_id tid) {
344   grpc_core::MutexLock lock(&thd_set_mu_);
345   thds_.erase(tid);
346 }
347 
DumpStacksAndCrash()348 void WorkStealingThreadPool::WorkStealingThreadPoolImpl::DumpStacksAndCrash() {
349   grpc_core::MutexLock lock(&thd_set_mu_);
350   LOG(ERROR) << "Pool did not quiesce in time, gRPC will not shut down "
351                 "cleanly. Dumping all "
352              << thds_.size() << " thread stacks.";
353   for (const auto tid : thds_) {
354     grpc_core::Thread::Signal(tid, kDumpStackSignal);
355   }
356   // If this is a thread pool thread, wait for one fewer thread.
357   auto ignore_thread_count = g_local_queue != nullptr ? 1 : 0;
358   while (living_thread_count_.count() - ignore_thread_count >
359          g_reported_dump_count.load()) {
360     absl::SleepFor(absl::Milliseconds(200));
361   }
362   grpc_core::Crash(
363       "Pool did not quiesce in time, gRPC will not shut down cleanly.");
364 }
365 
366 // -------- WorkStealingThreadPool::WorkStealingThreadPoolImpl::Lifeguard -----
367 
Lifeguard(WorkStealingThreadPoolImpl * pool)368 WorkStealingThreadPool::WorkStealingThreadPoolImpl::Lifeguard::Lifeguard(
369     WorkStealingThreadPoolImpl* pool)
370     : pool_(pool),
371       backoff_(grpc_core::BackOff::Options()
372                    .set_initial_backoff(kLifeguardMinSleepBetweenChecks)
373                    .set_max_backoff(kLifeguardMaxSleepBetweenChecks)
374                    .set_multiplier(1.3)),
375       lifeguard_should_shut_down_(std::make_unique<grpc_core::Notification>()),
376       lifeguard_is_shut_down_(std::make_unique<grpc_core::Notification>()) {
377   // lifeguard_running_ is set early to avoid a quiesce race while the
378   // lifeguard is still starting up.
379   lifeguard_running_.store(true);
380   grpc_core::Thread(
381       "lifeguard",
382       [](void* arg) {
383         auto* lifeguard = static_cast<Lifeguard*>(arg);
384         lifeguard->LifeguardMain();
385       },
386       this, nullptr,
387       grpc_core::Thread::Options().set_tracked(false).set_joinable(false))
388       .Start();
389 }
390 
391 void WorkStealingThreadPool::WorkStealingThreadPoolImpl::Lifeguard::
LifeguardMain()392     LifeguardMain() {
393   while (true) {
394     if (pool_->IsForking()) break;
395     // If the pool is shut down, loop quickly until quiesced. Otherwise,
396     // reduce the check rate if the pool is idle.
397     if (pool_->IsShutdown()) {
398       if (pool_->IsQuiesced()) break;
399     } else {
400       lifeguard_should_shut_down_->WaitForNotificationWithTimeout(
401           absl::Milliseconds(backoff_.NextAttemptDelay().millis()));
402     }
403     MaybeStartNewThread();
404   }
405   lifeguard_running_.store(false, std::memory_order_relaxed);
406   lifeguard_is_shut_down_->Notify();
407 }
408 
~Lifeguard()409 WorkStealingThreadPool::WorkStealingThreadPoolImpl::Lifeguard::~Lifeguard() {
410   lifeguard_should_shut_down_->Notify();
411   while (lifeguard_running_.load(std::memory_order_relaxed)) {
412     GRPC_LOG_EVERY_N_SEC_DELAYED_DEBUG(
413         kBlockingQuiesceLogRateSeconds, "%s",
414         "Waiting for lifeguard thread to shut down");
415     lifeguard_is_shut_down_->WaitForNotification();
416   }
417   // Do an additional wait in case this method races with LifeguardMain's
418   // shutdown. This should return immediately if the lifeguard is already shut
419   // down.
420   lifeguard_is_shut_down_->WaitForNotification();
421   backoff_.Reset();
422   lifeguard_should_shut_down_ = std::make_unique<grpc_core::Notification>();
423   lifeguard_is_shut_down_ = std::make_unique<grpc_core::Notification>();
424 }
425 
426 void WorkStealingThreadPool::WorkStealingThreadPoolImpl::Lifeguard::
MaybeStartNewThread()427     MaybeStartNewThread() {
428   // No new threads are started when forking.
429   // No new work is done when forking needs to begin.
430   if (pool_->forking_.load()) return;
431   const auto living_thread_count = pool_->living_thread_count()->count();
432   // Wake an idle worker thread if there's global work to be had.
433   if (pool_->busy_thread_count()->count() < living_thread_count) {
434     if (!pool_->queue_.Empty()) {
435       pool_->work_signal()->Signal();
436       backoff_.Reset();
437     }
438     // Idle threads will eventually wake up for an attempt at work stealing.
439     return;
440   }
441   // No new threads if in the throttled state.
442   // However, all workers are busy, so the Lifeguard should be more
443   // vigilant about checking whether a new thread must be started.
444   if (grpc_core::Timestamp::Now() -
445           grpc_core::Timestamp::FromMillisecondsAfterProcessEpoch(
446               pool_->last_started_thread_) <
447       kTimeBetweenThrottledThreadStarts) {
448     backoff_.Reset();
449     return;
450   }
451   // All workers are busy and the pool is not throttled. Start a new thread.
452   // TODO(hork): new threads may spawn when there is no work in the global
453   // queue, nor any work to steal. Add more sophisticated logic about when to
454   // start a thread.
455   GRPC_TRACE_LOG(event_engine, INFO)
456       << "Starting new ThreadPool thread due to backlog (total threads: "
457       << living_thread_count + 1;
458   pool_->StartThread();
459   // Tell the lifeguard to monitor the pool more closely.
460   backoff_.Reset();
461 }
462 
463 // -------- WorkStealingThreadPool::ThreadState --------
464 
ThreadState(std::shared_ptr<WorkStealingThreadPoolImpl> pool)465 WorkStealingThreadPool::ThreadState::ThreadState(
466     std::shared_ptr<WorkStealingThreadPoolImpl> pool)
467     : pool_(std::move(pool)),
468       auto_thread_counter_(
469           pool_->living_thread_count()->MakeAutoThreadCounter()),
470       backoff_(grpc_core::BackOff::Options()
471                    .set_initial_backoff(kWorkerThreadMinSleepBetweenChecks)
472                    .set_max_backoff(kWorkerThreadMaxSleepBetweenChecks)
473                    .set_multiplier(1.3)),
474       busy_count_idx_(pool_->busy_thread_count()->NextIndex()) {}
475 
ThreadBody()476 void WorkStealingThreadPool::ThreadState::ThreadBody() {
477   if (g_log_verbose_failures) {
478 #ifdef GPR_POSIX_SYNC
479     std::signal(kDumpStackSignal, DumpSignalHandler);
480 #elif defined(GPR_WINDOWS)
481     signal(kDumpStackSignal, DumpSignalHandler);
482 #endif
483     pool_->TrackThread(gpr_thd_currentid());
484   }
485   g_local_queue = new BasicWorkQueue(pool_.get());
486   pool_->theft_registry()->Enroll(g_local_queue);
487   ThreadLocal::SetIsEventEngineThread(true);
488   while (Step()) {
489     // loop until the thread should no longer run
490   }
491   // cleanup
492   if (pool_->IsForking()) {
493     // TODO(hork): consider WorkQueue::AddAll(WorkQueue*)
494     EventEngine::Closure* closure;
495     while (!g_local_queue->Empty()) {
496       closure = g_local_queue->PopMostRecent();
497       if (closure != nullptr) {
498         pool_->queue()->Add(closure);
499       }
500     }
501   } else if (pool_->IsShutdown()) {
502     FinishDraining();
503   }
504   CHECK(g_local_queue->Empty());
505   pool_->theft_registry()->Unenroll(g_local_queue);
506   delete g_local_queue;
507   if (g_log_verbose_failures) {
508     pool_->UntrackThread(gpr_thd_currentid());
509   }
510 }
511 
SleepIfRunning()512 void WorkStealingThreadPool::ThreadState::SleepIfRunning() {
513   if (pool_->IsForking()) return;
514   absl::SleepFor(
515       absl::Milliseconds(kTimeBetweenThrottledThreadStarts.millis()));
516 }
517 
Step()518 bool WorkStealingThreadPool::ThreadState::Step() {
519   if (pool_->IsForking()) return false;
520   auto* closure = g_local_queue->PopMostRecent();
521   // If local work is available, run it.
522   if (closure != nullptr) {
523     auto busy =
524         pool_->busy_thread_count()->MakeAutoThreadCounter(busy_count_idx_);
525     closure->Run();
526     return true;
527   }
528   // Thread shutdown exit condition (ignoring fork). All must be true:
529   // * shutdown was called
530   // * the local queue is empty
531   // * the global queue is empty
532   // * the steal pool returns nullptr
533   bool should_run_again = false;
534   auto start_time = std::chrono::steady_clock::now();
535   // Wait until work is available or until shut down.
536   while (!pool_->IsForking()) {
537     // Pull from the global queue next
538     // TODO(hork): consider an empty check for performance wins. Depends on the
539     // queue implementation, the BasicWorkQueue takes two locks when you do an
540     // empty check then pop.
541     closure = pool_->queue()->PopMostRecent();
542     if (closure != nullptr) {
543       should_run_again = true;
544       break;
545     };
546     // Try stealing if the queue is empty
547     closure = pool_->theft_registry()->StealOne();
548     if (closure != nullptr) {
549       should_run_again = true;
550       break;
551     }
552     // No closures were retrieved from anywhere.
553     // Quit the thread if the pool has been shut down.
554     if (pool_->IsShutdown()) break;
555     bool timed_out =
556         pool_->work_signal()->WaitWithTimeout(backoff_.NextAttemptDelay());
557     if (pool_->IsForking() || pool_->IsShutdown()) break;
558     // Quit a thread if the pool has more than it requires, and this thread
559     // has been idle long enough.
560     if (timed_out &&
561         pool_->living_thread_count()->count() > pool_->reserve_threads() &&
562         std::chrono::steady_clock::now() - start_time > kIdleThreadLimit) {
563       return false;
564     }
565   }
566   if (pool_->IsForking()) {
567     // save the closure since we aren't going to execute it.
568     if (closure != nullptr) g_local_queue->Add(closure);
569     return false;
570   }
571   if (closure != nullptr) {
572     auto busy =
573         pool_->busy_thread_count()->MakeAutoThreadCounter(busy_count_idx_);
574     closure->Run();
575   }
576   backoff_.Reset();
577   return should_run_again;
578 }
579 
FinishDraining()580 void WorkStealingThreadPool::ThreadState::FinishDraining() {
581   // The thread is definitionally busy while draining
582   auto busy =
583       pool_->busy_thread_count()->MakeAutoThreadCounter(busy_count_idx_);
584   // If a fork occurs at any point during shutdown, quit draining. The post-fork
585   // threads will finish draining the global queue.
586   while (!pool_->IsForking()) {
587     if (!g_local_queue->Empty()) {
588       auto* closure = g_local_queue->PopMostRecent();
589       if (closure != nullptr) {
590         closure->Run();
591       }
592       continue;
593     }
594     if (!pool_->queue()->Empty()) {
595       auto* closure = pool_->queue()->PopMostRecent();
596       if (closure != nullptr) {
597         closure->Run();
598       }
599       continue;
600     }
601     break;
602   }
603 }
604 
605 // -------- WorkStealingThreadPool::WorkSignal --------
606 
Signal()607 void WorkStealingThreadPool::WorkSignal::Signal() {
608   grpc_core::MutexLock lock(&mu_);
609   cv_.Signal();
610 }
611 
SignalAll()612 void WorkStealingThreadPool::WorkSignal::SignalAll() {
613   grpc_core::MutexLock lock(&mu_);
614   cv_.SignalAll();
615 }
616 
WaitWithTimeout(grpc_core::Duration time)617 bool WorkStealingThreadPool::WorkSignal::WaitWithTimeout(
618     grpc_core::Duration time) {
619   grpc_core::MutexLock lock(&mu_);
620   return cv_.WaitWithTimeout(&mu_, absl::Milliseconds(time.millis()));
621 }
622 
623 }  // namespace experimental
624 }  // namespace grpc_event_engine
625