1 //
2 // Copyright 2018 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 #include "test/core/test_util/fake_udp_and_tcp_server.h"
18
19 #include <errno.h>
20 #include <grpc/support/port_platform.h>
21 #include <grpc/support/time.h>
22 #include <string.h>
23
24 #include <set>
25 #include <utility>
26 #include <vector>
27
28 #include "absl/log/check.h"
29 #include "absl/log/log.h"
30 #include "absl/status/statusor.h"
31 #include "absl/strings/str_cat.h"
32 #include "src/core/lib/address_utils/sockaddr_utils.h"
33 #include "src/core/lib/iomgr/resolved_address.h"
34 #include "src/core/lib/iomgr/sockaddr.h"
35 #include "test/core/test_util/port.h"
36
37 // IWYU pragma: no_include <arpa/inet.h>
38
// Platform shims used throughout this file so the socket code below can be
// written once for both Winsock and POSIX builds.
#ifdef GPR_WINDOWS
#include "src/core/lib/iomgr/sockaddr_windows.h"
#include "src/core/lib/iomgr/socket_windows.h"
#include "src/core/lib/iomgr/tcp_windows.h"

// Value compared against the result of socket()/accept() to detect failure.
#define BAD_SOCKET_RETURN_VAL INVALID_SOCKET
// Function used to release a socket handle.
#define CLOSE_SOCKET closesocket
// Expression yielding the most recent socket error code.
#define ERRNO WSAGetLastError()
#else
#include <fcntl.h>
#include <unistd.h>

