• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2016 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include "test/cpp/end2end/test_service_impl.h"
20 
21 #include <string>
22 #include <thread>
23 
24 #include <grpc/support/log.h>
25 #include <grpcpp/security/credentials.h>
26 #include <grpcpp/server_context.h>
27 
28 #include "src/proto/grpc/testing/echo.grpc.pb.h"
29 #include "test/cpp/util/string_ref_helper.h"
30 
31 #include <gtest/gtest.h>
32 
33 using std::chrono::system_clock;
34 
35 namespace grpc {
36 namespace testing {
37 namespace {
38 
39 // When echo_deadline is requested, deadline seen in the ServerContext is set in
40 // the response in seconds.
MaybeEchoDeadline(ServerContext * context,const EchoRequest * request,EchoResponse * response)41 void MaybeEchoDeadline(ServerContext* context, const EchoRequest* request,
42                        EchoResponse* response) {
43   if (request->has_param() && request->param().echo_deadline()) {
44     gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
45     if (context->deadline() != system_clock::time_point::max()) {
46       Timepoint2Timespec(context->deadline(), &deadline);
47     }
48     response->mutable_param()->set_request_deadline(deadline.tv_sec);
49   }
50 }
51 
CheckServerAuthContext(const ServerContext * context,const grpc::string & expected_transport_security_type,const grpc::string & expected_client_identity)52 void CheckServerAuthContext(
53     const ServerContext* context,
54     const grpc::string& expected_transport_security_type,
55     const grpc::string& expected_client_identity) {
56   std::shared_ptr<const AuthContext> auth_ctx = context->auth_context();
57   std::vector<grpc::string_ref> tst =
58       auth_ctx->FindPropertyValues("transport_security_type");
59   EXPECT_EQ(1u, tst.size());
60   EXPECT_EQ(expected_transport_security_type, ToString(tst[0]));
61   if (expected_client_identity.empty()) {
62     EXPECT_TRUE(auth_ctx->GetPeerIdentityPropertyName().empty());
63     EXPECT_TRUE(auth_ctx->GetPeerIdentity().empty());
64     EXPECT_FALSE(auth_ctx->IsPeerAuthenticated());
65   } else {
66     auto identity = auth_ctx->GetPeerIdentity();
67     EXPECT_TRUE(auth_ctx->IsPeerAuthenticated());
68     EXPECT_EQ(1u, identity.size());
69     EXPECT_EQ(expected_client_identity, identity[0]);
70   }
71 }
72 }  // namespace
73 
Echo(ServerContext * context,const EchoRequest * request,EchoResponse * response)74 Status TestServiceImpl::Echo(ServerContext* context, const EchoRequest* request,
75                              EchoResponse* response) {
76   // A bit of sleep to make sure that short deadline tests fail
77   if (request->has_param() && request->param().server_sleep_us() > 0) {
78     gpr_sleep_until(
79         gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
80                      gpr_time_from_micros(request->param().server_sleep_us(),
81                                           GPR_TIMESPAN)));
82   }
83 
84   if (request->has_param() && request->param().server_die()) {
85     gpr_log(GPR_ERROR, "The request should not reach application handler.");
86     GPR_ASSERT(0);
87   }
88   if (request->has_param() && request->param().has_expected_error()) {
89     const auto& error = request->param().expected_error();
90     return Status(static_cast<StatusCode>(error.code()), error.error_message(),
91                   error.binary_error_details());
92   }
93   int server_try_cancel = GetIntValueFromMetadata(
94       kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL);
95   if (server_try_cancel > DO_NOT_CANCEL) {
96     // Since this is a unary RPC, by the time this server handler is called,
97     // the 'request' message is already read from the client. So the scenarios
98     // in server_try_cancel don't make much sense. Just cancel the RPC as long
99     // as server_try_cancel is not DO_NOT_CANCEL
100     ServerTryCancel(context);
101     return Status::CANCELLED;
102   }
103 
104   response->set_message(request->message());
105   MaybeEchoDeadline(context, request, response);
106   if (host_) {
107     response->mutable_param()->set_host(*host_);
108   }
109   if (request->has_param() && request->param().client_cancel_after_us()) {
110     {
111       std::unique_lock<std::mutex> lock(mu_);
112       signal_client_ = true;
113     }
114     while (!context->IsCancelled()) {
115       gpr_sleep_until(gpr_time_add(
116           gpr_now(GPR_CLOCK_REALTIME),
117           gpr_time_from_micros(request->param().client_cancel_after_us(),
118                                GPR_TIMESPAN)));
119     }
120     return Status::CANCELLED;
121   } else if (request->has_param() &&
122              request->param().server_cancel_after_us()) {
123     gpr_sleep_until(gpr_time_add(
124         gpr_now(GPR_CLOCK_REALTIME),
125         gpr_time_from_micros(request->param().server_cancel_after_us(),
126                              GPR_TIMESPAN)));
127     return Status::CANCELLED;
128   } else if (!request->has_param() ||
129              !request->param().skip_cancelled_check()) {
130     EXPECT_FALSE(context->IsCancelled());
131   }
132 
133   if (request->has_param() && request->param().echo_metadata()) {
134     const std::multimap<grpc::string_ref, grpc::string_ref>& client_metadata =
135         context->client_metadata();
136     for (std::multimap<grpc::string_ref, grpc::string_ref>::const_iterator
137              iter = client_metadata.begin();
138          iter != client_metadata.end(); ++iter) {
139       context->AddTrailingMetadata(ToString(iter->first),
140                                    ToString(iter->second));
141     }
142     // Terminate rpc with error and debug info in trailer.
143     if (request->param().debug_info().stack_entries_size() ||
144         !request->param().debug_info().detail().empty()) {
145       grpc::string serialized_debug_info =
146           request->param().debug_info().SerializeAsString();
147       context->AddTrailingMetadata(kDebugInfoTrailerKey, serialized_debug_info);
148       return Status::CANCELLED;
149     }
150   }
151   if (request->has_param() &&
152       (request->param().expected_client_identity().length() > 0 ||
153        request->param().check_auth_context())) {
154     CheckServerAuthContext(context,
155                            request->param().expected_transport_security_type(),
156                            request->param().expected_client_identity());
157   }
158   if (request->has_param() && request->param().response_message_length() > 0) {
159     response->set_message(
160         grpc::string(request->param().response_message_length(), '\0'));
161   }
162   if (request->has_param() && request->param().echo_peer()) {
163     response->mutable_param()->set_peer(context->peer());
164   }
165   return Status::OK;
166 }
167 
168 // Unimplemented is left unimplemented to test the returned error.
169 
RequestStream(ServerContext * context,ServerReader<EchoRequest> * reader,EchoResponse * response)170 Status TestServiceImpl::RequestStream(ServerContext* context,
171                                       ServerReader<EchoRequest>* reader,
172                                       EchoResponse* response) {
173   // If 'server_try_cancel' is set in the metadata, the RPC is cancelled by
174   // the server by calling ServerContext::TryCancel() depending on the value:
175   //   CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server reads
176   //   any message from the client
177   //   CANCEL_DURING_PROCESSING: The RPC is cancelled while the server is
178   //   reading messages from the client
179   //   CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server reads
180   //   all the messages from the client
181   int server_try_cancel = GetIntValueFromMetadata(
182       kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL);
183 
184   EchoRequest request;
185   response->set_message("");
186 
187   if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
188     ServerTryCancel(context);
189     return Status::CANCELLED;
190   }
191 
192   std::thread* server_try_cancel_thd = nullptr;
193   if (server_try_cancel == CANCEL_DURING_PROCESSING) {
194     server_try_cancel_thd =
195         new std::thread(&TestServiceImpl::ServerTryCancel, this, context);
196   }
197 
198   int num_msgs_read = 0;
199   while (reader->Read(&request)) {
200     response->mutable_message()->append(request.message());
201   }
202   gpr_log(GPR_INFO, "Read: %d messages", num_msgs_read);
203 
204   if (server_try_cancel_thd != nullptr) {
205     server_try_cancel_thd->join();
206     delete server_try_cancel_thd;
207     return Status::CANCELLED;
208   }
209 
210   if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
211     ServerTryCancel(context);
212     return Status::CANCELLED;
213   }
214 
215   return Status::OK;
216 }
217 
218 // Return 'kNumResponseStreamMsgs' messages.
219 // TODO(yangg) make it generic by adding a parameter into EchoRequest
ResponseStream(ServerContext * context,const EchoRequest * request,ServerWriter<EchoResponse> * writer)220 Status TestServiceImpl::ResponseStream(ServerContext* context,
221                                        const EchoRequest* request,
222                                        ServerWriter<EchoResponse>* writer) {
223   // If server_try_cancel is set in the metadata, the RPC is cancelled by the
224   // server by calling ServerContext::TryCancel() depending on the value:
225   //   CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server writes
226   //   any messages to the client
227   //   CANCEL_DURING_PROCESSING: The RPC is cancelled while the server is
228   //   writing messages to the client
229   //   CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server writes
230   //   all the messages to the client
231   int server_try_cancel = GetIntValueFromMetadata(
232       kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL);
233 
234   int server_coalescing_api = GetIntValueFromMetadata(
235       kServerUseCoalescingApi, context->client_metadata(), 0);
236 
237   int server_responses_to_send = GetIntValueFromMetadata(
238       kServerResponseStreamsToSend, context->client_metadata(),
239       kServerDefaultResponseStreamsToSend);
240 
241   if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
242     ServerTryCancel(context);
243     return Status::CANCELLED;
244   }
245 
246   EchoResponse response;
247   std::thread* server_try_cancel_thd = nullptr;
248   if (server_try_cancel == CANCEL_DURING_PROCESSING) {
249     server_try_cancel_thd =
250         new std::thread(&TestServiceImpl::ServerTryCancel, this, context);
251   }
252 
253   for (int i = 0; i < server_responses_to_send; i++) {
254     response.set_message(request->message() + grpc::to_string(i));
255     if (i == server_responses_to_send - 1 && server_coalescing_api != 0) {
256       writer->WriteLast(response, WriteOptions());
257     } else {
258       writer->Write(response);
259     }
260   }
261 
262   if (server_try_cancel_thd != nullptr) {
263     server_try_cancel_thd->join();
264     delete server_try_cancel_thd;
265     return Status::CANCELLED;
266   }
267 
268   if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
269     ServerTryCancel(context);
270     return Status::CANCELLED;
271   }
272 
273   return Status::OK;
274 }
275 
BidiStream(ServerContext * context,ServerReaderWriter<EchoResponse,EchoRequest> * stream)276 Status TestServiceImpl::BidiStream(
277     ServerContext* context,
278     ServerReaderWriter<EchoResponse, EchoRequest>* stream) {
279   // If server_try_cancel is set in the metadata, the RPC is cancelled by the
280   // server by calling ServerContext::TryCancel() depending on the value:
281   //   CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server reads/
282   //   writes any messages from/to the client
283   //   CANCEL_DURING_PROCESSING: The RPC is cancelled while the server is
284   //   reading/writing messages from/to the client
285   //   CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server
286   //   reads/writes all messages from/to the client
287   int server_try_cancel = GetIntValueFromMetadata(
288       kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL);
289 
290   EchoRequest request;
291   EchoResponse response;
292 
293   if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
294     ServerTryCancel(context);
295     return Status::CANCELLED;
296   }
297 
298   std::thread* server_try_cancel_thd = nullptr;
299   if (server_try_cancel == CANCEL_DURING_PROCESSING) {
300     server_try_cancel_thd =
301         new std::thread(&TestServiceImpl::ServerTryCancel, this, context);
302   }
303 
304   // kServerFinishAfterNReads suggests after how many reads, the server should
305   // write the last message and send status (coalesced using WriteLast)
306   int server_write_last = GetIntValueFromMetadata(
307       kServerFinishAfterNReads, context->client_metadata(), 0);
308 
309   int read_counts = 0;
310   while (stream->Read(&request)) {
311     read_counts++;
312     gpr_log(GPR_INFO, "recv msg %s", request.message().c_str());
313     response.set_message(request.message());
314     if (read_counts == server_write_last) {
315       stream->WriteLast(response, WriteOptions());
316     } else {
317       stream->Write(response);
318     }
319   }
320 
321   if (server_try_cancel_thd != nullptr) {
322     server_try_cancel_thd->join();
323     delete server_try_cancel_thd;
324     return Status::CANCELLED;
325   }
326 
327   if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
328     ServerTryCancel(context);
329     return Status::CANCELLED;
330   }
331 
332   return Status::OK;
333 }
334 
GetIntValueFromMetadata(const char * key,const std::multimap<grpc::string_ref,grpc::string_ref> & metadata,int default_value)335 int TestServiceImpl::GetIntValueFromMetadata(
336     const char* key,
337     const std::multimap<grpc::string_ref, grpc::string_ref>& metadata,
338     int default_value) {
339   if (metadata.find(key) != metadata.end()) {
340     std::istringstream iss(ToString(metadata.find(key)->second));
341     iss >> default_value;
342     gpr_log(GPR_INFO, "%s : %d", key, default_value);
343   }
344 
345   return default_value;
346 }
347 
ServerTryCancel(ServerContext * context)348 void TestServiceImpl::ServerTryCancel(ServerContext* context) {
349   EXPECT_FALSE(context->IsCancelled());
350   context->TryCancel();
351   gpr_log(GPR_INFO, "Server called TryCancel() to cancel the request");
352   // Now wait until it's really canceled
353   while (!context->IsCancelled()) {
354     gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
355                                  gpr_time_from_micros(1000, GPR_TIMESPAN)));
356   }
357 }
358 
359 }  // namespace testing
360 }  // namespace grpc
361