• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2016 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 #include <grpc/support/port_platform.h>
19 
20 #include "src/core/lib/iomgr/port.h"
21 #if GRPC_ARES == 1 && !defined(GRPC_UV)
22 
23 #include <ares.h>
24 #include <string.h>
25 
26 #include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h"
27 
28 #include <grpc/support/alloc.h>
29 #include <grpc/support/log.h>
30 #include <grpc/support/string_util.h>
31 #include <grpc/support/time.h>
32 #include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h"
33 #include "src/core/lib/gpr/string.h"
34 #include "src/core/lib/iomgr/iomgr_internal.h"
35 #include "src/core/lib/iomgr/sockaddr_utils.h"
36 
37 typedef struct fd_node {
38   /** the owner of this fd node */
39   grpc_ares_ev_driver* ev_driver;
40   /** a closure wrapping on_readable_locked, which should be
41      invoked when the grpc_fd in this node becomes readable. */
42   grpc_closure read_closure;
43   /** a closure wrapping on_writable_locked, which should be
44      invoked when the grpc_fd in this node becomes writable. */
45   grpc_closure write_closure;
46   /** next fd node in the list */
47   struct fd_node* next;
48 
49   /** wrapped fd that's polled by grpc's poller for the current platform */
50   grpc_core::GrpcPolledFd* grpc_polled_fd;
51   /** if the readable closure has been registered */
52   bool readable_registered;
53   /** if the writable closure has been registered */
54   bool writable_registered;
55   /** if the fd has been shutdown yet from grpc iomgr perspective */
56   bool already_shutdown;
57 } fd_node;
58 
59 struct grpc_ares_ev_driver {
60   /** the ares_channel owned by this event driver */
61   ares_channel channel;
62   /** pollset set for driving the IO events of the channel */
63   grpc_pollset_set* pollset_set;
64   /** refcount of the event driver */
65   gpr_refcount refs;
66 
67   /** combiner to synchronize c-ares and I/O callbacks on */
68   grpc_combiner* combiner;
69   /** a list of grpc_fd that this event driver is currently using. */
70   fd_node* fds;
71   /** is this event driver currently working? */
72   bool working;
73   /** is this event driver being shut down */
74   bool shutting_down;
75   /** request object that's using this ev driver */
76   grpc_ares_request* request;
77   /** Owned by the ev_driver. Creates new GrpcPolledFd's */
78   grpc_core::UniquePtr<grpc_core::GrpcPolledFdFactory> polled_fd_factory;
79 };
80 
81 static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver);
82 
grpc_ares_ev_driver_ref(grpc_ares_ev_driver * ev_driver)83 static grpc_ares_ev_driver* grpc_ares_ev_driver_ref(
84     grpc_ares_ev_driver* ev_driver) {
85   gpr_log(GPR_DEBUG, "Ref ev_driver %" PRIuPTR, (uintptr_t)ev_driver);
86   gpr_ref(&ev_driver->refs);
87   return ev_driver;
88 }
89 
grpc_ares_ev_driver_unref(grpc_ares_ev_driver * ev_driver)90 static void grpc_ares_ev_driver_unref(grpc_ares_ev_driver* ev_driver) {
91   gpr_log(GPR_DEBUG, "Unref ev_driver %" PRIuPTR, (uintptr_t)ev_driver);
92   if (gpr_unref(&ev_driver->refs)) {
93     gpr_log(GPR_DEBUG, "destroy ev_driver %" PRIuPTR, (uintptr_t)ev_driver);
94     GPR_ASSERT(ev_driver->fds == nullptr);
95     GRPC_COMBINER_UNREF(ev_driver->combiner, "free ares event driver");
96     ares_destroy(ev_driver->channel);
97     grpc_ares_complete_request_locked(ev_driver->request);
98     grpc_core::Delete(ev_driver);
99   }
100 }
101 
fd_node_destroy_locked(fd_node * fdn)102 static void fd_node_destroy_locked(fd_node* fdn) {
103   gpr_log(GPR_DEBUG, "delete fd: %s", fdn->grpc_polled_fd->GetName());
104   GPR_ASSERT(!fdn->readable_registered);
105   GPR_ASSERT(!fdn->writable_registered);
106   GPR_ASSERT(fdn->already_shutdown);
107   grpc_core::Delete(fdn->grpc_polled_fd);
108   gpr_free(fdn);
109 }
110 
fd_node_shutdown_locked(fd_node * fdn,const char * reason)111 static void fd_node_shutdown_locked(fd_node* fdn, const char* reason) {
112   if (!fdn->already_shutdown) {
113     fdn->already_shutdown = true;
114     fdn->grpc_polled_fd->ShutdownLocked(
115         GRPC_ERROR_CREATE_FROM_STATIC_STRING(reason));
116   }
117 }
118 
grpc_ares_ev_driver_create_locked(grpc_ares_ev_driver ** ev_driver,grpc_pollset_set * pollset_set,grpc_combiner * combiner,grpc_ares_request * request)119 grpc_error* grpc_ares_ev_driver_create_locked(grpc_ares_ev_driver** ev_driver,
120                                               grpc_pollset_set* pollset_set,
121                                               grpc_combiner* combiner,
122                                               grpc_ares_request* request) {
123   *ev_driver = grpc_core::New<grpc_ares_ev_driver>();
124   ares_options opts;
125   memset(&opts, 0, sizeof(opts));
126   opts.flags |= ARES_FLAG_STAYOPEN;
127   int status = ares_init_options(&(*ev_driver)->channel, &opts, ARES_OPT_FLAGS);
128   gpr_log(GPR_DEBUG, "grpc_ares_ev_driver_create_locked");
129   if (status != ARES_SUCCESS) {
130     char* err_msg;
131     gpr_asprintf(&err_msg, "Failed to init ares channel. C-ares error: %s",
132                  ares_strerror(status));
133     grpc_error* err = GRPC_ERROR_CREATE_FROM_COPIED_STRING(err_msg);
134     gpr_free(err_msg);
135     gpr_free(*ev_driver);
136     return err;
137   }
138   (*ev_driver)->combiner = GRPC_COMBINER_REF(combiner, "ares event driver");
139   gpr_ref_init(&(*ev_driver)->refs, 1);
140   (*ev_driver)->pollset_set = pollset_set;
141   (*ev_driver)->fds = nullptr;
142   (*ev_driver)->working = false;
143   (*ev_driver)->shutting_down = false;
144   (*ev_driver)->request = request;
145   (*ev_driver)->polled_fd_factory =
146       grpc_core::NewGrpcPolledFdFactory((*ev_driver)->combiner);
147   (*ev_driver)
148       ->polled_fd_factory->ConfigureAresChannelLocked((*ev_driver)->channel);
149   return GRPC_ERROR_NONE;
150 }
151 
grpc_ares_ev_driver_on_queries_complete_locked(grpc_ares_ev_driver * ev_driver)152 void grpc_ares_ev_driver_on_queries_complete_locked(
153     grpc_ares_ev_driver* ev_driver) {
154   // We mark the event driver as being shut down. If the event driver
155   // is working, grpc_ares_notify_on_event_locked will shut down the
156   // fds; if it's not working, there are no fds to shut down.
157   ev_driver->shutting_down = true;
158   grpc_ares_ev_driver_unref(ev_driver);
159 }
160 
grpc_ares_ev_driver_shutdown_locked(grpc_ares_ev_driver * ev_driver)161 void grpc_ares_ev_driver_shutdown_locked(grpc_ares_ev_driver* ev_driver) {
162   ev_driver->shutting_down = true;
163   fd_node* fn = ev_driver->fds;
164   while (fn != nullptr) {
165     fd_node_shutdown_locked(fn, "grpc_ares_ev_driver_shutdown");
166     fn = fn->next;
167   }
168 }
169 
170 // Search fd in the fd_node list head. This is an O(n) search, the max possible
171 // value of n is ARES_GETSOCK_MAXNUM (16). n is typically 1 - 2 in our tests.
pop_fd_node_locked(fd_node ** head,ares_socket_t as)172 static fd_node* pop_fd_node_locked(fd_node** head, ares_socket_t as) {
173   fd_node dummy_head;
174   dummy_head.next = *head;
175   fd_node* node = &dummy_head;
176   while (node->next != nullptr) {
177     if (node->next->grpc_polled_fd->GetWrappedAresSocketLocked() == as) {
178       fd_node* ret = node->next;
179       node->next = node->next->next;
180       *head = dummy_head.next;
181       return ret;
182     }
183     node = node->next;
184   }
185   return nullptr;
186 }
187 
on_readable_locked(void * arg,grpc_error * error)188 static void on_readable_locked(void* arg, grpc_error* error) {
189   fd_node* fdn = static_cast<fd_node*>(arg);
190   grpc_ares_ev_driver* ev_driver = fdn->ev_driver;
191   const ares_socket_t as = fdn->grpc_polled_fd->GetWrappedAresSocketLocked();
192   fdn->readable_registered = false;
193   gpr_log(GPR_DEBUG, "readable on %s", fdn->grpc_polled_fd->GetName());
194   if (error == GRPC_ERROR_NONE) {
195     do {
196       ares_process_fd(ev_driver->channel, as, ARES_SOCKET_BAD);
197     } while (fdn->grpc_polled_fd->IsFdStillReadableLocked());
198   } else {
199     // If error is not GRPC_ERROR_NONE, it means the fd has been shutdown or
200     // timed out. The pending lookups made on this ev_driver will be cancelled
201     // by the following ares_cancel() and the on_done callbacks will be invoked
202     // with a status of ARES_ECANCELLED. The remaining file descriptors in this
203     // ev_driver will be cleaned up in the follwing
204     // grpc_ares_notify_on_event_locked().
205     ares_cancel(ev_driver->channel);
206   }
207   grpc_ares_notify_on_event_locked(ev_driver);
208   grpc_ares_ev_driver_unref(ev_driver);
209 }
210 
on_writable_locked(void * arg,grpc_error * error)211 static void on_writable_locked(void* arg, grpc_error* error) {
212   fd_node* fdn = static_cast<fd_node*>(arg);
213   grpc_ares_ev_driver* ev_driver = fdn->ev_driver;
214   const ares_socket_t as = fdn->grpc_polled_fd->GetWrappedAresSocketLocked();
215   fdn->writable_registered = false;
216   gpr_log(GPR_DEBUG, "writable on %s", fdn->grpc_polled_fd->GetName());
217   if (error == GRPC_ERROR_NONE) {
218     ares_process_fd(ev_driver->channel, ARES_SOCKET_BAD, as);
219   } else {
220     // If error is not GRPC_ERROR_NONE, it means the fd has been shutdown or
221     // timed out. The pending lookups made on this ev_driver will be cancelled
222     // by the following ares_cancel() and the on_done callbacks will be invoked
223     // with a status of ARES_ECANCELLED. The remaining file descriptors in this
224     // ev_driver will be cleaned up in the follwing
225     // grpc_ares_notify_on_event_locked().
226     ares_cancel(ev_driver->channel);
227   }
228   grpc_ares_notify_on_event_locked(ev_driver);
229   grpc_ares_ev_driver_unref(ev_driver);
230 }
231 
grpc_ares_ev_driver_get_channel_locked(grpc_ares_ev_driver * ev_driver)232 ares_channel* grpc_ares_ev_driver_get_channel_locked(
233     grpc_ares_ev_driver* ev_driver) {
234   return &ev_driver->channel;
235 }
236 
237 // Get the file descriptors used by the ev_driver's ares channel, register
238 // driver_closure with these filedescriptors.
grpc_ares_notify_on_event_locked(grpc_ares_ev_driver * ev_driver)239 static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver) {
240   fd_node* new_list = nullptr;
241   if (!ev_driver->shutting_down) {
242     ares_socket_t socks[ARES_GETSOCK_MAXNUM];
243     int socks_bitmask =
244         ares_getsock(ev_driver->channel, socks, ARES_GETSOCK_MAXNUM);
245     for (size_t i = 0; i < ARES_GETSOCK_MAXNUM; i++) {
246       if (ARES_GETSOCK_READABLE(socks_bitmask, i) ||
247           ARES_GETSOCK_WRITABLE(socks_bitmask, i)) {
248         fd_node* fdn = pop_fd_node_locked(&ev_driver->fds, socks[i]);
249         // Create a new fd_node if sock[i] is not in the fd_node list.
250         if (fdn == nullptr) {
251           fdn = static_cast<fd_node*>(gpr_malloc(sizeof(fd_node)));
252           fdn->grpc_polled_fd =
253               ev_driver->polled_fd_factory->NewGrpcPolledFdLocked(
254                   socks[i], ev_driver->pollset_set, ev_driver->combiner);
255           gpr_log(GPR_DEBUG, "new fd: %s", fdn->grpc_polled_fd->GetName());
256           fdn->ev_driver = ev_driver;
257           fdn->readable_registered = false;
258           fdn->writable_registered = false;
259           fdn->already_shutdown = false;
260           GRPC_CLOSURE_INIT(&fdn->read_closure, on_readable_locked, fdn,
261                             grpc_combiner_scheduler(ev_driver->combiner));
262           GRPC_CLOSURE_INIT(&fdn->write_closure, on_writable_locked, fdn,
263                             grpc_combiner_scheduler(ev_driver->combiner));
264         }
265         fdn->next = new_list;
266         new_list = fdn;
267         // Register read_closure if the socket is readable and read_closure has
268         // not been registered with this socket.
269         if (ARES_GETSOCK_READABLE(socks_bitmask, i) &&
270             !fdn->readable_registered) {
271           grpc_ares_ev_driver_ref(ev_driver);
272           gpr_log(GPR_DEBUG, "notify read on: %s",
273                   fdn->grpc_polled_fd->GetName());
274           fdn->grpc_polled_fd->RegisterForOnReadableLocked(&fdn->read_closure);
275           fdn->readable_registered = true;
276         }
277         // Register write_closure if the socket is writable and write_closure
278         // has not been registered with this socket.
279         if (ARES_GETSOCK_WRITABLE(socks_bitmask, i) &&
280             !fdn->writable_registered) {
281           gpr_log(GPR_DEBUG, "notify write on: %s",
282                   fdn->grpc_polled_fd->GetName());
283           grpc_ares_ev_driver_ref(ev_driver);
284           fdn->grpc_polled_fd->RegisterForOnWriteableLocked(
285               &fdn->write_closure);
286           fdn->writable_registered = true;
287         }
288       }
289     }
290   }
291   // Any remaining fds in ev_driver->fds were not returned by ares_getsock() and
292   // are therefore no longer in use, so they can be shut down and removed from
293   // the list.
294   while (ev_driver->fds != nullptr) {
295     fd_node* cur = ev_driver->fds;
296     ev_driver->fds = ev_driver->fds->next;
297     fd_node_shutdown_locked(cur, "c-ares fd shutdown");
298     if (!cur->readable_registered && !cur->writable_registered) {
299       fd_node_destroy_locked(cur);
300     } else {
301       cur->next = new_list;
302       new_list = cur;
303     }
304   }
305   ev_driver->fds = new_list;
306   // If the ev driver has no working fd, all the tasks are done.
307   if (new_list == nullptr) {
308     ev_driver->working = false;
309     gpr_log(GPR_DEBUG, "ev driver stop working");
310   }
311 }
312 
grpc_ares_ev_driver_start_locked(grpc_ares_ev_driver * ev_driver)313 void grpc_ares_ev_driver_start_locked(grpc_ares_ev_driver* ev_driver) {
314   if (!ev_driver->working) {
315     ev_driver->working = true;
316     grpc_ares_notify_on_event_locked(ev_driver);
317   }
318 }
319 
320 #endif /* GRPC_ARES == 1 && !defined(GRPC_UV) */
321