• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include <grpc/event_engine/event_engine.h>
20 #include <grpc/grpc.h>
21 #include <grpc/impl/channel_arg_names.h>
22 #include <grpc/support/alloc.h>
23 #include <grpc/support/atm.h>
24 #include <grpc/support/sync.h>
25 #include <grpc/support/time.h>
26 #include <inttypes.h>
27 
28 #include <functional>
29 #include <memory>
30 #include <string>
31 #include <utility>
32 #include <vector>
33 
34 #include "absl/log/log.h"
35 #include "absl/status/status.h"
36 #include "absl/status/statusor.h"
37 #include "absl/strings/string_view.h"
38 #include "gtest/gtest.h"
39 #include "src/core/config/core_configuration.h"
40 #include "src/core/lib/channel/channel_args.h"
41 #include "src/core/lib/event_engine/default_event_engine.h"
42 #include "src/core/lib/experiments/experiments.h"
43 #include "src/core/lib/iomgr/closure.h"
44 #include "src/core/lib/iomgr/error.h"
45 #include "src/core/lib/iomgr/exec_ctx.h"
46 #include "src/core/lib/iomgr/iomgr_fwd.h"
47 #include "src/core/lib/iomgr/pollset.h"
48 #include "src/core/lib/iomgr/pollset_set.h"
49 #include "src/core/lib/iomgr/resolve_address.h"
50 #include "src/core/lib/iomgr/resolved_address.h"
51 #include "src/core/resolver/dns/c_ares/grpc_ares_wrapper.h"
52 #include "src/core/resolver/endpoint_addresses.h"
53 #include "src/core/resolver/resolver.h"
54 #include "src/core/resolver/resolver_factory.h"
55 #include "src/core/resolver/resolver_registry.h"
56 #include "src/core/util/debug_location.h"
57 #include "src/core/util/no_destruct.h"
58 #include "src/core/util/notification.h"
59 #include "src/core/util/orphanable.h"
60 #include "src/core/util/time.h"
61 #include "src/core/util/uri.h"
62 #include "src/core/util/work_serializer.h"
63 #include "test/core/test_util/test_config.h"
64 
65 using ::grpc_event_engine::experimental::GetDefaultEventEngine;
66 
67 constexpr int kMinResolutionPeriodMs = 1000;
68 
69 static std::shared_ptr<grpc_core::WorkSerializer>* g_work_serializer;
70 
71 static grpc_ares_request* (*g_default_dns_lookup_ares)(
72     const char* dns_server, const char* name, const char* default_port,
73     grpc_pollset_set* interested_parties, grpc_closure* on_done,
74     std::unique_ptr<grpc_core::EndpointAddressesList>* addresses,
75     int query_timeout_ms);
76 
77 // Counter incremented by TestDNSResolver::LookupHostname indicating the
78 // number of times a system-level resolution has happened.
79 static int g_resolution_count;
80 
81 static struct iomgr_args {
82   gpr_event ev;
83   gpr_atm done_atm;
84   gpr_mu* mu;
85   grpc_pollset* pollset;
86   grpc_pollset_set* pollset_set;
87 } g_iomgr_args;
88 
89 namespace {
90 
91 class TestDNSResolver : public grpc_core::DNSResolver {
92  public:
TestDNSResolver(std::shared_ptr<grpc_core::DNSResolver> default_resolver)93   explicit TestDNSResolver(
94       std::shared_ptr<grpc_core::DNSResolver> default_resolver)
95       : default_resolver_(std::move(default_resolver)),
96         engine_(GetDefaultEventEngine()) {}
97   // Wrapper around default resolve_address in order to count the number of
98   // times we incur in a system-level name resolution.
LookupHostname(std::function<void (absl::StatusOr<std::vector<grpc_resolved_address>>)> on_resolved,absl::string_view name,absl::string_view default_port,grpc_core::Duration timeout,grpc_pollset_set * interested_parties,absl::string_view name_server)99   TaskHandle LookupHostname(
100       std::function<void(absl::StatusOr<std::vector<grpc_resolved_address>>)>
101           on_resolved,
102       absl::string_view name, absl::string_view default_port,
103       grpc_core::Duration timeout, grpc_pollset_set* interested_parties,
104       absl::string_view name_server) override {
105     auto result = default_resolver_->LookupHostname(
106         std::move(on_resolved), name, default_port, timeout, interested_parties,
107         name_server);
108     ++g_resolution_count;
109     static grpc_core::Timestamp last_resolution_time =
110         grpc_core::Timestamp::ProcessEpoch();
111     if (last_resolution_time == grpc_core::Timestamp::ProcessEpoch()) {
112       last_resolution_time = grpc_core::Timestamp::FromTimespecRoundUp(
113           gpr_now(GPR_CLOCK_MONOTONIC));
114     } else {
115       auto now = grpc_core::Timestamp::FromTimespecRoundUp(
116           gpr_now(GPR_CLOCK_MONOTONIC));
117       EXPECT_GE(now - last_resolution_time,
118                 grpc_core::Duration::Milliseconds(kMinResolutionPeriodMs));
119       last_resolution_time = now;
120     }
121     // For correct time diff comparisons, make sure that any subsequent calls
122     // to grpc_core::Timestamp::Now() on this thread don't return a time
123     // which is earlier than that returned by the call(s) to
124     // gpr_now(GPR_CLOCK_MONOTONIC) within this function. This is important
125     // because the resolver's last_resolution_timestamp_ will be taken from
126     // grpc_core::Timestamp::Now() right after this returns.
127     grpc_core::ExecCtx::Get()->InvalidateNow();
128     return result;
129   }
130 
LookupHostnameBlocking(absl::string_view name,absl::string_view default_port)131   absl::StatusOr<std::vector<grpc_resolved_address>> LookupHostnameBlocking(
132       absl::string_view name, absl::string_view default_port) override {
133     return default_resolver_->LookupHostnameBlocking(name, default_port);
134   }
135 
LookupSRV(std::function<void (absl::StatusOr<std::vector<grpc_resolved_address>>)> on_resolved,absl::string_view,grpc_core::Duration,grpc_pollset_set *,absl::string_view)136   TaskHandle LookupSRV(
137       std::function<void(absl::StatusOr<std::vector<grpc_resolved_address>>)>
138           on_resolved,
139       absl::string_view /* name */, grpc_core::Duration /* timeout */,
140       grpc_pollset_set* /* interested_parties */,
141       absl::string_view /* name_server */) override {
142     engine_->Run([on_resolved] {
143       grpc_core::ApplicationCallbackExecCtx app_exec_ctx;
144       grpc_core::ExecCtx exec_ctx;
145       on_resolved(absl::UnimplementedError(
146           "The Testing DNS resolver does not support looking up SRV records"));
147     });
148     return {-1, -1};
149   };
150 
LookupTXT(std::function<void (absl::StatusOr<std::string>)> on_resolved,absl::string_view,grpc_core::Duration,grpc_pollset_set *,absl::string_view)151   TaskHandle LookupTXT(
152       std::function<void(absl::StatusOr<std::string>)> on_resolved,
153       absl::string_view /* name */, grpc_core::Duration /* timeout */,
154       grpc_pollset_set* /* interested_parties */,
155       absl::string_view /* name_server */) override {
156     // Not supported
157     engine_->Run([on_resolved] {
158       grpc_core::ApplicationCallbackExecCtx app_exec_ctx;
159       grpc_core::ExecCtx exec_ctx;
160       on_resolved(absl::UnimplementedError(
161           "The Testing DNS resolver does not support looking up TXT records"));
162     });
163     return {-1, -1};
164   };
165 
166   // Not cancellable
Cancel(TaskHandle)167   bool Cancel(TaskHandle /*handle*/) override { return false; }
168 
169  private:
170   std::shared_ptr<grpc_core::DNSResolver> default_resolver_;
171   std::shared_ptr<grpc_event_engine::experimental::EventEngine> engine_;
172 };
173 
174 }  // namespace
175 
test_dns_lookup_ares(const char * dns_server,const char * name,const char * default_port,grpc_pollset_set *,grpc_closure * on_done,std::unique_ptr<grpc_core::EndpointAddressesList> * addresses,int query_timeout_ms)176 static grpc_ares_request* test_dns_lookup_ares(
177     const char* dns_server, const char* name, const char* default_port,
178     grpc_pollset_set* /*interested_parties*/, grpc_closure* on_done,
179     std::unique_ptr<grpc_core::EndpointAddressesList>* addresses,
180     int query_timeout_ms) {
181   // A records should suffice
182   grpc_ares_request* result = g_default_dns_lookup_ares(
183       dns_server, name, default_port, g_iomgr_args.pollset_set, on_done,
184       addresses, query_timeout_ms);
185   ++g_resolution_count;
186   static auto last_resolution_time = grpc_core::Timestamp::ProcessEpoch();
187   auto now =
188       grpc_core::Timestamp::FromTimespecRoundUp(gpr_now(GPR_CLOCK_MONOTONIC));
189   VLOG(2) << "last_resolution_time:"
190           << last_resolution_time.milliseconds_after_process_epoch()
191           << " now:" << now.milliseconds_after_process_epoch()
192           << " min_time_between:" << kMinResolutionPeriodMs;
193   if (last_resolution_time != grpc_core::Timestamp::ProcessEpoch()) {
194     EXPECT_GE(now - last_resolution_time,
195               grpc_core::Duration::Milliseconds(kMinResolutionPeriodMs));
196   }
197   last_resolution_time = now;
198   // For correct time diff comparisons, make sure that any subsequent calls
199   // to grpc_core::Timestamp::Now() on this thread don't return a time
200   // which is earlier than that returned by the call(s) to
201   // gpr_now(GPR_CLOCK_MONOTONIC) within this function. This is important
202   // because the resolver's last_resolution_timestamp_ will be taken from
203   // grpc_core::Timestamp::Now() right after this returns.
204   grpc_core::ExecCtx::Get()->InvalidateNow();
205   return result;
206 }
207 
test_deadline(void)208 static gpr_timespec test_deadline(void) {
209   return grpc_timeout_seconds_to_deadline(100);
210 }
211 
do_nothing(void *,grpc_error_handle)212 static void do_nothing(void* /*arg*/, grpc_error_handle /*error*/) {}
213 
iomgr_args_init(iomgr_args * args)214 static void iomgr_args_init(iomgr_args* args) {
215   gpr_event_init(&args->ev);
216   args->pollset = static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size()));
217   grpc_pollset_init(args->pollset, &args->mu);
218   args->pollset_set = grpc_pollset_set_create();
219   grpc_pollset_set_add_pollset(args->pollset_set, args->pollset);
220   gpr_atm_rel_store(&args->done_atm, 0);
221 }
222 
iomgr_args_finish(iomgr_args * args)223 static void iomgr_args_finish(iomgr_args* args) {
224   ASSERT_TRUE(gpr_event_wait(&args->ev, test_deadline()));
225   grpc_pollset_set_del_pollset(args->pollset_set, args->pollset);
226   grpc_pollset_set_destroy(args->pollset_set);
227   grpc_closure do_nothing_cb;
228   GRPC_CLOSURE_INIT(&do_nothing_cb, do_nothing, nullptr,
229                     grpc_schedule_on_exec_ctx);
230   gpr_mu_lock(args->mu);
231   grpc_pollset_shutdown(args->pollset, &do_nothing_cb);
232   gpr_mu_unlock(args->mu);
233   // exec_ctx needs to be flushed before calling grpc_pollset_destroy()
234   grpc_core::ExecCtx::Get()->Flush();
235   grpc_pollset_destroy(args->pollset);
236   gpr_free(args->pollset);
237 }
238 
n_sec_deadline(int seconds)239 static grpc_core::Timestamp n_sec_deadline(int seconds) {
240   return grpc_core::Timestamp::FromTimespecRoundUp(
241       grpc_timeout_seconds_to_deadline(seconds));
242 }
243 
poll_pollset_until_request_done(iomgr_args * args)244 static void poll_pollset_until_request_done(iomgr_args* args) {
245   grpc_core::ExecCtx exec_ctx;
246   grpc_core::Timestamp deadline = n_sec_deadline(10);
247   while (true) {
248     bool done = gpr_atm_acq_load(&args->done_atm) != 0;
249     if (done) {
250       break;
251     }
252     grpc_core::Duration time_left = deadline - grpc_core::Timestamp::Now();
253     VLOG(2) << "done=" << done << ", time_left=" << time_left.millis();
254     ASSERT_GE(time_left, grpc_core::Duration::Zero());
255     grpc_pollset_worker* worker = nullptr;
256     gpr_mu_lock(args->mu);
257     GRPC_LOG_IF_ERROR("pollset_work", grpc_pollset_work(args->pollset, &worker,
258                                                         n_sec_deadline(1)));
259     gpr_mu_unlock(args->mu);
260     grpc_core::ExecCtx::Get()->Flush();
261   }
262   gpr_event_set(&args->ev, reinterpret_cast<void*>(1));
263 }
264 
265 struct OnResolutionCallbackArg;
266 
267 class ResultHandler : public grpc_core::Resolver::ResultHandler {
268  public:
269   using ResultCallback = void (*)(OnResolutionCallbackArg* state);
270 
SetCallback(ResultCallback result_cb,OnResolutionCallbackArg * state)271   void SetCallback(ResultCallback result_cb, OnResolutionCallbackArg* state) {
272     ASSERT_EQ(result_cb_, nullptr);
273     result_cb_ = result_cb;
274     ASSERT_EQ(state_, nullptr);
275     state_ = state;
276   }
277 
ReportResult(grpc_core::Resolver::Result result)278   void ReportResult(grpc_core::Resolver::Result result) override {
279     if (result.result_health_callback != nullptr) {
280       result.result_health_callback(absl::OkStatus());
281     }
282     ASSERT_NE(result_cb_, nullptr);
283     ASSERT_NE(state_, nullptr);
284     ResultCallback cb = result_cb_;
285     OnResolutionCallbackArg* state = state_;
286     result_cb_ = nullptr;
287     state_ = nullptr;
288     cb(state);
289   }
290 
291  private:
292   ResultCallback result_cb_ = nullptr;
293   OnResolutionCallbackArg* state_ = nullptr;
294 };
295 
296 struct OnResolutionCallbackArg {
297   const char* uri_str = nullptr;
298   grpc_core::OrphanablePtr<grpc_core::Resolver> resolver;
299   ResultHandler* result_handler;
300 };
301 
// Notified by the last callback in the resolution chain.
303 static grpc_core::NoDestruct<grpc_core::Notification> g_all_callbacks_invoked;
304 
// It's interesting to run a few rounds of this test because, as
// more rounds are run, the base starting time
// (i.e. ExecCtx g_start_time) gets further and further away
// from "Now()". Thus, the more rounds are run, the more pronounced the
// difference between absolute and relative time values becomes.
on_fourth_resolution(OnResolutionCallbackArg * cb_arg)310 static void on_fourth_resolution(OnResolutionCallbackArg* cb_arg) {
311   LOG(INFO) << "4th: g_resolution_count: " << g_resolution_count;
312   ASSERT_EQ(g_resolution_count, 4);
313   cb_arg->resolver.reset();
314   gpr_atm_rel_store(&g_iomgr_args.done_atm, 1);
315   gpr_mu_lock(g_iomgr_args.mu);
316   GRPC_LOG_IF_ERROR("pollset_kick",
317                     grpc_pollset_kick(g_iomgr_args.pollset, nullptr));
318   gpr_mu_unlock(g_iomgr_args.mu);
319   delete cb_arg;
320   g_all_callbacks_invoked->Notify();
321 }
322 
on_third_resolution(OnResolutionCallbackArg * cb_arg)323 static void on_third_resolution(OnResolutionCallbackArg* cb_arg) {
324   LOG(INFO) << "3rd: g_resolution_count: " << g_resolution_count;
325   ASSERT_EQ(g_resolution_count, 3);
326   cb_arg->result_handler->SetCallback(on_fourth_resolution, cb_arg);
327   cb_arg->resolver->RequestReresolutionLocked();
328   gpr_mu_lock(g_iomgr_args.mu);
329   GRPC_LOG_IF_ERROR("pollset_kick",
330                     grpc_pollset_kick(g_iomgr_args.pollset, nullptr));
331   gpr_mu_unlock(g_iomgr_args.mu);
332 }
333 
on_second_resolution(OnResolutionCallbackArg * cb_arg)334 static void on_second_resolution(OnResolutionCallbackArg* cb_arg) {
335   LOG(INFO) << "2nd: g_resolution_count: " << g_resolution_count;
336   // The resolution callback was not invoked until new data was
337   // available, which was delayed until after the cooldown period.
338   ASSERT_EQ(g_resolution_count, 2);
339   cb_arg->result_handler->SetCallback(on_third_resolution, cb_arg);
340   cb_arg->resolver->RequestReresolutionLocked();
341   gpr_mu_lock(g_iomgr_args.mu);
342   GRPC_LOG_IF_ERROR("pollset_kick",
343                     grpc_pollset_kick(g_iomgr_args.pollset, nullptr));
344   gpr_mu_unlock(g_iomgr_args.mu);
345 }
346 
on_first_resolution(OnResolutionCallbackArg * cb_arg)347 static void on_first_resolution(OnResolutionCallbackArg* cb_arg) {
348   LOG(INFO) << "1st: g_resolution_count: " << g_resolution_count;
349   // There's one initial system-level resolution and one invocation of a
350   // notification callback (the current function).
351   ASSERT_EQ(g_resolution_count, 1);
352   cb_arg->result_handler->SetCallback(on_second_resolution, cb_arg);
353   cb_arg->resolver->RequestReresolutionLocked();
354   gpr_mu_lock(g_iomgr_args.mu);
355   GRPC_LOG_IF_ERROR("pollset_kick",
356                     grpc_pollset_kick(g_iomgr_args.pollset, nullptr));
357   gpr_mu_unlock(g_iomgr_args.mu);
358 }
359 
start_test_under_work_serializer(void * arg)360 static void start_test_under_work_serializer(void* arg) {
361   OnResolutionCallbackArg* res_cb_arg =
362       static_cast<OnResolutionCallbackArg*>(arg);
363   res_cb_arg->result_handler = new ResultHandler();
364   grpc_core::ResolverFactory* factory = grpc_core::CoreConfiguration::Get()
365                                             .resolver_registry()
366                                             .LookupResolverFactory("dns");
367   absl::StatusOr<grpc_core::URI> uri =
368       grpc_core::URI::Parse(res_cb_arg->uri_str);
369   VLOG(2) << "test: '" << res_cb_arg->uri_str << "' should be valid for '"
370           << factory->scheme() << "'";
371   if (!uri.ok()) {
372     LOG(ERROR) << uri.status();
373     ASSERT_TRUE(uri.ok());
374   }
375   grpc_core::ResolverArgs args;
376   args.uri = std::move(*uri);
377   args.work_serializer = *g_work_serializer;
378   args.result_handler = std::unique_ptr<grpc_core::Resolver::ResultHandler>(
379       res_cb_arg->result_handler);
380   g_resolution_count = 0;
381 
382   grpc_arg cooldown_arg = grpc_channel_arg_integer_create(
383       const_cast<char*>(GRPC_ARG_DNS_MIN_TIME_BETWEEN_RESOLUTIONS_MS),
384       kMinResolutionPeriodMs);
385   grpc_channel_args cooldown_args = {1, &cooldown_arg};
386   args.args = grpc_core::ChannelArgs::FromC(&cooldown_args);
387   args.args = args.args.SetObject(GetDefaultEventEngine());
388   res_cb_arg->resolver = factory->CreateResolver(std::move(args));
389   ASSERT_NE(res_cb_arg->resolver, nullptr);
390   // First resolution, would incur in system-level resolution.
391   res_cb_arg->result_handler->SetCallback(on_first_resolution, res_cb_arg);
392   res_cb_arg->resolver->StartLocked();
393 }
394 
test_cooldown()395 static void test_cooldown() {
396   grpc_core::ExecCtx exec_ctx;
397   iomgr_args_init(&g_iomgr_args);
398   OnResolutionCallbackArg* res_cb_arg = new OnResolutionCallbackArg();
399   res_cb_arg->uri_str = "dns:127.0.0.1";
400 
401   (*g_work_serializer)
402       ->Run([res_cb_arg]() { start_test_under_work_serializer(res_cb_arg); },
403             DEBUG_LOCATION);
404   grpc_core::ExecCtx::Get()->Flush();
405   poll_pollset_until_request_done(&g_iomgr_args);
406   iomgr_args_finish(&g_iomgr_args);
407 }
408 
// Installs the counting lookup hooks, runs the cooldown scenario, and waits
// for the last resolution callback before returning.
TEST(DnsResolverCooldownTest, MainTest) {
  // TODO(yijiem): This test tests the cooldown behavior of the PollingResolver
  // interface. To do that, it overrides the grpc_dns_lookup_hostname_ares
  // function and overrides the iomgr's g_dns_resolver system. We would need to
  // rewrite this test for EventEngine using a custom EE DNSResolver or adding
  // to the resolver_fuzzer.
  if (grpc_core::IsEventEngineDnsEnabled()) {
    GTEST_SKIP() << "Not with event engine dns";
  }
  grpc_init();

  // The serializer lives on this stack frame; the global only points at it.
  std::shared_ptr<grpc_core::WorkSerializer> serializer =
      std::make_shared<grpc_core::WorkSerializer>(GetDefaultEventEngine());
  g_work_serializer = &serializer;

  // Remember the real c-ares lookup, then route lookups through the counting
  // wrapper.
  g_default_dns_lookup_ares = grpc_dns_lookup_hostname_ares;
  grpc_dns_lookup_hostname_ares = test_dns_lookup_ares;
  // Likewise wrap the currently installed DNS resolver.
  grpc_core::ResetDNSResolver(
      std::make_unique<TestDNSResolver>(grpc_core::GetDNSResolver()));

  test_cooldown();

  grpc_shutdown();
  g_all_callbacks_invoked->WaitForNotification();
}
434 
main(int argc,char ** argv)435 int main(int argc, char** argv) {
436   grpc::testing::TestEnvironment env(&argc, argv);
437   ::testing::InitGoogleTest(&argc, argv);
438   return RUN_ALL_TESTS();
439 }
440