1 // Copyright 2022 The gRPC Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include "src/core/lib/event_engine/posix_engine/traced_buffer_list.h"
16
17 #include <grpc/support/atm.h>
18 #include <grpc/support/time.h>
19 #include <time.h>
20
21 #include <memory>
22
23 #include "absl/log/check.h"
24 #include "gtest/gtest.h"
25 #include "src/core/lib/iomgr/exec_ctx.h"
26 #include "src/core/lib/iomgr/port.h"
27 #include "src/core/util/time.h"
28 #include "src/core/util/useful.h"
29
30 #ifdef GRPC_LINUX_ERRQUEUE
31
32 #include <linux/errqueue.h>
33
34 #define NUM_ELEM 5
35
36 extern gpr_timespec (*gpr_now_impl)(gpr_clock_type clock_type);
37
38 namespace grpc_event_engine {
39 namespace experimental {
40 namespace {
41
// Largest step AdvanceClockMillis() will apply in one call: 365 days expressed
// in milliseconds (days * hours * seconds-per-hour * millis-per-second).
constexpr uint64_t kMaxAdvanceTimeMillis = 365ull * 24 * 3600 * 1000;
43
44 gpr_timespec g_now;
now_impl(gpr_clock_type clock_type)45 gpr_timespec now_impl(gpr_clock_type clock_type) {
46 CHECK(clock_type != GPR_TIMESPAN);
47 gpr_timespec ts = g_now;
48 ts.clock_type = clock_type;
49 return ts;
50 }
51
InitGlobals()52 void InitGlobals() {
53 g_now = {1, 0, GPR_CLOCK_MONOTONIC};
54 grpc_core::TestOnlySetProcessEpoch(g_now);
55 gpr_now_impl = now_impl;
56 }
57
AdvanceClockMillis(uint64_t millis)58 void AdvanceClockMillis(uint64_t millis) {
59 grpc_core::ExecCtx exec_ctx;
60 g_now = gpr_time_add(
61 g_now,
62 gpr_time_from_millis(grpc_core::Clamp(millis, static_cast<uint64_t>(1),
63 kMaxAdvanceTimeMillis),
64 GPR_TIMESPAN));
65 exec_ctx.InvalidateNow();
66 }
67
TestShutdownFlushesListVerifier(void * arg,Timestamps *,absl::Status status)68 void TestShutdownFlushesListVerifier(void* arg, Timestamps* /*ts*/,
69 absl::Status status) {
70 ASSERT_TRUE(status.ok());
71 ASSERT_NE(arg, nullptr);
72 int* done = reinterpret_cast<int*>(arg);
73 *done = 1;
74 }
75
76 } // namespace
77
78 // Tests that all TracedBuffer elements in the list are flushed out on shutdown.
79 // Also tests that arg is passed correctly.
TEST(BufferListTest,TestShutdownFlushesList)80 TEST(BufferListTest, TestShutdownFlushesList) {
81 TcpSetWriteTimestampsCallback(TestShutdownFlushesListVerifier);
82 TracedBufferList traced_buffers;
83 int verifier_called[NUM_ELEM];
84 for (auto i = 0; i < NUM_ELEM; i++) {
85 verifier_called[i] = 0;
86 traced_buffers.AddNewEntry(i, 0, static_cast<void*>(&verifier_called[i]));
87 }
88 traced_buffers.Shutdown(nullptr, absl::OkStatus());
89 for (auto i = 0; i < NUM_ELEM; i++) {
90 ASSERT_EQ(verifier_called[i], 1);
91 }
92 ASSERT_TRUE(traced_buffers.Size() == 0);
93 }
94
95 // Tests that the timestamp verifier is called on an ACK timestamp.
TEST(BufferListTest,TestVerifierCalledOnAck)96 TEST(BufferListTest, TestVerifierCalledOnAck) {
97 struct sock_extended_err serr;
98 serr.ee_data = 213;
99 serr.ee_info = SCM_TSTAMP_ACK;
100 struct scm_timestamping tss;
101 tss.ts[0].tv_sec = 123;
102 tss.ts[0].tv_nsec = 456;
103 TcpSetWriteTimestampsCallback(
104 [](void* arg, Timestamps* ts, absl::Status status) {
105 ASSERT_TRUE(status.ok());
106 ASSERT_NE(arg, nullptr);
107 ASSERT_EQ(ts->acked_time.time.clock_type, GPR_CLOCK_REALTIME);
108 ASSERT_EQ(ts->acked_time.time.tv_sec, 123);
109 ASSERT_EQ(ts->acked_time.time.tv_nsec, 456);
110 ASSERT_GT(ts->info.length, 0);
111 int* done = reinterpret_cast<int*>(arg);
112 *done = 1;
113 });
114 TracedBufferList traced_buffers;
115 int verifier_called = 0;
116 traced_buffers.AddNewEntry(213, 0, &verifier_called);
117 traced_buffers.ProcessTimestamp(&serr, nullptr, &tss);
118 ASSERT_EQ(verifier_called, 1);
119 ASSERT_TRUE(traced_buffers.Size() == 0);
120 traced_buffers.Shutdown(nullptr, absl::OkStatus());
121 ASSERT_TRUE(traced_buffers.Size() == 0);
122 }
123
124 // Tests that ProcessTimestamp called after Shutdown does nothing.
TEST(BufferListTest,TestProcessTimestampAfterShutdown)125 TEST(BufferListTest, TestProcessTimestampAfterShutdown) {
126 struct sock_extended_err serr;
127 serr.ee_data = 213;
128 serr.ee_info = SCM_TSTAMP_ACK;
129 struct scm_timestamping tss;
130 tss.ts[0].tv_sec = 123;
131 tss.ts[0].tv_nsec = 456;
132 TcpSetWriteTimestampsCallback(TestShutdownFlushesListVerifier);
133 TracedBufferList traced_buffers;
134 int verifier_called = 0;
135
136 traced_buffers.AddNewEntry(213, 0, &verifier_called);
137 ASSERT_TRUE(traced_buffers.Size() == 1);
138 traced_buffers.Shutdown(nullptr, absl::OkStatus());
139 ASSERT_TRUE(traced_buffers.Size() == 0);
140 // Check that the callback was executed after first Shutdown.
141 ASSERT_EQ(verifier_called, 1);
142 verifier_called = 0;
143 traced_buffers.Shutdown(nullptr, absl::OkStatus());
144 ASSERT_TRUE(traced_buffers.Size() == 0);
145 // Second Shutdown should not execute the callback.
146 ASSERT_EQ(verifier_called, 0);
147 traced_buffers.ProcessTimestamp(&serr, nullptr, &tss);
148 // A ProcessTimestamp after Shutdown should not execute the callback.
149 ASSERT_EQ(verifier_called, 0);
150 }
151
// Three traced buffers are queued, but only the first one ever receives
// timestamps (SCHED, then SND, then ACK), with the fake clock advanced by
// kMaxPendingAckMillis before each. The test expects:
//   * SCHED: nothing is flushed and no callback runs;
//   * SND:   buffers two and three are flushed with "Ack timed out";
//   * ACK:   the first buffer completes successfully.
TEST(BufferListTest, TestLongPendingAckForOneTracedBuffer) {
  constexpr int kMaxPendingAckMillis = 10000;
  constexpr int kNumBuffers = 3;
  struct sock_extended_err serr[kNumBuffers];
  gpr_atm verifier_called[kNumBuffers];
  struct scm_timestamping tss;
  TracedBufferList tb_list;

  // serr[0..2] all refer to sequence number 1 and differ only in the kind of
  // timestamp being reported.
  const uint32_t kTimestampKinds[kNumBuffers] = {
      SCM_TSTAMP_SCHED, SCM_TSTAMP_SND, SCM_TSTAMP_ACK};
  for (int idx = 0; idx < kNumBuffers; ++idx) {
    serr[idx].ee_data = 1;
    serr[idx].ee_info = kTimestampKinds[idx];
  }
  for (int idx = 0; idx < kNumBuffers; ++idx) {
    gpr_atm_rel_store(&verifier_called[idx], static_cast<gpr_atm>(0));
  }

  // Add 3 traced buffers with sequence numbers 1, 2 and 3.
  for (int idx = 0; idx < kNumBuffers; ++idx) {
    tb_list.AddNewEntry(idx + 1, 0, &verifier_called[idx]);
  }

  // Advances the fake clock by the pending-ack limit and copies the new time
  // into the timestamp that will be handed to ProcessTimestamp.
  const auto advance_clock_and_stamp = [&]() {
    AdvanceClockMillis(kMaxPendingAckMillis);
    tss.ts[0].tv_sec = g_now.tv_sec;
    tss.ts[0].tv_nsec = g_now.tv_nsec;
  };

  // Step 1: SCHED timestamp for the first traced buffer. No entry may be
  // flushed, so any callback invocation is a failure.
  advance_clock_and_stamp();
  TcpSetWriteTimestampsCallback(
      [](void*, Timestamps*, absl::Status) { ASSERT_TRUE(false); });
  tb_list.ProcessTimestamp(&serr[0], nullptr, &tss);
  ASSERT_EQ(tb_list.Size(), 3);
  ASSERT_EQ(gpr_atm_acq_load(&verifier_called[0]), static_cast<gpr_atm>(0));
  ASSERT_EQ(gpr_atm_acq_load(&verifier_called[1]), static_cast<gpr_atm>(0));
  ASSERT_EQ(gpr_atm_acq_load(&verifier_called[2]), static_cast<gpr_atm>(0));

  // Step 2: SND timestamp for the first traced buffer. The second and third
  // traced buffers have now waited past the max pending ack time, so they must
  // be flushed with a deadline-exceeded status (recorded as 1).
  advance_clock_and_stamp();
  TcpSetWriteTimestampsCallback([](void* arg, Timestamps*, absl::Status error) {
    ASSERT_EQ(error, absl::DeadlineExceededError("Ack timed out"));
    ASSERT_NE(arg, nullptr);
    gpr_atm_rel_store(reinterpret_cast<gpr_atm*>(arg),
                      static_cast<gpr_atm>(1));
  });
  tb_list.ProcessTimestamp(&serr[1], nullptr, &tss);
  ASSERT_EQ(tb_list.Size(), 1);
  ASSERT_EQ(gpr_atm_acq_load(&verifier_called[0]), static_cast<gpr_atm>(0));
  ASSERT_EQ(gpr_atm_acq_load(&verifier_called[1]), static_cast<gpr_atm>(1));
  ASSERT_EQ(gpr_atm_acq_load(&verifier_called[2]), static_cast<gpr_atm>(1));

  // Step 3: ACK timestamp for the first traced buffer. It completes with an OK
  // status and the current fake time (recorded as 2).
  advance_clock_and_stamp();
  TcpSetWriteTimestampsCallback(
      [](void* arg, Timestamps* ts, absl::Status error) {
        ASSERT_TRUE(error.ok());
        ASSERT_NE(arg, nullptr);
        ASSERT_EQ(ts->acked_time.time.clock_type, GPR_CLOCK_REALTIME);
        ASSERT_EQ(ts->acked_time.time.tv_sec, g_now.tv_sec);
        ASSERT_EQ(ts->acked_time.time.tv_nsec, g_now.tv_nsec);
        ASSERT_GT(ts->info.length, 0);
        gpr_atm_rel_store(reinterpret_cast<gpr_atm*>(arg),
                          static_cast<gpr_atm>(2));
      });
  tb_list.ProcessTimestamp(&serr[2], nullptr, &tss);
  ASSERT_EQ(tb_list.Size(), 0);
  ASSERT_EQ(gpr_atm_acq_load(&verifier_called[0]), static_cast<gpr_atm>(2));
  ASSERT_EQ(gpr_atm_acq_load(&verifier_called[1]), static_cast<gpr_atm>(1));
  ASSERT_EQ(gpr_atm_acq_load(&verifier_called[2]), static_cast<gpr_atm>(1));

  tb_list.Shutdown(nullptr, absl::OkStatus());
}
230
// Queues kNumTracedBuffers entries and then delivers one ACK per entry while
// advancing the fake clock by a fixed increment each time. Entries ACKed before
// kMaxPendingAckMillis has elapsed must complete normally (recorded as 1). The
// first ACK processed after the limit still completes its own entry, and all
// remaining entries are flushed with "Ack timed out" (recorded as 2).
TEST(BufferListTest, TestLongPendingAckForSomeTracedBuffers) {
  constexpr int kNumTracedBuffers = 10;
  constexpr int kMaxPendingAckMillis = 10000;
  struct sock_extended_err serr[kNumTracedBuffers];
  gpr_atm verifier_called[kNumTracedBuffers];
  struct scm_timestamping tss;
  tss.ts[0].tv_sec = 123;
  tss.ts[0].tv_nsec = 456;
  TcpSetWriteTimestampsCallback(
      [](void* arg, Timestamps* ts, absl::Status status) {
        ASSERT_NE(arg, nullptr);
        // `arg` points at a gpr_atm slot of verifier_called. Store through that
        // exact type: writing through an int* would only touch part of the
        // wider gpr_atm object, which breaks strict aliasing and yields the
        // wrong value on big-endian 64-bit targets.
        gpr_atm* outcome = static_cast<gpr_atm*>(arg);
        if (status.ok()) {
          ASSERT_EQ(ts->acked_time.time.clock_type, GPR_CLOCK_REALTIME);
          ASSERT_EQ(ts->acked_time.time.tv_sec, 123);
          ASSERT_EQ(ts->acked_time.time.tv_nsec, 456);
          ASSERT_GT(ts->info.length, 0);
          gpr_atm_rel_store(outcome, static_cast<gpr_atm>(1));
        } else if (status == absl::DeadlineExceededError("Ack timed out")) {
          gpr_atm_rel_store(outcome, static_cast<gpr_atm>(2));
        } else {
          ASSERT_TRUE(false);
        }
      });
  TracedBufferList tb_list;
  // Entry i is registered under sequence number i + 1, and serr[i] is the ACK
  // record for that same sequence number.
  for (int i = 0; i < kNumTracedBuffers; i++) {
    serr[i].ee_data = i + 1;
    serr[i].ee_info = SCM_TSTAMP_ACK;
    gpr_atm_rel_store(&verifier_called[i], static_cast<gpr_atm>(0));
    tb_list.AddNewEntry(i + 1, 0, &verifier_called[i]);
  }
  int elapsed_time_millis = 0;
  // Each step advances the clock by a fifth of the pending-ack limit, so the
  // limit is crossed part-way through the loop.
  const int increment_millis = (2 * kMaxPendingAckMillis) / 10;
  for (int i = 0; i < kNumTracedBuffers; i++) {
    AdvanceClockMillis(increment_millis);
    elapsed_time_millis += increment_millis;
    tb_list.ProcessTimestamp(&serr[i], nullptr, &tss);
    if (elapsed_time_millis > kMaxPendingAckMillis) {
      // kMaxPendingAckMillis has elapsed. The rest of tb_list must have been
      // flushed by now.
      ASSERT_EQ(tb_list.Size(), 0);
      if (elapsed_time_millis - kMaxPendingAckMillis == increment_millis) {
        // The first ProcessTimestamp just after kMaxPendingAckMillis would have
        // still successfully processed the head traced buffer entry and then
        // discarded all the other remaining traced buffer entries. The first
        // traced buffer entry would have been processed because the ACK
        // timestamp was received for it.
        ASSERT_EQ(gpr_atm_acq_load(&verifier_called[i]),
                  static_cast<gpr_atm>(1));
      } else {
        // This entry was already discarded with a timeout by the earlier flush.
        ASSERT_EQ(gpr_atm_acq_load(&verifier_called[i]),
                  static_cast<gpr_atm>(2));
      }
    } else {
      ASSERT_EQ(tb_list.Size(), kNumTracedBuffers - (i + 1));
      ASSERT_EQ(gpr_atm_acq_load(&verifier_called[i]), static_cast<gpr_atm>(1));
    }
  }
  tb_list.Shutdown(nullptr, absl::OkStatus());
}
290
291 } // namespace experimental
292 } // namespace grpc_event_engine
293
main(int argc,char ** argv)294 int main(int argc, char** argv) {
295 ::testing::InitGoogleTest(&argc, argv);
296 grpc_event_engine::experimental::InitGlobals();
297 return RUN_ALL_TESTS();
298 }
299
300 #else // GRPC_LINUX_ERRQUEUE
301
// GRPC_LINUX_ERRQUEUE is not defined, so no tests are compiled in; report
// success without doing anything.
int main(int /*argc*/, char** /*argv*/) {
  return 0;
}
303
304 #endif // GRPC_LINUX_ERRQUEUE
305