// Copyright 2022 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "src/core/lib/iomgr/event_engine_shims/endpoint.h"

#include <grpc/event_engine/event_engine.h>
#include <grpc/impl/codegen/slice.h>
#include <grpc/slice.h>
#include <grpc/slice_buffer.h>
#include <grpc/support/port_platform.h>
#include <grpc/support/time.h>

#include <atomic>
#include <memory>
#include <utility>

#include "absl/functional/any_invocable.h"
#include "absl/log/check.h"
#include "absl/log/log.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/string_view.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/event_engine/extensions/can_track_errors.h"
#include "src/core/lib/event_engine/extensions/supports_fd.h"
#include "src/core/lib/event_engine/query_extensions.h"
#include "src/core/lib/event_engine/tcp_socket_utils.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/event_engine_shims/closure.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/port.h"
#include "src/core/lib/slice/slice_string_helpers.h"
#include "src/core/lib/transport/error_utils.h"
#include "src/core/util/construct_destruct.h"
#include "src/core/util/debug_location.h"
#include "src/core/util/string.h"
#include "src/core/util/sync.h"

namespace grpc_event_engine {
namespace experimental {
namespace {

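// `shutdown_ref_` (see EventEngineEndpointWrapper below) packs two values
// into a single atomic: the low 32 bits count outstanding shutdown refs held
// by in-flight operations, and kShutdownBit records that shutdown has been
// triggered. Setting the bit leaves the count intact, so pending reads and
// writes can drain before the underlying endpoint is destroyed.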
constexpr int64_t kShutdownBit = static_cast<int64_t>(1) << 32;

// A wrapper class to manage Event Engine endpoint ref counting and
// asynchronous shutdown.
class EventEngineEndpointWrapper {
 public:
  struct grpc_event_engine_endpoint {
    grpc_endpoint base;
    EventEngineEndpointWrapper* wrapper;
    alignas(SliceBuffer) char read_buffer[sizeof(SliceBuffer)];
    alignas(SliceBuffer) char write_buffer[sizeof(SliceBuffer)];
  };
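  // Note on the struct above: read_buffer and write_buffer are raw aligned
  // storage, not live objects. A SliceBuffer is constructed there in place
  // when a Read()/Write() begins and destroyed in the matching
  // FinishPending{Read,Write}(), so a buffer exists only while an operation
  // is in flight.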

  explicit EventEngineEndpointWrapper(
      std::unique_ptr<EventEngine::Endpoint> endpoint);

  EventEngine::Endpoint* endpoint() { return endpoint_.get(); }

  std::unique_ptr<EventEngine::Endpoint> ReleaseEndpoint() {
    return std::move(endpoint_);
  }

  int Fd() {
    grpc_core::MutexLock lock(&mu_);
    return fd_;
  }

  absl::string_view PeerAddress() { return peer_address_; }

  absl::string_view LocalAddress() { return local_address_; }

  void Ref() { refs_.fetch_add(1, std::memory_order_relaxed); }
  void Unref() {
    if (refs_.fetch_sub(1, std::memory_order_acq_rel) == 1) {
      delete this;
    }
  }

  // Returns the managed grpc_endpoint object. The wrapper retains ownership
  // of the object.
  grpc_endpoint* GetGrpcEndpoint() { return &eeep_->base; }

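  // Read() and Write() return true when the EventEngine endpoint completes
  // the operation synchronously; the caller is then responsible for invoking
  // FinishPendingRead()/FinishPendingWrite() itself (see EndpointRead and
  // EndpointWrite below).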
  // Read using the underlying EventEngine endpoint object.
  bool Read(grpc_closure* read_cb, grpc_slice_buffer* pending_read_buffer,
            const EventEngine::Endpoint::ReadArgs* args) {
    Ref();
    pending_read_cb_ = read_cb;
    pending_read_buffer_ = pending_read_buffer;
    // TODO(vigneshbabu): Use SliceBufferCast<> here.
    grpc_core::Construct(reinterpret_cast<SliceBuffer*>(&eeep_->read_buffer),
                         SliceBuffer::TakeCSliceBuffer(*pending_read_buffer_));
    SliceBuffer* read_buffer =
        reinterpret_cast<SliceBuffer*>(&eeep_->read_buffer);
    read_buffer->Clear();
    return endpoint_->Read(
        [this](absl::Status status) { FinishPendingRead(status); },
        read_buffer, args);
  }

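  // Completes a pending read: moves the data from the in-place SliceBuffer
  // back into the caller's grpc_slice_buffer, destroys the SliceBuffer, and
  // runs the pending closure. An ExecCtx is created on the spot when the
  // callback arrives on an EventEngine thread that does not already have one.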
  void FinishPendingRead(absl::Status status) {
    auto* read_buffer = reinterpret_cast<SliceBuffer*>(&eeep_->read_buffer);
    grpc_slice_buffer_move_into(read_buffer->c_slice_buffer(),
                                pending_read_buffer_);
    read_buffer->~SliceBuffer();
    if (GRPC_TRACE_FLAG_ENABLED(tcp)) {
      LOG(INFO) << "TCP: " << eeep_->wrapper << " READ error=" << status;
      if (ABSL_VLOG_IS_ON(2)) {
        for (size_t i = 0; i < pending_read_buffer_->count; i++) {
          char* dump = grpc_dump_slice(pending_read_buffer_->slices[i],
                                       GPR_DUMP_HEX | GPR_DUMP_ASCII);
          VLOG(2) << "READ DATA: " << dump;
          gpr_free(dump);
        }
      }
    }
    pending_read_buffer_ = nullptr;
    grpc_closure* cb = pending_read_cb_;
    pending_read_cb_ = nullptr;
    if (grpc_core::ExecCtx::Get() == nullptr) {
      grpc_core::ApplicationCallbackExecCtx app_ctx;
      grpc_core::ExecCtx exec_ctx;
      grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, status);
    } else {
      grpc_core::Closure::Run(DEBUG_LOCATION, cb, status);
    }
    // For the ref taken in EventEngineEndpointWrapper::Read().
    Unref();
  }

  // Write using the underlying EventEngine endpoint object.
  bool Write(grpc_closure* write_cb, grpc_slice_buffer* slices,
             const EventEngine::Endpoint::WriteArgs* args) {
    Ref();
    if (GRPC_TRACE_FLAG_ENABLED(tcp)) {
      LOG(INFO) << "TCP: " << this << " WRITE (peer=" << PeerAddress() << ")";
      if (ABSL_VLOG_IS_ON(2)) {
        for (size_t i = 0; i < slices->count; i++) {
          char* dump =
              grpc_dump_slice(slices->slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII);
          VLOG(2) << "WRITE DATA: " << dump;
          gpr_free(dump);
        }
      }
    }
    // TODO(vigneshbabu): Use SliceBufferCast<> here.
    grpc_core::Construct(reinterpret_cast<SliceBuffer*>(&eeep_->write_buffer),
                         SliceBuffer::TakeCSliceBuffer(*slices));
    SliceBuffer* write_buffer =
        reinterpret_cast<SliceBuffer*>(&eeep_->write_buffer);
    pending_write_cb_ = write_cb;
    return endpoint_->Write(
        [this](absl::Status status) { FinishPendingWrite(status); },
        write_buffer, args);
  }

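  // Completes a pending write: destroys the in-place SliceBuffer and runs
  // the pending closure, mirroring FinishPendingRead() above.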
  void FinishPendingWrite(absl::Status status) {
    auto* write_buffer = reinterpret_cast<SliceBuffer*>(&eeep_->write_buffer);
    write_buffer->~SliceBuffer();
    GRPC_TRACE_LOG(tcp, INFO)
        << "TCP: " << this << " WRITE (peer=" << PeerAddress()
        << ") error=" << status;
    grpc_closure* cb = pending_write_cb_;
    pending_write_cb_ = nullptr;
    if (grpc_core::ExecCtx::Get() == nullptr) {
      grpc_core::ApplicationCallbackExecCtx app_ctx;
      grpc_core::ExecCtx exec_ctx;
      grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, status);
    } else {
      grpc_core::Closure::Run(DEBUG_LOCATION, cb, status);
    }
    // For the ref taken in EventEngineEndpointWrapper::Write().
    Unref();
  }

  // Returns true if the endpoint is not yet shut down. In that case, it also
  // acquires a shutdown ref. Otherwise it returns false and doesn't modify
  // the shutdown ref.
  bool ShutdownRef() {
    int64_t curr = shutdown_ref_.load(std::memory_order_acquire);
    while (true) {
      if (curr & kShutdownBit) {
        return false;
      }
      if (shutdown_ref_.compare_exchange_strong(curr, curr + 1,
                                                std::memory_order_acq_rel,
                                                std::memory_order_relaxed)) {
        return true;
      }
    }
  }

  // Decrement the shutdown ref. If this is the last shutdown ref, it also
  // deletes the underlying event engine endpoint. Deletion of the event
  // engine endpoint should trigger execution of any pending read/write
  // callbacks with NOT-OK status.
  void ShutdownUnref() {
    if (shutdown_ref_.fetch_sub(1, std::memory_order_acq_rel) ==
        kShutdownBit + 1) {
      auto* supports_fd =
          QueryExtension<EndpointSupportsFdExtension>(endpoint_.get());
      if (supports_fd != nullptr && fd_ > 0 && on_release_fd_) {
        supports_fd->Shutdown(std::move(on_release_fd_));
      }
      OnShutdownInternal();
    }
  }

  // The first call to TriggerShutdown sets the shutdown bit and decrements
  // the shutdown ref. If TriggerShutdown has already been called, or is
  // called concurrently, only one invocation wins the race; the others
  // return without doing anything.
  void TriggerShutdown(
      absl::AnyInvocable<void(absl::StatusOr<int>)> on_release_fd) {
    auto* supports_fd =
        QueryExtension<EndpointSupportsFdExtension>(endpoint_.get());
    if (supports_fd != nullptr) {
      on_release_fd_ = std::move(on_release_fd);
    }
    int64_t curr = shutdown_ref_.load(std::memory_order_acquire);
    while (true) {
      if (curr & kShutdownBit) {
        return;
      }
      if (shutdown_ref_.compare_exchange_strong(curr, curr | kShutdownBit,
                                                std::memory_order_acq_rel,
                                                std::memory_order_relaxed)) {
        Ref();
        if (shutdown_ref_.fetch_sub(1, std::memory_order_acq_rel) ==
            kShutdownBit + 1) {
          if (supports_fd != nullptr && fd_ > 0 && on_release_fd_) {
            supports_fd->Shutdown(std::move(on_release_fd_));
          }
          OnShutdownInternal();
        }
        return;
      }
    }
  }

  bool CanTrackErrors() {
    auto* can_track_errors =
        QueryExtension<EndpointCanTrackErrorsExtension>(endpoint_.get());
    if (can_track_errors != nullptr) {
      return can_track_errors->CanTrackErrors();
    } else {
      return false;
    }
  }

 private:
  void OnShutdownInternal() {
    {
      grpc_core::MutexLock lock(&mu_);
      fd_ = -1;
    }
    endpoint_.reset();
    // For the Ref taken in TriggerShutdown.
    Unref();
  }
  std::unique_ptr<EventEngine::Endpoint> endpoint_;
  std::unique_ptr<grpc_event_engine_endpoint> eeep_;
  std::atomic<int64_t> refs_{1};
  std::atomic<int64_t> shutdown_ref_{1};
  absl::AnyInvocable<void(absl::StatusOr<int>)> on_release_fd_;
  grpc_core::Mutex mu_;
  grpc_closure* pending_read_cb_;
  grpc_closure* pending_write_cb_;
  grpc_slice_buffer* pending_read_buffer_;
  // Note: these members must be declared after endpoint_, since their
  // initializers dereference it.
  const std::string peer_address_{
      ResolvedAddressToURI(endpoint_->GetPeerAddress()).value_or("")};
  const std::string local_address_{
      ResolvedAddressToURI(endpoint_->GetLocalAddress()).value_or("")};
  int fd_{-1};
};

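// The free functions below adapt EventEngineEndpointWrapper to the legacy
// grpc_endpoint vtable. The read and write shims take a shutdown ref for the
// duration of the call so that the underlying endpoint cannot be destroyed
// while an operation is being started.
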
// Read from the endpoint and place the data in the `slices` slice buffer.
// The provided closure is also invoked asynchronously.
void EndpointRead(grpc_endpoint* ep, grpc_slice_buffer* slices,
                  grpc_closure* cb, bool /* urgent */, int min_progress_size) {
  auto* eeep =
      reinterpret_cast<EventEngineEndpointWrapper::grpc_event_engine_endpoint*>(
          ep);
  if (!eeep->wrapper->ShutdownRef()) {
    // Shutdown has already been triggered on the endpoint.
    grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, absl::CancelledError());
    return;
  }

  EventEngine::Endpoint::ReadArgs read_args = {min_progress_size};
  if (eeep->wrapper->Read(cb, slices, &read_args)) {
    // Read succeeded immediately. Run the callback inline.
    eeep->wrapper->FinishPendingRead(absl::OkStatus());
  }

  eeep->wrapper->ShutdownUnref();
}

// Write the data from slices and invoke the provided closure asynchronously
// after the write is complete.
void EndpointWrite(grpc_endpoint* ep, grpc_slice_buffer* slices,
                   grpc_closure* cb, void* arg, int max_frame_size) {
  auto* eeep =
      reinterpret_cast<EventEngineEndpointWrapper::grpc_event_engine_endpoint*>(
          ep);
  if (!eeep->wrapper->ShutdownRef()) {
    // Shutdown has already been triggered on the endpoint.
    grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, absl::CancelledError());
    return;
  }

  EventEngine::Endpoint::WriteArgs write_args = {arg, max_frame_size};
  if (eeep->wrapper->Write(cb, slices, &write_args)) {
    // Write succeeded immediately. Run the callback inline.
    eeep->wrapper->FinishPendingWrite(absl::OkStatus());
  }
  eeep->wrapper->ShutdownUnref();
}

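// Polling is handled inside the EventEngine itself, so the legacy pollset
// hooks are intentionally no-ops.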
void EndpointAddToPollset(grpc_endpoint* /* ep */,
                          grpc_pollset* /* pollset */) {}
void EndpointAddToPollsetSet(grpc_endpoint* /* ep */,
                             grpc_pollset_set* /* pollset_set */) {}
void EndpointDeleteFromPollsetSet(grpc_endpoint* /* ep */,
                                  grpc_pollset_set* /* pollset_set */) {}

/// Attempts to free the underlying data structures.
/// After destruction, no new endpoint operations may be started.
/// It is the caller's responsibility to ensure that calls to EndpointDestroy
/// are synchronized.
void EndpointDestroy(grpc_endpoint* ep) {
  auto* eeep =
      reinterpret_cast<EventEngineEndpointWrapper::grpc_event_engine_endpoint*>(
          ep);
  GRPC_TRACE_LOG(event_engine, INFO)
      << "EventEngine::Endpoint::" << eeep->wrapper << " EndpointDestroy";
  eeep->wrapper->TriggerShutdown(nullptr);
  eeep->wrapper->Unref();
}

absl::string_view EndpointGetPeerAddress(grpc_endpoint* ep) {
  auto* eeep =
      reinterpret_cast<EventEngineEndpointWrapper::grpc_event_engine_endpoint*>(
          ep);
  return eeep->wrapper->PeerAddress();
}

absl::string_view EndpointGetLocalAddress(grpc_endpoint* ep) {
  auto* eeep =
      reinterpret_cast<EventEngineEndpointWrapper::grpc_event_engine_endpoint*>(
          ep);
  return eeep->wrapper->LocalAddress();
}

int EndpointGetFd(grpc_endpoint* ep) {
  auto* eeep =
      reinterpret_cast<EventEngineEndpointWrapper::grpc_event_engine_endpoint*>(
          ep);
  return eeep->wrapper->Fd();
}

bool EndpointCanTrackErr(grpc_endpoint* ep) {
  auto* eeep =
      reinterpret_cast<EventEngineEndpointWrapper::grpc_event_engine_endpoint*>(
          ep);
  return eeep->wrapper->CanTrackErrors();
}

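// The vtable that routes legacy grpc_endpoint calls to the shims above.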
grpc_endpoint_vtable grpc_event_engine_endpoint_vtable = {
    EndpointRead,
    EndpointWrite,
    EndpointAddToPollset,
    EndpointAddToPollsetSet,
    EndpointDeleteFromPollsetSet,
    EndpointDestroy,
    EndpointGetPeerAddress,
    EndpointGetLocalAddress,
    EndpointGetFd,
    EndpointCanTrackErr};

EventEngineEndpointWrapper::EventEngineEndpointWrapper(
    std::unique_ptr<EventEngine::Endpoint> endpoint)
    : endpoint_(std::move(endpoint)),
      eeep_(std::make_unique<grpc_event_engine_endpoint>()) {
  eeep_->base.vtable = &grpc_event_engine_endpoint_vtable;
  eeep_->wrapper = this;
  auto* supports_fd =
      QueryExtension<EndpointSupportsFdExtension>(endpoint_.get());
  if (supports_fd != nullptr) {
    fd_ = supports_fd->GetWrappedFd();
  } else {
    fd_ = -1;
  }
  GRPC_TRACE_LOG(event_engine, INFO)
      << "EventEngine::Endpoint " << eeep_->wrapper << " Create";
}

}  // namespace

grpc_endpoint* grpc_event_engine_endpoint_create(
    std::unique_ptr<EventEngine::Endpoint> ee_endpoint) {
  DCHECK(ee_endpoint != nullptr);
  auto wrapper = new EventEngineEndpointWrapper(std::move(ee_endpoint));
  return wrapper->GetGrpcEndpoint();
}
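
// Usage sketch (hypothetical caller, not part of this file): wrap an
// endpoint produced by an EventEngine connect callback so that iomgr-based
// code can drive it through the standard grpc_endpoint calls:
//
//   std::unique_ptr<EventEngine::Endpoint> ee_ep = /* from OnConnect */;
//   grpc_endpoint* ep = grpc_event_engine_endpoint_create(std::move(ee_ep));
//   grpc_endpoint_read(ep, &slices, &on_read, /*urgent=*/false,
//                      /*min_progress_size=*/1);
//   ...
//   grpc_endpoint_destroy(ep);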

bool grpc_is_event_engine_endpoint(grpc_endpoint* ep) {
  return ep->vtable == &grpc_event_engine_endpoint_vtable;
}

EventEngine::Endpoint* grpc_get_wrapped_event_engine_endpoint(
    grpc_endpoint* ep) {
  if (!grpc_is_event_engine_endpoint(ep)) {
    return nullptr;
  }
  auto* eeep =
      reinterpret_cast<EventEngineEndpointWrapper::grpc_event_engine_endpoint*>(
          ep);
  return eeep->wrapper->endpoint();
}

std::unique_ptr<EventEngine::Endpoint> grpc_take_wrapped_event_engine_endpoint(
    grpc_endpoint* ep) {
  if (!grpc_is_event_engine_endpoint(ep)) {
    return nullptr;
  }
  auto* eeep =
      reinterpret_cast<EventEngineEndpointWrapper::grpc_event_engine_endpoint*>(
          ep);
  auto endpoint = eeep->wrapper->ReleaseEndpoint();
  eeep->wrapper->Unref();
  return endpoint;
}

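// Shuts down the endpoint. When both `fd` and `on_release_fd` are provided,
// the underlying file descriptor is released (rather than closed) and stored
// in *fd before on_release_fd is run with the release status.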
void grpc_event_engine_endpoint_destroy_and_release_fd(
    grpc_endpoint* ep, int* fd, grpc_closure* on_release_fd) {
  auto* eeep =
      reinterpret_cast<EventEngineEndpointWrapper::grpc_event_engine_endpoint*>(
          ep);
  if (fd == nullptr || on_release_fd == nullptr) {
    if (fd != nullptr) {
      *fd = -1;
    }
    eeep->wrapper->TriggerShutdown(nullptr);
  } else {
    *fd = -1;
    eeep->wrapper->TriggerShutdown(
        [fd, on_release_fd](absl::StatusOr<int> release_fd) {
          if (release_fd.ok()) {
            *fd = *release_fd;
          }
          RunEventEngineClosure(on_release_fd,
                                absl_status_to_grpc_error(release_fd.status()));
        });
  }
  eeep->wrapper->Unref();
}

}  // namespace experimental
}  // namespace grpc_event_engine