1 //
2 // Copyright 2020 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 // FIXME: add tests:
18 // - cache eviction via cleanup timer (based on age)
19 // - RLS channel is down; wait_for_ready request is sent and RLS request fails
20 // and goes into backoff; RLS channel comes back up before backoff timer
21 // fires; request is processed at that point
22 // - find some deterministic way to exercise adaptive throttler code
23
24 #include <gmock/gmock.h>
25 #include <grpc/credentials.h>
26 #include <grpcpp/channel.h>
27 #include <grpcpp/create_channel.h>
28 #include <grpcpp/security/credentials.h>
29 #include <grpcpp/server.h>
30 #include <grpcpp/server_builder.h>
31 #include <grpcpp/support/channel_arguments.h>
32 #include <gtest/gtest.h>
33
34 #include <deque>
35 #include <map>
36 #include <thread>
37
38 #include "absl/log/check.h"
39 #include "absl/log/log.h"
40 #include "absl/strings/str_format.h"
41 #include "absl/strings/str_join.h"
42 #include "absl/types/optional.h"
43 #include "src/core/client_channel/backup_poller.h"
44 #include "src/core/config/config_vars.h"
45 #include "src/core/lib/address_utils/parse_address.h"
46 #include "src/core/lib/channel/channel_args.h"
47 #include "src/core/lib/iomgr/sockaddr.h"
48 #include "src/core/lib/security/credentials/fake/fake_credentials.h"
49 #include "src/core/load_balancing/rls/rls.h"
50 #include "src/core/resolver/fake/fake_resolver.h"
51 #include "src/core/service_config/service_config_impl.h"
52 #include "src/core/util/env.h"
53 #include "src/core/util/host_port.h"
54 #include "src/core/util/time.h"
55 #include "src/core/util/uri.h"
56 #include "src/cpp/server/secure_server_credentials.h"
57 #include "src/proto/grpc/lookup/v1/rls.grpc.pb.h"
58 #include "src/proto/grpc/lookup/v1/rls.pb.h"
59 #include "src/proto/grpc/testing/echo.grpc.pb.h"
60 #include "test/core/event_engine/event_engine_test_utils.h"
61 #include "test/core/test_util/fake_stats_plugin.h"
62 #include "test/core/test_util/port.h"
63 #include "test/core/test_util/resolve_localhost_ip46.h"
64 #include "test/core/test_util/test_config.h"
65 #include "test/core/test_util/test_lb_policies.h"
66 #include "test/cpp/end2end/counted_service.h"
67 #include "test/cpp/end2end/rls_server.h"
68 #include "test/cpp/end2end/test_service_impl.h"
69 #include "test/cpp/util/credentials.h"
70 #include "test/cpp/util/test_config.h"
71
72 using ::grpc::lookup::v1::RouteLookupRequest;
73
74 namespace grpc {
75 namespace testing {
76 namespace {
77
// Shared string constants used by the fixture and the tests below.
// They are constexpr so the pointers themselves cannot be reassigned by
// accident; their type when passed to functions is still `const char*`.

// Server name used as the channel target and as the expected authority.
constexpr const char* kServerName = "test.google.fr";
// Payload sent in every Echo request.
constexpr const char* kRequestMessage = "Live long and prosper.";
// Value passed via GRPC_ARG_TEST_ONLY_RLS_INSTANCE_ID.
constexpr const char* kRlsInstanceUuid = "rls_instance_uuid";

// Metadata key/value attached by the test call credentials.
constexpr const char* kCallCredsMdKey = "call_cred_name";
constexpr const char* kCallCredsMdValue = "call_cred_value";

// Keys and values used when building key builders and RLS requests.
constexpr const char* kTestKey = "test_key";
constexpr const char* kTestValue = "test_value";
constexpr const char* kHostKey = "host_key";
constexpr const char* kServiceKey = "service_key";
constexpr const char* kServiceValue = "grpc.testing.EchoTestService";
constexpr const char* kMethodKey = "method_key";
constexpr const char* kMethodValue = "Echo";
constexpr const char* kConstantKey = "constant_key";
constexpr const char* kConstantValue = "constant_value";
94
95 using BackendService = CountedService<TestServiceImpl>;
96
97 // Subclass of TestServiceImpl that increments a request counter for
98 // every call to the Echo Rpc.
99 class MyTestServiceImpl : public BackendService {
100 public:
Echo(ServerContext * context,const EchoRequest * request,EchoResponse * response)101 Status Echo(ServerContext* context, const EchoRequest* request,
102 EchoResponse* response) override {
103 // Backend should see call creds.
104 EXPECT_THAT(context->client_metadata(),
105 ::testing::Contains(
106 ::testing::Pair(kCallCredsMdKey, kCallCredsMdValue)));
107 IncreaseRequestCount();
108 auto client_metadata = context->client_metadata();
109 auto range = client_metadata.equal_range("x-google-rls-data");
110 {
111 grpc::internal::MutexLock lock(&mu_);
112 for (auto it = range.first; it != range.second; ++it) {
113 rls_header_data_.insert(
114 std::string(it->second.begin(), it->second.length()));
115 }
116 }
117 IncreaseResponseCount();
118 return TestServiceImpl::Echo(context, request, response);
119 }
120
rls_data()121 std::set<std::string> rls_data() {
122 grpc::internal::MutexLock lock(&mu_);
123 return std::move(rls_header_data_);
124 }
125
Start()126 void Start() {}
127
Shutdown()128 void Shutdown() {}
129
130 private:
131 grpc::internal::Mutex mu_;
132 std::set<std::string> rls_header_data_ ABSL_GUARDED_BY(&mu_);
133 };
134
135 class FakeResolverResponseGeneratorWrapper {
136 public:
FakeResolverResponseGeneratorWrapper()137 FakeResolverResponseGeneratorWrapper()
138 : response_generator_(grpc_core::MakeRefCounted<
139 grpc_core::FakeResolverResponseGenerator>()) {}
140
SetNextResolution(absl::string_view service_config_json)141 void SetNextResolution(absl::string_view service_config_json) {
142 grpc_core::ExecCtx exec_ctx;
143 response_generator_->SetResponseSynchronously(
144 BuildFakeResults(service_config_json));
145 }
146
Get() const147 grpc_core::FakeResolverResponseGenerator* Get() const {
148 return response_generator_.get();
149 }
150
151 private:
BuildFakeResults(absl::string_view service_config_json)152 static grpc_core::Resolver::Result BuildFakeResults(
153 absl::string_view service_config_json) {
154 grpc_core::Resolver::Result result;
155 result.service_config =
156 grpc_core::ServiceConfigImpl::Create(result.args, service_config_json);
157 EXPECT_TRUE(result.service_config.ok()) << result.service_config.status();
158 return result;
159 }
160
161 grpc_core::RefCountedPtr<grpc_core::FakeResolverResponseGenerator>
162 response_generator_;
163 };
164
165 class RlsEnd2endTest : public ::testing::Test {
166 protected:
SetUpTestSuite()167 static void SetUpTestSuite() {
168 grpc_core::ConfigVars::Overrides overrides;
169 overrides.client_channel_backup_poll_interval_ms = 1;
170 grpc_core::ConfigVars::SetOverrides(overrides);
171 grpc_core::CoreConfiguration::RegisterBuilder(
172 grpc_core::RegisterFixedAddressLoadBalancingPolicy);
173 grpc_init();
174 }
175
TearDownTestSuite()176 static void TearDownTestSuite() {
177 grpc_shutdown_blocking();
178 WaitForSingleOwner(
179 grpc_event_engine::experimental::GetDefaultEventEngine());
180 grpc_core::CoreConfiguration::Reset();
181 }
182
SetUp()183 void SetUp() override {
184 rls_server_ = std::make_unique<ServerThread<RlsServiceImpl>>(
185 "rls", [](grpc::ServerContext* ctx) {
186 EXPECT_THAT(ctx->client_metadata(),
187 ::testing::Contains(
188 ::testing::Pair(kCallCredsMdKey, kCallCredsMdValue)));
189 EXPECT_EQ(ctx->ExperimentalGetAuthority(), kServerName);
190 });
191 rls_server_->Start();
192 rls_server_target_ = absl::StrFormat("localhost:%d", rls_server_->port_);
193 // Set up client.
194 resolver_response_generator_ =
195 std::make_unique<FakeResolverResponseGeneratorWrapper>();
196 ChannelArguments args;
197 args.SetPointer(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR,
198 resolver_response_generator_->Get());
199 args.SetString(GRPC_ARG_FAKE_SECURITY_EXPECTED_TARGETS, kServerName);
200 args.SetString(GRPC_ARG_TEST_ONLY_RLS_INSTANCE_ID, kRlsInstanceUuid);
201 grpc_channel_credentials* channel_creds =
202 grpc_fake_transport_security_credentials_create();
203 grpc_call_credentials* call_creds = grpc_md_only_test_credentials_create(
204 kCallCredsMdKey, kCallCredsMdValue);
205 auto creds = std::make_shared<TestCompositeChannelCredentials>(
206 channel_creds, call_creds);
207 call_creds->Unref();
208 channel_creds->Unref();
209 target_uri_ = absl::StrCat("fake:///", kServerName);
210 channel_ = grpc::CreateCustomChannel(target_uri_, creds, args);
211 stub_ = grpc::testing::EchoTestService::NewStub(channel_);
212 }
213
TearDown()214 void TearDown() override {
215 ShutdownBackends();
216 rls_server_->Shutdown();
217 }
218
ShutdownBackends()219 void ShutdownBackends() {
220 for (auto& server : backends_) {
221 server->Shutdown();
222 }
223 }
224
StartBackends(size_t num_servers)225 void StartBackends(size_t num_servers) {
226 backends_.clear();
227 for (size_t i = 0; i < num_servers; ++i) {
228 backends_.push_back(
229 std::make_unique<ServerThread<MyTestServiceImpl>>("backend"));
230 backends_.back()->Start();
231 }
232 }
233
234 struct RpcOptions {
235 int timeout_ms = 5000;
236 bool wait_for_ready = false;
237 std::vector<std::pair<std::string, std::string>> metadata;
238
RpcOptionsgrpc::testing::__anon4e73adf20111::RlsEnd2endTest::RpcOptions239 RpcOptions() {}
240
set_timeout_msgrpc::testing::__anon4e73adf20111::RlsEnd2endTest::RpcOptions241 RpcOptions& set_timeout_ms(int rpc_timeout_ms) {
242 timeout_ms = rpc_timeout_ms;
243 return *this;
244 }
245
set_wait_for_readygrpc::testing::__anon4e73adf20111::RlsEnd2endTest::RpcOptions246 RpcOptions& set_wait_for_ready(bool rpc_wait_for_ready) {
247 wait_for_ready = rpc_wait_for_ready;
248 return *this;
249 }
250
set_metadatagrpc::testing::__anon4e73adf20111::RlsEnd2endTest::RpcOptions251 RpcOptions& set_metadata(
252 std::vector<std::pair<std::string, std::string>> rpc_metadata) {
253 metadata = std::move(rpc_metadata);
254 return *this;
255 }
256
257 // Populates context.
SetupRpcgrpc::testing::__anon4e73adf20111::RlsEnd2endTest::RpcOptions258 void SetupRpc(ClientContext* context) const {
259 for (const auto& item : metadata) {
260 context->AddMetadata(item.first, item.second);
261 }
262 if (timeout_ms != 0) {
263 context->set_deadline(
264 grpc_timeout_milliseconds_to_deadline(timeout_ms));
265 }
266 if (wait_for_ready) context->set_wait_for_ready(true);
267 }
268 };
269
SendRpc(const RpcOptions & rpc_options=RpcOptions (),EchoResponse * response=nullptr)270 Status SendRpc(const RpcOptions& rpc_options = RpcOptions(),
271 EchoResponse* response = nullptr) {
272 EchoResponse local_response;
273 if (response == nullptr) response = &local_response;
274 ClientContext context;
275 rpc_options.SetupRpc(&context);
276 EchoRequest request;
277 request.set_message(kRequestMessage);
278 return stub_->Echo(&context, request, response);
279 }
280
CheckRpcSendOk(const grpc_core::DebugLocation & location,const RpcOptions & rpc_options=RpcOptions ())281 void CheckRpcSendOk(const grpc_core::DebugLocation& location,
282 const RpcOptions& rpc_options = RpcOptions()) {
283 EchoResponse response;
284 Status status = SendRpc(rpc_options, &response);
285 ASSERT_TRUE(status.ok()) << location.file() << ":" << location.line()
286 << ": RPC failed: " << status.error_code() << ": "
287 << status.error_message();
288 EXPECT_EQ(response.message(), kRequestMessage)
289 << location.file() << ":" << location.line();
290 }
291
CheckRpcSendFailure(const grpc_core::DebugLocation & location,StatusCode expected_code,absl::string_view expected_message,const RpcOptions & rpc_options=RpcOptions ())292 void CheckRpcSendFailure(const grpc_core::DebugLocation& location,
293 StatusCode expected_code,
294 absl::string_view expected_message,
295 const RpcOptions& rpc_options = RpcOptions()) {
296 Status status = SendRpc(rpc_options);
297 ASSERT_FALSE(status.ok()) << location.file() << ":" << location.line();
298 EXPECT_EQ(expected_code, status.error_code())
299 << location.file() << ":" << location.line();
300 EXPECT_EQ(expected_message, status.error_message())
301 << location.file() << ":" << location.line();
302 }
303
304 class ServiceConfigBuilder {
305 public:
ServiceConfigBuilder(absl::string_view rls_server_target)306 explicit ServiceConfigBuilder(absl::string_view rls_server_target)
307 : rls_server_target_(rls_server_target) {}
308
set_lookup_service_timeout(grpc_core::Duration timeout)309 ServiceConfigBuilder& set_lookup_service_timeout(
310 grpc_core::Duration timeout) {
311 lookup_service_timeout_ = timeout * grpc_test_slowdown_factor();
312 return *this;
313 }
314
set_default_target(std::string default_target)315 ServiceConfigBuilder& set_default_target(std::string default_target) {
316 default_target_ = std::move(default_target);
317 return *this;
318 }
319
set_max_age(grpc_core::Duration max_age)320 ServiceConfigBuilder& set_max_age(grpc_core::Duration max_age) {
321 max_age_ = max_age * grpc_test_slowdown_factor();
322 return *this;
323 }
324
set_stale_age(grpc_core::Duration stale_age)325 ServiceConfigBuilder& set_stale_age(grpc_core::Duration stale_age) {
326 stale_age_ = stale_age * grpc_test_slowdown_factor();
327 return *this;
328 }
329
set_cache_size_bytes(int64_t size)330 ServiceConfigBuilder& set_cache_size_bytes(int64_t size) {
331 cache_size_bytes_ = size;
332 return *this;
333 }
334
AddKeyBuilder(absl::string_view key_builder)335 ServiceConfigBuilder& AddKeyBuilder(absl::string_view key_builder) {
336 key_builders_.push_back(absl::StrCat("{", key_builder, "}"));
337 return *this;
338 }
339
Build()340 std::string Build() {
341 // First build parts of routeLookupConfig.
342 std::vector<std::string> route_lookup_config_parts;
343 route_lookup_config_parts.push_back(absl::StrFormat(
344 " \"lookupService\":\"%s\"", rls_server_target_));
345 if (lookup_service_timeout_ > grpc_core::Duration::Zero()) {
346 route_lookup_config_parts.push_back(
347 absl::StrFormat(" \"lookupServiceTimeout\":\"%fs\"",
348 lookup_service_timeout_.seconds()));
349 }
350 if (!default_target_.empty()) {
351 route_lookup_config_parts.push_back(absl::StrFormat(
352 " \"defaultTarget\":\"%s\"", default_target_));
353 }
354 route_lookup_config_parts.push_back(absl::StrFormat(
355 " \"cacheSizeBytes\":%" PRId64, cache_size_bytes_));
356 if (max_age_ > grpc_core::Duration::Zero()) {
357 route_lookup_config_parts.push_back(
358 absl::StrFormat(" \"maxAge\":\"%fs\"", max_age_.seconds()));
359 }
360 if (stale_age_ > grpc_core::Duration::Zero()) {
361 route_lookup_config_parts.push_back(absl::StrFormat(
362 " \"staleAge\":\"%fs\"", stale_age_.seconds()));
363 }
364 if (!key_builders_.empty()) {
365 route_lookup_config_parts.push_back(
366 absl::StrFormat(" \"grpcKeybuilders\":[%s]",
367 absl::StrJoin(key_builders_, ",")));
368 }
369 // Now build parts of RLS LB policy config.
370 std::vector<std::string> rls_config_parts;
371 if (!route_lookup_config_parts.empty()) {
372 rls_config_parts.push_back(absl::StrCat(
373 " \"routeLookupConfig\":{",
374 absl::StrJoin(route_lookup_config_parts, ","), " }"));
375 }
376 rls_config_parts.push_back(
377 " \"childPolicy\":[{"
378 " \"fixed_address_lb\":{}\n"
379 " }],\n"
380 " \"childPolicyConfigTargetFieldName\":\"address\"\n");
381 // Put it all together.
382 return absl::StrCat(
383 "{"
384 " \"loadBalancingConfig\":[{"
385 " \"rls_experimental\":{",
386 absl::StrJoin(rls_config_parts, ","),
387 " }"
388 " }]"
389 "}");
390 }
391
392 private:
393 absl::string_view rls_server_target_;
394 grpc_core::Duration lookup_service_timeout_;
395 std::string default_target_;
396 grpc_core::Duration max_age_;
397 grpc_core::Duration stale_age_;
398 int64_t cache_size_bytes_ = 10485760;
399 std::vector<std::string> key_builders_;
400 };
401
MakeServiceConfigBuilder()402 ServiceConfigBuilder MakeServiceConfigBuilder() {
403 return ServiceConfigBuilder(rls_server_target_);
404 }
405
SetNextResolution(absl::string_view service_config_json)406 void SetNextResolution(absl::string_view service_config_json) {
407 resolver_response_generator_->SetNextResolution(service_config_json);
408 }
409
410 template <typename T>
411 struct ServerThread {
412 template <typename... Args>
ServerThreadgrpc::testing::__anon4e73adf20111::RlsEnd2endTest::ServerThread413 explicit ServerThread(const grpc::string& type, Args&&... args)
414 : port_(grpc_pick_unused_port_or_die()),
415 type_(type),
416 service_(std::forward<Args>(args)...) {}
417
Startgrpc::testing::__anon4e73adf20111::RlsEnd2endTest::ServerThread418 void Start() {
419 LOG(INFO) << "starting " << type_ << " server on port " << port_;
420 CHECK(!running_);
421 running_ = true;
422 service_.Start();
423 grpc::internal::Mutex mu;
424 // We need to acquire the lock here in order to prevent the notify_one
425 // by ServerThread::Serve from firing before the wait below is hit.
426 grpc::internal::MutexLock lock(&mu);
427 grpc::internal::CondVar cond;
428 thread_ = std::make_unique<std::thread>(
429 std::bind(&ServerThread::Serve, this, &mu, &cond));
430 cond.Wait(&mu);
431 LOG(INFO) << type_ << " server startup complete";
432 }
433
Servegrpc::testing::__anon4e73adf20111::RlsEnd2endTest::ServerThread434 void Serve(grpc::internal::Mutex* mu, grpc::internal::CondVar* cond) {
435 // We need to acquire the lock here in order to prevent the notify_one
436 // below from firing before its corresponding wait is executed.
437 grpc::internal::MutexLock lock(mu);
438 ServerBuilder builder;
439 auto creds = std::make_shared<SecureServerCredentials>(
440 grpc_fake_transport_security_server_credentials_create());
441 builder.AddListeningPort(absl::StrCat("localhost:", port_),
442 std::move(creds));
443 builder.RegisterService(&service_);
444 server_ = builder.BuildAndStart();
445 cond->Signal();
446 }
447
Shutdowngrpc::testing::__anon4e73adf20111::RlsEnd2endTest::ServerThread448 void Shutdown() {
449 if (!running_) return;
450 LOG(INFO) << type_ << " about to shutdown";
451 service_.Shutdown();
452 server_->Shutdown(grpc_timeout_milliseconds_to_deadline(0));
453 thread_->join();
454 LOG(INFO) << type_ << " shutdown completed";
455 running_ = false;
456 }
457
458 const int port_;
459 grpc::string type_;
460 T service_;
461 std::unique_ptr<Server> server_;
462 std::unique_ptr<std::thread> thread_;
463 bool running_ = false;
464 };
465
466 std::vector<std::unique_ptr<ServerThread<MyTestServiceImpl>>> backends_;
467 std::string rls_server_target_;
468 std::unique_ptr<ServerThread<RlsServiceImpl>> rls_server_;
469 std::unique_ptr<FakeResolverResponseGeneratorWrapper>
470 resolver_response_generator_;
471 std::string target_uri_;
472 std::shared_ptr<grpc::Channel> channel_;
473 std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
474 };
475
// One header-based key builder, one RLS lookup, one backend RPC.
TEST_F(RlsEnd2endTest, Basic) {
  StartBackends(1);
  const std::string key_builder = absl::StrFormat(
      "\"names\":[{"
      " \"service\":\"%s\","
      " \"method\":\"%s\""
      "}],"
      "\"headers\":["
      " {"
      " \"key\":\"%s\","
      " \"names\":["
      " \"key1\""
      " ]"
      " }"
      "]",
      kServiceValue, kMethodValue, kTestKey);
  SetNextResolution(
      MakeServiceConfigBuilder().AddKeyBuilder(key_builder).Build());
  auto& rls_service = rls_server_->service_;
  auto& backend_service = backends_[0]->service_;
  const std::string backend_uri = grpc_core::LocalIpUri(backends_[0]->port_);
  rls_service.SetResponse(BuildRlsRequest({{kTestKey, kTestValue}}),
                          BuildRlsResponse({backend_uri}));
  CheckRpcSendOk(DEBUG_LOCATION,
                 RpcOptions().set_metadata({{"key1", kTestValue}}));
  EXPECT_EQ(rls_service.request_count(), 1);
  EXPECT_EQ(rls_service.response_count(), 1);
  EXPECT_EQ(backend_service.request_count(), 1);
  // The RLS response carried no header data, so the backend must not have
  // recorded any RLS header.
  EXPECT_THAT(backend_service.rls_data(), ::testing::ElementsAre());
}
505
// A header that appears twice in the request is expected to reach the RLS
// server as a single comma-joined value.
TEST_F(RlsEnd2endTest, DuplicateHeadersAreMerged) {
  const char* kTestValue2 = "test_value_2";
  StartBackends(1);
  const std::string key_builder = absl::StrFormat(
      "\"names\":[{"
      " \"service\":\"%s\","
      " \"method\":\"%s\""
      "}],"
      "\"headers\":["
      " {"
      " \"key\":\"%s\","
      " \"names\":["
      " \"key1\""
      " ]"
      " }"
      "]",
      kServiceValue, kMethodValue, kTestKey);
  SetNextResolution(
      MakeServiceConfigBuilder().AddKeyBuilder(key_builder).Build());
  auto& rls_service = rls_server_->service_;
  const std::string merged_value =
      absl::StrCat(kTestValue, ",", kTestValue2);
  const std::string backend_uri = grpc_core::LocalIpUri(backends_[0]->port_);
  rls_service.SetResponse(BuildRlsRequest({{kTestKey, merged_value}}),
                          BuildRlsResponse({backend_uri}));
  // Send "key1" twice with different values.
  CheckRpcSendOk(
      DEBUG_LOCATION,
      RpcOptions().set_metadata({{"key1", kTestValue}, {"key1", kTestValue2}}));
  EXPECT_EQ(rls_service.request_count(), 1);
  EXPECT_EQ(rls_service.response_count(), 1);
  EXPECT_EQ(backends_[0]->service_.request_count(), 1);
}
536
// The key builder lists two header names; the request only carries the
// second one, which should still populate the key.
TEST_F(RlsEnd2endTest, SecondHeaderUsed) {
  StartBackends(1);
  const std::string key_builder = absl::StrFormat(
      "\"names\":[{"
      " \"service\":\"%s\","
      " \"method\":\"%s\""
      "}],"
      "\"headers\":["
      " {"
      " \"key\":\"%s\","
      " \"names\":["
      " \"key1\", \"key2\""
      " ]"
      " }"
      "]",
      kServiceValue, kMethodValue, kTestKey);
  SetNextResolution(
      MakeServiceConfigBuilder().AddKeyBuilder(key_builder).Build());
  auto& rls_service = rls_server_->service_;
  const std::string backend_uri = grpc_core::LocalIpUri(backends_[0]->port_);
  rls_service.SetResponse(BuildRlsRequest({{kTestKey, kTestValue}}),
                          BuildRlsResponse({backend_uri}));
  CheckRpcSendOk(DEBUG_LOCATION,
                 RpcOptions().set_metadata({{"key2", kTestValue}}));
  EXPECT_EQ(rls_service.request_count(), 1);
  EXPECT_EQ(rls_service.response_count(), 1);
  EXPECT_EQ(backends_[0]->service_.request_count(), 1);
}
564
// Two header entries in the key builder produce two keys in the RLS
// request.
TEST_F(RlsEnd2endTest, MultipleHeaderKeys) {
  const char* kTestKey2 = "test_key_2";
  const char* kTestValue2 = "test_value_2";
  StartBackends(1);
  const std::string key_builder = absl::StrFormat(
      "\"names\":[{"
      " \"service\":\"%s\","
      " \"method\":\"%s\""
      "}],"
      "\"headers\":["
      " {"
      " \"key\":\"%s\","
      " \"names\":["
      " \"key1\""
      " ]"
      " },"
      " {"
      " \"key\":\"%s\","
      " \"names\":["
      " \"key2\""
      " ]"
      " }"
      "]",
      kServiceValue, kMethodValue, kTestKey, kTestKey2);
  SetNextResolution(
      MakeServiceConfigBuilder().AddKeyBuilder(key_builder).Build());
  auto& rls_service = rls_server_->service_;
  auto& backend_service = backends_[0]->service_;
  const std::string backend_uri = grpc_core::LocalIpUri(backends_[0]->port_);
  rls_service.SetResponse(BuildRlsRequest({
                              {kTestKey, kTestValue},
                              {kTestKey2, kTestValue2},
                          }),
                          BuildRlsResponse({backend_uri}));
  CheckRpcSendOk(
      DEBUG_LOCATION,
      RpcOptions().set_metadata({{"key1", kTestValue}, {"key2", kTestValue2}}));
  EXPECT_EQ(rls_service.request_count(), 1);
  EXPECT_EQ(rls_service.response_count(), 1);
  EXPECT_EQ(backend_service.request_count(), 1);
  // The RLS response carried no header data, so the backend must not have
  // recorded any RLS header.
  EXPECT_THAT(backend_service.rls_data(), ::testing::ElementsAre());
}
606
// The request lacks the header named by the key builder, so the RLS
// request is expected to have an empty key map.
TEST_F(RlsEnd2endTest, NoHeaderMatch) {
  StartBackends(1);
  const std::string key_builder = absl::StrFormat(
      "\"names\":[{"
      " \"service\":\"%s\","
      " \"method\":\"%s\""
      "}],"
      "\"headers\":["
      " {"
      " \"key\":\"%s\","
      " \"names\":["
      " \"key1\""
      " ]"
      " }"
      "]",
      kServiceValue, kMethodValue, kTestKey);
  SetNextResolution(
      MakeServiceConfigBuilder().AddKeyBuilder(key_builder).Build());
  auto& rls_service = rls_server_->service_;
  const std::string backend_uri = grpc_core::LocalIpUri(backends_[0]->port_);
  rls_service.SetResponse(BuildRlsRequest({}),
                          BuildRlsResponse({backend_uri}));
  // No "key1" metadata is sent, so kTestKey is not added to the request.
  CheckRpcSendOk(DEBUG_LOCATION);
  EXPECT_EQ(rls_service.request_count(), 1);
  EXPECT_EQ(rls_service.response_count(), 1);
  EXPECT_EQ(backends_[0]->service_.request_count(), 1);
}
634
// The key builder names only the service (no method) and is still applied
// to the Echo call.
TEST_F(RlsEnd2endTest, WildcardMethod) {
  StartBackends(1);
  const std::string key_builder = absl::StrFormat(
      "\"names\":[{"
      " \"service\":\"%s\""
      "}],"
      "\"headers\":["
      " {"
      " \"key\":\"%s\","
      " \"names\":["
      " \"key1\""
      " ]"
      " }"
      "]",
      kServiceValue, kTestKey);
  SetNextResolution(
      MakeServiceConfigBuilder().AddKeyBuilder(key_builder).Build());
  auto& rls_service = rls_server_->service_;
  const std::string backend_uri = grpc_core::LocalIpUri(backends_[0]->port_);
  rls_service.SetResponse(BuildRlsRequest({{kTestKey, kTestValue}}),
                          BuildRlsResponse({backend_uri}));
  CheckRpcSendOk(DEBUG_LOCATION,
                 RpcOptions().set_metadata({{"key1", kTestValue}}));
  EXPECT_EQ(rls_service.request_count(), 1);
  EXPECT_EQ(rls_service.response_count(), 1);
  EXPECT_EQ(backends_[0]->service_.request_count(), 1);
}
660
// The only key builder is for a different method, so the RLS request for
// Echo is expected to have an empty key map.
TEST_F(RlsEnd2endTest, NoKeyBuilderForMethod) {
  StartBackends(1);
  const std::string key_builder = absl::StrFormat(
      "\"names\":[{"
      " \"service\":\"%s\","
      " \"method\":\"some_other_method\""
      "}],"
      "\"headers\":["
      " {"
      " \"key\":\"%s\","
      " \"names\":["
      " \"key1\""
      " ]"
      " }"
      "]",
      kServiceValue, kTestKey);
  SetNextResolution(
      MakeServiceConfigBuilder().AddKeyBuilder(key_builder).Build());
  auto& rls_service = rls_server_->service_;
  const std::string backend_uri = grpc_core::LocalIpUri(backends_[0]->port_);
  rls_service.SetResponse(BuildRlsRequest({}),
                          BuildRlsResponse({backend_uri}));
  CheckRpcSendOk(DEBUG_LOCATION);
  EXPECT_EQ(rls_service.request_count(), 1);
  EXPECT_EQ(rls_service.response_count(), 1);
  EXPECT_EQ(backends_[0]->service_.request_count(), 1);
}
687
// Header data returned by the RLS server must be seen by the backend.
TEST_F(RlsEnd2endTest, HeaderData) {
  const char* kHeaderData = "header_data";
  StartBackends(1);
  const std::string key_builder = absl::StrFormat(
      "\"names\":[{"
      " \"service\":\"%s\","
      " \"method\":\"%s\""
      "}],"
      "\"headers\":["
      " {"
      " \"key\":\"%s\","
      " \"names\":["
      " \"key1\""
      " ]"
      " }"
      "]",
      kServiceValue, kMethodValue, kTestKey);
  SetNextResolution(
      MakeServiceConfigBuilder().AddKeyBuilder(key_builder).Build());
  auto& rls_service = rls_server_->service_;
  auto& backend_service = backends_[0]->service_;
  const std::string backend_uri = grpc_core::LocalIpUri(backends_[0]->port_);
  rls_service.SetResponse(BuildRlsRequest({{kTestKey, kTestValue}}),
                          BuildRlsResponse({backend_uri}, kHeaderData));
  CheckRpcSendOk(DEBUG_LOCATION,
                 RpcOptions().set_metadata({{"key1", kTestValue}}));
  EXPECT_EQ(rls_service.request_count(), 1);
  EXPECT_EQ(rls_service.response_count(), 1);
  EXPECT_EQ(backend_service.request_count(), 1);
  EXPECT_THAT(backend_service.rls_data(),
              ::testing::ElementsAre(kHeaderData));
}
719
// extraKeys (host/service/method) and constantKeys must all show up in
// the RLS request alongside the header-derived key.
TEST_F(RlsEnd2endTest, ExtraKeysAndConstantKeys) {
  StartBackends(1);
  const std::string key_builder = absl::StrFormat(
      "\"names\":[{"
      " \"service\":\"%s\","
      " \"method\":\"%s\""
      "}],"
      "\"headers\":["
      " {"
      " \"key\":\"%s\","
      " \"names\":["
      " \"key1\",\"key2\",\"key3\""
      " ]"
      " }"
      "],"
      "\"extraKeys\":{"
      " \"host\":\"%s\","
      " \"service\":\"%s\","
      " \"method\":\"%s\""
      "},"
      "\"constantKeys\":{"
      " \"%s\":\"%s\""
      "}",
      kServiceValue, kMethodValue, kTestKey, kHostKey, kServiceKey,
      kMethodKey, kConstantKey, kConstantValue);
  SetNextResolution(
      MakeServiceConfigBuilder().AddKeyBuilder(key_builder).Build());
  auto& rls_service = rls_server_->service_;
  const std::string backend_uri = grpc_core::LocalIpUri(backends_[0]->port_);
  rls_service.SetResponse(BuildRlsRequest({
                              {kTestKey, kTestValue},
                              {kHostKey, kServerName},
                              {kServiceKey, kServiceValue},
                              {kMethodKey, kMethodValue},
                              {kConstantKey, kConstantValue},
                          }),
                          BuildRlsResponse({backend_uri}));
  CheckRpcSendOk(DEBUG_LOCATION,
                 RpcOptions().set_metadata({{"key1", kTestValue}}));
  EXPECT_EQ(rls_service.request_count(), 1);
  EXPECT_EQ(rls_service.response_count(), 1);
  EXPECT_EQ(backends_[0]->service_.request_count(), 1);
}
763
// Two different keys that both map to the same backend: each key needs
// its own RLS lookup, and both RPCs land on the one backend.
TEST_F(RlsEnd2endTest, TwoCacheEntriesWithSameTarget) {
  const char* kTestValue2 = "test_value2";
  StartBackends(1);
  const std::string key_builder = absl::StrFormat(
      "\"names\":[{"
      " \"service\":\"%s\","
      " \"method\":\"%s\""
      "}],"
      "\"headers\":["
      " {"
      " \"key\":\"%s\","
      " \"names\":["
      " \"key1\""
      " ]"
      " }"
      "]",
      kServiceValue, kMethodValue, kTestKey);
  SetNextResolution(
      MakeServiceConfigBuilder().AddKeyBuilder(key_builder).Build());
  auto& rls_service = rls_server_->service_;
  auto& backend_service = backends_[0]->service_;
  const std::string backend_uri = grpc_core::LocalIpUri(backends_[0]->port_);
  rls_service.SetResponse(BuildRlsRequest({{kTestKey, kTestValue}}),
                          BuildRlsResponse({backend_uri}));
  rls_service.SetResponse(BuildRlsRequest({{kTestKey, kTestValue2}}),
                          BuildRlsResponse({backend_uri}));
  // First key.
  CheckRpcSendOk(DEBUG_LOCATION,
                 RpcOptions().set_metadata({{"key1", kTestValue}}));
  EXPECT_EQ(rls_service.request_count(), 1);
  EXPECT_EQ(rls_service.response_count(), 1);
  EXPECT_EQ(backend_service.request_count(), 1);
  // Second key.
  CheckRpcSendOk(DEBUG_LOCATION,
                 RpcOptions().set_metadata({{"key1", kTestValue2}}));
  EXPECT_EQ(rls_service.request_count(), 2);
  EXPECT_EQ(rls_service.response_count(), 2);
  EXPECT_EQ(backend_service.request_count(), 2);
}
800
// With no default target configured, a failed RLS lookup fails the data
// plane RPC; once the RLS server has a response and backoff has elapsed,
// the same key succeeds.
TEST_F(RlsEnd2endTest, FailedRlsRequestWithoutDefaultTarget) {
  StartBackends(1);
  const std::string key_builder = absl::StrFormat(
      "\"names\":[{"
      " \"service\":\"%s\","
      " \"method\":\"%s\""
      "}],"
      "\"headers\":["
      " {"
      " \"key\":\"%s\","
      " \"names\":["
      " \"key1\""
      " ]"
      " }"
      "]",
      kServiceValue, kMethodValue, kTestKey);
  SetNextResolution(
      MakeServiceConfigBuilder().AddKeyBuilder(key_builder).Build());
  auto& rls_service = rls_server_->service_;
  const std::string backend_uri = grpc_core::LocalIpUri(backends_[0]->port_);
  // The real test has one failing RLS RPC followed by one that must
  // succeed. After a failure, the adaptive throttling code could throttle
  // the follow-up RPC with some probability (about 11%), which would make
  // the test flaky. Seeding the throttler with two successful RPCs first
  // makes the throttling probability negative, so the follow-up RPC is
  // never throttled.
  const char* kTestValue2 = "test_value_2";
  const char* kTestValue3 = "test_value_3";
  rls_service.SetResponse(BuildRlsRequest({{kTestKey, kTestValue2}}),
                          BuildRlsResponse({backend_uri}));
  rls_service.SetResponse(BuildRlsRequest({{kTestKey, kTestValue3}}),
                          BuildRlsResponse({backend_uri}));
  CheckRpcSendOk(DEBUG_LOCATION,
                 RpcOptions().set_metadata({{"key1", kTestValue2}}));
  CheckRpcSendOk(DEBUG_LOCATION,
                 RpcOptions().set_metadata({{"key1", kTestValue3}}));
  // Now start the real test.
  // The RLS server has no response for this key yet, so the RLS request
  // fails and the data plane RPC fails with it.
  CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::UNAVAILABLE,
                      "RLS request failed: INTERNAL: no response entry",
                      RpcOptions().set_metadata({{"key1", kTestValue}}));
  const std::string expected_unmatched_request =
      BuildRlsRequest({{kTestKey, kTestValue}}).DebugString();
  EXPECT_THAT(
      rls_service.GetUnmatchedRequests(),
      ::testing::ElementsAre(
          // TODO(roth): Change this to use ::testing::ProtoEquals()
          // once that becomes available in OSS.
          ::testing::Property(&RouteLookupRequest::DebugString,
                              expected_unmatched_request)));
  // Now give the RLS server the right response.
  rls_service.SetResponse(BuildRlsRequest({{kTestKey, kTestValue}}),
                          BuildRlsResponse({backend_uri}));
  // Sleep long enough for backoff to elapse, then try another RPC.
  gpr_sleep_until(grpc_timeout_seconds_to_deadline(3));
  CheckRpcSendOk(DEBUG_LOCATION,
                 RpcOptions().set_metadata({{"key1", kTestValue}}));
  EXPECT_EQ(rls_service.request_count(), 4);
  EXPECT_EQ(rls_service.response_count(), 3);
  EXPECT_EQ(backends_[0]->service_.request_count(), 3);
}
866
// With a default target configured, a failed RLS lookup sends the data
// plane RPC to the default target instead of failing it.
TEST_F(RlsEnd2endTest, FailedRlsRequestWithDefaultTarget) {
  StartBackends(1);
  const std::string key_builder = absl::StrFormat(
      "\"names\":[{"
      " \"service\":\"%s\","
      " \"method\":\"%s\""
      "}],"
      "\"headers\":["
      " {"
      " \"key\":\"%s\","
      " \"names\":["
      " \"key1\""
      " ]"
      " }"
      "]",
      kServiceValue, kMethodValue, kTestKey);
  SetNextResolution(
      MakeServiceConfigBuilder()
          .AddKeyBuilder(key_builder)
          .set_default_target(grpc_core::LocalIpUri(backends_[0]->port_))
          .Build());
  auto& rls_service = rls_server_->service_;
  // The RLS server is given no response, so the RLS request fails and the
  // RPC is expected to go to the default target.
  CheckRpcSendOk(DEBUG_LOCATION,
                 RpcOptions().set_metadata({{"key1", kTestValue}}));
  const std::string expected_unmatched_request =
      BuildRlsRequest({{kTestKey, kTestValue}}).DebugString();
  EXPECT_THAT(
      rls_service.GetUnmatchedRequests(),
      ::testing::ElementsAre(
          // TODO(roth): Change this to use ::testing::ProtoEquals()
          // once that becomes available in OSS.
          ::testing::Property(&RouteLookupRequest::DebugString,
                              expected_unmatched_request)));
  EXPECT_EQ(rls_service.request_count(), 1);
  EXPECT_EQ(rls_service.response_count(), 0);
  EXPECT_EQ(backends_[0]->service_.request_count(), 1);
}
902
// Checks that an RLS response arriving after the configured lookup
// timeout is ignored and the RPC goes to the default target instead.
TEST_F(RlsEnd2endTest, RlsRequestTimeout) {
  StartBackends(2);
  const std::string rls_target = grpc_core::LocalIpUri(backends_[0]->port_);
  const std::string default_target =
      grpc_core::LocalIpUri(backends_[1]->port_);
  const std::string key_builder =
      absl::StrFormat("\"names\":[{"
                      " \"service\":\"%s\","
                      " \"method\":\"%s\""
                      "}],"
                      "\"headers\":["
                      " {"
                      " \"key\":\"%s\","
                      " \"names\":["
                      " \"key1\""
                      " ]"
                      " }"
                      "]",
                      kServiceValue, kMethodValue, kTestKey);
  SetNextResolution(
      MakeServiceConfigBuilder()
          .AddKeyBuilder(key_builder)
          .set_default_target(default_target)
          .set_lookup_service_timeout(grpc_core::Duration::Seconds(2))
          .Build());
  // The RLS server answers after 3 seconds, which exceeds the 2 second
  // lookup timeout set in the LB policy config above.
  rls_server_->service_.SetResponse(
      BuildRlsRequest({{kTestKey, kTestValue}}),
      BuildRlsResponse({rls_target}),
      /*response_delay=*/grpc_core::Duration::Seconds(3));
  // The RPC is expected to land on the default target (backend 1).
  CheckRpcSendOk(DEBUG_LOCATION,
                 RpcOptions().set_metadata({{"key1", kTestValue}}));
  EXPECT_EQ(rls_server_->service_.request_count(), 1);
  EXPECT_EQ(backends_[0]->service_.request_count(), 0);
  EXPECT_EQ(backends_[1]->service_.request_count(), 1);
}
936
// Checks that changing the default target through a config update takes
// effect for subsequent RPCs.
TEST_F(RlsEnd2endTest, UpdateConfig) {
  StartBackends(2);
  const std::string first_default = grpc_core::LocalIpUri(backends_[0]->port_);
  const std::string second_default =
      grpc_core::LocalIpUri(backends_[1]->port_);
  const std::string key_builder =
      absl::StrFormat("\"names\":[{"
                      " \"service\":\"%s\","
                      " \"method\":\"%s\""
                      "}],"
                      "\"headers\":["
                      " {"
                      " \"key\":\"%s\","
                      " \"names\":["
                      " \"key1\""
                      " ]"
                      " }"
                      "]",
                      kServiceValue, kMethodValue, kTestKey);
  auto config_builder = MakeServiceConfigBuilder()
                            .AddKeyBuilder(key_builder)
                            .set_default_target(first_default);
  SetNextResolution(config_builder.Build());
  // No RLS response is registered, so the lookup fails and the RPC is
  // expected to be sent to the first default target.
  CheckRpcSendOk(DEBUG_LOCATION,
                 RpcOptions().set_metadata({{"key1", kTestValue}}));
  // TODO(roth): Switch to ::testing::ProtoEquals() once it is available
  // in OSS.
  const std::string expected_request =
      BuildRlsRequest({{kTestKey, kTestValue}}).DebugString();
  EXPECT_THAT(rls_server_->service_.GetUnmatchedRequests(),
              ::testing::ElementsAre(::testing::Property(
                  &RouteLookupRequest::DebugString, expected_request)));
  EXPECT_EQ(rls_server_->service_.request_count(), 1);
  EXPECT_EQ(rls_server_->service_.response_count(), 0);
  EXPECT_EQ(backends_[0]->service_.request_count(), 1);
  EXPECT_EQ(backends_[1]->service_.request_count(), 0);
  // Push a new config whose default target is the second backend.
  config_builder.set_default_target(second_default);
  SetNextResolution(config_builder.Build());
  // The next RPC should reach the new default target. The RLS server is
  // *not* expected to see another request, since the cache entry is
  // still in backoff.
  CheckRpcSendOk(DEBUG_LOCATION,
                 RpcOptions().set_metadata({{"key1", kTestValue}}));
  EXPECT_EQ(rls_server_->service_.request_count(), 1);
  EXPECT_EQ(rls_server_->service_.response_count(), 0);
  EXPECT_EQ(backends_[0]->service_.request_count(), 1);
  EXPECT_EQ(backends_[1]->service_.request_count(), 1);
}
986
// Checks that a second RPC with the same key is served from the RLS
// cache rather than triggering another lookup.
TEST_F(RlsEnd2endTest, CachedResponse) {
  StartBackends(1);
  const std::string key_builder =
      absl::StrFormat("\"names\":[{"
                      " \"service\":\"%s\","
                      " \"method\":\"%s\""
                      "}],"
                      "\"headers\":["
                      " {"
                      " \"key\":\"%s\","
                      " \"names\":["
                      " \"key1\""
                      " ]"
                      " }"
                      "]",
                      kServiceValue, kMethodValue, kTestKey);
  SetNextResolution(
      MakeServiceConfigBuilder().AddKeyBuilder(key_builder).Build());
  const std::string backend_target =
      grpc_core::LocalIpUri(backends_[0]->port_);
  rls_server_->service_.SetResponse(BuildRlsRequest({{kTestKey, kTestValue}}),
                                    BuildRlsResponse({backend_target}));
  // Issue the same RPC twice.
  CheckRpcSendOk(DEBUG_LOCATION,
                 RpcOptions().set_metadata({{"key1", kTestValue}}));
  CheckRpcSendOk(DEBUG_LOCATION,
                 RpcOptions().set_metadata({{"key1", kTestValue}}));
  // Only a single lookup should have reached the RLS server, while both
  // RPCs reached the backend.
  EXPECT_EQ(rls_server_->service_.request_count(), 1);
  EXPECT_EQ(rls_server_->service_.response_count(), 1);
  EXPECT_EQ(backends_[0]->service_.request_count(), 2);
}
1018
// Checks that a stale (but unexpired) cache entry is still used for the
// pick while a refresh lookup with REASON_STALE is sent to the RLS server.
TEST_F(RlsEnd2endTest, StaleCacheEntry) {
  StartBackends(1);
  const std::string backend_target =
      grpc_core::LocalIpUri(backends_[0]->port_);
  const std::string key_builder =
      absl::StrFormat("\"names\":[{"
                      " \"service\":\"%s\","
                      " \"method\":\"%s\""
                      "}],"
                      "\"headers\":["
                      " {"
                      " \"key\":\"%s\","
                      " \"names\":["
                      " \"key1\""
                      " ]"
                      " }"
                      "]",
                      kServiceValue, kMethodValue, kTestKey);
  SetNextResolution(MakeServiceConfigBuilder()
                        .AddKeyBuilder(key_builder)
                        .set_max_age(grpc_core::Duration::Seconds(5))
                        .set_stale_age(grpc_core::Duration::Seconds(1))
                        .Build());
  rls_server_->service_.SetResponse(BuildRlsRequest({{kTestKey, kTestValue}}),
                                    BuildRlsResponse({backend_target}));
  // First RPC: one RLS lookup, and the RPC reaches the backend.
  CheckRpcSendOk(DEBUG_LOCATION,
                 RpcOptions().set_metadata({{"key1", kTestValue}}));
  EXPECT_EQ(rls_server_->service_.request_count(), 1);
  EXPECT_EQ(rls_server_->service_.response_count(), 1);
  EXPECT_EQ(backends_[0]->service_.request_count(), 1);
  // Swap the registered response so that only a request carrying
  // REASON_STALE will match.
  rls_server_->service_.RemoveResponse(
      BuildRlsRequest({{kTestKey, kTestValue}}));
  rls_server_->service_.SetResponse(
      BuildRlsRequest({{kTestKey, kTestValue}},
                      RouteLookupRequest::REASON_STALE),
      BuildRlsResponse({backend_target}));
  // Sleep past the 1 second stale age.
  const auto stale_deadline = grpc_timeout_seconds_to_deadline(2);
  gpr_sleep_until(stale_deadline);
  // Second RPC: should be served from the stale entry while a second
  // RLS request is dispatched.
  CheckRpcSendOk(DEBUG_LOCATION,
                 RpcOptions().set_metadata({{"key1", kTestValue}}));
  EXPECT_EQ(backends_[0]->service_.request_count(), 2);
  // Give the RLS server time to receive the second request.
  const auto refresh_deadline = grpc_timeout_seconds_to_deadline(2);
  gpr_sleep_until(refresh_deadline);
  EXPECT_EQ(rls_server_->service_.request_count(), 2);
  EXPECT_EQ(rls_server_->service_.response_count(), 2);
}
1067
// Same scenario as the stale-entry test, but the RLS response carries
// header data, which the REASON_STALE refresh request is expected to
// include as well.
TEST_F(RlsEnd2endTest, StaleCacheEntryWithHeaderData) {
  const char* kHeaderData = "header_data";
  StartBackends(1);
  const std::string backend_target =
      grpc_core::LocalIpUri(backends_[0]->port_);
  const std::string key_builder =
      absl::StrFormat("\"names\":[{"
                      " \"service\":\"%s\","
                      " \"method\":\"%s\""
                      "}],"
                      "\"headers\":["
                      " {"
                      " \"key\":\"%s\","
                      " \"names\":["
                      " \"key1\""
                      " ]"
                      " }"
                      "]",
                      kServiceValue, kMethodValue, kTestKey);
  SetNextResolution(MakeServiceConfigBuilder()
                        .AddKeyBuilder(key_builder)
                        .set_max_age(grpc_core::Duration::Seconds(5))
                        .set_stale_age(grpc_core::Duration::Seconds(1))
                        .Build());
  rls_server_->service_.SetResponse(
      BuildRlsRequest({{kTestKey, kTestValue}}),
      BuildRlsResponse({backend_target}, kHeaderData));
  // First RPC: one RLS lookup, and the RPC reaches the backend.
  CheckRpcSendOk(DEBUG_LOCATION,
                 RpcOptions().set_metadata({{"key1", kTestValue}}));
  EXPECT_EQ(rls_server_->service_.request_count(), 1);
  EXPECT_EQ(rls_server_->service_.response_count(), 1);
  EXPECT_EQ(backends_[0]->service_.request_count(), 1);
  // Swap the registered response so that only a REASON_STALE request
  // that carries the header data will match.
  rls_server_->service_.RemoveResponse(
      BuildRlsRequest({{kTestKey, kTestValue}}));
  rls_server_->service_.SetResponse(
      BuildRlsRequest({{kTestKey, kTestValue}},
                      RouteLookupRequest::REASON_STALE, kHeaderData),
      BuildRlsResponse({backend_target}, kHeaderData));
  // Sleep past the 1 second stale age.
  const auto stale_deadline = grpc_timeout_seconds_to_deadline(2);
  gpr_sleep_until(stale_deadline);
  // Second RPC: should be served from the stale entry while a second
  // RLS request is dispatched.
  CheckRpcSendOk(DEBUG_LOCATION,
                 RpcOptions().set_metadata({{"key1", kTestValue}}));
  EXPECT_EQ(backends_[0]->service_.request_count(), 2);
  // Give the RLS server time to receive the second request.
  const auto refresh_deadline = grpc_timeout_seconds_to_deadline(2);
  gpr_sleep_until(refresh_deadline);
  EXPECT_EQ(rls_server_->service_.request_count(), 2);
  EXPECT_EQ(rls_server_->service_.response_count(), 2);
}
1119
// Checks that an expired cache entry is not used: a new lookup is sent,
// and if that lookup fails the data plane RPC fails too.
TEST_F(RlsEnd2endTest, ExpiredCacheEntry) {
  StartBackends(1);
  const std::string backend_target =
      grpc_core::LocalIpUri(backends_[0]->port_);
  const std::string key_builder =
      absl::StrFormat("\"names\":[{"
                      " \"service\":\"%s\","
                      " \"method\":\"%s\""
                      "}],"
                      "\"headers\":["
                      " {"
                      " \"key\":\"%s\","
                      " \"names\":["
                      " \"key1\""
                      " ]"
                      " }"
                      "]",
                      kServiceValue, kMethodValue, kTestKey);
  SetNextResolution(
      MakeServiceConfigBuilder()
          .AddKeyBuilder(key_builder)
          .set_max_age(grpc_core::Duration::Seconds(1))
          .set_lookup_service_timeout(grpc_core::Duration::Seconds(1))
          .Build());
  rls_server_->service_.SetResponse(BuildRlsRequest({{kTestKey, kTestValue}}),
                                    BuildRlsResponse({backend_target}));
  // First RPC: one RLS lookup, and the RPC reaches the backend.
  CheckRpcSendOk(DEBUG_LOCATION,
                 RpcOptions().set_metadata({{"key1", kTestValue}}));
  EXPECT_EQ(rls_server_->service_.request_count(), 1);
  EXPECT_EQ(rls_server_->service_.response_count(), 1);
  EXPECT_EQ(backends_[0]->service_.request_count(), 1);
  // Drop the registered response so the next lookup fails.
  rls_server_->service_.RemoveResponse(
      BuildRlsRequest({{kTestKey, kTestValue}}));
  // Sleep past the 1 second max age so the entry expires.
  const auto expiry_deadline = grpc_timeout_seconds_to_deadline(2);
  gpr_sleep_until(expiry_deadline);
  // Second RPC: a new RLS request is sent, it fails, and therefore the
  // RPC fails as well.
  CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::UNAVAILABLE,
                      "RLS request failed: INTERNAL: no response entry",
                      RpcOptions().set_metadata({{"key1", kTestValue}}));
  EXPECT_EQ(rls_server_->service_.request_count(), 2);
  EXPECT_EQ(rls_server_->service_.response_count(), 1);
  EXPECT_EQ(backends_[0]->service_.request_count(), 1);
}
1163
// Exercises size-based cache eviction with a cache too small to hold a
// single entry, including the protection given by min_eviction_time.
TEST_F(RlsEnd2endTest, CacheSizeLimit) {
  const char* kSecondValue = "test_value_2";
  StartBackends(2);
  const std::string target0 = grpc_core::LocalIpUri(backends_[0]->port_);
  const std::string target1 = grpc_core::LocalIpUri(backends_[1]->port_);
  const std::string key_builder =
      absl::StrFormat("\"names\":[{"
                      " \"service\":\"%s\","
                      " \"method\":\"%s\""
                      "}],"
                      "\"headers\":["
                      " {"
                      " \"key\":\"%s\","
                      " \"names\":["
                      " \"key1\""
                      " ]"
                      " }"
                      "]",
                      kServiceValue, kMethodValue, kTestKey);
  SetNextResolution(MakeServiceConfigBuilder()
                        .AddKeyBuilder(key_builder)
                        // Too small to fit even one entry.
                        .set_cache_size_bytes(1)
                        .Build());
  // Register RLS responses for both key values, each mapping to its own
  // backend.
  rls_server_->service_.SetResponse(BuildRlsRequest({{kTestKey, kTestValue}}),
                                    BuildRlsResponse({target0}));
  rls_server_->service_.SetResponse(
      BuildRlsRequest({{kTestKey, kSecondValue}}),
      BuildRlsResponse({target1}));
  // RPC for kTestValue: one RLS lookup, RPC lands on backend 0.
  CheckRpcSendOk(DEBUG_LOCATION,
                 RpcOptions().set_metadata({{"key1", kTestValue}}));
  EXPECT_EQ(rls_server_->service_.request_count(), 1);
  EXPECT_EQ(rls_server_->service_.response_count(), 1);
  EXPECT_EQ(backends_[0]->service_.request_count(), 1);
  EXPECT_EQ(backends_[1]->service_.request_count(), 0);
  // Repeat for kTestValue: no new RLS lookup is expected, because
  // min_eviction_time keeps the entry in the cache.
  CheckRpcSendOk(DEBUG_LOCATION,
                 RpcOptions().set_metadata({{"key1", kTestValue}}));
  EXPECT_EQ(rls_server_->service_.request_count(), 1);
  EXPECT_EQ(rls_server_->service_.response_count(), 1);
  EXPECT_EQ(backends_[0]->service_.request_count(), 2);
  EXPECT_EQ(backends_[1]->service_.request_count(), 0);
  // Let min_eviction_time pass.
  const auto eviction_deadline = grpc_timeout_seconds_to_deadline(6);
  gpr_sleep_until(eviction_deadline);
  // RPC for the second value: one RLS lookup, RPC lands on backend 1.
  // Inserting this entry should evict the entry for kTestValue.
  CheckRpcSendOk(DEBUG_LOCATION,
                 RpcOptions().set_metadata({{"key1", kSecondValue}}));
  EXPECT_EQ(rls_server_->service_.request_count(), 2);
  EXPECT_EQ(rls_server_->service_.response_count(), 2);
  EXPECT_EQ(backends_[0]->service_.request_count(), 2);
  EXPECT_EQ(backends_[1]->service_.request_count(), 1);
  // RPC for kTestValue again: its entry is gone, so a fresh RLS lookup
  // is expected.
  CheckRpcSendOk(DEBUG_LOCATION,
                 RpcOptions().set_metadata({{"key1", kTestValue}}));
  EXPECT_EQ(rls_server_->service_.request_count(), 3);
  EXPECT_EQ(rls_server_->service_.response_count(), 3);
  EXPECT_EQ(backends_[0]->service_.request_count(), 3);
  EXPECT_EQ(backends_[1]->service_.request_count(), 1);
  // The second value's entry is still protected by min_eviction_time,
  // so this RPC should not cause another RLS lookup.
  CheckRpcSendOk(DEBUG_LOCATION,
                 RpcOptions().set_metadata({{"key1", kSecondValue}}));
  EXPECT_EQ(rls_server_->service_.request_count(), 3);
  EXPECT_EQ(rls_server_->service_.response_count(), 3);
  EXPECT_EQ(backends_[0]->service_.request_count(), 3);
  EXPECT_EQ(backends_[1]->service_.request_count(), 2);
}
1235
// Checks that when the RLS response lists several targets and the first
// one is usable, the RPC is sent to that first target.
TEST_F(RlsEnd2endTest, MultipleTargets) {
  StartBackends(1);
  const std::string key_builder =
      absl::StrFormat("\"names\":[{"
                      " \"service\":\"%s\","
                      " \"method\":\"%s\""
                      "}],"
                      "\"headers\":["
                      " {"
                      " \"key\":\"%s\","
                      " \"names\":["
                      " \"key1\""
                      " ]"
                      " }"
                      "]",
                      kServiceValue, kMethodValue, kTestKey);
  SetNextResolution(
      MakeServiceConfigBuilder().AddKeyBuilder(key_builder).Build());
  const std::string good_target = grpc_core::LocalIpUri(backends_[0]->port_);
  // The second target will report TRANSIENT_FAILURE, but it should never
  // be used.
  rls_server_->service_.SetResponse(
      BuildRlsRequest({{kTestKey, kTestValue}}),
      BuildRlsResponse({good_target, "invalid_target"}));
  CheckRpcSendOk(DEBUG_LOCATION,
                 RpcOptions().set_metadata({{"key1", kTestValue}}));
  EXPECT_EQ(rls_server_->service_.request_count(), 1);
  EXPECT_EQ(rls_server_->service_.response_count(), 1);
  EXPECT_EQ(backends_[0]->service_.request_count(), 1);
}
1266
// Checks that when the first target in the RLS response is unusable, the
// RPC is sent to the next target in the list.
TEST_F(RlsEnd2endTest, MultipleTargetsFirstInTransientFailure) {
  StartBackends(1);
  const std::string key_builder =
      absl::StrFormat("\"names\":[{"
                      " \"service\":\"%s\","
                      " \"method\":\"%s\""
                      "}],"
                      "\"headers\":["
                      " {"
                      " \"key\":\"%s\","
                      " \"names\":["
                      " \"key1\""
                      " ]"
                      " }"
                      "]",
                      kServiceValue, kMethodValue, kTestKey);
  SetNextResolution(
      MakeServiceConfigBuilder().AddKeyBuilder(key_builder).Build());
  const std::string good_target = grpc_core::LocalIpUri(backends_[0]->port_);
  // The first target will report TRANSIENT_FAILURE.
  rls_server_->service_.SetResponse(
      BuildRlsRequest({{kTestKey, kTestValue}}),
      BuildRlsResponse({"invalid_target", good_target}));
  CheckRpcSendOk(DEBUG_LOCATION,
                 RpcOptions().set_metadata({{"key1", kTestValue}}));
  EXPECT_EQ(rls_server_->service_.request_count(), 1);
  EXPECT_EQ(rls_server_->service_.response_count(), 1);
  EXPECT_EQ(backends_[0]->service_.request_count(), 1);
}
1296
// Checks that the channel reports READY once an RPC succeeds, even though
// one of the RLS targets is in TRANSIENT_FAILURE.
TEST_F(RlsEnd2endTest, ConnectivityStateReady) {
  StartBackends(1);
  const std::string key_builder =
      absl::StrFormat("\"names\":[{"
                      " \"service\":\"%s\","
                      " \"method\":\"%s\""
                      "}],"
                      "\"headers\":["
                      " {"
                      " \"key\":\"%s\","
                      " \"names\":["
                      " \"key1\""
                      " ]"
                      " }"
                      "]",
                      kServiceValue, kMethodValue, kTestKey);
  SetNextResolution(
      MakeServiceConfigBuilder().AddKeyBuilder(key_builder).Build());
  // Nothing has been sent yet, so the channel starts out IDLE.
  EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(/*try_to_connect=*/false));
  const std::string good_target = grpc_core::LocalIpUri(backends_[0]->port_);
  // One target in TRANSIENT_FAILURE, the other in READY.
  rls_server_->service_.SetResponse(
      BuildRlsRequest({{kTestKey, kTestValue}}),
      BuildRlsResponse({"invalid_target", good_target}));
  CheckRpcSendOk(DEBUG_LOCATION,
                 RpcOptions().set_metadata({{"key1", kTestValue}}));
  EXPECT_EQ(rls_server_->service_.request_count(), 1);
  EXPECT_EQ(rls_server_->service_.response_count(), 1);
  EXPECT_EQ(backends_[0]->service_.request_count(), 1);
  EXPECT_EQ(GRPC_CHANNEL_READY, channel_->GetState(/*try_to_connect=*/false));
}
1328
// Checks that the channel stays IDLE when the only RLS lookup fails and
// therefore no child policy is ever created.
TEST_F(RlsEnd2endTest, ConnectivityStateIdle) {
  const std::string key_builder =
      absl::StrFormat("\"names\":[{"
                      " \"service\":\"%s\","
                      " \"method\":\"%s\""
                      "}],"
                      "\"headers\":["
                      " {"
                      " \"key\":\"%s\","
                      " \"names\":["
                      " \"key1\""
                      " ]"
                      " }"
                      "]",
                      kServiceValue, kMethodValue, kTestKey);
  SetNextResolution(
      MakeServiceConfigBuilder().AddKeyBuilder(key_builder).Build());
  EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(/*try_to_connect=*/false));
  // The RLS server has no responses registered, so the lookup fails and
  // the RPC fails with it.
  CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::UNAVAILABLE,
                      "RLS request failed: INTERNAL: no response entry");
  // With no child policies, the channel is expected to remain IDLE.
  EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(/*try_to_connect=*/false));
}
1353
// Checks that the channel reports TRANSIENT_FAILURE when the only target
// returned by the RLS server is unusable.
TEST_F(RlsEnd2endTest, ConnectivityStateTransientFailure) {
  const std::string key_builder =
      absl::StrFormat("\"names\":[{"
                      " \"service\":\"%s\","
                      " \"method\":\"%s\""
                      "}],"
                      "\"headers\":["
                      " {"
                      " \"key\":\"%s\","
                      " \"names\":["
                      " \"key1\""
                      " ]"
                      " }"
                      "]",
                      kServiceValue, kMethodValue, kTestKey);
  SetNextResolution(
      MakeServiceConfigBuilder().AddKeyBuilder(key_builder).Build());
  EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(/*try_to_connect=*/false));
  // The lookup succeeds, but the single target it returns is invalid.
  rls_server_->service_.SetResponse(BuildRlsRequest({{kTestKey, kTestValue}}),
                                    BuildRlsResponse({"invalid_target"}));
  CheckRpcSendFailure(
      DEBUG_LOCATION, StatusCode::UNAVAILABLE,
      "empty address list (no address in fixed_address_lb policy)",
      RpcOptions().set_metadata({{"key1", kTestValue}}));
  auto& rls_service = rls_server_->service_;
  EXPECT_EQ(rls_service.request_count(), 1);
  EXPECT_EQ(rls_service.response_count(), 1);
  EXPECT_EQ(GRPC_CHANNEL_TRANSIENT_FAILURE,
            channel_->GetState(/*try_to_connect=*/false));
}
1383
1384 class RlsMetricsEnd2endTest : public RlsEnd2endTest {
1385 protected:
SetUp()1386 void SetUp() override {
1387 // Register stats plugin before initializing client.
1388 stats_plugin_ = grpc_core::FakeStatsPluginBuilder()
1389 .UseDisabledByDefaultMetrics(true)
1390 .BuildAndRegister();
1391 RlsEnd2endTest::SetUp();
1392 }
1393
1394 std::shared_ptr<grpc_core::FakeStatsPlugin> stats_plugin_;
1395 };
1396
// Checks the registered descriptor of the
// "grpc.lb.rls.default_target_picks" metric.
TEST_F(RlsMetricsEnd2endTest, MetricDefinitionDefaultTargetPicks) {
  using Registry = grpc_core::GlobalInstrumentsRegistry;
  const char* const kMetricName = "grpc.lb.rls.default_target_picks";
  const auto* descriptor =
      grpc_core::GlobalInstrumentsRegistryTestPeer::FindMetricDescriptorByName(
          kMetricName);
  // Everything below dereferences the descriptor, so stop if it is absent.
  ASSERT_NE(descriptor, nullptr);
  EXPECT_EQ(descriptor->value_type, Registry::ValueType::kUInt64);
  EXPECT_EQ(descriptor->instrument_type, Registry::InstrumentType::kCounter);
  EXPECT_FALSE(descriptor->enable_by_default);
  EXPECT_EQ(descriptor->name, kMetricName);
  EXPECT_EQ(descriptor->unit, "{pick}");
  EXPECT_THAT(descriptor->label_keys,
              ::testing::ElementsAre("grpc.target", "grpc.lb.rls.server_target",
                                     "grpc.lb.rls.data_plane_target",
                                     "grpc.lb.pick_result"));
  EXPECT_THAT(descriptor->optional_label_keys, ::testing::ElementsAre());
}
1415
// Checks the registered descriptor of the "grpc.lb.rls.target_picks"
// metric.
TEST_F(RlsMetricsEnd2endTest, MetricDefinitionTargetPicks) {
  using Registry = grpc_core::GlobalInstrumentsRegistry;
  const char* const kMetricName = "grpc.lb.rls.target_picks";
  const auto* descriptor =
      grpc_core::GlobalInstrumentsRegistryTestPeer::FindMetricDescriptorByName(
          kMetricName);
  // Everything below dereferences the descriptor, so stop if it is absent.
  ASSERT_NE(descriptor, nullptr);
  EXPECT_EQ(descriptor->value_type, Registry::ValueType::kUInt64);
  EXPECT_EQ(descriptor->instrument_type, Registry::InstrumentType::kCounter);
  EXPECT_FALSE(descriptor->enable_by_default);
  EXPECT_EQ(descriptor->name, kMetricName);
  EXPECT_EQ(descriptor->unit, "{pick}");
  EXPECT_THAT(descriptor->label_keys,
              ::testing::ElementsAre("grpc.target", "grpc.lb.rls.server_target",
                                     "grpc.lb.rls.data_plane_target",
                                     "grpc.lb.pick_result"));
  EXPECT_THAT(descriptor->optional_label_keys, ::testing::ElementsAre());
}
1434
// Checks the registered descriptor of the "grpc.lb.rls.failed_picks"
// metric.
TEST_F(RlsMetricsEnd2endTest, MetricDefinitionFailedPicks) {
  using Registry = grpc_core::GlobalInstrumentsRegistry;
  const char* const kMetricName = "grpc.lb.rls.failed_picks";
  const auto* descriptor =
      grpc_core::GlobalInstrumentsRegistryTestPeer::FindMetricDescriptorByName(
          kMetricName);
  // Everything below dereferences the descriptor, so stop if it is absent.
  ASSERT_NE(descriptor, nullptr);
  EXPECT_EQ(descriptor->value_type, Registry::ValueType::kUInt64);
  EXPECT_EQ(descriptor->instrument_type, Registry::InstrumentType::kCounter);
  EXPECT_FALSE(descriptor->enable_by_default);
  EXPECT_EQ(descriptor->name, kMetricName);
  EXPECT_EQ(descriptor->unit, "{pick}");
  EXPECT_THAT(
      descriptor->label_keys,
      ::testing::ElementsAre("grpc.target", "grpc.lb.rls.server_target"));
  EXPECT_THAT(descriptor->optional_label_keys, ::testing::ElementsAre());
}
1452
// Checks the registered descriptor of the "grpc.lb.rls.cache_entries"
// metric.
TEST_F(RlsMetricsEnd2endTest, MetricDefinitionCacheEntries) {
  using Registry = grpc_core::GlobalInstrumentsRegistry;
  const char* const kMetricName = "grpc.lb.rls.cache_entries";
  const auto* descriptor =
      grpc_core::GlobalInstrumentsRegistryTestPeer::FindMetricDescriptorByName(
          kMetricName);
  // Everything below dereferences the descriptor, so stop if it is absent.
  ASSERT_NE(descriptor, nullptr);
  EXPECT_EQ(descriptor->value_type, Registry::ValueType::kInt64);
  EXPECT_EQ(descriptor->instrument_type,
            Registry::InstrumentType::kCallbackGauge);
  EXPECT_FALSE(descriptor->enable_by_default);
  EXPECT_EQ(descriptor->name, kMetricName);
  EXPECT_EQ(descriptor->unit, "{entry}");
  EXPECT_THAT(descriptor->label_keys,
              ::testing::ElementsAre("grpc.target", "grpc.lb.rls.server_target",
                                     "grpc.lb.rls.instance_uuid"));
  EXPECT_THAT(descriptor->optional_label_keys, ::testing::ElementsAre());
}
1471
// Checks the registered descriptor of the "grpc.lb.rls.cache_size" metric.
TEST_F(RlsMetricsEnd2endTest, MetricDefinitionCacheSize) {
  using Registry = grpc_core::GlobalInstrumentsRegistry;
  const char* const kMetricName = "grpc.lb.rls.cache_size";
  const auto* descriptor =
      grpc_core::GlobalInstrumentsRegistryTestPeer::FindMetricDescriptorByName(
          kMetricName);
  // Everything below dereferences the descriptor, so stop if it is absent.
  ASSERT_NE(descriptor, nullptr);
  EXPECT_EQ(descriptor->value_type, Registry::ValueType::kInt64);
  EXPECT_EQ(descriptor->instrument_type,
            Registry::InstrumentType::kCallbackGauge);
  EXPECT_FALSE(descriptor->enable_by_default);
  EXPECT_EQ(descriptor->name, kMetricName);
  EXPECT_EQ(descriptor->unit, "By");
  EXPECT_THAT(descriptor->label_keys,
              ::testing::ElementsAre("grpc.target", "grpc.lb.rls.server_target",
                                     "grpc.lb.rls.instance_uuid"));
  EXPECT_THAT(descriptor->optional_label_keys, ::testing::ElementsAre());
}
1490
// Drives three RPCs through the RLS policy (two that resolve to distinct
// backends and one whose lookup fails) and, after each one, checks the
// values exported for the target_picks and failed_picks counters and the
// cache_entries and cache_size gauges.
TEST_F(RlsMetricsEnd2endTest, MetricValues) {
  auto kMetricTargetPicks =
      grpc_core::GlobalInstrumentsRegistryTestPeer::
          FindUInt64CounterHandleByName("grpc.lb.rls.target_picks")
              .value();
  auto kMetricFailedPicks =
      grpc_core::GlobalInstrumentsRegistryTestPeer::
          FindUInt64CounterHandleByName("grpc.lb.rls.failed_picks")
              .value();
  auto kMetricCacheEntries =
      grpc_core::GlobalInstrumentsRegistryTestPeer::
          FindCallbackInt64GaugeHandleByName("grpc.lb.rls.cache_entries")
              .value();
  auto kMetricCacheSize =
      grpc_core::GlobalInstrumentsRegistryTestPeer::
          FindCallbackInt64GaugeHandleByName("grpc.lb.rls.cache_size")
              .value();
  StartBackends(2);
  SetNextResolution(
      MakeServiceConfigBuilder()
          .AddKeyBuilder(absl::StrFormat("\"names\":[{"
                                         " \"service\":\"%s\","
                                         " \"method\":\"%s\""
                                         "}],"
                                         "\"headers\":["
                                         " {"
                                         " \"key\":\"%s\","
                                         " \"names\":["
                                         " \"key1\""
                                         " ]"
                                         " }"
                                         "]",
                                         kServiceValue, kMethodValue, kTestKey))
          .Build());
  // Each backend's URI is used both as the RLS key value and as the RLS
  // target returned for that key.
  const std::string rls_target0 = grpc_core::LocalIpUri(backends_[0]->port_);
  const std::string rls_target1 = grpc_core::LocalIpUri(backends_[1]->port_);
  // Send an RPC to the target for backend 0.
  rls_server_->service_.SetResponse(BuildRlsRequest({{kTestKey, rls_target0}}),
                                    BuildRlsResponse({rls_target0}));
  CheckRpcSendOk(DEBUG_LOCATION,
                 RpcOptions().set_metadata({{"key1", rls_target0}}));
  EXPECT_EQ(rls_server_->service_.request_count(), 1);
  EXPECT_EQ(rls_server_->service_.response_count(), 1);
  EXPECT_EQ(backends_[0]->service_.request_count(), 1);
  EXPECT_EQ(backends_[1]->service_.request_count(), 0);
  // Check exported metrics: one pick for target 0, nothing recorded yet
  // for target 1 or for failed picks.
  EXPECT_THAT(
      stats_plugin_->GetUInt64CounterValue(
          kMetricTargetPicks,
          {target_uri_, rls_server_target_, rls_target0, "complete"}, {}),
      ::testing::Optional(1));
  EXPECT_EQ(
      stats_plugin_->GetUInt64CounterValue(
          kMetricTargetPicks,
          {target_uri_, rls_server_target_, rls_target1, "complete"}, {}),
      absl::nullopt);
  EXPECT_EQ(stats_plugin_->GetUInt64CounterValue(
                kMetricFailedPicks, {target_uri_, rls_server_target_}, {}),
            absl::nullopt);
  // Callback gauges are read after explicitly triggering the callbacks.
  stats_plugin_->TriggerCallbacks();
  EXPECT_THAT(stats_plugin_->GetInt64CallbackGaugeValue(
                  kMetricCacheEntries,
                  {target_uri_, rls_server_target_, kRlsInstanceUuid}, {}),
              ::testing::Optional(1));
  auto cache_size = stats_plugin_->GetInt64CallbackGaugeValue(
      kMetricCacheSize, {target_uri_, rls_server_target_, kRlsInstanceUuid},
      {});
  EXPECT_THAT(cache_size, ::testing::Optional(::testing::Ge(1)));
  // Send an RPC to the target for backend 1.
  rls_server_->service_.SetResponse(BuildRlsRequest({{kTestKey, rls_target1}}),
                                    BuildRlsResponse({rls_target1}));
  CheckRpcSendOk(DEBUG_LOCATION,
                 RpcOptions().set_metadata({{"key1", rls_target1}}));
  EXPECT_EQ(rls_server_->service_.request_count(), 2);
  EXPECT_EQ(rls_server_->service_.response_count(), 2);
  EXPECT_EQ(backends_[0]->service_.request_count(), 1);
  EXPECT_EQ(backends_[1]->service_.request_count(), 1);
  // Check exported metrics: one pick per target, still no failed picks.
  EXPECT_THAT(
      stats_plugin_->GetUInt64CounterValue(
          kMetricTargetPicks,
          {target_uri_, rls_server_target_, rls_target0, "complete"}, {}),
      ::testing::Optional(1));
  EXPECT_THAT(
      stats_plugin_->GetUInt64CounterValue(
          kMetricTargetPicks,
          {target_uri_, rls_server_target_, rls_target1, "complete"}, {}),
      ::testing::Optional(1));
  EXPECT_EQ(stats_plugin_->GetUInt64CounterValue(
                kMetricFailedPicks, {target_uri_, rls_server_target_}, {}),
            absl::nullopt);
  stats_plugin_->TriggerCallbacks();
  EXPECT_THAT(stats_plugin_->GetInt64CallbackGaugeValue(
                  kMetricCacheEntries,
                  {target_uri_, rls_server_target_, kRlsInstanceUuid}, {}),
              ::testing::Optional(2));
  auto cache_size2 = stats_plugin_->GetInt64CallbackGaugeValue(
      kMetricCacheSize, {target_uri_, rls_server_target_, kRlsInstanceUuid},
      {});
  EXPECT_THAT(cache_size2, ::testing::Optional(::testing::Ge(2)));
  // The second entry must have grown the cache. Guarded so that a missing
  // value (already reported above) is not dereferenced.
  if (cache_size.has_value() && cache_size2.has_value()) {
    EXPECT_GT(*cache_size2, *cache_size);
  }
  // Send an RPC for which the RLS server has no response, which means
  // that the RLS request will fail.  There is no default target, so the
  // data plane RPC will fail.
  CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::UNAVAILABLE,
                      "RLS request failed: INTERNAL: no response entry",
                      RpcOptions().set_metadata({{"key1", kTestValue}}));
  EXPECT_THAT(
      rls_server_->service_.GetUnmatchedRequests(),
      ::testing::ElementsAre(
          // TODO(roth): Change this to use ::testing::ProtoEquals()
          // once that becomes available in OSS.
          ::testing::Property(
              &RouteLookupRequest::DebugString,
              BuildRlsRequest({{kTestKey, kTestValue}}).DebugString())));
  EXPECT_EQ(rls_server_->service_.request_count(), 3);
  EXPECT_EQ(rls_server_->service_.response_count(), 2);
  EXPECT_EQ(backends_[0]->service_.request_count(), 1);
  EXPECT_EQ(backends_[1]->service_.request_count(), 1);
  // Check exported metrics: target picks unchanged, one failed pick.
  EXPECT_THAT(
      stats_plugin_->GetUInt64CounterValue(
          kMetricTargetPicks,
          {target_uri_, rls_server_target_, rls_target0, "complete"}, {}),
      ::testing::Optional(1));
  EXPECT_THAT(
      stats_plugin_->GetUInt64CounterValue(
          kMetricTargetPicks,
          {target_uri_, rls_server_target_, rls_target1, "complete"}, {}),
      ::testing::Optional(1));
  EXPECT_THAT(stats_plugin_->GetUInt64CounterValue(
                  kMetricFailedPicks, {target_uri_, rls_server_target_}, {}),
              ::testing::Optional(1));
  stats_plugin_->TriggerCallbacks();
  EXPECT_THAT(stats_plugin_->GetInt64CallbackGaugeValue(
                  kMetricCacheEntries,
                  {target_uri_, rls_server_target_, kRlsInstanceUuid}, {}),
              ::testing::Optional(3));
  auto cache_size3 = stats_plugin_->GetInt64CallbackGaugeValue(
      kMetricCacheSize, {target_uri_, rls_server_target_, kRlsInstanceUuid},
      {});
  EXPECT_THAT(cache_size3, ::testing::Optional(::testing::Ge(3)));
  // Compare against the immediately preceding snapshot, so that this
  // check covers the growth caused by the third entry specifically
  // (and, transitively, growth relative to the first snapshot).
  if (cache_size2.has_value() && cache_size3.has_value()) {
    EXPECT_GT(*cache_size3, *cache_size2);
  }
}
1639
// Checks that an RPC which falls back to the default target (because the
// RLS lookup failed) is counted in the default_target_picks metric.
TEST_F(RlsMetricsEnd2endTest, MetricValuesDefaultTargetRpcs) {
  auto default_target_picks_handle =
      grpc_core::GlobalInstrumentsRegistryTestPeer::
          FindUInt64CounterHandleByName("grpc.lb.rls.default_target_picks")
              .value();
  StartBackends(1);
  const std::string default_target = grpc_core::LocalIpUri(backends_[0]->port_);
  const std::string key_builder =
      absl::StrFormat("\"names\":[{"
                      " \"service\":\"%s\","
                      " \"method\":\"%s\""
                      "}],"
                      "\"headers\":["
                      " {"
                      " \"key\":\"%s\","
                      " \"names\":["
                      " \"key1\""
                      " ]"
                      " }"
                      "]",
                      kServiceValue, kMethodValue, kTestKey);
  SetNextResolution(MakeServiceConfigBuilder()
                        .AddKeyBuilder(key_builder)
                        .set_default_target(default_target)
                        .Build());
  // The RLS server has no response registered, so the lookup fails and
  // the RPC is expected to go to the default target.
  CheckRpcSendOk(DEBUG_LOCATION,
                 RpcOptions().set_metadata({{"key1", kTestValue}}));
  // TODO(roth): Switch to ::testing::ProtoEquals() once it is available
  // in OSS.
  const std::string expected_request =
      BuildRlsRequest({{kTestKey, kTestValue}}).DebugString();
  EXPECT_THAT(rls_server_->service_.GetUnmatchedRequests(),
              ::testing::ElementsAre(::testing::Property(
                  &RouteLookupRequest::DebugString, expected_request)));
  EXPECT_EQ(rls_server_->service_.request_count(), 1);
  EXPECT_EQ(rls_server_->service_.response_count(), 0);
  EXPECT_EQ(backends_[0]->service_.request_count(), 1);
  // Exactly one completed pick should be recorded against the default
  // target.
  EXPECT_THAT(
      stats_plugin_->GetUInt64CounterValue(
          default_target_picks_handle,
          {target_uri_, rls_server_target_, default_target, "complete"}, {}),
      ::testing::Optional(1));
}
1686
1687 } // namespace
1688 } // namespace testing
1689 } // namespace grpc
1690
main(int argc,char ** argv)1691 int main(int argc, char** argv) {
1692 ::testing::InitGoogleTest(&argc, argv);
1693 grpc::testing::TestEnvironment env(&argc, argv);
1694 return RUN_ALL_TESTS();
1695 }
1696