1 // Copyright 2024 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14
#include "pw_channel/epoll_channel.h"

#include <fcntl.h>
#include <unistd.h>

#include <cerrno>
#include <cstring>

#include "pw_log/log.h"
#include "pw_status/try.h"
22
23 namespace pw::channel {
24
Register()25 void EpollChannel::Register() {
26 if (fcntl(channel_fd_, F_SETFL, O_NONBLOCK) != 0) {
27 PW_LOG_ERROR("Failed to make channel file descriptor nonblocking: %s",
28 std::strerror(errno));
29 set_closed();
30 return;
31 }
32
33 if (!dispatcher_
34 ->NativeRegisterFileDescriptor(
35 channel_fd_, async2::Dispatcher::FileDescriptorType::kReadWrite)
36 .ok()) {
37 set_closed();
38 return;
39 }
40
41 ready_to_write_ = true;
42 }
43
DoPendRead(async2::Context & cx)44 async2::Poll<Result<multibuf::MultiBuf>> EpollChannel::DoPendRead(
45 async2::Context& cx) {
46 if (!is_read_open()) {
47 return Status::FailedPrecondition();
48 }
49
50 if (!allocation_future_.has_value()) {
51 allocation_future_ =
52 allocator_->AllocateContiguousAsync(kMinimumReadSize, kDesiredReadSize);
53 }
54 async2::Poll<std::optional<multibuf::MultiBuf>> maybe_multibuf =
55 allocation_future_->Pend(cx);
56 if (maybe_multibuf.IsPending()) {
57 return async2::Pending();
58 }
59
60 allocation_future_ = std::nullopt;
61
62 if (!maybe_multibuf->has_value()) {
63 PW_LOG_ERROR("Failed to allocate multibuf for reading");
64 return Status::ResourceExhausted();
65 }
66
67 multibuf::MultiBuf buf = std::move(**maybe_multibuf);
68 multibuf::Chunk& chunk = *buf.ChunkBegin();
69
70 int bytes_read = read(channel_fd_, chunk.data(), chunk.size());
71 if (bytes_read >= 0) {
72 buf.Truncate(bytes_read);
73 return async2::Ready(std::move(buf));
74 }
75
76 if (errno == EAGAIN) {
77 // EAGAIN on a non-blocking read indicates that there is no data available.
78 // Put the task to sleep until the dispatcher is notified that the file
79 // descriptor is active.
80 async2::Waker waker = cx.GetWaker(async2::WaitReason::Unspecified());
81 cx.dispatcher().NativeAddReadWakerForFileDescriptor(channel_fd_,
82 std::move(waker));
83 return async2::Pending();
84 }
85
86 return Status::Internal();
87 }
88
DoPendReadyToWrite(async2::Context & cx)89 async2::Poll<Status> EpollChannel::DoPendReadyToWrite(async2::Context& cx) {
90 if (!is_write_open()) {
91 return Status::FailedPrecondition();
92 }
93
94 if (!ready_to_write_) {
95 // The previous write operation failed. Block the task until the dispatcher
96 // receives a notification for the channel's file descriptor.
97 ready_to_write_ = true;
98 async2::Waker waker = cx.GetWaker(async2::WaitReason::Unspecified());
99 cx.dispatcher().NativeAddWriteWakerForFileDescriptor(channel_fd_,
100 std::move(waker));
101 return async2::Pending();
102 }
103
104 return OkStatus();
105 }
106
DoWrite(multibuf::MultiBuf && data)107 Result<channel::WriteToken> EpollChannel::DoWrite(multibuf::MultiBuf&& data) {
108 if (!is_write_open()) {
109 return Status::FailedPrecondition();
110 }
111
112 const uint32_t token = write_token_++;
113
114 for (multibuf::Chunk& chunk : data.Chunks()) {
115 if (write(channel_fd_, chunk.data(), chunk.size()) < 0) {
116 if (errno == EAGAIN || errno == EWOULDBLOCK) {
117 // The file descriptor is not currently available. The next call to
118 // `PendReadyToWrite` will put the task to sleep until it is writable
119 // again.
120 ready_to_write_ = false;
121 return Status::Unavailable();
122 }
123
124 PW_LOG_ERROR("Epoll channel write failed: %s", std::strerror(errno));
125 return Status::Internal();
126 }
127 }
128
129 return CreateWriteToken(token);
130 }
131
Cleanup()132 void EpollChannel::Cleanup() {
133 if (is_read_or_write_open()) {
134 dispatcher_->NativeUnregisterFileDescriptor(channel_fd_).IgnoreError();
135 set_closed();
136 }
137 close(channel_fd_);
138 }
139
140 } // namespace pw::channel
141