• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2022 gRPC Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 #include "src/core/lib/event_engine/posix_engine/posix_endpoint.h"
15 
16 #include <errno.h>
17 #include <grpc/event_engine/internal/slice_cast.h>
18 #include <grpc/event_engine/slice.h>
19 #include <grpc/event_engine/slice_buffer.h>
20 #include <grpc/status.h>
21 #include <grpc/support/port_platform.h>
22 #include <inttypes.h>
23 #include <limits.h>
24 
25 #include <algorithm>
26 #include <cctype>
27 #include <cstdint>
28 #include <cstdlib>
29 #include <memory>
30 #include <string>
31 #include <type_traits>
32 
33 #include "absl/functional/any_invocable.h"
34 #include "absl/log/check.h"
35 #include "absl/log/log.h"
36 #include "absl/status/status.h"
37 #include "absl/status/statusor.h"
38 #include "absl/strings/str_cat.h"
39 #include "absl/types/optional.h"
40 #include "src/core/lib/debug/trace.h"
41 #include "src/core/lib/event_engine/posix_engine/event_poller.h"
42 #include "src/core/lib/event_engine/posix_engine/internal_errqueue.h"
43 #include "src/core/lib/event_engine/posix_engine/tcp_socket_utils.h"
44 #include "src/core/lib/event_engine/tcp_socket_utils.h"
45 #include "src/core/lib/experiments/experiments.h"
46 #include "src/core/lib/iomgr/exec_ctx.h"
47 #include "src/core/lib/resource_quota/resource_quota.h"
48 #include "src/core/lib/slice/slice.h"
49 #include "src/core/telemetry/stats.h"
50 #include "src/core/util/debug_location.h"
51 #include "src/core/util/load_file.h"
52 #include "src/core/util/ref_counted_ptr.h"
53 #include "src/core/util/status_helper.h"
54 #include "src/core/util/strerror.h"
55 #include "src/core/util/sync.h"
56 #include "src/core/util/time.h"
57 
58 #ifdef GRPC_POSIX_SOCKET_TCP
59 #ifdef GRPC_LINUX_ERRQUEUE
60 #include <dirent.h>            // IWYU pragma: keep
61 #include <linux/capability.h>  // IWYU pragma: keep
62 #include <linux/errqueue.h>    // IWYU pragma: keep
63 #include <linux/netlink.h>     // IWYU pragma: keep
64 #include <sys/prctl.h>         // IWYU pragma: keep
65 #include <sys/resource.h>      // IWYU pragma: keep
66 #endif
67 #include <netinet/in.h>  // IWYU pragma: keep
68 
69 #ifndef SOL_TCP
70 #define SOL_TCP IPPROTO_TCP
71 #endif
72 
73 #ifndef TCP_INQ
74 #define TCP_INQ 36
75 #define TCP_CM_INQ TCP_INQ
76 #endif
77 
78 #ifdef GRPC_HAVE_MSG_NOSIGNAL
79 #define SENDMSG_FLAGS MSG_NOSIGNAL
80 #else
81 #define SENDMSG_FLAGS 0
82 #endif
83 
84 // TCP zero copy sendmsg flag.
85 // NB: We define this here as a fallback in case we're using an older set of
86 // library headers that has not defined MSG_ZEROCOPY. Since this constant is
87 // part of the kernel, we are guaranteed it will never change/disagree so
88 // defining it here is safe.
89 #ifndef MSG_ZEROCOPY
90 #define MSG_ZEROCOPY 0x4000000
91 #endif
92 
93 #define MAX_READ_IOVEC 64
94 
95 namespace grpc_event_engine {
96 namespace experimental {
97 
98 namespace {
99 
100 // A wrapper around sendmsg. It sends \a msg over \a fd and returns the number
101 // of bytes sent.
TcpSend(int fd,const struct msghdr * msg,int * saved_errno,int additional_flags=0)102 ssize_t TcpSend(int fd, const struct msghdr* msg, int* saved_errno,
103                 int additional_flags = 0) {
104   GRPC_LATENT_SEE_PARENT_SCOPE("TcpSend");
105   ssize_t sent_length;
106   do {
107     grpc_core::global_stats().IncrementSyscallWrite();
108     sent_length = sendmsg(fd, msg, SENDMSG_FLAGS | additional_flags);
109   } while (sent_length < 0 && (*saved_errno = errno) == EINTR);
110   return sent_length;
111 }
112 
113 #ifdef GRPC_LINUX_ERRQUEUE
114 
115 #define CAP_IS_SUPPORTED(cap) (prctl(PR_CAPBSET_READ, (cap), 0) > 0)
116 
// Strips trailing whitespace (as classified by std::isspace, so spaces, tabs
// and newlines alike) from \a s in place.
void rtrim(std::string& s) {
  std::string::size_type keep = s.size();
  while (keep > 0 &&
         std::isspace(static_cast<unsigned char>(s[keep - 1])) != 0) {
    --keep;
  }
  s.resize(keep);
}
124 
ParseUlimitMemLockFromFile(std::string file_name)125 uint64_t ParseUlimitMemLockFromFile(std::string file_name) {
126   static std::string kHardMemlockPrefix = "* hard memlock";
127   auto result = grpc_core::LoadFile(file_name, false);
128   if (!result.ok()) {
129     return 0;
130   }
131   std::string file_contents(reinterpret_cast<const char*>((*result).begin()),
132                             (*result).length());
133   // Find start position containing prefix.
134   size_t start = file_contents.find(kHardMemlockPrefix);
135   if (start == std::string::npos) {
136     return 0;
137   }
138   // Find position of next newline after prefix.
139   size_t end = file_contents.find(start, '\n');
140   // Extract substring between prefix and next newline.
141   auto memlock_value_string = file_contents.substr(
142       start + kHardMemlockPrefix.length() + 1, end - start);
143   rtrim(memlock_value_string);
144   if (memlock_value_string == "unlimited" ||
145       memlock_value_string == "infinity") {
146     return UINT64_MAX;
147   } else {
148     return std::atoi(memlock_value_string.c_str());
149   }
150 }
151 
152 // Ulimit hard memlock controls per socket limit for maximum locked memory in
153 // RAM. Parses all files under  /etc/security/limits.d/ and
154 // /etc/security/limits.conf file for a line of the following format:
155 // * hard memlock <value>
156 // It extracts the first valid <value> and returns it. A value of UINT64_MAX
157 // represents unlimited or infinity. Hard memlock value should be set to
158 // allow zerocopy sendmsgs to succeed. It controls the maximum amount of
159 // memory that can be locked by a socket in RAM.
GetUlimitHardMemLock()160 uint64_t GetUlimitHardMemLock() {
161   static const uint64_t kUlimitHardMemLock = []() -> uint64_t {
162     if (CAP_IS_SUPPORTED(CAP_SYS_RESOURCE)) {
163       // hard memlock ulimit is ignored for privileged user.
164       return UINT64_MAX;
165     }
166     if (auto dir = opendir("/etc/security/limits.d")) {
167       while (auto f = readdir(dir)) {
168         if (f->d_name[0] == '.') {
169           continue;  // Skip everything that starts with a dot
170         }
171         uint64_t hard_memlock = ParseUlimitMemLockFromFile(
172             absl::StrCat("/etc/security/limits.d/", std::string(f->d_name)));
173         if (hard_memlock != 0) {
174           return hard_memlock;
175         }
176       }
177       closedir(dir);
178     }
179     return ParseUlimitMemLockFromFile("/etc/security/limits.conf");
180   }();
181   return kUlimitHardMemLock;
182 }
183 
184 // RLIMIT_MEMLOCK controls per process limit for maximum locked memory in RAM.
GetRLimitMemLockMax()185 uint64_t GetRLimitMemLockMax() {
186   static const uint64_t kRlimitMemLock = []() -> uint64_t {
187     if (CAP_IS_SUPPORTED(CAP_SYS_RESOURCE)) {
188       // RLIMIT_MEMLOCK is ignored for privileged user.
189       return UINT64_MAX;
190     }
191     struct rlimit limit;
192     if (getrlimit(RLIMIT_MEMLOCK, &limit) != 0) {
193       return 0;
194     }
195     return static_cast<uint64_t>(limit.rlim_max);
196   }();
197   return kRlimitMemLock;
198 }
199 
// Reports whether \a cmsg is a RECVERR control message at the IPv4 level
// (SOL_IP / IP_RECVERR) or at the IPv6 level (SOL_IPV6 / IPV6_RECVERR).
bool CmsgIsIpLevel(const cmsghdr& cmsg) {
  if (cmsg.cmsg_level == SOL_IPV6) {
    return cmsg.cmsg_type == IPV6_RECVERR;
  }
  if (cmsg.cmsg_level == SOL_IP) {
    return cmsg.cmsg_type == IP_RECVERR;
  }
  return false;
}
205 
CmsgIsZeroCopy(const cmsghdr & cmsg)206 bool CmsgIsZeroCopy(const cmsghdr& cmsg) {
207   if (!CmsgIsIpLevel(cmsg)) {
208     return false;
209   }
210   auto serr = reinterpret_cast<const sock_extended_err*> CMSG_DATA(&cmsg);
211   return serr->ee_errno == 0 && serr->ee_origin == SO_EE_ORIGIN_ZEROCOPY;
212 }
213 #endif  // GRPC_LINUX_ERRQUEUE
214 
PosixOSError(int error_no,absl::string_view call_name)215 absl::Status PosixOSError(int error_no, absl::string_view call_name) {
216   return absl::UnknownError(absl::StrCat(
217       call_name, ": ", grpc_core::StrError(error_no), " (", error_no, ")"));
218 }
219 
220 }  // namespace
221 
222 #if defined(IOV_MAX) && IOV_MAX < 260
223 #define MAX_WRITE_IOVEC IOV_MAX
224 #else
225 #define MAX_WRITE_IOVEC 260
226 #endif
PopulateIovs(size_t * unwind_slice_idx,size_t * unwind_byte_idx,size_t * sending_length,iovec * iov)227 msg_iovlen_type TcpZerocopySendRecord::PopulateIovs(size_t* unwind_slice_idx,
228                                                     size_t* unwind_byte_idx,
229                                                     size_t* sending_length,
230                                                     iovec* iov) {
231   msg_iovlen_type iov_size;
232   *unwind_slice_idx = out_offset_.slice_idx;
233   *unwind_byte_idx = out_offset_.byte_idx;
234   for (iov_size = 0;
235        out_offset_.slice_idx != buf_.Count() && iov_size != MAX_WRITE_IOVEC;
236        iov_size++) {
237     MutableSlice& slice = internal::SliceCast<MutableSlice>(
238         buf_.MutableSliceAt(out_offset_.slice_idx));
239     iov[iov_size].iov_base = slice.begin() + out_offset_.byte_idx;
240     iov[iov_size].iov_len = slice.length() - out_offset_.byte_idx;
241     *sending_length += iov[iov_size].iov_len;
242     ++(out_offset_.slice_idx);
243     out_offset_.byte_idx = 0;
244   }
245   DCHECK_GT(iov_size, 0u);
246   return iov_size;
247 }
248 
UpdateOffsetForBytesSent(size_t sending_length,size_t actually_sent)249 void TcpZerocopySendRecord::UpdateOffsetForBytesSent(size_t sending_length,
250                                                      size_t actually_sent) {
251   size_t trailing = sending_length - actually_sent;
252   while (trailing > 0) {
253     size_t slice_length;
254     out_offset_.slice_idx--;
255     slice_length = buf_.RefSlice(out_offset_.slice_idx).length();
256     if (slice_length > trailing) {
257       out_offset_.byte_idx = slice_length - trailing;
258       break;
259     } else {
260       trailing -= slice_length;
261     }
262   }
263 }
264 
AddToEstimate(size_t bytes)265 void PosixEndpointImpl::AddToEstimate(size_t bytes) {
266   bytes_read_this_round_ += static_cast<double>(bytes);
267 }
268 
FinishEstimate()269 void PosixEndpointImpl::FinishEstimate() {
270   // If we read >80% of the target buffer in one read loop, increase the size of
271   // the target buffer to either the amount read, or twice its previous value.
272   if (bytes_read_this_round_ > target_length_ * 0.8) {
273     target_length_ = std::max(2 * target_length_, bytes_read_this_round_);
274   } else {
275     target_length_ = 0.99 * target_length_ + 0.01 * bytes_read_this_round_;
276   }
277   bytes_read_this_round_ = 0;
278 }
279 
TcpAnnotateError(absl::Status src_error) const280 absl::Status PosixEndpointImpl::TcpAnnotateError(absl::Status src_error) const {
281   grpc_core::StatusSetInt(&src_error, grpc_core::StatusIntProperty::kFd,
282                           handle_->WrappedFd());
283   grpc_core::StatusSetInt(&src_error, grpc_core::StatusIntProperty::kRpcStatus,
284                           GRPC_STATUS_UNAVAILABLE);
285   return src_error;
286 }
287 
288 // Returns true if data available to read or error other than EAGAIN.
TcpDoRead(absl::Status & status)289 bool PosixEndpointImpl::TcpDoRead(absl::Status& status) {
290   GRPC_LATENT_SEE_INNER_SCOPE("TcpDoRead");
291 
292   struct msghdr msg;
293   struct iovec iov[MAX_READ_IOVEC];
294   ssize_t read_bytes;
295   size_t total_read_bytes = 0;
296   size_t iov_len = std::min<size_t>(MAX_READ_IOVEC, incoming_buffer_->Count());
297 #ifdef GRPC_LINUX_ERRQUEUE
298   constexpr size_t cmsg_alloc_space =
299       CMSG_SPACE(sizeof(scm_timestamping)) + CMSG_SPACE(sizeof(int));
300 #else
301   constexpr size_t cmsg_alloc_space = 24;  // CMSG_SPACE(sizeof(int))
302 #endif  // GRPC_LINUX_ERRQUEUE
303   char cmsgbuf[cmsg_alloc_space];
304   for (size_t i = 0; i < iov_len; i++) {
305     MutableSlice& slice =
306         internal::SliceCast<MutableSlice>(incoming_buffer_->MutableSliceAt(i));
307     iov[i].iov_base = slice.begin();
308     iov[i].iov_len = slice.length();
309   }
310 
311   CHECK_NE(incoming_buffer_->Length(), 0u);
312   DCHECK_GT(min_progress_size_, 0);
313 
314   do {
315     // Assume there is something on the queue. If we receive TCP_INQ from
316     // kernel, we will update this value, otherwise, we have to assume there is
317     // always something to read until we get EAGAIN.
318     inq_ = 1;
319 
320     msg.msg_name = nullptr;
321     msg.msg_namelen = 0;
322     msg.msg_iov = iov;
323     msg.msg_iovlen = static_cast<msg_iovlen_type>(iov_len);
324     if (inq_capable_) {
325       msg.msg_control = cmsgbuf;
326       msg.msg_controllen = sizeof(cmsgbuf);
327     } else {
328       msg.msg_control = nullptr;
329       msg.msg_controllen = 0;
330     }
331     msg.msg_flags = 0;
332 
333     grpc_core::global_stats().IncrementTcpReadOffer(incoming_buffer_->Length());
334     grpc_core::global_stats().IncrementTcpReadOfferIovSize(
335         incoming_buffer_->Count());
336     do {
337       grpc_core::global_stats().IncrementSyscallRead();
338       read_bytes = recvmsg(fd_, &msg, 0);
339     } while (read_bytes < 0 && errno == EINTR);
340 
341     if (read_bytes < 0 && errno == EAGAIN) {
342       // NB: After calling call_read_cb a parallel call of the read handler may
343       // be running.
344       if (total_read_bytes > 0) {
345         break;
346       }
347       FinishEstimate();
348       inq_ = 0;
349       return false;
350     }
351 
352     // We have read something in previous reads. We need to deliver those bytes
353     // to the upper layer.
354     if (read_bytes <= 0 && total_read_bytes >= 1) {
355       break;
356     }
357 
358     if (read_bytes <= 0) {
359       // 0 read size ==> end of stream
360       incoming_buffer_->Clear();
361       if (read_bytes == 0) {
362         status = TcpAnnotateError(absl::InternalError("Socket closed"));
363       } else {
364         status = TcpAnnotateError(absl::InternalError(
365             absl::StrCat("recvmsg:", grpc_core::StrError(errno))));
366       }
367       return true;
368     }
369 
370     grpc_core::global_stats().IncrementTcpReadSize(read_bytes);
371     AddToEstimate(static_cast<size_t>(read_bytes));
372     DCHECK((size_t)read_bytes <= incoming_buffer_->Length() - total_read_bytes);
373 
374 #ifdef GRPC_HAVE_TCP_INQ
375     if (inq_capable_) {
376       DCHECK(!(msg.msg_flags & MSG_CTRUNC));
377       struct cmsghdr* cmsg = CMSG_FIRSTHDR(&msg);
378       for (; cmsg != nullptr; cmsg = CMSG_NXTHDR(&msg, cmsg)) {
379         if (cmsg->cmsg_level == SOL_TCP && cmsg->cmsg_type == TCP_CM_INQ &&
380             cmsg->cmsg_len == CMSG_LEN(sizeof(int))) {
381           inq_ = *reinterpret_cast<int*>(CMSG_DATA(cmsg));
382           break;
383         }
384       }
385     }
386 #endif  // GRPC_HAVE_TCP_INQ
387 
388     total_read_bytes += read_bytes;
389     if (inq_ == 0 || total_read_bytes == incoming_buffer_->Length()) {
390       break;
391     }
392 
393     // We had a partial read, and still have space to read more data. So, adjust
394     // IOVs and try to read more.
395     size_t remaining = read_bytes;
396     size_t j = 0;
397     for (size_t i = 0; i < iov_len; i++) {
398       if (remaining >= iov[i].iov_len) {
399         remaining -= iov[i].iov_len;
400         continue;
401       }
402       if (remaining > 0) {
403         iov[j].iov_base = static_cast<char*>(iov[i].iov_base) + remaining;
404         iov[j].iov_len = iov[i].iov_len - remaining;
405         remaining = 0;
406       } else {
407         iov[j].iov_base = iov[i].iov_base;
408         iov[j].iov_len = iov[i].iov_len;
409       }
410       ++j;
411     }
412     iov_len = j;
413   } while (true);
414 
415   if (inq_ == 0) {
416     FinishEstimate();
417     // If this is using the epoll poller, then it is edge-triggered.
418     // Since this read did not consume the edge (i.e., did not get EAGAIN), the
419     // next read on this endpoint must assume there is something to read.
420     // Otherwise, assuming there is nothing to read and waiting for an epoll
421     // edge event could cause the next read to wait indefinitely.
422     inq_ = 1;
423   }
424 
425   DCHECK_GT(total_read_bytes, 0u);
426   status = absl::OkStatus();
427   if (grpc_core::IsTcpFrameSizeTuningEnabled()) {
428     // Update min progress size based on the total number of bytes read in
429     // this round.
430     min_progress_size_ -= total_read_bytes;
431     if (min_progress_size_ > 0) {
432       // There is still some bytes left to be read before we can signal
433       // the read as complete. Append the bytes read so far into
434       // last_read_buffer which serves as a staging buffer. Return false
435       // to indicate tcp_handle_read needs to be scheduled again.
436       incoming_buffer_->MoveFirstNBytesIntoSliceBuffer(total_read_bytes,
437                                                        last_read_buffer_);
438       return false;
439     } else {
440       // The required number of bytes have been read. Append the bytes
441       // read in this round into last_read_buffer. Then swap last_read_buffer
442       // and incoming_buffer. Now incoming buffer contains all the bytes
443       // read since the start of the last tcp_read operation. last_read_buffer
444       // would contain any spare space left in the incoming buffer. This
445       // space will be used in the next tcp_read operation.
446       min_progress_size_ = 1;
447       incoming_buffer_->MoveFirstNBytesIntoSliceBuffer(total_read_bytes,
448                                                        last_read_buffer_);
449       incoming_buffer_->Swap(last_read_buffer_);
450       return true;
451     }
452   }
453   if (total_read_bytes < incoming_buffer_->Length()) {
454     incoming_buffer_->MoveLastNBytesIntoSliceBuffer(
455         incoming_buffer_->Length() - total_read_bytes, last_read_buffer_);
456   }
457   return true;
458 }
459 
PerformReclamation()460 void PosixEndpointImpl::PerformReclamation() {
461   read_mu_.Lock();
462   if (incoming_buffer_ != nullptr) {
463     incoming_buffer_->Clear();
464   }
465   has_posted_reclaimer_ = false;
466   read_mu_.Unlock();
467 }
468 
MaybePostReclaimer()469 void PosixEndpointImpl::MaybePostReclaimer() {
470   if (!has_posted_reclaimer_) {
471     has_posted_reclaimer_ = true;
472     memory_owner_.PostReclaimer(
473         grpc_core::ReclamationPass::kBenign,
474         [self = Ref(DEBUG_LOCATION, "Posix Reclaimer")](
475             absl::optional<grpc_core::ReclamationSweep> sweep) {
476           if (sweep.has_value()) {
477             self->PerformReclamation();
478           }
479         });
480   }
481 }
482 
UpdateRcvLowat()483 void PosixEndpointImpl::UpdateRcvLowat() {
484   if (!grpc_core::IsTcpRcvLowatEnabled()) return;
485 
486   // TODO(ctiller): Check if supported by OS.
487   // TODO(ctiller): Allow some adjustments instead of hardcoding things.
488 
489   static constexpr int kRcvLowatMax = 16 * 1024 * 1024;
490   static constexpr int kRcvLowatThreshold = 16 * 1024;
491 
492   int remaining = std::min({static_cast<int>(incoming_buffer_->Length()),
493                             kRcvLowatMax, min_progress_size_});
494 
495   // Setting SO_RCVLOWAT for small quantities does not save on CPU.
496   if (remaining < kRcvLowatThreshold) {
497     remaining = 0;
498   }
499 
500   // If zerocopy is off, wake shortly before the full RPC is here. More can
501   // show up partway through recvmsg() since it takes a while to copy data.
502   // So an early wakeup aids latency.
503   if (!tcp_zerocopy_send_ctx_->Enabled() && remaining > 0) {
504     remaining -= kRcvLowatThreshold;
505   }
506 
507   // We still do not know the RPC size. Do not set SO_RCVLOWAT.
508   if (set_rcvlowat_ <= 1 && remaining <= 1) return;
509 
510   // Previous value is still valid. No change needed in SO_RCVLOWAT.
511   if (set_rcvlowat_ == remaining) {
512     return;
513   }
514   auto result = sock_.SetSocketRcvLowat(remaining);
515   if (result.ok()) {
516     set_rcvlowat_ = *result;
517   } else {
518     LOG(ERROR) << "ERROR in SO_RCVLOWAT: " << result.status().message();
519   }
520 }
521 
MaybeMakeReadSlices()522 void PosixEndpointImpl::MaybeMakeReadSlices() {
523   static const int kBigAlloc = 64 * 1024;
524   static const int kSmallAlloc = 8 * 1024;
525   if (incoming_buffer_->Length() < std::max<size_t>(min_progress_size_, 1)) {
526     size_t allocate_length = min_progress_size_;
527     const size_t target_length = static_cast<size_t>(target_length_);
528     // If memory pressure is low and we think there will be more than
529     // min_progress_size bytes to read, allocate a bit more.
530     const bool low_memory_pressure =
531         memory_owner_.GetPressureInfo().pressure_control_value < 0.8;
532     if (low_memory_pressure && target_length > allocate_length) {
533       allocate_length = target_length;
534     }
535     int extra_wanted = std::max<int>(
536         1, allocate_length - static_cast<int>(incoming_buffer_->Length()));
537     if (extra_wanted >=
538         (low_memory_pressure ? kSmallAlloc * 3 / 2 : kBigAlloc)) {
539       while (extra_wanted > 0) {
540         extra_wanted -= kBigAlloc;
541         incoming_buffer_->AppendIndexed(
542             Slice(memory_owner_.MakeSlice(kBigAlloc)));
543         grpc_core::global_stats().IncrementTcpReadAlloc64k();
544       }
545     } else {
546       while (extra_wanted > 0) {
547         extra_wanted -= kSmallAlloc;
548         incoming_buffer_->AppendIndexed(
549             Slice(memory_owner_.MakeSlice(kSmallAlloc)));
550         grpc_core::global_stats().IncrementTcpReadAlloc8k();
551       }
552     }
553     MaybePostReclaimer();
554   }
555 }
556 
HandleReadLocked(absl::Status & status)557 bool PosixEndpointImpl::HandleReadLocked(absl::Status& status) {
558   if (status.ok() && memory_owner_.is_valid()) {
559     MaybeMakeReadSlices();
560     if (!TcpDoRead(status)) {
561       UpdateRcvLowat();
562       // We've consumed the edge, request a new one.
563       return false;
564     }
565   } else {
566     if (!memory_owner_.is_valid() && status.ok()) {
567       status = TcpAnnotateError(absl::UnknownError("Shutting down endpoint"));
568     }
569     incoming_buffer_->Clear();
570     last_read_buffer_.Clear();
571   }
572   return true;
573 }
574 
HandleRead(absl::Status status)575 void PosixEndpointImpl::HandleRead(absl::Status status) {
576   bool ret = false;
577   absl::AnyInvocable<void(absl::Status)> cb = nullptr;
578   grpc_core::EnsureRunInExecCtx([&, this]() mutable {
579     grpc_core::MutexLock lock(&read_mu_);
580     ret = HandleReadLocked(status);
581     if (ret) {
582       GRPC_TRACE_LOG(event_engine_endpoint, INFO)
583           << "Endpoint[" << this << "]: Read complete";
584       cb = std::move(read_cb_);
585       read_cb_ = nullptr;
586       incoming_buffer_ = nullptr;
587     }
588   });
589   if (!ret) {
590     handle_->NotifyOnRead(on_read_);
591     return;
592   }
593   cb(status);
594   Unref();
595 }
596 
Read(absl::AnyInvocable<void (absl::Status)> on_read,SliceBuffer * buffer,const EventEngine::Endpoint::ReadArgs * args)597 bool PosixEndpointImpl::Read(absl::AnyInvocable<void(absl::Status)> on_read,
598                              SliceBuffer* buffer,
599                              const EventEngine::Endpoint::ReadArgs* args) {
600   grpc_core::ReleasableMutexLock lock(&read_mu_);
601   GRPC_TRACE_LOG(event_engine_endpoint, INFO)
602       << "Endpoint[" << this << "]: Read";
603   CHECK(read_cb_ == nullptr);
604   incoming_buffer_ = buffer;
605   incoming_buffer_->Clear();
606   incoming_buffer_->Swap(last_read_buffer_);
607   if (args != nullptr && grpc_core::IsTcpFrameSizeTuningEnabled()) {
608     min_progress_size_ = std::max(static_cast<int>(args->read_hint_bytes), 1);
609   } else {
610     min_progress_size_ = 1;
611   }
612   Ref().release();
613   if (is_first_read_) {
614     read_cb_ = std::move(on_read);
615     UpdateRcvLowat();
616     // Endpoint read called for the very first time. Register read callback
617     // with the polling engine.
618     is_first_read_ = false;
619     lock.Release();
620     handle_->NotifyOnRead(on_read_);
621   } else if (inq_ == 0) {
622     read_cb_ = std::move(on_read);
623     UpdateRcvLowat();
624     lock.Release();
625     // Upper layer asked to read more but we know there is no pending data to
626     // read from previous reads. So, wait for POLLIN.
627     handle_->NotifyOnRead(on_read_);
628   } else {
629     absl::Status status;
630     MaybeMakeReadSlices();
631     if (!TcpDoRead(status)) {
632       UpdateRcvLowat();
633       read_cb_ = std::move(on_read);
634       // We've consumed the edge, request a new one.
635       lock.Release();
636       handle_->NotifyOnRead(on_read_);
637       return false;
638     }
639     if (!status.ok()) {
640       // Read failed immediately. Schedule the on_read callback to run
641       // asynchronously.
642       lock.Release();
643       engine_->Run([on_read = std::move(on_read), status, this]() mutable {
644         GRPC_TRACE_LOG(event_engine_endpoint, INFO)
645             << "Endpoint[" << this << "]: Read failed immediately: " << status;
646         on_read(status);
647       });
648       Unref();
649       return false;
650     }
651     // Read succeeded immediately. Return true and don't run the on_read
652     // callback.
653     incoming_buffer_ = nullptr;
654     Unref();
655     GRPC_TRACE_LOG(event_engine_endpoint, INFO)
656         << "Endpoint[" << this << "]: Read succeeded immediately";
657     return true;
658   }
659   return false;
660 }
661 
662 #ifdef GRPC_LINUX_ERRQUEUE
TcpGetSendZerocopyRecord(SliceBuffer & buf)663 TcpZerocopySendRecord* PosixEndpointImpl::TcpGetSendZerocopyRecord(
664     SliceBuffer& buf) {
665   TcpZerocopySendRecord* zerocopy_send_record = nullptr;
666   const bool use_zerocopy =
667       tcp_zerocopy_send_ctx_->Enabled() &&
668       tcp_zerocopy_send_ctx_->ThresholdBytes() < buf.Length();
669   if (use_zerocopy) {
670     zerocopy_send_record = tcp_zerocopy_send_ctx_->GetSendRecord();
671     if (zerocopy_send_record == nullptr) {
672       ProcessErrors();
673       zerocopy_send_record = tcp_zerocopy_send_ctx_->GetSendRecord();
674     }
675     if (zerocopy_send_record != nullptr) {
676       zerocopy_send_record->PrepareForSends(buf);
677       DCHECK_EQ(buf.Count(), 0u);
678       DCHECK_EQ(buf.Length(), 0u);
679       outgoing_byte_idx_ = 0;
680       outgoing_buffer_ = nullptr;
681     }
682   }
683   return zerocopy_send_record;
684 }
685 
686 // For linux platforms, reads the socket's error queue and processes error
687 // messages from the queue.
ProcessErrors()688 bool PosixEndpointImpl::ProcessErrors() {
689   bool processed_err = false;
690   struct iovec iov;
691   iov.iov_base = nullptr;
692   iov.iov_len = 0;
693   struct msghdr msg;
694   msg.msg_name = nullptr;
695   msg.msg_namelen = 0;
696   msg.msg_iov = &iov;
697   msg.msg_iovlen = 0;
698   msg.msg_flags = 0;
699   // Allocate enough space so we don't need to keep increasing this as size of
700   // OPT_STATS increase.
701   constexpr size_t cmsg_alloc_space =
702       CMSG_SPACE(sizeof(scm_timestamping)) +
703       CMSG_SPACE(sizeof(sock_extended_err) + sizeof(sockaddr_in)) +
704       CMSG_SPACE(32 * NLA_ALIGN(NLA_HDRLEN + sizeof(uint64_t)));
705   // Allocate aligned space for cmsgs received along with timestamps.
706   union {
707     char rbuf[cmsg_alloc_space];
708     struct cmsghdr align;
709   } aligned_buf;
710   msg.msg_control = aligned_buf.rbuf;
711   int r, saved_errno;
712   while (true) {
713     msg.msg_controllen = sizeof(aligned_buf.rbuf);
714     do {
715       r = recvmsg(fd_, &msg, MSG_ERRQUEUE);
716       saved_errno = errno;
717     } while (r < 0 && saved_errno == EINTR);
718 
719     if (r < 0 && saved_errno == EAGAIN) {
720       return processed_err;  // No more errors to process
721     } else if (r < 0) {
722       return processed_err;
723     }
724     if (GPR_UNLIKELY((msg.msg_flags & MSG_CTRUNC) != 0)) {
725       LOG(ERROR) << "Error message was truncated.";
726     }
727 
728     if (msg.msg_controllen == 0) {
729       // There was no control message found. It was probably spurious.
730       return processed_err;
731     }
732     bool seen = false;
733     for (auto cmsg = CMSG_FIRSTHDR(&msg); cmsg && cmsg->cmsg_len;
734          cmsg = CMSG_NXTHDR(&msg, cmsg)) {
735       if (CmsgIsZeroCopy(*cmsg)) {
736         ProcessZerocopy(cmsg);
737         seen = true;
738         processed_err = true;
739       } else if (cmsg->cmsg_level == SOL_SOCKET &&
740                  cmsg->cmsg_type == SCM_TIMESTAMPING) {
741         cmsg = ProcessTimestamp(&msg, cmsg);
742         seen = true;
743         processed_err = true;
744       } else {
745         // Got a control message that is not a timestamp or zerocopy. Don't know
746         // how to handle this.
747         return processed_err;
748       }
749     }
750     if (!seen) {
751       return processed_err;
752     }
753   }
754 }
755 
ZerocopyDisableAndWaitForRemaining()756 void PosixEndpointImpl::ZerocopyDisableAndWaitForRemaining() {
757   tcp_zerocopy_send_ctx_->Shutdown();
758   while (!tcp_zerocopy_send_ctx_->AllSendRecordsEmpty()) {
759     ProcessErrors();
760   }
761 }
762 
763 // Reads \a cmsg to process zerocopy control messages.
ProcessZerocopy(struct cmsghdr * cmsg)764 void PosixEndpointImpl::ProcessZerocopy(struct cmsghdr* cmsg) {
765   DCHECK(cmsg);
766   auto serr = reinterpret_cast<struct sock_extended_err*>(CMSG_DATA(cmsg));
767   DCHECK_EQ(serr->ee_errno, 0u);
768   DCHECK(serr->ee_origin == SO_EE_ORIGIN_ZEROCOPY);
769   const uint32_t lo = serr->ee_info;
770   const uint32_t hi = serr->ee_data;
771   for (uint32_t seq = lo; seq <= hi; ++seq) {
772     // TODO(arjunroy): It's likely that lo and hi refer to zerocopy sequence
773     // numbers that are generated by a single call to grpc_endpoint_write; ie.
774     // we can batch the unref operation. So, check if record is the same for
775     // both; if so, batch the unref/put.
776     TcpZerocopySendRecord* record =
777         tcp_zerocopy_send_ctx_->ReleaseSendRecord(seq);
778     DCHECK(record);
779     UnrefMaybePutZerocopySendRecord(record);
780   }
781   if (tcp_zerocopy_send_ctx_->UpdateZeroCopyOptMemStateAfterFree()) {
782     handle_->SetWritable();
783   }
784 }
785 
786 // Reads \a cmsg to derive timestamps from the control messages. If a valid
787 // timestamp is found, the traced buffer list is updated with this timestamp.
788 // The caller of this function should be looping on the control messages found
789 // in \a msg. \a cmsg should point to the control message that the caller wants
790 // processed. On return, a pointer to a control message is returned. On the next
791 // iteration, CMSG_NXTHDR(msg, ret_val) should be passed as \a cmsg.
ProcessTimestamp(msghdr * msg,struct cmsghdr * cmsg)792 struct cmsghdr* PosixEndpointImpl::ProcessTimestamp(msghdr* msg,
793                                                     struct cmsghdr* cmsg) {
794   auto next_cmsg = CMSG_NXTHDR(msg, cmsg);
795   cmsghdr* opt_stats = nullptr;
796   if (next_cmsg == nullptr) {
797     return cmsg;
798   }
799 
800   // Check if next_cmsg is an OPT_STATS msg.
801   if (next_cmsg->cmsg_level == SOL_SOCKET &&
802       next_cmsg->cmsg_type == SCM_TIMESTAMPING_OPT_STATS) {
803     opt_stats = next_cmsg;
804     next_cmsg = CMSG_NXTHDR(msg, opt_stats);
805     if (next_cmsg == nullptr) {
806       return opt_stats;
807     }
808   }
809 
810   if (!(next_cmsg->cmsg_level == SOL_IP || next_cmsg->cmsg_level == SOL_IPV6) ||
811       !(next_cmsg->cmsg_type == IP_RECVERR ||
812         next_cmsg->cmsg_type == IPV6_RECVERR)) {
813     return cmsg;
814   }
815 
816   auto tss = reinterpret_cast<scm_timestamping*>(CMSG_DATA(cmsg));
817   auto serr = reinterpret_cast<struct sock_extended_err*>(CMSG_DATA(next_cmsg));
818   if (serr->ee_errno != ENOMSG ||
819       serr->ee_origin != SO_EE_ORIGIN_TIMESTAMPING) {
820     LOG(ERROR) << "Unexpected control message";
821     return cmsg;
822   }
823   traced_buffers_.ProcessTimestamp(serr, opt_stats, tss);
824   return next_cmsg;
825 }
826 
HandleError(absl::Status status)827 void PosixEndpointImpl::HandleError(absl::Status status) {
828   if (!status.ok() ||
829       stop_error_notification_.load(std::memory_order_relaxed)) {
830     // We aren't going to register to hear on error anymore, so it is safe to
831     // unref.
832     Unref();
833     return;
834   }
835   // We are still interested in collecting timestamps, so let's try reading
836   // them.
837   if (!ProcessErrors()) {
838     // This might not a timestamps error. Set the read and write closures to be
839     // ready.
840     handle_->SetReadable();
841     handle_->SetWritable();
842   }
843   handle_->NotifyOnError(on_error_);
844 }
845 
WriteWithTimestamps(struct msghdr * msg,size_t sending_length,ssize_t * sent_length,int * saved_errno,int additional_flags)846 bool PosixEndpointImpl::WriteWithTimestamps(struct msghdr* msg,
847                                             size_t sending_length,
848                                             ssize_t* sent_length,
849                                             int* saved_errno,
850                                             int additional_flags) {
851   if (!socket_ts_enabled_) {
852     uint32_t opt = kTimestampingSocketOptions;
853     if (setsockopt(fd_, SOL_SOCKET, SO_TIMESTAMPING, static_cast<void*>(&opt),
854                    sizeof(opt)) != 0) {
855       return false;
856     }
857     bytes_counter_ = -1;
858     socket_ts_enabled_ = true;
859   }
860   // Set control message to indicate that you want timestamps.
861   union {
862     char cmsg_buf[CMSG_SPACE(sizeof(uint32_t))];
863     struct cmsghdr align;
864   } u;
865   cmsghdr* cmsg = reinterpret_cast<cmsghdr*>(u.cmsg_buf);
866   cmsg->cmsg_level = SOL_SOCKET;
867   cmsg->cmsg_type = SO_TIMESTAMPING;
868   cmsg->cmsg_len = CMSG_LEN(sizeof(uint32_t));
869   *reinterpret_cast<int*>(CMSG_DATA(cmsg)) = kTimestampingRecordingOptions;
870   msg->msg_control = u.cmsg_buf;
871   msg->msg_controllen = CMSG_SPACE(sizeof(uint32_t));
872 
873   // If there was an error on sendmsg the logic in tcp_flush will handle it.
874   grpc_core::global_stats().IncrementTcpWriteSize(sending_length);
875   ssize_t length = TcpSend(fd_, msg, saved_errno, additional_flags);
876   *sent_length = length;
877   // Only save timestamps if all the bytes were taken by sendmsg.
878   if (sending_length == static_cast<size_t>(length)) {
879     traced_buffers_.AddNewEntry(static_cast<uint32_t>(bytes_counter_ + length),
880                                 fd_, outgoing_buffer_arg_);
881     outgoing_buffer_arg_ = nullptr;
882   }
883   return true;
884 }
885 
886 #else   // GRPC_LINUX_ERRQUEUE
TcpGetSendZerocopyRecord(SliceBuffer &)887 TcpZerocopySendRecord* PosixEndpointImpl::TcpGetSendZerocopyRecord(
888     SliceBuffer& /*buf*/) {
889   return nullptr;
890 }
891 
HandleError(absl::Status)892 void PosixEndpointImpl::HandleError(absl::Status /*status*/) {
893   grpc_core::Crash("Error handling not supported on this platform");
894 }
895 
ZerocopyDisableAndWaitForRemaining()896 void PosixEndpointImpl::ZerocopyDisableAndWaitForRemaining() {}
897 
WriteWithTimestamps(struct msghdr *,size_t,ssize_t *,int *,int)898 bool PosixEndpointImpl::WriteWithTimestamps(struct msghdr* /*msg*/,
899                                             size_t /*sending_length*/,
900                                             ssize_t* /*sent_length*/,
901                                             int* /*saved_errno*/,
902                                             int /*additional_flags*/) {
903   grpc_core::Crash("Write with timestamps not supported for this platform");
904 }
905 #endif  // GRPC_LINUX_ERRQUEUE
906 
UnrefMaybePutZerocopySendRecord(TcpZerocopySendRecord * record)907 void PosixEndpointImpl::UnrefMaybePutZerocopySendRecord(
908     TcpZerocopySendRecord* record) {
909   if (record->Unref()) {
910     tcp_zerocopy_send_ctx_->PutSendRecord(record);
911   }
912 }
913 
914 // If outgoing_buffer_arg is filled, shuts down the list early, so that any
915 // release operations needed can be performed on the arg.
TcpShutdownTracedBufferList()916 void PosixEndpointImpl::TcpShutdownTracedBufferList() {
917   if (outgoing_buffer_arg_ != nullptr) {
918     traced_buffers_.Shutdown(outgoing_buffer_arg_,
919                              absl::InternalError("TracedBuffer list shutdown"));
920     outgoing_buffer_arg_ = nullptr;
921   }
922 }
923 
924 // returns true if done, false if pending; if returning true, *error is set
DoFlushZerocopy(TcpZerocopySendRecord * record,absl::Status & status)925 bool PosixEndpointImpl::DoFlushZerocopy(TcpZerocopySendRecord* record,
926                                         absl::Status& status) {
927   msg_iovlen_type iov_size;
928   ssize_t sent_length = 0;
929   size_t sending_length;
930   size_t unwind_slice_idx;
931   size_t unwind_byte_idx;
932   bool tried_sending_message;
933   int saved_errno;
934   msghdr msg;
935   bool constrained;
936   status = absl::OkStatus();
937   // iov consumes a large space. Keep it as the last item on the stack to
938   // improve locality. After all, we expect only the first elements of it
939   // being populated in most cases.
940   iovec iov[MAX_WRITE_IOVEC];
941   while (true) {
942     sending_length = 0;
943     iov_size = record->PopulateIovs(&unwind_slice_idx, &unwind_byte_idx,
944                                     &sending_length, iov);
945     msg.msg_name = nullptr;
946     msg.msg_namelen = 0;
947     msg.msg_iov = iov;
948     msg.msg_iovlen = iov_size;
949     msg.msg_flags = 0;
950     tried_sending_message = false;
951     constrained = false;
952     // Before calling sendmsg (with or without timestamps): we
953     // take a single ref on the zerocopy send record.
954     tcp_zerocopy_send_ctx_->NoteSend(record);
955     saved_errno = 0;
956     if (outgoing_buffer_arg_ != nullptr) {
957       if (!ts_capable_ ||
958           !WriteWithTimestamps(&msg, sending_length, &sent_length, &saved_errno,
959                                MSG_ZEROCOPY)) {
960         // We could not set socket options to collect Fathom timestamps.
961         // Fallback on writing without timestamps.
962         ts_capable_ = false;
963         TcpShutdownTracedBufferList();
964       } else {
965         tried_sending_message = true;
966       }
967     }
968     if (!tried_sending_message) {
969       msg.msg_control = nullptr;
970       msg.msg_controllen = 0;
971       grpc_core::global_stats().IncrementTcpWriteSize(sending_length);
972       grpc_core::global_stats().IncrementTcpWriteIovSize(iov_size);
973       sent_length = TcpSend(fd_, &msg, &saved_errno, MSG_ZEROCOPY);
974     }
975     if (tcp_zerocopy_send_ctx_->UpdateZeroCopyOptMemStateAfterSend(
976             saved_errno == ENOBUFS, constrained) ||
977         constrained) {
978       // If constrained, is true it implies that we received an ENOBUFS error
979       // but there are no un-acked z-copy records. This situation may arise
980       // because the per-process RLIMIT_MEMLOCK limit or the per-socket hard
981       // memlock ulimit on the machine may be very small. These limits control
982       // the max number of bytes a process/socket can respectively pin to RAM.
983       // Tx0cp respects these limits and if a sendmsg tries to send more than
984       // this limit, the kernel may return ENOBUFS error. Print a warning
985       // message here to allow help with debugging. Grpc should not attempt to
986       // raise the limit values.
987       if (!constrained) {
988         handle_->SetWritable();
989       } else {
990 #ifdef GRPC_LINUX_ERRQUEUE
991         LOG_EVERY_N_SEC(INFO, 1)
992             << "Tx0cp encountered an ENOBUFS error possibly because one or "
993                "both of RLIMIT_MEMLOCK or hard memlock ulimit values are too "
994                "small for the intended user. Current system value of "
995                "RLIMIT_MEMLOCK is "
996             << GetRLimitMemLockMax() << " and hard memlock ulimit is "
997             << GetUlimitHardMemLock()
998             << ".Consider increasing these values appropriately for the "
999                "intended user.";
1000 #endif
1001       }
1002     }
1003     if (sent_length < 0) {
1004       // If this particular send failed, drop ref taken earlier in this method.
1005       tcp_zerocopy_send_ctx_->UndoSend();
1006       if (saved_errno == EAGAIN || saved_errno == ENOBUFS) {
1007         record->UnwindIfThrottled(unwind_slice_idx, unwind_byte_idx);
1008         return false;
1009       } else {
1010         status = TcpAnnotateError(PosixOSError(saved_errno, "sendmsg"));
1011         TcpShutdownTracedBufferList();
1012         return true;
1013       }
1014     }
1015     bytes_counter_ += sent_length;
1016     record->UpdateOffsetForBytesSent(sending_length,
1017                                      static_cast<size_t>(sent_length));
1018     if (record->AllSlicesSent()) {
1019       return true;
1020     }
1021   }
1022 }
1023 
TcpFlushZerocopy(TcpZerocopySendRecord * record,absl::Status & status)1024 bool PosixEndpointImpl::TcpFlushZerocopy(TcpZerocopySendRecord* record,
1025                                          absl::Status& status) {
1026   bool done = DoFlushZerocopy(record, status);
1027   if (done) {
1028     // Either we encountered an error, or we successfully sent all the bytes.
1029     // In either case, we're done with this record.
1030     UnrefMaybePutZerocopySendRecord(record);
1031   }
1032   return done;
1033 }
1034 
TcpFlush(absl::Status & status)1035 bool PosixEndpointImpl::TcpFlush(absl::Status& status) {
1036   struct msghdr msg;
1037   struct iovec iov[MAX_WRITE_IOVEC];
1038   msg_iovlen_type iov_size;
1039   ssize_t sent_length = 0;
1040   size_t sending_length;
1041   size_t trailing;
1042   size_t unwind_slice_idx;
1043   size_t unwind_byte_idx;
1044   int saved_errno;
1045   status = absl::OkStatus();
1046 
1047   // We always start at zero, because we eagerly unref and trim the slice
1048   // buffer as we write
1049   size_t outgoing_slice_idx = 0;
1050 
1051   while (true) {
1052     sending_length = 0;
1053     unwind_slice_idx = outgoing_slice_idx;
1054     unwind_byte_idx = outgoing_byte_idx_;
1055     for (iov_size = 0; outgoing_slice_idx != outgoing_buffer_->Count() &&
1056                        iov_size != MAX_WRITE_IOVEC;
1057          iov_size++) {
1058       MutableSlice& slice = internal::SliceCast<MutableSlice>(
1059           outgoing_buffer_->MutableSliceAt(outgoing_slice_idx));
1060       iov[iov_size].iov_base = slice.begin() + outgoing_byte_idx_;
1061       iov[iov_size].iov_len = slice.length() - outgoing_byte_idx_;
1062 
1063       sending_length += iov[iov_size].iov_len;
1064       outgoing_slice_idx++;
1065       outgoing_byte_idx_ = 0;
1066     }
1067     CHECK_GT(iov_size, 0u);
1068 
1069     msg.msg_name = nullptr;
1070     msg.msg_namelen = 0;
1071     msg.msg_iov = iov;
1072     msg.msg_iovlen = iov_size;
1073     msg.msg_flags = 0;
1074     bool tried_sending_message = false;
1075     saved_errno = 0;
1076     if (outgoing_buffer_arg_ != nullptr) {
1077       if (!ts_capable_ || !WriteWithTimestamps(&msg, sending_length,
1078                                                &sent_length, &saved_errno, 0)) {
1079         // We could not set socket options to collect Fathom timestamps.
1080         // Fallback on writing without timestamps.
1081         ts_capable_ = false;
1082         TcpShutdownTracedBufferList();
1083       } else {
1084         tried_sending_message = true;
1085       }
1086     }
1087     if (!tried_sending_message) {
1088       msg.msg_control = nullptr;
1089       msg.msg_controllen = 0;
1090       grpc_core::global_stats().IncrementTcpWriteSize(sending_length);
1091       grpc_core::global_stats().IncrementTcpWriteIovSize(iov_size);
1092       sent_length = TcpSend(fd_, &msg, &saved_errno);
1093     }
1094 
1095     if (sent_length < 0) {
1096       if (saved_errno == EAGAIN || saved_errno == ENOBUFS) {
1097         outgoing_byte_idx_ = unwind_byte_idx;
1098         // unref all and forget about all slices that have been written to this
1099         // point
1100         for (size_t idx = 0; idx < unwind_slice_idx; ++idx) {
1101           outgoing_buffer_->TakeFirst();
1102         }
1103         return false;
1104       } else {
1105         status = TcpAnnotateError(PosixOSError(saved_errno, "sendmsg"));
1106         outgoing_buffer_->Clear();
1107         TcpShutdownTracedBufferList();
1108         return true;
1109       }
1110     }
1111 
1112     CHECK_EQ(outgoing_byte_idx_, 0u);
1113     bytes_counter_ += sent_length;
1114     trailing = sending_length - static_cast<size_t>(sent_length);
1115     while (trailing > 0) {
1116       size_t slice_length;
1117       outgoing_slice_idx--;
1118       slice_length = outgoing_buffer_->RefSlice(outgoing_slice_idx).length();
1119       if (slice_length > trailing) {
1120         outgoing_byte_idx_ = slice_length - trailing;
1121         break;
1122       } else {
1123         trailing -= slice_length;
1124       }
1125     }
1126     if (outgoing_slice_idx == outgoing_buffer_->Count()) {
1127       outgoing_buffer_->Clear();
1128       return true;
1129     }
1130   }
1131 }
1132 
HandleWrite(absl::Status status)1133 void PosixEndpointImpl::HandleWrite(absl::Status status) {
1134   if (!status.ok()) {
1135     GRPC_TRACE_LOG(event_engine_endpoint, INFO)
1136         << "Endpoint[" << this << "]: Write failed: " << status;
1137     absl::AnyInvocable<void(absl::Status)> cb_ = std::move(write_cb_);
1138     write_cb_ = nullptr;
1139     if (current_zerocopy_send_ != nullptr) {
1140       UnrefMaybePutZerocopySendRecord(current_zerocopy_send_);
1141       current_zerocopy_send_ = nullptr;
1142     }
1143     cb_(status);
1144     Unref();
1145     return;
1146   }
1147   bool flush_result = current_zerocopy_send_ != nullptr
1148                           ? TcpFlushZerocopy(current_zerocopy_send_, status)
1149                           : TcpFlush(status);
1150   if (!flush_result) {
1151     DCHECK(status.ok());
1152     handle_->NotifyOnWrite(on_write_);
1153   } else {
1154     GRPC_TRACE_LOG(event_engine_endpoint, INFO)
1155         << "Endpoint[" << this << "]: Write complete: " << status;
1156     absl::AnyInvocable<void(absl::Status)> cb_ = std::move(write_cb_);
1157     write_cb_ = nullptr;
1158     current_zerocopy_send_ = nullptr;
1159     cb_(status);
1160     Unref();
1161   }
1162 }
1163 
Write(absl::AnyInvocable<void (absl::Status)> on_writable,SliceBuffer * data,const EventEngine::Endpoint::WriteArgs * args)1164 bool PosixEndpointImpl::Write(
1165     absl::AnyInvocable<void(absl::Status)> on_writable, SliceBuffer* data,
1166     const EventEngine::Endpoint::WriteArgs* args) {
1167   absl::Status status = absl::OkStatus();
1168   TcpZerocopySendRecord* zerocopy_send_record = nullptr;
1169 
1170   CHECK(write_cb_ == nullptr);
1171   DCHECK_EQ(current_zerocopy_send_, nullptr);
1172   DCHECK_NE(data, nullptr);
1173 
1174   GRPC_TRACE_LOG(event_engine_endpoint, INFO)
1175       << "Endpoint[" << this << "]: Write " << data->Length() << " bytes";
1176 
1177   if (data->Length() == 0) {
1178     TcpShutdownTracedBufferList();
1179     if (handle_->IsHandleShutdown()) {
1180       status = TcpAnnotateError(absl::InternalError("EOF"));
1181       engine_->Run(
1182           [on_writable = std::move(on_writable), status, this]() mutable {
1183             GRPC_TRACE_LOG(event_engine_endpoint, INFO)
1184                 << "Endpoint[" << this << "]: Write failed: " << status;
1185             on_writable(status);
1186           });
1187       return false;
1188     }
1189     GRPC_TRACE_LOG(event_engine_endpoint, INFO)
1190         << "Endpoint[" << this << "]: Write skipped";
1191     return true;
1192   }
1193 
1194   zerocopy_send_record = TcpGetSendZerocopyRecord(*data);
1195   if (zerocopy_send_record == nullptr) {
1196     // Either not enough bytes, or couldn't allocate a zerocopy context.
1197     outgoing_buffer_ = data;
1198     outgoing_byte_idx_ = 0;
1199   }
1200   if (args != nullptr) {
1201     outgoing_buffer_arg_ = args->google_specific;
1202   }
1203   if (outgoing_buffer_arg_) {
1204     CHECK(poller_->CanTrackErrors());
1205   }
1206 
1207   bool flush_result = zerocopy_send_record != nullptr
1208                           ? TcpFlushZerocopy(zerocopy_send_record, status)
1209                           : TcpFlush(status);
1210   if (!flush_result) {
1211     Ref().release();
1212     write_cb_ = std::move(on_writable);
1213     current_zerocopy_send_ = zerocopy_send_record;
1214     handle_->NotifyOnWrite(on_write_);
1215     return false;
1216   }
1217   if (!status.ok()) {
1218     // Write failed immediately. Schedule the on_writable callback to run
1219     // asynchronously.
1220     engine_->Run(
1221         [on_writable = std::move(on_writable), status, this]() mutable {
1222           GRPC_TRACE_LOG(event_engine_endpoint, INFO)
1223               << "Endpoint[" << this << "]: Write failed: " << status;
1224           on_writable(status);
1225         });
1226     return false;
1227   }
1228   // Write succeeded immediately. Return true and don't run the on_writable
1229   // callback.
1230   GRPC_TRACE_LOG(event_engine_endpoint, INFO)
1231       << "Endpoint[" << this << "]: Write succeeded immediately";
1232   return true;
1233 }
1234 
MaybeShutdown(absl::Status why,absl::AnyInvocable<void (absl::StatusOr<int>)> on_release_fd)1235 void PosixEndpointImpl::MaybeShutdown(
1236     absl::Status why,
1237     absl::AnyInvocable<void(absl::StatusOr<int>)> on_release_fd) {
1238   if (poller_->CanTrackErrors()) {
1239     ZerocopyDisableAndWaitForRemaining();
1240     stop_error_notification_.store(true, std::memory_order_release);
1241     handle_->SetHasError();
1242   }
1243   on_release_fd_ = std::move(on_release_fd);
1244   grpc_core::StatusSetInt(&why, grpc_core::StatusIntProperty::kRpcStatus,
1245                           GRPC_STATUS_UNAVAILABLE);
1246   handle_->ShutdownHandle(why);
1247   read_mu_.Lock();
1248   memory_owner_.Reset();
1249   read_mu_.Unlock();
1250   Unref();
1251 }
1252 
~PosixEndpointImpl()1253 PosixEndpointImpl ::~PosixEndpointImpl() {
1254   int release_fd = -1;
1255   handle_->OrphanHandle(on_done_,
1256                         on_release_fd_ == nullptr ? nullptr : &release_fd, "");
1257   if (on_release_fd_ != nullptr) {
1258     engine_->Run([on_release_fd = std::move(on_release_fd_),
1259                   release_fd]() mutable { on_release_fd(release_fd); });
1260   }
1261   delete on_read_;
1262   delete on_write_;
1263   delete on_error_;
1264 }
1265 
PosixEndpointImpl(EventHandle * handle,PosixEngineClosure * on_done,std::shared_ptr<EventEngine> engine,MemoryAllocator &&,const PosixTcpOptions & options)1266 PosixEndpointImpl::PosixEndpointImpl(EventHandle* handle,
1267                                      PosixEngineClosure* on_done,
1268                                      std::shared_ptr<EventEngine> engine,
1269                                      MemoryAllocator&& /*allocator*/,
1270                                      const PosixTcpOptions& options)
1271     : sock_(PosixSocketWrapper(handle->WrappedFd())),
1272       on_done_(on_done),
1273       traced_buffers_(),
1274       handle_(handle),
1275       poller_(handle->Poller()),
1276       engine_(engine) {
1277   PosixSocketWrapper sock(handle->WrappedFd());
1278   fd_ = handle_->WrappedFd();
1279   CHECK(options.resource_quota != nullptr);
1280   auto peer_addr_string = sock.PeerAddressString();
1281   mem_quota_ = options.resource_quota->memory_quota();
1282   memory_owner_ = mem_quota_->CreateMemoryOwner();
1283   self_reservation_ = memory_owner_.MakeReservation(sizeof(PosixEndpointImpl));
1284   auto local_address = sock.LocalAddress();
1285   if (local_address.ok()) {
1286     local_address_ = *local_address;
1287   }
1288   auto peer_address = sock.PeerAddress();
1289   if (peer_address.ok()) {
1290     peer_address_ = *peer_address;
1291   }
1292   target_length_ = static_cast<double>(options.tcp_read_chunk_size);
1293   bytes_read_this_round_ = 0;
1294   min_read_chunk_size_ = options.tcp_min_read_chunk_size;
1295   max_read_chunk_size_ = options.tcp_max_read_chunk_size;
1296   bool zerocopy_enabled =
1297       options.tcp_tx_zero_copy_enabled && poller_->CanTrackErrors();
1298 #ifdef GRPC_LINUX_ERRQUEUE
1299   if (zerocopy_enabled) {
1300     if (GetRLimitMemLockMax() == 0) {
1301       zerocopy_enabled = false;
1302       LOG(ERROR) << "Tx zero-copy will not be used by gRPC since RLIMIT_MEMLOCK"
1303                  << " value is not set. Consider raising its value with "
1304                  << "setrlimit().";
1305     } else if (GetUlimitHardMemLock() == 0) {
1306       zerocopy_enabled = false;
1307       LOG(ERROR) << "Tx zero-copy will not be used by gRPC since hard memlock "
1308                  << "ulimit value is not set. Use ulimit -l <value> to set its "
1309                  << "value.";
1310     } else {
1311       const int enable = 1;
1312       if (setsockopt(fd_, SOL_SOCKET, SO_ZEROCOPY, &enable, sizeof(enable)) !=
1313           0) {
1314         zerocopy_enabled = false;
1315         LOG(ERROR) << "Failed to set zerocopy options on the socket.";
1316       }
1317     }
1318 
1319     if (zerocopy_enabled) {
1320       VLOG(2) << "Tx-zero copy enabled for gRPC sends. RLIMIT_MEMLOCK value "
1321               << "=" << GetRLimitMemLockMax()
1322               << ",ulimit hard memlock value = " << GetUlimitHardMemLock();
1323     }
1324   }
1325 #endif  // GRPC_LINUX_ERRQUEUE
1326   tcp_zerocopy_send_ctx_ = std::make_unique<TcpZerocopySendCtx>(
1327       zerocopy_enabled, options.tcp_tx_zerocopy_max_simultaneous_sends,
1328       options.tcp_tx_zerocopy_send_bytes_threshold);
1329 #ifdef GRPC_HAVE_TCP_INQ
1330   int one = 1;
1331   if (setsockopt(fd_, SOL_TCP, TCP_INQ, &one, sizeof(one)) == 0) {
1332     inq_capable_ = true;
1333   } else {
1334     VLOG(2) << "cannot set inq fd=" << fd_ << " errno=" << errno;
1335     inq_capable_ = false;
1336   }
1337 #else
1338   inq_capable_ = false;
1339 #endif  // GRPC_HAVE_TCP_INQ
1340 
1341   on_read_ = PosixEngineClosure::ToPermanentClosure(
1342       [this](absl::Status status) { HandleRead(std::move(status)); });
1343   on_write_ = PosixEngineClosure::ToPermanentClosure(
1344       [this](absl::Status status) { HandleWrite(std::move(status)); });
1345   on_error_ = PosixEngineClosure::ToPermanentClosure(
1346       [this](absl::Status status) { HandleError(std::move(status)); });
1347 
1348   // Start being notified on errors if poller can track errors.
1349   if (poller_->CanTrackErrors()) {
1350     Ref().release();
1351     handle_->NotifyOnError(on_error_);
1352   }
1353 }
1354 
CreatePosixEndpoint(EventHandle * handle,PosixEngineClosure * on_shutdown,std::shared_ptr<EventEngine> engine,MemoryAllocator && allocator,const PosixTcpOptions & options)1355 std::unique_ptr<PosixEndpoint> CreatePosixEndpoint(
1356     EventHandle* handle, PosixEngineClosure* on_shutdown,
1357     std::shared_ptr<EventEngine> engine, MemoryAllocator&& allocator,
1358     const PosixTcpOptions& options) {
1359   DCHECK_NE(handle, nullptr);
1360   return std::make_unique<PosixEndpoint>(handle, on_shutdown, std::move(engine),
1361                                          std::move(allocator), options);
1362 }
1363 
1364 }  // namespace experimental
1365 }  // namespace grpc_event_engine
1366 
1367 #else  // GRPC_POSIX_SOCKET_TCP
1368 
1369 namespace grpc_event_engine {
1370 namespace experimental {
1371 
CreatePosixEndpoint(EventHandle *,PosixEngineClosure *,std::shared_ptr<EventEngine>,const PosixTcpOptions &)1372 std::unique_ptr<PosixEndpoint> CreatePosixEndpoint(
1373     EventHandle* /*handle*/, PosixEngineClosure* /*on_shutdown*/,
1374     std::shared_ptr<EventEngine> /*engine*/,
1375     const PosixTcpOptions& /*options*/) {
1376   grpc_core::Crash("Cannot create PosixEndpoint on this platform");
1377 }
1378 
1379 }  // namespace experimental
1380 }  // namespace grpc_event_engine
1381 
1382 #endif  // GRPC_POSIX_SOCKET_TCP
1383