1 // 2 // 3 // Copyright 2016 gRPC authors. 4 // 5 // Licensed under the Apache License, Version 2.0 (the "License"); 6 // you may not use this file except in compliance with the License. 7 // You may obtain a copy of the License at 8 // 9 // http://www.apache.org/licenses/LICENSE-2.0 10 // 11 // Unless required by applicable law or agreed to in writing, software 12 // distributed under the License is distributed on an "AS IS" BASIS, 13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 // See the License for the specific language governing permissions and 15 // limitations under the License. 16 // 17 // 18 19 #ifndef GRPC_TEST_CPP_END2END_TEST_SERVICE_IMPL_H 20 #define GRPC_TEST_CPP_END2END_TEST_SERVICE_IMPL_H 21 22 #include <grpc/grpc.h> 23 #include <grpcpp/alarm.h> 24 #include <grpcpp/security/credentials.h> 25 #include <grpcpp/server_context.h> 26 #include <gtest/gtest.h> 27 28 #include <condition_variable> 29 #include <memory> 30 #include <mutex> 31 #include <string> 32 #include <thread> 33 34 #include "absl/log/check.h" 35 #include "absl/log/log.h" 36 #include "src/core/util/crash.h" 37 #include "src/proto/grpc/testing/echo.grpc.pb.h" 38 #include "test/core/test_util/test_config.h" 39 #include "test/cpp/util/string_ref_helper.h" 40 41 namespace grpc { 42 namespace testing { 43 44 const int kServerDefaultResponseStreamsToSend = 3; 45 const char* const kServerResponseStreamsToSend = "server_responses_to_send"; 46 const char* const kServerTryCancelRequest = "server_try_cancel"; 47 const char* const kClientTryCancelRequest = "client_try_cancel"; 48 const char* const kDebugInfoTrailerKey = "debug-info-bin"; 49 const char* const kServerFinishAfterNReads = "server_finish_after_n_reads"; 50 const char* const kServerUseCoalescingApi = "server_use_coalescing_api"; 51 const char* const kCheckClientInitialMetadataKey = "custom_client_metadata"; 52 const char* const kCheckClientInitialMetadataVal = "Value for client metadata"; 53 54 typedef enum { 55 DO_NOT_CANCEL = 0, 56 CANCEL_BEFORE_PROCESSING, 57 CANCEL_DURING_PROCESSING, 58 CANCEL_AFTER_PROCESSING 59 } ServerTryCancelRequestPhase; 60 61 namespace internal { 62 // When echo_deadline is requested, deadline seen in the ServerContext is set in 63 // the response in seconds. 64 void MaybeEchoDeadline(ServerContextBase* context, const EchoRequest* request, 65 EchoResponse* response); 66 67 void CheckServerAuthContext(const ServerContextBase* context, 68 const std::string& expected_transport_security_type, 69 const std::string& expected_client_identity); 70 71 // Returns the number of pairs in metadata that exactly match the given 72 // key-value pair. Returns -1 if the pair wasn't found. 73 int MetadataMatchCount( 74 const std::multimap<grpc::string_ref, grpc::string_ref>& metadata, 75 const std::string& key, const std::string& value); 76 77 int GetIntValueFromMetadataHelper( 78 const char* key, 79 const std::multimap<grpc::string_ref, grpc::string_ref>& metadata, 80 int default_value); 81 82 int GetIntValueFromMetadata( 83 const char* key, 84 const std::multimap<grpc::string_ref, grpc::string_ref>& metadata, 85 int default_value); 86 87 void ServerTryCancel(ServerContext* context); 88 } // namespace internal 89 90 class TestServiceSignaller { 91 public: 92 // Waits for at least *desired_rpcs* to to be waiting for a server 93 // continue notification. 94 // Returns when *desired_rpcs* reaches that amount, or when we've 95 // surpassed the timeout, whichever happens first. The return value 96 // is whatever the number of RPCs waiting for server notification is 97 // at that time. ClientWaitUntilNRpcsStarted(int desired_rpcs,absl::Duration timeout)98 int ClientWaitUntilNRpcsStarted(int desired_rpcs, absl::Duration timeout) { 99 VLOG(2) << "*** enter ClientWaitUntilNRpcsStarted ***"; 100 absl::Time deadline = absl::Now() + timeout; 101 std::chrono::system_clock::time_point chrono_deadline = 102 absl::ToChronoTime(deadline); 103 std::unique_lock<std::mutex> lock(mu_); 104 cv_rpc_started_.wait_until(lock, chrono_deadline, [this, desired_rpcs] { 105 VLOG(2) << "*** desired_rpcs: " << desired_rpcs 106 << " rpcs_waiting_for_server_to_continue_: " 107 << rpcs_waiting_for_server_to_continue_ << " ***"; 108 return rpcs_waiting_for_server_to_continue_ >= desired_rpcs; 109 }); 110 VLOG(2) << "*** leave ClientWaitUntilNRpcsStarted ***"; 111 return rpcs_waiting_for_server_to_continue_; 112 } ServerWaitToContinue()113 void ServerWaitToContinue() { 114 VLOG(2) << "*** enter ServerWaitToContinue ***"; 115 std::unique_lock<std::mutex> lock(mu_); 116 cv_server_continue_.wait(lock, [this] { return server_should_continue_; }); 117 VLOG(2) << "*** leave ServerWaitToContinue ***"; 118 } SignalClientThatRpcStarted()119 void SignalClientThatRpcStarted() { 120 VLOG(2) << "*** SignalClientThatRpcStarted ***"; 121 std::unique_lock<std::mutex> lock(mu_); 122 ++rpcs_waiting_for_server_to_continue_; 123 cv_rpc_started_.notify_all(); 124 } SignalServerToContinue()125 void SignalServerToContinue() { 126 VLOG(2) << "*** SignalServerToContinue ***"; 127 std::unique_lock<std::mutex> lock(mu_); 128 server_should_continue_ = true; 129 cv_server_continue_.notify_all(); 130 } Reset()131 void Reset() { 132 std::unique_lock<std::mutex> lock(mu_); 133 rpcs_waiting_for_server_to_continue_ = 0; 134 server_should_continue_ = false; 135 } 136 137 private: 138 std::mutex mu_; 139 std::condition_variable cv_rpc_started_; 140 int rpcs_waiting_for_server_to_continue_ /* GUARDED_BY(mu_) */ = 0; 141 std::condition_variable cv_server_continue_; 142 bool server_should_continue_ /* GUARDED_BY(mu_) */ = false; 143 }; 144 145 template <typename RpcService> 146 class TestMultipleServiceImpl : public RpcService { 147 public: TestMultipleServiceImpl()148 TestMultipleServiceImpl() : signal_client_(false), host_() {} TestMultipleServiceImpl(const std::string & host)149 explicit TestMultipleServiceImpl(const std::string& host) 150 : signal_client_(false), host_(new std::string(host)) {} 151 Echo(ServerContext * context,const EchoRequest * request,EchoResponse * response)152 Status Echo(ServerContext* context, const EchoRequest* request, 153 EchoResponse* response) { 154 if (request->has_param() && 155 request->param().server_notify_client_when_started()) { 156 signaller_.SignalClientThatRpcStarted(); 157 signaller_.ServerWaitToContinue(); 158 } 159 160 // A bit of sleep to make sure that short deadline tests fail 161 if (request->has_param() && request->param().server_sleep_us() > 0) { 162 gpr_sleep_until(gpr_time_add( 163 gpr_now(GPR_CLOCK_MONOTONIC), 164 gpr_time_from_micros( 165 request->param().server_sleep_us() * grpc_test_slowdown_factor(), 166 GPR_TIMESPAN))); 167 } 168 169 if (request->has_param() && request->param().server_die()) { 170 LOG(ERROR) << "The request should not reach application handler."; 171 CHECK(0); 172 } 173 if (request->has_param() && request->param().has_expected_error()) { 174 const auto& error = request->param().expected_error(); 175 return Status(static_cast<StatusCode>(error.code()), 176 error.error_message(), error.binary_error_details()); 177 } 178 int server_try_cancel = internal::GetIntValueFromMetadata( 179 kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL); 180 if (server_try_cancel > DO_NOT_CANCEL) { 181 // Since this is a unary RPC, by the time this server handler is called, 182 // the 'request' message is already read from the client. So the scenarios 183 // in server_try_cancel don't make much sense. Just cancel the RPC as long 184 // as server_try_cancel is not DO_NOT_CANCEL 185 internal::ServerTryCancel(context); 186 return Status::CANCELLED; 187 } 188 189 response->set_message(request->message()); 190 internal::MaybeEchoDeadline(context, request, response); 191 if (host_) { 192 response->mutable_param()->set_host(*host_); 193 } else if (request->has_param() && 194 request->param().echo_host_from_authority_header()) { 195 auto authority = context->ExperimentalGetAuthority(); 196 std::string authority_str(authority.data(), authority.size()); 197 response->mutable_param()->set_host(std::move(authority_str)); 198 } 199 if (request->has_param() && request->param().client_cancel_after_us()) { 200 { 201 std::unique_lock<std::mutex> lock(mu_); 202 signal_client_ = true; 203 ++rpcs_waiting_for_client_cancel_; 204 } 205 while (!context->IsCancelled()) { 206 gpr_sleep_until(gpr_time_add( 207 gpr_now(GPR_CLOCK_REALTIME), 208 gpr_time_from_micros(request->param().client_cancel_after_us() * 209 grpc_test_slowdown_factor(), 210 GPR_TIMESPAN))); 211 } 212 { 213 std::unique_lock<std::mutex> lock(mu_); 214 --rpcs_waiting_for_client_cancel_; 215 } 216 return Status::CANCELLED; 217 } else if (request->has_param() && 218 request->param().server_cancel_after_us()) { 219 gpr_sleep_until(gpr_time_add( 220 gpr_now(GPR_CLOCK_REALTIME), 221 gpr_time_from_micros(request->param().server_cancel_after_us() * 222 grpc_test_slowdown_factor(), 223 GPR_TIMESPAN))); 224 return Status::CANCELLED; 225 } else if (!request->has_param() || 226 !request->param().skip_cancelled_check()) { 227 EXPECT_FALSE(context->IsCancelled()); 228 } 229 230 if (request->has_param() && request->param().echo_metadata_initially()) { 231 const std::multimap<grpc::string_ref, grpc::string_ref>& client_metadata = 232 context->client_metadata(); 233 for (const auto& metadatum : client_metadata) { 234 context->AddInitialMetadata(ToString(metadatum.first), 235 ToString(metadatum.second)); 236 } 237 } 238 239 if (request->has_param() && request->param().echo_metadata()) { 240 const std::multimap<grpc::string_ref, grpc::string_ref>& client_metadata = 241 context->client_metadata(); 242 for (const auto& metadatum : client_metadata) { 243 context->AddTrailingMetadata(ToString(metadatum.first), 244 ToString(metadatum.second)); 245 } 246 // Terminate rpc with error and debug info in trailer. 247 if (request->param().debug_info().stack_entries_size() || 248 !request->param().debug_info().detail().empty()) { 249 std::string serialized_debug_info = 250 request->param().debug_info().SerializeAsString(); 251 context->AddTrailingMetadata(kDebugInfoTrailerKey, 252 serialized_debug_info); 253 return Status::CANCELLED; 254 } 255 } 256 if (request->has_param() && 257 (!request->param().expected_client_identity().empty() || 258 request->param().check_auth_context())) { 259 internal::CheckServerAuthContext( 260 context, request->param().expected_transport_security_type(), 261 request->param().expected_client_identity()); 262 } 263 if (request->has_param() && 264 request->param().response_message_length() > 0) { 265 response->set_message( 266 std::string(request->param().response_message_length(), '\0')); 267 } 268 if (request->has_param() && request->param().echo_peer()) { 269 response->mutable_param()->set_peer(context->peer()); 270 } 271 return Status::OK; 272 } 273 Echo1(ServerContext * context,const EchoRequest * request,EchoResponse * response)274 Status Echo1(ServerContext* context, const EchoRequest* request, 275 EchoResponse* response) { 276 return Echo(context, request, response); 277 } 278 Echo2(ServerContext * context,const EchoRequest * request,EchoResponse * response)279 Status Echo2(ServerContext* context, const EchoRequest* request, 280 EchoResponse* response) { 281 return Echo(context, request, response); 282 } 283 CheckClientInitialMetadata(ServerContext * context,const SimpleRequest *,SimpleResponse *)284 Status CheckClientInitialMetadata(ServerContext* context, 285 const SimpleRequest* /*request*/, 286 SimpleResponse* /*response*/) { 287 EXPECT_EQ(internal::MetadataMatchCount(context->client_metadata(), 288 kCheckClientInitialMetadataKey, 289 kCheckClientInitialMetadataVal), 290 1); 291 EXPECT_EQ(1u, 292 context->client_metadata().count(kCheckClientInitialMetadataKey)); 293 return Status::OK; 294 } 295 296 // Unimplemented is left unimplemented to test the returned error. 297 RequestStream(ServerContext * context,ServerReader<EchoRequest> * reader,EchoResponse * response)298 Status RequestStream(ServerContext* context, 299 ServerReader<EchoRequest>* reader, 300 EchoResponse* response) { 301 // If 'server_try_cancel' is set in the metadata, the RPC is cancelled by 302 // the server by calling ServerContext::TryCancel() depending on the value: 303 // CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server reads 304 // any message from the client 305 // CANCEL_DURING_PROCESSING: The RPC is cancelled while the server is 306 // reading messages from the client 307 // CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server reads 308 // all the messages from the client 309 int server_try_cancel = internal::GetIntValueFromMetadata( 310 kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL); 311 312 EchoRequest request; 313 response->set_message(""); 314 315 if (server_try_cancel == CANCEL_BEFORE_PROCESSING) { 316 internal::ServerTryCancel(context); 317 return Status::CANCELLED; 318 } 319 320 std::thread* server_try_cancel_thd = nullptr; 321 if (server_try_cancel == CANCEL_DURING_PROCESSING) { 322 server_try_cancel_thd = 323 new std::thread([context] { internal::ServerTryCancel(context); }); 324 } 325 326 int num_msgs_read = 0; 327 while (reader->Read(&request)) { 328 response->mutable_message()->append(request.message()); 329 } 330 LOG(INFO) << "Read: " << num_msgs_read << " messages"; 331 332 if (server_try_cancel_thd != nullptr) { 333 server_try_cancel_thd->join(); 334 delete server_try_cancel_thd; 335 return Status::CANCELLED; 336 } 337 338 if (server_try_cancel == CANCEL_AFTER_PROCESSING) { 339 internal::ServerTryCancel(context); 340 return Status::CANCELLED; 341 } 342 343 return Status::OK; 344 } 345 346 // Return 'kNumResponseStreamMsgs' messages. 347 // TODO(yangg) make it generic by adding a parameter into EchoRequest ResponseStream(ServerContext * context,const EchoRequest * request,ServerWriter<EchoResponse> * writer)348 Status ResponseStream(ServerContext* context, const EchoRequest* request, 349 ServerWriter<EchoResponse>* writer) { 350 // If server_try_cancel is set in the metadata, the RPC is cancelled by the 351 // server by calling ServerContext::TryCancel() depending on the value: 352 // CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server writes 353 // any messages to the client 354 // CANCEL_DURING_PROCESSING: The RPC is cancelled while the server is 355 // writing messages to the client 356 // CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server writes 357 // all the messages to the client 358 int server_try_cancel = internal::GetIntValueFromMetadata( 359 kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL); 360 361 int server_coalescing_api = internal::GetIntValueFromMetadata( 362 kServerUseCoalescingApi, context->client_metadata(), 0); 363 364 int server_responses_to_send = internal::GetIntValueFromMetadata( 365 kServerResponseStreamsToSend, context->client_metadata(), 366 kServerDefaultResponseStreamsToSend); 367 368 if (server_try_cancel == CANCEL_BEFORE_PROCESSING) { 369 internal::ServerTryCancel(context); 370 return Status::CANCELLED; 371 } 372 373 EchoResponse response; 374 std::thread* server_try_cancel_thd = nullptr; 375 if (server_try_cancel == CANCEL_DURING_PROCESSING) { 376 server_try_cancel_thd = 377 new std::thread([context] { internal::ServerTryCancel(context); }); 378 } 379 380 for (int i = 0; i < server_responses_to_send; i++) { 381 response.set_message(request->message() + std::to_string(i)); 382 if (i == server_responses_to_send - 1 && server_coalescing_api != 0) { 383 writer->WriteLast(response, WriteOptions()); 384 } else { 385 writer->Write(response); 386 } 387 } 388 389 if (server_try_cancel_thd != nullptr) { 390 server_try_cancel_thd->join(); 391 delete server_try_cancel_thd; 392 return Status::CANCELLED; 393 } 394 395 if (server_try_cancel == CANCEL_AFTER_PROCESSING) { 396 internal::ServerTryCancel(context); 397 return Status::CANCELLED; 398 } 399 400 return Status::OK; 401 } 402 BidiStream(ServerContext * context,ServerReaderWriter<EchoResponse,EchoRequest> * stream)403 Status BidiStream(ServerContext* context, 404 ServerReaderWriter<EchoResponse, EchoRequest>* stream) { 405 // If server_try_cancel is set in the metadata, the RPC is cancelled by the 406 // server by calling ServerContext::TryCancel() depending on the value: 407 // CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server reads/ 408 // writes any messages from/to the client 409 // CANCEL_DURING_PROCESSING: The RPC is cancelled while the server is 410 // reading/writing messages from/to the client 411 // CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server 412 // reads/writes all messages from/to the client 413 int server_try_cancel = internal::GetIntValueFromMetadata( 414 kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL); 415 416 int client_try_cancel = static_cast<bool>(internal::GetIntValueFromMetadata( 417 kClientTryCancelRequest, context->client_metadata(), 0)); 418 419 EchoRequest request; 420 EchoResponse response; 421 422 if (server_try_cancel == CANCEL_BEFORE_PROCESSING) { 423 internal::ServerTryCancel(context); 424 return Status::CANCELLED; 425 } 426 427 std::thread* server_try_cancel_thd = nullptr; 428 if (server_try_cancel == CANCEL_DURING_PROCESSING) { 429 server_try_cancel_thd = 430 new std::thread([context] { internal::ServerTryCancel(context); }); 431 } 432 433 // kServerFinishAfterNReads suggests after how many reads, the server should 434 // write the last message and send status (coalesced using WriteLast) 435 int server_write_last = internal::GetIntValueFromMetadata( 436 kServerFinishAfterNReads, context->client_metadata(), 0); 437 438 int read_counts = 0; 439 while (stream->Read(&request)) { 440 read_counts++; 441 LOG(INFO) << "recv msg " << request.message(); 442 response.set_message(request.message()); 443 if (read_counts == server_write_last) { 444 stream->WriteLast(response, WriteOptions()); 445 break; 446 } else { 447 stream->Write(response); 448 } 449 } 450 451 if (client_try_cancel) { 452 EXPECT_TRUE(context->IsCancelled()); 453 } 454 455 if (server_try_cancel_thd != nullptr) { 456 server_try_cancel_thd->join(); 457 delete server_try_cancel_thd; 458 return Status::CANCELLED; 459 } 460 461 if (server_try_cancel == CANCEL_AFTER_PROCESSING) { 462 internal::ServerTryCancel(context); 463 return Status::CANCELLED; 464 } 465 466 return Status::OK; 467 } 468 469 // Unimplemented is left unimplemented to test the returned error. signal_client()470 bool signal_client() { 471 std::unique_lock<std::mutex> lock(mu_); 472 return signal_client_; 473 } 474 int ClientWaitUntilNRpcsStarted(int desired_rpcs, 475 absl::Duration timeout = absl::Minutes(1)) { 476 return signaller_.ClientWaitUntilNRpcsStarted(desired_rpcs, timeout); 477 } SignalServerToContinue()478 void SignalServerToContinue() { signaller_.SignalServerToContinue(); } ResetSignaller()479 void ResetSignaller() { signaller_.Reset(); } RpcsWaitingForClientCancel()480 uint64_t RpcsWaitingForClientCancel() { 481 std::unique_lock<std::mutex> lock(mu_); 482 return rpcs_waiting_for_client_cancel_; 483 } 484 485 private: 486 bool signal_client_; 487 std::mutex mu_; 488 TestServiceSignaller signaller_; 489 std::unique_ptr<std::string> host_; 490 uint64_t rpcs_waiting_for_client_cancel_ = 0; 491 }; 492 493 class CallbackTestServiceImpl 494 : public grpc::testing::EchoTestService::CallbackService { 495 public: CallbackTestServiceImpl()496 CallbackTestServiceImpl() : signal_client_(false), host_() {} CallbackTestServiceImpl(const std::string & host)497 explicit CallbackTestServiceImpl(const std::string& host) 498 : signal_client_(false), host_(new std::string(host)) {} 499 500 ServerUnaryReactor* Echo(CallbackServerContext* context, 501 const EchoRequest* request, 502 EchoResponse* response) override; 503 504 ServerUnaryReactor* CheckClientInitialMetadata(CallbackServerContext* context, 505 const SimpleRequest*, 506 SimpleResponse*) override; 507 508 ServerReadReactor<EchoRequest>* RequestStream( 509 CallbackServerContext* context, EchoResponse* response) override; 510 511 ServerWriteReactor<EchoResponse>* ResponseStream( 512 CallbackServerContext* context, const EchoRequest* request) override; 513 514 ServerBidiReactor<EchoRequest, EchoResponse>* BidiStream( 515 CallbackServerContext* context) override; 516 517 // Unimplemented is left unimplemented to test the returned error. signal_client()518 bool signal_client() { 519 std::unique_lock<std::mutex> lock(mu_); 520 return signal_client_; 521 } 522 int ClientWaitUntilNRpcsStarted(int desired_rpcs, 523 absl::Duration timeout = absl::Minutes(1)) { 524 return signaller_.ClientWaitUntilNRpcsStarted(desired_rpcs, timeout); 525 } SignalServerToContinue()526 void SignalServerToContinue() { signaller_.SignalServerToContinue(); } ResetSignaller()527 void ResetSignaller() { signaller_.Reset(); } 528 529 private: 530 bool signal_client_; 531 std::mutex mu_; 532 TestServiceSignaller signaller_; 533 std::unique_ptr<std::string> host_; 534 }; 535 536 using TestServiceImpl = 537 TestMultipleServiceImpl<grpc::testing::EchoTestService::Service>; 538 539 } // namespace testing 540 } // namespace grpc 541 542 #endif // GRPC_TEST_CPP_END2END_TEST_SERVICE_IMPL_H 543