1 /*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 //#define USE_LOG SLAndroidLogLevel_Verbose
18
19 #include "sles_allinclusive.h"
20 #include "android_StreamPlayer.h"
21
22 #include <media/IStreamSource.h>
23 #include <media/IMediaPlayerService.h>
24 #include <media/stagefright/foundation/ADebug.h>
25 #include <binder/IPCThreadState.h>
26
27
28 //--------------------------------------------------------------------------------------------------
29 namespace android {
30
StreamSourceAppProxy(IAndroidBufferQueue * androidBufferQueue,const sp<CallbackProtector> & callbackProtector,StreamPlayer * player)31 StreamSourceAppProxy::StreamSourceAppProxy(
32 IAndroidBufferQueue *androidBufferQueue,
33 const sp<CallbackProtector> &callbackProtector,
34 // sp<StreamPlayer> would cause StreamPlayer's destructor to run during it's own
35 // construction. If you pass in a sp<> to 'this' inside a constructor, then first the
36 // refcount is increased from 0 to 1, then decreased from 1 to 0, which causes the object's
37 // destructor to run from inside it's own constructor.
38 StreamPlayer * /* const sp<StreamPlayer> & */ player) :
39 mBuffersHasBeenSet(false),
40 mAndroidBufferQueue(androidBufferQueue),
41 mCallbackProtector(callbackProtector),
42 mPlayer(player)
43 {
44 SL_LOGV("StreamSourceAppProxy::StreamSourceAppProxy()");
45 }
46
~StreamSourceAppProxy()47 StreamSourceAppProxy::~StreamSourceAppProxy() {
48 // FIXME make this an SL_LOGV later; this just proves that the bug is fixed
49 SL_LOGI("StreamSourceAppProxy::~StreamSourceAppProxy()");
50 disconnect();
51 }
52
53 const SLuint32 StreamSourceAppProxy::kItemProcessed[NB_BUFFEREVENT_ITEM_FIELDS] = {
54 SL_ANDROID_ITEMKEY_BUFFERQUEUEEVENT, // item key
55 sizeof(SLuint32), // item size
56 SL_ANDROIDBUFFERQUEUEEVENT_PROCESSED // item data
57 };
58
59 //--------------------------------------------------
60 // IStreamSource implementation
setListener(const sp<IStreamListener> & listener)61 void StreamSourceAppProxy::setListener(const sp<IStreamListener> &listener) {
62 assert(listener != NULL);
63 Mutex::Autolock _l(mLock);
64 assert(mListener == NULL);
65 mListener = listener;
66 }
67
setBuffers(const Vector<sp<IMemory>> & buffers)68 void StreamSourceAppProxy::setBuffers(const Vector<sp<IMemory> > &buffers) {
69 Mutex::Autolock _l(mLock);
70 assert(!mBuffersHasBeenSet);
71 mBuffers = buffers;
72 mBuffersHasBeenSet = true;
73 }
74
onBufferAvailable(size_t index)75 void StreamSourceAppProxy::onBufferAvailable(size_t index) {
76 //SL_LOGD("StreamSourceAppProxy::onBufferAvailable(%d)", index);
77
78 {
79 Mutex::Autolock _l(mLock);
80 // assert not needed because if not set, size() will be zero and the CHECK_LT will also fail
81 // assert(mBuffersHasBeenSet);
82 CHECK_LT(index, mBuffers.size());
83 #if 0 // enable if needed for debugging
84 sp<IMemory> mem = mBuffers.itemAt(index);
85 SLAint64 length = (SLAint64) mem->size();
86 #endif
87 mAvailableBuffers.push_back(index);
88 //SL_LOGD("onBufferAvailable() now %d buffers available in queue", mAvailableBuffers.size());
89 }
90
91 // a new shared mem buffer is available: let's try to fill immediately
92 pullFromBuffQueue();
93 }
94
receivedCmd_l(IStreamListener::Command cmd,const sp<AMessage> & msg)95 void StreamSourceAppProxy::receivedCmd_l(IStreamListener::Command cmd, const sp<AMessage> &msg) {
96 if (mListener != 0) {
97 mListener->issueCommand(cmd, false /* synchronous */, msg);
98 }
99 }
100
receivedBuffer_l(size_t buffIndex,size_t buffLength)101 void StreamSourceAppProxy::receivedBuffer_l(size_t buffIndex, size_t buffLength) {
102 if (mListener != 0) {
103 mListener->queueBuffer(buffIndex, buffLength);
104 }
105 }
106
disconnect()107 void StreamSourceAppProxy::disconnect() {
108 Mutex::Autolock _l(mLock);
109 mListener.clear();
110 // Force binder to push the decremented reference count for sp<IStreamListener>.
111 // mediaserver and client both have sp<> to the other. When you decrement an sp<>
112 // reference count, binder doesn't push that to the other process immediately.
113 IPCThreadState::self()->flushCommands();
114 mBuffers.clear();
115 mBuffersHasBeenSet = false;
116 mAvailableBuffers.clear();
117 }
118
119 //--------------------------------------------------
120 // consumption from ABQ: pull from the ABQ, and push to shared memory (media server)
pullFromBuffQueue()121 void StreamSourceAppProxy::pullFromBuffQueue() {
122
123 if (android::CallbackProtector::enterCbIfOk(mCallbackProtector)) {
124
125 size_t bufferId;
126 void* bufferLoc;
127 size_t buffSize;
128
129 slAndroidBufferQueueCallback callback = NULL;
130 void* pBufferContext, *pBufferData, *callbackPContext = NULL;
131 AdvancedBufferHeader *oldFront = NULL;
132 uint32_t dataSize /* , dataUsed */;
133
134 // retrieve data from the buffer queue
135 interface_lock_exclusive(mAndroidBufferQueue);
136
137 // can this read operation cause us to call the buffer queue callback
138 // (either because there was a command with no data, or all the data has been consumed)
139 bool queueCallbackCandidate = false;
140
141 if (mAndroidBufferQueue->mState.count != 0) {
142 // SL_LOGD("nbBuffers in ABQ = %u, buffSize=%u",abq->mState.count, buffSize);
143 assert(mAndroidBufferQueue->mFront != mAndroidBufferQueue->mRear);
144
145 oldFront = mAndroidBufferQueue->mFront;
146 AdvancedBufferHeader *newFront = &oldFront[1];
147
148 // consume events when starting to read data from a buffer for the first time
149 if (oldFront->mDataSizeConsumed == 0) {
150 // note this code assumes at most one event per buffer; see IAndroidBufferQueue_Enqueue
151 if (oldFront->mItems.mTsCmdData.mTsCmdCode & ANDROID_MP2TSEVENT_EOS) {
152 receivedCmd_l(IStreamListener::EOS);
153 // EOS has no associated data
154 queueCallbackCandidate = true;
155 } else if (oldFront->mItems.mTsCmdData.mTsCmdCode & ANDROID_MP2TSEVENT_DISCONTINUITY) {
156 receivedCmd_l(IStreamListener::DISCONTINUITY);
157 } else if (oldFront->mItems.mTsCmdData.mTsCmdCode & ANDROID_MP2TSEVENT_DISCON_NEWPTS) {
158 sp<AMessage> msg = new AMessage();
159 msg->setInt64(IStreamListener::kKeyResumeAtPTS,
160 (int64_t)oldFront->mItems.mTsCmdData.mPts);
161 receivedCmd_l(IStreamListener::DISCONTINUITY, msg /*msg*/);
162 } else if (oldFront->mItems.mTsCmdData.mTsCmdCode & ANDROID_MP2TSEVENT_FORMAT_CHANGE) {
163 sp<AMessage> msg = new AMessage();
164 // positive value for format change key makes the discontinuity "hard", see key def
165 msg->setInt32(IStreamListener::kKeyFormatChange, (int32_t) 1);
166 receivedCmd_l(IStreamListener::DISCONTINUITY, msg /*msg*/);
167 }
168 if (oldFront->mItems.mTsCmdData.mTsCmdCode & (ANDROID_MP2TSEVENT_DISCONTINUITY |
169 ANDROID_MP2TSEVENT_DISCON_NEWPTS | ANDROID_MP2TSEVENT_FORMAT_CHANGE)) {
170 const sp<StreamPlayer> player(mPlayer.promote());
171 if (player != NULL) {
172 // FIXME see note at onSeek
173 player->seek(ANDROID_UNKNOWN_TIME);
174 }
175 }
176 oldFront->mItems.mTsCmdData.mTsCmdCode = ANDROID_MP2TSEVENT_NONE;
177 }
178
179 {
180 // we're going to change the shared mem buffer queue, so lock it
181 Mutex::Autolock _l(mLock);
182 if (!mAvailableBuffers.empty()) {
183 bufferId = *mAvailableBuffers.begin();
184 CHECK_LT(bufferId, mBuffers.size());
185 sp<IMemory> mem = mBuffers.itemAt(bufferId);
186 bufferLoc = mem->pointer();
187 buffSize = mem->size();
188
189 char *pSrc = ((char*)oldFront->mDataBuffer) + oldFront->mDataSizeConsumed;
190 if (oldFront->mDataSizeConsumed + buffSize < oldFront->mDataSize) {
191 // more available than requested, copy as much as requested
192 // consume data: 1/ copy to given destination
193 memcpy(bufferLoc, pSrc, buffSize);
194 // 2/ keep track of how much has been consumed
195 oldFront->mDataSizeConsumed += buffSize;
196 // 3/ notify shared mem listener that new data is available
197 receivedBuffer_l(bufferId, buffSize);
198 mAvailableBuffers.erase(mAvailableBuffers.begin());
199 } else {
200 // requested as much available or more: consume the whole of the current
201 // buffer and move to the next
202 size_t consumed = oldFront->mDataSize - oldFront->mDataSizeConsumed;
203 //SL_LOGD("consuming rest of buffer: enqueueing=%u", consumed);
204 oldFront->mDataSizeConsumed = oldFront->mDataSize;
205
206 // move queue to next
207 if (newFront == &mAndroidBufferQueue->
208 mBufferArray[mAndroidBufferQueue->mNumBuffers + 1]) {
209 // reached the end, circle back
210 newFront = mAndroidBufferQueue->mBufferArray;
211 }
212 mAndroidBufferQueue->mFront = newFront;
213 mAndroidBufferQueue->mState.count--;
214 mAndroidBufferQueue->mState.index++;
215
216 if (consumed > 0) {
217 // consume data: 1/ copy to given destination
218 memcpy(bufferLoc, pSrc, consumed);
219 // 2/ keep track of how much has been consumed
220 // here nothing to do because we are done with this buffer
221 // 3/ notify StreamPlayer that new data is available
222 receivedBuffer_l(bufferId, consumed);
223 mAvailableBuffers.erase(mAvailableBuffers.begin());
224 }
225
226 // data has been consumed, and the buffer queue state has been updated
227 // we will notify the client if applicable
228 queueCallbackCandidate = true;
229 }
230 }
231
232 if (queueCallbackCandidate) {
233 if (mAndroidBufferQueue->mCallbackEventsMask &
234 SL_ANDROIDBUFFERQUEUEEVENT_PROCESSED) {
235 callback = mAndroidBufferQueue->mCallback;
236 // save callback data while under lock
237 callbackPContext = mAndroidBufferQueue->mContext;
238 pBufferContext = (void *)oldFront->mBufferContext;
239 pBufferData = (void *)oldFront->mDataBuffer;
240 dataSize = oldFront->mDataSize;
241 // here a buffer is only dequeued when fully consumed
242 //dataUsed = oldFront->mDataSizeConsumed;
243 }
244 }
245 //SL_LOGD("%d buffers available after reading from queue", mAvailableBuffers.size());
246 if (!mAvailableBuffers.empty()) {
247 // there is still room in the shared memory, recheck later if we can pull
248 // data from the buffer queue and write it to shared memory
249 const sp<StreamPlayer> player(mPlayer.promote());
250 if (player != NULL) {
251 player->queueRefilled();
252 }
253 }
254 }
255
256 } else { // empty queue
257 SL_LOGD("ABQ empty, starving!");
258 }
259
260 interface_unlock_exclusive(mAndroidBufferQueue);
261
262 // notify client of buffer processed
263 if (NULL != callback) {
264 SLresult result = (*callback)(&mAndroidBufferQueue->mItf, callbackPContext,
265 pBufferContext, pBufferData, dataSize,
266 dataSize, /* dataUsed */
267 // no messages during playback other than marking the buffer as processed
268 (const SLAndroidBufferItem*)(&kItemProcessed) /* pItems */,
269 NB_BUFFEREVENT_ITEM_FIELDS *sizeof(SLuint32) /* itemsLength */ );
270 if (SL_RESULT_SUCCESS != result) {
271 // Reserved for future use
272 SL_LOGW("Unsuccessful result %d returned from AndroidBufferQueueCallback", result);
273 }
274 }
275
276 mCallbackProtector->exitCb();
277 } // enterCbIfOk
278 }
279
280
281 //--------------------------------------------------------------------------------------------------
StreamPlayer(AudioPlayback_Parameters * params,bool hasVideo,IAndroidBufferQueue * androidBufferQueue,const sp<CallbackProtector> & callbackProtector)282 StreamPlayer::StreamPlayer(AudioPlayback_Parameters* params, bool hasVideo,
283 IAndroidBufferQueue *androidBufferQueue, const sp<CallbackProtector> &callbackProtector) :
284 GenericMediaPlayer(params, hasVideo),
285 mAppProxy(new StreamSourceAppProxy(androidBufferQueue, callbackProtector, this)),
286 mStopForDestroyCompleted(false)
287 {
288 SL_LOGD("StreamPlayer::StreamPlayer()");
289
290 mPlaybackParams = *params;
291
292 }
293
~StreamPlayer()294 StreamPlayer::~StreamPlayer() {
295 SL_LOGD("StreamPlayer::~StreamPlayer()");
296 mAppProxy->disconnect();
297 }
298
299
onMessageReceived(const sp<AMessage> & msg)300 void StreamPlayer::onMessageReceived(const sp<AMessage> &msg) {
301 switch (msg->what()) {
302 case kWhatPullFromAbq:
303 onPullFromAndroidBufferQueue();
304 break;
305
306 case kWhatStopForDestroy:
307 onStopForDestroy();
308 break;
309
310 default:
311 GenericMediaPlayer::onMessageReceived(msg);
312 break;
313 }
314 }
315
316
preDestroy()317 void StreamPlayer::preDestroy() {
318 // FIXME NuPlayerDriver is currently not thread-safe, so stop() must be called by looper
319 (new AMessage(kWhatStopForDestroy, id()))->post();
320 {
321 Mutex::Autolock _l(mStopForDestroyLock);
322 while (!mStopForDestroyCompleted) {
323 mStopForDestroyCondition.wait(mStopForDestroyLock);
324 }
325 }
326 // GenericMediaPlayer::preDestroy will repeat some of what we've done, but that's benign
327 GenericMediaPlayer::preDestroy();
328 }
329
330
onStopForDestroy()331 void StreamPlayer::onStopForDestroy() {
332 if (mPlayer != 0) {
333 mPlayer->stop();
334 // causes CHECK failure in Nuplayer
335 //mPlayer->setDataSource(NULL);
336 mPlayer->setVideoSurface(NULL);
337 mPlayer->disconnect();
338 mPlayer.clear();
339 {
340 // FIXME ugh make this a method
341 Mutex::Autolock _l(mPreparedPlayerLock);
342 mPreparedPlayer.clear();
343 }
344 }
345 mStopForDestroyCompleted = true;
346 mStopForDestroyCondition.signal();
347 }
348
349
350 /**
351 * Asynchronously notify the player that the queue is ready to be pulled from.
352 */
queueRefilled()353 void StreamPlayer::queueRefilled() {
354 // async notification that the ABQ was refilled: the player should pull from the ABQ, and
355 // and push to shared memory (to the media server)
356 (new AMessage(kWhatPullFromAbq, id()))->post();
357 }
358
359
appClear_l()360 void StreamPlayer::appClear_l() {
361 // the user of StreamPlayer has cleared its AndroidBufferQueue:
362 // there's no clear() for the shared memory queue, so this is a no-op
363 }
364
365
366 //--------------------------------------------------
367 // Event handlers
onPrepare()368 void StreamPlayer::onPrepare() {
369 SL_LOGD("StreamPlayer::onPrepare()");
370 sp<IMediaPlayerService> mediaPlayerService(getMediaPlayerService());
371 if (mediaPlayerService != NULL) {
372 mPlayer = mediaPlayerService->create(getpid(), mPlayerClient /*IMediaPlayerClient*/,
373 mPlaybackParams.sessionId);
374 if (mPlayer == NULL) {
375 SL_LOGE("media player service failed to create player by app proxy");
376 } else if (mPlayer->setDataSource(mAppProxy /*IStreamSource*/) != NO_ERROR) {
377 SL_LOGE("setDataSource failed");
378 mPlayer.clear();
379 }
380 }
381 if (mPlayer == NULL) {
382 mStateFlags |= kFlagPreparedUnsuccessfully;
383 }
384 GenericMediaPlayer::onPrepare();
385 SL_LOGD("StreamPlayer::onPrepare() done");
386 }
387
388
onPlay()389 void StreamPlayer::onPlay() {
390 SL_LOGD("StreamPlayer::onPlay()");
391 // enqueue a message that will cause StreamAppProxy to consume from the queue (again if the
392 // player had starved the shared memory)
393 queueRefilled();
394
395 GenericMediaPlayer::onPlay();
396 }
397
398
onPullFromAndroidBufferQueue()399 void StreamPlayer::onPullFromAndroidBufferQueue() {
400 SL_LOGD("StreamPlayer::onPullFromAndroidBufferQueue()");
401 mAppProxy->pullFromBuffQueue();
402 }
403
404 } // namespace android
405