1 //
2 // Copyright 2016 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 /// Implementation of the gRPC LB policy.
18 ///
19 /// This policy takes as input a list of resolved addresses, which must
20 /// include at least one balancer address.
21 ///
/// An internal channel (\a lb_channel_) is created for the addresses
/// that are balancers. This channel behaves just like a regular
/// channel that uses pick_first to select from the list of balancer
/// addresses.
26 ///
27 /// When we get our initial update, we instantiate the internal *streaming*
28 /// call to the LB server (whichever address pick_first chose). The call
29 /// will be complete when either the balancer sends status or when we cancel
/// the call (e.g., because we are shutting down). If needed, we retry the
/// call. If we received at least one valid message from the server, a new
/// call attempt will be made immediately; otherwise, we apply back-off
/// delays between attempts.
34 ///
35 /// We maintain an internal round_robin policy instance for distributing
36 /// requests across backends. Whenever we receive a new serverlist from
37 /// the balancer, we update the round_robin policy with the new list of
38 /// addresses. If we cannot communicate with the balancer on startup,
39 /// however, we may enter fallback mode, in which case we will populate
40 /// the child policy's addresses from the backend addresses returned by the
41 /// resolver.
42 ///
43 /// Once a child policy instance is in place (and getting updated as described),
44 /// calls for a pick, a ping, or a cancellation will be serviced right
45 /// away by forwarding them to the child policy instance. Any time there's no
46 /// child policy available (i.e., right after the creation of the gRPCLB
47 /// policy), pick requests are queued.
48 ///
49 /// \see https://github.com/grpc/grpc/blob/master/doc/load-balancing.md for the
50 /// high level design and details.
51
52 #include "src/core/load_balancing/grpclb/grpclb.h"
53
54 #include <grpc/event_engine/event_engine.h>
55 #include <grpc/impl/channel_arg_names.h>
56 #include <grpc/support/json.h>
57 #include <grpc/support/port_platform.h>
58
59 // IWYU pragma: no_include <sys/socket.h>
60
61 #include <grpc/byte_buffer.h>
62 #include <grpc/byte_buffer_reader.h>
63 #include <grpc/grpc.h>
64 #include <grpc/impl/connectivity_state.h>
65 #include <grpc/impl/propagation_bits.h>
66 #include <grpc/slice.h>
67 #include <grpc/status.h>
68 #include <grpc/support/alloc.h>
69 #include <inttypes.h>
70 #include <string.h>
71
72 #include <algorithm>
73 #include <atomic>
74 #include <map>
75 #include <memory>
76 #include <string>
77 #include <type_traits>
78 #include <utility>
79 #include <vector>
80
81 #include "absl/container/inlined_vector.h"
82 #include "absl/functional/function_ref.h"
83 #include "absl/log/check.h"
84 #include "absl/log/globals.h"
85 #include "absl/log/log.h"
86 #include "absl/status/status.h"
87 #include "absl/status/statusor.h"
88 #include "absl/strings/str_cat.h"
89 #include "absl/strings/str_format.h"
90 #include "absl/strings/str_join.h"
91 #include "absl/strings/string_view.h"
92 #include "absl/types/optional.h"
93 #include "absl/types/variant.h"
94 #include "src/core/channelz/channelz.h"
95 #include "src/core/client_channel/client_channel_filter.h"
96 #include "src/core/config/core_configuration.h"
97 #include "src/core/lib/address_utils/sockaddr_utils.h"
98 #include "src/core/lib/channel/channel_args.h"
99 #include "src/core/lib/debug/trace.h"
100 #include "src/core/lib/experiments/experiments.h"
101 #include "src/core/lib/iomgr/closure.h"
102 #include "src/core/lib/iomgr/error.h"
103 #include "src/core/lib/iomgr/exec_ctx.h"
104 #include "src/core/lib/iomgr/pollset_set.h"
105 #include "src/core/lib/iomgr/resolved_address.h"
106 #include "src/core/lib/iomgr/sockaddr.h"
107 #include "src/core/lib/iomgr/socket_utils.h"
108 #include "src/core/lib/security/credentials/credentials.h"
109 #include "src/core/lib/slice/slice.h"
110 #include "src/core/lib/slice/slice_string_helpers.h"
111 #include "src/core/lib/surface/call.h"
112 #include "src/core/lib/surface/channel.h"
113 #include "src/core/lib/surface/channel_stack_type.h"
114 #include "src/core/lib/transport/connectivity_state.h"
115 #include "src/core/lib/transport/metadata_batch.h"
116 #include "src/core/load_balancing/child_policy_handler.h"
117 #include "src/core/load_balancing/delegating_helper.h"
118 #include "src/core/load_balancing/grpclb/client_load_reporting_filter.h"
119 #include "src/core/load_balancing/grpclb/grpclb_balancer_addresses.h"
120 #include "src/core/load_balancing/grpclb/grpclb_client_stats.h"
121 #include "src/core/load_balancing/grpclb/load_balancer_api.h"
122 #include "src/core/load_balancing/lb_policy.h"
123 #include "src/core/load_balancing/lb_policy_factory.h"
124 #include "src/core/load_balancing/lb_policy_registry.h"
125 #include "src/core/load_balancing/subchannel_interface.h"
126 #include "src/core/resolver/endpoint_addresses.h"
127 #include "src/core/resolver/fake/fake_resolver.h"
128 #include "src/core/resolver/resolver.h"
129 #include "src/core/util/backoff.h"
130 #include "src/core/util/crash.h"
131 #include "src/core/util/debug_location.h"
132 #include "src/core/util/json/json.h"
133 #include "src/core/util/json/json_args.h"
134 #include "src/core/util/json/json_object_loader.h"
135 #include "src/core/util/orphanable.h"
136 #include "src/core/util/ref_counted.h"
137 #include "src/core/util/ref_counted_ptr.h"
138 #include "src/core/util/status_helper.h"
139 #include "src/core/util/string.h"
140 #include "src/core/util/time.h"
141 #include "src/core/util/useful.h"
142 #include "src/core/util/validation_errors.h"
143 #include "src/core/util/work_serializer.h"
144 #include "upb/mem/arena.hpp"
145
// Backoff parameters for retrying the balancer call.
#define GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS 1
#define GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER 1.6
#define GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS 120
#define GRPC_GRPCLB_RECONNECT_JITTER 0.2
// Default time to wait for a serverlist at startup before entering
// fallback mode.
#define GRPC_GRPCLB_DEFAULT_FALLBACK_TIMEOUT_MS 10000
// Default time a no-longer-used subchannel stays cached before deletion.
#define GRPC_GRPCLB_DEFAULT_SUBCHANNEL_DELETION_DELAY_MS 10000

// Channel arg used to enable load reporting filter.
#define GRPC_ARG_GRPCLB_ENABLE_LOAD_REPORTING_FILTER \
  "grpc.internal.grpclb_enable_load_reporting_filter"

namespace grpc_core {

namespace {

using ::grpc_event_engine::experimental::EventEngine;

// Name of this LB policy, as registered and as matched in configs.
constexpr absl::string_view kGrpclb = "grpclb";
164
165 class GrpcLbConfig final : public LoadBalancingPolicy::Config {
166 public:
167 GrpcLbConfig() = default;
168
169 GrpcLbConfig(const GrpcLbConfig&) = delete;
170 GrpcLbConfig& operator=(const GrpcLbConfig&) = delete;
171
172 GrpcLbConfig(GrpcLbConfig&& other) = delete;
173 GrpcLbConfig& operator=(GrpcLbConfig&& other) = delete;
174
JsonLoader(const JsonArgs &)175 static const JsonLoaderInterface* JsonLoader(const JsonArgs&) {
176 static const auto* loader =
177 JsonObjectLoader<GrpcLbConfig>()
178 // Note: "childPolicy" field requires custom parsing, so
179 // it's handled in JsonPostLoad() instead.
180 .OptionalField("serviceName", &GrpcLbConfig::service_name_)
181 .Finish();
182 return loader;
183 }
184
JsonPostLoad(const Json & json,const JsonArgs &,ValidationErrors * errors)185 void JsonPostLoad(const Json& json, const JsonArgs&,
186 ValidationErrors* errors) {
187 ValidationErrors::ScopedField field(errors, ".childPolicy");
188 Json child_policy_config_json_tmp;
189 const Json* child_policy_config_json;
190 auto it = json.object().find("childPolicy");
191 if (it == json.object().end()) {
192 child_policy_config_json_tmp = Json::FromArray({Json::FromObject({
193 {"round_robin", Json::FromObject({})},
194 })});
195 child_policy_config_json = &child_policy_config_json_tmp;
196 } else {
197 child_policy_config_json = &it->second;
198 }
199 auto child_policy_config =
200 CoreConfiguration::Get().lb_policy_registry().ParseLoadBalancingConfig(
201 *child_policy_config_json);
202 if (!child_policy_config.ok()) {
203 errors->AddError(child_policy_config.status().message());
204 return;
205 }
206 child_policy_ = std::move(*child_policy_config);
207 }
208
name() const209 absl::string_view name() const override { return kGrpclb; }
210
child_policy() const211 RefCountedPtr<LoadBalancingPolicy::Config> child_policy() const {
212 return child_policy_;
213 }
214
service_name() const215 const std::string& service_name() const { return service_name_; }
216
217 private:
218 RefCountedPtr<LoadBalancingPolicy::Config> child_policy_;
219 std::string service_name_;
220 };
221
// The grpclb LB policy.
//
// Owns the balancer channel and streaming call, the fallback and retry
// timers, the child policy that distributes picks across backends, and
// a short-lived cache of deleted subchannels.
class GrpcLb final : public LoadBalancingPolicy {
 public:
  explicit GrpcLb(Args args);

  absl::string_view name() const override { return kGrpclb; }

  absl::Status UpdateLocked(UpdateArgs args) override;
  void ResetBackoffLocked() override;

 private:
  /// Contains a call to the LB server and all the data related to the call.
  class BalancerCallState final
      : public InternallyRefCounted<BalancerCallState> {
   public:
    explicit BalancerCallState(
        RefCountedPtr<LoadBalancingPolicy> parent_grpclb_policy);
    ~BalancerCallState() override;

    // It's the caller's responsibility to ensure that Orphan() is called from
    // inside the combiner.
    void Orphan() override;

    void StartQuery();

    GrpcLbClientStats* client_stats() const { return client_stats_.get(); }

    bool seen_initial_response() const { return seen_initial_response_; }
    bool seen_serverlist() const { return seen_serverlist_; }

   private:
    // The parent policy, downcast from the generic ref held in
    // grpclb_policy_.
    GrpcLb* grpclb_policy() const {
      return static_cast<GrpcLb*>(grpclb_policy_.get());
    }

    void ScheduleNextClientLoadReportLocked();
    void SendClientLoadReportLocked();

    // EventEngine callbacks
    void MaybeSendClientLoadReportLocked();

    // Completion-queue callbacks; each hops into the work serializer
    // and invokes the corresponding *Locked() method below.
    static void ClientLoadReportDone(void* arg, grpc_error_handle error);
    static void OnInitialRequestSent(void* arg, grpc_error_handle error);
    static void OnBalancerMessageReceived(void* arg, grpc_error_handle error);
    static void OnBalancerStatusReceived(void* arg, grpc_error_handle error);

    void ClientLoadReportDoneLocked(grpc_error_handle error);
    void OnInitialRequestSentLocked();
    void OnBalancerMessageReceivedLocked();
    void OnBalancerStatusReceivedLocked(grpc_error_handle error);

    // The owning LB policy.
    RefCountedPtr<LoadBalancingPolicy> grpclb_policy_;

    // The streaming call to the LB server. Always non-NULL.
    grpc_call* lb_call_ = nullptr;

    // recv_initial_metadata
    grpc_metadata_array lb_initial_metadata_recv_;

    // send_message
    grpc_byte_buffer* send_message_payload_ = nullptr;
    grpc_closure lb_on_initial_request_sent_;

    // recv_message
    grpc_byte_buffer* recv_message_payload_ = nullptr;
    grpc_closure lb_on_balancer_message_received_;
    bool seen_initial_response_ = false;
    bool seen_serverlist_ = false;

    // recv_trailing_metadata
    grpc_closure lb_on_balancer_status_received_;
    grpc_metadata_array lb_trailing_metadata_recv_;
    grpc_status_code lb_call_status_;
    grpc_slice lb_call_status_details_;

    // The stats for client-side load reporting associated with this LB call.
    // Created after the first serverlist is received.
    RefCountedPtr<GrpcLbClientStats> client_stats_;
    Duration client_stats_report_interval_;
    absl::optional<EventEngine::TaskHandle> client_load_report_handle_;
    bool last_client_load_report_counters_were_zero_ = false;
    bool client_load_report_is_due_ = false;
    // The closure used for the completion of sending the load report.
    grpc_closure client_load_report_done_closure_;
  };

