1 // Copyright 2022 gRPC Authors 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // http://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 15 #ifndef GRPC_SRC_CORE_LIB_EVENT_ENGINE_POSIX_ENGINE_POSIX_ENDPOINT_H 16 #define GRPC_SRC_CORE_LIB_EVENT_ENGINE_POSIX_ENGINE_POSIX_ENDPOINT_H 17 18 #include <grpc/support/port_platform.h> 19 20 // IWYU pragma: no_include <bits/types/struct_iovec.h> 21 22 #include <grpc/event_engine/event_engine.h> 23 #include <grpc/event_engine/memory_allocator.h> 24 #include <grpc/event_engine/slice_buffer.h> 25 #include <grpc/support/alloc.h> 26 27 #include <atomic> 28 #include <cstdint> 29 #include <memory> 30 #include <new> 31 #include <utility> 32 33 #include "absl/base/thread_annotations.h" 34 #include "absl/container/flat_hash_map.h" 35 #include "absl/functional/any_invocable.h" 36 #include "absl/hash/hash.h" 37 #include "absl/log/check.h" 38 #include "absl/log/log.h" 39 #include "absl/status/status.h" 40 #include "absl/status/statusor.h" 41 #include "src/core/lib/event_engine/extensions/supports_fd.h" 42 #include "src/core/lib/event_engine/posix.h" 43 #include "src/core/lib/event_engine/posix_engine/event_poller.h" 44 #include "src/core/lib/event_engine/posix_engine/posix_engine_closure.h" 45 #include "src/core/lib/event_engine/posix_engine/tcp_socket_utils.h" 46 #include "src/core/lib/event_engine/posix_engine/traced_buffer_list.h" 47 #include "src/core/lib/iomgr/port.h" 48 #include "src/core/lib/resource_quota/memory_quota.h" 49 #include "src/core/util/crash.h" 50 
#include "src/core/util/ref_counted.h"
#include "src/core/util/sync.h"

#ifdef GRPC_POSIX_SOCKET_TCP

#include <sys/socket.h>  // IWYU pragma: keep
#include <sys/types.h>   // IWYU pragma: keep

#ifdef GRPC_MSG_IOVLEN_TYPE
typedef GRPC_MSG_IOVLEN_TYPE msg_iovlen_type;
#else
typedef size_t msg_iovlen_type;
#endif

#endif  // GRPC_POSIX_SOCKET_TCP

namespace grpc_event_engine {
namespace experimental {

#ifdef GRPC_POSIX_SOCKET_TCP

// Bookkeeping for a single tcp_write() issued with zerocopy enabled. It owns
// the slices being sent (buf_), tracks how far into them we have progressed
// (out_offset_), and keeps a reference count: one reference for the
// tcp_write() itself plus one per sendmsg() issued for it. When the last
// reference is dropped the buffered slices are released.
class TcpZerocopySendRecord {
 public:
  TcpZerocopySendRecord() { buf_.Clear(); }

  ~TcpZerocopySendRecord() { DebugAssertEmpty(); }

  // TcpZerocopySendRecord contains a slice buffer holding the slices to be
  // sent. Given the slices that we wish to send, and the current offset into
  // the slice buffer (indicating which have already been sent), populate an
  // iovec array that will be used for a zerocopy enabled sendmsg().
  // unwind_slice_idx - input/output parameter. It indicates the index of last
  //   slice whose contents were partially sent in the previous sendmsg. After
  //   this function returns, it gets updated to a new offset depending on the
  //   number of bytes which are decided to be sent in the current sendmsg.
  // unwind_byte_idx - input/output parameter. It indicates the byte offset
  //   within the last slice whose contents were partially sent in the previous
  //   sendmsg. After this function returns, it gets updated to a new offset
  //   depending on the number of bytes which are decided to be sent in the
  //   current sendmsg.
  // sending_length - total number of bytes to be sent in the current sendmsg.
  // iov - An iovec array containing the bytes to be sent in the current
  //   sendmsg.
  // Returns: the number of entries in the iovec array.
  //
  msg_iovlen_type PopulateIovs(size_t* unwind_slice_idx,
                               size_t* unwind_byte_idx, size_t* sending_length,
                               iovec* iov);

