• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2024 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
15 #include "pw_channel/epoll_channel.h"
16 
17 #include <fcntl.h>
18 #include <unistd.h>
19 
20 #include "pw_log/log.h"
21 #include "pw_status/try.h"
22 
23 namespace pw::channel {
24 
Register()25 void EpollChannel::Register() {
26   if (fcntl(channel_fd_, F_SETFL, O_NONBLOCK) != 0) {
27     PW_LOG_ERROR("Failed to make channel file descriptor nonblocking: %s",
28                  std::strerror(errno));
29     set_closed();
30     return;
31   }
32 
33   if (!dispatcher_
34            ->NativeRegisterFileDescriptor(
35                channel_fd_, async2::Dispatcher::FileDescriptorType::kReadWrite)
36            .ok()) {
37     set_closed();
38     return;
39   }
40 
41   ready_to_write_ = true;
42 }
43 
DoPendRead(async2::Context & cx)44 async2::Poll<Result<multibuf::MultiBuf>> EpollChannel::DoPendRead(
45     async2::Context& cx) {
46   if (!is_read_open()) {
47     return Status::FailedPrecondition();
48   }
49 
50   if (!allocation_future_.has_value()) {
51     allocation_future_ =
52         allocator_->AllocateContiguousAsync(kMinimumReadSize, kDesiredReadSize);
53   }
54   async2::Poll<std::optional<multibuf::MultiBuf>> maybe_multibuf =
55       allocation_future_->Pend(cx);
56   if (maybe_multibuf.IsPending()) {
57     return async2::Pending();
58   }
59 
60   allocation_future_ = std::nullopt;
61 
62   if (!maybe_multibuf->has_value()) {
63     PW_LOG_ERROR("Failed to allocate multibuf for reading");
64     return Status::ResourceExhausted();
65   }
66 
67   multibuf::MultiBuf buf = std::move(**maybe_multibuf);
68   multibuf::Chunk& chunk = *buf.ChunkBegin();
69 
70   int bytes_read = read(channel_fd_, chunk.data(), chunk.size());
71   if (bytes_read >= 0) {
72     buf.Truncate(bytes_read);
73     return async2::Ready(std::move(buf));
74   }
75 
76   if (errno == EAGAIN) {
77     // EAGAIN on a non-blocking read indicates that there is no data available.
78     // Put the task to sleep until the dispatcher is notified that the file
79     // descriptor is active.
80     async2::Waker waker = cx.GetWaker(async2::WaitReason::Unspecified());
81     cx.dispatcher().NativeAddReadWakerForFileDescriptor(channel_fd_,
82                                                         std::move(waker));
83     return async2::Pending();
84   }
85 
86   return Status::Internal();
87 }
88 
DoPendReadyToWrite(async2::Context & cx)89 async2::Poll<Status> EpollChannel::DoPendReadyToWrite(async2::Context& cx) {
90   if (!is_write_open()) {
91     return Status::FailedPrecondition();
92   }
93 
94   if (!ready_to_write_) {
95     // The previous write operation failed. Block the task until the dispatcher
96     // receives a notification for the channel's file descriptor.
97     ready_to_write_ = true;
98     async2::Waker waker = cx.GetWaker(async2::WaitReason::Unspecified());
99     cx.dispatcher().NativeAddWriteWakerForFileDescriptor(channel_fd_,
100                                                          std::move(waker));
101     return async2::Pending();
102   }
103 
104   return OkStatus();
105 }
106 
DoWrite(multibuf::MultiBuf && data)107 Result<channel::WriteToken> EpollChannel::DoWrite(multibuf::MultiBuf&& data) {
108   if (!is_write_open()) {
109     return Status::FailedPrecondition();
110   }
111 
112   const uint32_t token = write_token_++;
113 
114   for (multibuf::Chunk& chunk : data.Chunks()) {
115     if (write(channel_fd_, chunk.data(), chunk.size()) < 0) {
116       if (errno == EAGAIN || errno == EWOULDBLOCK) {
117         // The file descriptor is not currently available. The next call to
118         // `PendReadyToWrite` will put the task to sleep until it is writable
119         // again.
120         ready_to_write_ = false;
121         return Status::Unavailable();
122       }
123 
124       PW_LOG_ERROR("Epoll channel write failed: %s", std::strerror(errno));
125       return Status::Internal();
126     }
127   }
128 
129   return CreateWriteToken(token);
130 }
131 
Cleanup()132 void EpollChannel::Cleanup() {
133   if (is_read_or_write_open()) {
134     dispatcher_->NativeUnregisterFileDescriptor(channel_fd_).IgnoreError();
135     set_closed();
136   }
137   close(channel_fd_);
138 }
139 
140 }  // namespace pw::channel
141