• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include <grpc/grpc.h>
20 #include <grpc/support/alloc.h>
21 #include <grpc/support/time.h>
22 #include <grpcpp/channel.h>
23 #include <grpcpp/client_context.h>
24 #include <grpcpp/create_channel.h>
25 #include <grpcpp/resource_quota.h>
26 #include <grpcpp/security/auth_metadata_processor.h>
27 #include <grpcpp/security/credentials.h>
28 #include <grpcpp/security/server_credentials.h>
29 #include <grpcpp/server.h>
30 #include <grpcpp/server_builder.h>
31 #include <grpcpp/server_context.h>
32 #include <grpcpp/support/string_ref.h>
33 #include <grpcpp/test/channel_test_peer.h>
34 
35 #include <mutex>
36 #include <thread>
37 
38 #include "absl/log/check.h"
39 #include "absl/log/log.h"
40 #include "absl/memory/memory.h"
41 #include "absl/strings/ascii.h"
42 #include "absl/strings/match.h"
43 #include "absl/strings/str_format.h"
44 #include "src/core/client_channel/backup_poller.h"
45 #include "src/core/config/config_vars.h"
46 #include "src/core/lib/iomgr/iomgr.h"
47 #include "src/core/lib/security/credentials/credentials.h"
48 #include "src/core/util/crash.h"
49 #include "src/core/util/env.h"
50 #include "src/proto/grpc/testing/duplicate/echo_duplicate.grpc.pb.h"
51 #include "src/proto/grpc/testing/echo.grpc.pb.h"
52 #include "test/core/test_util/port.h"
53 #include "test/core/test_util/test_config.h"
54 #include "test/cpp/end2end/interceptors_util.h"
55 #include "test/cpp/end2end/test_service_impl.h"
56 #include "test/cpp/util/string_ref_helper.h"
57 #include "test/cpp/util/test_credentials_provider.h"
58 
59 #ifdef GRPC_POSIX_SOCKET_EV
60 #include "src/core/lib/iomgr/ev_posix.h"
61 #endif  // GRPC_POSIX_SOCKET_EV
62 
63 #include <gtest/gtest.h>
64 
65 using std::chrono::system_clock;
66 
67 namespace grpc {
68 namespace testing {
69 namespace {
70 
CheckIsLocalhost(const std::string & addr)71 bool CheckIsLocalhost(const std::string& addr) {
72   const std::string kIpv6("ipv6:%5B::1%5D:");
73   const std::string kIpv4MappedIpv6("ipv6:%5B::ffff:127.0.0.1%5D:");
74   const std::string kIpv4("ipv4:127.0.0.1:");
75   return addr.substr(0, kIpv4.size()) == kIpv4 ||
76          addr.substr(0, kIpv4MappedIpv6.size()) == kIpv4MappedIpv6 ||
77          addr.substr(0, kIpv6.size()) == kIpv6;
78 }
79 
80 const int kClientChannelBackupPollIntervalMs = 200;
81 
82 const char kTestCredsPluginErrorMsg[] = "Could not find plugin metadata.";
83 
84 const char kFakeToken[] = "fake_token";
85 const char kFakeSelector[] = "fake_selector";
86 const char kExpectedFakeCredsDebugString[] =
87     "CallCredentials{GoogleIAMCredentials{Token:present,"
88     "AuthoritySelector:fake_selector}}";
89 
90 const char kWrongToken[] = "wrong_token";
91 const char kWrongSelector[] = "wrong_selector";
92 const char kExpectedWrongCredsDebugString[] =
93     "CallCredentials{GoogleIAMCredentials{Token:present,"
94     "AuthoritySelector:wrong_selector}}";
95 
96 const char kFakeToken1[] = "fake_token1";
97 const char kFakeSelector1[] = "fake_selector1";
98 const char kExpectedFakeCreds1DebugString[] =
99     "CallCredentials{GoogleIAMCredentials{Token:present,"
100     "AuthoritySelector:fake_selector1}}";
101 
102 const char kFakeToken2[] = "fake_token2";
103 const char kFakeSelector2[] = "fake_selector2";
104 const char kExpectedFakeCreds2DebugString[] =
105     "CallCredentials{GoogleIAMCredentials{Token:present,"
106     "AuthoritySelector:fake_selector2}}";
107 
108 const char kExpectedAuthMetadataPluginKeyFailureCredsDebugString[] =
109     "CallCredentials{TestMetadataCredentials{key:TestPluginMetadata,"
110     "value:Does not matter, will fail the key is invalid.}}";
111 const char kExpectedAuthMetadataPluginValueFailureCredsDebugString[] =
112     "CallCredentials{TestMetadataCredentials{key:test-plugin-metadata,"
113     "value:With illegal \n value.}}";
114 const char kExpectedAuthMetadataPluginWithDeadlineCredsDebugString[] =
115     "CallCredentials{TestMetadataCredentials{key:meta_key,value:Does "
116     "not "
117     "matter}}";
118 const char kExpectedNonBlockingAuthMetadataPluginFailureCredsDebugString[] =
119     "CallCredentials{TestMetadataCredentials{key:test-plugin-metadata,"
120     "value:Does not matter, will fail anyway (see 3rd param)}}";
121 const char
122     kExpectedNonBlockingAuthMetadataPluginAndProcessorSuccessCredsDebugString
123         [] = "CallCredentials{TestMetadataCredentials{key:test-plugin-"
124              "metadata,value:Dr Jekyll}}";
125 const char
126     kExpectedNonBlockingAuthMetadataPluginAndProcessorFailureCredsDebugString
127         [] = "CallCredentials{TestMetadataCredentials{key:test-plugin-"
128              "metadata,value:Mr Hyde}}";
129 const char kExpectedBlockingAuthMetadataPluginFailureCredsDebugString[] =
130     "CallCredentials{TestMetadataCredentials{key:test-plugin-metadata,"
131     "value:Does not matter, will fail anyway (see 3rd param)}}";
132 const char kExpectedCompositeCallCredsDebugString[] =
133     "CallCredentials{CompositeCallCredentials{TestMetadataCredentials{"
134     "key:call-creds-key1,value:call-creds-val1},TestMetadataCredentials{key:"
135     "call-creds-key2,value:call-creds-val2}}}";
136 
137 class TestMetadataCredentialsPlugin : public MetadataCredentialsPlugin {
138  public:
139   static const char kGoodMetadataKey[];
140   static const char kBadMetadataKey[];
141 
TestMetadataCredentialsPlugin(const grpc::string_ref & metadata_key,const grpc::string_ref & metadata_value,bool is_blocking,bool is_successful,int delay_ms)142   TestMetadataCredentialsPlugin(const grpc::string_ref& metadata_key,
143                                 const grpc::string_ref& metadata_value,
144                                 bool is_blocking, bool is_successful,
145                                 int delay_ms)
146       : metadata_key_(metadata_key.data(), metadata_key.length()),
147         metadata_value_(metadata_value.data(), metadata_value.length()),
148         is_blocking_(is_blocking),
149         is_successful_(is_successful),
150         delay_ms_(delay_ms) {}
151 
IsBlocking() const152   bool IsBlocking() const override { return is_blocking_; }
153 
GetMetadata(grpc::string_ref service_url,grpc::string_ref method_name,const grpc::AuthContext & channel_auth_context,std::multimap<std::string,std::string> * metadata)154   Status GetMetadata(
155       grpc::string_ref service_url, grpc::string_ref method_name,
156       const grpc::AuthContext& channel_auth_context,
157       std::multimap<std::string, std::string>* metadata) override {
158     if (delay_ms_ != 0) {
159       gpr_sleep_until(
160           gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
161                        gpr_time_from_millis(delay_ms_, GPR_TIMESPAN)));
162     }
163     EXPECT_GT(service_url.length(), 0UL);
164     EXPECT_GT(method_name.length(), 0UL);
165     EXPECT_TRUE(channel_auth_context.IsPeerAuthenticated());
166     EXPECT_TRUE(metadata != nullptr);
167     if (is_successful_) {
168       metadata->insert(std::make_pair(metadata_key_, metadata_value_));
169       return Status::OK;
170     } else {
171       return Status(StatusCode::NOT_FOUND, kTestCredsPluginErrorMsg);
172     }
173   }
174 
DebugString()175   std::string DebugString() override {
176     return absl::StrFormat("TestMetadataCredentials{key:%s,value:%s}",
177                            metadata_key_.c_str(), metadata_value_.c_str());
178   }
179 
180  private:
181   std::string metadata_key_;
182   std::string metadata_value_;
183   bool is_blocking_;
184   bool is_successful_;
185   int delay_ms_;
186 };
187 
188 const char TestMetadataCredentialsPlugin::kBadMetadataKey[] =
189     "TestPluginMetadata";
190 const char TestMetadataCredentialsPlugin::kGoodMetadataKey[] =
191     "test-plugin-metadata";
192 
193 class TestAuthMetadataProcessor : public AuthMetadataProcessor {
194  public:
195   static const char kGoodGuy[];
196 
TestAuthMetadataProcessor(bool is_blocking)197   explicit TestAuthMetadataProcessor(bool is_blocking)
198       : is_blocking_(is_blocking) {}
199 
GetCompatibleClientCreds()200   std::shared_ptr<CallCredentials> GetCompatibleClientCreds() {
201     return grpc::MetadataCredentialsFromPlugin(
202         std::unique_ptr<MetadataCredentialsPlugin>(
203             new TestMetadataCredentialsPlugin(
204                 TestMetadataCredentialsPlugin::kGoodMetadataKey, kGoodGuy,
205                 is_blocking_, true, 0)));
206   }
207 
GetIncompatibleClientCreds()208   std::shared_ptr<CallCredentials> GetIncompatibleClientCreds() {
209     return grpc::MetadataCredentialsFromPlugin(
210         std::unique_ptr<MetadataCredentialsPlugin>(
211             new TestMetadataCredentialsPlugin(
212                 TestMetadataCredentialsPlugin::kGoodMetadataKey, "Mr Hyde",
213                 is_blocking_, true, 0)));
214   }
215 
216   // Interface implementation
IsBlocking() const217   bool IsBlocking() const override { return is_blocking_; }
218 
Process(const InputMetadata & auth_metadata,AuthContext * context,OutputMetadata * consumed_auth_metadata,OutputMetadata * response_metadata)219   Status Process(const InputMetadata& auth_metadata, AuthContext* context,
220                  OutputMetadata* consumed_auth_metadata,
221                  OutputMetadata* response_metadata) override {
222     EXPECT_TRUE(consumed_auth_metadata != nullptr);
223     EXPECT_TRUE(context != nullptr);
224     EXPECT_TRUE(response_metadata != nullptr);
225     auto auth_md =
226         auth_metadata.find(TestMetadataCredentialsPlugin::kGoodMetadataKey);
227     EXPECT_NE(auth_md, auth_metadata.end());
228     string_ref auth_md_value = auth_md->second;
229     if (auth_md_value == kGoodGuy) {
230       context->AddProperty(kIdentityPropName, kGoodGuy);
231       context->SetPeerIdentityPropertyName(kIdentityPropName);
232       consumed_auth_metadata->insert(std::make_pair(
233           string(auth_md->first.data(), auth_md->first.length()),
234           string(auth_md->second.data(), auth_md->second.length())));
235       return Status::OK;
236     } else {
237       return Status(StatusCode::UNAUTHENTICATED,
238                     string("Invalid principal: ") +
239                         string(auth_md_value.data(), auth_md_value.length()));
240     }
241   }
242 
243  private:
244   static const char kIdentityPropName[];
245   bool is_blocking_;
246 };
247 
248 const char TestAuthMetadataProcessor::kGoodGuy[] = "Dr Jekyll";
249 const char TestAuthMetadataProcessor::kIdentityPropName[] = "novel identity";
250 
251 class Proxy : public grpc::testing::EchoTestService::Service {
252  public:
Proxy(const std::shared_ptr<Channel> & channel)253   explicit Proxy(const std::shared_ptr<Channel>& channel)
254       : stub_(grpc::testing::EchoTestService::NewStub(channel)) {}
255 
Echo(ServerContext * server_context,const EchoRequest * request,EchoResponse * response)256   Status Echo(ServerContext* server_context, const EchoRequest* request,
257               EchoResponse* response) override {
258     std::unique_ptr<ClientContext> client_context =
259         ClientContext::FromServerContext(*server_context);
260     return stub_->Echo(client_context.get(), *request, response);
261   }
262 
263  private:
264   std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
265 };
266 
267 class TestServiceImplDupPkg
268     : public grpc::testing::duplicate::EchoTestService::Service {
269  public:
Echo(ServerContext *,const EchoRequest *,EchoResponse * response)270   Status Echo(ServerContext* /*context*/, const EchoRequest* /*request*/,
271               EchoResponse* response) override {
272     response->set_message("no package");
273     return Status::OK;
274   }
275 };
276 
277 class TestScenario {
278  public:
TestScenario(bool use_interceptors,bool use_proxy,bool inproc,const std::string & credentials_type,bool callback_server)279   TestScenario(bool use_interceptors, bool use_proxy, bool inproc,
280                const std::string& credentials_type, bool callback_server)
281       : use_interceptors_(use_interceptors),
282         use_proxy_(use_proxy),
283         inproc_(inproc),
284         credentials_type_(credentials_type),
285         callback_server_(callback_server) {}
286 
use_interceptors() const287   bool use_interceptors() const { return use_interceptors_; }
use_proxy() const288   bool use_proxy() const { return use_proxy_; }
inproc() const289   bool inproc() const { return inproc_; }
credentials_type() const290   const std::string& credentials_type() const { return credentials_type_; }
callback_server() const291   bool callback_server() const { return callback_server_; }
292 
293   std::string AsString() const;
294 
Name(const::testing::TestParamInfo<TestScenario> & info)295   static std::string Name(const ::testing::TestParamInfo<TestScenario>& info) {
296     return info.param.AsString();
297   }
298 
299  private:
300   bool use_interceptors_;
301   bool use_proxy_;
302   bool inproc_;
303   const std::string credentials_type_;
304   bool callback_server_;
305 };
306 
AsString() const307 std::string TestScenario::AsString() const {
308   std::string retval = use_interceptors_ ? "Interceptor" : "";
309   if (use_proxy_) retval += "Proxy";
310   if (inproc_) retval += "Inproc";
311   if (callback_server_) retval += "CallbackServer";
312   if (credentials_type_ == kInsecureCredentialsType) {
313     retval += "Insecure";
314   } else {
315     std::string creds_type = absl::AsciiStrToLower(credentials_type_);
316     if (!creds_type.empty()) creds_type[0] = absl::ascii_toupper(creds_type[0]);
317     retval += creds_type;
318   }
319   return retval;
320 }
321 
322 class End2endTest : public ::testing::TestWithParam<TestScenario> {
323  protected:
SetUpTestSuite()324   static void SetUpTestSuite() { grpc_init(); }
TearDownTestSuite()325   static void TearDownTestSuite() { grpc_shutdown(); }
End2endTest()326   End2endTest()
327       : is_server_started_(false),
328         kMaxMessageSize_(8192),
329         special_service_("special"),
330         first_picked_port_(0) {}
331 
TearDown()332   void TearDown() override {
333     if (is_server_started_) {
334       server_->Shutdown();
335       if (proxy_server_) proxy_server_->Shutdown();
336     }
337     if (first_picked_port_ > 0) {
338       grpc_recycle_unused_port(first_picked_port_);
339     }
340   }
341 
StartServer(const std::shared_ptr<AuthMetadataProcessor> & processor)342   void StartServer(const std::shared_ptr<AuthMetadataProcessor>& processor) {
343     int port = grpc_pick_unused_port_or_die();
344     first_picked_port_ = port;
345     server_address_ << "localhost:" << port;
346     // Setup server
347     BuildAndStartServer(processor);
348   }
349 
RestartServer(const std::shared_ptr<AuthMetadataProcessor> & processor)350   void RestartServer(const std::shared_ptr<AuthMetadataProcessor>& processor) {
351     if (is_server_started_) {
352       server_->Shutdown();
353       BuildAndStartServer(processor);
354     }
355   }
356 
BuildAndStartServer(const std::shared_ptr<AuthMetadataProcessor> & processor)357   void BuildAndStartServer(
358       const std::shared_ptr<AuthMetadataProcessor>& processor) {
359     ServerBuilder builder;
360     ConfigureServerBuilder(&builder);
361     auto server_creds = GetCredentialsProvider()->GetServerCredentials(
362         GetParam().credentials_type());
363     if (GetParam().credentials_type() != kInsecureCredentialsType) {
364       server_creds->SetAuthMetadataProcessor(processor);
365     }
366     if (GetParam().use_interceptors()) {
367       std::vector<
368           std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>
369           creators;
370       // Add 20 phony server interceptors
371       creators.reserve(20);
372       for (auto i = 0; i < 20; i++) {
373         creators.push_back(std::make_unique<PhonyInterceptorFactory>());
374       }
375       builder.experimental().SetInterceptorCreators(std::move(creators));
376     }
377     builder.AddListeningPort(server_address_.str(), server_creds);
378     if (!GetParam().callback_server()) {
379       builder.RegisterService(&service_);
380     } else {
381       builder.RegisterService(&callback_service_);
382     }
383     builder.RegisterService("foo.test.youtube.com", &special_service_);
384     builder.RegisterService(&dup_pkg_service_);
385 
386     builder.SetSyncServerOption(ServerBuilder::SyncServerOption::NUM_CQS, 4);
387     builder.SetSyncServerOption(
388         ServerBuilder::SyncServerOption::CQ_TIMEOUT_MSEC, 10);
389 
390     server_ = builder.BuildAndStart();
391     is_server_started_ = true;
392   }
393 
ConfigureServerBuilder(ServerBuilder * builder)394   virtual void ConfigureServerBuilder(ServerBuilder* builder) {
395     builder->SetMaxMessageSize(
396         kMaxMessageSize_);  // For testing max message size.
397   }
398 
ResetChannel(std::vector<std::unique_ptr<experimental::ClientInterceptorFactoryInterface>> interceptor_creators={})399   void ResetChannel(
400       std::vector<
401           std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>
402           interceptor_creators = {}) {
403     if (!is_server_started_) {
404       StartServer(std::shared_ptr<AuthMetadataProcessor>());
405     }
406     EXPECT_TRUE(is_server_started_);
407     ChannelArguments args;
408     auto channel_creds = GetCredentialsProvider()->GetChannelCredentials(
409         GetParam().credentials_type(), &args);
410     if (!user_agent_prefix_.empty()) {
411       args.SetUserAgentPrefix(user_agent_prefix_);
412     }
413     args.SetString(GRPC_ARG_SECONDARY_USER_AGENT_STRING, "end2end_test");
414 
415     if (!GetParam().inproc()) {
416       if (!GetParam().use_interceptors()) {
417         channel_ = grpc::CreateCustomChannel(server_address_.str(),
418                                              channel_creds, args);
419       } else {
420         channel_ = CreateCustomChannelWithInterceptors(
421             server_address_.str(), channel_creds, args,
422             interceptor_creators.empty() ? CreatePhonyClientInterceptors()
423                                          : std::move(interceptor_creators));
424       }
425     } else {
426       if (!GetParam().use_interceptors()) {
427         channel_ = server_->InProcessChannel(args);
428       } else {
429         channel_ = server_->experimental().InProcessChannelWithInterceptors(
430             args, interceptor_creators.empty()
431                       ? CreatePhonyClientInterceptors()
432                       : std::move(interceptor_creators));
433       }
434     }
435   }
436 
ResetStub(std::vector<std::unique_ptr<experimental::ClientInterceptorFactoryInterface>> interceptor_creators={})437   void ResetStub(
438       std::vector<
439           std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>
440           interceptor_creators = {}) {
441     ResetChannel(std::move(interceptor_creators));
442     if (GetParam().use_proxy()) {
443       proxy_service_ = std::make_unique<Proxy>(channel_);
444       int port = grpc_pick_unused_port_or_die();
445       std::ostringstream proxyaddr;
446       proxyaddr << "localhost:" << port;
447       ServerBuilder builder;
448       builder.AddListeningPort(proxyaddr.str(), InsecureServerCredentials());
449       builder.RegisterService(proxy_service_.get());
450 
451       builder.SetSyncServerOption(ServerBuilder::SyncServerOption::NUM_CQS, 4);
452       builder.SetSyncServerOption(
453           ServerBuilder::SyncServerOption::CQ_TIMEOUT_MSEC, 10);
454 
455       proxy_server_ = builder.BuildAndStart();
456 
457       channel_ =
458           grpc::CreateChannel(proxyaddr.str(), InsecureChannelCredentials());
459     }
460 
461     stub_ = grpc::testing::EchoTestService::NewStub(channel_);
462     PhonyInterceptor::Reset();
463   }
464 
465   bool is_server_started_;
466   std::shared_ptr<Channel> channel_;
467   std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
468   std::unique_ptr<Server> server_;
469   std::unique_ptr<Server> proxy_server_;
470   std::unique_ptr<Proxy> proxy_service_;
471   std::ostringstream server_address_;
472   const int kMaxMessageSize_;
473   TestServiceImpl service_;
474   CallbackTestServiceImpl callback_service_;
475   TestServiceImpl special_service_;
476   TestServiceImplDupPkg dup_pkg_service_;
477   std::string user_agent_prefix_;
478   int first_picked_port_;
479 };
480 
SendRpc(grpc::testing::EchoTestService::Stub * stub,int num_rpcs,bool with_binary_metadata)481 void SendRpc(grpc::testing::EchoTestService::Stub* stub, int num_rpcs,
482              bool with_binary_metadata) {
483   EchoRequest request;
484   EchoResponse response;
485   request.set_message("Hello hello hello hello");
486 
487   for (int i = 0; i < num_rpcs; ++i) {
488     ClientContext context;
489     if (with_binary_metadata) {
490       char bytes[8] = {'\0', '\1', '\2', '\3',
491                        '\4', '\5', '\6', static_cast<char>(i)};
492       context.AddMetadata("custom-bin", std::string(bytes, 8));
493     }
494     context.set_compression_algorithm(GRPC_COMPRESS_GZIP);
495     Status s = stub->Echo(&context, request, &response);
496     EXPECT_EQ(response.message(), request.message());
497     EXPECT_TRUE(s.ok());
498   }
499 }
500 
501 // This class is for testing scenarios where RPCs are cancelled on the server
502 // by calling ServerContext::TryCancel()
503 class End2endServerTryCancelTest : public End2endTest {
504  protected:
505   // Helper for testing client-streaming RPCs which are cancelled on the server.
506   // Depending on the value of server_try_cancel parameter, this will test one
507   // of the following three scenarios:
508   //   CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before reading
509   //   any messages from the client
510   //
511   //   CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while reading
512   //   messages from the client
513   //
514   //   CANCEL_AFTER PROCESSING: Rpc is cancelled by server after reading all
515   //   the messages from the client
516   //
517   // NOTE: Do not call this function with server_try_cancel == DO_NOT_CANCEL.
TestRequestStreamServerCancel(ServerTryCancelRequestPhase server_try_cancel,int num_msgs_to_send)518   void TestRequestStreamServerCancel(
519       ServerTryCancelRequestPhase server_try_cancel, int num_msgs_to_send) {
520     RestartServer(std::shared_ptr<AuthMetadataProcessor>());
521     ResetStub();
522     EchoRequest request;
523     EchoResponse response;
524     ClientContext context;
525 
526     // Send server_try_cancel value in the client metadata
527     context.AddMetadata(kServerTryCancelRequest,
528                         std::to_string(server_try_cancel));
529 
530     auto stream = stub_->RequestStream(&context, &response);
531 
532     int num_msgs_sent = 0;
533     while (num_msgs_sent < num_msgs_to_send) {
534       request.set_message("hello");
535       if (!stream->Write(request)) {
536         break;
537       }
538       num_msgs_sent++;
539     }
540     LOG(INFO) << "Sent " << num_msgs_sent << " messages";
541 
542     stream->WritesDone();
543     Status s = stream->Finish();
544 
545     // At this point, we know for sure that RPC was cancelled by the server
546     // since we passed server_try_cancel value in the metadata. Depending on the
547     // value of server_try_cancel, the RPC might have been cancelled by the
548     // server at different stages. The following validates our expectations of
549     // number of messages sent in various cancellation scenarios:
550 
551     switch (server_try_cancel) {
552       case CANCEL_BEFORE_PROCESSING:
553       case CANCEL_DURING_PROCESSING:
554         // If the RPC is cancelled by server before / during messages from the
555         // client, it means that the client most likely did not get a chance to
556         // send all the messages it wanted to send. i.e num_msgs_sent <=
557         // num_msgs_to_send
558         EXPECT_LE(num_msgs_sent, num_msgs_to_send);
559         break;
560 
561       case CANCEL_AFTER_PROCESSING:
562         // If the RPC was cancelled after all messages were read by the server,
563         // the client did get a chance to send all its messages
564         EXPECT_EQ(num_msgs_sent, num_msgs_to_send);
565         break;
566 
567       default:
568         LOG(ERROR) << "Invalid server_try_cancel value: " << server_try_cancel;
569         EXPECT_TRUE(server_try_cancel > DO_NOT_CANCEL &&
570                     server_try_cancel <= CANCEL_AFTER_PROCESSING);
571         break;
572     }
573 
574     EXPECT_FALSE(s.ok());
575     EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
576     // Make sure that the server interceptors were notified
577     if (GetParam().use_interceptors()) {
578       EXPECT_EQ(20, PhonyInterceptor::GetNumTimesCancel());
579     }
580   }
581 
582   // Helper for testing server-streaming RPCs which are cancelled on the server.
583   // Depending on the value of server_try_cancel parameter, this will test one
584   // of the following three scenarios:
585   //   CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before writing
586   //   any messages to the client
587   //
588   //   CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while writing
589   //   messages to the client
590   //
591   //   CANCEL_AFTER PROCESSING: Rpc is cancelled by server after writing all
592   //   the messages to the client
593   //
594   // NOTE: Do not call this function with server_try_cancel == DO_NOT_CANCEL.
TestResponseStreamServerCancel(ServerTryCancelRequestPhase server_try_cancel)595   void TestResponseStreamServerCancel(
596       ServerTryCancelRequestPhase server_try_cancel) {
597     RestartServer(std::shared_ptr<AuthMetadataProcessor>());
598     ResetStub();
599     EchoRequest request;
600     EchoResponse response;
601     ClientContext context;
602 
603     // Send server_try_cancel in the client metadata
604     context.AddMetadata(kServerTryCancelRequest,
605                         std::to_string(server_try_cancel));
606 
607     request.set_message("hello");
608     auto stream = stub_->ResponseStream(&context, request);
609 
610     int num_msgs_read = 0;
611     while (num_msgs_read < kServerDefaultResponseStreamsToSend) {
612       if (!stream->Read(&response)) {
613         break;
614       }
615       EXPECT_EQ(response.message(),
616                 request.message() + std::to_string(num_msgs_read));
617       num_msgs_read++;
618     }
619     LOG(INFO) << "Read " << num_msgs_read << " messages";
620 
621     Status s = stream->Finish();
622 
623     // Depending on the value of server_try_cancel, the RPC might have been
624     // cancelled by the server at different stages. The following validates our
625     // expectations of number of messages read in various cancellation
626     // scenarios:
627     switch (server_try_cancel) {
628       case CANCEL_BEFORE_PROCESSING:
629         // Server cancelled before sending any messages. Which means the client
630         // wouldn't have read any
631         EXPECT_EQ(num_msgs_read, 0);
632         break;
633 
634       case CANCEL_DURING_PROCESSING:
635         // Server cancelled while writing messages. Client must have read less
636         // than or equal to the expected number of messages
637         EXPECT_LE(num_msgs_read, kServerDefaultResponseStreamsToSend);
638         break;
639 
640       case CANCEL_AFTER_PROCESSING:
641         // Even though the Server cancelled after writing all messages, the RPC
642         // may be cancelled before the Client got a chance to read all the
643         // messages.
644         EXPECT_LE(num_msgs_read, kServerDefaultResponseStreamsToSend);
645         break;
646 
647       default: {
648         LOG(ERROR) << "Invalid server_try_cancel value: " << server_try_cancel;
649         EXPECT_TRUE(server_try_cancel > DO_NOT_CANCEL &&
650                     server_try_cancel <= CANCEL_AFTER_PROCESSING);
651         break;
652       }
653     }
654 
655     EXPECT_FALSE(s.ok());
656     // Make sure that the server interceptors were notified
657     if (GetParam().use_interceptors()) {
658       EXPECT_EQ(20, PhonyInterceptor::GetNumTimesCancel());
659     }
660   }
661 
662   // Helper for testing bidirectional-streaming RPCs which are cancelled on the
663   // server. Depending on the value of server_try_cancel parameter, this will
664   // test one of the following three scenarios:
665   //   CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before reading/
666   //   writing any messages from/to the client
667   //
668   //   CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while reading/
669   //   writing messages from/to the client
670   //
671   //   CANCEL_AFTER PROCESSING: Rpc is cancelled by server after reading/writing
672   //   all the messages from/to the client
673   //
674   // NOTE: Do not call this function with server_try_cancel == DO_NOT_CANCEL.
TestBidiStreamServerCancel(ServerTryCancelRequestPhase server_try_cancel,int num_messages)675   void TestBidiStreamServerCancel(ServerTryCancelRequestPhase server_try_cancel,
676                                   int num_messages) {
677     RestartServer(std::shared_ptr<AuthMetadataProcessor>());
678     ResetStub();
679     EchoRequest request;
680     EchoResponse response;
681     ClientContext context;
682 
683     // Send server_try_cancel in the client metadata
684     context.AddMetadata(kServerTryCancelRequest,
685                         std::to_string(server_try_cancel));
686 
687     auto stream = stub_->BidiStream(&context);
688 
689     int num_msgs_read = 0;
690     int num_msgs_sent = 0;
691     while (num_msgs_sent < num_messages) {
692       request.set_message("hello " + std::to_string(num_msgs_sent));
693       if (!stream->Write(request)) {
694         break;
695       }
696       num_msgs_sent++;
697 
698       if (!stream->Read(&response)) {
699         break;
700       }
701       num_msgs_read++;
702 
703       EXPECT_EQ(response.message(), request.message());
704     }
705     LOG(INFO) << "Sent " << num_msgs_sent << " messages";
706     LOG(INFO) << "Read " << num_msgs_read << " messages";
707 
708     stream->WritesDone();
709     Status s = stream->Finish();
710 
711     // Depending on the value of server_try_cancel, the RPC might have been
712     // cancelled by the server at different stages. The following validates our
713     // expectations of number of messages read in various cancellation
714     // scenarios:
715     switch (server_try_cancel) {
716       case CANCEL_BEFORE_PROCESSING:
717         EXPECT_EQ(num_msgs_read, 0);
718         break;
719 
720       case CANCEL_DURING_PROCESSING:
721         EXPECT_LE(num_msgs_sent, num_messages);
722         EXPECT_LE(num_msgs_read, num_msgs_sent);
723         break;
724 
725       case CANCEL_AFTER_PROCESSING:
726         EXPECT_EQ(num_msgs_sent, num_messages);
727 
728         // The Server cancelled after reading the last message and after writing
729         // the message to the client. However, the RPC cancellation might have
730         // taken effect before the client actually read the response.
731         EXPECT_LE(num_msgs_read, num_msgs_sent);
732         break;
733 
734       default:
735         LOG(ERROR) << "Invalid server_try_cancel value: " << server_try_cancel;
736         EXPECT_TRUE(server_try_cancel > DO_NOT_CANCEL &&
737                     server_try_cancel <= CANCEL_AFTER_PROCESSING);
738         break;
739     }
740 
741     EXPECT_FALSE(s.ok());
742     EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
743     // Make sure that the server interceptors were notified
744     if (GetParam().use_interceptors()) {
745       EXPECT_EQ(20, PhonyInterceptor::GetNumTimesCancel());
746     }
747   }
748 };
749 
TEST_P(End2endServerTryCancelTest,RequestEchoServerCancel)750 TEST_P(End2endServerTryCancelTest, RequestEchoServerCancel) {
751   ResetStub();
752   EchoRequest request;
753   EchoResponse response;
754   ClientContext context;
755 
756   context.AddMetadata(kServerTryCancelRequest,
757                       std::to_string(CANCEL_BEFORE_PROCESSING));
758   Status s = stub_->Echo(&context, request, &response);
759   EXPECT_FALSE(s.ok());
760   EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
761 }
762 
763 // Server to cancel before doing reading the request
TEST_P(End2endServerTryCancelTest,RequestStreamServerCancelBeforeReads)764 TEST_P(End2endServerTryCancelTest, RequestStreamServerCancelBeforeReads) {
765   TestRequestStreamServerCancel(CANCEL_BEFORE_PROCESSING, 1);
766 }
767 
768 // Server to cancel while reading a request from the stream in parallel
TEST_P(End2endServerTryCancelTest,RequestStreamServerCancelDuringRead)769 TEST_P(End2endServerTryCancelTest, RequestStreamServerCancelDuringRead) {
770   TestRequestStreamServerCancel(CANCEL_DURING_PROCESSING, 10);
771 }
772 
773 // Server to cancel after reading all the requests but before returning to the
774 // client
TEST_P(End2endServerTryCancelTest,RequestStreamServerCancelAfterReads)775 TEST_P(End2endServerTryCancelTest, RequestStreamServerCancelAfterReads) {
776   TestRequestStreamServerCancel(CANCEL_AFTER_PROCESSING, 4);
777 }
778 
779 // Server to cancel before sending any response messages
TEST_P(End2endServerTryCancelTest,ResponseStreamServerCancelBefore)780 TEST_P(End2endServerTryCancelTest, ResponseStreamServerCancelBefore) {
781   TestResponseStreamServerCancel(CANCEL_BEFORE_PROCESSING);
782 }
783 
784 // Server to cancel while writing a response to the stream in parallel
TEST_P(End2endServerTryCancelTest,ResponseStreamServerCancelDuring)785 TEST_P(End2endServerTryCancelTest, ResponseStreamServerCancelDuring) {
786   TestResponseStreamServerCancel(CANCEL_DURING_PROCESSING);
787 }
788 
789 // Server to cancel after writing all the responses to the stream but before
790 // returning to the client
TEST_P(End2endServerTryCancelTest,ResponseStreamServerCancelAfter)791 TEST_P(End2endServerTryCancelTest, ResponseStreamServerCancelAfter) {
792   TestResponseStreamServerCancel(CANCEL_AFTER_PROCESSING);
793 }
794 
795 // Server to cancel before reading/writing any requests/responses on the stream
TEST_P(End2endServerTryCancelTest,BidiStreamServerCancelBefore)796 TEST_P(End2endServerTryCancelTest, BidiStreamServerCancelBefore) {
797   TestBidiStreamServerCancel(CANCEL_BEFORE_PROCESSING, 2);
798 }
799 
800 // Server to cancel while reading/writing requests/responses on the stream in
801 // parallel
TEST_P(End2endServerTryCancelTest,BidiStreamServerCancelDuring)802 TEST_P(End2endServerTryCancelTest, BidiStreamServerCancelDuring) {
803   TestBidiStreamServerCancel(CANCEL_DURING_PROCESSING, 10);
804 }
805 
806 // Server to cancel after reading/writing all requests/responses on the stream
807 // but before returning to the client
TEST_P(End2endServerTryCancelTest,BidiStreamServerCancelAfter)808 TEST_P(End2endServerTryCancelTest, BidiStreamServerCancelAfter) {
809   TestBidiStreamServerCancel(CANCEL_AFTER_PROCESSING, 5);
810 }
811 
TEST_P(End2endTest,SimpleRpcWithCustomUserAgentPrefix)812 TEST_P(End2endTest, SimpleRpcWithCustomUserAgentPrefix) {
813   // User-Agent is an HTTP header for HTTP transports only
814   if (GetParam().inproc()) {
815     return;
816   }
817   user_agent_prefix_ = "custom_prefix";
818   ResetStub();
819   EchoRequest request;
820   EchoResponse response;
821   request.set_message("Hello hello hello hello");
822   request.mutable_param()->set_echo_metadata(true);
823 
824   ClientContext context;
825   Status s = stub_->Echo(&context, request, &response);
826   EXPECT_EQ(response.message(), request.message());
827   EXPECT_TRUE(s.ok());
828   const auto& trailing_metadata = context.GetServerTrailingMetadata();
829   auto iter = trailing_metadata.find("user-agent");
830   EXPECT_TRUE(iter != trailing_metadata.end());
831   std::string expected_prefix = user_agent_prefix_ + " grpc-c++/";
832   EXPECT_TRUE(iter->second.starts_with(expected_prefix)) << iter->second;
833 }
834 
TEST_P(End2endTest,MultipleRpcsWithVariedBinaryMetadataValue)835 TEST_P(End2endTest, MultipleRpcsWithVariedBinaryMetadataValue) {
836   ResetStub();
837   std::vector<std::thread> threads;
838   threads.reserve(10);
839   for (int i = 0; i < 10; ++i) {
840     threads.emplace_back(SendRpc, stub_.get(), 10, true);
841   }
842   for (int i = 0; i < 10; ++i) {
843     threads[i].join();
844   }
845 }
846 
TEST_P(End2endTest,MultipleRpcs)847 TEST_P(End2endTest, MultipleRpcs) {
848   ResetStub();
849   std::vector<std::thread> threads;
850   threads.reserve(10);
851   for (int i = 0; i < 10; ++i) {
852     threads.emplace_back(SendRpc, stub_.get(), 10, false);
853   }
854   for (int i = 0; i < 10; ++i) {
855     threads[i].join();
856   }
857 }
858 
TEST_P(End2endTest,ManyStubs)859 TEST_P(End2endTest, ManyStubs) {
860   ResetStub();
861   ChannelTestPeer peer(channel_.get());
862   int registered_calls_pre = peer.registered_calls();
863   for (int i = 0; i < 1000; ++i) {
864     grpc::testing::EchoTestService::NewStub(channel_);
865   }
866   EXPECT_EQ(peer.registered_calls(), registered_calls_pre);
867 }
868 
TEST_P(End2endTest,EmptyBinaryMetadata)869 TEST_P(End2endTest, EmptyBinaryMetadata) {
870   ResetStub();
871   EchoRequest request;
872   EchoResponse response;
873   request.set_message("Hello hello hello hello");
874   ClientContext context;
875   context.AddMetadata("custom-bin", "");
876   Status s = stub_->Echo(&context, request, &response);
877   EXPECT_EQ(response.message(), request.message());
878   EXPECT_TRUE(s.ok());
879 }
880 
TEST_P(End2endTest,AuthoritySeenOnServerSide)881 TEST_P(End2endTest, AuthoritySeenOnServerSide) {
882   ResetStub();
883   EchoRequest request;
884   request.mutable_param()->set_echo_host_from_authority_header(true);
885   EchoResponse response;
886   request.set_message("Live long and prosper.");
887   ClientContext context;
888   Status s = stub_->Echo(&context, request, &response);
889   EXPECT_EQ(response.message(), request.message());
890   if (GetParam().credentials_type() == kTlsCredentialsType) {
891     // SSL creds overrides the authority.
892     EXPECT_EQ("foo.test.google.fr", response.param().host());
893   } else if (GetParam().inproc()) {
894     EXPECT_EQ("inproc", response.param().host());
895   } else {
896     EXPECT_EQ(server_address_.str(), response.param().host());
897   }
898   EXPECT_TRUE(s.ok());
899 }
900 
TEST_P(End2endTest,ReconnectChannel)901 TEST_P(End2endTest, ReconnectChannel) {
902   if (GetParam().inproc()) {
903     return;
904   }
905   int poller_slowdown_factor = 1;
906   // It needs 2 pollset_works to reconnect the channel with polling engine
907   // "poll"
908 #ifdef GRPC_POSIX_SOCKET_EV
909   if (grpc_core::ConfigVars::Get().PollStrategy() == "poll") {
910     poller_slowdown_factor = 2;
911   }
912 #endif  // GRPC_POSIX_SOCKET_EV
913   ResetStub();
914   SendRpc(stub_.get(), 1, false);
915   RestartServer(std::shared_ptr<AuthMetadataProcessor>());
916   // It needs more than GRPC_CLIENT_CHANNEL_BACKUP_POLL_INTERVAL_MS time to
917   // reconnect the channel. Make it a factor of 5x
918   gpr_sleep_until(
919       gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
920                    gpr_time_from_millis(kClientChannelBackupPollIntervalMs * 5 *
921                                             poller_slowdown_factor *
922                                             grpc_test_slowdown_factor(),
923                                         GPR_TIMESPAN)));
924   SendRpc(stub_.get(), 1, false);
925 }
926 
TEST_P(End2endTest,RequestStreamOneRequest)927 TEST_P(End2endTest, RequestStreamOneRequest) {
928   ResetStub();
929   EchoRequest request;
930   EchoResponse response;
931   ClientContext context;
932 
933   auto stream = stub_->RequestStream(&context, &response);
934   request.set_message("hello");
935   EXPECT_TRUE(stream->Write(request));
936   stream->WritesDone();
937   Status s = stream->Finish();
938   EXPECT_EQ(response.message(), request.message());
939   EXPECT_TRUE(s.ok());
940   EXPECT_TRUE(context.debug_error_string().empty());
941 }
942 
TEST_P(End2endTest,RequestStreamOneRequestWithCoalescingApi)943 TEST_P(End2endTest, RequestStreamOneRequestWithCoalescingApi) {
944   ResetStub();
945   EchoRequest request;
946   EchoResponse response;
947   ClientContext context;
948 
949   context.set_initial_metadata_corked(true);
950   auto stream = stub_->RequestStream(&context, &response);
951   request.set_message("hello");
952   stream->WriteLast(request, WriteOptions());
953   Status s = stream->Finish();
954   EXPECT_EQ(response.message(), request.message());
955   EXPECT_TRUE(s.ok());
956 }
957 
TEST_P(End2endTest,RequestStreamTwoRequests)958 TEST_P(End2endTest, RequestStreamTwoRequests) {
959   ResetStub();
960   EchoRequest request;
961   EchoResponse response;
962   ClientContext context;
963 
964   auto stream = stub_->RequestStream(&context, &response);
965   request.set_message("hello");
966   EXPECT_TRUE(stream->Write(request));
967   EXPECT_TRUE(stream->Write(request));
968   stream->WritesDone();
969   Status s = stream->Finish();
970   EXPECT_EQ(response.message(), "hellohello");
971   EXPECT_TRUE(s.ok());
972 }
973 
TEST_P(End2endTest,RequestStreamTwoRequestsWithWriteThrough)974 TEST_P(End2endTest, RequestStreamTwoRequestsWithWriteThrough) {
975   ResetStub();
976   EchoRequest request;
977   EchoResponse response;
978   ClientContext context;
979 
980   auto stream = stub_->RequestStream(&context, &response);
981   request.set_message("hello");
982   EXPECT_TRUE(stream->Write(request, WriteOptions().set_write_through()));
983   EXPECT_TRUE(stream->Write(request, WriteOptions().set_write_through()));
984   stream->WritesDone();
985   Status s = stream->Finish();
986   EXPECT_EQ(response.message(), "hellohello");
987   EXPECT_TRUE(s.ok());
988 }
989 
TEST_P(End2endTest,RequestStreamTwoRequestsWithCoalescingApi)990 TEST_P(End2endTest, RequestStreamTwoRequestsWithCoalescingApi) {
991   ResetStub();
992   EchoRequest request;
993   EchoResponse response;
994   ClientContext context;
995 
996   context.set_initial_metadata_corked(true);
997   auto stream = stub_->RequestStream(&context, &response);
998   request.set_message("hello");
999   EXPECT_TRUE(stream->Write(request));
1000   stream->WriteLast(request, WriteOptions());
1001   Status s = stream->Finish();
1002   EXPECT_EQ(response.message(), "hellohello");
1003   EXPECT_TRUE(s.ok());
1004 }
1005 
TEST_P(End2endTest,ResponseStream)1006 TEST_P(End2endTest, ResponseStream) {
1007   ResetStub();
1008   EchoRequest request;
1009   EchoResponse response;
1010   ClientContext context;
1011   request.set_message("hello");
1012 
1013   auto stream = stub_->ResponseStream(&context, request);
1014   for (int i = 0; i < kServerDefaultResponseStreamsToSend; ++i) {
1015     EXPECT_TRUE(stream->Read(&response));
1016     EXPECT_EQ(response.message(), request.message() + std::to_string(i));
1017   }
1018   EXPECT_FALSE(stream->Read(&response));
1019 
1020   Status s = stream->Finish();
1021   EXPECT_TRUE(s.ok());
1022 }
1023 
TEST_P(End2endTest,ResponseStreamWithCoalescingApi)1024 TEST_P(End2endTest, ResponseStreamWithCoalescingApi) {
1025   ResetStub();
1026   EchoRequest request;
1027   EchoResponse response;
1028   ClientContext context;
1029   request.set_message("hello");
1030   context.AddMetadata(kServerUseCoalescingApi, "1");
1031 
1032   auto stream = stub_->ResponseStream(&context, request);
1033   for (int i = 0; i < kServerDefaultResponseStreamsToSend; ++i) {
1034     EXPECT_TRUE(stream->Read(&response));
1035     EXPECT_EQ(response.message(), request.message() + std::to_string(i));
1036   }
1037   EXPECT_FALSE(stream->Read(&response));
1038 
1039   Status s = stream->Finish();
1040   EXPECT_TRUE(s.ok());
1041 }
1042 
1043 // This was added to prevent regression from issue:
1044 // https://github.com/grpc/grpc/issues/11546
TEST_P(End2endTest,ResponseStreamWithEverythingCoalesced)1045 TEST_P(End2endTest, ResponseStreamWithEverythingCoalesced) {
1046   ResetStub();
1047   EchoRequest request;
1048   EchoResponse response;
1049   ClientContext context;
1050   request.set_message("hello");
1051   context.AddMetadata(kServerUseCoalescingApi, "1");
1052   // We will only send one message, forcing everything (init metadata, message,
1053   // trailing) to be coalesced together.
1054   context.AddMetadata(kServerResponseStreamsToSend, "1");
1055 
1056   auto stream = stub_->ResponseStream(&context, request);
1057   EXPECT_TRUE(stream->Read(&response));
1058   EXPECT_EQ(response.message(), request.message() + "0");
1059 
1060   EXPECT_FALSE(stream->Read(&response));
1061 
1062   Status s = stream->Finish();
1063   EXPECT_TRUE(s.ok());
1064 }
1065 
TEST_P(End2endTest,BidiStream)1066 TEST_P(End2endTest, BidiStream) {
1067   ResetStub();
1068   EchoRequest request;
1069   EchoResponse response;
1070   ClientContext context;
1071   std::string msg("hello");
1072 
1073   auto stream = stub_->BidiStream(&context);
1074 
1075   for (int i = 0; i < kServerDefaultResponseStreamsToSend; ++i) {
1076     request.set_message(msg + std::to_string(i));
1077     EXPECT_TRUE(stream->Write(request));
1078     EXPECT_TRUE(stream->Read(&response));
1079     EXPECT_EQ(response.message(), request.message());
1080   }
1081 
1082   stream->WritesDone();
1083   EXPECT_FALSE(stream->Read(&response));
1084   EXPECT_FALSE(stream->Read(&response));
1085 
1086   Status s = stream->Finish();
1087   EXPECT_TRUE(s.ok());
1088 }
1089 
TEST_P(End2endTest,BidiStreamWithCoalescingApi)1090 TEST_P(End2endTest, BidiStreamWithCoalescingApi) {
1091   ResetStub();
1092   EchoRequest request;
1093   EchoResponse response;
1094   ClientContext context;
1095   context.AddMetadata(kServerFinishAfterNReads, "3");
1096   context.set_initial_metadata_corked(true);
1097   std::string msg("hello");
1098 
1099   auto stream = stub_->BidiStream(&context);
1100 
1101   request.set_message(msg + "0");
1102   EXPECT_TRUE(stream->Write(request));
1103   EXPECT_TRUE(stream->Read(&response));
1104   EXPECT_EQ(response.message(), request.message());
1105 
1106   request.set_message(msg + "1");
1107   EXPECT_TRUE(stream->Write(request));
1108   EXPECT_TRUE(stream->Read(&response));
1109   EXPECT_EQ(response.message(), request.message());
1110 
1111   request.set_message(msg + "2");
1112   stream->WriteLast(request, WriteOptions());
1113   EXPECT_TRUE(stream->Read(&response));
1114   EXPECT_EQ(response.message(), request.message());
1115 
1116   EXPECT_FALSE(stream->Read(&response));
1117   EXPECT_FALSE(stream->Read(&response));
1118 
1119   Status s = stream->Finish();
1120   EXPECT_TRUE(s.ok());
1121 }
1122 
1123 // This was added to prevent regression from issue:
1124 // https://github.com/grpc/grpc/issues/11546
TEST_P(End2endTest,BidiStreamWithEverythingCoalesced)1125 TEST_P(End2endTest, BidiStreamWithEverythingCoalesced) {
1126   ResetStub();
1127   EchoRequest request;
1128   EchoResponse response;
1129   ClientContext context;
1130   context.AddMetadata(kServerFinishAfterNReads, "1");
1131   context.set_initial_metadata_corked(true);
1132   std::string msg("hello");
1133 
1134   auto stream = stub_->BidiStream(&context);
1135 
1136   request.set_message(msg + "0");
1137   stream->WriteLast(request, WriteOptions());
1138   EXPECT_TRUE(stream->Read(&response));
1139   EXPECT_EQ(response.message(), request.message());
1140 
1141   EXPECT_FALSE(stream->Read(&response));
1142   EXPECT_FALSE(stream->Read(&response));
1143 
1144   Status s = stream->Finish();
1145   EXPECT_TRUE(s.ok());
1146 }
1147 
1148 // Talk to the two services with the same name but different package names.
1149 // The two stubs are created on the same channel.
TEST_P(End2endTest,DiffPackageServices)1150 TEST_P(End2endTest, DiffPackageServices) {
1151   ResetStub();
1152   EchoRequest request;
1153   EchoResponse response;
1154   request.set_message("Hello");
1155 
1156   ClientContext context;
1157   Status s = stub_->Echo(&context, request, &response);
1158   EXPECT_EQ(response.message(), request.message());
1159   EXPECT_TRUE(s.ok());
1160 
1161   std::unique_ptr<grpc::testing::duplicate::EchoTestService::Stub> dup_pkg_stub(
1162       grpc::testing::duplicate::EchoTestService::NewStub(channel_));
1163   ClientContext context2;
1164   s = dup_pkg_stub->Echo(&context2, request, &response);
1165   EXPECT_EQ("no package", response.message());
1166   EXPECT_TRUE(s.ok());
1167 }
1168 
1169 template <class ServiceType>
CancelRpc(ClientContext * context,int delay_us,ServiceType * service)1170 void CancelRpc(ClientContext* context, int delay_us, ServiceType* service) {
1171   gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
1172                                gpr_time_from_micros(delay_us, GPR_TIMESPAN)));
1173   while (!service->signal_client()) {
1174   }
1175   context->TryCancel();
1176 }
1177 
TEST_P(End2endTest,CancelRpcBeforeStart)1178 TEST_P(End2endTest, CancelRpcBeforeStart) {
1179   ResetStub();
1180   EchoRequest request;
1181   EchoResponse response;
1182   ClientContext context;
1183   request.set_message("hello");
1184   context.TryCancel();
1185   Status s = stub_->Echo(&context, request, &response);
1186   EXPECT_EQ("", response.message());
1187   EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
1188   if (GetParam().use_interceptors()) {
1189     EXPECT_EQ(20, PhonyInterceptor::GetNumTimesCancel());
1190   }
1191 }
1192 
TEST_P(End2endTest,CancelRpcAfterStart)1193 TEST_P(End2endTest, CancelRpcAfterStart) {
1194   for (int i = 0; i < 10; i++) {
1195     ResetStub();
1196     EchoRequest request;
1197     EchoResponse response;
1198     ClientContext context;
1199     request.set_message("hello");
1200     request.mutable_param()->set_server_notify_client_when_started(true);
1201     request.mutable_param()->set_skip_cancelled_check(true);
1202     Status s;
1203     std::thread echo_thread([this, &s, &context, &request, &response] {
1204       s = stub_->Echo(&context, request, &response);
1205     });
1206     if (!GetParam().callback_server()) {
1207       EXPECT_EQ(service_.ClientWaitUntilNRpcsStarted(1), 1);
1208     } else {
1209       EXPECT_EQ(callback_service_.ClientWaitUntilNRpcsStarted(1), 1);
1210     }
1211 
1212     context.TryCancel();
1213 
1214     if (!GetParam().callback_server()) {
1215       service_.SignalServerToContinue();
1216     } else {
1217       callback_service_.SignalServerToContinue();
1218     }
1219 
1220     echo_thread.join();
1221     // TODO(ctiller): improve test to not be flaky
1222     //
1223     // TryCancel is best effort, and it can happen that the cancellation is not
1224     // acted upon before the server wakes up, sends a response, and the client
1225     // reads that.
1226     // For this reason, we try a few times here to see the cancellation result.
1227     if (s.ok()) continue;
1228     EXPECT_EQ("", response.message());
1229     EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
1230     if (GetParam().use_interceptors()) {
1231       EXPECT_EQ(20, PhonyInterceptor::GetNumTimesCancel());
1232     }
1233     return;
1234   }
1235   GTEST_FAIL() << "Failed to get cancellation";
1236 }
1237 
1238 // Client cancels request stream after sending two messages
TEST_P(End2endTest,ClientCancelsRequestStream)1239 TEST_P(End2endTest, ClientCancelsRequestStream) {
1240   ResetStub();
1241   EchoRequest request;
1242   EchoResponse response;
1243   ClientContext context;
1244   request.set_message("hello");
1245 
1246   auto stream = stub_->RequestStream(&context, &response);
1247   EXPECT_TRUE(stream->Write(request));
1248   EXPECT_TRUE(stream->Write(request));
1249 
1250   context.TryCancel();
1251 
1252   Status s = stream->Finish();
1253   EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
1254 
1255   EXPECT_EQ(response.message(), "");
1256   if (GetParam().use_interceptors()) {
1257     EXPECT_EQ(20, PhonyInterceptor::GetNumTimesCancel());
1258   }
1259 }
1260 
1261 // Client cancels server stream after sending some messages
TEST_P(End2endTest,ClientCancelsResponseStream)1262 TEST_P(End2endTest, ClientCancelsResponseStream) {
1263   ResetStub();
1264   EchoRequest request;
1265   EchoResponse response;
1266   ClientContext context;
1267   request.set_message("hello");
1268 
1269   auto stream = stub_->ResponseStream(&context, request);
1270 
1271   EXPECT_TRUE(stream->Read(&response));
1272   EXPECT_EQ(response.message(), request.message() + "0");
1273   EXPECT_TRUE(stream->Read(&response));
1274   EXPECT_EQ(response.message(), request.message() + "1");
1275 
1276   context.TryCancel();
1277 
1278   // The cancellation races with responses, so there might be zero or
1279   // one responses pending, read till failure
1280 
1281   if (stream->Read(&response)) {
1282     EXPECT_EQ(response.message(), request.message() + "2");
1283     // Since we have cancelled, we expect the next attempt to read to fail
1284     EXPECT_FALSE(stream->Read(&response));
1285   }
1286 
1287   Status s = stream->Finish();
1288   // The final status could be either of CANCELLED or OK depending on
1289   // who won the race.
1290   EXPECT_GE(grpc::StatusCode::CANCELLED, s.error_code());
1291   if (GetParam().use_interceptors()) {
1292     EXPECT_EQ(20, PhonyInterceptor::GetNumTimesCancel());
1293   }
1294 }
1295 
1296 // Client cancels bidi stream after sending some messages
TEST_P(End2endTest,ClientCancelsBidi)1297 TEST_P(End2endTest, ClientCancelsBidi) {
1298   ResetStub();
1299   EchoRequest request;
1300   EchoResponse response;
1301   ClientContext context;
1302   std::string msg("hello");
1303 
1304   // Send server_try_cancel value in the client metadata
1305   context.AddMetadata(kClientTryCancelRequest, std::to_string(1));
1306 
1307   auto stream = stub_->BidiStream(&context);
1308 
1309   request.set_message(msg + "0");
1310   EXPECT_TRUE(stream->Write(request));
1311   EXPECT_TRUE(stream->Read(&response));
1312   EXPECT_EQ(response.message(), request.message());
1313 
1314   request.set_message(msg + "1");
1315   EXPECT_TRUE(stream->Write(request));
1316 
1317   context.TryCancel();
1318 
1319   // The cancellation races with responses, so there might be zero or
1320   // one responses pending, read till failure
1321 
1322   if (stream->Read(&response)) {
1323     EXPECT_EQ(response.message(), request.message());
1324     // Since we have cancelled, we expect the next attempt to read to fail
1325     EXPECT_FALSE(stream->Read(&response));
1326   }
1327 
1328   Status s = stream->Finish();
1329   EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
1330   if (GetParam().use_interceptors()) {
1331     EXPECT_EQ(20, PhonyInterceptor::GetNumTimesCancel());
1332   }
1333 }
1334 
TEST_P(End2endTest,RpcMaxMessageSize)1335 TEST_P(End2endTest, RpcMaxMessageSize) {
1336   ResetStub();
1337   EchoRequest request;
1338   EchoResponse response;
1339   request.set_message(string(kMaxMessageSize_ * 2, 'a'));
1340   request.mutable_param()->set_server_die(true);
1341 
1342   ClientContext context;
1343   Status s = stub_->Echo(&context, request, &response);
1344   EXPECT_FALSE(s.ok());
1345 }
1346 
ReaderThreadFunc(ClientReaderWriter<EchoRequest,EchoResponse> * stream,gpr_event * ev)1347 void ReaderThreadFunc(ClientReaderWriter<EchoRequest, EchoResponse>* stream,
1348                       gpr_event* ev) {
1349   EchoResponse resp;
1350   gpr_event_set(ev, reinterpret_cast<void*>(1));
1351   while (stream->Read(&resp)) {
1352     LOG(INFO) << "Read message";
1353   }
1354 }
1355 
1356 // Run a Read and a WritesDone simultaneously.
TEST_P(End2endTest,SimultaneousReadWritesDone)1357 TEST_P(End2endTest, SimultaneousReadWritesDone) {
1358   ResetStub();
1359   ClientContext context;
1360   gpr_event ev;
1361   gpr_event_init(&ev);
1362   auto stream = stub_->BidiStream(&context);
1363   std::thread reader_thread(ReaderThreadFunc, stream.get(), &ev);
1364   gpr_event_wait(&ev, gpr_inf_future(GPR_CLOCK_REALTIME));
1365   stream->WritesDone();
1366   reader_thread.join();
1367   Status s = stream->Finish();
1368   EXPECT_TRUE(s.ok());
1369 }
1370 
TEST_P(End2endTest,ChannelState)1371 TEST_P(End2endTest, ChannelState) {
1372   if (GetParam().inproc()) {
1373     return;
1374   }
1375 
1376   ResetStub();
1377   // Start IDLE
1378   EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(false));
1379 
1380   // Did not ask to connect, no state change.
1381   CompletionQueue cq;
1382   std::chrono::system_clock::time_point deadline =
1383       std::chrono::system_clock::now() + std::chrono::milliseconds(10);
1384   channel_->NotifyOnStateChange(GRPC_CHANNEL_IDLE, deadline, &cq, nullptr);
1385   void* tag;
1386   bool ok = true;
1387   cq.Next(&tag, &ok);
1388   EXPECT_FALSE(ok);
1389 
1390   EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(true));
1391   EXPECT_TRUE(channel_->WaitForStateChange(GRPC_CHANNEL_IDLE,
1392                                            gpr_inf_future(GPR_CLOCK_REALTIME)));
1393   auto state = channel_->GetState(false);
1394   EXPECT_TRUE(state == GRPC_CHANNEL_CONNECTING || state == GRPC_CHANNEL_READY);
1395 }
1396 
1397 // Takes 10s.
TEST_P(End2endTest,ChannelStateTimeout)1398 TEST_P(End2endTest, ChannelStateTimeout) {
1399   if ((GetParam().credentials_type() != kInsecureCredentialsType) ||
1400       GetParam().inproc()) {
1401     return;
1402   }
1403   int port = grpc_pick_unused_port_or_die();
1404   std::ostringstream server_address;
1405   server_address << "localhost:" << port;
1406   // Channel to non-existing server
1407   auto channel =
1408       grpc::CreateChannel(server_address.str(), InsecureChannelCredentials());
1409   // Start IDLE
1410   EXPECT_EQ(GRPC_CHANNEL_IDLE, channel->GetState(true));
1411 
1412   auto state = GRPC_CHANNEL_IDLE;
1413   for (int i = 0; i < 10; i++) {
1414     channel->WaitForStateChange(
1415         state, std::chrono::system_clock::now() + std::chrono::seconds(1));
1416     state = channel->GetState(false);
1417   }
1418 }
1419 
TEST_P(End2endTest,ChannelStateOnLameChannel)1420 TEST_P(End2endTest, ChannelStateOnLameChannel) {
1421   if ((GetParam().credentials_type() != kInsecureCredentialsType) ||
1422       GetParam().inproc()) {
1423     return;
1424   }
1425   // Channel using invalid target URI.  This creates a lame channel.
1426   auto channel = grpc::CreateChannel("dns:///", InsecureChannelCredentials());
1427   // Channel should immediately report TRANSIENT_FAILURE.
1428   EXPECT_EQ(GRPC_CHANNEL_TRANSIENT_FAILURE, channel->GetState(true));
1429   // And state will never change.
1430   auto state = GRPC_CHANNEL_TRANSIENT_FAILURE;
1431   for (int i = 0; i < 10; ++i) {
1432     channel->WaitForStateChange(
1433         state, std::chrono::system_clock::now() + std::chrono::seconds(1));
1434     state = channel->GetState(false);
1435   }
1436 }
1437 
1438 // Talking to a non-existing service.
TEST_P(End2endTest,NonExistingService)1439 TEST_P(End2endTest, NonExistingService) {
1440   ResetChannel();
1441   std::unique_ptr<grpc::testing::UnimplementedEchoService::Stub> stub;
1442   stub = grpc::testing::UnimplementedEchoService::NewStub(channel_);
1443 
1444   EchoRequest request;
1445   EchoResponse response;
1446   request.set_message("Hello");
1447 
1448   ClientContext context;
1449   Status s = stub->Unimplemented(&context, request, &response);
1450   EXPECT_EQ(StatusCode::UNIMPLEMENTED, s.error_code());
1451   EXPECT_EQ("", s.error_message());
1452 }
1453 
1454 // Ask the server to send back a serialized proto in trailer.
1455 // This is an example of setting error details.
TEST_P(End2endTest,BinaryTrailerTest)1456 TEST_P(End2endTest, BinaryTrailerTest) {
1457   ResetStub();
1458   EchoRequest request;
1459   EchoResponse response;
1460   ClientContext context;
1461 
1462   request.mutable_param()->set_echo_metadata(true);
1463   DebugInfo* info = request.mutable_param()->mutable_debug_info();
1464   info->add_stack_entries("stack_entry_1");
1465   info->add_stack_entries("stack_entry_2");
1466   info->add_stack_entries("stack_entry_3");
1467   info->set_detail("detailed debug info");
1468   std::string expected_string = info->SerializeAsString();
1469   request.set_message("Hello");
1470 
1471   Status s = stub_->Echo(&context, request, &response);
1472   EXPECT_FALSE(s.ok());
1473   auto trailers = context.GetServerTrailingMetadata();
1474   EXPECT_EQ(1u, trailers.count(kDebugInfoTrailerKey));
1475   auto iter = trailers.find(kDebugInfoTrailerKey);
1476   EXPECT_EQ(expected_string, iter->second);
1477   // Parse the returned trailer into a DebugInfo proto.
1478   DebugInfo returned_info;
1479   EXPECT_TRUE(returned_info.ParseFromString(ToString(iter->second)));
1480 }
1481 
TEST_P(End2endTest,ExpectErrorTest)1482 TEST_P(End2endTest, ExpectErrorTest) {
1483   ResetStub();
1484 
1485   std::vector<ErrorStatus> expected_status;
1486   expected_status.emplace_back();
1487   expected_status.back().set_code(13);  // INTERNAL
1488   // No Error message or details
1489 
1490   expected_status.emplace_back();
1491   expected_status.back().set_code(13);  // INTERNAL
1492   expected_status.back().set_error_message("text error message");
1493   expected_status.back().set_binary_error_details("text error details");
1494 
1495   expected_status.emplace_back();
1496   expected_status.back().set_code(13);  // INTERNAL
1497   expected_status.back().set_error_message("text error message");
1498   expected_status.back().set_binary_error_details(
1499       "\x0\x1\x2\x3\x4\x5\x6\x8\x9\xA\xB");
1500 
1501   for (auto iter = expected_status.begin(); iter != expected_status.end();
1502        ++iter) {
1503     EchoRequest request;
1504     EchoResponse response;
1505     ClientContext context;
1506     request.set_message("Hello");
1507     auto* error = request.mutable_param()->mutable_expected_error();
1508     error->set_code(iter->code());
1509     error->set_error_message(iter->error_message());
1510     error->set_binary_error_details(iter->binary_error_details());
1511 
1512     Status s = stub_->Echo(&context, request, &response);
1513     EXPECT_FALSE(s.ok());
1514     EXPECT_EQ(iter->code(), s.error_code());
1515     EXPECT_EQ(iter->error_message(), s.error_message());
1516     EXPECT_EQ(iter->binary_error_details(), s.error_details());
1517     EXPECT_TRUE(absl::StrContains(context.debug_error_string(), "created"));
1518 #ifndef NDEBUG
1519     // grpc_core::StatusIntProperty::kFileLine is for debug only
1520     EXPECT_TRUE(absl::StrContains(context.debug_error_string(), "file"));
1521     EXPECT_TRUE(absl::StrContains(context.debug_error_string(), "line"));
1522 #endif
1523     EXPECT_TRUE(absl::StrContains(context.debug_error_string(), "status"));
1524     EXPECT_TRUE(absl::StrContains(context.debug_error_string(), "13"));
1525   }
1526 }
1527 
1528 //////////////////////////////////////////////////////////////////////////
1529 // Test with and without a proxy.
1530 class ProxyEnd2endTest : public End2endTest {
1531  protected:
1532 };
1533 
TEST_P(ProxyEnd2endTest,SimpleRpc)1534 TEST_P(ProxyEnd2endTest, SimpleRpc) {
1535   ResetStub();
1536   SendRpc(stub_.get(), 1, false);
1537 }
1538 
TEST_P(ProxyEnd2endTest,SimpleRpcWithEmptyMessages)1539 TEST_P(ProxyEnd2endTest, SimpleRpcWithEmptyMessages) {
1540   ResetStub();
1541   EchoRequest request;
1542   EchoResponse response;
1543 
1544   ClientContext context;
1545   Status s = stub_->Echo(&context, request, &response);
1546   EXPECT_TRUE(s.ok());
1547 }
1548 
TEST_P(ProxyEnd2endTest,MultipleRpcs)1549 TEST_P(ProxyEnd2endTest, MultipleRpcs) {
1550   ResetStub();
1551   std::vector<std::thread> threads;
1552   threads.reserve(10);
1553   for (int i = 0; i < 10; ++i) {
1554     threads.emplace_back(SendRpc, stub_.get(), 10, false);
1555   }
1556   for (int i = 0; i < 10; ++i) {
1557     threads[i].join();
1558   }
1559 }
1560 
1561 // Set a 10us deadline and make sure proper error is returned.
TEST_P(ProxyEnd2endTest,RpcDeadlineExpires)1562 TEST_P(ProxyEnd2endTest, RpcDeadlineExpires) {
1563   ResetStub();
1564   EchoRequest request;
1565   EchoResponse response;
1566   request.set_message("Hello");
1567   request.mutable_param()->set_skip_cancelled_check(true);
1568   // Let server sleep for 4 secs first to guarantee expiry.
1569   // 4 secs might seem a bit extreme but the timer manager would have been just
1570   // initialized (when ResetStub() was called) and there are some warmup costs
1571   // i.e the timer thread many not have even started. There might also be other
1572   // delays in the timer manager thread (in acquiring locks, timer data
1573   // structure manipulations, starting backup timer threads) that add to the
1574   // delays. 4 secs might be still not enough in some cases but this
1575   // significantly reduces the test flakes
1576   request.mutable_param()->set_server_sleep_us(4 * 1000 * 1000);
1577 
1578   ClientContext context;
1579   std::chrono::system_clock::time_point deadline =
1580       std::chrono::system_clock::now() + std::chrono::milliseconds(1);
1581   context.set_deadline(deadline);
1582   Status s = stub_->Echo(&context, request, &response);
1583   EXPECT_EQ(StatusCode::DEADLINE_EXCEEDED, s.error_code());
1584 }
1585 
1586 // Set a long but finite deadline.
TEST_P(ProxyEnd2endTest,RpcLongDeadline)1587 TEST_P(ProxyEnd2endTest, RpcLongDeadline) {
1588   ResetStub();
1589   EchoRequest request;
1590   EchoResponse response;
1591   request.set_message("Hello");
1592 
1593   ClientContext context;
1594   std::chrono::system_clock::time_point deadline =
1595       std::chrono::system_clock::now() + std::chrono::hours(1);
1596   context.set_deadline(deadline);
1597   Status s = stub_->Echo(&context, request, &response);
1598   EXPECT_EQ(response.message(), request.message());
1599   EXPECT_TRUE(s.ok());
1600 }
1601 
1602 // Ask server to echo back the deadline it sees.
TEST_P(ProxyEnd2endTest,EchoDeadline)1603 TEST_P(ProxyEnd2endTest, EchoDeadline) {
1604   ResetStub();
1605   EchoRequest request;
1606   EchoResponse response;
1607   request.set_message("Hello");
1608   request.mutable_param()->set_echo_deadline(true);
1609 
1610   ClientContext context;
1611   std::chrono::system_clock::time_point deadline =
1612       std::chrono::system_clock::now() + std::chrono::seconds(100);
1613   context.set_deadline(deadline);
1614   Status s = stub_->Echo(&context, request, &response);
1615   EXPECT_EQ(response.message(), request.message());
1616   EXPECT_TRUE(s.ok());
1617   gpr_timespec sent_deadline;
1618   Timepoint2Timespec(deadline, &sent_deadline);
1619   // We want to allow some reasonable error given:
1620   // - request_deadline() only has 1sec resolution so the best we can do is +-1
1621   // - if sent_deadline.tv_nsec is very close to the next second's boundary we
1622   // can end up being off by 2 in one direction.
1623   EXPECT_LE(response.param().request_deadline() - sent_deadline.tv_sec, 2);
1624   EXPECT_GE(response.param().request_deadline() - sent_deadline.tv_sec, -1);
1625 }
1626 
1627 // Ask server to echo back the deadline it sees. The rpc has no deadline.
TEST_P(ProxyEnd2endTest,EchoDeadlineForNoDeadlineRpc)1628 TEST_P(ProxyEnd2endTest, EchoDeadlineForNoDeadlineRpc) {
1629   ResetStub();
1630   EchoRequest request;
1631   EchoResponse response;
1632   request.set_message("Hello");
1633   request.mutable_param()->set_echo_deadline(true);
1634 
1635   ClientContext context;
1636   Status s = stub_->Echo(&context, request, &response);
1637   EXPECT_EQ(response.message(), request.message());
1638   EXPECT_TRUE(s.ok());
1639   EXPECT_EQ(response.param().request_deadline(),
1640             gpr_inf_future(GPR_CLOCK_REALTIME).tv_sec);
1641 }
1642 
TEST_P(ProxyEnd2endTest,UnimplementedRpc)1643 TEST_P(ProxyEnd2endTest, UnimplementedRpc) {
1644   ResetStub();
1645   EchoRequest request;
1646   EchoResponse response;
1647   request.set_message("Hello");
1648 
1649   ClientContext context;
1650   Status s = stub_->Unimplemented(&context, request, &response);
1651   EXPECT_FALSE(s.ok());
1652   EXPECT_EQ(s.error_code(), grpc::StatusCode::UNIMPLEMENTED);
1653   EXPECT_EQ(s.error_message(), "");
1654   EXPECT_EQ(response.message(), "");
1655 }
1656 
1657 // Client cancels rpc after 10ms
TEST_P(ProxyEnd2endTest,ClientCancelsRpc)1658 TEST_P(ProxyEnd2endTest, ClientCancelsRpc) {
1659   ResetStub();
1660   EchoRequest request;
1661   EchoResponse response;
1662   request.set_message("Hello");
1663   const int kCancelDelayUs = 10 * 1000;
1664   request.mutable_param()->set_client_cancel_after_us(kCancelDelayUs);
1665 
1666   ClientContext context;
1667   std::thread cancel_thread;
1668   if (!GetParam().callback_server()) {
1669     cancel_thread = std::thread(
1670         [&context, this](int delay) { CancelRpc(&context, delay, &service_); },
1671         kCancelDelayUs);
1672     // Note: the unusual pattern above (and below) is caused by a conflict
1673     // between two sets of compiler expectations. clang allows const to be
1674     // captured without mention, so there is no need to capture kCancelDelayUs
1675     // (and indeed clang-tidy complains if you do so). OTOH, a Windows compiler
1676     // in our tests requires an explicit capture even for const. We square this
1677     // circle by passing the const value in as an argument to the lambda.
1678   } else {
1679     cancel_thread = std::thread(
1680         [&context, this](int delay) {
1681           CancelRpc(&context, delay, &callback_service_);
1682         },
1683         kCancelDelayUs);
1684   }
1685   Status s = stub_->Echo(&context, request, &response);
1686   cancel_thread.join();
1687   EXPECT_EQ(StatusCode::CANCELLED, s.error_code());
1688   EXPECT_EQ(s.error_message(), "CANCELLED");
1689 }
1690 
1691 // Server cancels rpc after 1ms
TEST_P(ProxyEnd2endTest,ServerCancelsRpc)1692 TEST_P(ProxyEnd2endTest, ServerCancelsRpc) {
1693   ResetStub();
1694   EchoRequest request;
1695   EchoResponse response;
1696   request.set_message("Hello");
1697   request.mutable_param()->set_server_cancel_after_us(1000);
1698 
1699   ClientContext context;
1700   Status s = stub_->Echo(&context, request, &response);
1701   EXPECT_EQ(StatusCode::CANCELLED, s.error_code());
1702   EXPECT_TRUE(s.error_message().empty());
1703 }
1704 
1705 // Make the response larger than the flow control window.
TEST_P(ProxyEnd2endTest,HugeResponse)1706 TEST_P(ProxyEnd2endTest, HugeResponse) {
1707   ResetStub();
1708   EchoRequest request;
1709   EchoResponse response;
1710   request.set_message("huge response");
1711   const size_t kResponseSize = 1024 * (1024 + 10);
1712   request.mutable_param()->set_response_message_length(kResponseSize);
1713 
1714   ClientContext context;
1715   std::chrono::system_clock::time_point deadline =
1716       std::chrono::system_clock::now() + std::chrono::seconds(20);
1717   context.set_deadline(deadline);
1718   Status s = stub_->Echo(&context, request, &response);
1719   EXPECT_EQ(kResponseSize, response.message().size());
1720   EXPECT_TRUE(s.ok());
1721 }
1722 
TEST_P(ProxyEnd2endTest,Peer)1723 TEST_P(ProxyEnd2endTest, Peer) {
1724   // Peer is not meaningful for inproc
1725   if (GetParam().inproc()) {
1726     return;
1727   }
1728   ResetStub();
1729   EchoRequest request;
1730   EchoResponse response;
1731   request.set_message("hello");
1732   request.mutable_param()->set_echo_peer(true);
1733 
1734   ClientContext context;
1735   Status s = stub_->Echo(&context, request, &response);
1736   EXPECT_EQ(response.message(), request.message());
1737   EXPECT_TRUE(s.ok());
1738   EXPECT_TRUE(CheckIsLocalhost(response.param().peer()));
1739   EXPECT_TRUE(CheckIsLocalhost(context.peer()));
1740 }
1741 
1742 //////////////////////////////////////////////////////////////////////////
1743 class SecureEnd2endTest : public End2endTest {
1744  protected:
SecureEnd2endTest()1745   SecureEnd2endTest() {
1746     CHECK(!GetParam().use_proxy());
1747     CHECK(GetParam().credentials_type() != kInsecureCredentialsType);
1748   }
1749 };
1750 
TEST_P(SecureEnd2endTest,SimpleRpcWithHost)1751 TEST_P(SecureEnd2endTest, SimpleRpcWithHost) {
1752   ResetStub();
1753 
1754   EchoRequest request;
1755   EchoResponse response;
1756   request.set_message("Hello");
1757 
1758   ClientContext context;
1759   context.set_authority("foo.test.youtube.com");
1760   Status s = stub_->Echo(&context, request, &response);
1761   EXPECT_EQ(response.message(), request.message());
1762   EXPECT_TRUE(response.has_param());
1763   EXPECT_EQ("special", response.param().host());
1764   EXPECT_TRUE(s.ok());
1765 }
1766 
MetadataContains(const std::multimap<grpc::string_ref,grpc::string_ref> & metadata,const std::string & key,const std::string & value)1767 bool MetadataContains(
1768     const std::multimap<grpc::string_ref, grpc::string_ref>& metadata,
1769     const std::string& key, const std::string& value) {
1770   int count = 0;
1771 
1772   for (std::multimap<grpc::string_ref, grpc::string_ref>::const_iterator iter =
1773            metadata.begin();
1774        iter != metadata.end(); ++iter) {
1775     if (ToString(iter->first) == key && ToString(iter->second) == value) {
1776       count++;
1777     }
1778   }
1779   return count == 1;
1780 }
1781 
TEST_P(SecureEnd2endTest,BlockingAuthMetadataPluginAndProcessorSuccess)1782 TEST_P(SecureEnd2endTest, BlockingAuthMetadataPluginAndProcessorSuccess) {
1783   auto* processor = new TestAuthMetadataProcessor(true);
1784   StartServer(std::shared_ptr<AuthMetadataProcessor>(processor));
1785   ResetStub();
1786   EchoRequest request;
1787   EchoResponse response;
1788   ClientContext context;
1789   context.set_credentials(processor->GetCompatibleClientCreds());
1790   request.set_message("Hello");
1791   request.mutable_param()->set_echo_metadata(true);
1792   request.mutable_param()->set_expected_client_identity(
1793       TestAuthMetadataProcessor::kGoodGuy);
1794   request.mutable_param()->set_expected_transport_security_type(
1795       GetParam().credentials_type());
1796 
1797   Status s = stub_->Echo(&context, request, &response);
1798   EXPECT_EQ(request.message(), response.message());
1799   EXPECT_TRUE(s.ok());
1800 
1801   // Metadata should have been consumed by the processor.
1802   EXPECT_FALSE(MetadataContains(
1803       context.GetServerTrailingMetadata(), GRPC_AUTHORIZATION_METADATA_KEY,
1804       std::string("Bearer ") + TestAuthMetadataProcessor::kGoodGuy));
1805 }
1806 
TEST_P(SecureEnd2endTest,BlockingAuthMetadataPluginAndProcessorFailure)1807 TEST_P(SecureEnd2endTest, BlockingAuthMetadataPluginAndProcessorFailure) {
1808   auto* processor = new TestAuthMetadataProcessor(true);
1809   StartServer(std::shared_ptr<AuthMetadataProcessor>(processor));
1810   ResetStub();
1811   EchoRequest request;
1812   EchoResponse response;
1813   ClientContext context;
1814   context.set_credentials(processor->GetIncompatibleClientCreds());
1815   request.set_message("Hello");
1816 
1817   Status s = stub_->Echo(&context, request, &response);
1818   EXPECT_FALSE(s.ok());
1819   EXPECT_EQ(s.error_code(), StatusCode::UNAUTHENTICATED);
1820 }
1821 
TEST_P(SecureEnd2endTest,SetPerCallCredentials)1822 TEST_P(SecureEnd2endTest, SetPerCallCredentials) {
1823   ResetStub();
1824   EchoRequest request;
1825   EchoResponse response;
1826   ClientContext context;
1827   std::shared_ptr<CallCredentials> creds =
1828       GoogleIAMCredentials(kFakeToken, kFakeSelector);
1829   context.set_credentials(creds);
1830   request.set_message("Hello");
1831   request.mutable_param()->set_echo_metadata(true);
1832 
1833   Status s = stub_->Echo(&context, request, &response);
1834   EXPECT_EQ(request.message(), response.message());
1835   EXPECT_TRUE(s.ok());
1836   EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(),
1837                                GRPC_IAM_AUTHORIZATION_TOKEN_METADATA_KEY,
1838                                kFakeToken));
1839   EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(),
1840                                GRPC_IAM_AUTHORITY_SELECTOR_METADATA_KEY,
1841                                kFakeSelector));
1842   EXPECT_EQ(context.credentials()->DebugString(),
1843             kExpectedFakeCredsDebugString);
1844 }
1845 
1846 class CredentialsInterceptor : public experimental::Interceptor {
1847  public:
CredentialsInterceptor(experimental::ClientRpcInfo * info)1848   explicit CredentialsInterceptor(experimental::ClientRpcInfo* info)
1849       : info_(info) {}
1850 
Intercept(experimental::InterceptorBatchMethods * methods)1851   void Intercept(experimental::InterceptorBatchMethods* methods) override {
1852     if (methods->QueryInterceptionHookPoint(
1853             experimental::InterceptionHookPoints::PRE_SEND_INITIAL_METADATA)) {
1854       std::shared_ptr<CallCredentials> creds =
1855           GoogleIAMCredentials(kFakeToken, kFakeSelector);
1856       info_->client_context()->set_credentials(creds);
1857     }
1858     methods->Proceed();
1859   }
1860 
1861  private:
1862   experimental::ClientRpcInfo* info_ = nullptr;
1863 };
1864 
1865 class CredentialsInterceptorFactory
1866     : public experimental::ClientInterceptorFactoryInterface {
CreateClientInterceptor(experimental::ClientRpcInfo * info)1867   CredentialsInterceptor* CreateClientInterceptor(
1868       experimental::ClientRpcInfo* info) override {
1869     return new CredentialsInterceptor(info);
1870   }
1871 };
1872 
TEST_P(SecureEnd2endTest,CallCredentialsInterception)1873 TEST_P(SecureEnd2endTest, CallCredentialsInterception) {
1874   if (!GetParam().use_interceptors()) {
1875     return;
1876   }
1877   std::vector<std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>
1878       interceptor_creators;
1879   interceptor_creators.push_back(
1880       std::make_unique<CredentialsInterceptorFactory>());
1881   ResetStub(std::move(interceptor_creators));
1882   EchoRequest request;
1883   EchoResponse response;
1884   ClientContext context;
1885 
1886   request.set_message("Hello");
1887   request.mutable_param()->set_echo_metadata(true);
1888 
1889   Status s = stub_->Echo(&context, request, &response);
1890   EXPECT_EQ(request.message(), response.message());
1891   EXPECT_TRUE(s.ok());
1892   EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(),
1893                                GRPC_IAM_AUTHORIZATION_TOKEN_METADATA_KEY,
1894                                kFakeToken));
1895   EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(),
1896                                GRPC_IAM_AUTHORITY_SELECTOR_METADATA_KEY,
1897                                kFakeSelector));
1898   EXPECT_EQ(context.credentials()->DebugString(),
1899             kExpectedFakeCredsDebugString);
1900 }
1901 
TEST_P(SecureEnd2endTest,CallCredentialsInterceptionWithSetCredentials)1902 TEST_P(SecureEnd2endTest, CallCredentialsInterceptionWithSetCredentials) {
1903   if (!GetParam().use_interceptors()) {
1904     return;
1905   }
1906   std::vector<std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>
1907       interceptor_creators;
1908   interceptor_creators.push_back(
1909       std::make_unique<CredentialsInterceptorFactory>());
1910   ResetStub(std::move(interceptor_creators));
1911   EchoRequest request;
1912   EchoResponse response;
1913   ClientContext context;
1914   std::shared_ptr<CallCredentials> creds1 =
1915       GoogleIAMCredentials(kWrongToken, kWrongSelector);
1916   context.set_credentials(creds1);
1917   EXPECT_EQ(context.credentials(), creds1);
1918   EXPECT_EQ(context.credentials()->DebugString(),
1919             kExpectedWrongCredsDebugString);
1920   request.set_message("Hello");
1921   request.mutable_param()->set_echo_metadata(true);
1922 
1923   Status s = stub_->Echo(&context, request, &response);
1924   EXPECT_EQ(request.message(), response.message());
1925   EXPECT_TRUE(s.ok());
1926   EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(),
1927                                GRPC_IAM_AUTHORIZATION_TOKEN_METADATA_KEY,
1928                                kFakeToken));
1929   EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(),
1930                                GRPC_IAM_AUTHORITY_SELECTOR_METADATA_KEY,
1931                                kFakeSelector));
1932   EXPECT_EQ(context.credentials()->DebugString(),
1933             kExpectedFakeCredsDebugString);
1934 }
1935 
TEST_P(SecureEnd2endTest,OverridePerCallCredentials)1936 TEST_P(SecureEnd2endTest, OverridePerCallCredentials) {
1937   ResetStub();
1938   EchoRequest request;
1939   EchoResponse response;
1940   ClientContext context;
1941   std::shared_ptr<CallCredentials> creds1 =
1942       GoogleIAMCredentials(kFakeToken1, kFakeSelector1);
1943   context.set_credentials(creds1);
1944   EXPECT_EQ(context.credentials(), creds1);
1945   EXPECT_EQ(context.credentials()->DebugString(),
1946             kExpectedFakeCreds1DebugString);
1947   std::shared_ptr<CallCredentials> creds2 =
1948       GoogleIAMCredentials(kFakeToken2, kFakeSelector2);
1949   context.set_credentials(creds2);
1950   EXPECT_EQ(context.credentials(), creds2);
1951   request.set_message("Hello");
1952   request.mutable_param()->set_echo_metadata(true);
1953 
1954   Status s = stub_->Echo(&context, request, &response);
1955   EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(),
1956                                GRPC_IAM_AUTHORIZATION_TOKEN_METADATA_KEY,
1957                                kFakeToken2));
1958   EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(),
1959                                GRPC_IAM_AUTHORITY_SELECTOR_METADATA_KEY,
1960                                kFakeSelector2));
1961   EXPECT_FALSE(MetadataContains(context.GetServerTrailingMetadata(),
1962                                 GRPC_IAM_AUTHORIZATION_TOKEN_METADATA_KEY,
1963                                 kFakeToken1));
1964   EXPECT_FALSE(MetadataContains(context.GetServerTrailingMetadata(),
1965                                 GRPC_IAM_AUTHORITY_SELECTOR_METADATA_KEY,
1966                                 kFakeSelector1));
1967   EXPECT_EQ(context.credentials()->DebugString(),
1968             kExpectedFakeCreds2DebugString);
1969   EXPECT_EQ(request.message(), response.message());
1970   EXPECT_TRUE(s.ok());
1971 }
1972 
TEST_P(SecureEnd2endTest,AuthMetadataPluginKeyFailure)1973 TEST_P(SecureEnd2endTest, AuthMetadataPluginKeyFailure) {
1974   ResetStub();
1975   EchoRequest request;
1976   EchoResponse response;
1977   ClientContext context;
1978   context.set_credentials(grpc::MetadataCredentialsFromPlugin(
1979       std::unique_ptr<MetadataCredentialsPlugin>(
1980           new TestMetadataCredentialsPlugin(
1981               TestMetadataCredentialsPlugin::kBadMetadataKey,
1982               "Does not matter, will fail the key is invalid.", false, true,
1983               0))));
1984   request.set_message("Hello");
1985 
1986   Status s = stub_->Echo(&context, request, &response);
1987   EXPECT_FALSE(s.ok());
1988   EXPECT_EQ(s.error_code(), StatusCode::UNAVAILABLE);
1989   EXPECT_EQ(context.credentials()->DebugString(),
1990             kExpectedAuthMetadataPluginKeyFailureCredsDebugString);
1991 }
1992 
TEST_P(SecureEnd2endTest,AuthMetadataPluginValueFailure)1993 TEST_P(SecureEnd2endTest, AuthMetadataPluginValueFailure) {
1994   ResetStub();
1995   EchoRequest request;
1996   EchoResponse response;
1997   ClientContext context;
1998   context.set_credentials(grpc::MetadataCredentialsFromPlugin(
1999       std::unique_ptr<MetadataCredentialsPlugin>(
2000           new TestMetadataCredentialsPlugin(
2001               TestMetadataCredentialsPlugin::kGoodMetadataKey,
2002               "With illegal \n value.", false, true, 0))));
2003   request.set_message("Hello");
2004 
2005   Status s = stub_->Echo(&context, request, &response);
2006   EXPECT_FALSE(s.ok());
2007   EXPECT_EQ(s.error_code(), StatusCode::UNAVAILABLE);
2008   EXPECT_EQ(context.credentials()->DebugString(),
2009             kExpectedAuthMetadataPluginValueFailureCredsDebugString);
2010 }
2011 
TEST_P(SecureEnd2endTest,AuthMetadataPluginWithDeadline)2012 TEST_P(SecureEnd2endTest, AuthMetadataPluginWithDeadline) {
2013   ResetStub();
2014   EchoRequest request;
2015   request.mutable_param()->set_skip_cancelled_check(true);
2016   EchoResponse response;
2017   ClientContext context;
2018   const int delay = 100;
2019   std::chrono::system_clock::time_point deadline =
2020       std::chrono::system_clock::now() + std::chrono::milliseconds(delay);
2021   context.set_deadline(deadline);
2022   context.set_credentials(grpc::MetadataCredentialsFromPlugin(
2023       std::unique_ptr<MetadataCredentialsPlugin>(
2024           new TestMetadataCredentialsPlugin("meta_key", "Does not matter", true,
2025                                             true, delay))));
2026   request.set_message("Hello");
2027 
2028   Status s = stub_->Echo(&context, request, &response);
2029   if (!s.ok()) {
2030     EXPECT_TRUE(s.error_code() == StatusCode::DEADLINE_EXCEEDED ||
2031                 s.error_code() == StatusCode::UNAVAILABLE);
2032   }
2033   EXPECT_EQ(context.credentials()->DebugString(),
2034             kExpectedAuthMetadataPluginWithDeadlineCredsDebugString);
2035 }
2036 
TEST_P(SecureEnd2endTest,AuthMetadataPluginWithCancel)2037 TEST_P(SecureEnd2endTest, AuthMetadataPluginWithCancel) {
2038   ResetStub();
2039   EchoRequest request;
2040   request.mutable_param()->set_skip_cancelled_check(true);
2041   EchoResponse response;
2042   ClientContext context;
2043   const int delay = 100;
2044   context.set_credentials(grpc::MetadataCredentialsFromPlugin(
2045       std::unique_ptr<MetadataCredentialsPlugin>(
2046           new TestMetadataCredentialsPlugin("meta_key", "Does not matter", true,
2047                                             true, delay))));
2048   request.set_message("Hello");
2049 
2050   std::thread cancel_thread([&] {
2051     gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
2052                                  gpr_time_from_millis(delay, GPR_TIMESPAN)));
2053     context.TryCancel();
2054   });
2055   Status s = stub_->Echo(&context, request, &response);
2056   if (!s.ok()) {
2057     EXPECT_TRUE(s.error_code() == StatusCode::CANCELLED ||
2058                 s.error_code() == StatusCode::UNAVAILABLE);
2059   }
2060   cancel_thread.join();
2061   EXPECT_EQ(context.credentials()->DebugString(),
2062             kExpectedAuthMetadataPluginWithDeadlineCredsDebugString);
2063 }
2064 
TEST_P(SecureEnd2endTest,NonBlockingAuthMetadataPluginFailure)2065 TEST_P(SecureEnd2endTest, NonBlockingAuthMetadataPluginFailure) {
2066   ResetStub();
2067   EchoRequest request;
2068   EchoResponse response;
2069   ClientContext context;
2070   context.set_credentials(grpc::MetadataCredentialsFromPlugin(
2071       std::unique_ptr<MetadataCredentialsPlugin>(
2072           new TestMetadataCredentialsPlugin(
2073               TestMetadataCredentialsPlugin::kGoodMetadataKey,
2074               "Does not matter, will fail anyway (see 3rd param)", false, false,
2075               0))));
2076   request.set_message("Hello");
2077 
2078   Status s = stub_->Echo(&context, request, &response);
2079   EXPECT_FALSE(s.ok());
2080   EXPECT_EQ(s.error_code(), StatusCode::UNAVAILABLE);
2081   EXPECT_EQ(s.error_message(),
2082             std::string("Getting metadata from plugin failed with error: ") +
2083                 kTestCredsPluginErrorMsg);
2084   EXPECT_EQ(context.credentials()->DebugString(),
2085             kExpectedNonBlockingAuthMetadataPluginFailureCredsDebugString);
2086 }
2087 
TEST_P(SecureEnd2endTest,NonBlockingAuthMetadataPluginAndProcessorSuccess)2088 TEST_P(SecureEnd2endTest, NonBlockingAuthMetadataPluginAndProcessorSuccess) {
2089   auto* processor = new TestAuthMetadataProcessor(false);
2090   StartServer(std::shared_ptr<AuthMetadataProcessor>(processor));
2091   ResetStub();
2092   EchoRequest request;
2093   EchoResponse response;
2094   ClientContext context;
2095   context.set_credentials(processor->GetCompatibleClientCreds());
2096   request.set_message("Hello");
2097   request.mutable_param()->set_echo_metadata(true);
2098   request.mutable_param()->set_expected_client_identity(
2099       TestAuthMetadataProcessor::kGoodGuy);
2100   request.mutable_param()->set_expected_transport_security_type(
2101       GetParam().credentials_type());
2102 
2103   Status s = stub_->Echo(&context, request, &response);
2104   EXPECT_EQ(request.message(), response.message());
2105   EXPECT_TRUE(s.ok());
2106 
2107   // Metadata should have been consumed by the processor.
2108   EXPECT_FALSE(MetadataContains(
2109       context.GetServerTrailingMetadata(), GRPC_AUTHORIZATION_METADATA_KEY,
2110       std::string("Bearer ") + TestAuthMetadataProcessor::kGoodGuy));
2111   EXPECT_EQ(
2112       context.credentials()->DebugString(),
2113       kExpectedNonBlockingAuthMetadataPluginAndProcessorSuccessCredsDebugString);
2114 }
2115 
TEST_P(SecureEnd2endTest,NonBlockingAuthMetadataPluginAndProcessorFailure)2116 TEST_P(SecureEnd2endTest, NonBlockingAuthMetadataPluginAndProcessorFailure) {
2117   auto* processor = new TestAuthMetadataProcessor(false);
2118   StartServer(std::shared_ptr<AuthMetadataProcessor>(processor));
2119   ResetStub();
2120   EchoRequest request;
2121   EchoResponse response;
2122   ClientContext context;
2123   context.set_credentials(processor->GetIncompatibleClientCreds());
2124   request.set_message("Hello");
2125 
2126   Status s = stub_->Echo(&context, request, &response);
2127   EXPECT_FALSE(s.ok());
2128   EXPECT_EQ(s.error_code(), StatusCode::UNAUTHENTICATED);
2129   EXPECT_EQ(
2130       context.credentials()->DebugString(),
2131       kExpectedNonBlockingAuthMetadataPluginAndProcessorFailureCredsDebugString);
2132 }
2133 
TEST_P(SecureEnd2endTest,BlockingAuthMetadataPluginFailure)2134 TEST_P(SecureEnd2endTest, BlockingAuthMetadataPluginFailure) {
2135   ResetStub();
2136   EchoRequest request;
2137   EchoResponse response;
2138   ClientContext context;
2139   context.set_credentials(grpc::MetadataCredentialsFromPlugin(
2140       std::unique_ptr<MetadataCredentialsPlugin>(
2141           new TestMetadataCredentialsPlugin(
2142               TestMetadataCredentialsPlugin::kGoodMetadataKey,
2143               "Does not matter, will fail anyway (see 3rd param)", true, false,
2144               0))));
2145   request.set_message("Hello");
2146 
2147   Status s = stub_->Echo(&context, request, &response);
2148   EXPECT_FALSE(s.ok());
2149   EXPECT_EQ(s.error_code(), StatusCode::UNAVAILABLE);
2150   EXPECT_EQ(s.error_message(),
2151             std::string("Getting metadata from plugin failed with error: ") +
2152                 kTestCredsPluginErrorMsg);
2153   EXPECT_EQ(context.credentials()->DebugString(),
2154             kExpectedBlockingAuthMetadataPluginFailureCredsDebugString);
2155 }
2156 
TEST_P(SecureEnd2endTest,CompositeCallCreds)2157 TEST_P(SecureEnd2endTest, CompositeCallCreds) {
2158   ResetStub();
2159   EchoRequest request;
2160   EchoResponse response;
2161   ClientContext context;
2162   const char kMetadataKey1[] = "call-creds-key1";
2163   const char kMetadataKey2[] = "call-creds-key2";
2164   const char kMetadataVal1[] = "call-creds-val1";
2165   const char kMetadataVal2[] = "call-creds-val2";
2166 
2167   context.set_credentials(grpc::CompositeCallCredentials(
2168       grpc::MetadataCredentialsFromPlugin(
2169           std::unique_ptr<MetadataCredentialsPlugin>(
2170               new TestMetadataCredentialsPlugin(kMetadataKey1, kMetadataVal1,
2171                                                 true, true, 0))),
2172       grpc::MetadataCredentialsFromPlugin(
2173           std::unique_ptr<MetadataCredentialsPlugin>(
2174               new TestMetadataCredentialsPlugin(kMetadataKey2, kMetadataVal2,
2175                                                 true, true, 0)))));
2176   request.set_message("Hello");
2177   request.mutable_param()->set_echo_metadata(true);
2178 
2179   Status s = stub_->Echo(&context, request, &response);
2180   EXPECT_TRUE(s.ok());
2181   EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(),
2182                                kMetadataKey1, kMetadataVal1));
2183   EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(),
2184                                kMetadataKey2, kMetadataVal2));
2185   EXPECT_EQ(context.credentials()->DebugString(),
2186             kExpectedCompositeCallCredsDebugString);
2187 }
2188 
TEST_P(SecureEnd2endTest,ClientAuthContext)2189 TEST_P(SecureEnd2endTest, ClientAuthContext) {
2190   ResetStub();
2191   EchoRequest request;
2192   EchoResponse response;
2193   request.set_message("Hello");
2194   request.mutable_param()->set_check_auth_context(
2195       GetParam().credentials_type() == kTlsCredentialsType);
2196   request.mutable_param()->set_expected_transport_security_type(
2197       GetParam().credentials_type());
2198   ClientContext context;
2199   Status s = stub_->Echo(&context, request, &response);
2200   EXPECT_EQ(response.message(), request.message());
2201   EXPECT_TRUE(s.ok());
2202 
2203   std::shared_ptr<const AuthContext> auth_ctx = context.auth_context();
2204   std::vector<grpc::string_ref> tst =
2205       auth_ctx->FindPropertyValues("transport_security_type");
2206   ASSERT_EQ(1u, tst.size());
2207   EXPECT_EQ(GetParam().credentials_type(), ToString(tst[0]));
2208   if (GetParam().credentials_type() == kTlsCredentialsType) {
2209     EXPECT_EQ("x509_subject_alternative_name",
2210               auth_ctx->GetPeerIdentityPropertyName());
2211     EXPECT_EQ(4u, auth_ctx->GetPeerIdentity().size());
2212     EXPECT_EQ("*.test.google.fr", ToString(auth_ctx->GetPeerIdentity()[0]));
2213     EXPECT_EQ("waterzooi.test.google.be",
2214               ToString(auth_ctx->GetPeerIdentity()[1]));
2215     EXPECT_EQ("*.test.youtube.com", ToString(auth_ctx->GetPeerIdentity()[2]));
2216     EXPECT_EQ("192.168.1.3", ToString(auth_ctx->GetPeerIdentity()[3]));
2217   }
2218 }
2219 
2220 class ResourceQuotaEnd2endTest : public End2endTest {
2221  public:
ResourceQuotaEnd2endTest()2222   ResourceQuotaEnd2endTest()
2223       : server_resource_quota_("server_resource_quota") {}
2224 
ConfigureServerBuilder(ServerBuilder * builder)2225   void ConfigureServerBuilder(ServerBuilder* builder) override {
2226     builder->SetResourceQuota(server_resource_quota_);
2227   }
2228 
2229  private:
2230   ResourceQuota server_resource_quota_;
2231 };
2232 
TEST_P(ResourceQuotaEnd2endTest,SimpleRequest)2233 TEST_P(ResourceQuotaEnd2endTest, SimpleRequest) {
2234   ResetStub();
2235 
2236   EchoRequest request;
2237   EchoResponse response;
2238   request.set_message("Hello");
2239 
2240   ClientContext context;
2241   Status s = stub_->Echo(&context, request, &response);
2242   EXPECT_EQ(response.message(), request.message());
2243   EXPECT_TRUE(s.ok());
2244 }
2245 
2246 // TODO(vjpai): refactor arguments into a struct if it makes sense
CreateTestScenarios(bool use_proxy,bool test_insecure,bool test_secure,bool test_inproc,bool test_callback_server)2247 std::vector<TestScenario> CreateTestScenarios(bool use_proxy,
2248                                               bool test_insecure,
2249                                               bool test_secure,
2250                                               bool test_inproc,
2251                                               bool test_callback_server) {
2252   std::vector<TestScenario> scenarios;
2253   std::vector<std::string> credentials_types;
2254 
2255   grpc_core::ConfigVars::Overrides overrides;
2256   overrides.client_channel_backup_poll_interval_ms =
2257       kClientChannelBackupPollIntervalMs;
2258   grpc_core::ConfigVars::SetOverrides(overrides);
2259 #if TARGET_OS_IPHONE
2260   // Workaround Apple CFStream bug
2261   grpc_core::SetEnv("grpc_cfstream", "0");
2262 #endif
2263 
2264   if (test_secure) {
2265     credentials_types =
2266         GetCredentialsProvider()->GetSecureCredentialsTypeList();
2267   }
2268   auto insec_ok = [] {
2269     // Only allow insecure credentials type when it is registered with the
2270     // provider. User may create providers that do not have insecure.
2271     return GetCredentialsProvider()->GetChannelCredentials(
2272                kInsecureCredentialsType, nullptr) != nullptr;
2273   };
2274   if (test_insecure && insec_ok()) {
2275     credentials_types.push_back(kInsecureCredentialsType);
2276   }
2277 
2278   // Test callback with inproc or if the event-engine allows it
2279   CHECK(!credentials_types.empty());
2280   for (const auto& cred : credentials_types) {
2281     scenarios.emplace_back(false, false, false, cred, false);
2282     scenarios.emplace_back(true, false, false, cred, false);
2283     if (test_callback_server) {
2284       // Note that these scenarios will be dynamically disabled if the event
2285       // engine doesn't run in the background
2286       scenarios.emplace_back(false, false, false, cred, true);
2287       scenarios.emplace_back(true, false, false, cred, true);
2288     }
2289     if (use_proxy) {
2290       scenarios.emplace_back(false, true, false, cred, false);
2291       scenarios.emplace_back(true, true, false, cred, false);
2292     }
2293   }
2294   if (test_inproc && insec_ok()) {
2295     scenarios.emplace_back(false, false, true, kInsecureCredentialsType, false);
2296     scenarios.emplace_back(true, false, true, kInsecureCredentialsType, false);
2297     if (test_callback_server) {
2298       scenarios.emplace_back(false, false, true, kInsecureCredentialsType,
2299                              true);
2300       scenarios.emplace_back(true, false, true, kInsecureCredentialsType, true);
2301     }
2302   }
2303   return scenarios;
2304 }
2305 
2306 INSTANTIATE_TEST_SUITE_P(
2307     End2end, End2endTest,
2308     ::testing::ValuesIn(CreateTestScenarios(false, true, true, true, true)),
2309     &TestScenario::Name);
2310 
2311 INSTANTIATE_TEST_SUITE_P(
2312     End2endServerTryCancel, End2endServerTryCancelTest,
2313     ::testing::ValuesIn(CreateTestScenarios(false, true, true, true, true)),
2314     &TestScenario::Name);
2315 
2316 INSTANTIATE_TEST_SUITE_P(
2317     ProxyEnd2end, ProxyEnd2endTest,
2318     ::testing::ValuesIn(CreateTestScenarios(true, true, true, true, true)),
2319     &TestScenario::Name);
2320 
2321 INSTANTIATE_TEST_SUITE_P(
2322     SecureEnd2end, SecureEnd2endTest,
2323     ::testing::ValuesIn(CreateTestScenarios(false, false, true, false, true)),
2324     &TestScenario::Name);
2325 
2326 INSTANTIATE_TEST_SUITE_P(
2327     ResourceQuotaEnd2end, ResourceQuotaEnd2endTest,
2328     ::testing::ValuesIn(CreateTestScenarios(false, true, true, true, true)),
2329     &TestScenario::Name);
2330 
2331 }  // namespace
2332 }  // namespace testing
2333 }  // namespace grpc
2334 
main(int argc,char ** argv)2335 int main(int argc, char** argv) {
2336   grpc::testing::TestEnvironment env(&argc, argv);
2337   ::testing::InitGoogleTest(&argc, argv);
2338   int ret = RUN_ALL_TESTS();
2339   return ret;
2340 }
2341