• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 // Copyright 2010 The Android Open Source Project
3 //
4 // A looper implementation based on epoll().
5 //
6 #define LOG_TAG "Looper"
7 
8 //#define LOG_NDEBUG 0
9 
10 // Debugs poll and wake interactions.
11 #define DEBUG_POLL_AND_WAKE 0
12 
13 // Debugs callback registration and invocation.
14 #define DEBUG_CALLBACKS 0
15 
16 #include <cutils/log.h>
17 #include <utils/Looper.h>
18 #include <utils/Timers.h>
19 
20 #include <unistd.h>
21 #include <fcntl.h>
22 #include <limits.h>
23 
24 
25 namespace android {
26 
27 // --- WeakMessageHandler ---
28 
WeakMessageHandler(const wp<MessageHandler> & handler)29 WeakMessageHandler::WeakMessageHandler(const wp<MessageHandler>& handler) :
30         mHandler(handler) {
31 }
32 
~WeakMessageHandler()33 WeakMessageHandler::~WeakMessageHandler() {
34 }
35 
handleMessage(const Message & message)36 void WeakMessageHandler::handleMessage(const Message& message) {
37     sp<MessageHandler> handler = mHandler.promote();
38     if (handler != NULL) {
39         handler->handleMessage(message);
40     }
41 }
42 
43 
44 // --- SimpleLooperCallback ---
45 
SimpleLooperCallback(ALooper_callbackFunc callback)46 SimpleLooperCallback::SimpleLooperCallback(ALooper_callbackFunc callback) :
47         mCallback(callback) {
48 }
49 
~SimpleLooperCallback()50 SimpleLooperCallback::~SimpleLooperCallback() {
51 }
52 
handleEvent(int fd,int events,void * data)53 int SimpleLooperCallback::handleEvent(int fd, int events, void* data) {
54     return mCallback(fd, events, data);
55 }
56 
57 
58 // --- Looper ---
59 
60 // Hint for number of file descriptors to be associated with the epoll instance.
61 static const int EPOLL_SIZE_HINT = 8;
62 
63 // Maximum number of file descriptors for which to retrieve poll events each iteration.
64 static const int EPOLL_MAX_EVENTS = 16;
65 
66 static pthread_once_t gTLSOnce = PTHREAD_ONCE_INIT;
67 static pthread_key_t gTLSKey = 0;
68 
Looper(bool allowNonCallbacks)69 Looper::Looper(bool allowNonCallbacks) :
70         mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false),
71         mResponseIndex(0), mNextMessageUptime(LLONG_MAX) {
72     int wakeFds[2];
73     int result = pipe(wakeFds);
74     LOG_ALWAYS_FATAL_IF(result != 0, "Could not create wake pipe.  errno=%d", errno);
75 
76     mWakeReadPipeFd = wakeFds[0];
77     mWakeWritePipeFd = wakeFds[1];
78 
79     result = fcntl(mWakeReadPipeFd, F_SETFL, O_NONBLOCK);
80     LOG_ALWAYS_FATAL_IF(result != 0, "Could not make wake read pipe non-blocking.  errno=%d",
81             errno);
82 
83     result = fcntl(mWakeWritePipeFd, F_SETFL, O_NONBLOCK);
84     LOG_ALWAYS_FATAL_IF(result != 0, "Could not make wake write pipe non-blocking.  errno=%d",
85             errno);
86 
87     // Allocate the epoll instance and register the wake pipe.
88     mEpollFd = epoll_create(EPOLL_SIZE_HINT);
89     LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance.  errno=%d", errno);
90 
91     struct epoll_event eventItem;
92     memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
93     eventItem.events = EPOLLIN;
94     eventItem.data.fd = mWakeReadPipeFd;
95     result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeReadPipeFd, & eventItem);
96     LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake read pipe to epoll instance.  errno=%d",
97             errno);
98 }
99 
~Looper()100 Looper::~Looper() {
101     close(mWakeReadPipeFd);
102     close(mWakeWritePipeFd);
103     close(mEpollFd);
104 }
105 
initTLSKey()106 void Looper::initTLSKey() {
107     int result = pthread_key_create(& gTLSKey, threadDestructor);
108     LOG_ALWAYS_FATAL_IF(result != 0, "Could not allocate TLS key.");
109 }
110 
threadDestructor(void * st)111 void Looper::threadDestructor(void *st) {
112     Looper* const self = static_cast<Looper*>(st);
113     if (self != NULL) {
114         self->decStrong((void*)threadDestructor);
115     }
116 }
117 
setForThread(const sp<Looper> & looper)118 void Looper::setForThread(const sp<Looper>& looper) {
119     sp<Looper> old = getForThread(); // also has side-effect of initializing TLS
120 
121     if (looper != NULL) {
122         looper->incStrong((void*)threadDestructor);
123     }
124 
125     pthread_setspecific(gTLSKey, looper.get());
126 
127     if (old != NULL) {
128         old->decStrong((void*)threadDestructor);
129     }
130 }
131 
getForThread()132 sp<Looper> Looper::getForThread() {
133     int result = pthread_once(& gTLSOnce, initTLSKey);
134     LOG_ALWAYS_FATAL_IF(result != 0, "pthread_once failed");
135 
136     return (Looper*)pthread_getspecific(gTLSKey);
137 }
138 
prepare(int opts)139 sp<Looper> Looper::prepare(int opts) {
140     bool allowNonCallbacks = opts & ALOOPER_PREPARE_ALLOW_NON_CALLBACKS;
141     sp<Looper> looper = Looper::getForThread();
142     if (looper == NULL) {
143         looper = new Looper(allowNonCallbacks);
144         Looper::setForThread(looper);
145     }
146     if (looper->getAllowNonCallbacks() != allowNonCallbacks) {
147         ALOGW("Looper already prepared for this thread with a different value for the "
148                 "ALOOPER_PREPARE_ALLOW_NON_CALLBACKS option.");
149     }
150     return looper;
151 }
152 
getAllowNonCallbacks() const153 bool Looper::getAllowNonCallbacks() const {
154     return mAllowNonCallbacks;
155 }
156 
pollOnce(int timeoutMillis,int * outFd,int * outEvents,void ** outData)157 int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
158     int result = 0;
159     for (;;) {
160         while (mResponseIndex < mResponses.size()) {
161             const Response& response = mResponses.itemAt(mResponseIndex++);
162             int ident = response.request.ident;
163             if (ident >= 0) {
164                 int fd = response.request.fd;
165                 int events = response.events;
166                 void* data = response.request.data;
167 #if DEBUG_POLL_AND_WAKE
168                 ALOGD("%p ~ pollOnce - returning signalled identifier %d: "
169                         "fd=%d, events=0x%x, data=%p",
170                         this, ident, fd, events, data);
171 #endif
172                 if (outFd != NULL) *outFd = fd;
173                 if (outEvents != NULL) *outEvents = events;
174                 if (outData != NULL) *outData = data;
175                 return ident;
176             }
177         }
178 
179         if (result != 0) {
180 #if DEBUG_POLL_AND_WAKE
181             ALOGD("%p ~ pollOnce - returning result %d", this, result);
182 #endif
183             if (outFd != NULL) *outFd = 0;
184             if (outEvents != NULL) *outEvents = 0;
185             if (outData != NULL) *outData = NULL;
186             return result;
187         }
188 
189         result = pollInner(timeoutMillis);
190     }
191 }
192 
pollInner(int timeoutMillis)193 int Looper::pollInner(int timeoutMillis) {
194 #if DEBUG_POLL_AND_WAKE
195     ALOGD("%p ~ pollOnce - waiting: timeoutMillis=%d", this, timeoutMillis);
196 #endif
197 
198     // Adjust the timeout based on when the next message is due.
199     if (timeoutMillis != 0 && mNextMessageUptime != LLONG_MAX) {
200         nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
201         int messageTimeoutMillis = toMillisecondTimeoutDelay(now, mNextMessageUptime);
202         if (messageTimeoutMillis >= 0
203                 && (timeoutMillis < 0 || messageTimeoutMillis < timeoutMillis)) {
204             timeoutMillis = messageTimeoutMillis;
205         }
206 #if DEBUG_POLL_AND_WAKE
207         ALOGD("%p ~ pollOnce - next message in %lldns, adjusted timeout: timeoutMillis=%d",
208                 this, mNextMessageUptime - now, timeoutMillis);
209 #endif
210     }
211 
212     // Poll.
213     int result = ALOOPER_POLL_WAKE;
214     mResponses.clear();
215     mResponseIndex = 0;
216 
217     struct epoll_event eventItems[EPOLL_MAX_EVENTS];
218     int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
219 
220     // Acquire lock.
221     mLock.lock();
222 
223     // Check for poll error.
224     if (eventCount < 0) {
225         if (errno == EINTR) {
226             goto Done;
227         }
228         ALOGW("Poll failed with an unexpected error, errno=%d", errno);
229         result = ALOOPER_POLL_ERROR;
230         goto Done;
231     }
232 
233     // Check for poll timeout.
234     if (eventCount == 0) {
235 #if DEBUG_POLL_AND_WAKE
236         ALOGD("%p ~ pollOnce - timeout", this);
237 #endif
238         result = ALOOPER_POLL_TIMEOUT;
239         goto Done;
240     }
241 
242     // Handle all events.
243 #if DEBUG_POLL_AND_WAKE
244     ALOGD("%p ~ pollOnce - handling events from %d fds", this, eventCount);
245 #endif
246 
247     for (int i = 0; i < eventCount; i++) {
248         int fd = eventItems[i].data.fd;
249         uint32_t epollEvents = eventItems[i].events;
250         if (fd == mWakeReadPipeFd) {
251             if (epollEvents & EPOLLIN) {
252                 awoken();
253             } else {
254                 ALOGW("Ignoring unexpected epoll events 0x%x on wake read pipe.", epollEvents);
255             }
256         } else {
257             ssize_t requestIndex = mRequests.indexOfKey(fd);
258             if (requestIndex >= 0) {
259                 int events = 0;
260                 if (epollEvents & EPOLLIN) events |= ALOOPER_EVENT_INPUT;
261                 if (epollEvents & EPOLLOUT) events |= ALOOPER_EVENT_OUTPUT;
262                 if (epollEvents & EPOLLERR) events |= ALOOPER_EVENT_ERROR;
263                 if (epollEvents & EPOLLHUP) events |= ALOOPER_EVENT_HANGUP;
264                 pushResponse(events, mRequests.valueAt(requestIndex));
265             } else {
266                 ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
267                         "no longer registered.", epollEvents, fd);
268             }
269         }
270     }
271 Done: ;
272 
273     // Invoke pending message callbacks.
274     mNextMessageUptime = LLONG_MAX;
275     while (mMessageEnvelopes.size() != 0) {
276         nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
277         const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
278         if (messageEnvelope.uptime <= now) {
279             // Remove the envelope from the list.
280             // We keep a strong reference to the handler until the call to handleMessage
281             // finishes.  Then we drop it so that the handler can be deleted *before*
282             // we reacquire our lock.
283             { // obtain handler
284                 sp<MessageHandler> handler = messageEnvelope.handler;
285                 Message message = messageEnvelope.message;
286                 mMessageEnvelopes.removeAt(0);
287                 mSendingMessage = true;
288                 mLock.unlock();
289 
290 #if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
291                 ALOGD("%p ~ pollOnce - sending message: handler=%p, what=%d",
292                         this, handler.get(), message.what);
293 #endif
294                 handler->handleMessage(message);
295             } // release handler
296 
297             mLock.lock();
298             mSendingMessage = false;
299             result = ALOOPER_POLL_CALLBACK;
300         } else {
301             // The last message left at the head of the queue determines the next wakeup time.
302             mNextMessageUptime = messageEnvelope.uptime;
303             break;
304         }
305     }
306 
307     // Release lock.
308     mLock.unlock();
309 
310     // Invoke all response callbacks.
311     for (size_t i = 0; i < mResponses.size(); i++) {
312         Response& response = mResponses.editItemAt(i);
313         if (response.request.ident == ALOOPER_POLL_CALLBACK) {
314             int fd = response.request.fd;
315             int events = response.events;
316             void* data = response.request.data;
317 #if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
318             ALOGD("%p ~ pollOnce - invoking fd event callback %p: fd=%d, events=0x%x, data=%p",
319                     this, response.request.callback.get(), fd, events, data);
320 #endif
321             int callbackResult = response.request.callback->handleEvent(fd, events, data);
322             if (callbackResult == 0) {
323                 removeFd(fd);
324             }
325             // Clear the callback reference in the response structure promptly because we
326             // will not clear the response vector itself until the next poll.
327             response.request.callback.clear();
328             result = ALOOPER_POLL_CALLBACK;
329         }
330     }
331     return result;
332 }
333 
pollAll(int timeoutMillis,int * outFd,int * outEvents,void ** outData)334 int Looper::pollAll(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
335     if (timeoutMillis <= 0) {
336         int result;
337         do {
338             result = pollOnce(timeoutMillis, outFd, outEvents, outData);
339         } while (result == ALOOPER_POLL_CALLBACK);
340         return result;
341     } else {
342         nsecs_t endTime = systemTime(SYSTEM_TIME_MONOTONIC)
343                 + milliseconds_to_nanoseconds(timeoutMillis);
344 
345         for (;;) {
346             int result = pollOnce(timeoutMillis, outFd, outEvents, outData);
347             if (result != ALOOPER_POLL_CALLBACK) {
348                 return result;
349             }
350 
351             nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
352             timeoutMillis = toMillisecondTimeoutDelay(now, endTime);
353             if (timeoutMillis == 0) {
354                 return ALOOPER_POLL_TIMEOUT;
355             }
356         }
357     }
358 }
359 
wake()360 void Looper::wake() {
361 #if DEBUG_POLL_AND_WAKE
362     ALOGD("%p ~ wake", this);
363 #endif
364 
365     ssize_t nWrite;
366     do {
367         nWrite = write(mWakeWritePipeFd, "W", 1);
368     } while (nWrite == -1 && errno == EINTR);
369 
370     if (nWrite != 1) {
371         if (errno != EAGAIN) {
372             ALOGW("Could not write wake signal, errno=%d", errno);
373         }
374     }
375 }
376 
awoken()377 void Looper::awoken() {
378 #if DEBUG_POLL_AND_WAKE
379     ALOGD("%p ~ awoken", this);
380 #endif
381 
382     char buffer[16];
383     ssize_t nRead;
384     do {
385         nRead = read(mWakeReadPipeFd, buffer, sizeof(buffer));
386     } while ((nRead == -1 && errno == EINTR) || nRead == sizeof(buffer));
387 }
388 
pushResponse(int events,const Request & request)389 void Looper::pushResponse(int events, const Request& request) {
390     Response response;
391     response.events = events;
392     response.request = request;
393     mResponses.push(response);
394 }
395 
addFd(int fd,int ident,int events,ALooper_callbackFunc callback,void * data)396 int Looper::addFd(int fd, int ident, int events, ALooper_callbackFunc callback, void* data) {
397     return addFd(fd, ident, events, callback ? new SimpleLooperCallback(callback) : NULL, data);
398 }
399 
addFd(int fd,int ident,int events,const sp<LooperCallback> & callback,void * data)400 int Looper::addFd(int fd, int ident, int events, const sp<LooperCallback>& callback, void* data) {
401 #if DEBUG_CALLBACKS
402     ALOGD("%p ~ addFd - fd=%d, ident=%d, events=0x%x, callback=%p, data=%p", this, fd, ident,
403             events, callback.get(), data);
404 #endif
405 
406     if (!callback.get()) {
407         if (! mAllowNonCallbacks) {
408             ALOGE("Invalid attempt to set NULL callback but not allowed for this looper.");
409             return -1;
410         }
411 
412         if (ident < 0) {
413             ALOGE("Invalid attempt to set NULL callback with ident < 0.");
414             return -1;
415         }
416     } else {
417         ident = ALOOPER_POLL_CALLBACK;
418     }
419 
420     int epollEvents = 0;
421     if (events & ALOOPER_EVENT_INPUT) epollEvents |= EPOLLIN;
422     if (events & ALOOPER_EVENT_OUTPUT) epollEvents |= EPOLLOUT;
423 
424     { // acquire lock
425         AutoMutex _l(mLock);
426 
427         Request request;
428         request.fd = fd;
429         request.ident = ident;
430         request.callback = callback;
431         request.data = data;
432 
433         struct epoll_event eventItem;
434         memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
435         eventItem.events = epollEvents;
436         eventItem.data.fd = fd;
437 
438         ssize_t requestIndex = mRequests.indexOfKey(fd);
439         if (requestIndex < 0) {
440             int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, fd, & eventItem);
441             if (epollResult < 0) {
442                 ALOGE("Error adding epoll events for fd %d, errno=%d", fd, errno);
443                 return -1;
444             }
445             mRequests.add(fd, request);
446         } else {
447             int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_MOD, fd, & eventItem);
448             if (epollResult < 0) {
449                 ALOGE("Error modifying epoll events for fd %d, errno=%d", fd, errno);
450                 return -1;
451             }
452             mRequests.replaceValueAt(requestIndex, request);
453         }
454     } // release lock
455     return 1;
456 }
457 
removeFd(int fd)458 int Looper::removeFd(int fd) {
459 #if DEBUG_CALLBACKS
460     ALOGD("%p ~ removeFd - fd=%d", this, fd);
461 #endif
462 
463     { // acquire lock
464         AutoMutex _l(mLock);
465         ssize_t requestIndex = mRequests.indexOfKey(fd);
466         if (requestIndex < 0) {
467             return 0;
468         }
469 
470         int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_DEL, fd, NULL);
471         if (epollResult < 0) {
472             ALOGE("Error removing epoll events for fd %d, errno=%d", fd, errno);
473             return -1;
474         }
475 
476         mRequests.removeItemsAt(requestIndex);
477     } // release lock
478     return 1;
479 }
480 
sendMessage(const sp<MessageHandler> & handler,const Message & message)481 void Looper::sendMessage(const sp<MessageHandler>& handler, const Message& message) {
482     nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
483     sendMessageAtTime(now, handler, message);
484 }
485 
sendMessageDelayed(nsecs_t uptimeDelay,const sp<MessageHandler> & handler,const Message & message)486 void Looper::sendMessageDelayed(nsecs_t uptimeDelay, const sp<MessageHandler>& handler,
487         const Message& message) {
488     nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
489     sendMessageAtTime(now + uptimeDelay, handler, message);
490 }
491 
sendMessageAtTime(nsecs_t uptime,const sp<MessageHandler> & handler,const Message & message)492 void Looper::sendMessageAtTime(nsecs_t uptime, const sp<MessageHandler>& handler,
493         const Message& message) {
494 #if DEBUG_CALLBACKS
495     ALOGD("%p ~ sendMessageAtTime - uptime=%lld, handler=%p, what=%d",
496             this, uptime, handler.get(), message.what);
497 #endif
498 
499     size_t i = 0;
500     { // acquire lock
501         AutoMutex _l(mLock);
502 
503         size_t messageCount = mMessageEnvelopes.size();
504         while (i < messageCount && uptime >= mMessageEnvelopes.itemAt(i).uptime) {
505             i += 1;
506         }
507 
508         MessageEnvelope messageEnvelope(uptime, handler, message);
509         mMessageEnvelopes.insertAt(messageEnvelope, i, 1);
510 
511         // Optimization: If the Looper is currently sending a message, then we can skip
512         // the call to wake() because the next thing the Looper will do after processing
513         // messages is to decide when the next wakeup time should be.  In fact, it does
514         // not even matter whether this code is running on the Looper thread.
515         if (mSendingMessage) {
516             return;
517         }
518     } // release lock
519 
520     // Wake the poll loop only when we enqueue a new message at the head.
521     if (i == 0) {
522         wake();
523     }
524 }
525 
removeMessages(const sp<MessageHandler> & handler)526 void Looper::removeMessages(const sp<MessageHandler>& handler) {
527 #if DEBUG_CALLBACKS
528     ALOGD("%p ~ removeMessages - handler=%p", this, handler.get());
529 #endif
530 
531     { // acquire lock
532         AutoMutex _l(mLock);
533 
534         for (size_t i = mMessageEnvelopes.size(); i != 0; ) {
535             const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(--i);
536             if (messageEnvelope.handler == handler) {
537                 mMessageEnvelopes.removeAt(i);
538             }
539         }
540     } // release lock
541 }
542 
removeMessages(const sp<MessageHandler> & handler,int what)543 void Looper::removeMessages(const sp<MessageHandler>& handler, int what) {
544 #if DEBUG_CALLBACKS
545     ALOGD("%p ~ removeMessages - handler=%p, what=%d", this, handler.get(), what);
546 #endif
547 
548     { // acquire lock
549         AutoMutex _l(mLock);
550 
551         for (size_t i = mMessageEnvelopes.size(); i != 0; ) {
552             const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(--i);
553             if (messageEnvelope.handler == handler
554                     && messageEnvelope.message.what == what) {
555                 mMessageEnvelopes.removeAt(i);
556             }
557         }
558     } // release lock
559 }
560 
561 } // namespace android
562