  // Wraps a subchannel to carry its LB token and client-stats object,
  // and to cache the wrapped subchannel for a while when it is orphaned
  // (see CacheDeletedSubchannelLocked()).
  class SubchannelWrapper final : public DelegatingSubchannel {
   public:
    SubchannelWrapper(RefCountedPtr<SubchannelInterface> subchannel,
                      RefCountedPtr<GrpcLb> lb_policy,
                      grpc_event_engine::experimental::Slice lb_token,
                      RefCountedPtr<GrpcLbClientStats> client_stats)
        : DelegatingSubchannel(std::move(subchannel)),
          lb_policy_(std::move(lb_policy)),
          lb_token_(std::move(lb_token)),
          client_stats_(std::move(client_stats)) {}

    const grpc_event_engine::experimental::Slice& lb_token() const {
      return lb_token_;
    }
    GrpcLbClientStats* client_stats() const { return client_stats_.get(); }

   private:
    void Orphaned() override {
      if (!IsWorkSerializerDispatchEnabled()) {
        if (!lb_policy_->shutting_down_) {
          lb_policy_->CacheDeletedSubchannelLocked(wrapped_subchannel());
        }
        return;
      }
      // Hop into the work serializer before touching policy state.
      lb_policy_->work_serializer()->Run(
          [self = WeakRefAsSubclass<SubchannelWrapper>()]() {
            if (!self->lb_policy_->shutting_down_) {
              self->lb_policy_->CacheDeletedSubchannelLocked(
                  self->wrapped_subchannel());
            }
          },
          DEBUG_LOCATION);
    }

    RefCountedPtr<GrpcLb> lb_policy_;
    grpc_event_engine::experimental::Slice lb_token_;
    RefCountedPtr<GrpcLbClientStats> client_stats_;
  };

  // Per-address channel arg that carries the LB token and client-stats
  // object from the serverlist to CreateSubchannel().
  class TokenAndClientStatsArg final
      : public RefCounted<TokenAndClientStatsArg> {
   public:
    TokenAndClientStatsArg(grpc_event_engine::experimental::Slice lb_token,
                           RefCountedPtr<GrpcLbClientStats> client_stats)
        : lb_token_(std::move(lb_token)),
          client_stats_(std::move(client_stats)) {}

    static absl::string_view ChannelArgName() {
      return GRPC_ARG_NO_SUBCHANNEL_PREFIX "grpclb_token_and_client_stats";
    }

    // Orders by LB token, then by stats-object identity.
    static int ChannelArgsCompare(const TokenAndClientStatsArg* a,
                                  const TokenAndClientStatsArg* b) {
      int r =
          a->lb_token_.as_string_view().compare(b->lb_token_.as_string_view());
      if (r != 0) return r;
      return QsortCompare(a->client_stats_.get(), b->client_stats_.get());
    }

    const grpc_event_engine::experimental::Slice& lb_token() const {
      return lb_token_;
    }
    RefCountedPtr<GrpcLbClientStats> client_stats() const {
      return client_stats_;
    }

   private:
    grpc_event_engine::experimental::Slice lb_token_;
    RefCountedPtr<GrpcLbClientStats> client_stats_;
  };

  // A serverlist received from the balancer.
  class Serverlist final : public RefCounted<Serverlist> {
   public:
    // Takes ownership of serverlist.
    explicit Serverlist(std::vector<GrpcLbServer> serverlist)
        : serverlist_(std::move(serverlist)) {}

    bool operator==(const Serverlist& other) const;

    const std::vector<GrpcLbServer>& serverlist() const { return serverlist_; }

    // Returns a text representation suitable for logging.
    std::string AsText() const;

    // Extracts all non-drop entries into an EndpointAddressesIterator.
    std::shared_ptr<EndpointAddressesIterator> GetServerAddressList(
        GrpcLbClientStats* client_stats);

    // Returns true if the serverlist contains at least one drop entry and
    // no backend address entries.
    bool ContainsAllDropEntries() const;

    // Returns the LB token to use for a drop, or null if the call
    // should not be dropped.
    //
    // Note: This is called from the picker, NOT from inside the control
    // plane work_serializer.
    const char* ShouldDrop();

   private:
    class AddressIterator;

    std::vector<GrpcLbServer> serverlist_;

    // Accessed from the picker, so needs synchronization.
    std::atomic<size_t> drop_index_{0};
  };

  // Picker that applies serverlist-directed drops and then delegates to
  // the child policy's picker.
  class Picker final : public SubchannelPicker {
   public:
    Picker(RefCountedPtr<Serverlist> serverlist,
           RefCountedPtr<SubchannelPicker> child_picker,
           RefCountedPtr<GrpcLbClientStats> client_stats)
        : serverlist_(std::move(serverlist)),
          child_picker_(std::move(child_picker)),
          client_stats_(std::move(client_stats)) {}

    PickResult Pick(PickArgs args) override;

   private:
    // A subchannel call tracker that unrefs the GrpcLbClientStats object
    // in the case where the subchannel call is never actually started,
    // since the client load reporting filter will not be able to do it
    // in that case.
    class SubchannelCallTracker final : public SubchannelCallTrackerInterface {
     public:
      SubchannelCallTracker(
          RefCountedPtr<GrpcLbClientStats> client_stats,
          std::unique_ptr<SubchannelCallTrackerInterface> original_call_tracker)
          : client_stats_(std::move(client_stats)),
            original_call_tracker_(std::move(original_call_tracker)) {}

      void Start() override {
        if (original_call_tracker_ != nullptr) {
          original_call_tracker_->Start();
        }
        // If we're actually starting the subchannel call, then the
        // client load reporting filter will take ownership of the ref
        // passed down to it via metadata.
        client_stats_.release();
      }

      void Finish(FinishArgs args) override {
        if (original_call_tracker_ != nullptr) {
          original_call_tracker_->Finish(args);
        }
      }

     private:
      RefCountedPtr<GrpcLbClientStats> client_stats_;
      std::unique_ptr<SubchannelCallTrackerInterface> original_call_tracker_;
    };

    // Serverlist to be used for determining drops.
    RefCountedPtr<Serverlist> serverlist_;

    RefCountedPtr<SubchannelPicker> child_picker_;
    RefCountedPtr<GrpcLbClientStats> client_stats_;
  };

  // Channel control helper handed to the child policy; intercepts
  // subchannel creation, state updates, and re-resolution requests.
  class Helper final
      : public ParentOwningDelegatingChannelControlHelper<GrpcLb> {
   public:
    explicit Helper(RefCountedPtr<GrpcLb> parent)
        : ParentOwningDelegatingChannelControlHelper(std::move(parent)) {}

    RefCountedPtr<SubchannelInterface> CreateSubchannel(
        const grpc_resolved_address& address,
        const ChannelArgs& per_address_args, const ChannelArgs& args) override;
    void UpdateState(grpc_connectivity_state state, const absl::Status& status,
                     RefCountedPtr<SubchannelPicker> picker) override;
    void RequestReresolution() override;
  };

  // Watches connectivity of the balancer channel so that we can enter
  // fallback mode early if the channel goes into TRANSIENT_FAILURE.
  class StateWatcher final : public AsyncConnectivityStateWatcherInterface {
   public:
    explicit StateWatcher(RefCountedPtr<GrpcLb> parent)
        : AsyncConnectivityStateWatcherInterface(parent->work_serializer()),
          parent_(std::move(parent)) {}

    ~StateWatcher() override { parent_.reset(DEBUG_LOCATION, "StateWatcher"); }

   private:
    void OnConnectivityStateChange(grpc_connectivity_state new_state,
                                   const absl::Status& status) override {
      if (parent_->fallback_at_startup_checks_pending_ &&
          new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
        // In TRANSIENT_FAILURE. Cancel the fallback timer and go into
        // fallback mode immediately.
        GRPC_TRACE_LOG(glb, INFO)
            << "[grpclb " << parent_.get()
            << "] balancer channel in state:TRANSIENT_FAILURE ("
            << status.ToString() << "); entering fallback mode";
        parent_->fallback_at_startup_checks_pending_ = false;
        parent_->channel_control_helper()->GetEventEngine()->Cancel(
            *parent_->lb_fallback_timer_handle_);
        parent_->fallback_mode_ = true;
        parent_->CreateOrUpdateChildPolicyLocked();
        // Cancel the watch, since we don't care about the channel state once we
        // go into fallback mode.
        parent_->CancelBalancerChannelConnectivityWatchLocked();
      }
    }

    RefCountedPtr<GrpcLb> parent_;
  };

  class NullLbTokenEndpointIterator;

  void ShutdownLocked() override;

  // Helper functions used in UpdateLocked().
  absl::Status UpdateBalancerChannelLocked();

  void CancelBalancerChannelConnectivityWatchLocked();

  // Methods for dealing with fallback state.
  void MaybeEnterFallbackModeAfterStartup();
  void OnFallbackTimerLocked();

  // Methods for dealing with the balancer call.
  void StartBalancerCallLocked();
  void StartBalancerCallRetryTimerLocked();
  void OnBalancerCallRetryTimerLocked();

  // Methods for dealing with the child policy.
  ChannelArgs CreateChildPolicyArgsLocked(
      bool is_backend_from_grpclb_load_balancer);
  OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
      const ChannelArgs& args);
  void CreateOrUpdateChildPolicyLocked();

  // Subchannel caching.
  void CacheDeletedSubchannelLocked(
      RefCountedPtr<SubchannelInterface> subchannel);
  void StartSubchannelCacheTimerLocked();
  void OnSubchannelCacheTimerLocked();

  // Configurations for the policy.
  RefCountedPtr<GrpcLbConfig> config_;

  // Current channel args from the resolver.
  ChannelArgs args_;

  // Internal state.
  bool shutting_down_ = false;

  // The channel for communicating with the LB server.
  RefCountedPtr<Channel> lb_channel_;
  // Connectivity watcher on lb_channel_ (see StateWatcher above).
  StateWatcher* watcher_ = nullptr;
  // Response generator to inject address updates into lb_channel_.
  RefCountedPtr<FakeResolverResponseGenerator> response_generator_;
  // Parent channelz node.
  RefCountedPtr<channelz::ChannelNode> parent_channelz_node_;

