1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include <grpc/grpc.h>
20 #include <grpc/support/alloc.h>
21 #include <grpc/support/time.h>
22 #include <grpcpp/channel.h>
23 #include <grpcpp/client_context.h>
24 #include <grpcpp/create_channel.h>
25 #include <grpcpp/resource_quota.h>
26 #include <grpcpp/security/auth_metadata_processor.h>
27 #include <grpcpp/security/credentials.h>
28 #include <grpcpp/security/server_credentials.h>
29 #include <grpcpp/server.h>
30 #include <grpcpp/server_builder.h>
31 #include <grpcpp/server_context.h>
32 #include <grpcpp/support/string_ref.h>
33 #include <grpcpp/test/channel_test_peer.h>
34
35 #include <mutex>
36 #include <thread>
37
38 #include "absl/log/check.h"
39 #include "absl/log/log.h"
40 #include "absl/memory/memory.h"
41 #include "absl/strings/ascii.h"
42 #include "absl/strings/match.h"
43 #include "absl/strings/str_format.h"
44 #include "src/core/client_channel/backup_poller.h"
45 #include "src/core/config/config_vars.h"
46 #include "src/core/lib/iomgr/iomgr.h"
47 #include "src/core/lib/security/credentials/credentials.h"
48 #include "src/core/util/crash.h"
49 #include "src/core/util/env.h"
50 #include "src/proto/grpc/testing/duplicate/echo_duplicate.grpc.pb.h"
51 #include "src/proto/grpc/testing/echo.grpc.pb.h"
52 #include "test/core/test_util/port.h"
53 #include "test/core/test_util/test_config.h"
54 #include "test/cpp/end2end/interceptors_util.h"
55 #include "test/cpp/end2end/test_service_impl.h"
56 #include "test/cpp/util/string_ref_helper.h"
57 #include "test/cpp/util/test_credentials_provider.h"
58
59 #ifdef GRPC_POSIX_SOCKET_EV
60 #include "src/core/lib/iomgr/ev_posix.h"
61 #endif // GRPC_POSIX_SOCKET_EV
62
63 #include <gtest/gtest.h>
64
65 using std::chrono::system_clock;
66
67 namespace grpc {
68 namespace testing {
69 namespace {
70
// Returns true when `addr` starts with one of the loopback peer prefixes:
// "ipv4:127.0.0.1:", "ipv6:%5B::1%5D:" or the IPv4-mapped form
// "ipv6:%5B::ffff:127.0.0.1%5D:" (%5B and %5D are percent-encoded '[' / ']').
bool CheckIsLocalhost(const std::string& addr) {
  constexpr const char* kLoopbackPrefixes[] = {
      "ipv4:127.0.0.1:",
      "ipv6:%5B::ffff:127.0.0.1%5D:",
      "ipv6:%5B::1%5D:",
  };
  for (const char* prefix : kLoopbackPrefixes) {
    // A reverse search anchored at position 0 can only match at the start.
    if (addr.rfind(prefix, 0) == 0) return true;
  }
  return false;
}
79
// Value (in milliseconds) used for the client channel backup poll interval.
constexpr int kClientChannelBackupPollIntervalMs = 200;

// Error message returned by TestMetadataCredentialsPlugin when it is
// configured to fail.
constexpr char kTestCredsPluginErrorMsg[] = "Could not find plugin metadata.";

// Google IAM token / authority-selector pairs, each followed by the debug
// string expected for call credentials built from that pair.
constexpr char kFakeToken[] = "fake_token";
constexpr char kFakeSelector[] = "fake_selector";
constexpr char kExpectedFakeCredsDebugString[] =
    "CallCredentials{GoogleIAMCredentials{Token:present,"
    "AuthoritySelector:fake_selector}}";

constexpr char kWrongToken[] = "wrong_token";
constexpr char kWrongSelector[] = "wrong_selector";
constexpr char kExpectedWrongCredsDebugString[] =
    "CallCredentials{GoogleIAMCredentials{Token:present,"
    "AuthoritySelector:wrong_selector}}";

constexpr char kFakeToken1[] = "fake_token1";
constexpr char kFakeSelector1[] = "fake_selector1";
constexpr char kExpectedFakeCreds1DebugString[] =
    "CallCredentials{GoogleIAMCredentials{Token:present,"
    "AuthoritySelector:fake_selector1}}";

constexpr char kFakeToken2[] = "fake_token2";
constexpr char kFakeSelector2[] = "fake_selector2";
constexpr char kExpectedFakeCreds2DebugString[] =
    "CallCredentials{GoogleIAMCredentials{Token:present,"
    "AuthoritySelector:fake_selector2}}";

// Expected debug strings for call credentials wrapping a
// TestMetadataCredentialsPlugin, one per test scenario.
constexpr char kExpectedAuthMetadataPluginKeyFailureCredsDebugString[] =
    "CallCredentials{TestMetadataCredentials{key:TestPluginMetadata,"
    "value:Does not matter, will fail the key is invalid.}}";
constexpr char kExpectedAuthMetadataPluginValueFailureCredsDebugString[] =
    "CallCredentials{TestMetadataCredentials{key:test-plugin-metadata,"
    "value:With illegal \n value.}}";
constexpr char kExpectedAuthMetadataPluginWithDeadlineCredsDebugString[] =
    "CallCredentials{TestMetadataCredentials{key:meta_key,"
    "value:Does not matter}}";
constexpr char
    kExpectedNonBlockingAuthMetadataPluginFailureCredsDebugString[] =
        "CallCredentials{TestMetadataCredentials{key:test-plugin-metadata,"
        "value:Does not matter, will fail anyway (see 3rd param)}}";
constexpr char
    kExpectedNonBlockingAuthMetadataPluginAndProcessorSuccessCredsDebugString
        [] = "CallCredentials{TestMetadataCredentials{"
             "key:test-plugin-metadata,value:Dr Jekyll}}";
constexpr char
    kExpectedNonBlockingAuthMetadataPluginAndProcessorFailureCredsDebugString
        [] = "CallCredentials{TestMetadataCredentials{"
             "key:test-plugin-metadata,value:Mr Hyde}}";
constexpr char kExpectedBlockingAuthMetadataPluginFailureCredsDebugString[] =
    "CallCredentials{TestMetadataCredentials{key:test-plugin-metadata,"
    "value:Does not matter, will fail anyway (see 3rd param)}}";
constexpr char kExpectedCompositeCallCredsDebugString[] =
    "CallCredentials{CompositeCallCredentials{"
    "TestMetadataCredentials{key:call-creds-key1,value:call-creds-val1},"
    "TestMetadataCredentials{key:call-creds-key2,value:call-creds-val2}}}";
136
137 class TestMetadataCredentialsPlugin : public MetadataCredentialsPlugin {
138 public:
139 static const char kGoodMetadataKey[];
140 static const char kBadMetadataKey[];
141
TestMetadataCredentialsPlugin(const grpc::string_ref & metadata_key,const grpc::string_ref & metadata_value,bool is_blocking,bool is_successful,int delay_ms)142 TestMetadataCredentialsPlugin(const grpc::string_ref& metadata_key,
143 const grpc::string_ref& metadata_value,
144 bool is_blocking, bool is_successful,
145 int delay_ms)
146 : metadata_key_(metadata_key.data(), metadata_key.length()),
147 metadata_value_(metadata_value.data(), metadata_value.length()),
148 is_blocking_(is_blocking),
149 is_successful_(is_successful),
150 delay_ms_(delay_ms) {}
151
IsBlocking() const152 bool IsBlocking() const override { return is_blocking_; }
153
GetMetadata(grpc::string_ref service_url,grpc::string_ref method_name,const grpc::AuthContext & channel_auth_context,std::multimap<std::string,std::string> * metadata)154 Status GetMetadata(
155 grpc::string_ref service_url, grpc::string_ref method_name,
156 const grpc::AuthContext& channel_auth_context,
157 std::multimap<std::string, std::string>* metadata) override {
158 if (delay_ms_ != 0) {
159 gpr_sleep_until(
160 gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
161 gpr_time_from_millis(delay_ms_, GPR_TIMESPAN)));
162 }
163 EXPECT_GT(service_url.length(), 0UL);
164 EXPECT_GT(method_name.length(), 0UL);
165 EXPECT_TRUE(channel_auth_context.IsPeerAuthenticated());
166 EXPECT_TRUE(metadata != nullptr);
167 if (is_successful_) {
168 metadata->insert(std::make_pair(metadata_key_, metadata_value_));
169 return Status::OK;
170 } else {
171 return Status(StatusCode::NOT_FOUND, kTestCredsPluginErrorMsg);
172 }
173 }
174
DebugString()175 std::string DebugString() override {
176 return absl::StrFormat("TestMetadataCredentials{key:%s,value:%s}",
177 metadata_key_.c_str(), metadata_value_.c_str());
178 }
179
180 private:
181 std::string metadata_key_;
182 std::string metadata_value_;
183 bool is_blocking_;
184 bool is_successful_;
185 int delay_ms_;
186 };
187
188 const char TestMetadataCredentialsPlugin::kBadMetadataKey[] =
189 "TestPluginMetadata";
190 const char TestMetadataCredentialsPlugin::kGoodMetadataKey[] =
191 "test-plugin-metadata";
192
193 class TestAuthMetadataProcessor : public AuthMetadataProcessor {
194 public:
195 static const char kGoodGuy[];
196
TestAuthMetadataProcessor(bool is_blocking)197 explicit TestAuthMetadataProcessor(bool is_blocking)
198 : is_blocking_(is_blocking) {}
199
GetCompatibleClientCreds()200 std::shared_ptr<CallCredentials> GetCompatibleClientCreds() {
201 return grpc::MetadataCredentialsFromPlugin(
202 std::unique_ptr<MetadataCredentialsPlugin>(
203 new TestMetadataCredentialsPlugin(
204 TestMetadataCredentialsPlugin::kGoodMetadataKey, kGoodGuy,
205 is_blocking_, true, 0)));
206 }
207
GetIncompatibleClientCreds()208 std::shared_ptr<CallCredentials> GetIncompatibleClientCreds() {
209 return grpc::MetadataCredentialsFromPlugin(
210 std::unique_ptr<MetadataCredentialsPlugin>(
211 new TestMetadataCredentialsPlugin(
212 TestMetadataCredentialsPlugin::kGoodMetadataKey, "Mr Hyde",
213 is_blocking_, true, 0)));
214 }
215
216 // Interface implementation
IsBlocking() const217 bool IsBlocking() const override { return is_blocking_; }
218
Process(const InputMetadata & auth_metadata,AuthContext * context,OutputMetadata * consumed_auth_metadata,OutputMetadata * response_metadata)219 Status Process(const InputMetadata& auth_metadata, AuthContext* context,
220 OutputMetadata* consumed_auth_metadata,
221 OutputMetadata* response_metadata) override {
222 EXPECT_TRUE(consumed_auth_metadata != nullptr);
223 EXPECT_TRUE(context != nullptr);
224 EXPECT_TRUE(response_metadata != nullptr);
225 auto auth_md =
226 auth_metadata.find(TestMetadataCredentialsPlugin::kGoodMetadataKey);
227 EXPECT_NE(auth_md, auth_metadata.end());
228 string_ref auth_md_value = auth_md->second;
229 if (auth_md_value == kGoodGuy) {
230 context->AddProperty(kIdentityPropName, kGoodGuy);
231 context->SetPeerIdentityPropertyName(kIdentityPropName);
232 consumed_auth_metadata->insert(std::make_pair(
233 string(auth_md->first.data(), auth_md->first.length()),
234 string(auth_md->second.data(), auth_md->second.length())));
235 return Status::OK;
236 } else {
237 return Status(StatusCode::UNAUTHENTICATED,
238 string("Invalid principal: ") +
239 string(auth_md_value.data(), auth_md_value.length()));
240 }
241 }
242
243 private:
244 static const char kIdentityPropName[];
245 bool is_blocking_;
246 };
247
248 const char TestAuthMetadataProcessor::kGoodGuy[] = "Dr Jekyll";
249 const char TestAuthMetadataProcessor::kIdentityPropName[] = "novel identity";
250
251 class Proxy : public grpc::testing::EchoTestService::Service {
252 public:
Proxy(const std::shared_ptr<Channel> & channel)253 explicit Proxy(const std::shared_ptr<Channel>& channel)
254 : stub_(grpc::testing::EchoTestService::NewStub(channel)) {}
255
Echo(ServerContext * server_context,const EchoRequest * request,EchoResponse * response)256 Status Echo(ServerContext* server_context, const EchoRequest* request,
257 EchoResponse* response) override {
258 std::unique_ptr<ClientContext> client_context =
259 ClientContext::FromServerContext(*server_context);
260 return stub_->Echo(client_context.get(), *request, response);
261 }
262
263 private:
264 std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
265 };
266
267 class TestServiceImplDupPkg
268 : public grpc::testing::duplicate::EchoTestService::Service {
269 public:
Echo(ServerContext *,const EchoRequest *,EchoResponse * response)270 Status Echo(ServerContext* /*context*/, const EchoRequest* /*request*/,
271 EchoResponse* response) override {
272 response->set_message("no package");
273 return Status::OK;
274 }
275 };
276
277 class TestScenario {
278 public:
TestScenario(bool use_interceptors,bool use_proxy,bool inproc,const std::string & credentials_type,bool callback_server)279 TestScenario(bool use_interceptors, bool use_proxy, bool inproc,
280 const std::string& credentials_type, bool callback_server)
281 : use_interceptors_(use_interceptors),
282 use_proxy_(use_proxy),
283 inproc_(inproc),
284 credentials_type_(credentials_type),
285 callback_server_(callback_server) {}
286
use_interceptors() const287 bool use_interceptors() const { return use_interceptors_; }
use_proxy() const288 bool use_proxy() const { return use_proxy_; }
inproc() const289 bool inproc() const { return inproc_; }
credentials_type() const290 const std::string& credentials_type() const { return credentials_type_; }
callback_server() const291 bool callback_server() const { return callback_server_; }
292
293 std::string AsString() const;
294
Name(const::testing::TestParamInfo<TestScenario> & info)295 static std::string Name(const ::testing::TestParamInfo<TestScenario>& info) {
296 return info.param.AsString();
297 }
298
299 private:
300 bool use_interceptors_;
301 bool use_proxy_;
302 bool inproc_;
303 const std::string credentials_type_;
304 bool callback_server_;
305 };
306
AsString() const307 std::string TestScenario::AsString() const {
308 std::string retval = use_interceptors_ ? "Interceptor" : "";
309 if (use_proxy_) retval += "Proxy";
310 if (inproc_) retval += "Inproc";
311 if (callback_server_) retval += "CallbackServer";
312 if (credentials_type_ == kInsecureCredentialsType) {
313 retval += "Insecure";
314 } else {
315 std::string creds_type = absl::AsciiStrToLower(credentials_type_);
316 if (!creds_type.empty()) creds_type[0] = absl::ascii_toupper(creds_type[0]);
317 retval += creds_type;
318 }
319 return retval;
320 }
321
322 class End2endTest : public ::testing::TestWithParam<TestScenario> {
323 protected:
SetUpTestSuite()324 static void SetUpTestSuite() { grpc_init(); }
TearDownTestSuite()325 static void TearDownTestSuite() { grpc_shutdown(); }
End2endTest()326 End2endTest()
327 : is_server_started_(false),
328 kMaxMessageSize_(8192),
329 special_service_("special"),
330 first_picked_port_(0) {}
331
TearDown()332 void TearDown() override {
333 if (is_server_started_) {
334 server_->Shutdown();
335 if (proxy_server_) proxy_server_->Shutdown();
336 }
337 if (first_picked_port_ > 0) {
338 grpc_recycle_unused_port(first_picked_port_);
339 }
340 }
341
StartServer(const std::shared_ptr<AuthMetadataProcessor> & processor)342 void StartServer(const std::shared_ptr<AuthMetadataProcessor>& processor) {
343 int port = grpc_pick_unused_port_or_die();
344 first_picked_port_ = port;
345 server_address_ << "localhost:" << port;
346 // Setup server
347 BuildAndStartServer(processor);
348 }
349
RestartServer(const std::shared_ptr<AuthMetadataProcessor> & processor)350 void RestartServer(const std::shared_ptr<AuthMetadataProcessor>& processor) {
351 if (is_server_started_) {
352 server_->Shutdown();
353 BuildAndStartServer(processor);
354 }
355 }
356
BuildAndStartServer(const std::shared_ptr<AuthMetadataProcessor> & processor)357 void BuildAndStartServer(
358 const std::shared_ptr<AuthMetadataProcessor>& processor) {
359 ServerBuilder builder;
360 ConfigureServerBuilder(&builder);
361 auto server_creds = GetCredentialsProvider()->GetServerCredentials(
362 GetParam().credentials_type());
363 if (GetParam().credentials_type() != kInsecureCredentialsType) {
364 server_creds->SetAuthMetadataProcessor(processor);
365 }
366 if (GetParam().use_interceptors()) {
367 std::vector<
368 std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>
369 creators;
370 // Add 20 phony server interceptors
371 creators.reserve(20);
372 for (auto i = 0; i < 20; i++) {
373 creators.push_back(std::make_unique<PhonyInterceptorFactory>());
374 }
375 builder.experimental().SetInterceptorCreators(std::move(creators));
376 }
377 builder.AddListeningPort(server_address_.str(), server_creds);
378 if (!GetParam().callback_server()) {
379 builder.RegisterService(&service_);
380 } else {
381 builder.RegisterService(&callback_service_);
382 }
383 builder.RegisterService("foo.test.youtube.com", &special_service_);
384 builder.RegisterService(&dup_pkg_service_);
385
386 builder.SetSyncServerOption(ServerBuilder::SyncServerOption::NUM_CQS, 4);
387 builder.SetSyncServerOption(
388 ServerBuilder::SyncServerOption::CQ_TIMEOUT_MSEC, 10);
389
390 server_ = builder.BuildAndStart();
391 is_server_started_ = true;
392 }
393
ConfigureServerBuilder(ServerBuilder * builder)394 virtual void ConfigureServerBuilder(ServerBuilder* builder) {
395 builder->SetMaxMessageSize(
396 kMaxMessageSize_); // For testing max message size.
397 }
398
ResetChannel(std::vector<std::unique_ptr<experimental::ClientInterceptorFactoryInterface>> interceptor_creators={})399 void ResetChannel(
400 std::vector<
401 std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>
402 interceptor_creators = {}) {
403 if (!is_server_started_) {
404 StartServer(std::shared_ptr<AuthMetadataProcessor>());
405 }
406 EXPECT_TRUE(is_server_started_);
407 ChannelArguments args;
408 auto channel_creds = GetCredentialsProvider()->GetChannelCredentials(
409 GetParam().credentials_type(), &args);
410 if (!user_agent_prefix_.empty()) {
411 args.SetUserAgentPrefix(user_agent_prefix_);
412 }
413 args.SetString(GRPC_ARG_SECONDARY_USER_AGENT_STRING, "end2end_test");
414
415 if (!GetParam().inproc()) {
416 if (!GetParam().use_interceptors()) {
417 channel_ = grpc::CreateCustomChannel(server_address_.str(),
418 channel_creds, args);
419 } else {
420 channel_ = CreateCustomChannelWithInterceptors(
421 server_address_.str(), channel_creds, args,
422 interceptor_creators.empty() ? CreatePhonyClientInterceptors()
423 : std::move(interceptor_creators));
424 }
425 } else {
426 if (!GetParam().use_interceptors()) {
427 channel_ = server_->InProcessChannel(args);
428 } else {
429 channel_ = server_->experimental().InProcessChannelWithInterceptors(
430 args, interceptor_creators.empty()
431 ? CreatePhonyClientInterceptors()
432 : std::move(interceptor_creators));
433 }
434 }
435 }
436
ResetStub(std::vector<std::unique_ptr<experimental::ClientInterceptorFactoryInterface>> interceptor_creators={})437 void ResetStub(
438 std::vector<
439 std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>
440 interceptor_creators = {}) {
441 ResetChannel(std::move(interceptor_creators));
442 if (GetParam().use_proxy()) {
443 proxy_service_ = std::make_unique<Proxy>(channel_);
444 int port = grpc_pick_unused_port_or_die();
445 std::ostringstream proxyaddr;
446 proxyaddr << "localhost:" << port;
447 ServerBuilder builder;
448 builder.AddListeningPort(proxyaddr.str(), InsecureServerCredentials());
449 builder.RegisterService(proxy_service_.get());
450
451 builder.SetSyncServerOption(ServerBuilder::SyncServerOption::NUM_CQS, 4);
452 builder.SetSyncServerOption(
453 ServerBuilder::SyncServerOption::CQ_TIMEOUT_MSEC, 10);
454
455 proxy_server_ = builder.BuildAndStart();
456
457 channel_ =
458 grpc::CreateChannel(proxyaddr.str(), InsecureChannelCredentials());
459 }
460
461 stub_ = grpc::testing::EchoTestService::NewStub(channel_);
462 PhonyInterceptor::Reset();
463 }
464
465 bool is_server_started_;
466 std::shared_ptr<Channel> channel_;
467 std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
468 std::unique_ptr<Server> server_;
469 std::unique_ptr<Server> proxy_server_;
470 std::unique_ptr<Proxy> proxy_service_;
471 std::ostringstream server_address_;
472 const int kMaxMessageSize_;
473 TestServiceImpl service_;
474 CallbackTestServiceImpl callback_service_;
475 TestServiceImpl special_service_;
476 TestServiceImplDupPkg dup_pkg_service_;
477 std::string user_agent_prefix_;
478 int first_picked_port_;
479 };
480
SendRpc(grpc::testing::EchoTestService::Stub * stub,int num_rpcs,bool with_binary_metadata)481 void SendRpc(grpc::testing::EchoTestService::Stub* stub, int num_rpcs,
482 bool with_binary_metadata) {
483 EchoRequest request;
484 EchoResponse response;
485 request.set_message("Hello hello hello hello");
486
487 for (int i = 0; i < num_rpcs; ++i) {
488 ClientContext context;
489 if (with_binary_metadata) {
490 char bytes[8] = {'\0', '\1', '\2', '\3',
491 '\4', '\5', '\6', static_cast<char>(i)};
492 context.AddMetadata("custom-bin", std::string(bytes, 8));
493 }
494 context.set_compression_algorithm(GRPC_COMPRESS_GZIP);
495 Status s = stub->Echo(&context, request, &response);
496 EXPECT_EQ(response.message(), request.message());
497 EXPECT_TRUE(s.ok());
498 }
499 }
500
501 // This class is for testing scenarios where RPCs are cancelled on the server
502 // by calling ServerContext::TryCancel()
503 class End2endServerTryCancelTest : public End2endTest {
504 protected:
505 // Helper for testing client-streaming RPCs which are cancelled on the server.
506 // Depending on the value of server_try_cancel parameter, this will test one
507 // of the following three scenarios:
508 // CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before reading
509 // any messages from the client
510 //
511 // CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while reading
512 // messages from the client
513 //
514 // CANCEL_AFTER PROCESSING: Rpc is cancelled by server after reading all
515 // the messages from the client
516 //
517 // NOTE: Do not call this function with server_try_cancel == DO_NOT_CANCEL.
TestRequestStreamServerCancel(ServerTryCancelRequestPhase server_try_cancel,int num_msgs_to_send)518 void TestRequestStreamServerCancel(
519 ServerTryCancelRequestPhase server_try_cancel, int num_msgs_to_send) {
520 RestartServer(std::shared_ptr<AuthMetadataProcessor>());
521 ResetStub();
522 EchoRequest request;
523 EchoResponse response;
524 ClientContext context;
525
526 // Send server_try_cancel value in the client metadata
527 context.AddMetadata(kServerTryCancelRequest,
528 std::to_string(server_try_cancel));
529
530 auto stream = stub_->RequestStream(&context, &response);
531
532 int num_msgs_sent = 0;
533 while (num_msgs_sent < num_msgs_to_send) {
534 request.set_message("hello");
535 if (!stream->Write(request)) {
536 break;
537 }
538 num_msgs_sent++;
539 }
540 LOG(INFO) << "Sent " << num_msgs_sent << " messages";
541
542 stream->WritesDone();
543 Status s = stream->Finish();
544
545 // At this point, we know for sure that RPC was cancelled by the server
546 // since we passed server_try_cancel value in the metadata. Depending on the
547 // value of server_try_cancel, the RPC might have been cancelled by the
548 // server at different stages. The following validates our expectations of
549 // number of messages sent in various cancellation scenarios:
550
551 switch (server_try_cancel) {
552 case CANCEL_BEFORE_PROCESSING:
553 case CANCEL_DURING_PROCESSING:
554 // If the RPC is cancelled by server before / during messages from the
555 // client, it means that the client most likely did not get a chance to
556 // send all the messages it wanted to send. i.e num_msgs_sent <=
557 // num_msgs_to_send
558 EXPECT_LE(num_msgs_sent, num_msgs_to_send);
559 break;
560
561 case CANCEL_AFTER_PROCESSING:
562 // If the RPC was cancelled after all messages were read by the server,
563 // the client did get a chance to send all its messages
564 EXPECT_EQ(num_msgs_sent, num_msgs_to_send);
565 break;
566
567 default:
568 LOG(ERROR) << "Invalid server_try_cancel value: " << server_try_cancel;
569 EXPECT_TRUE(server_try_cancel > DO_NOT_CANCEL &&
570 server_try_cancel <= CANCEL_AFTER_PROCESSING);
571 break;
572 }
573
574 EXPECT_FALSE(s.ok());
575 EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
576 // Make sure that the server interceptors were notified
577 if (GetParam().use_interceptors()) {
578 EXPECT_EQ(20, PhonyInterceptor::GetNumTimesCancel());
579 }
580 }
581
582 // Helper for testing server-streaming RPCs which are cancelled on the server.
583 // Depending on the value of server_try_cancel parameter, this will test one
584 // of the following three scenarios:
585 // CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before writing
586 // any messages to the client
587 //
588 // CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while writing
589 // messages to the client
590 //
591 // CANCEL_AFTER PROCESSING: Rpc is cancelled by server after writing all
592 // the messages to the client
593 //
594 // NOTE: Do not call this function with server_try_cancel == DO_NOT_CANCEL.
TestResponseStreamServerCancel(ServerTryCancelRequestPhase server_try_cancel)595 void TestResponseStreamServerCancel(
596 ServerTryCancelRequestPhase server_try_cancel) {
597 RestartServer(std::shared_ptr<AuthMetadataProcessor>());
598 ResetStub();
599 EchoRequest request;
600 EchoResponse response;
601 ClientContext context;
602
603 // Send server_try_cancel in the client metadata
604 context.AddMetadata(kServerTryCancelRequest,
605 std::to_string(server_try_cancel));
606
607 request.set_message("hello");
608 auto stream = stub_->ResponseStream(&context, request);
609
610 int num_msgs_read = 0;
611 while (num_msgs_read < kServerDefaultResponseStreamsToSend) {
612 if (!stream->Read(&response)) {
613 break;
614 }
615 EXPECT_EQ(response.message(),
616 request.message() + std::to_string(num_msgs_read));
617 num_msgs_read++;
618 }
619 LOG(INFO) << "Read " << num_msgs_read << " messages";
620
621 Status s = stream->Finish();
622
623 // Depending on the value of server_try_cancel, the RPC might have been
624 // cancelled by the server at different stages. The following validates our
625 // expectations of number of messages read in various cancellation
626 // scenarios:
627 switch (server_try_cancel) {
628 case CANCEL_BEFORE_PROCESSING:
629 // Server cancelled before sending any messages. Which means the client
630 // wouldn't have read any
631 EXPECT_EQ(num_msgs_read, 0);
632 break;
633
634 case CANCEL_DURING_PROCESSING:
635 // Server cancelled while writing messages. Client must have read less
636 // than or equal to the expected number of messages
637 EXPECT_LE(num_msgs_read, kServerDefaultResponseStreamsToSend);
638 break;
639
640 case CANCEL_AFTER_PROCESSING:
641 // Even though the Server cancelled after writing all messages, the RPC
642 // may be cancelled before the Client got a chance to read all the
643 // messages.
644 EXPECT_LE(num_msgs_read, kServerDefaultResponseStreamsToSend);
645 break;
646
647 default: {
648 LOG(ERROR) << "Invalid server_try_cancel value: " << server_try_cancel;
649 EXPECT_TRUE(server_try_cancel > DO_NOT_CANCEL &&
650 server_try_cancel <= CANCEL_AFTER_PROCESSING);
651 break;
652 }
653 }
654
655 EXPECT_FALSE(s.ok());
656 // Make sure that the server interceptors were notified
657 if (GetParam().use_interceptors()) {
658 EXPECT_EQ(20, PhonyInterceptor::GetNumTimesCancel());
659 }
660 }
661
662 // Helper for testing bidirectional-streaming RPCs which are cancelled on the
663 // server. Depending on the value of server_try_cancel parameter, this will
664 // test one of the following three scenarios:
665 // CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before reading/
666 // writing any messages from/to the client
667 //
668 // CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while reading/
669 // writing messages from/to the client
670 //
671 // CANCEL_AFTER PROCESSING: Rpc is cancelled by server after reading/writing
672 // all the messages from/to the client
673 //
674 // NOTE: Do not call this function with server_try_cancel == DO_NOT_CANCEL.
TestBidiStreamServerCancel(ServerTryCancelRequestPhase server_try_cancel,int num_messages)675 void TestBidiStreamServerCancel(ServerTryCancelRequestPhase server_try_cancel,
676 int num_messages) {
677 RestartServer(std::shared_ptr<AuthMetadataProcessor>());
678 ResetStub();
679 EchoRequest request;
680 EchoResponse response;
681 ClientContext context;
682
683 // Send server_try_cancel in the client metadata
684 context.AddMetadata(kServerTryCancelRequest,
685 std::to_string(server_try_cancel));
686
687 auto stream = stub_->BidiStream(&context);
688
689 int num_msgs_read = 0;
690 int num_msgs_sent = 0;
691 while (num_msgs_sent < num_messages) {
692 request.set_message("hello " + std::to_string(num_msgs_sent));
693 if (!stream->Write(request)) {
694 break;
695 }
696 num_msgs_sent++;
697
698 if (!stream->Read(&response)) {
699 break;
700 }
701 num_msgs_read++;
702
703 EXPECT_EQ(response.message(), request.message());
704 }
705 LOG(INFO) << "Sent " << num_msgs_sent << " messages";
706 LOG(INFO) << "Read " << num_msgs_read << " messages";
707
708 stream->WritesDone();
709 Status s = stream->Finish();
710
711 // Depending on the value of server_try_cancel, the RPC might have been
712 // cancelled by the server at different stages. The following validates our
713 // expectations of number of messages read in various cancellation
714 // scenarios:
715 switch (server_try_cancel) {
716 case CANCEL_BEFORE_PROCESSING:
717 EXPECT_EQ(num_msgs_read, 0);
718 break;
719
720 case CANCEL_DURING_PROCESSING:
721 EXPECT_LE(num_msgs_sent, num_messages);
722 EXPECT_LE(num_msgs_read, num_msgs_sent);
723 break;
724
725 case CANCEL_AFTER_PROCESSING:
726 EXPECT_EQ(num_msgs_sent, num_messages);
727
728 // The Server cancelled after reading the last message and after writing
729 // the message to the client. However, the RPC cancellation might have
730 // taken effect before the client actually read the response.
731 EXPECT_LE(num_msgs_read, num_msgs_sent);
732 break;
733
734 default:
735 LOG(ERROR) << "Invalid server_try_cancel value: " << server_try_cancel;
736 EXPECT_TRUE(server_try_cancel > DO_NOT_CANCEL &&
737 server_try_cancel <= CANCEL_AFTER_PROCESSING);
738 break;
739 }
740
741 EXPECT_FALSE(s.ok());
742 EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
743 // Make sure that the server interceptors were notified
744 if (GetParam().use_interceptors()) {
745 EXPECT_EQ(20, PhonyInterceptor::GetNumTimesCancel());
746 }
747 }
748 };
749
// Unary Echo for which the client asks the server (via metadata) to cancel
// before processing; the call is expected to finish with CANCELLED.
TEST_P(End2endServerTryCancelTest, RequestEchoServerCancel) {
  ResetStub();
  EchoRequest request;
  EchoResponse response;
  ClientContext context;

  const std::string cancel_phase = std::to_string(CANCEL_BEFORE_PROCESSING);
  context.AddMetadata(kServerTryCancelRequest, cancel_phase);

  Status s = stub_->Echo(&context, request, &response);
  EXPECT_FALSE(s.ok());
  EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
}
762
763 // Server to cancel before doing reading the request
TEST_P(End2endServerTryCancelTest,RequestStreamServerCancelBeforeReads)764 TEST_P(End2endServerTryCancelTest, RequestStreamServerCancelBeforeReads) {
765 TestRequestStreamServerCancel(CANCEL_BEFORE_PROCESSING, 1);
766 }
767
768 // Server to cancel while reading a request from the stream in parallel
TEST_P(End2endServerTryCancelTest,RequestStreamServerCancelDuringRead)769 TEST_P(End2endServerTryCancelTest, RequestStreamServerCancelDuringRead) {
770 TestRequestStreamServerCancel(CANCEL_DURING_PROCESSING, 10);
771 }
772
773 // Server to cancel after reading all the requests but before returning to the
774 // client
TEST_P(End2endServerTryCancelTest,RequestStreamServerCancelAfterReads)775 TEST_P(End2endServerTryCancelTest, RequestStreamServerCancelAfterReads) {
776 TestRequestStreamServerCancel(CANCEL_AFTER_PROCESSING, 4);
777 }
778
779 // Server to cancel before sending any response messages
TEST_P(End2endServerTryCancelTest,ResponseStreamServerCancelBefore)780 TEST_P(End2endServerTryCancelTest, ResponseStreamServerCancelBefore) {
781 TestResponseStreamServerCancel(CANCEL_BEFORE_PROCESSING);
782 }
783
784 // Server to cancel while writing a response to the stream in parallel
TEST_P(End2endServerTryCancelTest,ResponseStreamServerCancelDuring)785 TEST_P(End2endServerTryCancelTest, ResponseStreamServerCancelDuring) {
786 TestResponseStreamServerCancel(CANCEL_DURING_PROCESSING);
787 }
788
789 // Server to cancel after writing all the responses to the stream but before
790 // returning to the client
TEST_P(End2endServerTryCancelTest,ResponseStreamServerCancelAfter)791 TEST_P(End2endServerTryCancelTest, ResponseStreamServerCancelAfter) {
792 TestResponseStreamServerCancel(CANCEL_AFTER_PROCESSING);
793 }
794
795 // Server to cancel before reading/writing any requests/responses on the stream
TEST_P(End2endServerTryCancelTest,BidiStreamServerCancelBefore)796 TEST_P(End2endServerTryCancelTest, BidiStreamServerCancelBefore) {
797 TestBidiStreamServerCancel(CANCEL_BEFORE_PROCESSING, 2);
798 }
799
800 // Server to cancel while reading/writing requests/responses on the stream in
801 // parallel
TEST_P(End2endServerTryCancelTest,BidiStreamServerCancelDuring)802 TEST_P(End2endServerTryCancelTest, BidiStreamServerCancelDuring) {
803 TestBidiStreamServerCancel(CANCEL_DURING_PROCESSING, 10);
804 }
805
806 // Server to cancel after reading/writing all requests/responses on the stream
807 // but before returning to the client
TEST_P(End2endServerTryCancelTest,BidiStreamServerCancelAfter)808 TEST_P(End2endServerTryCancelTest, BidiStreamServerCancelAfter) {
809 TestBidiStreamServerCancel(CANCEL_AFTER_PROCESSING, 5);
810 }
811
// Sets a custom user-agent prefix on the fixture, performs a unary Echo with
// echo_metadata enabled, and checks that the "user-agent" entry found in the
// server trailing metadata starts with "<prefix> grpc-c++/".
// Skipped for the in-process transport.
TEST_P(End2endTest, SimpleRpcWithCustomUserAgentPrefix) {
  // User-Agent is an HTTP header for HTTP transports only
  if (GetParam().inproc()) {
    return;
  }
  user_agent_prefix_ = "custom_prefix";
  ResetStub();
  EchoRequest request;
  EchoResponse response;
  request.set_message("Hello hello hello hello");
  request.mutable_param()->set_echo_metadata(true);

  ClientContext context;
  Status s = stub_->Echo(&context, request, &response);
  EXPECT_EQ(response.message(), request.message());
  EXPECT_TRUE(s.ok());
  const auto& trailing_metadata = context.GetServerTrailingMetadata();
  const auto iter = trailing_metadata.find("user-agent");
  // This must be a fatal assertion: the iterator is dereferenced below, and
  // dereferencing end() when the header is missing is undefined behavior.
  ASSERT_TRUE(iter != trailing_metadata.end());
  const std::string expected_prefix = user_agent_prefix_ + " grpc-c++/";
  EXPECT_TRUE(iter->second.starts_with(expected_prefix)) << iter->second;
}
834
// Starts ten threads that each call SendRpc on the shared stub with the
// arguments (10, true), then joins all of them.
TEST_P(End2endTest, MultipleRpcsWithVariedBinaryMetadataValue) {
  ResetStub();
  constexpr int kThreadCount = 10;
  std::vector<std::thread> workers;
  workers.reserve(kThreadCount);
  for (int n = 0; n < kThreadCount; ++n) {
    workers.emplace_back(SendRpc, stub_.get(), 10, true);
  }
  for (std::thread& worker : workers) {
    worker.join();
  }
}
846
// Starts ten threads that each call SendRpc on the shared stub with the
// arguments (10, false), then joins all of them.
TEST_P(End2endTest, MultipleRpcs) {
  ResetStub();
  constexpr int kThreadCount = 10;
  std::vector<std::thread> workers;
  workers.reserve(kThreadCount);
  for (int n = 0; n < kThreadCount; ++n) {
    workers.emplace_back(SendRpc, stub_.get(), 10, false);
  }
  for (std::thread& worker : workers) {
    worker.join();
  }
}
858
// Creates 1000 throwaway stubs on the same channel and checks that the
// channel's registered-call count, as reported by ChannelTestPeer, is the same
// afterwards as it was before.
TEST_P(End2endTest, ManyStubs) {
  ResetStub();
  ChannelTestPeer peer(channel_.get());
  const int baseline_registered_calls = peer.registered_calls();
  int stubs_left = 1000;
  while (stubs_left-- > 0) {
    grpc::testing::EchoTestService::NewStub(channel_);
  }
  EXPECT_EQ(peer.registered_calls(), baseline_registered_calls);
}
868
// Sends a unary Echo whose client metadata carries the key "custom-bin" with an
// empty value, and expects the call to succeed with the message echoed back.
TEST_P(End2endTest, EmptyBinaryMetadata) {
  ResetStub();
  EchoRequest req;
  req.set_message("Hello hello hello hello");
  EchoResponse resp;
  ClientContext ctx;
  ctx.AddMetadata("custom-bin", "");
  const Status status = stub_->Echo(&ctx, req, &resp);
  EXPECT_EQ(resp.message(), req.message());
  EXPECT_TRUE(status.ok());
}
880
// Asks for the host from the authority header to be echoed back and checks it
// against the value expected for the current test configuration (TLS
// credentials, in-process transport, or a regular server address).
TEST_P(End2endTest, AuthoritySeenOnServerSide) {
  ResetStub();
  EchoRequest req;
  req.mutable_param()->set_echo_host_from_authority_header(true);
  req.set_message("Live long and prosper.");
  EchoResponse resp;
  ClientContext ctx;
  const Status status = stub_->Echo(&ctx, req, &resp);
  EXPECT_EQ(resp.message(), req.message());

  std::string expected_host;
  if (GetParam().credentials_type() == kTlsCredentialsType) {
    // SSL creds overrides the authority.
    expected_host = "foo.test.google.fr";
  } else if (GetParam().inproc()) {
    expected_host = "inproc";
  } else {
    expected_host = server_address_.str();
  }
  EXPECT_EQ(expected_host, resp.param().host());
  EXPECT_TRUE(status.ok());
}
900
// Sends one RPC, restarts the server, sleeps long enough for the channel to
// reconnect, and then sends a second RPC on the same stub.
// Skipped for the in-process transport.
TEST_P(End2endTest, ReconnectChannel) {
  if (GetParam().inproc()) {
    return;
  }
  // With the "poll" polling engine it takes 2 pollset_works to reconnect the
  // channel, so the wait below is doubled in that configuration.
  int poller_slowdown_factor = 1;
#ifdef GRPC_POSIX_SOCKET_EV
  if (grpc_core::ConfigVars::Get().PollStrategy() == "poll") {
    poller_slowdown_factor = 2;
  }
#endif  // GRPC_POSIX_SOCKET_EV
  ResetStub();
  SendRpc(stub_.get(), 1, false);
  RestartServer(std::shared_ptr<AuthMetadataProcessor>());
  // Reconnecting needs more than GRPC_CLIENT_CHANNEL_BACKUP_POLL_INTERVAL_MS,
  // so wait 5x that interval, scaled by both slowdown factors.
  const gpr_timespec reconnect_wait = gpr_time_from_millis(
      kClientChannelBackupPollIntervalMs * 5 * poller_slowdown_factor *
          grpc_test_slowdown_factor(),
      GPR_TIMESPAN);
  const gpr_timespec wake_up_at =
      gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), reconnect_wait);
  gpr_sleep_until(wake_up_at);
  SendRpc(stub_.get(), 1, false);
}
926
// Writes a single "hello" on a client-streaming call and expects an OK status,
// the same message in the response, and an empty debug error string.
TEST_P(End2endTest, RequestStreamOneRequest) {
  ResetStub();
  EchoRequest req;
  req.set_message("hello");
  EchoResponse resp;
  ClientContext ctx;

  auto writer = stub_->RequestStream(&ctx, &resp);
  EXPECT_TRUE(writer->Write(req));
  writer->WritesDone();
  const Status status = writer->Finish();
  EXPECT_EQ(resp.message(), req.message());
  EXPECT_TRUE(status.ok());
  EXPECT_TRUE(ctx.debug_error_string().empty());
}
942
// Same as the single-request client-streaming test, but with initial metadata
// corked and the only message sent through WriteLast.
TEST_P(End2endTest, RequestStreamOneRequestWithCoalescingApi) {
  ResetStub();
  EchoRequest req;
  req.set_message("hello");
  EchoResponse resp;
  ClientContext ctx;
  ctx.set_initial_metadata_corked(true);

  auto writer = stub_->RequestStream(&ctx, &resp);
  writer->WriteLast(req, WriteOptions());
  const Status status = writer->Finish();
  EXPECT_EQ(resp.message(), req.message());
  EXPECT_TRUE(status.ok());
}
957
// Writes "hello" twice on a client-streaming call and expects the response
// message to be "hellohello".
TEST_P(End2endTest, RequestStreamTwoRequests) {
  ResetStub();
  EchoRequest req;
  req.set_message("hello");
  EchoResponse resp;
  ClientContext ctx;

  auto writer = stub_->RequestStream(&ctx, &resp);
  for (int sent = 0; sent < 2; ++sent) {
    EXPECT_TRUE(writer->Write(req));
  }
  writer->WritesDone();
  const Status status = writer->Finish();
  EXPECT_EQ(resp.message(), "hellohello");
  EXPECT_TRUE(status.ok());
}
973
// Writes "hello" twice on a client-streaming call, each time with the
// write-through option set, and expects the response message "hellohello".
TEST_P(End2endTest, RequestStreamTwoRequestsWithWriteThrough) {
  ResetStub();
  EchoRequest req;
  req.set_message("hello");
  EchoResponse resp;
  ClientContext ctx;

  auto writer = stub_->RequestStream(&ctx, &resp);
  for (int sent = 0; sent < 2; ++sent) {
    EXPECT_TRUE(writer->Write(req, WriteOptions().set_write_through()));
  }
  writer->WritesDone();
  const Status status = writer->Finish();
  EXPECT_EQ(resp.message(), "hellohello");
  EXPECT_TRUE(status.ok());
}
989
// Two-message client-streaming call with initial metadata corked: the first
// message goes through Write, the second through WriteLast. Expects the
// response message "hellohello".
TEST_P(End2endTest, RequestStreamTwoRequestsWithCoalescingApi) {
  ResetStub();
  EchoRequest req;
  req.set_message("hello");
  EchoResponse resp;
  ClientContext ctx;
  ctx.set_initial_metadata_corked(true);

  auto writer = stub_->RequestStream(&ctx, &resp);
  EXPECT_TRUE(writer->Write(req));
  writer->WriteLast(req, WriteOptions());
  const Status status = writer->Finish();
  EXPECT_EQ(resp.message(), "hellohello");
  EXPECT_TRUE(status.ok());
}
1005
// Reads kServerDefaultResponseStreamsToSend responses from a server-streaming
// call, expecting the i-th one to be the request message followed by i, then
// expects the stream to end and finish with OK.
TEST_P(End2endTest, ResponseStream) {
  ResetStub();
  EchoRequest req;
  req.set_message("hello");
  EchoResponse resp;
  ClientContext ctx;

  auto reader = stub_->ResponseStream(&ctx, req);
  for (int idx = 0; idx < kServerDefaultResponseStreamsToSend; ++idx) {
    EXPECT_TRUE(reader->Read(&resp));
    EXPECT_EQ(resp.message(), req.message() + std::to_string(idx));
  }
  EXPECT_FALSE(reader->Read(&resp));

  const Status status = reader->Finish();
  EXPECT_TRUE(status.ok());
}
1023
// Same expectations as the plain server-streaming test, but the client also
// sends the kServerUseCoalescingApi metadata key with value "1".
TEST_P(End2endTest, ResponseStreamWithCoalescingApi) {
  ResetStub();
  EchoRequest req;
  req.set_message("hello");
  EchoResponse resp;
  ClientContext ctx;
  ctx.AddMetadata(kServerUseCoalescingApi, "1");

  auto reader = stub_->ResponseStream(&ctx, req);
  for (int idx = 0; idx < kServerDefaultResponseStreamsToSend; ++idx) {
    EXPECT_TRUE(reader->Read(&resp));
    EXPECT_EQ(resp.message(), req.message() + std::to_string(idx));
  }
  EXPECT_FALSE(reader->Read(&resp));

  const Status status = reader->Finish();
  EXPECT_TRUE(status.ok());
}
1042
1043 // This was added to prevent regression from issue:
1044 // https://github.com/grpc/grpc/issues/11546
TEST_P(End2endTest,ResponseStreamWithEverythingCoalesced)1045 TEST_P(End2endTest, ResponseStreamWithEverythingCoalesced) {
1046 ResetStub();
1047 EchoRequest request;
1048 EchoResponse response;
1049 ClientContext context;
1050 request.set_message("hello");
1051 context.AddMetadata(kServerUseCoalescingApi, "1");
1052 // We will only send one message, forcing everything (init metadata, message,
1053 // trailing) to be coalesced together.
1054 context.AddMetadata(kServerResponseStreamsToSend, "1");
1055
1056 auto stream = stub_->ResponseStream(&context, request);
1057 EXPECT_TRUE(stream->Read(&response));
1058 EXPECT_EQ(response.message(), request.message() + "0");
1059
1060 EXPECT_FALSE(stream->Read(&response));
1061
1062 Status s = stream->Finish();
1063 EXPECT_TRUE(s.ok());
1064 }
1065
// Ping-pongs kServerDefaultResponseStreamsToSend messages over a bidirectional
// stream, expecting each response to equal the request just written, then
// half-closes and expects further reads to fail and the call to finish OK.
TEST_P(End2endTest, BidiStream) {
  ResetStub();
  EchoRequest req;
  EchoResponse resp;
  ClientContext ctx;
  const std::string base_msg("hello");

  auto stream = stub_->BidiStream(&ctx);

  for (int idx = 0; idx < kServerDefaultResponseStreamsToSend; ++idx) {
    req.set_message(base_msg + std::to_string(idx));
    EXPECT_TRUE(stream->Write(req));
    EXPECT_TRUE(stream->Read(&resp));
    EXPECT_EQ(resp.message(), req.message());
  }

  stream->WritesDone();
  // Two consecutive reads after half-close are both expected to fail.
  for (int attempt = 0; attempt < 2; ++attempt) {
    EXPECT_FALSE(stream->Read(&resp));
  }

  const Status status = stream->Finish();
  EXPECT_TRUE(status.ok());
}
1089
// Bidirectional stream with initial metadata corked and the
// kServerFinishAfterNReads metadata set to "3": two messages are sent with
// Write and the third with WriteLast, each expected to be echoed back.
TEST_P(End2endTest, BidiStreamWithCoalescingApi) {
  ResetStub();
  EchoRequest req;
  EchoResponse resp;
  ClientContext ctx;
  ctx.AddMetadata(kServerFinishAfterNReads, "3");
  ctx.set_initial_metadata_corked(true);
  const std::string base_msg("hello");

  auto stream = stub_->BidiStream(&ctx);

  // The first two messages use a regular Write.
  for (const char* suffix : {"0", "1"}) {
    req.set_message(base_msg + suffix);
    EXPECT_TRUE(stream->Write(req));
    EXPECT_TRUE(stream->Read(&resp));
    EXPECT_EQ(resp.message(), req.message());
  }

  // The final message is sent with WriteLast.
  req.set_message(base_msg + "2");
  stream->WriteLast(req, WriteOptions());
  EXPECT_TRUE(stream->Read(&resp));
  EXPECT_EQ(resp.message(), req.message());

  // Two consecutive reads after the last message are both expected to fail.
  for (int attempt = 0; attempt < 2; ++attempt) {
    EXPECT_FALSE(stream->Read(&resp));
  }

  const Status status = stream->Finish();
  EXPECT_TRUE(status.ok());
}
1122
1123 // This was added to prevent regression from issue:
1124 // https://github.com/grpc/grpc/issues/11546
TEST_P(End2endTest,BidiStreamWithEverythingCoalesced)1125 TEST_P(End2endTest, BidiStreamWithEverythingCoalesced) {
1126 ResetStub();
1127 EchoRequest request;
1128 EchoResponse response;
1129 ClientContext context;
1130 context.AddMetadata(kServerFinishAfterNReads, "1");
1131 context.set_initial_metadata_corked(true);
1132 std::string msg("hello");
1133
1134 auto stream = stub_->BidiStream(&context);
1135
1136 request.set_message(msg + "0");
1137 stream->WriteLast(request, WriteOptions());
1138 EXPECT_TRUE(stream->Read(&response));
1139 EXPECT_EQ(response.message(), request.message());
1140
1141 EXPECT_FALSE(stream->Read(&response));
1142 EXPECT_FALSE(stream->Read(&response));
1143
1144 Status s = stream->Finish();
1145 EXPECT_TRUE(s.ok());
1146 }
1147
1148 // Talk to the two services with the same name but different package names.
1149 // The two stubs are created on the same channel.
TEST_P(End2endTest,DiffPackageServices)1150 TEST_P(End2endTest, DiffPackageServices) {
1151 ResetStub();
1152 EchoRequest request;
1153 EchoResponse response;
1154 request.set_message("Hello");
1155
1156 ClientContext context;
1157 Status s = stub_->Echo(&context, request, &response);
1158 EXPECT_EQ(response.message(), request.message());
1159 EXPECT_TRUE(s.ok());
1160
1161 std::unique_ptr<grpc::testing::duplicate::EchoTestService::Stub> dup_pkg_stub(
1162 grpc::testing::duplicate::EchoTestService::NewStub(channel_));
1163 ClientContext context2;
1164 s = dup_pkg_stub->Echo(&context2, request, &response);
1165 EXPECT_EQ("no package", response.message());
1166 EXPECT_TRUE(s.ok());
1167 }
1168
1169 template <class ServiceType>
CancelRpc(ClientContext * context,int delay_us,ServiceType * service)1170 void CancelRpc(ClientContext* context, int delay_us, ServiceType* service) {
1171 gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
1172 gpr_time_from_micros(delay_us, GPR_TIMESPAN)));
1173 while (!service->signal_client()) {
1174 }
1175 context->TryCancel();
1176 }
1177
// Calls TryCancel on the client context before issuing the RPC and expects a
// CANCELLED status with an empty response. When interceptors are in use, the
// PhonyInterceptor cancel counter is expected to be 20.
TEST_P(End2endTest, CancelRpcBeforeStart) {
  ResetStub();
  EchoRequest req;
  req.set_message("hello");
  EchoResponse resp;
  ClientContext ctx;
  ctx.TryCancel();
  const Status status = stub_->Echo(&ctx, req, &resp);
  EXPECT_EQ("", resp.message());
  EXPECT_EQ(grpc::StatusCode::CANCELLED, status.error_code());
  if (GetParam().use_interceptors()) {
    EXPECT_EQ(20, PhonyInterceptor::GetNumTimesCancel());
  }
}
1192
// Cancels a unary RPC from the client once the service reports that one RPC
// has started. The scenario is attempted up to ten times; the test passes on
// the first attempt that ends with a non-OK status and fails if every attempt
// completes OK.
TEST_P(End2endTest, CancelRpcAfterStart) {
  const bool use_callback_service = GetParam().callback_server();
  for (int attempt = 0; attempt < 10; ++attempt) {
    ResetStub();
    EchoRequest req;
    EchoResponse resp;
    ClientContext ctx;
    req.set_message("hello");
    req.mutable_param()->set_server_notify_client_when_started(true);
    req.mutable_param()->set_skip_cancelled_check(true);
    Status status;
    std::thread rpc_thread([this, &status, &ctx, &req, &resp] {
      status = stub_->Echo(&ctx, req, &resp);
    });
    if (use_callback_service) {
      EXPECT_EQ(callback_service_.ClientWaitUntilNRpcsStarted(1), 1);
    } else {
      EXPECT_EQ(service_.ClientWaitUntilNRpcsStarted(1), 1);
    }

    ctx.TryCancel();

    if (use_callback_service) {
      callback_service_.SignalServerToContinue();
    } else {
      service_.SignalServerToContinue();
    }

    rpc_thread.join();
    // TODO(ctiller): improve test to not be flaky
    //
    // TryCancel is best effort, and it can happen that the cancellation is not
    // acted upon before the server wakes up, sends a response, and the client
    // reads that.
    // For this reason, we try a few times here to see the cancellation result.
    if (!status.ok()) {
      EXPECT_EQ("", resp.message());
      EXPECT_EQ(grpc::StatusCode::CANCELLED, status.error_code());
      if (GetParam().use_interceptors()) {
        EXPECT_EQ(20, PhonyInterceptor::GetNumTimesCancel());
      }
      return;
    }
  }
  GTEST_FAIL() << "Failed to get cancellation";
}
1237
1238 // Client cancels request stream after sending two messages
TEST_P(End2endTest,ClientCancelsRequestStream)1239 TEST_P(End2endTest, ClientCancelsRequestStream) {
1240 ResetStub();
1241 EchoRequest request;
1242 EchoResponse response;
1243 ClientContext context;
1244 request.set_message("hello");
1245
1246 auto stream = stub_->RequestStream(&context, &response);
1247 EXPECT_TRUE(stream->Write(request));
1248 EXPECT_TRUE(stream->Write(request));
1249
1250 context.TryCancel();
1251
1252 Status s = stream->Finish();
1253 EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
1254
1255 EXPECT_EQ(response.message(), "");
1256 if (GetParam().use_interceptors()) {
1257 EXPECT_EQ(20, PhonyInterceptor::GetNumTimesCancel());
1258 }
1259 }
1260
1261 // Client cancels server stream after sending some messages
TEST_P(End2endTest,ClientCancelsResponseStream)1262 TEST_P(End2endTest, ClientCancelsResponseStream) {
1263 ResetStub();
1264 EchoRequest request;
1265 EchoResponse response;
1266 ClientContext context;
1267 request.set_message("hello");
1268
1269 auto stream = stub_->ResponseStream(&context, request);
1270
1271 EXPECT_TRUE(stream->Read(&response));
1272 EXPECT_EQ(response.message(), request.message() + "0");
1273 EXPECT_TRUE(stream->Read(&response));
1274 EXPECT_EQ(response.message(), request.message() + "1");
1275
1276 context.TryCancel();
1277
1278 // The cancellation races with responses, so there might be zero or
1279 // one responses pending, read till failure
1280
1281 if (stream->Read(&response)) {
1282 EXPECT_EQ(response.message(), request.message() + "2");
1283 // Since we have cancelled, we expect the next attempt to read to fail
1284 EXPECT_FALSE(stream->Read(&response));
1285 }
1286
1287 Status s = stream->Finish();
1288 // The final status could be either of CANCELLED or OK depending on
1289 // who won the race.
1290 EXPECT_GE(grpc::StatusCode::CANCELLED, s.error_code());
1291 if (GetParam().use_interceptors()) {
1292 EXPECT_EQ(20, PhonyInterceptor::GetNumTimesCancel());
1293 }
1294 }
1295
1296 // Client cancels bidi stream after sending some messages
TEST_P(End2endTest,ClientCancelsBidi)1297 TEST_P(End2endTest, ClientCancelsBidi) {
1298 ResetStub();
1299 EchoRequest request;
1300 EchoResponse response;
1301 ClientContext context;
1302 std::string msg("hello");
1303
1304 // Send server_try_cancel value in the client metadata
1305 context.AddMetadata(kClientTryCancelRequest, std::to_string(1));
1306
1307 auto stream = stub_->BidiStream(&context);
1308
1309 request.set_message(msg + "0");
1310 EXPECT_TRUE(stream->Write(request));
1311 EXPECT_TRUE(stream->Read(&response));
1312 EXPECT_EQ(response.message(), request.message());
1313
1314 request.set_message(msg + "1");
1315 EXPECT_TRUE(stream->Write(request));
1316
1317 context.TryCancel();
1318
1319 // The cancellation races with responses, so there might be zero or
1320 // one responses pending, read till failure
1321
1322 if (stream->Read(&response)) {
1323 EXPECT_EQ(response.message(), request.message());
1324 // Since we have cancelled, we expect the next attempt to read to fail
1325 EXPECT_FALSE(stream->Read(&response));
1326 }
1327
1328 Status s = stream->Finish();
1329 EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
1330 if (GetParam().use_interceptors()) {
1331 EXPECT_EQ(20, PhonyInterceptor::GetNumTimesCancel());
1332 }
1333 }
1334
// Sends a request whose message is twice kMaxMessageSize_ characters long (with
// the server_die param set) and expects the call to fail.
TEST_P(End2endTest, RpcMaxMessageSize) {
  ResetStub();
  EchoRequest req;
  req.set_message(std::string(kMaxMessageSize_ * 2, 'a'));
  req.mutable_param()->set_server_die(true);
  EchoResponse resp;

  ClientContext ctx;
  const Status status = stub_->Echo(&ctx, req, &resp);
  EXPECT_FALSE(status.ok());
}
1346
ReaderThreadFunc(ClientReaderWriter<EchoRequest,EchoResponse> * stream,gpr_event * ev)1347 void ReaderThreadFunc(ClientReaderWriter<EchoRequest, EchoResponse>* stream,
1348 gpr_event* ev) {
1349 EchoResponse resp;
1350 gpr_event_set(ev, reinterpret_cast<void*>(1));
1351 while (stream->Read(&resp)) {
1352 LOG(INFO) << "Read message";
1353 }
1354 }
1355
1356 // Run a Read and a WritesDone simultaneously.
TEST_P(End2endTest,SimultaneousReadWritesDone)1357 TEST_P(End2endTest, SimultaneousReadWritesDone) {
1358 ResetStub();
1359 ClientContext context;
1360 gpr_event ev;
1361 gpr_event_init(&ev);
1362 auto stream = stub_->BidiStream(&context);
1363 std::thread reader_thread(ReaderThreadFunc, stream.get(), &ev);
1364 gpr_event_wait(&ev, gpr_inf_future(GPR_CLOCK_REALTIME));
1365 stream->WritesDone();
1366 reader_thread.join();
1367 Status s = stream->Finish();
1368 EXPECT_TRUE(s.ok());
1369 }
1370
// Walks a fresh channel through its connectivity states: it starts IDLE, stays
// IDLE while nobody asks it to connect, and leaves IDLE for CONNECTING or READY
// once GetState(true) is called. Skipped for the in-process transport.
TEST_P(End2endTest, ChannelState) {
  if (GetParam().inproc()) {
    return;
  }

  ResetStub();
  // Start IDLE
  EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(false));

  // Did not ask to connect, no state change: the 10ms notification is expected
  // to come back with ok == false.
  CompletionQueue cq;
  std::chrono::system_clock::time_point notify_deadline =
      std::chrono::system_clock::now() + std::chrono::milliseconds(10);
  channel_->NotifyOnStateChange(GRPC_CHANNEL_IDLE, notify_deadline, &cq,
                                nullptr);
  void* tag;
  bool ok = true;
  cq.Next(&tag, &ok);
  EXPECT_FALSE(ok);

  EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(true));
  EXPECT_TRUE(channel_->WaitForStateChange(
      GRPC_CHANNEL_IDLE, gpr_inf_future(GPR_CLOCK_REALTIME)));
  const auto new_state = channel_->GetState(false);
  EXPECT_TRUE(new_state == GRPC_CHANNEL_CONNECTING ||
              new_state == GRPC_CHANNEL_READY);
}
1396
1397 // Takes 10s.
TEST_P(End2endTest,ChannelStateTimeout)1398 TEST_P(End2endTest, ChannelStateTimeout) {
1399 if ((GetParam().credentials_type() != kInsecureCredentialsType) ||
1400 GetParam().inproc()) {
1401 return;
1402 }
1403 int port = grpc_pick_unused_port_or_die();
1404 std::ostringstream server_address;
1405 server_address << "localhost:" << port;
1406 // Channel to non-existing server
1407 auto channel =
1408 grpc::CreateChannel(server_address.str(), InsecureChannelCredentials());
1409 // Start IDLE
1410 EXPECT_EQ(GRPC_CHANNEL_IDLE, channel->GetState(true));
1411
1412 auto state = GRPC_CHANNEL_IDLE;
1413 for (int i = 0; i < 10; i++) {
1414 channel->WaitForStateChange(
1415 state, std::chrono::system_clock::now() + std::chrono::seconds(1));
1416 state = channel->GetState(false);
1417 }
1418 }
1419
// Creates a channel from an invalid target URI and expects it to report
// TRANSIENT_FAILURE right away, then waits for state changes ten times with a
// one-second timeout each. Only runs with insecure credentials on a non-inproc
// transport.
TEST_P(End2endTest, ChannelStateOnLameChannel) {
  const bool insecure_creds =
      GetParam().credentials_type() == kInsecureCredentialsType;
  if (!insecure_creds || GetParam().inproc()) {
    return;
  }
  // Channel using invalid target URI. This creates a lame channel.
  auto channel = grpc::CreateChannel("dns:///", InsecureChannelCredentials());
  // Channel should immediately report TRANSIENT_FAILURE.
  EXPECT_EQ(GRPC_CHANNEL_TRANSIENT_FAILURE, channel->GetState(true));
  // And state will never change.
  auto last_state = GRPC_CHANNEL_TRANSIENT_FAILURE;
  for (int round = 0; round < 10; ++round) {
    const auto wait_until =
        std::chrono::system_clock::now() + std::chrono::seconds(1);
    channel->WaitForStateChange(last_state, wait_until);
    last_state = channel->GetState(false);
  }
}
1437
1438 // Talking to a non-existing service.
TEST_P(End2endTest,NonExistingService)1439 TEST_P(End2endTest, NonExistingService) {
1440 ResetChannel();
1441 std::unique_ptr<grpc::testing::UnimplementedEchoService::Stub> stub;
1442 stub = grpc::testing::UnimplementedEchoService::NewStub(channel_);
1443
1444 EchoRequest request;
1445 EchoResponse response;
1446 request.set_message("Hello");
1447
1448 ClientContext context;
1449 Status s = stub->Unimplemented(&context, request, &response);
1450 EXPECT_EQ(StatusCode::UNIMPLEMENTED, s.error_code());
1451 EXPECT_EQ("", s.error_message());
1452 }
1453
1454 // Ask the server to send back a serialized proto in trailer.
1455 // This is an example of setting error details.
TEST_P(End2endTest,BinaryTrailerTest)1456 TEST_P(End2endTest, BinaryTrailerTest) {
1457 ResetStub();
1458 EchoRequest request;
1459 EchoResponse response;
1460 ClientContext context;
1461
1462 request.mutable_param()->set_echo_metadata(true);
1463 DebugInfo* info = request.mutable_param()->mutable_debug_info();
1464 info->add_stack_entries("stack_entry_1");
1465 info->add_stack_entries("stack_entry_2");
1466 info->add_stack_entries("stack_entry_3");
1467 info->set_detail("detailed debug info");
1468 std::string expected_string = info->SerializeAsString();
1469 request.set_message("Hello");
1470
1471 Status s = stub_->Echo(&context, request, &response);
1472 EXPECT_FALSE(s.ok());
1473 auto trailers = context.GetServerTrailingMetadata();
1474 EXPECT_EQ(1u, trailers.count(kDebugInfoTrailerKey));
1475 auto iter = trailers.find(kDebugInfoTrailerKey);
1476 EXPECT_EQ(expected_string, iter->second);
1477 // Parse the returned trailer into a DebugInfo proto.
1478 DebugInfo returned_info;
1479 EXPECT_TRUE(returned_info.ParseFromString(ToString(iter->second)));
1480 }
1481
// Sends one Echo per expected ErrorStatus, asking for exactly that error via
// the request's expected_error param, and checks that the client-side Status
// carries the same code, message, and binary details. Also checks that the
// debug error string mentions a few expected substrings.
TEST_P(End2endTest, ExpectErrorTest) {
  ResetStub();

  // Binary details that deliberately start with a NUL byte. The length must be
  // passed explicitly when building the std::string; handing the literal over
  // as a C string would truncate it at the first byte and test empty details.
  const char kRawBinaryDetails[] = "\x0\x1\x2\x3\x4\x5\x6\x8\x9\xA\xB";

  std::vector<ErrorStatus> expected_status;
  expected_status.emplace_back();
  expected_status.back().set_code(13);  // INTERNAL
  // No Error message or details

  expected_status.emplace_back();
  expected_status.back().set_code(13);  // INTERNAL
  expected_status.back().set_error_message("text error message");
  expected_status.back().set_binary_error_details("text error details");

  expected_status.emplace_back();
  expected_status.back().set_code(13);  // INTERNAL
  expected_status.back().set_error_message("text error message");
  expected_status.back().set_binary_error_details(
      std::string(kRawBinaryDetails, sizeof(kRawBinaryDetails) - 1));

  for (const ErrorStatus& expected : expected_status) {
    EchoRequest request;
    EchoResponse response;
    ClientContext context;
    request.set_message("Hello");
    auto* error = request.mutable_param()->mutable_expected_error();
    error->set_code(expected.code());
    error->set_error_message(expected.error_message());
    error->set_binary_error_details(expected.binary_error_details());

    Status s = stub_->Echo(&context, request, &response);
    EXPECT_FALSE(s.ok());
    EXPECT_EQ(expected.code(), s.error_code());
    EXPECT_EQ(expected.error_message(), s.error_message());
    EXPECT_EQ(expected.binary_error_details(), s.error_details());
    // Fetch the debug string once instead of once per check.
    const std::string debug_error = context.debug_error_string();
    EXPECT_TRUE(absl::StrContains(debug_error, "created"));
#ifndef NDEBUG
    // grpc_core::StatusIntProperty::kFileLine is for debug only
    EXPECT_TRUE(absl::StrContains(debug_error, "file"));
    EXPECT_TRUE(absl::StrContains(debug_error, "line"));
#endif
    EXPECT_TRUE(absl::StrContains(debug_error, "status"));
    EXPECT_TRUE(absl::StrContains(debug_error, "13"));
  }
}
1527
1528 //////////////////////////////////////////////////////////////////////////
1529 // Test with and without a proxy.
1530 class ProxyEnd2endTest : public End2endTest {
1531 protected:
1532 };
1533
// Issues SendRpc on a fresh stub with the arguments (1, false).
TEST_P(ProxyEnd2endTest, SimpleRpc) {
  ResetStub();
  auto* const echo_stub = stub_.get();
  SendRpc(echo_stub, 1, false);
}
1538
// Performs a unary Echo with a default-constructed (empty) request and expects
// an OK status.
TEST_P(ProxyEnd2endTest, SimpleRpcWithEmptyMessages) {
  ResetStub();
  EchoRequest empty_req;
  EchoResponse resp;

  ClientContext ctx;
  const Status status = stub_->Echo(&ctx, empty_req, &resp);
  EXPECT_TRUE(status.ok());
}
1548
// Starts ten threads that each call SendRpc on the shared stub with the
// arguments (10, false), then joins all of them.
TEST_P(ProxyEnd2endTest, MultipleRpcs) {
  ResetStub();
  constexpr int kThreadCount = 10;
  std::vector<std::thread> workers;
  workers.reserve(kThreadCount);
  for (int n = 0; n < kThreadCount; ++n) {
    workers.emplace_back(SendRpc, stub_.get(), 10, false);
  }
  for (std::thread& worker : workers) {
    worker.join();
  }
}
1560
1561 // Set a 10us deadline and make sure proper error is returned.
TEST_P(ProxyEnd2endTest,RpcDeadlineExpires)1562 TEST_P(ProxyEnd2endTest, RpcDeadlineExpires) {
1563 ResetStub();
1564 EchoRequest request;
1565 EchoResponse response;
1566 request.set_message("Hello");
1567 request.mutable_param()->set_skip_cancelled_check(true);
1568 // Let server sleep for 4 secs first to guarantee expiry.
1569 // 4 secs might seem a bit extreme but the timer manager would have been just
1570 // initialized (when ResetStub() was called) and there are some warmup costs
1571 // i.e the timer thread many not have even started. There might also be other
1572 // delays in the timer manager thread (in acquiring locks, timer data
1573 // structure manipulations, starting backup timer threads) that add to the
1574 // delays. 4 secs might be still not enough in some cases but this
1575 // significantly reduces the test flakes
1576 request.mutable_param()->set_server_sleep_us(4 * 1000 * 1000);
1577
1578 ClientContext context;
1579 std::chrono::system_clock::time_point deadline =
1580 std::chrono::system_clock::now() + std::chrono::milliseconds(1);
1581 context.set_deadline(deadline);
1582 Status s = stub_->Echo(&context, request, &response);
1583 EXPECT_EQ(StatusCode::DEADLINE_EXCEEDED, s.error_code());
1584 }
1585
1586 // Set a long but finite deadline.
TEST_P(ProxyEnd2endTest,RpcLongDeadline)1587 TEST_P(ProxyEnd2endTest, RpcLongDeadline) {
1588 ResetStub();
1589 EchoRequest request;
1590 EchoResponse response;
1591 request.set_message("Hello");
1592
1593 ClientContext context;
1594 std::chrono::system_clock::time_point deadline =
1595 std::chrono::system_clock::now() + std::chrono::hours(1);
1596 context.set_deadline(deadline);
1597 Status s = stub_->Echo(&context, request, &response);
1598 EXPECT_EQ(response.message(), request.message());
1599 EXPECT_TRUE(s.ok());
1600 }
1601
1602 // Ask server to echo back the deadline it sees.
TEST_P(ProxyEnd2endTest,EchoDeadline)1603 TEST_P(ProxyEnd2endTest, EchoDeadline) {
1604 ResetStub();
1605 EchoRequest request;
1606 EchoResponse response;
1607 request.set_message("Hello");
1608 request.mutable_param()->set_echo_deadline(true);
1609
1610 ClientContext context;
1611 std::chrono::system_clock::time_point deadline =
1612 std::chrono::system_clock::now() + std::chrono::seconds(100);
1613 context.set_deadline(deadline);
1614 Status s = stub_->Echo(&context, request, &response);
1615 EXPECT_EQ(response.message(), request.message());
1616 EXPECT_TRUE(s.ok());
1617 gpr_timespec sent_deadline;
1618 Timepoint2Timespec(deadline, &sent_deadline);
1619 // We want to allow some reasonable error given:
1620 // - request_deadline() only has 1sec resolution so the best we can do is +-1
1621 // - if sent_deadline.tv_nsec is very close to the next second's boundary we
1622 // can end up being off by 2 in one direction.
1623 EXPECT_LE(response.param().request_deadline() - sent_deadline.tv_sec, 2);
1624 EXPECT_GE(response.param().request_deadline() - sent_deadline.tv_sec, -1);
1625 }
1626
1627 // Ask server to echo back the deadline it sees. The rpc has no deadline.
TEST_P(ProxyEnd2endTest,EchoDeadlineForNoDeadlineRpc)1628 TEST_P(ProxyEnd2endTest, EchoDeadlineForNoDeadlineRpc) {
1629 ResetStub();
1630 EchoRequest request;
1631 EchoResponse response;
1632 request.set_message("Hello");
1633 request.mutable_param()->set_echo_deadline(true);
1634
1635 ClientContext context;
1636 Status s = stub_->Echo(&context, request, &response);
1637 EXPECT_EQ(response.message(), request.message());
1638 EXPECT_TRUE(s.ok());
1639 EXPECT_EQ(response.param().request_deadline(),
1640 gpr_inf_future(GPR_CLOCK_REALTIME).tv_sec);
1641 }
1642
// Calls the Unimplemented method on the regular stub and expects a failed call
// with code UNIMPLEMENTED, an empty error message, and an empty response.
TEST_P(ProxyEnd2endTest, UnimplementedRpc) {
  ResetStub();
  EchoRequest req;
  req.set_message("Hello");
  EchoResponse resp;

  ClientContext ctx;
  const Status status = stub_->Unimplemented(&ctx, req, &resp);
  EXPECT_FALSE(status.ok());
  EXPECT_EQ(status.error_code(), grpc::StatusCode::UNIMPLEMENTED);
  EXPECT_EQ(status.error_message(), "");
  EXPECT_EQ(resp.message(), "");
}
1656
1657 // Client cancels rpc after 10ms
TEST_P(ProxyEnd2endTest,ClientCancelsRpc)1658 TEST_P(ProxyEnd2endTest, ClientCancelsRpc) {
1659 ResetStub();
1660 EchoRequest request;
1661 EchoResponse response;
1662 request.set_message("Hello");
1663 const int kCancelDelayUs = 10 * 1000;
1664 request.mutable_param()->set_client_cancel_after_us(kCancelDelayUs);
1665
1666 ClientContext context;
1667 std::thread cancel_thread;
1668 if (!GetParam().callback_server()) {
1669 cancel_thread = std::thread(
1670 [&context, this](int delay) { CancelRpc(&context, delay, &service_); },
1671 kCancelDelayUs);
1672 // Note: the unusual pattern above (and below) is caused by a conflict
1673 // between two sets of compiler expectations. clang allows const to be
1674 // captured without mention, so there is no need to capture kCancelDelayUs
1675 // (and indeed clang-tidy complains if you do so). OTOH, a Windows compiler
1676 // in our tests requires an explicit capture even for const. We square this
1677 // circle by passing the const value in as an argument to the lambda.
1678 } else {
1679 cancel_thread = std::thread(
1680 [&context, this](int delay) {
1681 CancelRpc(&context, delay, &callback_service_);
1682 },
1683 kCancelDelayUs);
1684 }
1685 Status s = stub_->Echo(&context, request, &response);
1686 cancel_thread.join();
1687 EXPECT_EQ(StatusCode::CANCELLED, s.error_code());
1688 EXPECT_EQ(s.error_message(), "CANCELLED");
1689 }
1690
1691 // Server cancels rpc after 1ms
TEST_P(ProxyEnd2endTest,ServerCancelsRpc)1692 TEST_P(ProxyEnd2endTest, ServerCancelsRpc) {
1693 ResetStub();
1694 EchoRequest request;
1695 EchoResponse response;
1696 request.set_message("Hello");
1697 request.mutable_param()->set_server_cancel_after_us(1000);
1698
1699 ClientContext context;
1700 Status s = stub_->Echo(&context, request, &response);
1701 EXPECT_EQ(StatusCode::CANCELLED, s.error_code());
1702 EXPECT_TRUE(s.error_message().empty());
1703 }
1704
1705 // Make the response larger than the flow control window.
TEST_P(ProxyEnd2endTest,HugeResponse)1706 TEST_P(ProxyEnd2endTest, HugeResponse) {
1707 ResetStub();
1708 EchoRequest request;
1709 EchoResponse response;
1710 request.set_message("huge response");
1711 const size_t kResponseSize = 1024 * (1024 + 10);
1712 request.mutable_param()->set_response_message_length(kResponseSize);
1713
1714 ClientContext context;
1715 std::chrono::system_clock::time_point deadline =
1716 std::chrono::system_clock::now() + std::chrono::seconds(20);
1717 context.set_deadline(deadline);
1718 Status s = stub_->Echo(&context, request, &response);
1719 EXPECT_EQ(kResponseSize, response.message().size());
1720 EXPECT_TRUE(s.ok());
1721 }
1722
// Asks the server to echo the peer it sees and checks that both the echoed
// peer and the client context's peer pass CheckIsLocalhost. The test body is
// a no-op for inproc scenarios.
TEST_P(ProxyEnd2endTest, Peer) {
  if (GetParam().inproc()) {
    // Peer is not meaningful for inproc
    return;
  }
  ResetStub();

  EchoRequest request;
  request.set_message("hello");
  request.mutable_param()->set_echo_peer(true);
  EchoResponse response;
  ClientContext context;

  const Status status = stub_->Echo(&context, request, &response);
  EXPECT_EQ(response.message(), request.message());
  EXPECT_TRUE(status.ok());
  EXPECT_TRUE(CheckIsLocalhost(response.param().peer()));
  EXPECT_TRUE(CheckIsLocalhost(context.peer()));
}
1741
1742 //////////////////////////////////////////////////////////////////////////
1743 class SecureEnd2endTest : public End2endTest {
1744 protected:
SecureEnd2endTest()1745 SecureEnd2endTest() {
1746 CHECK(!GetParam().use_proxy());
1747 CHECK(GetParam().credentials_type() != kInsecureCredentialsType);
1748 }
1749 };
1750
// Sends an Echo with the authority overridden to "foo.test.youtube.com" and
// expects the response to carry a param whose host field is "special".
TEST_P(SecureEnd2endTest, SimpleRpcWithHost) {
  ResetStub();

  EchoRequest request;
  request.set_message("Hello");
  EchoResponse response;

  ClientContext context;
  context.set_authority("foo.test.youtube.com");

  const Status status = stub_->Echo(&context, request, &response);
  EXPECT_EQ(response.message(), request.message());
  EXPECT_TRUE(response.has_param());
  EXPECT_EQ("special", response.param().host());
  EXPECT_TRUE(status.ok());
}
1766
MetadataContains(const std::multimap<grpc::string_ref,grpc::string_ref> & metadata,const std::string & key,const std::string & value)1767 bool MetadataContains(
1768 const std::multimap<grpc::string_ref, grpc::string_ref>& metadata,
1769 const std::string& key, const std::string& value) {
1770 int count = 0;
1771
1772 for (std::multimap<grpc::string_ref, grpc::string_ref>::const_iterator iter =
1773 metadata.begin();
1774 iter != metadata.end(); ++iter) {
1775 if (ToString(iter->first) == key && ToString(iter->second) == value) {
1776 count++;
1777 }
1778 }
1779 return count == 1;
1780 }
1781
// Starts the server with a TestAuthMetadataProcessor constructed with `true`
// and calls it with the processor's compatible client credentials. The call
// must succeed and the authorization metadata must not come back in the
// server trailing metadata.
TEST_P(SecureEnd2endTest, BlockingAuthMetadataPluginAndProcessorSuccess) {
  auto processor = std::make_shared<TestAuthMetadataProcessor>(true);
  StartServer(processor);
  ResetStub();

  ClientContext context;
  context.set_credentials(processor->GetCompatibleClientCreds());

  EchoRequest request;
  request.set_message("Hello");
  auto* param = request.mutable_param();
  param->set_echo_metadata(true);
  param->set_expected_client_identity(TestAuthMetadataProcessor::kGoodGuy);
  param->set_expected_transport_security_type(GetParam().credentials_type());
  EchoResponse response;

  const Status status = stub_->Echo(&context, request, &response);
  EXPECT_EQ(request.message(), response.message());
  EXPECT_TRUE(status.ok());

  // Metadata should have been consumed by the processor.
  const std::string bearer_value =
      std::string("Bearer ") + TestAuthMetadataProcessor::kGoodGuy;
  EXPECT_FALSE(MetadataContains(context.GetServerTrailingMetadata(),
                                GRPC_AUTHORIZATION_METADATA_KEY,
                                bearer_value));
}
1806
// Starts the server with a TestAuthMetadataProcessor constructed with `true`
// and calls it with the processor's incompatible client credentials. The
// call must fail with UNAUTHENTICATED.
TEST_P(SecureEnd2endTest, BlockingAuthMetadataPluginAndProcessorFailure) {
  auto processor = std::make_shared<TestAuthMetadataProcessor>(true);
  StartServer(processor);
  ResetStub();

  ClientContext context;
  context.set_credentials(processor->GetIncompatibleClientCreds());

  EchoRequest request;
  request.set_message("Hello");
  EchoResponse response;

  const Status status = stub_->Echo(&context, request, &response);
  EXPECT_FALSE(status.ok());
  EXPECT_EQ(status.error_code(), StatusCode::UNAUTHENTICATED);
}
1821
// Attaches Google IAM call credentials to the context and verifies that the
// IAM token and authority selector each appear in the server trailing
// metadata, and that the credentials' debug string matches the expected one.
TEST_P(SecureEnd2endTest, SetPerCallCredentials) {
  ResetStub();

  ClientContext context;
  context.set_credentials(GoogleIAMCredentials(kFakeToken, kFakeSelector));

  EchoRequest request;
  request.set_message("Hello");
  request.mutable_param()->set_echo_metadata(true);
  EchoResponse response;

  const Status status = stub_->Echo(&context, request, &response);
  EXPECT_EQ(request.message(), response.message());
  EXPECT_TRUE(status.ok());
  EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(),
                               GRPC_IAM_AUTHORIZATION_TOKEN_METADATA_KEY,
                               kFakeToken));
  EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(),
                               GRPC_IAM_AUTHORITY_SELECTOR_METADATA_KEY,
                               kFakeSelector));
  EXPECT_EQ(context.credentials()->DebugString(),
            kExpectedFakeCredsDebugString);
}
1845
1846 class CredentialsInterceptor : public experimental::Interceptor {
1847 public:
CredentialsInterceptor(experimental::ClientRpcInfo * info)1848 explicit CredentialsInterceptor(experimental::ClientRpcInfo* info)
1849 : info_(info) {}
1850
Intercept(experimental::InterceptorBatchMethods * methods)1851 void Intercept(experimental::InterceptorBatchMethods* methods) override {
1852 if (methods->QueryInterceptionHookPoint(
1853 experimental::InterceptionHookPoints::PRE_SEND_INITIAL_METADATA)) {
1854 std::shared_ptr<CallCredentials> creds =
1855 GoogleIAMCredentials(kFakeToken, kFakeSelector);
1856 info_->client_context()->set_credentials(creds);
1857 }
1858 methods->Proceed();
1859 }
1860
1861 private:
1862 experimental::ClientRpcInfo* info_ = nullptr;
1863 };
1864
1865 class CredentialsInterceptorFactory
1866 : public experimental::ClientInterceptorFactoryInterface {
CreateClientInterceptor(experimental::ClientRpcInfo * info)1867 CredentialsInterceptor* CreateClientInterceptor(
1868 experimental::ClientRpcInfo* info) override {
1869 return new CredentialsInterceptor(info);
1870 }
1871 };
1872
// With a CredentialsInterceptorFactory installed on the channel and no
// credentials set on the context, the IAM token and selector must still show
// up in the server trailing metadata, and the context must end up with the
// expected credentials debug string. Does nothing unless the scenario uses
// interceptors.
TEST_P(SecureEnd2endTest, CallCredentialsInterception) {
  if (!GetParam().use_interceptors()) {
    return;
  }
  std::vector<std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>
      factories;
  factories.push_back(std::make_unique<CredentialsInterceptorFactory>());
  ResetStub(std::move(factories));

  EchoRequest request;
  request.set_message("Hello");
  request.mutable_param()->set_echo_metadata(true);
  EchoResponse response;
  ClientContext context;

  const Status status = stub_->Echo(&context, request, &response);
  EXPECT_EQ(request.message(), response.message());
  EXPECT_TRUE(status.ok());
  EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(),
                               GRPC_IAM_AUTHORIZATION_TOKEN_METADATA_KEY,
                               kFakeToken));
  EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(),
                               GRPC_IAM_AUTHORITY_SELECTOR_METADATA_KEY,
                               kFakeSelector));
  EXPECT_EQ(context.credentials()->DebugString(),
            kExpectedFakeCredsDebugString);
}
1901
// The context starts with "wrong" IAM credentials; the installed
// CredentialsInterceptorFactory is expected to replace them, so the server
// trailing metadata and the final debug string must reflect kFakeToken /
// kFakeSelector. Does nothing unless the scenario uses interceptors.
TEST_P(SecureEnd2endTest, CallCredentialsInterceptionWithSetCredentials) {
  if (!GetParam().use_interceptors()) {
    return;
  }
  std::vector<std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>
      factories;
  factories.push_back(std::make_unique<CredentialsInterceptorFactory>());
  ResetStub(std::move(factories));

  ClientContext context;
  const std::shared_ptr<CallCredentials> wrong_creds =
      GoogleIAMCredentials(kWrongToken, kWrongSelector);
  context.set_credentials(wrong_creds);
  EXPECT_EQ(context.credentials(), wrong_creds);
  EXPECT_EQ(context.credentials()->DebugString(),
            kExpectedWrongCredsDebugString);

  EchoRequest request;
  request.set_message("Hello");
  request.mutable_param()->set_echo_metadata(true);
  EchoResponse response;

  const Status status = stub_->Echo(&context, request, &response);
  EXPECT_EQ(request.message(), response.message());
  EXPECT_TRUE(status.ok());
  EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(),
                               GRPC_IAM_AUTHORIZATION_TOKEN_METADATA_KEY,
                               kFakeToken));
  EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(),
                               GRPC_IAM_AUTHORITY_SELECTOR_METADATA_KEY,
                               kFakeSelector));
  EXPECT_EQ(context.credentials()->DebugString(),
            kExpectedFakeCredsDebugString);
}
1935
// Sets call credentials twice on the same context. Only the second set
// (kFakeToken2 / kFakeSelector2) may appear in the server trailing metadata;
// the first set must be absent.
TEST_P(SecureEnd2endTest, OverridePerCallCredentials) {
  ResetStub();
  ClientContext context;

  const std::shared_ptr<CallCredentials> first_creds =
      GoogleIAMCredentials(kFakeToken1, kFakeSelector1);
  context.set_credentials(first_creds);
  EXPECT_EQ(context.credentials(), first_creds);
  EXPECT_EQ(context.credentials()->DebugString(),
            kExpectedFakeCreds1DebugString);

  const std::shared_ptr<CallCredentials> second_creds =
      GoogleIAMCredentials(kFakeToken2, kFakeSelector2);
  context.set_credentials(second_creds);
  EXPECT_EQ(context.credentials(), second_creds);

  EchoRequest request;
  request.set_message("Hello");
  request.mutable_param()->set_echo_metadata(true);
  EchoResponse response;

  const Status status = stub_->Echo(&context, request, &response);
  // The overriding credentials are present...
  EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(),
                               GRPC_IAM_AUTHORIZATION_TOKEN_METADATA_KEY,
                               kFakeToken2));
  EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(),
                               GRPC_IAM_AUTHORITY_SELECTOR_METADATA_KEY,
                               kFakeSelector2));
  // ...and the overridden ones are not.
  EXPECT_FALSE(MetadataContains(context.GetServerTrailingMetadata(),
                                GRPC_IAM_AUTHORIZATION_TOKEN_METADATA_KEY,
                                kFakeToken1));
  EXPECT_FALSE(MetadataContains(context.GetServerTrailingMetadata(),
                                GRPC_IAM_AUTHORITY_SELECTOR_METADATA_KEY,
                                kFakeSelector1));
  EXPECT_EQ(context.credentials()->DebugString(),
            kExpectedFakeCreds2DebugString);
  EXPECT_EQ(request.message(), response.message());
  EXPECT_TRUE(status.ok());
}
1972
// Uses plugin credentials whose metadata key is
// TestMetadataCredentialsPlugin::kBadMetadataKey. The call must fail with
// UNAVAILABLE and the credentials' debug string must match the expected one.
TEST_P(SecureEnd2endTest, AuthMetadataPluginKeyFailure) {
  ResetStub();

  std::unique_ptr<MetadataCredentialsPlugin> plugin(
      new TestMetadataCredentialsPlugin(
          TestMetadataCredentialsPlugin::kBadMetadataKey,
          "Does not matter, will fail the key is invalid.", false, true, 0));
  ClientContext context;
  context.set_credentials(
      grpc::MetadataCredentialsFromPlugin(std::move(plugin)));

  EchoRequest request;
  request.set_message("Hello");
  EchoResponse response;

  const Status status = stub_->Echo(&context, request, &response);
  EXPECT_FALSE(status.ok());
  EXPECT_EQ(status.error_code(), StatusCode::UNAVAILABLE);
  EXPECT_EQ(context.credentials()->DebugString(),
            kExpectedAuthMetadataPluginKeyFailureCredsDebugString);
}
1992
// Uses plugin credentials with a good metadata key but a value containing a
// newline. The call must fail with UNAVAILABLE and the credentials' debug
// string must match the expected one.
TEST_P(SecureEnd2endTest, AuthMetadataPluginValueFailure) {
  ResetStub();

  std::unique_ptr<MetadataCredentialsPlugin> plugin(
      new TestMetadataCredentialsPlugin(
          TestMetadataCredentialsPlugin::kGoodMetadataKey,
          "With illegal \n value.", false, true, 0));
  ClientContext context;
  context.set_credentials(
      grpc::MetadataCredentialsFromPlugin(std::move(plugin)));

  EchoRequest request;
  request.set_message("Hello");
  EchoResponse response;

  const Status status = stub_->Echo(&context, request, &response);
  EXPECT_FALSE(status.ok());
  EXPECT_EQ(status.error_code(), StatusCode::UNAVAILABLE);
  EXPECT_EQ(context.credentials()->DebugString(),
            kExpectedAuthMetadataPluginValueFailureCredsDebugString);
}
2011
// Gives the RPC a 100ms deadline and uses plugin credentials constructed
// with the same 100 as their last argument. The call may succeed; if it
// fails, the only accepted codes are DEADLINE_EXCEEDED and UNAVAILABLE.
TEST_P(SecureEnd2endTest, AuthMetadataPluginWithDeadline) {
  ResetStub();
  const int delay = 100;

  EchoRequest request;
  request.mutable_param()->set_skip_cancelled_check(true);
  request.set_message("Hello");
  EchoResponse response;

  ClientContext context;
  const std::chrono::system_clock::time_point deadline =
      std::chrono::system_clock::now() + std::chrono::milliseconds(delay);
  context.set_deadline(deadline);
  std::unique_ptr<MetadataCredentialsPlugin> plugin(
      new TestMetadataCredentialsPlugin("meta_key", "Does not matter", true,
                                        true, delay));
  context.set_credentials(
      grpc::MetadataCredentialsFromPlugin(std::move(plugin)));

  const Status status = stub_->Echo(&context, request, &response);
  if (!status.ok()) {
    const StatusCode code = status.error_code();
    EXPECT_TRUE(code == StatusCode::DEADLINE_EXCEEDED ||
                code == StatusCode::UNAVAILABLE);
  }
  EXPECT_EQ(context.credentials()->DebugString(),
            kExpectedAuthMetadataPluginWithDeadlineCredsDebugString);
}
2036
// Uses plugin credentials constructed with 100 as their last argument while
// a helper thread sleeps 100ms and then calls TryCancel on the context. The
// call may succeed; if it fails, the only accepted codes are CANCELLED and
// UNAVAILABLE.
TEST_P(SecureEnd2endTest, AuthMetadataPluginWithCancel) {
  ResetStub();
  const int delay = 100;

  EchoRequest request;
  request.mutable_param()->set_skip_cancelled_check(true);
  request.set_message("Hello");
  EchoResponse response;

  ClientContext context;
  std::unique_ptr<MetadataCredentialsPlugin> plugin(
      new TestMetadataCredentialsPlugin("meta_key", "Does not matter", true,
                                        true, delay));
  context.set_credentials(
      grpc::MetadataCredentialsFromPlugin(std::move(plugin)));

  std::thread cancel_thread([&] {
    const gpr_timespec wake_time =
        gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
                     gpr_time_from_millis(delay, GPR_TIMESPAN));
    gpr_sleep_until(wake_time);
    context.TryCancel();
  });

  const Status status = stub_->Echo(&context, request, &response);
  if (!status.ok()) {
    const StatusCode code = status.error_code();
    EXPECT_TRUE(code == StatusCode::CANCELLED ||
                code == StatusCode::UNAVAILABLE);
  }
  cancel_thread.join();
  // The expected string is shared with the deadline variant of this test,
  // exactly as in the original assertion.
  EXPECT_EQ(context.credentials()->DebugString(),
            kExpectedAuthMetadataPluginWithDeadlineCredsDebugString);
}
2064
// Uses plugin credentials constructed with (false, false, 0) as the trailing
// arguments. The call must fail with UNAVAILABLE and an error message made
// of the fixed "Getting metadata from plugin failed with error: " prefix
// followed by kTestCredsPluginErrorMsg.
// NOTE(review): the two booleans look like (blocking, successful) judging by
// the sibling tests; confirm against TestMetadataCredentialsPlugin.
TEST_P(SecureEnd2endTest, NonBlockingAuthMetadataPluginFailure) {
  ResetStub();

  std::unique_ptr<MetadataCredentialsPlugin> plugin(
      new TestMetadataCredentialsPlugin(
          TestMetadataCredentialsPlugin::kGoodMetadataKey,
          "Does not matter, will fail anyway (see 3rd param)", false, false,
          0));
  ClientContext context;
  context.set_credentials(
      grpc::MetadataCredentialsFromPlugin(std::move(plugin)));

  EchoRequest request;
  request.set_message("Hello");
  EchoResponse response;

  const Status status = stub_->Echo(&context, request, &response);
  EXPECT_FALSE(status.ok());
  EXPECT_EQ(status.error_code(), StatusCode::UNAVAILABLE);
  EXPECT_EQ(status.error_message(),
            std::string("Getting metadata from plugin failed with error: ") +
                kTestCredsPluginErrorMsg);
  EXPECT_EQ(context.credentials()->DebugString(),
            kExpectedNonBlockingAuthMetadataPluginFailureCredsDebugString);
}
2087
// Starts the server with a TestAuthMetadataProcessor constructed with
// `false` and calls it with the processor's compatible client credentials.
// The call must succeed, the authorization metadata must not come back in
// the server trailing metadata, and the credentials' debug string must match
// the expected one.
TEST_P(SecureEnd2endTest, NonBlockingAuthMetadataPluginAndProcessorSuccess) {
  auto processor = std::make_shared<TestAuthMetadataProcessor>(false);
  StartServer(processor);
  ResetStub();

  ClientContext context;
  context.set_credentials(processor->GetCompatibleClientCreds());

  EchoRequest request;
  request.set_message("Hello");
  auto* param = request.mutable_param();
  param->set_echo_metadata(true);
  param->set_expected_client_identity(TestAuthMetadataProcessor::kGoodGuy);
  param->set_expected_transport_security_type(GetParam().credentials_type());
  EchoResponse response;

  const Status status = stub_->Echo(&context, request, &response);
  EXPECT_EQ(request.message(), response.message());
  EXPECT_TRUE(status.ok());

  // Metadata should have been consumed by the processor.
  const std::string bearer_value =
      std::string("Bearer ") + TestAuthMetadataProcessor::kGoodGuy;
  EXPECT_FALSE(MetadataContains(context.GetServerTrailingMetadata(),
                                GRPC_AUTHORIZATION_METADATA_KEY,
                                bearer_value));
  EXPECT_EQ(
      context.credentials()->DebugString(),
      kExpectedNonBlockingAuthMetadataPluginAndProcessorSuccessCredsDebugString);
}
2115
// Starts the server with a TestAuthMetadataProcessor constructed with
// `false` and calls it with the processor's incompatible client credentials.
// The call must fail with UNAUTHENTICATED and the credentials' debug string
// must match the expected one.
TEST_P(SecureEnd2endTest, NonBlockingAuthMetadataPluginAndProcessorFailure) {
  auto processor = std::make_shared<TestAuthMetadataProcessor>(false);
  StartServer(processor);
  ResetStub();

  ClientContext context;
  context.set_credentials(processor->GetIncompatibleClientCreds());

  EchoRequest request;
  request.set_message("Hello");
  EchoResponse response;

  const Status status = stub_->Echo(&context, request, &response);
  EXPECT_FALSE(status.ok());
  EXPECT_EQ(status.error_code(), StatusCode::UNAUTHENTICATED);
  EXPECT_EQ(
      context.credentials()->DebugString(),
      kExpectedNonBlockingAuthMetadataPluginAndProcessorFailureCredsDebugString);
}
2133
// Uses plugin credentials constructed with (true, false, 0) as the trailing
// arguments. The call must fail with UNAVAILABLE and an error message made
// of the fixed "Getting metadata from plugin failed with error: " prefix
// followed by kTestCredsPluginErrorMsg.
// NOTE(review): the two booleans look like (blocking, successful) judging by
// the sibling tests; confirm against TestMetadataCredentialsPlugin.
TEST_P(SecureEnd2endTest, BlockingAuthMetadataPluginFailure) {
  ResetStub();

  std::unique_ptr<MetadataCredentialsPlugin> plugin(
      new TestMetadataCredentialsPlugin(
          TestMetadataCredentialsPlugin::kGoodMetadataKey,
          "Does not matter, will fail anyway (see 3rd param)", true, false,
          0));
  ClientContext context;
  context.set_credentials(
      grpc::MetadataCredentialsFromPlugin(std::move(plugin)));

  EchoRequest request;
  request.set_message("Hello");
  EchoResponse response;

  const Status status = stub_->Echo(&context, request, &response);
  EXPECT_FALSE(status.ok());
  EXPECT_EQ(status.error_code(), StatusCode::UNAVAILABLE);
  EXPECT_EQ(status.error_message(),
            std::string("Getting metadata from plugin failed with error: ") +
                kTestCredsPluginErrorMsg);
  EXPECT_EQ(context.credentials()->DebugString(),
            kExpectedBlockingAuthMetadataPluginFailureCredsDebugString);
}
2156
// Combines two plugin-based call credentials, each contributing its own
// key/value pair, into one composite credential. Both pairs must show up in
// the server trailing metadata and the composite's debug string must match
// the expected one.
TEST_P(SecureEnd2endTest, CompositeCallCreds) {
  ResetStub();
  const char kMetadataKey1[] = "call-creds-key1";
  const char kMetadataKey2[] = "call-creds-key2";
  const char kMetadataVal1[] = "call-creds-val1";
  const char kMetadataVal2[] = "call-creds-val2";

  std::unique_ptr<MetadataCredentialsPlugin> first_plugin(
      new TestMetadataCredentialsPlugin(kMetadataKey1, kMetadataVal1, true,
                                        true, 0));
  std::unique_ptr<MetadataCredentialsPlugin> second_plugin(
      new TestMetadataCredentialsPlugin(kMetadataKey2, kMetadataVal2, true,
                                        true, 0));
  ClientContext context;
  context.set_credentials(grpc::CompositeCallCredentials(
      grpc::MetadataCredentialsFromPlugin(std::move(first_plugin)),
      grpc::MetadataCredentialsFromPlugin(std::move(second_plugin))));

  EchoRequest request;
  request.set_message("Hello");
  request.mutable_param()->set_echo_metadata(true);
  EchoResponse response;

  const Status status = stub_->Echo(&context, request, &response);
  EXPECT_TRUE(status.ok());
  EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(),
                               kMetadataKey1, kMetadataVal1));
  EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(),
                               kMetadataKey2, kMetadataVal2));
  EXPECT_EQ(context.credentials()->DebugString(),
            kExpectedCompositeCallCredsDebugString);
}
2188
// Verifies the auth context the client observes after an Echo:
//  - exactly one "transport_security_type" property, equal to the scenario's
//    credentials type;
//  - for TLS scenarios only, the peer identity property name and the four
//    expected subject-alternative-name entries, in order.
// The server-side auth context check is requested only for TLS scenarios.
TEST_P(SecureEnd2endTest, ClientAuthContext) {
  ResetStub();
  const bool is_tls = GetParam().credentials_type() == kTlsCredentialsType;

  EchoRequest request;
  EchoResponse response;
  request.set_message("Hello");
  request.mutable_param()->set_check_auth_context(is_tls);
  request.mutable_param()->set_expected_transport_security_type(
      GetParam().credentials_type());
  ClientContext context;
  Status s = stub_->Echo(&context, request, &response);
  EXPECT_EQ(response.message(), request.message());
  EXPECT_TRUE(s.ok());

  std::shared_ptr<const AuthContext> auth_ctx = context.auth_context();
  // Fatal on purpose: every check below dereferences auth_ctx.
  ASSERT_TRUE(auth_ctx != nullptr);
  std::vector<grpc::string_ref> tst =
      auth_ctx->FindPropertyValues("transport_security_type");
  ASSERT_EQ(1u, tst.size());
  EXPECT_EQ(GetParam().credentials_type(), ToString(tst[0]));
  if (is_tls) {
    EXPECT_EQ("x509_subject_alternative_name",
              auth_ctx->GetPeerIdentityPropertyName());
    // Fetch the identity list once instead of once per assertion.
    const std::vector<grpc::string_ref> peer_identity =
        auth_ctx->GetPeerIdentity();
    // Fatal on purpose: a non-fatal size check would let the indexed reads
    // below run out of bounds when fewer than four entries are returned.
    ASSERT_EQ(4u, peer_identity.size());
    EXPECT_EQ("*.test.google.fr", ToString(peer_identity[0]));
    EXPECT_EQ("waterzooi.test.google.be", ToString(peer_identity[1]));
    EXPECT_EQ("*.test.youtube.com", ToString(peer_identity[2]));
    EXPECT_EQ("192.168.1.3", ToString(peer_identity[3]));
  }
}
2219
2220 class ResourceQuotaEnd2endTest : public End2endTest {
2221 public:
ResourceQuotaEnd2endTest()2222 ResourceQuotaEnd2endTest()
2223 : server_resource_quota_("server_resource_quota") {}
2224
ConfigureServerBuilder(ServerBuilder * builder)2225 void ConfigureServerBuilder(ServerBuilder* builder) override {
2226 builder->SetResourceQuota(server_resource_quota_);
2227 }
2228
2229 private:
2230 ResourceQuota server_resource_quota_;
2231 };
2232
// A plain Echo must round-trip successfully when the server runs with the
// fixture's resource quota.
TEST_P(ResourceQuotaEnd2endTest, SimpleRequest) {
  ResetStub();

  EchoRequest request;
  request.set_message("Hello");
  EchoResponse response;
  ClientContext context;

  const Status status = stub_->Echo(&context, request, &response);
  EXPECT_EQ(response.message(), request.message());
  EXPECT_TRUE(status.ok());
}
2245
2246 // TODO(vjpai): refactor arguments into a struct if it makes sense
CreateTestScenarios(bool use_proxy,bool test_insecure,bool test_secure,bool test_inproc,bool test_callback_server)2247 std::vector<TestScenario> CreateTestScenarios(bool use_proxy,
2248 bool test_insecure,
2249 bool test_secure,
2250 bool test_inproc,
2251 bool test_callback_server) {
2252 std::vector<TestScenario> scenarios;
2253 std::vector<std::string> credentials_types;
2254
2255 grpc_core::ConfigVars::Overrides overrides;
2256 overrides.client_channel_backup_poll_interval_ms =
2257 kClientChannelBackupPollIntervalMs;
2258 grpc_core::ConfigVars::SetOverrides(overrides);
2259 #if TARGET_OS_IPHONE
2260 // Workaround Apple CFStream bug
2261 grpc_core::SetEnv("grpc_cfstream", "0");
2262 #endif
2263
2264 if (test_secure) {
2265 credentials_types =
2266 GetCredentialsProvider()->GetSecureCredentialsTypeList();
2267 }
2268 auto insec_ok = [] {
2269 // Only allow insecure credentials type when it is registered with the
2270 // provider. User may create providers that do not have insecure.
2271 return GetCredentialsProvider()->GetChannelCredentials(
2272 kInsecureCredentialsType, nullptr) != nullptr;
2273 };
2274 if (test_insecure && insec_ok()) {
2275 credentials_types.push_back(kInsecureCredentialsType);
2276 }
2277
2278 // Test callback with inproc or if the event-engine allows it
2279 CHECK(!credentials_types.empty());
2280 for (const auto& cred : credentials_types) {
2281 scenarios.emplace_back(false, false, false, cred, false);
2282 scenarios.emplace_back(true, false, false, cred, false);
2283 if (test_callback_server) {
2284 // Note that these scenarios will be dynamically disabled if the event
2285 // engine doesn't run in the background
2286 scenarios.emplace_back(false, false, false, cred, true);
2287 scenarios.emplace_back(true, false, false, cred, true);
2288 }
2289 if (use_proxy) {
2290 scenarios.emplace_back(false, true, false, cred, false);
2291 scenarios.emplace_back(true, true, false, cred, false);
2292 }
2293 }
2294 if (test_inproc && insec_ok()) {
2295 scenarios.emplace_back(false, false, true, kInsecureCredentialsType, false);
2296 scenarios.emplace_back(true, false, true, kInsecureCredentialsType, false);
2297 if (test_callback_server) {
2298 scenarios.emplace_back(false, false, true, kInsecureCredentialsType,
2299 true);
2300 scenarios.emplace_back(true, false, true, kInsecureCredentialsType, true);
2301 }
2302 }
2303 return scenarios;
2304 }
2305
2306 INSTANTIATE_TEST_SUITE_P(
2307 End2end, End2endTest,
2308 ::testing::ValuesIn(CreateTestScenarios(false, true, true, true, true)),
2309 &TestScenario::Name);
2310
2311 INSTANTIATE_TEST_SUITE_P(
2312 End2endServerTryCancel, End2endServerTryCancelTest,
2313 ::testing::ValuesIn(CreateTestScenarios(false, true, true, true, true)),
2314 &TestScenario::Name);
2315
2316 INSTANTIATE_TEST_SUITE_P(
2317 ProxyEnd2end, ProxyEnd2endTest,
2318 ::testing::ValuesIn(CreateTestScenarios(true, true, true, true, true)),
2319 &TestScenario::Name);
2320
2321 INSTANTIATE_TEST_SUITE_P(
2322 SecureEnd2end, SecureEnd2endTest,
2323 ::testing::ValuesIn(CreateTestScenarios(false, false, true, false, true)),
2324 &TestScenario::Name);
2325
2326 INSTANTIATE_TEST_SUITE_P(
2327 ResourceQuotaEnd2end, ResourceQuotaEnd2endTest,
2328 ::testing::ValuesIn(CreateTestScenarios(false, true, true, true, true)),
2329 &TestScenario::Name);
2330
2331 } // namespace
2332 } // namespace testing
2333 } // namespace grpc
2334
main(int argc,char ** argv)2335 int main(int argc, char** argv) {
2336 grpc::testing::TestEnvironment env(&argc, argv);
2337 ::testing::InitGoogleTest(&argc, argv);
2338 int ret = RUN_ALL_TESTS();
2339 return ret;
2340 }
2341