1 /*
2 * Copyright 2019 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "os/reactor.h"
18
19 #include <fcntl.h>
20 #include <sys/epoll.h>
21 #include <sys/eventfd.h>
22 #include <unistd.h>
23
24 #include <algorithm>
25 #include <cerrno>
26 #include <cinttypes>
27 #include <cstring>
28
29 #include "os/log.h"
30
namespace {

// Maximum number of events requested from a single epoll_wait() call; it also sizes the epoll_event buffer
// (sizeof(epoll_event) * kEpollMaxEvents bytes).
constexpr int kEpollMaxEvents{64};

// Bit flags written to the reactor's control eventfd.
constexpr uint64_t kStopReactor{uint64_t{1} << 0};
constexpr uint64_t kWaitForIdle{uint64_t{1} << 1};

}  // namespace
39
40 namespace bluetooth {
41 namespace os {
42 using common::Closure;
43
44 struct Reactor::Event::impl {
implbluetooth::os::Reactor::Event::impl45 impl() {
46 fd_ = eventfd(0, EFD_SEMAPHORE | EFD_NONBLOCK);
47 ASSERT_LOG(fd_ != -1, "Unable to create nonblocking event file descriptor semaphore");
48 }
~implbluetooth::os::Reactor::Event::impl49 ~impl() {
50 ASSERT_LOG(fd_ != -1, "Unable to close a never-opened event file descriptor");
51 close(fd_);
52 fd_ = -1;
53 }
54 int fd_ = -1;
55 };
56
Event()57 Reactor::Event::Event() : pimpl_(new impl()) {}
~Event()58 Reactor::Event::~Event() {
59 delete pimpl_;
60 }
61
Read()62 bool Reactor::Event::Read() {
63 uint64_t val = 0;
64 return eventfd_read(pimpl_->fd_, &val) == 0;
65 }
Id() const66 int Reactor::Event::Id() const {
67 return pimpl_->fd_;
68 }
Clear()69 void Reactor::Event::Clear() {
70 uint64_t val;
71 while (eventfd_read(pimpl_->fd_, &val) == 0) {
72 }
73 }
Close()74 void Reactor::Event::Close() {
75 int close_status;
76 RUN_NO_INTR(close_status = close(pimpl_->fd_));
77 ASSERT(close_status != -1);
78 }
Notify()79 void Reactor::Event::Notify() {
80 uint64_t val = 1;
81 auto write_result = eventfd_write(pimpl_->fd_, val);
82 ASSERT(write_result != -1);
83 }
84
85 class Reactor::Reactable {
86 public:
Reactable(int fd,Closure on_read_ready,Closure on_write_ready)87 Reactable(int fd, Closure on_read_ready, Closure on_write_ready)
88 : fd_(fd),
89 on_read_ready_(std::move(on_read_ready)),
90 on_write_ready_(std::move(on_write_ready)),
91 is_executing_(false),
92 removed_(false) {}
93 const int fd_;
94 Closure on_read_ready_;
95 Closure on_write_ready_;
96 bool is_executing_;
97 bool removed_;
98 std::mutex mutex_;
99 std::unique_ptr<std::promise<void>> finished_promise_;
100 };
101
Reactor()102 Reactor::Reactor() : epoll_fd_(0), control_fd_(0), is_running_(false) {
103 RUN_NO_INTR(epoll_fd_ = epoll_create1(EPOLL_CLOEXEC));
104 ASSERT_LOG(epoll_fd_ != -1, "could not create epoll fd: %s", strerror(errno));
105
106 control_fd_ = eventfd(0, EFD_NONBLOCK);
107 ASSERT(control_fd_ != -1);
108
109 epoll_event control_epoll_event = {EPOLLIN, {.ptr = nullptr}};
110 int result;
111 RUN_NO_INTR(result = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, control_fd_, &control_epoll_event));
112 ASSERT(result != -1);
113 }
114
~Reactor()115 Reactor::~Reactor() {
116 int result;
117 RUN_NO_INTR(result = epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, control_fd_, nullptr));
118 ASSERT(result != -1);
119
120 RUN_NO_INTR(result = close(control_fd_));
121 ASSERT(result != -1);
122
123 RUN_NO_INTR(result = close(epoll_fd_));
124 ASSERT(result != -1);
125 }
126
Run()127 void Reactor::Run() {
128 bool already_running = is_running_.exchange(true);
129 ASSERT(!already_running);
130
131 int timeout_ms = -1;
132 bool waiting_for_idle = false;
133 for (;;) {
134 {
135 std::unique_lock<std::mutex> lock(mutex_);
136 invalidation_list_.clear();
137 }
138 epoll_event events[kEpollMaxEvents];
139 int count;
140 RUN_NO_INTR(count = epoll_wait(epoll_fd_, events, kEpollMaxEvents, timeout_ms));
141 ASSERT(count != -1);
142 if (waiting_for_idle && count == 0) {
143 timeout_ms = -1;
144 waiting_for_idle = false;
145 idle_promise_->set_value();
146 idle_promise_ = nullptr;
147 }
148
149 for (int i = 0; i < count; ++i) {
150 auto event = events[i];
151 ASSERT(event.events != 0u);
152
153 // If the ptr stored in epoll_event.data is nullptr, it means the control fd triggered
154 if (event.data.ptr == nullptr) {
155 uint64_t value;
156 eventfd_read(control_fd_, &value);
157 if ((value & kStopReactor) != 0) {
158 is_running_ = false;
159 return;
160 } else if ((value & kWaitForIdle) != 0) {
161 timeout_ms = 30;
162 waiting_for_idle = true;
163 continue;
164 } else {
165 LOG_ERROR("Unknown control_fd value %" PRIu64 "x", value);
166 continue;
167 }
168 }
169 auto* reactable = static_cast<Reactor::Reactable*>(event.data.ptr);
170 std::unique_lock<std::mutex> lock(mutex_);
171 executing_reactable_finished_ = nullptr;
172 // See if this reactable has been removed in the meantime.
173 if (std::find(invalidation_list_.begin(), invalidation_list_.end(), reactable) != invalidation_list_.end()) {
174 continue;
175 }
176
177 {
178 std::lock_guard<std::mutex> reactable_lock(reactable->mutex_);
179 lock.unlock();
180 reactable->is_executing_ = true;
181 }
182 if (event.events & (EPOLLIN | EPOLLHUP | EPOLLRDHUP | EPOLLERR) && !reactable->on_read_ready_.is_null()) {
183 reactable->on_read_ready_.Run();
184 }
185 if (event.events & EPOLLOUT && !reactable->on_write_ready_.is_null()) {
186 reactable->on_write_ready_.Run();
187 }
188 {
189 std::unique_lock<std::mutex> reactable_lock(reactable->mutex_);
190 reactable->is_executing_ = false;
191 if (reactable->removed_) {
192 reactable->finished_promise_->set_value();
193 reactable_lock.unlock();
194 delete reactable;
195 }
196 }
197 }
198 }
199 }
200
Stop()201 void Reactor::Stop() {
202 if (!is_running_) {
203 LOG_WARN("not running, will stop once it's started");
204 }
205 auto control = eventfd_write(control_fd_, kStopReactor);
206 ASSERT(control != -1);
207 }
208
NewEvent() const209 std::unique_ptr<Reactor::Event> Reactor::NewEvent() const {
210 return std::make_unique<Reactor::Event>();
211 }
212
Register(int fd,Closure on_read_ready,Closure on_write_ready)213 Reactor::Reactable* Reactor::Register(int fd, Closure on_read_ready, Closure on_write_ready) {
214 uint32_t poll_event_type = 0;
215 if (!on_read_ready.is_null()) {
216 poll_event_type |= (EPOLLIN | EPOLLRDHUP);
217 }
218 if (!on_write_ready.is_null()) {
219 poll_event_type |= EPOLLOUT;
220 }
221 auto* reactable = new Reactable(fd, on_read_ready, on_write_ready);
222 epoll_event event = {
223 .events = poll_event_type,
224 .data = {.ptr = reactable},
225 };
226 int register_fd;
227 RUN_NO_INTR(register_fd = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &event));
228 ASSERT(register_fd != -1);
229 return reactable;
230 }
231
Unregister(Reactor::Reactable * reactable)232 void Reactor::Unregister(Reactor::Reactable* reactable) {
233 ASSERT(reactable != nullptr);
234 {
235 std::lock_guard<std::mutex> lock(mutex_);
236 invalidation_list_.push_back(reactable);
237 }
238 bool delaying_delete_until_callback_finished = false;
239 {
240 int result;
241 std::lock_guard<std::mutex> reactable_lock(reactable->mutex_);
242 RUN_NO_INTR(result = epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, reactable->fd_, nullptr));
243 if (result == -1 && errno == ENOENT) {
244 LOG_INFO("reactable is invalid or unregistered");
245 } else {
246 ASSERT(result != -1);
247 }
248
249 // If we are unregistering during the callback event from this reactable, we delete it after the callback is
250 // executed. reactable->is_executing_ is protected by reactable->mutex_, so it's thread safe.
251 if (reactable->is_executing_) {
252 reactable->removed_ = true;
253 reactable->finished_promise_ = std::make_unique<std::promise<void>>();
254 executing_reactable_finished_ = std::make_shared<std::future<void>>(reactable->finished_promise_->get_future());
255 delaying_delete_until_callback_finished = true;
256 }
257 }
258 // If we are unregistering outside of the callback event from this reactable, we delete it now
259 if (!delaying_delete_until_callback_finished) {
260 delete reactable;
261 }
262 }
263
WaitForUnregisteredReactable(std::chrono::milliseconds timeout)264 bool Reactor::WaitForUnregisteredReactable(std::chrono::milliseconds timeout) {
265 std::lock_guard<std::mutex> lock(mutex_);
266 if (executing_reactable_finished_ == nullptr) {
267 return true;
268 }
269 auto stop_status = executing_reactable_finished_->wait_for(timeout);
270 if (stop_status != std::future_status::ready) {
271 LOG_ERROR("Unregister reactable timed out");
272 }
273 return stop_status == std::future_status::ready;
274 }
275
WaitForIdle(std::chrono::milliseconds timeout)276 bool Reactor::WaitForIdle(std::chrono::milliseconds timeout) {
277 auto promise = std::make_shared<std::promise<void>>();
278 auto future = std::make_unique<std::future<void>>(promise->get_future());
279 {
280 std::lock_guard<std::mutex> lock(mutex_);
281 idle_promise_ = promise;
282 }
283
284 auto control = eventfd_write(control_fd_, kWaitForIdle);
285 ASSERT(control != -1);
286
287 auto idle_status = future->wait_for(timeout);
288 return idle_status == std::future_status::ready;
289 }
290
ModifyRegistration(Reactor::Reactable * reactable,Closure on_read_ready,Closure on_write_ready)291 void Reactor::ModifyRegistration(Reactor::Reactable* reactable, Closure on_read_ready, Closure on_write_ready) {
292 ASSERT(reactable != nullptr);
293
294 uint32_t poll_event_type = 0;
295 if (!on_read_ready.is_null()) {
296 poll_event_type |= (EPOLLIN | EPOLLRDHUP);
297 }
298 if (!on_write_ready.is_null()) {
299 poll_event_type |= EPOLLOUT;
300 }
301 {
302 std::lock_guard<std::mutex> reactable_lock(reactable->mutex_);
303 reactable->on_read_ready_ = std::move(on_read_ready);
304 reactable->on_write_ready_ = std::move(on_write_ready);
305 }
306 epoll_event event = {
307 .events = poll_event_type,
308 .data = {.ptr = reactable},
309 };
310 int modify_fd;
311 RUN_NO_INTR(modify_fd = epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, reactable->fd_, &event));
312 ASSERT(modify_fd != -1);
313 }
314
315 } // namespace os
316 } // namespace bluetooth
317