1 /*
2 * Copyright (C) 2012 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 //#define LOG_NDEBUG 0
18 #define LOG_TAG "PlaylistFetcher"
19 #include <android-base/macros.h>
20 #include <utils/Log.h>
21 #include <utils/misc.h>
22
23 #include "PlaylistFetcher.h"
24 #include "HTTPDownloader.h"
25 #include "LiveSession.h"
26 #include "M3UParser.h"
27 #include <ID3.h>
28 #include <mpeg2ts/AnotherPacketSource.h>
29 #include <mpeg2ts/HlsSampleDecryptor.h>
30
31 #include <datasource/DataURISource.h>
32 #include <media/stagefright/foundation/ABitReader.h>
33 #include <media/stagefright/foundation/ABuffer.h>
34 #include <media/stagefright/foundation/ADebug.h>
35 #include <media/stagefright/foundation/ByteUtils.h>
36 #include <media/stagefright/foundation/MediaKeys.h>
37 #include <media/stagefright/foundation/avc_utils.h>
38 #include <media/stagefright/MediaDefs.h>
39 #include <media/stagefright/MetaData.h>
40 #include <media/stagefright/MetaDataUtils.h>
41 #include <media/stagefright/Utils.h>
42 #include <media/stagefright/FoundationUtils.h>
43
44 #include <ctype.h>
45 #include <inttypes.h>
46
// Verbose-log helper: forwards to ALOGV with a "[fetcher-<id>] " prefix built
// from mFetcherID, so it can only be used where mFetcherID is in scope.
#define FLOGV(fmt, ...) ALOGV("[fetcher-%d] " fmt, mFetcherID, ##__VA_ARGS__)
// Same as FLOGV, but additionally prints the name that
// LiveSession::getNameForStream() returns for `stream`.
#define FSLOGV(stream, fmt, ...) ALOGV("[fetcher-%d] [%s] " fmt, mFetcherID, \
        LiveSession::getNameForStream(stream), ##__VA_ARGS__)
50
51 namespace android {
52
53 // static
54 const int64_t PlaylistFetcher::kMinBufferedDurationUs = 30000000LL;
55 const int64_t PlaylistFetcher::kMaxMonitorDelayUs = 3000000LL;
56 // LCM of 188 (size of a TS packet) & 1k works well
57 const int32_t PlaylistFetcher::kDownloadBlockSize = 47 * 1024;
58
59 struct PlaylistFetcher::DownloadState : public RefBase {
60 DownloadState();
61 void resetState();
62 bool hasSavedState() const;
63 void restoreState(
64 AString &uri,
65 sp<AMessage> &itemMeta,
66 sp<ABuffer> &buffer,
67 sp<ABuffer> &tsBuffer,
68 int32_t &firstSeqNumberInPlaylist,
69 int32_t &lastSeqNumberInPlaylist);
70 void saveState(
71 AString &uri,
72 sp<AMessage> &itemMeta,
73 sp<ABuffer> &buffer,
74 sp<ABuffer> &tsBuffer,
75 int32_t &firstSeqNumberInPlaylist,
76 int32_t &lastSeqNumberInPlaylist);
77
78 private:
79 bool mHasSavedState;
80 AString mUri;
81 sp<AMessage> mItemMeta;
82 sp<ABuffer> mBuffer;
83 sp<ABuffer> mTsBuffer;
84 int32_t mFirstSeqNumberInPlaylist;
85 int32_t mLastSeqNumberInPlaylist;
86 };
87
DownloadState()88 PlaylistFetcher::DownloadState::DownloadState() {
89 resetState();
90 }
91
hasSavedState() const92 bool PlaylistFetcher::DownloadState::hasSavedState() const {
93 return mHasSavedState;
94 }
95
resetState()96 void PlaylistFetcher::DownloadState::resetState() {
97 mHasSavedState = false;
98
99 mUri.clear();
100 mItemMeta = NULL;
101 mBuffer = NULL;
102 mTsBuffer = NULL;
103 mFirstSeqNumberInPlaylist = 0;
104 mLastSeqNumberInPlaylist = 0;
105 }
106
restoreState(AString & uri,sp<AMessage> & itemMeta,sp<ABuffer> & buffer,sp<ABuffer> & tsBuffer,int32_t & firstSeqNumberInPlaylist,int32_t & lastSeqNumberInPlaylist)107 void PlaylistFetcher::DownloadState::restoreState(
108 AString &uri,
109 sp<AMessage> &itemMeta,
110 sp<ABuffer> &buffer,
111 sp<ABuffer> &tsBuffer,
112 int32_t &firstSeqNumberInPlaylist,
113 int32_t &lastSeqNumberInPlaylist) {
114 if (!mHasSavedState) {
115 return;
116 }
117
118 uri = mUri;
119 itemMeta = mItemMeta;
120 buffer = mBuffer;
121 tsBuffer = mTsBuffer;
122 firstSeqNumberInPlaylist = mFirstSeqNumberInPlaylist;
123 lastSeqNumberInPlaylist = mLastSeqNumberInPlaylist;
124
125 resetState();
126 }
127
saveState(AString & uri,sp<AMessage> & itemMeta,sp<ABuffer> & buffer,sp<ABuffer> & tsBuffer,int32_t & firstSeqNumberInPlaylist,int32_t & lastSeqNumberInPlaylist)128 void PlaylistFetcher::DownloadState::saveState(
129 AString &uri,
130 sp<AMessage> &itemMeta,
131 sp<ABuffer> &buffer,
132 sp<ABuffer> &tsBuffer,
133 int32_t &firstSeqNumberInPlaylist,
134 int32_t &lastSeqNumberInPlaylist) {
135 mHasSavedState = true;
136
137 mUri = uri;
138 mItemMeta = itemMeta;
139 mBuffer = buffer;
140 mTsBuffer = tsBuffer;
141 mFirstSeqNumberInPlaylist = firstSeqNumberInPlaylist;
142 mLastSeqNumberInPlaylist = lastSeqNumberInPlaylist;
143 }
144
PlaylistFetcher(const sp<AMessage> & notify,const sp<LiveSession> & session,const char * uri,int32_t id,int32_t subtitleGeneration)145 PlaylistFetcher::PlaylistFetcher(
146 const sp<AMessage> ¬ify,
147 const sp<LiveSession> &session,
148 const char *uri,
149 int32_t id,
150 int32_t subtitleGeneration)
151 : mNotify(notify),
152 mSession(session),
153 mURI(uri),
154 mFetcherID(id),
155 mStreamTypeMask(0),
156 mStartTimeUs(-1LL),
157 mSegmentStartTimeUs(-1LL),
158 mDiscontinuitySeq(-1LL),
159 mStartTimeUsRelative(false),
160 mLastPlaylistFetchTimeUs(-1LL),
161 mPlaylistTimeUs(-1LL),
162 mSeqNumber(-1),
163 mNumRetries(0),
164 mNumRetriesForMonitorQueue(0),
165 mStartup(true),
166 mIDRFound(false),
167 mSeekMode(LiveSession::kSeekModeExactPosition),
168 mTimeChangeSignaled(false),
169 mNextPTSTimeUs(-1LL),
170 mMonitorQueueGeneration(0),
171 mSubtitleGeneration(subtitleGeneration),
172 mLastDiscontinuitySeq(-1LL),
173 mRefreshState(INITIAL_MINIMUM_RELOAD_DELAY),
174 mFirstPTSValid(false),
175 mFirstTimeUs(-1LL),
176 mVideoBuffer(new AnotherPacketSource(NULL)),
177 mSampleAesKeyItemChanged(false),
178 mThresholdRatio(-1.0f),
179 mDownloadState(new DownloadState()),
180 mHasMetadata(false) {
181 memset(mPlaylistHash, 0, sizeof(mPlaylistHash));
182 mHTTPDownloader = mSession->getHTTPDownloader();
183
184 memset(mKeyData, 0, sizeof(mKeyData));
185 memset(mAESInitVec, 0, sizeof(mAESInitVec));
186 }
187
~PlaylistFetcher()188 PlaylistFetcher::~PlaylistFetcher() {
189 }
190
getFetcherID() const191 int32_t PlaylistFetcher::getFetcherID() const {
192 return mFetcherID;
193 }
194
getSegmentStartTimeUs(int32_t seqNumber) const195 int64_t PlaylistFetcher::getSegmentStartTimeUs(int32_t seqNumber) const {
196 CHECK(mPlaylist != NULL);
197
198 int32_t firstSeqNumberInPlaylist, lastSeqNumberInPlaylist;
199 mPlaylist->getSeqNumberRange(
200 &firstSeqNumberInPlaylist, &lastSeqNumberInPlaylist);
201
202 CHECK_GE(seqNumber, firstSeqNumberInPlaylist);
203 CHECK_LE(seqNumber, lastSeqNumberInPlaylist);
204
205 int64_t segmentStartUs = 0LL;
206 for (int32_t index = 0;
207 index < seqNumber - firstSeqNumberInPlaylist; ++index) {
208 sp<AMessage> itemMeta;
209 CHECK(mPlaylist->itemAt(
210 index, NULL /* uri */, &itemMeta));
211
212 int64_t itemDurationUs;
213 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
214
215 segmentStartUs += itemDurationUs;
216 }
217
218 return segmentStartUs;
219 }
220
getSegmentDurationUs(int32_t seqNumber) const221 int64_t PlaylistFetcher::getSegmentDurationUs(int32_t seqNumber) const {
222 CHECK(mPlaylist != NULL);
223
224 int32_t firstSeqNumberInPlaylist, lastSeqNumberInPlaylist;
225 mPlaylist->getSeqNumberRange(
226 &firstSeqNumberInPlaylist, &lastSeqNumberInPlaylist);
227
228 CHECK_GE(seqNumber, firstSeqNumberInPlaylist);
229 CHECK_LE(seqNumber, lastSeqNumberInPlaylist);
230
231 int32_t index = seqNumber - firstSeqNumberInPlaylist;
232 sp<AMessage> itemMeta;
233 CHECK(mPlaylist->itemAt(
234 index, NULL /* uri */, &itemMeta));
235
236 int64_t itemDurationUs;
237 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
238
239 return itemDurationUs;
240 }
241
delayUsToRefreshPlaylist() const242 int64_t PlaylistFetcher::delayUsToRefreshPlaylist() const {
243 int64_t nowUs = ALooper::GetNowUs();
244
245 if (mPlaylist == NULL || mLastPlaylistFetchTimeUs < 0LL) {
246 CHECK_EQ((int)mRefreshState, (int)INITIAL_MINIMUM_RELOAD_DELAY);
247 return 0LL;
248 }
249
250 if (mPlaylist->isComplete()) {
251 return (~0LLU >> 1);
252 }
253
254 int64_t targetDurationUs = mPlaylist->getTargetDuration();
255
256 int64_t minPlaylistAgeUs;
257
258 switch (mRefreshState) {
259 case INITIAL_MINIMUM_RELOAD_DELAY:
260 {
261 size_t n = mPlaylist->size();
262 if (n > 0) {
263 sp<AMessage> itemMeta;
264 CHECK(mPlaylist->itemAt(n - 1, NULL /* uri */, &itemMeta));
265
266 int64_t itemDurationUs;
267 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
268
269 minPlaylistAgeUs = itemDurationUs;
270 break;
271 }
272
273 FALLTHROUGH_INTENDED;
274 }
275
276 case FIRST_UNCHANGED_RELOAD_ATTEMPT:
277 {
278 minPlaylistAgeUs = targetDurationUs / 2;
279 break;
280 }
281
282 case SECOND_UNCHANGED_RELOAD_ATTEMPT:
283 {
284 minPlaylistAgeUs = (targetDurationUs * 3) / 2;
285 break;
286 }
287
288 case THIRD_UNCHANGED_RELOAD_ATTEMPT:
289 {
290 minPlaylistAgeUs = targetDurationUs * 3;
291 break;
292 }
293
294 default:
295 TRESPASS();
296 break;
297 }
298
299 int64_t delayUs = mLastPlaylistFetchTimeUs + minPlaylistAgeUs - nowUs;
300 return delayUs > 0LL ? delayUs : 0LL;
301 }
302
decryptBuffer(size_t playlistIndex,const sp<ABuffer> & buffer,bool first)303 status_t PlaylistFetcher::decryptBuffer(
304 size_t playlistIndex, const sp<ABuffer> &buffer,
305 bool first) {
306 sp<AMessage> itemMeta;
307 bool found = false;
308 AString method;
309
310 for (ssize_t i = playlistIndex; i >= 0; --i) {
311 AString uri;
312 CHECK(mPlaylist->itemAt(i, &uri, &itemMeta));
313
314 if (itemMeta->findString("cipher-method", &method)) {
315 found = true;
316 break;
317 }
318 }
319
320 // TODO: Revise this when we add support for KEYFORMAT
321 // If method has changed (e.g., -> NONE); sufficient to check at the segment boundary
322 if (mSampleAesKeyItem != NULL && first && found && method != "SAMPLE-AES") {
323 ALOGI("decryptBuffer: resetting mSampleAesKeyItem(%p) with method %s",
324 mSampleAesKeyItem.get(), method.c_str());
325 mSampleAesKeyItem = NULL;
326 mSampleAesKeyItemChanged = true;
327 }
328
329 if (!found) {
330 method = "NONE";
331 }
332 buffer->meta()->setString("cipher-method", method.c_str());
333
334 if (method == "NONE") {
335 return OK;
336 } else if (method == "SAMPLE-AES") {
337 ALOGV("decryptBuffer: Non-Widevine SAMPLE-AES is supported now.");
338 } else if (!(method == "AES-128")) {
339 ALOGE("Unsupported cipher method '%s'", method.c_str());
340 return ERROR_UNSUPPORTED;
341 }
342
343 AString keyURI;
344 if (!itemMeta->findString("cipher-uri", &keyURI)) {
345 ALOGE("Missing key uri");
346 return ERROR_MALFORMED;
347 }
348 keyURI = mPlaylist->getFullCipherUri(keyURI);
349
350 ssize_t index = mAESKeyForURI.indexOfKey(keyURI);
351
352 sp<ABuffer> key;
353 if (index >= 0) {
354 key = mAESKeyForURI.valueAt(index);
355 } else if (keyURI.startsWith("data:")) {
356 sp<DataSource> keySrc = DataURISource::Create(keyURI.c_str());
357 off64_t keyLen;
358 if (keySrc == NULL || keySrc->getSize(&keyLen) != OK || keyLen < 0) {
359 ALOGE("Malformed cipher key data uri.");
360 return ERROR_MALFORMED;
361 }
362 key = new ABuffer(keyLen);
363 keySrc->readAt(0, key->data(), keyLen);
364 key->setRange(0, keyLen);
365 } else {
366 ssize_t err = mHTTPDownloader->fetchFile(keyURI.c_str(), &key);
367
368 if (err == ERROR_NOT_CONNECTED) {
369 return ERROR_NOT_CONNECTED;
370 } else if (err < 0) {
371 ALOGE("failed to fetch cipher key from '%s'.", uriDebugString(keyURI).c_str());
372 return ERROR_IO;
373 } else if (key->size() != 16) {
374 ALOGE("key file '%s' wasn't 16 bytes in size.", uriDebugString(keyURI).c_str());
375 return ERROR_MALFORMED;
376 }
377
378 mAESKeyForURI.add(keyURI, key);
379 }
380
381 if (first) {
382 // If decrypting the first block in a file, read the iv from the manifest
383 // or derive the iv from the file's sequence number.
384
385 unsigned char AESInitVec[AES_BLOCK_SIZE];
386 AString iv;
387 if (itemMeta->findString("cipher-iv", &iv)) {
388 if ((!iv.startsWith("0x") && !iv.startsWith("0X"))
389 || iv.size() > 16 * 2 + 2) {
390 ALOGE("malformed cipher IV '%s'.", iv.c_str());
391 return ERROR_MALFORMED;
392 }
393
394 while (iv.size() < 16 * 2 + 2) {
395 iv.insert("0", 1, 2);
396 }
397
398 memset(AESInitVec, 0, sizeof(AESInitVec));
399 for (size_t i = 0; i < 16; ++i) {
400 char c1 = tolower(iv.c_str()[2 + 2 * i]);
401 char c2 = tolower(iv.c_str()[3 + 2 * i]);
402 if (!isxdigit(c1) || !isxdigit(c2)) {
403 ALOGE("malformed cipher IV '%s'.", iv.c_str());
404 return ERROR_MALFORMED;
405 }
406 uint8_t nibble1 = isdigit(c1) ? c1 - '0' : c1 - 'a' + 10;
407 uint8_t nibble2 = isdigit(c2) ? c2 - '0' : c2 - 'a' + 10;
408
409 AESInitVec[i] = nibble1 << 4 | nibble2;
410 }
411 } else {
412 memset(AESInitVec, 0, sizeof(AESInitVec));
413 AESInitVec[15] = mSeqNumber & 0xff;
414 AESInitVec[14] = (mSeqNumber >> 8) & 0xff;
415 AESInitVec[13] = (mSeqNumber >> 16) & 0xff;
416 AESInitVec[12] = (mSeqNumber >> 24) & 0xff;
417 }
418
419 bool newKey = memcmp(mKeyData, key->data(), AES_BLOCK_SIZE) != 0;
420 bool newInitVec = memcmp(mAESInitVec, AESInitVec, AES_BLOCK_SIZE) != 0;
421 bool newSampleAesKeyItem = newKey || newInitVec;
422 ALOGV("decryptBuffer: SAMPLE-AES newKeyItem %d/%d (Key %d initVec %d)",
423 mSampleAesKeyItemChanged, newSampleAesKeyItem, newKey, newInitVec);
424
425 if (newSampleAesKeyItem) {
426 memcpy(mKeyData, key->data(), AES_BLOCK_SIZE);
427 memcpy(mAESInitVec, AESInitVec, AES_BLOCK_SIZE);
428
429 if (method == "SAMPLE-AES") {
430 mSampleAesKeyItemChanged = true;
431
432 sp<ABuffer> keyDataBuffer = ABuffer::CreateAsCopy(mKeyData, sizeof(mKeyData));
433 sp<ABuffer> initVecBuffer = ABuffer::CreateAsCopy(mAESInitVec, sizeof(mAESInitVec));
434
435 // always allocating a new one rather than updating the old message
436 // lower layer might still have a reference to the old message
437 mSampleAesKeyItem = new AMessage();
438 mSampleAesKeyItem->setBuffer("keyData", keyDataBuffer);
439 mSampleAesKeyItem->setBuffer("initVec", initVecBuffer);
440
441 ALOGV("decryptBuffer: New SampleAesKeyItem: Key: %s IV: %s",
442 HlsSampleDecryptor::aesBlockToStr(mKeyData).c_str(),
443 HlsSampleDecryptor::aesBlockToStr(mAESInitVec).c_str());
444 } // SAMPLE-AES
445 } // newSampleAesKeyItem
446 } // first
447
448 if (method == "SAMPLE-AES") {
449 ALOGV("decryptBuffer: skipping full-seg decrypt for SAMPLE-AES");
450 return OK;
451 }
452
453
454 AES_KEY aes_key;
455 if (AES_set_decrypt_key(key->data(), 128, &aes_key) != 0) {
456 ALOGE("failed to set AES decryption key.");
457 return UNKNOWN_ERROR;
458 }
459
460 size_t n = buffer->size();
461 if (!n) {
462 return OK;
463 }
464
465 if (n < 16 || n % 16) {
466 ALOGE("not enough or trailing bytes (%zu) in encrypted buffer", n);
467 return ERROR_MALFORMED;
468 }
469
470 AES_cbc_encrypt(
471 buffer->data(), buffer->data(), buffer->size(),
472 &aes_key, mAESInitVec, AES_DECRYPT);
473
474 return OK;
475 }
476
checkDecryptPadding(const sp<ABuffer> & buffer)477 status_t PlaylistFetcher::checkDecryptPadding(const sp<ABuffer> &buffer) {
478 AString method;
479 CHECK(buffer->meta()->findString("cipher-method", &method));
480 if (method == "NONE" || method == "SAMPLE-AES") {
481 return OK;
482 }
483
484 uint8_t padding = 0;
485 if (buffer->size() > 0) {
486 padding = buffer->data()[buffer->size() - 1];
487 }
488
489 if (padding > 16) {
490 return ERROR_MALFORMED;
491 }
492
493 for (size_t i = buffer->size() - padding; i < padding; i++) {
494 if (buffer->data()[i] != padding) {
495 return ERROR_MALFORMED;
496 }
497 }
498
499 buffer->setRange(buffer->offset(), buffer->size() - padding);
500 return OK;
501 }
502
postMonitorQueue(int64_t delayUs,int64_t minDelayUs)503 void PlaylistFetcher::postMonitorQueue(int64_t delayUs, int64_t minDelayUs) {
504 int64_t maxDelayUs = delayUsToRefreshPlaylist();
505 if (maxDelayUs < minDelayUs) {
506 maxDelayUs = minDelayUs;
507 }
508 if (delayUs > maxDelayUs) {
509 FLOGV("Need to refresh playlist in %lld", (long long)maxDelayUs);
510 delayUs = maxDelayUs;
511 }
512 sp<AMessage> msg = new AMessage(kWhatMonitorQueue, this);
513 msg->setInt32("generation", mMonitorQueueGeneration);
514 msg->post(delayUs);
515 }
516
cancelMonitorQueue()517 void PlaylistFetcher::cancelMonitorQueue() {
518 ++mMonitorQueueGeneration;
519 }
520
setStoppingThreshold(float thresholdRatio,bool disconnect)521 void PlaylistFetcher::setStoppingThreshold(float thresholdRatio, bool disconnect) {
522 {
523 AutoMutex _l(mThresholdLock);
524 mThresholdRatio = thresholdRatio;
525 }
526 if (disconnect) {
527 mHTTPDownloader->disconnect();
528 }
529 }
530
resetStoppingThreshold(bool disconnect)531 void PlaylistFetcher::resetStoppingThreshold(bool disconnect) {
532 {
533 AutoMutex _l(mThresholdLock);
534 mThresholdRatio = -1.0f;
535 }
536 if (disconnect) {
537 mHTTPDownloader->disconnect();
538 } else {
539 // allow reconnect
540 mHTTPDownloader->reconnect();
541 }
542 }
543
getStoppingThreshold()544 float PlaylistFetcher::getStoppingThreshold() {
545 AutoMutex _l(mThresholdLock);
546 return mThresholdRatio;
547 }
548
startAsync(const sp<AnotherPacketSource> & audioSource,const sp<AnotherPacketSource> & videoSource,const sp<AnotherPacketSource> & subtitleSource,const sp<AnotherPacketSource> & metadataSource,int64_t startTimeUs,int64_t segmentStartTimeUs,int32_t startDiscontinuitySeq,LiveSession::SeekMode seekMode)549 void PlaylistFetcher::startAsync(
550 const sp<AnotherPacketSource> &audioSource,
551 const sp<AnotherPacketSource> &videoSource,
552 const sp<AnotherPacketSource> &subtitleSource,
553 const sp<AnotherPacketSource> &metadataSource,
554 int64_t startTimeUs,
555 int64_t segmentStartTimeUs,
556 int32_t startDiscontinuitySeq,
557 LiveSession::SeekMode seekMode) {
558 sp<AMessage> msg = new AMessage(kWhatStart, this);
559
560 uint32_t streamTypeMask = 0ul;
561
562 if (audioSource != NULL) {
563 msg->setPointer("audioSource", audioSource.get());
564 streamTypeMask |= LiveSession::STREAMTYPE_AUDIO;
565 }
566
567 if (videoSource != NULL) {
568 msg->setPointer("videoSource", videoSource.get());
569 streamTypeMask |= LiveSession::STREAMTYPE_VIDEO;
570 }
571
572 if (subtitleSource != NULL) {
573 msg->setPointer("subtitleSource", subtitleSource.get());
574 streamTypeMask |= LiveSession::STREAMTYPE_SUBTITLES;
575 }
576
577 if (metadataSource != NULL) {
578 msg->setPointer("metadataSource", metadataSource.get());
579 // metadataSource does not affect streamTypeMask.
580 }
581
582 msg->setInt32("streamTypeMask", streamTypeMask);
583 msg->setInt64("startTimeUs", startTimeUs);
584 msg->setInt64("segmentStartTimeUs", segmentStartTimeUs);
585 msg->setInt32("startDiscontinuitySeq", startDiscontinuitySeq);
586 msg->setInt32("seekMode", seekMode);
587 msg->post();
588 }
589
590 /*
591 * pauseAsync
592 *
593 * threshold: 0.0f - pause after current fetch block (default 47Kbytes)
594 * -1.0f - pause after finishing current segment
595 * 0.0~1.0f - pause if remaining of current segment exceeds threshold
596 */
pauseAsync(float thresholdRatio,bool disconnect)597 void PlaylistFetcher::pauseAsync(
598 float thresholdRatio, bool disconnect) {
599 setStoppingThreshold(thresholdRatio, disconnect);
600
601 (new AMessage(kWhatPause, this))->post();
602 }
603
stopAsync(bool clear)604 void PlaylistFetcher::stopAsync(bool clear) {
605 setStoppingThreshold(0.0f, true /* disconncect */);
606
607 sp<AMessage> msg = new AMessage(kWhatStop, this);
608 msg->setInt32("clear", clear);
609 msg->post();
610 }
611
resumeUntilAsync(const sp<AMessage> & params)612 void PlaylistFetcher::resumeUntilAsync(const sp<AMessage> ¶ms) {
613 FLOGV("resumeUntilAsync: params=%s", params->debugString().c_str());
614
615 AMessage* msg = new AMessage(kWhatResumeUntil, this);
616 msg->setMessage("params", params);
617 msg->post();
618 }
619
fetchPlaylistAsync()620 void PlaylistFetcher::fetchPlaylistAsync() {
621 (new AMessage(kWhatFetchPlaylist, this))->post();
622 }
623
onMessageReceived(const sp<AMessage> & msg)624 void PlaylistFetcher::onMessageReceived(const sp<AMessage> &msg) {
625 switch (msg->what()) {
626 case kWhatStart:
627 {
628 status_t err = onStart(msg);
629
630 sp<AMessage> notify = mNotify->dup();
631 notify->setInt32("what", kWhatStarted);
632 notify->setInt32("err", err);
633 notify->post();
634 break;
635 }
636
637 case kWhatPause:
638 {
639 onPause();
640
641 sp<AMessage> notify = mNotify->dup();
642 notify->setInt32("what", kWhatPaused);
643 notify->setInt32("seekMode",
644 mDownloadState->hasSavedState()
645 ? LiveSession::kSeekModeNextSample
646 : LiveSession::kSeekModeNextSegment);
647 notify->post();
648 break;
649 }
650
651 case kWhatStop:
652 {
653 onStop(msg);
654
655 sp<AMessage> notify = mNotify->dup();
656 notify->setInt32("what", kWhatStopped);
657 notify->post();
658 break;
659 }
660
661 case kWhatFetchPlaylist:
662 {
663 bool unchanged;
664 sp<M3UParser> playlist = mHTTPDownloader->fetchPlaylist(
665 mURI.c_str(), NULL /* curPlaylistHash */, &unchanged);
666
667 sp<AMessage> notify = mNotify->dup();
668 notify->setInt32("what", kWhatPlaylistFetched);
669 notify->setObject("playlist", playlist);
670 notify->post();
671 break;
672 }
673
674 case kWhatMonitorQueue:
675 case kWhatDownloadNext:
676 {
677 int32_t generation;
678 CHECK(msg->findInt32("generation", &generation));
679
680 if (generation != mMonitorQueueGeneration) {
681 // Stale event
682 break;
683 }
684
685 if (msg->what() == kWhatMonitorQueue) {
686 onMonitorQueue();
687 } else {
688 onDownloadNext();
689 }
690 break;
691 }
692
693 case kWhatResumeUntil:
694 {
695 onResumeUntil(msg);
696 break;
697 }
698
699 default:
700 TRESPASS();
701 }
702 }
703
onStart(const sp<AMessage> & msg)704 status_t PlaylistFetcher::onStart(const sp<AMessage> &msg) {
705 mPacketSources.clear();
706 mStopParams.clear();
707 mStartTimeUsNotify = mNotify->dup();
708 mStartTimeUsNotify->setInt32("what", kWhatStartedAt);
709 mStartTimeUsNotify->setString("uri", mURI);
710
711 uint32_t streamTypeMask;
712 CHECK(msg->findInt32("streamTypeMask", (int32_t *)&streamTypeMask));
713
714 int64_t startTimeUs;
715 int64_t segmentStartTimeUs;
716 int32_t startDiscontinuitySeq;
717 int32_t seekMode;
718 CHECK(msg->findInt64("startTimeUs", &startTimeUs));
719 CHECK(msg->findInt64("segmentStartTimeUs", &segmentStartTimeUs));
720 CHECK(msg->findInt32("startDiscontinuitySeq", &startDiscontinuitySeq));
721 CHECK(msg->findInt32("seekMode", &seekMode));
722
723 if (streamTypeMask & LiveSession::STREAMTYPE_AUDIO) {
724 void *ptr;
725 CHECK(msg->findPointer("audioSource", &ptr));
726
727 mPacketSources.add(
728 LiveSession::STREAMTYPE_AUDIO,
729 static_cast<AnotherPacketSource *>(ptr));
730 }
731
732 if (streamTypeMask & LiveSession::STREAMTYPE_VIDEO) {
733 void *ptr;
734 CHECK(msg->findPointer("videoSource", &ptr));
735
736 mPacketSources.add(
737 LiveSession::STREAMTYPE_VIDEO,
738 static_cast<AnotherPacketSource *>(ptr));
739 }
740
741 if (streamTypeMask & LiveSession::STREAMTYPE_SUBTITLES) {
742 void *ptr;
743 CHECK(msg->findPointer("subtitleSource", &ptr));
744
745 mPacketSources.add(
746 LiveSession::STREAMTYPE_SUBTITLES,
747 static_cast<AnotherPacketSource *>(ptr));
748 }
749
750 void *ptr;
751 // metadataSource is not part of streamTypeMask
752 if ((streamTypeMask & (LiveSession::STREAMTYPE_AUDIO | LiveSession::STREAMTYPE_VIDEO))
753 && msg->findPointer("metadataSource", &ptr)) {
754 mPacketSources.add(
755 LiveSession::STREAMTYPE_METADATA,
756 static_cast<AnotherPacketSource *>(ptr));
757 }
758
759 mStreamTypeMask = streamTypeMask;
760
761 mSegmentStartTimeUs = segmentStartTimeUs;
762
763 if (startDiscontinuitySeq >= 0) {
764 mDiscontinuitySeq = startDiscontinuitySeq;
765 }
766
767 mRefreshState = INITIAL_MINIMUM_RELOAD_DELAY;
768 mSeekMode = (LiveSession::SeekMode) seekMode;
769
770 if (startTimeUs >= 0 || mSeekMode == LiveSession::kSeekModeNextSample) {
771 mStartup = true;
772 mIDRFound = false;
773 mVideoBuffer->clear();
774 }
775
776 if (startTimeUs >= 0) {
777 mStartTimeUs = startTimeUs;
778 mFirstPTSValid = false;
779 mSeqNumber = -1;
780 mTimeChangeSignaled = false;
781 mDownloadState->resetState();
782 }
783
784 postMonitorQueue();
785
786 return OK;
787 }
788
onPause()789 void PlaylistFetcher::onPause() {
790 cancelMonitorQueue();
791 mLastDiscontinuitySeq = mDiscontinuitySeq;
792
793 resetStoppingThreshold(false /* disconnect */);
794 }
795
onStop(const sp<AMessage> & msg)796 void PlaylistFetcher::onStop(const sp<AMessage> &msg) {
797 cancelMonitorQueue();
798
799 int32_t clear;
800 CHECK(msg->findInt32("clear", &clear));
801 if (clear) {
802 for (size_t i = 0; i < mPacketSources.size(); i++) {
803 sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
804 packetSource->clear();
805 }
806 }
807
808 mDownloadState->resetState();
809 mPacketSources.clear();
810 mStreamTypeMask = 0;
811
812 resetStoppingThreshold(true /* disconnect */);
813 }
814
815 // Resume until we have reached the boundary timestamps listed in `msg`; when
816 // the remaining time is too short (within a resume threshold) stop immediately
817 // instead.
onResumeUntil(const sp<AMessage> & msg)818 status_t PlaylistFetcher::onResumeUntil(const sp<AMessage> &msg) {
819 sp<AMessage> params;
820 CHECK(msg->findMessage("params", ¶ms));
821
822 mStopParams = params;
823 onDownloadNext();
824
825 return OK;
826 }
827
notifyStopReached()828 void PlaylistFetcher::notifyStopReached() {
829 sp<AMessage> notify = mNotify->dup();
830 notify->setInt32("what", kWhatStopReached);
831 notify->post();
832 }
833
notifyError(status_t err)834 void PlaylistFetcher::notifyError(status_t err) {
835 sp<AMessage> notify = mNotify->dup();
836 notify->setInt32("what", kWhatError);
837 notify->setInt32("err", err);
838 notify->post();
839 }
840
queueDiscontinuity(ATSParser::DiscontinuityType type,const sp<AMessage> & extra)841 void PlaylistFetcher::queueDiscontinuity(
842 ATSParser::DiscontinuityType type, const sp<AMessage> &extra) {
843 for (size_t i = 0; i < mPacketSources.size(); ++i) {
844 // do not discard buffer upon #EXT-X-DISCONTINUITY tag
845 // (seek will discard buffer by abandoning old fetchers)
846 mPacketSources.valueAt(i)->queueDiscontinuity(
847 type, extra, false /* discard */);
848 }
849 }
850
onMonitorQueue()851 void PlaylistFetcher::onMonitorQueue() {
852 // in the middle of an unfinished download, delay
853 // playlist refresh as it'll change seq numbers
854 if (!mDownloadState->hasSavedState()) {
855 status_t err = refreshPlaylist();
856 if (err != OK) {
857 if (mNumRetriesForMonitorQueue < kMaxNumRetries) {
858 ++mNumRetriesForMonitorQueue;
859 } else {
860 notifyError(err);
861 }
862 return;
863 } else {
864 mNumRetriesForMonitorQueue = 0;
865 }
866 }
867
868 int64_t targetDurationUs = kMinBufferedDurationUs;
869 if (mPlaylist != NULL) {
870 targetDurationUs = mPlaylist->getTargetDuration();
871 }
872
873 int64_t bufferedDurationUs = 0LL;
874 status_t finalResult = OK;
875 if (mStreamTypeMask == LiveSession::STREAMTYPE_SUBTITLES) {
876 sp<AnotherPacketSource> packetSource =
877 mPacketSources.valueFor(LiveSession::STREAMTYPE_SUBTITLES);
878
879 bufferedDurationUs =
880 packetSource->getBufferedDurationUs(&finalResult);
881 } else {
882 // Use min stream duration, but ignore streams that never have any packet
883 // enqueued to prevent us from waiting on a non-existent stream;
884 // when we cannot make out from the manifest what streams are included in
885 // a playlist we might assume extra streams.
886 bufferedDurationUs = -1LL;
887 for (size_t i = 0; i < mPacketSources.size(); ++i) {
888 if ((mStreamTypeMask & mPacketSources.keyAt(i)) == 0
889 || mPacketSources[i]->getLatestEnqueuedMeta() == NULL) {
890 continue;
891 }
892
893 int64_t bufferedStreamDurationUs =
894 mPacketSources.valueAt(i)->getBufferedDurationUs(&finalResult);
895
896 FSLOGV(mPacketSources.keyAt(i), "buffered %lld", (long long)bufferedStreamDurationUs);
897
898 if (bufferedDurationUs == -1LL
899 || bufferedStreamDurationUs < bufferedDurationUs) {
900 bufferedDurationUs = bufferedStreamDurationUs;
901 }
902 }
903 if (bufferedDurationUs == -1LL) {
904 bufferedDurationUs = 0LL;
905 }
906 }
907
908 if (finalResult == OK && bufferedDurationUs < kMinBufferedDurationUs) {
909 FLOGV("monitoring, buffered=%lld < %lld",
910 (long long)bufferedDurationUs, (long long)kMinBufferedDurationUs);
911
912 // delay the next download slightly; hopefully this gives other concurrent fetchers
913 // a better chance to run.
914 // onDownloadNext();
915 sp<AMessage> msg = new AMessage(kWhatDownloadNext, this);
916 msg->setInt32("generation", mMonitorQueueGeneration);
917 msg->post(1000L);
918 } else {
919 // We'd like to maintain buffering above durationToBufferUs, so try
920 // again when buffer just about to go below durationToBufferUs
921 // (or after targetDurationUs / 2, whichever is smaller).
922 int64_t delayUs = bufferedDurationUs - kMinBufferedDurationUs + 1000000LL;
923 if (delayUs > targetDurationUs / 2) {
924 delayUs = targetDurationUs / 2;
925 }
926
927 FLOGV("pausing for %lld, buffered=%lld > %lld",
928 (long long)delayUs,
929 (long long)bufferedDurationUs,
930 (long long)kMinBufferedDurationUs);
931
932 postMonitorQueue(delayUs);
933 }
934 }
935
refreshPlaylist()936 status_t PlaylistFetcher::refreshPlaylist() {
937 if (delayUsToRefreshPlaylist() <= 0) {
938 bool unchanged;
939 sp<M3UParser> playlist = mHTTPDownloader->fetchPlaylist(
940 mURI.c_str(), mPlaylistHash, &unchanged);
941
942 if (playlist == NULL) {
943 if (unchanged) {
944 // We succeeded in fetching the playlist, but it was
945 // unchanged from the last time we tried.
946
947 if (mRefreshState != THIRD_UNCHANGED_RELOAD_ATTEMPT) {
948 mRefreshState = (RefreshState)(mRefreshState + 1);
949 }
950 } else {
951 ALOGE("failed to load playlist at url '%s'", uriDebugString(mURI).c_str());
952 return ERROR_IO;
953 }
954 } else {
955 mRefreshState = INITIAL_MINIMUM_RELOAD_DELAY;
956 mPlaylist = playlist;
957
958 if (mPlaylist->isComplete() || mPlaylist->isEvent()) {
959 updateDuration();
960 }
961 // Notify LiveSession to use target-duration based buffering level
962 // for up/down switch. Default LiveSession::kUpSwitchMark may not
963 // be reachable for live streams, as our max buffering amount is
964 // limited to 3 segments.
965 if (!mPlaylist->isComplete()) {
966 updateTargetDuration();
967 }
968 mPlaylistTimeUs = ALooper::GetNowUs();
969 }
970
971 mLastPlaylistFetchTimeUs = ALooper::GetNowUs();
972 }
973 return OK;
974 }
975
976 // static
bufferStartsWithTsSyncByte(const sp<ABuffer> & buffer)977 bool PlaylistFetcher::bufferStartsWithTsSyncByte(const sp<ABuffer>& buffer) {
978 return buffer->size() > 0 && buffer->data()[0] == 0x47;
979 }
980
shouldPauseDownload()981 bool PlaylistFetcher::shouldPauseDownload() {
982 if (mStreamTypeMask == LiveSession::STREAMTYPE_SUBTITLES) {
983 // doesn't apply to subtitles
984 return false;
985 }
986
987 // Calculate threshold to abort current download
988 float thresholdRatio = getStoppingThreshold();
989
990 if (thresholdRatio < 0.0f) {
991 // never abort
992 return false;
993 } else if (thresholdRatio == 0.0f) {
994 // immediately abort
995 return true;
996 }
997
998 // now we have a positive thresholdUs, abort if remaining
999 // portion to download is over that threshold.
1000 if (mSegmentFirstPTS < 0) {
1001 // this means we haven't even find the first access unit,
1002 // abort now as we must be very far away from the end.
1003 return true;
1004 }
1005 int64_t lastEnqueueUs = mSegmentFirstPTS;
1006 for (size_t i = 0; i < mPacketSources.size(); ++i) {
1007 if ((mStreamTypeMask & mPacketSources.keyAt(i)) == 0) {
1008 continue;
1009 }
1010 sp<AMessage> meta = mPacketSources[i]->getLatestEnqueuedMeta();
1011 int32_t type;
1012 if (meta == NULL || meta->findInt32("discontinuity", &type)) {
1013 continue;
1014 }
1015 int64_t tmpUs;
1016 CHECK(meta->findInt64("timeUs", &tmpUs));
1017 if (tmpUs > lastEnqueueUs) {
1018 lastEnqueueUs = tmpUs;
1019 }
1020 }
1021 lastEnqueueUs -= mSegmentFirstPTS;
1022
1023 int64_t targetDurationUs = mPlaylist->getTargetDuration();
1024 int64_t thresholdUs = thresholdRatio * targetDurationUs;
1025
1026 FLOGV("%spausing now, thresholdUs %lld, remaining %lld",
1027 targetDurationUs - lastEnqueueUs > thresholdUs ? "" : "not ",
1028 (long long)thresholdUs,
1029 (long long)(targetDurationUs - lastEnqueueUs));
1030
1031 if (targetDurationUs - lastEnqueueUs > thresholdUs) {
1032 return true;
1033 }
1034 return false;
1035 }
1036
initSeqNumberForLiveStream(int32_t & firstSeqNumberInPlaylist,int32_t & lastSeqNumberInPlaylist)1037 void PlaylistFetcher::initSeqNumberForLiveStream(
1038 int32_t &firstSeqNumberInPlaylist,
1039 int32_t &lastSeqNumberInPlaylist) {
1040 // start at least 3 target durations from the end.
1041 int64_t timeFromEnd = 0;
1042 size_t index = mPlaylist->size();
1043 sp<AMessage> itemMeta;
1044 int64_t itemDurationUs;
1045 int32_t targetDuration;
1046 if (mPlaylist->meta() != NULL
1047 && mPlaylist->meta()->findInt32("target-duration", &targetDuration)) {
1048 do {
1049 --index;
1050 if (!mPlaylist->itemAt(index, NULL /* uri */, &itemMeta)
1051 || !itemMeta->findInt64("durationUs", &itemDurationUs)) {
1052 ALOGW("item or itemDurationUs missing");
1053 mSeqNumber = lastSeqNumberInPlaylist - 3;
1054 break;
1055 }
1056
1057 timeFromEnd += itemDurationUs;
1058 mSeqNumber = firstSeqNumberInPlaylist + index;
1059 } while (timeFromEnd < targetDuration * 3E6 && index > 0);
1060 } else {
1061 ALOGW("target-duration missing");
1062 mSeqNumber = lastSeqNumberInPlaylist - 3;
1063 }
1064
1065 if (mSeqNumber < firstSeqNumberInPlaylist) {
1066 mSeqNumber = firstSeqNumberInPlaylist;
1067 }
1068 }
1069
// Prepares the fetcher for downloading the next segment: refreshes the
// playlist, resolves mSeqNumber to an item in it, prefetches the decryption
// key and queues any discontinuity that has to precede the segment.
//
// Outputs:
//   uri, itemMeta              - filled from the playlist item for mSeqNumber
//                                when the function returns true.
//   firstSeqNumberInPlaylist,
//   lastSeqNumberInPlaylist    - reset to 0, then set to the playlist's
//                                sequence number range if a playlist exists.
//
// Returns true if the caller should go on and fetch the segment. Returns
// false if nothing is to be fetched now: a retry was scheduled through
// postMonitorQueue(), an error or stop was notified, or the key prefetch
// reported ERROR_NOT_CONNECTED.
bool PlaylistFetcher::initDownloadState(
        AString &uri,
        sp<AMessage> &itemMeta,
        int32_t &firstSeqNumberInPlaylist,
        int32_t &lastSeqNumberInPlaylist) {
    status_t err = refreshPlaylist();
    firstSeqNumberInPlaylist = 0;
    lastSeqNumberInPlaylist = 0;
    bool discontinuity = false;

    if (mPlaylist != NULL) {
        mPlaylist->getSeqNumberRange(
                &firstSeqNumberInPlaylist, &lastSeqNumberInPlaylist);

        // Adopt the playlist's discontinuity sequence only if we don't have
        // one yet.
        if (mDiscontinuitySeq < 0) {
            mDiscontinuitySeq = mPlaylist->getDiscontinuitySeq();
        }
    }

    // A new segment is about to start; its first PTS is not known yet.
    mSegmentFirstPTS = -1LL;

    // mSeqNumber < 0 means the starting segment still has to be chosen.
    if (mPlaylist != NULL && mSeqNumber < 0) {
        CHECK_GE(mStartTimeUs, 0LL);

        if (mSegmentStartTimeUs < 0) {
            if (!mPlaylist->isComplete() && !mPlaylist->isEvent()) {
                // this is a live session
                initSeqNumberForLiveStream(firstSeqNumberInPlaylist, lastSeqNumberInPlaylist);
            } else {
                // When seeking, mSegmentStartTimeUs is unavailable (< 0), so we
                // use mStartTimeUs (client supplied timestamp) to determine both
                // the start segment and the relative position inside that segment.
                mSeqNumber = getSeqNumberForTime(mStartTimeUs);
                mStartTimeUs -= getSegmentStartTimeUs(mSeqNumber);
            }
            mStartTimeUsRelative = true;
            FLOGV("Initial sequence number for time %lld is %d from (%d .. %d)",
                    (long long)mStartTimeUs, mSeqNumber, firstSeqNumberInPlaylist,
                    lastSeqNumberInPlaylist);
        } else {
            // When adapting or track switching, mSegmentStartTimeUs (relative
            // to media time 0) is used to determine the start segment; mStartTimeUs (absolute
            // timestamps coming from the media container) is used to determine the position
            // inside a segment.
            if (mStreamTypeMask != LiveSession::STREAMTYPE_SUBTITLES
                    && mSeekMode != LiveSession::kSeekModeNextSample) {
                // avoid double fetch/decode
                // Use (mSegmentStartTimeUs + 1/2 * targetDurationUs) to search
                // for the starting segment in new variant.
                // If the two variants' segments are aligned, this gives the
                // next segment. If they're not aligned, this gives the segment
                // that overlaps no more than 1/2 * targetDurationUs.
                mSeqNumber = getSeqNumberForTime(mSegmentStartTimeUs
                        + mPlaylist->getTargetDuration() / 2);
            } else {
                mSeqNumber = getSeqNumberForTime(mSegmentStartTimeUs);
            }
            // Do not start before the first segment of the current
            // discontinuity sequence.
            ssize_t minSeq = getSeqNumberForDiscontinuity(mDiscontinuitySeq);
            if (mSeqNumber < minSeq) {
                mSeqNumber = minSeq;
            }

            // Clamp into the range the playlist actually contains.
            if (mSeqNumber < firstSeqNumberInPlaylist) {
                mSeqNumber = firstSeqNumberInPlaylist;
            }

            if (mSeqNumber > lastSeqNumberInPlaylist) {
                mSeqNumber = lastSeqNumberInPlaylist;
            }
            FLOGV("Initial sequence number is %d from (%d .. %d)",
                    mSeqNumber, firstSeqNumberInPlaylist,
                    lastSeqNumberInPlaylist);
        }
    }

    // if mPlaylist is NULL then err must be non-OK; but the other way around might not be true
    if (mSeqNumber < firstSeqNumberInPlaylist
            || mSeqNumber > lastSeqNumberInPlaylist
            || err != OK) {
        // Retry path: taken on refresh failure or for an incomplete playlist,
        // while the retry budget (kMaxNumRetries) is not exhausted.
        if ((err != OK || !mPlaylist->isComplete()) && mNumRetries < kMaxNumRetries) {
            ++mNumRetries;

            if (mSeqNumber > lastSeqNumberInPlaylist || err != OK) {
                // make sure we reach this retry logic on refresh failures
                // by adding an err != OK clause to all enclosing if's.

                // Delay is playlist size * target duration divided by
                // (1 + mNumRetries), capped at kMaxMonitorDelayUs; without a
                // playlist the cap itself is used.
                int64_t delayUs = kMaxMonitorDelayUs;
                if (mPlaylist != NULL) {
                    delayUs = mPlaylist->size() * mPlaylist->getTargetDuration()
                            / (1 + mNumRetries);
                }
                if (delayUs > kMaxMonitorDelayUs) {
                    delayUs = kMaxMonitorDelayUs;
                }
                FLOGV("sequence number high: %d from (%d .. %d), "
                        "monitor in %lld (retry=%d)",
                        mSeqNumber, firstSeqNumberInPlaylist,
                        lastSeqNumberInPlaylist, (long long)delayUs, mNumRetries);
                postMonitorQueue(delayUs);
                return false;
            }

            if (err != OK) {
                notifyError(err);
                return false;
            }

            // we've missed the boat, let's start 3 segments prior to the latest sequence
            // number available and signal a discontinuity.

            ALOGI("We've missed the boat, restarting playback."
                    "  mStartup=%d, was  looking for %d in %d-%d",
                    mStartup, mSeqNumber, firstSeqNumberInPlaylist,
                    lastSeqNumberInPlaylist);
            if (mStopParams != NULL) {
                // we should have kept on fetching until we hit the boundaries in mStopParams,
                // but since the segments we are supposed to fetch have already rolled off
                // the playlist, i.e. we have already missed the boat, we inevitably have to
                // skip.
                notifyStopReached();
                return false;
            }
            mSeqNumber = lastSeqNumberInPlaylist - 3;
            if (mSeqNumber < firstSeqNumberInPlaylist) {
                mSeqNumber = firstSeqNumberInPlaylist;
            }
            discontinuity = true;

            // fall through
        } else {
            if (mPlaylist != NULL) {
                if (mSeqNumber >= firstSeqNumberInPlaylist + (int32_t)mPlaylist->size()
                        && !mPlaylist->isComplete()) {
                    // Live playlists
                    ALOGW("sequence number %d not yet available", mSeqNumber);
                    postMonitorQueue(delayUsToRefreshPlaylist());
                    return false;
                }
                ALOGE("Cannot find sequence number %d in playlist "
                     "(contains %d - %d)",
                     mSeqNumber, firstSeqNumberInPlaylist,
                      firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1);

                if (mTSParser != NULL) {
                    mTSParser->signalEOS(ERROR_END_OF_STREAM);
                    // Use an empty buffer; we don't have any new data, just want to extract
                    // potential new access units after flush.  Reset mSeqNumber to
                    // lastSeqNumberInPlaylist such that we set the correct access unit
                    // properties in extractAndQueueAccessUnitsFromTs.
                    sp<ABuffer> buffer = new ABuffer(0);
                    mSeqNumber = lastSeqNumberInPlaylist;
                    extractAndQueueAccessUnitsFromTs(buffer);
                }
                notifyError(ERROR_END_OF_STREAM);
            } else {
                // It's possible that we were never able to download the playlist.
                // In this case we should notify error, instead of EOS, as EOS during
                // prepare means we succeeded in downloading everything.
                ALOGE("Failed to download playlist!");
                notifyError(ERROR_IO);
            }

            return false;
        }
    }

    // A usable sequence number was found; reset the retry counter.
    mNumRetries = 0;

    // Look up the playlist item (index is relative to the first sequence
    // number) to obtain the segment URI and its metadata.
    CHECK(mPlaylist->itemAt(
                mSeqNumber - firstSeqNumberInPlaylist,
                &uri,
                &itemMeta));

    CHECK(itemMeta->findInt32("discontinuity-sequence", &mDiscontinuitySeq));

    int32_t val;
    if (itemMeta->findInt32("discontinuity", &val) && val != 0) {
        discontinuity = true;
    } else if (mLastDiscontinuitySeq >= 0
            && mDiscontinuitySeq != mLastDiscontinuitySeq) {
        // Seek jumped to a new discontinuity sequence. We need to signal
        // a format change to decoder. Decoder needs to shutdown and be
        // created again if seamless format change is unsupported.
        FLOGV("saw discontinuity: mStartup %d, mLastDiscontinuitySeq %d, "
                "mDiscontinuitySeq %d, mStartTimeUs %lld",
                mStartup, mLastDiscontinuitySeq, mDiscontinuitySeq, (long long)mStartTimeUs);
        discontinuity = true;
    }
    mLastDiscontinuitySeq = -1;

    // decrypt a junk buffer to prefetch key; since a session uses only one http connection,
    // this avoids interleaved connections to the key and segment file.
    {
        sp<ABuffer> junk = new ABuffer(16);
        junk->setRange(0, 16);
        // Note: this err is local to the scope and shadows the outer err.
        status_t err = decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, junk,
                true /* first */);
        if (err == ERROR_NOT_CONNECTED) {
            // Bail out silently, without notifying an error.
            return false;
        } else if (err != OK) {
            notifyError(err);
            return false;
        }
    }

    if ((mStartup && !mTimeChangeSignaled) || discontinuity) {
        // We need to signal a time discontinuity to ATSParser on the
        // first segment after start, or on a discontinuity segment.
        // Setting mNextPTSTimeUs informs extractAndQueueAccessUnitsXX()
        // to send the time discontinuity.
        if (mPlaylist->isComplete() || mPlaylist->isEvent()) {
            // If this was a live event this made no sense since
            // we don't have access to all the segment before the current
            // one.
            mNextPTSTimeUs = getSegmentStartTimeUs(mSeqNumber);
        }

        // Setting mTimeChangeSignaled to true, so that if start time
        // searching goes into 2nd segment (without a discontinuity),
        // we don't reset time again. It causes corruption when pending
        // data in ATSParser is cleared.
        mTimeChangeSignaled = true;
    }

    if (discontinuity) {
        ALOGI("queueing discontinuity (explicit=%d)", discontinuity);

        // Signal a format discontinuity to ATSParser to clear partial data
        // from previous streams. Not doing this causes bitstream corruption.
        if (mTSParser != NULL) {
            mTSParser.clear();
        }

        queueDiscontinuity(
                ATSParser::DISCONTINUITY_FORMAT_ONLY,
                NULL /* extra */);

        if (mStartup && mStartTimeUsRelative && mFirstPTSValid) {
            // This means we guessed mStartTimeUs to be in the previous
            // segment (likely very close to the end), but either video or
            // audio has not found start by the end of that segment.
            //
            // If this new segment is not a discontinuity, keep searching.
            //
            // If this new segment even got a discontinuity marker, just
            // set mStartTimeUs=0, and take all samples from now on.
            mStartTimeUs = 0;
            mFirstPTSValid = false;
            mIDRFound = false;
            mVideoBuffer->clear();
        }
    }

    FLOGV("fetching segment %d from (%d .. %d)",
            mSeqNumber, firstSeqNumberInPlaylist, lastSeqNumberInPlaylist);
    return true;
}
1329
// Downloads (or resumes downloading) the segment for mSeqNumber block by
// block. Each block is decrypted; if the buffer starts with the TS sync byte
// the new data is fed incrementally to extractAndQueueAccessUnitsFromTs(),
// otherwise the whole buffer is handed to extractAndQueueAccessUnits() once
// the download has finished.
//
// The function may return early:
//   - after saving its progress in mDownloadState when the download is paused,
//   - after notifying an error or a reached stop point,
//   - after scheduling a retry through postMonitorQueue() on -EAGAIN,
//   - silently when the downloader reports ERROR_NOT_CONNECTED.
// On normal completion mSeqNumber is advanced and, unless pausing, the
// monitor queue is posted again.
void PlaylistFetcher::onDownloadNext() {
    AString uri;
    sp<AMessage> itemMeta;
    sp<ABuffer> buffer;
    sp<ABuffer> tsBuffer;
    int32_t firstSeqNumberInPlaylist = 0;
    int32_t lastSeqNumberInPlaylist = 0;
    bool connectHTTP = true;

    if (mDownloadState->hasSavedState()) {
        // Resume a previously paused download; no new HTTP connection is
        // requested in this case.
        mDownloadState->restoreState(
                uri,
                itemMeta,
                buffer,
                tsBuffer,
                firstSeqNumberInPlaylist,
                lastSeqNumberInPlaylist);
        connectHTTP = false;
        FLOGV("resuming: '%s'", uri.c_str());
    } else {
        if (!initDownloadState(
                uri,
                itemMeta,
                firstSeqNumberInPlaylist,
                lastSeqNumberInPlaylist)) {
            return;
        }
        FLOGV("fetching: '%s'", uri.c_str());
    }

    // Use the item's byte range if both offset and length are present;
    // otherwise fall back to offset 0 and length -1.
    int64_t range_offset, range_length;
    if (!itemMeta->findInt64("range-offset", &range_offset)
            || !itemMeta->findInt64("range-length", &range_length)) {
        range_offset = 0;
        range_length = -1;
    }

    // block-wise download
    bool shouldPause = false;
    ssize_t bytesRead;
    do {
        int64_t startUs = ALooper::GetNowUs();
        bytesRead = mHTTPDownloader->fetchBlock(
                uri.c_str(), &buffer, range_offset, range_length, kDownloadBlockSize,
                NULL /* actualURL */, connectHTTP);
        // Wall-clock time spent in fetchBlock(), used as the bandwidth sample.
        int64_t delayUs = ALooper::GetNowUs() - startUs;

        if (bytesRead == ERROR_NOT_CONNECTED) {
            return;
        }
        if (bytesRead < 0) {
            status_t err = bytesRead;
            ALOGE("failed to fetch .ts segment at url '%s'", uriDebugString(uri).c_str());
            notifyError(err);
            return;
        }

        // add sample for bandwidth estimation, excluding samples from subtitles (as
        // its too small), or during startup/resumeUntil (when we could have more than
        // one connection open which affects bandwidth)
        if (!mStartup && mStopParams == NULL && bytesRead > 0
                && (mStreamTypeMask
                        & (LiveSession::STREAMTYPE_AUDIO
                        | LiveSession::STREAMTYPE_VIDEO))) {
            mSession->addBandwidthMeasurement(bytesRead, delayUs);
            if (delayUs > 2000000LL) {
                FLOGV("bytesRead %zd took %.2f seconds - abnormal bandwidth dip",
                        bytesRead, (double)delayUs / 1.0e6);
            }
        }

        // Only the first fetchBlock() call of a segment asks for a connection.
        connectHTTP = false;

        CHECK(buffer != NULL);

        size_t size = buffer->size();
        // Set decryption range: only the bytesRead bytes just appended at the
        // end of the buffer.
        buffer->setRange(size - bytesRead, bytesRead);
        status_t err = decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, buffer,
                buffer->offset() == 0 /* first */);
        // Unset decryption range.
        buffer->setRange(0, size);

        if (err != OK) {
            ALOGE("decryptBuffer failed w/ error %d", err);

            notifyError(err);
            return;
        }

        bool startUp = mStartup; // save current start up state

        err = OK;
        if (bufferStartsWithTsSyncByte(buffer)) {
            // Incremental extraction is only supported for MPEG2 transport streams.
            // tsBuffer is a second ABuffer constructed over buffer's data
            // pointer; its range tracks the bytes not yet consumed by the
            // parser.
            if (tsBuffer == NULL) {
                tsBuffer = new ABuffer(buffer->data(), buffer->capacity());
                tsBuffer->setRange(0, 0);
            } else if (tsBuffer->capacity() != buffer->capacity()) {
                // buffer's capacity changed; rebuild tsBuffer over the current
                // data pointer while keeping its range.
                size_t tsOff = tsBuffer->offset(), tsSize = tsBuffer->size();
                tsBuffer = new ABuffer(buffer->data(), buffer->capacity());
                tsBuffer->setRange(tsOff, tsSize);
            }
            // Extend the unconsumed range by the newly downloaded bytes.
            tsBuffer->setRange(tsBuffer->offset(), tsBuffer->size() + bytesRead);
            err = extractAndQueueAccessUnitsFromTs(tsBuffer);
        }

        if (err == -EAGAIN) {
            // starting sequence number too low/high
            mTSParser.clear();
            for (size_t i = 0; i < mPacketSources.size(); i++) {
                sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
                packetSource->clear();
            }
            postMonitorQueue();
            return;
        } else if (err == ERROR_OUT_OF_RANGE) {
            // reached stopping point
            notifyStopReached();
            return;
        } else if (err != OK) {
            notifyError(err);
            return;
        }
        // If we're switching, post start notification
        // this should only be posted when the last chunk is fully processed by TSParser
        if (mSeekMode != LiveSession::kSeekModeExactPosition && startUp != mStartup) {
            CHECK(mStartTimeUsNotify != NULL);
            mStartTimeUsNotify->post();
            mStartTimeUsNotify.clear();
            shouldPause = true;
        }
        if (shouldPause || shouldPauseDownload()) {
            // save state and return if this is not the last chunk,
            // leaving the fetcher in paused state.
            if (bytesRead != 0) {
                mDownloadState->saveState(
                        uri,
                        itemMeta,
                        buffer,
                        tsBuffer,
                        firstSeqNumberInPlaylist,
                        lastSeqNumberInPlaylist);
                return;
            }
            shouldPause = true;
        }
    } while (bytesRead != 0);

    if (bufferStartsWithTsSyncByte(buffer)) {
        // If we don't see a stream in the program table after fetching a full ts segment
        // mark it as nonexistent.
        ATSParser::SourceType srcTypes[] =
                { ATSParser::VIDEO, ATSParser::AUDIO };
        LiveSession::StreamType streamTypes[] =
                { LiveSession::STREAMTYPE_VIDEO, LiveSession::STREAMTYPE_AUDIO };
        const size_t kNumTypes = NELEM(srcTypes);

        for (size_t i = 0; i < kNumTypes; i++) {
            ATSParser::SourceType srcType = srcTypes[i];
            LiveSession::StreamType streamType = streamTypes[i];

            // NOTE(review): 'source' is not read anywhere below in this loop.
            sp<AnotherPacketSource> source =
                static_cast<AnotherPacketSource *>(
                    mTSParser->getSource(srcType).get());

            if (!mTSParser->hasSource(srcType)) {
                ALOGW("MPEG2 Transport stream does not contain %s data.",
                      srcType == ATSParser::VIDEO ? "video" : "audio");

                mStreamTypeMask &= ~streamType;
                mPacketSources.removeItem(streamType);
            }
        }

    }

    if (checkDecryptPadding(buffer) != OK) {
        ALOGE("Incorrect padding bytes after decryption.");
        notifyError(ERROR_MALFORMED);
        return;
    }

    if (tsBuffer != NULL) {
        // Leftover bytes in tsBuffer were not consumed as whole packets. Any
        // leftover is an error for cipher method "NONE"; otherwise up to 16
        // leftover bytes are tolerated.
        AString method;
        CHECK(buffer->meta()->findString("cipher-method", &method));
        if ((tsBuffer->size() > 0 && method == "NONE")
                || tsBuffer->size() > 16) {
            ALOGE("MPEG2 transport stream is not an even multiple of 188 "
                    "bytes in length.");
            notifyError(ERROR_MALFORMED);
            return;
        }
    }

    // bulk extract non-ts files
    bool startUp = mStartup;
    if (tsBuffer == NULL) {
        status_t err = extractAndQueueAccessUnits(buffer, itemMeta);
        if (err == -EAGAIN) {
            // starting sequence number too low/high
            postMonitorQueue();
            return;
        } else if (err == ERROR_OUT_OF_RANGE) {
            // reached stopping point
            notifyStopReached();
            return;
        } else if (err != OK) {
            notifyError(err);
            return;
        }
    }

    // The segment is done; move on to the next sequence number.
    ++mSeqNumber;

    // if adapting, pause after found the next starting point
    if (mSeekMode != LiveSession::kSeekModeExactPosition && startUp != mStartup) {
        CHECK(mStartTimeUsNotify != NULL);
        mStartTimeUsNotify->post();
        mStartTimeUsNotify.clear();
        shouldPause = true;
    }

    if (!shouldPause) {
        postMonitorQueue();
    }
}
1557
1558 /*
1559 * returns true if we need to adjust mSeqNumber
1560 */
adjustSeqNumberWithAnchorTime(int64_t anchorTimeUs)1561 bool PlaylistFetcher::adjustSeqNumberWithAnchorTime(int64_t anchorTimeUs) {
1562 int32_t firstSeqNumberInPlaylist = mPlaylist->getFirstSeqNumber();
1563
1564 int64_t minDiffUs, maxDiffUs;
1565 if (mSeekMode == LiveSession::kSeekModeNextSample) {
1566 // if the previous fetcher paused in the middle of a segment, we
1567 // want to start at a segment that overlaps the last sample
1568 minDiffUs = -mPlaylist->getTargetDuration();
1569 maxDiffUs = 0LL;
1570 } else {
1571 // if the previous fetcher paused at the end of a segment, ideally
1572 // we want to start at the segment that's roughly aligned with its
1573 // next segment, but if the two variants are not well aligned we
1574 // adjust the diff to within (-T/2, T/2)
1575 minDiffUs = -mPlaylist->getTargetDuration() / 2;
1576 maxDiffUs = mPlaylist->getTargetDuration() / 2;
1577 }
1578
1579 int32_t oldSeqNumber = mSeqNumber;
1580 ssize_t index = mSeqNumber - firstSeqNumberInPlaylist;
1581
1582 // adjust anchorTimeUs to within (minDiffUs, maxDiffUs) from mStartTimeUs
1583 int64_t diffUs = anchorTimeUs - mStartTimeUs;
1584 if (diffUs > maxDiffUs) {
1585 while (index > 0 && diffUs > maxDiffUs) {
1586 --index;
1587
1588 sp<AMessage> itemMeta;
1589 CHECK(mPlaylist->itemAt(index, NULL /* uri */, &itemMeta));
1590
1591 int64_t itemDurationUs;
1592 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
1593
1594 diffUs -= itemDurationUs;
1595 }
1596 } else if (diffUs < minDiffUs) {
1597 while (index + 1 < (ssize_t) mPlaylist->size()
1598 && diffUs < minDiffUs) {
1599 ++index;
1600
1601 sp<AMessage> itemMeta;
1602 CHECK(mPlaylist->itemAt(index, NULL /* uri */, &itemMeta));
1603
1604 int64_t itemDurationUs;
1605 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
1606
1607 diffUs += itemDurationUs;
1608 }
1609 }
1610
1611 mSeqNumber = firstSeqNumberInPlaylist + index;
1612
1613 if (mSeqNumber != oldSeqNumber) {
1614 FLOGV("guessed wrong seg number: diff %lld out of [%lld, %lld]",
1615 (long long) anchorTimeUs - mStartTimeUs,
1616 (long long) minDiffUs,
1617 (long long) maxDiffUs);
1618 return true;
1619 }
1620 return false;
1621 }
1622
getSeqNumberForDiscontinuity(size_t discontinuitySeq) const1623 int32_t PlaylistFetcher::getSeqNumberForDiscontinuity(size_t discontinuitySeq) const {
1624 int32_t firstSeqNumberInPlaylist = mPlaylist->getFirstSeqNumber();
1625
1626 size_t index = 0;
1627 while (index < mPlaylist->size()) {
1628 sp<AMessage> itemMeta;
1629 CHECK(mPlaylist->itemAt( index, NULL /* uri */, &itemMeta));
1630 size_t curDiscontinuitySeq;
1631 CHECK(itemMeta->findInt32("discontinuity-sequence", (int32_t *)&curDiscontinuitySeq));
1632 int32_t seqNumber = firstSeqNumberInPlaylist + index;
1633 if (curDiscontinuitySeq == discontinuitySeq) {
1634 return seqNumber;
1635 } else if (curDiscontinuitySeq > discontinuitySeq) {
1636 return seqNumber <= 0 ? 0 : seqNumber - 1;
1637 }
1638
1639 ++index;
1640 }
1641
1642 return firstSeqNumberInPlaylist + mPlaylist->size();
1643 }
1644
getSeqNumberForTime(int64_t timeUs) const1645 int32_t PlaylistFetcher::getSeqNumberForTime(int64_t timeUs) const {
1646 size_t index = 0;
1647 int64_t segmentStartUs = 0;
1648 while (index < mPlaylist->size()) {
1649 sp<AMessage> itemMeta;
1650 CHECK(mPlaylist->itemAt(
1651 index, NULL /* uri */, &itemMeta));
1652
1653 int64_t itemDurationUs;
1654 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
1655
1656 if (timeUs < segmentStartUs + itemDurationUs) {
1657 break;
1658 }
1659
1660 segmentStartUs += itemDurationUs;
1661 ++index;
1662 }
1663
1664 if (index >= mPlaylist->size()) {
1665 index = mPlaylist->size() - 1;
1666 }
1667
1668 return mPlaylist->getFirstSeqNumber() + index;
1669 }
1670
// Stamps |accessUnit|'s meta with the fetcher's current segment context:
// the source format (if any), an optional "discard" flag, the discontinuity
// sequence, segment start / first-PTS / duration and, for playlists that are
// neither complete nor event playlists, the playlist time.
// Returns |accessUnit| itself.
const sp<ABuffer> &PlaylistFetcher::setAccessUnitProperties(
        const sp<ABuffer> &accessUnit, const sp<AnotherPacketSource> &source, bool discard) {
    sp<MetaData> sourceFormat = source->getFormat();
    sp<AMessage> unitMeta = accessUnit->meta();

    if (sourceFormat != NULL) {
        // for simplicity, store a reference to the format in each unit
        unitMeta->setObject("format", sourceFormat);
    }

    if (discard) {
        unitMeta->setInt32("discard", discard);
    }

    unitMeta->setInt32("discontinuitySeq", mDiscontinuitySeq);
    unitMeta->setInt64("segmentStartTimeUs", getSegmentStartTimeUs(mSeqNumber));
    unitMeta->setInt64("segmentFirstTimeUs", mSegmentFirstPTS);
    unitMeta->setInt64("segmentDurationUs", getSegmentDurationUs(mSeqNumber));

    const bool isLivePlaylist = !mPlaylist->isComplete() && !mPlaylist->isEvent();
    if (isLivePlaylist) {
        unitMeta->setInt64("playlistTimeUs", mPlaylistTimeUs);
    }
    return accessUnit;
}
1692
isStartTimeReached(int64_t timeUs)1693 bool PlaylistFetcher::isStartTimeReached(int64_t timeUs) {
1694 if (!mFirstPTSValid) {
1695 mFirstTimeUs = timeUs;
1696 mFirstPTSValid = true;
1697 }
1698 bool startTimeReached = true;
1699 if (mStartTimeUsRelative) {
1700 FLOGV("startTimeUsRelative, timeUs (%lld) - %lld = %lld",
1701 (long long)timeUs,
1702 (long long)mFirstTimeUs,
1703 (long long)(timeUs - mFirstTimeUs));
1704 timeUs -= mFirstTimeUs;
1705 if (timeUs < 0) {
1706 FLOGV("clamp negative timeUs to 0");
1707 timeUs = 0;
1708 }
1709 startTimeReached = (timeUs >= mStartTimeUs);
1710 }
1711 return startTimeReached;
1712 }
1713
// Feeds the complete 188-byte packets contained in |buffer| to mTSParser
// (created on first use), moves |buffer|'s range past the consumed bytes,
// then dequeues the access units the parser produced and queues them on the
// matching entries of mPacketSources, honoring the start-time search
// (mStartup) and the stop parameters (mStopParams).
//
// Returns:
//   OK                  - packets were processed (possibly nothing queued yet).
//   -EAGAIN             - mSeqNumber was adjusted; fetching must restart.
//   ERROR_OUT_OF_RANGE  - every stream has reached its stop point.
//   ERROR_MALFORMED     - a subtitle stream is among mPacketSources.
//   other               - error returned by ATSParser::feedTSPacket().
status_t PlaylistFetcher::extractAndQueueAccessUnitsFromTs(const sp<ABuffer> &buffer) {
    if (mTSParser == NULL) {
        // Use TS_TIMESTAMPS_ARE_ABSOLUTE so pts carry over between fetchers.
        mTSParser = new ATSParser(ATSParser::TS_TIMESTAMPS_ARE_ABSOLUTE);
    }

    // A non-negative mNextPTSTimeUs is a pending request to signal a time
    // discontinuity to the parser; it is consumed (reset to -1) here.
    if (mNextPTSTimeUs >= 0LL) {
        sp<AMessage> extra = new AMessage;
        // Since we are using absolute timestamps, signal an offset of 0 to prevent
        // ATSParser from skewing the timestamps of access units.
        extra->setInt64(kATSParserKeyMediaTimeUs, 0);

        // When adapting, signal a recent media time to the parser,
        // so that PTS wrap around is handled for the new variant.
        if (mStartTimeUs >= 0 && !mStartTimeUsRelative) {
            extra->setInt64(kATSParserKeyRecentMediaTimeUs, mStartTimeUs);
        }

        mTSParser->signalDiscontinuity(
                ATSParser::DISCONTINUITY_TIME, extra);

        mNextPTSTimeUs = -1LL;
    }

    // Hand a changed sample-AES key item to the parser exactly once.
    if (mSampleAesKeyItemChanged) {
        mTSParser->signalNewSampleAesKey(mSampleAesKeyItem);
        mSampleAesKeyItemChanged = false;
    }

    // Feed whole 188-byte packets only; a trailing partial packet stays in
    // the buffer.
    size_t offset = 0;
    while (offset + 188 <= buffer->size()) {
        status_t err = mTSParser->feedTSPacket(buffer->data() + offset, 188);

        if (err != OK) {
            return err;
        }

        offset += 188;
    }
    // setRange to indicate consumed bytes.
    buffer->setRange(buffer->offset() + offset, buffer->size() - offset);

    if (mSegmentFirstPTS < 0LL) {
        // get the smallest first PTS from all streams present in this parser
        for (size_t i = mPacketSources.size(); i > 0;) {
            i--;
            const LiveSession::StreamType stream = mPacketSources.keyAt(i);
            if (stream == LiveSession::STREAMTYPE_SUBTITLES) {
                ALOGE("MPEG2 Transport streams do not contain subtitles.");
                return ERROR_MALFORMED;
            }
            if (stream == LiveSession::STREAMTYPE_METADATA) {
                continue;
            }
            ATSParser::SourceType type =LiveSession::getSourceTypeForStream(stream);
            sp<AnotherPacketSource> source =
                static_cast<AnotherPacketSource *>(
                        mTSParser->getSource(type).get());

            if (source == NULL) {
                continue;
            }
            sp<AMessage> meta = source->getMetaAfterLastDequeued(0);
            if (meta != NULL) {
                int64_t timeUs;
                CHECK(meta->findInt64("timeUs", &timeUs));
                if (mSegmentFirstPTS < 0LL || timeUs < mSegmentFirstPTS) {
                    mSegmentFirstPTS = timeUs;
                }
            }
        }
        if (mSegmentFirstPTS < 0LL) {
            // didn't find any TS packet, can return early
            return OK;
        }
        if (!mStartTimeUsRelative) {
            // mStartup
            //   mStartup is true until we have queued a packet for all the streams
            //   we are fetching. We queue packets whose timestamps are greater than
            //   mStartTimeUs.
            // mSegmentStartTimeUs >= 0
            //   mSegmentStartTimeUs is non-negative when adapting or switching tracks
            // adjustSeqNumberWithAnchorTime(timeUs) == true
            //   we guessed a seq number that's either too large or too small.
            // If this happens, we'll adjust mSeqNumber and restart fetching from new
            // location. Note that we only want to adjust once, so set mSegmentStartTimeUs
            // to -1 so that we don't enter this chunk next time.
            if (mStartup && mSegmentStartTimeUs >= 0
                    && adjustSeqNumberWithAnchorTime(mSegmentFirstPTS)) {
                // Prepare the kWhatStartedAt notification for the restarted
                // fetch.
                mStartTimeUsNotify = mNotify->dup();
                mStartTimeUsNotify->setInt32("what", kWhatStartedAt);
                mStartTimeUsNotify->setString("uri", mURI);
                mIDRFound = false;
                mSegmentStartTimeUs = -1;
                return -EAGAIN;
            }
        }
    }

    // NOTE(review): err is never assigned after this initialization in this
    // function, so the "err != OK" branches further down are not taken.
    status_t err = OK;
    // Iterate backwards so that removing entry i from mPacketSources (stop
    // point handling below) does not disturb the indices still to be visited.
    for (size_t i = mPacketSources.size(); i > 0;) {
        i--;
        sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);

        const LiveSession::StreamType stream = mPacketSources.keyAt(i);
        if (stream == LiveSession::STREAMTYPE_SUBTITLES) {
            ALOGE("MPEG2 Transport streams do not contain subtitles.");
            return ERROR_MALFORMED;
        }

        const char *key = LiveSession::getKeyForStream(stream);
        ATSParser::SourceType type =LiveSession::getSourceTypeForStream(stream);

        // 'source' is the parser-side queue for this stream; 'packetSource'
        // above is the fetcher-side queue the units are moved to.
        sp<AnotherPacketSource> source =
            static_cast<AnotherPacketSource *>(
                    mTSParser->getSource(type).get());

        if (source == NULL) {
            continue;
        }

        const char *mime;
        sp<MetaData> format  = source->getFormat();
        bool isAvc = format != NULL && format->findCString(kKeyMIMEType, &mime)
                && !strcasecmp(mime, MEDIA_MIMETYPE_VIDEO_AVC);

        sp<ABuffer> accessUnit;
        status_t finalResult;
        while (source->hasBufferAvailable(&finalResult)
                && source->dequeueAccessUnit(&accessUnit) == OK) {

            int64_t timeUs;
            CHECK(accessUnit->meta()->findInt64("timeUs", &timeUs));

            if (mStartup) {
                bool startTimeReached = isStartTimeReached(timeUs);

                if (!startTimeReached || (isAvc && !mIDRFound)) {
                    // buffer up to the closest preceding IDR frame in the next segment,
                    // or the closest succeeding IDR frame after the exact position
                    FSLOGV(stream, "timeUs(%lld)-mStartTimeUs(%lld)=%lld, mIDRFound=%d",
                            (long long)timeUs,
                            (long long)mStartTimeUs,
                            (long long)timeUs - mStartTimeUs,
                            mIDRFound);
                    if (isAvc) {
                        if (IsIDR(accessUnit->data(), accessUnit->size())) {
                            mVideoBuffer->clear();
                            FSLOGV(stream, "found IDR, clear mVideoBuffer");
                            mIDRFound = true;
                        }
                        if (mIDRFound && mStartTimeUsRelative && !startTimeReached) {
                            mVideoBuffer->queueAccessUnit(accessUnit);
                            FSLOGV(stream, "saving AVC video AccessUnit");
                        }
                    }
                    // Re-evaluated because mIDRFound may have just changed;
                    // skip this unit if the start point is still not reached.
                    if (!startTimeReached || (isAvc && !mIDRFound)) {
                        continue;
                    }
                }
            }

            if (mStartTimeUsNotify != NULL) {
                // Record in the notification's "streamMask" that this stream
                // has found its start point.
                uint32_t streamMask = 0;
                mStartTimeUsNotify->findInt32("streamMask", (int32_t *) &streamMask);
                if ((mStreamTypeMask & mPacketSources.keyAt(i))
                        && !(streamMask & mPacketSources.keyAt(i))) {
                    streamMask |= mPacketSources.keyAt(i);
                    mStartTimeUsNotify->setInt32("streamMask", streamMask);
                    FSLOGV(stream, "found start point, timeUs=%lld, streamMask becomes %x",
                            (long long)timeUs, streamMask);

                    if (streamMask == mStreamTypeMask) {
                        FLOGV("found start point for all streams");
                        mStartup = false;
                    }
                }
            }

            if (mStopParams != NULL) {
                // The stream is considered stopped if the stop parameters
                // lack a discontinuity sequence or a stop time for this
                // stream, if the stop discontinuity sequence is ahead of the
                // current one, or if it matches and timeUs has reached the
                // stop time.
                int32_t discontinuitySeq;
                int64_t stopTimeUs;
                if (!mStopParams->findInt32("discontinuitySeq", &discontinuitySeq)
                        || discontinuitySeq > mDiscontinuitySeq
                        || !mStopParams->findInt64(key, &stopTimeUs)
                        || (discontinuitySeq == mDiscontinuitySeq
                                && timeUs >= stopTimeUs)) {
                    FSLOGV(stream, "reached stop point, timeUs=%lld", (long long)timeUs);
                    mStreamTypeMask &= ~stream;
                    mPacketSources.removeItemsAt(i);
                    break;
                }
            }

            if (stream == LiveSession::STREAMTYPE_VIDEO) {
                // Flush the access units saved in mVideoBuffer first, each
                // marked with "discard", ahead of the current unit.
                const bool discard = true;
                status_t status;
                while (mVideoBuffer->hasBufferAvailable(&status)) {
                    sp<ABuffer> videoBuffer;
                    mVideoBuffer->dequeueAccessUnit(&videoBuffer);
                    setAccessUnitProperties(videoBuffer, source, discard);
                    packetSource->queueAccessUnit(videoBuffer);
                    int64_t bufferTimeUs;
                    CHECK(videoBuffer->meta()->findInt64("timeUs", &bufferTimeUs));
                    FSLOGV(stream, "queueAccessUnit (saved), timeUs=%lld",
                            (long long)bufferTimeUs);
                }
            } else if (stream == LiveSession::STREAMTYPE_METADATA && !mHasMetadata) {
                // Notify once, the first time a metadata access unit is seen.
                mHasMetadata = true;
                sp<AMessage> notify = mNotify->dup();
                notify->setInt32("what", kWhatMetadataDetected);
                notify->post();
            }

            setAccessUnitProperties(accessUnit, source);
            packetSource->queueAccessUnit(accessUnit);
            FSLOGV(stream, "queueAccessUnit, timeUs=%lld", (long long)timeUs);
        }

        if (err != OK) {
            break;
        }
    }

    if (err != OK) {
        for (size_t i = mPacketSources.size(); i > 0;) {
            i--;
            sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
            packetSource->clear();
        }
        return err;
    }

    if (!mStreamTypeMask) {
        // Signal gap is filled between original and new stream.
        FLOGV("reached stop point for all streams");
        return ERROR_OUT_OF_RANGE;
    }

    return OK;
}
1955
1956 /* static */
bufferStartsWithWebVTTMagicSequence(const sp<ABuffer> & buffer)1957 bool PlaylistFetcher::bufferStartsWithWebVTTMagicSequence(
1958 const sp<ABuffer> &buffer) {
1959 size_t pos = 0;
1960
1961 // skip possible BOM
1962 if (buffer->size() >= pos + 3 &&
1963 !memcmp("\xef\xbb\xbf", buffer->data() + pos, 3)) {
1964 pos += 3;
1965 }
1966
1967 // accept WEBVTT followed by SPACE, TAB or (CR) LF
1968 if (buffer->size() < pos + 6 ||
1969 memcmp("WEBVTT", buffer->data() + pos, 6)) {
1970 return false;
1971 }
1972 pos += 6;
1973
1974 if (buffer->size() == pos) {
1975 return true;
1976 }
1977
1978 uint8_t sep = buffer->data()[pos];
1979 return sep == ' ' || sep == '\t' || sep == '\n' || sep == '\r';
1980 }
1981
extractAndQueueAccessUnits(const sp<ABuffer> & buffer,const sp<AMessage> & itemMeta)1982 status_t PlaylistFetcher::extractAndQueueAccessUnits(
1983 const sp<ABuffer> &buffer, const sp<AMessage> &itemMeta) {
1984 if (bufferStartsWithWebVTTMagicSequence(buffer)) {
1985 if (mStreamTypeMask != LiveSession::STREAMTYPE_SUBTITLES) {
1986 ALOGE("This stream only contains subtitles.");
1987 return ERROR_MALFORMED;
1988 }
1989
1990 const sp<AnotherPacketSource> packetSource =
1991 mPacketSources.valueFor(LiveSession::STREAMTYPE_SUBTITLES);
1992
1993 int64_t durationUs;
1994 CHECK(itemMeta->findInt64("durationUs", &durationUs));
1995 buffer->meta()->setInt64("timeUs", getSegmentStartTimeUs(mSeqNumber));
1996 buffer->meta()->setInt64("durationUs", durationUs);
1997 buffer->meta()->setInt64("segmentStartTimeUs", getSegmentStartTimeUs(mSeqNumber));
1998 buffer->meta()->setInt32("discontinuitySeq", mDiscontinuitySeq);
1999 buffer->meta()->setInt32("subtitleGeneration", mSubtitleGeneration);
2000 packetSource->queueAccessUnit(buffer);
2001 return OK;
2002 }
2003
2004 if (mNextPTSTimeUs >= 0LL) {
2005 mNextPTSTimeUs = -1LL;
2006 }
2007
2008 // This better be an ISO 13818-7 (AAC) or ISO 13818-1 (MPEG) audio
2009 // stream prefixed by an ID3 tag.
2010
2011 bool firstID3Tag = true;
2012 uint64_t PTS = 0;
2013
2014 for (;;) {
2015 // Make sure to skip all ID3 tags preceding the audio data.
2016 // At least one must be present to provide the PTS timestamp.
2017
2018 ID3 id3(buffer->data(), buffer->size(), true /* ignoreV1 */);
2019 if (!id3.isValid()) {
2020 if (firstID3Tag) {
2021 ALOGE("Unable to parse ID3 tag.");
2022 return ERROR_MALFORMED;
2023 } else {
2024 break;
2025 }
2026 }
2027
2028 if (firstID3Tag) {
2029 bool found = false;
2030
2031 ID3::Iterator it(id3, "PRIV");
2032 while (!it.done()) {
2033 size_t length;
2034 const uint8_t *data = it.getData(&length);
2035 if (!data) {
2036 return ERROR_MALFORMED;
2037 }
2038
2039 static const char *kMatchName =
2040 "com.apple.streaming.transportStreamTimestamp";
2041 static const size_t kMatchNameLen = strlen(kMatchName);
2042
2043 if (length == kMatchNameLen + 1 + 8
2044 && !strncmp((const char *)data, kMatchName, kMatchNameLen)) {
2045 found = true;
2046 PTS = U64_AT(&data[kMatchNameLen + 1]);
2047 }
2048
2049 it.next();
2050 }
2051
2052 if (!found) {
2053 ALOGE("Unable to extract transportStreamTimestamp from ID3 tag.");
2054 return ERROR_MALFORMED;
2055 }
2056 }
2057
2058 // skip the ID3 tag
2059 buffer->setRange(
2060 buffer->offset() + id3.rawSize(), buffer->size() - id3.rawSize());
2061
2062 firstID3Tag = false;
2063 }
2064
2065 if (mStreamTypeMask != LiveSession::STREAMTYPE_AUDIO) {
2066 ALOGW("This stream only contains audio data!");
2067
2068 mStreamTypeMask &= LiveSession::STREAMTYPE_AUDIO;
2069
2070 if (mStreamTypeMask == 0) {
2071 return OK;
2072 }
2073 }
2074
2075 sp<AnotherPacketSource> packetSource =
2076 mPacketSources.valueFor(LiveSession::STREAMTYPE_AUDIO);
2077
2078 if (packetSource->getFormat() == NULL && buffer->size() >= 7) {
2079 ABitReader bits(buffer->data(), buffer->size());
2080
2081 // adts_fixed_header
2082
2083 CHECK_EQ(bits.getBits(12), 0xfffu);
2084 bits.skipBits(3); // ID, layer
2085 bool protection_absent __unused = bits.getBits(1) != 0;
2086
2087 unsigned profile = bits.getBits(2);
2088 CHECK_NE(profile, 3u);
2089 unsigned sampling_freq_index = bits.getBits(4);
2090 bits.getBits(1); // private_bit
2091 unsigned channel_configuration = bits.getBits(3);
2092 CHECK_NE(channel_configuration, 0u);
2093 bits.skipBits(2); // original_copy, home
2094
2095 sp<MetaData> meta = new MetaData();
2096 MakeAACCodecSpecificData(*meta,
2097 profile, sampling_freq_index, channel_configuration);
2098
2099 meta->setInt32(kKeyIsADTS, true);
2100
2101 packetSource->setFormat(meta);
2102 }
2103
2104 int64_t numSamples = 0LL;
2105 int32_t sampleRate;
2106 CHECK(packetSource->getFormat()->findInt32(kKeySampleRate, &sampleRate));
2107
2108 int64_t timeUs = (PTS * 100LL) / 9LL;
2109 if (mStartup && !mFirstPTSValid) {
2110 mFirstPTSValid = true;
2111 mFirstTimeUs = timeUs;
2112 }
2113
2114 if (mSegmentFirstPTS < 0LL) {
2115 mSegmentFirstPTS = timeUs;
2116 if (!mStartTimeUsRelative) {
2117 // Duplicated logic from how we handle .ts playlists.
2118 if (mStartup && mSegmentStartTimeUs >= 0
2119 && adjustSeqNumberWithAnchorTime(timeUs)) {
2120 mSegmentStartTimeUs = -1;
2121 return -EAGAIN;
2122 }
2123 }
2124 }
2125
2126 sp<HlsSampleDecryptor> sampleDecryptor = NULL;
2127 if (mSampleAesKeyItem != NULL) {
2128 ALOGV("extractAndQueueAccessUnits[%d] SampleAesKeyItem: Key: %s IV: %s",
2129 mSeqNumber,
2130 HlsSampleDecryptor::aesBlockToStr(mKeyData).c_str(),
2131 HlsSampleDecryptor::aesBlockToStr(mAESInitVec).c_str());
2132
2133 sampleDecryptor = new HlsSampleDecryptor(mSampleAesKeyItem);
2134 }
2135
2136 int frameId = 0;
2137
2138 size_t offset = 0;
2139 while (offset < buffer->size()) {
2140 const uint8_t *adtsHeader = buffer->data() + offset;
2141 if (buffer->size() <= offset+5) {
2142 ALOGV("buffer does not contain a complete header");
2143 return ERROR_MALFORMED;
2144 }
2145 // non-const pointer for decryption if needed
2146 uint8_t *adtsFrame = buffer->data() + offset;
2147
2148 unsigned aac_frame_length =
2149 ((adtsHeader[3] & 3) << 11)
2150 | (adtsHeader[4] << 3)
2151 | (adtsHeader[5] >> 5);
2152
2153 if (aac_frame_length == 0) {
2154 const uint8_t *id3Header = adtsHeader;
2155 if (!memcmp(id3Header, "ID3", 3)) {
2156 ID3 id3(id3Header, buffer->size() - offset, true);
2157 if (id3.isValid()) {
2158 offset += id3.rawSize();
2159 continue;
2160 };
2161 }
2162 return ERROR_MALFORMED;
2163 }
2164
2165 if (aac_frame_length > buffer->size() - offset) {
2166 return ERROR_MALFORMED;
2167 }
2168
2169 int64_t unitTimeUs = timeUs + numSamples * 1000000LL / sampleRate;
2170 offset += aac_frame_length;
2171
2172 // Each AAC frame encodes 1024 samples.
2173 numSamples += 1024;
2174
2175 if (mStartup) {
2176 int64_t startTimeUs = unitTimeUs;
2177 if (mStartTimeUsRelative) {
2178 startTimeUs -= mFirstTimeUs;
2179 if (startTimeUs < 0) {
2180 startTimeUs = 0;
2181 }
2182 }
2183 if (startTimeUs < mStartTimeUs) {
2184 continue;
2185 }
2186
2187 if (mStartTimeUsNotify != NULL) {
2188 mStartTimeUsNotify->setInt32("streamMask", LiveSession::STREAMTYPE_AUDIO);
2189 mStartup = false;
2190 }
2191 }
2192
2193 if (mStopParams != NULL) {
2194 int32_t discontinuitySeq;
2195 int64_t stopTimeUs;
2196 if (!mStopParams->findInt32("discontinuitySeq", &discontinuitySeq)
2197 || discontinuitySeq > mDiscontinuitySeq
2198 || !mStopParams->findInt64("timeUsAudio", &stopTimeUs)
2199 || (discontinuitySeq == mDiscontinuitySeq && unitTimeUs >= stopTimeUs)) {
2200 mStreamTypeMask = 0;
2201 mPacketSources.clear();
2202 return ERROR_OUT_OF_RANGE;
2203 }
2204 }
2205
2206 if (sampleDecryptor != NULL) {
2207 bool protection_absent = (adtsHeader[1] & 0x1);
2208 size_t headerSize = protection_absent ? 7 : 9;
2209 if (frameId == 0) {
2210 ALOGV("extractAndQueueAAC[%d] protection_absent %d (%02x) headerSize %zu",
2211 mSeqNumber, protection_absent, adtsHeader[1], headerSize);
2212 }
2213
2214 sampleDecryptor->processAAC(headerSize, adtsFrame, aac_frame_length);
2215 }
2216 frameId++;
2217
2218 sp<ABuffer> unit = new ABuffer(aac_frame_length);
2219 memcpy(unit->data(), adtsHeader, aac_frame_length);
2220
2221 unit->meta()->setInt64("timeUs", unitTimeUs);
2222 setAccessUnitProperties(unit, packetSource);
2223 packetSource->queueAccessUnit(unit);
2224 }
2225
2226 return OK;
2227 }
2228
updateDuration()2229 void PlaylistFetcher::updateDuration() {
2230 int64_t durationUs = 0LL;
2231 for (size_t index = 0; index < mPlaylist->size(); ++index) {
2232 sp<AMessage> itemMeta;
2233 CHECK(mPlaylist->itemAt(
2234 index, NULL /* uri */, &itemMeta));
2235
2236 int64_t itemDurationUs;
2237 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
2238
2239 durationUs += itemDurationUs;
2240 }
2241
2242 sp<AMessage> msg = mNotify->dup();
2243 msg->setInt32("what", kWhatDurationUpdate);
2244 msg->setInt64("durationUs", durationUs);
2245 msg->post();
2246 }
2247
updateTargetDuration()2248 void PlaylistFetcher::updateTargetDuration() {
2249 sp<AMessage> msg = mNotify->dup();
2250 msg->setInt32("what", kWhatTargetDurationUpdate);
2251 msg->setInt64("targetDurationUs", mPlaylist->getTargetDuration());
2252 msg->post();
2253 }
2254
2255 } // namespace android
2256