/*
 *
 * Copyright 2019 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
18 #include <grpc/support/port_platform.h>
19
20 #include "src/core/lib/iomgr/port.h"
21 #if GRPC_ARES == 1 && defined(GRPC_UV)
22
23 #include "absl/strings/str_format.h"
24
25 #include <ares.h>
26 #include <uv.h>
27
28 #include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h"
29
30 #include <grpc/support/alloc.h>
31 #include <grpc/support/log.h>
32 #include <grpc/support/string_util.h>
33 #include <grpc/support/time.h>
34 #include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h"
35 #include "src/core/lib/gpr/string.h"
36 #include "src/core/lib/iomgr/work_serializer.h"
37
38 namespace grpc_core {
39
40 void ares_uv_poll_cb(uv_poll_t* handle, int status, int events);
41
ares_uv_poll_close_cb(uv_handle_t * handle)42 void ares_uv_poll_close_cb(uv_handle_t* handle) { delete handle; }
43
44 class GrpcPolledFdLibuv : public GrpcPolledFd {
45 public:
GrpcPolledFdLibuv(ares_socket_t as,std::shared_ptr<WorkSerializer> work_serializer)46 GrpcPolledFdLibuv(ares_socket_t as,
47 std::shared_ptr<WorkSerializer> work_serializer)
48 : name_(absl::StrFormat("c-ares socket: %" PRIdPTR, (intptr_t)as)),
49 as_(as),
50 work_serializer_(std::move(work_serializer)) {
51 handle_ = new uv_poll_t();
52 uv_poll_init_socket(uv_default_loop(), handle_, as);
53 handle_->data = this;
54 }
55
RegisterForOnReadableLocked(grpc_closure * read_closure)56 void RegisterForOnReadableLocked(grpc_closure* read_closure) override {
57 GPR_ASSERT(read_closure_ == nullptr);
58 GPR_ASSERT((poll_events_ & UV_READABLE) == 0);
59 read_closure_ = read_closure;
60 poll_events_ |= UV_READABLE;
61 uv_poll_start(handle_, poll_events_, ares_uv_poll_cb);
62 }
63
RegisterForOnWriteableLocked(grpc_closure * write_closure)64 void RegisterForOnWriteableLocked(grpc_closure* write_closure) override {
65 GPR_ASSERT(write_closure_ == nullptr);
66 GPR_ASSERT((poll_events_ & UV_WRITABLE) == 0);
67 write_closure_ = write_closure;
68 poll_events_ |= UV_WRITABLE;
69 uv_poll_start(handle_, poll_events_, ares_uv_poll_cb);
70 }
71
IsFdStillReadableLocked()72 bool IsFdStillReadableLocked() override {
73 /* uv_poll_t is based on poll, which is level triggered. So, if cares
74 * leaves some data unread, the event will trigger again. */
75 return false;
76 }
77
ShutdownInternalLocked(grpc_error * error)78 void ShutdownInternalLocked(grpc_error* error) {
79 uv_poll_stop(handle_);
80 uv_close(reinterpret_cast<uv_handle_t*>(handle_), ares_uv_poll_close_cb);
81 if (read_closure_ != nullptr) {
82 grpc_core::ExecCtx::Run(DEBUG_LOCATION, read_closure_,
83 GRPC_ERROR_CANCELLED);
84 }
85 if (write_closure_ != nullptr) {
86 grpc_core::ExecCtx::Run(DEBUG_LOCATION, write_closure_,
87 GRPC_ERROR_CANCELLED);
88 }
89 }
90
ShutdownLocked(grpc_error * error)91 void ShutdownLocked(grpc_error* error) override {
92 if (grpc_core::ExecCtx::Get() == nullptr) {
93 grpc_core::ExecCtx exec_ctx;
94 ShutdownInternalLocked(error);
95 } else {
96 ShutdownInternalLocked(error);
97 }
98 }
99
GetWrappedAresSocketLocked()100 ares_socket_t GetWrappedAresSocketLocked() override { return as_; }
101
GetName()102 const char* GetName() override { return name_.c_str(); }
103
104 // TODO(apolcyn): Data members should be private.
105 std::string name_;
106 ares_socket_t as_;
107 uv_poll_t* handle_;
108 grpc_closure* read_closure_ = nullptr;
109 grpc_closure* write_closure_ = nullptr;
110 int poll_events_ = 0;
111 std::shared_ptr<WorkSerializer> work_serializer_;
112 };
113
114 struct AresUvPollCbArg {
AresUvPollCbArggrpc_core::AresUvPollCbArg115 AresUvPollCbArg(uv_poll_t* handle, int status, int events)
116 : handle(handle), status(status), events(events) {}
117
118 uv_poll_t* handle;
119 int status;
120 int events;
121 };
122
ares_uv_poll_cb_locked(AresUvPollCbArg * arg)123 static void ares_uv_poll_cb_locked(AresUvPollCbArg* arg) {
124 std::unique_ptr<AresUvPollCbArg> arg_struct(arg);
125 uv_poll_t* handle = arg_struct->handle;
126 int status = arg_struct->status;
127 int events = arg_struct->events;
128 GrpcPolledFdLibuv* polled_fd =
129 reinterpret_cast<GrpcPolledFdLibuv*>(handle->data);
130 grpc_error* error = GRPC_ERROR_NONE;
131 if (status < 0) {
132 error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("cares polling error");
133 error =
134 grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR,
135 grpc_slice_from_static_string(uv_strerror(status)));
136 }
137 if (events & UV_READABLE) {
138 GPR_ASSERT(polled_fd->read_closure_ != nullptr);
139 grpc_core::ExecCtx::Run(DEBUG_LOCATION, polled_fd->read_closure_, error);
140 polled_fd->read_closure_ = nullptr;
141 polled_fd->poll_events_ &= ~UV_READABLE;
142 }
143 if (events & UV_WRITABLE) {
144 GPR_ASSERT(polled_fd->write_closure_ != nullptr);
145 grpc_core::ExecCtx::Run(DEBUG_LOCATION, polled_fd->write_closure_, error);
146 polled_fd->write_closure_ = nullptr;
147 polled_fd->poll_events_ &= ~UV_WRITABLE;
148 }
149 uv_poll_start(handle, polled_fd->poll_events_, ares_uv_poll_cb);
150 }
151
ares_uv_poll_cb(uv_poll_t * handle,int status,int events)152 void ares_uv_poll_cb(uv_poll_t* handle, int status, int events) {
153 grpc_core::ExecCtx exec_ctx;
154 GrpcPolledFdLibuv* polled_fd =
155 reinterpret_cast<GrpcPolledFdLibuv*>(handle->data);
156 AresUvPollCbArg* arg = new AresUvPollCbArg(handle, status, events);
157 polled_fd->work_serializer_->Run([arg]() { ares_uv_poll_cb_locked(arg); },
158 DEBUG_LOCATION);
159 }
160
161 class GrpcPolledFdFactoryLibuv : public GrpcPolledFdFactory {
162 public:
NewGrpcPolledFdLocked(ares_socket_t as,grpc_pollset_set * driver_pollset_set,std::shared_ptr<WorkSerializer> work_serializer)163 GrpcPolledFd* NewGrpcPolledFdLocked(
164 ares_socket_t as, grpc_pollset_set* driver_pollset_set,
165 std::shared_ptr<WorkSerializer> work_serializer) override {
166 return new GrpcPolledFdLibuv(as, std::move(work_serializer));
167 }
168
ConfigureAresChannelLocked(ares_channel channel)169 void ConfigureAresChannelLocked(ares_channel channel) override {}
170 };
171
NewGrpcPolledFdFactory(std::shared_ptr<WorkSerializer> work_serializer)172 std::unique_ptr<GrpcPolledFdFactory> NewGrpcPolledFdFactory(
173 std::shared_ptr<WorkSerializer> work_serializer) {
174 return absl::make_unique<GrpcPolledFdFactoryLibuv>();
175 }
176
177 } // namespace grpc_core
178
179 #endif /* GRPC_ARES == 1 && defined(GRPC_UV) */
180