1 //
2 //
3 // Copyright 2023 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include "test/cpp/interop/xds_interop_server_lib.h"
20
21 #include <grpc/grpc.h>
22 #include <grpc/support/time.h>
23 #include <grpcpp/ext/admin_services.h>
24 #include <grpcpp/ext/proto_server_reflection_plugin.h>
25 #include <grpcpp/server.h>
26 #include <grpcpp/server_builder.h>
27 #include <grpcpp/server_context.h>
28 #include <grpcpp/xds_server_builder.h>
29
30 #include <memory>
31
32 #include "absl/log/log.h"
33 #include "absl/strings/str_cat.h"
34 #include "absl/strings/str_split.h"
35 #include "src/proto/grpc/testing/empty.pb.h"
36 #include "src/proto/grpc/testing/messages.pb.h"
37 #include "src/proto/grpc/testing/test.grpc.pb.h"
38 #include "test/cpp/end2end/test_health_check_service_impl.h"
39 #include "test/cpp/interop/pre_stop_hook_server.h"
40
41 namespace grpc {
42 namespace testing {
43 namespace {
44
45 using grpc::Server;
46 using grpc::ServerBuilder;
47 using grpc::ServerContext;
48 using grpc::Status;
49 using grpc::XdsServerBuilder;
50 using grpc::testing::Empty;
51 using grpc::testing::HealthCheckServiceImpl;
52 using grpc::testing::SimpleRequest;
53 using grpc::testing::SimpleResponse;
54 using grpc::testing::TestService;
55 using grpc::testing::XdsUpdateHealthService;
56
57 constexpr absl::string_view kRpcBehaviorMetadataKey = "rpc-behavior";
58 constexpr absl::string_view kErrorCodeRpcBehavior = "error-code-";
59 constexpr absl::string_view kHostnameRpcBehaviorFilter = "hostname=";
60
GetRpcBehaviorMetadata(ServerContext * context)61 std::vector<std::string> GetRpcBehaviorMetadata(ServerContext* context) {
62 std::vector<std::string> rpc_behaviors;
63 auto rpc_behavior_metadata =
64 context->client_metadata().equal_range(grpc::string_ref(
65 kRpcBehaviorMetadataKey.data(), kRpcBehaviorMetadataKey.length()));
66 for (auto metadata = rpc_behavior_metadata.first;
67 metadata != rpc_behavior_metadata.second; ++metadata) {
68 auto value = metadata->second;
69 for (auto behavior :
70 absl::StrSplit(absl::string_view(value.data(), value.length()), ',')) {
71 rpc_behaviors.emplace_back(behavior);
72 }
73 }
74 return rpc_behaviors;
75 }
76
77 class TestServiceImpl : public TestService::Service {
78 public:
TestServiceImpl(absl::string_view hostname,absl::string_view server_id)79 explicit TestServiceImpl(absl::string_view hostname,
80 absl::string_view server_id)
81 : hostname_(hostname), server_id_(server_id) {}
82
UnaryCall(ServerContext * context,const SimpleRequest * request,SimpleResponse * response)83 Status UnaryCall(ServerContext* context, const SimpleRequest* request,
84 SimpleResponse* response) override {
85 response->set_server_id(server_id_);
86 for (const auto& rpc_behavior : GetRpcBehaviorMetadata(context)) {
87 auto maybe_status =
88 GetStatusForRpcBehaviorMetadata(rpc_behavior, hostname_);
89 if (maybe_status.has_value()) {
90 return *maybe_status;
91 }
92 }
93 if (request->response_size() > 0) {
94 std::string payload(request->response_size(), '0');
95 response->mutable_payload()->set_body(payload.c_str(),
96 request->response_size());
97 }
98 response->set_hostname(hostname_);
99 context->AddInitialMetadata("hostname", hostname_);
100 return Status::OK;
101 }
102
EmptyCall(ServerContext * context,const Empty *,Empty *)103 Status EmptyCall(ServerContext* context, const Empty* /*request*/,
104 Empty* /*response*/) override {
105 context->AddInitialMetadata("hostname", hostname_);
106 return Status::OK;
107 }
108
109 private:
110 std::string hostname_;
111 std::string server_id_;
112 };
113
114 class XdsUpdateHealthServiceImpl : public XdsUpdateHealthService::Service {
115 public:
XdsUpdateHealthServiceImpl(HealthCheckServiceImpl * health_check_service,std::unique_ptr<PreStopHookServerManager> pre_stop_hook_server)116 explicit XdsUpdateHealthServiceImpl(
117 HealthCheckServiceImpl* health_check_service,
118 std::unique_ptr<PreStopHookServerManager> pre_stop_hook_server)
119 : health_check_service_(health_check_service),
120 pre_stop_hook_server_(std::move(pre_stop_hook_server)) {}
121
SetServing(ServerContext *,const Empty *,Empty *)122 Status SetServing(ServerContext* /* context */, const Empty* /* request */,
123 Empty* /* response */) override {
124 health_check_service_->SetAll(
125 grpc::health::v1::HealthCheckResponse::SERVING);
126 return Status::OK;
127 }
128
SetNotServing(ServerContext *,const Empty *,Empty *)129 Status SetNotServing(ServerContext* /* context */, const Empty* /* request */,
130 Empty* /* response */) override {
131 health_check_service_->SetAll(
132 grpc::health::v1::HealthCheckResponse::NOT_SERVING);
133 return Status::OK;
134 }
135
SendHookRequest(ServerContext *,const HookRequest * request,HookResponse *)136 Status SendHookRequest(ServerContext* /* context */,
137 const HookRequest* request,
138 HookResponse* /* response */) override {
139 switch (request->command()) {
140 case HookRequest::START:
141 return pre_stop_hook_server_->Start(request->server_port(), 30 /* s */);
142 case HookRequest::STOP:
143 return pre_stop_hook_server_->Stop();
144 case HookRequest::RETURN:
145 pre_stop_hook_server_->Return(
146 static_cast<StatusCode>(request->grpc_code_to_return()),
147 request->grpc_status_description());
148 return Status::OK;
149 default:
150 return Status(
151 StatusCode::INVALID_ARGUMENT,
152 absl::StrFormat("Invalid command %d", request->command()));
153 }
154 }
155
156 private:
157 HealthCheckServiceImpl* const health_check_service_;
158 std::unique_ptr<PreStopHookServerManager> pre_stop_hook_server_;
159 };
160
161 class MaintenanceServices {
162 public:
MaintenanceServices()163 MaintenanceServices()
164 : update_health_service_(&health_check_service_,
165 std::make_unique<PreStopHookServerManager>()) {
166 health_check_service_.SetStatus(
167 "", grpc::health::v1::HealthCheckResponse::SERVING);
168 health_check_service_.SetStatus(
169 "grpc.testing.TestService",
170 grpc::health::v1::HealthCheckResponse::SERVING);
171 health_check_service_.SetStatus(
172 "grpc.testing.XdsUpdateHealthService",
173 grpc::health::v1::HealthCheckResponse::SERVING);
174 }
175
AddToServerBuilder(ServerBuilder * builder)176 void AddToServerBuilder(ServerBuilder* builder) {
177 builder->RegisterService(&health_check_service_);
178 builder->RegisterService(&update_health_service_);
179 builder->RegisterService(&hook_service_);
180 grpc::AddAdminServices(builder);
181 }
182
183 private:
184 HealthCheckServiceImpl health_check_service_;
185 XdsUpdateHealthServiceImpl update_health_service_;
186 HookServiceImpl hook_service_;
187 };
188 } // namespace
189
GetStatusForRpcBehaviorMetadata(absl::string_view header_value,absl::string_view hostname)190 absl::optional<grpc::Status> GetStatusForRpcBehaviorMetadata(
191 absl::string_view header_value, absl::string_view hostname) {
192 for (auto part : absl::StrSplit(header_value, ' ')) {
193 if (absl::ConsumePrefix(&part, kHostnameRpcBehaviorFilter)) {
194 LOG(INFO) << part;
195 if (part.empty()) {
196 return Status(
197 grpc::StatusCode::INVALID_ARGUMENT,
198 absl::StrCat("Empty host name in the RPC behavior header: ",
199 header_value));
200 }
201 if (part != hostname) {
202 VLOG(2) << "RPC behavior for a different host: \"" << std::string(part)
203 << "\", this one is: \"" << hostname << "\"";
204 return absl::nullopt;
205 }
206 } else if (absl::ConsumePrefix(&part, kErrorCodeRpcBehavior)) {
207 grpc::StatusCode code;
208 if (absl::SimpleAtoi(part, &code)) {
209 return Status(
210 code,
211 absl::StrCat("Rpc failed as per the rpc-behavior header value: ",
212 header_value));
213 } else {
214 return Status(grpc::StatusCode::INVALID_ARGUMENT,
215 absl::StrCat("Invalid format for rpc-behavior header: ",
216 header_value));
217 }
218 } else {
219 // TODO (eugeneo): Add support for other behaviors as needed
220 return Status(
221 grpc::StatusCode::INVALID_ARGUMENT,
222 absl::StrCat("Unsupported rpc behavior header: ", header_value));
223 }
224 }
225 return absl::nullopt;
226 }
227
RunServer(bool secure_mode,int port,const int maintenance_port,absl::string_view hostname,absl::string_view server_id,const std::function<void (Server *)> & server_callback)228 void RunServer(bool secure_mode, int port, const int maintenance_port,
229 absl::string_view hostname, absl::string_view server_id,
230 const std::function<void(Server*)>& server_callback) {
231 std::unique_ptr<Server> xds_enabled_server;
232 std::unique_ptr<Server> server;
233 TestServiceImpl service(hostname, server_id);
234
235 grpc::reflection::InitProtoReflectionServerBuilderPlugin();
236 MaintenanceServices maintenance_services;
237 if (secure_mode) {
238 XdsServerBuilder xds_builder;
239 xds_builder.RegisterService(&service);
240 xds_builder.AddListeningPort(
241 absl::StrCat("0.0.0.0:", port),
242 grpc::XdsServerCredentials(grpc::InsecureServerCredentials()));
243 xds_enabled_server = xds_builder.BuildAndStart();
244 LOG(INFO) << "Server starting on 0.0.0.0:" << port;
245 ServerBuilder builder;
246 maintenance_services.AddToServerBuilder(&builder);
247 server = builder
248 .AddListeningPort(absl::StrCat("0.0.0.0:", maintenance_port),
249 grpc::InsecureServerCredentials())
250 .BuildAndStart();
251 LOG(INFO) << "Maintenance server listening on 0.0.0.0:" << maintenance_port;
252 } else {
253 ServerBuilder builder;
254 maintenance_services.AddToServerBuilder(&builder);
255 server = builder
256 .AddListeningPort(absl::StrCat("0.0.0.0:", port),
257 grpc::InsecureServerCredentials())
258 .RegisterService(&service)
259 .BuildAndStart();
260 LOG(INFO) << "Server listening on 0.0.0.0:" << port;
261 }
262 server_callback(server.get());
263 server->Wait();
264 }
265
266 } // namespace testing
267 } // namespace grpc
268