1 /*
2 * Copyright 2019 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "os/reactor.h"
18
19 #include <fcntl.h>
20 #include <sys/epoll.h>
21 #include <sys/eventfd.h>
22 #include <unistd.h>
23
24 #include <algorithm>
25 #include <cerrno>
26 #include <cinttypes>
27 #include <cstring>
28
29 #include "os/log.h"
30
31 namespace {
32
33 // Use at most sizeof(epoll_event) * kEpollMaxEvents kernel memory
34 constexpr int kEpollMaxEvents = 64;
35 constexpr uint64_t kStopReactor = 1 << 0;
36 constexpr uint64_t kWaitForIdle = 1 << 1;
37
38 } // namespace
39
40 namespace bluetooth {
41 namespace os {
42 using common::Closure;
43
44 struct Reactor::Event::impl {
implbluetooth::os::Reactor::Event::impl45 impl() {
46 fd_ = eventfd(0, EFD_SEMAPHORE | EFD_NONBLOCK);
47 ASSERT_LOG(fd_ != -1, "Unable to create nonblocking event file descriptor semaphore");
48 }
~implbluetooth::os::Reactor::Event::impl49 ~impl() {
50 ASSERT_LOG(fd_ != -1, "Unable to close a never-opened event file descriptor");
51 close(fd_);
52 fd_ = -1;
53 }
54 int fd_ = -1;
55 };
56
Event()57 Reactor::Event::Event() : pimpl_(new impl()) {}
~Event()58 Reactor::Event::~Event() {
59 delete pimpl_;
60 }
61
Read()62 bool Reactor::Event::Read() {
63 uint64_t val = 0;
64 return eventfd_read(pimpl_->fd_, &val) == 0;
65 }
Id() const66 int Reactor::Event::Id() const {
67 return pimpl_->fd_;
68 }
Clear()69 void Reactor::Event::Clear() {
70 uint64_t val;
71 while (eventfd_read(pimpl_->fd_, &val) == 0) {
72 }
73 }
Close()74 void Reactor::Event::Close() {
75 int close_status;
76 RUN_NO_INTR(close_status = close(pimpl_->fd_));
77 ASSERT(close_status != -1);
78 }
Notify()79 void Reactor::Event::Notify() {
80 uint64_t val = 1;
81 auto write_result = eventfd_write(pimpl_->fd_, val);
82 ASSERT(write_result != -1);
83 }
84
85 class Reactor::Reactable {
86 public:
Reactable(int fd,Closure on_read_ready,Closure on_write_ready)87 Reactable(int fd, Closure on_read_ready, Closure on_write_ready)
88 : fd_(fd),
89 on_read_ready_(std::move(on_read_ready)),
90 on_write_ready_(std::move(on_write_ready)),
91 is_executing_(false),
92 removed_(false) {}
93 const int fd_;
94 Closure on_read_ready_;
95 Closure on_write_ready_;
96 bool is_executing_;
97 bool removed_;
98 std::mutex mutex_;
99 std::unique_ptr<std::promise<void>> finished_promise_;
100 };
101
Reactor()102 Reactor::Reactor() : epoll_fd_(0), control_fd_(0), is_running_(false) {
103 RUN_NO_INTR(epoll_fd_ = epoll_create1(EPOLL_CLOEXEC));
104 ASSERT_LOG(epoll_fd_ != -1, "could not create epoll fd: %s", strerror(errno));
105
106 control_fd_ = eventfd(0, EFD_NONBLOCK);
107 ASSERT(control_fd_ != -1);
108
109 epoll_event control_epoll_event = {EPOLLIN, {.ptr = nullptr}};
110 int result;
111 RUN_NO_INTR(result = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, control_fd_, &control_epoll_event));
112 ASSERT(result != -1);
113 }
114
~Reactor()115 Reactor::~Reactor() {
116 int result;
117 RUN_NO_INTR(result = epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, control_fd_, nullptr));
118 ASSERT(result != -1);
119
120 RUN_NO_INTR(result = close(control_fd_));
121 ASSERT(result != -1);
122
123 RUN_NO_INTR(result = close(epoll_fd_));
124 ASSERT(result != -1);
125 }
126
Run()127 void Reactor::Run() {
128 bool already_running = is_running_.exchange(true);
129 ASSERT(!already_running);
130
131 int timeout_ms = -1;
132 bool waiting_for_idle = false;
133 for (;;) {
134 {
135 std::unique_lock<std::mutex> lock(mutex_);
136 invalidation_list_.clear();
137 }
138 epoll_event events[kEpollMaxEvents];
139 int count;
140 RUN_NO_INTR(count = epoll_wait(epoll_fd_, events, kEpollMaxEvents, timeout_ms));
141 ASSERT(count != -1);
142 if (waiting_for_idle && count == 0) {
143 timeout_ms = -1;
144 waiting_for_idle = false;
145 std::scoped_lock<std::mutex> lock(mutex_);
146 idle_promise_->set_value();
147 idle_promise_ = nullptr;
148 }
149
150 for (int i = 0; i < count; ++i) {
151 auto event = events[i];
152 ASSERT(event.events != 0u);
153
154 // If the ptr stored in epoll_event.data is nullptr, it means the control fd triggered
155 if (event.data.ptr == nullptr) {
156 uint64_t value;
157 eventfd_read(control_fd_, &value);
158 if ((value & kStopReactor) != 0) {
159 is_running_ = false;
160 return;
161 } else if ((value & kWaitForIdle) != 0) {
162 timeout_ms = 30;
163 waiting_for_idle = true;
164 continue;
165 } else {
166 LOG_ERROR("Unknown control_fd value %" PRIu64 "x", value);
167 continue;
168 }
169 }
170 auto* reactable = static_cast<Reactor::Reactable*>(event.data.ptr);
171 std::unique_lock<std::mutex> lock(mutex_);
172 executing_reactable_finished_ = nullptr;
173 // See if this reactable has been removed in the meantime.
174 if (std::find(invalidation_list_.begin(), invalidation_list_.end(), reactable) != invalidation_list_.end()) {
175 continue;
176 }
177
178 {
179 std::lock_guard<std::mutex> reactable_lock(reactable->mutex_);
180 lock.unlock();
181 reactable->is_executing_ = true;
182 }
183 if (event.events & (EPOLLIN | EPOLLHUP | EPOLLRDHUP | EPOLLERR) && !reactable->on_read_ready_.is_null()) {
184 reactable->on_read_ready_.Run();
185 }
186 if (event.events & EPOLLOUT && !reactable->on_write_ready_.is_null()) {
187 reactable->on_write_ready_.Run();
188 }
189 {
190 std::unique_lock<std::mutex> reactable_lock(reactable->mutex_);
191 reactable->is_executing_ = false;
192 if (reactable->removed_) {
193 reactable->finished_promise_->set_value();
194 reactable_lock.unlock();
195 delete reactable;
196 }
197 }
198 }
199 }
200 }
201
Stop()202 void Reactor::Stop() {
203 if (!is_running_) {
204 LOG_WARN("not running, will stop once it's started");
205 }
206 auto control = eventfd_write(control_fd_, kStopReactor);
207 ASSERT(control != -1);
208 }
209
NewEvent() const210 std::unique_ptr<Reactor::Event> Reactor::NewEvent() const {
211 return std::make_unique<Reactor::Event>();
212 }
213
Register(int fd,Closure on_read_ready,Closure on_write_ready)214 Reactor::Reactable* Reactor::Register(int fd, Closure on_read_ready, Closure on_write_ready) {
215 uint32_t poll_event_type = 0;
216 if (!on_read_ready.is_null()) {
217 poll_event_type |= (EPOLLIN | EPOLLRDHUP);
218 }
219 if (!on_write_ready.is_null()) {
220 poll_event_type |= EPOLLOUT;
221 }
222 auto* reactable = new Reactable(fd, on_read_ready, on_write_ready);
223 epoll_event event = {
224 .events = poll_event_type,
225 .data = {.ptr = reactable},
226 };
227 int register_fd;
228 RUN_NO_INTR(register_fd = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &event));
229 ASSERT(register_fd != -1);
230 return reactable;
231 }
232
Unregister(Reactor::Reactable * reactable)233 void Reactor::Unregister(Reactor::Reactable* reactable) {
234 ASSERT(reactable != nullptr);
235 {
236 std::lock_guard<std::mutex> lock(mutex_);
237 invalidation_list_.push_back(reactable);
238 }
239 bool delaying_delete_until_callback_finished = false;
240 {
241 int result;
242 std::lock_guard<std::mutex> reactable_lock(reactable->mutex_);
243 RUN_NO_INTR(result = epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, reactable->fd_, nullptr));
244 if (result == -1 && errno == ENOENT) {
245 LOG_INFO("reactable is invalid or unregistered");
246 } else {
247 ASSERT(result != -1);
248 }
249
250 // If we are unregistering during the callback event from this reactable, we delete it after the callback is
251 // executed. reactable->is_executing_ is protected by reactable->mutex_, so it's thread safe.
252 if (reactable->is_executing_) {
253 reactable->removed_ = true;
254 reactable->finished_promise_ = std::make_unique<std::promise<void>>();
255 executing_reactable_finished_ = std::make_shared<std::future<void>>(reactable->finished_promise_->get_future());
256 delaying_delete_until_callback_finished = true;
257 }
258 }
259 // If we are unregistering outside of the callback event from this reactable, we delete it now
260 if (!delaying_delete_until_callback_finished) {
261 delete reactable;
262 }
263 }
264
WaitForUnregisteredReactable(std::chrono::milliseconds timeout)265 bool Reactor::WaitForUnregisteredReactable(std::chrono::milliseconds timeout) {
266 std::lock_guard<std::mutex> lock(mutex_);
267 if (executing_reactable_finished_ == nullptr) {
268 return true;
269 }
270 auto stop_status = executing_reactable_finished_->wait_for(timeout);
271 if (stop_status != std::future_status::ready) {
272 LOG_ERROR("Unregister reactable timed out");
273 }
274 return stop_status == std::future_status::ready;
275 }
276
WaitForIdle(std::chrono::milliseconds timeout)277 bool Reactor::WaitForIdle(std::chrono::milliseconds timeout) {
278 auto promise = std::make_shared<std::promise<void>>();
279 auto future = std::make_unique<std::future<void>>(promise->get_future());
280 {
281 std::lock_guard<std::mutex> lock(mutex_);
282 idle_promise_ = promise;
283 }
284
285 auto control = eventfd_write(control_fd_, kWaitForIdle);
286 ASSERT(control != -1);
287
288 auto idle_status = future->wait_for(timeout);
289 return idle_status == std::future_status::ready;
290 }
291
ModifyRegistration(Reactor::Reactable * reactable,ReactOn react_on)292 void Reactor::ModifyRegistration(Reactor::Reactable* reactable, ReactOn react_on) {
293 ASSERT(reactable != nullptr);
294
295 uint32_t poll_event_type = 0;
296 if (react_on == REACT_ON_READ_ONLY || react_on == REACT_ON_READ_WRITE) {
297 poll_event_type |= (EPOLLIN | EPOLLRDHUP);
298 }
299 if (react_on == REACT_ON_WRITE_ONLY || react_on == REACT_ON_READ_WRITE) {
300 poll_event_type |= EPOLLOUT;
301 }
302 epoll_event event = {
303 .events = poll_event_type,
304 .data = {.ptr = reactable},
305 };
306 int modify_fd;
307 RUN_NO_INTR(modify_fd = epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, reactable->fd_, &event));
308 ASSERT(modify_fd != -1);
309 }
310
311 } // namespace os
312 } // namespace bluetooth
313