• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 // Copyright 2020 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 // FIXME: add tests:
18 // - cache eviction via cleanup timer (based on age)
19 // - RLS channel is down; wait_for_ready request is sent and RLS request fails
20 //   and goes into backoff; RLS channel comes back up before backoff timer
21 //   fires; request is processed at that point
22 // - find some deterministic way to exercise adaptive throttler code
23 
24 #include <gmock/gmock.h>
25 #include <grpc/credentials.h>
26 #include <grpcpp/channel.h>
27 #include <grpcpp/create_channel.h>
28 #include <grpcpp/security/credentials.h>
29 #include <grpcpp/server.h>
30 #include <grpcpp/server_builder.h>
31 #include <grpcpp/support/channel_arguments.h>
32 #include <gtest/gtest.h>
33 
34 #include <deque>
35 #include <map>
36 #include <thread>
37 
38 #include "absl/log/check.h"
39 #include "absl/log/log.h"
40 #include "absl/strings/str_format.h"
41 #include "absl/strings/str_join.h"
42 #include "absl/types/optional.h"
43 #include "src/core/client_channel/backup_poller.h"
44 #include "src/core/config/config_vars.h"
45 #include "src/core/lib/address_utils/parse_address.h"
46 #include "src/core/lib/channel/channel_args.h"
47 #include "src/core/lib/iomgr/sockaddr.h"
48 #include "src/core/lib/security/credentials/fake/fake_credentials.h"
49 #include "src/core/load_balancing/rls/rls.h"
50 #include "src/core/resolver/fake/fake_resolver.h"
51 #include "src/core/service_config/service_config_impl.h"
52 #include "src/core/util/env.h"
53 #include "src/core/util/host_port.h"
54 #include "src/core/util/time.h"
55 #include "src/core/util/uri.h"
56 #include "src/cpp/server/secure_server_credentials.h"
57 #include "src/proto/grpc/lookup/v1/rls.grpc.pb.h"
58 #include "src/proto/grpc/lookup/v1/rls.pb.h"
59 #include "src/proto/grpc/testing/echo.grpc.pb.h"
60 #include "test/core/event_engine/event_engine_test_utils.h"
61 #include "test/core/test_util/fake_stats_plugin.h"
62 #include "test/core/test_util/port.h"
63 #include "test/core/test_util/resolve_localhost_ip46.h"
64 #include "test/core/test_util/test_config.h"
65 #include "test/core/test_util/test_lb_policies.h"
66 #include "test/cpp/end2end/counted_service.h"
67 #include "test/cpp/end2end/rls_server.h"
68 #include "test/cpp/end2end/test_service_impl.h"
69 #include "test/cpp/util/credentials.h"
70 #include "test/cpp/util/test_config.h"
71 
72 using ::grpc::lookup::v1::RouteLookupRequest;
73 
74 namespace grpc {
75 namespace testing {
76 namespace {
77 
// Names and values shared by the fixture and the individual tests.
//
// These are declared constexpr so that the pointers themselves are immutable;
// previously they were plain `const char*` globals, which any code in this
// translation unit could have reseated by accident.

// Authority / target name used for the channel and expected by the servers.
constexpr const char* kServerName = "test.google.fr";
// Payload sent in every Echo request.
constexpr const char* kRequestMessage = "Live long and prosper.";
// Value passed to the channel via GRPC_ARG_TEST_ONLY_RLS_INSTANCE_ID.
constexpr const char* kRlsInstanceUuid = "rls_instance_uuid";

// Metadata key/value attached by the test call credentials.
constexpr const char* kCallCredsMdKey = "call_cred_name";
constexpr const char* kCallCredsMdValue = "call_cred_value";

// Keys and values used when building RLS key builders and lookup requests.
constexpr const char* kTestKey = "test_key";
constexpr const char* kTestValue = "test_value";
constexpr const char* kHostKey = "host_key";
constexpr const char* kServiceKey = "service_key";
constexpr const char* kServiceValue = "grpc.testing.EchoTestService";
constexpr const char* kMethodKey = "method_key";
constexpr const char* kMethodValue = "Echo";
constexpr const char* kConstantKey = "constant_key";
constexpr const char* kConstantValue = "constant_value";
94 
95 using BackendService = CountedService<TestServiceImpl>;
96 
97 // Subclass of TestServiceImpl that increments a request counter for
98 // every call to the Echo Rpc.
99 class MyTestServiceImpl : public BackendService {
100  public:
Echo(ServerContext * context,const EchoRequest * request,EchoResponse * response)101   Status Echo(ServerContext* context, const EchoRequest* request,
102               EchoResponse* response) override {
103     // Backend should see call creds.
104     EXPECT_THAT(context->client_metadata(),
105                 ::testing::Contains(
106                     ::testing::Pair(kCallCredsMdKey, kCallCredsMdValue)));
107     IncreaseRequestCount();
108     auto client_metadata = context->client_metadata();
109     auto range = client_metadata.equal_range("x-google-rls-data");
110     {
111       grpc::internal::MutexLock lock(&mu_);
112       for (auto it = range.first; it != range.second; ++it) {
113         rls_header_data_.insert(
114             std::string(it->second.begin(), it->second.length()));
115       }
116     }
117     IncreaseResponseCount();
118     return TestServiceImpl::Echo(context, request, response);
119   }
120 
rls_data()121   std::set<std::string> rls_data() {
122     grpc::internal::MutexLock lock(&mu_);
123     return std::move(rls_header_data_);
124   }
125 
Start()126   void Start() {}
127 
Shutdown()128   void Shutdown() {}
129 
130  private:
131   grpc::internal::Mutex mu_;
132   std::set<std::string> rls_header_data_ ABSL_GUARDED_BY(&mu_);
133 };
134 
135 class FakeResolverResponseGeneratorWrapper {
136  public:
FakeResolverResponseGeneratorWrapper()137   FakeResolverResponseGeneratorWrapper()
138       : response_generator_(grpc_core::MakeRefCounted<
139                             grpc_core::FakeResolverResponseGenerator>()) {}
140 
SetNextResolution(absl::string_view service_config_json)141   void SetNextResolution(absl::string_view service_config_json) {
142     grpc_core::ExecCtx exec_ctx;
143     response_generator_->SetResponseSynchronously(
144         BuildFakeResults(service_config_json));
145   }
146 
Get() const147   grpc_core::FakeResolverResponseGenerator* Get() const {
148     return response_generator_.get();
149   }
150 
151  private:
BuildFakeResults(absl::string_view service_config_json)152   static grpc_core::Resolver::Result BuildFakeResults(
153       absl::string_view service_config_json) {
154     grpc_core::Resolver::Result result;
155     result.service_config =
156         grpc_core::ServiceConfigImpl::Create(result.args, service_config_json);
157     EXPECT_TRUE(result.service_config.ok()) << result.service_config.status();
158     return result;
159   }
160 
161   grpc_core::RefCountedPtr<grpc_core::FakeResolverResponseGenerator>
162       response_generator_;
163 };
164 
165 class RlsEnd2endTest : public ::testing::Test {
166  protected:
SetUpTestSuite()167   static void SetUpTestSuite() {
168     grpc_core::ConfigVars::Overrides overrides;
169     overrides.client_channel_backup_poll_interval_ms = 1;
170     grpc_core::ConfigVars::SetOverrides(overrides);
171     grpc_core::CoreConfiguration::RegisterBuilder(
172         grpc_core::RegisterFixedAddressLoadBalancingPolicy);
173     grpc_init();
174   }
175 
TearDownTestSuite()176   static void TearDownTestSuite() {
177     grpc_shutdown_blocking();
178     WaitForSingleOwner(
179         grpc_event_engine::experimental::GetDefaultEventEngine());
180     grpc_core::CoreConfiguration::Reset();
181   }
182 
SetUp()183   void SetUp() override {
184     rls_server_ = std::make_unique<ServerThread<RlsServiceImpl>>(
185         "rls", [](grpc::ServerContext* ctx) {
186           EXPECT_THAT(ctx->client_metadata(),
187                       ::testing::Contains(
188                           ::testing::Pair(kCallCredsMdKey, kCallCredsMdValue)));
189           EXPECT_EQ(ctx->ExperimentalGetAuthority(), kServerName);
190         });
191     rls_server_->Start();
192     rls_server_target_ = absl::StrFormat("localhost:%d", rls_server_->port_);
193     // Set up client.
194     resolver_response_generator_ =
195         std::make_unique<FakeResolverResponseGeneratorWrapper>();
196     ChannelArguments args;
197     args.SetPointer(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR,
198                     resolver_response_generator_->Get());
199     args.SetString(GRPC_ARG_FAKE_SECURITY_EXPECTED_TARGETS, kServerName);
200     args.SetString(GRPC_ARG_TEST_ONLY_RLS_INSTANCE_ID, kRlsInstanceUuid);
201     grpc_channel_credentials* channel_creds =
202         grpc_fake_transport_security_credentials_create();
203     grpc_call_credentials* call_creds = grpc_md_only_test_credentials_create(
204         kCallCredsMdKey, kCallCredsMdValue);
205     auto creds = std::make_shared<TestCompositeChannelCredentials>(
206         channel_creds, call_creds);
207     call_creds->Unref();
208     channel_creds->Unref();
209     target_uri_ = absl::StrCat("fake:///", kServerName);
210     channel_ = grpc::CreateCustomChannel(target_uri_, creds, args);
211     stub_ = grpc::testing::EchoTestService::NewStub(channel_);
212   }
213 
TearDown()214   void TearDown() override {
215     ShutdownBackends();
216     rls_server_->Shutdown();
217   }
218 
ShutdownBackends()219   void ShutdownBackends() {
220     for (auto& server : backends_) {
221       server->Shutdown();
222     }
223   }
224 
StartBackends(size_t num_servers)225   void StartBackends(size_t num_servers) {
226     backends_.clear();
227     for (size_t i = 0; i < num_servers; ++i) {
228       backends_.push_back(
229           std::make_unique<ServerThread<MyTestServiceImpl>>("backend"));
230       backends_.back()->Start();
231     }
232   }
233 
234   struct RpcOptions {
235     int timeout_ms = 5000;
236     bool wait_for_ready = false;
237     std::vector<std::pair<std::string, std::string>> metadata;
238 
RpcOptionsgrpc::testing::__anon4e73adf20111::RlsEnd2endTest::RpcOptions239     RpcOptions() {}
240 
set_timeout_msgrpc::testing::__anon4e73adf20111::RlsEnd2endTest::RpcOptions241     RpcOptions& set_timeout_ms(int rpc_timeout_ms) {
242       timeout_ms = rpc_timeout_ms;
243       return *this;
244     }
245 
set_wait_for_readygrpc::testing::__anon4e73adf20111::RlsEnd2endTest::RpcOptions246     RpcOptions& set_wait_for_ready(bool rpc_wait_for_ready) {
247       wait_for_ready = rpc_wait_for_ready;
248       return *this;
249     }
250 
set_metadatagrpc::testing::__anon4e73adf20111::RlsEnd2endTest::RpcOptions251     RpcOptions& set_metadata(
252         std::vector<std::pair<std::string, std::string>> rpc_metadata) {
253       metadata = std::move(rpc_metadata);
254       return *this;
255     }
256 
257     // Populates context.
SetupRpcgrpc::testing::__anon4e73adf20111::RlsEnd2endTest::RpcOptions258     void SetupRpc(ClientContext* context) const {
259       for (const auto& item : metadata) {
260         context->AddMetadata(item.first, item.second);
261       }
262       if (timeout_ms != 0) {
263         context->set_deadline(
264             grpc_timeout_milliseconds_to_deadline(timeout_ms));
265       }
266       if (wait_for_ready) context->set_wait_for_ready(true);
267     }
268   };
269 
SendRpc(const RpcOptions & rpc_options=RpcOptions (),EchoResponse * response=nullptr)270   Status SendRpc(const RpcOptions& rpc_options = RpcOptions(),
271                  EchoResponse* response = nullptr) {
272     EchoResponse local_response;
273     if (response == nullptr) response = &local_response;
274     ClientContext context;
275     rpc_options.SetupRpc(&context);
276     EchoRequest request;
277     request.set_message(kRequestMessage);
278     return stub_->Echo(&context, request, response);
279   }
280 
CheckRpcSendOk(const grpc_core::DebugLocation & location,const RpcOptions & rpc_options=RpcOptions ())281   void CheckRpcSendOk(const grpc_core::DebugLocation& location,
282                       const RpcOptions& rpc_options = RpcOptions()) {
283     EchoResponse response;
284     Status status = SendRpc(rpc_options, &response);
285     ASSERT_TRUE(status.ok()) << location.file() << ":" << location.line()
286                              << ": RPC failed: " << status.error_code() << ": "
287                              << status.error_message();
288     EXPECT_EQ(response.message(), kRequestMessage)
289         << location.file() << ":" << location.line();
290   }
291 
CheckRpcSendFailure(const grpc_core::DebugLocation & location,StatusCode expected_code,absl::string_view expected_message,const RpcOptions & rpc_options=RpcOptions ())292   void CheckRpcSendFailure(const grpc_core::DebugLocation& location,
293                            StatusCode expected_code,
294                            absl::string_view expected_message,
295                            const RpcOptions& rpc_options = RpcOptions()) {
296     Status status = SendRpc(rpc_options);
297     ASSERT_FALSE(status.ok()) << location.file() << ":" << location.line();
298     EXPECT_EQ(expected_code, status.error_code())
299         << location.file() << ":" << location.line();
300     EXPECT_EQ(expected_message, status.error_message())
301         << location.file() << ":" << location.line();
302   }
303 
304   class ServiceConfigBuilder {
305    public:
ServiceConfigBuilder(absl::string_view rls_server_target)306     explicit ServiceConfigBuilder(absl::string_view rls_server_target)
307         : rls_server_target_(rls_server_target) {}
308 
set_lookup_service_timeout(grpc_core::Duration timeout)309     ServiceConfigBuilder& set_lookup_service_timeout(
310         grpc_core::Duration timeout) {
311       lookup_service_timeout_ = timeout * grpc_test_slowdown_factor();
312       return *this;
313     }
314 
set_default_target(std::string default_target)315     ServiceConfigBuilder& set_default_target(std::string default_target) {
316       default_target_ = std::move(default_target);
317       return *this;
318     }
319 
set_max_age(grpc_core::Duration max_age)320     ServiceConfigBuilder& set_max_age(grpc_core::Duration max_age) {
321       max_age_ = max_age * grpc_test_slowdown_factor();
322       return *this;
323     }
324 
set_stale_age(grpc_core::Duration stale_age)325     ServiceConfigBuilder& set_stale_age(grpc_core::Duration stale_age) {
326       stale_age_ = stale_age * grpc_test_slowdown_factor();
327       return *this;
328     }
329 
set_cache_size_bytes(int64_t size)330     ServiceConfigBuilder& set_cache_size_bytes(int64_t size) {
331       cache_size_bytes_ = size;
332       return *this;
333     }
334 
AddKeyBuilder(absl::string_view key_builder)335     ServiceConfigBuilder& AddKeyBuilder(absl::string_view key_builder) {
336       key_builders_.push_back(absl::StrCat("{", key_builder, "}"));
337       return *this;
338     }
339 
Build()340     std::string Build() {
341       // First build parts of routeLookupConfig.
342       std::vector<std::string> route_lookup_config_parts;
343       route_lookup_config_parts.push_back(absl::StrFormat(
344           "        \"lookupService\":\"%s\"", rls_server_target_));
345       if (lookup_service_timeout_ > grpc_core::Duration::Zero()) {
346         route_lookup_config_parts.push_back(
347             absl::StrFormat("        \"lookupServiceTimeout\":\"%fs\"",
348                             lookup_service_timeout_.seconds()));
349       }
350       if (!default_target_.empty()) {
351         route_lookup_config_parts.push_back(absl::StrFormat(
352             "        \"defaultTarget\":\"%s\"", default_target_));
353       }
354       route_lookup_config_parts.push_back(absl::StrFormat(
355           "        \"cacheSizeBytes\":%" PRId64, cache_size_bytes_));
356       if (max_age_ > grpc_core::Duration::Zero()) {
357         route_lookup_config_parts.push_back(
358             absl::StrFormat("        \"maxAge\":\"%fs\"", max_age_.seconds()));
359       }
360       if (stale_age_ > grpc_core::Duration::Zero()) {
361         route_lookup_config_parts.push_back(absl::StrFormat(
362             "        \"staleAge\":\"%fs\"", stale_age_.seconds()));
363       }
364       if (!key_builders_.empty()) {
365         route_lookup_config_parts.push_back(
366             absl::StrFormat("        \"grpcKeybuilders\":[%s]",
367                             absl::StrJoin(key_builders_, ",")));
368       }
369       // Now build parts of RLS LB policy config.
370       std::vector<std::string> rls_config_parts;
371       if (!route_lookup_config_parts.empty()) {
372         rls_config_parts.push_back(absl::StrCat(
373             "      \"routeLookupConfig\":{",
374             absl::StrJoin(route_lookup_config_parts, ","), "      }"));
375       }
376       rls_config_parts.push_back(
377           "      \"childPolicy\":[{"
378           "        \"fixed_address_lb\":{}\n"
379           "      }],\n"
380           "      \"childPolicyConfigTargetFieldName\":\"address\"\n");
381       // Put it all together.
382       return absl::StrCat(
383           "{"
384           "  \"loadBalancingConfig\":[{"
385           "    \"rls_experimental\":{",
386           absl::StrJoin(rls_config_parts, ","),
387           "    }"
388           "  }]"
389           "}");
390     }
391 
392    private:
393     absl::string_view rls_server_target_;
394     grpc_core::Duration lookup_service_timeout_;
395     std::string default_target_;
396     grpc_core::Duration max_age_;
397     grpc_core::Duration stale_age_;
398     int64_t cache_size_bytes_ = 10485760;
399     std::vector<std::string> key_builders_;
400   };
401 
MakeServiceConfigBuilder()402   ServiceConfigBuilder MakeServiceConfigBuilder() {
403     return ServiceConfigBuilder(rls_server_target_);
404   }
405 
SetNextResolution(absl::string_view service_config_json)406   void SetNextResolution(absl::string_view service_config_json) {
407     resolver_response_generator_->SetNextResolution(service_config_json);
408   }
409 
410   template <typename T>
411   struct ServerThread {
412     template <typename... Args>
ServerThreadgrpc::testing::__anon4e73adf20111::RlsEnd2endTest::ServerThread413     explicit ServerThread(const grpc::string& type, Args&&... args)
414         : port_(grpc_pick_unused_port_or_die()),
415           type_(type),
416           service_(std::forward<Args>(args)...) {}
417 
Startgrpc::testing::__anon4e73adf20111::RlsEnd2endTest::ServerThread418     void Start() {
419       LOG(INFO) << "starting " << type_ << " server on port " << port_;
420       CHECK(!running_);
421       running_ = true;
422       service_.Start();
423       grpc::internal::Mutex mu;
424       // We need to acquire the lock here in order to prevent the notify_one
425       // by ServerThread::Serve from firing before the wait below is hit.
426       grpc::internal::MutexLock lock(&mu);
427       grpc::internal::CondVar cond;
428       thread_ = std::make_unique<std::thread>(
429           std::bind(&ServerThread::Serve, this, &mu, &cond));
430       cond.Wait(&mu);
431       LOG(INFO) << type_ << " server startup complete";
432     }
433 
Servegrpc::testing::__anon4e73adf20111::RlsEnd2endTest::ServerThread434     void Serve(grpc::internal::Mutex* mu, grpc::internal::CondVar* cond) {
435       // We need to acquire the lock here in order to prevent the notify_one
436       // below from firing before its corresponding wait is executed.
437       grpc::internal::MutexLock lock(mu);
438       ServerBuilder builder;
439       auto creds = std::make_shared<SecureServerCredentials>(
440           grpc_fake_transport_security_server_credentials_create());
441       builder.AddListeningPort(absl::StrCat("localhost:", port_),
442                                std::move(creds));
443       builder.RegisterService(&service_);
444       server_ = builder.BuildAndStart();
445       cond->Signal();
446     }
447 
Shutdowngrpc::testing::__anon4e73adf20111::RlsEnd2endTest::ServerThread448     void Shutdown() {
449       if (!running_) return;
450       LOG(INFO) << type_ << " about to shutdown";
451       service_.Shutdown();
452       server_->Shutdown(grpc_timeout_milliseconds_to_deadline(0));
453       thread_->join();
454       LOG(INFO) << type_ << " shutdown completed";
455       running_ = false;
456     }
457 
458     const int port_;
459     grpc::string type_;
460     T service_;
461     std::unique_ptr<Server> server_;
462     std::unique_ptr<std::thread> thread_;
463     bool running_ = false;
464   };
465 
466   std::vector<std::unique_ptr<ServerThread<MyTestServiceImpl>>> backends_;
467   std::string rls_server_target_;
468   std::unique_ptr<ServerThread<RlsServiceImpl>> rls_server_;
469   std::unique_ptr<FakeResolverResponseGeneratorWrapper>
470       resolver_response_generator_;
471   std::string target_uri_;
472   std::shared_ptr<grpc::Channel> channel_;
473   std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
474 };
475 
// One backend, one key builder mapping request header "key1" to kTestKey.
// A single RPC should result in exactly one RLS request/response and one
// backend request, with no RLS header data reaching the backend.
TEST_F(RlsEnd2endTest, Basic) {
  StartBackends(1);
  const std::string key_builder =
      absl::StrFormat("\"names\":[{"
                      "  \"service\":\"%s\","
                      "  \"method\":\"%s\""
                      "}],"
                      "\"headers\":["
                      "  {"
                      "    \"key\":\"%s\","
                      "    \"names\":["
                      "      \"key1\""
                      "    ]"
                      "  }"
                      "]",
                      kServiceValue, kMethodValue, kTestKey);
  SetNextResolution(
      MakeServiceConfigBuilder().AddKeyBuilder(key_builder).Build());
  const std::string backend_uri = grpc_core::LocalIpUri(backends_[0]->port_);
  rls_server_->service_.SetResponse(BuildRlsRequest({{kTestKey, kTestValue}}),
                                    BuildRlsResponse({backend_uri}));
  const RpcOptions rpc_options =
      RpcOptions().set_metadata({{"key1", kTestValue}});
  CheckRpcSendOk(DEBUG_LOCATION, rpc_options);
  EXPECT_EQ(rls_server_->service_.request_count(), 1);
  EXPECT_EQ(rls_server_->service_.response_count(), 1);
  EXPECT_EQ(backends_[0]->service_.request_count(), 1);
  // No RLS header seen by the backend, since the RLS response didn't set any.
  EXPECT_THAT(backends_[0]->service_.rls_data(), ::testing::ElementsAre());
}
505 
// Sends the same header twice with different values and expects the RLS
// request key to hold both values joined by a comma.
TEST_F(RlsEnd2endTest, DuplicateHeadersAreMerged) {
  const char* kTestValue2 = "test_value_2";
  StartBackends(1);
  const std::string key_builder =
      absl::StrFormat("\"names\":[{"
                      "  \"service\":\"%s\","
                      "  \"method\":\"%s\""
                      "}],"
                      "\"headers\":["
                      "  {"
                      "    \"key\":\"%s\","
                      "    \"names\":["
                      "      \"key1\""
                      "    ]"
                      "  }"
                      "]",
                      kServiceValue, kMethodValue, kTestKey);
  SetNextResolution(
      MakeServiceConfigBuilder().AddKeyBuilder(key_builder).Build());
  const std::string merged_value = absl::StrCat(kTestValue, ",", kTestValue2);
  const std::string backend_uri = grpc_core::LocalIpUri(backends_[0]->port_);
  rls_server_->service_.SetResponse(
      BuildRlsRequest({{kTestKey, merged_value}}),
      BuildRlsResponse({backend_uri}));
  // Same header present twice in the request.  Values should be merged.
  const RpcOptions rpc_options =
      RpcOptions().set_metadata({{"key1", kTestValue}, {"key1", kTestValue2}});
  CheckRpcSendOk(DEBUG_LOCATION, rpc_options);
  EXPECT_EQ(rls_server_->service_.request_count(), 1);
  EXPECT_EQ(rls_server_->service_.response_count(), 1);
  EXPECT_EQ(backends_[0]->service_.request_count(), 1);
}
536 
// The key builder lists two candidate header names ("key1", "key2") for
// kTestKey; the request only carries "key2", whose value should be used.
TEST_F(RlsEnd2endTest, SecondHeaderUsed) {
  StartBackends(1);
  const std::string key_builder =
      absl::StrFormat("\"names\":[{"
                      "  \"service\":\"%s\","
                      "  \"method\":\"%s\""
                      "}],"
                      "\"headers\":["
                      "  {"
                      "    \"key\":\"%s\","
                      "    \"names\":["
                      "      \"key1\", \"key2\""
                      "    ]"
                      "  }"
                      "]",
                      kServiceValue, kMethodValue, kTestKey);
  SetNextResolution(
      MakeServiceConfigBuilder().AddKeyBuilder(key_builder).Build());
  const std::string backend_uri = grpc_core::LocalIpUri(backends_[0]->port_);
  rls_server_->service_.SetResponse(BuildRlsRequest({{kTestKey, kTestValue}}),
                                    BuildRlsResponse({backend_uri}));
  const RpcOptions rpc_options =
      RpcOptions().set_metadata({{"key2", kTestValue}});
  CheckRpcSendOk(DEBUG_LOCATION, rpc_options);
  EXPECT_EQ(rls_server_->service_.request_count(), 1);
  EXPECT_EQ(rls_server_->service_.response_count(), 1);
  EXPECT_EQ(backends_[0]->service_.request_count(), 1);
}
564 
// Two header entries in the key builder ("key1" -> kTestKey and
// "key2" -> kTestKey2); the RLS request is expected to contain both keys.
TEST_F(RlsEnd2endTest, MultipleHeaderKeys) {
  const char* kTestKey2 = "test_key_2";
  const char* kTestValue2 = "test_value_2";
  StartBackends(1);
  const std::string key_builder =
      absl::StrFormat("\"names\":[{"
                      "  \"service\":\"%s\","
                      "  \"method\":\"%s\""
                      "}],"
                      "\"headers\":["
                      "  {"
                      "    \"key\":\"%s\","
                      "    \"names\":["
                      "      \"key1\""
                      "    ]"
                      "  },"
                      "  {"
                      "    \"key\":\"%s\","
                      "    \"names\":["
                      "      \"key2\""
                      "    ]"
                      "  }"
                      "]",
                      kServiceValue, kMethodValue, kTestKey, kTestKey2);
  SetNextResolution(
      MakeServiceConfigBuilder().AddKeyBuilder(key_builder).Build());
  const std::string backend_uri = grpc_core::LocalIpUri(backends_[0]->port_);
  rls_server_->service_.SetResponse(
      BuildRlsRequest({
          {kTestKey, kTestValue},
          {kTestKey2, kTestValue2},
      }),
      BuildRlsResponse({backend_uri}));
  const RpcOptions rpc_options =
      RpcOptions().set_metadata({{"key1", kTestValue}, {"key2", kTestValue2}});
  CheckRpcSendOk(DEBUG_LOCATION, rpc_options);
  EXPECT_EQ(rls_server_->service_.request_count(), 1);
  EXPECT_EQ(rls_server_->service_.response_count(), 1);
  EXPECT_EQ(backends_[0]->service_.request_count(), 1);
  // No RLS header seen by the backend, since the RLS response didn't set any.
  EXPECT_THAT(backends_[0]->service_.rls_data(), ::testing::ElementsAre());
}
606 
// The key builder looks for header "key1" but the RPC carries no metadata,
// so the RLS request is expected to have an empty key map.
TEST_F(RlsEnd2endTest, NoHeaderMatch) {
  StartBackends(1);
  const std::string key_builder =
      absl::StrFormat("\"names\":[{"
                      "  \"service\":\"%s\","
                      "  \"method\":\"%s\""
                      "}],"
                      "\"headers\":["
                      "  {"
                      "    \"key\":\"%s\","
                      "    \"names\":["
                      "      \"key1\""
                      "    ]"
                      "  }"
                      "]",
                      kServiceValue, kMethodValue, kTestKey);
  SetNextResolution(
      MakeServiceConfigBuilder().AddKeyBuilder(key_builder).Build());
  const std::string backend_uri = grpc_core::LocalIpUri(backends_[0]->port_);
  rls_server_->service_.SetResponse(BuildRlsRequest({}),
                                    BuildRlsResponse({backend_uri}));
  // Request does not have header "key1", so kTestKey will not be added.
  CheckRpcSendOk(DEBUG_LOCATION);
  EXPECT_EQ(rls_server_->service_.request_count(), 1);
  EXPECT_EQ(rls_server_->service_.response_count(), 1);
  EXPECT_EQ(backends_[0]->service_.request_count(), 1);
}
634 
// The key builder names only the service (no "method" field); the Echo RPC
// is still expected to produce an RLS request keyed by kTestKey.
TEST_F(RlsEnd2endTest, WildcardMethod) {
  StartBackends(1);
  const std::string key_builder =
      absl::StrFormat("\"names\":[{"
                      "  \"service\":\"%s\""
                      "}],"
                      "\"headers\":["
                      "  {"
                      "    \"key\":\"%s\","
                      "    \"names\":["
                      "      \"key1\""
                      "    ]"
                      "  }"
                      "]",
                      kServiceValue, kTestKey);
  SetNextResolution(
      MakeServiceConfigBuilder().AddKeyBuilder(key_builder).Build());
  const std::string backend_uri = grpc_core::LocalIpUri(backends_[0]->port_);
  rls_server_->service_.SetResponse(BuildRlsRequest({{kTestKey, kTestValue}}),
                                    BuildRlsResponse({backend_uri}));
  const RpcOptions rpc_options =
      RpcOptions().set_metadata({{"key1", kTestValue}});
  CheckRpcSendOk(DEBUG_LOCATION, rpc_options);
  EXPECT_EQ(rls_server_->service_.request_count(), 1);
  EXPECT_EQ(rls_server_->service_.response_count(), 1);
  EXPECT_EQ(backends_[0]->service_.request_count(), 1);
}
660 
// The only key builder is registered for "some_other_method", so the Echo
// RPC is expected to produce an RLS request with an empty key map.
TEST_F(RlsEnd2endTest, NoKeyBuilderForMethod) {
  StartBackends(1);
  const std::string key_builder =
      absl::StrFormat("\"names\":[{"
                      "  \"service\":\"%s\","
                      "  \"method\":\"some_other_method\""
                      "}],"
                      "\"headers\":["
                      "  {"
                      "    \"key\":\"%s\","
                      "    \"names\":["
                      "      \"key1\""
                      "    ]"
                      "  }"
                      "]",
                      kServiceValue, kTestKey);
  SetNextResolution(
      MakeServiceConfigBuilder().AddKeyBuilder(key_builder).Build());
  const std::string backend_uri = grpc_core::LocalIpUri(backends_[0]->port_);
  rls_server_->service_.SetResponse(BuildRlsRequest({}),
                                    BuildRlsResponse({backend_uri}));
  CheckRpcSendOk(DEBUG_LOCATION);
  EXPECT_EQ(rls_server_->service_.request_count(), 1);
  EXPECT_EQ(rls_server_->service_.response_count(), 1);
  EXPECT_EQ(backends_[0]->service_.request_count(), 1);
}
687 
// The RLS response carries header data; the backend is expected to record
// exactly that value from the "x-google-rls-data" request header.
TEST_F(RlsEnd2endTest, HeaderData) {
  const char* kHeaderData = "header_data";
  StartBackends(1);
  const std::string key_builder =
      absl::StrFormat("\"names\":[{"
                      "  \"service\":\"%s\","
                      "  \"method\":\"%s\""
                      "}],"
                      "\"headers\":["
                      "  {"
                      "    \"key\":\"%s\","
                      "    \"names\":["
                      "      \"key1\""
                      "    ]"
                      "  }"
                      "]",
                      kServiceValue, kMethodValue, kTestKey);
  SetNextResolution(
      MakeServiceConfigBuilder().AddKeyBuilder(key_builder).Build());
  const std::string backend_uri = grpc_core::LocalIpUri(backends_[0]->port_);
  rls_server_->service_.SetResponse(
      BuildRlsRequest({{kTestKey, kTestValue}}),
      BuildRlsResponse({backend_uri}, kHeaderData));
  const RpcOptions rpc_options =
      RpcOptions().set_metadata({{"key1", kTestValue}});
  CheckRpcSendOk(DEBUG_LOCATION, rpc_options);
  EXPECT_EQ(rls_server_->service_.request_count(), 1);
  EXPECT_EQ(rls_server_->service_.response_count(), 1);
  EXPECT_EQ(backends_[0]->service_.request_count(), 1);
  EXPECT_THAT(backends_[0]->service_.rls_data(),
              ::testing::ElementsAre(kHeaderData));
}
719 
// Exercises a key builder that, besides the header-derived key, declares
// "extraKeys" (host, service, method) and one "constantKeys" entry.  The
// RLS server is primed for a request containing all five key/value pairs,
// and the RPC supplies only the "key1" header.  Expects one RLS request,
// one RLS response, and one backend request.
TEST_F(RlsEnd2endTest, ExtraKeysAndConstantKeys) {
  StartBackends(1);
  SetNextResolution(
      MakeServiceConfigBuilder()
          .AddKeyBuilder(absl::StrFormat("\"names\":[{"
                                         "  \"service\":\"%s\","
                                         "  \"method\":\"%s\""
                                         "}],"
                                         "\"headers\":["
                                         "  {"
                                         "    \"key\":\"%s\","
                                         "    \"names\":["
                                         "      \"key1\",\"key2\",\"key3\""
                                         "    ]"
                                         "  }"
                                         "],"
                                         "\"extraKeys\":{"
                                         "  \"host\":\"%s\","
                                         "  \"service\":\"%s\","
                                         "  \"method\":\"%s\""
                                         "},"
                                         "\"constantKeys\":{"
                                         "  \"%s\":\"%s\""
                                         "}",
                                         kServiceValue, kMethodValue, kTestKey,
                                         kHostKey, kServiceKey, kMethodKey,
                                         kConstantKey, kConstantValue))
          .Build());
  // Expected request: the header key, the three extra keys, and the
  // constant key.
  rls_server_->service_.SetResponse(
      BuildRlsRequest({
          {kTestKey, kTestValue},
          {kHostKey, kServerName},
          {kServiceKey, kServiceValue},
          {kMethodKey, kMethodValue},
          {kConstantKey, kConstantValue},
      }),
      BuildRlsResponse({grpc_core::LocalIpUri(backends_[0]->port_)}));
  CheckRpcSendOk(DEBUG_LOCATION,
                 RpcOptions().set_metadata({{"key1", kTestValue}}));
  EXPECT_EQ(rls_server_->service_.request_count(), 1);
  EXPECT_EQ(rls_server_->service_.response_count(), 1);
  EXPECT_EQ(backends_[0]->service_.request_count(), 1);
}
763 
// Sends two RPCs with different "key1" values (kTestValue, then
// kTestValue2).  The RLS server is primed to map both keys to the same
// single backend.  Expects each RPC to produce its own RLS request and
// response, and both RPCs to reach that backend.
TEST_F(RlsEnd2endTest, TwoCacheEntriesWithSameTarget) {
  const char* kTestValue2 = "test_value2";
  StartBackends(1);
  SetNextResolution(
      MakeServiceConfigBuilder()
          .AddKeyBuilder(absl::StrFormat("\"names\":[{"
                                         "  \"service\":\"%s\","
                                         "  \"method\":\"%s\""
                                         "}],"
                                         "\"headers\":["
                                         "  {"
                                         "    \"key\":\"%s\","
                                         "    \"names\":["
                                         "      \"key1\""
                                         "    ]"
                                         "  }"
                                         "]",
                                         kServiceValue, kMethodValue, kTestKey))
          .Build());
  // Both keys resolve to the same backend target.
  rls_server_->service_.SetResponse(
      BuildRlsRequest({{kTestKey, kTestValue}}),
      BuildRlsResponse({grpc_core::LocalIpUri(backends_[0]->port_)}));
  rls_server_->service_.SetResponse(
      BuildRlsRequest({{kTestKey, kTestValue2}}),
      BuildRlsResponse({grpc_core::LocalIpUri(backends_[0]->port_)}));
  // First key.
  CheckRpcSendOk(DEBUG_LOCATION,
                 RpcOptions().set_metadata({{"key1", kTestValue}}));
  EXPECT_EQ(rls_server_->service_.request_count(), 1);
  EXPECT_EQ(rls_server_->service_.response_count(), 1);
  EXPECT_EQ(backends_[0]->service_.request_count(), 1);
  // Second key: a new RLS request is expected, and the same backend is hit.
  CheckRpcSendOk(DEBUG_LOCATION,
                 RpcOptions().set_metadata({{"key1", kTestValue2}}));
  EXPECT_EQ(rls_server_->service_.request_count(), 2);
  EXPECT_EQ(rls_server_->service_.response_count(), 2);
  EXPECT_EQ(backends_[0]->service_.request_count(), 2);
}
800 
TEST_F(RlsEnd2endTest,FailedRlsRequestWithoutDefaultTarget)801 TEST_F(RlsEnd2endTest, FailedRlsRequestWithoutDefaultTarget) {
802   StartBackends(1);
803   SetNextResolution(
804       MakeServiceConfigBuilder()
805           .AddKeyBuilder(absl::StrFormat("\"names\":[{"
806                                          "  \"service\":\"%s\","
807                                          "  \"method\":\"%s\""
808                                          "}],"
809                                          "\"headers\":["
810                                          "  {"
811                                          "    \"key\":\"%s\","
812                                          "    \"names\":["
813                                          "      \"key1\""
814                                          "    ]"
815                                          "  }"
816                                          "]",
817                                          kServiceValue, kMethodValue, kTestKey))
818           .Build());
819   // The test below has one RLS RPC fail and then a subsequent one that
820   // should succeed.  However, once the first RPC fails, the adaptive
821   // throttling code will throttle the second RPC with about 11% probability,
822   // which would cause the test to be flaky.  To avoid that, we seed the
823   // throttling state by sending two successful RPCs before we start the
824   // real test, which ensures that the second RPC of the real test will
825   // not be throttled (with 3 successes and 1 failure, the throttling
826   // probability will be negative, so the subsequent request will never be
827   // throttled).
828   const char* kTestValue2 = "test_value_2";
829   const char* kTestValue3 = "test_value_3";
830   rls_server_->service_.SetResponse(
831       BuildRlsRequest({{kTestKey, kTestValue2}}),
832       BuildRlsResponse({grpc_core::LocalIpUri(backends_[0]->port_)}));
833   rls_server_->service_.SetResponse(
834       BuildRlsRequest({{kTestKey, kTestValue3}}),
835       BuildRlsResponse({grpc_core::LocalIpUri(backends_[0]->port_)}));
836   CheckRpcSendOk(DEBUG_LOCATION,
837                  RpcOptions().set_metadata({{"key1", kTestValue2}}));
838   CheckRpcSendOk(DEBUG_LOCATION,
839                  RpcOptions().set_metadata({{"key1", kTestValue3}}));
840   // Now start the real test.
841   // Send an RPC before we give the RLS server a response.
842   // The RLS request will fail, and thus so will the data plane RPC.
843   CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::UNAVAILABLE,
844                       "RLS request failed: INTERNAL: no response entry",
845                       RpcOptions().set_metadata({{"key1", kTestValue}}));
846   EXPECT_THAT(
847       rls_server_->service_.GetUnmatchedRequests(),
848       ::testing::ElementsAre(
849           // TODO(roth): Change this to use ::testing::ProtoEquals()
850           // once that becomes available in OSS.
851           ::testing::Property(
852               &RouteLookupRequest::DebugString,
853               BuildRlsRequest({{kTestKey, kTestValue}}).DebugString())));
854   // Now give the RLS server the right response.
855   rls_server_->service_.SetResponse(
856       BuildRlsRequest({{kTestKey, kTestValue}}),
857       BuildRlsResponse({grpc_core::LocalIpUri(backends_[0]->port_)}));
858   // Sleep long enough for backoff to elapse, then try another RPC.
859   gpr_sleep_until(grpc_timeout_seconds_to_deadline(3));
860   CheckRpcSendOk(DEBUG_LOCATION,
861                  RpcOptions().set_metadata({{"key1", kTestValue}}));
862   EXPECT_EQ(rls_server_->service_.request_count(), 4);
863   EXPECT_EQ(rls_server_->service_.response_count(), 3);
864   EXPECT_EQ(backends_[0]->service_.request_count(), 3);
865 }
866 
// With the single backend configured as the default target and no RLS
// response primed, the RPC is still expected to succeed.  Expects the RLS
// server to have seen one unmatched request for {kTestKey: kTestValue},
// sent no responses, and the backend to have received one request.
TEST_F(RlsEnd2endTest, FailedRlsRequestWithDefaultTarget) {
  StartBackends(1);
  SetNextResolution(
      MakeServiceConfigBuilder()
          .AddKeyBuilder(absl::StrFormat("\"names\":[{"
                                         "  \"service\":\"%s\","
                                         "  \"method\":\"%s\""
                                         "}],"
                                         "\"headers\":["
                                         "  {"
                                         "    \"key\":\"%s\","
                                         "    \"names\":["
                                         "      \"key1\""
                                         "    ]"
                                         "  }"
                                         "]",
                                         kServiceValue, kMethodValue, kTestKey))
          .set_default_target(grpc_core::LocalIpUri(backends_[0]->port_))
          .Build());
  // Don't give the RLS server a response, so the RLS request will fail.
  // The data plane RPC should be sent to the default target.
  CheckRpcSendOk(DEBUG_LOCATION,
                 RpcOptions().set_metadata({{"key1", kTestValue}}));
  EXPECT_THAT(
      rls_server_->service_.GetUnmatchedRequests(),
      ::testing::ElementsAre(
          // TODO(roth): Change this to use ::testing::ProtoEquals()
          // once that becomes available in OSS.
          ::testing::Property(
              &RouteLookupRequest::DebugString,
              BuildRlsRequest({{kTestKey, kTestValue}}).DebugString())));
  EXPECT_EQ(rls_server_->service_.request_count(), 1);
  EXPECT_EQ(rls_server_->service_.response_count(), 0);
  EXPECT_EQ(backends_[0]->service_.request_count(), 1);
}
902 
// Configures a 2-second lookup service timeout and primes the RLS server
// to answer after a 3-second delay.  Backend 1 is the default target, and
// the delayed RLS response would point at backend 0.  Expects the RPC to
// succeed on backend 1, with backend 0 never contacted.
TEST_F(RlsEnd2endTest, RlsRequestTimeout) {
  StartBackends(2);
  SetNextResolution(
      MakeServiceConfigBuilder()
          .AddKeyBuilder(absl::StrFormat("\"names\":[{"
                                         "  \"service\":\"%s\","
                                         "  \"method\":\"%s\""
                                         "}],"
                                         "\"headers\":["
                                         "  {"
                                         "    \"key\":\"%s\","
                                         "    \"names\":["
                                         "      \"key1\""
                                         "    ]"
                                         "  }"
                                         "]",
                                         kServiceValue, kMethodValue, kTestKey))
          .set_default_target(grpc_core::LocalIpUri(backends_[1]->port_))
          .set_lookup_service_timeout(grpc_core::Duration::Seconds(2))
          .Build());
  // RLS server will send a response, but it takes longer than the
  // timeout set in the LB policy config.
  rls_server_->service_.SetResponse(
      BuildRlsRequest({{kTestKey, kTestValue}}),
      BuildRlsResponse({grpc_core::LocalIpUri(backends_[0]->port_)}),
      /*response_delay=*/grpc_core::Duration::Seconds(3));
  // The data plane RPC should be sent to the default target.
  CheckRpcSendOk(DEBUG_LOCATION,
                 RpcOptions().set_metadata({{"key1", kTestValue}}));
  EXPECT_EQ(rls_server_->service_.request_count(), 1);
  EXPECT_EQ(backends_[0]->service_.request_count(), 0);
  EXPECT_EQ(backends_[1]->service_.request_count(), 1);
}
936 
// Checks that a service config update changing the default target takes
// effect.  The first RPC has no RLS response primed and is expected to
// land on backend 0, the initial default target.  After the default target
// is switched to backend 1, a second RPC is expected to land on backend 1
// without the RLS server seeing another request.
TEST_F(RlsEnd2endTest, UpdateConfig) {
  StartBackends(2);
  // Keep the builder so the same config can be modified and re-sent below.
  auto service_config_builder =
      MakeServiceConfigBuilder()
          .AddKeyBuilder(absl::StrFormat("\"names\":[{"
                                         "  \"service\":\"%s\","
                                         "  \"method\":\"%s\""
                                         "}],"
                                         "\"headers\":["
                                         "  {"
                                         "    \"key\":\"%s\","
                                         "    \"names\":["
                                         "      \"key1\""
                                         "    ]"
                                         "  }"
                                         "]",
                                         kServiceValue, kMethodValue, kTestKey))
          .set_default_target(grpc_core::LocalIpUri(backends_[0]->port_));
  SetNextResolution(service_config_builder.Build());
  // Don't give the RLS server a response, so the RLS request will fail.
  // The data plane RPC should be sent to the default target.
  CheckRpcSendOk(DEBUG_LOCATION,
                 RpcOptions().set_metadata({{"key1", kTestValue}}));
  EXPECT_THAT(
      rls_server_->service_.GetUnmatchedRequests(),
      ::testing::ElementsAre(
          // TODO(roth): Change this to use ::testing::ProtoEquals()
          // once that becomes available in OSS.
          ::testing::Property(
              &RouteLookupRequest::DebugString,
              BuildRlsRequest({{kTestKey, kTestValue}}).DebugString())));
  EXPECT_EQ(rls_server_->service_.request_count(), 1);
  EXPECT_EQ(rls_server_->service_.response_count(), 0);
  EXPECT_EQ(backends_[0]->service_.request_count(), 1);
  EXPECT_EQ(backends_[1]->service_.request_count(), 0);
  // Now update the config to point to a new default target.
  service_config_builder.set_default_target(
      grpc_core::LocalIpUri(backends_[1]->port_));
  SetNextResolution(service_config_builder.Build());
  // Send another RPC, which should go to the new default target.
  // The RLS server will *not* see another request, because the cache
  // entry is still in backoff.
  CheckRpcSendOk(DEBUG_LOCATION,
                 RpcOptions().set_metadata({{"key1", kTestValue}}));
  EXPECT_EQ(rls_server_->service_.request_count(), 1);
  EXPECT_EQ(rls_server_->service_.response_count(), 0);
  EXPECT_EQ(backends_[0]->service_.request_count(), 1);
  EXPECT_EQ(backends_[1]->service_.request_count(), 1);
}
986 
// Sends two RPCs with the same "key1" value.  Expects the RLS server to
// see a single request and send a single response, while the backend
// receives both RPCs.
TEST_F(RlsEnd2endTest, CachedResponse) {
  StartBackends(1);
  SetNextResolution(
      MakeServiceConfigBuilder()
          .AddKeyBuilder(absl::StrFormat("\"names\":[{"
                                         "  \"service\":\"%s\","
                                         "  \"method\":\"%s\""
                                         "}],"
                                         "\"headers\":["
                                         "  {"
                                         "    \"key\":\"%s\","
                                         "    \"names\":["
                                         "      \"key1\""
                                         "    ]"
                                         "  }"
                                         "]",
                                         kServiceValue, kMethodValue, kTestKey))
          .Build());
  rls_server_->service_.SetResponse(
      BuildRlsRequest({{kTestKey, kTestValue}}),
      BuildRlsResponse({grpc_core::LocalIpUri(backends_[0]->port_)}));
  // Send two RPCs.
  CheckRpcSendOk(DEBUG_LOCATION,
                 RpcOptions().set_metadata({{"key1", kTestValue}}));
  CheckRpcSendOk(DEBUG_LOCATION,
                 RpcOptions().set_metadata({{"key1", kTestValue}}));
  // The RLS server should have seen only one request.
  EXPECT_EQ(rls_server_->service_.request_count(), 1);
  EXPECT_EQ(rls_server_->service_.response_count(), 1);
  EXPECT_EQ(backends_[0]->service_.request_count(), 2);
}
1018 
// Configures max_age = 5s and stale_age = 1s.  After a first successful
// RPC, the RLS server's response is re-registered under a request built
// with REASON_STALE, and the test sleeps 2 seconds.  A second RPC is then
// expected to succeed on the backend.  After a further 2-second sleep, the
// RLS server is expected to have seen two requests and sent two responses.
TEST_F(RlsEnd2endTest, StaleCacheEntry) {
  StartBackends(1);
  SetNextResolution(
      MakeServiceConfigBuilder()
          .AddKeyBuilder(absl::StrFormat("\"names\":[{"
                                         "  \"service\":\"%s\","
                                         "  \"method\":\"%s\""
                                         "}],"
                                         "\"headers\":["
                                         "  {"
                                         "    \"key\":\"%s\","
                                         "    \"names\":["
                                         "      \"key1\""
                                         "    ]"
                                         "  }"
                                         "]",
                                         kServiceValue, kMethodValue, kTestKey))
          .set_max_age(grpc_core::Duration::Seconds(5))
          .set_stale_age(grpc_core::Duration::Seconds(1))
          .Build());
  rls_server_->service_.SetResponse(
      BuildRlsRequest({{kTestKey, kTestValue}}),
      BuildRlsResponse({grpc_core::LocalIpUri(backends_[0]->port_)}));
  // Send one RPC.  RLS server gets a request, and RPC goes to backend.
  CheckRpcSendOk(DEBUG_LOCATION,
                 RpcOptions().set_metadata({{"key1", kTestValue}}));
  EXPECT_EQ(rls_server_->service_.request_count(), 1);
  EXPECT_EQ(rls_server_->service_.response_count(), 1);
  EXPECT_EQ(backends_[0]->service_.request_count(), 1);
  // Update RLS server to expect stale request.
  rls_server_->service_.RemoveResponse(
      BuildRlsRequest({{kTestKey, kTestValue}}));
  rls_server_->service_.SetResponse(
      BuildRlsRequest({{kTestKey, kTestValue}},
                      RouteLookupRequest::REASON_STALE),
      BuildRlsResponse({grpc_core::LocalIpUri(backends_[0]->port_)}));
  // Wait longer than stale age.
  gpr_sleep_until(grpc_timeout_seconds_to_deadline(2));
  // Send another RPC.  This should use the stale value but should
  // dispatch a second RLS request.
  CheckRpcSendOk(DEBUG_LOCATION,
                 RpcOptions().set_metadata({{"key1", kTestValue}}));
  EXPECT_EQ(backends_[0]->service_.request_count(), 2);
  // Wait for RLS server to receive the second request.
  gpr_sleep_until(grpc_timeout_seconds_to_deadline(2));
  EXPECT_EQ(rls_server_->service_.request_count(), 2);
  EXPECT_EQ(rls_server_->service_.response_count(), 2);
}
1067 
// Same flow as the stale-entry case (max_age = 5s, stale_age = 1s), but
// the RLS responses carry kHeaderData.  The response registered for the
// second lookup is keyed on a request built with REASON_STALE and
// kHeaderData.  Expects the second RPC to succeed on the backend and the
// RLS server to end up with two requests and two responses.
TEST_F(RlsEnd2endTest, StaleCacheEntryWithHeaderData) {
  const char* kHeaderData = "header_data";
  StartBackends(1);
  SetNextResolution(
      MakeServiceConfigBuilder()
          .AddKeyBuilder(absl::StrFormat("\"names\":[{"
                                         "  \"service\":\"%s\","
                                         "  \"method\":\"%s\""
                                         "}],"
                                         "\"headers\":["
                                         "  {"
                                         "    \"key\":\"%s\","
                                         "    \"names\":["
                                         "      \"key1\""
                                         "    ]"
                                         "  }"
                                         "]",
                                         kServiceValue, kMethodValue, kTestKey))
          .set_max_age(grpc_core::Duration::Seconds(5))
          .set_stale_age(grpc_core::Duration::Seconds(1))
          .Build());
  rls_server_->service_.SetResponse(
      BuildRlsRequest({{kTestKey, kTestValue}}),
      BuildRlsResponse({grpc_core::LocalIpUri(backends_[0]->port_)},
                       kHeaderData));
  // Send one RPC.  RLS server gets a request, and RPC goes to backend.
  CheckRpcSendOk(DEBUG_LOCATION,
                 RpcOptions().set_metadata({{"key1", kTestValue}}));
  EXPECT_EQ(rls_server_->service_.request_count(), 1);
  EXPECT_EQ(rls_server_->service_.response_count(), 1);
  EXPECT_EQ(backends_[0]->service_.request_count(), 1);
  // Update RLS server to expect stale request.
  rls_server_->service_.RemoveResponse(
      BuildRlsRequest({{kTestKey, kTestValue}}));
  rls_server_->service_.SetResponse(
      BuildRlsRequest({{kTestKey, kTestValue}},
                      RouteLookupRequest::REASON_STALE, kHeaderData),
      BuildRlsResponse({grpc_core::LocalIpUri(backends_[0]->port_)},
                       kHeaderData));
  // Wait longer than stale age.
  gpr_sleep_until(grpc_timeout_seconds_to_deadline(2));
  // Send another RPC.  This should use the stale value but should
  // dispatch a second RLS request.
  CheckRpcSendOk(DEBUG_LOCATION,
                 RpcOptions().set_metadata({{"key1", kTestValue}}));
  EXPECT_EQ(backends_[0]->service_.request_count(), 2);
  // Wait for RLS server to receive the second request.
  gpr_sleep_until(grpc_timeout_seconds_to_deadline(2));
  EXPECT_EQ(rls_server_->service_.request_count(), 2);
  EXPECT_EQ(rls_server_->service_.response_count(), 2);
}
1119 
// Configures max_age = 1s and a 1-second lookup service timeout.  After a
// first successful RPC, the RLS server's response is removed and the test
// sleeps 2 seconds.  The next RPC is expected to fail with UNAVAILABLE,
// with the RLS server having seen a second request but sent no further
// response, and the backend having received no further RPC.
TEST_F(RlsEnd2endTest, ExpiredCacheEntry) {
  StartBackends(1);
  SetNextResolution(
      MakeServiceConfigBuilder()
          .AddKeyBuilder(absl::StrFormat("\"names\":[{"
                                         "  \"service\":\"%s\","
                                         "  \"method\":\"%s\""
                                         "}],"
                                         "\"headers\":["
                                         "  {"
                                         "    \"key\":\"%s\","
                                         "    \"names\":["
                                         "      \"key1\""
                                         "    ]"
                                         "  }"
                                         "]",
                                         kServiceValue, kMethodValue, kTestKey))
          .set_max_age(grpc_core::Duration::Seconds(1))
          .set_lookup_service_timeout(grpc_core::Duration::Seconds(1))
          .Build());
  rls_server_->service_.SetResponse(
      BuildRlsRequest({{kTestKey, kTestValue}}),
      BuildRlsResponse({grpc_core::LocalIpUri(backends_[0]->port_)}));
  // Send one RPC.  RLS server gets a request, and RPC goes to backend.
  CheckRpcSendOk(DEBUG_LOCATION,
                 RpcOptions().set_metadata({{"key1", kTestValue}}));
  EXPECT_EQ(rls_server_->service_.request_count(), 1);
  EXPECT_EQ(rls_server_->service_.response_count(), 1);
  EXPECT_EQ(backends_[0]->service_.request_count(), 1);
  // Remove response from RLS server so that the next RLS request fails.
  rls_server_->service_.RemoveResponse(
      BuildRlsRequest({{kTestKey, kTestValue}}));
  // Wait for cache to be expired.
  gpr_sleep_until(grpc_timeout_seconds_to_deadline(2));
  // Send another RPC.  This should trigger a second RLS request, but
  // that fails, so the RPC fails.
  CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::UNAVAILABLE,
                      "RLS request failed: INTERNAL: no response entry",
                      RpcOptions().set_metadata({{"key1", kTestValue}}));
  EXPECT_EQ(rls_server_->service_.request_count(), 2);
  EXPECT_EQ(rls_server_->service_.response_count(), 1);
  EXPECT_EQ(backends_[0]->service_.request_count(), 1);
}
1163 
TEST_F(RlsEnd2endTest,CacheSizeLimit)1164 TEST_F(RlsEnd2endTest, CacheSizeLimit) {
1165   const char* kTestValue2 = "test_value_2";
1166   StartBackends(2);
1167   SetNextResolution(
1168       MakeServiceConfigBuilder()
1169           .AddKeyBuilder(absl::StrFormat("\"names\":[{"
1170                                          "  \"service\":\"%s\","
1171                                          "  \"method\":\"%s\""
1172                                          "}],"
1173                                          "\"headers\":["
1174                                          "  {"
1175                                          "    \"key\":\"%s\","
1176                                          "    \"names\":["
1177                                          "      \"key1\""
1178                                          "    ]"
1179                                          "  }"
1180                                          "]",
1181                                          kServiceValue, kMethodValue,
1182                                          kTestKey))
1183           .set_cache_size_bytes(1)  // Not even big enough for one entry.
1184           .Build());
1185   // Set RLS responses for both kTestValue and kTestValue2.
1186   rls_server_->service_.SetResponse(
1187       BuildRlsRequest({{kTestKey, kTestValue}}),
1188       BuildRlsResponse({grpc_core::LocalIpUri(backends_[0]->port_)}));
1189   rls_server_->service_.SetResponse(
1190       BuildRlsRequest({{kTestKey, kTestValue2}}),
1191       BuildRlsResponse({grpc_core::LocalIpUri(backends_[1]->port_)}));
1192   // Send an RPC for kTestValue.
1193   // RLS server gets a request, and RPC goes to backend.
1194   CheckRpcSendOk(DEBUG_LOCATION,
1195                  RpcOptions().set_metadata({{"key1", kTestValue}}));
1196   EXPECT_EQ(rls_server_->service_.request_count(), 1);
1197   EXPECT_EQ(rls_server_->service_.response_count(), 1);
1198   EXPECT_EQ(backends_[0]->service_.request_count(), 1);
1199   EXPECT_EQ(backends_[1]->service_.request_count(), 0);
1200   // A second RPC for kTestValue should not generate another RLS
1201   // request, because the cache entry is held by min_eviction_time.
1202   CheckRpcSendOk(DEBUG_LOCATION,
1203                  RpcOptions().set_metadata({{"key1", kTestValue}}));
1204   EXPECT_EQ(rls_server_->service_.request_count(), 1);
1205   EXPECT_EQ(rls_server_->service_.response_count(), 1);
1206   EXPECT_EQ(backends_[0]->service_.request_count(), 2);
1207   EXPECT_EQ(backends_[1]->service_.request_count(), 0);
1208   // Wait for min_eviction_time to elapse.
1209   gpr_sleep_until(grpc_timeout_seconds_to_deadline(6));
1210   // Send a request for kTestValue2.
1211   // RLS server gets a request, and RPC goes to backend.
1212   // This causes the entry for kTestValue to be evicted.
1213   CheckRpcSendOk(DEBUG_LOCATION,
1214                  RpcOptions().set_metadata({{"key1", kTestValue2}}));
1215   EXPECT_EQ(rls_server_->service_.request_count(), 2);
1216   EXPECT_EQ(rls_server_->service_.response_count(), 2);
1217   EXPECT_EQ(backends_[0]->service_.request_count(), 2);
1218   EXPECT_EQ(backends_[1]->service_.request_count(), 1);
1219   // Send another RPC for kTestValue.
1220   // This should now trigger a new RLS request.
1221   CheckRpcSendOk(DEBUG_LOCATION,
1222                  RpcOptions().set_metadata({{"key1", kTestValue}}));
1223   EXPECT_EQ(rls_server_->service_.request_count(), 3);
1224   EXPECT_EQ(rls_server_->service_.response_count(), 3);
1225   EXPECT_EQ(backends_[0]->service_.request_count(), 3);
1226   EXPECT_EQ(backends_[1]->service_.request_count(), 1);
1227   // Another RPC for kTestValue2 should still work due to min_eviction_time.
1228   CheckRpcSendOk(DEBUG_LOCATION,
1229                  RpcOptions().set_metadata({{"key1", kTestValue2}}));
1230   EXPECT_EQ(rls_server_->service_.request_count(), 3);
1231   EXPECT_EQ(rls_server_->service_.response_count(), 3);
1232   EXPECT_EQ(backends_[0]->service_.request_count(), 3);
1233   EXPECT_EQ(backends_[1]->service_.request_count(), 2);
1234 }
1235 
// Primes the RLS server with a response listing two targets: the real
// backend first, then the string "invalid_target".  Expects the RPC to
// succeed with one RLS request, one RLS response, and one backend request.
TEST_F(RlsEnd2endTest, MultipleTargets) {
  StartBackends(1);
  SetNextResolution(
      MakeServiceConfigBuilder()
          .AddKeyBuilder(absl::StrFormat("\"names\":[{"
                                         "  \"service\":\"%s\","
                                         "  \"method\":\"%s\""
                                         "}],"
                                         "\"headers\":["
                                         "  {"
                                         "    \"key\":\"%s\","
                                         "    \"names\":["
                                         "      \"key1\""
                                         "    ]"
                                         "  }"
                                         "]",
                                         kServiceValue, kMethodValue, kTestKey))
          .Build());
  rls_server_->service_.SetResponse(
      BuildRlsRequest({{kTestKey, kTestValue}}),
      BuildRlsResponse(
          // Second target will report TRANSIENT_FAILURE, but should
          // never be used.
          {grpc_core::LocalIpUri(backends_[0]->port_), "invalid_target"}));
  CheckRpcSendOk(DEBUG_LOCATION,
                 RpcOptions().set_metadata({{"key1", kTestValue}}));
  EXPECT_EQ(rls_server_->service_.request_count(), 1);
  EXPECT_EQ(rls_server_->service_.response_count(), 1);
  EXPECT_EQ(backends_[0]->service_.request_count(), 1);
}
1266 
// Primes the RLS server with a response listing two targets: the string
// "invalid_target" first, then the real backend.  Expects the RPC to
// succeed on the real backend, with one RLS request and one RLS response.
TEST_F(RlsEnd2endTest, MultipleTargetsFirstInTransientFailure) {
  StartBackends(1);
  SetNextResolution(
      MakeServiceConfigBuilder()
          .AddKeyBuilder(absl::StrFormat("\"names\":[{"
                                         "  \"service\":\"%s\","
                                         "  \"method\":\"%s\""
                                         "}],"
                                         "\"headers\":["
                                         "  {"
                                         "    \"key\":\"%s\","
                                         "    \"names\":["
                                         "      \"key1\""
                                         "    ]"
                                         "  }"
                                         "]",
                                         kServiceValue, kMethodValue, kTestKey))
          .Build());
  rls_server_->service_.SetResponse(
      BuildRlsRequest({{kTestKey, kTestValue}}),
      BuildRlsResponse(
          // First target will report TRANSIENT_FAILURE.
          {"invalid_target", grpc_core::LocalIpUri(backends_[0]->port_)}));
  CheckRpcSendOk(DEBUG_LOCATION,
                 RpcOptions().set_metadata({{"key1", kTestValue}}));
  EXPECT_EQ(rls_server_->service_.request_count(), 1);
  EXPECT_EQ(rls_server_->service_.response_count(), 1);
  EXPECT_EQ(backends_[0]->service_.request_count(), 1);
}
1296 
// With one RLS target in TRANSIENT_FAILURE and the other in READY, the
// channel is expected to report READY after a successful RPC.
TEST_F(RlsEnd2endTest, ConnectivityStateReady) {
  StartBackends(1);
  // Key builder with a single header entry: key kTestKey, names ["key1"].
  const std::string key_builder = absl::StrFormat(
      "\"names\":[{"
      "  \"service\":\"%s\","
      "  \"method\":\"%s\""
      "}],"
      "\"headers\":["
      "  {"
      "    \"key\":\"%s\","
      "    \"names\":["
      "      \"key1\""
      "    ]"
      "  }"
      "]",
      kServiceValue, kMethodValue, kTestKey);
  SetNextResolution(
      MakeServiceConfigBuilder().AddKeyBuilder(key_builder).Build());
  // Before any RPC is sent, the channel is expected to report IDLE.
  EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(/*try_to_connect=*/false));
  // One target in TRANSIENT_FAILURE, the other in READY.
  const std::string backend_target =
      grpc_core::LocalIpUri(backends_[0]->port_);
  rls_server_->service_.SetResponse(
      BuildRlsRequest({{kTestKey, kTestValue}}),
      BuildRlsResponse({"invalid_target", backend_target}));
  RpcOptions rpc_options;
  rpc_options.set_metadata({{"key1", kTestValue}});
  CheckRpcSendOk(DEBUG_LOCATION, rpc_options);
  EXPECT_EQ(rls_server_->service_.request_count(), 1);
  EXPECT_EQ(rls_server_->service_.response_count(), 1);
  EXPECT_EQ(backends_[0]->service_.request_count(), 1);
  // After the successful RPC the channel is expected to report READY.
  EXPECT_EQ(GRPC_CHANNEL_READY, channel_->GetState(/*try_to_connect=*/false));
}
1328 
// When the RLS lookup fails and no child policy gets created, the channel
// is expected to remain IDLE.
TEST_F(RlsEnd2endTest, ConnectivityStateIdle) {
  // Key builder with a single header entry: key kTestKey, names ["key1"].
  const std::string key_builder = absl::StrFormat(
      "\"names\":[{"
      "  \"service\":\"%s\","
      "  \"method\":\"%s\""
      "}],"
      "\"headers\":["
      "  {"
      "    \"key\":\"%s\","
      "    \"names\":["
      "      \"key1\""
      "    ]"
      "  }"
      "]",
      kServiceValue, kMethodValue, kTestKey);
  SetNextResolution(
      MakeServiceConfigBuilder().AddKeyBuilder(key_builder).Build());
  // Before any RPC is sent, the channel is expected to report IDLE.
  EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(/*try_to_connect=*/false));
  // The RLS server has not been given any responses, so the lookup (and
  // with it the data plane RPC) fails.
  CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::UNAVAILABLE,
                      "RLS request failed: INTERNAL: no response entry");
  // There are no child policies, so the state should still be IDLE.
  EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(/*try_to_connect=*/false));
}
1353 
// When the only target returned by the RLS server is unusable, the RPC
// fails and the channel is expected to report TRANSIENT_FAILURE.
TEST_F(RlsEnd2endTest, ConnectivityStateTransientFailure) {
  // Key builder with a single header entry: key kTestKey, names ["key1"].
  const std::string key_builder = absl::StrFormat(
      "\"names\":[{"
      "  \"service\":\"%s\","
      "  \"method\":\"%s\""
      "}],"
      "\"headers\":["
      "  {"
      "    \"key\":\"%s\","
      "    \"names\":["
      "      \"key1\""
      "    ]"
      "  }"
      "]",
      kServiceValue, kMethodValue, kTestKey);
  SetNextResolution(
      MakeServiceConfigBuilder().AddKeyBuilder(key_builder).Build());
  // Before any RPC is sent, the channel is expected to report IDLE.
  EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(/*try_to_connect=*/false));
  // The RLS response contains a single target, "invalid_target".
  rls_server_->service_.SetResponse(BuildRlsRequest({{kTestKey, kTestValue}}),
                                    BuildRlsResponse({"invalid_target"}));
  RpcOptions rpc_options;
  rpc_options.set_metadata({{"key1", kTestValue}});
  CheckRpcSendFailure(
      DEBUG_LOCATION, StatusCode::UNAVAILABLE,
      "empty address list (no address in fixed_address_lb policy)",
      rpc_options);
  EXPECT_EQ(rls_server_->service_.request_count(), 1);
  EXPECT_EQ(rls_server_->service_.response_count(), 1);
  EXPECT_EQ(GRPC_CHANNEL_TRANSIENT_FAILURE,
            channel_->GetState(/*try_to_connect=*/false));
}
1383 
1384 class RlsMetricsEnd2endTest : public RlsEnd2endTest {
1385  protected:
SetUp()1386   void SetUp() override {
1387     // Register stats plugin before initializing client.
1388     stats_plugin_ = grpc_core::FakeStatsPluginBuilder()
1389                         .UseDisabledByDefaultMetrics(true)
1390                         .BuildAndRegister();
1391     RlsEnd2endTest::SetUp();
1392   }
1393 
1394   std::shared_ptr<grpc_core::FakeStatsPlugin> stats_plugin_;
1395 };
1396 
// Checks the registered descriptor of grpc.lb.rls.default_target_picks:
// value type, instrument type, default enablement, name, unit and labels.
TEST_F(RlsMetricsEnd2endTest, MetricDefinitionDefaultTargetPicks) {
  using Registry = grpc_core::GlobalInstrumentsRegistry;
  using RegistryPeer = grpc_core::GlobalInstrumentsRegistryTestPeer;
  const auto* desc = RegistryPeer::FindMetricDescriptorByName(
      "grpc.lb.rls.default_target_picks");
  // Everything below dereferences the descriptor, so bail out if missing.
  ASSERT_NE(desc, nullptr);
  EXPECT_EQ(desc->value_type, Registry::ValueType::kUInt64);
  EXPECT_EQ(desc->instrument_type, Registry::InstrumentType::kCounter);
  EXPECT_FALSE(desc->enable_by_default);
  EXPECT_EQ(desc->name, "grpc.lb.rls.default_target_picks");
  EXPECT_EQ(desc->unit, "{pick}");
  EXPECT_THAT(desc->label_keys,
              ::testing::ElementsAre("grpc.target", "grpc.lb.rls.server_target",
                                     "grpc.lb.rls.data_plane_target",
                                     "grpc.lb.pick_result"));
  // No optional labels are expected.
  EXPECT_THAT(desc->optional_label_keys, ::testing::ElementsAre());
}
1415 
// Checks the registered descriptor of grpc.lb.rls.target_picks: value
// type, instrument type, default enablement, name, unit and labels.
TEST_F(RlsMetricsEnd2endTest, MetricDefinitionTargetPicks) {
  using Registry = grpc_core::GlobalInstrumentsRegistry;
  using RegistryPeer = grpc_core::GlobalInstrumentsRegistryTestPeer;
  const auto* desc =
      RegistryPeer::FindMetricDescriptorByName("grpc.lb.rls.target_picks");
  // Everything below dereferences the descriptor, so bail out if missing.
  ASSERT_NE(desc, nullptr);
  EXPECT_EQ(desc->value_type, Registry::ValueType::kUInt64);
  EXPECT_EQ(desc->instrument_type, Registry::InstrumentType::kCounter);
  EXPECT_FALSE(desc->enable_by_default);
  EXPECT_EQ(desc->name, "grpc.lb.rls.target_picks");
  EXPECT_EQ(desc->unit, "{pick}");
  EXPECT_THAT(desc->label_keys,
              ::testing::ElementsAre("grpc.target", "grpc.lb.rls.server_target",
                                     "grpc.lb.rls.data_plane_target",
                                     "grpc.lb.pick_result"));
  // No optional labels are expected.
  EXPECT_THAT(desc->optional_label_keys, ::testing::ElementsAre());
}
1434 
// Checks the registered descriptor of grpc.lb.rls.failed_picks: value
// type, instrument type, default enablement, name, unit and labels.
TEST_F(RlsMetricsEnd2endTest, MetricDefinitionFailedPicks) {
  using Registry = grpc_core::GlobalInstrumentsRegistry;
  using RegistryPeer = grpc_core::GlobalInstrumentsRegistryTestPeer;
  const auto* desc =
      RegistryPeer::FindMetricDescriptorByName("grpc.lb.rls.failed_picks");
  // Everything below dereferences the descriptor, so bail out if missing.
  ASSERT_NE(desc, nullptr);
  EXPECT_EQ(desc->value_type, Registry::ValueType::kUInt64);
  EXPECT_EQ(desc->instrument_type, Registry::InstrumentType::kCounter);
  EXPECT_FALSE(desc->enable_by_default);
  EXPECT_EQ(desc->name, "grpc.lb.rls.failed_picks");
  EXPECT_EQ(desc->unit, "{pick}");
  // This metric has only two label keys.
  EXPECT_THAT(
      desc->label_keys,
      ::testing::ElementsAre("grpc.target", "grpc.lb.rls.server_target"));
  // No optional labels are expected.
  EXPECT_THAT(desc->optional_label_keys, ::testing::ElementsAre());
}
1452 
// Checks the registered descriptor of grpc.lb.rls.cache_entries: value
// type, instrument type, default enablement, name, unit and labels.
TEST_F(RlsMetricsEnd2endTest, MetricDefinitionCacheEntries) {
  using Registry = grpc_core::GlobalInstrumentsRegistry;
  using RegistryPeer = grpc_core::GlobalInstrumentsRegistryTestPeer;
  const auto* desc =
      RegistryPeer::FindMetricDescriptorByName("grpc.lb.rls.cache_entries");
  // Everything below dereferences the descriptor, so bail out if missing.
  ASSERT_NE(desc, nullptr);
  // This metric is a signed callback gauge rather than a counter.
  EXPECT_EQ(desc->value_type, Registry::ValueType::kInt64);
  EXPECT_EQ(desc->instrument_type, Registry::InstrumentType::kCallbackGauge);
  EXPECT_FALSE(desc->enable_by_default);
  EXPECT_EQ(desc->name, "grpc.lb.rls.cache_entries");
  EXPECT_EQ(desc->unit, "{entry}");
  EXPECT_THAT(desc->label_keys,
              ::testing::ElementsAre("grpc.target", "grpc.lb.rls.server_target",
                                     "grpc.lb.rls.instance_uuid"));
  // No optional labels are expected.
  EXPECT_THAT(desc->optional_label_keys, ::testing::ElementsAre());
}
1471 
// Checks the registered descriptor of grpc.lb.rls.cache_size: value type,
// instrument type, default enablement, name, unit and labels.
TEST_F(RlsMetricsEnd2endTest, MetricDefinitionCacheSize) {
  using Registry = grpc_core::GlobalInstrumentsRegistry;
  using RegistryPeer = grpc_core::GlobalInstrumentsRegistryTestPeer;
  const auto* desc =
      RegistryPeer::FindMetricDescriptorByName("grpc.lb.rls.cache_size");
  // Everything below dereferences the descriptor, so bail out if missing.
  ASSERT_NE(desc, nullptr);
  // This metric is a signed callback gauge rather than a counter.
  EXPECT_EQ(desc->value_type, Registry::ValueType::kInt64);
  EXPECT_EQ(desc->instrument_type, Registry::InstrumentType::kCallbackGauge);
  EXPECT_FALSE(desc->enable_by_default);
  EXPECT_EQ(desc->name, "grpc.lb.rls.cache_size");
  EXPECT_EQ(desc->unit, "By");
  EXPECT_THAT(desc->label_keys,
              ::testing::ElementsAre("grpc.target", "grpc.lb.rls.server_target",
                                     "grpc.lb.rls.instance_uuid"));
  // No optional labels are expected.
  EXPECT_THAT(desc->optional_label_keys, ::testing::ElementsAre());
}
1490 
TEST_F(RlsMetricsEnd2endTest,MetricValues)1491 TEST_F(RlsMetricsEnd2endTest, MetricValues) {
1492   auto kMetricTargetPicks =
1493       grpc_core::GlobalInstrumentsRegistryTestPeer::
1494           FindUInt64CounterHandleByName("grpc.lb.rls.target_picks")
1495               .value();
1496   auto kMetricFailedPicks =
1497       grpc_core::GlobalInstrumentsRegistryTestPeer::
1498           FindUInt64CounterHandleByName("grpc.lb.rls.failed_picks")
1499               .value();
1500   auto kMetricCacheEntries =
1501       grpc_core::GlobalInstrumentsRegistryTestPeer::
1502           FindCallbackInt64GaugeHandleByName("grpc.lb.rls.cache_entries")
1503               .value();
1504   auto kMetricCacheSize =
1505       grpc_core::GlobalInstrumentsRegistryTestPeer::
1506           FindCallbackInt64GaugeHandleByName("grpc.lb.rls.cache_size")
1507               .value();
1508   StartBackends(2);
1509   SetNextResolution(
1510       MakeServiceConfigBuilder()
1511           .AddKeyBuilder(absl::StrFormat("\"names\":[{"
1512                                          "  \"service\":\"%s\","
1513                                          "  \"method\":\"%s\""
1514                                          "}],"
1515                                          "\"headers\":["
1516                                          "  {"
1517                                          "    \"key\":\"%s\","
1518                                          "    \"names\":["
1519                                          "      \"key1\""
1520                                          "    ]"
1521                                          "  }"
1522                                          "]",
1523                                          kServiceValue, kMethodValue, kTestKey))
1524           .Build());
1525   const std::string rls_target0 = grpc_core::LocalIpUri(backends_[0]->port_);
1526   const std::string rls_target1 = grpc_core::LocalIpUri(backends_[1]->port_);
1527   // Send an RPC to the target for backend 0.
1528   rls_server_->service_.SetResponse(BuildRlsRequest({{kTestKey, rls_target0}}),
1529                                     BuildRlsResponse({rls_target0}));
1530   CheckRpcSendOk(DEBUG_LOCATION,
1531                  RpcOptions().set_metadata({{"key1", rls_target0}}));
1532   EXPECT_EQ(rls_server_->service_.request_count(), 1);
1533   EXPECT_EQ(rls_server_->service_.response_count(), 1);
1534   EXPECT_EQ(backends_[0]->service_.request_count(), 1);
1535   EXPECT_EQ(backends_[1]->service_.request_count(), 0);
1536   // Check exported metrics.
1537   EXPECT_THAT(
1538       stats_plugin_->GetUInt64CounterValue(
1539           kMetricTargetPicks,
1540           {target_uri_, rls_server_target_, rls_target0, "complete"}, {}),
1541       ::testing::Optional(1));
1542   EXPECT_THAT(
1543       stats_plugin_->GetUInt64CounterValue(
1544           kMetricTargetPicks,
1545           {target_uri_, rls_server_target_, rls_target1, "complete"}, {}),
1546       absl::nullopt);
1547   EXPECT_EQ(stats_plugin_->GetUInt64CounterValue(
1548                 kMetricFailedPicks, {target_uri_, rls_server_target_}, {}),
1549             absl::nullopt);
1550   stats_plugin_->TriggerCallbacks();
1551   EXPECT_THAT(stats_plugin_->GetInt64CallbackGaugeValue(
1552                   kMetricCacheEntries,
1553                   {target_uri_, rls_server_target_, kRlsInstanceUuid}, {}),
1554               ::testing::Optional(1));
1555   auto cache_size = stats_plugin_->GetInt64CallbackGaugeValue(
1556       kMetricCacheSize, {target_uri_, rls_server_target_, kRlsInstanceUuid},
1557       {});
1558   EXPECT_THAT(cache_size, ::testing::Optional(::testing::Ge(1)));
1559   // Send an RPC to the target for backend 1.
1560   rls_server_->service_.SetResponse(BuildRlsRequest({{kTestKey, rls_target1}}),
1561                                     BuildRlsResponse({rls_target1}));
1562   CheckRpcSendOk(DEBUG_LOCATION,
1563                  RpcOptions().set_metadata({{"key1", rls_target1}}));
1564   EXPECT_EQ(rls_server_->service_.request_count(), 2);
1565   EXPECT_EQ(rls_server_->service_.response_count(), 2);
1566   EXPECT_EQ(backends_[0]->service_.request_count(), 1);
1567   EXPECT_EQ(backends_[1]->service_.request_count(), 1);
1568   // Check exported metrics.
1569   EXPECT_THAT(
1570       stats_plugin_->GetUInt64CounterValue(
1571           kMetricTargetPicks,
1572           {target_uri_, rls_server_target_, rls_target0, "complete"}, {}),
1573       ::testing::Optional(1));
1574   EXPECT_THAT(
1575       stats_plugin_->GetUInt64CounterValue(
1576           kMetricTargetPicks,
1577           {target_uri_, rls_server_target_, rls_target1, "complete"}, {}),
1578       ::testing::Optional(1));
1579   EXPECT_EQ(stats_plugin_->GetUInt64CounterValue(
1580                 kMetricFailedPicks, {target_uri_, rls_server_target_}, {}),
1581             absl::nullopt);
1582   stats_plugin_->TriggerCallbacks();
1583   EXPECT_THAT(stats_plugin_->GetInt64CallbackGaugeValue(
1584                   kMetricCacheEntries,
1585                   {target_uri_, rls_server_target_, kRlsInstanceUuid}, {}),
1586               ::testing::Optional(2));
1587   auto cache_size2 = stats_plugin_->GetInt64CallbackGaugeValue(
1588       kMetricCacheSize, {target_uri_, rls_server_target_, kRlsInstanceUuid},
1589       {});
1590   EXPECT_THAT(cache_size2, ::testing::Optional(::testing::Ge(2)));
1591   if (cache_size.has_value() && cache_size2.has_value()) {
1592     EXPECT_GT(*cache_size2, *cache_size);
1593   }
1594   // Send an RPC for which the RLS server has no response, which means
1595   // that the RLS request will fail.  There is no default target, so the
1596   // data plane RPC will fail.
1597   CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::UNAVAILABLE,
1598                       "RLS request failed: INTERNAL: no response entry",
1599                       RpcOptions().set_metadata({{"key1", kTestValue}}));
1600   EXPECT_THAT(
1601       rls_server_->service_.GetUnmatchedRequests(),
1602       ::testing::ElementsAre(
1603           // TODO(roth): Change this to use ::testing::ProtoEquals()
1604           // once that becomes available in OSS.
1605           ::testing::Property(
1606               &RouteLookupRequest::DebugString,
1607               BuildRlsRequest({{kTestKey, kTestValue}}).DebugString())));
1608   EXPECT_EQ(rls_server_->service_.request_count(), 3);
1609   EXPECT_EQ(rls_server_->service_.response_count(), 2);
1610   EXPECT_EQ(backends_[0]->service_.request_count(), 1);
1611   EXPECT_EQ(backends_[1]->service_.request_count(), 1);
1612   // Check exported metrics.
1613   EXPECT_THAT(
1614       stats_plugin_->GetUInt64CounterValue(
1615           kMetricTargetPicks,
1616           {target_uri_, rls_server_target_, rls_target0, "complete"}, {}),
1617       ::testing::Optional(1));
1618   EXPECT_THAT(
1619       stats_plugin_->GetUInt64CounterValue(
1620           kMetricTargetPicks,
1621           {target_uri_, rls_server_target_, rls_target1, "complete"}, {}),
1622       ::testing::Optional(1));
1623   EXPECT_THAT(stats_plugin_->GetUInt64CounterValue(
1624                   kMetricFailedPicks, {target_uri_, rls_server_target_}, {}),
1625               ::testing::Optional(1));
1626   stats_plugin_->TriggerCallbacks();
1627   EXPECT_THAT(stats_plugin_->GetInt64CallbackGaugeValue(
1628                   kMetricCacheEntries,
1629                   {target_uri_, rls_server_target_, kRlsInstanceUuid}, {}),
1630               ::testing::Optional(3));
1631   auto cache_size3 = stats_plugin_->GetInt64CallbackGaugeValue(
1632       kMetricCacheSize, {target_uri_, rls_server_target_, kRlsInstanceUuid},
1633       {});
1634   EXPECT_THAT(cache_size3, ::testing::Optional(::testing::Ge(3)));
1635   if (cache_size.has_value() && cache_size3.has_value()) {
1636     EXPECT_GT(*cache_size3, *cache_size);
1637   }
1638 }
1639 
// When the RLS lookup fails but a default target is configured, the RPC is
// sent to the default target and counted in default_target_picks.
TEST_F(RlsMetricsEnd2endTest, MetricValuesDefaultTargetRpcs) {
  using RegistryPeer = grpc_core::GlobalInstrumentsRegistryTestPeer;
  auto default_target_picks_handle =
      RegistryPeer::FindUInt64CounterHandleByName(
          "grpc.lb.rls.default_target_picks")
          .value();
  StartBackends(1);
  const std::string default_target = grpc_core::LocalIpUri(backends_[0]->port_);
  // Key builder with a single header entry: key kTestKey, names ["key1"].
  const std::string key_builder = absl::StrFormat(
      "\"names\":[{"
      "  \"service\":\"%s\","
      "  \"method\":\"%s\""
      "}],"
      "\"headers\":["
      "  {"
      "    \"key\":\"%s\","
      "    \"names\":["
      "      \"key1\""
      "    ]"
      "  }"
      "]",
      kServiceValue, kMethodValue, kTestKey);
  SetNextResolution(MakeServiceConfigBuilder()
                        .AddKeyBuilder(key_builder)
                        .set_default_target(default_target)
                        .Build());
  // The RLS server is deliberately given no response, so the RLS request
  // fails and the data plane RPC should go to the default target.
  RpcOptions rpc_options;
  rpc_options.set_metadata({{"key1", kTestValue}});
  CheckRpcSendOk(DEBUG_LOCATION, rpc_options);
  // TODO(roth): Change this to use ::testing::ProtoEquals()
  // once that becomes available in OSS.
  const std::string expected_request =
      BuildRlsRequest({{kTestKey, kTestValue}}).DebugString();
  EXPECT_THAT(rls_server_->service_.GetUnmatchedRequests(),
              ::testing::ElementsAre(::testing::Property(
                  &RouteLookupRequest::DebugString, expected_request)));
  EXPECT_EQ(rls_server_->service_.request_count(), 1);
  EXPECT_EQ(rls_server_->service_.response_count(), 0);
  EXPECT_EQ(backends_[0]->service_.request_count(), 1);
  // Check expected metrics: one completed pick on the default target.
  const auto default_target_picks = stats_plugin_->GetUInt64CounterValue(
      default_target_picks_handle,
      {target_uri_, rls_server_target_, default_target, "complete"}, {});
  EXPECT_THAT(default_target_picks, ::testing::Optional(1));
}
1686 
1687 }  // namespace
1688 }  // namespace testing
1689 }  // namespace grpc
1690 
main(int argc,char ** argv)1691 int main(int argc, char** argv) {
1692   ::testing::InitGoogleTest(&argc, argv);
1693   grpc::testing::TestEnvironment env(&argc, argv);
1694   return RUN_ALL_TESTS();
1695 }
1696