1 #include <pdx/service_dispatcher.h>
2
3 #include <errno.h>
4 #include <log/log.h>
5 #include <sys/epoll.h>
6 #include <sys/eventfd.h>
7
8 #include <pdx/service.h>
9 #include <pdx/service_endpoint.h>
10
// Upper bound on the number of epoll events fetched by a single epoll_wait()
// call in the dispatch functions below; also sizes their on-stack event
// buffers.
static constexpr int kMaxEventsPerLoop = 128;
12
13 namespace android {
14 namespace pdx {
15
Create()16 std::unique_ptr<ServiceDispatcher> ServiceDispatcher::Create() {
17 std::unique_ptr<ServiceDispatcher> dispatcher{new ServiceDispatcher()};
18 if (!dispatcher->epoll_fd_ || !dispatcher->event_fd_) {
19 dispatcher.reset();
20 }
21
22 return dispatcher;
23 }
24
ServiceDispatcher()25 ServiceDispatcher::ServiceDispatcher() {
26 event_fd_.Reset(eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK));
27 if (!event_fd_) {
28 ALOGE("Failed to create event fd because: %s\n", strerror(errno));
29 return;
30 }
31
32 epoll_fd_.Reset(epoll_create1(EPOLL_CLOEXEC));
33 if (!epoll_fd_) {
34 ALOGE("Failed to create epoll fd because: %s\n", strerror(errno));
35 return;
36 }
37
38 // Use "this" as a unique pointer to distinguish the event fd from all
39 // the other entries that point to instances of Service.
40 epoll_event event;
41 event.events = EPOLLIN;
42 event.data.ptr = this;
43
44 if (epoll_ctl(epoll_fd_.Get(), EPOLL_CTL_ADD, event_fd_.Get(), &event) < 0) {
45 ALOGE("Failed to add event fd to epoll fd because: %s\n", strerror(errno));
46
47 // Close the fds here and signal failure to the factory method.
48 event_fd_.Close();
49 epoll_fd_.Close();
50 }
51 }
52
~ServiceDispatcher()53 ServiceDispatcher::~ServiceDispatcher() { SetCanceled(true); }
54
ThreadEnter()55 int ServiceDispatcher::ThreadEnter() {
56 std::lock_guard<std::mutex> autolock(mutex_);
57
58 if (canceled_)
59 return -EBUSY;
60
61 thread_count_++;
62 return 0;
63 }
64
ThreadExit()65 void ServiceDispatcher::ThreadExit() {
66 std::lock_guard<std::mutex> autolock(mutex_);
67 thread_count_--;
68 condition_.notify_one();
69 }
70
AddService(const std::shared_ptr<Service> & service)71 int ServiceDispatcher::AddService(const std::shared_ptr<Service>& service) {
72 std::lock_guard<std::mutex> autolock(mutex_);
73
74 epoll_event event;
75 event.events = EPOLLIN;
76 event.data.ptr = service.get();
77
78 if (epoll_ctl(epoll_fd_.Get(), EPOLL_CTL_ADD, service->endpoint()->epoll_fd(),
79 &event) < 0) {
80 ALOGE("Failed to add service to dispatcher because: %s\n", strerror(errno));
81 return -errno;
82 }
83
84 services_.push_back(service);
85 return 0;
86 }
87
RemoveService(const std::shared_ptr<Service> & service)88 int ServiceDispatcher::RemoveService(const std::shared_ptr<Service>& service) {
89 std::lock_guard<std::mutex> autolock(mutex_);
90
91 // It's dangerous to remove a service while other threads may be using it.
92 if (thread_count_ > 0)
93 return -EBUSY;
94
95 epoll_event ee; // See BUGS in man 2 epoll_ctl.
96 if (epoll_ctl(epoll_fd_.Get(), EPOLL_CTL_DEL, service->endpoint()->epoll_fd(),
97 &ee) < 0) {
98 ALOGE("Failed to remove service from dispatcher because: %s\n",
99 strerror(errno));
100 return -errno;
101 }
102
103 services_.erase(std::remove(services_.begin(), services_.end(), service),
104 services_.end());
105 return 0;
106 }
107
ReceiveAndDispatch()108 int ServiceDispatcher::ReceiveAndDispatch() { return ReceiveAndDispatch(-1); }
109
ReceiveAndDispatch(int timeout)110 int ServiceDispatcher::ReceiveAndDispatch(int timeout) {
111 int ret = ThreadEnter();
112 if (ret < 0)
113 return ret;
114
115 epoll_event events[kMaxEventsPerLoop];
116
117 int count = epoll_wait(epoll_fd_.Get(), events, kMaxEventsPerLoop, timeout);
118 if (count <= 0) {
119 ALOGE_IF(count < 0, "Failed to wait for epoll events because: %s\n",
120 strerror(errno));
121 ThreadExit();
122 return count < 0 ? -errno : -ETIMEDOUT;
123 }
124
125 for (int i = 0; i < count; i++) {
126 if (events[i].data.ptr == this) {
127 ThreadExit();
128 return -EBUSY;
129 } else {
130 Service* service = static_cast<Service*>(events[i].data.ptr);
131
132 ALOGI_IF(TRACE, "Dispatching message: fd=%d\n",
133 service->endpoint()->epoll_fd());
134 service->ReceiveAndDispatch();
135 }
136 }
137
138 ThreadExit();
139 return 0;
140 }
141
EnterDispatchLoop()142 int ServiceDispatcher::EnterDispatchLoop() {
143 int ret = ThreadEnter();
144 if (ret < 0)
145 return ret;
146
147 epoll_event events[kMaxEventsPerLoop];
148
149 while (!IsCanceled()) {
150 int count = epoll_wait(epoll_fd_.Get(), events, kMaxEventsPerLoop, -1);
151 if (count < 0 && errno != EINTR) {
152 ALOGE("Failed to wait for epoll events because: %s\n", strerror(errno));
153 ThreadExit();
154 return -errno;
155 }
156
157 for (int i = 0; i < count; i++) {
158 if (events[i].data.ptr == this) {
159 ThreadExit();
160 return -EBUSY;
161 } else {
162 Service* service = static_cast<Service*>(events[i].data.ptr);
163
164 ALOGI_IF(TRACE, "Dispatching message: fd=%d\n",
165 service->endpoint()->epoll_fd());
166 service->ReceiveAndDispatch();
167 }
168 }
169 }
170
171 ThreadExit();
172 return 0;
173 }
174
SetCanceled(bool cancel)175 void ServiceDispatcher::SetCanceled(bool cancel) {
176 std::unique_lock<std::mutex> lock(mutex_);
177 canceled_ = cancel;
178
179 if (canceled_ && thread_count_ > 0) {
180 eventfd_write(event_fd_.Get(), 1); // Signal threads to quit.
181
182 condition_.wait(lock, [this] { return !(canceled_ && thread_count_ > 0); });
183
184 eventfd_t value;
185 eventfd_read(event_fd_.Get(), &value); // Unsignal.
186 }
187 }
188
IsCanceled() const189 bool ServiceDispatcher::IsCanceled() const { return canceled_; }
190
191 } // namespace pdx
192 } // namespace android
193