1 /*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 //#define LOG_NDEBUG 0
18 #define LOG_TAG "LiveSession"
19 #include <utils/Log.h>
20
21 #include "LiveSession.h"
22 #include "HTTPDownloader.h"
23 #include "M3UParser.h"
24 #include "PlaylistFetcher.h"
25
26 #include <mpeg2ts/AnotherPacketSource.h>
27
28 #include <cutils/properties.h>
29 #include <media/MediaHTTPService.h>
30 #include <media/stagefright/foundation/ABuffer.h>
31 #include <media/stagefright/foundation/ADebug.h>
32 #include <media/stagefright/foundation/AMessage.h>
33 #include <media/stagefright/foundation/AUtils.h>
34 #include <media/stagefright/MediaDefs.h>
35 #include <media/stagefright/MetaData.h>
36 #include <media/stagefright/Utils.h>
37 #include <media/stagefright/FoundationUtils.h>
38
39 #include <utils/Mutex.h>
40
41 #include <ctype.h>
42 #include <inttypes.h>
43
44 namespace android {
45
46 // static
47 // Bandwidth Switch Mark Defaults
48 const int64_t LiveSession::kUpSwitchMarkUs = 15000000LL;
49 const int64_t LiveSession::kDownSwitchMarkUs = 20000000LL;
50 const int64_t LiveSession::kUpSwitchMarginUs = 5000000LL;
51 const int64_t LiveSession::kResumeThresholdUs = 100000LL;
52
53 //TODO: redefine this mark to a fair value
54 // default buffer underflow mark
55 static const int kUnderflowMarkMs = 1000; // 1 second
56
57 struct LiveSession::BandwidthEstimator : public RefBase {
58 BandwidthEstimator();
59
60 void addBandwidthMeasurement(size_t numBytes, int64_t delayUs);
61 bool estimateBandwidth(
62 int32_t *bandwidth,
63 bool *isStable = NULL,
64 int32_t *shortTermBps = NULL);
65
66 private:
67 // Bandwidth estimation parameters
68 static const int32_t kShortTermBandwidthItems = 3;
69 static const int32_t kMinBandwidthHistoryItems = 20;
70 static const int64_t kMinBandwidthHistoryWindowUs = 5000000LL; // 5 sec
71 static const int64_t kMaxBandwidthHistoryWindowUs = 30000000LL; // 30 sec
72 static const int64_t kMaxBandwidthHistoryAgeUs = 60000000LL; // 60 sec
73
74 struct BandwidthEntry {
75 int64_t mTimestampUs;
76 int64_t mDelayUs;
77 size_t mNumBytes;
78 };
79
80 Mutex mLock;
81 List<BandwidthEntry> mBandwidthHistory;
82 List<int32_t> mPrevEstimates;
83 int32_t mShortTermEstimate;
84 bool mHasNewSample;
85 bool mIsStable;
86 int64_t mTotalTransferTimeUs;
87 size_t mTotalTransferBytes;
88
89 DISALLOW_EVIL_CONSTRUCTORS(BandwidthEstimator);
90 };
91
BandwidthEstimator()92 LiveSession::BandwidthEstimator::BandwidthEstimator() :
93 mShortTermEstimate(0),
94 mHasNewSample(false),
95 mIsStable(true),
96 mTotalTransferTimeUs(0),
97 mTotalTransferBytes(0) {
98 }
99
addBandwidthMeasurement(size_t numBytes,int64_t delayUs)100 void LiveSession::BandwidthEstimator::addBandwidthMeasurement(
101 size_t numBytes, int64_t delayUs) {
102 AutoMutex autoLock(mLock);
103
104 int64_t nowUs = ALooper::GetNowUs();
105 BandwidthEntry entry;
106 entry.mTimestampUs = nowUs;
107 entry.mDelayUs = delayUs;
108 entry.mNumBytes = numBytes;
109 mTotalTransferTimeUs += delayUs;
110 mTotalTransferBytes += numBytes;
111 mBandwidthHistory.push_back(entry);
112 mHasNewSample = true;
113
114 // Remove no more than 10% of total transfer time at a time
115 // to avoid sudden jump on bandwidth estimation. There might
116 // be long blocking reads that takes up signification time,
117 // we have to keep a longer window in that case.
118 int64_t bandwidthHistoryWindowUs = mTotalTransferTimeUs * 9 / 10;
119 if (bandwidthHistoryWindowUs < kMinBandwidthHistoryWindowUs) {
120 bandwidthHistoryWindowUs = kMinBandwidthHistoryWindowUs;
121 } else if (bandwidthHistoryWindowUs > kMaxBandwidthHistoryWindowUs) {
122 bandwidthHistoryWindowUs = kMaxBandwidthHistoryWindowUs;
123 }
124 // trim old samples, keeping at least kMaxBandwidthHistoryItems samples,
125 // and total transfer time at least kMaxBandwidthHistoryWindowUs.
126 while (mBandwidthHistory.size() > kMinBandwidthHistoryItems) {
127 List<BandwidthEntry>::iterator it = mBandwidthHistory.begin();
128 // remove sample if either absolute age or total transfer time is
129 // over kMaxBandwidthHistoryWindowUs
130 if (nowUs - it->mTimestampUs < kMaxBandwidthHistoryAgeUs &&
131 mTotalTransferTimeUs - it->mDelayUs < bandwidthHistoryWindowUs) {
132 break;
133 }
134 mTotalTransferTimeUs -= it->mDelayUs;
135 mTotalTransferBytes -= it->mNumBytes;
136 mBandwidthHistory.erase(mBandwidthHistory.begin());
137 }
138 }
139
estimateBandwidth(int32_t * bandwidthBps,bool * isStable,int32_t * shortTermBps)140 bool LiveSession::BandwidthEstimator::estimateBandwidth(
141 int32_t *bandwidthBps, bool *isStable, int32_t *shortTermBps) {
142 AutoMutex autoLock(mLock);
143
144 if (mBandwidthHistory.size() < 2) {
145 return false;
146 }
147
148 if (!mHasNewSample) {
149 *bandwidthBps = *(--mPrevEstimates.end());
150 if (isStable) {
151 *isStable = mIsStable;
152 }
153 if (shortTermBps) {
154 *shortTermBps = mShortTermEstimate;
155 }
156 return true;
157 }
158
159 *bandwidthBps = ((double)mTotalTransferBytes * 8E6 / mTotalTransferTimeUs);
160 mPrevEstimates.push_back(*bandwidthBps);
161 while (mPrevEstimates.size() > 3) {
162 mPrevEstimates.erase(mPrevEstimates.begin());
163 }
164 mHasNewSample = false;
165
166 int64_t totalTimeUs = 0;
167 size_t totalBytes = 0;
168 if (mBandwidthHistory.size() >= kShortTermBandwidthItems) {
169 List<BandwidthEntry>::iterator it = --mBandwidthHistory.end();
170 for (size_t i = 0; i < kShortTermBandwidthItems; i++, it--) {
171 totalTimeUs += it->mDelayUs;
172 totalBytes += it->mNumBytes;
173 }
174 }
175 mShortTermEstimate = totalTimeUs > 0 ?
176 (totalBytes * 8E6 / totalTimeUs) : *bandwidthBps;
177 if (shortTermBps) {
178 *shortTermBps = mShortTermEstimate;
179 }
180
181 int64_t minEstimate = -1, maxEstimate = -1;
182 List<int32_t>::iterator it;
183 for (it = mPrevEstimates.begin(); it != mPrevEstimates.end(); it++) {
184 int32_t estimate = *it;
185 if (minEstimate < 0 || minEstimate > estimate) {
186 minEstimate = estimate;
187 }
188 if (maxEstimate < 0 || maxEstimate < estimate) {
189 maxEstimate = estimate;
190 }
191 }
192 // consider it stable if long-term average is not jumping a lot
193 // and short-term average is not much lower than long-term average
194 mIsStable = (maxEstimate <= minEstimate * 4 / 3)
195 && mShortTermEstimate > minEstimate * 7 / 10;
196 if (isStable) {
197 *isStable = mIsStable;
198 }
199
200 #if 0
201 {
202 char dumpStr[1024] = {0};
203 size_t itemIdx = 0;
204 size_t histSize = mBandwidthHistory.size();
205 sprintf(dumpStr, "estimate bps=%d stable=%d history (n=%d): {",
206 *bandwidthBps, mIsStable, histSize);
207 List<BandwidthEntry>::iterator it = mBandwidthHistory.begin();
208 for (; it != mBandwidthHistory.end(); ++it) {
209 if (itemIdx > 50) {
210 sprintf(dumpStr + strlen(dumpStr),
211 "...(%zd more items)... }", histSize - itemIdx);
212 break;
213 }
214 sprintf(dumpStr + strlen(dumpStr), "%dk/%.3fs%s",
215 it->mNumBytes / 1024,
216 (double)it->mDelayUs * 1.0e-6,
217 (it == (--mBandwidthHistory.end())) ? "}" : ", ");
218 itemIdx++;
219 }
220 ALOGE(dumpStr);
221 }
222 #endif
223 return true;
224 }
225
226 //static
getKeyForStream(StreamType type)227 const char *LiveSession::getKeyForStream(StreamType type) {
228 switch (type) {
229 case STREAMTYPE_VIDEO:
230 return "timeUsVideo";
231 case STREAMTYPE_AUDIO:
232 return "timeUsAudio";
233 case STREAMTYPE_SUBTITLES:
234 return "timeUsSubtitle";
235 case STREAMTYPE_METADATA:
236 return "timeUsMetadata"; // unused
237 default:
238 TRESPASS();
239 }
240 return NULL;
241 }
242
243 //static
getNameForStream(StreamType type)244 const char *LiveSession::getNameForStream(StreamType type) {
245 switch (type) {
246 case STREAMTYPE_VIDEO:
247 return "video";
248 case STREAMTYPE_AUDIO:
249 return "audio";
250 case STREAMTYPE_SUBTITLES:
251 return "subs";
252 case STREAMTYPE_METADATA:
253 return "metadata";
254 default:
255 break;
256 }
257 return "unknown";
258 }
259
260 //static
getSourceTypeForStream(StreamType type)261 ATSParser::SourceType LiveSession::getSourceTypeForStream(StreamType type) {
262 switch (type) {
263 case STREAMTYPE_VIDEO:
264 return ATSParser::VIDEO;
265 case STREAMTYPE_AUDIO:
266 return ATSParser::AUDIO;
267 case STREAMTYPE_METADATA:
268 return ATSParser::META;
269 case STREAMTYPE_SUBTITLES:
270 default:
271 TRESPASS();
272 }
273 return ATSParser::NUM_SOURCE_TYPES; // should not reach here
274 }
275
LiveSession(const sp<AMessage> & notify,uint32_t flags,const sp<MediaHTTPService> & httpService)276 LiveSession::LiveSession(
277 const sp<AMessage> ¬ify, uint32_t flags,
278 const sp<MediaHTTPService> &httpService)
279 : mNotify(notify),
280 mFlags(flags),
281 mHTTPService(httpService),
282 mBuffering(false),
283 mInPreparationPhase(true),
284 mPollBufferingGeneration(0),
285 mPrevBufferPercentage(-1),
286 mCurBandwidthIndex(-1),
287 mOrigBandwidthIndex(-1),
288 mLastBandwidthBps(-1LL),
289 mLastBandwidthStable(false),
290 mBandwidthEstimator(new BandwidthEstimator()),
291 mMaxWidth(720),
292 mMaxHeight(480),
293 mStreamMask(0),
294 mNewStreamMask(0),
295 mSwapMask(0),
296 mSwitchGeneration(0),
297 mSubtitleGeneration(0),
298 mLastDequeuedTimeUs(0LL),
299 mRealTimeBaseUs(0LL),
300 mReconfigurationInProgress(false),
301 mSwitchInProgress(false),
302 mUpSwitchMark(kUpSwitchMarkUs),
303 mDownSwitchMark(kDownSwitchMarkUs),
304 mUpSwitchMargin(kUpSwitchMarginUs),
305 mFirstTimeUsValid(false),
306 mFirstTimeUs(0),
307 mLastSeekTimeUs(0),
308 mHasMetadata(false) {
309 mStreams[kAudioIndex] = StreamItem("audio");
310 mStreams[kVideoIndex] = StreamItem("video");
311 mStreams[kSubtitleIndex] = StreamItem("subtitles");
312
313 for (size_t i = 0; i < kNumSources; ++i) {
314 mPacketSources.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
315 mPacketSources2.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
316 }
317 }
318
~LiveSession()319 LiveSession::~LiveSession() {
320 if (mFetcherLooper != NULL) {
321 mFetcherLooper->stop();
322 }
323 }
324
calculateMediaTimeUs(int64_t firstTimeUs,int64_t timeUs,int32_t discontinuitySeq)325 int64_t LiveSession::calculateMediaTimeUs(
326 int64_t firstTimeUs, int64_t timeUs, int32_t discontinuitySeq) {
327 if (timeUs >= firstTimeUs) {
328 timeUs -= firstTimeUs;
329 } else {
330 timeUs = 0;
331 }
332 timeUs += mLastSeekTimeUs;
333 if (mDiscontinuityOffsetTimesUs.indexOfKey(discontinuitySeq) >= 0) {
334 timeUs += mDiscontinuityOffsetTimesUs.valueFor(discontinuitySeq);
335 }
336 return timeUs;
337 }
338
dequeueAccessUnit(StreamType stream,sp<ABuffer> * accessUnit)339 status_t LiveSession::dequeueAccessUnit(
340 StreamType stream, sp<ABuffer> *accessUnit) {
341 status_t finalResult = OK;
342 sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);
343
344 ssize_t streamIdx = typeToIndex(stream);
345 if (streamIdx < 0) {
346 return BAD_VALUE;
347 }
348 const char *streamStr = getNameForStream(stream);
349 // Do not let client pull data if we don't have data packets yet.
350 // We might only have a format discontinuity queued without data.
351 // When NuPlayerDecoder dequeues the format discontinuity, it will
352 // immediately try to getFormat. If we return NULL, NuPlayerDecoder
353 // thinks it can do seamless change, so will not shutdown decoder.
354 // When the actual format arrives, it can't handle it and get stuck.
355 if (!packetSource->hasDataBufferAvailable(&finalResult)) {
356 ALOGV("[%s] dequeueAccessUnit: no buffer available (finalResult=%d)",
357 streamStr, finalResult);
358
359 if (finalResult == OK) {
360 return -EAGAIN;
361 } else {
362 return finalResult;
363 }
364 }
365
366 // Let the client dequeue as long as we have buffers available
367 // Do not make pause/resume decisions here.
368
369 status_t err = packetSource->dequeueAccessUnit(accessUnit);
370
371 if (err == INFO_DISCONTINUITY) {
372 // adaptive streaming, discontinuities in the playlist
373 int32_t type;
374 CHECK((*accessUnit)->meta()->findInt32("discontinuity", &type));
375
376 sp<AMessage> extra;
377 if (!(*accessUnit)->meta()->findMessage("extra", &extra)) {
378 extra.clear();
379 }
380
381 ALOGI("[%s] read discontinuity of type %d, extra = %s",
382 streamStr,
383 type,
384 extra == NULL ? "NULL" : extra->debugString().c_str());
385 } else if (err == OK) {
386
387 if (stream == STREAMTYPE_AUDIO || stream == STREAMTYPE_VIDEO) {
388 int64_t timeUs, originalTimeUs;
389 int32_t discontinuitySeq = 0;
390 StreamItem& strm = mStreams[streamIdx];
391 CHECK((*accessUnit)->meta()->findInt64("timeUs", &timeUs));
392 originalTimeUs = timeUs;
393 (*accessUnit)->meta()->findInt32("discontinuitySeq", &discontinuitySeq);
394 if (discontinuitySeq > (int32_t) strm.mCurDiscontinuitySeq) {
395 int64_t offsetTimeUs;
396 if (mDiscontinuityOffsetTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) {
397 offsetTimeUs = mDiscontinuityOffsetTimesUs.valueFor(strm.mCurDiscontinuitySeq);
398 } else {
399 offsetTimeUs = 0;
400 }
401
402 if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0
403 && strm.mLastDequeuedTimeUs >= 0) {
404 int64_t firstTimeUs;
405 firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq);
406 offsetTimeUs += strm.mLastDequeuedTimeUs - firstTimeUs;
407 offsetTimeUs += strm.mLastSampleDurationUs;
408 } else {
409 offsetTimeUs += strm.mLastSampleDurationUs;
410 }
411
412 mDiscontinuityOffsetTimesUs.add(discontinuitySeq, offsetTimeUs);
413 strm.mCurDiscontinuitySeq = discontinuitySeq;
414 }
415
416 int32_t discard = 0;
417 int64_t firstTimeUs;
418 if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) {
419 int64_t durUs; // approximate sample duration
420 if (timeUs > strm.mLastDequeuedTimeUs) {
421 durUs = timeUs - strm.mLastDequeuedTimeUs;
422 } else {
423 durUs = strm.mLastDequeuedTimeUs - timeUs;
424 }
425 strm.mLastSampleDurationUs = durUs;
426 firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq);
427 } else if ((*accessUnit)->meta()->findInt32("discard", &discard) && discard) {
428 firstTimeUs = timeUs;
429 } else {
430 mDiscontinuityAbsStartTimesUs.add(strm.mCurDiscontinuitySeq, timeUs);
431 firstTimeUs = timeUs;
432 }
433
434 strm.mLastDequeuedTimeUs = timeUs;
435 timeUs = calculateMediaTimeUs(firstTimeUs, timeUs, discontinuitySeq);
436
437 ALOGV("[%s] dequeueAccessUnit: time %lld us, original %lld us",
438 streamStr, (long long)timeUs, (long long)originalTimeUs);
439 (*accessUnit)->meta()->setInt64("timeUs", timeUs);
440 mLastDequeuedTimeUs = timeUs;
441 mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
442 } else if (stream == STREAMTYPE_SUBTITLES) {
443 int32_t subtitleGeneration;
444 if ((*accessUnit)->meta()->findInt32("subtitleGeneration", &subtitleGeneration)
445 && subtitleGeneration != mSubtitleGeneration) {
446 return -EAGAIN;
447 };
448 (*accessUnit)->meta()->setInt32(
449 "track-index", mPlaylist->getSelectedIndex());
450 (*accessUnit)->meta()->setInt64("baseUs", mRealTimeBaseUs);
451 } else if (stream == STREAMTYPE_METADATA) {
452 HLSTime mdTime((*accessUnit)->meta());
453 if (mDiscontinuityAbsStartTimesUs.indexOfKey(mdTime.mSeq) < 0) {
454 packetSource->requeueAccessUnit((*accessUnit));
455 return -EAGAIN;
456 } else {
457 int64_t firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(mdTime.mSeq);
458 int64_t timeUs = calculateMediaTimeUs(firstTimeUs, mdTime.mTimeUs, mdTime.mSeq);
459 (*accessUnit)->meta()->setInt64("timeUs", timeUs);
460 (*accessUnit)->meta()->setInt64("baseUs", mRealTimeBaseUs);
461 }
462 }
463 } else {
464 ALOGI("[%s] encountered error %d", streamStr, err);
465 }
466
467 return err;
468 }
469
getStreamFormatMeta(StreamType stream,sp<MetaData> * meta)470 status_t LiveSession::getStreamFormatMeta(StreamType stream, sp<MetaData> *meta) {
471 if (!(mStreamMask & stream)) {
472 return UNKNOWN_ERROR;
473 }
474
475 sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);
476
477 *meta = packetSource->getFormat();
478
479 if (*meta == NULL) {
480 return -EWOULDBLOCK;
481 }
482
483 if (stream == STREAMTYPE_AUDIO) {
484 // set AAC input buffer size to 32K bytes (256kbps x 1sec)
485 (*meta)->setInt32(kKeyMaxInputSize, 32 * 1024);
486 } else if (stream == STREAMTYPE_VIDEO) {
487 (*meta)->setInt32(kKeyMaxWidth, mMaxWidth);
488 (*meta)->setInt32(kKeyMaxHeight, mMaxHeight);
489 }
490
491 return OK;
492 }
493
getHTTPDownloader()494 sp<HTTPDownloader> LiveSession::getHTTPDownloader() {
495 return new HTTPDownloader(mHTTPService, mExtraHeaders);
496 }
497
setBufferingSettings(const BufferingSettings & buffering)498 void LiveSession::setBufferingSettings(
499 const BufferingSettings &buffering) {
500 sp<AMessage> msg = new AMessage(kWhatSetBufferingSettings, this);
501 writeToAMessage(msg, buffering);
502 msg->post();
503 }
504
connectAsync(const char * url,const KeyedVector<String8,String8> * headers)505 void LiveSession::connectAsync(
506 const char *url, const KeyedVector<String8, String8> *headers) {
507 sp<AMessage> msg = new AMessage(kWhatConnect, this);
508 msg->setString("url", url);
509
510 if (headers != NULL) {
511 msg->setPointer(
512 "headers",
513 new KeyedVector<String8, String8>(*headers));
514 }
515
516 msg->post();
517 }
518
disconnect()519 status_t LiveSession::disconnect() {
520 sp<AMessage> msg = new AMessage(kWhatDisconnect, this);
521
522 sp<AMessage> response;
523 status_t err = msg->postAndAwaitResponse(&response);
524
525 return err;
526 }
527
seekTo(int64_t timeUs,MediaPlayerSeekMode mode)528 status_t LiveSession::seekTo(int64_t timeUs, MediaPlayerSeekMode mode) {
529 sp<AMessage> msg = new AMessage(kWhatSeek, this);
530 msg->setInt64("timeUs", timeUs);
531 msg->setInt32("mode", mode);
532
533 sp<AMessage> response;
534 status_t err = msg->postAndAwaitResponse(&response);
535
536 return err;
537 }
538
checkSwitchProgress(sp<AMessage> & stopParams,int64_t delayUs,bool * needResumeUntil)539 bool LiveSession::checkSwitchProgress(
540 sp<AMessage> &stopParams, int64_t delayUs, bool *needResumeUntil) {
541 AString newUri;
542 CHECK(stopParams->findString("uri", &newUri));
543
544 *needResumeUntil = false;
545 sp<AMessage> firstNewMeta[kMaxStreams];
546 for (size_t i = 0; i < kMaxStreams; ++i) {
547 StreamType stream = indexToType(i);
548 if (!(mSwapMask & mNewStreamMask & stream)
549 || (mStreams[i].mNewUri != newUri)) {
550 continue;
551 }
552 if (stream == STREAMTYPE_SUBTITLES) {
553 continue;
554 }
555 sp<AnotherPacketSource> &source = mPacketSources.editValueAt(i);
556
557 // First, get latest dequeued meta, which is where the decoder is at.
558 // (when upswitching, we take the meta after a certain delay, so that
559 // the decoder is left with some cushion)
560 sp<AMessage> lastDequeueMeta, lastEnqueueMeta;
561 if (delayUs > 0) {
562 lastDequeueMeta = source->getMetaAfterLastDequeued(delayUs);
563 if (lastDequeueMeta == NULL) {
564 // this means we don't have enough cushion, try again later
565 ALOGV("[%s] up switching failed due to insufficient buffer",
566 getNameForStream(stream));
567 return false;
568 }
569 } else {
570 // It's okay for lastDequeueMeta to be NULL here, it means the
571 // decoder hasn't even started dequeueing
572 lastDequeueMeta = source->getLatestDequeuedMeta();
573 }
574 // Then, trim off packets at beginning of mPacketSources2 that's before
575 // the latest dequeued time. These samples are definitely too late.
576 firstNewMeta[i] = mPacketSources2.editValueAt(i)
577 ->trimBuffersBeforeMeta(lastDequeueMeta);
578
579 // Now firstNewMeta[i] is the first sample after the trim.
580 // If it's NULL, we failed because dequeue already past all samples
581 // in mPacketSource2, we have to try again.
582 if (firstNewMeta[i] == NULL) {
583 HLSTime dequeueTime(lastDequeueMeta);
584 ALOGV("[%s] dequeue time (%d, %lld) past start time",
585 getNameForStream(stream),
586 dequeueTime.mSeq, (long long) dequeueTime.mTimeUs);
587 return false;
588 }
589
590 // Otherwise, we check if mPacketSources2 overlaps with what old fetcher
591 // already fetched, and see if we need to resumeUntil
592 lastEnqueueMeta = source->getLatestEnqueuedMeta();
593 // lastEnqueueMeta == NULL means old fetcher stopped at a discontinuity
594 // boundary, no need to resume as the content will look different anyways
595 if (lastEnqueueMeta != NULL) {
596 HLSTime lastTime(lastEnqueueMeta), startTime(firstNewMeta[i]);
597
598 // no need to resume old fetcher if new fetcher started in different
599 // discontinuity sequence, as the content will look different.
600 *needResumeUntil |= (startTime.mSeq == lastTime.mSeq
601 && startTime.mTimeUs - lastTime.mTimeUs > kResumeThresholdUs);
602
603 // update the stopTime for resumeUntil
604 stopParams->setInt32("discontinuitySeq", startTime.mSeq);
605 stopParams->setInt64(getKeyForStream(stream), startTime.mTimeUs);
606 }
607 }
608
609 // if we're here, it means dequeue progress hasn't passed some samples in
610 // mPacketSource2, we can trim off the excess in mPacketSource.
611 // (old fetcher might still need to resumeUntil the start time of new fetcher)
612 for (size_t i = 0; i < kMaxStreams; ++i) {
613 StreamType stream = indexToType(i);
614 if (!(mSwapMask & mNewStreamMask & stream)
615 || (newUri != mStreams[i].mNewUri)
616 || stream == STREAMTYPE_SUBTITLES) {
617 continue;
618 }
619 mPacketSources.valueFor(stream)->trimBuffersAfterMeta(firstNewMeta[i]);
620 }
621
622 // no resumeUntil if already underflow
623 *needResumeUntil &= !mBuffering;
624
625 return true;
626 }
627
onMessageReceived(const sp<AMessage> & msg)628 void LiveSession::onMessageReceived(const sp<AMessage> &msg) {
629 switch (msg->what()) {
630 case kWhatSetBufferingSettings:
631 {
632 readFromAMessage(msg, &mBufferingSettings);
633 break;
634 }
635
636 case kWhatConnect:
637 {
638 onConnect(msg);
639 break;
640 }
641
642 case kWhatDisconnect:
643 {
644 CHECK(msg->senderAwaitsResponse(&mDisconnectReplyID));
645
646 if (mReconfigurationInProgress) {
647 break;
648 }
649
650 finishDisconnect();
651 break;
652 }
653
654 case kWhatSeek:
655 {
656 if (mReconfigurationInProgress) {
657 msg->post(50000);
658 break;
659 }
660
661 CHECK(msg->senderAwaitsResponse(&mSeekReplyID));
662 mSeekReply = new AMessage;
663
664 onSeek(msg);
665 break;
666 }
667
668 case kWhatFetcherNotify:
669 {
670 int32_t what;
671 CHECK(msg->findInt32("what", &what));
672
673 switch (what) {
674 case PlaylistFetcher::kWhatStarted:
675 break;
676 case PlaylistFetcher::kWhatPaused:
677 case PlaylistFetcher::kWhatStopped:
678 {
679 AString uri;
680 CHECK(msg->findString("uri", &uri));
681 ssize_t index = mFetcherInfos.indexOfKey(uri);
682 if (index < 0) {
683 // ignore msgs from fetchers that's already gone
684 break;
685 }
686
687 ALOGV("fetcher-%d %s",
688 mFetcherInfos[index].mFetcher->getFetcherID(),
689 what == PlaylistFetcher::kWhatPaused ?
690 "paused" : "stopped");
691
692 if (what == PlaylistFetcher::kWhatStopped) {
693 mFetcherLooper->unregisterHandler(
694 mFetcherInfos[index].mFetcher->id());
695 mFetcherInfos.removeItemsAt(index);
696 } else if (what == PlaylistFetcher::kWhatPaused) {
697 int32_t seekMode;
698 CHECK(msg->findInt32("seekMode", &seekMode));
699 for (size_t i = 0; i < kMaxStreams; ++i) {
700 if (mStreams[i].mUri == uri) {
701 mStreams[i].mSeekMode = (SeekMode) seekMode;
702 }
703 }
704 }
705
706 if (mContinuation != NULL) {
707 CHECK_GT(mContinuationCounter, 0u);
708 if (--mContinuationCounter == 0) {
709 mContinuation->post();
710 }
711 ALOGV("%zu fetcher(s) left", mContinuationCounter);
712 }
713 break;
714 }
715
716 case PlaylistFetcher::kWhatDurationUpdate:
717 {
718 AString uri;
719 CHECK(msg->findString("uri", &uri));
720
721 int64_t durationUs;
722 CHECK(msg->findInt64("durationUs", &durationUs));
723
724 ssize_t index = mFetcherInfos.indexOfKey(uri);
725 if (index >= 0) {
726 FetcherInfo *info = &mFetcherInfos.editValueFor(uri);
727 info->mDurationUs = durationUs;
728 }
729 break;
730 }
731
732 case PlaylistFetcher::kWhatTargetDurationUpdate:
733 {
734 int64_t targetDurationUs;
735 CHECK(msg->findInt64("targetDurationUs", &targetDurationUs));
736 mUpSwitchMark = min(kUpSwitchMarkUs, targetDurationUs * 7 / 4);
737 mDownSwitchMark = min(kDownSwitchMarkUs, targetDurationUs * 9 / 4);
738 mUpSwitchMargin = min(kUpSwitchMarginUs, targetDurationUs);
739 break;
740 }
741
742 case PlaylistFetcher::kWhatError:
743 {
744 status_t err;
745 CHECK(msg->findInt32("err", &err));
746
747 ALOGE("XXX Received error %d from PlaylistFetcher.", err);
748
749 // handle EOS on subtitle tracks independently
750 AString uri;
751 if (err == ERROR_END_OF_STREAM && msg->findString("uri", &uri)) {
752 ssize_t i = mFetcherInfos.indexOfKey(uri);
753 if (i >= 0) {
754 const sp<PlaylistFetcher> &fetcher = mFetcherInfos.valueAt(i).mFetcher;
755 if (fetcher != NULL) {
756 uint32_t type = fetcher->getStreamTypeMask();
757 if (type == STREAMTYPE_SUBTITLES) {
758 mPacketSources.valueFor(
759 STREAMTYPE_SUBTITLES)->signalEOS(err);;
760 break;
761 }
762 }
763 }
764 }
765
766 // remember the failure index (as mCurBandwidthIndex will be restored
767 // after cancelBandwidthSwitch()), and record last fail time
768 size_t failureIndex = mCurBandwidthIndex;
769 mBandwidthItems.editItemAt(
770 failureIndex).mLastFailureUs = ALooper::GetNowUs();
771
772 if (mSwitchInProgress) {
773 // if error happened when we switch to a variant, try fallback
774 // to other variant to save the session
775 if (tryBandwidthFallback()) {
776 break;
777 }
778 }
779
780 if (mInPreparationPhase) {
781 postPrepared(err);
782 }
783
784 cancelBandwidthSwitch();
785
786 mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(err);
787
788 mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(err);
789
790 mPacketSources.valueFor(
791 STREAMTYPE_SUBTITLES)->signalEOS(err);
792
793 postError(err);
794 break;
795 }
796
797 case PlaylistFetcher::kWhatStopReached:
798 {
799 ALOGV("kWhatStopReached");
800
801 AString oldUri;
802 CHECK(msg->findString("uri", &oldUri));
803
804 ssize_t index = mFetcherInfos.indexOfKey(oldUri);
805 if (index < 0) {
806 break;
807 }
808
809 tryToFinishBandwidthSwitch(oldUri);
810 break;
811 }
812
813 case PlaylistFetcher::kWhatStartedAt:
814 {
815 int32_t switchGeneration;
816 CHECK(msg->findInt32("switchGeneration", &switchGeneration));
817
818 ALOGV("kWhatStartedAt: switchGen=%d, mSwitchGen=%d",
819 switchGeneration, mSwitchGeneration);
820
821 if (switchGeneration != mSwitchGeneration) {
822 break;
823 }
824
825 AString uri;
826 CHECK(msg->findString("uri", &uri));
827
828 // mark new fetcher mToBeResumed
829 ssize_t index = mFetcherInfos.indexOfKey(uri);
830 if (index >= 0) {
831 mFetcherInfos.editValueAt(index).mToBeResumed = true;
832 }
833
834 // temporarily disable packet sources to be swapped to prevent
835 // NuPlayerDecoder from dequeuing while we check progress
836 for (size_t i = 0; i < mPacketSources.size(); ++i) {
837 if ((mSwapMask & mPacketSources.keyAt(i))
838 && uri == mStreams[i].mNewUri) {
839 mPacketSources.editValueAt(i)->enable(false);
840 }
841 }
842 bool switchUp = (mCurBandwidthIndex > mOrigBandwidthIndex);
843 // If switching up, require a cushion bigger than kUnderflowMark
844 // to avoid buffering immediately after the switch.
845 // (If we don't have that cushion we'd rather cancel and try again.)
846 int64_t delayUs =
847 switchUp ?
848 (kUnderflowMarkMs * 1000LL + 1000000LL)
849 : 0;
850 bool needResumeUntil = false;
851 sp<AMessage> stopParams = msg;
852 if (checkSwitchProgress(stopParams, delayUs, &needResumeUntil)) {
853 // playback time hasn't passed startAt time
854 if (!needResumeUntil) {
855 ALOGV("finish switch");
856 for (size_t i = 0; i < kMaxStreams; ++i) {
857 if ((mSwapMask & indexToType(i))
858 && uri == mStreams[i].mNewUri) {
859 // have to make a copy of mStreams[i].mUri because
860 // tryToFinishBandwidthSwitch is modifying mStreams[]
861 AString oldURI = mStreams[i].mUri;
862 tryToFinishBandwidthSwitch(oldURI);
863 break;
864 }
865 }
866 } else {
867 // startAt time is after last enqueue time
868 // Resume fetcher for the original variant; the resumed fetcher should
869 // continue until the timestamps found in msg, which is stored by the
870 // new fetcher to indicate where the new variant has started buffering.
871 ALOGV("finish switch with resumeUntilAsync");
872 for (size_t i = 0; i < mFetcherInfos.size(); i++) {
873 const FetcherInfo &info = mFetcherInfos.valueAt(i);
874 if (info.mToBeRemoved) {
875 info.mFetcher->resumeUntilAsync(stopParams);
876 }
877 }
878 }
879 } else {
880 // playback time passed startAt time
881 if (switchUp) {
882 // if switching up, cancel and retry if condition satisfies again
883 ALOGV("cancel up switch because we're too late");
884 cancelBandwidthSwitch(true /* resume */);
885 } else {
886 ALOGV("retry down switch at next sample");
887 resumeFetcher(uri, mSwapMask, -1, true /* newUri */);
888 }
889 }
890 // re-enable all packet sources
891 for (size_t i = 0; i < mPacketSources.size(); ++i) {
892 mPacketSources.editValueAt(i)->enable(true);
893 }
894
895 break;
896 }
897
898 case PlaylistFetcher::kWhatPlaylistFetched:
899 {
900 onMasterPlaylistFetched(msg);
901 break;
902 }
903
904 case PlaylistFetcher::kWhatMetadataDetected:
905 {
906 if (!mHasMetadata) {
907 mHasMetadata = true;
908 sp<AMessage> notify = mNotify->dup();
909 notify->setInt32("what", kWhatMetadataDetected);
910 notify->post();
911 }
912 break;
913 }
914
915 default:
916 TRESPASS();
917 }
918
919 break;
920 }
921
922 case kWhatChangeConfiguration:
923 {
924 onChangeConfiguration(msg);
925 break;
926 }
927
928 case kWhatChangeConfiguration2:
929 {
930 onChangeConfiguration2(msg);
931 break;
932 }
933
934 case kWhatChangeConfiguration3:
935 {
936 onChangeConfiguration3(msg);
937 break;
938 }
939
940 case kWhatPollBuffering:
941 {
942 int32_t generation;
943 CHECK(msg->findInt32("generation", &generation));
944 if (generation == mPollBufferingGeneration) {
945 onPollBuffering();
946 }
947 break;
948 }
949
950 default:
951 TRESPASS();
952 break;
953 }
954 }
955
956 // static
isBandwidthValid(const BandwidthItem & item)957 bool LiveSession::isBandwidthValid(const BandwidthItem &item) {
958 static const int64_t kBlacklistWindowUs = 300 * 1000000LL;
959 return item.mLastFailureUs < 0
960 || ALooper::GetNowUs() - item.mLastFailureUs > kBlacklistWindowUs;
961 }
962
963 // static
SortByBandwidth(const BandwidthItem * a,const BandwidthItem * b)964 int LiveSession::SortByBandwidth(const BandwidthItem *a, const BandwidthItem *b) {
965 if (a->mBandwidth < b->mBandwidth) {
966 return -1;
967 } else if (a->mBandwidth == b->mBandwidth) {
968 return 0;
969 }
970
971 return 1;
972 }
973
974 // static
indexToType(int idx)975 LiveSession::StreamType LiveSession::indexToType(int idx) {
976 CHECK(idx >= 0 && idx < kNumSources);
977 return (StreamType)(1 << idx);
978 }
979
980 // static
typeToIndex(int32_t type)981 ssize_t LiveSession::typeToIndex(int32_t type) {
982 switch (type) {
983 case STREAMTYPE_AUDIO:
984 return 0;
985 case STREAMTYPE_VIDEO:
986 return 1;
987 case STREAMTYPE_SUBTITLES:
988 return 2;
989 case STREAMTYPE_METADATA:
990 return 3;
991 default:
992 return -1;
993 };
994 return -1;
995 }
996
onConnect(const sp<AMessage> & msg)997 void LiveSession::onConnect(const sp<AMessage> &msg) {
998 CHECK(msg->findString("url", &mMasterURL));
999
1000 // TODO currently we don't know if we are coming here from incognito mode
1001 ALOGI("onConnect %s", uriDebugString(mMasterURL).c_str());
1002
1003 KeyedVector<String8, String8> *headers = NULL;
1004 if (!msg->findPointer("headers", (void **)&headers)) {
1005 mExtraHeaders.clear();
1006 } else {
1007 mExtraHeaders = *headers;
1008
1009 delete headers;
1010 headers = NULL;
1011 }
1012
1013 // create looper for fetchers
1014 if (mFetcherLooper == NULL) {
1015 mFetcherLooper = new ALooper();
1016
1017 mFetcherLooper->setName("Fetcher");
1018 mFetcherLooper->start(false, /* runOnCallingThread */
1019 true /* canCallJava */);
1020 }
1021
1022 // create fetcher to fetch the master playlist
1023 addFetcher(mMasterURL.c_str())->fetchPlaylistAsync();
1024 }
1025
onMasterPlaylistFetched(const sp<AMessage> & msg)1026 void LiveSession::onMasterPlaylistFetched(const sp<AMessage> &msg) {
1027 AString uri;
1028 CHECK(msg->findString("uri", &uri));
1029 ssize_t index = mFetcherInfos.indexOfKey(uri);
1030 if (index < 0) {
1031 ALOGW("fetcher for master playlist is gone.");
1032 return;
1033 }
1034
1035 // no longer useful, remove
1036 mFetcherLooper->unregisterHandler(mFetcherInfos[index].mFetcher->id());
1037 mFetcherInfos.removeItemsAt(index);
1038
1039 CHECK(msg->findObject("playlist", (sp<RefBase> *)&mPlaylist));
1040 if (mPlaylist == NULL) {
1041 ALOGE("unable to fetch master playlist %s.",
1042 uriDebugString(mMasterURL).c_str());
1043
1044 postPrepared(ERROR_IO);
1045 return;
1046 }
1047 // We trust the content provider to make a reasonable choice of preferred
1048 // initial bandwidth by listing it first in the variant playlist.
1049 // At startup we really don't have a good estimate on the available
1050 // network bandwidth since we haven't tranferred any data yet. Once
1051 // we have we can make a better informed choice.
1052 size_t initialBandwidth = 0;
1053 size_t initialBandwidthIndex = 0;
1054
1055 int32_t maxWidth = 0;
1056 int32_t maxHeight = 0;
1057
1058 if (mPlaylist->isVariantPlaylist()) {
1059 Vector<BandwidthItem> itemsWithVideo;
1060 for (size_t i = 0; i < mPlaylist->size(); ++i) {
1061 BandwidthItem item;
1062
1063 item.mPlaylistIndex = i;
1064 item.mLastFailureUs = -1LL;
1065
1066 sp<AMessage> meta;
1067 AString uri;
1068 mPlaylist->itemAt(i, &uri, &meta);
1069
1070 CHECK(meta->findInt32("bandwidth", (int32_t *)&item.mBandwidth));
1071
1072 int32_t width, height;
1073 if (meta->findInt32("width", &width)) {
1074 maxWidth = max(maxWidth, width);
1075 }
1076 if (meta->findInt32("height", &height)) {
1077 maxHeight = max(maxHeight, height);
1078 }
1079
1080 mBandwidthItems.push(item);
1081 if (mPlaylist->hasType(i, "video")) {
1082 itemsWithVideo.push(item);
1083 }
1084 }
1085 // remove the audio-only variants if we have at least one with video
1086 if (!itemsWithVideo.empty()
1087 && itemsWithVideo.size() < mBandwidthItems.size()) {
1088 mBandwidthItems.clear();
1089 for (size_t i = 0; i < itemsWithVideo.size(); ++i) {
1090 mBandwidthItems.push(itemsWithVideo[i]);
1091 }
1092 }
1093
1094 CHECK_GT(mBandwidthItems.size(), 0u);
1095 initialBandwidth = mBandwidthItems[0].mBandwidth;
1096
1097 mBandwidthItems.sort(SortByBandwidth);
1098
1099 for (size_t i = 0; i < mBandwidthItems.size(); ++i) {
1100 if (mBandwidthItems.itemAt(i).mBandwidth == initialBandwidth) {
1101 initialBandwidthIndex = i;
1102 break;
1103 }
1104 }
1105 } else {
1106 // dummy item.
1107 BandwidthItem item;
1108 item.mPlaylistIndex = 0;
1109 item.mBandwidth = 0;
1110 mBandwidthItems.push(item);
1111 }
1112
1113 mMaxWidth = maxWidth > 0 ? maxWidth : mMaxWidth;
1114 mMaxHeight = maxHeight > 0 ? maxHeight : mMaxHeight;
1115
1116 mPlaylist->pickRandomMediaItems();
1117 changeConfiguration(
1118 0LL /* timeUs */, initialBandwidthIndex, false /* pickTrack */);
1119 }
1120
finishDisconnect()1121 void LiveSession::finishDisconnect() {
1122 ALOGV("finishDisconnect");
1123
1124 // No reconfiguration is currently pending, make sure none will trigger
1125 // during disconnection either.
1126 cancelBandwidthSwitch();
1127
1128 // cancel buffer polling
1129 cancelPollBuffering();
1130
1131 // TRICKY: don't wait for all fetcher to be stopped when disconnecting
1132 //
1133 // Some fetchers might be stuck in connect/getSize at this point. These
1134 // operations will eventually timeout (as we have a timeout set in
1135 // MediaHTTPConnection), but we don't want to block the main UI thread
1136 // until then. Here we just need to make sure we clear all references
1137 // to the fetchers, so that when they finally exit from the blocking
1138 // operation, they can be destructed.
1139 //
1140 // There is one very tricky point though. For this scheme to work, the
1141 // fecther must hold a reference to LiveSession, so that LiveSession is
1142 // destroyed after fetcher. Otherwise LiveSession would get stuck in its
1143 // own destructor when it waits for mFetcherLooper to stop, which still
1144 // blocks main UI thread.
1145 for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1146 mFetcherInfos.valueAt(i).mFetcher->stopAsync();
1147 mFetcherLooper->unregisterHandler(
1148 mFetcherInfos.valueAt(i).mFetcher->id());
1149 }
1150 mFetcherInfos.clear();
1151
1152 mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(ERROR_END_OF_STREAM);
1153 mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(ERROR_END_OF_STREAM);
1154
1155 mPacketSources.valueFor(
1156 STREAMTYPE_SUBTITLES)->signalEOS(ERROR_END_OF_STREAM);
1157
1158 sp<AMessage> response = new AMessage;
1159 response->setInt32("err", OK);
1160
1161 response->postReply(mDisconnectReplyID);
1162 mDisconnectReplyID.clear();
1163 }
1164
addFetcher(const char * uri)1165 sp<PlaylistFetcher> LiveSession::addFetcher(const char *uri) {
1166 ssize_t index = mFetcherInfos.indexOfKey(uri);
1167
1168 if (index >= 0) {
1169 return NULL;
1170 }
1171
1172 sp<AMessage> notify = new AMessage(kWhatFetcherNotify, this);
1173 notify->setString("uri", uri);
1174 notify->setInt32("switchGeneration", mSwitchGeneration);
1175
1176 FetcherInfo info;
1177 info.mFetcher = new PlaylistFetcher(
1178 notify, this, uri, mCurBandwidthIndex, mSubtitleGeneration);
1179 info.mDurationUs = -1LL;
1180 info.mToBeRemoved = false;
1181 info.mToBeResumed = false;
1182 mFetcherLooper->registerHandler(info.mFetcher);
1183
1184 mFetcherInfos.add(uri, info);
1185
1186 return info.mFetcher;
1187 }
1188
#if 0
// Returns rand() scaled by RAND_MAX, i.e. a value in [0, 1]. Currently
// compiled out.
static double uniformRand() {
    const double sample = (double)rand();
    return sample / RAND_MAX;
}
#endif
1194
UriIsSameAsIndex(const AString & uri,int32_t i,bool newUri)1195 bool LiveSession::UriIsSameAsIndex(const AString &uri, int32_t i, bool newUri) {
1196 ALOGV("[timed_id3] i %d UriIsSameAsIndex newUri %s, %s", i,
1197 newUri ? "true" : "false",
1198 newUri ? mStreams[i].mNewUri.c_str() : mStreams[i].mUri.c_str());
1199 return i >= 0
1200 && ((!newUri && uri == mStreams[i].mUri)
1201 || (newUri && uri == mStreams[i].mNewUri));
1202 }
1203
getPacketSourceForStreamIndex(size_t trackIndex,bool newUri)1204 sp<AnotherPacketSource> LiveSession::getPacketSourceForStreamIndex(
1205 size_t trackIndex, bool newUri) {
1206 StreamType type = indexToType(trackIndex);
1207 sp<AnotherPacketSource> source = NULL;
1208 if (newUri) {
1209 source = mPacketSources2.valueFor(type);
1210 source->clear();
1211 } else {
1212 source = mPacketSources.valueFor(type);
1213 };
1214 return source;
1215 }
1216
getMetadataSource(sp<AnotherPacketSource> sources[kNumSources],uint32_t streamMask,bool newUri)1217 sp<AnotherPacketSource> LiveSession::getMetadataSource(
1218 sp<AnotherPacketSource> sources[kNumSources], uint32_t streamMask, bool newUri) {
1219 // todo: One case where the following strategy can fail is when audio and video
1220 // are in separate playlists, both are transport streams, and the metadata
1221 // is actually contained in the audio stream.
1222 ALOGV("[timed_id3] getMetadataSourceForUri streamMask %x newUri %s",
1223 streamMask, newUri ? "true" : "false");
1224
1225 if ((sources[kVideoIndex] != NULL) // video fetcher; or ...
1226 || (!(streamMask & STREAMTYPE_VIDEO) && sources[kAudioIndex] != NULL)) {
1227 // ... audio fetcher for audio only variant
1228 return getPacketSourceForStreamIndex(kMetaDataIndex, newUri);
1229 }
1230
1231 return NULL;
1232 }
1233
resumeFetcher(const AString & uri,uint32_t streamMask,int64_t timeUs,bool newUri)1234 bool LiveSession::resumeFetcher(
1235 const AString &uri, uint32_t streamMask, int64_t timeUs, bool newUri) {
1236 ssize_t index = mFetcherInfos.indexOfKey(uri);
1237 if (index < 0) {
1238 ALOGE("did not find fetcher for uri: %s", uriDebugString(uri).c_str());
1239 return false;
1240 }
1241
1242 bool resume = false;
1243 sp<AnotherPacketSource> sources[kNumSources];
1244 for (size_t i = 0; i < kMaxStreams; ++i) {
1245 if ((streamMask & indexToType(i)) && UriIsSameAsIndex(uri, i, newUri)) {
1246 resume = true;
1247 sources[i] = getPacketSourceForStreamIndex(i, newUri);
1248 }
1249 }
1250
1251 if (resume) {
1252 sp<PlaylistFetcher> &fetcher = mFetcherInfos.editValueAt(index).mFetcher;
1253 SeekMode seekMode = newUri ? kSeekModeNextSample : kSeekModeExactPosition;
1254
1255 ALOGV("resuming fetcher-%d, timeUs=%lld, seekMode=%d",
1256 fetcher->getFetcherID(), (long long)timeUs, seekMode);
1257
1258 fetcher->startAsync(
1259 sources[kAudioIndex],
1260 sources[kVideoIndex],
1261 sources[kSubtitleIndex],
1262 getMetadataSource(sources, streamMask, newUri),
1263 timeUs, -1, -1, seekMode);
1264 }
1265
1266 return resume;
1267 }
1268
getAbortThreshold(ssize_t currentBWIndex,ssize_t targetBWIndex) const1269 float LiveSession::getAbortThreshold(
1270 ssize_t currentBWIndex, ssize_t targetBWIndex) const {
1271 float abortThreshold = -1.0f;
1272 if (currentBWIndex > 0 && targetBWIndex < currentBWIndex) {
1273 /*
1274 If we're switching down, we need to decide whether to
1275
1276 1) finish last segment of high-bandwidth variant, or
1277 2) abort last segment of high-bandwidth variant, and fetch an
1278 overlapping portion from low-bandwidth variant.
1279
1280 Here we try to maximize the amount of buffer left when the
1281 switch point is met. Given the following parameters:
1282
1283 B: our current buffering level in seconds
1284 T: target duration in seconds
1285 X: sample duration in seconds remain to fetch in last segment
1286 bw0: bandwidth of old variant (as specified in playlist)
1287 bw1: bandwidth of new variant (as specified in playlist)
1288 bw: measured bandwidth available
1289
1290 If we choose 1), when switch happens at the end of current
1291 segment, our buffering will be
1292 B + X - X * bw0 / bw
1293
1294 If we choose 2), when switch happens where we aborted current
1295 segment, our buffering will be
1296 B - (T - X) * bw1 / bw
1297
1298 We should only choose 1) if
1299 X/T < bw1 / (bw1 + bw0 - bw)
1300 */
1301
1302 // abort old bandwidth immediately if bandwidth is fluctuating a lot.
1303 // our estimate could be far off, and fetching old bandwidth could
1304 // take too long.
1305 if (!mLastBandwidthStable) {
1306 return 0.0f;
1307 }
1308
1309 // Taking the measured current bandwidth at 50% face value only,
1310 // as our bandwidth estimation is a lagging indicator. Being
1311 // conservative on this, we prefer switching to lower bandwidth
1312 // unless we're really confident finishing up the last segment
1313 // of higher bandwidth will be fast.
1314 CHECK(mLastBandwidthBps >= 0);
1315 abortThreshold =
1316 (float)mBandwidthItems.itemAt(targetBWIndex).mBandwidth
1317 / ((float)mBandwidthItems.itemAt(targetBWIndex).mBandwidth
1318 + (float)mBandwidthItems.itemAt(currentBWIndex).mBandwidth
1319 - (float)mLastBandwidthBps * 0.5f);
1320 if (abortThreshold < 0.0f) {
1321 abortThreshold = -1.0f; // do not abort
1322 }
1323 ALOGV("Switching Down: bps %ld => %ld, measured %d, abort ratio %.2f",
1324 mBandwidthItems.itemAt(currentBWIndex).mBandwidth,
1325 mBandwidthItems.itemAt(targetBWIndex).mBandwidth,
1326 mLastBandwidthBps,
1327 abortThreshold);
1328 }
1329 return abortThreshold;
1330 }
1331
addBandwidthMeasurement(size_t numBytes,int64_t delayUs)1332 void LiveSession::addBandwidthMeasurement(size_t numBytes, int64_t delayUs) {
1333 mBandwidthEstimator->addBandwidthMeasurement(numBytes, delayUs);
1334 }
1335
getLowestValidBandwidthIndex() const1336 ssize_t LiveSession::getLowestValidBandwidthIndex() const {
1337 for (size_t index = 0; index < mBandwidthItems.size(); index++) {
1338 if (isBandwidthValid(mBandwidthItems[index])) {
1339 return index;
1340 }
1341 }
1342 // if playlists are all blacklisted, return 0 and hope it's alive
1343 return 0;
1344 }
1345
getBandwidthIndex(int32_t bandwidthBps)1346 size_t LiveSession::getBandwidthIndex(int32_t bandwidthBps) {
1347 if (mBandwidthItems.size() < 2) {
1348 // shouldn't be here if we only have 1 bandwidth, check
1349 // logic to get rid of redundant bandwidth polling
1350 ALOGW("getBandwidthIndex() called for single bandwidth playlist!");
1351 return 0;
1352 }
1353
1354 #if 1
1355 char value[PROPERTY_VALUE_MAX];
1356 ssize_t index = -1;
1357 if (property_get("media.httplive.bw-index", value, NULL)) {
1358 char *end;
1359 index = strtol(value, &end, 10);
1360 CHECK(end > value && *end == '\0');
1361
1362 if (index >= 0 && (size_t)index >= mBandwidthItems.size()) {
1363 index = mBandwidthItems.size() - 1;
1364 }
1365 }
1366
1367 if (index < 0) {
1368 char value[PROPERTY_VALUE_MAX];
1369 if (property_get("media.httplive.max-bw", value, NULL)) {
1370 char *end;
1371 long maxBw = strtoul(value, &end, 10);
1372 if (end > value && *end == '\0') {
1373 if (maxBw > 0 && bandwidthBps > maxBw) {
1374 ALOGV("bandwidth capped to %ld bps", maxBw);
1375 bandwidthBps = maxBw;
1376 }
1377 }
1378 }
1379
1380 // Pick the highest bandwidth stream that's not currently blacklisted
1381 // below or equal to estimated bandwidth.
1382
1383 index = mBandwidthItems.size() - 1;
1384 ssize_t lowestBandwidth = getLowestValidBandwidthIndex();
1385 while (index > lowestBandwidth) {
1386 // be conservative (70%) to avoid overestimating and immediately
1387 // switching down again.
1388 size_t adjustedBandwidthBps = bandwidthBps * .7f;
1389 const BandwidthItem &item = mBandwidthItems[index];
1390 if (item.mBandwidth <= adjustedBandwidthBps
1391 && isBandwidthValid(item)) {
1392 break;
1393 }
1394 --index;
1395 }
1396 }
1397 #elif 0
1398 // Change bandwidth at random()
1399 size_t index = uniformRand() * mBandwidthItems.size();
1400 #elif 0
1401 // There's a 50% chance to stay on the current bandwidth and
1402 // a 50% chance to switch to the next higher bandwidth (wrapping around
1403 // to lowest)
1404 const size_t kMinIndex = 0;
1405
1406 static ssize_t mCurBandwidthIndex = -1;
1407
1408 size_t index;
1409 if (mCurBandwidthIndex < 0) {
1410 index = kMinIndex;
1411 } else if (uniformRand() < 0.5) {
1412 index = (size_t)mCurBandwidthIndex;
1413 } else {
1414 index = mCurBandwidthIndex + 1;
1415 if (index == mBandwidthItems.size()) {
1416 index = kMinIndex;
1417 }
1418 }
1419 mCurBandwidthIndex = index;
1420 #elif 0
1421 // Pick the highest bandwidth stream below or equal to 1.2 Mbit/sec
1422
1423 size_t index = mBandwidthItems.size() - 1;
1424 while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth > 1200000) {
1425 --index;
1426 }
1427 #elif 1
1428 char value[PROPERTY_VALUE_MAX];
1429 size_t index;
1430 if (property_get("media.httplive.bw-index", value, NULL)) {
1431 char *end;
1432 index = strtoul(value, &end, 10);
1433 CHECK(end > value && *end == '\0');
1434
1435 if (index >= mBandwidthItems.size()) {
1436 index = mBandwidthItems.size() - 1;
1437 }
1438 } else {
1439 index = 0;
1440 }
1441 #else
1442 size_t index = mBandwidthItems.size() - 1; // Highest bandwidth stream
1443 #endif
1444
1445 CHECK_GE(index, 0);
1446
1447 return index;
1448 }
1449
latestMediaSegmentStartTime() const1450 HLSTime LiveSession::latestMediaSegmentStartTime() const {
1451 HLSTime audioTime(mPacketSources.valueFor(
1452 STREAMTYPE_AUDIO)->getLatestDequeuedMeta());
1453
1454 HLSTime videoTime(mPacketSources.valueFor(
1455 STREAMTYPE_VIDEO)->getLatestDequeuedMeta());
1456
1457 return audioTime < videoTime ? videoTime : audioTime;
1458 }
1459
onSeek(const sp<AMessage> & msg)1460 void LiveSession::onSeek(const sp<AMessage> &msg) {
1461 int64_t timeUs;
1462 int32_t mode;
1463 CHECK(msg->findInt64("timeUs", &timeUs));
1464 CHECK(msg->findInt32("mode", &mode));
1465 // TODO: add "mode" to changeConfiguration.
1466 changeConfiguration(timeUs/* , (MediaPlayerSeekMode)mode */);
1467 }
1468
getDuration(int64_t * durationUs) const1469 status_t LiveSession::getDuration(int64_t *durationUs) const {
1470 int64_t maxDurationUs = -1LL;
1471 for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1472 int64_t fetcherDurationUs = mFetcherInfos.valueAt(i).mDurationUs;
1473
1474 if (fetcherDurationUs > maxDurationUs) {
1475 maxDurationUs = fetcherDurationUs;
1476 }
1477 }
1478
1479 *durationUs = maxDurationUs;
1480
1481 return OK;
1482 }
1483
isSeekable() const1484 bool LiveSession::isSeekable() const {
1485 int64_t durationUs;
1486 return getDuration(&durationUs) == OK && durationUs >= 0;
1487 }
1488
hasDynamicDuration() const1489 bool LiveSession::hasDynamicDuration() const {
1490 return false;
1491 }
1492
getTrackCount() const1493 size_t LiveSession::getTrackCount() const {
1494 if (mPlaylist == NULL) {
1495 return 0;
1496 } else {
1497 return mPlaylist->getTrackCount() + (mHasMetadata ? 1 : 0);
1498 }
1499 }
1500
getTrackInfo(size_t trackIndex) const1501 sp<AMessage> LiveSession::getTrackInfo(size_t trackIndex) const {
1502 if (mPlaylist == NULL) {
1503 return NULL;
1504 } else {
1505 if (trackIndex == mPlaylist->getTrackCount() && mHasMetadata) {
1506 sp<AMessage> format = new AMessage();
1507 format->setInt32("type", MEDIA_TRACK_TYPE_METADATA);
1508 format->setString("language", "und");
1509 format->setString("mime", MEDIA_MIMETYPE_DATA_TIMED_ID3);
1510 return format;
1511 }
1512 return mPlaylist->getTrackInfo(trackIndex);
1513 }
1514 }
1515
selectTrack(size_t index,bool select)1516 status_t LiveSession::selectTrack(size_t index, bool select) {
1517 if (mPlaylist == NULL) {
1518 return INVALID_OPERATION;
1519 }
1520
1521 ALOGV("selectTrack: index=%zu, select=%d, mSubtitleGen=%d++",
1522 index, select, mSubtitleGeneration);
1523
1524 ++mSubtitleGeneration;
1525 status_t err = mPlaylist->selectTrack(index, select);
1526 if (err == OK) {
1527 sp<AMessage> msg = new AMessage(kWhatChangeConfiguration, this);
1528 msg->setInt32("pickTrack", select);
1529 msg->post();
1530 }
1531 return err;
1532 }
1533
getSelectedTrack(media_track_type type) const1534 ssize_t LiveSession::getSelectedTrack(media_track_type type) const {
1535 if (mPlaylist == NULL) {
1536 return -1;
1537 } else {
1538 return mPlaylist->getSelectedTrack(type);
1539 }
1540 }
1541
changeConfiguration(int64_t timeUs,ssize_t bandwidthIndex,bool pickTrack)1542 void LiveSession::changeConfiguration(
1543 int64_t timeUs, ssize_t bandwidthIndex, bool pickTrack) {
1544 ALOGV("changeConfiguration: timeUs=%lld us, bwIndex=%zd, pickTrack=%d",
1545 (long long)timeUs, bandwidthIndex, pickTrack);
1546
1547 cancelBandwidthSwitch();
1548
1549 CHECK(!mReconfigurationInProgress);
1550 mReconfigurationInProgress = true;
1551 if (bandwidthIndex >= 0) {
1552 mOrigBandwidthIndex = mCurBandwidthIndex;
1553 mCurBandwidthIndex = bandwidthIndex;
1554 if (mOrigBandwidthIndex != mCurBandwidthIndex) {
1555 ALOGI("#### Starting Bandwidth Switch: %zd => %zd",
1556 mOrigBandwidthIndex, mCurBandwidthIndex);
1557 }
1558 }
1559 CHECK_LT((size_t)mCurBandwidthIndex, mBandwidthItems.size());
1560 const BandwidthItem &item = mBandwidthItems.itemAt(mCurBandwidthIndex);
1561
1562 uint32_t streamMask = 0; // streams that should be fetched by the new fetcher
1563 uint32_t resumeMask = 0; // streams that should be fetched by the original fetcher
1564
1565 AString URIs[kMaxStreams];
1566 for (size_t i = 0; i < kMaxStreams; ++i) {
1567 if (mPlaylist->getTypeURI(item.mPlaylistIndex, mStreams[i].mType, &URIs[i])) {
1568 streamMask |= indexToType(i);
1569 }
1570 }
1571
1572 // Step 1, stop and discard fetchers that are no longer needed.
1573 // Pause those that we'll reuse.
1574 for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1575 // skip fetchers that are marked mToBeRemoved,
1576 // these are done and can't be reused
1577 if (mFetcherInfos[i].mToBeRemoved) {
1578 continue;
1579 }
1580
1581 const AString &uri = mFetcherInfos.keyAt(i);
1582 sp<PlaylistFetcher> &fetcher = mFetcherInfos.editValueAt(i).mFetcher;
1583
1584 bool discardFetcher = true, delayRemoval = false;
1585 for (size_t j = 0; j < kMaxStreams; ++j) {
1586 StreamType type = indexToType(j);
1587 if ((streamMask & type) && uri == URIs[j]) {
1588 resumeMask |= type;
1589 streamMask &= ~type;
1590 discardFetcher = false;
1591 }
1592 }
1593 // Delay fetcher removal if not picking tracks, AND old fetcher
1594 // has stream mask that overlaps new variant. (Okay to discard
1595 // old fetcher now, if completely no overlap.)
1596 if (discardFetcher && timeUs < 0LL && !pickTrack
1597 && (fetcher->getStreamTypeMask() & streamMask)) {
1598 discardFetcher = false;
1599 delayRemoval = true;
1600 }
1601
1602 if (discardFetcher) {
1603 ALOGV("discarding fetcher-%d", fetcher->getFetcherID());
1604 fetcher->stopAsync();
1605 } else {
1606 float threshold = 0.0f; // default to pause after current block (47Kbytes)
1607 bool disconnect = false;
1608 if (timeUs >= 0LL) {
1609 // seeking, no need to finish fetching
1610 disconnect = true;
1611 } else if (delayRemoval) {
1612 // adapting, abort if remaining of current segment is over threshold
1613 threshold = getAbortThreshold(
1614 mOrigBandwidthIndex, mCurBandwidthIndex);
1615 }
1616
1617 ALOGV("pausing fetcher-%d, threshold=%.2f",
1618 fetcher->getFetcherID(), threshold);
1619 fetcher->pauseAsync(threshold, disconnect);
1620 }
1621 }
1622
1623 sp<AMessage> msg;
1624 if (timeUs < 0LL) {
1625 // skip onChangeConfiguration2 (decoder destruction) if not seeking.
1626 msg = new AMessage(kWhatChangeConfiguration3, this);
1627 } else {
1628 msg = new AMessage(kWhatChangeConfiguration2, this);
1629 }
1630 msg->setInt32("streamMask", streamMask);
1631 msg->setInt32("resumeMask", resumeMask);
1632 msg->setInt32("pickTrack", pickTrack);
1633 msg->setInt64("timeUs", timeUs);
1634 for (size_t i = 0; i < kMaxStreams; ++i) {
1635 if ((streamMask | resumeMask) & indexToType(i)) {
1636 msg->setString(mStreams[i].uriKey().c_str(), URIs[i].c_str());
1637 }
1638 }
1639
1640 // Every time a fetcher acknowledges the stopAsync or pauseAsync request
1641 // we'll decrement mContinuationCounter, once it reaches zero, i.e. all
1642 // fetchers have completed their asynchronous operation, we'll post
1643 // mContinuation, which then is handled below in onChangeConfiguration2.
1644 mContinuationCounter = mFetcherInfos.size();
1645 mContinuation = msg;
1646
1647 if (mContinuationCounter == 0) {
1648 msg->post();
1649 }
1650 }
1651
onChangeConfiguration(const sp<AMessage> & msg)1652 void LiveSession::onChangeConfiguration(const sp<AMessage> &msg) {
1653 ALOGV("onChangeConfiguration");
1654
1655 if (!mReconfigurationInProgress) {
1656 int32_t pickTrack = 0;
1657 msg->findInt32("pickTrack", &pickTrack);
1658 changeConfiguration(-1LL /* timeUs */, -1, pickTrack);
1659 } else {
1660 msg->post(1000000LL); // retry in 1 sec
1661 }
1662 }
1663
onChangeConfiguration2(const sp<AMessage> & msg)1664 void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) {
1665 ALOGV("onChangeConfiguration2");
1666
1667 mContinuation.clear();
1668
1669 // All fetchers are either suspended or have been removed now.
1670
1671 // If we're seeking, clear all packet sources before we report
1672 // seek complete, to prevent decoder from pulling stale data.
1673 int64_t timeUs;
1674 CHECK(msg->findInt64("timeUs", &timeUs));
1675
1676 if (timeUs >= 0) {
1677 mLastSeekTimeUs = timeUs;
1678 mLastDequeuedTimeUs = timeUs;
1679
1680 for (size_t i = 0; i < mPacketSources.size(); i++) {
1681 sp<AnotherPacketSource> packetSource = mPacketSources.editValueAt(i);
1682 sp<MetaData> format = packetSource->getFormat();
1683 packetSource->clear();
1684 // Set a tentative format here such that HTTPLiveSource will always have
1685 // a format available when NuPlayer queries. Without an available video
1686 // format when setting a surface NuPlayer might disable video decoding
1687 // altogether. The tentative format will be overwritten by the
1688 // authoritative (and possibly same) format once content from the new
1689 // position is dequeued.
1690 packetSource->setFormat(format);
1691 }
1692
1693 for (size_t i = 0; i < kMaxStreams; ++i) {
1694 mStreams[i].reset();
1695 }
1696
1697 mDiscontinuityOffsetTimesUs.clear();
1698 mDiscontinuityAbsStartTimesUs.clear();
1699
1700 if (mSeekReplyID != NULL) {
1701 CHECK(mSeekReply != NULL);
1702 mSeekReply->setInt32("err", OK);
1703 mSeekReply->postReply(mSeekReplyID);
1704 mSeekReplyID.clear();
1705 mSeekReply.clear();
1706 }
1707
1708 // restart buffer polling after seek becauese previous
1709 // buffering position is no longer valid.
1710 restartPollBuffering();
1711 }
1712
1713 uint32_t streamMask, resumeMask;
1714 CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
1715 CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask));
1716
1717 streamMask |= resumeMask;
1718
1719 AString URIs[kMaxStreams];
1720 for (size_t i = 0; i < kMaxStreams; ++i) {
1721 if (streamMask & indexToType(i)) {
1722 const AString &uriKey = mStreams[i].uriKey();
1723 CHECK(msg->findString(uriKey.c_str(), &URIs[i]));
1724 ALOGV("%s = '%s'", uriKey.c_str(), URIs[i].c_str());
1725 }
1726 }
1727
1728 uint32_t changedMask = 0;
1729 for (size_t i = 0; i < kMaxStreams && i != kSubtitleIndex; ++i) {
1730 // stream URI could change even if onChangeConfiguration2 is only
1731 // used for seek. Seek could happen during a bw switch, in this
1732 // case bw switch will be cancelled, but the seekTo position will
1733 // fetch from the new URI.
1734 if ((mStreamMask & streamMask & indexToType(i))
1735 && !mStreams[i].mUri.empty()
1736 && !(URIs[i] == mStreams[i].mUri)) {
1737 ALOGV("stream %zu changed: oldURI %s, newURI %s", i,
1738 mStreams[i].mUri.c_str(), URIs[i].c_str());
1739 sp<AnotherPacketSource> source = mPacketSources.valueFor(indexToType(i));
1740 if (source->getLatestDequeuedMeta() != NULL) {
1741 source->queueDiscontinuity(
1742 ATSParser::DISCONTINUITY_FORMATCHANGE, NULL, true);
1743 }
1744 }
1745 // Determine which decoders to shutdown on the player side,
1746 // a decoder has to be shutdown if its streamtype was active
1747 // before but now longer isn't.
1748 if ((mStreamMask & ~streamMask & indexToType(i))) {
1749 changedMask |= indexToType(i);
1750 }
1751 }
1752
1753 if (changedMask == 0) {
1754 // If nothing changed as far as the audio/video decoders
1755 // are concerned we can proceed.
1756 onChangeConfiguration3(msg);
1757 return;
1758 }
1759
1760 // Something changed, inform the player which will shutdown the
1761 // corresponding decoders and will post the reply once that's done.
1762 // Handling the reply will continue executing below in
1763 // onChangeConfiguration3.
1764 sp<AMessage> notify = mNotify->dup();
1765 notify->setInt32("what", kWhatStreamsChanged);
1766 notify->setInt32("changedMask", changedMask);
1767
1768 msg->setWhat(kWhatChangeConfiguration3);
1769 msg->setTarget(this);
1770
1771 notify->setMessage("reply", msg);
1772 notify->post();
1773 }
1774
onChangeConfiguration3(const sp<AMessage> & msg)1775 void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
1776 mContinuation.clear();
1777 // All remaining fetchers are still suspended, the player has shutdown
1778 // any decoders that needed it.
1779
1780 uint32_t streamMask, resumeMask;
1781 CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
1782 CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask));
1783
1784 mNewStreamMask = streamMask | resumeMask;
1785
1786 int64_t timeUs;
1787 int32_t pickTrack;
1788 bool switching = false;
1789 CHECK(msg->findInt64("timeUs", &timeUs));
1790 CHECK(msg->findInt32("pickTrack", &pickTrack));
1791
1792 if (timeUs < 0LL) {
1793 if (!pickTrack) {
1794 // mSwapMask contains streams that are in both old and new variant,
1795 // (in mNewStreamMask & mStreamMask) but with different URIs
1796 // (not in resumeMask).
1797 // For example, old variant has video and audio in two separate
1798 // URIs, and new variant has only audio with unchanged URI. mSwapMask
1799 // should be 0 as there is nothing to swap. We only need to stop video,
1800 // and resume audio.
1801 mSwapMask = mNewStreamMask & mStreamMask & ~resumeMask;
1802 switching = (mSwapMask != 0);
1803 }
1804 mRealTimeBaseUs = ALooper::GetNowUs() - mLastDequeuedTimeUs;
1805 } else {
1806 mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
1807 }
1808
1809 ALOGV("onChangeConfiguration3: timeUs=%lld, switching=%d, pickTrack=%d, "
1810 "mStreamMask=0x%x, mNewStreamMask=0x%x, mSwapMask=0x%x",
1811 (long long)timeUs, switching, pickTrack,
1812 mStreamMask, mNewStreamMask, mSwapMask);
1813
1814 for (size_t i = 0; i < kMaxStreams; ++i) {
1815 if (streamMask & indexToType(i)) {
1816 if (switching) {
1817 CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mNewUri));
1818 } else {
1819 CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mUri));
1820 }
1821 }
1822 }
1823
1824 // Of all existing fetchers:
1825 // * Resume fetchers that are still needed and assign them original packet sources.
1826 // * Mark otherwise unneeded fetchers for removal.
1827 ALOGV("resuming fetchers for mask 0x%08x", resumeMask);
1828 for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1829 const AString &uri = mFetcherInfos.keyAt(i);
1830 if (!resumeFetcher(uri, resumeMask, timeUs)) {
1831 ALOGV("marking fetcher-%d to be removed",
1832 mFetcherInfos[i].mFetcher->getFetcherID());
1833
1834 mFetcherInfos.editValueAt(i).mToBeRemoved = true;
1835 }
1836 }
1837
1838 // streamMask now only contains the types that need a new fetcher created.
1839 if (streamMask != 0) {
1840 ALOGV("creating new fetchers for mask 0x%08x", streamMask);
1841 }
1842
1843 // Find out when the original fetchers have buffered up to and start the new fetchers
1844 // at a later timestamp.
1845 for (size_t i = 0; i < kMaxStreams; i++) {
1846 if (!(indexToType(i) & streamMask)) {
1847 continue;
1848 }
1849
1850 AString uri;
1851 uri = switching ? mStreams[i].mNewUri : mStreams[i].mUri;
1852
1853 sp<PlaylistFetcher> fetcher = addFetcher(uri.c_str());
1854 CHECK(fetcher != NULL);
1855
1856 HLSTime startTime;
1857 SeekMode seekMode = kSeekModeExactPosition;
1858 sp<AnotherPacketSource> sources[kNumSources];
1859
1860 if (i == kSubtitleIndex || (!pickTrack && !switching)) {
1861 startTime = latestMediaSegmentStartTime();
1862 }
1863
1864 // TRICKY: looping from i as earlier streams are already removed from streamMask
1865 for (size_t j = i; j < kMaxStreams; ++j) {
1866 const AString &streamUri = switching ? mStreams[j].mNewUri : mStreams[j].mUri;
1867 if ((streamMask & indexToType(j)) && uri == streamUri) {
1868 sources[j] = mPacketSources.valueFor(indexToType(j));
1869
1870 if (timeUs >= 0) {
1871 startTime.mTimeUs = timeUs;
1872 } else {
1873 int32_t type;
1874 sp<AMessage> meta;
1875 if (!switching) {
1876 // selecting, or adapting but no swap required
1877 meta = sources[j]->getLatestDequeuedMeta();
1878 } else {
1879 // adapting and swap required
1880 meta = sources[j]->getLatestEnqueuedMeta();
1881 if (meta != NULL && mCurBandwidthIndex > mOrigBandwidthIndex) {
1882 // switching up
1883 meta = sources[j]->getMetaAfterLastDequeued(mUpSwitchMargin);
1884 }
1885 }
1886
1887 if ((j == kAudioIndex || j == kVideoIndex)
1888 && meta != NULL && !meta->findInt32("discontinuity", &type)) {
1889 HLSTime tmpTime(meta);
1890 if (startTime < tmpTime) {
1891 startTime = tmpTime;
1892 }
1893 }
1894
1895 if (!switching) {
1896 // selecting, or adapting but no swap required
1897 sources[j]->clear();
1898 if (j == kSubtitleIndex) {
1899 break;
1900 }
1901
1902 ALOGV("stream[%zu]: queue format change", j);
1903 sources[j]->queueDiscontinuity(
1904 ATSParser::DISCONTINUITY_FORMAT_ONLY, NULL, true);
1905 } else {
1906 // switching, queue discontinuities after resume
1907 sources[j] = mPacketSources2.valueFor(indexToType(j));
1908 sources[j]->clear();
1909 // the new fetcher might be providing streams that used to be
1910 // provided by two different fetchers, if one of the fetcher
1911 // paused in the middle while the other somehow paused in next
1912 // seg, we have to start from next seg.
1913 if (seekMode < mStreams[j].mSeekMode) {
1914 seekMode = mStreams[j].mSeekMode;
1915 }
1916 }
1917 }
1918
1919 streamMask &= ~indexToType(j);
1920 }
1921 }
1922
1923 ALOGV("[fetcher-%d] startAsync: startTimeUs %lld mLastSeekTimeUs %lld "
1924 "segmentStartTimeUs %lld seekMode %d",
1925 fetcher->getFetcherID(),
1926 (long long)startTime.mTimeUs,
1927 (long long)mLastSeekTimeUs,
1928 (long long)startTime.getSegmentTimeUs(),
1929 seekMode);
1930
1931 // Set the target segment start time to the middle point of the
1932 // segment where the last sample was.
1933 // This gives a better guess if segments of the two variants are not
1934 // perfectly aligned. (If the corresponding segment in new variant
1935 // starts slightly later than that in the old variant, we still want
1936 // to pick that segment, not the one before)
1937 fetcher->startAsync(
1938 sources[kAudioIndex],
1939 sources[kVideoIndex],
1940 sources[kSubtitleIndex],
1941 getMetadataSource(sources, mNewStreamMask, switching),
1942 startTime.mTimeUs < 0 ? mLastSeekTimeUs : startTime.mTimeUs,
1943 startTime.getSegmentTimeUs(),
1944 startTime.mSeq,
1945 seekMode);
1946 }
1947
1948 // All fetchers have now been started, the configuration change
1949 // has completed.
1950
1951 mReconfigurationInProgress = false;
1952 if (switching) {
1953 mSwitchInProgress = true;
1954 } else {
1955 mStreamMask = mNewStreamMask;
1956 if (mOrigBandwidthIndex != mCurBandwidthIndex) {
1957 ALOGV("#### Finished Bandwidth Switch Early: %zd => %zd",
1958 mOrigBandwidthIndex, mCurBandwidthIndex);
1959 mOrigBandwidthIndex = mCurBandwidthIndex;
1960 }
1961 }
1962
1963 ALOGV("onChangeConfiguration3: mSwitchInProgress %d, mStreamMask 0x%x",
1964 mSwitchInProgress, mStreamMask);
1965
1966 if (mDisconnectReplyID != NULL) {
1967 finishDisconnect();
1968 }
1969 }
1970
swapPacketSource(StreamType stream)1971 void LiveSession::swapPacketSource(StreamType stream) {
1972 ALOGV("[%s] swapPacketSource", getNameForStream(stream));
1973
1974 // transfer packets from source2 to source
1975 sp<AnotherPacketSource> &aps = mPacketSources.editValueFor(stream);
1976 sp<AnotherPacketSource> &aps2 = mPacketSources2.editValueFor(stream);
1977
1978 // queue discontinuity in mPacketSource
1979 aps->queueDiscontinuity(ATSParser::DISCONTINUITY_FORMAT_ONLY, NULL, false);
1980
1981 // queue packets in mPacketSource2 to mPacketSource
1982 status_t finalResult = OK;
1983 sp<ABuffer> accessUnit;
1984 while (aps2->hasBufferAvailable(&finalResult) && finalResult == OK &&
1985 OK == aps2->dequeueAccessUnit(&accessUnit)) {
1986 aps->queueAccessUnit(accessUnit);
1987 }
1988 aps2->clear();
1989 }
1990
tryToFinishBandwidthSwitch(const AString & oldUri)1991 void LiveSession::tryToFinishBandwidthSwitch(const AString &oldUri) {
1992 if (!mSwitchInProgress) {
1993 return;
1994 }
1995
1996 ssize_t index = mFetcherInfos.indexOfKey(oldUri);
1997 if (index < 0 || !mFetcherInfos[index].mToBeRemoved) {
1998 return;
1999 }
2000
2001 // Swap packet source of streams provided by old variant
2002 for (size_t idx = 0; idx < kMaxStreams; idx++) {
2003 StreamType stream = indexToType(idx);
2004 if ((mSwapMask & stream) && (oldUri == mStreams[idx].mUri)) {
2005 swapPacketSource(stream);
2006
2007 if ((mNewStreamMask & stream) && mStreams[idx].mNewUri.empty()) {
2008 ALOGW("swapping stream type %d %s to empty stream",
2009 stream, uriDebugString(mStreams[idx].mUri).c_str());
2010 }
2011 mStreams[idx].mUri = mStreams[idx].mNewUri;
2012 mStreams[idx].mNewUri.clear();
2013
2014 mSwapMask &= ~stream;
2015 }
2016 }
2017
2018 mFetcherInfos.editValueAt(index).mFetcher->stopAsync(false /* clear */);
2019
2020 ALOGV("tryToFinishBandwidthSwitch: mSwapMask=0x%x", mSwapMask);
2021 if (mSwapMask != 0) {
2022 return;
2023 }
2024
2025 // Check if new variant contains extra streams.
2026 uint32_t extraStreams = mNewStreamMask & (~mStreamMask);
2027 while (extraStreams) {
2028 StreamType stream = (StreamType) (extraStreams & ~(extraStreams - 1));
2029 extraStreams &= ~stream;
2030
2031 swapPacketSource(stream);
2032
2033 ssize_t idx = typeToIndex(stream);
2034 CHECK(idx >= 0);
2035 if (mStreams[idx].mNewUri.empty()) {
2036 ALOGW("swapping extra stream type %d %s to empty stream",
2037 stream, uriDebugString(mStreams[idx].mUri).c_str());
2038 }
2039 mStreams[idx].mUri = mStreams[idx].mNewUri;
2040 mStreams[idx].mNewUri.clear();
2041 }
2042
2043 // Restart new fetcher (it was paused after the first 47k block)
2044 // and let it fetch into mPacketSources (not mPacketSources2)
2045 for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
2046 FetcherInfo &info = mFetcherInfos.editValueAt(i);
2047 if (info.mToBeResumed) {
2048 resumeFetcher(mFetcherInfos.keyAt(i), mNewStreamMask);
2049 info.mToBeResumed = false;
2050 }
2051 }
2052
2053 ALOGI("#### Finished Bandwidth Switch: %zd => %zd",
2054 mOrigBandwidthIndex, mCurBandwidthIndex);
2055
2056 mStreamMask = mNewStreamMask;
2057 mSwitchInProgress = false;
2058 mOrigBandwidthIndex = mCurBandwidthIndex;
2059
2060 restartPollBuffering();
2061 }
2062
schedulePollBuffering()2063 void LiveSession::schedulePollBuffering() {
2064 sp<AMessage> msg = new AMessage(kWhatPollBuffering, this);
2065 msg->setInt32("generation", mPollBufferingGeneration);
2066 msg->post(1000000LL);
2067 }
2068
cancelPollBuffering()2069 void LiveSession::cancelPollBuffering() {
2070 ++mPollBufferingGeneration;
2071 mPrevBufferPercentage = -1;
2072 }
2073
restartPollBuffering()2074 void LiveSession::restartPollBuffering() {
2075 cancelPollBuffering();
2076 onPollBuffering();
2077 }
2078
onPollBuffering()2079 void LiveSession::onPollBuffering() {
2080 ALOGV("onPollBuffering: mSwitchInProgress %d, mReconfigurationInProgress %d, "
2081 "mInPreparationPhase %d, mCurBandwidthIndex %zd, mStreamMask 0x%x",
2082 mSwitchInProgress, mReconfigurationInProgress,
2083 mInPreparationPhase, mCurBandwidthIndex, mStreamMask);
2084
2085 bool underflow, ready, down, up;
2086 if (checkBuffering(underflow, ready, down, up)) {
2087 if (mInPreparationPhase) {
2088 // Allow down switch even if we're still preparing.
2089 //
2090 // Some streams have a high bandwidth index as default,
2091 // when bandwidth is low, it takes a long time to buffer
2092 // to ready mark, then it immediately pauses after start
2093 // as we have to do a down switch. It's better experience
2094 // to restart from a lower index, if we detect low bw.
2095 if (!switchBandwidthIfNeeded(false /* up */, down) && ready) {
2096 postPrepared(OK);
2097 }
2098 }
2099
2100 if (!mInPreparationPhase) {
2101 if (ready) {
2102 stopBufferingIfNecessary();
2103 } else if (underflow) {
2104 startBufferingIfNecessary();
2105 }
2106 switchBandwidthIfNeeded(up, down);
2107 }
2108 }
2109
2110 schedulePollBuffering();
2111 }
2112
cancelBandwidthSwitch(bool resume)2113 void LiveSession::cancelBandwidthSwitch(bool resume) {
2114 ALOGV("cancelBandwidthSwitch: mSwitchGen(%d)++, orig %zd, cur %zd",
2115 mSwitchGeneration, mOrigBandwidthIndex, mCurBandwidthIndex);
2116 if (!mSwitchInProgress) {
2117 return;
2118 }
2119
2120 for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
2121 FetcherInfo& info = mFetcherInfos.editValueAt(i);
2122 if (info.mToBeRemoved) {
2123 info.mToBeRemoved = false;
2124 if (resume) {
2125 resumeFetcher(mFetcherInfos.keyAt(i), mSwapMask);
2126 }
2127 }
2128 }
2129
2130 for (size_t i = 0; i < kMaxStreams; ++i) {
2131 AString newUri = mStreams[i].mNewUri;
2132 if (!newUri.empty()) {
2133 // clear all mNewUri matching this newUri
2134 for (size_t j = i; j < kMaxStreams; ++j) {
2135 if (mStreams[j].mNewUri == newUri) {
2136 mStreams[j].mNewUri.clear();
2137 }
2138 }
2139 ALOGV("stopping newUri = %s", newUri.c_str());
2140 ssize_t index = mFetcherInfos.indexOfKey(newUri);
2141 if (index < 0) {
2142 ALOGE("did not find fetcher for newUri: %s", uriDebugString(newUri).c_str());
2143 continue;
2144 }
2145 FetcherInfo &info = mFetcherInfos.editValueAt(index);
2146 info.mToBeRemoved = true;
2147 info.mFetcher->stopAsync();
2148 }
2149 }
2150
2151 ALOGI("#### Canceled Bandwidth Switch: %zd => %zd",
2152 mOrigBandwidthIndex, mCurBandwidthIndex);
2153
2154 mSwitchGeneration++;
2155 mSwitchInProgress = false;
2156 mCurBandwidthIndex = mOrigBandwidthIndex;
2157 mSwapMask = 0;
2158 }
2159
checkBuffering(bool & underflow,bool & ready,bool & down,bool & up)2160 bool LiveSession::checkBuffering(
2161 bool &underflow, bool &ready, bool &down, bool &up) {
2162 underflow = ready = down = up = false;
2163
2164 if (mReconfigurationInProgress) {
2165 ALOGV("Switch/Reconfig in progress, defer buffer polling");
2166 return false;
2167 }
2168
2169 size_t activeCount, underflowCount, readyCount, downCount, upCount;
2170 activeCount = underflowCount = readyCount = downCount = upCount =0;
2171 int32_t minBufferPercent = -1;
2172 int64_t durationUs;
2173 if (getDuration(&durationUs) != OK) {
2174 durationUs = -1;
2175 }
2176 for (size_t i = 0; i < mPacketSources.size(); ++i) {
2177 // we don't check subtitles for buffering level
2178 if (!(mStreamMask & mPacketSources.keyAt(i)
2179 & (STREAMTYPE_AUDIO | STREAMTYPE_VIDEO))) {
2180 continue;
2181 }
2182 // ignore streams that never had any packet queued.
2183 // (it's possible that the variant only has audio or video)
2184 sp<AMessage> meta = mPacketSources[i]->getLatestEnqueuedMeta();
2185 if (meta == NULL) {
2186 continue;
2187 }
2188
2189 status_t finalResult;
2190 int64_t bufferedDurationUs =
2191 mPacketSources[i]->getBufferedDurationUs(&finalResult);
2192 ALOGV("[%s] buffered %lld us",
2193 getNameForStream(mPacketSources.keyAt(i)),
2194 (long long)bufferedDurationUs);
2195 if (durationUs >= 0) {
2196 int32_t percent;
2197 if (mPacketSources[i]->isFinished(0 /* duration */)) {
2198 percent = 100;
2199 } else {
2200 percent = (int32_t)(100.0 *
2201 (mLastDequeuedTimeUs + bufferedDurationUs) / durationUs);
2202 }
2203 if (minBufferPercent < 0 || percent < minBufferPercent) {
2204 minBufferPercent = percent;
2205 }
2206 }
2207
2208 ++activeCount;
2209 int64_t readyMarkUs =
2210 (mInPreparationPhase ?
2211 mBufferingSettings.mInitialMarkMs :
2212 mBufferingSettings.mResumePlaybackMarkMs) * 1000LL;
2213 if (bufferedDurationUs > readyMarkUs
2214 || mPacketSources[i]->isFinished(0)) {
2215 ++readyCount;
2216 }
2217 if (!mPacketSources[i]->isFinished(0)) {
2218 if (bufferedDurationUs < kUnderflowMarkMs * 1000LL) {
2219 ++underflowCount;
2220 }
2221 if (bufferedDurationUs > mUpSwitchMark) {
2222 ++upCount;
2223 }
2224 if (bufferedDurationUs < mDownSwitchMark) {
2225 ++downCount;
2226 }
2227 }
2228 }
2229
2230 if (minBufferPercent >= 0) {
2231 notifyBufferingUpdate(minBufferPercent);
2232 }
2233
2234 if (activeCount > 0) {
2235 up = (upCount == activeCount);
2236 down = (downCount > 0);
2237 ready = (readyCount == activeCount);
2238 underflow = (underflowCount > 0);
2239 return true;
2240 }
2241
2242 return false;
2243 }
2244
startBufferingIfNecessary()2245 void LiveSession::startBufferingIfNecessary() {
2246 ALOGV("startBufferingIfNecessary: mInPreparationPhase=%d, mBuffering=%d",
2247 mInPreparationPhase, mBuffering);
2248 if (!mBuffering) {
2249 mBuffering = true;
2250
2251 sp<AMessage> notify = mNotify->dup();
2252 notify->setInt32("what", kWhatBufferingStart);
2253 notify->post();
2254 }
2255 }
2256
stopBufferingIfNecessary()2257 void LiveSession::stopBufferingIfNecessary() {
2258 ALOGV("stopBufferingIfNecessary: mInPreparationPhase=%d, mBuffering=%d",
2259 mInPreparationPhase, mBuffering);
2260
2261 if (mBuffering) {
2262 mBuffering = false;
2263
2264 sp<AMessage> notify = mNotify->dup();
2265 notify->setInt32("what", kWhatBufferingEnd);
2266 notify->post();
2267 }
2268 }
2269
notifyBufferingUpdate(int32_t percentage)2270 void LiveSession::notifyBufferingUpdate(int32_t percentage) {
2271 if (percentage < mPrevBufferPercentage) {
2272 percentage = mPrevBufferPercentage;
2273 } else if (percentage > 100) {
2274 percentage = 100;
2275 }
2276
2277 mPrevBufferPercentage = percentage;
2278
2279 ALOGV("notifyBufferingUpdate: percentage=%d%%", percentage);
2280
2281 sp<AMessage> notify = mNotify->dup();
2282 notify->setInt32("what", kWhatBufferingUpdate);
2283 notify->setInt32("percentage", percentage);
2284 notify->post();
2285 }
2286
tryBandwidthFallback()2287 bool LiveSession::tryBandwidthFallback() {
2288 if (mInPreparationPhase || mReconfigurationInProgress) {
2289 // Don't try fallback during prepare or reconfig.
2290 // If error happens there, it's likely unrecoverable.
2291 return false;
2292 }
2293 if (mCurBandwidthIndex > mOrigBandwidthIndex) {
2294 // if we're switching up, simply cancel and resume old variant
2295 cancelBandwidthSwitch(true /* resume */);
2296 return true;
2297 } else {
2298 // if we're switching down, we're likely about to underflow (if
2299 // not already underflowing). try the lowest viable bandwidth if
2300 // not on that variant already.
2301 ssize_t lowestValid = getLowestValidBandwidthIndex();
2302 if (mCurBandwidthIndex > lowestValid) {
2303 cancelBandwidthSwitch();
2304 changeConfiguration(-1LL, lowestValid);
2305 return true;
2306 }
2307 }
2308 // return false if we couldn't find any fallback
2309 return false;
2310 }
2311
2312 /*
2313 * returns true if a bandwidth switch is actually needed (and started),
2314 * returns false otherwise
2315 */
switchBandwidthIfNeeded(bool bufferHigh,bool bufferLow)2316 bool LiveSession::switchBandwidthIfNeeded(bool bufferHigh, bool bufferLow) {
2317 // no need to check bandwidth if we only have 1 bandwidth settings
2318 if (mBandwidthItems.size() < 2) {
2319 return false;
2320 }
2321
2322 if (mSwitchInProgress) {
2323 if (mBuffering) {
2324 tryBandwidthFallback();
2325 }
2326 return false;
2327 }
2328
2329 int32_t bandwidthBps, shortTermBps;
2330 bool isStable;
2331 if (mBandwidthEstimator->estimateBandwidth(
2332 &bandwidthBps, &isStable, &shortTermBps)) {
2333 ALOGV("bandwidth estimated at %.2f kbps, "
2334 "stable %d, shortTermBps %.2f kbps",
2335 bandwidthBps / 1024.0f, isStable, shortTermBps / 1024.0f);
2336 mLastBandwidthBps = bandwidthBps;
2337 mLastBandwidthStable = isStable;
2338 } else {
2339 ALOGV("no bandwidth estimate.");
2340 return false;
2341 }
2342
2343 int32_t curBandwidth = mBandwidthItems.itemAt(mCurBandwidthIndex).mBandwidth;
2344 // canSwithDown and canSwitchUp can't both be true.
2345 // we only want to switch up when measured bw is 120% higher than current variant,
2346 // and we only want to switch down when measured bw is below current variant.
2347 bool canSwitchDown = bufferLow
2348 && (bandwidthBps < (int32_t)curBandwidth);
2349 bool canSwitchUp = bufferHigh
2350 && (bandwidthBps > (int32_t)curBandwidth * 12 / 10);
2351
2352 if (canSwitchDown || canSwitchUp) {
2353 // bandwidth estimating has some delay, if we have to downswitch when
2354 // it hasn't stabilized, use the short term to guess real bandwidth,
2355 // since it may be dropping too fast.
2356 // (note this doesn't apply to upswitch, always use longer average there)
2357 if (!isStable && canSwitchDown) {
2358 if (shortTermBps < bandwidthBps) {
2359 bandwidthBps = shortTermBps;
2360 }
2361 }
2362
2363 ssize_t bandwidthIndex = getBandwidthIndex(bandwidthBps);
2364
2365 // it's possible that we're checking for canSwitchUp case, but the returned
2366 // bandwidthIndex is < mCurBandwidthIndex, as getBandwidthIndex() only uses 70%
2367 // of measured bw. In that case we don't want to do anything, since we have
2368 // both enough buffer and enough bw.
2369 if ((canSwitchUp && bandwidthIndex > mCurBandwidthIndex)
2370 || (canSwitchDown && bandwidthIndex < mCurBandwidthIndex)) {
2371 // if not yet prepared, just restart again with new bw index.
2372 // this is faster and playback experience is cleaner.
2373 changeConfiguration(
2374 mInPreparationPhase ? 0 : -1LL, bandwidthIndex);
2375 return true;
2376 }
2377 }
2378 return false;
2379 }
2380
postError(status_t err)2381 void LiveSession::postError(status_t err) {
2382 // if we reached EOS, notify buffering of 100%
2383 if (err == ERROR_END_OF_STREAM) {
2384 notifyBufferingUpdate(100);
2385 }
2386 // we'll stop buffer polling now, before that notify
2387 // stop buffering to stop the spinning icon
2388 stopBufferingIfNecessary();
2389 cancelPollBuffering();
2390
2391 sp<AMessage> notify = mNotify->dup();
2392 notify->setInt32("what", kWhatError);
2393 notify->setInt32("err", err);
2394 notify->post();
2395 }
2396
postPrepared(status_t err)2397 void LiveSession::postPrepared(status_t err) {
2398 CHECK(mInPreparationPhase);
2399
2400 sp<AMessage> notify = mNotify->dup();
2401 if (err == OK || err == ERROR_END_OF_STREAM) {
2402 notify->setInt32("what", kWhatPrepared);
2403 } else {
2404 cancelPollBuffering();
2405
2406 notify->setInt32("what", kWhatPreparationFailed);
2407 notify->setInt32("err", err);
2408 }
2409
2410 notify->post();
2411
2412 mInPreparationPhase = false;
2413 }
2414
2415
2416 } // namespace android
2417
2418