  // A sendmsg() may not be able to send the bytes that we requested at this
  // time, returning EAGAIN (possibly due to backpressure). In this case,
  // unwind the offset into the slice buffer so we retry sending these bytes.
  void UnwindIfThrottled(size_t unwind_slice_idx, size_t unwind_byte_idx) {
    out_offset_.byte_idx = unwind_byte_idx;
    out_offset_.slice_idx = unwind_slice_idx;
  }

  // Update the offset into the slice buffer based on how much we wanted to
  // send vs. what sendmsg() actually sent (which may be lower, possibly due to
  // backpressure).
  void UpdateOffsetForBytesSent(size_t sending_length, size_t actually_sent);

  // Indicates whether all underlying data has been sent or not.
  bool AllSlicesSent() { return out_offset_.slice_idx == buf_.Count(); }

  // Reset this structure for a new tcp_write() with zerocopy. Takes the
  // contents of `slices_to_send` (via swap; the record is expected to be empty
  // beforehand, so the caller's buffer is left empty) and takes the reference
  // that belongs to the tcp_write() itself.
  void PrepareForSends(
      grpc_event_engine::experimental::SliceBuffer& slices_to_send) {
    DebugAssertEmpty();
    out_offset_.slice_idx = 0;
    out_offset_.byte_idx = 0;
    buf_.Swap(slices_to_send);
    Ref();
  }

  // References: 1 reference per sendmsg(), and 1 for the tcp_write().
  void Ref() { ref_.fetch_add(1, std::memory_order_relaxed); }

  // Unref: called when we get an error queue notification for a sendmsg(), if
  // a sendmsg() failed or when tcp_write() is done.
  // Returns true if this call dropped the last reference, in which case the
  // buffered slices have been released; returns false otherwise.
  bool Unref() {
    const intptr_t prior = ref_.fetch_sub(1, std::memory_order_acq_rel);
    DCHECK_GT(prior, 0);
    if (prior == 1) {
      AllSendsComplete();
      return true;
    }
    return false;
  }

 private:
  // Position of the next byte to send within buf_.
  struct OutgoingOffset {
    size_t slice_idx = 0;
    size_t byte_idx = 0;
  };

  // Debug-only check that the record holds no data and no references.
  void DebugAssertEmpty() {
    DCHECK_EQ(buf_.Count(), 0u);
    DCHECK_EQ(buf_.Length(), 0u);
    DCHECK_EQ(ref_.load(std::memory_order_relaxed), 0);
  }

  // When all sendmsg() calls associated with this tcp_write() have been
  // completed (ie. we have received the notifications for each sequence number
  // for each sendmsg()) and all reference counts have been dropped, drop our
  // reference to the underlying data since we no longer need it.
  void AllSendsComplete() {
    DCHECK_EQ(ref_.load(std::memory_order_relaxed), 0);
    buf_.Clear();
  }

  grpc_event_engine::experimental::SliceBuffer buf_;
  std::atomic<intptr_t> ref_{0};
  OutgoingOffset out_offset_;
};

// Per-endpoint zerocopy send state: a fixed pool of max_sends_
// TcpZerocopySendRecords, a map from kernel zerocopy sequence numbers to the
// record each sendmsg() belongs to, and the OptMem (ENOBUFS) state machine
// described below. Shared state is protected by mu_.
class TcpZerocopySendCtx {
 public:
  static constexpr int kDefaultMaxSends = 4;
  static constexpr size_t kDefaultSendBytesThreshold = 16 * 1024;  // 16KB

  explicit TcpZerocopySendCtx(
      bool zerocopy_enabled, int max_sends = kDefaultMaxSends,
      size_t send_bytes_threshold = kDefaultSendBytesThreshold)
      : max_sends_(max_sends),
        free_send_records_size_(max_sends),
        threshold_bytes_(send_bytes_threshold) {
    send_records_ = static_cast<TcpZerocopySendRecord*>(
        gpr_malloc(max_sends * sizeof(*send_records_)));
    free_send_records_ = static_cast<TcpZerocopySendRecord**>(
        gpr_malloc(max_sends * sizeof(*free_send_records_)));
    if (send_records_ == nullptr || free_send_records_ == nullptr) {
      gpr_free(send_records_);
      gpr_free(free_send_records_);
      // The destructor destroys the records in send_records_ whenever that
      // pointer is non-null and then frees both arrays. Clear both pointers so
      // that a partial allocation failure does not lead to a use-after-free
      // and a double free at destruction time.
      send_records_ = nullptr;
      free_send_records_ = nullptr;
      VLOG(2) << "Disabling TCP TX zerocopy due to memory pressure.\n";
      memory_limited_ = true;
      enabled_ = false;
    } else {
      for (int idx = 0; idx < max_sends_; ++idx) {
        new (send_records_ + idx) TcpZerocopySendRecord();
        free_send_records_[idx] = send_records_ + idx;
      }
      enabled_ = zerocopy_enabled;
    }
  }

