1 /*
2 * Copyright (c) 2021-2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #define HST_LOG_TAG "DataPacker"
16
17 #include "data_packer.h"
18 #include <cstring>
19 #include "foundation/log.h"
20 #include "utils/dump_buffer.h"
21
22 namespace OHOS {
23 namespace Media {
24 #define EXEC_WHEN_GET(isGet, exec) \
25 do { \
26 if (isGet) { \
27 exec; \
28 } \
29 } while (0)
30
31 static const DataPacker::Position INVALID_POSITION = DataPacker::Position(-1, 0, 0);
32 static constexpr size_t MAX_BUFFER_NUMBER_IN_DATA_PACKER = 30;
33
DataPacker()34 DataPacker::DataPacker() : mutex_(), que_(), size_(0), mediaOffset_(0), pts_(0), dts_(0),
35 prevGet_(INVALID_POSITION), currentGet_(INVALID_POSITION), capacity_(MAX_BUFFER_NUMBER_IN_DATA_PACKER)
36 {
37 MEDIA_LOG_I("DataPacker ctor...");
38 }
39
~DataPacker()40 DataPacker::~DataPacker()
41 {
42 MEDIA_LOG_I("DataPacker dtor...");
43 cvEmpty_.NotifyAll();
44 cvFull_.NotifyAll();
45 }
46
AudioBufferSize(AVBufferPtr & ptr)47 inline static size_t AudioBufferSize(AVBufferPtr& ptr)
48 {
49 return ptr->GetMemory()->GetSize();
50 }
51
AudioBufferWritableData(AVBufferPtr & ptr,size_t size,size_t position=0)52 inline static uint8_t* AudioBufferWritableData(AVBufferPtr& ptr, size_t size, size_t position = 0)
53 {
54 return ptr->GetMemory()->GetWritableAddr(size, position);
55 }
56
AudioBufferReadOnlyData(AVBufferPtr & ptr)57 inline static const uint8_t* AudioBufferReadOnlyData(AVBufferPtr& ptr)
58 {
59 return ptr->GetMemory()->GetReadOnlyData();
60 }
61
PushData(AVBufferPtr bufferPtr,uint64_t offset)62 void DataPacker::PushData(AVBufferPtr bufferPtr, uint64_t offset)
63 {
64 size_t bufferSize = AudioBufferSize(bufferPtr);
65 MEDIA_LOG_DD("DataPacker PushData begin... buffer (offset " PUBLIC_LOG_U64 ", size " PUBLIC_LOG_ZU ")",
66 offset, bufferSize);
67 DUMP_BUFFER2LOG("DataPacker Push", bufferPtr, offset);
68 FALSE_RETURN_MSG(bufferSize > 0, "Can not push zero length buffer.");
69
70 OSAL::ScopedLock lock(mutex_);
71 if (que_.size() >= capacity_) {
72 MEDIA_LOG_D("DataPacker is full, waiting for pop.");
73 do {
74 cvFull_.WaitFor(lock, 1000, // 1000 ms
75 [this] { return que_.size() < capacity_ || stopped_.load(); });
76 if (stopped_.load()) {
77 MEDIA_LOG_D("DataPacker stopped, so return.");
78 return;
79 }
80 } while (que_.size() >= capacity_);
81 }
82
83 size_ += AudioBufferSize(bufferPtr);
84 if (que_.empty()) {
85 mediaOffset_ = offset;
86 dts_ = bufferPtr->dts;
87 pts_ = bufferPtr->pts;
88 }
89 que_.emplace_back(std::move(bufferPtr));
90 cvEmpty_.NotifyOne();
91 MEDIA_LOG_DD("DataPacker PushData end. " PUBLIC_LOG_S, ToString().c_str());
92 }
93
94 // curOffset: the offset end of this dataPacker. if IsDataAvailable() false, we can get data from source from curOffset
IsDataAvailable(uint64_t offset,uint32_t size,uint64_t & curOffset)95 bool DataPacker::IsDataAvailable(uint64_t offset, uint32_t size, uint64_t &curOffset)
96 {
97 MEDIA_LOG_DD("dataPacker (offset " PUBLIC_LOG_U64 ", size " PUBLIC_LOG_U32 "), curOffsetEnd is " PUBLIC_LOG_U64,
98 mediaOffset_, size_.load(), mediaOffset_ + size_.load());
99 MEDIA_LOG_DD(PUBLIC_LOG_S, ToString().c_str());
100 OSAL::ScopedLock lock(mutex_);
101 auto curOffsetTemp = mediaOffset_;
102 if (que_.empty() || offset < curOffsetTemp || offset > curOffsetTemp + size_) { // 原有数据无法命中, 则删除原有数据
103 curOffset = offset;
104 FlushInternal();
105 MEDIA_LOG_DD("IsDataAvailable false, offset not in cached data, clear it.");
106 return false;
107 }
108 size_t bufCnt = que_.size();
109 uint64_t offsetEnd = offset + size;
110 uint64_t curOffsetEnd = mediaOffset_ + AudioBufferSize(que_.front());
111 if (bufCnt == 1) {
112 curOffset = curOffsetEnd;
113 MEDIA_LOG_DD("IsDataAvailable bufCnt == 1, result " PUBLIC_LOG_D32, offsetEnd <= curOffsetEnd);
114 return offsetEnd <= curOffsetEnd;
115 }
116 auto preOffsetEnd = curOffsetEnd;
117 for (size_t i = 1; i < bufCnt; ++i) {
118 curOffsetEnd = preOffsetEnd + AudioBufferSize(que_[i]);
119 if (curOffsetEnd >= offsetEnd) {
120 MEDIA_LOG_DD("IsDataAvailable true, last buffer index " PUBLIC_LOG_ZU ", offsetEnd " PUBLIC_LOG_U64
121 ", curOffsetEnd " PUBLIC_LOG_U64, i, offsetEnd, curOffsetEnd);
122 return true;
123 } else {
124 preOffsetEnd = curOffsetEnd;
125 }
126 }
127 if (preOffsetEnd >= offsetEnd) {
128 MEDIA_LOG_DD("IsDataAvailable true, use all buffers, last buffer index " PUBLIC_LOG_ZU ", offsetEnd "
129 PUBLIC_LOG_U64 ", curOffsetEnd " PUBLIC_LOG_U64, bufCnt - 1, offsetEnd, curOffsetEnd);
130 return true;
131 }
132 curOffset = preOffsetEnd;
133 MEDIA_LOG_DD("IsDataAvailable false, offsetEnd " PUBLIC_LOG_U64 ", curOffsetEnd " PUBLIC_LOG_U64,
134 offsetEnd, preOffsetEnd);
135 return false;
136 }
137
PeekRange(uint64_t offset,uint32_t size,AVBufferPtr & bufferPtr)138 bool DataPacker::PeekRange(uint64_t offset, uint32_t size, AVBufferPtr& bufferPtr)
139 {
140 OSAL::ScopedLock lock(mutex_);
141 if (que_.empty()) {
142 MEDIA_LOG_D("DataPacker is empty, waiting for push.");
143 cvEmpty_.Wait(lock, [this] { return !que_.empty(); });
144 }
145
146 return PeekRangeInternal(offset, size, bufferPtr, false);
147 }
148
149 // Should call IsDataAvailable() before to make sure there is enough buffer to copy.
150 // offset : the offset (of the media file) to peek ( 要peek的数据起始位置 在media file文件 中的 offset )
151 // size : the size of data to peek
152 // bufferPtr : out buffer
153 // isGet : is it called from GetRange.
PeekRangeInternal(uint64_t offset,uint32_t size,AVBufferPtr & bufferPtr,bool isGet)154 bool DataPacker::PeekRangeInternal(uint64_t offset, uint32_t size, AVBufferPtr &bufferPtr, bool isGet)
155 {
156 MEDIA_LOG_DD("PeekRangeInternal (offset, size) = (" PUBLIC_LOG_U64 ", " PUBLIC_LOG_U32 ")...", offset, size);
157 int32_t startIndex = 0; // The index of buffer that we first use
158 size_t copySize = 0;
159 uint32_t needCopySize = size;
160 uint32_t firstBufferOffset = 0;
161 uint8_t* dstPtr = AudioBufferWritableData(bufferPtr, needCopySize);
162 FALSE_RETURN_V(dstPtr != nullptr, false);
163
164 auto offsetEnd = offset + needCopySize;
165 auto curOffsetEnd = mediaOffset_ + AudioBufferSize(que_[startIndex]);
166 if (offsetEnd <= curOffsetEnd) { // first buffer is enough
167 auto bufferOffset = static_cast<int32_t>(offset - mediaOffset_);
168 FALSE_RETURN_V_MSG_E(bufferOffset >= 0, false, "Copy buffer start position error.");
169 firstBufferOffset = bufferOffset;
170 copySize = CopyFirstBuffer(size, startIndex, dstPtr, bufferPtr, bufferOffset);
171 needCopySize -= copySize;
172 FALSE_LOG_MSG(needCopySize == 0, "First buffer is enough, but copySize is not enough");
173 EXEC_WHEN_GET(isGet, currentGet_ = Position(startIndex, firstBufferOffset, offset));
174 return true;
175 } else { // first buffer not enough
176 // Find the first buffer that should copy
177 uint64_t prevOffset; // The media offset of the startIndex buffer start byte
178 FALSE_RETURN_V_MSG_E(FindFirstBufferToCopy(offset, startIndex, prevOffset), false,
179 "Read offset(" PUBLIC_LOG_D64 ") size(" PUBLIC_LOG_D32 ") from " PUBLIC_LOG_S,
180 offset, size, ToString().c_str());
181 auto bufferOffset = static_cast<int32_t>(offset - prevOffset);
182 FALSE_RETURN_V_MSG_E(bufferOffset >= 0, false, "Copy buffer start position error.");
183 firstBufferOffset = bufferOffset;
184 copySize = CopyFirstBuffer(size, startIndex, dstPtr, bufferPtr, bufferOffset);
185
186 needCopySize -= copySize;
187 if (needCopySize == 0) { // First buffer is enough
188 EXEC_WHEN_GET(isGet, currentGet_ = Position(startIndex, firstBufferOffset, offset));
189 return true;
190 }
191 dstPtr += copySize;
192
193 // First buffer is not enough, copy from successive buffers
194 (void)CopyFromSuccessiveBuffer(prevOffset, offsetEnd, startIndex, dstPtr, needCopySize);
195 }
196 EXEC_WHEN_GET(isGet, currentGet_ = Position(startIndex, firstBufferOffset, offset));
197
198 // Update to the real size, especially at the end.
199 bufferPtr->GetMemory()->UpdateDataSize(size - needCopySize);
200 return true;
201 }
202
203 // Call IsDataAvailable() first before call GetRange
GetRange(uint64_t offset,uint32_t size,AVBufferPtr & bufferPtr)204 bool DataPacker::GetRange(uint64_t offset, uint32_t size, AVBufferPtr& bufferPtr)
205 {
206 MEDIA_LOG_DD("DataPacker GetRange(offset, size) = (" PUBLIC_LOG_U64 ", "
207 PUBLIC_LOG_U32 ")...", offset, size);
208 DUMP_BUFFER2LOG("GetRange Input", bufferPtr, 0);
209 FALSE_RETURN_V_MSG_E(bufferPtr && (!bufferPtr->IsEmpty()) && bufferPtr->GetMemory()->GetCapacity() >= size, false,
210 "GetRange input bufferPtr empty or capacity not enough.");
211
212 OSAL::ScopedLock lock(mutex_);
213 if (que_.empty()) {
214 MEDIA_LOG_D("DataPacker is empty, waiting for push");
215 cvEmpty_.Wait(lock, [this] { return !que_.empty(); });
216 }
217
218 FALSE_RETURN_V(!que_.empty(), false);
219 prevGet_ = currentGet_; // store last get position to prevGet_
220
221 FALSE_RETURN_V(PeekRangeInternal(offset, size, bufferPtr, true), false);
222 if (isEos_ && size_ <= size) { // Is EOS, and this time get all the data.
223 FlushInternal();
224 } else {
225 if (prevGet_ < currentGet_) {
226 RemoveOldData(currentGet_);
227 }
228 }
229
230 if (que_.size() < capacity_) {
231 cvFull_.NotifyOne();
232 }
233 return true;
234 }
235
236 // GetRange in live play mode
237 // 1. not use offset
238 // 2. remove the data have been read
GetRange(uint32_t size,AVBufferPtr & bufferPtr)239 bool DataPacker::GetRange(uint32_t size, AVBufferPtr& bufferPtr)
240 {
241 MEDIA_LOG_D("DataPacker live play GetRange(size) = (" PUBLIC_LOG_U32 ")...", size);
242 FALSE_RETURN_V_MSG_E(bufferPtr && (!bufferPtr->IsEmpty()) && bufferPtr->GetMemory()->GetCapacity() >= size, false,
243 "Live play GetRange input bufferPtr empty or capacity not enough.");
244
245 OSAL::ScopedLock lock(mutex_);
246 if (que_.empty()) {
247 FALSE_RETURN_V_W(!isEos_, false);
248 MEDIA_LOG_D("DataPacker is empty, live play GetRange waiting for push");
249 cvEmpty_.Wait(lock, [this] { return !que_.empty() || isEos_; });
250 if (isEos_) {
251 MEDIA_LOG_D("Eos wakeup the cvEmpty ConditionVariable");
252 return false;
253 }
254 }
255
256 FALSE_RETURN_V(!que_.empty(), false);
257
258 int32_t needCopySize = static_cast<int32_t>(size);
259 int32_t currCopySize = 0;
260 int32_t index = 0;
261 uint32_t lastBufferOffsetEnd = 0;
262
263 uint8_t* dstPtr = AudioBufferWritableData(bufferPtr, size);
264 FALSE_RETURN_V(dstPtr != nullptr, false);
265
266 while (index < que_.size()) {
267 AVBufferPtr& buffer = que_[index];
268 size_t bufferSize = AudioBufferSize(buffer);
269 currCopySize = std::min(static_cast<int32_t>(bufferSize), needCopySize);
270 currCopySize = CopyFirstBuffer(currCopySize, index, dstPtr, bufferPtr, 0);
271 lastBufferOffsetEnd = currCopySize;
272 dstPtr += currCopySize;
273 needCopySize -= currCopySize;
274 if (needCopySize <= 0) { // it is enough
275 break;
276 }
277 index++;
278 lastBufferOffsetEnd = 0;
279 }
280 FALSE_LOG(needCopySize >= 0);
281 if (needCopySize < 0) {
282 needCopySize = 0;
283 }
284 bufferPtr->GetMemory()->UpdateDataSize(size - needCopySize);
285
286 auto endPosition = Position(index, lastBufferOffsetEnd, mediaOffset_ + size - needCopySize);
287 RemoveOldData(endPosition); // Live play, remove the got data
288 if (que_.size() < capacity_) {
289 cvFull_.NotifyOne();
290 }
291 return true;
292 }
293
Flush()294 void DataPacker::Flush()
295 {
296 MEDIA_LOG_I("DataPacker Flush called.");
297 OSAL::ScopedLock lock(mutex_);
298 FlushInternal();
299 }
300
SetEos()301 void DataPacker::SetEos()
302 {
303 MEDIA_LOG_I("DataPacker SetEos called.");
304 OSAL::ScopedLock lock(mutex_);
305 isEos_ = true;
306 cvEmpty_.NotifyOne();
307 }
308
IsEmpty()309 bool DataPacker::IsEmpty()
310 {
311 OSAL::ScopedLock lock(mutex_);
312 return size_ > 0;
313 }
314
Start()315 void DataPacker::Start()
316 {
317 MEDIA_LOG_I("DataPacker Start called.");
318 stopped_.store(false);
319 }
320
Stop()321 void DataPacker::Stop()
322 {
323 MEDIA_LOG_I("DataPacker Stop called.");
324 stopped_.store(true);
325 cvEmpty_.NotifyAll(); // avoid some thread can not exit
326 cvFull_.NotifyAll();
327 }
328
FlushInternal()329 void DataPacker::FlushInternal()
330 {
331 MEDIA_LOG_D("DataPacker FlushInternal called.");
332 que_.clear();
333 size_ = 0;
334 mediaOffset_ = 0;
335 dts_ = 0;
336 pts_ = 0;
337 isEos_ = false;
338 prevGet_ = INVALID_POSITION;
339 currentGet_ = INVALID_POSITION;
340 }
341
342 // Remove first removeSize data in the buffer
RemoveBufferContent(std::shared_ptr<AVBuffer> & buffer,size_t removeSize)343 void DataPacker::RemoveBufferContent(std::shared_ptr<AVBuffer> &buffer, size_t removeSize)
344 {
345 if (removeSize == 0) {
346 return;
347 }
348 auto memory = buffer->GetMemory();
349 FALSE_RETURN(removeSize < memory->GetSize());
350 auto copySize = memory->GetSize() - removeSize;
351 FALSE_LOG_MSG(memmove_s(memory->GetWritableAddr(copySize), memory->GetCapacity(),
352 memory->GetReadOnlyData(removeSize), copySize) == EOK, "memmove failed.");
353 FALSE_RETURN(UpdateWhenFrontDataRemoved(removeSize));
354 }
355
356 // Remove consumed data, and make the remaining data continuous
357 // Consumed data - between prevGet_.first and currentGet_.first
358 // In order to make remaining data continuous, also remove the data before prevGet_.first
359 // Update to support live play mode, Remove the data before position
RemoveOldData(const Position & position)360 void DataPacker::RemoveOldData(const Position& position)
361 {
362 MEDIA_LOG_DD("Before RemoveOldData " PUBLIC_LOG_S, ToString().c_str());
363 FALSE_LOG(RemoveTo(position));
364 if (que_.empty()) {
365 mediaOffset_ = 0;
366 size_ = 0;
367 pts_ = 0;
368 dts_ = 0;
369 } else {
370 pts_ = que_.front()->pts;
371 dts_ = que_.front()->dts;
372 }
373 MEDIA_LOG_DD("After RemoveOldData " PUBLIC_LOG_S, ToString().c_str());
374 }
375
RemoveTo(const Position & position)376 bool DataPacker::RemoveTo(const Position& position)
377 {
378 MEDIA_LOG_DD("Remove to " PUBLIC_LOG_S, position.ToString().c_str());
379 size_t removeSize;
380 int32_t i = 0;
381 while (i < position.index && !que_.empty()) { // Remove all whole buffer before position.index
382 removeSize = AudioBufferSize(que_.front());
383 FALSE_RETURN_V(UpdateWhenFrontDataRemoved(removeSize), false);
384 que_.pop_front();
385 i++;
386 }
387 FALSE_RETURN_V_W(!que_.empty(), true);
388
389 // The last buffer
390 removeSize = AudioBufferSize(que_.front());
391 // 1. If whole buffer should be removed
392 if (position.bufferOffset >= removeSize) {
393 FALSE_RETURN_V(UpdateWhenFrontDataRemoved(removeSize), false);
394 que_.pop_front();
395 return true;
396 }
397 // 2. Remove the front part of the buffer data
398 RemoveBufferContent(que_.front(), position.bufferOffset);
399 return true;
400 }
401
UpdateWhenFrontDataRemoved(size_t removeSize)402 bool DataPacker::UpdateWhenFrontDataRemoved(size_t removeSize)
403 {
404 mediaOffset_ += removeSize;
405 FALSE_RETURN_V_MSG_E(size_.load() >= removeSize, false, "Total size(size_ " PUBLIC_LOG_U32
406 ") smaller than removeSize(" PUBLIC_LOG_ZU ")", size_.load(), removeSize);
407 size_ -= removeSize;
408 return true;
409 }
410
411 // offset : from GetRange(offset, size)
412 // startIndex : out, find the first buffer should copy
413 // prevOffset : the first copied buffer's media offset.
FindFirstBufferToCopy(uint64_t offset,int32_t & startIndex,uint64_t & prevOffset)414 bool DataPacker::FindFirstBufferToCopy(uint64_t offset, int32_t &startIndex, uint64_t &prevOffset)
415 {
416 startIndex = 0;
417 prevOffset= mediaOffset_;
418 do {
419 if (offset >= prevOffset && offset - prevOffset < AudioBufferSize(que_[startIndex])) {
420 return true;
421 }
422 prevOffset += AudioBufferSize(que_[startIndex]);
423 startIndex++;
424 } while (static_cast<size_t>(startIndex) < que_.size());
425 return false;
426 }
427
428 // size : the GetRange size
429 // dstPtr : copy data to here
430 // dstBufferPtr : the AVBuffer contains dstPtr, pass this parameter to update pts / dts.
431 // bufferOffset : the buffer offset that we start copy
CopyFirstBuffer(size_t size,int32_t index,uint8_t * dstPtr,AVBufferPtr & dstBufferPtr,int32_t bufferOffset)432 size_t DataPacker::CopyFirstBuffer(size_t size, int32_t index, uint8_t *dstPtr, AVBufferPtr &dstBufferPtr,
433 int32_t bufferOffset)
434 {
435 auto remainSize = static_cast<int32_t>(AudioBufferSize(que_[index]) - bufferOffset);
436 FALSE_RETURN_V_MSG_E(remainSize > 0, 0, "Copy size can not be negative.");
437 size_t copySize = std::min(static_cast<size_t>(remainSize), size);
438 NZERO_LOG(memcpy_s(dstPtr, copySize,
439 AudioBufferReadOnlyData(que_[index]) + bufferOffset, copySize));
440
441 dstBufferPtr->pts = que_[index]->pts;
442 dstBufferPtr->dts = que_[index]->dts;
443 return copySize;
444 }
445
446 // prevOffset : the media offset of the first byte in the startIndex + 1 buffer
447 // offsetEnd : calculate from GetRange(offset, size), offsetEnd = offset + size.
448 // startIndex : the index start copy data for this GetRange. CopyFromSuccessiveBuffer process from startIndex + 1.
449 // dstPtr : copy data to here
450 // needCopySize : in and out, indicate how many bytes still need to copy.
CopyFromSuccessiveBuffer(uint64_t prevOffset,uint64_t offsetEnd,int32_t startIndex,uint8_t * dstPtr,uint32_t & needCopySize)451 int32_t DataPacker::CopyFromSuccessiveBuffer(uint64_t prevOffset, uint64_t offsetEnd, int32_t startIndex,
452 uint8_t *dstPtr, uint32_t &needCopySize)
453 {
454 size_t copySize;
455 int32_t usedCount = 0;
456 uint64_t curOffsetEnd;
457 prevOffset = prevOffset + AudioBufferSize(que_[startIndex]);
458 for (size_t i = startIndex + 1; i < que_.size(); ++i) {
459 usedCount++;
460 curOffsetEnd = prevOffset + AudioBufferSize(que_[i]);
461 if (curOffsetEnd >= offsetEnd) { // This buffer is enough
462 NZERO_LOG(memcpy_s(dstPtr, needCopySize, AudioBufferReadOnlyData(que_[i]), needCopySize));
463 needCopySize = 0;
464 return usedCount; // Finished copy buffer
465 } else {
466 copySize = AudioBufferSize(que_[i]);
467 NZERO_LOG(memcpy_s(dstPtr, copySize, AudioBufferReadOnlyData(que_[i]), copySize));
468 dstPtr += copySize;
469 needCopySize -= copySize;
470 prevOffset += copySize;
471 }
472 }
473 MEDIA_LOG_W("Processed all cached buffers, still not meet offsetEnd, maybe EOS reached.");
474 return usedCount;
475 }
476
ToString() const477 std::string DataPacker::ToString() const
478 {
479 return "DataPacker (offset " + std::to_string(mediaOffset_) + ", size " + std::to_string(size_) +
480 ", buffer count " + std::to_string(que_.size()) + ")";
481 }
482 } // namespace Media
483 } // namespace OHOS
484