1 /*
2 *
3 * Copyright 2016 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18 #include <grpc/support/port_platform.h>
19
20 #include "src/core/lib/iomgr/port.h"
21 #if GRPC_ARES == 1 && !defined(GRPC_UV)
22
23 #include <ares.h>
24 #include <string.h>
25
26 #include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h"
27
28 #include <grpc/support/alloc.h>
29 #include <grpc/support/log.h>
30 #include <grpc/support/string_util.h>
31 #include <grpc/support/time.h>
32 #include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h"
33 #include "src/core/lib/gpr/string.h"
34 #include "src/core/lib/iomgr/iomgr_internal.h"
35 #include "src/core/lib/iomgr/sockaddr_utils.h"
36
37 typedef struct fd_node {
38 /** the owner of this fd node */
39 grpc_ares_ev_driver* ev_driver;
40 /** a closure wrapping on_readable_locked, which should be
41 invoked when the grpc_fd in this node becomes readable. */
42 grpc_closure read_closure;
43 /** a closure wrapping on_writable_locked, which should be
44 invoked when the grpc_fd in this node becomes writable. */
45 grpc_closure write_closure;
46 /** next fd node in the list */
47 struct fd_node* next;
48
49 /** wrapped fd that's polled by grpc's poller for the current platform */
50 grpc_core::GrpcPolledFd* grpc_polled_fd;
51 /** if the readable closure has been registered */
52 bool readable_registered;
53 /** if the writable closure has been registered */
54 bool writable_registered;
55 /** if the fd has been shutdown yet from grpc iomgr perspective */
56 bool already_shutdown;
57 } fd_node;
58
59 struct grpc_ares_ev_driver {
60 /** the ares_channel owned by this event driver */
61 ares_channel channel;
62 /** pollset set for driving the IO events of the channel */
63 grpc_pollset_set* pollset_set;
64 /** refcount of the event driver */
65 gpr_refcount refs;
66
67 /** combiner to synchronize c-ares and I/O callbacks on */
68 grpc_combiner* combiner;
69 /** a list of grpc_fd that this event driver is currently using. */
70 fd_node* fds;
71 /** is this event driver currently working? */
72 bool working;
73 /** is this event driver being shut down */
74 bool shutting_down;
75 /** request object that's using this ev driver */
76 grpc_ares_request* request;
77 /** Owned by the ev_driver. Creates new GrpcPolledFd's */
78 grpc_core::UniquePtr<grpc_core::GrpcPolledFdFactory> polled_fd_factory;
79 };
80
81 static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver);
82
grpc_ares_ev_driver_ref(grpc_ares_ev_driver * ev_driver)83 static grpc_ares_ev_driver* grpc_ares_ev_driver_ref(
84 grpc_ares_ev_driver* ev_driver) {
85 gpr_log(GPR_DEBUG, "Ref ev_driver %" PRIuPTR, (uintptr_t)ev_driver);
86 gpr_ref(&ev_driver->refs);
87 return ev_driver;
88 }
89
grpc_ares_ev_driver_unref(grpc_ares_ev_driver * ev_driver)90 static void grpc_ares_ev_driver_unref(grpc_ares_ev_driver* ev_driver) {
91 gpr_log(GPR_DEBUG, "Unref ev_driver %" PRIuPTR, (uintptr_t)ev_driver);
92 if (gpr_unref(&ev_driver->refs)) {
93 gpr_log(GPR_DEBUG, "destroy ev_driver %" PRIuPTR, (uintptr_t)ev_driver);
94 GPR_ASSERT(ev_driver->fds == nullptr);
95 GRPC_COMBINER_UNREF(ev_driver->combiner, "free ares event driver");
96 ares_destroy(ev_driver->channel);
97 grpc_ares_complete_request_locked(ev_driver->request);
98 grpc_core::Delete(ev_driver);
99 }
100 }
101
fd_node_destroy_locked(fd_node * fdn)102 static void fd_node_destroy_locked(fd_node* fdn) {
103 gpr_log(GPR_DEBUG, "delete fd: %s", fdn->grpc_polled_fd->GetName());
104 GPR_ASSERT(!fdn->readable_registered);
105 GPR_ASSERT(!fdn->writable_registered);
106 GPR_ASSERT(fdn->already_shutdown);
107 grpc_core::Delete(fdn->grpc_polled_fd);
108 gpr_free(fdn);
109 }
110
fd_node_shutdown_locked(fd_node * fdn,const char * reason)111 static void fd_node_shutdown_locked(fd_node* fdn, const char* reason) {
112 if (!fdn->already_shutdown) {
113 fdn->already_shutdown = true;
114 fdn->grpc_polled_fd->ShutdownLocked(
115 GRPC_ERROR_CREATE_FROM_STATIC_STRING(reason));
116 }
117 }
118
grpc_ares_ev_driver_create_locked(grpc_ares_ev_driver ** ev_driver,grpc_pollset_set * pollset_set,grpc_combiner * combiner,grpc_ares_request * request)119 grpc_error* grpc_ares_ev_driver_create_locked(grpc_ares_ev_driver** ev_driver,
120 grpc_pollset_set* pollset_set,
121 grpc_combiner* combiner,
122 grpc_ares_request* request) {
123 *ev_driver = grpc_core::New<grpc_ares_ev_driver>();
124 ares_options opts;
125 memset(&opts, 0, sizeof(opts));
126 opts.flags |= ARES_FLAG_STAYOPEN;
127 int status = ares_init_options(&(*ev_driver)->channel, &opts, ARES_OPT_FLAGS);
128 gpr_log(GPR_DEBUG, "grpc_ares_ev_driver_create_locked");
129 if (status != ARES_SUCCESS) {
130 char* err_msg;
131 gpr_asprintf(&err_msg, "Failed to init ares channel. C-ares error: %s",
132 ares_strerror(status));
133 grpc_error* err = GRPC_ERROR_CREATE_FROM_COPIED_STRING(err_msg);
134 gpr_free(err_msg);
135 gpr_free(*ev_driver);
136 return err;
137 }
138 (*ev_driver)->combiner = GRPC_COMBINER_REF(combiner, "ares event driver");
139 gpr_ref_init(&(*ev_driver)->refs, 1);
140 (*ev_driver)->pollset_set = pollset_set;
141 (*ev_driver)->fds = nullptr;
142 (*ev_driver)->working = false;
143 (*ev_driver)->shutting_down = false;
144 (*ev_driver)->request = request;
145 (*ev_driver)->polled_fd_factory =
146 grpc_core::NewGrpcPolledFdFactory((*ev_driver)->combiner);
147 (*ev_driver)
148 ->polled_fd_factory->ConfigureAresChannelLocked((*ev_driver)->channel);
149 return GRPC_ERROR_NONE;
150 }
151
grpc_ares_ev_driver_on_queries_complete_locked(grpc_ares_ev_driver * ev_driver)152 void grpc_ares_ev_driver_on_queries_complete_locked(
153 grpc_ares_ev_driver* ev_driver) {
154 // We mark the event driver as being shut down. If the event driver
155 // is working, grpc_ares_notify_on_event_locked will shut down the
156 // fds; if it's not working, there are no fds to shut down.
157 ev_driver->shutting_down = true;
158 grpc_ares_ev_driver_unref(ev_driver);
159 }
160
grpc_ares_ev_driver_shutdown_locked(grpc_ares_ev_driver * ev_driver)161 void grpc_ares_ev_driver_shutdown_locked(grpc_ares_ev_driver* ev_driver) {
162 ev_driver->shutting_down = true;
163 fd_node* fn = ev_driver->fds;
164 while (fn != nullptr) {
165 fd_node_shutdown_locked(fn, "grpc_ares_ev_driver_shutdown");
166 fn = fn->next;
167 }
168 }
169
170 // Search fd in the fd_node list head. This is an O(n) search, the max possible
171 // value of n is ARES_GETSOCK_MAXNUM (16). n is typically 1 - 2 in our tests.
pop_fd_node_locked(fd_node ** head,ares_socket_t as)172 static fd_node* pop_fd_node_locked(fd_node** head, ares_socket_t as) {
173 fd_node dummy_head;
174 dummy_head.next = *head;
175 fd_node* node = &dummy_head;
176 while (node->next != nullptr) {
177 if (node->next->grpc_polled_fd->GetWrappedAresSocketLocked() == as) {
178 fd_node* ret = node->next;
179 node->next = node->next->next;
180 *head = dummy_head.next;
181 return ret;
182 }
183 node = node->next;
184 }
185 return nullptr;
186 }
187
on_readable_locked(void * arg,grpc_error * error)188 static void on_readable_locked(void* arg, grpc_error* error) {
189 fd_node* fdn = static_cast<fd_node*>(arg);
190 grpc_ares_ev_driver* ev_driver = fdn->ev_driver;
191 const ares_socket_t as = fdn->grpc_polled_fd->GetWrappedAresSocketLocked();
192 fdn->readable_registered = false;
193 gpr_log(GPR_DEBUG, "readable on %s", fdn->grpc_polled_fd->GetName());
194 if (error == GRPC_ERROR_NONE) {
195 do {
196 ares_process_fd(ev_driver->channel, as, ARES_SOCKET_BAD);
197 } while (fdn->grpc_polled_fd->IsFdStillReadableLocked());
198 } else {
199 // If error is not GRPC_ERROR_NONE, it means the fd has been shutdown or
200 // timed out. The pending lookups made on this ev_driver will be cancelled
201 // by the following ares_cancel() and the on_done callbacks will be invoked
202 // with a status of ARES_ECANCELLED. The remaining file descriptors in this
203 // ev_driver will be cleaned up in the follwing
204 // grpc_ares_notify_on_event_locked().
205 ares_cancel(ev_driver->channel);
206 }
207 grpc_ares_notify_on_event_locked(ev_driver);
208 grpc_ares_ev_driver_unref(ev_driver);
209 }
210
on_writable_locked(void * arg,grpc_error * error)211 static void on_writable_locked(void* arg, grpc_error* error) {
212 fd_node* fdn = static_cast<fd_node*>(arg);
213 grpc_ares_ev_driver* ev_driver = fdn->ev_driver;
214 const ares_socket_t as = fdn->grpc_polled_fd->GetWrappedAresSocketLocked();
215 fdn->writable_registered = false;
216 gpr_log(GPR_DEBUG, "writable on %s", fdn->grpc_polled_fd->GetName());
217 if (error == GRPC_ERROR_NONE) {
218 ares_process_fd(ev_driver->channel, ARES_SOCKET_BAD, as);
219 } else {
220 // If error is not GRPC_ERROR_NONE, it means the fd has been shutdown or
221 // timed out. The pending lookups made on this ev_driver will be cancelled
222 // by the following ares_cancel() and the on_done callbacks will be invoked
223 // with a status of ARES_ECANCELLED. The remaining file descriptors in this
224 // ev_driver will be cleaned up in the follwing
225 // grpc_ares_notify_on_event_locked().
226 ares_cancel(ev_driver->channel);
227 }
228 grpc_ares_notify_on_event_locked(ev_driver);
229 grpc_ares_ev_driver_unref(ev_driver);
230 }
231
grpc_ares_ev_driver_get_channel_locked(grpc_ares_ev_driver * ev_driver)232 ares_channel* grpc_ares_ev_driver_get_channel_locked(
233 grpc_ares_ev_driver* ev_driver) {
234 return &ev_driver->channel;
235 }
236
237 // Get the file descriptors used by the ev_driver's ares channel, register
238 // driver_closure with these filedescriptors.
grpc_ares_notify_on_event_locked(grpc_ares_ev_driver * ev_driver)239 static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver) {
240 fd_node* new_list = nullptr;
241 if (!ev_driver->shutting_down) {
242 ares_socket_t socks[ARES_GETSOCK_MAXNUM];
243 int socks_bitmask =
244 ares_getsock(ev_driver->channel, socks, ARES_GETSOCK_MAXNUM);
245 for (size_t i = 0; i < ARES_GETSOCK_MAXNUM; i++) {
246 if (ARES_GETSOCK_READABLE(socks_bitmask, i) ||
247 ARES_GETSOCK_WRITABLE(socks_bitmask, i)) {
248 fd_node* fdn = pop_fd_node_locked(&ev_driver->fds, socks[i]);
249 // Create a new fd_node if sock[i] is not in the fd_node list.
250 if (fdn == nullptr) {
251 fdn = static_cast<fd_node*>(gpr_malloc(sizeof(fd_node)));
252 fdn->grpc_polled_fd =
253 ev_driver->polled_fd_factory->NewGrpcPolledFdLocked(
254 socks[i], ev_driver->pollset_set, ev_driver->combiner);
255 gpr_log(GPR_DEBUG, "new fd: %s", fdn->grpc_polled_fd->GetName());
256 fdn->ev_driver = ev_driver;
257 fdn->readable_registered = false;
258 fdn->writable_registered = false;
259 fdn->already_shutdown = false;
260 GRPC_CLOSURE_INIT(&fdn->read_closure, on_readable_locked, fdn,
261 grpc_combiner_scheduler(ev_driver->combiner));
262 GRPC_CLOSURE_INIT(&fdn->write_closure, on_writable_locked, fdn,
263 grpc_combiner_scheduler(ev_driver->combiner));
264 }
265 fdn->next = new_list;
266 new_list = fdn;
267 // Register read_closure if the socket is readable and read_closure has
268 // not been registered with this socket.
269 if (ARES_GETSOCK_READABLE(socks_bitmask, i) &&
270 !fdn->readable_registered) {
271 grpc_ares_ev_driver_ref(ev_driver);
272 gpr_log(GPR_DEBUG, "notify read on: %s",
273 fdn->grpc_polled_fd->GetName());
274 fdn->grpc_polled_fd->RegisterForOnReadableLocked(&fdn->read_closure);
275 fdn->readable_registered = true;
276 }
277 // Register write_closure if the socket is writable and write_closure
278 // has not been registered with this socket.
279 if (ARES_GETSOCK_WRITABLE(socks_bitmask, i) &&
280 !fdn->writable_registered) {
281 gpr_log(GPR_DEBUG, "notify write on: %s",
282 fdn->grpc_polled_fd->GetName());
283 grpc_ares_ev_driver_ref(ev_driver);
284 fdn->grpc_polled_fd->RegisterForOnWriteableLocked(
285 &fdn->write_closure);
286 fdn->writable_registered = true;
287 }
288 }
289 }
290 }
291 // Any remaining fds in ev_driver->fds were not returned by ares_getsock() and
292 // are therefore no longer in use, so they can be shut down and removed from
293 // the list.
294 while (ev_driver->fds != nullptr) {
295 fd_node* cur = ev_driver->fds;
296 ev_driver->fds = ev_driver->fds->next;
297 fd_node_shutdown_locked(cur, "c-ares fd shutdown");
298 if (!cur->readable_registered && !cur->writable_registered) {
299 fd_node_destroy_locked(cur);
300 } else {
301 cur->next = new_list;
302 new_list = cur;
303 }
304 }
305 ev_driver->fds = new_list;
306 // If the ev driver has no working fd, all the tasks are done.
307 if (new_list == nullptr) {
308 ev_driver->working = false;
309 gpr_log(GPR_DEBUG, "ev driver stop working");
310 }
311 }
312
grpc_ares_ev_driver_start_locked(grpc_ares_ev_driver * ev_driver)313 void grpc_ares_ev_driver_start_locked(grpc_ares_ev_driver* ev_driver) {
314 if (!ev_driver->working) {
315 ev_driver->working = true;
316 grpc_ares_notify_on_event_locked(ev_driver);
317 }
318 }
319
320 #endif /* GRPC_ARES == 1 && !defined(GRPC_UV) */
321