// Value compared against the result of socket()/accept() to detect failure.
#define BAD_SOCKET_RETURN_VAL (-1)
// Function used to release a socket file descriptor.
#define CLOSE_SOCKET close
// Expression yielding the most recent socket error code.
#define ERRNO errno
#endif
55
56 namespace grpc_core {
57 namespace testing {
58
59 namespace {
60
// Reports whether `error` (a value obtained from ERRNO) is one of the
// "operation would block / still in progress" codes, i.e. the socket call can
// simply be attempted again later instead of being treated as a failure.
bool ErrorIsRetryable(int error) {
#ifdef GPR_WINDOWS
  if (error == WSAEWOULDBLOCK) return true;
  return error == WSAEINPROGRESS;
#else
  // EWOULDBLOCK and EAGAIN may or may not share a value depending on the
  // platform, so both are compared explicitly.
  if (error == EWOULDBLOCK) return true;
  return error == EAGAIN;
#endif
}
68
69 } // namespace
70
FakeUdpAndTcpServer(AcceptMode accept_mode,std::function<FakeUdpAndTcpServer::ProcessReadResult (int,int,int)> process_read_cb)71 FakeUdpAndTcpServer::FakeUdpAndTcpServer(
72 AcceptMode accept_mode,
73 std::function<FakeUdpAndTcpServer::ProcessReadResult(int, int, int)>
74 process_read_cb)
75 : accept_mode_(accept_mode), process_read_cb_(std::move(process_read_cb)) {
76 port_ = grpc_pick_unused_port_or_die();
77 udp_socket_ = socket(AF_INET6, SOCK_DGRAM, 0);
78 if (udp_socket_ == BAD_SOCKET_RETURN_VAL) {
79 LOG(ERROR) << "Failed to create UDP ipv6 socket: " << ERRNO;
80 CHECK(0);
81 }
82 accept_socket_ = socket(AF_INET6, SOCK_STREAM, 0);
83 address_ = absl::StrCat("[::1]:", port_);
84 if (accept_socket_ == BAD_SOCKET_RETURN_VAL) {
85 LOG(ERROR) << "Failed to create TCP IPv6 socket: " << ERRNO;
86 CHECK(0);
87 }
88 #ifdef GPR_WINDOWS
89 char val = 1;
90 if (setsockopt(accept_socket_, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)) ==
91 SOCKET_ERROR) {
92 LOG(ERROR) << "Failed to set SO_REUSEADDR on TCP ipv6 socket to [::1]:"
93 << port_ << ", errno: " << ERRNO;
94 CHECK(0);
95 }
96 grpc_error_handle set_non_block_error;
97 set_non_block_error = grpc_tcp_set_non_block(udp_socket_);
98 if (!set_non_block_error.ok()) {
99 LOG(ERROR) << "Failed to configure non-blocking socket: "
100 << StatusToString(set_non_block_error);
101 CHECK(0);
102 }
103 set_non_block_error = grpc_tcp_set_non_block(accept_socket_);
104 if (!set_non_block_error.ok()) {
105 LOG(ERROR) << "Failed to configure non-blocking socket: "
106 << StatusToString(set_non_block_error);
107 CHECK(0);
108 }
109 #else
110 int val = 1;
111 if (setsockopt(accept_socket_, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)) !=
112 0) {
113 LOG(ERROR) << "Failed to set SO_REUSEADDR on socket [::1]:" << port_;
114 CHECK(0);
115 }
116 if (fcntl(udp_socket_, F_SETFL, O_NONBLOCK) != 0) {
117 LOG(ERROR) << "Failed to set O_NONBLOCK on socket: " << ERRNO;
118 CHECK(0);
119 }
120 if (fcntl(accept_socket_, F_SETFL, O_NONBLOCK) != 0) {
121 LOG(ERROR) << "Failed to set O_NONBLOCK on socket: " << ERRNO;
122 CHECK(0);
123 }
124 #endif
125 sockaddr_in6 addr;
126 memset(&addr, 0, sizeof(addr));
127 addr.sin6_family = AF_INET6;
128 addr.sin6_port = htons(port_);
129 (reinterpret_cast<char*>(&addr.sin6_addr))[15] = 1;
130 grpc_resolved_address resolved_addr;
131 memcpy(resolved_addr.addr, &addr, sizeof(addr));
132 resolved_addr.len = sizeof(addr);
133 std::string addr_str = grpc_sockaddr_to_string(&resolved_addr, false).value();
134 LOG(INFO) << "Fake UDP and TCP server listening on " << addr_str;
135 if (bind(udp_socket_, reinterpret_cast<const sockaddr*>(&addr),
136 sizeof(addr)) != 0) {
137 LOG(ERROR) << "Failed to bind UDP socket to [::1]:" << port_;
138 CHECK(0);
139 }
140 if (bind(accept_socket_, reinterpret_cast<const sockaddr*>(&addr),
141 sizeof(addr)) != 0) {
142 LOG(ERROR) << "Failed to bind TCP socket to [::1]:" << port_ << " : "
143 << ERRNO;
144 CHECK(0);
145 }
146 if (listen(accept_socket_, 100)) {
147 LOG(ERROR) << "Failed to listen on socket bound to [::1]:" << port_ << " : "
148 << ERRNO;
149 CHECK(0);
150 }
151 gpr_event_init(&stop_ev_);
152 run_server_loop_thd_ = std::make_unique<std::thread>(
153 std::bind(&FakeUdpAndTcpServer::RunServerLoop, this));
154 }
155
~FakeUdpAndTcpServer()156 FakeUdpAndTcpServer::~FakeUdpAndTcpServer() {
157 VLOG(2) << "FakeUdpAndTcpServer stop and join server thread";
158 gpr_event_set(&stop_ev_, reinterpret_cast<void*>(1));
159 run_server_loop_thd_->join();
160 VLOG(2) << "FakeUdpAndTcpServer join server thread complete";
161 CLOSE_SOCKET(accept_socket_);
162 CLOSE_SOCKET(udp_socket_);
163 }
164
165 FakeUdpAndTcpServer::ProcessReadResult
CloseSocketUponReceivingBytesFromPeer(int bytes_received_size,int read_error,int s)166 FakeUdpAndTcpServer::CloseSocketUponReceivingBytesFromPeer(
167 int bytes_received_size, int read_error, int s) {
168 if (bytes_received_size < 0 && !ErrorIsRetryable(read_error)) {
169 LOG(ERROR) << "Failed to receive from peer socket: " << s
170 << ". errno: " << read_error;
171 CHECK(0);
172 }
173 if (bytes_received_size >= 0) {
174 VLOG(2) << "Fake TCP server received " << bytes_received_size
175 << " bytes from peer socket: " << s << ". Close the connection.";
176 return FakeUdpAndTcpServer::ProcessReadResult::kCloseSocket;
177 }
178 return FakeUdpAndTcpServer::ProcessReadResult::kContinueReading;
179 }
180
181 FakeUdpAndTcpServer::ProcessReadResult
CloseSocketUponCloseFromPeer(int bytes_received_size,int read_error,int s)182 FakeUdpAndTcpServer::CloseSocketUponCloseFromPeer(int bytes_received_size,
183 int read_error, int s) {
184 if (bytes_received_size < 0 && !ErrorIsRetryable(read_error)) {
185 LOG(ERROR) << "Failed to receive from peer socket: " << s
186 << ". errno: " << read_error;
187 CHECK(0);
188 }
189 if (bytes_received_size == 0) {
190 // The peer has shut down the connection.
191 VLOG(2) << "Fake TCP server received 0 bytes from peer socket: " << s
192 << ". Close the connection.";
193 return FakeUdpAndTcpServer::ProcessReadResult::kCloseSocket;
194 }
195 return FakeUdpAndTcpServer::ProcessReadResult::kContinueReading;
196 }
197
198 FakeUdpAndTcpServer::ProcessReadResult
SendThreeAllZeroBytes(int bytes_received_size,int read_error,int s)199 FakeUdpAndTcpServer::SendThreeAllZeroBytes(int bytes_received_size,
200 int read_error, int s) {
201 if (bytes_received_size < 0 && !ErrorIsRetryable(read_error)) {
202 LOG(ERROR) << "Failed to receive from peer socket: " << s
203 << ". errno: " << read_error;
204 CHECK(0);
205 }
206 if (bytes_received_size == 0) {
207 // The peer has shut down the connection.
208 VLOG(2) << "Fake TCP server received 0 bytes from peer socket: " << s;
209 return FakeUdpAndTcpServer::ProcessReadResult::kCloseSocket;
210 }
211 char buf[3] = {0, 0, 0};
212 int bytes_sent = send(s, buf, sizeof(buf), 0);
213 VLOG(2) << "Fake TCP server sent " << bytes_sent
214 << " all-zero bytes on peer socket: " << s;
215 return FakeUdpAndTcpServer::ProcessReadResult::kCloseSocket;
216 }
217
// Records `fd` as this peer's socket. The destructor passes the same value to
// CLOSE_SOCKET, so the peer object is responsible for closing it.
FakeUdpAndTcpServer::FakeUdpAndTcpServerPeer::FakeUdpAndTcpServerPeer(int fd)
    : fd_(fd) {}
