• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2010 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 //#define LOG_NDEBUG 0
18 #define LOG_TAG "LiveSession"
19 #include <utils/Log.h>
20 
21 #include "include/LiveSession.h"
22 
23 #include "LiveDataSource.h"
24 
25 #include "include/M3UParser.h"
26 #include "include/HTTPBase.h"
27 
28 #include <cutils/properties.h>
29 #include <media/stagefright/foundation/hexdump.h>
30 #include <media/stagefright/foundation/ABuffer.h>
31 #include <media/stagefright/foundation/ADebug.h>
32 #include <media/stagefright/foundation/AMessage.h>
33 #include <media/stagefright/DataSource.h>
34 #include <media/stagefright/FileSource.h>
35 #include <media/stagefright/MediaErrors.h>
36 
37 #include <ctype.h>
38 #include <openssl/aes.h>
39 #include <openssl/md5.h>
40 
41 namespace android {
42 
LiveSession(const sp<AMessage> & notify,uint32_t flags,bool uidValid,uid_t uid)43 LiveSession::LiveSession(
44         const sp<AMessage> &notify, uint32_t flags, bool uidValid, uid_t uid)
45     : mNotify(notify),
46       mFlags(flags),
47       mUIDValid(uidValid),
48       mUID(uid),
49       mInPreparationPhase(true),
50       mDataSource(new LiveDataSource),
51       mHTTPDataSource(
52               HTTPBase::Create(
53                   (mFlags & kFlagIncognito)
54                     ? HTTPBase::kFlagIncognito
55                     : 0)),
56       mPrevBandwidthIndex(-1),
57       mLastPlaylistFetchTimeUs(-1),
58       mSeqNumber(-1),
59       mSeekTimeUs(-1),
60       mNumRetries(0),
61       mStartOfPlayback(true),
62       mDurationUs(-1),
63       mDurationFixed(false),
64       mSeekDone(false),
65       mDisconnectPending(false),
66       mMonitorQueueGeneration(0),
67       mRefreshState(INITIAL_MINIMUM_RELOAD_DELAY) {
68     if (mUIDValid) {
69         mHTTPDataSource->setUID(mUID);
70     }
71 }
72 
~LiveSession()73 LiveSession::~LiveSession() {
74 }
75 
getDataSource()76 sp<DataSource> LiveSession::getDataSource() {
77     return mDataSource;
78 }
79 
connect(const char * url,const KeyedVector<String8,String8> * headers)80 void LiveSession::connect(
81         const char *url, const KeyedVector<String8, String8> *headers) {
82     sp<AMessage> msg = new AMessage(kWhatConnect, id());
83     msg->setString("url", url);
84 
85     if (headers != NULL) {
86         msg->setPointer(
87                 "headers",
88                 new KeyedVector<String8, String8>(*headers));
89     }
90 
91     msg->post();
92 }
93 
disconnect()94 void LiveSession::disconnect() {
95     Mutex::Autolock autoLock(mLock);
96     mDisconnectPending = true;
97 
98     mHTTPDataSource->disconnect();
99 
100     (new AMessage(kWhatDisconnect, id()))->post();
101 }
102 
seekTo(int64_t timeUs)103 void LiveSession::seekTo(int64_t timeUs) {
104     Mutex::Autolock autoLock(mLock);
105     mSeekDone = false;
106 
107     sp<AMessage> msg = new AMessage(kWhatSeek, id());
108     msg->setInt64("timeUs", timeUs);
109     msg->post();
110 
111     while (!mSeekDone) {
112         mCondition.wait(mLock);
113     }
114 }
115 
onMessageReceived(const sp<AMessage> & msg)116 void LiveSession::onMessageReceived(const sp<AMessage> &msg) {
117     switch (msg->what()) {
118         case kWhatConnect:
119             onConnect(msg);
120             break;
121 
122         case kWhatDisconnect:
123             onDisconnect();
124             break;
125 
126         case kWhatMonitorQueue:
127         {
128             int32_t generation;
129             CHECK(msg->findInt32("generation", &generation));
130 
131             if (generation != mMonitorQueueGeneration) {
132                 // Stale event
133                 break;
134             }
135 
136             onMonitorQueue();
137             break;
138         }
139 
140         case kWhatSeek:
141             onSeek(msg);
142             break;
143 
144         default:
145             TRESPASS();
146             break;
147     }
148 }
149 
150 // static
SortByBandwidth(const BandwidthItem * a,const BandwidthItem * b)151 int LiveSession::SortByBandwidth(const BandwidthItem *a, const BandwidthItem *b) {
152     if (a->mBandwidth < b->mBandwidth) {
153         return -1;
154     } else if (a->mBandwidth == b->mBandwidth) {
155         return 0;
156     }
157 
158     return 1;
159 }
160 
onConnect(const sp<AMessage> & msg)161 void LiveSession::onConnect(const sp<AMessage> &msg) {
162     AString url;
163     CHECK(msg->findString("url", &url));
164 
165     KeyedVector<String8, String8> *headers = NULL;
166     if (!msg->findPointer("headers", (void **)&headers)) {
167         mExtraHeaders.clear();
168     } else {
169         mExtraHeaders = *headers;
170 
171         delete headers;
172         headers = NULL;
173     }
174 
175     ALOGI("onConnect <URL suppressed>");
176 
177     mMasterURL = url;
178 
179     bool dummy;
180     sp<M3UParser> playlist = fetchPlaylist(url.c_str(), &dummy);
181 
182     if (playlist == NULL) {
183         ALOGE("unable to fetch master playlist '%s'.", url.c_str());
184 
185         signalEOS(ERROR_IO);
186         return;
187     }
188 
189     if (playlist->isVariantPlaylist()) {
190         for (size_t i = 0; i < playlist->size(); ++i) {
191             BandwidthItem item;
192 
193             sp<AMessage> meta;
194             playlist->itemAt(i, &item.mURI, &meta);
195 
196             unsigned long bandwidth;
197             CHECK(meta->findInt32("bandwidth", (int32_t *)&item.mBandwidth));
198 
199             mBandwidthItems.push(item);
200         }
201 
202         CHECK_GT(mBandwidthItems.size(), 0u);
203 
204         mBandwidthItems.sort(SortByBandwidth);
205     }
206 
207     postMonitorQueue();
208 }
209 
onDisconnect()210 void LiveSession::onDisconnect() {
211     ALOGI("onDisconnect");
212 
213     signalEOS(ERROR_END_OF_STREAM);
214 
215     Mutex::Autolock autoLock(mLock);
216     mDisconnectPending = false;
217 }
218 
fetchFile(const char * url,sp<ABuffer> * out,int64_t range_offset,int64_t range_length)219 status_t LiveSession::fetchFile(
220         const char *url, sp<ABuffer> *out,
221         int64_t range_offset, int64_t range_length) {
222     *out = NULL;
223 
224     sp<DataSource> source;
225 
226     if (!strncasecmp(url, "file://", 7)) {
227         source = new FileSource(url + 7);
228     } else if (strncasecmp(url, "http://", 7)
229             && strncasecmp(url, "https://", 8)) {
230         return ERROR_UNSUPPORTED;
231     } else {
232         {
233             Mutex::Autolock autoLock(mLock);
234 
235             if (mDisconnectPending) {
236                 return ERROR_IO;
237             }
238         }
239 
240         KeyedVector<String8, String8> headers = mExtraHeaders;
241         if (range_offset > 0 || range_length >= 0) {
242             headers.add(
243                     String8("Range"),
244                     String8(
245                         StringPrintf(
246                             "bytes=%lld-%s",
247                             range_offset,
248                             range_length < 0
249                                 ? "" : StringPrintf("%lld", range_offset + range_length - 1).c_str()).c_str()));
250         }
251         status_t err = mHTTPDataSource->connect(url, &headers);
252 
253         if (err != OK) {
254             return err;
255         }
256 
257         source = mHTTPDataSource;
258     }
259 
260     off64_t size;
261     status_t err = source->getSize(&size);
262 
263     if (err != OK) {
264         size = 65536;
265     }
266 
267     sp<ABuffer> buffer = new ABuffer(size);
268     buffer->setRange(0, 0);
269 
270     for (;;) {
271         size_t bufferRemaining = buffer->capacity() - buffer->size();
272 
273         if (bufferRemaining == 0) {
274             bufferRemaining = 32768;
275 
276             ALOGV("increasing download buffer to %d bytes",
277                  buffer->size() + bufferRemaining);
278 
279             sp<ABuffer> copy = new ABuffer(buffer->size() + bufferRemaining);
280             memcpy(copy->data(), buffer->data(), buffer->size());
281             copy->setRange(0, buffer->size());
282 
283             buffer = copy;
284         }
285 
286         size_t maxBytesToRead = bufferRemaining;
287         if (range_length >= 0) {
288             int64_t bytesLeftInRange = range_length - buffer->size();
289             if (bytesLeftInRange < maxBytesToRead) {
290                 maxBytesToRead = bytesLeftInRange;
291 
292                 if (bytesLeftInRange == 0) {
293                     break;
294                 }
295             }
296         }
297 
298         ssize_t n = source->readAt(
299                 buffer->size(), buffer->data() + buffer->size(),
300                 maxBytesToRead);
301 
302         if (n < 0) {
303             return n;
304         }
305 
306         if (n == 0) {
307             break;
308         }
309 
310         buffer->setRange(0, buffer->size() + (size_t)n);
311     }
312 
313     *out = buffer;
314 
315     return OK;
316 }
317 
fetchPlaylist(const char * url,bool * unchanged)318 sp<M3UParser> LiveSession::fetchPlaylist(const char *url, bool *unchanged) {
319     ALOGV("fetchPlaylist '%s'", url);
320 
321     *unchanged = false;
322 
323     sp<ABuffer> buffer;
324     status_t err = fetchFile(url, &buffer);
325 
326     if (err != OK) {
327         return NULL;
328     }
329 
330     // MD5 functionality is not available on the simulator, treat all
331     // playlists as changed.
332 
333 #if defined(HAVE_ANDROID_OS)
334     uint8_t hash[16];
335 
336     MD5_CTX m;
337     MD5_Init(&m);
338     MD5_Update(&m, buffer->data(), buffer->size());
339 
340     MD5_Final(hash, &m);
341 
342     if (mPlaylist != NULL && !memcmp(hash, mPlaylistHash, 16)) {
343         // playlist unchanged
344 
345         if (mRefreshState != THIRD_UNCHANGED_RELOAD_ATTEMPT) {
346             mRefreshState = (RefreshState)(mRefreshState + 1);
347         }
348 
349         *unchanged = true;
350 
351         ALOGV("Playlist unchanged, refresh state is now %d",
352              (int)mRefreshState);
353 
354         return NULL;
355     }
356 
357     memcpy(mPlaylistHash, hash, sizeof(hash));
358 
359     mRefreshState = INITIAL_MINIMUM_RELOAD_DELAY;
360 #endif
361 
362     sp<M3UParser> playlist =
363         new M3UParser(url, buffer->data(), buffer->size());
364 
365     if (playlist->initCheck() != OK) {
366         ALOGE("failed to parse .m3u8 playlist");
367 
368         return NULL;
369     }
370 
371     return playlist;
372 }
373 
getSegmentStartTimeUs(int32_t seqNumber) const374 int64_t LiveSession::getSegmentStartTimeUs(int32_t seqNumber) const {
375     CHECK(mPlaylist != NULL);
376 
377     int32_t firstSeqNumberInPlaylist;
378     if (mPlaylist->meta() == NULL || !mPlaylist->meta()->findInt32(
379                 "media-sequence", &firstSeqNumberInPlaylist)) {
380         firstSeqNumberInPlaylist = 0;
381     }
382 
383     int32_t lastSeqNumberInPlaylist =
384         firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1;
385 
386     CHECK_GE(seqNumber, firstSeqNumberInPlaylist);
387     CHECK_LE(seqNumber, lastSeqNumberInPlaylist);
388 
389     int64_t segmentStartUs = 0ll;
390     for (int32_t index = 0;
391             index < seqNumber - firstSeqNumberInPlaylist; ++index) {
392         sp<AMessage> itemMeta;
393         CHECK(mPlaylist->itemAt(
394                     index, NULL /* uri */, &itemMeta));
395 
396         int64_t itemDurationUs;
397         CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
398 
399         segmentStartUs += itemDurationUs;
400     }
401 
402     return segmentStartUs;
403 }
404 
uniformRand()405 static double uniformRand() {
406     return (double)rand() / RAND_MAX;
407 }
408 
getBandwidthIndex()409 size_t LiveSession::getBandwidthIndex() {
410     if (mBandwidthItems.size() == 0) {
411         return 0;
412     }
413 
414 #if 1
415     int32_t bandwidthBps;
416     if (mHTTPDataSource != NULL
417             && mHTTPDataSource->estimateBandwidth(&bandwidthBps)) {
418         ALOGV("bandwidth estimated at %.2f kbps", bandwidthBps / 1024.0f);
419     } else {
420         ALOGV("no bandwidth estimate.");
421         return 0;  // Pick the lowest bandwidth stream by default.
422     }
423 
424     char value[PROPERTY_VALUE_MAX];
425     if (property_get("media.httplive.max-bw", value, NULL)) {
426         char *end;
427         long maxBw = strtoul(value, &end, 10);
428         if (end > value && *end == '\0') {
429             if (maxBw > 0 && bandwidthBps > maxBw) {
430                 ALOGV("bandwidth capped to %ld bps", maxBw);
431                 bandwidthBps = maxBw;
432             }
433         }
434     }
435 
436     // Consider only 80% of the available bandwidth usable.
437     bandwidthBps = (bandwidthBps * 8) / 10;
438 
439     // Pick the highest bandwidth stream below or equal to estimated bandwidth.
440 
441     size_t index = mBandwidthItems.size() - 1;
442     while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth
443                             > (size_t)bandwidthBps) {
444         --index;
445     }
446 #elif 0
447     // Change bandwidth at random()
448     size_t index = uniformRand() * mBandwidthItems.size();
449 #elif 0
450     // There's a 50% chance to stay on the current bandwidth and
451     // a 50% chance to switch to the next higher bandwidth (wrapping around
452     // to lowest)
453     const size_t kMinIndex = 0;
454 
455     size_t index;
456     if (mPrevBandwidthIndex < 0) {
457         index = kMinIndex;
458     } else if (uniformRand() < 0.5) {
459         index = (size_t)mPrevBandwidthIndex;
460     } else {
461         index = mPrevBandwidthIndex + 1;
462         if (index == mBandwidthItems.size()) {
463             index = kMinIndex;
464         }
465     }
466 #elif 0
467     // Pick the highest bandwidth stream below or equal to 1.2 Mbit/sec
468 
469     size_t index = mBandwidthItems.size() - 1;
470     while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth > 1200000) {
471         --index;
472     }
473 #else
474     size_t index = mBandwidthItems.size() - 1;  // Highest bandwidth stream
475 #endif
476 
477     return index;
478 }
479 
timeToRefreshPlaylist(int64_t nowUs) const480 bool LiveSession::timeToRefreshPlaylist(int64_t nowUs) const {
481     if (mPlaylist == NULL) {
482         CHECK_EQ((int)mRefreshState, (int)INITIAL_MINIMUM_RELOAD_DELAY);
483         return true;
484     }
485 
486     int32_t targetDurationSecs;
487     CHECK(mPlaylist->meta()->findInt32("target-duration", &targetDurationSecs));
488 
489     int64_t targetDurationUs = targetDurationSecs * 1000000ll;
490 
491     int64_t minPlaylistAgeUs;
492 
493     switch (mRefreshState) {
494         case INITIAL_MINIMUM_RELOAD_DELAY:
495         {
496             size_t n = mPlaylist->size();
497             if (n > 0) {
498                 sp<AMessage> itemMeta;
499                 CHECK(mPlaylist->itemAt(n - 1, NULL /* uri */, &itemMeta));
500 
501                 int64_t itemDurationUs;
502                 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
503 
504                 minPlaylistAgeUs = itemDurationUs;
505                 break;
506             }
507 
508             // fall through
509         }
510 
511         case FIRST_UNCHANGED_RELOAD_ATTEMPT:
512         {
513             minPlaylistAgeUs = targetDurationUs / 2;
514             break;
515         }
516 
517         case SECOND_UNCHANGED_RELOAD_ATTEMPT:
518         {
519             minPlaylistAgeUs = (targetDurationUs * 3) / 2;
520             break;
521         }
522 
523         case THIRD_UNCHANGED_RELOAD_ATTEMPT:
524         {
525             minPlaylistAgeUs = targetDurationUs * 3;
526             break;
527         }
528 
529         default:
530             TRESPASS();
531             break;
532     }
533 
534     return mLastPlaylistFetchTimeUs + minPlaylistAgeUs <= nowUs;
535 }
536 
onDownloadNext()537 void LiveSession::onDownloadNext() {
538     size_t bandwidthIndex = getBandwidthIndex();
539 
540 rinse_repeat:
541     int64_t nowUs = ALooper::GetNowUs();
542 
543     if (mLastPlaylistFetchTimeUs < 0
544             || (ssize_t)bandwidthIndex != mPrevBandwidthIndex
545             || (!mPlaylist->isComplete() && timeToRefreshPlaylist(nowUs))) {
546         AString url;
547         if (mBandwidthItems.size() > 0) {
548             url = mBandwidthItems.editItemAt(bandwidthIndex).mURI;
549         } else {
550             url = mMasterURL;
551         }
552 
553         if ((ssize_t)bandwidthIndex != mPrevBandwidthIndex) {
554             // If we switch bandwidths, do not pay any heed to whether
555             // playlists changed since the last time...
556             mPlaylist.clear();
557         }
558 
559         bool unchanged;
560         sp<M3UParser> playlist = fetchPlaylist(url.c_str(), &unchanged);
561         if (playlist == NULL) {
562             if (unchanged) {
563                 // We succeeded in fetching the playlist, but it was
564                 // unchanged from the last time we tried.
565             } else {
566                 ALOGE("failed to load playlist at url '%s'", url.c_str());
567                 signalEOS(ERROR_IO);
568 
569                 return;
570             }
571         } else {
572             mPlaylist = playlist;
573         }
574 
575         if (!mDurationFixed) {
576             Mutex::Autolock autoLock(mLock);
577 
578             if (!mPlaylist->isComplete() && !mPlaylist->isEvent()) {
579                 mDurationUs = -1;
580                 mDurationFixed = true;
581             } else {
582                 mDurationUs = 0;
583                 for (size_t i = 0; i < mPlaylist->size(); ++i) {
584                     sp<AMessage> itemMeta;
585                     CHECK(mPlaylist->itemAt(
586                                 i, NULL /* uri */, &itemMeta));
587 
588                     int64_t itemDurationUs;
589                     CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
590 
591                     mDurationUs += itemDurationUs;
592                 }
593 
594                 mDurationFixed = mPlaylist->isComplete();
595             }
596         }
597 
598         mLastPlaylistFetchTimeUs = ALooper::GetNowUs();
599     }
600 
601     int32_t firstSeqNumberInPlaylist;
602     if (mPlaylist->meta() == NULL || !mPlaylist->meta()->findInt32(
603                 "media-sequence", &firstSeqNumberInPlaylist)) {
604         firstSeqNumberInPlaylist = 0;
605     }
606 
607     bool seekDiscontinuity = false;
608     bool explicitDiscontinuity = false;
609     bool bandwidthChanged = false;
610 
611     if (mSeekTimeUs >= 0) {
612         if (mPlaylist->isComplete() || mPlaylist->isEvent()) {
613             size_t index = 0;
614             int64_t segmentStartUs = 0;
615             while (index < mPlaylist->size()) {
616                 sp<AMessage> itemMeta;
617                 CHECK(mPlaylist->itemAt(
618                             index, NULL /* uri */, &itemMeta));
619 
620                 int64_t itemDurationUs;
621                 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
622 
623                 if (mSeekTimeUs < segmentStartUs + itemDurationUs) {
624                     break;
625                 }
626 
627                 segmentStartUs += itemDurationUs;
628                 ++index;
629             }
630 
631             if (index < mPlaylist->size()) {
632                 int32_t newSeqNumber = firstSeqNumberInPlaylist + index;
633 
634                 ALOGI("seeking to seq no %d", newSeqNumber);
635 
636                 mSeqNumber = newSeqNumber;
637 
638                 mDataSource->reset();
639 
640                 // reseting the data source will have had the
641                 // side effect of discarding any previously queued
642                 // bandwidth change discontinuity.
643                 // Therefore we'll need to treat these seek
644                 // discontinuities as involving a bandwidth change
645                 // even if they aren't directly.
646                 seekDiscontinuity = true;
647                 bandwidthChanged = true;
648             }
649         }
650 
651         mSeekTimeUs = -1;
652 
653         Mutex::Autolock autoLock(mLock);
654         mSeekDone = true;
655         mCondition.broadcast();
656     }
657 
658     const int32_t lastSeqNumberInPlaylist =
659         firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1;
660 
661     if (mSeqNumber < 0) {
662         if (mPlaylist->isComplete()) {
663             mSeqNumber = firstSeqNumberInPlaylist;
664         } else {
665             // If this is a live session, start 3 segments from the end.
666             mSeqNumber = lastSeqNumberInPlaylist - 3;
667             if (mSeqNumber < firstSeqNumberInPlaylist) {
668                 mSeqNumber = firstSeqNumberInPlaylist;
669             }
670         }
671     }
672 
673     if (mSeqNumber < firstSeqNumberInPlaylist
674             || mSeqNumber > lastSeqNumberInPlaylist) {
675         if (mPrevBandwidthIndex != (ssize_t)bandwidthIndex) {
676             // Go back to the previous bandwidth.
677 
678             ALOGI("new bandwidth does not have the sequence number "
679                  "we're looking for, switching back to previous bandwidth");
680 
681             mLastPlaylistFetchTimeUs = -1;
682             bandwidthIndex = mPrevBandwidthIndex;
683             goto rinse_repeat;
684         }
685 
686         if (!mPlaylist->isComplete() && mNumRetries < kMaxNumRetries) {
687             ++mNumRetries;
688 
689             if (mSeqNumber > lastSeqNumberInPlaylist) {
690                 mLastPlaylistFetchTimeUs = -1;
691                 postMonitorQueue(3000000ll);
692                 return;
693             }
694 
695             // we've missed the boat, let's start from the lowest sequence
696             // number available and signal a discontinuity.
697 
698             ALOGI("We've missed the boat, restarting playback.");
699             mSeqNumber = lastSeqNumberInPlaylist;
700             explicitDiscontinuity = true;
701 
702             // fall through
703         } else {
704             ALOGE("Cannot find sequence number %d in playlist "
705                  "(contains %d - %d)",
706                  mSeqNumber, firstSeqNumberInPlaylist,
707                  firstSeqNumberInPlaylist + mPlaylist->size() - 1);
708 
709             signalEOS(ERROR_END_OF_STREAM);
710             return;
711         }
712     }
713 
714     mNumRetries = 0;
715 
716     AString uri;
717     sp<AMessage> itemMeta;
718     CHECK(mPlaylist->itemAt(
719                 mSeqNumber - firstSeqNumberInPlaylist,
720                 &uri,
721                 &itemMeta));
722 
723     int32_t val;
724     if (itemMeta->findInt32("discontinuity", &val) && val != 0) {
725         explicitDiscontinuity = true;
726     }
727 
728     int64_t range_offset, range_length;
729     if (!itemMeta->findInt64("range-offset", &range_offset)
730             || !itemMeta->findInt64("range-length", &range_length)) {
731         range_offset = 0;
732         range_length = -1;
733     }
734 
735     ALOGV("fetching segment %d from (%d .. %d)",
736           mSeqNumber, firstSeqNumberInPlaylist, lastSeqNumberInPlaylist);
737 
738     sp<ABuffer> buffer;
739     status_t err = fetchFile(uri.c_str(), &buffer, range_offset, range_length);
740     if (err != OK) {
741         ALOGE("failed to fetch .ts segment at url '%s'", uri.c_str());
742         signalEOS(err);
743         return;
744     }
745 
746     CHECK(buffer != NULL);
747 
748     err = decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, buffer);
749 
750     if (err != OK) {
751         ALOGE("decryptBuffer failed w/ error %d", err);
752 
753         signalEOS(err);
754         return;
755     }
756 
757     if (buffer->size() == 0 || buffer->data()[0] != 0x47) {
758         // Not a transport stream???
759 
760         ALOGE("This doesn't look like a transport stream...");
761 
762         mBandwidthItems.removeAt(bandwidthIndex);
763 
764         if (mBandwidthItems.isEmpty()) {
765             signalEOS(ERROR_UNSUPPORTED);
766             return;
767         }
768 
769         ALOGI("Retrying with a different bandwidth stream.");
770 
771         mLastPlaylistFetchTimeUs = -1;
772         bandwidthIndex = getBandwidthIndex();
773         mPrevBandwidthIndex = bandwidthIndex;
774         mSeqNumber = -1;
775 
776         goto rinse_repeat;
777     }
778 
779     if ((size_t)mPrevBandwidthIndex != bandwidthIndex) {
780         bandwidthChanged = true;
781     }
782 
783     if (mPrevBandwidthIndex < 0) {
784         // Don't signal a bandwidth change at the very beginning of
785         // playback.
786         bandwidthChanged = false;
787     }
788 
789     if (mStartOfPlayback) {
790         seekDiscontinuity = true;
791         mStartOfPlayback = false;
792     }
793 
794     if (seekDiscontinuity || explicitDiscontinuity || bandwidthChanged) {
795         // Signal discontinuity.
796 
797         ALOGI("queueing discontinuity (seek=%d, explicit=%d, bandwidthChanged=%d)",
798              seekDiscontinuity, explicitDiscontinuity, bandwidthChanged);
799 
800         sp<ABuffer> tmp = new ABuffer(188);
801         memset(tmp->data(), 0, tmp->size());
802 
803         // signal a 'hard' discontinuity for explicit or bandwidthChanged.
804         uint8_t type = (explicitDiscontinuity || bandwidthChanged) ? 1 : 0;
805 
806         if (mPlaylist->isComplete() || mPlaylist->isEvent()) {
807             // If this was a live event this made no sense since
808             // we don't have access to all the segment before the current
809             // one.
810             int64_t segmentStartTimeUs = getSegmentStartTimeUs(mSeqNumber);
811             memcpy(tmp->data() + 2, &segmentStartTimeUs, sizeof(segmentStartTimeUs));
812 
813             type |= 2;
814         }
815 
816         tmp->data()[1] = type;
817 
818         mDataSource->queueBuffer(tmp);
819     }
820 
821     mDataSource->queueBuffer(buffer);
822 
823     mPrevBandwidthIndex = bandwidthIndex;
824     ++mSeqNumber;
825 
826     postMonitorQueue();
827 }
828 
signalEOS(status_t err)829 void LiveSession::signalEOS(status_t err) {
830     if (mInPreparationPhase && mNotify != NULL) {
831         sp<AMessage> notify = mNotify->dup();
832 
833         notify->setInt32(
834                 "what",
835                 err == ERROR_END_OF_STREAM
836                     ? kWhatPrepared : kWhatPreparationFailed);
837 
838         if (err != ERROR_END_OF_STREAM) {
839             notify->setInt32("err", err);
840         }
841 
842         notify->post();
843 
844         mInPreparationPhase = false;
845     }
846 
847     mDataSource->queueEOS(err);
848 }
849 
onMonitorQueue()850 void LiveSession::onMonitorQueue() {
851     if (mSeekTimeUs >= 0
852             || mDataSource->countQueuedBuffers() < kMaxNumQueuedFragments) {
853         onDownloadNext();
854     } else {
855         if (mInPreparationPhase) {
856             if (mNotify != NULL) {
857                 sp<AMessage> notify = mNotify->dup();
858                 notify->setInt32("what", kWhatPrepared);
859                 notify->post();
860             }
861 
862             mInPreparationPhase = false;
863         }
864 
865         postMonitorQueue(1000000ll);
866     }
867 }
868 
decryptBuffer(size_t playlistIndex,const sp<ABuffer> & buffer)869 status_t LiveSession::decryptBuffer(
870         size_t playlistIndex, const sp<ABuffer> &buffer) {
871     sp<AMessage> itemMeta;
872     bool found = false;
873     AString method;
874 
875     for (ssize_t i = playlistIndex; i >= 0; --i) {
876         AString uri;
877         CHECK(mPlaylist->itemAt(i, &uri, &itemMeta));
878 
879         if (itemMeta->findString("cipher-method", &method)) {
880             found = true;
881             break;
882         }
883     }
884 
885     if (!found) {
886         method = "NONE";
887     }
888 
889     if (method == "NONE") {
890         return OK;
891     } else if (!(method == "AES-128")) {
892         ALOGE("Unsupported cipher method '%s'", method.c_str());
893         return ERROR_UNSUPPORTED;
894     }
895 
896     AString keyURI;
897     if (!itemMeta->findString("cipher-uri", &keyURI)) {
898         ALOGE("Missing key uri");
899         return ERROR_MALFORMED;
900     }
901 
902     ssize_t index = mAESKeyForURI.indexOfKey(keyURI);
903 
904     sp<ABuffer> key;
905     if (index >= 0) {
906         key = mAESKeyForURI.valueAt(index);
907     } else {
908         key = new ABuffer(16);
909 
910         sp<HTTPBase> keySource =
911               HTTPBase::Create(
912                   (mFlags & kFlagIncognito)
913                     ? HTTPBase::kFlagIncognito
914                     : 0);
915 
916         if (mUIDValid) {
917             keySource->setUID(mUID);
918         }
919 
920         status_t err =
921             keySource->connect(
922                     keyURI.c_str(),
923                     mExtraHeaders.isEmpty() ? NULL : &mExtraHeaders);
924 
925         if (err == OK) {
926             size_t offset = 0;
927             while (offset < 16) {
928                 ssize_t n = keySource->readAt(
929                         offset, key->data() + offset, 16 - offset);
930                 if (n <= 0) {
931                     err = ERROR_IO;
932                     break;
933                 }
934 
935                 offset += n;
936             }
937         }
938 
939         if (err != OK) {
940             ALOGE("failed to fetch cipher key from '%s'.", keyURI.c_str());
941             return ERROR_IO;
942         }
943 
944         mAESKeyForURI.add(keyURI, key);
945     }
946 
947     AES_KEY aes_key;
948     if (AES_set_decrypt_key(key->data(), 128, &aes_key) != 0) {
949         ALOGE("failed to set AES decryption key.");
950         return UNKNOWN_ERROR;
951     }
952 
953     unsigned char aes_ivec[16];
954 
955     AString iv;
956     if (itemMeta->findString("cipher-iv", &iv)) {
957         if ((!iv.startsWith("0x") && !iv.startsWith("0X"))
958                 || iv.size() != 16 * 2 + 2) {
959             ALOGE("malformed cipher IV '%s'.", iv.c_str());
960             return ERROR_MALFORMED;
961         }
962 
963         memset(aes_ivec, 0, sizeof(aes_ivec));
964         for (size_t i = 0; i < 16; ++i) {
965             char c1 = tolower(iv.c_str()[2 + 2 * i]);
966             char c2 = tolower(iv.c_str()[3 + 2 * i]);
967             if (!isxdigit(c1) || !isxdigit(c2)) {
968                 ALOGE("malformed cipher IV '%s'.", iv.c_str());
969                 return ERROR_MALFORMED;
970             }
971             uint8_t nibble1 = isdigit(c1) ? c1 - '0' : c1 - 'a' + 10;
972             uint8_t nibble2 = isdigit(c2) ? c2 - '0' : c2 - 'a' + 10;
973 
974             aes_ivec[i] = nibble1 << 4 | nibble2;
975         }
976     } else {
977         memset(aes_ivec, 0, sizeof(aes_ivec));
978         aes_ivec[15] = mSeqNumber & 0xff;
979         aes_ivec[14] = (mSeqNumber >> 8) & 0xff;
980         aes_ivec[13] = (mSeqNumber >> 16) & 0xff;
981         aes_ivec[12] = (mSeqNumber >> 24) & 0xff;
982     }
983 
984     AES_cbc_encrypt(
985             buffer->data(), buffer->data(), buffer->size(),
986             &aes_key, aes_ivec, AES_DECRYPT);
987 
988     // hexdump(buffer->data(), buffer->size());
989 
990     size_t n = buffer->size();
991     CHECK_GT(n, 0u);
992 
993     size_t pad = buffer->data()[n - 1];
994 
995     CHECK_GT(pad, 0u);
996     CHECK_LE(pad, 16u);
997     CHECK_GE((size_t)n, pad);
998     for (size_t i = 0; i < pad; ++i) {
999         CHECK_EQ((unsigned)buffer->data()[n - 1 - i], pad);
1000     }
1001 
1002     n -= pad;
1003 
1004     buffer->setRange(buffer->offset(), n);
1005 
1006     return OK;
1007 }
1008 
postMonitorQueue(int64_t delayUs)1009 void LiveSession::postMonitorQueue(int64_t delayUs) {
1010     sp<AMessage> msg = new AMessage(kWhatMonitorQueue, id());
1011     msg->setInt32("generation", ++mMonitorQueueGeneration);
1012     msg->post(delayUs);
1013 }
1014 
onSeek(const sp<AMessage> & msg)1015 void LiveSession::onSeek(const sp<AMessage> &msg) {
1016     int64_t timeUs;
1017     CHECK(msg->findInt64("timeUs", &timeUs));
1018 
1019     mSeekTimeUs = timeUs;
1020     postMonitorQueue();
1021 }
1022 
getDuration(int64_t * durationUs) const1023 status_t LiveSession::getDuration(int64_t *durationUs) const {
1024     Mutex::Autolock autoLock(mLock);
1025     *durationUs = mDurationUs;
1026 
1027     return OK;
1028 }
1029 
isSeekable() const1030 bool LiveSession::isSeekable() const {
1031     int64_t durationUs;
1032     return getDuration(&durationUs) == OK && durationUs >= 0;
1033 }
1034 
hasDynamicDuration() const1035 bool LiveSession::hasDynamicDuration() const {
1036     return !mDurationFixed;
1037 }
1038 
1039 }  // namespace android
1040 
1041