/*
 * Copyright (c) 2024-2024 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <mutex>
#include <list>
#include <algorithm>
#include <cassert>
#include <limits>
#include <securec.h>
#include "media_cached_buffer.h"
#include "common/log.h"
#include "avcodec_log.h"
#include "avcodec_errors.h"

namespace OHOS {
namespace Media {
constexpr size_t CACHE_FRAGMENT_MAX_NUM_DEFAULT = 300; // Maximum number of fragment nodes
constexpr size_t CACHE_FRAGMENT_MAX_NUM_LARGE = 10; // Maximum number of fragment nodes for large offset spans
constexpr size_t CACHE_FRAGMENT_MIN_NUM_DEFAULT = 3; // Minimum number of fragment nodes
constexpr double NEW_FRAGMENT_INIT_CHUNK_NUM = 128.0; // Restricts the cache size of a seek operation, 128 chunks = 2MB
constexpr double NEW_FRAGMENT_NIT_DEFAULT_DENOMINATOR = 0.25;
constexpr double CACHE_RELEASE_FACTOR_DEFAULT = 10;
constexpr double TO_PERCENT = 100;
constexpr int64_t MAX_TOTAL_READ_SIZE = 2000000;
constexpr int64_t UP_LIMIT_MAX_TOTAL_READ_SIZE = 3000000;
constexpr int64_t ACCESS_OFFSET_MAX_LENGTH = 2 * 1024;

inline constexpr bool BoundedIntervalComp(int64_t mid, uint64_t start, int64_t end)
{
    return (static_cast<int64_t>(start) <= mid && mid <= end);
}

inline constexpr bool LeftBoundedRightOpenComp(int64_t mid, uint64_t start, int64_t end)
{
    return (static_cast<int64_t>(start) <= mid && mid < end);
}

inline void IncreaseStep(uint8_t*& src, uint64_t& offset, size_t& writeSize, size_t step)
{
    src += step;
    offset += static_cast<uint64_t>(step);
    writeSize += step;
}

inline void InitChunkInfo(CacheChunk& chunkInfo, uint64_t offset)
{
    chunkInfo.offset = offset;
    chunkInfo.dataLength = 0;
}

CacheMediaChunkBufferImpl::CacheMediaChunkBufferImpl()
    : totalBuffSize_(0), totalReadSize_(0), chunkMaxNum_(0), chunkSize_(0), bufferAddr_(nullptr),
      fragmentMaxNum_(CACHE_FRAGMENT_MAX_NUM_DEFAULT),
      lruCache_(CACHE_FRAGMENT_MAX_NUM_DEFAULT) {}

CacheMediaChunkBufferImpl::~CacheMediaChunkBufferImpl()
{
    std::lock_guard lock(mutex_);
    freeChunks_.clear();
    fragmentCacheBuffer_.clear();
    readPos_ = fragmentCacheBuffer_.end();
    writePos_ = fragmentCacheBuffer_.end();
    chunkMaxNum_ = 0;
    totalReadSize_ = 0;
    if (bufferAddr_ != nullptr) {
        free(bufferAddr_);
        bufferAddr_ = nullptr;
    }
}

bool CacheMediaChunkBufferImpl::Init(uint64_t totalBuffSize, uint32_t chunkSize)
{
    if (isLargeOffsetSpan_) {
        lruCache_.ReCacheSize(CACHE_FRAGMENT_MAX_NUM_LARGE);
    } else {
        lruCache_.ReCacheSize(CACHE_FRAGMENT_MAX_NUM_DEFAULT);
    }
    if (totalBuffSize == 0 || chunkSize == 0 || totalBuffSize < chunkSize) {
        return false;
    }
    double newFragmentInitChunkNum = NEW_FRAGMENT_INIT_CHUNK_NUM;
    uint64_t diff = (totalBuffSize + chunkSize) > 1 ? (totalBuffSize + chunkSize) - 1 : 0;
    int64_t chunkNum = static_cast<int64_t>(diff / chunkSize) + 1;
    if ((chunkNum - static_cast<int64_t>(newFragmentInitChunkNum)) < 0 ||
        chunkNum > MAX_CACHE_BUFFER_SIZE) {
        return false;
    }
    if (newFragmentInitChunkNum > static_cast<double>(chunkNum) * NEW_FRAGMENT_NIT_DEFAULT_DENOMINATOR) {
        newFragmentInitChunkNum = std::max(1.0, static_cast<double>(chunkNum) * NEW_FRAGMENT_NIT_DEFAULT_DENOMINATOR);
    }
    std::lock_guard lock(mutex_);
    if (bufferAddr_ != nullptr) {
        return false;
    }
    readPos_ = fragmentCacheBuffer_.end();
    writePos_ = fragmentCacheBuffer_.end();
    size_t sizePerChunk = sizeof(CacheChunk) + chunkSize;
    int64_t totalSize = static_cast<int64_t>(sizePerChunk) * chunkNum;
    if (totalSize <= 0 || totalSize > MAX_CACHE_BUFFER_SIZE * CHUNK_SIZE) {
        return false;
    }
    bufferAddr_ = static_cast<uint8_t*>(malloc(totalSize));
    if (bufferAddr_ == nullptr) {
        return false;
    }
    uint8_t* temp = bufferAddr_;
    for (auto i = 0; i < chunkNum; ++i) {
        auto chunkInfo = reinterpret_cast<CacheChunk*>(temp);
        chunkInfo->offset = 0;
        chunkInfo->dataLength = 0;
        chunkInfo->chunkSize = static_cast<uint32_t>(chunkSize);
        freeChunks_.push_back(chunkInfo);
        temp += sizePerChunk;
    }
    chunkMaxNum_ = chunkNum >= 1 ? static_cast<uint32_t>(chunkNum) - 1 : 0; // -1
    totalBuffSize_ = totalBuffSize;
    chunkSize_ = chunkSize;
    initReadSizeFactor_ = newFragmentInitChunkNum / (chunkMaxNum_ - newFragmentInitChunkNum);
    loopInterruptClock_.Reset();
    return true;
}
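
/*
 * Illustrative sizing walk-through (numbers are hypothetical and assume the 16 KB chunk size implied by the
 * "128 chunks = 2MB" comment above, and that MAX_CACHE_BUFFER_SIZE in the header permits the result):
 * with totalBuffSize = 40 MB and chunkSize = 16 KB,
 *     chunkNum                = (40 MB + 16 KB - 1) / 16 KB + 1 = 2561
 *     newFragmentInitChunkNum = 128 (unchanged, because 128 < 2561 * 0.25)
 *     chunkMaxNum_            = 2561 - 1 = 2560
 *     initReadSizeFactor_     = 128 / (2560 - 128) ≈ 0.0526
 * so a freshly created fragment is later credited with roughly 5% of the bytes read so far
 * (see AddFragmentCacheBuffer below).
 */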

// Update the fragment's chunk access position after a read
void CacheMediaChunkBufferImpl::UpdateAccessPos(FragmentIterator& fragmentPos, ChunkIterator& chunkPos,
    uint64_t offsetChunk)
{
    if (chunkPos != fragmentPos->chunks.begin() && chunkPos == fragmentPos->chunks.end()) {
        auto preChunkPos = std::prev(chunkPos);
        if (((*preChunkPos)->offset + (*preChunkPos)->chunkSize) == offsetChunk) {
            fragmentPos->accessPos = chunkPos;
        } else {
            fragmentPos->accessPos = preChunkPos;
        }
    } else if ((*chunkPos)->offset == offsetChunk) {
        fragmentPos->accessPos = chunkPos;
    } else {
        fragmentPos->accessPos = std::prev(chunkPos);
    }
}
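
/*
 * Example of the three branches above (hypothetical values, 1024-byte chunks at offsets 0 and 1024):
 *   - chunkPos reached chunks.end(): if offsetChunk == 2048 the read filled the last chunk to its full
 *     chunkSize, so accessPos stays at end(); otherwise accessPos falls back to that last chunk.
 *   - offsetChunk lands exactly on (*chunkPos)->offset: the read stopped on a chunk boundary, so that
 *     chunk becomes the access position.
 *   - otherwise the read stopped inside the previous chunk, so accessPos points one chunk back.
 */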

size_t CacheMediaChunkBufferImpl::Read(void* ptr, uint64_t offset, size_t readSize)
{
    std::lock_guard lock(mutex_);
    size_t hasReadSize = 0;
    uint8_t* dst = static_cast<uint8_t*>(ptr);
    uint64_t hasReadOffset = offset;
    size_t oneReadSize = ReadInner(dst, hasReadOffset, readSize);
    hasReadSize = oneReadSize;
    int64_t loopStartTime = loopInterruptClock_.ElapsedSeconds();
    while (hasReadSize < readSize && oneReadSize != 0) {
        if (CheckLoopTimeout(loopStartTime)) {
            break;
        }
        dst += oneReadSize;
        hasReadOffset += static_cast<uint64_t>(oneReadSize);
        oneReadSize = ReadInner(dst, hasReadOffset, readSize - hasReadSize);
        hasReadSize += oneReadSize;
    }
    return hasReadSize;
}

size_t CacheMediaChunkBufferImpl::ReadInner(void* ptr, uint64_t offset, size_t readSize)
{
    auto fragmentPos = GetOffsetFragmentCache(readPos_, offset, LeftBoundedRightOpenComp);
    if (readSize == 0 || fragmentPos == fragmentCacheBuffer_.end()) {
        return 0;
    }
    auto chunkPos = fragmentPos->accessPos;
    if (chunkPos == fragmentPos->chunks.end() ||
        offset < (*chunkPos)->offset ||
        offset > (*chunkPos)->offset + (*chunkPos)->dataLength) {
        chunkPos = GetOffsetChunkCache(fragmentPos->chunks, offset, LeftBoundedRightOpenComp);
    }

    uint8_t* dst = static_cast<uint8_t*>(ptr);
    uint64_t offsetChunk = offset;
    if (chunkPos != fragmentPos->chunks.end()) {
        uint64_t readOffset = offset > fragmentPos->offsetBegin ? offset - fragmentPos->offsetBegin : 0;
        uint64_t temp = readOffset > static_cast<uint64_t>(fragmentPos->accessLength) ?
            readOffset - static_cast<uint64_t>(fragmentPos->accessLength) : 0;
        if (temp >= ACCESS_OFFSET_MAX_LENGTH) {
            chunkPos = SplitFragmentCacheBuffer(fragmentPos, offset, chunkPos);
        }
        size_t hasReadSize = 0;
        int64_t loopStartTime = loopInterruptClock_.ElapsedSeconds();
        while (hasReadSize < readSize && chunkPos != fragmentPos->chunks.end()) {
            if (CheckLoopTimeout(loopStartTime)) {
                break;
            }
            auto chunkInfo = *chunkPos;
            uint64_t diff = offsetChunk > chunkInfo->offset ? offsetChunk - chunkInfo->offset : 0;
            if (offsetChunk < chunkInfo->offset || diff > chunkInfo->dataLength) {
                DumpAndCheckInner();
                return 0;
            }
            uint64_t readDiff = chunkInfo->dataLength > diff ? chunkInfo->dataLength - diff : 0;
            auto readOne = std::min(static_cast<size_t>(readDiff), readSize - hasReadSize);
            errno_t res = memcpy_s(dst + hasReadSize, readOne, (*chunkPos)->data + diff, readOne);
            FALSE_RETURN_V_MSG_E(res == EOK, 0, "memcpy_s data err");
            hasReadSize += readOne;
            offsetChunk += static_cast<uint64_t>(readOne);
            chunkPos++;
        }
        UpdateAccessPos(fragmentPos, chunkPos, offsetChunk);
        UpdateFragment(fragmentPos, hasReadSize, offsetChunk);
        return hasReadSize;
    }
    return 0;
}

void CacheMediaChunkBufferImpl::UpdateFragment(FragmentIterator& fragmentPos, size_t hasReadSize,
    uint64_t offsetChunk)
{
    uint64_t lengthDiff = offsetChunk > fragmentPos->offsetBegin ? offsetChunk - fragmentPos->offsetBegin : 0;
    fragmentPos->accessLength = static_cast<int64_t>(lengthDiff);
    fragmentPos->readTime = Clock::now();
    fragmentPos->totalReadSize += hasReadSize;
    totalReadSize_ += hasReadSize;
    readPos_ = fragmentPos;
    lruCache_.Refer(fragmentPos->offsetBegin, fragmentPos);
}

bool CacheMediaChunkBufferImpl::WriteInPlace(FragmentIterator& fragmentPos, uint8_t* ptr, uint64_t inOffset,
    size_t inWriteSize, size_t& outWriteSize)
{
    uint64_t offset = inOffset;
    size_t writeSize = inWriteSize;
    uint8_t* src = ptr;
    auto& chunkList = fragmentPos->chunks;
    outWriteSize = 0;
    ChunkIterator chunkPos = std::upper_bound(chunkList.begin(), chunkList.end(), offset,
        [](auto inputOffset, const CacheChunk* chunk) {
            return (inputOffset <= chunk->offset + chunk->dataLength);
        });
    if (chunkPos == chunkList.end()) {
        DumpInner(0);
        return false;
    }
    size_t writeSizeTmp = 0;
    auto chunkInfoTmp = *chunkPos;
    uint64_t accessLengthTmp = inOffset > writePos_->offsetBegin ? inOffset - writePos_->offsetBegin : 0;
    if (chunkInfoTmp->offset <= offset &&
        offset < chunkInfoTmp->offset + static_cast<uint64_t>(chunkInfoTmp->dataLength)) {
        size_t diff = static_cast<size_t>(offset > chunkInfoTmp->offset ? offset - chunkInfoTmp->offset : 0);
        size_t copyLen = static_cast<size_t>(chunkInfoTmp->dataLength - diff);
        copyLen = std::min(copyLen, writeSize);
        errno_t res = memcpy_s(chunkInfoTmp->data + diff, copyLen, src, copyLen);
        FALSE_RETURN_V_MSG_E(res == EOK, false, "memcpy_s data err");
        IncreaseStep(src, offset, writeSizeTmp, copyLen);
        if (writePos_->accessLength > static_cast<int64_t>(accessLengthTmp)) {
            writePos_->accessPos = chunkPos;
            writePos_->accessLength = static_cast<int64_t>(accessLengthTmp);
        }
    } else if (writePos_->accessLength > static_cast<int64_t>(accessLengthTmp)) {
        writePos_->accessPos = std::next(chunkPos);
        writePos_->accessLength = static_cast<int64_t>(accessLengthTmp);
    }
    ++chunkPos;
    int64_t loopStartTime = loopInterruptClock_.ElapsedSeconds();
    while (writeSizeTmp < writeSize && chunkPos != chunkList.end()) {
        if (CheckLoopTimeout(loopStartTime)) {
            break;
        }
        chunkInfoTmp = *chunkPos;
        auto copyLen = std::min(chunkInfoTmp->dataLength, (uint32_t)(writeSize - writeSizeTmp));
        errno_t res = memcpy_s(chunkInfoTmp->data, copyLen, src, copyLen);
        FALSE_RETURN_V_MSG_E(res == EOK, false, "memcpy_s data err");
        IncreaseStep(src, offset, writeSizeTmp, copyLen);
        ++chunkPos;
    }
    outWriteSize = writeSizeTmp;
    return true;
}

bool CacheMediaChunkBufferImpl::WriteMergerPre(uint64_t offset, size_t writeSize, FragmentIterator& nextFragmentPos)
{
    nextFragmentPos = std::next(writePos_);
    bool isLoop = true;
    while (isLoop) {
        if (nextFragmentPos == fragmentCacheBuffer_.end() ||
            offset + static_cast<uint64_t>(writeSize) < nextFragmentPos->offsetBegin) {
            nextFragmentPos = fragmentCacheBuffer_.end();
            isLoop = false;
            break;
        }
        if (offset + static_cast<uint64_t>(writeSize) <
            nextFragmentPos->offsetBegin + static_cast<uint64_t>(nextFragmentPos->dataLength)) {
            auto endPos = GetOffsetChunkCache(nextFragmentPos->chunks,
                offset + static_cast<uint64_t>(writeSize), LeftBoundedRightOpenComp);
            freeChunks_.splice(freeChunks_.end(), nextFragmentPos->chunks, nextFragmentPos->chunks.begin(), endPos);
            if (endPos == nextFragmentPos->chunks.end()) {
                nextFragmentPos = EraseFragmentCache(nextFragmentPos);
                DumpInner(0);
                return false;
            }
            auto &chunkInfo = *endPos;
            uint64_t newOffset = offset + static_cast<uint64_t>(writeSize);
            uint64_t dataLength = static_cast<uint64_t>(chunkInfo->dataLength);
            uint64_t moveLen = std::max(chunkInfo->offset + dataLength, newOffset) - newOffset;
            auto mergeDataLen = chunkInfo->dataLength > moveLen ? chunkInfo->dataLength - moveLen : 0;
            errno_t res = memmove_s(chunkInfo->data, moveLen, chunkInfo->data + mergeDataLen, moveLen);
            FALSE_RETURN_V_MSG_E(res == EOK, false, "memmove_s data err");
            chunkInfo->offset = newOffset;
            chunkInfo->dataLength = static_cast<uint32_t>(moveLen);
            uint64_t lostLength = std::max(newOffset, nextFragmentPos->offsetBegin) - nextFragmentPos->offsetBegin;
            nextFragmentPos->dataLength -= static_cast<int64_t>(lostLength);
            lruCache_.Update(nextFragmentPos->offsetBegin, newOffset, nextFragmentPos);
            nextFragmentPos->offsetBegin = newOffset;
            nextFragmentPos->accessLength = 0;
            nextFragmentPos->accessPos = nextFragmentPos->chunks.end();
            isLoop = false;
            break;
        } else {
            freeChunks_.splice(freeChunks_.end(), nextFragmentPos->chunks);
            writePos_->totalReadSize += nextFragmentPos->totalReadSize;
            nextFragmentPos->totalReadSize = 0; // avoid total size sub, chunk num reduce.
            nextFragmentPos = EraseFragmentCache(nextFragmentPos);
        }
    }
    return true;
}

void CacheMediaChunkBufferImpl::WriteMergerPost(FragmentIterator& nextFragmentPos)
{
    if (nextFragmentPos == fragmentCacheBuffer_.end() || writePos_->chunks.empty() ||
        nextFragmentPos->chunks.empty()) {
        return;
    }
    auto preChunkInfo = writePos_->chunks.back();
    auto nextChunkInfo = nextFragmentPos->chunks.front();
    if (preChunkInfo->offset + preChunkInfo->dataLength != nextChunkInfo->offset) {
        DumpAndCheckInner();
        return;
    }
    writePos_->dataLength += nextFragmentPos->dataLength;
    writePos_->totalReadSize += nextFragmentPos->totalReadSize;
    nextFragmentPos->totalReadSize = 0; // avoid total size sub, chunk num reduce
    writePos_->chunks.splice(writePos_->chunks.end(), nextFragmentPos->chunks);
    EraseFragmentCache(nextFragmentPos);
}

size_t CacheMediaChunkBufferImpl::Write(void* ptr, uint64_t inOffset, size_t inWriteSize)
{
    std::lock_guard lock(mutex_);
    uint64_t offset = inOffset;
    size_t writeSize = inWriteSize;
    uint8_t* src = static_cast<uint8_t*>(ptr);
    size_t dupWriteSize = 0;

    auto fragmentPos = GetOffsetFragmentCache(writePos_, offset, BoundedIntervalComp);
    ChunkIterator chunkPos;
    if (fragmentPos != fragmentCacheBuffer_.end()) {
        auto& chunkList = fragmentPos->chunks;
        writePos_ = fragmentPos;
        if ((fragmentPos->offsetBegin + static_cast<uint64_t>(fragmentPos->dataLength)) != offset) {
            auto ret = WriteInPlace(fragmentPos, src, offset, writeSize, dupWriteSize);
            if (!ret || dupWriteSize >= writeSize) {
                return ret ? writeSize : dupWriteSize;
            }
            src += dupWriteSize;
            offset += dupWriteSize;
            writeSize -= dupWriteSize;
        }
        chunkPos = std::prev(chunkList.end());
    } else {
        if (freeChunks_.empty()) {
            MEDIA_LOG_D("no free chunk.");
        }
        MEDIA_LOG_D("not find fragment.");
        chunkPos = AddFragmentCacheBuffer(offset);
    }
    FragmentIterator nextFragmentPos = fragmentCacheBuffer_.end();
    auto success = WriteMergerPre(offset, writeSize, nextFragmentPos);
    if (!success) {
        return dupWriteSize;
    }
    auto writeSizeTmp = WriteChunk(*writePos_, chunkPos, src, offset, writeSize);
    if (writeSize != writeSizeTmp) {
        nextFragmentPos = fragmentCacheBuffer_.end();
    }
    WriteMergerPost(nextFragmentPos);
    return writeSizeTmp + dupWriteSize;
}
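
/*
 * Illustrative write scenario (hypothetical offsets): the cache already holds a fragment covering
 * [0, 8192) and Write(ptr, 4096, 8192) is called.
 *   1. GetOffsetFragmentCache finds the fragment; because 4096 is not the fragment tail, WriteInPlace
 *      overwrites the cached bytes in [4096, 8192), so dupWriteSize becomes 4096.
 *   2. The remaining 4096 bytes are appended through WriteChunk starting at offset 8192.
 *   3. WriteMergerPre trims or erases any following fragment that the new data overlaps, and
 *      WriteMergerPost splices a now-adjacent fragment back into writePos_ so offsets stay contiguous.
 * The return value is writeSizeTmp + dupWriteSize, i.e. bytes appended plus bytes rewritten in place.
 */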

size_t CacheMediaChunkBufferHlsImpl::Write(void* ptr, uint64_t inOffset, size_t inWriteSize)
{
    std::lock_guard lock(mutex_);
    uint64_t offset = inOffset;
    size_t writeSize = inWriteSize;
    uint8_t* src = static_cast<uint8_t*>(ptr);
    size_t dupWriteSize = 0;

    auto fragmentPos = GetOffsetFragmentCache(writePos_, offset, BoundedIntervalComp);
    ChunkIterator chunkPos;
    if (fragmentPos != fragmentCacheBuffer_.end()) {
        auto& chunkList = fragmentPos->chunks;
        writePos_ = fragmentPos;
        if ((fragmentPos->offsetBegin + static_cast<uint64_t>(fragmentPos->dataLength)) != offset) {
            auto ret = WriteInPlace(fragmentPos, src, offset, writeSize, dupWriteSize);
            if (!ret || dupWriteSize >= writeSize) {
                return ret ? writeSize : dupWriteSize;
            }
            src += dupWriteSize;
            offset += dupWriteSize;
            writeSize -= dupWriteSize;
        }
        chunkPos = std::prev(chunkList.end());
    } else {
        if (freeChunks_.empty()) {
            return 0; // Only the HLS implementation is allowed to return 0 here
        }
        MEDIA_LOG_D("not find fragment.");
        chunkPos = AddFragmentCacheBuffer(offset);
    }
    FragmentIterator nextFragmentPos = fragmentCacheBuffer_.end();
    auto success = WriteMergerPre(offset, writeSize, nextFragmentPos);
    if (!success) {
        return dupWriteSize;
    }
    auto writeSizeTmp = WriteChunk(*writePos_, chunkPos, src, offset, writeSize);
    if (writeSize != writeSizeTmp) {
        nextFragmentPos = fragmentCacheBuffer_.end();
    }
    WriteMergerPost(nextFragmentPos);
    return writeSizeTmp + dupWriteSize;
}

bool CacheMediaChunkBufferImpl::Seek(uint64_t offset)
{
    std::lock_guard lock(mutex_);
    auto readPos = GetOffsetFragmentCache(readPos_, offset, BoundedIntervalComp);
    if (readPos != fragmentCacheBuffer_.end()) {
        readPos_ = readPos;
        bool isSeekHit = false;
        auto chunkPos = GetOffsetChunkCache(readPos->chunks, offset, LeftBoundedRightOpenComp);
        if (chunkPos != readPos->chunks.end()) {
            auto readOffset = offset > readPos->offsetBegin ? offset - readPos->offsetBegin : 0;
            uint64_t diff = readOffset > static_cast<uint64_t>(readPos->accessLength) ?
                readOffset - static_cast<uint64_t>(readPos->accessLength) : 0;
            if (diff >= ACCESS_OFFSET_MAX_LENGTH) {
                chunkPos = SplitFragmentCacheBuffer(readPos, offset, chunkPos);
            }

            if (chunkPos == readPos->chunks.end()) {
                return false;
            }
            lruCache_.Refer(readPos->offsetBegin, readPos);
            (*readPos).accessPos = chunkPos;
            auto tmpLength = offset > (*readPos).offsetBegin ? offset - (*readPos).offsetBegin : 0;
            (*readPos).accessLength = static_cast<int64_t>(tmpLength);
            readPos->readTime = Clock::now();
            isSeekHit = true;
        }
        ResetReadSizeAlloc();
        uint64_t newReadSizeInit = static_cast<uint64_t>(1 + initReadSizeFactor_ * static_cast<double>(totalReadSize_));
        readPos->totalReadSize += newReadSizeInit;
        totalReadSize_ += newReadSizeInit;
        return isSeekHit;
    }
    return false;
}

size_t CacheMediaChunkBufferImpl::GetBufferSize(uint64_t offset)
{
    std::lock_guard lock(mutex_);
    auto readPos = GetOffsetFragmentCache(readPos_, offset, LeftBoundedRightOpenComp);
    size_t bufferSize = 0;
    while (readPos != fragmentCacheBuffer_.end()) {
        uint64_t nextOffsetBegin = readPos->offsetBegin + static_cast<uint64_t>(readPos->dataLength);
        bufferSize = static_cast<size_t>(nextOffsetBegin > offset ? nextOffsetBegin - offset : 0);
        readPos++;
        if (readPos == fragmentCacheBuffer_.end() || nextOffsetBegin != readPos->offsetBegin) {
            break;
        }
    }
    return bufferSize;
}

void CacheMediaChunkBufferImpl::HandleFragmentPos(FragmentIterator& fragmentIter)
{
    uint64_t nextOffsetBegin = fragmentIter->offsetBegin + static_cast<uint64_t>(fragmentIter->dataLength);
    ++fragmentIter;
    while (fragmentIter != fragmentCacheBuffer_.end()) {
        if (nextOffsetBegin != fragmentIter->offsetBegin) {
            break;
        }
        nextOffsetBegin = fragmentIter->offsetBegin + static_cast<uint64_t>(fragmentIter->dataLength);
        ++fragmentIter;
    }
}

uint64_t CacheMediaChunkBufferImpl::GetNextBufferOffset(uint64_t offset)
{
    std::lock_guard lock(mutex_);
    auto fragmentIter = std::upper_bound(fragmentCacheBuffer_.begin(), fragmentCacheBuffer_.end(), offset,
        [](auto inputOffset, const FragmentCacheBuffer& fragment) {
            return (inputOffset < fragment.offsetBegin + fragment.dataLength);
        });
    if (fragmentIter != fragmentCacheBuffer_.end()) {
        if (LeftBoundedRightOpenComp(offset, fragmentIter->offsetBegin,
            fragmentIter->offsetBegin + fragmentIter->dataLength)) {
            HandleFragmentPos(fragmentIter);
        }
    }
    if (fragmentIter != fragmentCacheBuffer_.end()) {
        return fragmentIter->offsetBegin;
    }
    return 0;
}

FragmentIterator CacheMediaChunkBufferImpl::EraseFragmentCache(const FragmentIterator& iter)
{
    if (iter == readPos_) {
        readPos_ = fragmentCacheBuffer_.end();
    }
    if (iter == writePos_) {
        writePos_ = fragmentCacheBuffer_.end();
    }
    totalReadSize_ -= iter->totalReadSize;
    lruCache_.Delete(iter->offsetBegin);
    return fragmentCacheBuffer_.erase(iter);
}

inline size_t WriteOneChunkData(CacheChunk& chunkInfo, uint8_t* src, uint64_t offset, size_t writeSize)
{
    uint64_t copyBegin = offset > chunkInfo.offset ? offset - chunkInfo.offset : 0;
    if (copyBegin > chunkInfo.chunkSize) {
        return 0;
    }
    size_t writePerOne = static_cast<size_t>(chunkInfo.chunkSize - static_cast<size_t>(copyBegin));
    writePerOne = std::min(writePerOne, writeSize);
    errno_t res = memcpy_s(chunkInfo.data + copyBegin, writePerOne, src, writePerOne);
    FALSE_RETURN_V_MSG_E(res == EOK, 0, "memcpy_s data err");
    chunkInfo.dataLength = static_cast<uint32_t>(static_cast<size_t>(copyBegin) + writePerOne);
    return writePerOne;
}

inline CacheChunk* PopFreeCacheChunk(CacheChunkList& freeChunks, uint64_t offset)
{
    if (freeChunks.empty()) {
        return nullptr;
    }
    auto tmp = freeChunks.front();
    freeChunks.pop_front();
    InitChunkInfo(*tmp, offset);
    return tmp;
}

size_t CacheMediaChunkBufferImpl::WriteChunk(FragmentCacheBuffer& fragmentCacheBuffer, ChunkIterator& chunkPos,
    void* ptr, uint64_t offset, size_t writeSize)
{
    if (chunkPos == fragmentCacheBuffer.chunks.end()) {
        MEDIA_LOG_D("input invalid.");
        return 0;
    }
    size_t writedTmp = 0;
    auto chunkInfo = *chunkPos;
    uint8_t* src = static_cast<uint8_t*>(ptr);
    if (chunkInfo->chunkSize > chunkInfo->dataLength) {
        writedTmp += WriteOneChunkData(*chunkInfo, src, offset, writeSize);
        fragmentCacheBuffer.dataLength += static_cast<int64_t>(writedTmp);
    }
    int64_t loopStartTime = loopInterruptClock_.ElapsedSeconds();
    while (writedTmp < writeSize && writedTmp >= 0) {
        if (CheckLoopTimeout(loopStartTime)) {
            break;
        }
        auto chunkOffset = offset + static_cast<uint64_t>(writedTmp);
        auto freeChunk = GetFreeCacheChunk(chunkOffset);
        if (freeChunk == nullptr) {
            return writedTmp;
        }
        auto writePerOne = WriteOneChunkData(*freeChunk, src + writedTmp, chunkOffset, writeSize - writedTmp);
        fragmentCacheBuffer.chunks.push_back(freeChunk);
        writedTmp += writePerOne;
        fragmentCacheBuffer.dataLength += static_cast<int64_t>(writePerOne);

        if (fragmentCacheBuffer.accessPos == fragmentCacheBuffer.chunks.end()) {
            fragmentCacheBuffer.accessPos = std::prev(fragmentCacheBuffer.chunks.end());
        }
    }
    return writedTmp;
}

CacheChunk* CacheMediaChunkBufferImpl::UpdateFragmentCacheForDelHead(FragmentIterator& fragmentIter)
{
    FragmentCacheBuffer& fragment = *fragmentIter;
    if (fragment.chunks.empty()) {
        return nullptr;
    }
    auto cacheChunk = fragment.chunks.front();
    fragment.chunks.pop_front();

    auto oldOffsetBegin = fragment.offsetBegin;
    int64_t dataLength = static_cast<int64_t>(cacheChunk->dataLength);
    fragment.offsetBegin += static_cast<uint64_t>(dataLength);
    fragment.dataLength -= dataLength;
    if (fragment.accessLength > dataLength) {
        fragment.accessLength -= dataLength;
    } else {
        fragment.accessLength = 0;
    }
    lruCache_.Update(oldOffsetBegin, fragmentIter->offsetBegin, fragmentIter);
    return cacheChunk;
}

CacheChunk* UpdateFragmentCacheForDelTail(FragmentCacheBuffer& fragment)
{
    if (fragment.chunks.empty()) {
        return nullptr;
    }
    if (fragment.accessPos == std::prev(fragment.chunks.end())) {
        fragment.accessPos = fragment.chunks.end();
    }

    auto cacheChunk = fragment.chunks.back();
    fragment.chunks.pop_back();

    auto dataLength = cacheChunk->dataLength;
    if (fragment.accessLength > fragment.dataLength - static_cast<int64_t>(dataLength)) {
        fragment.accessLength = fragment.dataLength - static_cast<int64_t>(dataLength);
    }
    fragment.dataLength -= static_cast<int64_t>(dataLength);
    return cacheChunk;
}

bool CacheMediaChunkBufferImpl::CheckThresholdFragmentCacheBuffer(FragmentIterator& currWritePos)
{
    int64_t offset = -1;
    FragmentIterator fragmentIterator = fragmentCacheBuffer_.end();
    auto ret = lruCache_.GetLruNode(offset, fragmentIterator);
    if (!ret) {
        return false;
    }
    if (fragmentIterator == fragmentCacheBuffer_.end()) {
        return false;
    }
    if (currWritePos == fragmentIterator) {
        lruCache_.Refer(offset, currWritePos);
        ret = lruCache_.GetLruNode(offset, fragmentIterator);
        if (!ret) {
            return false;
        }
        if (fragmentIterator == fragmentCacheBuffer_.end()) {
            return false;
        }
    }
    if (fragmentIterator != fragmentCacheBuffer_.end()) {
        freeChunks_.splice(freeChunks_.end(), fragmentIterator->chunks);
        EraseFragmentCache(fragmentIterator);
    }
    return true;
}

/***
 * Overall strategy:
 * Compute the maximum allowed number of fragments; once it exceeds FRAGMENT_MAX_NUM(4), evict the least
 * recently read fragment (excluding the fragment currently being written).
 * A newly allocated fragment is given a fixed number of chunks, derived from a formula, so that its download can proceed.
 * Maximum chunks allowed per fragment: (bytes read from this fragment (fragmentReadSize) / total bytes read (totalReadSize)) * total chunk count.
 * Compute the maximum allowed chunk count of the current fragment:
 *   If it is exceeded, free the chunks that have already been read; if there are no read chunks and the limit
 *   is still exceeded, refuse further writes and return failure (this fragment cannot hold any more data).
 *   If it is not exceeded, take a chunk from the free list; if none is available, then
 *     loop over the other fragments, computing each fragment's maximum allowed chunk count:
 *       if exceeded, free that fragment's already-read chunks.
 *     If that is still not enough, then
 *     loop over the other fragments, computing each fragment's maximum allowed chunk count:
 *       if exceeded, free that fragment's trailing unread chunks.
 * If there is still no free chunk, return failure.
 *
 * Note: consider whether to try the free list first and only continue when it is empty.
 */
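
/*
 * Worked example for the per-fragment chunk budget described above (illustrative numbers; the actual
 * calculation is done by CalcAllowMaxChunkNum, declared in the header): with 1000 chunks in total, a
 * fragment that accounts for 600 of the 2000 bytes read so far is allowed roughly
 * (600 / 2000) * 1000 = 300 chunks. A fragment that grows past its budget first gives back chunks that
 * have already been read; only if that is not enough do the other fragments shed read chunks, then
 * trailing unread chunks, before a write is finally refused.
 */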
void CacheMediaChunkBufferImpl::DeleteHasReadFragmentCacheBuffer(FragmentIterator& fragmentIter, size_t allowChunkNum)
{
    auto& fragmentCacheChunks = *fragmentIter;
    while (fragmentCacheChunks.chunks.size() >= allowChunkNum &&
        fragmentCacheChunks.accessLength > static_cast<int64_t>(static_cast<double>(fragmentCacheChunks.dataLength) *
        CACHE_RELEASE_FACTOR_DEFAULT / TO_PERCENT)) {
        if (fragmentCacheChunks.accessPos != fragmentCacheChunks.chunks.begin()) {
            auto tmp = UpdateFragmentCacheForDelHead(fragmentIter);
            if (tmp != nullptr) {
                freeChunks_.push_back(tmp);
            }
        } else {
            MEDIA_LOG_D("judge has read finish.");
            break;
        }
    }
}

void CacheMediaChunkBufferImpl::DeleteUnreadFragmentCacheBuffer(FragmentIterator& fragmentIter, size_t allowChunkNum)
{
    auto& fragmentCacheChunks = *fragmentIter;
    while (fragmentCacheChunks.chunks.size() > allowChunkNum) {
        if (!fragmentCacheChunks.chunks.empty()) {
            auto tmp = UpdateFragmentCacheForDelTail(fragmentCacheChunks);
            if (tmp != nullptr) {
                freeChunks_.push_back(tmp);
            }
        } else {
            break;
        }
    }
}

CacheChunk* CacheMediaChunkBufferImpl::GetFreeCacheChunk(uint64_t offset, bool checkAllowFailContinue)
{
    if (writePos_ == fragmentCacheBuffer_.end()) {
        return nullptr;
    }
    if (!freeChunks_.empty()) {
        return PopFreeCacheChunk(freeChunks_, offset);
    }
    auto currWritePos = GetOffsetFragmentCache(writePos_, offset, BoundedIntervalComp);
    size_t allowChunkNum = 0;
    if (currWritePos != fragmentCacheBuffer_.end()) {
        allowChunkNum = CalcAllowMaxChunkNum(currWritePos->totalReadSize, currWritePos->offsetBegin);
        DeleteHasReadFragmentCacheBuffer(currWritePos, allowChunkNum);
        if (currWritePos->chunks.size() >= allowChunkNum && !checkAllowFailContinue) {
            return nullptr;
        }
    }
    if (!freeChunks_.empty()) {
        return PopFreeCacheChunk(freeChunks_, offset);
    }
    for (auto iter = fragmentCacheBuffer_.begin(); iter != fragmentCacheBuffer_.end(); ++iter) {
        if (iter != currWritePos) {
            allowChunkNum = CalcAllowMaxChunkNum(iter->totalReadSize, iter->offsetBegin);
            DeleteHasReadFragmentCacheBuffer(iter, allowChunkNum);
        }
    }
    if (!freeChunks_.empty()) {
        return PopFreeCacheChunk(freeChunks_, offset);
    }
    while (fragmentCacheBuffer_.size() > CACHE_FRAGMENT_MIN_NUM_DEFAULT) {
        auto result = CheckThresholdFragmentCacheBuffer(currWritePos);
        if (!freeChunks_.empty()) {
            return PopFreeCacheChunk(freeChunks_, offset);
        }
        if (!result) {
            break;
        }
    }
    for (auto iter = fragmentCacheBuffer_.begin(); iter != fragmentCacheBuffer_.end(); ++iter) {
        if (iter != currWritePos) {
            allowChunkNum = CalcAllowMaxChunkNum(iter->totalReadSize, iter->offsetBegin);
            DeleteUnreadFragmentCacheBuffer(iter, allowChunkNum);
        }
    }
    if (!freeChunks_.empty()) {
        return PopFreeCacheChunk(freeChunks_, offset);
    }
    return nullptr;
}
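
/*
 * Reclamation order used above, from cheapest to most destructive (a summary of the control flow, not
 * additional behaviour): free list -> already-read chunks of the fragment being written (subject to its
 * chunk budget) -> already-read chunks of the other fragments -> whole least-recently-read fragments,
 * down to CACHE_FRAGMENT_MIN_NUM_DEFAULT -> trailing unread chunks of the other fragments. nullptr is
 * returned only after every step has failed to free a chunk.
 */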

CacheChunk* CacheMediaChunkBufferHlsImpl::GetFreeCacheChunk(uint64_t offset, bool checkAllowFailContinue)
{
    if (writePos_ == fragmentCacheBuffer_.end()) {
        return nullptr;
    }
    if (!freeChunks_.empty()) {
        return PopFreeCacheChunk(freeChunks_, offset);
    }
    auto currWritePos = GetOffsetFragmentCache(writePos_, offset, BoundedIntervalComp);
    size_t allowChunkNum = 0;
    if (currWritePos != fragmentCacheBuffer_.end()) {
        allowChunkNum = CalcAllowMaxChunkNum(currWritePos->totalReadSize, currWritePos->offsetBegin);
        DeleteHasReadFragmentCacheBuffer(currWritePos, allowChunkNum);
        if (currWritePos->chunks.size() >= allowChunkNum && !checkAllowFailContinue) {
            MEDIA_LOG_D("allowChunkNum limit.");
            return nullptr;
        }
    } else {
        MEDIA_LOG_D("curr write is new fragment.");
    }
    MEDIA_LOG_D("clear other fragment has read chunk.");
    for (auto iter = fragmentCacheBuffer_.begin(); iter != fragmentCacheBuffer_.end(); ++iter) {
        if (iter != currWritePos) {
            allowChunkNum = CalcAllowMaxChunkNum(iter->totalReadSize, iter->offsetBegin);
            DeleteHasReadFragmentCacheBuffer(iter, allowChunkNum);
        }
    }
    if (!freeChunks_.empty()) {
        return PopFreeCacheChunk(freeChunks_, offset);
    }
    return nullptr;
}

FragmentIterator CacheMediaChunkBufferImpl::GetFragmentIterator(FragmentIterator& currFragmentIter,
    uint64_t offset, ChunkIterator chunkPos, CacheChunk* splitHead, CacheChunk*& chunkInfo)
{
    auto newFragmentPos = fragmentCacheBuffer_.emplace(std::next(currFragmentIter), offset);
    if (splitHead == nullptr) {
        newFragmentPos->chunks.splice(newFragmentPos->chunks.end(), currFragmentIter->chunks, chunkPos,
            currFragmentIter->chunks.end());
    } else {
        splitHead->dataLength = 0;
        newFragmentPos->chunks.splice(newFragmentPos->chunks.end(), currFragmentIter->chunks, std::next(chunkPos),
            currFragmentIter->chunks.end());
        newFragmentPos->chunks.push_front(splitHead);
        splitHead->offset = offset;
        uint64_t diff = offset > chunkInfo->offset ? offset - chunkInfo->offset : 0;
        if (chunkInfo->dataLength >= diff) {
            splitHead->dataLength = chunkInfo->dataLength - static_cast<uint32_t>(diff);
            chunkInfo->dataLength = static_cast<uint32_t>(diff);
            memcpy_s(splitHead->data, splitHead->dataLength, chunkInfo->data + diff, splitHead->dataLength);
        }
    }
    newFragmentPos->offsetBegin = offset;
    uint64_t diff = offset > currFragmentIter->offsetBegin ? offset - currFragmentIter->offsetBegin : 0;
    newFragmentPos->dataLength = currFragmentIter->dataLength > static_cast<int64_t>(diff) ?
        currFragmentIter->dataLength - static_cast<int64_t>(diff) : 0;
    newFragmentPos->accessLength = 0;
    uint64_t newReadSizeInit = static_cast<uint64_t>(1 + initReadSizeFactor_ * static_cast<double>(totalReadSize_));
    newReadSizeInit = std::max(newReadSizeInit, currFragmentIter->totalReadSize);

    newFragmentPos->totalReadSize = newReadSizeInit;
    totalReadSize_ += newReadSizeInit;
    newFragmentPos->readTime = Clock::now();
    newFragmentPos->accessPos = newFragmentPos->chunks.begin();
    newFragmentPos->isSplit = currFragmentIter->isSplit;
    currFragmentIter->isSplit = true;
    currFragmentIter->dataLength = static_cast<int64_t>(offset > currFragmentIter->offsetBegin ?
        offset - currFragmentIter->offsetBegin : 0);
    return newFragmentPos;
}

ChunkIterator CacheMediaChunkBufferImpl::SplitFragmentCacheBuffer(FragmentIterator& currFragmentIter,
    uint64_t offset, ChunkIterator chunkPos)
{
    ResetReadSizeAlloc();
    auto& chunkInfo = *chunkPos;
    CacheChunk* splitHead = nullptr;
    if (offset != chunkInfo->offset) {
        splitHead = freeChunks_.empty() ? GetFreeCacheChunk(offset, true) : PopFreeCacheChunk(freeChunks_, offset);
        if (splitHead == nullptr) {
            return chunkPos;
        }
    }
    auto newFragmentPos = GetFragmentIterator(currFragmentIter, offset, chunkPos, splitHead, chunkInfo);
    currFragmentIter = newFragmentPos;
    if (fragmentCacheBuffer_.size() > CACHE_FRAGMENT_MAX_NUM_DEFAULT) {
        CheckThresholdFragmentCacheBuffer(currFragmentIter);
    }
    lruCache_.Refer(newFragmentPos->offsetBegin, newFragmentPos);
    return newFragmentPos->accessPos;
}

ChunkIterator CacheMediaChunkBufferHlsImpl::SplitFragmentCacheBuffer(FragmentIterator& currFragmentIter,
    uint64_t offset, ChunkIterator chunkPos)
{
    ResetReadSizeAlloc();
    auto& chunkInfo = *chunkPos;
    CacheChunk* splitHead = nullptr;
    if (offset != chunkInfo->offset) {
        splitHead = freeChunks_.empty() ? GetFreeCacheChunk(offset, true) : PopFreeCacheChunk(freeChunks_, offset);
        if (splitHead == nullptr) {
            return chunkPos;
        }
    }
    auto newFragmentPos = fragmentCacheBuffer_.emplace(std::next(currFragmentIter), offset);
    if (splitHead == nullptr) {
        newFragmentPos->chunks.splice(newFragmentPos->chunks.end(), currFragmentIter->chunks, chunkPos,
            currFragmentIter->chunks.end());
    } else {
        newFragmentPos->chunks.splice(newFragmentPos->chunks.end(), currFragmentIter->chunks, std::next(chunkPos),
            currFragmentIter->chunks.end());
        newFragmentPos->chunks.push_front(splitHead);
        splitHead->offset = offset;
        uint64_t diff = offset > chunkInfo->offset ? offset - chunkInfo->offset : 0;
        if (chunkInfo->dataLength >= diff) {
            splitHead->dataLength = chunkInfo->dataLength > static_cast<uint32_t>(diff) ?
                chunkInfo->dataLength - static_cast<uint32_t>(diff) : 0;
            chunkInfo->dataLength = static_cast<uint32_t>(diff);
            memcpy_s(splitHead->data, splitHead->dataLength, chunkInfo->data + diff, splitHead->dataLength);
        } else {
            splitHead->dataLength = 0; // Should not happen; ASan can detect it.
        }
    }
    newFragmentPos->offsetBegin = offset;
    uint64_t diff = offset > currFragmentIter->offsetBegin ? offset - currFragmentIter->offsetBegin : 0;
    newFragmentPos->dataLength = currFragmentIter->dataLength > static_cast<int64_t>(diff) ?
        currFragmentIter->dataLength - static_cast<int64_t>(diff) : 0;
    newFragmentPos->accessLength = 0;
    uint64_t newReadSizeInit = static_cast<uint64_t>(1 + initReadSizeFactor_ * static_cast<double>(totalReadSize_));
    if (currFragmentIter->totalReadSize > newReadSizeInit) {
        newReadSizeInit = currFragmentIter->totalReadSize;
    }
    newFragmentPos->totalReadSize = newReadSizeInit;
    totalReadSize_ += newReadSizeInit;
    newFragmentPos->readTime = Clock::now();
    newFragmentPos->accessPos = newFragmentPos->chunks.begin();
    currFragmentIter->dataLength = static_cast<int64_t>(offset > currFragmentIter->offsetBegin ?
        offset - currFragmentIter->offsetBegin : 0);
    currFragmentIter = newFragmentPos;
    lruCache_.Refer(newFragmentPos->offsetBegin, newFragmentPos);
    return newFragmentPos->accessPos;
}

ChunkIterator CacheMediaChunkBufferImpl::AddFragmentCacheBuffer(uint64_t offset)
{
    size_t fragmentThreshold = CACHE_FRAGMENT_MAX_NUM_DEFAULT;
    if (isLargeOffsetSpan_) {
        fragmentThreshold = CACHE_FRAGMENT_MAX_NUM_LARGE;
    }
    if (fragmentCacheBuffer_.size() >= fragmentThreshold) {
        auto fragmentIterTmp = fragmentCacheBuffer_.end();
        CheckThresholdFragmentCacheBuffer(fragmentIterTmp);
    }
    ResetReadSizeAlloc();
    auto fragmentInsertPos = std::upper_bound(fragmentCacheBuffer_.begin(), fragmentCacheBuffer_.end(), offset,
        [](auto mediaOffset, const FragmentCacheBuffer& fragment) {
            if (mediaOffset <= fragment.offsetBegin + fragment.dataLength) {
                return true;
            }
            return false;
        });
    auto newFragmentPos = fragmentCacheBuffer_.emplace(fragmentInsertPos, offset);
    uint64_t newReadSizeInit = static_cast<uint64_t>(1 + initReadSizeFactor_ * static_cast<double>(totalReadSize_));
    totalReadSize_ += newReadSizeInit;
    newFragmentPos->totalReadSize = newReadSizeInit;
    writePos_ = newFragmentPos;
    writePos_->accessPos = writePos_->chunks.end();
    lruCache_.Refer(newFragmentPos->offsetBegin, newFragmentPos);
    auto freeChunk = GetFreeCacheChunk(offset);
    if (freeChunk == nullptr) {
        MEDIA_LOG_D("get free cache chunk fail.");
        return writePos_->chunks.end();
    }
    writePos_->accessPos = newFragmentPos->chunks.emplace(newFragmentPos->chunks.end(), freeChunk);
    return writePos_->accessPos;
}

ChunkIterator CacheMediaChunkBufferHlsImpl::AddFragmentCacheBuffer(uint64_t offset)
{
    ResetReadSizeAlloc();
    auto fragmentInsertPos = std::upper_bound(fragmentCacheBuffer_.begin(), fragmentCacheBuffer_.end(), offset,
        [](auto mediaOffset, const FragmentCacheBuffer& fragment) {
            if (mediaOffset <= fragment.offsetBegin + fragment.dataLength) {
                return true;
            }
            return false;
        });
    auto newFragmentPos = fragmentCacheBuffer_.emplace(fragmentInsertPos, offset);
    uint64_t newReadSizeInit = static_cast<uint64_t>(1 + initReadSizeFactor_ * static_cast<double>(totalReadSize_));
    totalReadSize_ += newReadSizeInit;
    newFragmentPos->totalReadSize = newReadSizeInit;
    writePos_ = newFragmentPos;
    writePos_->accessPos = writePos_->chunks.end();
    lruCache_.Refer(newFragmentPos->offsetBegin, newFragmentPos);
    auto freeChunk = GetFreeCacheChunk(offset);
    if (freeChunk == nullptr) {
        MEDIA_LOG_D("get free cache chunk fail.");
        return writePos_->chunks.end();
    }
    writePos_->accessPos = newFragmentPos->chunks.emplace(newFragmentPos->chunks.end(), freeChunk);
    return writePos_->accessPos;
}

void CacheMediaChunkBufferImpl::ResetReadSizeAlloc()
{
    size_t chunkNum = chunkMaxNum_ + 1 >= freeChunks_.size() ?
        chunkMaxNum_ + 1 - freeChunks_.size() : 0;
    if (totalReadSize_ > static_cast<size_t>(UP_LIMIT_MAX_TOTAL_READ_SIZE) && chunkNum > 0) {
        size_t preChunkSize = static_cast<size_t>(MAX_TOTAL_READ_SIZE - 1) / chunkNum;
        for (auto iter = fragmentCacheBuffer_.begin(); iter != fragmentCacheBuffer_.end(); ++iter) {
            iter->totalReadSize = preChunkSize * iter->chunks.size();
        }
        totalReadSize_ = preChunkSize * chunkNum;
    }
}
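
/*
 * Rescaling example (hypothetical figures): once totalReadSize_ exceeds UP_LIMIT_MAX_TOTAL_READ_SIZE
 * (3,000,000) with 250 chunks in use, every in-use chunk is re-credited with
 * (2,000,000 - 1) / 250 = 7999 bytes of read weight, so a fragment holding 40 chunks ends up with
 * 319,960 and totalReadSize_ drops back to 1,999,750. After the reset the weights are proportional to
 * each fragment's chunk count rather than to its previous read volume, which is sufficient for the
 * budget computed by CalcAllowMaxChunkNum.
 */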

void CacheMediaChunkBufferImpl::Dump(uint64_t param)
{
    std::lock_guard lock(mutex_);
    DumpInner(param);
}

void CacheMediaChunkBufferImpl::DumpInner(uint64_t param)
{
    (void)param;
    MEDIA_LOG_D("cacheBuff total buffer size : " PUBLIC_LOG_U64, totalBuffSize_);
    MEDIA_LOG_D("cacheBuff total chunk size : " PUBLIC_LOG_U32, chunkSize_);
    MEDIA_LOG_D("cacheBuff total chunk num : " PUBLIC_LOG_U32, chunkMaxNum_);
    MEDIA_LOG_D("cacheBuff total read size : " PUBLIC_LOG_U64, totalReadSize_);
    MEDIA_LOG_D("cacheBuff read size factor : " PUBLIC_LOG_F, initReadSizeFactor_);
    MEDIA_LOG_D("cacheBuff free chunk num : " PUBLIC_LOG_ZU, freeChunks_.size());
    MEDIA_LOG_D("cacheBuff fragment num : " PUBLIC_LOG_ZU, fragmentCacheBuffer_.size());
    for (auto const & fragment : fragmentCacheBuffer_) {
        MEDIA_LOG_D("cacheBuff - fragment offset : " PUBLIC_LOG_U64, fragment.offsetBegin);
        MEDIA_LOG_D("cacheBuff fragment length : " PUBLIC_LOG_D64, fragment.dataLength);
        MEDIA_LOG_D("cacheBuff chunk num : " PUBLIC_LOG_ZU, fragment.chunks.size());
        MEDIA_LOG_D("cacheBuff access length : " PUBLIC_LOG_U64, fragment.accessLength);
        MEDIA_LOG_D("cacheBuff read size : " PUBLIC_LOG_U64, fragment.totalReadSize);
        if (fragment.accessPos != fragment.chunks.end()) {
            auto &chunkInfo = *fragment.accessPos;
            MEDIA_LOG_D("cacheBuff access offset: " PUBLIC_LOG_D64 ", len: " PUBLIC_LOG_U32,
                chunkInfo->offset, chunkInfo->dataLength);
        } else {
            MEDIA_LOG_D("cacheBuff access ended");
        }
        if (!fragment.chunks.empty()) {
            auto &chunkInfo = fragment.chunks.back();
            MEDIA_LOG_D("cacheBuff last chunk offset: " PUBLIC_LOG_D64 ", len: " PUBLIC_LOG_U32,
                chunkInfo->offset, chunkInfo->dataLength);
        }
        MEDIA_LOG_D("cacheBuff ");
    }
}

bool CacheMediaChunkBufferImpl::CheckLoopTimeout(int64_t loopStartTime)
{
    int64_t now = loopInterruptClock_.ElapsedSeconds();
    int64_t loopDuration = now > loopStartTime ? now - loopStartTime : 0;
    bool isLoopTimeout = loopDuration > LOOP_TIMEOUT;
    if (isLoopTimeout) {
        MEDIA_LOG_E("loop timeout.");
    }
    return isLoopTimeout;
}

bool CacheMediaChunkBufferImpl::Check()
{
    std::lock_guard lock(mutex_);
    return CheckInner();
}

void CacheMediaChunkBufferImpl::Clear()
{
    std::lock_guard lock(mutex_);
    auto iter = fragmentCacheBuffer_.begin();
    while (iter != fragmentCacheBuffer_.end()) {
        freeChunks_.splice(freeChunks_.end(), iter->chunks);
        iter = EraseFragmentCache(iter);
    }
    lruCache_.Reset();
    totalReadSize_ = 0;
}

uint64_t CacheMediaChunkBufferImpl::GetFreeSize()
{
    std::lock_guard lock(mutex_);
    uint64_t totalFreeSize = totalBuffSize_;
    for (auto iter = fragmentCacheBuffer_.begin(); iter != fragmentCacheBuffer_.end(); iter++) {
        uint64_t fragmentDataLen = static_cast<uint64_t>(iter->dataLength);
        totalFreeSize = totalFreeSize > fragmentDataLen ? totalFreeSize - fragmentDataLen : 0;
    }
    return totalFreeSize;
}

// Release all chunks before the offset within the fragment that contains the offset.
bool CacheMediaChunkBufferImpl::ClearChunksOfFragment(uint64_t offset)
{
    std::lock_guard lock(mutex_);
    bool res = false;
    auto fragmentPos = GetOffsetFragmentCache(readPos_, offset, LeftBoundedRightOpenComp);
    if (fragmentPos == fragmentCacheBuffer_.end()) {
        return false;
    }
    auto& fragment = *fragmentPos;
    if (fragment.chunks.empty()) {
        return false;
    }
    uint32_t chunkSize = fragment.chunks.size();
    for (uint32_t i = 0; i < chunkSize; ++i) {
        auto chunkIter = fragment.chunks.front();
        if (fragmentPos->accessPos == fragmentPos->chunks.end() || chunkIter == nullptr ||
            chunkIter->offset + chunkIter->dataLength >= offset) {
            break;
        }

        auto chunkPos = fragmentPos->accessPos;
        if ((*chunkPos) != nullptr && chunkIter->offset >= (*chunkPos)->offset) { // Update accessPos of fragment
            chunkPos = GetOffsetChunkCache(fragmentPos->chunks, chunkIter->offset + chunkIter->dataLength,
                LeftBoundedRightOpenComp);
            (*fragmentPos).accessPos = chunkPos;
        }

        MEDIA_LOG_D("ClearChunksOfFragment clear chunk, offsetBegin: " PUBLIC_LOG_U64 " offsetEnd " PUBLIC_LOG_U64,
            chunkIter->offset, chunkIter->offset + chunkIter->dataLength);
        auto tmp = UpdateFragmentCacheForDelHead(fragmentPos);
        if (tmp != nullptr) {
            res = true;
            freeChunks_.push_back(tmp);
        }
    }
    return res;
}

// Release all fragments that lie entirely before the offset.
bool CacheMediaChunkBufferImpl::ClearFragmentBeforeOffset(uint64_t offset)
{
    std::lock_guard lock(mutex_);
    bool res = false;
    for (auto iter = fragmentCacheBuffer_.begin(); iter != fragmentCacheBuffer_.end();) {
        if (iter->offsetBegin >= offset) {
            break;
        }
        if (iter->offsetBegin + static_cast<uint64_t>(iter->dataLength) <= offset) {
            MEDIA_LOG_D("ClearFragmentBeforeOffset clear fragment, offsetBegin: " PUBLIC_LOG_U64 " offsetEnd "
                PUBLIC_LOG_U64, iter->offsetBegin, iter->offsetBegin + iter->dataLength);
            freeChunks_.splice(freeChunks_.end(), iter->chunks);
            iter = EraseFragmentCache(iter);
            res = true;
            continue;
        }
        iter++;
    }
    return res;
}

// Release the already-read chunks of fragments whose data lies between minOffset and maxOffset.
bool CacheMediaChunkBufferImpl::ClearMiddleReadFragment(uint64_t minOffset, uint64_t maxOffset)
{
    std::lock_guard lock(mutex_);
    bool res = false;
    for (auto iter = fragmentCacheBuffer_.begin(); iter != fragmentCacheBuffer_.end(); iter++) {
        if (iter->offsetBegin + static_cast<uint64_t>(iter->dataLength) < minOffset) {
            continue;
        }
        if (iter->offsetBegin > maxOffset) {
            break;
        }
        if (iter->accessLength <= chunkSize_) {
            continue;
        }
        MEDIA_LOG_D("ClearMiddleReadFragment, minOffset: " PUBLIC_LOG_U64 " maxOffset: "
            PUBLIC_LOG_U64 " offsetBegin: " PUBLIC_LOG_U64 " dataLength: " PUBLIC_LOG_D64 " accessLength "
            PUBLIC_LOG_D64, minOffset, maxOffset, iter->offsetBegin, iter->dataLength, iter->accessLength);
        auto& fragment = *iter;
        uint32_t chunksSize = fragment.chunks.size();
        for (uint32_t i = 0; i < chunksSize; ++i) {
            auto chunkIter = fragment.chunks.front();
            if (chunkIter->dataLength >= iter->accessLength ||
                (chunkIter->offset + chunkIter->dataLength >= maxOffset &&
                chunkIter->offset <= minOffset)) {
                break;
            }
            auto tmp = UpdateFragmentCacheForDelHead(iter);
            if (tmp != nullptr) {
                res = true; // record that at least one chunk was released
                freeChunks_.push_back(tmp);
            }
        }
    }
    return res;
}

bool CacheMediaChunkBufferImpl::IsReadSplit(uint64_t offset)
{
    std::lock_guard lock(mutex_);
    auto readPos = GetOffsetFragmentCache(readPos_, offset, LeftBoundedRightOpenComp);
    if (readPos != fragmentCacheBuffer_.end()) {
        return readPos->isSplit;
    }
    return false;
}

void CacheMediaChunkBufferImpl::SetIsLargeOffsetSpan(bool isLargeOffsetSpan)
{
    isLargeOffsetSpan_ = isLargeOffsetSpan;
}

bool CacheMediaChunkBufferImpl::DumpAndCheckInner()
{
    DumpInner(0);
    return CheckInner();
}

void CacheMediaChunkBufferImpl::CheckFragment(const FragmentCacheBuffer& fragment, bool& checkSuccess)
{
    if (fragment.accessPos != fragment.chunks.end()) {
        auto& accessChunk = *fragment.accessPos;
        auto accessLength = accessChunk->offset > fragment.offsetBegin ?
            accessChunk->offset - fragment.offsetBegin : 0;
        if (fragment.accessLength < accessLength ||
            fragment.accessLength >
            (static_cast<int64_t>(accessLength) + static_cast<int64_t>(accessChunk->dataLength))) {
            checkSuccess = false;
        }
    }
}

bool CacheMediaChunkBufferImpl::CheckInner()
{
    uint64_t chunkNum = 0;
    uint64_t totalReadSize = 0;
    bool checkSuccess = true;
    chunkNum = freeChunks_.size();
    for (auto const& fragment : fragmentCacheBuffer_) {
        int64_t dataLength = 0;
        chunkNum += fragment.chunks.size();
        totalReadSize += fragment.totalReadSize;

        auto prev = fragment.chunks.begin();
        auto next = fragment.chunks.end();
        if (!fragment.chunks.empty()) {
            dataLength += static_cast<int64_t>((*prev)->dataLength);
            next = std::next(prev);
            if ((*prev)->offset != fragment.offsetBegin) {
                checkSuccess = false;
            }
        }
        while (next != fragment.chunks.end()) {
            auto &chunkPrev = *prev;
            auto &chunkNext = *next;
            dataLength += static_cast<int64_t>(chunkNext->dataLength);
            if (chunkPrev->offset + chunkPrev->dataLength != chunkNext->offset) {
                checkSuccess = false;
            }
            ++next;
            ++prev;
        }
        if (dataLength != fragment.dataLength) {
            checkSuccess = false;
        }
        CheckFragment(fragment, checkSuccess);
    }
    if (chunkNum != chunkMaxNum_ + 1) {
        checkSuccess = false;
    }

    if (totalReadSize != totalReadSize_) {
        checkSuccess = false;
    }
    return checkSuccess;
}

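/*
 * Minimal usage sketch for the public wrapper below (illustrative only; sizes are hypothetical and
 * error handling plus a real data source are omitted):
 *
 *     CacheMediaChunkBuffer cache;
 *     if (cache.Init(40 * 1024 * 1024, 16 * 1024)) {       // 40 MB cache split into 16 KB chunks
 *         uint8_t payload[4096] = {0};
 *         cache.Write(payload, 0, sizeof(payload));        // cache downloaded media data at offset 0
 *         uint8_t out[4096];
 *         size_t got = cache.Read(out, 0, sizeof(out));    // serve a later read from the cache
 *         bool hit = cache.Seek(100 * 1024);               // false when that offset is not cached
 *     }
 */
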
CacheMediaChunkBuffer::CacheMediaChunkBuffer()
{
    MEDIA_LOG_D("enter");
    impl_ = std::make_unique<CacheMediaChunkBufferImpl>();
}

CacheMediaChunkBuffer::~CacheMediaChunkBuffer()
{
    MEDIA_LOG_D("exit");
}

bool CacheMediaChunkBuffer::Init(uint64_t totalBuffSize, uint32_t chunkSize)
{
    return impl_->Init(totalBuffSize, chunkSize);
}

size_t CacheMediaChunkBuffer::Read(void* ptr, uint64_t offset, size_t readSize)
{
    return impl_->Read(ptr, offset, readSize);
}

size_t CacheMediaChunkBuffer::Write(void* ptr, uint64_t offset, size_t writeSize)
{
    return impl_->Write(ptr, offset, writeSize);
}

bool CacheMediaChunkBuffer::Seek(uint64_t offset)
{
    return impl_->Seek(offset);
}

size_t CacheMediaChunkBuffer::GetBufferSize(uint64_t offset)
{
    return impl_->GetBufferSize(offset);
}

uint64_t CacheMediaChunkBuffer::GetNextBufferOffset(uint64_t offset)
{
    return impl_->GetNextBufferOffset(offset);
}

void CacheMediaChunkBuffer::Clear()
{
    return impl_->Clear();
}

uint64_t CacheMediaChunkBuffer::GetFreeSize()
{
    return impl_->GetFreeSize();
}

bool CacheMediaChunkBuffer::ClearFragmentBeforeOffset(uint64_t offset)
{
    return impl_->ClearFragmentBeforeOffset(offset);
}

bool CacheMediaChunkBuffer::ClearChunksOfFragment(uint64_t offset)
{
    return impl_->ClearChunksOfFragment(offset);
}

bool CacheMediaChunkBuffer::ClearMiddleReadFragment(uint64_t minOffset, uint64_t maxOffset)
{
    return impl_->ClearMiddleReadFragment(minOffset, maxOffset);
}

bool CacheMediaChunkBuffer::IsReadSplit(uint64_t offset)
{
    return impl_->IsReadSplit(offset);
}

void CacheMediaChunkBuffer::SetIsLargeOffsetSpan(bool isLargeOffsetSpan)
{
    return impl_->SetIsLargeOffsetSpan(isLargeOffsetSpan);
}

void CacheMediaChunkBuffer::SetReadBlocking(bool isReadBlockingAllowed)
{
    (void)isReadBlockingAllowed;
}

void CacheMediaChunkBuffer::Dump(uint64_t param)
{
    return impl_->Dump(param);
}

bool CacheMediaChunkBuffer::Check()
{
    return impl_->Check();
}
} // namespace Media
} // namespace OHOS