• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2010 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 //#define LOG_NDEBUG 0
18 #define LOG_TAG "NuCachedSource2"
19 #include <utils/Log.h>
20 
21 #include "include/NuCachedSource2.h"
22 #include "include/HTTPBase.h"
23 
24 #include <cutils/properties.h>
25 #include <media/stagefright/foundation/ADebug.h>
26 #include <media/stagefright/foundation/AMessage.h>
27 #include <media/stagefright/MediaErrors.h>
28 
29 namespace android {
30 
31 struct PageCache {
32     PageCache(size_t pageSize);
33     ~PageCache();
34 
35     struct Page {
36         void *mData;
37         size_t mSize;
38     };
39 
40     Page *acquirePage();
41     void releasePage(Page *page);
42 
43     void appendPage(Page *page);
44     size_t releaseFromStart(size_t maxBytes);
45 
totalSizeandroid::PageCache46     size_t totalSize() const {
47         return mTotalSize;
48     }
49 
50     void copy(size_t from, void *data, size_t size);
51 
52 private:
53     size_t mPageSize;
54     size_t mTotalSize;
55 
56     List<Page *> mActivePages;
57     List<Page *> mFreePages;
58 
59     void freePages(List<Page *> *list);
60 
61     DISALLOW_EVIL_CONSTRUCTORS(PageCache);
62 };
63 
PageCache(size_t pageSize)64 PageCache::PageCache(size_t pageSize)
65     : mPageSize(pageSize),
66       mTotalSize(0) {
67 }
68 
~PageCache()69 PageCache::~PageCache() {
70     freePages(&mActivePages);
71     freePages(&mFreePages);
72 }
73 
freePages(List<Page * > * list)74 void PageCache::freePages(List<Page *> *list) {
75     List<Page *>::iterator it = list->begin();
76     while (it != list->end()) {
77         Page *page = *it;
78 
79         free(page->mData);
80         delete page;
81         page = NULL;
82 
83         ++it;
84     }
85 }
86 
acquirePage()87 PageCache::Page *PageCache::acquirePage() {
88     if (!mFreePages.empty()) {
89         List<Page *>::iterator it = mFreePages.begin();
90         Page *page = *it;
91         mFreePages.erase(it);
92 
93         return page;
94     }
95 
96     Page *page = new Page;
97     page->mData = malloc(mPageSize);
98     page->mSize = 0;
99 
100     return page;
101 }
102 
releasePage(Page * page)103 void PageCache::releasePage(Page *page) {
104     page->mSize = 0;
105     mFreePages.push_back(page);
106 }
107 
appendPage(Page * page)108 void PageCache::appendPage(Page *page) {
109     mTotalSize += page->mSize;
110     mActivePages.push_back(page);
111 }
112 
releaseFromStart(size_t maxBytes)113 size_t PageCache::releaseFromStart(size_t maxBytes) {
114     size_t bytesReleased = 0;
115 
116     while (maxBytes > 0 && !mActivePages.empty()) {
117         List<Page *>::iterator it = mActivePages.begin();
118 
119         Page *page = *it;
120 
121         if (maxBytes < page->mSize) {
122             break;
123         }
124 
125         mActivePages.erase(it);
126 
127         maxBytes -= page->mSize;
128         bytesReleased += page->mSize;
129 
130         releasePage(page);
131     }
132 
133     mTotalSize -= bytesReleased;
134     return bytesReleased;
135 }
136 
copy(size_t from,void * data,size_t size)137 void PageCache::copy(size_t from, void *data, size_t size) {
138     ALOGV("copy from %d size %d", from, size);
139 
140     if (size == 0) {
141         return;
142     }
143 
144     CHECK_LE(from + size, mTotalSize);
145 
146     size_t offset = 0;
147     List<Page *>::iterator it = mActivePages.begin();
148     while (from >= offset + (*it)->mSize) {
149         offset += (*it)->mSize;
150         ++it;
151     }
152 
153     size_t delta = from - offset;
154     size_t avail = (*it)->mSize - delta;
155 
156     if (avail >= size) {
157         memcpy(data, (const uint8_t *)(*it)->mData + delta, size);
158         return;
159     }
160 
161     memcpy(data, (const uint8_t *)(*it)->mData + delta, avail);
162     ++it;
163     data = (uint8_t *)data + avail;
164     size -= avail;
165 
166     while (size > 0) {
167         size_t copy = (*it)->mSize;
168         if (copy > size) {
169             copy = size;
170         }
171         memcpy(data, (*it)->mData, copy);
172         data = (uint8_t *)data + copy;
173         size -= copy;
174         ++it;
175     }
176 }
177 
178 ////////////////////////////////////////////////////////////////////////////////
179 
NuCachedSource2(const sp<DataSource> & source,const char * cacheConfig,bool disconnectAtHighwatermark)180 NuCachedSource2::NuCachedSource2(
181         const sp<DataSource> &source,
182         const char *cacheConfig,
183         bool disconnectAtHighwatermark)
184     : mSource(source),
185       mReflector(new AHandlerReflector<NuCachedSource2>(this)),
186       mLooper(new ALooper),
187       mCache(new PageCache(kPageSize)),
188       mCacheOffset(0),
189       mFinalStatus(OK),
190       mLastAccessPos(0),
191       mFetching(true),
192       mLastFetchTimeUs(-1),
193       mNumRetriesLeft(kMaxNumRetries),
194       mHighwaterThresholdBytes(kDefaultHighWaterThreshold),
195       mLowwaterThresholdBytes(kDefaultLowWaterThreshold),
196       mKeepAliveIntervalUs(kDefaultKeepAliveIntervalUs),
197       mDisconnectAtHighwatermark(disconnectAtHighwatermark) {
198     // We are NOT going to support disconnect-at-highwatermark indefinitely
199     // and we are not guaranteeing support for client-specified cache
200     // parameters. Both of these are temporary measures to solve a specific
201     // problem that will be solved in a better way going forward.
202 
203     updateCacheParamsFromSystemProperty();
204 
205     if (cacheConfig != NULL) {
206         updateCacheParamsFromString(cacheConfig);
207     }
208 
209     if (mDisconnectAtHighwatermark) {
210         // Makes no sense to disconnect and do keep-alives...
211         mKeepAliveIntervalUs = 0;
212     }
213 
214     mLooper->setName("NuCachedSource2");
215     mLooper->registerHandler(mReflector);
216     mLooper->start();
217 
218     Mutex::Autolock autoLock(mLock);
219     (new AMessage(kWhatFetchMore, mReflector->id()))->post();
220 }
221 
~NuCachedSource2()222 NuCachedSource2::~NuCachedSource2() {
223     mLooper->stop();
224     mLooper->unregisterHandler(mReflector->id());
225 
226     delete mCache;
227     mCache = NULL;
228 }
229 
getEstimatedBandwidthKbps(int32_t * kbps)230 status_t NuCachedSource2::getEstimatedBandwidthKbps(int32_t *kbps) {
231     if (mSource->flags() & kIsHTTPBasedSource) {
232         HTTPBase* source = static_cast<HTTPBase *>(mSource.get());
233         return source->getEstimatedBandwidthKbps(kbps);
234     }
235     return ERROR_UNSUPPORTED;
236 }
237 
setCacheStatCollectFreq(int32_t freqMs)238 status_t NuCachedSource2::setCacheStatCollectFreq(int32_t freqMs) {
239     if (mSource->flags() & kIsHTTPBasedSource) {
240         HTTPBase *source = static_cast<HTTPBase *>(mSource.get());
241         return source->setBandwidthStatCollectFreq(freqMs);
242     }
243     return ERROR_UNSUPPORTED;
244 }
245 
initCheck() const246 status_t NuCachedSource2::initCheck() const {
247     return mSource->initCheck();
248 }
249 
getSize(off64_t * size)250 status_t NuCachedSource2::getSize(off64_t *size) {
251     return mSource->getSize(size);
252 }
253 
flags()254 uint32_t NuCachedSource2::flags() {
255     // Remove HTTP related flags since NuCachedSource2 is not HTTP-based.
256     uint32_t flags = mSource->flags() & ~(kWantsPrefetching | kIsHTTPBasedSource);
257     return (flags | kIsCachingDataSource);
258 }
259 
onMessageReceived(const sp<AMessage> & msg)260 void NuCachedSource2::onMessageReceived(const sp<AMessage> &msg) {
261     switch (msg->what()) {
262         case kWhatFetchMore:
263         {
264             onFetch();
265             break;
266         }
267 
268         case kWhatRead:
269         {
270             onRead(msg);
271             break;
272         }
273 
274         default:
275             TRESPASS();
276     }
277 }
278 
fetchInternal()279 void NuCachedSource2::fetchInternal() {
280     ALOGV("fetchInternal");
281 
282     bool reconnect = false;
283 
284     {
285         Mutex::Autolock autoLock(mLock);
286         CHECK(mFinalStatus == OK || mNumRetriesLeft > 0);
287 
288         if (mFinalStatus != OK) {
289             --mNumRetriesLeft;
290 
291             reconnect = true;
292         }
293     }
294 
295     if (reconnect) {
296         status_t err =
297             mSource->reconnectAtOffset(mCacheOffset + mCache->totalSize());
298 
299         Mutex::Autolock autoLock(mLock);
300 
301         if (err == ERROR_UNSUPPORTED || err == -EPIPE) {
302             // These are errors that are not likely to go away even if we
303             // retry, i.e. the server doesn't support range requests or similar.
304             mNumRetriesLeft = 0;
305             return;
306         } else if (err != OK) {
307             ALOGI("The attempt to reconnect failed, %d retries remaining",
308                  mNumRetriesLeft);
309 
310             return;
311         }
312     }
313 
314     PageCache::Page *page = mCache->acquirePage();
315 
316     ssize_t n = mSource->readAt(
317             mCacheOffset + mCache->totalSize(), page->mData, kPageSize);
318 
319     Mutex::Autolock autoLock(mLock);
320 
321     if (n < 0) {
322         mFinalStatus = n;
323         if (n == ERROR_UNSUPPORTED || n == -EPIPE) {
324             // These are errors that are not likely to go away even if we
325             // retry, i.e. the server doesn't support range requests or similar.
326             mNumRetriesLeft = 0;
327         }
328 
329         ALOGE("source returned error %ld, %d retries left", n, mNumRetriesLeft);
330         mCache->releasePage(page);
331     } else if (n == 0) {
332         ALOGI("ERROR_END_OF_STREAM");
333 
334         mNumRetriesLeft = 0;
335         mFinalStatus = ERROR_END_OF_STREAM;
336 
337         mCache->releasePage(page);
338     } else {
339         if (mFinalStatus != OK) {
340             ALOGI("retrying a previously failed read succeeded.");
341         }
342         mNumRetriesLeft = kMaxNumRetries;
343         mFinalStatus = OK;
344 
345         page->mSize = n;
346         mCache->appendPage(page);
347     }
348 }
349 
onFetch()350 void NuCachedSource2::onFetch() {
351     ALOGV("onFetch");
352 
353     if (mFinalStatus != OK && mNumRetriesLeft == 0) {
354         ALOGV("EOS reached, done prefetching for now");
355         mFetching = false;
356     }
357 
358     bool keepAlive =
359         !mFetching
360             && mFinalStatus == OK
361             && mKeepAliveIntervalUs > 0
362             && ALooper::GetNowUs() >= mLastFetchTimeUs + mKeepAliveIntervalUs;
363 
364     if (mFetching || keepAlive) {
365         if (keepAlive) {
366             ALOGI("Keep alive");
367         }
368 
369         fetchInternal();
370 
371         mLastFetchTimeUs = ALooper::GetNowUs();
372 
373         if (mFetching && mCache->totalSize() >= mHighwaterThresholdBytes) {
374             ALOGI("Cache full, done prefetching for now");
375             mFetching = false;
376 
377             if (mDisconnectAtHighwatermark
378                     && (mSource->flags() & DataSource::kIsHTTPBasedSource)) {
379                 ALOGV("Disconnecting at high watermark");
380                 static_cast<HTTPBase *>(mSource.get())->disconnect();
381                 mFinalStatus = -EAGAIN;
382             }
383         }
384     } else {
385         Mutex::Autolock autoLock(mLock);
386         restartPrefetcherIfNecessary_l();
387     }
388 
389     int64_t delayUs;
390     if (mFetching) {
391         if (mFinalStatus != OK && mNumRetriesLeft > 0) {
392             // We failed this time and will try again in 3 seconds.
393             delayUs = 3000000ll;
394         } else {
395             delayUs = 0;
396         }
397     } else {
398         delayUs = 100000ll;
399     }
400 
401     (new AMessage(kWhatFetchMore, mReflector->id()))->post(delayUs);
402 }
403 
onRead(const sp<AMessage> & msg)404 void NuCachedSource2::onRead(const sp<AMessage> &msg) {
405     ALOGV("onRead");
406 
407     int64_t offset;
408     CHECK(msg->findInt64("offset", &offset));
409 
410     void *data;
411     CHECK(msg->findPointer("data", &data));
412 
413     size_t size;
414     CHECK(msg->findSize("size", &size));
415 
416     ssize_t result = readInternal(offset, data, size);
417 
418     if (result == -EAGAIN) {
419         msg->post(50000);
420         return;
421     }
422 
423     Mutex::Autolock autoLock(mLock);
424 
425     CHECK(mAsyncResult == NULL);
426 
427     mAsyncResult = new AMessage;
428     mAsyncResult->setInt32("result", result);
429 
430     mCondition.signal();
431 }
432 
restartPrefetcherIfNecessary_l(bool ignoreLowWaterThreshold,bool force)433 void NuCachedSource2::restartPrefetcherIfNecessary_l(
434         bool ignoreLowWaterThreshold, bool force) {
435     static const size_t kGrayArea = 1024 * 1024;
436 
437     if (mFetching || (mFinalStatus != OK && mNumRetriesLeft == 0)) {
438         return;
439     }
440 
441     if (!ignoreLowWaterThreshold && !force
442             && mCacheOffset + mCache->totalSize() - mLastAccessPos
443                 >= mLowwaterThresholdBytes) {
444         return;
445     }
446 
447     size_t maxBytes = mLastAccessPos - mCacheOffset;
448 
449     if (!force) {
450         if (maxBytes < kGrayArea) {
451             return;
452         }
453 
454         maxBytes -= kGrayArea;
455     }
456 
457     size_t actualBytes = mCache->releaseFromStart(maxBytes);
458     mCacheOffset += actualBytes;
459 
460     ALOGI("restarting prefetcher, totalSize = %d", mCache->totalSize());
461     mFetching = true;
462 }
463 
readAt(off64_t offset,void * data,size_t size)464 ssize_t NuCachedSource2::readAt(off64_t offset, void *data, size_t size) {
465     Mutex::Autolock autoSerializer(mSerializer);
466 
467     ALOGV("readAt offset %lld, size %d", offset, size);
468 
469     Mutex::Autolock autoLock(mLock);
470 
471     // If the request can be completely satisfied from the cache, do so.
472 
473     if (offset >= mCacheOffset
474             && offset + size <= mCacheOffset + mCache->totalSize()) {
475         size_t delta = offset - mCacheOffset;
476         mCache->copy(delta, data, size);
477 
478         mLastAccessPos = offset + size;
479 
480         return size;
481     }
482 
483     sp<AMessage> msg = new AMessage(kWhatRead, mReflector->id());
484     msg->setInt64("offset", offset);
485     msg->setPointer("data", data);
486     msg->setSize("size", size);
487 
488     CHECK(mAsyncResult == NULL);
489     msg->post();
490 
491     while (mAsyncResult == NULL) {
492         mCondition.wait(mLock);
493     }
494 
495     int32_t result;
496     CHECK(mAsyncResult->findInt32("result", &result));
497 
498     mAsyncResult.clear();
499 
500     if (result > 0) {
501         mLastAccessPos = offset + result;
502     }
503 
504     return (ssize_t)result;
505 }
506 
cachedSize()507 size_t NuCachedSource2::cachedSize() {
508     Mutex::Autolock autoLock(mLock);
509     return mCacheOffset + mCache->totalSize();
510 }
511 
approxDataRemaining(status_t * finalStatus) const512 size_t NuCachedSource2::approxDataRemaining(status_t *finalStatus) const {
513     Mutex::Autolock autoLock(mLock);
514     return approxDataRemaining_l(finalStatus);
515 }
516 
approxDataRemaining_l(status_t * finalStatus) const517 size_t NuCachedSource2::approxDataRemaining_l(status_t *finalStatus) const {
518     *finalStatus = mFinalStatus;
519 
520     if (mFinalStatus != OK && mNumRetriesLeft > 0) {
521         // Pretend that everything is fine until we're out of retries.
522         *finalStatus = OK;
523     }
524 
525     off64_t lastBytePosCached = mCacheOffset + mCache->totalSize();
526     if (mLastAccessPos < lastBytePosCached) {
527         return lastBytePosCached - mLastAccessPos;
528     }
529     return 0;
530 }
531 
readInternal(off64_t offset,void * data,size_t size)532 ssize_t NuCachedSource2::readInternal(off64_t offset, void *data, size_t size) {
533     CHECK_LE(size, (size_t)mHighwaterThresholdBytes);
534 
535     ALOGV("readInternal offset %lld size %d", offset, size);
536 
537     Mutex::Autolock autoLock(mLock);
538 
539     if (!mFetching) {
540         mLastAccessPos = offset;
541         restartPrefetcherIfNecessary_l(
542                 false, // ignoreLowWaterThreshold
543                 true); // force
544     }
545 
546     if (offset < mCacheOffset
547             || offset >= (off64_t)(mCacheOffset + mCache->totalSize())) {
548         static const off64_t kPadding = 256 * 1024;
549 
550         // In the presence of multiple decoded streams, once of them will
551         // trigger this seek request, the other one will request data "nearby"
552         // soon, adjust the seek position so that that subsequent request
553         // does not trigger another seek.
554         off64_t seekOffset = (offset > kPadding) ? offset - kPadding : 0;
555 
556         seekInternal_l(seekOffset);
557     }
558 
559     size_t delta = offset - mCacheOffset;
560 
561     if (mFinalStatus != OK && mNumRetriesLeft == 0) {
562         if (delta >= mCache->totalSize()) {
563             return mFinalStatus;
564         }
565 
566         size_t avail = mCache->totalSize() - delta;
567 
568         if (avail > size) {
569             avail = size;
570         }
571 
572         mCache->copy(delta, data, avail);
573 
574         return avail;
575     }
576 
577     if (offset + size <= mCacheOffset + mCache->totalSize()) {
578         mCache->copy(delta, data, size);
579 
580         return size;
581     }
582 
583     ALOGV("deferring read");
584 
585     return -EAGAIN;
586 }
587 
seekInternal_l(off64_t offset)588 status_t NuCachedSource2::seekInternal_l(off64_t offset) {
589     mLastAccessPos = offset;
590 
591     if (offset >= mCacheOffset
592             && offset <= (off64_t)(mCacheOffset + mCache->totalSize())) {
593         return OK;
594     }
595 
596     ALOGI("new range: offset= %lld", offset);
597 
598     mCacheOffset = offset;
599 
600     size_t totalSize = mCache->totalSize();
601     CHECK_EQ(mCache->releaseFromStart(totalSize), totalSize);
602 
603     mNumRetriesLeft = kMaxNumRetries;
604     mFetching = true;
605 
606     return OK;
607 }
608 
resumeFetchingIfNecessary()609 void NuCachedSource2::resumeFetchingIfNecessary() {
610     Mutex::Autolock autoLock(mLock);
611 
612     restartPrefetcherIfNecessary_l(true /* ignore low water threshold */);
613 }
614 
DrmInitialization(const char * mime)615 sp<DecryptHandle> NuCachedSource2::DrmInitialization(const char* mime) {
616     return mSource->DrmInitialization(mime);
617 }
618 
getDrmInfo(sp<DecryptHandle> & handle,DrmManagerClient ** client)619 void NuCachedSource2::getDrmInfo(sp<DecryptHandle> &handle, DrmManagerClient **client) {
620     mSource->getDrmInfo(handle, client);
621 }
622 
getUri()623 String8 NuCachedSource2::getUri() {
624     return mSource->getUri();
625 }
626 
getMIMEType() const627 String8 NuCachedSource2::getMIMEType() const {
628     return mSource->getMIMEType();
629 }
630 
updateCacheParamsFromSystemProperty()631 void NuCachedSource2::updateCacheParamsFromSystemProperty() {
632     char value[PROPERTY_VALUE_MAX];
633     if (!property_get("media.stagefright.cache-params", value, NULL)) {
634         return;
635     }
636 
637     updateCacheParamsFromString(value);
638 }
639 
updateCacheParamsFromString(const char * s)640 void NuCachedSource2::updateCacheParamsFromString(const char *s) {
641     ssize_t lowwaterMarkKb, highwaterMarkKb;
642     int keepAliveSecs;
643 
644     if (sscanf(s, "%ld/%ld/%d",
645                &lowwaterMarkKb, &highwaterMarkKb, &keepAliveSecs) != 3) {
646         ALOGE("Failed to parse cache parameters from '%s'.", s);
647         return;
648     }
649 
650     if (lowwaterMarkKb >= 0) {
651         mLowwaterThresholdBytes = lowwaterMarkKb * 1024;
652     } else {
653         mLowwaterThresholdBytes = kDefaultLowWaterThreshold;
654     }
655 
656     if (highwaterMarkKb >= 0) {
657         mHighwaterThresholdBytes = highwaterMarkKb * 1024;
658     } else {
659         mHighwaterThresholdBytes = kDefaultHighWaterThreshold;
660     }
661 
662     if (mLowwaterThresholdBytes >= mHighwaterThresholdBytes) {
663         ALOGE("Illegal low/highwater marks specified, reverting to defaults.");
664 
665         mLowwaterThresholdBytes = kDefaultLowWaterThreshold;
666         mHighwaterThresholdBytes = kDefaultHighWaterThreshold;
667     }
668 
669     if (keepAliveSecs >= 0) {
670         mKeepAliveIntervalUs = keepAliveSecs * 1000000ll;
671     } else {
672         mKeepAliveIntervalUs = kDefaultKeepAliveIntervalUs;
673     }
674 
675     ALOGV("lowwater = %d bytes, highwater = %d bytes, keepalive = %lld us",
676          mLowwaterThresholdBytes,
677          mHighwaterThresholdBytes,
678          mKeepAliveIntervalUs);
679 }
680 
681 // static
RemoveCacheSpecificHeaders(KeyedVector<String8,String8> * headers,String8 * cacheConfig,bool * disconnectAtHighwatermark)682 void NuCachedSource2::RemoveCacheSpecificHeaders(
683         KeyedVector<String8, String8> *headers,
684         String8 *cacheConfig,
685         bool *disconnectAtHighwatermark) {
686     *cacheConfig = String8();
687     *disconnectAtHighwatermark = false;
688 
689     if (headers == NULL) {
690         return;
691     }
692 
693     ssize_t index;
694     if ((index = headers->indexOfKey(String8("x-cache-config"))) >= 0) {
695         *cacheConfig = headers->valueAt(index);
696 
697         headers->removeItemsAt(index);
698 
699         ALOGV("Using special cache config '%s'", cacheConfig->string());
700     }
701 
702     if ((index = headers->indexOfKey(
703                     String8("x-disconnect-at-highwatermark"))) >= 0) {
704         *disconnectAtHighwatermark = true;
705         headers->removeItemsAt(index);
706 
707         ALOGV("Client requested disconnection at highwater mark");
708     }
709 }
710 
711 }  // namespace android
712