• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2022 gRPC Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 #include <errno.h>
15 #include <fcntl.h>
16 #include <grpc/event_engine/event_engine.h>
17 #include <grpc/grpc.h>
18 #include <grpc/impl/channel_arg_names.h>
19 #include <poll.h>
20 #include <stdlib.h>
21 #include <sys/socket.h>
22 #include <unistd.h>
23 
24 #include <algorithm>
25 #include <chrono>
26 #include <cstring>
27 #include <memory>
28 #include <string>
29 #include <utility>
30 #include <vector>
31 
32 #include "absl/log/check.h"
33 #include "absl/log/log.h"
34 #include "absl/memory/memory.h"
35 #include "absl/status/status.h"
36 #include "absl/status/statusor.h"
37 #include "absl/strings/str_cat.h"
38 #include "absl/strings/str_format.h"
39 #include "gtest/gtest.h"
40 #include "src/core/lib/channel/channel_args.h"
41 #include "src/core/lib/event_engine/channel_args_endpoint_config.h"
42 #include "src/core/lib/event_engine/posix_engine/posix_engine.h"
43 #include "src/core/lib/event_engine/tcp_socket_utils.h"
44 #include "src/core/lib/resource_quota/memory_quota.h"
45 #include "src/core/lib/resource_quota/resource_quota.h"
46 #include "src/core/util/crash.h"
47 #include "src/core/util/notification.h"
48 #include "test/core/event_engine/event_engine_test_utils.h"
49 #include "test/core/test_util/port.h"
50 #include "test/core/test_util/test_config.h"
51 
52 namespace grpc_event_engine {
53 namespace experimental {
54 
55 using namespace std::chrono_literals;
56 
57 namespace {
58 
59 // Creates a server socket listening for one connection on a specific port. It
60 // then creates another client socket connected to the server socket. This fills
61 // up the kernel listen queue on the server socket. Any subsequent attempts to
62 // connect to the server socket will be pending indefinitely. This can be used
63 // to test Connection timeouts and cancellation attempts.
CreateConnectedSockets(EventEngine::ResolvedAddress resolved_addr)64 std::vector<int> CreateConnectedSockets(
65     EventEngine::ResolvedAddress resolved_addr) {
66   int server_socket;
67   int opt = -1;
68   int client_socket;
69   int one = 1;
70   int flags;
71   std::vector<int> ret_sockets;
72   // Creating a new socket file descriptor.
73   if ((server_socket = socket(AF_INET6, SOCK_STREAM, 0)) <= 0) {
74     grpc_core::Crash(
75         absl::StrFormat("Error creating socket: %s", std::strerror(errno)));
76   }
77   // MacOS builds fail if SO_REUSEADDR and SO_REUSEPORT are set in the same
78   // setsockopt syscall. So they are set separately one after the other.
79   if (setsockopt(server_socket, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt))) {
80     grpc_core::Crash(absl::StrFormat("Error setsockopt(SO_REUSEADDR): %s",
81                                      std::strerror(errno)));
82   }
83   if (setsockopt(server_socket, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt))) {
84     grpc_core::Crash(absl::StrFormat("Error setsockopt(SO_REUSEPORT): %s",
85                                      std::strerror(errno)));
86   }
87 
88   // Bind the new socket to server address.
89   if (bind(server_socket, resolved_addr.address(), resolved_addr.size()) < 0) {
90     grpc_core::Crash(absl::StrFormat("Error bind: %s", std::strerror(errno)));
91   }
92   // Set the new socket to listen for one active connection at a time.
93   // accept() is intentionally not called on the socket. This allows the
94   // connection queue to build up.
95   if (listen(server_socket, 1) < 0) {
96     grpc_core::Crash(absl::StrFormat("Error listen: %s", std::strerror(errno)));
97   }
98   ret_sockets.push_back(server_socket);
99   // Create and connect client sockets until the connection attempt times out.
100   // Even if the backlog specified to listen is 1, the kernel continues to
101   // accept a certain number of SYN packets before dropping them. This loop
102   // attempts to identify the number of new connection attempts that will
103   // be allowed by the kernel before any subsequent connection attempts
104   // become pending indefinitely.
105   while (true) {
106     client_socket = socket(AF_INET6, SOCK_STREAM, 0);
107     setsockopt(client_socket, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
108     // Make fd non-blocking.
109     flags = fcntl(client_socket, F_GETFL, 0);
110     EXPECT_EQ(fcntl(client_socket, F_SETFL, flags | O_NONBLOCK), 0);
111 
112     if (connect(client_socket,
113                 const_cast<struct sockaddr*>(resolved_addr.address()),
114                 resolved_addr.size()) == -1) {
115       if (errno == EINPROGRESS) {
116         struct pollfd pfd;
117         pfd.fd = client_socket;
118         pfd.events = POLLOUT;
119         pfd.revents = 0;
120         int ret = poll(&pfd, 1, 1000);
121         if (ret == -1) {
122           LOG(ERROR) << "poll() failed during connect; errno=" << errno;
123           abort();
124         } else if (ret == 0) {
125           // current connection attempt timed out. It indicates that the
126           // kernel will cause any subsequent connection attempts to
127           // become pending indefinitely.
128           ret_sockets.push_back(client_socket);
129           return ret_sockets;
130         }
131       } else {
132         grpc_core::Crash(absl::StrFormat(
133             "Failed to connect to the server (errno=%d)", errno));
134       }
135     }
136     ret_sockets.push_back(client_socket);
137   }
138   return ret_sockets;
139 }
140 
141 }  // namespace
142 
// Starts a PosixEventEngine Connect() with a 3 second timeout against a local
// IPv6 server whose listen queue has already been filled by
// CreateConnectedSockets(), and expects the on-connect callback to be invoked
// with a status whose code is absl::StatusCode::kUnknown.
TEST(PosixEventEngineTest, IndefiniteConnectTimeoutOrRstTest) {
  // absl::StrCat formats the integer port directly.
  const std::string target_addr =
      absl::StrCat("ipv6:[::1]:", grpc_pick_unused_port_or_die());
  auto resolved_addr = URIToResolvedAddress(target_addr);
  CHECK_OK(resolved_addr);
  std::shared_ptr<EventEngine> posix_ee = std::make_shared<PosixEventEngine>();
  // Saturate the server's listen queue before the engine tries to connect.
  auto sockets = CreateConnectedSockets(*resolved_addr);
  grpc_core::Notification signal;
  grpc_core::ChannelArgs args;
  auto quota = grpc_core::ResourceQuota::Default();
  args = args.Set(GRPC_ARG_RESOURCE_QUOTA, quota);
  ChannelArgsEndpointConfig config(args);
  auto memory_quota = absl::make_unique<grpc_core::MemoryQuota>("bar");
  posix_ee->Connect(
      [&signal](absl::StatusOr<std::unique_ptr<EventEngine::Endpoint>> status) {
        EXPECT_EQ(status.status().code(), absl::StatusCode::kUnknown);
        signal.Notify();
      },
      *resolved_addr, config, memory_quota->CreateMemoryAllocator("conn-1"),
      3s);
  // `signal` is captured by reference, so block until the callback has run
  // before leaving this scope.
  signal.WaitForNotification();
  for (auto sock : sockets) {
    close(sock);
  }
  WaitForSingleOwner(std::move(posix_ee));
}
171 
// Starts a PosixEventEngine Connect() with a 3 second timeout against a local
// IPv6 server whose listen queue has already been filled by
// CreateConnectedSockets(), then cancels the attempt. CancelConnect() must
// succeed and the on-connect callback must never be invoked.
TEST(PosixEventEngineTest, IndefiniteConnectCancellationTest) {
  // absl::StrCat formats the integer port directly.
  const std::string target_addr =
      absl::StrCat("ipv6:[::1]:", grpc_pick_unused_port_or_die());
  auto resolved_addr = URIToResolvedAddress(target_addr);
  CHECK_OK(resolved_addr);
  std::shared_ptr<EventEngine> posix_ee = std::make_shared<PosixEventEngine>();
  // Saturate the server's listen queue before the engine tries to connect.
  auto sockets = CreateConnectedSockets(*resolved_addr);
  grpc_core::ChannelArgs args;
  auto quota = grpc_core::ResourceQuota::Default();
  args = args.Set(GRPC_ARG_RESOURCE_QUOTA, quota);
  ChannelArgsEndpointConfig config(args);
  auto memory_quota = absl::make_unique<grpc_core::MemoryQuota>("bar");
  auto connection_handle = posix_ee->Connect(
      [](absl::StatusOr<std::unique_ptr<EventEngine::Endpoint>> /*status*/) {
        FAIL() << "The on_connect callback should not have run since the "
                  "connection attempt was cancelled.";
      },
      *resolved_addr, config, memory_quota->CreateMemoryAllocator("conn-2"),
      3s);
  // Cancellation is only attempted when the first key of the returned handle
  // is positive.
  // NOTE(review): presumably a non-positive key means there is no pending
  // attempt to cancel - confirm against the engine's handle semantics.
  if (connection_handle.keys[0] > 0) {
    ASSERT_TRUE(posix_ee->CancelConnect(connection_handle));
  }
  for (auto sock : sockets) {
    close(sock);
  }
  WaitForSingleOwner(std::move(posix_ee));
}
201 
202 }  // namespace experimental
203 }  // namespace grpc_event_engine
204 
main(int argc,char ** argv)205 int main(int argc, char** argv) {
206   grpc::testing::TestEnvironment env(&argc, argv);
207   ::testing::InitGoogleTest(&argc, argv);
208   grpc_init();
209   int ret = RUN_ALL_TESTS();
210   grpc_shutdown();
211   return ret;
212 }
213