• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021-2025 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "avbuffer_queue_consumer_impl.h"
17 #include "avbuffer_queue_impl.h"
18 #include "avbuffer_queue_producer_impl.h"
19 #include "common/log.h"
20 #include "meta/media_types.h"
21 
22 namespace {
23 constexpr OHOS::HiviewDFX::HiLogLabel LABEL = { LOG_CORE, LOG_DOMAIN_FOUNDATION, "AVBufferQueue" };
24 }
25 
26 namespace OHOS {
27 namespace Media {
28 
Create(uint32_t size,MemoryType type,const std::string & name,bool disableAlloc)29 std::shared_ptr<AVBufferQueue> AVBufferQueue::Create(
30     uint32_t size, MemoryType type, const std::string& name, bool disableAlloc)
31 {
32     MEDIA_LOG_D("AVBufferQueue::Create size = %u, type = %u, name = %s",
33                 size, static_cast<uint32_t>(type), name.c_str());
34     return std::make_shared<AVBufferQueueImpl>(size, type, name, disableAlloc);
35 }
36 
GetLocalProducer()37 std::shared_ptr<AVBufferQueueProducer> AVBufferQueueImpl::GetLocalProducer()
38 {
39     std::lock_guard<std::mutex> lockGuard(producerCreatorMutex_);
40     std::shared_ptr<AVBufferQueueProducerImpl> producer = nullptr;
41     if (localProducer_.expired()) {
42         auto shared_this = shared_from_this();
43         FALSE_RETURN_V(shared_this != nullptr, nullptr);
44         producer = std::make_shared<AVBufferQueueProducerImpl>(shared_this);
45         localProducer_ = producer;
46     }
47 
48     return localProducer_.lock();
49 }
50 
GetLocalConsumer()51 std::shared_ptr<AVBufferQueueConsumer> AVBufferQueueImpl::GetLocalConsumer()
52 {
53     std::lock_guard<std::mutex> lockGuard(consumerCreatorMutex_);
54     std::shared_ptr<AVBufferQueueConsumerImpl> consumer = nullptr;
55     if (localConsumer_.expired()) {
56         auto shared_this = shared_from_this();
57         FALSE_RETURN_V(shared_this != nullptr, nullptr);
58         consumer = std::make_shared<AVBufferQueueConsumerImpl>(shared_this);
59         localConsumer_ = consumer;
60     }
61     return localConsumer_.lock();
62 }
63 
GetProducer()64 sptr<AVBufferQueueProducer> AVBufferQueueImpl::GetProducer()
65 {
66     std::lock_guard<std::mutex> lockGuard(producerCreatorMutex_);
67     sptr<AVBufferQueueProducerImpl> producer = nullptr;
68     if (producer_ == nullptr || producer_->GetSptrRefCount() <= 0) {
69         auto shared_this = shared_from_this();
70         FALSE_RETURN_V(shared_this != nullptr, nullptr);
71         producer = new AVBufferQueueProducerImpl(shared_this);
72         producer_ = producer;
73     }
74 
75     return producer_.promote();
76 }
77 
GetConsumer()78 sptr<AVBufferQueueConsumer> AVBufferQueueImpl::GetConsumer()
79 {
80     std::lock_guard<std::mutex> lockGuard(consumerCreatorMutex_);
81     sptr<AVBufferQueueConsumerImpl> consumer = nullptr;
82     if (consumer_ == nullptr || consumer_->GetSptrRefCount() <= 0) {
83         auto shared_this = shared_from_this();
84         FALSE_RETURN_V(shared_this != nullptr, nullptr);
85         consumer = new AVBufferQueueConsumerImpl(shared_this);
86         consumer_ = consumer;
87     }
88 
89     return consumer_.promote();
90 }
91 
AVBufferQueueImpl(const std::string & name)92 AVBufferQueueImpl::AVBufferQueueImpl(const std::string &name)
93     : AVBufferQueue(), name_(name), size_(0), memoryType_(MemoryType::UNKNOWN_MEMORY), disableAlloc_(false) {}
94 
AVBufferQueueImpl(uint32_t size,MemoryType type,const std::string & name,bool disableAlloc)95 AVBufferQueueImpl::AVBufferQueueImpl(uint32_t size, MemoryType type, const std::string &name, bool disableAlloc)
96     : AVBufferQueue(), name_(name), size_(size), memoryType_(type), disableAlloc_(disableAlloc)
97 {
98     if (size_ > AVBUFFER_QUEUE_MAX_QUEUE_SIZE) {
99         size_ = AVBUFFER_QUEUE_MAX_QUEUE_SIZE;
100     }
101 }
102 
GetQueueSize()103 uint32_t AVBufferQueueImpl::GetQueueSize()
104 {
105     return size_;
106 }
107 
SetQueueSize(uint32_t size)108 Status AVBufferQueueImpl::SetQueueSize(uint32_t size)
109 {
110     FALSE_RETURN_V(size >= 0 && size <= AVBUFFER_QUEUE_MAX_QUEUE_SIZE && size != size_,
111                    Status::ERROR_INVALID_BUFFER_SIZE);
112 
113     return SetLargerQueueSize(size);
114 }
115 
SetLargerQueueSize(uint32_t size)116 Status AVBufferQueueImpl::SetLargerQueueSize(uint32_t size)
117 {
118     FALSE_RETURN_V(size >= 0 && size <= AVBUFFER_QUEUE_MAX_QUEUE_SIZE_FOR_LARGER  && size != size_,
119                    Status::ERROR_INVALID_BUFFER_SIZE);
120 
121     if (size > size_) {
122         size_ = size;
123         if (!disableAlloc_) {
124             requestCondition.notify_all();
125         }
126     } else {
127         std::lock_guard<std::mutex> lockGuard(queueMutex_);
128         DeleteBuffers(size_ - size);
129         size_ = size;
130     }
131 
132     return Status::OK;
133 }
134 
ClearBufferIf(std::function<bool (const std::shared_ptr<AVBuffer> &)> pred)135 Status AVBufferQueueImpl::ClearBufferIf(std::function<bool(const std::shared_ptr<AVBuffer>&)> pred)
136 {
137     std::lock_guard<std::mutex> lockGuard(queueMutex_);
138     for (auto dirtyIt = dirtyBufferList_.begin(); dirtyIt != dirtyBufferList_.end();) {
139         uint64_t uniqueId = *dirtyIt;
140         auto cacheIt = cachedBufferMap_.find(uniqueId);
141         if (cacheIt == cachedBufferMap_.end()) {
142             MEDIA_LOG_E("unexpected buffer uniqueId=" PUBLIC_LOG_U64, uniqueId);
143             ++dirtyIt;
144             continue;
145         }
146         if (cacheIt->second.state != AVBUFFER_STATE_PUSHED && cacheIt->second.state != AVBUFFER_STATE_RETURNED) {
147             MEDIA_LOG_I("ignore unexpected buffer status uniqueId=" PUBLIC_LOG_U64 ",state= " PUBLIC_LOG_D32,
148                 uniqueId,
149                 static_cast<int32_t>(cacheIt->second.state));
150             ++dirtyIt;
151             continue;
152         }
153 
154         if (pred(cacheIt->second.buffer)) {
155             MEDIA_LOG_D("ClearBufferIf pred ok uniqueId=" PUBLIC_LOG_U64 ",pts=" PUBLIC_LOG_D64,
156                 uniqueId,
157                 cacheIt->second.buffer->pts_);
158             cacheIt->second.state = AVBUFFER_STATE_RELEASED;
159             InsertFreeBufferInOrder(uniqueId);
160             dirtyIt = dirtyBufferList_.erase(dirtyIt);
161         } else {
162             ++dirtyIt;
163         }
164     }
165     requestCondition.notify_all();
166     return Status::OK;
167 }
168 
IsBufferInQueue(const std::shared_ptr<AVBuffer> & buffer)169 bool AVBufferQueueImpl::IsBufferInQueue(const std::shared_ptr<AVBuffer>& buffer)
170 {
171     FALSE_RETURN_V(buffer != nullptr, false);
172     auto uniqueId = buffer->GetUniqueId();
173     return cachedBufferMap_.find(uniqueId) != cachedBufferMap_.end();
174 }
175 
GetCachedBufferCount() const176 uint32_t AVBufferQueueImpl::GetCachedBufferCount() const
177 {
178     // 确保cachedBufferMap_.size()不会超过MAX_UINT32
179     return static_cast<uint32_t>(cachedBufferMap_.size());
180 }
181 
PopFromFreeBufferList(std::shared_ptr<AVBuffer> & buffer,const AVBufferConfig & config)182 Status AVBufferQueueImpl::PopFromFreeBufferList(std::shared_ptr<AVBuffer>& buffer, const AVBufferConfig& config)
183 {
184     for (auto it = freeBufferList_.begin(); it != freeBufferList_.end(); it++) {
185         if (config <= cachedBufferMap_[*it].config) {
186             buffer = cachedBufferMap_[*it].buffer;
187             freeBufferList_.erase(it);
188             return Status::OK;
189         }
190     }
191 
192     if (freeBufferList_.empty()) {
193         buffer = nullptr;
194         // 没有可以重用的freeBuffer
195         return Status::ERROR_NO_FREE_BUFFER;
196     }
197 
198     buffer = cachedBufferMap_[freeBufferList_.front()].buffer;
199     freeBufferList_.pop_front();
200 
201     return Status::OK;
202 }
203 
PopFromDirtyBufferList(std::shared_ptr<AVBuffer> & buffer)204 Status AVBufferQueueImpl::PopFromDirtyBufferList(std::shared_ptr<AVBuffer>& buffer)
205 {
206     FALSE_RETURN_V_NOLOG(!dirtyBufferList_.empty(), Status::ERROR_NO_DIRTY_BUFFER);
207 
208     buffer = cachedBufferMap_[dirtyBufferList_.front()].buffer;
209     dirtyBufferList_.pop_front();
210     return Status::OK;
211 }
212 
AllocBuffer(std::shared_ptr<AVBuffer> & buffer,const AVBufferConfig & config)213 Status AVBufferQueueImpl::AllocBuffer(std::shared_ptr<AVBuffer>& buffer, const AVBufferConfig& config)
214 {
215     auto bufferImpl = AVBuffer::CreateAVBuffer(config);
216     FALSE_RETURN_V(bufferImpl != nullptr, Status::ERROR_CREATE_BUFFER);
217 
218     auto uniqueId = bufferImpl->GetUniqueId();
219     AVBufferElement ele = {
220         .config = bufferImpl->GetConfig(),
221         .state = AVBUFFER_STATE_RELEASED,
222         .isDeleting = false,
223         .buffer = bufferImpl,
224     };
225     cachedBufferMap_[uniqueId] = ele;
226     buffer = bufferImpl;
227 
228     return Status::OK;
229 }
230 
RequestReuseBuffer(std::shared_ptr<AVBuffer> & buffer,const AVBufferConfig & config)231 Status AVBufferQueueImpl::RequestReuseBuffer(std::shared_ptr<AVBuffer>& buffer, const AVBufferConfig& config)
232 {
233     FALSE_RETURN_V(buffer != nullptr, Status::ERROR_NULL_POINT_BUFFER);
234 
235     auto uniqueId = buffer->GetUniqueId();
236     FALSE_RETURN_V(cachedBufferMap_.find(uniqueId) != cachedBufferMap_.end(), Status::ERROR_CREATE_BUFFER);
237 
238     if (config <= cachedBufferMap_[uniqueId].config) {
239         // 不需要重新分配,直接更新buffer大小
240         cachedBufferMap_[uniqueId].config.size = config.size;
241     } else {
242         // 重新分配
243         DeleteCachedBufferById(uniqueId);
244         NOK_RETURN(AllocBuffer(buffer, config));
245     }
246 
247     // 注意这里的uniqueId可能因为重新分配buffer而更新,所以需要再次获取
248     cachedBufferMap_[buffer->GetUniqueId()].state = AVBUFFER_STATE_REQUESTED;
249     return Status::OK;
250 }
251 
DeleteBuffers(uint32_t count)252 void AVBufferQueueImpl::DeleteBuffers(uint32_t count)
253 {
254     FALSE_RETURN(count > 0);
255 
256     while (!freeBufferList_.empty()) {
257         DeleteCachedBufferById(freeBufferList_.front());
258         freeBufferList_.pop_front();
259         count--;
260         if (count <= 0) {
261             return;
262         }
263     }
264 
265     while (!dirtyBufferList_.empty()) {
266         DeleteCachedBufferById(dirtyBufferList_.front());
267         dirtyBufferList_.pop_front();
268         count--;
269         if (count <= 0) {
270             return;
271         }
272     }
273 
274     for (auto&& ele : cachedBufferMap_) {
275         ele.second.isDeleting = true;
276         // we don't have to do anything
277         count--;
278         if (count <= 0) {
279             break;
280         }
281     }
282 }
283 
DeleteCachedBufferById(uint64_t uniqueId)284 void AVBufferQueueImpl::DeleteCachedBufferById(uint64_t uniqueId)
285 {
286     auto it = cachedBufferMap_.find(uniqueId);
287     if (it != cachedBufferMap_.end()) {
288         MEDIA_LOG_D("DeleteCachedBufferById uniqueId:%llu, state:%d", uniqueId, it->second.state);
289         cachedBufferMap_.erase(it);
290     }
291 }
292 
CheckConfig(const AVBufferConfig & config)293 Status AVBufferQueueImpl::CheckConfig(const AVBufferConfig& config)
294 {
295     if (config.memoryType == MemoryType::UNKNOWN_MEMORY) {
296         MEDIA_LOG_D("config.memoryType != MemoryType::UNKNOWN_MEMORY");
297         return Status::ERROR_UNEXPECTED_MEMORY_TYPE;
298     }
299     // memoryType_初始化之后将无法改变。
300     if (memoryType_ != MemoryType::UNKNOWN_MEMORY && config.memoryType != memoryType_) {
301         MEDIA_LOG_D("memoryType_ != MemoryType::UNKNOWN_MEMORY && config.memoryType != memoryType_");
302         return Status::ERROR_UNEXPECTED_MEMORY_TYPE;
303     }
304     memoryType_ = config.memoryType;
305     return Status::OK;
306 }
307 
wait_for(std::unique_lock<std::mutex> & lock,int32_t timeoutMs)308 bool AVBufferQueueImpl::wait_for(std::unique_lock<std::mutex>& lock, int32_t timeoutMs)
309 {
310     MEDIA_LOG_D("wait for free buffer, timeout = %d", timeoutMs);
311     if (timeoutMs > 0) {
312         return requestCondition.wait_for(
313             lock, std::chrono::milliseconds(timeoutMs), [this]() {
314                 return !freeBufferList_.empty() || (GetCachedBufferCount() < GetQueueSize());
315             });
316     } else if (timeoutMs < 0) {
317         requestCondition.wait(lock);
318     }
319     return true;
320 }
321 
RequestBuffer(std::shared_ptr<AVBuffer> & buffer,const AVBufferConfig & config,int32_t timeoutMs)322 Status AVBufferQueueImpl::RequestBuffer(
323     std::shared_ptr<AVBuffer>& buffer, const AVBufferConfig& config, int32_t timeoutMs)
324 {
325     auto configCopy = config;
326     if (config.memoryType == MemoryType::UNKNOWN_MEMORY) {
327         MEDIA_LOG_D("AVBufferQueueImpl::RequestBuffer config.memoryType unknown, "
328                     "memoryType_ = %u", static_cast<uint32_t>(memoryType_));
329         configCopy.memoryType = memoryType_;
330     }
331 
332     // check param
333     std::unique_lock<std::mutex> lock(queueMutex_);
334     auto res = CheckConfig(configCopy);
335     FALSE_RETURN_V_MSG(res == Status::OK,
336         res, "CheckConfig not OK, code %{public}d", static_cast<int32_t>(res));
337     // dequeue from free list
338     auto ret = PopFromFreeBufferList(buffer, configCopy);
339     if (ret == Status::OK) {
340         return RequestReuseBuffer(buffer, configCopy);
341     }
342 
343     // check queue size
344     if (GetCachedBufferCount() >= GetQueueSize()) {
345         if (!wait_for(lock, timeoutMs)) {
346             MEDIA_LOG_D("FALSE_RETURN_V wait_for(lock, timeoutMs)");
347             return Status::ERROR_WAIT_TIMEOUT;
348         }
349         // 被条件唤醒后,再次尝试从freeBufferList中取buffer
350         ret = PopFromFreeBufferList(buffer, configCopy);
351         if (ret == Status::OK) {
352             return RequestReuseBuffer(buffer, configCopy);
353         }
354         if (GetCachedBufferCount() >= GetQueueSize()) {
355             return Status::ERROR_NO_FREE_BUFFER;
356         }
357     }
358 
359     NOK_RETURN(AllocBuffer(buffer, configCopy));
360     cachedBufferMap_[buffer->GetUniqueId()].state = AVBUFFER_STATE_REQUESTED;
361 
362     return Status::OK;
363 }
364 
InsertFreeBufferInOrder(uint64_t uniqueId)365 void AVBufferQueueImpl::InsertFreeBufferInOrder(uint64_t uniqueId)
366 {
367     for (auto it = freeBufferList_.begin(); it != freeBufferList_.end(); it++) {
368         if ((*it != uniqueId) &&
369                 (cachedBufferMap_[*it].config.capacity >= cachedBufferMap_[uniqueId].config.capacity)) {
370             freeBufferList_.insert(it, uniqueId);
371             return;
372         }
373     }
374     freeBufferList_.emplace_back(uniqueId);
375 }
376 
CancelBuffer(uint64_t uniqueId)377 Status AVBufferQueueImpl::CancelBuffer(uint64_t uniqueId)
378 {
379     FALSE_RETURN_V(cachedBufferMap_.find(uniqueId) != cachedBufferMap_.end(), Status::ERROR_INVALID_BUFFER_ID);
380 
381     FALSE_RETURN_V(cachedBufferMap_[uniqueId].state == AVBUFFER_STATE_REQUESTED ||
382                    cachedBufferMap_[uniqueId].state == AVBUFFER_STATE_PUSHED,
383                    Status::ERROR_INVALID_BUFFER_STATE);
384 
385     InsertFreeBufferInOrder(uniqueId);
386 
387     cachedBufferMap_[uniqueId].state = AVBUFFER_STATE_RELEASED;
388 
389     requestCondition.notify_all();
390 
391     MEDIA_LOG_D("cancel buffer id = %llu", uniqueId);
392 
393     return Status::OK;
394 }
395 
PushBuffer(uint64_t uniqueId,bool available)396 Status AVBufferQueueImpl::PushBuffer(uint64_t uniqueId, bool available)
397 {
398     std::shared_ptr<AVBuffer> buffer = nullptr;
399     {
400         std::lock_guard<std::mutex> lockGuard(queueMutex_);
401         FALSE_RETURN_V(cachedBufferMap_.find(uniqueId) != cachedBufferMap_.end(),
402                        Status::ERROR_INVALID_BUFFER_ID);
403 
404         auto& ele = cachedBufferMap_[uniqueId];
405         if (ele.isDeleting) {
406             DeleteCachedBufferById(uniqueId);
407             MEDIA_LOG_D("delete push buffer uniqueId(%llu)", uniqueId);
408             return Status::OK;
409         }
410 
411         if (available) {
412             FALSE_RETURN_V(ele.buffer->GetConfig().size >= 0, Status::ERROR_INVALID_BUFFER_SIZE);
413         }
414 
415         FALSE_RETURN_V(ele.state == AVBUFFER_STATE_REQUESTED || ele.state == AVBUFFER_STATE_ATTACHED,
416                        Status::ERROR_INVALID_BUFFER_STATE);
417 
418         ele.state = AVBUFFER_STATE_PUSHED;
419         buffer = cachedBufferMap_[uniqueId].buffer;
420     }
421 
422     if (available) {
423         std::lock_guard<std::mutex> lockGuard(brokerListenerMutex_);
424         if (!brokerListeners_.empty() && brokerListeners_.back() != nullptr) {
425             brokerListeners_.back()->OnBufferFilled(buffer);
426             return Status::OK;
427         }
428     }
429 
430     return ReturnBuffer(uniqueId, available);
431 }
432 
PushBuffer(const std::shared_ptr<AVBuffer> & buffer,bool available)433 Status AVBufferQueueImpl::PushBuffer(const std::shared_ptr<AVBuffer>& buffer, bool available)
434 {
435     FALSE_RETURN_V(buffer != nullptr, Status::ERROR_NULL_POINT_BUFFER);
436 
437     return PushBuffer(buffer->GetUniqueId(), available);
438 }
439 
ReturnBuffer(uint64_t uniqueId,bool available)440 Status __attribute__((no_sanitize("cfi"))) AVBufferQueueImpl::ReturnBuffer(uint64_t uniqueId, bool available)
441 {
442     {
443         std::lock_guard<std::mutex> lockGuard(queueMutex_);
444         FALSE_RETURN_V(cachedBufferMap_.find(uniqueId) != cachedBufferMap_.end(),
445                        Status::ERROR_INVALID_BUFFER_ID);
446 
447         if (cachedBufferMap_[uniqueId].isDeleting) {
448             DeleteCachedBufferById(uniqueId);
449             MEDIA_LOG_D("delete return buffer uniqueId(%llu)", uniqueId);
450             return Status::OK;
451         }
452 
453         FALSE_RETURN_V(cachedBufferMap_[uniqueId].state == AVBUFFER_STATE_PUSHED,
454                        Status::ERROR_INVALID_BUFFER_STATE);
455 
456         if (!available) {
457             NOK_RETURN(CancelBuffer(uniqueId));
458         } else {
459             auto& config = cachedBufferMap_[uniqueId].buffer->GetConfig();
460             bool isEosBuffer = cachedBufferMap_[uniqueId].buffer->flag_ & (uint32_t)(Plugins::AVBufferFlag::EOS);
461             if (!isEosBuffer) {
462                 FALSE_RETURN_V(config.size > 0, Status::ERROR_INVALID_BUFFER_SIZE);
463             }
464             cachedBufferMap_[uniqueId].config = config;
465             cachedBufferMap_[uniqueId].state = AVBUFFER_STATE_RETURNED;
466             dirtyBufferList_.push_back(uniqueId);
467         }
468     }
469 
470     if (!available) {
471         std::lock_guard<std::mutex> lockGuard(producerListenerMutex_);
472         if (producerListener_ != nullptr) {
473             producerListener_->OnBufferAvailable();
474         }
475         return Status::OK;
476     }
477 
478     std::lock_guard<std::mutex> lockGuard(consumerListenerMutex_);
479     FALSE_RETURN_V(consumerListener_ != nullptr, Status::ERROR_NO_CONSUMER_LISTENER);
480     consumerListener_->OnBufferAvailable();
481 
482     return Status::OK;
483 }
484 
ReturnBuffer(const std::shared_ptr<AVBuffer> & buffer,bool available)485 Status AVBufferQueueImpl::ReturnBuffer(const std::shared_ptr<AVBuffer>& buffer, bool available)
486 {
487     FALSE_RETURN_V(buffer != nullptr, Status::ERROR_NULL_POINT_BUFFER);
488 
489     return ReturnBuffer(buffer->GetUniqueId(), available);
490 }
491 
SetQueueSizeAndAttachBuffer(uint32_t size,std::shared_ptr<AVBuffer> & buffer,bool isFilled)492 Status AVBufferQueueImpl::SetQueueSizeAndAttachBuffer(uint32_t size,
493     std::shared_ptr<AVBuffer>& buffer, bool isFilled)
494 {
495     FALSE_RETURN_V(buffer != nullptr, Status::ERROR_NULL_POINT_BUFFER);
496     auto config = buffer->GetConfig();
497     auto uniqueId = buffer->GetUniqueId();
498     {
499         std::lock_guard<std::mutex> lockGuard(queueMutex_);
500         if (size >= 0 && size <= AVBUFFER_QUEUE_MAX_QUEUE_SIZE && size != size_) {
501             SetQueueSizeBeforeAttachBufferLocked(size);
502         }
503         FALSE_RETURN_V(cachedBufferMap_.find(uniqueId) == cachedBufferMap_.end(),
504                        Status::ERROR_INVALID_BUFFER_ID);
505         NOK_RETURN(CheckConfig(config));
506         Status result = AttachAvailableBufferLocked(buffer);
507         FALSE_RETURN_V(result == Status::OK, result);
508     }
509     if (isFilled) {
510         return PushBufferOnFilled(uniqueId, isFilled);
511     }
512     return ReleaseBuffer(uniqueId);
513 }
514 
AttachAvailableBufferLocked(std::shared_ptr<AVBuffer> & buffer)515 Status AVBufferQueueImpl::AttachAvailableBufferLocked(std::shared_ptr<AVBuffer>& buffer)
516 {
517     auto config = buffer->GetConfig();
518     auto uniqueId = buffer->GetUniqueId();
519     AVBufferElement ele = {
520         .config = config,
521         .state = AVBUFFER_STATE_ATTACHED,
522         .isDeleting = false,
523         .buffer = buffer
524     };
525 
526     auto cachedCount = GetCachedBufferCount();
527     auto queueSize = GetQueueSize();
528     if (cachedCount >= queueSize) {
529         auto validCount = static_cast<uint32_t>(dirtyBufferList_.size() + freeBufferList_.size());
530         auto toBeDeleteCount = cachedCount - queueSize;
531         // 这里表示有可以删除的buffer,或者
532         if (validCount > toBeDeleteCount) {
533             // 在什么场景下需要在此处删除buffer?
534             DeleteBuffers(toBeDeleteCount + 1); // 多删除一个,用于attach当前buffer
535             cachedBufferMap_[uniqueId] = ele;
536             MEDIA_LOG_D("uniqueId(%llu) attached with delete", uniqueId);
537         } else {
538             MEDIA_LOG_E("attach failed, out of range");
539             return Status::ERROR_OUT_OF_RANGE;
540         }
541     } else {
542         cachedBufferMap_[uniqueId] = ele;
543         MEDIA_LOG_D("uniqueId(%llu) attached without delete", uniqueId);
544     }
545     return Status::OK;
546 }
547 
PushBufferOnFilled(uint64_t uniqueId,bool isFilled)548 Status AVBufferQueueImpl::PushBufferOnFilled(uint64_t uniqueId, bool isFilled)
549 {
550     auto ret = PushBuffer(uniqueId, isFilled);
551     if (ret != Status::OK) {
552         // PushBuffer失败,强制Detach
553         DetachBuffer(uniqueId, true);
554     }
555     return ret;
556 }
557 
SetQueueSizeBeforeAttachBufferLocked(uint32_t size)558 void AVBufferQueueImpl::SetQueueSizeBeforeAttachBufferLocked(uint32_t size)
559 {
560     if (size > size_) {
561         size_ = size;
562         if (!disableAlloc_) {
563             requestCondition.notify_all();
564         }
565     } else {
566         DeleteBuffers(size_ - size);
567         size_ = size;
568     }
569 }
570 
AttachBuffer(std::shared_ptr<AVBuffer> & buffer,bool isFilled)571 Status AVBufferQueueImpl::AttachBuffer(std::shared_ptr<AVBuffer>& buffer, bool isFilled)
572 {
573     FALSE_RETURN_V(buffer != nullptr, Status::ERROR_NULL_POINT_BUFFER);
574 
575     auto config = buffer->GetConfig();
576     auto uniqueId = buffer->GetUniqueId();
577     {
578         std::lock_guard<std::mutex> lockGuard(queueMutex_);
579         FALSE_RETURN_V(cachedBufferMap_.find(uniqueId) == cachedBufferMap_.end(),
580                        Status::ERROR_INVALID_BUFFER_ID);
581 
582         NOK_RETURN(CheckConfig(config));
583 
584         Status result = AttachAvailableBufferLocked(buffer);
585         FALSE_RETURN_V(result == Status::OK, result);
586     }
587 
588     if (isFilled) {
589         return PushBufferOnFilled(uniqueId, isFilled);
590     }
591 
592     return ReleaseBuffer(uniqueId);
593 }
594 
DetachBuffer(uint64_t uniqueId,bool force)595 Status AVBufferQueueImpl::DetachBuffer(uint64_t uniqueId, bool force)
596 {
597     FALSE_RETURN_V(cachedBufferMap_.find(uniqueId) != cachedBufferMap_.end(), Status::ERROR_INVALID_BUFFER_ID);
598 
599     const auto& ele = cachedBufferMap_[uniqueId];
600 
601     if (!force) {
602         // 只有生产者或消费者在获取到buffer后才能detach
603         if (ele.state == AVBUFFER_STATE_REQUESTED) {
604             MEDIA_LOG_D("detach buffer(%llu) on state requested", uniqueId);
605         } else if (ele.state == AVBUFFER_STATE_ACQUIRED) {
606             MEDIA_LOG_D("detach buffer(%llu) on state acquired", uniqueId);
607         } else {
608             MEDIA_LOG_W("detach buffer(%llu) on state %d forbidden", uniqueId, ele.state);
609             return Status::ERROR_INVALID_BUFFER_STATE;
610         }
611     }
612 
613     cachedBufferMap_.erase(uniqueId);
614 
615     return Status::OK;
616 }
617 
DetachBuffer(uint64_t uniqueId)618 Status AVBufferQueueImpl::DetachBuffer(uint64_t uniqueId)
619 {
620     std::lock_guard<std::mutex> lockGuard(queueMutex_);
621     return DetachBuffer(uniqueId, false);
622 }
623 
DetachBuffer(const std::shared_ptr<AVBuffer> & buffer)624 Status AVBufferQueueImpl::DetachBuffer(const std::shared_ptr<AVBuffer>& buffer)
625 {
626     FALSE_RETURN_V(buffer != nullptr, Status::ERROR_NULL_POINT_BUFFER);
627 
628     return DetachBuffer(buffer->GetUniqueId());
629 }
630 
AcquireBuffer(std::shared_ptr<AVBuffer> & buffer)631 Status AVBufferQueueImpl::AcquireBuffer(std::shared_ptr<AVBuffer>& buffer)
632 {
633     std::lock_guard<std::mutex> lockGuard(queueMutex_);
634     auto ret = PopFromDirtyBufferList(buffer);
635     if (ret != Status::OK) {
636         MEDIA_LOG_D("acquire buffer failed");
637         return ret;
638     }
639 
640     cachedBufferMap_[buffer->GetUniqueId()].state = AVBUFFER_STATE_ACQUIRED;
641 
642     return Status::OK;
643 }
644 
ReleaseBuffer(uint64_t uniqueId)645 Status AVBufferQueueImpl::ReleaseBuffer(uint64_t uniqueId)
646 {
647     {
648         std::lock_guard<std::mutex> lockGuard(queueMutex_);
649         FALSE_RETURN_V(cachedBufferMap_.find(uniqueId) != cachedBufferMap_.end(), Status::ERROR_INVALID_BUFFER_ID);
650 
651         FALSE_RETURN_V(cachedBufferMap_[uniqueId].state == AVBUFFER_STATE_ACQUIRED ||
652             cachedBufferMap_[uniqueId].state == AVBUFFER_STATE_ATTACHED, Status::ERROR_INVALID_BUFFER_STATE);
653 
654         cachedBufferMap_[uniqueId].state = AVBUFFER_STATE_RELEASED;
655         if (cachedBufferMap_[uniqueId].isDeleting) {
656             DeleteCachedBufferById(uniqueId);
657             return Status::OK;
658         }
659 
660         InsertFreeBufferInOrder(uniqueId);
661 
662         requestCondition.notify_all();
663     }
664 
665     // 注意:此时通知生产者有buffer可用,但实际有可能已经被request wait的生产者获取
666     std::lock_guard<std::mutex> lockGuard(producerListenerMutex_);
667     if (producerListener_ != nullptr) {
668         producerListener_->OnBufferAvailable();
669     }
670 
671     return Status::OK;
672 }
673 
ReleaseBuffer(const std::shared_ptr<AVBuffer> & buffer)674 Status AVBufferQueueImpl::ReleaseBuffer(const std::shared_ptr<AVBuffer>& buffer)
675 {
676     FALSE_RETURN_V(buffer != nullptr, Status::ERROR_NULL_POINT_BUFFER);
677 
678     return ReleaseBuffer(buffer->GetUniqueId());
679 }
680 
Clear()681 Status AVBufferQueueImpl::Clear()
682 {
683     MEDIA_LOG_E("AVBufferQueueImpl Clear");
684     std::lock_guard<std::mutex> lockGuard(queueMutex_);
685     dirtyBufferList_.clear();
686     for (auto it = cachedBufferMap_.begin(); it != cachedBufferMap_.end(); it++) {
687         if (it->second.state == AVBUFFER_STATE_PUSHED || it->second.state == AVBUFFER_STATE_RETURNED) {
688             it->second.state = AVBUFFER_STATE_RELEASED;
689             InsertFreeBufferInOrder(it->first);
690         }
691     }
692     requestCondition.notify_all();
693     return Status::OK;
694 }
695 
SetBrokerListener(sptr<IBrokerListener> & listener)696 Status AVBufferQueueImpl::SetBrokerListener(sptr<IBrokerListener>& listener)
697 {
698     std::lock_guard<std::mutex> lockGuard(brokerListenerMutex_);
699     brokerListeners_.push_back(listener);
700     return Status::OK;
701 }
702 
RemoveBrokerListener(sptr<IBrokerListener> & listener)703 Status AVBufferQueueImpl::RemoveBrokerListener(sptr<IBrokerListener>& listener)
704 {
705     std::lock_guard<std::mutex> lockGuard(brokerListenerMutex_);
706     if (!brokerListeners_.empty() && listener == brokerListeners_.back()) {
707         brokerListeners_.pop_back();
708         MEDIA_LOG_I("RemoveBrokerListener success, size: %{public}d", brokerListeners_.size());
709     } else {
710         MEDIA_LOG_E("removed item is not the back one.");
711     }
712     return Status::OK;
713 }
714 
SetProducerListener(sptr<IProducerListener> & listener)715 Status AVBufferQueueImpl::SetProducerListener(sptr<IProducerListener>& listener)
716 {
717     std::lock_guard<std::mutex> lockGuard(producerListenerMutex_);
718     producerListener_ = listener;
719 
720     return Status::OK;
721 }
722 
SetConsumerListener(sptr<IConsumerListener> & listener)723 Status AVBufferQueueImpl::SetConsumerListener(sptr<IConsumerListener>& listener)
724 {
725     std::lock_guard<std::mutex> lockGuard(consumerListenerMutex_);
726     consumerListener_ = listener;
727 
728     return Status::OK;
729 }
730 
GetFilledBufferSize()731 uint32_t AVBufferQueueImpl::GetFilledBufferSize()
732 {
733     std::lock_guard<std::mutex> lockGuard(queueMutex_);
734     return dirtyBufferList_.size();
735 }
736 
737 } // namespace Media
738 } // namespace OHOS
739