• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2016 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #ifndef GRPC_TEST_CPP_END2END_TEST_SERVICE_IMPL_H
20 #define GRPC_TEST_CPP_END2END_TEST_SERVICE_IMPL_H
21 
22 #include <grpc/grpc.h>
23 #include <grpcpp/alarm.h>
24 #include <grpcpp/security/credentials.h>
25 #include <grpcpp/server_context.h>
26 #include <gtest/gtest.h>
27 
28 #include <condition_variable>
29 #include <memory>
30 #include <mutex>
31 #include <string>
32 #include <thread>
33 
34 #include "absl/log/check.h"
35 #include "absl/log/log.h"
36 #include "src/core/util/crash.h"
37 #include "src/proto/grpc/testing/echo.grpc.pb.h"
38 #include "test/core/test_util/test_config.h"
39 #include "test/cpp/util/string_ref_helper.h"
40 
41 namespace grpc {
42 namespace testing {
43 
// Number of responses ResponseStream writes when the client does not supply
// the kServerResponseStreamsToSend metadata entry.
constexpr int kServerDefaultResponseStreamsToSend = 3;

// Client metadata keys read by the test service handlers.
// Overrides how many responses ResponseStream writes.
constexpr const char* kServerResponseStreamsToSend =
    "server_responses_to_send";
// Carries a ServerTryCancelRequestPhase value asking the server to cancel.
constexpr const char* kServerTryCancelRequest = "server_try_cancel";
// Non-zero value makes BidiStream expect the call to end up cancelled.
constexpr const char* kClientTryCancelRequest = "client_try_cancel";
// Trailing metadata key under which Echo returns serialized debug info.
constexpr const char* kDebugInfoTrailerKey = "debug-info-bin";
// Number of reads after which BidiStream finishes with WriteLast.
constexpr const char* kServerFinishAfterNReads = "server_finish_after_n_reads";
// Non-zero value makes ResponseStream send its final message via WriteLast.
constexpr const char* kServerUseCoalescingApi = "server_use_coalescing_api";

// Key/value pair that CheckClientInitialMetadata expects to find exactly once
// in the client's initial metadata.
constexpr const char* kCheckClientInitialMetadataKey = "custom_client_metadata";
constexpr const char* kCheckClientInitialMetadataVal =
    "Value for client metadata";
53 
// Point in an RPC's processing at which the server handler should cancel the
// call. The handlers in this file read the value as an integer from the
// kServerTryCancelRequest client metadata entry, passing DO_NOT_CANCEL as the
// default.
enum ServerTryCancelRequestPhase {
  DO_NOT_CANCEL = 0,
  CANCEL_BEFORE_PROCESSING = 1,
  CANCEL_DURING_PROCESSING = 2,
  CANCEL_AFTER_PROCESSING = 3
};
60 
61 namespace internal {
62 // When echo_deadline is requested, deadline seen in the ServerContext is set in
63 // the response in seconds.
64 void MaybeEchoDeadline(ServerContextBase* context, const EchoRequest* request,
65                        EchoResponse* response);
66 
67 void CheckServerAuthContext(const ServerContextBase* context,
68                             const std::string& expected_transport_security_type,
69                             const std::string& expected_client_identity);
70 
71 // Returns the number of pairs in metadata that exactly match the given
72 // key-value pair. Returns -1 if the pair wasn't found.
73 int MetadataMatchCount(
74     const std::multimap<grpc::string_ref, grpc::string_ref>& metadata,
75     const std::string& key, const std::string& value);
76 
77 int GetIntValueFromMetadataHelper(
78     const char* key,
79     const std::multimap<grpc::string_ref, grpc::string_ref>& metadata,
80     int default_value);
81 
82 int GetIntValueFromMetadata(
83     const char* key,
84     const std::multimap<grpc::string_ref, grpc::string_ref>& metadata,
85     int default_value);
86 
87 void ServerTryCancel(ServerContext* context);
88 }  // namespace internal
89 
90 class TestServiceSignaller {
91  public:
92   // Waits for at least *desired_rpcs* to to be waiting for a server
93   // continue notification.
94   // Returns when *desired_rpcs* reaches that amount, or when we've
95   // surpassed the timeout, whichever happens first. The return value
96   // is whatever the number of RPCs waiting for server notification is
97   // at that time.
ClientWaitUntilNRpcsStarted(int desired_rpcs,absl::Duration timeout)98   int ClientWaitUntilNRpcsStarted(int desired_rpcs, absl::Duration timeout) {
99     VLOG(2) << "*** enter ClientWaitUntilNRpcsStarted ***";
100     absl::Time deadline = absl::Now() + timeout;
101     std::chrono::system_clock::time_point chrono_deadline =
102         absl::ToChronoTime(deadline);
103     std::unique_lock<std::mutex> lock(mu_);
104     cv_rpc_started_.wait_until(lock, chrono_deadline, [this, desired_rpcs] {
105       VLOG(2) << "*** desired_rpcs: " << desired_rpcs
106               << " rpcs_waiting_for_server_to_continue_: "
107               << rpcs_waiting_for_server_to_continue_ << " ***";
108       return rpcs_waiting_for_server_to_continue_ >= desired_rpcs;
109     });
110     VLOG(2) << "*** leave ClientWaitUntilNRpcsStarted ***";
111     return rpcs_waiting_for_server_to_continue_;
112   }
ServerWaitToContinue()113   void ServerWaitToContinue() {
114     VLOG(2) << "*** enter ServerWaitToContinue ***";
115     std::unique_lock<std::mutex> lock(mu_);
116     cv_server_continue_.wait(lock, [this] { return server_should_continue_; });
117     VLOG(2) << "*** leave ServerWaitToContinue ***";
118   }
SignalClientThatRpcStarted()119   void SignalClientThatRpcStarted() {
120     VLOG(2) << "*** SignalClientThatRpcStarted ***";
121     std::unique_lock<std::mutex> lock(mu_);
122     ++rpcs_waiting_for_server_to_continue_;
123     cv_rpc_started_.notify_all();
124   }
SignalServerToContinue()125   void SignalServerToContinue() {
126     VLOG(2) << "*** SignalServerToContinue ***";
127     std::unique_lock<std::mutex> lock(mu_);
128     server_should_continue_ = true;
129     cv_server_continue_.notify_all();
130   }
Reset()131   void Reset() {
132     std::unique_lock<std::mutex> lock(mu_);
133     rpcs_waiting_for_server_to_continue_ = 0;
134     server_should_continue_ = false;
135   }
136 
137  private:
138   std::mutex mu_;
139   std::condition_variable cv_rpc_started_;
140   int rpcs_waiting_for_server_to_continue_ /* GUARDED_BY(mu_) */ = 0;
141   std::condition_variable cv_server_continue_;
142   bool server_should_continue_ /* GUARDED_BY(mu_) */ = false;
143 };
144 
145 template <typename RpcService>
146 class TestMultipleServiceImpl : public RpcService {
147  public:
TestMultipleServiceImpl()148   TestMultipleServiceImpl() : signal_client_(false), host_() {}
TestMultipleServiceImpl(const std::string & host)149   explicit TestMultipleServiceImpl(const std::string& host)
150       : signal_client_(false), host_(new std::string(host)) {}
151 
Echo(ServerContext * context,const EchoRequest * request,EchoResponse * response)152   Status Echo(ServerContext* context, const EchoRequest* request,
153               EchoResponse* response) {
154     if (request->has_param() &&
155         request->param().server_notify_client_when_started()) {
156       signaller_.SignalClientThatRpcStarted();
157       signaller_.ServerWaitToContinue();
158     }
159 
160     // A bit of sleep to make sure that short deadline tests fail
161     if (request->has_param() && request->param().server_sleep_us() > 0) {
162       gpr_sleep_until(gpr_time_add(
163           gpr_now(GPR_CLOCK_MONOTONIC),
164           gpr_time_from_micros(
165               request->param().server_sleep_us() * grpc_test_slowdown_factor(),
166               GPR_TIMESPAN)));
167     }
168 
169     if (request->has_param() && request->param().server_die()) {
170       LOG(ERROR) << "The request should not reach application handler.";
171       CHECK(0);
172     }
173     if (request->has_param() && request->param().has_expected_error()) {
174       const auto& error = request->param().expected_error();
175       return Status(static_cast<StatusCode>(error.code()),
176                     error.error_message(), error.binary_error_details());
177     }
178     int server_try_cancel = internal::GetIntValueFromMetadata(
179         kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL);
180     if (server_try_cancel > DO_NOT_CANCEL) {
181       // Since this is a unary RPC, by the time this server handler is called,
182       // the 'request' message is already read from the client. So the scenarios
183       // in server_try_cancel don't make much sense. Just cancel the RPC as long
184       // as server_try_cancel is not DO_NOT_CANCEL
185       internal::ServerTryCancel(context);
186       return Status::CANCELLED;
187     }
188 
189     response->set_message(request->message());
190     internal::MaybeEchoDeadline(context, request, response);
191     if (host_) {
192       response->mutable_param()->set_host(*host_);
193     } else if (request->has_param() &&
194                request->param().echo_host_from_authority_header()) {
195       auto authority = context->ExperimentalGetAuthority();
196       std::string authority_str(authority.data(), authority.size());
197       response->mutable_param()->set_host(std::move(authority_str));
198     }
199     if (request->has_param() && request->param().client_cancel_after_us()) {
200       {
201         std::unique_lock<std::mutex> lock(mu_);
202         signal_client_ = true;
203         ++rpcs_waiting_for_client_cancel_;
204       }
205       while (!context->IsCancelled()) {
206         gpr_sleep_until(gpr_time_add(
207             gpr_now(GPR_CLOCK_REALTIME),
208             gpr_time_from_micros(request->param().client_cancel_after_us() *
209                                      grpc_test_slowdown_factor(),
210                                  GPR_TIMESPAN)));
211       }
212       {
213         std::unique_lock<std::mutex> lock(mu_);
214         --rpcs_waiting_for_client_cancel_;
215       }
216       return Status::CANCELLED;
217     } else if (request->has_param() &&
218                request->param().server_cancel_after_us()) {
219       gpr_sleep_until(gpr_time_add(
220           gpr_now(GPR_CLOCK_REALTIME),
221           gpr_time_from_micros(request->param().server_cancel_after_us() *
222                                    grpc_test_slowdown_factor(),
223                                GPR_TIMESPAN)));
224       return Status::CANCELLED;
225     } else if (!request->has_param() ||
226                !request->param().skip_cancelled_check()) {
227       EXPECT_FALSE(context->IsCancelled());
228     }
229 
230     if (request->has_param() && request->param().echo_metadata_initially()) {
231       const std::multimap<grpc::string_ref, grpc::string_ref>& client_metadata =
232           context->client_metadata();
233       for (const auto& metadatum : client_metadata) {
234         context->AddInitialMetadata(ToString(metadatum.first),
235                                     ToString(metadatum.second));
236       }
237     }
238 
239     if (request->has_param() && request->param().echo_metadata()) {
240       const std::multimap<grpc::string_ref, grpc::string_ref>& client_metadata =
241           context->client_metadata();
242       for (const auto& metadatum : client_metadata) {
243         context->AddTrailingMetadata(ToString(metadatum.first),
244                                      ToString(metadatum.second));
245       }
246       // Terminate rpc with error and debug info in trailer.
247       if (request->param().debug_info().stack_entries_size() ||
248           !request->param().debug_info().detail().empty()) {
249         std::string serialized_debug_info =
250             request->param().debug_info().SerializeAsString();
251         context->AddTrailingMetadata(kDebugInfoTrailerKey,
252                                      serialized_debug_info);
253         return Status::CANCELLED;
254       }
255     }
256     if (request->has_param() &&
257         (!request->param().expected_client_identity().empty() ||
258          request->param().check_auth_context())) {
259       internal::CheckServerAuthContext(
260           context, request->param().expected_transport_security_type(),
261           request->param().expected_client_identity());
262     }
263     if (request->has_param() &&
264         request->param().response_message_length() > 0) {
265       response->set_message(
266           std::string(request->param().response_message_length(), '\0'));
267     }
268     if (request->has_param() && request->param().echo_peer()) {
269       response->mutable_param()->set_peer(context->peer());
270     }
271     return Status::OK;
272   }
273 
Echo1(ServerContext * context,const EchoRequest * request,EchoResponse * response)274   Status Echo1(ServerContext* context, const EchoRequest* request,
275                EchoResponse* response) {
276     return Echo(context, request, response);
277   }
278 
Echo2(ServerContext * context,const EchoRequest * request,EchoResponse * response)279   Status Echo2(ServerContext* context, const EchoRequest* request,
280                EchoResponse* response) {
281     return Echo(context, request, response);
282   }
283 
CheckClientInitialMetadata(ServerContext * context,const SimpleRequest *,SimpleResponse *)284   Status CheckClientInitialMetadata(ServerContext* context,
285                                     const SimpleRequest* /*request*/,
286                                     SimpleResponse* /*response*/) {
287     EXPECT_EQ(internal::MetadataMatchCount(context->client_metadata(),
288                                            kCheckClientInitialMetadataKey,
289                                            kCheckClientInitialMetadataVal),
290               1);
291     EXPECT_EQ(1u,
292               context->client_metadata().count(kCheckClientInitialMetadataKey));
293     return Status::OK;
294   }
295 
296   // Unimplemented is left unimplemented to test the returned error.
297 
RequestStream(ServerContext * context,ServerReader<EchoRequest> * reader,EchoResponse * response)298   Status RequestStream(ServerContext* context,
299                        ServerReader<EchoRequest>* reader,
300                        EchoResponse* response) {
301     // If 'server_try_cancel' is set in the metadata, the RPC is cancelled by
302     // the server by calling ServerContext::TryCancel() depending on the value:
303     //   CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server reads
304     //   any message from the client
305     //   CANCEL_DURING_PROCESSING: The RPC is cancelled while the server is
306     //   reading messages from the client
307     //   CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server reads
308     //   all the messages from the client
309     int server_try_cancel = internal::GetIntValueFromMetadata(
310         kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL);
311 
312     EchoRequest request;
313     response->set_message("");
314 
315     if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
316       internal::ServerTryCancel(context);
317       return Status::CANCELLED;
318     }
319 
320     std::thread* server_try_cancel_thd = nullptr;
321     if (server_try_cancel == CANCEL_DURING_PROCESSING) {
322       server_try_cancel_thd =
323           new std::thread([context] { internal::ServerTryCancel(context); });
324     }
325 
326     int num_msgs_read = 0;
327     while (reader->Read(&request)) {
328       response->mutable_message()->append(request.message());
329     }
330     LOG(INFO) << "Read: " << num_msgs_read << " messages";
331 
332     if (server_try_cancel_thd != nullptr) {
333       server_try_cancel_thd->join();
334       delete server_try_cancel_thd;
335       return Status::CANCELLED;
336     }
337 
338     if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
339       internal::ServerTryCancel(context);
340       return Status::CANCELLED;
341     }
342 
343     return Status::OK;
344   }
345 
346   // Return 'kNumResponseStreamMsgs' messages.
347   // TODO(yangg) make it generic by adding a parameter into EchoRequest
ResponseStream(ServerContext * context,const EchoRequest * request,ServerWriter<EchoResponse> * writer)348   Status ResponseStream(ServerContext* context, const EchoRequest* request,
349                         ServerWriter<EchoResponse>* writer) {
350     // If server_try_cancel is set in the metadata, the RPC is cancelled by the
351     // server by calling ServerContext::TryCancel() depending on the value:
352     //   CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server writes
353     //   any messages to the client
354     //   CANCEL_DURING_PROCESSING: The RPC is cancelled while the server is
355     //   writing messages to the client
356     //   CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server writes
357     //   all the messages to the client
358     int server_try_cancel = internal::GetIntValueFromMetadata(
359         kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL);
360 
361     int server_coalescing_api = internal::GetIntValueFromMetadata(
362         kServerUseCoalescingApi, context->client_metadata(), 0);
363 
364     int server_responses_to_send = internal::GetIntValueFromMetadata(
365         kServerResponseStreamsToSend, context->client_metadata(),
366         kServerDefaultResponseStreamsToSend);
367 
368     if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
369       internal::ServerTryCancel(context);
370       return Status::CANCELLED;
371     }
372 
373     EchoResponse response;
374     std::thread* server_try_cancel_thd = nullptr;
375     if (server_try_cancel == CANCEL_DURING_PROCESSING) {
376       server_try_cancel_thd =
377           new std::thread([context] { internal::ServerTryCancel(context); });
378     }
379 
380     for (int i = 0; i < server_responses_to_send; i++) {
381       response.set_message(request->message() + std::to_string(i));
382       if (i == server_responses_to_send - 1 && server_coalescing_api != 0) {
383         writer->WriteLast(response, WriteOptions());
384       } else {
385         writer->Write(response);
386       }
387     }
388 
389     if (server_try_cancel_thd != nullptr) {
390       server_try_cancel_thd->join();
391       delete server_try_cancel_thd;
392       return Status::CANCELLED;
393     }
394 
395     if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
396       internal::ServerTryCancel(context);
397       return Status::CANCELLED;
398     }
399 
400     return Status::OK;
401   }
402 
BidiStream(ServerContext * context,ServerReaderWriter<EchoResponse,EchoRequest> * stream)403   Status BidiStream(ServerContext* context,
404                     ServerReaderWriter<EchoResponse, EchoRequest>* stream) {
405     // If server_try_cancel is set in the metadata, the RPC is cancelled by the
406     // server by calling ServerContext::TryCancel() depending on the value:
407     //   CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server reads/
408     //   writes any messages from/to the client
409     //   CANCEL_DURING_PROCESSING: The RPC is cancelled while the server is
410     //   reading/writing messages from/to the client
411     //   CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server
412     //   reads/writes all messages from/to the client
413     int server_try_cancel = internal::GetIntValueFromMetadata(
414         kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL);
415 
416     int client_try_cancel = static_cast<bool>(internal::GetIntValueFromMetadata(
417         kClientTryCancelRequest, context->client_metadata(), 0));
418 
419     EchoRequest request;
420     EchoResponse response;
421 
422     if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
423       internal::ServerTryCancel(context);
424       return Status::CANCELLED;
425     }
426 
427     std::thread* server_try_cancel_thd = nullptr;
428     if (server_try_cancel == CANCEL_DURING_PROCESSING) {
429       server_try_cancel_thd =
430           new std::thread([context] { internal::ServerTryCancel(context); });
431     }
432 
433     // kServerFinishAfterNReads suggests after how many reads, the server should
434     // write the last message and send status (coalesced using WriteLast)
435     int server_write_last = internal::GetIntValueFromMetadata(
436         kServerFinishAfterNReads, context->client_metadata(), 0);
437 
438     int read_counts = 0;
439     while (stream->Read(&request)) {
440       read_counts++;
441       LOG(INFO) << "recv msg " << request.message();
442       response.set_message(request.message());
443       if (read_counts == server_write_last) {
444         stream->WriteLast(response, WriteOptions());
445         break;
446       } else {
447         stream->Write(response);
448       }
449     }
450 
451     if (client_try_cancel) {
452       EXPECT_TRUE(context->IsCancelled());
453     }
454 
455     if (server_try_cancel_thd != nullptr) {
456       server_try_cancel_thd->join();
457       delete server_try_cancel_thd;
458       return Status::CANCELLED;
459     }
460 
461     if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
462       internal::ServerTryCancel(context);
463       return Status::CANCELLED;
464     }
465 
466     return Status::OK;
467   }
468 
469   // Unimplemented is left unimplemented to test the returned error.
signal_client()470   bool signal_client() {
471     std::unique_lock<std::mutex> lock(mu_);
472     return signal_client_;
473   }
474   int ClientWaitUntilNRpcsStarted(int desired_rpcs,
475                                   absl::Duration timeout = absl::Minutes(1)) {
476     return signaller_.ClientWaitUntilNRpcsStarted(desired_rpcs, timeout);
477   }
SignalServerToContinue()478   void SignalServerToContinue() { signaller_.SignalServerToContinue(); }
ResetSignaller()479   void ResetSignaller() { signaller_.Reset(); }
RpcsWaitingForClientCancel()480   uint64_t RpcsWaitingForClientCancel() {
481     std::unique_lock<std::mutex> lock(mu_);
482     return rpcs_waiting_for_client_cancel_;
483   }
484 
485  private:
486   bool signal_client_;
487   std::mutex mu_;
488   TestServiceSignaller signaller_;
489   std::unique_ptr<std::string> host_;
490   uint64_t rpcs_waiting_for_client_cancel_ = 0;
491 };
492 
493 class CallbackTestServiceImpl
494     : public grpc::testing::EchoTestService::CallbackService {
495  public:
CallbackTestServiceImpl()496   CallbackTestServiceImpl() : signal_client_(false), host_() {}
CallbackTestServiceImpl(const std::string & host)497   explicit CallbackTestServiceImpl(const std::string& host)
498       : signal_client_(false), host_(new std::string(host)) {}
499 
500   ServerUnaryReactor* Echo(CallbackServerContext* context,
501                            const EchoRequest* request,
502                            EchoResponse* response) override;
503 
504   ServerUnaryReactor* CheckClientInitialMetadata(CallbackServerContext* context,
505                                                  const SimpleRequest*,
506                                                  SimpleResponse*) override;
507 
508   ServerReadReactor<EchoRequest>* RequestStream(
509       CallbackServerContext* context, EchoResponse* response) override;
510 
511   ServerWriteReactor<EchoResponse>* ResponseStream(
512       CallbackServerContext* context, const EchoRequest* request) override;
513 
514   ServerBidiReactor<EchoRequest, EchoResponse>* BidiStream(
515       CallbackServerContext* context) override;
516 
517   // Unimplemented is left unimplemented to test the returned error.
signal_client()518   bool signal_client() {
519     std::unique_lock<std::mutex> lock(mu_);
520     return signal_client_;
521   }
522   int ClientWaitUntilNRpcsStarted(int desired_rpcs,
523                                   absl::Duration timeout = absl::Minutes(1)) {
524     return signaller_.ClientWaitUntilNRpcsStarted(desired_rpcs, timeout);
525   }
SignalServerToContinue()526   void SignalServerToContinue() { signaller_.SignalServerToContinue(); }
ResetSignaller()527   void ResetSignaller() { signaller_.Reset(); }
528 
529  private:
530   bool signal_client_;
531   std::mutex mu_;
532   TestServiceSignaller signaller_;
533   std::unique_ptr<std::string> host_;
534 };
535 
536 using TestServiceImpl =
537     TestMultipleServiceImpl<grpc::testing::EchoTestService::Service>;
538 
539 }  // namespace testing
540 }  // namespace grpc
541 
542 #endif  // GRPC_TEST_CPP_END2END_TEST_SERVICE_IMPL_H
543