1 /*
2 *
3 * Copyright 2015-2016 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <cinttypes>
20 #include <fstream>
21 #include <memory>
22 #include <string>
23 #include <type_traits>
24 #include <utility>
25
26 #include "absl/strings/str_format.h"
27
28 #include <grpc/grpc.h>
29 #include <grpc/support/alloc.h>
30 #include <grpc/support/log.h>
31 #include <grpc/support/string_util.h>
32 #include <grpc/support/time.h>
33 #include <grpcpp/channel.h>
34 #include <grpcpp/client_context.h>
35 #include <grpcpp/security/credentials.h>
36
37 #include "src/proto/grpc/testing/empty.pb.h"
38 #include "src/proto/grpc/testing/messages.pb.h"
39 #include "src/proto/grpc/testing/test.grpc.pb.h"
40 #include "test/core/util/histogram.h"
41 #include "test/cpp/interop/client_helper.h"
42 #include "test/cpp/interop/interop_client.h"
43
44 namespace grpc {
45 namespace testing {
46
47 namespace {
// Payload sizes used by the streaming tests; request and response sizes are
// paired by index. The same values are defined by the Java client.
const std::vector<int> request_stream_sizes{27182, 8, 1828, 45904};
const std::vector<int> response_stream_sizes{31415, 9, 2653, 58979};

// Number and size of the responses requested by the slow-consumer test, and
// the pause (in milliseconds) inserted after each response is read.
constexpr int kNumResponseMessages = 2000;
constexpr int kResponseMessageSize = 1030;
constexpr int kReceiveDelayMilliSeconds = 20;

// Request payload size and requested response size for the large unary RPC.
constexpr int kLargeRequestSize = 271828;
constexpr int kLargeResponseSize = 314159;
56
// Checker that performs no verification at all. The two-argument
// PerformLargeUnary() overload passes it as the custom check.
void NoopChecks(const InteropClientContextInspector&, const SimpleRequest*,
                const SimpleResponse*) {}
60
UnaryCompressionChecks(const InteropClientContextInspector & inspector,const SimpleRequest * request,const SimpleResponse *)61 void UnaryCompressionChecks(const InteropClientContextInspector& inspector,
62 const SimpleRequest* request,
63 const SimpleResponse* /*response*/) {
64 const grpc_compression_algorithm received_compression =
65 inspector.GetCallCompressionAlgorithm();
66 if (request->response_compressed().value()) {
67 if (received_compression == GRPC_COMPRESS_NONE) {
68 // Requested some compression, got NONE. This is an error.
69 gpr_log(GPR_ERROR,
70 "Failure: Requested compression but got uncompressed response "
71 "from server.");
72 abort();
73 }
74 GPR_ASSERT(inspector.WasCompressed());
75 } else {
76 // Didn't request compression -> make sure the response is uncompressed
77 GPR_ASSERT(!(inspector.WasCompressed()));
78 }
79 }
80 } // namespace
81
ServiceStub(ChannelCreationFunc channel_creation_func,bool new_stub_every_call)82 InteropClient::ServiceStub::ServiceStub(
83 ChannelCreationFunc channel_creation_func, bool new_stub_every_call)
84 : channel_creation_func_(std::move(channel_creation_func)),
85 channel_(channel_creation_func_()),
86 new_stub_every_call_(new_stub_every_call) {
87 // If new_stub_every_call is false, then this is our chance to initialize
88 // stub_. (see Get())
89 if (!new_stub_every_call) {
90 stub_ = TestService::NewStub(channel_);
91 }
92 }
93
Get()94 TestService::Stub* InteropClient::ServiceStub::Get() {
95 if (new_stub_every_call_) {
96 stub_ = TestService::NewStub(channel_);
97 }
98
99 return stub_.get();
100 }
101
102 UnimplementedService::Stub*
GetUnimplementedServiceStub()103 InteropClient::ServiceStub::GetUnimplementedServiceStub() {
104 if (unimplemented_service_stub_ == nullptr) {
105 unimplemented_service_stub_ = UnimplementedService::NewStub(channel_);
106 }
107 return unimplemented_service_stub_.get();
108 }
109
ResetChannel()110 void InteropClient::ServiceStub::ResetChannel() {
111 channel_ = channel_creation_func_();
112 if (!new_stub_every_call_) {
113 stub_ = TestService::NewStub(channel_);
114 }
115 }
116
InteropClient(ChannelCreationFunc channel_creation_func,bool new_stub_every_test_case,bool do_not_abort_on_transient_failures)117 InteropClient::InteropClient(ChannelCreationFunc channel_creation_func,
118 bool new_stub_every_test_case,
119 bool do_not_abort_on_transient_failures)
120 : serviceStub_(std::move(channel_creation_func), new_stub_every_test_case),
121 do_not_abort_on_transient_failures_(do_not_abort_on_transient_failures) {}
122
AssertStatusOk(const Status & s,const std::string & optional_debug_string)123 bool InteropClient::AssertStatusOk(const Status& s,
124 const std::string& optional_debug_string) {
125 if (s.ok()) {
126 return true;
127 }
128
129 // Note: At this point, s.error_code is definitely not StatusCode::OK (we
130 // already checked for s.ok() above). So, the following will call abort()
131 // (unless s.error_code() corresponds to a transient failure and
132 // 'do_not_abort_on_transient_failures' is true)
133 return AssertStatusCode(s, StatusCode::OK, optional_debug_string);
134 }
135
AssertStatusCode(const Status & s,StatusCode expected_code,const std::string & optional_debug_string)136 bool InteropClient::AssertStatusCode(const Status& s, StatusCode expected_code,
137 const std::string& optional_debug_string) {
138 if (s.error_code() == expected_code) {
139 return true;
140 }
141
142 gpr_log(GPR_ERROR,
143 "Error status code: %d (expected: %d), message: %s,"
144 " debug string: %s",
145 s.error_code(), expected_code, s.error_message().c_str(),
146 optional_debug_string.c_str());
147
148 // In case of transient transient/retryable failures (like a broken
149 // connection) we may or may not abort (see TransientFailureOrAbort())
150 if (s.error_code() == grpc::StatusCode::UNAVAILABLE) {
151 return TransientFailureOrAbort();
152 }
153
154 abort();
155 }
156
DoEmpty()157 bool InteropClient::DoEmpty() {
158 gpr_log(GPR_DEBUG, "Sending an empty rpc...");
159
160 Empty request;
161 Empty response;
162 ClientContext context;
163
164 Status s = serviceStub_.Get()->EmptyCall(&context, request, &response);
165
166 if (!AssertStatusOk(s, context.debug_error_string())) {
167 return false;
168 }
169
170 gpr_log(GPR_DEBUG, "Empty rpc done.");
171 return true;
172 }
173
PerformLargeUnary(SimpleRequest * request,SimpleResponse * response)174 bool InteropClient::PerformLargeUnary(SimpleRequest* request,
175 SimpleResponse* response) {
176 return PerformLargeUnary(request, response, NoopChecks);
177 }
178
PerformLargeUnary(SimpleRequest * request,SimpleResponse * response,const CheckerFn & custom_checks_fn)179 bool InteropClient::PerformLargeUnary(SimpleRequest* request,
180 SimpleResponse* response,
181 const CheckerFn& custom_checks_fn) {
182 ClientContext context;
183 InteropClientContextInspector inspector(context);
184 request->set_response_size(kLargeResponseSize);
185 std::string payload(kLargeRequestSize, '\0');
186 request->mutable_payload()->set_body(payload.c_str(), kLargeRequestSize);
187 if (request->has_expect_compressed()) {
188 if (request->expect_compressed().value()) {
189 context.set_compression_algorithm(GRPC_COMPRESS_GZIP);
190 } else {
191 context.set_compression_algorithm(GRPC_COMPRESS_NONE);
192 }
193 }
194
195 Status s = serviceStub_.Get()->UnaryCall(&context, *request, response);
196 if (!AssertStatusOk(s, context.debug_error_string())) {
197 return false;
198 }
199
200 custom_checks_fn(inspector, request, response);
201
202 // Payload related checks.
203 GPR_ASSERT(response->payload().body() ==
204 std::string(kLargeResponseSize, '\0'));
205 return true;
206 }
207
DoComputeEngineCreds(const std::string & default_service_account,const std::string & oauth_scope)208 bool InteropClient::DoComputeEngineCreds(
209 const std::string& default_service_account,
210 const std::string& oauth_scope) {
211 gpr_log(GPR_DEBUG,
212 "Sending a large unary rpc with compute engine credentials ...");
213 SimpleRequest request;
214 SimpleResponse response;
215 request.set_fill_username(true);
216 request.set_fill_oauth_scope(true);
217
218 if (!PerformLargeUnary(&request, &response)) {
219 return false;
220 }
221
222 gpr_log(GPR_DEBUG, "Got username %s", response.username().c_str());
223 gpr_log(GPR_DEBUG, "Got oauth_scope %s", response.oauth_scope().c_str());
224 GPR_ASSERT(!response.username().empty());
225 GPR_ASSERT(response.username().c_str() == default_service_account);
226 GPR_ASSERT(!response.oauth_scope().empty());
227 const char* oauth_scope_str = response.oauth_scope().c_str();
228 GPR_ASSERT(oauth_scope.find(oauth_scope_str) != std::string::npos);
229 gpr_log(GPR_DEBUG, "Large unary with compute engine creds done.");
230 return true;
231 }
232
DoOauth2AuthToken(const std::string & username,const std::string & oauth_scope)233 bool InteropClient::DoOauth2AuthToken(const std::string& username,
234 const std::string& oauth_scope) {
235 gpr_log(GPR_DEBUG,
236 "Sending a unary rpc with raw oauth2 access token credentials ...");
237 SimpleRequest request;
238 SimpleResponse response;
239 request.set_fill_username(true);
240 request.set_fill_oauth_scope(true);
241
242 ClientContext context;
243
244 Status s = serviceStub_.Get()->UnaryCall(&context, request, &response);
245
246 if (!AssertStatusOk(s, context.debug_error_string())) {
247 return false;
248 }
249
250 GPR_ASSERT(!response.username().empty());
251 GPR_ASSERT(!response.oauth_scope().empty());
252 GPR_ASSERT(username == response.username());
253 const char* oauth_scope_str = response.oauth_scope().c_str();
254 GPR_ASSERT(oauth_scope.find(oauth_scope_str) != std::string::npos);
255 gpr_log(GPR_DEBUG, "Unary with oauth2 access token credentials done.");
256 return true;
257 }
258
DoPerRpcCreds(const std::string & json_key)259 bool InteropClient::DoPerRpcCreds(const std::string& json_key) {
260 gpr_log(GPR_DEBUG, "Sending a unary rpc with per-rpc JWT access token ...");
261 SimpleRequest request;
262 SimpleResponse response;
263 request.set_fill_username(true);
264
265 ClientContext context;
266 std::chrono::seconds token_lifetime = std::chrono::hours(1);
267 std::shared_ptr<CallCredentials> creds =
268 ServiceAccountJWTAccessCredentials(json_key, token_lifetime.count());
269
270 context.set_credentials(creds);
271
272 Status s = serviceStub_.Get()->UnaryCall(&context, request, &response);
273
274 if (!AssertStatusOk(s, context.debug_error_string())) {
275 return false;
276 }
277
278 GPR_ASSERT(!response.username().empty());
279 GPR_ASSERT(json_key.find(response.username()) != std::string::npos);
280 gpr_log(GPR_DEBUG, "Unary with per-rpc JWT access token done.");
281 return true;
282 }
283
DoJwtTokenCreds(const std::string & username)284 bool InteropClient::DoJwtTokenCreds(const std::string& username) {
285 gpr_log(GPR_DEBUG,
286 "Sending a large unary rpc with JWT token credentials ...");
287 SimpleRequest request;
288 SimpleResponse response;
289 request.set_fill_username(true);
290
291 if (!PerformLargeUnary(&request, &response)) {
292 return false;
293 }
294
295 GPR_ASSERT(!response.username().empty());
296 GPR_ASSERT(username.find(response.username()) != std::string::npos);
297 gpr_log(GPR_DEBUG, "Large unary with JWT token creds done.");
298 return true;
299 }
300
DoGoogleDefaultCredentials(const std::string & default_service_account)301 bool InteropClient::DoGoogleDefaultCredentials(
302 const std::string& default_service_account) {
303 gpr_log(GPR_DEBUG,
304 "Sending a large unary rpc with GoogleDefaultCredentials...");
305 SimpleRequest request;
306 SimpleResponse response;
307 request.set_fill_username(true);
308
309 if (!PerformLargeUnary(&request, &response)) {
310 return false;
311 }
312
313 gpr_log(GPR_DEBUG, "Got username %s", response.username().c_str());
314 GPR_ASSERT(!response.username().empty());
315 GPR_ASSERT(response.username().c_str() == default_service_account);
316 gpr_log(GPR_DEBUG, "Large unary rpc with GoogleDefaultCredentials done.");
317 return true;
318 }
319
DoLargeUnary()320 bool InteropClient::DoLargeUnary() {
321 gpr_log(GPR_DEBUG, "Sending a large unary rpc...");
322 SimpleRequest request;
323 SimpleResponse response;
324 if (!PerformLargeUnary(&request, &response)) {
325 return false;
326 }
327 gpr_log(GPR_DEBUG, "Large unary done.");
328 return true;
329 }
330
DoClientCompressedUnary()331 bool InteropClient::DoClientCompressedUnary() {
332 // Probing for compression-checks support.
333 ClientContext probe_context;
334 SimpleRequest probe_req;
335 SimpleResponse probe_res;
336
337 probe_context.set_compression_algorithm(GRPC_COMPRESS_NONE);
338 probe_req.mutable_expect_compressed()->set_value(true); // lies!
339
340 probe_req.set_response_size(kLargeResponseSize);
341 probe_req.mutable_payload()->set_body(std::string(kLargeRequestSize, '\0'));
342
343 gpr_log(GPR_DEBUG, "Sending probe for compressed unary request.");
344 const Status s =
345 serviceStub_.Get()->UnaryCall(&probe_context, probe_req, &probe_res);
346 if (s.error_code() != grpc::StatusCode::INVALID_ARGUMENT) {
347 // The server isn't able to evaluate incoming compression, making the rest
348 // of this test moot.
349 gpr_log(GPR_DEBUG, "Compressed unary request probe failed");
350 return false;
351 }
352 gpr_log(GPR_DEBUG, "Compressed unary request probe succeeded. Proceeding.");
353
354 const std::vector<bool> compressions = {true, false};
355 for (size_t i = 0; i < compressions.size(); i++) {
356 std::string log_suffix =
357 absl::StrFormat("(compression=%s)", compressions[i] ? "true" : "false");
358
359 gpr_log(GPR_DEBUG, "Sending compressed unary request %s.",
360 log_suffix.c_str());
361 SimpleRequest request;
362 SimpleResponse response;
363 request.mutable_expect_compressed()->set_value(compressions[i]);
364 if (!PerformLargeUnary(&request, &response, UnaryCompressionChecks)) {
365 gpr_log(GPR_ERROR, "Compressed unary request failed %s",
366 log_suffix.c_str());
367 return false;
368 }
369
370 gpr_log(GPR_DEBUG, "Compressed unary request failed %s",
371 log_suffix.c_str());
372 }
373
374 return true;
375 }
376
DoServerCompressedUnary()377 bool InteropClient::DoServerCompressedUnary() {
378 const std::vector<bool> compressions = {true, false};
379 for (size_t i = 0; i < compressions.size(); i++) {
380 std::string log_suffix =
381 absl::StrFormat("(compression=%s)", compressions[i] ? "true" : "false");
382
383 gpr_log(GPR_DEBUG, "Sending unary request for compressed response %s.",
384 log_suffix.c_str());
385 SimpleRequest request;
386 SimpleResponse response;
387 request.mutable_response_compressed()->set_value(compressions[i]);
388
389 if (!PerformLargeUnary(&request, &response, UnaryCompressionChecks)) {
390 gpr_log(GPR_ERROR, "Request for compressed unary failed %s",
391 log_suffix.c_str());
392 return false;
393 }
394
395 gpr_log(GPR_DEBUG, "Request for compressed unary failed %s",
396 log_suffix.c_str());
397 }
398
399 return true;
400 }
401
402 // Either abort() (unless do_not_abort_on_transient_failures_ is true) or return
403 // false
TransientFailureOrAbort()404 bool InteropClient::TransientFailureOrAbort() {
405 if (do_not_abort_on_transient_failures_) {
406 return false;
407 }
408
409 abort();
410 }
411
DoRequestStreaming()412 bool InteropClient::DoRequestStreaming() {
413 gpr_log(GPR_DEBUG, "Sending request steaming rpc ...");
414
415 ClientContext context;
416 StreamingInputCallRequest request;
417 StreamingInputCallResponse response;
418
419 std::unique_ptr<ClientWriter<StreamingInputCallRequest>> stream(
420 serviceStub_.Get()->StreamingInputCall(&context, &response));
421
422 int aggregated_payload_size = 0;
423 for (size_t i = 0; i < request_stream_sizes.size(); ++i) {
424 Payload* payload = request.mutable_payload();
425 payload->set_body(std::string(request_stream_sizes[i], '\0'));
426 if (!stream->Write(request)) {
427 gpr_log(GPR_ERROR, "DoRequestStreaming(): stream->Write() failed");
428 return TransientFailureOrAbort();
429 }
430 aggregated_payload_size += request_stream_sizes[i];
431 }
432 GPR_ASSERT(stream->WritesDone());
433
434 Status s = stream->Finish();
435 if (!AssertStatusOk(s, context.debug_error_string())) {
436 return false;
437 }
438
439 GPR_ASSERT(response.aggregated_payload_size() == aggregated_payload_size);
440 return true;
441 }
442
DoResponseStreaming()443 bool InteropClient::DoResponseStreaming() {
444 gpr_log(GPR_DEBUG, "Receiving response streaming rpc ...");
445
446 ClientContext context;
447 StreamingOutputCallRequest request;
448 for (unsigned int i = 0; i < response_stream_sizes.size(); ++i) {
449 ResponseParameters* response_parameter = request.add_response_parameters();
450 response_parameter->set_size(response_stream_sizes[i]);
451 }
452 StreamingOutputCallResponse response;
453 std::unique_ptr<ClientReader<StreamingOutputCallResponse>> stream(
454 serviceStub_.Get()->StreamingOutputCall(&context, request));
455
456 unsigned int i = 0;
457 while (stream->Read(&response)) {
458 GPR_ASSERT(response.payload().body() ==
459 std::string(response_stream_sizes[i], '\0'));
460 ++i;
461 }
462
463 if (i < response_stream_sizes.size()) {
464 // stream->Read() failed before reading all the expected messages. This is
465 // most likely due to connection failure.
466 gpr_log(GPR_ERROR,
467 "DoResponseStreaming(): Read fewer streams (%d) than "
468 "response_stream_sizes.size() (%" PRIuPTR ")",
469 i, response_stream_sizes.size());
470 return TransientFailureOrAbort();
471 }
472
473 Status s = stream->Finish();
474 if (!AssertStatusOk(s, context.debug_error_string())) {
475 return false;
476 }
477
478 gpr_log(GPR_DEBUG, "Response streaming done.");
479 return true;
480 }
481
DoClientCompressedStreaming()482 bool InteropClient::DoClientCompressedStreaming() {
483 // Probing for compression-checks support.
484 ClientContext probe_context;
485 StreamingInputCallRequest probe_req;
486 StreamingInputCallResponse probe_res;
487
488 probe_context.set_compression_algorithm(GRPC_COMPRESS_NONE);
489 probe_req.mutable_expect_compressed()->set_value(true); // lies!
490 probe_req.mutable_payload()->set_body(std::string(27182, '\0'));
491
492 gpr_log(GPR_DEBUG, "Sending probe for compressed streaming request.");
493
494 std::unique_ptr<ClientWriter<StreamingInputCallRequest>> probe_stream(
495 serviceStub_.Get()->StreamingInputCall(&probe_context, &probe_res));
496
497 if (!probe_stream->Write(probe_req)) {
498 gpr_log(GPR_ERROR, "%s(): stream->Write() failed", __func__);
499 return TransientFailureOrAbort();
500 }
501 Status s = probe_stream->Finish();
502 if (s.error_code() != grpc::StatusCode::INVALID_ARGUMENT) {
503 // The server isn't able to evaluate incoming compression, making the rest
504 // of this test moot.
505 gpr_log(GPR_DEBUG, "Compressed streaming request probe failed");
506 return false;
507 }
508 gpr_log(GPR_DEBUG,
509 "Compressed streaming request probe succeeded. Proceeding.");
510
511 ClientContext context;
512 StreamingInputCallRequest request;
513 StreamingInputCallResponse response;
514
515 context.set_compression_algorithm(GRPC_COMPRESS_GZIP);
516 std::unique_ptr<ClientWriter<StreamingInputCallRequest>> stream(
517 serviceStub_.Get()->StreamingInputCall(&context, &response));
518
519 request.mutable_payload()->set_body(std::string(27182, '\0'));
520 request.mutable_expect_compressed()->set_value(true);
521 gpr_log(GPR_DEBUG, "Sending streaming request with compression enabled");
522 if (!stream->Write(request)) {
523 gpr_log(GPR_ERROR, "%s(): stream->Write() failed", __func__);
524 return TransientFailureOrAbort();
525 }
526
527 WriteOptions wopts;
528 wopts.set_no_compression();
529 request.mutable_payload()->set_body(std::string(45904, '\0'));
530 request.mutable_expect_compressed()->set_value(false);
531 gpr_log(GPR_DEBUG, "Sending streaming request with compression disabled");
532 if (!stream->Write(request, wopts)) {
533 gpr_log(GPR_ERROR, "%s(): stream->Write() failed", __func__);
534 return TransientFailureOrAbort();
535 }
536 GPR_ASSERT(stream->WritesDone());
537
538 s = stream->Finish();
539 if (!AssertStatusOk(s, context.debug_error_string())) {
540 return false;
541 }
542
543 return true;
544 }
545
DoServerCompressedStreaming()546 bool InteropClient::DoServerCompressedStreaming() {
547 const std::vector<bool> compressions = {true, false};
548 const std::vector<int> sizes = {31415, 92653};
549
550 ClientContext context;
551 InteropClientContextInspector inspector(context);
552 StreamingOutputCallRequest request;
553
554 GPR_ASSERT(compressions.size() == sizes.size());
555 for (size_t i = 0; i < sizes.size(); i++) {
556 std::string log_suffix =
557 absl::StrFormat("(compression=%s; size=%d)",
558 compressions[i] ? "true" : "false", sizes[i]);
559
560 gpr_log(GPR_DEBUG, "Sending request streaming rpc %s.", log_suffix.c_str());
561
562 ResponseParameters* const response_parameter =
563 request.add_response_parameters();
564 response_parameter->mutable_compressed()->set_value(compressions[i]);
565 response_parameter->set_size(sizes[i]);
566 }
567 std::unique_ptr<ClientReader<StreamingOutputCallResponse>> stream(
568 serviceStub_.Get()->StreamingOutputCall(&context, request));
569
570 size_t k = 0;
571 StreamingOutputCallResponse response;
572 while (stream->Read(&response)) {
573 // Payload size checks.
574 GPR_ASSERT(response.payload().body() ==
575 std::string(request.response_parameters(k).size(), '\0'));
576
577 // Compression checks.
578 GPR_ASSERT(request.response_parameters(k).has_compressed());
579 if (request.response_parameters(k).compressed().value()) {
580 GPR_ASSERT(inspector.GetCallCompressionAlgorithm() > GRPC_COMPRESS_NONE);
581 GPR_ASSERT(inspector.WasCompressed());
582 } else {
583 // requested *no* compression.
584 GPR_ASSERT(!(inspector.WasCompressed()));
585 }
586 ++k;
587 }
588
589 if (k < sizes.size()) {
590 // stream->Read() failed before reading all the expected messages. This
591 // is most likely due to a connection failure.
592 gpr_log(GPR_ERROR,
593 "%s(): Responses read (k=%" PRIuPTR
594 ") is less than the expected number of messages (%" PRIuPTR ").",
595 __func__, k, sizes.size());
596 return TransientFailureOrAbort();
597 }
598
599 Status s = stream->Finish();
600 if (!AssertStatusOk(s, context.debug_error_string())) {
601 return false;
602 }
603 return true;
604 }
605
DoResponseStreamingWithSlowConsumer()606 bool InteropClient::DoResponseStreamingWithSlowConsumer() {
607 gpr_log(GPR_DEBUG, "Receiving response streaming rpc with slow consumer ...");
608
609 ClientContext context;
610 StreamingOutputCallRequest request;
611
612 for (int i = 0; i < kNumResponseMessages; ++i) {
613 ResponseParameters* response_parameter = request.add_response_parameters();
614 response_parameter->set_size(kResponseMessageSize);
615 }
616 StreamingOutputCallResponse response;
617 std::unique_ptr<ClientReader<StreamingOutputCallResponse>> stream(
618 serviceStub_.Get()->StreamingOutputCall(&context, request));
619
620 int i = 0;
621 while (stream->Read(&response)) {
622 GPR_ASSERT(response.payload().body() ==
623 std::string(kResponseMessageSize, '\0'));
624 gpr_log(GPR_DEBUG, "received message %d", i);
625 gpr_sleep_until(gpr_time_add(
626 gpr_now(GPR_CLOCK_REALTIME),
627 gpr_time_from_millis(kReceiveDelayMilliSeconds, GPR_TIMESPAN)));
628 ++i;
629 }
630
631 if (i < kNumResponseMessages) {
632 gpr_log(GPR_ERROR,
633 "DoResponseStreamingWithSlowConsumer(): Responses read (i=%d) is "
634 "less than the expected messages (i.e kNumResponseMessages = %d)",
635 i, kNumResponseMessages);
636
637 return TransientFailureOrAbort();
638 }
639
640 Status s = stream->Finish();
641 if (!AssertStatusOk(s, context.debug_error_string())) {
642 return false;
643 }
644
645 gpr_log(GPR_DEBUG, "Response streaming done.");
646 return true;
647 }
648
DoHalfDuplex()649 bool InteropClient::DoHalfDuplex() {
650 gpr_log(GPR_DEBUG, "Sending half-duplex streaming rpc ...");
651
652 ClientContext context;
653 std::unique_ptr<ClientReaderWriter<StreamingOutputCallRequest,
654 StreamingOutputCallResponse>>
655 stream(serviceStub_.Get()->HalfDuplexCall(&context));
656
657 StreamingOutputCallRequest request;
658 ResponseParameters* response_parameter = request.add_response_parameters();
659 for (unsigned int i = 0; i < response_stream_sizes.size(); ++i) {
660 response_parameter->set_size(response_stream_sizes[i]);
661
662 if (!stream->Write(request)) {
663 gpr_log(GPR_ERROR, "DoHalfDuplex(): stream->Write() failed. i=%d", i);
664 return TransientFailureOrAbort();
665 }
666 }
667 stream->WritesDone();
668
669 unsigned int i = 0;
670 StreamingOutputCallResponse response;
671 while (stream->Read(&response)) {
672 GPR_ASSERT(response.payload().body() ==
673 std::string(response_stream_sizes[i], '\0'));
674 ++i;
675 }
676
677 if (i < response_stream_sizes.size()) {
678 // stream->Read() failed before reading all the expected messages. This is
679 // most likely due to a connection failure
680 gpr_log(GPR_ERROR,
681 "DoHalfDuplex(): Responses read (i=%d) are less than the expected "
682 "number of messages response_stream_sizes.size() (%" PRIuPTR ")",
683 i, response_stream_sizes.size());
684 return TransientFailureOrAbort();
685 }
686
687 Status s = stream->Finish();
688 if (!AssertStatusOk(s, context.debug_error_string())) {
689 return false;
690 }
691
692 gpr_log(GPR_DEBUG, "Half-duplex streaming rpc done.");
693 return true;
694 }
695
DoPingPong()696 bool InteropClient::DoPingPong() {
697 gpr_log(GPR_DEBUG, "Sending Ping Pong streaming rpc ...");
698
699 ClientContext context;
700 std::unique_ptr<ClientReaderWriter<StreamingOutputCallRequest,
701 StreamingOutputCallResponse>>
702 stream(serviceStub_.Get()->FullDuplexCall(&context));
703
704 StreamingOutputCallRequest request;
705 ResponseParameters* response_parameter = request.add_response_parameters();
706 Payload* payload = request.mutable_payload();
707 StreamingOutputCallResponse response;
708
709 for (unsigned int i = 0; i < request_stream_sizes.size(); ++i) {
710 response_parameter->set_size(response_stream_sizes[i]);
711 payload->set_body(std::string(request_stream_sizes[i], '\0'));
712
713 if (!stream->Write(request)) {
714 gpr_log(GPR_ERROR, "DoPingPong(): stream->Write() failed. i: %d", i);
715 return TransientFailureOrAbort();
716 }
717
718 if (!stream->Read(&response)) {
719 gpr_log(GPR_ERROR, "DoPingPong(): stream->Read() failed. i:%d", i);
720 return TransientFailureOrAbort();
721 }
722
723 GPR_ASSERT(response.payload().body() ==
724 std::string(response_stream_sizes[i], '\0'));
725 }
726
727 stream->WritesDone();
728
729 GPR_ASSERT(!stream->Read(&response));
730
731 Status s = stream->Finish();
732 if (!AssertStatusOk(s, context.debug_error_string())) {
733 return false;
734 }
735
736 gpr_log(GPR_DEBUG, "Ping pong streaming done.");
737 return true;
738 }
739
DoCancelAfterBegin()740 bool InteropClient::DoCancelAfterBegin() {
741 gpr_log(GPR_DEBUG, "Sending request streaming rpc ...");
742
743 ClientContext context;
744 StreamingInputCallRequest request;
745 StreamingInputCallResponse response;
746
747 std::unique_ptr<ClientWriter<StreamingInputCallRequest>> stream(
748 serviceStub_.Get()->StreamingInputCall(&context, &response));
749
750 gpr_log(GPR_DEBUG, "Trying to cancel...");
751 context.TryCancel();
752 Status s = stream->Finish();
753
754 if (!AssertStatusCode(s, StatusCode::CANCELLED,
755 context.debug_error_string())) {
756 return false;
757 }
758
759 gpr_log(GPR_DEBUG, "Canceling streaming done.");
760 return true;
761 }
762
DoCancelAfterFirstResponse()763 bool InteropClient::DoCancelAfterFirstResponse() {
764 gpr_log(GPR_DEBUG, "Sending Ping Pong streaming rpc ...");
765
766 ClientContext context;
767 std::unique_ptr<ClientReaderWriter<StreamingOutputCallRequest,
768 StreamingOutputCallResponse>>
769 stream(serviceStub_.Get()->FullDuplexCall(&context));
770
771 StreamingOutputCallRequest request;
772 ResponseParameters* response_parameter = request.add_response_parameters();
773 response_parameter->set_size(31415);
774 request.mutable_payload()->set_body(std::string(27182, '\0'));
775 StreamingOutputCallResponse response;
776
777 if (!stream->Write(request)) {
778 gpr_log(GPR_ERROR, "DoCancelAfterFirstResponse(): stream->Write() failed");
779 return TransientFailureOrAbort();
780 }
781
782 if (!stream->Read(&response)) {
783 gpr_log(GPR_ERROR, "DoCancelAfterFirstResponse(): stream->Read failed");
784 return TransientFailureOrAbort();
785 }
786 GPR_ASSERT(response.payload().body() == std::string(31415, '\0'));
787
788 gpr_log(GPR_DEBUG, "Trying to cancel...");
789 context.TryCancel();
790
791 Status s = stream->Finish();
792 gpr_log(GPR_DEBUG, "Canceling pingpong streaming done.");
793 return true;
794 }
795
DoTimeoutOnSleepingServer()796 bool InteropClient::DoTimeoutOnSleepingServer() {
797 gpr_log(GPR_DEBUG,
798 "Sending Ping Pong streaming rpc with a short deadline...");
799
800 ClientContext context;
801 std::chrono::system_clock::time_point deadline =
802 std::chrono::system_clock::now() + std::chrono::milliseconds(1);
803 context.set_deadline(deadline);
804 std::unique_ptr<ClientReaderWriter<StreamingOutputCallRequest,
805 StreamingOutputCallResponse>>
806 stream(serviceStub_.Get()->FullDuplexCall(&context));
807
808 StreamingOutputCallRequest request;
809 request.mutable_payload()->set_body(std::string(27182, '\0'));
810 stream->Write(request);
811
812 Status s = stream->Finish();
813 if (!AssertStatusCode(s, StatusCode::DEADLINE_EXCEEDED,
814 context.debug_error_string())) {
815 return false;
816 }
817
818 gpr_log(GPR_DEBUG, "Pingpong streaming timeout done.");
819 return true;
820 }
821
DoEmptyStream()822 bool InteropClient::DoEmptyStream() {
823 gpr_log(GPR_DEBUG, "Starting empty_stream.");
824
825 ClientContext context;
826 std::unique_ptr<ClientReaderWriter<StreamingOutputCallRequest,
827 StreamingOutputCallResponse>>
828 stream(serviceStub_.Get()->FullDuplexCall(&context));
829 stream->WritesDone();
830 StreamingOutputCallResponse response;
831 GPR_ASSERT(stream->Read(&response) == false);
832
833 Status s = stream->Finish();
834 if (!AssertStatusOk(s, context.debug_error_string())) {
835 return false;
836 }
837
838 gpr_log(GPR_DEBUG, "empty_stream done.");
839 return true;
840 }
841
DoStatusWithMessage()842 bool InteropClient::DoStatusWithMessage() {
843 gpr_log(GPR_DEBUG,
844 "Sending RPC with a request for status code 2 and message");
845
846 const grpc::StatusCode test_code = grpc::StatusCode::UNKNOWN;
847 const std::string test_msg = "This is a test message";
848
849 // Test UnaryCall.
850 ClientContext context;
851 SimpleRequest request;
852 SimpleResponse response;
853 EchoStatus* requested_status = request.mutable_response_status();
854 requested_status->set_code(test_code);
855 requested_status->set_message(test_msg);
856 Status s = serviceStub_.Get()->UnaryCall(&context, request, &response);
857 if (!AssertStatusCode(s, grpc::StatusCode::UNKNOWN,
858 context.debug_error_string())) {
859 return false;
860 }
861 GPR_ASSERT(s.error_message() == test_msg);
862
863 // Test FullDuplexCall.
864 ClientContext stream_context;
865 std::shared_ptr<ClientReaderWriter<StreamingOutputCallRequest,
866 StreamingOutputCallResponse>>
867 stream(serviceStub_.Get()->FullDuplexCall(&stream_context));
868 StreamingOutputCallRequest streaming_request;
869 requested_status = streaming_request.mutable_response_status();
870 requested_status->set_code(test_code);
871 requested_status->set_message(test_msg);
872 stream->Write(streaming_request);
873 stream->WritesDone();
874 StreamingOutputCallResponse streaming_response;
875 while (stream->Read(&streaming_response))
876 ;
877 s = stream->Finish();
878 if (!AssertStatusCode(s, grpc::StatusCode::UNKNOWN,
879 context.debug_error_string())) {
880 return false;
881 }
882 GPR_ASSERT(s.error_message() == test_msg);
883
884 gpr_log(GPR_DEBUG, "Done testing Status and Message");
885 return true;
886 }
887
DoCacheableUnary()888 bool InteropClient::DoCacheableUnary() {
889 gpr_log(GPR_DEBUG, "Sending RPC with cacheable response");
890
891 // Create request with current timestamp
892 gpr_timespec ts = gpr_now(GPR_CLOCK_PRECISE);
893 std::string timestamp =
894 std::to_string(static_cast<long long unsigned>(ts.tv_nsec));
895 SimpleRequest request;
896 request.mutable_payload()->set_body(timestamp.c_str(), timestamp.size());
897
898 // Request 1
899 ClientContext context1;
900 SimpleResponse response1;
901 context1.set_cacheable(true);
902 // Add fake user IP since some proxy's (GFE) won't cache requests from
903 // localhost.
904 context1.AddMetadata("x-user-ip", "1.2.3.4");
905 Status s1 =
906 serviceStub_.Get()->CacheableUnaryCall(&context1, request, &response1);
907 if (!AssertStatusOk(s1, context1.debug_error_string())) {
908 return false;
909 }
910 gpr_log(GPR_DEBUG, "response 1 payload: %s",
911 response1.payload().body().c_str());
912
913 // Request 2
914 ClientContext context2;
915 SimpleResponse response2;
916 context2.set_cacheable(true);
917 context2.AddMetadata("x-user-ip", "1.2.3.4");
918 Status s2 =
919 serviceStub_.Get()->CacheableUnaryCall(&context2, request, &response2);
920 if (!AssertStatusOk(s2, context2.debug_error_string())) {
921 return false;
922 }
923 gpr_log(GPR_DEBUG, "response 2 payload: %s",
924 response2.payload().body().c_str());
925
926 // Check that the body is same for both requests. It will be the same if the
927 // second response is a cached copy of the first response
928 GPR_ASSERT(response2.payload().body() == response1.payload().body());
929
930 // Request 3
931 // Modify the request body so it will not get a cache hit
932 ts = gpr_now(GPR_CLOCK_PRECISE);
933 timestamp = std::to_string(static_cast<long long unsigned>(ts.tv_nsec));
934 SimpleRequest request1;
935 request1.mutable_payload()->set_body(timestamp.c_str(), timestamp.size());
936 ClientContext context3;
937 SimpleResponse response3;
938 context3.set_cacheable(true);
939 context3.AddMetadata("x-user-ip", "1.2.3.4");
940 Status s3 =
941 serviceStub_.Get()->CacheableUnaryCall(&context3, request1, &response3);
942 if (!AssertStatusOk(s3, context3.debug_error_string())) {
943 return false;
944 }
945 gpr_log(GPR_DEBUG, "response 3 payload: %s",
946 response3.payload().body().c_str());
947
948 // Check that the response is different from the previous response.
949 GPR_ASSERT(response3.payload().body() != response1.payload().body());
950 return true;
951 }
952
DoPickFirstUnary()953 bool InteropClient::DoPickFirstUnary() {
954 const int rpcCount = 100;
955 SimpleRequest request;
956 SimpleResponse response;
957 std::string server_id;
958 request.set_fill_server_id(true);
959 for (int i = 0; i < rpcCount; i++) {
960 ClientContext context;
961 Status s = serviceStub_.Get()->UnaryCall(&context, request, &response);
962 if (!AssertStatusOk(s, context.debug_error_string())) {
963 return false;
964 }
965 if (i == 0) {
966 server_id = response.server_id();
967 continue;
968 }
969 if (response.server_id() != server_id) {
970 gpr_log(GPR_ERROR, "#%d rpc hits server_id %s, expect server_id %s", i,
971 response.server_id().c_str(), server_id.c_str());
972 return false;
973 }
974 }
975 gpr_log(GPR_DEBUG, "pick first unary successfully finished");
976 return true;
977 }
978
DoCustomMetadata()979 bool InteropClient::DoCustomMetadata() {
980 const std::string kEchoInitialMetadataKey("x-grpc-test-echo-initial");
981 const std::string kInitialMetadataValue("test_initial_metadata_value");
982 const std::string kEchoTrailingBinMetadataKey(
983 "x-grpc-test-echo-trailing-bin");
984 const std::string kTrailingBinValue("\x0a\x0b\x0a\x0b\x0a\x0b");
985 ;
986
987 {
988 gpr_log(GPR_DEBUG, "Sending RPC with custom metadata");
989 ClientContext context;
990 context.AddMetadata(kEchoInitialMetadataKey, kInitialMetadataValue);
991 context.AddMetadata(kEchoTrailingBinMetadataKey, kTrailingBinValue);
992 SimpleRequest request;
993 SimpleResponse response;
994 request.set_response_size(kLargeResponseSize);
995 std::string payload(kLargeRequestSize, '\0');
996 request.mutable_payload()->set_body(payload.c_str(), kLargeRequestSize);
997
998 Status s = serviceStub_.Get()->UnaryCall(&context, request, &response);
999 if (!AssertStatusOk(s, context.debug_error_string())) {
1000 return false;
1001 }
1002
1003 const auto& server_initial_metadata = context.GetServerInitialMetadata();
1004 auto iter = server_initial_metadata.find(kEchoInitialMetadataKey);
1005 GPR_ASSERT(iter != server_initial_metadata.end());
1006 GPR_ASSERT(iter->second == kInitialMetadataValue);
1007 const auto& server_trailing_metadata = context.GetServerTrailingMetadata();
1008 iter = server_trailing_metadata.find(kEchoTrailingBinMetadataKey);
1009 GPR_ASSERT(iter != server_trailing_metadata.end());
1010 GPR_ASSERT(std::string(iter->second.begin(), iter->second.end()) ==
1011 kTrailingBinValue);
1012
1013 gpr_log(GPR_DEBUG, "Done testing RPC with custom metadata");
1014 }
1015
1016 {
1017 gpr_log(GPR_DEBUG, "Sending stream with custom metadata");
1018 ClientContext context;
1019 context.AddMetadata(kEchoInitialMetadataKey, kInitialMetadataValue);
1020 context.AddMetadata(kEchoTrailingBinMetadataKey, kTrailingBinValue);
1021 std::unique_ptr<ClientReaderWriter<StreamingOutputCallRequest,
1022 StreamingOutputCallResponse>>
1023 stream(serviceStub_.Get()->FullDuplexCall(&context));
1024
1025 StreamingOutputCallRequest request;
1026 ResponseParameters* response_parameter = request.add_response_parameters();
1027 response_parameter->set_size(kLargeResponseSize);
1028 std::string payload(kLargeRequestSize, '\0');
1029 request.mutable_payload()->set_body(payload.c_str(), kLargeRequestSize);
1030 StreamingOutputCallResponse response;
1031
1032 if (!stream->Write(request)) {
1033 gpr_log(GPR_ERROR, "DoCustomMetadata(): stream->Write() failed");
1034 return TransientFailureOrAbort();
1035 }
1036
1037 stream->WritesDone();
1038
1039 if (!stream->Read(&response)) {
1040 gpr_log(GPR_ERROR, "DoCustomMetadata(): stream->Read() failed");
1041 return TransientFailureOrAbort();
1042 }
1043
1044 GPR_ASSERT(response.payload().body() ==
1045 std::string(kLargeResponseSize, '\0'));
1046
1047 GPR_ASSERT(!stream->Read(&response));
1048
1049 Status s = stream->Finish();
1050 if (!AssertStatusOk(s, context.debug_error_string())) {
1051 return false;
1052 }
1053
1054 const auto& server_initial_metadata = context.GetServerInitialMetadata();
1055 auto iter = server_initial_metadata.find(kEchoInitialMetadataKey);
1056 GPR_ASSERT(iter != server_initial_metadata.end());
1057 GPR_ASSERT(iter->second == kInitialMetadataValue);
1058 const auto& server_trailing_metadata = context.GetServerTrailingMetadata();
1059 iter = server_trailing_metadata.find(kEchoTrailingBinMetadataKey);
1060 GPR_ASSERT(iter != server_trailing_metadata.end());
1061 GPR_ASSERT(std::string(iter->second.begin(), iter->second.end()) ==
1062 kTrailingBinValue);
1063
1064 gpr_log(GPR_DEBUG, "Done testing stream with custom metadata");
1065 }
1066
1067 return true;
1068 }
1069
1070 std::tuple<bool, int32_t, std::string>
PerformOneSoakTestIteration(const bool reset_channel,const int32_t max_acceptable_per_iteration_latency_ms)1071 InteropClient::PerformOneSoakTestIteration(
1072 const bool reset_channel,
1073 const int32_t max_acceptable_per_iteration_latency_ms) {
1074 gpr_timespec start = gpr_now(GPR_CLOCK_MONOTONIC);
1075 SimpleRequest request;
1076 SimpleResponse response;
1077 // Don't set the deadline on the RPC, and instead just
1078 // record how long the RPC took and compare. This makes
1079 // debugging easier when looking at failure results.
1080 ClientContext context;
1081 InteropClientContextInspector inspector(context);
1082 request.set_response_size(kLargeResponseSize);
1083 std::string payload(kLargeRequestSize, '\0');
1084 request.mutable_payload()->set_body(payload.c_str(), kLargeRequestSize);
1085 if (reset_channel) {
1086 serviceStub_.ResetChannel();
1087 }
1088 Status s = serviceStub_.Get()->UnaryCall(&context, request, &response);
1089 gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
1090 int32_t elapsed_ms = gpr_time_to_millis(gpr_time_sub(now, start));
1091 if (!s.ok()) {
1092 return std::make_tuple(false, elapsed_ms, context.debug_error_string());
1093 } else if (elapsed_ms > max_acceptable_per_iteration_latency_ms) {
1094 std::string debug_string =
1095 absl::StrFormat("%d ms exceeds max acceptable latency: %d ms.",
1096 elapsed_ms, max_acceptable_per_iteration_latency_ms);
1097 return std::make_tuple(false, elapsed_ms, std::move(debug_string));
1098 } else {
1099 return std::make_tuple(true, elapsed_ms, "");
1100 }
1101 }
1102
PerformSoakTest(const bool reset_channel_per_iteration,const int32_t soak_iterations,const int32_t max_failures,const int32_t max_acceptable_per_iteration_latency_ms,const int32_t overall_timeout_seconds)1103 void InteropClient::PerformSoakTest(
1104 const bool reset_channel_per_iteration, const int32_t soak_iterations,
1105 const int32_t max_failures,
1106 const int32_t max_acceptable_per_iteration_latency_ms,
1107 const int32_t overall_timeout_seconds) {
1108 std::vector<std::tuple<bool, int32_t, std::string>> results;
1109 grpc_histogram* latencies_ms_histogram = grpc_histogram_create(
1110 1 /* resolution */,
1111 500 * 1e3 /* largest bucket; 500 seconds is unlikely */);
1112 gpr_timespec overall_deadline = gpr_time_add(
1113 gpr_now(GPR_CLOCK_MONOTONIC),
1114 gpr_time_from_seconds(overall_timeout_seconds, GPR_TIMESPAN));
1115 int32_t iterations_ran = 0;
1116 for (int i = 0;
1117 i < soak_iterations &&
1118 gpr_time_cmp(gpr_now(GPR_CLOCK_MONOTONIC), overall_deadline) < 0;
1119 ++i) {
1120 auto result = PerformOneSoakTestIteration(
1121 reset_channel_per_iteration, max_acceptable_per_iteration_latency_ms);
1122 results.push_back(result);
1123 grpc_histogram_add(latencies_ms_histogram, std::get<1>(result));
1124 iterations_ran++;
1125 }
1126 int total_failures = 0;
1127 for (size_t i = 0; i < results.size(); i++) {
1128 bool success = std::get<0>(results[i]);
1129 int32_t elapsed_ms = std::get<1>(results[i]);
1130 std::string debug_string = std::get<2>(results[i]);
1131 if (!success) {
1132 gpr_log(GPR_DEBUG, "soak iteration: %ld elapsed_ms: %d failed: %s", i,
1133 elapsed_ms, debug_string.c_str());
1134 total_failures++;
1135 } else {
1136 gpr_log(GPR_DEBUG, "soak iteration: %ld elapsed_ms: %d succeeded", i,
1137 elapsed_ms);
1138 }
1139 }
1140 double latency_ms_median =
1141 grpc_histogram_percentile(latencies_ms_histogram, 50);
1142 double latency_ms_90th =
1143 grpc_histogram_percentile(latencies_ms_histogram, 90);
1144 double latency_ms_worst = grpc_histogram_maximum(latencies_ms_histogram);
1145 grpc_histogram_destroy(latencies_ms_histogram);
1146 if (iterations_ran < soak_iterations) {
1147 gpr_log(
1148 GPR_ERROR,
1149 "soak test consumed all %d seconds of time and quit early, only "
1150 "having ran %d out of desired %d iterations. "
1151 "total_failures: %d. "
1152 "max_failures_threshold: %d. "
1153 "median_soak_iteration_latency: %lf ms. "
1154 "90th_soak_iteration_latency: %lf ms. "
1155 "worst_soak_iteration_latency: %lf ms. "
1156 "Some or all of the iterations that did run were unexpectedly slow. "
1157 "See breakdown above for which iterations succeeded, failed, and "
1158 "why for more info.",
1159 overall_timeout_seconds, iterations_ran, soak_iterations,
1160 total_failures, max_failures, latency_ms_median, latency_ms_90th,
1161 latency_ms_worst);
1162 GPR_ASSERT(0);
1163 } else if (total_failures > max_failures) {
1164 gpr_log(GPR_ERROR,
1165 "soak test ran: %d iterations. total_failures: %d exceeds "
1166 "max_failures_threshold: %d. "
1167 "median_soak_iteration_latency: %lf ms. "
1168 "90th_soak_iteration_latency: %lf ms. "
1169 "worst_soak_iteration_latency: %lf ms. "
1170 "See breakdown above for which iterations succeeded, failed, and "
1171 "why for more info.",
1172 soak_iterations, total_failures, max_failures, latency_ms_median,
1173 latency_ms_90th, latency_ms_worst);
1174 GPR_ASSERT(0);
1175 } else {
1176 gpr_log(GPR_INFO,
1177 "soak test ran: %d iterations. total_failures: %d is within "
1178 "max_failures_threshold: %d. "
1179 "median_soak_iteration_latency: %lf ms. "
1180 "90th_soak_iteration_latency: %lf ms. "
1181 "worst_soak_iteration_latency: %lf ms. "
1182 "See breakdown above for which iterations succeeded, failed, and "
1183 "why for more info.",
1184 soak_iterations, total_failures, max_failures, latency_ms_median,
1185 latency_ms_90th, latency_ms_worst);
1186 }
1187 }
1188
DoRpcSoakTest(int32_t soak_iterations,int32_t max_failures,int64_t max_acceptable_per_iteration_latency_ms,int32_t overall_timeout_seconds)1189 bool InteropClient::DoRpcSoakTest(
1190 int32_t soak_iterations, int32_t max_failures,
1191 int64_t max_acceptable_per_iteration_latency_ms,
1192 int32_t overall_timeout_seconds) {
1193 gpr_log(GPR_DEBUG, "Sending %d RPCs...", soak_iterations);
1194 GPR_ASSERT(soak_iterations > 0);
1195 PerformSoakTest(false /* reset channel per iteration */, soak_iterations,
1196 max_failures, max_acceptable_per_iteration_latency_ms,
1197 overall_timeout_seconds);
1198 gpr_log(GPR_DEBUG, "rpc_soak test done.");
1199 return true;
1200 }
1201
DoChannelSoakTest(int32_t soak_iterations,int32_t max_failures,int64_t max_acceptable_per_iteration_latency_ms,int32_t overall_timeout_seconds)1202 bool InteropClient::DoChannelSoakTest(
1203 int32_t soak_iterations, int32_t max_failures,
1204 int64_t max_acceptable_per_iteration_latency_ms,
1205 int32_t overall_timeout_seconds) {
1206 gpr_log(GPR_DEBUG, "Sending %d RPCs, tearing down the channel each time...",
1207 soak_iterations);
1208 GPR_ASSERT(soak_iterations > 0);
1209 PerformSoakTest(true /* reset channel per iteration */, soak_iterations,
1210 max_failures, max_acceptable_per_iteration_latency_ms,
1211 overall_timeout_seconds);
1212 gpr_log(GPR_DEBUG, "channel_soak test done.");
1213 return true;
1214 }
1215
DoLongLivedChannelTest(int32_t soak_iterations,int32_t iteration_interval)1216 bool InteropClient::DoLongLivedChannelTest(int32_t soak_iterations,
1217 int32_t iteration_interval) {
1218 gpr_log(GPR_DEBUG, "Sending %d RPCs...", soak_iterations);
1219 GPR_ASSERT(soak_iterations > 0);
1220 GPR_ASSERT(iteration_interval > 0);
1221 SimpleRequest request;
1222 SimpleResponse response;
1223 int num_failures = 0;
1224 for (int i = 0; i < soak_iterations; ++i) {
1225 gpr_log(GPR_DEBUG, "Sending RPC number %d...", i);
1226 if (!PerformLargeUnary(&request, &response)) {
1227 gpr_log(GPR_ERROR, "Iteration %d failed.", i);
1228 num_failures++;
1229 }
1230 gpr_sleep_until(
1231 gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
1232 gpr_time_from_seconds(iteration_interval, GPR_TIMESPAN)));
1233 }
1234 if (num_failures == 0) {
1235 gpr_log(GPR_DEBUG, "long_lived_channel test done.");
1236 return true;
1237 } else {
1238 gpr_log(GPR_DEBUG, "long_lived_channel test failed with %d rpc failures.",
1239 num_failures);
1240 return false;
1241 }
1242 }
1243
DoUnimplementedService()1244 bool InteropClient::DoUnimplementedService() {
1245 gpr_log(GPR_DEBUG, "Sending a request for an unimplemented service...");
1246
1247 Empty request;
1248 Empty response;
1249 ClientContext context;
1250
1251 UnimplementedService::Stub* stub = serviceStub_.GetUnimplementedServiceStub();
1252
1253 Status s = stub->UnimplementedCall(&context, request, &response);
1254
1255 if (!AssertStatusCode(s, StatusCode::UNIMPLEMENTED,
1256 context.debug_error_string())) {
1257 return false;
1258 }
1259
1260 gpr_log(GPR_DEBUG, "unimplemented service done.");
1261 return true;
1262 }
1263
DoUnimplementedMethod()1264 bool InteropClient::DoUnimplementedMethod() {
1265 gpr_log(GPR_DEBUG, "Sending a request for an unimplemented rpc...");
1266
1267 Empty request;
1268 Empty response;
1269 ClientContext context;
1270
1271 Status s =
1272 serviceStub_.Get()->UnimplementedCall(&context, request, &response);
1273
1274 if (!AssertStatusCode(s, StatusCode::UNIMPLEMENTED,
1275 context.debug_error_string())) {
1276 return false;
1277 }
1278
1279 gpr_log(GPR_DEBUG, "unimplemented rpc done.");
1280 return true;
1281 }
1282
1283 } // namespace testing
1284 } // namespace grpc
1285