1 // Copyright 2022 The gRPC Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include <grpc/support/port_platform.h>
16
17 #include <memory>
18 #include <utility>
19
20 #include "absl/strings/str_cat.h"
21 #include "src/core/lib/iomgr/port.h"
22 #include "src/core/util/crash.h" // IWYU pragma: keep
23
24 #ifdef GRPC_POSIX_WAKEUP_FD
25 #include <errno.h>
26 #include <fcntl.h>
27 #include <unistd.h>
28
29 #include "src/core/lib/event_engine/posix_engine/wakeup_fd_posix.h"
30 #endif
31
32 #include "src/core/lib/event_engine/posix_engine/wakeup_fd_pipe.h"
33 #include "src/core/util/strerror.h"
34
35 namespace grpc_event_engine {
36 namespace experimental {
37
38 #ifdef GRPC_POSIX_WAKEUP_FD
39
40 namespace {
41
SetSocketNonBlocking(int fd)42 absl::Status SetSocketNonBlocking(int fd) {
43 int oldflags = fcntl(fd, F_GETFL, 0);
44 if (oldflags < 0) {
45 return absl::Status(absl::StatusCode::kInternal,
46 absl::StrCat("fcntl: ", grpc_core::StrError(errno)));
47 }
48
49 oldflags |= O_NONBLOCK;
50
51 if (fcntl(fd, F_SETFL, oldflags) != 0) {
52 return absl::Status(absl::StatusCode::kInternal,
53 absl::StrCat("fcntl: ", grpc_core::StrError(errno)));
54 }
55
56 return absl::OkStatus();
57 }
58 } // namespace
59
Init()60 absl::Status PipeWakeupFd::Init() {
61 int pipefd[2];
62 int r = pipe(pipefd);
63 if (0 != r) {
64 return absl::Status(absl::StatusCode::kInternal,
65 absl::StrCat("pipe: ", grpc_core::StrError(errno)));
66 }
67 auto status = SetSocketNonBlocking(pipefd[0]);
68 if (!status.ok()) return status;
69 status = SetSocketNonBlocking(pipefd[1]);
70 if (!status.ok()) return status;
71 SetWakeupFds(pipefd[0], pipefd[1]);
72 return absl::OkStatus();
73 }
74
ConsumeWakeup()75 absl::Status PipeWakeupFd::ConsumeWakeup() {
76 char buf[128];
77 ssize_t r;
78
79 for (;;) {
80 r = read(ReadFd(), buf, sizeof(buf));
81 if (r > 0) continue;
82 if (r == 0) return absl::OkStatus();
83 switch (errno) {
84 case EAGAIN:
85 return absl::OkStatus();
86 case EINTR:
87 continue;
88 default:
89 return absl::Status(absl::StatusCode::kInternal,
90 absl::StrCat("read: ", grpc_core::StrError(errno)));
91 }
92 }
93 }
94
Wakeup()95 absl::Status PipeWakeupFd::Wakeup() {
96 char c = 0;
97 while (write(WriteFd(), &c, 1) != 1 && errno == EINTR) {
98 }
99 return absl::OkStatus();
100 }
101
~PipeWakeupFd()102 PipeWakeupFd::~PipeWakeupFd() {
103 if (ReadFd() != 0) {
104 close(ReadFd());
105 }
106 if (WriteFd() != 0) {
107 close(WriteFd());
108 }
109 }
110
IsSupported()111 bool PipeWakeupFd::IsSupported() {
112 PipeWakeupFd pipe_wakeup_fd;
113 return pipe_wakeup_fd.Init().ok();
114 }
115
CreatePipeWakeupFd()116 absl::StatusOr<std::unique_ptr<WakeupFd>> PipeWakeupFd::CreatePipeWakeupFd() {
117 static bool kIsPipeWakeupFdSupported = PipeWakeupFd::IsSupported();
118 if (kIsPipeWakeupFdSupported) {
119 auto pipe_wakeup_fd = std::make_unique<PipeWakeupFd>();
120 auto status = pipe_wakeup_fd->Init();
121 if (status.ok()) {
122 return std::unique_ptr<WakeupFd>(std::move(pipe_wakeup_fd));
123 }
124 return status;
125 }
126 return absl::NotFoundError("Pipe wakeup fd is not supported");
127 }
128
129 #else // GRPC_POSIX_WAKEUP_FD
130
131 absl::Status PipeWakeupFd::Init() { grpc_core::Crash("unimplemented"); }
132
133 absl::Status PipeWakeupFd::ConsumeWakeup() {
134 grpc_core::Crash("unimplemented");
135 }
136
137 absl::Status PipeWakeupFd::Wakeup() { grpc_core::Crash("unimplemented"); }
138
139 bool PipeWakeupFd::IsSupported() { return false; }
140
141 absl::StatusOr<std::unique_ptr<WakeupFd>> PipeWakeupFd::CreatePipeWakeupFd() {
142 return absl::NotFoundError("Pipe wakeup fd is not supported");
143 }
144
145 #endif // GRPC_POSIX_WAKEUP_FD
146
147 } // namespace experimental
148 } // namespace grpc_event_engine
149