• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2022 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
#include "test/core/event_engine/test_suite/posix/oracle_event_engine_posix.h"

#include <poll.h>
#include <sys/socket.h>
#include <unistd.h>

#include <algorithm>
#include <cerrno>
#include <cstdint>
#include <cstring>
#include <limits>
#include <memory>
#include <vector>

#include "absl/status/status.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/str_format.h"
#include "absl/time/clock.h"
#include "absl/time/time.h"

#include <grpc/event_engine/event_engine.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>

#include "src/core/lib/address_utils/sockaddr_utils.h"
#include "src/core/lib/gprpp/crash.h"
#include "src/core/lib/gprpp/strerror.h"
#include "src/core/lib/iomgr/resolved_address.h"
40 
41 namespace grpc_event_engine {
42 namespace experimental {
43 
44 namespace {
45 
// Message written into the listener's shutdown pipe to tell the accept thread
// to exit. A constexpr array (rather than a mutable `const char*` global) so
// the pointer itself cannot be reseated and the value is a compile-time
// constant.
constexpr char kStopMessage[] = "STOP";
47 
CreateGRPCResolvedAddress(const EventEngine::ResolvedAddress & ra)48 grpc_resolved_address CreateGRPCResolvedAddress(
49     const EventEngine::ResolvedAddress& ra) {
50   grpc_resolved_address grpc_addr;
51   memcpy(grpc_addr.addr, ra.address(), ra.size());
52   grpc_addr.len = ra.size();
53   return grpc_addr;
54 }
55 
56 // Blocks until poll(2) indicates that one of the fds has pending I/O
57 // the deadline is reached whichever comes first. Returns an OK
58 // status a valid I/O event is available for at least one of the fds, a Status
59 // with canonical code DEADLINE_EXCEEDED if the deadline expired and a non-OK
60 // Status if any other error occurred.
PollFds(struct pollfd * pfds,int nfds,absl::Duration timeout)61 absl::Status PollFds(struct pollfd* pfds, int nfds, absl::Duration timeout) {
62   int rv;
63   while (true) {
64     if (timeout != absl::InfiniteDuration()) {
65       rv = poll(pfds, nfds,
66                 static_cast<int>(absl::ToInt64Milliseconds(timeout)));
67     } else {
68       rv = poll(pfds, nfds, /* timeout = */ -1);
69     }
70     const int saved_errno = errno;
71     errno = saved_errno;
72     if (rv >= 0 || errno != EINTR) {
73       break;
74     }
75   }
76   if (rv < 0) {
77     return absl::UnknownError(grpc_core::StrError(errno));
78   }
79   if (rv == 0) {
80     return absl::CancelledError("Deadline exceeded");
81   }
82   return absl::OkStatus();
83 }
84 
BlockUntilReadable(int fd)85 absl::Status BlockUntilReadable(int fd) {
86   struct pollfd pfd;
87   pfd.fd = fd;
88   pfd.events = POLLIN;
89   pfd.revents = 0;
90   return PollFds(&pfd, 1, absl::InfiniteDuration());
91 }
92 
BlockUntilWritableWithTimeout(int fd,absl::Duration timeout)93 absl::Status BlockUntilWritableWithTimeout(int fd, absl::Duration timeout) {
94   struct pollfd pfd;
95   pfd.fd = fd;
96   pfd.events = POLLOUT;
97   pfd.revents = 0;
98   return PollFds(&pfd, 1, timeout);
99 }
100 
BlockUntilWritable(int fd)101 absl::Status BlockUntilWritable(int fd) {
102   return BlockUntilWritableWithTimeout(fd, absl::InfiniteDuration());
103 }
104 
// Tries to read up to `num_expected_bytes` bytes from `sockfd`, retrying reads
// interrupted by a signal (EINTR). A value <= 0 means "up to 1024 bytes".
//
// The loop stops as soon as the requested count has been read, read(2)
// returns 0 (end-of-file), or read(2) fails with anything other than EINTR
// (e.g. EAGAIN on a non-blocking descriptor). Note that on a blocking
// descriptor it keeps reading until the full count or end-of-file.
//
// Returns the bytes actually read (possibly fewer than requested, possibly
// none). `saved_errno` receives the errno of the last read(2) call, or 0 if
// the loop ended because of success or end-of-file.
std::string TryReadBytes(int sockfd, int& saved_errno, int num_expected_bytes) {
  static constexpr int kDefaultNumExpectedBytes = 1024;
  if (num_expected_bytes <= 0) {
    num_expected_bytes = kDefaultNumExpectedBytes;
  }
  std::string read_data(num_expected_bytes, '\0');
  int bytes_read = 0;
  ssize_t ret = 0;
  do {
    errno = 0;
    // Write through operator[] instead of const_cast-ing c_str(): mutating
    // the buffer returned by c_str() is undefined behavior.
    ret = read(sockfd, &read_data[bytes_read], num_expected_bytes - bytes_read);
    if (ret > 0) {
      bytes_read += static_cast<int>(ret);
    }
  } while (bytes_read < num_expected_bytes &&
           (ret > 0 || (ret < 0 && errno == EINTR)));
  saved_errno = errno;
  read_data.resize(bytes_read);
  return read_data;
}
127 
128 // Blocks calling thread until the specified number of bytes have been
129 // read from the provided socket or it encounters an unrecoverable error. It
130 // puts the read bytes into a string and returns the string. If it encounters an
131 // error, it returns an empty string and updates saved_errno with the
132 // appropriate errno.
ReadBytes(int sockfd,int & saved_errno,int num_expected_bytes)133 std::string ReadBytes(int sockfd, int& saved_errno, int num_expected_bytes) {
134   std::string read_data;
135   do {
136     saved_errno = 0;
137     read_data += TryReadBytes(sockfd, saved_errno,
138                               num_expected_bytes - read_data.length());
139     if (saved_errno == EAGAIN &&
140         read_data.length() < static_cast<size_t>(num_expected_bytes)) {
141       GPR_ASSERT(BlockUntilReadable(sockfd).ok());
142     } else if (saved_errno != 0 && num_expected_bytes > 0) {
143       read_data.clear();
144       break;
145     }
146   } while (read_data.length() < static_cast<size_t>(num_expected_bytes));
147   return read_data;
148 }
149 
// Tries to write all of `write_bytes` to `sockfd`, retrying writes interrupted
// by a signal (EINTR). The loop stops once everything is written, write(2)
// makes no progress, or write(2) fails with anything other than EINTR.
//
// Returns the number of bytes actually written (never negative).
// `saved_errno` receives the errno of the last write(2) call, or 0 if it
// succeeded. Takes the data by const reference to avoid copying the payload.
int TryWriteBytes(int sockfd, int& saved_errno,
                  const std::string& write_bytes) {
  const int total = static_cast<int>(write_bytes.length());
  int bytes_written = 0;
  ssize_t ret = 0;
  do {
    errno = 0;
    ret = write(sockfd, write_bytes.data() + bytes_written,
                total - bytes_written);
    if (ret > 0) {
      bytes_written += static_cast<int>(ret);
    }
  } while (bytes_written < total && (ret > 0 || (ret < 0 && errno == EINTR)));
  saved_errno = errno;
  return bytes_written;
}
167 
168 // Blocks calling thread until the specified number of bytes have been
169 // written over the provided socket or it encounters an unrecoverable error. The
170 // bytes to write are specified as a string. If it encounters an error, it
171 // returns an empty string and updates saved_errno with the appropriate errno
172 // and returns a value less than zero.
WriteBytes(int sockfd,int & saved_errno,std::string write_bytes)173 int WriteBytes(int sockfd, int& saved_errno, std::string write_bytes) {
174   int ret = 0;
175   int original_write_length = write_bytes.length();
176   do {
177     saved_errno = 0;
178     ret = TryWriteBytes(sockfd, saved_errno, write_bytes);
179     if (saved_errno == EAGAIN && ret < static_cast<int>(write_bytes.length())) {
180       GPR_ASSERT(ret >= 0);
181       GPR_ASSERT(BlockUntilWritable(sockfd).ok());
182     } else if (saved_errno != 0) {
183       GPR_ASSERT(ret < 0);
184       return ret;
185     }
186     write_bytes = write_bytes.substr(ret, std::string::npos);
187   } while (write_bytes.length() > 0);
188   return original_write_length;
189 }
190 }  // namespace
191 
PosixOracleEndpoint(int socket_fd)192 PosixOracleEndpoint::PosixOracleEndpoint(int socket_fd)
193     : socket_fd_(socket_fd) {
194   read_ops_ = grpc_core::Thread(
195       "read_ops_thread",
196       [](void* arg) {
197         static_cast<PosixOracleEndpoint*>(arg)->ProcessReadOperations();
198       },
199       this);
200   write_ops_ = grpc_core::Thread(
201       "write_ops_thread",
202       [](void* arg) {
203         static_cast<PosixOracleEndpoint*>(arg)->ProcessWriteOperations();
204       },
205       this);
206   read_ops_.Start();
207   write_ops_.Start();
208 }
209 
Shutdown()210 void PosixOracleEndpoint::Shutdown() {
211   grpc_core::MutexLock lock(&mu_);
212   if (std::exchange(is_shutdown_, true)) {
213     return;
214   }
215   read_ops_channel_ = ReadOperation();
216   read_op_signal_->Notify();
217   write_ops_channel_ = WriteOperation();
218   write_op_signal_->Notify();
219   read_ops_.Join();
220   write_ops_.Join();
221 }
222 
Create(int socket_fd)223 std::unique_ptr<PosixOracleEndpoint> PosixOracleEndpoint::Create(
224     int socket_fd) {
225   return std::make_unique<PosixOracleEndpoint>(socket_fd);
226 }
227 
~PosixOracleEndpoint()228 PosixOracleEndpoint::~PosixOracleEndpoint() {
229   Shutdown();
230   close(socket_fd_);
231 }
232 
Read(absl::AnyInvocable<void (absl::Status)> on_read,SliceBuffer * buffer,const ReadArgs * args)233 bool PosixOracleEndpoint::Read(absl::AnyInvocable<void(absl::Status)> on_read,
234                                SliceBuffer* buffer, const ReadArgs* args) {
235   grpc_core::MutexLock lock(&mu_);
236   GPR_ASSERT(buffer != nullptr);
237   int read_hint_bytes =
238       args != nullptr ? std::max(1, static_cast<int>(args->read_hint_bytes))
239                       : 0;
240   read_ops_channel_ =
241       ReadOperation(read_hint_bytes, buffer, std::move(on_read));
242   read_op_signal_->Notify();
243   return false;
244 }
245 
Write(absl::AnyInvocable<void (absl::Status)> on_writable,SliceBuffer * data,const WriteArgs *)246 bool PosixOracleEndpoint::Write(
247     absl::AnyInvocable<void(absl::Status)> on_writable, SliceBuffer* data,
248     const WriteArgs* /*args*/) {
249   grpc_core::MutexLock lock(&mu_);
250   GPR_ASSERT(data != nullptr);
251   write_ops_channel_ = WriteOperation(data, std::move(on_writable));
252   write_op_signal_->Notify();
253   return false;
254 }
255 
ProcessReadOperations()256 void PosixOracleEndpoint::ProcessReadOperations() {
257   gpr_log(GPR_INFO, "Starting thread to process read ops ...");
258   while (true) {
259     read_op_signal_->WaitForNotification();
260     read_op_signal_ = std::make_unique<grpc_core::Notification>();
261     auto read_op = std::exchange(read_ops_channel_, ReadOperation());
262     if (!read_op.IsValid()) {
263       read_op(std::string(), absl::CancelledError("Closed"));
264       break;
265     }
266     int saved_errno;
267     std::string read_data =
268         ReadBytes(socket_fd_, saved_errno, read_op.GetNumBytesToRead());
269     read_op(read_data, read_data.empty()
270                            ? absl::CancelledError(
271                                  absl::StrCat("Read failed with error = ",
272                                               grpc_core::StrError(saved_errno)))
273                            : absl::OkStatus());
274   }
275   gpr_log(GPR_INFO, "Shutting down read ops thread ...");
276 }
277 
ProcessWriteOperations()278 void PosixOracleEndpoint::ProcessWriteOperations() {
279   gpr_log(GPR_INFO, "Starting thread to process write ops ...");
280   while (true) {
281     write_op_signal_->WaitForNotification();
282     write_op_signal_ = std::make_unique<grpc_core::Notification>();
283     auto write_op = std::exchange(write_ops_channel_, WriteOperation());
284     if (!write_op.IsValid()) {
285       write_op(absl::CancelledError("Closed"));
286       break;
287     }
288     int saved_errno;
289     int ret = WriteBytes(socket_fd_, saved_errno, write_op.GetBytesToWrite());
290     write_op(ret < 0 ? absl::CancelledError(
291                            absl::StrCat("Write failed with error = ",
292                                         grpc_core::StrError(saved_errno)))
293                      : absl::OkStatus());
294   }
295   gpr_log(GPR_INFO, "Shutting down write ops thread ...");
296 }
297 
PosixOracleListener(EventEngine::Listener::AcceptCallback on_accept,absl::AnyInvocable<void (absl::Status)> on_shutdown,std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory)298 PosixOracleListener::PosixOracleListener(
299     EventEngine::Listener::AcceptCallback on_accept,
300     absl::AnyInvocable<void(absl::Status)> on_shutdown,
301     std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory)
302     : on_accept_(std::move(on_accept)),
303       on_shutdown_(std::move(on_shutdown)),
304       memory_allocator_factory_(std::move(memory_allocator_factory)) {
305   if (pipe(pipefd_) == -1) {
306     grpc_core::Crash(absl::StrFormat("Error creating pipe: %s",
307                                      grpc_core::StrError(errno).c_str()));
308   }
309 }
310 
Start()311 absl::Status PosixOracleListener::Start() {
312   grpc_core::MutexLock lock(&mu_);
313   GPR_ASSERT(!listener_fds_.empty());
314   if (std::exchange(is_started_, true)) {
315     return absl::InternalError("Cannot start listener more than once ...");
316   }
317   serve_ = grpc_core::Thread(
318       "accept_thread",
319       [](void* arg) {
320         static_cast<PosixOracleListener*>(arg)->HandleIncomingConnections();
321       },
322       this);
323   serve_.Start();
324   return absl::OkStatus();
325 }
326 
~PosixOracleListener()327 PosixOracleListener::~PosixOracleListener() {
328   grpc_core::MutexLock lock(&mu_);
329   if (!is_started_) {
330     serve_.Join();
331     return;
332   }
333   for (int i = 0; i < static_cast<int>(listener_fds_.size()); i++) {
334     shutdown(listener_fds_[i], SHUT_RDWR);
335   }
336   // Send a STOP message over the pipe.
337   GPR_ASSERT(write(pipefd_[1], kStopMessage, strlen(kStopMessage)) != -1);
338   serve_.Join();
339   on_shutdown_(absl::OkStatus());
340 }
341 
HandleIncomingConnections()342 void PosixOracleListener::HandleIncomingConnections() {
343   gpr_log(GPR_INFO, "Starting accept thread ...");
344   GPR_ASSERT(!listener_fds_.empty());
345   int nfds = listener_fds_.size();
346   // Add one extra file descriptor to poll the pipe fd.
347   ++nfds;
348   struct pollfd* pfds =
349       static_cast<struct pollfd*>(gpr_malloc(sizeof(struct pollfd) * nfds));
350   memset(pfds, 0, sizeof(struct pollfd) * nfds);
351   while (true) {
352     for (int i = 0; i < nfds; i++) {
353       pfds[i].fd = i == nfds - 1 ? pipefd_[0] : listener_fds_[i];
354       pfds[i].events = POLLIN;
355       pfds[i].revents = 0;
356     }
357     if (!PollFds(pfds, nfds, absl::InfiniteDuration()).ok()) {
358       break;
359     }
360     int saved_errno = 0;
361     if ((pfds[nfds - 1].revents & POLLIN) &&
362         ReadBytes(pipefd_[0], saved_errno, strlen(kStopMessage)) ==
363             std::string(kStopMessage)) {
364       break;
365     }
366     for (int i = 0; i < nfds - 1; i++) {
367       if (!(pfds[i].revents & POLLIN)) {
368         continue;
369       }
370       // pfds[i].fd has a readable event.
371       int client_sock_fd = accept(pfds[i].fd, nullptr, nullptr);
372       if (client_sock_fd < 0) {
373         gpr_log(GPR_ERROR,
374                 "Error accepting new connection: %s. Ignoring connection "
375                 "attempt ...",
376                 grpc_core::StrError(errno).c_str());
377         continue;
378       }
379       on_accept_(PosixOracleEndpoint::Create(client_sock_fd),
380                  memory_allocator_factory_->CreateMemoryAllocator("test"));
381     }
382   }
383   gpr_log(GPR_INFO, "Shutting down accept thread ...");
384   gpr_free(pfds);
385 }
386 
Bind(const EventEngine::ResolvedAddress & addr)387 absl::StatusOr<int> PosixOracleListener::Bind(
388     const EventEngine::ResolvedAddress& addr) {
389   grpc_core::MutexLock lock(&mu_);
390   if (is_started_) {
391     return absl::FailedPreconditionError(
392         "Listener is already started, ports can no longer be bound");
393   }
394   int new_socket;
395   int opt = -1;
396   grpc_resolved_address address = CreateGRPCResolvedAddress(addr);
397   const char* scheme = grpc_sockaddr_get_uri_scheme(&address);
398   if (scheme == nullptr || strcmp(scheme, "ipv6") != 0) {
399     return absl::UnimplementedError(
400         "Unsupported bind address type. Only IPV6 addresses are supported "
401         "currently by the PosixOracleListener ...");
402   }
403 
404   // Creating a new socket file descriptor.
405   if ((new_socket = socket(AF_INET6, SOCK_STREAM, 0)) <= 0) {
406     return absl::UnknownError(
407         absl::StrCat("Error creating socket: ", grpc_core::StrError(errno)));
408   }
409   // MacOS biulds fail if SO_REUSEADDR and SO_REUSEPORT are set in the same
410   // setsockopt syscall. So they are set separately one after the other.
411   if (setsockopt(new_socket, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt))) {
412     return absl::UnknownError(absl::StrCat("Error setsockopt(SO_REUSEADDR): ",
413                                            grpc_core::StrError(errno)));
414   }
415   if (setsockopt(new_socket, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt))) {
416     return absl::UnknownError(absl::StrCat("Error setsockopt(SO_REUSEPORT): ",
417                                            grpc_core::StrError(errno)));
418   }
419 
420   // Forcefully bind the new socket.
421   if (bind(new_socket, reinterpret_cast<const struct sockaddr*>(addr.address()),
422            address.len) < 0) {
423     return absl::UnknownError(
424         absl::StrCat("Error bind: ", grpc_core::StrError(errno)));
425   }
426   // Set the new socket to listen for one active connection at a time.
427   if (listen(new_socket, 1) < 0) {
428     return absl::UnknownError(
429         absl::StrCat("Error listen: ", grpc_core::StrError(errno)));
430   }
431   listener_fds_.push_back(new_socket);
432   return 0;
433 }
434 
435 // PosixOracleEventEngine implements blocking connect. It blocks the calling
436 // thread until either connect succeeds or fails with timeout.
Connect(OnConnectCallback on_connect,const ResolvedAddress & addr,const EndpointConfig &,MemoryAllocator,EventEngine::Duration timeout)437 EventEngine::ConnectionHandle PosixOracleEventEngine::Connect(
438     OnConnectCallback on_connect, const ResolvedAddress& addr,
439     const EndpointConfig& /*args*/, MemoryAllocator /*memory_allocator*/,
440     EventEngine::Duration timeout) {
441   int client_sock_fd;
442   absl::Time deadline = absl::Now() + absl::FromChrono(timeout);
443   grpc_resolved_address address = CreateGRPCResolvedAddress(addr);
444   const char* scheme = grpc_sockaddr_get_uri_scheme(&address);
445   if (scheme == nullptr || strcmp(scheme, "ipv6") != 0) {
446     on_connect(
447         absl::CancelledError("Unsupported bind address type. Only ipv6 "
448                              "addresses are currently supported."));
449     return {};
450   }
451   if ((client_sock_fd = socket(AF_INET6, SOCK_STREAM, 0)) < 0) {
452     on_connect(absl::CancelledError(
453         absl::StrCat("Connect failed: socket creation error: ",
454                      grpc_core::StrError(errno).c_str())));
455     return {};
456   }
457   int err;
458   int num_retries = 0;
459   static constexpr int kMaxRetries = 5;
460   do {
461     err = connect(client_sock_fd, const_cast<struct sockaddr*>(addr.address()),
462                   address.len);
463     if (err < 0 && (errno == EINPROGRESS || errno == EWOULDBLOCK)) {
464       auto status = BlockUntilWritableWithTimeout(
465           client_sock_fd,
466           std::max(deadline - absl::Now(), absl::ZeroDuration()));
467       if (!status.ok()) {
468         on_connect(status);
469         return {};
470       }
471     } else if (err < 0) {
472       if (errno != ECONNREFUSED || ++num_retries > kMaxRetries) {
473         on_connect(absl::CancelledError("Connect failed."));
474         return {};
475       }
476       // If ECONNREFUSED && num_retries < kMaxRetries, wait a while and try
477       // again.
478       absl::SleepFor(absl::Milliseconds(100));
479     }
480   } while (err < 0 && absl::Now() < deadline);
481   if (err < 0 && absl::Now() >= deadline) {
482     on_connect(absl::CancelledError("Deadline exceeded"));
483   } else {
484     on_connect(PosixOracleEndpoint::Create(client_sock_fd));
485   }
486   return {};
487 }
488 
489 }  // namespace experimental
490 }  // namespace grpc_event_engine
491