• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2023 The gRPC Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef GRPC_SRC_CORE_LIB_EVENT_ENGINE_POSIX_ENGINE_GRPC_POLLED_FD_POSIX_H
16 #define GRPC_SRC_CORE_LIB_EVENT_ENGINE_POSIX_ENGINE_GRPC_POLLED_FD_POSIX_H
17 
18 #include <grpc/event_engine/event_engine.h>
19 #include <grpc/support/port_platform.h>
20 
21 #include <memory>
22 
23 #include "src/core/lib/iomgr/port.h"
24 #include "src/core/util/sync.h"
25 
26 #if GRPC_ARES == 1 && defined(GRPC_POSIX_SOCKET_ARES_EV_DRIVER)
27 
28 // IWYU pragma: no_include <ares_build.h>
29 
30 #include <ares.h>
31 #include <sys/ioctl.h>
32 #include <sys/socket.h>
33 #include <sys/uio.h>
34 #include <unistd.h>
35 
36 #include <string>
37 #include <unordered_set>
38 #include <utility>
39 
40 #include "absl/functional/any_invocable.h"
41 #include "absl/status/status.h"
42 #include "absl/strings/str_cat.h"
43 #include "src/core/lib/event_engine/grpc_polled_fd.h"
44 #include "src/core/lib/event_engine/posix_engine/event_poller.h"
45 #include "src/core/lib/event_engine/posix_engine/posix_engine_closure.h"
46 #include "src/core/lib/event_engine/posix_engine/tcp_socket_utils.h"
47 
48 namespace grpc_event_engine {
49 namespace experimental {
50 
51 class GrpcPolledFdPosix : public GrpcPolledFd {
52  public:
GrpcPolledFdPosix(ares_socket_t as,EventHandle * handle)53   GrpcPolledFdPosix(ares_socket_t as, EventHandle* handle)
54       : name_(absl::StrCat("c-ares fd: ", static_cast<int>(as))),
55         as_(as),
56         handle_(handle) {}
57 
~GrpcPolledFdPosix()58   ~GrpcPolledFdPosix() override {
59     // c-ares library will close the fd. This fd may be picked up immediately by
60     // another thread and should not be closed by the following OrphanHandle.
61     int phony_release_fd;
62     handle_->OrphanHandle(/*on_done=*/nullptr, &phony_release_fd,
63                           "c-ares query finished");
64   }
65 
RegisterForOnReadableLocked(absl::AnyInvocable<void (absl::Status)> read_closure)66   void RegisterForOnReadableLocked(
67       absl::AnyInvocable<void(absl::Status)> read_closure) override {
68     handle_->NotifyOnRead(new PosixEngineClosure(std::move(read_closure),
69                                                  /*is_permanent=*/false));
70   }
71 
RegisterForOnWriteableLocked(absl::AnyInvocable<void (absl::Status)> write_closure)72   void RegisterForOnWriteableLocked(
73       absl::AnyInvocable<void(absl::Status)> write_closure) override {
74     handle_->NotifyOnWrite(new PosixEngineClosure(std::move(write_closure),
75                                                   /*is_permanent=*/false));
76   }
77 
IsFdStillReadableLocked()78   bool IsFdStillReadableLocked() override {
79     size_t bytes_available = 0;
80     return ioctl(handle_->WrappedFd(), FIONREAD, &bytes_available) == 0 &&
81            bytes_available > 0;
82   }
83 
ShutdownLocked(absl::Status error)84   bool ShutdownLocked(absl::Status error) override {
85     handle_->ShutdownHandle(error);
86     return true;
87   }
88 
GetWrappedAresSocketLocked()89   ares_socket_t GetWrappedAresSocketLocked() override { return as_; }
90 
GetName()91   const char* GetName() const override { return name_.c_str(); }
92 
93  private:
94   const std::string name_;
95   const ares_socket_t as_;
96   EventHandle* handle_;
97 };
98 
99 class GrpcPolledFdFactoryPosix : public GrpcPolledFdFactory {
100  public:
GrpcPolledFdFactoryPosix(PosixEventPoller * poller)101   explicit GrpcPolledFdFactoryPosix(PosixEventPoller* poller)
102       : poller_(poller) {}
103 
~GrpcPolledFdFactoryPosix()104   ~GrpcPolledFdFactoryPosix() override {
105     for (auto& fd : owned_fds_) {
106       close(fd);
107     }
108   }
109 
Initialize(grpc_core::Mutex *,EventEngine *)110   void Initialize(grpc_core::Mutex*, EventEngine*) override {}
111 
NewGrpcPolledFdLocked(ares_socket_t as)112   std::unique_ptr<GrpcPolledFd> NewGrpcPolledFdLocked(
113       ares_socket_t as) override {
114     owned_fds_.insert(as);
115     return std::make_unique<GrpcPolledFdPosix>(
116         as,
117         poller_->CreateHandle(as, "c-ares socket", poller_->CanTrackErrors()));
118   }
119 
ConfigureAresChannelLocked(ares_channel channel)120   void ConfigureAresChannelLocked(ares_channel channel) override {
121     ares_set_socket_functions(channel, &kSockFuncs, this);
122     ares_set_socket_configure_callback(
123         channel, &GrpcPolledFdFactoryPosix::ConfigureSocket, nullptr);
124   }
125 
126  private:
127   /// Overridden socket API for c-ares
Socket(int af,int type,int protocol,void *)128   static ares_socket_t Socket(int af, int type, int protocol,
129                               void* /*user_data*/) {
130     return socket(af, type, protocol);
131   }
132 
133   /// Overridden connect API for c-ares
Connect(ares_socket_t as,const struct sockaddr * target,ares_socklen_t target_len,void *)134   static int Connect(ares_socket_t as, const struct sockaddr* target,
135                      ares_socklen_t target_len, void* /*user_data*/) {
136     return connect(as, target, target_len);
137   }
138 
139   /// Overridden writev API for c-ares
WriteV(ares_socket_t as,const struct iovec * iov,int iovec_count,void *)140   static ares_ssize_t WriteV(ares_socket_t as, const struct iovec* iov,
141                              int iovec_count, void* /*user_data*/) {
142     return writev(as, iov, iovec_count);
143   }
144 
145   /// Overridden recvfrom API for c-ares
RecvFrom(ares_socket_t as,void * data,size_t data_len,int flags,struct sockaddr * from,ares_socklen_t * from_len,void *)146   static ares_ssize_t RecvFrom(ares_socket_t as, void* data, size_t data_len,
147                                int flags, struct sockaddr* from,
148                                ares_socklen_t* from_len, void* /*user_data*/) {
149     return recvfrom(as, data, data_len, flags, from, from_len);
150   }
151 
152   /// Overridden close API for c-ares
Close(ares_socket_t as,void * user_data)153   static int Close(ares_socket_t as, void* user_data) {
154     GrpcPolledFdFactoryPosix* self =
155         static_cast<GrpcPolledFdFactoryPosix*>(user_data);
156     if (self->owned_fds_.find(as) == self->owned_fds_.end()) {
157       // c-ares owns this fd, grpc has never seen it
158       return close(as);
159     }
160     return 0;
161   }
162 
163   /// Because we're using socket API overrides, c-ares won't
164   /// perform its typical configuration on the socket. See
165   /// https://github.com/c-ares/c-ares/blob/bad62225b7f6b278b92e8e85a255600b629ef517/src/lib/ares_process.c#L1018.
166   /// So we use the configure socket callback override and copy default
167   /// settings that c-ares would normally apply on posix platforms:
168   ///   - non-blocking
169   ///   - cloexec flag
170   ///   - disable nagle
ConfigureSocket(ares_socket_t fd,int type,void *)171   static int ConfigureSocket(ares_socket_t fd, int type, void* /*user_data*/) {
172     // clang-format off
173 #define RETURN_IF_ERROR(expr) if (!(expr).ok()) { return -1; }
174     // clang-format on
175     PosixSocketWrapper sock(fd);
176     RETURN_IF_ERROR(sock.SetSocketNonBlocking(1));
177     RETURN_IF_ERROR(sock.SetSocketCloexec(1));
178     if (type == SOCK_STREAM) {
179       RETURN_IF_ERROR(sock.SetSocketLowLatency(1));
180     }
181     return 0;
182   }
183 
184   const struct ares_socket_functions kSockFuncs = {
185       &GrpcPolledFdFactoryPosix::Socket /* socket */,
186       &GrpcPolledFdFactoryPosix::Close /* close */,
187       &GrpcPolledFdFactoryPosix::Connect /* connect */,
188       &GrpcPolledFdFactoryPosix::RecvFrom /* recvfrom */,
189       &GrpcPolledFdFactoryPosix::WriteV /* writev */,
190   };
191 
192   PosixEventPoller* poller_;
193   // fds that are used/owned by grpc - we (grpc) will close them rather than
194   // c-ares
195   std::unordered_set<ares_socket_t> owned_fds_;
196 };
197 
198 }  // namespace experimental
199 }  // namespace grpc_event_engine
200 
201 #endif  // GRPC_ARES == 1 && defined(GRPC_POSIX_SOCKET_ARES_EV_DRIVER)
202 #endif  // GRPC_SRC_CORE_LIB_EVENT_ENGINE_POSIX_ENGINE_GRPC_POLLED_FD_POSIX_H
203