// Copyright 2022 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "src/core/lib/event_engine/thread_pool/thread_pool.h"

#include <grpc/grpc.h>
#include <grpc/support/thd_id.h>

#include <atomic>
#include <chrono>
#include <cmath>
#include <cstddef>
#include <functional>
#include <memory>
#include <thread>
#include <tuple>
#include <vector>

#include "absl/time/clock.h"
#include "absl/time/time.h"
#include "gtest/gtest.h"
#include "src/core/lib/event_engine/thread_pool/thread_count.h"
#include "src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.h"
#include "src/core/util/notification.h"
#include "src/core/util/thd.h"
#include "src/core/util/time.h"
#include "test/core/test_util/test_config.h"

namespace grpc_event_engine {
namespace experimental {

template <typename T>
class ThreadPoolTest : public testing::Test {};

using ThreadPoolTypes = ::testing::Types<WorkStealingThreadPool>;
TYPED_TEST_SUITE(ThreadPoolTest, ThreadPoolTypes);

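// Every test below is instantiated once per pool implementation listed in
// ThreadPoolTypes; TypeParam names the concrete pool type under test, and the
// constructor argument is the pool's reserve thread count.
//
// Canonical lifecycle exercised throughout this file (sketch):
//   TypeParam pool(8);
//   pool.Run([] { /* work */ });
//   pool.Quiesce();  // drains remaining work before shutdown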
TYPED_TEST(ThreadPoolTest, CanRunAnyInvocable) {
  TypeParam p(8);
  grpc_core::Notification n;
  p.Run([&n] { n.Notify(); });
  n.WaitForNotification();
  p.Quiesce();
}

TYPED_TEST(ThreadPoolTest, CanDestroyInsideClosure) {
  auto* p = new TypeParam(8);
  grpc_core::Notification n;
  p->Run([p, &n]() mutable {
    // This should delete the thread pool and not deadlock.
    p->Quiesce();
    delete p;
    n.Notify();
  });
  n.WaitForNotification();
}

TYPED_TEST(ThreadPoolTest, CanSurviveFork) {
  TypeParam p(8);
  grpc_core::Notification inner_closure_ran;
  p.Run([&inner_closure_ran, &p] {
    std::this_thread::sleep_for(std::chrono::seconds(1));
    p.Run([&inner_closure_ran] {
      std::this_thread::sleep_for(std::chrono::seconds(1));
      inner_closure_ran.Notify();
    });
  });
  // Simulate a fork and watch the child process.
  p.PrepareFork();
  p.PostforkChild();
  inner_closure_ran.WaitForNotification();
  grpc_core::Notification n2;
  p.Run([&n2] { n2.Notify(); });
  n2.WaitForNotification();
  p.Quiesce();
}

TYPED_TEST(ThreadPoolTest, ForkStressTest) {
  // Runs a large number of closures and multiple simulated fork events,
  // ensuring that only a fixed number of closures execute between fork
  // events.
  //
  // Why: Python relies on fork support, and fork behaves poorly in the
  // presence of threads, but non-deterministically. gRPC has had problems in
  // this space. This test exercises a subset of the fork logic: the pieces we
  // can control without an actual OS fork.
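  //
  // Invariant under test: closures throttle themselves by rescheduling
  // (without counting) whenever runcount has caught up to
  // fork_count * num_closures_between_forks, so progress requires the main
  // thread to keep simulating fork events.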
  constexpr int expected_runcount = 1000;
  constexpr absl::Duration fork_frequency{absl::Milliseconds(50)};
  constexpr int num_closures_between_forks{100};
  TypeParam pool(8);
  std::atomic<int> runcount{0};
  std::atomic<int> fork_count{0};
  std::function<void()> inner_fn;
  inner_fn = [&]() {
    auto curr_runcount = runcount.load(std::memory_order_relaxed);
    // Exit when the right number of closures have run, with some flex for
    // relaxed atomics.
    if (curr_runcount >= expected_runcount) return;
    if (fork_count.load(std::memory_order_relaxed) *
            num_closures_between_forks <=
        curr_runcount) {
      // Skip incrementing, and schedule again.
      pool.Run(inner_fn);
      return;
    }
    runcount.fetch_add(1, std::memory_order_relaxed);
  };
  for (auto i = 0; i < expected_runcount; i++) {
    pool.Run(inner_fn);
  }
  // Simulate multiple forks at a fixed frequency.
  int curr_runcount = 0;
  while (curr_runcount < expected_runcount) {
    absl::SleepFor(fork_frequency);
    curr_runcount = runcount.load(std::memory_order_relaxed);
    int curr_forkcount = fork_count.load(std::memory_order_relaxed);
    if (curr_forkcount * num_closures_between_forks > curr_runcount) {
      continue;
    }
    pool.PrepareFork();
    pool.PostforkChild();
    fork_count.fetch_add(1);
  }
  ASSERT_GE(fork_count.load(), expected_runcount / num_closures_between_forks);
  // Owners are the local pool, and the copy inside `inner_fn`.
  pool.Quiesce();
}

TYPED_TEST(ThreadPoolTest, StartQuiesceRaceStressTest) {
  // Repeatedly race Quiesce and pool restart (PostforkParent) against each
  // other to ensure thread safety.
  constexpr int iter_count = 500;
  struct ThdState {
    std::unique_ptr<TypeParam> pool;
    int i;
  };
  for (auto i = 0; i < iter_count; i++) {
    ThdState state{std::make_unique<TypeParam>(8), i};
    state.pool->PrepareFork();
    grpc_core::Thread t1(
        "t1",
        [](void* arg) {
          ThdState* state = static_cast<ThdState*>(arg);
          state->i % 2 == 0 ? state->pool->Quiesce()
                            : state->pool->PostforkParent();
        },
        &state, nullptr,
        grpc_core::Thread::Options().set_tracked(false).set_joinable(true));
    grpc_core::Thread t2(
        "t2",
        [](void* arg) {
          ThdState* state = static_cast<ThdState*>(arg);
          state->i % 2 == 1 ? state->pool->Quiesce()
                            : state->pool->PostforkParent();
        },
        &state, nullptr,
        grpc_core::Thread::Options().set_tracked(false).set_joinable(true));
    t1.Start();
    t2.Start();
    t1.Join();
    t2.Join();
  }
}

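// Schedules a closure that re-schedules itself indefinitely, so the pool
// never drains on its own.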
void ScheduleSelf(ThreadPool* p) {
  p->Run([p] { ScheduleSelf(p); });
}

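// Recursively schedules two copies of itself until n hits zero. A call with
// depth n runs 2^(n+1) - 1 closures in total (a complete binary tree),
// matching the assertion in CanStartLotsOfClosures below.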
void ScheduleTwiceUntilZero(ThreadPool* p, std::atomic<int>& runcount, int n) {
  runcount.fetch_add(1);
  if (n == 0) return;
  p->Run([p, &runcount, n] {
    ScheduleTwiceUntilZero(p, runcount, n - 1);
    ScheduleTwiceUntilZero(p, runcount, n - 1);
  });
}

TYPED_TEST(ThreadPoolTest, CanStartLotsOfClosures) {
  TypeParam p(8);
  std::atomic<int> runcount{0};
  int branch_factor = 20;
  ScheduleTwiceUntilZero(&p, runcount, branch_factor);
  p.Quiesce();
  ASSERT_EQ(runcount.load(), pow(2, branch_factor + 1) - 1);
}

TYPED_TEST(ThreadPoolTest, ScalesWhenBackloggedFromGlobalQueue) {
  int pool_thread_count = 8;
  TypeParam p(pool_thread_count);
  grpc_core::Notification signal;
  // Ensures the pool is saturated before signaling closures to continue.
  std::atomic<int> waiters{0};
  std::atomic<bool> signaled{false};
  for (auto i = 0; i < pool_thread_count; i++) {
    p.Run([&]() {
      waiters.fetch_add(1);
      while (!signaled.load()) {
        signal.WaitForNotification();
      }
    });
  }
  while (waiters.load() != pool_thread_count) {
    absl::SleepFor(absl::Milliseconds(50));
  }
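  // All original pool threads are now blocked; this closure can only execute
  // if the pool scales up.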
  p.Run([&]() {
    signaled.store(true);
    signal.Notify();
  });
  p.Quiesce();
}

TYPED_TEST(ThreadPoolTest, ScalesWhenBackloggedFromSingleThreadLocalQueue) {
  constexpr int pool_thread_count = 8;
  TypeParam p(pool_thread_count);
  grpc_core::Notification signal;
  // Ensures the pool is saturated before signaling closures to continue.
  std::atomic<int> waiters{0};
  std::atomic<bool> signaled{false};
  p.Run([&]() {
    for (int i = 0; i < pool_thread_count; i++) {
      p.Run([&]() {
        waiters.fetch_add(1);
        while (!signaled.load()) {
          signal.WaitForNotification();
        }
      });
    }
    while (waiters.load() != pool_thread_count) {
      absl::SleepFor(absl::Milliseconds(50));
    }
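    // This worker plus the blocked waiters occupy every original thread, so
    // the pool must have scaled for the final closure to run.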
    p.Run([&]() {
      signaled.store(true);
      signal.Notify();
    });
  });
  p.Quiesce();
}

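// Races a burst of Run calls against an immediate Quiesce, many times over,
// to catch shutdown-ordering bugs.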
TYPED_TEST(ThreadPoolTest, QuiesceRaceStressTest) {
  constexpr int cycle_count = 333;
  constexpr int thread_count = 8;
  constexpr int run_count = thread_count * 2;
  for (auto i = 0; i < cycle_count; i++) {
    TypeParam p(thread_count);
    for (auto j = 0; j < run_count; j++) {
      p.Run([]() {});
    }
    p.Quiesce();
  }
}

TYPED_TEST(ThreadPoolTest, WorkerThreadLocalRunWorksWithOtherPools) {
  // WorkStealingThreadPools may queue work onto a thread-local queue, and
  // that work may be stolen by other threads. This test tries to ensure that
  // work queued from a pool-A worker thread onto pool-B does not end up on a
  // pool-A queue.
  constexpr size_t p1_run_iterations = 32;
  constexpr size_t p2_run_iterations = 1000;
  TypeParam p1(8);
  TypeParam p2(8);
  std::vector<gpr_thd_id> tid(p1_run_iterations);
  std::atomic<size_t> iter_count{0};
  grpc_core::Notification finished_all_iterations;
  for (size_t p1_i = 0; p1_i < p1_run_iterations; p1_i++) {
    p1.Run([&, p1_i,
            total_iterations = p1_run_iterations * p2_run_iterations] {
      tid[p1_i] = gpr_thd_currentid();
      for (size_t p2_i = 0; p2_i < p2_run_iterations; p2_i++) {
        p2.Run([&, p1_i, total_iterations] {
          EXPECT_NE(tid[p1_i], gpr_thd_currentid());
          if (total_iterations == iter_count.fetch_add(1) + 1) {
            finished_all_iterations.Notify();
          }
        });
      }
    });
  }
  finished_all_iterations.WaitForNotification();
  p2.Quiesce();
  p1.Quiesce();
}

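// gtest skips DISABLED_ tests by default; run this manually (e.g. with
// --gtest_also_run_disabled_tests) to keep pool threads busy long enough to
// exercise stack dumping.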
TYPED_TEST(ThreadPoolTest, DISABLED_TestDumpStack) {
  TypeParam p1(8);
  for (size_t i = 0; i < 8; i++) {
    p1.Run([]() { absl::SleepFor(absl::Seconds(90)); });
  }
  absl::SleepFor(absl::Seconds(2));
  p1.Quiesce();
}

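// BusyThreadCount tracks how many pool threads are currently working. It is
// sharded across indices (see NextIndex) to reduce contention; count() sums
// the shards.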
class BusyThreadCountTest : public testing::Test {};

TEST_F(BusyThreadCountTest, StressTest) {
  // Spawns a large number of threads that concurrently increment and
  // decrement the counters while another thread requests count totals. Magic
  // numbers were tuned for the test to run in a reasonable amount of time.
  constexpr size_t thread_count = 300;
  constexpr int run_count = 1000;
  constexpr int increment_by = 50;
  BusyThreadCount busy_thread_count;
  grpc_core::Notification stop_counting;
  std::thread counter_thread([&]() {
    while (!stop_counting.HasBeenNotified()) {
      busy_thread_count.count();
    }
  });
  std::vector<std::thread> threads;
  threads.reserve(thread_count);
  for (size_t i = 0; i < thread_count; i++) {
    threads.emplace_back([&]() {
      for (int j = 0; j < run_count; j++) {
        // Get a new index for every iteration. This is not the intended use,
        // but it further stress tests the NextIndex function.
        auto thread_idx = busy_thread_count.NextIndex();
        for (int inc = 0; inc < increment_by; inc++) {
          busy_thread_count.Increment(thread_idx);
        }
        for (int inc = 0; inc < increment_by; inc++) {
          busy_thread_count.Decrement(thread_idx);
        }
      }
    });
  }
  for (auto& thd : threads) thd.join();
  stop_counting.Notify();
  counter_thread.join();
  ASSERT_EQ(busy_thread_count.count(), 0);
}

TEST_F(BusyThreadCountTest, AutoCountStressTest) {
  // Spawns a large number of threads that concurrently increment and
  // decrement the counters while another thread requests count totals. Magic
  // numbers were tuned for the test to run in a reasonable amount of time.
  constexpr size_t thread_count = 150;
  constexpr int run_count = 1000;
  constexpr int increment_by = 30;
  BusyThreadCount busy_thread_count;
  grpc_core::Notification stop_counting;
  std::thread counter_thread([&]() {
    while (!stop_counting.HasBeenNotified()) {
      busy_thread_count.count();
    }
  });
  std::vector<std::thread> threads;
  threads.reserve(thread_count);
  for (size_t i = 0; i < thread_count; i++) {
    threads.emplace_back([&]() {
      for (int j = 0; j < run_count; j++) {
        std::vector<BusyThreadCount::AutoThreadCounter> auto_counters;
        auto_counters.reserve(increment_by);
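        // Each AutoThreadCounter increments on construction and decrements
        // when destroyed, so the whole batch is released as the vector goes
        // out of scope at the end of this iteration.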
        for (int ctr_count = 0; ctr_count < increment_by; ctr_count++) {
          auto_counters.push_back(busy_thread_count.MakeAutoThreadCounter(
              busy_thread_count.NextIndex()));
        }
      }
    });
  }
  for (auto& thd : threads) thd.join();
  stop_counting.Notify();
  counter_thread.join();
  ASSERT_EQ(busy_thread_count.count(), 0);
}

class LivingThreadCountTest : public testing::Test {};

TEST_F(LivingThreadCountTest, StressTest) {
  // Spawns a large number of threads that concurrently increment and
  // decrement the counter while another thread requests count totals. Magic
  // numbers were tuned for the test to run in a reasonable amount of time.
  constexpr size_t thread_count = 50;
  constexpr int run_count = 1000;
  constexpr int increment_by = 10;
  LivingThreadCount living_thread_count;
  grpc_core::Notification stop_counting;
  std::thread counter_thread([&]() {
    while (!stop_counting.HasBeenNotified()) {
      living_thread_count.count();
    }
  });
  std::vector<std::thread> threads;
  threads.reserve(thread_count);
  for (size_t i = 0; i < thread_count; i++) {
    threads.emplace_back([&]() {
      for (int j = 0; j < run_count; j++) {
        for (int inc = 0; inc < increment_by; inc++) {
          living_thread_count.Increment();
        }
        for (int inc = 0; inc < increment_by; inc++) {
          living_thread_count.Decrement();
        }
      }
    });
  }
  for (auto& thd : threads) thd.join();
  stop_counting.Notify();
  counter_thread.join();
  ASSERT_EQ(living_thread_count.count(), 0);
}

TEST_F(LivingThreadCountTest, AutoCountStressTest) {
  // Spawns a large number of threads that concurrently increment and
  // decrement the counter while another thread requests count totals. Magic
  // numbers were tuned for the test to run in a reasonable amount of time.
  constexpr size_t thread_count = 50;
  constexpr int run_count = 1000;
  constexpr int increment_by = 10;
  LivingThreadCount living_thread_count;
  grpc_core::Notification stop_counting;
  std::thread counter_thread([&]() {
    while (!stop_counting.HasBeenNotified()) {
      living_thread_count.count();
    }
  });
  std::vector<std::thread> threads;
  threads.reserve(thread_count);
  for (size_t i = 0; i < thread_count; i++) {
    threads.emplace_back([&]() {
      for (int j = 0; j < run_count; j++) {
        std::vector<LivingThreadCount::AutoThreadCounter> auto_counters;
        auto_counters.reserve(increment_by);
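        // As above: each counter is released when the vector is destroyed at
        // the end of this iteration.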
        for (int ctr_count = 0; ctr_count < increment_by; ctr_count++) {
          auto_counters.push_back(living_thread_count.MakeAutoThreadCounter());
        }
      }
    });
  }
  for (auto& thd : threads) thd.join();
  stop_counting.Notify();
  counter_thread.join();
  ASSERT_EQ(living_thread_count.count(), 0);
}

TEST_F(LivingThreadCountTest, BlockUntilThreadCountTest) {
  constexpr size_t thread_count = 100;
  grpc_core::Notification waiting;
  LivingThreadCount living_thread_count;
  std::vector<std::thread> threads;
  threads.reserve(thread_count);
  // Start N living threads.
  for (size_t i = 0; i < thread_count; i++) {
    threads.emplace_back([&]() {
      auto alive = living_thread_count.MakeAutoThreadCounter();
      waiting.WaitForNotification();
    });
  }
  // Join them in a separate thread.
  std::thread joiner([&]() {
    waiting.Notify();
    for (auto& thd : threads) thd.join();
  });
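  // Hold our own living-thread counter; BlockUntilThreadCount(1, ...) returns
  // once it is the only counter left, and releasing it at the end of the
  // scope lets the count drop to zero.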
  {
    auto alive = living_thread_count.MakeAutoThreadCounter();
    std::ignore = living_thread_count.BlockUntilThreadCount(
        1, "block until 1 thread remains", grpc_core::Duration::Infinity());
  }
  std::ignore = living_thread_count.BlockUntilThreadCount(
      0, "block until all threads are gone", grpc_core::Duration::Infinity());
  joiner.join();
  ASSERT_EQ(living_thread_count.count(), 0);
}

}  // namespace experimental
}  // namespace grpc_event_engine

int main(int argc, char** argv) {
  ::testing::InitGoogleTest(&argc, argv);
  grpc::testing::TestEnvironment env(&argc, argv);
  grpc_init();
  auto result = RUN_ALL_TESTS();
  grpc_shutdown();
  return result;
}