• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2022 The gRPC Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "src/core/lib/event_engine/posix_engine/traced_buffer_list.h"
16 
17 #include <grpc/support/atm.h>
18 #include <grpc/support/time.h>
19 #include <time.h>
20 
21 #include <memory>
22 
23 #include "absl/log/check.h"
24 #include "gtest/gtest.h"
25 #include "src/core/lib/iomgr/exec_ctx.h"
26 #include "src/core/lib/iomgr/port.h"
27 #include "src/core/util/time.h"
28 #include "src/core/util/useful.h"
29 
30 #ifdef GRPC_LINUX_ERRQUEUE
31 
32 #include <linux/errqueue.h>
33 
34 #define NUM_ELEM 5
35 
36 extern gpr_timespec (*gpr_now_impl)(gpr_clock_type clock_type);
37 
38 namespace grpc_event_engine {
39 namespace experimental {
40 namespace {
41 
42 constexpr uint64_t kMaxAdvanceTimeMillis = 24ull * 365 * 3600 * 1000;
43 
44 gpr_timespec g_now;
now_impl(gpr_clock_type clock_type)45 gpr_timespec now_impl(gpr_clock_type clock_type) {
46   CHECK(clock_type != GPR_TIMESPAN);
47   gpr_timespec ts = g_now;
48   ts.clock_type = clock_type;
49   return ts;
50 }
51 
InitGlobals()52 void InitGlobals() {
53   g_now = {1, 0, GPR_CLOCK_MONOTONIC};
54   grpc_core::TestOnlySetProcessEpoch(g_now);
55   gpr_now_impl = now_impl;
56 }
57 
AdvanceClockMillis(uint64_t millis)58 void AdvanceClockMillis(uint64_t millis) {
59   grpc_core::ExecCtx exec_ctx;
60   g_now = gpr_time_add(
61       g_now,
62       gpr_time_from_millis(grpc_core::Clamp(millis, static_cast<uint64_t>(1),
63                                             kMaxAdvanceTimeMillis),
64                            GPR_TIMESPAN));
65   exec_ctx.InvalidateNow();
66 }
67 
TestShutdownFlushesListVerifier(void * arg,Timestamps *,absl::Status status)68 void TestShutdownFlushesListVerifier(void* arg, Timestamps* /*ts*/,
69                                      absl::Status status) {
70   ASSERT_TRUE(status.ok());
71   ASSERT_NE(arg, nullptr);
72   int* done = reinterpret_cast<int*>(arg);
73   *done = 1;
74 }
75 
76 }  // namespace
77 
78 // Tests that all TracedBuffer elements in the list are flushed out on shutdown.
79 // Also tests that arg is passed correctly.
TEST(BufferListTest,TestShutdownFlushesList)80 TEST(BufferListTest, TestShutdownFlushesList) {
81   TcpSetWriteTimestampsCallback(TestShutdownFlushesListVerifier);
82   TracedBufferList traced_buffers;
83   int verifier_called[NUM_ELEM];
84   for (auto i = 0; i < NUM_ELEM; i++) {
85     verifier_called[i] = 0;
86     traced_buffers.AddNewEntry(i, 0, static_cast<void*>(&verifier_called[i]));
87   }
88   traced_buffers.Shutdown(nullptr, absl::OkStatus());
89   for (auto i = 0; i < NUM_ELEM; i++) {
90     ASSERT_EQ(verifier_called[i], 1);
91   }
92   ASSERT_TRUE(traced_buffers.Size() == 0);
93 }
94 
95 // Tests that the timestamp verifier is called on an ACK timestamp.
TEST(BufferListTest,TestVerifierCalledOnAck)96 TEST(BufferListTest, TestVerifierCalledOnAck) {
97   struct sock_extended_err serr;
98   serr.ee_data = 213;
99   serr.ee_info = SCM_TSTAMP_ACK;
100   struct scm_timestamping tss;
101   tss.ts[0].tv_sec = 123;
102   tss.ts[0].tv_nsec = 456;
103   TcpSetWriteTimestampsCallback(
104       [](void* arg, Timestamps* ts, absl::Status status) {
105         ASSERT_TRUE(status.ok());
106         ASSERT_NE(arg, nullptr);
107         ASSERT_EQ(ts->acked_time.time.clock_type, GPR_CLOCK_REALTIME);
108         ASSERT_EQ(ts->acked_time.time.tv_sec, 123);
109         ASSERT_EQ(ts->acked_time.time.tv_nsec, 456);
110         ASSERT_GT(ts->info.length, 0);
111         int* done = reinterpret_cast<int*>(arg);
112         *done = 1;
113       });
114   TracedBufferList traced_buffers;
115   int verifier_called = 0;
116   traced_buffers.AddNewEntry(213, 0, &verifier_called);
117   traced_buffers.ProcessTimestamp(&serr, nullptr, &tss);
118   ASSERT_EQ(verifier_called, 1);
119   ASSERT_TRUE(traced_buffers.Size() == 0);
120   traced_buffers.Shutdown(nullptr, absl::OkStatus());
121   ASSERT_TRUE(traced_buffers.Size() == 0);
122 }
123 
124 // Tests that ProcessTimestamp called after Shutdown does nothing.
TEST(BufferListTest,TestProcessTimestampAfterShutdown)125 TEST(BufferListTest, TestProcessTimestampAfterShutdown) {
126   struct sock_extended_err serr;
127   serr.ee_data = 213;
128   serr.ee_info = SCM_TSTAMP_ACK;
129   struct scm_timestamping tss;
130   tss.ts[0].tv_sec = 123;
131   tss.ts[0].tv_nsec = 456;
132   TcpSetWriteTimestampsCallback(TestShutdownFlushesListVerifier);
133   TracedBufferList traced_buffers;
134   int verifier_called = 0;
135 
136   traced_buffers.AddNewEntry(213, 0, &verifier_called);
137   ASSERT_TRUE(traced_buffers.Size() == 1);
138   traced_buffers.Shutdown(nullptr, absl::OkStatus());
139   ASSERT_TRUE(traced_buffers.Size() == 0);
140   // Check that the callback was executed after first Shutdown.
141   ASSERT_EQ(verifier_called, 1);
142   verifier_called = 0;
143   traced_buffers.Shutdown(nullptr, absl::OkStatus());
144   ASSERT_TRUE(traced_buffers.Size() == 0);
145   // Second Shutdown should not execute the callback.
146   ASSERT_EQ(verifier_called, 0);
147   traced_buffers.ProcessTimestamp(&serr, nullptr, &tss);
148   // A ProcessTimestamp after Shutdown should not execute the callback.
149   ASSERT_EQ(verifier_called, 0);
150 }
151 
TEST(BufferListTest,TestLongPendingAckForOneTracedBuffer)152 TEST(BufferListTest, TestLongPendingAckForOneTracedBuffer) {
153   constexpr int kMaxPendingAckMillis = 10000;
154   struct sock_extended_err serr[3];
155   gpr_atm verifier_called[3];
156   struct scm_timestamping tss;
157   TracedBufferList tb_list;
158   serr[0].ee_data = 1;
159   serr[0].ee_info = SCM_TSTAMP_SCHED;
160   serr[1].ee_data = 1;
161   serr[1].ee_info = SCM_TSTAMP_SND;
162   serr[2].ee_data = 1;
163   serr[2].ee_info = SCM_TSTAMP_ACK;
164   gpr_atm_rel_store(&verifier_called[0], static_cast<gpr_atm>(0));
165   gpr_atm_rel_store(&verifier_called[1], static_cast<gpr_atm>(0));
166   gpr_atm_rel_store(&verifier_called[2], static_cast<gpr_atm>(0));
167 
168   //  Add 3 traced buffers
169   tb_list.AddNewEntry(1, 0, &verifier_called[0]);
170   tb_list.AddNewEntry(2, 0, &verifier_called[1]);
171   tb_list.AddNewEntry(3, 0, &verifier_called[2]);
172 
173   AdvanceClockMillis(kMaxPendingAckMillis);
174   tss.ts[0].tv_sec = g_now.tv_sec;
175   tss.ts[0].tv_nsec = g_now.tv_nsec;
176 
177   // Process SCHED Timestamp for 1st traced buffer.
178   // Nothing should be flushed.
179   TcpSetWriteTimestampsCallback(
180       [](void*, Timestamps*, absl::Status) { ASSERT_TRUE(false); });
181   tb_list.ProcessTimestamp(&serr[0], nullptr, &tss);
182   ASSERT_EQ(tb_list.Size(), 3);
183   ASSERT_EQ(gpr_atm_acq_load(&verifier_called[0]), static_cast<gpr_atm>(0));
184   ASSERT_EQ(gpr_atm_acq_load(&verifier_called[1]), static_cast<gpr_atm>(0));
185   ASSERT_EQ(gpr_atm_acq_load(&verifier_called[2]), static_cast<gpr_atm>(0));
186 
187   AdvanceClockMillis(kMaxPendingAckMillis);
188   tss.ts[0].tv_sec = g_now.tv_sec;
189   tss.ts[0].tv_nsec = g_now.tv_nsec;
190 
191   // Process SND Timestamp for 1st traced buffer. The second and third traced
192   // buffers must be flushed because the max pending ack time would have
193   // elapsed for them.
194   TcpSetWriteTimestampsCallback([](void* arg, Timestamps*, absl::Status error) {
195     ASSERT_EQ(error, absl::DeadlineExceededError("Ack timed out"));
196     ASSERT_NE(arg, nullptr);
197     gpr_atm* done = reinterpret_cast<gpr_atm*>(arg);
198     gpr_atm_rel_store(done, static_cast<gpr_atm>(1));
199   });
200   tb_list.ProcessTimestamp(&serr[1], nullptr, &tss);
201   ASSERT_EQ(tb_list.Size(), 1);
202   ASSERT_EQ(gpr_atm_acq_load(&verifier_called[0]), static_cast<gpr_atm>(0));
203   ASSERT_EQ(gpr_atm_acq_load(&verifier_called[1]), static_cast<gpr_atm>(1));
204   ASSERT_EQ(gpr_atm_acq_load(&verifier_called[2]), static_cast<gpr_atm>(1));
205 
206   AdvanceClockMillis(kMaxPendingAckMillis);
207   tss.ts[0].tv_sec = g_now.tv_sec;
208   tss.ts[0].tv_nsec = g_now.tv_nsec;
209 
210   // Process ACK Timestamp for 1st traced buffer.
211   TcpSetWriteTimestampsCallback(
212       [](void* arg, Timestamps* ts, absl::Status error) {
213         ASSERT_TRUE(error.ok());
214         ASSERT_NE(arg, nullptr);
215         ASSERT_EQ(ts->acked_time.time.clock_type, GPR_CLOCK_REALTIME);
216         ASSERT_EQ(ts->acked_time.time.tv_sec, g_now.tv_sec);
217         ASSERT_EQ(ts->acked_time.time.tv_nsec, g_now.tv_nsec);
218         ASSERT_GT(ts->info.length, 0);
219         gpr_atm* done = reinterpret_cast<gpr_atm*>(arg);
220         gpr_atm_rel_store(done, static_cast<gpr_atm>(2));
221       });
222   tb_list.ProcessTimestamp(&serr[2], nullptr, &tss);
223   ASSERT_EQ(tb_list.Size(), 0);
224   ASSERT_EQ(gpr_atm_acq_load(&verifier_called[0]), static_cast<gpr_atm>(2));
225   ASSERT_EQ(gpr_atm_acq_load(&verifier_called[1]), static_cast<gpr_atm>(1));
226   ASSERT_EQ(gpr_atm_acq_load(&verifier_called[2]), static_cast<gpr_atm>(1));
227 
228   tb_list.Shutdown(nullptr, absl::OkStatus());
229 }
230 
TEST(BufferListTest,TestLongPendingAckForSomeTracedBuffers)231 TEST(BufferListTest, TestLongPendingAckForSomeTracedBuffers) {
232   constexpr int kNumTracedBuffers = 10;
233   constexpr int kMaxPendingAckMillis = 10000;
234   struct sock_extended_err serr[kNumTracedBuffers];
235   gpr_atm verifier_called[kNumTracedBuffers];
236   struct scm_timestamping tss;
237   tss.ts[0].tv_sec = 123;
238   tss.ts[0].tv_nsec = 456;
239   TcpSetWriteTimestampsCallback(
240       [](void* arg, Timestamps* ts, absl::Status status) {
241         ASSERT_NE(arg, nullptr);
242         if (status.ok()) {
243           ASSERT_EQ(ts->acked_time.time.clock_type, GPR_CLOCK_REALTIME);
244           ASSERT_EQ(ts->acked_time.time.tv_sec, 123);
245           ASSERT_EQ(ts->acked_time.time.tv_nsec, 456);
246           ASSERT_GT(ts->info.length, 0);
247           *(reinterpret_cast<int*>(arg)) = 1;
248         } else if (status == absl::DeadlineExceededError("Ack timed out")) {
249           *(reinterpret_cast<int*>(arg)) = 2;
250         } else {
251           ASSERT_TRUE(false);
252         }
253       });
254   TracedBufferList tb_list;
255   for (int i = 0; i < kNumTracedBuffers; i++) {
256     serr[i].ee_data = i + 1;
257     serr[i].ee_info = SCM_TSTAMP_ACK;
258     gpr_atm_rel_store(&verifier_called[i], static_cast<gpr_atm>(0));
259     tb_list.AddNewEntry(i + 1, 0, &verifier_called[i]);
260   }
261   int elapsed_time_millis = 0;
262   int increment_millis = (2 * kMaxPendingAckMillis) / 10;
263   for (int i = 0; i < kNumTracedBuffers; i++) {
264     AdvanceClockMillis(increment_millis);
265     elapsed_time_millis += increment_millis;
266     tb_list.ProcessTimestamp(&serr[i], nullptr, &tss);
267     if (elapsed_time_millis > kMaxPendingAckMillis) {
268       // MaxPendingAckMillis has elapsed. the rest of tb_list must have been
269       // flushed now.
270       ASSERT_EQ(tb_list.Size(), 0);
271       if (elapsed_time_millis - kMaxPendingAckMillis == increment_millis) {
272         // The first ProcessTimestamp just after kMaxPendingAckMillis would have
273         // still successfully processed the head traced buffer entry and then
274         // discarded all the other remaining traced buffer entries. The first
275         // traced buffer entry would have been processed because the ACK
276         // timestamp was received for it.
277         ASSERT_EQ(gpr_atm_acq_load(&verifier_called[i]),
278                   static_cast<gpr_atm>(1));
279       } else {
280         ASSERT_EQ(gpr_atm_acq_load(&verifier_called[i]),
281                   static_cast<gpr_atm>(2));
282       }
283     } else {
284       ASSERT_EQ(tb_list.Size(), kNumTracedBuffers - (i + 1));
285       ASSERT_EQ(gpr_atm_acq_load(&verifier_called[i]), static_cast<gpr_atm>(1));
286     }
287   }
288   tb_list.Shutdown(nullptr, absl::OkStatus());
289 }
290 
291 }  // namespace experimental
292 }  // namespace grpc_event_engine
293 
main(int argc,char ** argv)294 int main(int argc, char** argv) {
295   ::testing::InitGoogleTest(&argc, argv);
296   grpc_event_engine::experimental::InitGlobals();
297   return RUN_ALL_TESTS();
298 }
299 
300 #else  // GRPC_LINUX_ERRQUEUE
301 
main(int,char **)302 int main(int /*argc*/, char** /*argv*/) { return 0; }
303 
304 #endif  // GRPC_LINUX_ERRQUEUE
305