1 /*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 //#define USE_LOG SLAndroidLogLevel_Verbose
18
19 #include "sles_allinclusive.h"
20 #include "android_StreamPlayer.h"
21
22 #include <media/IStreamSource.h>
23 #include <media/IMediaPlayerService.h>
24 #include <media/stagefright/foundation/ADebug.h>
25 #include <media/stagefright/foundation/MediaKeys.h>
26 #include <binder/IPCThreadState.h>
27
28 #include <ATSParser.h>
29
30 //--------------------------------------------------------------------------------------------------
31 namespace android {
32
StreamSourceAppProxy(IAndroidBufferQueue * androidBufferQueue,const sp<CallbackProtector> & callbackProtector,StreamPlayer * player)33 StreamSourceAppProxy::StreamSourceAppProxy(
34 IAndroidBufferQueue *androidBufferQueue,
35 const sp<CallbackProtector> &callbackProtector,
36 // sp<StreamPlayer> would cause StreamPlayer's destructor to run during it's own
37 // construction. If you pass in a sp<> to 'this' inside a constructor, then first the
38 // refcount is increased from 0 to 1, then decreased from 1 to 0, which causes the object's
39 // destructor to run from inside it's own constructor.
40 StreamPlayer * /* const sp<StreamPlayer> & */ player) :
41 mBuffersHasBeenSet(false),
42 mAndroidBufferQueue(androidBufferQueue),
43 mCallbackProtector(callbackProtector),
44 mPlayer(player)
45 {
46 SL_LOGV("StreamSourceAppProxy::StreamSourceAppProxy()");
47 }
48
~StreamSourceAppProxy()49 StreamSourceAppProxy::~StreamSourceAppProxy() {
50 SL_LOGV("StreamSourceAppProxy::~StreamSourceAppProxy()");
51 disconnect();
52 }
53
54 const SLuint32 StreamSourceAppProxy::kItemProcessed[NB_BUFFEREVENT_ITEM_FIELDS] = {
55 SL_ANDROID_ITEMKEY_BUFFERQUEUEEVENT, // item key
56 sizeof(SLuint32), // item size
57 SL_ANDROIDBUFFERQUEUEEVENT_PROCESSED // item data
58 };
59
60 //--------------------------------------------------
61 // IStreamSource implementation
setListener(const sp<IStreamListener> & listener)62 void StreamSourceAppProxy::setListener(const sp<IStreamListener> &listener) {
63 assert(listener != NULL);
64 Mutex::Autolock _l(mLock);
65 assert(mListener == NULL);
66 mListener = listener;
67 }
68
setBuffers(const Vector<sp<IMemory>> & buffers)69 void StreamSourceAppProxy::setBuffers(const Vector<sp<IMemory> > &buffers) {
70 Mutex::Autolock _l(mLock);
71 assert(!mBuffersHasBeenSet);
72 mBuffers = buffers;
73 mBuffersHasBeenSet = true;
74 }
75
onBufferAvailable(size_t index)76 void StreamSourceAppProxy::onBufferAvailable(size_t index) {
77 //SL_LOGD("StreamSourceAppProxy::onBufferAvailable(%d)", index);
78
79 {
80 Mutex::Autolock _l(mLock);
81 if (!mBuffersHasBeenSet) {
82 // no buffers available to push data to from the buffer queue, bail
83 return;
84 }
85 CHECK_LT(index, mBuffers.size());
86 #if 0 // enable if needed for debugging
87 sp<IMemory> mem = mBuffers.itemAt(index);
88 SLAint64 length = (SLAint64) mem->size();
89 #endif
90 mAvailableBuffers.push_back(index);
91 //SL_LOGD("onBufferAvailable() now %d buffers available in queue",
92 // mAvailableBuffers.size());
93 }
94
95 // a new shared mem buffer is available: let's try to fill immediately
96 pullFromBuffQueue();
97 }
98
receivedCmd_l(IStreamListener::Command cmd,const sp<AMessage> & msg)99 void StreamSourceAppProxy::receivedCmd_l(IStreamListener::Command cmd, const sp<AMessage> &msg) {
100 if (mListener != 0) {
101 mListener->issueCommand(cmd, false /* synchronous */, msg);
102 }
103 }
104
receivedBuffer_l(size_t buffIndex,size_t buffLength)105 void StreamSourceAppProxy::receivedBuffer_l(size_t buffIndex, size_t buffLength) {
106 if (mListener != 0) {
107 mListener->queueBuffer(buffIndex, buffLength);
108 }
109 }
110
disconnect()111 void StreamSourceAppProxy::disconnect() {
112 Mutex::Autolock _l(mLock);
113 mListener.clear();
114 // Force binder to push the decremented reference count for sp<IStreamListener>.
115 // mediaserver and client both have sp<> to the other. When you decrement an sp<>
116 // reference count, binder doesn't push that to the other process immediately.
117 IPCThreadState::self()->flushCommands();
118 mBuffers.clear();
119 mBuffersHasBeenSet = false;
120 mAvailableBuffers.clear();
121 }
122
123 //--------------------------------------------------
124 // consumption from ABQ: pull from the ABQ, and push to shared memory (media server)
pullFromBuffQueue()125 void StreamSourceAppProxy::pullFromBuffQueue() {
126
127 if (android::CallbackProtector::enterCbIfOk(mCallbackProtector)) {
128
129 size_t bufferId;
130 void* bufferLoc;
131 size_t buffSize;
132
133 slAndroidBufferQueueCallback callback = NULL;
134 void* pBufferContext, *pBufferData, *callbackPContext = NULL;
135 AdvancedBufferHeader *oldFront = NULL;
136 uint32_t dataSize /* , dataUsed */;
137
138 // retrieve data from the buffer queue
139 interface_lock_exclusive(mAndroidBufferQueue);
140
141 // can this read operation cause us to call the buffer queue callback
142 // (either because there was a command with no data, or all the data has been consumed)
143 bool queueCallbackCandidate = false;
144
145 if (mAndroidBufferQueue->mState.count != 0) {
146 // SL_LOGD("nbBuffers in ABQ = %u, buffSize=%u",abq->mState.count, buffSize);
147 assert(mAndroidBufferQueue->mFront != mAndroidBufferQueue->mRear);
148
149 oldFront = mAndroidBufferQueue->mFront;
150 AdvancedBufferHeader *newFront = &oldFront[1];
151
152 // consume events when starting to read data from a buffer for the first time
153 if (oldFront->mDataSizeConsumed == 0) {
154 // note this code assumes at most one event per buffer; see IAndroidBufferQueue_Enqueue
155 if (oldFront->mItems.mTsCmdData.mTsCmdCode & ANDROID_MP2TSEVENT_EOS) {
156 receivedCmd_l(IStreamListener::EOS);
157 // EOS has no associated data
158 queueCallbackCandidate = true;
159 } else if (oldFront->mItems.mTsCmdData.mTsCmdCode & ANDROID_MP2TSEVENT_DISCONTINUITY) {
160 receivedCmd_l(IStreamListener::DISCONTINUITY);
161 } else if (oldFront->mItems.mTsCmdData.mTsCmdCode & ANDROID_MP2TSEVENT_DISCON_NEWPTS) {
162 sp<AMessage> msg = new AMessage();
163 msg->setInt64(kATSParserKeyResumeAtPTS,
164 (int64_t)oldFront->mItems.mTsCmdData.mPts);
165 receivedCmd_l(IStreamListener::DISCONTINUITY, msg /*msg*/);
166 } else if (oldFront->mItems.mTsCmdData.mTsCmdCode
167 & ANDROID_MP2TSEVENT_FORMAT_CHANGE_FULL) {
168 sp<AMessage> msg = new AMessage();
169 msg->setInt32(
170 kIStreamListenerKeyDiscontinuityMask,
171 ATSParser::DISCONTINUITY_FORMATCHANGE);
172 receivedCmd_l(IStreamListener::DISCONTINUITY, msg /*msg*/);
173 } else if (oldFront->mItems.mTsCmdData.mTsCmdCode
174 & ANDROID_MP2TSEVENT_FORMAT_CHANGE_VIDEO) {
175 sp<AMessage> msg = new AMessage();
176 msg->setInt32(
177 kIStreamListenerKeyDiscontinuityMask,
178 ATSParser::DISCONTINUITY_VIDEO_FORMAT);
179 receivedCmd_l(IStreamListener::DISCONTINUITY, msg /*msg*/);
180 }
181 // note that here we are intentionally only supporting
182 // ANDROID_MP2TSEVENT_FORMAT_CHANGE_VIDEO, see IAndroidBufferQueue.c
183
184 // some commands may introduce a time discontinuity, reevaluate position if needed
185 if (oldFront->mItems.mTsCmdData.mTsCmdCode & (ANDROID_MP2TSEVENT_DISCONTINUITY |
186 ANDROID_MP2TSEVENT_DISCON_NEWPTS | ANDROID_MP2TSEVENT_FORMAT_CHANGE_FULL)) {
187 const sp<StreamPlayer> player(mPlayer.promote());
188 if (player != NULL) {
189 // FIXME see note at onSeek
190 player->seek(ANDROID_UNKNOWN_TIME);
191 }
192 }
193 oldFront->mItems.mTsCmdData.mTsCmdCode = ANDROID_MP2TSEVENT_NONE;
194 }
195
196 {
197 // we're going to change the shared mem buffer queue, so lock it
198 Mutex::Autolock _l(mLock);
199 if (!mAvailableBuffers.empty()) {
200 bufferId = *mAvailableBuffers.begin();
201 CHECK_LT(bufferId, mBuffers.size());
202 sp<IMemory> mem = mBuffers.itemAt(bufferId);
203 bufferLoc = mem->unsecurePointer();
204 buffSize = mem->size();
205
206 char *pSrc = ((char*)oldFront->mDataBuffer) + oldFront->mDataSizeConsumed;
207 if (oldFront->mDataSizeConsumed + buffSize < oldFront->mDataSize) {
208 // more available than requested, copy as much as requested
209 // consume data: 1/ copy to given destination
210 memcpy(bufferLoc, pSrc, buffSize);
211 // 2/ keep track of how much has been consumed
212 oldFront->mDataSizeConsumed += buffSize;
213 // 3/ notify shared mem listener that new data is available
214 receivedBuffer_l(bufferId, buffSize);
215 mAvailableBuffers.erase(mAvailableBuffers.begin());
216 } else {
217 // requested as much available or more: consume the whole of the current
218 // buffer and move to the next
219 size_t consumed = oldFront->mDataSize - oldFront->mDataSizeConsumed;
220 //SL_LOGD("consuming rest of buffer: enqueueing=%u", consumed);
221 oldFront->mDataSizeConsumed = oldFront->mDataSize;
222
223 // move queue to next
224 if (newFront == &mAndroidBufferQueue->
225 mBufferArray[mAndroidBufferQueue->mNumBuffers + 1]) {
226 // reached the end, circle back
227 newFront = mAndroidBufferQueue->mBufferArray;
228 }
229 mAndroidBufferQueue->mFront = newFront;
230 mAndroidBufferQueue->mState.count--;
231 mAndroidBufferQueue->mState.index++;
232
233 if (consumed > 0) {
234 // consume data: 1/ copy to given destination
235 memcpy(bufferLoc, pSrc, consumed);
236 // 2/ keep track of how much has been consumed
237 // here nothing to do because we are done with this buffer
238 // 3/ notify StreamPlayer that new data is available
239 receivedBuffer_l(bufferId, consumed);
240 mAvailableBuffers.erase(mAvailableBuffers.begin());
241 }
242
243 // data has been consumed, and the buffer queue state has been updated
244 // we will notify the client if applicable
245 queueCallbackCandidate = true;
246 }
247 }
248
249 if (queueCallbackCandidate) {
250 if (mAndroidBufferQueue->mCallbackEventsMask &
251 SL_ANDROIDBUFFERQUEUEEVENT_PROCESSED) {
252 callback = mAndroidBufferQueue->mCallback;
253 // save callback data while under lock
254 callbackPContext = mAndroidBufferQueue->mContext;
255 pBufferContext = (void *)oldFront->mBufferContext;
256 pBufferData = (void *)oldFront->mDataBuffer;
257 dataSize = oldFront->mDataSize;
258 // here a buffer is only dequeued when fully consumed
259 //dataUsed = oldFront->mDataSizeConsumed;
260 }
261 }
262 //SL_LOGD("%d buffers available after reading from queue", mAvailableBuffers.size());
263 if (!mAvailableBuffers.empty()) {
264 // there is still room in the shared memory, recheck later if we can pull
265 // data from the buffer queue and write it to shared memory
266 const sp<StreamPlayer> player(mPlayer.promote());
267 if (player != NULL) {
268 player->queueRefilled();
269 }
270 }
271 }
272
273 } else { // empty queue
274 SL_LOGD("ABQ empty, starving!");
275 }
276
277 interface_unlock_exclusive(mAndroidBufferQueue);
278
279 // notify client of buffer processed
280 if (NULL != callback) {
281 SLresult result = (*callback)(&mAndroidBufferQueue->mItf, callbackPContext,
282 pBufferContext, pBufferData, dataSize,
283 dataSize, /* dataUsed */
284 // no messages during playback other than marking the buffer as processed
285 (const SLAndroidBufferItem*)(&kItemProcessed) /* pItems */,
286 NB_BUFFEREVENT_ITEM_FIELDS *sizeof(SLuint32) /* itemsLength */ );
287 if (SL_RESULT_SUCCESS != result) {
288 // Reserved for future use
289 SL_LOGW("Unsuccessful result %d returned from AndroidBufferQueueCallback", result);
290 }
291 }
292
293 mCallbackProtector->exitCb();
294 } // enterCbIfOk
295 }
296
297
298 //--------------------------------------------------------------------------------------------------
StreamPlayer(const AudioPlayback_Parameters * params,bool hasVideo,IAndroidBufferQueue * androidBufferQueue,const sp<CallbackProtector> & callbackProtector)299 StreamPlayer::StreamPlayer(const AudioPlayback_Parameters* params, bool hasVideo,
300 IAndroidBufferQueue *androidBufferQueue, const sp<CallbackProtector> &callbackProtector) :
301 GenericMediaPlayer(params, hasVideo),
302 mAppProxy(new StreamSourceAppProxy(androidBufferQueue, callbackProtector, this)),
303 mStopForDestroyCompleted(false)
304 {
305 SL_LOGD("StreamPlayer::StreamPlayer()");
306 }
307
~StreamPlayer()308 StreamPlayer::~StreamPlayer() {
309 SL_LOGD("StreamPlayer::~StreamPlayer()");
310 mAppProxy->disconnect();
311 }
312
313
onMessageReceived(const sp<AMessage> & msg)314 void StreamPlayer::onMessageReceived(const sp<AMessage> &msg) {
315 switch (msg->what()) {
316 case kWhatPullFromAbq:
317 onPullFromAndroidBufferQueue();
318 break;
319
320 case kWhatStopForDestroy:
321 onStopForDestroy();
322 break;
323
324 default:
325 GenericMediaPlayer::onMessageReceived(msg);
326 break;
327 }
328 }
329
330
preDestroy()331 void StreamPlayer::preDestroy() {
332 // FIXME NuPlayerDriver is currently not thread-safe, so stop() must be called by looper
333 (new AMessage(kWhatStopForDestroy, this))->post();
334 {
335 Mutex::Autolock _l(mStopForDestroyLock);
336 while (!mStopForDestroyCompleted) {
337 mStopForDestroyCondition.wait(mStopForDestroyLock);
338 }
339 }
340 // GenericMediaPlayer::preDestroy will repeat some of what we've done, but that's benign
341 GenericMediaPlayer::preDestroy();
342 }
343
344
onStopForDestroy()345 void StreamPlayer::onStopForDestroy() {
346 if (mPlayer != 0) {
347 mPlayer->stop();
348 // causes CHECK failure in Nuplayer
349 //mPlayer->setDataSource(NULL);
350 mPlayer->setVideoSurfaceTexture(NULL);
351 mPlayer->disconnect();
352 mPlayer.clear();
353 {
354 // FIXME ugh make this a method
355 Mutex::Autolock _l(mPreparedPlayerLock);
356 mPreparedPlayer.clear();
357 }
358 }
359 {
360 Mutex::Autolock _l(mStopForDestroyLock);
361 mStopForDestroyCompleted = true;
362 }
363 mStopForDestroyCondition.signal();
364 }
365
366
367 /**
368 * Asynchronously notify the player that the queue is ready to be pulled from.
369 */
queueRefilled()370 void StreamPlayer::queueRefilled() {
371 // async notification that the ABQ was refilled: the player should pull from the ABQ, and
372 // and push to shared memory (to the media server)
373 (new AMessage(kWhatPullFromAbq, this))->post();
374 }
375
376
appClear_l()377 void StreamPlayer::appClear_l() {
378 // the user of StreamPlayer has cleared its AndroidBufferQueue:
379 // there's no clear() for the shared memory queue, so this is a no-op
380 }
381
382
383 //--------------------------------------------------
384 // Event handlers
onPrepare()385 void StreamPlayer::onPrepare() {
386 SL_LOGD("StreamPlayer::onPrepare()");
387 sp<IMediaPlayerService> mediaPlayerService(getMediaPlayerService());
388 if (mediaPlayerService != NULL) {
389 mPlayer = mediaPlayerService->create(mPlayerClient /*IMediaPlayerClient*/,
390 mPlaybackParams.sessionId);
391 if (mPlayer == NULL) {
392 SL_LOGE("media player service failed to create player by app proxy");
393 } else if (mPlayer->setDataSource(static_cast<sp<IStreamSource>>(mAppProxy)) !=
394 NO_ERROR) {
395 SL_LOGE("setDataSource failed");
396 mPlayer.clear();
397 }
398 }
399 if (mPlayer == NULL) {
400 mStateFlags |= kFlagPreparedUnsuccessfully;
401 }
402 GenericMediaPlayer::onPrepare();
403 SL_LOGD("StreamPlayer::onPrepare() done");
404 }
405
406
onPlay()407 void StreamPlayer::onPlay() {
408 SL_LOGD("StreamPlayer::onPlay()");
409 // enqueue a message that will cause StreamAppProxy to consume from the queue (again if the
410 // player had starved the shared memory)
411 queueRefilled();
412
413 GenericMediaPlayer::onPlay();
414 }
415
416
onPullFromAndroidBufferQueue()417 void StreamPlayer::onPullFromAndroidBufferQueue() {
418 SL_LOGD("StreamPlayer::onPullFromAndroidBufferQueue()");
419 mAppProxy->pullFromBuffQueue();
420 }
421
422 } // namespace android
423