• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 // Copyright 2020 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 // Implementation of the Route Lookup Service (RLS) LB policy
18 //
19 // The policy queries a route lookup service for the name of the actual service
20 // to use. A child policy that recognizes the name as a field of its
21 // configuration will take further load balancing action on the request.
22 
23 #include "src/core/load_balancing/rls/rls.h"
24 
25 #include <grpc/byte_buffer.h>
26 #include <grpc/byte_buffer_reader.h>
27 #include <grpc/event_engine/event_engine.h>
28 #include <grpc/grpc.h>
29 #include <grpc/impl/channel_arg_names.h>
30 #include <grpc/impl/connectivity_state.h>
31 #include <grpc/impl/propagation_bits.h>
32 #include <grpc/slice.h>
33 #include <grpc/status.h>
34 #include <grpc/support/json.h>
35 #include <grpc/support/port_platform.h>
36 #include <inttypes.h>
37 #include <stdlib.h>
38 #include <string.h>
39 
40 #include <algorithm>
41 #include <deque>
42 #include <list>
43 #include <map>
44 #include <memory>
45 #include <random>
46 #include <set>
47 #include <string>
48 #include <type_traits>
49 #include <unordered_map>
50 #include <utility>
51 #include <vector>
52 
53 #include "absl/base/thread_annotations.h"
54 #include "absl/hash/hash.h"
55 #include "absl/log/check.h"
56 #include "absl/log/log.h"
57 #include "absl/random/random.h"
58 #include "absl/status/status.h"
59 #include "absl/status/statusor.h"
60 #include "absl/strings/str_cat.h"
61 #include "absl/strings/str_format.h"
62 #include "absl/strings/str_join.h"
63 #include "absl/strings/string_view.h"
64 #include "absl/types/optional.h"
65 #include "src/core/channelz/channelz.h"
66 #include "src/core/client_channel/client_channel_filter.h"
67 #include "src/core/config/core_configuration.h"
68 #include "src/core/lib/channel/channel_args.h"
69 #include "src/core/lib/debug/trace.h"
70 #include "src/core/lib/iomgr/closure.h"
71 #include "src/core/lib/iomgr/error.h"
72 #include "src/core/lib/iomgr/exec_ctx.h"
73 #include "src/core/lib/iomgr/pollset_set.h"
74 #include "src/core/lib/security/credentials/fake/fake_credentials.h"
75 #include "src/core/lib/slice/slice.h"
76 #include "src/core/lib/slice/slice_internal.h"
77 #include "src/core/lib/surface/call.h"
78 #include "src/core/lib/surface/channel.h"
79 #include "src/core/lib/transport/connectivity_state.h"
80 #include "src/core/lib/transport/error_utils.h"
81 #include "src/core/load_balancing/child_policy_handler.h"
82 #include "src/core/load_balancing/delegating_helper.h"
83 #include "src/core/load_balancing/lb_policy.h"
84 #include "src/core/load_balancing/lb_policy_factory.h"
85 #include "src/core/load_balancing/lb_policy_registry.h"
86 #include "src/core/resolver/endpoint_addresses.h"
87 #include "src/core/resolver/resolver_registry.h"
88 #include "src/core/service_config/service_config_impl.h"
89 #include "src/core/telemetry/metrics.h"
90 #include "src/core/util/backoff.h"
91 #include "src/core/util/debug_location.h"
92 #include "src/core/util/dual_ref_counted.h"
93 #include "src/core/util/json/json.h"
94 #include "src/core/util/json/json_args.h"
95 #include "src/core/util/json/json_object_loader.h"
96 #include "src/core/util/json/json_writer.h"
97 #include "src/core/util/match.h"
98 #include "src/core/util/orphanable.h"
99 #include "src/core/util/ref_counted_ptr.h"
100 #include "src/core/util/status_helper.h"
101 #include "src/core/util/sync.h"
102 #include "src/core/util/time.h"
103 #include "src/core/util/upb_utils.h"
104 #include "src/core/util/uuid_v4.h"
105 #include "src/core/util/validation_errors.h"
106 #include "src/core/util/work_serializer.h"
107 #include "src/proto/grpc/lookup/v1/rls.upb.h"
108 #include "upb/base/string_view.h"
109 #include "upb/mem/arena.hpp"
110 
111 using ::grpc_event_engine::experimental::EventEngine;
112 
113 namespace grpc_core {
114 
115 namespace {
116 
117 constexpr absl::string_view kMetricLabelRlsServerTarget =
118     "grpc.lb.rls.server_target";
119 constexpr absl::string_view kMetricLabelRlsInstanceUuid =
120     "grpc.lb.rls.instance_uuid";
121 constexpr absl::string_view kMetricRlsDataPlaneTarget =
122     "grpc.lb.rls.data_plane_target";
123 constexpr absl::string_view kMetricLabelPickResult = "grpc.lb.pick_result";
124 
125 const auto kMetricCacheSize =
126     GlobalInstrumentsRegistry::RegisterCallbackInt64Gauge(
127         "grpc.lb.rls.cache_size", "EXPERIMENTAL.  Size of the RLS cache.", "By",
128         false)
129         .Labels(kMetricLabelTarget, kMetricLabelRlsServerTarget,
130                 kMetricLabelRlsInstanceUuid)
131         .Build();
132 
133 const auto kMetricCacheEntries =
134     GlobalInstrumentsRegistry::RegisterCallbackInt64Gauge(
135         "grpc.lb.rls.cache_entries",
136         "EXPERIMENTAL.  Number of entries in the RLS cache.", "{entry}", false)
137         .Labels(kMetricLabelTarget, kMetricLabelRlsServerTarget,
138                 kMetricLabelRlsInstanceUuid)
139         .Build();
140 
141 const auto kMetricDefaultTargetPicks =
142     GlobalInstrumentsRegistry::RegisterUInt64Counter(
143         "grpc.lb.rls.default_target_picks",
144         "EXPERIMENTAL.  Number of LB picks sent to the default target.",
145         "{pick}", false)
146         .Labels(kMetricLabelTarget, kMetricLabelRlsServerTarget,
147                 kMetricRlsDataPlaneTarget, kMetricLabelPickResult)
148         .Build();
149 
150 const auto kMetricTargetPicks =
151     GlobalInstrumentsRegistry::RegisterUInt64Counter(
152         "grpc.lb.rls.target_picks",
153         "EXPERIMENTAL.  Number of LB picks sent to each RLS target.  Note that "
154         "if the default target is also returned by the RLS server, RPCs sent "
155         "to that target from the cache will be counted in this metric, not "
156         "in grpc.rls.default_target_picks.",
157         "{pick}", false)
158         .Labels(kMetricLabelTarget, kMetricLabelRlsServerTarget,
159                 kMetricRlsDataPlaneTarget, kMetricLabelPickResult)
160         .Build();
161 
162 const auto kMetricFailedPicks =
163     GlobalInstrumentsRegistry::RegisterUInt64Counter(
164         "grpc.lb.rls.failed_picks",
165         "EXPERIMENTAL.  Number of LB picks failed due to either a failed RLS "
166         "request or the RLS channel being throttled.",
167         "{pick}", false)
168         .Labels(kMetricLabelTarget, kMetricLabelRlsServerTarget)
169         .Build();
170 
171 constexpr absl::string_view kRls = "rls_experimental";
172 const char kGrpc[] = "grpc";
173 const char* kRlsRequestPath = "/grpc.lookup.v1.RouteLookupService/RouteLookup";
174 const char* kFakeTargetFieldValue = "fake_target_field_value";
175 const char* kRlsHeaderKey = "x-google-rls-data";
176 
177 const Duration kDefaultLookupServiceTimeout = Duration::Seconds(10);
178 const Duration kMaxMaxAge = Duration::Minutes(5);
179 const Duration kMinExpirationTime = Duration::Seconds(5);
180 const Duration kCacheBackoffInitial = Duration::Seconds(1);
181 const double kCacheBackoffMultiplier = 1.6;
182 const double kCacheBackoffJitter = 0.2;
183 const Duration kCacheBackoffMax = Duration::Minutes(2);
184 const Duration kDefaultThrottleWindowSize = Duration::Seconds(30);
185 const double kDefaultThrottleRatioForSuccesses = 2.0;
186 const int kDefaultThrottlePadding = 8;
187 const Duration kCacheCleanupTimerInterval = Duration::Minutes(1);
188 const int64_t kMaxCacheSizeBytes = 5 * 1024 * 1024;
189 
190 // Parsed RLS LB policy configuration.
191 class RlsLbConfig final : public LoadBalancingPolicy::Config {
192  public:
193   struct KeyBuilder {
194     std::map<std::string /*key*/, std::vector<std::string /*header*/>>
195         header_keys;
196     std::string host_key;
197     std::string service_key;
198     std::string method_key;
199     std::map<std::string /*key*/, std::string /*value*/> constant_keys;
200   };
201   using KeyBuilderMap = std::unordered_map<std::string /*path*/, KeyBuilder>;
202 
203   struct RouteLookupConfig {
204     KeyBuilderMap key_builder_map;
205     std::string lookup_service;
206     Duration lookup_service_timeout = kDefaultLookupServiceTimeout;
207     Duration max_age = kMaxMaxAge;
208     Duration stale_age = kMaxMaxAge;
209     int64_t cache_size_bytes = 0;
210     std::string default_target;
211 
212     static const JsonLoaderInterface* JsonLoader(const JsonArgs&);
213     void JsonPostLoad(const Json& json, const JsonArgs& args,
214                       ValidationErrors* errors);
215   };
216 
217   RlsLbConfig() = default;
218 
219   RlsLbConfig(const RlsLbConfig&) = delete;
220   RlsLbConfig& operator=(const RlsLbConfig&) = delete;
221 
222   RlsLbConfig(RlsLbConfig&& other) = delete;
223   RlsLbConfig& operator=(RlsLbConfig&& other) = delete;
224 
name() const225   absl::string_view name() const override { return kRls; }
226 
key_builder_map() const227   const KeyBuilderMap& key_builder_map() const {
228     return route_lookup_config_.key_builder_map;
229   }
lookup_service() const230   const std::string& lookup_service() const {
231     return route_lookup_config_.lookup_service;
232   }
lookup_service_timeout() const233   Duration lookup_service_timeout() const {
234     return route_lookup_config_.lookup_service_timeout;
235   }
max_age() const236   Duration max_age() const { return route_lookup_config_.max_age; }
stale_age() const237   Duration stale_age() const { return route_lookup_config_.stale_age; }
cache_size_bytes() const238   int64_t cache_size_bytes() const {
239     return route_lookup_config_.cache_size_bytes;
240   }
default_target() const241   const std::string& default_target() const {
242     return route_lookup_config_.default_target;
243   }
rls_channel_service_config() const244   const std::string& rls_channel_service_config() const {
245     return rls_channel_service_config_;
246   }
child_policy_config() const247   const Json& child_policy_config() const { return child_policy_config_; }
child_policy_config_target_field_name() const248   const std::string& child_policy_config_target_field_name() const {
249     return child_policy_config_target_field_name_;
250   }
251   RefCountedPtr<LoadBalancingPolicy::Config>
default_child_policy_parsed_config() const252   default_child_policy_parsed_config() const {
253     return default_child_policy_parsed_config_;
254   }
255 
256   static const JsonLoaderInterface* JsonLoader(const JsonArgs&);
257   void JsonPostLoad(const Json& json, const JsonArgs&,
258                     ValidationErrors* errors);
259 
260  private:
261   RouteLookupConfig route_lookup_config_;
262   std::string rls_channel_service_config_;
263   Json child_policy_config_;
264   std::string child_policy_config_target_field_name_;
265   RefCountedPtr<LoadBalancingPolicy::Config>
266       default_child_policy_parsed_config_;
267 };
268 
269 // RLS LB policy.
270 class RlsLb final : public LoadBalancingPolicy {
271  public:
272   explicit RlsLb(Args args);
273 
name() const274   absl::string_view name() const override { return kRls; }
275   absl::Status UpdateLocked(UpdateArgs args) override;
276   void ExitIdleLocked() override;
277   void ResetBackoffLocked() override;
278 
279  private:
280   // Key to access entries in the cache and the request map.
281   struct RequestKey {
282     std::map<std::string, std::string> key_map;
283 
operator ==grpc_core::__anon2f58d5c00111::RlsLb::RequestKey284     bool operator==(const RequestKey& rhs) const {
285       return key_map == rhs.key_map;
286     }
287 
288     template <typename H>
AbslHashValue(H h,const RequestKey & key)289     friend H AbslHashValue(H h, const RequestKey& key) {
290       std::hash<std::string> string_hasher;
291       for (auto& kv : key.key_map) {
292         h = H::combine(std::move(h), string_hasher(kv.first),
293                        string_hasher(kv.second));
294       }
295       return h;
296     }
297 
Sizegrpc_core::__anon2f58d5c00111::RlsLb::RequestKey298     size_t Size() const {
299       size_t size = sizeof(RequestKey);
300       for (auto& kv : key_map) {
301         size += kv.first.length() + kv.second.length();
302       }
303       return size;
304     }
305 
ToStringgrpc_core::__anon2f58d5c00111::RlsLb::RequestKey306     std::string ToString() const {
307       return absl::StrCat(
308           "{", absl::StrJoin(key_map, ",", absl::PairFormatter("=")), "}");
309     }
310   };
311 
312   // Data from an RLS response.
313   struct ResponseInfo {
314     absl::Status status;
315     std::vector<std::string> targets;
316     grpc_event_engine::experimental::Slice header_data;
317 
ToStringgrpc_core::__anon2f58d5c00111::RlsLb::ResponseInfo318     std::string ToString() const {
319       return absl::StrFormat("{status=%s, targets=[%s], header_data=\"%s\"}",
320                              status.ToString(), absl::StrJoin(targets, ","),
321                              header_data.as_string_view());
322     }
323   };
324 
325   // Wraps a child policy for a given RLS target.
326   class ChildPolicyWrapper final : public DualRefCounted<ChildPolicyWrapper> {
327    public:
328     ChildPolicyWrapper(RefCountedPtr<RlsLb> lb_policy, std::string target);
329 
target() const330     const std::string& target() const { return target_; }
331 
Pick(PickArgs args)332     PickResult Pick(PickArgs args) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_) {
333       return picker_->Pick(args);
334     }
335 
336     // Updates for the child policy are handled in two phases:
337     // 1. In StartUpdate(), we parse and validate the new child policy
338     //    config and store the parsed config.
339     // 2. In MaybeFinishUpdate(), we actually pass the parsed config to the
340     //    child policy's UpdateLocked() method.
341     //
342     // The reason we do this is to avoid deadlocks.  In StartUpdate(),
343     // if the new config fails to validate, then we need to set
344     // picker_ to an instance that will fail all requests, which
345     // requires holding the lock.  However, we cannot call the child
346     // policy's UpdateLocked() method from MaybeFinishUpdate() while
347     // holding the lock, since that would cause a deadlock: the child's
348     // UpdateLocked() will call the helper's UpdateState() method, which
349     // will try to acquire the lock to set picker_.  So StartUpdate() is
350     // called while we are still holding the lock, but MaybeFinishUpdate()
351     // is called after releasing it.
352     //
353     // Both methods grab the data they need from the parent object.
354     void StartUpdate(OrphanablePtr<ChildPolicyHandler>* child_policy_to_delete)
355         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
356     absl::Status MaybeFinishUpdate() ABSL_LOCKS_EXCLUDED(&RlsLb::mu_);
357 
ExitIdleLocked()358     void ExitIdleLocked() {
359       if (child_policy_ != nullptr) child_policy_->ExitIdleLocked();
360     }
361 
ResetBackoffLocked()362     void ResetBackoffLocked() {
363       if (child_policy_ != nullptr) child_policy_->ResetBackoffLocked();
364     }
365 
366     // Gets the connectivity state of the child policy. Once the child policy
367     // reports TRANSIENT_FAILURE, the function will always return
368     // TRANSIENT_FAILURE state instead of the actual state of the child policy
369     // until the child policy reports another READY state.
connectivity_state() const370     grpc_connectivity_state connectivity_state() const
371         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_) {
372       return connectivity_state_;
373     }
374 
375    private:
376     // ChannelControlHelper object that allows the child policy to update state
377     // with the wrapper.
378     class ChildPolicyHelper final : public DelegatingChannelControlHelper {
379      public:
ChildPolicyHelper(WeakRefCountedPtr<ChildPolicyWrapper> wrapper)380       explicit ChildPolicyHelper(WeakRefCountedPtr<ChildPolicyWrapper> wrapper)
381           : wrapper_(std::move(wrapper)) {}
~ChildPolicyHelper()382       ~ChildPolicyHelper() override {
383         wrapper_.reset(DEBUG_LOCATION, "ChildPolicyHelper");
384       }
385 
386       void UpdateState(grpc_connectivity_state state,
387                        const absl::Status& status,
388                        RefCountedPtr<SubchannelPicker> picker) override;
389 
390      private:
parent_helper() const391       ChannelControlHelper* parent_helper() const override {
392         return wrapper_->lb_policy_->channel_control_helper();
393       }
394 
395       WeakRefCountedPtr<ChildPolicyWrapper> wrapper_;
396     };
397 
398     // Note: We are forced to disable lock analysis here because
399     // Orphaned() is called by Unref() which is called by RefCountedPtr<>, which
400     // cannot have lock annotations for this particular caller.
401     void Orphaned() override ABSL_NO_THREAD_SAFETY_ANALYSIS;
402 
403     RefCountedPtr<RlsLb> lb_policy_;
404     std::string target_;
405 
406     bool is_shutdown_ = false;  // Protected by WorkSerializer
407 
408     OrphanablePtr<ChildPolicyHandler> child_policy_;
409     RefCountedPtr<LoadBalancingPolicy::Config> pending_config_;
410 
411     grpc_connectivity_state connectivity_state_ ABSL_GUARDED_BY(&RlsLb::mu_) =
412         GRPC_CHANNEL_CONNECTING;
413     RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker_
414         ABSL_GUARDED_BY(&RlsLb::mu_);
415   };
416 
417   // A picker that uses the cache and the request map in the LB policy
418   // (synchronized via a mutex) to determine how to route requests.
419   class Picker final : public LoadBalancingPolicy::SubchannelPicker {
420    public:
421     explicit Picker(RefCountedPtr<RlsLb> lb_policy);
422 
423     PickResult Pick(PickArgs args) override;
424 
425    private:
426     PickResult PickFromDefaultTargetOrFail(const char* reason, PickArgs args,
427                                            absl::Status status)
428         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
429 
430     RefCountedPtr<RlsLb> lb_policy_;
431     RefCountedPtr<RlsLbConfig> config_;
432     RefCountedPtr<ChildPolicyWrapper> default_child_policy_;
433   };
434 
435   // An LRU cache with adjustable size.
436   class Cache final {
437    public:
438     using Iterator = std::list<RequestKey>::iterator;
439 
440     class Entry final : public InternallyRefCounted<Entry> {
441      public:
442       Entry(RefCountedPtr<RlsLb> lb_policy, const RequestKey& key);
443 
444       // Notify the entry when it's evicted from the cache. Performs shut down.
445       // Note: We are forced to disable lock analysis here because
446       // Orphan() is called by OrphanablePtr<>, which cannot have lock
447       // annotations for this particular caller.
448       void Orphan() override ABSL_NO_THREAD_SAFETY_ANALYSIS;
449 
status() const450       const absl::Status& status() const
451           ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_) {
452         return status_;
453       }
backoff_time() const454       Timestamp backoff_time() const
455           ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_) {
456         return backoff_time_;
457       }
backoff_expiration_time() const458       Timestamp backoff_expiration_time() const
459           ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_) {
460         return backoff_expiration_time_;
461       }
data_expiration_time() const462       Timestamp data_expiration_time() const
463           ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_) {
464         return data_expiration_time_;
465       }
header_data() const466       const grpc_event_engine::experimental::Slice& header_data() const
467           ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_) {
468         return header_data_;
469       }
stale_time() const470       Timestamp stale_time() const ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_) {
471         return stale_time_;
472       }
min_expiration_time() const473       Timestamp min_expiration_time() const
474           ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_) {
475         return min_expiration_time_;
476       }
477 
TakeBackoffState()478       std::unique_ptr<BackOff> TakeBackoffState()
479           ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_) {
480         return std::move(backoff_state_);
481       }
482 
483       // Cache size of entry.
484       size_t Size() const ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
485 
486       // Pick subchannel for request based on the entry's state.
487       PickResult Pick(PickArgs args) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
488 
489       // If the cache entry is in backoff state, resets the backoff and, if
490       // applicable, its backoff timer. The method does not update the LB
491       // policy's picker; the caller is responsible for that if necessary.
492       void ResetBackoff() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
493 
494       // Check if the entry should be removed by the clean-up timer.
495       bool ShouldRemove() const ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
496 
497       // Check if the entry can be evicted from the cache, i.e. the
498       // min_expiration_time_ has passed.
499       bool CanEvict() const ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
500 
501       // Updates the entry upon reception of a new RLS response.
502       // Returns a list of child policy wrappers on which FinishUpdate()
503       // needs to be called after releasing the lock.
504       std::vector<ChildPolicyWrapper*> OnRlsResponseLocked(
505           ResponseInfo response, std::unique_ptr<BackOff> backoff_state,
506           OrphanablePtr<ChildPolicyHandler>* child_policy_to_delete)
507           ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
508 
509       // Moves entry to the end of the LRU list.
510       void MarkUsed() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
511 
512       // Takes entries from child_policy_wrappers_ and appends them to the end
513       // of \a child_policy_wrappers.
TakeChildPolicyWrappers(std::vector<RefCountedPtr<ChildPolicyWrapper>> * child_policy_wrappers)514       void TakeChildPolicyWrappers(
515           std::vector<RefCountedPtr<ChildPolicyWrapper>>* child_policy_wrappers)
516           ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_) {
517         child_policy_wrappers->insert(
518             child_policy_wrappers->end(),
519             std::make_move_iterator(child_policy_wrappers_.begin()),
520             std::make_move_iterator(child_policy_wrappers_.end()));
521         child_policy_wrappers_.clear();
522       }
523 
524      private:
525       class BackoffTimer final : public InternallyRefCounted<BackoffTimer> {
526        public:
527         BackoffTimer(RefCountedPtr<Entry> entry, Duration delay);
528 
529         // Note: We are forced to disable lock analysis here because
530         // Orphan() is called by OrphanablePtr<>, which cannot have lock
531         // annotations for this particular caller.
532         void Orphan() override ABSL_NO_THREAD_SAFETY_ANALYSIS;
533 
534        private:
535         void OnBackoffTimerLocked();
536 
537         RefCountedPtr<Entry> entry_;
538         absl::optional<EventEngine::TaskHandle> backoff_timer_task_handle_
539             ABSL_GUARDED_BY(&RlsLb::mu_);
540       };
541 
542       RefCountedPtr<RlsLb> lb_policy_;
543 
544       bool is_shutdown_ ABSL_GUARDED_BY(&RlsLb::mu_) = false;
545 
546       // Backoff states
547       absl::Status status_ ABSL_GUARDED_BY(&RlsLb::mu_);
548       std::unique_ptr<BackOff> backoff_state_ ABSL_GUARDED_BY(&RlsLb::mu_);
549       Timestamp backoff_time_ ABSL_GUARDED_BY(&RlsLb::mu_) =
550           Timestamp::InfPast();
551       Timestamp backoff_expiration_time_ ABSL_GUARDED_BY(&RlsLb::mu_) =
552           Timestamp::InfPast();
553       OrphanablePtr<BackoffTimer> backoff_timer_;
554 
555       // RLS response states
556       std::vector<RefCountedPtr<ChildPolicyWrapper>> child_policy_wrappers_
557           ABSL_GUARDED_BY(&RlsLb::mu_);
558       grpc_event_engine::experimental::Slice header_data_
559           ABSL_GUARDED_BY(&RlsLb::mu_);
560       Timestamp data_expiration_time_ ABSL_GUARDED_BY(&RlsLb::mu_) =
561           Timestamp::InfPast();
562       Timestamp stale_time_ ABSL_GUARDED_BY(&RlsLb::mu_) = Timestamp::InfPast();
563 
564       Timestamp min_expiration_time_ ABSL_GUARDED_BY(&RlsLb::mu_);
565       Cache::Iterator lru_iterator_ ABSL_GUARDED_BY(&RlsLb::mu_);
566     };
567 
568     explicit Cache(RlsLb* lb_policy);
569 
570     // Finds an entry from the cache that corresponds to a key. If an entry is
571     // not found, nullptr is returned. Otherwise, the entry is considered
572     // recently used and its order in the LRU list of the cache is updated.
573     Entry* Find(const RequestKey& key)
574         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
575 
576     // Finds an entry from the cache that corresponds to a key. If an entry is
577     // not found, an entry is created, inserted in the cache, and returned to
578     // the caller. Otherwise, the entry found is returned to the caller. The
579     // entry returned to the user is considered recently used and its order in
580     // the LRU list of the cache is updated.
581     Entry* FindOrInsert(const RequestKey& key,
582                         std::vector<RefCountedPtr<ChildPolicyWrapper>>*
583                             child_policy_wrappers_to_delete)
584         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
585 
586     // Resizes the cache. If the new cache size is greater than the current size
587     // of the cache, do nothing. Otherwise, evict the oldest entries that
588     // exceed the new size limit of the cache.
589     void Resize(size_t bytes, std::vector<RefCountedPtr<ChildPolicyWrapper>>*
590                                   child_policy_wrappers_to_delete)
591         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
592 
593     // Resets backoff of all the cache entries.
594     void ResetAllBackoff() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
595 
596     // Shutdown the cache; clean-up and orphan all the stored cache entries.
597     GRPC_MUST_USE_RESULT std::vector<RefCountedPtr<ChildPolicyWrapper>>
598     Shutdown() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
599 
600     void ReportMetricsLocked(CallbackMetricReporter& reporter)
601         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
602 
603    private:
604     // Shared logic for starting the cleanup timer
605     void StartCleanupTimer() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
606 
607     void OnCleanupTimer();
608 
609     // Returns the entry size for a given key.
610     static size_t EntrySizeForKey(const RequestKey& key);
611 
612     // Evicts oversized cache elements when the current size is greater than
613     // the specified limit.
614     void MaybeShrinkSize(size_t bytes,
615                          std::vector<RefCountedPtr<ChildPolicyWrapper>>*
616                              child_policy_wrappers_to_delete)
617         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
618 
619     RlsLb* lb_policy_;
620 
621     size_t size_limit_ ABSL_GUARDED_BY(&RlsLb::mu_) = 0;
622     size_t size_ ABSL_GUARDED_BY(&RlsLb::mu_) = 0;
623 
624     std::list<RequestKey> lru_list_ ABSL_GUARDED_BY(&RlsLb::mu_);
625     std::unordered_map<RequestKey, OrphanablePtr<Entry>, absl::Hash<RequestKey>>
626         map_ ABSL_GUARDED_BY(&RlsLb::mu_);
627     absl::optional<EventEngine::TaskHandle> cleanup_timer_handle_;
628   };
629 
630   // Channel for communicating with the RLS server.
631   // Contains throttling logic for RLS requests.
632   class RlsChannel final : public InternallyRefCounted<RlsChannel> {
633    public:
634     explicit RlsChannel(RefCountedPtr<RlsLb> lb_policy);
635 
636     // Shuts down the channel.
637     void Orphan() override;
638 
639     // Starts an RLS call.
640     // If stale_entry is non-null, it points to the entry containing
641     // stale data for the key.
642     void StartRlsCall(const RequestKey& key, Cache::Entry* stale_entry)
643         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
644 
645     // Reports the result of an RLS call to the throttle.
646     void ReportResponseLocked(bool response_succeeded)
647         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
648 
649     // Checks if a proposed RLS call should be throttled.
ShouldThrottle()650     bool ShouldThrottle() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_) {
651       return throttle_.ShouldThrottle();
652     }
653 
654     // Resets the channel's backoff.
655     void ResetBackoff();
656 
channel() const657     Channel* channel() const { return channel_.get(); }
658 
659    private:
660     // Watches the state of the RLS channel. Notifies the LB policy when
661     // the channel was previously in TRANSIENT_FAILURE and then becomes READY.
662     class StateWatcher final : public AsyncConnectivityStateWatcherInterface {
663      public:
StateWatcher(RefCountedPtr<RlsChannel> rls_channel)664       explicit StateWatcher(RefCountedPtr<RlsChannel> rls_channel)
665           : AsyncConnectivityStateWatcherInterface(
666                 rls_channel->lb_policy_->work_serializer()),
667             rls_channel_(std::move(rls_channel)) {}
668 
669      private:
670       void OnConnectivityStateChange(grpc_connectivity_state new_state,
671                                      const absl::Status& status) override;
672 
673       RefCountedPtr<RlsChannel> rls_channel_;
674       bool was_transient_failure_ = false;
675     };
676 
677     // Throttle state for RLS requests.
678     class Throttle final {
679      public:
Throttle(Duration window_size=kDefaultThrottleWindowSize,float ratio_for_successes=kDefaultThrottleRatioForSuccesses,int padding=kDefaultThrottlePadding)680       explicit Throttle(
681           Duration window_size = kDefaultThrottleWindowSize,
682           float ratio_for_successes = kDefaultThrottleRatioForSuccesses,
683           int padding = kDefaultThrottlePadding)
684           : window_size_(window_size),
685             ratio_for_successes_(ratio_for_successes),
686             padding_(padding) {}
687 
688       bool ShouldThrottle() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
689 
690       void RegisterResponse(bool success)
691           ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
692 
693      private:
694       Duration window_size_;
695       double ratio_for_successes_;
696       int padding_;
697       std::mt19937 rng_{std::random_device()()};
698 
699       // Logged timestamp of requests.
700       std::deque<Timestamp> requests_ ABSL_GUARDED_BY(&RlsLb::mu_);
701 
702       // Logged timestamps of failures.
703       std::deque<Timestamp> failures_ ABSL_GUARDED_BY(&RlsLb::mu_);
704     };
705 
706     RefCountedPtr<RlsLb> lb_policy_;
707     bool is_shutdown_ = false;
708 
709     RefCountedPtr<Channel> channel_;
710     RefCountedPtr<channelz::ChannelNode> parent_channelz_node_;
711     StateWatcher* watcher_ = nullptr;
712     Throttle throttle_ ABSL_GUARDED_BY(&RlsLb::mu_);
713   };
714 
  // A pending RLS request.  Instances will be tracked in request_map_.
  //
  // Holds everything needed for a single RouteLookup call:
  // - the request key;
  // - the RLS channel the call is made on;
  // - backoff state carried in by the creator;
  // - the raw grpc_call plumbing (closures, send/recv buffers, metadata
  //   arrays and received status).
  class RlsRequest final : public InternallyRefCounted<RlsRequest> {
   public:
    // Asynchronously starts a call on rls_channel for key.
    // Stores backoff_state, which will be transferred to the data cache
    // if the RLS request fails.
    // reason and stale_header_data are stored for use when building the
    // request (presumably by MakeRequestProto() -- confirm there).
    RlsRequest(RefCountedPtr<RlsLb> lb_policy, RlsLb::RequestKey key,
               RefCountedPtr<RlsChannel> rls_channel,
               std::unique_ptr<BackOff> backoff_state,
               grpc_lookup_v1_RouteLookupRequest_Reason reason,
               grpc_event_engine::experimental::Slice stale_header_data);
    ~RlsRequest() override;

    // Shuts down the request.  If the request is still in flight, it is
    // cancelled, in which case no response will be added to the cache.
    void Orphan() override;

   private:
    // Callback to be invoked to start the call.
    static void StartCall(void* arg, grpc_error_handle error);

    // Helper for StartCall() that runs within the WorkSerializer.
    void StartCallLocked();

    // Callback to be invoked when the call is completed.
    static void OnRlsCallComplete(void* arg, grpc_error_handle error);

    // Call completion callback running on LB policy WorkSerializer.
    void OnRlsCallCompleteLocked(grpc_error_handle error);

    // Builds the serialized request message.
    // NOTE(review): presumably stored in send_message_; confirm ownership.
    grpc_byte_buffer* MakeRequestProto();
    // Converts the received response into a ResponseInfo.
    // NOTE(review): presumably reads recv_message_; confirm.
    ResponseInfo ParseResponseProto();

    // RLS policy that issued this request (a strong ref).
    RefCountedPtr<RlsLb> lb_policy_;
    // Key being looked up.
    RlsLb::RequestKey key_;
    // Channel this request uses.  Held by ref so that an in-flight request
    // keeps using its channel even if the policy swaps RlsLb::rls_channel_.
    RefCountedPtr<RlsChannel> rls_channel_;
    // Backoff state handed over by the creator; transferred to the data
    // cache if the request fails (see constructor comment).
    std::unique_ptr<BackOff> backoff_state_;
    // Reason and stale header data supplied at construction.
    grpc_lookup_v1_RouteLookupRequest_Reason reason_;
    grpc_event_engine::experimental::Slice stale_header_data_;

    // RLS call state.
    Timestamp deadline_;
    grpc_closure call_start_cb_;
    grpc_closure call_complete_cb_;
    grpc_call* call_ = nullptr;
    grpc_byte_buffer* send_message_ = nullptr;
    grpc_metadata_array recv_initial_metadata_;
    grpc_byte_buffer* recv_message_ = nullptr;
    grpc_metadata_array recv_trailing_metadata_;
    // Final status code and details received for the call.
    grpc_status_code status_recv_;
    grpc_slice status_details_recv_;
  };
