1 /* 2 * 3 * Copyright 2016 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19 #ifndef GRPC_TEST_CPP_END2END_TEST_SERVICE_IMPL_H 20 #define GRPC_TEST_CPP_END2END_TEST_SERVICE_IMPL_H 21 22 #include <condition_variable> 23 #include <memory> 24 #include <mutex> 25 26 #include <grpc/grpc.h> 27 #include <grpc/support/log.h> 28 #include <grpcpp/alarm.h> 29 #include <grpcpp/security/credentials.h> 30 #include <grpcpp/server_context.h> 31 #include <gtest/gtest.h> 32 33 #include <string> 34 #include <thread> 35 36 #include "src/proto/grpc/testing/echo.grpc.pb.h" 37 #include "test/cpp/util/string_ref_helper.h" 38 39 using std::chrono::system_clock; 40 41 namespace grpc { 42 namespace testing { 43 44 const int kServerDefaultResponseStreamsToSend = 3; 45 const char* const kServerResponseStreamsToSend = "server_responses_to_send"; 46 const char* const kServerTryCancelRequest = "server_try_cancel"; 47 const char* const kDebugInfoTrailerKey = "debug-info-bin"; 48 const char* const kServerFinishAfterNReads = "server_finish_after_n_reads"; 49 const char* const kServerUseCoalescingApi = "server_use_coalescing_api"; 50 const char* const kCheckClientInitialMetadataKey = "custom_client_metadata"; 51 const char* const kCheckClientInitialMetadataVal = "Value for client metadata"; 52 53 typedef enum { 54 DO_NOT_CANCEL = 0, 55 CANCEL_BEFORE_PROCESSING, 56 CANCEL_DURING_PROCESSING, 57 CANCEL_AFTER_PROCESSING 58 } ServerTryCancelRequestPhase; 59 60 namespace internal { 61 // When echo_deadline is requested, deadline seen in the ServerContext is set in 62 // the response in seconds. 63 void MaybeEchoDeadline(experimental::ServerContextBase* context, 64 const EchoRequest* request, EchoResponse* response); 65 66 void CheckServerAuthContext(const experimental::ServerContextBase* context, 67 const std::string& expected_transport_security_type, 68 const std::string& expected_client_identity); 69 70 // Returns the number of pairs in metadata that exactly match the given 71 // key-value pair. Returns -1 if the pair wasn't found. 72 int MetadataMatchCount( 73 const std::multimap<grpc::string_ref, grpc::string_ref>& metadata, 74 const std::string& key, const std::string& value); 75 76 int GetIntValueFromMetadataHelper( 77 const char* key, 78 const std::multimap<grpc::string_ref, grpc::string_ref>& metadata, 79 int default_value); 80 81 int GetIntValueFromMetadata( 82 const char* key, 83 const std::multimap<grpc::string_ref, grpc::string_ref>& metadata, 84 int default_value); 85 86 void ServerTryCancel(ServerContext* context); 87 } // namespace internal 88 89 class TestServiceSignaller { 90 public: ClientWaitUntilRpcStarted()91 void ClientWaitUntilRpcStarted() { 92 std::unique_lock<std::mutex> lock(mu_); 93 cv_rpc_started_.wait(lock, [this] { return rpc_started_; }); 94 } ServerWaitToContinue()95 void ServerWaitToContinue() { 96 std::unique_lock<std::mutex> lock(mu_); 97 cv_server_continue_.wait(lock, [this] { return server_should_continue_; }); 98 } SignalClientThatRpcStarted()99 void SignalClientThatRpcStarted() { 100 std::unique_lock<std::mutex> lock(mu_); 101 rpc_started_ = true; 102 cv_rpc_started_.notify_one(); 103 } SignalServerToContinue()104 void SignalServerToContinue() { 105 std::unique_lock<std::mutex> lock(mu_); 106 server_should_continue_ = true; 107 cv_server_continue_.notify_one(); 108 } 109 110 private: 111 std::mutex mu_; 112 std::condition_variable cv_rpc_started_; 113 bool rpc_started_ /* GUARDED_BY(mu_) */ = false; 114 std::condition_variable cv_server_continue_; 115 bool server_should_continue_ /* GUARDED_BY(mu_) */ = false; 116 }; 117 118 template <typename RpcService> 119 class TestMultipleServiceImpl : public RpcService { 120 public: TestMultipleServiceImpl()121 TestMultipleServiceImpl() : signal_client_(false), host_() {} TestMultipleServiceImpl(const std::string & host)122 explicit TestMultipleServiceImpl(const std::string& host) 123 : signal_client_(false), host_(new std::string(host)) {} 124 Echo(ServerContext * context,const EchoRequest * request,EchoResponse * response)125 Status Echo(ServerContext* context, const EchoRequest* request, 126 EchoResponse* response) { 127 if (request->has_param() && 128 request->param().server_notify_client_when_started()) { 129 signaller_.SignalClientThatRpcStarted(); 130 signaller_.ServerWaitToContinue(); 131 } 132 133 // A bit of sleep to make sure that short deadline tests fail 134 if (request->has_param() && request->param().server_sleep_us() > 0) { 135 gpr_sleep_until( 136 gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), 137 gpr_time_from_micros(request->param().server_sleep_us(), 138 GPR_TIMESPAN))); 139 } 140 141 if (request->has_param() && request->param().server_die()) { 142 gpr_log(GPR_ERROR, "The request should not reach application handler."); 143 GPR_ASSERT(0); 144 } 145 if (request->has_param() && request->param().has_expected_error()) { 146 const auto& error = request->param().expected_error(); 147 return Status(static_cast<StatusCode>(error.code()), 148 error.error_message(), error.binary_error_details()); 149 } 150 int server_try_cancel = internal::GetIntValueFromMetadata( 151 kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL); 152 if (server_try_cancel > DO_NOT_CANCEL) { 153 // Since this is a unary RPC, by the time this server handler is called, 154 // the 'request' message is already read from the client. So the scenarios 155 // in server_try_cancel don't make much sense. Just cancel the RPC as long 156 // as server_try_cancel is not DO_NOT_CANCEL 157 internal::ServerTryCancel(context); 158 return Status::CANCELLED; 159 } 160 161 response->set_message(request->message()); 162 internal::MaybeEchoDeadline(context, request, response); 163 if (host_) { 164 response->mutable_param()->set_host(*host_); 165 } 166 if (request->has_param() && request->param().client_cancel_after_us()) { 167 { 168 std::unique_lock<std::mutex> lock(mu_); 169 signal_client_ = true; 170 } 171 while (!context->IsCancelled()) { 172 gpr_sleep_until(gpr_time_add( 173 gpr_now(GPR_CLOCK_REALTIME), 174 gpr_time_from_micros(request->param().client_cancel_after_us(), 175 GPR_TIMESPAN))); 176 } 177 return Status::CANCELLED; 178 } else if (request->has_param() && 179 request->param().server_cancel_after_us()) { 180 gpr_sleep_until(gpr_time_add( 181 gpr_now(GPR_CLOCK_REALTIME), 182 gpr_time_from_micros(request->param().server_cancel_after_us(), 183 GPR_TIMESPAN))); 184 return Status::CANCELLED; 185 } else if (!request->has_param() || 186 !request->param().skip_cancelled_check()) { 187 EXPECT_FALSE(context->IsCancelled()); 188 } 189 190 if (request->has_param() && request->param().echo_metadata_initially()) { 191 const std::multimap<grpc::string_ref, grpc::string_ref>& client_metadata = 192 context->client_metadata(); 193 for (const auto& metadatum : client_metadata) { 194 context->AddInitialMetadata(ToString(metadatum.first), 195 ToString(metadatum.second)); 196 } 197 } 198 199 if (request->has_param() && request->param().echo_metadata()) { 200 const std::multimap<grpc::string_ref, grpc::string_ref>& client_metadata = 201 context->client_metadata(); 202 for (const auto& metadatum : client_metadata) { 203 context->AddTrailingMetadata(ToString(metadatum.first), 204 ToString(metadatum.second)); 205 } 206 // Terminate rpc with error and debug info in trailer. 207 if (request->param().debug_info().stack_entries_size() || 208 !request->param().debug_info().detail().empty()) { 209 std::string serialized_debug_info = 210 request->param().debug_info().SerializeAsString(); 211 context->AddTrailingMetadata(kDebugInfoTrailerKey, 212 serialized_debug_info); 213 return Status::CANCELLED; 214 } 215 } 216 if (request->has_param() && 217 (request->param().expected_client_identity().length() > 0 || 218 request->param().check_auth_context())) { 219 internal::CheckServerAuthContext( 220 context, request->param().expected_transport_security_type(), 221 request->param().expected_client_identity()); 222 } 223 if (request->has_param() && 224 request->param().response_message_length() > 0) { 225 response->set_message( 226 std::string(request->param().response_message_length(), '\0')); 227 } 228 if (request->has_param() && request->param().echo_peer()) { 229 response->mutable_param()->set_peer(context->peer()); 230 } 231 return Status::OK; 232 } 233 Echo1(ServerContext * context,const EchoRequest * request,EchoResponse * response)234 Status Echo1(ServerContext* context, const EchoRequest* request, 235 EchoResponse* response) { 236 return Echo(context, request, response); 237 } 238 Echo2(ServerContext * context,const EchoRequest * request,EchoResponse * response)239 Status Echo2(ServerContext* context, const EchoRequest* request, 240 EchoResponse* response) { 241 return Echo(context, request, response); 242 } 243 CheckClientInitialMetadata(ServerContext * context,const SimpleRequest *,SimpleResponse *)244 Status CheckClientInitialMetadata(ServerContext* context, 245 const SimpleRequest* /*request*/, 246 SimpleResponse* /*response*/) { 247 EXPECT_EQ(internal::MetadataMatchCount(context->client_metadata(), 248 kCheckClientInitialMetadataKey, 249 kCheckClientInitialMetadataVal), 250 1); 251 EXPECT_EQ(1u, 252 context->client_metadata().count(kCheckClientInitialMetadataKey)); 253 return Status::OK; 254 } 255 256 // Unimplemented is left unimplemented to test the returned error. 257 RequestStream(ServerContext * context,ServerReader<EchoRequest> * reader,EchoResponse * response)258 Status RequestStream(ServerContext* context, 259 ServerReader<EchoRequest>* reader, 260 EchoResponse* response) { 261 // If 'server_try_cancel' is set in the metadata, the RPC is cancelled by 262 // the server by calling ServerContext::TryCancel() depending on the value: 263 // CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server reads 264 // any message from the client 265 // CANCEL_DURING_PROCESSING: The RPC is cancelled while the server is 266 // reading messages from the client 267 // CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server reads 268 // all the messages from the client 269 int server_try_cancel = internal::GetIntValueFromMetadata( 270 kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL); 271 272 EchoRequest request; 273 response->set_message(""); 274 275 if (server_try_cancel == CANCEL_BEFORE_PROCESSING) { 276 internal::ServerTryCancel(context); 277 return Status::CANCELLED; 278 } 279 280 std::thread* server_try_cancel_thd = nullptr; 281 if (server_try_cancel == CANCEL_DURING_PROCESSING) { 282 server_try_cancel_thd = 283 new std::thread([context] { internal::ServerTryCancel(context); }); 284 } 285 286 int num_msgs_read = 0; 287 while (reader->Read(&request)) { 288 response->mutable_message()->append(request.message()); 289 } 290 gpr_log(GPR_INFO, "Read: %d messages", num_msgs_read); 291 292 if (server_try_cancel_thd != nullptr) { 293 server_try_cancel_thd->join(); 294 delete server_try_cancel_thd; 295 return Status::CANCELLED; 296 } 297 298 if (server_try_cancel == CANCEL_AFTER_PROCESSING) { 299 internal::ServerTryCancel(context); 300 return Status::CANCELLED; 301 } 302 303 return Status::OK; 304 } 305 306 // Return 'kNumResponseStreamMsgs' messages. 307 // TODO(yangg) make it generic by adding a parameter into EchoRequest ResponseStream(ServerContext * context,const EchoRequest * request,ServerWriter<EchoResponse> * writer)308 Status ResponseStream(ServerContext* context, const EchoRequest* request, 309 ServerWriter<EchoResponse>* writer) { 310 // If server_try_cancel is set in the metadata, the RPC is cancelled by the 311 // server by calling ServerContext::TryCancel() depending on the value: 312 // CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server writes 313 // any messages to the client 314 // CANCEL_DURING_PROCESSING: The RPC is cancelled while the server is 315 // writing messages to the client 316 // CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server writes 317 // all the messages to the client 318 int server_try_cancel = internal::GetIntValueFromMetadata( 319 kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL); 320 321 int server_coalescing_api = internal::GetIntValueFromMetadata( 322 kServerUseCoalescingApi, context->client_metadata(), 0); 323 324 int server_responses_to_send = internal::GetIntValueFromMetadata( 325 kServerResponseStreamsToSend, context->client_metadata(), 326 kServerDefaultResponseStreamsToSend); 327 328 if (server_try_cancel == CANCEL_BEFORE_PROCESSING) { 329 internal::ServerTryCancel(context); 330 return Status::CANCELLED; 331 } 332 333 EchoResponse response; 334 std::thread* server_try_cancel_thd = nullptr; 335 if (server_try_cancel == CANCEL_DURING_PROCESSING) { 336 server_try_cancel_thd = 337 new std::thread([context] { internal::ServerTryCancel(context); }); 338 } 339 340 for (int i = 0; i < server_responses_to_send; i++) { 341 response.set_message(request->message() + std::to_string(i)); 342 if (i == server_responses_to_send - 1 && server_coalescing_api != 0) { 343 writer->WriteLast(response, WriteOptions()); 344 } else { 345 writer->Write(response); 346 } 347 } 348 349 if (server_try_cancel_thd != nullptr) { 350 server_try_cancel_thd->join(); 351 delete server_try_cancel_thd; 352 return Status::CANCELLED; 353 } 354 355 if (server_try_cancel == CANCEL_AFTER_PROCESSING) { 356 internal::ServerTryCancel(context); 357 return Status::CANCELLED; 358 } 359 360 return Status::OK; 361 } 362 BidiStream(ServerContext * context,ServerReaderWriter<EchoResponse,EchoRequest> * stream)363 Status BidiStream(ServerContext* context, 364 ServerReaderWriter<EchoResponse, EchoRequest>* stream) { 365 // If server_try_cancel is set in the metadata, the RPC is cancelled by the 366 // server by calling ServerContext::TryCancel() depending on the value: 367 // CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server reads/ 368 // writes any messages from/to the client 369 // CANCEL_DURING_PROCESSING: The RPC is cancelled while the server is 370 // reading/writing messages from/to the client 371 // CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server 372 // reads/writes all messages from/to the client 373 int server_try_cancel = internal::GetIntValueFromMetadata( 374 kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL); 375 376 EchoRequest request; 377 EchoResponse response; 378 379 if (server_try_cancel == CANCEL_BEFORE_PROCESSING) { 380 internal::ServerTryCancel(context); 381 return Status::CANCELLED; 382 } 383 384 std::thread* server_try_cancel_thd = nullptr; 385 if (server_try_cancel == CANCEL_DURING_PROCESSING) { 386 server_try_cancel_thd = 387 new std::thread([context] { internal::ServerTryCancel(context); }); 388 } 389 390 // kServerFinishAfterNReads suggests after how many reads, the server should 391 // write the last message and send status (coalesced using WriteLast) 392 int server_write_last = internal::GetIntValueFromMetadata( 393 kServerFinishAfterNReads, context->client_metadata(), 0); 394 395 int read_counts = 0; 396 while (stream->Read(&request)) { 397 read_counts++; 398 gpr_log(GPR_INFO, "recv msg %s", request.message().c_str()); 399 response.set_message(request.message()); 400 if (read_counts == server_write_last) { 401 stream->WriteLast(response, WriteOptions()); 402 } else { 403 stream->Write(response); 404 } 405 } 406 407 if (server_try_cancel_thd != nullptr) { 408 server_try_cancel_thd->join(); 409 delete server_try_cancel_thd; 410 return Status::CANCELLED; 411 } 412 413 if (server_try_cancel == CANCEL_AFTER_PROCESSING) { 414 internal::ServerTryCancel(context); 415 return Status::CANCELLED; 416 } 417 418 return Status::OK; 419 } 420 421 // Unimplemented is left unimplemented to test the returned error. signal_client()422 bool signal_client() { 423 std::unique_lock<std::mutex> lock(mu_); 424 return signal_client_; 425 } ClientWaitUntilRpcStarted()426 void ClientWaitUntilRpcStarted() { signaller_.ClientWaitUntilRpcStarted(); } SignalServerToContinue()427 void SignalServerToContinue() { signaller_.SignalServerToContinue(); } 428 429 private: 430 bool signal_client_; 431 std::mutex mu_; 432 TestServiceSignaller signaller_; 433 std::unique_ptr<std::string> host_; 434 }; 435 436 class CallbackTestServiceImpl 437 : public ::grpc::testing::EchoTestService::ExperimentalCallbackService { 438 public: CallbackTestServiceImpl()439 CallbackTestServiceImpl() : signal_client_(false), host_() {} CallbackTestServiceImpl(const std::string & host)440 explicit CallbackTestServiceImpl(const std::string& host) 441 : signal_client_(false), host_(new std::string(host)) {} 442 443 experimental::ServerUnaryReactor* Echo( 444 experimental::CallbackServerContext* context, const EchoRequest* request, 445 EchoResponse* response) override; 446 447 experimental::ServerUnaryReactor* CheckClientInitialMetadata( 448 experimental::CallbackServerContext* context, const SimpleRequest*, 449 SimpleResponse*) override; 450 451 experimental::ServerReadReactor<EchoRequest>* RequestStream( 452 experimental::CallbackServerContext* context, 453 EchoResponse* response) override; 454 455 experimental::ServerWriteReactor<EchoResponse>* ResponseStream( 456 experimental::CallbackServerContext* context, 457 const EchoRequest* request) override; 458 459 experimental::ServerBidiReactor<EchoRequest, EchoResponse>* BidiStream( 460 experimental::CallbackServerContext* context) override; 461 462 // Unimplemented is left unimplemented to test the returned error. signal_client()463 bool signal_client() { 464 std::unique_lock<std::mutex> lock(mu_); 465 return signal_client_; 466 } ClientWaitUntilRpcStarted()467 void ClientWaitUntilRpcStarted() { signaller_.ClientWaitUntilRpcStarted(); } SignalServerToContinue()468 void SignalServerToContinue() { signaller_.SignalServerToContinue(); } 469 470 private: 471 bool signal_client_; 472 std::mutex mu_; 473 TestServiceSignaller signaller_; 474 std::unique_ptr<std::string> host_; 475 }; 476 477 using TestServiceImpl = 478 TestMultipleServiceImpl<::grpc::testing::EchoTestService::Service>; 479 480 } // namespace testing 481 } // namespace grpc 482 483 #endif // GRPC_TEST_CPP_END2END_TEST_SERVICE_IMPL_H 484