1 /*
2 * Copyright (C) 2019 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include <https/RunLoop.h>
18
19 #include <https/Support.h>
20
21 #include <glog/logging.h>
22
23 #include <cstring>
24 #include <fcntl.h>
25 #include <iostream>
26 #include <unistd.h>
27
28 #include <mutex>
29 #include <condition_variable>
30
operator <=(const QueueElem & other) const31 bool RunLoop::QueueElem::operator<=(const QueueElem &other) const {
32 if (mWhen) {
33 if (other.mWhen) {
34 return mWhen <= other.mWhen;
35 }
36
37 return false;
38 }
39
40 if (other.mWhen) {
41 return true;
42 }
43
44 // This ensures that two events posted without a trigger time are queued in
45 // the order they were post()ed in.
46 return true;
47 }
48
RunLoop()49 RunLoop::RunLoop()
50 : mDone(false),
51 mPThread(0),
52 mNextToken(1) {
53 int res = pipe(mControlFds);
54 CHECK_GE(res, 0);
55
56 makeFdNonblocking(mControlFds[0]);
57 }
58
RunLoop(std::string_view name)59 RunLoop::RunLoop(std::string_view name)
60 : RunLoop() {
61 mName = name;
62
63 mThread = std::thread([this]{ run(); });
64 }
65
~RunLoop()66 RunLoop::~RunLoop() {
67 stop();
68
69 close(mControlFds[1]);
70 mControlFds[1] = -1;
71
72 close(mControlFds[0]);
73 mControlFds[0] = -1;
74 }
75
stop()76 void RunLoop::stop() {
77 mDone = true;
78 interrupt();
79
80 if (mThread.joinable()) {
81 mThread.join();
82 }
83 }
84
post(AsyncFunction fn)85 RunLoop::Token RunLoop::post(AsyncFunction fn) {
86 CHECK(fn != nullptr);
87
88 auto token = mNextToken++;
89 insert({ std::nullopt, fn, token });
90
91 return token;
92 }
93
postAndAwait(AsyncFunction fn)94 bool RunLoop::postAndAwait(AsyncFunction fn) {
95 if (isCurrentThread()) {
96 // To wait from the runloop's thread would cause deadlock
97 post(fn);
98 return false;
99 }
100
101 std::mutex mtx;
102 bool ran = false;
103 std::condition_variable cond_var;
104
105 post([&cond_var, &mtx, &ran, fn](){
106 fn();
107 {
108 std::unique_lock<std::mutex> lock(mtx);
109 ran = true;
110 // Notify while holding the mutex, otherwise the condition variable
111 // could be destroyed before the call to notify_all.
112 cond_var.notify_all();
113 }
114 });
115
116 {
117 std::unique_lock<std::mutex> lock(mtx);
118 cond_var.wait(lock, [&ran](){ return ran;});
119 }
120 return ran;
121 }
122
postWithDelay(std::chrono::steady_clock::duration delay,AsyncFunction fn)123 RunLoop::Token RunLoop::postWithDelay(
124 std::chrono::steady_clock::duration delay, AsyncFunction fn) {
125 CHECK(fn != nullptr);
126
127 auto token = mNextToken++;
128 insert({ std::chrono::steady_clock::now() + delay, fn, token });
129
130 return token;
131 }
132
cancelToken(Token token)133 bool RunLoop::cancelToken(Token token) {
134 std::lock_guard<std::mutex> autoLock(mLock);
135
136 bool found = false;
137 for (auto it = mQueue.begin(); it != mQueue.end(); ++it) {
138 if (it->mToken == token) {
139 mQueue.erase(it);
140
141 if (it == mQueue.begin()) {
142 interrupt();
143 }
144
145 found = true;
146 break;
147 }
148 }
149
150 return found;
151 }
152
postSocketRecv(int sock,AsyncFunction fn)153 void RunLoop::postSocketRecv(int sock, AsyncFunction fn) {
154 CHECK_GE(sock, 0);
155 CHECK(fn != nullptr);
156
157 std::lock_guard<std::mutex> autoLock(mLock);
158 mAddInfos.push_back({ sock, InfoType::RECV, fn });
159 interrupt();
160 }
161
postSocketSend(int sock,AsyncFunction fn)162 void RunLoop::postSocketSend(int sock, AsyncFunction fn) {
163 CHECK_GE(sock, 0);
164 CHECK(fn != nullptr);
165
166 std::lock_guard<std::mutex> autoLock(mLock);
167 mAddInfos.push_back({ sock, InfoType::SEND, fn });
168 interrupt();
169 }
170
cancelSocket(int sock)171 void RunLoop::cancelSocket(int sock) {
172 CHECK_GE(sock, 0);
173
174 std::lock_guard<std::mutex> autoLock(mLock);
175 mAddInfos.push_back({ sock, InfoType::CANCEL, nullptr });
176 interrupt();
177 }
178
insert(const QueueElem & elem)179 void RunLoop::insert(const QueueElem &elem) {
180 std::lock_guard<std::mutex> autoLock(mLock);
181
182 auto it = mQueue.begin();
183 while (it != mQueue.end() && *it <= elem) {
184 ++it;
185 }
186
187 if (it == mQueue.begin()) {
188 interrupt();
189 }
190
191 mQueue.insert(it, elem);
192 }
193
run()194 void RunLoop::run() {
195 mPThread = pthread_self();
196
197 std::map<int, SocketCallbacks> socketCallbacksByFd;
198 std::vector<pollfd> pollFds;
199
200 auto removePollFdAt = [&socketCallbacksByFd, &pollFds](size_t i) {
201 if (i + 1 == pollFds.size()) {
202 pollFds.pop_back();
203 } else {
204 // Instead of leaving a hole in the middle of the
205 // pollFds vector, we copy the last item into
206 // that hole and reduce the size of the vector by 1,
207 // taking are of updating the corresponding callback
208 // with the correct, new index.
209 pollFds[i] = pollFds.back();
210 pollFds.pop_back();
211 socketCallbacksByFd[pollFds[i].fd].mPollFdIndex = i;
212 }
213 };
214
215 // The control channel's pollFd will always be at index 0.
216 pollFds.push_back({ mControlFds[0], POLLIN, 0 });
217
218 for (;;) {
219 int timeoutMs = -1; // wait Forever
220
221 {
222 std::lock_guard<std::mutex> autoLock(mLock);
223
224 if (mDone) {
225 break;
226 }
227
228 for (const auto &addInfo : mAddInfos) {
229 const int sock = addInfo.mSock;
230 const auto fn = addInfo.mFn;
231
232 auto it = socketCallbacksByFd.find(sock);
233
234 switch (addInfo.mType) {
235 case InfoType::RECV:
236 {
237 if (it == socketCallbacksByFd.end()) {
238 socketCallbacksByFd[sock] = { fn, nullptr, pollFds.size() };
239 pollFds.push_back({ sock, POLLIN, 0 });
240 } else {
241 // There's already a pollFd for this socket.
242 CHECK(it->second.mSendFn != nullptr);
243
244 CHECK(it->second.mRecvFn == nullptr);
245 it->second.mRecvFn = fn;
246
247 pollFds[it->second.mPollFdIndex].events |= POLLIN;
248 }
249 break;
250 }
251
252 case InfoType::SEND:
253 {
254 if (it == socketCallbacksByFd.end()) {
255 socketCallbacksByFd[sock] = { nullptr, fn, pollFds.size() };
256 pollFds.push_back({ sock, POLLOUT, 0 });
257 } else {
258 // There's already a pollFd for this socket.
259 if (it->second.mRecvFn == nullptr) {
260 LOG(ERROR)
261 << "There's an entry but no recvFn "
262 "notification for socket "
263 << sock;
264 }
265
266 CHECK(it->second.mRecvFn != nullptr);
267
268 if (it->second.mSendFn != nullptr) {
269 LOG(ERROR)
270 << "There's already a pending send "
271 "notification for socket "
272 << sock;
273 }
274 CHECK(it->second.mSendFn == nullptr);
275 it->second.mSendFn = fn;
276
277 pollFds[it->second.mPollFdIndex].events |= POLLOUT;
278 }
279 break;
280 }
281
282 case InfoType::CANCEL:
283 {
284 if (it != socketCallbacksByFd.end()) {
285 const size_t i = it->second.mPollFdIndex;
286
287 socketCallbacksByFd.erase(it);
288 removePollFdAt(i);
289 }
290 break;
291 }
292 }
293 }
294
295 mAddInfos.clear();
296
297 if (!mQueue.empty()) {
298 timeoutMs = 0;
299
300 if (mQueue.front().mWhen) {
301 auto duration =
302 *mQueue.front().mWhen - std::chrono::steady_clock::now();
303
304 auto durationMs =
305 std::chrono::duration_cast<std::chrono::milliseconds>(duration);
306
307 if (durationMs.count() > 0) {
308 timeoutMs = static_cast<int>(durationMs.count());
309 }
310 }
311 }
312 }
313
314 int pollRes = 0;
315 if (timeoutMs != 0) {
316 // NOTE: The inequality is on purpose, we'll want to execute this
317 // code if timeoutMs == -1 (infinite) or timeoutMs > 0, but not
318 // if it's 0.
319
320 pollRes = poll(
321 pollFds.data(),
322 static_cast<nfds_t>(pollFds.size()),
323 timeoutMs);
324 }
325
326 if (pollRes < 0) {
327 if (errno != EINTR) {
328 std::cerr
329 << "poll FAILED w/ "
330 << errno
331 << " ("
332 << strerror(errno)
333 << ")"
334 << std::endl;
335 }
336
337 CHECK_EQ(errno, EINTR);
338 continue;
339 }
340
341 std::vector<AsyncFunction> fnArray;
342
343 {
344 std::lock_guard<std::mutex> autoLock(mLock);
345
346 if (pollRes > 0) {
347 if (pollFds[0].revents & POLLIN) {
348 ssize_t res;
349 do {
350 uint8_t c[32];
351 while ((res = read(mControlFds[0], c, sizeof(c))) < 0
352 && errno == EINTR) {
353 }
354 } while (res > 0);
355 CHECK(res < 0 && errno == EWOULDBLOCK);
356
357 --pollRes;
358 }
359
360 // NOTE: Skip index 0, as we already handled it above.
361 // Also, bail early if we exhausted all actionable pollFds
362 // according to pollRes.
363 for (size_t i = pollFds.size(); pollRes && i-- > 1;) {
364 pollfd &pollFd = pollFds[i];
365 const short revents = pollFd.revents;
366
367 if (revents) {
368 --pollRes;
369 }
370
371 const bool readable = (revents & POLLIN);
372 const bool writable = (revents & POLLOUT);
373 const bool dead = (revents & POLLNVAL);
374
375 bool removeCallback = dead;
376
377 if (readable || writable || dead) {
378 const int sock = pollFd.fd;
379
380 const auto &it = socketCallbacksByFd.find(sock);
381 auto &cb = it->second;
382 CHECK_EQ(cb.mPollFdIndex, i);
383
384 if (readable) {
385 CHECK(cb.mRecvFn != nullptr);
386 fnArray.push_back(cb.mRecvFn);
387 cb.mRecvFn = nullptr;
388 pollFd.events &= ~POLLIN;
389
390 removeCallback |= (cb.mSendFn == nullptr);
391 }
392
393 if (writable) {
394 CHECK(cb.mSendFn != nullptr);
395 fnArray.push_back(cb.mSendFn);
396 cb.mSendFn = nullptr;
397 pollFd.events &= ~POLLOUT;
398
399 removeCallback |= (cb.mRecvFn == nullptr);
400 }
401
402 if (removeCallback) {
403 socketCallbacksByFd.erase(it);
404 removePollFdAt(i);
405 }
406 }
407 }
408 } else {
409 // No interrupt, no socket notifications.
410 fnArray.push_back(mQueue.front().mFn);
411 mQueue.pop_front();
412 }
413 }
414
415 for (const auto &fn : fnArray) {
416 fn();
417 }
418 }
419 }
420
interrupt()421 void RunLoop::interrupt() {
422 uint8_t c = 1;
423 ssize_t res;
424 while ((res = write(mControlFds[1], &c, 1)) < 0 && errno == EINTR) {
425 }
426
427 CHECK_EQ(res, 1);
428 }
429
430 struct MainRunLoop : public RunLoop {
431 };
432
433 static std::mutex gLock;
434 static std::shared_ptr<RunLoop> gMainRunLoop;
435
436 // static
main()437 std::shared_ptr<RunLoop> RunLoop::main() {
438 std::lock_guard<std::mutex> autoLock(gLock);
439 if (!gMainRunLoop) {
440 gMainRunLoop = std::make_shared<MainRunLoop>();
441 }
442 return gMainRunLoop;
443 }
444
isCurrentThread() const445 bool RunLoop::isCurrentThread() const {
446 return pthread_equal(pthread_self(), mPThread);
447 }
448
449