• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021-2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #define HST_LOG_TAG "DataPacker"
16 
17 #include "data_packer.h"
18 #include <cstring>
19 #include "foundation/log.h"
20 #include "utils/dump_buffer.h"
21 
22 namespace OHOS {
23 namespace Media {
24 #define EXEC_WHEN_GET(isGet, exec)     \
25     do {                               \
26         if (isGet) {                   \
27             exec;                      \
28         }                              \
29     } while (0)
30 
31 static const DataPacker::Position INVALID_POSITION = DataPacker::Position(-1, 0, 0);
32 static constexpr size_t MAX_BUFFER_NUMBER_IN_DATA_PACKER = 30;
33 
DataPacker()34 DataPacker::DataPacker() : mutex_(), que_(), size_(0), mediaOffset_(0), pts_(0), dts_(0),
35     prevGet_(INVALID_POSITION), currentGet_(INVALID_POSITION), capacity_(MAX_BUFFER_NUMBER_IN_DATA_PACKER)
36 {
37     MEDIA_LOG_I("DataPacker ctor...");
38 }
39 
~DataPacker()40 DataPacker::~DataPacker()
41 {
42     MEDIA_LOG_I("DataPacker dtor...");
43     cvEmpty_.NotifyAll();
44     cvFull_.NotifyAll();
45 }
46 
AudioBufferSize(AVBufferPtr & ptr)47 inline static size_t AudioBufferSize(AVBufferPtr& ptr)
48 {
49     return ptr->GetMemory()->GetSize();
50 }
51 
AudioBufferWritableData(AVBufferPtr & ptr,size_t size,size_t position=0)52 inline static uint8_t* AudioBufferWritableData(AVBufferPtr& ptr, size_t size, size_t position = 0)
53 {
54     return ptr->GetMemory()->GetWritableAddr(size, position);
55 }
56 
AudioBufferReadOnlyData(AVBufferPtr & ptr)57 inline static const uint8_t* AudioBufferReadOnlyData(AVBufferPtr& ptr)
58 {
59     return ptr->GetMemory()->GetReadOnlyData();
60 }
61 
PushData(AVBufferPtr bufferPtr,uint64_t offset)62 void DataPacker::PushData(AVBufferPtr bufferPtr, uint64_t offset)
63 {
64     size_t bufferSize = AudioBufferSize(bufferPtr);
65     MEDIA_LOG_DD("DataPacker PushData begin... buffer (offset " PUBLIC_LOG_U64 ", size " PUBLIC_LOG_ZU ")",
66                  offset, bufferSize);
67     DUMP_BUFFER2LOG("DataPacker Push", bufferPtr, offset);
68     FALSE_RETURN_MSG(bufferSize > 0, "Can not push zero length buffer.");
69 
70     OSAL::ScopedLock lock(mutex_);
71     if (que_.size() >= capacity_) {
72         MEDIA_LOG_D("DataPacker is full, waiting for pop.");
73         do {
74             cvFull_.WaitFor(lock, 1000,  // 1000 ms
75                             [this] { return que_.size() < capacity_ || stopped_.load(); });
76             if (stopped_.load()) {
77                 MEDIA_LOG_D("DataPacker stopped, so return.");
78                 return;
79             }
80         } while (que_.size() >= capacity_);
81     }
82 
83     size_ += AudioBufferSize(bufferPtr);
84     if (que_.empty()) {
85         mediaOffset_ = offset;
86         dts_ = bufferPtr->dts;
87         pts_ = bufferPtr->pts;
88     }
89     que_.emplace_back(std::move(bufferPtr));
90     cvEmpty_.NotifyOne();
91     MEDIA_LOG_DD("DataPacker PushData end. " PUBLIC_LOG_S, ToString().c_str());
92 }
93 
94 // curOffset: the offset end of this dataPacker. if IsDataAvailable() false, we can get data from source from curOffset
IsDataAvailable(uint64_t offset,uint32_t size,uint64_t & curOffset)95 bool DataPacker::IsDataAvailable(uint64_t offset, uint32_t size, uint64_t &curOffset)
96 {
97     MEDIA_LOG_DD("dataPacker (offset " PUBLIC_LOG_U64 ", size " PUBLIC_LOG_U32 "), curOffsetEnd is " PUBLIC_LOG_U64,
98                  mediaOffset_, size_.load(), mediaOffset_ + size_.load());
99     MEDIA_LOG_DD(PUBLIC_LOG_S, ToString().c_str());
100     OSAL::ScopedLock lock(mutex_);
101     auto curOffsetTemp = mediaOffset_;
102     if (que_.empty() || offset < curOffsetTemp || offset > curOffsetTemp + size_) { // 原有数据无法命中, 则删除原有数据
103         curOffset = offset;
104         FlushInternal();
105         MEDIA_LOG_DD("IsDataAvailable false, offset not in cached data, clear it.");
106         return false;
107     }
108     size_t bufCnt = que_.size();
109     uint64_t offsetEnd = offset + size;
110     uint64_t curOffsetEnd = mediaOffset_ + AudioBufferSize(que_.front());
111     if (bufCnt == 1) {
112         curOffset = curOffsetEnd;
113         MEDIA_LOG_DD("IsDataAvailable bufCnt == 1, result " PUBLIC_LOG_D32, offsetEnd <= curOffsetEnd);
114         return offsetEnd <= curOffsetEnd;
115     }
116     auto preOffsetEnd = curOffsetEnd;
117     for (size_t i = 1; i < bufCnt; ++i) {
118         curOffsetEnd = preOffsetEnd + AudioBufferSize(que_[i]);
119         if (curOffsetEnd >= offsetEnd) {
120             MEDIA_LOG_DD("IsDataAvailable true, last buffer index " PUBLIC_LOG_ZU ", offsetEnd " PUBLIC_LOG_U64
121                          ", curOffsetEnd " PUBLIC_LOG_U64, i, offsetEnd, curOffsetEnd);
122             return true;
123         } else {
124             preOffsetEnd = curOffsetEnd;
125         }
126     }
127     if (preOffsetEnd >= offsetEnd) {
128         MEDIA_LOG_DD("IsDataAvailable true, use all buffers, last buffer index " PUBLIC_LOG_ZU ", offsetEnd "
129                      PUBLIC_LOG_U64 ", curOffsetEnd " PUBLIC_LOG_U64, bufCnt - 1, offsetEnd, curOffsetEnd);
130         return true;
131     }
132     curOffset = preOffsetEnd;
133     MEDIA_LOG_DD("IsDataAvailable false, offsetEnd " PUBLIC_LOG_U64 ", curOffsetEnd " PUBLIC_LOG_U64,
134                  offsetEnd, preOffsetEnd);
135     return false;
136 }
137 
PeekRange(uint64_t offset,uint32_t size,AVBufferPtr & bufferPtr)138 bool DataPacker::PeekRange(uint64_t offset, uint32_t size, AVBufferPtr& bufferPtr)
139 {
140     OSAL::ScopedLock lock(mutex_);
141     if (que_.empty()) {
142         MEDIA_LOG_D("DataPacker is empty, waiting for push.");
143         cvEmpty_.Wait(lock, [this] { return !que_.empty(); });
144     }
145 
146     return PeekRangeInternal(offset, size, bufferPtr, false);
147 }
148 
149 // Should call IsDataAvailable() before to make sure there is enough buffer to copy.
150 // offset : the offset (of the media file) to peek ( 要peek的数据起始位置 在media file文件 中的 offset )
151 // size : the size of data to peek
152 // bufferPtr : out buffer
153 // isGet : is it called from GetRange.
PeekRangeInternal(uint64_t offset,uint32_t size,AVBufferPtr & bufferPtr,bool isGet)154 bool DataPacker::PeekRangeInternal(uint64_t offset, uint32_t size, AVBufferPtr &bufferPtr, bool isGet)
155 {
156     MEDIA_LOG_DD("PeekRangeInternal (offset, size) = (" PUBLIC_LOG_U64 ", " PUBLIC_LOG_U32 ")...", offset, size);
157     int32_t startIndex = 0; // The index of buffer that we first use
158     size_t copySize = 0;
159     uint32_t needCopySize = size;
160     uint32_t firstBufferOffset = 0;
161     uint8_t* dstPtr = AudioBufferWritableData(bufferPtr, needCopySize);
162     FALSE_RETURN_V(dstPtr != nullptr, false);
163 
164     auto offsetEnd = offset + needCopySize;
165     auto curOffsetEnd = mediaOffset_ + AudioBufferSize(que_[startIndex]);
166     if (offsetEnd <= curOffsetEnd) { // first buffer is enough
167         auto bufferOffset = static_cast<int32_t>(offset - mediaOffset_);
168         FALSE_RETURN_V_MSG_E(bufferOffset >= 0, false, "Copy buffer start position error.");
169         firstBufferOffset = bufferOffset;
170         copySize = CopyFirstBuffer(size, startIndex, dstPtr, bufferPtr, bufferOffset);
171         needCopySize -= copySize;
172         FALSE_LOG_MSG(needCopySize == 0, "First buffer is enough, but copySize is not enough");
173         EXEC_WHEN_GET(isGet, currentGet_ = Position(startIndex, firstBufferOffset, offset));
174         return true;
175     } else { // first buffer not enough
176         // Find the first buffer that should copy
177         uint64_t prevOffset; // The media offset of the startIndex buffer start byte
178         FALSE_RETURN_V_MSG_E(FindFirstBufferToCopy(offset, startIndex, prevOffset), false,
179             "Read offset(" PUBLIC_LOG_D64 ") size(" PUBLIC_LOG_D32 ") from " PUBLIC_LOG_S,
180             offset, size, ToString().c_str());
181         auto bufferOffset = static_cast<int32_t>(offset - prevOffset);
182         FALSE_RETURN_V_MSG_E(bufferOffset >= 0, false, "Copy buffer start position error.");
183         firstBufferOffset = bufferOffset;
184         copySize = CopyFirstBuffer(size, startIndex, dstPtr, bufferPtr, bufferOffset);
185 
186         needCopySize -= copySize;
187         if (needCopySize == 0) { // First buffer is enough
188             EXEC_WHEN_GET(isGet, currentGet_ = Position(startIndex, firstBufferOffset, offset));
189             return true;
190         }
191         dstPtr += copySize;
192 
193         // First buffer is not enough, copy from successive buffers
194         (void)CopyFromSuccessiveBuffer(prevOffset, offsetEnd, startIndex, dstPtr, needCopySize);
195     }
196     EXEC_WHEN_GET(isGet, currentGet_ = Position(startIndex, firstBufferOffset, offset));
197 
198     // Update to the real size, especially at the end.
199     bufferPtr->GetMemory()->UpdateDataSize(size - needCopySize);
200     return true;
201 }
202 
203 // Call IsDataAvailable() first before call GetRange
GetRange(uint64_t offset,uint32_t size,AVBufferPtr & bufferPtr)204 bool DataPacker::GetRange(uint64_t offset, uint32_t size, AVBufferPtr& bufferPtr)
205 {
206     MEDIA_LOG_DD("DataPacker GetRange(offset, size) = (" PUBLIC_LOG_U64 ", "
207                  PUBLIC_LOG_U32 ")...", offset, size);
208     DUMP_BUFFER2LOG("GetRange Input", bufferPtr, 0);
209     FALSE_RETURN_V_MSG_E(bufferPtr && (!bufferPtr->IsEmpty()) && bufferPtr->GetMemory()->GetCapacity() >= size, false,
210         "GetRange input bufferPtr empty or capacity not enough.");
211 
212     OSAL::ScopedLock lock(mutex_);
213     if (que_.empty()) {
214         MEDIA_LOG_D("DataPacker is empty, waiting for push");
215         cvEmpty_.Wait(lock, [this] { return !que_.empty(); });
216     }
217 
218     FALSE_RETURN_V(!que_.empty(), false);
219     prevGet_ = currentGet_; // store last get position to prevGet_
220 
221     FALSE_RETURN_V(PeekRangeInternal(offset, size, bufferPtr, true), false);
222     if (isEos_ && size_ <= size) { // Is EOS, and this time get all the data.
223         FlushInternal();
224     } else {
225         if (prevGet_ < currentGet_) {
226             RemoveOldData(currentGet_);
227         }
228     }
229 
230     if (que_.size() < capacity_) {
231         cvFull_.NotifyOne();
232     }
233     return true;
234 }
235 
236 // GetRange in live play mode
237 //  1. not use offset
238 //  2. remove the data have been read
GetRange(uint32_t size,AVBufferPtr & bufferPtr)239 bool DataPacker::GetRange(uint32_t size, AVBufferPtr& bufferPtr)
240 {
241     MEDIA_LOG_D("DataPacker live play GetRange(size) = (" PUBLIC_LOG_U32 ")...", size);
242     FALSE_RETURN_V_MSG_E(bufferPtr && (!bufferPtr->IsEmpty()) && bufferPtr->GetMemory()->GetCapacity() >= size, false,
243         "Live play GetRange input bufferPtr empty or capacity not enough.");
244 
245     OSAL::ScopedLock lock(mutex_);
246     if (que_.empty()) {
247         FALSE_RETURN_V_W(!isEos_, false);
248         MEDIA_LOG_D("DataPacker is empty, live play GetRange waiting for push");
249         cvEmpty_.Wait(lock, [this] { return !que_.empty() || isEos_; });
250         if (isEos_) {
251             MEDIA_LOG_D("Eos wakeup the cvEmpty ConditionVariable");
252             return false;
253         }
254     }
255 
256     FALSE_RETURN_V(!que_.empty(), false);
257 
258     int32_t needCopySize = static_cast<int32_t>(size);
259     int32_t currCopySize = 0;
260     int32_t index = 0;
261     uint32_t lastBufferOffsetEnd = 0;
262 
263     uint8_t* dstPtr = AudioBufferWritableData(bufferPtr, size);
264     FALSE_RETURN_V(dstPtr != nullptr, false);
265 
266     while (index < que_.size()) {
267         AVBufferPtr& buffer = que_[index];
268         size_t bufferSize = AudioBufferSize(buffer);
269         currCopySize = std::min(static_cast<int32_t>(bufferSize), needCopySize);
270         currCopySize = CopyFirstBuffer(currCopySize, index, dstPtr, bufferPtr, 0);
271         lastBufferOffsetEnd = currCopySize;
272         dstPtr += currCopySize;
273         needCopySize -= currCopySize;
274         if (needCopySize <= 0) { // it is enough
275             break;
276         }
277         index++;
278         lastBufferOffsetEnd = 0;
279     }
280     FALSE_LOG(needCopySize >= 0);
281     if (needCopySize < 0) {
282         needCopySize = 0;
283     }
284     bufferPtr->GetMemory()->UpdateDataSize(size - needCopySize);
285 
286     auto endPosition = Position(index, lastBufferOffsetEnd, mediaOffset_ + size - needCopySize);
287     RemoveOldData(endPosition); // Live play, remove the got data
288     if (que_.size() < capacity_) {
289         cvFull_.NotifyOne();
290     }
291     return true;
292 }
293 
Flush()294 void DataPacker::Flush()
295 {
296     MEDIA_LOG_I("DataPacker Flush called.");
297     OSAL::ScopedLock lock(mutex_);
298     FlushInternal();
299 }
300 
SetEos()301 void DataPacker::SetEos()
302 {
303     MEDIA_LOG_I("DataPacker SetEos called.");
304     OSAL::ScopedLock lock(mutex_);
305     isEos_ = true;
306     cvEmpty_.NotifyOne();
307 }
308 
IsEmpty()309 bool DataPacker::IsEmpty()
310 {
311     OSAL::ScopedLock lock(mutex_);
312     return size_ > 0;
313 }
314 
Start()315 void DataPacker::Start()
316 {
317     MEDIA_LOG_I("DataPacker Start called.");
318     stopped_.store(false);
319 }
320 
Stop()321 void DataPacker::Stop()
322 {
323     MEDIA_LOG_I("DataPacker Stop called.");
324     stopped_.store(true);
325     cvEmpty_.NotifyAll(); // avoid some thread can not exit
326     cvFull_.NotifyAll();
327 }
328 
FlushInternal()329 void DataPacker::FlushInternal()
330 {
331     MEDIA_LOG_D("DataPacker FlushInternal called.");
332     que_.clear();
333     size_ = 0;
334     mediaOffset_ = 0;
335     dts_ = 0;
336     pts_ = 0;
337     isEos_ = false;
338     prevGet_ = INVALID_POSITION;
339     currentGet_ = INVALID_POSITION;
340 }
341 
342 // Remove first removeSize data in the buffer
RemoveBufferContent(std::shared_ptr<AVBuffer> & buffer,size_t removeSize)343 void DataPacker::RemoveBufferContent(std::shared_ptr<AVBuffer> &buffer, size_t removeSize)
344 {
345     if (removeSize == 0) {
346         return;
347     }
348     auto memory = buffer->GetMemory();
349     FALSE_RETURN(removeSize < memory->GetSize());
350     auto copySize = memory->GetSize() - removeSize;
351     FALSE_LOG_MSG(memmove_s(memory->GetWritableAddr(copySize), memory->GetCapacity(),
352         memory->GetReadOnlyData(removeSize), copySize) == EOK, "memmove failed.");
353     FALSE_RETURN(UpdateWhenFrontDataRemoved(removeSize));
354 }
355 
356 // Remove consumed data, and make the remaining data continuous
357 // Consumed data - between prevGet_.first and currentGet_.first
358 // In order to make remaining data continuous, also remove the data before prevGet_.first
359 // Update to support live play mode, Remove the data before position
RemoveOldData(const Position & position)360 void DataPacker::RemoveOldData(const Position& position)
361 {
362     MEDIA_LOG_DD("Before RemoveOldData " PUBLIC_LOG_S, ToString().c_str());
363     FALSE_LOG(RemoveTo(position));
364     if (que_.empty()) {
365         mediaOffset_ = 0;
366         size_ = 0;
367         pts_ = 0;
368         dts_ = 0;
369     } else {
370         pts_ = que_.front()->pts;
371         dts_ = que_.front()->dts;
372     }
373     MEDIA_LOG_DD("After RemoveOldData " PUBLIC_LOG_S, ToString().c_str());
374 }
375 
RemoveTo(const Position & position)376 bool DataPacker::RemoveTo(const Position& position)
377 {
378     MEDIA_LOG_DD("Remove to " PUBLIC_LOG_S, position.ToString().c_str());
379     size_t removeSize;
380     int32_t i = 0;
381     while (i < position.index && !que_.empty()) { // Remove all whole buffer before position.index
382         removeSize = AudioBufferSize(que_.front());
383         FALSE_RETURN_V(UpdateWhenFrontDataRemoved(removeSize), false);
384         que_.pop_front();
385         i++;
386     }
387     FALSE_RETURN_V_W(!que_.empty(), true);
388 
389     // The last buffer
390     removeSize = AudioBufferSize(que_.front());
391     // 1. If whole buffer should be removed
392     if (position.bufferOffset >= removeSize) {
393         FALSE_RETURN_V(UpdateWhenFrontDataRemoved(removeSize), false);
394         que_.pop_front();
395         return true;
396     }
397     // 2. Remove the front part of the buffer data
398     RemoveBufferContent(que_.front(), position.bufferOffset);
399     return true;
400 }
401 
UpdateWhenFrontDataRemoved(size_t removeSize)402 bool DataPacker::UpdateWhenFrontDataRemoved(size_t removeSize)
403 {
404     mediaOffset_ += removeSize;
405     FALSE_RETURN_V_MSG_E(size_.load() >= removeSize, false, "Total size(size_ " PUBLIC_LOG_U32
406         ") smaller than removeSize(" PUBLIC_LOG_ZU ")", size_.load(), removeSize);
407     size_ -= removeSize;
408     return true;
409 }
410 
411 // offset : from GetRange(offset, size)
412 // startIndex : out, find the first buffer should copy
413 // prevOffset : the first copied buffer's media offset.
FindFirstBufferToCopy(uint64_t offset,int32_t & startIndex,uint64_t & prevOffset)414 bool DataPacker::FindFirstBufferToCopy(uint64_t offset, int32_t &startIndex, uint64_t &prevOffset)
415 {
416     startIndex = 0;
417     prevOffset= mediaOffset_;
418     do {
419         if (offset >= prevOffset && offset - prevOffset < AudioBufferSize(que_[startIndex])) {
420             return true;
421         }
422         prevOffset += AudioBufferSize(que_[startIndex]);
423         startIndex++;
424     } while (static_cast<size_t>(startIndex) < que_.size());
425     return false;
426 }
427 
428 // size : the GetRange size
429 // dstPtr : copy data to here
430 // dstBufferPtr : the AVBuffer contains dstPtr, pass this parameter to update pts / dts.
431 // bufferOffset : the buffer offset that we start copy
CopyFirstBuffer(size_t size,int32_t index,uint8_t * dstPtr,AVBufferPtr & dstBufferPtr,int32_t bufferOffset)432 size_t DataPacker::CopyFirstBuffer(size_t size, int32_t index, uint8_t *dstPtr, AVBufferPtr &dstBufferPtr,
433                                    int32_t bufferOffset)
434 {
435     auto remainSize = static_cast<int32_t>(AudioBufferSize(que_[index]) - bufferOffset);
436     FALSE_RETURN_V_MSG_E(remainSize > 0, 0, "Copy size can not be negative.");
437     size_t copySize = std::min(static_cast<size_t>(remainSize), size);
438     NZERO_LOG(memcpy_s(dstPtr, copySize,
439         AudioBufferReadOnlyData(que_[index]) + bufferOffset, copySize));
440 
441     dstBufferPtr->pts = que_[index]->pts;
442     dstBufferPtr->dts = que_[index]->dts;
443     return copySize;
444 }
445 
446 // prevOffset : the media offset of the first byte in the startIndex + 1 buffer
447 // offsetEnd : calculate from GetRange(offset, size), offsetEnd = offset + size.
448 // startIndex : the index start copy data for this GetRange. CopyFromSuccessiveBuffer process from startIndex + 1.
449 // dstPtr : copy data to here
450 // needCopySize : in and out, indicate how many bytes still need to copy.
CopyFromSuccessiveBuffer(uint64_t prevOffset,uint64_t offsetEnd,int32_t startIndex,uint8_t * dstPtr,uint32_t & needCopySize)451 int32_t DataPacker::CopyFromSuccessiveBuffer(uint64_t prevOffset, uint64_t offsetEnd, int32_t startIndex,
452                                              uint8_t *dstPtr, uint32_t &needCopySize)
453 {
454     size_t copySize;
455     int32_t usedCount = 0;
456     uint64_t curOffsetEnd;
457     prevOffset = prevOffset + AudioBufferSize(que_[startIndex]);
458     for (size_t i = startIndex + 1; i < que_.size(); ++i) {
459         usedCount++;
460         curOffsetEnd = prevOffset + AudioBufferSize(que_[i]);
461         if (curOffsetEnd >= offsetEnd) { // This buffer is enough
462             NZERO_LOG(memcpy_s(dstPtr, needCopySize, AudioBufferReadOnlyData(que_[i]), needCopySize));
463             needCopySize = 0;
464             return usedCount; // Finished copy buffer
465         } else {
466             copySize = AudioBufferSize(que_[i]);
467             NZERO_LOG(memcpy_s(dstPtr, copySize, AudioBufferReadOnlyData(que_[i]), copySize));
468             dstPtr += copySize;
469             needCopySize -= copySize;
470             prevOffset += copySize;
471         }
472     }
473     MEDIA_LOG_W("Processed all cached buffers, still not meet offsetEnd, maybe EOS reached.");
474     return usedCount;
475 }
476 
ToString() const477 std::string DataPacker::ToString() const
478 {
479     return "DataPacker (offset " + std::to_string(mediaOffset_) + ", size " + std::to_string(size_) +
480            ", buffer count " + std::to_string(que_.size()) + ")";
481 }
482 } // namespace Media
483 } // namespace OHOS
484