1 // Copyright 2022 gRPC Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 #include <errno.h>
15 #include <fcntl.h>
16 #include <grpc/event_engine/event_engine.h>
17 #include <grpc/grpc.h>
18 #include <grpc/impl/channel_arg_names.h>
19 #include <poll.h>
20 #include <stdlib.h>
21 #include <sys/socket.h>
22 #include <unistd.h>
23
24 #include <algorithm>
25 #include <chrono>
26 #include <cstring>
27 #include <memory>
28 #include <string>
29 #include <utility>
30 #include <vector>
31
32 #include "absl/log/check.h"
33 #include "absl/log/log.h"
34 #include "absl/memory/memory.h"
35 #include "absl/status/status.h"
36 #include "absl/status/statusor.h"
37 #include "absl/strings/str_cat.h"
38 #include "absl/strings/str_format.h"
39 #include "gtest/gtest.h"
40 #include "src/core/lib/channel/channel_args.h"
41 #include "src/core/lib/event_engine/channel_args_endpoint_config.h"
42 #include "src/core/lib/event_engine/posix_engine/posix_engine.h"
43 #include "src/core/lib/event_engine/tcp_socket_utils.h"
44 #include "src/core/lib/resource_quota/memory_quota.h"
45 #include "src/core/lib/resource_quota/resource_quota.h"
46 #include "src/core/util/crash.h"
47 #include "src/core/util/notification.h"
48 #include "test/core/event_engine/event_engine_test_utils.h"
49 #include "test/core/test_util/port.h"
50 #include "test/core/test_util/test_config.h"
51
52 namespace grpc_event_engine {
53 namespace experimental {
54
55 using namespace std::chrono_literals;
56
57 namespace {
58
59 // Creates a server socket listening for one connection on a specific port. It
60 // then creates another client socket connected to the server socket. This fills
61 // up the kernel listen queue on the server socket. Any subsequent attempts to
62 // connect to the server socket will be pending indefinitely. This can be used
63 // to test Connection timeouts and cancellation attempts.
CreateConnectedSockets(EventEngine::ResolvedAddress resolved_addr)64 std::vector<int> CreateConnectedSockets(
65 EventEngine::ResolvedAddress resolved_addr) {
66 int server_socket;
67 int opt = -1;
68 int client_socket;
69 int one = 1;
70 int flags;
71 std::vector<int> ret_sockets;
72 // Creating a new socket file descriptor.
73 if ((server_socket = socket(AF_INET6, SOCK_STREAM, 0)) <= 0) {
74 grpc_core::Crash(
75 absl::StrFormat("Error creating socket: %s", std::strerror(errno)));
76 }
77 // MacOS builds fail if SO_REUSEADDR and SO_REUSEPORT are set in the same
78 // setsockopt syscall. So they are set separately one after the other.
79 if (setsockopt(server_socket, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt))) {
80 grpc_core::Crash(absl::StrFormat("Error setsockopt(SO_REUSEADDR): %s",
81 std::strerror(errno)));
82 }
83 if (setsockopt(server_socket, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt))) {
84 grpc_core::Crash(absl::StrFormat("Error setsockopt(SO_REUSEPORT): %s",
85 std::strerror(errno)));
86 }
87
88 // Bind the new socket to server address.
89 if (bind(server_socket, resolved_addr.address(), resolved_addr.size()) < 0) {
90 grpc_core::Crash(absl::StrFormat("Error bind: %s", std::strerror(errno)));
91 }
92 // Set the new socket to listen for one active connection at a time.
93 // accept() is intentionally not called on the socket. This allows the
94 // connection queue to build up.
95 if (listen(server_socket, 1) < 0) {
96 grpc_core::Crash(absl::StrFormat("Error listen: %s", std::strerror(errno)));
97 }
98 ret_sockets.push_back(server_socket);
99 // Create and connect client sockets until the connection attempt times out.
100 // Even if the backlog specified to listen is 1, the kernel continues to
101 // accept a certain number of SYN packets before dropping them. This loop
102 // attempts to identify the number of new connection attempts that will
103 // be allowed by the kernel before any subsequent connection attempts
104 // become pending indefinitely.
105 while (true) {
106 client_socket = socket(AF_INET6, SOCK_STREAM, 0);
107 setsockopt(client_socket, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
108 // Make fd non-blocking.
109 flags = fcntl(client_socket, F_GETFL, 0);
110 EXPECT_EQ(fcntl(client_socket, F_SETFL, flags | O_NONBLOCK), 0);
111
112 if (connect(client_socket,
113 const_cast<struct sockaddr*>(resolved_addr.address()),
114 resolved_addr.size()) == -1) {
115 if (errno == EINPROGRESS) {
116 struct pollfd pfd;
117 pfd.fd = client_socket;
118 pfd.events = POLLOUT;
119 pfd.revents = 0;
120 int ret = poll(&pfd, 1, 1000);
121 if (ret == -1) {
122 LOG(ERROR) << "poll() failed during connect; errno=" << errno;
123 abort();
124 } else if (ret == 0) {
125 // current connection attempt timed out. It indicates that the
126 // kernel will cause any subsequent connection attempts to
127 // become pending indefinitely.
128 ret_sockets.push_back(client_socket);
129 return ret_sockets;
130 }
131 } else {
132 grpc_core::Crash(absl::StrFormat(
133 "Failed to connect to the server (errno=%d)", errno));
134 }
135 }
136 ret_sockets.push_back(client_socket);
137 }
138 return ret_sockets;
139 }
140
141 } // namespace
142
TEST(PosixEventEngineTest,IndefiniteConnectTimeoutOrRstTest)143 TEST(PosixEventEngineTest, IndefiniteConnectTimeoutOrRstTest) {
144 std::string target_addr = absl::StrCat(
145 "ipv6:[::1]:", std::to_string(grpc_pick_unused_port_or_die()));
146 auto resolved_addr = URIToResolvedAddress(target_addr);
147 CHECK_OK(resolved_addr);
148 std::shared_ptr<EventEngine> posix_ee = std::make_shared<PosixEventEngine>();
149 std::string resolved_addr_str =
150 ResolvedAddressToNormalizedString(*resolved_addr).value();
151 auto sockets = CreateConnectedSockets(*resolved_addr);
152 grpc_core::Notification signal;
153 grpc_core::ChannelArgs args;
154 auto quota = grpc_core::ResourceQuota::Default();
155 args = args.Set(GRPC_ARG_RESOURCE_QUOTA, quota);
156 ChannelArgsEndpointConfig config(args);
157 auto memory_quota = absl::make_unique<grpc_core::MemoryQuota>("bar");
158 posix_ee->Connect(
159 [&signal](absl::StatusOr<std::unique_ptr<EventEngine::Endpoint>> status) {
160 EXPECT_EQ(status.status().code(), absl::StatusCode::kUnknown);
161 signal.Notify();
162 },
163 *resolved_addr, config, memory_quota->CreateMemoryAllocator("conn-1"),
164 3s);
165 signal.WaitForNotification();
166 for (auto sock : sockets) {
167 close(sock);
168 }
169 WaitForSingleOwner(std::move(posix_ee));
170 }
171
TEST(PosixEventEngineTest,IndefiniteConnectCancellationTest)172 TEST(PosixEventEngineTest, IndefiniteConnectCancellationTest) {
173 std::string target_addr = absl::StrCat(
174 "ipv6:[::1]:", std::to_string(grpc_pick_unused_port_or_die()));
175 auto resolved_addr = URIToResolvedAddress(target_addr);
176 CHECK_OK(resolved_addr);
177 std::shared_ptr<EventEngine> posix_ee = std::make_shared<PosixEventEngine>();
178 std::string resolved_addr_str =
179 ResolvedAddressToNormalizedString(*resolved_addr).value();
180 auto sockets = CreateConnectedSockets(*resolved_addr);
181 grpc_core::ChannelArgs args;
182 auto quota = grpc_core::ResourceQuota::Default();
183 args = args.Set(GRPC_ARG_RESOURCE_QUOTA, quota);
184 ChannelArgsEndpointConfig config(args);
185 auto memory_quota = absl::make_unique<grpc_core::MemoryQuota>("bar");
186 auto connection_handle = posix_ee->Connect(
187 [](absl::StatusOr<std::unique_ptr<EventEngine::Endpoint>> /*status*/) {
188 FAIL() << "The on_connect callback should not have run since the "
189 "connection attempt was cancelled.";
190 },
191 *resolved_addr, config, memory_quota->CreateMemoryAllocator("conn-2"),
192 3s);
193 if (connection_handle.keys[0] > 0) {
194 ASSERT_TRUE(posix_ee->CancelConnect(connection_handle));
195 }
196 for (auto sock : sockets) {
197 close(sock);
198 }
199 WaitForSingleOwner(std::move(posix_ee));
200 }
201
202 } // namespace experimental
203 } // namespace grpc_event_engine
204
main(int argc,char ** argv)205 int main(int argc, char** argv) {
206 grpc::testing::TestEnvironment env(&argc, argv);
207 ::testing::InitGoogleTest(&argc, argv);
208 grpc_init();
209 int ret = RUN_ALL_TESTS();
210 grpc_shutdown();
211 return ret;
212 }
213