220
// Closes the peer socket that was handed to the constructor.
FakeUdpAndTcpServer::FakeUdpAndTcpServerPeer::~FakeUdpAndTcpServerPeer() {
  CLOSE_SOCKET(fd_);
}
224
225 void FakeUdpAndTcpServer::FakeUdpAndTcpServerPeer::
MaybeContinueSendingSettings()226 MaybeContinueSendingSettings() {
227 // https://tools.ietf.org/html/rfc7540#section-4.1
228 const std::vector<char> kEmptyHttp2SettingsFrame = {
229 0x00, 0x00, 0x00, // length
230 0x04, // settings type
231 0x00, // flags
232 0x00, 0x00, 0x00, 0x00 // stream identifier
233 };
234 if (total_bytes_sent_ < static_cast<int>(kEmptyHttp2SettingsFrame.size())) {
235 int bytes_to_send = kEmptyHttp2SettingsFrame.size() - total_bytes_sent_;
236 int bytes_sent =
237 send(fd_, kEmptyHttp2SettingsFrame.data() + total_bytes_sent_,
238 bytes_to_send, 0);
239 if (bytes_sent < 0 && !ErrorIsRetryable(ERRNO)) {
240 LOG(ERROR) << "Fake TCP server encountered unexpected error:" << ERRNO
241 << " sending " << bytes_to_send << " bytes on fd:" << fd_;
242 CHECK(0);
243 } else if (bytes_sent > 0) {
244 total_bytes_sent_ += bytes_sent;
245 CHECK(total_bytes_sent_ <= int(kEmptyHttp2SettingsFrame.size()));
246 }
247 }
248 }
249
ReadFromUdpSocket()250 void FakeUdpAndTcpServer::ReadFromUdpSocket() {
251 char buf[100];
252 recvfrom(udp_socket_, buf, sizeof(buf), 0, nullptr, nullptr);
253 }
254
RunServerLoop()255 void FakeUdpAndTcpServer::RunServerLoop() {
256 std::set<std::unique_ptr<FakeUdpAndTcpServerPeer>> peers;
257 while (!gpr_event_get(&stop_ev_)) {
258 // handle TCP connections
259 int p = accept(accept_socket_, nullptr, nullptr);
260 if (p != BAD_SOCKET_RETURN_VAL) {
261 VLOG(2) << "accepted peer socket: " << p;
262 #ifdef GPR_WINDOWS
263 grpc_error_handle set_non_block_error;
264 set_non_block_error = grpc_tcp_set_non_block(p);
265 if (!set_non_block_error.ok()) {
266 LOG(ERROR) << "Failed to configure non-blocking socket: "
267 << StatusToString(set_non_block_error);
268 CHECK(0);
269 }
270 #else
271 if (fcntl(p, F_SETFL, O_NONBLOCK) != 0) {
272 LOG(ERROR) << "Failed to configure non-blocking socket, errno: "
273 << ERRNO;
274 CHECK(0);
275 }
276 #endif
277 peers.insert(std::make_unique<FakeUdpAndTcpServerPeer>(p));
278 }
279 auto it = peers.begin();
280 while (it != peers.end()) {
281 FakeUdpAndTcpServerPeer* peer = (*it).get();
282 if (accept_mode_ == AcceptMode::kEagerlySendSettings) {
283 peer->MaybeContinueSendingSettings();
284 }
285 char buf[100];
286 int bytes_received_size = recv(peer->fd(), buf, 100, 0);
287 FakeUdpAndTcpServer::ProcessReadResult r =
288 process_read_cb_(bytes_received_size, ERRNO, peer->fd());
289 if (r == FakeUdpAndTcpServer::ProcessReadResult::kCloseSocket) {
290 it = peers.erase(it);
291 } else {
292 CHECK(r == FakeUdpAndTcpServer::ProcessReadResult::kContinueReading);
293 it++;
294 }
295 }
296 // read from the UDP socket
297 ReadFromUdpSocket();
298 gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
299 gpr_time_from_millis(10, GPR_TIMESPAN)));
300 }
301 }
302
303 } // namespace testing
304 } // namespace grpc_core
305