1 //
2 // Copyright 2020 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 // Implementation of the Route Lookup Service (RLS) LB policy
18 //
19 // The policy queries a route lookup service for the name of the actual service
20 // to use. A child policy that recognizes the name as a field of its
21 // configuration will take further load balancing action on the request.
22
23 #include "src/core/load_balancing/rls/rls.h"
24
25 #include <grpc/byte_buffer.h>
26 #include <grpc/byte_buffer_reader.h>
27 #include <grpc/event_engine/event_engine.h>
28 #include <grpc/grpc.h>
29 #include <grpc/impl/channel_arg_names.h>
30 #include <grpc/impl/connectivity_state.h>
31 #include <grpc/impl/propagation_bits.h>
32 #include <grpc/slice.h>
33 #include <grpc/status.h>
34 #include <grpc/support/json.h>
35 #include <grpc/support/port_platform.h>
36 #include <inttypes.h>
37 #include <stdlib.h>
38 #include <string.h>
39
40 #include <algorithm>
41 #include <deque>
42 #include <list>
43 #include <map>
44 #include <memory>
45 #include <random>
46 #include <set>
47 #include <string>
48 #include <type_traits>
49 #include <unordered_map>
50 #include <utility>
51 #include <vector>
52
53 #include "absl/base/thread_annotations.h"
54 #include "absl/hash/hash.h"
55 #include "absl/log/check.h"
56 #include "absl/log/log.h"
57 #include "absl/random/random.h"
58 #include "absl/status/status.h"
59 #include "absl/status/statusor.h"
60 #include "absl/strings/str_cat.h"
61 #include "absl/strings/str_format.h"
62 #include "absl/strings/str_join.h"
63 #include "absl/strings/string_view.h"
64 #include "absl/types/optional.h"
65 #include "src/core/channelz/channelz.h"
66 #include "src/core/client_channel/client_channel_filter.h"
67 #include "src/core/config/core_configuration.h"
68 #include "src/core/lib/channel/channel_args.h"
69 #include "src/core/lib/debug/trace.h"
70 #include "src/core/lib/iomgr/closure.h"
71 #include "src/core/lib/iomgr/error.h"
72 #include "src/core/lib/iomgr/exec_ctx.h"
73 #include "src/core/lib/iomgr/pollset_set.h"
74 #include "src/core/lib/security/credentials/fake/fake_credentials.h"
75 #include "src/core/lib/slice/slice.h"
76 #include "src/core/lib/slice/slice_internal.h"
77 #include "src/core/lib/surface/call.h"
78 #include "src/core/lib/surface/channel.h"
79 #include "src/core/lib/transport/connectivity_state.h"
80 #include "src/core/lib/transport/error_utils.h"
81 #include "src/core/load_balancing/child_policy_handler.h"
82 #include "src/core/load_balancing/delegating_helper.h"
83 #include "src/core/load_balancing/lb_policy.h"
84 #include "src/core/load_balancing/lb_policy_factory.h"
85 #include "src/core/load_balancing/lb_policy_registry.h"
86 #include "src/core/resolver/endpoint_addresses.h"
87 #include "src/core/resolver/resolver_registry.h"
88 #include "src/core/service_config/service_config_impl.h"
89 #include "src/core/telemetry/metrics.h"
90 #include "src/core/util/backoff.h"
91 #include "src/core/util/debug_location.h"
92 #include "src/core/util/dual_ref_counted.h"
93 #include "src/core/util/json/json.h"
94 #include "src/core/util/json/json_args.h"
95 #include "src/core/util/json/json_object_loader.h"
96 #include "src/core/util/json/json_writer.h"
97 #include "src/core/util/match.h"
98 #include "src/core/util/orphanable.h"
99 #include "src/core/util/ref_counted_ptr.h"
100 #include "src/core/util/status_helper.h"
101 #include "src/core/util/sync.h"
102 #include "src/core/util/time.h"
103 #include "src/core/util/upb_utils.h"
104 #include "src/core/util/uuid_v4.h"
105 #include "src/core/util/validation_errors.h"
106 #include "src/core/util/work_serializer.h"
107 #include "src/proto/grpc/lookup/v1/rls.upb.h"
108 #include "upb/base/string_view.h"
109 #include "upb/mem/arena.hpp"
110
111 using ::grpc_event_engine::experimental::EventEngine;
112
113 namespace grpc_core {
114
115 namespace {
116
117 constexpr absl::string_view kMetricLabelRlsServerTarget =
118 "grpc.lb.rls.server_target";
119 constexpr absl::string_view kMetricLabelRlsInstanceUuid =
120 "grpc.lb.rls.instance_uuid";
121 constexpr absl::string_view kMetricRlsDataPlaneTarget =
122 "grpc.lb.rls.data_plane_target";
123 constexpr absl::string_view kMetricLabelPickResult = "grpc.lb.pick_result";
124
125 const auto kMetricCacheSize =
126 GlobalInstrumentsRegistry::RegisterCallbackInt64Gauge(
127 "grpc.lb.rls.cache_size", "EXPERIMENTAL. Size of the RLS cache.", "By",
128 false)
129 .Labels(kMetricLabelTarget, kMetricLabelRlsServerTarget,
130 kMetricLabelRlsInstanceUuid)
131 .Build();
132
133 const auto kMetricCacheEntries =
134 GlobalInstrumentsRegistry::RegisterCallbackInt64Gauge(
135 "grpc.lb.rls.cache_entries",
136 "EXPERIMENTAL. Number of entries in the RLS cache.", "{entry}", false)
137 .Labels(kMetricLabelTarget, kMetricLabelRlsServerTarget,
138 kMetricLabelRlsInstanceUuid)
139 .Build();
140
141 const auto kMetricDefaultTargetPicks =
142 GlobalInstrumentsRegistry::RegisterUInt64Counter(
143 "grpc.lb.rls.default_target_picks",
144 "EXPERIMENTAL. Number of LB picks sent to the default target.",
145 "{pick}", false)
146 .Labels(kMetricLabelTarget, kMetricLabelRlsServerTarget,
147 kMetricRlsDataPlaneTarget, kMetricLabelPickResult)
148 .Build();
149
150 const auto kMetricTargetPicks =
151 GlobalInstrumentsRegistry::RegisterUInt64Counter(
152 "grpc.lb.rls.target_picks",
153 "EXPERIMENTAL. Number of LB picks sent to each RLS target. Note that "
154 "if the default target is also returned by the RLS server, RPCs sent "
155 "to that target from the cache will be counted in this metric, not "
156 "in grpc.rls.default_target_picks.",
157 "{pick}", false)
158 .Labels(kMetricLabelTarget, kMetricLabelRlsServerTarget,
159 kMetricRlsDataPlaneTarget, kMetricLabelPickResult)
160 .Build();
161
162 const auto kMetricFailedPicks =
163 GlobalInstrumentsRegistry::RegisterUInt64Counter(
164 "grpc.lb.rls.failed_picks",
165 "EXPERIMENTAL. Number of LB picks failed due to either a failed RLS "
166 "request or the RLS channel being throttled.",
167 "{pick}", false)
168 .Labels(kMetricLabelTarget, kMetricLabelRlsServerTarget)
169 .Build();
170
171 constexpr absl::string_view kRls = "rls_experimental";
172 const char kGrpc[] = "grpc";
173 const char* kRlsRequestPath = "/grpc.lookup.v1.RouteLookupService/RouteLookup";
174 const char* kFakeTargetFieldValue = "fake_target_field_value";
175 const char* kRlsHeaderKey = "x-google-rls-data";
176
177 const Duration kDefaultLookupServiceTimeout = Duration::Seconds(10);
178 const Duration kMaxMaxAge = Duration::Minutes(5);
179 const Duration kMinExpirationTime = Duration::Seconds(5);
180 const Duration kCacheBackoffInitial = Duration::Seconds(1);
181 const double kCacheBackoffMultiplier = 1.6;
182 const double kCacheBackoffJitter = 0.2;
183 const Duration kCacheBackoffMax = Duration::Minutes(2);
184 const Duration kDefaultThrottleWindowSize = Duration::Seconds(30);
185 const double kDefaultThrottleRatioForSuccesses = 2.0;
186 const int kDefaultThrottlePadding = 8;
187 const Duration kCacheCleanupTimerInterval = Duration::Minutes(1);
188 const int64_t kMaxCacheSizeBytes = 5 * 1024 * 1024;
189
190 // Parsed RLS LB policy configuration.
191 class RlsLbConfig final : public LoadBalancingPolicy::Config {
192 public:
193 struct KeyBuilder {
194 std::map<std::string /*key*/, std::vector<std::string /*header*/>>
195 header_keys;
196 std::string host_key;
197 std::string service_key;
198 std::string method_key;
199 std::map<std::string /*key*/, std::string /*value*/> constant_keys;
200 };
201 using KeyBuilderMap = std::unordered_map<std::string /*path*/, KeyBuilder>;
202
203 struct RouteLookupConfig {
204 KeyBuilderMap key_builder_map;
205 std::string lookup_service;
206 Duration lookup_service_timeout = kDefaultLookupServiceTimeout;
207 Duration max_age = kMaxMaxAge;
208 Duration stale_age = kMaxMaxAge;
209 int64_t cache_size_bytes = 0;
210 std::string default_target;
211
212 static const JsonLoaderInterface* JsonLoader(const JsonArgs&);
213 void JsonPostLoad(const Json& json, const JsonArgs& args,
214 ValidationErrors* errors);
215 };
216
217 RlsLbConfig() = default;
218
219 RlsLbConfig(const RlsLbConfig&) = delete;
220 RlsLbConfig& operator=(const RlsLbConfig&) = delete;
221
222 RlsLbConfig(RlsLbConfig&& other) = delete;
223 RlsLbConfig& operator=(RlsLbConfig&& other) = delete;
224
name() const225 absl::string_view name() const override { return kRls; }
226
key_builder_map() const227 const KeyBuilderMap& key_builder_map() const {
228 return route_lookup_config_.key_builder_map;
229 }
lookup_service() const230 const std::string& lookup_service() const {
231 return route_lookup_config_.lookup_service;
232 }
lookup_service_timeout() const233 Duration lookup_service_timeout() const {
234 return route_lookup_config_.lookup_service_timeout;
235 }
max_age() const236 Duration max_age() const { return route_lookup_config_.max_age; }
stale_age() const237 Duration stale_age() const { return route_lookup_config_.stale_age; }
cache_size_bytes() const238 int64_t cache_size_bytes() const {
239 return route_lookup_config_.cache_size_bytes;
240 }
default_target() const241 const std::string& default_target() const {
242 return route_lookup_config_.default_target;
243 }
rls_channel_service_config() const244 const std::string& rls_channel_service_config() const {
245 return rls_channel_service_config_;
246 }
child_policy_config() const247 const Json& child_policy_config() const { return child_policy_config_; }
child_policy_config_target_field_name() const248 const std::string& child_policy_config_target_field_name() const {
249 return child_policy_config_target_field_name_;
250 }
251 RefCountedPtr<LoadBalancingPolicy::Config>
default_child_policy_parsed_config() const252 default_child_policy_parsed_config() const {
253 return default_child_policy_parsed_config_;
254 }
255
256 static const JsonLoaderInterface* JsonLoader(const JsonArgs&);
257 void JsonPostLoad(const Json& json, const JsonArgs&,
258 ValidationErrors* errors);
259
260 private:
261 RouteLookupConfig route_lookup_config_;
262 std::string rls_channel_service_config_;
263 Json child_policy_config_;
264 std::string child_policy_config_target_field_name_;
265 RefCountedPtr<LoadBalancingPolicy::Config>
266 default_child_policy_parsed_config_;
267 };
268
269 // RLS LB policy.
270 class RlsLb final : public LoadBalancingPolicy {
271 public:
272 explicit RlsLb(Args args);
273
name() const274 absl::string_view name() const override { return kRls; }
275 absl::Status UpdateLocked(UpdateArgs args) override;
276 void ExitIdleLocked() override;
277 void ResetBackoffLocked() override;
278
279 private:
280 // Key to access entries in the cache and the request map.
281 struct RequestKey {
282 std::map<std::string, std::string> key_map;
283
operator ==grpc_core::__anon2f58d5c00111::RlsLb::RequestKey284 bool operator==(const RequestKey& rhs) const {
285 return key_map == rhs.key_map;
286 }
287
288 template <typename H>
AbslHashValue(H h,const RequestKey & key)289 friend H AbslHashValue(H h, const RequestKey& key) {
290 std::hash<std::string> string_hasher;
291 for (auto& kv : key.key_map) {
292 h = H::combine(std::move(h), string_hasher(kv.first),
293 string_hasher(kv.second));
294 }
295 return h;
296 }
297
Sizegrpc_core::__anon2f58d5c00111::RlsLb::RequestKey298 size_t Size() const {
299 size_t size = sizeof(RequestKey);
300 for (auto& kv : key_map) {
301 size += kv.first.length() + kv.second.length();
302 }
303 return size;
304 }
305
ToStringgrpc_core::__anon2f58d5c00111::RlsLb::RequestKey306 std::string ToString() const {
307 return absl::StrCat(
308 "{", absl::StrJoin(key_map, ",", absl::PairFormatter("=")), "}");
309 }
310 };
311
312 // Data from an RLS response.
313 struct ResponseInfo {
314 absl::Status status;
315 std::vector<std::string> targets;
316 grpc_event_engine::experimental::Slice header_data;
317
ToStringgrpc_core::__anon2f58d5c00111::RlsLb::ResponseInfo318 std::string ToString() const {
319 return absl::StrFormat("{status=%s, targets=[%s], header_data=\"%s\"}",
320 status.ToString(), absl::StrJoin(targets, ","),
321 header_data.as_string_view());
322 }
323 };
324
325 // Wraps a child policy for a given RLS target.
326 class ChildPolicyWrapper final : public DualRefCounted<ChildPolicyWrapper> {
327 public:
328 ChildPolicyWrapper(RefCountedPtr<RlsLb> lb_policy, std::string target);
329
target() const330 const std::string& target() const { return target_; }
331
Pick(PickArgs args)332 PickResult Pick(PickArgs args) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_) {
333 return picker_->Pick(args);
334 }
335
336 // Updates for the child policy are handled in two phases:
337 // 1. In StartUpdate(), we parse and validate the new child policy
338 // config and store the parsed config.
339 // 2. In MaybeFinishUpdate(), we actually pass the parsed config to the
340 // child policy's UpdateLocked() method.
341 //
342 // The reason we do this is to avoid deadlocks. In StartUpdate(),
343 // if the new config fails to validate, then we need to set
344 // picker_ to an instance that will fail all requests, which
345 // requires holding the lock. However, we cannot call the child
346 // policy's UpdateLocked() method from MaybeFinishUpdate() while
347 // holding the lock, since that would cause a deadlock: the child's
348 // UpdateLocked() will call the helper's UpdateState() method, which
349 // will try to acquire the lock to set picker_. So StartUpdate() is
350 // called while we are still holding the lock, but MaybeFinishUpdate()
351 // is called after releasing it.
352 //
353 // Both methods grab the data they need from the parent object.
354 void StartUpdate(OrphanablePtr<ChildPolicyHandler>* child_policy_to_delete)
355 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
356 absl::Status MaybeFinishUpdate() ABSL_LOCKS_EXCLUDED(&RlsLb::mu_);
357
ExitIdleLocked()358 void ExitIdleLocked() {
359 if (child_policy_ != nullptr) child_policy_->ExitIdleLocked();
360 }
361
ResetBackoffLocked()362 void ResetBackoffLocked() {
363 if (child_policy_ != nullptr) child_policy_->ResetBackoffLocked();
364 }
365
366 // Gets the connectivity state of the child policy. Once the child policy
367 // reports TRANSIENT_FAILURE, the function will always return
368 // TRANSIENT_FAILURE state instead of the actual state of the child policy
369 // until the child policy reports another READY state.
connectivity_state() const370 grpc_connectivity_state connectivity_state() const
371 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_) {
372 return connectivity_state_;
373 }
374
375 private:
376 // ChannelControlHelper object that allows the child policy to update state
377 // with the wrapper.
378 class ChildPolicyHelper final : public DelegatingChannelControlHelper {
379 public:
ChildPolicyHelper(WeakRefCountedPtr<ChildPolicyWrapper> wrapper)380 explicit ChildPolicyHelper(WeakRefCountedPtr<ChildPolicyWrapper> wrapper)
381 : wrapper_(std::move(wrapper)) {}
~ChildPolicyHelper()382 ~ChildPolicyHelper() override {
383 wrapper_.reset(DEBUG_LOCATION, "ChildPolicyHelper");
384 }
385
386 void UpdateState(grpc_connectivity_state state,
387 const absl::Status& status,
388 RefCountedPtr<SubchannelPicker> picker) override;
389
390 private:
parent_helper() const391 ChannelControlHelper* parent_helper() const override {
392 return wrapper_->lb_policy_->channel_control_helper();
393 }
394
395 WeakRefCountedPtr<ChildPolicyWrapper> wrapper_;
396 };
397
398 // Note: We are forced to disable lock analysis here because
399 // Orphaned() is called by Unref() which is called by RefCountedPtr<>, which
400 // cannot have lock annotations for this particular caller.
401 void Orphaned() override ABSL_NO_THREAD_SAFETY_ANALYSIS;
402
403 RefCountedPtr<RlsLb> lb_policy_;
404 std::string target_;
405
406 bool is_shutdown_ = false; // Protected by WorkSerializer
407
408 OrphanablePtr<ChildPolicyHandler> child_policy_;
409 RefCountedPtr<LoadBalancingPolicy::Config> pending_config_;
410
411 grpc_connectivity_state connectivity_state_ ABSL_GUARDED_BY(&RlsLb::mu_) =
412 GRPC_CHANNEL_CONNECTING;
413 RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker_
414 ABSL_GUARDED_BY(&RlsLb::mu_);
415 };
416
417 // A picker that uses the cache and the request map in the LB policy
418 // (synchronized via a mutex) to determine how to route requests.
419 class Picker final : public LoadBalancingPolicy::SubchannelPicker {
420 public:
421 explicit Picker(RefCountedPtr<RlsLb> lb_policy);
422
423 PickResult Pick(PickArgs args) override;
424
425 private:
426 PickResult PickFromDefaultTargetOrFail(const char* reason, PickArgs args,
427 absl::Status status)
428 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
429
430 RefCountedPtr<RlsLb> lb_policy_;
431 RefCountedPtr<RlsLbConfig> config_;
432 RefCountedPtr<ChildPolicyWrapper> default_child_policy_;
433 };
434
435 // An LRU cache with adjustable size.
436 class Cache final {
437 public:
438 using Iterator = std::list<RequestKey>::iterator;
439
440 class Entry final : public InternallyRefCounted<Entry> {
441 public:
442 Entry(RefCountedPtr<RlsLb> lb_policy, const RequestKey& key);
443
444 // Notify the entry when it's evicted from the cache. Performs shut down.
445 // Note: We are forced to disable lock analysis here because
446 // Orphan() is called by OrphanablePtr<>, which cannot have lock
447 // annotations for this particular caller.
448 void Orphan() override ABSL_NO_THREAD_SAFETY_ANALYSIS;
449
status() const450 const absl::Status& status() const
451 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_) {
452 return status_;
453 }
backoff_time() const454 Timestamp backoff_time() const
455 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_) {
456 return backoff_time_;
457 }
backoff_expiration_time() const458 Timestamp backoff_expiration_time() const
459 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_) {
460 return backoff_expiration_time_;
461 }
data_expiration_time() const462 Timestamp data_expiration_time() const
463 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_) {
464 return data_expiration_time_;
465 }
header_data() const466 const grpc_event_engine::experimental::Slice& header_data() const
467 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_) {
468 return header_data_;
469 }
stale_time() const470 Timestamp stale_time() const ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_) {
471 return stale_time_;
472 }
min_expiration_time() const473 Timestamp min_expiration_time() const
474 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_) {
475 return min_expiration_time_;
476 }
477
TakeBackoffState()478 std::unique_ptr<BackOff> TakeBackoffState()
479 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_) {
480 return std::move(backoff_state_);
481 }
482
483 // Cache size of entry.
484 size_t Size() const ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
485
486 // Pick subchannel for request based on the entry's state.
487 PickResult Pick(PickArgs args) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
488
489 // If the cache entry is in backoff state, resets the backoff and, if
490 // applicable, its backoff timer. The method does not update the LB
491 // policy's picker; the caller is responsible for that if necessary.
492 void ResetBackoff() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
493
494 // Check if the entry should be removed by the clean-up timer.
495 bool ShouldRemove() const ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
496
497 // Check if the entry can be evicted from the cache, i.e. the
498 // min_expiration_time_ has passed.
499 bool CanEvict() const ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
500
501 // Updates the entry upon reception of a new RLS response.
502 // Returns a list of child policy wrappers on which FinishUpdate()
503 // needs to be called after releasing the lock.
504 std::vector<ChildPolicyWrapper*> OnRlsResponseLocked(
505 ResponseInfo response, std::unique_ptr<BackOff> backoff_state,
506 OrphanablePtr<ChildPolicyHandler>* child_policy_to_delete)
507 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
508
509 // Moves entry to the end of the LRU list.
510 void MarkUsed() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
511
512 // Takes entries from child_policy_wrappers_ and appends them to the end
513 // of \a child_policy_wrappers.
TakeChildPolicyWrappers(std::vector<RefCountedPtr<ChildPolicyWrapper>> * child_policy_wrappers)514 void TakeChildPolicyWrappers(
515 std::vector<RefCountedPtr<ChildPolicyWrapper>>* child_policy_wrappers)
516 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_) {
517 child_policy_wrappers->insert(
518 child_policy_wrappers->end(),
519 std::make_move_iterator(child_policy_wrappers_.begin()),
520 std::make_move_iterator(child_policy_wrappers_.end()));
521 child_policy_wrappers_.clear();
522 }
523
524 private:
525 class BackoffTimer final : public InternallyRefCounted<BackoffTimer> {
526 public:
527 BackoffTimer(RefCountedPtr<Entry> entry, Duration delay);
528
529 // Note: We are forced to disable lock analysis here because
530 // Orphan() is called by OrphanablePtr<>, which cannot have lock
531 // annotations for this particular caller.
532 void Orphan() override ABSL_NO_THREAD_SAFETY_ANALYSIS;
533
534 private:
535 void OnBackoffTimerLocked();
536
537 RefCountedPtr<Entry> entry_;
538 absl::optional<EventEngine::TaskHandle> backoff_timer_task_handle_
539 ABSL_GUARDED_BY(&RlsLb::mu_);
540 };
541
542 RefCountedPtr<RlsLb> lb_policy_;
543
544 bool is_shutdown_ ABSL_GUARDED_BY(&RlsLb::mu_) = false;
545
546 // Backoff states
547 absl::Status status_ ABSL_GUARDED_BY(&RlsLb::mu_);
548 std::unique_ptr<BackOff> backoff_state_ ABSL_GUARDED_BY(&RlsLb::mu_);
549 Timestamp backoff_time_ ABSL_GUARDED_BY(&RlsLb::mu_) =
550 Timestamp::InfPast();
551 Timestamp backoff_expiration_time_ ABSL_GUARDED_BY(&RlsLb::mu_) =
552 Timestamp::InfPast();
553 OrphanablePtr<BackoffTimer> backoff_timer_;
554
555 // RLS response states
556 std::vector<RefCountedPtr<ChildPolicyWrapper>> child_policy_wrappers_
557 ABSL_GUARDED_BY(&RlsLb::mu_);
558 grpc_event_engine::experimental::Slice header_data_
559 ABSL_GUARDED_BY(&RlsLb::mu_);
560 Timestamp data_expiration_time_ ABSL_GUARDED_BY(&RlsLb::mu_) =
561 Timestamp::InfPast();
562 Timestamp stale_time_ ABSL_GUARDED_BY(&RlsLb::mu_) = Timestamp::InfPast();
563
564 Timestamp min_expiration_time_ ABSL_GUARDED_BY(&RlsLb::mu_);
565 Cache::Iterator lru_iterator_ ABSL_GUARDED_BY(&RlsLb::mu_);
566 };
567
568 explicit Cache(RlsLb* lb_policy);
569
570 // Finds an entry from the cache that corresponds to a key. If an entry is
571 // not found, nullptr is returned. Otherwise, the entry is considered
572 // recently used and its order in the LRU list of the cache is updated.
573 Entry* Find(const RequestKey& key)
574 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
575
576 // Finds an entry from the cache that corresponds to a key. If an entry is
577 // not found, an entry is created, inserted in the cache, and returned to
578 // the caller. Otherwise, the entry found is returned to the caller. The
579 // entry returned to the user is considered recently used and its order in
580 // the LRU list of the cache is updated.
581 Entry* FindOrInsert(const RequestKey& key,
582 std::vector<RefCountedPtr<ChildPolicyWrapper>>*
583 child_policy_wrappers_to_delete)
584 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
585
586 // Resizes the cache. If the new cache size is greater than the current size
587 // of the cache, do nothing. Otherwise, evict the oldest entries that
588 // exceed the new size limit of the cache.
589 void Resize(size_t bytes, std::vector<RefCountedPtr<ChildPolicyWrapper>>*
590 child_policy_wrappers_to_delete)
591 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
592
593 // Resets backoff of all the cache entries.
594 void ResetAllBackoff() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
595
596 // Shutdown the cache; clean-up and orphan all the stored cache entries.
597 GRPC_MUST_USE_RESULT std::vector<RefCountedPtr<ChildPolicyWrapper>>
598 Shutdown() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
599
600 void ReportMetricsLocked(CallbackMetricReporter& reporter)
601 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
602
603 private:
604 // Shared logic for starting the cleanup timer
605 void StartCleanupTimer() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
606
607 void OnCleanupTimer();
608
609 // Returns the entry size for a given key.
610 static size_t EntrySizeForKey(const RequestKey& key);
611
612 // Evicts oversized cache elements when the current size is greater than
613 // the specified limit.
614 void MaybeShrinkSize(size_t bytes,
615 std::vector<RefCountedPtr<ChildPolicyWrapper>>*
616 child_policy_wrappers_to_delete)
617 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
618
619 RlsLb* lb_policy_;
620
621 size_t size_limit_ ABSL_GUARDED_BY(&RlsLb::mu_) = 0;
622 size_t size_ ABSL_GUARDED_BY(&RlsLb::mu_) = 0;
623
624 std::list<RequestKey> lru_list_ ABSL_GUARDED_BY(&RlsLb::mu_);
625 std::unordered_map<RequestKey, OrphanablePtr<Entry>, absl::Hash<RequestKey>>
626 map_ ABSL_GUARDED_BY(&RlsLb::mu_);
627 absl::optional<EventEngine::TaskHandle> cleanup_timer_handle_;
628 };
629
630 // Channel for communicating with the RLS server.
631 // Contains throttling logic for RLS requests.
632 class RlsChannel final : public InternallyRefCounted<RlsChannel> {
633 public:
634 explicit RlsChannel(RefCountedPtr<RlsLb> lb_policy);
635
636 // Shuts down the channel.
637 void Orphan() override;
638
639 // Starts an RLS call.
640 // If stale_entry is non-null, it points to the entry containing
641 // stale data for the key.
642 void StartRlsCall(const RequestKey& key, Cache::Entry* stale_entry)
643 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
644
645 // Reports the result of an RLS call to the throttle.
646 void ReportResponseLocked(bool response_succeeded)
647 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
648
649 // Checks if a proposed RLS call should be throttled.
ShouldThrottle()650 bool ShouldThrottle() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_) {
651 return throttle_.ShouldThrottle();
652 }
653
654 // Resets the channel's backoff.
655 void ResetBackoff();
656
channel() const657 Channel* channel() const { return channel_.get(); }
658
659 private:
660 // Watches the state of the RLS channel. Notifies the LB policy when
661 // the channel was previously in TRANSIENT_FAILURE and then becomes READY.
662 class StateWatcher final : public AsyncConnectivityStateWatcherInterface {
663 public:
StateWatcher(RefCountedPtr<RlsChannel> rls_channel)664 explicit StateWatcher(RefCountedPtr<RlsChannel> rls_channel)
665 : AsyncConnectivityStateWatcherInterface(
666 rls_channel->lb_policy_->work_serializer()),
667 rls_channel_(std::move(rls_channel)) {}
668
669 private:
670 void OnConnectivityStateChange(grpc_connectivity_state new_state,
671 const absl::Status& status) override;
672
673 RefCountedPtr<RlsChannel> rls_channel_;
674 bool was_transient_failure_ = false;
675 };
676
677 // Throttle state for RLS requests.
678 class Throttle final {
679 public:
Throttle(Duration window_size=kDefaultThrottleWindowSize,float ratio_for_successes=kDefaultThrottleRatioForSuccesses,int padding=kDefaultThrottlePadding)680 explicit Throttle(
681 Duration window_size = kDefaultThrottleWindowSize,
682 float ratio_for_successes = kDefaultThrottleRatioForSuccesses,
683 int padding = kDefaultThrottlePadding)
684 : window_size_(window_size),
685 ratio_for_successes_(ratio_for_successes),
686 padding_(padding) {}
687
688 bool ShouldThrottle() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
689
690 void RegisterResponse(bool success)
691 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
692
693 private:
694 Duration window_size_;
695 double ratio_for_successes_;
696 int padding_;
697 std::mt19937 rng_{std::random_device()()};
698
699 // Logged timestamp of requests.
700 std::deque<Timestamp> requests_ ABSL_GUARDED_BY(&RlsLb::mu_);
701
702 // Logged timestamps of failures.
703 std::deque<Timestamp> failures_ ABSL_GUARDED_BY(&RlsLb::mu_);
704 };
705
706 RefCountedPtr<RlsLb> lb_policy_;
707 bool is_shutdown_ = false;
708
709 RefCountedPtr<Channel> channel_;
710 RefCountedPtr<channelz::ChannelNode> parent_channelz_node_;
711 StateWatcher* watcher_ = nullptr;
712 Throttle throttle_ ABSL_GUARDED_BY(&RlsLb::mu_);
713 };
714
715 // A pending RLS request. Instances will be tracked in request_map_.
716 class RlsRequest final : public InternallyRefCounted<RlsRequest> {
717 public:
718 // Asynchronously starts a call on rls_channel for key.
719 // Stores backoff_state, which will be transferred to the data cache
720 // if the RLS request fails.
721 RlsRequest(RefCountedPtr<RlsLb> lb_policy, RlsLb::RequestKey key,
722 RefCountedPtr<RlsChannel> rls_channel,
723 std::unique_ptr<BackOff> backoff_state,
724 grpc_lookup_v1_RouteLookupRequest_Reason reason,
725 grpc_event_engine::experimental::Slice stale_header_data);
726 ~RlsRequest() override;
727
728 // Shuts down the request. If the request is still in flight, it is
729 // cancelled, in which case no response will be added to the cache.
730 void Orphan() override;
731
732 private:
733 // Callback to be invoked to start the call.
734 static void StartCall(void* arg, grpc_error_handle error);
735
736 // Helper for StartCall() that runs within the WorkSerializer.
737 void StartCallLocked();
738
739 // Callback to be invoked when the call is completed.
740 static void OnRlsCallComplete(void* arg, grpc_error_handle error);
741
742 // Call completion callback running on LB policy WorkSerializer.
743 void OnRlsCallCompleteLocked(grpc_error_handle error);
744
745 grpc_byte_buffer* MakeRequestProto();
746 ResponseInfo ParseResponseProto();
747
748 RefCountedPtr<RlsLb> lb_policy_;
749 RlsLb::RequestKey key_;
750 RefCountedPtr<RlsChannel> rls_channel_;
751 std::unique_ptr<BackOff> backoff_state_;
752 grpc_lookup_v1_RouteLookupRequest_Reason reason_;
753 grpc_event_engine::experimental::Slice stale_header_data_;
754
755 // RLS call state.
756 Timestamp deadline_;
757 grpc_closure call_start_cb_;
758 grpc_closure call_complete_cb_;
759 grpc_call* call_ = nullptr;
760 grpc_byte_buffer* send_message_ = nullptr;
761 grpc_metadata_array recv_initial_metadata_;
762 grpc_byte_buffer* recv_message_ = nullptr;
763 grpc_metadata_array recv_trailing_metadata_;
764 grpc_status_code status_recv_;
765 grpc_slice status_details_recv_;
766 };
767
768 void ShutdownLocked() override;
769
770 // Returns a new picker to the channel to trigger reprocessing of
771 // pending picks. Schedules the actual picker update on the ExecCtx
772 // to be run later, so it's safe to invoke this while holding the lock.
773 void UpdatePickerAsync();
774 // Hops into work serializer and calls UpdatePickerLocked().
775 static void UpdatePickerCallback(void* arg, grpc_error_handle error);
776 // Updates the picker in the work serializer.
777 void UpdatePickerLocked() ABSL_LOCKS_EXCLUDED(&mu_);
778
779 template <typename HandleType>
780 void MaybeExportPickCount(HandleType handle, absl::string_view target,
781 const PickResult& pick_result);
782
783 const std::string instance_uuid_;
784
785 // Mutex to guard LB policy state that is accessed by the picker.
786 Mutex mu_;
787 bool is_shutdown_ ABSL_GUARDED_BY(mu_) = false;
788 bool update_in_progress_ = false;
789 Cache cache_ ABSL_GUARDED_BY(mu_);
790 // Maps an RLS request key to an RlsRequest object that represents a pending
791 // RLS request.
792 std::unordered_map<RequestKey, OrphanablePtr<RlsRequest>,
793 absl::Hash<RequestKey>>
794 request_map_ ABSL_GUARDED_BY(mu_);
795 // The channel on which RLS requests are sent.
796 // Note that this channel may be swapped out when the RLS policy gets
797 // an update. However, when that happens, any existing entries in
798 // request_map_ will continue to use the previous channel.
799 OrphanablePtr<RlsChannel> rls_channel_ ABSL_GUARDED_BY(mu_);
800
801 // Accessed only from within WorkSerializer.
802 absl::StatusOr<std::shared_ptr<EndpointAddressesIterator>> addresses_;
803 ChannelArgs channel_args_;
804 RefCountedPtr<RlsLbConfig> config_;
805 RefCountedPtr<ChildPolicyWrapper> default_child_policy_;
806 std::map<std::string /*target*/, ChildPolicyWrapper*> child_policy_map_;
807
808 // Must be after mu_, so that it is destroyed before mu_.
809 std::unique_ptr<RegisteredMetricCallback> registered_metric_callback_;
810 };
811
812 //
813 // RlsLb::ChildPolicyWrapper
814 //
815
ChildPolicyWrapper(RefCountedPtr<RlsLb> lb_policy,std::string target)816 RlsLb::ChildPolicyWrapper::ChildPolicyWrapper(RefCountedPtr<RlsLb> lb_policy,
817 std::string target)
818 : DualRefCounted<ChildPolicyWrapper>(
819 GRPC_TRACE_FLAG_ENABLED(rls_lb) ? "ChildPolicyWrapper" : nullptr),
820 lb_policy_(std::move(lb_policy)),
821 target_(std::move(target)),
822 picker_(MakeRefCounted<QueuePicker>(nullptr)) {
823 lb_policy_->child_policy_map_.emplace(target_, this);
824 }
825
Orphaned()826 void RlsLb::ChildPolicyWrapper::Orphaned() {
827 GRPC_TRACE_LOG(rls_lb, INFO)
828 << "[rlslb " << lb_policy_.get() << "] ChildPolicyWrapper=" << this
829 << " [" << target_ << "]: shutdown";
830 is_shutdown_ = true;
831 lb_policy_->child_policy_map_.erase(target_);
832 if (child_policy_ != nullptr) {
833 grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(),
834 lb_policy_->interested_parties());
835 child_policy_.reset();
836 }
837 picker_.reset();
838 }
839
InsertOrUpdateChildPolicyField(const std::string & field,const std::string & value,const Json & config,ValidationErrors * errors)840 absl::optional<Json> InsertOrUpdateChildPolicyField(const std::string& field,
841 const std::string& value,
842 const Json& config,
843 ValidationErrors* errors) {
844 if (config.type() != Json::Type::kArray) {
845 errors->AddError("is not an array");
846 return absl::nullopt;
847 }
848 const size_t original_num_errors = errors->size();
849 Json::Array array;
850 for (size_t i = 0; i < config.array().size(); ++i) {
851 const Json& child_json = config.array()[i];
852 ValidationErrors::ScopedField json_field(errors, absl::StrCat("[", i, "]"));
853 if (child_json.type() != Json::Type::kObject) {
854 errors->AddError("is not an object");
855 } else {
856 const Json::Object& child = child_json.object();
857 if (child.size() != 1) {
858 errors->AddError("child policy object contains more than one field");
859 } else {
860 const std::string& child_name = child.begin()->first;
861 ValidationErrors::ScopedField json_field(
862 errors, absl::StrCat("[\"", child_name, "\"]"));
863 const Json& child_config_json = child.begin()->second;
864 if (child_config_json.type() != Json::Type::kObject) {
865 errors->AddError("child policy config is not an object");
866 } else {
867 Json::Object child_config = child_config_json.object();
868 child_config[field] = Json::FromString(value);
869 array.emplace_back(Json::FromObject(
870 {{child_name, Json::FromObject(std::move(child_config))}}));
871 }
872 }
873 }
874 }
875 if (errors->size() != original_num_errors) return absl::nullopt;
876 return Json::FromArray(std::move(array));
877 }
878
StartUpdate(OrphanablePtr<ChildPolicyHandler> * child_policy_to_delete)879 void RlsLb::ChildPolicyWrapper::StartUpdate(
880 OrphanablePtr<ChildPolicyHandler>* child_policy_to_delete) {
881 ValidationErrors errors;
882 auto child_policy_config = InsertOrUpdateChildPolicyField(
883 lb_policy_->config_->child_policy_config_target_field_name(), target_,
884 lb_policy_->config_->child_policy_config(), &errors);
885 CHECK(child_policy_config.has_value());
886 GRPC_TRACE_LOG(rls_lb, INFO)
887 << "[rlslb " << lb_policy_.get() << "] ChildPolicyWrapper=" << this
888 << " [" << target_
889 << "]: validating update, config: " << JsonDump(*child_policy_config);
890 auto config =
891 CoreConfiguration::Get().lb_policy_registry().ParseLoadBalancingConfig(
892 *child_policy_config);
893 // Returned RLS target fails the validation.
894 if (!config.ok()) {
895 GRPC_TRACE_LOG(rls_lb, INFO)
896 << "[rlslb " << lb_policy_.get() << "] ChildPolicyWrapper=" << this
897 << " [" << target_ << "]: config failed to parse: " << config.status();
898 pending_config_.reset();
899 picker_ = MakeRefCounted<TransientFailurePicker>(
900 absl::UnavailableError(config.status().message()));
901 *child_policy_to_delete = std::move(child_policy_);
902 } else {
903 pending_config_ = std::move(*config);
904 }
905 }
906
MaybeFinishUpdate()907 absl::Status RlsLb::ChildPolicyWrapper::MaybeFinishUpdate() {
908 // If pending_config_ is not set, that means StartUpdate() failed, so
909 // there's nothing to do here.
910 if (pending_config_ == nullptr) return absl::OkStatus();
911 // If child policy doesn't yet exist, create it.
912 if (child_policy_ == nullptr) {
913 Args create_args;
914 create_args.work_serializer = lb_policy_->work_serializer();
915 create_args.channel_control_helper = std::make_unique<ChildPolicyHelper>(
916 WeakRef(DEBUG_LOCATION, "ChildPolicyHelper"));
917 create_args.args = lb_policy_->channel_args_;
918 child_policy_ = MakeOrphanable<ChildPolicyHandler>(std::move(create_args),
919 &rls_lb_trace);
920 GRPC_TRACE_LOG(rls_lb, INFO)
921 << "[rlslb " << lb_policy_.get() << "] ChildPolicyWrapper=" << this
922 << " [" << target_ << "], created new child policy handler "
923 << child_policy_.get();
924 grpc_pollset_set_add_pollset_set(child_policy_->interested_parties(),
925 lb_policy_->interested_parties());
926 }
927 // Send the child the updated config.
928 GRPC_TRACE_LOG(rls_lb, INFO)
929 << "[rlslb " << lb_policy_.get() << "] ChildPolicyWrapper=" << this
930 << " [" << target_ << "], updating child policy handler "
931 << child_policy_.get();
932 UpdateArgs update_args;
933 update_args.config = std::move(pending_config_);
934 update_args.addresses = lb_policy_->addresses_;
935 update_args.args = lb_policy_->channel_args_;
936 return child_policy_->UpdateLocked(std::move(update_args));
937 }
938
939 //
940 // RlsLb::ChildPolicyWrapper::ChildPolicyHelper
941 //
942
UpdateState(grpc_connectivity_state state,const absl::Status & status,RefCountedPtr<SubchannelPicker> picker)943 void RlsLb::ChildPolicyWrapper::ChildPolicyHelper::UpdateState(
944 grpc_connectivity_state state, const absl::Status& status,
945 RefCountedPtr<SubchannelPicker> picker) {
946 GRPC_TRACE_LOG(rls_lb, INFO)
947 << "[rlslb " << wrapper_->lb_policy_.get()
948 << "] ChildPolicyWrapper=" << wrapper_.get() << " [" << wrapper_->target_
949 << "] ChildPolicyHelper=" << this
950 << ": UpdateState(state=" << ConnectivityStateName(state)
951 << ", status=" << status << ", picker=" << picker.get() << ")";
952 if (wrapper_->is_shutdown_) return;
953 {
954 MutexLock lock(&wrapper_->lb_policy_->mu_);
955 // TODO(roth): It looks like this ignores subsequent TF updates that
956 // might change the status used to fail picks, which seems wrong.
957 if (wrapper_->connectivity_state_ == GRPC_CHANNEL_TRANSIENT_FAILURE &&
958 state != GRPC_CHANNEL_READY) {
959 return;
960 }
961 wrapper_->connectivity_state_ = state;
962 DCHECK(picker != nullptr);
963 if (picker != nullptr) {
964 // We want to unref the picker after we release the lock.
965 wrapper_->picker_.swap(picker);
966 }
967 }
968 wrapper_->lb_policy_->UpdatePickerLocked();
969 }
970
971 //
972 // RlsLb::Picker
973 //
974
975 // Builds the key to be used for a request based on path and initial_metadata.
BuildKeyMap(const RlsLbConfig::KeyBuilderMap & key_builder_map,absl::string_view path,absl::string_view host,const LoadBalancingPolicy::MetadataInterface * initial_metadata)976 std::map<std::string, std::string> BuildKeyMap(
977 const RlsLbConfig::KeyBuilderMap& key_builder_map, absl::string_view path,
978 absl::string_view host,
979 const LoadBalancingPolicy::MetadataInterface* initial_metadata) {
980 size_t last_slash_pos = path.npos; // May need this a few times, so cache it.
981 // Find key builder for this path.
982 auto it = key_builder_map.find(std::string(path));
983 if (it == key_builder_map.end()) {
984 // Didn't find exact match, try method wildcard.
985 last_slash_pos = path.rfind('/');
986 DCHECK(last_slash_pos != path.npos);
987 if (GPR_UNLIKELY(last_slash_pos == path.npos)) return {};
988 std::string service(path.substr(0, last_slash_pos + 1));
989 it = key_builder_map.find(service);
990 if (it == key_builder_map.end()) return {};
991 }
992 const RlsLbConfig::KeyBuilder* key_builder = &it->second;
993 // Construct key map using key builder.
994 std::map<std::string, std::string> key_map;
995 // Add header keys.
996 for (const auto& p : key_builder->header_keys) {
997 const std::string& key = p.first;
998 const std::vector<std::string>& header_names = p.second;
999 for (const std::string& header_name : header_names) {
1000 std::string buffer;
1001 absl::optional<absl::string_view> value =
1002 initial_metadata->Lookup(header_name, &buffer);
1003 if (value.has_value()) {
1004 key_map[key] = std::string(*value);
1005 break;
1006 }
1007 }
1008 }
1009 // Add constant keys.
1010 key_map.insert(key_builder->constant_keys.begin(),
1011 key_builder->constant_keys.end());
1012 // Add host key.
1013 if (!key_builder->host_key.empty()) {
1014 key_map[key_builder->host_key] = std::string(host);
1015 }
1016 // Add service key.
1017 if (!key_builder->service_key.empty()) {
1018 if (last_slash_pos == path.npos) {
1019 last_slash_pos = path.rfind('/');
1020 DCHECK(last_slash_pos != path.npos);
1021 if (GPR_UNLIKELY(last_slash_pos == path.npos)) return {};
1022 }
1023 key_map[key_builder->service_key] =
1024 std::string(path.substr(1, last_slash_pos - 1));
1025 }
1026 // Add method key.
1027 if (!key_builder->method_key.empty()) {
1028 if (last_slash_pos == path.npos) {
1029 last_slash_pos = path.rfind('/');
1030 DCHECK(last_slash_pos != path.npos);
1031 if (GPR_UNLIKELY(last_slash_pos == path.npos)) return {};
1032 }
1033 key_map[key_builder->method_key] =
1034 std::string(path.substr(last_slash_pos + 1));
1035 }
1036 return key_map;
1037 }
1038
Picker(RefCountedPtr<RlsLb> lb_policy)1039 RlsLb::Picker::Picker(RefCountedPtr<RlsLb> lb_policy)
1040 : lb_policy_(std::move(lb_policy)), config_(lb_policy_->config_) {
1041 if (lb_policy_->default_child_policy_ != nullptr) {
1042 default_child_policy_ =
1043 lb_policy_->default_child_policy_->Ref(DEBUG_LOCATION, "Picker");
1044 }
1045 }
1046
Pick(PickArgs args)1047 LoadBalancingPolicy::PickResult RlsLb::Picker::Pick(PickArgs args) {
1048 // Construct key for request.
1049 RequestKey key = {
1050 BuildKeyMap(config_->key_builder_map(), args.path,
1051 lb_policy_->channel_control_helper()->GetAuthority(),
1052 args.initial_metadata)};
1053 GRPC_TRACE_LOG(rls_lb, INFO)
1054 << "[rlslb " << lb_policy_.get() << "] picker=" << this
1055 << ": request keys: " << key.ToString();
1056 Timestamp now = Timestamp::Now();
1057 MutexLock lock(&lb_policy_->mu_);
1058 if (lb_policy_->is_shutdown_) {
1059 return PickResult::Fail(
1060 absl::UnavailableError("LB policy already shut down"));
1061 }
1062 // Check if there's a cache entry.
1063 Cache::Entry* entry = lb_policy_->cache_.Find(key);
1064 // If there is no cache entry, or if the cache entry is not in backoff
1065 // and has a stale time in the past, and there is not already a
1066 // pending RLS request for this key, then try to start a new RLS request.
1067 if ((entry == nullptr ||
1068 (entry->stale_time() < now && entry->backoff_time() < now)) &&
1069 lb_policy_->request_map_.find(key) == lb_policy_->request_map_.end()) {
1070 // Check if requests are being throttled.
1071 if (lb_policy_->rls_channel_->ShouldThrottle()) {
1072 // Request is throttled.
1073 // If there is no non-expired data in the cache, then we use the
1074 // default target if set, or else we fail the pick.
1075 if (entry == nullptr || entry->data_expiration_time() < now) {
1076 return PickFromDefaultTargetOrFail(
1077 "RLS call throttled", args,
1078 absl::UnavailableError("RLS request throttled"));
1079 }
1080 }
1081 // Start the RLS call.
1082 lb_policy_->rls_channel_->StartRlsCall(
1083 key, (entry == nullptr || entry->data_expiration_time() < now) ? nullptr
1084 : entry);
1085 }
1086 // If the cache entry exists, see if it has usable data.
1087 if (entry != nullptr) {
1088 // If the entry has non-expired data, use it.
1089 if (entry->data_expiration_time() >= now) {
1090 GRPC_TRACE_LOG(rls_lb, INFO)
1091 << "[rlslb " << lb_policy_.get() << "] picker=" << this
1092 << ": using cache entry " << entry;
1093 return entry->Pick(args);
1094 }
1095 // If the entry is in backoff, then use the default target if set,
1096 // or else fail the pick.
1097 if (entry->backoff_time() >= now) {
1098 return PickFromDefaultTargetOrFail(
1099 "RLS call in backoff", args,
1100 absl::UnavailableError(absl::StrCat("RLS request failed: ",
1101 entry->status().ToString())));
1102 }
1103 }
1104 // RLS call pending. Queue the pick.
1105 GRPC_TRACE_LOG(rls_lb, INFO)
1106 << "[rlslb " << lb_policy_.get() << "] picker=" << this
1107 << ": RLS request pending; queuing pick";
1108 return PickResult::Queue();
1109 }
1110
PickFromDefaultTargetOrFail(const char * reason,PickArgs args,absl::Status status)1111 LoadBalancingPolicy::PickResult RlsLb::Picker::PickFromDefaultTargetOrFail(
1112 const char* reason, PickArgs args, absl::Status status) {
1113 if (default_child_policy_ != nullptr) {
1114 GRPC_TRACE_LOG(rls_lb, INFO)
1115 << "[rlslb " << lb_policy_.get() << "] picker=" << this << ": "
1116 << reason << "; using default target";
1117 auto pick_result = default_child_policy_->Pick(args);
1118 lb_policy_->MaybeExportPickCount(kMetricDefaultTargetPicks,
1119 config_->default_target(), pick_result);
1120 return pick_result;
1121 }
1122 GRPC_TRACE_LOG(rls_lb, INFO)
1123 << "[rlslb " << lb_policy_.get() << "] picker=" << this << ": " << reason
1124 << "; failing pick";
1125 auto& stats_plugins =
1126 lb_policy_->channel_control_helper()->GetStatsPluginGroup();
1127 stats_plugins.AddCounter(kMetricFailedPicks, 1,
1128 {lb_policy_->channel_control_helper()->GetTarget(),
1129 config_->lookup_service()},
1130 {});
1131 return PickResult::Fail(std::move(status));
1132 }
1133
1134 //
1135 // RlsLb::Cache::Entry::BackoffTimer
1136 //
1137
BackoffTimer(RefCountedPtr<Entry> entry,Duration delay)1138 RlsLb::Cache::Entry::BackoffTimer::BackoffTimer(RefCountedPtr<Entry> entry,
1139 Duration delay)
1140 : entry_(std::move(entry)) {
1141 backoff_timer_task_handle_ =
1142 entry_->lb_policy_->channel_control_helper()->GetEventEngine()->RunAfter(
1143 delay, [self = Ref(DEBUG_LOCATION, "BackoffTimer")]() mutable {
1144 ApplicationCallbackExecCtx callback_exec_ctx;
1145 ExecCtx exec_ctx;
1146 auto self_ptr = self.get();
1147 self_ptr->entry_->lb_policy_->work_serializer()->Run(
1148 [self = std::move(self)]() { self->OnBackoffTimerLocked(); },
1149 DEBUG_LOCATION);
1150 });
1151 }
1152
Orphan()1153 void RlsLb::Cache::Entry::BackoffTimer::Orphan() {
1154 if (backoff_timer_task_handle_.has_value() &&
1155 entry_->lb_policy_->channel_control_helper()->GetEventEngine()->Cancel(
1156 *backoff_timer_task_handle_)) {
1157 GRPC_TRACE_LOG(rls_lb, INFO)
1158 << "[rlslb " << entry_->lb_policy_.get()
1159 << "] cache entry=" << entry_.get() << " "
1160 << (entry_->is_shutdown_ ? "(shut down)"
1161 : entry_->lru_iterator_->ToString())
1162 << ", backoff timer canceled";
1163 }
1164 backoff_timer_task_handle_.reset();
1165 Unref(DEBUG_LOCATION, "Orphan");
1166 }
1167
OnBackoffTimerLocked()1168 void RlsLb::Cache::Entry::BackoffTimer::OnBackoffTimerLocked() {
1169 {
1170 MutexLock lock(&entry_->lb_policy_->mu_);
1171 GRPC_TRACE_LOG(rls_lb, INFO)
1172 << "[rlslb " << entry_->lb_policy_.get()
1173 << "] cache entry=" << entry_.get() << " "
1174 << (entry_->is_shutdown_ ? "(shut down)"
1175 : entry_->lru_iterator_->ToString())
1176 << ", backoff timer fired";
1177 // Skip the update if Orphaned
1178 if (!backoff_timer_task_handle_.has_value()) return;
1179 backoff_timer_task_handle_.reset();
1180 }
1181 // The pick was in backoff state and there could be a pick queued if
1182 // wait_for_ready is true. We'll update the picker for that case.
1183 entry_->lb_policy_->UpdatePickerLocked();
1184 }
1185
1186 //
1187 // RlsLb::Cache::Entry
1188 //
1189
MakeCacheEntryBackoff()1190 std::unique_ptr<BackOff> MakeCacheEntryBackoff() {
1191 return std::make_unique<BackOff>(
1192 BackOff::Options()
1193 .set_initial_backoff(kCacheBackoffInitial)
1194 .set_multiplier(kCacheBackoffMultiplier)
1195 .set_jitter(kCacheBackoffJitter)
1196 .set_max_backoff(kCacheBackoffMax));
1197 }
1198
Entry(RefCountedPtr<RlsLb> lb_policy,const RequestKey & key)1199 RlsLb::Cache::Entry::Entry(RefCountedPtr<RlsLb> lb_policy,
1200 const RequestKey& key)
1201 : InternallyRefCounted<Entry>(GRPC_TRACE_FLAG_ENABLED(rls_lb) ? "CacheEntry"
1202 : nullptr),
1203 lb_policy_(std::move(lb_policy)),
1204 backoff_state_(MakeCacheEntryBackoff()),
1205 min_expiration_time_(Timestamp::Now() + kMinExpirationTime),
1206 lru_iterator_(lb_policy_->cache_.lru_list_.insert(
1207 lb_policy_->cache_.lru_list_.end(), key)) {}
1208
Orphan()1209 void RlsLb::Cache::Entry::Orphan() {
1210 // We should be holding RlsLB::mu_.
1211 GRPC_TRACE_LOG(rls_lb, INFO)
1212 << "[rlslb " << lb_policy_.get() << "] cache entry=" << this << " "
1213 << lru_iterator_->ToString() << ": cache entry evicted";
1214 is_shutdown_ = true;
1215 lb_policy_->cache_.lru_list_.erase(lru_iterator_);
1216 lru_iterator_ = lb_policy_->cache_.lru_list_.end(); // Just in case.
1217 CHECK(child_policy_wrappers_.empty());
1218 backoff_state_.reset();
1219 if (backoff_timer_ != nullptr) {
1220 backoff_timer_.reset();
1221 lb_policy_->UpdatePickerAsync();
1222 }
1223 Unref(DEBUG_LOCATION, "Orphan");
1224 }
1225
Size() const1226 size_t RlsLb::Cache::Entry::Size() const {
1227 // lru_iterator_ is not valid once we're shut down.
1228 CHECK(!is_shutdown_);
1229 return lb_policy_->cache_.EntrySizeForKey(*lru_iterator_);
1230 }
1231
Pick(PickArgs args)1232 LoadBalancingPolicy::PickResult RlsLb::Cache::Entry::Pick(PickArgs args) {
1233 size_t i = 0;
1234 ChildPolicyWrapper* child_policy_wrapper = nullptr;
1235 // Skip targets before the last one that are in state TRANSIENT_FAILURE.
1236 for (; i < child_policy_wrappers_.size(); ++i) {
1237 child_policy_wrapper = child_policy_wrappers_[i].get();
1238 if (child_policy_wrapper->connectivity_state() ==
1239 GRPC_CHANNEL_TRANSIENT_FAILURE &&
1240 i < child_policy_wrappers_.size() - 1) {
1241 GRPC_TRACE_LOG(rls_lb, INFO)
1242 << "[rlslb " << lb_policy_.get() << "] cache entry=" << this << " "
1243 << lru_iterator_->ToString() << ": target "
1244 << child_policy_wrapper->target() << " (" << i << " of "
1245 << child_policy_wrappers_.size()
1246 << ") in state TRANSIENT_FAILURE; skipping";
1247 continue;
1248 }
1249 break;
1250 }
1251 // Child policy not in TRANSIENT_FAILURE or is the last target in
1252 // the list, so delegate.
1253 GRPC_TRACE_LOG(rls_lb, INFO)
1254 << "[rlslb " << lb_policy_.get() << "] cache entry=" << this << " "
1255 << lru_iterator_->ToString() << ": target "
1256 << child_policy_wrapper->target() << " (" << i << " of "
1257 << child_policy_wrappers_.size() << ") in state "
1258 << ConnectivityStateName(child_policy_wrapper->connectivity_state())
1259 << "; delegating";
1260 auto pick_result = child_policy_wrapper->Pick(args);
1261 lb_policy_->MaybeExportPickCount(kMetricTargetPicks,
1262 child_policy_wrapper->target(), pick_result);
1263 // Add header data.
1264 if (!header_data_.empty()) {
1265 auto* complete_pick =
1266 absl::get_if<PickResult::Complete>(&pick_result.result);
1267 if (complete_pick != nullptr) {
1268 complete_pick->metadata_mutations.Set(kRlsHeaderKey, header_data_.Ref());
1269 }
1270 }
1271 return pick_result;
1272 }
1273
ResetBackoff()1274 void RlsLb::Cache::Entry::ResetBackoff() {
1275 backoff_time_ = Timestamp::InfPast();
1276 backoff_timer_.reset();
1277 }
1278
ShouldRemove() const1279 bool RlsLb::Cache::Entry::ShouldRemove() const {
1280 Timestamp now = Timestamp::Now();
1281 return data_expiration_time_ < now && backoff_expiration_time_ < now;
1282 }
1283
CanEvict() const1284 bool RlsLb::Cache::Entry::CanEvict() const {
1285 Timestamp now = Timestamp::Now();
1286 return min_expiration_time_ < now;
1287 }
1288
MarkUsed()1289 void RlsLb::Cache::Entry::MarkUsed() {
1290 auto& lru_list = lb_policy_->cache_.lru_list_;
1291 auto new_it = lru_list.insert(lru_list.end(), *lru_iterator_);
1292 lru_list.erase(lru_iterator_);
1293 lru_iterator_ = new_it;
1294 }
1295
1296 std::vector<RlsLb::ChildPolicyWrapper*>
OnRlsResponseLocked(ResponseInfo response,std::unique_ptr<BackOff> backoff_state,OrphanablePtr<ChildPolicyHandler> * child_policy_to_delete)1297 RlsLb::Cache::Entry::OnRlsResponseLocked(
1298 ResponseInfo response, std::unique_ptr<BackOff> backoff_state,
1299 OrphanablePtr<ChildPolicyHandler>* child_policy_to_delete) {
1300 // Move the entry to the end of the LRU list.
1301 MarkUsed();
1302 // If the request failed, store the failed status and update the
1303 // backoff state.
1304 if (!response.status.ok()) {
1305 status_ = response.status;
1306 if (backoff_state != nullptr) {
1307 backoff_state_ = std::move(backoff_state);
1308 } else {
1309 backoff_state_ = MakeCacheEntryBackoff();
1310 }
1311 const Duration delay = backoff_state_->NextAttemptDelay();
1312 const Timestamp now = Timestamp::Now();
1313 backoff_time_ = now + delay;
1314 backoff_expiration_time_ = now + delay * 2;
1315 backoff_timer_ = MakeOrphanable<BackoffTimer>(
1316 Ref(DEBUG_LOCATION, "BackoffTimer"), delay);
1317 lb_policy_->UpdatePickerAsync();
1318 return {};
1319 }
1320 // Request succeeded, so store the result.
1321 header_data_ = std::move(response.header_data);
1322 Timestamp now = Timestamp::Now();
1323 data_expiration_time_ = now + lb_policy_->config_->max_age();
1324 stale_time_ = now + lb_policy_->config_->stale_age();
1325 status_ = absl::OkStatus();
1326 backoff_state_.reset();
1327 backoff_time_ = Timestamp::InfPast();
1328 backoff_expiration_time_ = Timestamp::InfPast();
1329 // Check if we need to update this list of targets.
1330 bool targets_changed = [&]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_) {
1331 if (child_policy_wrappers_.size() != response.targets.size()) return true;
1332 for (size_t i = 0; i < response.targets.size(); ++i) {
1333 if (child_policy_wrappers_[i]->target() != response.targets[i]) {
1334 return true;
1335 }
1336 }
1337 return false;
1338 }();
1339 if (!targets_changed) {
1340 // Targets didn't change, so we're not updating the list of child
1341 // policies. Return a new picker so that any queued requests can be
1342 // re-processed.
1343 lb_policy_->UpdatePickerAsync();
1344 return {};
1345 }
1346 // Target list changed, so update it.
1347 std::set<absl::string_view> old_targets;
1348 for (RefCountedPtr<ChildPolicyWrapper>& child_policy_wrapper :
1349 child_policy_wrappers_) {
1350 old_targets.emplace(child_policy_wrapper->target());
1351 }
1352 bool update_picker = false;
1353 std::vector<ChildPolicyWrapper*> child_policies_to_finish_update;
1354 std::vector<RefCountedPtr<ChildPolicyWrapper>> new_child_policy_wrappers;
1355 new_child_policy_wrappers.reserve(response.targets.size());
1356 for (std::string& target : response.targets) {
1357 auto it = lb_policy_->child_policy_map_.find(target);
1358 if (it == lb_policy_->child_policy_map_.end()) {
1359 auto new_child = MakeRefCounted<ChildPolicyWrapper>(
1360 lb_policy_.Ref(DEBUG_LOCATION, "ChildPolicyWrapper"), target);
1361 new_child->StartUpdate(child_policy_to_delete);
1362 child_policies_to_finish_update.push_back(new_child.get());
1363 new_child_policy_wrappers.emplace_back(std::move(new_child));
1364 } else {
1365 new_child_policy_wrappers.emplace_back(
1366 it->second->Ref(DEBUG_LOCATION, "CacheEntry"));
1367 // If the target already existed but was not previously used for
1368 // this key, then we'll need to update the picker, since we
1369 // didn't actually create a new child policy, which would have
1370 // triggered an RLS picker update when it returned its first picker.
1371 if (old_targets.find(target) == old_targets.end()) {
1372 update_picker = true;
1373 }
1374 }
1375 }
1376 child_policy_wrappers_ = std::move(new_child_policy_wrappers);
1377 if (update_picker) {
1378 lb_policy_->UpdatePickerAsync();
1379 }
1380 return child_policies_to_finish_update;
1381 }
1382
1383 //
1384 // RlsLb::Cache
1385 //
1386
Cache(RlsLb * lb_policy)1387 RlsLb::Cache::Cache(RlsLb* lb_policy) : lb_policy_(lb_policy) {
1388 StartCleanupTimer();
1389 }
1390
Find(const RequestKey & key)1391 RlsLb::Cache::Entry* RlsLb::Cache::Find(const RequestKey& key) {
1392 auto it = map_.find(key);
1393 if (it == map_.end()) return nullptr;
1394 it->second->MarkUsed();
1395 return it->second.get();
1396 }
1397
FindOrInsert(const RequestKey & key,std::vector<RefCountedPtr<ChildPolicyWrapper>> * child_policy_wrappers_to_delete)1398 RlsLb::Cache::Entry* RlsLb::Cache::FindOrInsert(
1399 const RequestKey& key, std::vector<RefCountedPtr<ChildPolicyWrapper>>*
1400 child_policy_wrappers_to_delete) {
1401 auto it = map_.find(key);
1402 // If not found, create new entry.
1403 if (it == map_.end()) {
1404 size_t entry_size = EntrySizeForKey(key);
1405 MaybeShrinkSize(size_limit_ - std::min(size_limit_, entry_size),
1406 child_policy_wrappers_to_delete);
1407 Entry* entry = new Entry(
1408 lb_policy_->RefAsSubclass<RlsLb>(DEBUG_LOCATION, "CacheEntry"), key);
1409 map_.emplace(key, OrphanablePtr<Entry>(entry));
1410 size_ += entry_size;
1411 GRPC_TRACE_LOG(rls_lb, INFO)
1412 << "[rlslb " << lb_policy_ << "] key=" << key.ToString()
1413 << ": cache entry added, entry=" << entry;
1414 return entry;
1415 }
1416 // Entry found, so use it.
1417 GRPC_TRACE_LOG(rls_lb, INFO)
1418 << "[rlslb " << lb_policy_ << "] key=" << key.ToString()
1419 << ": found cache entry " << it->second.get();
1420 it->second->MarkUsed();
1421 return it->second.get();
1422 }
1423
Resize(size_t bytes,std::vector<RefCountedPtr<ChildPolicyWrapper>> * child_policy_wrappers_to_delete)1424 void RlsLb::Cache::Resize(size_t bytes,
1425 std::vector<RefCountedPtr<ChildPolicyWrapper>>*
1426 child_policy_wrappers_to_delete) {
1427 GRPC_TRACE_LOG(rls_lb, INFO)
1428 << "[rlslb " << lb_policy_ << "] resizing cache to " << bytes << " bytes";
1429 size_limit_ = bytes;
1430 MaybeShrinkSize(size_limit_, child_policy_wrappers_to_delete);
1431 }
1432
ResetAllBackoff()1433 void RlsLb::Cache::ResetAllBackoff() {
1434 for (auto& p : map_) {
1435 p.second->ResetBackoff();
1436 }
1437 lb_policy_->UpdatePickerAsync();
1438 }
1439
Shutdown()1440 std::vector<RefCountedPtr<RlsLb::ChildPolicyWrapper>> RlsLb::Cache::Shutdown() {
1441 std::vector<RefCountedPtr<ChildPolicyWrapper>>
1442 child_policy_wrappers_to_delete;
1443 for (auto& entry : map_) {
1444 entry.second->TakeChildPolicyWrappers(&child_policy_wrappers_to_delete);
1445 }
1446 map_.clear();
1447 lru_list_.clear();
1448 if (cleanup_timer_handle_.has_value() &&
1449 lb_policy_->channel_control_helper()->GetEventEngine()->Cancel(
1450 *cleanup_timer_handle_)) {
1451 GRPC_TRACE_LOG(rls_lb, INFO)
1452 << "[rlslb " << lb_policy_ << "] cache cleanup timer canceled";
1453 }
1454 cleanup_timer_handle_.reset();
1455 return child_policy_wrappers_to_delete;
1456 }
1457
ReportMetricsLocked(CallbackMetricReporter & reporter)1458 void RlsLb::Cache::ReportMetricsLocked(CallbackMetricReporter& reporter) {
1459 reporter.Report(
1460 kMetricCacheSize, size_,
1461 {lb_policy_->channel_control_helper()->GetTarget(),
1462 lb_policy_->config_->lookup_service(), lb_policy_->instance_uuid_},
1463 {});
1464 reporter.Report(
1465 kMetricCacheEntries, map_.size(),
1466 {lb_policy_->channel_control_helper()->GetTarget(),
1467 lb_policy_->config_->lookup_service(), lb_policy_->instance_uuid_},
1468 {});
1469 }
1470
StartCleanupTimer()1471 void RlsLb::Cache::StartCleanupTimer() {
1472 cleanup_timer_handle_ =
1473 lb_policy_->channel_control_helper()->GetEventEngine()->RunAfter(
1474 kCacheCleanupTimerInterval,
1475 [this, lb_policy = lb_policy_->Ref(DEBUG_LOCATION,
1476 "CacheCleanupTimer")]() mutable {
1477 ApplicationCallbackExecCtx callback_exec_ctx;
1478 ExecCtx exec_ctx;
1479 lb_policy_->work_serializer()->Run(
1480 [this, lb_policy = std::move(lb_policy)]() {
1481 // The lb_policy ref is held until the callback completes
1482 OnCleanupTimer();
1483 },
1484 DEBUG_LOCATION);
1485 });
1486 }
1487
OnCleanupTimer()1488 void RlsLb::Cache::OnCleanupTimer() {
1489 GRPC_TRACE_LOG(rls_lb, INFO)
1490 << "[rlslb " << lb_policy_ << "] cache cleanup timer fired";
1491 std::vector<RefCountedPtr<ChildPolicyWrapper>>
1492 child_policy_wrappers_to_delete;
1493 MutexLock lock(&lb_policy_->mu_);
1494 if (!cleanup_timer_handle_.has_value()) return;
1495 if (lb_policy_->is_shutdown_) return;
1496 for (auto it = map_.begin(); it != map_.end();) {
1497 if (GPR_UNLIKELY(it->second->ShouldRemove() && it->second->CanEvict())) {
1498 size_ -= it->second->Size();
1499 it->second->TakeChildPolicyWrappers(&child_policy_wrappers_to_delete);
1500 it = map_.erase(it);
1501 } else {
1502 ++it;
1503 }
1504 }
1505 StartCleanupTimer();
1506 }
1507
EntrySizeForKey(const RequestKey & key)1508 size_t RlsLb::Cache::EntrySizeForKey(const RequestKey& key) {
1509 // Key is stored twice, once in LRU list and again in the cache map.
1510 return (key.Size() * 2) + sizeof(Entry);
1511 }
1512
MaybeShrinkSize(size_t bytes,std::vector<RefCountedPtr<ChildPolicyWrapper>> * child_policy_wrappers_to_delete)1513 void RlsLb::Cache::MaybeShrinkSize(
1514 size_t bytes, std::vector<RefCountedPtr<ChildPolicyWrapper>>*
1515 child_policy_wrappers_to_delete) {
1516 while (size_ > bytes) {
1517 auto lru_it = lru_list_.begin();
1518 if (GPR_UNLIKELY(lru_it == lru_list_.end())) break;
1519 auto map_it = map_.find(*lru_it);
1520 CHECK(map_it != map_.end());
1521 if (!map_it->second->CanEvict()) break;
1522 GRPC_TRACE_LOG(rls_lb, INFO)
1523 << "[rlslb " << lb_policy_ << "] LRU eviction: removing entry "
1524 << map_it->second.get() << " " << lru_it->ToString();
1525 size_ -= map_it->second->Size();
1526 map_it->second->TakeChildPolicyWrappers(child_policy_wrappers_to_delete);
1527 map_.erase(map_it);
1528 }
1529 GRPC_TRACE_LOG(rls_lb, INFO)
1530 << "[rlslb " << lb_policy_
1531 << "] LRU pass complete: desired size=" << bytes << " size=" << size_;
1532 }
1533
1534 //
1535 // RlsLb::RlsChannel::StateWatcher
1536 //
1537
OnConnectivityStateChange(grpc_connectivity_state new_state,const absl::Status & status)1538 void RlsLb::RlsChannel::StateWatcher::OnConnectivityStateChange(
1539 grpc_connectivity_state new_state, const absl::Status& status) {
1540 auto* lb_policy = rls_channel_->lb_policy_.get();
1541 GRPC_TRACE_LOG(rls_lb, INFO)
1542 << "[rlslb " << lb_policy << "] RlsChannel=" << rls_channel_.get()
1543 << " StateWatcher=" << this << ": state changed to "
1544 << ConnectivityStateName(new_state) << " (" << status << ")";
1545 if (rls_channel_->is_shutdown_) return;
1546 MutexLock lock(&lb_policy->mu_);
1547 if (new_state == GRPC_CHANNEL_READY && was_transient_failure_) {
1548 was_transient_failure_ = false;
1549 // Reset the backoff of all cache entries, so that we don't
1550 // double-penalize if an RLS request fails while the channel is
1551 // down, since the throttling for the channel being down is handled
1552 // at the channel level instead of in the individual cache entries.
1553 lb_policy->cache_.ResetAllBackoff();
1554 } else if (new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
1555 was_transient_failure_ = true;
1556 }
1557 }
1558
1559 //
1560 // RlsLb::RlsChannel::Throttle
1561 //
1562
ShouldThrottle()1563 bool RlsLb::RlsChannel::Throttle::ShouldThrottle() {
1564 Timestamp now = Timestamp::Now();
1565 while (!requests_.empty() && now - requests_.front() > window_size_) {
1566 requests_.pop_front();
1567 }
1568 while (!failures_.empty() && now - failures_.front() > window_size_) {
1569 failures_.pop_front();
1570 }
1571 // Compute probability of throttling.
1572 float num_requests = requests_.size();
1573 float num_successes = num_requests - failures_.size();
1574 // Note: it's possible that this ratio will be negative, in which case
1575 // no throttling will be done.
1576 float throttle_probability =
1577 (num_requests - (num_successes * ratio_for_successes_)) /
1578 (num_requests + padding_);
1579 // Generate a random number for the request.
1580 std::uniform_real_distribution<float> dist(0, 1.0);
1581 // Check if we should throttle the request.
1582 bool throttle = dist(rng_) < throttle_probability;
1583 // If we're throttling, record the request and the failure.
1584 if (throttle) {
1585 requests_.push_back(now);
1586 failures_.push_back(now);
1587 }
1588 return throttle;
1589 }
1590
RegisterResponse(bool success)1591 void RlsLb::RlsChannel::Throttle::RegisterResponse(bool success) {
1592 Timestamp now = Timestamp::Now();
1593 requests_.push_back(now);
1594 if (!success) failures_.push_back(now);
1595 }
1596
1597 //
1598 // RlsLb::RlsChannel
1599 //
1600
RlsChannel(RefCountedPtr<RlsLb> lb_policy)1601 RlsLb::RlsChannel::RlsChannel(RefCountedPtr<RlsLb> lb_policy)
1602 : InternallyRefCounted<RlsChannel>(
1603 GRPC_TRACE_FLAG_ENABLED(rls_lb) ? "RlsChannel" : nullptr),
1604 lb_policy_(std::move(lb_policy)) {
1605 // Get channel creds from parent channel.
1606 // Note that we are using the "unsafe" channel creds here, which do
1607 // include any associated call creds. This is safe in this case,
1608 // because we are using the parent channel's authority on the RLS channel.
1609 auto creds =
1610 lb_policy_->channel_control_helper()->GetUnsafeChannelCredentials();
1611 // Use the parent channel's authority.
1612 auto authority = lb_policy_->channel_control_helper()->GetAuthority();
1613 ChannelArgs args = ChannelArgs()
1614 .Set(GRPC_ARG_DEFAULT_AUTHORITY, authority)
1615 .Set(GRPC_ARG_CHANNELZ_IS_INTERNAL_CHANNEL, 1);
1616 // Propagate fake security connector expected targets, if any.
1617 // (This is ugly, but it seems better than propagating all channel args
1618 // from the parent channel by default and then having a giant
1619 // exclude list of args to strip out, like we do in grpclb.)
1620 absl::optional<absl::string_view> fake_security_expected_targets =
1621 lb_policy_->channel_args_.GetString(
1622 GRPC_ARG_FAKE_SECURITY_EXPECTED_TARGETS);
1623 if (fake_security_expected_targets.has_value()) {
1624 args = args.Set(GRPC_ARG_FAKE_SECURITY_EXPECTED_TARGETS,
1625 *fake_security_expected_targets);
1626 }
1627 // Add service config args if needed.
1628 const std::string& service_config =
1629 lb_policy_->config_->rls_channel_service_config();
1630 if (!service_config.empty()) {
1631 args = args.Set(GRPC_ARG_SERVICE_CONFIG, service_config)
1632 .Set(GRPC_ARG_SERVICE_CONFIG_DISABLE_RESOLUTION, 1);
1633 }
1634 channel_.reset(Channel::FromC(
1635 grpc_channel_create(lb_policy_->config_->lookup_service().c_str(),
1636 creds.get(), args.ToC().get())));
1637 GRPC_TRACE_LOG(rls_lb, INFO)
1638 << "[rlslb " << lb_policy_.get() << "] RlsChannel=" << this
1639 << ": created channel " << channel_.get() << " for "
1640 << lb_policy_->config_->lookup_service();
1641 if (channel_ != nullptr) {
1642 // Set up channelz linkage.
1643 channelz::ChannelNode* child_channelz_node = channel_->channelz_node();
1644 auto parent_channelz_node =
1645 lb_policy_->channel_args_.GetObjectRef<channelz::ChannelNode>();
1646 if (child_channelz_node != nullptr && parent_channelz_node != nullptr) {
1647 parent_channelz_node->AddChildChannel(child_channelz_node->uuid());
1648 parent_channelz_node_ = std::move(parent_channelz_node);
1649 }
1650 // Start connectivity watch.
1651 watcher_ = new StateWatcher(Ref(DEBUG_LOCATION, "StateWatcher"));
1652 channel_->AddConnectivityWatcher(
1653 GRPC_CHANNEL_IDLE,
1654 OrphanablePtr<AsyncConnectivityStateWatcherInterface>(watcher_));
1655 }
1656 }
1657
Orphan()1658 void RlsLb::RlsChannel::Orphan() {
1659 GRPC_TRACE_LOG(rls_lb, INFO)
1660 << "[rlslb " << lb_policy_.get() << "] RlsChannel=" << this
1661 << ", channel=" << channel_.get() << ": shutdown";
1662 is_shutdown_ = true;
1663 if (channel_ != nullptr) {
1664 // Remove channelz linkage.
1665 if (parent_channelz_node_ != nullptr) {
1666 channelz::ChannelNode* child_channelz_node = channel_->channelz_node();
1667 CHECK_NE(child_channelz_node, nullptr);
1668 parent_channelz_node_->RemoveChildChannel(child_channelz_node->uuid());
1669 }
1670 // Stop connectivity watch.
1671 if (watcher_ != nullptr) {
1672 channel_->RemoveConnectivityWatcher(watcher_);
1673 watcher_ = nullptr;
1674 }
1675 channel_.reset();
1676 }
1677 Unref(DEBUG_LOCATION, "Orphan");
1678 }
1679
StartRlsCall(const RequestKey & key,Cache::Entry * stale_entry)1680 void RlsLb::RlsChannel::StartRlsCall(const RequestKey& key,
1681 Cache::Entry* stale_entry) {
1682 std::unique_ptr<BackOff> backoff_state;
1683 grpc_lookup_v1_RouteLookupRequest_Reason reason =
1684 grpc_lookup_v1_RouteLookupRequest_REASON_MISS;
1685 grpc_event_engine::experimental::Slice stale_header_data;
1686 if (stale_entry != nullptr) {
1687 backoff_state = stale_entry->TakeBackoffState();
1688 reason = grpc_lookup_v1_RouteLookupRequest_REASON_STALE;
1689 stale_header_data = stale_entry->header_data().Ref();
1690 }
1691 lb_policy_->request_map_.emplace(
1692 key, MakeOrphanable<RlsRequest>(
1693 lb_policy_.Ref(DEBUG_LOCATION, "RlsRequest"), key,
1694 lb_policy_->rls_channel_->Ref(DEBUG_LOCATION, "RlsRequest"),
1695 std::move(backoff_state), reason, std::move(stale_header_data)));
1696 }
1697
// Forwards the outcome of an RLS request to the channel's throttle.
// `response_succeeded` is passed through unchanged to
// Throttle::RegisterResponse().
void RlsLb::RlsChannel::ReportResponseLocked(bool response_succeeded) {
  throttle_.RegisterResponse(response_succeeded);
}
1701
// Resets the connection backoff of the underlying RLS channel.
// The channel must exist; this is enforced only by a debug check.
void RlsLb::RlsChannel::ResetBackoff() {
  DCHECK(channel_ != nullptr);
  channel_->ResetConnectionBackoff();
}
1706
1707 //
1708 // RlsLb::RlsRequest
1709 //
1710
RlsRequest(RefCountedPtr<RlsLb> lb_policy,RequestKey key,RefCountedPtr<RlsChannel> rls_channel,std::unique_ptr<BackOff> backoff_state,grpc_lookup_v1_RouteLookupRequest_Reason reason,grpc_event_engine::experimental::Slice stale_header_data)1711 RlsLb::RlsRequest::RlsRequest(
1712 RefCountedPtr<RlsLb> lb_policy, RequestKey key,
1713 RefCountedPtr<RlsChannel> rls_channel,
1714 std::unique_ptr<BackOff> backoff_state,
1715 grpc_lookup_v1_RouteLookupRequest_Reason reason,
1716 grpc_event_engine::experimental::Slice stale_header_data)
1717 : InternallyRefCounted<RlsRequest>(
1718 GRPC_TRACE_FLAG_ENABLED(rls_lb) ? "RlsRequest" : nullptr),
1719 lb_policy_(std::move(lb_policy)),
1720 key_(std::move(key)),
1721 rls_channel_(std::move(rls_channel)),
1722 backoff_state_(std::move(backoff_state)),
1723 reason_(reason),
1724 stale_header_data_(std::move(stale_header_data)) {
1725 GRPC_TRACE_LOG(rls_lb, INFO)
1726 << "[rlslb " << lb_policy_.get() << "] rls_request=" << this
1727 << ": RLS request created for key " << key_.ToString();
1728 GRPC_CLOSURE_INIT(&call_complete_cb_, OnRlsCallComplete, this, nullptr);
1729 ExecCtx::Run(
1730 DEBUG_LOCATION,
1731 GRPC_CLOSURE_INIT(&call_start_cb_, StartCall,
1732 Ref(DEBUG_LOCATION, "StartCall").release(), nullptr),
1733 absl::OkStatus());
1734 }
1735
// The call must already have been released (call_ is null) by the time the
// request is destroyed; anything else is a fatal check failure.
RlsLb::RlsRequest::~RlsRequest() { CHECK_EQ(call_, nullptr); }
1737
Orphan()1738 void RlsLb::RlsRequest::Orphan() {
1739 if (call_ != nullptr) {
1740 GRPC_TRACE_LOG(rls_lb, INFO)
1741 << "[rlslb " << lb_policy_.get() << "] rls_request=" << this << " "
1742 << key_.ToString() << ": cancelling RLS call";
1743 grpc_call_cancel_internal(call_);
1744 }
1745 Unref(DEBUG_LOCATION, "Orphan");
1746 }
1747
StartCall(void * arg,grpc_error_handle)1748 void RlsLb::RlsRequest::StartCall(void* arg, grpc_error_handle /*error*/) {
1749 auto* request = static_cast<RlsRequest*>(arg);
1750 request->lb_policy_->work_serializer()->Run(
1751 [request]() {
1752 request->StartCallLocked();
1753 request->Unref(DEBUG_LOCATION, "StartCall");
1754 },
1755 DEBUG_LOCATION);
1756 }
1757
StartCallLocked()1758 void RlsLb::RlsRequest::StartCallLocked() {
1759 {
1760 MutexLock lock(&lb_policy_->mu_);
1761 if (lb_policy_->is_shutdown_) return;
1762 }
1763 Timestamp now = Timestamp::Now();
1764 deadline_ = now + lb_policy_->config_->lookup_service_timeout();
1765 grpc_metadata_array_init(&recv_initial_metadata_);
1766 grpc_metadata_array_init(&recv_trailing_metadata_);
1767 call_ = rls_channel_->channel()->CreateCall(
1768 /*parent_call=*/nullptr, GRPC_PROPAGATE_DEFAULTS, /*cq=*/nullptr,
1769 lb_policy_->interested_parties(),
1770 Slice::FromStaticString(kRlsRequestPath), /*authority=*/absl::nullopt,
1771 deadline_, /*registered_method=*/true);
1772 grpc_op ops[6];
1773 memset(ops, 0, sizeof(ops));
1774 grpc_op* op = ops;
1775 op->op = GRPC_OP_SEND_INITIAL_METADATA;
1776 ++op;
1777 op->op = GRPC_OP_SEND_MESSAGE;
1778 send_message_ = MakeRequestProto();
1779 op->data.send_message.send_message = send_message_;
1780 ++op;
1781 op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
1782 ++op;
1783 op->op = GRPC_OP_RECV_INITIAL_METADATA;
1784 op->data.recv_initial_metadata.recv_initial_metadata =
1785 &recv_initial_metadata_;
1786 ++op;
1787 op->op = GRPC_OP_RECV_MESSAGE;
1788 op->data.recv_message.recv_message = &recv_message_;
1789 ++op;
1790 op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
1791 op->data.recv_status_on_client.trailing_metadata = &recv_trailing_metadata_;
1792 op->data.recv_status_on_client.status = &status_recv_;
1793 op->data.recv_status_on_client.status_details = &status_details_recv_;
1794 ++op;
1795 Ref(DEBUG_LOCATION, "OnRlsCallComplete").release();
1796 auto call_error = grpc_call_start_batch_and_execute(
1797 call_, ops, static_cast<size_t>(op - ops), &call_complete_cb_);
1798 CHECK_EQ(call_error, GRPC_CALL_OK);
1799 }
1800
OnRlsCallComplete(void * arg,grpc_error_handle error)1801 void RlsLb::RlsRequest::OnRlsCallComplete(void* arg, grpc_error_handle error) {
1802 auto* request = static_cast<RlsRequest*>(arg);
1803 request->lb_policy_->work_serializer()->Run(
1804 [request, error]() {
1805 request->OnRlsCallCompleteLocked(error);
1806 request->Unref(DEBUG_LOCATION, "OnRlsCallComplete");
1807 },
1808 DEBUG_LOCATION);
1809 }
1810
// Processes the completed RLS call.
//
// Steps, in order:
//   1. Turn the batch error / received status / response message into a
//      ResponseInfo.
//   2. Release all per-call state (byte buffers, metadata arrays, status
//      details, the call itself).
//   3. Under lb_policy_->mu_, report the outcome to the RLS channel, hand
//      the response to the cache entry for key_, and remove this request
//      from the request map.  Nothing is reported if the policy has been
//      shut down.
//   4. After the lock is released, finish the update on any child policies
//      returned by the cache entry, requesting re-resolution if one fails.
//
// `error` is the error delivered to the batch-completion closure.
void RlsLb::RlsRequest::OnRlsCallCompleteLocked(grpc_error_handle error) {
  if (GRPC_TRACE_FLAG_ENABLED(rls_lb)) {
    std::string status_message(StringViewFromSlice(status_details_recv_));
    LOG(INFO) << "[rlslb " << lb_policy_.get() << "] rls_request=" << this
              << " " << key_.ToString() << ", error=" << StatusToString(error)
              << ", status={" << status_recv_ << ", " << status_message << "}"
              << " RLS call response received";
  }
  // Parse response.
  ResponseInfo response;
  if (!error.ok()) {
    // The batch itself failed: derive the status from the error.
    grpc_status_code code;
    std::string message;
    grpc_error_get_status(error, deadline_, &code, &message,
                          /*http_error=*/nullptr, /*error_string=*/nullptr);
    response.status =
        absl::Status(static_cast<absl::StatusCode>(code), message);
  } else if (status_recv_ != GRPC_STATUS_OK) {
    // The call completed with a non-OK status.
    response.status = absl::Status(static_cast<absl::StatusCode>(status_recv_),
                                   StringViewFromSlice(status_details_recv_));
  } else {
    response = ParseResponseProto();
  }
  // Clean up call state.
  // This must come after parsing above, which reads recv_message_ and
  // status_details_recv_.
  grpc_byte_buffer_destroy(send_message_);
  grpc_byte_buffer_destroy(recv_message_);
  grpc_metadata_array_destroy(&recv_initial_metadata_);
  grpc_metadata_array_destroy(&recv_trailing_metadata_);
  CSliceUnref(status_details_recv_);
  grpc_call_unref(call_);
  call_ = nullptr;
  // Return result to cache.
  GRPC_TRACE_LOG(rls_lb, INFO)
      << "[rlslb " << lb_policy_.get() << "] rls_request=" << this << " "
      << key_.ToString() << ": response info: " << response.ToString();
  // These are declared before the locked scope below, so whatever they end
  // up holding is destroyed only after the mutex has been released.
  std::vector<ChildPolicyWrapper*> child_policies_to_finish_update;
  std::vector<RefCountedPtr<ChildPolicyWrapper>>
      child_policy_wrappers_to_delete;
  OrphanablePtr<ChildPolicyHandler> child_policy_to_delete;
  {
    MutexLock lock(&lb_policy_->mu_);
    if (lb_policy_->is_shutdown_) return;
    rls_channel_->ReportResponseLocked(response.status.ok());
    Cache::Entry* cache_entry =
        lb_policy_->cache_.FindOrInsert(key_, &child_policy_wrappers_to_delete);
    child_policies_to_finish_update = cache_entry->OnRlsResponseLocked(
        std::move(response), std::move(backoff_state_),
        &child_policy_to_delete);
    lb_policy_->request_map_.erase(key_);
  }
  // Now that we've released the lock, finish the update on any newly
  // created child policies.
  for (ChildPolicyWrapper* child : child_policies_to_finish_update) {
    // If the child policy returns a non-OK status, request re-resolution.
    // Note that this will initially cause fixed backoff delay in the
    // resolver instead of exponential delay.  However, once the
    // resolver returns the initial re-resolution, we will be able to
    // return non-OK from UpdateLocked(), which will trigger
    // exponential backoff instead.
    absl::Status status = child->MaybeFinishUpdate();
    if (!status.ok()) {
      lb_policy_->channel_control_helper()->RequestReresolution();
    }
  }
}
1876
MakeRequestProto()1877 grpc_byte_buffer* RlsLb::RlsRequest::MakeRequestProto() {
1878 upb::Arena arena;
1879 grpc_lookup_v1_RouteLookupRequest* req =
1880 grpc_lookup_v1_RouteLookupRequest_new(arena.ptr());
1881 grpc_lookup_v1_RouteLookupRequest_set_target_type(
1882 req, upb_StringView_FromDataAndSize(kGrpc, sizeof(kGrpc) - 1));
1883 for (const auto& kv : key_.key_map) {
1884 grpc_lookup_v1_RouteLookupRequest_key_map_set(
1885 req, upb_StringView_FromDataAndSize(kv.first.data(), kv.first.size()),
1886 upb_StringView_FromDataAndSize(kv.second.data(), kv.second.size()),
1887 arena.ptr());
1888 }
1889 grpc_lookup_v1_RouteLookupRequest_set_reason(req, reason_);
1890 if (!stale_header_data_.empty()) {
1891 grpc_lookup_v1_RouteLookupRequest_set_stale_header_data(
1892 req, StdStringToUpbString(stale_header_data_.as_string_view()));
1893 }
1894 size_t len;
1895 char* buf =
1896 grpc_lookup_v1_RouteLookupRequest_serialize(req, arena.ptr(), &len);
1897 grpc_slice send_slice = grpc_slice_from_copied_buffer(buf, len);
1898 grpc_byte_buffer* byte_buffer = grpc_raw_byte_buffer_create(&send_slice, 1);
1899 CSliceUnref(send_slice);
1900 return byte_buffer;
1901 }
1902
ParseResponseProto()1903 RlsLb::ResponseInfo RlsLb::RlsRequest::ParseResponseProto() {
1904 ResponseInfo response_info;
1905 upb::Arena arena;
1906 grpc_byte_buffer_reader bbr;
1907 grpc_byte_buffer_reader_init(&bbr, recv_message_);
1908 grpc_slice recv_slice = grpc_byte_buffer_reader_readall(&bbr);
1909 grpc_byte_buffer_reader_destroy(&bbr);
1910 grpc_lookup_v1_RouteLookupResponse* response =
1911 grpc_lookup_v1_RouteLookupResponse_parse(
1912 reinterpret_cast<const char*>(GRPC_SLICE_START_PTR(recv_slice)),
1913 GRPC_SLICE_LENGTH(recv_slice), arena.ptr());
1914 CSliceUnref(recv_slice);
1915 if (response == nullptr) {
1916 response_info.status = absl::InternalError("cannot parse RLS response");
1917 return response_info;
1918 }
1919 size_t num_targets;
1920 const upb_StringView* targets_strview =
1921 grpc_lookup_v1_RouteLookupResponse_targets(response, &num_targets);
1922 if (num_targets == 0) {
1923 response_info.status =
1924 absl::InvalidArgumentError("RLS response has no target entry");
1925 return response_info;
1926 }
1927 response_info.targets.reserve(num_targets);
1928 for (size_t i = 0; i < num_targets; ++i) {
1929 response_info.targets.emplace_back(targets_strview[i].data,
1930 targets_strview[i].size);
1931 }
1932 upb_StringView header_data_strview =
1933 grpc_lookup_v1_RouteLookupResponse_header_data(response);
1934 response_info.header_data =
1935 grpc_event_engine::experimental::Slice::FromCopiedBuffer(
1936 header_data_strview.data, header_data_strview.size);
1937 return response_info;
1938 }
1939
1940 //
1941 // RlsLb
1942 //
1943
GenerateUUID()1944 std::string GenerateUUID() {
1945 absl::uniform_int_distribution<uint64_t> distribution;
1946 absl::BitGen bitgen;
1947 uint64_t hi = distribution(bitgen);
1948 uint64_t lo = distribution(bitgen);
1949 return GenerateUUIDv4(hi, lo);
1950 }
1951
// Constructs the RLS LB policy.
//
// The instance UUID is taken from the GRPC_ARG_TEST_ONLY_RLS_INSTANCE_ID
// channel arg when present; otherwise a freshly generated UUID is used.
// Note that value_or() evaluates its argument unconditionally, so
// GenerateUUID() runs even when the channel arg is set.
RlsLb::RlsLb(Args args)
    : LoadBalancingPolicy(std::move(args)),
      instance_uuid_(channel_args()
                         .GetOwnedString(GRPC_ARG_TEST_ONLY_RLS_INSTANCE_ID)
                         .value_or(GenerateUUID())),
      cache_(this) {
  GRPC_TRACE_LOG(rls_lb, INFO) << "[rlslb " << this << "] policy created";
}
1960
EndpointsEqual(const absl::StatusOr<std::shared_ptr<EndpointAddressesIterator>> endpoints1,const absl::StatusOr<std::shared_ptr<EndpointAddressesIterator>> endpoints2)1961 bool EndpointsEqual(
1962 const absl::StatusOr<std::shared_ptr<EndpointAddressesIterator>> endpoints1,
1963 const absl::StatusOr<std::shared_ptr<EndpointAddressesIterator>>
1964 endpoints2) {
1965 if (endpoints1.status() != endpoints2.status()) return false;
1966 if (endpoints1.ok()) {
1967 std::vector<EndpointAddresses> e1_list;
1968 (*endpoints1)->ForEach([&](const EndpointAddresses& endpoint) {
1969 e1_list.push_back(endpoint);
1970 });
1971 size_t i = 0;
1972 bool different = false;
1973 (*endpoints2)->ForEach([&](const EndpointAddresses& endpoint) {
1974 if (endpoint != e1_list[i++]) different = true;
1975 });
1976 if (different) return false;
1977 if (i != e1_list.size()) return false;
1978 }
1979 return true;
1980 }
1981
UpdateLocked(UpdateArgs args)1982 absl::Status RlsLb::UpdateLocked(UpdateArgs args) {
1983 GRPC_TRACE_LOG(rls_lb, INFO) << "[rlslb " << this << "] policy updated";
1984 update_in_progress_ = true;
1985 // Swap out config.
1986 RefCountedPtr<RlsLbConfig> old_config = std::move(config_);
1987 config_ = args.config.TakeAsSubclass<RlsLbConfig>();
1988 if (GRPC_TRACE_FLAG_ENABLED(rls_lb) &&
1989 (old_config == nullptr ||
1990 old_config->child_policy_config() != config_->child_policy_config())) {
1991 LOG(INFO) << "[rlslb " << this << "] updated child policy config: "
1992 << JsonDump(config_->child_policy_config());
1993 }
1994 // Swap out addresses.
1995 // If the new address list is an error and we have an existing address list,
1996 // stick with the existing addresses.
1997 absl::StatusOr<std::shared_ptr<EndpointAddressesIterator>> old_addresses;
1998 if (args.addresses.ok()) {
1999 old_addresses = std::move(addresses_);
2000 addresses_ = std::move(args.addresses);
2001 } else {
2002 old_addresses = addresses_;
2003 }
2004 // Swap out channel args.
2005 channel_args_ = std::move(args.args);
2006 // Determine whether we need to update all child policies.
2007 bool update_child_policies =
2008 old_config == nullptr ||
2009 old_config->child_policy_config() != config_->child_policy_config() ||
2010 !EndpointsEqual(old_addresses, addresses_) || args.args != channel_args_;
2011 // If default target changes, swap out child policy.
2012 bool created_default_child = false;
2013 if (old_config == nullptr ||
2014 config_->default_target() != old_config->default_target()) {
2015 if (config_->default_target().empty()) {
2016 GRPC_TRACE_LOG(rls_lb, INFO)
2017 << "[rlslb " << this << "] unsetting default target";
2018 default_child_policy_.reset();
2019 } else {
2020 auto it = child_policy_map_.find(config_->default_target());
2021 if (it == child_policy_map_.end()) {
2022 GRPC_TRACE_LOG(rls_lb, INFO)
2023 << "[rlslb " << this << "] creating new default target";
2024 default_child_policy_ = MakeRefCounted<ChildPolicyWrapper>(
2025 RefAsSubclass<RlsLb>(DEBUG_LOCATION, "ChildPolicyWrapper"),
2026 config_->default_target());
2027 created_default_child = true;
2028 } else {
2029 GRPC_TRACE_LOG(rls_lb, INFO)
2030 << "[rlslb " << this << "] using existing child for default target";
2031 default_child_policy_ =
2032 it->second->Ref(DEBUG_LOCATION, "DefaultChildPolicy");
2033 }
2034 }
2035 }
2036 // Now grab the lock to swap out the state it guards.
2037 std::vector<RefCountedPtr<ChildPolicyWrapper>>
2038 child_policy_wrappers_to_delete;
2039 OrphanablePtr<ChildPolicyHandler> child_policy_to_delete;
2040 {
2041 MutexLock lock(&mu_);
2042 // Swap out RLS channel if needed.
2043 if (old_config == nullptr ||
2044 config_->lookup_service() != old_config->lookup_service()) {
2045 rls_channel_ = MakeOrphanable<RlsChannel>(
2046 RefAsSubclass<RlsLb>(DEBUG_LOCATION, "RlsChannel"));
2047 }
2048 // Resize cache if needed.
2049 if (old_config == nullptr ||
2050 config_->cache_size_bytes() != old_config->cache_size_bytes()) {
2051 cache_.Resize(static_cast<size_t>(config_->cache_size_bytes()),
2052 &child_policy_wrappers_to_delete);
2053 }
2054 // Start update of child policies if needed.
2055 if (update_child_policies) {
2056 GRPC_TRACE_LOG(rls_lb, INFO)
2057 << "[rlslb " << this << "] starting child policy updates";
2058 for (auto& p : child_policy_map_) {
2059 p.second->StartUpdate(&child_policy_to_delete);
2060 }
2061 } else if (created_default_child) {
2062 GRPC_TRACE_LOG(rls_lb, INFO)
2063 << "[rlslb " << this << "] starting default child policy update";
2064 default_child_policy_->StartUpdate(&child_policy_to_delete);
2065 }
2066 }
2067 // Now that we've released the lock, finish update of child policies.
2068 std::vector<std::string> errors;
2069 if (update_child_policies) {
2070 GRPC_TRACE_LOG(rls_lb, INFO)
2071 << "[rlslb " << this << "] finishing child policy updates";
2072 for (auto& p : child_policy_map_) {
2073 absl::Status status = p.second->MaybeFinishUpdate();
2074 if (!status.ok()) {
2075 errors.emplace_back(
2076 absl::StrCat("target ", p.first, ": ", status.ToString()));
2077 }
2078 }
2079 } else if (created_default_child) {
2080 GRPC_TRACE_LOG(rls_lb, INFO)
2081 << "[rlslb " << this << "] finishing default child policy update";
2082 absl::Status status = default_child_policy_->MaybeFinishUpdate();
2083 if (!status.ok()) {
2084 errors.emplace_back(absl::StrCat("target ", config_->default_target(),
2085 ": ", status.ToString()));
2086 }
2087 }
2088 update_in_progress_ = false;
2089 // On the initial update only, we set the gauge metric callback. We
2090 // can't do this before the initial update, because otherwise the
2091 // callback could be invoked before we've set state that we need for
2092 // the label values (e.g., we'd add metrics with empty string for the
2093 // RLS server name).
2094 if (registered_metric_callback_ == nullptr) {
2095 registered_metric_callback_ =
2096 channel_control_helper()->GetStatsPluginGroup().RegisterCallback(
2097 [this](CallbackMetricReporter& reporter) {
2098 MutexLock lock(&mu_);
2099 cache_.ReportMetricsLocked(reporter);
2100 },
2101 Duration::Seconds(5), kMetricCacheSize, kMetricCacheEntries);
2102 }
2103 // In principle, we need to update the picker here only if the config
2104 // fields used by the picker have changed. However, it seems fragile
2105 // to check individual fields, since the picker logic could change in
2106 // the future to use additional config fields, and we might not
2107 // remember to update the code here. So for now, we just unconditionally
2108 // update the picker here, even though it's probably redundant.
2109 UpdatePickerLocked();
2110 // Return status.
2111 if (!errors.empty()) {
2112 return absl::UnavailableError(absl::StrCat(
2113 "errors from children: [", absl::StrJoin(errors, "; "), "]"));
2114 }
2115 return absl::OkStatus();
2116 }
2117
ExitIdleLocked()2118 void RlsLb::ExitIdleLocked() {
2119 MutexLock lock(&mu_);
2120 for (auto& child_entry : child_policy_map_) {
2121 child_entry.second->ExitIdleLocked();
2122 }
2123 }
2124
ResetBackoffLocked()2125 void RlsLb::ResetBackoffLocked() {
2126 {
2127 MutexLock lock(&mu_);
2128 rls_channel_->ResetBackoff();
2129 cache_.ResetAllBackoff();
2130 }
2131 for (auto& child : child_policy_map_) {
2132 child.second->ResetBackoffLocked();
2133 }
2134 }
2135
// Shuts the policy down.
//
// Unregisters the metrics callback, then, under mu_, marks the policy as
// shut down, drops the config, shuts down the cache, clears all pending
// requests, and detaches the RLS channel and the default child policy.
// Finally the stored channel args are cleared.
void RlsLb::ShutdownLocked() {
  GRPC_TRACE_LOG(rls_lb, INFO) << "[rlslb " << this << "] policy shutdown";
  registered_metric_callback_.reset();
  // The objects detached under the lock are moved into these locals, which
  // are declared before the locked scope: they are therefore destroyed at
  // function exit, after mu_ has been released.
  RefCountedPtr<ChildPolicyWrapper> child_policy_to_delete;
  std::vector<RefCountedPtr<ChildPolicyWrapper>>
      child_policy_wrappers_to_delete;
  OrphanablePtr<RlsChannel> rls_channel_to_delete;
  {
    MutexLock lock(&mu_);
    is_shutdown_ = true;
    config_.reset(DEBUG_LOCATION, "ShutdownLocked");
    child_policy_wrappers_to_delete = cache_.Shutdown();
    request_map_.clear();
    rls_channel_to_delete = std::move(rls_channel_);
    child_policy_to_delete = std::move(default_child_policy_);
  }
  channel_args_ = ChannelArgs();
}
2154
UpdatePickerAsync()2155 void RlsLb::UpdatePickerAsync() {
2156 // Run via the ExecCtx, since the caller may be holding the lock, and
2157 // we don't want to be doing that when we hop into the WorkSerializer,
2158 // in case the WorkSerializer callback happens to run inline.
2159 ExecCtx::Run(
2160 DEBUG_LOCATION,
2161 GRPC_CLOSURE_CREATE(UpdatePickerCallback,
2162 Ref(DEBUG_LOCATION, "UpdatePickerCallback").release(),
2163 grpc_schedule_on_exec_ctx),
2164 absl::OkStatus());
2165 }
2166
UpdatePickerCallback(void * arg,grpc_error_handle)2167 void RlsLb::UpdatePickerCallback(void* arg, grpc_error_handle /*error*/) {
2168 auto* rls_lb = static_cast<RlsLb*>(arg);
2169 rls_lb->work_serializer()->Run(
2170 [rls_lb]() {
2171 RefCountedPtr<RlsLb> lb_policy(rls_lb);
2172 lb_policy->UpdatePickerLocked();
2173 lb_policy.reset(DEBUG_LOCATION, "UpdatePickerCallback");
2174 },
2175 DEBUG_LOCATION);
2176 }
2177
UpdatePickerLocked()2178 void RlsLb::UpdatePickerLocked() {
2179 // If we're in the process of propagating an update from our parent to
2180 // our children, ignore any updates that come from the children. We
2181 // will instead return a new picker once the update has been seen by
2182 // all children. This avoids unnecessary picker churn while an update
2183 // is being propagated to our children.
2184 if (update_in_progress_) return;
2185 GRPC_TRACE_LOG(rls_lb, INFO) << "[rlslb " << this << "] updating picker";
2186 grpc_connectivity_state state = GRPC_CHANNEL_IDLE;
2187 if (!child_policy_map_.empty()) {
2188 state = GRPC_CHANNEL_TRANSIENT_FAILURE;
2189 int num_idle = 0;
2190 int num_connecting = 0;
2191 {
2192 MutexLock lock(&mu_);
2193 if (is_shutdown_) return;
2194 for (auto& p : child_policy_map_) {
2195 grpc_connectivity_state child_state = p.second->connectivity_state();
2196 GRPC_TRACE_LOG(rls_lb, INFO)
2197 << "[rlslb " << this << "] target " << p.second->target()
2198 << " in state " << ConnectivityStateName(child_state);
2199 if (child_state == GRPC_CHANNEL_READY) {
2200 state = GRPC_CHANNEL_READY;
2201 break;
2202 } else if (child_state == GRPC_CHANNEL_CONNECTING) {
2203 ++num_connecting;
2204 } else if (child_state == GRPC_CHANNEL_IDLE) {
2205 ++num_idle;
2206 }
2207 }
2208 if (state != GRPC_CHANNEL_READY) {
2209 if (num_connecting > 0) {
2210 state = GRPC_CHANNEL_CONNECTING;
2211 } else if (num_idle > 0) {
2212 state = GRPC_CHANNEL_IDLE;
2213 }
2214 }
2215 }
2216 }
2217 GRPC_TRACE_LOG(rls_lb, INFO) << "[rlslb " << this << "] reporting state "
2218 << ConnectivityStateName(state);
2219 absl::Status status;
2220 if (state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
2221 status = absl::UnavailableError("no children available");
2222 }
2223 channel_control_helper()->UpdateState(
2224 state, status,
2225 MakeRefCounted<Picker>(RefAsSubclass<RlsLb>(DEBUG_LOCATION, "Picker")));
2226 }
2227
2228 template <typename HandleType>
MaybeExportPickCount(HandleType handle,absl::string_view target,const PickResult & pick_result)2229 void RlsLb::MaybeExportPickCount(HandleType handle, absl::string_view target,
2230 const PickResult& pick_result) {
2231 absl::string_view pick_result_string = Match(
2232 pick_result.result,
2233 [](const LoadBalancingPolicy::PickResult::Complete&) {
2234 return "complete";
2235 },
2236 [](const LoadBalancingPolicy::PickResult::Queue&) { return ""; },
2237 [](const LoadBalancingPolicy::PickResult::Fail&) { return "fail"; },
2238 [](const LoadBalancingPolicy::PickResult::Drop&) { return "drop"; });
2239 if (pick_result_string.empty()) return; // Don't report queued picks.
2240 auto& stats_plugins = channel_control_helper()->GetStatsPluginGroup();
2241 stats_plugins.AddCounter(
2242 handle, 1,
2243 {channel_control_helper()->GetTarget(), config_->lookup_service(), target,
2244 pick_result_string},
2245 {});
2246 }
2247
2248 //
2249 // RlsLbFactory
2250 //
2251
2252 struct GrpcKeyBuilder {
2253 struct Name {
2254 std::string service;
2255 std::string method;
2256
JsonLoadergrpc_core::__anon2f58d5c00111::GrpcKeyBuilder::Name2257 static const JsonLoaderInterface* JsonLoader(const JsonArgs&) {
2258 static const auto* loader = JsonObjectLoader<Name>()
2259 .Field("service", &Name::service)
2260 .OptionalField("method", &Name::method)
2261 .Finish();
2262 return loader;
2263 }
2264 };
2265
2266 struct NameMatcher {
2267 std::string key;
2268 std::vector<std::string> names;
2269 absl::optional<bool> required_match;
2270
JsonLoadergrpc_core::__anon2f58d5c00111::GrpcKeyBuilder::NameMatcher2271 static const JsonLoaderInterface* JsonLoader(const JsonArgs&) {
2272 static const auto* loader =
2273 JsonObjectLoader<NameMatcher>()
2274 .Field("key", &NameMatcher::key)
2275 .Field("names", &NameMatcher::names)
2276 .OptionalField("requiredMatch", &NameMatcher::required_match)
2277 .Finish();
2278 return loader;
2279 }
2280
JsonPostLoadgrpc_core::__anon2f58d5c00111::GrpcKeyBuilder::NameMatcher2281 void JsonPostLoad(const Json&, const JsonArgs&, ValidationErrors* errors) {
2282 // key must be non-empty.
2283 {
2284 ValidationErrors::ScopedField field(errors, ".key");
2285 if (!errors->FieldHasErrors() && key.empty()) {
2286 errors->AddError("must be non-empty");
2287 }
2288 }
2289 // List of header names must be non-empty.
2290 {
2291 ValidationErrors::ScopedField field(errors, ".names");
2292 if (!errors->FieldHasErrors() && names.empty()) {
2293 errors->AddError("must be non-empty");
2294 }
2295 // Individual header names must be non-empty.
2296 for (size_t i = 0; i < names.size(); ++i) {
2297 ValidationErrors::ScopedField field(errors,
2298 absl::StrCat("[", i, "]"));
2299 if (!errors->FieldHasErrors() && names[i].empty()) {
2300 errors->AddError("must be non-empty");
2301 }
2302 }
2303 }
2304 // requiredMatch must not be present.
2305 {
2306 ValidationErrors::ScopedField field(errors, ".requiredMatch");
2307 if (required_match.has_value()) {
2308 errors->AddError("must not be present");
2309 }
2310 }
2311 }
2312 };
2313
2314 struct ExtraKeys {
2315 absl::optional<std::string> host_key;
2316 absl::optional<std::string> service_key;
2317 absl::optional<std::string> method_key;
2318
JsonLoadergrpc_core::__anon2f58d5c00111::GrpcKeyBuilder::ExtraKeys2319 static const JsonLoaderInterface* JsonLoader(const JsonArgs&) {
2320 static const auto* loader =
2321 JsonObjectLoader<ExtraKeys>()
2322 .OptionalField("host", &ExtraKeys::host_key)
2323 .OptionalField("service", &ExtraKeys::service_key)
2324 .OptionalField("method", &ExtraKeys::method_key)
2325 .Finish();
2326 return loader;
2327 }
2328
JsonPostLoadgrpc_core::__anon2f58d5c00111::GrpcKeyBuilder::ExtraKeys2329 void JsonPostLoad(const Json&, const JsonArgs&, ValidationErrors* errors) {
2330 auto check_field = [&](const std::string& field_name,
2331 absl::optional<std::string>* struct_field) {
2332 ValidationErrors::ScopedField field(errors,
2333 absl::StrCat(".", field_name));
2334 if (struct_field->has_value() && (*struct_field)->empty()) {
2335 errors->AddError("must be non-empty if set");
2336 }
2337 };
2338 check_field("host", &host_key);
2339 check_field("service", &service_key);
2340 check_field("method", &method_key);
2341 }
2342 };
2343
2344 std::vector<Name> names;
2345 std::vector<NameMatcher> headers;
2346 ExtraKeys extra_keys;
2347 std::map<std::string /*key*/, std::string /*value*/> constant_keys;
2348
JsonLoadergrpc_core::__anon2f58d5c00111::GrpcKeyBuilder2349 static const JsonLoaderInterface* JsonLoader(const JsonArgs&) {
2350 static const auto* loader =
2351 JsonObjectLoader<GrpcKeyBuilder>()
2352 .Field("names", &GrpcKeyBuilder::names)
2353 .OptionalField("headers", &GrpcKeyBuilder::headers)
2354 .OptionalField("extraKeys", &GrpcKeyBuilder::extra_keys)
2355 .OptionalField("constantKeys", &GrpcKeyBuilder::constant_keys)
2356 .Finish();
2357 return loader;
2358 }
2359
JsonPostLoadgrpc_core::__anon2f58d5c00111::GrpcKeyBuilder2360 void JsonPostLoad(const Json&, const JsonArgs&, ValidationErrors* errors) {
2361 // The names field must be non-empty.
2362 {
2363 ValidationErrors::ScopedField field(errors, ".names");
2364 if (!errors->FieldHasErrors() && names.empty()) {
2365 errors->AddError("must be non-empty");
2366 }
2367 }
2368 // Make sure no key in constantKeys is empty.
2369 if (constant_keys.find("") != constant_keys.end()) {
2370 ValidationErrors::ScopedField field(errors, ".constantKeys[\"\"]");
2371 errors->AddError("key must be non-empty");
2372 }
2373 // Check for duplicate keys.
2374 std::set<absl::string_view> keys_seen;
2375 auto duplicate_key_check_func = [&keys_seen, errors](
2376 const std::string& key,
2377 const std::string& field_name) {
2378 if (key.empty()) return; // Already generated an error about this.
2379 ValidationErrors::ScopedField field(errors, field_name);
2380 auto it = keys_seen.find(key);
2381 if (it != keys_seen.end()) {
2382 errors->AddError(absl::StrCat("duplicate key \"", key, "\""));
2383 } else {
2384 keys_seen.insert(key);
2385 }
2386 };
2387 for (size_t i = 0; i < headers.size(); ++i) {
2388 NameMatcher& header = headers[i];
2389 duplicate_key_check_func(header.key,
2390 absl::StrCat(".headers[", i, "].key"));
2391 }
2392 for (const auto& p : constant_keys) {
2393 duplicate_key_check_func(
2394 p.first, absl::StrCat(".constantKeys[\"", p.first, "\"]"));
2395 }
2396 if (extra_keys.host_key.has_value()) {
2397 duplicate_key_check_func(*extra_keys.host_key, ".extraKeys.host");
2398 }
2399 if (extra_keys.service_key.has_value()) {
2400 duplicate_key_check_func(*extra_keys.service_key, ".extraKeys.service");
2401 }
2402 if (extra_keys.method_key.has_value()) {
2403 duplicate_key_check_func(*extra_keys.method_key, ".extraKeys.method");
2404 }
2405 }
2406 };
2407
JsonLoader(const JsonArgs &)2408 const JsonLoaderInterface* RlsLbConfig::RouteLookupConfig::JsonLoader(
2409 const JsonArgs&) {
2410 static const auto* loader =
2411 JsonObjectLoader<RouteLookupConfig>()
2412 // Note: Some fields require manual processing and are handled in
2413 // JsonPostLoad() instead.
2414 .Field("lookupService", &RouteLookupConfig::lookup_service)
2415 .OptionalField("lookupServiceTimeout",
2416 &RouteLookupConfig::lookup_service_timeout)
2417 .OptionalField("maxAge", &RouteLookupConfig::max_age)
2418 .OptionalField("staleAge", &RouteLookupConfig::stale_age)
2419 .Field("cacheSizeBytes", &RouteLookupConfig::cache_size_bytes)
2420 .OptionalField("defaultTarget", &RouteLookupConfig::default_target)
2421 .Finish();
2422 return loader;
2423 }
2424
JsonPostLoad(const Json & json,const JsonArgs & args,ValidationErrors * errors)2425 void RlsLbConfig::RouteLookupConfig::JsonPostLoad(const Json& json,
2426 const JsonArgs& args,
2427 ValidationErrors* errors) {
2428 // Parse grpcKeybuilders.
2429 auto grpc_keybuilders = LoadJsonObjectField<std::vector<GrpcKeyBuilder>>(
2430 json.object(), args, "grpcKeybuilders", errors);
2431 if (grpc_keybuilders.has_value()) {
2432 ValidationErrors::ScopedField field(errors, ".grpcKeybuilders");
2433 for (size_t i = 0; i < grpc_keybuilders->size(); ++i) {
2434 ValidationErrors::ScopedField field(errors, absl::StrCat("[", i, "]"));
2435 auto& grpc_keybuilder = (*grpc_keybuilders)[i];
2436 // Construct KeyBuilder.
2437 RlsLbConfig::KeyBuilder key_builder;
2438 for (const auto& header : grpc_keybuilder.headers) {
2439 key_builder.header_keys.emplace(header.key, header.names);
2440 }
2441 if (grpc_keybuilder.extra_keys.host_key.has_value()) {
2442 key_builder.host_key = std::move(*grpc_keybuilder.extra_keys.host_key);
2443 }
2444 if (grpc_keybuilder.extra_keys.service_key.has_value()) {
2445 key_builder.service_key =
2446 std::move(*grpc_keybuilder.extra_keys.service_key);
2447 }
2448 if (grpc_keybuilder.extra_keys.method_key.has_value()) {
2449 key_builder.method_key =
2450 std::move(*grpc_keybuilder.extra_keys.method_key);
2451 }
2452 key_builder.constant_keys = std::move(grpc_keybuilder.constant_keys);
2453 // Add entries to map.
2454 for (const auto& name : grpc_keybuilder.names) {
2455 std::string path = absl::StrCat("/", name.service, "/", name.method);
2456 bool inserted = key_builder_map.emplace(path, key_builder).second;
2457 if (!inserted) {
2458 errors->AddError(absl::StrCat("duplicate entry for \"", path, "\""));
2459 }
2460 }
2461 }
2462 }
2463 // Validate lookupService.
2464 {
2465 ValidationErrors::ScopedField field(errors, ".lookupService");
2466 if (!errors->FieldHasErrors() &&
2467 !CoreConfiguration::Get().resolver_registry().IsValidTarget(
2468 lookup_service)) {
2469 errors->AddError("must be valid gRPC target URI");
2470 }
2471 }
2472 // Clamp maxAge to the max allowed value.
2473 if (max_age > kMaxMaxAge) max_age = kMaxMaxAge;
2474 // If staleAge is set, then maxAge must also be set.
2475 if (json.object().find("staleAge") != json.object().end() &&
2476 json.object().find("maxAge") == json.object().end()) {
2477 ValidationErrors::ScopedField field(errors, ".maxAge");
2478 errors->AddError("must be set if staleAge is set");
2479 }
2480 // Ignore staleAge if greater than or equal to maxAge.
2481 if (stale_age >= max_age) stale_age = max_age;
2482 // Validate cacheSizeBytes.
2483 {
2484 ValidationErrors::ScopedField field(errors, ".cacheSizeBytes");
2485 if (!errors->FieldHasErrors() && cache_size_bytes <= 0) {
2486 errors->AddError("must be greater than 0");
2487 }
2488 }
2489 // Clamp cacheSizeBytes to the max allowed value.
2490 if (cache_size_bytes > kMaxCacheSizeBytes) {
2491 cache_size_bytes = kMaxCacheSizeBytes;
2492 }
2493 // Validate defaultTarget.
2494 {
2495 ValidationErrors::ScopedField field(errors, ".defaultTarget");
2496 if (!errors->FieldHasErrors() &&
2497 json.object().find("defaultTarget") != json.object().end() &&
2498 default_target.empty()) {
2499 errors->AddError("must be non-empty if set");
2500 }
2501 }
2502 }
2503
JsonLoader(const JsonArgs &)2504 const JsonLoaderInterface* RlsLbConfig::JsonLoader(const JsonArgs&) {
2505 static const auto* loader =
2506 JsonObjectLoader<RlsLbConfig>()
2507 // Note: Some fields require manual processing and are handled in
2508 // JsonPostLoad() instead.
2509 .Field("routeLookupConfig", &RlsLbConfig::route_lookup_config_)
2510 .Field("childPolicyConfigTargetFieldName",
2511 &RlsLbConfig::child_policy_config_target_field_name_)
2512 .Finish();
2513 return loader;
2514 }
2515
JsonPostLoad(const Json & json,const JsonArgs &,ValidationErrors * errors)2516 void RlsLbConfig::JsonPostLoad(const Json& json, const JsonArgs&,
2517 ValidationErrors* errors) {
2518 // Parse routeLookupChannelServiceConfig.
2519 auto it = json.object().find("routeLookupChannelServiceConfig");
2520 if (it != json.object().end()) {
2521 ValidationErrors::ScopedField field(errors,
2522 ".routeLookupChannelServiceConfig");
2523 // Don't need to save the result here, just need the errors (if any).
2524 ServiceConfigImpl::Create(ChannelArgs(), it->second, errors);
2525 }
2526 // Validate childPolicyConfigTargetFieldName.
2527 {
2528 ValidationErrors::ScopedField field(errors,
2529 ".childPolicyConfigTargetFieldName");
2530 if (!errors->FieldHasErrors() &&
2531 child_policy_config_target_field_name_.empty()) {
2532 errors->AddError("must be non-empty");
2533 }
2534 }
2535 // Parse childPolicy.
2536 {
2537 ValidationErrors::ScopedField field(errors, ".childPolicy");
2538 auto it = json.object().find("childPolicy");
2539 if (it == json.object().end()) {
2540 errors->AddError("field not present");
2541 } else {
2542 // Add target to all child policy configs in the list.
2543 std::string target = route_lookup_config_.default_target.empty()
2544 ? kFakeTargetFieldValue
2545 : route_lookup_config_.default_target;
2546 auto child_policy_config = InsertOrUpdateChildPolicyField(
2547 child_policy_config_target_field_name_, target, it->second, errors);
2548 if (child_policy_config.has_value()) {
2549 child_policy_config_ = std::move(*child_policy_config);
2550 // Parse the config.
2551 auto parsed_config =
2552 CoreConfiguration::Get()
2553 .lb_policy_registry()
2554 .ParseLoadBalancingConfig(child_policy_config_);
2555 if (!parsed_config.ok()) {
2556 errors->AddError(parsed_config.status().message());
2557 } else {
2558 // Find the chosen config and return it in JSON form.
2559 // We remove all non-selected configs, and in the selected config,
2560 // we leave the target field in place, set to the default value.
2561 // This slightly optimizes what we need to do later when we update
2562 // a child policy for a given target.
2563 for (const Json& config : child_policy_config_.array()) {
2564 if (config.object().begin()->first == (*parsed_config)->name()) {
2565 child_policy_config_ = Json::FromArray({config});
2566 break;
2567 }
2568 }
2569 // If default target is set, set the default child config.
2570 if (!route_lookup_config_.default_target.empty()) {
2571 default_child_policy_parsed_config_ = std::move(*parsed_config);
2572 }
2573 }
2574 }
2575 }
2576 }
2577 }
2578
2579 class RlsLbFactory final : public LoadBalancingPolicyFactory {
2580 public:
name() const2581 absl::string_view name() const override { return kRls; }
2582
CreateLoadBalancingPolicy(LoadBalancingPolicy::Args args) const2583 OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
2584 LoadBalancingPolicy::Args args) const override {
2585 return MakeOrphanable<RlsLb>(std::move(args));
2586 }
2587
2588 absl::StatusOr<RefCountedPtr<LoadBalancingPolicy::Config>>
ParseLoadBalancingConfig(const Json & json) const2589 ParseLoadBalancingConfig(const Json& json) const override {
2590 return LoadFromJson<RefCountedPtr<RlsLbConfig>>(
2591 json, JsonArgs(), "errors validating RLS LB policy config");
2592 }
2593 };
2594
2595 } // namespace
2596
RegisterRlsLbPolicy(CoreConfiguration::Builder * builder)2597 void RegisterRlsLbPolicy(CoreConfiguration::Builder* builder) {
2598 builder->lb_policy_registry()->RegisterLoadBalancingPolicyFactory(
2599 std::make_unique<RlsLbFactory>());
2600 }
2601
2602 } // namespace grpc_core
2603