1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include <grpc/event_engine/event_engine.h>
20 #include <grpc/grpc.h>
21 #include <grpc/impl/channel_arg_names.h>
22 #include <grpc/support/alloc.h>
23 #include <grpc/support/atm.h>
24 #include <grpc/support/sync.h>
25 #include <grpc/support/time.h>
26 #include <inttypes.h>
27
28 #include <functional>
29 #include <memory>
30 #include <string>
31 #include <utility>
32 #include <vector>
33
34 #include "absl/log/log.h"
35 #include "absl/status/status.h"
36 #include "absl/status/statusor.h"
37 #include "absl/strings/string_view.h"
38 #include "gtest/gtest.h"
39 #include "src/core/config/core_configuration.h"
40 #include "src/core/lib/channel/channel_args.h"
41 #include "src/core/lib/event_engine/default_event_engine.h"
42 #include "src/core/lib/experiments/experiments.h"
43 #include "src/core/lib/iomgr/closure.h"
44 #include "src/core/lib/iomgr/error.h"
45 #include "src/core/lib/iomgr/exec_ctx.h"
46 #include "src/core/lib/iomgr/iomgr_fwd.h"
47 #include "src/core/lib/iomgr/pollset.h"
48 #include "src/core/lib/iomgr/pollset_set.h"
49 #include "src/core/lib/iomgr/resolve_address.h"
50 #include "src/core/lib/iomgr/resolved_address.h"
51 #include "src/core/resolver/dns/c_ares/grpc_ares_wrapper.h"
52 #include "src/core/resolver/endpoint_addresses.h"
53 #include "src/core/resolver/resolver.h"
54 #include "src/core/resolver/resolver_factory.h"
55 #include "src/core/resolver/resolver_registry.h"
56 #include "src/core/util/debug_location.h"
57 #include "src/core/util/no_destruct.h"
58 #include "src/core/util/notification.h"
59 #include "src/core/util/orphanable.h"
60 #include "src/core/util/time.h"
61 #include "src/core/util/uri.h"
62 #include "src/core/util/work_serializer.h"
63 #include "test/core/test_util/test_config.h"
64
65 using ::grpc_event_engine::experimental::GetDefaultEventEngine;
66
67 constexpr int kMinResolutionPeriodMs = 1000;
68
69 static std::shared_ptr<grpc_core::WorkSerializer>* g_work_serializer;
70
71 static grpc_ares_request* (*g_default_dns_lookup_ares)(
72 const char* dns_server, const char* name, const char* default_port,
73 grpc_pollset_set* interested_parties, grpc_closure* on_done,
74 std::unique_ptr<grpc_core::EndpointAddressesList>* addresses,
75 int query_timeout_ms);
76
77 // Counter incremented by TestDNSResolver::LookupHostname indicating the
78 // number of times a system-level resolution has happened.
79 static int g_resolution_count;
80
81 static struct iomgr_args {
82 gpr_event ev;
83 gpr_atm done_atm;
84 gpr_mu* mu;
85 grpc_pollset* pollset;
86 grpc_pollset_set* pollset_set;
87 } g_iomgr_args;
88
89 namespace {
90
91 class TestDNSResolver : public grpc_core::DNSResolver {
92 public:
TestDNSResolver(std::shared_ptr<grpc_core::DNSResolver> default_resolver)93 explicit TestDNSResolver(
94 std::shared_ptr<grpc_core::DNSResolver> default_resolver)
95 : default_resolver_(std::move(default_resolver)),
96 engine_(GetDefaultEventEngine()) {}
97 // Wrapper around default resolve_address in order to count the number of
98 // times we incur in a system-level name resolution.
LookupHostname(std::function<void (absl::StatusOr<std::vector<grpc_resolved_address>>)> on_resolved,absl::string_view name,absl::string_view default_port,grpc_core::Duration timeout,grpc_pollset_set * interested_parties,absl::string_view name_server)99 TaskHandle LookupHostname(
100 std::function<void(absl::StatusOr<std::vector<grpc_resolved_address>>)>
101 on_resolved,
102 absl::string_view name, absl::string_view default_port,
103 grpc_core::Duration timeout, grpc_pollset_set* interested_parties,
104 absl::string_view name_server) override {
105 auto result = default_resolver_->LookupHostname(
106 std::move(on_resolved), name, default_port, timeout, interested_parties,
107 name_server);
108 ++g_resolution_count;
109 static grpc_core::Timestamp last_resolution_time =
110 grpc_core::Timestamp::ProcessEpoch();
111 if (last_resolution_time == grpc_core::Timestamp::ProcessEpoch()) {
112 last_resolution_time = grpc_core::Timestamp::FromTimespecRoundUp(
113 gpr_now(GPR_CLOCK_MONOTONIC));
114 } else {
115 auto now = grpc_core::Timestamp::FromTimespecRoundUp(
116 gpr_now(GPR_CLOCK_MONOTONIC));
117 EXPECT_GE(now - last_resolution_time,
118 grpc_core::Duration::Milliseconds(kMinResolutionPeriodMs));
119 last_resolution_time = now;
120 }
121 // For correct time diff comparisons, make sure that any subsequent calls
122 // to grpc_core::Timestamp::Now() on this thread don't return a time
123 // which is earlier than that returned by the call(s) to
124 // gpr_now(GPR_CLOCK_MONOTONIC) within this function. This is important
125 // because the resolver's last_resolution_timestamp_ will be taken from
126 // grpc_core::Timestamp::Now() right after this returns.
127 grpc_core::ExecCtx::Get()->InvalidateNow();
128 return result;
129 }
130
LookupHostnameBlocking(absl::string_view name,absl::string_view default_port)131 absl::StatusOr<std::vector<grpc_resolved_address>> LookupHostnameBlocking(
132 absl::string_view name, absl::string_view default_port) override {
133 return default_resolver_->LookupHostnameBlocking(name, default_port);
134 }
135
LookupSRV(std::function<void (absl::StatusOr<std::vector<grpc_resolved_address>>)> on_resolved,absl::string_view,grpc_core::Duration,grpc_pollset_set *,absl::string_view)136 TaskHandle LookupSRV(
137 std::function<void(absl::StatusOr<std::vector<grpc_resolved_address>>)>
138 on_resolved,
139 absl::string_view /* name */, grpc_core::Duration /* timeout */,
140 grpc_pollset_set* /* interested_parties */,
141 absl::string_view /* name_server */) override {
142 engine_->Run([on_resolved] {
143 grpc_core::ApplicationCallbackExecCtx app_exec_ctx;
144 grpc_core::ExecCtx exec_ctx;
145 on_resolved(absl::UnimplementedError(
146 "The Testing DNS resolver does not support looking up SRV records"));
147 });
148 return {-1, -1};
149 };
150
LookupTXT(std::function<void (absl::StatusOr<std::string>)> on_resolved,absl::string_view,grpc_core::Duration,grpc_pollset_set *,absl::string_view)151 TaskHandle LookupTXT(
152 std::function<void(absl::StatusOr<std::string>)> on_resolved,
153 absl::string_view /* name */, grpc_core::Duration /* timeout */,
154 grpc_pollset_set* /* interested_parties */,
155 absl::string_view /* name_server */) override {
156 // Not supported
157 engine_->Run([on_resolved] {
158 grpc_core::ApplicationCallbackExecCtx app_exec_ctx;
159 grpc_core::ExecCtx exec_ctx;
160 on_resolved(absl::UnimplementedError(
161 "The Testing DNS resolver does not support looking up TXT records"));
162 });
163 return {-1, -1};
164 };
165
166 // Not cancellable
Cancel(TaskHandle)167 bool Cancel(TaskHandle /*handle*/) override { return false; }
168
169 private:
170 std::shared_ptr<grpc_core::DNSResolver> default_resolver_;
171 std::shared_ptr<grpc_event_engine::experimental::EventEngine> engine_;
172 };
173
174 } // namespace
175
test_dns_lookup_ares(const char * dns_server,const char * name,const char * default_port,grpc_pollset_set *,grpc_closure * on_done,std::unique_ptr<grpc_core::EndpointAddressesList> * addresses,int query_timeout_ms)176 static grpc_ares_request* test_dns_lookup_ares(
177 const char* dns_server, const char* name, const char* default_port,
178 grpc_pollset_set* /*interested_parties*/, grpc_closure* on_done,
179 std::unique_ptr<grpc_core::EndpointAddressesList>* addresses,
180 int query_timeout_ms) {
181 // A records should suffice
182 grpc_ares_request* result = g_default_dns_lookup_ares(
183 dns_server, name, default_port, g_iomgr_args.pollset_set, on_done,
184 addresses, query_timeout_ms);
185 ++g_resolution_count;
186 static auto last_resolution_time = grpc_core::Timestamp::ProcessEpoch();
187 auto now =
188 grpc_core::Timestamp::FromTimespecRoundUp(gpr_now(GPR_CLOCK_MONOTONIC));
189 VLOG(2) << "last_resolution_time:"
190 << last_resolution_time.milliseconds_after_process_epoch()
191 << " now:" << now.milliseconds_after_process_epoch()
192 << " min_time_between:" << kMinResolutionPeriodMs;
193 if (last_resolution_time != grpc_core::Timestamp::ProcessEpoch()) {
194 EXPECT_GE(now - last_resolution_time,
195 grpc_core::Duration::Milliseconds(kMinResolutionPeriodMs));
196 }
197 last_resolution_time = now;
198 // For correct time diff comparisons, make sure that any subsequent calls
199 // to grpc_core::Timestamp::Now() on this thread don't return a time
200 // which is earlier than that returned by the call(s) to
201 // gpr_now(GPR_CLOCK_MONOTONIC) within this function. This is important
202 // because the resolver's last_resolution_timestamp_ will be taken from
203 // grpc_core::Timestamp::Now() right after this returns.
204 grpc_core::ExecCtx::Get()->InvalidateNow();
205 return result;
206 }
207
test_deadline(void)208 static gpr_timespec test_deadline(void) {
209 return grpc_timeout_seconds_to_deadline(100);
210 }
211
do_nothing(void *,grpc_error_handle)212 static void do_nothing(void* /*arg*/, grpc_error_handle /*error*/) {}
213
iomgr_args_init(iomgr_args * args)214 static void iomgr_args_init(iomgr_args* args) {
215 gpr_event_init(&args->ev);
216 args->pollset = static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size()));
217 grpc_pollset_init(args->pollset, &args->mu);
218 args->pollset_set = grpc_pollset_set_create();
219 grpc_pollset_set_add_pollset(args->pollset_set, args->pollset);
220 gpr_atm_rel_store(&args->done_atm, 0);
221 }
222
iomgr_args_finish(iomgr_args * args)223 static void iomgr_args_finish(iomgr_args* args) {
224 ASSERT_TRUE(gpr_event_wait(&args->ev, test_deadline()));
225 grpc_pollset_set_del_pollset(args->pollset_set, args->pollset);
226 grpc_pollset_set_destroy(args->pollset_set);
227 grpc_closure do_nothing_cb;
228 GRPC_CLOSURE_INIT(&do_nothing_cb, do_nothing, nullptr,
229 grpc_schedule_on_exec_ctx);
230 gpr_mu_lock(args->mu);
231 grpc_pollset_shutdown(args->pollset, &do_nothing_cb);
232 gpr_mu_unlock(args->mu);
233 // exec_ctx needs to be flushed before calling grpc_pollset_destroy()
234 grpc_core::ExecCtx::Get()->Flush();
235 grpc_pollset_destroy(args->pollset);
236 gpr_free(args->pollset);
237 }
238
n_sec_deadline(int seconds)239 static grpc_core::Timestamp n_sec_deadline(int seconds) {
240 return grpc_core::Timestamp::FromTimespecRoundUp(
241 grpc_timeout_seconds_to_deadline(seconds));
242 }
243
poll_pollset_until_request_done(iomgr_args * args)244 static void poll_pollset_until_request_done(iomgr_args* args) {
245 grpc_core::ExecCtx exec_ctx;
246 grpc_core::Timestamp deadline = n_sec_deadline(10);
247 while (true) {
248 bool done = gpr_atm_acq_load(&args->done_atm) != 0;
249 if (done) {
250 break;
251 }
252 grpc_core::Duration time_left = deadline - grpc_core::Timestamp::Now();
253 VLOG(2) << "done=" << done << ", time_left=" << time_left.millis();
254 ASSERT_GE(time_left, grpc_core::Duration::Zero());
255 grpc_pollset_worker* worker = nullptr;
256 gpr_mu_lock(args->mu);
257 GRPC_LOG_IF_ERROR("pollset_work", grpc_pollset_work(args->pollset, &worker,
258 n_sec_deadline(1)));
259 gpr_mu_unlock(args->mu);
260 grpc_core::ExecCtx::Get()->Flush();
261 }
262 gpr_event_set(&args->ev, reinterpret_cast<void*>(1));
263 }
264
265 struct OnResolutionCallbackArg;
266
267 class ResultHandler : public grpc_core::Resolver::ResultHandler {
268 public:
269 using ResultCallback = void (*)(OnResolutionCallbackArg* state);
270
SetCallback(ResultCallback result_cb,OnResolutionCallbackArg * state)271 void SetCallback(ResultCallback result_cb, OnResolutionCallbackArg* state) {
272 ASSERT_EQ(result_cb_, nullptr);
273 result_cb_ = result_cb;
274 ASSERT_EQ(state_, nullptr);
275 state_ = state;
276 }
277
ReportResult(grpc_core::Resolver::Result result)278 void ReportResult(grpc_core::Resolver::Result result) override {
279 if (result.result_health_callback != nullptr) {
280 result.result_health_callback(absl::OkStatus());
281 }
282 ASSERT_NE(result_cb_, nullptr);
283 ASSERT_NE(state_, nullptr);
284 ResultCallback cb = result_cb_;
285 OnResolutionCallbackArg* state = state_;
286 result_cb_ = nullptr;
287 state_ = nullptr;
288 cb(state);
289 }
290
291 private:
292 ResultCallback result_cb_ = nullptr;
293 OnResolutionCallbackArg* state_ = nullptr;
294 };
295
296 struct OnResolutionCallbackArg {
297 const char* uri_str = nullptr;
298 grpc_core::OrphanablePtr<grpc_core::Resolver> resolver;
299 ResultHandler* result_handler;
300 };
301
302 // Set to true by the last callback in the resolution chain.
303 static grpc_core::NoDestruct<grpc_core::Notification> g_all_callbacks_invoked;
304
305 // It's interesting to run a few rounds of this test because as
306 // we run more rounds, the base starting time
307 // (i.e. ExecCtx g_start_time) gets further and further away
308 // from "Now()". Thus the more rounds ran, the more highlighted the
309 // difference is between absolute and relative times values.
on_fourth_resolution(OnResolutionCallbackArg * cb_arg)310 static void on_fourth_resolution(OnResolutionCallbackArg* cb_arg) {
311 LOG(INFO) << "4th: g_resolution_count: " << g_resolution_count;
312 ASSERT_EQ(g_resolution_count, 4);
313 cb_arg->resolver.reset();
314 gpr_atm_rel_store(&g_iomgr_args.done_atm, 1);
315 gpr_mu_lock(g_iomgr_args.mu);
316 GRPC_LOG_IF_ERROR("pollset_kick",
317 grpc_pollset_kick(g_iomgr_args.pollset, nullptr));
318 gpr_mu_unlock(g_iomgr_args.mu);
319 delete cb_arg;
320 g_all_callbacks_invoked->Notify();
321 }
322
on_third_resolution(OnResolutionCallbackArg * cb_arg)323 static void on_third_resolution(OnResolutionCallbackArg* cb_arg) {
324 LOG(INFO) << "3rd: g_resolution_count: " << g_resolution_count;
325 ASSERT_EQ(g_resolution_count, 3);
326 cb_arg->result_handler->SetCallback(on_fourth_resolution, cb_arg);
327 cb_arg->resolver->RequestReresolutionLocked();
328 gpr_mu_lock(g_iomgr_args.mu);
329 GRPC_LOG_IF_ERROR("pollset_kick",
330 grpc_pollset_kick(g_iomgr_args.pollset, nullptr));
331 gpr_mu_unlock(g_iomgr_args.mu);
332 }
333
on_second_resolution(OnResolutionCallbackArg * cb_arg)334 static void on_second_resolution(OnResolutionCallbackArg* cb_arg) {
335 LOG(INFO) << "2nd: g_resolution_count: " << g_resolution_count;
336 // The resolution callback was not invoked until new data was
337 // available, which was delayed until after the cooldown period.
338 ASSERT_EQ(g_resolution_count, 2);
339 cb_arg->result_handler->SetCallback(on_third_resolution, cb_arg);
340 cb_arg->resolver->RequestReresolutionLocked();
341 gpr_mu_lock(g_iomgr_args.mu);
342 GRPC_LOG_IF_ERROR("pollset_kick",
343 grpc_pollset_kick(g_iomgr_args.pollset, nullptr));
344 gpr_mu_unlock(g_iomgr_args.mu);
345 }
346
on_first_resolution(OnResolutionCallbackArg * cb_arg)347 static void on_first_resolution(OnResolutionCallbackArg* cb_arg) {
348 LOG(INFO) << "1st: g_resolution_count: " << g_resolution_count;
349 // There's one initial system-level resolution and one invocation of a
350 // notification callback (the current function).
351 ASSERT_EQ(g_resolution_count, 1);
352 cb_arg->result_handler->SetCallback(on_second_resolution, cb_arg);
353 cb_arg->resolver->RequestReresolutionLocked();
354 gpr_mu_lock(g_iomgr_args.mu);
355 GRPC_LOG_IF_ERROR("pollset_kick",
356 grpc_pollset_kick(g_iomgr_args.pollset, nullptr));
357 gpr_mu_unlock(g_iomgr_args.mu);
358 }
359
start_test_under_work_serializer(void * arg)360 static void start_test_under_work_serializer(void* arg) {
361 OnResolutionCallbackArg* res_cb_arg =
362 static_cast<OnResolutionCallbackArg*>(arg);
363 res_cb_arg->result_handler = new ResultHandler();
364 grpc_core::ResolverFactory* factory = grpc_core::CoreConfiguration::Get()
365 .resolver_registry()
366 .LookupResolverFactory("dns");
367 absl::StatusOr<grpc_core::URI> uri =
368 grpc_core::URI::Parse(res_cb_arg->uri_str);
369 VLOG(2) << "test: '" << res_cb_arg->uri_str << "' should be valid for '"
370 << factory->scheme() << "'";
371 if (!uri.ok()) {
372 LOG(ERROR) << uri.status();
373 ASSERT_TRUE(uri.ok());
374 }
375 grpc_core::ResolverArgs args;
376 args.uri = std::move(*uri);
377 args.work_serializer = *g_work_serializer;
378 args.result_handler = std::unique_ptr<grpc_core::Resolver::ResultHandler>(
379 res_cb_arg->result_handler);
380 g_resolution_count = 0;
381
382 grpc_arg cooldown_arg = grpc_channel_arg_integer_create(
383 const_cast<char*>(GRPC_ARG_DNS_MIN_TIME_BETWEEN_RESOLUTIONS_MS),
384 kMinResolutionPeriodMs);
385 grpc_channel_args cooldown_args = {1, &cooldown_arg};
386 args.args = grpc_core::ChannelArgs::FromC(&cooldown_args);
387 args.args = args.args.SetObject(GetDefaultEventEngine());
388 res_cb_arg->resolver = factory->CreateResolver(std::move(args));
389 ASSERT_NE(res_cb_arg->resolver, nullptr);
390 // First resolution, would incur in system-level resolution.
391 res_cb_arg->result_handler->SetCallback(on_first_resolution, res_cb_arg);
392 res_cb_arg->resolver->StartLocked();
393 }
394
test_cooldown()395 static void test_cooldown() {
396 grpc_core::ExecCtx exec_ctx;
397 iomgr_args_init(&g_iomgr_args);
398 OnResolutionCallbackArg* res_cb_arg = new OnResolutionCallbackArg();
399 res_cb_arg->uri_str = "dns:127.0.0.1";
400
401 (*g_work_serializer)
402 ->Run([res_cb_arg]() { start_test_under_work_serializer(res_cb_arg); },
403 DEBUG_LOCATION);
404 grpc_core::ExecCtx::Get()->Flush();
405 poll_pollset_until_request_done(&g_iomgr_args);
406 iomgr_args_finish(&g_iomgr_args);
407 }
408
TEST(DnsResolverCooldownTest,MainTest)409 TEST(DnsResolverCooldownTest, MainTest) {
410 // TODO(yijiem): This test tests the cooldown behavior of the PollingResolver
411 // interface. To do that, it overrides the grpc_dns_lookup_hostname_ares
412 // function and overrides the iomgr's g_dns_resolver system. We would need to
413 // rewrite this test for EventEngine using a custom EE DNSResolver or adding
414 // to the resolver_fuzzer.
415 if (grpc_core::IsEventEngineDnsEnabled()) {
416 GTEST_SKIP() << "Not with event engine dns";
417 }
418 grpc_init();
419
420 auto work_serializer = std::make_shared<grpc_core::WorkSerializer>(
421 grpc_event_engine::experimental::GetDefaultEventEngine());
422 g_work_serializer = &work_serializer;
423
424 g_default_dns_lookup_ares = grpc_dns_lookup_hostname_ares;
425 grpc_dns_lookup_hostname_ares = test_dns_lookup_ares;
426 grpc_core::ResetDNSResolver(
427 std::make_unique<TestDNSResolver>(grpc_core::GetDNSResolver()));
428
429 test_cooldown();
430
431 grpc_shutdown();
432 g_all_callbacks_invoked->WaitForNotification();
433 }
434
main(int argc,char ** argv)435 int main(int argc, char** argv) {
436 grpc::testing::TestEnvironment env(&argc, argv);
437 ::testing::InitGoogleTest(&argc, argv);
438 return RUN_ALL_TESTS();
439 }
440