• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2022 The gRPC Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "src/core/lib/event_engine/posix_engine/traced_buffer_list.h"
16 
17 #include <grpc/support/port_platform.h>
18 #include <grpc/support/time.h>
19 #include <stddef.h>
20 #include <stdlib.h>
21 #include <string.h>
22 #include <time.h>
23 
24 #include <utility>
25 
26 #include "absl/functional/any_invocable.h"
27 #include "absl/log/log.h"
28 #include "src/core/lib/iomgr/port.h"
29 #include "src/core/util/sync.h"
30 
31 #ifdef GRPC_LINUX_ERRQUEUE
32 #include <linux/errqueue.h>  // IWYU pragma: keep
33 #include <linux/netlink.h>
34 #include <sys/socket.h>  // IWYU pragma: keep
35 
36 namespace grpc_event_engine {
37 namespace experimental {
38 
39 namespace {
40 // Fills gpr_timespec gts based on values from timespec ts.
FillGprFromTimestamp(gpr_timespec * gts,const struct timespec * ts)41 void FillGprFromTimestamp(gpr_timespec* gts, const struct timespec* ts) {
42   gts->tv_sec = ts->tv_sec;
43   gts->tv_nsec = static_cast<int32_t>(ts->tv_nsec);
44   gts->clock_type = GPR_CLOCK_REALTIME;
45 }
46 
DefaultTimestampsCallback(void *,Timestamps *,absl::Status)47 void DefaultTimestampsCallback(void* /*arg*/, Timestamps* /*ts*/,
48                                absl::Status /*shutdown_err*/) {
49   VLOG(2) << "Timestamps callback has not been registered";
50 }
51 
52 // The saved callback function that will be invoked when we get all the
53 // timestamps that we are going to get for a TracedBuffer.
54 absl::AnyInvocable<void(void*, Timestamps*, absl::Status)>
55     g_timestamps_callback =
__anonce34630c0202() 56         []() -> absl::AnyInvocable<void(void*, Timestamps*, absl::Status)> {
57   return DefaultTimestampsCallback;
58 }();
59 
// Loads a value of type T from `ptr` by copying its bytes, so the read is
// valid even when `ptr` is not suitably aligned for T. Used to pull individual
// opt stats out of a control message.
template <typename T>
T ReadUnaligned(const void* ptr) {
  T result;
  memcpy(&result, ptr, sizeof(T));
  return result;
}
68 
69 // Extracts opt stats from the tcp_info struct \a info to \a metrics
ExtractOptStatsFromTcpInfo(ConnectionMetrics * metrics,const tcp_info * info)70 void ExtractOptStatsFromTcpInfo(ConnectionMetrics* metrics,
71                                 const tcp_info* info) {
72   if (info == nullptr) {
73     return;
74   }
75   if (info->length > offsetof(tcp_info, tcpi_sndbuf_limited)) {
76     metrics->recurring_retrans = info->tcpi_retransmits;
77     metrics->is_delivery_rate_app_limited =
78         info->tcpi_delivery_rate_app_limited;
79     metrics->congestion_window = info->tcpi_snd_cwnd;
80     metrics->reordering = info->tcpi_reordering;
81     metrics->packet_retx = info->tcpi_total_retrans;
82     metrics->pacing_rate = info->tcpi_pacing_rate;
83     metrics->data_notsent = info->tcpi_notsent_bytes;
84     if (info->tcpi_min_rtt != UINT32_MAX) {
85       metrics->min_rtt = info->tcpi_min_rtt;
86     }
87     metrics->packet_sent = info->tcpi_data_segs_out;
88     metrics->delivery_rate = info->tcpi_delivery_rate;
89     metrics->busy_usec = info->tcpi_busy_time;
90     metrics->rwnd_limited_usec = info->tcpi_rwnd_limited;
91     metrics->sndbuf_limited_usec = info->tcpi_sndbuf_limited;
92   }
93   if (info->length > offsetof(tcp_info, tcpi_dsack_dups)) {
94     metrics->data_sent = info->tcpi_bytes_sent;
95     metrics->data_retx = info->tcpi_bytes_retrans;
96     metrics->packet_spurious_retx = info->tcpi_dsack_dups;
97   }
98 }
99 
100 // Extracts opt stats from the given control message \a opt_stats to the
101 // connection metrics \a metrics.
ExtractOptStatsFromCmsg(ConnectionMetrics * metrics,const cmsghdr * opt_stats)102 void ExtractOptStatsFromCmsg(ConnectionMetrics* metrics,
103                              const cmsghdr* opt_stats) {
104   if (opt_stats == nullptr) {
105     return;
106   }
107   const auto* data = CMSG_DATA(opt_stats);
108   constexpr int64_t cmsg_hdr_len = CMSG_ALIGN(sizeof(struct cmsghdr));
109   const int64_t len = opt_stats->cmsg_len - cmsg_hdr_len;
110   int64_t offset = 0;
111 
112   while (offset < len) {
113     const auto* attr = reinterpret_cast<const nlattr*>(data + offset);
114     const void* val = data + offset + NLA_HDRLEN;
115     switch (attr->nla_type) {
116       case TCP_NLA_BUSY: {
117         metrics->busy_usec = ReadUnaligned<uint64_t>(val);
118         break;
119       }
120       case TCP_NLA_RWND_LIMITED: {
121         metrics->rwnd_limited_usec = ReadUnaligned<uint64_t>(val);
122         break;
123       }
124       case TCP_NLA_SNDBUF_LIMITED: {
125         metrics->sndbuf_limited_usec = ReadUnaligned<uint64_t>(val);
126         break;
127       }
128       case TCP_NLA_PACING_RATE: {
129         metrics->pacing_rate = ReadUnaligned<uint64_t>(val);
130         break;
131       }
132       case TCP_NLA_DELIVERY_RATE: {
133         metrics->delivery_rate = ReadUnaligned<uint64_t>(val);
134         break;
135       }
136       case TCP_NLA_DELIVERY_RATE_APP_LMT: {
137         metrics->is_delivery_rate_app_limited = ReadUnaligned<uint8_t>(val);
138         break;
139       }
140       case TCP_NLA_SND_CWND: {
141         metrics->congestion_window = ReadUnaligned<uint32_t>(val);
142         break;
143       }
144       case TCP_NLA_MIN_RTT: {
145         metrics->min_rtt = ReadUnaligned<uint32_t>(val);
146         break;
147       }
148       case TCP_NLA_SRTT: {
149         metrics->srtt = ReadUnaligned<uint32_t>(val);
150         break;
151       }
152       case TCP_NLA_RECUR_RETRANS: {
153         metrics->recurring_retrans = ReadUnaligned<uint8_t>(val);
154         break;
155       }
156       case TCP_NLA_BYTES_SENT: {
157         metrics->data_sent = ReadUnaligned<uint64_t>(val);
158         break;
159       }
160       case TCP_NLA_DATA_SEGS_OUT: {
161         metrics->packet_sent = ReadUnaligned<uint64_t>(val);
162         break;
163       }
164       case TCP_NLA_TOTAL_RETRANS: {
165         metrics->packet_retx = ReadUnaligned<uint64_t>(val);
166         break;
167       }
168       case TCP_NLA_DELIVERED: {
169         metrics->packet_delivered = ReadUnaligned<uint32_t>(val);
170         break;
171       }
172       case TCP_NLA_DELIVERED_CE: {
173         metrics->packet_delivered_ce = ReadUnaligned<uint32_t>(val);
174         break;
175       }
176       case TCP_NLA_BYTES_RETRANS: {
177         metrics->data_retx = ReadUnaligned<uint64_t>(val);
178         break;
179       }
180       case TCP_NLA_DSACK_DUPS: {
181         metrics->packet_spurious_retx = ReadUnaligned<uint32_t>(val);
182         break;
183       }
184       case TCP_NLA_REORDERING: {
185         metrics->reordering = ReadUnaligned<uint32_t>(val);
186         break;
187       }
188       case TCP_NLA_SND_SSTHRESH: {
189         metrics->snd_ssthresh = ReadUnaligned<uint32_t>(val);
190         break;
191       }
192     }
193     offset += NLA_ALIGN(attr->nla_len);
194   }
195 }
196 }  // namespace.
197 
Finished(gpr_timespec ts)198 bool TracedBufferList::TracedBuffer::Finished(gpr_timespec ts) {
199   constexpr int kGrpcMaxPendingAckTimeMillis = 10000;
200   return gpr_time_to_millis(gpr_time_sub(ts, last_timestamp_)) >
201          kGrpcMaxPendingAckTimeMillis;
202 }
203 
AddNewEntry(int32_t seq_no,int fd,void * arg)204 void TracedBufferList::AddNewEntry(int32_t seq_no, int fd, void* arg) {
205   TracedBuffer* new_elem = new TracedBuffer(seq_no, arg);
206   // Store the current time as the sendmsg time.
207   new_elem->ts_.sendmsg_time.time = gpr_now(GPR_CLOCK_REALTIME);
208   new_elem->ts_.scheduled_time.time = gpr_inf_past(GPR_CLOCK_REALTIME);
209   new_elem->ts_.sent_time.time = gpr_inf_past(GPR_CLOCK_REALTIME);
210   new_elem->ts_.acked_time.time = gpr_inf_past(GPR_CLOCK_REALTIME);
211   if (GetSocketTcpInfo(&(new_elem->ts_.info), fd) == 0) {
212     ExtractOptStatsFromTcpInfo(&(new_elem->ts_.sendmsg_time.metrics),
213                                &(new_elem->ts_.info));
214   }
215   new_elem->last_timestamp_ = new_elem->ts_.sendmsg_time.time;
216   grpc_core::MutexLock lock(&mu_);
217   if (!head_) {
218     head_ = tail_ = new_elem;
219   } else {
220     tail_->next_ = new_elem;
221     tail_ = new_elem;
222   }
223 }
224 
ProcessTimestamp(struct sock_extended_err * serr,struct cmsghdr * opt_stats,struct scm_timestamping * tss)225 void TracedBufferList::ProcessTimestamp(struct sock_extended_err* serr,
226                                         struct cmsghdr* opt_stats,
227                                         struct scm_timestamping* tss) {
228   grpc_core::MutexLock lock(&mu_);
229   TracedBuffer* elem = head_;
230   TracedBuffer* prev = nullptr;
231   while (elem != nullptr) {
232     // The byte number refers to the sequence number of the last byte which this
233     // timestamp relates to.
234     if (serr->ee_data >= elem->seq_no_) {
235       switch (serr->ee_info) {
236         case SCM_TSTAMP_SCHED:
237           FillGprFromTimestamp(&(elem->ts_.scheduled_time.time), &(tss->ts[0]));
238           ExtractOptStatsFromCmsg(&(elem->ts_.scheduled_time.metrics),
239                                   opt_stats);
240           elem->last_timestamp_ = elem->ts_.scheduled_time.time;
241           elem = elem->next_;
242           break;
243         case SCM_TSTAMP_SND:
244           FillGprFromTimestamp(&(elem->ts_.sent_time.time), &(tss->ts[0]));
245           ExtractOptStatsFromCmsg(&(elem->ts_.sent_time.metrics), opt_stats);
246           elem->last_timestamp_ = elem->ts_.sent_time.time;
247           elem = elem->next_;
248           break;
249         case SCM_TSTAMP_ACK:
250           FillGprFromTimestamp(&(elem->ts_.acked_time.time), &(tss->ts[0]));
251           ExtractOptStatsFromCmsg(&(elem->ts_.acked_time.metrics), opt_stats);
252           // Got all timestamps. Do the callback and free this TracedBuffer. The
253           // thing below can be passed by value if we don't want the restriction
254           // on the lifetime.
255           g_timestamps_callback(elem->arg_, &(elem->ts_), absl::OkStatus());
256           // Safe to update head_ to elem->next_ because the list is ordered by
257           // seq_no. Thus if elem is to be deleted, it has to be the first
258           // element in the list.
259           head_ = elem->next_;
260           delete elem;
261           elem = head_;
262           break;
263         default:
264           abort();
265       }
266     } else {
267       break;
268     }
269   }
270 
271   elem = head_;
272   gpr_timespec now = gpr_now(GPR_CLOCK_REALTIME);
273   while (elem != nullptr) {
274     if (!elem->Finished(now)) {
275       prev = elem;
276       elem = elem->next_;
277       continue;
278     }
279     g_timestamps_callback(elem->arg_, &(elem->ts_),
280                           absl::DeadlineExceededError("Ack timed out"));
281     if (prev != nullptr) {
282       prev->next_ = elem->next_;
283       delete elem;
284       elem = prev->next_;
285     } else {
286       head_ = elem->next_;
287       delete elem;
288       elem = head_;
289     }
290   }
291   tail_ = (head_ == nullptr) ? head_ : prev;
292 }
293 
Shutdown(void * remaining,absl::Status shutdown_err)294 void TracedBufferList::Shutdown(void* remaining, absl::Status shutdown_err) {
295   grpc_core::MutexLock lock(&mu_);
296   while (head_) {
297     TracedBuffer* elem = head_;
298     g_timestamps_callback(elem->arg_, &(elem->ts_), shutdown_err);
299     head_ = head_->next_;
300     delete elem;
301   }
302   if (remaining != nullptr) {
303     g_timestamps_callback(remaining, nullptr, shutdown_err);
304   }
305   tail_ = head_;
306 }
307 
TcpSetWriteTimestampsCallback(absl::AnyInvocable<void (void *,Timestamps *,absl::Status)> fn)308 void TcpSetWriteTimestampsCallback(
309     absl::AnyInvocable<void(void*, Timestamps*, absl::Status)> fn) {
310   g_timestamps_callback = std::move(fn);
311 }
312 
313 }  // namespace experimental
314 }  // namespace grpc_event_engine
315 
316 #else  // GRPC_LINUX_ERRQUEUE
317 
318 #include "src/core/util/crash.h"
319 
320 namespace grpc_event_engine {
321 namespace experimental {
322 
TcpSetWriteTimestampsCallback(absl::AnyInvocable<void (void *,Timestamps *,absl::Status)>)323 void TcpSetWriteTimestampsCallback(
324     absl::AnyInvocable<void(void*, Timestamps*, absl::Status)> /*fn*/) {
325   grpc_core::Crash("Timestamps callback is not enabled for this platform");
326 }
327 
328 }  // namespace experimental
329 }  // namespace grpc_event_engine
330 
331 #endif  // GRPC_LINUX_ERRQUEUE
332