• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2023 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include "test/cpp/interop/xds_interop_server_lib.h"
20 
21 #include <grpc/grpc.h>
22 #include <grpc/support/time.h>
23 #include <grpcpp/ext/admin_services.h>
24 #include <grpcpp/ext/proto_server_reflection_plugin.h>
25 #include <grpcpp/server.h>
26 #include <grpcpp/server_builder.h>
27 #include <grpcpp/server_context.h>
28 #include <grpcpp/xds_server_builder.h>
29 
30 #include <memory>
31 
32 #include "absl/log/log.h"
33 #include "absl/strings/str_cat.h"
34 #include "absl/strings/str_split.h"
35 #include "src/proto/grpc/testing/empty.pb.h"
36 #include "src/proto/grpc/testing/messages.pb.h"
37 #include "src/proto/grpc/testing/test.grpc.pb.h"
38 #include "test/cpp/end2end/test_health_check_service_impl.h"
39 #include "test/cpp/interop/pre_stop_hook_server.h"
40 
41 namespace grpc {
42 namespace testing {
43 namespace {
44 
45 using grpc::Server;
46 using grpc::ServerBuilder;
47 using grpc::ServerContext;
48 using grpc::Status;
49 using grpc::XdsServerBuilder;
50 using grpc::testing::Empty;
51 using grpc::testing::HealthCheckServiceImpl;
52 using grpc::testing::SimpleRequest;
53 using grpc::testing::SimpleResponse;
54 using grpc::testing::TestService;
55 using grpc::testing::XdsUpdateHealthService;
56 
57 constexpr absl::string_view kRpcBehaviorMetadataKey = "rpc-behavior";
58 constexpr absl::string_view kErrorCodeRpcBehavior = "error-code-";
59 constexpr absl::string_view kHostnameRpcBehaviorFilter = "hostname=";
60 
GetRpcBehaviorMetadata(ServerContext * context)61 std::vector<std::string> GetRpcBehaviorMetadata(ServerContext* context) {
62   std::vector<std::string> rpc_behaviors;
63   auto rpc_behavior_metadata =
64       context->client_metadata().equal_range(grpc::string_ref(
65           kRpcBehaviorMetadataKey.data(), kRpcBehaviorMetadataKey.length()));
66   for (auto metadata = rpc_behavior_metadata.first;
67        metadata != rpc_behavior_metadata.second; ++metadata) {
68     auto value = metadata->second;
69     for (auto behavior :
70          absl::StrSplit(absl::string_view(value.data(), value.length()), ',')) {
71       rpc_behaviors.emplace_back(behavior);
72     }
73   }
74   return rpc_behaviors;
75 }
76 
77 class TestServiceImpl : public TestService::Service {
78  public:
TestServiceImpl(absl::string_view hostname,absl::string_view server_id)79   explicit TestServiceImpl(absl::string_view hostname,
80                            absl::string_view server_id)
81       : hostname_(hostname), server_id_(server_id) {}
82 
UnaryCall(ServerContext * context,const SimpleRequest * request,SimpleResponse * response)83   Status UnaryCall(ServerContext* context, const SimpleRequest* request,
84                    SimpleResponse* response) override {
85     response->set_server_id(server_id_);
86     for (const auto& rpc_behavior : GetRpcBehaviorMetadata(context)) {
87       auto maybe_status =
88           GetStatusForRpcBehaviorMetadata(rpc_behavior, hostname_);
89       if (maybe_status.has_value()) {
90         return *maybe_status;
91       }
92     }
93     if (request->response_size() > 0) {
94       std::string payload(request->response_size(), '0');
95       response->mutable_payload()->set_body(payload.c_str(),
96                                             request->response_size());
97     }
98     response->set_hostname(hostname_);
99     context->AddInitialMetadata("hostname", hostname_);
100     return Status::OK;
101   }
102 
EmptyCall(ServerContext * context,const Empty *,Empty *)103   Status EmptyCall(ServerContext* context, const Empty* /*request*/,
104                    Empty* /*response*/) override {
105     context->AddInitialMetadata("hostname", hostname_);
106     return Status::OK;
107   }
108 
109  private:
110   std::string hostname_;
111   std::string server_id_;
112 };
113 
114 class XdsUpdateHealthServiceImpl : public XdsUpdateHealthService::Service {
115  public:
XdsUpdateHealthServiceImpl(HealthCheckServiceImpl * health_check_service,std::unique_ptr<PreStopHookServerManager> pre_stop_hook_server)116   explicit XdsUpdateHealthServiceImpl(
117       HealthCheckServiceImpl* health_check_service,
118       std::unique_ptr<PreStopHookServerManager> pre_stop_hook_server)
119       : health_check_service_(health_check_service),
120         pre_stop_hook_server_(std::move(pre_stop_hook_server)) {}
121 
SetServing(ServerContext *,const Empty *,Empty *)122   Status SetServing(ServerContext* /* context */, const Empty* /* request */,
123                     Empty* /* response */) override {
124     health_check_service_->SetAll(
125         grpc::health::v1::HealthCheckResponse::SERVING);
126     return Status::OK;
127   }
128 
SetNotServing(ServerContext *,const Empty *,Empty *)129   Status SetNotServing(ServerContext* /* context */, const Empty* /* request */,
130                        Empty* /* response */) override {
131     health_check_service_->SetAll(
132         grpc::health::v1::HealthCheckResponse::NOT_SERVING);
133     return Status::OK;
134   }
135 
SendHookRequest(ServerContext *,const HookRequest * request,HookResponse *)136   Status SendHookRequest(ServerContext* /* context */,
137                          const HookRequest* request,
138                          HookResponse* /* response */) override {
139     switch (request->command()) {
140       case HookRequest::START:
141         return pre_stop_hook_server_->Start(request->server_port(), 30 /* s */);
142       case HookRequest::STOP:
143         return pre_stop_hook_server_->Stop();
144       case HookRequest::RETURN:
145         pre_stop_hook_server_->Return(
146             static_cast<StatusCode>(request->grpc_code_to_return()),
147             request->grpc_status_description());
148         return Status::OK;
149       default:
150         return Status(
151             StatusCode::INVALID_ARGUMENT,
152             absl::StrFormat("Invalid command %d", request->command()));
153     }
154   }
155 
156  private:
157   HealthCheckServiceImpl* const health_check_service_;
158   std::unique_ptr<PreStopHookServerManager> pre_stop_hook_server_;
159 };
160 
161 class MaintenanceServices {
162  public:
MaintenanceServices()163   MaintenanceServices()
164       : update_health_service_(&health_check_service_,
165                                std::make_unique<PreStopHookServerManager>()) {
166     health_check_service_.SetStatus(
167         "", grpc::health::v1::HealthCheckResponse::SERVING);
168     health_check_service_.SetStatus(
169         "grpc.testing.TestService",
170         grpc::health::v1::HealthCheckResponse::SERVING);
171     health_check_service_.SetStatus(
172         "grpc.testing.XdsUpdateHealthService",
173         grpc::health::v1::HealthCheckResponse::SERVING);
174   }
175 
AddToServerBuilder(ServerBuilder * builder)176   void AddToServerBuilder(ServerBuilder* builder) {
177     builder->RegisterService(&health_check_service_);
178     builder->RegisterService(&update_health_service_);
179     builder->RegisterService(&hook_service_);
180     grpc::AddAdminServices(builder);
181   }
182 
183  private:
184   HealthCheckServiceImpl health_check_service_;
185   XdsUpdateHealthServiceImpl update_health_service_;
186   HookServiceImpl hook_service_;
187 };
188 }  // namespace
189 
GetStatusForRpcBehaviorMetadata(absl::string_view header_value,absl::string_view hostname)190 absl::optional<grpc::Status> GetStatusForRpcBehaviorMetadata(
191     absl::string_view header_value, absl::string_view hostname) {
192   for (auto part : absl::StrSplit(header_value, ' ')) {
193     if (absl::ConsumePrefix(&part, kHostnameRpcBehaviorFilter)) {
194       LOG(INFO) << part;
195       if (part.empty()) {
196         return Status(
197             grpc::StatusCode::INVALID_ARGUMENT,
198             absl::StrCat("Empty host name in the RPC behavior header: ",
199                          header_value));
200       }
201       if (part != hostname) {
202         VLOG(2) << "RPC behavior for a different host: \"" << std::string(part)
203                 << "\", this one is: \"" << hostname << "\"";
204         return absl::nullopt;
205       }
206     } else if (absl::ConsumePrefix(&part, kErrorCodeRpcBehavior)) {
207       grpc::StatusCode code;
208       if (absl::SimpleAtoi(part, &code)) {
209         return Status(
210             code,
211             absl::StrCat("Rpc failed as per the rpc-behavior header value: ",
212                          header_value));
213       } else {
214         return Status(grpc::StatusCode::INVALID_ARGUMENT,
215                       absl::StrCat("Invalid format for rpc-behavior header: ",
216                                    header_value));
217       }
218     } else {
219       // TODO (eugeneo): Add support for other behaviors as needed
220       return Status(
221           grpc::StatusCode::INVALID_ARGUMENT,
222           absl::StrCat("Unsupported rpc behavior header: ", header_value));
223     }
224   }
225   return absl::nullopt;
226 }
227 
RunServer(bool secure_mode,int port,const int maintenance_port,absl::string_view hostname,absl::string_view server_id,const std::function<void (Server *)> & server_callback)228 void RunServer(bool secure_mode, int port, const int maintenance_port,
229                absl::string_view hostname, absl::string_view server_id,
230                const std::function<void(Server*)>& server_callback) {
231   std::unique_ptr<Server> xds_enabled_server;
232   std::unique_ptr<Server> server;
233   TestServiceImpl service(hostname, server_id);
234 
235   grpc::reflection::InitProtoReflectionServerBuilderPlugin();
236   MaintenanceServices maintenance_services;
237   if (secure_mode) {
238     XdsServerBuilder xds_builder;
239     xds_builder.RegisterService(&service);
240     xds_builder.AddListeningPort(
241         absl::StrCat("0.0.0.0:", port),
242         grpc::XdsServerCredentials(grpc::InsecureServerCredentials()));
243     xds_enabled_server = xds_builder.BuildAndStart();
244     LOG(INFO) << "Server starting on 0.0.0.0:" << port;
245     ServerBuilder builder;
246     maintenance_services.AddToServerBuilder(&builder);
247     server = builder
248                  .AddListeningPort(absl::StrCat("0.0.0.0:", maintenance_port),
249                                    grpc::InsecureServerCredentials())
250                  .BuildAndStart();
251     LOG(INFO) << "Maintenance server listening on 0.0.0.0:" << maintenance_port;
252   } else {
253     ServerBuilder builder;
254     maintenance_services.AddToServerBuilder(&builder);
255     server = builder
256                  .AddListeningPort(absl::StrCat("0.0.0.0:", port),
257                                    grpc::InsecureServerCredentials())
258                  .RegisterService(&service)
259                  .BuildAndStart();
260     LOG(INFO) << "Server listening on 0.0.0.0:" << port;
261   }
262   server_callback(server.get());
263   server->Wait();
264 }
265 
266 }  // namespace testing
267 }  // namespace grpc
268