• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2022 The gRPC Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include <grpc/support/port_platform.h>
16 
17 #include <memory>
18 #include <utility>
19 
20 #include "absl/strings/str_cat.h"
21 #include "src/core/lib/iomgr/port.h"
22 #include "src/core/util/crash.h"  // IWYU pragma: keep
23 
24 #ifdef GRPC_POSIX_WAKEUP_FD
25 #include <errno.h>
26 #include <fcntl.h>
27 #include <unistd.h>
28 
29 #include "src/core/lib/event_engine/posix_engine/wakeup_fd_posix.h"
30 #endif
31 
32 #include "src/core/lib/event_engine/posix_engine/wakeup_fd_pipe.h"
33 #include "src/core/util/strerror.h"
34 
35 namespace grpc_event_engine {
36 namespace experimental {
37 
38 #ifdef GRPC_POSIX_WAKEUP_FD
39 
40 namespace {
41 
SetSocketNonBlocking(int fd)42 absl::Status SetSocketNonBlocking(int fd) {
43   int oldflags = fcntl(fd, F_GETFL, 0);
44   if (oldflags < 0) {
45     return absl::Status(absl::StatusCode::kInternal,
46                         absl::StrCat("fcntl: ", grpc_core::StrError(errno)));
47   }
48 
49   oldflags |= O_NONBLOCK;
50 
51   if (fcntl(fd, F_SETFL, oldflags) != 0) {
52     return absl::Status(absl::StatusCode::kInternal,
53                         absl::StrCat("fcntl: ", grpc_core::StrError(errno)));
54   }
55 
56   return absl::OkStatus();
57 }
58 }  // namespace
59 
Init()60 absl::Status PipeWakeupFd::Init() {
61   int pipefd[2];
62   int r = pipe(pipefd);
63   if (0 != r) {
64     return absl::Status(absl::StatusCode::kInternal,
65                         absl::StrCat("pipe: ", grpc_core::StrError(errno)));
66   }
67   auto status = SetSocketNonBlocking(pipefd[0]);
68   if (!status.ok()) return status;
69   status = SetSocketNonBlocking(pipefd[1]);
70   if (!status.ok()) return status;
71   SetWakeupFds(pipefd[0], pipefd[1]);
72   return absl::OkStatus();
73 }
74 
ConsumeWakeup()75 absl::Status PipeWakeupFd::ConsumeWakeup() {
76   char buf[128];
77   ssize_t r;
78 
79   for (;;) {
80     r = read(ReadFd(), buf, sizeof(buf));
81     if (r > 0) continue;
82     if (r == 0) return absl::OkStatus();
83     switch (errno) {
84       case EAGAIN:
85         return absl::OkStatus();
86       case EINTR:
87         continue;
88       default:
89         return absl::Status(absl::StatusCode::kInternal,
90                             absl::StrCat("read: ", grpc_core::StrError(errno)));
91     }
92   }
93 }
94 
Wakeup()95 absl::Status PipeWakeupFd::Wakeup() {
96   char c = 0;
97   while (write(WriteFd(), &c, 1) != 1 && errno == EINTR) {
98   }
99   return absl::OkStatus();
100 }
101 
~PipeWakeupFd()102 PipeWakeupFd::~PipeWakeupFd() {
103   if (ReadFd() != 0) {
104     close(ReadFd());
105   }
106   if (WriteFd() != 0) {
107     close(WriteFd());
108   }
109 }
110 
IsSupported()111 bool PipeWakeupFd::IsSupported() {
112   PipeWakeupFd pipe_wakeup_fd;
113   return pipe_wakeup_fd.Init().ok();
114 }
115 
CreatePipeWakeupFd()116 absl::StatusOr<std::unique_ptr<WakeupFd>> PipeWakeupFd::CreatePipeWakeupFd() {
117   static bool kIsPipeWakeupFdSupported = PipeWakeupFd::IsSupported();
118   if (kIsPipeWakeupFdSupported) {
119     auto pipe_wakeup_fd = std::make_unique<PipeWakeupFd>();
120     auto status = pipe_wakeup_fd->Init();
121     if (status.ok()) {
122       return std::unique_ptr<WakeupFd>(std::move(pipe_wakeup_fd));
123     }
124     return status;
125   }
126   return absl::NotFoundError("Pipe wakeup fd is not supported");
127 }
128 
129 #else  //  GRPC_POSIX_WAKEUP_FD
130 
131 absl::Status PipeWakeupFd::Init() { grpc_core::Crash("unimplemented"); }
132 
133 absl::Status PipeWakeupFd::ConsumeWakeup() {
134   grpc_core::Crash("unimplemented");
135 }
136 
137 absl::Status PipeWakeupFd::Wakeup() { grpc_core::Crash("unimplemented"); }
138 
139 bool PipeWakeupFd::IsSupported() { return false; }
140 
141 absl::StatusOr<std::unique_ptr<WakeupFd>> PipeWakeupFd::CreatePipeWakeupFd() {
142   return absl::NotFoundError("Pipe wakeup fd is not supported");
143 }
144 
145 #endif  //  GRPC_POSIX_WAKEUP_FD
146 
147 }  // namespace experimental
148 }  // namespace grpc_event_engine
149