• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2022 gRPC Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 #include <errno.h>
15 #include <fcntl.h>
16 #include <poll.h>
17 #include <stdlib.h>
18 #include <sys/socket.h>
19 #include <unistd.h>
20 
21 #include <algorithm>
22 #include <chrono>
23 #include <cstring>
24 #include <memory>
25 #include <string>
26 #include <utility>
27 #include <vector>
28 
29 #include "absl/memory/memory.h"
30 #include "absl/status/status.h"
31 #include "absl/status/statusor.h"
32 #include "absl/strings/str_cat.h"
33 #include "absl/strings/str_format.h"
34 #include "gtest/gtest.h"
35 
36 #include <grpc/event_engine/event_engine.h>
37 #include <grpc/grpc.h>
38 #include <grpc/impl/channel_arg_names.h>
39 #include <grpc/support/log.h>
40 
41 #include "src/core/lib/channel/channel_args.h"
42 #include "src/core/lib/event_engine/channel_args_endpoint_config.h"
43 #include "src/core/lib/event_engine/posix_engine/posix_engine.h"
44 #include "src/core/lib/event_engine/tcp_socket_utils.h"
45 #include "src/core/lib/gprpp/crash.h"
46 #include "src/core/lib/gprpp/notification.h"
47 #include "src/core/lib/resource_quota/memory_quota.h"
48 #include "src/core/lib/resource_quota/resource_quota.h"
49 #include "test/core/event_engine/event_engine_test_utils.h"
50 #include "test/core/util/port.h"
51 #include "test/core/util/test_config.h"
52 
53 namespace grpc_event_engine {
54 namespace experimental {
55 
56 using namespace std::chrono_literals;
57 
58 namespace {
59 
60 // Creates a server socket listening for one connection on a specific port. It
61 // then creates another client socket connected to the server socket. This fills
62 // up the kernel listen queue on the server socket. Any subsequent attempts to
63 // connect to the server socket will be pending indefinitely. This can be used
64 // to test Connection timeouts and cancellation attempts.
CreateConnectedSockets(EventEngine::ResolvedAddress resolved_addr)65 std::vector<int> CreateConnectedSockets(
66     EventEngine::ResolvedAddress resolved_addr) {
67   int server_socket;
68   int opt = -1;
69   int client_socket;
70   int one = 1;
71   int flags;
72   std::vector<int> ret_sockets;
73   // Creating a new socket file descriptor.
74   if ((server_socket = socket(AF_INET6, SOCK_STREAM, 0)) <= 0) {
75     grpc_core::Crash(
76         absl::StrFormat("Error creating socket: %s", std::strerror(errno)));
77   }
78   // MacOS builds fail if SO_REUSEADDR and SO_REUSEPORT are set in the same
79   // setsockopt syscall. So they are set separately one after the other.
80   if (setsockopt(server_socket, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt))) {
81     grpc_core::Crash(absl::StrFormat("Error setsockopt(SO_REUSEADDR): %s",
82                                      std::strerror(errno)));
83   }
84   if (setsockopt(server_socket, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt))) {
85     grpc_core::Crash(absl::StrFormat("Error setsockopt(SO_REUSEPORT): %s",
86                                      std::strerror(errno)));
87   }
88 
89   // Bind the new socket to server address.
90   if (bind(server_socket, resolved_addr.address(), resolved_addr.size()) < 0) {
91     grpc_core::Crash(absl::StrFormat("Error bind: %s", std::strerror(errno)));
92   }
93   // Set the new socket to listen for one active connection at a time.
94   // accept() is intentionally not called on the socket. This allows the
95   // connection queue to build up.
96   if (listen(server_socket, 1) < 0) {
97     grpc_core::Crash(absl::StrFormat("Error listen: %s", std::strerror(errno)));
98   }
99   ret_sockets.push_back(server_socket);
100   // Create and connect client sockets until the connection attempt times out.
101   // Even if the backlog specified to listen is 1, the kernel continues to
102   // accept a certain number of SYN packets before dropping them. This loop
103   // attempts to identify the number of new connection attempts that will
104   // be allowed by the kernel before any subsequent connection attempts
105   // become pending indefinitely.
106   while (true) {
107     client_socket = socket(AF_INET6, SOCK_STREAM, 0);
108     setsockopt(client_socket, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
109     // Make fd non-blocking.
110     flags = fcntl(client_socket, F_GETFL, 0);
111     EXPECT_EQ(fcntl(client_socket, F_SETFL, flags | O_NONBLOCK), 0);
112 
113     if (connect(client_socket,
114                 const_cast<struct sockaddr*>(resolved_addr.address()),
115                 resolved_addr.size()) == -1) {
116       if (errno == EINPROGRESS) {
117         struct pollfd pfd;
118         pfd.fd = client_socket;
119         pfd.events = POLLOUT;
120         pfd.revents = 0;
121         int ret = poll(&pfd, 1, 1000);
122         if (ret == -1) {
123           gpr_log(GPR_ERROR, "poll() failed during connect; errno=%d", errno);
124           abort();
125         } else if (ret == 0) {
126           // current connection attempt timed out. It indicates that the
127           // kernel will cause any subsequent connection attempts to
128           // become pending indefinitely.
129           ret_sockets.push_back(client_socket);
130           return ret_sockets;
131         }
132       } else {
133         grpc_core::Crash(absl::StrFormat(
134             "Failed to connect to the server (errno=%d)", errno));
135       }
136     }
137     ret_sockets.push_back(client_socket);
138   }
139   return ret_sockets;
140 }
141 
142 }  // namespace
143 
// Verifies that a connection attempt to a server whose listen queue has been
// filled by CreateConnectedSockets() completes with an kUnknown status. The
// connect call is given a three second timeout.
TEST(PosixEventEngineTest, IndefiniteConnectTimeoutOrRstTest) {
  const int port = grpc_pick_unused_port_or_die();
  std::string uri = absl::StrCat("ipv6:[::1]:", std::to_string(port));
  auto server_addr = URIToResolvedAddress(uri);
  GPR_ASSERT(server_addr.ok());
  std::shared_ptr<EventEngine> engine = std::make_shared<PosixEventEngine>();
  std::string resolved_addr_str =
      ResolvedAddressToNormalizedString(*server_addr).value();
  // Saturate the server's listen queue before the engine tries to connect.
  std::vector<int> open_fds = CreateConnectedSockets(*server_addr);
  grpc_core::Notification connect_done;
  auto quota = grpc_core::ResourceQuota::Default();
  grpc_core::ChannelArgs args;
  args = args.Set(GRPC_ARG_RESOURCE_QUOTA, quota);
  ChannelArgsEndpointConfig config(args);
  auto memory_quota = absl::make_unique<grpc_core::MemoryQuota>("bar");
  auto on_connect =
      [&connect_done](
          absl::StatusOr<std::unique_ptr<EventEngine::Endpoint>> endpoint) {
        EXPECT_EQ(endpoint.status().code(), absl::StatusCode::kUnknown);
        connect_done.Notify();
      };
  engine->Connect(std::move(on_connect), *server_addr, config,
                  memory_quota->CreateMemoryAllocator("conn-1"), 3s);
  // Block until the callback has reported the outcome of the attempt.
  connect_done.WaitForNotification();
  for (const int fd : open_fds) {
    close(fd);
  }
  WaitForSingleOwner(std::move(engine));
}
172 
// Verifies that a connection attempt to a server whose listen queue has been
// filled by CreateConnectedSockets() can be cancelled, and that the on_connect
// callback does not run once the attempt has been cancelled.
TEST(PosixEventEngineTest, IndefiniteConnectCancellationTest) {
  std::string target_addr = absl::StrCat(
      "ipv6:[::1]:", std::to_string(grpc_pick_unused_port_or_die()));
  auto resolved_addr = URIToResolvedAddress(target_addr);
  GPR_ASSERT(resolved_addr.ok());
  std::shared_ptr<EventEngine> posix_ee = std::make_shared<PosixEventEngine>();
  std::string resolved_addr_str =
      ResolvedAddressToNormalizedString(*resolved_addr).value();
  // Saturate the server's listen queue before the engine tries to connect.
  auto sockets = CreateConnectedSockets(*resolved_addr);
  grpc_core::ChannelArgs args;
  auto quota = grpc_core::ResourceQuota::Default();
  args = args.Set(GRPC_ARG_RESOURCE_QUOTA, quota);
  ChannelArgsEndpointConfig config(args);
  auto memory_quota = absl::make_unique<grpc_core::MemoryQuota>("bar");
  auto connection_handle = posix_ee->Connect(
      [](absl::StatusOr<std::unique_ptr<EventEngine::Endpoint>> /*status*/) {
        FAIL() << "The on_connect callback should not have run since the "
                  "connection attempt was cancelled.";
      },
      *resolved_addr, config, memory_quota->CreateMemoryAllocator("conn-2"),
      3s);
  // NOTE(review): presumably a non-positive first key means there is no
  // pending attempt to cancel - confirm against PosixEventEngine::Connect.
  if (connection_handle.keys[0] > 0) {
    // EXPECT rather than ASSERT: a fatal assertion would return from the test
    // body here and skip closing the sockets and waiting for the engine below.
    EXPECT_TRUE(posix_ee->CancelConnect(connection_handle));
  }
  for (auto sock : sockets) {
    close(sock);
  }
  WaitForSingleOwner(std::move(posix_ee));
}
202 
203 }  // namespace experimental
204 }  // namespace grpc_event_engine
205 
main(int argc,char ** argv)206 int main(int argc, char** argv) {
207   grpc::testing::TestEnvironment env(&argc, argv);
208   ::testing::InitGoogleTest(&argc, argv);
209   grpc_init();
210   int ret = RUN_ALL_TESTS();
211   grpc_shutdown();
212   return ret;
213 }
214