  ~TcpZerocopySendCtx() {
    if (send_records_ != nullptr) {
      for (int idx = 0; idx < max_sends_; ++idx) {
        send_records_[idx].~TcpZerocopySendRecord();
      }
    }
    gpr_free(send_records_);
    gpr_free(free_send_records_);
  }

  // True if we were unable to allocate the various bookkeeping structures at
  // transport initialization time. If memory limited, we do not zerocopy.
  bool MemoryLimited() const { return memory_limited_; }

  // TCP send zerocopy maintains an implicit sequence number for every
  // successful sendmsg() with zerocopy enabled; the kernel later gives us an
  // error queue notification with this sequence number indicating that the
  // underlying data buffers that we sent can now be released. Once that
  // notification is received, we can release the buffers associated with this
  // zerocopy send record. Here, we associate the sequence number with the data
  // buffers that were sent with the corresponding call to sendmsg().
  void NoteSend(TcpZerocopySendRecord* record) {
    record->Ref();
    {
      grpc_core::MutexLock lock(&mu_);
      is_in_write_ = true;
      AssociateSeqWithSendRecordLocked(last_send_, record);
    }
    ++last_send_;
  }

  // If sendmsg() actually failed, though, we need to revert the sequence
  // number that we speculatively bumped before calling sendmsg(). Note that we
  // bump this sequence number and perform relevant bookkeeping (see:
  // NoteSend()) *before* calling sendmsg() since, if we called it *after*
  // sendmsg(), then there is a possible race with the release notification
  // which could occur on another thread before we do the necessary
  // bookkeeping. Hence, calling NoteSend() *before* sendmsg() and implementing
  // an undo function is needed.
  void UndoSend() {
    --last_send_;
    if (ReleaseSendRecord(last_send_)->Unref()) {
      // We should still be holding the ref taken by tcp_write().
      DCHECK(0);
    }
  }

  // Simply associate this send record (and the underlying sent data buffers)
  // with the implicit sequence number for this zerocopy sendmsg().
  void AssociateSeqWithSendRecordLocked(uint32_t seq,
                                        TcpZerocopySendRecord* record)
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
    ctx_lookup_.emplace(seq, record);
  }

  // Get a send record for a send that we wish to do with zerocopy.
  // Returns nullptr after Shutdown() or when all records are in use.
  TcpZerocopySendRecord* GetSendRecord() {
    grpc_core::MutexLock lock(&mu_);
    return TryGetSendRecordLocked();
  }

  // A given send record corresponds to a single tcp_write() with zerocopy
  // enabled. This can result in several sendmsg() calls to flush all of the
  // data to wire. Each sendmsg() takes a reference on the
  // TcpZerocopySendRecord, and corresponds to a single sequence number.
  // ReleaseSendRecord releases a reference on TcpZerocopySendRecord for a
  // single sequence number. This is called either when we receive the relevant
  // error queue notification (saying that we can discard the underlying
  // buffers for this sendmsg()) is received from the kernel - or, in case
  // sendmsg() was unsuccessful to begin with.
  TcpZerocopySendRecord* ReleaseSendRecord(uint32_t seq) {
    grpc_core::MutexLock lock(&mu_);
    return ReleaseSendRecordLocked(seq);
  }

  // After all the references to a TcpZerocopySendRecord are released, we can
  // add it back to the pool (of size max_sends_). Note that we can only have
  // max_sends_ tcp_write() instances with zerocopy enabled in flight at the
  // same time.
  void PutSendRecord(TcpZerocopySendRecord* record) {
    grpc_core::MutexLock lock(&mu_);
    DCHECK(record >= send_records_ && record < send_records_ + max_sends_);
    PutSendRecordLocked(record);
  }

