• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2023 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include "test/cpp/interop/pre_stop_hook_server.h"
20 
21 #include <grpcpp/grpcpp.h>
22 
23 #include <thread>
24 
25 #include "absl/strings/str_format.h"
26 #include "src/core/util/sync.h"
27 #include "src/proto/grpc/testing/messages.pb.h"
28 
29 namespace grpc {
30 namespace testing {
31 namespace {
32 
// Lifecycle states tracked by PreStopHookServer.
enum class State : std::uint8_t {
  kNew,           // Initial value of PreStopHookServer::state_.
  kWaiting,       // Set by the server thread right before it calls Wait().
  kDone,          // Set by the server thread once Wait() has returned.
  kShuttingDown,  // Set by ~PreStopHookServer() before Shutdown() is called.
};
34 
BuildHookServer(HookServiceImpl * service,int port)35 std::unique_ptr<Server> BuildHookServer(HookServiceImpl* service, int port) {
36   ServerBuilder builder;
37   builder.AddListeningPort(absl::StrFormat("0.0.0.0:%d", port),
38                            grpc::InsecureServerCredentials());
39   builder.RegisterService(service);
40   return builder.BuildAndStart();
41 }
42 
43 }  // namespace
44 
45 class PreStopHookServer {
46  public:
PreStopHookServer(int port,const absl::Duration & startup_timeout)47   explicit PreStopHookServer(int port, const absl::Duration& startup_timeout)
48       : server_(BuildHookServer(&hook_service_, port)),
49         server_thread_(PreStopHookServer::ServerThread, this) {
50     WaitForState(State::kWaiting, startup_timeout);
51   }
52 
~PreStopHookServer()53   ~PreStopHookServer() {
54     hook_service_.Stop();
55     SetState(State::kShuttingDown);
56     server_->Shutdown();
57     WaitForState(State::kDone, absl::Seconds(5));
58     server_thread_.detach();
59   }
60 
GetState()61   State GetState() {
62     grpc_core::MutexLock lock(&mu_);
63     return state_;
64   }
65 
SetState(State state)66   void SetState(State state) {
67     grpc_core::MutexLock lock(&mu_);
68     state_ = state;
69     condition_.SignalAll();
70   }
71 
SetReturnStatus(const Status & status)72   void SetReturnStatus(const Status& status) {
73     hook_service_.AddReturnStatus(status);
74   }
75 
TestOnlyExpectRequests(size_t expected_requests_count,absl::Duration timeout)76   bool TestOnlyExpectRequests(size_t expected_requests_count,
77                               absl::Duration timeout) {
78     return hook_service_.TestOnlyExpectRequests(expected_requests_count,
79                                                 timeout);
80   }
81 
82  private:
WaitForState(State state,const absl::Duration & timeout)83   bool WaitForState(State state, const absl::Duration& timeout) {
84     grpc_core::MutexLock lock(&mu_);
85     auto deadline = absl::Now() + timeout;
86     while (state_ != state && !condition_.WaitWithDeadline(&mu_, deadline)) {
87     }
88     return state_ == state;
89   }
90 
ServerThread(PreStopHookServer * server)91   static void ServerThread(PreStopHookServer* server) {
92     server->SetState(State::kWaiting);
93     server->server_->Wait();
94     server->SetState(State::kDone);
95   }
96 
97   HookServiceImpl hook_service_;
98   grpc_core::Mutex mu_;
99   grpc_core::CondVar condition_ ABSL_GUARDED_BY(mu_);
100   State state_ ABSL_GUARDED_BY(mu_) = State::kNew;
101   std::unique_ptr<Server> server_;
102   std::thread server_thread_;
103 };
104 
Start(int port,size_t timeout_s)105 Status PreStopHookServerManager::Start(int port, size_t timeout_s) {
106   if (server_) {
107     return Status(StatusCode::ALREADY_EXISTS,
108                   "Pre hook server is already running");
109   }
110   server_ = std::unique_ptr<PreStopHookServer, PreStopHookServerDeleter>(
111       new PreStopHookServer(port, absl::Seconds(timeout_s)),
112       PreStopHookServerDeleter());
113   return server_->GetState() == State::kWaiting
114              ? Status::OK
115              : Status(StatusCode::DEADLINE_EXCEEDED, "Server have not started");
116 }
117 
Stop()118 Status PreStopHookServerManager::Stop() {
119   if (!server_) {
120     return Status(StatusCode::UNAVAILABLE, "Pre hook server is not running");
121   }
122   server_.reset();
123   return Status::OK;
124 }
125 
Return(StatusCode code,absl::string_view description)126 void PreStopHookServerManager::Return(StatusCode code,
127                                       absl::string_view description) {
128   server_->SetReturnStatus(Status(code, std::string(description)));
129 }
130 
TestOnlyExpectRequests(size_t expected_requests_count,const absl::Duration & timeout)131 bool PreStopHookServerManager::TestOnlyExpectRequests(
132     size_t expected_requests_count, const absl::Duration& timeout) {
133   return server_->TestOnlyExpectRequests(expected_requests_count, timeout);
134 }
135 
operator ()(PreStopHookServer * server)136 void PreStopHookServerManager::PreStopHookServerDeleter::operator()(
137     PreStopHookServer* server) {
138   delete server;
139 }
140 
141 //
142 // HookServiceImpl
143 //
144 
Hook(CallbackServerContext * context,const Empty *,Empty *)145 ServerUnaryReactor* HookServiceImpl::Hook(CallbackServerContext* context,
146                                           const Empty* /* request */,
147                                           Empty* /* reply */) {
148   auto reactor = context->DefaultReactor();
149   grpc_core::MutexLock lock(&mu_);
150   pending_requests_.emplace_back(reactor);
151   MatchRequestsAndStatuses();
152   return reactor;
153 }
154 
SetReturnStatus(CallbackServerContext * context,const SetReturnStatusRequest * request,Empty *)155 ServerUnaryReactor* HookServiceImpl::SetReturnStatus(
156     CallbackServerContext* context, const SetReturnStatusRequest* request,
157     Empty* /* reply */) {
158   auto reactor = context->DefaultReactor();
159   reactor->Finish(Status::OK);
160   grpc_core::MutexLock lock(&mu_);
161   respond_all_status_.emplace(
162       static_cast<StatusCode>(request->grpc_code_to_return()),
163       request->grpc_status_description());
164   MatchRequestsAndStatuses();
165   return reactor;
166 }
167 
ClearReturnStatus(CallbackServerContext * context,const Empty *,Empty *)168 ServerUnaryReactor* HookServiceImpl::ClearReturnStatus(
169     CallbackServerContext* context, const Empty* /* request */,
170     Empty* /* reply */) {
171   auto reactor = context->DefaultReactor();
172   reactor->Finish(Status::OK);
173   grpc_core::MutexLock lock(&mu_);
174   respond_all_status_.reset();
175   MatchRequestsAndStatuses();
176   return reactor;
177 }
178 
AddReturnStatus(const Status & status)179 void HookServiceImpl::AddReturnStatus(const Status& status) {
180   grpc_core::MutexLock lock(&mu_);
181   pending_statuses_.push_back(status);
182   MatchRequestsAndStatuses();
183 }
184 
TestOnlyExpectRequests(size_t expected_requests_count,const absl::Duration & timeout)185 bool HookServiceImpl::TestOnlyExpectRequests(size_t expected_requests_count,
186                                              const absl::Duration& timeout) {
187   grpc_core::MutexLock lock(&mu_);
188   auto deadline = absl::Now() + timeout;
189   while (pending_requests_.size() < expected_requests_count &&
190          !request_var_.WaitWithDeadline(&mu_, deadline)) {
191   }
192   return pending_requests_.size() >= expected_requests_count;
193 }
194 
Stop()195 void HookServiceImpl::Stop() {
196   grpc_core::MutexLock lock(&mu_);
197   if (!respond_all_status_.has_value()) {
198     respond_all_status_.emplace(StatusCode::ABORTED, "Shutting down");
199   }
200   MatchRequestsAndStatuses();
201 }
202 
MatchRequestsAndStatuses()203 void HookServiceImpl::MatchRequestsAndStatuses() {
204   while (!pending_requests_.empty() && !pending_statuses_.empty()) {
205     pending_requests_.front()->Finish(std::move(pending_statuses_.front()));
206     pending_requests_.erase(pending_requests_.begin());
207     pending_statuses_.erase(pending_statuses_.begin());
208   }
209   if (respond_all_status_.has_value()) {
210     for (const auto& request : pending_requests_) {
211       request->Finish(*respond_all_status_);
212     }
213     pending_requests_.clear();
214   }
215   request_var_.SignalAll();
216 }
217 
218 }  // namespace testing
219 }  // namespace grpc
220