1 /*
2 *
3 * Copyright 2015-2016 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <fstream>
20 #include <memory>
21 #include <sstream>
22 #include <thread>
23
24 #include <gflags/gflags.h>
25 #include <grpc/grpc.h>
26 #include <grpc/support/log.h>
27 #include <grpc/support/time.h>
28 #include <grpcpp/security/server_credentials.h>
29 #include <grpcpp/server.h>
30 #include <grpcpp/server_builder.h>
31 #include <grpcpp/server_context.h>
32
33 #include "src/core/lib/gpr/string.h"
34 #include "src/core/lib/transport/byte_stream.h"
35 #include "src/proto/grpc/testing/empty.pb.h"
36 #include "src/proto/grpc/testing/messages.pb.h"
37 #include "src/proto/grpc/testing/test.grpc.pb.h"
38 #include "test/cpp/interop/server_helper.h"
39 #include "test/cpp/util/test_config.h"
40
41 DEFINE_bool(use_alts, false,
42 "Whether to use alts. Enable alts will disable tls.");
43 DEFINE_bool(use_tls, false, "Whether to use tls.");
44 DEFINE_string(custom_credentials_type, "", "User provided credentials type.");
45 DEFINE_int32(port, 0, "Server port.");
46 DEFINE_int32(max_send_message_size, -1, "The maximum send message size.");
47
48 using grpc::Server;
49 using grpc::ServerBuilder;
50 using grpc::ServerContext;
51 using grpc::ServerCredentials;
52 using grpc::ServerReader;
53 using grpc::ServerReaderWriter;
54 using grpc::ServerWriter;
55 using grpc::SslServerCredentialsOptions;
56 using grpc::Status;
57 using grpc::WriteOptions;
58 using grpc::testing::InteropServerContextInspector;
59 using grpc::testing::Payload;
60 using grpc::testing::SimpleRequest;
61 using grpc::testing::SimpleResponse;
62 using grpc::testing::StreamingInputCallRequest;
63 using grpc::testing::StreamingInputCallResponse;
64 using grpc::testing::StreamingOutputCallRequest;
65 using grpc::testing::StreamingOutputCallResponse;
66 using grpc::testing::TestService;
67
68 const char kEchoInitialMetadataKey[] = "x-grpc-test-echo-initial";
69 const char kEchoTrailingBinMetadataKey[] = "x-grpc-test-echo-trailing-bin";
70 const char kEchoUserAgentKey[] = "x-grpc-test-echo-useragent";
71
MaybeEchoMetadata(ServerContext * context)72 void MaybeEchoMetadata(ServerContext* context) {
73 const auto& client_metadata = context->client_metadata();
74 GPR_ASSERT(client_metadata.count(kEchoInitialMetadataKey) <= 1);
75 GPR_ASSERT(client_metadata.count(kEchoTrailingBinMetadataKey) <= 1);
76
77 auto iter = client_metadata.find(kEchoInitialMetadataKey);
78 if (iter != client_metadata.end()) {
79 context->AddInitialMetadata(
80 kEchoInitialMetadataKey,
81 grpc::string(iter->second.begin(), iter->second.end()));
82 }
83 iter = client_metadata.find(kEchoTrailingBinMetadataKey);
84 if (iter != client_metadata.end()) {
85 context->AddTrailingMetadata(
86 kEchoTrailingBinMetadataKey,
87 grpc::string(iter->second.begin(), iter->second.end()));
88 }
89 // Check if client sent a magic key in the header that makes us echo
90 // back the user-agent (for testing purpose)
91 iter = client_metadata.find(kEchoUserAgentKey);
92 if (iter != client_metadata.end()) {
93 iter = client_metadata.find("user-agent");
94 if (iter != client_metadata.end()) {
95 context->AddInitialMetadata(
96 kEchoUserAgentKey,
97 grpc::string(iter->second.begin(), iter->second.end()));
98 }
99 }
100 }
101
SetPayload(int size,Payload * payload)102 bool SetPayload(int size, Payload* payload) {
103 std::unique_ptr<char[]> body(new char[size]());
104 payload->set_body(body.get(), size);
105 return true;
106 }
107
CheckExpectedCompression(const ServerContext & context,const bool compression_expected)108 bool CheckExpectedCompression(const ServerContext& context,
109 const bool compression_expected) {
110 const InteropServerContextInspector inspector(context);
111 const grpc_compression_algorithm received_compression =
112 inspector.GetCallCompressionAlgorithm();
113
114 if (compression_expected) {
115 if (received_compression == GRPC_COMPRESS_NONE) {
116 // Expected some compression, got NONE. This is an error.
117 gpr_log(GPR_ERROR,
118 "Expected compression but got uncompressed request from client.");
119 return false;
120 }
121 if (!(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS)) {
122 gpr_log(GPR_ERROR,
123 "Failure: Requested compression in a compressable request, but "
124 "compression bit in message flags not set.");
125 return false;
126 }
127 } else {
128 // Didn't expect compression -> make sure the request is uncompressed
129 if (inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS) {
130 gpr_log(GPR_ERROR,
131 "Failure: Didn't requested compression, but compression bit in "
132 "message flags set.");
133 return false;
134 }
135 }
136 return true;
137 }
138
139 class TestServiceImpl : public TestService::Service {
140 public:
EmptyCall(ServerContext * context,const grpc::testing::Empty * request,grpc::testing::Empty * response)141 Status EmptyCall(ServerContext* context, const grpc::testing::Empty* request,
142 grpc::testing::Empty* response) {
143 MaybeEchoMetadata(context);
144 return Status::OK;
145 }
146
147 // Response contains current timestamp. We ignore everything in the request.
CacheableUnaryCall(ServerContext * context,const SimpleRequest * request,SimpleResponse * response)148 Status CacheableUnaryCall(ServerContext* context,
149 const SimpleRequest* request,
150 SimpleResponse* response) {
151 gpr_timespec ts = gpr_now(GPR_CLOCK_PRECISE);
152 std::string timestamp = std::to_string((long long unsigned)ts.tv_nsec);
153 response->mutable_payload()->set_body(timestamp.c_str(), timestamp.size());
154 context->AddInitialMetadata("cache-control", "max-age=60, public");
155 return Status::OK;
156 }
157
UnaryCall(ServerContext * context,const SimpleRequest * request,SimpleResponse * response)158 Status UnaryCall(ServerContext* context, const SimpleRequest* request,
159 SimpleResponse* response) {
160 MaybeEchoMetadata(context);
161 if (request->has_response_compressed()) {
162 const bool compression_requested = request->response_compressed().value();
163 gpr_log(GPR_DEBUG, "Request for compression (%s) present for %s",
164 compression_requested ? "enabled" : "disabled", __func__);
165 if (compression_requested) {
166 // Any level would do, let's go for HIGH because we are overachievers.
167 context->set_compression_level(GRPC_COMPRESS_LEVEL_HIGH);
168 } else {
169 context->set_compression_level(GRPC_COMPRESS_LEVEL_NONE);
170 }
171 }
172 if (!CheckExpectedCompression(*context,
173 request->expect_compressed().value())) {
174 return Status(grpc::StatusCode::INVALID_ARGUMENT,
175 "Compressed request expectation not met.");
176 }
177 if (request->response_size() > 0) {
178 if (!SetPayload(request->response_size(), response->mutable_payload())) {
179 return Status(grpc::StatusCode::INVALID_ARGUMENT,
180 "Error creating payload.");
181 }
182 }
183
184 if (request->has_response_status()) {
185 return Status(
186 static_cast<grpc::StatusCode>(request->response_status().code()),
187 request->response_status().message());
188 }
189
190 return Status::OK;
191 }
192
StreamingOutputCall(ServerContext * context,const StreamingOutputCallRequest * request,ServerWriter<StreamingOutputCallResponse> * writer)193 Status StreamingOutputCall(
194 ServerContext* context, const StreamingOutputCallRequest* request,
195 ServerWriter<StreamingOutputCallResponse>* writer) {
196 StreamingOutputCallResponse response;
197 bool write_success = true;
198 for (int i = 0; write_success && i < request->response_parameters_size();
199 i++) {
200 if (!SetPayload(request->response_parameters(i).size(),
201 response.mutable_payload())) {
202 return Status(grpc::StatusCode::INVALID_ARGUMENT,
203 "Error creating payload.");
204 }
205 WriteOptions wopts;
206 if (request->response_parameters(i).has_compressed()) {
207 // Compress by default. Disabled on a per-message basis.
208 context->set_compression_level(GRPC_COMPRESS_LEVEL_HIGH);
209 const bool compression_requested =
210 request->response_parameters(i).compressed().value();
211 gpr_log(GPR_DEBUG, "Request for compression (%s) present for %s",
212 compression_requested ? "enabled" : "disabled", __func__);
213 if (!compression_requested) {
214 wopts.set_no_compression();
215 } // else, compression is already enabled via the context.
216 }
217 int time_us;
218 if ((time_us = request->response_parameters(i).interval_us()) > 0) {
219 // Sleep before response if needed
220 gpr_timespec sleep_time =
221 gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
222 gpr_time_from_micros(time_us, GPR_TIMESPAN));
223 gpr_sleep_until(sleep_time);
224 }
225 write_success = writer->Write(response, wopts);
226 }
227 if (write_success) {
228 return Status::OK;
229 } else {
230 return Status(grpc::StatusCode::INTERNAL, "Error writing response.");
231 }
232 }
233
StreamingInputCall(ServerContext * context,ServerReader<StreamingInputCallRequest> * reader,StreamingInputCallResponse * response)234 Status StreamingInputCall(ServerContext* context,
235 ServerReader<StreamingInputCallRequest>* reader,
236 StreamingInputCallResponse* response) {
237 StreamingInputCallRequest request;
238 int aggregated_payload_size = 0;
239 while (reader->Read(&request)) {
240 if (!CheckExpectedCompression(*context,
241 request.expect_compressed().value())) {
242 return Status(grpc::StatusCode::INVALID_ARGUMENT,
243 "Compressed request expectation not met.");
244 }
245 if (request.has_payload()) {
246 aggregated_payload_size += request.payload().body().size();
247 }
248 }
249 response->set_aggregated_payload_size(aggregated_payload_size);
250 return Status::OK;
251 }
252
FullDuplexCall(ServerContext * context,ServerReaderWriter<StreamingOutputCallResponse,StreamingOutputCallRequest> * stream)253 Status FullDuplexCall(
254 ServerContext* context,
255 ServerReaderWriter<StreamingOutputCallResponse,
256 StreamingOutputCallRequest>* stream) {
257 MaybeEchoMetadata(context);
258 StreamingOutputCallRequest request;
259 StreamingOutputCallResponse response;
260 bool write_success = true;
261 while (write_success && stream->Read(&request)) {
262 if (request.has_response_status()) {
263 return Status(
264 static_cast<grpc::StatusCode>(request.response_status().code()),
265 request.response_status().message());
266 }
267 if (request.response_parameters_size() != 0) {
268 response.mutable_payload()->set_type(request.payload().type());
269 response.mutable_payload()->set_body(
270 grpc::string(request.response_parameters(0).size(), '\0'));
271 int time_us;
272 if ((time_us = request.response_parameters(0).interval_us()) > 0) {
273 // Sleep before response if needed
274 gpr_timespec sleep_time =
275 gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
276 gpr_time_from_micros(time_us, GPR_TIMESPAN));
277 gpr_sleep_until(sleep_time);
278 }
279 write_success = stream->Write(response);
280 }
281 }
282 if (write_success) {
283 return Status::OK;
284 } else {
285 return Status(grpc::StatusCode::INTERNAL, "Error writing response.");
286 }
287 }
288
HalfDuplexCall(ServerContext * context,ServerReaderWriter<StreamingOutputCallResponse,StreamingOutputCallRequest> * stream)289 Status HalfDuplexCall(
290 ServerContext* context,
291 ServerReaderWriter<StreamingOutputCallResponse,
292 StreamingOutputCallRequest>* stream) {
293 std::vector<StreamingOutputCallRequest> requests;
294 StreamingOutputCallRequest request;
295 while (stream->Read(&request)) {
296 requests.push_back(request);
297 }
298
299 StreamingOutputCallResponse response;
300 bool write_success = true;
301 for (unsigned int i = 0; write_success && i < requests.size(); i++) {
302 response.mutable_payload()->set_type(requests[i].payload().type());
303 if (requests[i].response_parameters_size() == 0) {
304 return Status(grpc::StatusCode::INTERNAL,
305 "Request does not have response parameters.");
306 }
307 response.mutable_payload()->set_body(
308 grpc::string(requests[i].response_parameters(0).size(), '\0'));
309 write_success = stream->Write(response);
310 }
311 if (write_success) {
312 return Status::OK;
313 } else {
314 return Status(grpc::StatusCode::INTERNAL, "Error writing response.");
315 }
316 }
317 };
318
RunServer(const std::shared_ptr<ServerCredentials> & creds)319 void grpc::testing::interop::RunServer(
320 const std::shared_ptr<ServerCredentials>& creds) {
321 RunServer(creds, FLAGS_port, nullptr, nullptr);
322 }
323
RunServer(const std::shared_ptr<ServerCredentials> & creds,std::unique_ptr<std::vector<std::unique_ptr<ServerBuilderOption>>> server_options)324 void grpc::testing::interop::RunServer(
325 const std::shared_ptr<ServerCredentials>& creds,
326 std::unique_ptr<std::vector<std::unique_ptr<ServerBuilderOption>>>
327 server_options) {
328 RunServer(creds, FLAGS_port, nullptr, std::move(server_options));
329 }
330
RunServer(const std::shared_ptr<ServerCredentials> & creds,const int port,ServerStartedCondition * server_started_condition)331 void grpc::testing::interop::RunServer(
332 const std::shared_ptr<ServerCredentials>& creds, const int port,
333 ServerStartedCondition* server_started_condition) {
334 RunServer(creds, port, server_started_condition, nullptr);
335 }
336
RunServer(const std::shared_ptr<ServerCredentials> & creds,const int port,ServerStartedCondition * server_started_condition,std::unique_ptr<std::vector<std::unique_ptr<ServerBuilderOption>>> server_options)337 void grpc::testing::interop::RunServer(
338 const std::shared_ptr<ServerCredentials>& creds, const int port,
339 ServerStartedCondition* server_started_condition,
340 std::unique_ptr<std::vector<std::unique_ptr<ServerBuilderOption>>>
341 server_options) {
342 GPR_ASSERT(port != 0);
343 std::ostringstream server_address;
344 server_address << "0.0.0.0:" << port;
345 TestServiceImpl service;
346
347 SimpleRequest request;
348 SimpleResponse response;
349
350 ServerBuilder builder;
351 builder.RegisterService(&service);
352 builder.AddListeningPort(server_address.str(), creds);
353 if (server_options != nullptr) {
354 for (size_t i = 0; i < server_options->size(); i++) {
355 builder.SetOption(std::move((*server_options)[i]));
356 }
357 }
358 if (FLAGS_max_send_message_size >= 0) {
359 builder.SetMaxSendMessageSize(FLAGS_max_send_message_size);
360 }
361 std::unique_ptr<Server> server(builder.BuildAndStart());
362 gpr_log(GPR_INFO, "Server listening on %s", server_address.str().c_str());
363
364 // Signal that the server has started.
365 if (server_started_condition) {
366 std::unique_lock<std::mutex> lock(server_started_condition->mutex);
367 server_started_condition->server_started = true;
368 server_started_condition->condition.notify_all();
369 }
370
371 while (!gpr_atm_no_barrier_load(&g_got_sigint)) {
372 gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
373 gpr_time_from_seconds(5, GPR_TIMESPAN)));
374 }
375 }
376