1 // Copyright 2022 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include "test/core/event_engine/test_suite/posix/oracle_event_engine_posix.h"
16
17 #include <poll.h>
18 #include <sys/socket.h>
19 #include <unistd.h>
20
#include <algorithm>
#include <cerrno>
#include <cstring>
#include <limits>
#include <memory>
25
26 #include "absl/status/status.h"
27 #include "absl/strings/str_cat.h"
28 #include "absl/strings/str_format.h"
29 #include "absl/time/clock.h"
30 #include "absl/time/time.h"
31
32 #include <grpc/event_engine/event_engine.h>
33 #include <grpc/support/alloc.h>
34 #include <grpc/support/log.h>
35
36 #include "src/core/lib/address_utils/sockaddr_utils.h"
37 #include "src/core/lib/gprpp/crash.h"
38 #include "src/core/lib/gprpp/strerror.h"
39 #include "src/core/lib/iomgr/resolved_address.h"
40
41 namespace grpc_event_engine {
42 namespace experimental {
43
44 namespace {
45
// Message written to the listener's pipe to tell the accept thread to stop.
// Declared as an immutable array so it cannot be re-pointed at runtime.
constexpr char kStopMessage[] = "STOP";
47
CreateGRPCResolvedAddress(const EventEngine::ResolvedAddress & ra)48 grpc_resolved_address CreateGRPCResolvedAddress(
49 const EventEngine::ResolvedAddress& ra) {
50 grpc_resolved_address grpc_addr;
51 memcpy(grpc_addr.addr, ra.address(), ra.size());
52 grpc_addr.len = ra.size();
53 return grpc_addr;
54 }
55
56 // Blocks until poll(2) indicates that one of the fds has pending I/O
57 // the deadline is reached whichever comes first. Returns an OK
58 // status a valid I/O event is available for at least one of the fds, a Status
59 // with canonical code DEADLINE_EXCEEDED if the deadline expired and a non-OK
60 // Status if any other error occurred.
PollFds(struct pollfd * pfds,int nfds,absl::Duration timeout)61 absl::Status PollFds(struct pollfd* pfds, int nfds, absl::Duration timeout) {
62 int rv;
63 while (true) {
64 if (timeout != absl::InfiniteDuration()) {
65 rv = poll(pfds, nfds,
66 static_cast<int>(absl::ToInt64Milliseconds(timeout)));
67 } else {
68 rv = poll(pfds, nfds, /* timeout = */ -1);
69 }
70 const int saved_errno = errno;
71 errno = saved_errno;
72 if (rv >= 0 || errno != EINTR) {
73 break;
74 }
75 }
76 if (rv < 0) {
77 return absl::UnknownError(grpc_core::StrError(errno));
78 }
79 if (rv == 0) {
80 return absl::CancelledError("Deadline exceeded");
81 }
82 return absl::OkStatus();
83 }
84
BlockUntilReadable(int fd)85 absl::Status BlockUntilReadable(int fd) {
86 struct pollfd pfd;
87 pfd.fd = fd;
88 pfd.events = POLLIN;
89 pfd.revents = 0;
90 return PollFds(&pfd, 1, absl::InfiniteDuration());
91 }
92
BlockUntilWritableWithTimeout(int fd,absl::Duration timeout)93 absl::Status BlockUntilWritableWithTimeout(int fd, absl::Duration timeout) {
94 struct pollfd pfd;
95 pfd.fd = fd;
96 pfd.events = POLLOUT;
97 pfd.revents = 0;
98 return PollFds(&pfd, 1, timeout);
99 }
100
BlockUntilWritable(int fd)101 absl::Status BlockUntilWritable(int fd) {
102 return BlockUntilWritableWithTimeout(fd, absl::InfiniteDuration());
103 }
104
// Tries to read up to num_expected_bytes from the socket. A non-positive
// num_expected_bytes selects a default of 1024 bytes. Reading stops as soon as
// the requested number of bytes has been read, read(2) returns 0, or read(2)
// fails with anything other than EINTR; so the function returns early when the
// requested data is not yet available.
//
// Returns the bytes that were read (possibly an empty string). saved_errno is
// set to the value of errno after the last read(2) call; errno is reset to 0
// before each call.
std::string TryReadBytes(int sockfd, int& saved_errno, int num_expected_bytes) {
  static constexpr int kDefaultNumExpectedBytes = 1024;
  if (num_expected_bytes <= 0) {
    num_expected_bytes = kDefaultNumExpectedBytes;
  }
  std::string read_data(num_expected_bytes, '\0');
  int bytes_read = 0;
  int ret = 0;
  do {
    errno = 0;
    // Write through operator[] rather than a const_cast of c_str(): modifying
    // the array returned by c_str() is undefined behavior.
    ret = read(sockfd, &read_data[bytes_read], num_expected_bytes - bytes_read);
    if (ret > 0) {
      bytes_read += ret;
    }
  } while (bytes_read < num_expected_bytes &&
           ((ret > 0) || (ret < 0 && errno == EINTR)));
  saved_errno = errno;
  // Drop the unused tail of the buffer.
  read_data.resize(bytes_read);
  return read_data;
}
127
128 // Blocks calling thread until the specified number of bytes have been
129 // read from the provided socket or it encounters an unrecoverable error. It
130 // puts the read bytes into a string and returns the string. If it encounters an
131 // error, it returns an empty string and updates saved_errno with the
132 // appropriate errno.
ReadBytes(int sockfd,int & saved_errno,int num_expected_bytes)133 std::string ReadBytes(int sockfd, int& saved_errno, int num_expected_bytes) {
134 std::string read_data;
135 do {
136 saved_errno = 0;
137 read_data += TryReadBytes(sockfd, saved_errno,
138 num_expected_bytes - read_data.length());
139 if (saved_errno == EAGAIN &&
140 read_data.length() < static_cast<size_t>(num_expected_bytes)) {
141 GPR_ASSERT(BlockUntilReadable(sockfd).ok());
142 } else if (saved_errno != 0 && num_expected_bytes > 0) {
143 read_data.clear();
144 break;
145 }
146 } while (read_data.length() < static_cast<size_t>(num_expected_bytes));
147 return read_data;
148 }
149
// Attempts to write write_bytes to the socket. Writing continues while
// write(2) makes progress or is interrupted by a signal (EINTR), and stops
// once every byte has been written or write(2) reports 0 or any other error.
//
// Returns the number of bytes actually written. saved_errno receives the
// value of errno after the last write(2) call; errno is cleared before each
// call.
int TryWriteBytes(int sockfd, int& saved_errno, std::string write_bytes) {
  const char* const data = write_bytes.c_str();
  const int total_bytes = write_bytes.length();
  int written = 0;
  int rc = 0;
  do {
    errno = 0;
    rc = write(sockfd, data + written, total_bytes - written);
    if (rc > 0) {
      written += rc;
    }
  } while (written < total_bytes && (rc > 0 || (rc < 0 && errno == EINTR)));
  saved_errno = errno;
  return written;
}
167
168 // Blocks calling thread until the specified number of bytes have been
169 // written over the provided socket or it encounters an unrecoverable error. The
170 // bytes to write are specified as a string. If it encounters an error, it
171 // returns an empty string and updates saved_errno with the appropriate errno
172 // and returns a value less than zero.
WriteBytes(int sockfd,int & saved_errno,std::string write_bytes)173 int WriteBytes(int sockfd, int& saved_errno, std::string write_bytes) {
174 int ret = 0;
175 int original_write_length = write_bytes.length();
176 do {
177 saved_errno = 0;
178 ret = TryWriteBytes(sockfd, saved_errno, write_bytes);
179 if (saved_errno == EAGAIN && ret < static_cast<int>(write_bytes.length())) {
180 GPR_ASSERT(ret >= 0);
181 GPR_ASSERT(BlockUntilWritable(sockfd).ok());
182 } else if (saved_errno != 0) {
183 GPR_ASSERT(ret < 0);
184 return ret;
185 }
186 write_bytes = write_bytes.substr(ret, std::string::npos);
187 } while (write_bytes.length() > 0);
188 return original_write_length;
189 }
190 } // namespace
191
PosixOracleEndpoint(int socket_fd)192 PosixOracleEndpoint::PosixOracleEndpoint(int socket_fd)
193 : socket_fd_(socket_fd) {
194 read_ops_ = grpc_core::Thread(
195 "read_ops_thread",
196 [](void* arg) {
197 static_cast<PosixOracleEndpoint*>(arg)->ProcessReadOperations();
198 },
199 this);
200 write_ops_ = grpc_core::Thread(
201 "write_ops_thread",
202 [](void* arg) {
203 static_cast<PosixOracleEndpoint*>(arg)->ProcessWriteOperations();
204 },
205 this);
206 read_ops_.Start();
207 write_ops_.Start();
208 }
209
Shutdown()210 void PosixOracleEndpoint::Shutdown() {
211 grpc_core::MutexLock lock(&mu_);
212 if (std::exchange(is_shutdown_, true)) {
213 return;
214 }
215 read_ops_channel_ = ReadOperation();
216 read_op_signal_->Notify();
217 write_ops_channel_ = WriteOperation();
218 write_op_signal_->Notify();
219 read_ops_.Join();
220 write_ops_.Join();
221 }
222
Create(int socket_fd)223 std::unique_ptr<PosixOracleEndpoint> PosixOracleEndpoint::Create(
224 int socket_fd) {
225 return std::make_unique<PosixOracleEndpoint>(socket_fd);
226 }
227
~PosixOracleEndpoint()228 PosixOracleEndpoint::~PosixOracleEndpoint() {
229 Shutdown();
230 close(socket_fd_);
231 }
232
Read(absl::AnyInvocable<void (absl::Status)> on_read,SliceBuffer * buffer,const ReadArgs * args)233 bool PosixOracleEndpoint::Read(absl::AnyInvocable<void(absl::Status)> on_read,
234 SliceBuffer* buffer, const ReadArgs* args) {
235 grpc_core::MutexLock lock(&mu_);
236 GPR_ASSERT(buffer != nullptr);
237 int read_hint_bytes =
238 args != nullptr ? std::max(1, static_cast<int>(args->read_hint_bytes))
239 : 0;
240 read_ops_channel_ =
241 ReadOperation(read_hint_bytes, buffer, std::move(on_read));
242 read_op_signal_->Notify();
243 return false;
244 }
245
Write(absl::AnyInvocable<void (absl::Status)> on_writable,SliceBuffer * data,const WriteArgs *)246 bool PosixOracleEndpoint::Write(
247 absl::AnyInvocable<void(absl::Status)> on_writable, SliceBuffer* data,
248 const WriteArgs* /*args*/) {
249 grpc_core::MutexLock lock(&mu_);
250 GPR_ASSERT(data != nullptr);
251 write_ops_channel_ = WriteOperation(data, std::move(on_writable));
252 write_op_signal_->Notify();
253 return false;
254 }
255
ProcessReadOperations()256 void PosixOracleEndpoint::ProcessReadOperations() {
257 gpr_log(GPR_INFO, "Starting thread to process read ops ...");
258 while (true) {
259 read_op_signal_->WaitForNotification();
260 read_op_signal_ = std::make_unique<grpc_core::Notification>();
261 auto read_op = std::exchange(read_ops_channel_, ReadOperation());
262 if (!read_op.IsValid()) {
263 read_op(std::string(), absl::CancelledError("Closed"));
264 break;
265 }
266 int saved_errno;
267 std::string read_data =
268 ReadBytes(socket_fd_, saved_errno, read_op.GetNumBytesToRead());
269 read_op(read_data, read_data.empty()
270 ? absl::CancelledError(
271 absl::StrCat("Read failed with error = ",
272 grpc_core::StrError(saved_errno)))
273 : absl::OkStatus());
274 }
275 gpr_log(GPR_INFO, "Shutting down read ops thread ...");
276 }
277
ProcessWriteOperations()278 void PosixOracleEndpoint::ProcessWriteOperations() {
279 gpr_log(GPR_INFO, "Starting thread to process write ops ...");
280 while (true) {
281 write_op_signal_->WaitForNotification();
282 write_op_signal_ = std::make_unique<grpc_core::Notification>();
283 auto write_op = std::exchange(write_ops_channel_, WriteOperation());
284 if (!write_op.IsValid()) {
285 write_op(absl::CancelledError("Closed"));
286 break;
287 }
288 int saved_errno;
289 int ret = WriteBytes(socket_fd_, saved_errno, write_op.GetBytesToWrite());
290 write_op(ret < 0 ? absl::CancelledError(
291 absl::StrCat("Write failed with error = ",
292 grpc_core::StrError(saved_errno)))
293 : absl::OkStatus());
294 }
295 gpr_log(GPR_INFO, "Shutting down write ops thread ...");
296 }
297
PosixOracleListener(EventEngine::Listener::AcceptCallback on_accept,absl::AnyInvocable<void (absl::Status)> on_shutdown,std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory)298 PosixOracleListener::PosixOracleListener(
299 EventEngine::Listener::AcceptCallback on_accept,
300 absl::AnyInvocable<void(absl::Status)> on_shutdown,
301 std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory)
302 : on_accept_(std::move(on_accept)),
303 on_shutdown_(std::move(on_shutdown)),
304 memory_allocator_factory_(std::move(memory_allocator_factory)) {
305 if (pipe(pipefd_) == -1) {
306 grpc_core::Crash(absl::StrFormat("Error creating pipe: %s",
307 grpc_core::StrError(errno).c_str()));
308 }
309 }
310
Start()311 absl::Status PosixOracleListener::Start() {
312 grpc_core::MutexLock lock(&mu_);
313 GPR_ASSERT(!listener_fds_.empty());
314 if (std::exchange(is_started_, true)) {
315 return absl::InternalError("Cannot start listener more than once ...");
316 }
317 serve_ = grpc_core::Thread(
318 "accept_thread",
319 [](void* arg) {
320 static_cast<PosixOracleListener*>(arg)->HandleIncomingConnections();
321 },
322 this);
323 serve_.Start();
324 return absl::OkStatus();
325 }
326
~PosixOracleListener()327 PosixOracleListener::~PosixOracleListener() {
328 grpc_core::MutexLock lock(&mu_);
329 if (!is_started_) {
330 serve_.Join();
331 return;
332 }
333 for (int i = 0; i < static_cast<int>(listener_fds_.size()); i++) {
334 shutdown(listener_fds_[i], SHUT_RDWR);
335 }
336 // Send a STOP message over the pipe.
337 GPR_ASSERT(write(pipefd_[1], kStopMessage, strlen(kStopMessage)) != -1);
338 serve_.Join();
339 on_shutdown_(absl::OkStatus());
340 }
341
HandleIncomingConnections()342 void PosixOracleListener::HandleIncomingConnections() {
343 gpr_log(GPR_INFO, "Starting accept thread ...");
344 GPR_ASSERT(!listener_fds_.empty());
345 int nfds = listener_fds_.size();
346 // Add one extra file descriptor to poll the pipe fd.
347 ++nfds;
348 struct pollfd* pfds =
349 static_cast<struct pollfd*>(gpr_malloc(sizeof(struct pollfd) * nfds));
350 memset(pfds, 0, sizeof(struct pollfd) * nfds);
351 while (true) {
352 for (int i = 0; i < nfds; i++) {
353 pfds[i].fd = i == nfds - 1 ? pipefd_[0] : listener_fds_[i];
354 pfds[i].events = POLLIN;
355 pfds[i].revents = 0;
356 }
357 if (!PollFds(pfds, nfds, absl::InfiniteDuration()).ok()) {
358 break;
359 }
360 int saved_errno = 0;
361 if ((pfds[nfds - 1].revents & POLLIN) &&
362 ReadBytes(pipefd_[0], saved_errno, strlen(kStopMessage)) ==
363 std::string(kStopMessage)) {
364 break;
365 }
366 for (int i = 0; i < nfds - 1; i++) {
367 if (!(pfds[i].revents & POLLIN)) {
368 continue;
369 }
370 // pfds[i].fd has a readable event.
371 int client_sock_fd = accept(pfds[i].fd, nullptr, nullptr);
372 if (client_sock_fd < 0) {
373 gpr_log(GPR_ERROR,
374 "Error accepting new connection: %s. Ignoring connection "
375 "attempt ...",
376 grpc_core::StrError(errno).c_str());
377 continue;
378 }
379 on_accept_(PosixOracleEndpoint::Create(client_sock_fd),
380 memory_allocator_factory_->CreateMemoryAllocator("test"));
381 }
382 }
383 gpr_log(GPR_INFO, "Shutting down accept thread ...");
384 gpr_free(pfds);
385 }
386
Bind(const EventEngine::ResolvedAddress & addr)387 absl::StatusOr<int> PosixOracleListener::Bind(
388 const EventEngine::ResolvedAddress& addr) {
389 grpc_core::MutexLock lock(&mu_);
390 if (is_started_) {
391 return absl::FailedPreconditionError(
392 "Listener is already started, ports can no longer be bound");
393 }
394 int new_socket;
395 int opt = -1;
396 grpc_resolved_address address = CreateGRPCResolvedAddress(addr);
397 const char* scheme = grpc_sockaddr_get_uri_scheme(&address);
398 if (scheme == nullptr || strcmp(scheme, "ipv6") != 0) {
399 return absl::UnimplementedError(
400 "Unsupported bind address type. Only IPV6 addresses are supported "
401 "currently by the PosixOracleListener ...");
402 }
403
404 // Creating a new socket file descriptor.
405 if ((new_socket = socket(AF_INET6, SOCK_STREAM, 0)) <= 0) {
406 return absl::UnknownError(
407 absl::StrCat("Error creating socket: ", grpc_core::StrError(errno)));
408 }
409 // MacOS biulds fail if SO_REUSEADDR and SO_REUSEPORT are set in the same
410 // setsockopt syscall. So they are set separately one after the other.
411 if (setsockopt(new_socket, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt))) {
412 return absl::UnknownError(absl::StrCat("Error setsockopt(SO_REUSEADDR): ",
413 grpc_core::StrError(errno)));
414 }
415 if (setsockopt(new_socket, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt))) {
416 return absl::UnknownError(absl::StrCat("Error setsockopt(SO_REUSEPORT): ",
417 grpc_core::StrError(errno)));
418 }
419
420 // Forcefully bind the new socket.
421 if (bind(new_socket, reinterpret_cast<const struct sockaddr*>(addr.address()),
422 address.len) < 0) {
423 return absl::UnknownError(
424 absl::StrCat("Error bind: ", grpc_core::StrError(errno)));
425 }
426 // Set the new socket to listen for one active connection at a time.
427 if (listen(new_socket, 1) < 0) {
428 return absl::UnknownError(
429 absl::StrCat("Error listen: ", grpc_core::StrError(errno)));
430 }
431 listener_fds_.push_back(new_socket);
432 return 0;
433 }
434
435 // PosixOracleEventEngine implements blocking connect. It blocks the calling
436 // thread until either connect succeeds or fails with timeout.
Connect(OnConnectCallback on_connect,const ResolvedAddress & addr,const EndpointConfig &,MemoryAllocator,EventEngine::Duration timeout)437 EventEngine::ConnectionHandle PosixOracleEventEngine::Connect(
438 OnConnectCallback on_connect, const ResolvedAddress& addr,
439 const EndpointConfig& /*args*/, MemoryAllocator /*memory_allocator*/,
440 EventEngine::Duration timeout) {
441 int client_sock_fd;
442 absl::Time deadline = absl::Now() + absl::FromChrono(timeout);
443 grpc_resolved_address address = CreateGRPCResolvedAddress(addr);
444 const char* scheme = grpc_sockaddr_get_uri_scheme(&address);
445 if (scheme == nullptr || strcmp(scheme, "ipv6") != 0) {
446 on_connect(
447 absl::CancelledError("Unsupported bind address type. Only ipv6 "
448 "addresses are currently supported."));
449 return {};
450 }
451 if ((client_sock_fd = socket(AF_INET6, SOCK_STREAM, 0)) < 0) {
452 on_connect(absl::CancelledError(
453 absl::StrCat("Connect failed: socket creation error: ",
454 grpc_core::StrError(errno).c_str())));
455 return {};
456 }
457 int err;
458 int num_retries = 0;
459 static constexpr int kMaxRetries = 5;
460 do {
461 err = connect(client_sock_fd, const_cast<struct sockaddr*>(addr.address()),
462 address.len);
463 if (err < 0 && (errno == EINPROGRESS || errno == EWOULDBLOCK)) {
464 auto status = BlockUntilWritableWithTimeout(
465 client_sock_fd,
466 std::max(deadline - absl::Now(), absl::ZeroDuration()));
467 if (!status.ok()) {
468 on_connect(status);
469 return {};
470 }
471 } else if (err < 0) {
472 if (errno != ECONNREFUSED || ++num_retries > kMaxRetries) {
473 on_connect(absl::CancelledError("Connect failed."));
474 return {};
475 }
476 // If ECONNREFUSED && num_retries < kMaxRetries, wait a while and try
477 // again.
478 absl::SleepFor(absl::Milliseconds(100));
479 }
480 } while (err < 0 && absl::Now() < deadline);
481 if (err < 0 && absl::Now() >= deadline) {
482 on_connect(absl::CancelledError("Deadline exceeded"));
483 } else {
484 on_connect(PosixOracleEndpoint::Create(client_sock_fd));
485 }
486 return {};
487 }
488
489 } // namespace experimental
490 } // namespace grpc_event_engine
491