• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 // Copyright 2016 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 /// Implementation of the gRPC LB policy.
18 ///
19 /// This policy takes as input a list of resolved addresses, which must
20 /// include at least one balancer address.
21 ///
22 /// An internal channel (\a lb_channel_) is created for the addresses
/// that are balancers.  This channel behaves just like a regular
24 /// channel that uses pick_first to select from the list of balancer
25 /// addresses.
26 ///
27 /// When we get our initial update, we instantiate the internal *streaming*
28 /// call to the LB server (whichever address pick_first chose).  The call
29 /// will be complete when either the balancer sends status or when we cancel
/// the call (e.g., because we are shutting down).  If needed, we retry the
31 /// call.  If we received at least one valid message from the server, a new
32 /// call attempt will be made immediately; otherwise, we apply back-off
33 /// delays between attempts.
34 ///
35 /// We maintain an internal round_robin policy instance for distributing
36 /// requests across backends.  Whenever we receive a new serverlist from
37 /// the balancer, we update the round_robin policy with the new list of
38 /// addresses.  If we cannot communicate with the balancer on startup,
39 /// however, we may enter fallback mode, in which case we will populate
40 /// the child policy's addresses from the backend addresses returned by the
41 /// resolver.
42 ///
43 /// Once a child policy instance is in place (and getting updated as described),
44 /// calls for a pick, a ping, or a cancellation will be serviced right
45 /// away by forwarding them to the child policy instance.  Any time there's no
46 /// child policy available (i.e., right after the creation of the gRPCLB
47 /// policy), pick requests are queued.
48 ///
49 /// \see https://github.com/grpc/grpc/blob/master/doc/load-balancing.md for the
50 /// high level design and details.
51 
52 #include "src/core/load_balancing/grpclb/grpclb.h"
53 
54 #include <grpc/event_engine/event_engine.h>
55 #include <grpc/impl/channel_arg_names.h>
56 #include <grpc/support/json.h>
57 #include <grpc/support/port_platform.h>
58 
59 // IWYU pragma: no_include <sys/socket.h>
60 
61 #include <grpc/byte_buffer.h>
62 #include <grpc/byte_buffer_reader.h>
63 #include <grpc/grpc.h>
64 #include <grpc/impl/connectivity_state.h>
65 #include <grpc/impl/propagation_bits.h>
66 #include <grpc/slice.h>
67 #include <grpc/status.h>
68 #include <grpc/support/alloc.h>
69 #include <inttypes.h>
70 #include <string.h>
71 
72 #include <algorithm>
73 #include <atomic>
74 #include <map>
75 #include <memory>
76 #include <string>
77 #include <type_traits>
78 #include <utility>
79 #include <vector>
80 
81 #include "absl/container/inlined_vector.h"
82 #include "absl/functional/function_ref.h"
83 #include "absl/log/check.h"
84 #include "absl/log/globals.h"
85 #include "absl/log/log.h"
86 #include "absl/status/status.h"
87 #include "absl/status/statusor.h"
88 #include "absl/strings/str_cat.h"
89 #include "absl/strings/str_format.h"
90 #include "absl/strings/str_join.h"
91 #include "absl/strings/string_view.h"
92 #include "absl/types/optional.h"
93 #include "absl/types/variant.h"
94 #include "src/core/channelz/channelz.h"
95 #include "src/core/client_channel/client_channel_filter.h"
96 #include "src/core/config/core_configuration.h"
97 #include "src/core/lib/address_utils/sockaddr_utils.h"
98 #include "src/core/lib/channel/channel_args.h"
99 #include "src/core/lib/debug/trace.h"
100 #include "src/core/lib/experiments/experiments.h"
101 #include "src/core/lib/iomgr/closure.h"
102 #include "src/core/lib/iomgr/error.h"
103 #include "src/core/lib/iomgr/exec_ctx.h"
104 #include "src/core/lib/iomgr/pollset_set.h"
105 #include "src/core/lib/iomgr/resolved_address.h"
106 #include "src/core/lib/iomgr/sockaddr.h"
107 #include "src/core/lib/iomgr/socket_utils.h"
108 #include "src/core/lib/security/credentials/credentials.h"
109 #include "src/core/lib/slice/slice.h"
110 #include "src/core/lib/slice/slice_string_helpers.h"
111 #include "src/core/lib/surface/call.h"
112 #include "src/core/lib/surface/channel.h"
113 #include "src/core/lib/surface/channel_stack_type.h"
114 #include "src/core/lib/transport/connectivity_state.h"
115 #include "src/core/lib/transport/metadata_batch.h"
116 #include "src/core/load_balancing/child_policy_handler.h"
117 #include "src/core/load_balancing/delegating_helper.h"
118 #include "src/core/load_balancing/grpclb/client_load_reporting_filter.h"
119 #include "src/core/load_balancing/grpclb/grpclb_balancer_addresses.h"
120 #include "src/core/load_balancing/grpclb/grpclb_client_stats.h"
121 #include "src/core/load_balancing/grpclb/load_balancer_api.h"
122 #include "src/core/load_balancing/lb_policy.h"
123 #include "src/core/load_balancing/lb_policy_factory.h"
124 #include "src/core/load_balancing/lb_policy_registry.h"
125 #include "src/core/load_balancing/subchannel_interface.h"
126 #include "src/core/resolver/endpoint_addresses.h"
127 #include "src/core/resolver/fake/fake_resolver.h"
128 #include "src/core/resolver/resolver.h"
129 #include "src/core/util/backoff.h"
130 #include "src/core/util/crash.h"
131 #include "src/core/util/debug_location.h"
132 #include "src/core/util/json/json.h"
133 #include "src/core/util/json/json_args.h"
134 #include "src/core/util/json/json_object_loader.h"
135 #include "src/core/util/orphanable.h"
136 #include "src/core/util/ref_counted.h"
137 #include "src/core/util/ref_counted_ptr.h"
138 #include "src/core/util/status_helper.h"
139 #include "src/core/util/string.h"
140 #include "src/core/util/time.h"
141 #include "src/core/util/useful.h"
142 #include "src/core/util/validation_errors.h"
143 #include "src/core/util/work_serializer.h"
144 #include "upb/mem/arena.hpp"
145 
// Backoff parameters for retrying the balancer call after it fails.
#define GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS 1
#define GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER 1.6
#define GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS 120
#define GRPC_GRPCLB_RECONNECT_JITTER 0.2
// Default time to wait for a serverlist at startup before entering
// fallback mode (overridable via channel arg).
#define GRPC_GRPCLB_DEFAULT_FALLBACK_TIMEOUT_MS 10000
// Default time that deleted subchannels are kept in the cache before
// being released (overridable via channel arg).
#define GRPC_GRPCLB_DEFAULT_SUBCHANNEL_DELETION_DELAY_MS 10000

// Channel arg used to enable load reporting filter.
#define GRPC_ARG_GRPCLB_ENABLE_LOAD_REPORTING_FILTER \
  "grpc.internal.grpclb_enable_load_reporting_filter"
