1 //
2 // Copyright 2016 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 /// Implementation of the gRPC LB policy.
18 ///
19 /// This policy takes as input a list of resolved addresses, which must
20 /// include at least one balancer address.
21 ///
/// An internal channel (\a lb_channel_) is created for the addresses
/// from the resolver that are balancers. This channel behaves just like a
/// regular channel that uses pick_first to select from the list of balancer
24 /// channel that uses pick_first to select from the list of balancer
25 /// addresses.
26 ///
27 /// When we get our initial update, we instantiate the internal *streaming*
28 /// call to the LB server (whichever address pick_first chose). The call
29 /// will be complete when either the balancer sends status or when we cancel
/// the call (e.g., because we are shutting down). If needed, we retry the
31 /// call. If we received at least one valid message from the server, a new
32 /// call attempt will be made immediately; otherwise, we apply back-off
33 /// delays between attempts.
34 ///
35 /// We maintain an internal round_robin policy instance for distributing
36 /// requests across backends. Whenever we receive a new serverlist from
37 /// the balancer, we update the round_robin policy with the new list of
38 /// addresses. If we cannot communicate with the balancer on startup,
39 /// however, we may enter fallback mode, in which case we will populate
40 /// the child policy's addresses from the backend addresses returned by the
41 /// resolver.
42 ///
43 /// Once a child policy instance is in place (and getting updated as described),
44 /// calls for a pick, a ping, or a cancellation will be serviced right
45 /// away by forwarding them to the child policy instance. Any time there's no
46 /// child policy available (i.e., right after the creation of the gRPCLB
47 /// policy), pick requests are queued.
48 ///
49 /// \see https://github.com/grpc/grpc/blob/master/doc/load-balancing.md for the
50 /// high level design and details.
51
52 #include <grpc/support/port_platform.h>
53
54 #include "src/core/load_balancing/grpclb/grpclb.h"
55
56 #include <grpc/event_engine/event_engine.h>
57 #include <grpc/impl/channel_arg_names.h>
58 #include <grpc/support/json.h>
59
60 // IWYU pragma: no_include <sys/socket.h>
61
62 #include <inttypes.h>
63 #include <string.h>
64
65 #include <algorithm>
66 #include <atomic>
67 #include <map>
68 #include <memory>
69 #include <string>
70 #include <type_traits>
71 #include <utility>
72 #include <vector>
73
74 #include "absl/container/inlined_vector.h"
75 #include "absl/functional/function_ref.h"
76 #include "absl/status/status.h"
77 #include "absl/status/statusor.h"
78 #include "absl/strings/str_cat.h"
79 #include "absl/strings/str_format.h"
80 #include "absl/strings/str_join.h"
81 #include "absl/strings/string_view.h"
82 #include "absl/types/optional.h"
83 #include "absl/types/variant.h"
84 #include "upb/mem/arena.hpp"
85
86 #include <grpc/byte_buffer.h>
87 #include <grpc/byte_buffer_reader.h>
88 #include <grpc/grpc.h>
89 #include <grpc/impl/connectivity_state.h>
90 #include <grpc/impl/propagation_bits.h>
91 #include <grpc/slice.h>
92 #include <grpc/status.h>
93 #include <grpc/support/alloc.h>
94 #include <grpc/support/log.h>
95
96 #include "src/core/client_channel/client_channel_filter.h"
97 #include "src/core/lib/address_utils/sockaddr_utils.h"
98 #include "src/core/lib/backoff/backoff.h"
99 #include "src/core/lib/channel/channel_args.h"
100 #include "src/core/lib/channel/channelz.h"
101 #include "src/core/lib/config/core_configuration.h"
102 #include "src/core/lib/debug/trace.h"
103 #include "src/core/lib/experiments/experiments.h"
104 #include "src/core/lib/gpr/string.h"
105 #include "src/core/lib/gpr/useful.h"
106 #include "src/core/lib/gprpp/crash.h"
107 #include "src/core/lib/gprpp/debug_location.h"
108 #include "src/core/lib/gprpp/orphanable.h"
109 #include "src/core/lib/gprpp/ref_counted.h"
110 #include "src/core/lib/gprpp/ref_counted_ptr.h"
111 #include "src/core/lib/gprpp/status_helper.h"
112 #include "src/core/lib/gprpp/time.h"
113 #include "src/core/lib/gprpp/validation_errors.h"
114 #include "src/core/lib/gprpp/work_serializer.h"
115 #include "src/core/lib/iomgr/closure.h"
116 #include "src/core/lib/iomgr/error.h"
117 #include "src/core/lib/iomgr/exec_ctx.h"
118 #include "src/core/lib/iomgr/pollset_set.h"
119 #include "src/core/lib/iomgr/resolved_address.h"
120 #include "src/core/lib/iomgr/sockaddr.h"
121 #include "src/core/lib/iomgr/socket_utils.h"
122 #include "src/core/lib/json/json.h"
123 #include "src/core/lib/json/json_args.h"
124 #include "src/core/lib/json/json_object_loader.h"
125 #include "src/core/lib/security/credentials/credentials.h"
126 #include "src/core/lib/slice/slice.h"
127 #include "src/core/lib/slice/slice_string_helpers.h"
128 #include "src/core/lib/surface/call.h"
129 #include "src/core/lib/surface/channel.h"
130 #include "src/core/lib/surface/channel_stack_type.h"
131 #include "src/core/lib/transport/connectivity_state.h"
132 #include "src/core/lib/transport/metadata_batch.h"
133 #include "src/core/load_balancing/child_policy_handler.h"
134 #include "src/core/load_balancing/delegating_helper.h"
135 #include "src/core/load_balancing/grpclb/client_load_reporting_filter.h"
136 #include "src/core/load_balancing/grpclb/grpclb_balancer_addresses.h"
137 #include "src/core/load_balancing/grpclb/grpclb_client_stats.h"
138 #include "src/core/load_balancing/grpclb/load_balancer_api.h"
139 #include "src/core/load_balancing/lb_policy.h"
140 #include "src/core/load_balancing/lb_policy_factory.h"
141 #include "src/core/load_balancing/lb_policy_registry.h"
142 #include "src/core/load_balancing/subchannel_interface.h"
143 #include "src/core/resolver/endpoint_addresses.h"
144 #include "src/core/resolver/fake/fake_resolver.h"
145 #include "src/core/resolver/resolver.h"
146
// Backoff parameters for retrying the balancer call.
#define GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS 1
#define GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER 1.6
#define GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS 120
#define GRPC_GRPCLB_RECONNECT_JITTER 0.2
// Default fallback-at-startup timeout (see fallback_at_startup_timeout_).
#define GRPC_GRPCLB_DEFAULT_FALLBACK_TIMEOUT_MS 10000
// Default interval for caching deleted subchannels
// (see subchannel_cache_interval_).
#define GRPC_GRPCLB_DEFAULT_SUBCHANNEL_DELETION_DELAY_MS 10000

// Channel arg used to enable load reporting filter.
#define GRPC_ARG_GRPCLB_ENABLE_LOAD_REPORTING_FILTER \
  "grpc.internal.grpclb_enable_load_reporting_filter"
