• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021-2025 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "avbuffer_queue_consumer_impl.h"
17 #include "avbuffer_queue_impl.h"
18 #include "avbuffer_queue_producer_impl.h"
19 #include "common/log.h"
20 #include "meta/media_types.h"
21 
22 namespace {
23 constexpr OHOS::HiviewDFX::HiLogLabel LABEL = { LOG_CORE, LOG_DOMAIN_FOUNDATION, "AVBufferQueue" };
24 static constexpr uint8_t LOG_LIMIT_LOW_FREQ = 64;
25 constexpr int64_t MILLISECONDS_TO_MICROSECONDS = 1000;
26 }
27 
28 namespace OHOS {
29 namespace Media {
30 
Create(uint32_t size,MemoryType type,const std::string & name,bool disableAlloc)31 std::shared_ptr<AVBufferQueue> AVBufferQueue::Create(
32     uint32_t size, MemoryType type, const std::string& name, bool disableAlloc)
33 {
34     MEDIA_LOG_D("AVBufferQueue::Create size = %u, type = %u, name = %s",
35                 size, static_cast<uint32_t>(type), name.c_str());
36     return std::make_shared<AVBufferQueueImpl>(size, type, name, disableAlloc);
37 }
38 
GetLocalProducer()39 std::shared_ptr<AVBufferQueueProducer> AVBufferQueueImpl::GetLocalProducer()
40 {
41     std::lock_guard<std::mutex> lockGuard(producerCreatorMutex_);
42     std::shared_ptr<AVBufferQueueProducerImpl> producer = nullptr;
43     if (localProducer_.expired()) {
44         auto shared_this = shared_from_this();
45         FALSE_RETURN_V(shared_this != nullptr, nullptr);
46         producer = std::make_shared<AVBufferQueueProducerImpl>(shared_this);
47         localProducer_ = producer;
48     }
49 
50     return localProducer_.lock();
51 }
52 
GetLocalConsumer()53 std::shared_ptr<AVBufferQueueConsumer> AVBufferQueueImpl::GetLocalConsumer()
54 {
55     std::lock_guard<std::mutex> lockGuard(consumerCreatorMutex_);
56     std::shared_ptr<AVBufferQueueConsumerImpl> consumer = nullptr;
57     if (localConsumer_.expired()) {
58         auto shared_this = shared_from_this();
59         FALSE_RETURN_V(shared_this != nullptr, nullptr);
60         consumer = std::make_shared<AVBufferQueueConsumerImpl>(shared_this);
61         localConsumer_ = consumer;
62     }
63     return localConsumer_.lock();
64 }
65 
GetProducer()66 sptr<AVBufferQueueProducer> AVBufferQueueImpl::GetProducer()
67 {
68     std::lock_guard<std::mutex> lockGuard(producerCreatorMutex_);
69     sptr<AVBufferQueueProducerImpl> producer = nullptr;
70     if (producer_ == nullptr || producer_->GetSptrRefCount() <= 0) {
71         auto shared_this = shared_from_this();
72         FALSE_RETURN_V(shared_this != nullptr, nullptr);
73         producer = new AVBufferQueueProducerImpl(shared_this);
74         producer_ = producer;
75     }
76 
77     return producer_.promote();
78 }
79 
GetConsumer()80 sptr<AVBufferQueueConsumer> AVBufferQueueImpl::GetConsumer()
81 {
82     std::lock_guard<std::mutex> lockGuard(consumerCreatorMutex_);
83     sptr<AVBufferQueueConsumerImpl> consumer = nullptr;
84     if (consumer_ == nullptr || consumer_->GetSptrRefCount() <= 0) {
85         auto shared_this = shared_from_this();
86         FALSE_RETURN_V(shared_this != nullptr, nullptr);
87         consumer = new AVBufferQueueConsumerImpl(shared_this);
88         consumer_ = consumer;
89     }
90 
91     return consumer_.promote();
92 }
93 
AVBufferQueueImpl(const std::string & name)94 AVBufferQueueImpl::AVBufferQueueImpl(const std::string &name)
95     : AVBufferQueue(), name_(name), size_(0), memoryType_(MemoryType::UNKNOWN_MEMORY), disableAlloc_(false) {}
96 
AVBufferQueueImpl(uint32_t size,MemoryType type,const std::string & name,bool disableAlloc)97 AVBufferQueueImpl::AVBufferQueueImpl(uint32_t size, MemoryType type, const std::string &name, bool disableAlloc)
98     : AVBufferQueue(), name_(name), size_(size), memoryType_(type), disableAlloc_(disableAlloc)
99 {
100     if (size_ > AVBUFFER_QUEUE_MAX_QUEUE_SIZE) {
101         size_ = AVBUFFER_QUEUE_MAX_QUEUE_SIZE;
102     }
103 }
104 
GetQueueSize()105 uint32_t AVBufferQueueImpl::GetQueueSize()
106 {
107     return size_;
108 }
109 
SetQueueSize(uint32_t size)110 Status AVBufferQueueImpl::SetQueueSize(uint32_t size)
111 {
112     FALSE_RETURN_V(size >= 0 && size <= AVBUFFER_QUEUE_MAX_QUEUE_SIZE && size != size_,
113                    Status::ERROR_INVALID_BUFFER_SIZE);
114 
115     return SetLargerQueueSize(size);
116 }
117 
SetLargerQueueSize(uint32_t size)118 Status AVBufferQueueImpl::SetLargerQueueSize(uint32_t size)
119 {
120     FALSE_RETURN_V(size >= 0 && size <= AVBUFFER_QUEUE_MAX_QUEUE_SIZE_FOR_LARGER  && size != size_,
121                    Status::ERROR_INVALID_BUFFER_SIZE);
122 
123     if (size > size_) {
124         size_ = size;
125         if (!disableAlloc_) {
126             requestCondition.notify_all();
127         }
128     } else {
129         std::lock_guard<std::mutex> lockGuard(queueMutex_);
130         DeleteBuffers(size_ - size);
131         size_ = size;
132     }
133 
134     return Status::OK;
135 }
136 
ClearBufferIf(std::function<bool (const std::shared_ptr<AVBuffer> &)> pred)137 Status AVBufferQueueImpl::ClearBufferIf(std::function<bool(const std::shared_ptr<AVBuffer>&)> pred)
138 {
139     std::lock_guard<std::mutex> lockGuard(queueMutex_);
140     for (auto dirtyIt = dirtyBufferList_.begin(); dirtyIt != dirtyBufferList_.end();) {
141         uint64_t uniqueId = *dirtyIt;
142         auto cacheIt = cachedBufferMap_.find(uniqueId);
143         if (cacheIt == cachedBufferMap_.end()) {
144             MEDIA_LOG_E("unexpected buffer uniqueId=" PUBLIC_LOG_U64, uniqueId);
145             ++dirtyIt;
146             continue;
147         }
148         if (cacheIt->second.state != AVBUFFER_STATE_PUSHED && cacheIt->second.state != AVBUFFER_STATE_RETURNED) {
149             MEDIA_LOG_I("ignore unexpected buffer status uniqueId=" PUBLIC_LOG_U64 ",state= " PUBLIC_LOG_D32,
150                 uniqueId,
151                 static_cast<int32_t>(cacheIt->second.state));
152             ++dirtyIt;
153             continue;
154         }
155 
156         if (pred(cacheIt->second.buffer)) {
157             MEDIA_LOG_D("ClearBufferIf pred ok uniqueId=" PUBLIC_LOG_U64 ",pts=" PUBLIC_LOG_D64,
158                 uniqueId,
159                 cacheIt->second.buffer->pts_);
160             cacheIt->second.state = AVBUFFER_STATE_RELEASED;
161             InsertFreeBufferInOrder(uniqueId);
162             dirtyIt = dirtyBufferList_.erase(dirtyIt);
163         } else {
164             ++dirtyIt;
165         }
166     }
167     requestCondition.notify_all();
168     return Status::OK;
169 }
170 
IsBufferInQueue(const std::shared_ptr<AVBuffer> & buffer)171 bool AVBufferQueueImpl::IsBufferInQueue(const std::shared_ptr<AVBuffer>& buffer)
172 {
173     FALSE_RETURN_V(buffer != nullptr, false);
174     auto uniqueId = buffer->GetUniqueId();
175     return cachedBufferMap_.find(uniqueId) != cachedBufferMap_.end();
176 }
177 
GetCachedBufferCount() const178 uint32_t AVBufferQueueImpl::GetCachedBufferCount() const
179 {
180     // 确保cachedBufferMap_.size()不会超过MAX_UINT32
181     return static_cast<uint32_t>(cachedBufferMap_.size());
182 }
183 
PopFromFreeBufferList(std::shared_ptr<AVBuffer> & buffer,const AVBufferConfig & config)184 Status AVBufferQueueImpl::PopFromFreeBufferList(std::shared_ptr<AVBuffer>& buffer, const AVBufferConfig& config)
185 {
186     for (auto it = freeBufferList_.begin(); it != freeBufferList_.end(); it++) {
187         if (config <= cachedBufferMap_[*it].config) {
188             buffer = cachedBufferMap_[*it].buffer;
189             freeBufferList_.erase(it);
190             return Status::OK;
191         }
192     }
193 
194     if (freeBufferList_.empty()) {
195         buffer = nullptr;
196         // 没有可以重用的freeBuffer
197         return Status::ERROR_NO_FREE_BUFFER;
198     }
199 
200     buffer = cachedBufferMap_[freeBufferList_.front()].buffer;
201     freeBufferList_.pop_front();
202 
203     return Status::OK;
204 }
205 
PopFromDirtyBufferList(std::shared_ptr<AVBuffer> & buffer)206 Status AVBufferQueueImpl::PopFromDirtyBufferList(std::shared_ptr<AVBuffer>& buffer)
207 {
208     FALSE_RETURN_V_NOLOG(!dirtyBufferList_.empty(), Status::ERROR_NO_DIRTY_BUFFER);
209 
210     buffer = cachedBufferMap_[dirtyBufferList_.front()].buffer;
211     dirtyBufferList_.pop_front();
212     return Status::OK;
213 }
214 
AllocBuffer(std::shared_ptr<AVBuffer> & buffer,const AVBufferConfig & config)215 Status AVBufferQueueImpl::AllocBuffer(std::shared_ptr<AVBuffer>& buffer, const AVBufferConfig& config)
216 {
217     auto bufferImpl = AVBuffer::CreateAVBuffer(config);
218     FALSE_RETURN_V(bufferImpl != nullptr, Status::ERROR_CREATE_BUFFER);
219 
220     auto uniqueId = bufferImpl->GetUniqueId();
221     AVBufferElement ele = {
222         .config = bufferImpl->GetConfig(),
223         .state = AVBUFFER_STATE_RELEASED,
224         .isDeleting = false,
225         .buffer = bufferImpl,
226     };
227     cachedBufferMap_[uniqueId] = ele;
228     buffer = bufferImpl;
229     TotalMemoryCalculation(true, ele.config.capacity);
230 
231     return Status::OK;
232 }
233 
RequestReuseBuffer(std::shared_ptr<AVBuffer> & buffer,const AVBufferConfig & config)234 Status AVBufferQueueImpl::RequestReuseBuffer(std::shared_ptr<AVBuffer>& buffer, const AVBufferConfig& config)
235 {
236     FALSE_RETURN_V(buffer != nullptr, Status::ERROR_NULL_POINT_BUFFER);
237 
238     auto uniqueId = buffer->GetUniqueId();
239     FALSE_RETURN_V(cachedBufferMap_.find(uniqueId) != cachedBufferMap_.end(), Status::ERROR_CREATE_BUFFER);
240 
241     if (config <= cachedBufferMap_[uniqueId].config) {
242         // 不需要重新分配,直接更新buffer大小
243         cachedBufferMap_[uniqueId].config.size = config.size;
244     } else {
245         // 重新分配
246         DeleteCachedBufferById(uniqueId);
247         NOK_RETURN(AllocBuffer(buffer, config));
248     }
249 
250     // 注意这里的uniqueId可能因为重新分配buffer而更新,所以需要再次获取
251     cachedBufferMap_[buffer->GetUniqueId()].state = AVBUFFER_STATE_REQUESTED;
252     return Status::OK;
253 }
254 
DeleteBuffers(uint32_t count)255 void AVBufferQueueImpl::DeleteBuffers(uint32_t count)
256 {
257     FALSE_RETURN(count > 0);
258 
259     while (!freeBufferList_.empty()) {
260         DeleteCachedBufferById(freeBufferList_.front());
261         freeBufferList_.pop_front();
262         count--;
263         if (count <= 0) {
264             return;
265         }
266     }
267 
268     while (!dirtyBufferList_.empty()) {
269         DeleteCachedBufferById(dirtyBufferList_.front());
270         dirtyBufferList_.pop_front();
271         count--;
272         if (count <= 0) {
273             return;
274         }
275     }
276 
277     for (auto&& ele : cachedBufferMap_) {
278         ele.second.isDeleting = true;
279         // we don't have to do anything
280         count--;
281         if (count <= 0) {
282             break;
283         }
284     }
285 }
286 
DeleteCachedBufferById(uint64_t uniqueId)287 void AVBufferQueueImpl::DeleteCachedBufferById(uint64_t uniqueId)
288 {
289     auto it = cachedBufferMap_.find(uniqueId);
290     if (it != cachedBufferMap_.end()) {
291         MEDIA_LOG_D("DeleteCachedBufferById uniqueId:%llu, state:%d", uniqueId, it->second.state);
292         TotalMemoryCalculation(false, it->second.config.capacity);
293         cachedBufferMap_.erase(it);
294     }
295 }
296 
CheckConfig(const AVBufferConfig & config)297 Status AVBufferQueueImpl::CheckConfig(const AVBufferConfig& config)
298 {
299     if (config.memoryType == MemoryType::UNKNOWN_MEMORY) {
300         MEDIA_LOG_D("config.memoryType != MemoryType::UNKNOWN_MEMORY");
301         return Status::ERROR_UNEXPECTED_MEMORY_TYPE;
302     }
303     // memoryType_初始化之后将无法改变。
304     if (memoryType_ != MemoryType::UNKNOWN_MEMORY && config.memoryType != memoryType_) {
305         MEDIA_LOG_D("memoryType_ != MemoryType::UNKNOWN_MEMORY && config.memoryType != memoryType_");
306         return Status::ERROR_UNEXPECTED_MEMORY_TYPE;
307     }
308     memoryType_ = config.memoryType;
309     return Status::OK;
310 }
311 
wait_for(std::unique_lock<std::mutex> & lock,int64_t timeoutUs)312 bool AVBufferQueueImpl::wait_for(std::unique_lock<std::mutex>& lock, int64_t timeoutUs)
313 {
314     MEDIA_LOG_D("wait for free buffer, timeout = %{public}" PRId64, timeoutUs);
315     if (timeoutUs > 0) {
316         return requestCondition.wait_for(
317             lock, std::chrono::microseconds(timeoutUs), [this]() {
318                 return !freeBufferList_.empty() || (GetCachedBufferCount() < GetQueueSize());
319             });
320     } else if (timeoutUs < 0) {
321         requestCondition.wait(lock);
322     }
323     return true;
324 }
325 
RequestBuffer(std::shared_ptr<AVBuffer> & buffer,const AVBufferConfig & config,int32_t timeoutMs)326 Status AVBufferQueueImpl::RequestBuffer(
327     std::shared_ptr<AVBuffer>& buffer, const AVBufferConfig& config, int32_t timeoutMs)
328 {
329     return RequestBufferWaitUs(buffer, config, static_cast<int64_t>(timeoutMs) * MILLISECONDS_TO_MICROSECONDS);
330 }
331 
RequestBufferWaitUs(std::shared_ptr<AVBuffer> & buffer,const AVBufferConfig & config,int64_t timeoutUs)332 Status AVBufferQueueImpl::RequestBufferWaitUs(
333     std::shared_ptr<AVBuffer>& buffer, const AVBufferConfig& config, int64_t timeoutUs)
334 {
335     auto configCopy = config;
336     if (config.memoryType == MemoryType::UNKNOWN_MEMORY) {
337         MEDIA_LOG_D("AVBufferQueueImpl::RequestBuffer config.memoryType unknown, "
338                     "memoryType_ = %u", static_cast<uint32_t>(memoryType_));
339         configCopy.memoryType = memoryType_;
340     }
341 
342     // check param
343     std::unique_lock<std::mutex> lock(queueMutex_);
344     auto res = CheckConfig(configCopy);
345     FALSE_RETURN_V_MSG(res == Status::OK,
346         res, "CheckConfig not OK, code %{public}d", static_cast<int32_t>(res));
347     // dequeue from free list
348     auto ret = PopFromFreeBufferList(buffer, configCopy);
349     if (ret == Status::OK) {
350         return RequestReuseBuffer(buffer, configCopy);
351     }
352 
353     // check queue size
354     if (GetCachedBufferCount() >= GetQueueSize()) {
355         if (!wait_for(lock, timeoutUs)) {
356             MEDIA_LOG_D("FALSE_RETURN_V wait_for(lock, timeoutUs)");
357             return Status::ERROR_WAIT_TIMEOUT;
358         }
359         // 被条件唤醒后,再次尝试从freeBufferList中取buffer
360         ret = PopFromFreeBufferList(buffer, configCopy);
361         if (ret == Status::OK) {
362             return RequestReuseBuffer(buffer, configCopy);
363         }
364         if (GetCachedBufferCount() >= GetQueueSize()) {
365             return Status::ERROR_NO_FREE_BUFFER;
366         }
367     }
368 
369     NOK_RETURN(AllocBuffer(buffer, configCopy));
370     cachedBufferMap_[buffer->GetUniqueId()].state = AVBUFFER_STATE_REQUESTED;
371 
372     return Status::OK;
373 }
374 
InsertFreeBufferInOrder(uint64_t uniqueId)375 void AVBufferQueueImpl::InsertFreeBufferInOrder(uint64_t uniqueId)
376 {
377     for (auto it = freeBufferList_.begin(); it != freeBufferList_.end(); it++) {
378         if ((*it != uniqueId) &&
379                 (cachedBufferMap_[*it].config.capacity >= cachedBufferMap_[uniqueId].config.capacity)) {
380             freeBufferList_.insert(it, uniqueId);
381             return;
382         }
383     }
384     freeBufferList_.emplace_back(uniqueId);
385 }
386 
CancelBuffer(uint64_t uniqueId)387 Status AVBufferQueueImpl::CancelBuffer(uint64_t uniqueId)
388 {
389     FALSE_RETURN_V(cachedBufferMap_.find(uniqueId) != cachedBufferMap_.end(), Status::ERROR_INVALID_BUFFER_ID);
390 
391     FALSE_RETURN_V(cachedBufferMap_[uniqueId].state == AVBUFFER_STATE_REQUESTED ||
392                    cachedBufferMap_[uniqueId].state == AVBUFFER_STATE_PUSHED,
393                    Status::ERROR_INVALID_BUFFER_STATE);
394 
395     InsertFreeBufferInOrder(uniqueId);
396 
397     cachedBufferMap_[uniqueId].state = AVBUFFER_STATE_RELEASED;
398 
399     requestCondition.notify_all();
400 
401     MEDIA_LOG_D("cancel buffer id = %llu", uniqueId);
402 
403     return Status::OK;
404 }
405 
PushBuffer(uint64_t uniqueId,bool available)406 Status AVBufferQueueImpl::PushBuffer(uint64_t uniqueId, bool available)
407 {
408     std::shared_ptr<AVBuffer> buffer = nullptr;
409     {
410         std::lock_guard<std::mutex> lockGuard(queueMutex_);
411         FALSE_RETURN_V(cachedBufferMap_.find(uniqueId) != cachedBufferMap_.end(),
412                        Status::ERROR_INVALID_BUFFER_ID);
413 
414         auto& ele = cachedBufferMap_[uniqueId];
415         if (ele.isDeleting) {
416             DeleteCachedBufferById(uniqueId);
417             MEDIA_LOG_D("delete push buffer uniqueId(%llu)", uniqueId);
418             return Status::OK;
419         }
420 
421         if (available) {
422             FALSE_RETURN_V(ele.buffer->GetConfig().size >= 0, Status::ERROR_INVALID_BUFFER_SIZE);
423         }
424 
425         FALSE_RETURN_V(ele.state == AVBUFFER_STATE_REQUESTED || ele.state == AVBUFFER_STATE_ATTACHED,
426                        Status::ERROR_INVALID_BUFFER_STATE);
427 
428         ele.state = AVBUFFER_STATE_PUSHED;
429         buffer = cachedBufferMap_[uniqueId].buffer;
430     }
431 
432     if (available) {
433         std::lock_guard<std::mutex> lockGuard(brokerListenerMutex_);
434         if (!brokerListeners_.empty() && brokerListeners_.back() != nullptr) {
435             brokerListeners_.back()->OnBufferFilled(buffer);
436             return Status::OK;
437         }
438     }
439 
440     return ReturnBuffer(uniqueId, available);
441 }
442 
PushBuffer(const std::shared_ptr<AVBuffer> & buffer,bool available)443 Status AVBufferQueueImpl::PushBuffer(const std::shared_ptr<AVBuffer>& buffer, bool available)
444 {
445     FALSE_RETURN_V(buffer != nullptr, Status::ERROR_NULL_POINT_BUFFER);
446 
447     return PushBuffer(buffer->GetUniqueId(), available);
448 }
449 
ReturnBuffer(uint64_t uniqueId,bool available)450 Status __attribute__((no_sanitize("cfi"))) AVBufferQueueImpl::ReturnBuffer(uint64_t uniqueId, bool available)
451 {
452     {
453         std::lock_guard<std::mutex> lockGuard(queueMutex_);
454         FALSE_RETURN_V(cachedBufferMap_.find(uniqueId) != cachedBufferMap_.end(),
455                        Status::ERROR_INVALID_BUFFER_ID);
456 
457         if (cachedBufferMap_[uniqueId].isDeleting) {
458             DeleteCachedBufferById(uniqueId);
459             MEDIA_LOG_D("delete return buffer uniqueId(%llu)", uniqueId);
460             return Status::OK;
461         }
462 
463         FALSE_RETURN_V(cachedBufferMap_[uniqueId].state == AVBUFFER_STATE_PUSHED,
464                        Status::ERROR_INVALID_BUFFER_STATE);
465 
466         if (!available) {
467             NOK_RETURN(CancelBuffer(uniqueId));
468         } else {
469             auto& config = cachedBufferMap_[uniqueId].buffer->GetConfig();
470             bool isEosBuffer = cachedBufferMap_[uniqueId].buffer->flag_ & (uint32_t)(Plugins::AVBufferFlag::EOS);
471             if (!isEosBuffer) {
472                 FALSE_RETURN_V(config.size > 0, Status::ERROR_INVALID_BUFFER_SIZE);
473             }
474             cachedBufferMap_[uniqueId].config = config;
475             cachedBufferMap_[uniqueId].state = AVBUFFER_STATE_RETURNED;
476             dirtyBufferList_.push_back(uniqueId);
477         }
478     }
479 
480     if (!available) {
481         std::lock_guard<std::mutex> lockGuard(producerListenerMutex_);
482         if (producerListener_ != nullptr) {
483             producerListener_->OnBufferAvailable();
484         }
485         return Status::OK;
486     }
487 
488     std::lock_guard<std::mutex> lockGuard(consumerListenerMutex_);
489     FALSE_RETURN_V(consumerListener_ != nullptr, Status::ERROR_NO_CONSUMER_LISTENER);
490     consumerListener_->OnBufferAvailable();
491 
492     return Status::OK;
493 }
494 
ReturnBuffer(const std::shared_ptr<AVBuffer> & buffer,bool available)495 Status AVBufferQueueImpl::ReturnBuffer(const std::shared_ptr<AVBuffer>& buffer, bool available)
496 {
497     FALSE_RETURN_V(buffer != nullptr, Status::ERROR_NULL_POINT_BUFFER);
498 
499     return ReturnBuffer(buffer->GetUniqueId(), available);
500 }
501 
GetMemoryUsage()502 uint32_t AVBufferQueueImpl::GetMemoryUsage()
503 {
504     return memoryUsage_.load();
505 }
506 
SetQueueSizeAndAttachBuffer(uint32_t size,std::shared_ptr<AVBuffer> & buffer,bool isFilled)507 Status AVBufferQueueImpl::SetQueueSizeAndAttachBuffer(uint32_t size,
508     std::shared_ptr<AVBuffer>& buffer, bool isFilled)
509 {
510     FALSE_RETURN_V(buffer != nullptr, Status::ERROR_NULL_POINT_BUFFER);
511     auto config = buffer->GetConfig();
512     auto uniqueId = buffer->GetUniqueId();
513     {
514         std::lock_guard<std::mutex> lockGuard(queueMutex_);
515         if (size >= 0 && size <= AVBUFFER_QUEUE_MAX_QUEUE_SIZE && size != size_) {
516             SetQueueSizeBeforeAttachBufferLocked(size);
517         }
518         FALSE_RETURN_V(cachedBufferMap_.find(uniqueId) == cachedBufferMap_.end(),
519                        Status::ERROR_INVALID_BUFFER_ID);
520         NOK_RETURN(CheckConfig(config));
521         Status result = AttachAvailableBufferLocked(buffer);
522         FALSE_RETURN_V(result == Status::OK, result);
523     }
524     if (isFilled) {
525         return PushBufferOnFilled(uniqueId, isFilled);
526     }
527     return ReleaseBuffer(uniqueId);
528 }
529 
TotalMemoryCalculation(bool isAdd,int32_t capacity)530 void AVBufferQueueImpl::TotalMemoryCalculation(bool isAdd, int32_t capacity)
531 {
532     FALSE_RETURN(capacity > 0);
533     uint32_t capacityUint = static_cast<uint32_t>(capacity);
534     if (isAdd) {
535         memoryUsage_.fetch_add(capacityUint);
536     } else {
537         FALSE_RETURN(capacityUint <= memoryUsage_.load());
538         memoryUsage_.fetch_sub(capacityUint);
539     }
540 }
541 
AttachAvailableBufferLocked(std::shared_ptr<AVBuffer> & buffer)542 Status AVBufferQueueImpl::AttachAvailableBufferLocked(std::shared_ptr<AVBuffer>& buffer)
543 {
544     auto config = buffer->GetConfig();
545     auto uniqueId = buffer->GetUniqueId();
546     AVBufferElement ele = {
547         .config = config,
548         .state = AVBUFFER_STATE_ATTACHED,
549         .isDeleting = false,
550         .buffer = buffer
551     };
552 
553     auto cachedCount = GetCachedBufferCount();
554     auto queueSize = GetQueueSize();
555     if (cachedCount >= queueSize) {
556         auto validCount = static_cast<uint32_t>(dirtyBufferList_.size() + freeBufferList_.size());
557         auto toBeDeleteCount = cachedCount - queueSize;
558         // 这里表示有可以删除的buffer,或者
559         if (validCount > toBeDeleteCount) {
560             // 在什么场景下需要在此处删除buffer?
561             DeleteBuffers(toBeDeleteCount + 1); // 多删除一个,用于attach当前buffer
562             cachedBufferMap_[uniqueId] = ele;
563             TotalMemoryCalculation(true, ele.config.capacity);
564             MEDIA_LOG_D("uniqueId(%llu) attached with delete", uniqueId);
565         } else {
566             MEDIA_LOG_E("attach failed, out of range");
567             return Status::ERROR_OUT_OF_RANGE;
568         }
569     } else {
570         cachedBufferMap_[uniqueId] = ele;
571         TotalMemoryCalculation(true, ele.config.capacity);
572         MEDIA_LOG_D("uniqueId(%llu) attached without delete", uniqueId);
573     }
574     return Status::OK;
575 }
576 
PushBufferOnFilled(uint64_t uniqueId,bool isFilled)577 Status AVBufferQueueImpl::PushBufferOnFilled(uint64_t uniqueId, bool isFilled)
578 {
579     auto ret = PushBuffer(uniqueId, isFilled);
580     if (ret != Status::OK) {
581         // PushBuffer失败,强制Detach
582         DetachBuffer(uniqueId, true);
583     }
584     return ret;
585 }
586 
SetQueueSizeBeforeAttachBufferLocked(uint32_t size)587 void AVBufferQueueImpl::SetQueueSizeBeforeAttachBufferLocked(uint32_t size)
588 {
589     if (size > size_) {
590         size_ = size;
591         if (!disableAlloc_) {
592             requestCondition.notify_all();
593         }
594     } else {
595         DeleteBuffers(size_ - size);
596         size_ = size;
597     }
598 }
599 
AttachBuffer(std::shared_ptr<AVBuffer> & buffer,bool isFilled)600 Status AVBufferQueueImpl::AttachBuffer(std::shared_ptr<AVBuffer>& buffer, bool isFilled)
601 {
602     FALSE_RETURN_V(buffer != nullptr, Status::ERROR_NULL_POINT_BUFFER);
603 
604     auto config = buffer->GetConfig();
605     auto uniqueId = buffer->GetUniqueId();
606     {
607         std::lock_guard<std::mutex> lockGuard(queueMutex_);
608         FALSE_RETURN_V(cachedBufferMap_.find(uniqueId) == cachedBufferMap_.end(),
609                        Status::ERROR_INVALID_BUFFER_ID);
610 
611         NOK_RETURN(CheckConfig(config));
612 
613         Status result = AttachAvailableBufferLocked(buffer);
614         FALSE_RETURN_V(result == Status::OK, result);
615     }
616 
617     if (isFilled) {
618         return PushBufferOnFilled(uniqueId, isFilled);
619     }
620 
621     return ReleaseBuffer(uniqueId);
622 }
623 
DetachBuffer(uint64_t uniqueId,bool force)624 Status AVBufferQueueImpl::DetachBuffer(uint64_t uniqueId, bool force)
625 {
626     FALSE_RETURN_V_NOLOG(cachedBufferMap_.find(uniqueId) != cachedBufferMap_.end(),
627         Status::ERROR_INVALID_BUFFER_ID);
628 
629     const auto& ele = cachedBufferMap_[uniqueId];
630 
631     if (!force) {
632         // 只有生产者或消费者在获取到buffer后才能detach
633         if (ele.state == AVBUFFER_STATE_REQUESTED) {
634             MEDIA_LOG_D("detach buffer(%llu) on state requested", uniqueId);
635         } else if (ele.state == AVBUFFER_STATE_ACQUIRED) {
636             MEDIA_LOG_D("detach buffer(%llu) on state acquired", uniqueId);
637         } else {
638             MEDIA_LOGW_LIMIT(LOG_LIMIT_LOW_FREQ, "detach buffer(" PUBLIC_LOG_U64 ") on state "
639                 PUBLIC_LOG_D32 "forbidden", uniqueId, ele.state);
640             return Status::ERROR_INVALID_BUFFER_STATE;
641         }
642     }
643     TotalMemoryCalculation(false, ele.config.capacity);
644     cachedBufferMap_.erase(uniqueId);
645 
646     return Status::OK;
647 }
648 
DetachBuffer(uint64_t uniqueId)649 Status AVBufferQueueImpl::DetachBuffer(uint64_t uniqueId)
650 {
651     std::lock_guard<std::mutex> lockGuard(queueMutex_);
652     return DetachBuffer(uniqueId, false);
653 }
654 
DetachBuffer(const std::shared_ptr<AVBuffer> & buffer)655 Status AVBufferQueueImpl::DetachBuffer(const std::shared_ptr<AVBuffer>& buffer)
656 {
657     FALSE_RETURN_V(buffer != nullptr, Status::ERROR_NULL_POINT_BUFFER);
658 
659     return DetachBuffer(buffer->GetUniqueId());
660 }
661 
AcquireBuffer(std::shared_ptr<AVBuffer> & buffer)662 Status AVBufferQueueImpl::AcquireBuffer(std::shared_ptr<AVBuffer>& buffer)
663 {
664     std::lock_guard<std::mutex> lockGuard(queueMutex_);
665     auto ret = PopFromDirtyBufferList(buffer);
666     FALSE_RETURN_V_MSG_D(ret == Status::OK, ret, "acquire buffer failed");
667 
668     cachedBufferMap_[buffer->GetUniqueId()].state = AVBUFFER_STATE_ACQUIRED;
669 
670     return Status::OK;
671 }
672 
ReleaseBuffer(uint64_t uniqueId)673 Status AVBufferQueueImpl::ReleaseBuffer(uint64_t uniqueId)
674 {
675     {
676         std::lock_guard<std::mutex> lockGuard(queueMutex_);
677         FALSE_RETURN_V(cachedBufferMap_.find(uniqueId) != cachedBufferMap_.end(), Status::ERROR_INVALID_BUFFER_ID);
678 
679         FALSE_RETURN_V(cachedBufferMap_[uniqueId].state == AVBUFFER_STATE_ACQUIRED ||
680             cachedBufferMap_[uniqueId].state == AVBUFFER_STATE_ATTACHED, Status::ERROR_INVALID_BUFFER_STATE);
681 
682         cachedBufferMap_[uniqueId].state = AVBUFFER_STATE_RELEASED;
683         if (cachedBufferMap_[uniqueId].isDeleting) {
684             DeleteCachedBufferById(uniqueId);
685             return Status::OK;
686         }
687 
688         InsertFreeBufferInOrder(uniqueId);
689 
690         requestCondition.notify_all();
691     }
692 
693     // 注意:此时通知生产者有buffer可用,但实际有可能已经被request wait的生产者获取
694     std::lock_guard<std::mutex> lockGuard(producerListenerMutex_);
695     if (producerListener_ != nullptr) {
696         producerListener_->OnBufferAvailable();
697     }
698 
699     return Status::OK;
700 }
701 
ReleaseBuffer(const std::shared_ptr<AVBuffer> & buffer)702 Status AVBufferQueueImpl::ReleaseBuffer(const std::shared_ptr<AVBuffer>& buffer)
703 {
704     FALSE_RETURN_V(buffer != nullptr, Status::ERROR_NULL_POINT_BUFFER);
705 
706     return ReleaseBuffer(buffer->GetUniqueId());
707 }
708 
Clear()709 Status AVBufferQueueImpl::Clear()
710 {
711     MEDIA_LOG_E("AVBufferQueueImpl Clear");
712     std::lock_guard<std::mutex> lockGuard(queueMutex_);
713     dirtyBufferList_.clear();
714     for (auto it = cachedBufferMap_.begin(); it != cachedBufferMap_.end(); it++) {
715         if (it->second.state == AVBUFFER_STATE_PUSHED || it->second.state == AVBUFFER_STATE_RETURNED) {
716             it->second.state = AVBUFFER_STATE_RELEASED;
717             InsertFreeBufferInOrder(it->first);
718         }
719     }
720     requestCondition.notify_all();
721     return Status::OK;
722 }
723 
SetBrokerListener(sptr<IBrokerListener> & listener)724 Status AVBufferQueueImpl::SetBrokerListener(sptr<IBrokerListener>& listener)
725 {
726     std::lock_guard<std::mutex> lockGuard(brokerListenerMutex_);
727     brokerListeners_.push_back(listener);
728     return Status::OK;
729 }
730 
RemoveBrokerListener(sptr<IBrokerListener> & listener)731 Status AVBufferQueueImpl::RemoveBrokerListener(sptr<IBrokerListener>& listener)
732 {
733     std::lock_guard<std::mutex> lockGuard(brokerListenerMutex_);
734     if (!brokerListeners_.empty() && listener == brokerListeners_.back()) {
735         brokerListeners_.pop_back();
736         MEDIA_LOG_I("RemoveBrokerListener success, size: %{public}d", brokerListeners_.size());
737     } else {
738         MEDIA_LOG_E("removed item is not the back one.");
739     }
740     return Status::OK;
741 }
742 
SetProducerListener(sptr<IProducerListener> & listener)743 Status AVBufferQueueImpl::SetProducerListener(sptr<IProducerListener>& listener)
744 {
745     std::lock_guard<std::mutex> lockGuard(producerListenerMutex_);
746     producerListener_ = listener;
747 
748     return Status::OK;
749 }
750 
SetConsumerListener(sptr<IConsumerListener> & listener)751 Status AVBufferQueueImpl::SetConsumerListener(sptr<IConsumerListener>& listener)
752 {
753     std::lock_guard<std::mutex> lockGuard(consumerListenerMutex_);
754     consumerListener_ = listener;
755 
756     return Status::OK;
757 }
758 
GetFilledBufferSize()759 uint32_t AVBufferQueueImpl::GetFilledBufferSize()
760 {
761     std::lock_guard<std::mutex> lockGuard(queueMutex_);
762     return dirtyBufferList_.size();
763 }
764 
765 } // namespace Media
766 } // namespace OHOS
767