• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2023 The gRPC Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 #include "src/core/lib/event_engine/ares_resolver.h"
15 
16 #include <grpc/support/port_platform.h>
17 
18 #include <string>
19 #include <vector>
20 
21 #include "src/core/lib/iomgr/port.h"
22 
23 // IWYU pragma: no_include <ares_version.h>
24 // IWYU pragma: no_include <arpa/inet.h>
25 // IWYU pragma: no_include <arpa/nameser.h>
26 // IWYU pragma: no_include <inttypes.h>
27 // IWYU pragma: no_include <netdb.h>
28 // IWYU pragma: no_include <netinet/in.h>
29 // IWYU pragma: no_include <stdlib.h>
30 // IWYU pragma: no_include <sys/socket.h>
31 // IWYU pragma: no_include <ratio>
32 
33 #if GRPC_ARES == 1
34 
35 #include <address_sorting/address_sorting.h>
36 #include <ares.h>
37 
38 #if ARES_VERSION >= 0x011200
39 // c-ares 1.18.0 or later starts to provide ares_nameser.h as a public header.
40 #include <ares_nameser.h>
41 #else
42 #include "src/core/lib/event_engine/nameser.h"  // IWYU pragma: keep
43 #endif
44 
45 #include <grpc/event_engine/event_engine.h>
46 #include <string.h>
47 
48 #include <algorithm>
49 #include <chrono>
50 #include <memory>
51 #include <type_traits>
52 #include <utility>
53 
54 #include "absl/functional/any_invocable.h"
55 #include "absl/hash/hash.h"
56 #include "absl/log/check.h"
57 #include "absl/log/log.h"
58 #include "absl/strings/match.h"
59 #include "absl/strings/numbers.h"
60 #include "absl/strings/str_cat.h"
61 #include "absl/strings/str_format.h"
62 #include "absl/types/optional.h"
63 #include "src/core/config/config_vars.h"
64 #include "src/core/lib/address_utils/parse_address.h"
65 #include "src/core/lib/address_utils/sockaddr_utils.h"
66 #include "src/core/lib/debug/trace.h"
67 #include "src/core/lib/event_engine/grpc_polled_fd.h"
68 #include "src/core/lib/event_engine/time_util.h"
69 #include "src/core/lib/iomgr/resolved_address.h"
70 #include "src/core/lib/iomgr/sockaddr.h"
71 #include "src/core/util/debug_location.h"
72 #include "src/core/util/host_port.h"
73 #include "src/core/util/orphanable.h"
74 #include "src/core/util/ref_counted_ptr.h"
75 #ifdef GRPC_POSIX_SOCKET_ARES_EV_DRIVER
76 #include "src/core/lib/event_engine/posix_engine/tcp_socket_utils.h"
77 #endif
78 
79 namespace grpc_event_engine {
80 namespace experimental {
81 
82 namespace {
83 
// Upper bound on how many records (A/AAAA or SRV) are accepted for a single
// lookup, so that a bad or malicious DNS response cannot make the resolver
// grow its result without bound and OOM the process. For hostname lookups the
// bound applies to the combined A and AAAA results of that lookup.
constexpr int kMaxRecordSize = 1 << 16;
88 
AresStatusToAbslStatus(int status,absl::string_view error_msg)89 absl::Status AresStatusToAbslStatus(int status, absl::string_view error_msg) {
90   switch (status) {
91     case ARES_ECANCELLED:
92       return absl::CancelledError(error_msg);
93     case ARES_ENOTIMP:
94       return absl::UnimplementedError(error_msg);
95     case ARES_ENOTFOUND:
96       return absl::NotFoundError(error_msg);
97     case ARES_ECONNREFUSED:
98       return absl::UnavailableError(error_msg);
99     default:
100       return absl::UnknownError(error_msg);
101   }
102 }
103 
104 // An alternative here could be to use ares_timeout to try to be more
105 // accurate, but that would require using "struct timeval"'s, which just
106 // makes things a bit more complicated. So just poll every second, as
107 // suggested by the c-ares code comments.
108 constexpr EventEngine::Duration kAresBackupPollAlarmDuration =
109     std::chrono::seconds(1);
110 
// Returns whether IPv6 loopback is usable on this host. LookupHostname()
// uses the answer to decide whether to issue an AAAA query in addition to
// the A query. Builds for any other event-driver platform fail to compile.
bool IsIpv6LoopbackAvailable() {
#ifdef GRPC_POSIX_SOCKET_ARES_EV_DRIVER
  return PosixSocketWrapper::IsIpv6LoopbackAvailable();
#elif defined(GRPC_WINDOWS_SOCKET_ARES_EV_DRIVER)
  // TODO(yijiem): implement this for Windows
  // Until then Windows unconditionally reports IPv6 loopback as available.
  return true;
#else
#error "Unsupported platform"
#endif
}
121 
SetRequestDNSServer(absl::string_view dns_server,ares_channel * channel)122 absl::Status SetRequestDNSServer(absl::string_view dns_server,
123                                  ares_channel* channel) {
124   GRPC_TRACE_LOG(cares_resolver, INFO)
125       << "(EventEngine c-ares resolver) Using DNS server " << dns_server;
126   grpc_resolved_address addr;
127   struct ares_addr_port_node dns_server_addr = {};
128   if (grpc_parse_ipv4_hostport(dns_server, &addr, /*log_errors=*/false)) {
129     dns_server_addr.family = AF_INET;
130     struct sockaddr_in* in = reinterpret_cast<struct sockaddr_in*>(addr.addr);
131     memcpy(&dns_server_addr.addr.addr4, &in->sin_addr, sizeof(struct in_addr));
132     dns_server_addr.tcp_port = grpc_sockaddr_get_port(&addr);
133     dns_server_addr.udp_port = grpc_sockaddr_get_port(&addr);
134   } else if (grpc_parse_ipv6_hostport(dns_server, &addr,
135                                       /*log_errors=*/false)) {
136     dns_server_addr.family = AF_INET6;
137     struct sockaddr_in6* in6 =
138         reinterpret_cast<struct sockaddr_in6*>(addr.addr);
139     memcpy(&dns_server_addr.addr.addr6, &in6->sin6_addr,
140            sizeof(struct in6_addr));
141     dns_server_addr.tcp_port = grpc_sockaddr_get_port(&addr);
142     dns_server_addr.udp_port = grpc_sockaddr_get_port(&addr);
143   } else {
144     return absl::InvalidArgumentError(
145         absl::StrCat("Cannot parse authority: ", dns_server));
146   }
147   int status = ares_set_servers_ports(*channel, &dns_server_addr);
148   if (status != ARES_SUCCESS) {
149     return AresStatusToAbslStatus(status, ares_strerror(status));
150   }
151   return absl::OkStatus();
152 }
153 
SortAddresses(const std::vector<EventEngine::ResolvedAddress> & addresses)154 std::vector<EventEngine::ResolvedAddress> SortAddresses(
155     const std::vector<EventEngine::ResolvedAddress>& addresses) {
156   address_sorting_sortable* sortables = static_cast<address_sorting_sortable*>(
157       gpr_zalloc(sizeof(address_sorting_sortable) * addresses.size()));
158   for (size_t i = 0; i < addresses.size(); i++) {
159     sortables[i].user_data =
160         const_cast<EventEngine::ResolvedAddress*>(&addresses[i]);
161     memcpy(&sortables[i].dest_addr.addr, addresses[i].address(),
162            addresses[i].size());
163     sortables[i].dest_addr.len = addresses[i].size();
164   }
165   address_sorting_rfc_6724_sort(sortables, addresses.size());
166   std::vector<EventEngine::ResolvedAddress> sorted_addresses;
167   sorted_addresses.reserve(addresses.size());
168   for (size_t i = 0; i < addresses.size(); ++i) {
169     sorted_addresses.emplace_back(
170         *static_cast<EventEngine::ResolvedAddress*>(sortables[i].user_data));
171   }
172   gpr_free(sortables);
173   return sorted_addresses;
174 }
175 
176 struct QueryArg {
QueryArggrpc_event_engine::experimental::__anonb0960eca0111::QueryArg177   QueryArg(AresResolver* ar, int id, absl::string_view name)
178       : ares_resolver(ar), callback_map_id(id), query_name(name) {}
179   AresResolver* ares_resolver;
180   int callback_map_id;
181   std::string query_name;
182 };
183 
184 struct HostnameQueryArg : public QueryArg {
HostnameQueryArggrpc_event_engine::experimental::__anonb0960eca0111::HostnameQueryArg185   HostnameQueryArg(AresResolver* ar, int id, absl::string_view name, int p)
186       : QueryArg(ar, id, name), port(p) {}
187   int port;
188   int pending_requests;
189   absl::Status error_status;
190   std::vector<EventEngine::ResolvedAddress> result;
191 };
192 
193 }  // namespace
194 
195 absl::StatusOr<grpc_core::OrphanablePtr<AresResolver>>
CreateAresResolver(absl::string_view dns_server,std::unique_ptr<GrpcPolledFdFactory> polled_fd_factory,std::shared_ptr<EventEngine> event_engine)196 AresResolver::CreateAresResolver(
197     absl::string_view dns_server,
198     std::unique_ptr<GrpcPolledFdFactory> polled_fd_factory,
199     std::shared_ptr<EventEngine> event_engine) {
200   ares_options opts = {};
201   opts.flags |= ARES_FLAG_STAYOPEN;
202   if (g_event_engine_grpc_ares_test_only_force_tcp) {
203     opts.flags |= ARES_FLAG_USEVC;
204   }
205   ares_channel channel;
206   int status = ares_init_options(&channel, &opts, ARES_OPT_FLAGS);
207   if (status != ARES_SUCCESS) {
208     LOG(ERROR) << "ares_init_options failed, status: " << status;
209     return AresStatusToAbslStatus(
210         status,
211         absl::StrCat("Failed to init c-ares channel: ", ares_strerror(status)));
212   }
213   event_engine_grpc_ares_test_only_inject_config(&channel);
214   polled_fd_factory->ConfigureAresChannelLocked(channel);
215   if (!dns_server.empty()) {
216     absl::Status status = SetRequestDNSServer(dns_server, &channel);
217     if (!status.ok()) {
218       return status;
219     }
220   }
221   return grpc_core::MakeOrphanable<AresResolver>(
222       std::move(polled_fd_factory), std::move(event_engine), channel);
223 }
224 
// Takes ownership of an already-initialized c-ares `channel`, which the
// destructor releases with ares_destroy(). CreateAresResolver() constructs the
// resolver after configuring the channel.
// The base class receives a debug trace name only when cares_resolver tracing
// is enabled.
AresResolver::AresResolver(
    std::unique_ptr<GrpcPolledFdFactory> polled_fd_factory,
    std::shared_ptr<EventEngine> event_engine, ares_channel channel)
    : RefCountedDNSResolverInterface(
          GRPC_TRACE_FLAG_ENABLED(cares_resolver) ? "AresResolver" : nullptr),
      channel_(channel),
      polled_fd_factory_(std::move(polled_fd_factory)),
      event_engine_(std::move(event_engine)) {
  // Give the fd factory the resolver's mutex and EventEngine. This relies on
  // polled_fd_factory_ and event_engine_ having been moved in above.
  polled_fd_factory_->Initialize(&mutex_, event_engine_.get());
}
235 
// Runs once the last ref is dropped. By then every FdNode must have been
// removed from fd_node_list_ and every lookup callback must have been
// extracted from callback_map_. Only after those checks is the c-ares channel
// released.
AresResolver::~AresResolver() {
  CHECK(fd_node_list_.empty());
  CHECK(callback_map_.empty());
  ares_destroy(channel_);
}
241 
// Called when the owner gives up the resolver. While holding mutex_ it:
//   - sets shutting_down_,
//   - cancels the backup poll alarm if it is armed,
//   - shuts down every fd that is not already shut down, using a
//     CancelledError.
// It then drops the owner's ref. Alarm and fd callbacks capture their own
// refs, so the object may outlive this call until those callbacks have run.
void AresResolver::Orphan() {
  {
    grpc_core::MutexLock lock(&mutex_);
    shutting_down_ = true;
    if (ares_backup_poll_alarm_handle_.has_value()) {
      event_engine_->Cancel(*ares_backup_poll_alarm_handle_);
      ares_backup_poll_alarm_handle_.reset();
    }
    for (const auto& fd_node : fd_node_list_) {
      if (!fd_node->already_shutdown) {
        GRPC_TRACE_LOG(cares_resolver, INFO)
            << "(EventEngine c-ares resolver) resolver: " << this
            << " shutdown fd: " << fd_node->polled_fd->GetName();
        // Shutdown is required to succeed here.
        CHECK(fd_node->polled_fd->ShutdownLocked(
            absl::CancelledError("AresResolver::Orphan")));
        fd_node->already_shutdown = true;
      }
    }
  }
  // Release the owner's ref outside the lock.
  Unref(DEBUG_LOCATION, "Orphan");
}
263 
LookupHostname(EventEngine::DNSResolver::LookupHostnameCallback callback,absl::string_view name,absl::string_view default_port)264 void AresResolver::LookupHostname(
265     EventEngine::DNSResolver::LookupHostnameCallback callback,
266     absl::string_view name, absl::string_view default_port) {
267   absl::string_view host;
268   absl::string_view port_string;
269   if (!grpc_core::SplitHostPort(name, &host, &port_string)) {
270     event_engine_->Run(
271         [callback = std::move(callback),
272          status = absl::InvalidArgumentError(absl::StrCat(
273              "Unparsable name: ", name))]() mutable { callback(status); });
274     return;
275   }
276   if (host.empty()) {
277     event_engine_->Run([callback = std::move(callback),
278                         status = absl::InvalidArgumentError(absl::StrCat(
279                             "host must not be empty in name: ",
280                             name))]() mutable { callback(status); });
281     return;
282   }
283   if (port_string.empty()) {
284     if (default_port.empty()) {
285       event_engine_->Run([callback = std::move(callback),
286                           status = absl::InvalidArgumentError(absl::StrFormat(
287                               "No port in name %s or default_port argument",
288                               name))]() mutable { callback(status); });
289       return;
290     }
291     port_string = default_port;
292   }
293   int port = 0;
294   if (port_string == "http") {
295     port = 80;
296   } else if (port_string == "https") {
297     port = 443;
298   } else if (!absl::SimpleAtoi(port_string, &port)) {
299     event_engine_->Run([callback = std::move(callback),
300                         status = absl::InvalidArgumentError(absl::StrCat(
301                             "Failed to parse port in name: ",
302                             name))]() mutable { callback(status); });
303     return;
304   }
305   // TODO(yijiem): Change this when refactoring code in
306   // src/core/lib/address_utils to use EventEngine::ResolvedAddress.
307   grpc_resolved_address addr;
308   const std::string hostport = grpc_core::JoinHostPort(host, port);
309   if (grpc_parse_ipv4_hostport(hostport.c_str(), &addr,
310                                false /* log errors */) ||
311       grpc_parse_ipv6_hostport(hostport.c_str(), &addr,
312                                false /* log errors */)) {
313     // Early out if the target is an ipv4 or ipv6 literal.
314     std::vector<EventEngine::ResolvedAddress> result;
315     result.emplace_back(reinterpret_cast<sockaddr*>(addr.addr), addr.len);
316     event_engine_->Run(
317         [callback = std::move(callback), result = std::move(result)]() mutable {
318           callback(std::move(result));
319         });
320     return;
321   }
322   grpc_core::MutexLock lock(&mutex_);
323   callback_map_.emplace(++id_, std::move(callback));
324   auto* resolver_arg = new HostnameQueryArg(this, id_, name, port);
325   if (IsIpv6LoopbackAvailable()) {
326     // Note that using AF_UNSPEC for both IPv6 and IPv4 queries does not work in
327     // all cases, e.g. for localhost:<> it only gets back the IPv6 result (i.e.
328     // ::1).
329     resolver_arg->pending_requests = 2;
330     ares_gethostbyname(channel_, std::string(host).c_str(), AF_INET,
331                        &AresResolver::OnHostbynameDoneLocked, resolver_arg);
332     ares_gethostbyname(channel_, std::string(host).c_str(), AF_INET6,
333                        &AresResolver::OnHostbynameDoneLocked, resolver_arg);
334   } else {
335     resolver_arg->pending_requests = 1;
336     ares_gethostbyname(channel_, std::string(host).c_str(), AF_INET,
337                        &AresResolver::OnHostbynameDoneLocked, resolver_arg);
338   }
339   CheckSocketsLocked();
340   MaybeStartTimerLocked();
341 }
342 
LookupSRV(EventEngine::DNSResolver::LookupSRVCallback callback,absl::string_view name)343 void AresResolver::LookupSRV(
344     EventEngine::DNSResolver::LookupSRVCallback callback,
345     absl::string_view name) {
346   absl::string_view host;
347   absl::string_view port;
348   if (!grpc_core::SplitHostPort(name, &host, &port)) {
349     event_engine_->Run(
350         [callback = std::move(callback),
351          status = absl::InvalidArgumentError(absl::StrCat(
352              "Unparsable name: ", name))]() mutable { callback(status); });
353     return;
354   }
355   if (host.empty()) {
356     event_engine_->Run([callback = std::move(callback),
357                         status = absl::InvalidArgumentError(absl::StrCat(
358                             "host must not be empty in name: ",
359                             name))]() mutable { callback(status); });
360     return;
361   }
362   // Don't query for SRV records if the target is "localhost"
363   if (absl::EqualsIgnoreCase(host, "localhost")) {
364     event_engine_->Run([callback = std::move(callback)]() mutable {
365       callback(std::vector<EventEngine::DNSResolver::SRVRecord>());
366     });
367     return;
368   }
369   grpc_core::MutexLock lock(&mutex_);
370   callback_map_.emplace(++id_, std::move(callback));
371   auto* resolver_arg = new QueryArg(this, id_, host);
372   ares_query(channel_, std::string(host).c_str(), ns_c_in, ns_t_srv,
373              &AresResolver::OnSRVQueryDoneLocked, resolver_arg);
374   CheckSocketsLocked();
375   MaybeStartTimerLocked();
376 }
377 
LookupTXT(EventEngine::DNSResolver::LookupTXTCallback callback,absl::string_view name)378 void AresResolver::LookupTXT(
379     EventEngine::DNSResolver::LookupTXTCallback callback,
380     absl::string_view name) {
381   absl::string_view host;
382   absl::string_view port;
383   if (!grpc_core::SplitHostPort(name, &host, &port)) {
384     event_engine_->Run(
385         [callback = std::move(callback),
386          status = absl::InvalidArgumentError(absl::StrCat(
387              "Unparsable name: ", name))]() mutable { callback(status); });
388     return;
389   }
390   if (host.empty()) {
391     event_engine_->Run([callback = std::move(callback),
392                         status = absl::InvalidArgumentError(absl::StrCat(
393                             "host must not be empty in name: ",
394                             name))]() mutable { callback(status); });
395     return;
396   }
397   // Don't query for TXT records if the target is "localhost"
398   if (absl::EqualsIgnoreCase(host, "localhost")) {
399     event_engine_->Run([callback = std::move(callback)]() mutable {
400       callback(std::vector<std::string>());
401     });
402     return;
403   }
404   grpc_core::MutexLock lock(&mutex_);
405   callback_map_.emplace(++id_, std::move(callback));
406   auto* resolver_arg = new QueryArg(this, id_, host);
407   ares_search(channel_, std::string(host).c_str(), ns_c_in, ns_t_txt,
408               &AresResolver::OnTXTDoneLocked, resolver_arg);
409   CheckSocketsLocked();
410   MaybeStartTimerLocked();
411 }
412 
// Reconciles fd_node_list_ with the sockets c-ares currently wants to use.
// Callers hold mutex_.
// - Unless shutting_down_ is set: for each socket reported by ares_getsock(),
//   reuse its existing FdNode or create a new one, then register readable and
//   writable notifications that are wanted and not already registered.
// - FdNodes not reported by c-ares are shut down. When shutting_down_ is set,
//   that is every FdNode.
// - A shut-down FdNode is deleted only if it has no notification registered.
//   Otherwise it is kept until its pending callback clears the registration.
//   This keeps the raw `fd_node` pointer captured by those callbacks valid.
void AresResolver::CheckSocketsLocked() {
  FdNodeList new_list;
  if (!shutting_down_) {
    ares_socket_t socks[ARES_GETSOCK_MAXNUM] = {};
    int socks_bitmask = ares_getsock(channel_, socks, ARES_GETSOCK_MAXNUM);
    for (size_t i = 0; i < ARES_GETSOCK_MAXNUM; i++) {
      if (ARES_GETSOCK_READABLE(socks_bitmask, i) ||
          ARES_GETSOCK_WRITABLE(socks_bitmask, i)) {
        auto iter = std::find_if(
            fd_node_list_.begin(), fd_node_list_.end(),
            [sock = socks[i]](const auto& node) { return node->as == sock; });
        if (iter == fd_node_list_.end()) {
          GRPC_TRACE_LOG(cares_resolver, INFO)
              << "(EventEngine c-ares resolver) resolver:" << this
              << " new fd: " << socks[i];
          new_list.push_back(std::make_unique<FdNode>(
              socks[i], polled_fd_factory_->NewGrpcPolledFdLocked(socks[i])));
        } else {
          // Move the existing node into new_list so it is not shut down below.
          new_list.splice(new_list.end(), fd_node_list_, iter);
        }
        FdNode* fd_node = new_list.back().get();
        if (ARES_GETSOCK_READABLE(socks_bitmask, i) &&
            !fd_node->readable_registered) {
          fd_node->readable_registered = true;
          if (fd_node->polled_fd->IsFdStillReadableLocked()) {
            // If c-ares is interested to read and the socket already has data
            // available for read, schedules OnReadable directly here. This is
            // to cope with the edge-triggered poller not getting an event if no
            // new data arrives and c-ares hasn't read all the data in the
            // previous ares_process_fd.
            GRPC_TRACE_LOG(cares_resolver, INFO)
                << "(EventEngine c-ares resolver) resolver:" << this
                << " schedule read directly on: " << fd_node->as;
            event_engine_->Run(
                [self = Ref(DEBUG_LOCATION, "CheckSocketsLocked"),
                 fd_node]() mutable {
                  static_cast<AresResolver*>(self.get())
                      ->OnReadable(fd_node, absl::OkStatus());
                });
          } else {
            // Otherwise register with the poller for readable event.
            GRPC_TRACE_LOG(cares_resolver, INFO)
                << "(EventEngine c-ares resolver) resolver:" << this
                << " notify read on: " << fd_node->as;
            fd_node->polled_fd->RegisterForOnReadableLocked(
                [self = Ref(DEBUG_LOCATION, "CheckSocketsLocked"),
                 fd_node](absl::Status status) mutable {
                  static_cast<AresResolver*>(self.get())
                      ->OnReadable(fd_node, status);
                });
          }
        }
        // Register write_closure if the socket is writable and write_closure
        // has not been registered with this socket.
        if (ARES_GETSOCK_WRITABLE(socks_bitmask, i) &&
            !fd_node->writable_registered) {
          GRPC_TRACE_LOG(cares_resolver, INFO)
              << "(EventEngine c-ares resolver) resolver:" << this
              << " notify write on: " << fd_node->as;
          fd_node->writable_registered = true;
          fd_node->polled_fd->RegisterForOnWriteableLocked(
              [self = Ref(DEBUG_LOCATION, "CheckSocketsLocked"),
               fd_node](absl::Status status) mutable {
                static_cast<AresResolver*>(self.get())
                    ->OnWritable(fd_node, status);
              });
        }
      }
    }
  }
  // Any remaining fds in fd_node_list_ were not returned by ares_getsock()
  // and are therefore no longer in use, so they can be shut down and removed
  // from the list.
  // TODO(yijiem): Since we are keeping the underlying socket opened for both
  // Posix and Windows, it might be reasonable to also keep the FdNodes alive
  // till the end. But we need to change the state management of FdNodes in this
  // file. This may simplify the code a bit.
  while (!fd_node_list_.empty()) {
    FdNode* fd_node = fd_node_list_.front().get();
    if (!fd_node->already_shutdown) {
      GRPC_TRACE_LOG(cares_resolver, INFO)
          << "(EventEngine c-ares resolver) resolver: " << this
          << " shutdown fd: " << fd_node->polled_fd->GetName();
      // If ShutdownLocked() returns false, already_shutdown stays false and
      // shutdown is attempted again on a later call.
      fd_node->already_shutdown =
          fd_node->polled_fd->ShutdownLocked(absl::OkStatus());
    }
    if (!fd_node->readable_registered && !fd_node->writable_registered) {
      GRPC_TRACE_LOG(cares_resolver, INFO)
          << "(EventEngine c-ares resolver) resolver: " << this
          << " delete fd: " << fd_node->polled_fd->GetName();
      fd_node_list_.pop_front();
    } else {
      // A callback still references this node; keep it alive for now.
      new_list.splice(new_list.end(), fd_node_list_, fd_node_list_.begin());
    }
  }
  fd_node_list_ = std::move(new_list);
}
510 
MaybeStartTimerLocked()511 void AresResolver::MaybeStartTimerLocked() {
512   if (ares_backup_poll_alarm_handle_.has_value()) {
513     return;
514   }
515   // Initialize the backup poll alarm
516   GRPC_TRACE_LOG(cares_resolver, INFO)
517       << "(EventEngine c-ares resolver) request:" << this
518       << " MaybeStartTimerLocked next ares process poll time in "
519       << Milliseconds(kAresBackupPollAlarmDuration) << " ms";
520   ares_backup_poll_alarm_handle_ = event_engine_->RunAfter(
521       kAresBackupPollAlarmDuration,
522       [self = Ref(DEBUG_LOCATION, "MaybeStartTimerLocked")]() {
523         static_cast<AresResolver*>(self.get())->OnAresBackupPollAlarm();
524       });
525 }
526 
// Read-readiness handler for `fd_node`. It is called without mutex_ held,
// either from the poller registration or scheduled directly by
// CheckSocketsLocked(). It clears the read registration. On success it lets
// c-ares read from the socket. On error or during shutdown it cancels all
// pending c-ares queries instead. Finally it re-syncs the fd set.
void AresResolver::OnReadable(FdNode* fd_node, absl::Status status) {
  grpc_core::MutexLock lock(&mutex_);
  CHECK(fd_node->readable_registered);
  fd_node->readable_registered = false;
  GRPC_TRACE_LOG(cares_resolver, INFO)
      << "(EventEngine c-ares resolver) OnReadable: fd: " << fd_node->as
      << "; request: " << this << "; status: " << status;
  if (status.ok() && !shutting_down_) {
    ares_process_fd(channel_, fd_node->as, ARES_SOCKET_BAD);
  } else {
    // If error is not absl::OkStatus() or the resolution was cancelled, it
    // means the fd has been shutdown or timed out. The pending lookups made
    // on this resolver's channel will be cancelled by the following
    // ares_cancel(). The remaining file descriptors will be cleaned up by the
    // CheckSocketsLocked() call below.
    ares_cancel(channel_);
  }
  CheckSocketsLocked();
}
546 
// Write-readiness handler for `fd_node`. It is called without mutex_ held,
// from the poller registration made in CheckSocketsLocked(). It clears the
// write registration. On success it lets c-ares write to the socket. On error
// or during shutdown it cancels all pending c-ares queries instead. Finally it
// re-syncs the fd set.
void AresResolver::OnWritable(FdNode* fd_node, absl::Status status) {
  grpc_core::MutexLock lock(&mutex_);
  CHECK(fd_node->writable_registered);
  fd_node->writable_registered = false;
  GRPC_TRACE_LOG(cares_resolver, INFO)
      << "(EventEngine c-ares resolver) OnWritable: fd: " << fd_node->as
      << "; request:" << this << "; status: " << status;
  if (status.ok() && !shutting_down_) {
    ares_process_fd(channel_, ARES_SOCKET_BAD, fd_node->as);
  } else {
    // If error is not absl::OkStatus() or the resolution was cancelled, it
    // means the fd has been shutdown or timed out. The pending lookups made
    // on this resolver's channel will be cancelled by the following
    // ares_cancel(). The remaining file descriptors will be cleaned up by the
    // CheckSocketsLocked() call below.
    ares_cancel(channel_);
  }
  CheckSocketsLocked();
}
566 
// In case of non-responsive DNS servers, dropped packets, etc., c-ares has
// intelligent timeout and retry logic, which we can take advantage of by
// polling ares_process_fd on time intervals. Overall, the c-ares library is
// meant to be called into and given a chance to proceed name resolution:
//   a) when fd events happen
//   b) when some time has passed without fd events having happened
// For the latter, we use this backup poller. Also see
// https://github.com/grpc/grpc/pull/17688 description for more details.
//
// Unless shutting down, this calls ares_process_fd() for both read and write
// on every fd that is not shut down, re-arms the alarm, and re-syncs the fd
// set.
// NOTE(review): while not shutting down, the alarm re-arms unconditionally, so
// it keeps firing every kAresBackupPollAlarmDuration for the resolver's
// lifetime even when no lookup is pending. Consider re-arming only while
// callback_map_ is non-empty.
void AresResolver::OnAresBackupPollAlarm() {
  grpc_core::MutexLock lock(&mutex_);
  // The alarm has fired, so there is no handle left to cancel.
  ares_backup_poll_alarm_handle_.reset();
  GRPC_TRACE_LOG(cares_resolver, INFO)
      << "(EventEngine c-ares resolver) request:" << this
      << " OnAresBackupPollAlarm shutting_down=" << shutting_down_;
  if (!shutting_down_) {
    for (const auto& fd_node : fd_node_list_) {
      if (!fd_node->already_shutdown) {
        GRPC_TRACE_LOG(cares_resolver, INFO)
            << "(EventEngine c-ares resolver) request:" << this
            << " OnAresBackupPollAlarm; ares_process_fd. fd="
            << fd_node->polled_fd->GetName();
        ares_socket_t as = fd_node->polled_fd->GetWrappedAresSocketLocked();
        ares_process_fd(channel_, as, as);
      }
    }
    MaybeStartTimerLocked();
    CheckSocketsLocked();
  }
}
596 
OnHostbynameDoneLocked(void * arg,int status,int,struct hostent * hostent)597 void AresResolver::OnHostbynameDoneLocked(void* arg, int status,
598                                           int /*timeouts*/,
599                                           struct hostent* hostent) {
600   auto* hostname_qa = static_cast<HostnameQueryArg*>(arg);
601   CHECK_GT(hostname_qa->pending_requests--, 0);
602   auto* ares_resolver = hostname_qa->ares_resolver;
603   if (status != ARES_SUCCESS) {
604     std::string error_msg =
605         absl::StrFormat("address lookup failed for %s: %s",
606                         hostname_qa->query_name, ares_strerror(status));
607     GRPC_TRACE_LOG(cares_resolver, INFO)
608         << "(EventEngine c-ares resolver) resolver:" << ares_resolver
609         << " OnHostbynameDoneLocked: " << error_msg;
610     hostname_qa->error_status = AresStatusToAbslStatus(status, error_msg);
611   } else {
612     GRPC_TRACE_LOG(cares_resolver, INFO)
613         << "(EventEngine c-ares resolver) resolver:" << ares_resolver
614         << " OnHostbynameDoneLocked name=" << hostname_qa->query_name
615         << " ARES_SUCCESS";
616     for (size_t i = 0; hostent->h_addr_list[i] != nullptr; i++) {
617       if (hostname_qa->result.size() == kMaxRecordSize) {
618         LOG(ERROR) << "A/AAAA response exceeds maximum record size of 65536";
619         break;
620       }
621       switch (hostent->h_addrtype) {
622         case AF_INET6: {
623           size_t addr_len = sizeof(struct sockaddr_in6);
624           struct sockaddr_in6 addr;
625           memset(&addr, 0, addr_len);
626           memcpy(&addr.sin6_addr, hostent->h_addr_list[i],
627                  sizeof(struct in6_addr));
628           addr.sin6_family = static_cast<unsigned char>(hostent->h_addrtype);
629           addr.sin6_port = htons(hostname_qa->port);
630           hostname_qa->result.emplace_back(
631               reinterpret_cast<const sockaddr*>(&addr), addr_len);
632           char output[INET6_ADDRSTRLEN];
633           ares_inet_ntop(AF_INET6, &addr.sin6_addr, output, INET6_ADDRSTRLEN);
634           GRPC_TRACE_LOG(cares_resolver, INFO)
635               << "(EventEngine c-ares resolver) resolver:" << ares_resolver
636               << " c-ares resolver gets a AF_INET6 result: \n  addr: " << output
637               << "\n  port: " << hostname_qa->port
638               << "\n  sin6_scope_id: " << addr.sin6_scope_id;
639           break;
640         }
641         case AF_INET: {
642           size_t addr_len = sizeof(struct sockaddr_in);
643           struct sockaddr_in addr;
644           memset(&addr, 0, addr_len);
645           memcpy(&addr.sin_addr, hostent->h_addr_list[i],
646                  sizeof(struct in_addr));
647           addr.sin_family = static_cast<unsigned char>(hostent->h_addrtype);
648           addr.sin_port = htons(hostname_qa->port);
649           hostname_qa->result.emplace_back(
650               reinterpret_cast<const sockaddr*>(&addr), addr_len);
651           char output[INET_ADDRSTRLEN];
652           ares_inet_ntop(AF_INET, &addr.sin_addr, output, INET_ADDRSTRLEN);
653           GRPC_TRACE_LOG(cares_resolver, INFO)
654               << "(EventEngine c-ares resolver) resolver:" << ares_resolver
655               << " c-ares resolver gets a AF_INET result: \n  addr: " << output
656               << "\n  port: " << hostname_qa->port;
657           break;
658         }
659         default:
660           grpc_core::Crash(
661               absl::StrFormat("resolver:%p Received invalid type of address %d",
662                               ares_resolver, hostent->h_addrtype));
663       }
664     }
665   }
666   if (hostname_qa->pending_requests == 0) {
667     auto nh =
668         ares_resolver->callback_map_.extract(hostname_qa->callback_map_id);
669     CHECK(!nh.empty());
670     CHECK(absl::holds_alternative<
671           EventEngine::DNSResolver::LookupHostnameCallback>(nh.mapped()));
672     auto callback = absl::get<EventEngine::DNSResolver::LookupHostnameCallback>(
673         std::move(nh.mapped()));
674     if (!hostname_qa->result.empty() || hostname_qa->error_status.ok()) {
675       ares_resolver->event_engine_->Run(
676           [callback = std::move(callback),
677            result = SortAddresses(hostname_qa->result)]() mutable {
678             callback(std::move(result));
679           });
680     } else {
681       ares_resolver->event_engine_->Run(
682           [callback = std::move(callback),
683            result = std::move(hostname_qa->error_status)]() mutable {
684             callback(std::move(result));
685           });
686     }
687     delete hostname_qa;
688   }
689 }
690 
OnSRVQueryDoneLocked(void * arg,int status,int,unsigned char * abuf,int alen)691 void AresResolver::OnSRVQueryDoneLocked(void* arg, int status, int /*timeouts*/,
692                                         unsigned char* abuf, int alen) {
693   std::unique_ptr<QueryArg> qa(static_cast<QueryArg*>(arg));
694   auto* ares_resolver = qa->ares_resolver;
695   auto nh = ares_resolver->callback_map_.extract(qa->callback_map_id);
696   CHECK(!nh.empty());
697   CHECK(absl::holds_alternative<EventEngine::DNSResolver::LookupSRVCallback>(
698       nh.mapped()));
699   auto callback = absl::get<EventEngine::DNSResolver::LookupSRVCallback>(
700       std::move(nh.mapped()));
701   auto fail = [&](absl::string_view prefix) {
702     std::string error_message = absl::StrFormat(
703         "%s for %s: %s", prefix, qa->query_name, ares_strerror(status));
704     GRPC_TRACE_LOG(cares_resolver, INFO)
705         << "(EventEngine c-ares resolver) OnSRVQueryDoneLocked: "
706         << error_message;
707     ares_resolver->event_engine_->Run(
708         [callback = std::move(callback),
709          status = AresStatusToAbslStatus(status, error_message)]() mutable {
710           callback(status);
711         });
712   };
713   if (status != ARES_SUCCESS) {
714     fail("SRV lookup failed");
715     return;
716   }
717   GRPC_TRACE_LOG(cares_resolver, INFO)
718       << "(EventEngine c-ares resolver) resolver:" << ares_resolver
719       << " OnSRVQueryDoneLocked name=" << qa->query_name << " ARES_SUCCESS";
720   struct ares_srv_reply* reply = nullptr;
721   status = ares_parse_srv_reply(abuf, alen, &reply);
722   GRPC_TRACE_LOG(cares_resolver, INFO)
723       << "(EventEngine c-ares resolver) resolver:" << ares_resolver
724       << " ares_parse_srv_reply: " << status;
725   if (status != ARES_SUCCESS) {
726     fail("Failed to parse SRV reply");
727     return;
728   }
729   std::vector<EventEngine::DNSResolver::SRVRecord> result;
730   for (struct ares_srv_reply* srv_it = reply; srv_it != nullptr;
731        srv_it = srv_it->next) {
732     if (result.size() == kMaxRecordSize) {
733       LOG(ERROR) << "SRV response exceeds maximum record size of 65536";
734       break;
735     }
736     EventEngine::DNSResolver::SRVRecord record;
737     record.host = srv_it->host;
738     record.port = srv_it->port;
739     record.priority = srv_it->priority;
740     record.weight = srv_it->weight;
741     result.push_back(std::move(record));
742   }
743   if (reply != nullptr) {
744     ares_free_data(reply);
745   }
746   ares_resolver->event_engine_->Run(
747       [callback = std::move(callback), result = std::move(result)]() mutable {
748         callback(std::move(result));
749       });
750 }
751 
OnTXTDoneLocked(void * arg,int status,int,unsigned char * buf,int len)752 void AresResolver::OnTXTDoneLocked(void* arg, int status, int /*timeouts*/,
753                                    unsigned char* buf, int len) {
754   std::unique_ptr<QueryArg> qa(static_cast<QueryArg*>(arg));
755   auto* ares_resolver = qa->ares_resolver;
756   auto nh = ares_resolver->callback_map_.extract(qa->callback_map_id);
757   CHECK(!nh.empty());
758   CHECK(absl::holds_alternative<EventEngine::DNSResolver::LookupTXTCallback>(
759       nh.mapped()));
760   auto callback = absl::get<EventEngine::DNSResolver::LookupTXTCallback>(
761       std::move(nh.mapped()));
762   auto fail = [&](absl::string_view prefix) {
763     std::string error_message = absl::StrFormat(
764         "%s for %s: %s", prefix, qa->query_name, ares_strerror(status));
765     GRPC_TRACE_LOG(cares_resolver, INFO)
766         << "(EventEngine c-ares resolver) resolver:" << ares_resolver
767         << " OnTXTDoneLocked: " << error_message;
768     ares_resolver->event_engine_->Run(
769         [callback = std::move(callback),
770          status = AresStatusToAbslStatus(status, error_message)]() mutable {
771           callback(status);
772         });
773   };
774   if (status != ARES_SUCCESS) {
775     fail("TXT lookup failed");
776     return;
777   }
778   GRPC_TRACE_LOG(cares_resolver, INFO)
779       << "(EventEngine c-ares resolver) resolver:" << ares_resolver
780       << " OnTXTDoneLocked name=" << qa->query_name << " ARES_SUCCESS";
781   struct ares_txt_ext* reply = nullptr;
782   status = ares_parse_txt_reply_ext(buf, len, &reply);
783   if (status != ARES_SUCCESS) {
784     fail("Failed to parse TXT result");
785     return;
786   }
787   std::vector<std::string> result;
788   for (struct ares_txt_ext* part = reply; part != nullptr; part = part->next) {
789     if (part->record_start) {
790       result.emplace_back(reinterpret_cast<char*>(part->txt), part->length);
791     } else {
792       absl::StrAppend(
793           &result.back(),
794           std::string(reinterpret_cast<char*>(part->txt), part->length));
795     }
796   }
797   GRPC_TRACE_LOG(cares_resolver, INFO)
798       << "(EventEngine c-ares resolver) resolver:" << ares_resolver << " Got "
799       << result.size() << " TXT records";
800   if (GRPC_TRACE_FLAG_ENABLED(cares_resolver)) {
801     for (const auto& record : result) {
802       LOG(INFO) << record;
803     }
804   }
805   // Clean up.
806   ares_free_data(reply);
807   ares_resolver->event_engine_->Run(
808       [callback = std::move(callback), result = std::move(result)]() mutable {
809         callback(std::move(result));
810       });
811 }
812 
813 }  // namespace experimental
814 }  // namespace grpc_event_engine
815 
noop_inject_channel_config(ares_channel *)816 void noop_inject_channel_config(ares_channel* /*channel*/) {}
817 
818 void (*event_engine_grpc_ares_test_only_inject_config)(ares_channel* channel) =
819     noop_inject_channel_config;
820 
821 bool g_event_engine_grpc_ares_test_only_force_tcp = false;
822 
ShouldUseAresDnsResolver()823 bool ShouldUseAresDnsResolver() {
824 #if defined(GRPC_POSIX_SOCKET_ARES_EV_DRIVER) || \
825     defined(GRPC_WINDOWS_SOCKET_ARES_EV_DRIVER)
826   auto resolver_env = grpc_core::ConfigVars::Get().DnsResolver();
827   return resolver_env.empty() || absl::EqualsIgnoreCase(resolver_env, "ares");
828 #else   // defined(GRPC_POSIX_SOCKET_ARES_EV_DRIVER) ||
829         // defined(GRPC_WINDOWS_SOCKET_ARES_EV_DRIVER)
830   return false;
831 #endif  // defined(GRPC_POSIX_SOCKET_ARES_EV_DRIVER) ||
832         // defined(GRPC_WINDOWS_SOCKET_ARES_EV_DRIVER)
833 }
834 
AresInit()835 absl::Status AresInit() {
836   if (ShouldUseAresDnsResolver()) {
837     // ares_library_init and ares_library_cleanup are currently no-op except
838     // under Windows. Calling them may cause race conditions when other parts of
839     // the binary calls these functions concurrently.
840 #ifdef GPR_WINDOWS
841     int status = ares_library_init(ARES_LIB_INIT_ALL);
842     if (status != ARES_SUCCESS) {
843       return GRPC_ERROR_CREATE(
844           absl::StrCat("ares_library_init failed: ", ares_strerror(status)));
845     }
846 #endif  // GPR_WINDOWS
847   }
848   return absl::OkStatus();
849 }
AresShutdown()850 void AresShutdown() {
851   if (ShouldUseAresDnsResolver()) {
852     // ares_library_init and ares_library_cleanup are currently no-op except
853     // under Windows. Calling them may cause race conditions when other parts of
854     // the binary calls these functions concurrently.
855 #ifdef GPR_WINDOWS
856     ares_library_cleanup();
857 #endif  // GPR_WINDOWS
858   }
859 }
860 
861 #else  // GRPC_ARES == 1
862 
ShouldUseAresDnsResolver()863 bool ShouldUseAresDnsResolver() { return false; }
AresInit()864 absl::Status AresInit() { return absl::OkStatus(); }
AresShutdown()865 void AresShutdown() {}
866 
867 #endif  // GRPC_ARES == 1
868