• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2023 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include <grpc/grpc.h>
16 #include <grpc/support/time.h>
17 #include <grpcpp/client_context.h>
18 #include <grpcpp/grpcpp.h>
19 #include <grpcpp/support/server_callback.h>
20 #include <gtest/gtest.h>
21 
22 #include <memory>
23 #include <string>
24 #include <thread>
25 #include <utility>
26 #include <vector>
27 
28 #include "absl/status/statusor.h"
29 #include "absl/strings/str_cat.h"
30 #include "absl/time/time.h"
31 #include "src/core/lib/event_engine/tcp_socket_utils.h"
32 #include "src/core/lib/experiments/config.h"
33 #include "src/core/lib/security/credentials/fake/fake_credentials.h"
34 #include "src/core/util/notification.h"
35 #include "src/cpp/server/secure_server_credentials.h"
36 #include "src/proto/grpc/testing/echo.grpc.pb.h"
37 #include "test/core/test_util/port.h"
38 #include "test/core/test_util/test_config.h"
39 #include "test/cpp/util/credentials.h"
40 
41 // IWYU pragma: no_include <sys/socket.h>
42 
43 // A stress test which spins up a server with a small configured resource quota
44 // value. It then creates many channels which exchange large payloads with the
45 // server. This would drive the server to reach resource quota limits and
46 // trigger reclamation.
47 
48 namespace grpc {
49 namespace testing {
namespace {
// Memory quota, in bytes, configured on the resource-quota test server.
constexpr int kResourceQuotaSizeBytes{1024 * 1024};
// Size, in bytes, of the request message carried by every Echo RPC.
constexpr int kPayloadSizeBytes{1024 * 1024};
// Number of worker threads (one channel each) issuing RPCs concurrently.
constexpr int kNumParallelChannels{10};
}  // namespace
55 
56 class EchoClientUnaryReactor : public grpc::ClientUnaryReactor {
57  public:
EchoClientUnaryReactor(ClientContext * ctx,EchoTestService::Stub * stub,const std::string payload,Status * status)58   EchoClientUnaryReactor(ClientContext* ctx, EchoTestService::Stub* stub,
59                          const std::string payload, Status* status)
60       : ctx_(ctx), payload_(payload), status_(status) {
61     request_.set_message(payload);
62     stub->async()->Echo(ctx_, &request_, &response_, this);
63     StartCall();
64   }
65 
Await()66   void Await() { notification_.WaitForNotification(); }
67 
68  protected:
OnReadInitialMetadataDone(bool)69   void OnReadInitialMetadataDone(bool /*ok*/) override {}
70 
OnDone(const Status & s)71   void OnDone(const Status& s) override {
72     *status_ = s;
73     notification_.Notify();
74   }
75 
76  private:
77   ClientContext* const ctx_;
78   EchoRequest request_;
79   EchoResponse response_;
80   const std::string payload_;
81   grpc_core::Notification notification_;
82   Status* const status_;
83 };
84 
85 class EchoServerUnaryReactor : public ServerUnaryReactor {
86  public:
EchoServerUnaryReactor(CallbackServerContext *,const EchoRequest * request,EchoResponse * response)87   EchoServerUnaryReactor(CallbackServerContext* /*ctx*/,
88                          const EchoRequest* request, EchoResponse* response) {
89     response->set_message(request->message());
90     Finish(grpc::Status::OK);
91   }
92 
93  private:
OnDone()94   void OnDone() override { delete this; }
95 };
96 
97 class GrpcCallbackServiceImpl : public EchoTestService::CallbackService {
98  public:
Echo(CallbackServerContext * context,const EchoRequest * request,EchoResponse * response)99   ServerUnaryReactor* Echo(CallbackServerContext* context,
100                            const EchoRequest* request,
101                            EchoResponse* response) override {
102     return new EchoServerUnaryReactor(context, request, response);
103   }
104 };
105 
106 class End2EndResourceQuotaUnaryTest : public ::testing::Test {
107  protected:
End2EndResourceQuotaUnaryTest()108   End2EndResourceQuotaUnaryTest() {
109     int port = grpc_pick_unused_port_or_die();
110     server_address_ = absl::StrCat("localhost:", port);
111     payload_ = std::string(kPayloadSizeBytes, 'a');
112     ServerBuilder builder;
113     builder.AddListeningPort(server_address_, InsecureServerCredentials());
114     builder.SetResourceQuota(
115         grpc::ResourceQuota("TestService").Resize(kResourceQuotaSizeBytes));
116     builder.RegisterService(&grpc_service_);
117     server_ = builder.BuildAndStart();
118   }
119 
~End2EndResourceQuotaUnaryTest()120   ~End2EndResourceQuotaUnaryTest() override { server_->Shutdown(); }
121 
MakeGrpcCall()122   void MakeGrpcCall() {
123     ClientContext ctx;
124     Status status;
125     auto stub = EchoTestService::NewStub(
126         CreateChannel(server_address_, grpc::InsecureChannelCredentials()));
127     ctx.set_wait_for_ready(false);
128     EchoClientUnaryReactor reactor(&ctx, stub.get(), payload_, &status);
129     reactor.Await();
130   }
131 
MakeGrpcCalls()132   void MakeGrpcCalls() {
133     std::vector<std::thread> workers;
134     workers.reserve(kNumParallelChannels);
135     // Run MakeGrpcCall() many times concurrently.
136     for (int i = 0; i < kNumParallelChannels; ++i) {
137       workers.emplace_back([this]() { MakeGrpcCall(); });
138     }
139     for (int i = 0; i < kNumParallelChannels; ++i) {
140       workers[i].join();
141     }
142   }
143 
144   int port_;
145   std::unique_ptr<Server> server_;
146   string server_address_;
147   GrpcCallbackServiceImpl grpc_service_;
148   std::string payload_;
149 };
150 
// Issues the concurrent large-payload RPCs against the quota-limited server.
TEST_F(End2EndResourceQuotaUnaryTest, MultipleUnaryRPCTest) {
  MakeGrpcCalls();
}
152 
153 class End2EndConnectionQuotaTest : public ::testing::TestWithParam<int> {
154  protected:
End2EndConnectionQuotaTest()155   End2EndConnectionQuotaTest() {
156     port_ = grpc_pick_unused_port_or_die();
157     server_address_ = absl::StrCat("[::]:", port_);
158     connect_address_ = absl::StrCat("ipv6:[::1]:", port_);
159     payload_ = std::string(kPayloadSizeBytes, 'a');
160     ServerBuilder builder;
161     builder.AddListeningPort(
162         server_address_,
163         std::make_shared<SecureServerCredentials>(
164             grpc_fake_transport_security_server_credentials_create()));
165     builder.AddChannelArgument(GRPC_ARG_SERVER_HANDSHAKE_TIMEOUT_MS, 1000);
166     builder.AddChannelArgument(GRPC_ARG_MAX_ALLOWED_INCOMING_CONNECTIONS,
167                                GetParam());
168     builder.AddChannelArgument(
169         GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS, 10000);
170     builder.RegisterService(&grpc_service_);
171     server_ = builder.BuildAndStart();
172   }
173 
~End2EndConnectionQuotaTest()174   ~End2EndConnectionQuotaTest() override { server_->Shutdown(); }
175 
CreateGrpcChannelStub()176   std::unique_ptr<EchoTestService::Stub> CreateGrpcChannelStub() {
177     grpc::ChannelArguments args;
178     args.SetInt(GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL, 1);
179     args.SetInt(GRPC_ARG_ENABLE_RETRIES, 0);
180     args.SetInt(GRPC_ARG_KEEPALIVE_TIME_MS, 20000);
181     args.SetInt(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, 10000);
182     args.SetInt(GRPC_ARG_HTTP2_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS, 15000);
183     args.SetInt(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, 1);
184 
185     return EchoTestService::NewStub(CreateCustomChannel(
186         connect_address_,
187         std::make_shared<FakeTransportSecurityChannelCredentials>(), args));
188   }
189 
TestExceedingConnectionQuota()190   void TestExceedingConnectionQuota() {
191     const int kNumConnections = 2 * GetParam();
192 #ifdef GPR_LINUX
193     // On linux systems create 2 * NumConnection tcp connections which don't
194     // do anything and verify that they get closed after
195     // GRPC_ARG_SERVER_HANDSHAKE_TIMEOUT_MS seconds.
196     auto connect_address_resolved =
197         grpc_event_engine::experimental::URIToResolvedAddress(connect_address_);
198     std::vector<std::thread> workers;
199     workers.reserve(kNumConnections);
200     for (int i = 0; i < kNumConnections; ++i) {
201       workers.emplace_back([connect_address_resolved]() {
202         int client_fd;
203         int one = 1;
204         char buf[1024];
205         client_fd = socket(AF_INET6, SOCK_STREAM, 0);
206         setsockopt(client_fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
207         // Connection should succeed.
208         EXPECT_EQ(connect(client_fd,
209                           const_cast<struct sockaddr*>(
210                               connect_address_resolved->address()),
211                           connect_address_resolved->size()),
212                   0);
213         // recv should not block forever and it should return because
214         // GRPC_ARG_SERVER_HANDSHAKE_TIMEOUT_MS is set and the server should
215         // close this connections after that timeout expires.
216         while (recv(client_fd, buf, 1024, 0) > 0) {
217         }
218         close(client_fd);
219       });
220     }
221     for (int i = 0; i < kNumConnections; ++i) {
222       workers[i].join();
223     }
224 #endif
225     // Subsequent kNumConnections / 2 RPCs should succeed because the previously
226     // spawned client connections have been closed.
227     std::vector<std::unique_ptr<EchoTestService::Stub>> stubs;
228     stubs.reserve(kNumConnections);
229     for (int i = 0; i < kNumConnections; i++) {
230       stubs.push_back(CreateGrpcChannelStub());
231     }
232     for (int i = 0; i < kNumConnections; ++i) {
233       ClientContext ctx;
234       Status status;
235       ctx.set_wait_for_ready(false);
236       EchoClientUnaryReactor reactor(&ctx, stubs[i].get(), payload_, &status);
237       reactor.Await();
238       // The first half RPCs should succeed.
239       if (i < kNumConnections / 2) {
240         EXPECT_TRUE(status.ok());
241 
242       } else {
243         // The second half should fail because they would attempt to create a
244         // new connection and fail since it would exceed the connection quota
245         // limit set at the server.
246         EXPECT_FALSE(status.ok());
247       }
248     }
249   }
250 
251   int port_;
252   std::unique_ptr<Server> server_;
253   string server_address_;
254   string connect_address_;
255   GrpcCallbackServiceImpl grpc_service_;
256   std::string payload_;
257 };
258 
// Runs the connection-quota scenario once per instantiated connection limit.
TEST_P(End2EndConnectionQuotaTest, ConnectionQuotaTest) {
  this->TestExceedingConnectionQuota();
}
262 
263 INSTANTIATE_TEST_SUITE_P(ConnectionQuotaParamTest, End2EndConnectionQuotaTest,
264                          ::testing::ValuesIn<int>({10, 100}));
265 
266 }  // namespace testing
267 }  // namespace grpc
268 
main(int argc,char ** argv)269 int main(int argc, char** argv) {
270   ::testing::InitGoogleTest(&argc, argv);
271   return RUN_ALL_TESTS();
272 }
273