1 /*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 //#define LOG_NDEBUG 0
18 #define LOG_TAG "LiveSession"
19 #include <utils/Log.h>
20
21 #include "LiveSession.h"
22 #include "HTTPDownloader.h"
23 #include "M3UParser.h"
24 #include "PlaylistFetcher.h"
25
26 #include "mpeg2ts/AnotherPacketSource.h"
27
28 #include <cutils/properties.h>
29 #include <media/IMediaHTTPService.h>
30 #include <media/stagefright/foundation/ABuffer.h>
31 #include <media/stagefright/foundation/ADebug.h>
32 #include <media/stagefright/foundation/AMessage.h>
33 #include <media/stagefright/foundation/AUtils.h>
34 #include <media/stagefright/MediaDefs.h>
35 #include <media/stagefright/MetaData.h>
36 #include <media/stagefright/Utils.h>
37
38 #include <utils/Mutex.h>
39
40 #include <ctype.h>
41 #include <inttypes.h>
42
43 namespace android {
44
45 // static
46 // Bandwidth Switch Mark Defaults
47 const int64_t LiveSession::kUpSwitchMarkUs = 15000000ll;
48 const int64_t LiveSession::kDownSwitchMarkUs = 20000000ll;
49 const int64_t LiveSession::kUpSwitchMarginUs = 5000000ll;
50 const int64_t LiveSession::kResumeThresholdUs = 100000ll;
51
52 // Buffer Prepare/Ready/Underflow Marks
53 const int64_t LiveSession::kReadyMarkUs = 5000000ll;
54 const int64_t LiveSession::kPrepareMarkUs = 1500000ll;
55 const int64_t LiveSession::kUnderflowMarkUs = 1000000ll;
56
57 struct LiveSession::BandwidthEstimator : public RefBase {
58 BandwidthEstimator();
59
60 void addBandwidthMeasurement(size_t numBytes, int64_t delayUs);
61 bool estimateBandwidth(
62 int32_t *bandwidth,
63 bool *isStable = NULL,
64 int32_t *shortTermBps = NULL);
65
66 private:
67 // Bandwidth estimation parameters
68 static const int32_t kShortTermBandwidthItems = 3;
69 static const int32_t kMinBandwidthHistoryItems = 20;
70 static const int64_t kMinBandwidthHistoryWindowUs = 5000000ll; // 5 sec
71 static const int64_t kMaxBandwidthHistoryWindowUs = 30000000ll; // 30 sec
72 static const int64_t kMaxBandwidthHistoryAgeUs = 60000000ll; // 60 sec
73
74 struct BandwidthEntry {
75 int64_t mTimestampUs;
76 int64_t mDelayUs;
77 size_t mNumBytes;
78 };
79
80 Mutex mLock;
81 List<BandwidthEntry> mBandwidthHistory;
82 List<int32_t> mPrevEstimates;
83 int32_t mShortTermEstimate;
84 bool mHasNewSample;
85 bool mIsStable;
86 int64_t mTotalTransferTimeUs;
87 size_t mTotalTransferBytes;
88
89 DISALLOW_EVIL_CONSTRUCTORS(BandwidthEstimator);
90 };
91
BandwidthEstimator()92 LiveSession::BandwidthEstimator::BandwidthEstimator() :
93 mShortTermEstimate(0),
94 mHasNewSample(false),
95 mIsStable(true),
96 mTotalTransferTimeUs(0),
97 mTotalTransferBytes(0) {
98 }
99
addBandwidthMeasurement(size_t numBytes,int64_t delayUs)100 void LiveSession::BandwidthEstimator::addBandwidthMeasurement(
101 size_t numBytes, int64_t delayUs) {
102 AutoMutex autoLock(mLock);
103
104 int64_t nowUs = ALooper::GetNowUs();
105 BandwidthEntry entry;
106 entry.mTimestampUs = nowUs;
107 entry.mDelayUs = delayUs;
108 entry.mNumBytes = numBytes;
109 mTotalTransferTimeUs += delayUs;
110 mTotalTransferBytes += numBytes;
111 mBandwidthHistory.push_back(entry);
112 mHasNewSample = true;
113
114 // Remove no more than 10% of total transfer time at a time
115 // to avoid sudden jump on bandwidth estimation. There might
116 // be long blocking reads that takes up signification time,
117 // we have to keep a longer window in that case.
118 int64_t bandwidthHistoryWindowUs = mTotalTransferTimeUs * 9 / 10;
119 if (bandwidthHistoryWindowUs < kMinBandwidthHistoryWindowUs) {
120 bandwidthHistoryWindowUs = kMinBandwidthHistoryWindowUs;
121 } else if (bandwidthHistoryWindowUs > kMaxBandwidthHistoryWindowUs) {
122 bandwidthHistoryWindowUs = kMaxBandwidthHistoryWindowUs;
123 }
124 // trim old samples, keeping at least kMaxBandwidthHistoryItems samples,
125 // and total transfer time at least kMaxBandwidthHistoryWindowUs.
126 while (mBandwidthHistory.size() > kMinBandwidthHistoryItems) {
127 List<BandwidthEntry>::iterator it = mBandwidthHistory.begin();
128 // remove sample if either absolute age or total transfer time is
129 // over kMaxBandwidthHistoryWindowUs
130 if (nowUs - it->mTimestampUs < kMaxBandwidthHistoryAgeUs &&
131 mTotalTransferTimeUs - it->mDelayUs < bandwidthHistoryWindowUs) {
132 break;
133 }
134 mTotalTransferTimeUs -= it->mDelayUs;
135 mTotalTransferBytes -= it->mNumBytes;
136 mBandwidthHistory.erase(mBandwidthHistory.begin());
137 }
138 }
139
estimateBandwidth(int32_t * bandwidthBps,bool * isStable,int32_t * shortTermBps)140 bool LiveSession::BandwidthEstimator::estimateBandwidth(
141 int32_t *bandwidthBps, bool *isStable, int32_t *shortTermBps) {
142 AutoMutex autoLock(mLock);
143
144 if (mBandwidthHistory.size() < 2) {
145 return false;
146 }
147
148 if (!mHasNewSample) {
149 *bandwidthBps = *(--mPrevEstimates.end());
150 if (isStable) {
151 *isStable = mIsStable;
152 }
153 if (shortTermBps) {
154 *shortTermBps = mShortTermEstimate;
155 }
156 return true;
157 }
158
159 *bandwidthBps = ((double)mTotalTransferBytes * 8E6 / mTotalTransferTimeUs);
160 mPrevEstimates.push_back(*bandwidthBps);
161 while (mPrevEstimates.size() > 3) {
162 mPrevEstimates.erase(mPrevEstimates.begin());
163 }
164 mHasNewSample = false;
165
166 int64_t totalTimeUs = 0;
167 size_t totalBytes = 0;
168 if (mBandwidthHistory.size() >= kShortTermBandwidthItems) {
169 List<BandwidthEntry>::iterator it = --mBandwidthHistory.end();
170 for (size_t i = 0; i < kShortTermBandwidthItems; i++, it--) {
171 totalTimeUs += it->mDelayUs;
172 totalBytes += it->mNumBytes;
173 }
174 }
175 mShortTermEstimate = totalTimeUs > 0 ?
176 (totalBytes * 8E6 / totalTimeUs) : *bandwidthBps;
177 if (shortTermBps) {
178 *shortTermBps = mShortTermEstimate;
179 }
180
181 int32_t minEstimate = -1, maxEstimate = -1;
182 List<int32_t>::iterator it;
183 for (it = mPrevEstimates.begin(); it != mPrevEstimates.end(); it++) {
184 int32_t estimate = *it;
185 if (minEstimate < 0 || minEstimate > estimate) {
186 minEstimate = estimate;
187 }
188 if (maxEstimate < 0 || maxEstimate < estimate) {
189 maxEstimate = estimate;
190 }
191 }
192 // consider it stable if long-term average is not jumping a lot
193 // and short-term average is not much lower than long-term average
194 mIsStable = (maxEstimate <= minEstimate * 4 / 3)
195 && mShortTermEstimate > minEstimate * 7 / 10;
196 if (isStable) {
197 *isStable = mIsStable;
198 }
199
200 #if 0
201 {
202 char dumpStr[1024] = {0};
203 size_t itemIdx = 0;
204 size_t histSize = mBandwidthHistory.size();
205 sprintf(dumpStr, "estimate bps=%d stable=%d history (n=%d): {",
206 *bandwidthBps, mIsStable, histSize);
207 List<BandwidthEntry>::iterator it = mBandwidthHistory.begin();
208 for (; it != mBandwidthHistory.end(); ++it) {
209 if (itemIdx > 50) {
210 sprintf(dumpStr + strlen(dumpStr),
211 "...(%zd more items)... }", histSize - itemIdx);
212 break;
213 }
214 sprintf(dumpStr + strlen(dumpStr), "%dk/%.3fs%s",
215 it->mNumBytes / 1024,
216 (double)it->mDelayUs * 1.0e-6,
217 (it == (--mBandwidthHistory.end())) ? "}" : ", ");
218 itemIdx++;
219 }
220 ALOGE(dumpStr);
221 }
222 #endif
223 return true;
224 }
225
226 //static
getKeyForStream(StreamType type)227 const char *LiveSession::getKeyForStream(StreamType type) {
228 switch (type) {
229 case STREAMTYPE_VIDEO:
230 return "timeUsVideo";
231 case STREAMTYPE_AUDIO:
232 return "timeUsAudio";
233 case STREAMTYPE_SUBTITLES:
234 return "timeUsSubtitle";
235 case STREAMTYPE_METADATA:
236 return "timeUsMetadata"; // unused
237 default:
238 TRESPASS();
239 }
240 return NULL;
241 }
242
243 //static
getNameForStream(StreamType type)244 const char *LiveSession::getNameForStream(StreamType type) {
245 switch (type) {
246 case STREAMTYPE_VIDEO:
247 return "video";
248 case STREAMTYPE_AUDIO:
249 return "audio";
250 case STREAMTYPE_SUBTITLES:
251 return "subs";
252 case STREAMTYPE_METADATA:
253 return "metadata";
254 default:
255 break;
256 }
257 return "unknown";
258 }
259
260 //static
getSourceTypeForStream(StreamType type)261 ATSParser::SourceType LiveSession::getSourceTypeForStream(StreamType type) {
262 switch (type) {
263 case STREAMTYPE_VIDEO:
264 return ATSParser::VIDEO;
265 case STREAMTYPE_AUDIO:
266 return ATSParser::AUDIO;
267 case STREAMTYPE_METADATA:
268 return ATSParser::META;
269 case STREAMTYPE_SUBTITLES:
270 default:
271 TRESPASS();
272 }
273 return ATSParser::NUM_SOURCE_TYPES; // should not reach here
274 }
275
LiveSession(const sp<AMessage> & notify,uint32_t flags,const sp<IMediaHTTPService> & httpService)276 LiveSession::LiveSession(
277 const sp<AMessage> ¬ify, uint32_t flags,
278 const sp<IMediaHTTPService> &httpService)
279 : mNotify(notify),
280 mFlags(flags),
281 mHTTPService(httpService),
282 mBuffering(false),
283 mInPreparationPhase(true),
284 mPollBufferingGeneration(0),
285 mPrevBufferPercentage(-1),
286 mCurBandwidthIndex(-1),
287 mOrigBandwidthIndex(-1),
288 mLastBandwidthBps(-1ll),
289 mLastBandwidthStable(false),
290 mBandwidthEstimator(new BandwidthEstimator()),
291 mMaxWidth(720),
292 mMaxHeight(480),
293 mStreamMask(0),
294 mNewStreamMask(0),
295 mSwapMask(0),
296 mSwitchGeneration(0),
297 mSubtitleGeneration(0),
298 mLastDequeuedTimeUs(0ll),
299 mRealTimeBaseUs(0ll),
300 mReconfigurationInProgress(false),
301 mSwitchInProgress(false),
302 mUpSwitchMark(kUpSwitchMarkUs),
303 mDownSwitchMark(kDownSwitchMarkUs),
304 mUpSwitchMargin(kUpSwitchMarginUs),
305 mFirstTimeUsValid(false),
306 mFirstTimeUs(0),
307 mLastSeekTimeUs(0),
308 mHasMetadata(false) {
309 mStreams[kAudioIndex] = StreamItem("audio");
310 mStreams[kVideoIndex] = StreamItem("video");
311 mStreams[kSubtitleIndex] = StreamItem("subtitles");
312
313 for (size_t i = 0; i < kNumSources; ++i) {
314 mPacketSources.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
315 mPacketSources2.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
316 }
317 }
318
~LiveSession()319 LiveSession::~LiveSession() {
320 if (mFetcherLooper != NULL) {
321 mFetcherLooper->stop();
322 }
323 }
324
calculateMediaTimeUs(int64_t firstTimeUs,int64_t timeUs,int32_t discontinuitySeq)325 int64_t LiveSession::calculateMediaTimeUs(
326 int64_t firstTimeUs, int64_t timeUs, int32_t discontinuitySeq) {
327 if (timeUs >= firstTimeUs) {
328 timeUs -= firstTimeUs;
329 } else {
330 timeUs = 0;
331 }
332 timeUs += mLastSeekTimeUs;
333 if (mDiscontinuityOffsetTimesUs.indexOfKey(discontinuitySeq) >= 0) {
334 timeUs += mDiscontinuityOffsetTimesUs.valueFor(discontinuitySeq);
335 }
336 return timeUs;
337 }
338
dequeueAccessUnit(StreamType stream,sp<ABuffer> * accessUnit)339 status_t LiveSession::dequeueAccessUnit(
340 StreamType stream, sp<ABuffer> *accessUnit) {
341 status_t finalResult = OK;
342 sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);
343
344 ssize_t streamIdx = typeToIndex(stream);
345 if (streamIdx < 0) {
346 return BAD_VALUE;
347 }
348 const char *streamStr = getNameForStream(stream);
349 // Do not let client pull data if we don't have data packets yet.
350 // We might only have a format discontinuity queued without data.
351 // When NuPlayerDecoder dequeues the format discontinuity, it will
352 // immediately try to getFormat. If we return NULL, NuPlayerDecoder
353 // thinks it can do seamless change, so will not shutdown decoder.
354 // When the actual format arrives, it can't handle it and get stuck.
355 if (!packetSource->hasDataBufferAvailable(&finalResult)) {
356 ALOGV("[%s] dequeueAccessUnit: no buffer available (finalResult=%d)",
357 streamStr, finalResult);
358
359 if (finalResult == OK) {
360 return -EAGAIN;
361 } else {
362 return finalResult;
363 }
364 }
365
366 // Let the client dequeue as long as we have buffers available
367 // Do not make pause/resume decisions here.
368
369 status_t err = packetSource->dequeueAccessUnit(accessUnit);
370
371 if (err == INFO_DISCONTINUITY) {
372 // adaptive streaming, discontinuities in the playlist
373 int32_t type;
374 CHECK((*accessUnit)->meta()->findInt32("discontinuity", &type));
375
376 sp<AMessage> extra;
377 if (!(*accessUnit)->meta()->findMessage("extra", &extra)) {
378 extra.clear();
379 }
380
381 ALOGI("[%s] read discontinuity of type %d, extra = %s",
382 streamStr,
383 type,
384 extra == NULL ? "NULL" : extra->debugString().c_str());
385 } else if (err == OK) {
386
387 if (stream == STREAMTYPE_AUDIO || stream == STREAMTYPE_VIDEO) {
388 int64_t timeUs, originalTimeUs;
389 int32_t discontinuitySeq = 0;
390 StreamItem& strm = mStreams[streamIdx];
391 CHECK((*accessUnit)->meta()->findInt64("timeUs", &timeUs));
392 originalTimeUs = timeUs;
393 (*accessUnit)->meta()->findInt32("discontinuitySeq", &discontinuitySeq);
394 if (discontinuitySeq > (int32_t) strm.mCurDiscontinuitySeq) {
395 int64_t offsetTimeUs;
396 if (mDiscontinuityOffsetTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) {
397 offsetTimeUs = mDiscontinuityOffsetTimesUs.valueFor(strm.mCurDiscontinuitySeq);
398 } else {
399 offsetTimeUs = 0;
400 }
401
402 if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0
403 && strm.mLastDequeuedTimeUs >= 0) {
404 int64_t firstTimeUs;
405 firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq);
406 offsetTimeUs += strm.mLastDequeuedTimeUs - firstTimeUs;
407 offsetTimeUs += strm.mLastSampleDurationUs;
408 } else {
409 offsetTimeUs += strm.mLastSampleDurationUs;
410 }
411
412 mDiscontinuityOffsetTimesUs.add(discontinuitySeq, offsetTimeUs);
413 strm.mCurDiscontinuitySeq = discontinuitySeq;
414 }
415
416 int32_t discard = 0;
417 int64_t firstTimeUs;
418 if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) {
419 int64_t durUs; // approximate sample duration
420 if (timeUs > strm.mLastDequeuedTimeUs) {
421 durUs = timeUs - strm.mLastDequeuedTimeUs;
422 } else {
423 durUs = strm.mLastDequeuedTimeUs - timeUs;
424 }
425 strm.mLastSampleDurationUs = durUs;
426 firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq);
427 } else if ((*accessUnit)->meta()->findInt32("discard", &discard) && discard) {
428 firstTimeUs = timeUs;
429 } else {
430 mDiscontinuityAbsStartTimesUs.add(strm.mCurDiscontinuitySeq, timeUs);
431 firstTimeUs = timeUs;
432 }
433
434 strm.mLastDequeuedTimeUs = timeUs;
435 timeUs = calculateMediaTimeUs(firstTimeUs, timeUs, discontinuitySeq);
436
437 ALOGV("[%s] dequeueAccessUnit: time %lld us, original %lld us",
438 streamStr, (long long)timeUs, (long long)originalTimeUs);
439 (*accessUnit)->meta()->setInt64("timeUs", timeUs);
440 mLastDequeuedTimeUs = timeUs;
441 mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
442 } else if (stream == STREAMTYPE_SUBTITLES) {
443 int32_t subtitleGeneration;
444 if ((*accessUnit)->meta()->findInt32("subtitleGeneration", &subtitleGeneration)
445 && subtitleGeneration != mSubtitleGeneration) {
446 return -EAGAIN;
447 };
448 (*accessUnit)->meta()->setInt32(
449 "trackIndex", mPlaylist->getSelectedIndex());
450 (*accessUnit)->meta()->setInt64("baseUs", mRealTimeBaseUs);
451 } else if (stream == STREAMTYPE_METADATA) {
452 HLSTime mdTime((*accessUnit)->meta());
453 if (mDiscontinuityAbsStartTimesUs.indexOfKey(mdTime.mSeq) < 0) {
454 packetSource->requeueAccessUnit((*accessUnit));
455 return -EAGAIN;
456 } else {
457 int64_t firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(mdTime.mSeq);
458 int64_t timeUs = calculateMediaTimeUs(firstTimeUs, mdTime.mTimeUs, mdTime.mSeq);
459 (*accessUnit)->meta()->setInt64("timeUs", timeUs);
460 (*accessUnit)->meta()->setInt64("baseUs", mRealTimeBaseUs);
461 }
462 }
463 } else {
464 ALOGI("[%s] encountered error %d", streamStr, err);
465 }
466
467 return err;
468 }
469
getStreamFormatMeta(StreamType stream,sp<MetaData> * meta)470 status_t LiveSession::getStreamFormatMeta(StreamType stream, sp<MetaData> *meta) {
471 if (!(mStreamMask & stream)) {
472 return UNKNOWN_ERROR;
473 }
474
475 sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);
476
477 *meta = packetSource->getFormat();
478
479 if (*meta == NULL) {
480 return -EWOULDBLOCK;
481 }
482
483 if (stream == STREAMTYPE_AUDIO) {
484 // set AAC input buffer size to 32K bytes (256kbps x 1sec)
485 (*meta)->setInt32(kKeyMaxInputSize, 32 * 1024);
486 } else if (stream == STREAMTYPE_VIDEO) {
487 (*meta)->setInt32(kKeyMaxWidth, mMaxWidth);
488 (*meta)->setInt32(kKeyMaxHeight, mMaxHeight);
489 }
490
491 return OK;
492 }
493
getHTTPDownloader()494 sp<HTTPDownloader> LiveSession::getHTTPDownloader() {
495 return new HTTPDownloader(mHTTPService, mExtraHeaders);
496 }
497
connectAsync(const char * url,const KeyedVector<String8,String8> * headers)498 void LiveSession::connectAsync(
499 const char *url, const KeyedVector<String8, String8> *headers) {
500 sp<AMessage> msg = new AMessage(kWhatConnect, this);
501 msg->setString("url", url);
502
503 if (headers != NULL) {
504 msg->setPointer(
505 "headers",
506 new KeyedVector<String8, String8>(*headers));
507 }
508
509 msg->post();
510 }
511
disconnect()512 status_t LiveSession::disconnect() {
513 sp<AMessage> msg = new AMessage(kWhatDisconnect, this);
514
515 sp<AMessage> response;
516 status_t err = msg->postAndAwaitResponse(&response);
517
518 return err;
519 }
520
seekTo(int64_t timeUs)521 status_t LiveSession::seekTo(int64_t timeUs) {
522 sp<AMessage> msg = new AMessage(kWhatSeek, this);
523 msg->setInt64("timeUs", timeUs);
524
525 sp<AMessage> response;
526 status_t err = msg->postAndAwaitResponse(&response);
527
528 return err;
529 }
530
checkSwitchProgress(sp<AMessage> & stopParams,int64_t delayUs,bool * needResumeUntil)531 bool LiveSession::checkSwitchProgress(
532 sp<AMessage> &stopParams, int64_t delayUs, bool *needResumeUntil) {
533 AString newUri;
534 CHECK(stopParams->findString("uri", &newUri));
535
536 *needResumeUntil = false;
537 sp<AMessage> firstNewMeta[kMaxStreams];
538 for (size_t i = 0; i < kMaxStreams; ++i) {
539 StreamType stream = indexToType(i);
540 if (!(mSwapMask & mNewStreamMask & stream)
541 || (mStreams[i].mNewUri != newUri)) {
542 continue;
543 }
544 if (stream == STREAMTYPE_SUBTITLES) {
545 continue;
546 }
547 sp<AnotherPacketSource> &source = mPacketSources.editValueAt(i);
548
549 // First, get latest dequeued meta, which is where the decoder is at.
550 // (when upswitching, we take the meta after a certain delay, so that
551 // the decoder is left with some cushion)
552 sp<AMessage> lastDequeueMeta, lastEnqueueMeta;
553 if (delayUs > 0) {
554 lastDequeueMeta = source->getMetaAfterLastDequeued(delayUs);
555 if (lastDequeueMeta == NULL) {
556 // this means we don't have enough cushion, try again later
557 ALOGV("[%s] up switching failed due to insufficient buffer",
558 getNameForStream(stream));
559 return false;
560 }
561 } else {
562 // It's okay for lastDequeueMeta to be NULL here, it means the
563 // decoder hasn't even started dequeueing
564 lastDequeueMeta = source->getLatestDequeuedMeta();
565 }
566 // Then, trim off packets at beginning of mPacketSources2 that's before
567 // the latest dequeued time. These samples are definitely too late.
568 firstNewMeta[i] = mPacketSources2.editValueAt(i)
569 ->trimBuffersBeforeMeta(lastDequeueMeta);
570
571 // Now firstNewMeta[i] is the first sample after the trim.
572 // If it's NULL, we failed because dequeue already past all samples
573 // in mPacketSource2, we have to try again.
574 if (firstNewMeta[i] == NULL) {
575 HLSTime dequeueTime(lastDequeueMeta);
576 ALOGV("[%s] dequeue time (%d, %lld) past start time",
577 getNameForStream(stream),
578 dequeueTime.mSeq, (long long) dequeueTime.mTimeUs);
579 return false;
580 }
581
582 // Otherwise, we check if mPacketSources2 overlaps with what old fetcher
583 // already fetched, and see if we need to resumeUntil
584 lastEnqueueMeta = source->getLatestEnqueuedMeta();
585 // lastEnqueueMeta == NULL means old fetcher stopped at a discontinuity
586 // boundary, no need to resume as the content will look different anyways
587 if (lastEnqueueMeta != NULL) {
588 HLSTime lastTime(lastEnqueueMeta), startTime(firstNewMeta[i]);
589
590 // no need to resume old fetcher if new fetcher started in different
591 // discontinuity sequence, as the content will look different.
592 *needResumeUntil |= (startTime.mSeq == lastTime.mSeq
593 && startTime.mTimeUs - lastTime.mTimeUs > kResumeThresholdUs);
594
595 // update the stopTime for resumeUntil
596 stopParams->setInt32("discontinuitySeq", startTime.mSeq);
597 stopParams->setInt64(getKeyForStream(stream), startTime.mTimeUs);
598 }
599 }
600
601 // if we're here, it means dequeue progress hasn't passed some samples in
602 // mPacketSource2, we can trim off the excess in mPacketSource.
603 // (old fetcher might still need to resumeUntil the start time of new fetcher)
604 for (size_t i = 0; i < kMaxStreams; ++i) {
605 StreamType stream = indexToType(i);
606 if (!(mSwapMask & mNewStreamMask & stream)
607 || (newUri != mStreams[i].mNewUri)
608 || stream == STREAMTYPE_SUBTITLES) {
609 continue;
610 }
611 mPacketSources.valueFor(stream)->trimBuffersAfterMeta(firstNewMeta[i]);
612 }
613
614 // no resumeUntil if already underflow
615 *needResumeUntil &= !mBuffering;
616
617 return true;
618 }
619
onMessageReceived(const sp<AMessage> & msg)620 void LiveSession::onMessageReceived(const sp<AMessage> &msg) {
621 switch (msg->what()) {
622 case kWhatConnect:
623 {
624 onConnect(msg);
625 break;
626 }
627
628 case kWhatDisconnect:
629 {
630 CHECK(msg->senderAwaitsResponse(&mDisconnectReplyID));
631
632 if (mReconfigurationInProgress) {
633 break;
634 }
635
636 finishDisconnect();
637 break;
638 }
639
640 case kWhatSeek:
641 {
642 if (mReconfigurationInProgress) {
643 msg->post(50000);
644 break;
645 }
646
647 CHECK(msg->senderAwaitsResponse(&mSeekReplyID));
648 mSeekReply = new AMessage;
649
650 onSeek(msg);
651 break;
652 }
653
654 case kWhatFetcherNotify:
655 {
656 int32_t what;
657 CHECK(msg->findInt32("what", &what));
658
659 switch (what) {
660 case PlaylistFetcher::kWhatStarted:
661 break;
662 case PlaylistFetcher::kWhatPaused:
663 case PlaylistFetcher::kWhatStopped:
664 {
665 AString uri;
666 CHECK(msg->findString("uri", &uri));
667 ssize_t index = mFetcherInfos.indexOfKey(uri);
668 if (index < 0) {
669 // ignore msgs from fetchers that's already gone
670 break;
671 }
672
673 ALOGV("fetcher-%d %s",
674 mFetcherInfos[index].mFetcher->getFetcherID(),
675 what == PlaylistFetcher::kWhatPaused ?
676 "paused" : "stopped");
677
678 if (what == PlaylistFetcher::kWhatStopped) {
679 mFetcherLooper->unregisterHandler(
680 mFetcherInfos[index].mFetcher->id());
681 mFetcherInfos.removeItemsAt(index);
682 } else if (what == PlaylistFetcher::kWhatPaused) {
683 int32_t seekMode;
684 CHECK(msg->findInt32("seekMode", &seekMode));
685 for (size_t i = 0; i < kMaxStreams; ++i) {
686 if (mStreams[i].mUri == uri) {
687 mStreams[i].mSeekMode = (SeekMode) seekMode;
688 }
689 }
690 }
691
692 if (mContinuation != NULL) {
693 CHECK_GT(mContinuationCounter, 0);
694 if (--mContinuationCounter == 0) {
695 mContinuation->post();
696 }
697 ALOGV("%zu fetcher(s) left", mContinuationCounter);
698 }
699 break;
700 }
701
702 case PlaylistFetcher::kWhatDurationUpdate:
703 {
704 AString uri;
705 CHECK(msg->findString("uri", &uri));
706
707 int64_t durationUs;
708 CHECK(msg->findInt64("durationUs", &durationUs));
709
710 ssize_t index = mFetcherInfos.indexOfKey(uri);
711 if (index >= 0) {
712 FetcherInfo *info = &mFetcherInfos.editValueFor(uri);
713 info->mDurationUs = durationUs;
714 }
715 break;
716 }
717
718 case PlaylistFetcher::kWhatTargetDurationUpdate:
719 {
720 int64_t targetDurationUs;
721 CHECK(msg->findInt64("targetDurationUs", &targetDurationUs));
722 mUpSwitchMark = min(kUpSwitchMarkUs, targetDurationUs * 7 / 4);
723 mDownSwitchMark = min(kDownSwitchMarkUs, targetDurationUs * 9 / 4);
724 mUpSwitchMargin = min(kUpSwitchMarginUs, targetDurationUs);
725 break;
726 }
727
728 case PlaylistFetcher::kWhatError:
729 {
730 status_t err;
731 CHECK(msg->findInt32("err", &err));
732
733 ALOGE("XXX Received error %d from PlaylistFetcher.", err);
734
735 // handle EOS on subtitle tracks independently
736 AString uri;
737 if (err == ERROR_END_OF_STREAM && msg->findString("uri", &uri)) {
738 ssize_t i = mFetcherInfos.indexOfKey(uri);
739 if (i >= 0) {
740 const sp<PlaylistFetcher> &fetcher = mFetcherInfos.valueAt(i).mFetcher;
741 if (fetcher != NULL) {
742 uint32_t type = fetcher->getStreamTypeMask();
743 if (type == STREAMTYPE_SUBTITLES) {
744 mPacketSources.valueFor(
745 STREAMTYPE_SUBTITLES)->signalEOS(err);;
746 break;
747 }
748 }
749 }
750 }
751
752 // remember the failure index (as mCurBandwidthIndex will be restored
753 // after cancelBandwidthSwitch()), and record last fail time
754 size_t failureIndex = mCurBandwidthIndex;
755 mBandwidthItems.editItemAt(
756 failureIndex).mLastFailureUs = ALooper::GetNowUs();
757
758 if (mSwitchInProgress) {
759 // if error happened when we switch to a variant, try fallback
760 // to other variant to save the session
761 if (tryBandwidthFallback()) {
762 break;
763 }
764 }
765
766 if (mInPreparationPhase) {
767 postPrepared(err);
768 }
769
770 cancelBandwidthSwitch();
771
772 mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(err);
773
774 mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(err);
775
776 mPacketSources.valueFor(
777 STREAMTYPE_SUBTITLES)->signalEOS(err);
778
779 postError(err);
780 break;
781 }
782
783 case PlaylistFetcher::kWhatStopReached:
784 {
785 ALOGV("kWhatStopReached");
786
787 AString oldUri;
788 CHECK(msg->findString("uri", &oldUri));
789
790 ssize_t index = mFetcherInfos.indexOfKey(oldUri);
791 if (index < 0) {
792 break;
793 }
794
795 tryToFinishBandwidthSwitch(oldUri);
796 break;
797 }
798
799 case PlaylistFetcher::kWhatStartedAt:
800 {
801 int32_t switchGeneration;
802 CHECK(msg->findInt32("switchGeneration", &switchGeneration));
803
804 ALOGV("kWhatStartedAt: switchGen=%d, mSwitchGen=%d",
805 switchGeneration, mSwitchGeneration);
806
807 if (switchGeneration != mSwitchGeneration) {
808 break;
809 }
810
811 AString uri;
812 CHECK(msg->findString("uri", &uri));
813
814 // mark new fetcher mToBeResumed
815 ssize_t index = mFetcherInfos.indexOfKey(uri);
816 if (index >= 0) {
817 mFetcherInfos.editValueAt(index).mToBeResumed = true;
818 }
819
820 // temporarily disable packet sources to be swapped to prevent
821 // NuPlayerDecoder from dequeuing while we check progress
822 for (size_t i = 0; i < mPacketSources.size(); ++i) {
823 if ((mSwapMask & mPacketSources.keyAt(i))
824 && uri == mStreams[i].mNewUri) {
825 mPacketSources.editValueAt(i)->enable(false);
826 }
827 }
828 bool switchUp = (mCurBandwidthIndex > mOrigBandwidthIndex);
829 // If switching up, require a cushion bigger than kUnderflowMark
830 // to avoid buffering immediately after the switch.
831 // (If we don't have that cushion we'd rather cancel and try again.)
832 int64_t delayUs = switchUp ? (kUnderflowMarkUs + 1000000ll) : 0;
833 bool needResumeUntil = false;
834 sp<AMessage> stopParams = msg;
835 if (checkSwitchProgress(stopParams, delayUs, &needResumeUntil)) {
836 // playback time hasn't passed startAt time
837 if (!needResumeUntil) {
838 ALOGV("finish switch");
839 for (size_t i = 0; i < kMaxStreams; ++i) {
840 if ((mSwapMask & indexToType(i))
841 && uri == mStreams[i].mNewUri) {
842 // have to make a copy of mStreams[i].mUri because
843 // tryToFinishBandwidthSwitch is modifying mStreams[]
844 AString oldURI = mStreams[i].mUri;
845 tryToFinishBandwidthSwitch(oldURI);
846 break;
847 }
848 }
849 } else {
850 // startAt time is after last enqueue time
851 // Resume fetcher for the original variant; the resumed fetcher should
852 // continue until the timestamps found in msg, which is stored by the
853 // new fetcher to indicate where the new variant has started buffering.
854 ALOGV("finish switch with resumeUntilAsync");
855 for (size_t i = 0; i < mFetcherInfos.size(); i++) {
856 const FetcherInfo &info = mFetcherInfos.valueAt(i);
857 if (info.mToBeRemoved) {
858 info.mFetcher->resumeUntilAsync(stopParams);
859 }
860 }
861 }
862 } else {
863 // playback time passed startAt time
864 if (switchUp) {
865 // if switching up, cancel and retry if condition satisfies again
866 ALOGV("cancel up switch because we're too late");
867 cancelBandwidthSwitch(true /* resume */);
868 } else {
869 ALOGV("retry down switch at next sample");
870 resumeFetcher(uri, mSwapMask, -1, true /* newUri */);
871 }
872 }
873 // re-enable all packet sources
874 for (size_t i = 0; i < mPacketSources.size(); ++i) {
875 mPacketSources.editValueAt(i)->enable(true);
876 }
877
878 break;
879 }
880
881 case PlaylistFetcher::kWhatPlaylistFetched:
882 {
883 onMasterPlaylistFetched(msg);
884 break;
885 }
886
887 case PlaylistFetcher::kWhatMetadataDetected:
888 {
889 if (!mHasMetadata) {
890 mHasMetadata = true;
891 sp<AMessage> notify = mNotify->dup();
892 notify->setInt32("what", kWhatMetadataDetected);
893 notify->post();
894 }
895 break;
896 }
897
898 default:
899 TRESPASS();
900 }
901
902 break;
903 }
904
905 case kWhatChangeConfiguration:
906 {
907 onChangeConfiguration(msg);
908 break;
909 }
910
911 case kWhatChangeConfiguration2:
912 {
913 onChangeConfiguration2(msg);
914 break;
915 }
916
917 case kWhatChangeConfiguration3:
918 {
919 onChangeConfiguration3(msg);
920 break;
921 }
922
923 case kWhatPollBuffering:
924 {
925 int32_t generation;
926 CHECK(msg->findInt32("generation", &generation));
927 if (generation == mPollBufferingGeneration) {
928 onPollBuffering();
929 }
930 break;
931 }
932
933 default:
934 TRESPASS();
935 break;
936 }
937 }
938
939 // static
isBandwidthValid(const BandwidthItem & item)940 bool LiveSession::isBandwidthValid(const BandwidthItem &item) {
941 static const int64_t kBlacklistWindowUs = 300 * 1000000ll;
942 return item.mLastFailureUs < 0
943 || ALooper::GetNowUs() - item.mLastFailureUs > kBlacklistWindowUs;
944 }
945
946 // static
SortByBandwidth(const BandwidthItem * a,const BandwidthItem * b)947 int LiveSession::SortByBandwidth(const BandwidthItem *a, const BandwidthItem *b) {
948 if (a->mBandwidth < b->mBandwidth) {
949 return -1;
950 } else if (a->mBandwidth == b->mBandwidth) {
951 return 0;
952 }
953
954 return 1;
955 }
956
957 // static
indexToType(int idx)958 LiveSession::StreamType LiveSession::indexToType(int idx) {
959 CHECK(idx >= 0 && idx < kNumSources);
960 return (StreamType)(1 << idx);
961 }
962
963 // static
typeToIndex(int32_t type)964 ssize_t LiveSession::typeToIndex(int32_t type) {
965 switch (type) {
966 case STREAMTYPE_AUDIO:
967 return 0;
968 case STREAMTYPE_VIDEO:
969 return 1;
970 case STREAMTYPE_SUBTITLES:
971 return 2;
972 case STREAMTYPE_METADATA:
973 return 3;
974 default:
975 return -1;
976 };
977 return -1;
978 }
979
onConnect(const sp<AMessage> & msg)980 void LiveSession::onConnect(const sp<AMessage> &msg) {
981 CHECK(msg->findString("url", &mMasterURL));
982
983 // TODO currently we don't know if we are coming here from incognito mode
984 ALOGI("onConnect %s", uriDebugString(mMasterURL).c_str());
985
986 KeyedVector<String8, String8> *headers = NULL;
987 if (!msg->findPointer("headers", (void **)&headers)) {
988 mExtraHeaders.clear();
989 } else {
990 mExtraHeaders = *headers;
991
992 delete headers;
993 headers = NULL;
994 }
995
996 // create looper for fetchers
997 if (mFetcherLooper == NULL) {
998 mFetcherLooper = new ALooper();
999
1000 mFetcherLooper->setName("Fetcher");
1001 mFetcherLooper->start(false, false);
1002 }
1003
1004 // create fetcher to fetch the master playlist
1005 addFetcher(mMasterURL.c_str())->fetchPlaylistAsync();
1006 }
1007
onMasterPlaylistFetched(const sp<AMessage> & msg)1008 void LiveSession::onMasterPlaylistFetched(const sp<AMessage> &msg) {
1009 AString uri;
1010 CHECK(msg->findString("uri", &uri));
1011 ssize_t index = mFetcherInfos.indexOfKey(uri);
1012 if (index < 0) {
1013 ALOGW("fetcher for master playlist is gone.");
1014 return;
1015 }
1016
1017 // no longer useful, remove
1018 mFetcherLooper->unregisterHandler(mFetcherInfos[index].mFetcher->id());
1019 mFetcherInfos.removeItemsAt(index);
1020
1021 CHECK(msg->findObject("playlist", (sp<RefBase> *)&mPlaylist));
1022 if (mPlaylist == NULL) {
1023 ALOGE("unable to fetch master playlist %s.",
1024 uriDebugString(mMasterURL).c_str());
1025
1026 postPrepared(ERROR_IO);
1027 return;
1028 }
1029 // We trust the content provider to make a reasonable choice of preferred
1030 // initial bandwidth by listing it first in the variant playlist.
1031 // At startup we really don't have a good estimate on the available
1032 // network bandwidth since we haven't tranferred any data yet. Once
1033 // we have we can make a better informed choice.
1034 size_t initialBandwidth = 0;
1035 size_t initialBandwidthIndex = 0;
1036
1037 int32_t maxWidth = 0;
1038 int32_t maxHeight = 0;
1039
1040 if (mPlaylist->isVariantPlaylist()) {
1041 Vector<BandwidthItem> itemsWithVideo;
1042 for (size_t i = 0; i < mPlaylist->size(); ++i) {
1043 BandwidthItem item;
1044
1045 item.mPlaylistIndex = i;
1046 item.mLastFailureUs = -1ll;
1047
1048 sp<AMessage> meta;
1049 AString uri;
1050 mPlaylist->itemAt(i, &uri, &meta);
1051
1052 CHECK(meta->findInt32("bandwidth", (int32_t *)&item.mBandwidth));
1053
1054 int32_t width, height;
1055 if (meta->findInt32("width", &width)) {
1056 maxWidth = max(maxWidth, width);
1057 }
1058 if (meta->findInt32("height", &height)) {
1059 maxHeight = max(maxHeight, height);
1060 }
1061
1062 mBandwidthItems.push(item);
1063 if (mPlaylist->hasType(i, "video")) {
1064 itemsWithVideo.push(item);
1065 }
1066 }
1067 // remove the audio-only variants if we have at least one with video
1068 if (!itemsWithVideo.empty()
1069 && itemsWithVideo.size() < mBandwidthItems.size()) {
1070 mBandwidthItems.clear();
1071 for (size_t i = 0; i < itemsWithVideo.size(); ++i) {
1072 mBandwidthItems.push(itemsWithVideo[i]);
1073 }
1074 }
1075
1076 CHECK_GT(mBandwidthItems.size(), 0u);
1077 initialBandwidth = mBandwidthItems[0].mBandwidth;
1078
1079 mBandwidthItems.sort(SortByBandwidth);
1080
1081 for (size_t i = 0; i < mBandwidthItems.size(); ++i) {
1082 if (mBandwidthItems.itemAt(i).mBandwidth == initialBandwidth) {
1083 initialBandwidthIndex = i;
1084 break;
1085 }
1086 }
1087 } else {
1088 // dummy item.
1089 BandwidthItem item;
1090 item.mPlaylistIndex = 0;
1091 item.mBandwidth = 0;
1092 mBandwidthItems.push(item);
1093 }
1094
1095 mMaxWidth = maxWidth > 0 ? maxWidth : mMaxWidth;
1096 mMaxHeight = maxHeight > 0 ? maxHeight : mMaxHeight;
1097
1098 mPlaylist->pickRandomMediaItems();
1099 changeConfiguration(
1100 0ll /* timeUs */, initialBandwidthIndex, false /* pickTrack */);
1101 }
1102
finishDisconnect()1103 void LiveSession::finishDisconnect() {
1104 ALOGV("finishDisconnect");
1105
1106 // No reconfiguration is currently pending, make sure none will trigger
1107 // during disconnection either.
1108 cancelBandwidthSwitch();
1109
1110 // cancel buffer polling
1111 cancelPollBuffering();
1112
1113 // TRICKY: don't wait for all fetcher to be stopped when disconnecting
1114 //
1115 // Some fetchers might be stuck in connect/getSize at this point. These
1116 // operations will eventually timeout (as we have a timeout set in
1117 // MediaHTTPConnection), but we don't want to block the main UI thread
1118 // until then. Here we just need to make sure we clear all references
1119 // to the fetchers, so that when they finally exit from the blocking
1120 // operation, they can be destructed.
1121 //
1122 // There is one very tricky point though. For this scheme to work, the
1123 // fecther must hold a reference to LiveSession, so that LiveSession is
1124 // destroyed after fetcher. Otherwise LiveSession would get stuck in its
1125 // own destructor when it waits for mFetcherLooper to stop, which still
1126 // blocks main UI thread.
1127 for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1128 mFetcherInfos.valueAt(i).mFetcher->stopAsync();
1129 mFetcherLooper->unregisterHandler(
1130 mFetcherInfos.valueAt(i).mFetcher->id());
1131 }
1132 mFetcherInfos.clear();
1133
1134 mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(ERROR_END_OF_STREAM);
1135 mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(ERROR_END_OF_STREAM);
1136
1137 mPacketSources.valueFor(
1138 STREAMTYPE_SUBTITLES)->signalEOS(ERROR_END_OF_STREAM);
1139
1140 sp<AMessage> response = new AMessage;
1141 response->setInt32("err", OK);
1142
1143 response->postReply(mDisconnectReplyID);
1144 mDisconnectReplyID.clear();
1145 }
1146
addFetcher(const char * uri)1147 sp<PlaylistFetcher> LiveSession::addFetcher(const char *uri) {
1148 ssize_t index = mFetcherInfos.indexOfKey(uri);
1149
1150 if (index >= 0) {
1151 return NULL;
1152 }
1153
1154 sp<AMessage> notify = new AMessage(kWhatFetcherNotify, this);
1155 notify->setString("uri", uri);
1156 notify->setInt32("switchGeneration", mSwitchGeneration);
1157
1158 FetcherInfo info;
1159 info.mFetcher = new PlaylistFetcher(
1160 notify, this, uri, mCurBandwidthIndex, mSubtitleGeneration);
1161 info.mDurationUs = -1ll;
1162 info.mToBeRemoved = false;
1163 info.mToBeResumed = false;
1164 mFetcherLooper->registerHandler(info.mFetcher);
1165
1166 mFetcherInfos.add(uri, info);
1167
1168 return info.mFetcher;
1169 }
1170
1171 #if 0
1172 static double uniformRand() {
1173 return (double)rand() / RAND_MAX;
1174 }
1175 #endif
1176
UriIsSameAsIndex(const AString & uri,int32_t i,bool newUri)1177 bool LiveSession::UriIsSameAsIndex(const AString &uri, int32_t i, bool newUri) {
1178 ALOGI("[timed_id3] i %d UriIsSameAsIndex newUri %s, %s", i,
1179 newUri ? "true" : "false",
1180 newUri ? mStreams[i].mNewUri.c_str() : mStreams[i].mUri.c_str());
1181 return i >= 0
1182 && ((!newUri && uri == mStreams[i].mUri)
1183 || (newUri && uri == mStreams[i].mNewUri));
1184 }
1185
getPacketSourceForStreamIndex(size_t trackIndex,bool newUri)1186 sp<AnotherPacketSource> LiveSession::getPacketSourceForStreamIndex(
1187 size_t trackIndex, bool newUri) {
1188 StreamType type = indexToType(trackIndex);
1189 sp<AnotherPacketSource> source = NULL;
1190 if (newUri) {
1191 source = mPacketSources2.valueFor(type);
1192 source->clear();
1193 } else {
1194 source = mPacketSources.valueFor(type);
1195 };
1196 return source;
1197 }
1198
getMetadataSource(sp<AnotherPacketSource> sources[kNumSources],uint32_t streamMask,bool newUri)1199 sp<AnotherPacketSource> LiveSession::getMetadataSource(
1200 sp<AnotherPacketSource> sources[kNumSources], uint32_t streamMask, bool newUri) {
1201 // todo: One case where the following strategy can fail is when audio and video
1202 // are in separate playlists, both are transport streams, and the metadata
1203 // is actually contained in the audio stream.
1204 ALOGV("[timed_id3] getMetadataSourceForUri streamMask %x newUri %s",
1205 streamMask, newUri ? "true" : "false");
1206
1207 if ((sources[kVideoIndex] != NULL) // video fetcher; or ...
1208 || (!(streamMask & STREAMTYPE_VIDEO) && sources[kAudioIndex] != NULL)) {
1209 // ... audio fetcher for audio only variant
1210 return getPacketSourceForStreamIndex(kMetaDataIndex, newUri);
1211 }
1212
1213 return NULL;
1214 }
1215
resumeFetcher(const AString & uri,uint32_t streamMask,int64_t timeUs,bool newUri)1216 bool LiveSession::resumeFetcher(
1217 const AString &uri, uint32_t streamMask, int64_t timeUs, bool newUri) {
1218 ssize_t index = mFetcherInfos.indexOfKey(uri);
1219 if (index < 0) {
1220 ALOGE("did not find fetcher for uri: %s", uri.c_str());
1221 return false;
1222 }
1223
1224 bool resume = false;
1225 sp<AnotherPacketSource> sources[kNumSources];
1226 for (size_t i = 0; i < kMaxStreams; ++i) {
1227 if ((streamMask & indexToType(i)) && UriIsSameAsIndex(uri, i, newUri)) {
1228 resume = true;
1229 sources[i] = getPacketSourceForStreamIndex(i, newUri);
1230 }
1231 }
1232
1233 if (resume) {
1234 sp<PlaylistFetcher> &fetcher = mFetcherInfos.editValueAt(index).mFetcher;
1235 SeekMode seekMode = newUri ? kSeekModeNextSample : kSeekModeExactPosition;
1236
1237 ALOGV("resuming fetcher-%d, timeUs=%lld, seekMode=%d",
1238 fetcher->getFetcherID(), (long long)timeUs, seekMode);
1239
1240 fetcher->startAsync(
1241 sources[kAudioIndex],
1242 sources[kVideoIndex],
1243 sources[kSubtitleIndex],
1244 getMetadataSource(sources, streamMask, newUri),
1245 timeUs, -1, -1, seekMode);
1246 }
1247
1248 return resume;
1249 }
1250
getAbortThreshold(ssize_t currentBWIndex,ssize_t targetBWIndex) const1251 float LiveSession::getAbortThreshold(
1252 ssize_t currentBWIndex, ssize_t targetBWIndex) const {
1253 float abortThreshold = -1.0f;
1254 if (currentBWIndex > 0 && targetBWIndex < currentBWIndex) {
1255 /*
1256 If we're switching down, we need to decide whether to
1257
1258 1) finish last segment of high-bandwidth variant, or
1259 2) abort last segment of high-bandwidth variant, and fetch an
1260 overlapping portion from low-bandwidth variant.
1261
1262 Here we try to maximize the amount of buffer left when the
1263 switch point is met. Given the following parameters:
1264
1265 B: our current buffering level in seconds
1266 T: target duration in seconds
1267 X: sample duration in seconds remain to fetch in last segment
1268 bw0: bandwidth of old variant (as specified in playlist)
1269 bw1: bandwidth of new variant (as specified in playlist)
1270 bw: measured bandwidth available
1271
1272 If we choose 1), when switch happens at the end of current
1273 segment, our buffering will be
1274 B + X - X * bw0 / bw
1275
1276 If we choose 2), when switch happens where we aborted current
1277 segment, our buffering will be
1278 B - (T - X) * bw1 / bw
1279
1280 We should only choose 1) if
1281 X/T < bw1 / (bw1 + bw0 - bw)
1282 */
1283
1284 // abort old bandwidth immediately if bandwidth is fluctuating a lot.
1285 // our estimate could be far off, and fetching old bandwidth could
1286 // take too long.
1287 if (!mLastBandwidthStable) {
1288 return 0.0f;
1289 }
1290
1291 // Taking the measured current bandwidth at 50% face value only,
1292 // as our bandwidth estimation is a lagging indicator. Being
1293 // conservative on this, we prefer switching to lower bandwidth
1294 // unless we're really confident finishing up the last segment
1295 // of higher bandwidth will be fast.
1296 CHECK(mLastBandwidthBps >= 0);
1297 abortThreshold =
1298 (float)mBandwidthItems.itemAt(targetBWIndex).mBandwidth
1299 / ((float)mBandwidthItems.itemAt(targetBWIndex).mBandwidth
1300 + (float)mBandwidthItems.itemAt(currentBWIndex).mBandwidth
1301 - (float)mLastBandwidthBps * 0.5f);
1302 if (abortThreshold < 0.0f) {
1303 abortThreshold = -1.0f; // do not abort
1304 }
1305 ALOGV("Switching Down: bps %ld => %ld, measured %d, abort ratio %.2f",
1306 mBandwidthItems.itemAt(currentBWIndex).mBandwidth,
1307 mBandwidthItems.itemAt(targetBWIndex).mBandwidth,
1308 mLastBandwidthBps,
1309 abortThreshold);
1310 }
1311 return abortThreshold;
1312 }
1313
addBandwidthMeasurement(size_t numBytes,int64_t delayUs)1314 void LiveSession::addBandwidthMeasurement(size_t numBytes, int64_t delayUs) {
1315 mBandwidthEstimator->addBandwidthMeasurement(numBytes, delayUs);
1316 }
1317
getLowestValidBandwidthIndex() const1318 ssize_t LiveSession::getLowestValidBandwidthIndex() const {
1319 for (size_t index = 0; index < mBandwidthItems.size(); index++) {
1320 if (isBandwidthValid(mBandwidthItems[index])) {
1321 return index;
1322 }
1323 }
1324 // if playlists are all blacklisted, return 0 and hope it's alive
1325 return 0;
1326 }
1327
getBandwidthIndex(int32_t bandwidthBps)1328 size_t LiveSession::getBandwidthIndex(int32_t bandwidthBps) {
1329 if (mBandwidthItems.size() < 2) {
1330 // shouldn't be here if we only have 1 bandwidth, check
1331 // logic to get rid of redundant bandwidth polling
1332 ALOGW("getBandwidthIndex() called for single bandwidth playlist!");
1333 return 0;
1334 }
1335
1336 #if 1
1337 char value[PROPERTY_VALUE_MAX];
1338 ssize_t index = -1;
1339 if (property_get("media.httplive.bw-index", value, NULL)) {
1340 char *end;
1341 index = strtol(value, &end, 10);
1342 CHECK(end > value && *end == '\0');
1343
1344 if (index >= 0 && (size_t)index >= mBandwidthItems.size()) {
1345 index = mBandwidthItems.size() - 1;
1346 }
1347 }
1348
1349 if (index < 0) {
1350 char value[PROPERTY_VALUE_MAX];
1351 if (property_get("media.httplive.max-bw", value, NULL)) {
1352 char *end;
1353 long maxBw = strtoul(value, &end, 10);
1354 if (end > value && *end == '\0') {
1355 if (maxBw > 0 && bandwidthBps > maxBw) {
1356 ALOGV("bandwidth capped to %ld bps", maxBw);
1357 bandwidthBps = maxBw;
1358 }
1359 }
1360 }
1361
1362 // Pick the highest bandwidth stream that's not currently blacklisted
1363 // below or equal to estimated bandwidth.
1364
1365 index = mBandwidthItems.size() - 1;
1366 ssize_t lowestBandwidth = getLowestValidBandwidthIndex();
1367 while (index > lowestBandwidth) {
1368 // be conservative (70%) to avoid overestimating and immediately
1369 // switching down again.
1370 size_t adjustedBandwidthBps = bandwidthBps * 7 / 10;
1371 const BandwidthItem &item = mBandwidthItems[index];
1372 if (item.mBandwidth <= adjustedBandwidthBps
1373 && isBandwidthValid(item)) {
1374 break;
1375 }
1376 --index;
1377 }
1378 }
1379 #elif 0
1380 // Change bandwidth at random()
1381 size_t index = uniformRand() * mBandwidthItems.size();
1382 #elif 0
1383 // There's a 50% chance to stay on the current bandwidth and
1384 // a 50% chance to switch to the next higher bandwidth (wrapping around
1385 // to lowest)
1386 const size_t kMinIndex = 0;
1387
1388 static ssize_t mCurBandwidthIndex = -1;
1389
1390 size_t index;
1391 if (mCurBandwidthIndex < 0) {
1392 index = kMinIndex;
1393 } else if (uniformRand() < 0.5) {
1394 index = (size_t)mCurBandwidthIndex;
1395 } else {
1396 index = mCurBandwidthIndex + 1;
1397 if (index == mBandwidthItems.size()) {
1398 index = kMinIndex;
1399 }
1400 }
1401 mCurBandwidthIndex = index;
1402 #elif 0
1403 // Pick the highest bandwidth stream below or equal to 1.2 Mbit/sec
1404
1405 size_t index = mBandwidthItems.size() - 1;
1406 while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth > 1200000) {
1407 --index;
1408 }
1409 #elif 1
1410 char value[PROPERTY_VALUE_MAX];
1411 size_t index;
1412 if (property_get("media.httplive.bw-index", value, NULL)) {
1413 char *end;
1414 index = strtoul(value, &end, 10);
1415 CHECK(end > value && *end == '\0');
1416
1417 if (index >= mBandwidthItems.size()) {
1418 index = mBandwidthItems.size() - 1;
1419 }
1420 } else {
1421 index = 0;
1422 }
1423 #else
1424 size_t index = mBandwidthItems.size() - 1; // Highest bandwidth stream
1425 #endif
1426
1427 CHECK_GE(index, 0);
1428
1429 return index;
1430 }
1431
latestMediaSegmentStartTime() const1432 HLSTime LiveSession::latestMediaSegmentStartTime() const {
1433 HLSTime audioTime(mPacketSources.valueFor(
1434 STREAMTYPE_AUDIO)->getLatestDequeuedMeta());
1435
1436 HLSTime videoTime(mPacketSources.valueFor(
1437 STREAMTYPE_VIDEO)->getLatestDequeuedMeta());
1438
1439 return audioTime < videoTime ? videoTime : audioTime;
1440 }
1441
onSeek(const sp<AMessage> & msg)1442 void LiveSession::onSeek(const sp<AMessage> &msg) {
1443 int64_t timeUs;
1444 CHECK(msg->findInt64("timeUs", &timeUs));
1445 changeConfiguration(timeUs);
1446 }
1447
getDuration(int64_t * durationUs) const1448 status_t LiveSession::getDuration(int64_t *durationUs) const {
1449 int64_t maxDurationUs = -1ll;
1450 for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1451 int64_t fetcherDurationUs = mFetcherInfos.valueAt(i).mDurationUs;
1452
1453 if (fetcherDurationUs > maxDurationUs) {
1454 maxDurationUs = fetcherDurationUs;
1455 }
1456 }
1457
1458 *durationUs = maxDurationUs;
1459
1460 return OK;
1461 }
1462
isSeekable() const1463 bool LiveSession::isSeekable() const {
1464 int64_t durationUs;
1465 return getDuration(&durationUs) == OK && durationUs >= 0;
1466 }
1467
hasDynamicDuration() const1468 bool LiveSession::hasDynamicDuration() const {
1469 return false;
1470 }
1471
getTrackCount() const1472 size_t LiveSession::getTrackCount() const {
1473 if (mPlaylist == NULL) {
1474 return 0;
1475 } else {
1476 return mPlaylist->getTrackCount() + (mHasMetadata ? 1 : 0);
1477 }
1478 }
1479
getTrackInfo(size_t trackIndex) const1480 sp<AMessage> LiveSession::getTrackInfo(size_t trackIndex) const {
1481 if (mPlaylist == NULL) {
1482 return NULL;
1483 } else {
1484 if (trackIndex == mPlaylist->getTrackCount() && mHasMetadata) {
1485 sp<AMessage> format = new AMessage();
1486 format->setInt32("type", MEDIA_TRACK_TYPE_METADATA);
1487 format->setString("language", "und");
1488 format->setString("mime", MEDIA_MIMETYPE_DATA_TIMED_ID3);
1489 return format;
1490 }
1491 return mPlaylist->getTrackInfo(trackIndex);
1492 }
1493 }
1494
selectTrack(size_t index,bool select)1495 status_t LiveSession::selectTrack(size_t index, bool select) {
1496 if (mPlaylist == NULL) {
1497 return INVALID_OPERATION;
1498 }
1499
1500 ALOGV("selectTrack: index=%zu, select=%d, mSubtitleGen=%d++",
1501 index, select, mSubtitleGeneration);
1502
1503 ++mSubtitleGeneration;
1504 status_t err = mPlaylist->selectTrack(index, select);
1505 if (err == OK) {
1506 sp<AMessage> msg = new AMessage(kWhatChangeConfiguration, this);
1507 msg->setInt32("pickTrack", select);
1508 msg->post();
1509 }
1510 return err;
1511 }
1512
getSelectedTrack(media_track_type type) const1513 ssize_t LiveSession::getSelectedTrack(media_track_type type) const {
1514 if (mPlaylist == NULL) {
1515 return -1;
1516 } else {
1517 return mPlaylist->getSelectedTrack(type);
1518 }
1519 }
1520
changeConfiguration(int64_t timeUs,ssize_t bandwidthIndex,bool pickTrack)1521 void LiveSession::changeConfiguration(
1522 int64_t timeUs, ssize_t bandwidthIndex, bool pickTrack) {
1523 ALOGV("changeConfiguration: timeUs=%lld us, bwIndex=%zd, pickTrack=%d",
1524 (long long)timeUs, bandwidthIndex, pickTrack);
1525
1526 cancelBandwidthSwitch();
1527
1528 CHECK(!mReconfigurationInProgress);
1529 mReconfigurationInProgress = true;
1530 if (bandwidthIndex >= 0) {
1531 mOrigBandwidthIndex = mCurBandwidthIndex;
1532 mCurBandwidthIndex = bandwidthIndex;
1533 if (mOrigBandwidthIndex != mCurBandwidthIndex) {
1534 ALOGI("#### Starting Bandwidth Switch: %zd => %zd",
1535 mOrigBandwidthIndex, mCurBandwidthIndex);
1536 }
1537 }
1538 CHECK_LT(mCurBandwidthIndex, mBandwidthItems.size());
1539 const BandwidthItem &item = mBandwidthItems.itemAt(mCurBandwidthIndex);
1540
1541 uint32_t streamMask = 0; // streams that should be fetched by the new fetcher
1542 uint32_t resumeMask = 0; // streams that should be fetched by the original fetcher
1543
1544 AString URIs[kMaxStreams];
1545 for (size_t i = 0; i < kMaxStreams; ++i) {
1546 if (mPlaylist->getTypeURI(item.mPlaylistIndex, mStreams[i].mType, &URIs[i])) {
1547 streamMask |= indexToType(i);
1548 }
1549 }
1550
1551 // Step 1, stop and discard fetchers that are no longer needed.
1552 // Pause those that we'll reuse.
1553 for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1554 // skip fetchers that are marked mToBeRemoved,
1555 // these are done and can't be reused
1556 if (mFetcherInfos[i].mToBeRemoved) {
1557 continue;
1558 }
1559
1560 const AString &uri = mFetcherInfos.keyAt(i);
1561 sp<PlaylistFetcher> &fetcher = mFetcherInfos.editValueAt(i).mFetcher;
1562
1563 bool discardFetcher = true, delayRemoval = false;
1564 for (size_t j = 0; j < kMaxStreams; ++j) {
1565 StreamType type = indexToType(j);
1566 if ((streamMask & type) && uri == URIs[j]) {
1567 resumeMask |= type;
1568 streamMask &= ~type;
1569 discardFetcher = false;
1570 }
1571 }
1572 // Delay fetcher removal if not picking tracks, AND old fetcher
1573 // has stream mask that overlaps new variant. (Okay to discard
1574 // old fetcher now, if completely no overlap.)
1575 if (discardFetcher && timeUs < 0ll && !pickTrack
1576 && (fetcher->getStreamTypeMask() & streamMask)) {
1577 discardFetcher = false;
1578 delayRemoval = true;
1579 }
1580
1581 if (discardFetcher) {
1582 ALOGV("discarding fetcher-%d", fetcher->getFetcherID());
1583 fetcher->stopAsync();
1584 } else {
1585 float threshold = 0.0f; // default to pause after current block (47Kbytes)
1586 bool disconnect = false;
1587 if (timeUs >= 0ll) {
1588 // seeking, no need to finish fetching
1589 disconnect = true;
1590 } else if (delayRemoval) {
1591 // adapting, abort if remaining of current segment is over threshold
1592 threshold = getAbortThreshold(
1593 mOrigBandwidthIndex, mCurBandwidthIndex);
1594 }
1595
1596 ALOGV("pausing fetcher-%d, threshold=%.2f",
1597 fetcher->getFetcherID(), threshold);
1598 fetcher->pauseAsync(threshold, disconnect);
1599 }
1600 }
1601
1602 sp<AMessage> msg;
1603 if (timeUs < 0ll) {
1604 // skip onChangeConfiguration2 (decoder destruction) if not seeking.
1605 msg = new AMessage(kWhatChangeConfiguration3, this);
1606 } else {
1607 msg = new AMessage(kWhatChangeConfiguration2, this);
1608 }
1609 msg->setInt32("streamMask", streamMask);
1610 msg->setInt32("resumeMask", resumeMask);
1611 msg->setInt32("pickTrack", pickTrack);
1612 msg->setInt64("timeUs", timeUs);
1613 for (size_t i = 0; i < kMaxStreams; ++i) {
1614 if ((streamMask | resumeMask) & indexToType(i)) {
1615 msg->setString(mStreams[i].uriKey().c_str(), URIs[i].c_str());
1616 }
1617 }
1618
1619 // Every time a fetcher acknowledges the stopAsync or pauseAsync request
1620 // we'll decrement mContinuationCounter, once it reaches zero, i.e. all
1621 // fetchers have completed their asynchronous operation, we'll post
1622 // mContinuation, which then is handled below in onChangeConfiguration2.
1623 mContinuationCounter = mFetcherInfos.size();
1624 mContinuation = msg;
1625
1626 if (mContinuationCounter == 0) {
1627 msg->post();
1628 }
1629 }
1630
onChangeConfiguration(const sp<AMessage> & msg)1631 void LiveSession::onChangeConfiguration(const sp<AMessage> &msg) {
1632 ALOGV("onChangeConfiguration");
1633
1634 if (!mReconfigurationInProgress) {
1635 int32_t pickTrack = 0;
1636 msg->findInt32("pickTrack", &pickTrack);
1637 changeConfiguration(-1ll /* timeUs */, -1, pickTrack);
1638 } else {
1639 msg->post(1000000ll); // retry in 1 sec
1640 }
1641 }
1642
onChangeConfiguration2(const sp<AMessage> & msg)1643 void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) {
1644 ALOGV("onChangeConfiguration2");
1645
1646 mContinuation.clear();
1647
1648 // All fetchers are either suspended or have been removed now.
1649
1650 // If we're seeking, clear all packet sources before we report
1651 // seek complete, to prevent decoder from pulling stale data.
1652 int64_t timeUs;
1653 CHECK(msg->findInt64("timeUs", &timeUs));
1654
1655 if (timeUs >= 0) {
1656 mLastSeekTimeUs = timeUs;
1657 mLastDequeuedTimeUs = timeUs;
1658
1659 for (size_t i = 0; i < mPacketSources.size(); i++) {
1660 sp<AnotherPacketSource> packetSource = mPacketSources.editValueAt(i);
1661 sp<MetaData> format = packetSource->getFormat();
1662 packetSource->clear();
1663 // Set a tentative format here such that HTTPLiveSource will always have
1664 // a format available when NuPlayer queries. Without an available video
1665 // format when setting a surface NuPlayer might disable video decoding
1666 // altogether. The tentative format will be overwritten by the
1667 // authoritative (and possibly same) format once content from the new
1668 // position is dequeued.
1669 packetSource->setFormat(format);
1670 }
1671
1672 for (size_t i = 0; i < kMaxStreams; ++i) {
1673 mStreams[i].reset();
1674 }
1675
1676 mDiscontinuityOffsetTimesUs.clear();
1677 mDiscontinuityAbsStartTimesUs.clear();
1678
1679 if (mSeekReplyID != NULL) {
1680 CHECK(mSeekReply != NULL);
1681 mSeekReply->setInt32("err", OK);
1682 mSeekReply->postReply(mSeekReplyID);
1683 mSeekReplyID.clear();
1684 mSeekReply.clear();
1685 }
1686
1687 // restart buffer polling after seek becauese previous
1688 // buffering position is no longer valid.
1689 restartPollBuffering();
1690 }
1691
1692 uint32_t streamMask, resumeMask;
1693 CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
1694 CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask));
1695
1696 streamMask |= resumeMask;
1697
1698 AString URIs[kMaxStreams];
1699 for (size_t i = 0; i < kMaxStreams; ++i) {
1700 if (streamMask & indexToType(i)) {
1701 const AString &uriKey = mStreams[i].uriKey();
1702 CHECK(msg->findString(uriKey.c_str(), &URIs[i]));
1703 ALOGV("%s = '%s'", uriKey.c_str(), URIs[i].c_str());
1704 }
1705 }
1706
1707 uint32_t changedMask = 0;
1708 for (size_t i = 0; i < kMaxStreams && i != kSubtitleIndex; ++i) {
1709 // stream URI could change even if onChangeConfiguration2 is only
1710 // used for seek. Seek could happen during a bw switch, in this
1711 // case bw switch will be cancelled, but the seekTo position will
1712 // fetch from the new URI.
1713 if ((mStreamMask & streamMask & indexToType(i))
1714 && !mStreams[i].mUri.empty()
1715 && !(URIs[i] == mStreams[i].mUri)) {
1716 ALOGV("stream %zu changed: oldURI %s, newURI %s", i,
1717 mStreams[i].mUri.c_str(), URIs[i].c_str());
1718 sp<AnotherPacketSource> source = mPacketSources.valueFor(indexToType(i));
1719 if (source->getLatestDequeuedMeta() != NULL) {
1720 source->queueDiscontinuity(
1721 ATSParser::DISCONTINUITY_FORMATCHANGE, NULL, true);
1722 }
1723 }
1724 // Determine which decoders to shutdown on the player side,
1725 // a decoder has to be shutdown if its streamtype was active
1726 // before but now longer isn't.
1727 if ((mStreamMask & ~streamMask & indexToType(i))) {
1728 changedMask |= indexToType(i);
1729 }
1730 }
1731
1732 if (changedMask == 0) {
1733 // If nothing changed as far as the audio/video decoders
1734 // are concerned we can proceed.
1735 onChangeConfiguration3(msg);
1736 return;
1737 }
1738
1739 // Something changed, inform the player which will shutdown the
1740 // corresponding decoders and will post the reply once that's done.
1741 // Handling the reply will continue executing below in
1742 // onChangeConfiguration3.
1743 sp<AMessage> notify = mNotify->dup();
1744 notify->setInt32("what", kWhatStreamsChanged);
1745 notify->setInt32("changedMask", changedMask);
1746
1747 msg->setWhat(kWhatChangeConfiguration3);
1748 msg->setTarget(this);
1749
1750 notify->setMessage("reply", msg);
1751 notify->post();
1752 }
1753
onChangeConfiguration3(const sp<AMessage> & msg)1754 void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
1755 mContinuation.clear();
1756 // All remaining fetchers are still suspended, the player has shutdown
1757 // any decoders that needed it.
1758
1759 uint32_t streamMask, resumeMask;
1760 CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
1761 CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask));
1762
1763 mNewStreamMask = streamMask | resumeMask;
1764
1765 int64_t timeUs;
1766 int32_t pickTrack;
1767 bool switching = false;
1768 CHECK(msg->findInt64("timeUs", &timeUs));
1769 CHECK(msg->findInt32("pickTrack", &pickTrack));
1770
1771 if (timeUs < 0ll) {
1772 if (!pickTrack) {
1773 // mSwapMask contains streams that are in both old and new variant,
1774 // (in mNewStreamMask & mStreamMask) but with different URIs
1775 // (not in resumeMask).
1776 // For example, old variant has video and audio in two separate
1777 // URIs, and new variant has only audio with unchanged URI. mSwapMask
1778 // should be 0 as there is nothing to swap. We only need to stop video,
1779 // and resume audio.
1780 mSwapMask = mNewStreamMask & mStreamMask & ~resumeMask;
1781 switching = (mSwapMask != 0);
1782 }
1783 mRealTimeBaseUs = ALooper::GetNowUs() - mLastDequeuedTimeUs;
1784 } else {
1785 mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
1786 }
1787
1788 ALOGV("onChangeConfiguration3: timeUs=%lld, switching=%d, pickTrack=%d, "
1789 "mStreamMask=0x%x, mNewStreamMask=0x%x, mSwapMask=0x%x",
1790 (long long)timeUs, switching, pickTrack,
1791 mStreamMask, mNewStreamMask, mSwapMask);
1792
1793 for (size_t i = 0; i < kMaxStreams; ++i) {
1794 if (streamMask & indexToType(i)) {
1795 if (switching) {
1796 CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mNewUri));
1797 } else {
1798 CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mUri));
1799 }
1800 }
1801 }
1802
1803 // Of all existing fetchers:
1804 // * Resume fetchers that are still needed and assign them original packet sources.
1805 // * Mark otherwise unneeded fetchers for removal.
1806 ALOGV("resuming fetchers for mask 0x%08x", resumeMask);
1807 for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1808 const AString &uri = mFetcherInfos.keyAt(i);
1809 if (!resumeFetcher(uri, resumeMask, timeUs)) {
1810 ALOGV("marking fetcher-%d to be removed",
1811 mFetcherInfos[i].mFetcher->getFetcherID());
1812
1813 mFetcherInfos.editValueAt(i).mToBeRemoved = true;
1814 }
1815 }
1816
1817 // streamMask now only contains the types that need a new fetcher created.
1818 if (streamMask != 0) {
1819 ALOGV("creating new fetchers for mask 0x%08x", streamMask);
1820 }
1821
1822 // Find out when the original fetchers have buffered up to and start the new fetchers
1823 // at a later timestamp.
1824 for (size_t i = 0; i < kMaxStreams; i++) {
1825 if (!(indexToType(i) & streamMask)) {
1826 continue;
1827 }
1828
1829 AString uri;
1830 uri = switching ? mStreams[i].mNewUri : mStreams[i].mUri;
1831
1832 sp<PlaylistFetcher> fetcher = addFetcher(uri.c_str());
1833 CHECK(fetcher != NULL);
1834
1835 HLSTime startTime;
1836 SeekMode seekMode = kSeekModeExactPosition;
1837 sp<AnotherPacketSource> sources[kNumSources];
1838
1839 if (i == kSubtitleIndex || (!pickTrack && !switching)) {
1840 startTime = latestMediaSegmentStartTime();
1841 }
1842
1843 // TRICKY: looping from i as earlier streams are already removed from streamMask
1844 for (size_t j = i; j < kMaxStreams; ++j) {
1845 const AString &streamUri = switching ? mStreams[j].mNewUri : mStreams[j].mUri;
1846 if ((streamMask & indexToType(j)) && uri == streamUri) {
1847 sources[j] = mPacketSources.valueFor(indexToType(j));
1848
1849 if (timeUs >= 0) {
1850 startTime.mTimeUs = timeUs;
1851 } else {
1852 int32_t type;
1853 sp<AMessage> meta;
1854 if (!switching) {
1855 // selecting, or adapting but no swap required
1856 meta = sources[j]->getLatestDequeuedMeta();
1857 } else {
1858 // adapting and swap required
1859 meta = sources[j]->getLatestEnqueuedMeta();
1860 if (meta != NULL && mCurBandwidthIndex > mOrigBandwidthIndex) {
1861 // switching up
1862 meta = sources[j]->getMetaAfterLastDequeued(mUpSwitchMargin);
1863 }
1864 }
1865
1866 if ((j == kAudioIndex || j == kVideoIndex)
1867 && meta != NULL && !meta->findInt32("discontinuity", &type)) {
1868 HLSTime tmpTime(meta);
1869 if (startTime < tmpTime) {
1870 startTime = tmpTime;
1871 }
1872 }
1873
1874 if (!switching) {
1875 // selecting, or adapting but no swap required
1876 sources[j]->clear();
1877 if (j == kSubtitleIndex) {
1878 break;
1879 }
1880
1881 ALOGV("stream[%zu]: queue format change", j);
1882 sources[j]->queueDiscontinuity(
1883 ATSParser::DISCONTINUITY_FORMAT_ONLY, NULL, true);
1884 } else {
1885 // switching, queue discontinuities after resume
1886 sources[j] = mPacketSources2.valueFor(indexToType(j));
1887 sources[j]->clear();
1888 // the new fetcher might be providing streams that used to be
1889 // provided by two different fetchers, if one of the fetcher
1890 // paused in the middle while the other somehow paused in next
1891 // seg, we have to start from next seg.
1892 if (seekMode < mStreams[j].mSeekMode) {
1893 seekMode = mStreams[j].mSeekMode;
1894 }
1895 }
1896 }
1897
1898 streamMask &= ~indexToType(j);
1899 }
1900 }
1901
1902 ALOGV("[fetcher-%d] startAsync: startTimeUs %lld mLastSeekTimeUs %lld "
1903 "segmentStartTimeUs %lld seekMode %d",
1904 fetcher->getFetcherID(),
1905 (long long)startTime.mTimeUs,
1906 (long long)mLastSeekTimeUs,
1907 (long long)startTime.getSegmentTimeUs(),
1908 seekMode);
1909
1910 // Set the target segment start time to the middle point of the
1911 // segment where the last sample was.
1912 // This gives a better guess if segments of the two variants are not
1913 // perfectly aligned. (If the corresponding segment in new variant
1914 // starts slightly later than that in the old variant, we still want
1915 // to pick that segment, not the one before)
1916 fetcher->startAsync(
1917 sources[kAudioIndex],
1918 sources[kVideoIndex],
1919 sources[kSubtitleIndex],
1920 getMetadataSource(sources, mNewStreamMask, switching),
1921 startTime.mTimeUs < 0 ? mLastSeekTimeUs : startTime.mTimeUs,
1922 startTime.getSegmentTimeUs(),
1923 startTime.mSeq,
1924 seekMode);
1925 }
1926
1927 // All fetchers have now been started, the configuration change
1928 // has completed.
1929
1930 mReconfigurationInProgress = false;
1931 if (switching) {
1932 mSwitchInProgress = true;
1933 } else {
1934 mStreamMask = mNewStreamMask;
1935 if (mOrigBandwidthIndex != mCurBandwidthIndex) {
1936 ALOGV("#### Finished Bandwidth Switch Early: %zd => %zd",
1937 mOrigBandwidthIndex, mCurBandwidthIndex);
1938 mOrigBandwidthIndex = mCurBandwidthIndex;
1939 }
1940 }
1941
1942 ALOGV("onChangeConfiguration3: mSwitchInProgress %d, mStreamMask 0x%x",
1943 mSwitchInProgress, mStreamMask);
1944
1945 if (mDisconnectReplyID != NULL) {
1946 finishDisconnect();
1947 }
1948 }
1949
swapPacketSource(StreamType stream)1950 void LiveSession::swapPacketSource(StreamType stream) {
1951 ALOGV("[%s] swapPacketSource", getNameForStream(stream));
1952
1953 // transfer packets from source2 to source
1954 sp<AnotherPacketSource> &aps = mPacketSources.editValueFor(stream);
1955 sp<AnotherPacketSource> &aps2 = mPacketSources2.editValueFor(stream);
1956
1957 // queue discontinuity in mPacketSource
1958 aps->queueDiscontinuity(ATSParser::DISCONTINUITY_FORMAT_ONLY, NULL, false);
1959
1960 // queue packets in mPacketSource2 to mPacketSource
1961 status_t finalResult = OK;
1962 sp<ABuffer> accessUnit;
1963 while (aps2->hasBufferAvailable(&finalResult) && finalResult == OK &&
1964 OK == aps2->dequeueAccessUnit(&accessUnit)) {
1965 aps->queueAccessUnit(accessUnit);
1966 }
1967 aps2->clear();
1968 }
1969
tryToFinishBandwidthSwitch(const AString & oldUri)1970 void LiveSession::tryToFinishBandwidthSwitch(const AString &oldUri) {
1971 if (!mSwitchInProgress) {
1972 return;
1973 }
1974
1975 ssize_t index = mFetcherInfos.indexOfKey(oldUri);
1976 if (index < 0 || !mFetcherInfos[index].mToBeRemoved) {
1977 return;
1978 }
1979
1980 // Swap packet source of streams provided by old variant
1981 for (size_t idx = 0; idx < kMaxStreams; idx++) {
1982 StreamType stream = indexToType(idx);
1983 if ((mSwapMask & stream) && (oldUri == mStreams[idx].mUri)) {
1984 swapPacketSource(stream);
1985
1986 if ((mNewStreamMask & stream) && mStreams[idx].mNewUri.empty()) {
1987 ALOGW("swapping stream type %d %s to empty stream",
1988 stream, mStreams[idx].mUri.c_str());
1989 }
1990 mStreams[idx].mUri = mStreams[idx].mNewUri;
1991 mStreams[idx].mNewUri.clear();
1992
1993 mSwapMask &= ~stream;
1994 }
1995 }
1996
1997 mFetcherInfos.editValueAt(index).mFetcher->stopAsync(false /* clear */);
1998
1999 ALOGV("tryToFinishBandwidthSwitch: mSwapMask=0x%x", mSwapMask);
2000 if (mSwapMask != 0) {
2001 return;
2002 }
2003
2004 // Check if new variant contains extra streams.
2005 uint32_t extraStreams = mNewStreamMask & (~mStreamMask);
2006 while (extraStreams) {
2007 StreamType stream = (StreamType) (extraStreams & ~(extraStreams - 1));
2008 extraStreams &= ~stream;
2009
2010 swapPacketSource(stream);
2011
2012 ssize_t idx = typeToIndex(stream);
2013 CHECK(idx >= 0);
2014 if (mStreams[idx].mNewUri.empty()) {
2015 ALOGW("swapping extra stream type %d %s to empty stream",
2016 stream, mStreams[idx].mUri.c_str());
2017 }
2018 mStreams[idx].mUri = mStreams[idx].mNewUri;
2019 mStreams[idx].mNewUri.clear();
2020 }
2021
2022 // Restart new fetcher (it was paused after the first 47k block)
2023 // and let it fetch into mPacketSources (not mPacketSources2)
2024 for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
2025 FetcherInfo &info = mFetcherInfos.editValueAt(i);
2026 if (info.mToBeResumed) {
2027 resumeFetcher(mFetcherInfos.keyAt(i), mNewStreamMask);
2028 info.mToBeResumed = false;
2029 }
2030 }
2031
2032 ALOGI("#### Finished Bandwidth Switch: %zd => %zd",
2033 mOrigBandwidthIndex, mCurBandwidthIndex);
2034
2035 mStreamMask = mNewStreamMask;
2036 mSwitchInProgress = false;
2037 mOrigBandwidthIndex = mCurBandwidthIndex;
2038
2039 restartPollBuffering();
2040 }
2041
schedulePollBuffering()2042 void LiveSession::schedulePollBuffering() {
2043 sp<AMessage> msg = new AMessage(kWhatPollBuffering, this);
2044 msg->setInt32("generation", mPollBufferingGeneration);
2045 msg->post(1000000ll);
2046 }
2047
cancelPollBuffering()2048 void LiveSession::cancelPollBuffering() {
2049 ++mPollBufferingGeneration;
2050 mPrevBufferPercentage = -1;
2051 }
2052
restartPollBuffering()2053 void LiveSession::restartPollBuffering() {
2054 cancelPollBuffering();
2055 onPollBuffering();
2056 }
2057
onPollBuffering()2058 void LiveSession::onPollBuffering() {
2059 ALOGV("onPollBuffering: mSwitchInProgress %d, mReconfigurationInProgress %d, "
2060 "mInPreparationPhase %d, mCurBandwidthIndex %zd, mStreamMask 0x%x",
2061 mSwitchInProgress, mReconfigurationInProgress,
2062 mInPreparationPhase, mCurBandwidthIndex, mStreamMask);
2063
2064 bool underflow, ready, down, up;
2065 if (checkBuffering(underflow, ready, down, up)) {
2066 if (mInPreparationPhase) {
2067 // Allow down switch even if we're still preparing.
2068 //
2069 // Some streams have a high bandwidth index as default,
2070 // when bandwidth is low, it takes a long time to buffer
2071 // to ready mark, then it immediately pauses after start
2072 // as we have to do a down switch. It's better experience
2073 // to restart from a lower index, if we detect low bw.
2074 if (!switchBandwidthIfNeeded(false /* up */, down) && ready) {
2075 postPrepared(OK);
2076 }
2077 }
2078
2079 if (!mInPreparationPhase) {
2080 if (ready) {
2081 stopBufferingIfNecessary();
2082 } else if (underflow) {
2083 startBufferingIfNecessary();
2084 }
2085 switchBandwidthIfNeeded(up, down);
2086 }
2087 }
2088
2089 schedulePollBuffering();
2090 }
2091
cancelBandwidthSwitch(bool resume)2092 void LiveSession::cancelBandwidthSwitch(bool resume) {
2093 ALOGV("cancelBandwidthSwitch: mSwitchGen(%d)++, orig %zd, cur %zd",
2094 mSwitchGeneration, mOrigBandwidthIndex, mCurBandwidthIndex);
2095 if (!mSwitchInProgress) {
2096 return;
2097 }
2098
2099 for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
2100 FetcherInfo& info = mFetcherInfos.editValueAt(i);
2101 if (info.mToBeRemoved) {
2102 info.mToBeRemoved = false;
2103 if (resume) {
2104 resumeFetcher(mFetcherInfos.keyAt(i), mSwapMask);
2105 }
2106 }
2107 }
2108
2109 for (size_t i = 0; i < kMaxStreams; ++i) {
2110 AString newUri = mStreams[i].mNewUri;
2111 if (!newUri.empty()) {
2112 // clear all mNewUri matching this newUri
2113 for (size_t j = i; j < kMaxStreams; ++j) {
2114 if (mStreams[j].mNewUri == newUri) {
2115 mStreams[j].mNewUri.clear();
2116 }
2117 }
2118 ALOGV("stopping newUri = %s", newUri.c_str());
2119 ssize_t index = mFetcherInfos.indexOfKey(newUri);
2120 if (index < 0) {
2121 ALOGE("did not find fetcher for newUri: %s", newUri.c_str());
2122 continue;
2123 }
2124 FetcherInfo &info = mFetcherInfos.editValueAt(index);
2125 info.mToBeRemoved = true;
2126 info.mFetcher->stopAsync();
2127 }
2128 }
2129
2130 ALOGI("#### Canceled Bandwidth Switch: %zd => %zd",
2131 mOrigBandwidthIndex, mCurBandwidthIndex);
2132
2133 mSwitchGeneration++;
2134 mSwitchInProgress = false;
2135 mCurBandwidthIndex = mOrigBandwidthIndex;
2136 mSwapMask = 0;
2137 }
2138
checkBuffering(bool & underflow,bool & ready,bool & down,bool & up)2139 bool LiveSession::checkBuffering(
2140 bool &underflow, bool &ready, bool &down, bool &up) {
2141 underflow = ready = down = up = false;
2142
2143 if (mReconfigurationInProgress) {
2144 ALOGV("Switch/Reconfig in progress, defer buffer polling");
2145 return false;
2146 }
2147
2148 size_t activeCount, underflowCount, readyCount, downCount, upCount;
2149 activeCount = underflowCount = readyCount = downCount = upCount =0;
2150 int32_t minBufferPercent = -1;
2151 int64_t durationUs;
2152 if (getDuration(&durationUs) != OK) {
2153 durationUs = -1;
2154 }
2155 for (size_t i = 0; i < mPacketSources.size(); ++i) {
2156 // we don't check subtitles for buffering level
2157 if (!(mStreamMask & mPacketSources.keyAt(i)
2158 & (STREAMTYPE_AUDIO | STREAMTYPE_VIDEO))) {
2159 continue;
2160 }
2161 // ignore streams that never had any packet queued.
2162 // (it's possible that the variant only has audio or video)
2163 sp<AMessage> meta = mPacketSources[i]->getLatestEnqueuedMeta();
2164 if (meta == NULL) {
2165 continue;
2166 }
2167
2168 status_t finalResult;
2169 int64_t bufferedDurationUs =
2170 mPacketSources[i]->getBufferedDurationUs(&finalResult);
2171 ALOGV("[%s] buffered %lld us",
2172 getNameForStream(mPacketSources.keyAt(i)),
2173 (long long)bufferedDurationUs);
2174 if (durationUs >= 0) {
2175 int32_t percent;
2176 if (mPacketSources[i]->isFinished(0 /* duration */)) {
2177 percent = 100;
2178 } else {
2179 percent = (int32_t)(100.0 *
2180 (mLastDequeuedTimeUs + bufferedDurationUs) / durationUs);
2181 }
2182 if (minBufferPercent < 0 || percent < minBufferPercent) {
2183 minBufferPercent = percent;
2184 }
2185 }
2186
2187 ++activeCount;
2188 int64_t readyMark = mInPreparationPhase ? kPrepareMarkUs : kReadyMarkUs;
2189 if (bufferedDurationUs > readyMark
2190 || mPacketSources[i]->isFinished(0)) {
2191 ++readyCount;
2192 }
2193 if (!mPacketSources[i]->isFinished(0)) {
2194 if (bufferedDurationUs < kUnderflowMarkUs) {
2195 ++underflowCount;
2196 }
2197 if (bufferedDurationUs > mUpSwitchMark) {
2198 ++upCount;
2199 }
2200 if (bufferedDurationUs < mDownSwitchMark) {
2201 ++downCount;
2202 }
2203 }
2204 }
2205
2206 if (minBufferPercent >= 0) {
2207 notifyBufferingUpdate(minBufferPercent);
2208 }
2209
2210 if (activeCount > 0) {
2211 up = (upCount == activeCount);
2212 down = (downCount > 0);
2213 ready = (readyCount == activeCount);
2214 underflow = (underflowCount > 0);
2215 return true;
2216 }
2217
2218 return false;
2219 }
2220
startBufferingIfNecessary()2221 void LiveSession::startBufferingIfNecessary() {
2222 ALOGV("startBufferingIfNecessary: mInPreparationPhase=%d, mBuffering=%d",
2223 mInPreparationPhase, mBuffering);
2224 if (!mBuffering) {
2225 mBuffering = true;
2226
2227 sp<AMessage> notify = mNotify->dup();
2228 notify->setInt32("what", kWhatBufferingStart);
2229 notify->post();
2230 }
2231 }
2232
stopBufferingIfNecessary()2233 void LiveSession::stopBufferingIfNecessary() {
2234 ALOGV("stopBufferingIfNecessary: mInPreparationPhase=%d, mBuffering=%d",
2235 mInPreparationPhase, mBuffering);
2236
2237 if (mBuffering) {
2238 mBuffering = false;
2239
2240 sp<AMessage> notify = mNotify->dup();
2241 notify->setInt32("what", kWhatBufferingEnd);
2242 notify->post();
2243 }
2244 }
2245
notifyBufferingUpdate(int32_t percentage)2246 void LiveSession::notifyBufferingUpdate(int32_t percentage) {
2247 if (percentage < mPrevBufferPercentage) {
2248 percentage = mPrevBufferPercentage;
2249 } else if (percentage > 100) {
2250 percentage = 100;
2251 }
2252
2253 mPrevBufferPercentage = percentage;
2254
2255 ALOGV("notifyBufferingUpdate: percentage=%d%%", percentage);
2256
2257 sp<AMessage> notify = mNotify->dup();
2258 notify->setInt32("what", kWhatBufferingUpdate);
2259 notify->setInt32("percentage", percentage);
2260 notify->post();
2261 }
2262
tryBandwidthFallback()2263 bool LiveSession::tryBandwidthFallback() {
2264 if (mInPreparationPhase || mReconfigurationInProgress) {
2265 // Don't try fallback during prepare or reconfig.
2266 // If error happens there, it's likely unrecoverable.
2267 return false;
2268 }
2269 if (mCurBandwidthIndex > mOrigBandwidthIndex) {
2270 // if we're switching up, simply cancel and resume old variant
2271 cancelBandwidthSwitch(true /* resume */);
2272 return true;
2273 } else {
2274 // if we're switching down, we're likely about to underflow (if
2275 // not already underflowing). try the lowest viable bandwidth if
2276 // not on that variant already.
2277 ssize_t lowestValid = getLowestValidBandwidthIndex();
2278 if (mCurBandwidthIndex > lowestValid) {
2279 cancelBandwidthSwitch();
2280 changeConfiguration(-1ll, lowestValid);
2281 return true;
2282 }
2283 }
2284 // return false if we couldn't find any fallback
2285 return false;
2286 }
2287
2288 /*
2289 * returns true if a bandwidth switch is actually needed (and started),
2290 * returns false otherwise
2291 */
switchBandwidthIfNeeded(bool bufferHigh,bool bufferLow)2292 bool LiveSession::switchBandwidthIfNeeded(bool bufferHigh, bool bufferLow) {
2293 // no need to check bandwidth if we only have 1 bandwidth settings
2294 if (mBandwidthItems.size() < 2) {
2295 return false;
2296 }
2297
2298 if (mSwitchInProgress) {
2299 if (mBuffering) {
2300 tryBandwidthFallback();
2301 }
2302 return false;
2303 }
2304
2305 int32_t bandwidthBps, shortTermBps;
2306 bool isStable;
2307 if (mBandwidthEstimator->estimateBandwidth(
2308 &bandwidthBps, &isStable, &shortTermBps)) {
2309 ALOGV("bandwidth estimated at %.2f kbps, "
2310 "stable %d, shortTermBps %.2f kbps",
2311 bandwidthBps / 1024.0f, isStable, shortTermBps / 1024.0f);
2312 mLastBandwidthBps = bandwidthBps;
2313 mLastBandwidthStable = isStable;
2314 } else {
2315 ALOGV("no bandwidth estimate.");
2316 return false;
2317 }
2318
2319 int32_t curBandwidth = mBandwidthItems.itemAt(mCurBandwidthIndex).mBandwidth;
2320 // canSwithDown and canSwitchUp can't both be true.
2321 // we only want to switch up when measured bw is 120% higher than current variant,
2322 // and we only want to switch down when measured bw is below current variant.
2323 bool canSwitchDown = bufferLow
2324 && (bandwidthBps < (int32_t)curBandwidth);
2325 bool canSwitchUp = bufferHigh
2326 && (bandwidthBps > (int32_t)curBandwidth * 12 / 10);
2327
2328 if (canSwitchDown || canSwitchUp) {
2329 // bandwidth estimating has some delay, if we have to downswitch when
2330 // it hasn't stabilized, use the short term to guess real bandwidth,
2331 // since it may be dropping too fast.
2332 // (note this doesn't apply to upswitch, always use longer average there)
2333 if (!isStable && canSwitchDown) {
2334 if (shortTermBps < bandwidthBps) {
2335 bandwidthBps = shortTermBps;
2336 }
2337 }
2338
2339 ssize_t bandwidthIndex = getBandwidthIndex(bandwidthBps);
2340
2341 // it's possible that we're checking for canSwitchUp case, but the returned
2342 // bandwidthIndex is < mCurBandwidthIndex, as getBandwidthIndex() only uses 70%
2343 // of measured bw. In that case we don't want to do anything, since we have
2344 // both enough buffer and enough bw.
2345 if ((canSwitchUp && bandwidthIndex > mCurBandwidthIndex)
2346 || (canSwitchDown && bandwidthIndex < mCurBandwidthIndex)) {
2347 // if not yet prepared, just restart again with new bw index.
2348 // this is faster and playback experience is cleaner.
2349 changeConfiguration(
2350 mInPreparationPhase ? 0 : -1ll, bandwidthIndex);
2351 return true;
2352 }
2353 }
2354 return false;
2355 }
2356
postError(status_t err)2357 void LiveSession::postError(status_t err) {
2358 // if we reached EOS, notify buffering of 100%
2359 if (err == ERROR_END_OF_STREAM) {
2360 notifyBufferingUpdate(100);
2361 }
2362 // we'll stop buffer polling now, before that notify
2363 // stop buffering to stop the spinning icon
2364 stopBufferingIfNecessary();
2365 cancelPollBuffering();
2366
2367 sp<AMessage> notify = mNotify->dup();
2368 notify->setInt32("what", kWhatError);
2369 notify->setInt32("err", err);
2370 notify->post();
2371 }
2372
postPrepared(status_t err)2373 void LiveSession::postPrepared(status_t err) {
2374 CHECK(mInPreparationPhase);
2375
2376 sp<AMessage> notify = mNotify->dup();
2377 if (err == OK || err == ERROR_END_OF_STREAM) {
2378 notify->setInt32("what", kWhatPrepared);
2379 } else {
2380 cancelPollBuffering();
2381
2382 notify->setInt32("what", kWhatPreparationFailed);
2383 notify->setInt32("err", err);
2384 }
2385
2386 notify->post();
2387
2388 mInPreparationPhase = false;
2389 }
2390
2391
2392 } // namespace android
2393
2394