• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2022 The gRPC Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "src/core/lib/event_engine/posix_engine/traced_buffer_list.h"
16 
17 #include <time.h>
18 
19 #include <memory>
20 
21 #include "gtest/gtest.h"
22 
23 #include <grpc/support/atm.h>
24 #include <grpc/support/log.h>
25 #include <grpc/support/time.h>
26 
27 #include "src/core/lib/gpr/useful.h"
28 #include "src/core/lib/gprpp/time.h"
29 #include "src/core/lib/iomgr/exec_ctx.h"
30 #include "src/core/lib/iomgr/port.h"
31 
32 #ifdef GRPC_LINUX_ERRQUEUE
33 
34 #include <linux/errqueue.h>
35 
36 #define NUM_ELEM 5
37 
38 extern gpr_timespec (*gpr_now_impl)(gpr_clock_type clock_type);
39 
40 namespace grpc_event_engine {
41 namespace experimental {
42 namespace {
43 
44 constexpr uint64_t kMaxAdvanceTimeMillis = 24ull * 365 * 3600 * 1000;
45 
46 gpr_timespec g_now;
now_impl(gpr_clock_type clock_type)47 gpr_timespec now_impl(gpr_clock_type clock_type) {
48   GPR_ASSERT(clock_type != GPR_TIMESPAN);
49   gpr_timespec ts = g_now;
50   ts.clock_type = clock_type;
51   return ts;
52 }
53 
InitGlobals()54 void InitGlobals() {
55   g_now = {1, 0, GPR_CLOCK_MONOTONIC};
56   grpc_core::TestOnlySetProcessEpoch(g_now);
57   gpr_now_impl = now_impl;
58 }
59 
AdvanceClockMillis(uint64_t millis)60 void AdvanceClockMillis(uint64_t millis) {
61   grpc_core::ExecCtx exec_ctx;
62   g_now = gpr_time_add(
63       g_now,
64       gpr_time_from_millis(grpc_core::Clamp(millis, static_cast<uint64_t>(1),
65                                             kMaxAdvanceTimeMillis),
66                            GPR_TIMESPAN));
67   exec_ctx.InvalidateNow();
68 }
69 
TestShutdownFlushesListVerifier(void * arg,Timestamps *,absl::Status status)70 void TestShutdownFlushesListVerifier(void* arg, Timestamps* /*ts*/,
71                                      absl::Status status) {
72   ASSERT_TRUE(status.ok());
73   ASSERT_NE(arg, nullptr);
74   int* done = reinterpret_cast<int*>(arg);
75   *done = 1;
76 }
77 
78 }  // namespace
79 
80 // Tests that all TracedBuffer elements in the list are flushed out on shutdown.
81 // Also tests that arg is passed correctly.
TEST(BufferListTest,TestShutdownFlushesList)82 TEST(BufferListTest, TestShutdownFlushesList) {
83   TcpSetWriteTimestampsCallback(TestShutdownFlushesListVerifier);
84   TracedBufferList traced_buffers;
85   int verifier_called[NUM_ELEM];
86   for (auto i = 0; i < NUM_ELEM; i++) {
87     verifier_called[i] = 0;
88     traced_buffers.AddNewEntry(i, 0, static_cast<void*>(&verifier_called[i]));
89   }
90   traced_buffers.Shutdown(nullptr, absl::OkStatus());
91   for (auto i = 0; i < NUM_ELEM; i++) {
92     ASSERT_EQ(verifier_called[i], 1);
93   }
94   ASSERT_TRUE(traced_buffers.Size() == 0);
95 }
96 
97 // Tests that the timestamp verifier is called on an ACK timestamp.
TEST(BufferListTest,TestVerifierCalledOnAck)98 TEST(BufferListTest, TestVerifierCalledOnAck) {
99   struct sock_extended_err serr;
100   serr.ee_data = 213;
101   serr.ee_info = SCM_TSTAMP_ACK;
102   struct scm_timestamping tss;
103   tss.ts[0].tv_sec = 123;
104   tss.ts[0].tv_nsec = 456;
105   TcpSetWriteTimestampsCallback(
106       [](void* arg, Timestamps* ts, absl::Status status) {
107         ASSERT_TRUE(status.ok());
108         ASSERT_NE(arg, nullptr);
109         ASSERT_EQ(ts->acked_time.time.clock_type, GPR_CLOCK_REALTIME);
110         ASSERT_EQ(ts->acked_time.time.tv_sec, 123);
111         ASSERT_EQ(ts->acked_time.time.tv_nsec, 456);
112         ASSERT_GT(ts->info.length, 0);
113         int* done = reinterpret_cast<int*>(arg);
114         *done = 1;
115       });
116   TracedBufferList traced_buffers;
117   int verifier_called = 0;
118   traced_buffers.AddNewEntry(213, 0, &verifier_called);
119   traced_buffers.ProcessTimestamp(&serr, nullptr, &tss);
120   ASSERT_EQ(verifier_called, 1);
121   ASSERT_TRUE(traced_buffers.Size() == 0);
122   traced_buffers.Shutdown(nullptr, absl::OkStatus());
123   ASSERT_TRUE(traced_buffers.Size() == 0);
124 }
125 
126 // Tests that ProcessTimestamp called after Shutdown does nothing.
TEST(BufferListTest,TestProcessTimestampAfterShutdown)127 TEST(BufferListTest, TestProcessTimestampAfterShutdown) {
128   struct sock_extended_err serr;
129   serr.ee_data = 213;
130   serr.ee_info = SCM_TSTAMP_ACK;
131   struct scm_timestamping tss;
132   tss.ts[0].tv_sec = 123;
133   tss.ts[0].tv_nsec = 456;
134   TcpSetWriteTimestampsCallback(TestShutdownFlushesListVerifier);
135   TracedBufferList traced_buffers;
136   int verifier_called = 0;
137 
138   traced_buffers.AddNewEntry(213, 0, &verifier_called);
139   ASSERT_TRUE(traced_buffers.Size() == 1);
140   traced_buffers.Shutdown(nullptr, absl::OkStatus());
141   ASSERT_TRUE(traced_buffers.Size() == 0);
142   // Check that the callback was executed after first Shutdown.
143   ASSERT_EQ(verifier_called, 1);
144   verifier_called = 0;
145   traced_buffers.Shutdown(nullptr, absl::OkStatus());
146   ASSERT_TRUE(traced_buffers.Size() == 0);
147   // Second Shutdown should not execute the callback.
148   ASSERT_EQ(verifier_called, 0);
149   traced_buffers.ProcessTimestamp(&serr, nullptr, &tss);
150   // A ProcessTimestamp after Shutdown should not execute the callback.
151   ASSERT_EQ(verifier_called, 0);
152 }
153 
TEST(BufferListTest,TestLongPendingAckForOneTracedBuffer)154 TEST(BufferListTest, TestLongPendingAckForOneTracedBuffer) {
155   constexpr int kMaxPendingAckMillis = 10000;
156   struct sock_extended_err serr[3];
157   gpr_atm verifier_called[3];
158   struct scm_timestamping tss;
159   TracedBufferList tb_list;
160   serr[0].ee_data = 1;
161   serr[0].ee_info = SCM_TSTAMP_SCHED;
162   serr[1].ee_data = 1;
163   serr[1].ee_info = SCM_TSTAMP_SND;
164   serr[2].ee_data = 1;
165   serr[2].ee_info = SCM_TSTAMP_ACK;
166   gpr_atm_rel_store(&verifier_called[0], static_cast<gpr_atm>(0));
167   gpr_atm_rel_store(&verifier_called[1], static_cast<gpr_atm>(0));
168   gpr_atm_rel_store(&verifier_called[2], static_cast<gpr_atm>(0));
169 
170   //  Add 3 traced buffers
171   tb_list.AddNewEntry(1, 0, &verifier_called[0]);
172   tb_list.AddNewEntry(2, 0, &verifier_called[1]);
173   tb_list.AddNewEntry(3, 0, &verifier_called[2]);
174 
175   AdvanceClockMillis(kMaxPendingAckMillis);
176   tss.ts[0].tv_sec = g_now.tv_sec;
177   tss.ts[0].tv_nsec = g_now.tv_nsec;
178 
179   // Process SCHED Timestamp for 1st traced buffer.
180   // Nothing should be flushed.
181   TcpSetWriteTimestampsCallback(
182       [](void*, Timestamps*, absl::Status) { ASSERT_TRUE(false); });
183   tb_list.ProcessTimestamp(&serr[0], nullptr, &tss);
184   ASSERT_EQ(tb_list.Size(), 3);
185   ASSERT_EQ(gpr_atm_acq_load(&verifier_called[0]), static_cast<gpr_atm>(0));
186   ASSERT_EQ(gpr_atm_acq_load(&verifier_called[1]), static_cast<gpr_atm>(0));
187   ASSERT_EQ(gpr_atm_acq_load(&verifier_called[2]), static_cast<gpr_atm>(0));
188 
189   AdvanceClockMillis(kMaxPendingAckMillis);
190   tss.ts[0].tv_sec = g_now.tv_sec;
191   tss.ts[0].tv_nsec = g_now.tv_nsec;
192 
193   // Process SND Timestamp for 1st traced buffer. The second and third traced
194   // buffers must be flushed because the max pending ack time would have
195   // elapsed for them.
196   TcpSetWriteTimestampsCallback([](void* arg, Timestamps*, absl::Status error) {
197     ASSERT_EQ(error, absl::DeadlineExceededError("Ack timed out"));
198     ASSERT_NE(arg, nullptr);
199     gpr_atm* done = reinterpret_cast<gpr_atm*>(arg);
200     gpr_atm_rel_store(done, static_cast<gpr_atm>(1));
201   });
202   tb_list.ProcessTimestamp(&serr[1], nullptr, &tss);
203   ASSERT_EQ(tb_list.Size(), 1);
204   ASSERT_EQ(gpr_atm_acq_load(&verifier_called[0]), static_cast<gpr_atm>(0));
205   ASSERT_EQ(gpr_atm_acq_load(&verifier_called[1]), static_cast<gpr_atm>(1));
206   ASSERT_EQ(gpr_atm_acq_load(&verifier_called[2]), static_cast<gpr_atm>(1));
207 
208   AdvanceClockMillis(kMaxPendingAckMillis);
209   tss.ts[0].tv_sec = g_now.tv_sec;
210   tss.ts[0].tv_nsec = g_now.tv_nsec;
211 
212   // Process ACK Timestamp for 1st traced buffer.
213   TcpSetWriteTimestampsCallback(
214       [](void* arg, Timestamps* ts, absl::Status error) {
215         ASSERT_TRUE(error.ok());
216         ASSERT_NE(arg, nullptr);
217         ASSERT_EQ(ts->acked_time.time.clock_type, GPR_CLOCK_REALTIME);
218         ASSERT_EQ(ts->acked_time.time.tv_sec, g_now.tv_sec);
219         ASSERT_EQ(ts->acked_time.time.tv_nsec, g_now.tv_nsec);
220         ASSERT_GT(ts->info.length, 0);
221         gpr_atm* done = reinterpret_cast<gpr_atm*>(arg);
222         gpr_atm_rel_store(done, static_cast<gpr_atm>(2));
223       });
224   tb_list.ProcessTimestamp(&serr[2], nullptr, &tss);
225   ASSERT_EQ(tb_list.Size(), 0);
226   ASSERT_EQ(gpr_atm_acq_load(&verifier_called[0]), static_cast<gpr_atm>(2));
227   ASSERT_EQ(gpr_atm_acq_load(&verifier_called[1]), static_cast<gpr_atm>(1));
228   ASSERT_EQ(gpr_atm_acq_load(&verifier_called[2]), static_cast<gpr_atm>(1));
229 
230   tb_list.Shutdown(nullptr, absl::OkStatus());
231 }
232 
TEST(BufferListTest,TestLongPendingAckForSomeTracedBuffers)233 TEST(BufferListTest, TestLongPendingAckForSomeTracedBuffers) {
234   constexpr int kNumTracedBuffers = 10;
235   constexpr int kMaxPendingAckMillis = 10000;
236   struct sock_extended_err serr[kNumTracedBuffers];
237   gpr_atm verifier_called[kNumTracedBuffers];
238   struct scm_timestamping tss;
239   tss.ts[0].tv_sec = 123;
240   tss.ts[0].tv_nsec = 456;
241   TcpSetWriteTimestampsCallback(
242       [](void* arg, Timestamps* ts, absl::Status status) {
243         ASSERT_NE(arg, nullptr);
244         if (status.ok()) {
245           ASSERT_EQ(ts->acked_time.time.clock_type, GPR_CLOCK_REALTIME);
246           ASSERT_EQ(ts->acked_time.time.tv_sec, 123);
247           ASSERT_EQ(ts->acked_time.time.tv_nsec, 456);
248           ASSERT_GT(ts->info.length, 0);
249           *(reinterpret_cast<int*>(arg)) = 1;
250         } else if (status == absl::DeadlineExceededError("Ack timed out")) {
251           *(reinterpret_cast<int*>(arg)) = 2;
252         } else {
253           ASSERT_TRUE(false);
254         }
255       });
256   TracedBufferList tb_list;
257   for (int i = 0; i < kNumTracedBuffers; i++) {
258     serr[i].ee_data = i + 1;
259     serr[i].ee_info = SCM_TSTAMP_ACK;
260     gpr_atm_rel_store(&verifier_called[i], static_cast<gpr_atm>(0));
261     tb_list.AddNewEntry(i + 1, 0, &verifier_called[i]);
262   }
263   int elapsed_time_millis = 0;
264   int increment_millis = (2 * kMaxPendingAckMillis) / 10;
265   for (int i = 0; i < kNumTracedBuffers; i++) {
266     AdvanceClockMillis(increment_millis);
267     elapsed_time_millis += increment_millis;
268     tb_list.ProcessTimestamp(&serr[i], nullptr, &tss);
269     if (elapsed_time_millis > kMaxPendingAckMillis) {
270       // MaxPendingAckMillis has elapsed. the rest of tb_list must have been
271       // flushed now.
272       ASSERT_EQ(tb_list.Size(), 0);
273       if (elapsed_time_millis - kMaxPendingAckMillis == increment_millis) {
274         // The first ProcessTimestamp just after kMaxPendingAckMillis would have
275         // still successfully processed the head traced buffer entry and then
276         // discarded all the other remaining traced buffer entries. The first
277         // traced buffer entry would have been processed because the ACK
278         // timestamp was received for it.
279         ASSERT_EQ(gpr_atm_acq_load(&verifier_called[i]),
280                   static_cast<gpr_atm>(1));
281       } else {
282         ASSERT_EQ(gpr_atm_acq_load(&verifier_called[i]),
283                   static_cast<gpr_atm>(2));
284       }
285     } else {
286       ASSERT_EQ(tb_list.Size(), kNumTracedBuffers - (i + 1));
287       ASSERT_EQ(gpr_atm_acq_load(&verifier_called[i]), static_cast<gpr_atm>(1));
288     }
289   }
290   tb_list.Shutdown(nullptr, absl::OkStatus());
291 }
292 
293 }  // namespace experimental
294 }  // namespace grpc_event_engine
295 
main(int argc,char ** argv)296 int main(int argc, char** argv) {
297   ::testing::InitGoogleTest(&argc, argv);
298   grpc_event_engine::experimental::InitGlobals();
299   return RUN_ALL_TESTS();
300 }
301 
302 #else  // GRPC_LINUX_ERRQUEUE
303 
main(int,char **)304 int main(int /*argc*/, char** /*argv*/) { return 0; }
305 
306 #endif  // GRPC_LINUX_ERRQUEUE
307