1 //
2 //
3 // Copyright 2019 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include "src/core/util/work_serializer.h"
20
21 #include <grpc/grpc.h>
22 #include <grpc/support/sync.h>
23 #include <grpc/support/time.h>
24 #include <stddef.h>
25
26 #include <memory>
27 #include <thread>
28 #include <utility>
29 #include <vector>
30
31 #include "absl/functional/any_invocable.h"
32 #include "absl/synchronization/barrier.h"
33 #include "absl/time/clock.h"
34 #include "absl/time/time.h"
35 #include "gtest/gtest.h"
36 #include "src/core/lib/event_engine/default_event_engine.h"
37 #include "src/core/lib/experiments/experiments.h"
38 #include "src/core/lib/iomgr/exec_ctx.h"
39 #include "src/core/telemetry/histogram_view.h"
40 #include "src/core/telemetry/stats.h"
41 #include "src/core/telemetry/stats_data.h"
42 #include "src/core/util/notification.h"
43 #include "src/core/util/thd.h"
44 #include "test/core/event_engine/event_engine_test_utils.h"
45 #include "test/core/test_util/test_config.h"
46
47 using grpc_event_engine::experimental::GetDefaultEventEngine;
48 using grpc_event_engine::experimental::WaitForSingleOwner;
49
50 namespace grpc_core {
51 namespace {
// Creates a WorkSerializer and destroys it without ever submitting a
// callback, then waits on the default EventEngine via WaitForSingleOwner().
TEST(WorkSerializerTest, NoOp) {
  {
    // The serializer is destroyed as soon as this scope closes.
    std::unique_ptr<WorkSerializer> serializer =
        std::make_unique<WorkSerializer>(GetDefaultEventEngine());
  }
  WaitForSingleOwner(GetDefaultEventEngine());
}
57
// Submits a single callback through Run() and checks that it sets the
// completion event before the deadline produced by
// grpc_timeout_seconds_to_deadline(5).
TEST(WorkSerializerTest, ExecuteOneRun) {
  auto serializer = std::make_unique<WorkSerializer>(GetDefaultEventEngine());
  gpr_event finished;
  gpr_event_init(&finished);
  // The callback only holds a reference to the stack-allocated event.
  auto signal_finished = [&finished]() {
    gpr_event_set(&finished, reinterpret_cast<void*>(1));
  };
  serializer->Run(signal_finished, DEBUG_LOCATION);
  void* const signalled =
      gpr_event_wait(&finished, grpc_timeout_seconds_to_deadline(5));
  EXPECT_NE(signalled, nullptr);
  serializer.reset();
  WaitForSingleOwner(GetDefaultEventEngine());
}
69
// Queues a single callback with Schedule(), flushes it with DrainQueue(),
// and checks that the callback set the completion event.
TEST(WorkSerializerTest, ExecuteOneScheduleAndDrain) {
  auto serializer = std::make_unique<WorkSerializer>(GetDefaultEventEngine());
  gpr_event finished;
  gpr_event_init(&finished);
  auto run_once = [&finished]() {
    // The event must still be unset when the callback starts executing.
    EXPECT_EQ(gpr_event_get(&finished), nullptr);
    gpr_event_set(&finished, reinterpret_cast<void*>(1));
  };
  serializer->Schedule(run_once, DEBUG_LOCATION);
  serializer->DrainQueue();
  void* const signalled =
      gpr_event_wait(&finished, grpc_timeout_seconds_to_deadline(5));
  EXPECT_NE(signalled, nullptr);
  serializer.reset();
  WaitForSingleOwner(GetDefaultEventEngine());
}
86
87 class TestThread {
88 public:
TestThread(WorkSerializer * lock)89 explicit TestThread(WorkSerializer* lock)
90 : lock_(lock), thread_("grpc_execute_many", ExecuteManyLoop, this) {
91 gpr_event_init(&done_);
92 thread_.Start();
93 }
94
~TestThread()95 ~TestThread() {
96 EXPECT_NE(gpr_event_wait(&done_, gpr_inf_future(GPR_CLOCK_REALTIME)),
97 nullptr);
98 thread_.Join();
99 }
100
101 private:
ExecuteManyLoop(void * arg)102 static void ExecuteManyLoop(void* arg) {
103 TestThread* self = static_cast<TestThread*>(arg);
104 size_t n = 1;
105 for (size_t i = 0; i < 10; i++) {
106 for (size_t j = 0; j < 10000; j++) {
107 struct ExecutionArgs {
108 size_t* counter;
109 size_t value;
110 };
111 ExecutionArgs* c = new ExecutionArgs;
112 c->counter = &self->counter_;
113 c->value = n++;
114 self->lock_->Run(
115 [c]() {
116 EXPECT_TRUE(*c->counter == c->value - 1);
117 *c->counter = c->value;
118 delete c;
119 },
120 DEBUG_LOCATION);
121 }
122 // sleep for a little bit, to test other threads picking up the load
123 gpr_sleep_until(grpc_timeout_milliseconds_to_deadline(100));
124 }
125 self->lock_->Run(
126 [self]() { gpr_event_set(&self->done_, reinterpret_cast<void*>(1)); },
127 DEBUG_LOCATION);
128 }
129
130 WorkSerializer* lock_ = nullptr;
131 Thread thread_;
132 size_t counter_ = 0;
133 gpr_event done_;
134 };
135
// Hammers one WorkSerializer with Run() calls from ten concurrent
// TestThread helpers.
TEST(WorkSerializerTest, ExecuteMany) {
  constexpr size_t kNumThreads = 10;
  auto serializer = std::make_unique<WorkSerializer>(GetDefaultEventEngine());
  {
    std::vector<std::unique_ptr<TestThread>> workers;
    while (workers.size() < kNumThreads) {
      workers.push_back(std::make_unique<TestThread>(serializer.get()));
    }
    // Leaving this scope destroys the helpers; each destructor waits for its
    // thread's final callback and joins the thread.
  }
  serializer.reset();
  WaitForSingleOwner(GetDefaultEventEngine());
}
147
148 class TestThreadScheduleAndDrain {
149 public:
TestThreadScheduleAndDrain(WorkSerializer * lock)150 explicit TestThreadScheduleAndDrain(WorkSerializer* lock)
151 : lock_(lock), thread_("grpc_execute_many", ExecuteManyLoop, this) {
152 gpr_event_init(&done_);
153 thread_.Start();
154 }
155
~TestThreadScheduleAndDrain()156 ~TestThreadScheduleAndDrain() {
157 EXPECT_NE(gpr_event_wait(&done_, gpr_inf_future(GPR_CLOCK_REALTIME)),
158 nullptr);
159 thread_.Join();
160 }
161
162 private:
ExecuteManyLoop(void * arg)163 static void ExecuteManyLoop(void* arg) {
164 TestThreadScheduleAndDrain* self =
165 static_cast<TestThreadScheduleAndDrain*>(arg);
166 size_t n = 1;
167 for (size_t i = 0; i < 10; i++) {
168 for (size_t j = 0; j < 10000; j++) {
169 struct ExecutionArgs {
170 size_t* counter;
171 size_t value;
172 };
173 ExecutionArgs* c = new ExecutionArgs;
174 c->counter = &self->counter_;
175 c->value = n++;
176 self->lock_->Schedule(
177 [c]() {
178 EXPECT_TRUE(*c->counter == c->value - 1);
179 *c->counter = c->value;
180 delete c;
181 },
182 DEBUG_LOCATION);
183 }
184 self->lock_->DrainQueue();
185 // sleep for a little bit, to test other threads picking up the load
186 gpr_sleep_until(grpc_timeout_milliseconds_to_deadline(100));
187 }
188 self->lock_->Run(
189 [self]() { gpr_event_set(&self->done_, reinterpret_cast<void*>(1)); },
190 DEBUG_LOCATION);
191 }
192
193 WorkSerializer* lock_ = nullptr;
194 Thread thread_;
195 size_t counter_ = 0;
196 gpr_event done_;
197 };
198
// Hammers one WorkSerializer with Schedule()/DrainQueue() calls from ten
// concurrent TestThreadScheduleAndDrain helpers.
TEST(WorkSerializerTest, ExecuteManyScheduleAndDrain) {
  constexpr size_t kNumThreads = 10;
  auto serializer = std::make_unique<WorkSerializer>(GetDefaultEventEngine());
  {
    std::vector<std::unique_ptr<TestThreadScheduleAndDrain>> workers;
    while (workers.size() < kNumThreads) {
      workers.push_back(
          std::make_unique<TestThreadScheduleAndDrain>(serializer.get()));
    }
    // Leaving this scope destroys the helpers; each destructor waits for its
    // thread's final callback and joins the thread.
  }
  serializer.reset();
  WaitForSingleOwner(GetDefaultEventEngine());
}
211
// Mixes both submission styles on one WorkSerializer: ten threads using
// Run() alongside ten threads using Schedule()/DrainQueue().
TEST(WorkSerializerTest, ExecuteManyMixedRunScheduleAndDrain) {
  constexpr size_t kThreadsPerKind = 10;
  auto serializer = std::make_unique<WorkSerializer>(GetDefaultEventEngine());
  {
    std::vector<std::unique_ptr<TestThread>> runners;
    std::vector<std::unique_ptr<TestThreadScheduleAndDrain>> schedulers;
    // Start one helper of each kind per iteration so the two kinds are
    // launched interleaved.
    for (size_t started = 0; started < kThreadsPerKind; ++started) {
      WorkSerializer* const target = serializer.get();
      runners.push_back(std::make_unique<TestThread>(target));
      schedulers.push_back(
          std::make_unique<TestThreadScheduleAndDrain>(target));
    }
    // Leaving this scope destroys the helpers; each destructor waits for its
    // thread's final callback and joins the thread.
  }
  serializer.reset();
  WaitForSingleOwner(GetDefaultEventEngine());
}
226
227 // Tests that work serializers allow destruction from the last callback
TEST(WorkSerializerTest,CallbackDestroysWorkSerializer)228 TEST(WorkSerializerTest, CallbackDestroysWorkSerializer) {
229 auto lock = std::make_shared<WorkSerializer>(GetDefaultEventEngine());
230 lock->Run([&]() { lock.reset(); }, DEBUG_LOCATION);
231 WaitForSingleOwner(GetDefaultEventEngine());
232 }
233
234 // Tests additional racy conditions when the last callback triggers work
235 // serializer destruction.
TEST(WorkSerializerTest,WorkSerializerDestructionRace)236 TEST(WorkSerializerTest, WorkSerializerDestructionRace) {
237 for (int i = 0; i < 1000; ++i) {
238 auto lock = std::make_shared<WorkSerializer>(GetDefaultEventEngine());
239 Notification notification;
240 std::thread t1([&]() {
241 notification.WaitForNotification();
242 lock.reset();
243 });
244 lock->Run([&]() { notification.Notify(); }, DEBUG_LOCATION);
245 t1.join();
246 }
247 WaitForSingleOwner(GetDefaultEventEngine());
248 }
249
250 // Tests racy conditions when the last callback triggers work
251 // serializer destruction.
TEST(WorkSerializerTest,WorkSerializerDestructionRaceMultipleThreads)252 TEST(WorkSerializerTest, WorkSerializerDestructionRaceMultipleThreads) {
253 auto lock = std::make_shared<WorkSerializer>(GetDefaultEventEngine());
254 absl::Barrier barrier(11);
255 std::vector<std::thread> threads;
256 threads.reserve(10);
257 for (int i = 0; i < 10; ++i) {
258 threads.emplace_back([lock, &barrier]() mutable {
259 barrier.Block();
260 lock->Run([lock]() mutable { lock.reset(); }, DEBUG_LOCATION);
261 });
262 }
263 barrier.Block();
264 lock.reset();
265 for (auto& thread : threads) {
266 thread.join();
267 }
268 WaitForSingleOwner(GetDefaultEventEngine());
269 }
270
// Checks the work serializer counters and histograms reported by
// global_stats(): first for a single sleeping callback, then for a burst of
// eleven. Skipped unless the work serializer dispatch experiment is enabled.
TEST(WorkSerializerTest, MetricsWork) {
  if (!IsWorkSerializerDispatchEnabled()) {
    GTEST_SKIP() << "Work serializer dispatch experiment not enabled";
  }

  auto serializer = std::make_unique<WorkSerializer>(GetDefaultEventEngine());
  // Submits a callback that sleeps for `how_long`; the returned notification
  // fires once the sleep has finished.
  auto schedule_sleep = [&serializer](absl::Duration how_long) {
    ExecCtx exec_ctx;
    auto finished = std::make_shared<Notification>();
    serializer->Run(
        [how_long, finished]() {
          absl::SleepFor(how_long);
          finished->Notify();
        },
        DEBUG_LOCATION);
    return finished;
  };
  auto baseline = global_stats().Collect();
  // Runs `f`, then returns the stats accumulated since the previous snapshot
  // and makes the new snapshot the baseline for the next call.
  auto stats_diff_from = [&baseline](absl::AnyInvocable<void()> f) {
    f();
    // Insert a pause for the work serializer to update the stats. Reading
    // stats here can still race with the work serializer's update attempt.
    gpr_sleep_until(grpc_timeout_seconds_to_deadline(1));
    auto latest = global_stats().Collect();
    auto delta = latest->Diff(*baseline);
    baseline = std::move(latest);
    return delta;
  };

  // Test adding one work item to the queue
  auto diff = stats_diff_from([&schedule_sleep] {
    schedule_sleep(absl::Seconds(1))->WaitForNotification();
  });
  // 50th percentile of the named histogram in the current `diff`.
  auto median = [&diff](GlobalStats::Histogram which) {
    return diff->histogram(which).Percentile(0.5);
  };
  EXPECT_EQ(diff->work_serializer_items_enqueued, 1);
  EXPECT_EQ(diff->work_serializer_items_dequeued, 1);
  EXPECT_GE(median(GlobalStats::Histogram::kWorkSerializerItemsPerRun), 1.0);
  EXPECT_LE(median(GlobalStats::Histogram::kWorkSerializerItemsPerRun), 2.0);
  EXPECT_GE(median(GlobalStats::Histogram::kWorkSerializerRunTimeMs), 800.0);
  EXPECT_LE(median(GlobalStats::Histogram::kWorkSerializerRunTimeMs), 1300.0);
  EXPECT_GE(median(GlobalStats::Histogram::kWorkSerializerWorkTimeMs), 800.0);
  EXPECT_LE(median(GlobalStats::Histogram::kWorkSerializerWorkTimeMs), 1300.0);
  EXPECT_GE(median(GlobalStats::Histogram::kWorkSerializerWorkTimePerItemMs),
            800.0);
  EXPECT_LE(median(GlobalStats::Histogram::kWorkSerializerWorkTimePerItemMs),
            1300.0);
  EXPECT_LE(median(GlobalStats::Histogram::kWorkSerializerRunTimeMs),
            median(GlobalStats::Histogram::kWorkSerializerWorkTimeMs));

  // Now throw a bunch of work in and see that we get good results
  diff = stats_diff_from([&schedule_sleep] {
    for (int queued = 0; queued < 10; queued++) {
      schedule_sleep(absl::Milliseconds(1000));
    }
    // Only the last of the eleven items is waited for.
    schedule_sleep(absl::Milliseconds(1000))->WaitForNotification();
  });
  EXPECT_EQ(diff->work_serializer_items_enqueued, 11);
  EXPECT_EQ(diff->work_serializer_items_dequeued, 11);
  EXPECT_GE(median(GlobalStats::Histogram::kWorkSerializerItemsPerRun), 7.0);
  EXPECT_LE(median(GlobalStats::Histogram::kWorkSerializerItemsPerRun), 15.0);
  EXPECT_GE(median(GlobalStats::Histogram::kWorkSerializerRunTimeMs), 7000.0);
  EXPECT_LE(median(GlobalStats::Histogram::kWorkSerializerRunTimeMs), 15000.0);
  EXPECT_GE(median(GlobalStats::Histogram::kWorkSerializerWorkTimeMs), 7000.0);
  EXPECT_LE(median(GlobalStats::Histogram::kWorkSerializerWorkTimeMs),
            15000.0);
  EXPECT_GE(median(GlobalStats::Histogram::kWorkSerializerWorkTimePerItemMs),
            800.0);
  EXPECT_LE(median(GlobalStats::Histogram::kWorkSerializerWorkTimePerItemMs),
            1300.0);
  EXPECT_LE(median(GlobalStats::Histogram::kWorkSerializerRunTimeMs),
            median(GlobalStats::Histogram::kWorkSerializerWorkTimeMs));

  serializer.reset();
  WaitForSingleOwner(GetDefaultEventEngine());
}
377
378 #ifndef NDEBUG
TEST(WorkSerializerTest,RunningInWorkSerializer)379 TEST(WorkSerializerTest, RunningInWorkSerializer) {
380 auto work_serializer1 =
381 std::make_shared<WorkSerializer>(GetDefaultEventEngine());
382 auto work_serializer2 =
383 std::make_shared<WorkSerializer>(GetDefaultEventEngine());
384 EXPECT_FALSE(work_serializer1->RunningInWorkSerializer());
385 EXPECT_FALSE(work_serializer2->RunningInWorkSerializer());
386 work_serializer1->Run(
387 [=]() {
388 EXPECT_TRUE(work_serializer1->RunningInWorkSerializer());
389 EXPECT_FALSE(work_serializer2->RunningInWorkSerializer());
390 work_serializer2->Run(
391 [=]() {
392 EXPECT_EQ(work_serializer1->RunningInWorkSerializer(),
393 !IsWorkSerializerDispatchEnabled());
394 EXPECT_TRUE(work_serializer2->RunningInWorkSerializer());
395 },
396 DEBUG_LOCATION);
397 },
398 DEBUG_LOCATION);
399 EXPECT_FALSE(work_serializer1->RunningInWorkSerializer());
400 EXPECT_FALSE(work_serializer2->RunningInWorkSerializer());
401 work_serializer2->Run(
402 [=]() {
403 EXPECT_FALSE(work_serializer1->RunningInWorkSerializer());
404 EXPECT_TRUE(work_serializer2->RunningInWorkSerializer());
405 work_serializer1->Run(
406 [=]() {
407 EXPECT_TRUE(work_serializer1->RunningInWorkSerializer());
408 EXPECT_EQ(work_serializer2->RunningInWorkSerializer(),
409 !IsWorkSerializerDispatchEnabled());
410 },
411 DEBUG_LOCATION);
412 },
413 DEBUG_LOCATION);
414 EXPECT_FALSE(work_serializer1->RunningInWorkSerializer());
415 EXPECT_FALSE(work_serializer2->RunningInWorkSerializer());
416 Notification done1;
417 Notification done2;
418 work_serializer1->Run([&done1]() { done1.Notify(); }, DEBUG_LOCATION);
419 work_serializer2->Run([&done2]() { done2.Notify(); }, DEBUG_LOCATION);
420 done1.WaitForNotification();
421 done2.WaitForNotification();
422 work_serializer1.reset();
423 work_serializer2.reset();
424 WaitForSingleOwner(GetDefaultEventEngine());
425 }
426 #endif
427
428 } // namespace
429 } // namespace grpc_core
430
main(int argc,char ** argv)431 int main(int argc, char** argv) {
432 grpc::testing::TestEnvironment env(&argc, argv);
433 ::testing::InitGoogleTest(&argc, argv);
434 grpc_init();
435 int retval = RUN_ALL_TESTS();
436 grpc_shutdown();
437 return retval;
438 }
439