• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2010 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <inttypes.h>
18 
19 //#define LOG_NDEBUG 0
20 #define LOG_TAG "NuCachedSource2"
21 #include <utils/Log.h>
22 
23 #include "include/NuCachedSource2.h"
24 #include "include/HTTPBase.h"
25 
26 #include <cutils/properties.h>
27 #include <media/stagefright/foundation/ADebug.h>
28 #include <media/stagefright/foundation/AMessage.h>
29 #include <media/stagefright/MediaErrors.h>
30 
31 namespace android {
32 
33 struct PageCache {
34     PageCache(size_t pageSize);
35     ~PageCache();
36 
37     struct Page {
38         void *mData;
39         size_t mSize;
40     };
41 
42     Page *acquirePage();
43     void releasePage(Page *page);
44 
45     void appendPage(Page *page);
46     size_t releaseFromStart(size_t maxBytes);
47 
totalSizeandroid::PageCache48     size_t totalSize() const {
49         return mTotalSize;
50     }
51 
52     void copy(size_t from, void *data, size_t size);
53 
54 private:
55     size_t mPageSize;
56     size_t mTotalSize;
57 
58     List<Page *> mActivePages;
59     List<Page *> mFreePages;
60 
61     void freePages(List<Page *> *list);
62 
63     DISALLOW_EVIL_CONSTRUCTORS(PageCache);
64 };
65 
PageCache(size_t pageSize)66 PageCache::PageCache(size_t pageSize)
67     : mPageSize(pageSize),
68       mTotalSize(0) {
69 }
70 
~PageCache()71 PageCache::~PageCache() {
72     freePages(&mActivePages);
73     freePages(&mFreePages);
74 }
75 
freePages(List<Page * > * list)76 void PageCache::freePages(List<Page *> *list) {
77     List<Page *>::iterator it = list->begin();
78     while (it != list->end()) {
79         Page *page = *it;
80 
81         free(page->mData);
82         delete page;
83         page = NULL;
84 
85         ++it;
86     }
87 }
88 
acquirePage()89 PageCache::Page *PageCache::acquirePage() {
90     if (!mFreePages.empty()) {
91         List<Page *>::iterator it = mFreePages.begin();
92         Page *page = *it;
93         mFreePages.erase(it);
94 
95         return page;
96     }
97 
98     Page *page = new Page;
99     page->mData = malloc(mPageSize);
100     page->mSize = 0;
101 
102     return page;
103 }
104 
releasePage(Page * page)105 void PageCache::releasePage(Page *page) {
106     page->mSize = 0;
107     mFreePages.push_back(page);
108 }
109 
appendPage(Page * page)110 void PageCache::appendPage(Page *page) {
111     mTotalSize += page->mSize;
112     mActivePages.push_back(page);
113 }
114 
releaseFromStart(size_t maxBytes)115 size_t PageCache::releaseFromStart(size_t maxBytes) {
116     size_t bytesReleased = 0;
117 
118     while (maxBytes > 0 && !mActivePages.empty()) {
119         List<Page *>::iterator it = mActivePages.begin();
120 
121         Page *page = *it;
122 
123         if (maxBytes < page->mSize) {
124             break;
125         }
126 
127         mActivePages.erase(it);
128 
129         maxBytes -= page->mSize;
130         bytesReleased += page->mSize;
131 
132         releasePage(page);
133     }
134 
135     mTotalSize -= bytesReleased;
136     return bytesReleased;
137 }
138 
copy(size_t from,void * data,size_t size)139 void PageCache::copy(size_t from, void *data, size_t size) {
140     ALOGV("copy from %zu size %zu", from, size);
141 
142     if (size == 0) {
143         return;
144     }
145 
146     CHECK_LE(from + size, mTotalSize);
147 
148     size_t offset = 0;
149     List<Page *>::iterator it = mActivePages.begin();
150     while (from >= offset + (*it)->mSize) {
151         offset += (*it)->mSize;
152         ++it;
153     }
154 
155     size_t delta = from - offset;
156     size_t avail = (*it)->mSize - delta;
157 
158     if (avail >= size) {
159         memcpy(data, (const uint8_t *)(*it)->mData + delta, size);
160         return;
161     }
162 
163     memcpy(data, (const uint8_t *)(*it)->mData + delta, avail);
164     ++it;
165     data = (uint8_t *)data + avail;
166     size -= avail;
167 
168     while (size > 0) {
169         size_t copy = (*it)->mSize;
170         if (copy > size) {
171             copy = size;
172         }
173         memcpy(data, (*it)->mData, copy);
174         data = (uint8_t *)data + copy;
175         size -= copy;
176         ++it;
177     }
178 }
179 
180 ////////////////////////////////////////////////////////////////////////////////
181 
NuCachedSource2(const sp<DataSource> & source,const char * cacheConfig,bool disconnectAtHighwatermark)182 NuCachedSource2::NuCachedSource2(
183         const sp<DataSource> &source,
184         const char *cacheConfig,
185         bool disconnectAtHighwatermark)
186     : mSource(source),
187       mReflector(new AHandlerReflector<NuCachedSource2>(this)),
188       mLooper(new ALooper),
189       mCache(new PageCache(kPageSize)),
190       mCacheOffset(0),
191       mFinalStatus(OK),
192       mLastAccessPos(0),
193       mFetching(true),
194       mDisconnecting(false),
195       mLastFetchTimeUs(-1),
196       mNumRetriesLeft(kMaxNumRetries),
197       mHighwaterThresholdBytes(kDefaultHighWaterThreshold),
198       mLowwaterThresholdBytes(kDefaultLowWaterThreshold),
199       mKeepAliveIntervalUs(kDefaultKeepAliveIntervalUs),
200       mDisconnectAtHighwatermark(disconnectAtHighwatermark) {
201     // We are NOT going to support disconnect-at-highwatermark indefinitely
202     // and we are not guaranteeing support for client-specified cache
203     // parameters. Both of these are temporary measures to solve a specific
204     // problem that will be solved in a better way going forward.
205 
206     updateCacheParamsFromSystemProperty();
207 
208     if (cacheConfig != NULL) {
209         updateCacheParamsFromString(cacheConfig);
210     }
211 
212     if (mDisconnectAtHighwatermark) {
213         // Makes no sense to disconnect and do keep-alives...
214         mKeepAliveIntervalUs = 0;
215     }
216 
217     mLooper->setName("NuCachedSource2");
218     mLooper->registerHandler(mReflector);
219 
220     // Since it may not be obvious why our looper thread needs to be
221     // able to call into java since it doesn't appear to do so at all...
222     // IMediaHTTPConnection may be (and most likely is) implemented in JAVA
223     // and a local JAVA IBinder will call directly into JNI methods.
224     // So whenever we call DataSource::readAt it may end up in a call to
225     // IMediaHTTPConnection::readAt and therefore call back into JAVA.
226     mLooper->start(false /* runOnCallingThread */, true /* canCallJava */);
227 }
228 
~NuCachedSource2()229 NuCachedSource2::~NuCachedSource2() {
230     mLooper->stop();
231     mLooper->unregisterHandler(mReflector->id());
232 
233     delete mCache;
234     mCache = NULL;
235 }
236 
237 // static
Create(const sp<DataSource> & source,const char * cacheConfig,bool disconnectAtHighwatermark)238 sp<NuCachedSource2> NuCachedSource2::Create(
239         const sp<DataSource> &source,
240         const char *cacheConfig,
241         bool disconnectAtHighwatermark) {
242     sp<NuCachedSource2> instance = new NuCachedSource2(
243             source, cacheConfig, disconnectAtHighwatermark);
244     Mutex::Autolock autoLock(instance->mLock);
245     (new AMessage(kWhatFetchMore, instance->mReflector))->post();
246     return instance;
247 }
248 
getEstimatedBandwidthKbps(int32_t * kbps)249 status_t NuCachedSource2::getEstimatedBandwidthKbps(int32_t *kbps) {
250     if (mSource->flags() & kIsHTTPBasedSource) {
251         HTTPBase* source = static_cast<HTTPBase *>(mSource.get());
252         return source->getEstimatedBandwidthKbps(kbps);
253     }
254     return ERROR_UNSUPPORTED;
255 }
256 
disconnect()257 void NuCachedSource2::disconnect() {
258     if (mSource->flags() & kIsHTTPBasedSource) {
259         ALOGV("disconnecting HTTPBasedSource");
260 
261         {
262             Mutex::Autolock autoLock(mLock);
263             // set mDisconnecting to true, if a fetch returns after
264             // this, the source will be marked as EOS.
265             mDisconnecting = true;
266 
267             // explicitly signal mCondition so that the pending readAt()
268             // will immediately return
269             mCondition.signal();
270         }
271 
272         // explicitly disconnect from the source, to allow any
273         // pending reads to return more promptly
274         static_cast<HTTPBase *>(mSource.get())->disconnect();
275     }
276 }
277 
setCacheStatCollectFreq(int32_t freqMs)278 status_t NuCachedSource2::setCacheStatCollectFreq(int32_t freqMs) {
279     if (mSource->flags() & kIsHTTPBasedSource) {
280         HTTPBase *source = static_cast<HTTPBase *>(mSource.get());
281         return source->setBandwidthStatCollectFreq(freqMs);
282     }
283     return ERROR_UNSUPPORTED;
284 }
285 
initCheck() const286 status_t NuCachedSource2::initCheck() const {
287     return mSource->initCheck();
288 }
289 
getSize(off64_t * size)290 status_t NuCachedSource2::getSize(off64_t *size) {
291     return mSource->getSize(size);
292 }
293 
flags()294 uint32_t NuCachedSource2::flags() {
295     // Remove HTTP related flags since NuCachedSource2 is not HTTP-based.
296     uint32_t flags = mSource->flags() & ~(kWantsPrefetching | kIsHTTPBasedSource);
297     return (flags | kIsCachingDataSource);
298 }
299 
onMessageReceived(const sp<AMessage> & msg)300 void NuCachedSource2::onMessageReceived(const sp<AMessage> &msg) {
301     switch (msg->what()) {
302         case kWhatFetchMore:
303         {
304             onFetch();
305             break;
306         }
307 
308         case kWhatRead:
309         {
310             onRead(msg);
311             break;
312         }
313 
314         default:
315             TRESPASS();
316     }
317 }
318 
fetchInternal()319 void NuCachedSource2::fetchInternal() {
320     ALOGV("fetchInternal");
321 
322     bool reconnect = false;
323 
324     {
325         Mutex::Autolock autoLock(mLock);
326         CHECK(mFinalStatus == OK || mNumRetriesLeft > 0);
327 
328         if (mFinalStatus != OK) {
329             --mNumRetriesLeft;
330 
331             reconnect = true;
332         }
333     }
334 
335     if (reconnect) {
336         status_t err =
337             mSource->reconnectAtOffset(mCacheOffset + mCache->totalSize());
338 
339         Mutex::Autolock autoLock(mLock);
340 
341         if (mDisconnecting) {
342             mNumRetriesLeft = 0;
343             mFinalStatus = ERROR_END_OF_STREAM;
344             return;
345         } else if (err == ERROR_UNSUPPORTED || err == -EPIPE) {
346             // These are errors that are not likely to go away even if we
347             // retry, i.e. the server doesn't support range requests or similar.
348             mNumRetriesLeft = 0;
349             return;
350         } else if (err != OK) {
351             ALOGI("The attempt to reconnect failed, %d retries remaining",
352                  mNumRetriesLeft);
353 
354             return;
355         }
356     }
357 
358     PageCache::Page *page = mCache->acquirePage();
359 
360     ssize_t n = mSource->readAt(
361             mCacheOffset + mCache->totalSize(), page->mData, kPageSize);
362 
363     Mutex::Autolock autoLock(mLock);
364 
365     if (n == 0 || mDisconnecting) {
366         ALOGI("caching reached eos.");
367 
368         mNumRetriesLeft = 0;
369         mFinalStatus = ERROR_END_OF_STREAM;
370 
371         mCache->releasePage(page);
372     } else if (n < 0) {
373         mFinalStatus = n;
374         if (n == ERROR_UNSUPPORTED || n == -EPIPE) {
375             // These are errors that are not likely to go away even if we
376             // retry, i.e. the server doesn't support range requests or similar.
377             mNumRetriesLeft = 0;
378         }
379 
380         ALOGE("source returned error %zd, %d retries left", n, mNumRetriesLeft);
381         mCache->releasePage(page);
382     } else {
383         if (mFinalStatus != OK) {
384             ALOGI("retrying a previously failed read succeeded.");
385         }
386         mNumRetriesLeft = kMaxNumRetries;
387         mFinalStatus = OK;
388 
389         page->mSize = n;
390         mCache->appendPage(page);
391     }
392 }
393 
onFetch()394 void NuCachedSource2::onFetch() {
395     ALOGV("onFetch");
396 
397     if (mFinalStatus != OK && mNumRetriesLeft == 0) {
398         ALOGV("EOS reached, done prefetching for now");
399         mFetching = false;
400     }
401 
402     bool keepAlive =
403         !mFetching
404             && mFinalStatus == OK
405             && mKeepAliveIntervalUs > 0
406             && ALooper::GetNowUs() >= mLastFetchTimeUs + mKeepAliveIntervalUs;
407 
408     if (mFetching || keepAlive) {
409         if (keepAlive) {
410             ALOGI("Keep alive");
411         }
412 
413         fetchInternal();
414 
415         mLastFetchTimeUs = ALooper::GetNowUs();
416 
417         if (mFetching && mCache->totalSize() >= mHighwaterThresholdBytes) {
418             ALOGI("Cache full, done prefetching for now");
419             mFetching = false;
420 
421             if (mDisconnectAtHighwatermark
422                     && (mSource->flags() & DataSource::kIsHTTPBasedSource)) {
423                 ALOGV("Disconnecting at high watermark");
424                 static_cast<HTTPBase *>(mSource.get())->disconnect();
425                 mFinalStatus = -EAGAIN;
426             }
427         }
428     } else {
429         Mutex::Autolock autoLock(mLock);
430         restartPrefetcherIfNecessary_l();
431     }
432 
433     int64_t delayUs;
434     if (mFetching) {
435         if (mFinalStatus != OK && mNumRetriesLeft > 0) {
436             // We failed this time and will try again in 3 seconds.
437             delayUs = 3000000ll;
438         } else {
439             delayUs = 0;
440         }
441     } else {
442         delayUs = 100000ll;
443     }
444 
445     (new AMessage(kWhatFetchMore, mReflector))->post(delayUs);
446 }
447 
onRead(const sp<AMessage> & msg)448 void NuCachedSource2::onRead(const sp<AMessage> &msg) {
449     ALOGV("onRead");
450 
451     int64_t offset;
452     CHECK(msg->findInt64("offset", &offset));
453 
454     void *data;
455     CHECK(msg->findPointer("data", &data));
456 
457     size_t size;
458     CHECK(msg->findSize("size", &size));
459 
460     ssize_t result = readInternal(offset, data, size);
461 
462     if (result == -EAGAIN) {
463         msg->post(50000);
464         return;
465     }
466 
467     Mutex::Autolock autoLock(mLock);
468     if (mDisconnecting) {
469         mCondition.signal();
470         return;
471     }
472 
473     CHECK(mAsyncResult == NULL);
474 
475     mAsyncResult = new AMessage;
476     mAsyncResult->setInt32("result", result);
477 
478     mCondition.signal();
479 }
480 
restartPrefetcherIfNecessary_l(bool ignoreLowWaterThreshold,bool force)481 void NuCachedSource2::restartPrefetcherIfNecessary_l(
482         bool ignoreLowWaterThreshold, bool force) {
483     static const size_t kGrayArea = 1024 * 1024;
484 
485     if (mFetching || (mFinalStatus != OK && mNumRetriesLeft == 0)) {
486         return;
487     }
488 
489     if (!ignoreLowWaterThreshold && !force
490             && mCacheOffset + mCache->totalSize() - mLastAccessPos
491                 >= mLowwaterThresholdBytes) {
492         return;
493     }
494 
495     size_t maxBytes = mLastAccessPos - mCacheOffset;
496 
497     if (!force) {
498         if (maxBytes < kGrayArea) {
499             return;
500         }
501 
502         maxBytes -= kGrayArea;
503     }
504 
505     size_t actualBytes = mCache->releaseFromStart(maxBytes);
506     mCacheOffset += actualBytes;
507 
508     ALOGI("restarting prefetcher, totalSize = %zu", mCache->totalSize());
509     mFetching = true;
510 }
511 
readAt(off64_t offset,void * data,size_t size)512 ssize_t NuCachedSource2::readAt(off64_t offset, void *data, size_t size) {
513     Mutex::Autolock autoSerializer(mSerializer);
514 
515     ALOGV("readAt offset %lld, size %zu", (long long)offset, size);
516 
517     Mutex::Autolock autoLock(mLock);
518     if (mDisconnecting) {
519         return ERROR_END_OF_STREAM;
520     }
521 
522     // If the request can be completely satisfied from the cache, do so.
523 
524     if (offset >= mCacheOffset
525             && offset + size <= mCacheOffset + mCache->totalSize()) {
526         size_t delta = offset - mCacheOffset;
527         mCache->copy(delta, data, size);
528 
529         mLastAccessPos = offset + size;
530 
531         return size;
532     }
533 
534     sp<AMessage> msg = new AMessage(kWhatRead, mReflector);
535     msg->setInt64("offset", offset);
536     msg->setPointer("data", data);
537     msg->setSize("size", size);
538 
539     CHECK(mAsyncResult == NULL);
540     msg->post();
541 
542     while (mAsyncResult == NULL && !mDisconnecting) {
543         mCondition.wait(mLock);
544     }
545 
546     if (mDisconnecting) {
547         mAsyncResult.clear();
548         return ERROR_END_OF_STREAM;
549     }
550 
551     int32_t result;
552     CHECK(mAsyncResult->findInt32("result", &result));
553 
554     mAsyncResult.clear();
555 
556     if (result > 0) {
557         mLastAccessPos = offset + result;
558     }
559 
560     return (ssize_t)result;
561 }
562 
cachedSize()563 size_t NuCachedSource2::cachedSize() {
564     Mutex::Autolock autoLock(mLock);
565     return mCacheOffset + mCache->totalSize();
566 }
567 
approxDataRemaining(status_t * finalStatus) const568 size_t NuCachedSource2::approxDataRemaining(status_t *finalStatus) const {
569     Mutex::Autolock autoLock(mLock);
570     return approxDataRemaining_l(finalStatus);
571 }
572 
approxDataRemaining_l(status_t * finalStatus) const573 size_t NuCachedSource2::approxDataRemaining_l(status_t *finalStatus) const {
574     *finalStatus = mFinalStatus;
575 
576     if (mFinalStatus != OK && mNumRetriesLeft > 0) {
577         // Pretend that everything is fine until we're out of retries.
578         *finalStatus = OK;
579     }
580 
581     off64_t lastBytePosCached = mCacheOffset + mCache->totalSize();
582     if (mLastAccessPos < lastBytePosCached) {
583         return lastBytePosCached - mLastAccessPos;
584     }
585     return 0;
586 }
587 
readInternal(off64_t offset,void * data,size_t size)588 ssize_t NuCachedSource2::readInternal(off64_t offset, void *data, size_t size) {
589     CHECK_LE(size, (size_t)mHighwaterThresholdBytes);
590 
591     ALOGV("readInternal offset %lld size %zu", (long long)offset, size);
592 
593     Mutex::Autolock autoLock(mLock);
594 
595     // If we're disconnecting, return EOS and don't access *data pointer.
596     // data could be on the stack of the caller to NuCachedSource2::readAt(),
597     // which may have exited already.
598     if (mDisconnecting) {
599         return ERROR_END_OF_STREAM;
600     }
601 
602     if (!mFetching) {
603         mLastAccessPos = offset;
604         restartPrefetcherIfNecessary_l(
605                 false, // ignoreLowWaterThreshold
606                 true); // force
607     }
608 
609     if (offset < mCacheOffset
610             || offset >= (off64_t)(mCacheOffset + mCache->totalSize())) {
611         static const off64_t kPadding = 256 * 1024;
612 
613         // In the presence of multiple decoded streams, once of them will
614         // trigger this seek request, the other one will request data "nearby"
615         // soon, adjust the seek position so that that subsequent request
616         // does not trigger another seek.
617         off64_t seekOffset = (offset > kPadding) ? offset - kPadding : 0;
618 
619         seekInternal_l(seekOffset);
620     }
621 
622     size_t delta = offset - mCacheOffset;
623 
624     if (mFinalStatus != OK && mNumRetriesLeft == 0) {
625         if (delta >= mCache->totalSize()) {
626             return mFinalStatus;
627         }
628 
629         size_t avail = mCache->totalSize() - delta;
630 
631         if (avail > size) {
632             avail = size;
633         }
634 
635         mCache->copy(delta, data, avail);
636 
637         return avail;
638     }
639 
640     if (offset + size <= mCacheOffset + mCache->totalSize()) {
641         mCache->copy(delta, data, size);
642 
643         return size;
644     }
645 
646     ALOGV("deferring read");
647 
648     return -EAGAIN;
649 }
650 
seekInternal_l(off64_t offset)651 status_t NuCachedSource2::seekInternal_l(off64_t offset) {
652     mLastAccessPos = offset;
653 
654     if (offset >= mCacheOffset
655             && offset <= (off64_t)(mCacheOffset + mCache->totalSize())) {
656         return OK;
657     }
658 
659     ALOGI("new range: offset= %lld", (long long)offset);
660 
661     mCacheOffset = offset;
662 
663     size_t totalSize = mCache->totalSize();
664     CHECK_EQ(mCache->releaseFromStart(totalSize), totalSize);
665 
666     mNumRetriesLeft = kMaxNumRetries;
667     mFetching = true;
668 
669     return OK;
670 }
671 
resumeFetchingIfNecessary()672 void NuCachedSource2::resumeFetchingIfNecessary() {
673     Mutex::Autolock autoLock(mLock);
674 
675     restartPrefetcherIfNecessary_l(true /* ignore low water threshold */);
676 }
677 
DrmInitialization(const char * mime)678 sp<DecryptHandle> NuCachedSource2::DrmInitialization(const char* mime) {
679     return mSource->DrmInitialization(mime);
680 }
681 
getDrmInfo(sp<DecryptHandle> & handle,DrmManagerClient ** client)682 void NuCachedSource2::getDrmInfo(sp<DecryptHandle> &handle, DrmManagerClient **client) {
683     mSource->getDrmInfo(handle, client);
684 }
685 
getUri()686 String8 NuCachedSource2::getUri() {
687     return mSource->getUri();
688 }
689 
getMIMEType() const690 String8 NuCachedSource2::getMIMEType() const {
691     return mSource->getMIMEType();
692 }
693 
updateCacheParamsFromSystemProperty()694 void NuCachedSource2::updateCacheParamsFromSystemProperty() {
695     char value[PROPERTY_VALUE_MAX];
696     if (!property_get("media.stagefright.cache-params", value, NULL)) {
697         return;
698     }
699 
700     updateCacheParamsFromString(value);
701 }
702 
updateCacheParamsFromString(const char * s)703 void NuCachedSource2::updateCacheParamsFromString(const char *s) {
704     ssize_t lowwaterMarkKb, highwaterMarkKb;
705     int keepAliveSecs;
706 
707     if (sscanf(s, "%zd/%zd/%d",
708                &lowwaterMarkKb, &highwaterMarkKb, &keepAliveSecs) != 3) {
709         ALOGE("Failed to parse cache parameters from '%s'.", s);
710         return;
711     }
712 
713     if (lowwaterMarkKb >= 0) {
714         mLowwaterThresholdBytes = lowwaterMarkKb * 1024;
715     } else {
716         mLowwaterThresholdBytes = kDefaultLowWaterThreshold;
717     }
718 
719     if (highwaterMarkKb >= 0) {
720         mHighwaterThresholdBytes = highwaterMarkKb * 1024;
721     } else {
722         mHighwaterThresholdBytes = kDefaultHighWaterThreshold;
723     }
724 
725     if (mLowwaterThresholdBytes >= mHighwaterThresholdBytes) {
726         ALOGE("Illegal low/highwater marks specified, reverting to defaults.");
727 
728         mLowwaterThresholdBytes = kDefaultLowWaterThreshold;
729         mHighwaterThresholdBytes = kDefaultHighWaterThreshold;
730     }
731 
732     if (keepAliveSecs >= 0) {
733         mKeepAliveIntervalUs = keepAliveSecs * 1000000ll;
734     } else {
735         mKeepAliveIntervalUs = kDefaultKeepAliveIntervalUs;
736     }
737 
738     ALOGV("lowwater = %zu bytes, highwater = %zu bytes, keepalive = %lld us",
739          mLowwaterThresholdBytes,
740          mHighwaterThresholdBytes,
741          (long long)mKeepAliveIntervalUs);
742 }
743 
744 // static
RemoveCacheSpecificHeaders(KeyedVector<String8,String8> * headers,String8 * cacheConfig,bool * disconnectAtHighwatermark)745 void NuCachedSource2::RemoveCacheSpecificHeaders(
746         KeyedVector<String8, String8> *headers,
747         String8 *cacheConfig,
748         bool *disconnectAtHighwatermark) {
749     *cacheConfig = String8();
750     *disconnectAtHighwatermark = false;
751 
752     if (headers == NULL) {
753         return;
754     }
755 
756     ssize_t index;
757     if ((index = headers->indexOfKey(String8("x-cache-config"))) >= 0) {
758         *cacheConfig = headers->valueAt(index);
759 
760         headers->removeItemsAt(index);
761 
762         ALOGV("Using special cache config '%s'", cacheConfig->string());
763     }
764 
765     if ((index = headers->indexOfKey(
766                     String8("x-disconnect-at-highwatermark"))) >= 0) {
767         *disconnectAtHighwatermark = true;
768         headers->removeItemsAt(index);
769 
770         ALOGV("Client requested disconnection at highwater mark");
771     }
772 }
773 
774 }  // namespace android
775