1 //
2 //
3 // Copyright 2015-2016 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include "test/cpp/interop/interop_client.h"
20
21 #include <grpc/grpc.h>
22 #include <grpc/support/alloc.h>
23 #include <grpc/support/string_util.h>
24 #include <grpc/support/time.h>
25 #include <grpcpp/channel.h>
26 #include <grpcpp/client_context.h>
27 #include <grpcpp/security/credentials.h>
28
29 #include <cinttypes>
30 #include <fstream>
31 #include <memory>
32 #include <string>
33 #include <type_traits>
34 #include <utility>
35
36 #include "absl/cleanup/cleanup.h"
37 #include "absl/log/check.h"
38 #include "absl/log/log.h"
39 #include "absl/strings/match.h"
40 #include "absl/strings/str_format.h"
41 #include "absl/strings/str_join.h"
42 #include "absl/types/optional.h"
43 #include "src/core/config/config_vars.h"
44 #include "src/core/config/core_configuration.h"
45 #include "src/core/util/crash.h"
46 #include "src/proto/grpc/testing/empty.pb.h"
47 #include "src/proto/grpc/testing/messages.pb.h"
48 #include "src/proto/grpc/testing/test.grpc.pb.h"
49 #include "test/core/test_util/histogram.h"
50 #include "test/cpp/interop/backend_metrics_lb_policy.h"
51 #include "test/cpp/interop/client_helper.h"
52
53 namespace grpc {
54 namespace testing {
55
56 namespace {
// Payload sizes (in bytes) for the messages exchanged by the streaming test
// cases. The same values are defined by the Java client.
const std::vector<int> request_stream_sizes{27182, 8, 1828, 45904};
const std::vector<int> response_stream_sizes{31415, 9, 2653, 58979};
// Parameters of DoResponseStreamingWithSlowConsumer(): message count, size of
// each message, and the pause inserted after every received message.
constexpr int kNumResponseMessages = 2000;
constexpr int kResponseMessageSize = 1030;
constexpr int kReceiveDelayMilliSeconds = 20;
// Request/response payload sizes (in bytes) used by the large unary RPCs.
constexpr int kLargeRequestSize = 271828;
constexpr int kLargeResponseSize = 314159;
65
// Checker that validates nothing. It is what the two-argument
// PerformLargeUnary() overload passes as its custom checker.
void NoopChecks(const InteropClientContextInspector&, const SimpleRequest*,
                const SimpleResponse*) {
  // Intentionally empty.
}
69
UnaryCompressionChecks(const InteropClientContextInspector & inspector,const SimpleRequest * request,const SimpleResponse *)70 void UnaryCompressionChecks(const InteropClientContextInspector& inspector,
71 const SimpleRequest* request,
72 const SimpleResponse* /*response*/) {
73 const grpc_compression_algorithm received_compression =
74 inspector.GetCallCompressionAlgorithm();
75 if (request->response_compressed().value()) {
76 if (received_compression == GRPC_COMPRESS_NONE) {
77 // Requested some compression, got NONE. This is an error.
78 grpc_core::Crash(
79 "Failure: Requested compression but got uncompressed response "
80 "from server.");
81 }
82 CHECK(inspector.WasCompressed());
83 } else {
84 // Didn't request compression -> make sure the response is uncompressed
85 CHECK(!(inspector.WasCompressed()));
86 }
87 }
88
ValuesDiff(absl::string_view field,double expected,double actual)89 absl::optional<std::string> ValuesDiff(absl::string_view field, double expected,
90 double actual) {
91 if (expected != actual) {
92 return absl::StrFormat("%s: expected: %f, actual: %f", field, expected,
93 actual);
94 }
95 return absl::nullopt;
96 }
97
98 template <typename Map>
MapsDiff(absl::string_view path,const Map & expected,const Map & actual)99 absl::optional<std::string> MapsDiff(absl::string_view path,
100 const Map& expected, const Map& actual) {
101 auto result = ValuesDiff(absl::StrFormat("%s size", path), expected.size(),
102 actual.size());
103 if (result.has_value()) {
104 return result;
105 }
106 for (const auto& key_value : expected) {
107 auto it = actual.find(key_value.first);
108 if (it == actual.end()) {
109 return absl::StrFormat("In field %s, key %s was not found", path,
110 key_value.first);
111 }
112 result = ValuesDiff(absl::StrFormat("%s/%s", path, key_value.first),
113 key_value.second, it->second);
114 if (result.has_value()) {
115 return result;
116 }
117 }
118 return absl::nullopt;
119 }
120
OrcaLoadReportsDiff(const TestOrcaReport & expected,const TestOrcaReport & actual)121 absl::optional<std::string> OrcaLoadReportsDiff(const TestOrcaReport& expected,
122 const TestOrcaReport& actual) {
123 auto error = ValuesDiff("cpu_utilization", expected.cpu_utilization(),
124 actual.cpu_utilization());
125 if (error.has_value()) {
126 return error;
127 }
128 error = ValuesDiff("mem_utilization", expected.memory_utilization(),
129 actual.memory_utilization());
130 if (error.has_value()) {
131 return error;
132 }
133 error =
134 MapsDiff("request_cost", expected.request_cost(), actual.request_cost());
135 if (error.has_value()) {
136 return error;
137 }
138 error = MapsDiff("utilization", expected.utilization(), actual.utilization());
139 if (error.has_value()) {
140 return error;
141 }
142 return absl::nullopt;
143 }
144 } // namespace
145
ServiceStub(ChannelCreationFunc channel_creation_func,bool new_stub_every_call)146 InteropClient::ServiceStub::ServiceStub(
147 ChannelCreationFunc channel_creation_func, bool new_stub_every_call)
148 : channel_creation_func_(std::move(channel_creation_func)),
149 new_stub_every_call_(new_stub_every_call) {}
150
Get()151 TestService::Stub* InteropClient::ServiceStub::Get() {
152 if (new_stub_every_call_ || stub_ == nullptr) {
153 if (channel_ == nullptr) {
154 channel_ = channel_creation_func_();
155 }
156 stub_ = TestService::NewStub(channel_);
157 }
158 return stub_.get();
159 }
160
161 UnimplementedService::Stub*
GetUnimplementedServiceStub()162 InteropClient::ServiceStub::GetUnimplementedServiceStub() {
163 if (unimplemented_service_stub_ == nullptr) {
164 if (channel_ == nullptr) {
165 channel_ = channel_creation_func_();
166 }
167 unimplemented_service_stub_ = UnimplementedService::NewStub(channel_);
168 }
169 return unimplemented_service_stub_.get();
170 }
171
ResetChannel()172 void InteropClient::ServiceStub::ResetChannel() {
173 channel_.reset();
174 stub_.reset();
175 }
176
InteropClient(ChannelCreationFunc channel_creation_func,bool new_stub_every_test_case,bool do_not_abort_on_transient_failures)177 InteropClient::InteropClient(ChannelCreationFunc channel_creation_func,
178 bool new_stub_every_test_case,
179 bool do_not_abort_on_transient_failures)
180 : serviceStub_(
181 [channel_creation_func = std::move(channel_creation_func), this]() {
182 return channel_creation_func(
183 load_report_tracker_.GetChannelArguments());
184 },
185 new_stub_every_test_case),
186 do_not_abort_on_transient_failures_(do_not_abort_on_transient_failures) {}
187
AssertStatusOk(const Status & s,const std::string & optional_debug_string)188 bool InteropClient::AssertStatusOk(const Status& s,
189 const std::string& optional_debug_string) {
190 if (s.ok()) {
191 return true;
192 }
193
194 // Note: At this point, s.error_code is definitely not StatusCode::OK (we
195 // already checked for s.ok() above). So, the following will call abort()
196 // (unless s.error_code() corresponds to a transient failure and
197 // 'do_not_abort_on_transient_failures' is true)
198 return AssertStatusCode(s, StatusCode::OK, optional_debug_string);
199 }
200
AssertStatusCode(const Status & s,StatusCode expected_code,const std::string & optional_debug_string)201 bool InteropClient::AssertStatusCode(const Status& s, StatusCode expected_code,
202 const std::string& optional_debug_string) {
203 if (s.error_code() == expected_code) {
204 return true;
205 }
206
207 LOG(ERROR) << "Error status code: " << s.error_code()
208 << " (expected: " << expected_code
209 << "), message: " << s.error_message()
210 << ", debug string: " << optional_debug_string;
211
212 // In case of transient transient/retryable failures (like a broken
213 // connection) we may or may not abort (see TransientFailureOrAbort())
214 if (s.error_code() == grpc::StatusCode::UNAVAILABLE) {
215 return TransientFailureOrAbort();
216 }
217
218 abort();
219 }
220
DoEmpty()221 bool InteropClient::DoEmpty() {
222 VLOG(2) << "Sending an empty rpc...";
223
224 Empty request;
225 Empty response;
226 ClientContext context;
227
228 Status s = serviceStub_.Get()->EmptyCall(&context, request, &response);
229
230 if (!AssertStatusOk(s, context.debug_error_string())) {
231 return false;
232 }
233
234 VLOG(2) << "Empty rpc done.";
235 return true;
236 }
237
PerformLargeUnary(SimpleRequest * request,SimpleResponse * response)238 bool InteropClient::PerformLargeUnary(SimpleRequest* request,
239 SimpleResponse* response) {
240 return PerformLargeUnary(request, response, NoopChecks);
241 }
242
PerformLargeUnary(SimpleRequest * request,SimpleResponse * response,const CheckerFn & custom_checks_fn)243 bool InteropClient::PerformLargeUnary(SimpleRequest* request,
244 SimpleResponse* response,
245 const CheckerFn& custom_checks_fn) {
246 ClientContext context;
247 InteropClientContextInspector inspector(context);
248 request->set_response_size(kLargeResponseSize);
249 std::string payload(kLargeRequestSize, '\0');
250 request->mutable_payload()->set_body(payload.c_str(), kLargeRequestSize);
251 if (request->has_expect_compressed()) {
252 if (request->expect_compressed().value()) {
253 context.set_compression_algorithm(GRPC_COMPRESS_GZIP);
254 } else {
255 context.set_compression_algorithm(GRPC_COMPRESS_NONE);
256 }
257 }
258
259 Status s = serviceStub_.Get()->UnaryCall(&context, *request, response);
260 if (!AssertStatusOk(s, context.debug_error_string())) {
261 return false;
262 }
263
264 custom_checks_fn(inspector, request, response);
265
266 // Payload related checks.
267 CHECK(response->payload().body() == std::string(kLargeResponseSize, '\0'));
268 return true;
269 }
270
DoComputeEngineCreds(const std::string & default_service_account,const std::string & oauth_scope)271 bool InteropClient::DoComputeEngineCreds(
272 const std::string& default_service_account,
273 const std::string& oauth_scope) {
274 VLOG(2) << "Sending a large unary rpc with compute engine credentials ...";
275 SimpleRequest request;
276 SimpleResponse response;
277 request.set_fill_username(true);
278 request.set_fill_oauth_scope(true);
279
280 if (!PerformLargeUnary(&request, &response)) {
281 return false;
282 }
283
284 VLOG(2) << "Got username " << response.username();
285 VLOG(2) << "Got oauth_scope " << response.oauth_scope();
286 CHECK(!response.username().empty());
287 CHECK(response.username() == default_service_account);
288 CHECK(!response.oauth_scope().empty());
289 const char* oauth_scope_str = response.oauth_scope().c_str();
290 CHECK(absl::StrContains(oauth_scope, oauth_scope_str));
291 VLOG(2) << "Large unary with compute engine creds done.";
292 return true;
293 }
294
DoOauth2AuthToken(const std::string & username,const std::string & oauth_scope)295 bool InteropClient::DoOauth2AuthToken(const std::string& username,
296 const std::string& oauth_scope) {
297 VLOG(2) << "Sending a unary rpc with raw oauth2 access token credentials ...";
298 SimpleRequest request;
299 SimpleResponse response;
300 request.set_fill_username(true);
301 request.set_fill_oauth_scope(true);
302
303 ClientContext context;
304
305 Status s = serviceStub_.Get()->UnaryCall(&context, request, &response);
306
307 if (!AssertStatusOk(s, context.debug_error_string())) {
308 return false;
309 }
310
311 CHECK(!response.username().empty());
312 CHECK(!response.oauth_scope().empty());
313 CHECK(username == response.username());
314 const char* oauth_scope_str = response.oauth_scope().c_str();
315 CHECK(absl::StrContains(oauth_scope, oauth_scope_str));
316 VLOG(2) << "Unary with oauth2 access token credentials done.";
317 return true;
318 }
319
DoPerRpcCreds(const std::string & json_key)320 bool InteropClient::DoPerRpcCreds(const std::string& json_key) {
321 VLOG(2) << "Sending a unary rpc with per-rpc JWT access token ...";
322 SimpleRequest request;
323 SimpleResponse response;
324 request.set_fill_username(true);
325
326 ClientContext context;
327 std::chrono::seconds token_lifetime = std::chrono::hours(1);
328 std::shared_ptr<CallCredentials> creds =
329 ServiceAccountJWTAccessCredentials(json_key, token_lifetime.count());
330
331 context.set_credentials(creds);
332
333 Status s = serviceStub_.Get()->UnaryCall(&context, request, &response);
334
335 if (!AssertStatusOk(s, context.debug_error_string())) {
336 return false;
337 }
338
339 CHECK(!response.username().empty());
340 CHECK(json_key.find(response.username()) != std::string::npos);
341 VLOG(2) << "Unary with per-rpc JWT access token done.";
342 return true;
343 }
344
DoJwtTokenCreds(const std::string & username)345 bool InteropClient::DoJwtTokenCreds(const std::string& username) {
346 VLOG(2) << "Sending a large unary rpc with JWT token credentials ...";
347 SimpleRequest request;
348 SimpleResponse response;
349 request.set_fill_username(true);
350
351 if (!PerformLargeUnary(&request, &response)) {
352 return false;
353 }
354
355 CHECK(!response.username().empty());
356 CHECK(username.find(response.username()) != std::string::npos);
357 VLOG(2) << "Large unary with JWT token creds done.";
358 return true;
359 }
360
DoGoogleDefaultCredentials(const std::string & default_service_account)361 bool InteropClient::DoGoogleDefaultCredentials(
362 const std::string& default_service_account) {
363 VLOG(2) << "Sending a large unary rpc with GoogleDefaultCredentials...";
364 SimpleRequest request;
365 SimpleResponse response;
366 request.set_fill_username(true);
367
368 if (!PerformLargeUnary(&request, &response)) {
369 return false;
370 }
371
372 VLOG(2) << "Got username " << response.username();
373 CHECK(!response.username().empty());
374 CHECK(response.username() == default_service_account);
375 VLOG(2) << "Large unary rpc with GoogleDefaultCredentials done.";
376 return true;
377 }
378
DoLargeUnary()379 bool InteropClient::DoLargeUnary() {
380 VLOG(2) << "Sending a large unary rpc...";
381 SimpleRequest request;
382 SimpleResponse response;
383 if (!PerformLargeUnary(&request, &response)) {
384 return false;
385 }
386 VLOG(2) << "Large unary done.";
387 return true;
388 }
389
DoClientCompressedUnary()390 bool InteropClient::DoClientCompressedUnary() {
391 // Probing for compression-checks support.
392 ClientContext probe_context;
393 SimpleRequest probe_req;
394 SimpleResponse probe_res;
395
396 probe_context.set_compression_algorithm(GRPC_COMPRESS_NONE);
397 probe_req.mutable_expect_compressed()->set_value(true); // lies!
398
399 probe_req.set_response_size(kLargeResponseSize);
400 probe_req.mutable_payload()->set_body(std::string(kLargeRequestSize, '\0'));
401
402 VLOG(2) << "Sending probe for compressed unary request.";
403 const Status s =
404 serviceStub_.Get()->UnaryCall(&probe_context, probe_req, &probe_res);
405 if (s.error_code() != grpc::StatusCode::INVALID_ARGUMENT) {
406 // The server isn't able to evaluate incoming compression, making the rest
407 // of this test moot.
408 VLOG(2) << "Compressed unary request probe failed";
409 return false;
410 }
411 VLOG(2) << "Compressed unary request probe succeeded. Proceeding.";
412
413 const std::vector<bool> compressions = {true, false};
414 for (size_t i = 0; i < compressions.size(); i++) {
415 std::string log_suffix =
416 absl::StrFormat("(compression=%s)", compressions[i] ? "true" : "false");
417
418 VLOG(2) << "Sending compressed unary request " << log_suffix;
419 SimpleRequest request;
420 SimpleResponse response;
421 request.mutable_expect_compressed()->set_value(compressions[i]);
422 if (!PerformLargeUnary(&request, &response, UnaryCompressionChecks)) {
423 LOG(ERROR) << "Compressed unary request failed " << log_suffix;
424 return false;
425 }
426
427 VLOG(2) << "Compressed unary request failed " << log_suffix;
428 }
429
430 return true;
431 }
432
DoServerCompressedUnary()433 bool InteropClient::DoServerCompressedUnary() {
434 const std::vector<bool> compressions = {true, false};
435 for (size_t i = 0; i < compressions.size(); i++) {
436 std::string log_suffix =
437 absl::StrFormat("(compression=%s)", compressions[i] ? "true" : "false");
438
439 VLOG(2) << "Sending unary request for compressed response " << log_suffix;
440 SimpleRequest request;
441 SimpleResponse response;
442 request.mutable_response_compressed()->set_value(compressions[i]);
443
444 if (!PerformLargeUnary(&request, &response, UnaryCompressionChecks)) {
445 LOG(ERROR) << "Request for compressed unary failed " << log_suffix;
446 return false;
447 }
448
449 VLOG(2) << "Request for compressed unary failed " << log_suffix;
450 }
451
452 return true;
453 }
454
455 // Either abort() (unless do_not_abort_on_transient_failures_ is true) or return
456 // false
TransientFailureOrAbort()457 bool InteropClient::TransientFailureOrAbort() {
458 if (do_not_abort_on_transient_failures_) {
459 return false;
460 }
461
462 abort();
463 }
464
DoRequestStreaming()465 bool InteropClient::DoRequestStreaming() {
466 VLOG(2) << "Sending request steaming rpc ...";
467
468 ClientContext context;
469 StreamingInputCallRequest request;
470 StreamingInputCallResponse response;
471
472 std::unique_ptr<ClientWriter<StreamingInputCallRequest>> stream(
473 serviceStub_.Get()->StreamingInputCall(&context, &response));
474
475 int aggregated_payload_size = 0;
476 for (size_t i = 0; i < request_stream_sizes.size(); ++i) {
477 Payload* payload = request.mutable_payload();
478 payload->set_body(std::string(request_stream_sizes[i], '\0'));
479 if (!stream->Write(request)) {
480 LOG(ERROR) << "DoRequestStreaming(): stream->Write() failed.";
481 return TransientFailureOrAbort();
482 }
483 aggregated_payload_size += request_stream_sizes[i];
484 }
485 CHECK(stream->WritesDone());
486
487 Status s = stream->Finish();
488 if (!AssertStatusOk(s, context.debug_error_string())) {
489 return false;
490 }
491
492 CHECK(response.aggregated_payload_size() == aggregated_payload_size);
493 return true;
494 }
495
DoResponseStreaming()496 bool InteropClient::DoResponseStreaming() {
497 VLOG(2) << "Receiving response streaming rpc ...";
498
499 ClientContext context;
500 StreamingOutputCallRequest request;
501 for (unsigned int i = 0; i < response_stream_sizes.size(); ++i) {
502 ResponseParameters* response_parameter = request.add_response_parameters();
503 response_parameter->set_size(response_stream_sizes[i]);
504 }
505 StreamingOutputCallResponse response;
506 std::unique_ptr<ClientReader<StreamingOutputCallResponse>> stream(
507 serviceStub_.Get()->StreamingOutputCall(&context, request));
508
509 unsigned int i = 0;
510 while (stream->Read(&response)) {
511 CHECK(response.payload().body() ==
512 std::string(response_stream_sizes[i], '\0'));
513 ++i;
514 }
515
516 if (i < response_stream_sizes.size()) {
517 // stream->Read() failed before reading all the expected messages. This is
518 // most likely due to connection failure.
519 LOG(ERROR) << "DoResponseStreaming(): Read fewer streams (" << i
520 << ") than response_stream_sizes.size() ("
521 << response_stream_sizes.size() << ")";
522 return TransientFailureOrAbort();
523 }
524
525 Status s = stream->Finish();
526 if (!AssertStatusOk(s, context.debug_error_string())) {
527 return false;
528 }
529
530 VLOG(2) << "Response streaming done.";
531 return true;
532 }
533
DoClientCompressedStreaming()534 bool InteropClient::DoClientCompressedStreaming() {
535 // Probing for compression-checks support.
536 ClientContext probe_context;
537 StreamingInputCallRequest probe_req;
538 StreamingInputCallResponse probe_res;
539
540 probe_context.set_compression_algorithm(GRPC_COMPRESS_NONE);
541 probe_req.mutable_expect_compressed()->set_value(true); // lies!
542 probe_req.mutable_payload()->set_body(std::string(27182, '\0'));
543
544 VLOG(2) << "Sending probe for compressed streaming request.";
545
546 std::unique_ptr<ClientWriter<StreamingInputCallRequest>> probe_stream(
547 serviceStub_.Get()->StreamingInputCall(&probe_context, &probe_res));
548
549 if (!probe_stream->Write(probe_req)) {
550 LOG(ERROR) << __func__ << "(): stream->Write() failed";
551 return TransientFailureOrAbort();
552 }
553 Status s = probe_stream->Finish();
554 if (s.error_code() != grpc::StatusCode::INVALID_ARGUMENT) {
555 // The server isn't able to evaluate incoming compression, making the rest
556 // of this test moot.
557 VLOG(2) << "Compressed streaming request probe failed";
558 return false;
559 }
560 VLOG(2) << "Compressed streaming request probe succeeded. Proceeding.";
561
562 ClientContext context;
563 StreamingInputCallRequest request;
564 StreamingInputCallResponse response;
565
566 context.set_compression_algorithm(GRPC_COMPRESS_GZIP);
567 std::unique_ptr<ClientWriter<StreamingInputCallRequest>> stream(
568 serviceStub_.Get()->StreamingInputCall(&context, &response));
569
570 request.mutable_payload()->set_body(std::string(27182, '\0'));
571 request.mutable_expect_compressed()->set_value(true);
572 VLOG(2) << "Sending streaming request with compression enabled";
573 if (!stream->Write(request)) {
574 LOG(ERROR) << __func__ << "(): stream->Write() failed.";
575 return TransientFailureOrAbort();
576 }
577
578 WriteOptions wopts;
579 wopts.set_no_compression();
580 request.mutable_payload()->set_body(std::string(45904, '\0'));
581 request.mutable_expect_compressed()->set_value(false);
582 VLOG(2) << "Sending streaming request with compression disabled";
583 if (!stream->Write(request, wopts)) {
584 LOG(ERROR) << __func__ << "(): stream->Write() failed";
585 return TransientFailureOrAbort();
586 }
587 CHECK(stream->WritesDone());
588
589 s = stream->Finish();
590 return AssertStatusOk(s, context.debug_error_string());
591 }
592
DoServerCompressedStreaming()593 bool InteropClient::DoServerCompressedStreaming() {
594 const std::vector<bool> compressions = {true, false};
595 const std::vector<int> sizes = {31415, 92653};
596
597 ClientContext context;
598 InteropClientContextInspector inspector(context);
599 StreamingOutputCallRequest request;
600
601 CHECK(compressions.size() == sizes.size());
602 for (size_t i = 0; i < sizes.size(); i++) {
603 std::string log_suffix =
604 absl::StrFormat("(compression=%s; size=%d)",
605 compressions[i] ? "true" : "false", sizes[i]);
606
607 VLOG(2) << "Sending request streaming rpc " << log_suffix;
608
609 ResponseParameters* const response_parameter =
610 request.add_response_parameters();
611 response_parameter->mutable_compressed()->set_value(compressions[i]);
612 response_parameter->set_size(sizes[i]);
613 }
614 std::unique_ptr<ClientReader<StreamingOutputCallResponse>> stream(
615 serviceStub_.Get()->StreamingOutputCall(&context, request));
616
617 size_t k = 0;
618 StreamingOutputCallResponse response;
619 while (stream->Read(&response)) {
620 // Payload size checks.
621 CHECK(response.payload().body() ==
622 std::string(request.response_parameters(k).size(), '\0'));
623
624 // Compression checks.
625 CHECK(request.response_parameters(k).has_compressed());
626 if (request.response_parameters(k).compressed().value()) {
627 CHECK(inspector.GetCallCompressionAlgorithm() > GRPC_COMPRESS_NONE);
628 CHECK(inspector.WasCompressed());
629 } else {
630 // requested *no* compression.
631 CHECK(!(inspector.WasCompressed()));
632 }
633 ++k;
634 }
635
636 if (k < sizes.size()) {
637 // stream->Read() failed before reading all the expected messages. This
638 // is most likely due to a connection failure.
639 LOG(ERROR) << __func__ << "(): Responses read (k=" << k
640 << ") is less than the expected number of messages ("
641 << sizes.size() << ").";
642 return TransientFailureOrAbort();
643 }
644
645 Status s = stream->Finish();
646 return AssertStatusOk(s, context.debug_error_string());
647 }
648
DoResponseStreamingWithSlowConsumer()649 bool InteropClient::DoResponseStreamingWithSlowConsumer() {
650 VLOG(2) << "Receiving response streaming rpc with slow consumer ...";
651
652 ClientContext context;
653 StreamingOutputCallRequest request;
654
655 for (int i = 0; i < kNumResponseMessages; ++i) {
656 ResponseParameters* response_parameter = request.add_response_parameters();
657 response_parameter->set_size(kResponseMessageSize);
658 }
659 StreamingOutputCallResponse response;
660 std::unique_ptr<ClientReader<StreamingOutputCallResponse>> stream(
661 serviceStub_.Get()->StreamingOutputCall(&context, request));
662
663 int i = 0;
664 while (stream->Read(&response)) {
665 CHECK(response.payload().body() == std::string(kResponseMessageSize, '\0'));
666 VLOG(2) << "received message " << i;
667 gpr_sleep_until(gpr_time_add(
668 gpr_now(GPR_CLOCK_REALTIME),
669 gpr_time_from_millis(kReceiveDelayMilliSeconds, GPR_TIMESPAN)));
670 ++i;
671 }
672
673 if (i < kNumResponseMessages) {
674 LOG(ERROR) << "DoResponseStreamingWithSlowConsumer(): Responses read (i="
675 << i
676 << ") is less than the expected messages (i.e "
677 "kNumResponseMessages = "
678 << kNumResponseMessages << ").";
679
680 return TransientFailureOrAbort();
681 }
682
683 Status s = stream->Finish();
684 if (!AssertStatusOk(s, context.debug_error_string())) {
685 return false;
686 }
687
688 VLOG(2) << "Response streaming done.";
689 return true;
690 }
691
DoHalfDuplex()692 bool InteropClient::DoHalfDuplex() {
693 VLOG(2) << "Sending half-duplex streaming rpc ...";
694
695 ClientContext context;
696 std::unique_ptr<ClientReaderWriter<StreamingOutputCallRequest,
697 StreamingOutputCallResponse>>
698 stream(serviceStub_.Get()->HalfDuplexCall(&context));
699
700 StreamingOutputCallRequest request;
701 ResponseParameters* response_parameter = request.add_response_parameters();
702 for (unsigned int i = 0; i < response_stream_sizes.size(); ++i) {
703 response_parameter->set_size(response_stream_sizes[i]);
704
705 if (!stream->Write(request)) {
706 LOG(ERROR) << "DoHalfDuplex(): stream->Write() failed. i=" << i;
707 return TransientFailureOrAbort();
708 }
709 }
710 stream->WritesDone();
711
712 unsigned int i = 0;
713 StreamingOutputCallResponse response;
714 while (stream->Read(&response)) {
715 CHECK(response.payload().body() ==
716 std::string(response_stream_sizes[i], '\0'));
717 ++i;
718 }
719
720 if (i < response_stream_sizes.size()) {
721 // stream->Read() failed before reading all the expected messages. This is
722 // most likely due to a connection failure
723 LOG(ERROR) << "DoHalfDuplex(): Responses read (i=" << i
724 << ") are less than the expected number of messages "
725 "response_stream_sizes.size() ("
726 << response_stream_sizes.size() << ").";
727 return TransientFailureOrAbort();
728 }
729
730 Status s = stream->Finish();
731 if (!AssertStatusOk(s, context.debug_error_string())) {
732 return false;
733 }
734
735 VLOG(2) << "Half-duplex streaming rpc done.";
736 return true;
737 }
738
DoPingPong()739 bool InteropClient::DoPingPong() {
740 VLOG(2) << "Sending Ping Pong streaming rpc ...";
741
742 ClientContext context;
743 std::unique_ptr<ClientReaderWriter<StreamingOutputCallRequest,
744 StreamingOutputCallResponse>>
745 stream(serviceStub_.Get()->FullDuplexCall(&context));
746
747 StreamingOutputCallRequest request;
748 ResponseParameters* response_parameter = request.add_response_parameters();
749 Payload* payload = request.mutable_payload();
750 StreamingOutputCallResponse response;
751
752 for (unsigned int i = 0; i < request_stream_sizes.size(); ++i) {
753 response_parameter->set_size(response_stream_sizes[i]);
754 payload->set_body(std::string(request_stream_sizes[i], '\0'));
755
756 if (!stream->Write(request)) {
757 LOG(ERROR) << "DoPingPong(): stream->Write() failed. i: " << i;
758 return TransientFailureOrAbort();
759 }
760
761 if (!stream->Read(&response)) {
762 LOG(ERROR) << "DoPingPong(): stream->Read() failed. i:" << i;
763 return TransientFailureOrAbort();
764 }
765
766 CHECK(response.payload().body() ==
767 std::string(response_stream_sizes[i], '\0'));
768 }
769
770 stream->WritesDone();
771
772 CHECK(!stream->Read(&response));
773
774 Status s = stream->Finish();
775 if (!AssertStatusOk(s, context.debug_error_string())) {
776 return false;
777 }
778
779 VLOG(2) << "Ping pong streaming done.";
780 return true;
781 }
782
DoCancelAfterBegin()783 bool InteropClient::DoCancelAfterBegin() {
784 VLOG(2) << "Sending request streaming rpc ...";
785
786 ClientContext context;
787 StreamingInputCallRequest request;
788 StreamingInputCallResponse response;
789
790 std::unique_ptr<ClientWriter<StreamingInputCallRequest>> stream(
791 serviceStub_.Get()->StreamingInputCall(&context, &response));
792
793 VLOG(2) << "Trying to cancel...";
794 context.TryCancel();
795 Status s = stream->Finish();
796
797 if (!AssertStatusCode(s, StatusCode::CANCELLED,
798 context.debug_error_string())) {
799 return false;
800 }
801
802 VLOG(2) << "Canceling streaming done.";
803 return true;
804 }
805
DoCancelAfterFirstResponse()806 bool InteropClient::DoCancelAfterFirstResponse() {
807 VLOG(2) << "Sending Ping Pong streaming rpc ...";
808
809 ClientContext context;
810 std::unique_ptr<ClientReaderWriter<StreamingOutputCallRequest,
811 StreamingOutputCallResponse>>
812 stream(serviceStub_.Get()->FullDuplexCall(&context));
813
814 StreamingOutputCallRequest request;
815 ResponseParameters* response_parameter = request.add_response_parameters();
816 response_parameter->set_size(31415);
817 request.mutable_payload()->set_body(std::string(27182, '\0'));
818 StreamingOutputCallResponse response;
819
820 if (!stream->Write(request)) {
821 LOG(ERROR) << "DoCancelAfterFirstResponse(): stream->Write() failed";
822 return TransientFailureOrAbort();
823 }
824
825 if (!stream->Read(&response)) {
826 LOG(ERROR) << "DoCancelAfterFirstResponse(): stream->Read failed";
827 return TransientFailureOrAbort();
828 }
829 CHECK(response.payload().body() == std::string(31415, '\0'));
830
831 VLOG(2) << "Trying to cancel...";
832 context.TryCancel();
833
834 Status s = stream->Finish();
835 VLOG(2) << "Canceling pingpong streaming done.";
836 return true;
837 }
838
DoTimeoutOnSleepingServer()839 bool InteropClient::DoTimeoutOnSleepingServer() {
840 VLOG(2) << "Sending Ping Pong streaming rpc with a short deadline...";
841
842 ClientContext context;
843 std::chrono::system_clock::time_point deadline =
844 std::chrono::system_clock::now() + std::chrono::milliseconds(1);
845 context.set_deadline(deadline);
846 std::unique_ptr<ClientReaderWriter<StreamingOutputCallRequest,
847 StreamingOutputCallResponse>>
848 stream(serviceStub_.Get()->FullDuplexCall(&context));
849
850 StreamingOutputCallRequest request;
851 request.mutable_payload()->set_body(std::string(27182, '\0'));
852 stream->Write(request);
853
854 Status s = stream->Finish();
855 if (!AssertStatusCode(s, StatusCode::DEADLINE_EXCEEDED,
856 context.debug_error_string())) {
857 return false;
858 }
859
860 VLOG(2) << "Pingpong streaming timeout done.";
861 return true;
862 }
863
DoEmptyStream()864 bool InteropClient::DoEmptyStream() {
865 VLOG(2) << "Starting empty_stream.";
866
867 ClientContext context;
868 std::unique_ptr<ClientReaderWriter<StreamingOutputCallRequest,
869 StreamingOutputCallResponse>>
870 stream(serviceStub_.Get()->FullDuplexCall(&context));
871 stream->WritesDone();
872 StreamingOutputCallResponse response;
873 CHECK(stream->Read(&response) == false);
874
875 Status s = stream->Finish();
876 if (!AssertStatusOk(s, context.debug_error_string())) {
877 return false;
878 }
879
880 VLOG(2) << "empty_stream done.";
881 return true;
882 }
883
DoStatusWithMessage()884 bool InteropClient::DoStatusWithMessage() {
885 VLOG(2) << "Sending RPC with a request for status code 2 and message";
886
887 const grpc::StatusCode test_code = grpc::StatusCode::UNKNOWN;
888 const std::string test_msg = "This is a test message";
889
890 // Test UnaryCall.
891 ClientContext context;
892 SimpleRequest request;
893 SimpleResponse response;
894 EchoStatus* requested_status = request.mutable_response_status();
895 requested_status->set_code(test_code);
896 requested_status->set_message(test_msg);
897 Status s = serviceStub_.Get()->UnaryCall(&context, request, &response);
898 if (!AssertStatusCode(s, grpc::StatusCode::UNKNOWN,
899 context.debug_error_string())) {
900 return false;
901 }
902 CHECK(s.error_message() == test_msg);
903
904 // Test FullDuplexCall.
905 ClientContext stream_context;
906 std::shared_ptr<ClientReaderWriter<StreamingOutputCallRequest,
907 StreamingOutputCallResponse>>
908 stream(serviceStub_.Get()->FullDuplexCall(&stream_context));
909 StreamingOutputCallRequest streaming_request;
910 requested_status = streaming_request.mutable_response_status();
911 requested_status->set_code(test_code);
912 requested_status->set_message(test_msg);
913 stream->Write(streaming_request);
914 stream->WritesDone();
915 StreamingOutputCallResponse streaming_response;
916 while (stream->Read(&streaming_response)) {
917 }
918 s = stream->Finish();
919 if (!AssertStatusCode(s, grpc::StatusCode::UNKNOWN,
920 context.debug_error_string())) {
921 return false;
922 }
923 CHECK(s.error_message() == test_msg);
924
925 VLOG(2) << "Done testing Status and Message";
926 return true;
927 }
928
DoSpecialStatusMessage()929 bool InteropClient::DoSpecialStatusMessage() {
930 VLOG(2) << "Sending RPC with a request for status code 2 and message - "
931 "\\t\\ntest "
932 "with whitespace\\r\\nand Unicode BMP ☺ and non-BMP \\t\\n";
933 const grpc::StatusCode test_code = grpc::StatusCode::UNKNOWN;
934 const std::string test_msg =
935 "\t\ntest with whitespace\r\nand Unicode BMP ☺ and non-BMP \t\n";
936 ClientContext context;
937 SimpleRequest request;
938 SimpleResponse response;
939 EchoStatus* requested_status = request.mutable_response_status();
940 requested_status->set_code(test_code);
941 requested_status->set_message(test_msg);
942 Status s = serviceStub_.Get()->UnaryCall(&context, request, &response);
943 if (!AssertStatusCode(s, grpc::StatusCode::UNKNOWN,
944 context.debug_error_string())) {
945 return false;
946 }
947 CHECK(s.error_message() == test_msg);
948 VLOG(2) << "Done testing Special Status Message";
949 return true;
950 }
951
DoPickFirstUnary()952 bool InteropClient::DoPickFirstUnary() {
953 const int rpcCount = 100;
954 SimpleRequest request;
955 SimpleResponse response;
956 std::string server_id;
957 request.set_fill_server_id(true);
958 for (int i = 0; i < rpcCount; i++) {
959 ClientContext context;
960 Status s = serviceStub_.Get()->UnaryCall(&context, request, &response);
961 if (!AssertStatusOk(s, context.debug_error_string())) {
962 return false;
963 }
964 if (i == 0) {
965 server_id = response.server_id();
966 continue;
967 }
968 if (response.server_id() != server_id) {
969 LOG(ERROR) << "#" << i << " rpc hits server_id " << response.server_id()
970 << ", expect server_id " << server_id;
971 return false;
972 }
973 }
974 VLOG(2) << "pick first unary successfully finished";
975 return true;
976 }
977
DoOrcaPerRpc()978 bool InteropClient::DoOrcaPerRpc() {
979 load_report_tracker_.ResetCollectedLoadReports();
980 grpc_core::CoreConfiguration::RegisterBuilder(RegisterBackendMetricsLbPolicy);
981 VLOG(2) << "testing orca per rpc";
982 SimpleRequest request;
983 SimpleResponse response;
984 ClientContext context;
985 auto orca_report = request.mutable_orca_per_query_report();
986 orca_report->set_cpu_utilization(0.8210);
987 orca_report->set_memory_utilization(0.5847);
988 orca_report->mutable_request_cost()->emplace("cost", 3456.32);
989 orca_report->mutable_utilization()->emplace("util", 0.30499);
990 auto status = serviceStub_.Get()->UnaryCall(&context, request, &response);
991 if (!AssertStatusOk(status, context.debug_error_string())) {
992 return false;
993 }
994 auto report = load_report_tracker_.GetNextLoadReport();
995 CHECK(report.has_value());
996 CHECK(report->has_value());
997 auto comparison_result = OrcaLoadReportsDiff(report->value(), *orca_report);
998 LOG_IF(FATAL, comparison_result.has_value()) << comparison_result->c_str();
999 CHECK(!load_report_tracker_.GetNextLoadReport().has_value());
1000 VLOG(2) << "orca per rpc successfully finished";
1001 return true;
1002 }
1003
DoOrcaOob()1004 bool InteropClient::DoOrcaOob() {
1005 static constexpr auto kTimeout = absl::Seconds(10);
1006 LOG(INFO) << "testing orca oob";
1007 load_report_tracker_.ResetCollectedLoadReports();
1008 // Make the backup poller poll very frequently in order to pick up
1009 // updates from all the subchannels's FDs.
1010 grpc_core::ConfigVars::Overrides overrides;
1011 overrides.client_channel_backup_poll_interval_ms = 250;
1012 grpc_core::ConfigVars::SetOverrides(overrides);
1013 grpc_core::CoreConfiguration::RegisterBuilder(RegisterBackendMetricsLbPolicy);
1014 ClientContext context;
1015 std::unique_ptr<ClientReaderWriter<StreamingOutputCallRequest,
1016 StreamingOutputCallResponse>>
1017 stream(serviceStub_.Get()->FullDuplexCall(&context));
1018 auto stream_cleanup = absl::MakeCleanup([&]() {
1019 CHECK(stream->WritesDone());
1020 CHECK(stream->Finish().ok());
1021 });
1022 {
1023 StreamingOutputCallRequest request;
1024 request.add_response_parameters()->set_size(1);
1025 TestOrcaReport* orca_report = request.mutable_orca_oob_report();
1026 orca_report->set_cpu_utilization(0.8210);
1027 orca_report->set_memory_utilization(0.5847);
1028 orca_report->mutable_utilization()->emplace("util", 0.30499);
1029 StreamingOutputCallResponse response;
1030 if (!stream->Write(request)) {
1031 LOG(ERROR) << "DoOrcaOob(): stream->Write() failed";
1032 return TransientFailureOrAbort();
1033 }
1034 if (!stream->Read(&response)) {
1035 LOG(ERROR) << "DoOrcaOob(): stream->Read failed";
1036 return TransientFailureOrAbort();
1037 }
1038 CHECK(load_report_tracker_
1039 .WaitForOobLoadReport(
1040 [orca_report](const auto& actual) {
1041 auto value = OrcaLoadReportsDiff(*orca_report, actual);
1042 if (value.has_value()) {
1043 VLOG(2) << "Reports mismatch: " << value->c_str();
1044 return false;
1045 }
1046 return true;
1047 },
1048 kTimeout, 10)
1049 .has_value());
1050 }
1051 {
1052 StreamingOutputCallRequest request;
1053 request.add_response_parameters()->set_size(1);
1054 TestOrcaReport* orca_report = request.mutable_orca_oob_report();
1055 orca_report->set_cpu_utilization(0.29309);
1056 orca_report->set_memory_utilization(0.2);
1057 orca_report->mutable_utilization()->emplace("util", 0.2039);
1058 StreamingOutputCallResponse response;
1059 if (!stream->Write(request)) {
1060 LOG(ERROR) << "DoOrcaOob(): stream->Write() failed";
1061 return TransientFailureOrAbort();
1062 }
1063 if (!stream->Read(&response)) {
1064 LOG(ERROR) << "DoOrcaOob(): stream->Read failed";
1065 return TransientFailureOrAbort();
1066 }
1067 CHECK(
1068 load_report_tracker_
1069 .WaitForOobLoadReport(
1070 [orca_report](const auto& report) {
1071 return !OrcaLoadReportsDiff(*orca_report, report).has_value();
1072 },
1073 kTimeout, 10)
1074 .has_value());
1075 }
1076 LOG(INFO) << "orca oob successfully finished";
1077 return true;
1078 }
1079
DoCustomMetadata()1080 bool InteropClient::DoCustomMetadata() {
1081 const std::string kEchoInitialMetadataKey("x-grpc-test-echo-initial");
1082 const std::string kInitialMetadataValue("test_initial_metadata_value");
1083 const std::string kEchoTrailingBinMetadataKey(
1084 "x-grpc-test-echo-trailing-bin");
1085 const std::string kTrailingBinValue("\x0a\x0b\x0a\x0b\x0a\x0b");
1086
1087 {
1088 VLOG(2) << "Sending RPC with custom metadata";
1089 ClientContext context;
1090 context.AddMetadata(kEchoInitialMetadataKey, kInitialMetadataValue);
1091 context.AddMetadata(kEchoTrailingBinMetadataKey, kTrailingBinValue);
1092 SimpleRequest request;
1093 SimpleResponse response;
1094 request.set_response_size(kLargeResponseSize);
1095 std::string payload(kLargeRequestSize, '\0');
1096 request.mutable_payload()->set_body(payload.c_str(), kLargeRequestSize);
1097
1098 Status s = serviceStub_.Get()->UnaryCall(&context, request, &response);
1099 if (!AssertStatusOk(s, context.debug_error_string())) {
1100 return false;
1101 }
1102
1103 const auto& server_initial_metadata = context.GetServerInitialMetadata();
1104 auto iter = server_initial_metadata.find(kEchoInitialMetadataKey);
1105 CHECK(iter != server_initial_metadata.end());
1106 CHECK(iter->second == kInitialMetadataValue);
1107 const auto& server_trailing_metadata = context.GetServerTrailingMetadata();
1108 iter = server_trailing_metadata.find(kEchoTrailingBinMetadataKey);
1109 CHECK(iter != server_trailing_metadata.end());
1110 CHECK(std::string(iter->second.begin(), iter->second.end()) ==
1111 kTrailingBinValue);
1112
1113 VLOG(2) << "Done testing RPC with custom metadata";
1114 }
1115
1116 {
1117 VLOG(2) << "Sending stream with custom metadata";
1118 ClientContext context;
1119 context.AddMetadata(kEchoInitialMetadataKey, kInitialMetadataValue);
1120 context.AddMetadata(kEchoTrailingBinMetadataKey, kTrailingBinValue);
1121 std::unique_ptr<ClientReaderWriter<StreamingOutputCallRequest,
1122 StreamingOutputCallResponse>>
1123 stream(serviceStub_.Get()->FullDuplexCall(&context));
1124
1125 StreamingOutputCallRequest request;
1126 ResponseParameters* response_parameter = request.add_response_parameters();
1127 response_parameter->set_size(kLargeResponseSize);
1128 std::string payload(kLargeRequestSize, '\0');
1129 request.mutable_payload()->set_body(payload.c_str(), kLargeRequestSize);
1130 StreamingOutputCallResponse response;
1131
1132 if (!stream->Write(request)) {
1133 LOG(ERROR) << "DoCustomMetadata(): stream->Write() failed";
1134 return TransientFailureOrAbort();
1135 }
1136
1137 stream->WritesDone();
1138
1139 if (!stream->Read(&response)) {
1140 LOG(ERROR) << "DoCustomMetadata(): stream->Read() failed";
1141 return TransientFailureOrAbort();
1142 }
1143
1144 CHECK(response.payload().body() == std::string(kLargeResponseSize, '\0'));
1145
1146 CHECK(!stream->Read(&response));
1147
1148 Status s = stream->Finish();
1149 if (!AssertStatusOk(s, context.debug_error_string())) {
1150 return false;
1151 }
1152
1153 const auto& server_initial_metadata = context.GetServerInitialMetadata();
1154 auto iter = server_initial_metadata.find(kEchoInitialMetadataKey);
1155 CHECK(iter != server_initial_metadata.end());
1156 CHECK(iter->second == kInitialMetadataValue);
1157 const auto& server_trailing_metadata = context.GetServerTrailingMetadata();
1158 iter = server_trailing_metadata.find(kEchoTrailingBinMetadataKey);
1159 CHECK(iter != server_trailing_metadata.end());
1160 CHECK(std::string(iter->second.begin(), iter->second.end()) ==
1161 kTrailingBinValue);
1162
1163 VLOG(2) << "Done testing stream with custom metadata";
1164 }
1165
1166 return true;
1167 }
1168
1169 std::tuple<bool, int32_t, std::string, std::string>
PerformOneSoakTestIteration(const bool reset_channel,const int32_t max_acceptable_per_iteration_latency_ms,const int32_t request_size,const int32_t response_size)1170 InteropClient::PerformOneSoakTestIteration(
1171 const bool reset_channel,
1172 const int32_t max_acceptable_per_iteration_latency_ms,
1173 const int32_t request_size, const int32_t response_size) {
1174 gpr_timespec start = gpr_now(GPR_CLOCK_MONOTONIC);
1175 SimpleRequest request;
1176 SimpleResponse response;
1177 // Don't set the deadline on the RPC, and instead just
1178 // record how long the RPC took and compare. This makes
1179 // debugging easier when looking at failure results.
1180 ClientContext context;
1181 InteropClientContextInspector inspector(context);
1182 request.set_response_size(response_size);
1183 std::string payload(request_size, '\0');
1184 request.mutable_payload()->set_body(payload.c_str(), request_size);
1185 if (reset_channel) {
1186 serviceStub_.ResetChannel();
1187 }
1188 Status s = serviceStub_.Get()->UnaryCall(&context, request, &response);
1189 gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
1190 int32_t elapsed_ms = gpr_time_to_millis(gpr_time_sub(now, start));
1191 if (!s.ok()) {
1192 return std::make_tuple(false, elapsed_ms, context.debug_error_string(),
1193 context.peer());
1194 } else if (elapsed_ms > max_acceptable_per_iteration_latency_ms) {
1195 std::string debug_string = absl::StrFormat(
1196 "%d ms exceeds max acceptable latency: %d ms, peer: %s", elapsed_ms,
1197 max_acceptable_per_iteration_latency_ms, context.peer());
1198 return std::make_tuple(false, elapsed_ms, std::move(debug_string),
1199 context.peer());
1200 } else {
1201 return std::make_tuple(true, elapsed_ms, "", context.peer());
1202 }
1203 }
1204
PerformSoakTest(const std::string & server_uri,const bool reset_channel_per_iteration,const int32_t soak_iterations,const int32_t max_failures,const int32_t max_acceptable_per_iteration_latency_ms,const int32_t min_time_ms_between_rpcs,const int32_t overall_timeout_seconds,const int32_t request_size,const int32_t response_size)1205 void InteropClient::PerformSoakTest(
1206 const std::string& server_uri, const bool reset_channel_per_iteration,
1207 const int32_t soak_iterations, const int32_t max_failures,
1208 const int32_t max_acceptable_per_iteration_latency_ms,
1209 const int32_t min_time_ms_between_rpcs,
1210 const int32_t overall_timeout_seconds, const int32_t request_size,
1211 const int32_t response_size) {
1212 std::vector<std::tuple<bool, int32_t, std::string, std::string>> results;
1213 grpc_histogram* latencies_ms_histogram = grpc_histogram_create(
1214 1 /* resolution */,
1215 500 * 1e3 /* largest bucket; 500 seconds is unlikely */);
1216 gpr_timespec overall_deadline = gpr_time_add(
1217 gpr_now(GPR_CLOCK_MONOTONIC),
1218 gpr_time_from_seconds(overall_timeout_seconds, GPR_TIMESPAN));
1219 int32_t iterations_ran = 0;
1220 int total_failures = 0;
1221 for (int i = 0;
1222 i < soak_iterations &&
1223 gpr_time_cmp(gpr_now(GPR_CLOCK_MONOTONIC), overall_deadline) < 0;
1224 ++i) {
1225 gpr_timespec earliest_next_start = gpr_time_add(
1226 gpr_now(GPR_CLOCK_MONOTONIC),
1227 gpr_time_from_millis(min_time_ms_between_rpcs, GPR_TIMESPAN));
1228 auto result = PerformOneSoakTestIteration(
1229 reset_channel_per_iteration, max_acceptable_per_iteration_latency_ms,
1230 request_size, response_size);
1231 bool success = std::get<0>(result);
1232 int32_t elapsed_ms = std::get<1>(result);
1233 std::string debug_string = std::get<2>(result);
1234 std::string peer = std::get<3>(result);
1235 results.push_back(result);
1236 if (!success) {
1237 LOG(INFO) << "soak iteration: " << i << " elapsed_ms: " << elapsed_ms
1238 << " peer: " << peer << " server_uri: " << server_uri
1239 << " failed: " << debug_string;
1240 total_failures++;
1241 } else {
1242 LOG(INFO) << "soak iteration: " << i << " elapsed_ms: " << elapsed_ms
1243 << " peer: " << peer << " server_uri: " << server_uri
1244 << " succeeded";
1245 }
1246 grpc_histogram_add(latencies_ms_histogram, std::get<1>(result));
1247 iterations_ran++;
1248 gpr_sleep_until(earliest_next_start);
1249 }
1250 double latency_ms_median =
1251 grpc_histogram_percentile(latencies_ms_histogram, 50);
1252 double latency_ms_90th =
1253 grpc_histogram_percentile(latencies_ms_histogram, 90);
1254 double latency_ms_worst = grpc_histogram_maximum(latencies_ms_histogram);
1255 grpc_histogram_destroy(latencies_ms_histogram);
1256 if (iterations_ran < soak_iterations) {
1257 LOG(ERROR) << "(server_uri: " << server_uri << ") soak test consumed all "
1258 << overall_timeout_seconds
1259 << " seconds of time and quit early, only having ran "
1260 << iterations_ran << " out of desired " << soak_iterations
1261 << " iterations. total_failures: " << total_failures
1262 << ". max_failures_threshold: " << max_failures
1263 << ". median_soak_iteration_latency: " << latency_ms_median
1264 << " ms. 90th_soak_iteration_latency: " << latency_ms_90th
1265 << " ms. worst_soak_iteration_latency: " << latency_ms_worst
1266 << " ms. Some or all of the iterations that did run were "
1267 "unexpectedly slow. See breakdown above for which iterations "
1268 "succeeded, failed, and why for more info.";
1269 CHECK(0);
1270 } else if (total_failures > max_failures) {
1271 LOG(ERROR) << "(server_uri: " << server_uri
1272 << ") soak test ran: " << soak_iterations
1273 << " iterations. total_failures: " << total_failures
1274 << " exceeds max_failures_threshold: " << max_failures
1275 << ". median_soak_iteration_latency: " << latency_ms_median
1276 << " ms. 90th_soak_iteration_latency: " << latency_ms_90th
1277 << " ms. worst_soak_iteration_latency: " << latency_ms_worst
1278 << " ms. See breakdown above for which iterations succeeded, "
1279 "failed, and why for more info.";
1280 CHECK(0);
1281 } else {
1282 LOG(INFO) << "(server_uri: " << server_uri
1283 << ") soak test ran: " << soak_iterations
1284 << " iterations. total_failures: " << total_failures
1285 << " is within max_failures_threshold: " << max_failures
1286 << ". median_soak_iteration_latency: " << latency_ms_median
1287 << " ms. 90th_soak_iteration_latency: " << latency_ms_90th
1288 << " ms. worst_soak_iteration_latency: " << latency_ms_worst
1289 << " ms. See breakdown above for which iterations succeeded, "
1290 "failed, and why for more info.";
1291 }
1292 }
1293
DoRpcSoakTest(const std::string & server_uri,int32_t soak_iterations,int32_t max_failures,int64_t max_acceptable_per_iteration_latency_ms,int32_t soak_min_time_ms_between_rpcs,int32_t overall_timeout_seconds,int32_t request_size,int32_t response_size)1294 bool InteropClient::DoRpcSoakTest(
1295 const std::string& server_uri, int32_t soak_iterations,
1296 int32_t max_failures, int64_t max_acceptable_per_iteration_latency_ms,
1297 int32_t soak_min_time_ms_between_rpcs, int32_t overall_timeout_seconds,
1298 int32_t request_size, int32_t response_size) {
1299 VLOG(2) << "Sending " << soak_iterations << " RPCs...";
1300 CHECK_GT(soak_iterations, 0);
1301 PerformSoakTest(server_uri, false /* reset channel per iteration */,
1302 soak_iterations, max_failures,
1303 max_acceptable_per_iteration_latency_ms,
1304 soak_min_time_ms_between_rpcs, overall_timeout_seconds,
1305 request_size, response_size);
1306 VLOG(2) << "rpc_soak test done.";
1307 return true;
1308 }
1309
DoChannelSoakTest(const std::string & server_uri,int32_t soak_iterations,int32_t max_failures,int64_t max_acceptable_per_iteration_latency_ms,int32_t soak_min_time_ms_between_rpcs,int32_t overall_timeout_seconds,int32_t request_size,int32_t response_size)1310 bool InteropClient::DoChannelSoakTest(
1311 const std::string& server_uri, int32_t soak_iterations,
1312 int32_t max_failures, int64_t max_acceptable_per_iteration_latency_ms,
1313 int32_t soak_min_time_ms_between_rpcs, int32_t overall_timeout_seconds,
1314 int32_t request_size, int32_t response_size) {
1315 VLOG(2) << "Sending " << soak_iterations
1316 << " RPCs, tearing down the channel each time...";
1317 CHECK_GT(soak_iterations, 0);
1318 PerformSoakTest(server_uri, true /* reset channel per iteration */,
1319 soak_iterations, max_failures,
1320 max_acceptable_per_iteration_latency_ms,
1321 soak_min_time_ms_between_rpcs, overall_timeout_seconds,
1322 request_size, response_size);
1323 VLOG(2) << "channel_soak test done.";
1324 return true;
1325 }
1326
DoLongLivedChannelTest(int32_t soak_iterations,int32_t iteration_interval)1327 bool InteropClient::DoLongLivedChannelTest(int32_t soak_iterations,
1328 int32_t iteration_interval) {
1329 VLOG(2) << "Sending " << soak_iterations << " RPCs...";
1330 CHECK_GT(soak_iterations, 0);
1331 CHECK_GT(iteration_interval, 0);
1332 SimpleRequest request;
1333 SimpleResponse response;
1334 int num_failures = 0;
1335 for (int i = 0; i < soak_iterations; ++i) {
1336 VLOG(2) << "Sending RPC number " << i << "...";
1337 if (!PerformLargeUnary(&request, &response)) {
1338 LOG(ERROR) << "Iteration " << i << " failed.";
1339 num_failures++;
1340 }
1341 gpr_sleep_until(
1342 gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
1343 gpr_time_from_seconds(iteration_interval, GPR_TIMESPAN)));
1344 }
1345 if (num_failures == 0) {
1346 VLOG(2) << "long_lived_channel test done.";
1347 return true;
1348 } else {
1349 VLOG(2) << "long_lived_channel test failed with " << num_failures
1350 << " rpc failures.";
1351 return false;
1352 }
1353 }
1354
DoUnimplementedService()1355 bool InteropClient::DoUnimplementedService() {
1356 VLOG(2) << "Sending a request for an unimplemented service...";
1357
1358 Empty request;
1359 Empty response;
1360 ClientContext context;
1361
1362 UnimplementedService::Stub* stub = serviceStub_.GetUnimplementedServiceStub();
1363
1364 Status s = stub->UnimplementedCall(&context, request, &response);
1365
1366 if (!AssertStatusCode(s, StatusCode::UNIMPLEMENTED,
1367 context.debug_error_string())) {
1368 return false;
1369 }
1370
1371 VLOG(2) << "unimplemented service done.";
1372 return true;
1373 }
1374
DoUnimplementedMethod()1375 bool InteropClient::DoUnimplementedMethod() {
1376 VLOG(2) << "Sending a request for an unimplemented rpc...";
1377
1378 Empty request;
1379 Empty response;
1380 ClientContext context;
1381
1382 Status s =
1383 serviceStub_.Get()->UnimplementedCall(&context, request, &response);
1384
1385 if (!AssertStatusCode(s, StatusCode::UNIMPLEMENTED,
1386 context.debug_error_string())) {
1387 return false;
1388 }
1389
1390 VLOG(2) << "unimplemented rpc done.";
1391 return true;
1392 }
1393
1394 } // namespace testing
1395 } // namespace grpc
1396