  // Indicate that we are disposing of this zerocopy context. This indicator
  // will prevent new zerocopy writes from being issued.
  void Shutdown() { shutdown_.store(true, std::memory_order_release); }

  // Indicates that there are no inflight tcp_write() instances with zerocopy
  // enabled.
  bool AllSendRecordsEmpty() {
    grpc_core::MutexLock lock(&mu_);
    return free_send_records_size_ == max_sends_;
  }

  bool Enabled() const { return enabled_; }

  // Only use zerocopy if we are sending at least this many bytes. The
  // additional overhead of reading the error queue for notifications means
  // that zerocopy is not useful for small transfers.
  size_t ThresholdBytes() const { return threshold_bytes_; }

  // Expected to be called by handler reading messages from the err queue.
  // It is used to indicate that some OptMem memory is now available. It
  // returns true to tell the caller to mark the file descriptor as immediately
  // writable.
  //
  // OptMem (controlled by the kernel option optmem_max) refers to the memory
  // allocated to the cmsg list maintained by the kernel that contains "extra"
  // packet information like SCM_RIGHTS or IP_TTL. Increasing this option
  // allows the kernel to allocate more memory as needed for more control
  // messages that need to be sent for each socket connected.
  //
  // If a write is currently in progress on the socket (ie. we have issued a
  // sendmsg() and are about to check its return value) then we set omem state
  // to CHECK to make the sending thread know that some tcp_omem was
  // concurrently freed even if sendmsg() returns ENOBUFS. In this case, since
  // there is already an active send thread, we do not need to mark the
  // socket writeable, so we return false.
  //
  // If there was no write in progress on the socket, and the socket was not
  // marked as FULL, then we need not mark the socket writeable now that some
  // tcp_omem memory is freed since it was not considered as blocked on
  // tcp_omem to begin with. So in this case, return false.
  //
  // But, if a write was not in progress and the omem state was FULL, then we
  // need to mark the socket writeable since it is no longer blocked by
  // tcp_omem. In this case, return true.
  //
  // Please refer to the STATE TRANSITION DIAGRAM below for more details.
  //
  bool UpdateZeroCopyOptMemStateAfterFree() {
    grpc_core::MutexLock lock(&mu_);
    if (is_in_write_) {
      zcopy_enobuf_state_ = OptMemState::kCheck;
      return false;
    }
    DCHECK(zcopy_enobuf_state_ != OptMemState::kCheck);
    if (zcopy_enobuf_state_ == OptMemState::kFull) {
      // A previous sendmsg attempt was blocked by ENOBUFS. Return true to
      // mark the fd as writable so the next write attempt could be made.
      zcopy_enobuf_state_ = OptMemState::kOpen;
      return true;
    } else if (zcopy_enobuf_state_ == OptMemState::kOpen) {
      // No need to mark the fd as writable because the previous write
      // attempt did not encounter ENOBUFS.
      return false;
    } else {
      // This state should never be reached because it implies that the
      // previous state was CHECK and is_in_write is false. This means that
      // after the previous sendmsg returned and set is_in_write to false, it
      // did not update the z-copy change from CHECK to OPEN.
      grpc_core::Crash("OMem state error!");
    }
  }

  // Expected to be called by the thread calling sendmsg after the syscall
  // invocation is complete. If an ENOBUF is seen, it checks if the error
  // handler (Tx0cp completions) has already run and free'ed up some OMem. It
  // returns true indicating that the write can be attempted again immediately.
  // If ENOBUFS was seen but no Tx0cp completions have been received between
  // the sendmsg() and us taking this lock, then tcp_omem is still full from
  // our point of view. Therefore, we do not signal that the socket is
  // writeable with respect to the availability of tcp_omem. Therefore the
  // function returns false. This indicates that another write should not be
  // attempted immediately and the calling thread should wait until the socket
  // is writable again. If ENOBUFS was not seen, then again return false
  // because the next write should be attempted only when the socket is
  // writable again.
  //
  // `constrained` is an output parameter: it is set to true only when ENOBUFS
  // was seen while exactly one zerocopy record is outstanding in ctx_lookup_.
  //
  // Please refer to the STATE TRANSITION DIAGRAM below for more details.
  //
  bool UpdateZeroCopyOptMemStateAfterSend(bool seen_enobuf, bool& constrained) {
    grpc_core::MutexLock lock(&mu_);
    is_in_write_ = false;
    constrained = false;
    if (seen_enobuf) {
      if (ctx_lookup_.size() == 1) {
        // There is no un-acked z-copy record. Set constrained to true to
        // indicate that we are resource constrained because we're seeing
        // ENOBUFS even for the first record. This indicates that either
        // the process does not have hard memlock ulimit or RLIMIT_MEMLOCK
        // configured correctly.
        constrained = true;
      }
      if (zcopy_enobuf_state_ == OptMemState::kCheck) {
        zcopy_enobuf_state_ = OptMemState::kOpen;
        return true;
      } else {
        zcopy_enobuf_state_ = OptMemState::kFull;
      }
    } else if (zcopy_enobuf_state_ != OptMemState::kOpen) {
      zcopy_enobuf_state_ = OptMemState::kOpen;
    }
    return false;
  }

