1 // Copyright 2022 The gRPC Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include "src/core/lib/event_engine/posix_engine/traced_buffer_list.h"
16
17 #include <time.h>
18
19 #include <memory>
20
21 #include "gtest/gtest.h"
22
23 #include <grpc/support/atm.h>
24 #include <grpc/support/log.h>
25 #include <grpc/support/time.h>
26
27 #include "src/core/lib/gpr/useful.h"
28 #include "src/core/lib/gprpp/time.h"
29 #include "src/core/lib/iomgr/exec_ctx.h"
30 #include "src/core/lib/iomgr/port.h"
31
32 #ifdef GRPC_LINUX_ERRQUEUE
33
34 #include <linux/errqueue.h>
35
36 #define NUM_ELEM 5
37
38 extern gpr_timespec (*gpr_now_impl)(gpr_clock_type clock_type);
39
40 namespace grpc_event_engine {
41 namespace experimental {
42 namespace {
43
44 constexpr uint64_t kMaxAdvanceTimeMillis = 24ull * 365 * 3600 * 1000;
45
46 gpr_timespec g_now;
now_impl(gpr_clock_type clock_type)47 gpr_timespec now_impl(gpr_clock_type clock_type) {
48 GPR_ASSERT(clock_type != GPR_TIMESPAN);
49 gpr_timespec ts = g_now;
50 ts.clock_type = clock_type;
51 return ts;
52 }
53
InitGlobals()54 void InitGlobals() {
55 g_now = {1, 0, GPR_CLOCK_MONOTONIC};
56 grpc_core::TestOnlySetProcessEpoch(g_now);
57 gpr_now_impl = now_impl;
58 }
59
AdvanceClockMillis(uint64_t millis)60 void AdvanceClockMillis(uint64_t millis) {
61 grpc_core::ExecCtx exec_ctx;
62 g_now = gpr_time_add(
63 g_now,
64 gpr_time_from_millis(grpc_core::Clamp(millis, static_cast<uint64_t>(1),
65 kMaxAdvanceTimeMillis),
66 GPR_TIMESPAN));
67 exec_ctx.InvalidateNow();
68 }
69
TestShutdownFlushesListVerifier(void * arg,Timestamps *,absl::Status status)70 void TestShutdownFlushesListVerifier(void* arg, Timestamps* /*ts*/,
71 absl::Status status) {
72 ASSERT_TRUE(status.ok());
73 ASSERT_NE(arg, nullptr);
74 int* done = reinterpret_cast<int*>(arg);
75 *done = 1;
76 }
77
78 } // namespace
79
80 // Tests that all TracedBuffer elements in the list are flushed out on shutdown.
81 // Also tests that arg is passed correctly.
TEST(BufferListTest,TestShutdownFlushesList)82 TEST(BufferListTest, TestShutdownFlushesList) {
83 TcpSetWriteTimestampsCallback(TestShutdownFlushesListVerifier);
84 TracedBufferList traced_buffers;
85 int verifier_called[NUM_ELEM];
86 for (auto i = 0; i < NUM_ELEM; i++) {
87 verifier_called[i] = 0;
88 traced_buffers.AddNewEntry(i, 0, static_cast<void*>(&verifier_called[i]));
89 }
90 traced_buffers.Shutdown(nullptr, absl::OkStatus());
91 for (auto i = 0; i < NUM_ELEM; i++) {
92 ASSERT_EQ(verifier_called[i], 1);
93 }
94 ASSERT_TRUE(traced_buffers.Size() == 0);
95 }
96
97 // Tests that the timestamp verifier is called on an ACK timestamp.
TEST(BufferListTest,TestVerifierCalledOnAck)98 TEST(BufferListTest, TestVerifierCalledOnAck) {
99 struct sock_extended_err serr;
100 serr.ee_data = 213;
101 serr.ee_info = SCM_TSTAMP_ACK;
102 struct scm_timestamping tss;
103 tss.ts[0].tv_sec = 123;
104 tss.ts[0].tv_nsec = 456;
105 TcpSetWriteTimestampsCallback(
106 [](void* arg, Timestamps* ts, absl::Status status) {
107 ASSERT_TRUE(status.ok());
108 ASSERT_NE(arg, nullptr);
109 ASSERT_EQ(ts->acked_time.time.clock_type, GPR_CLOCK_REALTIME);
110 ASSERT_EQ(ts->acked_time.time.tv_sec, 123);
111 ASSERT_EQ(ts->acked_time.time.tv_nsec, 456);
112 ASSERT_GT(ts->info.length, 0);
113 int* done = reinterpret_cast<int*>(arg);
114 *done = 1;
115 });
116 TracedBufferList traced_buffers;
117 int verifier_called = 0;
118 traced_buffers.AddNewEntry(213, 0, &verifier_called);
119 traced_buffers.ProcessTimestamp(&serr, nullptr, &tss);
120 ASSERT_EQ(verifier_called, 1);
121 ASSERT_TRUE(traced_buffers.Size() == 0);
122 traced_buffers.Shutdown(nullptr, absl::OkStatus());
123 ASSERT_TRUE(traced_buffers.Size() == 0);
124 }
125
126 // Tests that ProcessTimestamp called after Shutdown does nothing.
TEST(BufferListTest,TestProcessTimestampAfterShutdown)127 TEST(BufferListTest, TestProcessTimestampAfterShutdown) {
128 struct sock_extended_err serr;
129 serr.ee_data = 213;
130 serr.ee_info = SCM_TSTAMP_ACK;
131 struct scm_timestamping tss;
132 tss.ts[0].tv_sec = 123;
133 tss.ts[0].tv_nsec = 456;
134 TcpSetWriteTimestampsCallback(TestShutdownFlushesListVerifier);
135 TracedBufferList traced_buffers;
136 int verifier_called = 0;
137
138 traced_buffers.AddNewEntry(213, 0, &verifier_called);
139 ASSERT_TRUE(traced_buffers.Size() == 1);
140 traced_buffers.Shutdown(nullptr, absl::OkStatus());
141 ASSERT_TRUE(traced_buffers.Size() == 0);
142 // Check that the callback was executed after first Shutdown.
143 ASSERT_EQ(verifier_called, 1);
144 verifier_called = 0;
145 traced_buffers.Shutdown(nullptr, absl::OkStatus());
146 ASSERT_TRUE(traced_buffers.Size() == 0);
147 // Second Shutdown should not execute the callback.
148 ASSERT_EQ(verifier_called, 0);
149 traced_buffers.ProcessTimestamp(&serr, nullptr, &tss);
150 // A ProcessTimestamp after Shutdown should not execute the callback.
151 ASSERT_EQ(verifier_called, 0);
152 }
153
TEST(BufferListTest,TestLongPendingAckForOneTracedBuffer)154 TEST(BufferListTest, TestLongPendingAckForOneTracedBuffer) {
155 constexpr int kMaxPendingAckMillis = 10000;
156 struct sock_extended_err serr[3];
157 gpr_atm verifier_called[3];
158 struct scm_timestamping tss;
159 TracedBufferList tb_list;
160 serr[0].ee_data = 1;
161 serr[0].ee_info = SCM_TSTAMP_SCHED;
162 serr[1].ee_data = 1;
163 serr[1].ee_info = SCM_TSTAMP_SND;
164 serr[2].ee_data = 1;
165 serr[2].ee_info = SCM_TSTAMP_ACK;
166 gpr_atm_rel_store(&verifier_called[0], static_cast<gpr_atm>(0));
167 gpr_atm_rel_store(&verifier_called[1], static_cast<gpr_atm>(0));
168 gpr_atm_rel_store(&verifier_called[2], static_cast<gpr_atm>(0));
169
170 // Add 3 traced buffers
171 tb_list.AddNewEntry(1, 0, &verifier_called[0]);
172 tb_list.AddNewEntry(2, 0, &verifier_called[1]);
173 tb_list.AddNewEntry(3, 0, &verifier_called[2]);
174
175 AdvanceClockMillis(kMaxPendingAckMillis);
176 tss.ts[0].tv_sec = g_now.tv_sec;
177 tss.ts[0].tv_nsec = g_now.tv_nsec;
178
179 // Process SCHED Timestamp for 1st traced buffer.
180 // Nothing should be flushed.
181 TcpSetWriteTimestampsCallback(
182 [](void*, Timestamps*, absl::Status) { ASSERT_TRUE(false); });
183 tb_list.ProcessTimestamp(&serr[0], nullptr, &tss);
184 ASSERT_EQ(tb_list.Size(), 3);
185 ASSERT_EQ(gpr_atm_acq_load(&verifier_called[0]), static_cast<gpr_atm>(0));
186 ASSERT_EQ(gpr_atm_acq_load(&verifier_called[1]), static_cast<gpr_atm>(0));
187 ASSERT_EQ(gpr_atm_acq_load(&verifier_called[2]), static_cast<gpr_atm>(0));
188
189 AdvanceClockMillis(kMaxPendingAckMillis);
190 tss.ts[0].tv_sec = g_now.tv_sec;
191 tss.ts[0].tv_nsec = g_now.tv_nsec;
192
193 // Process SND Timestamp for 1st traced buffer. The second and third traced
194 // buffers must be flushed because the max pending ack time would have
195 // elapsed for them.
196 TcpSetWriteTimestampsCallback([](void* arg, Timestamps*, absl::Status error) {
197 ASSERT_EQ(error, absl::DeadlineExceededError("Ack timed out"));
198 ASSERT_NE(arg, nullptr);
199 gpr_atm* done = reinterpret_cast<gpr_atm*>(arg);
200 gpr_atm_rel_store(done, static_cast<gpr_atm>(1));
201 });
202 tb_list.ProcessTimestamp(&serr[1], nullptr, &tss);
203 ASSERT_EQ(tb_list.Size(), 1);
204 ASSERT_EQ(gpr_atm_acq_load(&verifier_called[0]), static_cast<gpr_atm>(0));
205 ASSERT_EQ(gpr_atm_acq_load(&verifier_called[1]), static_cast<gpr_atm>(1));
206 ASSERT_EQ(gpr_atm_acq_load(&verifier_called[2]), static_cast<gpr_atm>(1));
207
208 AdvanceClockMillis(kMaxPendingAckMillis);
209 tss.ts[0].tv_sec = g_now.tv_sec;
210 tss.ts[0].tv_nsec = g_now.tv_nsec;
211
212 // Process ACK Timestamp for 1st traced buffer.
213 TcpSetWriteTimestampsCallback(
214 [](void* arg, Timestamps* ts, absl::Status error) {
215 ASSERT_TRUE(error.ok());
216 ASSERT_NE(arg, nullptr);
217 ASSERT_EQ(ts->acked_time.time.clock_type, GPR_CLOCK_REALTIME);
218 ASSERT_EQ(ts->acked_time.time.tv_sec, g_now.tv_sec);
219 ASSERT_EQ(ts->acked_time.time.tv_nsec, g_now.tv_nsec);
220 ASSERT_GT(ts->info.length, 0);
221 gpr_atm* done = reinterpret_cast<gpr_atm*>(arg);
222 gpr_atm_rel_store(done, static_cast<gpr_atm>(2));
223 });
224 tb_list.ProcessTimestamp(&serr[2], nullptr, &tss);
225 ASSERT_EQ(tb_list.Size(), 0);
226 ASSERT_EQ(gpr_atm_acq_load(&verifier_called[0]), static_cast<gpr_atm>(2));
227 ASSERT_EQ(gpr_atm_acq_load(&verifier_called[1]), static_cast<gpr_atm>(1));
228 ASSERT_EQ(gpr_atm_acq_load(&verifier_called[2]), static_cast<gpr_atm>(1));
229
230 tb_list.Shutdown(nullptr, absl::OkStatus());
231 }
232
TEST(BufferListTest,TestLongPendingAckForSomeTracedBuffers)233 TEST(BufferListTest, TestLongPendingAckForSomeTracedBuffers) {
234 constexpr int kNumTracedBuffers = 10;
235 constexpr int kMaxPendingAckMillis = 10000;
236 struct sock_extended_err serr[kNumTracedBuffers];
237 gpr_atm verifier_called[kNumTracedBuffers];
238 struct scm_timestamping tss;
239 tss.ts[0].tv_sec = 123;
240 tss.ts[0].tv_nsec = 456;
241 TcpSetWriteTimestampsCallback(
242 [](void* arg, Timestamps* ts, absl::Status status) {
243 ASSERT_NE(arg, nullptr);
244 if (status.ok()) {
245 ASSERT_EQ(ts->acked_time.time.clock_type, GPR_CLOCK_REALTIME);
246 ASSERT_EQ(ts->acked_time.time.tv_sec, 123);
247 ASSERT_EQ(ts->acked_time.time.tv_nsec, 456);
248 ASSERT_GT(ts->info.length, 0);
249 *(reinterpret_cast<int*>(arg)) = 1;
250 } else if (status == absl::DeadlineExceededError("Ack timed out")) {
251 *(reinterpret_cast<int*>(arg)) = 2;
252 } else {
253 ASSERT_TRUE(false);
254 }
255 });
256 TracedBufferList tb_list;
257 for (int i = 0; i < kNumTracedBuffers; i++) {
258 serr[i].ee_data = i + 1;
259 serr[i].ee_info = SCM_TSTAMP_ACK;
260 gpr_atm_rel_store(&verifier_called[i], static_cast<gpr_atm>(0));
261 tb_list.AddNewEntry(i + 1, 0, &verifier_called[i]);
262 }
263 int elapsed_time_millis = 0;
264 int increment_millis = (2 * kMaxPendingAckMillis) / 10;
265 for (int i = 0; i < kNumTracedBuffers; i++) {
266 AdvanceClockMillis(increment_millis);
267 elapsed_time_millis += increment_millis;
268 tb_list.ProcessTimestamp(&serr[i], nullptr, &tss);
269 if (elapsed_time_millis > kMaxPendingAckMillis) {
270 // MaxPendingAckMillis has elapsed. the rest of tb_list must have been
271 // flushed now.
272 ASSERT_EQ(tb_list.Size(), 0);
273 if (elapsed_time_millis - kMaxPendingAckMillis == increment_millis) {
274 // The first ProcessTimestamp just after kMaxPendingAckMillis would have
275 // still successfully processed the head traced buffer entry and then
276 // discarded all the other remaining traced buffer entries. The first
277 // traced buffer entry would have been processed because the ACK
278 // timestamp was received for it.
279 ASSERT_EQ(gpr_atm_acq_load(&verifier_called[i]),
280 static_cast<gpr_atm>(1));
281 } else {
282 ASSERT_EQ(gpr_atm_acq_load(&verifier_called[i]),
283 static_cast<gpr_atm>(2));
284 }
285 } else {
286 ASSERT_EQ(tb_list.Size(), kNumTracedBuffers - (i + 1));
287 ASSERT_EQ(gpr_atm_acq_load(&verifier_called[i]), static_cast<gpr_atm>(1));
288 }
289 }
290 tb_list.Shutdown(nullptr, absl::OkStatus());
291 }
292
293 } // namespace experimental
294 } // namespace grpc_event_engine
295
main(int argc,char ** argv)296 int main(int argc, char** argv) {
297 ::testing::InitGoogleTest(&argc, argv);
298 grpc_event_engine::experimental::InitGlobals();
299 return RUN_ALL_TESTS();
300 }
301
302 #else // GRPC_LINUX_ERRQUEUE
303
main(int,char **)304 int main(int /*argc*/, char** /*argv*/) { return 0; }
305
306 #endif // GRPC_LINUX_ERRQUEUE
307