1 //
2 // Copyright 2015 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 #include <grpc/impl/connectivity_state.h>
18 #include <grpc/support/port_platform.h>
19 #include <inttypes.h>
20 #include <stdlib.h>
21
22 #include <algorithm>
23 #include <atomic>
24 #include <memory>
25 #include <string>
26 #include <utility>
27 #include <vector>
28
29 #include "absl/log/check.h"
30 #include "absl/log/log.h"
31 #include "absl/meta/type_traits.h"
32 #include "absl/random/random.h"
33 #include "absl/status/status.h"
34 #include "absl/status/statusor.h"
35 #include "absl/strings/str_cat.h"
36 #include "absl/strings/string_view.h"
37 #include "absl/types/optional.h"
38 #include "src/core/config/core_configuration.h"
39 #include "src/core/lib/channel/channel_args.h"
40 #include "src/core/lib/debug/trace.h"
41 #include "src/core/lib/transport/connectivity_state.h"
42 #include "src/core/load_balancing/endpoint_list.h"
43 #include "src/core/load_balancing/lb_policy.h"
44 #include "src/core/load_balancing/lb_policy_factory.h"
45 #include "src/core/resolver/endpoint_addresses.h"
46 #include "src/core/util/debug_location.h"
47 #include "src/core/util/json/json.h"
48 #include "src/core/util/orphanable.h"
49 #include "src/core/util/ref_counted_ptr.h"
50 #include "src/core/util/work_serializer.h"
51
52 namespace grpc_core {
53
54 namespace {
55
56 constexpr absl::string_view kRoundRobin = "round_robin";
57
58 class RoundRobin final : public LoadBalancingPolicy {
59 public:
60 explicit RoundRobin(Args args);
61
name() const62 absl::string_view name() const override { return kRoundRobin; }
63
64 absl::Status UpdateLocked(UpdateArgs args) override;
65 void ResetBackoffLocked() override;
66
67 private:
68 class RoundRobinEndpointList final : public EndpointList {
69 public:
RoundRobinEndpointList(RefCountedPtr<RoundRobin> round_robin,EndpointAddressesIterator * endpoints,const ChannelArgs & args,std::string resolution_note,std::vector<std::string> * errors)70 RoundRobinEndpointList(RefCountedPtr<RoundRobin> round_robin,
71 EndpointAddressesIterator* endpoints,
72 const ChannelArgs& args, std::string resolution_note,
73 std::vector<std::string>* errors)
74 : EndpointList(std::move(round_robin), std::move(resolution_note),
75 GRPC_TRACE_FLAG_ENABLED(round_robin)
76 ? "RoundRobinEndpointList"
77 : nullptr) {
78 Init(endpoints, args,
79 [&](RefCountedPtr<EndpointList> endpoint_list,
80 const EndpointAddresses& addresses, const ChannelArgs& args) {
81 return MakeOrphanable<RoundRobinEndpoint>(
82 std::move(endpoint_list), addresses, args,
83 policy<RoundRobin>()->work_serializer(), errors);
84 });
85 }
86
87 private:
88 class RoundRobinEndpoint final : public Endpoint {
89 public:
RoundRobinEndpoint(RefCountedPtr<EndpointList> endpoint_list,const EndpointAddresses & addresses,const ChannelArgs & args,std::shared_ptr<WorkSerializer> work_serializer,std::vector<std::string> * errors)90 RoundRobinEndpoint(RefCountedPtr<EndpointList> endpoint_list,
91 const EndpointAddresses& addresses,
92 const ChannelArgs& args,
93 std::shared_ptr<WorkSerializer> work_serializer,
94 std::vector<std::string>* errors)
95 : Endpoint(std::move(endpoint_list)) {
96 absl::Status status = Init(addresses, args, std::move(work_serializer));
97 if (!status.ok()) {
98 errors->emplace_back(absl::StrCat("endpoint ", addresses.ToString(),
99 ": ", status.ToString()));
100 }
101 }
102
103 private:
104 // Called when the child policy reports a connectivity state update.
105 void OnStateUpdate(absl::optional<grpc_connectivity_state> old_state,
106 grpc_connectivity_state new_state,
107 const absl::Status& status) override;
108 };
109
channel_control_helper() const110 LoadBalancingPolicy::ChannelControlHelper* channel_control_helper()
111 const override {
112 return policy<RoundRobin>()->channel_control_helper();
113 }
114
115 // Updates the counters of children in each state when a
116 // child transitions from old_state to new_state.
117 void UpdateStateCountersLocked(
118 absl::optional<grpc_connectivity_state> old_state,
119 grpc_connectivity_state new_state);
120
121 // Ensures that the right child list is used and then updates
122 // the RR policy's connectivity state based on the child list's
123 // state counters.
124 void MaybeUpdateRoundRobinConnectivityStateLocked(
125 absl::Status status_for_tf);
126
CountersString() const127 std::string CountersString() const {
128 return absl::StrCat("num_children=", size(), " num_ready=", num_ready_,
129 " num_connecting=", num_connecting_,
130 " num_transient_failure=", num_transient_failure_);
131 }
132
133 size_t num_ready_ = 0;
134 size_t num_connecting_ = 0;
135 size_t num_transient_failure_ = 0;
136
137 absl::Status last_failure_;
138 };
139
140 class Picker final : public SubchannelPicker {
141 public:
142 Picker(RoundRobin* parent,
143 std::vector<RefCountedPtr<LoadBalancingPolicy::SubchannelPicker>>
144 pickers);
145
146 PickResult Pick(PickArgs args) override;
147
148 private:
149 // Using pointer value only, no ref held -- do not dereference!
150 RoundRobin* parent_;
151
152 std::atomic<size_t> last_picked_index_;
153 std::vector<RefCountedPtr<LoadBalancingPolicy::SubchannelPicker>> pickers_;
154 };
155
156 ~RoundRobin() override;
157
158 void ShutdownLocked() override;
159
160 // Current child list.
161 OrphanablePtr<RoundRobinEndpointList> endpoint_list_;
162 // Latest pending child list.
163 // When we get an updated address list, we create a new child list
164 // for it here, and we wait to swap it into endpoint_list_ until the new
165 // list becomes READY.
166 OrphanablePtr<RoundRobinEndpointList> latest_pending_endpoint_list_;
167
168 bool shutdown_ = false;
169
170 absl::BitGen bit_gen_;
171 };
172
173 //
174 // RoundRobin::Picker
175 //
176
Picker(RoundRobin * parent,std::vector<RefCountedPtr<LoadBalancingPolicy::SubchannelPicker>> pickers)177 RoundRobin::Picker::Picker(
178 RoundRobin* parent,
179 std::vector<RefCountedPtr<LoadBalancingPolicy::SubchannelPicker>> pickers)
180 : parent_(parent), pickers_(std::move(pickers)) {
181 // For discussion on why we generate a random starting index for
182 // the picker, see https://github.com/grpc/grpc-go/issues/2580.
183 size_t index = absl::Uniform<size_t>(parent->bit_gen_, 0, pickers_.size());
184 last_picked_index_.store(index, std::memory_order_relaxed);
185 GRPC_TRACE_LOG(round_robin, INFO)
186 << "[RR " << parent_ << " picker " << this
187 << "] created picker from endpoint_list=" << parent_->endpoint_list_.get()
188 << " with " << pickers_.size()
189 << " READY children; last_picked_index_=" << index;
190 }
191
Pick(PickArgs args)192 RoundRobin::PickResult RoundRobin::Picker::Pick(PickArgs args) {
193 size_t index = last_picked_index_.fetch_add(1, std::memory_order_relaxed) %
194 pickers_.size();
195 GRPC_TRACE_LOG(round_robin, INFO)
196 << "[RR " << parent_ << " picker " << this << "] using picker index "
197 << index << ", picker=" << pickers_[index].get();
198 return pickers_[index]->Pick(args);
199 }
200
201 //
202 // RoundRobin
203 //
204
RoundRobin(Args args)205 RoundRobin::RoundRobin(Args args) : LoadBalancingPolicy(std::move(args)) {
206 GRPC_TRACE_LOG(round_robin, INFO) << "[RR " << this << "] Created";
207 }
208
~RoundRobin()209 RoundRobin::~RoundRobin() {
210 GRPC_TRACE_LOG(round_robin, INFO)
211 << "[RR " << this << "] Destroying Round Robin policy";
212 CHECK(endpoint_list_ == nullptr);
213 CHECK(latest_pending_endpoint_list_ == nullptr);
214 }
215
ShutdownLocked()216 void RoundRobin::ShutdownLocked() {
217 GRPC_TRACE_LOG(round_robin, INFO) << "[RR " << this << "] Shutting down";
218 shutdown_ = true;
219 endpoint_list_.reset();
220 latest_pending_endpoint_list_.reset();
221 }
222
ResetBackoffLocked()223 void RoundRobin::ResetBackoffLocked() {
224 endpoint_list_->ResetBackoffLocked();
225 if (latest_pending_endpoint_list_ != nullptr) {
226 latest_pending_endpoint_list_->ResetBackoffLocked();
227 }
228 }
229
UpdateLocked(UpdateArgs args)230 absl::Status RoundRobin::UpdateLocked(UpdateArgs args) {
231 EndpointAddressesIterator* addresses = nullptr;
232 if (args.addresses.ok()) {
233 GRPC_TRACE_LOG(round_robin, INFO) << "[RR " << this << "] received update";
234 addresses = args.addresses->get();
235 } else {
236 GRPC_TRACE_LOG(round_robin, INFO)
237 << "[RR " << this
238 << "] received update with address error: " << args.addresses.status();
239 // If we already have a child list, then keep using the existing
240 // list, but still report back that the update was not accepted.
241 if (endpoint_list_ != nullptr) return args.addresses.status();
242 }
243 // Create new child list, replacing the previous pending list, if any.
244 if (GRPC_TRACE_FLAG_ENABLED(round_robin) &&
245 latest_pending_endpoint_list_ != nullptr) {
246 LOG(INFO) << "[RR " << this << "] replacing previous pending child list "
247 << latest_pending_endpoint_list_.get();
248 }
249 std::vector<std::string> errors;
250 latest_pending_endpoint_list_ = MakeOrphanable<RoundRobinEndpointList>(
251 RefAsSubclass<RoundRobin>(DEBUG_LOCATION, "RoundRobinEndpointList"),
252 addresses, args.args, std::move(args.resolution_note), &errors);
253 // If the new list is empty, immediately promote it to
254 // endpoint_list_ and report TRANSIENT_FAILURE.
255 if (latest_pending_endpoint_list_->size() == 0) {
256 if (GRPC_TRACE_FLAG_ENABLED(round_robin) && endpoint_list_ != nullptr) {
257 LOG(INFO) << "[RR " << this << "] replacing previous child list "
258 << endpoint_list_.get();
259 }
260 endpoint_list_ = std::move(latest_pending_endpoint_list_);
261 absl::Status status = args.addresses.ok()
262 ? absl::UnavailableError("empty address list")
263 : args.addresses.status();
264 endpoint_list_->ReportTransientFailure(status);
265 return status;
266 }
267 // Otherwise, if this is the initial update, immediately promote it to
268 // endpoint_list_.
269 if (endpoint_list_ == nullptr) {
270 endpoint_list_ = std::move(latest_pending_endpoint_list_);
271 }
272 if (!errors.empty()) {
273 return absl::UnavailableError(absl::StrCat(
274 "errors from children: [", absl::StrJoin(errors, "; "), "]"));
275 }
276 return absl::OkStatus();
277 }
278
279 //
280 // RoundRobin::RoundRobinEndpointList::RoundRobinEndpoint
281 //
282
OnStateUpdate(absl::optional<grpc_connectivity_state> old_state,grpc_connectivity_state new_state,const absl::Status & status)283 void RoundRobin::RoundRobinEndpointList::RoundRobinEndpoint::OnStateUpdate(
284 absl::optional<grpc_connectivity_state> old_state,
285 grpc_connectivity_state new_state, const absl::Status& status) {
286 auto* rr_endpoint_list = endpoint_list<RoundRobinEndpointList>();
287 auto* round_robin = policy<RoundRobin>();
288 GRPC_TRACE_LOG(round_robin, INFO)
289 << "[RR " << round_robin << "] connectivity changed for child " << this
290 << ", endpoint_list " << rr_endpoint_list << " (index " << Index()
291 << " of " << rr_endpoint_list->size() << "): prev_state="
292 << (old_state.has_value() ? ConnectivityStateName(*old_state) : "N/A")
293 << " new_state=" << ConnectivityStateName(new_state) << " (" << status
294 << ")";
295 if (new_state == GRPC_CHANNEL_IDLE) {
296 GRPC_TRACE_LOG(round_robin, INFO)
297 << "[RR " << round_robin << "] child " << this
298 << " reported IDLE; requesting connection";
299 ExitIdleLocked();
300 }
301 // If state changed, update state counters.
302 if (!old_state.has_value() || *old_state != new_state) {
303 rr_endpoint_list->UpdateStateCountersLocked(old_state, new_state);
304 }
305 // Update the policy state.
306 rr_endpoint_list->MaybeUpdateRoundRobinConnectivityStateLocked(status);
307 }
308
309 //
310 // RoundRobin::RoundRobinEndpointList
311 //
312
UpdateStateCountersLocked(absl::optional<grpc_connectivity_state> old_state,grpc_connectivity_state new_state)313 void RoundRobin::RoundRobinEndpointList::UpdateStateCountersLocked(
314 absl::optional<grpc_connectivity_state> old_state,
315 grpc_connectivity_state new_state) {
316 // We treat IDLE the same as CONNECTING, since it will immediately
317 // transition into that state anyway.
318 if (old_state.has_value()) {
319 CHECK(*old_state != GRPC_CHANNEL_SHUTDOWN);
320 if (*old_state == GRPC_CHANNEL_READY) {
321 CHECK_GT(num_ready_, 0u);
322 --num_ready_;
323 } else if (*old_state == GRPC_CHANNEL_CONNECTING ||
324 *old_state == GRPC_CHANNEL_IDLE) {
325 CHECK_GT(num_connecting_, 0u);
326 --num_connecting_;
327 } else if (*old_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
328 CHECK_GT(num_transient_failure_, 0u);
329 --num_transient_failure_;
330 }
331 }
332 CHECK(new_state != GRPC_CHANNEL_SHUTDOWN);
333 if (new_state == GRPC_CHANNEL_READY) {
334 ++num_ready_;
335 } else if (new_state == GRPC_CHANNEL_CONNECTING ||
336 new_state == GRPC_CHANNEL_IDLE) {
337 ++num_connecting_;
338 } else if (new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
339 ++num_transient_failure_;
340 }
341 }
342
343 void RoundRobin::RoundRobinEndpointList::
MaybeUpdateRoundRobinConnectivityStateLocked(absl::Status status_for_tf)344 MaybeUpdateRoundRobinConnectivityStateLocked(absl::Status status_for_tf) {
345 auto* round_robin = policy<RoundRobin>();
346 // If this is latest_pending_endpoint_list_, then swap it into
347 // endpoint_list_ in the following cases:
348 // - endpoint_list_ has no READY children.
349 // - This list has at least one READY child and we have seen the
350 // initial connectivity state notification for all children.
351 // - All of the children in this list are in TRANSIENT_FAILURE.
352 // (This may cause the channel to go from READY to TRANSIENT_FAILURE,
353 // but we're doing what the control plane told us to do.)
354 if (round_robin->latest_pending_endpoint_list_.get() == this &&
355 (round_robin->endpoint_list_->num_ready_ == 0 ||
356 (num_ready_ > 0 && AllEndpointsSeenInitialState()) ||
357 num_transient_failure_ == size())) {
358 if (GRPC_TRACE_FLAG_ENABLED(round_robin)) {
359 LOG(INFO) << "[RR " << round_robin << "] swapping out child list "
360 << round_robin->endpoint_list_.get() << " ("
361 << round_robin->endpoint_list_->CountersString()
362 << ") in favor of " << this << " (" << CountersString() << ")";
363 }
364 round_robin->endpoint_list_ =
365 std::move(round_robin->latest_pending_endpoint_list_);
366 }
367 // Only set connectivity state if this is the current child list.
368 if (round_robin->endpoint_list_.get() != this) return;
369 // First matching rule wins:
370 // 1) ANY child is READY => policy is READY.
371 // 2) ANY child is CONNECTING => policy is CONNECTING.
372 // 3) ALL children are TRANSIENT_FAILURE => policy is TRANSIENT_FAILURE.
373 if (num_ready_ > 0) {
374 GRPC_TRACE_LOG(round_robin, INFO)
375 << "[RR " << round_robin << "] reporting READY with child list "
376 << this;
377 std::vector<RefCountedPtr<LoadBalancingPolicy::SubchannelPicker>> pickers;
378 for (const auto& endpoint : endpoints()) {
379 auto state = endpoint->connectivity_state();
380 if (state.has_value() && *state == GRPC_CHANNEL_READY) {
381 pickers.push_back(endpoint->picker());
382 }
383 }
384 CHECK(!pickers.empty());
385 round_robin->channel_control_helper()->UpdateState(
386 GRPC_CHANNEL_READY, absl::OkStatus(),
387 MakeRefCounted<Picker>(round_robin, std::move(pickers)));
388 } else if (num_connecting_ > 0) {
389 GRPC_TRACE_LOG(round_robin, INFO)
390 << "[RR " << round_robin << "] reporting CONNECTING with child list "
391 << this;
392 round_robin->channel_control_helper()->UpdateState(
393 GRPC_CHANNEL_CONNECTING, absl::OkStatus(),
394 MakeRefCounted<QueuePicker>(nullptr));
395 } else if (num_transient_failure_ == size()) {
396 GRPC_TRACE_LOG(round_robin, INFO)
397 << "[RR " << round_robin
398 << "] reporting TRANSIENT_FAILURE with child list " << this << ": "
399 << status_for_tf;
400 if (!status_for_tf.ok()) {
401 last_failure_ = absl::UnavailableError(
402 absl::StrCat("connections to all backends failing; last error: ",
403 status_for_tf.message()));
404 }
405 ReportTransientFailure(last_failure_);
406 }
407 }
408
409 //
410 // factory
411 //
412
413 class RoundRobinConfig final : public LoadBalancingPolicy::Config {
414 public:
name() const415 absl::string_view name() const override { return kRoundRobin; }
416 };
417
418 class RoundRobinFactory final : public LoadBalancingPolicyFactory {
419 public:
CreateLoadBalancingPolicy(LoadBalancingPolicy::Args args) const420 OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
421 LoadBalancingPolicy::Args args) const override {
422 return MakeOrphanable<RoundRobin>(std::move(args));
423 }
424
name() const425 absl::string_view name() const override { return kRoundRobin; }
426
427 absl::StatusOr<RefCountedPtr<LoadBalancingPolicy::Config>>
ParseLoadBalancingConfig(const Json &) const428 ParseLoadBalancingConfig(const Json& /*json*/) const override {
429 return MakeRefCounted<RoundRobinConfig>();
430 }
431 };
432
433 } // namespace
434
RegisterRoundRobinLbPolicy(CoreConfiguration::Builder * builder)435 void RegisterRoundRobinLbPolicy(CoreConfiguration::Builder* builder) {
436 builder->lb_policy_registry()->RegisterLoadBalancingPolicyFactory(
437 std::make_unique<RoundRobinFactory>());
438 }
439
440 } // namespace grpc_core
441