1 // Copyright 2023 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include <grpc/grpc.h>
16 #include <grpc/support/time.h>
17 #include <grpcpp/client_context.h>
18 #include <grpcpp/grpcpp.h>
19 #include <grpcpp/support/server_callback.h>
20 #include <gtest/gtest.h>
21
22 #include <memory>
23 #include <string>
24 #include <thread>
25 #include <utility>
26 #include <vector>
27
28 #include "absl/status/statusor.h"
29 #include "absl/strings/str_cat.h"
30 #include "absl/time/time.h"
31 #include "src/core/lib/event_engine/tcp_socket_utils.h"
32 #include "src/core/lib/experiments/config.h"
33 #include "src/core/lib/security/credentials/fake/fake_credentials.h"
34 #include "src/core/util/notification.h"
35 #include "src/cpp/server/secure_server_credentials.h"
36 #include "src/proto/grpc/testing/echo.grpc.pb.h"
37 #include "test/core/test_util/port.h"
38 #include "test/core/test_util/test_config.h"
39 #include "test/cpp/util/credentials.h"
40
41 // IWYU pragma: no_include <sys/socket.h>
42
43 // A stress test which spins up a server with a small configured resource quota
44 // value. It then creates many channels which exchange large payloads with the
45 // server. This would drive the server to reach resource quota limits and
46 // trigger reclamation.
47
48 namespace grpc {
49 namespace testing {
namespace {
// One mebibyte; both sizes below are expressed in this unit.
constexpr int kBytesPerMiB = 1 << 20;
// Size the server's resource quota is resized to.
constexpr int kResourceQuotaSizeBytes = kBytesPerMiB;
// Length of the message carried by every Echo request.
constexpr int kPayloadSizeBytes = kBytesPerMiB;
// Number of client threads that issue an Echo call concurrently.
constexpr int kNumParallelChannels = 10;
}  // namespace
55
56 class EchoClientUnaryReactor : public grpc::ClientUnaryReactor {
57 public:
EchoClientUnaryReactor(ClientContext * ctx,EchoTestService::Stub * stub,const std::string payload,Status * status)58 EchoClientUnaryReactor(ClientContext* ctx, EchoTestService::Stub* stub,
59 const std::string payload, Status* status)
60 : ctx_(ctx), payload_(payload), status_(status) {
61 request_.set_message(payload);
62 stub->async()->Echo(ctx_, &request_, &response_, this);
63 StartCall();
64 }
65
Await()66 void Await() { notification_.WaitForNotification(); }
67
68 protected:
OnReadInitialMetadataDone(bool)69 void OnReadInitialMetadataDone(bool /*ok*/) override {}
70
OnDone(const Status & s)71 void OnDone(const Status& s) override {
72 *status_ = s;
73 notification_.Notify();
74 }
75
76 private:
77 ClientContext* const ctx_;
78 EchoRequest request_;
79 EchoResponse response_;
80 const std::string payload_;
81 grpc_core::Notification notification_;
82 Status* const status_;
83 };
84
85 class EchoServerUnaryReactor : public ServerUnaryReactor {
86 public:
EchoServerUnaryReactor(CallbackServerContext *,const EchoRequest * request,EchoResponse * response)87 EchoServerUnaryReactor(CallbackServerContext* /*ctx*/,
88 const EchoRequest* request, EchoResponse* response) {
89 response->set_message(request->message());
90 Finish(grpc::Status::OK);
91 }
92
93 private:
OnDone()94 void OnDone() override { delete this; }
95 };
96
97 class GrpcCallbackServiceImpl : public EchoTestService::CallbackService {
98 public:
Echo(CallbackServerContext * context,const EchoRequest * request,EchoResponse * response)99 ServerUnaryReactor* Echo(CallbackServerContext* context,
100 const EchoRequest* request,
101 EchoResponse* response) override {
102 return new EchoServerUnaryReactor(context, request, response);
103 }
104 };
105
106 class End2EndResourceQuotaUnaryTest : public ::testing::Test {
107 protected:
End2EndResourceQuotaUnaryTest()108 End2EndResourceQuotaUnaryTest() {
109 int port = grpc_pick_unused_port_or_die();
110 server_address_ = absl::StrCat("localhost:", port);
111 payload_ = std::string(kPayloadSizeBytes, 'a');
112 ServerBuilder builder;
113 builder.AddListeningPort(server_address_, InsecureServerCredentials());
114 builder.SetResourceQuota(
115 grpc::ResourceQuota("TestService").Resize(kResourceQuotaSizeBytes));
116 builder.RegisterService(&grpc_service_);
117 server_ = builder.BuildAndStart();
118 }
119
~End2EndResourceQuotaUnaryTest()120 ~End2EndResourceQuotaUnaryTest() override { server_->Shutdown(); }
121
MakeGrpcCall()122 void MakeGrpcCall() {
123 ClientContext ctx;
124 Status status;
125 auto stub = EchoTestService::NewStub(
126 CreateChannel(server_address_, grpc::InsecureChannelCredentials()));
127 ctx.set_wait_for_ready(false);
128 EchoClientUnaryReactor reactor(&ctx, stub.get(), payload_, &status);
129 reactor.Await();
130 }
131
MakeGrpcCalls()132 void MakeGrpcCalls() {
133 std::vector<std::thread> workers;
134 workers.reserve(kNumParallelChannels);
135 // Run MakeGrpcCall() many times concurrently.
136 for (int i = 0; i < kNumParallelChannels; ++i) {
137 workers.emplace_back([this]() { MakeGrpcCall(); });
138 }
139 for (int i = 0; i < kNumParallelChannels; ++i) {
140 workers[i].join();
141 }
142 }
143
144 int port_;
145 std::unique_ptr<Server> server_;
146 string server_address_;
147 GrpcCallbackServiceImpl grpc_service_;
148 std::string payload_;
149 };
150
// Drives the fixture's concurrent Echo workload against the server.
TEST_F(End2EndResourceQuotaUnaryTest, MultipleUnaryRPCTest) {
  MakeGrpcCalls();
}
152
153 class End2EndConnectionQuotaTest : public ::testing::TestWithParam<int> {
154 protected:
End2EndConnectionQuotaTest()155 End2EndConnectionQuotaTest() {
156 port_ = grpc_pick_unused_port_or_die();
157 server_address_ = absl::StrCat("[::]:", port_);
158 connect_address_ = absl::StrCat("ipv6:[::1]:", port_);
159 payload_ = std::string(kPayloadSizeBytes, 'a');
160 ServerBuilder builder;
161 builder.AddListeningPort(
162 server_address_,
163 std::make_shared<SecureServerCredentials>(
164 grpc_fake_transport_security_server_credentials_create()));
165 builder.AddChannelArgument(GRPC_ARG_SERVER_HANDSHAKE_TIMEOUT_MS, 1000);
166 builder.AddChannelArgument(GRPC_ARG_MAX_ALLOWED_INCOMING_CONNECTIONS,
167 GetParam());
168 builder.AddChannelArgument(
169 GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS, 10000);
170 builder.RegisterService(&grpc_service_);
171 server_ = builder.BuildAndStart();
172 }
173
~End2EndConnectionQuotaTest()174 ~End2EndConnectionQuotaTest() override { server_->Shutdown(); }
175
CreateGrpcChannelStub()176 std::unique_ptr<EchoTestService::Stub> CreateGrpcChannelStub() {
177 grpc::ChannelArguments args;
178 args.SetInt(GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL, 1);
179 args.SetInt(GRPC_ARG_ENABLE_RETRIES, 0);
180 args.SetInt(GRPC_ARG_KEEPALIVE_TIME_MS, 20000);
181 args.SetInt(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, 10000);
182 args.SetInt(GRPC_ARG_HTTP2_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS, 15000);
183 args.SetInt(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, 1);
184
185 return EchoTestService::NewStub(CreateCustomChannel(
186 connect_address_,
187 std::make_shared<FakeTransportSecurityChannelCredentials>(), args));
188 }
189
TestExceedingConnectionQuota()190 void TestExceedingConnectionQuota() {
191 const int kNumConnections = 2 * GetParam();
192 #ifdef GPR_LINUX
193 // On linux systems create 2 * NumConnection tcp connections which don't
194 // do anything and verify that they get closed after
195 // GRPC_ARG_SERVER_HANDSHAKE_TIMEOUT_MS seconds.
196 auto connect_address_resolved =
197 grpc_event_engine::experimental::URIToResolvedAddress(connect_address_);
198 std::vector<std::thread> workers;
199 workers.reserve(kNumConnections);
200 for (int i = 0; i < kNumConnections; ++i) {
201 workers.emplace_back([connect_address_resolved]() {
202 int client_fd;
203 int one = 1;
204 char buf[1024];
205 client_fd = socket(AF_INET6, SOCK_STREAM, 0);
206 setsockopt(client_fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
207 // Connection should succeed.
208 EXPECT_EQ(connect(client_fd,
209 const_cast<struct sockaddr*>(
210 connect_address_resolved->address()),
211 connect_address_resolved->size()),
212 0);
213 // recv should not block forever and it should return because
214 // GRPC_ARG_SERVER_HANDSHAKE_TIMEOUT_MS is set and the server should
215 // close this connections after that timeout expires.
216 while (recv(client_fd, buf, 1024, 0) > 0) {
217 }
218 close(client_fd);
219 });
220 }
221 for (int i = 0; i < kNumConnections; ++i) {
222 workers[i].join();
223 }
224 #endif
225 // Subsequent kNumConnections / 2 RPCs should succeed because the previously
226 // spawned client connections have been closed.
227 std::vector<std::unique_ptr<EchoTestService::Stub>> stubs;
228 stubs.reserve(kNumConnections);
229 for (int i = 0; i < kNumConnections; i++) {
230 stubs.push_back(CreateGrpcChannelStub());
231 }
232 for (int i = 0; i < kNumConnections; ++i) {
233 ClientContext ctx;
234 Status status;
235 ctx.set_wait_for_ready(false);
236 EchoClientUnaryReactor reactor(&ctx, stubs[i].get(), payload_, &status);
237 reactor.Await();
238 // The first half RPCs should succeed.
239 if (i < kNumConnections / 2) {
240 EXPECT_TRUE(status.ok());
241
242 } else {
243 // The second half should fail because they would attempt to create a
244 // new connection and fail since it would exceed the connection quota
245 // limit set at the server.
246 EXPECT_FALSE(status.ok());
247 }
248 }
249 }
250
251 int port_;
252 std::unique_ptr<Server> server_;
253 string server_address_;
254 string connect_address_;
255 GrpcCallbackServiceImpl grpc_service_;
256 std::string payload_;
257 };
258
// Runs the connection-quota scenario for the current parameter value.
TEST_P(End2EndConnectionQuotaTest, ConnectionQuotaTest) {
  TestExceedingConnectionQuota();
}

// Instantiate the suite with parameter values 10 and 100.
INSTANTIATE_TEST_SUITE_P(ConnectionQuotaParamTest, End2EndConnectionQuotaTest,
                         ::testing::Values(10, 100));
265
266 } // namespace testing
267 } // namespace grpc
268
main(int argc,char ** argv)269 int main(int argc, char** argv) {
270 ::testing::InitGoogleTest(&argc, argv);
271 return RUN_ALL_TESTS();
272 }
273