• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2022 The gRPC Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 #include "src/core/lib/iomgr/event_engine_shims/endpoint.h"
15 
16 #include <grpc/event_engine/event_engine.h>
17 #include <grpc/impl/codegen/slice.h>
18 #include <grpc/slice.h>
19 #include <grpc/slice_buffer.h>
20 #include <grpc/support/port_platform.h>
21 #include <grpc/support/time.h>
22 
23 #include <atomic>
24 #include <memory>
25 #include <utility>
26 
27 #include "absl/functional/any_invocable.h"
28 #include "absl/log/check.h"
29 #include "absl/log/log.h"
30 #include "absl/status/status.h"
31 #include "absl/status/statusor.h"
32 #include "absl/strings/string_view.h"
33 #include "src/core/lib/debug/trace.h"
34 #include "src/core/lib/event_engine/extensions/can_track_errors.h"
35 #include "src/core/lib/event_engine/extensions/supports_fd.h"
36 #include "src/core/lib/event_engine/query_extensions.h"
37 #include "src/core/lib/event_engine/tcp_socket_utils.h"
38 #include "src/core/lib/iomgr/closure.h"
39 #include "src/core/lib/iomgr/endpoint.h"
40 #include "src/core/lib/iomgr/error.h"
41 #include "src/core/lib/iomgr/event_engine_shims/closure.h"
42 #include "src/core/lib/iomgr/exec_ctx.h"
43 #include "src/core/lib/iomgr/port.h"
44 #include "src/core/lib/slice/slice_string_helpers.h"
45 #include "src/core/lib/transport/error_utils.h"
46 #include "src/core/util/construct_destruct.h"
47 #include "src/core/util/debug_location.h"
48 #include "src/core/util/string.h"
49 #include "src/core/util/sync.h"
50 
51 namespace grpc_event_engine {
52 namespace experimental {
53 namespace {
54 
// Flag stored in bit 32 of the shutdown ref counter. Once it is set, shutdown
// has been triggered and no further shutdown refs are handed out.
constexpr int64_t kShutdownBit = int64_t{1} << 32;
56 
57 // A wrapper class to manage Event Engine endpoint ref counting and
58 // asynchronous shutdown.
59 class EventEngineEndpointWrapper {
60  public:
61   struct grpc_event_engine_endpoint {
62     grpc_endpoint base;
63     EventEngineEndpointWrapper* wrapper;
64     alignas(SliceBuffer) char read_buffer[sizeof(SliceBuffer)];
65     alignas(SliceBuffer) char write_buffer[sizeof(SliceBuffer)];
66   };
67 
68   explicit EventEngineEndpointWrapper(
69       std::unique_ptr<EventEngine::Endpoint> endpoint);
70 
endpoint()71   EventEngine::Endpoint* endpoint() { return endpoint_.get(); }
72 
ReleaseEndpoint()73   std::unique_ptr<EventEngine::Endpoint> ReleaseEndpoint() {
74     return std::move(endpoint_);
75   }
76 
Fd()77   int Fd() {
78     grpc_core::MutexLock lock(&mu_);
79     return fd_;
80   }
81 
PeerAddress()82   absl::string_view PeerAddress() { return peer_address_; }
83 
LocalAddress()84   absl::string_view LocalAddress() { return local_address_; }
85 
Ref()86   void Ref() { refs_.fetch_add(1, std::memory_order_relaxed); }
Unref()87   void Unref() {
88     if (refs_.fetch_sub(1, std::memory_order_acq_rel) == 1) {
89       delete this;
90     }
91   }
92 
93   // Returns a managed grpc_endpoint object. It retains ownership of the
94   // object.
GetGrpcEndpoint()95   grpc_endpoint* GetGrpcEndpoint() { return &eeep_->base; }
96 
97   // Read using the underlying EventEngine endpoint object.
Read(grpc_closure * read_cb,grpc_slice_buffer * pending_read_buffer,const EventEngine::Endpoint::ReadArgs * args)98   bool Read(grpc_closure* read_cb, grpc_slice_buffer* pending_read_buffer,
99             const EventEngine::Endpoint::ReadArgs* args) {
100     Ref();
101     pending_read_cb_ = read_cb;
102     pending_read_buffer_ = pending_read_buffer;
103     // TODO(vigneshbabu): Use SliceBufferCast<> here.
104     grpc_core::Construct(reinterpret_cast<SliceBuffer*>(&eeep_->read_buffer),
105                          SliceBuffer::TakeCSliceBuffer(*pending_read_buffer_));
106     SliceBuffer* read_buffer =
107         reinterpret_cast<SliceBuffer*>(&eeep_->read_buffer);
108     read_buffer->Clear();
109     return endpoint_->Read(
110         [this](absl::Status status) { FinishPendingRead(status); }, read_buffer,
111         args);
112   }
113 
FinishPendingRead(absl::Status status)114   void FinishPendingRead(absl::Status status) {
115     auto* read_buffer = reinterpret_cast<SliceBuffer*>(&eeep_->read_buffer);
116     grpc_slice_buffer_move_into(read_buffer->c_slice_buffer(),
117                                 pending_read_buffer_);
118     read_buffer->~SliceBuffer();
119     if (GRPC_TRACE_FLAG_ENABLED(tcp)) {
120       size_t i;
121       LOG(INFO) << "TCP: " << eeep_->wrapper << " READ error=" << status;
122       if (ABSL_VLOG_IS_ON(2)) {
123         for (i = 0; i < pending_read_buffer_->count; i++) {
124           char* dump = grpc_dump_slice(pending_read_buffer_->slices[i],
125                                        GPR_DUMP_HEX | GPR_DUMP_ASCII);
126           VLOG(2) << "READ DATA: " << dump;
127           gpr_free(dump);
128         }
129       }
130     }
131     pending_read_buffer_ = nullptr;
132     grpc_closure* cb = pending_read_cb_;
133     pending_read_cb_ = nullptr;
134     if (grpc_core::ExecCtx::Get() == nullptr) {
135       grpc_core::ApplicationCallbackExecCtx app_ctx;
136       grpc_core::ExecCtx exec_ctx;
137       grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, status);
138     } else {
139       grpc_core::Closure::Run(DEBUG_LOCATION, cb, status);
140     }
141     // For the ref taken in EventEngineEndpointWrapper::Read().
142     Unref();
143   }
144 
145   // Write using the underlying EventEngine endpoint object
Write(grpc_closure * write_cb,grpc_slice_buffer * slices,const EventEngine::Endpoint::WriteArgs * args)146   bool Write(grpc_closure* write_cb, grpc_slice_buffer* slices,
147              const EventEngine::Endpoint::WriteArgs* args) {
148     Ref();
149     if (GRPC_TRACE_FLAG_ENABLED(tcp)) {
150       size_t i;
151       LOG(INFO) << "TCP: " << this << " WRITE (peer=" << PeerAddress() << ")";
152       if (ABSL_VLOG_IS_ON(2)) {
153         for (i = 0; i < slices->count; i++) {
154           char* dump =
155               grpc_dump_slice(slices->slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII);
156           VLOG(2) << "WRITE DATA: " << dump;
157           gpr_free(dump);
158         }
159       }
160     }
161     // TODO(vigneshbabu): Use SliceBufferCast<> here.
162     grpc_core::Construct(reinterpret_cast<SliceBuffer*>(&eeep_->write_buffer),
163                          SliceBuffer::TakeCSliceBuffer(*slices));
164     SliceBuffer* write_buffer =
165         reinterpret_cast<SliceBuffer*>(&eeep_->write_buffer);
166     pending_write_cb_ = write_cb;
167     return endpoint_->Write(
168         [this](absl::Status status) { FinishPendingWrite(status); },
169         write_buffer, args);
170   }
171 
FinishPendingWrite(absl::Status status)172   void FinishPendingWrite(absl::Status status) {
173     auto* write_buffer = reinterpret_cast<SliceBuffer*>(&eeep_->write_buffer);
174     write_buffer->~SliceBuffer();
175     GRPC_TRACE_LOG(tcp, INFO)
176         << "TCP: " << this << " WRITE (peer=" << PeerAddress()
177         << ") error=" << status;
178     grpc_closure* cb = pending_write_cb_;
179     pending_write_cb_ = nullptr;
180     if (grpc_core::ExecCtx::Get() == nullptr) {
181       grpc_core::ApplicationCallbackExecCtx app_ctx;
182       grpc_core::ExecCtx exec_ctx;
183       grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, status);
184     } else {
185       grpc_core::Closure::Run(DEBUG_LOCATION, cb, status);
186     }
187     // For the ref taken in EventEngineEndpointWrapper::Write().
188     Unref();
189   }
190 
191   // Returns true if the endpoint is not yet shutdown. In that case, it also
192   // acquires a shutdown ref. Otherwise it returns false and doesn't modify
193   // the shutdown ref.
ShutdownRef()194   bool ShutdownRef() {
195     int64_t curr = shutdown_ref_.load(std::memory_order_acquire);
196     while (true) {
197       if (curr & kShutdownBit) {
198         return false;
199       }
200       if (shutdown_ref_.compare_exchange_strong(curr, curr + 1,
201                                                 std::memory_order_acq_rel,
202                                                 std::memory_order_relaxed)) {
203         return true;
204       }
205     }
206   }
207 
208   // Decrement the shutdown ref. If this is the last shutdown ref, it also
209   // deletes the underlying event engine endpoint. Deletion of the event
210   // engine endpoint should trigger execution of any pending read/write
211   // callbacks with NOT-OK status.
ShutdownUnref()212   void ShutdownUnref() {
213     if (shutdown_ref_.fetch_sub(1, std::memory_order_acq_rel) ==
214         kShutdownBit + 1) {
215       auto* supports_fd =
216           QueryExtension<EndpointSupportsFdExtension>(endpoint_.get());
217       if (supports_fd != nullptr && fd_ > 0 && on_release_fd_) {
218         supports_fd->Shutdown(std::move(on_release_fd_));
219       }
220       OnShutdownInternal();
221     }
222   }
223 
224   // If trigger shutdown is called the first time, it sets the shutdown bit
225   // and decrements the shutdown ref. If trigger shutdown has been called
226   // before or in parallel, only one of them would win the race. The other
227   // invocation would simply return.
TriggerShutdown(absl::AnyInvocable<void (absl::StatusOr<int>)> on_release_fd)228   void TriggerShutdown(
229       absl::AnyInvocable<void(absl::StatusOr<int>)> on_release_fd) {
230     auto* supports_fd =
231         QueryExtension<EndpointSupportsFdExtension>(endpoint_.get());
232     if (supports_fd != nullptr) {
233       on_release_fd_ = std::move(on_release_fd);
234     }
235     int64_t curr = shutdown_ref_.load(std::memory_order_acquire);
236     while (true) {
237       if (curr & kShutdownBit) {
238         return;
239       }
240       if (shutdown_ref_.compare_exchange_strong(curr, curr | kShutdownBit,
241                                                 std::memory_order_acq_rel,
242                                                 std::memory_order_relaxed)) {
243         Ref();
244         if (shutdown_ref_.fetch_sub(1, std::memory_order_acq_rel) ==
245             kShutdownBit + 1) {
246           if (supports_fd != nullptr && fd_ > 0 && on_release_fd_) {
247             supports_fd->Shutdown(std::move(on_release_fd_));
248           }
249           OnShutdownInternal();
250         }
251         return;
252       }
253     }
254   }
255 
CanTrackErrors()256   bool CanTrackErrors() {
257     auto* can_track_errors =
258         QueryExtension<EndpointCanTrackErrorsExtension>(endpoint_.get());
259     if (can_track_errors != nullptr) {
260       return can_track_errors->CanTrackErrors();
261     } else {
262       return false;
263     }
264   }
265 
266  private:
OnShutdownInternal()267   void OnShutdownInternal() {
268     {
269       grpc_core::MutexLock lock(&mu_);
270       fd_ = -1;
271     }
272     endpoint_.reset();
273     // For the Ref taken in TriggerShutdown
274     Unref();
275   }
276   std::unique_ptr<EventEngine::Endpoint> endpoint_;
277   std::unique_ptr<grpc_event_engine_endpoint> eeep_;
278   std::atomic<int64_t> refs_{1};
279   std::atomic<int64_t> shutdown_ref_{1};
280   absl::AnyInvocable<void(absl::StatusOr<int>)> on_release_fd_;
281   grpc_core::Mutex mu_;
282   grpc_closure* pending_read_cb_;
283   grpc_closure* pending_write_cb_;
284   grpc_slice_buffer* pending_read_buffer_;
285   const std::string peer_address_{
286       ResolvedAddressToURI(endpoint_->GetPeerAddress()).value_or("")};
287   const std::string local_address_{
288       ResolvedAddressToURI(endpoint_->GetLocalAddress()).value_or("")};
289   int fd_{-1};
290 };
291 
292 // Read from the endpoint and place the data in slices slice buffer. The
293 // provided closure is also invoked asynchronously.
EndpointRead(grpc_endpoint * ep,grpc_slice_buffer * slices,grpc_closure * cb,bool,int min_progress_size)294 void EndpointRead(grpc_endpoint* ep, grpc_slice_buffer* slices,
295                   grpc_closure* cb, bool /* urgent */, int min_progress_size) {
296   auto* eeep =
297       reinterpret_cast<EventEngineEndpointWrapper::grpc_event_engine_endpoint*>(
298           ep);
299   if (!eeep->wrapper->ShutdownRef()) {
300     // Shutdown has already been triggered on the endpoint.
301     grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, absl::CancelledError());
302     return;
303   }
304 
305   EventEngine::Endpoint::ReadArgs read_args = {min_progress_size};
306   if (eeep->wrapper->Read(cb, slices, &read_args)) {
307     // Read succeeded immediately. Run the callback inline.
308     eeep->wrapper->FinishPendingRead(absl::OkStatus());
309   }
310 
311   eeep->wrapper->ShutdownUnref();
312 }
313 
314 // Write the data from slices and invoke the provided closure asynchronously
315 // after the write is complete.
EndpointWrite(grpc_endpoint * ep,grpc_slice_buffer * slices,grpc_closure * cb,void * arg,int max_frame_size)316 void EndpointWrite(grpc_endpoint* ep, grpc_slice_buffer* slices,
317                    grpc_closure* cb, void* arg, int max_frame_size) {
318   auto* eeep =
319       reinterpret_cast<EventEngineEndpointWrapper::grpc_event_engine_endpoint*>(
320           ep);
321   if (!eeep->wrapper->ShutdownRef()) {
322     // Shutdown has already been triggered on the endpoint.
323     grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, absl::CancelledError());
324     return;
325   }
326 
327   EventEngine::Endpoint::WriteArgs write_args = {arg, max_frame_size};
328   if (eeep->wrapper->Write(cb, slices, &write_args)) {
329     // Write succeeded immediately. Run the callback inline.
330     eeep->wrapper->FinishPendingWrite(absl::OkStatus());
331   }
332   eeep->wrapper->ShutdownUnref();
333 }
334 
EndpointAddToPollset(grpc_endpoint *,grpc_pollset *)335 void EndpointAddToPollset(grpc_endpoint* /* ep */,
336                           grpc_pollset* /* pollset */) {}
EndpointAddToPollsetSet(grpc_endpoint *,grpc_pollset_set *)337 void EndpointAddToPollsetSet(grpc_endpoint* /* ep */,
338                              grpc_pollset_set* /* pollset */) {}
EndpointDeleteFromPollsetSet(grpc_endpoint *,grpc_pollset_set *)339 void EndpointDeleteFromPollsetSet(grpc_endpoint* /* ep */,
340                                   grpc_pollset_set* /* pollset */) {}
341 
342 /// Attempts to free the underlying data structures.
343 /// After destruction, no new endpoint operations may be started.
344 /// It is the caller's responsibility to ensure that calls to EndpointDestroy
345 /// are synchronized.
EndpointDestroy(grpc_endpoint * ep)346 void EndpointDestroy(grpc_endpoint* ep) {
347   auto* eeep =
348       reinterpret_cast<EventEngineEndpointWrapper::grpc_event_engine_endpoint*>(
349           ep);
350   GRPC_TRACE_LOG(event_engine, INFO)
351       << "EventEngine::Endpoint::" << eeep->wrapper << " EndpointDestroy";
352   eeep->wrapper->TriggerShutdown(nullptr);
353   eeep->wrapper->Unref();
354 }
355 
EndpointGetPeerAddress(grpc_endpoint * ep)356 absl::string_view EndpointGetPeerAddress(grpc_endpoint* ep) {
357   auto* eeep =
358       reinterpret_cast<EventEngineEndpointWrapper::grpc_event_engine_endpoint*>(
359           ep);
360   return eeep->wrapper->PeerAddress();
361 }
362 
EndpointGetLocalAddress(grpc_endpoint * ep)363 absl::string_view EndpointGetLocalAddress(grpc_endpoint* ep) {
364   auto* eeep =
365       reinterpret_cast<EventEngineEndpointWrapper::grpc_event_engine_endpoint*>(
366           ep);
367   return eeep->wrapper->LocalAddress();
368 }
369 
EndpointGetFd(grpc_endpoint * ep)370 int EndpointGetFd(grpc_endpoint* ep) {
371   auto* eeep =
372       reinterpret_cast<EventEngineEndpointWrapper::grpc_event_engine_endpoint*>(
373           ep);
374   return eeep->wrapper->Fd();
375 }
376 
EndpointCanTrackErr(grpc_endpoint * ep)377 bool EndpointCanTrackErr(grpc_endpoint* ep) {
378   auto* eeep =
379       reinterpret_cast<EventEngineEndpointWrapper::grpc_event_engine_endpoint*>(
380           ep);
381   return eeep->wrapper->CanTrackErrors();
382 }
383 
384 grpc_endpoint_vtable grpc_event_engine_endpoint_vtable = {
385     EndpointRead,
386     EndpointWrite,
387     EndpointAddToPollset,
388     EndpointAddToPollsetSet,
389     EndpointDeleteFromPollsetSet,
390     EndpointDestroy,
391     EndpointGetPeerAddress,
392     EndpointGetLocalAddress,
393     EndpointGetFd,
394     EndpointCanTrackErr};
395 
EventEngineEndpointWrapper(std::unique_ptr<EventEngine::Endpoint> endpoint)396 EventEngineEndpointWrapper::EventEngineEndpointWrapper(
397     std::unique_ptr<EventEngine::Endpoint> endpoint)
398     : endpoint_(std::move(endpoint)),
399       eeep_(std::make_unique<grpc_event_engine_endpoint>()) {
400   eeep_->base.vtable = &grpc_event_engine_endpoint_vtable;
401   eeep_->wrapper = this;
402   auto* supports_fd =
403       QueryExtension<EndpointSupportsFdExtension>(endpoint_.get());
404   if (supports_fd != nullptr) {
405     fd_ = supports_fd->GetWrappedFd();
406   } else {
407     fd_ = -1;
408   }
409   GRPC_TRACE_LOG(event_engine, INFO)
410       << "EventEngine::Endpoint " << eeep_->wrapper << " Create";
411 }
412 
413 }  // namespace
414 
grpc_event_engine_endpoint_create(std::unique_ptr<EventEngine::Endpoint> ee_endpoint)415 grpc_endpoint* grpc_event_engine_endpoint_create(
416     std::unique_ptr<EventEngine::Endpoint> ee_endpoint) {
417   DCHECK(ee_endpoint != nullptr);
418   auto wrapper = new EventEngineEndpointWrapper(std::move(ee_endpoint));
419   return wrapper->GetGrpcEndpoint();
420 }
421 
grpc_is_event_engine_endpoint(grpc_endpoint * ep)422 bool grpc_is_event_engine_endpoint(grpc_endpoint* ep) {
423   return ep->vtable == &grpc_event_engine_endpoint_vtable;
424 }
425 
grpc_get_wrapped_event_engine_endpoint(grpc_endpoint * ep)426 EventEngine::Endpoint* grpc_get_wrapped_event_engine_endpoint(
427     grpc_endpoint* ep) {
428   if (!grpc_is_event_engine_endpoint(ep)) {
429     return nullptr;
430   }
431   auto* eeep =
432       reinterpret_cast<EventEngineEndpointWrapper::grpc_event_engine_endpoint*>(
433           ep);
434   return eeep->wrapper->endpoint();
435 }
436 
grpc_take_wrapped_event_engine_endpoint(grpc_endpoint * ep)437 std::unique_ptr<EventEngine::Endpoint> grpc_take_wrapped_event_engine_endpoint(
438     grpc_endpoint* ep) {
439   if (!grpc_is_event_engine_endpoint(ep)) {
440     return nullptr;
441   }
442   auto* eeep =
443       reinterpret_cast<EventEngineEndpointWrapper::grpc_event_engine_endpoint*>(
444           ep);
445   auto endpoint = eeep->wrapper->ReleaseEndpoint();
446   eeep->wrapper->Unref();
447   return endpoint;
448 }
449 
grpc_event_engine_endpoint_destroy_and_release_fd(grpc_endpoint * ep,int * fd,grpc_closure * on_release_fd)450 void grpc_event_engine_endpoint_destroy_and_release_fd(
451     grpc_endpoint* ep, int* fd, grpc_closure* on_release_fd) {
452   auto* eeep =
453       reinterpret_cast<EventEngineEndpointWrapper::grpc_event_engine_endpoint*>(
454           ep);
455   if (fd == nullptr || on_release_fd == nullptr) {
456     if (fd != nullptr) {
457       *fd = -1;
458     }
459     eeep->wrapper->TriggerShutdown(nullptr);
460   } else {
461     *fd = -1;
462     eeep->wrapper->TriggerShutdown(
463         [fd, on_release_fd](absl::StatusOr<int> release_fd) {
464           if (release_fd.ok()) {
465             *fd = *release_fd;
466           }
467           RunEventEngineClosure(on_release_fd,
468                                 absl_status_to_grpc_error(release_fd.status()));
469         });
470   }
471   eeep->wrapper->Unref();
472 }
473 
474 }  // namespace experimental
475 }  // namespace grpc_event_engine
476