 private:
  //                      STATE TRANSITION DIAGRAM
  //
  // sendmsg succeeds       Tx-zero copy succeeds and there is no active sendmsg
  //      ----<<--+  +------<<-------------------------------------+
  //      |       |  |                                             |
  //      |       |  v       sendmsg returns ENOBUFS               |
  //      +-----> OPEN  ------------->>-------------------------> FULL
  //                ^                                              |
  //                |                                              |
  //                | sendmsg completes                            |
  //                +----<<---------- CHECK <-------<<-------------+
  //                                        Tx-zero copy succeeds and there is
  //                                        an active sendmsg
  //
  // OptMem (controlled by the kernel option optmem_max) refers to the memory
  // allocated to the cmsg list maintained by the kernel that contains "extra"
  // packet information like SCM_RIGHTS or IP_TTL. Increasing this option
  // allows the kernel to allocate more memory as needed for more control
  // messages that need to be sent for each socket connected. Each tx zero copy
  // sendmsg has a corresponding entry added into the Optmem queue. The entry is
  // popped from the Optmem queue when the zero copy send is complete.
  enum class OptMemState : int8_t {
    kOpen,   // Everything is clear and omem is not full.
    kFull,   // The last sendmsg() has returned with an errno of ENOBUFS.
    kCheck,  // Error queue is read while is_in_write_ was true, so we should
             // check this state after the sendmsg.
  };

  // Removes and returns the record associated with `seq`; `seq` is expected
  // to be present (DCHECKed).
  TcpZerocopySendRecord* ReleaseSendRecordLocked(uint32_t seq)
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
    auto iter = ctx_lookup_.find(seq);
    DCHECK(iter != ctx_lookup_.end());
    TcpZerocopySendRecord* record = iter->second;
    ctx_lookup_.erase(iter);
    return record;
  }

  // Pops a record from the free list, or returns nullptr if shut down or the
  // free list is empty.
  TcpZerocopySendRecord* TryGetSendRecordLocked()
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
    if (shutdown_.load(std::memory_order_acquire)) {
      return nullptr;
    }
    if (free_send_records_size_ == 0) {
      return nullptr;
    }
    free_send_records_size_--;
    return free_send_records_[free_send_records_size_];
  }

  // Pushes `record` back onto the free list.
  void PutSendRecordLocked(TcpZerocopySendRecord* record)
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
    DCHECK(free_send_records_size_ < max_sends_);
    free_send_records_[free_send_records_size_] = record;
    free_send_records_size_++;
  }

  TcpZerocopySendRecord* send_records_ ABSL_GUARDED_BY(mu_);
  TcpZerocopySendRecord** free_send_records_ ABSL_GUARDED_BY(mu_);
  int max_sends_;
  int free_send_records_size_ ABSL_GUARDED_BY(mu_);
  grpc_core::Mutex mu_;
  uint32_t last_send_ = 0;
  std::atomic<bool> shutdown_{false};
  bool enabled_ = false;
  size_t threshold_bytes_ = kDefaultSendBytesThreshold;
  absl::flat_hash_map<uint32_t, TcpZerocopySendRecord*> ctx_lookup_
      ABSL_GUARDED_BY(mu_);
  bool memory_limited_ = false;
  bool is_in_write_ ABSL_GUARDED_BY(mu_) = false;
  OptMemState zcopy_enobuf_state_ ABSL_GUARDED_BY(mu_) = OptMemState::kOpen;
};