767 
768   void ShutdownLocked() override;
769 
770   // Returns a new picker to the channel to trigger reprocessing of
771   // pending picks.  Schedules the actual picker update on the ExecCtx
772   // to be run later, so it's safe to invoke this while holding the lock.
773   void UpdatePickerAsync();
774   // Hops into work serializer and calls UpdatePickerLocked().
775   static void UpdatePickerCallback(void* arg, grpc_error_handle error);
776   // Updates the picker in the work serializer.
777   void UpdatePickerLocked() ABSL_LOCKS_EXCLUDED(&mu_);
778 
779   template <typename HandleType>
780   void MaybeExportPickCount(HandleType handle, absl::string_view target,
781                             const PickResult& pick_result);
782 
783   const std::string instance_uuid_;
784 
785   // Mutex to guard LB policy state that is accessed by the picker.
786   Mutex mu_;
787   bool is_shutdown_ ABSL_GUARDED_BY(mu_) = false;
788   bool update_in_progress_ = false;
789   Cache cache_ ABSL_GUARDED_BY(mu_);
790   // Maps an RLS request key to an RlsRequest object that represents a pending
791   // RLS request.
792   std::unordered_map<RequestKey, OrphanablePtr<RlsRequest>,
793                      absl::Hash<RequestKey>>
794       request_map_ ABSL_GUARDED_BY(mu_);
795   // The channel on which RLS requests are sent.
796   // Note that this channel may be swapped out when the RLS policy gets
797   // an update.  However, when that happens, any existing entries in
798   // request_map_ will continue to use the previous channel.
799   OrphanablePtr<RlsChannel> rls_channel_ ABSL_GUARDED_BY(mu_);
800 
801   // Accessed only from within WorkSerializer.
802   absl::StatusOr<std::shared_ptr<EndpointAddressesIterator>> addresses_;
803   ChannelArgs channel_args_;
804   RefCountedPtr<RlsLbConfig> config_;
805   RefCountedPtr<ChildPolicyWrapper> default_child_policy_;
806   std::map<std::string /*target*/, ChildPolicyWrapper*> child_policy_map_;
807 
808   // Must be after mu_, so that it is destroyed before mu_.
809   std::unique_ptr<RegisteredMetricCallback> registered_metric_callback_;
810 };
811 
812 //
813 // RlsLb::ChildPolicyWrapper
814 //
815 
ChildPolicyWrapper(RefCountedPtr<RlsLb> lb_policy,std::string target)816 RlsLb::ChildPolicyWrapper::ChildPolicyWrapper(RefCountedPtr<RlsLb> lb_policy,
817                                               std::string target)
818     : DualRefCounted<ChildPolicyWrapper>(
819           GRPC_TRACE_FLAG_ENABLED(rls_lb) ? "ChildPolicyWrapper" : nullptr),
820       lb_policy_(std::move(lb_policy)),
821       target_(std::move(target)),
822       picker_(MakeRefCounted<QueuePicker>(nullptr)) {
823   lb_policy_->child_policy_map_.emplace(target_, this);
824 }
825 
Orphaned()826 void RlsLb::ChildPolicyWrapper::Orphaned() {
827   GRPC_TRACE_LOG(rls_lb, INFO)
828       << "[rlslb " << lb_policy_.get() << "] ChildPolicyWrapper=" << this
829       << " [" << target_ << "]: shutdown";
830   is_shutdown_ = true;
831   lb_policy_->child_policy_map_.erase(target_);
832   if (child_policy_ != nullptr) {
833     grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(),
834                                      lb_policy_->interested_parties());
835     child_policy_.reset();
836   }
837   picker_.reset();
838 }
839 
InsertOrUpdateChildPolicyField(const std::string & field,const std::string & value,const Json & config,ValidationErrors * errors)840 absl::optional<Json> InsertOrUpdateChildPolicyField(const std::string& field,
841                                                     const std::string& value,
842                                                     const Json& config,
843                                                     ValidationErrors* errors) {
844   if (config.type() != Json::Type::kArray) {
845     errors->AddError("is not an array");
846     return absl::nullopt;
847   }
848   const size_t original_num_errors = errors->size();
849   Json::Array array;
850   for (size_t i = 0; i < config.array().size(); ++i) {
851     const Json& child_json = config.array()[i];
852     ValidationErrors::ScopedField json_field(errors, absl::StrCat("[", i, "]"));
853     if (child_json.type() != Json::Type::kObject) {
854       errors->AddError("is not an object");
855     } else {
856       const Json::Object& child = child_json.object();
857       if (child.size() != 1) {
858         errors->AddError("child policy object contains more than one field");
859       } else {
860         const std::string& child_name = child.begin()->first;
861         ValidationErrors::ScopedField json_field(
862             errors, absl::StrCat("[\"", child_name, "\"]"));
863         const Json& child_config_json = child.begin()->second;
864         if (child_config_json.type() != Json::Type::kObject) {
865           errors->AddError("child policy config is not an object");
866         } else {
867           Json::Object child_config = child_config_json.object();
868           child_config[field] = Json::FromString(value);
869           array.emplace_back(Json::FromObject(
870               {{child_name, Json::FromObject(std::move(child_config))}}));
871         }
872       }
873     }
874   }
875   if (errors->size() != original_num_errors) return absl::nullopt;
876   return Json::FromArray(std::move(array));
877 }
878 
StartUpdate(OrphanablePtr<ChildPolicyHandler> * child_policy_to_delete)879 void RlsLb::ChildPolicyWrapper::StartUpdate(
880     OrphanablePtr<ChildPolicyHandler>* child_policy_to_delete) {
881   ValidationErrors errors;
882   auto child_policy_config = InsertOrUpdateChildPolicyField(
883       lb_policy_->config_->child_policy_config_target_field_name(), target_,
884       lb_policy_->config_->child_policy_config(), &errors);
885   CHECK(child_policy_config.has_value());
886   GRPC_TRACE_LOG(rls_lb, INFO)
887       << "[rlslb " << lb_policy_.get() << "] ChildPolicyWrapper=" << this
888       << " [" << target_
889       << "]: validating update, config: " << JsonDump(*child_policy_config);
890   auto config =
891       CoreConfiguration::Get().lb_policy_registry().ParseLoadBalancingConfig(
892           *child_policy_config);
893   // Returned RLS target fails the validation.
894   if (!config.ok()) {
895     GRPC_TRACE_LOG(rls_lb, INFO)
896         << "[rlslb " << lb_policy_.get() << "] ChildPolicyWrapper=" << this
897         << " [" << target_ << "]: config failed to parse: " << config.status();
898     pending_config_.reset();
899     picker_ = MakeRefCounted<TransientFailurePicker>(
900         absl::UnavailableError(config.status().message()));
901     *child_policy_to_delete = std::move(child_policy_);
902   } else {
903     pending_config_ = std::move(*config);
904   }
905 }
906 
MaybeFinishUpdate()907 absl::Status RlsLb::ChildPolicyWrapper::MaybeFinishUpdate() {
908   // If pending_config_ is not set, that means StartUpdate() failed, so
909   // there's nothing to do here.
910   if (pending_config_ == nullptr) return absl::OkStatus();
911   // If child policy doesn't yet exist, create it.
912   if (child_policy_ == nullptr) {
913     Args create_args;
914     create_args.work_serializer = lb_policy_->work_serializer();
915     create_args.channel_control_helper = std::make_unique<ChildPolicyHelper>(
916         WeakRef(DEBUG_LOCATION, "ChildPolicyHelper"));
917     create_args.args = lb_policy_->channel_args_;
918     child_policy_ = MakeOrphanable<ChildPolicyHandler>(std::move(create_args),
919                                                        &rls_lb_trace);
920     GRPC_TRACE_LOG(rls_lb, INFO)
921         << "[rlslb " << lb_policy_.get() << "] ChildPolicyWrapper=" << this
922         << " [" << target_ << "], created new child policy handler "
923         << child_policy_.get();
924     grpc_pollset_set_add_pollset_set(child_policy_->interested_parties(),
925                                      lb_policy_->interested_parties());
926   }
927   // Send the child the updated config.
928   GRPC_TRACE_LOG(rls_lb, INFO)
929       << "[rlslb " << lb_policy_.get() << "] ChildPolicyWrapper=" << this
930       << " [" << target_ << "], updating child policy handler "
931       << child_policy_.get();
932   UpdateArgs update_args;
933   update_args.config = std::move(pending_config_);
934   update_args.addresses = lb_policy_->addresses_;
935   update_args.args = lb_policy_->channel_args_;
936   return child_policy_->UpdateLocked(std::move(update_args));
937 }
938 
939 //
940 // RlsLb::ChildPolicyWrapper::ChildPolicyHelper
941 //
942 
UpdateState(grpc_connectivity_state state,const absl::Status & status,RefCountedPtr<SubchannelPicker> picker)943 void RlsLb::ChildPolicyWrapper::ChildPolicyHelper::UpdateState(
944     grpc_connectivity_state state, const absl::Status& status,
945     RefCountedPtr<SubchannelPicker> picker) {
946   GRPC_TRACE_LOG(rls_lb, INFO)
947       << "[rlslb " << wrapper_->lb_policy_.get()
948       << "] ChildPolicyWrapper=" << wrapper_.get() << " [" << wrapper_->target_
949       << "] ChildPolicyHelper=" << this
950       << ": UpdateState(state=" << ConnectivityStateName(state)
951       << ", status=" << status << ", picker=" << picker.get() << ")";
952   if (wrapper_->is_shutdown_) return;
953   {
954     MutexLock lock(&wrapper_->lb_policy_->mu_);
955     // TODO(roth): It looks like this ignores subsequent TF updates that
956     // might change the status used to fail picks, which seems wrong.
957     if (wrapper_->connectivity_state_ == GRPC_CHANNEL_TRANSIENT_FAILURE &&
958         state != GRPC_CHANNEL_READY) {
959       return;
960     }
961     wrapper_->connectivity_state_ = state;
962     DCHECK(picker != nullptr);
963     if (picker != nullptr) {
964       // We want to unref the picker after we release the lock.
965       wrapper_->picker_.swap(picker);
966     }
967   }
968   wrapper_->lb_policy_->UpdatePickerLocked();
969 }
970 
971 //
972 // RlsLb::Picker
973 //
974 
975 // Builds the key to be used for a request based on path and initial_metadata.
BuildKeyMap(const RlsLbConfig::KeyBuilderMap & key_builder_map,absl::string_view path,absl::string_view host,const LoadBalancingPolicy::MetadataInterface * initial_metadata)976 std::map<std::string, std::string> BuildKeyMap(
977     const RlsLbConfig::KeyBuilderMap& key_builder_map, absl::string_view path,
978     absl::string_view host,
979     const LoadBalancingPolicy::MetadataInterface* initial_metadata) {
980   size_t last_slash_pos = path.npos;  // May need this a few times, so cache it.
981   // Find key builder for this path.
982   auto it = key_builder_map.find(std::string(path));
983   if (it == key_builder_map.end()) {
984     // Didn't find exact match, try method wildcard.
985     last_slash_pos = path.rfind('/');
986     DCHECK(last_slash_pos != path.npos);
987     if (GPR_UNLIKELY(last_slash_pos == path.npos)) return {};
988     std::string service(path.substr(0, last_slash_pos + 1));
989     it = key_builder_map.find(service);
990     if (it == key_builder_map.end()) return {};
991   }
992   const RlsLbConfig::KeyBuilder* key_builder = &it->second;
993   // Construct key map using key builder.
994   std::map<std::string, std::string> key_map;
995   // Add header keys.
996   for (const auto& p : key_builder->header_keys) {
997     const std::string& key = p.first;
998     const std::vector<std::string>& header_names = p.second;
999     for (const std::string& header_name : header_names) {
1000       std::string buffer;
1001       absl::optional<absl::string_view> value =
1002           initial_metadata->Lookup(header_name, &buffer);
1003       if (value.has_value()) {
1004         key_map[key] = std::string(*value);
1005         break;
1006       }
1007     }
1008   }
1009   // Add constant keys.
1010   key_map.insert(key_builder->constant_keys.begin(),
1011                  key_builder->constant_keys.end());
1012   // Add host key.
1013   if (!key_builder->host_key.empty()) {
1014     key_map[key_builder->host_key] = std::string(host);
1015   }
1016   // Add service key.
1017   if (!key_builder->service_key.empty()) {
1018     if (last_slash_pos == path.npos) {
1019       last_slash_pos = path.rfind('/');
1020       DCHECK(last_slash_pos != path.npos);
1021       if (GPR_UNLIKELY(last_slash_pos == path.npos)) return {};
1022     }
1023     key_map[key_builder->service_key] =
1024         std::string(path.substr(1, last_slash_pos - 1));
1025   }
1026   // Add method key.
1027   if (!key_builder->method_key.empty()) {
1028     if (last_slash_pos == path.npos) {
1029       last_slash_pos = path.rfind('/');
1030       DCHECK(last_slash_pos != path.npos);
1031       if (GPR_UNLIKELY(last_slash_pos == path.npos)) return {};
1032     }
1033     key_map[key_builder->method_key] =
1034         std::string(path.substr(last_slash_pos + 1));
1035   }
1036   return key_map;
1037 }
1038 
Picker(RefCountedPtr<RlsLb> lb_policy)1039 RlsLb::Picker::Picker(RefCountedPtr<RlsLb> lb_policy)
1040     : lb_policy_(std::move(lb_policy)), config_(lb_policy_->config_) {
1041   if (lb_policy_->default_child_policy_ != nullptr) {
1042     default_child_policy_ =
1043         lb_policy_->default_child_policy_->Ref(DEBUG_LOCATION, "Picker");
1044   }
1045 }
1046 
Pick(PickArgs args)1047 LoadBalancingPolicy::PickResult RlsLb::Picker::Pick(PickArgs args) {
1048   // Construct key for request.
1049   RequestKey key = {
1050       BuildKeyMap(config_->key_builder_map(), args.path,
1051                   lb_policy_->channel_control_helper()->GetAuthority(),
1052                   args.initial_metadata)};
1053   GRPC_TRACE_LOG(rls_lb, INFO)
1054       << "[rlslb " << lb_policy_.get() << "] picker=" << this
1055       << ": request keys: " << key.ToString();
1056   Timestamp now = Timestamp::Now();
1057   MutexLock lock(&lb_policy_->mu_);
1058   if (lb_policy_->is_shutdown_) {
1059     return PickResult::Fail(
1060         absl::UnavailableError("LB policy already shut down"));
1061   }
1062   // Check if there's a cache entry.
1063   Cache::Entry* entry = lb_policy_->cache_.Find(key);
1064   // If there is no cache entry, or if the cache entry is not in backoff
1065   // and has a stale time in the past, and there is not already a
1066   // pending RLS request for this key, then try to start a new RLS request.
1067   if ((entry == nullptr ||
1068        (entry->stale_time() < now && entry->backoff_time() < now)) &&
1069       lb_policy_->request_map_.find(key) == lb_policy_->request_map_.end()) {
1070     // Check if requests are being throttled.
1071     if (lb_policy_->rls_channel_->ShouldThrottle()) {
1072       // Request is throttled.
1073       // If there is no non-expired data in the cache, then we use the
1074       // default target if set, or else we fail the pick.
1075       if (entry == nullptr || entry->data_expiration_time() < now) {
1076         return PickFromDefaultTargetOrFail(
1077             "RLS call throttled", args,
1078             absl::UnavailableError("RLS request throttled"));
1079       }
1080     }
1081     // Start the RLS call.
1082     lb_policy_->rls_channel_->StartRlsCall(
1083         key, (entry == nullptr || entry->data_expiration_time() < now) ? nullptr
1084                                                                        : entry);
1085   }
1086   // If the cache entry exists, see if it has usable data.
1087   if (entry != nullptr) {
1088     // If the entry has non-expired data, use it.
1089     if (entry->data_expiration_time() >= now) {
1090       GRPC_TRACE_LOG(rls_lb, INFO)
1091           << "[rlslb " << lb_policy_.get() << "] picker=" << this
1092           << ": using cache entry " << entry;
1093       return entry->Pick(args);
1094     }
1095     // If the entry is in backoff, then use the default target if set,
1096     // or else fail the pick.
1097     if (entry->backoff_time() >= now) {
1098       return PickFromDefaultTargetOrFail(
1099           "RLS call in backoff", args,
1100           absl::UnavailableError(absl::StrCat("RLS request failed: ",
1101                                               entry->status().ToString())));
1102     }
1103   }
1104   // RLS call pending.  Queue the pick.
1105   GRPC_TRACE_LOG(rls_lb, INFO)
1106       << "[rlslb " << lb_policy_.get() << "] picker=" << this
1107       << ": RLS request pending; queuing pick";
1108   return PickResult::Queue();
1109 }
1110 
PickFromDefaultTargetOrFail(const char * reason,PickArgs args,absl::Status status)1111 LoadBalancingPolicy::PickResult RlsLb::Picker::PickFromDefaultTargetOrFail(
1112     const char* reason, PickArgs args, absl::Status status) {
1113   if (default_child_policy_ != nullptr) {
1114     GRPC_TRACE_LOG(rls_lb, INFO)
1115         << "[rlslb " << lb_policy_.get() << "] picker=" << this << ": "
1116         << reason << "; using default target";
1117     auto pick_result = default_child_policy_->Pick(args);
1118     lb_policy_->MaybeExportPickCount(kMetricDefaultTargetPicks,
1119                                      config_->default_target(), pick_result);
1120     return pick_result;
1121   }
1122   GRPC_TRACE_LOG(rls_lb, INFO)
1123       << "[rlslb " << lb_policy_.get() << "] picker=" << this << ": " << reason
1124       << "; failing pick";
1125   auto& stats_plugins =
1126       lb_policy_->channel_control_helper()->GetStatsPluginGroup();
1127   stats_plugins.AddCounter(kMetricFailedPicks, 1,
1128                            {lb_policy_->channel_control_helper()->GetTarget(),
1129                             config_->lookup_service()},
1130                            {});
1131   return PickResult::Fail(std::move(status));
1132 }
1133 
1134 //
1135 // RlsLb::Cache::Entry::BackoffTimer
1136 //
1137 
BackoffTimer(RefCountedPtr<Entry> entry,Duration delay)1138 RlsLb::Cache::Entry::BackoffTimer::BackoffTimer(RefCountedPtr<Entry> entry,
1139                                                 Duration delay)
1140     : entry_(std::move(entry)) {
1141   backoff_timer_task_handle_ =
1142       entry_->lb_policy_->channel_control_helper()->GetEventEngine()->RunAfter(
1143           delay, [self = Ref(DEBUG_LOCATION, "BackoffTimer")]() mutable {
1144             ApplicationCallbackExecCtx callback_exec_ctx;
1145             ExecCtx exec_ctx;
1146             auto self_ptr = self.get();
1147             self_ptr->entry_->lb_policy_->work_serializer()->Run(
1148                 [self = std::move(self)]() { self->OnBackoffTimerLocked(); },
1149                 DEBUG_LOCATION);
1150           });
1151 }
1152 
Orphan()1153 void RlsLb::Cache::Entry::BackoffTimer::Orphan() {
1154   if (backoff_timer_task_handle_.has_value() &&
1155       entry_->lb_policy_->channel_control_helper()->GetEventEngine()->Cancel(
1156           *backoff_timer_task_handle_)) {
1157     GRPC_TRACE_LOG(rls_lb, INFO)
1158         << "[rlslb " << entry_->lb_policy_.get()
1159         << "] cache entry=" << entry_.get() << " "
1160         << (entry_->is_shutdown_ ? "(shut down)"
1161                                  : entry_->lru_iterator_->ToString())
1162         << ", backoff timer canceled";
1163   }
1164   backoff_timer_task_handle_.reset();
1165   Unref(DEBUG_LOCATION, "Orphan");
1166 }
1167 
OnBackoffTimerLocked()1168 void RlsLb::Cache::Entry::BackoffTimer::OnBackoffTimerLocked() {
1169   {
1170     MutexLock lock(&entry_->lb_policy_->mu_);
1171     GRPC_TRACE_LOG(rls_lb, INFO)
1172         << "[rlslb " << entry_->lb_policy_.get()
1173         << "] cache entry=" << entry_.get() << " "
1174         << (entry_->is_shutdown_ ? "(shut down)"
1175                                  : entry_->lru_iterator_->ToString())
1176         << ", backoff timer fired";
1177     // Skip the update if Orphaned
1178     if (!backoff_timer_task_handle_.has_value()) return;
1179     backoff_timer_task_handle_.reset();
1180   }
1181   // The pick was in backoff state and there could be a pick queued if
1182   // wait_for_ready is true. We'll update the picker for that case.
1183   entry_->lb_policy_->UpdatePickerLocked();
1184 }
1185 
1186 //
1187 // RlsLb::Cache::Entry
1188 //
1189 
MakeCacheEntryBackoff()1190 std::unique_ptr<BackOff> MakeCacheEntryBackoff() {
1191   return std::make_unique<BackOff>(
1192       BackOff::Options()
1193           .set_initial_backoff(kCacheBackoffInitial)
1194           .set_multiplier(kCacheBackoffMultiplier)
1195           .set_jitter(kCacheBackoffJitter)
1196           .set_max_backoff(kCacheBackoffMax));
1197 }
1198 
Entry(RefCountedPtr<RlsLb> lb_policy,const RequestKey & key)1199 RlsLb::Cache::Entry::Entry(RefCountedPtr<RlsLb> lb_policy,
1200                            const RequestKey& key)
1201     : InternallyRefCounted<Entry>(GRPC_TRACE_FLAG_ENABLED(rls_lb) ? "CacheEntry"
1202                                                                   : nullptr),
1203       lb_policy_(std::move(lb_policy)),
1204       backoff_state_(MakeCacheEntryBackoff()),
1205       min_expiration_time_(Timestamp::Now() + kMinExpirationTime),
1206       lru_iterator_(lb_policy_->cache_.lru_list_.insert(
1207           lb_policy_->cache_.lru_list_.end(), key)) {}
1208 
Orphan()1209 void RlsLb::Cache::Entry::Orphan() {
1210   // We should be holding RlsLB::mu_.
1211   GRPC_TRACE_LOG(rls_lb, INFO)
1212       << "[rlslb " << lb_policy_.get() << "] cache entry=" << this << " "
1213       << lru_iterator_->ToString() << ": cache entry evicted";
1214   is_shutdown_ = true;
1215   lb_policy_->cache_.lru_list_.erase(lru_iterator_);
1216   lru_iterator_ = lb_policy_->cache_.lru_list_.end();  // Just in case.
1217   CHECK(child_policy_wrappers_.empty());
1218   backoff_state_.reset();
1219   if (backoff_timer_ != nullptr) {
1220     backoff_timer_.reset();
1221     lb_policy_->UpdatePickerAsync();
1222   }
1223   Unref(DEBUG_LOCATION, "Orphan");
1224 }
1225 
Size() const1226 size_t RlsLb::Cache::Entry::Size() const {
1227   // lru_iterator_ is not valid once we're shut down.
1228   CHECK(!is_shutdown_);
1229   return lb_policy_->cache_.EntrySizeForKey(*lru_iterator_);
1230 }
1231 
Pick(PickArgs args)1232 LoadBalancingPolicy::PickResult RlsLb::Cache::Entry::Pick(PickArgs args) {
1233   size_t i = 0;
1234   ChildPolicyWrapper* child_policy_wrapper = nullptr;
1235   // Skip targets before the last one that are in state TRANSIENT_FAILURE.
1236   for (; i < child_policy_wrappers_.size(); ++i) {
1237     child_policy_wrapper = child_policy_wrappers_[i].get();
1238     if (child_policy_wrapper->connectivity_state() ==
1239             GRPC_CHANNEL_TRANSIENT_FAILURE &&
1240         i < child_policy_wrappers_.size() - 1) {
1241       GRPC_TRACE_LOG(rls_lb, INFO)
1242           << "[rlslb " << lb_policy_.get() << "] cache entry=" << this << " "
1243           << lru_iterator_->ToString() << ": target "
1244           << child_policy_wrapper->target() << " (" << i << " of "
1245           << child_policy_wrappers_.size()
1246           << ") in state TRANSIENT_FAILURE; skipping";
1247       continue;
1248     }
1249     break;
1250   }
1251   // Child policy not in TRANSIENT_FAILURE or is the last target in
1252   // the list, so delegate.
1253   GRPC_TRACE_LOG(rls_lb, INFO)
1254       << "[rlslb " << lb_policy_.get() << "] cache entry=" << this << " "
1255       << lru_iterator_->ToString() << ": target "
1256       << child_policy_wrapper->target() << " (" << i << " of "
1257       << child_policy_wrappers_.size() << ") in state "
1258       << ConnectivityStateName(child_policy_wrapper->connectivity_state())
1259       << "; delegating";
1260   auto pick_result = child_policy_wrapper->Pick(args);
1261   lb_policy_->MaybeExportPickCount(kMetricTargetPicks,
1262                                    child_policy_wrapper->target(), pick_result);
1263   // Add header data.
1264   if (!header_data_.empty()) {
1265     auto* complete_pick =
1266         absl::get_if<PickResult::Complete>(&pick_result.result);
1267     if (complete_pick != nullptr) {
1268       complete_pick->metadata_mutations.Set(kRlsHeaderKey, header_data_.Ref());
1269     }
1270   }
1271   return pick_result;
1272 }
1273 
ResetBackoff()1274 void RlsLb::Cache::Entry::ResetBackoff() {
1275   backoff_time_ = Timestamp::InfPast();
1276   backoff_timer_.reset();
1277 }
1278 
ShouldRemove() const1279 bool RlsLb::Cache::Entry::ShouldRemove() const {
1280   Timestamp now = Timestamp::Now();
1281   return data_expiration_time_ < now && backoff_expiration_time_ < now;
1282 }
1283 
CanEvict() const1284 bool RlsLb::Cache::Entry::CanEvict() const {
1285   Timestamp now = Timestamp::Now();
1286   return min_expiration_time_ < now;
1287 }
1288 
MarkUsed()1289 void RlsLb::Cache::Entry::MarkUsed() {
1290   auto& lru_list = lb_policy_->cache_.lru_list_;
1291   auto new_it = lru_list.insert(lru_list.end(), *lru_iterator_);
1292   lru_list.erase(lru_iterator_);
1293   lru_iterator_ = new_it;
1294 }
1295 
1296 std::vector<RlsLb::ChildPolicyWrapper*>
OnRlsResponseLocked(ResponseInfo response,std::unique_ptr<BackOff> backoff_state,OrphanablePtr<ChildPolicyHandler> * child_policy_to_delete)1297 RlsLb::Cache::Entry::OnRlsResponseLocked(
1298     ResponseInfo response, std::unique_ptr<BackOff> backoff_state,
1299     OrphanablePtr<ChildPolicyHandler>* child_policy_to_delete) {
1300   // Move the entry to the end of the LRU list.
1301   MarkUsed();
1302   // If the request failed, store the failed status and update the
1303   // backoff state.
1304   if (!response.status.ok()) {
1305     status_ = response.status;
1306     if (backoff_state != nullptr) {
1307       backoff_state_ = std::move(backoff_state);
1308     } else {
1309       backoff_state_ = MakeCacheEntryBackoff();
1310     }
1311     const Duration delay = backoff_state_->NextAttemptDelay();
1312     const Timestamp now = Timestamp::Now();
1313     backoff_time_ = now + delay;
1314     backoff_expiration_time_ = now + delay * 2;
1315     backoff_timer_ = MakeOrphanable<BackoffTimer>(
1316         Ref(DEBUG_LOCATION, "BackoffTimer"), delay);
1317     lb_policy_->UpdatePickerAsync();
1318     return {};
1319   }
1320   // Request succeeded, so store the result.
1321   header_data_ = std::move(response.header_data);
1322   Timestamp now = Timestamp::Now();
1323   data_expiration_time_ = now + lb_policy_->config_->max_age();
1324   stale_time_ = now + lb_policy_->config_->stale_age();
1325   status_ = absl::OkStatus();
1326   backoff_state_.reset();
1327   backoff_time_ = Timestamp::InfPast();
1328   backoff_expiration_time_ = Timestamp::InfPast();
1329   // Check if we need to update this list of targets.
1330   bool targets_changed = [&]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_) {
1331     if (child_policy_wrappers_.size() != response.targets.size()) return true;
1332     for (size_t i = 0; i < response.targets.size(); ++i) {
1333       if (child_policy_wrappers_[i]->target() != response.targets[i]) {
1334         return true;
1335       }
1336     }
1337     return false;
1338   }();
1339   if (!targets_changed) {
1340     // Targets didn't change, so we're not updating the list of child
1341     // policies.  Return a new picker so that any queued requests can be
1342     // re-processed.
1343     lb_policy_->UpdatePickerAsync();
1344     return {};
1345   }
1346   // Target list changed, so update it.
1347   std::set<absl::string_view> old_targets;
1348   for (RefCountedPtr<ChildPolicyWrapper>& child_policy_wrapper :
1349        child_policy_wrappers_) {
1350     old_targets.emplace(child_policy_wrapper->target());
1351   }
1352   bool update_picker = false;
1353   std::vector<ChildPolicyWrapper*> child_policies_to_finish_update;
1354   std::vector<RefCountedPtr<ChildPolicyWrapper>> new_child_policy_wrappers;
1355   new_child_policy_wrappers.reserve(response.targets.size());
1356   for (std::string& target : response.targets) {
1357     auto it = lb_policy_->child_policy_map_.find(target);
1358     if (it == lb_policy_->child_policy_map_.end()) {
1359       auto new_child = MakeRefCounted<ChildPolicyWrapper>(
1360           lb_policy_.Ref(DEBUG_LOCATION, "ChildPolicyWrapper"), target);
1361       new_child->StartUpdate(child_policy_to_delete);
1362       child_policies_to_finish_update.push_back(new_child.get());
1363       new_child_policy_wrappers.emplace_back(std::move(new_child));
1364     } else {
1365       new_child_policy_wrappers.emplace_back(
1366           it->second->Ref(DEBUG_LOCATION, "CacheEntry"));
1367       // If the target already existed but was not previously used for
1368       // this key, then we'll need to update the picker, since we
1369       // didn't actually create a new child policy, which would have
1370       // triggered an RLS picker update when it returned its first picker.
1371       if (old_targets.find(target) == old_targets.end()) {
1372         update_picker = true;
1373       }
1374     }
1375   }
1376   child_policy_wrappers_ = std::move(new_child_policy_wrappers);
1377   if (update_picker) {
1378     lb_policy_->UpdatePickerAsync();
1379   }
1380   return child_policies_to_finish_update;
1381 }
1382 
1383 //
1384 // RlsLb::Cache
1385 //
1386 
Cache(RlsLb * lb_policy)1387 RlsLb::Cache::Cache(RlsLb* lb_policy) : lb_policy_(lb_policy) {
1388   StartCleanupTimer();
1389 }
1390 
Find(const RequestKey & key)1391 RlsLb::Cache::Entry* RlsLb::Cache::Find(const RequestKey& key) {
1392   auto it = map_.find(key);
1393   if (it == map_.end()) return nullptr;
1394   it->second->MarkUsed();
1395   return it->second.get();
1396 }
1397 
FindOrInsert(const RequestKey & key,std::vector<RefCountedPtr<ChildPolicyWrapper>> * child_policy_wrappers_to_delete)1398 RlsLb::Cache::Entry* RlsLb::Cache::FindOrInsert(
1399     const RequestKey& key, std::vector<RefCountedPtr<ChildPolicyWrapper>>*
1400                                child_policy_wrappers_to_delete) {
1401   auto it = map_.find(key);
1402   // If not found, create new entry.
1403   if (it == map_.end()) {
1404     size_t entry_size = EntrySizeForKey(key);
1405     MaybeShrinkSize(size_limit_ - std::min(size_limit_, entry_size),
1406                     child_policy_wrappers_to_delete);
1407     Entry* entry = new Entry(
1408         lb_policy_->RefAsSubclass<RlsLb>(DEBUG_LOCATION, "CacheEntry"), key);
1409     map_.emplace(key, OrphanablePtr<Entry>(entry));
1410     size_ += entry_size;
1411     GRPC_TRACE_LOG(rls_lb, INFO)
1412         << "[rlslb " << lb_policy_ << "] key=" << key.ToString()
1413         << ": cache entry added, entry=" << entry;
1414     return entry;
1415   }
1416   // Entry found, so use it.
1417   GRPC_TRACE_LOG(rls_lb, INFO)
1418       << "[rlslb " << lb_policy_ << "] key=" << key.ToString()
1419       << ": found cache entry " << it->second.get();
1420   it->second->MarkUsed();
1421   return it->second.get();
1422 }
1423 
Resize(size_t bytes,std::vector<RefCountedPtr<ChildPolicyWrapper>> * child_policy_wrappers_to_delete)1424 void RlsLb::Cache::Resize(size_t bytes,
1425                           std::vector<RefCountedPtr<ChildPolicyWrapper>>*
1426                               child_policy_wrappers_to_delete) {
1427   GRPC_TRACE_LOG(rls_lb, INFO)
1428       << "[rlslb " << lb_policy_ << "] resizing cache to " << bytes << " bytes";
1429   size_limit_ = bytes;
1430   MaybeShrinkSize(size_limit_, child_policy_wrappers_to_delete);
1431 }
1432 
ResetAllBackoff()1433 void RlsLb::Cache::ResetAllBackoff() {
1434   for (auto& p : map_) {
1435     p.second->ResetBackoff();
1436   }
1437   lb_policy_->UpdatePickerAsync();
1438 }
1439 
Shutdown()1440 std::vector<RefCountedPtr<RlsLb::ChildPolicyWrapper>> RlsLb::Cache::Shutdown() {
1441   std::vector<RefCountedPtr<ChildPolicyWrapper>>
1442       child_policy_wrappers_to_delete;
1443   for (auto& entry : map_) {
1444     entry.second->TakeChildPolicyWrappers(&child_policy_wrappers_to_delete);
1445   }
1446   map_.clear();
1447   lru_list_.clear();
1448   if (cleanup_timer_handle_.has_value() &&
1449       lb_policy_->channel_control_helper()->GetEventEngine()->Cancel(
1450           *cleanup_timer_handle_)) {
1451     GRPC_TRACE_LOG(rls_lb, INFO)
1452         << "[rlslb " << lb_policy_ << "] cache cleanup timer canceled";
1453   }
1454   cleanup_timer_handle_.reset();
1455   return child_policy_wrappers_to_delete;
1456 }
1457 
ReportMetricsLocked(CallbackMetricReporter & reporter)1458 void RlsLb::Cache::ReportMetricsLocked(CallbackMetricReporter& reporter) {
1459   reporter.Report(
1460       kMetricCacheSize, size_,
1461       {lb_policy_->channel_control_helper()->GetTarget(),
1462        lb_policy_->config_->lookup_service(), lb_policy_->instance_uuid_},
1463       {});
1464   reporter.Report(
1465       kMetricCacheEntries, map_.size(),
1466       {lb_policy_->channel_control_helper()->GetTarget(),
1467        lb_policy_->config_->lookup_service(), lb_policy_->instance_uuid_},
1468       {});
1469 }
1470 
StartCleanupTimer()1471 void RlsLb::Cache::StartCleanupTimer() {
1472   cleanup_timer_handle_ =
1473       lb_policy_->channel_control_helper()->GetEventEngine()->RunAfter(
1474           kCacheCleanupTimerInterval,
1475           [this, lb_policy = lb_policy_->Ref(DEBUG_LOCATION,
1476                                              "CacheCleanupTimer")]() mutable {
1477             ApplicationCallbackExecCtx callback_exec_ctx;
1478             ExecCtx exec_ctx;
1479             lb_policy_->work_serializer()->Run(
1480                 [this, lb_policy = std::move(lb_policy)]() {
1481                   // The lb_policy ref is held until the callback completes
1482                   OnCleanupTimer();
1483                 },
1484                 DEBUG_LOCATION);
1485           });
1486 }
1487 
OnCleanupTimer()1488 void RlsLb::Cache::OnCleanupTimer() {
1489   GRPC_TRACE_LOG(rls_lb, INFO)
1490       << "[rlslb " << lb_policy_ << "] cache cleanup timer fired";
1491   std::vector<RefCountedPtr<ChildPolicyWrapper>>
1492       child_policy_wrappers_to_delete;
1493   MutexLock lock(&lb_policy_->mu_);
1494   if (!cleanup_timer_handle_.has_value()) return;
1495   if (lb_policy_->is_shutdown_) return;
1496   for (auto it = map_.begin(); it != map_.end();) {
1497     if (GPR_UNLIKELY(it->second->ShouldRemove() && it->second->CanEvict())) {
1498       size_ -= it->second->Size();
1499       it->second->TakeChildPolicyWrappers(&child_policy_wrappers_to_delete);
1500       it = map_.erase(it);
1501     } else {
1502       ++it;
1503     }
1504   }
1505   StartCleanupTimer();
1506 }
1507 
EntrySizeForKey(const RequestKey & key)1508 size_t RlsLb::Cache::EntrySizeForKey(const RequestKey& key) {
1509   // Key is stored twice, once in LRU list and again in the cache map.
1510   return (key.Size() * 2) + sizeof(Entry);
1511 }
1512 
MaybeShrinkSize(size_t bytes,std::vector<RefCountedPtr<ChildPolicyWrapper>> * child_policy_wrappers_to_delete)1513 void RlsLb::Cache::MaybeShrinkSize(
1514     size_t bytes, std::vector<RefCountedPtr<ChildPolicyWrapper>>*
1515                       child_policy_wrappers_to_delete) {
1516   while (size_ > bytes) {
1517     auto lru_it = lru_list_.begin();
1518     if (GPR_UNLIKELY(lru_it == lru_list_.end())) break;
1519     auto map_it = map_.find(*lru_it);
1520     CHECK(map_it != map_.end());
1521     if (!map_it->second->CanEvict()) break;
1522     GRPC_TRACE_LOG(rls_lb, INFO)
1523         << "[rlslb " << lb_policy_ << "] LRU eviction: removing entry "
1524         << map_it->second.get() << " " << lru_it->ToString();
1525     size_ -= map_it->second->Size();
1526     map_it->second->TakeChildPolicyWrappers(child_policy_wrappers_to_delete);
1527     map_.erase(map_it);
1528   }
1529   GRPC_TRACE_LOG(rls_lb, INFO)
1530       << "[rlslb " << lb_policy_
1531       << "] LRU pass complete: desired size=" << bytes << " size=" << size_;
1532 }
1533 
1534 //
1535 // RlsLb::RlsChannel::StateWatcher
1536 //
1537 
OnConnectivityStateChange(grpc_connectivity_state new_state,const absl::Status & status)1538 void RlsLb::RlsChannel::StateWatcher::OnConnectivityStateChange(
1539     grpc_connectivity_state new_state, const absl::Status& status) {
1540   auto* lb_policy = rls_channel_->lb_policy_.get();
1541   GRPC_TRACE_LOG(rls_lb, INFO)
1542       << "[rlslb " << lb_policy << "] RlsChannel=" << rls_channel_.get()
1543       << " StateWatcher=" << this << ": state changed to "
1544       << ConnectivityStateName(new_state) << " (" << status << ")";
1545   if (rls_channel_->is_shutdown_) return;
1546   MutexLock lock(&lb_policy->mu_);
1547   if (new_state == GRPC_CHANNEL_READY && was_transient_failure_) {
1548     was_transient_failure_ = false;
1549     // Reset the backoff of all cache entries, so that we don't
1550     // double-penalize if an RLS request fails while the channel is
1551     // down, since the throttling for the channel being down is handled
1552     // at the channel level instead of in the individual cache entries.
1553     lb_policy->cache_.ResetAllBackoff();
1554   } else if (new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
1555     was_transient_failure_ = true;
1556   }
1557 }
1558 
1559 //
1560 // RlsLb::RlsChannel::Throttle
1561 //
1562 
ShouldThrottle()1563 bool RlsLb::RlsChannel::Throttle::ShouldThrottle() {
1564   Timestamp now = Timestamp::Now();
1565   while (!requests_.empty() && now - requests_.front() > window_size_) {
1566     requests_.pop_front();
1567   }
1568   while (!failures_.empty() && now - failures_.front() > window_size_) {
1569     failures_.pop_front();
1570   }
1571   // Compute probability of throttling.
1572   float num_requests = requests_.size();
1573   float num_successes = num_requests - failures_.size();
1574   // Note: it's possible that this ratio will be negative, in which case
1575   // no throttling will be done.
1576   float throttle_probability =
1577       (num_requests - (num_successes * ratio_for_successes_)) /
1578       (num_requests + padding_);
1579   // Generate a random number for the request.
1580   std::uniform_real_distribution<float> dist(0, 1.0);
1581   // Check if we should throttle the request.
1582   bool throttle = dist(rng_) < throttle_probability;
1583   // If we're throttling, record the request and the failure.
1584   if (throttle) {
1585     requests_.push_back(now);
1586     failures_.push_back(now);
1587   }
1588   return throttle;
1589 }
1590 
RegisterResponse(bool success)1591 void RlsLb::RlsChannel::Throttle::RegisterResponse(bool success) {
1592   Timestamp now = Timestamp::Now();
1593   requests_.push_back(now);
1594   if (!success) failures_.push_back(now);
1595 }
1596 
1597 //
1598 // RlsLb::RlsChannel
1599 //
1600 
RlsChannel(RefCountedPtr<RlsLb> lb_policy)1601 RlsLb::RlsChannel::RlsChannel(RefCountedPtr<RlsLb> lb_policy)
1602     : InternallyRefCounted<RlsChannel>(
1603           GRPC_TRACE_FLAG_ENABLED(rls_lb) ? "RlsChannel" : nullptr),
1604       lb_policy_(std::move(lb_policy)) {
1605   // Get channel creds from parent channel.
1606   // Note that we are using the "unsafe" channel creds here, which do
1607   // include any associated call creds.  This is safe in this case,
1608   // because we are using the parent channel's authority on the RLS channel.
1609   auto creds =
1610       lb_policy_->channel_control_helper()->GetUnsafeChannelCredentials();
1611   // Use the parent channel's authority.
1612   auto authority = lb_policy_->channel_control_helper()->GetAuthority();
1613   ChannelArgs args = ChannelArgs()
1614                          .Set(GRPC_ARG_DEFAULT_AUTHORITY, authority)
1615                          .Set(GRPC_ARG_CHANNELZ_IS_INTERNAL_CHANNEL, 1);
1616   // Propagate fake security connector expected targets, if any.
1617   // (This is ugly, but it seems better than propagating all channel args
1618   // from the parent channel by default and then having a giant
1619   // exclude list of args to strip out, like we do in grpclb.)
1620   absl::optional<absl::string_view> fake_security_expected_targets =
1621       lb_policy_->channel_args_.GetString(
1622           GRPC_ARG_FAKE_SECURITY_EXPECTED_TARGETS);
1623   if (fake_security_expected_targets.has_value()) {
1624     args = args.Set(GRPC_ARG_FAKE_SECURITY_EXPECTED_TARGETS,
1625                     *fake_security_expected_targets);
1626   }
1627   // Add service config args if needed.
1628   const std::string& service_config =
1629       lb_policy_->config_->rls_channel_service_config();
1630   if (!service_config.empty()) {
1631     args = args.Set(GRPC_ARG_SERVICE_CONFIG, service_config)
1632                .Set(GRPC_ARG_SERVICE_CONFIG_DISABLE_RESOLUTION, 1);
1633   }
1634   channel_.reset(Channel::FromC(
1635       grpc_channel_create(lb_policy_->config_->lookup_service().c_str(),
1636                           creds.get(), args.ToC().get())));
1637   GRPC_TRACE_LOG(rls_lb, INFO)
1638       << "[rlslb " << lb_policy_.get() << "] RlsChannel=" << this
1639       << ": created channel " << channel_.get() << " for "
1640       << lb_policy_->config_->lookup_service();
1641   if (channel_ != nullptr) {
1642     // Set up channelz linkage.
1643     channelz::ChannelNode* child_channelz_node = channel_->channelz_node();
1644     auto parent_channelz_node =
1645         lb_policy_->channel_args_.GetObjectRef<channelz::ChannelNode>();
1646     if (child_channelz_node != nullptr && parent_channelz_node != nullptr) {
1647       parent_channelz_node->AddChildChannel(child_channelz_node->uuid());
1648       parent_channelz_node_ = std::move(parent_channelz_node);
1649     }
1650     // Start connectivity watch.
1651     watcher_ = new StateWatcher(Ref(DEBUG_LOCATION, "StateWatcher"));
1652     channel_->AddConnectivityWatcher(
1653         GRPC_CHANNEL_IDLE,
1654         OrphanablePtr<AsyncConnectivityStateWatcherInterface>(watcher_));
1655   }
1656 }
1657 
Orphan()1658 void RlsLb::RlsChannel::Orphan() {
1659   GRPC_TRACE_LOG(rls_lb, INFO)
1660       << "[rlslb " << lb_policy_.get() << "] RlsChannel=" << this
1661       << ", channel=" << channel_.get() << ": shutdown";
1662   is_shutdown_ = true;
1663   if (channel_ != nullptr) {
1664     // Remove channelz linkage.
1665     if (parent_channelz_node_ != nullptr) {
1666       channelz::ChannelNode* child_channelz_node = channel_->channelz_node();
1667       CHECK_NE(child_channelz_node, nullptr);
1668       parent_channelz_node_->RemoveChildChannel(child_channelz_node->uuid());
1669     }
1670     // Stop connectivity watch.
1671     if (watcher_ != nullptr) {
1672       channel_->RemoveConnectivityWatcher(watcher_);
1673       watcher_ = nullptr;
1674     }
1675     channel_.reset();
1676   }
1677   Unref(DEBUG_LOCATION, "Orphan");
1678 }
1679 
StartRlsCall(const RequestKey & key,Cache::Entry * stale_entry)1680 void RlsLb::RlsChannel::StartRlsCall(const RequestKey& key,
1681                                      Cache::Entry* stale_entry) {
1682   std::unique_ptr<BackOff> backoff_state;
1683   grpc_lookup_v1_RouteLookupRequest_Reason reason =
1684       grpc_lookup_v1_RouteLookupRequest_REASON_MISS;
1685   grpc_event_engine::experimental::Slice stale_header_data;
1686   if (stale_entry != nullptr) {
1687     backoff_state = stale_entry->TakeBackoffState();
1688     reason = grpc_lookup_v1_RouteLookupRequest_REASON_STALE;
1689     stale_header_data = stale_entry->header_data().Ref();
1690   }
1691   lb_policy_->request_map_.emplace(
1692       key, MakeOrphanable<RlsRequest>(
1693                lb_policy_.Ref(DEBUG_LOCATION, "RlsRequest"), key,
1694                lb_policy_->rls_channel_->Ref(DEBUG_LOCATION, "RlsRequest"),
1695                std::move(backoff_state), reason, std::move(stale_header_data)));
1696 }
1697 
ReportResponseLocked(bool response_succeeded)1698 void RlsLb::RlsChannel::ReportResponseLocked(bool response_succeeded) {
1699   throttle_.RegisterResponse(response_succeeded);
1700 }
1701 
ResetBackoff()1702 void RlsLb::RlsChannel::ResetBackoff() {
1703   DCHECK(channel_ != nullptr);
1704   channel_->ResetConnectionBackoff();
1705 }
1706 
1707 //
1708 // RlsLb::RlsRequest
1709 //
1710 
RlsRequest(RefCountedPtr<RlsLb> lb_policy,RequestKey key,RefCountedPtr<RlsChannel> rls_channel,std::unique_ptr<BackOff> backoff_state,grpc_lookup_v1_RouteLookupRequest_Reason reason,grpc_event_engine::experimental::Slice stale_header_data)1711 RlsLb::RlsRequest::RlsRequest(
1712     RefCountedPtr<RlsLb> lb_policy, RequestKey key,
1713     RefCountedPtr<RlsChannel> rls_channel,
1714     std::unique_ptr<BackOff> backoff_state,
1715     grpc_lookup_v1_RouteLookupRequest_Reason reason,
1716     grpc_event_engine::experimental::Slice stale_header_data)
1717     : InternallyRefCounted<RlsRequest>(
1718           GRPC_TRACE_FLAG_ENABLED(rls_lb) ? "RlsRequest" : nullptr),
1719       lb_policy_(std::move(lb_policy)),
1720       key_(std::move(key)),
1721       rls_channel_(std::move(rls_channel)),
1722       backoff_state_(std::move(backoff_state)),
1723       reason_(reason),
1724       stale_header_data_(std::move(stale_header_data)) {
1725   GRPC_TRACE_LOG(rls_lb, INFO)
1726       << "[rlslb " << lb_policy_.get() << "] rls_request=" << this
1727       << ": RLS request created for key " << key_.ToString();
1728   GRPC_CLOSURE_INIT(&call_complete_cb_, OnRlsCallComplete, this, nullptr);
1729   ExecCtx::Run(
1730       DEBUG_LOCATION,
1731       GRPC_CLOSURE_INIT(&call_start_cb_, StartCall,
1732                         Ref(DEBUG_LOCATION, "StartCall").release(), nullptr),
1733       absl::OkStatus());
1734 }
1735 
~RlsRequest()1736 RlsLb::RlsRequest::~RlsRequest() { CHECK_EQ(call_, nullptr); }
1737 
Orphan()1738 void RlsLb::RlsRequest::Orphan() {
1739   if (call_ != nullptr) {
1740     GRPC_TRACE_LOG(rls_lb, INFO)
1741         << "[rlslb " << lb_policy_.get() << "] rls_request=" << this << " "
1742         << key_.ToString() << ": cancelling RLS call";
1743     grpc_call_cancel_internal(call_);
1744   }
1745   Unref(DEBUG_LOCATION, "Orphan");
1746 }
1747 
StartCall(void * arg,grpc_error_handle)1748 void RlsLb::RlsRequest::StartCall(void* arg, grpc_error_handle /*error*/) {
1749   auto* request = static_cast<RlsRequest*>(arg);
1750   request->lb_policy_->work_serializer()->Run(
1751       [request]() {
1752         request->StartCallLocked();
1753         request->Unref(DEBUG_LOCATION, "StartCall");
1754       },
1755       DEBUG_LOCATION);
1756 }
1757 
StartCallLocked()1758 void RlsLb::RlsRequest::StartCallLocked() {
1759   {
1760     MutexLock lock(&lb_policy_->mu_);
1761     if (lb_policy_->is_shutdown_) return;
1762   }
1763   Timestamp now = Timestamp::Now();
1764   deadline_ = now + lb_policy_->config_->lookup_service_timeout();
1765   grpc_metadata_array_init(&recv_initial_metadata_);
1766   grpc_metadata_array_init(&recv_trailing_metadata_);
1767   call_ = rls_channel_->channel()->CreateCall(
1768       /*parent_call=*/nullptr, GRPC_PROPAGATE_DEFAULTS, /*cq=*/nullptr,
1769       lb_policy_->interested_parties(),
1770       Slice::FromStaticString(kRlsRequestPath), /*authority=*/absl::nullopt,
1771       deadline_, /*registered_method=*/true);
1772   grpc_op ops[6];
1773   memset(ops, 0, sizeof(ops));
1774   grpc_op* op = ops;
1775   op->op = GRPC_OP_SEND_INITIAL_METADATA;
1776   ++op;
1777   op->op = GRPC_OP_SEND_MESSAGE;
1778   send_message_ = MakeRequestProto();
1779   op->data.send_message.send_message = send_message_;
1780   ++op;
1781   op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
1782   ++op;
1783   op->op = GRPC_OP_RECV_INITIAL_METADATA;
1784   op->data.recv_initial_metadata.recv_initial_metadata =
1785       &recv_initial_metadata_;
1786   ++op;
1787   op->op = GRPC_OP_RECV_MESSAGE;
1788   op->data.recv_message.recv_message = &recv_message_;
1789   ++op;
1790   op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
1791   op->data.recv_status_on_client.trailing_metadata = &recv_trailing_metadata_;
1792   op->data.recv_status_on_client.status = &status_recv_;
1793   op->data.recv_status_on_client.status_details = &status_details_recv_;
1794   ++op;
1795   Ref(DEBUG_LOCATION, "OnRlsCallComplete").release();
1796   auto call_error = grpc_call_start_batch_and_execute(
1797       call_, ops, static_cast<size_t>(op - ops), &call_complete_cb_);
1798   CHECK_EQ(call_error, GRPC_CALL_OK);
1799 }
1800 
OnRlsCallComplete(void * arg,grpc_error_handle error)1801 void RlsLb::RlsRequest::OnRlsCallComplete(void* arg, grpc_error_handle error) {
1802   auto* request = static_cast<RlsRequest*>(arg);
1803   request->lb_policy_->work_serializer()->Run(
1804       [request, error]() {
1805         request->OnRlsCallCompleteLocked(error);
1806         request->Unref(DEBUG_LOCATION, "OnRlsCallComplete");
1807       },
1808       DEBUG_LOCATION);
1809 }
1810 
OnRlsCallCompleteLocked(grpc_error_handle error)1811 void RlsLb::RlsRequest::OnRlsCallCompleteLocked(grpc_error_handle error) {
1812   if (GRPC_TRACE_FLAG_ENABLED(rls_lb)) {
1813     std::string status_message(StringViewFromSlice(status_details_recv_));
1814     LOG(INFO) << "[rlslb " << lb_policy_.get() << "] rls_request=" << this
1815               << " " << key_.ToString() << ", error=" << StatusToString(error)
1816               << ", status={" << status_recv_ << ", " << status_message << "}"
1817               << " RLS call response received";
1818   }
1819   // Parse response.
1820   ResponseInfo response;
1821   if (!error.ok()) {
1822     grpc_status_code code;
1823     std::string message;
1824     grpc_error_get_status(error, deadline_, &code, &message,
1825                           /*http_error=*/nullptr, /*error_string=*/nullptr);
1826     response.status =
1827         absl::Status(static_cast<absl::StatusCode>(code), message);
1828   } else if (status_recv_ != GRPC_STATUS_OK) {
1829     response.status = absl::Status(static_cast<absl::StatusCode>(status_recv_),
1830                                    StringViewFromSlice(status_details_recv_));
1831   } else {
1832     response = ParseResponseProto();
1833   }
1834   // Clean up call state.
1835   grpc_byte_buffer_destroy(send_message_);
1836   grpc_byte_buffer_destroy(recv_message_);
1837   grpc_metadata_array_destroy(&recv_initial_metadata_);
1838   grpc_metadata_array_destroy(&recv_trailing_metadata_);
1839   CSliceUnref(status_details_recv_);
1840   grpc_call_unref(call_);
1841   call_ = nullptr;
1842   // Return result to cache.
1843   GRPC_TRACE_LOG(rls_lb, INFO)
1844       << "[rlslb " << lb_policy_.get() << "] rls_request=" << this << " "
1845       << key_.ToString() << ": response info: " << response.ToString();
1846   std::vector<ChildPolicyWrapper*> child_policies_to_finish_update;
1847   std::vector<RefCountedPtr<ChildPolicyWrapper>>
1848       child_policy_wrappers_to_delete;
1849   OrphanablePtr<ChildPolicyHandler> child_policy_to_delete;
1850   {
1851     MutexLock lock(&lb_policy_->mu_);
1852     if (lb_policy_->is_shutdown_) return;
1853     rls_channel_->ReportResponseLocked(response.status.ok());
1854     Cache::Entry* cache_entry =
1855         lb_policy_->cache_.FindOrInsert(key_, &child_policy_wrappers_to_delete);
1856     child_policies_to_finish_update = cache_entry->OnRlsResponseLocked(
1857         std::move(response), std::move(backoff_state_),
1858         &child_policy_to_delete);
1859     lb_policy_->request_map_.erase(key_);
1860   }
1861   // Now that we've released the lock, finish the update on any newly
1862   // created child policies.
1863   for (ChildPolicyWrapper* child : child_policies_to_finish_update) {
1864     // If the child policy returns a non-OK status, request re-resolution.
1865     // Note that this will initially cause fixed backoff delay in the
1866     // resolver instead of exponential delay.  However, once the
1867     // resolver returns the initial re-resolution, we will be able to
1868     // return non-OK from UpdateLocked(), which will trigger
1869     // exponential backoff instead.
1870     absl::Status status = child->MaybeFinishUpdate();
1871     if (!status.ok()) {
1872       lb_policy_->channel_control_helper()->RequestReresolution();
1873     }
1874   }
1875 }
1876 
MakeRequestProto()1877 grpc_byte_buffer* RlsLb::RlsRequest::MakeRequestProto() {
1878   upb::Arena arena;
1879   grpc_lookup_v1_RouteLookupRequest* req =
1880       grpc_lookup_v1_RouteLookupRequest_new(arena.ptr());
1881   grpc_lookup_v1_RouteLookupRequest_set_target_type(
1882       req, upb_StringView_FromDataAndSize(kGrpc, sizeof(kGrpc) - 1));
1883   for (const auto& kv : key_.key_map) {
1884     grpc_lookup_v1_RouteLookupRequest_key_map_set(
1885         req, upb_StringView_FromDataAndSize(kv.first.data(), kv.first.size()),
1886         upb_StringView_FromDataAndSize(kv.second.data(), kv.second.size()),
1887         arena.ptr());
1888   }
1889   grpc_lookup_v1_RouteLookupRequest_set_reason(req, reason_);
1890   if (!stale_header_data_.empty()) {
1891     grpc_lookup_v1_RouteLookupRequest_set_stale_header_data(
1892         req, StdStringToUpbString(stale_header_data_.as_string_view()));
1893   }
1894   size_t len;
1895   char* buf =
1896       grpc_lookup_v1_RouteLookupRequest_serialize(req, arena.ptr(), &len);
1897   grpc_slice send_slice = grpc_slice_from_copied_buffer(buf, len);
1898   grpc_byte_buffer* byte_buffer = grpc_raw_byte_buffer_create(&send_slice, 1);
1899   CSliceUnref(send_slice);
1900   return byte_buffer;
1901 }
1902 
ParseResponseProto()1903 RlsLb::ResponseInfo RlsLb::RlsRequest::ParseResponseProto() {
1904   ResponseInfo response_info;
1905   upb::Arena arena;
1906   grpc_byte_buffer_reader bbr;
1907   grpc_byte_buffer_reader_init(&bbr, recv_message_);
1908   grpc_slice recv_slice = grpc_byte_buffer_reader_readall(&bbr);
1909   grpc_byte_buffer_reader_destroy(&bbr);
1910   grpc_lookup_v1_RouteLookupResponse* response =
1911       grpc_lookup_v1_RouteLookupResponse_parse(
1912           reinterpret_cast<const char*>(GRPC_SLICE_START_PTR(recv_slice)),
1913           GRPC_SLICE_LENGTH(recv_slice), arena.ptr());
1914   CSliceUnref(recv_slice);
1915   if (response == nullptr) {
1916     response_info.status = absl::InternalError("cannot parse RLS response");
1917     return response_info;
1918   }
1919   size_t num_targets;
1920   const upb_StringView* targets_strview =
1921       grpc_lookup_v1_RouteLookupResponse_targets(response, &num_targets);
1922   if (num_targets == 0) {
1923     response_info.status =
1924         absl::InvalidArgumentError("RLS response has no target entry");
1925     return response_info;
1926   }
1927   response_info.targets.reserve(num_targets);
1928   for (size_t i = 0; i < num_targets; ++i) {
1929     response_info.targets.emplace_back(targets_strview[i].data,
1930                                        targets_strview[i].size);
1931   }
1932   upb_StringView header_data_strview =
1933       grpc_lookup_v1_RouteLookupResponse_header_data(response);
1934   response_info.header_data =
1935       grpc_event_engine::experimental::Slice::FromCopiedBuffer(
1936           header_data_strview.data, header_data_strview.size);
1937   return response_info;
1938 }
1939 
1940 //
1941 // RlsLb
1942 //
1943 
GenerateUUID()1944 std::string GenerateUUID() {
1945   absl::uniform_int_distribution<uint64_t> distribution;
1946   absl::BitGen bitgen;
1947   uint64_t hi = distribution(bitgen);
1948   uint64_t lo = distribution(bitgen);
1949   return GenerateUUIDv4(hi, lo);
1950 }
1951 
RlsLb(Args args)1952 RlsLb::RlsLb(Args args)
1953     : LoadBalancingPolicy(std::move(args)),
1954       instance_uuid_(channel_args()
1955                          .GetOwnedString(GRPC_ARG_TEST_ONLY_RLS_INSTANCE_ID)
1956                          .value_or(GenerateUUID())),
1957       cache_(this) {
1958   GRPC_TRACE_LOG(rls_lb, INFO) << "[rlslb " << this << "] policy created";
1959 }
1960 
EndpointsEqual(const absl::StatusOr<std::shared_ptr<EndpointAddressesIterator>> endpoints1,const absl::StatusOr<std::shared_ptr<EndpointAddressesIterator>> endpoints2)1961 bool EndpointsEqual(
1962     const absl::StatusOr<std::shared_ptr<EndpointAddressesIterator>> endpoints1,
1963     const absl::StatusOr<std::shared_ptr<EndpointAddressesIterator>>
1964         endpoints2) {
1965   if (endpoints1.status() != endpoints2.status()) return false;
1966   if (endpoints1.ok()) {
1967     std::vector<EndpointAddresses> e1_list;
1968     (*endpoints1)->ForEach([&](const EndpointAddresses& endpoint) {
1969       e1_list.push_back(endpoint);
1970     });
1971     size_t i = 0;
1972     bool different = false;
1973     (*endpoints2)->ForEach([&](const EndpointAddresses& endpoint) {
1974       if (endpoint != e1_list[i++]) different = true;
1975     });
1976     if (different) return false;
1977     if (i != e1_list.size()) return false;
1978   }
1979   return true;
1980 }
1981 
UpdateLocked(UpdateArgs args)1982 absl::Status RlsLb::UpdateLocked(UpdateArgs args) {
1983   GRPC_TRACE_LOG(rls_lb, INFO) << "[rlslb " << this << "] policy updated";
1984   update_in_progress_ = true;
1985   // Swap out config.
1986   RefCountedPtr<RlsLbConfig> old_config = std::move(config_);
1987   config_ = args.config.TakeAsSubclass<RlsLbConfig>();
1988   if (GRPC_TRACE_FLAG_ENABLED(rls_lb) &&
1989       (old_config == nullptr ||
1990        old_config->child_policy_config() != config_->child_policy_config())) {
1991     LOG(INFO) << "[rlslb " << this << "] updated child policy config: "
1992               << JsonDump(config_->child_policy_config());
1993   }
1994   // Swap out addresses.
1995   // If the new address list is an error and we have an existing address list,
1996   // stick with the existing addresses.
1997   absl::StatusOr<std::shared_ptr<EndpointAddressesIterator>> old_addresses;
1998   if (args.addresses.ok()) {
1999     old_addresses = std::move(addresses_);
2000     addresses_ = std::move(args.addresses);
2001   } else {
2002     old_addresses = addresses_;
2003   }
2004   // Swap out channel args.
2005   channel_args_ = std::move(args.args);
2006   // Determine whether we need to update all child policies.
2007   bool update_child_policies =
2008       old_config == nullptr ||
2009       old_config->child_policy_config() != config_->child_policy_config() ||
2010       !EndpointsEqual(old_addresses, addresses_) || args.args != channel_args_;
2011   // If default target changes, swap out child policy.
2012   bool created_default_child = false;
2013   if (old_config == nullptr ||
2014       config_->default_target() != old_config->default_target()) {
2015     if (config_->default_target().empty()) {
2016       GRPC_TRACE_LOG(rls_lb, INFO)
2017           << "[rlslb " << this << "] unsetting default target";
2018       default_child_policy_.reset();
2019     } else {
2020       auto it = child_policy_map_.find(config_->default_target());
2021       if (it == child_policy_map_.end()) {
2022         GRPC_TRACE_LOG(rls_lb, INFO)
2023             << "[rlslb " << this << "] creating new default target";
2024         default_child_policy_ = MakeRefCounted<ChildPolicyWrapper>(
2025             RefAsSubclass<RlsLb>(DEBUG_LOCATION, "ChildPolicyWrapper"),
2026             config_->default_target());
2027         created_default_child = true;
2028       } else {
2029         GRPC_TRACE_LOG(rls_lb, INFO)
2030             << "[rlslb " << this << "] using existing child for default target";
2031         default_child_policy_ =
2032             it->second->Ref(DEBUG_LOCATION, "DefaultChildPolicy");
2033       }
2034     }
2035   }
2036   // Now grab the lock to swap out the state it guards.
2037   std::vector<RefCountedPtr<ChildPolicyWrapper>>
2038       child_policy_wrappers_to_delete;
2039   OrphanablePtr<ChildPolicyHandler> child_policy_to_delete;
2040   {
2041     MutexLock lock(&mu_);
2042     // Swap out RLS channel if needed.
2043     if (old_config == nullptr ||
2044         config_->lookup_service() != old_config->lookup_service()) {
2045       rls_channel_ = MakeOrphanable<RlsChannel>(
2046           RefAsSubclass<RlsLb>(DEBUG_LOCATION, "RlsChannel"));
2047     }
2048     // Resize cache if needed.
2049     if (old_config == nullptr ||
2050         config_->cache_size_bytes() != old_config->cache_size_bytes()) {
2051       cache_.Resize(static_cast<size_t>(config_->cache_size_bytes()),
2052                     &child_policy_wrappers_to_delete);
2053     }
2054     // Start update of child policies if needed.
2055     if (update_child_policies) {
2056       GRPC_TRACE_LOG(rls_lb, INFO)
2057           << "[rlslb " << this << "] starting child policy updates";
2058       for (auto& p : child_policy_map_) {
2059         p.second->StartUpdate(&child_policy_to_delete);
2060       }
2061     } else if (created_default_child) {
2062       GRPC_TRACE_LOG(rls_lb, INFO)
2063           << "[rlslb " << this << "] starting default child policy update";
2064       default_child_policy_->StartUpdate(&child_policy_to_delete);
2065     }
2066   }
2067   // Now that we've released the lock, finish update of child policies.
2068   std::vector<std::string> errors;
2069   if (update_child_policies) {
2070     GRPC_TRACE_LOG(rls_lb, INFO)
2071         << "[rlslb " << this << "] finishing child policy updates";
2072     for (auto& p : child_policy_map_) {
2073       absl::Status status = p.second->MaybeFinishUpdate();
2074       if (!status.ok()) {
2075         errors.emplace_back(
2076             absl::StrCat("target ", p.first, ": ", status.ToString()));
2077       }
2078     }
2079   } else if (created_default_child) {
2080     GRPC_TRACE_LOG(rls_lb, INFO)
2081         << "[rlslb " << this << "] finishing default child policy update";
2082     absl::Status status = default_child_policy_->MaybeFinishUpdate();
2083     if (!status.ok()) {
2084       errors.emplace_back(absl::StrCat("target ", config_->default_target(),
2085                                        ": ", status.ToString()));
2086     }
2087   }
2088   update_in_progress_ = false;
2089   // On the initial update only, we set the gauge metric callback.  We
2090   // can't do this before the initial update, because otherwise the
2091   // callback could be invoked before we've set state that we need for
2092   // the label values (e.g., we'd add metrics with empty string for the
2093   // RLS server name).
2094   if (registered_metric_callback_ == nullptr) {
2095     registered_metric_callback_ =
2096         channel_control_helper()->GetStatsPluginGroup().RegisterCallback(
2097             [this](CallbackMetricReporter& reporter) {
2098               MutexLock lock(&mu_);
2099               cache_.ReportMetricsLocked(reporter);
2100             },
2101             Duration::Seconds(5), kMetricCacheSize, kMetricCacheEntries);
2102   }
2103   // In principle, we need to update the picker here only if the config
2104   // fields used by the picker have changed.  However, it seems fragile
2105   // to check individual fields, since the picker logic could change in
2106   // the future to use additional config fields, and we might not
2107   // remember to update the code here.  So for now, we just unconditionally
2108   // update the picker here, even though it's probably redundant.
2109   UpdatePickerLocked();
2110   // Return status.
2111   if (!errors.empty()) {
2112     return absl::UnavailableError(absl::StrCat(
2113         "errors from children: [", absl::StrJoin(errors, "; "), "]"));
2114   }
2115   return absl::OkStatus();
2116 }
2117 
ExitIdleLocked()2118 void RlsLb::ExitIdleLocked() {
2119   MutexLock lock(&mu_);
2120   for (auto& child_entry : child_policy_map_) {
2121     child_entry.second->ExitIdleLocked();
2122   }
2123 }
2124 
ResetBackoffLocked()2125 void RlsLb::ResetBackoffLocked() {
2126   {
2127     MutexLock lock(&mu_);
2128     rls_channel_->ResetBackoff();
2129     cache_.ResetAllBackoff();
2130   }
2131   for (auto& child : child_policy_map_) {
2132     child.second->ResetBackoffLocked();
2133   }
2134 }
2135 
ShutdownLocked()2136 void RlsLb::ShutdownLocked() {
2137   GRPC_TRACE_LOG(rls_lb, INFO) << "[rlslb " << this << "] policy shutdown";
2138   registered_metric_callback_.reset();
2139   RefCountedPtr<ChildPolicyWrapper> child_policy_to_delete;
2140   std::vector<RefCountedPtr<ChildPolicyWrapper>>
2141       child_policy_wrappers_to_delete;
2142   OrphanablePtr<RlsChannel> rls_channel_to_delete;
2143   {
2144     MutexLock lock(&mu_);
2145     is_shutdown_ = true;
2146     config_.reset(DEBUG_LOCATION, "ShutdownLocked");
2147     child_policy_wrappers_to_delete = cache_.Shutdown();
2148     request_map_.clear();
2149     rls_channel_to_delete = std::move(rls_channel_);
2150     child_policy_to_delete = std::move(default_child_policy_);
2151   }
2152   channel_args_ = ChannelArgs();
2153 }
2154 
UpdatePickerAsync()2155 void RlsLb::UpdatePickerAsync() {
2156   // Run via the ExecCtx, since the caller may be holding the lock, and
2157   // we don't want to be doing that when we hop into the WorkSerializer,
2158   // in case the WorkSerializer callback happens to run inline.
2159   ExecCtx::Run(
2160       DEBUG_LOCATION,
2161       GRPC_CLOSURE_CREATE(UpdatePickerCallback,
2162                           Ref(DEBUG_LOCATION, "UpdatePickerCallback").release(),
2163                           grpc_schedule_on_exec_ctx),
2164       absl::OkStatus());
2165 }
2166 
UpdatePickerCallback(void * arg,grpc_error_handle)2167 void RlsLb::UpdatePickerCallback(void* arg, grpc_error_handle /*error*/) {
2168   auto* rls_lb = static_cast<RlsLb*>(arg);
2169   rls_lb->work_serializer()->Run(
2170       [rls_lb]() {
2171         RefCountedPtr<RlsLb> lb_policy(rls_lb);
2172         lb_policy->UpdatePickerLocked();
2173         lb_policy.reset(DEBUG_LOCATION, "UpdatePickerCallback");
2174       },
2175       DEBUG_LOCATION);
2176 }
2177 
UpdatePickerLocked()2178 void RlsLb::UpdatePickerLocked() {
2179   // If we're in the process of propagating an update from our parent to
2180   // our children, ignore any updates that come from the children.  We
2181   // will instead return a new picker once the update has been seen by
2182   // all children.  This avoids unnecessary picker churn while an update
2183   // is being propagated to our children.
2184   if (update_in_progress_) return;
2185   GRPC_TRACE_LOG(rls_lb, INFO) << "[rlslb " << this << "] updating picker";
2186   grpc_connectivity_state state = GRPC_CHANNEL_IDLE;
2187   if (!child_policy_map_.empty()) {
2188     state = GRPC_CHANNEL_TRANSIENT_FAILURE;
2189     int num_idle = 0;
2190     int num_connecting = 0;
2191     {
2192       MutexLock lock(&mu_);
2193       if (is_shutdown_) return;
2194       for (auto& p : child_policy_map_) {
2195         grpc_connectivity_state child_state = p.second->connectivity_state();
2196         GRPC_TRACE_LOG(rls_lb, INFO)
2197             << "[rlslb " << this << "] target " << p.second->target()
2198             << " in state " << ConnectivityStateName(child_state);
2199         if (child_state == GRPC_CHANNEL_READY) {
2200           state = GRPC_CHANNEL_READY;
2201           break;
2202         } else if (child_state == GRPC_CHANNEL_CONNECTING) {
2203           ++num_connecting;
2204         } else if (child_state == GRPC_CHANNEL_IDLE) {
2205           ++num_idle;
2206         }
2207       }
2208       if (state != GRPC_CHANNEL_READY) {
2209         if (num_connecting > 0) {
2210           state = GRPC_CHANNEL_CONNECTING;
2211         } else if (num_idle > 0) {
2212           state = GRPC_CHANNEL_IDLE;
2213         }
2214       }
2215     }
2216   }
2217   GRPC_TRACE_LOG(rls_lb, INFO) << "[rlslb " << this << "] reporting state "
2218                                << ConnectivityStateName(state);
2219   absl::Status status;
2220   if (state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
2221     status = absl::UnavailableError("no children available");
2222   }
2223   channel_control_helper()->UpdateState(
2224       state, status,
2225       MakeRefCounted<Picker>(RefAsSubclass<RlsLb>(DEBUG_LOCATION, "Picker")));
2226 }
2227 
2228 template <typename HandleType>
MaybeExportPickCount(HandleType handle,absl::string_view target,const PickResult & pick_result)2229 void RlsLb::MaybeExportPickCount(HandleType handle, absl::string_view target,
2230                                  const PickResult& pick_result) {
2231   absl::string_view pick_result_string = Match(
2232       pick_result.result,
2233       [](const LoadBalancingPolicy::PickResult::Complete&) {
2234         return "complete";
2235       },
2236       [](const LoadBalancingPolicy::PickResult::Queue&) { return ""; },
2237       [](const LoadBalancingPolicy::PickResult::Fail&) { return "fail"; },
2238       [](const LoadBalancingPolicy::PickResult::Drop&) { return "drop"; });
2239   if (pick_result_string.empty()) return;  // Don't report queued picks.
2240   auto& stats_plugins = channel_control_helper()->GetStatsPluginGroup();
2241   stats_plugins.AddCounter(
2242       handle, 1,
2243       {channel_control_helper()->GetTarget(), config_->lookup_service(), target,
2244        pick_result_string},
2245       {});
2246 }
2247 
2248 //
2249 // RlsLbFactory
2250 //
2251 
2252 struct GrpcKeyBuilder {
2253   struct Name {
2254     std::string service;
2255     std::string method;
2256 
JsonLoadergrpc_core::__anon2f58d5c00111::GrpcKeyBuilder::Name2257     static const JsonLoaderInterface* JsonLoader(const JsonArgs&) {
2258       static const auto* loader = JsonObjectLoader<Name>()
2259                                       .Field("service", &Name::service)
2260                                       .OptionalField("method", &Name::method)
2261                                       .Finish();
2262       return loader;
2263     }
2264   };
2265 
2266   struct NameMatcher {
2267     std::string key;
2268     std::vector<std::string> names;
2269     absl::optional<bool> required_match;
2270 
JsonLoadergrpc_core::__anon2f58d5c00111::GrpcKeyBuilder::NameMatcher2271     static const JsonLoaderInterface* JsonLoader(const JsonArgs&) {
2272       static const auto* loader =
2273           JsonObjectLoader<NameMatcher>()
2274               .Field("key", &NameMatcher::key)
2275               .Field("names", &NameMatcher::names)
2276               .OptionalField("requiredMatch", &NameMatcher::required_match)
2277               .Finish();
2278       return loader;
2279     }
2280 
JsonPostLoadgrpc_core::__anon2f58d5c00111::GrpcKeyBuilder::NameMatcher2281     void JsonPostLoad(const Json&, const JsonArgs&, ValidationErrors* errors) {
2282       // key must be non-empty.
2283       {
2284         ValidationErrors::ScopedField field(errors, ".key");
2285         if (!errors->FieldHasErrors() && key.empty()) {
2286           errors->AddError("must be non-empty");
2287         }
2288       }
2289       // List of header names must be non-empty.
2290       {
2291         ValidationErrors::ScopedField field(errors, ".names");
2292         if (!errors->FieldHasErrors() && names.empty()) {
2293           errors->AddError("must be non-empty");
2294         }
2295         // Individual header names must be non-empty.
2296         for (size_t i = 0; i < names.size(); ++i) {
2297           ValidationErrors::ScopedField field(errors,
2298                                               absl::StrCat("[", i, "]"));
2299           if (!errors->FieldHasErrors() && names[i].empty()) {
2300             errors->AddError("must be non-empty");
2301           }
2302         }
2303       }
2304       // requiredMatch must not be present.
2305       {
2306         ValidationErrors::ScopedField field(errors, ".requiredMatch");
2307         if (required_match.has_value()) {
2308           errors->AddError("must not be present");
2309         }
2310       }
2311     }
2312   };
2313 
2314   struct ExtraKeys {
2315     absl::optional<std::string> host_key;
2316     absl::optional<std::string> service_key;
2317     absl::optional<std::string> method_key;
2318 
JsonLoadergrpc_core::__anon2f58d5c00111::GrpcKeyBuilder::ExtraKeys2319     static const JsonLoaderInterface* JsonLoader(const JsonArgs&) {
2320       static const auto* loader =
2321           JsonObjectLoader<ExtraKeys>()
2322               .OptionalField("host", &ExtraKeys::host_key)
2323               .OptionalField("service", &ExtraKeys::service_key)
2324               .OptionalField("method", &ExtraKeys::method_key)
2325               .Finish();
2326       return loader;
2327     }
2328 
JsonPostLoadgrpc_core::__anon2f58d5c00111::GrpcKeyBuilder::ExtraKeys2329     void JsonPostLoad(const Json&, const JsonArgs&, ValidationErrors* errors) {
2330       auto check_field = [&](const std::string& field_name,
2331                              absl::optional<std::string>* struct_field) {
2332         ValidationErrors::ScopedField field(errors,
2333                                             absl::StrCat(".", field_name));
2334         if (struct_field->has_value() && (*struct_field)->empty()) {
2335           errors->AddError("must be non-empty if set");
2336         }
2337       };
2338       check_field("host", &host_key);
2339       check_field("service", &service_key);
2340       check_field("method", &method_key);
2341     }
2342   };
2343 
2344   std::vector<Name> names;
2345   std::vector<NameMatcher> headers;
2346   ExtraKeys extra_keys;
2347   std::map<std::string /*key*/, std::string /*value*/> constant_keys;
2348 
JsonLoadergrpc_core::__anon2f58d5c00111::GrpcKeyBuilder2349   static const JsonLoaderInterface* JsonLoader(const JsonArgs&) {
2350     static const auto* loader =
2351         JsonObjectLoader<GrpcKeyBuilder>()
2352             .Field("names", &GrpcKeyBuilder::names)
2353             .OptionalField("headers", &GrpcKeyBuilder::headers)
2354             .OptionalField("extraKeys", &GrpcKeyBuilder::extra_keys)
2355             .OptionalField("constantKeys", &GrpcKeyBuilder::constant_keys)
2356             .Finish();
2357     return loader;
2358   }
2359 
JsonPostLoadgrpc_core::__anon2f58d5c00111::GrpcKeyBuilder2360   void JsonPostLoad(const Json&, const JsonArgs&, ValidationErrors* errors) {
2361     // The names field must be non-empty.
2362     {
2363       ValidationErrors::ScopedField field(errors, ".names");
2364       if (!errors->FieldHasErrors() && names.empty()) {
2365         errors->AddError("must be non-empty");
2366       }
2367     }
2368     // Make sure no key in constantKeys is empty.
2369     if (constant_keys.find("") != constant_keys.end()) {
2370       ValidationErrors::ScopedField field(errors, ".constantKeys[\"\"]");
2371       errors->AddError("key must be non-empty");
2372     }
2373     // Check for duplicate keys.
2374     std::set<absl::string_view> keys_seen;
2375     auto duplicate_key_check_func = [&keys_seen, errors](
2376                                         const std::string& key,
2377                                         const std::string& field_name) {
2378       if (key.empty()) return;  // Already generated an error about this.
2379       ValidationErrors::ScopedField field(errors, field_name);
2380       auto it = keys_seen.find(key);
2381       if (it != keys_seen.end()) {
2382         errors->AddError(absl::StrCat("duplicate key \"", key, "\""));
2383       } else {
2384         keys_seen.insert(key);
2385       }
2386     };
2387     for (size_t i = 0; i < headers.size(); ++i) {
2388       NameMatcher& header = headers[i];
2389       duplicate_key_check_func(header.key,
2390                                absl::StrCat(".headers[", i, "].key"));
2391     }
2392     for (const auto& p : constant_keys) {
2393       duplicate_key_check_func(
2394           p.first, absl::StrCat(".constantKeys[\"", p.first, "\"]"));
2395     }
2396     if (extra_keys.host_key.has_value()) {
2397       duplicate_key_check_func(*extra_keys.host_key, ".extraKeys.host");
2398     }
2399     if (extra_keys.service_key.has_value()) {
2400       duplicate_key_check_func(*extra_keys.service_key, ".extraKeys.service");
2401     }
2402     if (extra_keys.method_key.has_value()) {
2403       duplicate_key_check_func(*extra_keys.method_key, ".extraKeys.method");
2404     }
2405   }
2406 };
2407 
JsonLoader(const JsonArgs &)2408 const JsonLoaderInterface* RlsLbConfig::RouteLookupConfig::JsonLoader(
2409     const JsonArgs&) {
2410   static const auto* loader =
2411       JsonObjectLoader<RouteLookupConfig>()
2412           // Note: Some fields require manual processing and are handled in
2413           // JsonPostLoad() instead.
2414           .Field("lookupService", &RouteLookupConfig::lookup_service)
2415           .OptionalField("lookupServiceTimeout",
2416                          &RouteLookupConfig::lookup_service_timeout)
2417           .OptionalField("maxAge", &RouteLookupConfig::max_age)
2418           .OptionalField("staleAge", &RouteLookupConfig::stale_age)
2419           .Field("cacheSizeBytes", &RouteLookupConfig::cache_size_bytes)
2420           .OptionalField("defaultTarget", &RouteLookupConfig::default_target)
2421           .Finish();
2422   return loader;
2423 }
2424 
JsonPostLoad(const Json & json,const JsonArgs & args,ValidationErrors * errors)2425 void RlsLbConfig::RouteLookupConfig::JsonPostLoad(const Json& json,
2426                                                   const JsonArgs& args,
2427                                                   ValidationErrors* errors) {
2428   // Parse grpcKeybuilders.
2429   auto grpc_keybuilders = LoadJsonObjectField<std::vector<GrpcKeyBuilder>>(
2430       json.object(), args, "grpcKeybuilders", errors);
2431   if (grpc_keybuilders.has_value()) {
2432     ValidationErrors::ScopedField field(errors, ".grpcKeybuilders");
2433     for (size_t i = 0; i < grpc_keybuilders->size(); ++i) {
2434       ValidationErrors::ScopedField field(errors, absl::StrCat("[", i, "]"));
2435       auto& grpc_keybuilder = (*grpc_keybuilders)[i];
2436       // Construct KeyBuilder.
2437       RlsLbConfig::KeyBuilder key_builder;
2438       for (const auto& header : grpc_keybuilder.headers) {
2439         key_builder.header_keys.emplace(header.key, header.names);
2440       }
2441       if (grpc_keybuilder.extra_keys.host_key.has_value()) {
2442         key_builder.host_key = std::move(*grpc_keybuilder.extra_keys.host_key);
2443       }
2444       if (grpc_keybuilder.extra_keys.service_key.has_value()) {
2445         key_builder.service_key =
2446             std::move(*grpc_keybuilder.extra_keys.service_key);
2447       }
2448       if (grpc_keybuilder.extra_keys.method_key.has_value()) {
2449         key_builder.method_key =
2450             std::move(*grpc_keybuilder.extra_keys.method_key);
2451       }
2452       key_builder.constant_keys = std::move(grpc_keybuilder.constant_keys);
2453       // Add entries to map.
2454       for (const auto& name : grpc_keybuilder.names) {
2455         std::string path = absl::StrCat("/", name.service, "/", name.method);
2456         bool inserted = key_builder_map.emplace(path, key_builder).second;
2457         if (!inserted) {
2458           errors->AddError(absl::StrCat("duplicate entry for \"", path, "\""));
2459         }
2460       }
2461     }
2462   }
2463   // Validate lookupService.
2464   {
2465     ValidationErrors::ScopedField field(errors, ".lookupService");
2466     if (!errors->FieldHasErrors() &&
2467         !CoreConfiguration::Get().resolver_registry().IsValidTarget(
2468             lookup_service)) {
2469       errors->AddError("must be valid gRPC target URI");
2470     }
2471   }
2472   // Clamp maxAge to the max allowed value.
2473   if (max_age > kMaxMaxAge) max_age = kMaxMaxAge;
2474   // If staleAge is set, then maxAge must also be set.
2475   if (json.object().find("staleAge") != json.object().end() &&
2476       json.object().find("maxAge") == json.object().end()) {
2477     ValidationErrors::ScopedField field(errors, ".maxAge");
2478     errors->AddError("must be set if staleAge is set");
2479   }
2480   // Ignore staleAge if greater than or equal to maxAge.
2481   if (stale_age >= max_age) stale_age = max_age;
2482   // Validate cacheSizeBytes.
2483   {
2484     ValidationErrors::ScopedField field(errors, ".cacheSizeBytes");
2485     if (!errors->FieldHasErrors() && cache_size_bytes <= 0) {
2486       errors->AddError("must be greater than 0");
2487     }
2488   }
2489   // Clamp cacheSizeBytes to the max allowed value.
2490   if (cache_size_bytes > kMaxCacheSizeBytes) {
2491     cache_size_bytes = kMaxCacheSizeBytes;
2492   }
2493   // Validate defaultTarget.
2494   {
2495     ValidationErrors::ScopedField field(errors, ".defaultTarget");
2496     if (!errors->FieldHasErrors() &&
2497         json.object().find("defaultTarget") != json.object().end() &&
2498         default_target.empty()) {
2499       errors->AddError("must be non-empty if set");
2500     }
2501   }
2502 }
2503 
JsonLoader(const JsonArgs &)2504 const JsonLoaderInterface* RlsLbConfig::JsonLoader(const JsonArgs&) {
2505   static const auto* loader =
2506       JsonObjectLoader<RlsLbConfig>()
2507           // Note: Some fields require manual processing and are handled in
2508           // JsonPostLoad() instead.
2509           .Field("routeLookupConfig", &RlsLbConfig::route_lookup_config_)
2510           .Field("childPolicyConfigTargetFieldName",
2511                  &RlsLbConfig::child_policy_config_target_field_name_)
2512           .Finish();
2513   return loader;
2514 }
2515 
JsonPostLoad(const Json & json,const JsonArgs &,ValidationErrors * errors)2516 void RlsLbConfig::JsonPostLoad(const Json& json, const JsonArgs&,
2517                                ValidationErrors* errors) {
2518   // Parse routeLookupChannelServiceConfig.
2519   auto it = json.object().find("routeLookupChannelServiceConfig");
2520   if (it != json.object().end()) {
2521     ValidationErrors::ScopedField field(errors,
2522                                         ".routeLookupChannelServiceConfig");
2523     // Don't need to save the result here, just need the errors (if any).
2524     ServiceConfigImpl::Create(ChannelArgs(), it->second, errors);
2525   }
2526   // Validate childPolicyConfigTargetFieldName.
2527   {
2528     ValidationErrors::ScopedField field(errors,
2529                                         ".childPolicyConfigTargetFieldName");
2530     if (!errors->FieldHasErrors() &&
2531         child_policy_config_target_field_name_.empty()) {
2532       errors->AddError("must be non-empty");
2533     }
2534   }
2535   // Parse childPolicy.
2536   {
2537     ValidationErrors::ScopedField field(errors, ".childPolicy");
2538     auto it = json.object().find("childPolicy");
2539     if (it == json.object().end()) {
2540       errors->AddError("field not present");
2541     } else {
2542       // Add target to all child policy configs in the list.
2543       std::string target = route_lookup_config_.default_target.empty()
2544                                ? kFakeTargetFieldValue
2545                                : route_lookup_config_.default_target;
2546       auto child_policy_config = InsertOrUpdateChildPolicyField(
2547           child_policy_config_target_field_name_, target, it->second, errors);
2548       if (child_policy_config.has_value()) {
2549         child_policy_config_ = std::move(*child_policy_config);
2550         // Parse the config.
2551         auto parsed_config =
2552             CoreConfiguration::Get()
2553                 .lb_policy_registry()
2554                 .ParseLoadBalancingConfig(child_policy_config_);
2555         if (!parsed_config.ok()) {
2556           errors->AddError(parsed_config.status().message());
2557         } else {
2558           // Find the chosen config and return it in JSON form.
2559           // We remove all non-selected configs, and in the selected config,
2560           // we leave the target field in place, set to the default value.
2561           // This slightly optimizes what we need to do later when we update
2562           // a child policy for a given target.
2563           for (const Json& config : child_policy_config_.array()) {
2564             if (config.object().begin()->first == (*parsed_config)->name()) {
2565               child_policy_config_ = Json::FromArray({config});
2566               break;
2567             }
2568           }
2569           // If default target is set, set the default child config.
2570           if (!route_lookup_config_.default_target.empty()) {
2571             default_child_policy_parsed_config_ = std::move(*parsed_config);
2572           }
2573         }
2574       }
2575     }
2576   }
2577 }
2578 
2579 class RlsLbFactory final : public LoadBalancingPolicyFactory {
2580  public:
name() const2581   absl::string_view name() const override { return kRls; }
2582 
CreateLoadBalancingPolicy(LoadBalancingPolicy::Args args) const2583   OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
2584       LoadBalancingPolicy::Args args) const override {
2585     return MakeOrphanable<RlsLb>(std::move(args));
2586   }
2587 
2588   absl::StatusOr<RefCountedPtr<LoadBalancingPolicy::Config>>
ParseLoadBalancingConfig(const Json & json) const2589   ParseLoadBalancingConfig(const Json& json) const override {
2590     return LoadFromJson<RefCountedPtr<RlsLbConfig>>(
2591         json, JsonArgs(), "errors validating RLS LB policy config");
2592   }
2593 };
2594 
2595 }  //  namespace
2596 
RegisterRlsLbPolicy(CoreConfiguration::Builder * builder)2597 void RegisterRlsLbPolicy(CoreConfiguration::Builder* builder) {
2598   builder->lb_policy_registry()->RegisterLoadBalancingPolicyFactory(
2599       std::make_unique<RlsLbFactory>());
2600 }
2601 
2602 }  // namespace grpc_core
2603