• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 // Copyright 2015 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #include <grpc/support/port_platform.h>
18 
19 #include "src/core/resolver/polling_resolver.h"
20 
21 #include <inttypes.h>
22 
23 #include <functional>
24 #include <type_traits>
25 #include <utility>
26 #include <vector>
27 
28 #include "absl/status/status.h"
29 #include "absl/status/statusor.h"
30 #include "absl/strings/str_cat.h"
31 #include "absl/strings/strip.h"
32 
33 #include <grpc/support/log.h>
34 
35 #include "src/core/lib/backoff/backoff.h"
36 #include "src/core/lib/channel/channel_args.h"
37 #include "src/core/lib/gprpp/debug_location.h"
38 #include "src/core/lib/gprpp/ref_counted_ptr.h"
39 #include "src/core/lib/gprpp/work_serializer.h"
40 #include "src/core/lib/iomgr/exec_ctx.h"
41 #include "src/core/resolver/endpoint_addresses.h"
42 #include "src/core/service_config/service_config.h"
43 #include "src/core/lib/uri/uri_parser.h"
44 
45 namespace grpc_core {
46 
47 using ::grpc_event_engine::experimental::EventEngine;
48 
PollingResolver(ResolverArgs args,Duration min_time_between_resolutions,BackOff::Options backoff_options,TraceFlag * tracer)49 PollingResolver::PollingResolver(ResolverArgs args,
50                                  Duration min_time_between_resolutions,
51                                  BackOff::Options backoff_options,
52                                  TraceFlag* tracer)
53     : authority_(args.uri.authority()),
54       name_to_resolve_(absl::StripPrefix(args.uri.path(), "/")),
55       channel_args_(std::move(args.args)),
56       work_serializer_(std::move(args.work_serializer)),
57       result_handler_(std::move(args.result_handler)),
58       tracer_(tracer),
59       interested_parties_(args.pollset_set),
60       min_time_between_resolutions_(min_time_between_resolutions),
61       backoff_(backoff_options) {
62   if (GPR_UNLIKELY(tracer_ != nullptr && tracer_->enabled())) {
63     gpr_log(GPR_INFO, "[polling resolver %p] created", this);
64   }
65 }
66 
~PollingResolver()67 PollingResolver::~PollingResolver() {
68   if (GPR_UNLIKELY(tracer_ != nullptr && tracer_->enabled())) {
69     gpr_log(GPR_INFO, "[polling resolver %p] destroying", this);
70   }
71 }
72 
StartLocked()73 void PollingResolver::StartLocked() { MaybeStartResolvingLocked(); }
74 
RequestReresolutionLocked()75 void PollingResolver::RequestReresolutionLocked() {
76   if (request_ == nullptr) {
77     // If we're still waiting for a result-health callback from the last
78     // result we reported, don't trigger the re-resolution until we get
79     // that callback.
80     if (result_status_state_ ==
81         ResultStatusState::kResultHealthCallbackPending) {
82       result_status_state_ =
83           ResultStatusState::kReresolutionRequestedWhileCallbackWasPending;
84     } else {
85       MaybeStartResolvingLocked();
86     }
87   }
88 }
89 
ResetBackoffLocked()90 void PollingResolver::ResetBackoffLocked() {
91   backoff_.Reset();
92   if (next_resolution_timer_handle_.has_value()) {
93     MaybeCancelNextResolutionTimer();
94     StartResolvingLocked();
95   }
96 }
97 
ShutdownLocked()98 void PollingResolver::ShutdownLocked() {
99   if (GPR_UNLIKELY(tracer_ != nullptr && tracer_->enabled())) {
100     gpr_log(GPR_INFO, "[polling resolver %p] shutting down", this);
101   }
102   shutdown_ = true;
103   MaybeCancelNextResolutionTimer();
104   request_.reset();
105 }
106 
ScheduleNextResolutionTimer(const Duration & timeout)107 void PollingResolver::ScheduleNextResolutionTimer(const Duration& timeout) {
108   next_resolution_timer_handle_ =
109       channel_args_.GetObject<EventEngine>()->RunAfter(
110           timeout, [self = RefAsSubclass<PollingResolver>()]() mutable {
111             ApplicationCallbackExecCtx callback_exec_ctx;
112             ExecCtx exec_ctx;
113             auto* self_ptr = self.get();
114             self_ptr->work_serializer_->Run(
115                 [self = std::move(self)]() { self->OnNextResolutionLocked(); },
116                 DEBUG_LOCATION);
117           });
118 }
119 
OnNextResolutionLocked()120 void PollingResolver::OnNextResolutionLocked() {
121   if (GPR_UNLIKELY(tracer_ != nullptr && tracer_->enabled())) {
122     gpr_log(GPR_INFO,
123             "[polling resolver %p] re-resolution timer fired: shutdown_=%d",
124             this, shutdown_);
125   }
126   // If we haven't been cancelled nor shutdown, then start resolving.
127   if (next_resolution_timer_handle_.has_value() && !shutdown_) {
128     next_resolution_timer_handle_.reset();
129     StartResolvingLocked();
130   }
131 }
132 
MaybeCancelNextResolutionTimer()133 void PollingResolver::MaybeCancelNextResolutionTimer() {
134   if (next_resolution_timer_handle_.has_value()) {
135     if (GPR_UNLIKELY(tracer_ != nullptr && tracer_->enabled())) {
136       gpr_log(GPR_INFO, "[polling resolver %p] cancel re-resolution timer",
137               this);
138     }
139     channel_args_.GetObject<EventEngine>()->Cancel(
140         *next_resolution_timer_handle_);
141     next_resolution_timer_handle_.reset();
142   }
143 }
144 
OnRequestComplete(Result result)145 void PollingResolver::OnRequestComplete(Result result) {
146   Ref(DEBUG_LOCATION, "OnRequestComplete").release();
147   work_serializer_->Run(
148       [this, result]() mutable { OnRequestCompleteLocked(std::move(result)); },
149       DEBUG_LOCATION);
150 }
151 
OnRequestCompleteLocked(Result result)152 void PollingResolver::OnRequestCompleteLocked(Result result) {
153   if (GPR_UNLIKELY(tracer_ != nullptr && tracer_->enabled())) {
154     gpr_log(GPR_INFO, "[polling resolver %p] request complete", this);
155   }
156   request_.reset();
157   if (!shutdown_) {
158     if (GPR_UNLIKELY(tracer_ != nullptr && tracer_->enabled())) {
159       gpr_log(GPR_INFO,
160               "[polling resolver %p] returning result: "
161               "addresses=%s, service_config=%s, resolution_note=%s",
162               this,
163               result.addresses.ok()
164                   ? absl::StrCat("<", result.addresses->size(), " addresses>")
165                         .c_str()
166                   : result.addresses.status().ToString().c_str(),
167               result.service_config.ok()
168                   ? (*result.service_config == nullptr
169                          ? "<null>"
170                          : std::string((*result.service_config)->json_string())
171                                .c_str())
172                   : result.service_config.status().ToString().c_str(),
173               result.resolution_note.c_str());
174     }
175     GPR_ASSERT(result.result_health_callback == nullptr);
176     result.result_health_callback =
177         [self = RefAsSubclass<PollingResolver>(
178              DEBUG_LOCATION, "result_health_callback")](absl::Status status) {
179           self->GetResultStatus(std::move(status));
180         };
181     result_status_state_ = ResultStatusState::kResultHealthCallbackPending;
182     result_handler_->ReportResult(std::move(result));
183   }
184   Unref(DEBUG_LOCATION, "OnRequestComplete");
185 }
186 
GetResultStatus(absl::Status status)187 void PollingResolver::GetResultStatus(absl::Status status) {
188   if (GPR_UNLIKELY(tracer_ != nullptr && tracer_->enabled())) {
189     gpr_log(GPR_INFO, "[polling resolver %p] result status from channel: %s",
190             this, status.ToString().c_str());
191   }
192   if (status.ok()) {
193     // Reset backoff state so that we start from the beginning when the
194     // next request gets triggered.
195     backoff_.Reset();
196     // If a re-resolution attempt was requested while the result-status
197     // callback was pending, trigger a new request now.
198     if (std::exchange(result_status_state_, ResultStatusState::kNone) ==
199         ResultStatusState::kReresolutionRequestedWhileCallbackWasPending) {
200       MaybeStartResolvingLocked();
201     }
202   } else {
203     // Set up for retry.
204     // InvalidateNow to avoid getting stuck re-initializing this timer
205     // in a loop while draining the currently-held WorkSerializer.
206     // Also see https://github.com/grpc/grpc/issues/26079.
207     ExecCtx::Get()->InvalidateNow();
208     const Timestamp next_try = backoff_.NextAttemptTime();
209     const Duration timeout = next_try - Timestamp::Now();
210     GPR_ASSERT(!next_resolution_timer_handle_.has_value());
211     if (GPR_UNLIKELY(tracer_ != nullptr && tracer_->enabled())) {
212       if (timeout > Duration::Zero()) {
213         gpr_log(GPR_INFO, "[polling resolver %p] retrying in %" PRId64 " ms",
214                 this, timeout.millis());
215       } else {
216         gpr_log(GPR_INFO, "[polling resolver %p] retrying immediately", this);
217       }
218     }
219     ScheduleNextResolutionTimer(timeout);
220     // Reset result_status_state_.  Note that even if re-resolution was
221     // requested while the result-health callback was pending, we can
222     // ignore it here, because we are in backoff to re-resolve anyway.
223     result_status_state_ = ResultStatusState::kNone;
224   }
225 }
226 
MaybeStartResolvingLocked()227 void PollingResolver::MaybeStartResolvingLocked() {
228   // If there is an existing timer, the time it fires is the earliest time we
229   // can start the next resolution.
230   if (next_resolution_timer_handle_.has_value()) return;
231   if (last_resolution_timestamp_.has_value()) {
232     // InvalidateNow to avoid getting stuck re-initializing this timer
233     // in a loop while draining the currently-held WorkSerializer.
234     // Also see https://github.com/grpc/grpc/issues/26079.
235     ExecCtx::Get()->InvalidateNow();
236     const Timestamp earliest_next_resolution =
237         *last_resolution_timestamp_ + min_time_between_resolutions_;
238     const Duration time_until_next_resolution =
239         earliest_next_resolution - Timestamp::Now();
240     if (time_until_next_resolution > Duration::Zero()) {
241       if (GPR_UNLIKELY(tracer_ != nullptr && tracer_->enabled())) {
242         const Duration last_resolution_ago =
243             Timestamp::Now() - *last_resolution_timestamp_;
244         gpr_log(GPR_INFO,
245                 "[polling resolver %p] in cooldown from last resolution "
246                 "(from %" PRId64 " ms ago); will resolve again in %" PRId64
247                 " ms",
248                 this, last_resolution_ago.millis(),
249                 time_until_next_resolution.millis());
250       }
251       ScheduleNextResolutionTimer(time_until_next_resolution);
252       return;
253     }
254   }
255   StartResolvingLocked();
256 }
257 
StartResolvingLocked()258 void PollingResolver::StartResolvingLocked() {
259   request_ = StartRequest();
260   last_resolution_timestamp_ = Timestamp::Now();
261   if (GPR_UNLIKELY(tracer_ != nullptr && tracer_->enabled())) {
262     if (request_ != nullptr) {
263       gpr_log(GPR_INFO,
264               "[polling resolver %p] starting resolution, request_=%p", this,
265               request_.get());
266     } else {
267       gpr_log(GPR_INFO, "[polling resolver %p] StartRequest failed", this);
268     }
269   }
270 }
271 
272 }  // namespace grpc_core
273