1 //
2 // Copyright 2016 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 /// Implementation of the gRPC LB policy.
18 ///
19 /// This policy takes as input a list of resolved addresses, which must
20 /// include at least one balancer address.
21 ///
22 /// An internal channel (\a lb_channel_) is created for the addresses
23 /// that are balancers.  This channel behaves just like a regular
24 /// channel that uses pick_first to select from the list of balancer
25 /// addresses.
26 ///
27 /// When we get our initial update, we instantiate the internal *streaming*
28 /// call to the LB server (whichever address pick_first chose).  The call
29 /// will be complete when either the balancer sends status or when we cancel
30 /// the call (e.g., because we are shutting down).  If needed, we retry the
31 /// call.  If we received at least one valid message from the server, a new
32 /// call attempt will be made immediately; otherwise, we apply back-off
33 /// delays between attempts.
34 ///
35 /// We maintain an internal child policy (round_robin by default) for
36 /// distributing requests across backends.  Whenever we receive a new
37 /// serverlist from the balancer, we update the child policy with the new
38 /// list of addresses.  If we cannot communicate with the balancer on startup,
39 /// however, we may enter fallback mode, in which case we will populate
40 /// the child policy's addresses from the backend addresses returned by the
41 /// resolver.
42 ///
43 /// Once a child policy instance is in place (and getting updated as described),
44 /// calls for a pick, a ping, or a cancellation will be serviced right
45 /// away by forwarding them to the child policy instance.  Any time there's no
46 /// child policy available (i.e., right after the creation of the gRPCLB
47 /// policy), pick requests are queued.
48 ///
49 /// \see https://github.com/grpc/grpc/blob/master/doc/load-balancing.md for the
50 /// high level design and details.
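///
/// As an illustrative (not normative) example, a service config entry that
/// selects this policy might look like the following; the field names come
/// from GrpcLbConfig below, and "foo.example.com" is just a placeholder:
///
///   "loadBalancingConfig": [
///     {"grpclb": {"serviceName": "foo.example.com",
///                 "childPolicy": [{"round_robin": {}}]}}
///   ]
///
/// When "childPolicy" is omitted, it defaults to round_robin (see
/// GrpcLbConfig::JsonPostLoad()).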
51 
52 #include <grpc/support/port_platform.h>
53 
54 #include "src/core/load_balancing/grpclb/grpclb.h"
55 
56 #include <grpc/event_engine/event_engine.h>
57 #include <grpc/impl/channel_arg_names.h>
58 #include <grpc/support/json.h>
59 
60 // IWYU pragma: no_include <sys/socket.h>
61 
62 #include <inttypes.h>
63 #include <string.h>
64 
65 #include <algorithm>
66 #include <atomic>
67 #include <map>
68 #include <memory>
69 #include <string>
70 #include <type_traits>
71 #include <utility>
72 #include <vector>
73 
74 #include "absl/container/inlined_vector.h"
75 #include "absl/functional/function_ref.h"
76 #include "absl/status/status.h"
77 #include "absl/status/statusor.h"
78 #include "absl/strings/str_cat.h"
79 #include "absl/strings/str_format.h"
80 #include "absl/strings/str_join.h"
81 #include "absl/strings/string_view.h"
82 #include "absl/types/optional.h"
83 #include "absl/types/variant.h"
84 #include "upb/mem/arena.hpp"
85 
86 #include <grpc/byte_buffer.h>
87 #include <grpc/byte_buffer_reader.h>
88 #include <grpc/grpc.h>
89 #include <grpc/impl/connectivity_state.h>
90 #include <grpc/impl/propagation_bits.h>
91 #include <grpc/slice.h>
92 #include <grpc/status.h>
93 #include <grpc/support/alloc.h>
94 #include <grpc/support/log.h>
95 
96 #include "src/core/client_channel/client_channel_filter.h"
97 #include "src/core/lib/address_utils/sockaddr_utils.h"
98 #include "src/core/lib/backoff/backoff.h"
99 #include "src/core/lib/channel/channel_args.h"
100 #include "src/core/lib/channel/channelz.h"
101 #include "src/core/lib/config/core_configuration.h"
102 #include "src/core/lib/debug/trace.h"
103 #include "src/core/lib/experiments/experiments.h"
104 #include "src/core/lib/gpr/string.h"
105 #include "src/core/lib/gpr/useful.h"
106 #include "src/core/lib/gprpp/crash.h"
107 #include "src/core/lib/gprpp/debug_location.h"
108 #include "src/core/lib/gprpp/orphanable.h"
109 #include "src/core/lib/gprpp/ref_counted.h"
110 #include "src/core/lib/gprpp/ref_counted_ptr.h"
111 #include "src/core/lib/gprpp/status_helper.h"
112 #include "src/core/lib/gprpp/time.h"
113 #include "src/core/lib/gprpp/validation_errors.h"
114 #include "src/core/lib/gprpp/work_serializer.h"
115 #include "src/core/lib/iomgr/closure.h"
116 #include "src/core/lib/iomgr/error.h"
117 #include "src/core/lib/iomgr/exec_ctx.h"
118 #include "src/core/lib/iomgr/pollset_set.h"
119 #include "src/core/lib/iomgr/resolved_address.h"
120 #include "src/core/lib/iomgr/sockaddr.h"
121 #include "src/core/lib/iomgr/socket_utils.h"
122 #include "src/core/lib/json/json.h"
123 #include "src/core/lib/json/json_args.h"
124 #include "src/core/lib/json/json_object_loader.h"
125 #include "src/core/lib/security/credentials/credentials.h"
126 #include "src/core/lib/slice/slice.h"
127 #include "src/core/lib/slice/slice_string_helpers.h"
128 #include "src/core/lib/surface/call.h"
129 #include "src/core/lib/surface/channel.h"
130 #include "src/core/lib/surface/channel_stack_type.h"
131 #include "src/core/lib/transport/connectivity_state.h"
132 #include "src/core/lib/transport/metadata_batch.h"
133 #include "src/core/load_balancing/child_policy_handler.h"
134 #include "src/core/load_balancing/delegating_helper.h"
135 #include "src/core/load_balancing/grpclb/client_load_reporting_filter.h"
136 #include "src/core/load_balancing/grpclb/grpclb_balancer_addresses.h"
137 #include "src/core/load_balancing/grpclb/grpclb_client_stats.h"
138 #include "src/core/load_balancing/grpclb/load_balancer_api.h"
139 #include "src/core/load_balancing/lb_policy.h"
140 #include "src/core/load_balancing/lb_policy_factory.h"
141 #include "src/core/load_balancing/lb_policy_registry.h"
142 #include "src/core/load_balancing/subchannel_interface.h"
143 #include "src/core/resolver/endpoint_addresses.h"
144 #include "src/core/resolver/fake/fake_resolver.h"
145 #include "src/core/resolver/resolver.h"
146 
147 #define GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS 1
148 #define GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER 1.6
149 #define GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS 120
150 #define GRPC_GRPCLB_RECONNECT_JITTER 0.2
151 #define GRPC_GRPCLB_DEFAULT_FALLBACK_TIMEOUT_MS 10000
152 #define GRPC_GRPCLB_DEFAULT_SUBCHANNEL_DELETION_DELAY_MS 10000
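// Illustrative note (assuming the usual exponential-backoff behavior of the
// BackOff helper used below): with these settings, balancer-call retry delays
// grow roughly as 1s, 1.6s, 2.56s, ..., capped at 120s, with up to 20% jitter
// applied to each delay.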
153 
154 // Channel arg used to enable load reporting filter.
155 #define GRPC_ARG_GRPCLB_ENABLE_LOAD_REPORTING_FILTER \
156   "grpc.internal.grpclb_enable_load_reporting_filter"
157 
158 namespace grpc_core {
159 
160 TraceFlag grpc_lb_glb_trace(false, "glb");
161 
162 namespace {
163 
164 using ::grpc_event_engine::experimental::EventEngine;
165 
166 constexpr absl::string_view kGrpclb = "grpclb";
167 
168 class GrpcLbConfig final : public LoadBalancingPolicy::Config {
169  public:
170   GrpcLbConfig() = default;
171 
172   GrpcLbConfig(const GrpcLbConfig&) = delete;
173   GrpcLbConfig& operator=(const GrpcLbConfig&) = delete;
174 
175   GrpcLbConfig(GrpcLbConfig&& other) = delete;
176   GrpcLbConfig& operator=(GrpcLbConfig&& other) = delete;
177 
178   static const JsonLoaderInterface* JsonLoader(const JsonArgs&) {
179     static const auto* loader =
180         JsonObjectLoader<GrpcLbConfig>()
181             // Note: "childPolicy" field requires custom parsing, so
182             // it's handled in JsonPostLoad() instead.
183             .OptionalField("serviceName", &GrpcLbConfig::service_name_)
184             .Finish();
185     return loader;
186   }
187 
188   void JsonPostLoad(const Json& json, const JsonArgs&,
189                     ValidationErrors* errors) {
190     ValidationErrors::ScopedField field(errors, ".childPolicy");
191     Json child_policy_config_json_tmp;
192     const Json* child_policy_config_json;
193     auto it = json.object().find("childPolicy");
194     if (it == json.object().end()) {
195       child_policy_config_json_tmp = Json::FromArray({Json::FromObject({
196           {"round_robin", Json::FromObject({})},
197       })});
198       child_policy_config_json = &child_policy_config_json_tmp;
199     } else {
200       child_policy_config_json = &it->second;
201     }
202     auto child_policy_config =
203         CoreConfiguration::Get().lb_policy_registry().ParseLoadBalancingConfig(
204             *child_policy_config_json);
205     if (!child_policy_config.ok()) {
206       errors->AddError(child_policy_config.status().message());
207       return;
208     }
209     child_policy_ = std::move(*child_policy_config);
210   }
211 
212   absl::string_view name() const override { return kGrpclb; }
213 
214   RefCountedPtr<LoadBalancingPolicy::Config> child_policy() const {
215     return child_policy_;
216   }
217 
218   const std::string& service_name() const { return service_name_; }
219 
220  private:
221   RefCountedPtr<LoadBalancingPolicy::Config> child_policy_;
222   std::string service_name_;
223 };
224 
225 class GrpcLb final : public LoadBalancingPolicy {
226  public:
227   explicit GrpcLb(Args args);
228 
229   absl::string_view name() const override { return kGrpclb; }
230 
231   absl::Status UpdateLocked(UpdateArgs args) override;
232   void ResetBackoffLocked() override;
233 
234  private:
235   /// Contains a call to the LB server and all the data related to the call.
236   class BalancerCallState final
237       : public InternallyRefCounted<BalancerCallState> {
238    public:
239     explicit BalancerCallState(
240         RefCountedPtr<LoadBalancingPolicy> parent_grpclb_policy);
241     ~BalancerCallState() override;
242 
243     // It's the caller's responsibility to ensure that Orphan() is called from
244     // inside the work_serializer.
245     void Orphan() override;
246 
247     void StartQuery();
248 
249     GrpcLbClientStats* client_stats() const { return client_stats_.get(); }
250 
251     bool seen_initial_response() const { return seen_initial_response_; }
252     bool seen_serverlist() const { return seen_serverlist_; }
253 
254    private:
255     GrpcLb* grpclb_policy() const {
256       return static_cast<GrpcLb*>(grpclb_policy_.get());
257     }
258 
259     void ScheduleNextClientLoadReportLocked();
260     void SendClientLoadReportLocked();
261 
262     // EventEngine callbacks
263     void MaybeSendClientLoadReportLocked();
264 
265     static void ClientLoadReportDone(void* arg, grpc_error_handle error);
266     static void OnInitialRequestSent(void* arg, grpc_error_handle error);
267     static void OnBalancerMessageReceived(void* arg, grpc_error_handle error);
268     static void OnBalancerStatusReceived(void* arg, grpc_error_handle error);
269 
270     void ClientLoadReportDoneLocked(grpc_error_handle error);
271     void OnInitialRequestSentLocked();
272     void OnBalancerMessageReceivedLocked();
273     void OnBalancerStatusReceivedLocked(grpc_error_handle error);
274 
275     // The owning LB policy.
276     RefCountedPtr<LoadBalancingPolicy> grpclb_policy_;
277 
278     // The streaming call to the LB server. Always non-NULL.
279     grpc_call* lb_call_ = nullptr;
280 
281     // recv_initial_metadata
282     grpc_metadata_array lb_initial_metadata_recv_;
283 
284     // send_message
285     grpc_byte_buffer* send_message_payload_ = nullptr;
286     grpc_closure lb_on_initial_request_sent_;
287 
288     // recv_message
289     grpc_byte_buffer* recv_message_payload_ = nullptr;
290     grpc_closure lb_on_balancer_message_received_;
291     bool seen_initial_response_ = false;
292     bool seen_serverlist_ = false;
293 
294     // recv_trailing_metadata
295     grpc_closure lb_on_balancer_status_received_;
296     grpc_metadata_array lb_trailing_metadata_recv_;
297     grpc_status_code lb_call_status_;
298     grpc_slice lb_call_status_details_;
299 
300     // The stats for client-side load reporting associated with this LB call.
301     // Created after the first serverlist is received.
302     RefCountedPtr<GrpcLbClientStats> client_stats_;
303     Duration client_stats_report_interval_;
304     absl::optional<EventEngine::TaskHandle> client_load_report_handle_;
305     bool last_client_load_report_counters_were_zero_ = false;
306     bool client_load_report_is_due_ = false;
307     // The closure used for the completion of sending the load report.
308     grpc_closure client_load_report_done_closure_;
309   };
310 
311   class SubchannelWrapper final : public DelegatingSubchannel {
312    public:
313     SubchannelWrapper(RefCountedPtr<SubchannelInterface> subchannel,
314                       RefCountedPtr<GrpcLb> lb_policy, std::string lb_token,
315                       RefCountedPtr<GrpcLbClientStats> client_stats)
316         : DelegatingSubchannel(std::move(subchannel)),
317           lb_policy_(std::move(lb_policy)),
318           lb_token_(std::move(lb_token)),
319           client_stats_(std::move(client_stats)) {}
320 
321     const std::string& lb_token() const { return lb_token_; }
322     GrpcLbClientStats* client_stats() const { return client_stats_.get(); }
323 
324    private:
325     void Orphaned() override {
326       if (!IsWorkSerializerDispatchEnabled()) {
327         if (!lb_policy_->shutting_down_) {
328           lb_policy_->CacheDeletedSubchannelLocked(wrapped_subchannel());
329         }
330         return;
331       }
332       lb_policy_->work_serializer()->Run(
333           [self = WeakRefAsSubclass<SubchannelWrapper>()]() {
334             if (!self->lb_policy_->shutting_down_) {
335               self->lb_policy_->CacheDeletedSubchannelLocked(
336                   self->wrapped_subchannel());
337             }
338           },
339           DEBUG_LOCATION);
340     }
341 
342     RefCountedPtr<GrpcLb> lb_policy_;
343     std::string lb_token_;
344     RefCountedPtr<GrpcLbClientStats> client_stats_;
345   };
346 
347   class TokenAndClientStatsArg final
348       : public RefCounted<TokenAndClientStatsArg> {
349    public:
350     TokenAndClientStatsArg(std::string lb_token,
351                            RefCountedPtr<GrpcLbClientStats> client_stats)
352         : lb_token_(std::move(lb_token)),
353           client_stats_(std::move(client_stats)) {}
354 
355     static absl::string_view ChannelArgName() {
356       return GRPC_ARG_NO_SUBCHANNEL_PREFIX "grpclb_token_and_client_stats";
357     }
358 
359     static int ChannelArgsCompare(const TokenAndClientStatsArg* a,
360                                   const TokenAndClientStatsArg* b) {
361       int r = a->lb_token_.compare(b->lb_token_);
362       if (r != 0) return r;
363       return QsortCompare(a->client_stats_.get(), b->client_stats_.get());
364     }
365 
366     const std::string& lb_token() const { return lb_token_; }
367     RefCountedPtr<GrpcLbClientStats> client_stats() const {
368       return client_stats_;
369     }
370 
371    private:
372     std::string lb_token_;
373     RefCountedPtr<GrpcLbClientStats> client_stats_;
374   };
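  // Note: Serverlist::AddressIterator (defined below) attaches a
  // TokenAndClientStatsArg to each backend address via per-address channel
  // args, and Helper::CreateSubchannel() reads it back when wrapping the
  // subchannel in a SubchannelWrapper.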
375 
376   class Serverlist final : public RefCounted<Serverlist> {
377    public:
378     // Takes ownership of serverlist.
379     explicit Serverlist(std::vector<GrpcLbServer> serverlist)
380         : serverlist_(std::move(serverlist)) {}
381 
382     bool operator==(const Serverlist& other) const;
383 
384     const std::vector<GrpcLbServer>& serverlist() const { return serverlist_; }
385 
386     // Returns a text representation suitable for logging.
387     std::string AsText() const;
388 
389     // Extracts all non-drop entries into an EndpointAddressesIterator.
390     std::shared_ptr<EndpointAddressesIterator> GetServerAddressList(
391         GrpcLbClientStats* client_stats);
392 
393     // Returns true if the serverlist contains at least one drop entry and
394     // no backend address entries.
395     bool ContainsAllDropEntries() const;
396 
397     // Returns the LB token to use for a drop, or null if the call
398     // should not be dropped.
399     //
400     // Note: This is called from the picker, NOT from inside the control
401     // plane work_serializer.
402     const char* ShouldDrop();
403 
404    private:
405     class AddressIterator;
406 
407     std::vector<GrpcLbServer> serverlist_;
408 
409     // Accessed from the picker, so needs synchronization.
410     std::atomic<size_t> drop_index_{0};
411   };
412 
413   class Picker final : public SubchannelPicker {
414    public:
415     Picker(RefCountedPtr<Serverlist> serverlist,
416            RefCountedPtr<SubchannelPicker> child_picker,
417            RefCountedPtr<GrpcLbClientStats> client_stats)
418         : serverlist_(std::move(serverlist)),
419           child_picker_(std::move(child_picker)),
420           client_stats_(std::move(client_stats)) {}
421 
422     PickResult Pick(PickArgs args) override;
423 
424    private:
425     // A subchannel call tracker that unrefs the GrpcLbClientStats object
426     // in the case where the subchannel call is never actually started,
427     // since the client load reporting filter will not be able to do it
428     // in that case.
429     class SubchannelCallTracker final : public SubchannelCallTrackerInterface {
430      public:
431       SubchannelCallTracker(
432           RefCountedPtr<GrpcLbClientStats> client_stats,
433           std::unique_ptr<SubchannelCallTrackerInterface> original_call_tracker)
434           : client_stats_(std::move(client_stats)),
435             original_call_tracker_(std::move(original_call_tracker)) {}
436 
437       void Start() override {
438         if (original_call_tracker_ != nullptr) {
439           original_call_tracker_->Start();
440         }
441         // If we're actually starting the subchannel call, then the
442         // client load reporting filter will take ownership of the ref
443         // passed down to it via metadata.
444         client_stats_.release();
445       }
446 
447       void Finish(FinishArgs args) override {
448         if (original_call_tracker_ != nullptr) {
449           original_call_tracker_->Finish(args);
450         }
451       }
452 
453      private:
454       RefCountedPtr<GrpcLbClientStats> client_stats_;
455       std::unique_ptr<SubchannelCallTrackerInterface> original_call_tracker_;
456     };
457 
458     // Serverlist to be used for determining drops.
459     RefCountedPtr<Serverlist> serverlist_;
460 
461     RefCountedPtr<SubchannelPicker> child_picker_;
462     RefCountedPtr<GrpcLbClientStats> client_stats_;
463   };
464 
465   class Helper final
466       : public ParentOwningDelegatingChannelControlHelper<GrpcLb> {
467    public:
468     explicit Helper(RefCountedPtr<GrpcLb> parent)
469         : ParentOwningDelegatingChannelControlHelper(std::move(parent)) {}
470 
471     RefCountedPtr<SubchannelInterface> CreateSubchannel(
472         const grpc_resolved_address& address,
473         const ChannelArgs& per_address_args, const ChannelArgs& args) override;
474     void UpdateState(grpc_connectivity_state state, const absl::Status& status,
475                      RefCountedPtr<SubchannelPicker> picker) override;
476     void RequestReresolution() override;
477   };
478 
479   class StateWatcher final : public AsyncConnectivityStateWatcherInterface {
480    public:
481     explicit StateWatcher(RefCountedPtr<GrpcLb> parent)
482         : AsyncConnectivityStateWatcherInterface(parent->work_serializer()),
483           parent_(std::move(parent)) {}
484 
485     ~StateWatcher() override { parent_.reset(DEBUG_LOCATION, "StateWatcher"); }
486 
487    private:
488     void OnConnectivityStateChange(grpc_connectivity_state new_state,
489                                    const absl::Status& status) override {
490       if (parent_->fallback_at_startup_checks_pending_ &&
491           new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
492         // In TRANSIENT_FAILURE.  Cancel the fallback timer and go into
493         // fallback mode immediately.
494         gpr_log(GPR_INFO,
495                 "[grpclb %p] balancer channel in state:TRANSIENT_FAILURE (%s); "
496                 "entering fallback mode",
497                 parent_.get(), status.ToString().c_str());
498         parent_->fallback_at_startup_checks_pending_ = false;
499         parent_->channel_control_helper()->GetEventEngine()->Cancel(
500             *parent_->lb_fallback_timer_handle_);
501         parent_->fallback_mode_ = true;
502         parent_->CreateOrUpdateChildPolicyLocked();
503         // Cancel the watch, since we don't care about the channel state once we
504         // go into fallback mode.
505         parent_->CancelBalancerChannelConnectivityWatchLocked();
506       }
507     }
508 
509     RefCountedPtr<GrpcLb> parent_;
510   };
511 
512   class NullLbTokenEndpointIterator;
513 
514   void ShutdownLocked() override;
515 
516   // Helper functions used in UpdateLocked().
517   absl::Status UpdateBalancerChannelLocked();
518 
519   void CancelBalancerChannelConnectivityWatchLocked();
520 
521   // Methods for dealing with fallback state.
522   void MaybeEnterFallbackModeAfterStartup();
523   void OnFallbackTimerLocked();
524 
525   // Methods for dealing with the balancer call.
526   void StartBalancerCallLocked();
527   void StartBalancerCallRetryTimerLocked();
528   void OnBalancerCallRetryTimerLocked();
529 
530   // Methods for dealing with the child policy.
531   ChannelArgs CreateChildPolicyArgsLocked(
532       bool is_backend_from_grpclb_load_balancer);
533   OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
534       const ChannelArgs& args);
535   void CreateOrUpdateChildPolicyLocked();
536 
537   // Subchannel caching.
538   void CacheDeletedSubchannelLocked(
539       RefCountedPtr<SubchannelInterface> subchannel);
540   void StartSubchannelCacheTimerLocked();
541   void OnSubchannelCacheTimerLocked();
542 
543   // Configurations for the policy.
544   RefCountedPtr<GrpcLbConfig> config_;
545 
546   // Current channel args from the resolver.
547   ChannelArgs args_;
548 
549   // Internal state.
550   bool shutting_down_ = false;
551 
552   // The channel for communicating with the LB server.
553   OrphanablePtr<Channel> lb_channel_;
554   StateWatcher* watcher_ = nullptr;
555   // Response generator to inject address updates into lb_channel_.
556   RefCountedPtr<FakeResolverResponseGenerator> response_generator_;
557   // Parent channelz node.
558   RefCountedPtr<channelz::ChannelNode> parent_channelz_node_;
559 
560   // The data associated with the current LB call. It holds a ref to this LB
561   // policy. It's initialized every time we query for backends. It's reset to
562   // NULL whenever the current LB call is no longer needed (e.g., the LB policy
563   // is shutting down, or the LB call has ended). A non-NULL lb_calld_ always
564   // contains a non-NULL lb_call_.
565   OrphanablePtr<BalancerCallState> lb_calld_;
566   // Timeout for the LB call. 0 means no deadline.
567   const Duration lb_call_timeout_;
568   // Balancer call retry state.
569   BackOff lb_call_backoff_;
570   absl::optional<EventEngine::TaskHandle> lb_call_retry_timer_handle_;
571 
572   // The deserialized response from the balancer. May be nullptr until one
573   // such response has arrived.
574   RefCountedPtr<Serverlist> serverlist_;
575 
576   // Whether we're in fallback mode.
577   bool fallback_mode_ = false;
578   // The backend addresses from the resolver.
579   absl::StatusOr<std::shared_ptr<NullLbTokenEndpointIterator>>
580       fallback_backend_addresses_;
581   // The last resolution note from our parent.
582   // To be passed to child policy when fallback_backend_addresses_ is empty.
583   std::string resolution_note_;
584   // State for fallback-at-startup checks.
585   // Timeout after startup after which we will go into fallback mode if
586   // we have not received a serverlist from the balancer.
587   const Duration fallback_at_startup_timeout_;
588   bool fallback_at_startup_checks_pending_ = false;
589   absl::optional<EventEngine::TaskHandle> lb_fallback_timer_handle_;
590 
591   // The child policy to use for the backends.
592   OrphanablePtr<LoadBalancingPolicy> child_policy_;
593   // Child policy in state READY.
594   bool child_policy_ready_ = false;
595 
596   // Deleted subchannel caching.
597   const Duration subchannel_cache_interval_;
598   std::map<Timestamp /*deletion time*/,
599            std::vector<RefCountedPtr<SubchannelInterface>>>
600       cached_subchannels_;
601   absl::optional<EventEngine::TaskHandle> subchannel_cache_timer_handle_;
602 };
603 
604 //
605 // GrpcLb::Serverlist::AddressIterator
606 //
607 
608 bool IsServerValid(const GrpcLbServer& server, size_t idx, bool log) {
609   if (server.drop) return false;
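  // The port must fit in 16 bits (0-65535); reject anything with higher bits
  // set.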
610   if (GPR_UNLIKELY(server.port >> 16 != 0)) {
611     if (log) {
612       gpr_log(GPR_ERROR,
613               "Invalid port '%d' at index %" PRIuPTR
614               " of serverlist. Ignoring.",
615               server.port, idx);
616     }
617     return false;
618   }
619   if (GPR_UNLIKELY(server.ip_size != 4 && server.ip_size != 16)) {
620     if (log) {
621       gpr_log(GPR_ERROR,
622               "Expected IP to be 4 or 16 bytes, got %d at index %" PRIuPTR
623               " of serverlist. Ignoring",
624               server.ip_size, idx);
625     }
626     return false;
627   }
628   return true;
629 }
630 
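// Converts a non-drop GrpcLbServer entry into a grpc_resolved_address (drop
// entries yield a zeroed address).  Illustrative example: a 4-byte ip_addr of
// {10, 0, 0, 1} with port 443 produces an IPv4 sockaddr for 10.0.0.1:443,
// with the port stored in network byte order.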
631 void ParseServer(const GrpcLbServer& server, grpc_resolved_address* addr) {
632   memset(addr, 0, sizeof(*addr));
633   if (server.drop) return;
634   const uint16_t netorder_port = grpc_htons(static_cast<uint16_t>(server.port));
635   // The addresses are given in binary form (the raw in_addr/in6_addr bytes)
636   // in server.ip_addr.
637   if (server.ip_size == 4) {
638     addr->len = static_cast<socklen_t>(sizeof(grpc_sockaddr_in));
639     grpc_sockaddr_in* addr4 = reinterpret_cast<grpc_sockaddr_in*>(&addr->addr);
640     addr4->sin_family = GRPC_AF_INET;
641     memcpy(&addr4->sin_addr, server.ip_addr, server.ip_size);
642     addr4->sin_port = netorder_port;
643   } else if (server.ip_size == 16) {
644     addr->len = static_cast<socklen_t>(sizeof(grpc_sockaddr_in6));
645     grpc_sockaddr_in6* addr6 =
646         reinterpret_cast<grpc_sockaddr_in6*>(&addr->addr);
647     addr6->sin6_family = GRPC_AF_INET6;
648     memcpy(&addr6->sin6_addr, server.ip_addr, server.ip_size);
649     addr6->sin6_port = netorder_port;
650   }
651 }
652 
653 class GrpcLb::Serverlist::AddressIterator final
654     : public EndpointAddressesIterator {
655  public:
656   AddressIterator(RefCountedPtr<Serverlist> serverlist,
657                   RefCountedPtr<GrpcLbClientStats> client_stats)
658       : serverlist_(std::move(serverlist)),
659         client_stats_(std::move(client_stats)) {}
660 
661   void ForEach(absl::FunctionRef<void(const EndpointAddresses&)> callback)
662       const override {
663     for (size_t i = 0; i < serverlist_->serverlist_.size(); ++i) {
664       const GrpcLbServer& server = serverlist_->serverlist_[i];
665       if (!IsServerValid(server, i, false)) continue;
666       // Address processing.
667       grpc_resolved_address addr;
668       ParseServer(server, &addr);
669       // LB token processing.
670       const size_t lb_token_length = strnlen(
671           server.load_balance_token, GPR_ARRAY_SIZE(server.load_balance_token));
672       std::string lb_token(server.load_balance_token, lb_token_length);
673       if (lb_token.empty()) {
674         auto addr_uri = grpc_sockaddr_to_uri(&addr);
675         gpr_log(GPR_INFO,
676                 "Missing LB token for backend address '%s'. The empty token "
677                 "will be used instead",
678                 addr_uri.ok() ? addr_uri->c_str()
679                               : addr_uri.status().ToString().c_str());
680       }
681       // Return address with a channel arg containing LB token and stats object.
682       callback(EndpointAddresses(
683           addr, ChannelArgs().SetObject(MakeRefCounted<TokenAndClientStatsArg>(
684                     std::move(lb_token), client_stats_))));
685     }
686   }
687 
688  private:
689   RefCountedPtr<Serverlist> serverlist_;
690   RefCountedPtr<GrpcLbClientStats> client_stats_;
691 };
692 
693 //
694 // GrpcLb::Serverlist
695 //
696 
697 bool GrpcLb::Serverlist::operator==(const Serverlist& other) const {
698   return serverlist_ == other.serverlist_;
699 }
700 
701 std::string GrpcLb::Serverlist::AsText() const {
702   std::vector<std::string> entries;
703   for (size_t i = 0; i < serverlist_.size(); ++i) {
704     const GrpcLbServer& server = serverlist_[i];
705     std::string ipport;
706     if (server.drop) {
707       ipport = "(drop)";
708     } else {
709       grpc_resolved_address addr;
710       ParseServer(server, &addr);
711       auto addr_str = grpc_sockaddr_to_string(&addr, false);
712       ipport = addr_str.ok() ? *addr_str : addr_str.status().ToString();
713     }
714     entries.push_back(absl::StrFormat("  %" PRIuPTR ": %s token=%s\n", i,
715                                       ipport, server.load_balance_token));
716   }
717   return absl::StrJoin(entries, "");
718 }
719 
720 // Returns addresses extracted from the serverlist.
721 std::shared_ptr<EndpointAddressesIterator>
722 GrpcLb::Serverlist::GetServerAddressList(GrpcLbClientStats* client_stats) {
723   RefCountedPtr<GrpcLbClientStats> stats;
724   if (client_stats != nullptr) stats = client_stats->Ref();
725   return std::make_shared<AddressIterator>(Ref(), std::move(stats));
726 }
727 
728 bool GrpcLb::Serverlist::ContainsAllDropEntries() const {
729   if (serverlist_.empty()) return false;
730   for (const GrpcLbServer& server : serverlist_) {
731     if (!server.drop) return false;
732   }
733   return true;
734 }
735 
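// Illustrative example: given a serverlist of [backend, drop entry with
// (hypothetical) token "x", backend], drop_index_ advances 0, 1, 2, 0, ...
// across successive picks, so roughly every third pick lands on the drop
// entry and returns its token; all other picks return nullptr.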
736 const char* GrpcLb::Serverlist::ShouldDrop() {
737   if (serverlist_.empty()) return nullptr;
738   size_t index = drop_index_.fetch_add(1, std::memory_order_relaxed);
739   GrpcLbServer& server = serverlist_[index % serverlist_.size()];
740   return server.drop ? server.load_balance_token : nullptr;
741 }
742 
743 //
744 // GrpcLb::Picker
745 //
746 
747 GrpcLb::PickResult GrpcLb::Picker::Pick(PickArgs args) {
748   // Check if we should drop the call.
749   const char* drop_token =
750       serverlist_ == nullptr ? nullptr : serverlist_->ShouldDrop();
751   if (drop_token != nullptr) {
752     // Update client load reporting stats to indicate the number of
753     // dropped calls.  Note that we have to do this here instead of in
754     // the client_load_reporting filter, because we do not create a
755     // subchannel call (and therefore no client_load_reporting filter)
756     // for dropped calls.
757     if (client_stats_ != nullptr) {
758       client_stats_->AddCallDropped(drop_token);
759     }
760     return PickResult::Drop(
761         absl::UnavailableError("drop directed by grpclb balancer"));
762   }
763   // Forward pick to child policy.
764   PickResult result = child_picker_->Pick(args);
765   // If pick succeeded, add LB token to initial metadata.
766   auto* complete_pick = absl::get_if<PickResult::Complete>(&result.result);
767   if (complete_pick != nullptr) {
768     const SubchannelWrapper* subchannel_wrapper =
769         static_cast<SubchannelWrapper*>(complete_pick->subchannel.get());
770     // Encode client stats object into metadata for use by
771     // client_load_reporting filter.
772     GrpcLbClientStats* client_stats = subchannel_wrapper->client_stats();
773     if (client_stats != nullptr) {
774       complete_pick->subchannel_call_tracker =
775           std::make_unique<SubchannelCallTracker>(
776               client_stats->Ref(),
777               std::move(complete_pick->subchannel_call_tracker));
778       // The metadata value is a hack: we pretend the pointer points to
779       // a string and rely on the client_load_reporting filter to know
780       // how to interpret it.
781       // NOLINTBEGIN(bugprone-string-constructor)
782       args.initial_metadata->Add(
783           GrpcLbClientStatsMetadata::key(),
784           absl::string_view(reinterpret_cast<const char*>(client_stats), 0));
785       // NOLINTEND(bugprone-string-constructor)
786       // Update calls-started.
787       client_stats->AddCallStarted();
788     }
789     // Encode the LB token in metadata.
790     // Create a new copy on the call arena, since the subchannel list
791     // may get refreshed between when we return this pick and when the
792     // initial metadata goes out on the wire.
793     if (!subchannel_wrapper->lb_token().empty()) {
794       char* lb_token = static_cast<char*>(
795           args.call_state->Alloc(subchannel_wrapper->lb_token().size() + 1));
796       strcpy(lb_token, subchannel_wrapper->lb_token().c_str());
797       args.initial_metadata->Add(LbTokenMetadata::key(), lb_token);
798     }
799     // Unwrap subchannel to pass up to the channel.
800     complete_pick->subchannel = subchannel_wrapper->wrapped_subchannel();
801   }
802   return result;
803 }
804 
805 //
806 // GrpcLb::Helper
807 //
808 
809 RefCountedPtr<SubchannelInterface> GrpcLb::Helper::CreateSubchannel(
810     const grpc_resolved_address& address, const ChannelArgs& per_address_args,
811     const ChannelArgs& args) {
812   if (parent()->shutting_down_) return nullptr;
813   const auto* arg = per_address_args.GetObject<TokenAndClientStatsArg>();
814   if (arg == nullptr) {
815     auto addr_str = grpc_sockaddr_to_string(&address, false);
816     Crash(
817         absl::StrFormat("[grpclb %p] no TokenAndClientStatsArg for address %s",
818                         parent(), addr_str.value_or("N/A").c_str()));
819   }
820   std::string lb_token = arg->lb_token();
821   RefCountedPtr<GrpcLbClientStats> client_stats = arg->client_stats();
822   return MakeRefCounted<SubchannelWrapper>(
823       parent()->channel_control_helper()->CreateSubchannel(
824           address, per_address_args, args),
825       parent()->RefAsSubclass<GrpcLb>(DEBUG_LOCATION, "SubchannelWrapper"),
826       std::move(lb_token), std::move(client_stats));
827 }
828 
829 void GrpcLb::Helper::UpdateState(grpc_connectivity_state state,
830                                  const absl::Status& status,
831                                  RefCountedPtr<SubchannelPicker> picker) {
832   if (parent()->shutting_down_) return;
833   // Record whether child policy reports READY.
834   parent()->child_policy_ready_ = state == GRPC_CHANNEL_READY;
835   // Enter fallback mode if needed.
836   parent()->MaybeEnterFallbackModeAfterStartup();
837   // We pass the serverlist to the picker so that it can handle drops.
838   // However, we don't want to handle drops in the case where the child
839   // policy is reporting a state other than READY (unless we are
840   // dropping *all* calls), because we don't want to process drops for picks
841   // that yield a QUEUE result; this would result in dropping too many calls,
842   // since we will see the queued picks multiple times, and we'd consider each
843   // one a separate call for the drop calculation.  So in this case, we pass
844   // a null serverlist to the picker, which tells it not to do drops.
845   RefCountedPtr<Serverlist> serverlist;
846   if (state == GRPC_CHANNEL_READY ||
847       (parent()->serverlist_ != nullptr &&
848        parent()->serverlist_->ContainsAllDropEntries())) {
849     serverlist = parent()->serverlist_;
850   }
851   RefCountedPtr<GrpcLbClientStats> client_stats;
852   if (parent()->lb_calld_ != nullptr &&
853       parent()->lb_calld_->client_stats() != nullptr) {
854     client_stats = parent()->lb_calld_->client_stats()->Ref();
855   }
856   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
857     gpr_log(GPR_INFO,
858             "[grpclb %p helper %p] state=%s (%s) wrapping child "
859             "picker %p (serverlist=%p, client_stats=%p)",
860             parent(), this, ConnectivityStateName(state),
861             status.ToString().c_str(), picker.get(), serverlist.get(),
862             client_stats.get());
863   }
864   parent()->channel_control_helper()->UpdateState(
865       state, status,
866       MakeRefCounted<Picker>(std::move(serverlist), std::move(picker),
867                              std::move(client_stats)));
868 }
869 
870 void GrpcLb::Helper::RequestReresolution() {
871   if (parent()->shutting_down_) return;
872   // Ignore if we're not in fallback mode, because if we got the backend
873   // addresses from the balancer, re-resolving is not going to fix it.
874   if (!parent()->fallback_mode_) return;
875   parent()->channel_control_helper()->RequestReresolution();
876 }
877 
878 //
879 // GrpcLb::BalancerCallState
880 //
881 
882 GrpcLb::BalancerCallState::BalancerCallState(
883     RefCountedPtr<LoadBalancingPolicy> parent_grpclb_policy)
884     : InternallyRefCounted<BalancerCallState>(
885           GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace) ? "BalancerCallState"
886                                                      : nullptr),
887       grpclb_policy_(std::move(parent_grpclb_policy)) {
888   GPR_ASSERT(grpclb_policy_ != nullptr);
889   GPR_ASSERT(!grpclb_policy()->shutting_down_);
890   // Init the LB call. Note that the LB call will progress every time there's
891   // activity in grpclb_policy_->interested_parties(), which comprises
892   // the polling entities from client_channel.
893   GRPC_CLOSURE_INIT(&lb_on_initial_request_sent_, OnInitialRequestSent, this,
894                     grpc_schedule_on_exec_ctx);
895   GRPC_CLOSURE_INIT(&lb_on_balancer_message_received_,
896                     OnBalancerMessageReceived, this, grpc_schedule_on_exec_ctx);
897   GRPC_CLOSURE_INIT(&lb_on_balancer_status_received_, OnBalancerStatusReceived,
898                     this, grpc_schedule_on_exec_ctx);
899   GRPC_CLOSURE_INIT(&client_load_report_done_closure_, ClientLoadReportDone,
900                     this, grpc_schedule_on_exec_ctx);
901   const Timestamp deadline =
902       grpclb_policy()->lb_call_timeout_ == Duration::Zero()
903           ? Timestamp::InfFuture()
904           : Timestamp::Now() + grpclb_policy()->lb_call_timeout_;
905   lb_call_ = grpclb_policy()->lb_channel_->CreateCall(
906       /*parent_call=*/nullptr, GRPC_PROPAGATE_DEFAULTS,
907       /*cq=*/nullptr, grpclb_policy_->interested_parties(),
908       Slice::FromStaticString("/grpc.lb.v1.LoadBalancer/BalanceLoad"),
909       /*authority=*/absl::nullopt, deadline, /*registered_method=*/true);
910   // Init the LB call request payload.
911   upb::Arena arena;
912   grpc_slice request_payload_slice = GrpcLbRequestCreate(
913       grpclb_policy()->config_->service_name().empty()
914           ? grpclb_policy()->channel_control_helper()->GetAuthority()
915           : grpclb_policy()->config_->service_name(),
916       arena.ptr());
917   send_message_payload_ =
918       grpc_raw_byte_buffer_create(&request_payload_slice, 1);
919   CSliceUnref(request_payload_slice);
920   // Init other data associated with the LB call.
921   grpc_metadata_array_init(&lb_initial_metadata_recv_);
922   grpc_metadata_array_init(&lb_trailing_metadata_recv_);
923 }
924 
925 GrpcLb::BalancerCallState::~BalancerCallState() {
926   GPR_ASSERT(lb_call_ != nullptr);
927   grpc_call_unref(lb_call_);
928   grpc_metadata_array_destroy(&lb_initial_metadata_recv_);
929   grpc_metadata_array_destroy(&lb_trailing_metadata_recv_);
930   grpc_byte_buffer_destroy(send_message_payload_);
931   grpc_byte_buffer_destroy(recv_message_payload_);
932   CSliceUnref(lb_call_status_details_);
933 }
934 
935 void GrpcLb::BalancerCallState::Orphan() {
936   GPR_ASSERT(lb_call_ != nullptr);
937   // If we are here because grpclb_policy wants to cancel the call,
938   // lb_on_balancer_status_received_ will complete the cancellation and clean
939   // up. Otherwise, we are here because grpclb_policy has to orphan a failed
940   // call, in which case the cancellation below is a no-op.
941   grpc_call_cancel_internal(lb_call_);
942   if (client_load_report_handle_.has_value() &&
943       grpclb_policy()->channel_control_helper()->GetEventEngine()->Cancel(
944           client_load_report_handle_.value())) {
945     Unref(DEBUG_LOCATION, "client_load_report cancelled");
946   }
947   // Note that the initial ref is held by lb_on_balancer_status_received_
948   // instead of the caller of this function. So the corresponding unref happens
949   // in lb_on_balancer_status_received_ instead of here.
950 }
951 
952 void GrpcLb::BalancerCallState::StartQuery() {
953   GPR_ASSERT(lb_call_ != nullptr);
954   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
955     gpr_log(GPR_INFO, "[grpclb %p] lb_calld=%p: Starting LB call %p",
956             grpclb_policy_.get(), this, lb_call_);
957   }
958   // Create the ops.
959   grpc_call_error call_error;
960   grpc_op ops[3];
961   memset(ops, 0, sizeof(ops));
962   // Op: send initial metadata.
963   grpc_op* op = ops;
964   op->op = GRPC_OP_SEND_INITIAL_METADATA;
965   op->data.send_initial_metadata.count = 0;
966   op->flags = GRPC_INITIAL_METADATA_WAIT_FOR_READY |
967               GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET;
968   op->reserved = nullptr;
969   op++;
970   // Op: send request message.
971   GPR_ASSERT(send_message_payload_ != nullptr);
972   op->op = GRPC_OP_SEND_MESSAGE;
973   op->data.send_message.send_message = send_message_payload_;
974   op->flags = 0;
975   op->reserved = nullptr;
976   op++;
977   // TODO(roth): We currently track this ref manually.  Once the
978   // ClosureRef API is ready, we should pass the RefCountedPtr<> along
979   // with the callback.
980   auto self = Ref(DEBUG_LOCATION, "on_initial_request_sent");
981   self.release();
982   call_error = grpc_call_start_batch_and_execute(lb_call_, ops,
983                                                  static_cast<size_t>(op - ops),
984                                                  &lb_on_initial_request_sent_);
985   GPR_ASSERT(GRPC_CALL_OK == call_error);
986   // Op: recv initial metadata.
987   op = ops;
988   op->op = GRPC_OP_RECV_INITIAL_METADATA;
989   op->data.recv_initial_metadata.recv_initial_metadata =
990       &lb_initial_metadata_recv_;
991   op->flags = 0;
992   op->reserved = nullptr;
993   op++;
994   // Op: recv response.
995   op->op = GRPC_OP_RECV_MESSAGE;
996   op->data.recv_message.recv_message = &recv_message_payload_;
997   op->flags = 0;
998   op->reserved = nullptr;
999   op++;
1000   // TODO(roth): We currently track this ref manually.  Once the
1001   // ClosureRef API is ready, we should pass the RefCountedPtr<> along
1002   // with the callback.
1003   self = Ref(DEBUG_LOCATION, "on_message_received");
1004   self.release();
1005   call_error = grpc_call_start_batch_and_execute(
1006       lb_call_, ops, static_cast<size_t>(op - ops),
1007       &lb_on_balancer_message_received_);
1008   GPR_ASSERT(GRPC_CALL_OK == call_error);
1009   // Op: recv server status.
1010   op = ops;
1011   op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
1012   op->data.recv_status_on_client.trailing_metadata =
1013       &lb_trailing_metadata_recv_;
1014   op->data.recv_status_on_client.status = &lb_call_status_;
1015   op->data.recv_status_on_client.status_details = &lb_call_status_details_;
1016   op->flags = 0;
1017   op->reserved = nullptr;
1018   op++;
1019   // This callback signals the end of the LB call, so it relies on the initial
1020   // ref instead of a new ref. When it's invoked, it's the initial ref that is
1021   // unreffed.
1022   call_error = grpc_call_start_batch_and_execute(
1023       lb_call_, ops, static_cast<size_t>(op - ops),
1024       &lb_on_balancer_status_received_);
1025   GPR_ASSERT(GRPC_CALL_OK == call_error);
1026 }
1027 
1028 void GrpcLb::BalancerCallState::ScheduleNextClientLoadReportLocked() {
1029   client_load_report_handle_ =
1030       grpclb_policy()->channel_control_helper()->GetEventEngine()->RunAfter(
1031           client_stats_report_interval_, [this] {
1032             ApplicationCallbackExecCtx callback_exec_ctx;
1033             ExecCtx exec_ctx;
1034             grpclb_policy()->work_serializer()->Run(
1035                 [this] { MaybeSendClientLoadReportLocked(); }, DEBUG_LOCATION);
1036           });
1037 }
1038 
1039 void GrpcLb::BalancerCallState::MaybeSendClientLoadReportLocked() {
1040   client_load_report_handle_.reset();
1041   if (this != grpclb_policy()->lb_calld_.get()) {
1042     Unref(DEBUG_LOCATION, "client_load_report");
1043     return;
1044   }
1045   // If we've already sent the initial request, then we can go ahead and send
1046   // the load report. Otherwise, we need to wait until the initial request has
1047   // been sent to send this (see OnInitialRequestSentLocked()).
1048   if (send_message_payload_ == nullptr) {
1049     SendClientLoadReportLocked();
1050   } else {
1051     client_load_report_is_due_ = true;
1052   }
1053 }
1054 
1055 void GrpcLb::BalancerCallState::SendClientLoadReportLocked() {
1056   // Construct message payload.
1057   GPR_ASSERT(send_message_payload_ == nullptr);
1058   // Get snapshot of stats.
1059   int64_t num_calls_started;
1060   int64_t num_calls_finished;
1061   int64_t num_calls_finished_with_client_failed_to_send;
1062   int64_t num_calls_finished_known_received;
1063   std::unique_ptr<GrpcLbClientStats::DroppedCallCounts> drop_token_counts;
1064   client_stats_->Get(&num_calls_started, &num_calls_finished,
1065                      &num_calls_finished_with_client_failed_to_send,
1066                      &num_calls_finished_known_received, &drop_token_counts);
1067   // Skip client load report if the counters were all zero in the last
1068   // report and they are still zero in this one.
1069   if (num_calls_started == 0 && num_calls_finished == 0 &&
1070       num_calls_finished_with_client_failed_to_send == 0 &&
1071       num_calls_finished_known_received == 0 &&
1072       (drop_token_counts == nullptr || drop_token_counts->empty())) {
1073     if (last_client_load_report_counters_were_zero_) {
1074       ScheduleNextClientLoadReportLocked();
1075       return;
1076     }
1077     last_client_load_report_counters_were_zero_ = true;
1078   } else {
1079     last_client_load_report_counters_were_zero_ = false;
1080   }
1081   // Populate load report.
1082   upb::Arena arena;
1083   grpc_slice request_payload_slice = GrpcLbLoadReportRequestCreate(
1084       num_calls_started, num_calls_finished,
1085       num_calls_finished_with_client_failed_to_send,
1086       num_calls_finished_known_received, drop_token_counts.get(), arena.ptr());
1087   send_message_payload_ =
1088       grpc_raw_byte_buffer_create(&request_payload_slice, 1);
1089   CSliceUnref(request_payload_slice);
1090   // Send the report.
1091   grpc_op op;
1092   memset(&op, 0, sizeof(op));
1093   op.op = GRPC_OP_SEND_MESSAGE;
1094   op.data.send_message.send_message = send_message_payload_;
1095   grpc_call_error call_error = grpc_call_start_batch_and_execute(
1096       lb_call_, &op, 1, &client_load_report_done_closure_);
1097   if (GPR_UNLIKELY(call_error != GRPC_CALL_OK)) {
1098     gpr_log(GPR_ERROR,
1099             "[grpclb %p] lb_calld=%p call_error=%d sending client load report",
1100             grpclb_policy_.get(), this, call_error);
1101     GPR_ASSERT(GRPC_CALL_OK == call_error);
1102   }
1103 }
1104 
1105 void GrpcLb::BalancerCallState::ClientLoadReportDone(void* arg,
1106                                                      grpc_error_handle error) {
1107   BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
1108   lb_calld->grpclb_policy()->work_serializer()->Run(
1109       [lb_calld, error]() { lb_calld->ClientLoadReportDoneLocked(error); },
1110       DEBUG_LOCATION);
1111 }
1112 
1113 void GrpcLb::BalancerCallState::ClientLoadReportDoneLocked(
1114     grpc_error_handle error) {
1115   grpc_byte_buffer_destroy(send_message_payload_);
1116   send_message_payload_ = nullptr;
1117   if (!error.ok() || this != grpclb_policy()->lb_calld_.get()) {
1118     Unref(DEBUG_LOCATION, "client_load_report");
1119     return;
1120   }
1121   ScheduleNextClientLoadReportLocked();
1122 }
1123 
1124 void GrpcLb::BalancerCallState::OnInitialRequestSent(
1125     void* arg, grpc_error_handle /*error*/) {
1126   BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
1127   lb_calld->grpclb_policy()->work_serializer()->Run(
1128       [lb_calld]() { lb_calld->OnInitialRequestSentLocked(); }, DEBUG_LOCATION);
1129 }
1130 
1131 void GrpcLb::BalancerCallState::OnInitialRequestSentLocked() {
1132   grpc_byte_buffer_destroy(send_message_payload_);
1133   send_message_payload_ = nullptr;
1134   // If we attempted to send a client load report before the initial request was
1135   // sent (and this lb_calld is still in use), send the load report now.
1136   if (client_load_report_is_due_ && this == grpclb_policy()->lb_calld_.get()) {
1137     SendClientLoadReportLocked();
1138     client_load_report_is_due_ = false;
1139   }
1140   Unref(DEBUG_LOCATION, "on_initial_request_sent");
1141 }
1142 
1143 void GrpcLb::BalancerCallState::OnBalancerMessageReceived(
1144     void* arg, grpc_error_handle /*error*/) {
1145   BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
1146   lb_calld->grpclb_policy()->work_serializer()->Run(
1147       [lb_calld]() { lb_calld->OnBalancerMessageReceivedLocked(); },
1148       DEBUG_LOCATION);
1149 }
1150 
1151 void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked() {
1152   // Null payload means the LB call was cancelled.
1153   if (this != grpclb_policy()->lb_calld_.get() ||
1154       recv_message_payload_ == nullptr) {
1155     Unref(DEBUG_LOCATION, "on_message_received");
1156     return;
1157   }
1158   grpc_byte_buffer_reader bbr;
1159   grpc_byte_buffer_reader_init(&bbr, recv_message_payload_);
1160   grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
1161   grpc_byte_buffer_reader_destroy(&bbr);
1162   grpc_byte_buffer_destroy(recv_message_payload_);
1163   recv_message_payload_ = nullptr;
1164   GrpcLbResponse response;
1165   upb::Arena arena;
1166   if (!GrpcLbResponseParse(response_slice, arena.ptr(), &response) ||
1167       (response.type == response.INITIAL && seen_initial_response_)) {
1168     if (gpr_should_log(GPR_LOG_SEVERITY_ERROR)) {
1169       char* response_slice_str =
1170           grpc_dump_slice(response_slice, GPR_DUMP_ASCII | GPR_DUMP_HEX);
1171       gpr_log(GPR_ERROR,
1172               "[grpclb %p] lb_calld=%p: Invalid LB response received: '%s'. "
1173               "Ignoring.",
1174               grpclb_policy(), this, response_slice_str);
1175       gpr_free(response_slice_str);
1176     }
1177   } else {
1178     switch (response.type) {
1179       case response.INITIAL: {
1180         if (response.client_stats_report_interval != Duration::Zero()) {
1181           client_stats_report_interval_ = std::max(
1182               Duration::Seconds(1), response.client_stats_report_interval);
1183           if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
1184             gpr_log(GPR_INFO,
1185                     "[grpclb %p] lb_calld=%p: Received initial LB response "
1186                     "message; client load reporting interval = %" PRId64
1187                     " milliseconds",
1188                     grpclb_policy(), this,
1189                     client_stats_report_interval_.millis());
1190           }
1191         } else if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
1192           gpr_log(GPR_INFO,
1193                   "[grpclb %p] lb_calld=%p: Received initial LB response "
1194                   "message; client load reporting NOT enabled",
1195                   grpclb_policy(), this);
1196         }
1197         seen_initial_response_ = true;
1198         break;
1199       }
1200       case response.SERVERLIST: {
1201         GPR_ASSERT(lb_call_ != nullptr);
1202         auto serverlist_wrapper =
1203             MakeRefCounted<Serverlist>(std::move(response.serverlist));
1204         if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
1205           gpr_log(GPR_INFO,
1206                   "[grpclb %p] lb_calld=%p: Serverlist with %" PRIuPTR
1207                   " servers received:\n%s",
1208                   grpclb_policy(), this,
1209                   serverlist_wrapper->serverlist().size(),
1210                   serverlist_wrapper->AsText().c_str());
1211         }
1212         seen_serverlist_ = true;
1213         // Start sending client load report only after we start using the
1214         // serverlist returned from the current LB call.
1215         if (client_stats_report_interval_ > Duration::Zero() &&
1216             client_stats_ == nullptr) {
1217           client_stats_ = MakeRefCounted<GrpcLbClientStats>();
1218           // Ref held by callback.
1219           Ref(DEBUG_LOCATION, "client_load_report").release();
1220           ScheduleNextClientLoadReportLocked();
1221         }
1222         // Check if the serverlist differs from the previous one.
1223         if (grpclb_policy()->serverlist_ != nullptr &&
1224             *grpclb_policy()->serverlist_ == *serverlist_wrapper) {
1225           if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
1226             gpr_log(GPR_INFO,
1227                     "[grpclb %p] lb_calld=%p: Incoming server list identical "
1228                     "to current, ignoring.",
1229                     grpclb_policy(), this);
1230           }
1231         } else {  // New serverlist.
1232           // Dispose of the fallback.
1233           // TODO(roth): Ideally, we should stay in fallback mode until we
1234           // know that we can reach at least one of the backends in the new
1235           // serverlist.  Unfortunately, we can't do that, since we need to
1236           // send the new addresses to the child policy in order to determine
1237           // if they are reachable, and if we don't exit fallback mode now,
1238           // CreateOrUpdateChildPolicyLocked() will use the fallback
1239           // addresses instead of the addresses from the new serverlist.
1240           // However, if we can't reach any of the servers in the new
1241           // serverlist, then the child policy will never switch away from
1242           // the fallback addresses, but the grpclb policy will still think
1243           // that we're not in fallback mode, which means that we won't send
1244           // updates to the child policy when the fallback addresses are
1245           // updated by the resolver.  This is sub-optimal, but the only way
1246           // to fix it is to maintain a completely separate child policy for
1247           // fallback mode, and that's more work than we want to put into
1248           // the grpclb implementation at this point, since we're deprecating
1249           // it in favor of the xds policy.  We will implement this the
1250           // right way in the xds policy instead.
1251           if (grpclb_policy()->fallback_mode_) {
1252             gpr_log(GPR_INFO,
1253                     "[grpclb %p] Received response from balancer; exiting "
1254                     "fallback mode",
1255                     grpclb_policy());
1256             grpclb_policy()->fallback_mode_ = false;
1257           }
1258           if (grpclb_policy()->fallback_at_startup_checks_pending_) {
1259             grpclb_policy()->fallback_at_startup_checks_pending_ = false;
1260             grpclb_policy()->channel_control_helper()->GetEventEngine()->Cancel(
1261                 *grpclb_policy()->lb_fallback_timer_handle_);
1262             grpclb_policy()->CancelBalancerChannelConnectivityWatchLocked();
1263           }
1264           // Update the serverlist in the GrpcLb instance. This serverlist
1265           // instance will be destroyed either upon the next update or when the
1266           // GrpcLb instance is destroyed.
1267           grpclb_policy()->serverlist_ = std::move(serverlist_wrapper);
1268           grpclb_policy()->CreateOrUpdateChildPolicyLocked();
1269         }
1270         break;
1271       }
1272       case response.FALLBACK: {
1273         if (!grpclb_policy()->fallback_mode_) {
1274           gpr_log(GPR_INFO,
1275                   "[grpclb %p] Entering fallback mode as requested by balancer",
1276                   grpclb_policy());
1277           if (grpclb_policy()->fallback_at_startup_checks_pending_) {
1278             grpclb_policy()->fallback_at_startup_checks_pending_ = false;
1279             grpclb_policy()->channel_control_helper()->GetEventEngine()->Cancel(
1280                 *grpclb_policy()->lb_fallback_timer_handle_);
1281             grpclb_policy()->CancelBalancerChannelConnectivityWatchLocked();
1282           }
1283           grpclb_policy()->fallback_mode_ = true;
1284           grpclb_policy()->CreateOrUpdateChildPolicyLocked();
1285           // Reset serverlist, so that if the balancer exits fallback
1286           // mode by sending the same serverlist we were previously
1287           // using, we don't incorrectly ignore it as a duplicate.
1288           grpclb_policy()->serverlist_.reset();
1289         }
1290         break;
1291       }
1292     }
1293   }
1294   CSliceUnref(response_slice);
1295   if (!grpclb_policy()->shutting_down_) {
1296     // Keep listening for serverlist updates.
1297     grpc_op op;
1298     memset(&op, 0, sizeof(op));
1299     op.op = GRPC_OP_RECV_MESSAGE;
1300     op.data.recv_message.recv_message = &recv_message_payload_;
1301     op.flags = 0;
1302     op.reserved = nullptr;
1303     // Reuse the "OnBalancerMessageReceivedLocked" ref taken in StartQuery().
1304     const grpc_call_error call_error = grpc_call_start_batch_and_execute(
1305         lb_call_, &op, 1, &lb_on_balancer_message_received_);
1306     GPR_ASSERT(GRPC_CALL_OK == call_error);
1307   } else {
1308     Unref(DEBUG_LOCATION, "on_message_received+grpclb_shutdown");
1309   }
1310 }
1311 
1312 void GrpcLb::BalancerCallState::OnBalancerStatusReceived(
1313     void* arg, grpc_error_handle error) {
1314   BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
1315   lb_calld->grpclb_policy()->work_serializer()->Run(
1316       [lb_calld, error]() { lb_calld->OnBalancerStatusReceivedLocked(error); },
1317       DEBUG_LOCATION);
1318 }
1319 
1320 void GrpcLb::BalancerCallState::OnBalancerStatusReceivedLocked(
1321     grpc_error_handle error) {
1322   GPR_ASSERT(lb_call_ != nullptr);
1323   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
1324     char* status_details = grpc_slice_to_c_string(lb_call_status_details_);
1325     gpr_log(GPR_INFO,
1326             "[grpclb %p] lb_calld=%p: Status from LB server received. "
1327             "Status = %d, details = '%s', (lb_call: %p), error '%s'",
1328             grpclb_policy(), this, lb_call_status_, status_details, lb_call_,
1329             StatusToString(error).c_str());
1330     gpr_free(status_details);
1331   }
1332   // If this lb_calld is still in use, this call ended because of a failure,
1333   // so we want to retry connecting.  Otherwise, we have deliberately ended
1334   // this call and no further action is required.
1335   if (this == grpclb_policy()->lb_calld_.get()) {
1336     // If the fallback-at-startup checks are pending, go into fallback mode
1337     // immediately.  This short-circuits the timeout for the fallback-at-startup
1338     // case.
1339     grpclb_policy()->lb_calld_.reset();
1340     if (grpclb_policy()->fallback_at_startup_checks_pending_) {
1341       GPR_ASSERT(!seen_serverlist_);
1342       gpr_log(GPR_INFO,
1343               "[grpclb %p] Balancer call finished without receiving "
1344               "serverlist; entering fallback mode",
1345               grpclb_policy());
1346       grpclb_policy()->fallback_at_startup_checks_pending_ = false;
1347       grpclb_policy()->channel_control_helper()->GetEventEngine()->Cancel(
1348           *grpclb_policy()->lb_fallback_timer_handle_);
1349       grpclb_policy()->CancelBalancerChannelConnectivityWatchLocked();
1350       grpclb_policy()->fallback_mode_ = true;
1351       grpclb_policy()->CreateOrUpdateChildPolicyLocked();
1352     } else {
1353       // This handles the fallback-after-startup case.
1354       grpclb_policy()->MaybeEnterFallbackModeAfterStartup();
1355     }
1356     GPR_ASSERT(!grpclb_policy()->shutting_down_);
1357     grpclb_policy()->channel_control_helper()->RequestReresolution();
1358     if (seen_initial_response_) {
1359       // If we lose connection to the LB server, reset the backoff and restart
1360       // the LB call immediately.
1361       grpclb_policy()->lb_call_backoff_.Reset();
1362       grpclb_policy()->StartBalancerCallLocked();
1363     } else {
1364       // If this LB call failed to establish any connection to the LB server,
1365       // retry later.
1366       grpclb_policy()->StartBalancerCallRetryTimerLocked();
1367     }
1368   }
1369   Unref(DEBUG_LOCATION, "lb_call_ended");
1370 }
1371 
1372 //
1373 // helper code for creating balancer channel
1374 //
1375 
1376 EndpointAddressesList ExtractBalancerAddresses(const ChannelArgs& args) {
1377   const EndpointAddressesList* endpoints =
1378       FindGrpclbBalancerAddressesInChannelArgs(args);
1379   if (endpoints != nullptr) return *endpoints;
1380   return EndpointAddressesList();
1381 }
1382 
1383 // Returns the channel args for the LB channel, used to create a bidirectional
1384 // stream for the reception of load balancing updates.
1385 //
1386 // Inputs:
1387 //   - \a response_generator: used to propagate address updates from the
1388 //     resolver above the grpclb policy into the LB channel.
1389 //   - \a args: other args inherited from the grpclb policy.
1390 ChannelArgs BuildBalancerChannelArgs(
1391     FakeResolverResponseGenerator* response_generator,
1392     const ChannelArgs& args) {
1393   ChannelArgs grpclb_channel_args;
1394   const grpc_channel_args* lb_channel_specific_args =
1395       args.GetPointer<grpc_channel_args>(
1396           GRPC_ARG_EXPERIMENTAL_GRPCLB_CHANNEL_ARGS);
1397   if (lb_channel_specific_args != nullptr) {
1398     grpclb_channel_args = ChannelArgs::FromC(lb_channel_specific_args);
1399   } else {
1400     // Set grpclb_channel_args based on the parent channel's channel args.
1401     grpclb_channel_args =
1402         args
1403             // LB policy name, since we want to use the default (pick_first) in
1404             // the LB channel.
1405             .Remove(GRPC_ARG_LB_POLICY_NAME)
1406             // Strip out the service config, since we don't want the LB policy
1407             // config specified for the parent channel to affect the LB channel.
1408             .Remove(GRPC_ARG_SERVICE_CONFIG)
1409             // The fake resolver response generator, since we will replace
1410             // it with the one owned by the grpclb policy, which is used to
1411             // propagate updates to the LB channel.
1412             .Remove(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR)
1413             // The LB channel should use the authority indicated by the target
1414             // authority table (see \a ModifyGrpclbBalancerChannelArgs),
1415             // as opposed to the authority from the parent channel.
1416             .Remove(GRPC_ARG_DEFAULT_AUTHORITY)
1417             // Just as for \a GRPC_ARG_DEFAULT_AUTHORITY, the LB channel should
1418             // be treated as a stand-alone channel and not inherit this argument
1419             // from the args of the parent channel.
1420             .Remove(GRPC_SSL_TARGET_NAME_OVERRIDE_ARG)
1421             // Don't want to pass down channelz node from parent; the balancer
1422             // channel will get its own.
1423             .Remove(GRPC_ARG_CHANNELZ_CHANNEL_NODE)
1424             // Remove the channel arg for channel credentials; it will be
1425             // replaced with a version that does not contain call
1426             // credentials, since the load balancer is not necessarily
1427             // trusted to handle bearer token credentials.
1428             .Remove(GRPC_ARG_CHANNEL_CREDENTIALS);
1429   }
1430   return grpclb_channel_args
1431       // A channel arg indicating the target is a grpclb load balancer.
1432       .Set(GRPC_ARG_ADDRESS_IS_GRPCLB_LOAD_BALANCER, 1)
1433       // Tells channelz that this is an internal channel.
1434       .Set(GRPC_ARG_CHANNELZ_IS_INTERNAL_CHANNEL, 1)
1435       // The fake resolver response generator, which we use to inject
1436       // address updates into the LB channel.
1437       .SetObject(response_generator->Ref());
1438 }
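
// Illustrative sketch (not part of the original file): it shows the net effect
// of BuildBalancerChannelArgs() using only identifiers that appear above,
// except `parent`, `lb_args`, and `response_generator`, which are hypothetical
// locals (`response_generator` standing in for the policy's generator ref).
//
//   ChannelArgs parent =
//       ChannelArgs()
//           .Set(GRPC_ARG_SERVICE_CONFIG, "{...}")
//           .Set(GRPC_SSL_TARGET_NAME_OVERRIDE_ARG, "lb.example.com");
//   ChannelArgs lb_args = BuildBalancerChannelArgs(response_generator.get(), parent);
//   // lb_args no longer contains GRPC_ARG_SERVICE_CONFIG or the target-name
//   // override, but does contain GRPC_ARG_ADDRESS_IS_GRPCLB_LOAD_BALANCER=1,
//   // GRPC_ARG_CHANNELZ_IS_INTERNAL_CHANNEL=1, and the response generator object.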
1439 
1440 //
1441 // ctor and dtor
1442 //
1443 
1444 GrpcLb::GrpcLb(Args args)
1445     : LoadBalancingPolicy(std::move(args)),
1446       response_generator_(MakeRefCounted<FakeResolverResponseGenerator>()),
1447       lb_call_timeout_(std::max(
1448           Duration::Zero(),
1449           channel_args()
1450               .GetDurationFromIntMillis(GRPC_ARG_GRPCLB_CALL_TIMEOUT_MS)
1451               .value_or(Duration::Zero()))),
1452       lb_call_backoff_(
1453           BackOff::Options()
1454               .set_initial_backoff(Duration::Seconds(
1455                   GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS))
1456               .set_multiplier(GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER)
1457               .set_jitter(GRPC_GRPCLB_RECONNECT_JITTER)
1458               .set_max_backoff(Duration::Seconds(
1459                   GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS))),
1460       fallback_at_startup_timeout_(std::max(
1461           Duration::Zero(),
1462           channel_args()
1463               .GetDurationFromIntMillis(GRPC_ARG_GRPCLB_FALLBACK_TIMEOUT_MS)
1464               .value_or(Duration::Milliseconds(
1465                   GRPC_GRPCLB_DEFAULT_FALLBACK_TIMEOUT_MS)))),
1466       subchannel_cache_interval_(std::max(
1467           Duration::Zero(),
1468           channel_args()
1469               .GetDurationFromIntMillis(
1470                   GRPC_ARG_GRPCLB_SUBCHANNEL_CACHE_INTERVAL_MS)
1471               .value_or(Duration::Milliseconds(
1472                   GRPC_GRPCLB_DEFAULT_SUBCHANNEL_DELETION_DELAY_MS)))) {
1473   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
1474     gpr_log(GPR_INFO,
1475             "[grpclb %p] Will use '%s' as the server name for LB request.",
1476             this,
1477             std::string(channel_control_helper()->GetAuthority()).c_str());
1478   }
1479 }
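
// Worked example (illustrative, not part of the original file): with the
// BackOff::Options configured in the ctor above — initial backoff B,
// multiplier M, jitter J, and a max-backoff cap (the GRPC_GRPCLB_* constants
// defined earlier in this file) — successive balancer-call retry delays grow
// roughly as
//
//   B, B*M, B*M^2, B*M^3, ...   (each perturbed by +/- J, capped at the max)
//
// until either a call receives an initial response, in which case
// lb_call_backoff_.Reset() in OnBalancerStatusReceivedLocked() restarts the
// sequence, or the policy is shut down.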
1480 
1481 void GrpcLb::ShutdownLocked() {
1482   shutting_down_ = true;
1483   lb_calld_.reset();
1484   if (subchannel_cache_timer_handle_.has_value()) {
1485     channel_control_helper()->GetEventEngine()->Cancel(
1486         *subchannel_cache_timer_handle_);
1487     subchannel_cache_timer_handle_.reset();
1488   }
1489   cached_subchannels_.clear();
1490   if (lb_call_retry_timer_handle_.has_value()) {
1491     channel_control_helper()->GetEventEngine()->Cancel(
1492         *lb_call_retry_timer_handle_);
1493   }
1494   if (fallback_at_startup_checks_pending_) {
1495     fallback_at_startup_checks_pending_ = false;
1496     channel_control_helper()->GetEventEngine()->Cancel(
1497         *lb_fallback_timer_handle_);
1498     CancelBalancerChannelConnectivityWatchLocked();
1499   }
1500   if (child_policy_ != nullptr) {
1501     grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(),
1502                                      interested_parties());
1503     child_policy_.reset();
1504   }
1505   // We destroy the LB channel here instead of in our destructor because
1506   // destroying the channel triggers a last callback to
1507   // OnBalancerChannelConnectivityChangedLocked(), and we need to be
1508   // alive when that callback is invoked.
1509   if (lb_channel_ != nullptr) {
1510     if (parent_channelz_node_ != nullptr) {
1511       channelz::ChannelNode* child_channelz_node = lb_channel_->channelz_node();
1512       GPR_ASSERT(child_channelz_node != nullptr);
1513       parent_channelz_node_->RemoveChildChannel(child_channelz_node->uuid());
1514     }
1515     lb_channel_.reset();
1516   }
1517 }
1518 
1519 //
1520 // public methods
1521 //
1522 
1523 void GrpcLb::ResetBackoffLocked() {
1524   if (lb_channel_ != nullptr) {
1525     lb_channel_->ResetConnectionBackoff();
1526   }
1527   if (child_policy_ != nullptr) {
1528     child_policy_->ResetBackoffLocked();
1529   }
1530 }
1531 
1532 // Endpoint iterator wrapper to add null LB token attribute.
1533 class GrpcLb::NullLbTokenEndpointIterator final
1534     : public EndpointAddressesIterator {
1535  public:
1536   explicit NullLbTokenEndpointIterator(
1537       std::shared_ptr<EndpointAddressesIterator> parent_it)
1538       : parent_it_(std::move(parent_it)) {}
1539 
1540   void ForEach(absl::FunctionRef<void(const EndpointAddresses&)> callback)
1541       const override {
1542     parent_it_->ForEach([&](const EndpointAddresses& endpoint) {
1543       if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
1544         gpr_log(GPR_INFO, "[grpclb %p] fallback address: %s", this,
1545                 endpoint.ToString().c_str());
1546       }
1547       callback(EndpointAddresses(endpoint.addresses(),
1548                                  endpoint.args().SetObject(empty_token_)));
1549     });
1550   }
1551 
1552  private:
1553   std::shared_ptr<EndpointAddressesIterator> parent_it_;
1554   RefCountedPtr<TokenAndClientStatsArg> empty_token_ =
1555       MakeRefCounted<TokenAndClientStatsArg>("", nullptr);
1556 };
1557 
1558 absl::Status GrpcLb::UpdateLocked(UpdateArgs args) {
1559   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
1560     gpr_log(GPR_INFO, "[grpclb %p] received update", this);
1561   }
1562   const bool is_initial_update = lb_channel_ == nullptr;
1563   config_ = args.config.TakeAsSubclass<GrpcLbConfig>();
1564   GPR_ASSERT(config_ != nullptr);
1565   args_ = std::move(args.args);
1566   // Update fallback address list.
1567   if (!args.addresses.ok()) {
1568     fallback_backend_addresses_ = args.addresses.status();
1569   } else {
1570     fallback_backend_addresses_ = std::make_shared<NullLbTokenEndpointIterator>(
1571         std::move(*args.addresses));
1572   }
1573   resolution_note_ = std::move(args.resolution_note);
1574   // Update balancer channel.
1575   absl::Status status = UpdateBalancerChannelLocked();
1576   // Update the existing child policy, if any.
1577   if (child_policy_ != nullptr) CreateOrUpdateChildPolicyLocked();
1578   // If this is the initial update, start the fallback-at-startup checks
1579   // and the balancer call.
1580   if (is_initial_update) {
1581     fallback_at_startup_checks_pending_ = true;
1582     // Start timer.
1583     lb_fallback_timer_handle_ =
1584         channel_control_helper()->GetEventEngine()->RunAfter(
1585             fallback_at_startup_timeout_,
1586             [self = RefAsSubclass<GrpcLb>(DEBUG_LOCATION,
1587                                           "on_fallback_timer")]() mutable {
1588               ApplicationCallbackExecCtx callback_exec_ctx;
1589               ExecCtx exec_ctx;
1590               auto self_ptr = self.get();
1591               self_ptr->work_serializer()->Run(
1592                   [self = std::move(self)]() { self->OnFallbackTimerLocked(); },
1593                   DEBUG_LOCATION);
1594             });
1595     // Start watching the channel's connectivity state.  If the channel
1596     // goes into state TRANSIENT_FAILURE before the timer fires, we go into
1597     // fallback mode even if the fallback timeout has not elapsed.
1598     watcher_ =
1599         new StateWatcher(RefAsSubclass<GrpcLb>(DEBUG_LOCATION, "StateWatcher"));
1600     lb_channel_->AddConnectivityWatcher(
1601         GRPC_CHANNEL_IDLE,
1602         OrphanablePtr<AsyncConnectivityStateWatcherInterface>(watcher_));
1603     // Start balancer call.
1604     StartBalancerCallLocked();
1605   }
1606   return status;
1607 }
1608 
1609 //
1610 // helpers for UpdateLocked()
1611 //
1612 
1613 absl::Status GrpcLb::UpdateBalancerChannelLocked() {
1614   // Get balancer addresses.
1615   EndpointAddressesList balancer_addresses = ExtractBalancerAddresses(args_);
1616   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
1617     for (const auto& endpoint : balancer_addresses) {
1618       gpr_log(GPR_INFO, "[grpclb %p] balancer address: %s", this,
1619               endpoint.ToString().c_str());
1620     }
1621   }
1622   absl::Status status;
1623   if (balancer_addresses.empty()) {
1624     status = absl::UnavailableError("balancer address list must be non-empty");
1625   }
1626   // Create channel credentials that do not contain call credentials.
1627   auto channel_credentials = channel_control_helper()->GetChannelCredentials();
1628   // Construct args for balancer channel.
1629   ChannelArgs lb_channel_args =
1630       BuildBalancerChannelArgs(response_generator_.get(), args_);
1631   // Create balancer channel if needed.
1632   if (lb_channel_ == nullptr) {
1633     std::string uri_str =
1634         absl::StrCat("fake:///", channel_control_helper()->GetAuthority());
1635     lb_channel_.reset(Channel::FromC(
1636         grpc_channel_create(uri_str.c_str(), channel_credentials.get(),
1637                             lb_channel_args.ToC().get())));
1638     GPR_ASSERT(lb_channel_ != nullptr);
1639     // Set up channelz linkage.
1640     channelz::ChannelNode* child_channelz_node = lb_channel_->channelz_node();
1641     auto parent_channelz_node = args_.GetObjectRef<channelz::ChannelNode>();
1642     if (child_channelz_node != nullptr && parent_channelz_node != nullptr) {
1643       parent_channelz_node->AddChildChannel(child_channelz_node->uuid());
1644       parent_channelz_node_ = std::move(parent_channelz_node);
1645     }
1646   }
1647   // Propagate updates to the LB channel (pick_first) through the fake
1648   // resolver.
1649   Resolver::Result result;
1650   result.addresses = std::move(balancer_addresses);
1651   // Pass channel creds via channel args, since the fake resolver won't
1652   // do this automatically.
1653   result.args = lb_channel_args.SetObject(std::move(channel_credentials));
1654   response_generator_->SetResponseAsync(std::move(result));
1655   // Return status.
1656   return status;
1657 }
1658 
1659 void GrpcLb::CancelBalancerChannelConnectivityWatchLocked() {
1660   lb_channel_->RemoveConnectivityWatcher(watcher_);
1661 }
1662 
1663 //
1664 // code for balancer channel and call
1665 //
1666 
1667 void GrpcLb::StartBalancerCallLocked() {
1668   GPR_ASSERT(lb_channel_ != nullptr);
1669   if (shutting_down_) return;
1670   // Init the LB call data.
1671   GPR_ASSERT(lb_calld_ == nullptr);
1672   lb_calld_ = MakeOrphanable<BalancerCallState>(Ref());
1673   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
1674     gpr_log(GPR_INFO,
1675             "[grpclb %p] Query for backends (lb_channel: %p, lb_calld: %p)",
1676             this, lb_channel_.get(), lb_calld_.get());
1677   }
1678   lb_calld_->StartQuery();
1679 }
1680 
1681 void GrpcLb::StartBalancerCallRetryTimerLocked() {
1682   Duration timeout = lb_call_backoff_.NextAttemptTime() - Timestamp::Now();
1683   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
1684     gpr_log(GPR_INFO, "[grpclb %p] Connection to LB server lost...", this);
1685     if (timeout > Duration::Zero()) {
1686       gpr_log(GPR_INFO, "[grpclb %p] ... retry_timer_active in %" PRId64 "ms.",
1687               this, timeout.millis());
1688     } else {
1689       gpr_log(GPR_INFO, "[grpclb %p] ... retry_timer_active immediately.",
1690               this);
1691     }
1692   }
1693   lb_call_retry_timer_handle_ =
1694       channel_control_helper()->GetEventEngine()->RunAfter(
1695           timeout,
1696           [self = RefAsSubclass<GrpcLb>(
1697                DEBUG_LOCATION, "on_balancer_call_retry_timer")]() mutable {
1698             ApplicationCallbackExecCtx callback_exec_ctx;
1699             ExecCtx exec_ctx;
1700             auto self_ptr = self.get();
1701             self_ptr->work_serializer()->Run(
1702                 [self = std::move(self)]() {
1703                   self->OnBalancerCallRetryTimerLocked();
1704                 },
1705                 DEBUG_LOCATION);
1706           });
1707 }
1708 
1709 void GrpcLb::OnBalancerCallRetryTimerLocked() {
1710   lb_call_retry_timer_handle_.reset();
1711   if (!shutting_down_ && lb_calld_ == nullptr) {
1712     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
1713       gpr_log(GPR_INFO, "[grpclb %p] Restarting call to LB server", this);
1714     }
1715     StartBalancerCallLocked();
1716   }
1717 }
1718 
1719 //
1720 // code for handling fallback mode
1721 //
1722 
1723 void GrpcLb::MaybeEnterFallbackModeAfterStartup() {
1724   // Enter fallback mode if all of the following are true:
1725   // - We are not currently in fallback mode.
1726   // - We are not currently waiting for the initial fallback timeout.
1727   // - We are not currently in contact with the balancer.
1728   // - The child policy is not in state READY.
1729   if (!fallback_mode_ && !fallback_at_startup_checks_pending_ &&
1730       (lb_calld_ == nullptr || !lb_calld_->seen_serverlist()) &&
1731       !child_policy_ready_) {
1732     gpr_log(GPR_INFO,
1733             "[grpclb %p] lost contact with balancer and backends from "
1734             "most recent serverlist; entering fallback mode",
1735             this);
1736     fallback_mode_ = true;
1737     CreateOrUpdateChildPolicyLocked();
1738   }
1739 }
1740 
1741 void GrpcLb::OnFallbackTimerLocked() {
1742   // If we receive a serverlist after the timer fires but before this callback
1743   // actually runs, don't fall back.
1744   if (fallback_at_startup_checks_pending_ && !shutting_down_) {
1745     gpr_log(GPR_INFO,
1746             "[grpclb %p] No response from balancer after fallback timeout; "
1747             "entering fallback mode",
1748             this);
1749     fallback_at_startup_checks_pending_ = false;
1750     CancelBalancerChannelConnectivityWatchLocked();
1751     fallback_mode_ = true;
1752     CreateOrUpdateChildPolicyLocked();
1753   }
1754 }
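
// Illustrative usage sketch (an assumption about the C++ application API; not
// part of this file): the fallback-at-startup window consumed by the timer
// above is read from GRPC_ARG_GRPCLB_FALLBACK_TIMEOUT_MS in the ctor, so an
// application can tune it per channel, e.g.:
//
//   grpc::ChannelArguments channel_args;
//   channel_args.SetInt(GRPC_ARG_GRPCLB_FALLBACK_TIMEOUT_MS, 5000);  // 5s window
//   auto channel = grpc::CreateCustomChannel(target, creds, channel_args);
//
// Here `target` and `creds` are hypothetical placeholders for the server URI
// and channel credentials.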
1755 
1756 //
1757 // code for interacting with the child policy
1758 //
1759 
1760 ChannelArgs GrpcLb::CreateChildPolicyArgsLocked(
1761     bool is_backend_from_grpclb_load_balancer) {
1762   ChannelArgs r =
1763       args_
1764           .Set(GRPC_ARG_ADDRESS_IS_BACKEND_FROM_GRPCLB_LOAD_BALANCER,
1765                is_backend_from_grpclb_load_balancer)
1766           .Set(GRPC_ARG_GRPCLB_ENABLE_LOAD_REPORTING_FILTER, 1);
1767   if (is_backend_from_grpclb_load_balancer) {
1768     r = r.Set(GRPC_ARG_INHIBIT_HEALTH_CHECKING, 1);
1769   }
1770   return r;
1771 }
1772 
1773 OrphanablePtr<LoadBalancingPolicy> GrpcLb::CreateChildPolicyLocked(
1774     const ChannelArgs& args) {
1775   LoadBalancingPolicy::Args lb_policy_args;
1776   lb_policy_args.work_serializer = work_serializer();
1777   lb_policy_args.args = args;
1778   lb_policy_args.channel_control_helper =
1779       std::make_unique<Helper>(RefAsSubclass<GrpcLb>(DEBUG_LOCATION, "Helper"));
1780   OrphanablePtr<LoadBalancingPolicy> lb_policy =
1781       MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args),
1782                                          &grpc_lb_glb_trace);
1783   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
1784     gpr_log(GPR_INFO, "[grpclb %p] Created new child policy handler (%p)", this,
1785             lb_policy.get());
1786   }
1787   // Add the gRPC LB's interested_parties pollset_set to that of the newly
1788   // created child policy, so the child policy makes progress upon activity
1789   // on the gRPC LB policy, which in turn is tied to the application's call.
1790   grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
1791                                    interested_parties());
1792   return lb_policy;
1793 }
1794 
1795 bool EndpointIteratorIsEmpty(const EndpointAddressesIterator& endpoints) {
1796   bool empty = true;
1797   endpoints.ForEach([&](const EndpointAddresses&) { empty = false; });
1798   return empty;
1799 }
1800 
1801 void GrpcLb::CreateOrUpdateChildPolicyLocked() {
1802   if (shutting_down_) return;
1803   // Construct update args.
1804   UpdateArgs update_args;
1805   bool is_backend_from_grpclb_load_balancer = false;
1806   if (fallback_mode_) {
1807     // If CreateOrUpdateChildPolicyLocked() is invoked when we haven't
1808     // received any serverlist from the balancer, we use the fallback
1809     // backends returned by the resolver. Note that the fallback backend
1810     // list may be empty, in which case the new child policy will fail the
1811     // picks.
1812     update_args.addresses = fallback_backend_addresses_;
1813     if (fallback_backend_addresses_.ok() &&
1814         EndpointIteratorIsEmpty(**fallback_backend_addresses_)) {
1815       update_args.resolution_note = absl::StrCat(
1816           "grpclb in fallback mode without any fallback addresses: ",
1817           resolution_note_);
1818     }
1819   } else {
1820     update_args.addresses = serverlist_->GetServerAddressList(
1821         lb_calld_ == nullptr ? nullptr : lb_calld_->client_stats());
1822     is_backend_from_grpclb_load_balancer = true;
1823     if (update_args.addresses.ok() &&
1824         EndpointIteratorIsEmpty(**update_args.addresses)) {
1825       update_args.resolution_note = "empty serverlist from grpclb balancer";
1826     }
1827   }
1828   update_args.args =
1829       CreateChildPolicyArgsLocked(is_backend_from_grpclb_load_balancer);
1830   GPR_ASSERT(update_args.args != ChannelArgs());
1831   update_args.config = config_->child_policy();
1832   // Create child policy if needed.
1833   if (child_policy_ == nullptr) {
1834     child_policy_ = CreateChildPolicyLocked(update_args.args);
1835   }
1836   // Update the policy.
1837   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
1838     gpr_log(GPR_INFO, "[grpclb %p] Updating child policy handler %p", this,
1839             child_policy_.get());
1840   }
1841   // TODO(roth): If we're in fallback mode and the child policy rejects the
1842   // update, we should propagate that failure back to the resolver somehow.
1843   (void)child_policy_->UpdateLocked(std::move(update_args));
1844 }
1845 
1846 //
1847 // subchannel caching
1848 //
1849 
1850 void GrpcLb::CacheDeletedSubchannelLocked(
1851     RefCountedPtr<SubchannelInterface> subchannel) {
1852   Timestamp deletion_time = Timestamp::Now() + subchannel_cache_interval_;
1853   cached_subchannels_[deletion_time].push_back(std::move(subchannel));
1854   if (!subchannel_cache_timer_handle_.has_value()) {
1855     StartSubchannelCacheTimerLocked();
1856   }
1857 }
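
// Illustrative timeline (an assumption of a 10s subchannel_cache_interval_
// and two hypothetical subchannels sc1/sc2 deleted 10ms apart; not part of
// the original file):
//
//   t=0       CacheDeletedSubchannelLocked(sc1) -> cached_subchannels_[t+10s]    = {sc1}, timer armed
//   t=10ms    CacheDeletedSubchannelLocked(sc2) -> cached_subchannels_[t+10.01s] = {sc2}
//   t=10s     timer fires: the first bucket is erased (releasing sc1) and the
//             timer is re-armed for the remaining bucket
//   t=10.01s  timer fires again: sc2 is released and, with the map now empty,
//             no further timer is scheduled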
1858 
1859 void GrpcLb::StartSubchannelCacheTimerLocked() {
1860   GPR_ASSERT(!cached_subchannels_.empty());
1861   subchannel_cache_timer_handle_ =
1862       channel_control_helper()->GetEventEngine()->RunAfter(
1863           cached_subchannels_.begin()->first - Timestamp::Now(),
1864           [self = RefAsSubclass<GrpcLb>(DEBUG_LOCATION,
1865                                         "OnSubchannelCacheTimer")]() mutable {
1866             ApplicationCallbackExecCtx callback_exec_ctx;
1867             ExecCtx exec_ctx;
1868             auto* self_ptr = self.get();
1869             self_ptr->work_serializer()->Run(
1870                 [self = std::move(self)]() mutable {
1871                   self->OnSubchannelCacheTimerLocked();
1872                 },
1873                 DEBUG_LOCATION);
1874           });
1875 }
1876 
1877 void GrpcLb::OnSubchannelCacheTimerLocked() {
1878   if (subchannel_cache_timer_handle_.has_value()) {
1879     subchannel_cache_timer_handle_.reset();
1880     auto it = cached_subchannels_.begin();
1881     if (it != cached_subchannels_.end()) {
1882       if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
1883         gpr_log(GPR_INFO,
1884                 "[grpclb %p] removing %" PRIuPTR " subchannels from cache",
1885                 this, it->second.size());
1886       }
1887       cached_subchannels_.erase(it);
1888     }
1889     if (!cached_subchannels_.empty()) {
1890       StartSubchannelCacheTimerLocked();
1891       return;
1892     }
1893   }
1894 }
1895 
1896 //
1897 // factory
1898 //
1899 
1900 class GrpcLbFactory final : public LoadBalancingPolicyFactory {
1901  public:
1902   OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
1903       LoadBalancingPolicy::Args args) const override {
1904     return MakeOrphanable<GrpcLb>(std::move(args));
1905   }
1906 
1907   absl::string_view name() const override { return kGrpclb; }
1908 
1909   absl::StatusOr<RefCountedPtr<LoadBalancingPolicy::Config>>
1910   ParseLoadBalancingConfig(const Json& json) const override {
1911     return LoadFromJson<RefCountedPtr<GrpcLbConfig>>(
1912         json, JsonArgs(), "errors validating grpclb LB policy config");
1913   }
1914 };
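
// Illustrative service config snippet (an assumption based on the fields
// GrpcLbConfig parses elsewhere in this file; not part of the original file):
// the grpclb policy would typically be selected via a service config such as
//
//   {
//     "loadBalancingConfig": [
//       { "grpclb": { "childPolicy": [ { "round_robin": {} } ] } }
//     ]
//   }
//
// where the inner "grpclb" object is what ParseLoadBalancingConfig() above
// validates, and "childPolicy" names the policy used for the backend
// addresses returned in the serverlist.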
1915 
1916 }  // namespace
1917 
1918 //
1919 // Plugin registration
1920 //
1921 
1922 void RegisterGrpcLbPolicy(CoreConfiguration::Builder* builder) {
1923   builder->lb_policy_registry()->RegisterLoadBalancingPolicyFactory(
1924       std::make_unique<GrpcLbFactory>());
1925   builder->channel_init()
1926       ->RegisterFilter<ClientLoadReportingFilter>(GRPC_CLIENT_SUBCHANNEL)
1927       .IfChannelArg(GRPC_ARG_GRPCLB_ENABLE_LOAD_REPORTING_FILTER, false);
1928 }
1929 
1930 }  // namespace grpc_core
1931