1 // Copyright 2023 The gRPC Authors 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // http://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 15 #ifndef GRPC_SRC_CORE_LIB_EVENT_ENGINE_POSIX_ENGINE_GRPC_POLLED_FD_POSIX_H 16 #define GRPC_SRC_CORE_LIB_EVENT_ENGINE_POSIX_ENGINE_GRPC_POLLED_FD_POSIX_H 17 18 #include <grpc/event_engine/event_engine.h> 19 #include <grpc/support/port_platform.h> 20 21 #include <memory> 22 23 #include "src/core/lib/iomgr/port.h" 24 #include "src/core/util/sync.h" 25 26 #if GRPC_ARES == 1 && defined(GRPC_POSIX_SOCKET_ARES_EV_DRIVER) 27 28 // IWYU pragma: no_include <ares_build.h> 29 30 #include <ares.h> 31 #include <sys/ioctl.h> 32 #include <sys/socket.h> 33 #include <sys/uio.h> 34 #include <unistd.h> 35 36 #include <string> 37 #include <unordered_set> 38 #include <utility> 39 40 #include "absl/functional/any_invocable.h" 41 #include "absl/status/status.h" 42 #include "absl/strings/str_cat.h" 43 #include "src/core/lib/event_engine/grpc_polled_fd.h" 44 #include "src/core/lib/event_engine/posix_engine/event_poller.h" 45 #include "src/core/lib/event_engine/posix_engine/posix_engine_closure.h" 46 #include "src/core/lib/event_engine/posix_engine/tcp_socket_utils.h" 47 48 namespace grpc_event_engine { 49 namespace experimental { 50 51 class GrpcPolledFdPosix : public GrpcPolledFd { 52 public: GrpcPolledFdPosix(ares_socket_t as,EventHandle * handle)53 GrpcPolledFdPosix(ares_socket_t as, EventHandle* handle) 54 : name_(absl::StrCat("c-ares fd: ", static_cast<int>(as))), 55 as_(as), 56 handle_(handle) {} 57 ~GrpcPolledFdPosix()58 ~GrpcPolledFdPosix() override { 59 // c-ares library will close the fd. This fd may be picked up immediately by 60 // another thread and should not be closed by the following OrphanHandle. 61 int phony_release_fd; 62 handle_->OrphanHandle(/*on_done=*/nullptr, &phony_release_fd, 63 "c-ares query finished"); 64 } 65 RegisterForOnReadableLocked(absl::AnyInvocable<void (absl::Status)> read_closure)66 void RegisterForOnReadableLocked( 67 absl::AnyInvocable<void(absl::Status)> read_closure) override { 68 handle_->NotifyOnRead(new PosixEngineClosure(std::move(read_closure), 69 /*is_permanent=*/false)); 70 } 71 RegisterForOnWriteableLocked(absl::AnyInvocable<void (absl::Status)> write_closure)72 void RegisterForOnWriteableLocked( 73 absl::AnyInvocable<void(absl::Status)> write_closure) override { 74 handle_->NotifyOnWrite(new PosixEngineClosure(std::move(write_closure), 75 /*is_permanent=*/false)); 76 } 77 IsFdStillReadableLocked()78 bool IsFdStillReadableLocked() override { 79 size_t bytes_available = 0; 80 return ioctl(handle_->WrappedFd(), FIONREAD, &bytes_available) == 0 && 81 bytes_available > 0; 82 } 83 ShutdownLocked(absl::Status error)84 bool ShutdownLocked(absl::Status error) override { 85 handle_->ShutdownHandle(error); 86 return true; 87 } 88 GetWrappedAresSocketLocked()89 ares_socket_t GetWrappedAresSocketLocked() override { return as_; } 90 GetName()91 const char* GetName() const override { return name_.c_str(); } 92 93 private: 94 const std::string name_; 95 const ares_socket_t as_; 96 EventHandle* handle_; 97 }; 98 99 class GrpcPolledFdFactoryPosix : public GrpcPolledFdFactory { 100 public: GrpcPolledFdFactoryPosix(PosixEventPoller * poller)101 explicit GrpcPolledFdFactoryPosix(PosixEventPoller* poller) 102 : poller_(poller) {} 103 ~GrpcPolledFdFactoryPosix()104 ~GrpcPolledFdFactoryPosix() override { 105 for (auto& fd : owned_fds_) { 106 close(fd); 107 } 108 } 109 Initialize(grpc_core::Mutex *,EventEngine *)110 void Initialize(grpc_core::Mutex*, EventEngine*) override {} 111 NewGrpcPolledFdLocked(ares_socket_t as)112 std::unique_ptr<GrpcPolledFd> NewGrpcPolledFdLocked( 113 ares_socket_t as) override { 114 owned_fds_.insert(as); 115 return std::make_unique<GrpcPolledFdPosix>( 116 as, 117 poller_->CreateHandle(as, "c-ares socket", poller_->CanTrackErrors())); 118 } 119 ConfigureAresChannelLocked(ares_channel channel)120 void ConfigureAresChannelLocked(ares_channel channel) override { 121 ares_set_socket_functions(channel, &kSockFuncs, this); 122 ares_set_socket_configure_callback( 123 channel, &GrpcPolledFdFactoryPosix::ConfigureSocket, nullptr); 124 } 125 126 private: 127 /// Overridden socket API for c-ares Socket(int af,int type,int protocol,void *)128 static ares_socket_t Socket(int af, int type, int protocol, 129 void* /*user_data*/) { 130 return socket(af, type, protocol); 131 } 132 133 /// Overridden connect API for c-ares Connect(ares_socket_t as,const struct sockaddr * target,ares_socklen_t target_len,void *)134 static int Connect(ares_socket_t as, const struct sockaddr* target, 135 ares_socklen_t target_len, void* /*user_data*/) { 136 return connect(as, target, target_len); 137 } 138 139 /// Overridden writev API for c-ares WriteV(ares_socket_t as,const struct iovec * iov,int iovec_count,void *)140 static ares_ssize_t WriteV(ares_socket_t as, const struct iovec* iov, 141 int iovec_count, void* /*user_data*/) { 142 return writev(as, iov, iovec_count); 143 } 144 145 /// Overridden recvfrom API for c-ares RecvFrom(ares_socket_t as,void * data,size_t data_len,int flags,struct sockaddr * from,ares_socklen_t * from_len,void *)146 static ares_ssize_t RecvFrom(ares_socket_t as, void* data, size_t data_len, 147 int flags, struct sockaddr* from, 148 ares_socklen_t* from_len, void* /*user_data*/) { 149 return recvfrom(as, data, data_len, flags, from, from_len); 150 } 151 152 /// Overridden close API for c-ares Close(ares_socket_t as,void * user_data)153 static int Close(ares_socket_t as, void* user_data) { 154 GrpcPolledFdFactoryPosix* self = 155 static_cast<GrpcPolledFdFactoryPosix*>(user_data); 156 if (self->owned_fds_.find(as) == self->owned_fds_.end()) { 157 // c-ares owns this fd, grpc has never seen it 158 return close(as); 159 } 160 return 0; 161 } 162 163 /// Because we're using socket API overrides, c-ares won't 164 /// perform its typical configuration on the socket. See 165 /// https://github.com/c-ares/c-ares/blob/bad62225b7f6b278b92e8e85a255600b629ef517/src/lib/ares_process.c#L1018. 166 /// So we use the configure socket callback override and copy default 167 /// settings that c-ares would normally apply on posix platforms: 168 /// - non-blocking 169 /// - cloexec flag 170 /// - disable nagle ConfigureSocket(ares_socket_t fd,int type,void *)171 static int ConfigureSocket(ares_socket_t fd, int type, void* /*user_data*/) { 172 // clang-format off 173 #define RETURN_IF_ERROR(expr) if (!(expr).ok()) { return -1; } 174 // clang-format on 175 PosixSocketWrapper sock(fd); 176 RETURN_IF_ERROR(sock.SetSocketNonBlocking(1)); 177 RETURN_IF_ERROR(sock.SetSocketCloexec(1)); 178 if (type == SOCK_STREAM) { 179 RETURN_IF_ERROR(sock.SetSocketLowLatency(1)); 180 } 181 return 0; 182 } 183 184 const struct ares_socket_functions kSockFuncs = { 185 &GrpcPolledFdFactoryPosix::Socket /* socket */, 186 &GrpcPolledFdFactoryPosix::Close /* close */, 187 &GrpcPolledFdFactoryPosix::Connect /* connect */, 188 &GrpcPolledFdFactoryPosix::RecvFrom /* recvfrom */, 189 &GrpcPolledFdFactoryPosix::WriteV /* writev */, 190 }; 191 192 PosixEventPoller* poller_; 193 // fds that are used/owned by grpc - we (grpc) will close them rather than 194 // c-ares 195 std::unordered_set<ares_socket_t> owned_fds_; 196 }; 197 198 } // namespace experimental 199 } // namespace grpc_event_engine 200 201 #endif // GRPC_ARES == 1 && defined(GRPC_POSIX_SOCKET_ARES_EV_DRIVER) 202 #endif // GRPC_SRC_CORE_LIB_EVENT_ENGINE_POSIX_ENGINE_GRPC_POLLED_FD_POSIX_H 203