  // The data associated with the current LB call. It holds a ref to this LB
  // policy. It's initialized every time we query for backends. It's reset to
  // NULL whenever the current LB call is no longer needed (e.g., the LB policy
  // is shutting down, or the LB call has ended). A non-NULL lb_calld_ always
  // contains a non-NULL lb_call_.
  OrphanablePtr<BalancerCallState> lb_calld_;
  // Timeout for the LB call. 0 means no deadline.
  const Duration lb_call_timeout_;
  // Balancer call retry state.
  BackOff lb_call_backoff_;
  absl::optional<EventEngine::TaskHandle> lb_call_retry_timer_handle_;

  // The deserialized response from the balancer. May be nullptr until one
  // such response has arrived.
  RefCountedPtr<Serverlist> serverlist_;

  // Whether we're in fallback mode.
  bool fallback_mode_ = false;
  // The backend addresses from the resolver.
  absl::StatusOr<std::shared_ptr<NullLbTokenEndpointIterator>>
      fallback_backend_addresses_;
  // The last resolution note from our parent.
  // To be passed to child policy when fallback_backend_addresses_ is empty.
  std::string resolution_note_;
  // State for fallback-at-startup checks.
  // Timeout after startup after which we will go into fallback mode if
  // we have not received a serverlist from the balancer.
  const Duration fallback_at_startup_timeout_;
  bool fallback_at_startup_checks_pending_ = false;
  absl::optional<EventEngine::TaskHandle> lb_fallback_timer_handle_;

  // The child policy to use for the backends.
  OrphanablePtr<LoadBalancingPolicy> child_policy_;
  // Child policy in state READY.
  bool child_policy_ready_ = false;

