1 /*
2 * Copyright (C) 2012 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 //#define LOG_NDEBUG 0
18 #define LOG_TAG "PlaylistFetcher"
19 #include <utils/Log.h>
20 #include <utils/misc.h>
21
22 #include "PlaylistFetcher.h"
23 #include "HTTPDownloader.h"
24 #include "LiveSession.h"
25 #include "M3UParser.h"
26 #include "include/avc_utils.h"
27 #include "include/ID3.h"
28 #include "mpeg2ts/AnotherPacketSource.h"
29
30 #include <media/stagefright/foundation/ABitReader.h>
31 #include <media/stagefright/foundation/ABuffer.h>
32 #include <media/stagefright/foundation/ADebug.h>
33 #include <media/stagefright/MediaDefs.h>
34 #include <media/stagefright/MetaData.h>
35 #include <media/stagefright/Utils.h>
36
37 #include <ctype.h>
38 #include <inttypes.h>
39 #include <openssl/aes.h>
40
41 #define FLOGV(fmt, ...) ALOGV("[fetcher-%d] " fmt, mFetcherID, ##__VA_ARGS__)
42 #define FSLOGV(stream, fmt, ...) ALOGV("[fetcher-%d] [%s] " fmt, mFetcherID, \
43 LiveSession::getNameForStream(stream), ##__VA_ARGS__)
44
45 namespace android {
46
47 // static
48 const int64_t PlaylistFetcher::kMinBufferedDurationUs = 30000000ll;
49 const int64_t PlaylistFetcher::kMaxMonitorDelayUs = 3000000ll;
50 // LCM of 188 (size of a TS packet) & 1k works well
51 const int32_t PlaylistFetcher::kDownloadBlockSize = 47 * 1024;
52
53 struct PlaylistFetcher::DownloadState : public RefBase {
54 DownloadState();
55 void resetState();
56 bool hasSavedState() const;
57 void restoreState(
58 AString &uri,
59 sp<AMessage> &itemMeta,
60 sp<ABuffer> &buffer,
61 sp<ABuffer> &tsBuffer,
62 int32_t &firstSeqNumberInPlaylist,
63 int32_t &lastSeqNumberInPlaylist);
64 void saveState(
65 AString &uri,
66 sp<AMessage> &itemMeta,
67 sp<ABuffer> &buffer,
68 sp<ABuffer> &tsBuffer,
69 int32_t &firstSeqNumberInPlaylist,
70 int32_t &lastSeqNumberInPlaylist);
71
72 private:
73 bool mHasSavedState;
74 AString mUri;
75 sp<AMessage> mItemMeta;
76 sp<ABuffer> mBuffer;
77 sp<ABuffer> mTsBuffer;
78 int32_t mFirstSeqNumberInPlaylist;
79 int32_t mLastSeqNumberInPlaylist;
80 };
81
DownloadState()82 PlaylistFetcher::DownloadState::DownloadState() {
83 resetState();
84 }
85
hasSavedState() const86 bool PlaylistFetcher::DownloadState::hasSavedState() const {
87 return mHasSavedState;
88 }
89
resetState()90 void PlaylistFetcher::DownloadState::resetState() {
91 mHasSavedState = false;
92
93 mUri.clear();
94 mItemMeta = NULL;
95 mBuffer = NULL;
96 mTsBuffer = NULL;
97 mFirstSeqNumberInPlaylist = 0;
98 mLastSeqNumberInPlaylist = 0;
99 }
100
restoreState(AString & uri,sp<AMessage> & itemMeta,sp<ABuffer> & buffer,sp<ABuffer> & tsBuffer,int32_t & firstSeqNumberInPlaylist,int32_t & lastSeqNumberInPlaylist)101 void PlaylistFetcher::DownloadState::restoreState(
102 AString &uri,
103 sp<AMessage> &itemMeta,
104 sp<ABuffer> &buffer,
105 sp<ABuffer> &tsBuffer,
106 int32_t &firstSeqNumberInPlaylist,
107 int32_t &lastSeqNumberInPlaylist) {
108 if (!mHasSavedState) {
109 return;
110 }
111
112 uri = mUri;
113 itemMeta = mItemMeta;
114 buffer = mBuffer;
115 tsBuffer = mTsBuffer;
116 firstSeqNumberInPlaylist = mFirstSeqNumberInPlaylist;
117 lastSeqNumberInPlaylist = mLastSeqNumberInPlaylist;
118
119 resetState();
120 }
121
saveState(AString & uri,sp<AMessage> & itemMeta,sp<ABuffer> & buffer,sp<ABuffer> & tsBuffer,int32_t & firstSeqNumberInPlaylist,int32_t & lastSeqNumberInPlaylist)122 void PlaylistFetcher::DownloadState::saveState(
123 AString &uri,
124 sp<AMessage> &itemMeta,
125 sp<ABuffer> &buffer,
126 sp<ABuffer> &tsBuffer,
127 int32_t &firstSeqNumberInPlaylist,
128 int32_t &lastSeqNumberInPlaylist) {
129 mHasSavedState = true;
130
131 mUri = uri;
132 mItemMeta = itemMeta;
133 mBuffer = buffer;
134 mTsBuffer = tsBuffer;
135 mFirstSeqNumberInPlaylist = firstSeqNumberInPlaylist;
136 mLastSeqNumberInPlaylist = lastSeqNumberInPlaylist;
137 }
138
PlaylistFetcher(const sp<AMessage> & notify,const sp<LiveSession> & session,const char * uri,int32_t id,int32_t subtitleGeneration)139 PlaylistFetcher::PlaylistFetcher(
140 const sp<AMessage> ¬ify,
141 const sp<LiveSession> &session,
142 const char *uri,
143 int32_t id,
144 int32_t subtitleGeneration)
145 : mNotify(notify),
146 mSession(session),
147 mURI(uri),
148 mFetcherID(id),
149 mStreamTypeMask(0),
150 mStartTimeUs(-1ll),
151 mSegmentStartTimeUs(-1ll),
152 mDiscontinuitySeq(-1ll),
153 mStartTimeUsRelative(false),
154 mLastPlaylistFetchTimeUs(-1ll),
155 mPlaylistTimeUs(-1ll),
156 mSeqNumber(-1),
157 mNumRetries(0),
158 mStartup(true),
159 mIDRFound(false),
160 mSeekMode(LiveSession::kSeekModeExactPosition),
161 mTimeChangeSignaled(false),
162 mNextPTSTimeUs(-1ll),
163 mMonitorQueueGeneration(0),
164 mSubtitleGeneration(subtitleGeneration),
165 mLastDiscontinuitySeq(-1ll),
166 mRefreshState(INITIAL_MINIMUM_RELOAD_DELAY),
167 mFirstPTSValid(false),
168 mFirstTimeUs(-1ll),
169 mVideoBuffer(new AnotherPacketSource(NULL)),
170 mThresholdRatio(-1.0f),
171 mDownloadState(new DownloadState()),
172 mHasMetadata(false) {
173 memset(mPlaylistHash, 0, sizeof(mPlaylistHash));
174 mHTTPDownloader = mSession->getHTTPDownloader();
175 }
176
~PlaylistFetcher()177 PlaylistFetcher::~PlaylistFetcher() {
178 }
179
getFetcherID() const180 int32_t PlaylistFetcher::getFetcherID() const {
181 return mFetcherID;
182 }
183
getSegmentStartTimeUs(int32_t seqNumber) const184 int64_t PlaylistFetcher::getSegmentStartTimeUs(int32_t seqNumber) const {
185 CHECK(mPlaylist != NULL);
186
187 int32_t firstSeqNumberInPlaylist, lastSeqNumberInPlaylist;
188 mPlaylist->getSeqNumberRange(
189 &firstSeqNumberInPlaylist, &lastSeqNumberInPlaylist);
190
191 CHECK_GE(seqNumber, firstSeqNumberInPlaylist);
192 CHECK_LE(seqNumber, lastSeqNumberInPlaylist);
193
194 int64_t segmentStartUs = 0ll;
195 for (int32_t index = 0;
196 index < seqNumber - firstSeqNumberInPlaylist; ++index) {
197 sp<AMessage> itemMeta;
198 CHECK(mPlaylist->itemAt(
199 index, NULL /* uri */, &itemMeta));
200
201 int64_t itemDurationUs;
202 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
203
204 segmentStartUs += itemDurationUs;
205 }
206
207 return segmentStartUs;
208 }
209
getSegmentDurationUs(int32_t seqNumber) const210 int64_t PlaylistFetcher::getSegmentDurationUs(int32_t seqNumber) const {
211 CHECK(mPlaylist != NULL);
212
213 int32_t firstSeqNumberInPlaylist, lastSeqNumberInPlaylist;
214 mPlaylist->getSeqNumberRange(
215 &firstSeqNumberInPlaylist, &lastSeqNumberInPlaylist);
216
217 CHECK_GE(seqNumber, firstSeqNumberInPlaylist);
218 CHECK_LE(seqNumber, lastSeqNumberInPlaylist);
219
220 int32_t index = seqNumber - firstSeqNumberInPlaylist;
221 sp<AMessage> itemMeta;
222 CHECK(mPlaylist->itemAt(
223 index, NULL /* uri */, &itemMeta));
224
225 int64_t itemDurationUs;
226 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
227
228 return itemDurationUs;
229 }
230
delayUsToRefreshPlaylist() const231 int64_t PlaylistFetcher::delayUsToRefreshPlaylist() const {
232 int64_t nowUs = ALooper::GetNowUs();
233
234 if (mPlaylist == NULL || mLastPlaylistFetchTimeUs < 0ll) {
235 CHECK_EQ((int)mRefreshState, (int)INITIAL_MINIMUM_RELOAD_DELAY);
236 return 0ll;
237 }
238
239 if (mPlaylist->isComplete()) {
240 return (~0llu >> 1);
241 }
242
243 int64_t targetDurationUs = mPlaylist->getTargetDuration();
244
245 int64_t minPlaylistAgeUs;
246
247 switch (mRefreshState) {
248 case INITIAL_MINIMUM_RELOAD_DELAY:
249 {
250 size_t n = mPlaylist->size();
251 if (n > 0) {
252 sp<AMessage> itemMeta;
253 CHECK(mPlaylist->itemAt(n - 1, NULL /* uri */, &itemMeta));
254
255 int64_t itemDurationUs;
256 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
257
258 minPlaylistAgeUs = itemDurationUs;
259 break;
260 }
261
262 // fall through
263 }
264
265 case FIRST_UNCHANGED_RELOAD_ATTEMPT:
266 {
267 minPlaylistAgeUs = targetDurationUs / 2;
268 break;
269 }
270
271 case SECOND_UNCHANGED_RELOAD_ATTEMPT:
272 {
273 minPlaylistAgeUs = (targetDurationUs * 3) / 2;
274 break;
275 }
276
277 case THIRD_UNCHANGED_RELOAD_ATTEMPT:
278 {
279 minPlaylistAgeUs = targetDurationUs * 3;
280 break;
281 }
282
283 default:
284 TRESPASS();
285 break;
286 }
287
288 int64_t delayUs = mLastPlaylistFetchTimeUs + minPlaylistAgeUs - nowUs;
289 return delayUs > 0ll ? delayUs : 0ll;
290 }
291
decryptBuffer(size_t playlistIndex,const sp<ABuffer> & buffer,bool first)292 status_t PlaylistFetcher::decryptBuffer(
293 size_t playlistIndex, const sp<ABuffer> &buffer,
294 bool first) {
295 sp<AMessage> itemMeta;
296 bool found = false;
297 AString method;
298
299 for (ssize_t i = playlistIndex; i >= 0; --i) {
300 AString uri;
301 CHECK(mPlaylist->itemAt(i, &uri, &itemMeta));
302
303 if (itemMeta->findString("cipher-method", &method)) {
304 found = true;
305 break;
306 }
307 }
308
309 if (!found) {
310 method = "NONE";
311 }
312 buffer->meta()->setString("cipher-method", method.c_str());
313
314 if (method == "NONE") {
315 return OK;
316 } else if (!(method == "AES-128")) {
317 ALOGE("Unsupported cipher method '%s'", method.c_str());
318 return ERROR_UNSUPPORTED;
319 }
320
321 AString keyURI;
322 if (!itemMeta->findString("cipher-uri", &keyURI)) {
323 ALOGE("Missing key uri");
324 return ERROR_MALFORMED;
325 }
326
327 ssize_t index = mAESKeyForURI.indexOfKey(keyURI);
328
329 sp<ABuffer> key;
330 if (index >= 0) {
331 key = mAESKeyForURI.valueAt(index);
332 } else {
333 ssize_t err = mHTTPDownloader->fetchFile(keyURI.c_str(), &key);
334
335 if (err == ERROR_NOT_CONNECTED) {
336 return ERROR_NOT_CONNECTED;
337 } else if (err < 0) {
338 ALOGE("failed to fetch cipher key from '%s'.", keyURI.c_str());
339 return ERROR_IO;
340 } else if (key->size() != 16) {
341 ALOGE("key file '%s' wasn't 16 bytes in size.", keyURI.c_str());
342 return ERROR_MALFORMED;
343 }
344
345 mAESKeyForURI.add(keyURI, key);
346 }
347
348 AES_KEY aes_key;
349 if (AES_set_decrypt_key(key->data(), 128, &aes_key) != 0) {
350 ALOGE("failed to set AES decryption key.");
351 return UNKNOWN_ERROR;
352 }
353
354 size_t n = buffer->size();
355 if (!n) {
356 return OK;
357 }
358
359 if (n < 16 || n % 16) {
360 ALOGE("not enough or trailing bytes (%zu) in encrypted buffer", n);
361 return ERROR_MALFORMED;
362 }
363
364 if (first) {
365 // If decrypting the first block in a file, read the iv from the manifest
366 // or derive the iv from the file's sequence number.
367
368 AString iv;
369 if (itemMeta->findString("cipher-iv", &iv)) {
370 if ((!iv.startsWith("0x") && !iv.startsWith("0X"))
371 || iv.size() > 16 * 2 + 2) {
372 ALOGE("malformed cipher IV '%s'.", iv.c_str());
373 return ERROR_MALFORMED;
374 }
375
376 while (iv.size() < 16 * 2 + 2) {
377 iv.insert("0", 1, 2);
378 }
379
380 memset(mAESInitVec, 0, sizeof(mAESInitVec));
381 for (size_t i = 0; i < 16; ++i) {
382 char c1 = tolower(iv.c_str()[2 + 2 * i]);
383 char c2 = tolower(iv.c_str()[3 + 2 * i]);
384 if (!isxdigit(c1) || !isxdigit(c2)) {
385 ALOGE("malformed cipher IV '%s'.", iv.c_str());
386 return ERROR_MALFORMED;
387 }
388 uint8_t nibble1 = isdigit(c1) ? c1 - '0' : c1 - 'a' + 10;
389 uint8_t nibble2 = isdigit(c2) ? c2 - '0' : c2 - 'a' + 10;
390
391 mAESInitVec[i] = nibble1 << 4 | nibble2;
392 }
393 } else {
394 memset(mAESInitVec, 0, sizeof(mAESInitVec));
395 mAESInitVec[15] = mSeqNumber & 0xff;
396 mAESInitVec[14] = (mSeqNumber >> 8) & 0xff;
397 mAESInitVec[13] = (mSeqNumber >> 16) & 0xff;
398 mAESInitVec[12] = (mSeqNumber >> 24) & 0xff;
399 }
400 }
401
402 AES_cbc_encrypt(
403 buffer->data(), buffer->data(), buffer->size(),
404 &aes_key, mAESInitVec, AES_DECRYPT);
405
406 return OK;
407 }
408
checkDecryptPadding(const sp<ABuffer> & buffer)409 status_t PlaylistFetcher::checkDecryptPadding(const sp<ABuffer> &buffer) {
410 AString method;
411 CHECK(buffer->meta()->findString("cipher-method", &method));
412 if (method == "NONE") {
413 return OK;
414 }
415
416 uint8_t padding = 0;
417 if (buffer->size() > 0) {
418 padding = buffer->data()[buffer->size() - 1];
419 }
420
421 if (padding > 16) {
422 return ERROR_MALFORMED;
423 }
424
425 for (size_t i = buffer->size() - padding; i < padding; i++) {
426 if (buffer->data()[i] != padding) {
427 return ERROR_MALFORMED;
428 }
429 }
430
431 buffer->setRange(buffer->offset(), buffer->size() - padding);
432 return OK;
433 }
434
postMonitorQueue(int64_t delayUs,int64_t minDelayUs)435 void PlaylistFetcher::postMonitorQueue(int64_t delayUs, int64_t minDelayUs) {
436 int64_t maxDelayUs = delayUsToRefreshPlaylist();
437 if (maxDelayUs < minDelayUs) {
438 maxDelayUs = minDelayUs;
439 }
440 if (delayUs > maxDelayUs) {
441 FLOGV("Need to refresh playlist in %lld", (long long)maxDelayUs);
442 delayUs = maxDelayUs;
443 }
444 sp<AMessage> msg = new AMessage(kWhatMonitorQueue, this);
445 msg->setInt32("generation", mMonitorQueueGeneration);
446 msg->post(delayUs);
447 }
448
cancelMonitorQueue()449 void PlaylistFetcher::cancelMonitorQueue() {
450 ++mMonitorQueueGeneration;
451 }
452
setStoppingThreshold(float thresholdRatio,bool disconnect)453 void PlaylistFetcher::setStoppingThreshold(float thresholdRatio, bool disconnect) {
454 {
455 AutoMutex _l(mThresholdLock);
456 mThresholdRatio = thresholdRatio;
457 }
458 if (disconnect) {
459 mHTTPDownloader->disconnect();
460 }
461 }
462
resetStoppingThreshold(bool disconnect)463 void PlaylistFetcher::resetStoppingThreshold(bool disconnect) {
464 {
465 AutoMutex _l(mThresholdLock);
466 mThresholdRatio = -1.0f;
467 }
468 if (disconnect) {
469 mHTTPDownloader->disconnect();
470 } else {
471 // allow reconnect
472 mHTTPDownloader->reconnect();
473 }
474 }
475
getStoppingThreshold()476 float PlaylistFetcher::getStoppingThreshold() {
477 AutoMutex _l(mThresholdLock);
478 return mThresholdRatio;
479 }
480
startAsync(const sp<AnotherPacketSource> & audioSource,const sp<AnotherPacketSource> & videoSource,const sp<AnotherPacketSource> & subtitleSource,const sp<AnotherPacketSource> & metadataSource,int64_t startTimeUs,int64_t segmentStartTimeUs,int32_t startDiscontinuitySeq,LiveSession::SeekMode seekMode)481 void PlaylistFetcher::startAsync(
482 const sp<AnotherPacketSource> &audioSource,
483 const sp<AnotherPacketSource> &videoSource,
484 const sp<AnotherPacketSource> &subtitleSource,
485 const sp<AnotherPacketSource> &metadataSource,
486 int64_t startTimeUs,
487 int64_t segmentStartTimeUs,
488 int32_t startDiscontinuitySeq,
489 LiveSession::SeekMode seekMode) {
490 sp<AMessage> msg = new AMessage(kWhatStart, this);
491
492 uint32_t streamTypeMask = 0ul;
493
494 if (audioSource != NULL) {
495 msg->setPointer("audioSource", audioSource.get());
496 streamTypeMask |= LiveSession::STREAMTYPE_AUDIO;
497 }
498
499 if (videoSource != NULL) {
500 msg->setPointer("videoSource", videoSource.get());
501 streamTypeMask |= LiveSession::STREAMTYPE_VIDEO;
502 }
503
504 if (subtitleSource != NULL) {
505 msg->setPointer("subtitleSource", subtitleSource.get());
506 streamTypeMask |= LiveSession::STREAMTYPE_SUBTITLES;
507 }
508
509 if (metadataSource != NULL) {
510 msg->setPointer("metadataSource", metadataSource.get());
511 // metadataSource does not affect streamTypeMask.
512 }
513
514 msg->setInt32("streamTypeMask", streamTypeMask);
515 msg->setInt64("startTimeUs", startTimeUs);
516 msg->setInt64("segmentStartTimeUs", segmentStartTimeUs);
517 msg->setInt32("startDiscontinuitySeq", startDiscontinuitySeq);
518 msg->setInt32("seekMode", seekMode);
519 msg->post();
520 }
521
522 /*
523 * pauseAsync
524 *
525 * threshold: 0.0f - pause after current fetch block (default 47Kbytes)
526 * -1.0f - pause after finishing current segment
527 * 0.0~1.0f - pause if remaining of current segment exceeds threshold
528 */
pauseAsync(float thresholdRatio,bool disconnect)529 void PlaylistFetcher::pauseAsync(
530 float thresholdRatio, bool disconnect) {
531 setStoppingThreshold(thresholdRatio, disconnect);
532
533 (new AMessage(kWhatPause, this))->post();
534 }
535
stopAsync(bool clear)536 void PlaylistFetcher::stopAsync(bool clear) {
537 setStoppingThreshold(0.0f, true /* disconncect */);
538
539 sp<AMessage> msg = new AMessage(kWhatStop, this);
540 msg->setInt32("clear", clear);
541 msg->post();
542 }
543
resumeUntilAsync(const sp<AMessage> & params)544 void PlaylistFetcher::resumeUntilAsync(const sp<AMessage> ¶ms) {
545 FLOGV("resumeUntilAsync: params=%s", params->debugString().c_str());
546
547 AMessage* msg = new AMessage(kWhatResumeUntil, this);
548 msg->setMessage("params", params);
549 msg->post();
550 }
551
fetchPlaylistAsync()552 void PlaylistFetcher::fetchPlaylistAsync() {
553 (new AMessage(kWhatFetchPlaylist, this))->post();
554 }
555
onMessageReceived(const sp<AMessage> & msg)556 void PlaylistFetcher::onMessageReceived(const sp<AMessage> &msg) {
557 switch (msg->what()) {
558 case kWhatStart:
559 {
560 status_t err = onStart(msg);
561
562 sp<AMessage> notify = mNotify->dup();
563 notify->setInt32("what", kWhatStarted);
564 notify->setInt32("err", err);
565 notify->post();
566 break;
567 }
568
569 case kWhatPause:
570 {
571 onPause();
572
573 sp<AMessage> notify = mNotify->dup();
574 notify->setInt32("what", kWhatPaused);
575 notify->setInt32("seekMode",
576 mDownloadState->hasSavedState()
577 ? LiveSession::kSeekModeNextSample
578 : LiveSession::kSeekModeNextSegment);
579 notify->post();
580 break;
581 }
582
583 case kWhatStop:
584 {
585 onStop(msg);
586
587 sp<AMessage> notify = mNotify->dup();
588 notify->setInt32("what", kWhatStopped);
589 notify->post();
590 break;
591 }
592
593 case kWhatFetchPlaylist:
594 {
595 bool unchanged;
596 sp<M3UParser> playlist = mHTTPDownloader->fetchPlaylist(
597 mURI.c_str(), NULL /* curPlaylistHash */, &unchanged);
598
599 sp<AMessage> notify = mNotify->dup();
600 notify->setInt32("what", kWhatPlaylistFetched);
601 notify->setObject("playlist", playlist);
602 notify->post();
603 break;
604 }
605
606 case kWhatMonitorQueue:
607 case kWhatDownloadNext:
608 {
609 int32_t generation;
610 CHECK(msg->findInt32("generation", &generation));
611
612 if (generation != mMonitorQueueGeneration) {
613 // Stale event
614 break;
615 }
616
617 if (msg->what() == kWhatMonitorQueue) {
618 onMonitorQueue();
619 } else {
620 onDownloadNext();
621 }
622 break;
623 }
624
625 case kWhatResumeUntil:
626 {
627 onResumeUntil(msg);
628 break;
629 }
630
631 default:
632 TRESPASS();
633 }
634 }
635
onStart(const sp<AMessage> & msg)636 status_t PlaylistFetcher::onStart(const sp<AMessage> &msg) {
637 mPacketSources.clear();
638 mStopParams.clear();
639 mStartTimeUsNotify = mNotify->dup();
640 mStartTimeUsNotify->setInt32("what", kWhatStartedAt);
641 mStartTimeUsNotify->setString("uri", mURI);
642
643 uint32_t streamTypeMask;
644 CHECK(msg->findInt32("streamTypeMask", (int32_t *)&streamTypeMask));
645
646 int64_t startTimeUs;
647 int64_t segmentStartTimeUs;
648 int32_t startDiscontinuitySeq;
649 int32_t seekMode;
650 CHECK(msg->findInt64("startTimeUs", &startTimeUs));
651 CHECK(msg->findInt64("segmentStartTimeUs", &segmentStartTimeUs));
652 CHECK(msg->findInt32("startDiscontinuitySeq", &startDiscontinuitySeq));
653 CHECK(msg->findInt32("seekMode", &seekMode));
654
655 if (streamTypeMask & LiveSession::STREAMTYPE_AUDIO) {
656 void *ptr;
657 CHECK(msg->findPointer("audioSource", &ptr));
658
659 mPacketSources.add(
660 LiveSession::STREAMTYPE_AUDIO,
661 static_cast<AnotherPacketSource *>(ptr));
662 }
663
664 if (streamTypeMask & LiveSession::STREAMTYPE_VIDEO) {
665 void *ptr;
666 CHECK(msg->findPointer("videoSource", &ptr));
667
668 mPacketSources.add(
669 LiveSession::STREAMTYPE_VIDEO,
670 static_cast<AnotherPacketSource *>(ptr));
671 }
672
673 if (streamTypeMask & LiveSession::STREAMTYPE_SUBTITLES) {
674 void *ptr;
675 CHECK(msg->findPointer("subtitleSource", &ptr));
676
677 mPacketSources.add(
678 LiveSession::STREAMTYPE_SUBTITLES,
679 static_cast<AnotherPacketSource *>(ptr));
680 }
681
682 void *ptr;
683 // metadataSource is not part of streamTypeMask
684 if ((streamTypeMask & (LiveSession::STREAMTYPE_AUDIO | LiveSession::STREAMTYPE_VIDEO))
685 && msg->findPointer("metadataSource", &ptr)) {
686 mPacketSources.add(
687 LiveSession::STREAMTYPE_METADATA,
688 static_cast<AnotherPacketSource *>(ptr));
689 }
690
691 mStreamTypeMask = streamTypeMask;
692
693 mSegmentStartTimeUs = segmentStartTimeUs;
694
695 if (startDiscontinuitySeq >= 0) {
696 mDiscontinuitySeq = startDiscontinuitySeq;
697 }
698
699 mRefreshState = INITIAL_MINIMUM_RELOAD_DELAY;
700 mSeekMode = (LiveSession::SeekMode) seekMode;
701
702 if (startTimeUs >= 0 || mSeekMode == LiveSession::kSeekModeNextSample) {
703 mStartup = true;
704 mIDRFound = false;
705 mVideoBuffer->clear();
706 }
707
708 if (startTimeUs >= 0) {
709 mStartTimeUs = startTimeUs;
710 mFirstPTSValid = false;
711 mSeqNumber = -1;
712 mTimeChangeSignaled = false;
713 mDownloadState->resetState();
714 }
715
716 postMonitorQueue();
717
718 return OK;
719 }
720
onPause()721 void PlaylistFetcher::onPause() {
722 cancelMonitorQueue();
723 mLastDiscontinuitySeq = mDiscontinuitySeq;
724
725 resetStoppingThreshold(false /* disconnect */);
726 }
727
onStop(const sp<AMessage> & msg)728 void PlaylistFetcher::onStop(const sp<AMessage> &msg) {
729 cancelMonitorQueue();
730
731 int32_t clear;
732 CHECK(msg->findInt32("clear", &clear));
733 if (clear) {
734 for (size_t i = 0; i < mPacketSources.size(); i++) {
735 sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
736 packetSource->clear();
737 }
738 }
739
740 mDownloadState->resetState();
741 mPacketSources.clear();
742 mStreamTypeMask = 0;
743
744 resetStoppingThreshold(true /* disconnect */);
745 }
746
747 // Resume until we have reached the boundary timestamps listed in `msg`; when
748 // the remaining time is too short (within a resume threshold) stop immediately
749 // instead.
onResumeUntil(const sp<AMessage> & msg)750 status_t PlaylistFetcher::onResumeUntil(const sp<AMessage> &msg) {
751 sp<AMessage> params;
752 CHECK(msg->findMessage("params", ¶ms));
753
754 mStopParams = params;
755 onDownloadNext();
756
757 return OK;
758 }
759
notifyStopReached()760 void PlaylistFetcher::notifyStopReached() {
761 sp<AMessage> notify = mNotify->dup();
762 notify->setInt32("what", kWhatStopReached);
763 notify->post();
764 }
765
notifyError(status_t err)766 void PlaylistFetcher::notifyError(status_t err) {
767 sp<AMessage> notify = mNotify->dup();
768 notify->setInt32("what", kWhatError);
769 notify->setInt32("err", err);
770 notify->post();
771 }
772
queueDiscontinuity(ATSParser::DiscontinuityType type,const sp<AMessage> & extra)773 void PlaylistFetcher::queueDiscontinuity(
774 ATSParser::DiscontinuityType type, const sp<AMessage> &extra) {
775 for (size_t i = 0; i < mPacketSources.size(); ++i) {
776 // do not discard buffer upon #EXT-X-DISCONTINUITY tag
777 // (seek will discard buffer by abandoning old fetchers)
778 mPacketSources.valueAt(i)->queueDiscontinuity(
779 type, extra, false /* discard */);
780 }
781 }
782
onMonitorQueue()783 void PlaylistFetcher::onMonitorQueue() {
784 // in the middle of an unfinished download, delay
785 // playlist refresh as it'll change seq numbers
786 if (!mDownloadState->hasSavedState()) {
787 refreshPlaylist();
788 }
789
790 int64_t targetDurationUs = kMinBufferedDurationUs;
791 if (mPlaylist != NULL) {
792 targetDurationUs = mPlaylist->getTargetDuration();
793 }
794
795 int64_t bufferedDurationUs = 0ll;
796 status_t finalResult = OK;
797 if (mStreamTypeMask == LiveSession::STREAMTYPE_SUBTITLES) {
798 sp<AnotherPacketSource> packetSource =
799 mPacketSources.valueFor(LiveSession::STREAMTYPE_SUBTITLES);
800
801 bufferedDurationUs =
802 packetSource->getBufferedDurationUs(&finalResult);
803 } else {
804 // Use min stream duration, but ignore streams that never have any packet
805 // enqueued to prevent us from waiting on a non-existent stream;
806 // when we cannot make out from the manifest what streams are included in
807 // a playlist we might assume extra streams.
808 bufferedDurationUs = -1ll;
809 for (size_t i = 0; i < mPacketSources.size(); ++i) {
810 if ((mStreamTypeMask & mPacketSources.keyAt(i)) == 0
811 || mPacketSources[i]->getLatestEnqueuedMeta() == NULL) {
812 continue;
813 }
814
815 int64_t bufferedStreamDurationUs =
816 mPacketSources.valueAt(i)->getBufferedDurationUs(&finalResult);
817
818 FSLOGV(mPacketSources.keyAt(i), "buffered %lld", (long long)bufferedStreamDurationUs);
819
820 if (bufferedDurationUs == -1ll
821 || bufferedStreamDurationUs < bufferedDurationUs) {
822 bufferedDurationUs = bufferedStreamDurationUs;
823 }
824 }
825 if (bufferedDurationUs == -1ll) {
826 bufferedDurationUs = 0ll;
827 }
828 }
829
830 if (finalResult == OK && bufferedDurationUs < kMinBufferedDurationUs) {
831 FLOGV("monitoring, buffered=%lld < %lld",
832 (long long)bufferedDurationUs, (long long)kMinBufferedDurationUs);
833
834 // delay the next download slightly; hopefully this gives other concurrent fetchers
835 // a better chance to run.
836 // onDownloadNext();
837 sp<AMessage> msg = new AMessage(kWhatDownloadNext, this);
838 msg->setInt32("generation", mMonitorQueueGeneration);
839 msg->post(1000l);
840 } else {
841 // We'd like to maintain buffering above durationToBufferUs, so try
842 // again when buffer just about to go below durationToBufferUs
843 // (or after targetDurationUs / 2, whichever is smaller).
844 int64_t delayUs = bufferedDurationUs - kMinBufferedDurationUs + 1000000ll;
845 if (delayUs > targetDurationUs / 2) {
846 delayUs = targetDurationUs / 2;
847 }
848
849 FLOGV("pausing for %lld, buffered=%lld > %lld",
850 (long long)delayUs,
851 (long long)bufferedDurationUs,
852 (long long)kMinBufferedDurationUs);
853
854 postMonitorQueue(delayUs);
855 }
856 }
857
refreshPlaylist()858 status_t PlaylistFetcher::refreshPlaylist() {
859 if (delayUsToRefreshPlaylist() <= 0) {
860 bool unchanged;
861 sp<M3UParser> playlist = mHTTPDownloader->fetchPlaylist(
862 mURI.c_str(), mPlaylistHash, &unchanged);
863
864 if (playlist == NULL) {
865 if (unchanged) {
866 // We succeeded in fetching the playlist, but it was
867 // unchanged from the last time we tried.
868
869 if (mRefreshState != THIRD_UNCHANGED_RELOAD_ATTEMPT) {
870 mRefreshState = (RefreshState)(mRefreshState + 1);
871 }
872 } else {
873 ALOGE("failed to load playlist at url '%s'", uriDebugString(mURI).c_str());
874 return ERROR_IO;
875 }
876 } else {
877 mRefreshState = INITIAL_MINIMUM_RELOAD_DELAY;
878 mPlaylist = playlist;
879
880 if (mPlaylist->isComplete() || mPlaylist->isEvent()) {
881 updateDuration();
882 }
883 // Notify LiveSession to use target-duration based buffering level
884 // for up/down switch. Default LiveSession::kUpSwitchMark may not
885 // be reachable for live streams, as our max buffering amount is
886 // limited to 3 segments.
887 if (!mPlaylist->isComplete()) {
888 updateTargetDuration();
889 }
890 mPlaylistTimeUs = ALooper::GetNowUs();
891 }
892
893 mLastPlaylistFetchTimeUs = ALooper::GetNowUs();
894 }
895 return OK;
896 }
897
898 // static
bufferStartsWithTsSyncByte(const sp<ABuffer> & buffer)899 bool PlaylistFetcher::bufferStartsWithTsSyncByte(const sp<ABuffer>& buffer) {
900 return buffer->size() > 0 && buffer->data()[0] == 0x47;
901 }
902
shouldPauseDownload()903 bool PlaylistFetcher::shouldPauseDownload() {
904 if (mStreamTypeMask == LiveSession::STREAMTYPE_SUBTITLES) {
905 // doesn't apply to subtitles
906 return false;
907 }
908
909 // Calculate threshold to abort current download
910 float thresholdRatio = getStoppingThreshold();
911
912 if (thresholdRatio < 0.0f) {
913 // never abort
914 return false;
915 } else if (thresholdRatio == 0.0f) {
916 // immediately abort
917 return true;
918 }
919
920 // now we have a positive thresholdUs, abort if remaining
921 // portion to download is over that threshold.
922 if (mSegmentFirstPTS < 0) {
923 // this means we haven't even find the first access unit,
924 // abort now as we must be very far away from the end.
925 return true;
926 }
927 int64_t lastEnqueueUs = mSegmentFirstPTS;
928 for (size_t i = 0; i < mPacketSources.size(); ++i) {
929 if ((mStreamTypeMask & mPacketSources.keyAt(i)) == 0) {
930 continue;
931 }
932 sp<AMessage> meta = mPacketSources[i]->getLatestEnqueuedMeta();
933 int32_t type;
934 if (meta == NULL || meta->findInt32("discontinuity", &type)) {
935 continue;
936 }
937 int64_t tmpUs;
938 CHECK(meta->findInt64("timeUs", &tmpUs));
939 if (tmpUs > lastEnqueueUs) {
940 lastEnqueueUs = tmpUs;
941 }
942 }
943 lastEnqueueUs -= mSegmentFirstPTS;
944
945 int64_t targetDurationUs = mPlaylist->getTargetDuration();
946 int64_t thresholdUs = thresholdRatio * targetDurationUs;
947
948 FLOGV("%spausing now, thresholdUs %lld, remaining %lld",
949 targetDurationUs - lastEnqueueUs > thresholdUs ? "" : "not ",
950 (long long)thresholdUs,
951 (long long)(targetDurationUs - lastEnqueueUs));
952
953 if (targetDurationUs - lastEnqueueUs > thresholdUs) {
954 return true;
955 }
956 return false;
957 }
958
initDownloadState(AString & uri,sp<AMessage> & itemMeta,int32_t & firstSeqNumberInPlaylist,int32_t & lastSeqNumberInPlaylist)959 bool PlaylistFetcher::initDownloadState(
960 AString &uri,
961 sp<AMessage> &itemMeta,
962 int32_t &firstSeqNumberInPlaylist,
963 int32_t &lastSeqNumberInPlaylist) {
964 status_t err = refreshPlaylist();
965 firstSeqNumberInPlaylist = 0;
966 lastSeqNumberInPlaylist = 0;
967 bool discontinuity = false;
968
969 if (mPlaylist != NULL) {
970 mPlaylist->getSeqNumberRange(
971 &firstSeqNumberInPlaylist, &lastSeqNumberInPlaylist);
972
973 if (mDiscontinuitySeq < 0) {
974 mDiscontinuitySeq = mPlaylist->getDiscontinuitySeq();
975 }
976 }
977
978 mSegmentFirstPTS = -1ll;
979
980 if (mPlaylist != NULL && mSeqNumber < 0) {
981 CHECK_GE(mStartTimeUs, 0ll);
982
983 if (mSegmentStartTimeUs < 0) {
984 if (!mPlaylist->isComplete() && !mPlaylist->isEvent()) {
985 // If this is a live session, start 3 segments from the end on connect
986 mSeqNumber = lastSeqNumberInPlaylist - 3;
987 if (mSeqNumber < firstSeqNumberInPlaylist) {
988 mSeqNumber = firstSeqNumberInPlaylist;
989 }
990 } else {
991 // When seeking mSegmentStartTimeUs is unavailable (< 0), we
992 // use mStartTimeUs (client supplied timestamp) to determine both start segment
993 // and relative position inside a segment
994 mSeqNumber = getSeqNumberForTime(mStartTimeUs);
995 mStartTimeUs -= getSegmentStartTimeUs(mSeqNumber);
996 }
997 mStartTimeUsRelative = true;
998 FLOGV("Initial sequence number for time %lld is %d from (%d .. %d)",
999 (long long)mStartTimeUs, mSeqNumber, firstSeqNumberInPlaylist,
1000 lastSeqNumberInPlaylist);
1001 } else {
1002 // When adapting or track switching, mSegmentStartTimeUs (relative
1003 // to media time 0) is used to determine the start segment; mStartTimeUs (absolute
1004 // timestamps coming from the media container) is used to determine the position
1005 // inside a segments.
1006 if (mStreamTypeMask != LiveSession::STREAMTYPE_SUBTITLES
1007 && mSeekMode != LiveSession::kSeekModeNextSample) {
1008 // avoid double fetch/decode
1009 // Use (mSegmentStartTimeUs + 1/2 * targetDurationUs) to search
1010 // for the starting segment in new variant.
1011 // If the two variants' segments are aligned, this gives the
1012 // next segment. If they're not aligned, this gives the segment
1013 // that overlaps no more than 1/2 * targetDurationUs.
1014 mSeqNumber = getSeqNumberForTime(mSegmentStartTimeUs
1015 + mPlaylist->getTargetDuration() / 2);
1016 } else {
1017 mSeqNumber = getSeqNumberForTime(mSegmentStartTimeUs);
1018 }
1019 ssize_t minSeq = getSeqNumberForDiscontinuity(mDiscontinuitySeq);
1020 if (mSeqNumber < minSeq) {
1021 mSeqNumber = minSeq;
1022 }
1023
1024 if (mSeqNumber < firstSeqNumberInPlaylist) {
1025 mSeqNumber = firstSeqNumberInPlaylist;
1026 }
1027
1028 if (mSeqNumber > lastSeqNumberInPlaylist) {
1029 mSeqNumber = lastSeqNumberInPlaylist;
1030 }
1031 FLOGV("Initial sequence number is %d from (%d .. %d)",
1032 mSeqNumber, firstSeqNumberInPlaylist,
1033 lastSeqNumberInPlaylist);
1034 }
1035 }
1036
1037 // if mPlaylist is NULL then err must be non-OK; but the other way around might not be true
1038 if (mSeqNumber < firstSeqNumberInPlaylist
1039 || mSeqNumber > lastSeqNumberInPlaylist
1040 || err != OK) {
1041 if ((err != OK || !mPlaylist->isComplete()) && mNumRetries < kMaxNumRetries) {
1042 ++mNumRetries;
1043
1044 if (mSeqNumber > lastSeqNumberInPlaylist || err != OK) {
1045 // make sure we reach this retry logic on refresh failures
1046 // by adding an err != OK clause to all enclosing if's.
1047
1048 // refresh in increasing fraction (1/2, 1/3, ...) of the
1049 // playlist's target duration or 3 seconds, whichever is less
1050 int64_t delayUs = kMaxMonitorDelayUs;
1051 if (mPlaylist != NULL) {
1052 delayUs = mPlaylist->size() * mPlaylist->getTargetDuration()
1053 / (1 + mNumRetries);
1054 }
1055 if (delayUs > kMaxMonitorDelayUs) {
1056 delayUs = kMaxMonitorDelayUs;
1057 }
1058 FLOGV("sequence number high: %d from (%d .. %d), "
1059 "monitor in %lld (retry=%d)",
1060 mSeqNumber, firstSeqNumberInPlaylist,
1061 lastSeqNumberInPlaylist, (long long)delayUs, mNumRetries);
1062 postMonitorQueue(delayUs);
1063 return false;
1064 }
1065
1066 if (err != OK) {
1067 notifyError(err);
1068 return false;
1069 }
1070
1071 // we've missed the boat, let's start 3 segments prior to the latest sequence
1072 // number available and signal a discontinuity.
1073
1074 ALOGI("We've missed the boat, restarting playback."
1075 " mStartup=%d, was looking for %d in %d-%d",
1076 mStartup, mSeqNumber, firstSeqNumberInPlaylist,
1077 lastSeqNumberInPlaylist);
1078 if (mStopParams != NULL) {
1079 // we should have kept on fetching until we hit the boundaries in mStopParams,
1080 // but since the segments we are supposed to fetch have already rolled off
1081 // the playlist, i.e. we have already missed the boat, we inevitably have to
1082 // skip.
1083 notifyStopReached();
1084 return false;
1085 }
1086 mSeqNumber = lastSeqNumberInPlaylist - 3;
1087 if (mSeqNumber < firstSeqNumberInPlaylist) {
1088 mSeqNumber = firstSeqNumberInPlaylist;
1089 }
1090 discontinuity = true;
1091
1092 // fall through
1093 } else {
1094 if (mPlaylist != NULL) {
1095 if (mSeqNumber >= firstSeqNumberInPlaylist + (int32_t)mPlaylist->size()
1096 && !mPlaylist->isComplete()) {
1097 // Live playlists
1098 ALOGW("sequence number %d not yet available", mSeqNumber);
1099 postMonitorQueue(delayUsToRefreshPlaylist());
1100 return false;
1101 }
1102 ALOGE("Cannot find sequence number %d in playlist "
1103 "(contains %d - %d)",
1104 mSeqNumber, firstSeqNumberInPlaylist,
1105 firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1);
1106
1107 if (mTSParser != NULL) {
1108 mTSParser->signalEOS(ERROR_END_OF_STREAM);
1109 // Use an empty buffer; we don't have any new data, just want to extract
1110 // potential new access units after flush. Reset mSeqNumber to
1111 // lastSeqNumberInPlaylist such that we set the correct access unit
1112 // properties in extractAndQueueAccessUnitsFromTs.
1113 sp<ABuffer> buffer = new ABuffer(0);
1114 mSeqNumber = lastSeqNumberInPlaylist;
1115 extractAndQueueAccessUnitsFromTs(buffer);
1116 }
1117 notifyError(ERROR_END_OF_STREAM);
1118 } else {
1119 // It's possible that we were never able to download the playlist.
1120 // In this case we should notify error, instead of EOS, as EOS during
1121 // prepare means we succeeded in downloading everything.
1122 ALOGE("Failed to download playlist!");
1123 notifyError(ERROR_IO);
1124 }
1125
1126 return false;
1127 }
1128 }
1129
1130 mNumRetries = 0;
1131
1132 CHECK(mPlaylist->itemAt(
1133 mSeqNumber - firstSeqNumberInPlaylist,
1134 &uri,
1135 &itemMeta));
1136
1137 CHECK(itemMeta->findInt32("discontinuity-sequence", &mDiscontinuitySeq));
1138
1139 int32_t val;
1140 if (itemMeta->findInt32("discontinuity", &val) && val != 0) {
1141 discontinuity = true;
1142 } else if (mLastDiscontinuitySeq >= 0
1143 && mDiscontinuitySeq != mLastDiscontinuitySeq) {
1144 // Seek jumped to a new discontinuity sequence. We need to signal
1145 // a format change to decoder. Decoder needs to shutdown and be
1146 // created again if seamless format change is unsupported.
1147 FLOGV("saw discontinuity: mStartup %d, mLastDiscontinuitySeq %d, "
1148 "mDiscontinuitySeq %d, mStartTimeUs %lld",
1149 mStartup, mLastDiscontinuitySeq, mDiscontinuitySeq, (long long)mStartTimeUs);
1150 discontinuity = true;
1151 }
1152 mLastDiscontinuitySeq = -1;
1153
1154 // decrypt a junk buffer to prefetch key; since a session uses only one http connection,
1155 // this avoids interleaved connections to the key and segment file.
1156 {
1157 sp<ABuffer> junk = new ABuffer(16);
1158 junk->setRange(0, 16);
1159 status_t err = decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, junk,
1160 true /* first */);
1161 if (err == ERROR_NOT_CONNECTED) {
1162 return false;
1163 } else if (err != OK) {
1164 notifyError(err);
1165 return false;
1166 }
1167 }
1168
1169 if ((mStartup && !mTimeChangeSignaled) || discontinuity) {
1170 // We need to signal a time discontinuity to ATSParser on the
1171 // first segment after start, or on a discontinuity segment.
1172 // Setting mNextPTSTimeUs informs extractAndQueueAccessUnitsXX()
1173 // to send the time discontinuity.
1174 if (mPlaylist->isComplete() || mPlaylist->isEvent()) {
1175 // If this was a live event this made no sense since
1176 // we don't have access to all the segment before the current
1177 // one.
1178 mNextPTSTimeUs = getSegmentStartTimeUs(mSeqNumber);
1179 }
1180
1181 // Setting mTimeChangeSignaled to true, so that if start time
1182 // searching goes into 2nd segment (without a discontinuity),
1183 // we don't reset time again. It causes corruption when pending
1184 // data in ATSParser is cleared.
1185 mTimeChangeSignaled = true;
1186 }
1187
1188 if (discontinuity) {
1189 ALOGI("queueing discontinuity (explicit=%d)", discontinuity);
1190
1191 // Signal a format discontinuity to ATSParser to clear partial data
1192 // from previous streams. Not doing this causes bitstream corruption.
1193 if (mTSParser != NULL) {
1194 mTSParser.clear();
1195 }
1196
1197 queueDiscontinuity(
1198 ATSParser::DISCONTINUITY_FORMAT_ONLY,
1199 NULL /* extra */);
1200
1201 if (mStartup && mStartTimeUsRelative && mFirstPTSValid) {
1202 // This means we guessed mStartTimeUs to be in the previous
1203 // segment (likely very close to the end), but either video or
1204 // audio has not found start by the end of that segment.
1205 //
1206 // If this new segment is not a discontinuity, keep searching.
1207 //
1208 // If this new segment even got a discontinuity marker, just
1209 // set mStartTimeUs=0, and take all samples from now on.
1210 mStartTimeUs = 0;
1211 mFirstPTSValid = false;
1212 mIDRFound = false;
1213 mVideoBuffer->clear();
1214 }
1215 }
1216
1217 FLOGV("fetching segment %d from (%d .. %d)",
1218 mSeqNumber, firstSeqNumberInPlaylist, lastSeqNumberInPlaylist);
1219 return true;
1220 }
1221
onDownloadNext()1222 void PlaylistFetcher::onDownloadNext() {
1223 AString uri;
1224 sp<AMessage> itemMeta;
1225 sp<ABuffer> buffer;
1226 sp<ABuffer> tsBuffer;
1227 int32_t firstSeqNumberInPlaylist = 0;
1228 int32_t lastSeqNumberInPlaylist = 0;
1229 bool connectHTTP = true;
1230
1231 if (mDownloadState->hasSavedState()) {
1232 mDownloadState->restoreState(
1233 uri,
1234 itemMeta,
1235 buffer,
1236 tsBuffer,
1237 firstSeqNumberInPlaylist,
1238 lastSeqNumberInPlaylist);
1239 connectHTTP = false;
1240 FLOGV("resuming: '%s'", uri.c_str());
1241 } else {
1242 if (!initDownloadState(
1243 uri,
1244 itemMeta,
1245 firstSeqNumberInPlaylist,
1246 lastSeqNumberInPlaylist)) {
1247 return;
1248 }
1249 FLOGV("fetching: '%s'", uri.c_str());
1250 }
1251
1252 int64_t range_offset, range_length;
1253 if (!itemMeta->findInt64("range-offset", &range_offset)
1254 || !itemMeta->findInt64("range-length", &range_length)) {
1255 range_offset = 0;
1256 range_length = -1;
1257 }
1258
1259 // block-wise download
1260 bool shouldPause = false;
1261 ssize_t bytesRead;
1262 do {
1263 int64_t startUs = ALooper::GetNowUs();
1264 bytesRead = mHTTPDownloader->fetchBlock(
1265 uri.c_str(), &buffer, range_offset, range_length, kDownloadBlockSize,
1266 NULL /* actualURL */, connectHTTP);
1267 int64_t delayUs = ALooper::GetNowUs() - startUs;
1268
1269 if (bytesRead == ERROR_NOT_CONNECTED) {
1270 return;
1271 }
1272 if (bytesRead < 0) {
1273 status_t err = bytesRead;
1274 ALOGE("failed to fetch .ts segment at url '%s'", uri.c_str());
1275 notifyError(err);
1276 return;
1277 }
1278
1279 // add sample for bandwidth estimation, excluding samples from subtitles (as
1280 // its too small), or during startup/resumeUntil (when we could have more than
1281 // one connection open which affects bandwidth)
1282 if (!mStartup && mStopParams == NULL && bytesRead > 0
1283 && (mStreamTypeMask
1284 & (LiveSession::STREAMTYPE_AUDIO
1285 | LiveSession::STREAMTYPE_VIDEO))) {
1286 mSession->addBandwidthMeasurement(bytesRead, delayUs);
1287 if (delayUs > 2000000ll) {
1288 FLOGV("bytesRead %zd took %.2f seconds - abnormal bandwidth dip",
1289 bytesRead, (double)delayUs / 1.0e6);
1290 }
1291 }
1292
1293 connectHTTP = false;
1294
1295 CHECK(buffer != NULL);
1296
1297 size_t size = buffer->size();
1298 // Set decryption range.
1299 buffer->setRange(size - bytesRead, bytesRead);
1300 status_t err = decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, buffer,
1301 buffer->offset() == 0 /* first */);
1302 // Unset decryption range.
1303 buffer->setRange(0, size);
1304
1305 if (err != OK) {
1306 ALOGE("decryptBuffer failed w/ error %d", err);
1307
1308 notifyError(err);
1309 return;
1310 }
1311
1312 bool startUp = mStartup; // save current start up state
1313
1314 err = OK;
1315 if (bufferStartsWithTsSyncByte(buffer)) {
1316 // Incremental extraction is only supported for MPEG2 transport streams.
1317 if (tsBuffer == NULL) {
1318 tsBuffer = new ABuffer(buffer->data(), buffer->capacity());
1319 tsBuffer->setRange(0, 0);
1320 } else if (tsBuffer->capacity() != buffer->capacity()) {
1321 size_t tsOff = tsBuffer->offset(), tsSize = tsBuffer->size();
1322 tsBuffer = new ABuffer(buffer->data(), buffer->capacity());
1323 tsBuffer->setRange(tsOff, tsSize);
1324 }
1325 tsBuffer->setRange(tsBuffer->offset(), tsBuffer->size() + bytesRead);
1326 err = extractAndQueueAccessUnitsFromTs(tsBuffer);
1327 }
1328
1329 if (err == -EAGAIN) {
1330 // starting sequence number too low/high
1331 mTSParser.clear();
1332 for (size_t i = 0; i < mPacketSources.size(); i++) {
1333 sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
1334 packetSource->clear();
1335 }
1336 postMonitorQueue();
1337 return;
1338 } else if (err == ERROR_OUT_OF_RANGE) {
1339 // reached stopping point
1340 notifyStopReached();
1341 return;
1342 } else if (err != OK) {
1343 notifyError(err);
1344 return;
1345 }
1346 // If we're switching, post start notification
1347 // this should only be posted when the last chunk is full processed by TSParser
1348 if (mSeekMode != LiveSession::kSeekModeExactPosition && startUp != mStartup) {
1349 CHECK(mStartTimeUsNotify != NULL);
1350 mStartTimeUsNotify->post();
1351 mStartTimeUsNotify.clear();
1352 shouldPause = true;
1353 }
1354 if (shouldPause || shouldPauseDownload()) {
1355 // save state and return if this is not the last chunk,
1356 // leaving the fetcher in paused state.
1357 if (bytesRead != 0) {
1358 mDownloadState->saveState(
1359 uri,
1360 itemMeta,
1361 buffer,
1362 tsBuffer,
1363 firstSeqNumberInPlaylist,
1364 lastSeqNumberInPlaylist);
1365 return;
1366 }
1367 shouldPause = true;
1368 }
1369 } while (bytesRead != 0);
1370
1371 if (bufferStartsWithTsSyncByte(buffer)) {
1372 // If we don't see a stream in the program table after fetching a full ts segment
1373 // mark it as nonexistent.
1374 ATSParser::SourceType srcTypes[] =
1375 { ATSParser::VIDEO, ATSParser::AUDIO };
1376 LiveSession::StreamType streamTypes[] =
1377 { LiveSession::STREAMTYPE_VIDEO, LiveSession::STREAMTYPE_AUDIO };
1378 const size_t kNumTypes = NELEM(srcTypes);
1379
1380 for (size_t i = 0; i < kNumTypes; i++) {
1381 ATSParser::SourceType srcType = srcTypes[i];
1382 LiveSession::StreamType streamType = streamTypes[i];
1383
1384 sp<AnotherPacketSource> source =
1385 static_cast<AnotherPacketSource *>(
1386 mTSParser->getSource(srcType).get());
1387
1388 if (!mTSParser->hasSource(srcType)) {
1389 ALOGW("MPEG2 Transport stream does not contain %s data.",
1390 srcType == ATSParser::VIDEO ? "video" : "audio");
1391
1392 mStreamTypeMask &= ~streamType;
1393 mPacketSources.removeItem(streamType);
1394 }
1395 }
1396
1397 }
1398
1399 if (checkDecryptPadding(buffer) != OK) {
1400 ALOGE("Incorrect padding bytes after decryption.");
1401 notifyError(ERROR_MALFORMED);
1402 return;
1403 }
1404
1405 if (tsBuffer != NULL) {
1406 AString method;
1407 CHECK(buffer->meta()->findString("cipher-method", &method));
1408 if ((tsBuffer->size() > 0 && method == "NONE")
1409 || tsBuffer->size() > 16) {
1410 ALOGE("MPEG2 transport stream is not an even multiple of 188 "
1411 "bytes in length.");
1412 notifyError(ERROR_MALFORMED);
1413 return;
1414 }
1415 }
1416
1417 // bulk extract non-ts files
1418 bool startUp = mStartup;
1419 if (tsBuffer == NULL) {
1420 status_t err = extractAndQueueAccessUnits(buffer, itemMeta);
1421 if (err == -EAGAIN) {
1422 // starting sequence number too low/high
1423 postMonitorQueue();
1424 return;
1425 } else if (err == ERROR_OUT_OF_RANGE) {
1426 // reached stopping point
1427 notifyStopReached();
1428 return;
1429 } else if (err != OK) {
1430 notifyError(err);
1431 return;
1432 }
1433 }
1434
1435 ++mSeqNumber;
1436
1437 // if adapting, pause after found the next starting point
1438 if (mSeekMode != LiveSession::kSeekModeExactPosition && startUp != mStartup) {
1439 CHECK(mStartTimeUsNotify != NULL);
1440 mStartTimeUsNotify->post();
1441 mStartTimeUsNotify.clear();
1442 shouldPause = true;
1443 }
1444
1445 if (!shouldPause) {
1446 postMonitorQueue();
1447 }
1448 }
1449
1450 /*
1451 * returns true if we need to adjust mSeqNumber
1452 */
adjustSeqNumberWithAnchorTime(int64_t anchorTimeUs)1453 bool PlaylistFetcher::adjustSeqNumberWithAnchorTime(int64_t anchorTimeUs) {
1454 int32_t firstSeqNumberInPlaylist = mPlaylist->getFirstSeqNumber();
1455
1456 int64_t minDiffUs, maxDiffUs;
1457 if (mSeekMode == LiveSession::kSeekModeNextSample) {
1458 // if the previous fetcher paused in the middle of a segment, we
1459 // want to start at a segment that overlaps the last sample
1460 minDiffUs = -mPlaylist->getTargetDuration();
1461 maxDiffUs = 0ll;
1462 } else {
1463 // if the previous fetcher paused at the end of a segment, ideally
1464 // we want to start at the segment that's roughly aligned with its
1465 // next segment, but if the two variants are not well aligned we
1466 // adjust the diff to within (-T/2, T/2)
1467 minDiffUs = -mPlaylist->getTargetDuration() / 2;
1468 maxDiffUs = mPlaylist->getTargetDuration() / 2;
1469 }
1470
1471 int32_t oldSeqNumber = mSeqNumber;
1472 ssize_t index = mSeqNumber - firstSeqNumberInPlaylist;
1473
1474 // adjust anchorTimeUs to within (minDiffUs, maxDiffUs) from mStartTimeUs
1475 int64_t diffUs = anchorTimeUs - mStartTimeUs;
1476 if (diffUs > maxDiffUs) {
1477 while (index > 0 && diffUs > maxDiffUs) {
1478 --index;
1479
1480 sp<AMessage> itemMeta;
1481 CHECK(mPlaylist->itemAt(index, NULL /* uri */, &itemMeta));
1482
1483 int64_t itemDurationUs;
1484 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
1485
1486 diffUs -= itemDurationUs;
1487 }
1488 } else if (diffUs < minDiffUs) {
1489 while (index + 1 < (ssize_t) mPlaylist->size()
1490 && diffUs < minDiffUs) {
1491 ++index;
1492
1493 sp<AMessage> itemMeta;
1494 CHECK(mPlaylist->itemAt(index, NULL /* uri */, &itemMeta));
1495
1496 int64_t itemDurationUs;
1497 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
1498
1499 diffUs += itemDurationUs;
1500 }
1501 }
1502
1503 mSeqNumber = firstSeqNumberInPlaylist + index;
1504
1505 if (mSeqNumber != oldSeqNumber) {
1506 FLOGV("guessed wrong seg number: diff %lld out of [%lld, %lld]",
1507 (long long) anchorTimeUs - mStartTimeUs,
1508 (long long) minDiffUs,
1509 (long long) maxDiffUs);
1510 return true;
1511 }
1512 return false;
1513 }
1514
getSeqNumberForDiscontinuity(size_t discontinuitySeq) const1515 int32_t PlaylistFetcher::getSeqNumberForDiscontinuity(size_t discontinuitySeq) const {
1516 int32_t firstSeqNumberInPlaylist = mPlaylist->getFirstSeqNumber();
1517
1518 size_t index = 0;
1519 while (index < mPlaylist->size()) {
1520 sp<AMessage> itemMeta;
1521 CHECK(mPlaylist->itemAt( index, NULL /* uri */, &itemMeta));
1522 size_t curDiscontinuitySeq;
1523 CHECK(itemMeta->findInt32("discontinuity-sequence", (int32_t *)&curDiscontinuitySeq));
1524 int32_t seqNumber = firstSeqNumberInPlaylist + index;
1525 if (curDiscontinuitySeq == discontinuitySeq) {
1526 return seqNumber;
1527 } else if (curDiscontinuitySeq > discontinuitySeq) {
1528 return seqNumber <= 0 ? 0 : seqNumber - 1;
1529 }
1530
1531 ++index;
1532 }
1533
1534 return firstSeqNumberInPlaylist + mPlaylist->size();
1535 }
1536
getSeqNumberForTime(int64_t timeUs) const1537 int32_t PlaylistFetcher::getSeqNumberForTime(int64_t timeUs) const {
1538 size_t index = 0;
1539 int64_t segmentStartUs = 0;
1540 while (index < mPlaylist->size()) {
1541 sp<AMessage> itemMeta;
1542 CHECK(mPlaylist->itemAt(
1543 index, NULL /* uri */, &itemMeta));
1544
1545 int64_t itemDurationUs;
1546 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
1547
1548 if (timeUs < segmentStartUs + itemDurationUs) {
1549 break;
1550 }
1551
1552 segmentStartUs += itemDurationUs;
1553 ++index;
1554 }
1555
1556 if (index >= mPlaylist->size()) {
1557 index = mPlaylist->size() - 1;
1558 }
1559
1560 return mPlaylist->getFirstSeqNumber() + index;
1561 }
1562
setAccessUnitProperties(const sp<ABuffer> & accessUnit,const sp<AnotherPacketSource> & source,bool discard)1563 const sp<ABuffer> &PlaylistFetcher::setAccessUnitProperties(
1564 const sp<ABuffer> &accessUnit, const sp<AnotherPacketSource> &source, bool discard) {
1565 sp<MetaData> format = source->getFormat();
1566 if (format != NULL) {
1567 // for simplicity, store a reference to the format in each unit
1568 accessUnit->meta()->setObject("format", format);
1569 }
1570
1571 if (discard) {
1572 accessUnit->meta()->setInt32("discard", discard);
1573 }
1574
1575 accessUnit->meta()->setInt32("discontinuitySeq", mDiscontinuitySeq);
1576 accessUnit->meta()->setInt64("segmentStartTimeUs", getSegmentStartTimeUs(mSeqNumber));
1577 accessUnit->meta()->setInt64("segmentFirstTimeUs", mSegmentFirstPTS);
1578 accessUnit->meta()->setInt64("segmentDurationUs", getSegmentDurationUs(mSeqNumber));
1579 if (!mPlaylist->isComplete() && !mPlaylist->isEvent()) {
1580 accessUnit->meta()->setInt64("playlistTimeUs", mPlaylistTimeUs);
1581 }
1582 return accessUnit;
1583 }
1584
isStartTimeReached(int64_t timeUs)1585 bool PlaylistFetcher::isStartTimeReached(int64_t timeUs) {
1586 if (!mFirstPTSValid) {
1587 mFirstTimeUs = timeUs;
1588 mFirstPTSValid = true;
1589 }
1590 bool startTimeReached = true;
1591 if (mStartTimeUsRelative) {
1592 FLOGV("startTimeUsRelative, timeUs (%lld) - %lld = %lld",
1593 (long long)timeUs,
1594 (long long)mFirstTimeUs,
1595 (long long)(timeUs - mFirstTimeUs));
1596 timeUs -= mFirstTimeUs;
1597 if (timeUs < 0) {
1598 FLOGV("clamp negative timeUs to 0");
1599 timeUs = 0;
1600 }
1601 startTimeReached = (timeUs >= mStartTimeUs);
1602 }
1603 return startTimeReached;
1604 }
1605
extractAndQueueAccessUnitsFromTs(const sp<ABuffer> & buffer)1606 status_t PlaylistFetcher::extractAndQueueAccessUnitsFromTs(const sp<ABuffer> &buffer) {
1607 if (mTSParser == NULL) {
1608 // Use TS_TIMESTAMPS_ARE_ABSOLUTE so pts carry over between fetchers.
1609 mTSParser = new ATSParser(ATSParser::TS_TIMESTAMPS_ARE_ABSOLUTE);
1610 }
1611
1612 if (mNextPTSTimeUs >= 0ll) {
1613 sp<AMessage> extra = new AMessage;
1614 // Since we are using absolute timestamps, signal an offset of 0 to prevent
1615 // ATSParser from skewing the timestamps of access units.
1616 extra->setInt64(IStreamListener::kKeyMediaTimeUs, 0);
1617
1618 // When adapting, signal a recent media time to the parser,
1619 // so that PTS wrap around is handled for the new variant.
1620 if (mStartTimeUs >= 0 && !mStartTimeUsRelative) {
1621 extra->setInt64(IStreamListener::kKeyRecentMediaTimeUs, mStartTimeUs);
1622 }
1623
1624 mTSParser->signalDiscontinuity(
1625 ATSParser::DISCONTINUITY_TIME, extra);
1626
1627 mNextPTSTimeUs = -1ll;
1628 }
1629
1630 size_t offset = 0;
1631 while (offset + 188 <= buffer->size()) {
1632 status_t err = mTSParser->feedTSPacket(buffer->data() + offset, 188);
1633
1634 if (err != OK) {
1635 return err;
1636 }
1637
1638 offset += 188;
1639 }
1640 // setRange to indicate consumed bytes.
1641 buffer->setRange(buffer->offset() + offset, buffer->size() - offset);
1642
1643 if (mSegmentFirstPTS < 0ll) {
1644 // get the smallest first PTS from all streams present in this parser
1645 for (size_t i = mPacketSources.size(); i > 0;) {
1646 i--;
1647 const LiveSession::StreamType stream = mPacketSources.keyAt(i);
1648 if (stream == LiveSession::STREAMTYPE_SUBTITLES) {
1649 ALOGE("MPEG2 Transport streams do not contain subtitles.");
1650 return ERROR_MALFORMED;
1651 }
1652 if (stream == LiveSession::STREAMTYPE_METADATA) {
1653 continue;
1654 }
1655 ATSParser::SourceType type =LiveSession::getSourceTypeForStream(stream);
1656 sp<AnotherPacketSource> source =
1657 static_cast<AnotherPacketSource *>(
1658 mTSParser->getSource(type).get());
1659
1660 if (source == NULL) {
1661 continue;
1662 }
1663 sp<AMessage> meta = source->getMetaAfterLastDequeued(0);
1664 if (meta != NULL) {
1665 int64_t timeUs;
1666 CHECK(meta->findInt64("timeUs", &timeUs));
1667 if (mSegmentFirstPTS < 0ll || timeUs < mSegmentFirstPTS) {
1668 mSegmentFirstPTS = timeUs;
1669 }
1670 }
1671 }
1672 if (mSegmentFirstPTS < 0ll) {
1673 // didn't find any TS packet, can return early
1674 return OK;
1675 }
1676 if (!mStartTimeUsRelative) {
1677 // mStartup
1678 // mStartup is true until we have queued a packet for all the streams
1679 // we are fetching. We queue packets whose timestamps are greater than
1680 // mStartTimeUs.
1681 // mSegmentStartTimeUs >= 0
1682 // mSegmentStartTimeUs is non-negative when adapting or switching tracks
1683 // adjustSeqNumberWithAnchorTime(timeUs) == true
1684 // we guessed a seq number that's either too large or too small.
1685 // If this happens, we'll adjust mSeqNumber and restart fetching from new
1686 // location. Note that we only want to adjust once, so set mSegmentStartTimeUs
1687 // to -1 so that we don't enter this chunk next time.
1688 if (mStartup && mSegmentStartTimeUs >= 0
1689 && adjustSeqNumberWithAnchorTime(mSegmentFirstPTS)) {
1690 mStartTimeUsNotify = mNotify->dup();
1691 mStartTimeUsNotify->setInt32("what", kWhatStartedAt);
1692 mStartTimeUsNotify->setString("uri", mURI);
1693 mIDRFound = false;
1694 mSegmentStartTimeUs = -1;
1695 return -EAGAIN;
1696 }
1697 }
1698 }
1699
1700 status_t err = OK;
1701 for (size_t i = mPacketSources.size(); i > 0;) {
1702 i--;
1703 sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
1704
1705 const LiveSession::StreamType stream = mPacketSources.keyAt(i);
1706 if (stream == LiveSession::STREAMTYPE_SUBTITLES) {
1707 ALOGE("MPEG2 Transport streams do not contain subtitles.");
1708 return ERROR_MALFORMED;
1709 }
1710
1711 const char *key = LiveSession::getKeyForStream(stream);
1712 ATSParser::SourceType type =LiveSession::getSourceTypeForStream(stream);
1713
1714 sp<AnotherPacketSource> source =
1715 static_cast<AnotherPacketSource *>(
1716 mTSParser->getSource(type).get());
1717
1718 if (source == NULL) {
1719 continue;
1720 }
1721
1722 const char *mime;
1723 sp<MetaData> format = source->getFormat();
1724 bool isAvc = format != NULL && format->findCString(kKeyMIMEType, &mime)
1725 && !strcasecmp(mime, MEDIA_MIMETYPE_VIDEO_AVC);
1726
1727 sp<ABuffer> accessUnit;
1728 status_t finalResult;
1729 while (source->hasBufferAvailable(&finalResult)
1730 && source->dequeueAccessUnit(&accessUnit) == OK) {
1731
1732 int64_t timeUs;
1733 CHECK(accessUnit->meta()->findInt64("timeUs", &timeUs));
1734
1735 if (mStartup) {
1736 bool startTimeReached = isStartTimeReached(timeUs);
1737
1738 if (!startTimeReached || (isAvc && !mIDRFound)) {
1739 // buffer up to the closest preceding IDR frame in the next segement,
1740 // or the closest succeeding IDR frame after the exact position
1741 FSLOGV(stream, "timeUs(%lld)-mStartTimeUs(%lld)=%lld, mIDRFound=%d",
1742 (long long)timeUs,
1743 (long long)mStartTimeUs,
1744 (long long)timeUs - mStartTimeUs,
1745 mIDRFound);
1746 if (isAvc) {
1747 if (IsIDR(accessUnit)) {
1748 mVideoBuffer->clear();
1749 FSLOGV(stream, "found IDR, clear mVideoBuffer");
1750 mIDRFound = true;
1751 }
1752 if (mIDRFound && mStartTimeUsRelative && !startTimeReached) {
1753 mVideoBuffer->queueAccessUnit(accessUnit);
1754 FSLOGV(stream, "saving AVC video AccessUnit");
1755 }
1756 }
1757 if (!startTimeReached || (isAvc && !mIDRFound)) {
1758 continue;
1759 }
1760 }
1761 }
1762
1763 if (mStartTimeUsNotify != NULL) {
1764 uint32_t streamMask = 0;
1765 mStartTimeUsNotify->findInt32("streamMask", (int32_t *) &streamMask);
1766 if ((mStreamTypeMask & mPacketSources.keyAt(i))
1767 && !(streamMask & mPacketSources.keyAt(i))) {
1768 streamMask |= mPacketSources.keyAt(i);
1769 mStartTimeUsNotify->setInt32("streamMask", streamMask);
1770 FSLOGV(stream, "found start point, timeUs=%lld, streamMask becomes %x",
1771 (long long)timeUs, streamMask);
1772
1773 if (streamMask == mStreamTypeMask) {
1774 FLOGV("found start point for all streams");
1775 mStartup = false;
1776 }
1777 }
1778 }
1779
1780 if (mStopParams != NULL) {
1781 int32_t discontinuitySeq;
1782 int64_t stopTimeUs;
1783 if (!mStopParams->findInt32("discontinuitySeq", &discontinuitySeq)
1784 || discontinuitySeq > mDiscontinuitySeq
1785 || !mStopParams->findInt64(key, &stopTimeUs)
1786 || (discontinuitySeq == mDiscontinuitySeq
1787 && timeUs >= stopTimeUs)) {
1788 FSLOGV(stream, "reached stop point, timeUs=%lld", (long long)timeUs);
1789 mStreamTypeMask &= ~stream;
1790 mPacketSources.removeItemsAt(i);
1791 break;
1792 }
1793 }
1794
1795 if (stream == LiveSession::STREAMTYPE_VIDEO) {
1796 const bool discard = true;
1797 status_t status;
1798 while (mVideoBuffer->hasBufferAvailable(&status)) {
1799 sp<ABuffer> videoBuffer;
1800 mVideoBuffer->dequeueAccessUnit(&videoBuffer);
1801 setAccessUnitProperties(videoBuffer, source, discard);
1802 packetSource->queueAccessUnit(videoBuffer);
1803 int64_t bufferTimeUs;
1804 CHECK(videoBuffer->meta()->findInt64("timeUs", &bufferTimeUs));
1805 FSLOGV(stream, "queueAccessUnit (saved), timeUs=%lld",
1806 (long long)bufferTimeUs);
1807 }
1808 } else if (stream == LiveSession::STREAMTYPE_METADATA && !mHasMetadata) {
1809 mHasMetadata = true;
1810 sp<AMessage> notify = mNotify->dup();
1811 notify->setInt32("what", kWhatMetadataDetected);
1812 notify->post();
1813 }
1814
1815 setAccessUnitProperties(accessUnit, source);
1816 packetSource->queueAccessUnit(accessUnit);
1817 FSLOGV(stream, "queueAccessUnit, timeUs=%lld", (long long)timeUs);
1818 }
1819
1820 if (err != OK) {
1821 break;
1822 }
1823 }
1824
1825 if (err != OK) {
1826 for (size_t i = mPacketSources.size(); i > 0;) {
1827 i--;
1828 sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
1829 packetSource->clear();
1830 }
1831 return err;
1832 }
1833
1834 if (!mStreamTypeMask) {
1835 // Signal gap is filled between original and new stream.
1836 FLOGV("reached stop point for all streams");
1837 return ERROR_OUT_OF_RANGE;
1838 }
1839
1840 return OK;
1841 }
1842
1843 /* static */
bufferStartsWithWebVTTMagicSequence(const sp<ABuffer> & buffer)1844 bool PlaylistFetcher::bufferStartsWithWebVTTMagicSequence(
1845 const sp<ABuffer> &buffer) {
1846 size_t pos = 0;
1847
1848 // skip possible BOM
1849 if (buffer->size() >= pos + 3 &&
1850 !memcmp("\xef\xbb\xbf", buffer->data() + pos, 3)) {
1851 pos += 3;
1852 }
1853
1854 // accept WEBVTT followed by SPACE, TAB or (CR) LF
1855 if (buffer->size() < pos + 6 ||
1856 memcmp("WEBVTT", buffer->data() + pos, 6)) {
1857 return false;
1858 }
1859 pos += 6;
1860
1861 if (buffer->size() == pos) {
1862 return true;
1863 }
1864
1865 uint8_t sep = buffer->data()[pos];
1866 return sep == ' ' || sep == '\t' || sep == '\n' || sep == '\r';
1867 }
1868
extractAndQueueAccessUnits(const sp<ABuffer> & buffer,const sp<AMessage> & itemMeta)1869 status_t PlaylistFetcher::extractAndQueueAccessUnits(
1870 const sp<ABuffer> &buffer, const sp<AMessage> &itemMeta) {
1871 if (bufferStartsWithWebVTTMagicSequence(buffer)) {
1872 if (mStreamTypeMask != LiveSession::STREAMTYPE_SUBTITLES) {
1873 ALOGE("This stream only contains subtitles.");
1874 return ERROR_MALFORMED;
1875 }
1876
1877 const sp<AnotherPacketSource> packetSource =
1878 mPacketSources.valueFor(LiveSession::STREAMTYPE_SUBTITLES);
1879
1880 int64_t durationUs;
1881 CHECK(itemMeta->findInt64("durationUs", &durationUs));
1882 buffer->meta()->setInt64("timeUs", getSegmentStartTimeUs(mSeqNumber));
1883 buffer->meta()->setInt64("durationUs", durationUs);
1884 buffer->meta()->setInt64("segmentStartTimeUs", getSegmentStartTimeUs(mSeqNumber));
1885 buffer->meta()->setInt32("discontinuitySeq", mDiscontinuitySeq);
1886 buffer->meta()->setInt32("subtitleGeneration", mSubtitleGeneration);
1887 packetSource->queueAccessUnit(buffer);
1888 return OK;
1889 }
1890
1891 if (mNextPTSTimeUs >= 0ll) {
1892 mNextPTSTimeUs = -1ll;
1893 }
1894
1895 // This better be an ISO 13818-7 (AAC) or ISO 13818-1 (MPEG) audio
1896 // stream prefixed by an ID3 tag.
1897
1898 bool firstID3Tag = true;
1899 uint64_t PTS = 0;
1900
1901 for (;;) {
1902 // Make sure to skip all ID3 tags preceding the audio data.
1903 // At least one must be present to provide the PTS timestamp.
1904
1905 ID3 id3(buffer->data(), buffer->size(), true /* ignoreV1 */);
1906 if (!id3.isValid()) {
1907 if (firstID3Tag) {
1908 ALOGE("Unable to parse ID3 tag.");
1909 return ERROR_MALFORMED;
1910 } else {
1911 break;
1912 }
1913 }
1914
1915 if (firstID3Tag) {
1916 bool found = false;
1917
1918 ID3::Iterator it(id3, "PRIV");
1919 while (!it.done()) {
1920 size_t length;
1921 const uint8_t *data = it.getData(&length);
1922 if (!data) {
1923 return ERROR_MALFORMED;
1924 }
1925
1926 static const char *kMatchName =
1927 "com.apple.streaming.transportStreamTimestamp";
1928 static const size_t kMatchNameLen = strlen(kMatchName);
1929
1930 if (length == kMatchNameLen + 1 + 8
1931 && !strncmp((const char *)data, kMatchName, kMatchNameLen)) {
1932 found = true;
1933 PTS = U64_AT(&data[kMatchNameLen + 1]);
1934 }
1935
1936 it.next();
1937 }
1938
1939 if (!found) {
1940 ALOGE("Unable to extract transportStreamTimestamp from ID3 tag.");
1941 return ERROR_MALFORMED;
1942 }
1943 }
1944
1945 // skip the ID3 tag
1946 buffer->setRange(
1947 buffer->offset() + id3.rawSize(), buffer->size() - id3.rawSize());
1948
1949 firstID3Tag = false;
1950 }
1951
1952 if (mStreamTypeMask != LiveSession::STREAMTYPE_AUDIO) {
1953 ALOGW("This stream only contains audio data!");
1954
1955 mStreamTypeMask &= LiveSession::STREAMTYPE_AUDIO;
1956
1957 if (mStreamTypeMask == 0) {
1958 return OK;
1959 }
1960 }
1961
1962 sp<AnotherPacketSource> packetSource =
1963 mPacketSources.valueFor(LiveSession::STREAMTYPE_AUDIO);
1964
1965 if (packetSource->getFormat() == NULL && buffer->size() >= 7) {
1966 ABitReader bits(buffer->data(), buffer->size());
1967
1968 // adts_fixed_header
1969
1970 CHECK_EQ(bits.getBits(12), 0xfffu);
1971 bits.skipBits(3); // ID, layer
1972 bool protection_absent __unused = bits.getBits(1) != 0;
1973
1974 unsigned profile = bits.getBits(2);
1975 CHECK_NE(profile, 3u);
1976 unsigned sampling_freq_index = bits.getBits(4);
1977 bits.getBits(1); // private_bit
1978 unsigned channel_configuration = bits.getBits(3);
1979 CHECK_NE(channel_configuration, 0u);
1980 bits.skipBits(2); // original_copy, home
1981
1982 sp<MetaData> meta = MakeAACCodecSpecificData(
1983 profile, sampling_freq_index, channel_configuration);
1984
1985 meta->setInt32(kKeyIsADTS, true);
1986
1987 packetSource->setFormat(meta);
1988 }
1989
1990 int64_t numSamples = 0ll;
1991 int32_t sampleRate;
1992 CHECK(packetSource->getFormat()->findInt32(kKeySampleRate, &sampleRate));
1993
1994 int64_t timeUs = (PTS * 100ll) / 9ll;
1995 if (mStartup && !mFirstPTSValid) {
1996 mFirstPTSValid = true;
1997 mFirstTimeUs = timeUs;
1998 }
1999
2000 if (mSegmentFirstPTS < 0ll) {
2001 mSegmentFirstPTS = timeUs;
2002 if (!mStartTimeUsRelative) {
2003 // Duplicated logic from how we handle .ts playlists.
2004 if (mStartup && mSegmentStartTimeUs >= 0
2005 && adjustSeqNumberWithAnchorTime(timeUs)) {
2006 mSegmentStartTimeUs = -1;
2007 return -EAGAIN;
2008 }
2009 }
2010 }
2011
2012 size_t offset = 0;
2013 while (offset < buffer->size()) {
2014 const uint8_t *adtsHeader = buffer->data() + offset;
2015 CHECK_LT(offset + 5, buffer->size());
2016
2017 unsigned aac_frame_length =
2018 ((adtsHeader[3] & 3) << 11)
2019 | (adtsHeader[4] << 3)
2020 | (adtsHeader[5] >> 5);
2021
2022 if (aac_frame_length == 0) {
2023 const uint8_t *id3Header = adtsHeader;
2024 if (!memcmp(id3Header, "ID3", 3)) {
2025 ID3 id3(id3Header, buffer->size() - offset, true);
2026 if (id3.isValid()) {
2027 offset += id3.rawSize();
2028 continue;
2029 };
2030 }
2031 return ERROR_MALFORMED;
2032 }
2033
2034 CHECK_LE(offset + aac_frame_length, buffer->size());
2035
2036 int64_t unitTimeUs = timeUs + numSamples * 1000000ll / sampleRate;
2037 offset += aac_frame_length;
2038
2039 // Each AAC frame encodes 1024 samples.
2040 numSamples += 1024;
2041
2042 if (mStartup) {
2043 int64_t startTimeUs = unitTimeUs;
2044 if (mStartTimeUsRelative) {
2045 startTimeUs -= mFirstTimeUs;
2046 if (startTimeUs < 0) {
2047 startTimeUs = 0;
2048 }
2049 }
2050 if (startTimeUs < mStartTimeUs) {
2051 continue;
2052 }
2053
2054 if (mStartTimeUsNotify != NULL) {
2055 mStartTimeUsNotify->setInt32("streamMask", LiveSession::STREAMTYPE_AUDIO);
2056 mStartup = false;
2057 }
2058 }
2059
2060 if (mStopParams != NULL) {
2061 int32_t discontinuitySeq;
2062 int64_t stopTimeUs;
2063 if (!mStopParams->findInt32("discontinuitySeq", &discontinuitySeq)
2064 || discontinuitySeq > mDiscontinuitySeq
2065 || !mStopParams->findInt64("timeUsAudio", &stopTimeUs)
2066 || (discontinuitySeq == mDiscontinuitySeq && unitTimeUs >= stopTimeUs)) {
2067 mStreamTypeMask = 0;
2068 mPacketSources.clear();
2069 return ERROR_OUT_OF_RANGE;
2070 }
2071 }
2072
2073 sp<ABuffer> unit = new ABuffer(aac_frame_length);
2074 memcpy(unit->data(), adtsHeader, aac_frame_length);
2075
2076 unit->meta()->setInt64("timeUs", unitTimeUs);
2077 setAccessUnitProperties(unit, packetSource);
2078 packetSource->queueAccessUnit(unit);
2079 }
2080
2081 return OK;
2082 }
2083
updateDuration()2084 void PlaylistFetcher::updateDuration() {
2085 int64_t durationUs = 0ll;
2086 for (size_t index = 0; index < mPlaylist->size(); ++index) {
2087 sp<AMessage> itemMeta;
2088 CHECK(mPlaylist->itemAt(
2089 index, NULL /* uri */, &itemMeta));
2090
2091 int64_t itemDurationUs;
2092 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
2093
2094 durationUs += itemDurationUs;
2095 }
2096
2097 sp<AMessage> msg = mNotify->dup();
2098 msg->setInt32("what", kWhatDurationUpdate);
2099 msg->setInt64("durationUs", durationUs);
2100 msg->post();
2101 }
2102
updateTargetDuration()2103 void PlaylistFetcher::updateTargetDuration() {
2104 sp<AMessage> msg = mNotify->dup();
2105 msg->setInt32("what", kWhatTargetDurationUpdate);
2106 msg->setInt64("targetDurationUs", mPlaylist->getTargetDuration());
2107 msg->post();
2108 }
2109
2110 } // namespace android
2111