1 //
2 // Copyright 2015 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 #include "src/core/resolver/polling_resolver.h"
18
19 #include <grpc/support/port_platform.h>
20 #include <inttypes.h>
21
22 #include <functional>
23 #include <type_traits>
24 #include <utility>
25 #include <vector>
26
27 #include "absl/log/check.h"
28 #include "absl/log/log.h"
29 #include "absl/status/status.h"
30 #include "absl/status/statusor.h"
31 #include "absl/strings/str_cat.h"
32 #include "absl/strings/strip.h"
33 #include "src/core/lib/channel/channel_args.h"
34 #include "src/core/lib/iomgr/exec_ctx.h"
35 #include "src/core/resolver/endpoint_addresses.h"
36 #include "src/core/service_config/service_config.h"
37 #include "src/core/util/backoff.h"
38 #include "src/core/util/debug_location.h"
39 #include "src/core/util/ref_counted_ptr.h"
40 #include "src/core/util/uri.h"
41 #include "src/core/util/work_serializer.h"
42
43 namespace grpc_core {
44
45 using ::grpc_event_engine::experimental::EventEngine;
46
PollingResolver(ResolverArgs args,Duration min_time_between_resolutions,BackOff::Options backoff_options,TraceFlag * tracer)47 PollingResolver::PollingResolver(ResolverArgs args,
48 Duration min_time_between_resolutions,
49 BackOff::Options backoff_options,
50 TraceFlag* tracer)
51 : authority_(args.uri.authority()),
52 name_to_resolve_(absl::StripPrefix(args.uri.path(), "/")),
53 channel_args_(std::move(args.args)),
54 work_serializer_(std::move(args.work_serializer)),
55 result_handler_(std::move(args.result_handler)),
56 tracer_(tracer),
57 interested_parties_(args.pollset_set),
58 min_time_between_resolutions_(min_time_between_resolutions),
59 backoff_(backoff_options) {
60 if (GPR_UNLIKELY(tracer_ != nullptr && tracer_->enabled())) {
61 LOG(INFO) << "[polling resolver " << this << "] created";
62 }
63 }
64
~PollingResolver()65 PollingResolver::~PollingResolver() {
66 if (GPR_UNLIKELY(tracer_ != nullptr && tracer_->enabled())) {
67 LOG(INFO) << "[polling resolver " << this << "] destroying";
68 }
69 }
70
StartLocked()71 void PollingResolver::StartLocked() { MaybeStartResolvingLocked(); }
72
RequestReresolutionLocked()73 void PollingResolver::RequestReresolutionLocked() {
74 if (request_ == nullptr) {
75 // If we're still waiting for a result-health callback from the last
76 // result we reported, don't trigger the re-resolution until we get
77 // that callback.
78 if (result_status_state_ ==
79 ResultStatusState::kResultHealthCallbackPending) {
80 result_status_state_ =
81 ResultStatusState::kReresolutionRequestedWhileCallbackWasPending;
82 } else {
83 MaybeStartResolvingLocked();
84 }
85 }
86 }
87
ResetBackoffLocked()88 void PollingResolver::ResetBackoffLocked() {
89 backoff_.Reset();
90 if (next_resolution_timer_handle_.has_value()) {
91 MaybeCancelNextResolutionTimer();
92 StartResolvingLocked();
93 }
94 }
95
ShutdownLocked()96 void PollingResolver::ShutdownLocked() {
97 if (GPR_UNLIKELY(tracer_ != nullptr && tracer_->enabled())) {
98 LOG(INFO) << "[polling resolver " << this << "] shutting down";
99 }
100 shutdown_ = true;
101 MaybeCancelNextResolutionTimer();
102 request_.reset();
103 }
104
ScheduleNextResolutionTimer(Duration delay)105 void PollingResolver::ScheduleNextResolutionTimer(Duration delay) {
106 next_resolution_timer_handle_ =
107 channel_args_.GetObject<EventEngine>()->RunAfter(
108 delay, [self = RefAsSubclass<PollingResolver>()]() mutable {
109 ApplicationCallbackExecCtx callback_exec_ctx;
110 ExecCtx exec_ctx;
111 auto* self_ptr = self.get();
112 self_ptr->work_serializer_->Run(
113 [self = std::move(self)]() { self->OnNextResolutionLocked(); },
114 DEBUG_LOCATION);
115 });
116 }
117
OnNextResolutionLocked()118 void PollingResolver::OnNextResolutionLocked() {
119 if (GPR_UNLIKELY(tracer_ != nullptr && tracer_->enabled())) {
120 LOG(INFO) << "[polling resolver " << this
121 << "] re-resolution timer fired: shutdown_=" << shutdown_;
122 }
123 // If we haven't been cancelled nor shutdown, then start resolving.
124 if (next_resolution_timer_handle_.has_value() && !shutdown_) {
125 next_resolution_timer_handle_.reset();
126 StartResolvingLocked();
127 }
128 }
129
MaybeCancelNextResolutionTimer()130 void PollingResolver::MaybeCancelNextResolutionTimer() {
131 if (next_resolution_timer_handle_.has_value()) {
132 if (GPR_UNLIKELY(tracer_ != nullptr && tracer_->enabled())) {
133 LOG(INFO) << "[polling resolver " << this
134 << "] cancel re-resolution timer";
135 }
136 channel_args_.GetObject<EventEngine>()->Cancel(
137 *next_resolution_timer_handle_);
138 next_resolution_timer_handle_.reset();
139 }
140 }
141
OnRequestComplete(Result result)142 void PollingResolver::OnRequestComplete(Result result) {
143 Ref(DEBUG_LOCATION, "OnRequestComplete").release();
144 work_serializer_->Run(
145 [this, result]() mutable { OnRequestCompleteLocked(std::move(result)); },
146 DEBUG_LOCATION);
147 }
148
OnRequestCompleteLocked(Result result)149 void PollingResolver::OnRequestCompleteLocked(Result result) {
150 if (GPR_UNLIKELY(tracer_ != nullptr && tracer_->enabled())) {
151 LOG(INFO) << "[polling resolver " << this << "] request complete";
152 }
153 request_.reset();
154 if (!shutdown_) {
155 if (GPR_UNLIKELY(tracer_ != nullptr && tracer_->enabled())) {
156 LOG(INFO)
157 << "[polling resolver " << this << "] returning result: addresses="
158 << (result.addresses.ok()
159 ? absl::StrCat("<", result.addresses->size(), " addresses>")
160 : result.addresses.status().ToString())
161 << ", service_config="
162 << (result.service_config.ok()
163 ? (*result.service_config == nullptr
164 ? "<null>"
165 : std::string((*result.service_config)->json_string())
166 .c_str())
167 : result.service_config.status().ToString())
168 << ", resolution_note=" << result.resolution_note;
169 }
170 CHECK(result.result_health_callback == nullptr);
171 result.result_health_callback =
172 [self = RefAsSubclass<PollingResolver>(
173 DEBUG_LOCATION, "result_health_callback")](absl::Status status) {
174 self->GetResultStatus(std::move(status));
175 };
176 result_status_state_ = ResultStatusState::kResultHealthCallbackPending;
177 result_handler_->ReportResult(std::move(result));
178 }
179 Unref(DEBUG_LOCATION, "OnRequestComplete");
180 }
181
GetResultStatus(absl::Status status)182 void PollingResolver::GetResultStatus(absl::Status status) {
183 if (GPR_UNLIKELY(tracer_ != nullptr && tracer_->enabled())) {
184 LOG(INFO) << "[polling resolver " << this
185 << "] result status from channel: " << status;
186 }
187 if (status.ok()) {
188 // Reset backoff state so that we start from the beginning when the
189 // next request gets triggered.
190 backoff_.Reset();
191 // If a re-resolution attempt was requested while the result-status
192 // callback was pending, trigger a new request now.
193 if (std::exchange(result_status_state_, ResultStatusState::kNone) ==
194 ResultStatusState::kReresolutionRequestedWhileCallbackWasPending) {
195 MaybeStartResolvingLocked();
196 }
197 } else {
198 // Set up for retry.
199 const Duration delay = backoff_.NextAttemptDelay();
200 CHECK(!next_resolution_timer_handle_.has_value());
201 if (GPR_UNLIKELY(tracer_ != nullptr && tracer_->enabled())) {
202 LOG(INFO) << "[polling resolver " << this << "] retrying in "
203 << delay.millis() << " ms";
204 }
205 ScheduleNextResolutionTimer(delay);
206 // Reset result_status_state_. Note that even if re-resolution was
207 // requested while the result-health callback was pending, we can
208 // ignore it here, because we are in backoff to re-resolve anyway.
209 result_status_state_ = ResultStatusState::kNone;
210 }
211 }
212
MaybeStartResolvingLocked()213 void PollingResolver::MaybeStartResolvingLocked() {
214 // If there is an existing timer, the time it fires is the earliest time we
215 // can start the next resolution.
216 if (next_resolution_timer_handle_.has_value()) return;
217 if (last_resolution_timestamp_.has_value()) {
218 // InvalidateNow to avoid getting stuck re-initializing this timer
219 // in a loop while draining the currently-held WorkSerializer.
220 // Also see https://github.com/grpc/grpc/issues/26079.
221 ExecCtx::Get()->InvalidateNow();
222 const Timestamp earliest_next_resolution =
223 *last_resolution_timestamp_ + min_time_between_resolutions_;
224 const Duration time_until_next_resolution =
225 earliest_next_resolution - Timestamp::Now();
226 if (time_until_next_resolution > Duration::Zero()) {
227 if (GPR_UNLIKELY(tracer_ != nullptr && tracer_->enabled())) {
228 const Duration last_resolution_ago =
229 Timestamp::Now() - *last_resolution_timestamp_;
230 LOG(INFO) << "[polling resolver " << this
231 << "] in cooldown from last resolution (from "
232 << last_resolution_ago.millis()
233 << " ms ago); will resolve again in "
234 << time_until_next_resolution.millis() << " ms";
235 }
236 ScheduleNextResolutionTimer(time_until_next_resolution);
237 return;
238 }
239 }
240 StartResolvingLocked();
241 }
242
StartResolvingLocked()243 void PollingResolver::StartResolvingLocked() {
244 request_ = StartRequest();
245 last_resolution_timestamp_ = Timestamp::Now();
246 if (GPR_UNLIKELY(tracer_ != nullptr && tracer_->enabled())) {
247 if (request_ != nullptr) {
248 LOG(INFO) << "[polling resolver " << this
249 << "] starting resolution, request_=" << request_.get();
250 } else {
251 LOG(INFO) << "[polling resolver " << this << "] StartRequest failed";
252 }
253 }
254 }
255
256 } // namespace grpc_core
257