// Reference-counted implementation behind PosixEndpoint. PosixEndpoint
// forwards Read/Write/address queries here and calls MaybeShutdown() once.
class PosixEndpointImpl : public grpc_core::RefCounted<PosixEndpointImpl> {
 public:
  PosixEndpointImpl(
      EventHandle* handle, PosixEngineClosure* on_done,
      std::shared_ptr<grpc_event_engine::experimental::EventEngine> engine,
      grpc_event_engine::experimental::MemoryAllocator&& allocator,
      const PosixTcpOptions& options);
  ~PosixEndpointImpl() override;
  bool Read(
      absl::AnyInvocable<void(absl::Status)> on_read,
      grpc_event_engine::experimental::SliceBuffer* buffer,
      const grpc_event_engine::experimental::EventEngine::Endpoint::ReadArgs*
          args);
  bool Write(
      absl::AnyInvocable<void(absl::Status)> on_writable,
      grpc_event_engine::experimental::SliceBuffer* data,
      const grpc_event_engine::experimental::EventEngine::Endpoint::WriteArgs*
          args);
  const grpc_event_engine::experimental::EventEngine::ResolvedAddress&
  GetPeerAddress() const {
    return peer_address_;
  }
  const grpc_event_engine::experimental::EventEngine::ResolvedAddress&
  GetLocalAddress() const {
    return local_address_;
  }

  int GetWrappedFd() { return fd_; }

  bool CanTrackErrors() const { return poller_->CanTrackErrors(); }

  void MaybeShutdown(
      absl::Status why,
      absl::AnyInvocable<void(absl::StatusOr<int> release_fd)> on_release_fd);

 private:
  void UpdateRcvLowat() ABSL_EXCLUSIVE_LOCKS_REQUIRED(read_mu_);
  void HandleWrite(absl::Status status);
  void HandleError(absl::Status status);
  void HandleRead(absl::Status status) ABSL_NO_THREAD_SAFETY_ANALYSIS;
  bool HandleReadLocked(absl::Status& status)
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(read_mu_);
  void MaybeMakeReadSlices() ABSL_EXCLUSIVE_LOCKS_REQUIRED(read_mu_);
  bool TcpDoRead(absl::Status& status) ABSL_EXCLUSIVE_LOCKS_REQUIRED(read_mu_);
  void FinishEstimate();
  void AddToEstimate(size_t bytes);
  void MaybePostReclaimer() ABSL_EXCLUSIVE_LOCKS_REQUIRED(read_mu_);
  void PerformReclamation() ABSL_LOCKS_EXCLUDED(read_mu_);
  // Zero copy related helper methods.
  TcpZerocopySendRecord* TcpGetSendZerocopyRecord(
      grpc_event_engine::experimental::SliceBuffer& buf);
  bool DoFlushZerocopy(TcpZerocopySendRecord* record, absl::Status& status);
  bool TcpFlushZerocopy(TcpZerocopySendRecord* record, absl::Status& status);
  bool TcpFlush(absl::Status& status);
  void TcpShutdownTracedBufferList();
  void UnrefMaybePutZerocopySendRecord(TcpZerocopySendRecord* record);
  void ZerocopyDisableAndWaitForRemaining();
  bool WriteWithTimestamps(struct msghdr* msg, size_t sending_length,
                           ssize_t* sent_length, int* saved_errno,
                           int additional_flags);
  absl::Status TcpAnnotateError(absl::Status src_error) const;
#ifdef GRPC_LINUX_ERRQUEUE
  bool ProcessErrors();
  // Reads a cmsg to process zerocopy control messages.
  void ProcessZerocopy(struct cmsghdr* cmsg);
  // Reads a cmsg to derive timestamps from the control messages.
  struct cmsghdr* ProcessTimestamp(msghdr* msg, struct cmsghdr* cmsg);
#endif  // GRPC_LINUX_ERRQUEUE
  grpc_core::Mutex read_mu_;
  PosixSocketWrapper sock_;
  int fd_;
  bool is_first_read_ = true;
  bool has_posted_reclaimer_ ABSL_GUARDED_BY(read_mu_) = false;
  double target_length_;
  int min_read_chunk_size_;
  int max_read_chunk_size_;
  int set_rcvlowat_ = 0;
  double bytes_read_this_round_ = 0;
  std::atomic<int> ref_count_{1};