157
158 namespace grpc_core {
159
160 TraceFlag grpc_lb_glb_trace(false, "glb");
161
162 namespace {
163
164 using ::grpc_event_engine::experimental::EventEngine;
165
166 constexpr absl::string_view kGrpclb = "grpclb";
167
// Parsed JSON configuration for the grpclb policy.  Recognized fields:
//   "serviceName": optional string.
//   "childPolicy": optional child policy config; defaults to round_robin.
class GrpcLbConfig final : public LoadBalancingPolicy::Config {
 public:
  GrpcLbConfig() = default;

  // Not copyable or movable.
  GrpcLbConfig(const GrpcLbConfig&) = delete;
  GrpcLbConfig& operator=(const GrpcLbConfig&) = delete;

  GrpcLbConfig(GrpcLbConfig&& other) = delete;
  GrpcLbConfig& operator=(GrpcLbConfig&& other) = delete;

  static const JsonLoaderInterface* JsonLoader(const JsonArgs&) {
    static const auto* loader =
        JsonObjectLoader<GrpcLbConfig>()
            // Note: "childPolicy" field requires custom parsing, so
            // it's handled in JsonPostLoad() instead.
            .OptionalField("serviceName", &GrpcLbConfig::service_name_)
            .Finish();
    return loader;
  }

  // Parses the "childPolicy" field.  If the field is absent, a default
  // round_robin config is synthesized.  Parse failures are reported via
  // |errors| and leave child_policy_ unset.
  void JsonPostLoad(const Json& json, const JsonArgs&,
                    ValidationErrors* errors) {
    ValidationErrors::ScopedField field(errors, ".childPolicy");
    Json child_policy_config_json_tmp;
    const Json* child_policy_config_json;
    auto it = json.object().find("childPolicy");
    if (it == json.object().end()) {
      // No child policy specified; default to round_robin.
      child_policy_config_json_tmp = Json::FromArray({Json::FromObject({
          {"round_robin", Json::FromObject({})},
      })});
      child_policy_config_json = &child_policy_config_json_tmp;
    } else {
      child_policy_config_json = &it->second;
    }
    auto child_policy_config =
        CoreConfiguration::Get().lb_policy_registry().ParseLoadBalancingConfig(
            *child_policy_config_json);
    if (!child_policy_config.ok()) {
      errors->AddError(child_policy_config.status().message());
      return;
    }
    child_policy_ = std::move(*child_policy_config);
  }

  absl::string_view name() const override { return kGrpclb; }

  RefCountedPtr<LoadBalancingPolicy::Config> child_policy() const {
    return child_policy_;
  }

  const std::string& service_name() const { return service_name_; }

 private:
  RefCountedPtr<LoadBalancingPolicy::Config> child_policy_;
  std::string service_name_;
};
224
// The grpclb LB policy: maintains a streaming call to a balancer and
// distributes picks across the returned backends via a child policy.
class GrpcLb final : public LoadBalancingPolicy {
 public:
  explicit GrpcLb(Args args);

  absl::string_view name() const override { return kGrpclb; }

  absl::Status UpdateLocked(UpdateArgs args) override;
  void ResetBackoffLocked() override;

 private:
  /// Contains a call to the LB server and all the data related to the call.
  class BalancerCallState final
      : public InternallyRefCounted<BalancerCallState> {
   public:
    explicit BalancerCallState(
        RefCountedPtr<LoadBalancingPolicy> parent_grpclb_policy);
    ~BalancerCallState() override;

    // It's the caller's responsibility to ensure that Orphan() is called from
    // inside the combiner.
    void Orphan() override;

    // Starts the streaming call to the balancer.
    void StartQuery();

    GrpcLbClientStats* client_stats() const { return client_stats_.get(); }

    bool seen_initial_response() const { return seen_initial_response_; }
    bool seen_serverlist() const { return seen_serverlist_; }

   private:
    GrpcLb* grpclb_policy() const {
      return static_cast<GrpcLb*>(grpclb_policy_.get());
    }

    void ScheduleNextClientLoadReportLocked();
    void SendClientLoadReportLocked();

    // EventEngine callbacks
    void MaybeSendClientLoadReportLocked();

    // Static grpc_closure thunks; each has a *Locked() counterpart below.
    static void ClientLoadReportDone(void* arg, grpc_error_handle error);
    static void OnInitialRequestSent(void* arg, grpc_error_handle error);
    static void OnBalancerMessageReceived(void* arg, grpc_error_handle error);
    static void OnBalancerStatusReceived(void* arg, grpc_error_handle error);

    void ClientLoadReportDoneLocked(grpc_error_handle error);
    void OnInitialRequestSentLocked();
    void OnBalancerMessageReceivedLocked();
    void OnBalancerStatusReceivedLocked(grpc_error_handle error);

    // The owning LB policy.
    RefCountedPtr<LoadBalancingPolicy> grpclb_policy_;

    // The streaming call to the LB server. Always non-NULL.
    grpc_call* lb_call_ = nullptr;

    // recv_initial_metadata
    grpc_metadata_array lb_initial_metadata_recv_;

    // send_message
    grpc_byte_buffer* send_message_payload_ = nullptr;
    grpc_closure lb_on_initial_request_sent_;

    // recv_message
    grpc_byte_buffer* recv_message_payload_ = nullptr;
    grpc_closure lb_on_balancer_message_received_;
    bool seen_initial_response_ = false;
    bool seen_serverlist_ = false;

    // recv_trailing_metadata
    grpc_closure lb_on_balancer_status_received_;
    grpc_metadata_array lb_trailing_metadata_recv_;
    grpc_status_code lb_call_status_;
    grpc_slice lb_call_status_details_;

    // The stats for client-side load reporting associated with this LB call.
    // Created after the first serverlist is received.
    RefCountedPtr<GrpcLbClientStats> client_stats_;
    Duration client_stats_report_interval_;
    absl::optional<EventEngine::TaskHandle> client_load_report_handle_;
    bool last_client_load_report_counters_were_zero_ = false;
    bool client_load_report_is_due_ = false;
    // The closure used for the completion of sending the load report.
    grpc_closure client_load_report_done_closure_;
  };

  // Wraps a subchannel so that the picker can later recover the LB token
  // and client stats object associated with the address (see Picker::Pick
  // and Helper::CreateSubchannel).
  class SubchannelWrapper final : public DelegatingSubchannel {
   public:
    SubchannelWrapper(RefCountedPtr<SubchannelInterface> subchannel,
                      RefCountedPtr<GrpcLb> lb_policy, std::string lb_token,
                      RefCountedPtr<GrpcLbClientStats> client_stats)
        : DelegatingSubchannel(std::move(subchannel)),
          lb_policy_(std::move(lb_policy)),
          lb_token_(std::move(lb_token)),
          client_stats_(std::move(client_stats)) {}

    const std::string& lb_token() const { return lb_token_; }
    GrpcLbClientStats* client_stats() const { return client_stats_.get(); }

   private:
    // On orphaning, hands the wrapped subchannel to the parent policy's
    // deleted-subchannel cache (unless the policy is shutting down), either
    // directly or hopped into the work serializer depending on the
    // WorkSerializerDispatch experiment.
    void Orphaned() override {
      if (!IsWorkSerializerDispatchEnabled()) {
        if (!lb_policy_->shutting_down_) {
          lb_policy_->CacheDeletedSubchannelLocked(wrapped_subchannel());
        }
        return;
      }
      lb_policy_->work_serializer()->Run(
          [self = WeakRefAsSubclass<SubchannelWrapper>()]() {
            if (!self->lb_policy_->shutting_down_) {
              self->lb_policy_->CacheDeletedSubchannelLocked(
                  self->wrapped_subchannel());
            }
          },
          DEBUG_LOCATION);
    }

    RefCountedPtr<GrpcLb> lb_policy_;
    std::string lb_token_;
    RefCountedPtr<GrpcLbClientStats> client_stats_;
  };

  // Per-address channel arg carrying the LB token and the client stats
  // object; attached by Serverlist::AddressIterator and consumed by
  // Helper::CreateSubchannel.
  class TokenAndClientStatsArg final
      : public RefCounted<TokenAndClientStatsArg> {
   public:
    TokenAndClientStatsArg(std::string lb_token,
                           RefCountedPtr<GrpcLbClientStats> client_stats)
        : lb_token_(std::move(lb_token)),
          client_stats_(std::move(client_stats)) {}

    static absl::string_view ChannelArgName() {
      return GRPC_ARG_NO_SUBCHANNEL_PREFIX "grpclb_token_and_client_stats";
    }

    // Orders first by token, then by stats-object identity.
    static int ChannelArgsCompare(const TokenAndClientStatsArg* a,
                                  const TokenAndClientStatsArg* b) {
      int r = a->lb_token_.compare(b->lb_token_);
      if (r != 0) return r;
      return QsortCompare(a->client_stats_.get(), b->client_stats_.get());
    }

    const std::string& lb_token() const { return lb_token_; }
    RefCountedPtr<GrpcLbClientStats> client_stats() const {
      return client_stats_;
    }

   private:
    std::string lb_token_;
    RefCountedPtr<GrpcLbClientStats> client_stats_;
  };

  // A serverlist received from the balancer.
  class Serverlist final : public RefCounted<Serverlist> {
   public:
    // Takes ownership of serverlist.
    explicit Serverlist(std::vector<GrpcLbServer> serverlist)
        : serverlist_(std::move(serverlist)) {}

    bool operator==(const Serverlist& other) const;

    const std::vector<GrpcLbServer>& serverlist() const { return serverlist_; }

    // Returns a text representation suitable for logging.
    std::string AsText() const;

    // Extracts all non-drop entries into an EndpointAddressesIterator.
    std::shared_ptr<EndpointAddressesIterator> GetServerAddressList(
        GrpcLbClientStats* client_stats);

    // Returns true if the serverlist contains at least one drop entry and
    // no backend address entries.
    bool ContainsAllDropEntries() const;

    // Returns the LB token to use for a drop, or null if the call
    // should not be dropped.
    //
    // Note: This is called from the picker, NOT from inside the control
    // plane work_serializer.
    const char* ShouldDrop();

   private:
    class AddressIterator;

    std::vector<GrpcLbServer> serverlist_;

    // Accessed from the picker, so needs synchronization.
    std::atomic<size_t> drop_index_{0};
  };

  // Picker that implements drops directed by the balancer and otherwise
  // delegates to the child policy's picker.
  class Picker final : public SubchannelPicker {
   public:
    Picker(RefCountedPtr<Serverlist> serverlist,
           RefCountedPtr<SubchannelPicker> child_picker,
           RefCountedPtr<GrpcLbClientStats> client_stats)
        : serverlist_(std::move(serverlist)),
          child_picker_(std::move(child_picker)),
          client_stats_(std::move(client_stats)) {}

    PickResult Pick(PickArgs args) override;

   private:
    // A subchannel call tracker that unrefs the GrpcLbClientStats object
    // in the case where the subchannel call is never actually started,
    // since the client load reporting filter will not be able to do it
    // in that case.
    class SubchannelCallTracker final : public SubchannelCallTrackerInterface {
     public:
      SubchannelCallTracker(
          RefCountedPtr<GrpcLbClientStats> client_stats,
          std::unique_ptr<SubchannelCallTrackerInterface> original_call_tracker)
          : client_stats_(std::move(client_stats)),
            original_call_tracker_(std::move(original_call_tracker)) {}

      void Start() override {
        if (original_call_tracker_ != nullptr) {
          original_call_tracker_->Start();
        }
        // If we're actually starting the subchannel call, then the
        // client load reporting filter will take ownership of the ref
        // passed down to it via metadata.
        client_stats_.release();
      }

      void Finish(FinishArgs args) override {
        if (original_call_tracker_ != nullptr) {
          original_call_tracker_->Finish(args);
        }
      }

     private:
      RefCountedPtr<GrpcLbClientStats> client_stats_;
      std::unique_ptr<SubchannelCallTrackerInterface> original_call_tracker_;
    };

    // Serverlist to be used for determining drops.
    RefCountedPtr<Serverlist> serverlist_;

    RefCountedPtr<SubchannelPicker> child_picker_;
    RefCountedPtr<GrpcLbClientStats> client_stats_;
  };

  // Channel control helper handed to the child policy; wraps created
  // subchannels and intercepts state updates and re-resolution requests.
  class Helper final
      : public ParentOwningDelegatingChannelControlHelper<GrpcLb> {
   public:
    explicit Helper(RefCountedPtr<GrpcLb> parent)
        : ParentOwningDelegatingChannelControlHelper(std::move(parent)) {}

    RefCountedPtr<SubchannelInterface> CreateSubchannel(
        const grpc_resolved_address& address,
        const ChannelArgs& per_address_args, const ChannelArgs& args) override;
    void UpdateState(grpc_connectivity_state state, const absl::Status& status,
                     RefCountedPtr<SubchannelPicker> picker) override;
    void RequestReresolution() override;
  };

  // Watches the balancer channel's connectivity so we can enter fallback
  // mode immediately if it reports TRANSIENT_FAILURE during startup.
  class StateWatcher final : public AsyncConnectivityStateWatcherInterface {
   public:
    explicit StateWatcher(RefCountedPtr<GrpcLb> parent)
        : AsyncConnectivityStateWatcherInterface(parent->work_serializer()),
          parent_(std::move(parent)) {}

    ~StateWatcher() override { parent_.reset(DEBUG_LOCATION, "StateWatcher"); }

   private:
    void OnConnectivityStateChange(grpc_connectivity_state new_state,
                                   const absl::Status& status) override {
      if (parent_->fallback_at_startup_checks_pending_ &&
          new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
        // In TRANSIENT_FAILURE. Cancel the fallback timer and go into
        // fallback mode immediately.
        gpr_log(GPR_INFO,
                "[grpclb %p] balancer channel in state:TRANSIENT_FAILURE (%s); "
                "entering fallback mode",
                parent_.get(), status.ToString().c_str());
        parent_->fallback_at_startup_checks_pending_ = false;
        parent_->channel_control_helper()->GetEventEngine()->Cancel(
            *parent_->lb_fallback_timer_handle_);
        parent_->fallback_mode_ = true;
        parent_->CreateOrUpdateChildPolicyLocked();
        // Cancel the watch, since we don't care about the channel state once we
        // go into fallback mode.
        parent_->CancelBalancerChannelConnectivityWatchLocked();
      }
    }

    RefCountedPtr<GrpcLb> parent_;
  };

  class NullLbTokenEndpointIterator;

  void ShutdownLocked() override;

  // Helper functions used in UpdateLocked().
  absl::Status UpdateBalancerChannelLocked();

  void CancelBalancerChannelConnectivityWatchLocked();

  // Methods for dealing with fallback state.
  void MaybeEnterFallbackModeAfterStartup();
  void OnFallbackTimerLocked();

  // Methods for dealing with the balancer call.
  void StartBalancerCallLocked();
  void StartBalancerCallRetryTimerLocked();
  void OnBalancerCallRetryTimerLocked();

  // Methods for dealing with the child policy.
  ChannelArgs CreateChildPolicyArgsLocked(
      bool is_backend_from_grpclb_load_balancer);
  OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
      const ChannelArgs& args);
  void CreateOrUpdateChildPolicyLocked();

  // Subchannel caching.
  void CacheDeletedSubchannelLocked(
      RefCountedPtr<SubchannelInterface> subchannel);
  void StartSubchannelCacheTimerLocked();
  void OnSubchannelCacheTimerLocked();

  // Configurations for the policy.
  RefCountedPtr<GrpcLbConfig> config_;

  // Current channel args from the resolver.
  ChannelArgs args_;

  // Internal state.
  bool shutting_down_ = false;

  // The channel for communicating with the LB server.
  OrphanablePtr<Channel> lb_channel_;
  StateWatcher* watcher_ = nullptr;
  // Response generator to inject address updates into lb_channel_.
  RefCountedPtr<FakeResolverResponseGenerator> response_generator_;
  // Parent channelz node.
  RefCountedPtr<channelz::ChannelNode> parent_channelz_node_;

  // The data associated with the current LB call. It holds a ref to this LB
  // policy. It's initialized every time we query for backends. It's reset to
  // NULL whenever the current LB call is no longer needed (e.g., the LB policy
  // is shutting down, or the LB call has ended). A non-NULL lb_calld_ always
  // contains a non-NULL lb_call_.
  OrphanablePtr<BalancerCallState> lb_calld_;
  // Timeout for the LB call. 0 means no deadline.
  const Duration lb_call_timeout_;
  // Balancer call retry state.
  BackOff lb_call_backoff_;
  absl::optional<EventEngine::TaskHandle> lb_call_retry_timer_handle_;

  // The deserialized response from the balancer. May be nullptr until one
  // such response has arrived.
  RefCountedPtr<Serverlist> serverlist_;

  // Whether we're in fallback mode.
  bool fallback_mode_ = false;
  // The backend addresses from the resolver.
  absl::StatusOr<std::shared_ptr<NullLbTokenEndpointIterator>>
      fallback_backend_addresses_;
  // The last resolution note from our parent.
  // To be passed to child policy when fallback_backend_addresses_ is empty.
  std::string resolution_note_;
  // State for fallback-at-startup checks.
  // Timeout after startup after which we will go into fallback mode if
  // we have not received a serverlist from the balancer.
  const Duration fallback_at_startup_timeout_;
  bool fallback_at_startup_checks_pending_ = false;
  absl::optional<EventEngine::TaskHandle> lb_fallback_timer_handle_;

  // The child policy to use for the backends.
  OrphanablePtr<LoadBalancingPolicy> child_policy_;
  // Child policy in state READY.
  bool child_policy_ready_ = false;

  // Deleted subchannel caching.
  const Duration subchannel_cache_interval_;
  std::map<Timestamp /*deletion time*/,
           std::vector<RefCountedPtr<SubchannelInterface>>>
      cached_subchannels_;
  absl::optional<EventEngine::TaskHandle> subchannel_cache_timer_handle_;
};
603
604 //
605 // GrpcLb::Serverlist::AddressIterator
606 //
607
IsServerValid(const GrpcLbServer & server,size_t idx,bool log)608 bool IsServerValid(const GrpcLbServer& server, size_t idx, bool log) {
609 if (server.drop) return false;
610 if (GPR_UNLIKELY(server.port >> 16 != 0)) {
611 if (log) {
612 gpr_log(GPR_ERROR,
613 "Invalid port '%d' at index %" PRIuPTR
614 " of serverlist. Ignoring.",
615 server.port, idx);
616 }
617 return false;
618 }
619 if (GPR_UNLIKELY(server.ip_size != 4 && server.ip_size != 16)) {
620 if (log) {
621 gpr_log(GPR_ERROR,
622 "Expected IP to be 4 or 16 bytes, got %d at index %" PRIuPTR
623 " of serverlist. Ignoring",
624 server.ip_size, idx);
625 }
626 return false;
627 }
628 return true;
629 }
630
ParseServer(const GrpcLbServer & server,grpc_resolved_address * addr)631 void ParseServer(const GrpcLbServer& server, grpc_resolved_address* addr) {
632 memset(addr, 0, sizeof(*addr));
633 if (server.drop) return;
634 const uint16_t netorder_port = grpc_htons(static_cast<uint16_t>(server.port));
635 // the addresses are given in binary format (a in(6)_addr struct) in
636 // server->ip_address.bytes.
637 if (server.ip_size == 4) {
638 addr->len = static_cast<socklen_t>(sizeof(grpc_sockaddr_in));
639 grpc_sockaddr_in* addr4 = reinterpret_cast<grpc_sockaddr_in*>(&addr->addr);
640 addr4->sin_family = GRPC_AF_INET;
641 memcpy(&addr4->sin_addr, server.ip_addr, server.ip_size);
642 addr4->sin_port = netorder_port;
643 } else if (server.ip_size == 16) {
644 addr->len = static_cast<socklen_t>(sizeof(grpc_sockaddr_in6));
645 grpc_sockaddr_in6* addr6 =
646 reinterpret_cast<grpc_sockaddr_in6*>(&addr->addr);
647 addr6->sin6_family = GRPC_AF_INET6;
648 memcpy(&addr6->sin6_addr, server.ip_addr, server.ip_size);
649 addr6->sin6_port = netorder_port;
650 }
651 }
652
653 class GrpcLb::Serverlist::AddressIterator final
654 : public EndpointAddressesIterator {
655 public:
AddressIterator(RefCountedPtr<Serverlist> serverlist,RefCountedPtr<GrpcLbClientStats> client_stats)656 AddressIterator(RefCountedPtr<Serverlist> serverlist,
657 RefCountedPtr<GrpcLbClientStats> client_stats)
658 : serverlist_(std::move(serverlist)),
659 client_stats_(std::move(client_stats)) {}
660
ForEach(absl::FunctionRef<void (const EndpointAddresses &)> callback) const661 void ForEach(absl::FunctionRef<void(const EndpointAddresses&)> callback)
662 const override {
663 for (size_t i = 0; i < serverlist_->serverlist_.size(); ++i) {
664 const GrpcLbServer& server = serverlist_->serverlist_[i];
665 if (!IsServerValid(server, i, false)) continue;
666 // Address processing.
667 grpc_resolved_address addr;
668 ParseServer(server, &addr);
669 // LB token processing.
670 const size_t lb_token_length = strnlen(
671 server.load_balance_token, GPR_ARRAY_SIZE(server.load_balance_token));
672 std::string lb_token(server.load_balance_token, lb_token_length);
673 if (lb_token.empty()) {
674 auto addr_uri = grpc_sockaddr_to_uri(&addr);
675 gpr_log(GPR_INFO,
676 "Missing LB token for backend address '%s'. The empty token "
677 "will be used instead",
678 addr_uri.ok() ? addr_uri->c_str()
679 : addr_uri.status().ToString().c_str());
680 }
681 // Return address with a channel arg containing LB token and stats object.
682 callback(EndpointAddresses(
683 addr, ChannelArgs().SetObject(MakeRefCounted<TokenAndClientStatsArg>(
684 std::move(lb_token), client_stats_))));
685 }
686 }
687
688 private:
689 RefCountedPtr<Serverlist> serverlist_;
690 RefCountedPtr<GrpcLbClientStats> client_stats_;
691 };
692
693 //
694 // GrpcLb::Serverlist
695 //
696
operator ==(const Serverlist & other) const697 bool GrpcLb::Serverlist::operator==(const Serverlist& other) const {
698 return serverlist_ == other.serverlist_;
699 }
700
AsText() const701 std::string GrpcLb::Serverlist::AsText() const {
702 std::vector<std::string> entries;
703 for (size_t i = 0; i < serverlist_.size(); ++i) {
704 const GrpcLbServer& server = serverlist_[i];
705 std::string ipport;
706 if (server.drop) {
707 ipport = "(drop)";
708 } else {
709 grpc_resolved_address addr;
710 ParseServer(server, &addr);
711 auto addr_str = grpc_sockaddr_to_string(&addr, false);
712 ipport = addr_str.ok() ? *addr_str : addr_str.status().ToString();
713 }
714 entries.push_back(absl::StrFormat(" %" PRIuPTR ": %s token=%s\n", i,
715 ipport, server.load_balance_token));
716 }
717 return absl::StrJoin(entries, "");
718 }
719
720 // Returns addresses extracted from the serverlist.
721 std::shared_ptr<EndpointAddressesIterator>
GetServerAddressList(GrpcLbClientStats * client_stats)722 GrpcLb::Serverlist::GetServerAddressList(GrpcLbClientStats* client_stats) {
723 RefCountedPtr<GrpcLbClientStats> stats;
724 if (client_stats != nullptr) stats = client_stats->Ref();
725 return std::make_shared<AddressIterator>(Ref(), std::move(stats));
726 }
727
ContainsAllDropEntries() const728 bool GrpcLb::Serverlist::ContainsAllDropEntries() const {
729 if (serverlist_.empty()) return false;
730 for (const GrpcLbServer& server : serverlist_) {
731 if (!server.drop) return false;
732 }
733 return true;
734 }
735
ShouldDrop()736 const char* GrpcLb::Serverlist::ShouldDrop() {
737 if (serverlist_.empty()) return nullptr;
738 size_t index = drop_index_.fetch_add(1, std::memory_order_relaxed);
739 GrpcLbServer& server = serverlist_[index % serverlist_.size()];
740 return server.drop ? server.load_balance_token : nullptr;
741 }
742
743 //
744 // GrpcLb::Picker
745 //
746
// Performs a pick: may drop the call as directed by the balancer's
// serverlist; otherwise delegates to the child picker and, on a complete
// pick, attaches the LB token and client-stats metadata and unwraps the
// subchannel before returning it to the channel.
GrpcLb::PickResult GrpcLb::Picker::Pick(PickArgs args) {
  // Check if we should drop the call.
  const char* drop_token =
      serverlist_ == nullptr ? nullptr : serverlist_->ShouldDrop();
  if (drop_token != nullptr) {
    // Update client load reporting stats to indicate the number of
    // dropped calls. Note that we have to do this here instead of in
    // the client_load_reporting filter, because we do not create a
    // subchannel call (and therefore no client_load_reporting filter)
    // for dropped calls.
    if (client_stats_ != nullptr) {
      client_stats_->AddCallDropped(drop_token);
    }
    return PickResult::Drop(
        absl::UnavailableError("drop directed by grpclb balancer"));
  }
  // Forward pick to child policy.
  PickResult result = child_picker_->Pick(args);
  // If pick succeeded, add LB token to initial metadata.
  auto* complete_pick = absl::get_if<PickResult::Complete>(&result.result);
  if (complete_pick != nullptr) {
    const SubchannelWrapper* subchannel_wrapper =
        static_cast<SubchannelWrapper*>(complete_pick->subchannel.get());
    // Encode client stats object into metadata for use by
    // client_load_reporting filter.
    GrpcLbClientStats* client_stats = subchannel_wrapper->client_stats();
    if (client_stats != nullptr) {
      // The tracker owns a ref until Start() releases it (at which point
      // the client_load_reporting filter takes over the ref).
      complete_pick->subchannel_call_tracker =
          std::make_unique<SubchannelCallTracker>(
              client_stats->Ref(),
              std::move(complete_pick->subchannel_call_tracker));
      // The metadata value is a hack: we pretend the pointer points to
      // a string and rely on the client_load_reporting filter to know
      // how to interpret it.
      // NOLINTBEGIN(bugprone-string-constructor)
      args.initial_metadata->Add(
          GrpcLbClientStatsMetadata::key(),
          absl::string_view(reinterpret_cast<const char*>(client_stats), 0));
      // NOLINTEND(bugprone-string-constructor)
      // Update calls-started.
      client_stats->AddCallStarted();
    }
    // Encode the LB token in metadata.
    // Create a new copy on the call arena, since the subchannel list
    // may get refreshed between when we return this pick and when the
    // initial metadata goes out on the wire.
    if (!subchannel_wrapper->lb_token().empty()) {
      char* lb_token = static_cast<char*>(
          args.call_state->Alloc(subchannel_wrapper->lb_token().size() + 1));
      strcpy(lb_token, subchannel_wrapper->lb_token().c_str());
      args.initial_metadata->Add(LbTokenMetadata::key(), lb_token);
    }
    // Unwrap subchannel to pass up to the channel.
    complete_pick->subchannel = subchannel_wrapper->wrapped_subchannel();
  }
  return result;
}
804
805 //
806 // GrpcLb::Helper
807 //
808
CreateSubchannel(const grpc_resolved_address & address,const ChannelArgs & per_address_args,const ChannelArgs & args)809 RefCountedPtr<SubchannelInterface> GrpcLb::Helper::CreateSubchannel(
810 const grpc_resolved_address& address, const ChannelArgs& per_address_args,
811 const ChannelArgs& args) {
812 if (parent()->shutting_down_) return nullptr;
813 const auto* arg = per_address_args.GetObject<TokenAndClientStatsArg>();
814 if (arg == nullptr) {
815 auto addr_str = grpc_sockaddr_to_string(&address, false);
816 Crash(
817 absl::StrFormat("[grpclb %p] no TokenAndClientStatsArg for address %s",
818 parent(), addr_str.value_or("N/A").c_str()));
819 }
820 std::string lb_token = arg->lb_token();
821 RefCountedPtr<GrpcLbClientStats> client_stats = arg->client_stats();
822 return MakeRefCounted<SubchannelWrapper>(
823 parent()->channel_control_helper()->CreateSubchannel(
824 address, per_address_args, args),
825 parent()->RefAsSubclass<GrpcLb>(DEBUG_LOCATION, "SubchannelWrapper"),
826 std::move(lb_token), std::move(client_stats));
827 }
828
// Propagates a connectivity-state update from the child policy up to the
// channel, wrapping the child's picker in our own Picker so that drops and
// LB-token attachment can be applied on each pick.
void GrpcLb::Helper::UpdateState(grpc_connectivity_state state,
                                 const absl::Status& status,
                                 RefCountedPtr<SubchannelPicker> picker) {
  if (parent()->shutting_down_) return;
  // Record whether child policy reports READY.
  parent()->child_policy_ready_ = state == GRPC_CHANNEL_READY;
  // Enter fallback mode if needed.
  parent()->MaybeEnterFallbackModeAfterStartup();
  // We pass the serverlist to the picker so that it can handle drops.
  // However, we don't want to handle drops in the case where the child
  // policy is reporting a state other than READY (unless we are
  // dropping *all* calls), because we don't want to process drops for picks
  // that yield a QUEUE result; this would result in dropping too many calls,
  // since we will see the queued picks multiple times, and we'd consider each
  // one a separate call for the drop calculation. So in this case, we pass
  // a null serverlist to the picker, which tells it not to do drops.
  RefCountedPtr<Serverlist> serverlist;
  if (state == GRPC_CHANNEL_READY ||
      (parent()->serverlist_ != nullptr &&
       parent()->serverlist_->ContainsAllDropEntries())) {
    serverlist = parent()->serverlist_;
  }
  // Snapshot the client stats object (if load reporting is active on the
  // current balancer call) so the picker can count calls started/dropped.
  RefCountedPtr<GrpcLbClientStats> client_stats;
  if (parent()->lb_calld_ != nullptr &&
      parent()->lb_calld_->client_stats() != nullptr) {
    client_stats = parent()->lb_calld_->client_stats()->Ref();
  }
  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
    gpr_log(GPR_INFO,
            "[grpclb %p helper %p] state=%s (%s) wrapping child "
            "picker %p (serverlist=%p, client_stats=%p)",
            parent(), this, ConnectivityStateName(state),
            status.ToString().c_str(), picker.get(), serverlist.get(),
            client_stats.get());
  }
  parent()->channel_control_helper()->UpdateState(
      state, status,
      MakeRefCounted<Picker>(std::move(serverlist), std::move(picker),
                             std::move(client_stats)));
}
869
RequestReresolution()870 void GrpcLb::Helper::RequestReresolution() {
871 if (parent()->shutting_down_) return;
872 // Ignore if we're not in fallback mode, because if we got the backend
873 // addresses from the balancer, re-resolving is not going to fix it.
874 if (!parent()->fallback_mode_) return;
875 parent()->channel_control_helper()->RequestReresolution();
876 }
877
878 //
879 // GrpcLb::BalancerCallState
880 //
881
// Sets up (but does not start) the streaming call to the balancer:
// initializes the completion closures, creates the call object, and
// serializes the initial LoadBalanceRequest payload.
GrpcLb::BalancerCallState::BalancerCallState(
    RefCountedPtr<LoadBalancingPolicy> parent_grpclb_policy)
    : InternallyRefCounted<BalancerCallState>(
          GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace) ? "BalancerCallState"
                                                     : nullptr),
      grpclb_policy_(std::move(parent_grpclb_policy)) {
  GPR_ASSERT(grpclb_policy_ != nullptr);
  GPR_ASSERT(!grpclb_policy()->shutting_down_);
  // Init the LB call. Note that the LB call will progress every time there's
  // activity in grpclb_policy_->interested_parties(), which is comprised of
  // the polling entities from client_channel.
  GRPC_CLOSURE_INIT(&lb_on_initial_request_sent_, OnInitialRequestSent, this,
                    grpc_schedule_on_exec_ctx);
  GRPC_CLOSURE_INIT(&lb_on_balancer_message_received_,
                    OnBalancerMessageReceived, this, grpc_schedule_on_exec_ctx);
  GRPC_CLOSURE_INIT(&lb_on_balancer_status_received_, OnBalancerStatusReceived,
                    this, grpc_schedule_on_exec_ctx);
  GRPC_CLOSURE_INIT(&client_load_report_done_closure_, ClientLoadReportDone,
                    this, grpc_schedule_on_exec_ctx);
  // A zero configured timeout means the call has no deadline.
  const Timestamp deadline =
      grpclb_policy()->lb_call_timeout_ == Duration::Zero()
          ? Timestamp::InfFuture()
          : Timestamp::Now() + grpclb_policy()->lb_call_timeout_;
  lb_call_ = grpclb_policy()->lb_channel_->CreateCall(
      /*parent_call=*/nullptr, GRPC_PROPAGATE_DEFAULTS,
      /*cq=*/nullptr, grpclb_policy_->interested_parties(),
      Slice::FromStaticString("/grpc.lb.v1.LoadBalancer/BalanceLoad"),
      /*authority=*/absl::nullopt, deadline, /*registered_method=*/true);
  // Init the LB call request payload.
  // The requested server name is the configured service name, falling back
  // to the channel's authority when the config does not specify one.
  upb::Arena arena;
  grpc_slice request_payload_slice = GrpcLbRequestCreate(
      grpclb_policy()->config_->service_name().empty()
          ? grpclb_policy()->channel_control_helper()->GetAuthority()
          : grpclb_policy()->config_->service_name(),
      arena.ptr());
  send_message_payload_ =
      grpc_raw_byte_buffer_create(&request_payload_slice, 1);
  CSliceUnref(request_payload_slice);
  // Init other data associated with the LB call.
  grpc_metadata_array_init(&lb_initial_metadata_recv_);
  grpc_metadata_array_init(&lb_trailing_metadata_recv_);
}
924
// Releases the call object and all buffers/metadata owned by this call
// attempt.  The destroy/unref functions below are all null-safe, so
// partially-initialized members are fine.
GrpcLb::BalancerCallState::~BalancerCallState() {
  GPR_ASSERT(lb_call_ != nullptr);
  grpc_call_unref(lb_call_);
  grpc_metadata_array_destroy(&lb_initial_metadata_recv_);
  grpc_metadata_array_destroy(&lb_trailing_metadata_recv_);
  grpc_byte_buffer_destroy(send_message_payload_);
  grpc_byte_buffer_destroy(recv_message_payload_);
  CSliceUnref(lb_call_status_details_);
}
934
// Cancels the in-flight balancer call and any pending load-report timer.
// Called when the policy discards this call attempt (shutdown or restart).
void GrpcLb::BalancerCallState::Orphan() {
  GPR_ASSERT(lb_call_ != nullptr);
  // If we are here because grpclb_policy wants to cancel the call,
  // lb_on_balancer_status_received_ will complete the cancellation and clean
  // up. Otherwise, we are here because grpclb_policy has to orphan a failed
  // call, then the following cancellation will be a no-op.
  grpc_call_cancel_internal(lb_call_);
  // If the load-report timer is pending and we managed to cancel it, the
  // timer callback will never run, so drop the ref it was holding here.
  if (client_load_report_handle_.has_value() &&
      grpclb_policy()->channel_control_helper()->GetEventEngine()->Cancel(
          client_load_report_handle_.value())) {
    Unref(DEBUG_LOCATION, "client_load_report cancelled");
  }
  // Note that the initial ref is hold by lb_on_balancer_status_received_
  // instead of the caller of this function. So the corresponding unref happens
  // in lb_on_balancer_status_received_ instead of here.
}
951
// Starts the balancer call: sends initial metadata plus the initial request
// message, and posts receives for initial metadata, the first response
// message, and the final call status.  Three separate batches are started,
// each with its own completion closure; the ops array is reused for each
// batch, which is safe because grpc_call_start_batch_and_execute copies the
// ops before returning.
void GrpcLb::BalancerCallState::StartQuery() {
  GPR_ASSERT(lb_call_ != nullptr);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
    gpr_log(GPR_INFO, "[grpclb %p] lb_calld=%p: Starting LB call %p",
            grpclb_policy_.get(), this, lb_call_);
  }
  // Create the ops.
  grpc_call_error call_error;
  grpc_op ops[3];
  memset(ops, 0, sizeof(ops));
  // Op: send initial metadata.
  grpc_op* op = ops;
  op->op = GRPC_OP_SEND_INITIAL_METADATA;
  op->data.send_initial_metadata.count = 0;
  // Wait-for-ready so the call sits until the balancer channel connects
  // rather than failing fast.
  op->flags = GRPC_INITIAL_METADATA_WAIT_FOR_READY |
              GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET;
  op->reserved = nullptr;
  op++;
  // Op: send request message.
  GPR_ASSERT(send_message_payload_ != nullptr);
  op->op = GRPC_OP_SEND_MESSAGE;
  op->data.send_message.send_message = send_message_payload_;
  op->flags = 0;
  op->reserved = nullptr;
  op++;
  // TODO(roth): We currently track this ref manually. Once the
  // ClosureRef API is ready, we should pass the RefCountedPtr<> along
  // with the callback.
  auto self = Ref(DEBUG_LOCATION, "on_initial_request_sent");
  self.release();
  call_error = grpc_call_start_batch_and_execute(lb_call_, ops,
                                                 static_cast<size_t>(op - ops),
                                                 &lb_on_initial_request_sent_);
  GPR_ASSERT(GRPC_CALL_OK == call_error);
  // Op: recv initial metadata.
  op = ops;
  op->op = GRPC_OP_RECV_INITIAL_METADATA;
  op->data.recv_initial_metadata.recv_initial_metadata =
      &lb_initial_metadata_recv_;
  op->flags = 0;
  op->reserved = nullptr;
  op++;
  // Op: recv response.
  op->op = GRPC_OP_RECV_MESSAGE;
  op->data.recv_message.recv_message = &recv_message_payload_;
  op->flags = 0;
  op->reserved = nullptr;
  op++;
  // TODO(roth): We currently track this ref manually. Once the
  // ClosureRef API is ready, we should pass the RefCountedPtr<> along
  // with the callback.
  self = Ref(DEBUG_LOCATION, "on_message_received");
  self.release();
  call_error = grpc_call_start_batch_and_execute(
      lb_call_, ops, static_cast<size_t>(op - ops),
      &lb_on_balancer_message_received_);
  GPR_ASSERT(GRPC_CALL_OK == call_error);
  // Op: recv server status.
  op = ops;
  op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
  op->data.recv_status_on_client.trailing_metadata =
      &lb_trailing_metadata_recv_;
  op->data.recv_status_on_client.status = &lb_call_status_;
  op->data.recv_status_on_client.status_details = &lb_call_status_details_;
  op->flags = 0;
  op->reserved = nullptr;
  op++;
  // This callback signals the end of the LB call, so it relies on the initial
  // ref instead of a new ref. When it's invoked, it's the initial ref that is
  // unreffed.
  call_error = grpc_call_start_batch_and_execute(
      lb_call_, ops, static_cast<size_t>(op - ops),
      &lb_on_balancer_status_received_);
  GPR_ASSERT(GRPC_CALL_OK == call_error);
}
1027
// Arms the load-report timer to fire after the balancer-specified interval.
// The timer callback runs on an EventEngine thread, so it sets up exec
// contexts and hops into the work serializer before touching policy state.
void GrpcLb::BalancerCallState::ScheduleNextClientLoadReportLocked() {
  client_load_report_handle_ =
      grpclb_policy()->channel_control_helper()->GetEventEngine()->RunAfter(
          client_stats_report_interval_, [this] {
            ApplicationCallbackExecCtx callback_exec_ctx;
            ExecCtx exec_ctx;
            grpclb_policy()->work_serializer()->Run(
                [this] { MaybeSendClientLoadReportLocked(); }, DEBUG_LOCATION);
          });
}
1038
MaybeSendClientLoadReportLocked()1039 void GrpcLb::BalancerCallState::MaybeSendClientLoadReportLocked() {
1040 client_load_report_handle_.reset();
1041 if (this != grpclb_policy()->lb_calld_.get()) {
1042 Unref(DEBUG_LOCATION, "client_load_report");
1043 return;
1044 }
1045 // If we've already sent the initial request, then we can go ahead and send
1046 // the load report. Otherwise, we need to wait until the initial request has
1047 // been sent to send this (see OnInitialRequestSentLocked()).
1048 if (send_message_payload_ == nullptr) {
1049 SendClientLoadReportLocked();
1050 } else {
1051 client_load_report_is_due_ = true;
1052 }
1053 }
1054
// Takes a snapshot of the accumulated client stats, serializes it into a
// LoadBalanceRequest, and sends it on the balancer stream.  Skips the send
// (but re-arms the timer) when two consecutive snapshots are all-zero.
void GrpcLb::BalancerCallState::SendClientLoadReportLocked() {
  // Construct message payload.
  GPR_ASSERT(send_message_payload_ == nullptr);
  // Get snapshot of stats.
  int64_t num_calls_started;
  int64_t num_calls_finished;
  int64_t num_calls_finished_with_client_failed_to_send;
  int64_t num_calls_finished_known_received;
  std::unique_ptr<GrpcLbClientStats::DroppedCallCounts> drop_token_counts;
  client_stats_->Get(&num_calls_started, &num_calls_finished,
                     &num_calls_finished_with_client_failed_to_send,
                     &num_calls_finished_known_received, &drop_token_counts);
  // Skip client load report if the counters were all zero in the last
  // report and they are still zero in this one.
  if (num_calls_started == 0 && num_calls_finished == 0 &&
      num_calls_finished_with_client_failed_to_send == 0 &&
      num_calls_finished_known_received == 0 &&
      (drop_token_counts == nullptr || drop_token_counts->empty())) {
    if (last_client_load_report_counters_were_zero_) {
      ScheduleNextClientLoadReportLocked();
      return;
    }
    last_client_load_report_counters_were_zero_ = true;
  } else {
    last_client_load_report_counters_were_zero_ = false;
  }
  // Populate load report.
  upb::Arena arena;
  grpc_slice request_payload_slice = GrpcLbLoadReportRequestCreate(
      num_calls_started, num_calls_finished,
      num_calls_finished_with_client_failed_to_send,
      num_calls_finished_known_received, drop_token_counts.get(), arena.ptr());
  send_message_payload_ =
      grpc_raw_byte_buffer_create(&request_payload_slice, 1);
  CSliceUnref(request_payload_slice);
  // Send the report.
  grpc_op op;
  memset(&op, 0, sizeof(op));
  op.op = GRPC_OP_SEND_MESSAGE;
  op.data.send_message.send_message = send_message_payload_;
  grpc_call_error call_error = grpc_call_start_batch_and_execute(
      lb_call_, &op, 1, &client_load_report_done_closure_);
  if (GPR_UNLIKELY(call_error != GRPC_CALL_OK)) {
    // Log the details before the assert below aborts the process.
    gpr_log(GPR_ERROR,
            "[grpclb %p] lb_calld=%p call_error=%d sending client load report",
            grpclb_policy_.get(), this, call_error);
    GPR_ASSERT(GRPC_CALL_OK == call_error);
  }
}
1104
ClientLoadReportDone(void * arg,grpc_error_handle error)1105 void GrpcLb::BalancerCallState::ClientLoadReportDone(void* arg,
1106 grpc_error_handle error) {
1107 BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
1108 lb_calld->grpclb_policy()->work_serializer()->Run(
1109 [lb_calld, error]() { lb_calld->ClientLoadReportDoneLocked(error); },
1110 DEBUG_LOCATION);
1111 }
1112
// Invoked (under the work serializer) when the load-report send completes.
// Frees the payload and either schedules the next report or, if the send
// failed or this call attempt is no longer active, drops the report ref.
void GrpcLb::BalancerCallState::ClientLoadReportDoneLocked(
    grpc_error_handle error) {
  grpc_byte_buffer_destroy(send_message_payload_);
  send_message_payload_ = nullptr;
  if (!error.ok() || this != grpclb_policy()->lb_calld_.get()) {
    Unref(DEBUG_LOCATION, "client_load_report");
    return;
  }
  ScheduleNextClientLoadReportLocked();
}
1123
OnInitialRequestSent(void * arg,grpc_error_handle)1124 void GrpcLb::BalancerCallState::OnInitialRequestSent(
1125 void* arg, grpc_error_handle /*error*/) {
1126 BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
1127 lb_calld->grpclb_policy()->work_serializer()->Run(
1128 [lb_calld]() { lb_calld->OnInitialRequestSentLocked(); }, DEBUG_LOCATION);
1129 }
1130
// Invoked (under the work serializer) once the initial request payload has
// been sent.  Frees the payload slot and flushes any load report that was
// deferred while the initial send was in flight.
void GrpcLb::BalancerCallState::OnInitialRequestSentLocked() {
  grpc_byte_buffer_destroy(send_message_payload_);
  send_message_payload_ = nullptr;
  // If we attempted to send a client load report before the initial request was
  // sent (and this lb_calld is still in use), send the load report now.
  if (client_load_report_is_due_ && this == grpclb_policy()->lb_calld_.get()) {
    SendClientLoadReportLocked();
    client_load_report_is_due_ = false;
  }
  // Drop the ref taken in StartQuery() for this callback.
  Unref(DEBUG_LOCATION, "on_initial_request_sent");
}
1142
OnBalancerMessageReceived(void * arg,grpc_error_handle)1143 void GrpcLb::BalancerCallState::OnBalancerMessageReceived(
1144 void* arg, grpc_error_handle /*error*/) {
1145 BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
1146 lb_calld->grpclb_policy()->work_serializer()->Run(
1147 [lb_calld]() { lb_calld->OnBalancerMessageReceivedLocked(); },
1148 DEBUG_LOCATION);
1149 }
1150
// Invoked (under the work serializer) whenever a message arrives on the
// balancer stream.  Parses the response, dispatches on its type (initial
// response, serverlist, or fallback directive), and then re-posts the
// receive op to keep listening for further updates.
void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked() {
  // Null payload means the LB call was cancelled.
  if (this != grpclb_policy()->lb_calld_.get() ||
      recv_message_payload_ == nullptr) {
    Unref(DEBUG_LOCATION, "on_message_received");
    return;
  }
  // Flatten the received byte buffer into a single slice for parsing.
  grpc_byte_buffer_reader bbr;
  grpc_byte_buffer_reader_init(&bbr, recv_message_payload_);
  grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
  grpc_byte_buffer_reader_destroy(&bbr);
  grpc_byte_buffer_destroy(recv_message_payload_);
  recv_message_payload_ = nullptr;
  GrpcLbResponse response;
  upb::Arena arena;
  // A second INITIAL response on the same stream is a protocol violation,
  // so it is treated the same as a parse failure: logged and ignored.
  if (!GrpcLbResponseParse(response_slice, arena.ptr(), &response) ||
      (response.type == response.INITIAL && seen_initial_response_)) {
    if (gpr_should_log(GPR_LOG_SEVERITY_ERROR)) {
      char* response_slice_str =
          grpc_dump_slice(response_slice, GPR_DUMP_ASCII | GPR_DUMP_HEX);
      gpr_log(GPR_ERROR,
              "[grpclb %p] lb_calld=%p: Invalid LB response received: '%s'. "
              "Ignoring.",
              grpclb_policy(), this, response_slice_str);
      gpr_free(response_slice_str);
    }
  } else {
    switch (response.type) {
      case response.INITIAL: {
        // A non-zero interval enables client load reporting, clamped to a
        // minimum of one second.
        if (response.client_stats_report_interval != Duration::Zero()) {
          client_stats_report_interval_ = std::max(
              Duration::Seconds(1), response.client_stats_report_interval);
          if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
            gpr_log(GPR_INFO,
                    "[grpclb %p] lb_calld=%p: Received initial LB response "
                    "message; client load reporting interval = %" PRId64
                    " milliseconds",
                    grpclb_policy(), this,
                    client_stats_report_interval_.millis());
          }
        } else if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
          gpr_log(GPR_INFO,
                  "[grpclb %p] lb_calld=%p: Received initial LB response "
                  "message; client load reporting NOT enabled",
                  grpclb_policy(), this);
        }
        seen_initial_response_ = true;
        break;
      }
      case response.SERVERLIST: {
        GPR_ASSERT(lb_call_ != nullptr);
        auto serverlist_wrapper =
            MakeRefCounted<Serverlist>(std::move(response.serverlist));
        if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
          gpr_log(GPR_INFO,
                  "[grpclb %p] lb_calld=%p: Serverlist with %" PRIuPTR
                  " servers received:\n%s",
                  grpclb_policy(), this,
                  serverlist_wrapper->serverlist().size(),
                  serverlist_wrapper->AsText().c_str());
        }
        seen_serverlist_ = true;
        // Start sending client load report only after we start using the
        // serverlist returned from the current LB call.
        if (client_stats_report_interval_ > Duration::Zero() &&
            client_stats_ == nullptr) {
          client_stats_ = MakeRefCounted<GrpcLbClientStats>();
          // Ref held by callback.
          Ref(DEBUG_LOCATION, "client_load_report").release();
          ScheduleNextClientLoadReportLocked();
        }
        // Check if the serverlist differs from the previous one.
        if (grpclb_policy()->serverlist_ != nullptr &&
            *grpclb_policy()->serverlist_ == *serverlist_wrapper) {
          if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
            gpr_log(GPR_INFO,
                    "[grpclb %p] lb_calld=%p: Incoming server list identical "
                    "to current, ignoring.",
                    grpclb_policy(), this);
          }
        } else {  // New serverlist.
          // Dispose of the fallback.
          // TODO(roth): Ideally, we should stay in fallback mode until we
          // know that we can reach at least one of the backends in the new
          // serverlist. Unfortunately, we can't do that, since we need to
          // send the new addresses to the child policy in order to determine
          // if they are reachable, and if we don't exit fallback mode now,
          // CreateOrUpdateChildPolicyLocked() will use the fallback
          // addresses instead of the addresses from the new serverlist.
          // However, if we can't reach any of the servers in the new
          // serverlist, then the child policy will never switch away from
          // the fallback addresses, but the grpclb policy will still think
          // that we're not in fallback mode, which means that we won't send
          // updates to the child policy when the fallback addresses are
          // updated by the resolver. This is sub-optimal, but the only way
          // to fix it is to maintain a completely separate child policy for
          // fallback mode, and that's more work than we want to put into
          // the grpclb implementation at this point, since we're deprecating
          // it in favor of the xds policy. We will implement this the
          // right way in the xds policy instead.
          if (grpclb_policy()->fallback_mode_) {
            gpr_log(GPR_INFO,
                    "[grpclb %p] Received response from balancer; exiting "
                    "fallback mode",
                    grpclb_policy());
            grpclb_policy()->fallback_mode_ = false;
          }
          // Receiving a serverlist resolves the fallback-at-startup checks,
          // so cancel the fallback timer and connectivity watch.
          if (grpclb_policy()->fallback_at_startup_checks_pending_) {
            grpclb_policy()->fallback_at_startup_checks_pending_ = false;
            grpclb_policy()->channel_control_helper()->GetEventEngine()->Cancel(
                *grpclb_policy()->lb_fallback_timer_handle_);
            grpclb_policy()->CancelBalancerChannelConnectivityWatchLocked();
          }
          // Update the serverlist in the GrpcLb instance. This serverlist
          // instance will be destroyed either upon the next update or when the
          // GrpcLb instance is destroyed.
          grpclb_policy()->serverlist_ = std::move(serverlist_wrapper);
          grpclb_policy()->CreateOrUpdateChildPolicyLocked();
        }
        break;
      }
      case response.FALLBACK: {
        if (!grpclb_policy()->fallback_mode_) {
          gpr_log(GPR_INFO,
                  "[grpclb %p] Entering fallback mode as requested by balancer",
                  grpclb_policy());
          if (grpclb_policy()->fallback_at_startup_checks_pending_) {
            grpclb_policy()->fallback_at_startup_checks_pending_ = false;
            grpclb_policy()->channel_control_helper()->GetEventEngine()->Cancel(
                *grpclb_policy()->lb_fallback_timer_handle_);
            grpclb_policy()->CancelBalancerChannelConnectivityWatchLocked();
          }
          grpclb_policy()->fallback_mode_ = true;
          grpclb_policy()->CreateOrUpdateChildPolicyLocked();
          // Reset serverlist, so that if the balancer exits fallback
          // mode by sending the same serverlist we were previously
          // using, we don't incorrectly ignore it as a duplicate.
          grpclb_policy()->serverlist_.reset();
        }
        break;
      }
    }
  }
  CSliceUnref(response_slice);
  if (!grpclb_policy()->shutting_down_) {
    // Keep listening for serverlist updates.
    grpc_op op;
    memset(&op, 0, sizeof(op));
    op.op = GRPC_OP_RECV_MESSAGE;
    op.data.recv_message.recv_message = &recv_message_payload_;
    op.flags = 0;
    op.reserved = nullptr;
    // Reuse the "OnBalancerMessageReceivedLocked" ref taken in StartQuery().
    const grpc_call_error call_error = grpc_call_start_batch_and_execute(
        lb_call_, &op, 1, &lb_on_balancer_message_received_);
    GPR_ASSERT(GRPC_CALL_OK == call_error);
  } else {
    Unref(DEBUG_LOCATION, "on_message_received+grpclb_shutdown");
  }
}
1311
OnBalancerStatusReceived(void * arg,grpc_error_handle error)1312 void GrpcLb::BalancerCallState::OnBalancerStatusReceived(
1313 void* arg, grpc_error_handle error) {
1314 BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
1315 lb_calld->grpclb_policy()->work_serializer()->Run(
1316 [lb_calld, error]() { lb_calld->OnBalancerStatusReceivedLocked(error); },
1317 DEBUG_LOCATION);
1318 }
1319
// Invoked (under the work serializer) when the balancer call finishes.  If
// this is still the active call attempt, decides between entering fallback
// mode, restarting the call immediately, or retrying after backoff; then
// releases the initial ref held on behalf of this callback.
void GrpcLb::BalancerCallState::OnBalancerStatusReceivedLocked(
    grpc_error_handle error) {
  GPR_ASSERT(lb_call_ != nullptr);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
    char* status_details = grpc_slice_to_c_string(lb_call_status_details_);
    gpr_log(GPR_INFO,
            "[grpclb %p] lb_calld=%p: Status from LB server received. "
            "Status = %d, details = '%s', (lb_call: %p), error '%s'",
            grpclb_policy(), this, lb_call_status_, status_details, lb_call_,
            StatusToString(error).c_str());
    gpr_free(status_details);
  }
  // If this lb_calld is still in use, this call ended because of a failure so
  // we want to retry connecting. Otherwise, we have deliberately ended this
  // call and no further action is required.
  if (this == grpclb_policy()->lb_calld_.get()) {
    // If the fallback-at-startup checks are pending, go into fallback mode
    // immediately. This short-circuits the timeout for the fallback-at-startup
    // case.
    grpclb_policy()->lb_calld_.reset();
    if (grpclb_policy()->fallback_at_startup_checks_pending_) {
      GPR_ASSERT(!seen_serverlist_);
      gpr_log(GPR_INFO,
              "[grpclb %p] Balancer call finished without receiving "
              "serverlist; entering fallback mode",
              grpclb_policy());
      grpclb_policy()->fallback_at_startup_checks_pending_ = false;
      grpclb_policy()->channel_control_helper()->GetEventEngine()->Cancel(
          *grpclb_policy()->lb_fallback_timer_handle_);
      grpclb_policy()->CancelBalancerChannelConnectivityWatchLocked();
      grpclb_policy()->fallback_mode_ = true;
      grpclb_policy()->CreateOrUpdateChildPolicyLocked();
    } else {
      // This handles the fallback-after-startup case.
      grpclb_policy()->MaybeEnterFallbackModeAfterStartup();
    }
    GPR_ASSERT(!grpclb_policy()->shutting_down_);
    // Ask the resolver for fresh balancer addresses before reconnecting.
    grpclb_policy()->channel_control_helper()->RequestReresolution();
    if (seen_initial_response_) {
      // If we lose connection to the LB server, reset the backoff and restart
      // the LB call immediately.
      grpclb_policy()->lb_call_backoff_.Reset();
      grpclb_policy()->StartBalancerCallLocked();
    } else {
      // If this LB call fails establishing any connection to the LB server,
      // retry later.
      grpclb_policy()->StartBalancerCallRetryTimerLocked();
    }
  }
  Unref(DEBUG_LOCATION, "lb_call_ended");
}
1371
1372 //
1373 // helper code for creating balancer channel
1374 //
1375
ExtractBalancerAddresses(const ChannelArgs & args)1376 EndpointAddressesList ExtractBalancerAddresses(const ChannelArgs& args) {
1377 const EndpointAddressesList* endpoints =
1378 FindGrpclbBalancerAddressesInChannelArgs(args);
1379 if (endpoints != nullptr) return *endpoints;
1380 return EndpointAddressesList();
1381 }
1382
1383 // Returns the channel args for the LB channel, used to create a bidirectional
1384 // stream for the reception of load balancing updates.
1385 //
1386 // Inputs:
1387 // - \a response_generator: in order to propagate updates from the resolver
1388 // above the grpclb policy.
1389 // - \a args: other args inherited from the grpclb policy.
ChannelArgs BuildBalancerChannelArgs(
    FakeResolverResponseGenerator* response_generator,
    const ChannelArgs& args) {
  ChannelArgs grpclb_channel_args;
  // An application-supplied override takes precedence over the args derived
  // from the parent channel.
  const grpc_channel_args* lb_channel_specific_args =
      args.GetPointer<grpc_channel_args>(
          GRPC_ARG_EXPERIMENTAL_GRPCLB_CHANNEL_ARGS);
  if (lb_channel_specific_args != nullptr) {
    grpclb_channel_args = ChannelArgs::FromC(lb_channel_specific_args);
  } else {
    // Set grpclb_channel_args based on the parent channel's channel args.
    // Each Remove() below strips an arg that must not leak from the parent
    // channel into the balancer channel:
    grpclb_channel_args =
        args
            // LB policy name, since we want to use the default (pick_first) in
            // the LB channel.
            .Remove(GRPC_ARG_LB_POLICY_NAME)
            // Strip out the service config, since we don't want the LB policy
            // config specified for the parent channel to affect the LB channel.
            .Remove(GRPC_ARG_SERVICE_CONFIG)
            // The fake resolver response generator, because we are replacing it
            // with the one from the grpclb policy, used to propagate updates to
            // the LB channel.
            .Remove(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR)
            // The LB channel should use the authority indicated by the target
            // authority table (see \a ModifyGrpclbBalancerChannelArgs),
            // as opposed to the authority from the parent channel.
            .Remove(GRPC_ARG_DEFAULT_AUTHORITY)
            // Just as for \a GRPC_ARG_DEFAULT_AUTHORITY, the LB channel should
            // be treated as a stand-alone channel and not inherit this argument
            // from the args of the parent channel.
            .Remove(GRPC_SSL_TARGET_NAME_OVERRIDE_ARG)
            // Don't want to pass down channelz node from parent; the balancer
            // channel will get its own.
            .Remove(GRPC_ARG_CHANNELZ_CHANNEL_NODE)
            // Remove the channel args for channel credentials and replace it
            // with a version that does not contain call credentials. The
            // loadbalancer is not necessarily trusted to handle bearer token
            // credentials.
            .Remove(GRPC_ARG_CHANNEL_CREDENTIALS);
  }
  return grpclb_channel_args
      // A channel arg indicating the target is a grpclb load balancer.
      .Set(GRPC_ARG_ADDRESS_IS_GRPCLB_LOAD_BALANCER, 1)
      // Tells channelz that this is an internal channel.
      .Set(GRPC_ARG_CHANNELZ_IS_INTERNAL_CHANNEL, 1)
      // The fake resolver response generator, which we use to inject
      // address updates into the LB channel.
      .SetObject(response_generator->Ref());
}
1439
1440 //
1441 // ctor and dtor
1442 //
1443
// Constructs the grpclb policy, reading its tunables from the channel args.
// Negative configured durations are clamped to zero via std::max.
GrpcLb::GrpcLb(Args args)
    : LoadBalancingPolicy(std::move(args)),
      // Generator used to push address updates into the balancer channel's
      // fake resolver.
      response_generator_(MakeRefCounted<FakeResolverResponseGenerator>()),
      // Deadline for each balancer call; zero means no deadline.
      lb_call_timeout_(std::max(
          Duration::Zero(),
          channel_args()
              .GetDurationFromIntMillis(GRPC_ARG_GRPCLB_CALL_TIMEOUT_MS)
              .value_or(Duration::Zero()))),
      // Backoff schedule used when retrying failed balancer calls.
      lb_call_backoff_(
          BackOff::Options()
              .set_initial_backoff(Duration::Seconds(
                  GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS))
              .set_multiplier(GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER)
              .set_jitter(GRPC_GRPCLB_RECONNECT_JITTER)
              .set_max_backoff(Duration::Seconds(
                  GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS))),
      // How long to wait for the balancer at startup before falling back to
      // the resolver-provided backend addresses.
      fallback_at_startup_timeout_(std::max(
          Duration::Zero(),
          channel_args()
              .GetDurationFromIntMillis(GRPC_ARG_GRPCLB_FALLBACK_TIMEOUT_MS)
              .value_or(Duration::Milliseconds(
                  GRPC_GRPCLB_DEFAULT_FALLBACK_TIMEOUT_MS)))),
      // How long unused subchannels are cached before being released.
      subchannel_cache_interval_(std::max(
          Duration::Zero(),
          channel_args()
              .GetDurationFromIntMillis(
                  GRPC_ARG_GRPCLB_SUBCHANNEL_CACHE_INTERVAL_MS)
              .value_or(Duration::Milliseconds(
                  GRPC_GRPCLB_DEFAULT_SUBCHANNEL_DELETION_DELAY_MS)))) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
    gpr_log(GPR_INFO,
            "[grpclb %p] Will use '%s' as the server name for LB request.",
            this,
            std::string(channel_control_helper()->GetAuthority()).c_str());
  }
}
1480
// Tears down all policy state: the balancer call, pending timers, cached
// subchannels, the child policy, and finally the balancer channel itself.
void GrpcLb::ShutdownLocked() {
  shutting_down_ = true;
  // Dropping lb_calld_ orphans the balancer call, which cancels it.
  lb_calld_.reset();
  if (subchannel_cache_timer_handle_.has_value()) {
    channel_control_helper()->GetEventEngine()->Cancel(
        *subchannel_cache_timer_handle_);
    subchannel_cache_timer_handle_.reset();
  }
  cached_subchannels_.clear();
  if (lb_call_retry_timer_handle_.has_value()) {
    channel_control_helper()->GetEventEngine()->Cancel(
        *lb_call_retry_timer_handle_);
  }
  if (fallback_at_startup_checks_pending_) {
    fallback_at_startup_checks_pending_ = false;
    channel_control_helper()->GetEventEngine()->Cancel(
        *lb_fallback_timer_handle_);
    CancelBalancerChannelConnectivityWatchLocked();
  }
  if (child_policy_ != nullptr) {
    // Detach the child's pollset_set before destroying it.
    grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(),
                                     interested_parties());
    child_policy_.reset();
  }
  // We destroy the LB channel here instead of in our destructor because
  // destroying the channel triggers a last callback to
  // OnBalancerChannelConnectivityChangedLocked(), and we need to be
  // alive when that callback is invoked.
  if (lb_channel_ != nullptr) {
    if (parent_channelz_node_ != nullptr) {
      // Unlink the balancer channel from the parent's channelz tree.
      channelz::ChannelNode* child_channelz_node = lb_channel_->channelz_node();
      GPR_ASSERT(child_channelz_node != nullptr);
      parent_channelz_node_->RemoveChildChannel(child_channelz_node->uuid());
    }
    lb_channel_.reset();
  }
}
1518
1519 //
1520 // public methods
1521 //
1522
ResetBackoffLocked()1523 void GrpcLb::ResetBackoffLocked() {
1524 if (lb_channel_ != nullptr) {
1525 lb_channel_->ResetConnectionBackoff();
1526 }
1527 if (child_policy_ != nullptr) {
1528 child_policy_->ResetBackoffLocked();
1529 }
1530 }
1531
1532 // Endpoint iterator wrapper to add null LB token attribute.
1533 class GrpcLb::NullLbTokenEndpointIterator final
1534 : public EndpointAddressesIterator {
1535 public:
NullLbTokenEndpointIterator(std::shared_ptr<EndpointAddressesIterator> parent_it)1536 explicit NullLbTokenEndpointIterator(
1537 std::shared_ptr<EndpointAddressesIterator> parent_it)
1538 : parent_it_(std::move(parent_it)) {}
1539
ForEach(absl::FunctionRef<void (const EndpointAddresses &)> callback) const1540 void ForEach(absl::FunctionRef<void(const EndpointAddresses&)> callback)
1541 const override {
1542 parent_it_->ForEach([&](const EndpointAddresses& endpoint) {
1543 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
1544 gpr_log(GPR_INFO, "[grpclb %p] fallback address: %s", this,
1545 endpoint.ToString().c_str());
1546 }
1547 callback(EndpointAddresses(endpoint.addresses(),
1548 endpoint.args().SetObject(empty_token_)));
1549 });
1550 }
1551
1552 private:
1553 std::shared_ptr<EndpointAddressesIterator> parent_it_;
1554 RefCountedPtr<TokenAndClientStatsArg> empty_token_ =
1555 MakeRefCounted<TokenAndClientStatsArg>("", nullptr);
1556 };
1557
// Handles a new resolver/config update. Stores the config and fallback
// addresses, pushes the balancer addresses to the balancer channel, and on
// the very first update starts the fallback timer, the balancer-channel
// connectivity watch, and the balancer call. Returns the status from
// updating the balancer channel.
absl::Status GrpcLb::UpdateLocked(UpdateArgs args) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
    gpr_log(GPR_INFO, "[grpclb %p] received update", this);
  }
  // The balancer channel is created lazily on the first update, so its
  // absence identifies the initial update.
  const bool is_initial_update = lb_channel_ == nullptr;
  config_ = args.config.TakeAsSubclass<GrpcLbConfig>();
  GPR_ASSERT(config_ != nullptr);
  args_ = std::move(args.args);
  // Update fallback address list: keep the resolver error as-is, or wrap
  // the resolved endpoints so each carries an empty LB token.
  if (!args.addresses.ok()) {
    fallback_backend_addresses_ = args.addresses.status();
  } else {
    fallback_backend_addresses_ = std::make_shared<NullLbTokenEndpointIterator>(
        std::move(*args.addresses));
  }
  resolution_note_ = std::move(args.resolution_note);
  // Update balancer channel.
  absl::Status status = UpdateBalancerChannelLocked();
  // Update the existing child policy, if any.
  if (child_policy_ != nullptr) CreateOrUpdateChildPolicyLocked();
  // If this is the initial update, start the fallback-at-startup checks
  // and the balancer call.
  if (is_initial_update) {
    fallback_at_startup_checks_pending_ = true;
    // Start timer.  The EventEngine callback hops back into the work
    // serializer before touching any policy state.
    lb_fallback_timer_handle_ =
        channel_control_helper()->GetEventEngine()->RunAfter(
            fallback_at_startup_timeout_,
            [self = RefAsSubclass<GrpcLb>(DEBUG_LOCATION,
                                          "on_fallback_timer")]() mutable {
              ApplicationCallbackExecCtx callback_exec_ctx;
              ExecCtx exec_ctx;
              auto self_ptr = self.get();
              self_ptr->work_serializer()->Run(
                  [self = std::move(self)]() { self->OnFallbackTimerLocked(); },
                  DEBUG_LOCATION);
            });
    // Start watching the channel's connectivity state. If the channel
    // goes into state TRANSIENT_FAILURE before the timer fires, we go into
    // fallback mode even if the fallback timeout has not elapsed.
    // NOTE: ownership of the watcher passes to the channel; watcher_ is
    // kept as a raw pointer only so we can cancel the watch later.
    watcher_ =
        new StateWatcher(RefAsSubclass<GrpcLb>(DEBUG_LOCATION, "StateWatcher"));
    lb_channel_->AddConnectivityWatcher(
        GRPC_CHANNEL_IDLE,
        OrphanablePtr<AsyncConnectivityStateWatcherInterface>(watcher_));
    // Start balancer call.
    StartBalancerCallLocked();
  }
  return status;
}
1608
1609 //
1610 // helpers for UpdateLocked()
1611 //
1612
// Extracts the balancer addresses from the current args, creates the
// balancer channel on first use (with channelz linkage to the parent
// channel), and pushes the addresses through the fake resolver. Returns
// UNAVAILABLE if there are no balancer addresses; note that the (empty)
// result is still pushed to the fake resolver in that case.
absl::Status GrpcLb::UpdateBalancerChannelLocked() {
  // Get balancer addresses.
  EndpointAddressesList balancer_addresses = ExtractBalancerAddresses(args_);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
    for (const auto& endpoint : balancer_addresses) {
      gpr_log(GPR_INFO, "[grpclb %p] balancer address: %s", this,
              endpoint.ToString().c_str());
    }
  }
  absl::Status status;
  if (balancer_addresses.empty()) {
    status = absl::UnavailableError("balancer address list must be non-empty");
  }
  // Create channel credentials that do not contain call credentials.
  auto channel_credentials = channel_control_helper()->GetChannelCredentials();
  // Construct args for balancer channel.
  ChannelArgs lb_channel_args =
      BuildBalancerChannelArgs(response_generator_.get(), args_);
  // Create balancer channel if needed.
  if (lb_channel_ == nullptr) {
    // The fake resolver serves the addresses; the URI only carries the
    // authority used for the LB request.
    std::string uri_str =
        absl::StrCat("fake:///", channel_control_helper()->GetAuthority());
    lb_channel_.reset(Channel::FromC(
        grpc_channel_create(uri_str.c_str(), channel_credentials.get(),
                            lb_channel_args.ToC().get())));
    GPR_ASSERT(lb_channel_ != nullptr);
    // Set up channelz linkage.
    channelz::ChannelNode* child_channelz_node = lb_channel_->channelz_node();
    auto parent_channelz_node = args_.GetObjectRef<channelz::ChannelNode>();
    if (child_channelz_node != nullptr && parent_channelz_node != nullptr) {
      parent_channelz_node->AddChildChannel(child_channelz_node->uuid());
      // Retained so ShutdownLocked() can remove the linkage.
      parent_channelz_node_ = std::move(parent_channelz_node);
    }
  }
  // Propagate updates to the LB channel (pick_first) through the fake
  // resolver.
  Resolver::Result result;
  result.addresses = std::move(balancer_addresses);
  // Pass channel creds via channel args, since the fake resolver won't
  // do this automatically.
  result.args = lb_channel_args.SetObject(std::move(channel_credentials));
  response_generator_->SetResponseAsync(std::move(result));
  // Return status.
  return status;
}
1658
// Removes the connectivity watcher that was registered on the balancer
// channel during the initial update (watcher_ is owned by the channel).
void GrpcLb::CancelBalancerChannelConnectivityWatchLocked() {
  lb_channel_->RemoveConnectivityWatcher(watcher_);
}
1662
1663 //
1664 // code for balancer channel and call
1665 //
1666
StartBalancerCallLocked()1667 void GrpcLb::StartBalancerCallLocked() {
1668 GPR_ASSERT(lb_channel_ != nullptr);
1669 if (shutting_down_) return;
1670 // Init the LB call data.
1671 GPR_ASSERT(lb_calld_ == nullptr);
1672 lb_calld_ = MakeOrphanable<BalancerCallState>(Ref());
1673 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
1674 gpr_log(GPR_INFO,
1675 "[grpclb %p] Query for backends (lb_channel: %p, lb_calld: %p)",
1676 this, lb_channel_.get(), lb_calld_.get());
1677 }
1678 lb_calld_->StartQuery();
1679 }
1680
// Schedules the next balancer call attempt based on the backoff state.
// The timeout may be non-positive (retry immediately) when the previous
// call lasted long enough. The timer callback hops back into the work
// serializer before touching policy state.
void GrpcLb::StartBalancerCallRetryTimerLocked() {
  Duration timeout = lb_call_backoff_.NextAttemptTime() - Timestamp::Now();
  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
    gpr_log(GPR_INFO, "[grpclb %p] Connection to LB server lost...", this);
    if (timeout > Duration::Zero()) {
      gpr_log(GPR_INFO, "[grpclb %p] ... retry_timer_active in %" PRId64 "ms.",
              this, timeout.millis());
    } else {
      gpr_log(GPR_INFO, "[grpclb %p] ... retry_timer_active immediately.",
              this);
    }
  }
  lb_call_retry_timer_handle_ =
      channel_control_helper()->GetEventEngine()->RunAfter(
          timeout,
          [self = RefAsSubclass<GrpcLb>(
               DEBUG_LOCATION, "on_balancer_call_retry_timer")]() mutable {
            ApplicationCallbackExecCtx callback_exec_ctx;
            ExecCtx exec_ctx;
            auto self_ptr = self.get();
            self_ptr->work_serializer()->Run(
                [self = std::move(self)]() {
                  self->OnBalancerCallRetryTimerLocked();
                },
                DEBUG_LOCATION);
          });
}
1708
OnBalancerCallRetryTimerLocked()1709 void GrpcLb::OnBalancerCallRetryTimerLocked() {
1710 lb_call_retry_timer_handle_.reset();
1711 if (!shutting_down_ && lb_calld_ == nullptr) {
1712 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
1713 gpr_log(GPR_INFO, "[grpclb %p] Restarting call to LB server", this);
1714 }
1715 StartBalancerCallLocked();
1716 }
1717 }
1718
1719 //
1720 // code for handling fallback mode
1721 //
1722
MaybeEnterFallbackModeAfterStartup()1723 void GrpcLb::MaybeEnterFallbackModeAfterStartup() {
1724 // Enter fallback mode if all of the following are true:
1725 // - We are not currently in fallback mode.
1726 // - We are not currently waiting for the initial fallback timeout.
1727 // - We are not currently in contact with the balancer.
1728 // - The child policy is not in state READY.
1729 if (!fallback_mode_ && !fallback_at_startup_checks_pending_ &&
1730 (lb_calld_ == nullptr || !lb_calld_->seen_serverlist()) &&
1731 !child_policy_ready_) {
1732 gpr_log(GPR_INFO,
1733 "[grpclb %p] lost contact with balancer and backends from "
1734 "most recent serverlist; entering fallback mode",
1735 this);
1736 fallback_mode_ = true;
1737 CreateOrUpdateChildPolicyLocked();
1738 }
1739 }
1740
OnFallbackTimerLocked()1741 void GrpcLb::OnFallbackTimerLocked() {
1742 // If we receive a serverlist after the timer fires but before this callback
1743 // actually runs, don't fall back.
1744 if (fallback_at_startup_checks_pending_ && !shutting_down_) {
1745 gpr_log(GPR_INFO,
1746 "[grpclb %p] No response from balancer after fallback timeout; "
1747 "entering fallback mode",
1748 this);
1749 fallback_at_startup_checks_pending_ = false;
1750 CancelBalancerChannelConnectivityWatchLocked();
1751 fallback_mode_ = true;
1752 CreateOrUpdateChildPolicyLocked();
1753 }
1754 }
1755
1756 //
1757 // code for interacting with the child policy
1758 //
1759
CreateChildPolicyArgsLocked(bool is_backend_from_grpclb_load_balancer)1760 ChannelArgs GrpcLb::CreateChildPolicyArgsLocked(
1761 bool is_backend_from_grpclb_load_balancer) {
1762 ChannelArgs r =
1763 args_
1764 .Set(GRPC_ARG_ADDRESS_IS_BACKEND_FROM_GRPCLB_LOAD_BALANCER,
1765 is_backend_from_grpclb_load_balancer)
1766 .Set(GRPC_ARG_GRPCLB_ENABLE_LOAD_REPORTING_FILTER, 1);
1767 if (is_backend_from_grpclb_load_balancer) {
1768 r = r.Set(GRPC_ARG_INHIBIT_HEALTH_CHECKING, 1);
1769 }
1770 return r;
1771 }
1772
CreateChildPolicyLocked(const ChannelArgs & args)1773 OrphanablePtr<LoadBalancingPolicy> GrpcLb::CreateChildPolicyLocked(
1774 const ChannelArgs& args) {
1775 LoadBalancingPolicy::Args lb_policy_args;
1776 lb_policy_args.work_serializer = work_serializer();
1777 lb_policy_args.args = args;
1778 lb_policy_args.channel_control_helper =
1779 std::make_unique<Helper>(RefAsSubclass<GrpcLb>(DEBUG_LOCATION, "Helper"));
1780 OrphanablePtr<LoadBalancingPolicy> lb_policy =
1781 MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args),
1782 &grpc_lb_glb_trace);
1783 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
1784 gpr_log(GPR_INFO, "[grpclb %p] Created new child policy handler (%p)", this,
1785 lb_policy.get());
1786 }
1787 // Add the gRPC LB's interested_parties pollset_set to that of the newly
1788 // created child policy. This will make the child policy progress upon
1789 // activity on gRPC LB, which in turn is tied to the application's call.
1790 grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
1791 interested_parties());
1792 return lb_policy;
1793 }
1794
EndpointIteratorIsEmpty(const EndpointAddressesIterator & endpoints)1795 bool EndpointIteratorIsEmpty(const EndpointAddressesIterator& endpoints) {
1796 bool empty = true;
1797 endpoints.ForEach([&](const EndpointAddresses&) { empty = false; });
1798 return empty;
1799 }
1800
// Creates the child policy if needed and pushes it an update whose
// addresses come either from the fallback backends (in fallback mode) or
// from the most recent balancer serverlist. Runs in the work serializer.
void GrpcLb::CreateOrUpdateChildPolicyLocked() {
  if (shutting_down_) return;
  // Construct update args.
  UpdateArgs update_args;
  bool is_backend_from_grpclb_load_balancer = false;
  if (fallback_mode_) {
    // If CreateOrUpdateChildPolicyLocked() is invoked when we haven't
    // received any serverlist from the balancer, we use the fallback
    // backends returned by the resolver. Note that the fallback backend
    // list may be empty, in which case the new child policy will fail the
    // picks.
    update_args.addresses = fallback_backend_addresses_;
    if (fallback_backend_addresses_.ok() &&
        EndpointIteratorIsEmpty(**fallback_backend_addresses_)) {
      // Surface the original resolution note so the child policy can
      // report a useful error for failed picks.
      update_args.resolution_note = absl::StrCat(
          "grpclb in fallback mode without any fallback addresses: ",
          resolution_note_);
    }
  } else {
    // Use the most recent serverlist, attaching client stats if we
    // currently have a balancer call.
    update_args.addresses = serverlist_->GetServerAddressList(
        lb_calld_ == nullptr ? nullptr : lb_calld_->client_stats());
    is_backend_from_grpclb_load_balancer = true;
    if (update_args.addresses.ok() &&
        EndpointIteratorIsEmpty(**update_args.addresses)) {
      update_args.resolution_note = "empty serverlist from grpclb balancer";
    }
  }
  update_args.args =
      CreateChildPolicyArgsLocked(is_backend_from_grpclb_load_balancer);
  GPR_ASSERT(update_args.args != ChannelArgs());
  update_args.config = config_->child_policy();
  // Create child policy if needed.
  if (child_policy_ == nullptr) {
    child_policy_ = CreateChildPolicyLocked(update_args.args);
  }
  // Update the policy.
  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
    gpr_log(GPR_INFO, "[grpclb %p] Updating child policy handler %p", this,
            child_policy_.get());
  }
  // TODO(roth): If we're in fallback mode and the child policy rejects the
  // update, we should propagate that failure back to the resolver somehow.
  (void)child_policy_->UpdateLocked(std::move(update_args));
}
1845
1846 //
1847 // subchannel caching
1848 //
1849
CacheDeletedSubchannelLocked(RefCountedPtr<SubchannelInterface> subchannel)1850 void GrpcLb::CacheDeletedSubchannelLocked(
1851 RefCountedPtr<SubchannelInterface> subchannel) {
1852 Timestamp deletion_time = Timestamp::Now() + subchannel_cache_interval_;
1853 cached_subchannels_[deletion_time].push_back(std::move(subchannel));
1854 if (!subchannel_cache_timer_handle_.has_value()) {
1855 StartSubchannelCacheTimerLocked();
1856 }
1857 }
1858
// Arms the subchannel cache timer for the earliest pending expiration.
// Requires a non-empty cache. The callback hops back into the work
// serializer before touching policy state.
void GrpcLb::StartSubchannelCacheTimerLocked() {
  GPR_ASSERT(!cached_subchannels_.empty());
  subchannel_cache_timer_handle_ =
      channel_control_helper()->GetEventEngine()->RunAfter(
          // cached_subchannels_ is keyed by deletion time, so begin() is
          // the next batch due to expire.
          cached_subchannels_.begin()->first - Timestamp::Now(),
          [self = RefAsSubclass<GrpcLb>(DEBUG_LOCATION,
                                        "OnSubchannelCacheTimer")]() mutable {
            ApplicationCallbackExecCtx callback_exec_ctx;
            ExecCtx exec_ctx;
            auto* self_ptr = self.get();
            self_ptr->work_serializer()->Run(
                [self = std::move(self)]() mutable {
                  self->OnSubchannelCacheTimerLocked();
                },
                DEBUG_LOCATION);
          });
}
1876
OnSubchannelCacheTimerLocked()1877 void GrpcLb::OnSubchannelCacheTimerLocked() {
1878 if (subchannel_cache_timer_handle_.has_value()) {
1879 subchannel_cache_timer_handle_.reset();
1880 auto it = cached_subchannels_.begin();
1881 if (it != cached_subchannels_.end()) {
1882 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
1883 gpr_log(GPR_INFO,
1884 "[grpclb %p] removing %" PRIuPTR " subchannels from cache",
1885 this, it->second.size());
1886 }
1887 cached_subchannels_.erase(it);
1888 }
1889 if (!cached_subchannels_.empty()) {
1890 StartSubchannelCacheTimerLocked();
1891 return;
1892 }
1893 }
1894 }
1895
1896 //
1897 // factory
1898 //
1899
// Factory registered with the LB policy registry; creates GrpcLb
// instances and parses their JSON service config.
class GrpcLbFactory final : public LoadBalancingPolicyFactory {
 public:
  // Creates a new grpclb policy instance.
  OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
      LoadBalancingPolicy::Args args) const override {
    return MakeOrphanable<GrpcLb>(std::move(args));
  }

  // Policy name under which this factory is registered (kGrpclb).
  absl::string_view name() const override { return kGrpclb; }

  // Parses and validates the JSON LB config into a GrpcLbConfig.
  absl::StatusOr<RefCountedPtr<LoadBalancingPolicy::Config>>
  ParseLoadBalancingConfig(const Json& json) const override {
    return LoadFromJson<RefCountedPtr<GrpcLbConfig>>(
        json, JsonArgs(), "errors validating grpclb LB policy config");
  }
};
1915
1916 } // namespace
1917
1918 //
1919 // Plugin registration
1920 //
1921
// Plugin entry point: registers the grpclb policy factory and the
// client load reporting filter with core configuration.
void RegisterGrpcLbPolicy(CoreConfiguration::Builder* builder) {
  builder->lb_policy_registry()->RegisterLoadBalancingPolicyFactory(
      std::make_unique<GrpcLbFactory>());
  // The filter is added to subchannels only when the
  // GRPC_ARG_GRPCLB_ENABLE_LOAD_REPORTING_FILTER channel arg is set
  // (which this policy sets in its child policy args).
  builder->channel_init()
      ->RegisterFilter<ClientLoadReportingFilter>(GRPC_CLIENT_SUBCHANNEL)
      .IfChannelArg(GRPC_ARG_GRPCLB_ENABLE_LOAD_REPORTING_FILTER, false);
}
1929
1930 } // namespace grpc_core
1931