1 /*
2 *
3 * Copyright 2015 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <mutex>
20 #include <thread>
21
22 #include <grpc/grpc.h>
23 #include <grpc/support/alloc.h>
24 #include <grpc/support/log.h>
25 #include <grpc/support/time.h>
26 #include <grpcpp/channel.h>
27 #include <grpcpp/client_context.h>
28 #include <grpcpp/create_channel.h>
29 #include <grpcpp/resource_quota.h>
30 #include <grpcpp/security/auth_metadata_processor.h>
31 #include <grpcpp/security/credentials.h>
32 #include <grpcpp/security/server_credentials.h>
33 #include <grpcpp/server.h>
34 #include <grpcpp/server_builder.h>
35 #include <grpcpp/server_context.h>
36
37 #include "src/core/lib/gpr/env.h"
38 #include "src/core/lib/security/credentials/credentials.h"
39 #include "src/proto/grpc/testing/duplicate/echo_duplicate.grpc.pb.h"
40 #include "src/proto/grpc/testing/echo.grpc.pb.h"
41 #include "test/core/util/port.h"
42 #include "test/core/util/test_config.h"
43 #include "test/cpp/end2end/test_service_impl.h"
44 #include "test/cpp/util/string_ref_helper.h"
45 #include "test/cpp/util/test_credentials_provider.h"
46
47 #include <gtest/gtest.h>
48
49 using grpc::testing::EchoRequest;
50 using grpc::testing::EchoResponse;
51 using grpc::testing::kTlsCredentialsType;
52 using std::chrono::system_clock;
53
54 namespace grpc {
55 namespace testing {
56 namespace {
57
CheckIsLocalhost(const grpc::string & addr)58 bool CheckIsLocalhost(const grpc::string& addr) {
59 const grpc::string kIpv6("ipv6:[::1]:");
60 const grpc::string kIpv4MappedIpv6("ipv6:[::ffff:127.0.0.1]:");
61 const grpc::string kIpv4("ipv4:127.0.0.1:");
62 return addr.substr(0, kIpv4.size()) == kIpv4 ||
63 addr.substr(0, kIpv4MappedIpv6.size()) == kIpv4MappedIpv6 ||
64 addr.substr(0, kIpv6.size()) == kIpv6;
65 }
66
67 const char kTestCredsPluginErrorMsg[] = "Could not find plugin metadata.";
68
69 class TestMetadataCredentialsPlugin : public MetadataCredentialsPlugin {
70 public:
71 static const char kGoodMetadataKey[];
72 static const char kBadMetadataKey[];
73
TestMetadataCredentialsPlugin(const grpc::string_ref & metadata_key,const grpc::string_ref & metadata_value,bool is_blocking,bool is_successful)74 TestMetadataCredentialsPlugin(const grpc::string_ref& metadata_key,
75 const grpc::string_ref& metadata_value,
76 bool is_blocking, bool is_successful)
77 : metadata_key_(metadata_key.data(), metadata_key.length()),
78 metadata_value_(metadata_value.data(), metadata_value.length()),
79 is_blocking_(is_blocking),
80 is_successful_(is_successful) {}
81
IsBlocking() const82 bool IsBlocking() const override { return is_blocking_; }
83
GetMetadata(grpc::string_ref service_url,grpc::string_ref method_name,const grpc::AuthContext & channel_auth_context,std::multimap<grpc::string,grpc::string> * metadata)84 Status GetMetadata(
85 grpc::string_ref service_url, grpc::string_ref method_name,
86 const grpc::AuthContext& channel_auth_context,
87 std::multimap<grpc::string, grpc::string>* metadata) override {
88 EXPECT_GT(service_url.length(), 0UL);
89 EXPECT_GT(method_name.length(), 0UL);
90 EXPECT_TRUE(channel_auth_context.IsPeerAuthenticated());
91 EXPECT_TRUE(metadata != nullptr);
92 if (is_successful_) {
93 metadata->insert(std::make_pair(metadata_key_, metadata_value_));
94 return Status::OK;
95 } else {
96 return Status(StatusCode::NOT_FOUND, kTestCredsPluginErrorMsg);
97 }
98 }
99
100 private:
101 grpc::string metadata_key_;
102 grpc::string metadata_value_;
103 bool is_blocking_;
104 bool is_successful_;
105 };
106
107 const char TestMetadataCredentialsPlugin::kBadMetadataKey[] =
108 "TestPluginMetadata";
109 const char TestMetadataCredentialsPlugin::kGoodMetadataKey[] =
110 "test-plugin-metadata";
111
112 class TestAuthMetadataProcessor : public AuthMetadataProcessor {
113 public:
114 static const char kGoodGuy[];
115
TestAuthMetadataProcessor(bool is_blocking)116 TestAuthMetadataProcessor(bool is_blocking) : is_blocking_(is_blocking) {}
117
GetCompatibleClientCreds()118 std::shared_ptr<CallCredentials> GetCompatibleClientCreds() {
119 return MetadataCredentialsFromPlugin(
120 std::unique_ptr<MetadataCredentialsPlugin>(
121 new TestMetadataCredentialsPlugin(
122 TestMetadataCredentialsPlugin::kGoodMetadataKey, kGoodGuy,
123 is_blocking_, true)));
124 }
125
GetIncompatibleClientCreds()126 std::shared_ptr<CallCredentials> GetIncompatibleClientCreds() {
127 return MetadataCredentialsFromPlugin(
128 std::unique_ptr<MetadataCredentialsPlugin>(
129 new TestMetadataCredentialsPlugin(
130 TestMetadataCredentialsPlugin::kGoodMetadataKey, "Mr Hyde",
131 is_blocking_, true)));
132 }
133
134 // Interface implementation
IsBlocking() const135 bool IsBlocking() const override { return is_blocking_; }
136
Process(const InputMetadata & auth_metadata,AuthContext * context,OutputMetadata * consumed_auth_metadata,OutputMetadata * response_metadata)137 Status Process(const InputMetadata& auth_metadata, AuthContext* context,
138 OutputMetadata* consumed_auth_metadata,
139 OutputMetadata* response_metadata) override {
140 EXPECT_TRUE(consumed_auth_metadata != nullptr);
141 EXPECT_TRUE(context != nullptr);
142 EXPECT_TRUE(response_metadata != nullptr);
143 auto auth_md =
144 auth_metadata.find(TestMetadataCredentialsPlugin::kGoodMetadataKey);
145 EXPECT_NE(auth_md, auth_metadata.end());
146 string_ref auth_md_value = auth_md->second;
147 if (auth_md_value == kGoodGuy) {
148 context->AddProperty(kIdentityPropName, kGoodGuy);
149 context->SetPeerIdentityPropertyName(kIdentityPropName);
150 consumed_auth_metadata->insert(std::make_pair(
151 string(auth_md->first.data(), auth_md->first.length()),
152 string(auth_md->second.data(), auth_md->second.length())));
153 return Status::OK;
154 } else {
155 return Status(StatusCode::UNAUTHENTICATED,
156 string("Invalid principal: ") +
157 string(auth_md_value.data(), auth_md_value.length()));
158 }
159 }
160
161 private:
162 static const char kIdentityPropName[];
163 bool is_blocking_;
164 };
165
166 const char TestAuthMetadataProcessor::kGoodGuy[] = "Dr Jekyll";
167 const char TestAuthMetadataProcessor::kIdentityPropName[] = "novel identity";
168
169 class Proxy : public ::grpc::testing::EchoTestService::Service {
170 public:
Proxy(const std::shared_ptr<Channel> & channel)171 Proxy(const std::shared_ptr<Channel>& channel)
172 : stub_(grpc::testing::EchoTestService::NewStub(channel)) {}
173
Echo(ServerContext * server_context,const EchoRequest * request,EchoResponse * response)174 Status Echo(ServerContext* server_context, const EchoRequest* request,
175 EchoResponse* response) override {
176 std::unique_ptr<ClientContext> client_context =
177 ClientContext::FromServerContext(*server_context);
178 return stub_->Echo(client_context.get(), *request, response);
179 }
180
181 private:
182 std::unique_ptr< ::grpc::testing::EchoTestService::Stub> stub_;
183 };
184
185 class TestServiceImplDupPkg
186 : public ::grpc::testing::duplicate::EchoTestService::Service {
187 public:
Echo(ServerContext * context,const EchoRequest * request,EchoResponse * response)188 Status Echo(ServerContext* context, const EchoRequest* request,
189 EchoResponse* response) override {
190 response->set_message("no package");
191 return Status::OK;
192 }
193 };
194
195 class TestScenario {
196 public:
TestScenario(bool proxy,bool inproc_stub,const grpc::string & creds_type)197 TestScenario(bool proxy, bool inproc_stub, const grpc::string& creds_type)
198 : use_proxy(proxy), inproc(inproc_stub), credentials_type(creds_type) {}
199 void Log() const;
200 bool use_proxy;
201 bool inproc;
202 const grpc::string credentials_type;
203 };
204
operator <<(std::ostream & out,const TestScenario & scenario)205 static std::ostream& operator<<(std::ostream& out,
206 const TestScenario& scenario) {
207 return out << "TestScenario{use_proxy="
208 << (scenario.use_proxy ? "true" : "false")
209 << ", inproc=" << (scenario.inproc ? "true" : "false")
210 << ", credentials='" << scenario.credentials_type << "'}";
211 }
212
Log() const213 void TestScenario::Log() const {
214 std::ostringstream out;
215 out << *this;
216 gpr_log(GPR_DEBUG, "%s", out.str().c_str());
217 }
218
219 class End2endTest : public ::testing::TestWithParam<TestScenario> {
220 protected:
End2endTest()221 End2endTest()
222 : is_server_started_(false),
223 kMaxMessageSize_(8192),
224 special_service_("special"),
225 first_picked_port_(0) {
226 GetParam().Log();
227 }
228
TearDown()229 void TearDown() override {
230 if (is_server_started_) {
231 server_->Shutdown();
232 if (proxy_server_) proxy_server_->Shutdown();
233 }
234 if (first_picked_port_ > 0) {
235 grpc_recycle_unused_port(first_picked_port_);
236 }
237 }
238
StartServer(const std::shared_ptr<AuthMetadataProcessor> & processor)239 void StartServer(const std::shared_ptr<AuthMetadataProcessor>& processor) {
240 int port = grpc_pick_unused_port_or_die();
241 first_picked_port_ = port;
242 server_address_ << "127.0.0.1:" << port;
243 // Setup server
244 BuildAndStartServer(processor);
245 }
246
RestartServer(const std::shared_ptr<AuthMetadataProcessor> & processor)247 void RestartServer(const std::shared_ptr<AuthMetadataProcessor>& processor) {
248 if (is_server_started_) {
249 server_->Shutdown();
250 BuildAndStartServer(processor);
251 }
252 }
253
BuildAndStartServer(const std::shared_ptr<AuthMetadataProcessor> & processor)254 void BuildAndStartServer(
255 const std::shared_ptr<AuthMetadataProcessor>& processor) {
256 ServerBuilder builder;
257 ConfigureServerBuilder(&builder);
258 auto server_creds = GetCredentialsProvider()->GetServerCredentials(
259 GetParam().credentials_type);
260 if (GetParam().credentials_type != kInsecureCredentialsType) {
261 server_creds->SetAuthMetadataProcessor(processor);
262 }
263 builder.AddListeningPort(server_address_.str(), server_creds);
264 builder.RegisterService(&service_);
265 builder.RegisterService("foo.test.youtube.com", &special_service_);
266 builder.RegisterService(&dup_pkg_service_);
267
268 builder.SetSyncServerOption(ServerBuilder::SyncServerOption::NUM_CQS, 4);
269 builder.SetSyncServerOption(
270 ServerBuilder::SyncServerOption::CQ_TIMEOUT_MSEC, 10);
271
272 server_ = builder.BuildAndStart();
273 is_server_started_ = true;
274 }
275
ConfigureServerBuilder(ServerBuilder * builder)276 virtual void ConfigureServerBuilder(ServerBuilder* builder) {
277 builder->SetMaxMessageSize(
278 kMaxMessageSize_); // For testing max message size.
279 }
280
ResetChannel()281 void ResetChannel() {
282 if (!is_server_started_) {
283 StartServer(std::shared_ptr<AuthMetadataProcessor>());
284 }
285 EXPECT_TRUE(is_server_started_);
286 ChannelArguments args;
287 auto channel_creds = GetCredentialsProvider()->GetChannelCredentials(
288 GetParam().credentials_type, &args);
289 if (!user_agent_prefix_.empty()) {
290 args.SetUserAgentPrefix(user_agent_prefix_);
291 }
292 args.SetString(GRPC_ARG_SECONDARY_USER_AGENT_STRING, "end2end_test");
293
294 if (!GetParam().inproc) {
295 channel_ =
296 CreateCustomChannel(server_address_.str(), channel_creds, args);
297 } else {
298 channel_ = server_->InProcessChannel(args);
299 }
300 }
301
ResetStub()302 void ResetStub() {
303 ResetChannel();
304 if (GetParam().use_proxy) {
305 proxy_service_.reset(new Proxy(channel_));
306 int port = grpc_pick_unused_port_or_die();
307 std::ostringstream proxyaddr;
308 proxyaddr << "localhost:" << port;
309 ServerBuilder builder;
310 builder.AddListeningPort(proxyaddr.str(), InsecureServerCredentials());
311 builder.RegisterService(proxy_service_.get());
312
313 builder.SetSyncServerOption(ServerBuilder::SyncServerOption::NUM_CQS, 4);
314 builder.SetSyncServerOption(
315 ServerBuilder::SyncServerOption::CQ_TIMEOUT_MSEC, 10);
316
317 proxy_server_ = builder.BuildAndStart();
318
319 channel_ = CreateChannel(proxyaddr.str(), InsecureChannelCredentials());
320 }
321
322 stub_ = grpc::testing::EchoTestService::NewStub(channel_);
323 }
324
325 bool is_server_started_;
326 std::shared_ptr<Channel> channel_;
327 std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
328 std::unique_ptr<Server> server_;
329 std::unique_ptr<Server> proxy_server_;
330 std::unique_ptr<Proxy> proxy_service_;
331 std::ostringstream server_address_;
332 const int kMaxMessageSize_;
333 TestServiceImpl service_;
334 TestServiceImpl special_service_;
335 TestServiceImplDupPkg dup_pkg_service_;
336 grpc::string user_agent_prefix_;
337 int first_picked_port_;
338 };
339
SendRpc(grpc::testing::EchoTestService::Stub * stub,int num_rpcs,bool with_binary_metadata)340 static void SendRpc(grpc::testing::EchoTestService::Stub* stub, int num_rpcs,
341 bool with_binary_metadata) {
342 EchoRequest request;
343 EchoResponse response;
344 request.set_message("Hello hello hello hello");
345
346 for (int i = 0; i < num_rpcs; ++i) {
347 ClientContext context;
348 if (with_binary_metadata) {
349 char bytes[8] = {'\0', '\1', '\2', '\3',
350 '\4', '\5', '\6', static_cast<char>(i)};
351 context.AddMetadata("custom-bin", grpc::string(bytes, 8));
352 }
353 context.set_compression_algorithm(GRPC_COMPRESS_GZIP);
354 Status s = stub->Echo(&context, request, &response);
355 EXPECT_EQ(response.message(), request.message());
356 EXPECT_TRUE(s.ok());
357 }
358 }
359
360 // This class is for testing scenarios where RPCs are cancelled on the server
361 // by calling ServerContext::TryCancel()
362 class End2endServerTryCancelTest : public End2endTest {
363 protected:
364 // Helper for testing client-streaming RPCs which are cancelled on the server.
365 // Depending on the value of server_try_cancel parameter, this will test one
366 // of the following three scenarios:
367 // CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before reading
368 // any messages from the client
369 //
370 // CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while reading
371 // messages from the client
372 //
373 // CANCEL_AFTER PROCESSING: Rpc is cancelled by server after reading all
374 // the messages from the client
375 //
376 // NOTE: Do not call this function with server_try_cancel == DO_NOT_CANCEL.
TestRequestStreamServerCancel(ServerTryCancelRequestPhase server_try_cancel,int num_msgs_to_send)377 void TestRequestStreamServerCancel(
378 ServerTryCancelRequestPhase server_try_cancel, int num_msgs_to_send) {
379 ResetStub();
380 EchoRequest request;
381 EchoResponse response;
382 ClientContext context;
383
384 // Send server_try_cancel value in the client metadata
385 context.AddMetadata(kServerTryCancelRequest,
386 grpc::to_string(server_try_cancel));
387
388 auto stream = stub_->RequestStream(&context, &response);
389
390 int num_msgs_sent = 0;
391 while (num_msgs_sent < num_msgs_to_send) {
392 request.set_message("hello");
393 if (!stream->Write(request)) {
394 break;
395 }
396 num_msgs_sent++;
397 }
398 gpr_log(GPR_INFO, "Sent %d messages", num_msgs_sent);
399
400 stream->WritesDone();
401 Status s = stream->Finish();
402
403 // At this point, we know for sure that RPC was cancelled by the server
404 // since we passed server_try_cancel value in the metadata. Depending on the
405 // value of server_try_cancel, the RPC might have been cancelled by the
406 // server at different stages. The following validates our expectations of
407 // number of messages sent in various cancellation scenarios:
408
409 switch (server_try_cancel) {
410 case CANCEL_BEFORE_PROCESSING:
411 case CANCEL_DURING_PROCESSING:
412 // If the RPC is cancelled by server before / during messages from the
413 // client, it means that the client most likely did not get a chance to
414 // send all the messages it wanted to send. i.e num_msgs_sent <=
415 // num_msgs_to_send
416 EXPECT_LE(num_msgs_sent, num_msgs_to_send);
417 break;
418
419 case CANCEL_AFTER_PROCESSING:
420 // If the RPC was cancelled after all messages were read by the server,
421 // the client did get a chance to send all its messages
422 EXPECT_EQ(num_msgs_sent, num_msgs_to_send);
423 break;
424
425 default:
426 gpr_log(GPR_ERROR, "Invalid server_try_cancel value: %d",
427 server_try_cancel);
428 EXPECT_TRUE(server_try_cancel > DO_NOT_CANCEL &&
429 server_try_cancel <= CANCEL_AFTER_PROCESSING);
430 break;
431 }
432
433 EXPECT_FALSE(s.ok());
434 EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
435 }
436
437 // Helper for testing server-streaming RPCs which are cancelled on the server.
438 // Depending on the value of server_try_cancel parameter, this will test one
439 // of the following three scenarios:
440 // CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before writing
441 // any messages to the client
442 //
443 // CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while writing
444 // messages to the client
445 //
446 // CANCEL_AFTER PROCESSING: Rpc is cancelled by server after writing all
447 // the messages to the client
448 //
449 // NOTE: Do not call this function with server_try_cancel == DO_NOT_CANCEL.
TestResponseStreamServerCancel(ServerTryCancelRequestPhase server_try_cancel)450 void TestResponseStreamServerCancel(
451 ServerTryCancelRequestPhase server_try_cancel) {
452 ResetStub();
453 EchoRequest request;
454 EchoResponse response;
455 ClientContext context;
456
457 // Send server_try_cancel in the client metadata
458 context.AddMetadata(kServerTryCancelRequest,
459 grpc::to_string(server_try_cancel));
460
461 request.set_message("hello");
462 auto stream = stub_->ResponseStream(&context, request);
463
464 int num_msgs_read = 0;
465 while (num_msgs_read < kServerDefaultResponseStreamsToSend) {
466 if (!stream->Read(&response)) {
467 break;
468 }
469 EXPECT_EQ(response.message(),
470 request.message() + grpc::to_string(num_msgs_read));
471 num_msgs_read++;
472 }
473 gpr_log(GPR_INFO, "Read %d messages", num_msgs_read);
474
475 Status s = stream->Finish();
476
477 // Depending on the value of server_try_cancel, the RPC might have been
478 // cancelled by the server at different stages. The following validates our
479 // expectations of number of messages read in various cancellation
480 // scenarios:
481 switch (server_try_cancel) {
482 case CANCEL_BEFORE_PROCESSING:
483 // Server cancelled before sending any messages. Which means the client
484 // wouldn't have read any
485 EXPECT_EQ(num_msgs_read, 0);
486 break;
487
488 case CANCEL_DURING_PROCESSING:
489 // Server cancelled while writing messages. Client must have read less
490 // than or equal to the expected number of messages
491 EXPECT_LE(num_msgs_read, kServerDefaultResponseStreamsToSend);
492 break;
493
494 case CANCEL_AFTER_PROCESSING:
495 // Even though the Server cancelled after writing all messages, the RPC
496 // may be cancelled before the Client got a chance to read all the
497 // messages.
498 EXPECT_LE(num_msgs_read, kServerDefaultResponseStreamsToSend);
499 break;
500
501 default: {
502 gpr_log(GPR_ERROR, "Invalid server_try_cancel value: %d",
503 server_try_cancel);
504 EXPECT_TRUE(server_try_cancel > DO_NOT_CANCEL &&
505 server_try_cancel <= CANCEL_AFTER_PROCESSING);
506 break;
507 }
508 }
509
510 EXPECT_FALSE(s.ok());
511 EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
512 }
513
514 // Helper for testing bidirectional-streaming RPCs which are cancelled on the
515 // server. Depending on the value of server_try_cancel parameter, this will
516 // test one of the following three scenarios:
517 // CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before reading/
518 // writing any messages from/to the client
519 //
520 // CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while reading/
521 // writing messages from/to the client
522 //
523 // CANCEL_AFTER PROCESSING: Rpc is cancelled by server after reading/writing
524 // all the messages from/to the client
525 //
526 // NOTE: Do not call this function with server_try_cancel == DO_NOT_CANCEL.
TestBidiStreamServerCancel(ServerTryCancelRequestPhase server_try_cancel,int num_messages)527 void TestBidiStreamServerCancel(ServerTryCancelRequestPhase server_try_cancel,
528 int num_messages) {
529 ResetStub();
530 EchoRequest request;
531 EchoResponse response;
532 ClientContext context;
533
534 // Send server_try_cancel in the client metadata
535 context.AddMetadata(kServerTryCancelRequest,
536 grpc::to_string(server_try_cancel));
537
538 auto stream = stub_->BidiStream(&context);
539
540 int num_msgs_read = 0;
541 int num_msgs_sent = 0;
542 while (num_msgs_sent < num_messages) {
543 request.set_message("hello " + grpc::to_string(num_msgs_sent));
544 if (!stream->Write(request)) {
545 break;
546 }
547 num_msgs_sent++;
548
549 if (!stream->Read(&response)) {
550 break;
551 }
552 num_msgs_read++;
553
554 EXPECT_EQ(response.message(), request.message());
555 }
556 gpr_log(GPR_INFO, "Sent %d messages", num_msgs_sent);
557 gpr_log(GPR_INFO, "Read %d messages", num_msgs_read);
558
559 stream->WritesDone();
560 Status s = stream->Finish();
561
562 // Depending on the value of server_try_cancel, the RPC might have been
563 // cancelled by the server at different stages. The following validates our
564 // expectations of number of messages read in various cancellation
565 // scenarios:
566 switch (server_try_cancel) {
567 case CANCEL_BEFORE_PROCESSING:
568 EXPECT_EQ(num_msgs_read, 0);
569 break;
570
571 case CANCEL_DURING_PROCESSING:
572 EXPECT_LE(num_msgs_sent, num_messages);
573 EXPECT_LE(num_msgs_read, num_msgs_sent);
574 break;
575
576 case CANCEL_AFTER_PROCESSING:
577 EXPECT_EQ(num_msgs_sent, num_messages);
578
579 // The Server cancelled after reading the last message and after writing
580 // the message to the client. However, the RPC cancellation might have
581 // taken effect before the client actually read the response.
582 EXPECT_LE(num_msgs_read, num_msgs_sent);
583 break;
584
585 default:
586 gpr_log(GPR_ERROR, "Invalid server_try_cancel value: %d",
587 server_try_cancel);
588 EXPECT_TRUE(server_try_cancel > DO_NOT_CANCEL &&
589 server_try_cancel <= CANCEL_AFTER_PROCESSING);
590 break;
591 }
592
593 EXPECT_FALSE(s.ok());
594 EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
595 }
596 };
597
// A unary Echo asked to cancel before processing must finish as CANCELLED.
TEST_P(End2endServerTryCancelTest, RequestEchoServerCancel) {
  ResetStub();
  EchoRequest request;
  EchoResponse response;
  ClientContext context;

  const auto cancel_phase = grpc::to_string(CANCEL_BEFORE_PROCESSING);
  context.AddMetadata(kServerTryCancelRequest, cancel_phase);
  const Status status = stub_->Echo(&context, request, &response);
  EXPECT_FALSE(status.ok());
  EXPECT_EQ(grpc::StatusCode::CANCELLED, status.error_code());
}
610
611 // Server to cancel before doing reading the request
TEST_P(End2endServerTryCancelTest,RequestStreamServerCancelBeforeReads)612 TEST_P(End2endServerTryCancelTest, RequestStreamServerCancelBeforeReads) {
613 TestRequestStreamServerCancel(CANCEL_BEFORE_PROCESSING, 1);
614 }
615
616 // Server to cancel while reading a request from the stream in parallel
TEST_P(End2endServerTryCancelTest,RequestStreamServerCancelDuringRead)617 TEST_P(End2endServerTryCancelTest, RequestStreamServerCancelDuringRead) {
618 TestRequestStreamServerCancel(CANCEL_DURING_PROCESSING, 10);
619 }
620
621 // Server to cancel after reading all the requests but before returning to the
622 // client
TEST_P(End2endServerTryCancelTest,RequestStreamServerCancelAfterReads)623 TEST_P(End2endServerTryCancelTest, RequestStreamServerCancelAfterReads) {
624 TestRequestStreamServerCancel(CANCEL_AFTER_PROCESSING, 4);
625 }
626
627 // Server to cancel before sending any response messages
TEST_P(End2endServerTryCancelTest,ResponseStreamServerCancelBefore)628 TEST_P(End2endServerTryCancelTest, ResponseStreamServerCancelBefore) {
629 TestResponseStreamServerCancel(CANCEL_BEFORE_PROCESSING);
630 }
631
632 // Server to cancel while writing a response to the stream in parallel
TEST_P(End2endServerTryCancelTest,ResponseStreamServerCancelDuring)633 TEST_P(End2endServerTryCancelTest, ResponseStreamServerCancelDuring) {
634 TestResponseStreamServerCancel(CANCEL_DURING_PROCESSING);
635 }
636
637 // Server to cancel after writing all the respones to the stream but before
638 // returning to the client
TEST_P(End2endServerTryCancelTest,ResponseStreamServerCancelAfter)639 TEST_P(End2endServerTryCancelTest, ResponseStreamServerCancelAfter) {
640 TestResponseStreamServerCancel(CANCEL_AFTER_PROCESSING);
641 }
642
643 // Server to cancel before reading/writing any requests/responses on the stream
TEST_P(End2endServerTryCancelTest,BidiStreamServerCancelBefore)644 TEST_P(End2endServerTryCancelTest, BidiStreamServerCancelBefore) {
645 TestBidiStreamServerCancel(CANCEL_BEFORE_PROCESSING, 2);
646 }
647
648 // Server to cancel while reading/writing requests/responses on the stream in
649 // parallel
TEST_P(End2endServerTryCancelTest,BidiStreamServerCancelDuring)650 TEST_P(End2endServerTryCancelTest, BidiStreamServerCancelDuring) {
651 TestBidiStreamServerCancel(CANCEL_DURING_PROCESSING, 10);
652 }
653
654 // Server to cancel after reading/writing all requests/responses on the stream
655 // but before returning to the client
TEST_P(End2endServerTryCancelTest,BidiStreamServerCancelAfter)656 TEST_P(End2endServerTryCancelTest, BidiStreamServerCancelAfter) {
657 TestBidiStreamServerCancel(CANCEL_AFTER_PROCESSING, 5);
658 }
659
// Sets a custom user-agent prefix on the channel, asks the server to echo the
// client metadata, and checks that the echoed "user-agent" trailing metadata
// starts with "<prefix> grpc-c++/".
TEST_P(End2endTest, SimpleRpcWithCustomUserAgentPrefix) {
  // User-Agent is an HTTP header for HTTP transports only
  if (GetParam().inproc) {
    return;
  }
  user_agent_prefix_ = "custom_prefix";
  ResetStub();
  EchoRequest request;
  EchoResponse response;
  request.set_message("Hello hello hello hello");
  request.mutable_param()->set_echo_metadata(true);

  ClientContext context;
  Status s = stub_->Echo(&context, request, &response);
  EXPECT_EQ(response.message(), request.message());
  EXPECT_TRUE(s.ok());
  const auto& trailing_metadata = context.GetServerTrailingMetadata();
  auto iter = trailing_metadata.find("user-agent");
  // Fatal on purpose: |iter| is dereferenced below, and dereferencing end()
  // would be undefined behaviour rather than a clean test failure.
  ASSERT_TRUE(iter != trailing_metadata.end());
  grpc::string expected_prefix = user_agent_prefix_ + " grpc-c++/";
  EXPECT_TRUE(iter->second.starts_with(expected_prefix)) << iter->second;
}
682
// Ten threads share one stub; each sends ten RPCs carrying binary metadata
// whose value varies per call.
TEST_P(End2endTest, MultipleRpcsWithVariedBinaryMetadataValue) {
  ResetStub();
  const int kNumThreads = 10;
  const int kRpcsPerThread = 10;
  std::vector<std::thread> workers;
  workers.reserve(kNumThreads);
  for (int t = 0; t < kNumThreads; ++t) {
    workers.emplace_back(SendRpc, stub_.get(), kRpcsPerThread, true);
  }
  for (auto& worker : workers) {
    worker.join();
  }
}
694
// Ten threads share one stub; each sends ten plain RPCs.
TEST_P(End2endTest, MultipleRpcs) {
  ResetStub();
  const int kNumThreads = 10;
  const int kRpcsPerThread = 10;
  std::vector<std::thread> workers;
  workers.reserve(kNumThreads);
  for (int t = 0; t < kNumThreads; ++t) {
    workers.emplace_back(SendRpc, stub_.get(), kRpcsPerThread, false);
  }
  for (auto& worker : workers) {
    worker.join();
  }
}
706
// Sends an RPC, restarts the server, waits for the channel to reconnect, and
// sends another RPC over the same stub. Skipped for in-process channels.
TEST_P(End2endTest, ReconnectChannel) {
  if (GetParam().inproc) {
    return;
  }
  // It needs 2 pollset_works to reconnect the channel with polling engine
  // "poll", so the wait below is doubled in that case.
  char* poll_strategy = gpr_getenv("GRPC_POLL_STRATEGY");
  const bool uses_poll_engine =
      poll_strategy != nullptr && strcmp(poll_strategy, "poll") == 0;
  gpr_free(poll_strategy);
  const int poller_slowdown_factor = uses_poll_engine ? 2 : 1;

  ResetStub();
  SendRpc(stub_.get(), 1, false);
  RestartServer(std::shared_ptr<AuthMetadataProcessor>());
  // It needs more than GRPC_CLIENT_CHANNEL_BACKUP_POLL_INTERVAL_MS time to
  // reconnect the channel.
  const auto reconnect_wait = gpr_time_from_millis(
      300 * poller_slowdown_factor * grpc_test_slowdown_factor(),
      GPR_TIMESPAN);
  gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), reconnect_wait));
  SendRpc(stub_.get(), 1, false);
}
731
// Client-streams a single message and expects it echoed back with an OK
// status and no debug error string on the context.
TEST_P(End2endTest, RequestStreamOneRequest) {
  ResetStub();
  EchoRequest request;
  EchoResponse response;
  ClientContext context;

  auto writer = stub_->RequestStream(&context, &response);
  request.set_message("hello");
  const bool write_ok = writer->Write(request);
  EXPECT_TRUE(write_ok);
  writer->WritesDone();
  const Status status = writer->Finish();
  EXPECT_EQ(response.message(), request.message());
  EXPECT_TRUE(status.ok());
  EXPECT_TRUE(context.debug_error_string().empty());
}
747
// Same as RequestStreamOneRequest, but with initial metadata corked and the
// only message sent through WriteLast().
TEST_P(End2endTest, RequestStreamOneRequestWithCoalescingApi) {
  ResetStub();
  EchoRequest request;
  EchoResponse response;
  ClientContext context;

  context.set_initial_metadata_corked(true);
  auto writer = stub_->RequestStream(&context, &response);
  request.set_message("hello");
  const WriteOptions default_options;
  writer->WriteLast(request, default_options);
  const Status status = writer->Finish();
  EXPECT_EQ(response.message(), request.message());
  EXPECT_TRUE(status.ok());
}
762
// Client-streams the same message twice and expects the concatenation back.
TEST_P(End2endTest, RequestStreamTwoRequests) {
  ResetStub();
  EchoRequest request;
  EchoResponse response;
  ClientContext context;

  auto writer = stub_->RequestStream(&context, &response);
  request.set_message("hello");
  for (int write_index = 0; write_index < 2; ++write_index) {
    EXPECT_TRUE(writer->Write(request));
  }
  writer->WritesDone();
  const Status status = writer->Finish();
  EXPECT_EQ(response.message(), "hellohello");
  EXPECT_TRUE(status.ok());
}
778
// Same as RequestStreamTwoRequests, but each write uses the write-through
// option.
TEST_P(End2endTest, RequestStreamTwoRequestsWithWriteThrough) {
  ResetStub();
  EchoRequest request;
  EchoResponse response;
  ClientContext context;

  auto writer = stub_->RequestStream(&context, &response);
  request.set_message("hello");
  for (int write_index = 0; write_index < 2; ++write_index) {
    EXPECT_TRUE(writer->Write(request, WriteOptions().set_write_through()));
  }
  writer->WritesDone();
  const Status status = writer->Finish();
  EXPECT_EQ(response.message(), "hellohello");
  EXPECT_TRUE(status.ok());
}
794
// Same as RequestStreamTwoRequests, but with initial metadata corked and the
// second message sent through WriteLast().
TEST_P(End2endTest, RequestStreamTwoRequestsWithCoalescingApi) {
  ResetStub();
  EchoRequest request;
  EchoResponse response;
  ClientContext context;

  context.set_initial_metadata_corked(true);
  auto writer = stub_->RequestStream(&context, &response);
  request.set_message("hello");
  const bool first_write_ok = writer->Write(request);
  EXPECT_TRUE(first_write_ok);
  const WriteOptions default_options;
  writer->WriteLast(request, default_options);
  const Status status = writer->Finish();
  EXPECT_EQ(response.message(), "hellohello");
  EXPECT_TRUE(status.ok());
}
810
// Reads the default number of streamed responses, each expected to be the
// request message followed by its index, then expects end-of-stream and OK.
TEST_P(End2endTest, ResponseStream) {
  ResetStub();
  EchoRequest request;
  EchoResponse response;
  ClientContext context;
  request.set_message("hello");

  auto reader = stub_->ResponseStream(&context, request);
  int index = 0;
  while (index < kServerDefaultResponseStreamsToSend) {
    EXPECT_TRUE(reader->Read(&response));
    EXPECT_EQ(response.message(), request.message() + grpc::to_string(index));
    ++index;
  }
  EXPECT_FALSE(reader->Read(&response));

  const Status status = reader->Finish();
  EXPECT_TRUE(status.ok());
}
828
// Same as ResponseStream, but asks the server (through the
// kServerUseCoalescingApi metadata entry) to use its coalescing API.
TEST_P(End2endTest, ResponseStreamWithCoalescingApi) {
  ResetStub();
  EchoRequest request;
  EchoResponse response;
  ClientContext context;
  request.set_message("hello");
  context.AddMetadata(kServerUseCoalescingApi, "1");

  auto reader = stub_->ResponseStream(&context, request);
  int index = 0;
  while (index < kServerDefaultResponseStreamsToSend) {
    EXPECT_TRUE(reader->Read(&response));
    EXPECT_EQ(response.message(), request.message() + grpc::to_string(index));
    ++index;
  }
  EXPECT_FALSE(reader->Read(&response));

  const Status status = reader->Finish();
  EXPECT_TRUE(status.ok());
}
847
848 // This was added to prevent regression from issue:
849 // https://github.com/grpc/grpc/issues/11546
TEST_P(End2endTest, ResponseStreamWithEverythingCoalesced) {
  ResetStub();
  EchoRequest req;
  EchoResponse resp;
  ClientContext ctx;
  req.set_message("hello");
  ctx.AddMetadata(kServerUseCoalescingApi, "1");
  // Asking for a single response forces the initial metadata, the message and
  // the trailing metadata to be coalesced together.
  ctx.AddMetadata(kServerResponseStreamsToSend, "1");

  auto reader = stub_->ResponseStream(&ctx, req);
  // Exactly one response is expected ...
  EXPECT_TRUE(reader->Read(&resp));
  EXPECT_EQ(resp.message(), req.message() + "0");

  // ... and the following read must report end of stream.
  EXPECT_FALSE(reader->Read(&resp));

  const Status status = reader->Finish();
  EXPECT_TRUE(status.ok());
}
870
// Bidirectional ping-pong: every written message is expected to be echoed
// back before the next one is sent.
TEST_P(End2endTest, BidiStream) {
  ResetStub();
  EchoRequest req;
  EchoResponse resp;
  ClientContext ctx;
  const grpc::string prefix("hello");

  auto rw = stub_->BidiStream(&ctx);

  int round = 0;
  while (round < kServerDefaultResponseStreamsToSend) {
    req.set_message(prefix + grpc::to_string(round));
    EXPECT_TRUE(rw->Write(req));
    EXPECT_TRUE(rw->Read(&resp));
    EXPECT_EQ(resp.message(), req.message());
    ++round;
  }

  rw->WritesDone();
  // After WritesDone() no further responses are expected; the read is
  // attempted twice and must fail both times.
  EXPECT_FALSE(rw->Read(&resp));
  EXPECT_FALSE(rw->Read(&resp));

  const Status status = rw->Finish();
  EXPECT_TRUE(status.ok());
}
894
// Bidirectional stream of three messages with corked initial metadata; the
// last message is sent with WriteLast() instead of Write() + WritesDone().
TEST_P(End2endTest, BidiStreamWithCoalescingApi) {
  ResetStub();
  EchoRequest req;
  EchoResponse resp;
  ClientContext ctx;
  ctx.AddMetadata(kServerFinishAfterNReads, "3");
  ctx.set_initial_metadata_corked(true);
  const grpc::string prefix("hello");

  auto rw = stub_->BidiStream(&ctx);

  // Matches the "3" passed through kServerFinishAfterNReads above.
  const int kNumMessages = 3;
  for (int i = 0; i < kNumMessages; ++i) {
    req.set_message(prefix + grpc::to_string(i));
    if (i + 1 < kNumMessages) {
      EXPECT_TRUE(rw->Write(req));
    } else {
      // Final message: sent through WriteLast().
      rw->WriteLast(req, WriteOptions());
    }
    EXPECT_TRUE(rw->Read(&resp));
    EXPECT_EQ(resp.message(), req.message());
  }

  // The stream is finished now; the read is attempted twice and must fail
  // both times.
  EXPECT_FALSE(rw->Read(&resp));
  EXPECT_FALSE(rw->Read(&resp));

  const Status status = rw->Finish();
  EXPECT_TRUE(status.ok());
}
927
928 // This was added to prevent regression from issue:
929 // https://github.com/grpc/grpc/issues/11546
TEST_P(End2endTest, BidiStreamWithEverythingCoalesced) {
  ResetStub();
  EchoRequest req;
  EchoResponse resp;
  ClientContext ctx;
  ctx.AddMetadata(kServerFinishAfterNReads, "1");
  ctx.set_initial_metadata_corked(true);
  const grpc::string prefix("hello");

  auto rw = stub_->BidiStream(&ctx);

  // The one and only message is sent with WriteLast() on a corked context.
  req.set_message(prefix + "0");
  rw->WriteLast(req, WriteOptions());
  EXPECT_TRUE(rw->Read(&resp));
  EXPECT_EQ(resp.message(), req.message());

  // No more responses are expected; the read is attempted twice and must
  // fail both times.
  EXPECT_FALSE(rw->Read(&resp));
  EXPECT_FALSE(rw->Read(&resp));

  const Status status = rw->Finish();
  EXPECT_TRUE(status.ok());
}
952
953 // Talk to the two services with the same name but different package names.
954 // The two stubs are created on the same channel.
TEST_P(End2endTest, DiffPackageServices) {
  ResetStub();
  EchoRequest req;
  EchoResponse resp;
  req.set_message("Hello");

  // First call: the regular service echoes the request back.
  ClientContext ctx;
  Status status = stub_->Echo(&ctx, req, &resp);
  EXPECT_EQ(resp.message(), req.message());
  EXPECT_TRUE(status.ok());

  // Second call: a stub for the service from the `duplicate` package, created
  // on the same channel, is expected to answer "no package".
  auto dup_stub = grpc::testing::duplicate::EchoTestService::NewStub(channel_);
  ClientContext dup_ctx;
  status = dup_stub->Echo(&dup_ctx, req, &resp);
  EXPECT_EQ("no package", resp.message());
  EXPECT_TRUE(status.ok());
}
973
CancelRpc(ClientContext * context,int delay_us,TestServiceImpl * service)974 void CancelRpc(ClientContext* context, int delay_us, TestServiceImpl* service) {
975 gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
976 gpr_time_from_micros(delay_us, GPR_TIMESPAN)));
977 while (!service->signal_client()) {
978 }
979 context->TryCancel();
980 }
981
// Cancelling the context before the call is issued must yield CANCELLED and
// leave the response empty.
TEST_P(End2endTest, CancelRpcBeforeStart) {
  ResetStub();
  EchoRequest req;
  EchoResponse resp;
  ClientContext ctx;
  req.set_message("hello");
  // Cancel first, then start the RPC.
  ctx.TryCancel();
  const Status status = stub_->Echo(&ctx, req, &resp);
  EXPECT_EQ("", resp.message());
  EXPECT_EQ(grpc::StatusCode::CANCELLED, status.error_code());
}
993
994 // Client cancels request stream after sending two messages
TEST_P(End2endTest, ClientCancelsRequestStream) {
  ResetStub();
  EchoRequest req;
  EchoResponse resp;
  ClientContext ctx;
  req.set_message("hello");

  auto writer = stub_->RequestStream(&ctx, &resp);
  // Two successful writes precede the cancellation.
  for (int i = 0; i < 2; ++i) {
    EXPECT_TRUE(writer->Write(req));
  }

  ctx.TryCancel();

  const Status status = writer->Finish();
  EXPECT_EQ(grpc::StatusCode::CANCELLED, status.error_code());

  // The cancelled call must not have produced a response payload.
  EXPECT_EQ(resp.message(), "");
}
1013
// Client cancels server stream after reading some messages
TEST_P(End2endTest, ClientCancelsResponseStream) {
  ResetStub();
  EchoRequest req;
  EchoResponse resp;
  ClientContext ctx;
  req.set_message("hello");

  auto reader = stub_->ResponseStream(&ctx, req);

  // Consume the first two responses before cancelling.
  for (int i = 0; i < 2; ++i) {
    EXPECT_TRUE(reader->Read(&resp));
    EXPECT_EQ(resp.message(), req.message() + grpc::to_string(i));
  }

  ctx.TryCancel();

  // Cancellation races with in-flight responses: at most one more response
  // may still arrive, after which reads have to fail.
  const bool got_extra_response = reader->Read(&resp);
  if (got_extra_response) {
    EXPECT_EQ(resp.message(), req.message() + "2");
    EXPECT_FALSE(reader->Read(&resp));
  }

  const Status status = reader->Finish();
  // Depending on who won the race the status is either OK or CANCELLED; the
  // check accepts any code that is not greater than CANCELLED.
  EXPECT_GE(grpc::StatusCode::CANCELLED, status.error_code());
}
1045
1046 // Client cancels bidi stream after sending some messages
TEST_P(End2endTest, ClientCancelsBidi) {
  ResetStub();
  EchoRequest req;
  EchoResponse resp;
  ClientContext ctx;
  const grpc::string prefix("hello");

  auto rw = stub_->BidiStream(&ctx);

  // First round trip completes normally.
  req.set_message(prefix + "0");
  EXPECT_TRUE(rw->Write(req));
  EXPECT_TRUE(rw->Read(&resp));
  EXPECT_EQ(resp.message(), req.message());

  // The second message is written, but the call is cancelled before its echo
  // is read.
  req.set_message(prefix + "1");
  EXPECT_TRUE(rw->Write(req));

  ctx.TryCancel();

  // Cancellation races with the pending echo: at most one more response may
  // still arrive, after which reads have to fail.
  const bool got_extra_response = rw->Read(&resp);
  if (got_extra_response) {
    EXPECT_EQ(resp.message(), req.message());
    EXPECT_FALSE(rw->Read(&resp));
  }

  const Status status = rw->Finish();
  EXPECT_EQ(grpc::StatusCode::CANCELLED, status.error_code());
}
1078
// A request twice as large as kMaxMessageSize_ must not succeed.
TEST_P(End2endTest, RpcMaxMessageSize) {
  ResetStub();
  EchoRequest req;
  EchoResponse resp;
  // Payload is 2 * kMaxMessageSize_ bytes of 'a'.
  req.set_message(string(kMaxMessageSize_ * 2, 'a'));
  req.mutable_param()->set_server_die(true);

  ClientContext ctx;
  const Status status = stub_->Echo(&ctx, req, &resp);
  EXPECT_FALSE(status.ok());
}
1090
ReaderThreadFunc(ClientReaderWriter<EchoRequest,EchoResponse> * stream,gpr_event * ev)1091 void ReaderThreadFunc(ClientReaderWriter<EchoRequest, EchoResponse>* stream,
1092 gpr_event* ev) {
1093 EchoResponse resp;
1094 gpr_event_set(ev, (void*)1);
1095 while (stream->Read(&resp)) {
1096 gpr_log(GPR_INFO, "Read message");
1097 }
1098 }
1099
1100 // Run a Read and a WritesDone simultaneously.
TEST_P(End2endTest, SimultaneousReadWritesDone) {
  ResetStub();
  ClientContext ctx;
  gpr_event reader_started;
  gpr_event_init(&reader_started);
  auto rw = stub_->BidiStream(&ctx);
  // The reader thread sets the event and then calls Read() in a loop.
  std::thread reader(ReaderThreadFunc, rw.get(), &reader_started);
  // Wait until the reader thread has signalled before calling WritesDone().
  gpr_event_wait(&reader_started, gpr_inf_future(GPR_CLOCK_REALTIME));
  rw->WritesDone();
  reader.join();
  const Status status = rw->Finish();
  EXPECT_TRUE(status.ok());
}
1114
// Checks the channel's connectivity state before and after a connection
// attempt is requested. Skipped for the inproc scenario.
TEST_P(End2endTest, ChannelState) {
  if (GetParam().inproc) {
    return;
  }

  ResetStub();
  // A fresh channel starts out IDLE.
  EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(false));

  // Nobody asked to connect yet, so the state-change notification is expected
  // to complete with ok == false once its 10 ms deadline passes.
  CompletionQueue cq;
  const auto deadline =
      std::chrono::system_clock::now() + std::chrono::milliseconds(10);
  channel_->NotifyOnStateChange(GRPC_CHANNEL_IDLE, deadline, &cq, nullptr);
  void* tag;
  bool ok = true;
  cq.Next(&tag, &ok);
  EXPECT_FALSE(ok);

  // GetState(true) is called here; afterwards the channel has to leave IDLE.
  EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(true));
  EXPECT_TRUE(channel_->WaitForStateChange(GRPC_CHANNEL_IDLE,
                                           gpr_inf_future(GPR_CLOCK_REALTIME)));
  const auto current = channel_->GetState(false);
  EXPECT_TRUE(current == GRPC_CHANNEL_CONNECTING ||
              current == GRPC_CHANNEL_READY);
}
1140
1141 // Takes 10s.
TEST_P(End2endTest, ChannelStateTimeout) {
  // Only meaningful for insecure credentials on a non-inproc scenario.
  const bool applicable =
      (GetParam().credentials_type == kInsecureCredentialsType) &&
      !GetParam().inproc;
  if (!applicable) {
    return;
  }
  // Nothing is started on this port, so the channel points at a server that
  // does not exist.
  const int port = grpc_pick_unused_port_or_die();
  const grpc::string target = "127.0.0.1:" + grpc::to_string(port);
  auto channel = CreateChannel(target, InsecureChannelCredentials());
  // A fresh channel starts out IDLE.
  EXPECT_EQ(GRPC_CHANNEL_IDLE, channel->GetState(true));

  // Follow the state for ten rounds, waiting at most one second per round.
  auto last_seen = GRPC_CHANNEL_IDLE;
  for (int round = 0; round < 10; ++round) {
    const auto round_deadline =
        std::chrono::system_clock::now() + std::chrono::seconds(1);
    channel->WaitForStateChange(last_seen, round_deadline);
    last_seen = channel->GetState(false);
  }
}
1163
1164 // Talking to a non-existing service.
TEST_P(End2endTest, NonExistingService) {
  ResetChannel();
  // Build a stub for UnimplementedEchoService on the test channel.
  auto unimplemented_stub =
      grpc::testing::UnimplementedEchoService::NewStub(channel_);

  EchoRequest req;
  EchoResponse resp;
  req.set_message("Hello");

  ClientContext ctx;
  const Status status = unimplemented_stub->Unimplemented(&ctx, req, &resp);
  // The call must fail with UNIMPLEMENTED and an empty error message.
  EXPECT_EQ(StatusCode::UNIMPLEMENTED, status.error_code());
  EXPECT_EQ("", status.error_message());
}
1179
1180 // Ask the server to send back a serialized proto in trailer.
1181 // This is an example of setting error details.
TEST_P(End2endTest, BinaryTrailerTest) {
  ResetStub();
  EchoRequest request;
  EchoResponse response;
  ClientContext context;

  // Attach a DebugInfo proto to the request and remember its serialized form
  // so it can be compared with the trailer that comes back.
  request.mutable_param()->set_echo_metadata(true);
  DebugInfo* info = request.mutable_param()->mutable_debug_info();
  info->add_stack_entries("stack_entry_1");
  info->add_stack_entries("stack_entry_2");
  info->add_stack_entries("stack_entry_3");
  info->set_detail("detailed debug info");
  grpc::string expected_string = info->SerializeAsString();
  request.set_message("Hello");

  Status s = stub_->Echo(&context, request, &response);
  EXPECT_FALSE(s.ok());
  const auto& trailers = context.GetServerTrailingMetadata();
  // Fatal assertion: with exactly one matching entry the find() below cannot
  // return end(), so dereferencing the iterator is safe. A non-fatal check
  // would fall through to undefined behaviour when the trailer is missing.
  ASSERT_EQ(1u, trailers.count(kDebugInfoTrailerKey));
  auto iter = trailers.find(kDebugInfoTrailerKey);
  EXPECT_EQ(expected_string, iter->second);
  // Parse the returned trailer into a DebugInfo proto.
  DebugInfo returned_info;
  EXPECT_TRUE(returned_info.ParseFromString(ToString(iter->second)));
}
1207
// Asks the server to fail the call with a given status and checks that the
// code, message and binary details arrive unchanged, and that the client's
// debug error string mentions the expected fields.
TEST_P(End2endTest, ExpectErrorTest) {
  ResetStub();

  std::vector<ErrorStatus> expected_status;
  expected_status.emplace_back();
  expected_status.back().set_code(13);  // INTERNAL
  // No Error message or details

  expected_status.emplace_back();
  expected_status.back().set_code(13);  // INTERNAL
  expected_status.back().set_error_message("text error message");
  expected_status.back().set_binary_error_details("text error details");

  expected_status.emplace_back();
  expected_status.back().set_code(13);  // INTERNAL
  expected_status.back().set_error_message("text error message");
  // The payload starts with a NUL byte, so it has to be built with an explicit
  // length; passing the bare literal would truncate it to an empty string and
  // the binary case would never be exercised.
  static const char kBinaryDetails[] = "\x0\x1\x2\x3\x4\x5\x6\x8\x9\xA\xB";
  expected_status.back().set_binary_error_details(
      grpc::string(kBinaryDetails, sizeof(kBinaryDetails) - 1));

  for (const auto& expected : expected_status) {
    EchoRequest request;
    EchoResponse response;
    ClientContext context;
    request.set_message("Hello");
    // Copy the expected status into the request's expected_error field.
    auto* error = request.mutable_param()->mutable_expected_error();
    error->set_code(expected.code());
    error->set_error_message(expected.error_message());
    error->set_binary_error_details(expected.binary_error_details());

    Status s = stub_->Echo(&context, request, &response);
    EXPECT_FALSE(s.ok());
    EXPECT_EQ(expected.code(), s.error_code());
    EXPECT_EQ(expected.error_message(), s.error_message());
    EXPECT_EQ(expected.binary_error_details(), s.error_details());
    // The debug string must mention each of these substrings.
    EXPECT_TRUE(context.debug_error_string().find("created") !=
                std::string::npos);
    EXPECT_TRUE(context.debug_error_string().find("file") != std::string::npos);
    EXPECT_TRUE(context.debug_error_string().find("line") != std::string::npos);
    EXPECT_TRUE(context.debug_error_string().find("status") !=
                std::string::npos);
    EXPECT_TRUE(context.debug_error_string().find("13") != std::string::npos);
  }
}
1252
1253 //////////////////////////////////////////////////////////////////////////
1254 // Test with and without a proxy.
1255 class ProxyEnd2endTest : public End2endTest {
1256 protected:
1257 };
1258
// Issues a single RPC through the SendRpc() helper.
TEST_P(ProxyEnd2endTest, SimpleRpc) {
  ResetStub();
  const int kNumRpcs = 1;
  SendRpc(stub_.get(), kNumRpcs, false);
}
1263
// An Echo call with a default-constructed request must still succeed.
TEST_P(ProxyEnd2endTest, SimpleRpcWithEmptyMessages) {
  ResetStub();
  EchoRequest empty_req;
  EchoResponse resp;

  ClientContext ctx;
  const Status status = stub_->Echo(&ctx, empty_req, &resp);
  EXPECT_TRUE(status.ok());
}
1273
// Runs ten threads in parallel, each invoking SendRpc() with a count of ten
// on the shared stub.
TEST_P(ProxyEnd2endTest, MultipleRpcs) {
  ResetStub();
  const int kNumThreads = 10;
  const int kRpcsPerThread = 10;
  std::vector<std::thread> workers;
  workers.reserve(kNumThreads);
  for (int i = 0; i < kNumThreads; ++i) {
    workers.emplace_back(SendRpc, stub_.get(), kRpcsPerThread, false);
  }
  // Wait for every worker before the test returns.
  for (auto& worker : workers) {
    worker.join();
  }
}
1285
// Set a 1ms deadline and make sure proper error is returned.
TEST_P(ProxyEnd2endTest, RpcDeadlineExpires) {
  ResetStub();
  EchoRequest req;
  EchoResponse resp;
  req.set_message("Hello");
  auto* params = req.mutable_param();
  params->set_skip_cancelled_check(true);
  // Have the server sleep for 40 ms so the deadline is sure to expire.
  // That may look excessive, but the timer manager has only just been
  // initialized (by ResetStub()) and has warm-up costs: the timer thread may
  // not even have started yet, and there can be further delays in the timer
  // manager thread (acquiring locks, manipulating timer data structures,
  // starting backup timer threads). Even 40 ms is not always enough, but it
  // reduces test flakiness significantly.
  params->set_server_sleep_us(40 * 1000);

  ClientContext ctx;
  // The client-side deadline is 1 ms from now.
  ctx.set_deadline(std::chrono::system_clock::now() +
                   std::chrono::milliseconds(1));
  const Status status = stub_->Echo(&ctx, req, &resp);
  EXPECT_EQ(StatusCode::DEADLINE_EXCEEDED, status.error_code());
}
1310
1311 // Set a long but finite deadline.
TEST_P(ProxyEnd2endTest, RpcLongDeadline) {
  ResetStub();
  EchoRequest req;
  EchoResponse resp;
  req.set_message("Hello");

  ClientContext ctx;
  // One hour from now: finite, but far longer than the call should take.
  ctx.set_deadline(std::chrono::system_clock::now() + std::chrono::hours(1));
  const Status status = stub_->Echo(&ctx, req, &resp);
  EXPECT_EQ(resp.message(), req.message());
  EXPECT_TRUE(status.ok());
}
1326
1327 // Ask server to echo back the deadline it sees.
TEST_P(ProxyEnd2endTest, EchoDeadline) {
  ResetStub();
  EchoRequest req;
  EchoResponse resp;
  req.set_message("Hello");
  req.mutable_param()->set_echo_deadline(true);

  ClientContext ctx;
  const std::chrono::system_clock::time_point deadline =
      std::chrono::system_clock::now() + std::chrono::seconds(100);
  ctx.set_deadline(deadline);
  const Status status = stub_->Echo(&ctx, req, &resp);
  EXPECT_EQ(resp.message(), req.message());
  EXPECT_TRUE(status.ok());

  // Convert the deadline that was sent so it can be compared in seconds.
  gpr_timespec sent;
  Timepoint2Timespec(deadline, &sent);
  // Some slack is needed:
  // - request_deadline() only has one-second resolution, so +-1 is the best
  //   that can be expected;
  // - when sent.tv_nsec is very close to the next second boundary the result
  //   can be off by 2 in one direction.
  EXPECT_LE(resp.param().request_deadline() - sent.tv_sec, 2);
  EXPECT_GE(resp.param().request_deadline() - sent.tv_sec, -1);
}
1351
1352 // Ask server to echo back the deadline it sees. The rpc has no deadline.
TEST_P(ProxyEnd2endTest, EchoDeadlineForNoDeadlineRpc) {
  ResetStub();
  EchoRequest req;
  EchoResponse resp;
  req.set_message("Hello");
  req.mutable_param()->set_echo_deadline(true);

  // No deadline is set on this context.
  ClientContext ctx;
  const Status status = stub_->Echo(&ctx, req, &resp);
  EXPECT_EQ(resp.message(), req.message());
  EXPECT_TRUE(status.ok());
  // The echoed deadline is then expected to equal the "infinite future"
  // timestamp.
  EXPECT_EQ(resp.param().request_deadline(),
            gpr_inf_future(GPR_CLOCK_REALTIME).tv_sec);
}
1367
// Calling the Unimplemented method must fail with UNIMPLEMENTED, an empty
// error message and an empty response.
TEST_P(ProxyEnd2endTest, UnimplementedRpc) {
  ResetStub();
  EchoRequest req;
  EchoResponse resp;
  req.set_message("Hello");

  ClientContext ctx;
  const Status status = stub_->Unimplemented(&ctx, req, &resp);
  EXPECT_FALSE(status.ok());
  EXPECT_EQ(status.error_code(), grpc::StatusCode::UNIMPLEMENTED);
  EXPECT_EQ(status.error_message(), "");
  EXPECT_EQ(resp.message(), "");
}
1381
1382 // Client cancels rpc after 10ms
TEST_P(ProxyEnd2endTest, ClientCancelsRpc) {
  ResetStub();
  EchoRequest req;
  EchoResponse resp;
  req.set_message("Hello");
  // 10 ms, expressed in microseconds. The same delay is passed both to the
  // server (through the request) and to the cancelling thread.
  const int kCancelDelayUs = 10 * 1000;
  req.mutable_param()->set_client_cancel_after_us(kCancelDelayUs);

  ClientContext ctx;
  // CancelRpc runs on a separate thread while Echo() blocks on this one.
  std::thread canceller(CancelRpc, &ctx, kCancelDelayUs, &service_);
  const Status status = stub_->Echo(&ctx, req, &resp);
  canceller.join();
  EXPECT_EQ(StatusCode::CANCELLED, status.error_code());
  EXPECT_EQ(status.error_message(), "Cancelled");
}
1398
1399 // Server cancels rpc after 1ms
TEST_P(ProxyEnd2endTest, ServerCancelsRpc) {
  ResetStub();
  EchoRequest req;
  EchoResponse resp;
  req.set_message("Hello");
  // 1000 us == 1 ms.
  req.mutable_param()->set_server_cancel_after_us(1000);

  ClientContext ctx;
  const Status status = stub_->Echo(&ctx, req, &resp);
  // The client must see CANCELLED with an empty error message.
  EXPECT_EQ(StatusCode::CANCELLED, status.error_code());
  EXPECT_TRUE(status.error_message().empty());
}
1412
1413 // Make the response larger than the flow control window.
TEST_P(ProxyEnd2endTest, HugeResponse) {
  ResetStub();
  EchoRequest req;
  EchoResponse resp;
  req.set_message("huge response");
  // Slightly more than 1 MiB: 1024 * (1024 + 10) bytes.
  const size_t kResponseSize = 1024 * (1024 + 10);
  req.mutable_param()->set_response_message_length(kResponseSize);

  ClientContext ctx;
  // Allow up to 20 seconds for the transfer.
  ctx.set_deadline(std::chrono::system_clock::now() +
                   std::chrono::seconds(20));
  const Status status = stub_->Echo(&ctx, req, &resp);
  EXPECT_EQ(kResponseSize, resp.message().size());
  EXPECT_TRUE(status.ok());
}
1430
// Checks that both the peer echoed by the server and the peer reported by the
// client context pass CheckIsLocalhost().
TEST_P(ProxyEnd2endTest, Peer) {
  // Peer is not meaningful for inproc.
  const bool is_inproc = GetParam().inproc;
  if (is_inproc) {
    return;
  }
  ResetStub();
  EchoRequest req;
  EchoResponse resp;
  req.set_message("hello");
  req.mutable_param()->set_echo_peer(true);

  ClientContext ctx;
  const Status status = stub_->Echo(&ctx, req, &resp);
  EXPECT_EQ(resp.message(), req.message());
  EXPECT_TRUE(status.ok());
  EXPECT_TRUE(CheckIsLocalhost(resp.param().peer()));
  EXPECT_TRUE(CheckIsLocalhost(ctx.peer()));
}
1449
1450 //////////////////////////////////////////////////////////////////////////
// Fixture for tests that need a secure channel. The constructor aborts if the
// scenario uses a proxy or insecure credentials.
class SecureEnd2endTest : public End2endTest {
 protected:
  SecureEnd2endTest() {
    // These tests are not meant to be instantiated with a proxy ...
    GPR_ASSERT(!GetParam().use_proxy);
    // ... nor with the insecure credentials type.
    GPR_ASSERT(GetParam().credentials_type != kInsecureCredentialsType);
  }
};
1458
// Overrides the call authority and checks the host value echoed back by the
// server.
TEST_P(SecureEnd2endTest, SimpleRpcWithHost) {
  ResetStub();

  EchoRequest req;
  EchoResponse resp;
  req.set_message("Hello");

  ClientContext ctx;
  ctx.set_authority("foo.test.youtube.com");
  const Status status = stub_->Echo(&ctx, req, &resp);
  EXPECT_EQ(resp.message(), req.message());
  // With this authority the response is expected to carry host "special".
  EXPECT_TRUE(resp.has_param());
  EXPECT_EQ("special", resp.param().host());
  EXPECT_TRUE(status.ok());
}
1474
MetadataContains(const std::multimap<grpc::string_ref,grpc::string_ref> & metadata,const grpc::string & key,const grpc::string & value)1475 bool MetadataContains(
1476 const std::multimap<grpc::string_ref, grpc::string_ref>& metadata,
1477 const grpc::string& key, const grpc::string& value) {
1478 int count = 0;
1479
1480 for (std::multimap<grpc::string_ref, grpc::string_ref>::const_iterator iter =
1481 metadata.begin();
1482 iter != metadata.end(); ++iter) {
1483 if (ToString(iter->first) == key && ToString(iter->second) == value) {
1484 count++;
1485 }
1486 }
1487 return count == 1;
1488 }
1489
// Starts the server with a TestAuthMetadataProcessor constructed with `true`
// and calls it with credentials the processor reports as compatible.
TEST_P(SecureEnd2endTest, BlockingAuthMetadataPluginAndProcessorSuccess) {
  TestAuthMetadataProcessor* const processor =
      new TestAuthMetadataProcessor(true);
  // The shared_ptr handed to StartServer() takes ownership of the processor.
  StartServer(std::shared_ptr<AuthMetadataProcessor>(processor));
  ResetStub();
  EchoRequest req;
  EchoResponse resp;
  ClientContext ctx;
  ctx.set_credentials(processor->GetCompatibleClientCreds());
  req.set_message("Hello");
  auto* params = req.mutable_param();
  params->set_echo_metadata(true);
  params->set_expected_client_identity(TestAuthMetadataProcessor::kGoodGuy);
  params->set_expected_transport_security_type(GetParam().credentials_type);

  const Status status = stub_->Echo(&ctx, req, &resp);
  EXPECT_EQ(req.message(), resp.message());
  EXPECT_TRUE(status.ok());

  // The authorization entry should have been consumed by the processor, so it
  // must not show up in the echoed trailing metadata.
  EXPECT_FALSE(MetadataContains(
      ctx.GetServerTrailingMetadata(), GRPC_AUTHORIZATION_METADATA_KEY,
      grpc::string("Bearer ") + TestAuthMetadataProcessor::kGoodGuy));
}
1514
// Starts the server with a TestAuthMetadataProcessor constructed with `true`
// and calls it with credentials the processor reports as incompatible.
TEST_P(SecureEnd2endTest, BlockingAuthMetadataPluginAndProcessorFailure) {
  TestAuthMetadataProcessor* const processor =
      new TestAuthMetadataProcessor(true);
  // The shared_ptr handed to StartServer() takes ownership of the processor.
  StartServer(std::shared_ptr<AuthMetadataProcessor>(processor));
  ResetStub();
  EchoRequest req;
  EchoResponse resp;
  ClientContext ctx;
  ctx.set_credentials(processor->GetIncompatibleClientCreds());
  req.set_message("Hello");

  const Status status = stub_->Echo(&ctx, req, &resp);
  // The call must be rejected as UNAUTHENTICATED.
  EXPECT_FALSE(status.ok());
  EXPECT_EQ(status.error_code(), StatusCode::UNAUTHENTICATED);
}
1529
// Attaches IAM call credentials to one call and checks that the token and
// selector both appear in the metadata echoed back by the server.
TEST_P(SecureEnd2endTest, SetPerCallCredentials) {
  ResetStub();
  EchoRequest req;
  EchoResponse resp;
  ClientContext ctx;
  ctx.set_credentials(GoogleIAMCredentials("fake_token", "fake_selector"));
  req.set_message("Hello");
  req.mutable_param()->set_echo_metadata(true);

  const Status status = stub_->Echo(&ctx, req, &resp);
  EXPECT_EQ(req.message(), resp.message());
  EXPECT_TRUE(status.ok());
  const auto& trailing = ctx.GetServerTrailingMetadata();
  EXPECT_TRUE(MetadataContains(
      trailing, GRPC_IAM_AUTHORIZATION_TOKEN_METADATA_KEY, "fake_token"));
  EXPECT_TRUE(MetadataContains(
      trailing, GRPC_IAM_AUTHORITY_SELECTOR_METADATA_KEY, "fake_selector"));
}
1551
// Sets call credentials twice on the same context; only the second set is
// expected to show up in the echoed metadata.
TEST_P(SecureEnd2endTest, OverridePerCallCredentials) {
  ResetStub();
  EchoRequest req;
  EchoResponse resp;
  ClientContext ctx;
  std::shared_ptr<CallCredentials> first_creds =
      GoogleIAMCredentials("fake_token1", "fake_selector1");
  ctx.set_credentials(first_creds);
  std::shared_ptr<CallCredentials> second_creds =
      GoogleIAMCredentials("fake_token2", "fake_selector2");
  ctx.set_credentials(second_creds);
  req.set_message("Hello");
  req.mutable_param()->set_echo_metadata(true);

  const Status status = stub_->Echo(&ctx, req, &resp);
  const auto& trailing = ctx.GetServerTrailingMetadata();
  // The second credentials must be present ...
  EXPECT_TRUE(MetadataContains(
      trailing, GRPC_IAM_AUTHORIZATION_TOKEN_METADATA_KEY, "fake_token2"));
  EXPECT_TRUE(MetadataContains(
      trailing, GRPC_IAM_AUTHORITY_SELECTOR_METADATA_KEY, "fake_selector2"));
  // ... and the first ones must be gone.
  EXPECT_FALSE(MetadataContains(
      trailing, GRPC_IAM_AUTHORIZATION_TOKEN_METADATA_KEY, "fake_token1"));
  EXPECT_FALSE(MetadataContains(
      trailing, GRPC_IAM_AUTHORITY_SELECTOR_METADATA_KEY, "fake_selector1"));
  EXPECT_EQ(req.message(), resp.message());
  EXPECT_TRUE(status.ok());
}
1582
// A credentials plugin configured with kBadMetadataKey must make the call
// fail with UNAVAILABLE.
TEST_P(SecureEnd2endTest, AuthMetadataPluginKeyFailure) {
  ResetStub();
  EchoRequest req;
  EchoResponse resp;
  ClientContext ctx;
  // NOTE(review): the two trailing booleans are presumably is_blocking and
  // is_successful; confirm against TestMetadataCredentialsPlugin.
  std::unique_ptr<MetadataCredentialsPlugin> plugin(
      new TestMetadataCredentialsPlugin(
          TestMetadataCredentialsPlugin::kBadMetadataKey,
          "Does not matter, will fail the key is invalid.", false, true));
  ctx.set_credentials(MetadataCredentialsFromPlugin(std::move(plugin)));
  req.set_message("Hello");

  const Status status = stub_->Echo(&ctx, req, &resp);
  EXPECT_FALSE(status.ok());
  EXPECT_EQ(status.error_code(), StatusCode::UNAVAILABLE);
}
1599
// A credentials plugin whose metadata value contains a newline must make the
// call fail with UNAVAILABLE.
TEST_P(SecureEnd2endTest, AuthMetadataPluginValueFailure) {
  ResetStub();
  EchoRequest req;
  EchoResponse resp;
  ClientContext ctx;
  // NOTE(review): the two trailing booleans are presumably is_blocking and
  // is_successful; confirm against TestMetadataCredentialsPlugin.
  std::unique_ptr<MetadataCredentialsPlugin> plugin(
      new TestMetadataCredentialsPlugin(
          TestMetadataCredentialsPlugin::kGoodMetadataKey,
          "With illegal \n value.", false, true));
  ctx.set_credentials(MetadataCredentialsFromPlugin(std::move(plugin)));
  req.set_message("Hello");

  const Status status = stub_->Echo(&ctx, req, &resp);
  EXPECT_FALSE(status.ok());
  EXPECT_EQ(status.error_code(), StatusCode::UNAVAILABLE);
}
1616
// A plugin constructed with (false, false) must make the call fail with
// UNAVAILABLE and the plugin's error message.
TEST_P(SecureEnd2endTest, NonBlockingAuthMetadataPluginFailure) {
  ResetStub();
  EchoRequest req;
  EchoResponse resp;
  ClientContext ctx;
  // NOTE(review): the two trailing booleans are presumably is_blocking and
  // is_successful; confirm against TestMetadataCredentialsPlugin.
  std::unique_ptr<MetadataCredentialsPlugin> plugin(
      new TestMetadataCredentialsPlugin(
          TestMetadataCredentialsPlugin::kGoodMetadataKey,
          "Does not matter, will fail anyway (see 3rd param)", false, false));
  ctx.set_credentials(MetadataCredentialsFromPlugin(std::move(plugin)));
  req.set_message("Hello");

  const Status status = stub_->Echo(&ctx, req, &resp);
  EXPECT_FALSE(status.ok());
  EXPECT_EQ(status.error_code(), StatusCode::UNAVAILABLE);
  // The error message is a fixed prefix followed by the plugin's own error.
  EXPECT_EQ(status.error_message(),
            grpc::string("Getting metadata from plugin failed with error: ") +
                kTestCredsPluginErrorMsg);
}
1637
// Starts the server with a TestAuthMetadataProcessor constructed with `false`
// and calls it with credentials the processor reports as compatible.
TEST_P(SecureEnd2endTest, NonBlockingAuthMetadataPluginAndProcessorSuccess) {
  TestAuthMetadataProcessor* const processor =
      new TestAuthMetadataProcessor(false);
  // The shared_ptr handed to StartServer() takes ownership of the processor.
  StartServer(std::shared_ptr<AuthMetadataProcessor>(processor));
  ResetStub();
  EchoRequest req;
  EchoResponse resp;
  ClientContext ctx;
  ctx.set_credentials(processor->GetCompatibleClientCreds());
  req.set_message("Hello");
  auto* params = req.mutable_param();
  params->set_echo_metadata(true);
  params->set_expected_client_identity(TestAuthMetadataProcessor::kGoodGuy);
  params->set_expected_transport_security_type(GetParam().credentials_type);

  const Status status = stub_->Echo(&ctx, req, &resp);
  EXPECT_EQ(req.message(), resp.message());
  EXPECT_TRUE(status.ok());

  // The authorization entry should have been consumed by the processor, so it
  // must not show up in the echoed trailing metadata.
  EXPECT_FALSE(MetadataContains(
      ctx.GetServerTrailingMetadata(), GRPC_AUTHORIZATION_METADATA_KEY,
      grpc::string("Bearer ") + TestAuthMetadataProcessor::kGoodGuy));
}
1662
// Starts the server with a TestAuthMetadataProcessor constructed with `false`
// and calls it with credentials the processor reports as incompatible.
TEST_P(SecureEnd2endTest, NonBlockingAuthMetadataPluginAndProcessorFailure) {
  TestAuthMetadataProcessor* const processor =
      new TestAuthMetadataProcessor(false);
  // The shared_ptr handed to StartServer() takes ownership of the processor.
  StartServer(std::shared_ptr<AuthMetadataProcessor>(processor));
  ResetStub();
  EchoRequest req;
  EchoResponse resp;
  ClientContext ctx;
  ctx.set_credentials(processor->GetIncompatibleClientCreds());
  req.set_message("Hello");

  const Status status = stub_->Echo(&ctx, req, &resp);
  // The call must be rejected as UNAUTHENTICATED.
  EXPECT_FALSE(status.ok());
  EXPECT_EQ(status.error_code(), StatusCode::UNAUTHENTICATED);
}
1677
// Attaches call credentials backed by a TestMetadataCredentialsPlugin built
// with the flag pair (true, false) and expects the Echo call to fail with
// UNAVAILABLE and the plugin's error text appended to the standard prefix.
// NOTE(review): the two trailing booleans are presumably (is_blocking,
// is_successful) -- confirm against the plugin's constructor.
TEST_P(SecureEnd2endTest, BlockingAuthMetadataPluginFailure) {
  ResetStub();

  EchoRequest echo_request;
  EchoResponse echo_response;
  ClientContext client_ctx;

  std::unique_ptr<MetadataCredentialsPlugin> failing_plugin(
      new TestMetadataCredentialsPlugin(
          TestMetadataCredentialsPlugin::kGoodMetadataKey,
          "Does not matter, will fail anyway (see 3rd param)", true, false));
  client_ctx.set_credentials(
      MetadataCredentialsFromPlugin(std::move(failing_plugin)));
  echo_request.set_message("Hello");

  const Status status = stub_->Echo(&client_ctx, echo_request, &echo_response);
  EXPECT_FALSE(status.ok());
  EXPECT_EQ(status.error_code(), StatusCode::UNAVAILABLE);

  const grpc::string expected_message =
      grpc::string("Getting metadata from plugin failed with error: ") +
      kTestCredsPluginErrorMsg;
  EXPECT_EQ(status.error_message(), expected_message);
}
1698
// Combines two plugin-backed call credentials, each contributing its own
// key/value pair, and checks that both pairs come back in the server trailing
// metadata when the request asks for metadata to be echoed.
TEST_P(SecureEnd2endTest, CompositeCallCreds) {
  const char kMetadataKey1[] = "call-creds-key1";
  const char kMetadataVal1[] = "call-creds-val1";
  const char kMetadataKey2[] = "call-creds-key2";
  const char kMetadataVal2[] = "call-creds-val2";

  ResetStub();
  EchoRequest echo_request;
  EchoResponse echo_response;
  ClientContext client_ctx;

  auto first_creds =
      MetadataCredentialsFromPlugin(std::unique_ptr<MetadataCredentialsPlugin>(
          new TestMetadataCredentialsPlugin(kMetadataKey1, kMetadataVal1, true,
                                            true)));
  auto second_creds =
      MetadataCredentialsFromPlugin(std::unique_ptr<MetadataCredentialsPlugin>(
          new TestMetadataCredentialsPlugin(kMetadataKey2, kMetadataVal2, true,
                                            true)));
  client_ctx.set_credentials(
      CompositeCallCredentials(first_creds, second_creds));

  echo_request.set_message("Hello");
  echo_request.mutable_param()->set_echo_metadata(true);

  const Status status = stub_->Echo(&client_ctx, echo_request, &echo_response);
  EXPECT_TRUE(status.ok());

  // Both credentials' metadata must be present in what the server echoed.
  EXPECT_TRUE(MetadataContains(client_ctx.GetServerTrailingMetadata(),
                               kMetadataKey1, kMetadataVal1));
  EXPECT_TRUE(MetadataContains(client_ctx.GetServerTrailingMetadata(),
                               kMetadataKey2, kMetadataVal2));
}
1726
// Performs a plain Echo call and then inspects the client-side AuthContext:
// the "transport_security_type" property must match the scenario's
// credentials type, and for TLS the peer identity must be the four expected
// x509 subject-alternative-name entries.
TEST_P(SecureEnd2endTest, ClientAuthContext) {
  ResetStub();
  EchoRequest request;
  EchoResponse response;
  request.set_message("Hello");
  // Server-side auth-context checking is requested only for TLS scenarios.
  request.mutable_param()->set_check_auth_context(GetParam().credentials_type ==
                                                  kTlsCredentialsType);
  request.mutable_param()->set_expected_transport_security_type(
      GetParam().credentials_type);
  ClientContext context;
  Status s = stub_->Echo(&context, request, &response);
  EXPECT_EQ(response.message(), request.message());
  EXPECT_TRUE(s.ok());

  std::shared_ptr<const AuthContext> auth_ctx = context.auth_context();
  // Fatal check: everything below dereferences the auth context.
  ASSERT_TRUE(auth_ctx != nullptr);
  std::vector<grpc::string_ref> tst =
      auth_ctx->FindPropertyValues("transport_security_type");
  ASSERT_EQ(1u, tst.size());
  EXPECT_EQ(GetParam().credentials_type, ToString(tst[0]));
  if (GetParam().credentials_type == kTlsCredentialsType) {
    EXPECT_EQ("x509_subject_alternative_name",
              auth_ctx->GetPeerIdentityPropertyName());
    // Fetch the identity once and make the size check fatal, so the indexed
    // accesses below can never read past the end of the vector.
    const std::vector<grpc::string_ref> peer_identity =
        auth_ctx->GetPeerIdentity();
    ASSERT_EQ(4u, peer_identity.size());
    EXPECT_EQ("*.test.google.fr", ToString(peer_identity[0]));
    EXPECT_EQ("waterzooi.test.google.be", ToString(peer_identity[1]));
    EXPECT_EQ("*.test.youtube.com", ToString(peer_identity[2]));
    EXPECT_EQ("192.168.1.3", ToString(peer_identity[3]));
  }
}
1757
1758 class ResourceQuotaEnd2endTest : public End2endTest {
1759 public:
ResourceQuotaEnd2endTest()1760 ResourceQuotaEnd2endTest()
1761 : server_resource_quota_("server_resource_quota") {}
1762
ConfigureServerBuilder(ServerBuilder * builder)1763 virtual void ConfigureServerBuilder(ServerBuilder* builder) override {
1764 builder->SetResourceQuota(server_resource_quota_);
1765 }
1766
1767 private:
1768 ResourceQuota server_resource_quota_;
1769 };
1770
// A single unary Echo against the resource-quota-configured server must
// succeed and return the request message unchanged.
TEST_P(ResourceQuotaEnd2endTest, SimpleRequest) {
  ResetStub();

  EchoRequest echo_request;
  echo_request.set_message("Hello");
  EchoResponse echo_response;

  ClientContext client_ctx;
  const Status status = stub_->Echo(&client_ctx, echo_request, &echo_response);
  EXPECT_EQ(echo_response.message(), echo_request.message());
  EXPECT_TRUE(status.ok());
}
1783
CreateTestScenarios(bool use_proxy,bool test_insecure,bool test_secure,bool test_inproc)1784 std::vector<TestScenario> CreateTestScenarios(bool use_proxy,
1785 bool test_insecure,
1786 bool test_secure,
1787 bool test_inproc) {
1788 std::vector<TestScenario> scenarios;
1789 std::vector<grpc::string> credentials_types;
1790 if (test_secure) {
1791 credentials_types =
1792 GetCredentialsProvider()->GetSecureCredentialsTypeList();
1793 }
1794 auto insec_ok = [] {
1795 // Only allow insecure credentials type when it is registered with the
1796 // provider. User may create providers that do not have insecure.
1797 return GetCredentialsProvider()->GetChannelCredentials(
1798 kInsecureCredentialsType, nullptr) != nullptr;
1799 };
1800 if (test_insecure && insec_ok()) {
1801 credentials_types.push_back(kInsecureCredentialsType);
1802 }
1803 GPR_ASSERT(!credentials_types.empty());
1804 for (const auto& cred : credentials_types) {
1805 scenarios.emplace_back(false, false, cred);
1806 if (use_proxy) {
1807 scenarios.emplace_back(true, false, cred);
1808 }
1809 }
1810 if (test_inproc && insec_ok()) {
1811 scenarios.emplace_back(false, true, kInsecureCredentialsType);
1812 }
1813 return scenarios;
1814 }
1815
1816 INSTANTIATE_TEST_CASE_P(End2end, End2endTest,
1817 ::testing::ValuesIn(CreateTestScenarios(false, true,
1818 true, true)));
1819
1820 INSTANTIATE_TEST_CASE_P(End2endServerTryCancel, End2endServerTryCancelTest,
1821 ::testing::ValuesIn(CreateTestScenarios(false, true,
1822 true, true)));
1823
1824 INSTANTIATE_TEST_CASE_P(ProxyEnd2end, ProxyEnd2endTest,
1825 ::testing::ValuesIn(CreateTestScenarios(true, true,
1826 true, true)));
1827
1828 INSTANTIATE_TEST_CASE_P(SecureEnd2end, SecureEnd2endTest,
1829 ::testing::ValuesIn(CreateTestScenarios(false, false,
1830 true, false)));
1831
1832 INSTANTIATE_TEST_CASE_P(ResourceQuotaEnd2end, ResourceQuotaEnd2endTest,
1833 ::testing::ValuesIn(CreateTestScenarios(false, true,
1834 true, true)));
1835
1836 } // namespace
1837 } // namespace testing
1838 } // namespace grpc
1839
main(int argc,char ** argv)1840 int main(int argc, char** argv) {
1841 gpr_setenv("GRPC_CLIENT_CHANNEL_BACKUP_POLL_INTERVAL_MS", "200");
1842 grpc_test_init(argc, argv);
1843 ::testing::InitGoogleTest(&argc, argv);
1844 return RUN_ALL_TESTS();
1845 }
1846