• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2015-2016 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <cinttypes>
20 #include <fstream>
21 #include <memory>
22 #include <string>
23 #include <type_traits>
24 #include <utility>
25 
26 #include "absl/strings/str_format.h"
27 
28 #include <grpc/grpc.h>
29 #include <grpc/support/alloc.h>
30 #include <grpc/support/log.h>
31 #include <grpc/support/string_util.h>
32 #include <grpc/support/time.h>
33 #include <grpcpp/channel.h>
34 #include <grpcpp/client_context.h>
35 #include <grpcpp/security/credentials.h>
36 
37 #include "src/proto/grpc/testing/empty.pb.h"
38 #include "src/proto/grpc/testing/messages.pb.h"
39 #include "src/proto/grpc/testing/test.grpc.pb.h"
40 #include "test/core/util/histogram.h"
41 #include "test/cpp/interop/client_helper.h"
42 #include "test/cpp/interop/interop_client.h"
43 
44 namespace grpc {
45 namespace testing {
46 
47 namespace {
48 // The same value is defined by the Java client.
49 const std::vector<int> request_stream_sizes = {27182, 8, 1828, 45904};
50 const std::vector<int> response_stream_sizes = {31415, 9, 2653, 58979};
51 const int kNumResponseMessages = 2000;
52 const int kResponseMessageSize = 1030;
53 const int kReceiveDelayMilliSeconds = 20;
54 const int kLargeRequestSize = 271828;
55 const int kLargeResponseSize = 314159;
56 
NoopChecks(const InteropClientContextInspector &,const SimpleRequest *,const SimpleResponse *)57 void NoopChecks(const InteropClientContextInspector& /*inspector*/,
58                 const SimpleRequest* /*request*/,
59                 const SimpleResponse* /*response*/) {}
60 
UnaryCompressionChecks(const InteropClientContextInspector & inspector,const SimpleRequest * request,const SimpleResponse *)61 void UnaryCompressionChecks(const InteropClientContextInspector& inspector,
62                             const SimpleRequest* request,
63                             const SimpleResponse* /*response*/) {
64   const grpc_compression_algorithm received_compression =
65       inspector.GetCallCompressionAlgorithm();
66   if (request->response_compressed().value()) {
67     if (received_compression == GRPC_COMPRESS_NONE) {
68       // Requested some compression, got NONE. This is an error.
69       gpr_log(GPR_ERROR,
70               "Failure: Requested compression but got uncompressed response "
71               "from server.");
72       abort();
73     }
74     GPR_ASSERT(inspector.WasCompressed());
75   } else {
76     // Didn't request compression -> make sure the response is uncompressed
77     GPR_ASSERT(!(inspector.WasCompressed()));
78   }
79 }
80 }  // namespace
81 
ServiceStub(ChannelCreationFunc channel_creation_func,bool new_stub_every_call)82 InteropClient::ServiceStub::ServiceStub(
83     ChannelCreationFunc channel_creation_func, bool new_stub_every_call)
84     : channel_creation_func_(std::move(channel_creation_func)),
85       channel_(channel_creation_func_()),
86       new_stub_every_call_(new_stub_every_call) {
87   // If new_stub_every_call is false, then this is our chance to initialize
88   // stub_. (see Get())
89   if (!new_stub_every_call) {
90     stub_ = TestService::NewStub(channel_);
91   }
92 }
93 
Get()94 TestService::Stub* InteropClient::ServiceStub::Get() {
95   if (new_stub_every_call_) {
96     stub_ = TestService::NewStub(channel_);
97   }
98 
99   return stub_.get();
100 }
101 
102 UnimplementedService::Stub*
GetUnimplementedServiceStub()103 InteropClient::ServiceStub::GetUnimplementedServiceStub() {
104   if (unimplemented_service_stub_ == nullptr) {
105     unimplemented_service_stub_ = UnimplementedService::NewStub(channel_);
106   }
107   return unimplemented_service_stub_.get();
108 }
109 
ResetChannel()110 void InteropClient::ServiceStub::ResetChannel() {
111   channel_ = channel_creation_func_();
112   if (!new_stub_every_call_) {
113     stub_ = TestService::NewStub(channel_);
114   }
115 }
116 
InteropClient(ChannelCreationFunc channel_creation_func,bool new_stub_every_test_case,bool do_not_abort_on_transient_failures)117 InteropClient::InteropClient(ChannelCreationFunc channel_creation_func,
118                              bool new_stub_every_test_case,
119                              bool do_not_abort_on_transient_failures)
120     : serviceStub_(std::move(channel_creation_func), new_stub_every_test_case),
121       do_not_abort_on_transient_failures_(do_not_abort_on_transient_failures) {}
122 
AssertStatusOk(const Status & s,const std::string & optional_debug_string)123 bool InteropClient::AssertStatusOk(const Status& s,
124                                    const std::string& optional_debug_string) {
125   if (s.ok()) {
126     return true;
127   }
128 
129   // Note: At this point, s.error_code is definitely not StatusCode::OK (we
130   // already checked for s.ok() above). So, the following will call abort()
131   // (unless s.error_code() corresponds to a transient failure and
132   // 'do_not_abort_on_transient_failures' is true)
133   return AssertStatusCode(s, StatusCode::OK, optional_debug_string);
134 }
135 
AssertStatusCode(const Status & s,StatusCode expected_code,const std::string & optional_debug_string)136 bool InteropClient::AssertStatusCode(const Status& s, StatusCode expected_code,
137                                      const std::string& optional_debug_string) {
138   if (s.error_code() == expected_code) {
139     return true;
140   }
141 
142   gpr_log(GPR_ERROR,
143           "Error status code: %d (expected: %d), message: %s,"
144           " debug string: %s",
145           s.error_code(), expected_code, s.error_message().c_str(),
146           optional_debug_string.c_str());
147 
148   // In case of transient transient/retryable failures (like a broken
149   // connection) we may or may not abort (see TransientFailureOrAbort())
150   if (s.error_code() == grpc::StatusCode::UNAVAILABLE) {
151     return TransientFailureOrAbort();
152   }
153 
154   abort();
155 }
156 
DoEmpty()157 bool InteropClient::DoEmpty() {
158   gpr_log(GPR_DEBUG, "Sending an empty rpc...");
159 
160   Empty request;
161   Empty response;
162   ClientContext context;
163 
164   Status s = serviceStub_.Get()->EmptyCall(&context, request, &response);
165 
166   if (!AssertStatusOk(s, context.debug_error_string())) {
167     return false;
168   }
169 
170   gpr_log(GPR_DEBUG, "Empty rpc done.");
171   return true;
172 }
173 
PerformLargeUnary(SimpleRequest * request,SimpleResponse * response)174 bool InteropClient::PerformLargeUnary(SimpleRequest* request,
175                                       SimpleResponse* response) {
176   return PerformLargeUnary(request, response, NoopChecks);
177 }
178 
PerformLargeUnary(SimpleRequest * request,SimpleResponse * response,const CheckerFn & custom_checks_fn)179 bool InteropClient::PerformLargeUnary(SimpleRequest* request,
180                                       SimpleResponse* response,
181                                       const CheckerFn& custom_checks_fn) {
182   ClientContext context;
183   InteropClientContextInspector inspector(context);
184   request->set_response_size(kLargeResponseSize);
185   std::string payload(kLargeRequestSize, '\0');
186   request->mutable_payload()->set_body(payload.c_str(), kLargeRequestSize);
187   if (request->has_expect_compressed()) {
188     if (request->expect_compressed().value()) {
189       context.set_compression_algorithm(GRPC_COMPRESS_GZIP);
190     } else {
191       context.set_compression_algorithm(GRPC_COMPRESS_NONE);
192     }
193   }
194 
195   Status s = serviceStub_.Get()->UnaryCall(&context, *request, response);
196   if (!AssertStatusOk(s, context.debug_error_string())) {
197     return false;
198   }
199 
200   custom_checks_fn(inspector, request, response);
201 
202   // Payload related checks.
203   GPR_ASSERT(response->payload().body() ==
204              std::string(kLargeResponseSize, '\0'));
205   return true;
206 }
207 
DoComputeEngineCreds(const std::string & default_service_account,const std::string & oauth_scope)208 bool InteropClient::DoComputeEngineCreds(
209     const std::string& default_service_account,
210     const std::string& oauth_scope) {
211   gpr_log(GPR_DEBUG,
212           "Sending a large unary rpc with compute engine credentials ...");
213   SimpleRequest request;
214   SimpleResponse response;
215   request.set_fill_username(true);
216   request.set_fill_oauth_scope(true);
217 
218   if (!PerformLargeUnary(&request, &response)) {
219     return false;
220   }
221 
222   gpr_log(GPR_DEBUG, "Got username %s", response.username().c_str());
223   gpr_log(GPR_DEBUG, "Got oauth_scope %s", response.oauth_scope().c_str());
224   GPR_ASSERT(!response.username().empty());
225   GPR_ASSERT(response.username().c_str() == default_service_account);
226   GPR_ASSERT(!response.oauth_scope().empty());
227   const char* oauth_scope_str = response.oauth_scope().c_str();
228   GPR_ASSERT(oauth_scope.find(oauth_scope_str) != std::string::npos);
229   gpr_log(GPR_DEBUG, "Large unary with compute engine creds done.");
230   return true;
231 }
232 
DoOauth2AuthToken(const std::string & username,const std::string & oauth_scope)233 bool InteropClient::DoOauth2AuthToken(const std::string& username,
234                                       const std::string& oauth_scope) {
235   gpr_log(GPR_DEBUG,
236           "Sending a unary rpc with raw oauth2 access token credentials ...");
237   SimpleRequest request;
238   SimpleResponse response;
239   request.set_fill_username(true);
240   request.set_fill_oauth_scope(true);
241 
242   ClientContext context;
243 
244   Status s = serviceStub_.Get()->UnaryCall(&context, request, &response);
245 
246   if (!AssertStatusOk(s, context.debug_error_string())) {
247     return false;
248   }
249 
250   GPR_ASSERT(!response.username().empty());
251   GPR_ASSERT(!response.oauth_scope().empty());
252   GPR_ASSERT(username == response.username());
253   const char* oauth_scope_str = response.oauth_scope().c_str();
254   GPR_ASSERT(oauth_scope.find(oauth_scope_str) != std::string::npos);
255   gpr_log(GPR_DEBUG, "Unary with oauth2 access token credentials done.");
256   return true;
257 }
258 
DoPerRpcCreds(const std::string & json_key)259 bool InteropClient::DoPerRpcCreds(const std::string& json_key) {
260   gpr_log(GPR_DEBUG, "Sending a unary rpc with per-rpc JWT access token ...");
261   SimpleRequest request;
262   SimpleResponse response;
263   request.set_fill_username(true);
264 
265   ClientContext context;
266   std::chrono::seconds token_lifetime = std::chrono::hours(1);
267   std::shared_ptr<CallCredentials> creds =
268       ServiceAccountJWTAccessCredentials(json_key, token_lifetime.count());
269 
270   context.set_credentials(creds);
271 
272   Status s = serviceStub_.Get()->UnaryCall(&context, request, &response);
273 
274   if (!AssertStatusOk(s, context.debug_error_string())) {
275     return false;
276   }
277 
278   GPR_ASSERT(!response.username().empty());
279   GPR_ASSERT(json_key.find(response.username()) != std::string::npos);
280   gpr_log(GPR_DEBUG, "Unary with per-rpc JWT access token done.");
281   return true;
282 }
283 
DoJwtTokenCreds(const std::string & username)284 bool InteropClient::DoJwtTokenCreds(const std::string& username) {
285   gpr_log(GPR_DEBUG,
286           "Sending a large unary rpc with JWT token credentials ...");
287   SimpleRequest request;
288   SimpleResponse response;
289   request.set_fill_username(true);
290 
291   if (!PerformLargeUnary(&request, &response)) {
292     return false;
293   }
294 
295   GPR_ASSERT(!response.username().empty());
296   GPR_ASSERT(username.find(response.username()) != std::string::npos);
297   gpr_log(GPR_DEBUG, "Large unary with JWT token creds done.");
298   return true;
299 }
300 
DoGoogleDefaultCredentials(const std::string & default_service_account)301 bool InteropClient::DoGoogleDefaultCredentials(
302     const std::string& default_service_account) {
303   gpr_log(GPR_DEBUG,
304           "Sending a large unary rpc with GoogleDefaultCredentials...");
305   SimpleRequest request;
306   SimpleResponse response;
307   request.set_fill_username(true);
308 
309   if (!PerformLargeUnary(&request, &response)) {
310     return false;
311   }
312 
313   gpr_log(GPR_DEBUG, "Got username %s", response.username().c_str());
314   GPR_ASSERT(!response.username().empty());
315   GPR_ASSERT(response.username().c_str() == default_service_account);
316   gpr_log(GPR_DEBUG, "Large unary rpc with GoogleDefaultCredentials done.");
317   return true;
318 }
319 
DoLargeUnary()320 bool InteropClient::DoLargeUnary() {
321   gpr_log(GPR_DEBUG, "Sending a large unary rpc...");
322   SimpleRequest request;
323   SimpleResponse response;
324   if (!PerformLargeUnary(&request, &response)) {
325     return false;
326   }
327   gpr_log(GPR_DEBUG, "Large unary done.");
328   return true;
329 }
330 
DoClientCompressedUnary()331 bool InteropClient::DoClientCompressedUnary() {
332   // Probing for compression-checks support.
333   ClientContext probe_context;
334   SimpleRequest probe_req;
335   SimpleResponse probe_res;
336 
337   probe_context.set_compression_algorithm(GRPC_COMPRESS_NONE);
338   probe_req.mutable_expect_compressed()->set_value(true);  // lies!
339 
340   probe_req.set_response_size(kLargeResponseSize);
341   probe_req.mutable_payload()->set_body(std::string(kLargeRequestSize, '\0'));
342 
343   gpr_log(GPR_DEBUG, "Sending probe for compressed unary request.");
344   const Status s =
345       serviceStub_.Get()->UnaryCall(&probe_context, probe_req, &probe_res);
346   if (s.error_code() != grpc::StatusCode::INVALID_ARGUMENT) {
347     // The server isn't able to evaluate incoming compression, making the rest
348     // of this test moot.
349     gpr_log(GPR_DEBUG, "Compressed unary request probe failed");
350     return false;
351   }
352   gpr_log(GPR_DEBUG, "Compressed unary request probe succeeded. Proceeding.");
353 
354   const std::vector<bool> compressions = {true, false};
355   for (size_t i = 0; i < compressions.size(); i++) {
356     std::string log_suffix =
357         absl::StrFormat("(compression=%s)", compressions[i] ? "true" : "false");
358 
359     gpr_log(GPR_DEBUG, "Sending compressed unary request %s.",
360             log_suffix.c_str());
361     SimpleRequest request;
362     SimpleResponse response;
363     request.mutable_expect_compressed()->set_value(compressions[i]);
364     if (!PerformLargeUnary(&request, &response, UnaryCompressionChecks)) {
365       gpr_log(GPR_ERROR, "Compressed unary request failed %s",
366               log_suffix.c_str());
367       return false;
368     }
369 
370     gpr_log(GPR_DEBUG, "Compressed unary request failed %s",
371             log_suffix.c_str());
372   }
373 
374   return true;
375 }
376 
DoServerCompressedUnary()377 bool InteropClient::DoServerCompressedUnary() {
378   const std::vector<bool> compressions = {true, false};
379   for (size_t i = 0; i < compressions.size(); i++) {
380     std::string log_suffix =
381         absl::StrFormat("(compression=%s)", compressions[i] ? "true" : "false");
382 
383     gpr_log(GPR_DEBUG, "Sending unary request for compressed response %s.",
384             log_suffix.c_str());
385     SimpleRequest request;
386     SimpleResponse response;
387     request.mutable_response_compressed()->set_value(compressions[i]);
388 
389     if (!PerformLargeUnary(&request, &response, UnaryCompressionChecks)) {
390       gpr_log(GPR_ERROR, "Request for compressed unary failed %s",
391               log_suffix.c_str());
392       return false;
393     }
394 
395     gpr_log(GPR_DEBUG, "Request for compressed unary failed %s",
396             log_suffix.c_str());
397   }
398 
399   return true;
400 }
401 
402 // Either abort() (unless do_not_abort_on_transient_failures_ is true) or return
403 // false
TransientFailureOrAbort()404 bool InteropClient::TransientFailureOrAbort() {
405   if (do_not_abort_on_transient_failures_) {
406     return false;
407   }
408 
409   abort();
410 }
411 
DoRequestStreaming()412 bool InteropClient::DoRequestStreaming() {
413   gpr_log(GPR_DEBUG, "Sending request steaming rpc ...");
414 
415   ClientContext context;
416   StreamingInputCallRequest request;
417   StreamingInputCallResponse response;
418 
419   std::unique_ptr<ClientWriter<StreamingInputCallRequest>> stream(
420       serviceStub_.Get()->StreamingInputCall(&context, &response));
421 
422   int aggregated_payload_size = 0;
423   for (size_t i = 0; i < request_stream_sizes.size(); ++i) {
424     Payload* payload = request.mutable_payload();
425     payload->set_body(std::string(request_stream_sizes[i], '\0'));
426     if (!stream->Write(request)) {
427       gpr_log(GPR_ERROR, "DoRequestStreaming(): stream->Write() failed");
428       return TransientFailureOrAbort();
429     }
430     aggregated_payload_size += request_stream_sizes[i];
431   }
432   GPR_ASSERT(stream->WritesDone());
433 
434   Status s = stream->Finish();
435   if (!AssertStatusOk(s, context.debug_error_string())) {
436     return false;
437   }
438 
439   GPR_ASSERT(response.aggregated_payload_size() == aggregated_payload_size);
440   return true;
441 }
442 
DoResponseStreaming()443 bool InteropClient::DoResponseStreaming() {
444   gpr_log(GPR_DEBUG, "Receiving response streaming rpc ...");
445 
446   ClientContext context;
447   StreamingOutputCallRequest request;
448   for (unsigned int i = 0; i < response_stream_sizes.size(); ++i) {
449     ResponseParameters* response_parameter = request.add_response_parameters();
450     response_parameter->set_size(response_stream_sizes[i]);
451   }
452   StreamingOutputCallResponse response;
453   std::unique_ptr<ClientReader<StreamingOutputCallResponse>> stream(
454       serviceStub_.Get()->StreamingOutputCall(&context, request));
455 
456   unsigned int i = 0;
457   while (stream->Read(&response)) {
458     GPR_ASSERT(response.payload().body() ==
459                std::string(response_stream_sizes[i], '\0'));
460     ++i;
461   }
462 
463   if (i < response_stream_sizes.size()) {
464     // stream->Read() failed before reading all the expected messages. This is
465     // most likely due to connection failure.
466     gpr_log(GPR_ERROR,
467             "DoResponseStreaming(): Read fewer streams (%d) than "
468             "response_stream_sizes.size() (%" PRIuPTR ")",
469             i, response_stream_sizes.size());
470     return TransientFailureOrAbort();
471   }
472 
473   Status s = stream->Finish();
474   if (!AssertStatusOk(s, context.debug_error_string())) {
475     return false;
476   }
477 
478   gpr_log(GPR_DEBUG, "Response streaming done.");
479   return true;
480 }
481 
DoClientCompressedStreaming()482 bool InteropClient::DoClientCompressedStreaming() {
483   // Probing for compression-checks support.
484   ClientContext probe_context;
485   StreamingInputCallRequest probe_req;
486   StreamingInputCallResponse probe_res;
487 
488   probe_context.set_compression_algorithm(GRPC_COMPRESS_NONE);
489   probe_req.mutable_expect_compressed()->set_value(true);  // lies!
490   probe_req.mutable_payload()->set_body(std::string(27182, '\0'));
491 
492   gpr_log(GPR_DEBUG, "Sending probe for compressed streaming request.");
493 
494   std::unique_ptr<ClientWriter<StreamingInputCallRequest>> probe_stream(
495       serviceStub_.Get()->StreamingInputCall(&probe_context, &probe_res));
496 
497   if (!probe_stream->Write(probe_req)) {
498     gpr_log(GPR_ERROR, "%s(): stream->Write() failed", __func__);
499     return TransientFailureOrAbort();
500   }
501   Status s = probe_stream->Finish();
502   if (s.error_code() != grpc::StatusCode::INVALID_ARGUMENT) {
503     // The server isn't able to evaluate incoming compression, making the rest
504     // of this test moot.
505     gpr_log(GPR_DEBUG, "Compressed streaming request probe failed");
506     return false;
507   }
508   gpr_log(GPR_DEBUG,
509           "Compressed streaming request probe succeeded. Proceeding.");
510 
511   ClientContext context;
512   StreamingInputCallRequest request;
513   StreamingInputCallResponse response;
514 
515   context.set_compression_algorithm(GRPC_COMPRESS_GZIP);
516   std::unique_ptr<ClientWriter<StreamingInputCallRequest>> stream(
517       serviceStub_.Get()->StreamingInputCall(&context, &response));
518 
519   request.mutable_payload()->set_body(std::string(27182, '\0'));
520   request.mutable_expect_compressed()->set_value(true);
521   gpr_log(GPR_DEBUG, "Sending streaming request with compression enabled");
522   if (!stream->Write(request)) {
523     gpr_log(GPR_ERROR, "%s(): stream->Write() failed", __func__);
524     return TransientFailureOrAbort();
525   }
526 
527   WriteOptions wopts;
528   wopts.set_no_compression();
529   request.mutable_payload()->set_body(std::string(45904, '\0'));
530   request.mutable_expect_compressed()->set_value(false);
531   gpr_log(GPR_DEBUG, "Sending streaming request with compression disabled");
532   if (!stream->Write(request, wopts)) {
533     gpr_log(GPR_ERROR, "%s(): stream->Write() failed", __func__);
534     return TransientFailureOrAbort();
535   }
536   GPR_ASSERT(stream->WritesDone());
537 
538   s = stream->Finish();
539   if (!AssertStatusOk(s, context.debug_error_string())) {
540     return false;
541   }
542 
543   return true;
544 }
545 
DoServerCompressedStreaming()546 bool InteropClient::DoServerCompressedStreaming() {
547   const std::vector<bool> compressions = {true, false};
548   const std::vector<int> sizes = {31415, 92653};
549 
550   ClientContext context;
551   InteropClientContextInspector inspector(context);
552   StreamingOutputCallRequest request;
553 
554   GPR_ASSERT(compressions.size() == sizes.size());
555   for (size_t i = 0; i < sizes.size(); i++) {
556     std::string log_suffix =
557         absl::StrFormat("(compression=%s; size=%d)",
558                         compressions[i] ? "true" : "false", sizes[i]);
559 
560     gpr_log(GPR_DEBUG, "Sending request streaming rpc %s.", log_suffix.c_str());
561 
562     ResponseParameters* const response_parameter =
563         request.add_response_parameters();
564     response_parameter->mutable_compressed()->set_value(compressions[i]);
565     response_parameter->set_size(sizes[i]);
566   }
567   std::unique_ptr<ClientReader<StreamingOutputCallResponse>> stream(
568       serviceStub_.Get()->StreamingOutputCall(&context, request));
569 
570   size_t k = 0;
571   StreamingOutputCallResponse response;
572   while (stream->Read(&response)) {
573     // Payload size checks.
574     GPR_ASSERT(response.payload().body() ==
575                std::string(request.response_parameters(k).size(), '\0'));
576 
577     // Compression checks.
578     GPR_ASSERT(request.response_parameters(k).has_compressed());
579     if (request.response_parameters(k).compressed().value()) {
580       GPR_ASSERT(inspector.GetCallCompressionAlgorithm() > GRPC_COMPRESS_NONE);
581       GPR_ASSERT(inspector.WasCompressed());
582     } else {
583       // requested *no* compression.
584       GPR_ASSERT(!(inspector.WasCompressed()));
585     }
586     ++k;
587   }
588 
589   if (k < sizes.size()) {
590     // stream->Read() failed before reading all the expected messages. This
591     // is most likely due to a connection failure.
592     gpr_log(GPR_ERROR,
593             "%s(): Responses read (k=%" PRIuPTR
594             ") is less than the expected number of  messages (%" PRIuPTR ").",
595             __func__, k, sizes.size());
596     return TransientFailureOrAbort();
597   }
598 
599   Status s = stream->Finish();
600   if (!AssertStatusOk(s, context.debug_error_string())) {
601     return false;
602   }
603   return true;
604 }
605 
DoResponseStreamingWithSlowConsumer()606 bool InteropClient::DoResponseStreamingWithSlowConsumer() {
607   gpr_log(GPR_DEBUG, "Receiving response streaming rpc with slow consumer ...");
608 
609   ClientContext context;
610   StreamingOutputCallRequest request;
611 
612   for (int i = 0; i < kNumResponseMessages; ++i) {
613     ResponseParameters* response_parameter = request.add_response_parameters();
614     response_parameter->set_size(kResponseMessageSize);
615   }
616   StreamingOutputCallResponse response;
617   std::unique_ptr<ClientReader<StreamingOutputCallResponse>> stream(
618       serviceStub_.Get()->StreamingOutputCall(&context, request));
619 
620   int i = 0;
621   while (stream->Read(&response)) {
622     GPR_ASSERT(response.payload().body() ==
623                std::string(kResponseMessageSize, '\0'));
624     gpr_log(GPR_DEBUG, "received message %d", i);
625     gpr_sleep_until(gpr_time_add(
626         gpr_now(GPR_CLOCK_REALTIME),
627         gpr_time_from_millis(kReceiveDelayMilliSeconds, GPR_TIMESPAN)));
628     ++i;
629   }
630 
631   if (i < kNumResponseMessages) {
632     gpr_log(GPR_ERROR,
633             "DoResponseStreamingWithSlowConsumer(): Responses read (i=%d) is "
634             "less than the expected messages (i.e kNumResponseMessages = %d)",
635             i, kNumResponseMessages);
636 
637     return TransientFailureOrAbort();
638   }
639 
640   Status s = stream->Finish();
641   if (!AssertStatusOk(s, context.debug_error_string())) {
642     return false;
643   }
644 
645   gpr_log(GPR_DEBUG, "Response streaming done.");
646   return true;
647 }
648 
DoHalfDuplex()649 bool InteropClient::DoHalfDuplex() {
650   gpr_log(GPR_DEBUG, "Sending half-duplex streaming rpc ...");
651 
652   ClientContext context;
653   std::unique_ptr<ClientReaderWriter<StreamingOutputCallRequest,
654                                      StreamingOutputCallResponse>>
655       stream(serviceStub_.Get()->HalfDuplexCall(&context));
656 
657   StreamingOutputCallRequest request;
658   ResponseParameters* response_parameter = request.add_response_parameters();
659   for (unsigned int i = 0; i < response_stream_sizes.size(); ++i) {
660     response_parameter->set_size(response_stream_sizes[i]);
661 
662     if (!stream->Write(request)) {
663       gpr_log(GPR_ERROR, "DoHalfDuplex(): stream->Write() failed. i=%d", i);
664       return TransientFailureOrAbort();
665     }
666   }
667   stream->WritesDone();
668 
669   unsigned int i = 0;
670   StreamingOutputCallResponse response;
671   while (stream->Read(&response)) {
672     GPR_ASSERT(response.payload().body() ==
673                std::string(response_stream_sizes[i], '\0'));
674     ++i;
675   }
676 
677   if (i < response_stream_sizes.size()) {
678     // stream->Read() failed before reading all the expected messages. This is
679     // most likely due to a connection failure
680     gpr_log(GPR_ERROR,
681             "DoHalfDuplex(): Responses read (i=%d) are less than the expected "
682             "number of messages response_stream_sizes.size() (%" PRIuPTR ")",
683             i, response_stream_sizes.size());
684     return TransientFailureOrAbort();
685   }
686 
687   Status s = stream->Finish();
688   if (!AssertStatusOk(s, context.debug_error_string())) {
689     return false;
690   }
691 
692   gpr_log(GPR_DEBUG, "Half-duplex streaming rpc done.");
693   return true;
694 }
695 
DoPingPong()696 bool InteropClient::DoPingPong() {
697   gpr_log(GPR_DEBUG, "Sending Ping Pong streaming rpc ...");
698 
699   ClientContext context;
700   std::unique_ptr<ClientReaderWriter<StreamingOutputCallRequest,
701                                      StreamingOutputCallResponse>>
702       stream(serviceStub_.Get()->FullDuplexCall(&context));
703 
704   StreamingOutputCallRequest request;
705   ResponseParameters* response_parameter = request.add_response_parameters();
706   Payload* payload = request.mutable_payload();
707   StreamingOutputCallResponse response;
708 
709   for (unsigned int i = 0; i < request_stream_sizes.size(); ++i) {
710     response_parameter->set_size(response_stream_sizes[i]);
711     payload->set_body(std::string(request_stream_sizes[i], '\0'));
712 
713     if (!stream->Write(request)) {
714       gpr_log(GPR_ERROR, "DoPingPong(): stream->Write() failed. i: %d", i);
715       return TransientFailureOrAbort();
716     }
717 
718     if (!stream->Read(&response)) {
719       gpr_log(GPR_ERROR, "DoPingPong(): stream->Read() failed. i:%d", i);
720       return TransientFailureOrAbort();
721     }
722 
723     GPR_ASSERT(response.payload().body() ==
724                std::string(response_stream_sizes[i], '\0'));
725   }
726 
727   stream->WritesDone();
728 
729   GPR_ASSERT(!stream->Read(&response));
730 
731   Status s = stream->Finish();
732   if (!AssertStatusOk(s, context.debug_error_string())) {
733     return false;
734   }
735 
736   gpr_log(GPR_DEBUG, "Ping pong streaming done.");
737   return true;
738 }
739 
DoCancelAfterBegin()740 bool InteropClient::DoCancelAfterBegin() {
741   gpr_log(GPR_DEBUG, "Sending request streaming rpc ...");
742 
743   ClientContext context;
744   StreamingInputCallRequest request;
745   StreamingInputCallResponse response;
746 
747   std::unique_ptr<ClientWriter<StreamingInputCallRequest>> stream(
748       serviceStub_.Get()->StreamingInputCall(&context, &response));
749 
750   gpr_log(GPR_DEBUG, "Trying to cancel...");
751   context.TryCancel();
752   Status s = stream->Finish();
753 
754   if (!AssertStatusCode(s, StatusCode::CANCELLED,
755                         context.debug_error_string())) {
756     return false;
757   }
758 
759   gpr_log(GPR_DEBUG, "Canceling streaming done.");
760   return true;
761 }
762 
DoCancelAfterFirstResponse()763 bool InteropClient::DoCancelAfterFirstResponse() {
764   gpr_log(GPR_DEBUG, "Sending Ping Pong streaming rpc ...");
765 
766   ClientContext context;
767   std::unique_ptr<ClientReaderWriter<StreamingOutputCallRequest,
768                                      StreamingOutputCallResponse>>
769       stream(serviceStub_.Get()->FullDuplexCall(&context));
770 
771   StreamingOutputCallRequest request;
772   ResponseParameters* response_parameter = request.add_response_parameters();
773   response_parameter->set_size(31415);
774   request.mutable_payload()->set_body(std::string(27182, '\0'));
775   StreamingOutputCallResponse response;
776 
777   if (!stream->Write(request)) {
778     gpr_log(GPR_ERROR, "DoCancelAfterFirstResponse(): stream->Write() failed");
779     return TransientFailureOrAbort();
780   }
781 
782   if (!stream->Read(&response)) {
783     gpr_log(GPR_ERROR, "DoCancelAfterFirstResponse(): stream->Read failed");
784     return TransientFailureOrAbort();
785   }
786   GPR_ASSERT(response.payload().body() == std::string(31415, '\0'));
787 
788   gpr_log(GPR_DEBUG, "Trying to cancel...");
789   context.TryCancel();
790 
791   Status s = stream->Finish();
792   gpr_log(GPR_DEBUG, "Canceling pingpong streaming done.");
793   return true;
794 }
795 
DoTimeoutOnSleepingServer()796 bool InteropClient::DoTimeoutOnSleepingServer() {
797   gpr_log(GPR_DEBUG,
798           "Sending Ping Pong streaming rpc with a short deadline...");
799 
800   ClientContext context;
801   std::chrono::system_clock::time_point deadline =
802       std::chrono::system_clock::now() + std::chrono::milliseconds(1);
803   context.set_deadline(deadline);
804   std::unique_ptr<ClientReaderWriter<StreamingOutputCallRequest,
805                                      StreamingOutputCallResponse>>
806       stream(serviceStub_.Get()->FullDuplexCall(&context));
807 
808   StreamingOutputCallRequest request;
809   request.mutable_payload()->set_body(std::string(27182, '\0'));
810   stream->Write(request);
811 
812   Status s = stream->Finish();
813   if (!AssertStatusCode(s, StatusCode::DEADLINE_EXCEEDED,
814                         context.debug_error_string())) {
815     return false;
816   }
817 
818   gpr_log(GPR_DEBUG, "Pingpong streaming timeout done.");
819   return true;
820 }
821 
DoEmptyStream()822 bool InteropClient::DoEmptyStream() {
823   gpr_log(GPR_DEBUG, "Starting empty_stream.");
824 
825   ClientContext context;
826   std::unique_ptr<ClientReaderWriter<StreamingOutputCallRequest,
827                                      StreamingOutputCallResponse>>
828       stream(serviceStub_.Get()->FullDuplexCall(&context));
829   stream->WritesDone();
830   StreamingOutputCallResponse response;
831   GPR_ASSERT(stream->Read(&response) == false);
832 
833   Status s = stream->Finish();
834   if (!AssertStatusOk(s, context.debug_error_string())) {
835     return false;
836   }
837 
838   gpr_log(GPR_DEBUG, "empty_stream done.");
839   return true;
840 }
841 
DoStatusWithMessage()842 bool InteropClient::DoStatusWithMessage() {
843   gpr_log(GPR_DEBUG,
844           "Sending RPC with a request for status code 2 and message");
845 
846   const grpc::StatusCode test_code = grpc::StatusCode::UNKNOWN;
847   const std::string test_msg = "This is a test message";
848 
849   // Test UnaryCall.
850   ClientContext context;
851   SimpleRequest request;
852   SimpleResponse response;
853   EchoStatus* requested_status = request.mutable_response_status();
854   requested_status->set_code(test_code);
855   requested_status->set_message(test_msg);
856   Status s = serviceStub_.Get()->UnaryCall(&context, request, &response);
857   if (!AssertStatusCode(s, grpc::StatusCode::UNKNOWN,
858                         context.debug_error_string())) {
859     return false;
860   }
861   GPR_ASSERT(s.error_message() == test_msg);
862 
863   // Test FullDuplexCall.
864   ClientContext stream_context;
865   std::shared_ptr<ClientReaderWriter<StreamingOutputCallRequest,
866                                      StreamingOutputCallResponse>>
867       stream(serviceStub_.Get()->FullDuplexCall(&stream_context));
868   StreamingOutputCallRequest streaming_request;
869   requested_status = streaming_request.mutable_response_status();
870   requested_status->set_code(test_code);
871   requested_status->set_message(test_msg);
872   stream->Write(streaming_request);
873   stream->WritesDone();
874   StreamingOutputCallResponse streaming_response;
875   while (stream->Read(&streaming_response))
876     ;
877   s = stream->Finish();
878   if (!AssertStatusCode(s, grpc::StatusCode::UNKNOWN,
879                         context.debug_error_string())) {
880     return false;
881   }
882   GPR_ASSERT(s.error_message() == test_msg);
883 
884   gpr_log(GPR_DEBUG, "Done testing Status and Message");
885   return true;
886 }
887 
DoCacheableUnary()888 bool InteropClient::DoCacheableUnary() {
889   gpr_log(GPR_DEBUG, "Sending RPC with cacheable response");
890 
891   // Create request with current timestamp
892   gpr_timespec ts = gpr_now(GPR_CLOCK_PRECISE);
893   std::string timestamp =
894       std::to_string(static_cast<long long unsigned>(ts.tv_nsec));
895   SimpleRequest request;
896   request.mutable_payload()->set_body(timestamp.c_str(), timestamp.size());
897 
898   // Request 1
899   ClientContext context1;
900   SimpleResponse response1;
901   context1.set_cacheable(true);
902   // Add fake user IP since some proxy's (GFE) won't cache requests from
903   // localhost.
904   context1.AddMetadata("x-user-ip", "1.2.3.4");
905   Status s1 =
906       serviceStub_.Get()->CacheableUnaryCall(&context1, request, &response1);
907   if (!AssertStatusOk(s1, context1.debug_error_string())) {
908     return false;
909   }
910   gpr_log(GPR_DEBUG, "response 1 payload: %s",
911           response1.payload().body().c_str());
912 
913   // Request 2
914   ClientContext context2;
915   SimpleResponse response2;
916   context2.set_cacheable(true);
917   context2.AddMetadata("x-user-ip", "1.2.3.4");
918   Status s2 =
919       serviceStub_.Get()->CacheableUnaryCall(&context2, request, &response2);
920   if (!AssertStatusOk(s2, context2.debug_error_string())) {
921     return false;
922   }
923   gpr_log(GPR_DEBUG, "response 2 payload: %s",
924           response2.payload().body().c_str());
925 
926   // Check that the body is same for both requests. It will be the same if the
927   // second response is a cached copy of the first response
928   GPR_ASSERT(response2.payload().body() == response1.payload().body());
929 
930   // Request 3
931   // Modify the request body so it will not get a cache hit
932   ts = gpr_now(GPR_CLOCK_PRECISE);
933   timestamp = std::to_string(static_cast<long long unsigned>(ts.tv_nsec));
934   SimpleRequest request1;
935   request1.mutable_payload()->set_body(timestamp.c_str(), timestamp.size());
936   ClientContext context3;
937   SimpleResponse response3;
938   context3.set_cacheable(true);
939   context3.AddMetadata("x-user-ip", "1.2.3.4");
940   Status s3 =
941       serviceStub_.Get()->CacheableUnaryCall(&context3, request1, &response3);
942   if (!AssertStatusOk(s3, context3.debug_error_string())) {
943     return false;
944   }
945   gpr_log(GPR_DEBUG, "response 3 payload: %s",
946           response3.payload().body().c_str());
947 
948   // Check that the response is different from the previous response.
949   GPR_ASSERT(response3.payload().body() != response1.payload().body());
950   return true;
951 }
952 
DoPickFirstUnary()953 bool InteropClient::DoPickFirstUnary() {
954   const int rpcCount = 100;
955   SimpleRequest request;
956   SimpleResponse response;
957   std::string server_id;
958   request.set_fill_server_id(true);
959   for (int i = 0; i < rpcCount; i++) {
960     ClientContext context;
961     Status s = serviceStub_.Get()->UnaryCall(&context, request, &response);
962     if (!AssertStatusOk(s, context.debug_error_string())) {
963       return false;
964     }
965     if (i == 0) {
966       server_id = response.server_id();
967       continue;
968     }
969     if (response.server_id() != server_id) {
970       gpr_log(GPR_ERROR, "#%d rpc hits server_id %s, expect server_id %s", i,
971               response.server_id().c_str(), server_id.c_str());
972       return false;
973     }
974   }
975   gpr_log(GPR_DEBUG, "pick first unary successfully finished");
976   return true;
977 }
978 
DoCustomMetadata()979 bool InteropClient::DoCustomMetadata() {
980   const std::string kEchoInitialMetadataKey("x-grpc-test-echo-initial");
981   const std::string kInitialMetadataValue("test_initial_metadata_value");
982   const std::string kEchoTrailingBinMetadataKey(
983       "x-grpc-test-echo-trailing-bin");
984   const std::string kTrailingBinValue("\x0a\x0b\x0a\x0b\x0a\x0b");
985   ;
986 
987   {
988     gpr_log(GPR_DEBUG, "Sending RPC with custom metadata");
989     ClientContext context;
990     context.AddMetadata(kEchoInitialMetadataKey, kInitialMetadataValue);
991     context.AddMetadata(kEchoTrailingBinMetadataKey, kTrailingBinValue);
992     SimpleRequest request;
993     SimpleResponse response;
994     request.set_response_size(kLargeResponseSize);
995     std::string payload(kLargeRequestSize, '\0');
996     request.mutable_payload()->set_body(payload.c_str(), kLargeRequestSize);
997 
998     Status s = serviceStub_.Get()->UnaryCall(&context, request, &response);
999     if (!AssertStatusOk(s, context.debug_error_string())) {
1000       return false;
1001     }
1002 
1003     const auto& server_initial_metadata = context.GetServerInitialMetadata();
1004     auto iter = server_initial_metadata.find(kEchoInitialMetadataKey);
1005     GPR_ASSERT(iter != server_initial_metadata.end());
1006     GPR_ASSERT(iter->second == kInitialMetadataValue);
1007     const auto& server_trailing_metadata = context.GetServerTrailingMetadata();
1008     iter = server_trailing_metadata.find(kEchoTrailingBinMetadataKey);
1009     GPR_ASSERT(iter != server_trailing_metadata.end());
1010     GPR_ASSERT(std::string(iter->second.begin(), iter->second.end()) ==
1011                kTrailingBinValue);
1012 
1013     gpr_log(GPR_DEBUG, "Done testing RPC with custom metadata");
1014   }
1015 
1016   {
1017     gpr_log(GPR_DEBUG, "Sending stream with custom metadata");
1018     ClientContext context;
1019     context.AddMetadata(kEchoInitialMetadataKey, kInitialMetadataValue);
1020     context.AddMetadata(kEchoTrailingBinMetadataKey, kTrailingBinValue);
1021     std::unique_ptr<ClientReaderWriter<StreamingOutputCallRequest,
1022                                        StreamingOutputCallResponse>>
1023         stream(serviceStub_.Get()->FullDuplexCall(&context));
1024 
1025     StreamingOutputCallRequest request;
1026     ResponseParameters* response_parameter = request.add_response_parameters();
1027     response_parameter->set_size(kLargeResponseSize);
1028     std::string payload(kLargeRequestSize, '\0');
1029     request.mutable_payload()->set_body(payload.c_str(), kLargeRequestSize);
1030     StreamingOutputCallResponse response;
1031 
1032     if (!stream->Write(request)) {
1033       gpr_log(GPR_ERROR, "DoCustomMetadata(): stream->Write() failed");
1034       return TransientFailureOrAbort();
1035     }
1036 
1037     stream->WritesDone();
1038 
1039     if (!stream->Read(&response)) {
1040       gpr_log(GPR_ERROR, "DoCustomMetadata(): stream->Read() failed");
1041       return TransientFailureOrAbort();
1042     }
1043 
1044     GPR_ASSERT(response.payload().body() ==
1045                std::string(kLargeResponseSize, '\0'));
1046 
1047     GPR_ASSERT(!stream->Read(&response));
1048 
1049     Status s = stream->Finish();
1050     if (!AssertStatusOk(s, context.debug_error_string())) {
1051       return false;
1052     }
1053 
1054     const auto& server_initial_metadata = context.GetServerInitialMetadata();
1055     auto iter = server_initial_metadata.find(kEchoInitialMetadataKey);
1056     GPR_ASSERT(iter != server_initial_metadata.end());
1057     GPR_ASSERT(iter->second == kInitialMetadataValue);
1058     const auto& server_trailing_metadata = context.GetServerTrailingMetadata();
1059     iter = server_trailing_metadata.find(kEchoTrailingBinMetadataKey);
1060     GPR_ASSERT(iter != server_trailing_metadata.end());
1061     GPR_ASSERT(std::string(iter->second.begin(), iter->second.end()) ==
1062                kTrailingBinValue);
1063 
1064     gpr_log(GPR_DEBUG, "Done testing stream with custom metadata");
1065   }
1066 
1067   return true;
1068 }
1069 
1070 std::tuple<bool, int32_t, std::string>
PerformOneSoakTestIteration(const bool reset_channel,const int32_t max_acceptable_per_iteration_latency_ms)1071 InteropClient::PerformOneSoakTestIteration(
1072     const bool reset_channel,
1073     const int32_t max_acceptable_per_iteration_latency_ms) {
1074   gpr_timespec start = gpr_now(GPR_CLOCK_MONOTONIC);
1075   SimpleRequest request;
1076   SimpleResponse response;
1077   // Don't set the deadline on the RPC, and instead just
1078   // record how long the RPC took and compare. This makes
1079   // debugging easier when looking at failure results.
1080   ClientContext context;
1081   InteropClientContextInspector inspector(context);
1082   request.set_response_size(kLargeResponseSize);
1083   std::string payload(kLargeRequestSize, '\0');
1084   request.mutable_payload()->set_body(payload.c_str(), kLargeRequestSize);
1085   if (reset_channel) {
1086     serviceStub_.ResetChannel();
1087   }
1088   Status s = serviceStub_.Get()->UnaryCall(&context, request, &response);
1089   gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
1090   int32_t elapsed_ms = gpr_time_to_millis(gpr_time_sub(now, start));
1091   if (!s.ok()) {
1092     return std::make_tuple(false, elapsed_ms, context.debug_error_string());
1093   } else if (elapsed_ms > max_acceptable_per_iteration_latency_ms) {
1094     std::string debug_string =
1095         absl::StrFormat("%d ms exceeds max acceptable latency: %d ms.",
1096                         elapsed_ms, max_acceptable_per_iteration_latency_ms);
1097     return std::make_tuple(false, elapsed_ms, std::move(debug_string));
1098   } else {
1099     return std::make_tuple(true, elapsed_ms, "");
1100   }
1101 }
1102 
PerformSoakTest(const bool reset_channel_per_iteration,const int32_t soak_iterations,const int32_t max_failures,const int32_t max_acceptable_per_iteration_latency_ms,const int32_t overall_timeout_seconds)1103 void InteropClient::PerformSoakTest(
1104     const bool reset_channel_per_iteration, const int32_t soak_iterations,
1105     const int32_t max_failures,
1106     const int32_t max_acceptable_per_iteration_latency_ms,
1107     const int32_t overall_timeout_seconds) {
1108   std::vector<std::tuple<bool, int32_t, std::string>> results;
1109   grpc_histogram* latencies_ms_histogram = grpc_histogram_create(
1110       1 /* resolution */,
1111       500 * 1e3 /* largest bucket; 500 seconds is unlikely */);
1112   gpr_timespec overall_deadline = gpr_time_add(
1113       gpr_now(GPR_CLOCK_MONOTONIC),
1114       gpr_time_from_seconds(overall_timeout_seconds, GPR_TIMESPAN));
1115   int32_t iterations_ran = 0;
1116   for (int i = 0;
1117        i < soak_iterations &&
1118        gpr_time_cmp(gpr_now(GPR_CLOCK_MONOTONIC), overall_deadline) < 0;
1119        ++i) {
1120     auto result = PerformOneSoakTestIteration(
1121         reset_channel_per_iteration, max_acceptable_per_iteration_latency_ms);
1122     results.push_back(result);
1123     grpc_histogram_add(latencies_ms_histogram, std::get<1>(result));
1124     iterations_ran++;
1125   }
1126   int total_failures = 0;
1127   for (size_t i = 0; i < results.size(); i++) {
1128     bool success = std::get<0>(results[i]);
1129     int32_t elapsed_ms = std::get<1>(results[i]);
1130     std::string debug_string = std::get<2>(results[i]);
1131     if (!success) {
1132       gpr_log(GPR_DEBUG, "soak iteration: %ld elapsed_ms: %d failed: %s", i,
1133               elapsed_ms, debug_string.c_str());
1134       total_failures++;
1135     } else {
1136       gpr_log(GPR_DEBUG, "soak iteration: %ld elapsed_ms: %d succeeded", i,
1137               elapsed_ms);
1138     }
1139   }
1140   double latency_ms_median =
1141       grpc_histogram_percentile(latencies_ms_histogram, 50);
1142   double latency_ms_90th =
1143       grpc_histogram_percentile(latencies_ms_histogram, 90);
1144   double latency_ms_worst = grpc_histogram_maximum(latencies_ms_histogram);
1145   grpc_histogram_destroy(latencies_ms_histogram);
1146   if (iterations_ran < soak_iterations) {
1147     gpr_log(
1148         GPR_ERROR,
1149         "soak test consumed all %d seconds of time and quit early, only "
1150         "having ran %d out of desired %d iterations. "
1151         "total_failures: %d. "
1152         "max_failures_threshold: %d. "
1153         "median_soak_iteration_latency: %lf ms. "
1154         "90th_soak_iteration_latency: %lf ms. "
1155         "worst_soak_iteration_latency: %lf ms. "
1156         "Some or all of the iterations that did run were unexpectedly slow. "
1157         "See breakdown above for which iterations succeeded, failed, and "
1158         "why for more info.",
1159         overall_timeout_seconds, iterations_ran, soak_iterations,
1160         total_failures, max_failures, latency_ms_median, latency_ms_90th,
1161         latency_ms_worst);
1162     GPR_ASSERT(0);
1163   } else if (total_failures > max_failures) {
1164     gpr_log(GPR_ERROR,
1165             "soak test ran: %d iterations. total_failures: %d exceeds "
1166             "max_failures_threshold: %d. "
1167             "median_soak_iteration_latency: %lf ms. "
1168             "90th_soak_iteration_latency: %lf ms. "
1169             "worst_soak_iteration_latency: %lf ms. "
1170             "See breakdown above for which iterations succeeded, failed, and "
1171             "why for more info.",
1172             soak_iterations, total_failures, max_failures, latency_ms_median,
1173             latency_ms_90th, latency_ms_worst);
1174     GPR_ASSERT(0);
1175   } else {
1176     gpr_log(GPR_INFO,
1177             "soak test ran: %d iterations. total_failures: %d is within "
1178             "max_failures_threshold: %d. "
1179             "median_soak_iteration_latency: %lf ms. "
1180             "90th_soak_iteration_latency: %lf ms. "
1181             "worst_soak_iteration_latency: %lf ms. "
1182             "See breakdown above for which iterations succeeded, failed, and "
1183             "why for more info.",
1184             soak_iterations, total_failures, max_failures, latency_ms_median,
1185             latency_ms_90th, latency_ms_worst);
1186   }
1187 }
1188 
DoRpcSoakTest(int32_t soak_iterations,int32_t max_failures,int64_t max_acceptable_per_iteration_latency_ms,int32_t overall_timeout_seconds)1189 bool InteropClient::DoRpcSoakTest(
1190     int32_t soak_iterations, int32_t max_failures,
1191     int64_t max_acceptable_per_iteration_latency_ms,
1192     int32_t overall_timeout_seconds) {
1193   gpr_log(GPR_DEBUG, "Sending %d RPCs...", soak_iterations);
1194   GPR_ASSERT(soak_iterations > 0);
1195   PerformSoakTest(false /* reset channel per iteration */, soak_iterations,
1196                   max_failures, max_acceptable_per_iteration_latency_ms,
1197                   overall_timeout_seconds);
1198   gpr_log(GPR_DEBUG, "rpc_soak test done.");
1199   return true;
1200 }
1201 
DoChannelSoakTest(int32_t soak_iterations,int32_t max_failures,int64_t max_acceptable_per_iteration_latency_ms,int32_t overall_timeout_seconds)1202 bool InteropClient::DoChannelSoakTest(
1203     int32_t soak_iterations, int32_t max_failures,
1204     int64_t max_acceptable_per_iteration_latency_ms,
1205     int32_t overall_timeout_seconds) {
1206   gpr_log(GPR_DEBUG, "Sending %d RPCs, tearing down the channel each time...",
1207           soak_iterations);
1208   GPR_ASSERT(soak_iterations > 0);
1209   PerformSoakTest(true /* reset channel per iteration */, soak_iterations,
1210                   max_failures, max_acceptable_per_iteration_latency_ms,
1211                   overall_timeout_seconds);
1212   gpr_log(GPR_DEBUG, "channel_soak test done.");
1213   return true;
1214 }
1215 
DoLongLivedChannelTest(int32_t soak_iterations,int32_t iteration_interval)1216 bool InteropClient::DoLongLivedChannelTest(int32_t soak_iterations,
1217                                            int32_t iteration_interval) {
1218   gpr_log(GPR_DEBUG, "Sending %d RPCs...", soak_iterations);
1219   GPR_ASSERT(soak_iterations > 0);
1220   GPR_ASSERT(iteration_interval > 0);
1221   SimpleRequest request;
1222   SimpleResponse response;
1223   int num_failures = 0;
1224   for (int i = 0; i < soak_iterations; ++i) {
1225     gpr_log(GPR_DEBUG, "Sending RPC number %d...", i);
1226     if (!PerformLargeUnary(&request, &response)) {
1227       gpr_log(GPR_ERROR, "Iteration %d failed.", i);
1228       num_failures++;
1229     }
1230     gpr_sleep_until(
1231         gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
1232                      gpr_time_from_seconds(iteration_interval, GPR_TIMESPAN)));
1233   }
1234   if (num_failures == 0) {
1235     gpr_log(GPR_DEBUG, "long_lived_channel test done.");
1236     return true;
1237   } else {
1238     gpr_log(GPR_DEBUG, "long_lived_channel test failed with %d rpc failures.",
1239             num_failures);
1240     return false;
1241   }
1242 }
1243 
DoUnimplementedService()1244 bool InteropClient::DoUnimplementedService() {
1245   gpr_log(GPR_DEBUG, "Sending a request for an unimplemented service...");
1246 
1247   Empty request;
1248   Empty response;
1249   ClientContext context;
1250 
1251   UnimplementedService::Stub* stub = serviceStub_.GetUnimplementedServiceStub();
1252 
1253   Status s = stub->UnimplementedCall(&context, request, &response);
1254 
1255   if (!AssertStatusCode(s, StatusCode::UNIMPLEMENTED,
1256                         context.debug_error_string())) {
1257     return false;
1258   }
1259 
1260   gpr_log(GPR_DEBUG, "unimplemented service done.");
1261   return true;
1262 }
1263 
DoUnimplementedMethod()1264 bool InteropClient::DoUnimplementedMethod() {
1265   gpr_log(GPR_DEBUG, "Sending a request for an unimplemented rpc...");
1266 
1267   Empty request;
1268   Empty response;
1269   ClientContext context;
1270 
1271   Status s =
1272       serviceStub_.Get()->UnimplementedCall(&context, request, &response);
1273 
1274   if (!AssertStatusCode(s, StatusCode::UNIMPLEMENTED,
1275                         context.debug_error_string())) {
1276     return false;
1277   }
1278 
1279   gpr_log(GPR_DEBUG, "unimplemented rpc done.");
1280   return true;
1281 }
1282 
1283 }  // namespace testing
1284 }  // namespace grpc
1285