1 /*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 //#define LOG_NDEBUG 0
18 #define LOG_TAG "ALooper"
19
20 #include <media/stagefright/foundation/ADebug.h>
21
22 #include <utils/Log.h>
23
24 #include <sys/time.h>
25
26 #include "ALooper.h"
27
28 #include "AHandler.h"
29 #include "ALooperRoster.h"
30 #include "AMessage.h"
31
32 namespace android {
33
34 ALooperRoster gLooperRoster;
35
36 struct ALooper::LooperThread : public Thread {
LooperThreadandroid::ALooper::LooperThread37 LooperThread(ALooper *looper, bool canCallJava)
38 : Thread(canCallJava),
39 mLooper(looper),
40 mThreadId(NULL) {
41 }
42
readyToRunandroid::ALooper::LooperThread43 virtual status_t readyToRun() {
44 mThreadId = androidGetThreadId();
45
46 return Thread::readyToRun();
47 }
48
threadLoopandroid::ALooper::LooperThread49 virtual bool threadLoop() {
50 return mLooper->loop();
51 }
52
isCurrentThreadandroid::ALooper::LooperThread53 bool isCurrentThread() const {
54 return mThreadId == androidGetThreadId();
55 }
56
57 protected:
~LooperThreadandroid::ALooper::LooperThread58 virtual ~LooperThread() {}
59
60 private:
61 ALooper *mLooper;
62 android_thread_id_t mThreadId;
63
64 DISALLOW_EVIL_CONSTRUCTORS(LooperThread);
65 };
66
67 // static
GetNowUs()68 int64_t ALooper::GetNowUs() {
69 return systemTime(SYSTEM_TIME_MONOTONIC) / 1000LL;
70 }
71
getNowUs()72 int64_t ALooper::getNowUs() {
73 return GetNowUs();
74 }
75
ALooper()76 ALooper::ALooper()
77 : mRunningLocally(false) {
78 // clean up stale AHandlers. Doing it here instead of in the destructor avoids
79 // the side effect of objects being deleted from the unregister function recursively.
80 gLooperRoster.unregisterStaleHandlers();
81 }
82
~ALooper()83 ALooper::~ALooper() {
84 stop();
85 // stale AHandlers are now cleaned up in the constructor of the next ALooper to come along
86 }
87
setName(const char * name)88 void ALooper::setName(const char *name) {
89 mName = name;
90 }
91
registerHandler(const sp<AHandler> & handler)92 ALooper::handler_id ALooper::registerHandler(const sp<AHandler> &handler) {
93 return gLooperRoster.registerHandler(this, handler);
94 }
95
unregisterHandler(handler_id handlerID)96 void ALooper::unregisterHandler(handler_id handlerID) {
97 gLooperRoster.unregisterHandler(handlerID);
98 }
99
start(bool runOnCallingThread,bool canCallJava,int32_t priority)100 status_t ALooper::start(
101 bool runOnCallingThread, bool canCallJava, int32_t priority) {
102 if (runOnCallingThread) {
103 {
104 Mutex::Autolock autoLock(mLock);
105
106 if (mThread != NULL || mRunningLocally) {
107 return INVALID_OPERATION;
108 }
109
110 mRunningLocally = true;
111 }
112
113 do {
114 } while (loop());
115
116 return OK;
117 }
118
119 Mutex::Autolock autoLock(mLock);
120
121 if (mThread != NULL || mRunningLocally) {
122 return INVALID_OPERATION;
123 }
124
125 mThread = new LooperThread(this, canCallJava);
126
127 status_t err = mThread->run(
128 mName.empty() ? "ALooper" : mName.c_str(), priority);
129 if (err != OK) {
130 mThread.clear();
131 }
132
133 return err;
134 }
135
stop()136 status_t ALooper::stop() {
137 sp<LooperThread> thread;
138 bool runningLocally;
139
140 {
141 Mutex::Autolock autoLock(mLock);
142
143 thread = mThread;
144 runningLocally = mRunningLocally;
145 mThread.clear();
146 mRunningLocally = false;
147 }
148
149 if (thread == NULL && !runningLocally) {
150 return INVALID_OPERATION;
151 }
152
153 if (thread != NULL) {
154 thread->requestExit();
155 }
156
157 mQueueChangedCondition.signal();
158 {
159 Mutex::Autolock autoLock(mRepliesLock);
160 mRepliesCondition.broadcast();
161 }
162
163 if (!runningLocally && !thread->isCurrentThread()) {
164 // If not running locally and this thread _is_ the looper thread,
165 // the loop() function will return and never be called again.
166 thread->requestExitAndWait();
167 }
168
169 return OK;
170 }
171
post(const sp<AMessage> & msg,int64_t delayUs)172 void ALooper::post(const sp<AMessage> &msg, int64_t delayUs) {
173 Mutex::Autolock autoLock(mLock);
174
175 int64_t whenUs;
176 if (delayUs > 0) {
177 int64_t nowUs = getNowUs();
178 whenUs = (delayUs > INT64_MAX - nowUs ? INT64_MAX : nowUs + delayUs);
179
180 } else {
181 whenUs = getNowUs();
182 }
183
184 List<Event>::iterator it = mEventQueue.begin();
185 while (it != mEventQueue.end() && (*it).mWhenUs <= whenUs) {
186 ++it;
187 }
188
189 Event event;
190 event.mWhenUs = whenUs;
191 event.mMessage = msg;
192 event.mToken = nullptr;
193
194 if (it == mEventQueue.begin()) {
195 mQueueChangedCondition.signal();
196 }
197
198 mEventQueue.insert(it, event);
199 }
200
postUnique(const sp<AMessage> & msg,const sp<RefBase> & token,int64_t delayUs)201 status_t ALooper::postUnique(const sp<AMessage> &msg, const sp<RefBase> &token, int64_t delayUs) {
202 if (token == nullptr) {
203 return -EINVAL;
204 }
205 Mutex::Autolock autoLock(mLock);
206
207 int64_t whenUs;
208 if (delayUs > 0) {
209 int64_t nowUs = getNowUs();
210 whenUs = (delayUs > INT64_MAX - nowUs ? INT64_MAX : nowUs + delayUs);
211 } else {
212 whenUs = getNowUs();
213 }
214
215 // We only need to wake the loop up if we're rescheduling to the earliest event in the queue.
216 // This needs to be checked now, before we reschedule the message, in case this message is
217 // already at the beginning of the queue.
218 bool shouldAwakeLoop = mEventQueue.empty() || whenUs < mEventQueue.begin()->mWhenUs;
219
220 // Erase any previously-posted event with this token.
221 for (auto i = mEventQueue.begin(); i != mEventQueue.end();) {
222 if (i->mToken == token) {
223 i = mEventQueue.erase(i);
224 } else {
225 ++i;
226 }
227 }
228
229 // Find the insertion point for the rescheduled message.
230 List<Event>::iterator i = mEventQueue.begin();
231 while (i != mEventQueue.end() && i->mWhenUs <= whenUs) {
232 ++i;
233 }
234
235 Event event;
236 event.mWhenUs = whenUs;
237 event.mMessage = msg;
238 event.mToken = token;
239 mEventQueue.insert(i, event);
240
241 // If we rescheduled the event to be earlier than the first event, then we need to wake up the
242 // looper earlier than it was previously scheduled to be woken up. Otherwise, it can sleep until
243 // the previous wake-up time and then go to sleep again if needed.
244 if (shouldAwakeLoop){
245 mQueueChangedCondition.signal();
246 }
247 return OK;
248 }
249
loop()250 bool ALooper::loop() {
251
252 Event event;
253
254 {
255 Mutex::Autolock autoLock(mLock);
256 if (mThread == NULL && !mRunningLocally) {
257 return false;
258 }
259 if (mEventQueue.empty()) {
260 mQueueChangedCondition.wait(mLock);
261 return true;
262 }
263 int64_t whenUs = (*mEventQueue.begin()).mWhenUs;
264 int64_t nowUs = getNowUs();
265
266 if (whenUs > nowUs) {
267 int64_t delayUs = whenUs - nowUs;
268 if (delayUs > INT64_MAX / 1000) {
269 delayUs = INT64_MAX / 1000;
270 }
271 mQueueChangedCondition.waitRelative(mLock, delayUs * 1000ll);
272
273 return true;
274 }
275
276 event = *mEventQueue.begin();
277 mEventQueue.erase(mEventQueue.begin());
278 }
279
280 event.mMessage->deliver();
281
282 // NOTE: It's important to note that at this point our "ALooper" object
283 // may no longer exist (its final reference may have gone away while
284 // delivering the message). We have made sure, however, that loop()
285 // won't be called again.
286
287 return true;
288 }
289
290 // to be called by AMessage::postAndAwaitResponse only
createReplyToken()291 sp<AReplyToken> ALooper::createReplyToken() {
292 return new AReplyToken(this);
293 }
294
295 // to be called by AMessage::postAndAwaitResponse only
awaitResponse(const sp<AReplyToken> & replyToken,sp<AMessage> * response)296 status_t ALooper::awaitResponse(const sp<AReplyToken> &replyToken, sp<AMessage> *response) {
297 // return status in case we want to handle an interrupted wait
298 Mutex::Autolock autoLock(mRepliesLock);
299 CHECK(replyToken != NULL);
300 while (!replyToken->retrieveReply(response)) {
301 {
302 Mutex::Autolock autoLock(mLock);
303 if (mThread == NULL) {
304 return -ENOENT;
305 }
306 }
307 mRepliesCondition.wait(mRepliesLock);
308 }
309 return OK;
310 }
311
postReply(const sp<AReplyToken> & replyToken,const sp<AMessage> & reply)312 status_t ALooper::postReply(const sp<AReplyToken> &replyToken, const sp<AMessage> &reply) {
313 Mutex::Autolock autoLock(mRepliesLock);
314 status_t err = replyToken->setReply(reply);
315 if (err == OK) {
316 mRepliesCondition.broadcast();
317 }
318 return err;
319 }
320
321 } // namespace android
322