• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2019 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "os/reactor.h"
18 
19 #include <fcntl.h>
20 #include <sys/epoll.h>
21 #include <sys/eventfd.h>
22 #include <unistd.h>
23 
24 #include <algorithm>
25 #include <cerrno>
26 #include <cinttypes>
27 #include <cstring>
28 
29 #include "os/log.h"
30 
namespace {

// Capacity of the epoll_event buffer handed to epoll_wait(), i.e. the most events fetched per call
// (sizeof(epoll_event) * kEpollMaxEvents bytes).
constexpr int kEpollMaxEvents = 64;

// Request flags written to the reactor's control eventfd.
constexpr uint64_t kStopReactor = uint64_t{1} << 0;
constexpr uint64_t kWaitForIdle = uint64_t{1} << 1;

}  // namespace
39 
40 namespace bluetooth {
41 namespace os {
42 using common::Closure;
43 
44 class Reactor::Reactable {
45  public:
Reactable(int fd,Closure on_read_ready,Closure on_write_ready)46   Reactable(int fd, Closure on_read_ready, Closure on_write_ready)
47       : fd_(fd),
48         on_read_ready_(std::move(on_read_ready)),
49         on_write_ready_(std::move(on_write_ready)),
50         is_executing_(false),
51         removed_(false) {}
52   const int fd_;
53   Closure on_read_ready_;
54   Closure on_write_ready_;
55   bool is_executing_;
56   bool removed_;
57   std::mutex mutex_;
58   std::unique_ptr<std::promise<void>> finished_promise_;
59 };
60 
Reactor()61 Reactor::Reactor() : epoll_fd_(0), control_fd_(0), is_running_(false) {
62   RUN_NO_INTR(epoll_fd_ = epoll_create1(EPOLL_CLOEXEC));
63   ASSERT_LOG(epoll_fd_ != -1, "could not create epoll fd: %s", strerror(errno));
64 
65   control_fd_ = eventfd(0, EFD_NONBLOCK);
66   ASSERT(control_fd_ != -1);
67 
68   epoll_event control_epoll_event = {EPOLLIN, {.ptr = nullptr}};
69   int result;
70   RUN_NO_INTR(result = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, control_fd_, &control_epoll_event));
71   ASSERT(result != -1);
72 }
73 
~Reactor()74 Reactor::~Reactor() {
75   int result;
76   RUN_NO_INTR(result = epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, control_fd_, nullptr));
77   ASSERT(result != -1);
78 
79   RUN_NO_INTR(result = close(control_fd_));
80   ASSERT(result != -1);
81 
82   RUN_NO_INTR(result = close(epoll_fd_));
83   ASSERT(result != -1);
84 }
85 
Run()86 void Reactor::Run() {
87   bool already_running = is_running_.exchange(true);
88   ASSERT(!already_running);
89 
90   int timeout_ms = -1;
91   bool waiting_for_idle = false;
92   for (;;) {
93     {
94       std::unique_lock<std::mutex> lock(mutex_);
95       invalidation_list_.clear();
96     }
97     epoll_event events[kEpollMaxEvents];
98     int count;
99     RUN_NO_INTR(count = epoll_wait(epoll_fd_, events, kEpollMaxEvents, timeout_ms));
100     ASSERT(count != -1);
101     if (waiting_for_idle && count == 0) {
102       timeout_ms = -1;
103       waiting_for_idle = false;
104       idle_promise_->set_value();
105       idle_promise_ = nullptr;
106     }
107 
108     for (int i = 0; i < count; ++i) {
109       auto event = events[i];
110       ASSERT(event.events != 0u);
111 
112       // If the ptr stored in epoll_event.data is nullptr, it means the control fd triggered
113       if (event.data.ptr == nullptr) {
114         uint64_t value;
115         eventfd_read(control_fd_, &value);
116         if ((value & kStopReactor) != 0) {
117           is_running_ = false;
118           return;
119         } else if ((value & kWaitForIdle) != 0) {
120           timeout_ms = 30;
121           waiting_for_idle = true;
122           continue;
123         } else {
124           LOG_ERROR("Unknown control_fd value %" PRIu64 "x", value);
125           continue;
126         }
127       }
128       auto* reactable = static_cast<Reactor::Reactable*>(event.data.ptr);
129       std::unique_lock<std::mutex> lock(mutex_);
130       executing_reactable_finished_ = nullptr;
131       // See if this reactable has been removed in the meantime.
132       if (std::find(invalidation_list_.begin(), invalidation_list_.end(), reactable) != invalidation_list_.end()) {
133         continue;
134       }
135 
136       {
137         std::lock_guard<std::mutex> reactable_lock(reactable->mutex_);
138         lock.unlock();
139         reactable->is_executing_ = true;
140       }
141       if (event.events & (EPOLLIN | EPOLLHUP | EPOLLRDHUP | EPOLLERR) && !reactable->on_read_ready_.is_null()) {
142         reactable->on_read_ready_.Run();
143       }
144       if (event.events & EPOLLOUT && !reactable->on_write_ready_.is_null()) {
145         reactable->on_write_ready_.Run();
146       }
147       {
148         std::unique_lock<std::mutex> reactable_lock(reactable->mutex_);
149         reactable->is_executing_ = false;
150         if (reactable->removed_) {
151           reactable->finished_promise_->set_value();
152           reactable_lock.unlock();
153           delete reactable;
154         }
155       }
156     }
157   }
158 }
159 
Stop()160 void Reactor::Stop() {
161   if (!is_running_) {
162     LOG_WARN("not running, will stop once it's started");
163   }
164   auto control = eventfd_write(control_fd_, kStopReactor);
165   ASSERT(control != -1);
166 }
167 
Register(int fd,Closure on_read_ready,Closure on_write_ready)168 Reactor::Reactable* Reactor::Register(int fd, Closure on_read_ready, Closure on_write_ready) {
169   uint32_t poll_event_type = 0;
170   if (!on_read_ready.is_null()) {
171     poll_event_type |= (EPOLLIN | EPOLLRDHUP);
172   }
173   if (!on_write_ready.is_null()) {
174     poll_event_type |= EPOLLOUT;
175   }
176   auto* reactable = new Reactable(fd, on_read_ready, on_write_ready);
177   epoll_event event = {
178       .events = poll_event_type,
179       .data = {.ptr = reactable},
180   };
181   int register_fd;
182   RUN_NO_INTR(register_fd = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &event));
183   ASSERT(register_fd != -1);
184   return reactable;
185 }
186 
Unregister(Reactor::Reactable * reactable)187 void Reactor::Unregister(Reactor::Reactable* reactable) {
188   ASSERT(reactable != nullptr);
189   {
190     std::lock_guard<std::mutex> lock(mutex_);
191     invalidation_list_.push_back(reactable);
192   }
193   bool delaying_delete_until_callback_finished = false;
194   {
195     int result;
196     std::lock_guard<std::mutex> reactable_lock(reactable->mutex_);
197     RUN_NO_INTR(result = epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, reactable->fd_, nullptr));
198     if (result == -1 && errno == ENOENT) {
199       LOG_INFO("reactable is invalid or unregistered");
200     } else {
201       ASSERT(result != -1);
202     }
203 
204     // If we are unregistering during the callback event from this reactable, we delete it after the callback is
205     // executed. reactable->is_executing_ is protected by reactable->mutex_, so it's thread safe.
206     if (reactable->is_executing_) {
207       reactable->removed_ = true;
208       reactable->finished_promise_ = std::make_unique<std::promise<void>>();
209       executing_reactable_finished_ = std::make_shared<std::future<void>>(reactable->finished_promise_->get_future());
210       delaying_delete_until_callback_finished = true;
211     }
212   }
213   // If we are unregistering outside of the callback event from this reactable, we delete it now
214   if (!delaying_delete_until_callback_finished) {
215     delete reactable;
216   }
217 }
218 
WaitForUnregisteredReactable(std::chrono::milliseconds timeout)219 bool Reactor::WaitForUnregisteredReactable(std::chrono::milliseconds timeout) {
220   std::lock_guard<std::mutex> lock(mutex_);
221   if (executing_reactable_finished_ == nullptr) {
222     return true;
223   }
224   auto stop_status = executing_reactable_finished_->wait_for(timeout);
225   if (stop_status != std::future_status::ready) {
226     LOG_ERROR("Unregister reactable timed out");
227   }
228   return stop_status == std::future_status::ready;
229 }
230 
WaitForIdle(std::chrono::milliseconds timeout)231 bool Reactor::WaitForIdle(std::chrono::milliseconds timeout) {
232   auto promise = std::make_shared<std::promise<void>>();
233   auto future = std::make_unique<std::future<void>>(promise->get_future());
234   {
235     std::lock_guard<std::mutex> lock(mutex_);
236     idle_promise_ = promise;
237   }
238 
239   auto control = eventfd_write(control_fd_, kWaitForIdle);
240   ASSERT(control != -1);
241 
242   auto idle_status = future->wait_for(timeout);
243   return idle_status == std::future_status::ready;
244 }
245 
ModifyRegistration(Reactor::Reactable * reactable,Closure on_read_ready,Closure on_write_ready)246 void Reactor::ModifyRegistration(Reactor::Reactable* reactable, Closure on_read_ready, Closure on_write_ready) {
247   ASSERT(reactable != nullptr);
248 
249   uint32_t poll_event_type = 0;
250   if (!on_read_ready.is_null()) {
251     poll_event_type |= (EPOLLIN | EPOLLRDHUP);
252   }
253   if (!on_write_ready.is_null()) {
254     poll_event_type |= EPOLLOUT;
255   }
256   {
257     std::lock_guard<std::mutex> reactable_lock(reactable->mutex_);
258     reactable->on_read_ready_ = std::move(on_read_ready);
259     reactable->on_write_ready_ = std::move(on_write_ready);
260   }
261   epoll_event event = {
262       .events = poll_event_type,
263       .data = {.ptr = reactable},
264   };
265   int modify_fd;
266   RUN_NO_INTR(modify_fd = epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, reactable->fd_, &event));
267   ASSERT(modify_fd != -1);
268 }
269 
270 }  // namespace os
271 }  // namespace bluetooth
272