• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 // Copyright 2018 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #include "test/core/test_util/fake_udp_and_tcp_server.h"
18 
19 #include <errno.h>
20 #include <grpc/support/port_platform.h>
21 #include <grpc/support/time.h>
22 #include <string.h>
23 
24 #include <set>
25 #include <utility>
26 #include <vector>
27 
28 #include "absl/log/check.h"
29 #include "absl/log/log.h"
30 #include "absl/status/statusor.h"
31 #include "absl/strings/str_cat.h"
32 #include "src/core/lib/address_utils/sockaddr_utils.h"
33 #include "src/core/lib/iomgr/resolved_address.h"
34 #include "src/core/lib/iomgr/sockaddr.h"
35 #include "test/core/test_util/port.h"
36 
37 // IWYU pragma: no_include <arpa/inet.h>
38 
39 #ifdef GPR_WINDOWS
40 #include "src/core/lib/iomgr/sockaddr_windows.h"
41 #include "src/core/lib/iomgr/socket_windows.h"
42 #include "src/core/lib/iomgr/tcp_windows.h"
43 
44 #define BAD_SOCKET_RETURN_VAL INVALID_SOCKET
45 #define CLOSE_SOCKET closesocket
46 #define ERRNO WSAGetLastError()
47 #else
48 #include <fcntl.h>
49 #include <unistd.h>
50 
51 #define BAD_SOCKET_RETURN_VAL (-1)
52 #define CLOSE_SOCKET close
53 #define ERRNO errno
54 #endif
55 
56 namespace grpc_core {
57 namespace testing {
58 
59 namespace {
60 
ErrorIsRetryable(int error)61 bool ErrorIsRetryable(int error) {
62 #ifdef GPR_WINDOWS
63   return error == WSAEWOULDBLOCK || error == WSAEINPROGRESS;
64 #else
65   return error == EWOULDBLOCK || error == EAGAIN;
66 #endif
67 }
68 
69 }  // namespace
70 
FakeUdpAndTcpServer(AcceptMode accept_mode,std::function<FakeUdpAndTcpServer::ProcessReadResult (int,int,int)> process_read_cb)71 FakeUdpAndTcpServer::FakeUdpAndTcpServer(
72     AcceptMode accept_mode,
73     std::function<FakeUdpAndTcpServer::ProcessReadResult(int, int, int)>
74         process_read_cb)
75     : accept_mode_(accept_mode), process_read_cb_(std::move(process_read_cb)) {
76   port_ = grpc_pick_unused_port_or_die();
77   udp_socket_ = socket(AF_INET6, SOCK_DGRAM, 0);
78   if (udp_socket_ == BAD_SOCKET_RETURN_VAL) {
79     LOG(ERROR) << "Failed to create UDP ipv6 socket: " << ERRNO;
80     CHECK(0);
81   }
82   accept_socket_ = socket(AF_INET6, SOCK_STREAM, 0);
83   address_ = absl::StrCat("[::1]:", port_);
84   if (accept_socket_ == BAD_SOCKET_RETURN_VAL) {
85     LOG(ERROR) << "Failed to create TCP IPv6 socket: " << ERRNO;
86     CHECK(0);
87   }
88 #ifdef GPR_WINDOWS
89   char val = 1;
90   if (setsockopt(accept_socket_, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)) ==
91       SOCKET_ERROR) {
92     LOG(ERROR) << "Failed to set SO_REUSEADDR on TCP ipv6 socket to [::1]:"
93                << port_ << ", errno: " << ERRNO;
94     CHECK(0);
95   }
96   grpc_error_handle set_non_block_error;
97   set_non_block_error = grpc_tcp_set_non_block(udp_socket_);
98   if (!set_non_block_error.ok()) {
99     LOG(ERROR) << "Failed to configure non-blocking socket: "
100                << StatusToString(set_non_block_error);
101     CHECK(0);
102   }
103   set_non_block_error = grpc_tcp_set_non_block(accept_socket_);
104   if (!set_non_block_error.ok()) {
105     LOG(ERROR) << "Failed to configure non-blocking socket: "
106                << StatusToString(set_non_block_error);
107     CHECK(0);
108   }
109 #else
110   int val = 1;
111   if (setsockopt(accept_socket_, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)) !=
112       0) {
113     LOG(ERROR) << "Failed to set SO_REUSEADDR on socket [::1]:" << port_;
114     CHECK(0);
115   }
116   if (fcntl(udp_socket_, F_SETFL, O_NONBLOCK) != 0) {
117     LOG(ERROR) << "Failed to set O_NONBLOCK on socket: " << ERRNO;
118     CHECK(0);
119   }
120   if (fcntl(accept_socket_, F_SETFL, O_NONBLOCK) != 0) {
121     LOG(ERROR) << "Failed to set O_NONBLOCK on socket: " << ERRNO;
122     CHECK(0);
123   }
124 #endif
125   sockaddr_in6 addr;
126   memset(&addr, 0, sizeof(addr));
127   addr.sin6_family = AF_INET6;
128   addr.sin6_port = htons(port_);
129   (reinterpret_cast<char*>(&addr.sin6_addr))[15] = 1;
130   grpc_resolved_address resolved_addr;
131   memcpy(resolved_addr.addr, &addr, sizeof(addr));
132   resolved_addr.len = sizeof(addr);
133   std::string addr_str = grpc_sockaddr_to_string(&resolved_addr, false).value();
134   LOG(INFO) << "Fake UDP and TCP server listening on " << addr_str;
135   if (bind(udp_socket_, reinterpret_cast<const sockaddr*>(&addr),
136            sizeof(addr)) != 0) {
137     LOG(ERROR) << "Failed to bind UDP socket to [::1]:" << port_;
138     CHECK(0);
139   }
140   if (bind(accept_socket_, reinterpret_cast<const sockaddr*>(&addr),
141            sizeof(addr)) != 0) {
142     LOG(ERROR) << "Failed to bind TCP socket to [::1]:" << port_ << " : "
143                << ERRNO;
144     CHECK(0);
145   }
146   if (listen(accept_socket_, 100)) {
147     LOG(ERROR) << "Failed to listen on socket bound to [::1]:" << port_ << " : "
148                << ERRNO;
149     CHECK(0);
150   }
151   gpr_event_init(&stop_ev_);
152   run_server_loop_thd_ = std::make_unique<std::thread>(
153       std::bind(&FakeUdpAndTcpServer::RunServerLoop, this));
154 }
155 
~FakeUdpAndTcpServer()156 FakeUdpAndTcpServer::~FakeUdpAndTcpServer() {
157   VLOG(2) << "FakeUdpAndTcpServer stop and join server thread";
158   gpr_event_set(&stop_ev_, reinterpret_cast<void*>(1));
159   run_server_loop_thd_->join();
160   VLOG(2) << "FakeUdpAndTcpServer join server thread complete";
161   CLOSE_SOCKET(accept_socket_);
162   CLOSE_SOCKET(udp_socket_);
163 }
164 
165 FakeUdpAndTcpServer::ProcessReadResult
CloseSocketUponReceivingBytesFromPeer(int bytes_received_size,int read_error,int s)166 FakeUdpAndTcpServer::CloseSocketUponReceivingBytesFromPeer(
167     int bytes_received_size, int read_error, int s) {
168   if (bytes_received_size < 0 && !ErrorIsRetryable(read_error)) {
169     LOG(ERROR) << "Failed to receive from peer socket: " << s
170                << ". errno: " << read_error;
171     CHECK(0);
172   }
173   if (bytes_received_size >= 0) {
174     VLOG(2) << "Fake TCP server received " << bytes_received_size
175             << " bytes from peer socket: " << s << ". Close the connection.";
176     return FakeUdpAndTcpServer::ProcessReadResult::kCloseSocket;
177   }
178   return FakeUdpAndTcpServer::ProcessReadResult::kContinueReading;
179 }
180 
181 FakeUdpAndTcpServer::ProcessReadResult
CloseSocketUponCloseFromPeer(int bytes_received_size,int read_error,int s)182 FakeUdpAndTcpServer::CloseSocketUponCloseFromPeer(int bytes_received_size,
183                                                   int read_error, int s) {
184   if (bytes_received_size < 0 && !ErrorIsRetryable(read_error)) {
185     LOG(ERROR) << "Failed to receive from peer socket: " << s
186                << ". errno: " << read_error;
187     CHECK(0);
188   }
189   if (bytes_received_size == 0) {
190     // The peer has shut down the connection.
191     VLOG(2) << "Fake TCP server received 0 bytes from peer socket: " << s
192             << ". Close the connection.";
193     return FakeUdpAndTcpServer::ProcessReadResult::kCloseSocket;
194   }
195   return FakeUdpAndTcpServer::ProcessReadResult::kContinueReading;
196 }
197 
198 FakeUdpAndTcpServer::ProcessReadResult
SendThreeAllZeroBytes(int bytes_received_size,int read_error,int s)199 FakeUdpAndTcpServer::SendThreeAllZeroBytes(int bytes_received_size,
200                                            int read_error, int s) {
201   if (bytes_received_size < 0 && !ErrorIsRetryable(read_error)) {
202     LOG(ERROR) << "Failed to receive from peer socket: " << s
203                << ". errno: " << read_error;
204     CHECK(0);
205   }
206   if (bytes_received_size == 0) {
207     // The peer has shut down the connection.
208     VLOG(2) << "Fake TCP server received 0 bytes from peer socket: " << s;
209     return FakeUdpAndTcpServer::ProcessReadResult::kCloseSocket;
210   }
211   char buf[3] = {0, 0, 0};
212   int bytes_sent = send(s, buf, sizeof(buf), 0);
213   VLOG(2) << "Fake TCP server sent " << bytes_sent
214           << " all-zero bytes on peer socket: " << s;
215   return FakeUdpAndTcpServer::ProcessReadResult::kCloseSocket;
216 }
217 
FakeUdpAndTcpServerPeer(int fd)218 FakeUdpAndTcpServer::FakeUdpAndTcpServerPeer::FakeUdpAndTcpServerPeer(int fd)
219     : fd_(fd) {}
220 
~FakeUdpAndTcpServerPeer()221 FakeUdpAndTcpServer::FakeUdpAndTcpServerPeer::~FakeUdpAndTcpServerPeer() {
222   CLOSE_SOCKET(fd_);
223 }
224 
225 void FakeUdpAndTcpServer::FakeUdpAndTcpServerPeer::
MaybeContinueSendingSettings()226     MaybeContinueSendingSettings() {
227   // https://tools.ietf.org/html/rfc7540#section-4.1
228   const std::vector<char> kEmptyHttp2SettingsFrame = {
229       0x00, 0x00, 0x00,       // length
230       0x04,                   // settings type
231       0x00,                   // flags
232       0x00, 0x00, 0x00, 0x00  // stream identifier
233   };
234   if (total_bytes_sent_ < static_cast<int>(kEmptyHttp2SettingsFrame.size())) {
235     int bytes_to_send = kEmptyHttp2SettingsFrame.size() - total_bytes_sent_;
236     int bytes_sent =
237         send(fd_, kEmptyHttp2SettingsFrame.data() + total_bytes_sent_,
238              bytes_to_send, 0);
239     if (bytes_sent < 0 && !ErrorIsRetryable(ERRNO)) {
240       LOG(ERROR) << "Fake TCP server encountered unexpected error:" << ERRNO
241                  << " sending " << bytes_to_send << " bytes on fd:" << fd_;
242       CHECK(0);
243     } else if (bytes_sent > 0) {
244       total_bytes_sent_ += bytes_sent;
245       CHECK(total_bytes_sent_ <= int(kEmptyHttp2SettingsFrame.size()));
246     }
247   }
248 }
249 
ReadFromUdpSocket()250 void FakeUdpAndTcpServer::ReadFromUdpSocket() {
251   char buf[100];
252   recvfrom(udp_socket_, buf, sizeof(buf), 0, nullptr, nullptr);
253 }
254 
RunServerLoop()255 void FakeUdpAndTcpServer::RunServerLoop() {
256   std::set<std::unique_ptr<FakeUdpAndTcpServerPeer>> peers;
257   while (!gpr_event_get(&stop_ev_)) {
258     // handle TCP connections
259     int p = accept(accept_socket_, nullptr, nullptr);
260     if (p != BAD_SOCKET_RETURN_VAL) {
261       VLOG(2) << "accepted peer socket: " << p;
262 #ifdef GPR_WINDOWS
263       grpc_error_handle set_non_block_error;
264       set_non_block_error = grpc_tcp_set_non_block(p);
265       if (!set_non_block_error.ok()) {
266         LOG(ERROR) << "Failed to configure non-blocking socket: "
267                    << StatusToString(set_non_block_error);
268         CHECK(0);
269       }
270 #else
271       if (fcntl(p, F_SETFL, O_NONBLOCK) != 0) {
272         LOG(ERROR) << "Failed to configure non-blocking socket, errno: "
273                    << ERRNO;
274         CHECK(0);
275       }
276 #endif
277       peers.insert(std::make_unique<FakeUdpAndTcpServerPeer>(p));
278     }
279     auto it = peers.begin();
280     while (it != peers.end()) {
281       FakeUdpAndTcpServerPeer* peer = (*it).get();
282       if (accept_mode_ == AcceptMode::kEagerlySendSettings) {
283         peer->MaybeContinueSendingSettings();
284       }
285       char buf[100];
286       int bytes_received_size = recv(peer->fd(), buf, 100, 0);
287       FakeUdpAndTcpServer::ProcessReadResult r =
288           process_read_cb_(bytes_received_size, ERRNO, peer->fd());
289       if (r == FakeUdpAndTcpServer::ProcessReadResult::kCloseSocket) {
290         it = peers.erase(it);
291       } else {
292         CHECK(r == FakeUdpAndTcpServer::ProcessReadResult::kContinueReading);
293         it++;
294       }
295     }
296     // read from the UDP socket
297     ReadFromUdpSocket();
298     gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
299                                  gpr_time_from_millis(10, GPR_TIMESPAN)));
300   }
301 }
302 
303 }  // namespace testing
304 }  // namespace grpc_core
305