• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2024 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
15 #include <fcntl.h>
16 #include <sys/epoll.h>
17 #include <unistd.h>
18 
19 #include <cstring>
20 #include <mutex>
21 
22 #include "pw_assert/check.h"
23 #include "pw_async2/dispatcher_native.h"
24 #include "pw_log/log.h"
25 #include "pw_preprocessor/compiler.h"
26 #include "pw_status/status.h"
27 
28 namespace pw::async2 {
29 namespace {
30 
31 constexpr char kNotificationSignal = 'c';
32 
33 }  // namespace
34 
NativeInit()35 Status Dispatcher::NativeInit() {
36   epoll_fd_ = epoll_create1(0);
37   if (epoll_fd_ == -1) {
38     PW_LOG_ERROR("Failed to open epoll: %s", std::strerror(errno));
39     return Status::Internal();
40   }
41 
42   int pipefd[2];
43   if (pipe2(pipefd, O_DIRECT) == -1) {
44     PW_LOG_ERROR("Failed to create pipe: %s", std::strerror(errno));
45     return Status::Internal();
46   }
47 
48   wait_fd_ = pipefd[0];
49   notify_fd_ = pipefd[1];
50 
51   struct epoll_event event;
52   event.events = EPOLLIN;
53   event.data.fd = wait_fd_;
54   if (epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, wait_fd_, &event) == -1) {
55     PW_LOG_ERROR("Failed to initialize epoll event for dispatcher");
56     return Status::Internal();
57   }
58 
59   return OkStatus();
60 }
61 
DoRunUntilStalled(Task * task)62 Poll<> Dispatcher::DoRunUntilStalled(Task* task) {
63   {
64     std::lock_guard lock(dispatcher_lock());
65     PW_CHECK(task == nullptr || HasPostedTask(*task),
66              "Attempted to run a dispatcher until a task was stalled, "
67              "but that task has not been `Post`ed to that `Dispatcher`.");
68   }
69   while (true) {
70     RunOneTaskResult result = RunOneTask(task);
71     if (result.completed_main_task() || result.completed_all_tasks()) {
72       return Ready();
73     }
74     if (!result.ran_a_task()) {
75       return Pending();
76     }
77   }
78 }
79 
DoRunToCompletion(Task * task)80 void Dispatcher::DoRunToCompletion(Task* task) {
81   {
82     std::lock_guard lock(dispatcher_lock());
83     PW_CHECK(task == nullptr || HasPostedTask(*task),
84              "Attempted to run a dispatcher until a task was complete, "
85              "but that task has not been `Post`ed to that `Dispatcher`.");
86   }
87   while (true) {
88     RunOneTaskResult result = RunOneTask(task);
89     if (result.completed_main_task() || result.completed_all_tasks()) {
90       return;
91     }
92     if (!result.ran_a_task()) {
93       SleepInfo sleep_info = AttemptRequestWake();
94       if (sleep_info.should_sleep()) {
95         if (!NativeWaitForWake().ok()) {
96           break;
97         }
98       }
99     }
100   }
101 }
102 
NativeWaitForWake()103 Status Dispatcher::NativeWaitForWake() {
104   std::array<epoll_event, kMaxEventsToProcessAtOnce> events;
105 
106   int num_events =
107       epoll_wait(epoll_fd_, events.data(), events.size(), /*timeout=*/-1);
108   if (num_events < 0) {
109     if (errno == EINTR) {
110       return OkStatus();
111     }
112 
113     PW_LOG_ERROR("Dispatcher failed to wait for incoming events: %s",
114                  std::strerror(errno));
115     return Status::Internal();
116   }
117 
118   for (int i = 0; i < num_events; ++i) {
119     epoll_event& event = events[i];
120     if (event.data.fd == wait_fd_) {
121       // Consume the wake notification.
122       char unused;
123       ssize_t bytes_read = read(wait_fd_, &unused, 1);
124       PW_CHECK_INT_EQ(
125           bytes_read, 1, "Dispatcher failed to read wake notification");
126       PW_DCHECK_INT_EQ(unused, kNotificationSignal);
127     } else {
128       if ((event.events & (EPOLLIN | EPOLLRDHUP)) != 0) {
129         NativeFindAndWakeFileDescriptor(event.data.fd,
130                                         FileDescriptorType::kReadable);
131       }
132       if ((event.events & EPOLLOUT) != 0) {
133         NativeFindAndWakeFileDescriptor(event.data.fd,
134                                         FileDescriptorType::kWritable);
135       }
136     }
137   }
138 
139   return OkStatus();
140 }
141 
NativeRegisterFileDescriptor(int fd,FileDescriptorType type)142 Status Dispatcher::NativeRegisterFileDescriptor(int fd,
143                                                 FileDescriptorType type) {
144   epoll_event event;
145   event.events = EPOLLET;
146   event.data.fd = fd;
147 
148   if ((type & FileDescriptorType::kReadable) != 0) {
149     event.events |= EPOLLIN | EPOLLRDHUP;
150   }
151   if ((type & FileDescriptorType::kWritable) != 0) {
152     event.events |= EPOLLOUT;
153   }
154 
155   if (epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &event) == -1) {
156     PW_LOG_ERROR("Failed to register epoll event: %s", std::strerror(errno));
157     return Status::Internal();
158   }
159 
160   return OkStatus();
161 }
162 
NativeUnregisterFileDescriptor(int fd)163 Status Dispatcher::NativeUnregisterFileDescriptor(int fd) {
164   epoll_event event;
165   event.data.fd = fd;
166   if (epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, &event) == -1) {
167     PW_LOG_ERROR("Failed to unregister epoll event: %s", std::strerror(errno));
168     return Status::Internal();
169   }
170 
171   auto fd_waker = std::find_if(fd_wakers_.begin(),
172                                fd_wakers_.end(),
173                                [fd](auto& f) { return f.fd == fd; });
174   if (fd_waker != fd_wakers_.end()) {
175     fd_wakers_.erase(fd_waker);
176   }
177 
178   return OkStatus();
179 }
180 
NativeFindAndWakeFileDescriptor(int fd,FileDescriptorType type)181 void Dispatcher::NativeFindAndWakeFileDescriptor(int fd,
182                                                  FileDescriptorType type) {
183   auto fd_waker =
184       std::find_if(fd_wakers_.begin(), fd_wakers_.end(), [fd, type](auto& f) {
185         return f.fd == fd && f.type == type;
186       });
187   if (fd_waker == fd_wakers_.end()) {
188     PW_LOG_WARN(
189         "Received an event for registered file descriptor %d, but there is no "
190         "task to wake",
191         fd);
192     return;
193   }
194 
195   std::move(fd_waker->waker).Wake();
196   fd_wakers_.erase(fd_waker);
197 }
198 
DoWake()199 void Dispatcher::DoWake() {
200   // Perform a write to unblock the waiting dispatcher.
201   ssize_t bytes_written = write(notify_fd_, &kNotificationSignal, 1);
202   PW_CHECK_INT_EQ(
203       bytes_written, 1, "Dispatcher failed to write wake notification");
204 }
205 
206 }  // namespace pw::async2
207