1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 #include "src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.h"
19
20 #include <grpc/support/port_platform.h>
21 #include <grpc/support/thd_id.h>
22 #include <inttypes.h>
23
24 #include <atomic>
25 #include <chrono>
26 #include <cstddef>
27 #include <memory>
28 #include <utility>
29
30 #include "absl/functional/any_invocable.h"
31 #include "absl/log/check.h"
32 #include "absl/log/log.h"
33 #include "absl/time/clock.h"
34 #include "absl/time/time.h"
35 #include "absl/types/optional.h"
36 #include "src/core/lib/debug/trace.h"
37 #include "src/core/lib/event_engine/common_closures.h"
38 #include "src/core/lib/event_engine/thread_local.h"
39 #include "src/core/lib/event_engine/work_queue/basic_work_queue.h"
40 #include "src/core/lib/event_engine/work_queue/work_queue.h"
41 #include "src/core/util/backoff.h"
42 #include "src/core/util/crash.h"
43 #include "src/core/util/env.h"
44 #include "src/core/util/examine_stack.h"
45 #include "src/core/util/thd.h"
46 #include "src/core/util/time.h"
47
48 #ifdef GPR_POSIX_SYNC
49 #include <csignal>
50 #elif defined(GPR_WINDOWS)
51 #include <signal.h>
52 #endif
53
54 // IWYU pragma: no_include <ratio>
55
56 // ## Thread Pool Fork-handling
57 //
58 // Thread-safety needs special attention with regard to fork() calls. The
59 // Forkable system employs a pre- and post- fork callback system that does not
60 // guarantee any ordering of execution. On fork() events, the thread pool does
61 // the following:
62 //
63 // On pre-fork:
64 // * the WorkStealingThreadPool triggers all threads to exit,
65 // * all queued work is saved, and
// * all threads are shut down, including the Lifeguard thread.
67 //
68 // On post-fork:
69 // * all threads are restarted, including the Lifeguard thread, and
70 // * all previously-saved work is enqueued for execution.
71 //
// However, the queue may get into trouble if one thread is attempting to
73 // restart the thread pool while another thread is shutting it down. For that
74 // reason, Quiesce and Start must be thread-safe, and Quiesce must wait for the
75 // pool to be in a fully started state before it is allowed to continue.
76 // Consider this potential ordering of events between Start and Quiesce:
77 //
78 // ┌──────────┐
79 // │ Thread 1 │
80 // └────┬─────┘ ┌──────────┐
81 // │ │ Thread 2 │
82 // ▼ └────┬─────┘
83 // Start() │
84 // │ ▼
85 // │ Quiesce()
86 // │ Wait for worker threads to exit
87 // │ Wait for the lifeguard thread to exit
88 // ▼
89 // Start the Lifeguard thread
90 // Start the worker threads
91 //
92 // Thread 2 will find no worker threads, and it will then want to wait on a
93 // non-existent Lifeguard thread to finish. Trying a simple
94 // `lifeguard_thread_.Join()` leads to memory access errors. This implementation
95 // uses Notifications to coordinate startup and shutdown states.
96 //
97 // ## Debugging
98 //
99 // Set the environment variable GRPC_THREAD_POOL_VERBOSE_FAILURES=anything to
100 // enable advanced debugging. When the pool takes too long to quiesce, a
101 // backtrace will be printed for every running thread, and the process will
102 // abort.
103
104 namespace grpc_event_engine {
105 namespace experimental {
106
107 namespace {
108 // TODO(ctiller): grpc_core::Timestamp, Duration have very specific contracts
109 // around time caching and around when the underlying gpr_now call can be
110 // substituted out.
111 // We should probably move all usage here to std::chrono to avoid weird bugs in
112 // the future.
113
114 // Maximum amount of time an extra thread is allowed to idle before being
115 // reclaimed.
116 constexpr auto kIdleThreadLimit = std::chrono::seconds(20);
117 // Rate at which "Waiting for ..." logs should be printed while quiescing.
118 constexpr size_t kBlockingQuiesceLogRateSeconds = 3;
119 // Minimum time between thread creations.
120 constexpr grpc_core::Duration kTimeBetweenThrottledThreadStarts =
121 grpc_core::Duration::Seconds(1);
122 // Minimum time a worker thread should sleep between checking for new work. Used
123 // in backoff calculations to reduce vigilance when the pool is calm.
124 constexpr grpc_core::Duration kWorkerThreadMinSleepBetweenChecks{
125 grpc_core::Duration::Milliseconds(15)};
126 // Maximum time a worker thread should sleep between checking for new work.
127 constexpr grpc_core::Duration kWorkerThreadMaxSleepBetweenChecks{
128 grpc_core::Duration::Seconds(3)};
129 // Minimum time the lifeguard thread should sleep between checks. Used in
130 // backoff calculations to reduce vigilance when the pool is calm.
131 constexpr grpc_core::Duration kLifeguardMinSleepBetweenChecks{
132 grpc_core::Duration::Milliseconds(15)};
133 // Maximum time the lifeguard thread should sleep between checking for new work.
134 constexpr grpc_core::Duration kLifeguardMaxSleepBetweenChecks{
135 grpc_core::Duration::Seconds(1)};
136 constexpr grpc_core::Duration kBlockUntilThreadCountTimeout{
137 grpc_core::Duration::Seconds(60)};
138
139 #ifdef GPR_POSIX_SYNC
140 const bool g_log_verbose_failures =
141 grpc_core::GetEnv("GRPC_THREAD_POOL_VERBOSE_FAILURES").has_value();
142 constexpr int kDumpStackSignal = SIGUSR1;
143 #elif defined(GPR_WINDOWS)
144 const bool g_log_verbose_failures =
145 grpc_core::GetEnv("GRPC_THREAD_POOL_VERBOSE_FAILURES").has_value();
146 constexpr int kDumpStackSignal = SIGTERM;
147 #else
148 constexpr bool g_log_verbose_failures = false;
149 constexpr int kDumpStackSignal = -1;
150 #endif
151
152 std::atomic<size_t> g_reported_dump_count{0};
153
DumpSignalHandler(int)154 void DumpSignalHandler(int /* sig */) {
155 const auto trace = grpc_core::GetCurrentStackTrace();
156 if (!trace.has_value()) {
157 LOG(ERROR) << "DumpStack::" << gpr_thd_currentid()
158 << ": Stack trace not available";
159 } else {
160 LOG(ERROR) << "DumpStack::" << gpr_thd_currentid() << ": " << trace.value();
161 }
162 g_reported_dump_count.fetch_add(1);
163 grpc_core::Thread::Kill(gpr_thd_currentid());
164 }
165
166 } // namespace
167
168 thread_local WorkQueue* g_local_queue = nullptr;
169
170 // -------- WorkStealingThreadPool --------
171
WorkStealingThreadPool(size_t reserve_threads)172 WorkStealingThreadPool::WorkStealingThreadPool(size_t reserve_threads)
173 : pool_{std::make_shared<WorkStealingThreadPoolImpl>(reserve_threads)} {
174 if (g_log_verbose_failures) {
175 GRPC_TRACE_LOG(event_engine, INFO)
176 << "WorkStealingThreadPool verbose failures are enabled";
177 }
178 pool_->Start();
179 }
180
Quiesce()181 void WorkStealingThreadPool::Quiesce() { pool_->Quiesce(); }
182
~WorkStealingThreadPool()183 WorkStealingThreadPool::~WorkStealingThreadPool() {
184 CHECK(pool_->IsQuiesced());
185 }
186
Run(absl::AnyInvocable<void ()> callback)187 void WorkStealingThreadPool::Run(absl::AnyInvocable<void()> callback) {
188 Run(SelfDeletingClosure::Create(std::move(callback)));
189 }
190
Run(EventEngine::Closure * closure)191 void WorkStealingThreadPool::Run(EventEngine::Closure* closure) {
192 pool_->Run(closure);
193 }
194
195 // -------- WorkStealingThreadPool::TheftRegistry --------
196
Enroll(WorkQueue * queue)197 void WorkStealingThreadPool::TheftRegistry::Enroll(WorkQueue* queue) {
198 grpc_core::MutexLock lock(&mu_);
199 queues_.emplace(queue);
200 }
201
Unenroll(WorkQueue * queue)202 void WorkStealingThreadPool::TheftRegistry::Unenroll(WorkQueue* queue) {
203 grpc_core::MutexLock lock(&mu_);
204 queues_.erase(queue);
205 }
206
StealOne()207 EventEngine::Closure* WorkStealingThreadPool::TheftRegistry::StealOne() {
208 grpc_core::MutexLock lock(&mu_);
209 EventEngine::Closure* closure;
210 for (auto* queue : queues_) {
211 closure = queue->PopMostRecent();
212 if (closure != nullptr) return closure;
213 }
214 return nullptr;
215 }
216
PrepareFork()217 void WorkStealingThreadPool::PrepareFork() { pool_->PrepareFork(); }
218
PostforkParent()219 void WorkStealingThreadPool::PostforkParent() { pool_->Postfork(); }
220
PostforkChild()221 void WorkStealingThreadPool::PostforkChild() { pool_->Postfork(); }
222
223 // -------- WorkStealingThreadPool::WorkStealingThreadPoolImpl --------
224
WorkStealingThreadPoolImpl(size_t reserve_threads)225 WorkStealingThreadPool::WorkStealingThreadPoolImpl::WorkStealingThreadPoolImpl(
226 size_t reserve_threads)
227 : reserve_threads_(reserve_threads), queue_(this) {}
228
Start()229 void WorkStealingThreadPool::WorkStealingThreadPoolImpl::Start() {
230 for (size_t i = 0; i < reserve_threads_; i++) {
231 StartThread();
232 }
233 grpc_core::MutexLock lock(&lifeguard_ptr_mu_);
234 lifeguard_ = std::make_unique<Lifeguard>(this);
235 }
236
Run(EventEngine::Closure * closure)237 void WorkStealingThreadPool::WorkStealingThreadPoolImpl::Run(
238 EventEngine::Closure* closure) {
239 CHECK(!IsQuiesced());
240 if (g_local_queue != nullptr && g_local_queue->owner() == this) {
241 g_local_queue->Add(closure);
242 } else {
243 queue_.Add(closure);
244 }
245 // Signal a worker in any case, even if work was added to a local queue. This
246 // improves performance on 32-core streaming benchmarks with small payloads.
247 work_signal_.Signal();
248 }
249
StartThread()250 void WorkStealingThreadPool::WorkStealingThreadPoolImpl::StartThread() {
251 last_started_thread_.store(
252 grpc_core::Timestamp::Now().milliseconds_after_process_epoch(),
253 std::memory_order_relaxed);
254 grpc_core::Thread(
255 "event_engine",
256 [](void* arg) {
257 ThreadState* worker = static_cast<ThreadState*>(arg);
258 worker->ThreadBody();
259 delete worker;
260 },
261 new ThreadState(shared_from_this()), nullptr,
262 grpc_core::Thread::Options().set_tracked(false).set_joinable(false))
263 .Start();
264 }
265
Quiesce()266 void WorkStealingThreadPool::WorkStealingThreadPoolImpl::Quiesce() {
267 SetShutdown(true);
268 // Wait until all threads have exited.
269 // Note that if this is a threadpool thread then we won't exit this thread
270 // until all other threads have exited, so we need to wait for just one thread
271 // running instead of zero.
272 bool is_threadpool_thread = g_local_queue != nullptr;
273 work_signal()->SignalAll();
274 auto threads_were_shut_down = living_thread_count_.BlockUntilThreadCount(
275 is_threadpool_thread ? 1 : 0, "shutting down",
276 g_log_verbose_failures ? kBlockUntilThreadCountTimeout
277 : grpc_core::Duration::Infinity());
278 if (!threads_were_shut_down.ok() && g_log_verbose_failures) {
279 DumpStacksAndCrash();
280 }
281 CHECK(queue_.Empty());
282 quiesced_.store(true, std::memory_order_relaxed);
283 grpc_core::MutexLock lock(&lifeguard_ptr_mu_);
284 lifeguard_.reset();
285 }
286
SetThrottled(bool throttled)287 bool WorkStealingThreadPool::WorkStealingThreadPoolImpl::SetThrottled(
288 bool throttled) {
289 return throttled_.exchange(throttled, std::memory_order_relaxed);
290 }
291
SetShutdown(bool is_shutdown)292 void WorkStealingThreadPool::WorkStealingThreadPoolImpl::SetShutdown(
293 bool is_shutdown) {
294 auto was_shutdown = shutdown_.exchange(is_shutdown);
295 CHECK(is_shutdown != was_shutdown);
296 work_signal_.SignalAll();
297 }
298
SetForking(bool is_forking)299 void WorkStealingThreadPool::WorkStealingThreadPoolImpl::SetForking(
300 bool is_forking) {
301 auto was_forking = forking_.exchange(is_forking);
302 CHECK(is_forking != was_forking);
303 }
304
IsForking()305 bool WorkStealingThreadPool::WorkStealingThreadPoolImpl::IsForking() {
306 return forking_.load(std::memory_order_relaxed);
307 }
308
IsShutdown()309 bool WorkStealingThreadPool::WorkStealingThreadPoolImpl::IsShutdown() {
310 return shutdown_.load(std::memory_order_relaxed);
311 }
312
IsQuiesced()313 bool WorkStealingThreadPool::WorkStealingThreadPoolImpl::IsQuiesced() {
314 return quiesced_.load(std::memory_order_relaxed);
315 }
316
PrepareFork()317 void WorkStealingThreadPool::WorkStealingThreadPoolImpl::PrepareFork() {
318 GRPC_TRACE_LOG(event_engine, INFO)
319 << "WorkStealingThreadPoolImpl::PrepareFork";
320 SetForking(true);
321 work_signal_.SignalAll();
322 auto threads_were_shut_down = living_thread_count_.BlockUntilThreadCount(
323 0, "forking", kBlockUntilThreadCountTimeout);
324 if (!threads_were_shut_down.ok() && g_log_verbose_failures) {
325 DumpStacksAndCrash();
326 }
327 grpc_core::MutexLock lock(&lifeguard_ptr_mu_);
328 lifeguard_.reset();
329 }
330
Postfork()331 void WorkStealingThreadPool::WorkStealingThreadPoolImpl::Postfork() {
332 SetForking(false);
333 Start();
334 }
335
TrackThread(gpr_thd_id tid)336 void WorkStealingThreadPool::WorkStealingThreadPoolImpl::TrackThread(
337 gpr_thd_id tid) {
338 grpc_core::MutexLock lock(&thd_set_mu_);
339 thds_.insert(tid);
340 }
341
UntrackThread(gpr_thd_id tid)342 void WorkStealingThreadPool::WorkStealingThreadPoolImpl::UntrackThread(
343 gpr_thd_id tid) {
344 grpc_core::MutexLock lock(&thd_set_mu_);
345 thds_.erase(tid);
346 }
347
DumpStacksAndCrash()348 void WorkStealingThreadPool::WorkStealingThreadPoolImpl::DumpStacksAndCrash() {
349 grpc_core::MutexLock lock(&thd_set_mu_);
350 LOG(ERROR) << "Pool did not quiesce in time, gRPC will not shut down "
351 "cleanly. Dumping all "
352 << thds_.size() << " thread stacks.";
353 for (const auto tid : thds_) {
354 grpc_core::Thread::Signal(tid, kDumpStackSignal);
355 }
356 // If this is a thread pool thread, wait for one fewer thread.
357 auto ignore_thread_count = g_local_queue != nullptr ? 1 : 0;
358 while (living_thread_count_.count() - ignore_thread_count >
359 g_reported_dump_count.load()) {
360 absl::SleepFor(absl::Milliseconds(200));
361 }
362 grpc_core::Crash(
363 "Pool did not quiesce in time, gRPC will not shut down cleanly.");
364 }
365
366 // -------- WorkStealingThreadPool::WorkStealingThreadPoolImpl::Lifeguard -----
367
Lifeguard(WorkStealingThreadPoolImpl * pool)368 WorkStealingThreadPool::WorkStealingThreadPoolImpl::Lifeguard::Lifeguard(
369 WorkStealingThreadPoolImpl* pool)
370 : pool_(pool),
371 backoff_(grpc_core::BackOff::Options()
372 .set_initial_backoff(kLifeguardMinSleepBetweenChecks)
373 .set_max_backoff(kLifeguardMaxSleepBetweenChecks)
374 .set_multiplier(1.3)),
375 lifeguard_should_shut_down_(std::make_unique<grpc_core::Notification>()),
376 lifeguard_is_shut_down_(std::make_unique<grpc_core::Notification>()) {
377 // lifeguard_running_ is set early to avoid a quiesce race while the
378 // lifeguard is still starting up.
379 lifeguard_running_.store(true);
380 grpc_core::Thread(
381 "lifeguard",
382 [](void* arg) {
383 auto* lifeguard = static_cast<Lifeguard*>(arg);
384 lifeguard->LifeguardMain();
385 },
386 this, nullptr,
387 grpc_core::Thread::Options().set_tracked(false).set_joinable(false))
388 .Start();
389 }
390
391 void WorkStealingThreadPool::WorkStealingThreadPoolImpl::Lifeguard::
LifeguardMain()392 LifeguardMain() {
393 while (true) {
394 if (pool_->IsForking()) break;
395 // If the pool is shut down, loop quickly until quiesced. Otherwise,
396 // reduce the check rate if the pool is idle.
397 if (pool_->IsShutdown()) {
398 if (pool_->IsQuiesced()) break;
399 } else {
400 lifeguard_should_shut_down_->WaitForNotificationWithTimeout(
401 absl::Milliseconds(backoff_.NextAttemptDelay().millis()));
402 }
403 MaybeStartNewThread();
404 }
405 lifeguard_running_.store(false, std::memory_order_relaxed);
406 lifeguard_is_shut_down_->Notify();
407 }
408
~Lifeguard()409 WorkStealingThreadPool::WorkStealingThreadPoolImpl::Lifeguard::~Lifeguard() {
410 lifeguard_should_shut_down_->Notify();
411 while (lifeguard_running_.load(std::memory_order_relaxed)) {
412 GRPC_LOG_EVERY_N_SEC_DELAYED_DEBUG(
413 kBlockingQuiesceLogRateSeconds, "%s",
414 "Waiting for lifeguard thread to shut down");
415 lifeguard_is_shut_down_->WaitForNotification();
416 }
417 // Do an additional wait in case this method races with LifeguardMain's
418 // shutdown. This should return immediately if the lifeguard is already shut
419 // down.
420 lifeguard_is_shut_down_->WaitForNotification();
421 backoff_.Reset();
422 lifeguard_should_shut_down_ = std::make_unique<grpc_core::Notification>();
423 lifeguard_is_shut_down_ = std::make_unique<grpc_core::Notification>();
424 }
425
426 void WorkStealingThreadPool::WorkStealingThreadPoolImpl::Lifeguard::
MaybeStartNewThread()427 MaybeStartNewThread() {
428 // No new threads are started when forking.
429 // No new work is done when forking needs to begin.
430 if (pool_->forking_.load()) return;
431 const auto living_thread_count = pool_->living_thread_count()->count();
432 // Wake an idle worker thread if there's global work to be had.
433 if (pool_->busy_thread_count()->count() < living_thread_count) {
434 if (!pool_->queue_.Empty()) {
435 pool_->work_signal()->Signal();
436 backoff_.Reset();
437 }
438 // Idle threads will eventually wake up for an attempt at work stealing.
439 return;
440 }
441 // No new threads if in the throttled state.
442 // However, all workers are busy, so the Lifeguard should be more
443 // vigilant about checking whether a new thread must be started.
444 if (grpc_core::Timestamp::Now() -
445 grpc_core::Timestamp::FromMillisecondsAfterProcessEpoch(
446 pool_->last_started_thread_) <
447 kTimeBetweenThrottledThreadStarts) {
448 backoff_.Reset();
449 return;
450 }
451 // All workers are busy and the pool is not throttled. Start a new thread.
452 // TODO(hork): new threads may spawn when there is no work in the global
453 // queue, nor any work to steal. Add more sophisticated logic about when to
454 // start a thread.
455 GRPC_TRACE_LOG(event_engine, INFO)
456 << "Starting new ThreadPool thread due to backlog (total threads: "
457 << living_thread_count + 1;
458 pool_->StartThread();
459 // Tell the lifeguard to monitor the pool more closely.
460 backoff_.Reset();
461 }
462
463 // -------- WorkStealingThreadPool::ThreadState --------
464
ThreadState(std::shared_ptr<WorkStealingThreadPoolImpl> pool)465 WorkStealingThreadPool::ThreadState::ThreadState(
466 std::shared_ptr<WorkStealingThreadPoolImpl> pool)
467 : pool_(std::move(pool)),
468 auto_thread_counter_(
469 pool_->living_thread_count()->MakeAutoThreadCounter()),
470 backoff_(grpc_core::BackOff::Options()
471 .set_initial_backoff(kWorkerThreadMinSleepBetweenChecks)
472 .set_max_backoff(kWorkerThreadMaxSleepBetweenChecks)
473 .set_multiplier(1.3)),
474 busy_count_idx_(pool_->busy_thread_count()->NextIndex()) {}
475
ThreadBody()476 void WorkStealingThreadPool::ThreadState::ThreadBody() {
477 if (g_log_verbose_failures) {
478 #ifdef GPR_POSIX_SYNC
479 std::signal(kDumpStackSignal, DumpSignalHandler);
480 #elif defined(GPR_WINDOWS)
481 signal(kDumpStackSignal, DumpSignalHandler);
482 #endif
483 pool_->TrackThread(gpr_thd_currentid());
484 }
485 g_local_queue = new BasicWorkQueue(pool_.get());
486 pool_->theft_registry()->Enroll(g_local_queue);
487 ThreadLocal::SetIsEventEngineThread(true);
488 while (Step()) {
489 // loop until the thread should no longer run
490 }
491 // cleanup
492 if (pool_->IsForking()) {
493 // TODO(hork): consider WorkQueue::AddAll(WorkQueue*)
494 EventEngine::Closure* closure;
495 while (!g_local_queue->Empty()) {
496 closure = g_local_queue->PopMostRecent();
497 if (closure != nullptr) {
498 pool_->queue()->Add(closure);
499 }
500 }
501 } else if (pool_->IsShutdown()) {
502 FinishDraining();
503 }
504 CHECK(g_local_queue->Empty());
505 pool_->theft_registry()->Unenroll(g_local_queue);
506 delete g_local_queue;
507 if (g_log_verbose_failures) {
508 pool_->UntrackThread(gpr_thd_currentid());
509 }
510 }
511
SleepIfRunning()512 void WorkStealingThreadPool::ThreadState::SleepIfRunning() {
513 if (pool_->IsForking()) return;
514 absl::SleepFor(
515 absl::Milliseconds(kTimeBetweenThrottledThreadStarts.millis()));
516 }
517
// Performs one unit of worker progress.
//
// Returns true if the caller should invoke Step() again. Returns false if the
// worker should leave its loop because:
//   - the pool is forking;
//   - the pool is shut down and no work was found; or
//   - this thread is surplus (more living threads than reserve_threads()) and
//     has idled past kIdleThreadLimit.
//
// Work sources, in order: this thread's local queue, the global queue, then
// other workers' queues via the theft registry. Each closure runs while the
// busy-thread counter holds this worker's busy_count_idx_ slot.
bool WorkStealingThreadPool::ThreadState::Step() {
  if (pool_->IsForking()) return false;
  auto* closure = g_local_queue->PopMostRecent();
  // If local work is available, run it.
  if (closure != nullptr) {
    auto busy =
        pool_->busy_thread_count()->MakeAutoThreadCounter(busy_count_idx_);
    closure->Run();
    return true;
  }
  // Thread shutdown exit condition (ignoring fork). All must be true:
  // * shutdown was called
  // * the local queue is empty
  // * the global queue is empty
  // * the steal pool returns nullptr
  bool should_run_again = false;
  // Start of this idle period; used for the kIdleThreadLimit check below.
  auto start_time = std::chrono::steady_clock::now();
  // Wait until work is available or until shut down.
  while (!pool_->IsForking()) {
    // Pull from the global queue next
    // TODO(hork): consider an empty check for performance wins. Depends on the
    // queue implementation, the BasicWorkQueue takes two locks when you do an
    // empty check then pop.
    closure = pool_->queue()->PopMostRecent();
    if (closure != nullptr) {
      should_run_again = true;
      break;
    };
    // Try stealing if the queue is empty
    closure = pool_->theft_registry()->StealOne();
    if (closure != nullptr) {
      should_run_again = true;
      break;
    }
    // No closures were retrieved from anywhere.
    // Quit the thread if the pool has been shut down.
    if (pool_->IsShutdown()) break;
    // Sleep until signaled or until the current backoff delay elapses. The
    // result is treated as "timed out" in the idle check below.
    bool timed_out =
        pool_->work_signal()->WaitWithTimeout(backoff_.NextAttemptDelay());
    if (pool_->IsForking() || pool_->IsShutdown()) break;
    // Quit a thread if the pool has more than it requires, and this thread
    // has been idle long enough.
    if (timed_out &&
        pool_->living_thread_count()->count() > pool_->reserve_threads() &&
        std::chrono::steady_clock::now() - start_time > kIdleThreadLimit) {
      return false;
    }
  }
  if (pool_->IsForking()) {
    // save the closure since we aren't going to execute it.
    // ThreadBody later moves everything on the local queue to the global queue.
    if (closure != nullptr) g_local_queue->Add(closure);
    return false;
  }
  if (closure != nullptr) {
    auto busy =
        pool_->busy_thread_count()->MakeAutoThreadCounter(busy_count_idx_);
    closure->Run();
  }
  // Work was found (or the pool shut down), so start the next idle period
  // from the shortest backoff delay.
  backoff_.Reset();
  return should_run_again;
}
579
FinishDraining()580 void WorkStealingThreadPool::ThreadState::FinishDraining() {
581 // The thread is definitionally busy while draining
582 auto busy =
583 pool_->busy_thread_count()->MakeAutoThreadCounter(busy_count_idx_);
584 // If a fork occurs at any point during shutdown, quit draining. The post-fork
585 // threads will finish draining the global queue.
586 while (!pool_->IsForking()) {
587 if (!g_local_queue->Empty()) {
588 auto* closure = g_local_queue->PopMostRecent();
589 if (closure != nullptr) {
590 closure->Run();
591 }
592 continue;
593 }
594 if (!pool_->queue()->Empty()) {
595 auto* closure = pool_->queue()->PopMostRecent();
596 if (closure != nullptr) {
597 closure->Run();
598 }
599 continue;
600 }
601 break;
602 }
603 }
604
605 // -------- WorkStealingThreadPool::WorkSignal --------
606
Signal()607 void WorkStealingThreadPool::WorkSignal::Signal() {
608 grpc_core::MutexLock lock(&mu_);
609 cv_.Signal();
610 }
611
SignalAll()612 void WorkStealingThreadPool::WorkSignal::SignalAll() {
613 grpc_core::MutexLock lock(&mu_);
614 cv_.SignalAll();
615 }
616
WaitWithTimeout(grpc_core::Duration time)617 bool WorkStealingThreadPool::WorkSignal::WaitWithTimeout(
618 grpc_core::Duration time) {
619 grpc_core::MutexLock lock(&mu_);
620 return cv_.WaitWithTimeout(&mu_, absl::Milliseconds(time.millis()));
621 }
622
623 } // namespace experimental
624 } // namespace grpc_event_engine
625