• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2019 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "os/reactor.h"
18 
19 #include <fcntl.h>
20 #include <sys/epoll.h>
21 #include <sys/eventfd.h>
22 #include <unistd.h>
23 
24 #include <algorithm>
25 #include <cerrno>
26 #include <cinttypes>
27 #include <cstring>
28 
29 #include "os/log.h"
30 
31 namespace {
32 
33 // Use at most sizeof(epoll_event) * kEpollMaxEvents kernel memory
34 constexpr int kEpollMaxEvents = 64;
35 constexpr uint64_t kStopReactor = 1 << 0;
36 constexpr uint64_t kWaitForIdle = 1 << 1;
37 
38 }  // namespace
39 
40 namespace bluetooth {
41 namespace os {
42 using common::Closure;
43 
44 struct Reactor::Event::impl {
implbluetooth::os::Reactor::Event::impl45   impl() {
46     fd_ = eventfd(0, EFD_SEMAPHORE | EFD_NONBLOCK);
47     ASSERT_LOG(fd_ != -1, "Unable to create nonblocking event file descriptor semaphore");
48   }
~implbluetooth::os::Reactor::Event::impl49   ~impl() {
50     ASSERT_LOG(fd_ != -1, "Unable to close a never-opened event file descriptor");
51     close(fd_);
52     fd_ = -1;
53   }
54   int fd_ = -1;
55 };
56 
Event()57 Reactor::Event::Event() : pimpl_(new impl()) {}
~Event()58 Reactor::Event::~Event() {
59   delete pimpl_;
60 }
61 
Read()62 bool Reactor::Event::Read() {
63   uint64_t val = 0;
64   return eventfd_read(pimpl_->fd_, &val) == 0;
65 }
Id() const66 int Reactor::Event::Id() const {
67   return pimpl_->fd_;
68 }
Clear()69 void Reactor::Event::Clear() {
70   uint64_t val;
71   while (eventfd_read(pimpl_->fd_, &val) == 0) {
72   }
73 }
Close()74 void Reactor::Event::Close() {
75   int close_status;
76   RUN_NO_INTR(close_status = close(pimpl_->fd_));
77   ASSERT(close_status != -1);
78 }
Notify()79 void Reactor::Event::Notify() {
80   uint64_t val = 1;
81   auto write_result = eventfd_write(pimpl_->fd_, val);
82   ASSERT(write_result != -1);
83 }
84 
85 class Reactor::Reactable {
86  public:
Reactable(int fd,Closure on_read_ready,Closure on_write_ready)87   Reactable(int fd, Closure on_read_ready, Closure on_write_ready)
88       : fd_(fd),
89         on_read_ready_(std::move(on_read_ready)),
90         on_write_ready_(std::move(on_write_ready)),
91         is_executing_(false),
92         removed_(false) {}
93   const int fd_;
94   Closure on_read_ready_;
95   Closure on_write_ready_;
96   bool is_executing_;
97   bool removed_;
98   std::mutex mutex_;
99   std::unique_ptr<std::promise<void>> finished_promise_;
100 };
101 
Reactor()102 Reactor::Reactor() : epoll_fd_(0), control_fd_(0), is_running_(false) {
103   RUN_NO_INTR(epoll_fd_ = epoll_create1(EPOLL_CLOEXEC));
104   ASSERT_LOG(epoll_fd_ != -1, "could not create epoll fd: %s", strerror(errno));
105 
106   control_fd_ = eventfd(0, EFD_NONBLOCK);
107   ASSERT(control_fd_ != -1);
108 
109   epoll_event control_epoll_event = {EPOLLIN, {.ptr = nullptr}};
110   int result;
111   RUN_NO_INTR(result = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, control_fd_, &control_epoll_event));
112   ASSERT(result != -1);
113 }
114 
~Reactor()115 Reactor::~Reactor() {
116   int result;
117   RUN_NO_INTR(result = epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, control_fd_, nullptr));
118   ASSERT(result != -1);
119 
120   RUN_NO_INTR(result = close(control_fd_));
121   ASSERT(result != -1);
122 
123   RUN_NO_INTR(result = close(epoll_fd_));
124   ASSERT(result != -1);
125 }
126 
Run()127 void Reactor::Run() {
128   bool already_running = is_running_.exchange(true);
129   ASSERT(!already_running);
130 
131   int timeout_ms = -1;
132   bool waiting_for_idle = false;
133   for (;;) {
134     {
135       std::unique_lock<std::mutex> lock(mutex_);
136       invalidation_list_.clear();
137     }
138     epoll_event events[kEpollMaxEvents];
139     int count;
140     RUN_NO_INTR(count = epoll_wait(epoll_fd_, events, kEpollMaxEvents, timeout_ms));
141     ASSERT(count != -1);
142     if (waiting_for_idle && count == 0) {
143       timeout_ms = -1;
144       waiting_for_idle = false;
145       std::scoped_lock<std::mutex> lock(mutex_);
146       idle_promise_->set_value();
147       idle_promise_ = nullptr;
148     }
149 
150     for (int i = 0; i < count; ++i) {
151       auto event = events[i];
152       ASSERT(event.events != 0u);
153 
154       // If the ptr stored in epoll_event.data is nullptr, it means the control fd triggered
155       if (event.data.ptr == nullptr) {
156         uint64_t value;
157         eventfd_read(control_fd_, &value);
158         if ((value & kStopReactor) != 0) {
159           is_running_ = false;
160           return;
161         } else if ((value & kWaitForIdle) != 0) {
162           timeout_ms = 30;
163           waiting_for_idle = true;
164           continue;
165         } else {
166           LOG_ERROR("Unknown control_fd value %" PRIu64 "x", value);
167           continue;
168         }
169       }
170       auto* reactable = static_cast<Reactor::Reactable*>(event.data.ptr);
171       std::unique_lock<std::mutex> lock(mutex_);
172       executing_reactable_finished_ = nullptr;
173       // See if this reactable has been removed in the meantime.
174       if (std::find(invalidation_list_.begin(), invalidation_list_.end(), reactable) != invalidation_list_.end()) {
175         continue;
176       }
177 
178       {
179         std::lock_guard<std::mutex> reactable_lock(reactable->mutex_);
180         lock.unlock();
181         reactable->is_executing_ = true;
182       }
183       if (event.events & (EPOLLIN | EPOLLHUP | EPOLLRDHUP | EPOLLERR) && !reactable->on_read_ready_.is_null()) {
184         reactable->on_read_ready_.Run();
185       }
186       if (event.events & EPOLLOUT && !reactable->on_write_ready_.is_null()) {
187         reactable->on_write_ready_.Run();
188       }
189       {
190         std::unique_lock<std::mutex> reactable_lock(reactable->mutex_);
191         reactable->is_executing_ = false;
192         if (reactable->removed_) {
193           reactable->finished_promise_->set_value();
194           reactable_lock.unlock();
195           delete reactable;
196         }
197       }
198     }
199   }
200 }
201 
Stop()202 void Reactor::Stop() {
203   if (!is_running_) {
204     LOG_WARN("not running, will stop once it's started");
205   }
206   auto control = eventfd_write(control_fd_, kStopReactor);
207   ASSERT(control != -1);
208 }
209 
NewEvent() const210 std::unique_ptr<Reactor::Event> Reactor::NewEvent() const {
211   return std::make_unique<Reactor::Event>();
212 }
213 
Register(int fd,Closure on_read_ready,Closure on_write_ready)214 Reactor::Reactable* Reactor::Register(int fd, Closure on_read_ready, Closure on_write_ready) {
215   uint32_t poll_event_type = 0;
216   if (!on_read_ready.is_null()) {
217     poll_event_type |= (EPOLLIN | EPOLLRDHUP);
218   }
219   if (!on_write_ready.is_null()) {
220     poll_event_type |= EPOLLOUT;
221   }
222   auto* reactable = new Reactable(fd, on_read_ready, on_write_ready);
223   epoll_event event = {
224       .events = poll_event_type,
225       .data = {.ptr = reactable},
226   };
227   int register_fd;
228   RUN_NO_INTR(register_fd = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &event));
229   ASSERT(register_fd != -1);
230   return reactable;
231 }
232 
Unregister(Reactor::Reactable * reactable)233 void Reactor::Unregister(Reactor::Reactable* reactable) {
234   ASSERT(reactable != nullptr);
235   {
236     std::lock_guard<std::mutex> lock(mutex_);
237     invalidation_list_.push_back(reactable);
238   }
239   bool delaying_delete_until_callback_finished = false;
240   {
241     int result;
242     std::lock_guard<std::mutex> reactable_lock(reactable->mutex_);
243     RUN_NO_INTR(result = epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, reactable->fd_, nullptr));
244     if (result == -1 && errno == ENOENT) {
245       LOG_INFO("reactable is invalid or unregistered");
246     } else {
247       ASSERT(result != -1);
248     }
249 
250     // If we are unregistering during the callback event from this reactable, we delete it after the callback is
251     // executed. reactable->is_executing_ is protected by reactable->mutex_, so it's thread safe.
252     if (reactable->is_executing_) {
253       reactable->removed_ = true;
254       reactable->finished_promise_ = std::make_unique<std::promise<void>>();
255       executing_reactable_finished_ = std::make_shared<std::future<void>>(reactable->finished_promise_->get_future());
256       delaying_delete_until_callback_finished = true;
257     }
258   }
259   // If we are unregistering outside of the callback event from this reactable, we delete it now
260   if (!delaying_delete_until_callback_finished) {
261     delete reactable;
262   }
263 }
264 
WaitForUnregisteredReactable(std::chrono::milliseconds timeout)265 bool Reactor::WaitForUnregisteredReactable(std::chrono::milliseconds timeout) {
266   std::lock_guard<std::mutex> lock(mutex_);
267   if (executing_reactable_finished_ == nullptr) {
268     return true;
269   }
270   auto stop_status = executing_reactable_finished_->wait_for(timeout);
271   if (stop_status != std::future_status::ready) {
272     LOG_ERROR("Unregister reactable timed out");
273   }
274   return stop_status == std::future_status::ready;
275 }
276 
WaitForIdle(std::chrono::milliseconds timeout)277 bool Reactor::WaitForIdle(std::chrono::milliseconds timeout) {
278   auto promise = std::make_shared<std::promise<void>>();
279   auto future = std::make_unique<std::future<void>>(promise->get_future());
280   {
281     std::lock_guard<std::mutex> lock(mutex_);
282     idle_promise_ = promise;
283   }
284 
285   auto control = eventfd_write(control_fd_, kWaitForIdle);
286   ASSERT(control != -1);
287 
288   auto idle_status = future->wait_for(timeout);
289   return idle_status == std::future_status::ready;
290 }
291 
ModifyRegistration(Reactor::Reactable * reactable,ReactOn react_on)292 void Reactor::ModifyRegistration(Reactor::Reactable* reactable, ReactOn react_on) {
293   ASSERT(reactable != nullptr);
294 
295   uint32_t poll_event_type = 0;
296   if (react_on == REACT_ON_READ_ONLY || react_on == REACT_ON_READ_WRITE) {
297     poll_event_type |= (EPOLLIN | EPOLLRDHUP);
298   }
299   if (react_on == REACT_ON_WRITE_ONLY || react_on == REACT_ON_READ_WRITE) {
300     poll_event_type |= EPOLLOUT;
301   }
302   epoll_event event = {
303       .events = poll_event_type,
304       .data = {.ptr = reactable},
305   };
306   int modify_fd;
307   RUN_NO_INTR(modify_fd = epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, reactable->fd_, &event));
308   ASSERT(modify_fd != -1);
309 }
310 
311 }  // namespace os
312 }  // namespace bluetooth
313