/*
 * Copyright (C) 2012 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16 
17 //#define LOG_NDEBUG 0
18 #define LOG_TAG "PlaylistFetcher"
19 #include <utils/Log.h>
20 #include <utils/misc.h>
21 
22 #include "PlaylistFetcher.h"
23 #include "HTTPDownloader.h"
24 #include "LiveSession.h"
25 #include "M3UParser.h"
26 #include "include/avc_utils.h"
27 #include "include/ID3.h"
28 #include "mpeg2ts/AnotherPacketSource.h"
29 
30 #include <media/stagefright/foundation/ABitReader.h>
31 #include <media/stagefright/foundation/ABuffer.h>
32 #include <media/stagefright/foundation/ADebug.h>
33 #include <media/stagefright/MediaDefs.h>
34 #include <media/stagefright/MetaData.h>
35 #include <media/stagefright/Utils.h>
36 
37 #include <ctype.h>
38 #include <inttypes.h>
39 #include <openssl/aes.h>
40 
// Verbose-logging shorthands. Both expand to ALOGV and tag the message with
// mFetcherID, so they can only be used where that member is in scope;
// FSLOGV additionally tags it with LiveSession::getNameForStream(stream).
#define FLOGV(fmt, ...) \
    ALOGV("[fetcher-%d] " fmt, mFetcherID, ##__VA_ARGS__)
#define FSLOGV(stream, fmt, ...) \
    ALOGV("[fetcher-%d] [%s] " fmt, \
          mFetcherID, LiveSession::getNameForStream(stream), ##__VA_ARGS__)
44 
45 namespace android {
46 
47 // static
48 const int64_t PlaylistFetcher::kMinBufferedDurationUs = 30000000ll;
49 const int64_t PlaylistFetcher::kMaxMonitorDelayUs = 3000000ll;
50 // LCM of 188 (size of a TS packet) & 1k works well
51 const int32_t PlaylistFetcher::kDownloadBlockSize = 47 * 1024;
52 
53 struct PlaylistFetcher::DownloadState : public RefBase {
54     DownloadState();
55     void resetState();
56     bool hasSavedState() const;
57     void restoreState(
58             AString &uri,
59             sp<AMessage> &itemMeta,
60             sp<ABuffer> &buffer,
61             sp<ABuffer> &tsBuffer,
62             int32_t &firstSeqNumberInPlaylist,
63             int32_t &lastSeqNumberInPlaylist);
64     void saveState(
65             AString &uri,
66             sp<AMessage> &itemMeta,
67             sp<ABuffer> &buffer,
68             sp<ABuffer> &tsBuffer,
69             int32_t &firstSeqNumberInPlaylist,
70             int32_t &lastSeqNumberInPlaylist);
71 
72 private:
73     bool mHasSavedState;
74     AString mUri;
75     sp<AMessage> mItemMeta;
76     sp<ABuffer> mBuffer;
77     sp<ABuffer> mTsBuffer;
78     int32_t mFirstSeqNumberInPlaylist;
79     int32_t mLastSeqNumberInPlaylist;
80 };
81 
DownloadState()82 PlaylistFetcher::DownloadState::DownloadState() {
83     resetState();
84 }
85 
hasSavedState() const86 bool PlaylistFetcher::DownloadState::hasSavedState() const {
87     return mHasSavedState;
88 }
89 
resetState()90 void PlaylistFetcher::DownloadState::resetState() {
91     mHasSavedState = false;
92 
93     mUri.clear();
94     mItemMeta = NULL;
95     mBuffer = NULL;
96     mTsBuffer = NULL;
97     mFirstSeqNumberInPlaylist = 0;
98     mLastSeqNumberInPlaylist = 0;
99 }
100 
restoreState(AString & uri,sp<AMessage> & itemMeta,sp<ABuffer> & buffer,sp<ABuffer> & tsBuffer,int32_t & firstSeqNumberInPlaylist,int32_t & lastSeqNumberInPlaylist)101 void PlaylistFetcher::DownloadState::restoreState(
102         AString &uri,
103         sp<AMessage> &itemMeta,
104         sp<ABuffer> &buffer,
105         sp<ABuffer> &tsBuffer,
106         int32_t &firstSeqNumberInPlaylist,
107         int32_t &lastSeqNumberInPlaylist) {
108     if (!mHasSavedState) {
109         return;
110     }
111 
112     uri = mUri;
113     itemMeta = mItemMeta;
114     buffer = mBuffer;
115     tsBuffer = mTsBuffer;
116     firstSeqNumberInPlaylist = mFirstSeqNumberInPlaylist;
117     lastSeqNumberInPlaylist = mLastSeqNumberInPlaylist;
118 
119     resetState();
120 }
121 
saveState(AString & uri,sp<AMessage> & itemMeta,sp<ABuffer> & buffer,sp<ABuffer> & tsBuffer,int32_t & firstSeqNumberInPlaylist,int32_t & lastSeqNumberInPlaylist)122 void PlaylistFetcher::DownloadState::saveState(
123         AString &uri,
124         sp<AMessage> &itemMeta,
125         sp<ABuffer> &buffer,
126         sp<ABuffer> &tsBuffer,
127         int32_t &firstSeqNumberInPlaylist,
128         int32_t &lastSeqNumberInPlaylist) {
129     mHasSavedState = true;
130 
131     mUri = uri;
132     mItemMeta = itemMeta;
133     mBuffer = buffer;
134     mTsBuffer = tsBuffer;
135     mFirstSeqNumberInPlaylist = firstSeqNumberInPlaylist;
136     mLastSeqNumberInPlaylist = lastSeqNumberInPlaylist;
137 }
138 
PlaylistFetcher(const sp<AMessage> & notify,const sp<LiveSession> & session,const char * uri,int32_t id,int32_t subtitleGeneration)139 PlaylistFetcher::PlaylistFetcher(
140         const sp<AMessage> &notify,
141         const sp<LiveSession> &session,
142         const char *uri,
143         int32_t id,
144         int32_t subtitleGeneration)
145     : mNotify(notify),
146       mSession(session),
147       mURI(uri),
148       mFetcherID(id),
149       mStreamTypeMask(0),
150       mStartTimeUs(-1ll),
151       mSegmentStartTimeUs(-1ll),
152       mDiscontinuitySeq(-1ll),
153       mStartTimeUsRelative(false),
154       mLastPlaylistFetchTimeUs(-1ll),
155       mPlaylistTimeUs(-1ll),
156       mSeqNumber(-1),
157       mNumRetries(0),
158       mStartup(true),
159       mIDRFound(false),
160       mSeekMode(LiveSession::kSeekModeExactPosition),
161       mTimeChangeSignaled(false),
162       mNextPTSTimeUs(-1ll),
163       mMonitorQueueGeneration(0),
164       mSubtitleGeneration(subtitleGeneration),
165       mLastDiscontinuitySeq(-1ll),
166       mRefreshState(INITIAL_MINIMUM_RELOAD_DELAY),
167       mFirstPTSValid(false),
168       mFirstTimeUs(-1ll),
169       mVideoBuffer(new AnotherPacketSource(NULL)),
170       mThresholdRatio(-1.0f),
171       mDownloadState(new DownloadState()),
172       mHasMetadata(false) {
173     memset(mPlaylistHash, 0, sizeof(mPlaylistHash));
174     mHTTPDownloader = mSession->getHTTPDownloader();
175 }
176 
~PlaylistFetcher()177 PlaylistFetcher::~PlaylistFetcher() {
178 }
179 
getFetcherID() const180 int32_t PlaylistFetcher::getFetcherID() const {
181     return mFetcherID;
182 }
183 
getSegmentStartTimeUs(int32_t seqNumber) const184 int64_t PlaylistFetcher::getSegmentStartTimeUs(int32_t seqNumber) const {
185     CHECK(mPlaylist != NULL);
186 
187     int32_t firstSeqNumberInPlaylist, lastSeqNumberInPlaylist;
188     mPlaylist->getSeqNumberRange(
189             &firstSeqNumberInPlaylist, &lastSeqNumberInPlaylist);
190 
191     CHECK_GE(seqNumber, firstSeqNumberInPlaylist);
192     CHECK_LE(seqNumber, lastSeqNumberInPlaylist);
193 
194     int64_t segmentStartUs = 0ll;
195     for (int32_t index = 0;
196             index < seqNumber - firstSeqNumberInPlaylist; ++index) {
197         sp<AMessage> itemMeta;
198         CHECK(mPlaylist->itemAt(
199                     index, NULL /* uri */, &itemMeta));
200 
201         int64_t itemDurationUs;
202         CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
203 
204         segmentStartUs += itemDurationUs;
205     }
206 
207     return segmentStartUs;
208 }
209 
getSegmentDurationUs(int32_t seqNumber) const210 int64_t PlaylistFetcher::getSegmentDurationUs(int32_t seqNumber) const {
211     CHECK(mPlaylist != NULL);
212 
213     int32_t firstSeqNumberInPlaylist, lastSeqNumberInPlaylist;
214     mPlaylist->getSeqNumberRange(
215             &firstSeqNumberInPlaylist, &lastSeqNumberInPlaylist);
216 
217     CHECK_GE(seqNumber, firstSeqNumberInPlaylist);
218     CHECK_LE(seqNumber, lastSeqNumberInPlaylist);
219 
220     int32_t index = seqNumber - firstSeqNumberInPlaylist;
221     sp<AMessage> itemMeta;
222     CHECK(mPlaylist->itemAt(
223                 index, NULL /* uri */, &itemMeta));
224 
225     int64_t itemDurationUs;
226     CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
227 
228     return itemDurationUs;
229 }
230 
delayUsToRefreshPlaylist() const231 int64_t PlaylistFetcher::delayUsToRefreshPlaylist() const {
232     int64_t nowUs = ALooper::GetNowUs();
233 
234     if (mPlaylist == NULL || mLastPlaylistFetchTimeUs < 0ll) {
235         CHECK_EQ((int)mRefreshState, (int)INITIAL_MINIMUM_RELOAD_DELAY);
236         return 0ll;
237     }
238 
239     if (mPlaylist->isComplete()) {
240         return (~0llu >> 1);
241     }
242 
243     int64_t targetDurationUs = mPlaylist->getTargetDuration();
244 
245     int64_t minPlaylistAgeUs;
246 
247     switch (mRefreshState) {
248         case INITIAL_MINIMUM_RELOAD_DELAY:
249         {
250             size_t n = mPlaylist->size();
251             if (n > 0) {
252                 sp<AMessage> itemMeta;
253                 CHECK(mPlaylist->itemAt(n - 1, NULL /* uri */, &itemMeta));
254 
255                 int64_t itemDurationUs;
256                 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
257 
258                 minPlaylistAgeUs = itemDurationUs;
259                 break;
260             }
261 
262             // fall through
263         }
264 
265         case FIRST_UNCHANGED_RELOAD_ATTEMPT:
266         {
267             minPlaylistAgeUs = targetDurationUs / 2;
268             break;
269         }
270 
271         case SECOND_UNCHANGED_RELOAD_ATTEMPT:
272         {
273             minPlaylistAgeUs = (targetDurationUs * 3) / 2;
274             break;
275         }
276 
277         case THIRD_UNCHANGED_RELOAD_ATTEMPT:
278         {
279             minPlaylistAgeUs = targetDurationUs * 3;
280             break;
281         }
282 
283         default:
284             TRESPASS();
285             break;
286     }
287 
288     int64_t delayUs = mLastPlaylistFetchTimeUs + minPlaylistAgeUs - nowUs;
289     return delayUs > 0ll ? delayUs : 0ll;
290 }
291 
decryptBuffer(size_t playlistIndex,const sp<ABuffer> & buffer,bool first)292 status_t PlaylistFetcher::decryptBuffer(
293         size_t playlistIndex, const sp<ABuffer> &buffer,
294         bool first) {
295     sp<AMessage> itemMeta;
296     bool found = false;
297     AString method;
298 
299     for (ssize_t i = playlistIndex; i >= 0; --i) {
300         AString uri;
301         CHECK(mPlaylist->itemAt(i, &uri, &itemMeta));
302 
303         if (itemMeta->findString("cipher-method", &method)) {
304             found = true;
305             break;
306         }
307     }
308 
309     if (!found) {
310         method = "NONE";
311     }
312     buffer->meta()->setString("cipher-method", method.c_str());
313 
314     if (method == "NONE") {
315         return OK;
316     } else if (!(method == "AES-128")) {
317         ALOGE("Unsupported cipher method '%s'", method.c_str());
318         return ERROR_UNSUPPORTED;
319     }
320 
321     AString keyURI;
322     if (!itemMeta->findString("cipher-uri", &keyURI)) {
323         ALOGE("Missing key uri");
324         return ERROR_MALFORMED;
325     }
326 
327     ssize_t index = mAESKeyForURI.indexOfKey(keyURI);
328 
329     sp<ABuffer> key;
330     if (index >= 0) {
331         key = mAESKeyForURI.valueAt(index);
332     } else {
333         ssize_t err = mHTTPDownloader->fetchFile(keyURI.c_str(), &key);
334 
335         if (err == ERROR_NOT_CONNECTED) {
336             return ERROR_NOT_CONNECTED;
337         } else if (err < 0) {
338             ALOGE("failed to fetch cipher key from '%s'.", keyURI.c_str());
339             return ERROR_IO;
340         } else if (key->size() != 16) {
341             ALOGE("key file '%s' wasn't 16 bytes in size.", keyURI.c_str());
342             return ERROR_MALFORMED;
343         }
344 
345         mAESKeyForURI.add(keyURI, key);
346     }
347 
348     AES_KEY aes_key;
349     if (AES_set_decrypt_key(key->data(), 128, &aes_key) != 0) {
350         ALOGE("failed to set AES decryption key.");
351         return UNKNOWN_ERROR;
352     }
353 
354     size_t n = buffer->size();
355     if (!n) {
356         return OK;
357     }
358 
359     if (n < 16 || n % 16) {
360         ALOGE("not enough or trailing bytes (%zu) in encrypted buffer", n);
361         return ERROR_MALFORMED;
362     }
363 
364     if (first) {
365         // If decrypting the first block in a file, read the iv from the manifest
366         // or derive the iv from the file's sequence number.
367 
368         AString iv;
369         if (itemMeta->findString("cipher-iv", &iv)) {
370             if ((!iv.startsWith("0x") && !iv.startsWith("0X"))
371                     || iv.size() > 16 * 2 + 2) {
372                 ALOGE("malformed cipher IV '%s'.", iv.c_str());
373                 return ERROR_MALFORMED;
374             }
375 
376             while (iv.size() < 16 * 2 + 2) {
377                 iv.insert("0", 1, 2);
378             }
379 
380             memset(mAESInitVec, 0, sizeof(mAESInitVec));
381             for (size_t i = 0; i < 16; ++i) {
382                 char c1 = tolower(iv.c_str()[2 + 2 * i]);
383                 char c2 = tolower(iv.c_str()[3 + 2 * i]);
384                 if (!isxdigit(c1) || !isxdigit(c2)) {
385                     ALOGE("malformed cipher IV '%s'.", iv.c_str());
386                     return ERROR_MALFORMED;
387                 }
388                 uint8_t nibble1 = isdigit(c1) ? c1 - '0' : c1 - 'a' + 10;
389                 uint8_t nibble2 = isdigit(c2) ? c2 - '0' : c2 - 'a' + 10;
390 
391                 mAESInitVec[i] = nibble1 << 4 | nibble2;
392             }
393         } else {
394             memset(mAESInitVec, 0, sizeof(mAESInitVec));
395             mAESInitVec[15] = mSeqNumber & 0xff;
396             mAESInitVec[14] = (mSeqNumber >> 8) & 0xff;
397             mAESInitVec[13] = (mSeqNumber >> 16) & 0xff;
398             mAESInitVec[12] = (mSeqNumber >> 24) & 0xff;
399         }
400     }
401 
402     AES_cbc_encrypt(
403             buffer->data(), buffer->data(), buffer->size(),
404             &aes_key, mAESInitVec, AES_DECRYPT);
405 
406     return OK;
407 }
408 
checkDecryptPadding(const sp<ABuffer> & buffer)409 status_t PlaylistFetcher::checkDecryptPadding(const sp<ABuffer> &buffer) {
410     AString method;
411     CHECK(buffer->meta()->findString("cipher-method", &method));
412     if (method == "NONE") {
413         return OK;
414     }
415 
416     uint8_t padding = 0;
417     if (buffer->size() > 0) {
418         padding = buffer->data()[buffer->size() - 1];
419     }
420 
421     if (padding > 16) {
422         return ERROR_MALFORMED;
423     }
424 
425     for (size_t i = buffer->size() - padding; i < padding; i++) {
426         if (buffer->data()[i] != padding) {
427             return ERROR_MALFORMED;
428         }
429     }
430 
431     buffer->setRange(buffer->offset(), buffer->size() - padding);
432     return OK;
433 }
434 
postMonitorQueue(int64_t delayUs,int64_t minDelayUs)435 void PlaylistFetcher::postMonitorQueue(int64_t delayUs, int64_t minDelayUs) {
436     int64_t maxDelayUs = delayUsToRefreshPlaylist();
437     if (maxDelayUs < minDelayUs) {
438         maxDelayUs = minDelayUs;
439     }
440     if (delayUs > maxDelayUs) {
441         FLOGV("Need to refresh playlist in %lld", (long long)maxDelayUs);
442         delayUs = maxDelayUs;
443     }
444     sp<AMessage> msg = new AMessage(kWhatMonitorQueue, this);
445     msg->setInt32("generation", mMonitorQueueGeneration);
446     msg->post(delayUs);
447 }
448 
cancelMonitorQueue()449 void PlaylistFetcher::cancelMonitorQueue() {
450     ++mMonitorQueueGeneration;
451 }
452 
setStoppingThreshold(float thresholdRatio,bool disconnect)453 void PlaylistFetcher::setStoppingThreshold(float thresholdRatio, bool disconnect) {
454     {
455         AutoMutex _l(mThresholdLock);
456         mThresholdRatio = thresholdRatio;
457     }
458     if (disconnect) {
459         mHTTPDownloader->disconnect();
460     }
461 }
462 
resetStoppingThreshold(bool disconnect)463 void PlaylistFetcher::resetStoppingThreshold(bool disconnect) {
464     {
465         AutoMutex _l(mThresholdLock);
466         mThresholdRatio = -1.0f;
467     }
468     if (disconnect) {
469         mHTTPDownloader->disconnect();
470     } else {
471         // allow reconnect
472         mHTTPDownloader->reconnect();
473     }
474 }
475 
getStoppingThreshold()476 float PlaylistFetcher::getStoppingThreshold() {
477     AutoMutex _l(mThresholdLock);
478     return mThresholdRatio;
479 }
480 
startAsync(const sp<AnotherPacketSource> & audioSource,const sp<AnotherPacketSource> & videoSource,const sp<AnotherPacketSource> & subtitleSource,const sp<AnotherPacketSource> & metadataSource,int64_t startTimeUs,int64_t segmentStartTimeUs,int32_t startDiscontinuitySeq,LiveSession::SeekMode seekMode)481 void PlaylistFetcher::startAsync(
482         const sp<AnotherPacketSource> &audioSource,
483         const sp<AnotherPacketSource> &videoSource,
484         const sp<AnotherPacketSource> &subtitleSource,
485         const sp<AnotherPacketSource> &metadataSource,
486         int64_t startTimeUs,
487         int64_t segmentStartTimeUs,
488         int32_t startDiscontinuitySeq,
489         LiveSession::SeekMode seekMode) {
490     sp<AMessage> msg = new AMessage(kWhatStart, this);
491 
492     uint32_t streamTypeMask = 0ul;
493 
494     if (audioSource != NULL) {
495         msg->setPointer("audioSource", audioSource.get());
496         streamTypeMask |= LiveSession::STREAMTYPE_AUDIO;
497     }
498 
499     if (videoSource != NULL) {
500         msg->setPointer("videoSource", videoSource.get());
501         streamTypeMask |= LiveSession::STREAMTYPE_VIDEO;
502     }
503 
504     if (subtitleSource != NULL) {
505         msg->setPointer("subtitleSource", subtitleSource.get());
506         streamTypeMask |= LiveSession::STREAMTYPE_SUBTITLES;
507     }
508 
509     if (metadataSource != NULL) {
510         msg->setPointer("metadataSource", metadataSource.get());
511         // metadataSource does not affect streamTypeMask.
512     }
513 
514     msg->setInt32("streamTypeMask", streamTypeMask);
515     msg->setInt64("startTimeUs", startTimeUs);
516     msg->setInt64("segmentStartTimeUs", segmentStartTimeUs);
517     msg->setInt32("startDiscontinuitySeq", startDiscontinuitySeq);
518     msg->setInt32("seekMode", seekMode);
519     msg->post();
520 }
521 
522 /*
523  * pauseAsync
524  *
525  * threshold: 0.0f - pause after current fetch block (default 47Kbytes)
526  *           -1.0f - pause after finishing current segment
527  *        0.0~1.0f - pause if remaining of current segment exceeds threshold
528  */
pauseAsync(float thresholdRatio,bool disconnect)529 void PlaylistFetcher::pauseAsync(
530         float thresholdRatio, bool disconnect) {
531     setStoppingThreshold(thresholdRatio, disconnect);
532 
533     (new AMessage(kWhatPause, this))->post();
534 }
535 
stopAsync(bool clear)536 void PlaylistFetcher::stopAsync(bool clear) {
537     setStoppingThreshold(0.0f, true /* disconncect */);
538 
539     sp<AMessage> msg = new AMessage(kWhatStop, this);
540     msg->setInt32("clear", clear);
541     msg->post();
542 }
543 
resumeUntilAsync(const sp<AMessage> & params)544 void PlaylistFetcher::resumeUntilAsync(const sp<AMessage> &params) {
545     FLOGV("resumeUntilAsync: params=%s", params->debugString().c_str());
546 
547     AMessage* msg = new AMessage(kWhatResumeUntil, this);
548     msg->setMessage("params", params);
549     msg->post();
550 }
551 
fetchPlaylistAsync()552 void PlaylistFetcher::fetchPlaylistAsync() {
553     (new AMessage(kWhatFetchPlaylist, this))->post();
554 }
555 
onMessageReceived(const sp<AMessage> & msg)556 void PlaylistFetcher::onMessageReceived(const sp<AMessage> &msg) {
557     switch (msg->what()) {
558         case kWhatStart:
559         {
560             status_t err = onStart(msg);
561 
562             sp<AMessage> notify = mNotify->dup();
563             notify->setInt32("what", kWhatStarted);
564             notify->setInt32("err", err);
565             notify->post();
566             break;
567         }
568 
569         case kWhatPause:
570         {
571             onPause();
572 
573             sp<AMessage> notify = mNotify->dup();
574             notify->setInt32("what", kWhatPaused);
575             notify->setInt32("seekMode",
576                     mDownloadState->hasSavedState()
577                     ? LiveSession::kSeekModeNextSample
578                     : LiveSession::kSeekModeNextSegment);
579             notify->post();
580             break;
581         }
582 
583         case kWhatStop:
584         {
585             onStop(msg);
586 
587             sp<AMessage> notify = mNotify->dup();
588             notify->setInt32("what", kWhatStopped);
589             notify->post();
590             break;
591         }
592 
593         case kWhatFetchPlaylist:
594         {
595             bool unchanged;
596             sp<M3UParser> playlist = mHTTPDownloader->fetchPlaylist(
597                     mURI.c_str(), NULL /* curPlaylistHash */, &unchanged);
598 
599             sp<AMessage> notify = mNotify->dup();
600             notify->setInt32("what", kWhatPlaylistFetched);
601             notify->setObject("playlist", playlist);
602             notify->post();
603             break;
604         }
605 
606         case kWhatMonitorQueue:
607         case kWhatDownloadNext:
608         {
609             int32_t generation;
610             CHECK(msg->findInt32("generation", &generation));
611 
612             if (generation != mMonitorQueueGeneration) {
613                 // Stale event
614                 break;
615             }
616 
617             if (msg->what() == kWhatMonitorQueue) {
618                 onMonitorQueue();
619             } else {
620                 onDownloadNext();
621             }
622             break;
623         }
624 
625         case kWhatResumeUntil:
626         {
627             onResumeUntil(msg);
628             break;
629         }
630 
631         default:
632             TRESPASS();
633     }
634 }
635 
onStart(const sp<AMessage> & msg)636 status_t PlaylistFetcher::onStart(const sp<AMessage> &msg) {
637     mPacketSources.clear();
638     mStopParams.clear();
639     mStartTimeUsNotify = mNotify->dup();
640     mStartTimeUsNotify->setInt32("what", kWhatStartedAt);
641     mStartTimeUsNotify->setString("uri", mURI);
642 
643     uint32_t streamTypeMask;
644     CHECK(msg->findInt32("streamTypeMask", (int32_t *)&streamTypeMask));
645 
646     int64_t startTimeUs;
647     int64_t segmentStartTimeUs;
648     int32_t startDiscontinuitySeq;
649     int32_t seekMode;
650     CHECK(msg->findInt64("startTimeUs", &startTimeUs));
651     CHECK(msg->findInt64("segmentStartTimeUs", &segmentStartTimeUs));
652     CHECK(msg->findInt32("startDiscontinuitySeq", &startDiscontinuitySeq));
653     CHECK(msg->findInt32("seekMode", &seekMode));
654 
655     if (streamTypeMask & LiveSession::STREAMTYPE_AUDIO) {
656         void *ptr;
657         CHECK(msg->findPointer("audioSource", &ptr));
658 
659         mPacketSources.add(
660                 LiveSession::STREAMTYPE_AUDIO,
661                 static_cast<AnotherPacketSource *>(ptr));
662     }
663 
664     if (streamTypeMask & LiveSession::STREAMTYPE_VIDEO) {
665         void *ptr;
666         CHECK(msg->findPointer("videoSource", &ptr));
667 
668         mPacketSources.add(
669                 LiveSession::STREAMTYPE_VIDEO,
670                 static_cast<AnotherPacketSource *>(ptr));
671     }
672 
673     if (streamTypeMask & LiveSession::STREAMTYPE_SUBTITLES) {
674         void *ptr;
675         CHECK(msg->findPointer("subtitleSource", &ptr));
676 
677         mPacketSources.add(
678                 LiveSession::STREAMTYPE_SUBTITLES,
679                 static_cast<AnotherPacketSource *>(ptr));
680     }
681 
682     void *ptr;
683     // metadataSource is not part of streamTypeMask
684     if ((streamTypeMask & (LiveSession::STREAMTYPE_AUDIO | LiveSession::STREAMTYPE_VIDEO))
685             && msg->findPointer("metadataSource", &ptr)) {
686         mPacketSources.add(
687                 LiveSession::STREAMTYPE_METADATA,
688                 static_cast<AnotherPacketSource *>(ptr));
689     }
690 
691     mStreamTypeMask = streamTypeMask;
692 
693     mSegmentStartTimeUs = segmentStartTimeUs;
694 
695     if (startDiscontinuitySeq >= 0) {
696         mDiscontinuitySeq = startDiscontinuitySeq;
697     }
698 
699     mRefreshState = INITIAL_MINIMUM_RELOAD_DELAY;
700     mSeekMode = (LiveSession::SeekMode) seekMode;
701 
702     if (startTimeUs >= 0 || mSeekMode == LiveSession::kSeekModeNextSample) {
703         mStartup = true;
704         mIDRFound = false;
705         mVideoBuffer->clear();
706     }
707 
708     if (startTimeUs >= 0) {
709         mStartTimeUs = startTimeUs;
710         mFirstPTSValid = false;
711         mSeqNumber = -1;
712         mTimeChangeSignaled = false;
713         mDownloadState->resetState();
714     }
715 
716     postMonitorQueue();
717 
718     return OK;
719 }
720 
onPause()721 void PlaylistFetcher::onPause() {
722     cancelMonitorQueue();
723     mLastDiscontinuitySeq = mDiscontinuitySeq;
724 
725     resetStoppingThreshold(false /* disconnect */);
726 }
727 
onStop(const sp<AMessage> & msg)728 void PlaylistFetcher::onStop(const sp<AMessage> &msg) {
729     cancelMonitorQueue();
730 
731     int32_t clear;
732     CHECK(msg->findInt32("clear", &clear));
733     if (clear) {
734         for (size_t i = 0; i < mPacketSources.size(); i++) {
735             sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
736             packetSource->clear();
737         }
738     }
739 
740     mDownloadState->resetState();
741     mPacketSources.clear();
742     mStreamTypeMask = 0;
743 
744     resetStoppingThreshold(true /* disconnect */);
745 }
746 
747 // Resume until we have reached the boundary timestamps listed in `msg`; when
748 // the remaining time is too short (within a resume threshold) stop immediately
749 // instead.
onResumeUntil(const sp<AMessage> & msg)750 status_t PlaylistFetcher::onResumeUntil(const sp<AMessage> &msg) {
751     sp<AMessage> params;
752     CHECK(msg->findMessage("params", &params));
753 
754     mStopParams = params;
755     onDownloadNext();
756 
757     return OK;
758 }
759 
notifyStopReached()760 void PlaylistFetcher::notifyStopReached() {
761     sp<AMessage> notify = mNotify->dup();
762     notify->setInt32("what", kWhatStopReached);
763     notify->post();
764 }
765 
notifyError(status_t err)766 void PlaylistFetcher::notifyError(status_t err) {
767     sp<AMessage> notify = mNotify->dup();
768     notify->setInt32("what", kWhatError);
769     notify->setInt32("err", err);
770     notify->post();
771 }
772 
queueDiscontinuity(ATSParser::DiscontinuityType type,const sp<AMessage> & extra)773 void PlaylistFetcher::queueDiscontinuity(
774         ATSParser::DiscontinuityType type, const sp<AMessage> &extra) {
775     for (size_t i = 0; i < mPacketSources.size(); ++i) {
776         // do not discard buffer upon #EXT-X-DISCONTINUITY tag
777         // (seek will discard buffer by abandoning old fetchers)
778         mPacketSources.valueAt(i)->queueDiscontinuity(
779                 type, extra, false /* discard */);
780     }
781 }
782 
onMonitorQueue()783 void PlaylistFetcher::onMonitorQueue() {
784     // in the middle of an unfinished download, delay
785     // playlist refresh as it'll change seq numbers
786     if (!mDownloadState->hasSavedState()) {
787         refreshPlaylist();
788     }
789 
790     int64_t targetDurationUs = kMinBufferedDurationUs;
791     if (mPlaylist != NULL) {
792         targetDurationUs = mPlaylist->getTargetDuration();
793     }
794 
795     int64_t bufferedDurationUs = 0ll;
796     status_t finalResult = OK;
797     if (mStreamTypeMask == LiveSession::STREAMTYPE_SUBTITLES) {
798         sp<AnotherPacketSource> packetSource =
799             mPacketSources.valueFor(LiveSession::STREAMTYPE_SUBTITLES);
800 
801         bufferedDurationUs =
802                 packetSource->getBufferedDurationUs(&finalResult);
803     } else {
804         // Use min stream duration, but ignore streams that never have any packet
805         // enqueued to prevent us from waiting on a non-existent stream;
806         // when we cannot make out from the manifest what streams are included in
807         // a playlist we might assume extra streams.
808         bufferedDurationUs = -1ll;
809         for (size_t i = 0; i < mPacketSources.size(); ++i) {
810             if ((mStreamTypeMask & mPacketSources.keyAt(i)) == 0
811                     || mPacketSources[i]->getLatestEnqueuedMeta() == NULL) {
812                 continue;
813             }
814 
815             int64_t bufferedStreamDurationUs =
816                 mPacketSources.valueAt(i)->getBufferedDurationUs(&finalResult);
817 
818             FSLOGV(mPacketSources.keyAt(i), "buffered %lld", (long long)bufferedStreamDurationUs);
819 
820             if (bufferedDurationUs == -1ll
821                  || bufferedStreamDurationUs < bufferedDurationUs) {
822                 bufferedDurationUs = bufferedStreamDurationUs;
823             }
824         }
825         if (bufferedDurationUs == -1ll) {
826             bufferedDurationUs = 0ll;
827         }
828     }
829 
830     if (finalResult == OK && bufferedDurationUs < kMinBufferedDurationUs) {
831         FLOGV("monitoring, buffered=%lld < %lld",
832                 (long long)bufferedDurationUs, (long long)kMinBufferedDurationUs);
833 
834         // delay the next download slightly; hopefully this gives other concurrent fetchers
835         // a better chance to run.
836         // onDownloadNext();
837         sp<AMessage> msg = new AMessage(kWhatDownloadNext, this);
838         msg->setInt32("generation", mMonitorQueueGeneration);
839         msg->post(1000l);
840     } else {
841         // We'd like to maintain buffering above durationToBufferUs, so try
842         // again when buffer just about to go below durationToBufferUs
843         // (or after targetDurationUs / 2, whichever is smaller).
844         int64_t delayUs = bufferedDurationUs - kMinBufferedDurationUs + 1000000ll;
845         if (delayUs > targetDurationUs / 2) {
846             delayUs = targetDurationUs / 2;
847         }
848 
849         FLOGV("pausing for %lld, buffered=%lld > %lld",
850                 (long long)delayUs,
851                 (long long)bufferedDurationUs,
852                 (long long)kMinBufferedDurationUs);
853 
854         postMonitorQueue(delayUs);
855     }
856 }
857 
refreshPlaylist()858 status_t PlaylistFetcher::refreshPlaylist() {
859     if (delayUsToRefreshPlaylist() <= 0) {
860         bool unchanged;
861         sp<M3UParser> playlist = mHTTPDownloader->fetchPlaylist(
862                 mURI.c_str(), mPlaylistHash, &unchanged);
863 
864         if (playlist == NULL) {
865             if (unchanged) {
866                 // We succeeded in fetching the playlist, but it was
867                 // unchanged from the last time we tried.
868 
869                 if (mRefreshState != THIRD_UNCHANGED_RELOAD_ATTEMPT) {
870                     mRefreshState = (RefreshState)(mRefreshState + 1);
871                 }
872             } else {
873                 ALOGE("failed to load playlist at url '%s'", uriDebugString(mURI).c_str());
874                 return ERROR_IO;
875             }
876         } else {
877             mRefreshState = INITIAL_MINIMUM_RELOAD_DELAY;
878             mPlaylist = playlist;
879 
880             if (mPlaylist->isComplete() || mPlaylist->isEvent()) {
881                 updateDuration();
882             }
883             // Notify LiveSession to use target-duration based buffering level
884             // for up/down switch. Default LiveSession::kUpSwitchMark may not
885             // be reachable for live streams, as our max buffering amount is
886             // limited to 3 segments.
887             if (!mPlaylist->isComplete()) {
888                 updateTargetDuration();
889             }
890             mPlaylistTimeUs = ALooper::GetNowUs();
891         }
892 
893         mLastPlaylistFetchTimeUs = ALooper::GetNowUs();
894     }
895     return OK;
896 }
897 
898 // static
bufferStartsWithTsSyncByte(const sp<ABuffer> & buffer)899 bool PlaylistFetcher::bufferStartsWithTsSyncByte(const sp<ABuffer>& buffer) {
900     return buffer->size() > 0 && buffer->data()[0] == 0x47;
901 }
902 
shouldPauseDownload()903 bool PlaylistFetcher::shouldPauseDownload() {
904     if (mStreamTypeMask == LiveSession::STREAMTYPE_SUBTITLES) {
905         // doesn't apply to subtitles
906         return false;
907     }
908 
909     // Calculate threshold to abort current download
910     float thresholdRatio = getStoppingThreshold();
911 
912     if (thresholdRatio < 0.0f) {
913         // never abort
914         return false;
915     } else if (thresholdRatio == 0.0f) {
916         // immediately abort
917         return true;
918     }
919 
920     // now we have a positive thresholdUs, abort if remaining
921     // portion to download is over that threshold.
922     if (mSegmentFirstPTS < 0) {
923         // this means we haven't even find the first access unit,
924         // abort now as we must be very far away from the end.
925         return true;
926     }
927     int64_t lastEnqueueUs = mSegmentFirstPTS;
928     for (size_t i = 0; i < mPacketSources.size(); ++i) {
929         if ((mStreamTypeMask & mPacketSources.keyAt(i)) == 0) {
930             continue;
931         }
932         sp<AMessage> meta = mPacketSources[i]->getLatestEnqueuedMeta();
933         int32_t type;
934         if (meta == NULL || meta->findInt32("discontinuity", &type)) {
935             continue;
936         }
937         int64_t tmpUs;
938         CHECK(meta->findInt64("timeUs", &tmpUs));
939         if (tmpUs > lastEnqueueUs) {
940             lastEnqueueUs = tmpUs;
941         }
942     }
943     lastEnqueueUs -= mSegmentFirstPTS;
944 
945     int64_t targetDurationUs = mPlaylist->getTargetDuration();
946     int64_t thresholdUs = thresholdRatio * targetDurationUs;
947 
948     FLOGV("%spausing now, thresholdUs %lld, remaining %lld",
949             targetDurationUs - lastEnqueueUs > thresholdUs ? "" : "not ",
950             (long long)thresholdUs,
951             (long long)(targetDurationUs - lastEnqueueUs));
952 
953     if (targetDurationUs - lastEnqueueUs > thresholdUs) {
954         return true;
955     }
956     return false;
957 }
958 
initDownloadState(AString & uri,sp<AMessage> & itemMeta,int32_t & firstSeqNumberInPlaylist,int32_t & lastSeqNumberInPlaylist)959 bool PlaylistFetcher::initDownloadState(
960         AString &uri,
961         sp<AMessage> &itemMeta,
962         int32_t &firstSeqNumberInPlaylist,
963         int32_t &lastSeqNumberInPlaylist) {
964     status_t err = refreshPlaylist();
965     firstSeqNumberInPlaylist = 0;
966     lastSeqNumberInPlaylist = 0;
967     bool discontinuity = false;
968 
969     if (mPlaylist != NULL) {
970         mPlaylist->getSeqNumberRange(
971                 &firstSeqNumberInPlaylist, &lastSeqNumberInPlaylist);
972 
973         if (mDiscontinuitySeq < 0) {
974             mDiscontinuitySeq = mPlaylist->getDiscontinuitySeq();
975         }
976     }
977 
978     mSegmentFirstPTS = -1ll;
979 
980     if (mPlaylist != NULL && mSeqNumber < 0) {
981         CHECK_GE(mStartTimeUs, 0ll);
982 
983         if (mSegmentStartTimeUs < 0) {
984             if (!mPlaylist->isComplete() && !mPlaylist->isEvent()) {
985                 // If this is a live session, start 3 segments from the end on connect
986                 mSeqNumber = lastSeqNumberInPlaylist - 3;
987                 if (mSeqNumber < firstSeqNumberInPlaylist) {
988                     mSeqNumber = firstSeqNumberInPlaylist;
989                 }
990             } else {
991                 // When seeking mSegmentStartTimeUs is unavailable (< 0), we
992                 // use mStartTimeUs (client supplied timestamp) to determine both start segment
993                 // and relative position inside a segment
994                 mSeqNumber = getSeqNumberForTime(mStartTimeUs);
995                 mStartTimeUs -= getSegmentStartTimeUs(mSeqNumber);
996             }
997             mStartTimeUsRelative = true;
998             FLOGV("Initial sequence number for time %lld is %d from (%d .. %d)",
999                     (long long)mStartTimeUs, mSeqNumber, firstSeqNumberInPlaylist,
1000                     lastSeqNumberInPlaylist);
1001         } else {
1002             // When adapting or track switching, mSegmentStartTimeUs (relative
1003             // to media time 0) is used to determine the start segment; mStartTimeUs (absolute
1004             // timestamps coming from the media container) is used to determine the position
1005             // inside a segments.
1006             if (mStreamTypeMask != LiveSession::STREAMTYPE_SUBTITLES
1007                     && mSeekMode != LiveSession::kSeekModeNextSample) {
1008                 // avoid double fetch/decode
1009                 // Use (mSegmentStartTimeUs + 1/2 * targetDurationUs) to search
1010                 // for the starting segment in new variant.
1011                 // If the two variants' segments are aligned, this gives the
1012                 // next segment. If they're not aligned, this gives the segment
1013                 // that overlaps no more than 1/2 * targetDurationUs.
1014                 mSeqNumber = getSeqNumberForTime(mSegmentStartTimeUs
1015                         + mPlaylist->getTargetDuration() / 2);
1016             } else {
1017                 mSeqNumber = getSeqNumberForTime(mSegmentStartTimeUs);
1018             }
1019             ssize_t minSeq = getSeqNumberForDiscontinuity(mDiscontinuitySeq);
1020             if (mSeqNumber < minSeq) {
1021                 mSeqNumber = minSeq;
1022             }
1023 
1024             if (mSeqNumber < firstSeqNumberInPlaylist) {
1025                 mSeqNumber = firstSeqNumberInPlaylist;
1026             }
1027 
1028             if (mSeqNumber > lastSeqNumberInPlaylist) {
1029                 mSeqNumber = lastSeqNumberInPlaylist;
1030             }
1031             FLOGV("Initial sequence number is %d from (%d .. %d)",
1032                     mSeqNumber, firstSeqNumberInPlaylist,
1033                     lastSeqNumberInPlaylist);
1034         }
1035     }
1036 
1037     // if mPlaylist is NULL then err must be non-OK; but the other way around might not be true
1038     if (mSeqNumber < firstSeqNumberInPlaylist
1039             || mSeqNumber > lastSeqNumberInPlaylist
1040             || err != OK) {
1041         if ((err != OK || !mPlaylist->isComplete()) && mNumRetries < kMaxNumRetries) {
1042             ++mNumRetries;
1043 
1044             if (mSeqNumber > lastSeqNumberInPlaylist || err != OK) {
1045                 // make sure we reach this retry logic on refresh failures
1046                 // by adding an err != OK clause to all enclosing if's.
1047 
1048                 // refresh in increasing fraction (1/2, 1/3, ...) of the
1049                 // playlist's target duration or 3 seconds, whichever is less
1050                 int64_t delayUs = kMaxMonitorDelayUs;
1051                 if (mPlaylist != NULL) {
1052                     delayUs = mPlaylist->size() * mPlaylist->getTargetDuration()
1053                             / (1 + mNumRetries);
1054                 }
1055                 if (delayUs > kMaxMonitorDelayUs) {
1056                     delayUs = kMaxMonitorDelayUs;
1057                 }
1058                 FLOGV("sequence number high: %d from (%d .. %d), "
1059                       "monitor in %lld (retry=%d)",
1060                         mSeqNumber, firstSeqNumberInPlaylist,
1061                         lastSeqNumberInPlaylist, (long long)delayUs, mNumRetries);
1062                 postMonitorQueue(delayUs);
1063                 return false;
1064             }
1065 
1066             if (err != OK) {
1067                 notifyError(err);
1068                 return false;
1069             }
1070 
1071             // we've missed the boat, let's start 3 segments prior to the latest sequence
1072             // number available and signal a discontinuity.
1073 
1074             ALOGI("We've missed the boat, restarting playback."
1075                   "  mStartup=%d, was  looking for %d in %d-%d",
1076                     mStartup, mSeqNumber, firstSeqNumberInPlaylist,
1077                     lastSeqNumberInPlaylist);
1078             if (mStopParams != NULL) {
1079                 // we should have kept on fetching until we hit the boundaries in mStopParams,
1080                 // but since the segments we are supposed to fetch have already rolled off
1081                 // the playlist, i.e. we have already missed the boat, we inevitably have to
1082                 // skip.
1083                 notifyStopReached();
1084                 return false;
1085             }
1086             mSeqNumber = lastSeqNumberInPlaylist - 3;
1087             if (mSeqNumber < firstSeqNumberInPlaylist) {
1088                 mSeqNumber = firstSeqNumberInPlaylist;
1089             }
1090             discontinuity = true;
1091 
1092             // fall through
1093         } else {
1094             if (mPlaylist != NULL) {
1095                 if (mSeqNumber >= firstSeqNumberInPlaylist + (int32_t)mPlaylist->size()
1096                         && !mPlaylist->isComplete()) {
1097                     // Live playlists
1098                     ALOGW("sequence number %d not yet available", mSeqNumber);
1099                     postMonitorQueue(delayUsToRefreshPlaylist());
1100                     return false;
1101                 }
1102                 ALOGE("Cannot find sequence number %d in playlist "
1103                      "(contains %d - %d)",
1104                      mSeqNumber, firstSeqNumberInPlaylist,
1105                       firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1);
1106 
1107                 if (mTSParser != NULL) {
1108                     mTSParser->signalEOS(ERROR_END_OF_STREAM);
1109                     // Use an empty buffer; we don't have any new data, just want to extract
1110                     // potential new access units after flush.  Reset mSeqNumber to
1111                     // lastSeqNumberInPlaylist such that we set the correct access unit
1112                     // properties in extractAndQueueAccessUnitsFromTs.
1113                     sp<ABuffer> buffer = new ABuffer(0);
1114                     mSeqNumber = lastSeqNumberInPlaylist;
1115                     extractAndQueueAccessUnitsFromTs(buffer);
1116                 }
1117                 notifyError(ERROR_END_OF_STREAM);
1118             } else {
1119                 // It's possible that we were never able to download the playlist.
1120                 // In this case we should notify error, instead of EOS, as EOS during
1121                 // prepare means we succeeded in downloading everything.
1122                 ALOGE("Failed to download playlist!");
1123                 notifyError(ERROR_IO);
1124             }
1125 
1126             return false;
1127         }
1128     }
1129 
1130     mNumRetries = 0;
1131 
1132     CHECK(mPlaylist->itemAt(
1133                 mSeqNumber - firstSeqNumberInPlaylist,
1134                 &uri,
1135                 &itemMeta));
1136 
1137     CHECK(itemMeta->findInt32("discontinuity-sequence", &mDiscontinuitySeq));
1138 
1139     int32_t val;
1140     if (itemMeta->findInt32("discontinuity", &val) && val != 0) {
1141         discontinuity = true;
1142     } else if (mLastDiscontinuitySeq >= 0
1143             && mDiscontinuitySeq != mLastDiscontinuitySeq) {
1144         // Seek jumped to a new discontinuity sequence. We need to signal
1145         // a format change to decoder. Decoder needs to shutdown and be
1146         // created again if seamless format change is unsupported.
1147         FLOGV("saw discontinuity: mStartup %d, mLastDiscontinuitySeq %d, "
1148                 "mDiscontinuitySeq %d, mStartTimeUs %lld",
1149                 mStartup, mLastDiscontinuitySeq, mDiscontinuitySeq, (long long)mStartTimeUs);
1150         discontinuity = true;
1151     }
1152     mLastDiscontinuitySeq = -1;
1153 
1154     // decrypt a junk buffer to prefetch key; since a session uses only one http connection,
1155     // this avoids interleaved connections to the key and segment file.
1156     {
1157         sp<ABuffer> junk = new ABuffer(16);
1158         junk->setRange(0, 16);
1159         status_t err = decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, junk,
1160                 true /* first */);
1161         if (err == ERROR_NOT_CONNECTED) {
1162             return false;
1163         } else if (err != OK) {
1164             notifyError(err);
1165             return false;
1166         }
1167     }
1168 
1169     if ((mStartup && !mTimeChangeSignaled) || discontinuity) {
1170         // We need to signal a time discontinuity to ATSParser on the
1171         // first segment after start, or on a discontinuity segment.
1172         // Setting mNextPTSTimeUs informs extractAndQueueAccessUnitsXX()
1173         // to send the time discontinuity.
1174         if (mPlaylist->isComplete() || mPlaylist->isEvent()) {
1175             // If this was a live event this made no sense since
1176             // we don't have access to all the segment before the current
1177             // one.
1178             mNextPTSTimeUs = getSegmentStartTimeUs(mSeqNumber);
1179         }
1180 
1181         // Setting mTimeChangeSignaled to true, so that if start time
1182         // searching goes into 2nd segment (without a discontinuity),
1183         // we don't reset time again. It causes corruption when pending
1184         // data in ATSParser is cleared.
1185         mTimeChangeSignaled = true;
1186     }
1187 
1188     if (discontinuity) {
1189         ALOGI("queueing discontinuity (explicit=%d)", discontinuity);
1190 
1191         // Signal a format discontinuity to ATSParser to clear partial data
1192         // from previous streams. Not doing this causes bitstream corruption.
1193         if (mTSParser != NULL) {
1194             mTSParser.clear();
1195         }
1196 
1197         queueDiscontinuity(
1198                 ATSParser::DISCONTINUITY_FORMAT_ONLY,
1199                 NULL /* extra */);
1200 
1201         if (mStartup && mStartTimeUsRelative && mFirstPTSValid) {
1202             // This means we guessed mStartTimeUs to be in the previous
1203             // segment (likely very close to the end), but either video or
1204             // audio has not found start by the end of that segment.
1205             //
1206             // If this new segment is not a discontinuity, keep searching.
1207             //
1208             // If this new segment even got a discontinuity marker, just
1209             // set mStartTimeUs=0, and take all samples from now on.
1210             mStartTimeUs = 0;
1211             mFirstPTSValid = false;
1212             mIDRFound = false;
1213             mVideoBuffer->clear();
1214         }
1215     }
1216 
1217     FLOGV("fetching segment %d from (%d .. %d)",
1218             mSeqNumber, firstSeqNumberInPlaylist, lastSeqNumberInPlaylist);
1219     return true;
1220 }
1221 
onDownloadNext()1222 void PlaylistFetcher::onDownloadNext() {
1223     AString uri;
1224     sp<AMessage> itemMeta;
1225     sp<ABuffer> buffer;
1226     sp<ABuffer> tsBuffer;
1227     int32_t firstSeqNumberInPlaylist = 0;
1228     int32_t lastSeqNumberInPlaylist = 0;
1229     bool connectHTTP = true;
1230 
1231     if (mDownloadState->hasSavedState()) {
1232         mDownloadState->restoreState(
1233                 uri,
1234                 itemMeta,
1235                 buffer,
1236                 tsBuffer,
1237                 firstSeqNumberInPlaylist,
1238                 lastSeqNumberInPlaylist);
1239         connectHTTP = false;
1240         FLOGV("resuming: '%s'", uri.c_str());
1241     } else {
1242         if (!initDownloadState(
1243                 uri,
1244                 itemMeta,
1245                 firstSeqNumberInPlaylist,
1246                 lastSeqNumberInPlaylist)) {
1247             return;
1248         }
1249         FLOGV("fetching: '%s'", uri.c_str());
1250     }
1251 
1252     int64_t range_offset, range_length;
1253     if (!itemMeta->findInt64("range-offset", &range_offset)
1254             || !itemMeta->findInt64("range-length", &range_length)) {
1255         range_offset = 0;
1256         range_length = -1;
1257     }
1258 
1259     // block-wise download
1260     bool shouldPause = false;
1261     ssize_t bytesRead;
1262     do {
1263         int64_t startUs = ALooper::GetNowUs();
1264         bytesRead = mHTTPDownloader->fetchBlock(
1265                 uri.c_str(), &buffer, range_offset, range_length, kDownloadBlockSize,
1266                 NULL /* actualURL */, connectHTTP);
1267         int64_t delayUs = ALooper::GetNowUs() - startUs;
1268 
1269         if (bytesRead == ERROR_NOT_CONNECTED) {
1270             return;
1271         }
1272         if (bytesRead < 0) {
1273             status_t err = bytesRead;
1274             ALOGE("failed to fetch .ts segment at url '%s'", uri.c_str());
1275             notifyError(err);
1276             return;
1277         }
1278 
1279         // add sample for bandwidth estimation, excluding samples from subtitles (as
1280         // its too small), or during startup/resumeUntil (when we could have more than
1281         // one connection open which affects bandwidth)
1282         if (!mStartup && mStopParams == NULL && bytesRead > 0
1283                 && (mStreamTypeMask
1284                         & (LiveSession::STREAMTYPE_AUDIO
1285                         | LiveSession::STREAMTYPE_VIDEO))) {
1286             mSession->addBandwidthMeasurement(bytesRead, delayUs);
1287             if (delayUs > 2000000ll) {
1288                 FLOGV("bytesRead %zd took %.2f seconds - abnormal bandwidth dip",
1289                         bytesRead, (double)delayUs / 1.0e6);
1290             }
1291         }
1292 
1293         connectHTTP = false;
1294 
1295         CHECK(buffer != NULL);
1296 
1297         size_t size = buffer->size();
1298         // Set decryption range.
1299         buffer->setRange(size - bytesRead, bytesRead);
1300         status_t err = decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, buffer,
1301                 buffer->offset() == 0 /* first */);
1302         // Unset decryption range.
1303         buffer->setRange(0, size);
1304 
1305         if (err != OK) {
1306             ALOGE("decryptBuffer failed w/ error %d", err);
1307 
1308             notifyError(err);
1309             return;
1310         }
1311 
1312         bool startUp = mStartup; // save current start up state
1313 
1314         err = OK;
1315         if (bufferStartsWithTsSyncByte(buffer)) {
1316             // Incremental extraction is only supported for MPEG2 transport streams.
1317             if (tsBuffer == NULL) {
1318                 tsBuffer = new ABuffer(buffer->data(), buffer->capacity());
1319                 tsBuffer->setRange(0, 0);
1320             } else if (tsBuffer->capacity() != buffer->capacity()) {
1321                 size_t tsOff = tsBuffer->offset(), tsSize = tsBuffer->size();
1322                 tsBuffer = new ABuffer(buffer->data(), buffer->capacity());
1323                 tsBuffer->setRange(tsOff, tsSize);
1324             }
1325             tsBuffer->setRange(tsBuffer->offset(), tsBuffer->size() + bytesRead);
1326             err = extractAndQueueAccessUnitsFromTs(tsBuffer);
1327         }
1328 
1329         if (err == -EAGAIN) {
1330             // starting sequence number too low/high
1331             mTSParser.clear();
1332             for (size_t i = 0; i < mPacketSources.size(); i++) {
1333                 sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
1334                 packetSource->clear();
1335             }
1336             postMonitorQueue();
1337             return;
1338         } else if (err == ERROR_OUT_OF_RANGE) {
1339             // reached stopping point
1340             notifyStopReached();
1341             return;
1342         } else if (err != OK) {
1343             notifyError(err);
1344             return;
1345         }
1346         // If we're switching, post start notification
1347         // this should only be posted when the last chunk is full processed by TSParser
1348         if (mSeekMode != LiveSession::kSeekModeExactPosition && startUp != mStartup) {
1349             CHECK(mStartTimeUsNotify != NULL);
1350             mStartTimeUsNotify->post();
1351             mStartTimeUsNotify.clear();
1352             shouldPause = true;
1353         }
1354         if (shouldPause || shouldPauseDownload()) {
1355             // save state and return if this is not the last chunk,
1356             // leaving the fetcher in paused state.
1357             if (bytesRead != 0) {
1358                 mDownloadState->saveState(
1359                         uri,
1360                         itemMeta,
1361                         buffer,
1362                         tsBuffer,
1363                         firstSeqNumberInPlaylist,
1364                         lastSeqNumberInPlaylist);
1365                 return;
1366             }
1367             shouldPause = true;
1368         }
1369     } while (bytesRead != 0);
1370 
1371     if (bufferStartsWithTsSyncByte(buffer)) {
1372         // If we don't see a stream in the program table after fetching a full ts segment
1373         // mark it as nonexistent.
1374         ATSParser::SourceType srcTypes[] =
1375                 { ATSParser::VIDEO, ATSParser::AUDIO };
1376         LiveSession::StreamType streamTypes[] =
1377                 { LiveSession::STREAMTYPE_VIDEO, LiveSession::STREAMTYPE_AUDIO };
1378         const size_t kNumTypes = NELEM(srcTypes);
1379 
1380         for (size_t i = 0; i < kNumTypes; i++) {
1381             ATSParser::SourceType srcType = srcTypes[i];
1382             LiveSession::StreamType streamType = streamTypes[i];
1383 
1384             sp<AnotherPacketSource> source =
1385                 static_cast<AnotherPacketSource *>(
1386                     mTSParser->getSource(srcType).get());
1387 
1388             if (!mTSParser->hasSource(srcType)) {
1389                 ALOGW("MPEG2 Transport stream does not contain %s data.",
1390                       srcType == ATSParser::VIDEO ? "video" : "audio");
1391 
1392                 mStreamTypeMask &= ~streamType;
1393                 mPacketSources.removeItem(streamType);
1394             }
1395         }
1396 
1397     }
1398 
1399     if (checkDecryptPadding(buffer) != OK) {
1400         ALOGE("Incorrect padding bytes after decryption.");
1401         notifyError(ERROR_MALFORMED);
1402         return;
1403     }
1404 
1405     if (tsBuffer != NULL) {
1406         AString method;
1407         CHECK(buffer->meta()->findString("cipher-method", &method));
1408         if ((tsBuffer->size() > 0 && method == "NONE")
1409                 || tsBuffer->size() > 16) {
1410             ALOGE("MPEG2 transport stream is not an even multiple of 188 "
1411                     "bytes in length.");
1412             notifyError(ERROR_MALFORMED);
1413             return;
1414         }
1415     }
1416 
1417     // bulk extract non-ts files
1418     bool startUp = mStartup;
1419     if (tsBuffer == NULL) {
1420         status_t err = extractAndQueueAccessUnits(buffer, itemMeta);
1421         if (err == -EAGAIN) {
1422             // starting sequence number too low/high
1423             postMonitorQueue();
1424             return;
1425         } else if (err == ERROR_OUT_OF_RANGE) {
1426             // reached stopping point
1427             notifyStopReached();
1428             return;
1429         } else if (err != OK) {
1430             notifyError(err);
1431             return;
1432         }
1433     }
1434 
1435     ++mSeqNumber;
1436 
1437     // if adapting, pause after found the next starting point
1438     if (mSeekMode != LiveSession::kSeekModeExactPosition && startUp != mStartup) {
1439         CHECK(mStartTimeUsNotify != NULL);
1440         mStartTimeUsNotify->post();
1441         mStartTimeUsNotify.clear();
1442         shouldPause = true;
1443     }
1444 
1445     if (!shouldPause) {
1446         postMonitorQueue();
1447     }
1448 }
1449 
1450 /*
1451  * returns true if we need to adjust mSeqNumber
1452  */
adjustSeqNumberWithAnchorTime(int64_t anchorTimeUs)1453 bool PlaylistFetcher::adjustSeqNumberWithAnchorTime(int64_t anchorTimeUs) {
1454     int32_t firstSeqNumberInPlaylist = mPlaylist->getFirstSeqNumber();
1455 
1456     int64_t minDiffUs, maxDiffUs;
1457     if (mSeekMode == LiveSession::kSeekModeNextSample) {
1458         // if the previous fetcher paused in the middle of a segment, we
1459         // want to start at a segment that overlaps the last sample
1460         minDiffUs = -mPlaylist->getTargetDuration();
1461         maxDiffUs = 0ll;
1462     } else {
1463         // if the previous fetcher paused at the end of a segment, ideally
1464         // we want to start at the segment that's roughly aligned with its
1465         // next segment, but if the two variants are not well aligned we
1466         // adjust the diff to within (-T/2, T/2)
1467         minDiffUs = -mPlaylist->getTargetDuration() / 2;
1468         maxDiffUs = mPlaylist->getTargetDuration() / 2;
1469     }
1470 
1471     int32_t oldSeqNumber = mSeqNumber;
1472     ssize_t index = mSeqNumber - firstSeqNumberInPlaylist;
1473 
1474     // adjust anchorTimeUs to within (minDiffUs, maxDiffUs) from mStartTimeUs
1475     int64_t diffUs = anchorTimeUs - mStartTimeUs;
1476     if (diffUs > maxDiffUs) {
1477         while (index > 0 && diffUs > maxDiffUs) {
1478             --index;
1479 
1480             sp<AMessage> itemMeta;
1481             CHECK(mPlaylist->itemAt(index, NULL /* uri */, &itemMeta));
1482 
1483             int64_t itemDurationUs;
1484             CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
1485 
1486             diffUs -= itemDurationUs;
1487         }
1488     } else if (diffUs < minDiffUs) {
1489         while (index + 1 < (ssize_t) mPlaylist->size()
1490                 && diffUs < minDiffUs) {
1491             ++index;
1492 
1493             sp<AMessage> itemMeta;
1494             CHECK(mPlaylist->itemAt(index, NULL /* uri */, &itemMeta));
1495 
1496             int64_t itemDurationUs;
1497             CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
1498 
1499             diffUs += itemDurationUs;
1500         }
1501     }
1502 
1503     mSeqNumber = firstSeqNumberInPlaylist + index;
1504 
1505     if (mSeqNumber != oldSeqNumber) {
1506         FLOGV("guessed wrong seg number: diff %lld out of [%lld, %lld]",
1507                 (long long) anchorTimeUs - mStartTimeUs,
1508                 (long long) minDiffUs,
1509                 (long long) maxDiffUs);
1510         return true;
1511     }
1512     return false;
1513 }
1514 
getSeqNumberForDiscontinuity(size_t discontinuitySeq) const1515 int32_t PlaylistFetcher::getSeqNumberForDiscontinuity(size_t discontinuitySeq) const {
1516     int32_t firstSeqNumberInPlaylist = mPlaylist->getFirstSeqNumber();
1517 
1518     size_t index = 0;
1519     while (index < mPlaylist->size()) {
1520         sp<AMessage> itemMeta;
1521         CHECK(mPlaylist->itemAt( index, NULL /* uri */, &itemMeta));
1522         size_t curDiscontinuitySeq;
1523         CHECK(itemMeta->findInt32("discontinuity-sequence", (int32_t *)&curDiscontinuitySeq));
1524         int32_t seqNumber = firstSeqNumberInPlaylist + index;
1525         if (curDiscontinuitySeq == discontinuitySeq) {
1526             return seqNumber;
1527         } else if (curDiscontinuitySeq > discontinuitySeq) {
1528             return seqNumber <= 0 ? 0 : seqNumber - 1;
1529         }
1530 
1531         ++index;
1532     }
1533 
1534     return firstSeqNumberInPlaylist + mPlaylist->size();
1535 }
1536 
getSeqNumberForTime(int64_t timeUs) const1537 int32_t PlaylistFetcher::getSeqNumberForTime(int64_t timeUs) const {
1538     size_t index = 0;
1539     int64_t segmentStartUs = 0;
1540     while (index < mPlaylist->size()) {
1541         sp<AMessage> itemMeta;
1542         CHECK(mPlaylist->itemAt(
1543                     index, NULL /* uri */, &itemMeta));
1544 
1545         int64_t itemDurationUs;
1546         CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
1547 
1548         if (timeUs < segmentStartUs + itemDurationUs) {
1549             break;
1550         }
1551 
1552         segmentStartUs += itemDurationUs;
1553         ++index;
1554     }
1555 
1556     if (index >= mPlaylist->size()) {
1557         index = mPlaylist->size() - 1;
1558     }
1559 
1560     return mPlaylist->getFirstSeqNumber() + index;
1561 }
1562 
// Stamps |accessUnit|'s metadata with the source's format (when it has one),
// an optional "discard" flag, and the discontinuity/segment information of the
// segment currently being fetched. Returns |accessUnit| itself.
const sp<ABuffer> &PlaylistFetcher::setAccessUnitProperties(
        const sp<ABuffer> &accessUnit, const sp<AnotherPacketSource> &source, bool discard) {
    sp<MetaData> sourceFormat = source->getFormat();
    sp<AMessage> unitMeta = accessUnit->meta();

    if (sourceFormat != NULL) {
        // For simplicity every unit carries a reference to the format.
        unitMeta->setObject("format", sourceFormat);
    }

    // The flag is only written when set.
    if (discard) {
        unitMeta->setInt32("discard", discard);
    }

    unitMeta->setInt32("discontinuitySeq", mDiscontinuitySeq);
    unitMeta->setInt64("segmentStartTimeUs", getSegmentStartTimeUs(mSeqNumber));
    unitMeta->setInt64("segmentFirstTimeUs", mSegmentFirstPTS);
    unitMeta->setInt64("segmentDurationUs", getSegmentDurationUs(mSeqNumber));

    // Only playlists that are neither complete nor event playlists get the
    // playlist timestamp attached.
    const bool livePlaylist = !mPlaylist->isComplete() && !mPlaylist->isEvent();
    if (livePlaylist) {
        unitMeta->setInt64("playlistTimeUs", mPlaylistTimeUs);
    }
    return accessUnit;
}
1584 
isStartTimeReached(int64_t timeUs)1585 bool PlaylistFetcher::isStartTimeReached(int64_t timeUs) {
1586     if (!mFirstPTSValid) {
1587         mFirstTimeUs = timeUs;
1588         mFirstPTSValid = true;
1589     }
1590     bool startTimeReached = true;
1591     if (mStartTimeUsRelative) {
1592         FLOGV("startTimeUsRelative, timeUs (%lld) - %lld = %lld",
1593                 (long long)timeUs,
1594                 (long long)mFirstTimeUs,
1595                 (long long)(timeUs - mFirstTimeUs));
1596         timeUs -= mFirstTimeUs;
1597         if (timeUs < 0) {
1598             FLOGV("clamp negative timeUs to 0");
1599             timeUs = 0;
1600         }
1601         startTimeReached = (timeUs >= mStartTimeUs);
1602     }
1603     return startTimeReached;
1604 }
1605 
extractAndQueueAccessUnitsFromTs(const sp<ABuffer> & buffer)1606 status_t PlaylistFetcher::extractAndQueueAccessUnitsFromTs(const sp<ABuffer> &buffer) {
1607     if (mTSParser == NULL) {
1608         // Use TS_TIMESTAMPS_ARE_ABSOLUTE so pts carry over between fetchers.
1609         mTSParser = new ATSParser(ATSParser::TS_TIMESTAMPS_ARE_ABSOLUTE);
1610     }
1611 
1612     if (mNextPTSTimeUs >= 0ll) {
1613         sp<AMessage> extra = new AMessage;
1614         // Since we are using absolute timestamps, signal an offset of 0 to prevent
1615         // ATSParser from skewing the timestamps of access units.
1616         extra->setInt64(IStreamListener::kKeyMediaTimeUs, 0);
1617 
1618         // When adapting, signal a recent media time to the parser,
1619         // so that PTS wrap around is handled for the new variant.
1620         if (mStartTimeUs >= 0 && !mStartTimeUsRelative) {
1621             extra->setInt64(IStreamListener::kKeyRecentMediaTimeUs, mStartTimeUs);
1622         }
1623 
1624         mTSParser->signalDiscontinuity(
1625                 ATSParser::DISCONTINUITY_TIME, extra);
1626 
1627         mNextPTSTimeUs = -1ll;
1628     }
1629 
1630     size_t offset = 0;
1631     while (offset + 188 <= buffer->size()) {
1632         status_t err = mTSParser->feedTSPacket(buffer->data() + offset, 188);
1633 
1634         if (err != OK) {
1635             return err;
1636         }
1637 
1638         offset += 188;
1639     }
1640     // setRange to indicate consumed bytes.
1641     buffer->setRange(buffer->offset() + offset, buffer->size() - offset);
1642 
1643     if (mSegmentFirstPTS < 0ll) {
1644         // get the smallest first PTS from all streams present in this parser
1645         for (size_t i = mPacketSources.size(); i > 0;) {
1646             i--;
1647             const LiveSession::StreamType stream = mPacketSources.keyAt(i);
1648             if (stream == LiveSession::STREAMTYPE_SUBTITLES) {
1649                 ALOGE("MPEG2 Transport streams do not contain subtitles.");
1650                 return ERROR_MALFORMED;
1651             }
1652             if (stream == LiveSession::STREAMTYPE_METADATA) {
1653                 continue;
1654             }
1655             ATSParser::SourceType type =LiveSession::getSourceTypeForStream(stream);
1656             sp<AnotherPacketSource> source =
1657                 static_cast<AnotherPacketSource *>(
1658                         mTSParser->getSource(type).get());
1659 
1660             if (source == NULL) {
1661                 continue;
1662             }
1663             sp<AMessage> meta = source->getMetaAfterLastDequeued(0);
1664             if (meta != NULL) {
1665                 int64_t timeUs;
1666                 CHECK(meta->findInt64("timeUs", &timeUs));
1667                 if (mSegmentFirstPTS < 0ll || timeUs < mSegmentFirstPTS) {
1668                     mSegmentFirstPTS = timeUs;
1669                 }
1670             }
1671         }
1672         if (mSegmentFirstPTS < 0ll) {
1673             // didn't find any TS packet, can return early
1674             return OK;
1675         }
1676         if (!mStartTimeUsRelative) {
1677             // mStartup
1678             //   mStartup is true until we have queued a packet for all the streams
1679             //   we are fetching. We queue packets whose timestamps are greater than
1680             //   mStartTimeUs.
1681             // mSegmentStartTimeUs >= 0
1682             //   mSegmentStartTimeUs is non-negative when adapting or switching tracks
1683             // adjustSeqNumberWithAnchorTime(timeUs) == true
1684             //   we guessed a seq number that's either too large or too small.
1685             // If this happens, we'll adjust mSeqNumber and restart fetching from new
1686             // location. Note that we only want to adjust once, so set mSegmentStartTimeUs
1687             // to -1 so that we don't enter this chunk next time.
1688             if (mStartup && mSegmentStartTimeUs >= 0
1689                     && adjustSeqNumberWithAnchorTime(mSegmentFirstPTS)) {
1690                 mStartTimeUsNotify = mNotify->dup();
1691                 mStartTimeUsNotify->setInt32("what", kWhatStartedAt);
1692                 mStartTimeUsNotify->setString("uri", mURI);
1693                 mIDRFound = false;
1694                 mSegmentStartTimeUs = -1;
1695                 return -EAGAIN;
1696             }
1697         }
1698     }
1699 
1700     status_t err = OK;
1701     for (size_t i = mPacketSources.size(); i > 0;) {
1702         i--;
1703         sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
1704 
1705         const LiveSession::StreamType stream = mPacketSources.keyAt(i);
1706         if (stream == LiveSession::STREAMTYPE_SUBTITLES) {
1707             ALOGE("MPEG2 Transport streams do not contain subtitles.");
1708             return ERROR_MALFORMED;
1709         }
1710 
1711         const char *key = LiveSession::getKeyForStream(stream);
1712         ATSParser::SourceType type =LiveSession::getSourceTypeForStream(stream);
1713 
1714         sp<AnotherPacketSource> source =
1715             static_cast<AnotherPacketSource *>(
1716                     mTSParser->getSource(type).get());
1717 
1718         if (source == NULL) {
1719             continue;
1720         }
1721 
1722         const char *mime;
1723         sp<MetaData> format  = source->getFormat();
1724         bool isAvc = format != NULL && format->findCString(kKeyMIMEType, &mime)
1725                 && !strcasecmp(mime, MEDIA_MIMETYPE_VIDEO_AVC);
1726 
1727         sp<ABuffer> accessUnit;
1728         status_t finalResult;
1729         while (source->hasBufferAvailable(&finalResult)
1730                 && source->dequeueAccessUnit(&accessUnit) == OK) {
1731 
1732             int64_t timeUs;
1733             CHECK(accessUnit->meta()->findInt64("timeUs", &timeUs));
1734 
1735             if (mStartup) {
1736                 bool startTimeReached = isStartTimeReached(timeUs);
1737 
1738                 if (!startTimeReached || (isAvc && !mIDRFound)) {
1739                     // buffer up to the closest preceding IDR frame in the next segement,
1740                     // or the closest succeeding IDR frame after the exact position
1741                     FSLOGV(stream, "timeUs(%lld)-mStartTimeUs(%lld)=%lld, mIDRFound=%d",
1742                             (long long)timeUs,
1743                             (long long)mStartTimeUs,
1744                             (long long)timeUs - mStartTimeUs,
1745                             mIDRFound);
1746                     if (isAvc) {
1747                         if (IsIDR(accessUnit)) {
1748                             mVideoBuffer->clear();
1749                             FSLOGV(stream, "found IDR, clear mVideoBuffer");
1750                             mIDRFound = true;
1751                         }
1752                         if (mIDRFound && mStartTimeUsRelative && !startTimeReached) {
1753                             mVideoBuffer->queueAccessUnit(accessUnit);
1754                             FSLOGV(stream, "saving AVC video AccessUnit");
1755                         }
1756                     }
1757                     if (!startTimeReached || (isAvc && !mIDRFound)) {
1758                         continue;
1759                     }
1760                 }
1761             }
1762 
1763             if (mStartTimeUsNotify != NULL) {
1764                 uint32_t streamMask = 0;
1765                 mStartTimeUsNotify->findInt32("streamMask", (int32_t *) &streamMask);
1766                 if ((mStreamTypeMask & mPacketSources.keyAt(i))
1767                         && !(streamMask & mPacketSources.keyAt(i))) {
1768                     streamMask |= mPacketSources.keyAt(i);
1769                     mStartTimeUsNotify->setInt32("streamMask", streamMask);
1770                     FSLOGV(stream, "found start point, timeUs=%lld, streamMask becomes %x",
1771                             (long long)timeUs, streamMask);
1772 
1773                     if (streamMask == mStreamTypeMask) {
1774                         FLOGV("found start point for all streams");
1775                         mStartup = false;
1776                     }
1777                 }
1778             }
1779 
1780             if (mStopParams != NULL) {
1781                 int32_t discontinuitySeq;
1782                 int64_t stopTimeUs;
1783                 if (!mStopParams->findInt32("discontinuitySeq", &discontinuitySeq)
1784                         || discontinuitySeq > mDiscontinuitySeq
1785                         || !mStopParams->findInt64(key, &stopTimeUs)
1786                         || (discontinuitySeq == mDiscontinuitySeq
1787                                 && timeUs >= stopTimeUs)) {
1788                     FSLOGV(stream, "reached stop point, timeUs=%lld", (long long)timeUs);
1789                     mStreamTypeMask &= ~stream;
1790                     mPacketSources.removeItemsAt(i);
1791                     break;
1792                 }
1793             }
1794 
1795             if (stream == LiveSession::STREAMTYPE_VIDEO) {
1796                 const bool discard = true;
1797                 status_t status;
1798                 while (mVideoBuffer->hasBufferAvailable(&status)) {
1799                     sp<ABuffer> videoBuffer;
1800                     mVideoBuffer->dequeueAccessUnit(&videoBuffer);
1801                     setAccessUnitProperties(videoBuffer, source, discard);
1802                     packetSource->queueAccessUnit(videoBuffer);
1803                     int64_t bufferTimeUs;
1804                     CHECK(videoBuffer->meta()->findInt64("timeUs", &bufferTimeUs));
1805                     FSLOGV(stream, "queueAccessUnit (saved), timeUs=%lld",
1806                             (long long)bufferTimeUs);
1807                 }
1808             } else if (stream == LiveSession::STREAMTYPE_METADATA && !mHasMetadata) {
1809                 mHasMetadata = true;
1810                 sp<AMessage> notify = mNotify->dup();
1811                 notify->setInt32("what", kWhatMetadataDetected);
1812                 notify->post();
1813             }
1814 
1815             setAccessUnitProperties(accessUnit, source);
1816             packetSource->queueAccessUnit(accessUnit);
1817             FSLOGV(stream, "queueAccessUnit, timeUs=%lld", (long long)timeUs);
1818         }
1819 
1820         if (err != OK) {
1821             break;
1822         }
1823     }
1824 
1825     if (err != OK) {
1826         for (size_t i = mPacketSources.size(); i > 0;) {
1827             i--;
1828             sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
1829             packetSource->clear();
1830         }
1831         return err;
1832     }
1833 
1834     if (!mStreamTypeMask) {
1835         // Signal gap is filled between original and new stream.
1836         FLOGV("reached stop point for all streams");
1837         return ERROR_OUT_OF_RANGE;
1838     }
1839 
1840     return OK;
1841 }
1842 
1843 /* static */
bufferStartsWithWebVTTMagicSequence(const sp<ABuffer> & buffer)1844 bool PlaylistFetcher::bufferStartsWithWebVTTMagicSequence(
1845         const sp<ABuffer> &buffer) {
1846     size_t pos = 0;
1847 
1848     // skip possible BOM
1849     if (buffer->size() >= pos + 3 &&
1850             !memcmp("\xef\xbb\xbf", buffer->data() + pos, 3)) {
1851         pos += 3;
1852     }
1853 
1854     // accept WEBVTT followed by SPACE, TAB or (CR) LF
1855     if (buffer->size() < pos + 6 ||
1856             memcmp("WEBVTT", buffer->data() + pos, 6)) {
1857         return false;
1858     }
1859     pos += 6;
1860 
1861     if (buffer->size() == pos) {
1862         return true;
1863     }
1864 
1865     uint8_t sep = buffer->data()[pos];
1866     return sep == ' ' || sep == '\t' || sep == '\n' || sep == '\r';
1867 }
1868 
extractAndQueueAccessUnits(const sp<ABuffer> & buffer,const sp<AMessage> & itemMeta)1869 status_t PlaylistFetcher::extractAndQueueAccessUnits(
1870         const sp<ABuffer> &buffer, const sp<AMessage> &itemMeta) {
1871     if (bufferStartsWithWebVTTMagicSequence(buffer)) {
1872         if (mStreamTypeMask != LiveSession::STREAMTYPE_SUBTITLES) {
1873             ALOGE("This stream only contains subtitles.");
1874             return ERROR_MALFORMED;
1875         }
1876 
1877         const sp<AnotherPacketSource> packetSource =
1878             mPacketSources.valueFor(LiveSession::STREAMTYPE_SUBTITLES);
1879 
1880         int64_t durationUs;
1881         CHECK(itemMeta->findInt64("durationUs", &durationUs));
1882         buffer->meta()->setInt64("timeUs", getSegmentStartTimeUs(mSeqNumber));
1883         buffer->meta()->setInt64("durationUs", durationUs);
1884         buffer->meta()->setInt64("segmentStartTimeUs", getSegmentStartTimeUs(mSeqNumber));
1885         buffer->meta()->setInt32("discontinuitySeq", mDiscontinuitySeq);
1886         buffer->meta()->setInt32("subtitleGeneration", mSubtitleGeneration);
1887         packetSource->queueAccessUnit(buffer);
1888         return OK;
1889     }
1890 
1891     if (mNextPTSTimeUs >= 0ll) {
1892         mNextPTSTimeUs = -1ll;
1893     }
1894 
1895     // This better be an ISO 13818-7 (AAC) or ISO 13818-1 (MPEG) audio
1896     // stream prefixed by an ID3 tag.
1897 
1898     bool firstID3Tag = true;
1899     uint64_t PTS = 0;
1900 
1901     for (;;) {
1902         // Make sure to skip all ID3 tags preceding the audio data.
1903         // At least one must be present to provide the PTS timestamp.
1904 
1905         ID3 id3(buffer->data(), buffer->size(), true /* ignoreV1 */);
1906         if (!id3.isValid()) {
1907             if (firstID3Tag) {
1908                 ALOGE("Unable to parse ID3 tag.");
1909                 return ERROR_MALFORMED;
1910             } else {
1911                 break;
1912             }
1913         }
1914 
1915         if (firstID3Tag) {
1916             bool found = false;
1917 
1918             ID3::Iterator it(id3, "PRIV");
1919             while (!it.done()) {
1920                 size_t length;
1921                 const uint8_t *data = it.getData(&length);
1922                 if (!data) {
1923                     return ERROR_MALFORMED;
1924                 }
1925 
1926                 static const char *kMatchName =
1927                     "com.apple.streaming.transportStreamTimestamp";
1928                 static const size_t kMatchNameLen = strlen(kMatchName);
1929 
1930                 if (length == kMatchNameLen + 1 + 8
1931                         && !strncmp((const char *)data, kMatchName, kMatchNameLen)) {
1932                     found = true;
1933                     PTS = U64_AT(&data[kMatchNameLen + 1]);
1934                 }
1935 
1936                 it.next();
1937             }
1938 
1939             if (!found) {
1940                 ALOGE("Unable to extract transportStreamTimestamp from ID3 tag.");
1941                 return ERROR_MALFORMED;
1942             }
1943         }
1944 
1945         // skip the ID3 tag
1946         buffer->setRange(
1947                 buffer->offset() + id3.rawSize(), buffer->size() - id3.rawSize());
1948 
1949         firstID3Tag = false;
1950     }
1951 
1952     if (mStreamTypeMask != LiveSession::STREAMTYPE_AUDIO) {
1953         ALOGW("This stream only contains audio data!");
1954 
1955         mStreamTypeMask &= LiveSession::STREAMTYPE_AUDIO;
1956 
1957         if (mStreamTypeMask == 0) {
1958             return OK;
1959         }
1960     }
1961 
1962     sp<AnotherPacketSource> packetSource =
1963         mPacketSources.valueFor(LiveSession::STREAMTYPE_AUDIO);
1964 
1965     if (packetSource->getFormat() == NULL && buffer->size() >= 7) {
1966         ABitReader bits(buffer->data(), buffer->size());
1967 
1968         // adts_fixed_header
1969 
1970         CHECK_EQ(bits.getBits(12), 0xfffu);
1971         bits.skipBits(3);  // ID, layer
1972         bool protection_absent __unused = bits.getBits(1) != 0;
1973 
1974         unsigned profile = bits.getBits(2);
1975         CHECK_NE(profile, 3u);
1976         unsigned sampling_freq_index = bits.getBits(4);
1977         bits.getBits(1);  // private_bit
1978         unsigned channel_configuration = bits.getBits(3);
1979         CHECK_NE(channel_configuration, 0u);
1980         bits.skipBits(2);  // original_copy, home
1981 
1982         sp<MetaData> meta = MakeAACCodecSpecificData(
1983                 profile, sampling_freq_index, channel_configuration);
1984 
1985         meta->setInt32(kKeyIsADTS, true);
1986 
1987         packetSource->setFormat(meta);
1988     }
1989 
1990     int64_t numSamples = 0ll;
1991     int32_t sampleRate;
1992     CHECK(packetSource->getFormat()->findInt32(kKeySampleRate, &sampleRate));
1993 
1994     int64_t timeUs = (PTS * 100ll) / 9ll;
1995     if (mStartup && !mFirstPTSValid) {
1996         mFirstPTSValid = true;
1997         mFirstTimeUs = timeUs;
1998     }
1999 
2000     if (mSegmentFirstPTS < 0ll) {
2001         mSegmentFirstPTS = timeUs;
2002         if (!mStartTimeUsRelative) {
2003             // Duplicated logic from how we handle .ts playlists.
2004             if (mStartup && mSegmentStartTimeUs >= 0
2005                     && adjustSeqNumberWithAnchorTime(timeUs)) {
2006                 mSegmentStartTimeUs = -1;
2007                 return -EAGAIN;
2008             }
2009         }
2010     }
2011 
2012     size_t offset = 0;
2013     while (offset < buffer->size()) {
2014         const uint8_t *adtsHeader = buffer->data() + offset;
2015         CHECK_LT(offset + 5, buffer->size());
2016 
2017         unsigned aac_frame_length =
2018             ((adtsHeader[3] & 3) << 11)
2019             | (adtsHeader[4] << 3)
2020             | (adtsHeader[5] >> 5);
2021 
2022         if (aac_frame_length == 0) {
2023             const uint8_t *id3Header = adtsHeader;
2024             if (!memcmp(id3Header, "ID3", 3)) {
2025                 ID3 id3(id3Header, buffer->size() - offset, true);
2026                 if (id3.isValid()) {
2027                     offset += id3.rawSize();
2028                     continue;
2029                 };
2030             }
2031             return ERROR_MALFORMED;
2032         }
2033 
2034         CHECK_LE(offset + aac_frame_length, buffer->size());
2035 
2036         int64_t unitTimeUs = timeUs + numSamples * 1000000ll / sampleRate;
2037         offset += aac_frame_length;
2038 
2039         // Each AAC frame encodes 1024 samples.
2040         numSamples += 1024;
2041 
2042         if (mStartup) {
2043             int64_t startTimeUs = unitTimeUs;
2044             if (mStartTimeUsRelative) {
2045                 startTimeUs -= mFirstTimeUs;
2046                 if (startTimeUs  < 0) {
2047                     startTimeUs = 0;
2048                 }
2049             }
2050             if (startTimeUs < mStartTimeUs) {
2051                 continue;
2052             }
2053 
2054             if (mStartTimeUsNotify != NULL) {
2055                 mStartTimeUsNotify->setInt32("streamMask", LiveSession::STREAMTYPE_AUDIO);
2056                 mStartup = false;
2057             }
2058         }
2059 
2060         if (mStopParams != NULL) {
2061             int32_t discontinuitySeq;
2062             int64_t stopTimeUs;
2063             if (!mStopParams->findInt32("discontinuitySeq", &discontinuitySeq)
2064                     || discontinuitySeq > mDiscontinuitySeq
2065                     || !mStopParams->findInt64("timeUsAudio", &stopTimeUs)
2066                     || (discontinuitySeq == mDiscontinuitySeq && unitTimeUs >= stopTimeUs)) {
2067                 mStreamTypeMask = 0;
2068                 mPacketSources.clear();
2069                 return ERROR_OUT_OF_RANGE;
2070             }
2071         }
2072 
2073         sp<ABuffer> unit = new ABuffer(aac_frame_length);
2074         memcpy(unit->data(), adtsHeader, aac_frame_length);
2075 
2076         unit->meta()->setInt64("timeUs", unitTimeUs);
2077         setAccessUnitProperties(unit, packetSource);
2078         packetSource->queueAccessUnit(unit);
2079     }
2080 
2081     return OK;
2082 }
2083 
updateDuration()2084 void PlaylistFetcher::updateDuration() {
2085     int64_t durationUs = 0ll;
2086     for (size_t index = 0; index < mPlaylist->size(); ++index) {
2087         sp<AMessage> itemMeta;
2088         CHECK(mPlaylist->itemAt(
2089                     index, NULL /* uri */, &itemMeta));
2090 
2091         int64_t itemDurationUs;
2092         CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
2093 
2094         durationUs += itemDurationUs;
2095     }
2096 
2097     sp<AMessage> msg = mNotify->dup();
2098     msg->setInt32("what", kWhatDurationUpdate);
2099     msg->setInt64("durationUs", durationUs);
2100     msg->post();
2101 }
2102 
updateTargetDuration()2103 void PlaylistFetcher::updateTargetDuration() {
2104     sp<AMessage> msg = mNotify->dup();
2105     msg->setInt32("what", kWhatTargetDurationUpdate);
2106     msg->setInt64("targetDurationUs", mPlaylist->getTargetDuration());
2107     msg->post();
2108 }
2109 
2110 }  // namespace android
2111