1 /*
2 *
3 * Copyright 2016 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 /// Implementation of the gRPC LB policy.
20 ///
21 /// This policy takes as input a list of resolved addresses, which must
22 /// include at least one balancer address.
23 ///
/// An internal channel (\a lb_channel_) is created for the addresses
/// that are balancers. This channel behaves just like a regular
26 /// channel that uses pick_first to select from the list of balancer
27 /// addresses.
28 ///
29 /// The first time the policy gets a request for a pick, a ping, or to exit
30 /// the idle state, \a StartPickingLocked() is called. This method is
31 /// responsible for instantiating the internal *streaming* call to the LB
32 /// server (whichever address pick_first chose). The call will be complete
33 /// when either the balancer sends status or when we cancel the call (e.g.,
/// because we are shutting down). If needed, we retry the call. If we
35 /// received at least one valid message from the server, a new call attempt
36 /// will be made immediately; otherwise, we apply back-off delays between
37 /// attempts.
38 ///
39 /// We maintain an internal round_robin policy instance for distributing
40 /// requests across backends. Whenever we receive a new serverlist from
41 /// the balancer, we update the round_robin policy with the new list of
42 /// addresses. If we cannot communicate with the balancer on startup,
43 /// however, we may enter fallback mode, in which case we will populate
44 /// the RR policy's addresses from the backend addresses returned by the
45 /// resolver.
46 ///
47 /// Once an RR policy instance is in place (and getting updated as described),
48 /// calls for a pick, a ping, or a cancellation will be serviced right
49 /// away by forwarding them to the RR instance. Any time there's no RR
50 /// policy available (i.e., right after the creation of the gRPCLB policy),
51 /// pick and ping requests are added to a list of pending picks and pings
52 /// to be flushed and serviced when the RR policy instance becomes available.
53 ///
54 /// \see https://github.com/grpc/grpc/blob/master/doc/load-balancing.md for the
55 /// high level design and details.
56
// With the addition of a libuv endpoint, sockaddr.h now includes uv.h when
// using that endpoint. Because of various transitive includes in uv.h,
// including windows.h on Windows, uv.h must be included before other system
// headers. Therefore, sockaddr.h must be included before any other system
// header (port_platform.h is the only header that precedes it).
61 #include <grpc/support/port_platform.h>
62
63 #include "src/core/lib/iomgr/sockaddr.h"
64 #include "src/core/lib/iomgr/socket_utils.h"
65
66 #include <inttypes.h>
67 #include <limits.h>
68 #include <string.h>
69
70 #include <grpc/byte_buffer_reader.h>
71 #include <grpc/grpc.h>
72 #include <grpc/support/alloc.h>
73 #include <grpc/support/string_util.h>
74 #include <grpc/support/time.h>
75
76 #include "src/core/ext/filters/client_channel/client_channel.h"
77 #include "src/core/ext/filters/client_channel/client_channel_factory.h"
78 #include "src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h"
79 #include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h"
80 #include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h"
81 #include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h"
82 #include "src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h"
83 #include "src/core/ext/filters/client_channel/lb_policy_factory.h"
84 #include "src/core/ext/filters/client_channel/lb_policy_registry.h"
85 #include "src/core/ext/filters/client_channel/parse_address.h"
86 #include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"
87 #include "src/core/ext/filters/client_channel/subchannel_index.h"
88 #include "src/core/lib/backoff/backoff.h"
89 #include "src/core/lib/channel/channel_args.h"
90 #include "src/core/lib/channel/channel_stack.h"
91 #include "src/core/lib/gpr/host_port.h"
92 #include "src/core/lib/gpr/string.h"
93 #include "src/core/lib/gprpp/manual_constructor.h"
94 #include "src/core/lib/gprpp/memory.h"
95 #include "src/core/lib/gprpp/mutex_lock.h"
96 #include "src/core/lib/gprpp/orphanable.h"
97 #include "src/core/lib/gprpp/ref_counted_ptr.h"
98 #include "src/core/lib/iomgr/combiner.h"
99 #include "src/core/lib/iomgr/sockaddr.h"
100 #include "src/core/lib/iomgr/sockaddr_utils.h"
101 #include "src/core/lib/iomgr/timer.h"
102 #include "src/core/lib/slice/slice_hash_table.h"
103 #include "src/core/lib/slice/slice_internal.h"
104 #include "src/core/lib/slice/slice_string_helpers.h"
105 #include "src/core/lib/surface/call.h"
106 #include "src/core/lib/surface/channel.h"
107 #include "src/core/lib/surface/channel_init.h"
108 #include "src/core/lib/transport/static_metadata.h"
109
// Back-off parameters for retrying the call to the balancer (see the file
// comment: "we apply back-off delays between attempts"). Presumably fed into
// lb_call_backoff_ -- TODO confirm against the GrpcLb constructor.
// Units follow the macro names (seconds / unitless multiplier / jitter
// fraction).
#define GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS 1
#define GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER 1.6
#define GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS 120
#define GRPC_GRPCLB_RECONNECT_JITTER 0.2
// Default fallback timeout in milliseconds; presumably the default for
// lb_fallback_timeout_ms_ -- verify where channel args are processed.
#define GRPC_GRPCLB_DEFAULT_FALLBACK_TIMEOUT_MS 10000
115
116 namespace grpc_core {
117
// Trace flag for the grpclb policy, registered under the name "glb" and
// constructed with `false` (initially disabled). Verbose logging in this file
// is guarded by grpc_lb_glb_trace.enabled().
TraceFlag grpc_lb_glb_trace(false, "glb");
119
120 namespace {
121
/// The grpclb load-balancing policy. See the comment at the top of this file
/// for an overview of how it uses the balancer channel and call, the
/// round_robin child policy, and fallback mode.
class GrpcLb : public LoadBalancingPolicy {
 public:
  GrpcLb(const grpc_lb_addresses* addresses, const Args& args);

  // LoadBalancingPolicy overrides.
  void UpdateLocked(const grpc_channel_args& args) override;
  bool PickLocked(PickState* pick, grpc_error** error) override;
  void CancelPickLocked(PickState* pick, grpc_error* error) override;
  void CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask,
                                 uint32_t initial_metadata_flags_eq,
                                 grpc_error* error) override;
  void NotifyOnStateChangeLocked(grpc_connectivity_state* state,
                                 grpc_closure* closure) override;
  grpc_connectivity_state CheckConnectivityLocked(
      grpc_error** connectivity_error) override;
  void HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) override;
  void ExitIdleLocked() override;
  void ResetBackoffLocked() override;
  void FillChildRefsForChannelz(ChildRefsList* child_subchannels,
                                ChildRefsList* child_channels) override;

 private:
  /// Linked list of pending pick requests. It stores all information needed to
  /// eventually call (Round Robin's) pick() on them. They mainly stay pending
  /// waiting for the RR policy to be created.
  ///
  /// Note that when a pick is sent to the RR policy, we inject our own
  /// on_complete callback, so that we can intercept the result before
  /// invoking the original on_complete callback. This allows us to set the
  /// LB token metadata and add client_stats to the call context.
  /// See \a OnPendingPickComplete() for details.
  struct PendingPick {
    // The grpclb instance that created the wrapping. This instance is not
    // owned; reference counts are untouched. It's used only for logging
    // purposes.
    GrpcLb* grpclb_policy;
    // The original pick.
    PickState* pick;
    // Our on_complete closure and the original one.
    grpc_closure on_complete;
    grpc_closure* original_on_complete;
    // The LB token associated with the pick. This is set via user_data in
    // the pick.
    grpc_mdelem lb_token;
    // Stats for client-side load reporting.
    RefCountedPtr<GrpcLbClientStats> client_stats;
    // Next pending pick.
    PendingPick* next = nullptr;
  };

  /// Contains a call to the LB server and all the data related to the call.
  class BalancerCallState
      : public InternallyRefCountedWithTracing<BalancerCallState> {
   public:
    explicit BalancerCallState(
        RefCountedPtr<LoadBalancingPolicy> parent_grpclb_policy);

    // It's the caller's responsibility to ensure that Orphan() is called from
    // inside the combiner.
    void Orphan() override;

    void StartQuery();

    GrpcLbClientStats* client_stats() const { return client_stats_.get(); }

    bool seen_initial_response() const { return seen_initial_response_; }

   private:
    // So Delete() can access our private dtor.
    template <typename T>
    friend void grpc_core::Delete(T*);

    ~BalancerCallState();

    GrpcLb* grpclb_policy() const {
      return static_cast<GrpcLb*>(grpclb_policy_.get());
    }

    void ScheduleNextClientLoadReportLocked();
    void SendClientLoadReportLocked();

    static bool LoadReportCountersAreZero(grpc_grpclb_request* request);

    static void MaybeSendClientLoadReportLocked(void* arg, grpc_error* error);
    static void ClientLoadReportDoneLocked(void* arg, grpc_error* error);
    static void OnInitialRequestSentLocked(void* arg, grpc_error* error);
    static void OnBalancerMessageReceivedLocked(void* arg, grpc_error* error);
    static void OnBalancerStatusReceivedLocked(void* arg, grpc_error* error);

    // The owning LB policy.
    RefCountedPtr<LoadBalancingPolicy> grpclb_policy_;

    // The streaming call to the LB server. Created in the constructor and
    // expected to be non-NULL from then on (asserted by Orphan(),
    // StartQuery() and the destructor).
    grpc_call* lb_call_ = nullptr;

    // recv_initial_metadata
    grpc_metadata_array lb_initial_metadata_recv_;

    // send_message
    // Serialized message currently being sent: the initial request (set in
    // the ctor) or a client load report. Reset to nullptr when the send
    // completes, so non-NULL means a send is outstanding.
    grpc_byte_buffer* send_message_payload_ = nullptr;
    grpc_closure lb_on_initial_request_sent_;

    // recv_message
    grpc_byte_buffer* recv_message_payload_ = nullptr;
    grpc_closure lb_on_balancer_message_received_;
    bool seen_initial_response_ = false;

    // recv_trailing_metadata
    grpc_closure lb_on_balancer_status_received_;
    grpc_metadata_array lb_trailing_metadata_recv_;
    grpc_status_code lb_call_status_;
    grpc_slice lb_call_status_details_;

    // The stats for client-side load reporting associated with this LB call.
    // Created after the first serverlist is received.
    RefCountedPtr<GrpcLbClientStats> client_stats_;
    // Load report interval in ms, taken from the initial LB response and
    // clamped to at least GPR_MS_PER_SEC. 0 until that response arrives.
    grpc_millis client_stats_report_interval_ = 0;
    grpc_timer client_load_report_timer_;
    bool client_load_report_timer_callback_pending_ = false;
    // Whether the previous load report had all-zero counters; a second
    // consecutive all-zero report is skipped.
    bool last_client_load_report_counters_were_zero_ = false;
    // Set when the report timer fires while the initial request is still being
    // sent; the report is then sent from OnInitialRequestSentLocked().
    bool client_load_report_is_due_ = false;
    // The closure used for either the load report timer or the callback for
    // completion of sending the load report.
    grpc_closure client_load_report_closure_;
  };

  ~GrpcLb();

  void ShutdownLocked() override;

  // Helper function used in ctor and UpdateLocked().
  void ProcessChannelArgsLocked(const grpc_channel_args& args);

  // Methods for dealing with the balancer channel and call.
  void StartPickingLocked();
  void StartBalancerCallLocked();
  static void OnFallbackTimerLocked(void* arg, grpc_error* error);
  void StartBalancerCallRetryTimerLocked();
  static void OnBalancerCallRetryTimerLocked(void* arg, grpc_error* error);
  static void OnBalancerChannelConnectivityChangedLocked(void* arg,
                                                         grpc_error* error);

  // Pending pick methods.
  static void PendingPickSetMetadataAndContext(PendingPick* pp);
  PendingPick* PendingPickCreate(PickState* pick);
  void AddPendingPick(PendingPick* pp);
  static void OnPendingPickComplete(void* arg, grpc_error* error);

  // Methods for dealing with the RR policy.
  void CreateOrUpdateRoundRobinPolicyLocked();
  grpc_channel_args* CreateRoundRobinPolicyArgsLocked();
  void CreateRoundRobinPolicyLocked(const Args& args);
  bool PickFromRoundRobinPolicyLocked(bool force_async, PendingPick* pp,
                                      grpc_error** error);
  void UpdateConnectivityStateFromRoundRobinPolicyLocked(
      grpc_error* rr_state_error);
  static void OnRoundRobinConnectivityChangedLocked(void* arg,
                                                    grpc_error* error);
  static void OnRoundRobinRequestReresolutionLocked(void* arg,
                                                    grpc_error* error);

  // Who the client is trying to communicate with.
  const char* server_name_ = nullptr;

  // Current channel args from the resolver.
  grpc_channel_args* args_ = nullptr;

  // Internal state.
  bool started_picking_ = false;
  bool shutting_down_ = false;
  grpc_connectivity_state_tracker state_tracker_;

  // The channel for communicating with the LB server.
  grpc_channel* lb_channel_ = nullptr;
  // Mutex to protect the channel to the LB server. This is used when
  // processing a channelz request.
  gpr_mu lb_channel_mu_;
  grpc_connectivity_state lb_channel_connectivity_;
  grpc_closure lb_channel_on_connectivity_changed_;
  // Are we already watching the LB channel's connectivity?
  bool watching_lb_channel_ = false;
  // Response generator to inject address updates into lb_channel_.
  RefCountedPtr<FakeResolverResponseGenerator> response_generator_;

  // The data associated with the current LB call. It holds a ref to this LB
  // policy. It's initialized every time we query for backends. It's reset to
  // NULL whenever the current LB call is no longer needed (e.g., the LB policy
  // is shutting down, or the LB call has ended). A non-NULL lb_calld_ always
  // contains a non-NULL lb_call_.
  OrphanablePtr<BalancerCallState> lb_calld_;
  // Timeout in milliseconds for the LB call. 0 means no deadline.
  int lb_call_timeout_ms_ = 0;
  // Balancer call retry state.
  BackOff lb_call_backoff_;
  bool retry_timer_callback_pending_ = false;
  grpc_timer lb_call_retry_timer_;
  grpc_closure lb_on_call_retry_;

  // The deserialized response from the balancer. May be nullptr until one
  // such response has arrived.
  grpc_grpclb_serverlist* serverlist_ = nullptr;
  // Index into serverlist for next pick.
  // If the server at this index is a drop, we return a drop.
  // Otherwise, we delegate to the RR policy.
  size_t serverlist_index_ = 0;

  // Timeout in milliseconds before using fallback backend addresses.
  // 0 means not using fallback.
  int lb_fallback_timeout_ms_ = 0;
  // The backend addresses from the resolver.
  grpc_lb_addresses* fallback_backend_addresses_ = nullptr;
  // Fallback timer.
  bool fallback_timer_callback_pending_ = false;
  grpc_timer lb_fallback_timer_;
  grpc_closure lb_on_fallback_;

  // Pending picks that are waiting on the RR policy's connectivity.
  PendingPick* pending_picks_ = nullptr;

  // The RR policy to use for the backends.
  OrphanablePtr<LoadBalancingPolicy> rr_policy_;
  grpc_connectivity_state rr_connectivity_state_;
  grpc_closure on_rr_connectivity_changed_;
  grpc_closure on_rr_request_reresolution_;
};
346
347 //
348 // serverlist parsing code
349 //
350
351 // vtable for LB tokens in grpc_lb_addresses
lb_token_copy(void * token)352 void* lb_token_copy(void* token) {
353 return token == nullptr
354 ? nullptr
355 : (void*)GRPC_MDELEM_REF(grpc_mdelem{(uintptr_t)token}).payload;
356 }
lb_token_destroy(void * token)357 void lb_token_destroy(void* token) {
358 if (token != nullptr) {
359 GRPC_MDELEM_UNREF(grpc_mdelem{(uintptr_t)token});
360 }
361 }
// Compare hook for LB tokens: three-way comparison of the two tokens by
// identity (address). Returns 1 if token1 orders after token2, -1 if before,
// and 0 if they are the same pointer.
// The pointers are compared as uintptr_t values rather than with the built-in
// relational operators on void*. Ordering pointers to unrelated objects with
// < / > gives an unspecified result in C++, whereas the integer comparison
// gives a consistent total order.
int lb_token_cmp(void* token1, void* token2) {
  const uintptr_t lhs = reinterpret_cast<uintptr_t>(token1);
  const uintptr_t rhs = reinterpret_cast<uintptr_t>(token2);
  if (lhs > rhs) return 1;
  if (lhs < rhs) return -1;
  return 0;
}
367 const grpc_lb_user_data_vtable lb_token_vtable = {
368 lb_token_copy, lb_token_destroy, lb_token_cmp};
369
370 // Returns the backend addresses extracted from the given addresses.
ExtractBackendAddresses(const grpc_lb_addresses * addresses)371 grpc_lb_addresses* ExtractBackendAddresses(const grpc_lb_addresses* addresses) {
372 // First pass: count the number of backend addresses.
373 size_t num_backends = 0;
374 for (size_t i = 0; i < addresses->num_addresses; ++i) {
375 if (!addresses->addresses[i].is_balancer) {
376 ++num_backends;
377 }
378 }
379 // Second pass: actually populate the addresses and (empty) LB tokens.
380 grpc_lb_addresses* backend_addresses =
381 grpc_lb_addresses_create(num_backends, &lb_token_vtable);
382 size_t num_copied = 0;
383 for (size_t i = 0; i < addresses->num_addresses; ++i) {
384 if (addresses->addresses[i].is_balancer) continue;
385 const grpc_resolved_address* addr = &addresses->addresses[i].address;
386 grpc_lb_addresses_set_address(backend_addresses, num_copied, &addr->addr,
387 addr->len, false /* is_balancer */,
388 nullptr /* balancer_name */,
389 (void*)GRPC_MDELEM_LB_TOKEN_EMPTY.payload);
390 ++num_copied;
391 }
392 return backend_addresses;
393 }
394
IsServerValid(const grpc_grpclb_server * server,size_t idx,bool log)395 bool IsServerValid(const grpc_grpclb_server* server, size_t idx, bool log) {
396 if (server->drop) return false;
397 const grpc_grpclb_ip_address* ip = &server->ip_address;
398 if (GPR_UNLIKELY(server->port >> 16 != 0)) {
399 if (log) {
400 gpr_log(GPR_ERROR,
401 "Invalid port '%d' at index %lu of serverlist. Ignoring.",
402 server->port, (unsigned long)idx);
403 }
404 return false;
405 }
406 if (GPR_UNLIKELY(ip->size != 4 && ip->size != 16)) {
407 if (log) {
408 gpr_log(GPR_ERROR,
409 "Expected IP to be 4 or 16 bytes, got %d at index %lu of "
410 "serverlist. Ignoring",
411 ip->size, (unsigned long)idx);
412 }
413 return false;
414 }
415 return true;
416 }
417
ParseServer(const grpc_grpclb_server * server,grpc_resolved_address * addr)418 void ParseServer(const grpc_grpclb_server* server,
419 grpc_resolved_address* addr) {
420 memset(addr, 0, sizeof(*addr));
421 if (server->drop) return;
422 const uint16_t netorder_port = grpc_htons((uint16_t)server->port);
423 /* the addresses are given in binary format (a in(6)_addr struct) in
424 * server->ip_address.bytes. */
425 const grpc_grpclb_ip_address* ip = &server->ip_address;
426 if (ip->size == 4) {
427 addr->len = static_cast<socklen_t>(sizeof(grpc_sockaddr_in));
428 grpc_sockaddr_in* addr4 = reinterpret_cast<grpc_sockaddr_in*>(&addr->addr);
429 addr4->sin_family = GRPC_AF_INET;
430 memcpy(&addr4->sin_addr, ip->bytes, ip->size);
431 addr4->sin_port = netorder_port;
432 } else if (ip->size == 16) {
433 addr->len = static_cast<socklen_t>(sizeof(grpc_sockaddr_in6));
434 grpc_sockaddr_in6* addr6 = (grpc_sockaddr_in6*)&addr->addr;
435 addr6->sin6_family = GRPC_AF_INET6;
436 memcpy(&addr6->sin6_addr, ip->bytes, ip->size);
437 addr6->sin6_port = netorder_port;
438 }
439 }
440
441 // Returns addresses extracted from \a serverlist.
ProcessServerlist(const grpc_grpclb_serverlist * serverlist)442 grpc_lb_addresses* ProcessServerlist(const grpc_grpclb_serverlist* serverlist) {
443 size_t num_valid = 0;
444 /* first pass: count how many are valid in order to allocate the necessary
445 * memory in a single block */
446 for (size_t i = 0; i < serverlist->num_servers; ++i) {
447 if (IsServerValid(serverlist->servers[i], i, true)) ++num_valid;
448 }
449 grpc_lb_addresses* lb_addresses =
450 grpc_lb_addresses_create(num_valid, &lb_token_vtable);
451 /* second pass: actually populate the addresses and LB tokens (aka user data
452 * to the outside world) to be read by the RR policy during its creation.
453 * Given that the validity tests are very cheap, they are performed again
454 * instead of marking the valid ones during the first pass, as this would
455 * incurr in an allocation due to the arbitrary number of server */
456 size_t addr_idx = 0;
457 for (size_t sl_idx = 0; sl_idx < serverlist->num_servers; ++sl_idx) {
458 const grpc_grpclb_server* server = serverlist->servers[sl_idx];
459 if (!IsServerValid(serverlist->servers[sl_idx], sl_idx, false)) continue;
460 GPR_ASSERT(addr_idx < num_valid);
461 /* address processing */
462 grpc_resolved_address addr;
463 ParseServer(server, &addr);
464 /* lb token processing */
465 void* user_data;
466 if (server->has_load_balance_token) {
467 const size_t lb_token_max_length =
468 GPR_ARRAY_SIZE(server->load_balance_token);
469 const size_t lb_token_length =
470 strnlen(server->load_balance_token, lb_token_max_length);
471 grpc_slice lb_token_mdstr = grpc_slice_from_copied_buffer(
472 server->load_balance_token, lb_token_length);
473 user_data =
474 (void*)grpc_mdelem_from_slices(GRPC_MDSTR_LB_TOKEN, lb_token_mdstr)
475 .payload;
476 } else {
477 char* uri = grpc_sockaddr_to_uri(&addr);
478 gpr_log(GPR_INFO,
479 "Missing LB token for backend address '%s'. The empty token will "
480 "be used instead",
481 uri);
482 gpr_free(uri);
483 user_data = (void*)GRPC_MDELEM_LB_TOKEN_EMPTY.payload;
484 }
485 grpc_lb_addresses_set_address(lb_addresses, addr_idx, &addr.addr, addr.len,
486 false /* is_balancer */,
487 nullptr /* balancer_name */, user_data);
488 ++addr_idx;
489 }
490 GPR_ASSERT(addr_idx == num_valid);
491 return lb_addresses;
492 }
493
494 //
495 // GrpcLb::BalancerCallState
496 //
497
BalancerCallState(RefCountedPtr<LoadBalancingPolicy> parent_grpclb_policy)498 GrpcLb::BalancerCallState::BalancerCallState(
499 RefCountedPtr<LoadBalancingPolicy> parent_grpclb_policy)
500 : InternallyRefCountedWithTracing<BalancerCallState>(&grpc_lb_glb_trace),
501 grpclb_policy_(std::move(parent_grpclb_policy)) {
502 GPR_ASSERT(grpclb_policy_ != nullptr);
503 GPR_ASSERT(!grpclb_policy()->shutting_down_);
504 // Init the LB call. Note that the LB call will progress every time there's
505 // activity in grpclb_policy_->interested_parties(), which is comprised of
506 // the polling entities from client_channel.
507 GPR_ASSERT(grpclb_policy()->server_name_ != nullptr);
508 GPR_ASSERT(grpclb_policy()->server_name_[0] != '\0');
509 const grpc_millis deadline =
510 grpclb_policy()->lb_call_timeout_ms_ == 0
511 ? GRPC_MILLIS_INF_FUTURE
512 : ExecCtx::Get()->Now() + grpclb_policy()->lb_call_timeout_ms_;
513 lb_call_ = grpc_channel_create_pollset_set_call(
514 grpclb_policy()->lb_channel_, nullptr, GRPC_PROPAGATE_DEFAULTS,
515 grpclb_policy_->interested_parties(),
516 GRPC_MDSTR_SLASH_GRPC_DOT_LB_DOT_V1_DOT_LOADBALANCER_SLASH_BALANCELOAD,
517 nullptr, deadline, nullptr);
518 // Init the LB call request payload.
519 grpc_grpclb_request* request =
520 grpc_grpclb_request_create(grpclb_policy()->server_name_);
521 grpc_slice request_payload_slice = grpc_grpclb_request_encode(request);
522 send_message_payload_ =
523 grpc_raw_byte_buffer_create(&request_payload_slice, 1);
524 grpc_slice_unref_internal(request_payload_slice);
525 grpc_grpclb_request_destroy(request);
526 // Init other data associated with the LB call.
527 grpc_metadata_array_init(&lb_initial_metadata_recv_);
528 grpc_metadata_array_init(&lb_trailing_metadata_recv_);
529 GRPC_CLOSURE_INIT(&lb_on_initial_request_sent_, OnInitialRequestSentLocked,
530 this, grpc_combiner_scheduler(grpclb_policy()->combiner()));
531 GRPC_CLOSURE_INIT(&lb_on_balancer_message_received_,
532 OnBalancerMessageReceivedLocked, this,
533 grpc_combiner_scheduler(grpclb_policy()->combiner()));
534 GRPC_CLOSURE_INIT(&lb_on_balancer_status_received_,
535 OnBalancerStatusReceivedLocked, this,
536 grpc_combiner_scheduler(grpclb_policy()->combiner()));
537 }
538
// Releases everything owned by this call state: the ref on lb_call_, both
// metadata arrays, any payload buffers still held, and the status-details
// slice. The destructor is private and reached via grpc_core::Delete().
GrpcLb::BalancerCallState::~BalancerCallState() {
  GPR_ASSERT(lb_call_ != nullptr);
  grpc_call_unref(lb_call_);
  grpc_metadata_array_destroy(&lb_initial_metadata_recv_);
  grpc_metadata_array_destroy(&lb_trailing_metadata_recv_);
  // Either payload may already have been destroyed and reset to nullptr.
  // NOTE(review): relies on grpc_byte_buffer_destroy() accepting nullptr.
  grpc_byte_buffer_destroy(send_message_payload_);
  grpc_byte_buffer_destroy(recv_message_payload_);
  // NOTE(review): lb_call_status_details_ has no initializer; this assumes the
  // recv-status batch completed (filling it in) before the last ref dropped.
  grpc_slice_unref_internal(lb_call_status_details_);
}
548
Orphan()549 void GrpcLb::BalancerCallState::Orphan() {
550 GPR_ASSERT(lb_call_ != nullptr);
551 // If we are here because grpclb_policy wants to cancel the call,
552 // lb_on_balancer_status_received_ will complete the cancellation and clean
553 // up. Otherwise, we are here because grpclb_policy has to orphan a failed
554 // call, then the following cancellation will be a no-op.
555 grpc_call_cancel(lb_call_, nullptr);
556 if (client_load_report_timer_callback_pending_) {
557 grpc_timer_cancel(&client_load_report_timer_);
558 }
559 // Note that the initial ref is hold by lb_on_balancer_status_received_
560 // instead of the caller of this function. So the corresponding unref happens
561 // in lb_on_balancer_status_received_ instead of here.
562 }
563
StartQuery()564 void GrpcLb::BalancerCallState::StartQuery() {
565 GPR_ASSERT(lb_call_ != nullptr);
566 if (grpc_lb_glb_trace.enabled()) {
567 gpr_log(GPR_INFO,
568 "[grpclb %p] Starting LB call (lb_calld: %p, lb_call: %p)",
569 grpclb_policy_.get(), this, lb_call_);
570 }
571 // Create the ops.
572 grpc_call_error call_error;
573 grpc_op ops[3];
574 memset(ops, 0, sizeof(ops));
575 // Op: send initial metadata.
576 grpc_op* op = ops;
577 op->op = GRPC_OP_SEND_INITIAL_METADATA;
578 op->data.send_initial_metadata.count = 0;
579 op->flags = 0;
580 op->reserved = nullptr;
581 op++;
582 // Op: send request message.
583 GPR_ASSERT(send_message_payload_ != nullptr);
584 op->op = GRPC_OP_SEND_MESSAGE;
585 op->data.send_message.send_message = send_message_payload_;
586 op->flags = 0;
587 op->reserved = nullptr;
588 op++;
589 // TODO(roth): We currently track this ref manually. Once the
590 // ClosureRef API is ready, we should pass the RefCountedPtr<> along
591 // with the callback.
592 auto self = Ref(DEBUG_LOCATION, "on_initial_request_sent");
593 self.release();
594 call_error = grpc_call_start_batch_and_execute(
595 lb_call_, ops, (size_t)(op - ops), &lb_on_initial_request_sent_);
596 GPR_ASSERT(GRPC_CALL_OK == call_error);
597 // Op: recv initial metadata.
598 op = ops;
599 op->op = GRPC_OP_RECV_INITIAL_METADATA;
600 op->data.recv_initial_metadata.recv_initial_metadata =
601 &lb_initial_metadata_recv_;
602 op->flags = 0;
603 op->reserved = nullptr;
604 op++;
605 // Op: recv response.
606 op->op = GRPC_OP_RECV_MESSAGE;
607 op->data.recv_message.recv_message = &recv_message_payload_;
608 op->flags = 0;
609 op->reserved = nullptr;
610 op++;
611 // TODO(roth): We currently track this ref manually. Once the
612 // ClosureRef API is ready, we should pass the RefCountedPtr<> along
613 // with the callback.
614 self = Ref(DEBUG_LOCATION, "on_message_received");
615 self.release();
616 call_error = grpc_call_start_batch_and_execute(
617 lb_call_, ops, (size_t)(op - ops), &lb_on_balancer_message_received_);
618 GPR_ASSERT(GRPC_CALL_OK == call_error);
619 // Op: recv server status.
620 op = ops;
621 op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
622 op->data.recv_status_on_client.trailing_metadata =
623 &lb_trailing_metadata_recv_;
624 op->data.recv_status_on_client.status = &lb_call_status_;
625 op->data.recv_status_on_client.status_details = &lb_call_status_details_;
626 op->flags = 0;
627 op->reserved = nullptr;
628 op++;
629 // This callback signals the end of the LB call, so it relies on the initial
630 // ref instead of a new ref. When it's invoked, it's the initial ref that is
631 // unreffed.
632 call_error = grpc_call_start_batch_and_execute(
633 lb_call_, ops, (size_t)(op - ops), &lb_on_balancer_status_received_);
634 GPR_ASSERT(GRPC_CALL_OK == call_error);
635 };
636
ScheduleNextClientLoadReportLocked()637 void GrpcLb::BalancerCallState::ScheduleNextClientLoadReportLocked() {
638 const grpc_millis next_client_load_report_time =
639 ExecCtx::Get()->Now() + client_stats_report_interval_;
640 GRPC_CLOSURE_INIT(&client_load_report_closure_,
641 MaybeSendClientLoadReportLocked, this,
642 grpc_combiner_scheduler(grpclb_policy()->combiner()));
643 grpc_timer_init(&client_load_report_timer_, next_client_load_report_time,
644 &client_load_report_closure_);
645 client_load_report_timer_callback_pending_ = true;
646 }
647
MaybeSendClientLoadReportLocked(void * arg,grpc_error * error)648 void GrpcLb::BalancerCallState::MaybeSendClientLoadReportLocked(
649 void* arg, grpc_error* error) {
650 BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
651 GrpcLb* grpclb_policy = lb_calld->grpclb_policy();
652 lb_calld->client_load_report_timer_callback_pending_ = false;
653 if (error != GRPC_ERROR_NONE || lb_calld != grpclb_policy->lb_calld_.get()) {
654 lb_calld->Unref(DEBUG_LOCATION, "client_load_report");
655 return;
656 }
657 // If we've already sent the initial request, then we can go ahead and send
658 // the load report. Otherwise, we need to wait until the initial request has
659 // been sent to send this (see OnInitialRequestSentLocked()).
660 if (lb_calld->send_message_payload_ == nullptr) {
661 lb_calld->SendClientLoadReportLocked();
662 } else {
663 lb_calld->client_load_report_is_due_ = true;
664 }
665 }
666
LoadReportCountersAreZero(grpc_grpclb_request * request)667 bool GrpcLb::BalancerCallState::LoadReportCountersAreZero(
668 grpc_grpclb_request* request) {
669 GrpcLbClientStats::DroppedCallCounts* drop_entries =
670 static_cast<GrpcLbClientStats::DroppedCallCounts*>(
671 request->client_stats.calls_finished_with_drop.arg);
672 return request->client_stats.num_calls_started == 0 &&
673 request->client_stats.num_calls_finished == 0 &&
674 request->client_stats.num_calls_finished_with_client_failed_to_send ==
675 0 &&
676 request->client_stats.num_calls_finished_known_received == 0 &&
677 (drop_entries == nullptr || drop_entries->size() == 0);
678 }
679
SendClientLoadReportLocked()680 void GrpcLb::BalancerCallState::SendClientLoadReportLocked() {
681 // Construct message payload.
682 GPR_ASSERT(send_message_payload_ == nullptr);
683 grpc_grpclb_request* request =
684 grpc_grpclb_load_report_request_create_locked(client_stats_.get());
685 // Skip client load report if the counters were all zero in the last
686 // report and they are still zero in this one.
687 if (LoadReportCountersAreZero(request)) {
688 if (last_client_load_report_counters_were_zero_) {
689 grpc_grpclb_request_destroy(request);
690 ScheduleNextClientLoadReportLocked();
691 return;
692 }
693 last_client_load_report_counters_were_zero_ = true;
694 } else {
695 last_client_load_report_counters_were_zero_ = false;
696 }
697 grpc_slice request_payload_slice = grpc_grpclb_request_encode(request);
698 send_message_payload_ =
699 grpc_raw_byte_buffer_create(&request_payload_slice, 1);
700 grpc_slice_unref_internal(request_payload_slice);
701 grpc_grpclb_request_destroy(request);
702 // Send the report.
703 grpc_op op;
704 memset(&op, 0, sizeof(op));
705 op.op = GRPC_OP_SEND_MESSAGE;
706 op.data.send_message.send_message = send_message_payload_;
707 GRPC_CLOSURE_INIT(&client_load_report_closure_, ClientLoadReportDoneLocked,
708 this, grpc_combiner_scheduler(grpclb_policy()->combiner()));
709 grpc_call_error call_error = grpc_call_start_batch_and_execute(
710 lb_call_, &op, 1, &client_load_report_closure_);
711 if (GPR_UNLIKELY(call_error != GRPC_CALL_OK)) {
712 gpr_log(GPR_ERROR, "[grpclb %p] call_error=%d", grpclb_policy_.get(),
713 call_error);
714 GPR_ASSERT(GRPC_CALL_OK == call_error);
715 }
716 }
717
ClientLoadReportDoneLocked(void * arg,grpc_error * error)718 void GrpcLb::BalancerCallState::ClientLoadReportDoneLocked(void* arg,
719 grpc_error* error) {
720 BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
721 GrpcLb* grpclb_policy = lb_calld->grpclb_policy();
722 grpc_byte_buffer_destroy(lb_calld->send_message_payload_);
723 lb_calld->send_message_payload_ = nullptr;
724 if (error != GRPC_ERROR_NONE || lb_calld != grpclb_policy->lb_calld_.get()) {
725 lb_calld->Unref(DEBUG_LOCATION, "client_load_report");
726 return;
727 }
728 lb_calld->ScheduleNextClientLoadReportLocked();
729 }
730
OnInitialRequestSentLocked(void * arg,grpc_error * error)731 void GrpcLb::BalancerCallState::OnInitialRequestSentLocked(void* arg,
732 grpc_error* error) {
733 BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
734 grpc_byte_buffer_destroy(lb_calld->send_message_payload_);
735 lb_calld->send_message_payload_ = nullptr;
736 // If we attempted to send a client load report before the initial request was
737 // sent (and this lb_calld is still in use), send the load report now.
738 if (lb_calld->client_load_report_is_due_ &&
739 lb_calld == lb_calld->grpclb_policy()->lb_calld_.get()) {
740 lb_calld->SendClientLoadReportLocked();
741 lb_calld->client_load_report_is_due_ = false;
742 }
743 lb_calld->Unref(DEBUG_LOCATION, "on_initial_request_sent");
744 }
745
OnBalancerMessageReceivedLocked(void * arg,grpc_error * error)746 void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked(
747 void* arg, grpc_error* error) {
748 BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
749 GrpcLb* grpclb_policy = lb_calld->grpclb_policy();
750 // Empty payload means the LB call was cancelled.
751 if (lb_calld != grpclb_policy->lb_calld_.get() ||
752 lb_calld->recv_message_payload_ == nullptr) {
753 lb_calld->Unref(DEBUG_LOCATION, "on_message_received");
754 return;
755 }
756 grpc_byte_buffer_reader bbr;
757 grpc_byte_buffer_reader_init(&bbr, lb_calld->recv_message_payload_);
758 grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
759 grpc_byte_buffer_reader_destroy(&bbr);
760 grpc_byte_buffer_destroy(lb_calld->recv_message_payload_);
761 lb_calld->recv_message_payload_ = nullptr;
762 grpc_grpclb_initial_response* initial_response;
763 grpc_grpclb_serverlist* serverlist;
764 if (!lb_calld->seen_initial_response_ &&
765 (initial_response = grpc_grpclb_initial_response_parse(response_slice)) !=
766 nullptr) {
767 // Have NOT seen initial response, look for initial response.
768 if (initial_response->has_client_stats_report_interval) {
769 lb_calld->client_stats_report_interval_ = GPR_MAX(
770 GPR_MS_PER_SEC, grpc_grpclb_duration_to_millis(
771 &initial_response->client_stats_report_interval));
772 if (grpc_lb_glb_trace.enabled()) {
773 gpr_log(GPR_INFO,
774 "[grpclb %p] Received initial LB response message; "
775 "client load reporting interval = %" PRId64 " milliseconds",
776 grpclb_policy, lb_calld->client_stats_report_interval_);
777 }
778 } else if (grpc_lb_glb_trace.enabled()) {
779 gpr_log(GPR_INFO,
780 "[grpclb %p] Received initial LB response message; client load "
781 "reporting NOT enabled",
782 grpclb_policy);
783 }
784 grpc_grpclb_initial_response_destroy(initial_response);
785 lb_calld->seen_initial_response_ = true;
786 } else if ((serverlist = grpc_grpclb_response_parse_serverlist(
787 response_slice)) != nullptr) {
788 // Have seen initial response, look for serverlist.
789 GPR_ASSERT(lb_calld->lb_call_ != nullptr);
790 if (grpc_lb_glb_trace.enabled()) {
791 gpr_log(GPR_INFO,
792 "[grpclb %p] Serverlist with %" PRIuPTR " servers received",
793 grpclb_policy, serverlist->num_servers);
794 for (size_t i = 0; i < serverlist->num_servers; ++i) {
795 grpc_resolved_address addr;
796 ParseServer(serverlist->servers[i], &addr);
797 char* ipport;
798 grpc_sockaddr_to_string(&ipport, &addr, false);
799 gpr_log(GPR_INFO, "[grpclb %p] Serverlist[%" PRIuPTR "]: %s",
800 grpclb_policy, i, ipport);
801 gpr_free(ipport);
802 }
803 }
804 /* update serverlist */
805 if (serverlist->num_servers > 0) {
806 // Start sending client load report only after we start using the
807 // serverlist returned from the current LB call.
808 if (lb_calld->client_stats_report_interval_ > 0 &&
809 lb_calld->client_stats_ == nullptr) {
810 lb_calld->client_stats_.reset(New<GrpcLbClientStats>());
811 // TODO(roth): We currently track this ref manually. Once the
812 // ClosureRef API is ready, we should pass the RefCountedPtr<> along
813 // with the callback.
814 auto self = lb_calld->Ref(DEBUG_LOCATION, "client_load_report");
815 self.release();
816 lb_calld->ScheduleNextClientLoadReportLocked();
817 }
818 if (grpc_grpclb_serverlist_equals(grpclb_policy->serverlist_,
819 serverlist)) {
820 if (grpc_lb_glb_trace.enabled()) {
821 gpr_log(GPR_INFO,
822 "[grpclb %p] Incoming server list identical to current, "
823 "ignoring.",
824 grpclb_policy);
825 }
826 grpc_grpclb_destroy_serverlist(serverlist);
827 } else { /* new serverlist */
828 if (grpclb_policy->serverlist_ != nullptr) {
829 /* dispose of the old serverlist */
830 grpc_grpclb_destroy_serverlist(grpclb_policy->serverlist_);
831 } else {
832 /* or dispose of the fallback */
833 grpc_lb_addresses_destroy(grpclb_policy->fallback_backend_addresses_);
834 grpclb_policy->fallback_backend_addresses_ = nullptr;
835 if (grpclb_policy->fallback_timer_callback_pending_) {
836 grpc_timer_cancel(&grpclb_policy->lb_fallback_timer_);
837 }
838 }
839 // and update the copy in the GrpcLb instance. This
840 // serverlist instance will be destroyed either upon the next
841 // update or when the GrpcLb instance is destroyed.
842 grpclb_policy->serverlist_ = serverlist;
843 grpclb_policy->serverlist_index_ = 0;
844 grpclb_policy->CreateOrUpdateRoundRobinPolicyLocked();
845 }
846 } else {
847 if (grpc_lb_glb_trace.enabled()) {
848 gpr_log(GPR_INFO, "[grpclb %p] Received empty server list, ignoring.",
849 grpclb_policy);
850 }
851 grpc_grpclb_destroy_serverlist(serverlist);
852 }
853 } else {
854 // No valid initial response or serverlist found.
855 gpr_log(GPR_ERROR,
856 "[grpclb %p] Invalid LB response received: '%s'. Ignoring.",
857 grpclb_policy,
858 grpc_dump_slice(response_slice, GPR_DUMP_ASCII | GPR_DUMP_HEX));
859 }
860 grpc_slice_unref_internal(response_slice);
861 if (!grpclb_policy->shutting_down_) {
862 // Keep listening for serverlist updates.
863 grpc_op op;
864 memset(&op, 0, sizeof(op));
865 op.op = GRPC_OP_RECV_MESSAGE;
866 op.data.recv_message.recv_message = &lb_calld->recv_message_payload_;
867 op.flags = 0;
868 op.reserved = nullptr;
869 // Reuse the "OnBalancerMessageReceivedLocked" ref taken in StartQuery().
870 const grpc_call_error call_error = grpc_call_start_batch_and_execute(
871 lb_calld->lb_call_, &op, 1,
872 &lb_calld->lb_on_balancer_message_received_);
873 GPR_ASSERT(GRPC_CALL_OK == call_error);
874 } else {
875 lb_calld->Unref(DEBUG_LOCATION, "on_message_received+grpclb_shutdown");
876 }
877 }
878
OnBalancerStatusReceivedLocked(void * arg,grpc_error * error)879 void GrpcLb::BalancerCallState::OnBalancerStatusReceivedLocked(
880 void* arg, grpc_error* error) {
881 BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
882 GrpcLb* grpclb_policy = lb_calld->grpclb_policy();
883 GPR_ASSERT(lb_calld->lb_call_ != nullptr);
884 if (grpc_lb_glb_trace.enabled()) {
885 char* status_details =
886 grpc_slice_to_c_string(lb_calld->lb_call_status_details_);
887 gpr_log(GPR_INFO,
888 "[grpclb %p] Status from LB server received. Status = %d, details "
889 "= '%s', (lb_calld: %p, lb_call: %p), error '%s'",
890 grpclb_policy, lb_calld->lb_call_status_, status_details, lb_calld,
891 lb_calld->lb_call_, grpc_error_string(error));
892 gpr_free(status_details);
893 }
894 grpclb_policy->TryReresolutionLocked(&grpc_lb_glb_trace, GRPC_ERROR_NONE);
895 // If this lb_calld is still in use, this call ended because of a failure so
896 // we want to retry connecting. Otherwise, we have deliberately ended this
897 // call and no further action is required.
898 if (lb_calld == grpclb_policy->lb_calld_.get()) {
899 grpclb_policy->lb_calld_.reset();
900 GPR_ASSERT(!grpclb_policy->shutting_down_);
901 if (lb_calld->seen_initial_response_) {
902 // If we lose connection to the LB server, reset the backoff and restart
903 // the LB call immediately.
904 grpclb_policy->lb_call_backoff_.Reset();
905 grpclb_policy->StartBalancerCallLocked();
906 } else {
907 // If this LB call fails establishing any connection to the LB server,
908 // retry later.
909 grpclb_policy->StartBalancerCallRetryTimerLocked();
910 }
911 }
912 lb_calld->Unref(DEBUG_LOCATION, "lb_call_ended");
913 }
914
915 //
916 // helper code for creating balancer channel
917 //
918
// Returns a newly created grpc_lb_addresses holding only the entries of
// \a addresses that have is_balancer set. The copies are stored with
// is_balancer=false and no user data; any user data present is dropped with
// an error log. Aborts if \a addresses contains no balancer entry. The caller
// owns the result and releases it with grpc_lb_addresses_destroy().
grpc_lb_addresses* ExtractBalancerAddresses(
    const grpc_lb_addresses* addresses) {
  size_t balancer_count = 0;
  for (size_t i = 0; i < addresses->num_addresses; ++i) {
    balancer_count += addresses->addresses[i].is_balancer ? 1 : 0;
  }
  // The client_channel only picks this LB policy when at least one balancer
  // address is present.
  GPR_ASSERT(balancer_count > 0);
  grpc_lb_addresses* balancers =
      grpc_lb_addresses_create(balancer_count, nullptr);
  size_t next_slot = 0;
  for (size_t i = 0; i < addresses->num_addresses; ++i) {
    const auto& entry = addresses->addresses[i];
    if (entry.is_balancer) {
      if (GPR_UNLIKELY(entry.user_data != nullptr)) {
        gpr_log(GPR_ERROR,
                "This LB policy doesn't support user data. It will be ignored");
      }
      grpc_lb_addresses_set_address(balancers, next_slot++, entry.address.addr,
                                    entry.address.len, false /* is balancer */,
                                    entry.balancer_name,
                                    nullptr /* user data */);
    }
  }
  GPR_ASSERT(next_slot == balancer_count);
  return balancers;
}
945
946 /* Returns the channel args for the LB channel, used to create a bidirectional
947 * stream for the reception of load balancing updates.
948 *
949 * Inputs:
950 * - \a addresses: corresponding to the balancers.
951 * - \a response_generator: in order to propagate updates from the resolver
952 * above the grpclb policy.
953 * - \a args: other args inherited from the grpclb policy. */
BuildBalancerChannelArgs(const grpc_lb_addresses * addresses,FakeResolverResponseGenerator * response_generator,const grpc_channel_args * args)954 grpc_channel_args* BuildBalancerChannelArgs(
955 const grpc_lb_addresses* addresses,
956 FakeResolverResponseGenerator* response_generator,
957 const grpc_channel_args* args) {
958 grpc_lb_addresses* lb_addresses = ExtractBalancerAddresses(addresses);
959 // Channel args to remove.
960 static const char* args_to_remove[] = {
961 // LB policy name, since we want to use the default (pick_first) in
962 // the LB channel.
963 GRPC_ARG_LB_POLICY_NAME,
964 // The channel arg for the server URI, since that will be different for
965 // the LB channel than for the parent channel. The client channel
966 // factory will re-add this arg with the right value.
967 GRPC_ARG_SERVER_URI,
968 // The resolved addresses, which will be generated by the name resolver
969 // used in the LB channel. Note that the LB channel will use the fake
970 // resolver, so this won't actually generate a query to DNS (or some
971 // other name service). However, the addresses returned by the fake
972 // resolver will have is_balancer=false, whereas our own addresses have
973 // is_balancer=true. We need the LB channel to return addresses with
974 // is_balancer=false so that it does not wind up recursively using the
975 // grpclb LB policy, as per the special case logic in client_channel.c.
976 GRPC_ARG_LB_ADDRESSES,
977 // The fake resolver response generator, because we are replacing it
978 // with the one from the grpclb policy, used to propagate updates to
979 // the LB channel.
980 GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR,
981 // The LB channel should use the authority indicated by the target
982 // authority table (see \a grpc_lb_policy_grpclb_modify_lb_channel_args),
983 // as opposed to the authority from the parent channel.
984 GRPC_ARG_DEFAULT_AUTHORITY,
985 // Just as for \a GRPC_ARG_DEFAULT_AUTHORITY, the LB channel should be
986 // treated as a stand-alone channel and not inherit this argument from the
987 // args of the parent channel.
988 GRPC_SSL_TARGET_NAME_OVERRIDE_ARG,
989 };
990 // Channel args to add.
991 const grpc_arg args_to_add[] = {
992 // New LB addresses.
993 // Note that we pass these in both when creating the LB channel
994 // and via the fake resolver. The latter is what actually gets used.
995 grpc_lb_addresses_create_channel_arg(lb_addresses),
996 // The fake resolver response generator, which we use to inject
997 // address updates into the LB channel.
998 grpc_core::FakeResolverResponseGenerator::MakeChannelArg(
999 response_generator),
1000 // A channel arg indicating the target is a grpclb load balancer.
1001 grpc_channel_arg_integer_create(
1002 const_cast<char*>(GRPC_ARG_ADDRESS_IS_GRPCLB_LOAD_BALANCER), 1),
1003 // A channel arg indicating this is an internal channels, aka it is
1004 // owned by components in Core, not by the user application.
1005 grpc_channel_arg_integer_create(
1006 const_cast<char*>(GRPC_ARG_CHANNELZ_CHANNEL_IS_INTERNAL_CHANNEL), 1),
1007 };
1008 // Construct channel args.
1009 grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove(
1010 args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), args_to_add,
1011 GPR_ARRAY_SIZE(args_to_add));
1012 // Make any necessary modifications for security.
1013 new_args = grpc_lb_policy_grpclb_modify_lb_channel_args(new_args);
1014 // Clean up.
1015 grpc_lb_addresses_destroy(lb_addresses);
1016 return new_args;
1017 }
1018
1019 //
1020 // ctor and dtor
1021 //
1022
GrpcLb(const grpc_lb_addresses * addresses,const LoadBalancingPolicy::Args & args)1023 GrpcLb::GrpcLb(const grpc_lb_addresses* addresses,
1024 const LoadBalancingPolicy::Args& args)
1025 : LoadBalancingPolicy(args),
1026 response_generator_(MakeRefCounted<FakeResolverResponseGenerator>()),
1027 lb_call_backoff_(
1028 BackOff::Options()
1029 .set_initial_backoff(GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS *
1030 1000)
1031 .set_multiplier(GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER)
1032 .set_jitter(GRPC_GRPCLB_RECONNECT_JITTER)
1033 .set_max_backoff(GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS *
1034 1000)) {
1035 // Initialization.
1036 gpr_mu_init(&lb_channel_mu_);
1037 grpc_subchannel_index_ref();
1038 GRPC_CLOSURE_INIT(&lb_channel_on_connectivity_changed_,
1039 &GrpcLb::OnBalancerChannelConnectivityChangedLocked, this,
1040 grpc_combiner_scheduler(args.combiner));
1041 GRPC_CLOSURE_INIT(&on_rr_connectivity_changed_,
1042 &GrpcLb::OnRoundRobinConnectivityChangedLocked, this,
1043 grpc_combiner_scheduler(args.combiner));
1044 GRPC_CLOSURE_INIT(&on_rr_request_reresolution_,
1045 &GrpcLb::OnRoundRobinRequestReresolutionLocked, this,
1046 grpc_combiner_scheduler(args.combiner));
1047 grpc_connectivity_state_init(&state_tracker_, GRPC_CHANNEL_IDLE, "grpclb");
1048 // Record server name.
1049 const grpc_arg* arg = grpc_channel_args_find(args.args, GRPC_ARG_SERVER_URI);
1050 const char* server_uri = grpc_channel_arg_get_string(arg);
1051 GPR_ASSERT(server_uri != nullptr);
1052 grpc_uri* uri = grpc_uri_parse(server_uri, true);
1053 GPR_ASSERT(uri->path[0] != '\0');
1054 server_name_ = gpr_strdup(uri->path[0] == '/' ? uri->path + 1 : uri->path);
1055 if (grpc_lb_glb_trace.enabled()) {
1056 gpr_log(GPR_INFO,
1057 "[grpclb %p] Will use '%s' as the server name for LB request.",
1058 this, server_name_);
1059 }
1060 grpc_uri_destroy(uri);
1061 // Record LB call timeout.
1062 arg = grpc_channel_args_find(args.args, GRPC_ARG_GRPCLB_CALL_TIMEOUT_MS);
1063 lb_call_timeout_ms_ = grpc_channel_arg_get_integer(arg, {0, 0, INT_MAX});
1064 // Record fallback timeout.
1065 arg = grpc_channel_args_find(args.args, GRPC_ARG_GRPCLB_FALLBACK_TIMEOUT_MS);
1066 lb_fallback_timeout_ms_ = grpc_channel_arg_get_integer(
1067 arg, {GRPC_GRPCLB_DEFAULT_FALLBACK_TIMEOUT_MS, 0, INT_MAX});
1068 // Process channel args.
1069 ProcessChannelArgsLocked(*args.args);
1070 }
1071
~GrpcLb()1072 GrpcLb::~GrpcLb() {
1073 GPR_ASSERT(pending_picks_ == nullptr);
1074 gpr_mu_destroy(&lb_channel_mu_);
1075 gpr_free((void*)server_name_);
1076 grpc_channel_args_destroy(args_);
1077 grpc_connectivity_state_destroy(&state_tracker_);
1078 if (serverlist_ != nullptr) {
1079 grpc_grpclb_destroy_serverlist(serverlist_);
1080 }
1081 if (fallback_backend_addresses_ != nullptr) {
1082 grpc_lb_addresses_destroy(fallback_backend_addresses_);
1083 }
1084 grpc_subchannel_index_unref();
1085 }
1086
// Shuts the policy down. In order, it:
//   - marks it as shutting down;
//   - drops the current balancer call;
//   - cancels pending retry/fallback timers;
//   - drops the round_robin child;
//   - calls TryReresolutionLocked() with GRPC_ERROR_CANCELLED;
//   - destroys the LB channel;
//   - moves the connectivity state to SHUTDOWN;
//   - fails every pick still queued in pending_picks_ with a
//     "Channel shutdown" error.
void GrpcLb::ShutdownLocked() {
  // Each consumer below takes its own ref; this local ref is dropped at the end.
  grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown");
  // Set first: callbacks that run afterwards (e.g. the fallback and retry
  // timer callbacks) check shutting_down_ and then take no further action.
  shutting_down_ = true;
  lb_calld_.reset();
  if (retry_timer_callback_pending_) {
    grpc_timer_cancel(&lb_call_retry_timer_);
  }
  if (fallback_timer_callback_pending_) {
    grpc_timer_cancel(&lb_fallback_timer_);
  }
  rr_policy_.reset();
  TryReresolutionLocked(&grpc_lb_glb_trace, GRPC_ERROR_CANCELLED);
  // We destroy the LB channel here instead of in our destructor because
  // destroying the channel triggers a last callback to
  // OnBalancerChannelConnectivityChangedLocked(), and we need to be
  // alive when that callback is invoked.
  if (lb_channel_ != nullptr) {
    // lb_channel_mu_ guards lb_channel_ against readers such as
    // FillChildRefsForChannelz(), which takes the same mutex.
    gpr_mu_lock(&lb_channel_mu_);
    grpc_channel_destroy(lb_channel_);
    lb_channel_ = nullptr;
    gpr_mu_unlock(&lb_channel_mu_);
  }
  grpc_connectivity_state_set(&state_tracker_, GRPC_CHANNEL_SHUTDOWN,
                              GRPC_ERROR_REF(error), "grpclb_shutdown");
  // Clear pending picks.
  PendingPick* pp;
  while ((pp = pending_picks_) != nullptr) {
    pending_picks_ = pp->next;
    pp->pick->connected_subchannel.reset();
    // Note: pp is deleted in this callback.
    GRPC_CLOSURE_SCHED(&pp->on_complete, GRPC_ERROR_REF(error));
  }
  GRPC_ERROR_UNREF(error);
}
1121
1122 //
1123 // public methods
1124 //
1125
HandOffPendingPicksLocked(LoadBalancingPolicy * new_policy)1126 void GrpcLb::HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) {
1127 PendingPick* pp;
1128 while ((pp = pending_picks_) != nullptr) {
1129 pending_picks_ = pp->next;
1130 pp->pick->on_complete = pp->original_on_complete;
1131 pp->pick->user_data = nullptr;
1132 grpc_error* error = GRPC_ERROR_NONE;
1133 if (new_policy->PickLocked(pp->pick, &error)) {
1134 // Synchronous return; schedule closure.
1135 GRPC_CLOSURE_SCHED(pp->pick->on_complete, error);
1136 }
1137 Delete(pp);
1138 }
1139 }
1140
1141 // Cancel a specific pending pick.
1142 //
1143 // A grpclb pick progresses as follows:
1144 // - If there's a Round Robin policy (rr_policy_) available, it'll be
1145 // handed over to the RR policy (in CreateRoundRobinPolicyLocked()). From
1146 // that point onwards, it'll be RR's responsibility. For cancellations, that
1147 // implies the pick needs also be cancelled by the RR instance.
1148 // - Otherwise, without an RR instance, picks stay pending at this policy's
1149 // level (grpclb), inside the pending_picks_ list. To cancel these,
1150 // we invoke the completion closure and set the pick's connected
1151 // subchannel to nullptr right here.
CancelPickLocked(PickState * pick,grpc_error * error)1152 void GrpcLb::CancelPickLocked(PickState* pick, grpc_error* error) {
1153 PendingPick* pp = pending_picks_;
1154 pending_picks_ = nullptr;
1155 while (pp != nullptr) {
1156 PendingPick* next = pp->next;
1157 if (pp->pick == pick) {
1158 pick->connected_subchannel.reset();
1159 // Note: pp is deleted in this callback.
1160 GRPC_CLOSURE_SCHED(&pp->on_complete,
1161 GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
1162 "Pick Cancelled", &error, 1));
1163 } else {
1164 pp->next = pending_picks_;
1165 pending_picks_ = pp;
1166 }
1167 pp = next;
1168 }
1169 if (rr_policy_ != nullptr) {
1170 rr_policy_->CancelPickLocked(pick, GRPC_ERROR_REF(error));
1171 }
1172 GRPC_ERROR_UNREF(error);
1173 }
1174
1175 // Cancel all pending picks.
1176 //
1177 // A grpclb pick progresses as follows:
1178 // - If there's a Round Robin policy (rr_policy_) available, it'll be
1179 // handed over to the RR policy (in CreateRoundRobinPolicyLocked()). From
1180 // that point onwards, it'll be RR's responsibility. For cancellations, that
1181 // implies the pick needs also be cancelled by the RR instance.
1182 // - Otherwise, without an RR instance, picks stay pending at this policy's
1183 // level (grpclb), inside the pending_picks_ list. To cancel these,
1184 // we invoke the completion closure and set the pick's connected
1185 // subchannel to nullptr right here.
CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask,uint32_t initial_metadata_flags_eq,grpc_error * error)1186 void GrpcLb::CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask,
1187 uint32_t initial_metadata_flags_eq,
1188 grpc_error* error) {
1189 PendingPick* pp = pending_picks_;
1190 pending_picks_ = nullptr;
1191 while (pp != nullptr) {
1192 PendingPick* next = pp->next;
1193 if ((pp->pick->initial_metadata_flags & initial_metadata_flags_mask) ==
1194 initial_metadata_flags_eq) {
1195 // Note: pp is deleted in this callback.
1196 GRPC_CLOSURE_SCHED(&pp->on_complete,
1197 GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
1198 "Pick Cancelled", &error, 1));
1199 } else {
1200 pp->next = pending_picks_;
1201 pending_picks_ = pp;
1202 }
1203 pp = next;
1204 }
1205 if (rr_policy_ != nullptr) {
1206 rr_policy_->CancelMatchingPicksLocked(initial_metadata_flags_mask,
1207 initial_metadata_flags_eq,
1208 GRPC_ERROR_REF(error));
1209 }
1210 GRPC_ERROR_UNREF(error);
1211 }
1212
ExitIdleLocked()1213 void GrpcLb::ExitIdleLocked() {
1214 if (!started_picking_) {
1215 StartPickingLocked();
1216 }
1217 }
1218
ResetBackoffLocked()1219 void GrpcLb::ResetBackoffLocked() {
1220 if (lb_channel_ != nullptr) {
1221 grpc_channel_reset_connect_backoff(lb_channel_);
1222 }
1223 if (rr_policy_ != nullptr) {
1224 rr_policy_->ResetBackoffLocked();
1225 }
1226 }
1227
PickLocked(PickState * pick,grpc_error ** error)1228 bool GrpcLb::PickLocked(PickState* pick, grpc_error** error) {
1229 PendingPick* pp = PendingPickCreate(pick);
1230 bool pick_done = false;
1231 if (rr_policy_ != nullptr) {
1232 if (grpc_lb_glb_trace.enabled()) {
1233 gpr_log(GPR_INFO, "[grpclb %p] about to PICK from RR %p", this,
1234 rr_policy_.get());
1235 }
1236 pick_done =
1237 PickFromRoundRobinPolicyLocked(false /* force_async */, pp, error);
1238 } else { // rr_policy_ == NULL
1239 if (pick->on_complete == nullptr) {
1240 *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1241 "No pick result available but synchronous result required.");
1242 pick_done = true;
1243 } else {
1244 if (grpc_lb_glb_trace.enabled()) {
1245 gpr_log(GPR_INFO,
1246 "[grpclb %p] No RR policy. Adding to grpclb's pending picks",
1247 this);
1248 }
1249 AddPendingPick(pp);
1250 if (!started_picking_) {
1251 StartPickingLocked();
1252 }
1253 pick_done = false;
1254 }
1255 }
1256 return pick_done;
1257 }
1258
FillChildRefsForChannelz(ChildRefsList * child_subchannels,ChildRefsList * child_channels)1259 void GrpcLb::FillChildRefsForChannelz(ChildRefsList* child_subchannels,
1260 ChildRefsList* child_channels) {
1261 // delegate to the RoundRobin to fill the children subchannels.
1262 rr_policy_->FillChildRefsForChannelz(child_subchannels, child_channels);
1263 MutexLock lock(&lb_channel_mu_);
1264 if (lb_channel_ != nullptr) {
1265 grpc_core::channelz::ChannelNode* channel_node =
1266 grpc_channel_get_channelz_node(lb_channel_);
1267 if (channel_node != nullptr) {
1268 child_channels->push_back(channel_node->uuid());
1269 }
1270 }
1271 }
1272
CheckConnectivityLocked(grpc_error ** connectivity_error)1273 grpc_connectivity_state GrpcLb::CheckConnectivityLocked(
1274 grpc_error** connectivity_error) {
1275 return grpc_connectivity_state_get(&state_tracker_, connectivity_error);
1276 }
1277
NotifyOnStateChangeLocked(grpc_connectivity_state * current,grpc_closure * notify)1278 void GrpcLb::NotifyOnStateChangeLocked(grpc_connectivity_state* current,
1279 grpc_closure* notify) {
1280 grpc_connectivity_state_notify_on_state_change(&state_tracker_, current,
1281 notify);
1282 }
1283
ProcessChannelArgsLocked(const grpc_channel_args & args)1284 void GrpcLb::ProcessChannelArgsLocked(const grpc_channel_args& args) {
1285 const grpc_arg* arg = grpc_channel_args_find(&args, GRPC_ARG_LB_ADDRESSES);
1286 if (GPR_UNLIKELY(arg == nullptr || arg->type != GRPC_ARG_POINTER)) {
1287 // Ignore this update.
1288 gpr_log(
1289 GPR_ERROR,
1290 "[grpclb %p] No valid LB addresses channel arg in update, ignoring.",
1291 this);
1292 return;
1293 }
1294 const grpc_lb_addresses* addresses =
1295 static_cast<const grpc_lb_addresses*>(arg->value.pointer.p);
1296 // Update fallback address list.
1297 if (fallback_backend_addresses_ != nullptr) {
1298 grpc_lb_addresses_destroy(fallback_backend_addresses_);
1299 }
1300 fallback_backend_addresses_ = ExtractBackendAddresses(addresses);
1301 // Make sure that GRPC_ARG_LB_POLICY_NAME is set in channel args,
1302 // since we use this to trigger the client_load_reporting filter.
1303 static const char* args_to_remove[] = {GRPC_ARG_LB_POLICY_NAME};
1304 grpc_arg new_arg = grpc_channel_arg_string_create(
1305 (char*)GRPC_ARG_LB_POLICY_NAME, (char*)"grpclb");
1306 grpc_channel_args_destroy(args_);
1307 args_ = grpc_channel_args_copy_and_add_and_remove(
1308 &args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &new_arg, 1);
1309 // Construct args for balancer channel.
1310 grpc_channel_args* lb_channel_args =
1311 BuildBalancerChannelArgs(addresses, response_generator_.get(), &args);
1312 // Create balancer channel if needed.
1313 if (lb_channel_ == nullptr) {
1314 char* uri_str;
1315 gpr_asprintf(&uri_str, "fake:///%s", server_name_);
1316 gpr_mu_lock(&lb_channel_mu_);
1317 lb_channel_ = grpc_client_channel_factory_create_channel(
1318 client_channel_factory(), uri_str,
1319 GRPC_CLIENT_CHANNEL_TYPE_LOAD_BALANCING, lb_channel_args);
1320 gpr_mu_unlock(&lb_channel_mu_);
1321 GPR_ASSERT(lb_channel_ != nullptr);
1322 gpr_free(uri_str);
1323 }
1324 // Propagate updates to the LB channel (pick_first) through the fake
1325 // resolver.
1326 response_generator_->SetResponse(lb_channel_args);
1327 grpc_channel_args_destroy(lb_channel_args);
1328 }
1329
UpdateLocked(const grpc_channel_args & args)1330 void GrpcLb::UpdateLocked(const grpc_channel_args& args) {
1331 ProcessChannelArgsLocked(args);
1332 // If fallback is configured and the RR policy already exists, update
1333 // it with the new fallback addresses.
1334 if (lb_fallback_timeout_ms_ > 0 && rr_policy_ != nullptr) {
1335 CreateOrUpdateRoundRobinPolicyLocked();
1336 }
1337 // Start watching the LB channel connectivity for connection, if not
1338 // already doing so.
1339 if (!watching_lb_channel_) {
1340 lb_channel_connectivity_ = grpc_channel_check_connectivity_state(
1341 lb_channel_, true /* try to connect */);
1342 grpc_channel_element* client_channel_elem = grpc_channel_stack_last_element(
1343 grpc_channel_get_channel_stack(lb_channel_));
1344 GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
1345 watching_lb_channel_ = true;
1346 // TODO(roth): We currently track this ref manually. Once the
1347 // ClosureRef API is ready, we should pass the RefCountedPtr<> along
1348 // with the callback.
1349 auto self = Ref(DEBUG_LOCATION, "watch_lb_channel_connectivity");
1350 self.release();
1351 grpc_client_channel_watch_connectivity_state(
1352 client_channel_elem,
1353 grpc_polling_entity_create_from_pollset_set(interested_parties()),
1354 &lb_channel_connectivity_, &lb_channel_on_connectivity_changed_,
1355 nullptr);
1356 }
1357 }
1358
1359 //
1360 // code for balancer channel and call
1361 //
1362
StartPickingLocked()1363 void GrpcLb::StartPickingLocked() {
1364 // Start a timer to fall back.
1365 if (lb_fallback_timeout_ms_ > 0 && serverlist_ == nullptr &&
1366 !fallback_timer_callback_pending_) {
1367 grpc_millis deadline = ExecCtx::Get()->Now() + lb_fallback_timeout_ms_;
1368 // TODO(roth): We currently track this ref manually. Once the
1369 // ClosureRef API is ready, we should pass the RefCountedPtr<> along
1370 // with the callback.
1371 auto self = Ref(DEBUG_LOCATION, "on_fallback_timer");
1372 self.release();
1373 GRPC_CLOSURE_INIT(&lb_on_fallback_, &GrpcLb::OnFallbackTimerLocked, this,
1374 grpc_combiner_scheduler(combiner()));
1375 fallback_timer_callback_pending_ = true;
1376 grpc_timer_init(&lb_fallback_timer_, deadline, &lb_on_fallback_);
1377 }
1378 started_picking_ = true;
1379 StartBalancerCallLocked();
1380 }
1381
StartBalancerCallLocked()1382 void GrpcLb::StartBalancerCallLocked() {
1383 GPR_ASSERT(lb_channel_ != nullptr);
1384 if (shutting_down_) return;
1385 // Init the LB call data.
1386 GPR_ASSERT(lb_calld_ == nullptr);
1387 lb_calld_ = MakeOrphanable<BalancerCallState>(Ref());
1388 if (grpc_lb_glb_trace.enabled()) {
1389 gpr_log(GPR_INFO,
1390 "[grpclb %p] Query for backends (lb_channel: %p, lb_calld: %p)",
1391 this, lb_channel_, lb_calld_.get());
1392 }
1393 lb_calld_->StartQuery();
1394 }
1395
OnFallbackTimerLocked(void * arg,grpc_error * error)1396 void GrpcLb::OnFallbackTimerLocked(void* arg, grpc_error* error) {
1397 GrpcLb* grpclb_policy = static_cast<GrpcLb*>(arg);
1398 grpclb_policy->fallback_timer_callback_pending_ = false;
1399 // If we receive a serverlist after the timer fires but before this callback
1400 // actually runs, don't fall back.
1401 if (grpclb_policy->serverlist_ == nullptr && !grpclb_policy->shutting_down_ &&
1402 error == GRPC_ERROR_NONE) {
1403 if (grpc_lb_glb_trace.enabled()) {
1404 gpr_log(GPR_INFO,
1405 "[grpclb %p] Falling back to use backends from resolver",
1406 grpclb_policy);
1407 }
1408 GPR_ASSERT(grpclb_policy->fallback_backend_addresses_ != nullptr);
1409 grpclb_policy->CreateOrUpdateRoundRobinPolicyLocked();
1410 }
1411 grpclb_policy->Unref(DEBUG_LOCATION, "on_fallback_timer");
1412 }
1413
StartBalancerCallRetryTimerLocked()1414 void GrpcLb::StartBalancerCallRetryTimerLocked() {
1415 grpc_millis next_try = lb_call_backoff_.NextAttemptTime();
1416 if (grpc_lb_glb_trace.enabled()) {
1417 gpr_log(GPR_INFO, "[grpclb %p] Connection to LB server lost...", this);
1418 grpc_millis timeout = next_try - ExecCtx::Get()->Now();
1419 if (timeout > 0) {
1420 gpr_log(GPR_INFO, "[grpclb %p] ... retry_timer_active in %" PRId64 "ms.",
1421 this, timeout);
1422 } else {
1423 gpr_log(GPR_INFO, "[grpclb %p] ... retry_timer_active immediately.",
1424 this);
1425 }
1426 }
1427 // TODO(roth): We currently track this ref manually. Once the
1428 // ClosureRef API is ready, we should pass the RefCountedPtr<> along
1429 // with the callback.
1430 auto self = Ref(DEBUG_LOCATION, "on_balancer_call_retry_timer");
1431 self.release();
1432 GRPC_CLOSURE_INIT(&lb_on_call_retry_, &GrpcLb::OnBalancerCallRetryTimerLocked,
1433 this, grpc_combiner_scheduler(combiner()));
1434 retry_timer_callback_pending_ = true;
1435 grpc_timer_init(&lb_call_retry_timer_, next_try, &lb_on_call_retry_);
1436 }
1437
OnBalancerCallRetryTimerLocked(void * arg,grpc_error * error)1438 void GrpcLb::OnBalancerCallRetryTimerLocked(void* arg, grpc_error* error) {
1439 GrpcLb* grpclb_policy = static_cast<GrpcLb*>(arg);
1440 grpclb_policy->retry_timer_callback_pending_ = false;
1441 if (!grpclb_policy->shutting_down_ && error == GRPC_ERROR_NONE &&
1442 grpclb_policy->lb_calld_ == nullptr) {
1443 if (grpc_lb_glb_trace.enabled()) {
1444 gpr_log(GPR_INFO, "[grpclb %p] Restarting call to LB server",
1445 grpclb_policy);
1446 }
1447 grpclb_policy->StartBalancerCallLocked();
1448 }
1449 grpclb_policy->Unref(DEBUG_LOCATION, "on_balancer_call_retry_timer");
1450 }
1451
1452 // Invoked as part of the update process. It continues watching the LB channel
1453 // until it shuts down or becomes READY. It's invoked even if the LB channel
1454 // stayed READY throughout the update (for example if the update is identical).
OnBalancerChannelConnectivityChangedLocked(void * arg,grpc_error * error)1455 void GrpcLb::OnBalancerChannelConnectivityChangedLocked(void* arg,
1456 grpc_error* error) {
1457 GrpcLb* grpclb_policy = static_cast<GrpcLb*>(arg);
1458 if (grpclb_policy->shutting_down_) goto done;
1459 // Re-initialize the lb_call. This should also take care of updating the
1460 // embedded RR policy. Note that the current RR policy, if any, will stay in
1461 // effect until an update from the new lb_call is received.
1462 switch (grpclb_policy->lb_channel_connectivity_) {
1463 case GRPC_CHANNEL_CONNECTING:
1464 case GRPC_CHANNEL_TRANSIENT_FAILURE: {
1465 // Keep watching the LB channel.
1466 grpc_channel_element* client_channel_elem =
1467 grpc_channel_stack_last_element(
1468 grpc_channel_get_channel_stack(grpclb_policy->lb_channel_));
1469 GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
1470 grpc_client_channel_watch_connectivity_state(
1471 client_channel_elem,
1472 grpc_polling_entity_create_from_pollset_set(
1473 grpclb_policy->interested_parties()),
1474 &grpclb_policy->lb_channel_connectivity_,
1475 &grpclb_policy->lb_channel_on_connectivity_changed_, nullptr);
1476 break;
1477 }
1478 // The LB channel may be IDLE because it's shut down before the update.
1479 // Restart the LB call to kick the LB channel into gear.
1480 case GRPC_CHANNEL_IDLE:
1481 case GRPC_CHANNEL_READY:
1482 grpclb_policy->lb_calld_.reset();
1483 if (grpclb_policy->started_picking_) {
1484 if (grpclb_policy->retry_timer_callback_pending_) {
1485 grpc_timer_cancel(&grpclb_policy->lb_call_retry_timer_);
1486 }
1487 grpclb_policy->lb_call_backoff_.Reset();
1488 grpclb_policy->StartBalancerCallLocked();
1489 }
1490 // Fall through.
1491 case GRPC_CHANNEL_SHUTDOWN:
1492 done:
1493 grpclb_policy->watching_lb_channel_ = false;
1494 grpclb_policy->Unref(DEBUG_LOCATION,
1495 "watch_lb_channel_connectivity_cb_shutdown");
1496 }
1497 }
1498
1499 //
1500 // PendingPick
1501 //
1502
1503 // Adds lb_token of selected subchannel (address) to the call's initial
1504 // metadata.
AddLbTokenToInitialMetadata(grpc_mdelem lb_token,grpc_linked_mdelem * lb_token_mdelem_storage,grpc_metadata_batch * initial_metadata)1505 grpc_error* AddLbTokenToInitialMetadata(
1506 grpc_mdelem lb_token, grpc_linked_mdelem* lb_token_mdelem_storage,
1507 grpc_metadata_batch* initial_metadata) {
1508 GPR_ASSERT(lb_token_mdelem_storage != nullptr);
1509 GPR_ASSERT(!GRPC_MDISNULL(lb_token));
1510 return grpc_metadata_batch_add_tail(initial_metadata, lb_token_mdelem_storage,
1511 lb_token);
1512 }
1513
1514 // Destroy function used when embedding client stats in call context.
DestroyClientStats(void * arg)1515 void DestroyClientStats(void* arg) {
1516 static_cast<GrpcLbClientStats*>(arg)->Unref();
1517 }
1518
PendingPickSetMetadataAndContext(PendingPick * pp)1519 void GrpcLb::PendingPickSetMetadataAndContext(PendingPick* pp) {
1520 /* if connected_subchannel is nullptr, no pick has been made by the RR
1521 * policy (e.g., all addresses failed to connect). There won't be any
1522 * user_data/token available */
1523 if (pp->pick->connected_subchannel != nullptr) {
1524 if (GPR_LIKELY(!GRPC_MDISNULL(pp->lb_token))) {
1525 AddLbTokenToInitialMetadata(GRPC_MDELEM_REF(pp->lb_token),
1526 &pp->pick->lb_token_mdelem_storage,
1527 pp->pick->initial_metadata);
1528 } else {
1529 gpr_log(GPR_ERROR,
1530 "[grpclb %p] No LB token for connected subchannel pick %p",
1531 pp->grpclb_policy, pp->pick);
1532 abort();
1533 }
1534 // Pass on client stats via context. Passes ownership of the reference.
1535 if (pp->client_stats != nullptr) {
1536 pp->pick->subchannel_call_context[GRPC_GRPCLB_CLIENT_STATS].value =
1537 pp->client_stats.release();
1538 pp->pick->subchannel_call_context[GRPC_GRPCLB_CLIENT_STATS].destroy =
1539 DestroyClientStats;
1540 }
1541 } else {
1542 pp->client_stats.reset();
1543 }
1544 }
1545
1546 /* The \a on_complete closure passed as part of the pick requires keeping a
1547 * reference to its associated round robin instance. We wrap this closure in
1548 * order to unref the round robin instance upon its invocation */
OnPendingPickComplete(void * arg,grpc_error * error)1549 void GrpcLb::OnPendingPickComplete(void* arg, grpc_error* error) {
1550 PendingPick* pp = static_cast<PendingPick*>(arg);
1551 PendingPickSetMetadataAndContext(pp);
1552 GRPC_CLOSURE_SCHED(pp->original_on_complete, GRPC_ERROR_REF(error));
1553 Delete(pp);
1554 }
1555
PendingPickCreate(PickState * pick)1556 GrpcLb::PendingPick* GrpcLb::PendingPickCreate(PickState* pick) {
1557 PendingPick* pp = New<PendingPick>();
1558 pp->grpclb_policy = this;
1559 pp->pick = pick;
1560 GRPC_CLOSURE_INIT(&pp->on_complete, &GrpcLb::OnPendingPickComplete, pp,
1561 grpc_schedule_on_exec_ctx);
1562 pp->original_on_complete = pick->on_complete;
1563 pick->on_complete = &pp->on_complete;
1564 return pp;
1565 }
1566
AddPendingPick(PendingPick * pp)1567 void GrpcLb::AddPendingPick(PendingPick* pp) {
1568 pp->next = pending_picks_;
1569 pending_picks_ = pp;
1570 }
1571
1572 //
1573 // code for interacting with the RR policy
1574 //
1575
1576 // Performs a pick over \a rr_policy_. Given that a pick can return
1577 // immediately (ignoring its completion callback), we need to perform the
1578 // cleanups this callback would otherwise be responsible for.
1579 // If \a force_async is true, then we will manually schedule the
1580 // completion callback even if the pick is available immediately.
PickFromRoundRobinPolicyLocked(bool force_async,PendingPick * pp,grpc_error ** error)1581 bool GrpcLb::PickFromRoundRobinPolicyLocked(bool force_async, PendingPick* pp,
1582 grpc_error** error) {
1583 // Check for drops if we are not using fallback backend addresses.
1584 if (serverlist_ != nullptr) {
1585 // Look at the index into the serverlist to see if we should drop this call.
1586 grpc_grpclb_server* server = serverlist_->servers[serverlist_index_++];
1587 if (serverlist_index_ == serverlist_->num_servers) {
1588 serverlist_index_ = 0; // Wrap-around.
1589 }
1590 if (server->drop) {
1591 // Update client load reporting stats to indicate the number of
1592 // dropped calls. Note that we have to do this here instead of in
1593 // the client_load_reporting filter, because we do not create a
1594 // subchannel call (and therefore no client_load_reporting filter)
1595 // for dropped calls.
1596 if (lb_calld_ != nullptr && lb_calld_->client_stats() != nullptr) {
1597 lb_calld_->client_stats()->AddCallDroppedLocked(
1598 server->load_balance_token);
1599 }
1600 if (force_async) {
1601 GRPC_CLOSURE_SCHED(pp->original_on_complete, GRPC_ERROR_NONE);
1602 Delete(pp);
1603 return false;
1604 }
1605 Delete(pp);
1606 return true;
1607 }
1608 }
1609 // Set client_stats and user_data.
1610 if (lb_calld_ != nullptr && lb_calld_->client_stats() != nullptr) {
1611 pp->client_stats = lb_calld_->client_stats()->Ref();
1612 }
1613 GPR_ASSERT(pp->pick->user_data == nullptr);
1614 pp->pick->user_data = (void**)&pp->lb_token;
1615 // Pick via the RR policy.
1616 bool pick_done = rr_policy_->PickLocked(pp->pick, error);
1617 if (pick_done) {
1618 PendingPickSetMetadataAndContext(pp);
1619 if (force_async) {
1620 GRPC_CLOSURE_SCHED(pp->original_on_complete, *error);
1621 *error = GRPC_ERROR_NONE;
1622 pick_done = false;
1623 }
1624 Delete(pp);
1625 }
1626 // else, the pending pick will be registered and taken care of by the
1627 // pending pick list inside the RR policy. Eventually,
1628 // OnPendingPickComplete() will be called, which will (among other
1629 // things) add the LB token to the call's initial metadata.
1630 return pick_done;
1631 }
1632
CreateRoundRobinPolicyLocked(const Args & args)1633 void GrpcLb::CreateRoundRobinPolicyLocked(const Args& args) {
1634 GPR_ASSERT(rr_policy_ == nullptr);
1635 rr_policy_ = LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
1636 "round_robin", args);
1637 if (GPR_UNLIKELY(rr_policy_ == nullptr)) {
1638 gpr_log(GPR_ERROR, "[grpclb %p] Failure creating a RoundRobin policy",
1639 this);
1640 return;
1641 }
1642 // TODO(roth): We currently track this ref manually. Once the new
1643 // ClosureRef API is done, pass the RefCountedPtr<> along with the closure.
1644 auto self = Ref(DEBUG_LOCATION, "on_rr_reresolution_requested");
1645 self.release();
1646 rr_policy_->SetReresolutionClosureLocked(&on_rr_request_reresolution_);
1647 grpc_error* rr_state_error = nullptr;
1648 rr_connectivity_state_ = rr_policy_->CheckConnectivityLocked(&rr_state_error);
1649 // Connectivity state is a function of the RR policy updated/created.
1650 UpdateConnectivityStateFromRoundRobinPolicyLocked(rr_state_error);
1651 // Add the gRPC LB's interested_parties pollset_set to that of the newly
1652 // created RR policy. This will make the RR policy progress upon activity on
1653 // gRPC LB, which in turn is tied to the application's call.
1654 grpc_pollset_set_add_pollset_set(rr_policy_->interested_parties(),
1655 interested_parties());
1656 // Subscribe to changes to the connectivity of the new RR.
1657 // TODO(roth): We currently track this ref manually. Once the new
1658 // ClosureRef API is done, pass the RefCountedPtr<> along with the closure.
1659 self = Ref(DEBUG_LOCATION, "on_rr_connectivity_changed");
1660 self.release();
1661 rr_policy_->NotifyOnStateChangeLocked(&rr_connectivity_state_,
1662 &on_rr_connectivity_changed_);
1663 rr_policy_->ExitIdleLocked();
1664 // Send pending picks to RR policy.
1665 PendingPick* pp;
1666 while ((pp = pending_picks_)) {
1667 pending_picks_ = pp->next;
1668 if (grpc_lb_glb_trace.enabled()) {
1669 gpr_log(GPR_INFO,
1670 "[grpclb %p] Pending pick about to (async) PICK from RR %p", this,
1671 rr_policy_.get());
1672 }
1673 grpc_error* error = GRPC_ERROR_NONE;
1674 PickFromRoundRobinPolicyLocked(true /* force_async */, pp, &error);
1675 }
1676 }
1677
CreateRoundRobinPolicyArgsLocked()1678 grpc_channel_args* GrpcLb::CreateRoundRobinPolicyArgsLocked() {
1679 grpc_lb_addresses* addresses;
1680 bool is_backend_from_grpclb_load_balancer = false;
1681 if (serverlist_ != nullptr) {
1682 GPR_ASSERT(serverlist_->num_servers > 0);
1683 addresses = ProcessServerlist(serverlist_);
1684 is_backend_from_grpclb_load_balancer = true;
1685 } else {
1686 // If CreateOrUpdateRoundRobinPolicyLocked() is invoked when we haven't
1687 // received any serverlist from the balancer, we use the fallback backends
1688 // returned by the resolver. Note that the fallback backend list may be
1689 // empty, in which case the new round_robin policy will keep the requested
1690 // picks pending.
1691 GPR_ASSERT(fallback_backend_addresses_ != nullptr);
1692 addresses = grpc_lb_addresses_copy(fallback_backend_addresses_);
1693 }
1694 GPR_ASSERT(addresses != nullptr);
1695 // Replace the LB addresses in the channel args that we pass down to
1696 // the subchannel.
1697 static const char* keys_to_remove[] = {GRPC_ARG_LB_ADDRESSES};
1698 const grpc_arg args_to_add[] = {
1699 grpc_lb_addresses_create_channel_arg(addresses),
1700 // A channel arg indicating if the target is a backend inferred from a
1701 // grpclb load balancer.
1702 grpc_channel_arg_integer_create(
1703 const_cast<char*>(
1704 GRPC_ARG_ADDRESS_IS_BACKEND_FROM_GRPCLB_LOAD_BALANCER),
1705 is_backend_from_grpclb_load_balancer),
1706 };
1707 grpc_channel_args* args = grpc_channel_args_copy_and_add_and_remove(
1708 args_, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), args_to_add,
1709 GPR_ARRAY_SIZE(args_to_add));
1710 grpc_lb_addresses_destroy(addresses);
1711 return args;
1712 }
1713
CreateOrUpdateRoundRobinPolicyLocked()1714 void GrpcLb::CreateOrUpdateRoundRobinPolicyLocked() {
1715 if (shutting_down_) return;
1716 grpc_channel_args* args = CreateRoundRobinPolicyArgsLocked();
1717 GPR_ASSERT(args != nullptr);
1718 if (rr_policy_ != nullptr) {
1719 if (grpc_lb_glb_trace.enabled()) {
1720 gpr_log(GPR_INFO, "[grpclb %p] Updating RR policy %p", this,
1721 rr_policy_.get());
1722 }
1723 rr_policy_->UpdateLocked(*args);
1724 } else {
1725 LoadBalancingPolicy::Args lb_policy_args;
1726 lb_policy_args.combiner = combiner();
1727 lb_policy_args.client_channel_factory = client_channel_factory();
1728 lb_policy_args.args = args;
1729 CreateRoundRobinPolicyLocked(lb_policy_args);
1730 if (grpc_lb_glb_trace.enabled()) {
1731 gpr_log(GPR_INFO, "[grpclb %p] Created new RR policy %p", this,
1732 rr_policy_.get());
1733 }
1734 }
1735 grpc_channel_args_destroy(args);
1736 }
1737
OnRoundRobinRequestReresolutionLocked(void * arg,grpc_error * error)1738 void GrpcLb::OnRoundRobinRequestReresolutionLocked(void* arg,
1739 grpc_error* error) {
1740 GrpcLb* grpclb_policy = static_cast<GrpcLb*>(arg);
1741 if (grpclb_policy->shutting_down_ || error != GRPC_ERROR_NONE) {
1742 grpclb_policy->Unref(DEBUG_LOCATION, "on_rr_reresolution_requested");
1743 return;
1744 }
1745 if (grpc_lb_glb_trace.enabled()) {
1746 gpr_log(
1747 GPR_INFO,
1748 "[grpclb %p] Re-resolution requested from the internal RR policy (%p).",
1749 grpclb_policy, grpclb_policy->rr_policy_.get());
1750 }
1751 // If we are talking to a balancer, we expect to get updated addresses form
1752 // the balancer, so we can ignore the re-resolution request from the RR
1753 // policy. Otherwise, handle the re-resolution request using the
1754 // grpclb policy's original re-resolution closure.
1755 if (grpclb_policy->lb_calld_ == nullptr ||
1756 !grpclb_policy->lb_calld_->seen_initial_response()) {
1757 grpclb_policy->TryReresolutionLocked(&grpc_lb_glb_trace, GRPC_ERROR_NONE);
1758 }
1759 // Give back the wrapper closure to the RR policy.
1760 grpclb_policy->rr_policy_->SetReresolutionClosureLocked(
1761 &grpclb_policy->on_rr_request_reresolution_);
1762 }
1763
UpdateConnectivityStateFromRoundRobinPolicyLocked(grpc_error * rr_state_error)1764 void GrpcLb::UpdateConnectivityStateFromRoundRobinPolicyLocked(
1765 grpc_error* rr_state_error) {
1766 const grpc_connectivity_state curr_glb_state =
1767 grpc_connectivity_state_check(&state_tracker_);
1768 /* The new connectivity status is a function of the previous one and the new
1769 * input coming from the status of the RR policy.
1770 *
1771 * current state (grpclb's)
1772 * |
1773 * v || I | C | R | TF | SD | <- new state (RR's)
1774 * ===++====+=====+=====+======+======+
1775 * I || I | C | R | [I] | [I] |
1776 * ---++----+-----+-----+------+------+
1777 * C || I | C | R | [C] | [C] |
1778 * ---++----+-----+-----+------+------+
1779 * R || I | C | R | [R] | [R] |
1780 * ---++----+-----+-----+------+------+
1781 * TF || I | C | R | [TF] | [TF] |
1782 * ---++----+-----+-----+------+------+
1783 * SD || NA | NA | NA | NA | NA | (*)
1784 * ---++----+-----+-----+------+------+
1785 *
1786 * A [STATE] indicates that the old RR policy is kept. In those cases, STATE
1787 * is the current state of grpclb, which is left untouched.
1788 *
1789 * In summary, if the new state is TRANSIENT_FAILURE or SHUTDOWN, stick to
1790 * the previous RR instance.
1791 *
1792 * Note that the status is never updated to SHUTDOWN as a result of calling
1793 * this function. Only glb_shutdown() has the power to set that state.
1794 *
1795 * (*) This function mustn't be called during shutting down. */
1796 GPR_ASSERT(curr_glb_state != GRPC_CHANNEL_SHUTDOWN);
1797 switch (rr_connectivity_state_) {
1798 case GRPC_CHANNEL_TRANSIENT_FAILURE:
1799 case GRPC_CHANNEL_SHUTDOWN:
1800 GPR_ASSERT(rr_state_error != GRPC_ERROR_NONE);
1801 break;
1802 case GRPC_CHANNEL_IDLE:
1803 case GRPC_CHANNEL_CONNECTING:
1804 case GRPC_CHANNEL_READY:
1805 GPR_ASSERT(rr_state_error == GRPC_ERROR_NONE);
1806 }
1807 if (grpc_lb_glb_trace.enabled()) {
1808 gpr_log(
1809 GPR_INFO,
1810 "[grpclb %p] Setting grpclb's state to %s from new RR policy %p state.",
1811 this, grpc_connectivity_state_name(rr_connectivity_state_),
1812 rr_policy_.get());
1813 }
1814 grpc_connectivity_state_set(&state_tracker_, rr_connectivity_state_,
1815 rr_state_error,
1816 "update_lb_connectivity_status_locked");
1817 }
1818
OnRoundRobinConnectivityChangedLocked(void * arg,grpc_error * error)1819 void GrpcLb::OnRoundRobinConnectivityChangedLocked(void* arg,
1820 grpc_error* error) {
1821 GrpcLb* grpclb_policy = static_cast<GrpcLb*>(arg);
1822 if (grpclb_policy->shutting_down_) {
1823 grpclb_policy->Unref(DEBUG_LOCATION, "on_rr_connectivity_changed");
1824 return;
1825 }
1826 grpclb_policy->UpdateConnectivityStateFromRoundRobinPolicyLocked(
1827 GRPC_ERROR_REF(error));
1828 // Resubscribe. Reuse the "on_rr_connectivity_changed" ref.
1829 grpclb_policy->rr_policy_->NotifyOnStateChangeLocked(
1830 &grpclb_policy->rr_connectivity_state_,
1831 &grpclb_policy->on_rr_connectivity_changed_);
1832 }
1833
1834 //
1835 // factory
1836 //
1837
1838 class GrpcLbFactory : public LoadBalancingPolicyFactory {
1839 public:
CreateLoadBalancingPolicy(const LoadBalancingPolicy::Args & args) const1840 OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
1841 const LoadBalancingPolicy::Args& args) const override {
1842 /* Count the number of gRPC-LB addresses. There must be at least one. */
1843 const grpc_arg* arg =
1844 grpc_channel_args_find(args.args, GRPC_ARG_LB_ADDRESSES);
1845 if (arg == nullptr || arg->type != GRPC_ARG_POINTER) {
1846 return nullptr;
1847 }
1848 grpc_lb_addresses* addresses =
1849 static_cast<grpc_lb_addresses*>(arg->value.pointer.p);
1850 size_t num_grpclb_addrs = 0;
1851 for (size_t i = 0; i < addresses->num_addresses; ++i) {
1852 if (addresses->addresses[i].is_balancer) ++num_grpclb_addrs;
1853 }
1854 if (num_grpclb_addrs == 0) return nullptr;
1855 return OrphanablePtr<LoadBalancingPolicy>(New<GrpcLb>(addresses, args));
1856 }
1857
name() const1858 const char* name() const override { return "grpclb"; }
1859 };
1860
1861 } // namespace
1862
1863 } // namespace grpc_core
1864
1865 //
1866 // Plugin registration
1867 //
1868
1869 namespace {
1870
1871 // Only add client_load_reporting filter if the grpclb LB policy is used.
maybe_add_client_load_reporting_filter(grpc_channel_stack_builder * builder,void * arg)1872 bool maybe_add_client_load_reporting_filter(grpc_channel_stack_builder* builder,
1873 void* arg) {
1874 const grpc_channel_args* args =
1875 grpc_channel_stack_builder_get_channel_arguments(builder);
1876 const grpc_arg* channel_arg =
1877 grpc_channel_args_find(args, GRPC_ARG_LB_POLICY_NAME);
1878 if (channel_arg != nullptr && channel_arg->type == GRPC_ARG_STRING &&
1879 strcmp(channel_arg->value.string, "grpclb") == 0) {
1880 return grpc_channel_stack_builder_append_filter(
1881 builder, (const grpc_channel_filter*)arg, nullptr, nullptr);
1882 }
1883 return true;
1884 }
1885
1886 } // namespace
1887
grpc_lb_policy_grpclb_init()1888 void grpc_lb_policy_grpclb_init() {
1889 grpc_core::LoadBalancingPolicyRegistry::Builder::
1890 RegisterLoadBalancingPolicyFactory(
1891 grpc_core::UniquePtr<grpc_core::LoadBalancingPolicyFactory>(
1892 grpc_core::New<grpc_core::GrpcLbFactory>()));
1893 grpc_channel_init_register_stage(GRPC_CLIENT_SUBCHANNEL,
1894 GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
1895 maybe_add_client_load_reporting_filter,
1896 (void*)&grpc_client_load_reporting_filter);
1897 }
1898
// Plugin shutdown hook for grpclb; it intentionally performs no work.
void grpc_lb_policy_grpclb_shutdown() {
  // Nothing to do.
}
1900