  // Deleted subchannel caching.
  const Duration subchannel_cache_interval_;
  std::map<Timestamp /*deletion time*/,
           std::vector<RefCountedPtr<SubchannelInterface>>>
      cached_subchannels_;
  absl::optional<EventEngine::TaskHandle> subchannel_cache_timer_handle_;
};
606
607 //
608 // GrpcLb::Serverlist::AddressIterator
609 //
610
IsServerValid(const GrpcLbServer & server,size_t idx,bool log)611 bool IsServerValid(const GrpcLbServer& server, size_t idx, bool log) {
612 if (server.drop) return false;
613 if (GPR_UNLIKELY(server.port >> 16 != 0)) {
614 if (log) {
615 LOG(ERROR) << "Invalid port '" << server.port << "' at index " << idx
616 << " of serverlist. Ignoring.";
617 }
618 return false;
619 }
620 if (GPR_UNLIKELY(server.ip_size != 4 && server.ip_size != 16)) {
621 if (log) {
622 LOG(ERROR) << "Expected IP to be 4 or 16 bytes, got " << server.ip_size
623 << " at index " << idx << " of serverlist. Ignoring";
624 }
625 return false;
626 }
627 return true;
628 }
629
ParseServer(const GrpcLbServer & server,grpc_resolved_address * addr)630 void ParseServer(const GrpcLbServer& server, grpc_resolved_address* addr) {
631 memset(addr, 0, sizeof(*addr));
632 if (server.drop) return;
633 const uint16_t netorder_port = grpc_htons(static_cast<uint16_t>(server.port));
634 // the addresses are given in binary format (a in(6)_addr struct) in
635 // server->ip_address.bytes.
636 if (server.ip_size == 4) {
637 addr->len = static_cast<socklen_t>(sizeof(grpc_sockaddr_in));
638 grpc_sockaddr_in* addr4 = reinterpret_cast<grpc_sockaddr_in*>(&addr->addr);
639 addr4->sin_family = GRPC_AF_INET;
640 memcpy(&addr4->sin_addr, server.ip_addr, server.ip_size);
641 addr4->sin_port = netorder_port;
642 } else if (server.ip_size == 16) {
643 addr->len = static_cast<socklen_t>(sizeof(grpc_sockaddr_in6));
644 grpc_sockaddr_in6* addr6 =
645 reinterpret_cast<grpc_sockaddr_in6*>(&addr->addr);
646 addr6->sin6_family = GRPC_AF_INET6;
647 memcpy(&addr6->sin6_addr, server.ip_addr, server.ip_size);
648 addr6->sin6_port = netorder_port;
649 }
650 }
651
652 class GrpcLb::Serverlist::AddressIterator final
653 : public EndpointAddressesIterator {
654 public:
AddressIterator(RefCountedPtr<Serverlist> serverlist,RefCountedPtr<GrpcLbClientStats> client_stats)655 AddressIterator(RefCountedPtr<Serverlist> serverlist,
656 RefCountedPtr<GrpcLbClientStats> client_stats)
657 : serverlist_(std::move(serverlist)),
658 client_stats_(std::move(client_stats)) {}
659
ForEach(absl::FunctionRef<void (const EndpointAddresses &)> callback) const660 void ForEach(absl::FunctionRef<void(const EndpointAddresses&)> callback)
661 const override {
662 for (size_t i = 0; i < serverlist_->serverlist_.size(); ++i) {
663 const GrpcLbServer& server = serverlist_->serverlist_[i];
664 if (!IsServerValid(server, i, false)) continue;
665 // Address processing.
666 grpc_resolved_address addr;
667 ParseServer(server, &addr);
668 // LB token processing.
669 const size_t lb_token_length = strnlen(
670 server.load_balance_token, GPR_ARRAY_SIZE(server.load_balance_token));
671 auto lb_token = grpc_event_engine::experimental::Slice::FromCopiedBuffer(
672 server.load_balance_token, lb_token_length);
673 if (lb_token.empty()) {
674 auto addr_uri = grpc_sockaddr_to_uri(&addr);
675 GRPC_TRACE_LOG(glb, INFO)
676 << "Missing LB token for backend address '"
677 << (addr_uri.ok() ? *addr_uri : addr_uri.status().ToString())
678 << "'. The empty token will be used instead";
679 }
680 // Return address with a channel arg containing LB token and stats object.
681 callback(EndpointAddresses(
682 addr, ChannelArgs().SetObject(MakeRefCounted<TokenAndClientStatsArg>(
683 std::move(lb_token), client_stats_))));
684 }
685 }
686
687 private:
688 RefCountedPtr<Serverlist> serverlist_;
689 RefCountedPtr<GrpcLbClientStats> client_stats_;
690 };
691
692 //
693 // GrpcLb::Serverlist
694 //
695
operator ==(const Serverlist & other) const696 bool GrpcLb::Serverlist::operator==(const Serverlist& other) const {
697 return serverlist_ == other.serverlist_;
698 }
699
AsText() const700 std::string GrpcLb::Serverlist::AsText() const {
701 std::vector<std::string> entries;
702 for (size_t i = 0; i < serverlist_.size(); ++i) {
703 const GrpcLbServer& server = serverlist_[i];
704 std::string ipport;
705 if (server.drop) {
706 ipport = "(drop)";
707 } else {
708 grpc_resolved_address addr;
709 ParseServer(server, &addr);
710 auto addr_str = grpc_sockaddr_to_string(&addr, false);
711 ipport = addr_str.ok() ? *addr_str : addr_str.status().ToString();
712 }
713 entries.push_back(absl::StrFormat(" %" PRIuPTR ": %s token=%s\n", i,
714 ipport, server.load_balance_token));
715 }
716 return absl::StrJoin(entries, "");
717 }
718
719 // Returns addresses extracted from the serverlist.
720 std::shared_ptr<EndpointAddressesIterator>
GetServerAddressList(GrpcLbClientStats * client_stats)721 GrpcLb::Serverlist::GetServerAddressList(GrpcLbClientStats* client_stats) {
722 RefCountedPtr<GrpcLbClientStats> stats;
723 if (client_stats != nullptr) stats = client_stats->Ref();
724 return std::make_shared<AddressIterator>(Ref(), std::move(stats));
725 }
726
ContainsAllDropEntries() const727 bool GrpcLb::Serverlist::ContainsAllDropEntries() const {
728 if (serverlist_.empty()) return false;
729 for (const GrpcLbServer& server : serverlist_) {
730 if (!server.drop) return false;
731 }
732 return true;
733 }
734
ShouldDrop()735 const char* GrpcLb::Serverlist::ShouldDrop() {
736 if (serverlist_.empty()) return nullptr;
737 size_t index = drop_index_.fetch_add(1, std::memory_order_relaxed);
738 GrpcLbServer& server = serverlist_[index % serverlist_.size()];
739 return server.drop ? server.load_balance_token : nullptr;
740 }
741
742 //
743 // GrpcLb::Picker
744 //
745
// Performs a pick: first applies balancer-directed drops, then delegates
// to the child picker, attaching the client-stats pointer and LB token
// to the metadata of a successful pick.  Runs in the data plane, outside
// the control-plane work_serializer.
GrpcLb::PickResult GrpcLb::Picker::Pick(PickArgs args) {
  // Check if we should drop the call.
  const char* drop_token =
      serverlist_ == nullptr ? nullptr : serverlist_->ShouldDrop();
  if (drop_token != nullptr) {
    // Update client load reporting stats to indicate the number of
    // dropped calls.  Note that we have to do this here instead of in
    // the client_load_reporting filter, because we do not create a
    // subchannel call (and therefore no client_load_reporting filter)
    // for dropped calls.
    if (client_stats_ != nullptr) {
      client_stats_->AddCallDropped(drop_token);
    }
    return PickResult::Drop(
        absl::UnavailableError("drop directed by grpclb balancer"));
  }
  // Forward pick to child policy.
  PickResult result = child_picker_->Pick(args);
  // If pick succeeded, add LB token to initial metadata.
  auto* complete_pick = absl::get_if<PickResult::Complete>(&result.result);
  if (complete_pick != nullptr) {
    const SubchannelWrapper* subchannel_wrapper =
        static_cast<SubchannelWrapper*>(complete_pick->subchannel.get());
    // Encode client stats object into metadata for use by
    // client_load_reporting filter.
    GrpcLbClientStats* client_stats = subchannel_wrapper->client_stats();
    if (client_stats != nullptr) {
      // The tracker holds a ref that it releases when Start() is called;
      // if the call never starts, the tracker's dtor drops the ref.
      complete_pick->subchannel_call_tracker =
          std::make_unique<SubchannelCallTracker>(
              client_stats->Ref(),
              std::move(complete_pick->subchannel_call_tracker));
      // The metadata value is a hack: we pretend the pointer points to
      // a string and rely on the client_load_reporting filter to know
      // how to interpret it.
      // NOLINTBEGIN(bugprone-string-constructor)
      complete_pick->metadata_mutations.Set(
          GrpcLbClientStatsMetadata::key(),
          grpc_event_engine::experimental::Slice(grpc_slice_from_static_buffer(
              reinterpret_cast<const char*>(client_stats), 0)));
      // NOLINTEND(bugprone-string-constructor)
      // Update calls-started.
      client_stats->AddCallStarted();
    }
    // Encode the LB token in metadata.
    // Create a new copy on the call arena, since the subchannel list
    // may get refreshed between when we return this pick and when the
    // initial metadata goes out on the wire.
    if (!subchannel_wrapper->lb_token().empty()) {
      complete_pick->metadata_mutations.Set(
          LbTokenMetadata::key(), subchannel_wrapper->lb_token().Ref());
    }
    // Unwrap subchannel to pass up to the channel.
    complete_pick->subchannel = subchannel_wrapper->wrapped_subchannel();
  }
  return result;
}
802
803 //
804 // GrpcLb::Helper
805 //
806
CreateSubchannel(const grpc_resolved_address & address,const ChannelArgs & per_address_args,const ChannelArgs & args)807 RefCountedPtr<SubchannelInterface> GrpcLb::Helper::CreateSubchannel(
808 const grpc_resolved_address& address, const ChannelArgs& per_address_args,
809 const ChannelArgs& args) {
810 if (parent()->shutting_down_) return nullptr;
811 const auto* arg = per_address_args.GetObject<TokenAndClientStatsArg>();
812 if (arg == nullptr) {
813 auto addr_str = grpc_sockaddr_to_string(&address, false);
814 Crash(
815 absl::StrFormat("[grpclb %p] no TokenAndClientStatsArg for address %s",
816 parent(), addr_str.value_or("N/A").c_str()));
817 }
818 return MakeRefCounted<SubchannelWrapper>(
819 parent()->channel_control_helper()->CreateSubchannel(
820 address, per_address_args, args),
821 parent()->RefAsSubclass<GrpcLb>(DEBUG_LOCATION, "SubchannelWrapper"),
822 arg->lb_token().Ref(), arg->client_stats());
823 }
824
// Called by the child policy to report its connectivity state.  Wraps the
// child's picker in a grpclb Picker (which adds drop handling and
// client-side load reporting) before passing the state up to the channel.
void GrpcLb::Helper::UpdateState(grpc_connectivity_state state,
                                 const absl::Status& status,
                                 RefCountedPtr<SubchannelPicker> picker) {
  if (parent()->shutting_down_) return;
  // Record whether child policy reports READY.
  parent()->child_policy_ready_ = state == GRPC_CHANNEL_READY;
  // Enter fallback mode if needed.
  parent()->MaybeEnterFallbackModeAfterStartup();
  // We pass the serverlist to the picker so that it can handle drops.
  // However, we don't want to handle drops in the case where the child
  // policy is reporting a state other than READY (unless we are
  // dropping *all* calls), because we don't want to process drops for picks
  // that yield a QUEUE result; this would result in dropping too many calls,
  // since we will see the queued picks multiple times, and we'd consider each
  // one a separate call for the drop calculation. So in this case, we pass
  // a null serverlist to the picker, which tells it not to do drops.
  RefCountedPtr<Serverlist> serverlist;
  if (state == GRPC_CHANNEL_READY ||
      (parent()->serverlist_ != nullptr &&
       parent()->serverlist_->ContainsAllDropEntries())) {
    serverlist = parent()->serverlist_;
  }
  // Pass along the client stats object from the active balancer call (if
  // any) so that the picker can record per-pick stats for load reporting.
  RefCountedPtr<GrpcLbClientStats> client_stats;
  if (parent()->lb_calld_ != nullptr &&
      parent()->lb_calld_->client_stats() != nullptr) {
    client_stats = parent()->lb_calld_->client_stats()->Ref();
  }
  GRPC_TRACE_LOG(glb, INFO)
      << "[grpclb " << parent() << " helper " << this
      << "] state=" << ConnectivityStateName(state) << " (" << status.ToString()
      << ") wrapping child picker " << picker.get()
      << " (serverlist=" << serverlist.get()
      << ", client_stats=" << client_stats.get() << ")";
  parent()->channel_control_helper()->UpdateState(
      state, status,
      MakeRefCounted<Picker>(std::move(serverlist), std::move(picker),
                             std::move(client_stats)));
}
863
RequestReresolution()864 void GrpcLb::Helper::RequestReresolution() {
865 if (parent()->shutting_down_) return;
866 // Ignore if we're not in fallback mode, because if we got the backend
867 // addresses from the balancer, re-resolving is not going to fix it.
868 if (!parent()->fallback_mode_) return;
869 parent()->channel_control_helper()->RequestReresolution();
870 }
871
872 //
873 // GrpcLb::BalancerCallState
874 //
875
// Creates (but does not start) the streaming BalanceLoad call to the
// balancer and builds the initial request payload.  StartQuery() must be
// called to actually start the call.
GrpcLb::BalancerCallState::BalancerCallState(
    RefCountedPtr<LoadBalancingPolicy> parent_grpclb_policy)
    : InternallyRefCounted<BalancerCallState>(
          GRPC_TRACE_FLAG_ENABLED(glb) ? "BalancerCallState" : nullptr),
      grpclb_policy_(std::move(parent_grpclb_policy)) {
  CHECK(grpclb_policy_ != nullptr);
  CHECK(!grpclb_policy()->shutting_down_);
  // Init the LB call. Note that the LB call will progress every time there's
  // activity in grpclb_policy_->interested_parties(), which is comprised of
  // the polling entities from client_channel.
  GRPC_CLOSURE_INIT(&lb_on_initial_request_sent_, OnInitialRequestSent, this,
                    grpc_schedule_on_exec_ctx);
  GRPC_CLOSURE_INIT(&lb_on_balancer_message_received_,
                    OnBalancerMessageReceived, this, grpc_schedule_on_exec_ctx);
  GRPC_CLOSURE_INIT(&lb_on_balancer_status_received_, OnBalancerStatusReceived,
                    this, grpc_schedule_on_exec_ctx);
  GRPC_CLOSURE_INIT(&client_load_report_done_closure_, ClientLoadReportDone,
                    this, grpc_schedule_on_exec_ctx);
  // A zero configured timeout means the LB call gets no deadline.
  const Timestamp deadline =
      grpclb_policy()->lb_call_timeout_ == Duration::Zero()
          ? Timestamp::InfFuture()
          : Timestamp::Now() + grpclb_policy()->lb_call_timeout_;
  lb_call_ = grpclb_policy()->lb_channel_->CreateCall(
      /*parent_call=*/nullptr, GRPC_PROPAGATE_DEFAULTS,
      /*cq=*/nullptr, grpclb_policy_->interested_parties(),
      Slice::FromStaticString("/grpc.lb.v1.LoadBalancer/BalanceLoad"),
      /*authority=*/absl::nullopt, deadline, /*registered_method=*/true);
  // Init the LB call request payload.  The request names the service,
  // falling back to the channel authority when the config does not specify
  // a service name.
  upb::Arena arena;
  grpc_slice request_payload_slice = GrpcLbRequestCreate(
      grpclb_policy()->config_->service_name().empty()
          ? grpclb_policy()->channel_control_helper()->GetAuthority()
          : grpclb_policy()->config_->service_name(),
      arena.ptr());
  send_message_payload_ =
      grpc_raw_byte_buffer_create(&request_payload_slice, 1);
  CSliceUnref(request_payload_slice);
  // Init other data associated with the LB call.
  grpc_metadata_array_init(&lb_initial_metadata_recv_);
  grpc_metadata_array_init(&lb_trailing_metadata_recv_);
}
917
// Releases all resources owned by the call state.  Runs once the last ref
// is dropped (the initial ref is released in OnBalancerStatusReceivedLocked,
// i.e. after the call has fully terminated).
GrpcLb::BalancerCallState::~BalancerCallState() {
  CHECK_NE(lb_call_, nullptr);
  grpc_call_unref(lb_call_);
  grpc_metadata_array_destroy(&lb_initial_metadata_recv_);
  grpc_metadata_array_destroy(&lb_trailing_metadata_recv_);
  // NOTE(review): the payloads/status slice may never have been set if the
  // corresponding ops did not run; presumably these destroy/unref helpers
  // tolerate null/empty inputs — confirm against the C-core API.
  grpc_byte_buffer_destroy(send_message_payload_);
  grpc_byte_buffer_destroy(recv_message_payload_);
  CSliceUnref(lb_call_status_details_);
}
927
// Cancels the LB call when the policy no longer wants this call state.
void GrpcLb::BalancerCallState::Orphan() {
  CHECK_NE(lb_call_, nullptr);
  // If we are here because grpclb_policy wants to cancel the call,
  // lb_on_balancer_status_received_ will complete the cancellation and clean
  // up. Otherwise, we are here because grpclb_policy has to orphan a failed
  // call, then the following cancellation will be a no-op.
  grpc_call_cancel_internal(lb_call_);
  // If a client load report timer is pending and we manage to cancel it,
  // the timer callback will never run, so we must release the ref that was
  // taken on its behalf.
  if (client_load_report_handle_.has_value() &&
      grpclb_policy()->channel_control_helper()->GetEventEngine()->Cancel(
          client_load_report_handle_.value())) {
    Unref(DEBUG_LOCATION, "client_load_report cancelled");
  }
  // Note that the initial ref is held by lb_on_balancer_status_received_
  // instead of the caller of this function. So the corresponding unref happens
  // in lb_on_balancer_status_received_ instead of here.
}
944
// Starts the LB call created in the constructor: sends the initial request
// and registers callbacks for receiving initial metadata, response
// messages, and final status.  Each of the three batches below carries its
// own ref on this object (the last one reuses the initial ref).
void GrpcLb::BalancerCallState::StartQuery() {
  CHECK_NE(lb_call_, nullptr);
  GRPC_TRACE_LOG(glb, INFO)
      << "[grpclb " << grpclb_policy_.get() << "] lb_calld=" << this
      << ": Starting LB call " << lb_call_;
  // Create the ops.
  grpc_call_error call_error;
  grpc_op ops[3];
  memset(ops, 0, sizeof(ops));
  // Op: send initial metadata.
  grpc_op* op = ops;
  op->op = GRPC_OP_SEND_INITIAL_METADATA;
  op->data.send_initial_metadata.count = 0;
  // Wait-for-ready so the LB call waits for the balancer channel to become
  // connected instead of failing immediately.
  op->flags = GRPC_INITIAL_METADATA_WAIT_FOR_READY |
              GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET;
  op->reserved = nullptr;
  op++;
  // Op: send request message.
  CHECK_NE(send_message_payload_, nullptr);
  op->op = GRPC_OP_SEND_MESSAGE;
  op->data.send_message.send_message = send_message_payload_;
  op->flags = 0;
  op->reserved = nullptr;
  op++;
  // TODO(roth): We currently track this ref manually. Once the
  // ClosureRef API is ready, we should pass the RefCountedPtr<> along
  // with the callback.
  auto self = Ref(DEBUG_LOCATION, "on_initial_request_sent");
  self.release();
  call_error = grpc_call_start_batch_and_execute(lb_call_, ops,
                                                 static_cast<size_t>(op - ops),
                                                 &lb_on_initial_request_sent_);
  CHECK_EQ(call_error, GRPC_CALL_OK);
  // Op: recv initial metadata.
  op = ops;
  op->op = GRPC_OP_RECV_INITIAL_METADATA;
  op->data.recv_initial_metadata.recv_initial_metadata =
      &lb_initial_metadata_recv_;
  op->flags = 0;
  op->reserved = nullptr;
  op++;
  // Op: recv response.
  op->op = GRPC_OP_RECV_MESSAGE;
  op->data.recv_message.recv_message = &recv_message_payload_;
  op->flags = 0;
  op->reserved = nullptr;
  op++;
  // TODO(roth): We currently track this ref manually. Once the
  // ClosureRef API is ready, we should pass the RefCountedPtr<> along
  // with the callback.
  self = Ref(DEBUG_LOCATION, "on_message_received");
  self.release();
  call_error = grpc_call_start_batch_and_execute(
      lb_call_, ops, static_cast<size_t>(op - ops),
      &lb_on_balancer_message_received_);
  CHECK_EQ(call_error, GRPC_CALL_OK);
  // Op: recv server status.
  op = ops;
  op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
  op->data.recv_status_on_client.trailing_metadata =
      &lb_trailing_metadata_recv_;
  op->data.recv_status_on_client.status = &lb_call_status_;
  op->data.recv_status_on_client.status_details = &lb_call_status_details_;
  op->flags = 0;
  op->reserved = nullptr;
  op++;
  // This callback signals the end of the LB call, so it relies on the initial
  // ref instead of a new ref. When it's invoked, it's the initial ref that is
  // unreffed.
  call_error = grpc_call_start_batch_and_execute(
      lb_call_, ops, static_cast<size_t>(op - ops),
      &lb_on_balancer_status_received_);
  CHECK_EQ(call_error, GRPC_CALL_OK);
}
1019
// Arms the EventEngine timer that triggers the next client load report
// after client_stats_report_interval_.  The callback hops onto the policy's
// work serializer before touching any shared state.
void GrpcLb::BalancerCallState::ScheduleNextClientLoadReportLocked() {
  client_load_report_handle_ =
      grpclb_policy()->channel_control_helper()->GetEventEngine()->RunAfter(
          client_stats_report_interval_, [this] {
            // The EventEngine thread has no ExecCtx, so create one for the
            // duration of this callback.
            ApplicationCallbackExecCtx callback_exec_ctx;
            ExecCtx exec_ctx;
            grpclb_policy()->work_serializer()->Run(
                [this] { MaybeSendClientLoadReportLocked(); }, DEBUG_LOCATION);
          });
}
1030
MaybeSendClientLoadReportLocked()1031 void GrpcLb::BalancerCallState::MaybeSendClientLoadReportLocked() {
1032 client_load_report_handle_.reset();
1033 if (this != grpclb_policy()->lb_calld_.get()) {
1034 Unref(DEBUG_LOCATION, "client_load_report");
1035 return;
1036 }
1037 // If we've already sent the initial request, then we can go ahead and send
1038 // the load report. Otherwise, we need to wait until the initial request has
1039 // been sent to send this (see OnInitialRequestSentLocked()).
1040 if (send_message_payload_ == nullptr) {
1041 SendClientLoadReportLocked();
1042 } else {
1043 client_load_report_is_due_ = true;
1044 }
1045 }
1046
// Builds and sends one client load report on the LB call.  Must not be
// called while another send is in flight (send_message_payload_ must be
// null on entry).
void GrpcLb::BalancerCallState::SendClientLoadReportLocked() {
  // Construct message payload.
  CHECK_EQ(send_message_payload_, nullptr);
  // Get snapshot of stats.
  int64_t num_calls_started;
  int64_t num_calls_finished;
  int64_t num_calls_finished_with_client_failed_to_send;
  int64_t num_calls_finished_known_received;
  std::unique_ptr<GrpcLbClientStats::DroppedCallCounts> drop_token_counts;
  client_stats_->Get(&num_calls_started, &num_calls_finished,
                     &num_calls_finished_with_client_failed_to_send,
                     &num_calls_finished_known_received, &drop_token_counts);
  // Skip client load report if the counters were all zero in the last
  // report and they are still zero in this one.
  if (num_calls_started == 0 && num_calls_finished == 0 &&
      num_calls_finished_with_client_failed_to_send == 0 &&
      num_calls_finished_known_received == 0 &&
      (drop_token_counts == nullptr || drop_token_counts->empty())) {
    if (last_client_load_report_counters_were_zero_) {
      // Second consecutive all-zero report: skip this send entirely and
      // just arm the timer for the next interval.
      ScheduleNextClientLoadReportLocked();
      return;
    }
    last_client_load_report_counters_were_zero_ = true;
  } else {
    last_client_load_report_counters_were_zero_ = false;
  }
  // Populate load report.
  upb::Arena arena;
  grpc_slice request_payload_slice = GrpcLbLoadReportRequestCreate(
      num_calls_started, num_calls_finished,
      num_calls_finished_with_client_failed_to_send,
      num_calls_finished_known_received, drop_token_counts.get(), arena.ptr());
  send_message_payload_ =
      grpc_raw_byte_buffer_create(&request_payload_slice, 1);
  CSliceUnref(request_payload_slice);
  // Send the report.  ClientLoadReportDone frees the payload and schedules
  // the next report.
  grpc_op op;
  memset(&op, 0, sizeof(op));
  op.op = GRPC_OP_SEND_MESSAGE;
  op.data.send_message.send_message = send_message_payload_;
  grpc_call_error call_error = grpc_call_start_batch_and_execute(
      lb_call_, &op, 1, &client_load_report_done_closure_);
  if (GPR_UNLIKELY(call_error != GRPC_CALL_OK)) {
    LOG(ERROR) << "[grpclb " << grpclb_policy_.get() << "] lb_calld=" << this
               << " call_error=" << call_error << " sending client load report";
    // Intentionally crash after logging: a failed batch here is a bug.
    CHECK_EQ(call_error, GRPC_CALL_OK);
  }
}
1095
ClientLoadReportDone(void * arg,grpc_error_handle error)1096 void GrpcLb::BalancerCallState::ClientLoadReportDone(void* arg,
1097 grpc_error_handle error) {
1098 BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
1099 lb_calld->grpclb_policy()->work_serializer()->Run(
1100 [lb_calld, error]() { lb_calld->ClientLoadReportDoneLocked(error); },
1101 DEBUG_LOCATION);
1102 }
1103
ClientLoadReportDoneLocked(grpc_error_handle error)1104 void GrpcLb::BalancerCallState::ClientLoadReportDoneLocked(
1105 grpc_error_handle error) {
1106 grpc_byte_buffer_destroy(send_message_payload_);
1107 send_message_payload_ = nullptr;
1108 if (!error.ok() || this != grpclb_policy()->lb_calld_.get()) {
1109 Unref(DEBUG_LOCATION, "client_load_report");
1110 return;
1111 }
1112 ScheduleNextClientLoadReportLocked();
1113 }
1114
OnInitialRequestSent(void * arg,grpc_error_handle)1115 void GrpcLb::BalancerCallState::OnInitialRequestSent(
1116 void* arg, grpc_error_handle /*error*/) {
1117 BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
1118 lb_calld->grpclb_policy()->work_serializer()->Run(
1119 [lb_calld]() { lb_calld->OnInitialRequestSentLocked(); }, DEBUG_LOCATION);
1120 }
1121
OnInitialRequestSentLocked()1122 void GrpcLb::BalancerCallState::OnInitialRequestSentLocked() {
1123 grpc_byte_buffer_destroy(send_message_payload_);
1124 send_message_payload_ = nullptr;
1125 // If we attempted to send a client load report before the initial request was
1126 // sent (and this lb_calld is still in use), send the load report now.
1127 if (client_load_report_is_due_ && this == grpclb_policy()->lb_calld_.get()) {
1128 SendClientLoadReportLocked();
1129 client_load_report_is_due_ = false;
1130 }
1131 Unref(DEBUG_LOCATION, "on_initial_request_sent");
1132 }
1133
OnBalancerMessageReceived(void * arg,grpc_error_handle)1134 void GrpcLb::BalancerCallState::OnBalancerMessageReceived(
1135 void* arg, grpc_error_handle /*error*/) {
1136 BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
1137 lb_calld->grpclb_policy()->work_serializer()->Run(
1138 [lb_calld]() { lb_calld->OnBalancerMessageReceivedLocked(); },
1139 DEBUG_LOCATION);
1140 }
1141
// Processes one message from the balancer — the initial response, a
// serverlist, or a fallback directive — then re-arms the recv-message op to
// keep listening for updates.
void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked() {
  // Null payload means the LB call was cancelled.
  if (this != grpclb_policy()->lb_calld_.get() ||
      recv_message_payload_ == nullptr) {
    Unref(DEBUG_LOCATION, "on_message_received");
    return;
  }
  // Read the entire received message into a single slice and release the
  // byte buffer so the next recv op can reuse recv_message_payload_.
  grpc_byte_buffer_reader bbr;
  grpc_byte_buffer_reader_init(&bbr, recv_message_payload_);
  grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
  grpc_byte_buffer_reader_destroy(&bbr);
  grpc_byte_buffer_destroy(recv_message_payload_);
  recv_message_payload_ = nullptr;
  GrpcLbResponse response;
  upb::Arena arena;
  // A second INITIAL response is a protocol violation, treated the same as
  // an unparseable message: log and ignore.
  if (!GrpcLbResponseParse(response_slice, arena.ptr(), &response) ||
      (response.type == response.INITIAL && seen_initial_response_)) {
    if (absl::MinLogLevel() <= absl::LogSeverityAtLeast::kError) {
      char* response_slice_str =
          grpc_dump_slice(response_slice, GPR_DUMP_ASCII | GPR_DUMP_HEX);
      LOG(ERROR) << "[grpclb " << grpclb_policy() << "] lb_calld=" << this
                 << ": Invalid LB response received: '" << response_slice_str
                 << "'. Ignoring.";
      gpr_free(response_slice_str);
    }
  } else {
    switch (response.type) {
      case response.INITIAL: {
        if (response.client_stats_report_interval != Duration::Zero()) {
          // Enforce a minimum reporting interval of one second.
          client_stats_report_interval_ = std::max(
              Duration::Seconds(1), response.client_stats_report_interval);
          GRPC_TRACE_LOG(glb, INFO)
              << "[grpclb " << grpclb_policy() << "] lb_calld=" << this
              << ": Received initial LB response message; client load "
                 "reporting interval = "
              << client_stats_report_interval_.millis() << " milliseconds";
        } else {
          GRPC_TRACE_LOG(glb, INFO)
              << "[grpclb " << grpclb_policy() << "] lb_calld=" << this
              << ": Received initial LB response message; client load "
                 "reporting NOT enabled";
        }
        seen_initial_response_ = true;
        break;
      }
      case response.SERVERLIST: {
        CHECK_NE(lb_call_, nullptr);
        auto serverlist_wrapper =
            MakeRefCounted<Serverlist>(std::move(response.serverlist));
        GRPC_TRACE_LOG(glb, INFO)
            << "[grpclb " << grpclb_policy() << "] lb_calld=" << this
            << ": Serverlist with " << serverlist_wrapper->serverlist().size()
            << " servers received:\n"
            << serverlist_wrapper->AsText();
        seen_serverlist_ = true;
        // Start sending client load report only after we start using the
        // serverlist returned from the current LB call.
        if (client_stats_report_interval_ > Duration::Zero() &&
            client_stats_ == nullptr) {
          client_stats_ = MakeRefCounted<GrpcLbClientStats>();
          // Ref held by callback.
          Ref(DEBUG_LOCATION, "client_load_report").release();
          ScheduleNextClientLoadReportLocked();
        }
        // Check if the serverlist differs from the previous one.
        if (grpclb_policy()->serverlist_ != nullptr &&
            *grpclb_policy()->serverlist_ == *serverlist_wrapper) {
          GRPC_TRACE_LOG(glb, INFO)
              << "[grpclb " << grpclb_policy() << "] lb_calld=" << this
              << ": Incoming server list identical to current, "
                 "ignoring.";
        } else {  // New serverlist.
          // Dispose of the fallback.
          // TODO(roth): Ideally, we should stay in fallback mode until we
          // know that we can reach at least one of the backends in the new
          // serverlist. Unfortunately, we can't do that, since we need to
          // send the new addresses to the child policy in order to determine
          // if they are reachable, and if we don't exit fallback mode now,
          // CreateOrUpdateChildPolicyLocked() will use the fallback
          // addresses instead of the addresses from the new serverlist.
          // However, if we can't reach any of the servers in the new
          // serverlist, then the child policy will never switch away from
          // the fallback addresses, but the grpclb policy will still think
          // that we're not in fallback mode, which means that we won't send
          // updates to the child policy when the fallback addresses are
          // updated by the resolver. This is sub-optimal, but the only way
          // to fix it is to maintain a completely separate child policy for
          // fallback mode, and that's more work than we want to put into
          // the grpclb implementation at this point, since we're deprecating
          // it in favor of the xds policy. We will implement this the
          // right way in the xds policy instead.
          if (grpclb_policy()->fallback_mode_) {
            LOG(INFO) << "[grpclb " << grpclb_policy()
                      << "] Received response from balancer; exiting fallback "
                         "mode";
            grpclb_policy()->fallback_mode_ = false;
          }
          // A serverlist arrived before the fallback-at-startup timer fired:
          // cancel the timer and the connectivity watch.
          if (grpclb_policy()->fallback_at_startup_checks_pending_) {
            grpclb_policy()->fallback_at_startup_checks_pending_ = false;
            grpclb_policy()->channel_control_helper()->GetEventEngine()->Cancel(
                *grpclb_policy()->lb_fallback_timer_handle_);
            grpclb_policy()->CancelBalancerChannelConnectivityWatchLocked();
          }
          // Update the serverlist in the GrpcLb instance. This serverlist
          // instance will be destroyed either upon the next update or when the
          // GrpcLb instance is destroyed.
          grpclb_policy()->serverlist_ = std::move(serverlist_wrapper);
          grpclb_policy()->CreateOrUpdateChildPolicyLocked();
        }
        break;
      }
      case response.FALLBACK: {
        if (!grpclb_policy()->fallback_mode_) {
          LOG(INFO) << "[grpclb " << grpclb_policy()
                    << "] Entering fallback mode as requested by balancer";
          if (grpclb_policy()->fallback_at_startup_checks_pending_) {
            grpclb_policy()->fallback_at_startup_checks_pending_ = false;
            grpclb_policy()->channel_control_helper()->GetEventEngine()->Cancel(
                *grpclb_policy()->lb_fallback_timer_handle_);
            grpclb_policy()->CancelBalancerChannelConnectivityWatchLocked();
          }
          grpclb_policy()->fallback_mode_ = true;
          grpclb_policy()->CreateOrUpdateChildPolicyLocked();
          // Reset serverlist, so that if the balancer exits fallback
          // mode by sending the same serverlist we were previously
          // using, we don't incorrectly ignore it as a duplicate.
          grpclb_policy()->serverlist_.reset();
        }
        break;
      }
    }
  }
  CSliceUnref(response_slice);
  if (!grpclb_policy()->shutting_down_) {
    // Keep listening for serverlist updates.
    grpc_op op;
    memset(&op, 0, sizeof(op));
    op.op = GRPC_OP_RECV_MESSAGE;
    op.data.recv_message.recv_message = &recv_message_payload_;
    op.flags = 0;
    op.reserved = nullptr;
    // Reuse the "OnBalancerMessageReceivedLocked" ref taken in StartQuery().
    const grpc_call_error call_error = grpc_call_start_batch_and_execute(
        lb_call_, &op, 1, &lb_on_balancer_message_received_);
    CHECK_EQ(call_error, GRPC_CALL_OK);
  } else {
    Unref(DEBUG_LOCATION, "on_message_received+grpclb_shutdown");
  }
}
1291
OnBalancerStatusReceived(void * arg,grpc_error_handle error)1292 void GrpcLb::BalancerCallState::OnBalancerStatusReceived(
1293 void* arg, grpc_error_handle error) {
1294 BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
1295 lb_calld->grpclb_policy()->work_serializer()->Run(
1296 [lb_calld, error]() { lb_calld->OnBalancerStatusReceivedLocked(error); },
1297 DEBUG_LOCATION);
1298 }
1299
// Handles termination of the LB call.  If this is still the active call,
// the termination was a failure, so we enter fallback mode when appropriate
// and retry the call — immediately if we ever heard from the balancer,
// otherwise after a backoff delay.  Always releases the initial ref.
void GrpcLb::BalancerCallState::OnBalancerStatusReceivedLocked(
    grpc_error_handle error) {
  CHECK_NE(lb_call_, nullptr);
  if (GRPC_TRACE_FLAG_ENABLED(glb)) {
    char* status_details = grpc_slice_to_c_string(lb_call_status_details_);
    LOG(INFO) << "[grpclb " << grpclb_policy() << "] lb_calld=" << this
              << ": Status from LB server received. Status = "
              << lb_call_status_ << ", details = '" << status_details
              << "', (lb_call: " << lb_call_ << "), error '"
              << StatusToString(error) << "'";
    gpr_free(status_details);
  }
  // If this lb_calld is still in use, this call ended because of a failure so
  // we want to retry connecting. Otherwise, we have deliberately ended this
  // call and no further action is required.
  if (this == grpclb_policy()->lb_calld_.get()) {
    // If the fallback-at-startup checks are pending, go into fallback mode
    // immediately. This short-circuits the timeout for the fallback-at-startup
    // case.
    grpclb_policy()->lb_calld_.reset();
    if (grpclb_policy()->fallback_at_startup_checks_pending_) {
      // A serverlist was never received on this call (checked below), so
      // the startup checks cannot have been satisfied.
      CHECK(!seen_serverlist_);
      LOG(INFO) << "[grpclb " << grpclb_policy()
                << "] Balancer call finished without receiving serverlist; "
                   "entering fallback mode";
      grpclb_policy()->fallback_at_startup_checks_pending_ = false;
      grpclb_policy()->channel_control_helper()->GetEventEngine()->Cancel(
          *grpclb_policy()->lb_fallback_timer_handle_);
      grpclb_policy()->CancelBalancerChannelConnectivityWatchLocked();
      grpclb_policy()->fallback_mode_ = true;
      grpclb_policy()->CreateOrUpdateChildPolicyLocked();
    } else {
      // This handles the fallback-after-startup case.
      grpclb_policy()->MaybeEnterFallbackModeAfterStartup();
    }
    CHECK(!grpclb_policy()->shutting_down_);
    // Ask the resolver for fresh balancer addresses before reconnecting.
    grpclb_policy()->channel_control_helper()->RequestReresolution();
    if (seen_initial_response_) {
      // If we lose connection to the LB server, reset the backoff and restart
      // the LB call immediately.
      grpclb_policy()->lb_call_backoff_.Reset();
      grpclb_policy()->StartBalancerCallLocked();
    } else {
      // If this LB call fails establishing any connection to the LB server,
      // retry later.
      grpclb_policy()->StartBalancerCallRetryTimerLocked();
    }
  }
  Unref(DEBUG_LOCATION, "lb_call_ended");
}
1350
1351 //
1352 // helper code for creating balancer channel
1353 //
1354
ExtractBalancerAddresses(const ChannelArgs & args)1355 EndpointAddressesList ExtractBalancerAddresses(const ChannelArgs& args) {
1356 const EndpointAddressesList* endpoints =
1357 FindGrpclbBalancerAddressesInChannelArgs(args);
1358 if (endpoints != nullptr) return *endpoints;
1359 return EndpointAddressesList();
1360 }
1361
1362 // Returns the channel args for the LB channel, used to create a bidirectional
1363 // stream for the reception of load balancing updates.
1364 //
1365 // Inputs:
1366 // - \a response_generator: in order to propagate updates from the resolver
1367 // above the grpclb policy.
1368 // - \a args: other args inherited from the grpclb policy.
ChannelArgs BuildBalancerChannelArgs(
    FakeResolverResponseGenerator* response_generator,
    const ChannelArgs& args) {
  ChannelArgs grpclb_channel_args;
  // If the application explicitly supplied channel args for the LB channel
  // via GRPC_ARG_EXPERIMENTAL_GRPCLB_CHANNEL_ARGS, use those as the base
  // instead of inheriting (a filtered copy of) the parent channel's args.
  const grpc_channel_args* lb_channel_specific_args =
      args.GetPointer<grpc_channel_args>(
          GRPC_ARG_EXPERIMENTAL_GRPCLB_CHANNEL_ARGS);
  if (lb_channel_specific_args != nullptr) {
    grpclb_channel_args = ChannelArgs::FromC(lb_channel_specific_args);
  } else {
    // Set grpclb_channel_args based on the parent channel's channel args,
    // stripping each of the following:
    grpclb_channel_args =
        args
            // LB policy name, since we want to use the default (pick_first) in
            // the LB channel.
            .Remove(GRPC_ARG_LB_POLICY_NAME)
            // Strip out the service config, since we don't want the LB policy
            // config specified for the parent channel to affect the LB channel.
            .Remove(GRPC_ARG_SERVICE_CONFIG)
            // The fake resolver response generator, because we are replacing it
            // with the one from the grpclb policy, used to propagate updates to
            // the LB channel.
            .Remove(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR)
            // The LB channel should use the authority indicated by the target
            // authority table (see \a ModifyGrpclbBalancerChannelArgs),
            // as opposed to the authority from the parent channel.
            .Remove(GRPC_ARG_DEFAULT_AUTHORITY)
            // Just as for \a GRPC_ARG_DEFAULT_AUTHORITY, the LB channel should
            // be treated as a stand-alone channel and not inherit this argument
            // from the args of the parent channel.
            .Remove(GRPC_SSL_TARGET_NAME_OVERRIDE_ARG)
            // Don't want to pass down channelz node from parent; the balancer
            // channel will get its own.
            .Remove(GRPC_ARG_CHANNELZ_CHANNEL_NODE)
            // Remove the channel args for channel credentials and replace it
            // with a version that does not contain call credentials. The
            // loadbalancer is not necessarily trusted to handle bearer token
            // credentials.
            .Remove(GRPC_ARG_CHANNEL_CREDENTIALS);
  }
  // Args added regardless of which base was chosen above:
  return grpclb_channel_args
      // A channel arg indicating the target is a grpclb load balancer.
      .Set(GRPC_ARG_ADDRESS_IS_GRPCLB_LOAD_BALANCER, 1)
      // Tells channelz that this is an internal channel.
      .Set(GRPC_ARG_CHANNELZ_IS_INTERNAL_CHANNEL, 1)
      // The fake resolver response generator, which we use to inject
      // address updates into the LB channel.
      .SetObject(response_generator->Ref());
}
1418
1419 //
1420 // ctor and dtor
1421 //
1422
// Reads all grpclb-specific settings from the channel args.  Each duration
// is clamped to be non-negative, with a default applied where the arg is
// unset.
GrpcLb::GrpcLb(Args args)
    : LoadBalancingPolicy(std::move(args)),
      // Used to inject address updates into the LB channel's fake resolver.
      response_generator_(MakeRefCounted<FakeResolverResponseGenerator>()),
      // LB call timeout; zero (the default) means no deadline.
      lb_call_timeout_(std::max(
          Duration::Zero(),
          channel_args()
              .GetDurationFromIntMillis(GRPC_ARG_GRPCLB_CALL_TIMEOUT_MS)
              .value_or(Duration::Zero()))),
      // Exponential backoff for retrying the LB call.
      lb_call_backoff_(
          BackOff::Options()
              .set_initial_backoff(Duration::Seconds(
                  GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS))
              .set_multiplier(GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER)
              .set_jitter(GRPC_GRPCLB_RECONNECT_JITTER)
              .set_max_backoff(Duration::Seconds(
                  GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS))),
      // How long to wait for the balancer at startup before falling back to
      // the resolver-provided backend addresses.
      fallback_at_startup_timeout_(std::max(
          Duration::Zero(),
          channel_args()
              .GetDurationFromIntMillis(GRPC_ARG_GRPCLB_FALLBACK_TIMEOUT_MS)
              .value_or(Duration::Milliseconds(
                  GRPC_GRPCLB_DEFAULT_FALLBACK_TIMEOUT_MS)))),
      // How long unused subchannels are cached before being released.
      subchannel_cache_interval_(std::max(
          Duration::Zero(),
          channel_args()
              .GetDurationFromIntMillis(
                  GRPC_ARG_GRPCLB_SUBCHANNEL_CACHE_INTERVAL_MS)
              .value_or(Duration::Milliseconds(
                  GRPC_GRPCLB_DEFAULT_SUBCHANNEL_DELETION_DELAY_MS)))) {
  GRPC_TRACE_LOG(glb, INFO)
      << "[grpclb " << this << "] Will use '"
      << std::string(channel_control_helper()->GetAuthority())
      << "' as the server name for LB request.";
}
1457
// Tears down all state owned by the policy: the balancer call, pending
// timers, cached subchannels, the child policy, and finally the LB channel.
void GrpcLb::ShutdownLocked() {
  shutting_down_ = true;
  // Dropping lb_calld_ orphans the balancer call (see
  // BalancerCallState::Orphan()), which cancels it.
  lb_calld_.reset();
  if (subchannel_cache_timer_handle_.has_value()) {
    channel_control_helper()->GetEventEngine()->Cancel(
        *subchannel_cache_timer_handle_);
    subchannel_cache_timer_handle_.reset();
  }
  cached_subchannels_.clear();
  if (lb_call_retry_timer_handle_.has_value()) {
    channel_control_helper()->GetEventEngine()->Cancel(
        *lb_call_retry_timer_handle_);
  }
  if (fallback_at_startup_checks_pending_) {
    fallback_at_startup_checks_pending_ = false;
    channel_control_helper()->GetEventEngine()->Cancel(
        *lb_fallback_timer_handle_);
    CancelBalancerChannelConnectivityWatchLocked();
  }
  if (child_policy_ != nullptr) {
    // Detach the child's pollset set from ours before destroying it.
    grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(),
                                     interested_parties());
    child_policy_.reset();
  }
  // We destroy the LB channel here instead of in our destructor because
  // destroying the channel triggers a last callback to
  // OnBalancerChannelConnectivityChangedLocked(), and we need to be
  // alive when that callback is invoked.
  if (lb_channel_ != nullptr) {
    if (parent_channelz_node_ != nullptr) {
      // Unlink the balancer channel's channelz node from the parent channel.
      channelz::ChannelNode* child_channelz_node = lb_channel_->channelz_node();
      CHECK_NE(child_channelz_node, nullptr);
      parent_channelz_node_->RemoveChildChannel(child_channelz_node->uuid());
    }
    lb_channel_.reset();
  }
}
1495
1496 //
1497 // public methods
1498 //
1499
ResetBackoffLocked()1500 void GrpcLb::ResetBackoffLocked() {
1501 if (lb_channel_ != nullptr) {
1502 lb_channel_->ResetConnectionBackoff();
1503 }
1504 if (child_policy_ != nullptr) {
1505 child_policy_->ResetBackoffLocked();
1506 }
1507 }
1508
1509 // Endpoint iterator wrapper to add null LB token attribute.
1510 class GrpcLb::NullLbTokenEndpointIterator final
1511 : public EndpointAddressesIterator {
1512 public:
NullLbTokenEndpointIterator(std::shared_ptr<EndpointAddressesIterator> parent_it)1513 explicit NullLbTokenEndpointIterator(
1514 std::shared_ptr<EndpointAddressesIterator> parent_it)
1515 : parent_it_(std::move(parent_it)) {}
1516
ForEach(absl::FunctionRef<void (const EndpointAddresses &)> callback) const1517 void ForEach(absl::FunctionRef<void(const EndpointAddresses&)> callback)
1518 const override {
1519 parent_it_->ForEach([&](const EndpointAddresses& endpoint) {
1520 GRPC_TRACE_LOG(glb, INFO)
1521 << "[grpclb " << this
1522 << "] fallback address: " << endpoint.ToString();
1523 callback(EndpointAddresses(endpoint.addresses(),
1524 endpoint.args().SetObject(empty_token_)));
1525 });
1526 }
1527
1528 private:
1529 std::shared_ptr<EndpointAddressesIterator> parent_it_;
1530 RefCountedPtr<TokenAndClientStatsArg> empty_token_ =
1531 MakeRefCounted<TokenAndClientStatsArg>(
1532 grpc_event_engine::experimental::Slice(), nullptr);
1533 };
1534
// Handles a resolver update: stores the new config and fallback address
// list, pushes the balancer addresses to the LB channel, and — on the
// very first update — starts the fallback-at-startup machinery and the
// balancer call. Returns the status from updating the balancer channel.
absl::Status GrpcLb::UpdateLocked(UpdateArgs args) {
  GRPC_TRACE_LOG(glb, INFO) << "[grpclb " << this << "] received update";
  // No LB channel yet means this is the first update we've seen.
  const bool is_initial_update = lb_channel_ == nullptr;
  config_ = args.config.TakeAsSubclass<GrpcLbConfig>();
  CHECK(config_ != nullptr);
  args_ = std::move(args.args);
  // Update fallback address list.  Wrap the resolver's addresses so each
  // endpoint carries an empty LB token attribute.
  if (!args.addresses.ok()) {
    fallback_backend_addresses_ = args.addresses.status();
  } else {
    fallback_backend_addresses_ = std::make_shared<NullLbTokenEndpointIterator>(
        std::move(*args.addresses));
  }
  resolution_note_ = std::move(args.resolution_note);
  // Update balancer channel.
  absl::Status status = UpdateBalancerChannelLocked();
  // Update the existing child policy, if any.
  if (child_policy_ != nullptr) CreateOrUpdateChildPolicyLocked();
  // If this is the initial update, start the fallback-at-startup checks
  // and the balancer call.
  if (is_initial_update) {
    fallback_at_startup_checks_pending_ = true;
    // Start timer.
    lb_fallback_timer_handle_ =
        channel_control_helper()->GetEventEngine()->RunAfter(
            fallback_at_startup_timeout_,
            [self = RefAsSubclass<GrpcLb>(DEBUG_LOCATION,
                                          "on_fallback_timer")]() mutable {
              ApplicationCallbackExecCtx callback_exec_ctx;
              ExecCtx exec_ctx;
              // Hop back onto the work serializer before touching state.
              auto self_ptr = self.get();
              self_ptr->work_serializer()->Run(
                  [self = std::move(self)]() { self->OnFallbackTimerLocked(); },
                  DEBUG_LOCATION);
            });
    // Start watching the channel's connectivity state. If the channel
    // goes into state TRANSIENT_FAILURE before the timer fires, we go into
    // fallback mode even if the fallback timeout has not elapsed.
    watcher_ =
        new StateWatcher(RefAsSubclass<GrpcLb>(DEBUG_LOCATION, "StateWatcher"));
    lb_channel_->AddConnectivityWatcher(
        GRPC_CHANNEL_IDLE,
        OrphanablePtr<AsyncConnectivityStateWatcherInterface>(watcher_));
    // Start balancer call.
    StartBalancerCallLocked();
  }
  return status;
}
1583
1584 //
1585 // helpers for UpdateLocked()
1586 //
1587
// Extracts the balancer addresses from the current args, lazily creates
// the LB channel (with channelz linkage to the parent channel), and
// pushes the addresses to the channel through the fake resolver.
// Returns UNAVAILABLE if there are no balancer addresses.
absl::Status GrpcLb::UpdateBalancerChannelLocked() {
  // Get balancer addresses.
  EndpointAddressesList balancer_addresses = ExtractBalancerAddresses(args_);
  if (GRPC_TRACE_FLAG_ENABLED(glb)) {
    for (const auto& endpoint : balancer_addresses) {
      LOG(INFO) << "[grpclb " << this
                << "] balancer address: " << endpoint.ToString();
    }
  }
  absl::Status status;
  if (balancer_addresses.empty()) {
    status = absl::UnavailableError("balancer address list must be non-empty");
  }
  // Create channel credentials that do not contain call credentials.
  auto channel_credentials = channel_control_helper()->GetChannelCredentials();
  // Construct args for balancer channel.
  ChannelArgs lb_channel_args =
      BuildBalancerChannelArgs(response_generator_.get(), args_);
  // Create balancer channel if needed.
  if (lb_channel_ == nullptr) {
    // The fake resolver supplies the actual addresses; the URI only
    // carries the authority used as the server name.
    std::string uri_str =
        absl::StrCat("fake:///", channel_control_helper()->GetAuthority());
    lb_channel_.reset(Channel::FromC(
        grpc_channel_create(uri_str.c_str(), channel_credentials.get(),
                            lb_channel_args.ToC().get())));
    CHECK(lb_channel_ != nullptr);
    // Set up channelz linkage.
    channelz::ChannelNode* child_channelz_node = lb_channel_->channelz_node();
    auto parent_channelz_node = args_.GetObjectRef<channelz::ChannelNode>();
    if (child_channelz_node != nullptr && parent_channelz_node != nullptr) {
      parent_channelz_node->AddChildChannel(child_channelz_node->uuid());
      // Remember the parent node so ShutdownLocked() can undo the linkage.
      parent_channelz_node_ = std::move(parent_channelz_node);
    }
  }
  // Propagate updates to the LB channel (pick_first) through the fake
  // resolver.
  Resolver::Result result;
  result.addresses = std::move(balancer_addresses);
  // Pass channel creds via channel args, since the fake resolver won't
  // do this automatically.
  result.args = lb_channel_args.SetObject(std::move(channel_credentials));
  response_generator_->SetResponseAsync(std::move(result));
  // Return status.
  return status;
}
1633
// Stops the balancer-channel connectivity watch started during the
// initial update (part of the fallback-at-startup checks).
void GrpcLb::CancelBalancerChannelConnectivityWatchLocked() {
  lb_channel_->RemoveConnectivityWatcher(watcher_);
}
1637
1638 //
1639 // code for balancer channel and call
1640 //
1641
StartBalancerCallLocked()1642 void GrpcLb::StartBalancerCallLocked() {
1643 CHECK(lb_channel_ != nullptr);
1644 if (shutting_down_) return;
1645 // Init the LB call data.
1646 CHECK(lb_calld_ == nullptr);
1647 lb_calld_ = MakeOrphanable<BalancerCallState>(Ref());
1648 GRPC_TRACE_LOG(glb, INFO)
1649 << "[grpclb " << this
1650 << "] Query for backends (lb_channel: " << lb_channel_.get()
1651 << ", lb_calld: " << lb_calld_.get() << ")";
1652 lb_calld_->StartQuery();
1653 }
1654
StartBalancerCallRetryTimerLocked()1655 void GrpcLb::StartBalancerCallRetryTimerLocked() {
1656 Duration delay = lb_call_backoff_.NextAttemptDelay();
1657 if (GRPC_TRACE_FLAG_ENABLED(glb)) {
1658 LOG(INFO) << "[grpclb " << this << "] Connection to LB server lost...";
1659 if (delay > Duration::Zero()) {
1660 LOG(INFO) << "[grpclb " << this << "] ... retry_timer_active in "
1661 << delay.millis() << "ms.";
1662 } else {
1663 LOG(INFO) << "[grpclb " << this
1664 << "] ... retry_timer_active immediately.";
1665 }
1666 }
1667 lb_call_retry_timer_handle_ =
1668 channel_control_helper()->GetEventEngine()->RunAfter(
1669 delay,
1670 [self = RefAsSubclass<GrpcLb>(
1671 DEBUG_LOCATION, "on_balancer_call_retry_timer")]() mutable {
1672 ApplicationCallbackExecCtx callback_exec_ctx;
1673 ExecCtx exec_ctx;
1674 auto self_ptr = self.get();
1675 self_ptr->work_serializer()->Run(
1676 [self = std::move(self)]() {
1677 self->OnBalancerCallRetryTimerLocked();
1678 },
1679 DEBUG_LOCATION);
1680 });
1681 }
1682
OnBalancerCallRetryTimerLocked()1683 void GrpcLb::OnBalancerCallRetryTimerLocked() {
1684 lb_call_retry_timer_handle_.reset();
1685 if (!shutting_down_ && lb_calld_ == nullptr) {
1686 GRPC_TRACE_LOG(glb, INFO)
1687 << "[grpclb " << this << "] Restarting call to LB server";
1688 StartBalancerCallLocked();
1689 }
1690 }
1691
1692 //
1693 // code for handling fallback mode
1694 //
1695
MaybeEnterFallbackModeAfterStartup()1696 void GrpcLb::MaybeEnterFallbackModeAfterStartup() {
1697 // Enter fallback mode if all of the following are true:
1698 // - We are not currently in fallback mode.
1699 // - We are not currently waiting for the initial fallback timeout.
1700 // - We are not currently in contact with the balancer.
1701 // - The child policy is not in state READY.
1702 if (!fallback_mode_ && !fallback_at_startup_checks_pending_ &&
1703 (lb_calld_ == nullptr || !lb_calld_->seen_serverlist()) &&
1704 !child_policy_ready_) {
1705 LOG(INFO) << "[grpclb " << this
1706 << "] lost contact with balancer and backends from most recent "
1707 "serverlist; entering fallback mode";
1708 fallback_mode_ = true;
1709 CreateOrUpdateChildPolicyLocked();
1710 }
1711 }
1712
OnFallbackTimerLocked()1713 void GrpcLb::OnFallbackTimerLocked() {
1714 // If we receive a serverlist after the timer fires but before this callback
1715 // actually runs, don't fall back.
1716 if (fallback_at_startup_checks_pending_ && !shutting_down_) {
1717 LOG(INFO) << "[grpclb " << this
1718 << "] No response from balancer after fallback timeout; "
1719 "entering fallback mode";
1720 fallback_at_startup_checks_pending_ = false;
1721 CancelBalancerChannelConnectivityWatchLocked();
1722 fallback_mode_ = true;
1723 CreateOrUpdateChildPolicyLocked();
1724 }
1725 }
1726
1727 //
1728 // code for interacting with the child policy
1729 //
1730
CreateChildPolicyArgsLocked(bool is_backend_from_grpclb_load_balancer)1731 ChannelArgs GrpcLb::CreateChildPolicyArgsLocked(
1732 bool is_backend_from_grpclb_load_balancer) {
1733 ChannelArgs r =
1734 args_
1735 .Set(GRPC_ARG_ADDRESS_IS_BACKEND_FROM_GRPCLB_LOAD_BALANCER,
1736 is_backend_from_grpclb_load_balancer)
1737 .Set(GRPC_ARG_GRPCLB_ENABLE_LOAD_REPORTING_FILTER, 1);
1738 if (is_backend_from_grpclb_load_balancer) {
1739 r = r.Set(GRPC_ARG_INHIBIT_HEALTH_CHECKING, 1);
1740 }
1741 return r;
1742 }
1743
CreateChildPolicyLocked(const ChannelArgs & args)1744 OrphanablePtr<LoadBalancingPolicy> GrpcLb::CreateChildPolicyLocked(
1745 const ChannelArgs& args) {
1746 LoadBalancingPolicy::Args lb_policy_args;
1747 lb_policy_args.work_serializer = work_serializer();
1748 lb_policy_args.args = args;
1749 lb_policy_args.channel_control_helper =
1750 std::make_unique<Helper>(RefAsSubclass<GrpcLb>(DEBUG_LOCATION, "Helper"));
1751 OrphanablePtr<LoadBalancingPolicy> lb_policy =
1752 MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args), &glb_trace);
1753 GRPC_TRACE_LOG(glb, INFO)
1754 << "[grpclb " << this << "] Created new child policy handler ("
1755 << lb_policy.get() << ")";
1756 // Add the gRPC LB's interested_parties pollset_set to that of the newly
1757 // created child policy. This will make the child policy progress upon
1758 // activity on gRPC LB, which in turn is tied to the application's call.
1759 grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
1760 interested_parties());
1761 return lb_policy;
1762 }
1763
EndpointIteratorIsEmpty(const EndpointAddressesIterator & endpoints)1764 bool EndpointIteratorIsEmpty(const EndpointAddressesIterator& endpoints) {
1765 bool empty = true;
1766 endpoints.ForEach([&](const EndpointAddresses&) { empty = false; });
1767 return empty;
1768 }
1769
// Pushes the current address list (serverlist or fallback addresses,
// depending on mode) down to the child policy, creating the child policy
// first if it does not yet exist.
void GrpcLb::CreateOrUpdateChildPolicyLocked() {
  if (shutting_down_) return;
  // Construct update args.
  UpdateArgs update_args;
  bool is_backend_from_grpclb_load_balancer = false;
  if (fallback_mode_) {
    // If CreateOrUpdateChildPolicyLocked() is invoked when we haven't
    // received any serverlist from the balancer, we use the fallback
    // backends returned by the resolver. Note that the fallback backend
    // list may be empty, in which case the new child policy will fail the
    // picks.
    update_args.addresses = fallback_backend_addresses_;
    if (fallback_backend_addresses_.ok() &&
        EndpointIteratorIsEmpty(**fallback_backend_addresses_)) {
      // Surface the resolver's note so the failure message explains why
      // there were no fallback addresses.
      update_args.resolution_note = absl::StrCat(
          "grpclb in fallback mode without any fallback addresses: ",
          resolution_note_);
    }
  } else {
    // Normal mode: use the most recent serverlist from the balancer,
    // attaching client stats for load reporting when we have a call.
    update_args.addresses = serverlist_->GetServerAddressList(
        lb_calld_ == nullptr ? nullptr : lb_calld_->client_stats());
    is_backend_from_grpclb_load_balancer = true;
    if (update_args.addresses.ok() &&
        EndpointIteratorIsEmpty(**update_args.addresses)) {
      update_args.resolution_note = "empty serverlist from grpclb balancer";
    }
  }
  update_args.args =
      CreateChildPolicyArgsLocked(is_backend_from_grpclb_load_balancer);
  CHECK(update_args.args != ChannelArgs());
  update_args.config = config_->child_policy();
  // Create child policy if needed.
  if (child_policy_ == nullptr) {
    child_policy_ = CreateChildPolicyLocked(update_args.args);
  }
  // Update the policy.
  GRPC_TRACE_LOG(glb, INFO)
      << "[grpclb " << this << "] Updating child policy handler "
      << child_policy_.get();
  // TODO(roth): If we're in fallback mode and the child policy rejects the
  // update, we should propagate that failure back to the resolver somehow.
  (void)child_policy_->UpdateLocked(std::move(update_args));
}
1813
1814 //
1815 // subchannel caching
1816 //
1817
CacheDeletedSubchannelLocked(RefCountedPtr<SubchannelInterface> subchannel)1818 void GrpcLb::CacheDeletedSubchannelLocked(
1819 RefCountedPtr<SubchannelInterface> subchannel) {
1820 Timestamp deletion_time = Timestamp::Now() + subchannel_cache_interval_;
1821 cached_subchannels_[deletion_time].push_back(std::move(subchannel));
1822 if (!subchannel_cache_timer_handle_.has_value()) {
1823 StartSubchannelCacheTimerLocked();
1824 }
1825 }
1826
StartSubchannelCacheTimerLocked()1827 void GrpcLb::StartSubchannelCacheTimerLocked() {
1828 CHECK(!cached_subchannels_.empty());
1829 subchannel_cache_timer_handle_ =
1830 channel_control_helper()->GetEventEngine()->RunAfter(
1831 cached_subchannels_.begin()->first - Timestamp::Now(),
1832 [self = RefAsSubclass<GrpcLb>(DEBUG_LOCATION,
1833 "OnSubchannelCacheTimer")]() mutable {
1834 ApplicationCallbackExecCtx callback_exec_ctx;
1835 ExecCtx exec_ctx;
1836 auto* self_ptr = self.get();
1837 self_ptr->work_serializer()->Run(
1838 [self = std::move(self)]() mutable {
1839 self->OnSubchannelCacheTimerLocked();
1840 },
1841 DEBUG_LOCATION);
1842 });
1843 }
1844
OnSubchannelCacheTimerLocked()1845 void GrpcLb::OnSubchannelCacheTimerLocked() {
1846 if (subchannel_cache_timer_handle_.has_value()) {
1847 subchannel_cache_timer_handle_.reset();
1848 auto it = cached_subchannels_.begin();
1849 if (it != cached_subchannels_.end()) {
1850 GRPC_TRACE_LOG(glb, INFO)
1851 << "[grpclb " << this << "] removing " << it->second.size()
1852 << " subchannels from cache";
1853 cached_subchannels_.erase(it);
1854 }
1855 if (!cached_subchannels_.empty()) {
1856 StartSubchannelCacheTimerLocked();
1857 return;
1858 }
1859 }
1860 }
1861
1862 //
1863 // factory
1864 //
1865
1866 class GrpcLbFactory final : public LoadBalancingPolicyFactory {
1867 public:
CreateLoadBalancingPolicy(LoadBalancingPolicy::Args args) const1868 OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
1869 LoadBalancingPolicy::Args args) const override {
1870 return MakeOrphanable<GrpcLb>(std::move(args));
1871 }
1872
name() const1873 absl::string_view name() const override { return kGrpclb; }
1874
1875 absl::StatusOr<RefCountedPtr<LoadBalancingPolicy::Config>>
ParseLoadBalancingConfig(const Json & json) const1876 ParseLoadBalancingConfig(const Json& json) const override {
1877 return LoadFromJson<RefCountedPtr<GrpcLbConfig>>(
1878 json, JsonArgs(), "errors validating grpclb LB policy config");
1879 }
1880 };
1881
1882 } // namespace
1883
1884 //
1885 // Plugin registration
1886 //
1887
// Registers the grpclb LB policy factory and the client load reporting
// filter. The filter is installed on client subchannels only when the
// GRPC_ARG_GRPCLB_ENABLE_LOAD_REPORTING_FILTER channel arg is set.
void RegisterGrpcLbPolicy(CoreConfiguration::Builder* builder) {
  builder->lb_policy_registry()->RegisterLoadBalancingPolicyFactory(
      std::make_unique<GrpcLbFactory>());
  builder->channel_init()
      ->RegisterFilter<ClientLoadReportingFilter>(GRPC_CLIENT_SUBCHANNEL)
      .IfChannelArg(GRPC_ARG_GRPCLB_ENABLE_LOAD_REPORTING_FILTER, false);
}
1895
1896 } // namespace grpc_core
1897