156 
157 namespace grpc_core {
158 
159 namespace {
160 
using ::grpc_event_engine::experimental::EventEngine;

// Name of this LB policy.
constexpr absl::string_view kGrpclb = "grpclb";
164 
165 class GrpcLbConfig final : public LoadBalancingPolicy::Config {
166  public:
167   GrpcLbConfig() = default;
168 
169   GrpcLbConfig(const GrpcLbConfig&) = delete;
170   GrpcLbConfig& operator=(const GrpcLbConfig&) = delete;
171 
172   GrpcLbConfig(GrpcLbConfig&& other) = delete;
173   GrpcLbConfig& operator=(GrpcLbConfig&& other) = delete;
174 
JsonLoader(const JsonArgs &)175   static const JsonLoaderInterface* JsonLoader(const JsonArgs&) {
176     static const auto* loader =
177         JsonObjectLoader<GrpcLbConfig>()
178             // Note: "childPolicy" field requires custom parsing, so
179             // it's handled in JsonPostLoad() instead.
180             .OptionalField("serviceName", &GrpcLbConfig::service_name_)
181             .Finish();
182     return loader;
183   }
184 
JsonPostLoad(const Json & json,const JsonArgs &,ValidationErrors * errors)185   void JsonPostLoad(const Json& json, const JsonArgs&,
186                     ValidationErrors* errors) {
187     ValidationErrors::ScopedField field(errors, ".childPolicy");
188     Json child_policy_config_json_tmp;
189     const Json* child_policy_config_json;
190     auto it = json.object().find("childPolicy");
191     if (it == json.object().end()) {
192       child_policy_config_json_tmp = Json::FromArray({Json::FromObject({
193           {"round_robin", Json::FromObject({})},
194       })});
195       child_policy_config_json = &child_policy_config_json_tmp;
196     } else {
197       child_policy_config_json = &it->second;
198     }
199     auto child_policy_config =
200         CoreConfiguration::Get().lb_policy_registry().ParseLoadBalancingConfig(
201             *child_policy_config_json);
202     if (!child_policy_config.ok()) {
203       errors->AddError(child_policy_config.status().message());
204       return;
205     }
206     child_policy_ = std::move(*child_policy_config);
207   }
208 
name() const209   absl::string_view name() const override { return kGrpclb; }
210 
child_policy() const211   RefCountedPtr<LoadBalancingPolicy::Config> child_policy() const {
212     return child_policy_;
213   }
214 
service_name() const215   const std::string& service_name() const { return service_name_; }
216 
217  private:
218   RefCountedPtr<LoadBalancingPolicy::Config> child_policy_;
219   std::string service_name_;
220 };
221 
// The grpclb LB policy.  Maintains a streaming call to an external
// balancer server and distributes requests across the returned
// backends via a child policy (see file-level comment).
class GrpcLb final : public LoadBalancingPolicy {
 public:
  explicit GrpcLb(Args args);

  absl::string_view name() const override { return kGrpclb; }

  absl::Status UpdateLocked(UpdateArgs args) override;
  void ResetBackoffLocked() override;

 private:
  /// Contains a call to the LB server and all the data related to the call.
  class BalancerCallState final
      : public InternallyRefCounted<BalancerCallState> {
   public:
    explicit BalancerCallState(
        RefCountedPtr<LoadBalancingPolicy> parent_grpclb_policy);
    ~BalancerCallState() override;

    // It's the caller's responsibility to ensure that Orphan() is called from
    // inside the combiner.
    void Orphan() override;

    // Starts the streaming call to the balancer.
    void StartQuery();

    GrpcLbClientStats* client_stats() const { return client_stats_.get(); }

    bool seen_initial_response() const { return seen_initial_response_; }
    bool seen_serverlist() const { return seen_serverlist_; }

   private:
    // Downcast of the owning policy; grpclb_policy_ always holds a
    // GrpcLb instance, so the static_cast is safe.
    GrpcLb* grpclb_policy() const {
      return static_cast<GrpcLb*>(grpclb_policy_.get());
    }

    void ScheduleNextClientLoadReportLocked();
    void SendClientLoadReportLocked();

    // EventEngine callbacks
    void MaybeSendClientLoadReportLocked();

    // Static callbacks invoked on batch completion; each forwards to
    // the corresponding *Locked() method below.
    static void ClientLoadReportDone(void* arg, grpc_error_handle error);
    static void OnInitialRequestSent(void* arg, grpc_error_handle error);
    static void OnBalancerMessageReceived(void* arg, grpc_error_handle error);
    static void OnBalancerStatusReceived(void* arg, grpc_error_handle error);

    void ClientLoadReportDoneLocked(grpc_error_handle error);
    void OnInitialRequestSentLocked();
    void OnBalancerMessageReceivedLocked();
    void OnBalancerStatusReceivedLocked(grpc_error_handle error);

    // The owning LB policy.
    RefCountedPtr<LoadBalancingPolicy> grpclb_policy_;

    // The streaming call to the LB server. Always non-NULL.
    grpc_call* lb_call_ = nullptr;

    // recv_initial_metadata
    grpc_metadata_array lb_initial_metadata_recv_;

    // send_message
    grpc_byte_buffer* send_message_payload_ = nullptr;
    grpc_closure lb_on_initial_request_sent_;

    // recv_message
    grpc_byte_buffer* recv_message_payload_ = nullptr;
    grpc_closure lb_on_balancer_message_received_;
    bool seen_initial_response_ = false;
    bool seen_serverlist_ = false;

    // recv_trailing_metadata
    grpc_closure lb_on_balancer_status_received_;
    grpc_metadata_array lb_trailing_metadata_recv_;
    grpc_status_code lb_call_status_;
    grpc_slice lb_call_status_details_;

    // The stats for client-side load reporting associated with this LB call.
    // Created after the first serverlist is received.
    RefCountedPtr<GrpcLbClientStats> client_stats_;
    Duration client_stats_report_interval_;
    absl::optional<EventEngine::TaskHandle> client_load_report_handle_;
    bool last_client_load_report_counters_were_zero_ = false;
    bool client_load_report_is_due_ = false;
    // The closure used for the completion of sending the load report.
    grpc_closure client_load_report_done_closure_;
  };

  // Wraps a subchannel so that the LB token and client stats object can
  // be retrieved from it at pick time.
  class SubchannelWrapper final : public DelegatingSubchannel {
   public:
    SubchannelWrapper(RefCountedPtr<SubchannelInterface> subchannel,
                      RefCountedPtr<GrpcLb> lb_policy,
                      grpc_event_engine::experimental::Slice lb_token,
                      RefCountedPtr<GrpcLbClientStats> client_stats)
        : DelegatingSubchannel(std::move(subchannel)),
          lb_policy_(std::move(lb_policy)),
          lb_token_(std::move(lb_token)),
          client_stats_(std::move(client_stats)) {}

    const grpc_event_engine::experimental::Slice& lb_token() const {
      return lb_token_;
    }
    GrpcLbClientStats* client_stats() const { return client_stats_.get(); }

   private:
    // On orphaning, hands the wrapped subchannel to the policy's
    // deleted-subchannel cache (unless the policy is shutting down),
    // hopping onto the work serializer when dispatch is enabled.
    void Orphaned() override {
      if (!IsWorkSerializerDispatchEnabled()) {
        if (!lb_policy_->shutting_down_) {
          lb_policy_->CacheDeletedSubchannelLocked(wrapped_subchannel());
        }
        return;
      }
      lb_policy_->work_serializer()->Run(
          [self = WeakRefAsSubclass<SubchannelWrapper>()]() {
            if (!self->lb_policy_->shutting_down_) {
              self->lb_policy_->CacheDeletedSubchannelLocked(
                  self->wrapped_subchannel());
            }
          },
          DEBUG_LOCATION);
    }

    RefCountedPtr<GrpcLb> lb_policy_;
    grpc_event_engine::experimental::Slice lb_token_;
    RefCountedPtr<GrpcLbClientStats> client_stats_;
  };

  // Channel arg wrapper used to carry the LB token and client stats for
  // each address down to subchannel creation.
  class TokenAndClientStatsArg final
      : public RefCounted<TokenAndClientStatsArg> {
   public:
    TokenAndClientStatsArg(grpc_event_engine::experimental::Slice lb_token,
                           RefCountedPtr<GrpcLbClientStats> client_stats)
        : lb_token_(std::move(lb_token)),
          client_stats_(std::move(client_stats)) {}

    static absl::string_view ChannelArgName() {
      return GRPC_ARG_NO_SUBCHANNEL_PREFIX "grpclb_token_and_client_stats";
    }

    // Orders by LB token, then by client stats object pointer.
    static int ChannelArgsCompare(const TokenAndClientStatsArg* a,
                                  const TokenAndClientStatsArg* b) {
      int r =
          a->lb_token_.as_string_view().compare(b->lb_token_.as_string_view());
      if (r != 0) return r;
      return QsortCompare(a->client_stats_.get(), b->client_stats_.get());
    }

    const grpc_event_engine::experimental::Slice& lb_token() const {
      return lb_token_;
    }
    RefCountedPtr<GrpcLbClientStats> client_stats() const {
      return client_stats_;
    }

   private:
    grpc_event_engine::experimental::Slice lb_token_;
    RefCountedPtr<GrpcLbClientStats> client_stats_;
  };

  // A serverlist received from the balancer.
  class Serverlist final : public RefCounted<Serverlist> {
   public:
    // Takes ownership of serverlist.
    explicit Serverlist(std::vector<GrpcLbServer> serverlist)
        : serverlist_(std::move(serverlist)) {}

    bool operator==(const Serverlist& other) const;

    const std::vector<GrpcLbServer>& serverlist() const { return serverlist_; }

    // Returns a text representation suitable for logging.
    std::string AsText() const;

    // Extracts all non-drop entries into an EndpointAddressesIterator.
    std::shared_ptr<EndpointAddressesIterator> GetServerAddressList(
        GrpcLbClientStats* client_stats);

    // Returns true if the serverlist contains at least one drop entry and
    // no backend address entries.
    bool ContainsAllDropEntries() const;

    // Returns the LB token to use for a drop, or null if the call
    // should not be dropped.
    //
    // Note: This is called from the picker, NOT from inside the control
    // plane work_serializer.
    const char* ShouldDrop();

   private:
    class AddressIterator;

    std::vector<GrpcLbServer> serverlist_;

    // Accessed from the picker, so needs synchronization.
    std::atomic<size_t> drop_index_{0};
  };

  // Picker handed to the channel: applies drops dictated by the
  // serverlist, otherwise delegates to the child policy's picker.
  class Picker final : public SubchannelPicker {
   public:
    Picker(RefCountedPtr<Serverlist> serverlist,
           RefCountedPtr<SubchannelPicker> child_picker,
           RefCountedPtr<GrpcLbClientStats> client_stats)
        : serverlist_(std::move(serverlist)),
          child_picker_(std::move(child_picker)),
          client_stats_(std::move(client_stats)) {}

    PickResult Pick(PickArgs args) override;

   private:
    // A subchannel call tracker that unrefs the GrpcLbClientStats object
    // in the case where the subchannel call is never actually started,
    // since the client load reporting filter will not be able to do it
    // in that case.
    class SubchannelCallTracker final : public SubchannelCallTrackerInterface {
     public:
      SubchannelCallTracker(
          RefCountedPtr<GrpcLbClientStats> client_stats,
          std::unique_ptr<SubchannelCallTrackerInterface> original_call_tracker)
          : client_stats_(std::move(client_stats)),
            original_call_tracker_(std::move(original_call_tracker)) {}

      void Start() override {
        if (original_call_tracker_ != nullptr) {
          original_call_tracker_->Start();
        }
        // If we're actually starting the subchannel call, then the
        // client load reporting filter will take ownership of the ref
        // passed down to it via metadata.
        client_stats_.release();
      }

      void Finish(FinishArgs args) override {
        if (original_call_tracker_ != nullptr) {
          original_call_tracker_->Finish(args);
        }
      }

     private:
      RefCountedPtr<GrpcLbClientStats> client_stats_;
      std::unique_ptr<SubchannelCallTrackerInterface> original_call_tracker_;
    };

    // Serverlist to be used for determining drops.
    RefCountedPtr<Serverlist> serverlist_;

    RefCountedPtr<SubchannelPicker> child_picker_;
    RefCountedPtr<GrpcLbClientStats> client_stats_;
  };

  // Helper passed to the child policy; intercepts subchannel creation
  // and state updates (definitions are out-of-class).
  class Helper final
      : public ParentOwningDelegatingChannelControlHelper<GrpcLb> {
   public:
    explicit Helper(RefCountedPtr<GrpcLb> parent)
        : ParentOwningDelegatingChannelControlHelper(std::move(parent)) {}

    RefCountedPtr<SubchannelInterface> CreateSubchannel(
        const grpc_resolved_address& address,
        const ChannelArgs& per_address_args, const ChannelArgs& args) override;
    void UpdateState(grpc_connectivity_state state, const absl::Status& status,
                     RefCountedPtr<SubchannelPicker> picker) override;
    void RequestReresolution() override;
  };

  // Watches the balancer channel's connectivity state while the
  // fallback-at-startup checks are pending.
  class StateWatcher final : public AsyncConnectivityStateWatcherInterface {
   public:
    explicit StateWatcher(RefCountedPtr<GrpcLb> parent)
        : AsyncConnectivityStateWatcherInterface(parent->work_serializer()),
          parent_(std::move(parent)) {}

    ~StateWatcher() override { parent_.reset(DEBUG_LOCATION, "StateWatcher"); }

   private:
    void OnConnectivityStateChange(grpc_connectivity_state new_state,
                                   const absl::Status& status) override {
      if (parent_->fallback_at_startup_checks_pending_ &&
          new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
        // In TRANSIENT_FAILURE.  Cancel the fallback timer and go into
        // fallback mode immediately.
        GRPC_TRACE_LOG(glb, INFO)
            << "[grpclb " << parent_.get()
            << "] balancer channel in state:TRANSIENT_FAILURE ("
            << status.ToString() << "); entering fallback mode";
        parent_->fallback_at_startup_checks_pending_ = false;
        parent_->channel_control_helper()->GetEventEngine()->Cancel(
            *parent_->lb_fallback_timer_handle_);
        parent_->fallback_mode_ = true;
        parent_->CreateOrUpdateChildPolicyLocked();
        // Cancel the watch, since we don't care about the channel state once we
        // go into fallback mode.
        parent_->CancelBalancerChannelConnectivityWatchLocked();
      }
    }

    RefCountedPtr<GrpcLb> parent_;
  };

  class NullLbTokenEndpointIterator;

  void ShutdownLocked() override;

  // Helper functions used in UpdateLocked().
  absl::Status UpdateBalancerChannelLocked();

  void CancelBalancerChannelConnectivityWatchLocked();

  // Methods for dealing with fallback state.
  void MaybeEnterFallbackModeAfterStartup();
  void OnFallbackTimerLocked();

  // Methods for dealing with the balancer call.
  void StartBalancerCallLocked();
  void StartBalancerCallRetryTimerLocked();
  void OnBalancerCallRetryTimerLocked();

  // Methods for dealing with the child policy.
  ChannelArgs CreateChildPolicyArgsLocked(
      bool is_backend_from_grpclb_load_balancer);
  OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
      const ChannelArgs& args);
  void CreateOrUpdateChildPolicyLocked();

  // Subchannel caching.
  void CacheDeletedSubchannelLocked(
      RefCountedPtr<SubchannelInterface> subchannel);
  void StartSubchannelCacheTimerLocked();
  void OnSubchannelCacheTimerLocked();

  // Configurations for the policy.
  RefCountedPtr<GrpcLbConfig> config_;

  // Current channel args from the resolver.
  ChannelArgs args_;

  // Internal state.
  bool shutting_down_ = false;

  // The channel for communicating with the LB server.
  RefCountedPtr<Channel> lb_channel_;
  StateWatcher* watcher_ = nullptr;
  // Response generator to inject address updates into lb_channel_.
  RefCountedPtr<FakeResolverResponseGenerator> response_generator_;
  // Parent channelz node.
  RefCountedPtr<channelz::ChannelNode> parent_channelz_node_;

  // The data associated with the current LB call. It holds a ref to this LB
  // policy. It's initialized every time we query for backends. It's reset to
  // NULL whenever the current LB call is no longer needed (e.g., the LB policy
  // is shutting down, or the LB call has ended). A non-NULL lb_calld_ always
  // contains a non-NULL lb_call_.
  OrphanablePtr<BalancerCallState> lb_calld_;
  // Timeout for the LB call. 0 means no deadline.
  const Duration lb_call_timeout_;
  // Balancer call retry state.
  BackOff lb_call_backoff_;
  absl::optional<EventEngine::TaskHandle> lb_call_retry_timer_handle_;

  // The deserialized response from the balancer. May be nullptr until one
  // such response has arrived.
  RefCountedPtr<Serverlist> serverlist_;

  // Whether we're in fallback mode.
  bool fallback_mode_ = false;
  // The backend addresses from the resolver.
  absl::StatusOr<std::shared_ptr<NullLbTokenEndpointIterator>>
      fallback_backend_addresses_;
  // The last resolution note from our parent.
  // To be passed to child policy when fallback_backend_addresses_ is empty.
  std::string resolution_note_;
  // State for fallback-at-startup checks.
  // Timeout after startup after which we will go into fallback mode if
  // we have not received a serverlist from the balancer.
  const Duration fallback_at_startup_timeout_;
  bool fallback_at_startup_checks_pending_ = false;
  absl::optional<EventEngine::TaskHandle> lb_fallback_timer_handle_;

  // The child policy to use for the backends.
  OrphanablePtr<LoadBalancingPolicy> child_policy_;
  // Child policy in state READY.
  bool child_policy_ready_ = false;

  // Deleted subchannel caching.
  const Duration subchannel_cache_interval_;
  std::map<Timestamp /*deletion time*/,
           std::vector<RefCountedPtr<SubchannelInterface>>>
      cached_subchannels_;
  absl::optional<EventEngine::TaskHandle> subchannel_cache_timer_handle_;
};
606 
607 //
608 // GrpcLb::Serverlist::AddressIterator
609 //
610 
IsServerValid(const GrpcLbServer & server,size_t idx,bool log)611 bool IsServerValid(const GrpcLbServer& server, size_t idx, bool log) {
612   if (server.drop) return false;
613   if (GPR_UNLIKELY(server.port >> 16 != 0)) {
614     if (log) {
615       LOG(ERROR) << "Invalid port '" << server.port << "' at index " << idx
616                  << " of serverlist. Ignoring.";
617     }
618     return false;
619   }
620   if (GPR_UNLIKELY(server.ip_size != 4 && server.ip_size != 16)) {
621     if (log) {
622       LOG(ERROR) << "Expected IP to be 4 or 16 bytes, got " << server.ip_size
623                  << " at index " << idx << " of serverlist. Ignoring";
624     }
625     return false;
626   }
627   return true;
628 }
629 
ParseServer(const GrpcLbServer & server,grpc_resolved_address * addr)630 void ParseServer(const GrpcLbServer& server, grpc_resolved_address* addr) {
631   memset(addr, 0, sizeof(*addr));
632   if (server.drop) return;
633   const uint16_t netorder_port = grpc_htons(static_cast<uint16_t>(server.port));
634   // the addresses are given in binary format (a in(6)_addr struct) in
635   // server->ip_address.bytes.
636   if (server.ip_size == 4) {
637     addr->len = static_cast<socklen_t>(sizeof(grpc_sockaddr_in));
638     grpc_sockaddr_in* addr4 = reinterpret_cast<grpc_sockaddr_in*>(&addr->addr);
639     addr4->sin_family = GRPC_AF_INET;
640     memcpy(&addr4->sin_addr, server.ip_addr, server.ip_size);
641     addr4->sin_port = netorder_port;
642   } else if (server.ip_size == 16) {
643     addr->len = static_cast<socklen_t>(sizeof(grpc_sockaddr_in6));
644     grpc_sockaddr_in6* addr6 =
645         reinterpret_cast<grpc_sockaddr_in6*>(&addr->addr);
646     addr6->sin6_family = GRPC_AF_INET6;
647     memcpy(&addr6->sin6_addr, server.ip_addr, server.ip_size);
648     addr6->sin6_port = netorder_port;
649   }
650 }
651 
652 class GrpcLb::Serverlist::AddressIterator final
653     : public EndpointAddressesIterator {
654  public:
AddressIterator(RefCountedPtr<Serverlist> serverlist,RefCountedPtr<GrpcLbClientStats> client_stats)655   AddressIterator(RefCountedPtr<Serverlist> serverlist,
656                   RefCountedPtr<GrpcLbClientStats> client_stats)
657       : serverlist_(std::move(serverlist)),
658         client_stats_(std::move(client_stats)) {}
659 
ForEach(absl::FunctionRef<void (const EndpointAddresses &)> callback) const660   void ForEach(absl::FunctionRef<void(const EndpointAddresses&)> callback)
661       const override {
662     for (size_t i = 0; i < serverlist_->serverlist_.size(); ++i) {
663       const GrpcLbServer& server = serverlist_->serverlist_[i];
664       if (!IsServerValid(server, i, false)) continue;
665       // Address processing.
666       grpc_resolved_address addr;
667       ParseServer(server, &addr);
668       // LB token processing.
669       const size_t lb_token_length = strnlen(
670           server.load_balance_token, GPR_ARRAY_SIZE(server.load_balance_token));
671       auto lb_token = grpc_event_engine::experimental::Slice::FromCopiedBuffer(
672           server.load_balance_token, lb_token_length);
673       if (lb_token.empty()) {
674         auto addr_uri = grpc_sockaddr_to_uri(&addr);
675         GRPC_TRACE_LOG(glb, INFO)
676             << "Missing LB token for backend address '"
677             << (addr_uri.ok() ? *addr_uri : addr_uri.status().ToString())
678             << "'. The empty token will be used instead";
679       }
680       // Return address with a channel arg containing LB token and stats object.
681       callback(EndpointAddresses(
682           addr, ChannelArgs().SetObject(MakeRefCounted<TokenAndClientStatsArg>(
683                     std::move(lb_token), client_stats_))));
684     }
685   }
686 
687  private:
688   RefCountedPtr<Serverlist> serverlist_;
689   RefCountedPtr<GrpcLbClientStats> client_stats_;
690 };
691 
692 //
693 // GrpcLb::Serverlist
694 //
695 
operator ==(const Serverlist & other) const696 bool GrpcLb::Serverlist::operator==(const Serverlist& other) const {
697   return serverlist_ == other.serverlist_;
698 }
699 
AsText() const700 std::string GrpcLb::Serverlist::AsText() const {
701   std::vector<std::string> entries;
702   for (size_t i = 0; i < serverlist_.size(); ++i) {
703     const GrpcLbServer& server = serverlist_[i];
704     std::string ipport;
705     if (server.drop) {
706       ipport = "(drop)";
707     } else {
708       grpc_resolved_address addr;
709       ParseServer(server, &addr);
710       auto addr_str = grpc_sockaddr_to_string(&addr, false);
711       ipport = addr_str.ok() ? *addr_str : addr_str.status().ToString();
712     }
713     entries.push_back(absl::StrFormat("  %" PRIuPTR ": %s token=%s\n", i,
714                                       ipport, server.load_balance_token));
715   }
716   return absl::StrJoin(entries, "");
717 }
718 
719 // Returns addresses extracted from the serverlist.
720 std::shared_ptr<EndpointAddressesIterator>
GetServerAddressList(GrpcLbClientStats * client_stats)721 GrpcLb::Serverlist::GetServerAddressList(GrpcLbClientStats* client_stats) {
722   RefCountedPtr<GrpcLbClientStats> stats;
723   if (client_stats != nullptr) stats = client_stats->Ref();
724   return std::make_shared<AddressIterator>(Ref(), std::move(stats));
725 }
726 
ContainsAllDropEntries() const727 bool GrpcLb::Serverlist::ContainsAllDropEntries() const {
728   if (serverlist_.empty()) return false;
729   for (const GrpcLbServer& server : serverlist_) {
730     if (!server.drop) return false;
731   }
732   return true;
733 }
734 
ShouldDrop()735 const char* GrpcLb::Serverlist::ShouldDrop() {
736   if (serverlist_.empty()) return nullptr;
737   size_t index = drop_index_.fetch_add(1, std::memory_order_relaxed);
738   GrpcLbServer& server = serverlist_[index % serverlist_.size()];
739   return server.drop ? server.load_balance_token : nullptr;
740 }
741 
742 //
743 // GrpcLb::Picker
744 //
745 
// Performs a pick: first applies balancer-directed drops, then delegates
// to the child policy's picker and, on a completed pick, decorates the
// result with the LB token and the client-stats bookkeeping consumed by
// the client_load_reporting filter.
GrpcLb::PickResult GrpcLb::Picker::Pick(PickArgs args) {
  // Check if we should drop the call.
  const char* drop_token =
      serverlist_ == nullptr ? nullptr : serverlist_->ShouldDrop();
  if (drop_token != nullptr) {
    // Update client load reporting stats to indicate the number of
    // dropped calls.  Note that we have to do this here instead of in
    // the client_load_reporting filter, because we do not create a
    // subchannel call (and therefore no client_load_reporting filter)
    // for dropped calls.
    if (client_stats_ != nullptr) {
      client_stats_->AddCallDropped(drop_token);
    }
    return PickResult::Drop(
        absl::UnavailableError("drop directed by grpclb balancer"));
  }
  // Forward pick to child policy.
  PickResult result = child_picker_->Pick(args);
  // If pick succeeded, add LB token to initial metadata.
  auto* complete_pick = absl::get_if<PickResult::Complete>(&result.result);
  if (complete_pick != nullptr) {
    const SubchannelWrapper* subchannel_wrapper =
        static_cast<SubchannelWrapper*>(complete_pick->subchannel.get());
    // Encode client stats object into metadata for use by
    // client_load_reporting filter.
    GrpcLbClientStats* client_stats = subchannel_wrapper->client_stats();
    if (client_stats != nullptr) {
      // Chain our call tracker in front of any tracker the child
      // installed, so both see the call lifecycle events.
      complete_pick->subchannel_call_tracker =
          std::make_unique<SubchannelCallTracker>(
              client_stats->Ref(),
              std::move(complete_pick->subchannel_call_tracker));
      // The metadata value is a hack: we pretend the pointer points to
      // a string and rely on the client_load_reporting filter to know
      // how to interpret it.
      // NOLINTBEGIN(bugprone-string-constructor)
      complete_pick->metadata_mutations.Set(
          GrpcLbClientStatsMetadata::key(),
          grpc_event_engine::experimental::Slice(grpc_slice_from_static_buffer(
              reinterpret_cast<const char*>(client_stats), 0)));
      // NOLINTEND(bugprone-string-constructor)
      // Update calls-started.
      client_stats->AddCallStarted();
    }
    // Encode the LB token in metadata.
    // Create a new copy on the call arena, since the subchannel list
    // may get refreshed between when we return this pick and when the
    // initial metadata goes out on the wire.
    if (!subchannel_wrapper->lb_token().empty()) {
      complete_pick->metadata_mutations.Set(
          LbTokenMetadata::key(), subchannel_wrapper->lb_token().Ref());
    }
    // Unwrap subchannel to pass up to the channel.
    complete_pick->subchannel = subchannel_wrapper->wrapped_subchannel();
  }
  return result;
}
802 
803 //
804 // GrpcLb::Helper
805 //
806 
CreateSubchannel(const grpc_resolved_address & address,const ChannelArgs & per_address_args,const ChannelArgs & args)807 RefCountedPtr<SubchannelInterface> GrpcLb::Helper::CreateSubchannel(
808     const grpc_resolved_address& address, const ChannelArgs& per_address_args,
809     const ChannelArgs& args) {
810   if (parent()->shutting_down_) return nullptr;
811   const auto* arg = per_address_args.GetObject<TokenAndClientStatsArg>();
812   if (arg == nullptr) {
813     auto addr_str = grpc_sockaddr_to_string(&address, false);
814     Crash(
815         absl::StrFormat("[grpclb %p] no TokenAndClientStatsArg for address %s",
816                         parent(), addr_str.value_or("N/A").c_str()));
817   }
818   return MakeRefCounted<SubchannelWrapper>(
819       parent()->channel_control_helper()->CreateSubchannel(
820           address, per_address_args, args),
821       parent()->RefAsSubclass<GrpcLb>(DEBUG_LOCATION, "SubchannelWrapper"),
822       arg->lb_token().Ref(), arg->client_stats());
823 }
824 
// Called when the child policy reports a new connectivity state.  Wraps
// the child's picker in our own Picker (which applies drops and LB
// tokens) and reports the combined state up to the channel.
void GrpcLb::Helper::UpdateState(grpc_connectivity_state state,
                                 const absl::Status& status,
                                 RefCountedPtr<SubchannelPicker> picker) {
  if (parent()->shutting_down_) return;
  // Record whether child policy reports READY.
  parent()->child_policy_ready_ = state == GRPC_CHANNEL_READY;
  // Enter fallback mode if needed.
  parent()->MaybeEnterFallbackModeAfterStartup();
  // We pass the serverlist to the picker so that it can handle drops.
  // However, we don't want to handle drops in the case where the child
  // policy is reporting a state other than READY (unless we are
  // dropping *all* calls), because we don't want to process drops for picks
  // that yield a QUEUE result; this would result in dropping too many calls,
  // since we will see the queued picks multiple times, and we'd consider each
  // one a separate call for the drop calculation.  So in this case, we pass
  // a null serverlist to the picker, which tells it not to do drops.
  RefCountedPtr<Serverlist> serverlist;
  if (state == GRPC_CHANNEL_READY ||
      (parent()->serverlist_ != nullptr &&
       parent()->serverlist_->ContainsAllDropEntries())) {
    serverlist = parent()->serverlist_;
  }
  // Hand the picker the active call's stats object (if any) so drops are
  // counted for load reporting.
  RefCountedPtr<GrpcLbClientStats> client_stats;
  if (parent()->lb_calld_ != nullptr &&
      parent()->lb_calld_->client_stats() != nullptr) {
    client_stats = parent()->lb_calld_->client_stats()->Ref();
  }
  GRPC_TRACE_LOG(glb, INFO)
      << "[grpclb " << parent() << " helper " << this
      << "] state=" << ConnectivityStateName(state) << " (" << status.ToString()
      << ") wrapping child picker " << picker.get()
      << " (serverlist=" << serverlist.get()
      << ", client_stats=" << client_stats.get() << ")";
  parent()->channel_control_helper()->UpdateState(
      state, status,
      MakeRefCounted<Picker>(std::move(serverlist), std::move(picker),
                             std::move(client_stats)));
}
863 
RequestReresolution()864 void GrpcLb::Helper::RequestReresolution() {
865   if (parent()->shutting_down_) return;
866   // Ignore if we're not in fallback mode, because if we got the backend
867   // addresses from the balancer, re-resolving is not going to fix it.
868   if (!parent()->fallback_mode_) return;
869   parent()->channel_control_helper()->RequestReresolution();
870 }
871 
872 //
873 // GrpcLb::BalancerCallState
874 //
875 
// Constructs the state for one streaming call to the LB server: registers
// the completion closures, creates the call on the LB channel, and builds
// the initial request payload.  The call is not started until StartQuery().
GrpcLb::BalancerCallState::BalancerCallState(
    RefCountedPtr<LoadBalancingPolicy> parent_grpclb_policy)
    : InternallyRefCounted<BalancerCallState>(
          GRPC_TRACE_FLAG_ENABLED(glb) ? "BalancerCallState" : nullptr),
      grpclb_policy_(std::move(parent_grpclb_policy)) {
  CHECK(grpclb_policy_ != nullptr);
  CHECK(!grpclb_policy()->shutting_down_);
  // Init the LB call. Note that the LB call will progress every time there's
  // activity in grpclb_policy_->interested_parties(), which is comprised of
  // the polling entities from client_channel.
  GRPC_CLOSURE_INIT(&lb_on_initial_request_sent_, OnInitialRequestSent, this,
                    grpc_schedule_on_exec_ctx);
  GRPC_CLOSURE_INIT(&lb_on_balancer_message_received_,
                    OnBalancerMessageReceived, this, grpc_schedule_on_exec_ctx);
  GRPC_CLOSURE_INIT(&lb_on_balancer_status_received_, OnBalancerStatusReceived,
                    this, grpc_schedule_on_exec_ctx);
  GRPC_CLOSURE_INIT(&client_load_report_done_closure_, ClientLoadReportDone,
                    this, grpc_schedule_on_exec_ctx);
  // A zero configured timeout means "no deadline" for the LB call.
  const Timestamp deadline =
      grpclb_policy()->lb_call_timeout_ == Duration::Zero()
          ? Timestamp::InfFuture()
          : Timestamp::Now() + grpclb_policy()->lb_call_timeout_;
  lb_call_ = grpclb_policy()->lb_channel_->CreateCall(
      /*parent_call=*/nullptr, GRPC_PROPAGATE_DEFAULTS,
      /*cq=*/nullptr, grpclb_policy_->interested_parties(),
      Slice::FromStaticString("/grpc.lb.v1.LoadBalancer/BalanceLoad"),
      /*authority=*/absl::nullopt, deadline, /*registered_method=*/true);
  // Init the LB call request payload.  The service name defaults to the
  // channel authority when the config does not specify one.
  upb::Arena arena;
  grpc_slice request_payload_slice = GrpcLbRequestCreate(
      grpclb_policy()->config_->service_name().empty()
          ? grpclb_policy()->channel_control_helper()->GetAuthority()
          : grpclb_policy()->config_->service_name(),
      arena.ptr());
  send_message_payload_ =
      grpc_raw_byte_buffer_create(&request_payload_slice, 1);
  CSliceUnref(request_payload_slice);
  // Init other data associated with the LB call.
  grpc_metadata_array_init(&lb_initial_metadata_recv_);
  grpc_metadata_array_init(&lb_trailing_metadata_recv_);
}
917 
// Releases all resources tied to the LB call.  Note that
// recv_message_payload_ may be null here if no message was pending;
// grpc_byte_buffer_destroy is a no-op on null per the gRPC core API.
GrpcLb::BalancerCallState::~BalancerCallState() {
  CHECK_NE(lb_call_, nullptr);
  grpc_call_unref(lb_call_);
  grpc_metadata_array_destroy(&lb_initial_metadata_recv_);
  grpc_metadata_array_destroy(&lb_trailing_metadata_recv_);
  grpc_byte_buffer_destroy(send_message_payload_);
  grpc_byte_buffer_destroy(recv_message_payload_);
  CSliceUnref(lb_call_status_details_);
}
927 
// Cancels the LB call and the pending load-report timer (if any).
void GrpcLb::BalancerCallState::Orphan() {
  CHECK_NE(lb_call_, nullptr);
  // If we are here because grpclb_policy wants to cancel the call,
  // lb_on_balancer_status_received_ will complete the cancellation and clean
  // up. Otherwise, we are here because grpclb_policy has to orphan a failed
  // call, then the following cancellation will be a no-op.
  grpc_call_cancel_internal(lb_call_);
  // If Cancel() returns true, the timer callback will never run, so we
  // must release the ref that was taken on its behalf.
  if (client_load_report_handle_.has_value() &&
      grpclb_policy()->channel_control_helper()->GetEventEngine()->Cancel(
          client_load_report_handle_.value())) {
    Unref(DEBUG_LOCATION, "client_load_report cancelled");
  }
  // Note that the initial ref is hold by lb_on_balancer_status_received_
  // instead of the caller of this function. So the corresponding unref happens
  // in lb_on_balancer_status_received_ instead of here.
}
944 
// Starts the LB call by enqueuing three batches: (1) send initial
// metadata + the initial request message, (2) recv initial metadata +
// the first response message, and (3) recv trailing status.  Batches (1)
// and (2) each hold a manually-taken ref on this object; batch (3) is
// covered by the initial ref, which is released in
// OnBalancerStatusReceivedLocked().
void GrpcLb::BalancerCallState::StartQuery() {
  CHECK_NE(lb_call_, nullptr);
  GRPC_TRACE_LOG(glb, INFO)
      << "[grpclb " << grpclb_policy_.get() << "] lb_calld=" << this
      << ": Starting LB call " << lb_call_;
  // Create the ops.
  grpc_call_error call_error;
  grpc_op ops[3];
  memset(ops, 0, sizeof(ops));
  // Op: send initial metadata.
  grpc_op* op = ops;
  op->op = GRPC_OP_SEND_INITIAL_METADATA;
  op->data.send_initial_metadata.count = 0;
  // Wait-for-ready so the call does not fail while the balancer channel
  // is still connecting.
  op->flags = GRPC_INITIAL_METADATA_WAIT_FOR_READY |
              GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET;
  op->reserved = nullptr;
  op++;
  // Op: send request message.
  CHECK_NE(send_message_payload_, nullptr);
  op->op = GRPC_OP_SEND_MESSAGE;
  op->data.send_message.send_message = send_message_payload_;
  op->flags = 0;
  op->reserved = nullptr;
  op++;
  // TODO(roth): We currently track this ref manually.  Once the
  // ClosureRef API is ready, we should pass the RefCountedPtr<> along
  // with the callback.
  auto self = Ref(DEBUG_LOCATION, "on_initial_request_sent");
  self.release();
  call_error = grpc_call_start_batch_and_execute(lb_call_, ops,
                                                 static_cast<size_t>(op - ops),
                                                 &lb_on_initial_request_sent_);
  CHECK_EQ(call_error, GRPC_CALL_OK);
  // Op: recv initial metadata.
  op = ops;
  op->op = GRPC_OP_RECV_INITIAL_METADATA;
  op->data.recv_initial_metadata.recv_initial_metadata =
      &lb_initial_metadata_recv_;
  op->flags = 0;
  op->reserved = nullptr;
  op++;
  // Op: recv response.
  op->op = GRPC_OP_RECV_MESSAGE;
  op->data.recv_message.recv_message = &recv_message_payload_;
  op->flags = 0;
  op->reserved = nullptr;
  op++;
  // TODO(roth): We currently track this ref manually.  Once the
  // ClosureRef API is ready, we should pass the RefCountedPtr<> along
  // with the callback.
  self = Ref(DEBUG_LOCATION, "on_message_received");
  self.release();
  call_error = grpc_call_start_batch_and_execute(
      lb_call_, ops, static_cast<size_t>(op - ops),
      &lb_on_balancer_message_received_);
  CHECK_EQ(call_error, GRPC_CALL_OK);
  // Op: recv server status.
  op = ops;
  op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
  op->data.recv_status_on_client.trailing_metadata =
      &lb_trailing_metadata_recv_;
  op->data.recv_status_on_client.status = &lb_call_status_;
  op->data.recv_status_on_client.status_details = &lb_call_status_details_;
  op->flags = 0;
  op->reserved = nullptr;
  op++;
  // This callback signals the end of the LB call, so it relies on the initial
  // ref instead of a new ref. When it's invoked, it's the initial ref that is
  // unreffed.
  call_error = grpc_call_start_batch_and_execute(
      lb_call_, ops, static_cast<size_t>(op - ops),
      &lb_on_balancer_status_received_);
  CHECK_EQ(call_error, GRPC_CALL_OK);
}
1019 
// Arms the load-report timer for client_stats_report_interval_ from now.
// The callback hops into the policy's WorkSerializer before touching any
// locked state.  The ref that keeps this object alive for the callback
// is taken by the caller (see OnBalancerMessageReceivedLocked).
void GrpcLb::BalancerCallState::ScheduleNextClientLoadReportLocked() {
  client_load_report_handle_ =
      grpclb_policy()->channel_control_helper()->GetEventEngine()->RunAfter(
          client_stats_report_interval_, [this] {
            // Set up core execution context, since the EventEngine
            // callback runs outside of one.
            ApplicationCallbackExecCtx callback_exec_ctx;
            ExecCtx exec_ctx;
            grpclb_policy()->work_serializer()->Run(
                [this] { MaybeSendClientLoadReportLocked(); }, DEBUG_LOCATION);
          });
}
1030 
// Timer callback (run in the WorkSerializer): sends the next client load
// report, or defers it if the initial request is still in flight.  If
// this call is no longer the active one, just drops the timer's ref.
void GrpcLb::BalancerCallState::MaybeSendClientLoadReportLocked() {
  client_load_report_handle_.reset();
  if (this != grpclb_policy()->lb_calld_.get()) {
    Unref(DEBUG_LOCATION, "client_load_report");
    return;
  }
  // If we've already sent the initial request, then we can go ahead and send
  // the load report. Otherwise, we need to wait until the initial request has
  // been sent to send this (see OnInitialRequestSentLocked()).
  if (send_message_payload_ == nullptr) {
    SendClientLoadReportLocked();
  } else {
    client_load_report_is_due_ = true;
  }
}
1046 
// Builds and sends one client load report message on the LB call.
// Consecutive all-zero reports are suppressed (the timer is simply
// rescheduled) so that an idle client doesn't keep sending empty data.
void GrpcLb::BalancerCallState::SendClientLoadReportLocked() {
  // Construct message payload.
  CHECK_EQ(send_message_payload_, nullptr);
  // Get snapshot of stats.
  int64_t num_calls_started;
  int64_t num_calls_finished;
  int64_t num_calls_finished_with_client_failed_to_send;
  int64_t num_calls_finished_known_received;
  std::unique_ptr<GrpcLbClientStats::DroppedCallCounts> drop_token_counts;
  client_stats_->Get(&num_calls_started, &num_calls_finished,
                     &num_calls_finished_with_client_failed_to_send,
                     &num_calls_finished_known_received, &drop_token_counts);
  // Skip client load report if the counters were all zero in the last
  // report and they are still zero in this one.
  if (num_calls_started == 0 && num_calls_finished == 0 &&
      num_calls_finished_with_client_failed_to_send == 0 &&
      num_calls_finished_known_received == 0 &&
      (drop_token_counts == nullptr || drop_token_counts->empty())) {
    if (last_client_load_report_counters_were_zero_) {
      ScheduleNextClientLoadReportLocked();
      return;
    }
    last_client_load_report_counters_were_zero_ = true;
  } else {
    last_client_load_report_counters_were_zero_ = false;
  }
  // Populate load report.
  upb::Arena arena;
  grpc_slice request_payload_slice = GrpcLbLoadReportRequestCreate(
      num_calls_started, num_calls_finished,
      num_calls_finished_with_client_failed_to_send,
      num_calls_finished_known_received, drop_token_counts.get(), arena.ptr());
  send_message_payload_ =
      grpc_raw_byte_buffer_create(&request_payload_slice, 1);
  CSliceUnref(request_payload_slice);
  // Send the report.
  grpc_op op;
  memset(&op, 0, sizeof(op));
  op.op = GRPC_OP_SEND_MESSAGE;
  op.data.send_message.send_message = send_message_payload_;
  grpc_call_error call_error = grpc_call_start_batch_and_execute(
      lb_call_, &op, 1, &client_load_report_done_closure_);
  if (GPR_UNLIKELY(call_error != GRPC_CALL_OK)) {
    LOG(ERROR) << "[grpclb " << grpclb_policy_.get() << "] lb_calld=" << this
               << " call_error=" << call_error << " sending client load report";
    CHECK_EQ(call_error, GRPC_CALL_OK);
  }
}
1095 
ClientLoadReportDone(void * arg,grpc_error_handle error)1096 void GrpcLb::BalancerCallState::ClientLoadReportDone(void* arg,
1097                                                      grpc_error_handle error) {
1098   BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
1099   lb_calld->grpclb_policy()->work_serializer()->Run(
1100       [lb_calld, error]() { lb_calld->ClientLoadReportDoneLocked(error); },
1101       DEBUG_LOCATION);
1102 }
1103 
// Run in the WorkSerializer when the load-report send batch completes.
// Frees the sent payload and, if this call is still active and the send
// succeeded, arms the next report; otherwise releases the
// "client_load_report" ref, ending the reporting loop.
void GrpcLb::BalancerCallState::ClientLoadReportDoneLocked(
    grpc_error_handle error) {
  grpc_byte_buffer_destroy(send_message_payload_);
  send_message_payload_ = nullptr;
  if (!error.ok() || this != grpclb_policy()->lb_calld_.get()) {
    Unref(DEBUG_LOCATION, "client_load_report");
    return;
  }
  ScheduleNextClientLoadReportLocked();
}
1114 
OnInitialRequestSent(void * arg,grpc_error_handle)1115 void GrpcLb::BalancerCallState::OnInitialRequestSent(
1116     void* arg, grpc_error_handle /*error*/) {
1117   BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
1118   lb_calld->grpclb_policy()->work_serializer()->Run(
1119       [lb_calld]() { lb_calld->OnInitialRequestSentLocked(); }, DEBUG_LOCATION);
1120 }
1121 
// Run in the WorkSerializer once the initial request has been sent.
// Frees the request payload (so send_message_payload_ is available for
// load reports) and flushes any load report that was deferred while the
// initial request was in flight.
void GrpcLb::BalancerCallState::OnInitialRequestSentLocked() {
  grpc_byte_buffer_destroy(send_message_payload_);
  send_message_payload_ = nullptr;
  // If we attempted to send a client load report before the initial request was
  // sent (and this lb_calld is still in use), send the load report now.
  if (client_load_report_is_due_ && this == grpclb_policy()->lb_calld_.get()) {
    SendClientLoadReportLocked();
    client_load_report_is_due_ = false;
  }
  // Release the ref taken for this callback in StartQuery().
  Unref(DEBUG_LOCATION, "on_initial_request_sent");
}
1133 
OnBalancerMessageReceived(void * arg,grpc_error_handle)1134 void GrpcLb::BalancerCallState::OnBalancerMessageReceived(
1135     void* arg, grpc_error_handle /*error*/) {
1136   BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
1137   lb_calld->grpclb_policy()->work_serializer()->Run(
1138       [lb_calld]() { lb_calld->OnBalancerMessageReceivedLocked(); },
1139       DEBUG_LOCATION);
1140 }
1141 
// Run in the WorkSerializer when a message arrives from the balancer.
// Parses the response and dispatches on its type (INITIAL, SERVERLIST,
// or FALLBACK), then re-arms the recv-message op to keep the stream
// going, reusing the ref taken in StartQuery().
void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked() {
  // Null payload means the LB call was cancelled.
  if (this != grpclb_policy()->lb_calld_.get() ||
      recv_message_payload_ == nullptr) {
    Unref(DEBUG_LOCATION, "on_message_received");
    return;
  }
  // Flatten the received byte buffer into a single slice for parsing.
  grpc_byte_buffer_reader bbr;
  grpc_byte_buffer_reader_init(&bbr, recv_message_payload_);
  grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
  grpc_byte_buffer_reader_destroy(&bbr);
  grpc_byte_buffer_destroy(recv_message_payload_);
  recv_message_payload_ = nullptr;
  GrpcLbResponse response;
  upb::Arena arena;
  // A duplicate INITIAL response is treated the same as a parse failure:
  // logged and ignored.
  if (!GrpcLbResponseParse(response_slice, arena.ptr(), &response) ||
      (response.type == response.INITIAL && seen_initial_response_)) {
    if (absl::MinLogLevel() <= absl::LogSeverityAtLeast::kError) {
      char* response_slice_str =
          grpc_dump_slice(response_slice, GPR_DUMP_ASCII | GPR_DUMP_HEX);
      LOG(ERROR) << "[grpclb " << grpclb_policy() << "] lb_calld=" << this
                 << ": Invalid LB response received: '" << response_slice_str
                 << "'. Ignoring.";
      gpr_free(response_slice_str);
    }
  } else {
    switch (response.type) {
      case response.INITIAL: {
        // The initial response may carry the load-reporting interval;
        // intervals below one second are clamped up to one second.
        if (response.client_stats_report_interval != Duration::Zero()) {
          client_stats_report_interval_ = std::max(
              Duration::Seconds(1), response.client_stats_report_interval);
          GRPC_TRACE_LOG(glb, INFO)
              << "[grpclb " << grpclb_policy() << "] lb_calld=" << this
              << ": Received initial LB response message; client load "
                 "reporting interval = "
              << client_stats_report_interval_.millis() << " milliseconds";
        } else {
          GRPC_TRACE_LOG(glb, INFO)
              << "[grpclb " << grpclb_policy() << "] lb_calld=" << this
              << ": Received initial LB response message; client load "
                 "reporting NOT enabled";
        }
        seen_initial_response_ = true;
        break;
      }
      case response.SERVERLIST: {
        CHECK_NE(lb_call_, nullptr);
        auto serverlist_wrapper =
            MakeRefCounted<Serverlist>(std::move(response.serverlist));
        GRPC_TRACE_LOG(glb, INFO)
            << "[grpclb " << grpclb_policy() << "] lb_calld=" << this
            << ": Serverlist with " << serverlist_wrapper->serverlist().size()
            << " servers received:\n"
            << serverlist_wrapper->AsText();
        seen_serverlist_ = true;
        // Start sending client load report only after we start using the
        // serverlist returned from the current LB call.
        if (client_stats_report_interval_ > Duration::Zero() &&
            client_stats_ == nullptr) {
          client_stats_ = MakeRefCounted<GrpcLbClientStats>();
          // Ref held by callback.
          Ref(DEBUG_LOCATION, "client_load_report").release();
          ScheduleNextClientLoadReportLocked();
        }
        // Check if the serverlist differs from the previous one.
        if (grpclb_policy()->serverlist_ != nullptr &&
            *grpclb_policy()->serverlist_ == *serverlist_wrapper) {
          GRPC_TRACE_LOG(glb, INFO)
              << "[grpclb " << grpclb_policy() << "] lb_calld=" << this
              << ": Incoming server list identical to current, "
                 "ignoring.";
        } else {  // New serverlist.
          // Dispose of the fallback.
          // TODO(roth): Ideally, we should stay in fallback mode until we
          // know that we can reach at least one of the backends in the new
          // serverlist.  Unfortunately, we can't do that, since we need to
          // send the new addresses to the child policy in order to determine
          // if they are reachable, and if we don't exit fallback mode now,
          // CreateOrUpdateChildPolicyLocked() will use the fallback
          // addresses instead of the addresses from the new serverlist.
          // However, if we can't reach any of the servers in the new
          // serverlist, then the child policy will never switch away from
          // the fallback addresses, but the grpclb policy will still think
          // that we're not in fallback mode, which means that we won't send
          // updates to the child policy when the fallback addresses are
          // updated by the resolver.  This is sub-optimal, but the only way
          // to fix it is to maintain a completely separate child policy for
          // fallback mode, and that's more work than we want to put into
          // the grpclb implementation at this point, since we're deprecating
          // it in favor of the xds policy.  We will implement this the
          // right way in the xds policy instead.
          if (grpclb_policy()->fallback_mode_) {
            LOG(INFO) << "[grpclb " << grpclb_policy()
                      << "] Received response from balancer; exiting fallback "
                         "mode";
            grpclb_policy()->fallback_mode_ = false;
          }
          if (grpclb_policy()->fallback_at_startup_checks_pending_) {
            grpclb_policy()->fallback_at_startup_checks_pending_ = false;
            grpclb_policy()->channel_control_helper()->GetEventEngine()->Cancel(
                *grpclb_policy()->lb_fallback_timer_handle_);
            grpclb_policy()->CancelBalancerChannelConnectivityWatchLocked();
          }
          // Update the serverlist in the GrpcLb instance. This serverlist
          // instance will be destroyed either upon the next update or when the
          // GrpcLb instance is destroyed.
          grpclb_policy()->serverlist_ = std::move(serverlist_wrapper);
          grpclb_policy()->CreateOrUpdateChildPolicyLocked();
        }
        break;
      }
      case response.FALLBACK: {
        if (!grpclb_policy()->fallback_mode_) {
          LOG(INFO) << "[grpclb " << grpclb_policy()
                    << "] Entering fallback mode as requested by balancer";
          if (grpclb_policy()->fallback_at_startup_checks_pending_) {
            grpclb_policy()->fallback_at_startup_checks_pending_ = false;
            grpclb_policy()->channel_control_helper()->GetEventEngine()->Cancel(
                *grpclb_policy()->lb_fallback_timer_handle_);
            grpclb_policy()->CancelBalancerChannelConnectivityWatchLocked();
          }
          grpclb_policy()->fallback_mode_ = true;
          grpclb_policy()->CreateOrUpdateChildPolicyLocked();
          // Reset serverlist, so that if the balancer exits fallback
          // mode by sending the same serverlist we were previously
          // using, we don't incorrectly ignore it as a duplicate.
          grpclb_policy()->serverlist_.reset();
        }
        break;
      }
    }
  }
  CSliceUnref(response_slice);
  if (!grpclb_policy()->shutting_down_) {
    // Keep listening for serverlist updates.
    grpc_op op;
    memset(&op, 0, sizeof(op));
    op.op = GRPC_OP_RECV_MESSAGE;
    op.data.recv_message.recv_message = &recv_message_payload_;
    op.flags = 0;
    op.reserved = nullptr;
    // Reuse the "OnBalancerMessageReceivedLocked" ref taken in StartQuery().
    const grpc_call_error call_error = grpc_call_start_batch_and_execute(
        lb_call_, &op, 1, &lb_on_balancer_message_received_);
    CHECK_EQ(call_error, GRPC_CALL_OK);
  } else {
    Unref(DEBUG_LOCATION, "on_message_received+grpclb_shutdown");
  }
}
1291 
OnBalancerStatusReceived(void * arg,grpc_error_handle error)1292 void GrpcLb::BalancerCallState::OnBalancerStatusReceived(
1293     void* arg, grpc_error_handle error) {
1294   BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
1295   lb_calld->grpclb_policy()->work_serializer()->Run(
1296       [lb_calld, error]() { lb_calld->OnBalancerStatusReceivedLocked(error); },
1297       DEBUG_LOCATION);
1298 }
1299 
// Run in the WorkSerializer when the LB call terminates.  If this call is
// still the active one, the termination was not deliberate, so we either
// enter fallback mode (if fallback-at-startup checks were pending),
// restart the call immediately (if we got at least one response from
// this balancer), or retry after back-off.  Releases the initial ref,
// which was associated with this callback in StartQuery().
void GrpcLb::BalancerCallState::OnBalancerStatusReceivedLocked(
    grpc_error_handle error) {
  CHECK_NE(lb_call_, nullptr);
  if (GRPC_TRACE_FLAG_ENABLED(glb)) {
    char* status_details = grpc_slice_to_c_string(lb_call_status_details_);
    LOG(INFO) << "[grpclb " << grpclb_policy() << "] lb_calld=" << this
              << ": Status from LB server received. Status = "
              << lb_call_status_ << ", details = '" << status_details
              << "', (lb_call: " << lb_call_ << "), error '"
              << StatusToString(error) << "'";
    gpr_free(status_details);
  }
  // If this lb_calld is still in use, this call ended because of a failure so
  // we want to retry connecting. Otherwise, we have deliberately ended this
  // call and no further action is required.
  if (this == grpclb_policy()->lb_calld_.get()) {
    // If the fallback-at-startup checks are pending, go into fallback mode
    // immediately.  This short-circuits the timeout for the fallback-at-startup
    // case.
    grpclb_policy()->lb_calld_.reset();
    if (grpclb_policy()->fallback_at_startup_checks_pending_) {
      CHECK(!seen_serverlist_);
      LOG(INFO) << "[grpclb " << grpclb_policy()
                << "] Balancer call finished without receiving serverlist; "
                   "entering fallback mode";
      grpclb_policy()->fallback_at_startup_checks_pending_ = false;
      grpclb_policy()->channel_control_helper()->GetEventEngine()->Cancel(
          *grpclb_policy()->lb_fallback_timer_handle_);
      grpclb_policy()->CancelBalancerChannelConnectivityWatchLocked();
      grpclb_policy()->fallback_mode_ = true;
      grpclb_policy()->CreateOrUpdateChildPolicyLocked();
    } else {
      // This handles the fallback-after-startup case.
      grpclb_policy()->MaybeEnterFallbackModeAfterStartup();
    }
    CHECK(!grpclb_policy()->shutting_down_);
    grpclb_policy()->channel_control_helper()->RequestReresolution();
    if (seen_initial_response_) {
      // If we lose connection to the LB server, reset the backoff and restart
      // the LB call immediately.
      grpclb_policy()->lb_call_backoff_.Reset();
      grpclb_policy()->StartBalancerCallLocked();
    } else {
      // If this LB call fails establishing any connection to the LB server,
      // retry later.
      grpclb_policy()->StartBalancerCallRetryTimerLocked();
    }
  }
  Unref(DEBUG_LOCATION, "lb_call_ended");
}
1350 
1351 //
1352 // helper code for creating balancer channel
1353 //
1354 
ExtractBalancerAddresses(const ChannelArgs & args)1355 EndpointAddressesList ExtractBalancerAddresses(const ChannelArgs& args) {
1356   const EndpointAddressesList* endpoints =
1357       FindGrpclbBalancerAddressesInChannelArgs(args);
1358   if (endpoints != nullptr) return *endpoints;
1359   return EndpointAddressesList();
1360 }
1361 
1362 // Returns the channel args for the LB channel, used to create a bidirectional
1363 // stream for the reception of load balancing updates.
1364 //
1365 // Inputs:
1366 //   - \a response_generator: in order to propagate updates from the resolver
1367 //   above the grpclb policy.
1368 //   - \a args: other args inherited from the grpclb policy.
BuildBalancerChannelArgs(FakeResolverResponseGenerator * response_generator,const ChannelArgs & args)1369 ChannelArgs BuildBalancerChannelArgs(
1370     FakeResolverResponseGenerator* response_generator,
1371     const ChannelArgs& args) {
1372   ChannelArgs grpclb_channel_args;
1373   const grpc_channel_args* lb_channel_specific_args =
1374       args.GetPointer<grpc_channel_args>(
1375           GRPC_ARG_EXPERIMENTAL_GRPCLB_CHANNEL_ARGS);
1376   if (lb_channel_specific_args != nullptr) {
1377     grpclb_channel_args = ChannelArgs::FromC(lb_channel_specific_args);
1378   } else {
1379     // Set grpclb_channel_args based on the parent channel's channel args.
1380     grpclb_channel_args =
1381         args
1382             // LB policy name, since we want to use the default (pick_first) in
1383             // the LB channel.
1384             .Remove(GRPC_ARG_LB_POLICY_NAME)
1385             // Strip out the service config, since we don't want the LB policy
1386             // config specified for the parent channel to affect the LB channel.
1387             .Remove(GRPC_ARG_SERVICE_CONFIG)
1388             // The fake resolver response generator, because we are replacing it
1389             // with the one from the grpclb policy, used to propagate updates to
1390             // the LB channel.
1391             .Remove(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR)
1392             // The LB channel should use the authority indicated by the target
1393             // authority table (see \a ModifyGrpclbBalancerChannelArgs),
1394             // as opposed to the authority from the parent channel.
1395             .Remove(GRPC_ARG_DEFAULT_AUTHORITY)
1396             // Just as for \a GRPC_ARG_DEFAULT_AUTHORITY, the LB channel should
1397             // be treated as a stand-alone channel and not inherit this argument
1398             // from the args of the parent channel.
1399             .Remove(GRPC_SSL_TARGET_NAME_OVERRIDE_ARG)
1400             // Don't want to pass down channelz node from parent; the balancer
1401             // channel will get its own.
1402             .Remove(GRPC_ARG_CHANNELZ_CHANNEL_NODE)
1403             // Remove the channel args for channel credentials and replace it
1404             // with a version that does not contain call credentials. The
1405             // loadbalancer is not necessarily trusted to handle bearer token
1406             // credentials.
1407             .Remove(GRPC_ARG_CHANNEL_CREDENTIALS);
1408   }
1409   return grpclb_channel_args
1410       // A channel arg indicating the target is a grpclb load balancer.
1411       .Set(GRPC_ARG_ADDRESS_IS_GRPCLB_LOAD_BALANCER, 1)
1412       // Tells channelz that this is an internal channel.
1413       .Set(GRPC_ARG_CHANNELZ_IS_INTERNAL_CHANNEL, 1)
1414       // The fake resolver response generator, which we use to inject
1415       // address updates into the LB channel.
1416       .SetObject(response_generator->Ref());
1417 }
1418 
1419 //
1420 // ctor and dtor
1421 //
1422 
// Reads all grpclb-specific settings from the channel args, clamping each
// duration to be non-negative and applying a default where the arg is
// unset.
GrpcLb::GrpcLb(Args args)
    : LoadBalancingPolicy(std::move(args)),
      // Response generator used to push balancer-address updates into the
      // LB channel's fake resolver.
      response_generator_(MakeRefCounted<FakeResolverResponseGenerator>()),
      // Deadline for each balancer call; Duration::Zero() (the default)
      // means no deadline.
      lb_call_timeout_(std::max(
          Duration::Zero(),
          channel_args()
              .GetDurationFromIntMillis(GRPC_ARG_GRPCLB_CALL_TIMEOUT_MS)
              .value_or(Duration::Zero()))),
      // Backoff applied between balancer call attempts (used when no valid
      // serverlist was received on the previous attempt).
      lb_call_backoff_(
          BackOff::Options()
              .set_initial_backoff(Duration::Seconds(
                  GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS))
              .set_multiplier(GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER)
              .set_jitter(GRPC_GRPCLB_RECONNECT_JITTER)
              .set_max_backoff(Duration::Seconds(
                  GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS))),
      // How long to wait for the balancer at startup before entering
      // fallback mode.
      fallback_at_startup_timeout_(std::max(
          Duration::Zero(),
          channel_args()
              .GetDurationFromIntMillis(GRPC_ARG_GRPCLB_FALLBACK_TIMEOUT_MS)
              .value_or(Duration::Milliseconds(
                  GRPC_GRPCLB_DEFAULT_FALLBACK_TIMEOUT_MS)))),
      // How long deleted subchannels are retained in the cache before
      // being released (see CacheDeletedSubchannelLocked()).
      subchannel_cache_interval_(std::max(
          Duration::Zero(),
          channel_args()
              .GetDurationFromIntMillis(
                  GRPC_ARG_GRPCLB_SUBCHANNEL_CACHE_INTERVAL_MS)
              .value_or(Duration::Milliseconds(
                  GRPC_GRPCLB_DEFAULT_SUBCHANNEL_DELETION_DELAY_MS)))) {
  GRPC_TRACE_LOG(glb, INFO)
      << "[grpclb " << this << "] Will use '"
      << std::string(channel_control_helper()->GetAuthority())
      << "' as the server name for LB request.";
}
1457 
// Tears down all policy state: the balancer call, all pending timers, the
// fallback-at-startup machinery, the child policy, and finally the LB
// channel itself.  The teardown order below matters; do not reorder.
void GrpcLb::ShutdownLocked() {
  shutting_down_ = true;
  // Cancel the balancer call, if any.
  lb_calld_.reset();
  // Cancel the subchannel cache timer and drop all cached subchannels.
  if (subchannel_cache_timer_handle_.has_value()) {
    channel_control_helper()->GetEventEngine()->Cancel(
        *subchannel_cache_timer_handle_);
    subchannel_cache_timer_handle_.reset();
  }
  cached_subchannels_.clear();
  // Cancel the balancer call retry timer, if pending.
  if (lb_call_retry_timer_handle_.has_value()) {
    channel_control_helper()->GetEventEngine()->Cancel(
        *lb_call_retry_timer_handle_);
  }
  // Cancel the fallback-at-startup timer and connectivity watch, if the
  // startup checks are still in flight.
  if (fallback_at_startup_checks_pending_) {
    fallback_at_startup_checks_pending_ = false;
    channel_control_helper()->GetEventEngine()->Cancel(
        *lb_fallback_timer_handle_);
    CancelBalancerChannelConnectivityWatchLocked();
  }
  // Detach and destroy the child policy, if any.
  if (child_policy_ != nullptr) {
    grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(),
                                     interested_parties());
    child_policy_.reset();
  }
  // We destroy the LB channel here instead of in our destructor because
  // destroying the channel triggers a last callback to
  // OnBalancerChannelConnectivityChangedLocked(), and we need to be
  // alive when that callback is invoked.
  if (lb_channel_ != nullptr) {
    // Unlink this channel from the parent's channelz node, if it was
    // linked in UpdateBalancerChannelLocked().
    if (parent_channelz_node_ != nullptr) {
      channelz::ChannelNode* child_channelz_node = lb_channel_->channelz_node();
      CHECK_NE(child_channelz_node, nullptr);
      parent_channelz_node_->RemoveChildChannel(child_channelz_node->uuid());
    }
    lb_channel_.reset();
  }
}
1495 
1496 //
1497 // public methods
1498 //
1499 
ResetBackoffLocked()1500 void GrpcLb::ResetBackoffLocked() {
1501   if (lb_channel_ != nullptr) {
1502     lb_channel_->ResetConnectionBackoff();
1503   }
1504   if (child_policy_ != nullptr) {
1505     child_policy_->ResetBackoffLocked();
1506   }
1507 }
1508 
1509 // Endpoint iterator wrapper to add null LB token attribute.
1510 class GrpcLb::NullLbTokenEndpointIterator final
1511     : public EndpointAddressesIterator {
1512  public:
NullLbTokenEndpointIterator(std::shared_ptr<EndpointAddressesIterator> parent_it)1513   explicit NullLbTokenEndpointIterator(
1514       std::shared_ptr<EndpointAddressesIterator> parent_it)
1515       : parent_it_(std::move(parent_it)) {}
1516 
ForEach(absl::FunctionRef<void (const EndpointAddresses &)> callback) const1517   void ForEach(absl::FunctionRef<void(const EndpointAddresses&)> callback)
1518       const override {
1519     parent_it_->ForEach([&](const EndpointAddresses& endpoint) {
1520       GRPC_TRACE_LOG(glb, INFO)
1521           << "[grpclb " << this
1522           << "] fallback address: " << endpoint.ToString();
1523       callback(EndpointAddresses(endpoint.addresses(),
1524                                  endpoint.args().SetObject(empty_token_)));
1525     });
1526   }
1527 
1528  private:
1529   std::shared_ptr<EndpointAddressesIterator> parent_it_;
1530   RefCountedPtr<TokenAndClientStatsArg> empty_token_ =
1531       MakeRefCounted<TokenAndClientStatsArg>(
1532           grpc_event_engine::experimental::Slice(), nullptr);
1533 };
1534 
// Processes a resolver/config update: stores the new config, args,
// fallback addresses, and resolution note; updates the balancer channel;
// updates any existing child policy; and, on the very first update,
// starts the fallback-at-startup checks and the balancer call.
// Returns the status from updating the balancer channel.
absl::Status GrpcLb::UpdateLocked(UpdateArgs args) {
  GRPC_TRACE_LOG(glb, INFO) << "[grpclb " << this << "] received update";
  // The LB channel is created on the first update, so its absence tells
  // us this is the initial update.
  const bool is_initial_update = lb_channel_ == nullptr;
  config_ = args.config.TakeAsSubclass<GrpcLbConfig>();
  CHECK(config_ != nullptr);
  // args_ must be set before UpdateBalancerChannelLocked() below, which
  // reads it.
  args_ = std::move(args.args);
  // Update fallback address list.
  if (!args.addresses.ok()) {
    fallback_backend_addresses_ = args.addresses.status();
  } else {
    // Wrap the resolver's endpoints so each carries an empty LB token.
    fallback_backend_addresses_ = std::make_shared<NullLbTokenEndpointIterator>(
        std::move(*args.addresses));
  }
  resolution_note_ = std::move(args.resolution_note);
  // Update balancer channel.
  absl::Status status = UpdateBalancerChannelLocked();
  // Update the existing child policy, if any.
  if (child_policy_ != nullptr) CreateOrUpdateChildPolicyLocked();
  // If this is the initial update, start the fallback-at-startup checks
  // and the balancer call.
  if (is_initial_update) {
    fallback_at_startup_checks_pending_ = true;
    // Start timer.  The callback hops onto the work serializer before
    // touching policy state.
    lb_fallback_timer_handle_ =
        channel_control_helper()->GetEventEngine()->RunAfter(
            fallback_at_startup_timeout_,
            [self = RefAsSubclass<GrpcLb>(DEBUG_LOCATION,
                                          "on_fallback_timer")]() mutable {
              ApplicationCallbackExecCtx callback_exec_ctx;
              ExecCtx exec_ctx;
              auto self_ptr = self.get();
              self_ptr->work_serializer()->Run(
                  [self = std::move(self)]() { self->OnFallbackTimerLocked(); },
                  DEBUG_LOCATION);
            });
    // Start watching the channel's connectivity state.  If the channel
    // goes into state TRANSIENT_FAILURE before the timer fires, we go into
    // fallback mode even if the fallback timeout has not elapsed.
    watcher_ =
        new StateWatcher(RefAsSubclass<GrpcLb>(DEBUG_LOCATION, "StateWatcher"));
    lb_channel_->AddConnectivityWatcher(
        GRPC_CHANNEL_IDLE,
        OrphanablePtr<AsyncConnectivityStateWatcherInterface>(watcher_));
    // Start balancer call.
    StartBalancerCallLocked();
  }
  return status;
}
1583 
1584 //
1585 // helpers for UpdateLocked()
1586 //
1587 
// (Re)configures the balancer channel from the current args_:
//  - extracts the balancer addresses,
//  - creates the LB channel on the first call (with channelz linkage),
//  - pushes the addresses into the channel via the fake resolver.
// Returns an UNAVAILABLE status if the balancer address list is empty;
// note that even in that case the (empty) address list is still pushed
// to the fake resolver.
absl::Status GrpcLb::UpdateBalancerChannelLocked() {
  // Get balancer addresses.
  EndpointAddressesList balancer_addresses = ExtractBalancerAddresses(args_);
  if (GRPC_TRACE_FLAG_ENABLED(glb)) {
    for (const auto& endpoint : balancer_addresses) {
      LOG(INFO) << "[grpclb " << this
                << "] balancer address: " << endpoint.ToString();
    }
  }
  absl::Status status;
  if (balancer_addresses.empty()) {
    status = absl::UnavailableError("balancer address list must be non-empty");
  }
  // Create channel credentials that do not contain call credentials.
  auto channel_credentials = channel_control_helper()->GetChannelCredentials();
  // Construct args for balancer channel.
  ChannelArgs lb_channel_args =
      BuildBalancerChannelArgs(response_generator_.get(), args_);
  // Create balancer channel if needed.
  if (lb_channel_ == nullptr) {
    // The "fake:///" target is resolved by the fake resolver, which is
    // driven by response_generator_ (see below).
    std::string uri_str =
        absl::StrCat("fake:///", channel_control_helper()->GetAuthority());
    lb_channel_.reset(Channel::FromC(
        grpc_channel_create(uri_str.c_str(), channel_credentials.get(),
                            lb_channel_args.ToC().get())));
    CHECK(lb_channel_ != nullptr);
    // Set up channelz linkage.
    channelz::ChannelNode* child_channelz_node = lb_channel_->channelz_node();
    auto parent_channelz_node = args_.GetObjectRef<channelz::ChannelNode>();
    if (child_channelz_node != nullptr && parent_channelz_node != nullptr) {
      parent_channelz_node->AddChildChannel(child_channelz_node->uuid());
      // Retained so ShutdownLocked() can undo the linkage.
      parent_channelz_node_ = std::move(parent_channelz_node);
    }
  }
  // Propagate updates to the LB channel (pick_first) through the fake
  // resolver.
  Resolver::Result result;
  result.addresses = std::move(balancer_addresses);
  // Pass channel creds via channel args, since the fake resolver won't
  // do this automatically.
  result.args = lb_channel_args.SetObject(std::move(channel_credentials));
  response_generator_->SetResponseAsync(std::move(result));
  // Return status.
  return status;
}
1633 
// Removes the connectivity watcher that was registered on the LB channel
// in UpdateLocked() as part of the fallback-at-startup checks.
void GrpcLb::CancelBalancerChannelConnectivityWatchLocked() {
  lb_channel_->RemoveConnectivityWatcher(watcher_);
}
1637 
1638 //
1639 // code for balancer channel and call
1640 //
1641 
// Starts a new streaming call to the balancer.  Must only be called when
// no balancer call is currently in flight (checked below).  No-op during
// shutdown.
void GrpcLb::StartBalancerCallLocked() {
  CHECK(lb_channel_ != nullptr);
  if (shutting_down_) return;
  // Init the LB call data.
  CHECK(lb_calld_ == nullptr);
  lb_calld_ = MakeOrphanable<BalancerCallState>(Ref());
  GRPC_TRACE_LOG(glb, INFO)
      << "[grpclb " << this
      << "] Query for backends (lb_channel: " << lb_channel_.get()
      << ", lb_calld: " << lb_calld_.get() << ")";
  lb_calld_->StartQuery();
}
1654 
// Schedules a retry of the balancer call after the current backoff delay.
// The timer callback hops onto the work serializer before touching policy
// state.
void GrpcLb::StartBalancerCallRetryTimerLocked() {
  Duration delay = lb_call_backoff_.NextAttemptDelay();
  if (GRPC_TRACE_FLAG_ENABLED(glb)) {
    LOG(INFO) << "[grpclb " << this << "] Connection to LB server lost...";
    if (delay > Duration::Zero()) {
      LOG(INFO) << "[grpclb " << this << "] ... retry_timer_active in "
                << delay.millis() << "ms.";
    } else {
      LOG(INFO) << "[grpclb " << this
                << "] ... retry_timer_active immediately.";
    }
  }
  lb_call_retry_timer_handle_ =
      channel_control_helper()->GetEventEngine()->RunAfter(
          delay,
          [self = RefAsSubclass<GrpcLb>(
               DEBUG_LOCATION, "on_balancer_call_retry_timer")]() mutable {
            // The EventEngine callback runs outside the work serializer,
            // so set up exec contexts and re-enter via the serializer.
            ApplicationCallbackExecCtx callback_exec_ctx;
            ExecCtx exec_ctx;
            auto self_ptr = self.get();
            self_ptr->work_serializer()->Run(
                [self = std::move(self)]() {
                  self->OnBalancerCallRetryTimerLocked();
                },
                DEBUG_LOCATION);
          });
}
1682 
OnBalancerCallRetryTimerLocked()1683 void GrpcLb::OnBalancerCallRetryTimerLocked() {
1684   lb_call_retry_timer_handle_.reset();
1685   if (!shutting_down_ && lb_calld_ == nullptr) {
1686     GRPC_TRACE_LOG(glb, INFO)
1687         << "[grpclb " << this << "] Restarting call to LB server";
1688     StartBalancerCallLocked();
1689   }
1690 }
1691 
1692 //
1693 // code for handling fallback mode
1694 //
1695 
MaybeEnterFallbackModeAfterStartup()1696 void GrpcLb::MaybeEnterFallbackModeAfterStartup() {
1697   // Enter fallback mode if all of the following are true:
1698   // - We are not currently in fallback mode.
1699   // - We are not currently waiting for the initial fallback timeout.
1700   // - We are not currently in contact with the balancer.
1701   // - The child policy is not in state READY.
1702   if (!fallback_mode_ && !fallback_at_startup_checks_pending_ &&
1703       (lb_calld_ == nullptr || !lb_calld_->seen_serverlist()) &&
1704       !child_policy_ready_) {
1705     LOG(INFO) << "[grpclb " << this
1706               << "] lost contact with balancer and backends from most recent "
1707                  "serverlist; entering fallback mode";
1708     fallback_mode_ = true;
1709     CreateOrUpdateChildPolicyLocked();
1710   }
1711 }
1712 
OnFallbackTimerLocked()1713 void GrpcLb::OnFallbackTimerLocked() {
1714   // If we receive a serverlist after the timer fires but before this callback
1715   // actually runs, don't fall back.
1716   if (fallback_at_startup_checks_pending_ && !shutting_down_) {
1717     LOG(INFO) << "[grpclb " << this
1718               << "] No response from balancer after fallback timeout; "
1719                  "entering fallback mode";
1720     fallback_at_startup_checks_pending_ = false;
1721     CancelBalancerChannelConnectivityWatchLocked();
1722     fallback_mode_ = true;
1723     CreateOrUpdateChildPolicyLocked();
1724   }
1725 }
1726 
1727 //
1728 // code for interacting with the child policy
1729 //
1730 
CreateChildPolicyArgsLocked(bool is_backend_from_grpclb_load_balancer)1731 ChannelArgs GrpcLb::CreateChildPolicyArgsLocked(
1732     bool is_backend_from_grpclb_load_balancer) {
1733   ChannelArgs r =
1734       args_
1735           .Set(GRPC_ARG_ADDRESS_IS_BACKEND_FROM_GRPCLB_LOAD_BALANCER,
1736                is_backend_from_grpclb_load_balancer)
1737           .Set(GRPC_ARG_GRPCLB_ENABLE_LOAD_REPORTING_FILTER, 1);
1738   if (is_backend_from_grpclb_load_balancer) {
1739     r = r.Set(GRPC_ARG_INHIBIT_HEALTH_CHECKING, 1);
1740   }
1741   return r;
1742 }
1743 
CreateChildPolicyLocked(const ChannelArgs & args)1744 OrphanablePtr<LoadBalancingPolicy> GrpcLb::CreateChildPolicyLocked(
1745     const ChannelArgs& args) {
1746   LoadBalancingPolicy::Args lb_policy_args;
1747   lb_policy_args.work_serializer = work_serializer();
1748   lb_policy_args.args = args;
1749   lb_policy_args.channel_control_helper =
1750       std::make_unique<Helper>(RefAsSubclass<GrpcLb>(DEBUG_LOCATION, "Helper"));
1751   OrphanablePtr<LoadBalancingPolicy> lb_policy =
1752       MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args), &glb_trace);
1753   GRPC_TRACE_LOG(glb, INFO)
1754       << "[grpclb " << this << "] Created new child policy handler ("
1755       << lb_policy.get() << ")";
1756   // Add the gRPC LB's interested_parties pollset_set to that of the newly
1757   // created child policy. This will make the child policy progress upon
1758   // activity on gRPC LB, which in turn is tied to the application's call.
1759   grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
1760                                    interested_parties());
1761   return lb_policy;
1762 }
1763 
EndpointIteratorIsEmpty(const EndpointAddressesIterator & endpoints)1764 bool EndpointIteratorIsEmpty(const EndpointAddressesIterator& endpoints) {
1765   bool empty = true;
1766   endpoints.ForEach([&](const EndpointAddresses&) { empty = false; });
1767   return empty;
1768 }
1769 
// Pushes the current address list into the child policy, creating the
// child policy first if it does not exist.  In fallback mode the
// addresses come from the resolver; otherwise they come from the
// balancer's most recent serverlist.  No-op during shutdown.
void GrpcLb::CreateOrUpdateChildPolicyLocked() {
  if (shutting_down_) return;
  // Construct update args.
  UpdateArgs update_args;
  bool is_backend_from_grpclb_load_balancer = false;
  if (fallback_mode_) {
    // If CreateOrUpdateChildPolicyLocked() is invoked when we haven't
    // received any serverlist from the balancer, we use the fallback
    // backends returned by the resolver. Note that the fallback backend
    // list may be empty, in which case the new child policy will fail the
    // picks.
    update_args.addresses = fallback_backend_addresses_;
    if (fallback_backend_addresses_.ok() &&
        EndpointIteratorIsEmpty(**fallback_backend_addresses_)) {
      // Surface the resolver's own note so the failure is diagnosable.
      update_args.resolution_note = absl::StrCat(
          "grpclb in fallback mode without any fallback addresses: ",
          resolution_note_);
    }
  } else {
    // Use the serverlist from the balancer, annotated with client stats
    // from the current balancer call (if any) for load reporting.
    update_args.addresses = serverlist_->GetServerAddressList(
        lb_calld_ == nullptr ? nullptr : lb_calld_->client_stats());
    is_backend_from_grpclb_load_balancer = true;
    if (update_args.addresses.ok() &&
        EndpointIteratorIsEmpty(**update_args.addresses)) {
      update_args.resolution_note = "empty serverlist from grpclb balancer";
    }
  }
  update_args.args =
      CreateChildPolicyArgsLocked(is_backend_from_grpclb_load_balancer);
  CHECK(update_args.args != ChannelArgs());
  update_args.config = config_->child_policy();
  // Create child policy if needed.
  if (child_policy_ == nullptr) {
    child_policy_ = CreateChildPolicyLocked(update_args.args);
  }
  // Update the policy.
  GRPC_TRACE_LOG(glb, INFO)
      << "[grpclb " << this << "] Updating child policy handler "
      << child_policy_.get();
  // TODO(roth): If we're in fallback mode and the child policy rejects the
  // update, we should propagate that failure back to the resolver somehow.
  (void)child_policy_->UpdateLocked(std::move(update_args));
}
1813 
1814 //
1815 // subchannel caching
1816 //
1817 
CacheDeletedSubchannelLocked(RefCountedPtr<SubchannelInterface> subchannel)1818 void GrpcLb::CacheDeletedSubchannelLocked(
1819     RefCountedPtr<SubchannelInterface> subchannel) {
1820   Timestamp deletion_time = Timestamp::Now() + subchannel_cache_interval_;
1821   cached_subchannels_[deletion_time].push_back(std::move(subchannel));
1822   if (!subchannel_cache_timer_handle_.has_value()) {
1823     StartSubchannelCacheTimerLocked();
1824   }
1825 }
1826 
// Schedules the subchannel cache timer for the expiration time of the
// oldest batch of cached subchannels.  The callback hops onto the work
// serializer before touching policy state.
void GrpcLb::StartSubchannelCacheTimerLocked() {
  CHECK(!cached_subchannels_.empty());
  subchannel_cache_timer_handle_ =
      channel_control_helper()->GetEventEngine()->RunAfter(
          // cached_subchannels_ is keyed by deletion time, so the first
          // entry is the next one to expire.
          cached_subchannels_.begin()->first - Timestamp::Now(),
          [self = RefAsSubclass<GrpcLb>(DEBUG_LOCATION,
                                        "OnSubchannelCacheTimer")]() mutable {
            ApplicationCallbackExecCtx callback_exec_ctx;
            ExecCtx exec_ctx;
            auto* self_ptr = self.get();
            self_ptr->work_serializer()->Run(
                [self = std::move(self)]() mutable {
                  self->OnSubchannelCacheTimerLocked();
                },
                DEBUG_LOCATION);
          });
}
1844 
OnSubchannelCacheTimerLocked()1845 void GrpcLb::OnSubchannelCacheTimerLocked() {
1846   if (subchannel_cache_timer_handle_.has_value()) {
1847     subchannel_cache_timer_handle_.reset();
1848     auto it = cached_subchannels_.begin();
1849     if (it != cached_subchannels_.end()) {
1850       GRPC_TRACE_LOG(glb, INFO)
1851           << "[grpclb " << this << "] removing " << it->second.size()
1852           << " subchannels from cache";
1853       cached_subchannels_.erase(it);
1854     }
1855     if (!cached_subchannels_.empty()) {
1856       StartSubchannelCacheTimerLocked();
1857       return;
1858     }
1859   }
1860 }
1861 
1862 //
1863 // factory
1864 //
1865 
// Factory for GrpcLb policy instances, registered with the LB policy
// registry under the name kGrpclb.
class GrpcLbFactory final : public LoadBalancingPolicyFactory {
 public:
  // Instantiates a new grpclb policy.
  OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
      LoadBalancingPolicy::Args args) const override {
    return MakeOrphanable<GrpcLb>(std::move(args));
  }

  absl::string_view name() const override { return kGrpclb; }

  // Parses and validates the JSON config for the grpclb policy.
  absl::StatusOr<RefCountedPtr<LoadBalancingPolicy::Config>>
  ParseLoadBalancingConfig(const Json& json) const override {
    return LoadFromJson<RefCountedPtr<GrpcLbConfig>>(
        json, JsonArgs(), "errors validating grpclb LB policy config");
  }
};
1881 
1882 }  // namespace
1883 
1884 //
1885 // Plugin registration
1886 //
1887 
// Plugin entry point: registers the grpclb LB policy factory and the
// client load reporting filter.  The filter is installed on subchannels
// only when the GRPC_ARG_GRPCLB_ENABLE_LOAD_REPORTING_FILTER channel arg
// is set (grpclb sets it in CreateChildPolicyArgsLocked()).
void RegisterGrpcLbPolicy(CoreConfiguration::Builder* builder) {
  builder->lb_policy_registry()->RegisterLoadBalancingPolicyFactory(
      std::make_unique<GrpcLbFactory>());
  builder->channel_init()
      ->RegisterFilter<ClientLoadReportingFilter>(GRPC_CLIENT_SUBCHANNEL)
      .IfChannelArg(GRPC_ARG_GRPCLB_ENABLE_LOAD_REPORTING_FILTER, false);
}
1895 
1896 }  // namespace grpc_core
1897