1 // Copyright 2022 The gRPC Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include "src/core/lib/event_engine/posix_engine/traced_buffer_list.h"
16
17 #include <grpc/support/port_platform.h>
18 #include <grpc/support/time.h>
19 #include <stddef.h>
20 #include <stdlib.h>
21 #include <string.h>
22 #include <time.h>
23
24 #include <utility>
25
26 #include "absl/functional/any_invocable.h"
27 #include "absl/log/log.h"
28 #include "src/core/lib/iomgr/port.h"
29 #include "src/core/util/sync.h"
30
31 #ifdef GRPC_LINUX_ERRQUEUE
32 #include <linux/errqueue.h> // IWYU pragma: keep
33 #include <linux/netlink.h>
34 #include <sys/socket.h> // IWYU pragma: keep
35
36 namespace grpc_event_engine {
37 namespace experimental {
38
39 namespace {
40 // Fills gpr_timespec gts based on values from timespec ts.
FillGprFromTimestamp(gpr_timespec * gts,const struct timespec * ts)41 void FillGprFromTimestamp(gpr_timespec* gts, const struct timespec* ts) {
42 gts->tv_sec = ts->tv_sec;
43 gts->tv_nsec = static_cast<int32_t>(ts->tv_nsec);
44 gts->clock_type = GPR_CLOCK_REALTIME;
45 }
46
// Placeholder callback used until the application installs one via
// TcpSetWriteTimestampsCallback(); it only logs (at verbose level) that
// incoming timestamps are being dropped.
void DefaultTimestampsCallback(void* /*arg*/, Timestamps* /*ts*/,
                               absl::Status /*shutdown_err*/) {
  VLOG(2) << "Timestamps callback has not been registered";
}
51
52 // The saved callback function that will be invoked when we get all the
53 // timestamps that we are going to get for a TracedBuffer.
54 absl::AnyInvocable<void(void*, Timestamps*, absl::Status)>
55 g_timestamps_callback =
__anonce34630c0202() 56 []() -> absl::AnyInvocable<void(void*, Timestamps*, absl::Status)> {
57 return DefaultTimestampsCallback;
58 }();
59
// Copies the bytes at `ptr` into a properly-aligned local of type T and
// returns it. Individual opt stats inside a cmsg payload are not guaranteed
// to be aligned, so dereferencing them directly would be undefined behavior.
template <typename T>
T ReadUnaligned(const void* ptr) {
  T result;
  memcpy(&result, ptr, sizeof(result));
  return result;
}
68
// Extracts opt stats from the tcp_info struct \a info to \a metrics.
// `info->length` is the number of bytes the kernel actually populated
// (older kernels return a shorter struct), so each group of fields is only
// read when the reported length shows the kernel filled it in.
void ExtractOptStatsFromTcpInfo(ConnectionMetrics* metrics,
                                const tcp_info* info) {
  if (info == nullptr) {
    return;
  }
  // Fields up to (but not including) tcpi_sndbuf_limited.
  if (info->length > offsetof(tcp_info, tcpi_sndbuf_limited)) {
    metrics->recurring_retrans = info->tcpi_retransmits;
    metrics->is_delivery_rate_app_limited =
        info->tcpi_delivery_rate_app_limited;
    metrics->congestion_window = info->tcpi_snd_cwnd;
    metrics->reordering = info->tcpi_reordering;
    metrics->packet_retx = info->tcpi_total_retrans;
    metrics->pacing_rate = info->tcpi_pacing_rate;
    metrics->data_notsent = info->tcpi_notsent_bytes;
    // UINT32_MAX is the kernel's sentinel for "min RTT unknown".
    if (info->tcpi_min_rtt != UINT32_MAX) {
      metrics->min_rtt = info->tcpi_min_rtt;
    }
    metrics->packet_sent = info->tcpi_data_segs_out;
    metrics->delivery_rate = info->tcpi_delivery_rate;
    metrics->busy_usec = info->tcpi_busy_time;
    metrics->rwnd_limited_usec = info->tcpi_rwnd_limited;
    metrics->sndbuf_limited_usec = info->tcpi_sndbuf_limited;
  }
  // Newer-kernel fields up to (but not including) tcpi_dsack_dups.
  if (info->length > offsetof(tcp_info, tcpi_dsack_dups)) {
    metrics->data_sent = info->tcpi_bytes_sent;
    metrics->data_retx = info->tcpi_bytes_retrans;
    metrics->packet_spurious_retx = info->tcpi_dsack_dups;
  }
}
99
// Extracts opt stats from the given control message \a opt_stats to the
// connection metrics \a metrics. The cmsg payload is a sequence of netlink
// attributes (nlattr TLVs); each attribute's payload is read with
// ReadUnaligned because attributes carry no alignment guarantee. Attribute
// types not listed in the switch are silently skipped.
void ExtractOptStatsFromCmsg(ConnectionMetrics* metrics,
                             const cmsghdr* opt_stats) {
  if (opt_stats == nullptr) {
    return;
  }
  const auto* data = CMSG_DATA(opt_stats);
  // cmsg_len includes the (aligned) header; subtract it to get payload size.
  constexpr int64_t cmsg_hdr_len = CMSG_ALIGN(sizeof(struct cmsghdr));
  const int64_t len = opt_stats->cmsg_len - cmsg_hdr_len;
  int64_t offset = 0;

  // Walk the TLV sequence: header at `offset`, value NLA_HDRLEN bytes in,
  // next attribute at the NLA_ALIGN'ed end of this one.
  while (offset < len) {
    const auto* attr = reinterpret_cast<const nlattr*>(data + offset);
    const void* val = data + offset + NLA_HDRLEN;
    switch (attr->nla_type) {
      case TCP_NLA_BUSY: {
        metrics->busy_usec = ReadUnaligned<uint64_t>(val);
        break;
      }
      case TCP_NLA_RWND_LIMITED: {
        metrics->rwnd_limited_usec = ReadUnaligned<uint64_t>(val);
        break;
      }
      case TCP_NLA_SNDBUF_LIMITED: {
        metrics->sndbuf_limited_usec = ReadUnaligned<uint64_t>(val);
        break;
      }
      case TCP_NLA_PACING_RATE: {
        metrics->pacing_rate = ReadUnaligned<uint64_t>(val);
        break;
      }
      case TCP_NLA_DELIVERY_RATE: {
        metrics->delivery_rate = ReadUnaligned<uint64_t>(val);
        break;
      }
      case TCP_NLA_DELIVERY_RATE_APP_LMT: {
        metrics->is_delivery_rate_app_limited = ReadUnaligned<uint8_t>(val);
        break;
      }
      case TCP_NLA_SND_CWND: {
        metrics->congestion_window = ReadUnaligned<uint32_t>(val);
        break;
      }
      case TCP_NLA_MIN_RTT: {
        metrics->min_rtt = ReadUnaligned<uint32_t>(val);
        break;
      }
      case TCP_NLA_SRTT: {
        metrics->srtt = ReadUnaligned<uint32_t>(val);
        break;
      }
      case TCP_NLA_RECUR_RETRANS: {
        metrics->recurring_retrans = ReadUnaligned<uint8_t>(val);
        break;
      }
      case TCP_NLA_BYTES_SENT: {
        metrics->data_sent = ReadUnaligned<uint64_t>(val);
        break;
      }
      case TCP_NLA_DATA_SEGS_OUT: {
        metrics->packet_sent = ReadUnaligned<uint64_t>(val);
        break;
      }
      case TCP_NLA_TOTAL_RETRANS: {
        metrics->packet_retx = ReadUnaligned<uint64_t>(val);
        break;
      }
      case TCP_NLA_DELIVERED: {
        metrics->packet_delivered = ReadUnaligned<uint32_t>(val);
        break;
      }
      case TCP_NLA_DELIVERED_CE: {
        metrics->packet_delivered_ce = ReadUnaligned<uint32_t>(val);
        break;
      }
      case TCP_NLA_BYTES_RETRANS: {
        metrics->data_retx = ReadUnaligned<uint64_t>(val);
        break;
      }
      case TCP_NLA_DSACK_DUPS: {
        metrics->packet_spurious_retx = ReadUnaligned<uint32_t>(val);
        break;
      }
      case TCP_NLA_REORDERING: {
        metrics->reordering = ReadUnaligned<uint32_t>(val);
        break;
      }
      case TCP_NLA_SND_SSTHRESH: {
        metrics->snd_ssthresh = ReadUnaligned<uint32_t>(val);
        break;
      }
    }
    offset += NLA_ALIGN(attr->nla_len);
  }
}
196 } // namespace.
197
Finished(gpr_timespec ts)198 bool TracedBufferList::TracedBuffer::Finished(gpr_timespec ts) {
199 constexpr int kGrpcMaxPendingAckTimeMillis = 10000;
200 return gpr_time_to_millis(gpr_time_sub(ts, last_timestamp_)) >
201 kGrpcMaxPendingAckTimeMillis;
202 }
203
AddNewEntry(int32_t seq_no,int fd,void * arg)204 void TracedBufferList::AddNewEntry(int32_t seq_no, int fd, void* arg) {
205 TracedBuffer* new_elem = new TracedBuffer(seq_no, arg);
206 // Store the current time as the sendmsg time.
207 new_elem->ts_.sendmsg_time.time = gpr_now(GPR_CLOCK_REALTIME);
208 new_elem->ts_.scheduled_time.time = gpr_inf_past(GPR_CLOCK_REALTIME);
209 new_elem->ts_.sent_time.time = gpr_inf_past(GPR_CLOCK_REALTIME);
210 new_elem->ts_.acked_time.time = gpr_inf_past(GPR_CLOCK_REALTIME);
211 if (GetSocketTcpInfo(&(new_elem->ts_.info), fd) == 0) {
212 ExtractOptStatsFromTcpInfo(&(new_elem->ts_.sendmsg_time.metrics),
213 &(new_elem->ts_.info));
214 }
215 new_elem->last_timestamp_ = new_elem->ts_.sendmsg_time.time;
216 grpc_core::MutexLock lock(&mu_);
217 if (!head_) {
218 head_ = tail_ = new_elem;
219 } else {
220 tail_->next_ = new_elem;
221 tail_ = new_elem;
222 }
223 }
224
// Applies one timestamp control message from the socket error queue to the
// matching entries, then sweeps the list for entries whose final (ACK)
// timestamp never arrived within the deadline and reports them as errors.
void TracedBufferList::ProcessTimestamp(struct sock_extended_err* serr,
                                        struct cmsghdr* opt_stats,
                                        struct scm_timestamping* tss) {
  grpc_core::MutexLock lock(&mu_);
  TracedBuffer* elem = head_;
  TracedBuffer* prev = nullptr;
  while (elem != nullptr) {
    // The byte number refers to the sequence number of the last byte which this
    // timestamp relates to.
    if (serr->ee_data >= elem->seq_no_) {
      switch (serr->ee_info) {
        case SCM_TSTAMP_SCHED:
          FillGprFromTimestamp(&(elem->ts_.scheduled_time.time), &(tss->ts[0]));
          ExtractOptStatsFromCmsg(&(elem->ts_.scheduled_time.metrics),
                                  opt_stats);
          elem->last_timestamp_ = elem->ts_.scheduled_time.time;
          elem = elem->next_;
          break;
        case SCM_TSTAMP_SND:
          FillGprFromTimestamp(&(elem->ts_.sent_time.time), &(tss->ts[0]));
          ExtractOptStatsFromCmsg(&(elem->ts_.sent_time.metrics), opt_stats);
          elem->last_timestamp_ = elem->ts_.sent_time.time;
          elem = elem->next_;
          break;
        case SCM_TSTAMP_ACK:
          FillGprFromTimestamp(&(elem->ts_.acked_time.time), &(tss->ts[0]));
          ExtractOptStatsFromCmsg(&(elem->ts_.acked_time.metrics), opt_stats);
          // Got all timestamps. Do the callback and free this TracedBuffer. The
          // thing below can be passed by value if we don't want the restriction
          // on the lifetime.
          g_timestamps_callback(elem->arg_, &(elem->ts_), absl::OkStatus());
          // Safe to update head_ to elem->next_ because the list is ordered by
          // seq_no. Thus if elem is to be deleted, it has to be the first
          // element in the list.
          head_ = elem->next_;
          delete elem;
          elem = head_;
          break;
        default:
          abort();
      }
    } else {
      break;
    }
  }

  // Timeout sweep: unlink and report any entry whose last timestamp update
  // is older than the pending-ACK deadline (see TracedBuffer::Finished).
  elem = head_;
  gpr_timespec now = gpr_now(GPR_CLOCK_REALTIME);
  while (elem != nullptr) {
    if (!elem->Finished(now)) {
      prev = elem;
      elem = elem->next_;
      continue;
    }
    g_timestamps_callback(elem->arg_, &(elem->ts_),
                          absl::DeadlineExceededError("Ack timed out"));
    // Unlink: splice around `elem` when it has a predecessor, otherwise
    // advance head_.
    if (prev != nullptr) {
      prev->next_ = elem->next_;
      delete elem;
      elem = prev->next_;
    } else {
      head_ = elem->next_;
      delete elem;
      elem = head_;
    }
  }
  // After the sweep `prev` is the last surviving element (nullptr when the
  // list emptied), so it becomes the new tail.
  tail_ = (head_ == nullptr) ? head_ : prev;
}
293
Shutdown(void * remaining,absl::Status shutdown_err)294 void TracedBufferList::Shutdown(void* remaining, absl::Status shutdown_err) {
295 grpc_core::MutexLock lock(&mu_);
296 while (head_) {
297 TracedBuffer* elem = head_;
298 g_timestamps_callback(elem->arg_, &(elem->ts_), shutdown_err);
299 head_ = head_->next_;
300 delete elem;
301 }
302 if (remaining != nullptr) {
303 g_timestamps_callback(remaining, nullptr, shutdown_err);
304 }
305 tail_ = head_;
306 }
307
// Installs the process-wide callback invoked when a TracedBuffer has
// collected all of its timestamps (or failed). NOTE(review):
// g_timestamps_callback is written here without synchronization — presumably
// this is expected to run during setup, before timestamps flow; confirm.
void TcpSetWriteTimestampsCallback(
    absl::AnyInvocable<void(void*, Timestamps*, absl::Status)> fn) {
  g_timestamps_callback = std::move(fn);
}
312
313 } // namespace experimental
314 } // namespace grpc_event_engine
315
316 #else // GRPC_LINUX_ERRQUEUE
317
318 #include "src/core/util/crash.h"
319
320 namespace grpc_event_engine {
321 namespace experimental {
322
// Write-timestamp tracing requires Linux error-queue support
// (GRPC_LINUX_ERRQUEUE); on every other platform registering a callback is a
// hard error rather than a silent no-op.
void TcpSetWriteTimestampsCallback(
    absl::AnyInvocable<void(void*, Timestamps*, absl::Status)> /*fn*/) {
  grpc_core::Crash("Timestamps callback is not enabled for this platform");
}
327
328 } // namespace experimental
329 } // namespace grpc_event_engine
330
331 #endif // GRPC_LINUX_ERRQUEUE
332