// Copyright 2022 gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef GRPC_SRC_CORE_LIB_EVENT_ENGINE_POSIX_ENGINE_POSIX_ENDPOINT_H
#define GRPC_SRC_CORE_LIB_EVENT_ENGINE_POSIX_ENGINE_POSIX_ENDPOINT_H

#include <grpc/support/port_platform.h>

// IWYU pragma: no_include <bits/types/struct_iovec.h>

#include <grpc/event_engine/event_engine.h>
#include <grpc/event_engine/memory_allocator.h>
#include <grpc/event_engine/slice_buffer.h>
#include <grpc/support/alloc.h>

#include <atomic>
#include <cstdint>
#include <memory>
#include <new>
#include <utility>

#include "absl/base/thread_annotations.h"
#include "absl/container/flat_hash_map.h"
#include "absl/functional/any_invocable.h"
#include "absl/hash/hash.h"
#include "absl/log/check.h"
#include "absl/log/log.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "src/core/lib/event_engine/extensions/supports_fd.h"
#include "src/core/lib/event_engine/posix.h"
#include "src/core/lib/event_engine/posix_engine/event_poller.h"
#include "src/core/lib/event_engine/posix_engine/posix_engine_closure.h"
#include "src/core/lib/event_engine/posix_engine/tcp_socket_utils.h"
#include "src/core/lib/event_engine/posix_engine/traced_buffer_list.h"
#include "src/core/lib/iomgr/port.h"
#include "src/core/lib/resource_quota/memory_quota.h"
#include "src/core/util/crash.h"
#include "src/core/util/ref_counted.h"
#include "src/core/util/sync.h"

#ifdef GRPC_POSIX_SOCKET_TCP

#include <sys/socket.h>  // IWYU pragma: keep
#include <sys/types.h>   // IWYU pragma: keep

#ifdef GRPC_MSG_IOVLEN_TYPE
typedef GRPC_MSG_IOVLEN_TYPE msg_iovlen_type;
#else
typedef size_t msg_iovlen_type;
#endif

#endif  // GRPC_POSIX_SOCKET_TCP

namespace grpc_event_engine {
namespace experimental {

#ifdef GRPC_POSIX_SOCKET_TCP

class TcpZerocopySendRecord {
 public:
  TcpZerocopySendRecord() { buf_.Clear(); }

  ~TcpZerocopySendRecord() { DebugAssertEmpty(); }

  // TcpZerocopySendRecord contains a slice buffer holding the slices to be
  // sent. Given the slices that we wish to send, and the current offset into
  // the slice buffer (indicating which have already been sent), populate an
  // iovec array that will be used for a zerocopy enabled sendmsg().
  //   unwind_slice_idx - input/output parameter. It indicates the index of the
  //   last slice whose contents were partially sent in the previous sendmsg.
  //   After this function returns, it gets updated to a new offset depending
  //   on the number of bytes chosen to be sent in the current sendmsg.
  //   unwind_byte_idx - input/output parameter. It indicates the byte offset
  //   within the last slice whose contents were partially sent in the previous
  //   sendmsg. After this function returns, it gets updated to a new offset
  //   depending on the number of bytes chosen to be sent in the current
  //   sendmsg.
  //   sending_length - total number of bytes to be sent in the current sendmsg.
  //   iov - an iovec array containing the bytes to be sent in the current
  //   sendmsg.
  //   Returns: the number of entries in the iovec array.
  //
  msg_iovlen_type PopulateIovs(size_t* unwind_slice_idx,
                               size_t* unwind_byte_idx, size_t* sending_length,
                               iovec* iov);

  // A sendmsg() may not be able to send all the bytes that we requested at
  // this time, returning EAGAIN (possibly due to backpressure). In this case,
  // unwind the offset into the slice buffer so we retry sending these bytes.
  void UnwindIfThrottled(size_t unwind_slice_idx, size_t unwind_byte_idx) {
    out_offset_.byte_idx = unwind_byte_idx;
    out_offset_.slice_idx = unwind_slice_idx;
  }

  // Update the offset into the slice buffer based on how much we wanted to
  // send vs. what sendmsg() actually sent (which may be lower, possibly due
  // to backpressure).
  void UpdateOffsetForBytesSent(size_t sending_length, size_t actually_sent);

  // Indicates whether all underlying data has been sent or not.
  bool AllSlicesSent() { return out_offset_.slice_idx == buf_.Count(); }

  // Reset this structure for a new tcp_write() with zerocopy.
  void PrepareForSends(
      grpc_event_engine::experimental::SliceBuffer& slices_to_send) {
    DebugAssertEmpty();
    out_offset_.slice_idx = 0;
    out_offset_.byte_idx = 0;
    buf_.Swap(slices_to_send);
    Ref();
  }

  // References: 1 reference per sendmsg(), and 1 for the tcp_write().
  void Ref() { ref_.fetch_add(1, std::memory_order_relaxed); }

  // Unref: called when we get an error queue notification for a sendmsg(),
  // when a sendmsg() failed, or when the tcp_write() is done.
  bool Unref() {
    const intptr_t prior = ref_.fetch_sub(1, std::memory_order_acq_rel);
    DCHECK_GT(prior, 0);
    if (prior == 1) {
      AllSendsComplete();
      return true;
    }
    return false;
  }

 private:
  struct OutgoingOffset {
    size_t slice_idx = 0;
    size_t byte_idx = 0;
  };

  void DebugAssertEmpty() {
    DCHECK_EQ(buf_.Count(), 0u);
    DCHECK_EQ(buf_.Length(), 0u);
    DCHECK_EQ(ref_.load(std::memory_order_relaxed), 0);
  }

  // When all sendmsg() calls associated with this tcp_write() have been
  // completed (i.e. we have received the notifications for each sequence
  // number for each sendmsg()) and all reference counts have been dropped,
  // drop our reference to the underlying data since we no longer need it.
  void AllSendsComplete() {
    DCHECK_EQ(ref_.load(std::memory_order_relaxed), 0);
    buf_.Clear();
  }

  grpc_event_engine::experimental::SliceBuffer buf_;
  std::atomic<intptr_t> ref_{0};
  OutgoingOffset out_offset_;
};
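
// How the offset-tracking methods above compose into a flush loop, as a
// hedged sketch (illustrative only; the real write path lives in
// PosixEndpointImpl in posix_endpoint.cc, and kMaxIovs is an assumed bound,
// not a name from this header):
//
//   bool FlushRecord(int fd, TcpZerocopySendRecord* record) {
//     constexpr int kMaxIovs = 64;
//     iovec iov[kMaxIovs];
//     size_t unwind_slice_idx;
//     size_t unwind_byte_idx;
//     size_t sending_length;
//     while (!record->AllSlicesSent()) {
//       msghdr msg = {};
//       msg.msg_iov = iov;
//       msg.msg_iovlen = record->PopulateIovs(
//           &unwind_slice_idx, &unwind_byte_idx, &sending_length, iov);
//       ssize_t sent = sendmsg(fd, &msg, MSG_ZEROCOPY);
//       if (sent < 0) {
//         // Throttled (e.g. EAGAIN): rewind so these bytes are retried when
//         // the fd becomes writable again.
//         record->UnwindIfThrottled(unwind_slice_idx, unwind_byte_idx);
//         return false;
//       }
//       record->UpdateOffsetForBytesSent(sending_length,
//                                        static_cast<size_t>(sent));
//     }
//     return true;  // every slice has been handed to the kernel
//   }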

class TcpZerocopySendCtx {
 public:
  static constexpr int kDefaultMaxSends = 4;
  static constexpr size_t kDefaultSendBytesThreshold = 16 * 1024;  // 16KB

  explicit TcpZerocopySendCtx(
      bool zerocopy_enabled, int max_sends = kDefaultMaxSends,
      size_t send_bytes_threshold = kDefaultSendBytesThreshold)
      : max_sends_(max_sends),
        free_send_records_size_(max_sends),
        threshold_bytes_(send_bytes_threshold) {
    send_records_ = static_cast<TcpZerocopySendRecord*>(
        gpr_malloc(max_sends * sizeof(*send_records_)));
    free_send_records_ = static_cast<TcpZerocopySendRecord**>(
        gpr_malloc(max_sends * sizeof(*free_send_records_)));
    if (send_records_ == nullptr || free_send_records_ == nullptr) {
      gpr_free(send_records_);
      gpr_free(free_send_records_);
      VLOG(2) << "Disabling TCP TX zerocopy due to memory pressure.\n";
      memory_limited_ = true;
      enabled_ = false;
    } else {
      for (int idx = 0; idx < max_sends_; ++idx) {
        new (send_records_ + idx) TcpZerocopySendRecord();
        free_send_records_[idx] = send_records_ + idx;
      }
      enabled_ = zerocopy_enabled;
    }
  }

  ~TcpZerocopySendCtx() {
    if (send_records_ != nullptr) {
      for (int idx = 0; idx < max_sends_; ++idx) {
        send_records_[idx].~TcpZerocopySendRecord();
      }
    }
    gpr_free(send_records_);
    gpr_free(free_send_records_);
  }

  // True if we were unable to allocate the various bookkeeping structures at
  // transport initialization time. If memory limited, we do not use zerocopy.
  bool MemoryLimited() const { return memory_limited_; }

  // TCP send zerocopy maintains an implicit sequence number for every
  // successful sendmsg() with zerocopy enabled; the kernel later gives us an
  // error queue notification with this sequence number indicating that the
  // underlying data buffers that we sent can now be released. Once that
  // notification is received, we can release the buffers associated with this
  // zerocopy send record. Here, we associate the sequence number with the data
  // buffers that were sent with the corresponding call to sendmsg().
  void NoteSend(TcpZerocopySendRecord* record) {
    record->Ref();
    {
      grpc_core::MutexLock lock(&mu_);
      is_in_write_ = true;
      AssociateSeqWithSendRecordLocked(last_send_, record);
    }
    ++last_send_;
  }

  // If sendmsg() actually failed, we need to revert the sequence number that
  // we speculatively bumped before calling sendmsg(). Note that we bump this
  // sequence number and perform relevant bookkeeping (see: NoteSend())
  // *before* calling sendmsg() since, if we called it *after* sendmsg(), there
  // is a possible race with the release notification, which could occur on
  // another thread before we do the necessary bookkeeping. Hence, calling
  // NoteSend() *before* sendmsg() and implementing an undo function is needed.
  void UndoSend() {
    --last_send_;
    if (ReleaseSendRecord(last_send_)->Unref()) {
      // We should still be holding the ref taken by tcp_write().
      DCHECK(0);
    }
  }
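
  // A minimal sketch (not the actual gRPC code path) of how NoteSend() and
  // UndoSend() bracket the syscall; `fd` and `msg` are hypothetical
  // stand-ins:
  //
  //   ctx->NoteSend(record);               // bump seq *before* the syscall
  //   ssize_t sent = sendmsg(fd, &msg, MSG_ZEROCOPY);
  //   if (sent < 0) {
  //     ctx->UndoSend();                   // syscall failed: revert seq
  //   }
  //
  // Bumping the sequence number first closes the race in which the kernel's
  // release notification for this send arrives on another thread before the
  // sequence number has been associated with the record.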

  // Simply associate this send record (and the underlying sent data buffers)
  // with the implicit sequence number for this zerocopy sendmsg().
  void AssociateSeqWithSendRecordLocked(uint32_t seq,
                                        TcpZerocopySendRecord* record)
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
    ctx_lookup_.emplace(seq, record);
  }

  // Get a send record for a send that we wish to do with zerocopy.
  TcpZerocopySendRecord* GetSendRecord() {
    grpc_core::MutexLock lock(&mu_);
    return TryGetSendRecordLocked();
  }

  // A given send record corresponds to a single tcp_write() with zerocopy
  // enabled. This can result in several sendmsg() calls to flush all of the
  // data to the wire. Each sendmsg() takes a reference on the
  // TcpZerocopySendRecord, and corresponds to a single sequence number.
  // ReleaseSendRecord releases a reference on TcpZerocopySendRecord for a
  // single sequence number. This is called either when the relevant error
  // queue notification (indicating that the underlying buffers for this
  // sendmsg() can be discarded) is received from the kernel, or when the
  // sendmsg() was unsuccessful to begin with.
  TcpZerocopySendRecord* ReleaseSendRecord(uint32_t seq) {
    grpc_core::MutexLock lock(&mu_);
    return ReleaseSendRecordLocked(seq);
  }

  // After all the references to a TcpZerocopySendRecord are released, we can
  // add it back to the pool (of size max_sends_). Note that we can only have
  // max_sends_ tcp_write() instances with zerocopy enabled in flight at the
  // same time.
  void PutSendRecord(TcpZerocopySendRecord* record) {
    grpc_core::MutexLock lock(&mu_);
    DCHECK(record >= send_records_ && record < send_records_ + max_sends_);
    PutSendRecordLocked(record);
  }

  // Indicate that we are disposing of this zerocopy context. This indicator
  // will prevent new zerocopy writes from being issued.
  void Shutdown() { shutdown_.store(true, std::memory_order_release); }

  // Indicates that there are no inflight tcp_write() instances with zerocopy
  // enabled.
  bool AllSendRecordsEmpty() {
    grpc_core::MutexLock lock(&mu_);
    return free_send_records_size_ == max_sends_;
  }

  bool Enabled() const { return enabled_; }

  // Only use zerocopy if we are sending at least this many bytes. The
  // additional overhead of reading the error queue for notifications means
  // that zerocopy is not useful for small transfers.
  size_t ThresholdBytes() const { return threshold_bytes_; }

  // Expected to be called by the handler reading messages from the error
  // queue. It is used to indicate that some optmem memory is now available.
  // It returns true to tell the caller to mark the file descriptor as
  // immediately writable.
  //
  // OptMem (controlled by the kernel option optmem_max) refers to the memory
  // allocated to the cmsg list maintained by the kernel that contains "extra"
  // packet information like SCM_RIGHTS or IP_TTL. Increasing this option
  // allows the kernel to allocate more memory as needed for more control
  // messages that need to be sent for each socket connected.
  //
  // If a write is currently in progress on the socket (i.e. we have issued a
  // sendmsg() and are about to check its return value) then we set the omem
  // state to CHECK to let the sending thread know that some tcp_omem was
  // concurrently freed even if sendmsg() returns ENOBUFS. In this case, since
  // there is already an active send thread, we do not need to mark the
  // socket writable, so we return false.
  //
  // If there was no write in progress on the socket, and the socket was not
  // marked as FULL, then we need not mark the socket writable now that some
  // tcp_omem memory is freed, since it was not considered blocked on
  // tcp_omem to begin with. So in this case, return false.
  //
  // But, if a write was not in progress and the omem state was FULL, then we
  // need to mark the socket writable since it is no longer blocked by
  // tcp_omem. In this case, return true.
  //
  // Please refer to the STATE TRANSITION DIAGRAM below for more details.
  //
  bool UpdateZeroCopyOptMemStateAfterFree() {
    grpc_core::MutexLock lock(&mu_);
    if (is_in_write_) {
      zcopy_enobuf_state_ = OptMemState::kCheck;
      return false;
    }
    DCHECK(zcopy_enobuf_state_ != OptMemState::kCheck);
    if (zcopy_enobuf_state_ == OptMemState::kFull) {
      // A previous sendmsg attempt was blocked by ENOBUFS. Return true to
      // mark the fd as writable so the next write attempt can be made.
      zcopy_enobuf_state_ = OptMemState::kOpen;
      return true;
    } else if (zcopy_enobuf_state_ == OptMemState::kOpen) {
      // No need to mark the fd as writable because the previous write
      // attempt did not encounter ENOBUFS.
      return false;
    } else {
      // This state should never be reached because it implies that the
      // previous state was CHECK and is_in_write_ is false. This means that
      // after the previous sendmsg returned and set is_in_write_ to false, it
      // did not update the zerocopy state from CHECK to OPEN.
      grpc_core::Crash("OMem state error!");
    }
  }

  // Expected to be called by the thread calling sendmsg, after the syscall
  // invocation is complete. If an ENOBUFS is seen, it checks whether the
  // error handler (processing Tx zero-copy completions) has already run and
  // freed up some OMem, and returns true to indicate that the write can be
  // attempted again immediately. If ENOBUFS was seen but no Tx zero-copy
  // completions have been received between the sendmsg() and us taking this
  // lock, then tcp_omem is still full from our point of view, so we do not
  // signal that the socket is writable with respect to the availability of
  // tcp_omem, and the function returns false: another write should not be
  // attempted immediately, and the calling thread should wait until the
  // socket is writable again. If ENOBUFS was not seen, then again return
  // false, because the next write should be attempted only when the socket
  // is writable again.
  //
  // Please refer to the STATE TRANSITION DIAGRAM below for more details.
  //
  bool UpdateZeroCopyOptMemStateAfterSend(bool seen_enobuf, bool& constrained) {
    grpc_core::MutexLock lock(&mu_);
    is_in_write_ = false;
    constrained = false;
    if (seen_enobuf) {
      if (ctx_lookup_.size() == 1) {
        // There is no un-acked zero-copy record. Set constrained to true to
        // indicate that we are resource constrained, because we are seeing
        // ENOBUFS even for the first record. This indicates that the hard
        // memlock ulimit (RLIMIT_MEMLOCK) is not configured correctly for
        // the process.
        constrained = true;
      }
      if (zcopy_enobuf_state_ == OptMemState::kCheck) {
        zcopy_enobuf_state_ = OptMemState::kOpen;
        return true;
      } else {
        zcopy_enobuf_state_ = OptMemState::kFull;
      }
    } else if (zcopy_enobuf_state_ != OptMemState::kOpen) {
      zcopy_enobuf_state_ = OptMemState::kOpen;
    }
    return false;
  }
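
  // Sketch of how the two state-update entry points are driven (illustrative
  // only; the real callers are the error-queue handler and the write path in
  // posix_endpoint.cc, and MarkFdWritable() is a hypothetical helper):
  //
  //   // Error-queue thread, after reaping zero-copy completions:
  //   if (ctx->UpdateZeroCopyOptMemStateAfterFree()) {
  //     MarkFdWritable();  // schedule the next write attempt
  //   }
  //
  //   // Sending thread, right after sendmsg() returns:
  //   bool constrained = false;
  //   bool retry_now = ctx->UpdateZeroCopyOptMemStateAfterSend(
  //       sent < 0 && errno == ENOBUFS, constrained);
  //   if (retry_now) { /* retry sendmsg() immediately */ }
  //   else           { /* wait for the fd to become writable */ }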

 private:
  //                      STATE TRANSITION DIAGRAM
  //
  // sendmsg succeeds       Tx-zero copy succeeds and there is no active sendmsg
  //      ----<<--+  +------<<-------------------------------------+
  //      |       |  |                                             |
  //      |       |  v       sendmsg returns ENOBUFS               |
  //      +-----> OPEN  ------------->>-------------------------> FULL
  //                ^                                              |
  //                |                                              |
  //                | sendmsg completes                            |
  //                +----<<---------- CHECK <-------<<-------------+
  //                                        Tx-zero copy succeeds and there is
  //                                        an active sendmsg
  //
  // OptMem (controlled by the kernel option optmem_max) refers to the memory
  // allocated to the cmsg list maintained by the kernel that contains "extra"
  // packet information like SCM_RIGHTS or IP_TTL. Increasing this option
  // allows the kernel to allocate more memory as needed for more control
  // messages that need to be sent for each socket connected. Each tx
  // zero-copy sendmsg has a corresponding entry added to the OptMem queue;
  // the entry is popped from the OptMem queue when the zero-copy send is
  // complete.
  enum class OptMemState : int8_t {
    kOpen,   // Everything is clear and omem is not full.
    kFull,   // The last sendmsg() has returned with an errno of ENOBUFS.
    kCheck,  // Error queue is read while is_in_write_ was true, so we should
             // check this state after the sendmsg.
  };

  TcpZerocopySendRecord* ReleaseSendRecordLocked(uint32_t seq)
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
    auto iter = ctx_lookup_.find(seq);
    DCHECK(iter != ctx_lookup_.end());
    TcpZerocopySendRecord* record = iter->second;
    ctx_lookup_.erase(iter);
    return record;
  }

  TcpZerocopySendRecord* TryGetSendRecordLocked()
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
    if (shutdown_.load(std::memory_order_acquire)) {
      return nullptr;
    }
    if (free_send_records_size_ == 0) {
      return nullptr;
    }
    free_send_records_size_--;
    return free_send_records_[free_send_records_size_];
  }

  void PutSendRecordLocked(TcpZerocopySendRecord* record)
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
    DCHECK(free_send_records_size_ < max_sends_);
    free_send_records_[free_send_records_size_] = record;
    free_send_records_size_++;
  }

  TcpZerocopySendRecord* send_records_ ABSL_GUARDED_BY(mu_);
  TcpZerocopySendRecord** free_send_records_ ABSL_GUARDED_BY(mu_);
  int max_sends_;
  int free_send_records_size_ ABSL_GUARDED_BY(mu_);
  grpc_core::Mutex mu_;
  uint32_t last_send_ = 0;
  std::atomic<bool> shutdown_{false};
  bool enabled_ = false;
  size_t threshold_bytes_ = kDefaultSendBytesThreshold;
  absl::flat_hash_map<uint32_t, TcpZerocopySendRecord*> ctx_lookup_
      ABSL_GUARDED_BY(mu_);
  bool memory_limited_ = false;
  bool is_in_write_ ABSL_GUARDED_BY(mu_) = false;
  OptMemState zcopy_enobuf_state_ ABSL_GUARDED_BY(mu_) = OptMemState::kOpen;
};
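
// Illustrative record lifecycle through the pool (a sketch of the calling
// pattern, not the actual gRPC code; see posix_endpoint.cc for the real
// write path, and treat `slices` and `seq` as hypothetical locals):
//
//   TcpZerocopySendRecord* record = ctx.GetSendRecord();
//   if (record == nullptr) {
//     // Pool exhausted (max_sends_ zerocopy writes already in flight) or
//     // shutdown: fall back to an ordinary copying write.
//   } else {
//     record->PrepareForSends(slices);  // takes the tcp_write() reference
//     // ... one or more sendmsg() calls, each preceded by ctx.NoteSend() ...
//   }
//
//   // Later, when the kernel's error-queue notification for `seq` arrives:
//   TcpZerocopySendRecord* done = ctx.ReleaseSendRecord(seq);
//   if (done->Unref()) {
//     ctx.PutSendRecord(done);  // last reference: return record to the pool
//   }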

class PosixEndpointImpl : public grpc_core::RefCounted<PosixEndpointImpl> {
 public:
  PosixEndpointImpl(
      EventHandle* handle, PosixEngineClosure* on_done,
      std::shared_ptr<grpc_event_engine::experimental::EventEngine> engine,
      grpc_event_engine::experimental::MemoryAllocator&& allocator,
      const PosixTcpOptions& options);
  ~PosixEndpointImpl() override;
  bool Read(
      absl::AnyInvocable<void(absl::Status)> on_read,
      grpc_event_engine::experimental::SliceBuffer* buffer,
      const grpc_event_engine::experimental::EventEngine::Endpoint::ReadArgs*
          args);
  bool Write(
      absl::AnyInvocable<void(absl::Status)> on_writable,
      grpc_event_engine::experimental::SliceBuffer* data,
      const grpc_event_engine::experimental::EventEngine::Endpoint::WriteArgs*
          args);
  const grpc_event_engine::experimental::EventEngine::ResolvedAddress&
  GetPeerAddress() const {
    return peer_address_;
  }
  const grpc_event_engine::experimental::EventEngine::ResolvedAddress&
  GetLocalAddress() const {
    return local_address_;
  }

  int GetWrappedFd() { return fd_; }

  bool CanTrackErrors() const { return poller_->CanTrackErrors(); }

  void MaybeShutdown(
      absl::Status why,
      absl::AnyInvocable<void(absl::StatusOr<int> release_fd)> on_release_fd);

 private:
  void UpdateRcvLowat() ABSL_EXCLUSIVE_LOCKS_REQUIRED(read_mu_);
  void HandleWrite(absl::Status status);
  void HandleError(absl::Status status);
  void HandleRead(absl::Status status) ABSL_NO_THREAD_SAFETY_ANALYSIS;
  bool HandleReadLocked(absl::Status& status)
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(read_mu_);
  void MaybeMakeReadSlices() ABSL_EXCLUSIVE_LOCKS_REQUIRED(read_mu_);
  bool TcpDoRead(absl::Status& status) ABSL_EXCLUSIVE_LOCKS_REQUIRED(read_mu_);
  void FinishEstimate();
  void AddToEstimate(size_t bytes);
  void MaybePostReclaimer() ABSL_EXCLUSIVE_LOCKS_REQUIRED(read_mu_);
  void PerformReclamation() ABSL_LOCKS_EXCLUDED(read_mu_);
  // Zero copy related helper methods.
  TcpZerocopySendRecord* TcpGetSendZerocopyRecord(
      grpc_event_engine::experimental::SliceBuffer& buf);
  bool DoFlushZerocopy(TcpZerocopySendRecord* record, absl::Status& status);
  bool TcpFlushZerocopy(TcpZerocopySendRecord* record, absl::Status& status);
  bool TcpFlush(absl::Status& status);
  void TcpShutdownTracedBufferList();
  void UnrefMaybePutZerocopySendRecord(TcpZerocopySendRecord* record);
  void ZerocopyDisableAndWaitForRemaining();
  bool WriteWithTimestamps(struct msghdr* msg, size_t sending_length,
                           ssize_t* sent_length, int* saved_errno,
                           int additional_flags);
  absl::Status TcpAnnotateError(absl::Status src_error) const;
#ifdef GRPC_LINUX_ERRQUEUE
  bool ProcessErrors();
  // Reads a cmsg to process zerocopy control messages.
  void ProcessZerocopy(struct cmsghdr* cmsg);
  // Reads a cmsg to derive timestamps from the control messages.
  struct cmsghdr* ProcessTimestamp(msghdr* msg, struct cmsghdr* cmsg);
#endif  // GRPC_LINUX_ERRQUEUE
  grpc_core::Mutex read_mu_;
  PosixSocketWrapper sock_;
  int fd_;
  bool is_first_read_ = true;
  bool has_posted_reclaimer_ ABSL_GUARDED_BY(read_mu_) = false;
  double target_length_;
  int min_read_chunk_size_;
  int max_read_chunk_size_;
  int set_rcvlowat_ = 0;
  double bytes_read_this_round_ = 0;
  std::atomic<int> ref_count_{1};

  // Garbage left over after the last read.
  grpc_event_engine::experimental::SliceBuffer last_read_buffer_;

  grpc_event_engine::experimental::SliceBuffer* incoming_buffer_
      ABSL_GUARDED_BY(read_mu_) = nullptr;
  // Bytes pending on the socket from the last read.
  int inq_ = 1;
  // Cache whether the kernel supports inq.
  bool inq_capable_ = false;

  grpc_event_engine::experimental::SliceBuffer* outgoing_buffer_ = nullptr;
  // Byte within outgoing_buffer_'s slices[0] to write next.
  size_t outgoing_byte_idx_ = 0;

  PosixEngineClosure* on_read_ = nullptr;
  PosixEngineClosure* on_write_ = nullptr;
  PosixEngineClosure* on_error_ = nullptr;
  PosixEngineClosure* on_done_ = nullptr;
  absl::AnyInvocable<void(absl::Status)> read_cb_ ABSL_GUARDED_BY(read_mu_);
  absl::AnyInvocable<void(absl::Status)> write_cb_;

  grpc_event_engine::experimental::EventEngine::ResolvedAddress peer_address_;
  grpc_event_engine::experimental::EventEngine::ResolvedAddress local_address_;

  // Maintain a shared_ptr to mem_quota_ to ensure the underlying basic memory
  // quota is not deleted until the endpoint is destroyed.
  grpc_core::MemoryQuotaRefPtr mem_quota_;
  grpc_core::MemoryOwner memory_owner_;
  grpc_core::MemoryAllocator::Reservation self_reservation_;

  void* outgoing_buffer_arg_ = nullptr;

  absl::AnyInvocable<void(absl::StatusOr<int>)> on_release_fd_ = nullptr;

  // A byte counter. It starts at -1, is set to 0 the first time the socket
  // options for collecting timestamps are set, and is incremented with each
  // byte sent.
  int bytes_counter_ = -1;
  // True if timestamping options are set on the socket.
#ifdef GRPC_LINUX_ERRQUEUE
  bool socket_ts_enabled_ = false;
#endif  // GRPC_LINUX_ERRQUEUE
  // Cache whether we can set timestamping options.
  bool ts_capable_ = true;
  // Set to true if we no longer want to be notified on errors.
  std::atomic<bool> stop_error_notification_{false};
  std::unique_ptr<TcpZerocopySendCtx> tcp_zerocopy_send_ctx_;
  TcpZerocopySendRecord* current_zerocopy_send_ = nullptr;
  // A hint from upper layers specifying the minimum number of bytes that need
  // to be read to make meaningful progress.
  int min_progress_size_ = 1;
  TracedBufferList traced_buffers_;
  // The handle is owned by the PosixEndpointImpl object.
  EventHandle* handle_;
  PosixEventPoller* poller_;
  std::shared_ptr<grpc_event_engine::experimental::EventEngine> engine_;
};

class PosixEndpoint : public PosixEndpointWithFdSupport {
 public:
  PosixEndpoint(
      EventHandle* handle, PosixEngineClosure* on_shutdown,
      std::shared_ptr<grpc_event_engine::experimental::EventEngine> engine,
      grpc_event_engine::experimental::MemoryAllocator&& allocator,
      const PosixTcpOptions& options)
      : impl_(new PosixEndpointImpl(handle, on_shutdown, std::move(engine),
                                    std::move(allocator), options)) {}

  bool Read(
      absl::AnyInvocable<void(absl::Status)> on_read,
      grpc_event_engine::experimental::SliceBuffer* buffer,
      const grpc_event_engine::experimental::EventEngine::Endpoint::ReadArgs*
          args) override {
    return impl_->Read(std::move(on_read), buffer, args);
  }

  bool Write(
      absl::AnyInvocable<void(absl::Status)> on_writable,
      grpc_event_engine::experimental::SliceBuffer* data,
      const grpc_event_engine::experimental::EventEngine::Endpoint::WriteArgs*
          args) override {
    return impl_->Write(std::move(on_writable), data, args);
  }

  const grpc_event_engine::experimental::EventEngine::ResolvedAddress&
  GetPeerAddress() const override {
    return impl_->GetPeerAddress();
  }
  const grpc_event_engine::experimental::EventEngine::ResolvedAddress&
  GetLocalAddress() const override {
    return impl_->GetLocalAddress();
  }

  int GetWrappedFd() override { return impl_->GetWrappedFd(); }

  bool CanTrackErrors() override { return impl_->CanTrackErrors(); }

  void Shutdown(absl::AnyInvocable<void(absl::StatusOr<int> release_fd)>
                    on_release_fd) override {
    if (!shutdown_.exchange(true, std::memory_order_acq_rel)) {
      impl_->MaybeShutdown(absl::FailedPreconditionError("Endpoint closing"),
                           std::move(on_release_fd));
    }
  }

  ~PosixEndpoint() override {
    if (!shutdown_.exchange(true, std::memory_order_acq_rel)) {
      impl_->MaybeShutdown(absl::FailedPreconditionError("Endpoint closing"),
                           nullptr);
    }
  }

 private:
  PosixEndpointImpl* impl_;
  std::atomic<bool> shutdown_{false};
};

#else  // GRPC_POSIX_SOCKET_TCP

class PosixEndpoint : public PosixEndpointWithFdSupport {
 public:
  PosixEndpoint() = default;

  bool Read(absl::AnyInvocable<void(absl::Status)> /*on_read*/,
            grpc_event_engine::experimental::SliceBuffer* /*buffer*/,
            const grpc_event_engine::experimental::EventEngine::Endpoint::
                ReadArgs* /*args*/) override {
    grpc_core::Crash("PosixEndpoint::Read not supported on this platform");
  }

  bool Write(absl::AnyInvocable<void(absl::Status)> /*on_writable*/,
             grpc_event_engine::experimental::SliceBuffer* /*data*/,
             const grpc_event_engine::experimental::EventEngine::Endpoint::
                 WriteArgs* /*args*/) override {
    grpc_core::Crash("PosixEndpoint::Write not supported on this platform");
  }

  const grpc_event_engine::experimental::EventEngine::ResolvedAddress&
  GetPeerAddress() const override {
    grpc_core::Crash(
        "PosixEndpoint::GetPeerAddress not supported on this platform");
  }
  const grpc_event_engine::experimental::EventEngine::ResolvedAddress&
  GetLocalAddress() const override {
    grpc_core::Crash(
        "PosixEndpoint::GetLocalAddress not supported on this platform");
  }

  int GetWrappedFd() override {
    grpc_core::Crash(
        "PosixEndpoint::GetWrappedFd not supported on this platform");
  }

  bool CanTrackErrors() override {
    grpc_core::Crash(
        "PosixEndpoint::CanTrackErrors not supported on this platform");
  }

  void Shutdown(absl::AnyInvocable<void(absl::StatusOr<int> release_fd)>
                    /*on_release_fd*/) override {
    grpc_core::Crash("PosixEndpoint::Shutdown not supported on this platform");
  }

  ~PosixEndpoint() override = default;
};

#endif  // GRPC_POSIX_SOCKET_TCP

// Create a PosixEndpoint.
// A shared_ptr of the EventEngine is passed to the endpoint to ensure that
// the EventEngine is alive for the lifetime of the endpoint. The ownership
// of the EventHandle is transferred to the endpoint.
std::unique_ptr<PosixEndpoint> CreatePosixEndpoint(
    EventHandle* handle, PosixEngineClosure* on_shutdown,
    std::shared_ptr<EventEngine> engine,
    grpc_event_engine::experimental::MemoryAllocator&& allocator,
    const PosixTcpOptions& options);
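
// A hedged usage sketch of CreatePosixEndpoint (names such as `handle`,
// `engine_ptr`, `allocator`, `options`, and `buffer` are assumptions for
// illustration, not part of this header). An EventHandle obtained from a
// PosixEventPoller for a connected fd is wrapped roughly like so:
//
//   std::unique_ptr<PosixEndpoint> endpoint = CreatePosixEndpoint(
//       handle, /*on_shutdown=*/nullptr, engine_ptr,
//       std::move(allocator), options);
//   endpoint->Read(
//       [](absl::Status status) { /* `buffer` now holds the data read */ },
//       &buffer, /*args=*/nullptr);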

}  // namespace experimental
}  // namespace grpc_event_engine

#endif  // GRPC_SRC_CORE_LIB_EVENT_ENGINE_POSIX_ENGINE_POSIX_ENDPOINT_H