1 /*
2 *
3 * Copyright 2015-2016 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <fstream>
20 #include <memory>
21 #include <sstream>
22 #include <thread>
23
24 #include <gflags/gflags.h>
25 #include <grpc/grpc.h>
26 #include <grpc/support/log.h>
27 #include <grpc/support/time.h>
28 #include <grpcpp/security/server_credentials.h>
29 #include <grpcpp/server.h>
30 #include <grpcpp/server_builder.h>
31 #include <grpcpp/server_context.h>
32
33 #include "src/core/lib/gpr/string.h"
34 #include "src/proto/grpc/testing/empty.pb.h"
35 #include "src/proto/grpc/testing/messages.pb.h"
36 #include "src/proto/grpc/testing/test.grpc.pb.h"
37 #include "test/cpp/interop/server_helper.h"
38 #include "test/cpp/util/test_config.h"
39
40 DEFINE_bool(use_alts, false,
41 "Whether to use alts. Enable alts will disable tls.");
42 DEFINE_bool(use_tls, false, "Whether to use tls.");
43 DEFINE_string(custom_credentials_type, "", "User provided credentials type.");
44 DEFINE_int32(port, 0, "Server port.");
45 DEFINE_int32(max_send_message_size, -1, "The maximum send message size.");
46
47 using grpc::Server;
48 using grpc::ServerBuilder;
49 using grpc::ServerContext;
50 using grpc::ServerCredentials;
51 using grpc::ServerReader;
52 using grpc::ServerReaderWriter;
53 using grpc::ServerWriter;
54 using grpc::SslServerCredentialsOptions;
55 using grpc::Status;
56 using grpc::WriteOptions;
57 using grpc::testing::InteropServerContextInspector;
58 using grpc::testing::Payload;
59 using grpc::testing::SimpleRequest;
60 using grpc::testing::SimpleResponse;
61 using grpc::testing::StreamingInputCallRequest;
62 using grpc::testing::StreamingInputCallResponse;
63 using grpc::testing::StreamingOutputCallRequest;
64 using grpc::testing::StreamingOutputCallResponse;
65 using grpc::testing::TestService;
66
67 const char kEchoInitialMetadataKey[] = "x-grpc-test-echo-initial";
68 const char kEchoTrailingBinMetadataKey[] = "x-grpc-test-echo-trailing-bin";
69 const char kEchoUserAgentKey[] = "x-grpc-test-echo-useragent";
70
MaybeEchoMetadata(ServerContext * context)71 void MaybeEchoMetadata(ServerContext* context) {
72 const auto& client_metadata = context->client_metadata();
73 GPR_ASSERT(client_metadata.count(kEchoInitialMetadataKey) <= 1);
74 GPR_ASSERT(client_metadata.count(kEchoTrailingBinMetadataKey) <= 1);
75
76 auto iter = client_metadata.find(kEchoInitialMetadataKey);
77 if (iter != client_metadata.end()) {
78 context->AddInitialMetadata(
79 kEchoInitialMetadataKey,
80 std::string(iter->second.begin(), iter->second.end()));
81 }
82 iter = client_metadata.find(kEchoTrailingBinMetadataKey);
83 if (iter != client_metadata.end()) {
84 context->AddTrailingMetadata(
85 kEchoTrailingBinMetadataKey,
86 std::string(iter->second.begin(), iter->second.end()));
87 }
88 // Check if client sent a magic key in the header that makes us echo
89 // back the user-agent (for testing purpose)
90 iter = client_metadata.find(kEchoUserAgentKey);
91 if (iter != client_metadata.end()) {
92 iter = client_metadata.find("user-agent");
93 if (iter != client_metadata.end()) {
94 context->AddInitialMetadata(
95 kEchoUserAgentKey,
96 std::string(iter->second.begin(), iter->second.end()));
97 }
98 }
99 }
100
SetPayload(int size,Payload * payload)101 bool SetPayload(int size, Payload* payload) {
102 std::unique_ptr<char[]> body(new char[size]());
103 payload->set_body(body.get(), size);
104 return true;
105 }
106
CheckExpectedCompression(const ServerContext & context,const bool compression_expected)107 bool CheckExpectedCompression(const ServerContext& context,
108 const bool compression_expected) {
109 const InteropServerContextInspector inspector(context);
110 const grpc_compression_algorithm received_compression =
111 inspector.GetCallCompressionAlgorithm();
112
113 if (compression_expected) {
114 if (received_compression == GRPC_COMPRESS_NONE) {
115 // Expected some compression, got NONE. This is an error.
116 gpr_log(GPR_ERROR,
117 "Expected compression but got uncompressed request from client.");
118 return false;
119 }
120 if (!(inspector.WasCompressed())) {
121 gpr_log(GPR_ERROR,
122 "Failure: Requested compression in a compressable request, but "
123 "compression bit in message flags not set.");
124 return false;
125 }
126 } else {
127 // Didn't expect compression -> make sure the request is uncompressed
128 if (inspector.WasCompressed()) {
129 gpr_log(GPR_ERROR,
130 "Failure: Didn't requested compression, but compression bit in "
131 "message flags set.");
132 return false;
133 }
134 }
135 return true;
136 }
137
138 class TestServiceImpl : public TestService::Service {
139 public:
EmptyCall(ServerContext * context,const grpc::testing::Empty *,grpc::testing::Empty *)140 Status EmptyCall(ServerContext* context,
141 const grpc::testing::Empty* /*request*/,
142 grpc::testing::Empty* /*response*/) {
143 MaybeEchoMetadata(context);
144 return Status::OK;
145 }
146
147 // Response contains current timestamp. We ignore everything in the request.
CacheableUnaryCall(ServerContext * context,const SimpleRequest *,SimpleResponse * response)148 Status CacheableUnaryCall(ServerContext* context,
149 const SimpleRequest* /*request*/,
150 SimpleResponse* response) {
151 gpr_timespec ts = gpr_now(GPR_CLOCK_PRECISE);
152 std::string timestamp = std::to_string((long long unsigned)ts.tv_nsec);
153 response->mutable_payload()->set_body(timestamp.c_str(), timestamp.size());
154 context->AddInitialMetadata("cache-control", "max-age=60, public");
155 return Status::OK;
156 }
157
UnaryCall(ServerContext * context,const SimpleRequest * request,SimpleResponse * response)158 Status UnaryCall(ServerContext* context, const SimpleRequest* request,
159 SimpleResponse* response) {
160 MaybeEchoMetadata(context);
161 if (request->has_response_compressed()) {
162 const bool compression_requested = request->response_compressed().value();
163 gpr_log(GPR_DEBUG, "Request for compression (%s) present for %s",
164 compression_requested ? "enabled" : "disabled", __func__);
165 if (compression_requested) {
166 // Any level would do, let's go for HIGH because we are overachievers.
167 context->set_compression_level(GRPC_COMPRESS_LEVEL_HIGH);
168 } else {
169 context->set_compression_level(GRPC_COMPRESS_LEVEL_NONE);
170 }
171 }
172 if (!CheckExpectedCompression(*context,
173 request->expect_compressed().value())) {
174 return Status(grpc::StatusCode::INVALID_ARGUMENT,
175 "Compressed request expectation not met.");
176 }
177 if (request->response_size() > 0) {
178 if (!SetPayload(request->response_size(), response->mutable_payload())) {
179 return Status(grpc::StatusCode::INVALID_ARGUMENT,
180 "Error creating payload.");
181 }
182 }
183
184 if (request->has_response_status()) {
185 return Status(
186 static_cast<grpc::StatusCode>(request->response_status().code()),
187 request->response_status().message());
188 }
189
190 return Status::OK;
191 }
192
StreamingOutputCall(ServerContext * context,const StreamingOutputCallRequest * request,ServerWriter<StreamingOutputCallResponse> * writer)193 Status StreamingOutputCall(
194 ServerContext* context, const StreamingOutputCallRequest* request,
195 ServerWriter<StreamingOutputCallResponse>* writer) {
196 StreamingOutputCallResponse response;
197 bool write_success = true;
198 for (int i = 0; write_success && i < request->response_parameters_size();
199 i++) {
200 if (!SetPayload(request->response_parameters(i).size(),
201 response.mutable_payload())) {
202 return Status(grpc::StatusCode::INVALID_ARGUMENT,
203 "Error creating payload.");
204 }
205 WriteOptions wopts;
206 if (request->response_parameters(i).has_compressed()) {
207 // Compress by default. Disabled on a per-message basis.
208 context->set_compression_level(GRPC_COMPRESS_LEVEL_HIGH);
209 const bool compression_requested =
210 request->response_parameters(i).compressed().value();
211 gpr_log(GPR_DEBUG, "Request for compression (%s) present for %s",
212 compression_requested ? "enabled" : "disabled", __func__);
213 if (!compression_requested) {
214 wopts.set_no_compression();
215 } // else, compression is already enabled via the context.
216 }
217 int time_us;
218 if ((time_us = request->response_parameters(i).interval_us()) > 0) {
219 // Sleep before response if needed
220 gpr_timespec sleep_time =
221 gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
222 gpr_time_from_micros(time_us, GPR_TIMESPAN));
223 gpr_sleep_until(sleep_time);
224 }
225 write_success = writer->Write(response, wopts);
226 }
227 if (write_success) {
228 return Status::OK;
229 } else {
230 return Status(grpc::StatusCode::INTERNAL, "Error writing response.");
231 }
232 }
233
StreamingInputCall(ServerContext * context,ServerReader<StreamingInputCallRequest> * reader,StreamingInputCallResponse * response)234 Status StreamingInputCall(ServerContext* context,
235 ServerReader<StreamingInputCallRequest>* reader,
236 StreamingInputCallResponse* response) {
237 StreamingInputCallRequest request;
238 int aggregated_payload_size = 0;
239 while (reader->Read(&request)) {
240 if (!CheckExpectedCompression(*context,
241 request.expect_compressed().value())) {
242 return Status(grpc::StatusCode::INVALID_ARGUMENT,
243 "Compressed request expectation not met.");
244 }
245 if (request.has_payload()) {
246 aggregated_payload_size += request.payload().body().size();
247 }
248 }
249 response->set_aggregated_payload_size(aggregated_payload_size);
250 return Status::OK;
251 }
252
FullDuplexCall(ServerContext * context,ServerReaderWriter<StreamingOutputCallResponse,StreamingOutputCallRequest> * stream)253 Status FullDuplexCall(
254 ServerContext* context,
255 ServerReaderWriter<StreamingOutputCallResponse,
256 StreamingOutputCallRequest>* stream) {
257 MaybeEchoMetadata(context);
258 StreamingOutputCallRequest request;
259 StreamingOutputCallResponse response;
260 bool write_success = true;
261 while (write_success && stream->Read(&request)) {
262 if (request.has_response_status()) {
263 return Status(
264 static_cast<grpc::StatusCode>(request.response_status().code()),
265 request.response_status().message());
266 }
267 if (request.response_parameters_size() != 0) {
268 response.mutable_payload()->set_type(request.payload().type());
269 response.mutable_payload()->set_body(
270 std::string(request.response_parameters(0).size(), '\0'));
271 int time_us;
272 if ((time_us = request.response_parameters(0).interval_us()) > 0) {
273 // Sleep before response if needed
274 gpr_timespec sleep_time =
275 gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
276 gpr_time_from_micros(time_us, GPR_TIMESPAN));
277 gpr_sleep_until(sleep_time);
278 }
279 write_success = stream->Write(response);
280 }
281 }
282 if (write_success) {
283 return Status::OK;
284 } else {
285 return Status(grpc::StatusCode::INTERNAL, "Error writing response.");
286 }
287 }
288
HalfDuplexCall(ServerContext *,ServerReaderWriter<StreamingOutputCallResponse,StreamingOutputCallRequest> * stream)289 Status HalfDuplexCall(
290 ServerContext* /*context*/,
291 ServerReaderWriter<StreamingOutputCallResponse,
292 StreamingOutputCallRequest>* stream) {
293 std::vector<StreamingOutputCallRequest> requests;
294 StreamingOutputCallRequest request;
295 while (stream->Read(&request)) {
296 requests.push_back(request);
297 }
298
299 StreamingOutputCallResponse response;
300 bool write_success = true;
301 for (unsigned int i = 0; write_success && i < requests.size(); i++) {
302 response.mutable_payload()->set_type(requests[i].payload().type());
303 if (requests[i].response_parameters_size() == 0) {
304 return Status(grpc::StatusCode::INTERNAL,
305 "Request does not have response parameters.");
306 }
307 response.mutable_payload()->set_body(
308 std::string(requests[i].response_parameters(0).size(), '\0'));
309 write_success = stream->Write(response);
310 }
311 if (write_success) {
312 return Status::OK;
313 } else {
314 return Status(grpc::StatusCode::INTERNAL, "Error writing response.");
315 }
316 }
317 };
318
RunServer(const std::shared_ptr<ServerCredentials> & creds)319 void grpc::testing::interop::RunServer(
320 const std::shared_ptr<ServerCredentials>& creds) {
321 RunServer(creds, FLAGS_port, nullptr, nullptr);
322 }
323
RunServer(const std::shared_ptr<ServerCredentials> & creds,std::unique_ptr<std::vector<std::unique_ptr<ServerBuilderOption>>> server_options)324 void grpc::testing::interop::RunServer(
325 const std::shared_ptr<ServerCredentials>& creds,
326 std::unique_ptr<std::vector<std::unique_ptr<ServerBuilderOption>>>
327 server_options) {
328 RunServer(creds, FLAGS_port, nullptr, std::move(server_options));
329 }
330
RunServer(const std::shared_ptr<ServerCredentials> & creds,const int port,ServerStartedCondition * server_started_condition)331 void grpc::testing::interop::RunServer(
332 const std::shared_ptr<ServerCredentials>& creds, const int port,
333 ServerStartedCondition* server_started_condition) {
334 RunServer(creds, port, server_started_condition, nullptr);
335 }
336
RunServer(const std::shared_ptr<ServerCredentials> & creds,const int port,ServerStartedCondition * server_started_condition,std::unique_ptr<std::vector<std::unique_ptr<ServerBuilderOption>>> server_options)337 void grpc::testing::interop::RunServer(
338 const std::shared_ptr<ServerCredentials>& creds, const int port,
339 ServerStartedCondition* server_started_condition,
340 std::unique_ptr<std::vector<std::unique_ptr<ServerBuilderOption>>>
341 server_options) {
342 GPR_ASSERT(port != 0);
343 std::ostringstream server_address;
344 server_address << "0.0.0.0:" << port;
345 TestServiceImpl service;
346
347 SimpleRequest request;
348 SimpleResponse response;
349
350 ServerBuilder builder;
351 builder.RegisterService(&service);
352 builder.AddListeningPort(server_address.str(), creds);
353 if (server_options != nullptr) {
354 for (size_t i = 0; i < server_options->size(); i++) {
355 builder.SetOption(std::move((*server_options)[i]));
356 }
357 }
358 if (FLAGS_max_send_message_size >= 0) {
359 builder.SetMaxSendMessageSize(FLAGS_max_send_message_size);
360 }
361 std::unique_ptr<Server> server(builder.BuildAndStart());
362 gpr_log(GPR_INFO, "Server listening on %s", server_address.str().c_str());
363
364 // Signal that the server has started.
365 if (server_started_condition) {
366 std::unique_lock<std::mutex> lock(server_started_condition->mutex);
367 server_started_condition->server_started = true;
368 server_started_condition->condition.notify_all();
369 }
370
371 while (!gpr_atm_no_barrier_load(&g_got_sigint)) {
372 gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
373 gpr_time_from_seconds(5, GPR_TIMESPAN)));
374 }
375 }
376