1 //
2 //
3 // Copyright 2023 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include "test/cpp/interop/pre_stop_hook_server.h"
20
21 #include <grpcpp/grpcpp.h>
22
23 #include <thread>
24
25 #include "absl/strings/str_format.h"
26 #include "src/core/util/sync.h"
27 #include "src/proto/grpc/testing/messages.pb.h"
28
29 namespace grpc {
30 namespace testing {
31 namespace {
32
// Lifecycle marker shared between PreStopHookServer and its server thread.
enum class State : std::uint8_t {
  kNew,           // Initial value of PreStopHookServer::state_.
  kWaiting,       // Set by the server thread right before Server::Wait().
  kDone,          // Set by the server thread once Server::Wait() returns.
  kShuttingDown,  // Set by ~PreStopHookServer() before Server::Shutdown().
};
34
BuildHookServer(HookServiceImpl * service,int port)35 std::unique_ptr<Server> BuildHookServer(HookServiceImpl* service, int port) {
36 ServerBuilder builder;
37 builder.AddListeningPort(absl::StrFormat("0.0.0.0:%d", port),
38 grpc::InsecureServerCredentials());
39 builder.RegisterService(service);
40 return builder.BuildAndStart();
41 }
42
43 } // namespace
44
45 class PreStopHookServer {
46 public:
PreStopHookServer(int port,const absl::Duration & startup_timeout)47 explicit PreStopHookServer(int port, const absl::Duration& startup_timeout)
48 : server_(BuildHookServer(&hook_service_, port)),
49 server_thread_(PreStopHookServer::ServerThread, this) {
50 WaitForState(State::kWaiting, startup_timeout);
51 }
52
~PreStopHookServer()53 ~PreStopHookServer() {
54 hook_service_.Stop();
55 SetState(State::kShuttingDown);
56 server_->Shutdown();
57 WaitForState(State::kDone, absl::Seconds(5));
58 server_thread_.detach();
59 }
60
GetState()61 State GetState() {
62 grpc_core::MutexLock lock(&mu_);
63 return state_;
64 }
65
SetState(State state)66 void SetState(State state) {
67 grpc_core::MutexLock lock(&mu_);
68 state_ = state;
69 condition_.SignalAll();
70 }
71
SetReturnStatus(const Status & status)72 void SetReturnStatus(const Status& status) {
73 hook_service_.AddReturnStatus(status);
74 }
75
TestOnlyExpectRequests(size_t expected_requests_count,absl::Duration timeout)76 bool TestOnlyExpectRequests(size_t expected_requests_count,
77 absl::Duration timeout) {
78 return hook_service_.TestOnlyExpectRequests(expected_requests_count,
79 timeout);
80 }
81
82 private:
WaitForState(State state,const absl::Duration & timeout)83 bool WaitForState(State state, const absl::Duration& timeout) {
84 grpc_core::MutexLock lock(&mu_);
85 auto deadline = absl::Now() + timeout;
86 while (state_ != state && !condition_.WaitWithDeadline(&mu_, deadline)) {
87 }
88 return state_ == state;
89 }
90
ServerThread(PreStopHookServer * server)91 static void ServerThread(PreStopHookServer* server) {
92 server->SetState(State::kWaiting);
93 server->server_->Wait();
94 server->SetState(State::kDone);
95 }
96
97 HookServiceImpl hook_service_;
98 grpc_core::Mutex mu_;
99 grpc_core::CondVar condition_ ABSL_GUARDED_BY(mu_);
100 State state_ ABSL_GUARDED_BY(mu_) = State::kNew;
101 std::unique_ptr<Server> server_;
102 std::thread server_thread_;
103 };
104
Start(int port,size_t timeout_s)105 Status PreStopHookServerManager::Start(int port, size_t timeout_s) {
106 if (server_) {
107 return Status(StatusCode::ALREADY_EXISTS,
108 "Pre hook server is already running");
109 }
110 server_ = std::unique_ptr<PreStopHookServer, PreStopHookServerDeleter>(
111 new PreStopHookServer(port, absl::Seconds(timeout_s)),
112 PreStopHookServerDeleter());
113 return server_->GetState() == State::kWaiting
114 ? Status::OK
115 : Status(StatusCode::DEADLINE_EXCEEDED, "Server have not started");
116 }
117
Stop()118 Status PreStopHookServerManager::Stop() {
119 if (!server_) {
120 return Status(StatusCode::UNAVAILABLE, "Pre hook server is not running");
121 }
122 server_.reset();
123 return Status::OK;
124 }
125
Return(StatusCode code,absl::string_view description)126 void PreStopHookServerManager::Return(StatusCode code,
127 absl::string_view description) {
128 server_->SetReturnStatus(Status(code, std::string(description)));
129 }
130
TestOnlyExpectRequests(size_t expected_requests_count,const absl::Duration & timeout)131 bool PreStopHookServerManager::TestOnlyExpectRequests(
132 size_t expected_requests_count, const absl::Duration& timeout) {
133 return server_->TestOnlyExpectRequests(expected_requests_count, timeout);
134 }
135
// Deleter used by PreStopHookServerManager::server_. It is defined in this
// file, where PreStopHookServer is a complete type, and simply deletes
// `server`.
void PreStopHookServerManager::PreStopHookServerDeleter::operator()(
    PreStopHookServer* server) {
  delete server;
}
140
141 //
142 // HookServiceImpl
143 //
144
Hook(CallbackServerContext * context,const Empty *,Empty *)145 ServerUnaryReactor* HookServiceImpl::Hook(CallbackServerContext* context,
146 const Empty* /* request */,
147 Empty* /* reply */) {
148 auto reactor = context->DefaultReactor();
149 grpc_core::MutexLock lock(&mu_);
150 pending_requests_.emplace_back(reactor);
151 MatchRequestsAndStatuses();
152 return reactor;
153 }
154
SetReturnStatus(CallbackServerContext * context,const SetReturnStatusRequest * request,Empty *)155 ServerUnaryReactor* HookServiceImpl::SetReturnStatus(
156 CallbackServerContext* context, const SetReturnStatusRequest* request,
157 Empty* /* reply */) {
158 auto reactor = context->DefaultReactor();
159 reactor->Finish(Status::OK);
160 grpc_core::MutexLock lock(&mu_);
161 respond_all_status_.emplace(
162 static_cast<StatusCode>(request->grpc_code_to_return()),
163 request->grpc_status_description());
164 MatchRequestsAndStatuses();
165 return reactor;
166 }
167
ClearReturnStatus(CallbackServerContext * context,const Empty *,Empty *)168 ServerUnaryReactor* HookServiceImpl::ClearReturnStatus(
169 CallbackServerContext* context, const Empty* /* request */,
170 Empty* /* reply */) {
171 auto reactor = context->DefaultReactor();
172 reactor->Finish(Status::OK);
173 grpc_core::MutexLock lock(&mu_);
174 respond_all_status_.reset();
175 MatchRequestsAndStatuses();
176 return reactor;
177 }
178
AddReturnStatus(const Status & status)179 void HookServiceImpl::AddReturnStatus(const Status& status) {
180 grpc_core::MutexLock lock(&mu_);
181 pending_statuses_.push_back(status);
182 MatchRequestsAndStatuses();
183 }
184
TestOnlyExpectRequests(size_t expected_requests_count,const absl::Duration & timeout)185 bool HookServiceImpl::TestOnlyExpectRequests(size_t expected_requests_count,
186 const absl::Duration& timeout) {
187 grpc_core::MutexLock lock(&mu_);
188 auto deadline = absl::Now() + timeout;
189 while (pending_requests_.size() < expected_requests_count &&
190 !request_var_.WaitWithDeadline(&mu_, deadline)) {
191 }
192 return pending_requests_.size() >= expected_requests_count;
193 }
194
Stop()195 void HookServiceImpl::Stop() {
196 grpc_core::MutexLock lock(&mu_);
197 if (!respond_all_status_.has_value()) {
198 respond_all_status_.emplace(StatusCode::ABORTED, "Shutting down");
199 }
200 MatchRequestsAndStatuses();
201 }
202
MatchRequestsAndStatuses()203 void HookServiceImpl::MatchRequestsAndStatuses() {
204 while (!pending_requests_.empty() && !pending_statuses_.empty()) {
205 pending_requests_.front()->Finish(std::move(pending_statuses_.front()));
206 pending_requests_.erase(pending_requests_.begin());
207 pending_statuses_.erase(pending_statuses_.begin());
208 }
209 if (respond_all_status_.has_value()) {
210 for (const auto& request : pending_requests_) {
211 request->Finish(*respond_all_status_);
212 }
213 pending_requests_.clear();
214 }
215 request_var_.SignalAll();
216 }
217
218 } // namespace testing
219 } // namespace grpc
220