1 //
2 // Copyright 2018 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #include "test/core/util/fake_udp_and_tcp_server.h"
18 
19 #include <errno.h>
20 #include <string.h>
21 
22 #include <set>
23 #include <utility>
24 #include <vector>
25 
26 #include "absl/status/statusor.h"
27 #include "absl/strings/str_cat.h"
28 
29 #include <grpc/support/log.h>
30 #include <grpc/support/port_platform.h>
31 #include <grpc/support/time.h>
32 
33 #include "src/core/lib/address_utils/sockaddr_utils.h"
34 #include "src/core/lib/iomgr/resolved_address.h"
35 #include "src/core/lib/iomgr/sockaddr.h"
36 #include "test/core/util/port.h"
37 
38 // IWYU pragma: no_include <arpa/inet.h>
39 
40 #ifdef GPR_WINDOWS
41 #include "src/core/lib/iomgr/sockaddr_windows.h"
42 #include "src/core/lib/iomgr/socket_windows.h"
43 #include "src/core/lib/iomgr/tcp_windows.h"
44 
45 #define BAD_SOCKET_RETURN_VAL INVALID_SOCKET
46 #define CLOSE_SOCKET closesocket
47 #define ERRNO WSAGetLastError()
48 #else
49 #include <fcntl.h>
50 #include <unistd.h>
51 
52 #define BAD_SOCKET_RETURN_VAL (-1)
53 #define CLOSE_SOCKET close
54 #define ERRNO errno
55 #endif
56 
57 namespace grpc_core {
58 namespace testing {
59 
60 namespace {
61 
// Returns true when `error` is one of the platform's "operation would block /
// still in progress" codes that this file treats as "try again later" rather
// than as a failure.
bool ErrorIsRetryable(int error) {
#ifdef GPR_WINDOWS
  if (error == WSAEWOULDBLOCK) {
    return true;
  }
  return error == WSAEINPROGRESS;
#else
  // EWOULDBLOCK and EAGAIN may share a value, so compare them one at a time
  // instead of using a switch.
  if (error == EWOULDBLOCK) {
    return true;
  }
  return error == EAGAIN;
#endif
}
69 
70 }  // namespace
71 
FakeUdpAndTcpServer(AcceptMode accept_mode,std::function<FakeUdpAndTcpServer::ProcessReadResult (int,int,int)> process_read_cb)72 FakeUdpAndTcpServer::FakeUdpAndTcpServer(
73     AcceptMode accept_mode,
74     std::function<FakeUdpAndTcpServer::ProcessReadResult(int, int, int)>
75         process_read_cb)
76     : accept_mode_(accept_mode), process_read_cb_(std::move(process_read_cb)) {
77   port_ = grpc_pick_unused_port_or_die();
78   udp_socket_ = socket(AF_INET6, SOCK_DGRAM, 0);
79   if (udp_socket_ == BAD_SOCKET_RETURN_VAL) {
80     gpr_log(GPR_ERROR, "Failed to create UDP ipv6 socket: %d", ERRNO);
81     GPR_ASSERT(0);
82   }
83   accept_socket_ = socket(AF_INET6, SOCK_STREAM, 0);
84   address_ = absl::StrCat("[::1]:", port_);
85   if (accept_socket_ == BAD_SOCKET_RETURN_VAL) {
86     gpr_log(GPR_ERROR, "Failed to create TCP IPv6 socket: %d", ERRNO);
87     GPR_ASSERT(0);
88   }
89 #ifdef GPR_WINDOWS
90   char val = 1;
91   if (setsockopt(accept_socket_, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)) ==
92       SOCKET_ERROR) {
93     gpr_log(GPR_ERROR,
94             "Failed to set SO_REUSEADDR on TCP ipv6 socket to [::1]:%d, "
95             "errno: %d",
96             port_, ERRNO);
97     GPR_ASSERT(0);
98   }
99   grpc_error_handle set_non_block_error;
100   set_non_block_error = grpc_tcp_set_non_block(udp_socket_);
101   if (!set_non_block_error.ok()) {
102     gpr_log(GPR_ERROR, "Failed to configure non-blocking socket: %s",
103             StatusToString(set_non_block_error).c_str());
104     GPR_ASSERT(0);
105   }
106   set_non_block_error = grpc_tcp_set_non_block(accept_socket_);
107   if (!set_non_block_error.ok()) {
108     gpr_log(GPR_ERROR, "Failed to configure non-blocking socket: %s",
109             StatusToString(set_non_block_error).c_str());
110     GPR_ASSERT(0);
111   }
112 #else
113   int val = 1;
114   if (setsockopt(accept_socket_, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)) !=
115       0) {
116     gpr_log(GPR_ERROR, "Failed to set SO_REUSEADDR on socket [::1]:%d", port_);
117     GPR_ASSERT(0);
118   }
119   if (fcntl(udp_socket_, F_SETFL, O_NONBLOCK) != 0) {
120     gpr_log(GPR_ERROR, "Failed to set O_NONBLOCK on socket: %d", ERRNO);
121     GPR_ASSERT(0);
122   }
123   if (fcntl(accept_socket_, F_SETFL, O_NONBLOCK) != 0) {
124     gpr_log(GPR_ERROR, "Failed to set O_NONBLOCK on socket: %d", ERRNO);
125     GPR_ASSERT(0);
126   }
127 #endif
128   sockaddr_in6 addr;
129   memset(&addr, 0, sizeof(addr));
130   addr.sin6_family = AF_INET6;
131   addr.sin6_port = htons(port_);
132   (reinterpret_cast<char*>(&addr.sin6_addr))[15] = 1;
133   grpc_resolved_address resolved_addr;
134   memcpy(resolved_addr.addr, &addr, sizeof(addr));
135   resolved_addr.len = sizeof(addr);
136   std::string addr_str = grpc_sockaddr_to_string(&resolved_addr, false).value();
137   gpr_log(GPR_INFO, "Fake UDP and TCP server listening on %s",
138           addr_str.c_str());
139   if (bind(udp_socket_, reinterpret_cast<const sockaddr*>(&addr),
140            sizeof(addr)) != 0) {
141     gpr_log(GPR_ERROR, "Failed to bind UDP socket to [::1]:%d", port_);
142     GPR_ASSERT(0);
143   }
144   if (bind(accept_socket_, reinterpret_cast<const sockaddr*>(&addr),
145            sizeof(addr)) != 0) {
146     gpr_log(GPR_ERROR, "Failed to bind TCP socket to [::1]:%d : %d", port_,
147             ERRNO);
148     GPR_ASSERT(0);
149   }
150   if (listen(accept_socket_, 100)) {
151     gpr_log(GPR_ERROR, "Failed to listen on socket bound to [::1]:%d : %d",
152             port_, ERRNO);
153     GPR_ASSERT(0);
154   }
155   gpr_event_init(&stop_ev_);
156   run_server_loop_thd_ = std::make_unique<std::thread>(
157       std::bind(&FakeUdpAndTcpServer::RunServerLoop, this));
158 }
159 
~FakeUdpAndTcpServer()160 FakeUdpAndTcpServer::~FakeUdpAndTcpServer() {
161   gpr_log(GPR_DEBUG,
162           "FakeUdpAndTcpServer stop and "
163           "join server thread");
164   gpr_event_set(&stop_ev_, reinterpret_cast<void*>(1));
165   run_server_loop_thd_->join();
166   gpr_log(GPR_DEBUG,
167           "FakeUdpAndTcpServer join server "
168           "thread complete");
169   CLOSE_SOCKET(accept_socket_);
170   CLOSE_SOCKET(udp_socket_);
171 }
172 
173 FakeUdpAndTcpServer::ProcessReadResult
CloseSocketUponReceivingBytesFromPeer(int bytes_received_size,int read_error,int s)174 FakeUdpAndTcpServer::CloseSocketUponReceivingBytesFromPeer(
175     int bytes_received_size, int read_error, int s) {
176   if (bytes_received_size < 0 && !ErrorIsRetryable(read_error)) {
177     gpr_log(GPR_ERROR, "Failed to receive from peer socket: %d. errno: %d", s,
178             read_error);
179     GPR_ASSERT(0);
180   }
181   if (bytes_received_size >= 0) {
182     gpr_log(GPR_DEBUG,
183             "Fake TCP server received %d bytes from peer socket: %d. Close "
184             "the "
185             "connection.",
186             bytes_received_size, s);
187     return FakeUdpAndTcpServer::ProcessReadResult::kCloseSocket;
188   }
189   return FakeUdpAndTcpServer::ProcessReadResult::kContinueReading;
190 }
191 
192 FakeUdpAndTcpServer::ProcessReadResult
CloseSocketUponCloseFromPeer(int bytes_received_size,int read_error,int s)193 FakeUdpAndTcpServer::CloseSocketUponCloseFromPeer(int bytes_received_size,
194                                                   int read_error, int s) {
195   if (bytes_received_size < 0 && !ErrorIsRetryable(read_error)) {
196     gpr_log(GPR_ERROR, "Failed to receive from peer socket: %d. errno: %d", s,
197             read_error);
198     GPR_ASSERT(0);
199   }
200   if (bytes_received_size == 0) {
201     // The peer has shut down the connection.
202     gpr_log(GPR_DEBUG,
203             "Fake TCP server received 0 bytes from peer socket: %d. Close "
204             "the "
205             "connection.",
206             s);
207     return FakeUdpAndTcpServer::ProcessReadResult::kCloseSocket;
208   }
209   return FakeUdpAndTcpServer::ProcessReadResult::kContinueReading;
210 }
211 
212 FakeUdpAndTcpServer::ProcessReadResult
SendThreeAllZeroBytes(int bytes_received_size,int read_error,int s)213 FakeUdpAndTcpServer::SendThreeAllZeroBytes(int bytes_received_size,
214                                            int read_error, int s) {
215   if (bytes_received_size < 0 && !ErrorIsRetryable(read_error)) {
216     gpr_log(GPR_ERROR, "Failed to receive from peer socket: %d. errno: %d", s,
217             read_error);
218     GPR_ASSERT(0);
219   }
220   if (bytes_received_size == 0) {
221     // The peer has shut down the connection.
222     gpr_log(GPR_DEBUG, "Fake TCP server received 0 bytes from peer socket: %d.",
223             s);
224     return FakeUdpAndTcpServer::ProcessReadResult::kCloseSocket;
225   }
226   char buf[3] = {0, 0, 0};
227   int bytes_sent = send(s, buf, sizeof(buf), 0);
228   gpr_log(GPR_DEBUG,
229           "Fake TCP server sent %d all-zero bytes on peer socket: %d.",
230           bytes_sent, s);
231   return FakeUdpAndTcpServer::ProcessReadResult::kCloseSocket;
232 }
233 
FakeUdpAndTcpServerPeer(int fd)234 FakeUdpAndTcpServer::FakeUdpAndTcpServerPeer::FakeUdpAndTcpServerPeer(int fd)
235     : fd_(fd) {}
236 
~FakeUdpAndTcpServerPeer()237 FakeUdpAndTcpServer::FakeUdpAndTcpServerPeer::~FakeUdpAndTcpServerPeer() {
238   CLOSE_SOCKET(fd_);
239 }
240 
241 void FakeUdpAndTcpServer::FakeUdpAndTcpServerPeer::
MaybeContinueSendingSettings()242     MaybeContinueSendingSettings() {
243   // https://tools.ietf.org/html/rfc7540#section-4.1
244   const std::vector<char> kEmptyHttp2SettingsFrame = {
245       0x00, 0x00, 0x00,       // length
246       0x04,                   // settings type
247       0x00,                   // flags
248       0x00, 0x00, 0x00, 0x00  // stream identifier
249   };
250   if (total_bytes_sent_ < static_cast<int>(kEmptyHttp2SettingsFrame.size())) {
251     int bytes_to_send = kEmptyHttp2SettingsFrame.size() - total_bytes_sent_;
252     int bytes_sent =
253         send(fd_, kEmptyHttp2SettingsFrame.data() + total_bytes_sent_,
254              bytes_to_send, 0);
255     if (bytes_sent < 0 && !ErrorIsRetryable(ERRNO)) {
256       gpr_log(GPR_ERROR,
257               "Fake TCP server encountered unexpected error:%d "
258               "sending %d bytes on fd:%d",
259               ERRNO, bytes_to_send, fd_);
260       GPR_ASSERT(0);
261     } else if (bytes_sent > 0) {
262       total_bytes_sent_ += bytes_sent;
263       GPR_ASSERT(total_bytes_sent_ <= int(kEmptyHttp2SettingsFrame.size()));
264     }
265   }
266 }
267 
ReadFromUdpSocket()268 void FakeUdpAndTcpServer::ReadFromUdpSocket() {
269   char buf[100];
270   recvfrom(udp_socket_, buf, sizeof(buf), 0, nullptr, nullptr);
271 }
272 
RunServerLoop()273 void FakeUdpAndTcpServer::RunServerLoop() {
274   std::set<std::unique_ptr<FakeUdpAndTcpServerPeer>> peers;
275   while (!gpr_event_get(&stop_ev_)) {
276     // handle TCP connections
277     int p = accept(accept_socket_, nullptr, nullptr);
278     if (p != BAD_SOCKET_RETURN_VAL) {
279       gpr_log(GPR_DEBUG, "accepted peer socket: %d", p);
280 #ifdef GPR_WINDOWS
281       grpc_error_handle set_non_block_error;
282       set_non_block_error = grpc_tcp_set_non_block(p);
283       if (!set_non_block_error.ok()) {
284         gpr_log(GPR_ERROR, "Failed to configure non-blocking socket: %s",
285                 StatusToString(set_non_block_error).c_str());
286         GPR_ASSERT(0);
287       }
288 #else
289       if (fcntl(p, F_SETFL, O_NONBLOCK) != 0) {
290         gpr_log(GPR_ERROR, "Failed to configure non-blocking socket, errno: %d",
291                 ERRNO);
292         GPR_ASSERT(0);
293       }
294 #endif
295       peers.insert(std::make_unique<FakeUdpAndTcpServerPeer>(p));
296     }
297     auto it = peers.begin();
298     while (it != peers.end()) {
299       FakeUdpAndTcpServerPeer* peer = (*it).get();
300       if (accept_mode_ == AcceptMode::kEagerlySendSettings) {
301         peer->MaybeContinueSendingSettings();
302       }
303       char buf[100];
304       int bytes_received_size = recv(peer->fd(), buf, 100, 0);
305       FakeUdpAndTcpServer::ProcessReadResult r =
306           process_read_cb_(bytes_received_size, ERRNO, peer->fd());
307       if (r == FakeUdpAndTcpServer::ProcessReadResult::kCloseSocket) {
308         it = peers.erase(it);
309       } else {
310         GPR_ASSERT(r ==
311                    FakeUdpAndTcpServer::ProcessReadResult::kContinueReading);
312         it++;
313       }
314     }
315     // read from the UDP socket
316     ReadFromUdpSocket();
317     gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
318                                  gpr_time_from_millis(10, GPR_TIMESPAN)));
319   }
320 }
321 
322 }  // namespace testing
323 }  // namespace grpc_core
324