1 // Copyright 2022 gRPC Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 #include <errno.h>
15 #include <fcntl.h>
16 #include <poll.h>
17 #include <stdlib.h>
18 #include <sys/socket.h>
19 #include <unistd.h>
20
21 #include <algorithm>
22 #include <chrono>
23 #include <cstring>
24 #include <memory>
25 #include <string>
26 #include <utility>
27 #include <vector>
28
29 #include "absl/memory/memory.h"
30 #include "absl/status/status.h"
31 #include "absl/status/statusor.h"
32 #include "absl/strings/str_cat.h"
33 #include "absl/strings/str_format.h"
34 #include "gtest/gtest.h"
35
36 #include <grpc/event_engine/event_engine.h>
37 #include <grpc/grpc.h>
38 #include <grpc/impl/channel_arg_names.h>
39 #include <grpc/support/log.h>
40
41 #include "src/core/lib/channel/channel_args.h"
42 #include "src/core/lib/event_engine/channel_args_endpoint_config.h"
43 #include "src/core/lib/event_engine/posix_engine/posix_engine.h"
44 #include "src/core/lib/event_engine/tcp_socket_utils.h"
45 #include "src/core/lib/gprpp/crash.h"
46 #include "src/core/lib/gprpp/notification.h"
47 #include "src/core/lib/resource_quota/memory_quota.h"
48 #include "src/core/lib/resource_quota/resource_quota.h"
49 #include "test/core/event_engine/event_engine_test_utils.h"
50 #include "test/core/util/port.h"
51 #include "test/core/util/test_config.h"
52
53 namespace grpc_event_engine {
54 namespace experimental {
55
56 using namespace std::chrono_literals;
57
58 namespace {
59
60 // Creates a server socket listening for one connection on a specific port. It
61 // then creates another client socket connected to the server socket. This fills
62 // up the kernel listen queue on the server socket. Any subsequent attempts to
63 // connect to the server socket will be pending indefinitely. This can be used
64 // to test Connection timeouts and cancellation attempts.
CreateConnectedSockets(EventEngine::ResolvedAddress resolved_addr)65 std::vector<int> CreateConnectedSockets(
66 EventEngine::ResolvedAddress resolved_addr) {
67 int server_socket;
68 int opt = -1;
69 int client_socket;
70 int one = 1;
71 int flags;
72 std::vector<int> ret_sockets;
73 // Creating a new socket file descriptor.
74 if ((server_socket = socket(AF_INET6, SOCK_STREAM, 0)) <= 0) {
75 grpc_core::Crash(
76 absl::StrFormat("Error creating socket: %s", std::strerror(errno)));
77 }
78 // MacOS builds fail if SO_REUSEADDR and SO_REUSEPORT are set in the same
79 // setsockopt syscall. So they are set separately one after the other.
80 if (setsockopt(server_socket, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt))) {
81 grpc_core::Crash(absl::StrFormat("Error setsockopt(SO_REUSEADDR): %s",
82 std::strerror(errno)));
83 }
84 if (setsockopt(server_socket, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt))) {
85 grpc_core::Crash(absl::StrFormat("Error setsockopt(SO_REUSEPORT): %s",
86 std::strerror(errno)));
87 }
88
89 // Bind the new socket to server address.
90 if (bind(server_socket, resolved_addr.address(), resolved_addr.size()) < 0) {
91 grpc_core::Crash(absl::StrFormat("Error bind: %s", std::strerror(errno)));
92 }
93 // Set the new socket to listen for one active connection at a time.
94 // accept() is intentionally not called on the socket. This allows the
95 // connection queue to build up.
96 if (listen(server_socket, 1) < 0) {
97 grpc_core::Crash(absl::StrFormat("Error listen: %s", std::strerror(errno)));
98 }
99 ret_sockets.push_back(server_socket);
100 // Create and connect client sockets until the connection attempt times out.
101 // Even if the backlog specified to listen is 1, the kernel continues to
102 // accept a certain number of SYN packets before dropping them. This loop
103 // attempts to identify the number of new connection attempts that will
104 // be allowed by the kernel before any subsequent connection attempts
105 // become pending indefinitely.
106 while (true) {
107 client_socket = socket(AF_INET6, SOCK_STREAM, 0);
108 setsockopt(client_socket, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
109 // Make fd non-blocking.
110 flags = fcntl(client_socket, F_GETFL, 0);
111 EXPECT_EQ(fcntl(client_socket, F_SETFL, flags | O_NONBLOCK), 0);
112
113 if (connect(client_socket,
114 const_cast<struct sockaddr*>(resolved_addr.address()),
115 resolved_addr.size()) == -1) {
116 if (errno == EINPROGRESS) {
117 struct pollfd pfd;
118 pfd.fd = client_socket;
119 pfd.events = POLLOUT;
120 pfd.revents = 0;
121 int ret = poll(&pfd, 1, 1000);
122 if (ret == -1) {
123 gpr_log(GPR_ERROR, "poll() failed during connect; errno=%d", errno);
124 abort();
125 } else if (ret == 0) {
126 // current connection attempt timed out. It indicates that the
127 // kernel will cause any subsequent connection attempts to
128 // become pending indefinitely.
129 ret_sockets.push_back(client_socket);
130 return ret_sockets;
131 }
132 } else {
133 grpc_core::Crash(absl::StrFormat(
134 "Failed to connect to the server (errno=%d)", errno));
135 }
136 }
137 ret_sockets.push_back(client_socket);
138 }
139 return ret_sockets;
140 }
141
142 } // namespace
143
TEST(PosixEventEngineTest,IndefiniteConnectTimeoutOrRstTest)144 TEST(PosixEventEngineTest, IndefiniteConnectTimeoutOrRstTest) {
145 std::string target_addr = absl::StrCat(
146 "ipv6:[::1]:", std::to_string(grpc_pick_unused_port_or_die()));
147 auto resolved_addr = URIToResolvedAddress(target_addr);
148 GPR_ASSERT(resolved_addr.ok());
149 std::shared_ptr<EventEngine> posix_ee = std::make_shared<PosixEventEngine>();
150 std::string resolved_addr_str =
151 ResolvedAddressToNormalizedString(*resolved_addr).value();
152 auto sockets = CreateConnectedSockets(*resolved_addr);
153 grpc_core::Notification signal;
154 grpc_core::ChannelArgs args;
155 auto quota = grpc_core::ResourceQuota::Default();
156 args = args.Set(GRPC_ARG_RESOURCE_QUOTA, quota);
157 ChannelArgsEndpointConfig config(args);
158 auto memory_quota = absl::make_unique<grpc_core::MemoryQuota>("bar");
159 posix_ee->Connect(
160 [&signal](absl::StatusOr<std::unique_ptr<EventEngine::Endpoint>> status) {
161 EXPECT_EQ(status.status().code(), absl::StatusCode::kUnknown);
162 signal.Notify();
163 },
164 *resolved_addr, config, memory_quota->CreateMemoryAllocator("conn-1"),
165 3s);
166 signal.WaitForNotification();
167 for (auto sock : sockets) {
168 close(sock);
169 }
170 WaitForSingleOwner(std::move(posix_ee));
171 }
172
TEST(PosixEventEngineTest,IndefiniteConnectCancellationTest)173 TEST(PosixEventEngineTest, IndefiniteConnectCancellationTest) {
174 std::string target_addr = absl::StrCat(
175 "ipv6:[::1]:", std::to_string(grpc_pick_unused_port_or_die()));
176 auto resolved_addr = URIToResolvedAddress(target_addr);
177 GPR_ASSERT(resolved_addr.ok());
178 std::shared_ptr<EventEngine> posix_ee = std::make_shared<PosixEventEngine>();
179 std::string resolved_addr_str =
180 ResolvedAddressToNormalizedString(*resolved_addr).value();
181 auto sockets = CreateConnectedSockets(*resolved_addr);
182 grpc_core::ChannelArgs args;
183 auto quota = grpc_core::ResourceQuota::Default();
184 args = args.Set(GRPC_ARG_RESOURCE_QUOTA, quota);
185 ChannelArgsEndpointConfig config(args);
186 auto memory_quota = absl::make_unique<grpc_core::MemoryQuota>("bar");
187 auto connection_handle = posix_ee->Connect(
188 [](absl::StatusOr<std::unique_ptr<EventEngine::Endpoint>> /*status*/) {
189 FAIL() << "The on_connect callback should not have run since the "
190 "connection attempt was cancelled.";
191 },
192 *resolved_addr, config, memory_quota->CreateMemoryAllocator("conn-2"),
193 3s);
194 if (connection_handle.keys[0] > 0) {
195 ASSERT_TRUE(posix_ee->CancelConnect(connection_handle));
196 }
197 for (auto sock : sockets) {
198 close(sock);
199 }
200 WaitForSingleOwner(std::move(posix_ee));
201 }
202
203 } // namespace experimental
204 } // namespace grpc_event_engine
205
main(int argc,char ** argv)206 int main(int argc, char** argv) {
207 grpc::testing::TestEnvironment env(&argc, argv);
208 ::testing::InitGoogleTest(&argc, argv);
209 grpc_init();
210 int ret = RUN_ALL_TESTS();
211 grpc_shutdown();
212 return ret;
213 }
214