• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 // Copyright 2015 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
#include <grpc/impl/connectivity_state.h>
#include <grpc/support/port_platform.h>
#include <inttypes.h>
#include <stdlib.h>

#include <algorithm>
#include <atomic>
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "absl/log/check.h"
#include "absl/log/log.h"
#include "absl/meta/type_traits.h"
#include "absl/random/random.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/str_join.h"
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include "src/core/config/core_configuration.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/load_balancing/endpoint_list.h"
#include "src/core/load_balancing/lb_policy.h"
#include "src/core/load_balancing/lb_policy_factory.h"
#include "src/core/resolver/endpoint_addresses.h"
#include "src/core/util/debug_location.h"
#include "src/core/util/json/json.h"
#include "src/core/util/orphanable.h"
#include "src/core/util/ref_counted_ptr.h"
#include "src/core/util/work_serializer.h"
51 
52 namespace grpc_core {
53 
54 namespace {
55 
56 constexpr absl::string_view kRoundRobin = "round_robin";
57 
58 class RoundRobin final : public LoadBalancingPolicy {
59  public:
60   explicit RoundRobin(Args args);
61 
name() const62   absl::string_view name() const override { return kRoundRobin; }
63 
64   absl::Status UpdateLocked(UpdateArgs args) override;
65   void ResetBackoffLocked() override;
66 
67  private:
68   class RoundRobinEndpointList final : public EndpointList {
69    public:
RoundRobinEndpointList(RefCountedPtr<RoundRobin> round_robin,EndpointAddressesIterator * endpoints,const ChannelArgs & args,std::string resolution_note,std::vector<std::string> * errors)70     RoundRobinEndpointList(RefCountedPtr<RoundRobin> round_robin,
71                            EndpointAddressesIterator* endpoints,
72                            const ChannelArgs& args, std::string resolution_note,
73                            std::vector<std::string>* errors)
74         : EndpointList(std::move(round_robin), std::move(resolution_note),
75                        GRPC_TRACE_FLAG_ENABLED(round_robin)
76                            ? "RoundRobinEndpointList"
77                            : nullptr) {
78       Init(endpoints, args,
79            [&](RefCountedPtr<EndpointList> endpoint_list,
80                const EndpointAddresses& addresses, const ChannelArgs& args) {
81              return MakeOrphanable<RoundRobinEndpoint>(
82                  std::move(endpoint_list), addresses, args,
83                  policy<RoundRobin>()->work_serializer(), errors);
84            });
85     }
86 
87    private:
88     class RoundRobinEndpoint final : public Endpoint {
89      public:
RoundRobinEndpoint(RefCountedPtr<EndpointList> endpoint_list,const EndpointAddresses & addresses,const ChannelArgs & args,std::shared_ptr<WorkSerializer> work_serializer,std::vector<std::string> * errors)90       RoundRobinEndpoint(RefCountedPtr<EndpointList> endpoint_list,
91                          const EndpointAddresses& addresses,
92                          const ChannelArgs& args,
93                          std::shared_ptr<WorkSerializer> work_serializer,
94                          std::vector<std::string>* errors)
95           : Endpoint(std::move(endpoint_list)) {
96         absl::Status status = Init(addresses, args, std::move(work_serializer));
97         if (!status.ok()) {
98           errors->emplace_back(absl::StrCat("endpoint ", addresses.ToString(),
99                                             ": ", status.ToString()));
100         }
101       }
102 
103      private:
104       // Called when the child policy reports a connectivity state update.
105       void OnStateUpdate(absl::optional<grpc_connectivity_state> old_state,
106                          grpc_connectivity_state new_state,
107                          const absl::Status& status) override;
108     };
109 
channel_control_helper() const110     LoadBalancingPolicy::ChannelControlHelper* channel_control_helper()
111         const override {
112       return policy<RoundRobin>()->channel_control_helper();
113     }
114 
115     // Updates the counters of children in each state when a
116     // child transitions from old_state to new_state.
117     void UpdateStateCountersLocked(
118         absl::optional<grpc_connectivity_state> old_state,
119         grpc_connectivity_state new_state);
120 
121     // Ensures that the right child list is used and then updates
122     // the RR policy's connectivity state based on the child list's
123     // state counters.
124     void MaybeUpdateRoundRobinConnectivityStateLocked(
125         absl::Status status_for_tf);
126 
CountersString() const127     std::string CountersString() const {
128       return absl::StrCat("num_children=", size(), " num_ready=", num_ready_,
129                           " num_connecting=", num_connecting_,
130                           " num_transient_failure=", num_transient_failure_);
131     }
132 
133     size_t num_ready_ = 0;
134     size_t num_connecting_ = 0;
135     size_t num_transient_failure_ = 0;
136 
137     absl::Status last_failure_;
138   };
139 
140   class Picker final : public SubchannelPicker {
141    public:
142     Picker(RoundRobin* parent,
143            std::vector<RefCountedPtr<LoadBalancingPolicy::SubchannelPicker>>
144                pickers);
145 
146     PickResult Pick(PickArgs args) override;
147 
148    private:
149     // Using pointer value only, no ref held -- do not dereference!
150     RoundRobin* parent_;
151 
152     std::atomic<size_t> last_picked_index_;
153     std::vector<RefCountedPtr<LoadBalancingPolicy::SubchannelPicker>> pickers_;
154   };
155 
156   ~RoundRobin() override;
157 
158   void ShutdownLocked() override;
159 
160   // Current child list.
161   OrphanablePtr<RoundRobinEndpointList> endpoint_list_;
162   // Latest pending child list.
163   // When we get an updated address list, we create a new child list
164   // for it here, and we wait to swap it into endpoint_list_ until the new
165   // list becomes READY.
166   OrphanablePtr<RoundRobinEndpointList> latest_pending_endpoint_list_;
167 
168   bool shutdown_ = false;
169 
170   absl::BitGen bit_gen_;
171 };
172 
173 //
174 // RoundRobin::Picker
175 //
176 
Picker(RoundRobin * parent,std::vector<RefCountedPtr<LoadBalancingPolicy::SubchannelPicker>> pickers)177 RoundRobin::Picker::Picker(
178     RoundRobin* parent,
179     std::vector<RefCountedPtr<LoadBalancingPolicy::SubchannelPicker>> pickers)
180     : parent_(parent), pickers_(std::move(pickers)) {
181   // For discussion on why we generate a random starting index for
182   // the picker, see https://github.com/grpc/grpc-go/issues/2580.
183   size_t index = absl::Uniform<size_t>(parent->bit_gen_, 0, pickers_.size());
184   last_picked_index_.store(index, std::memory_order_relaxed);
185   GRPC_TRACE_LOG(round_robin, INFO)
186       << "[RR " << parent_ << " picker " << this
187       << "] created picker from endpoint_list=" << parent_->endpoint_list_.get()
188       << " with " << pickers_.size()
189       << " READY children; last_picked_index_=" << index;
190 }
191 
Pick(PickArgs args)192 RoundRobin::PickResult RoundRobin::Picker::Pick(PickArgs args) {
193   size_t index = last_picked_index_.fetch_add(1, std::memory_order_relaxed) %
194                  pickers_.size();
195   GRPC_TRACE_LOG(round_robin, INFO)
196       << "[RR " << parent_ << " picker " << this << "] using picker index "
197       << index << ", picker=" << pickers_[index].get();
198   return pickers_[index]->Pick(args);
199 }
200 
201 //
202 // RoundRobin
203 //
204 
RoundRobin(Args args)205 RoundRobin::RoundRobin(Args args) : LoadBalancingPolicy(std::move(args)) {
206   GRPC_TRACE_LOG(round_robin, INFO) << "[RR " << this << "] Created";
207 }
208 
~RoundRobin()209 RoundRobin::~RoundRobin() {
210   GRPC_TRACE_LOG(round_robin, INFO)
211       << "[RR " << this << "] Destroying Round Robin policy";
212   CHECK(endpoint_list_ == nullptr);
213   CHECK(latest_pending_endpoint_list_ == nullptr);
214 }
215 
ShutdownLocked()216 void RoundRobin::ShutdownLocked() {
217   GRPC_TRACE_LOG(round_robin, INFO) << "[RR " << this << "] Shutting down";
218   shutdown_ = true;
219   endpoint_list_.reset();
220   latest_pending_endpoint_list_.reset();
221 }
222 
ResetBackoffLocked()223 void RoundRobin::ResetBackoffLocked() {
224   endpoint_list_->ResetBackoffLocked();
225   if (latest_pending_endpoint_list_ != nullptr) {
226     latest_pending_endpoint_list_->ResetBackoffLocked();
227   }
228 }
229 
UpdateLocked(UpdateArgs args)230 absl::Status RoundRobin::UpdateLocked(UpdateArgs args) {
231   EndpointAddressesIterator* addresses = nullptr;
232   if (args.addresses.ok()) {
233     GRPC_TRACE_LOG(round_robin, INFO) << "[RR " << this << "] received update";
234     addresses = args.addresses->get();
235   } else {
236     GRPC_TRACE_LOG(round_robin, INFO)
237         << "[RR " << this
238         << "] received update with address error: " << args.addresses.status();
239     // If we already have a child list, then keep using the existing
240     // list, but still report back that the update was not accepted.
241     if (endpoint_list_ != nullptr) return args.addresses.status();
242   }
243   // Create new child list, replacing the previous pending list, if any.
244   if (GRPC_TRACE_FLAG_ENABLED(round_robin) &&
245       latest_pending_endpoint_list_ != nullptr) {
246     LOG(INFO) << "[RR " << this << "] replacing previous pending child list "
247               << latest_pending_endpoint_list_.get();
248   }
249   std::vector<std::string> errors;
250   latest_pending_endpoint_list_ = MakeOrphanable<RoundRobinEndpointList>(
251       RefAsSubclass<RoundRobin>(DEBUG_LOCATION, "RoundRobinEndpointList"),
252       addresses, args.args, std::move(args.resolution_note), &errors);
253   // If the new list is empty, immediately promote it to
254   // endpoint_list_ and report TRANSIENT_FAILURE.
255   if (latest_pending_endpoint_list_->size() == 0) {
256     if (GRPC_TRACE_FLAG_ENABLED(round_robin) && endpoint_list_ != nullptr) {
257       LOG(INFO) << "[RR " << this << "] replacing previous child list "
258                 << endpoint_list_.get();
259     }
260     endpoint_list_ = std::move(latest_pending_endpoint_list_);
261     absl::Status status = args.addresses.ok()
262                               ? absl::UnavailableError("empty address list")
263                               : args.addresses.status();
264     endpoint_list_->ReportTransientFailure(status);
265     return status;
266   }
267   // Otherwise, if this is the initial update, immediately promote it to
268   // endpoint_list_.
269   if (endpoint_list_ == nullptr) {
270     endpoint_list_ = std::move(latest_pending_endpoint_list_);
271   }
272   if (!errors.empty()) {
273     return absl::UnavailableError(absl::StrCat(
274         "errors from children: [", absl::StrJoin(errors, "; "), "]"));
275   }
276   return absl::OkStatus();
277 }
278 
279 //
280 // RoundRobin::RoundRobinEndpointList::RoundRobinEndpoint
281 //
282 
OnStateUpdate(absl::optional<grpc_connectivity_state> old_state,grpc_connectivity_state new_state,const absl::Status & status)283 void RoundRobin::RoundRobinEndpointList::RoundRobinEndpoint::OnStateUpdate(
284     absl::optional<grpc_connectivity_state> old_state,
285     grpc_connectivity_state new_state, const absl::Status& status) {
286   auto* rr_endpoint_list = endpoint_list<RoundRobinEndpointList>();
287   auto* round_robin = policy<RoundRobin>();
288   GRPC_TRACE_LOG(round_robin, INFO)
289       << "[RR " << round_robin << "] connectivity changed for child " << this
290       << ", endpoint_list " << rr_endpoint_list << " (index " << Index()
291       << " of " << rr_endpoint_list->size() << "): prev_state="
292       << (old_state.has_value() ? ConnectivityStateName(*old_state) : "N/A")
293       << " new_state=" << ConnectivityStateName(new_state) << " (" << status
294       << ")";
295   if (new_state == GRPC_CHANNEL_IDLE) {
296     GRPC_TRACE_LOG(round_robin, INFO)
297         << "[RR " << round_robin << "] child " << this
298         << " reported IDLE; requesting connection";
299     ExitIdleLocked();
300   }
301   // If state changed, update state counters.
302   if (!old_state.has_value() || *old_state != new_state) {
303     rr_endpoint_list->UpdateStateCountersLocked(old_state, new_state);
304   }
305   // Update the policy state.
306   rr_endpoint_list->MaybeUpdateRoundRobinConnectivityStateLocked(status);
307 }
308 
309 //
310 // RoundRobin::RoundRobinEndpointList
311 //
312 
UpdateStateCountersLocked(absl::optional<grpc_connectivity_state> old_state,grpc_connectivity_state new_state)313 void RoundRobin::RoundRobinEndpointList::UpdateStateCountersLocked(
314     absl::optional<grpc_connectivity_state> old_state,
315     grpc_connectivity_state new_state) {
316   // We treat IDLE the same as CONNECTING, since it will immediately
317   // transition into that state anyway.
318   if (old_state.has_value()) {
319     CHECK(*old_state != GRPC_CHANNEL_SHUTDOWN);
320     if (*old_state == GRPC_CHANNEL_READY) {
321       CHECK_GT(num_ready_, 0u);
322       --num_ready_;
323     } else if (*old_state == GRPC_CHANNEL_CONNECTING ||
324                *old_state == GRPC_CHANNEL_IDLE) {
325       CHECK_GT(num_connecting_, 0u);
326       --num_connecting_;
327     } else if (*old_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
328       CHECK_GT(num_transient_failure_, 0u);
329       --num_transient_failure_;
330     }
331   }
332   CHECK(new_state != GRPC_CHANNEL_SHUTDOWN);
333   if (new_state == GRPC_CHANNEL_READY) {
334     ++num_ready_;
335   } else if (new_state == GRPC_CHANNEL_CONNECTING ||
336              new_state == GRPC_CHANNEL_IDLE) {
337     ++num_connecting_;
338   } else if (new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
339     ++num_transient_failure_;
340   }
341 }
342 
343 void RoundRobin::RoundRobinEndpointList::
MaybeUpdateRoundRobinConnectivityStateLocked(absl::Status status_for_tf)344     MaybeUpdateRoundRobinConnectivityStateLocked(absl::Status status_for_tf) {
345   auto* round_robin = policy<RoundRobin>();
346   // If this is latest_pending_endpoint_list_, then swap it into
347   // endpoint_list_ in the following cases:
348   // - endpoint_list_ has no READY children.
349   // - This list has at least one READY child and we have seen the
350   //   initial connectivity state notification for all children.
351   // - All of the children in this list are in TRANSIENT_FAILURE.
352   //   (This may cause the channel to go from READY to TRANSIENT_FAILURE,
353   //   but we're doing what the control plane told us to do.)
354   if (round_robin->latest_pending_endpoint_list_.get() == this &&
355       (round_robin->endpoint_list_->num_ready_ == 0 ||
356        (num_ready_ > 0 && AllEndpointsSeenInitialState()) ||
357        num_transient_failure_ == size())) {
358     if (GRPC_TRACE_FLAG_ENABLED(round_robin)) {
359       LOG(INFO) << "[RR " << round_robin << "] swapping out child list "
360                 << round_robin->endpoint_list_.get() << " ("
361                 << round_robin->endpoint_list_->CountersString()
362                 << ") in favor of " << this << " (" << CountersString() << ")";
363     }
364     round_robin->endpoint_list_ =
365         std::move(round_robin->latest_pending_endpoint_list_);
366   }
367   // Only set connectivity state if this is the current child list.
368   if (round_robin->endpoint_list_.get() != this) return;
369   // First matching rule wins:
370   // 1) ANY child is READY => policy is READY.
371   // 2) ANY child is CONNECTING => policy is CONNECTING.
372   // 3) ALL children are TRANSIENT_FAILURE => policy is TRANSIENT_FAILURE.
373   if (num_ready_ > 0) {
374     GRPC_TRACE_LOG(round_robin, INFO)
375         << "[RR " << round_robin << "] reporting READY with child list "
376         << this;
377     std::vector<RefCountedPtr<LoadBalancingPolicy::SubchannelPicker>> pickers;
378     for (const auto& endpoint : endpoints()) {
379       auto state = endpoint->connectivity_state();
380       if (state.has_value() && *state == GRPC_CHANNEL_READY) {
381         pickers.push_back(endpoint->picker());
382       }
383     }
384     CHECK(!pickers.empty());
385     round_robin->channel_control_helper()->UpdateState(
386         GRPC_CHANNEL_READY, absl::OkStatus(),
387         MakeRefCounted<Picker>(round_robin, std::move(pickers)));
388   } else if (num_connecting_ > 0) {
389     GRPC_TRACE_LOG(round_robin, INFO)
390         << "[RR " << round_robin << "] reporting CONNECTING with child list "
391         << this;
392     round_robin->channel_control_helper()->UpdateState(
393         GRPC_CHANNEL_CONNECTING, absl::OkStatus(),
394         MakeRefCounted<QueuePicker>(nullptr));
395   } else if (num_transient_failure_ == size()) {
396     GRPC_TRACE_LOG(round_robin, INFO)
397         << "[RR " << round_robin
398         << "] reporting TRANSIENT_FAILURE with child list " << this << ": "
399         << status_for_tf;
400     if (!status_for_tf.ok()) {
401       last_failure_ = absl::UnavailableError(
402           absl::StrCat("connections to all backends failing; last error: ",
403                        status_for_tf.message()));
404     }
405     ReportTransientFailure(last_failure_);
406   }
407 }
408 
409 //
410 // factory
411 //
412 
413 class RoundRobinConfig final : public LoadBalancingPolicy::Config {
414  public:
name() const415   absl::string_view name() const override { return kRoundRobin; }
416 };
417 
418 class RoundRobinFactory final : public LoadBalancingPolicyFactory {
419  public:
CreateLoadBalancingPolicy(LoadBalancingPolicy::Args args) const420   OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
421       LoadBalancingPolicy::Args args) const override {
422     return MakeOrphanable<RoundRobin>(std::move(args));
423   }
424 
name() const425   absl::string_view name() const override { return kRoundRobin; }
426 
427   absl::StatusOr<RefCountedPtr<LoadBalancingPolicy::Config>>
ParseLoadBalancingConfig(const Json &) const428   ParseLoadBalancingConfig(const Json& /*json*/) const override {
429     return MakeRefCounted<RoundRobinConfig>();
430   }
431 };
432 
433 }  // namespace
434 
RegisterRoundRobinLbPolicy(CoreConfiguration::Builder * builder)435 void RegisterRoundRobinLbPolicy(CoreConfiguration::Builder* builder) {
436   builder->lb_policy_registry()->RegisterLoadBalancingPolicyFactory(
437       std::make_unique<RoundRobinFactory>());
438 }
439 
440 }  // namespace grpc_core
441