1 //
2 // Copyright 2010 The Android Open Source Project
3 //
4 // A looper implementation based on epoll().
5 //
6 #define LOG_TAG "Looper"
7
8 //#define LOG_NDEBUG 0
9
10 // Debugs poll and wake interactions.
11 #define DEBUG_POLL_AND_WAKE 0
12
13 // Debugs callback registration and invocation.
14 #define DEBUG_CALLBACKS 0
15
16 #include <cutils/log.h>
17 #include <utils/Looper.h>
18 #include <utils/Timers.h>
19
20 #include <unistd.h>
21 #include <fcntl.h>
22 #include <limits.h>
23
24
25 namespace android {
26
27 // --- WeakMessageHandler ---
28
WeakMessageHandler(const wp<MessageHandler> & handler)29 WeakMessageHandler::WeakMessageHandler(const wp<MessageHandler>& handler) :
30 mHandler(handler) {
31 }
32
~WeakMessageHandler()33 WeakMessageHandler::~WeakMessageHandler() {
34 }
35
handleMessage(const Message & message)36 void WeakMessageHandler::handleMessage(const Message& message) {
37 sp<MessageHandler> handler = mHandler.promote();
38 if (handler != NULL) {
39 handler->handleMessage(message);
40 }
41 }
42
43
44 // --- SimpleLooperCallback ---
45
SimpleLooperCallback(ALooper_callbackFunc callback)46 SimpleLooperCallback::SimpleLooperCallback(ALooper_callbackFunc callback) :
47 mCallback(callback) {
48 }
49
~SimpleLooperCallback()50 SimpleLooperCallback::~SimpleLooperCallback() {
51 }
52
handleEvent(int fd,int events,void * data)53 int SimpleLooperCallback::handleEvent(int fd, int events, void* data) {
54 return mCallback(fd, events, data);
55 }
56
57
58 // --- Looper ---
59
60 // Hint for number of file descriptors to be associated with the epoll instance.
61 static const int EPOLL_SIZE_HINT = 8;
62
63 // Maximum number of file descriptors for which to retrieve poll events each iteration.
64 static const int EPOLL_MAX_EVENTS = 16;
65
66 static pthread_once_t gTLSOnce = PTHREAD_ONCE_INIT;
67 static pthread_key_t gTLSKey = 0;
68
Looper(bool allowNonCallbacks)69 Looper::Looper(bool allowNonCallbacks) :
70 mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false),
71 mResponseIndex(0), mNextMessageUptime(LLONG_MAX) {
72 int wakeFds[2];
73 int result = pipe(wakeFds);
74 LOG_ALWAYS_FATAL_IF(result != 0, "Could not create wake pipe. errno=%d", errno);
75
76 mWakeReadPipeFd = wakeFds[0];
77 mWakeWritePipeFd = wakeFds[1];
78
79 result = fcntl(mWakeReadPipeFd, F_SETFL, O_NONBLOCK);
80 LOG_ALWAYS_FATAL_IF(result != 0, "Could not make wake read pipe non-blocking. errno=%d",
81 errno);
82
83 result = fcntl(mWakeWritePipeFd, F_SETFL, O_NONBLOCK);
84 LOG_ALWAYS_FATAL_IF(result != 0, "Could not make wake write pipe non-blocking. errno=%d",
85 errno);
86
87 // Allocate the epoll instance and register the wake pipe.
88 mEpollFd = epoll_create(EPOLL_SIZE_HINT);
89 LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance. errno=%d", errno);
90
91 struct epoll_event eventItem;
92 memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
93 eventItem.events = EPOLLIN;
94 eventItem.data.fd = mWakeReadPipeFd;
95 result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeReadPipeFd, & eventItem);
96 LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake read pipe to epoll instance. errno=%d",
97 errno);
98 }
99
~Looper()100 Looper::~Looper() {
101 close(mWakeReadPipeFd);
102 close(mWakeWritePipeFd);
103 close(mEpollFd);
104 }
105
initTLSKey()106 void Looper::initTLSKey() {
107 int result = pthread_key_create(& gTLSKey, threadDestructor);
108 LOG_ALWAYS_FATAL_IF(result != 0, "Could not allocate TLS key.");
109 }
110
threadDestructor(void * st)111 void Looper::threadDestructor(void *st) {
112 Looper* const self = static_cast<Looper*>(st);
113 if (self != NULL) {
114 self->decStrong((void*)threadDestructor);
115 }
116 }
117
setForThread(const sp<Looper> & looper)118 void Looper::setForThread(const sp<Looper>& looper) {
119 sp<Looper> old = getForThread(); // also has side-effect of initializing TLS
120
121 if (looper != NULL) {
122 looper->incStrong((void*)threadDestructor);
123 }
124
125 pthread_setspecific(gTLSKey, looper.get());
126
127 if (old != NULL) {
128 old->decStrong((void*)threadDestructor);
129 }
130 }
131
getForThread()132 sp<Looper> Looper::getForThread() {
133 int result = pthread_once(& gTLSOnce, initTLSKey);
134 LOG_ALWAYS_FATAL_IF(result != 0, "pthread_once failed");
135
136 return (Looper*)pthread_getspecific(gTLSKey);
137 }
138
prepare(int opts)139 sp<Looper> Looper::prepare(int opts) {
140 bool allowNonCallbacks = opts & ALOOPER_PREPARE_ALLOW_NON_CALLBACKS;
141 sp<Looper> looper = Looper::getForThread();
142 if (looper == NULL) {
143 looper = new Looper(allowNonCallbacks);
144 Looper::setForThread(looper);
145 }
146 if (looper->getAllowNonCallbacks() != allowNonCallbacks) {
147 ALOGW("Looper already prepared for this thread with a different value for the "
148 "ALOOPER_PREPARE_ALLOW_NON_CALLBACKS option.");
149 }
150 return looper;
151 }
152
getAllowNonCallbacks() const153 bool Looper::getAllowNonCallbacks() const {
154 return mAllowNonCallbacks;
155 }
156
pollOnce(int timeoutMillis,int * outFd,int * outEvents,void ** outData)157 int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
158 int result = 0;
159 for (;;) {
160 while (mResponseIndex < mResponses.size()) {
161 const Response& response = mResponses.itemAt(mResponseIndex++);
162 int ident = response.request.ident;
163 if (ident >= 0) {
164 int fd = response.request.fd;
165 int events = response.events;
166 void* data = response.request.data;
167 #if DEBUG_POLL_AND_WAKE
168 ALOGD("%p ~ pollOnce - returning signalled identifier %d: "
169 "fd=%d, events=0x%x, data=%p",
170 this, ident, fd, events, data);
171 #endif
172 if (outFd != NULL) *outFd = fd;
173 if (outEvents != NULL) *outEvents = events;
174 if (outData != NULL) *outData = data;
175 return ident;
176 }
177 }
178
179 if (result != 0) {
180 #if DEBUG_POLL_AND_WAKE
181 ALOGD("%p ~ pollOnce - returning result %d", this, result);
182 #endif
183 if (outFd != NULL) *outFd = 0;
184 if (outEvents != NULL) *outEvents = 0;
185 if (outData != NULL) *outData = NULL;
186 return result;
187 }
188
189 result = pollInner(timeoutMillis);
190 }
191 }
192
pollInner(int timeoutMillis)193 int Looper::pollInner(int timeoutMillis) {
194 #if DEBUG_POLL_AND_WAKE
195 ALOGD("%p ~ pollOnce - waiting: timeoutMillis=%d", this, timeoutMillis);
196 #endif
197
198 // Adjust the timeout based on when the next message is due.
199 if (timeoutMillis != 0 && mNextMessageUptime != LLONG_MAX) {
200 nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
201 int messageTimeoutMillis = toMillisecondTimeoutDelay(now, mNextMessageUptime);
202 if (messageTimeoutMillis >= 0
203 && (timeoutMillis < 0 || messageTimeoutMillis < timeoutMillis)) {
204 timeoutMillis = messageTimeoutMillis;
205 }
206 #if DEBUG_POLL_AND_WAKE
207 ALOGD("%p ~ pollOnce - next message in %lldns, adjusted timeout: timeoutMillis=%d",
208 this, mNextMessageUptime - now, timeoutMillis);
209 #endif
210 }
211
212 // Poll.
213 int result = ALOOPER_POLL_WAKE;
214 mResponses.clear();
215 mResponseIndex = 0;
216
217 struct epoll_event eventItems[EPOLL_MAX_EVENTS];
218 int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
219
220 // Acquire lock.
221 mLock.lock();
222
223 // Check for poll error.
224 if (eventCount < 0) {
225 if (errno == EINTR) {
226 goto Done;
227 }
228 ALOGW("Poll failed with an unexpected error, errno=%d", errno);
229 result = ALOOPER_POLL_ERROR;
230 goto Done;
231 }
232
233 // Check for poll timeout.
234 if (eventCount == 0) {
235 #if DEBUG_POLL_AND_WAKE
236 ALOGD("%p ~ pollOnce - timeout", this);
237 #endif
238 result = ALOOPER_POLL_TIMEOUT;
239 goto Done;
240 }
241
242 // Handle all events.
243 #if DEBUG_POLL_AND_WAKE
244 ALOGD("%p ~ pollOnce - handling events from %d fds", this, eventCount);
245 #endif
246
247 for (int i = 0; i < eventCount; i++) {
248 int fd = eventItems[i].data.fd;
249 uint32_t epollEvents = eventItems[i].events;
250 if (fd == mWakeReadPipeFd) {
251 if (epollEvents & EPOLLIN) {
252 awoken();
253 } else {
254 ALOGW("Ignoring unexpected epoll events 0x%x on wake read pipe.", epollEvents);
255 }
256 } else {
257 ssize_t requestIndex = mRequests.indexOfKey(fd);
258 if (requestIndex >= 0) {
259 int events = 0;
260 if (epollEvents & EPOLLIN) events |= ALOOPER_EVENT_INPUT;
261 if (epollEvents & EPOLLOUT) events |= ALOOPER_EVENT_OUTPUT;
262 if (epollEvents & EPOLLERR) events |= ALOOPER_EVENT_ERROR;
263 if (epollEvents & EPOLLHUP) events |= ALOOPER_EVENT_HANGUP;
264 pushResponse(events, mRequests.valueAt(requestIndex));
265 } else {
266 ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
267 "no longer registered.", epollEvents, fd);
268 }
269 }
270 }
271 Done: ;
272
273 // Invoke pending message callbacks.
274 mNextMessageUptime = LLONG_MAX;
275 while (mMessageEnvelopes.size() != 0) {
276 nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
277 const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
278 if (messageEnvelope.uptime <= now) {
279 // Remove the envelope from the list.
280 // We keep a strong reference to the handler until the call to handleMessage
281 // finishes. Then we drop it so that the handler can be deleted *before*
282 // we reacquire our lock.
283 { // obtain handler
284 sp<MessageHandler> handler = messageEnvelope.handler;
285 Message message = messageEnvelope.message;
286 mMessageEnvelopes.removeAt(0);
287 mSendingMessage = true;
288 mLock.unlock();
289
290 #if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
291 ALOGD("%p ~ pollOnce - sending message: handler=%p, what=%d",
292 this, handler.get(), message.what);
293 #endif
294 handler->handleMessage(message);
295 } // release handler
296
297 mLock.lock();
298 mSendingMessage = false;
299 result = ALOOPER_POLL_CALLBACK;
300 } else {
301 // The last message left at the head of the queue determines the next wakeup time.
302 mNextMessageUptime = messageEnvelope.uptime;
303 break;
304 }
305 }
306
307 // Release lock.
308 mLock.unlock();
309
310 // Invoke all response callbacks.
311 for (size_t i = 0; i < mResponses.size(); i++) {
312 Response& response = mResponses.editItemAt(i);
313 if (response.request.ident == ALOOPER_POLL_CALLBACK) {
314 int fd = response.request.fd;
315 int events = response.events;
316 void* data = response.request.data;
317 #if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
318 ALOGD("%p ~ pollOnce - invoking fd event callback %p: fd=%d, events=0x%x, data=%p",
319 this, response.request.callback.get(), fd, events, data);
320 #endif
321 int callbackResult = response.request.callback->handleEvent(fd, events, data);
322 if (callbackResult == 0) {
323 removeFd(fd);
324 }
325 // Clear the callback reference in the response structure promptly because we
326 // will not clear the response vector itself until the next poll.
327 response.request.callback.clear();
328 result = ALOOPER_POLL_CALLBACK;
329 }
330 }
331 return result;
332 }
333
pollAll(int timeoutMillis,int * outFd,int * outEvents,void ** outData)334 int Looper::pollAll(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
335 if (timeoutMillis <= 0) {
336 int result;
337 do {
338 result = pollOnce(timeoutMillis, outFd, outEvents, outData);
339 } while (result == ALOOPER_POLL_CALLBACK);
340 return result;
341 } else {
342 nsecs_t endTime = systemTime(SYSTEM_TIME_MONOTONIC)
343 + milliseconds_to_nanoseconds(timeoutMillis);
344
345 for (;;) {
346 int result = pollOnce(timeoutMillis, outFd, outEvents, outData);
347 if (result != ALOOPER_POLL_CALLBACK) {
348 return result;
349 }
350
351 nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
352 timeoutMillis = toMillisecondTimeoutDelay(now, endTime);
353 if (timeoutMillis == 0) {
354 return ALOOPER_POLL_TIMEOUT;
355 }
356 }
357 }
358 }
359
wake()360 void Looper::wake() {
361 #if DEBUG_POLL_AND_WAKE
362 ALOGD("%p ~ wake", this);
363 #endif
364
365 ssize_t nWrite;
366 do {
367 nWrite = write(mWakeWritePipeFd, "W", 1);
368 } while (nWrite == -1 && errno == EINTR);
369
370 if (nWrite != 1) {
371 if (errno != EAGAIN) {
372 ALOGW("Could not write wake signal, errno=%d", errno);
373 }
374 }
375 }
376
awoken()377 void Looper::awoken() {
378 #if DEBUG_POLL_AND_WAKE
379 ALOGD("%p ~ awoken", this);
380 #endif
381
382 char buffer[16];
383 ssize_t nRead;
384 do {
385 nRead = read(mWakeReadPipeFd, buffer, sizeof(buffer));
386 } while ((nRead == -1 && errno == EINTR) || nRead == sizeof(buffer));
387 }
388
pushResponse(int events,const Request & request)389 void Looper::pushResponse(int events, const Request& request) {
390 Response response;
391 response.events = events;
392 response.request = request;
393 mResponses.push(response);
394 }
395
addFd(int fd,int ident,int events,ALooper_callbackFunc callback,void * data)396 int Looper::addFd(int fd, int ident, int events, ALooper_callbackFunc callback, void* data) {
397 return addFd(fd, ident, events, callback ? new SimpleLooperCallback(callback) : NULL, data);
398 }
399
addFd(int fd,int ident,int events,const sp<LooperCallback> & callback,void * data)400 int Looper::addFd(int fd, int ident, int events, const sp<LooperCallback>& callback, void* data) {
401 #if DEBUG_CALLBACKS
402 ALOGD("%p ~ addFd - fd=%d, ident=%d, events=0x%x, callback=%p, data=%p", this, fd, ident,
403 events, callback.get(), data);
404 #endif
405
406 if (!callback.get()) {
407 if (! mAllowNonCallbacks) {
408 ALOGE("Invalid attempt to set NULL callback but not allowed for this looper.");
409 return -1;
410 }
411
412 if (ident < 0) {
413 ALOGE("Invalid attempt to set NULL callback with ident < 0.");
414 return -1;
415 }
416 } else {
417 ident = ALOOPER_POLL_CALLBACK;
418 }
419
420 int epollEvents = 0;
421 if (events & ALOOPER_EVENT_INPUT) epollEvents |= EPOLLIN;
422 if (events & ALOOPER_EVENT_OUTPUT) epollEvents |= EPOLLOUT;
423
424 { // acquire lock
425 AutoMutex _l(mLock);
426
427 Request request;
428 request.fd = fd;
429 request.ident = ident;
430 request.callback = callback;
431 request.data = data;
432
433 struct epoll_event eventItem;
434 memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
435 eventItem.events = epollEvents;
436 eventItem.data.fd = fd;
437
438 ssize_t requestIndex = mRequests.indexOfKey(fd);
439 if (requestIndex < 0) {
440 int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, fd, & eventItem);
441 if (epollResult < 0) {
442 ALOGE("Error adding epoll events for fd %d, errno=%d", fd, errno);
443 return -1;
444 }
445 mRequests.add(fd, request);
446 } else {
447 int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_MOD, fd, & eventItem);
448 if (epollResult < 0) {
449 ALOGE("Error modifying epoll events for fd %d, errno=%d", fd, errno);
450 return -1;
451 }
452 mRequests.replaceValueAt(requestIndex, request);
453 }
454 } // release lock
455 return 1;
456 }
457
removeFd(int fd)458 int Looper::removeFd(int fd) {
459 #if DEBUG_CALLBACKS
460 ALOGD("%p ~ removeFd - fd=%d", this, fd);
461 #endif
462
463 { // acquire lock
464 AutoMutex _l(mLock);
465 ssize_t requestIndex = mRequests.indexOfKey(fd);
466 if (requestIndex < 0) {
467 return 0;
468 }
469
470 int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_DEL, fd, NULL);
471 if (epollResult < 0) {
472 ALOGE("Error removing epoll events for fd %d, errno=%d", fd, errno);
473 return -1;
474 }
475
476 mRequests.removeItemsAt(requestIndex);
477 } // release lock
478 return 1;
479 }
480
sendMessage(const sp<MessageHandler> & handler,const Message & message)481 void Looper::sendMessage(const sp<MessageHandler>& handler, const Message& message) {
482 nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
483 sendMessageAtTime(now, handler, message);
484 }
485
sendMessageDelayed(nsecs_t uptimeDelay,const sp<MessageHandler> & handler,const Message & message)486 void Looper::sendMessageDelayed(nsecs_t uptimeDelay, const sp<MessageHandler>& handler,
487 const Message& message) {
488 nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
489 sendMessageAtTime(now + uptimeDelay, handler, message);
490 }
491
sendMessageAtTime(nsecs_t uptime,const sp<MessageHandler> & handler,const Message & message)492 void Looper::sendMessageAtTime(nsecs_t uptime, const sp<MessageHandler>& handler,
493 const Message& message) {
494 #if DEBUG_CALLBACKS
495 ALOGD("%p ~ sendMessageAtTime - uptime=%lld, handler=%p, what=%d",
496 this, uptime, handler.get(), message.what);
497 #endif
498
499 size_t i = 0;
500 { // acquire lock
501 AutoMutex _l(mLock);
502
503 size_t messageCount = mMessageEnvelopes.size();
504 while (i < messageCount && uptime >= mMessageEnvelopes.itemAt(i).uptime) {
505 i += 1;
506 }
507
508 MessageEnvelope messageEnvelope(uptime, handler, message);
509 mMessageEnvelopes.insertAt(messageEnvelope, i, 1);
510
511 // Optimization: If the Looper is currently sending a message, then we can skip
512 // the call to wake() because the next thing the Looper will do after processing
513 // messages is to decide when the next wakeup time should be. In fact, it does
514 // not even matter whether this code is running on the Looper thread.
515 if (mSendingMessage) {
516 return;
517 }
518 } // release lock
519
520 // Wake the poll loop only when we enqueue a new message at the head.
521 if (i == 0) {
522 wake();
523 }
524 }
525
removeMessages(const sp<MessageHandler> & handler)526 void Looper::removeMessages(const sp<MessageHandler>& handler) {
527 #if DEBUG_CALLBACKS
528 ALOGD("%p ~ removeMessages - handler=%p", this, handler.get());
529 #endif
530
531 { // acquire lock
532 AutoMutex _l(mLock);
533
534 for (size_t i = mMessageEnvelopes.size(); i != 0; ) {
535 const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(--i);
536 if (messageEnvelope.handler == handler) {
537 mMessageEnvelopes.removeAt(i);
538 }
539 }
540 } // release lock
541 }
542
removeMessages(const sp<MessageHandler> & handler,int what)543 void Looper::removeMessages(const sp<MessageHandler>& handler, int what) {
544 #if DEBUG_CALLBACKS
545 ALOGD("%p ~ removeMessages - handler=%p, what=%d", this, handler.get(), what);
546 #endif
547
548 { // acquire lock
549 AutoMutex _l(mLock);
550
551 for (size_t i = mMessageEnvelopes.size(); i != 0; ) {
552 const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(--i);
553 if (messageEnvelope.handler == handler
554 && messageEnvelope.message.what == what) {
555 mMessageEnvelopes.removeAt(i);
556 }
557 }
558 } // release lock
559 }
560
561 } // namespace android
562