  // garbage after the last read.
  grpc_event_engine::experimental::SliceBuffer last_read_buffer_;

  grpc_event_engine::experimental::SliceBuffer* incoming_buffer_
      ABSL_GUARDED_BY(read_mu_) = nullptr;
  // bytes pending on the socket from the last read.
  int inq_ = 1;
  // cache whether kernel supports inq.
  bool inq_capable_ = false;

  grpc_event_engine::experimental::SliceBuffer* outgoing_buffer_ = nullptr;
  // byte within outgoing_buffer's slices[0] to write next.
  size_t outgoing_byte_idx_ = 0;

  PosixEngineClosure* on_read_ = nullptr;
  PosixEngineClosure* on_write_ = nullptr;
  PosixEngineClosure* on_error_ = nullptr;
  PosixEngineClosure* on_done_ = nullptr;
  absl::AnyInvocable<void(absl::Status)> read_cb_ ABSL_GUARDED_BY(read_mu_);
  absl::AnyInvocable<void(absl::Status)> write_cb_;

  grpc_event_engine::experimental::EventEngine::ResolvedAddress peer_address_;
  grpc_event_engine::experimental::EventEngine::ResolvedAddress local_address_;

  // Maintain a shared_ptr to mem_quota_ to ensure the underlying basic memory
  // quota is not deleted until the endpoint is destroyed.
  grpc_core::MemoryQuotaRefPtr mem_quota_;
  grpc_core::MemoryOwner memory_owner_;
  grpc_core::MemoryAllocator::Reservation self_reservation_;

  void* outgoing_buffer_arg_ = nullptr;

  absl::AnyInvocable<void(absl::StatusOr<int>)> on_release_fd_ = nullptr;

  // Byte counter used for timestamp collection. It holds -1 until the socket
  // options for collecting timestamps are first set, at which point it is
  // initialized, and it is then incremented with each byte sent.
  int bytes_counter_ = -1;
  // True if timestamping options are set on the socket.
#ifdef GRPC_LINUX_ERRQUEUE
  bool socket_ts_enabled_ = false;
#endif  // GRPC_LINUX_ERRQUEUE
  // Cache whether we can set timestamping options
  bool ts_capable_ = true;
  // Set to true if we do not want to be notified on errors anymore.
  std::atomic<bool> stop_error_notification_{false};
  std::unique_ptr<TcpZerocopySendCtx> tcp_zerocopy_send_ctx_;
  TcpZerocopySendRecord* current_zerocopy_send_ = nullptr;
  // A hint from upper layers specifying the minimum number of bytes that need
  // to be read to make meaningful progress.
  int min_progress_size_ = 1;
  TracedBufferList traced_buffers_;
  // The handle is owned by the PosixEndpointImpl object.
  EventHandle* handle_;
  PosixEventPoller* poller_;
  std::shared_ptr<grpc_event_engine::experimental::EventEngine> engine_;
};

// EventEngine endpoint backed by a POSIX TCP socket. All operations forward
// to a PosixEndpointImpl; shutdown (explicit or via the destructor) is
// forwarded to PosixEndpointImpl::MaybeShutdown() at most once, guarded by
// shutdown_.
class PosixEndpoint : public PosixEndpointWithFdSupport {
 public:
  PosixEndpoint(
      EventHandle* handle, PosixEngineClosure* on_shutdown,
      std::shared_ptr<grpc_event_engine::experimental::EventEngine> engine,
      grpc_event_engine::experimental::MemoryAllocator&& allocator,
      const PosixTcpOptions& options)
      : impl_(new PosixEndpointImpl(handle, on_shutdown, std::move(engine),
                                    std::move(allocator), options)) {}

  bool Read(
      absl::AnyInvocable<void(absl::Status)> on_read,
      grpc_event_engine::experimental::SliceBuffer* buffer,
      const grpc_event_engine::experimental::EventEngine::Endpoint::ReadArgs*
          args) override {
    return impl_->Read(std::move(on_read), buffer, args);
  }

