• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 // Copyright 2015 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #include "src/core/resolver/polling_resolver.h"
18 
19 #include <grpc/support/port_platform.h>
20 #include <inttypes.h>
21 
22 #include <functional>
23 #include <type_traits>
24 #include <utility>
25 #include <vector>
26 
27 #include "absl/log/check.h"
28 #include "absl/log/log.h"
29 #include "absl/status/status.h"
30 #include "absl/status/statusor.h"
31 #include "absl/strings/str_cat.h"
32 #include "absl/strings/strip.h"
33 #include "src/core/lib/channel/channel_args.h"
34 #include "src/core/lib/iomgr/exec_ctx.h"
35 #include "src/core/resolver/endpoint_addresses.h"
36 #include "src/core/service_config/service_config.h"
37 #include "src/core/util/backoff.h"
38 #include "src/core/util/debug_location.h"
39 #include "src/core/util/ref_counted_ptr.h"
40 #include "src/core/util/uri.h"
41 #include "src/core/util/work_serializer.h"
42 
43 namespace grpc_core {
44 
45 using ::grpc_event_engine::experimental::EventEngine;
46 
PollingResolver(ResolverArgs args,Duration min_time_between_resolutions,BackOff::Options backoff_options,TraceFlag * tracer)47 PollingResolver::PollingResolver(ResolverArgs args,
48                                  Duration min_time_between_resolutions,
49                                  BackOff::Options backoff_options,
50                                  TraceFlag* tracer)
51     : authority_(args.uri.authority()),
52       name_to_resolve_(absl::StripPrefix(args.uri.path(), "/")),
53       channel_args_(std::move(args.args)),
54       work_serializer_(std::move(args.work_serializer)),
55       result_handler_(std::move(args.result_handler)),
56       tracer_(tracer),
57       interested_parties_(args.pollset_set),
58       min_time_between_resolutions_(min_time_between_resolutions),
59       backoff_(backoff_options) {
60   if (GPR_UNLIKELY(tracer_ != nullptr && tracer_->enabled())) {
61     LOG(INFO) << "[polling resolver " << this << "] created";
62   }
63 }
64 
~PollingResolver()65 PollingResolver::~PollingResolver() {
66   if (GPR_UNLIKELY(tracer_ != nullptr && tracer_->enabled())) {
67     LOG(INFO) << "[polling resolver " << this << "] destroying";
68   }
69 }
70 
StartLocked()71 void PollingResolver::StartLocked() { MaybeStartResolvingLocked(); }
72 
RequestReresolutionLocked()73 void PollingResolver::RequestReresolutionLocked() {
74   if (request_ == nullptr) {
75     // If we're still waiting for a result-health callback from the last
76     // result we reported, don't trigger the re-resolution until we get
77     // that callback.
78     if (result_status_state_ ==
79         ResultStatusState::kResultHealthCallbackPending) {
80       result_status_state_ =
81           ResultStatusState::kReresolutionRequestedWhileCallbackWasPending;
82     } else {
83       MaybeStartResolvingLocked();
84     }
85   }
86 }
87 
ResetBackoffLocked()88 void PollingResolver::ResetBackoffLocked() {
89   backoff_.Reset();
90   if (next_resolution_timer_handle_.has_value()) {
91     MaybeCancelNextResolutionTimer();
92     StartResolvingLocked();
93   }
94 }
95 
ShutdownLocked()96 void PollingResolver::ShutdownLocked() {
97   if (GPR_UNLIKELY(tracer_ != nullptr && tracer_->enabled())) {
98     LOG(INFO) << "[polling resolver " << this << "] shutting down";
99   }
100   shutdown_ = true;
101   MaybeCancelNextResolutionTimer();
102   request_.reset();
103 }
104 
ScheduleNextResolutionTimer(Duration delay)105 void PollingResolver::ScheduleNextResolutionTimer(Duration delay) {
106   next_resolution_timer_handle_ =
107       channel_args_.GetObject<EventEngine>()->RunAfter(
108           delay, [self = RefAsSubclass<PollingResolver>()]() mutable {
109             ApplicationCallbackExecCtx callback_exec_ctx;
110             ExecCtx exec_ctx;
111             auto* self_ptr = self.get();
112             self_ptr->work_serializer_->Run(
113                 [self = std::move(self)]() { self->OnNextResolutionLocked(); },
114                 DEBUG_LOCATION);
115           });
116 }
117 
OnNextResolutionLocked()118 void PollingResolver::OnNextResolutionLocked() {
119   if (GPR_UNLIKELY(tracer_ != nullptr && tracer_->enabled())) {
120     LOG(INFO) << "[polling resolver " << this
121               << "] re-resolution timer fired: shutdown_=" << shutdown_;
122   }
123   // If we haven't been cancelled nor shutdown, then start resolving.
124   if (next_resolution_timer_handle_.has_value() && !shutdown_) {
125     next_resolution_timer_handle_.reset();
126     StartResolvingLocked();
127   }
128 }
129 
MaybeCancelNextResolutionTimer()130 void PollingResolver::MaybeCancelNextResolutionTimer() {
131   if (next_resolution_timer_handle_.has_value()) {
132     if (GPR_UNLIKELY(tracer_ != nullptr && tracer_->enabled())) {
133       LOG(INFO) << "[polling resolver " << this
134                 << "] cancel re-resolution timer";
135     }
136     channel_args_.GetObject<EventEngine>()->Cancel(
137         *next_resolution_timer_handle_);
138     next_resolution_timer_handle_.reset();
139   }
140 }
141 
OnRequestComplete(Result result)142 void PollingResolver::OnRequestComplete(Result result) {
143   Ref(DEBUG_LOCATION, "OnRequestComplete").release();
144   work_serializer_->Run(
145       [this, result]() mutable { OnRequestCompleteLocked(std::move(result)); },
146       DEBUG_LOCATION);
147 }
148 
OnRequestCompleteLocked(Result result)149 void PollingResolver::OnRequestCompleteLocked(Result result) {
150   if (GPR_UNLIKELY(tracer_ != nullptr && tracer_->enabled())) {
151     LOG(INFO) << "[polling resolver " << this << "] request complete";
152   }
153   request_.reset();
154   if (!shutdown_) {
155     if (GPR_UNLIKELY(tracer_ != nullptr && tracer_->enabled())) {
156       LOG(INFO)
157           << "[polling resolver " << this << "] returning result: addresses="
158           << (result.addresses.ok()
159                   ? absl::StrCat("<", result.addresses->size(), " addresses>")
160                   : result.addresses.status().ToString())
161           << ", service_config="
162           << (result.service_config.ok()
163                   ? (*result.service_config == nullptr
164                          ? "<null>"
165                          : std::string((*result.service_config)->json_string())
166                                .c_str())
167                   : result.service_config.status().ToString())
168           << ", resolution_note=" << result.resolution_note;
169     }
170     CHECK(result.result_health_callback == nullptr);
171     result.result_health_callback =
172         [self = RefAsSubclass<PollingResolver>(
173              DEBUG_LOCATION, "result_health_callback")](absl::Status status) {
174           self->GetResultStatus(std::move(status));
175         };
176     result_status_state_ = ResultStatusState::kResultHealthCallbackPending;
177     result_handler_->ReportResult(std::move(result));
178   }
179   Unref(DEBUG_LOCATION, "OnRequestComplete");
180 }
181 
GetResultStatus(absl::Status status)182 void PollingResolver::GetResultStatus(absl::Status status) {
183   if (GPR_UNLIKELY(tracer_ != nullptr && tracer_->enabled())) {
184     LOG(INFO) << "[polling resolver " << this
185               << "] result status from channel: " << status;
186   }
187   if (status.ok()) {
188     // Reset backoff state so that we start from the beginning when the
189     // next request gets triggered.
190     backoff_.Reset();
191     // If a re-resolution attempt was requested while the result-status
192     // callback was pending, trigger a new request now.
193     if (std::exchange(result_status_state_, ResultStatusState::kNone) ==
194         ResultStatusState::kReresolutionRequestedWhileCallbackWasPending) {
195       MaybeStartResolvingLocked();
196     }
197   } else {
198     // Set up for retry.
199     const Duration delay = backoff_.NextAttemptDelay();
200     CHECK(!next_resolution_timer_handle_.has_value());
201     if (GPR_UNLIKELY(tracer_ != nullptr && tracer_->enabled())) {
202       LOG(INFO) << "[polling resolver " << this << "] retrying in "
203                 << delay.millis() << " ms";
204     }
205     ScheduleNextResolutionTimer(delay);
206     // Reset result_status_state_.  Note that even if re-resolution was
207     // requested while the result-health callback was pending, we can
208     // ignore it here, because we are in backoff to re-resolve anyway.
209     result_status_state_ = ResultStatusState::kNone;
210   }
211 }
212 
MaybeStartResolvingLocked()213 void PollingResolver::MaybeStartResolvingLocked() {
214   // If there is an existing timer, the time it fires is the earliest time we
215   // can start the next resolution.
216   if (next_resolution_timer_handle_.has_value()) return;
217   if (last_resolution_timestamp_.has_value()) {
218     // InvalidateNow to avoid getting stuck re-initializing this timer
219     // in a loop while draining the currently-held WorkSerializer.
220     // Also see https://github.com/grpc/grpc/issues/26079.
221     ExecCtx::Get()->InvalidateNow();
222     const Timestamp earliest_next_resolution =
223         *last_resolution_timestamp_ + min_time_between_resolutions_;
224     const Duration time_until_next_resolution =
225         earliest_next_resolution - Timestamp::Now();
226     if (time_until_next_resolution > Duration::Zero()) {
227       if (GPR_UNLIKELY(tracer_ != nullptr && tracer_->enabled())) {
228         const Duration last_resolution_ago =
229             Timestamp::Now() - *last_resolution_timestamp_;
230         LOG(INFO) << "[polling resolver " << this
231                   << "] in cooldown from last resolution (from "
232                   << last_resolution_ago.millis()
233                   << " ms ago); will resolve again in "
234                   << time_until_next_resolution.millis() << " ms";
235       }
236       ScheduleNextResolutionTimer(time_until_next_resolution);
237       return;
238     }
239   }
240   StartResolvingLocked();
241 }
242 
StartResolvingLocked()243 void PollingResolver::StartResolvingLocked() {
244   request_ = StartRequest();
245   last_resolution_timestamp_ = Timestamp::Now();
246   if (GPR_UNLIKELY(tracer_ != nullptr && tracer_->enabled())) {
247     if (request_ != nullptr) {
248       LOG(INFO) << "[polling resolver " << this
249                 << "] starting resolution, request_=" << request_.get();
250     } else {
251       LOG(INFO) << "[polling resolver " << this << "] StartRequest failed";
252     }
253   }
254 }
255 
256 }  // namespace grpc_core
257