• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2016 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #ifndef GRPC_TEST_CPP_END2END_TEST_SERVICE_IMPL_H
20 #define GRPC_TEST_CPP_END2END_TEST_SERVICE_IMPL_H
21 
22 #include <condition_variable>
23 #include <memory>
24 #include <mutex>
25 
26 #include <grpc/grpc.h>
27 #include <grpc/support/log.h>
28 #include <grpcpp/alarm.h>
29 #include <grpcpp/security/credentials.h>
30 #include <grpcpp/server_context.h>
31 #include <gtest/gtest.h>
32 
33 #include <string>
34 #include <thread>
35 
36 #include "src/proto/grpc/testing/echo.grpc.pb.h"
37 #include "test/cpp/util/string_ref_helper.h"
38 
39 using std::chrono::system_clock;
40 
41 namespace grpc {
42 namespace testing {
43 
44 const int kServerDefaultResponseStreamsToSend = 3;
45 const char* const kServerResponseStreamsToSend = "server_responses_to_send";
46 const char* const kServerTryCancelRequest = "server_try_cancel";
47 const char* const kDebugInfoTrailerKey = "debug-info-bin";
48 const char* const kServerFinishAfterNReads = "server_finish_after_n_reads";
49 const char* const kServerUseCoalescingApi = "server_use_coalescing_api";
50 const char* const kCheckClientInitialMetadataKey = "custom_client_metadata";
51 const char* const kCheckClientInitialMetadataVal = "Value for client metadata";
52 
53 typedef enum {
54   DO_NOT_CANCEL = 0,
55   CANCEL_BEFORE_PROCESSING,
56   CANCEL_DURING_PROCESSING,
57   CANCEL_AFTER_PROCESSING
58 } ServerTryCancelRequestPhase;
59 
60 namespace internal {
61 // When echo_deadline is requested, deadline seen in the ServerContext is set in
62 // the response in seconds.
63 void MaybeEchoDeadline(experimental::ServerContextBase* context,
64                        const EchoRequest* request, EchoResponse* response);
65 
66 void CheckServerAuthContext(const experimental::ServerContextBase* context,
67                             const std::string& expected_transport_security_type,
68                             const std::string& expected_client_identity);
69 
70 // Returns the number of pairs in metadata that exactly match the given
71 // key-value pair. Returns -1 if the pair wasn't found.
72 int MetadataMatchCount(
73     const std::multimap<grpc::string_ref, grpc::string_ref>& metadata,
74     const std::string& key, const std::string& value);
75 
76 int GetIntValueFromMetadataHelper(
77     const char* key,
78     const std::multimap<grpc::string_ref, grpc::string_ref>& metadata,
79     int default_value);
80 
81 int GetIntValueFromMetadata(
82     const char* key,
83     const std::multimap<grpc::string_ref, grpc::string_ref>& metadata,
84     int default_value);
85 
86 void ServerTryCancel(ServerContext* context);
87 }  // namespace internal
88 
89 class TestServiceSignaller {
90  public:
ClientWaitUntilRpcStarted()91   void ClientWaitUntilRpcStarted() {
92     std::unique_lock<std::mutex> lock(mu_);
93     cv_rpc_started_.wait(lock, [this] { return rpc_started_; });
94   }
ServerWaitToContinue()95   void ServerWaitToContinue() {
96     std::unique_lock<std::mutex> lock(mu_);
97     cv_server_continue_.wait(lock, [this] { return server_should_continue_; });
98   }
SignalClientThatRpcStarted()99   void SignalClientThatRpcStarted() {
100     std::unique_lock<std::mutex> lock(mu_);
101     rpc_started_ = true;
102     cv_rpc_started_.notify_one();
103   }
SignalServerToContinue()104   void SignalServerToContinue() {
105     std::unique_lock<std::mutex> lock(mu_);
106     server_should_continue_ = true;
107     cv_server_continue_.notify_one();
108   }
109 
110  private:
111   std::mutex mu_;
112   std::condition_variable cv_rpc_started_;
113   bool rpc_started_ /* GUARDED_BY(mu_) */ = false;
114   std::condition_variable cv_server_continue_;
115   bool server_should_continue_ /* GUARDED_BY(mu_) */ = false;
116 };
117 
118 template <typename RpcService>
119 class TestMultipleServiceImpl : public RpcService {
120  public:
TestMultipleServiceImpl()121   TestMultipleServiceImpl() : signal_client_(false), host_() {}
TestMultipleServiceImpl(const std::string & host)122   explicit TestMultipleServiceImpl(const std::string& host)
123       : signal_client_(false), host_(new std::string(host)) {}
124 
Echo(ServerContext * context,const EchoRequest * request,EchoResponse * response)125   Status Echo(ServerContext* context, const EchoRequest* request,
126               EchoResponse* response) {
127     if (request->has_param() &&
128         request->param().server_notify_client_when_started()) {
129       signaller_.SignalClientThatRpcStarted();
130       signaller_.ServerWaitToContinue();
131     }
132 
133     // A bit of sleep to make sure that short deadline tests fail
134     if (request->has_param() && request->param().server_sleep_us() > 0) {
135       gpr_sleep_until(
136           gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
137                        gpr_time_from_micros(request->param().server_sleep_us(),
138                                             GPR_TIMESPAN)));
139     }
140 
141     if (request->has_param() && request->param().server_die()) {
142       gpr_log(GPR_ERROR, "The request should not reach application handler.");
143       GPR_ASSERT(0);
144     }
145     if (request->has_param() && request->param().has_expected_error()) {
146       const auto& error = request->param().expected_error();
147       return Status(static_cast<StatusCode>(error.code()),
148                     error.error_message(), error.binary_error_details());
149     }
150     int server_try_cancel = internal::GetIntValueFromMetadata(
151         kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL);
152     if (server_try_cancel > DO_NOT_CANCEL) {
153       // Since this is a unary RPC, by the time this server handler is called,
154       // the 'request' message is already read from the client. So the scenarios
155       // in server_try_cancel don't make much sense. Just cancel the RPC as long
156       // as server_try_cancel is not DO_NOT_CANCEL
157       internal::ServerTryCancel(context);
158       return Status::CANCELLED;
159     }
160 
161     response->set_message(request->message());
162     internal::MaybeEchoDeadline(context, request, response);
163     if (host_) {
164       response->mutable_param()->set_host(*host_);
165     }
166     if (request->has_param() && request->param().client_cancel_after_us()) {
167       {
168         std::unique_lock<std::mutex> lock(mu_);
169         signal_client_ = true;
170       }
171       while (!context->IsCancelled()) {
172         gpr_sleep_until(gpr_time_add(
173             gpr_now(GPR_CLOCK_REALTIME),
174             gpr_time_from_micros(request->param().client_cancel_after_us(),
175                                  GPR_TIMESPAN)));
176       }
177       return Status::CANCELLED;
178     } else if (request->has_param() &&
179                request->param().server_cancel_after_us()) {
180       gpr_sleep_until(gpr_time_add(
181           gpr_now(GPR_CLOCK_REALTIME),
182           gpr_time_from_micros(request->param().server_cancel_after_us(),
183                                GPR_TIMESPAN)));
184       return Status::CANCELLED;
185     } else if (!request->has_param() ||
186                !request->param().skip_cancelled_check()) {
187       EXPECT_FALSE(context->IsCancelled());
188     }
189 
190     if (request->has_param() && request->param().echo_metadata_initially()) {
191       const std::multimap<grpc::string_ref, grpc::string_ref>& client_metadata =
192           context->client_metadata();
193       for (const auto& metadatum : client_metadata) {
194         context->AddInitialMetadata(ToString(metadatum.first),
195                                     ToString(metadatum.second));
196       }
197     }
198 
199     if (request->has_param() && request->param().echo_metadata()) {
200       const std::multimap<grpc::string_ref, grpc::string_ref>& client_metadata =
201           context->client_metadata();
202       for (const auto& metadatum : client_metadata) {
203         context->AddTrailingMetadata(ToString(metadatum.first),
204                                      ToString(metadatum.second));
205       }
206       // Terminate rpc with error and debug info in trailer.
207       if (request->param().debug_info().stack_entries_size() ||
208           !request->param().debug_info().detail().empty()) {
209         std::string serialized_debug_info =
210             request->param().debug_info().SerializeAsString();
211         context->AddTrailingMetadata(kDebugInfoTrailerKey,
212                                      serialized_debug_info);
213         return Status::CANCELLED;
214       }
215     }
216     if (request->has_param() &&
217         (request->param().expected_client_identity().length() > 0 ||
218          request->param().check_auth_context())) {
219       internal::CheckServerAuthContext(
220           context, request->param().expected_transport_security_type(),
221           request->param().expected_client_identity());
222     }
223     if (request->has_param() &&
224         request->param().response_message_length() > 0) {
225       response->set_message(
226           std::string(request->param().response_message_length(), '\0'));
227     }
228     if (request->has_param() && request->param().echo_peer()) {
229       response->mutable_param()->set_peer(context->peer());
230     }
231     return Status::OK;
232   }
233 
Echo1(ServerContext * context,const EchoRequest * request,EchoResponse * response)234   Status Echo1(ServerContext* context, const EchoRequest* request,
235                EchoResponse* response) {
236     return Echo(context, request, response);
237   }
238 
Echo2(ServerContext * context,const EchoRequest * request,EchoResponse * response)239   Status Echo2(ServerContext* context, const EchoRequest* request,
240                EchoResponse* response) {
241     return Echo(context, request, response);
242   }
243 
CheckClientInitialMetadata(ServerContext * context,const SimpleRequest *,SimpleResponse *)244   Status CheckClientInitialMetadata(ServerContext* context,
245                                     const SimpleRequest* /*request*/,
246                                     SimpleResponse* /*response*/) {
247     EXPECT_EQ(internal::MetadataMatchCount(context->client_metadata(),
248                                            kCheckClientInitialMetadataKey,
249                                            kCheckClientInitialMetadataVal),
250               1);
251     EXPECT_EQ(1u,
252               context->client_metadata().count(kCheckClientInitialMetadataKey));
253     return Status::OK;
254   }
255 
256   // Unimplemented is left unimplemented to test the returned error.
257 
RequestStream(ServerContext * context,ServerReader<EchoRequest> * reader,EchoResponse * response)258   Status RequestStream(ServerContext* context,
259                        ServerReader<EchoRequest>* reader,
260                        EchoResponse* response) {
261     // If 'server_try_cancel' is set in the metadata, the RPC is cancelled by
262     // the server by calling ServerContext::TryCancel() depending on the value:
263     //   CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server reads
264     //   any message from the client
265     //   CANCEL_DURING_PROCESSING: The RPC is cancelled while the server is
266     //   reading messages from the client
267     //   CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server reads
268     //   all the messages from the client
269     int server_try_cancel = internal::GetIntValueFromMetadata(
270         kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL);
271 
272     EchoRequest request;
273     response->set_message("");
274 
275     if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
276       internal::ServerTryCancel(context);
277       return Status::CANCELLED;
278     }
279 
280     std::thread* server_try_cancel_thd = nullptr;
281     if (server_try_cancel == CANCEL_DURING_PROCESSING) {
282       server_try_cancel_thd =
283           new std::thread([context] { internal::ServerTryCancel(context); });
284     }
285 
286     int num_msgs_read = 0;
287     while (reader->Read(&request)) {
288       response->mutable_message()->append(request.message());
289     }
290     gpr_log(GPR_INFO, "Read: %d messages", num_msgs_read);
291 
292     if (server_try_cancel_thd != nullptr) {
293       server_try_cancel_thd->join();
294       delete server_try_cancel_thd;
295       return Status::CANCELLED;
296     }
297 
298     if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
299       internal::ServerTryCancel(context);
300       return Status::CANCELLED;
301     }
302 
303     return Status::OK;
304   }
305 
306   // Return 'kNumResponseStreamMsgs' messages.
307   // TODO(yangg) make it generic by adding a parameter into EchoRequest
ResponseStream(ServerContext * context,const EchoRequest * request,ServerWriter<EchoResponse> * writer)308   Status ResponseStream(ServerContext* context, const EchoRequest* request,
309                         ServerWriter<EchoResponse>* writer) {
310     // If server_try_cancel is set in the metadata, the RPC is cancelled by the
311     // server by calling ServerContext::TryCancel() depending on the value:
312     //   CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server writes
313     //   any messages to the client
314     //   CANCEL_DURING_PROCESSING: The RPC is cancelled while the server is
315     //   writing messages to the client
316     //   CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server writes
317     //   all the messages to the client
318     int server_try_cancel = internal::GetIntValueFromMetadata(
319         kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL);
320 
321     int server_coalescing_api = internal::GetIntValueFromMetadata(
322         kServerUseCoalescingApi, context->client_metadata(), 0);
323 
324     int server_responses_to_send = internal::GetIntValueFromMetadata(
325         kServerResponseStreamsToSend, context->client_metadata(),
326         kServerDefaultResponseStreamsToSend);
327 
328     if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
329       internal::ServerTryCancel(context);
330       return Status::CANCELLED;
331     }
332 
333     EchoResponse response;
334     std::thread* server_try_cancel_thd = nullptr;
335     if (server_try_cancel == CANCEL_DURING_PROCESSING) {
336       server_try_cancel_thd =
337           new std::thread([context] { internal::ServerTryCancel(context); });
338     }
339 
340     for (int i = 0; i < server_responses_to_send; i++) {
341       response.set_message(request->message() + std::to_string(i));
342       if (i == server_responses_to_send - 1 && server_coalescing_api != 0) {
343         writer->WriteLast(response, WriteOptions());
344       } else {
345         writer->Write(response);
346       }
347     }
348 
349     if (server_try_cancel_thd != nullptr) {
350       server_try_cancel_thd->join();
351       delete server_try_cancel_thd;
352       return Status::CANCELLED;
353     }
354 
355     if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
356       internal::ServerTryCancel(context);
357       return Status::CANCELLED;
358     }
359 
360     return Status::OK;
361   }
362 
BidiStream(ServerContext * context,ServerReaderWriter<EchoResponse,EchoRequest> * stream)363   Status BidiStream(ServerContext* context,
364                     ServerReaderWriter<EchoResponse, EchoRequest>* stream) {
365     // If server_try_cancel is set in the metadata, the RPC is cancelled by the
366     // server by calling ServerContext::TryCancel() depending on the value:
367     //   CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server reads/
368     //   writes any messages from/to the client
369     //   CANCEL_DURING_PROCESSING: The RPC is cancelled while the server is
370     //   reading/writing messages from/to the client
371     //   CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server
372     //   reads/writes all messages from/to the client
373     int server_try_cancel = internal::GetIntValueFromMetadata(
374         kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL);
375 
376     EchoRequest request;
377     EchoResponse response;
378 
379     if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
380       internal::ServerTryCancel(context);
381       return Status::CANCELLED;
382     }
383 
384     std::thread* server_try_cancel_thd = nullptr;
385     if (server_try_cancel == CANCEL_DURING_PROCESSING) {
386       server_try_cancel_thd =
387           new std::thread([context] { internal::ServerTryCancel(context); });
388     }
389 
390     // kServerFinishAfterNReads suggests after how many reads, the server should
391     // write the last message and send status (coalesced using WriteLast)
392     int server_write_last = internal::GetIntValueFromMetadata(
393         kServerFinishAfterNReads, context->client_metadata(), 0);
394 
395     int read_counts = 0;
396     while (stream->Read(&request)) {
397       read_counts++;
398       gpr_log(GPR_INFO, "recv msg %s", request.message().c_str());
399       response.set_message(request.message());
400       if (read_counts == server_write_last) {
401         stream->WriteLast(response, WriteOptions());
402       } else {
403         stream->Write(response);
404       }
405     }
406 
407     if (server_try_cancel_thd != nullptr) {
408       server_try_cancel_thd->join();
409       delete server_try_cancel_thd;
410       return Status::CANCELLED;
411     }
412 
413     if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
414       internal::ServerTryCancel(context);
415       return Status::CANCELLED;
416     }
417 
418     return Status::OK;
419   }
420 
421   // Unimplemented is left unimplemented to test the returned error.
signal_client()422   bool signal_client() {
423     std::unique_lock<std::mutex> lock(mu_);
424     return signal_client_;
425   }
ClientWaitUntilRpcStarted()426   void ClientWaitUntilRpcStarted() { signaller_.ClientWaitUntilRpcStarted(); }
SignalServerToContinue()427   void SignalServerToContinue() { signaller_.SignalServerToContinue(); }
428 
429  private:
430   bool signal_client_;
431   std::mutex mu_;
432   TestServiceSignaller signaller_;
433   std::unique_ptr<std::string> host_;
434 };
435 
436 class CallbackTestServiceImpl
437     : public ::grpc::testing::EchoTestService::ExperimentalCallbackService {
438  public:
CallbackTestServiceImpl()439   CallbackTestServiceImpl() : signal_client_(false), host_() {}
CallbackTestServiceImpl(const std::string & host)440   explicit CallbackTestServiceImpl(const std::string& host)
441       : signal_client_(false), host_(new std::string(host)) {}
442 
443   experimental::ServerUnaryReactor* Echo(
444       experimental::CallbackServerContext* context, const EchoRequest* request,
445       EchoResponse* response) override;
446 
447   experimental::ServerUnaryReactor* CheckClientInitialMetadata(
448       experimental::CallbackServerContext* context, const SimpleRequest*,
449       SimpleResponse*) override;
450 
451   experimental::ServerReadReactor<EchoRequest>* RequestStream(
452       experimental::CallbackServerContext* context,
453       EchoResponse* response) override;
454 
455   experimental::ServerWriteReactor<EchoResponse>* ResponseStream(
456       experimental::CallbackServerContext* context,
457       const EchoRequest* request) override;
458 
459   experimental::ServerBidiReactor<EchoRequest, EchoResponse>* BidiStream(
460       experimental::CallbackServerContext* context) override;
461 
462   // Unimplemented is left unimplemented to test the returned error.
signal_client()463   bool signal_client() {
464     std::unique_lock<std::mutex> lock(mu_);
465     return signal_client_;
466   }
ClientWaitUntilRpcStarted()467   void ClientWaitUntilRpcStarted() { signaller_.ClientWaitUntilRpcStarted(); }
SignalServerToContinue()468   void SignalServerToContinue() { signaller_.SignalServerToContinue(); }
469 
470  private:
471   bool signal_client_;
472   std::mutex mu_;
473   TestServiceSignaller signaller_;
474   std::unique_ptr<std::string> host_;
475 };
476 
477 using TestServiceImpl =
478     TestMultipleServiceImpl<::grpc::testing::EchoTestService::Service>;
479 
480 }  // namespace testing
481 }  // namespace grpc
482 
483 #endif  // GRPC_TEST_CPP_END2END_TEST_SERVICE_IMPL_H
484