1 /*
2 * Copyright 2019 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "os/reactor.h"
18
19 #include <fcntl.h>
20 #include <sys/epoll.h>
21 #include <sys/eventfd.h>
22 #include <unistd.h>
23 #include <algorithm>
24 #include <cerrno>
25 #include <cstring>
26
27 #include "os/log.h"
28
namespace {

// Capacity of the event buffer handed to epoll_wait(): one wake-up reports at most this many
// ready events, which bounds the buffer at sizeof(epoll_event) * kEpollMaxEvents bytes.
constexpr int kEpollMaxEvents{64};

}  // namespace
35
36 namespace bluetooth {
37 namespace os {
38
39 class Reactor::Reactable {
40 public:
Reactable(int fd,Closure on_read_ready,Closure on_write_ready)41 Reactable(int fd, Closure on_read_ready, Closure on_write_ready)
42 : fd_(fd), on_read_ready_(std::move(on_read_ready)), on_write_ready_(std::move(on_write_ready)),
43 is_executing_(false), removed_(false) {}
44 const int fd_;
45 Closure on_read_ready_;
46 Closure on_write_ready_;
47 bool is_executing_;
48 bool removed_;
49 std::mutex mutex_;
50 std::unique_ptr<std::promise<void>> finished_promise_;
51 };
52
Reactor()53 Reactor::Reactor()
54 : epoll_fd_(0),
55 control_fd_(0),
56 is_running_(false) {
57 RUN_NO_INTR(epoll_fd_ = epoll_create1(EPOLL_CLOEXEC));
58 ASSERT_LOG(epoll_fd_ != -1, "could not create epoll fd: %s", strerror(errno));
59
60 control_fd_ = eventfd(0, EFD_NONBLOCK);
61 ASSERT(control_fd_ != -1);
62
63 epoll_event control_epoll_event = {EPOLLIN, {.ptr = nullptr}};
64 int result;
65 RUN_NO_INTR(result = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, control_fd_, &control_epoll_event));
66 ASSERT(result != -1);
67 }
68
~Reactor()69 Reactor::~Reactor() {
70 int result;
71 RUN_NO_INTR(result = epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, control_fd_, nullptr));
72 ASSERT(result != -1);
73
74 RUN_NO_INTR(result = close(control_fd_));
75 ASSERT(result != -1);
76
77 RUN_NO_INTR(result = close(epoll_fd_));
78 ASSERT(result != -1);
79 }
80
Run()81 void Reactor::Run() {
82 bool already_running = is_running_.exchange(true);
83 ASSERT(!already_running);
84
85 for (;;) {
86 {
87 std::unique_lock<std::mutex> lock(mutex_);
88 invalidation_list_.clear();
89 }
90 epoll_event events[kEpollMaxEvents];
91 int count;
92 RUN_NO_INTR(count = epoll_wait(epoll_fd_, events, kEpollMaxEvents, -1));
93 ASSERT(count != -1);
94
95 for (int i = 0; i < count; ++i) {
96 auto event = events[i];
97 ASSERT(event.events != 0u);
98
99 // If the ptr stored in epoll_event.data is nullptr, it means the control fd triggered
100 if (event.data.ptr == nullptr) {
101 uint64_t value;
102 eventfd_read(control_fd_, &value);
103 is_running_ = false;
104 return;
105 }
106 auto* reactable = static_cast<Reactor::Reactable*>(event.data.ptr);
107 std::unique_lock<std::mutex> lock(mutex_);
108 // See if this reactable has been removed in the meantime.
109 if (std::find(invalidation_list_.begin(), invalidation_list_.end(), reactable) != invalidation_list_.end()) {
110 continue;
111 }
112
113 {
114 std::lock_guard<std::mutex> reactable_lock(reactable->mutex_);
115 lock.unlock();
116 reactable->is_executing_ = true;
117 }
118 if (event.events & (EPOLLIN | EPOLLHUP | EPOLLRDHUP | EPOLLERR) && !reactable->on_read_ready_.is_null()) {
119 reactable->on_read_ready_.Run();
120 }
121 if (event.events & EPOLLOUT && !reactable->on_write_ready_.is_null()) {
122 reactable->on_write_ready_.Run();
123 }
124 {
125 std::lock_guard<std::mutex> reactable_lock(reactable->mutex_);
126 reactable->is_executing_ = false;
127 if (reactable->removed_) {
128 reactable->finished_promise_->set_value();
129 delete reactable;
130 }
131 }
132 }
133 }
134 }
135
Stop()136 void Reactor::Stop() {
137 if (!is_running_) {
138 LOG_WARN("not running, will stop once it's started");
139 }
140 auto control = eventfd_write(control_fd_, 1);
141 ASSERT(control != -1);
142 }
143
Register(int fd,Closure on_read_ready,Closure on_write_ready)144 Reactor::Reactable* Reactor::Register(int fd, Closure on_read_ready, Closure on_write_ready) {
145 uint32_t poll_event_type = 0;
146 if (!on_read_ready.is_null()) {
147 poll_event_type |= (EPOLLIN | EPOLLRDHUP);
148 }
149 if (!on_write_ready.is_null()) {
150 poll_event_type |= EPOLLOUT;
151 }
152 auto* reactable = new Reactable(fd, on_read_ready, on_write_ready);
153 epoll_event event = {
154 .events = poll_event_type,
155 .data = {.ptr = reactable},
156 };
157 int register_fd;
158 RUN_NO_INTR(register_fd = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &event));
159 ASSERT(register_fd != -1);
160 return reactable;
161 }
162
Unregister(Reactor::Reactable * reactable)163 void Reactor::Unregister(Reactor::Reactable* reactable) {
164 ASSERT(reactable != nullptr);
165 {
166 std::lock_guard<std::mutex> lock(mutex_);
167 invalidation_list_.push_back(reactable);
168 }
169 bool delaying_delete_until_callback_finished = false;
170 {
171 int result;
172 std::lock_guard<std::mutex> reactable_lock(reactable->mutex_);
173 RUN_NO_INTR(result = epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, reactable->fd_, nullptr));
174 if (result == -1 && errno == ENOENT) {
175 LOG_INFO("reactable is invalid or unregistered");
176 } else {
177 ASSERT(result != -1);
178 }
179
180 // If we are unregistering during the callback event from this reactable, we delete it after the callback is
181 // executed. reactable->is_executing_ is protected by reactable->mutex_, so it's thread safe.
182 if (reactable->is_executing_) {
183 reactable->removed_ = true;
184 reactable->finished_promise_ = std::make_unique<std::promise<void>>();
185 executing_reactable_finished_ = std::make_unique<std::future<void>>(reactable->finished_promise_->get_future());
186 delaying_delete_until_callback_finished = true;
187 }
188 }
189 // If we are unregistering outside of the callback event from this reactable, we delete it now
190 if (!delaying_delete_until_callback_finished) {
191 delete reactable;
192 }
193 }
194
WaitForUnregisteredReactable(std::chrono::milliseconds timeout)195 bool Reactor::WaitForUnregisteredReactable(std::chrono::milliseconds timeout) {
196 if (executing_reactable_finished_ == nullptr) {
197 return true;
198 }
199 auto stop_status = executing_reactable_finished_->wait_for(timeout);
200 return stop_status == std::future_status::ready;
201 }
202
ModifyRegistration(Reactor::Reactable * reactable,Closure on_read_ready,Closure on_write_ready)203 void Reactor::ModifyRegistration(Reactor::Reactable* reactable, Closure on_read_ready, Closure on_write_ready) {
204 ASSERT(reactable != nullptr);
205
206 uint32_t poll_event_type = 0;
207 if (!on_read_ready.is_null()) {
208 poll_event_type |= (EPOLLIN | EPOLLRDHUP);
209 }
210 if (!on_write_ready.is_null()) {
211 poll_event_type |= EPOLLOUT;
212 }
213 {
214 std::lock_guard<std::mutex> reactable_lock(reactable->mutex_);
215 reactable->on_read_ready_ = std::move(on_read_ready);
216 reactable->on_write_ready_ = std::move(on_write_ready);
217 }
218 epoll_event event = {
219 .events = poll_event_type,
220 .data = {.ptr = reactable},
221 };
222 int modify_fd;
223 RUN_NO_INTR(modify_fd = epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, reactable->fd_, &event));
224 ASSERT(modify_fd != -1);
225 }
226
227 } // namespace os
228 } // namespace bluetooth
229