1 // Copyright 2023 The gRPC Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 #include "src/core/lib/event_engine/ares_resolver.h"
15
16 #include <grpc/support/port_platform.h>
17
18 #include <string>
19 #include <vector>
20
21 #include "src/core/lib/iomgr/port.h"
22
23 // IWYU pragma: no_include <ares_version.h>
24 // IWYU pragma: no_include <arpa/inet.h>
25 // IWYU pragma: no_include <arpa/nameser.h>
26 // IWYU pragma: no_include <inttypes.h>
27 // IWYU pragma: no_include <netdb.h>
28 // IWYU pragma: no_include <netinet/in.h>
29 // IWYU pragma: no_include <stdlib.h>
30 // IWYU pragma: no_include <sys/socket.h>
31 // IWYU pragma: no_include <ratio>
32
33 #if GRPC_ARES == 1
34
35 #include <address_sorting/address_sorting.h>
36 #include <ares.h>
37
38 #if ARES_VERSION >= 0x011200
39 // c-ares 1.18.0 or later starts to provide ares_nameser.h as a public header.
40 #include <ares_nameser.h>
41 #else
42 #include "src/core/lib/event_engine/nameser.h" // IWYU pragma: keep
43 #endif
44
45 #include <grpc/event_engine/event_engine.h>
46 #include <string.h>
47
48 #include <algorithm>
49 #include <chrono>
50 #include <memory>
51 #include <type_traits>
52 #include <utility>
53
54 #include "absl/functional/any_invocable.h"
55 #include "absl/hash/hash.h"
56 #include "absl/log/check.h"
57 #include "absl/log/log.h"
58 #include "absl/strings/match.h"
59 #include "absl/strings/numbers.h"
60 #include "absl/strings/str_cat.h"
61 #include "absl/strings/str_format.h"
62 #include "absl/types/optional.h"
63 #include "src/core/config/config_vars.h"
64 #include "src/core/lib/address_utils/parse_address.h"
65 #include "src/core/lib/address_utils/sockaddr_utils.h"
66 #include "src/core/lib/debug/trace.h"
67 #include "src/core/lib/event_engine/grpc_polled_fd.h"
68 #include "src/core/lib/event_engine/time_util.h"
69 #include "src/core/lib/iomgr/resolved_address.h"
70 #include "src/core/lib/iomgr/sockaddr.h"
71 #include "src/core/util/debug_location.h"
72 #include "src/core/util/host_port.h"
73 #include "src/core/util/orphanable.h"
74 #include "src/core/util/ref_counted_ptr.h"
75 #ifdef GRPC_POSIX_SOCKET_ARES_EV_DRIVER
76 #include "src/core/lib/event_engine/posix_engine/tcp_socket_utils.h"
77 #endif
78
79 namespace grpc_event_engine {
80 namespace experimental {
81
82 namespace {
83
// Upper bound on how many records (A/AAAA or SRV) a single DNS response may
// contribute to a lookup result. This is a defensive cap so that a malformed
// or hostile response cannot exhaust the process's memory.
constexpr int kMaxRecordSize = 1 << 16;  // 65536
88
AresStatusToAbslStatus(int status,absl::string_view error_msg)89 absl::Status AresStatusToAbslStatus(int status, absl::string_view error_msg) {
90 switch (status) {
91 case ARES_ECANCELLED:
92 return absl::CancelledError(error_msg);
93 case ARES_ENOTIMP:
94 return absl::UnimplementedError(error_msg);
95 case ARES_ENOTFOUND:
96 return absl::NotFoundError(error_msg);
97 case ARES_ECONNREFUSED:
98 return absl::UnavailableError(error_msg);
99 default:
100 return absl::UnknownError(error_msg);
101 }
102 }
103
104 // An alternative here could be to use ares_timeout to try to be more
105 // accurate, but that would require using "struct timeval"'s, which just
106 // makes things a bit more complicated. So just poll every second, as
107 // suggested by the c-ares code comments.
108 constexpr EventEngine::Duration kAresBackupPollAlarmDuration =
109 std::chrono::seconds(1);
110
IsIpv6LoopbackAvailable()111 bool IsIpv6LoopbackAvailable() {
112 #ifdef GRPC_POSIX_SOCKET_ARES_EV_DRIVER
113 return PosixSocketWrapper::IsIpv6LoopbackAvailable();
114 #elif defined(GRPC_WINDOWS_SOCKET_ARES_EV_DRIVER)
115 // TODO(yijiem): implement this for Windows
116 return true;
117 #else
118 #error "Unsupported platform"
119 #endif
120 }
121
SetRequestDNSServer(absl::string_view dns_server,ares_channel * channel)122 absl::Status SetRequestDNSServer(absl::string_view dns_server,
123 ares_channel* channel) {
124 GRPC_TRACE_LOG(cares_resolver, INFO)
125 << "(EventEngine c-ares resolver) Using DNS server " << dns_server;
126 grpc_resolved_address addr;
127 struct ares_addr_port_node dns_server_addr = {};
128 if (grpc_parse_ipv4_hostport(dns_server, &addr, /*log_errors=*/false)) {
129 dns_server_addr.family = AF_INET;
130 struct sockaddr_in* in = reinterpret_cast<struct sockaddr_in*>(addr.addr);
131 memcpy(&dns_server_addr.addr.addr4, &in->sin_addr, sizeof(struct in_addr));
132 dns_server_addr.tcp_port = grpc_sockaddr_get_port(&addr);
133 dns_server_addr.udp_port = grpc_sockaddr_get_port(&addr);
134 } else if (grpc_parse_ipv6_hostport(dns_server, &addr,
135 /*log_errors=*/false)) {
136 dns_server_addr.family = AF_INET6;
137 struct sockaddr_in6* in6 =
138 reinterpret_cast<struct sockaddr_in6*>(addr.addr);
139 memcpy(&dns_server_addr.addr.addr6, &in6->sin6_addr,
140 sizeof(struct in6_addr));
141 dns_server_addr.tcp_port = grpc_sockaddr_get_port(&addr);
142 dns_server_addr.udp_port = grpc_sockaddr_get_port(&addr);
143 } else {
144 return absl::InvalidArgumentError(
145 absl::StrCat("Cannot parse authority: ", dns_server));
146 }
147 int status = ares_set_servers_ports(*channel, &dns_server_addr);
148 if (status != ARES_SUCCESS) {
149 return AresStatusToAbslStatus(status, ares_strerror(status));
150 }
151 return absl::OkStatus();
152 }
153
SortAddresses(const std::vector<EventEngine::ResolvedAddress> & addresses)154 std::vector<EventEngine::ResolvedAddress> SortAddresses(
155 const std::vector<EventEngine::ResolvedAddress>& addresses) {
156 address_sorting_sortable* sortables = static_cast<address_sorting_sortable*>(
157 gpr_zalloc(sizeof(address_sorting_sortable) * addresses.size()));
158 for (size_t i = 0; i < addresses.size(); i++) {
159 sortables[i].user_data =
160 const_cast<EventEngine::ResolvedAddress*>(&addresses[i]);
161 memcpy(&sortables[i].dest_addr.addr, addresses[i].address(),
162 addresses[i].size());
163 sortables[i].dest_addr.len = addresses[i].size();
164 }
165 address_sorting_rfc_6724_sort(sortables, addresses.size());
166 std::vector<EventEngine::ResolvedAddress> sorted_addresses;
167 sorted_addresses.reserve(addresses.size());
168 for (size_t i = 0; i < addresses.size(); ++i) {
169 sorted_addresses.emplace_back(
170 *static_cast<EventEngine::ResolvedAddress*>(sortables[i].user_data));
171 }
172 gpr_free(sortables);
173 return sorted_addresses;
174 }
175
176 struct QueryArg {
QueryArggrpc_event_engine::experimental::__anonb0960eca0111::QueryArg177 QueryArg(AresResolver* ar, int id, absl::string_view name)
178 : ares_resolver(ar), callback_map_id(id), query_name(name) {}
179 AresResolver* ares_resolver;
180 int callback_map_id;
181 std::string query_name;
182 };
183
184 struct HostnameQueryArg : public QueryArg {
HostnameQueryArggrpc_event_engine::experimental::__anonb0960eca0111::HostnameQueryArg185 HostnameQueryArg(AresResolver* ar, int id, absl::string_view name, int p)
186 : QueryArg(ar, id, name), port(p) {}
187 int port;
188 int pending_requests;
189 absl::Status error_status;
190 std::vector<EventEngine::ResolvedAddress> result;
191 };
192
193 } // namespace
194
195 absl::StatusOr<grpc_core::OrphanablePtr<AresResolver>>
CreateAresResolver(absl::string_view dns_server,std::unique_ptr<GrpcPolledFdFactory> polled_fd_factory,std::shared_ptr<EventEngine> event_engine)196 AresResolver::CreateAresResolver(
197 absl::string_view dns_server,
198 std::unique_ptr<GrpcPolledFdFactory> polled_fd_factory,
199 std::shared_ptr<EventEngine> event_engine) {
200 ares_options opts = {};
201 opts.flags |= ARES_FLAG_STAYOPEN;
202 if (g_event_engine_grpc_ares_test_only_force_tcp) {
203 opts.flags |= ARES_FLAG_USEVC;
204 }
205 ares_channel channel;
206 int status = ares_init_options(&channel, &opts, ARES_OPT_FLAGS);
207 if (status != ARES_SUCCESS) {
208 LOG(ERROR) << "ares_init_options failed, status: " << status;
209 return AresStatusToAbslStatus(
210 status,
211 absl::StrCat("Failed to init c-ares channel: ", ares_strerror(status)));
212 }
213 event_engine_grpc_ares_test_only_inject_config(&channel);
214 polled_fd_factory->ConfigureAresChannelLocked(channel);
215 if (!dns_server.empty()) {
216 absl::Status status = SetRequestDNSServer(dns_server, &channel);
217 if (!status.ok()) {
218 return status;
219 }
220 }
221 return grpc_core::MakeOrphanable<AresResolver>(
222 std::move(polled_fd_factory), std::move(event_engine), channel);
223 }
224
AresResolver(std::unique_ptr<GrpcPolledFdFactory> polled_fd_factory,std::shared_ptr<EventEngine> event_engine,ares_channel channel)225 AresResolver::AresResolver(
226 std::unique_ptr<GrpcPolledFdFactory> polled_fd_factory,
227 std::shared_ptr<EventEngine> event_engine, ares_channel channel)
228 : RefCountedDNSResolverInterface(
229 GRPC_TRACE_FLAG_ENABLED(cares_resolver) ? "AresResolver" : nullptr),
230 channel_(channel),
231 polled_fd_factory_(std::move(polled_fd_factory)),
232 event_engine_(std::move(event_engine)) {
233 polled_fd_factory_->Initialize(&mutex_, event_engine_.get());
234 }
235
// Releases the c-ares channel. Before that, the CHECKs assert that no FdNode
// remains in fd_node_list_ and that every lookup callback has been removed
// from callback_map_.
AresResolver::~AresResolver() {
  CHECK(fd_node_list_.empty());
  CHECK(callback_map_.empty());
  ares_destroy(channel_);
}
241
Orphan()242 void AresResolver::Orphan() {
243 {
244 grpc_core::MutexLock lock(&mutex_);
245 shutting_down_ = true;
246 if (ares_backup_poll_alarm_handle_.has_value()) {
247 event_engine_->Cancel(*ares_backup_poll_alarm_handle_);
248 ares_backup_poll_alarm_handle_.reset();
249 }
250 for (const auto& fd_node : fd_node_list_) {
251 if (!fd_node->already_shutdown) {
252 GRPC_TRACE_LOG(cares_resolver, INFO)
253 << "(EventEngine c-ares resolver) resolver: " << this
254 << " shutdown fd: " << fd_node->polled_fd->GetName();
255 CHECK(fd_node->polled_fd->ShutdownLocked(
256 absl::CancelledError("AresResolver::Orphan")));
257 fd_node->already_shutdown = true;
258 }
259 }
260 }
261 Unref(DEBUG_LOCATION, "Orphan");
262 }
263
LookupHostname(EventEngine::DNSResolver::LookupHostnameCallback callback,absl::string_view name,absl::string_view default_port)264 void AresResolver::LookupHostname(
265 EventEngine::DNSResolver::LookupHostnameCallback callback,
266 absl::string_view name, absl::string_view default_port) {
267 absl::string_view host;
268 absl::string_view port_string;
269 if (!grpc_core::SplitHostPort(name, &host, &port_string)) {
270 event_engine_->Run(
271 [callback = std::move(callback),
272 status = absl::InvalidArgumentError(absl::StrCat(
273 "Unparsable name: ", name))]() mutable { callback(status); });
274 return;
275 }
276 if (host.empty()) {
277 event_engine_->Run([callback = std::move(callback),
278 status = absl::InvalidArgumentError(absl::StrCat(
279 "host must not be empty in name: ",
280 name))]() mutable { callback(status); });
281 return;
282 }
283 if (port_string.empty()) {
284 if (default_port.empty()) {
285 event_engine_->Run([callback = std::move(callback),
286 status = absl::InvalidArgumentError(absl::StrFormat(
287 "No port in name %s or default_port argument",
288 name))]() mutable { callback(status); });
289 return;
290 }
291 port_string = default_port;
292 }
293 int port = 0;
294 if (port_string == "http") {
295 port = 80;
296 } else if (port_string == "https") {
297 port = 443;
298 } else if (!absl::SimpleAtoi(port_string, &port)) {
299 event_engine_->Run([callback = std::move(callback),
300 status = absl::InvalidArgumentError(absl::StrCat(
301 "Failed to parse port in name: ",
302 name))]() mutable { callback(status); });
303 return;
304 }
305 // TODO(yijiem): Change this when refactoring code in
306 // src/core/lib/address_utils to use EventEngine::ResolvedAddress.
307 grpc_resolved_address addr;
308 const std::string hostport = grpc_core::JoinHostPort(host, port);
309 if (grpc_parse_ipv4_hostport(hostport.c_str(), &addr,
310 false /* log errors */) ||
311 grpc_parse_ipv6_hostport(hostport.c_str(), &addr,
312 false /* log errors */)) {
313 // Early out if the target is an ipv4 or ipv6 literal.
314 std::vector<EventEngine::ResolvedAddress> result;
315 result.emplace_back(reinterpret_cast<sockaddr*>(addr.addr), addr.len);
316 event_engine_->Run(
317 [callback = std::move(callback), result = std::move(result)]() mutable {
318 callback(std::move(result));
319 });
320 return;
321 }
322 grpc_core::MutexLock lock(&mutex_);
323 callback_map_.emplace(++id_, std::move(callback));
324 auto* resolver_arg = new HostnameQueryArg(this, id_, name, port);
325 if (IsIpv6LoopbackAvailable()) {
326 // Note that using AF_UNSPEC for both IPv6 and IPv4 queries does not work in
327 // all cases, e.g. for localhost:<> it only gets back the IPv6 result (i.e.
328 // ::1).
329 resolver_arg->pending_requests = 2;
330 ares_gethostbyname(channel_, std::string(host).c_str(), AF_INET,
331 &AresResolver::OnHostbynameDoneLocked, resolver_arg);
332 ares_gethostbyname(channel_, std::string(host).c_str(), AF_INET6,
333 &AresResolver::OnHostbynameDoneLocked, resolver_arg);
334 } else {
335 resolver_arg->pending_requests = 1;
336 ares_gethostbyname(channel_, std::string(host).c_str(), AF_INET,
337 &AresResolver::OnHostbynameDoneLocked, resolver_arg);
338 }
339 CheckSocketsLocked();
340 MaybeStartTimerLocked();
341 }
342
LookupSRV(EventEngine::DNSResolver::LookupSRVCallback callback,absl::string_view name)343 void AresResolver::LookupSRV(
344 EventEngine::DNSResolver::LookupSRVCallback callback,
345 absl::string_view name) {
346 absl::string_view host;
347 absl::string_view port;
348 if (!grpc_core::SplitHostPort(name, &host, &port)) {
349 event_engine_->Run(
350 [callback = std::move(callback),
351 status = absl::InvalidArgumentError(absl::StrCat(
352 "Unparsable name: ", name))]() mutable { callback(status); });
353 return;
354 }
355 if (host.empty()) {
356 event_engine_->Run([callback = std::move(callback),
357 status = absl::InvalidArgumentError(absl::StrCat(
358 "host must not be empty in name: ",
359 name))]() mutable { callback(status); });
360 return;
361 }
362 // Don't query for SRV records if the target is "localhost"
363 if (absl::EqualsIgnoreCase(host, "localhost")) {
364 event_engine_->Run([callback = std::move(callback)]() mutable {
365 callback(std::vector<EventEngine::DNSResolver::SRVRecord>());
366 });
367 return;
368 }
369 grpc_core::MutexLock lock(&mutex_);
370 callback_map_.emplace(++id_, std::move(callback));
371 auto* resolver_arg = new QueryArg(this, id_, host);
372 ares_query(channel_, std::string(host).c_str(), ns_c_in, ns_t_srv,
373 &AresResolver::OnSRVQueryDoneLocked, resolver_arg);
374 CheckSocketsLocked();
375 MaybeStartTimerLocked();
376 }
377
LookupTXT(EventEngine::DNSResolver::LookupTXTCallback callback,absl::string_view name)378 void AresResolver::LookupTXT(
379 EventEngine::DNSResolver::LookupTXTCallback callback,
380 absl::string_view name) {
381 absl::string_view host;
382 absl::string_view port;
383 if (!grpc_core::SplitHostPort(name, &host, &port)) {
384 event_engine_->Run(
385 [callback = std::move(callback),
386 status = absl::InvalidArgumentError(absl::StrCat(
387 "Unparsable name: ", name))]() mutable { callback(status); });
388 return;
389 }
390 if (host.empty()) {
391 event_engine_->Run([callback = std::move(callback),
392 status = absl::InvalidArgumentError(absl::StrCat(
393 "host must not be empty in name: ",
394 name))]() mutable { callback(status); });
395 return;
396 }
397 // Don't query for TXT records if the target is "localhost"
398 if (absl::EqualsIgnoreCase(host, "localhost")) {
399 event_engine_->Run([callback = std::move(callback)]() mutable {
400 callback(std::vector<std::string>());
401 });
402 return;
403 }
404 grpc_core::MutexLock lock(&mutex_);
405 callback_map_.emplace(++id_, std::move(callback));
406 auto* resolver_arg = new QueryArg(this, id_, host);
407 ares_search(channel_, std::string(host).c_str(), ns_c_in, ns_t_txt,
408 &AresResolver::OnTXTDoneLocked, resolver_arg);
409 CheckSocketsLocked();
410 MaybeStartTimerLocked();
411 }
412
// Reconciles fd_node_list_ with the sockets c-ares currently reports through
// ares_getsock():
//  - A socket not yet tracked gets a new FdNode wrapping a polled fd from
//    polled_fd_factory_.
//  - Read and/or write interest is registered for each reported socket,
//    unless a registration for that direction is already outstanding.
//  - Tracked nodes that c-ares no longer reports are shut down, and deleted
//    once neither a read nor a write callback is still registered on them.
// While shutting_down_ is set, ares_getsock() is not consulted, so every
// tracked node goes through the shutdown/cleanup path.
// Every caller in this file holds mutex_ when calling this.
void AresResolver::CheckSocketsLocked() {
  // Nodes that remain in use are moved here; whatever is left behind in
  // fd_node_list_ afterwards is no longer wanted by c-ares.
  FdNodeList new_list;
  if (!shutting_down_) {
    ares_socket_t socks[ARES_GETSOCK_MAXNUM] = {};
    int socks_bitmask = ares_getsock(channel_, socks, ARES_GETSOCK_MAXNUM);
    for (size_t i = 0; i < ARES_GETSOCK_MAXNUM; i++) {
      if (ARES_GETSOCK_READABLE(socks_bitmask, i) ||
          ARES_GETSOCK_WRITABLE(socks_bitmask, i)) {
        // Reuse the existing node for this socket if one is tracked.
        auto iter = std::find_if(
            fd_node_list_.begin(), fd_node_list_.end(),
            [sock = socks[i]](const auto& node) { return node->as == sock; });
        if (iter == fd_node_list_.end()) {
          GRPC_TRACE_LOG(cares_resolver, INFO)
              << "(EventEngine c-ares resolver) resolver:" << this
              << " new fd: " << socks[i];
          new_list.push_back(std::make_unique<FdNode>(
              socks[i], polled_fd_factory_->NewGrpcPolledFdLocked(socks[i])));
        } else {
          new_list.splice(new_list.end(), fd_node_list_, iter);
        }
        FdNode* fd_node = new_list.back().get();
        if (ARES_GETSOCK_READABLE(socks_bitmask, i) &&
            !fd_node->readable_registered) {
          fd_node->readable_registered = true;
          if (fd_node->polled_fd->IsFdStillReadableLocked()) {
            // If c-ares wants to read and the socket already has data
            // available, schedule OnReadable directly. An edge-triggered
            // poller would otherwise get no event when no new data arrives
            // and c-ares did not read all pending data in the previous
            // ares_process_fd call.
            GRPC_TRACE_LOG(cares_resolver, INFO)
                << "(EventEngine c-ares resolver) resolver:" << this
                << " schedule read directly on: " << fd_node->as;
            // The closure holds a ref so the resolver outlives it; it also
            // captures the raw FdNode*, which is why registered nodes are
            // never deleted below.
            event_engine_->Run(
                [self = Ref(DEBUG_LOCATION, "CheckSocketsLocked"),
                 fd_node]() mutable {
                  static_cast<AresResolver*>(self.get())
                      ->OnReadable(fd_node, absl::OkStatus());
                });
          } else {
            // Otherwise register with the poller for a readable event.
            GRPC_TRACE_LOG(cares_resolver, INFO)
                << "(EventEngine c-ares resolver) resolver:" << this
                << " notify read on: " << fd_node->as;
            fd_node->polled_fd->RegisterForOnReadableLocked(
                [self = Ref(DEBUG_LOCATION, "CheckSocketsLocked"),
                 fd_node](absl::Status status) mutable {
                  static_cast<AresResolver*>(self.get())
                      ->OnReadable(fd_node, status);
                });
          }
        }
        // Register for writability if c-ares wants to write and no write
        // callback is already registered on this socket.
        if (ARES_GETSOCK_WRITABLE(socks_bitmask, i) &&
            !fd_node->writable_registered) {
          GRPC_TRACE_LOG(cares_resolver, INFO)
              << "(EventEngine c-ares resolver) resolver:" << this
              << " notify write on: " << fd_node->as;
          fd_node->writable_registered = true;
          fd_node->polled_fd->RegisterForOnWriteableLocked(
              [self = Ref(DEBUG_LOCATION, "CheckSocketsLocked"),
               fd_node](absl::Status status) mutable {
                static_cast<AresResolver*>(self.get())
                    ->OnWritable(fd_node, status);
              });
        }
      }
    }
  }
  // Any remaining fds in fd_node_list_ were not returned by ares_getsock()
  // and are therefore no longer in use, so they can be shut down and removed
  // from the list.
  // TODO(yijiem): Since we are keeping the underlying socket opened for both
  // Posix and Windows, it might be reasonable to also keep the FdNodes alive
  // till the end. But we need to change the state management of FdNodes in this
  // file. This may simplify the code a bit.
  while (!fd_node_list_.empty()) {
    FdNode* fd_node = fd_node_list_.front().get();
    if (!fd_node->already_shutdown) {
      GRPC_TRACE_LOG(cares_resolver, INFO)
          << "(EventEngine c-ares resolver) resolver: " << this
          << " shutdown fd: " << fd_node->polled_fd->GetName();
      // Only marked shut down if ShutdownLocked() reports success; a node
      // kept below will have shutdown attempted again on a later pass.
      fd_node->already_shutdown =
          fd_node->polled_fd->ShutdownLocked(absl::OkStatus());
    }
    if (!fd_node->readable_registered && !fd_node->writable_registered) {
      GRPC_TRACE_LOG(cares_resolver, INFO)
          << "(EventEngine c-ares resolver) resolver: " << this
          << " delete fd: " << fd_node->polled_fd->GetName();
      fd_node_list_.pop_front();
    } else {
      // A pending callback still references this node; keep it alive.
      new_list.splice(new_list.end(), fd_node_list_, fd_node_list_.begin());
    }
  }
  fd_node_list_ = std::move(new_list);
}
510
MaybeStartTimerLocked()511 void AresResolver::MaybeStartTimerLocked() {
512 if (ares_backup_poll_alarm_handle_.has_value()) {
513 return;
514 }
515 // Initialize the backup poll alarm
516 GRPC_TRACE_LOG(cares_resolver, INFO)
517 << "(EventEngine c-ares resolver) request:" << this
518 << " MaybeStartTimerLocked next ares process poll time in "
519 << Milliseconds(kAresBackupPollAlarmDuration) << " ms";
520 ares_backup_poll_alarm_handle_ = event_engine_->RunAfter(
521 kAresBackupPollAlarmDuration,
522 [self = Ref(DEBUG_LOCATION, "MaybeStartTimerLocked")]() {
523 static_cast<AresResolver*>(self.get())->OnAresBackupPollAlarm();
524 });
525 }
526
OnReadable(FdNode * fd_node,absl::Status status)527 void AresResolver::OnReadable(FdNode* fd_node, absl::Status status) {
528 grpc_core::MutexLock lock(&mutex_);
529 CHECK(fd_node->readable_registered);
530 fd_node->readable_registered = false;
531 GRPC_TRACE_LOG(cares_resolver, INFO)
532 << "(EventEngine c-ares resolver) OnReadable: fd: " << fd_node->as
533 << "; request: " << this << "; status: " << status;
534 if (status.ok() && !shutting_down_) {
535 ares_process_fd(channel_, fd_node->as, ARES_SOCKET_BAD);
536 } else {
537 // If error is not absl::OkStatus() or the resolution was cancelled, it
538 // means the fd has been shutdown or timed out. The pending lookups made
539 // on this request will be cancelled by the following ares_cancel(). The
540 // remaining file descriptors in this request will be cleaned up in the
541 // following Work() method.
542 ares_cancel(channel_);
543 }
544 CheckSocketsLocked();
545 }
546
OnWritable(FdNode * fd_node,absl::Status status)547 void AresResolver::OnWritable(FdNode* fd_node, absl::Status status) {
548 grpc_core::MutexLock lock(&mutex_);
549 CHECK(fd_node->writable_registered);
550 fd_node->writable_registered = false;
551 GRPC_TRACE_LOG(cares_resolver, INFO)
552 << "(EventEngine c-ares resolver) OnWritable: fd: " << fd_node->as
553 << "; request:" << this << "; status: " << status;
554 if (status.ok() && !shutting_down_) {
555 ares_process_fd(channel_, ARES_SOCKET_BAD, fd_node->as);
556 } else {
557 // If error is not absl::OkStatus() or the resolution was cancelled, it
558 // means the fd has been shutdown or timed out. The pending lookups made
559 // on this request will be cancelled by the following ares_cancel(). The
560 // remaining file descriptors in this request will be cleaned up in the
561 // following Work() method.
562 ares_cancel(channel_);
563 }
564 CheckSocketsLocked();
565 }
566
567 // In case of non-responsive DNS servers, dropped packets, etc., c-ares has
568 // intelligent timeout and retry logic, which we can take advantage of by
569 // polling ares_process_fd on time intervals. Overall, the c-ares library is
570 // meant to be called into and given a chance to proceed name resolution:
571 // a) when fd events happen
572 // b) when some time has passed without fd events having happened
573 // For the latter, we use this backup poller. Also see
574 // https://github.com/grpc/grpc/pull/17688 description for more details.
OnAresBackupPollAlarm()575 void AresResolver::OnAresBackupPollAlarm() {
576 grpc_core::MutexLock lock(&mutex_);
577 ares_backup_poll_alarm_handle_.reset();
578 GRPC_TRACE_LOG(cares_resolver, INFO)
579 << "(EventEngine c-ares resolver) request:" << this
580 << " OnAresBackupPollAlarm shutting_down=" << shutting_down_;
581 if (!shutting_down_) {
582 for (const auto& fd_node : fd_node_list_) {
583 if (!fd_node->already_shutdown) {
584 GRPC_TRACE_LOG(cares_resolver, INFO)
585 << "(EventEngine c-ares resolver) request:" << this
586 << " OnAresBackupPollAlarm; ares_process_fd. fd="
587 << fd_node->polled_fd->GetName();
588 ares_socket_t as = fd_node->polled_fd->GetWrappedAresSocketLocked();
589 ares_process_fd(channel_, as, as);
590 }
591 }
592 MaybeStartTimerLocked();
593 CheckSocketsLocked();
594 }
595 }
596
OnHostbynameDoneLocked(void * arg,int status,int,struct hostent * hostent)597 void AresResolver::OnHostbynameDoneLocked(void* arg, int status,
598 int /*timeouts*/,
599 struct hostent* hostent) {
600 auto* hostname_qa = static_cast<HostnameQueryArg*>(arg);
601 CHECK_GT(hostname_qa->pending_requests--, 0);
602 auto* ares_resolver = hostname_qa->ares_resolver;
603 if (status != ARES_SUCCESS) {
604 std::string error_msg =
605 absl::StrFormat("address lookup failed for %s: %s",
606 hostname_qa->query_name, ares_strerror(status));
607 GRPC_TRACE_LOG(cares_resolver, INFO)
608 << "(EventEngine c-ares resolver) resolver:" << ares_resolver
609 << " OnHostbynameDoneLocked: " << error_msg;
610 hostname_qa->error_status = AresStatusToAbslStatus(status, error_msg);
611 } else {
612 GRPC_TRACE_LOG(cares_resolver, INFO)
613 << "(EventEngine c-ares resolver) resolver:" << ares_resolver
614 << " OnHostbynameDoneLocked name=" << hostname_qa->query_name
615 << " ARES_SUCCESS";
616 for (size_t i = 0; hostent->h_addr_list[i] != nullptr; i++) {
617 if (hostname_qa->result.size() == kMaxRecordSize) {
618 LOG(ERROR) << "A/AAAA response exceeds maximum record size of 65536";
619 break;
620 }
621 switch (hostent->h_addrtype) {
622 case AF_INET6: {
623 size_t addr_len = sizeof(struct sockaddr_in6);
624 struct sockaddr_in6 addr;
625 memset(&addr, 0, addr_len);
626 memcpy(&addr.sin6_addr, hostent->h_addr_list[i],
627 sizeof(struct in6_addr));
628 addr.sin6_family = static_cast<unsigned char>(hostent->h_addrtype);
629 addr.sin6_port = htons(hostname_qa->port);
630 hostname_qa->result.emplace_back(
631 reinterpret_cast<const sockaddr*>(&addr), addr_len);
632 char output[INET6_ADDRSTRLEN];
633 ares_inet_ntop(AF_INET6, &addr.sin6_addr, output, INET6_ADDRSTRLEN);
634 GRPC_TRACE_LOG(cares_resolver, INFO)
635 << "(EventEngine c-ares resolver) resolver:" << ares_resolver
636 << " c-ares resolver gets a AF_INET6 result: \n addr: " << output
637 << "\n port: " << hostname_qa->port
638 << "\n sin6_scope_id: " << addr.sin6_scope_id;
639 break;
640 }
641 case AF_INET: {
642 size_t addr_len = sizeof(struct sockaddr_in);
643 struct sockaddr_in addr;
644 memset(&addr, 0, addr_len);
645 memcpy(&addr.sin_addr, hostent->h_addr_list[i],
646 sizeof(struct in_addr));
647 addr.sin_family = static_cast<unsigned char>(hostent->h_addrtype);
648 addr.sin_port = htons(hostname_qa->port);
649 hostname_qa->result.emplace_back(
650 reinterpret_cast<const sockaddr*>(&addr), addr_len);
651 char output[INET_ADDRSTRLEN];
652 ares_inet_ntop(AF_INET, &addr.sin_addr, output, INET_ADDRSTRLEN);
653 GRPC_TRACE_LOG(cares_resolver, INFO)
654 << "(EventEngine c-ares resolver) resolver:" << ares_resolver
655 << " c-ares resolver gets a AF_INET result: \n addr: " << output
656 << "\n port: " << hostname_qa->port;
657 break;
658 }
659 default:
660 grpc_core::Crash(
661 absl::StrFormat("resolver:%p Received invalid type of address %d",
662 ares_resolver, hostent->h_addrtype));
663 }
664 }
665 }
666 if (hostname_qa->pending_requests == 0) {
667 auto nh =
668 ares_resolver->callback_map_.extract(hostname_qa->callback_map_id);
669 CHECK(!nh.empty());
670 CHECK(absl::holds_alternative<
671 EventEngine::DNSResolver::LookupHostnameCallback>(nh.mapped()));
672 auto callback = absl::get<EventEngine::DNSResolver::LookupHostnameCallback>(
673 std::move(nh.mapped()));
674 if (!hostname_qa->result.empty() || hostname_qa->error_status.ok()) {
675 ares_resolver->event_engine_->Run(
676 [callback = std::move(callback),
677 result = SortAddresses(hostname_qa->result)]() mutable {
678 callback(std::move(result));
679 });
680 } else {
681 ares_resolver->event_engine_->Run(
682 [callback = std::move(callback),
683 result = std::move(hostname_qa->error_status)]() mutable {
684 callback(std::move(result));
685 });
686 }
687 delete hostname_qa;
688 }
689 }
690
OnSRVQueryDoneLocked(void * arg,int status,int,unsigned char * abuf,int alen)691 void AresResolver::OnSRVQueryDoneLocked(void* arg, int status, int /*timeouts*/,
692 unsigned char* abuf, int alen) {
693 std::unique_ptr<QueryArg> qa(static_cast<QueryArg*>(arg));
694 auto* ares_resolver = qa->ares_resolver;
695 auto nh = ares_resolver->callback_map_.extract(qa->callback_map_id);
696 CHECK(!nh.empty());
697 CHECK(absl::holds_alternative<EventEngine::DNSResolver::LookupSRVCallback>(
698 nh.mapped()));
699 auto callback = absl::get<EventEngine::DNSResolver::LookupSRVCallback>(
700 std::move(nh.mapped()));
701 auto fail = [&](absl::string_view prefix) {
702 std::string error_message = absl::StrFormat(
703 "%s for %s: %s", prefix, qa->query_name, ares_strerror(status));
704 GRPC_TRACE_LOG(cares_resolver, INFO)
705 << "(EventEngine c-ares resolver) OnSRVQueryDoneLocked: "
706 << error_message;
707 ares_resolver->event_engine_->Run(
708 [callback = std::move(callback),
709 status = AresStatusToAbslStatus(status, error_message)]() mutable {
710 callback(status);
711 });
712 };
713 if (status != ARES_SUCCESS) {
714 fail("SRV lookup failed");
715 return;
716 }
717 GRPC_TRACE_LOG(cares_resolver, INFO)
718 << "(EventEngine c-ares resolver) resolver:" << ares_resolver
719 << " OnSRVQueryDoneLocked name=" << qa->query_name << " ARES_SUCCESS";
720 struct ares_srv_reply* reply = nullptr;
721 status = ares_parse_srv_reply(abuf, alen, &reply);
722 GRPC_TRACE_LOG(cares_resolver, INFO)
723 << "(EventEngine c-ares resolver) resolver:" << ares_resolver
724 << " ares_parse_srv_reply: " << status;
725 if (status != ARES_SUCCESS) {
726 fail("Failed to parse SRV reply");
727 return;
728 }
729 std::vector<EventEngine::DNSResolver::SRVRecord> result;
730 for (struct ares_srv_reply* srv_it = reply; srv_it != nullptr;
731 srv_it = srv_it->next) {
732 if (result.size() == kMaxRecordSize) {
733 LOG(ERROR) << "SRV response exceeds maximum record size of 65536";
734 break;
735 }
736 EventEngine::DNSResolver::SRVRecord record;
737 record.host = srv_it->host;
738 record.port = srv_it->port;
739 record.priority = srv_it->priority;
740 record.weight = srv_it->weight;
741 result.push_back(std::move(record));
742 }
743 if (reply != nullptr) {
744 ares_free_data(reply);
745 }
746 ares_resolver->event_engine_->Run(
747 [callback = std::move(callback), result = std::move(result)]() mutable {
748 callback(std::move(result));
749 });
750 }
751
OnTXTDoneLocked(void * arg,int status,int,unsigned char * buf,int len)752 void AresResolver::OnTXTDoneLocked(void* arg, int status, int /*timeouts*/,
753 unsigned char* buf, int len) {
754 std::unique_ptr<QueryArg> qa(static_cast<QueryArg*>(arg));
755 auto* ares_resolver = qa->ares_resolver;
756 auto nh = ares_resolver->callback_map_.extract(qa->callback_map_id);
757 CHECK(!nh.empty());
758 CHECK(absl::holds_alternative<EventEngine::DNSResolver::LookupTXTCallback>(
759 nh.mapped()));
760 auto callback = absl::get<EventEngine::DNSResolver::LookupTXTCallback>(
761 std::move(nh.mapped()));
762 auto fail = [&](absl::string_view prefix) {
763 std::string error_message = absl::StrFormat(
764 "%s for %s: %s", prefix, qa->query_name, ares_strerror(status));
765 GRPC_TRACE_LOG(cares_resolver, INFO)
766 << "(EventEngine c-ares resolver) resolver:" << ares_resolver
767 << " OnTXTDoneLocked: " << error_message;
768 ares_resolver->event_engine_->Run(
769 [callback = std::move(callback),
770 status = AresStatusToAbslStatus(status, error_message)]() mutable {
771 callback(status);
772 });
773 };
774 if (status != ARES_SUCCESS) {
775 fail("TXT lookup failed");
776 return;
777 }
778 GRPC_TRACE_LOG(cares_resolver, INFO)
779 << "(EventEngine c-ares resolver) resolver:" << ares_resolver
780 << " OnTXTDoneLocked name=" << qa->query_name << " ARES_SUCCESS";
781 struct ares_txt_ext* reply = nullptr;
782 status = ares_parse_txt_reply_ext(buf, len, &reply);
783 if (status != ARES_SUCCESS) {
784 fail("Failed to parse TXT result");
785 return;
786 }
787 std::vector<std::string> result;
788 for (struct ares_txt_ext* part = reply; part != nullptr; part = part->next) {
789 if (part->record_start) {
790 result.emplace_back(reinterpret_cast<char*>(part->txt), part->length);
791 } else {
792 absl::StrAppend(
793 &result.back(),
794 std::string(reinterpret_cast<char*>(part->txt), part->length));
795 }
796 }
797 GRPC_TRACE_LOG(cares_resolver, INFO)
798 << "(EventEngine c-ares resolver) resolver:" << ares_resolver << " Got "
799 << result.size() << " TXT records";
800 if (GRPC_TRACE_FLAG_ENABLED(cares_resolver)) {
801 for (const auto& record : result) {
802 LOG(INFO) << record;
803 }
804 }
805 // Clean up.
806 ares_free_data(reply);
807 ares_resolver->event_engine_->Run(
808 [callback = std::move(callback), result = std::move(result)]() mutable {
809 callback(std::move(result));
810 });
811 }
812
813 } // namespace experimental
814 } // namespace grpc_event_engine
815
noop_inject_channel_config(ares_channel *)816 void noop_inject_channel_config(ares_channel* /*channel*/) {}
817
818 void (*event_engine_grpc_ares_test_only_inject_config)(ares_channel* channel) =
819 noop_inject_channel_config;
820
821 bool g_event_engine_grpc_ares_test_only_force_tcp = false;
822
// Returns true when this build has a c-ares event driver (POSIX or Windows)
// and the DNS resolver config var is either unset or "ares" (compared
// case-insensitively).
bool ShouldUseAresDnsResolver() {
#if !defined(GRPC_POSIX_SOCKET_ARES_EV_DRIVER) && \
    !defined(GRPC_WINDOWS_SOCKET_ARES_EV_DRIVER)
  return false;
#else
  const auto resolver_env = grpc_core::ConfigVars::Get().DnsResolver();
  if (resolver_env.empty()) {
    return true;
  }
  return absl::EqualsIgnoreCase(resolver_env, "ares");
#endif
}
834
AresInit()835 absl::Status AresInit() {
836 if (ShouldUseAresDnsResolver()) {
837 // ares_library_init and ares_library_cleanup are currently no-op except
838 // under Windows. Calling them may cause race conditions when other parts of
839 // the binary calls these functions concurrently.
840 #ifdef GPR_WINDOWS
841 int status = ares_library_init(ARES_LIB_INIT_ALL);
842 if (status != ARES_SUCCESS) {
843 return GRPC_ERROR_CREATE(
844 absl::StrCat("ares_library_init failed: ", ares_strerror(status)));
845 }
846 #endif // GPR_WINDOWS
847 }
848 return absl::OkStatus();
849 }
AresShutdown()850 void AresShutdown() {
851 if (ShouldUseAresDnsResolver()) {
852 // ares_library_init and ares_library_cleanup are currently no-op except
853 // under Windows. Calling them may cause race conditions when other parts of
854 // the binary calls these functions concurrently.
855 #ifdef GPR_WINDOWS
856 ares_library_cleanup();
857 #endif // GPR_WINDOWS
858 }
859 }
860
861 #else // GRPC_ARES == 1
862
ShouldUseAresDnsResolver()863 bool ShouldUseAresDnsResolver() { return false; }
AresInit()864 absl::Status AresInit() { return absl::OkStatus(); }
AresShutdown()865 void AresShutdown() {}
866
867 #endif // GRPC_ARES == 1
868