  bool Write(
      absl::AnyInvocable<void(absl::Status)> on_writable,
      grpc_event_engine::experimental::SliceBuffer* data,
      const grpc_event_engine::experimental::EventEngine::Endpoint::WriteArgs*
          args) override {
    return impl_->Write(std::move(on_writable), data, args);
  }

  const grpc_event_engine::experimental::EventEngine::ResolvedAddress&
  GetPeerAddress() const override {
    return impl_->GetPeerAddress();
  }
  const grpc_event_engine::experimental::EventEngine::ResolvedAddress&
  GetLocalAddress() const override {
    return impl_->GetLocalAddress();
  }

  int GetWrappedFd() override { return impl_->GetWrappedFd(); }

  bool CanTrackErrors() override { return impl_->CanTrackErrors(); }

  void Shutdown(absl::AnyInvocable<void(absl::StatusOr<int> release_fd)>
                    on_release_fd) override {
    if (!shutdown_.exchange(true, std::memory_order_acq_rel)) {
      impl_->MaybeShutdown(absl::FailedPreconditionError("Endpoint closing"),
                           std::move(on_release_fd));
    }
  }

  ~PosixEndpoint() override {
    if (!shutdown_.exchange(true, std::memory_order_acq_rel)) {
      impl_->MaybeShutdown(absl::FailedPreconditionError("Endpoint closing"),
                           nullptr);
    }
  }

 private:
  PosixEndpointImpl* impl_;
  std::atomic<bool> shutdown_{false};
};

#else  // GRPC_POSIX_SOCKET_TCP

// Stub for platforms without POSIX TCP socket support: every operation
// crashes with a "not supported on this platform" message.
class PosixEndpoint : public PosixEndpointWithFdSupport {
 public:
  PosixEndpoint() = default;

  bool Read(absl::AnyInvocable<void(absl::Status)> /*on_read*/,
            grpc_event_engine::experimental::SliceBuffer* /*buffer*/,
            const grpc_event_engine::experimental::EventEngine::Endpoint::
                ReadArgs* /*args*/) override {
    grpc_core::Crash("PosixEndpoint::Read not supported on this platform");
  }

  bool Write(absl::AnyInvocable<void(absl::Status)> /*on_writable*/,
             grpc_event_engine::experimental::SliceBuffer* /*data*/,
             const grpc_event_engine::experimental::EventEngine::Endpoint::
                 WriteArgs* /*args*/) override {
    grpc_core::Crash("PosixEndpoint::Write not supported on this platform");
  }

  const grpc_event_engine::experimental::EventEngine::ResolvedAddress&
  GetPeerAddress() const override {
    grpc_core::Crash(
        "PosixEndpoint::GetPeerAddress not supported on this platform");
  }
  const grpc_event_engine::experimental::EventEngine::ResolvedAddress&
  GetLocalAddress() const override {
    grpc_core::Crash(
        "PosixEndpoint::GetLocalAddress not supported on this platform");
  }

  int GetWrappedFd() override {
    grpc_core::Crash(
        "PosixEndpoint::GetWrappedFd not supported on this platform");
  }

  bool CanTrackErrors() override {
    grpc_core::Crash(
        "PosixEndpoint::CanTrackErrors not supported on this platform");
  }

  void Shutdown(absl::AnyInvocable<void(absl::StatusOr<int> release_fd)>
                /*on_release_fd*/) override {
    grpc_core::Crash("PosixEndpoint::Shutdown not supported on this platform");
  }

  ~PosixEndpoint() override = default;
};

#endif  // GRPC_POSIX_SOCKET_TCP

// Create a PosixEndpoint.
// A shared_ptr of the EventEngine is passed to the endpoint to ensure that
// the EventEngine is alive for the lifetime of the endpoint. The ownership
// of the EventHandle is transferred to the endpoint.
std::unique_ptr<PosixEndpoint> CreatePosixEndpoint(
    EventHandle* handle, PosixEngineClosure* on_shutdown,
    std::shared_ptr<EventEngine> engine,
    grpc_event_engine::experimental::MemoryAllocator&& allocator,
    const PosixTcpOptions& options);

}  // namespace experimental
}  // namespace grpc_event_engine

#endif  // GRPC_SRC_CORE_LIB_EVENT_ENGINE_POSIX_ENGINE_POSIX_ENDPOINT_H