• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2022 The gRPC Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 #include "src/core/lib/event_engine/thread_pool/thread_pool.h"
15 
16 #include <grpc/grpc.h>
17 #include <grpc/support/thd_id.h>
18 
19 #include <atomic>
20 #include <chrono>
21 #include <cmath>
22 #include <cstddef>
23 #include <functional>
24 #include <memory>
25 #include <thread>
26 #include <tuple>
27 #include <vector>
28 
29 #include "absl/time/clock.h"
30 #include "absl/time/time.h"
31 #include "gtest/gtest.h"
32 #include "src/core/lib/event_engine/thread_pool/thread_count.h"
33 #include "src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.h"
34 #include "src/core/util/notification.h"
35 #include "src/core/util/thd.h"
36 #include "src/core/util/time.h"
37 #include "test/core/test_util/test_config.h"
38 
39 namespace grpc_event_engine {
40 namespace experimental {
41 
42 template <typename T>
43 class ThreadPoolTest : public testing::Test {};
44 
45 using ThreadPoolTypes = ::testing::Types<WorkStealingThreadPool>;
46 TYPED_TEST_SUITE(ThreadPoolTest, ThreadPoolTypes);
47 
// Checks that a lambda handed to Run() gets executed by the pool.
TYPED_TEST(ThreadPoolTest, CanRunAnyInvocable) {
  TypeParam pool(8);
  grpc_core::Notification closure_ran;
  pool.Run([&closure_ran]() { closure_ran.Notify(); });
  // Blocks until the closure above has been invoked.
  closure_ran.WaitForNotification();
  pool.Quiesce();
}
55 
// Checks that a closure may quiesce and delete the very pool it runs on.
TYPED_TEST(ThreadPoolTest, CanDestroyInsideClosure) {
  // Heap-allocated on purpose: ownership is handed to the closure below, which
  // deletes the pool from inside one of the pool's own callbacks.
  auto* pool = new TypeParam(8);
  grpc_core::Notification pool_destroyed;
  pool->Run([pool, &pool_destroyed]() {
    // Tearing the pool down from within a closure must not deadlock.
    pool->Quiesce();
    delete pool;
    pool_destroyed.Notify();
  });
  pool_destroyed.WaitForNotification();
}
67 
// Checks that closures scheduled around a simulated fork still run, and that
// the pool keeps accepting work after the fork handlers have been called.
TYPED_TEST(ThreadPoolTest, CanSurviveFork) {
  TypeParam pool(8);
  grpc_core::Notification nested_closure_done;
  pool.Run([&pool, &nested_closure_done]() {
    std::this_thread::sleep_for(std::chrono::seconds(1));
    // Schedule a second closure from inside the first one.
    pool.Run([&nested_closure_done]() {
      std::this_thread::sleep_for(std::chrono::seconds(1));
      nested_closure_done.Notify();
    });
  });
  // Simulate a fork and keep observing the pool as the child process would.
  pool.PrepareFork();
  pool.PostforkChild();
  nested_closure_done.WaitForNotification();
  // New work submitted after the simulated fork must run as well.
  grpc_core::Notification post_fork_closure_done;
  pool.Run([&post_fork_closure_done]() { post_fork_closure_done.Notify(); });
  post_fork_closure_done.WaitForNotification();
  pool.Quiesce();
}
87 
TYPED_TEST(ThreadPoolTest, ForkStressTest) {
  // Schedules many closures and interleaves them with simulated fork events,
  // allowing only a fixed number of closures to make progress between two
  // consecutive forks.
  //
  // Why: Python relies on fork support, and fork behaves poorly in the presence
  // of threads, but non-deterministically. gRPC has had problems in this space.
  // This test exercises a subset of the fork logic, the pieces we can control
  // without an actual OS fork.
  constexpr int target_runcount = 1000;
  constexpr int closures_per_fork{100};
  constexpr absl::Duration fork_interval{absl::Milliseconds(50)};
  TypeParam pool(8);
  std::atomic<int> runs_completed{0};
  std::atomic<int> forks_completed{0};
  std::function<void()> counted_closure;
  counted_closure = [&]() {
    const int runs_so_far = runs_completed.load(std::memory_order_relaxed);
    // Stop once enough closures have run. The relaxed loads mean the final
    // count may overshoot slightly.
    if (runs_so_far >= target_runcount) return;
    // Each completed fork unlocks another batch of closures.
    const int run_budget =
        forks_completed.load(std::memory_order_relaxed) * closures_per_fork;
    if (run_budget > runs_so_far) {
      runs_completed.fetch_add(1, std::memory_order_relaxed);
      return;
    }
    // Budget exhausted: do not count this run, try again later.
    pool.Run(counted_closure);
  };
  for (int i = 0; i < target_runcount; ++i) {
    pool.Run(counted_closure);
  }
  // Simulate forks at a fixed interval, but only once the closures have used
  // up the budget granted by the previous forks.
  int observed_runs = 0;
  while (observed_runs < target_runcount) {
    absl::SleepFor(fork_interval);
    observed_runs = runs_completed.load(std::memory_order_relaxed);
    const int observed_forks = forks_completed.load(std::memory_order_relaxed);
    if (observed_forks * closures_per_fork <= observed_runs) {
      pool.PrepareFork();
      pool.PostforkChild();
      forks_completed.fetch_add(1);
    }
  }
  ASSERT_GE(forks_completed.load(), target_runcount / closures_per_fork);
  pool.Quiesce();
}
138 
TYPED_TEST(ThreadPoolTest, StartQuiesceRaceStressTest) {
  // Repeatedly races Quiesce() against PostforkParent() on a pool that has
  // just had PrepareFork() called, to check thread safety. Which of the two
  // threads issues which call alternates with the iteration's parity.
  // NOTE(review): PostforkParent() is presumably what restarts the pool, hence
  // "Start" in the test name - confirm against the pool implementation.
  constexpr int iterations = 500;
  struct RaceState {
    std::unique_ptr<TypeParam> pool;
    int iteration;
  };
  for (int iteration = 0; iteration < iterations; ++iteration) {
    RaceState race{std::make_unique<TypeParam>(8), iteration};
    race.pool->PrepareFork();
    // Quiesces on even iterations, resumes the pool on odd ones.
    grpc_core::Thread t1(
        "t1",
        [](void* arg) {
          auto* shared = static_cast<RaceState*>(arg);
          if (shared->iteration % 2 == 0) {
            shared->pool->Quiesce();
          } else {
            shared->pool->PostforkParent();
          }
        },
        &race, nullptr,
        grpc_core::Thread::Options().set_tracked(false).set_joinable(true));
    // Mirror image of t1: quiesces on odd iterations, resumes on even ones.
    grpc_core::Thread t2(
        "t2",
        [](void* arg) {
          auto* shared = static_cast<RaceState*>(arg);
          if (shared->iteration % 2 == 1) {
            shared->pool->Quiesce();
          } else {
            shared->pool->PostforkParent();
          }
        },
        &race, nullptr,
        grpc_core::Thread::Options().set_tracked(false).set_joinable(true));
    t1.Start();
    t2.Start();
    t1.Join();
    t2.Join();
  }
}
174 
ScheduleSelf(ThreadPool * p)175 void ScheduleSelf(ThreadPool* p) {
176   p->Run([p] { ScheduleSelf(p); });
177 }
178 
ScheduleTwiceUntilZero(ThreadPool * p,std::atomic<int> & runcount,int n)179 void ScheduleTwiceUntilZero(ThreadPool* p, std::atomic<int>& runcount, int n) {
180   runcount.fetch_add(1);
181   if (n == 0) return;
182   p->Run([p, &runcount, n] {
183     ScheduleTwiceUntilZero(p, runcount, n - 1);
184     ScheduleTwiceUntilZero(p, runcount, n - 1);
185   });
186 }
187 
// Checks that the pool runs every closure of a large, recursively scheduled
// workload: by the time Quiesce() returns, the run counter is expected to match
// the exact number of ScheduleTwiceUntilZero calls.
TYPED_TEST(ThreadPoolTest, CanStartLotsOfClosures) {
  TypeParam p(8);
  std::atomic<int> runcount{0};
  // Depth of the binary recursion; every level doubles the number of calls.
  constexpr int recursion_depth = 20;
  // Depth d produces 1 + 2 + ... + 2^d = 2^(d+1) - 1 calls. This is computed
  // with integer arithmetic so that the comparison below is an exact
  // int-to-int check rather than an int-to-double one via pow().
  constexpr int expected_runcount = (1 << (recursion_depth + 1)) - 1;
  ScheduleTwiceUntilZero(&p, runcount, recursion_depth);
  p.Quiesce();
  ASSERT_EQ(runcount.load(), expected_runcount);
}
196 
// Blocks `pool_thread_count` closures submitted from the test thread, then
// submits one more closure that releases them all.
// NOTE(review): the releasing closure can presumably only run if the pool adds
// a thread while its initial workers are blocked - confirm against the pool.
TYPED_TEST(ThreadPoolTest, ScalesWhenBackloggedFromGlobalQueue) {
  constexpr int pool_thread_count = 8;
  TypeParam pool(pool_thread_count);
  grpc_core::Notification release;
  // Number of closures that have started and are about to block; used to make
  // sure the pool is saturated before the releasing closure is submitted.
  std::atomic<int> blocked_closures{0};
  std::atomic<bool> released{false};
  for (int i = 0; i < pool_thread_count; ++i) {
    pool.Run([&]() {
      blocked_closures.fetch_add(1);
      while (!released.load()) {
        release.WaitForNotification();
      }
    });
  }
  // Poll until every blocking closure has been picked up by a worker.
  while (blocked_closures.load() != pool_thread_count) {
    absl::SleepFor(absl::Milliseconds(50));
  }
  pool.Run([&]() {
    released.store(true);
    release.Notify();
  });
  pool.Quiesce();
}
221 
// Same scenario as the global-queue variant, except that all blocking closures
// and the releasing closure are submitted from inside a single pool closure
// rather than from the test thread.
TYPED_TEST(ThreadPoolTest, ScalesWhenBackloggedFromSingleThreadLocalQueue) {
  constexpr int pool_thread_count = 8;
  TypeParam pool(pool_thread_count);
  grpc_core::Notification release;
  // Number of closures that have started and are about to block; used to make
  // sure the pool is saturated before the releasing closure is submitted.
  std::atomic<int> blocked_closures{0};
  std::atomic<bool> released{false};
  pool.Run([&]() {
    for (int k = 0; k < pool_thread_count; ++k) {
      pool.Run([&]() {
        blocked_closures.fetch_add(1);
        while (!released.load()) {
          release.WaitForNotification();
        }
      });
    }
    // Poll until every blocking closure has been picked up by a worker.
    while (blocked_closures.load() != pool_thread_count) {
      absl::SleepFor(absl::Milliseconds(50));
    }
    pool.Run([&]() {
      released.store(true);
      release.Notify();
    });
  });
  pool.Quiesce();
}
248 
// Repeatedly creates a pool, submits a handful of no-op closures and quiesces
// it right away, to stress the path where Quiesce() overlaps with closures
// that were only just submitted.
TYPED_TEST(ThreadPoolTest, QuiesceRaceStressTest) {
  constexpr int thread_count = 8;
  constexpr int closures_per_pool = thread_count * 2;
  constexpr int pool_cycles = 333;
  for (int cycle = 0; cycle < pool_cycles; ++cycle) {
    TypeParam pool(thread_count);
    for (int k = 0; k < closures_per_pool; ++k) {
      pool.Run([]() {});
    }
    pool.Quiesce();
  }
}
261 
TYPED_TEST(ThreadPoolTest, WorkerThreadLocalRunWorksWithOtherPools) {
  // WorkStealingThreadPools may queue work onto a thread-local queue, and that
  // work may be stolen by other threads. This test tries to ensure that work
  // queued from a pool-A worker-thread, to pool-B, does not end up on a pool-A
  // queue.
  constexpr size_t p1_closure_count = 32;
  constexpr size_t p2_closures_per_p1_closure = 1000;
  TypeParam p1(8);
  TypeParam p2(8);
  // Thread id of the p1 worker that ran each p1 closure, indexed by closure.
  std::vector<gpr_thd_id> p1_thread_ids(p1_closure_count);
  std::atomic<size_t> p2_runs{0};
  grpc_core::Notification all_p2_closures_ran;
  for (size_t slot = 0; slot < p1_closure_count; ++slot) {
    p1.Run([&, slot,
            expected_p2_runs = p1_closure_count * p2_closures_per_p1_closure] {
      p1_thread_ids[slot] = gpr_thd_currentid();
      for (size_t k = 0; k < p2_closures_per_p1_closure; ++k) {
        p2.Run([&, slot, expected_p2_runs] {
          // A p2 closure must not run on the p1 thread that scheduled it.
          EXPECT_NE(p1_thread_ids[slot], gpr_thd_currentid());
          const size_t runs_including_this_one = p2_runs.fetch_add(1) + 1;
          if (expected_p2_runs == runs_including_this_one) {
            all_p2_closures_ran.Notify();
          }
        });
      }
    });
  }
  all_p2_closures_ran.WaitForNotification();
  p2.Quiesce();
  p1.Quiesce();
}
291 
// Disabled by default (DISABLED_ prefix). Submits eight closures that each
// sleep for 90 seconds, then calls Quiesce() after only 2 seconds.
// NOTE(review): presumably meant for manually observing the pool's stack-dump
// behaviour when Quiesce() stalls on long-running closures - confirm.
TYPED_TEST(ThreadPoolTest, DISABLED_TestDumpStack) {
  TypeParam pool(8);
  for (size_t k = 0; k < 8; ++k) {
    pool.Run([]() { absl::SleepFor(absl::Seconds(90)); });
  }
  absl::SleepFor(absl::Seconds(2));
  pool.Quiesce();
}
300 
301 class BusyThreadCountTest : public testing::Test {};
302 
TEST_F(BusyThreadCountTest, StressTest) {
  // Many worker threads concurrently increment and then decrement the counter
  // while a separate thread continuously reads the total. The sizes below were
  // tuned so that the test runs in a reasonable amount of time.
  constexpr size_t worker_count = 300;
  constexpr int rounds_per_worker = 1000;
  constexpr int steps_per_round = 50;
  BusyThreadCount busy_thread_count;
  grpc_core::Notification stop_counting;
  // Reads the total in a tight loop until the workers are done.
  std::thread reader([&]() {
    while (!stop_counting.HasBeenNotified()) {
      busy_thread_count.count();
    }
  });
  std::vector<std::thread> workers;
  workers.reserve(worker_count);
  for (size_t w = 0; w < worker_count; ++w) {
    workers.emplace_back([&]() {
      for (int round = 0; round < rounds_per_worker; ++round) {
        // Requesting a fresh index every round is not the intended use, but it
        // puts additional stress on NextIndex().
        auto idx = busy_thread_count.NextIndex();
        for (int step = 0; step < steps_per_round; ++step) {
          busy_thread_count.Increment(idx);
        }
        for (int step = 0; step < steps_per_round; ++step) {
          busy_thread_count.Decrement(idx);
        }
      }
    });
  }
  for (auto& worker : workers) worker.join();
  stop_counting.Notify();
  reader.join();
  // Every increment was paired with a decrement.
  ASSERT_EQ(busy_thread_count.count(), 0);
}
340 
TEST_F(BusyThreadCountTest, AutoCountStressTest) {
  // Many worker threads concurrently create and destroy batches of
  // AutoThreadCounter objects while a separate thread continuously reads the
  // total. The sizes below were tuned so that the test runs in a reasonable
  // amount of time.
  constexpr size_t worker_count = 150;
  constexpr int rounds_per_worker = 1000;
  constexpr int counters_per_round = 30;
  BusyThreadCount busy_thread_count;
  grpc_core::Notification stop_counting;
  // Reads the total in a tight loop until the workers are done.
  std::thread reader([&]() {
    while (!stop_counting.HasBeenNotified()) {
      busy_thread_count.count();
    }
  });
  std::vector<std::thread> workers;
  workers.reserve(worker_count);
  for (size_t w = 0; w < worker_count; ++w) {
    workers.emplace_back([&]() {
      for (int round = 0; round < rounds_per_worker; ++round) {
        // The whole batch is destroyed when this vector goes out of scope at
        // the end of the round.
        std::vector<BusyThreadCount::AutoThreadCounter> batch;
        batch.reserve(counters_per_round);
        for (int c = 0; c < counters_per_round; ++c) {
          batch.push_back(busy_thread_count.MakeAutoThreadCounter(
              busy_thread_count.NextIndex()));
        }
      }
    });
  }
  for (auto& worker : workers) worker.join();
  stop_counting.Notify();
  reader.join();
  ASSERT_EQ(busy_thread_count.count(), 0);
}
374 
375 class LivingThreadCountTest : public testing::Test {};
376 
TEST_F(LivingThreadCountTest, StressTest) {
  // Many worker threads concurrently increment and then decrement the counter
  // while a separate thread continuously reads the total. The sizes below were
  // tuned so that the test runs in a reasonable amount of time.
  constexpr size_t worker_count = 50;
  constexpr int rounds_per_worker = 1000;
  constexpr int steps_per_round = 10;
  LivingThreadCount living_thread_count;
  grpc_core::Notification stop_counting;
  // Reads the total in a tight loop until the workers are done.
  std::thread reader([&]() {
    while (!stop_counting.HasBeenNotified()) {
      living_thread_count.count();
    }
  });
  std::vector<std::thread> workers;
  workers.reserve(worker_count);
  for (size_t w = 0; w < worker_count; ++w) {
    workers.emplace_back([&]() {
      for (int round = 0; round < rounds_per_worker; ++round) {
        for (int step = 0; step < steps_per_round; ++step) {
          living_thread_count.Increment();
        }
        for (int step = 0; step < steps_per_round; ++step) {
          living_thread_count.Decrement();
        }
      }
    });
  }
  for (auto& worker : workers) worker.join();
  stop_counting.Notify();
  reader.join();
  // Every increment was paired with a decrement.
  ASSERT_EQ(living_thread_count.count(), 0);
}
413 
TEST_F(LivingThreadCountTest, AutoCountStressTest) {
  // Many worker threads concurrently create and destroy batches of
  // AutoThreadCounter objects while a separate thread continuously reads the
  // total. The sizes below were tuned so that the test runs in a reasonable
  // amount of time.
  constexpr size_t worker_count = 50;
  constexpr int rounds_per_worker = 1000;
  constexpr int counters_per_round = 10;
  LivingThreadCount living_thread_count;
  grpc_core::Notification stop_counting;
  // Reads the total in a tight loop until the workers are done.
  std::thread reader([&]() {
    while (!stop_counting.HasBeenNotified()) {
      living_thread_count.count();
    }
  });
  std::vector<std::thread> workers;
  workers.reserve(worker_count);
  for (size_t w = 0; w < worker_count; ++w) {
    workers.emplace_back([&]() {
      for (int round = 0; round < rounds_per_worker; ++round) {
        // The whole batch is destroyed when this vector goes out of scope at
        // the end of the round.
        std::vector<LivingThreadCount::AutoThreadCounter> batch;
        batch.reserve(counters_per_round);
        for (int c = 0; c < counters_per_round; ++c) {
          batch.push_back(living_thread_count.MakeAutoThreadCounter());
        }
      }
    });
  }
  for (auto& worker : workers) worker.join();
  stop_counting.Notify();
  reader.join();
  ASSERT_EQ(living_thread_count.count(), 0);
}
446 
// Exercises BlockUntilThreadCount() while a large number of threads, each
// holding an AutoThreadCounter, are released and joined by another thread.
TEST_F(LivingThreadCountTest, BlockUntilThreadCountTest) {
  constexpr size_t living_thread_total = 100;
  grpc_core::Notification release_threads;
  LivingThreadCount living_thread_count;
  std::vector<std::thread> living_threads;
  living_threads.reserve(living_thread_total);
  // Start the threads; each one holds a counter until it is released.
  for (size_t t = 0; t < living_thread_total; ++t) {
    living_threads.emplace_back([&]() {
      auto alive = living_thread_count.MakeAutoThreadCounter();
      release_threads.WaitForNotification();
    });
  }
  // Release and join from a separate thread so that the test thread is free to
  // block on the counter at the same time.
  std::thread joiner([&]() {
    release_threads.Notify();
    for (auto& living_thread : living_threads) living_thread.join();
  });
  {
    // While this counter is held, the test thread's own entry is the one that
    // is expected to remain.
    auto self_alive = living_thread_count.MakeAutoThreadCounter();
    std::ignore = living_thread_count.BlockUntilThreadCount(
        1, "block until 1 thread remains", grpc_core::Duration::Infinity());
  }
  std::ignore = living_thread_count.BlockUntilThreadCount(
      0, "block until all threads are gone", grpc_core::Duration::Infinity());
  joiner.join();
  ASSERT_EQ(living_thread_count.count(), 0);
}
475 
476 }  // namespace experimental
477 }  // namespace grpc_event_engine
478 
main(int argc,char ** argv)479 int main(int argc, char** argv) {
480   ::testing::InitGoogleTest(&argc, argv);
481   grpc::testing::TestEnvironment env(&argc, argv);
482   grpc_init();
483   auto result = RUN_ALL_TESTS();
484   grpc_shutdown();
485   return result;
486 }
487