• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 // Copyright 2010 The Android Open Source Project
3 //
4 // A looper implementation based on epoll().
5 //
6 #define LOG_TAG "Looper"
7 
8 //#define LOG_NDEBUG 0
9 
10 // Debugs poll and wake interactions.
11 #define DEBUG_POLL_AND_WAKE 0
12 
13 // Debugs callback registration and invocation.
14 #define DEBUG_CALLBACKS 0
15 
16 #include <cutils/log.h>
17 #include <utils/Looper.h>
18 #include <utils/Timers.h>
19 
20 #include <unistd.h>
21 #include <fcntl.h>
22 
23 
24 namespace android {
25 
#ifdef LOOPER_USES_EPOLL
// Size hint handed to epoll_create() when the epoll instance is allocated.
static const int EPOLL_SIZE_HINT = 8;

// Upper bound on the number of events fetched by a single epoll_wait() call.
static const int EPOLL_MAX_EVENTS = 16;
#endif

// One-time guard used by getForThread() to run initTLSKey() exactly once.
static pthread_once_t gTLSOnce = PTHREAD_ONCE_INIT;
// Thread-local-storage key under which each thread's Looper pointer is kept.
static pthread_key_t gTLSKey = 0;
36 
Looper(bool allowNonCallbacks)37 Looper::Looper(bool allowNonCallbacks) :
38         mAllowNonCallbacks(allowNonCallbacks),
39         mResponseIndex(0) {
40     int wakeFds[2];
41     int result = pipe(wakeFds);
42     LOG_ALWAYS_FATAL_IF(result != 0, "Could not create wake pipe.  errno=%d", errno);
43 
44     mWakeReadPipeFd = wakeFds[0];
45     mWakeWritePipeFd = wakeFds[1];
46 
47     result = fcntl(mWakeReadPipeFd, F_SETFL, O_NONBLOCK);
48     LOG_ALWAYS_FATAL_IF(result != 0, "Could not make wake read pipe non-blocking.  errno=%d",
49             errno);
50 
51     result = fcntl(mWakeWritePipeFd, F_SETFL, O_NONBLOCK);
52     LOG_ALWAYS_FATAL_IF(result != 0, "Could not make wake write pipe non-blocking.  errno=%d",
53             errno);
54 
55 #ifdef LOOPER_USES_EPOLL
56     // Allocate the epoll instance and register the wake pipe.
57     mEpollFd = epoll_create(EPOLL_SIZE_HINT);
58     LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance.  errno=%d", errno);
59 
60     struct epoll_event eventItem;
61     memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
62     eventItem.events = EPOLLIN;
63     eventItem.data.fd = mWakeReadPipeFd;
64     result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeReadPipeFd, & eventItem);
65     LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake read pipe to epoll instance.  errno=%d",
66             errno);
67 #else
68     // Add the wake pipe to the head of the request list with a null callback.
69     struct pollfd requestedFd;
70     requestedFd.fd = mWakeReadPipeFd;
71     requestedFd.events = POLLIN;
72     mRequestedFds.push(requestedFd);
73 
74     Request request;
75     request.fd = mWakeReadPipeFd;
76     request.callback = NULL;
77     request.ident = 0;
78     request.data = NULL;
79     mRequests.push(request);
80 
81     mPolling = false;
82     mWaiters = 0;
83 #endif
84 
85 #ifdef LOOPER_STATISTICS
86     mPendingWakeTime = -1;
87     mPendingWakeCount = 0;
88     mSampledWakeCycles = 0;
89     mSampledWakeCountSum = 0;
90     mSampledWakeLatencySum = 0;
91 
92     mSampledPolls = 0;
93     mSampledZeroPollCount = 0;
94     mSampledZeroPollLatencySum = 0;
95     mSampledTimeoutPollCount = 0;
96     mSampledTimeoutPollLatencySum = 0;
97 #endif
98 }
99 
~Looper()100 Looper::~Looper() {
101     close(mWakeReadPipeFd);
102     close(mWakeWritePipeFd);
103 #ifdef LOOPER_USES_EPOLL
104     close(mEpollFd);
105 #endif
106 }
107 
initTLSKey()108 void Looper::initTLSKey() {
109     int result = pthread_key_create(& gTLSKey, threadDestructor);
110     LOG_ALWAYS_FATAL_IF(result != 0, "Could not allocate TLS key.");
111 }
112 
threadDestructor(void * st)113 void Looper::threadDestructor(void *st) {
114     Looper* const self = static_cast<Looper*>(st);
115     if (self != NULL) {
116         self->decStrong((void*)threadDestructor);
117     }
118 }
119 
setForThread(const sp<Looper> & looper)120 void Looper::setForThread(const sp<Looper>& looper) {
121     sp<Looper> old = getForThread(); // also has side-effect of initializing TLS
122 
123     if (looper != NULL) {
124         looper->incStrong((void*)threadDestructor);
125     }
126 
127     pthread_setspecific(gTLSKey, looper.get());
128 
129     if (old != NULL) {
130         old->decStrong((void*)threadDestructor);
131     }
132 }
133 
getForThread()134 sp<Looper> Looper::getForThread() {
135     int result = pthread_once(& gTLSOnce, initTLSKey);
136     LOG_ALWAYS_FATAL_IF(result != 0, "pthread_once failed");
137 
138     return (Looper*)pthread_getspecific(gTLSKey);
139 }
140 
prepare(int opts)141 sp<Looper> Looper::prepare(int opts) {
142     bool allowNonCallbacks = opts & ALOOPER_PREPARE_ALLOW_NON_CALLBACKS;
143     sp<Looper> looper = Looper::getForThread();
144     if (looper == NULL) {
145         looper = new Looper(allowNonCallbacks);
146         Looper::setForThread(looper);
147     }
148     if (looper->getAllowNonCallbacks() != allowNonCallbacks) {
149         LOGW("Looper already prepared for this thread with a different value for the "
150                 "ALOOPER_PREPARE_ALLOW_NON_CALLBACKS option.");
151     }
152     return looper;
153 }
154 
getAllowNonCallbacks() const155 bool Looper::getAllowNonCallbacks() const {
156     return mAllowNonCallbacks;
157 }
158 
pollOnce(int timeoutMillis,int * outFd,int * outEvents,void ** outData)159 int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
160     int result = 0;
161     for (;;) {
162         while (mResponseIndex < mResponses.size()) {
163             const Response& response = mResponses.itemAt(mResponseIndex++);
164             if (! response.request.callback) {
165 #if DEBUG_POLL_AND_WAKE
166                 LOGD("%p ~ pollOnce - returning signalled identifier %d: "
167                         "fd=%d, events=0x%x, data=%p", this,
168                         response.request.ident, response.request.fd,
169                         response.events, response.request.data);
170 #endif
171                 if (outFd != NULL) *outFd = response.request.fd;
172                 if (outEvents != NULL) *outEvents = response.events;
173                 if (outData != NULL) *outData = response.request.data;
174                 return response.request.ident;
175             }
176         }
177 
178         if (result != 0) {
179 #if DEBUG_POLL_AND_WAKE
180             LOGD("%p ~ pollOnce - returning result %d", this, result);
181 #endif
182             if (outFd != NULL) *outFd = 0;
183             if (outEvents != NULL) *outEvents = NULL;
184             if (outData != NULL) *outData = NULL;
185             return result;
186         }
187 
188         result = pollInner(timeoutMillis);
189     }
190 }
191 
pollInner(int timeoutMillis)192 int Looper::pollInner(int timeoutMillis) {
193 #if DEBUG_POLL_AND_WAKE
194     LOGD("%p ~ pollOnce - waiting: timeoutMillis=%d", this, timeoutMillis);
195 #endif
196 
197     int result = ALOOPER_POLL_WAKE;
198     mResponses.clear();
199     mResponseIndex = 0;
200 
201 #ifdef LOOPER_STATISTICS
202     nsecs_t pollStartTime = systemTime(SYSTEM_TIME_MONOTONIC);
203 #endif
204 
205 #ifdef LOOPER_USES_EPOLL
206     struct epoll_event eventItems[EPOLL_MAX_EVENTS];
207     int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
208     bool acquiredLock = false;
209 #else
210     // Wait for wakeAndLock() waiters to run then set mPolling to true.
211     mLock.lock();
212     while (mWaiters != 0) {
213         mResume.wait(mLock);
214     }
215     mPolling = true;
216     mLock.unlock();
217 
218     size_t requestedCount = mRequestedFds.size();
219     int eventCount = poll(mRequestedFds.editArray(), requestedCount, timeoutMillis);
220 #endif
221 
222     if (eventCount < 0) {
223         if (errno == EINTR) {
224             goto Done;
225         }
226 
227         LOGW("Poll failed with an unexpected error, errno=%d", errno);
228         result = ALOOPER_POLL_ERROR;
229         goto Done;
230     }
231 
232     if (eventCount == 0) {
233 #if DEBUG_POLL_AND_WAKE
234         LOGD("%p ~ pollOnce - timeout", this);
235 #endif
236         result = ALOOPER_POLL_TIMEOUT;
237         goto Done;
238     }
239 
240 #if DEBUG_POLL_AND_WAKE
241     LOGD("%p ~ pollOnce - handling events from %d fds", this, eventCount);
242 #endif
243 
244 #ifdef LOOPER_USES_EPOLL
245     for (int i = 0; i < eventCount; i++) {
246         int fd = eventItems[i].data.fd;
247         uint32_t epollEvents = eventItems[i].events;
248         if (fd == mWakeReadPipeFd) {
249             if (epollEvents & EPOLLIN) {
250                 awoken();
251             } else {
252                 LOGW("Ignoring unexpected epoll events 0x%x on wake read pipe.", epollEvents);
253             }
254         } else {
255             if (! acquiredLock) {
256                 mLock.lock();
257                 acquiredLock = true;
258             }
259 
260             ssize_t requestIndex = mRequests.indexOfKey(fd);
261             if (requestIndex >= 0) {
262                 int events = 0;
263                 if (epollEvents & EPOLLIN) events |= ALOOPER_EVENT_INPUT;
264                 if (epollEvents & EPOLLOUT) events |= ALOOPER_EVENT_OUTPUT;
265                 if (epollEvents & EPOLLERR) events |= ALOOPER_EVENT_ERROR;
266                 if (epollEvents & EPOLLHUP) events |= ALOOPER_EVENT_HANGUP;
267                 pushResponse(events, mRequests.valueAt(requestIndex));
268             } else {
269                 LOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
270                         "no longer registered.", epollEvents, fd);
271             }
272         }
273     }
274     if (acquiredLock) {
275         mLock.unlock();
276     }
277 Done: ;
278 #else
279     for (size_t i = 0; i < requestedCount; i++) {
280         const struct pollfd& requestedFd = mRequestedFds.itemAt(i);
281 
282         short pollEvents = requestedFd.revents;
283         if (pollEvents) {
284             if (requestedFd.fd == mWakeReadPipeFd) {
285                 if (pollEvents & POLLIN) {
286                     awoken();
287                 } else {
288                     LOGW("Ignoring unexpected poll events 0x%x on wake read pipe.", pollEvents);
289                 }
290             } else {
291                 int events = 0;
292                 if (pollEvents & POLLIN) events |= ALOOPER_EVENT_INPUT;
293                 if (pollEvents & POLLOUT) events |= ALOOPER_EVENT_OUTPUT;
294                 if (pollEvents & POLLERR) events |= ALOOPER_EVENT_ERROR;
295                 if (pollEvents & POLLHUP) events |= ALOOPER_EVENT_HANGUP;
296                 if (pollEvents & POLLNVAL) events |= ALOOPER_EVENT_INVALID;
297                 pushResponse(events, mRequests.itemAt(i));
298             }
299             if (--eventCount == 0) {
300                 break;
301             }
302         }
303     }
304 
305 Done:
306     // Set mPolling to false and wake up the wakeAndLock() waiters.
307     mLock.lock();
308     mPolling = false;
309     if (mWaiters != 0) {
310         mAwake.broadcast();
311     }
312     mLock.unlock();
313 #endif
314 
315 #ifdef LOOPER_STATISTICS
316     nsecs_t pollEndTime = systemTime(SYSTEM_TIME_MONOTONIC);
317     mSampledPolls += 1;
318     if (timeoutMillis == 0) {
319         mSampledZeroPollCount += 1;
320         mSampledZeroPollLatencySum += pollEndTime - pollStartTime;
321     } else if (timeoutMillis > 0 && result == ALOOPER_POLL_TIMEOUT) {
322         mSampledTimeoutPollCount += 1;
323         mSampledTimeoutPollLatencySum += pollEndTime - pollStartTime
324                 - milliseconds_to_nanoseconds(timeoutMillis);
325     }
326     if (mSampledPolls == SAMPLED_POLLS_TO_AGGREGATE) {
327         LOGD("%p ~ poll latency statistics: %0.3fms zero timeout, %0.3fms non-zero timeout", this,
328                 0.000001f * float(mSampledZeroPollLatencySum) / mSampledZeroPollCount,
329                 0.000001f * float(mSampledTimeoutPollLatencySum) / mSampledTimeoutPollCount);
330         mSampledPolls = 0;
331         mSampledZeroPollCount = 0;
332         mSampledZeroPollLatencySum = 0;
333         mSampledTimeoutPollCount = 0;
334         mSampledTimeoutPollLatencySum = 0;
335     }
336 #endif
337 
338     for (size_t i = 0; i < mResponses.size(); i++) {
339         const Response& response = mResponses.itemAt(i);
340         if (response.request.callback) {
341 #if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
342             LOGD("%p ~ pollOnce - invoking callback: fd=%d, events=0x%x, data=%p", this,
343                     response.request.fd, response.events, response.request.data);
344 #endif
345             int callbackResult = response.request.callback(
346                     response.request.fd, response.events, response.request.data);
347             if (callbackResult == 0) {
348                 removeFd(response.request.fd);
349             }
350 
351             result = ALOOPER_POLL_CALLBACK;
352         }
353     }
354     return result;
355 }
356 
pollAll(int timeoutMillis,int * outFd,int * outEvents,void ** outData)357 int Looper::pollAll(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
358     if (timeoutMillis <= 0) {
359         int result;
360         do {
361             result = pollOnce(timeoutMillis, outFd, outEvents, outData);
362         } while (result == ALOOPER_POLL_CALLBACK);
363         return result;
364     } else {
365         nsecs_t endTime = systemTime(SYSTEM_TIME_MONOTONIC)
366                 + milliseconds_to_nanoseconds(timeoutMillis);
367 
368         for (;;) {
369             int result = pollOnce(timeoutMillis, outFd, outEvents, outData);
370             if (result != ALOOPER_POLL_CALLBACK) {
371                 return result;
372             }
373 
374             nsecs_t timeoutNanos = endTime - systemTime(SYSTEM_TIME_MONOTONIC);
375             if (timeoutNanos <= 0) {
376                 return ALOOPER_POLL_TIMEOUT;
377             }
378 
379             timeoutMillis = int(nanoseconds_to_milliseconds(timeoutNanos + 999999LL));
380         }
381     }
382 }
383 
wake()384 void Looper::wake() {
385 #if DEBUG_POLL_AND_WAKE
386     LOGD("%p ~ wake", this);
387 #endif
388 
389 #ifdef LOOPER_STATISTICS
390     // FIXME: Possible race with awoken() but this code is for testing only and is rarely enabled.
391     if (mPendingWakeCount++ == 0) {
392         mPendingWakeTime = systemTime(SYSTEM_TIME_MONOTONIC);
393     }
394 #endif
395 
396     ssize_t nWrite;
397     do {
398         nWrite = write(mWakeWritePipeFd, "W", 1);
399     } while (nWrite == -1 && errno == EINTR);
400 
401     if (nWrite != 1) {
402         if (errno != EAGAIN) {
403             LOGW("Could not write wake signal, errno=%d", errno);
404         }
405     }
406 }
407 
awoken()408 void Looper::awoken() {
409 #if DEBUG_POLL_AND_WAKE
410     LOGD("%p ~ awoken", this);
411 #endif
412 
413 #ifdef LOOPER_STATISTICS
414     if (mPendingWakeCount == 0) {
415         LOGD("%p ~ awoken: spurious!", this);
416     } else {
417         mSampledWakeCycles += 1;
418         mSampledWakeCountSum += mPendingWakeCount;
419         mSampledWakeLatencySum += systemTime(SYSTEM_TIME_MONOTONIC) - mPendingWakeTime;
420         mPendingWakeCount = 0;
421         mPendingWakeTime = -1;
422         if (mSampledWakeCycles == SAMPLED_WAKE_CYCLES_TO_AGGREGATE) {
423             LOGD("%p ~ wake statistics: %0.3fms wake latency, %0.3f wakes per cycle", this,
424                     0.000001f * float(mSampledWakeLatencySum) / mSampledWakeCycles,
425                     float(mSampledWakeCountSum) / mSampledWakeCycles);
426             mSampledWakeCycles = 0;
427             mSampledWakeCountSum = 0;
428             mSampledWakeLatencySum = 0;
429         }
430     }
431 #endif
432 
433     char buffer[16];
434     ssize_t nRead;
435     do {
436         nRead = read(mWakeReadPipeFd, buffer, sizeof(buffer));
437     } while ((nRead == -1 && errno == EINTR) || nRead == sizeof(buffer));
438 }
439 
pushResponse(int events,const Request & request)440 void Looper::pushResponse(int events, const Request& request) {
441     Response response;
442     response.events = events;
443     response.request = request;
444     mResponses.push(response);
445 }
446 
addFd(int fd,int ident,int events,ALooper_callbackFunc callback,void * data)447 int Looper::addFd(int fd, int ident, int events, ALooper_callbackFunc callback, void* data) {
448 #if DEBUG_CALLBACKS
449     LOGD("%p ~ addFd - fd=%d, ident=%d, events=0x%x, callback=%p, data=%p", this, fd, ident,
450             events, callback, data);
451 #endif
452 
453     if (! callback) {
454         if (! mAllowNonCallbacks) {
455             LOGE("Invalid attempt to set NULL callback but not allowed for this looper.");
456             return -1;
457         }
458 
459         if (ident < 0) {
460             LOGE("Invalid attempt to set NULL callback with ident <= 0.");
461             return -1;
462         }
463     }
464 
465 #ifdef LOOPER_USES_EPOLL
466     int epollEvents = 0;
467     if (events & ALOOPER_EVENT_INPUT) epollEvents |= EPOLLIN;
468     if (events & ALOOPER_EVENT_OUTPUT) epollEvents |= EPOLLOUT;
469 
470     { // acquire lock
471         AutoMutex _l(mLock);
472 
473         Request request;
474         request.fd = fd;
475         request.ident = ident;
476         request.callback = callback;
477         request.data = data;
478 
479         struct epoll_event eventItem;
480         memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
481         eventItem.events = epollEvents;
482         eventItem.data.fd = fd;
483 
484         ssize_t requestIndex = mRequests.indexOfKey(fd);
485         if (requestIndex < 0) {
486             int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, fd, & eventItem);
487             if (epollResult < 0) {
488                 LOGE("Error adding epoll events for fd %d, errno=%d", fd, errno);
489                 return -1;
490             }
491             mRequests.add(fd, request);
492         } else {
493             int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_MOD, fd, & eventItem);
494             if (epollResult < 0) {
495                 LOGE("Error modifying epoll events for fd %d, errno=%d", fd, errno);
496                 return -1;
497             }
498             mRequests.replaceValueAt(requestIndex, request);
499         }
500     } // release lock
501 #else
502     int pollEvents = 0;
503     if (events & ALOOPER_EVENT_INPUT) pollEvents |= POLLIN;
504     if (events & ALOOPER_EVENT_OUTPUT) pollEvents |= POLLOUT;
505 
506     wakeAndLock(); // acquire lock
507 
508     struct pollfd requestedFd;
509     requestedFd.fd = fd;
510     requestedFd.events = pollEvents;
511 
512     Request request;
513     request.fd = fd;
514     request.ident = ident;
515     request.callback = callback;
516     request.data = data;
517     ssize_t index = getRequestIndexLocked(fd);
518     if (index < 0) {
519         mRequestedFds.push(requestedFd);
520         mRequests.push(request);
521     } else {
522         mRequestedFds.replaceAt(requestedFd, size_t(index));
523         mRequests.replaceAt(request, size_t(index));
524     }
525 
526     mLock.unlock(); // release lock
527 #endif
528     return 1;
529 }
530 
removeFd(int fd)531 int Looper::removeFd(int fd) {
532 #if DEBUG_CALLBACKS
533     LOGD("%p ~ removeFd - fd=%d", this, fd);
534 #endif
535 
536 #ifdef LOOPER_USES_EPOLL
537     { // acquire lock
538         AutoMutex _l(mLock);
539         ssize_t requestIndex = mRequests.indexOfKey(fd);
540         if (requestIndex < 0) {
541             return 0;
542         }
543 
544         int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_DEL, fd, NULL);
545         if (epollResult < 0) {
546             LOGE("Error removing epoll events for fd %d, errno=%d", fd, errno);
547             return -1;
548         }
549 
550         mRequests.removeItemsAt(requestIndex);
551     } // release lock
552     return 1;
553 #else
554     wakeAndLock(); // acquire lock
555 
556     ssize_t index = getRequestIndexLocked(fd);
557     if (index >= 0) {
558         mRequestedFds.removeAt(size_t(index));
559         mRequests.removeAt(size_t(index));
560     }
561 
562     mLock.unlock(); // release lock
563     return index >= 0;
564 #endif
565 }
566 
567 #ifndef LOOPER_USES_EPOLL
getRequestIndexLocked(int fd)568 ssize_t Looper::getRequestIndexLocked(int fd) {
569     size_t requestCount = mRequestedFds.size();
570 
571     for (size_t i = 0; i < requestCount; i++) {
572         if (mRequestedFds.itemAt(i).fd == fd) {
573             return i;
574         }
575     }
576 
577     return -1;
578 }
579 
wakeAndLock()580 void Looper::wakeAndLock() {
581     mLock.lock();
582 
583     mWaiters += 1;
584     while (mPolling) {
585         wake();
586         mAwake.wait(mLock);
587     }
588 
589     mWaiters -= 1;
590     if (mWaiters == 0) {
591         mResume.signal();
592     }
593 }
594 #endif
595 
596 } // namespace android
597