• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2022 The gRPC Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef GRPC_SRC_CORE_LIB_EVENT_ENGINE_POSIX_ENGINE_TRACED_BUFFER_LIST_H
16 #define GRPC_SRC_CORE_LIB_EVENT_ENGINE_POSIX_ENGINE_TRACED_BUFFER_LIST_H
17 
18 #include <grpc/support/port_platform.h>
19 #include <grpc/support/time.h>
20 #include <stdint.h>
21 
22 #include "absl/functional/any_invocable.h"
23 #include "absl/status/status.h"
24 #include "absl/types/optional.h"
25 #include "src/core/lib/event_engine/posix_engine/internal_errqueue.h"
26 #include "src/core/lib/iomgr/port.h"
27 #include "src/core/util/sync.h"
28 
29 namespace grpc_event_engine {
30 namespace experimental {
31 
32 struct ConnectionMetrics {  // Delivery rate in Bytes/s.
33   absl::optional<uint64_t> delivery_rate;
34   // If the delivery rate is limited by the application, this is set to true.
35   absl::optional<bool> is_delivery_rate_app_limited;
36   // Total packets retransmitted.
37   absl::optional<uint32_t> packet_retx;
38   // Total packets retransmitted spuriously. This metric is smaller than or
39   // equal to packet_retx.
40   absl::optional<uint32_t> packet_spurious_retx;
41   // Total packets sent.
42   absl::optional<uint32_t> packet_sent;
43   // Total packets delivered.
44   absl::optional<uint32_t> packet_delivered;
45   // Total packets delivered with ECE marked. This metric is smaller than or
46   // equal to packet_delivered.
47   absl::optional<uint32_t> packet_delivered_ce;
48   // Total bytes lost so far.
49   absl::optional<uint64_t> data_retx;
50   // Total bytes sent so far.
51   absl::optional<uint64_t> data_sent;
52   // Total bytes in write queue but not sent.
53   absl::optional<uint64_t> data_notsent;
54   // Pacing rate of the connection in Bps
55   absl::optional<uint64_t> pacing_rate;
56   // Minimum RTT observed in usec.
57   absl::optional<uint32_t> min_rtt;
58   // Smoothed RTT in usec
59   absl::optional<uint32_t> srtt;
60   // Send congestion window.
61   absl::optional<uint32_t> congestion_window;
62   // Slow start threshold in packets.
63   absl::optional<uint32_t> snd_ssthresh;
64   // Maximum degree of reordering (i.e., maximum number of packets reodered)
65   // on the connection.
66   absl::optional<uint32_t> reordering;
67   // Represents the number of recurring retransmissions of the first sequence
68   // that is not acknowledged yet.
69   absl::optional<uint8_t> recurring_retrans;
70   // The cumulative time (in usec) that the transport protocol was busy
71   // sending data.
72   absl::optional<uint64_t> busy_usec;
73   // The cumulative time (in usec) that the transport protocol was limited by
74   // the receive window size.
75   absl::optional<uint64_t> rwnd_limited_usec;
76   // The cumulative time (in usec) that the transport protocol was limited by
77   // the send buffer size.
78   absl::optional<uint64_t> sndbuf_limited_usec;
79 };
80 
81 struct BufferTimestamp {
82   gpr_timespec time;
83   ConnectionMetrics metrics;  // Metrics collected with this timestamp
84 };
85 
86 struct Timestamps {
87   BufferTimestamp sendmsg_time;
88   BufferTimestamp scheduled_time;
89   BufferTimestamp sent_time;
90   BufferTimestamp acked_time;
91 
92   uint32_t byte_offset;  // byte offset relative to the start of the RPC
93 
94 #ifdef GRPC_LINUX_ERRQUEUE
95   tcp_info info;  // tcp_info collected on sendmsg
96 #endif            // GRPC_LINUX_ERRQUEUE
97 };
98 
// TracedBuffer is a class to keep track of timestamps for a specific buffer in
// the TCP layer. We are only tracking timestamps for Linux kernels and hence
// this class would only be used by Linux platforms. For all other platforms,
// TracedBufferList is a no-op class.
// The timestamps collected are according to Timestamps declared above. A
// TracedBuffer list is kept track of using the head element of the list. If
// the head element of the list is nullptr, then the list is empty.
106 #ifdef GRPC_LINUX_ERRQUEUE
107 
108 class TracedBufferList {
109  public:
110   TracedBufferList() = default;
111   ~TracedBufferList() = default;
112   // Add a new entry in the TracedBuffer list pointed to by head. Also saves
113   // sendmsg_time with the current timestamp.
114   void AddNewEntry(int32_t seq_no, int fd, void* arg);
115   // Processes a received timestamp based on sock_extended_err and
116   // scm_timestamping structures. It will invoke the timestamps callback if the
117   // timestamp type is SCM_TSTAMP_ACK.
118   void ProcessTimestamp(struct sock_extended_err* serr,
119                         struct cmsghdr* opt_stats,
120                         struct scm_timestamping* tss);
121   // The Size() operation is slow and is used only in tests.
Size()122   int Size() {
123     grpc_core::MutexLock lock(&mu_);
124     int size = 0;
125     TracedBuffer* curr = head_;
126     while (curr) {
127       ++size;
128       curr = curr->next_;
129     }
130     return size;
131   }
132   // Cleans the list by calling the callback for each traced buffer in the list
133   // with timestamps that it has.
134   void Shutdown(void* /*remaining*/, absl::Status /*shutdown_err*/);
135 
136  private:
137   class TracedBuffer {
138    public:
TracedBuffer(uint32_t seq_no,void * arg)139     TracedBuffer(uint32_t seq_no, void* arg) : seq_no_(seq_no), arg_(arg) {}
140     // Returns true if the TracedBuffer is considered stale at the given
141     // timestamp.
142     bool Finished(gpr_timespec ts);
143 
144    private:
145     friend class TracedBufferList;
146     gpr_timespec last_timestamp_;
147     TracedBuffer* next_ = nullptr;
148     uint32_t seq_no_;  // The sequence number for the last byte in the buffer
149     void* arg_;        // The arg to pass to timestamps_callback
150     Timestamps ts_;    // The timestamps corresponding to this buffer
151   };
152   grpc_core::Mutex mu_;
153   // TracedBuffers are ordered by sequence number and would need to be processed
154   // in a FIFO order starting with the smallest sequence number. To enable this,
155   // they are stored in a singly linked with head and tail pointers which allows
156   // easy appends and forward iteration operations.
157   TracedBuffer* head_ = nullptr;
158   TracedBuffer* tail_ = nullptr;
159 };
160 
161 #else   // GRPC_LINUX_ERRQUEUE
162 // TracedBufferList implementation is a no-op for this platform.
163 class TracedBufferList {
164  public:
AddNewEntry(int32_t,int,void *)165   void AddNewEntry(int32_t /*seq_no*/, int /*fd*/, void* /*arg*/) {}
ProcessTimestamp(struct sock_extended_err *,struct cmsghdr *,struct scm_timestamping *)166   void ProcessTimestamp(struct sock_extended_err* /*serr*/,
167                         struct cmsghdr* /*opt_stats*/,
168                         struct scm_timestamping* /*tss*/) {}
Size()169   int Size() { return 0; }
Shutdown(void *,absl::Status)170   void Shutdown(void* /*remaining*/, absl::Status /*shutdown_err*/) {}
171 };
172 #endif  // GRPC_LINUX_ERRQUEUE
173 
174 // Sets the callback function to call when timestamps for a write are collected.
175 // This is expected to be called atmost once.
176 void TcpSetWriteTimestampsCallback(
177     absl::AnyInvocable<void(void*, Timestamps*, absl::Status)>);
178 
179 }  // namespace experimental
180 }  // namespace grpc_event_engine
181 
182 #endif  // GRPC_SRC_CORE_LIB_EVENT_ENGINE_POSIX_ENGINE_TRACED_BUFFER_LIST_H
183