1 //
2 // Copyright 2015 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 #include <grpc/support/port_platform.h>
18
19 #include "src/core/resolver/polling_resolver.h"
20
21 #include <inttypes.h>
22
23 #include <functional>
24 #include <type_traits>
25 #include <utility>
26 #include <vector>
27
28 #include "absl/status/status.h"
29 #include "absl/status/statusor.h"
30 #include "absl/strings/str_cat.h"
31 #include "absl/strings/strip.h"
32
33 #include <grpc/support/log.h>
34
35 #include "src/core/lib/backoff/backoff.h"
36 #include "src/core/lib/channel/channel_args.h"
37 #include "src/core/lib/gprpp/debug_location.h"
38 #include "src/core/lib/gprpp/ref_counted_ptr.h"
39 #include "src/core/lib/gprpp/work_serializer.h"
40 #include "src/core/lib/iomgr/exec_ctx.h"
41 #include "src/core/resolver/endpoint_addresses.h"
42 #include "src/core/service_config/service_config.h"
43 #include "src/core/lib/uri/uri_parser.h"
44
45 namespace grpc_core {
46
47 using ::grpc_event_engine::experimental::EventEngine;
48
// Builds a polling resolver from `args`.
// - The target URI's authority becomes authority_.
// - The URI path, with a single leading "/" stripped, becomes the name to
//   resolve.
// - The channel args, work serializer and result handler are moved out of
//   `args`; the pollset_set is copied.
// `min_time_between_resolutions` is the cooldown that
// MaybeStartResolvingLocked() enforces between successive resolution starts.
// `backoff_options` configure the backoff used to schedule a retry when the
// channel reports a failed result.
// `tracer` may be null; trace logging is emitted only when it is non-null
// and enabled.
// NOTE: members are initialized in their declaration order (see the header),
// not in the order written below; keep the two consistent.
PollingResolver::PollingResolver(ResolverArgs args,
                                 Duration min_time_between_resolutions,
                                 BackOff::Options backoff_options,
                                 TraceFlag* tracer)
    : authority_(args.uri.authority()),
      name_to_resolve_(absl::StripPrefix(args.uri.path(), "/")),
      channel_args_(std::move(args.args)),
      work_serializer_(std::move(args.work_serializer)),
      result_handler_(std::move(args.result_handler)),
      tracer_(tracer),
      interested_parties_(args.pollset_set),
      min_time_between_resolutions_(min_time_between_resolutions),
      backoff_(backoff_options) {
  if (GPR_UNLIKELY(tracer_ != nullptr && tracer_->enabled())) {
    gpr_log(GPR_INFO, "[polling resolver %p] created", this);
  }
}
66
~PollingResolver()67 PollingResolver::~PollingResolver() {
68 if (GPR_UNLIKELY(tracer_ != nullptr && tracer_->enabled())) {
69 gpr_log(GPR_INFO, "[polling resolver %p] destroying", this);
70 }
71 }
72
StartLocked()73 void PollingResolver::StartLocked() { MaybeStartResolvingLocked(); }
74
RequestReresolutionLocked()75 void PollingResolver::RequestReresolutionLocked() {
76 if (request_ == nullptr) {
77 // If we're still waiting for a result-health callback from the last
78 // result we reported, don't trigger the re-resolution until we get
79 // that callback.
80 if (result_status_state_ ==
81 ResultStatusState::kResultHealthCallbackPending) {
82 result_status_state_ =
83 ResultStatusState::kReresolutionRequestedWhileCallbackWasPending;
84 } else {
85 MaybeStartResolvingLocked();
86 }
87 }
88 }
89
ResetBackoffLocked()90 void PollingResolver::ResetBackoffLocked() {
91 backoff_.Reset();
92 if (next_resolution_timer_handle_.has_value()) {
93 MaybeCancelNextResolutionTimer();
94 StartResolvingLocked();
95 }
96 }
97
ShutdownLocked()98 void PollingResolver::ShutdownLocked() {
99 if (GPR_UNLIKELY(tracer_ != nullptr && tracer_->enabled())) {
100 gpr_log(GPR_INFO, "[polling resolver %p] shutting down", this);
101 }
102 shutdown_ = true;
103 MaybeCancelNextResolutionTimer();
104 request_.reset();
105 }
106
ScheduleNextResolutionTimer(const Duration & timeout)107 void PollingResolver::ScheduleNextResolutionTimer(const Duration& timeout) {
108 next_resolution_timer_handle_ =
109 channel_args_.GetObject<EventEngine>()->RunAfter(
110 timeout, [self = RefAsSubclass<PollingResolver>()]() mutable {
111 ApplicationCallbackExecCtx callback_exec_ctx;
112 ExecCtx exec_ctx;
113 auto* self_ptr = self.get();
114 self_ptr->work_serializer_->Run(
115 [self = std::move(self)]() { self->OnNextResolutionLocked(); },
116 DEBUG_LOCATION);
117 });
118 }
119
OnNextResolutionLocked()120 void PollingResolver::OnNextResolutionLocked() {
121 if (GPR_UNLIKELY(tracer_ != nullptr && tracer_->enabled())) {
122 gpr_log(GPR_INFO,
123 "[polling resolver %p] re-resolution timer fired: shutdown_=%d",
124 this, shutdown_);
125 }
126 // If we haven't been cancelled nor shutdown, then start resolving.
127 if (next_resolution_timer_handle_.has_value() && !shutdown_) {
128 next_resolution_timer_handle_.reset();
129 StartResolvingLocked();
130 }
131 }
132
MaybeCancelNextResolutionTimer()133 void PollingResolver::MaybeCancelNextResolutionTimer() {
134 if (next_resolution_timer_handle_.has_value()) {
135 if (GPR_UNLIKELY(tracer_ != nullptr && tracer_->enabled())) {
136 gpr_log(GPR_INFO, "[polling resolver %p] cancel re-resolution timer",
137 this);
138 }
139 channel_args_.GetObject<EventEngine>()->Cancel(
140 *next_resolution_timer_handle_);
141 next_resolution_timer_handle_.reset();
142 }
143 }
144
OnRequestComplete(Result result)145 void PollingResolver::OnRequestComplete(Result result) {
146 Ref(DEBUG_LOCATION, "OnRequestComplete").release();
147 work_serializer_->Run(
148 [this, result]() mutable { OnRequestCompleteLocked(std::move(result)); },
149 DEBUG_LOCATION);
150 }
151
// Runs on the work serializer once a resolution request has completed (it is
// posted there by OnRequestComplete()).
// Drops the finished request. Unless the resolver has been shut down, it
// attaches a result-health callback and hands `result` to the result handler.
// It always releases the "OnRequestComplete" ref taken by
// OnRequestComplete().
void PollingResolver::OnRequestCompleteLocked(Result result) {
  if (GPR_UNLIKELY(tracer_ != nullptr && tracer_->enabled())) {
    gpr_log(GPR_INFO, "[polling resolver %p] request complete", this);
  }
  request_.reset();
  if (!shutdown_) {
    if (GPR_UNLIKELY(tracer_ != nullptr && tracer_->enabled())) {
      // The c_str() pointers below come from temporaries that live until the
      // end of this full gpr_log() expression, so they are valid for the call.
      gpr_log(GPR_INFO,
              "[polling resolver %p] returning result: "
              "addresses=%s, service_config=%s, resolution_note=%s",
              this,
              result.addresses.ok()
                  ? absl::StrCat("<", result.addresses->size(), " addresses>")
                        .c_str()
                  : result.addresses.status().ToString().c_str(),
              result.service_config.ok()
                  ? (*result.service_config == nullptr
                         ? "<null>"
                         : std::string((*result.service_config)->json_string())
                               .c_str())
                  : result.service_config.status().ToString().c_str(),
              result.resolution_note.c_str());
    }
    // This resolver installs its own health callback, so the incoming result
    // must not already carry one.
    GPR_ASSERT(result.result_health_callback == nullptr);
    // The callback holds a strong ref to the resolver for as long as the
    // callback exists, and routes the channel's status to GetResultStatus().
    result.result_health_callback =
        [self = RefAsSubclass<PollingResolver>(
             DEBUG_LOCATION, "result_health_callback")](absl::Status status) {
          self->GetResultStatus(std::move(status));
        };
    // Mark the callback as pending *before* ReportResult(), so the state is
    // already correct if the handler happens to invoke the callback from
    // within ReportResult() (whether it does is up to the handler).
    result_status_state_ = ResultStatusState::kResultHealthCallbackPending;
    result_handler_->ReportResult(std::move(result));
  }
  // Balances Ref(DEBUG_LOCATION, "OnRequestComplete").release() in
  // OnRequestComplete().
  Unref(DEBUG_LOCATION, "OnRequestComplete");
}
186
GetResultStatus(absl::Status status)187 void PollingResolver::GetResultStatus(absl::Status status) {
188 if (GPR_UNLIKELY(tracer_ != nullptr && tracer_->enabled())) {
189 gpr_log(GPR_INFO, "[polling resolver %p] result status from channel: %s",
190 this, status.ToString().c_str());
191 }
192 if (status.ok()) {
193 // Reset backoff state so that we start from the beginning when the
194 // next request gets triggered.
195 backoff_.Reset();
196 // If a re-resolution attempt was requested while the result-status
197 // callback was pending, trigger a new request now.
198 if (std::exchange(result_status_state_, ResultStatusState::kNone) ==
199 ResultStatusState::kReresolutionRequestedWhileCallbackWasPending) {
200 MaybeStartResolvingLocked();
201 }
202 } else {
203 // Set up for retry.
204 // InvalidateNow to avoid getting stuck re-initializing this timer
205 // in a loop while draining the currently-held WorkSerializer.
206 // Also see https://github.com/grpc/grpc/issues/26079.
207 ExecCtx::Get()->InvalidateNow();
208 const Timestamp next_try = backoff_.NextAttemptTime();
209 const Duration timeout = next_try - Timestamp::Now();
210 GPR_ASSERT(!next_resolution_timer_handle_.has_value());
211 if (GPR_UNLIKELY(tracer_ != nullptr && tracer_->enabled())) {
212 if (timeout > Duration::Zero()) {
213 gpr_log(GPR_INFO, "[polling resolver %p] retrying in %" PRId64 " ms",
214 this, timeout.millis());
215 } else {
216 gpr_log(GPR_INFO, "[polling resolver %p] retrying immediately", this);
217 }
218 }
219 ScheduleNextResolutionTimer(timeout);
220 // Reset result_status_state_. Note that even if re-resolution was
221 // requested while the result-health callback was pending, we can
222 // ignore it here, because we are in backoff to re-resolve anyway.
223 result_status_state_ = ResultStatusState::kNone;
224 }
225 }
226
MaybeStartResolvingLocked()227 void PollingResolver::MaybeStartResolvingLocked() {
228 // If there is an existing timer, the time it fires is the earliest time we
229 // can start the next resolution.
230 if (next_resolution_timer_handle_.has_value()) return;
231 if (last_resolution_timestamp_.has_value()) {
232 // InvalidateNow to avoid getting stuck re-initializing this timer
233 // in a loop while draining the currently-held WorkSerializer.
234 // Also see https://github.com/grpc/grpc/issues/26079.
235 ExecCtx::Get()->InvalidateNow();
236 const Timestamp earliest_next_resolution =
237 *last_resolution_timestamp_ + min_time_between_resolutions_;
238 const Duration time_until_next_resolution =
239 earliest_next_resolution - Timestamp::Now();
240 if (time_until_next_resolution > Duration::Zero()) {
241 if (GPR_UNLIKELY(tracer_ != nullptr && tracer_->enabled())) {
242 const Duration last_resolution_ago =
243 Timestamp::Now() - *last_resolution_timestamp_;
244 gpr_log(GPR_INFO,
245 "[polling resolver %p] in cooldown from last resolution "
246 "(from %" PRId64 " ms ago); will resolve again in %" PRId64
247 " ms",
248 this, last_resolution_ago.millis(),
249 time_until_next_resolution.millis());
250 }
251 ScheduleNextResolutionTimer(time_until_next_resolution);
252 return;
253 }
254 }
255 StartResolvingLocked();
256 }
257
StartResolvingLocked()258 void PollingResolver::StartResolvingLocked() {
259 request_ = StartRequest();
260 last_resolution_timestamp_ = Timestamp::Now();
261 if (GPR_UNLIKELY(tracer_ != nullptr && tracer_->enabled())) {
262 if (request_ != nullptr) {
263 gpr_log(GPR_INFO,
264 "[polling resolver %p] starting resolution, request_=%p", this,
265 request_.get());
266 } else {
267 gpr_log(GPR_INFO, "[polling resolver %p] StartRequest failed", this);
268 }
269 }
270 }
271
272 } // namespace grpc_core
273