• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2019 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include "src/core/util/work_serializer.h"
20 
21 #include <grpc/grpc.h>
22 #include <grpc/support/sync.h>
23 #include <grpc/support/time.h>
24 #include <stddef.h>
25 
26 #include <memory>
27 #include <thread>
28 #include <utility>
29 #include <vector>
30 
31 #include "absl/functional/any_invocable.h"
32 #include "absl/synchronization/barrier.h"
33 #include "absl/time/clock.h"
34 #include "absl/time/time.h"
35 #include "gtest/gtest.h"
36 #include "src/core/lib/event_engine/default_event_engine.h"
37 #include "src/core/lib/experiments/experiments.h"
38 #include "src/core/lib/iomgr/exec_ctx.h"
39 #include "src/core/telemetry/histogram_view.h"
40 #include "src/core/telemetry/stats.h"
41 #include "src/core/telemetry/stats_data.h"
42 #include "src/core/util/notification.h"
43 #include "src/core/util/thd.h"
44 #include "test/core/event_engine/event_engine_test_utils.h"
45 #include "test/core/test_util/test_config.h"
46 
47 using grpc_event_engine::experimental::GetDefaultEventEngine;
48 using grpc_event_engine::experimental::WaitForSingleOwner;
49 
50 namespace grpc_core {
51 namespace {
// Creates a WorkSerializer on the default event engine and destroys it without
// ever submitting work, then waits on WaitForSingleOwner() for the engine.
TEST(WorkSerializerTest, NoOp) {
  {
    // The serializer is destroyed when this scope closes.
    auto serializer =
        std::make_unique<WorkSerializer>(GetDefaultEventEngine());
  }
  WaitForSingleOwner(GetDefaultEventEngine());
}
57 
// Submits a single callback through Run() and checks that it signals its event
// within five seconds.
TEST(WorkSerializerTest, ExecuteOneRun) {
  auto serializer = std::make_unique<WorkSerializer>(GetDefaultEventEngine());
  gpr_event callback_ran;
  gpr_event_init(&callback_ran);
  // The callback only flips the event to a non-null value.
  auto signal_done = [&callback_ran]() {
    gpr_event_set(&callback_ran, reinterpret_cast<void*>(1));
  };
  serializer->Run(std::move(signal_done), DEBUG_LOCATION);
  void* const observed =
      gpr_event_wait(&callback_ran, grpc_timeout_seconds_to_deadline(5));
  EXPECT_NE(observed, nullptr);
  serializer.reset();
  WaitForSingleOwner(GetDefaultEventEngine());
}
69 
// Queues a single callback with Schedule(), triggers it with DrainQueue(), and
// checks that it signals its event within five seconds.
TEST(WorkSerializerTest, ExecuteOneScheduleAndDrain) {
  auto serializer = std::make_unique<WorkSerializer>(GetDefaultEventEngine());
  gpr_event callback_ran;
  gpr_event_init(&callback_ran);
  auto signal_once = [&callback_ran]() {
    // The event must still be unset when the callback starts.
    EXPECT_EQ(gpr_event_get(&callback_ran), nullptr);
    gpr_event_set(&callback_ran, reinterpret_cast<void*>(1));
  };
  serializer->Schedule(std::move(signal_once), DEBUG_LOCATION);
  serializer->DrainQueue();
  void* const observed =
      gpr_event_wait(&callback_ran, grpc_timeout_seconds_to_deadline(5));
  EXPECT_NE(observed, nullptr);
  serializer.reset();
  WaitForSingleOwner(GetDefaultEventEngine());
}
86 
87 class TestThread {
88  public:
TestThread(WorkSerializer * lock)89   explicit TestThread(WorkSerializer* lock)
90       : lock_(lock), thread_("grpc_execute_many", ExecuteManyLoop, this) {
91     gpr_event_init(&done_);
92     thread_.Start();
93   }
94 
~TestThread()95   ~TestThread() {
96     EXPECT_NE(gpr_event_wait(&done_, gpr_inf_future(GPR_CLOCK_REALTIME)),
97               nullptr);
98     thread_.Join();
99   }
100 
101  private:
ExecuteManyLoop(void * arg)102   static void ExecuteManyLoop(void* arg) {
103     TestThread* self = static_cast<TestThread*>(arg);
104     size_t n = 1;
105     for (size_t i = 0; i < 10; i++) {
106       for (size_t j = 0; j < 10000; j++) {
107         struct ExecutionArgs {
108           size_t* counter;
109           size_t value;
110         };
111         ExecutionArgs* c = new ExecutionArgs;
112         c->counter = &self->counter_;
113         c->value = n++;
114         self->lock_->Run(
115             [c]() {
116               EXPECT_TRUE(*c->counter == c->value - 1);
117               *c->counter = c->value;
118               delete c;
119             },
120             DEBUG_LOCATION);
121       }
122       // sleep for a little bit, to test other threads picking up the load
123       gpr_sleep_until(grpc_timeout_milliseconds_to_deadline(100));
124     }
125     self->lock_->Run(
126         [self]() { gpr_event_set(&self->done_, reinterpret_cast<void*>(1)); },
127         DEBUG_LOCATION);
128   }
129 
130   WorkSerializer* lock_ = nullptr;
131   Thread thread_;
132   size_t counter_ = 0;
133   gpr_event done_;
134 };
135 
// Runs ten TestThread workers concurrently against one WorkSerializer.
TEST(WorkSerializerTest, ExecuteMany) {
  constexpr size_t kNumThreads = 10;
  auto serializer = std::make_unique<WorkSerializer>(GetDefaultEventEngine());
  {
    std::vector<std::unique_ptr<TestThread>> workers;
    for (size_t started = 0; started != kNumThreads; ++started) {
      workers.push_back(std::make_unique<TestThread>(serializer.get()));
    }
    // Leaving this scope destroys every worker; each destructor waits for its
    // thread to finish before the serializer is released below.
  }
  serializer.reset();
  WaitForSingleOwner(GetDefaultEventEngine());
}
147 
148 class TestThreadScheduleAndDrain {
149  public:
TestThreadScheduleAndDrain(WorkSerializer * lock)150   explicit TestThreadScheduleAndDrain(WorkSerializer* lock)
151       : lock_(lock), thread_("grpc_execute_many", ExecuteManyLoop, this) {
152     gpr_event_init(&done_);
153     thread_.Start();
154   }
155 
~TestThreadScheduleAndDrain()156   ~TestThreadScheduleAndDrain() {
157     EXPECT_NE(gpr_event_wait(&done_, gpr_inf_future(GPR_CLOCK_REALTIME)),
158               nullptr);
159     thread_.Join();
160   }
161 
162  private:
ExecuteManyLoop(void * arg)163   static void ExecuteManyLoop(void* arg) {
164     TestThreadScheduleAndDrain* self =
165         static_cast<TestThreadScheduleAndDrain*>(arg);
166     size_t n = 1;
167     for (size_t i = 0; i < 10; i++) {
168       for (size_t j = 0; j < 10000; j++) {
169         struct ExecutionArgs {
170           size_t* counter;
171           size_t value;
172         };
173         ExecutionArgs* c = new ExecutionArgs;
174         c->counter = &self->counter_;
175         c->value = n++;
176         self->lock_->Schedule(
177             [c]() {
178               EXPECT_TRUE(*c->counter == c->value - 1);
179               *c->counter = c->value;
180               delete c;
181             },
182             DEBUG_LOCATION);
183       }
184       self->lock_->DrainQueue();
185       // sleep for a little bit, to test other threads picking up the load
186       gpr_sleep_until(grpc_timeout_milliseconds_to_deadline(100));
187     }
188     self->lock_->Run(
189         [self]() { gpr_event_set(&self->done_, reinterpret_cast<void*>(1)); },
190         DEBUG_LOCATION);
191   }
192 
193   WorkSerializer* lock_ = nullptr;
194   Thread thread_;
195   size_t counter_ = 0;
196   gpr_event done_;
197 };
198 
// Runs ten TestThreadScheduleAndDrain workers concurrently against one
// WorkSerializer.
TEST(WorkSerializerTest, ExecuteManyScheduleAndDrain) {
  constexpr size_t kNumThreads = 10;
  auto serializer = std::make_unique<WorkSerializer>(GetDefaultEventEngine());
  {
    std::vector<std::unique_ptr<TestThreadScheduleAndDrain>> workers;
    for (size_t started = 0; started != kNumThreads; ++started) {
      workers.push_back(
          std::make_unique<TestThreadScheduleAndDrain>(serializer.get()));
    }
    // Leaving this scope destroys every worker; each destructor waits for its
    // thread to finish before the serializer is released below.
  }
  serializer.reset();
  WaitForSingleOwner(GetDefaultEventEngine());
}
211 
// Runs ten Run()-based workers and ten Schedule()/DrainQueue()-based workers
// concurrently against one WorkSerializer.
TEST(WorkSerializerTest, ExecuteManyMixedRunScheduleAndDrain) {
  constexpr size_t kNumPairs = 10;
  auto serializer = std::make_unique<WorkSerializer>(GetDefaultEventEngine());
  {
    std::vector<std::unique_ptr<TestThread>> run_workers;
    std::vector<std::unique_ptr<TestThreadScheduleAndDrain>> schedule_workers;
    // Start the two kinds of worker alternately.
    for (size_t pair = 0; pair != kNumPairs; ++pair) {
      run_workers.push_back(std::make_unique<TestThread>(serializer.get()));
      schedule_workers.push_back(
          std::make_unique<TestThreadScheduleAndDrain>(serializer.get()));
    }
    // Leaving this scope destroys every worker; each destructor waits for its
    // thread to finish before the serializer is released below.
  }
  serializer.reset();
  WaitForSingleOwner(GetDefaultEventEngine());
}
226 
227 // Tests that work serializers allow destruction from the last callback
TEST(WorkSerializerTest,CallbackDestroysWorkSerializer)228 TEST(WorkSerializerTest, CallbackDestroysWorkSerializer) {
229   auto lock = std::make_shared<WorkSerializer>(GetDefaultEventEngine());
230   lock->Run([&]() { lock.reset(); }, DEBUG_LOCATION);
231   WaitForSingleOwner(GetDefaultEventEngine());
232 }
233 
234 // Tests additional racy conditions when the last callback triggers work
235 // serializer destruction.
TEST(WorkSerializerTest,WorkSerializerDestructionRace)236 TEST(WorkSerializerTest, WorkSerializerDestructionRace) {
237   for (int i = 0; i < 1000; ++i) {
238     auto lock = std::make_shared<WorkSerializer>(GetDefaultEventEngine());
239     Notification notification;
240     std::thread t1([&]() {
241       notification.WaitForNotification();
242       lock.reset();
243     });
244     lock->Run([&]() { notification.Notify(); }, DEBUG_LOCATION);
245     t1.join();
246   }
247   WaitForSingleOwner(GetDefaultEventEngine());
248 }
249 
250 // Tests racy conditions when the last callback triggers work
251 // serializer destruction.
TEST(WorkSerializerTest,WorkSerializerDestructionRaceMultipleThreads)252 TEST(WorkSerializerTest, WorkSerializerDestructionRaceMultipleThreads) {
253   auto lock = std::make_shared<WorkSerializer>(GetDefaultEventEngine());
254   absl::Barrier barrier(11);
255   std::vector<std::thread> threads;
256   threads.reserve(10);
257   for (int i = 0; i < 10; ++i) {
258     threads.emplace_back([lock, &barrier]() mutable {
259       barrier.Block();
260       lock->Run([lock]() mutable { lock.reset(); }, DEBUG_LOCATION);
261     });
262   }
263   barrier.Block();
264   lock.reset();
265   for (auto& thread : threads) {
266     thread.join();
267   }
268   WaitForSingleOwner(GetDefaultEventEngine());
269 }
270 
TEST(WorkSerializerTest,MetricsWork)271 TEST(WorkSerializerTest, MetricsWork) {
272   if (!IsWorkSerializerDispatchEnabled()) {
273     GTEST_SKIP() << "Work serializer dispatch experiment not enabled";
274   }
275 
276   auto serializer = std::make_unique<WorkSerializer>(GetDefaultEventEngine());
277   auto schedule_sleep = [&serializer](absl::Duration how_long) {
278     ExecCtx exec_ctx;
279     auto n = std::make_shared<Notification>();
280     serializer->Run(
281         [how_long, n]() {
282           absl::SleepFor(how_long);
283           n->Notify();
284         },
285         DEBUG_LOCATION);
286     return n;
287   };
288   auto before = global_stats().Collect();
289   auto stats_diff_from = [&before](absl::AnyInvocable<void()> f) {
290     f();
291     // Insert a pause for the work serialier to update the stats. Reading stats
292     // here can still race with the work serializer's update attempt.
293     gpr_sleep_until(grpc_timeout_seconds_to_deadline(1));
294     auto after = global_stats().Collect();
295     auto diff = after->Diff(*before);
296     before = std::move(after);
297     return diff;
298   };
299   // Test adding one work item to the queue
300   auto diff = stats_diff_from(
301       [&] { schedule_sleep(absl::Seconds(1))->WaitForNotification(); });
302   EXPECT_EQ(diff->work_serializer_items_enqueued, 1);
303   EXPECT_EQ(diff->work_serializer_items_dequeued, 1);
304   EXPECT_GE(diff->histogram(GlobalStats::Histogram::kWorkSerializerItemsPerRun)
305                 .Percentile(0.5),
306             1.0);
307   EXPECT_LE(diff->histogram(GlobalStats::Histogram::kWorkSerializerItemsPerRun)
308                 .Percentile(0.5),
309             2.0);
310   EXPECT_GE(diff->histogram(GlobalStats::Histogram::kWorkSerializerRunTimeMs)
311                 .Percentile(0.5),
312             800.0);
313   EXPECT_LE(diff->histogram(GlobalStats::Histogram::kWorkSerializerRunTimeMs)
314                 .Percentile(0.5),
315             1300.0);
316   EXPECT_GE(diff->histogram(GlobalStats::Histogram::kWorkSerializerWorkTimeMs)
317                 .Percentile(0.5),
318             800.0);
319   EXPECT_LE(diff->histogram(GlobalStats::Histogram::kWorkSerializerWorkTimeMs)
320                 .Percentile(0.5),
321             1300.0);
322   EXPECT_GE(
323       diff->histogram(GlobalStats::Histogram::kWorkSerializerWorkTimePerItemMs)
324           .Percentile(0.5),
325       800.0);
326   EXPECT_LE(
327       diff->histogram(GlobalStats::Histogram::kWorkSerializerWorkTimePerItemMs)
328           .Percentile(0.5),
329       1300.0);
330   EXPECT_LE(diff->histogram(GlobalStats::Histogram::kWorkSerializerRunTimeMs)
331                 .Percentile(0.5),
332             diff->histogram(GlobalStats::Histogram::kWorkSerializerWorkTimeMs)
333                 .Percentile(0.5));
334   // Now throw a bunch of work in and see that we get good results
335   diff = stats_diff_from([&] {
336     for (int i = 0; i < 10; i++) {
337       schedule_sleep(absl::Milliseconds(1000));
338     }
339     schedule_sleep(absl::Milliseconds(1000))->WaitForNotification();
340   });
341   EXPECT_EQ(diff->work_serializer_items_enqueued, 11);
342   EXPECT_EQ(diff->work_serializer_items_dequeued, 11);
343   EXPECT_GE(diff->histogram(GlobalStats::Histogram::kWorkSerializerItemsPerRun)
344                 .Percentile(0.5),
345             7.0);
346   EXPECT_LE(diff->histogram(GlobalStats::Histogram::kWorkSerializerItemsPerRun)
347                 .Percentile(0.5),
348             15.0);
349   EXPECT_GE(diff->histogram(GlobalStats::Histogram::kWorkSerializerRunTimeMs)
350                 .Percentile(0.5),
351             7000.0);
352   EXPECT_LE(diff->histogram(GlobalStats::Histogram::kWorkSerializerRunTimeMs)
353                 .Percentile(0.5),
354             15000.0);
355   EXPECT_GE(diff->histogram(GlobalStats::Histogram::kWorkSerializerWorkTimeMs)
356                 .Percentile(0.5),
357             7000.0);
358   EXPECT_LE(diff->histogram(GlobalStats::Histogram::kWorkSerializerWorkTimeMs)
359                 .Percentile(0.5),
360             15000.0);
361   EXPECT_GE(
362       diff->histogram(GlobalStats::Histogram::kWorkSerializerWorkTimePerItemMs)
363           .Percentile(0.5),
364       800.0);
365   EXPECT_LE(
366       diff->histogram(GlobalStats::Histogram::kWorkSerializerWorkTimePerItemMs)
367           .Percentile(0.5),
368       1300.0);
369   EXPECT_LE(diff->histogram(GlobalStats::Histogram::kWorkSerializerRunTimeMs)
370                 .Percentile(0.5),
371             diff->histogram(GlobalStats::Histogram::kWorkSerializerWorkTimeMs)
372                 .Percentile(0.5));
373 
374   serializer.reset();
375   WaitForSingleOwner(GetDefaultEventEngine());
376 }
377 
378 #ifndef NDEBUG
// Checks what RunningInWorkSerializer() reports for two serializers, both
// outside any callback and inside callbacks that hop from one serializer to
// the other.
TEST(WorkSerializerTest, RunningInWorkSerializer) {
  auto work_serializer1 =
      std::make_shared<WorkSerializer>(GetDefaultEventEngine());
  auto work_serializer2 =
      std::make_shared<WorkSerializer>(GetDefaultEventEngine());
  // Asserts that the calling (test) thread is inside neither serializer.
  auto expect_outside_both = [&work_serializer1, &work_serializer2]() {
    EXPECT_FALSE(work_serializer1->RunningInWorkSerializer());
    EXPECT_FALSE(work_serializer2->RunningInWorkSerializer());
  };

  expect_outside_both();
  // Serializer 1 callback that submits a nested callback to serializer 2.
  work_serializer1->Run(
      [work_serializer1, work_serializer2]() {
        EXPECT_TRUE(work_serializer1->RunningInWorkSerializer());
        EXPECT_FALSE(work_serializer2->RunningInWorkSerializer());
        work_serializer2->Run(
            [work_serializer1, work_serializer2]() {
              // Expected to still report serializer 1 only when the dispatch
              // experiment is disabled.
              EXPECT_EQ(work_serializer1->RunningInWorkSerializer(),
                        !IsWorkSerializerDispatchEnabled());
              EXPECT_TRUE(work_serializer2->RunningInWorkSerializer());
            },
            DEBUG_LOCATION);
      },
      DEBUG_LOCATION);
  expect_outside_both();
  // The mirror image: serializer 2 callback nesting into serializer 1.
  work_serializer2->Run(
      [work_serializer1, work_serializer2]() {
        EXPECT_FALSE(work_serializer1->RunningInWorkSerializer());
        EXPECT_TRUE(work_serializer2->RunningInWorkSerializer());
        work_serializer1->Run(
            [work_serializer1, work_serializer2]() {
              EXPECT_TRUE(work_serializer1->RunningInWorkSerializer());
              // Expected to still report serializer 2 only when the dispatch
              // experiment is disabled.
              EXPECT_EQ(work_serializer2->RunningInWorkSerializer(),
                        !IsWorkSerializerDispatchEnabled());
            },
            DEBUG_LOCATION);
      },
      DEBUG_LOCATION);
  expect_outside_both();
  // Submit one marker callback per serializer and wait for both to run.
  Notification done1;
  Notification done2;
  work_serializer1->Run([&done1]() { done1.Notify(); }, DEBUG_LOCATION);
  work_serializer2->Run([&done2]() { done2.Notify(); }, DEBUG_LOCATION);
  done1.WaitForNotification();
  done2.WaitForNotification();
  work_serializer1.reset();
  work_serializer2.reset();
  WaitForSingleOwner(GetDefaultEventEngine());
}
426 #endif
427 
428 }  // namespace
429 }  // namespace grpc_core
430 
main(int argc,char ** argv)431 int main(int argc, char** argv) {
432   grpc::testing::TestEnvironment env(&argc, argv);
433   ::testing::InitGoogleTest(&argc, argv);
434   grpc_init();
435   int retval = RUN_ALL_TESTS();
436   grpc_shutdown();
437   return retval;
438 }
439