1 /*
2 * Copyright (c) 2021-2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "avbuffer_queue_consumer_impl.h"
17 #include "avbuffer_queue_impl.h"
18 #include "avbuffer_queue_producer_impl.h"
19 #include "common/log.h"
20 #include "meta/media_types.h"
21
22 namespace OHOS {
23 namespace Media {
24
Create(uint32_t size,MemoryType type,const std::string & name,bool disableAlloc)25 std::shared_ptr<AVBufferQueue> AVBufferQueue::Create(
26 uint32_t size, MemoryType type, const std::string& name, bool disableAlloc)
27 {
28 MEDIA_LOG_D("AVBufferQueue::Create size = %u, type = %u, name = %s",
29 size, static_cast<uint32_t>(type), name.c_str());
30 return std::make_shared<AVBufferQueueImpl>(size, type, name, disableAlloc);
31 }
32
CreateAsSurfaceProducer(sptr<Surface> & surface,const std::string & name)33 std::shared_ptr<AVBufferQueue> AVBufferQueue::CreateAsSurfaceProducer(
34 sptr<Surface>& surface, const std::string& name)
35 {
36 FALSE_RETURN_V(surface != nullptr, nullptr);
37
38 return std::make_shared<AVBufferQueueSurfaceWrapper>(
39 surface, name, AVBufferQueueSurfaceWrapper::PRODUCER_WRAPPER);
40 }
41
CreateAsSurfaceConsumer(sptr<Surface> & surface,const std::string & name)42 std::shared_ptr<AVBufferQueue> AVBufferQueue::CreateAsSurfaceConsumer(
43 sptr<Surface>& surface, const std::string& name)
44 {
45 FALSE_RETURN_V(surface != nullptr, nullptr);
46
47 return std::make_shared<AVBufferQueueSurfaceWrapper>(
48 surface, name, AVBufferQueueSurfaceWrapper::CONSUMER_WRAPPER);
49 }
50
GetLocalProducer()51 std::shared_ptr<AVBufferQueueProducer> AVBufferQueueImpl::GetLocalProducer()
52 {
53 std::lock_guard<std::mutex> lockGuard(producerCreatorMutex_);
54 std::shared_ptr<AVBufferQueueProducerImpl> producer = nullptr;
55 if (localProducer_.expired()) {
56 auto shared_this = shared_from_this();
57 FALSE_RETURN_V(shared_this != nullptr, nullptr);
58 producer = std::make_shared<AVBufferQueueProducerImpl>(shared_this);
59 localProducer_ = producer;
60 }
61
62 return localProducer_.lock();
63 }
64
GetLocalConsumer()65 std::shared_ptr<AVBufferQueueConsumer> AVBufferQueueImpl::GetLocalConsumer()
66 {
67 std::lock_guard<std::mutex> lockGuard(consumerCreatorMutex_);
68 std::shared_ptr<AVBufferQueueConsumerImpl> consumer = nullptr;
69 if (localConsumer_.expired()) {
70 auto shared_this = shared_from_this();
71 FALSE_RETURN_V(shared_this != nullptr, nullptr);
72 consumer = std::make_shared<AVBufferQueueConsumerImpl>(shared_this);
73 localConsumer_ = consumer;
74 }
75 return localConsumer_.lock();
76 }
77
GetProducer()78 sptr<AVBufferQueueProducer> AVBufferQueueImpl::GetProducer()
79 {
80 std::lock_guard<std::mutex> lockGuard(producerCreatorMutex_);
81 sptr<AVBufferQueueProducerImpl> producer = nullptr;
82 if (producer_ == nullptr || producer_->GetSptrRefCount() <= 0) {
83 auto shared_this = shared_from_this();
84 FALSE_RETURN_V(shared_this != nullptr, nullptr);
85 producer = new AVBufferQueueProducerImpl(shared_this);
86 producer_ = producer;
87 }
88
89 return producer_.promote();
90 }
91
GetConsumer()92 sptr<AVBufferQueueConsumer> AVBufferQueueImpl::GetConsumer()
93 {
94 std::lock_guard<std::mutex> lockGuard(consumerCreatorMutex_);
95 sptr<AVBufferQueueConsumerImpl> consumer = nullptr;
96 if (consumer_ == nullptr || consumer_->GetSptrRefCount() <= 0) {
97 auto shared_this = shared_from_this();
98 FALSE_RETURN_V(shared_this != nullptr, nullptr);
99 consumer = new AVBufferQueueConsumerImpl(shared_this);
100 consumer_ = consumer;
101 }
102
103 return consumer_.promote();
104 }
105
AVBufferQueueImpl(const std::string & name)106 AVBufferQueueImpl::AVBufferQueueImpl(const std::string &name)
107 : AVBufferQueue(), name_(name), size_(0), memoryType_(MemoryType::UNKNOWN_MEMORY), disableAlloc_(false) {}
108
AVBufferQueueImpl(uint32_t size,MemoryType type,const std::string & name,bool disableAlloc)109 AVBufferQueueImpl::AVBufferQueueImpl(uint32_t size, MemoryType type, const std::string &name, bool disableAlloc)
110 : AVBufferQueue(), name_(name), size_(size), memoryType_(type), disableAlloc_(disableAlloc)
111 {
112 if (size_ > AVBUFFER_QUEUE_MAX_QUEUE_SIZE) {
113 size_ = AVBUFFER_QUEUE_MAX_QUEUE_SIZE;
114 }
115 }
116
GetQueueSize()117 uint32_t AVBufferQueueImpl::GetQueueSize()
118 {
119 return size_;
120 }
121
SetQueueSize(uint32_t size)122 Status AVBufferQueueImpl::SetQueueSize(uint32_t size)
123 {
124 FALSE_RETURN_V(size >= 0 && size <= AVBUFFER_QUEUE_MAX_QUEUE_SIZE && size != size_,
125 Status::ERROR_INVALID_BUFFER_SIZE);
126
127 if (size > size_) {
128 size_ = size;
129 if (!disableAlloc_) {
130 requestCondition.notify_all();
131 }
132 } else {
133 std::lock_guard<std::mutex> lockGuard(queueMutex_);
134 DeleteBuffers(size_ - size);
135 size_ = size;
136 }
137
138 return Status::OK;
139 }
140
IsBufferInQueue(const std::shared_ptr<AVBuffer> & buffer)141 bool AVBufferQueueImpl::IsBufferInQueue(const std::shared_ptr<AVBuffer>& buffer)
142 {
143 FALSE_RETURN_V(buffer != nullptr, false);
144 auto uniqueId = buffer->GetUniqueId();
145 return cachedBufferMap_.find(uniqueId) != cachedBufferMap_.end();
146 }
147
GetCachedBufferCount() const148 uint32_t AVBufferQueueImpl::GetCachedBufferCount() const
149 {
150 // 确保cachedBufferMap_.size()不会超过MAX_UINT32
151 return static_cast<uint32_t>(cachedBufferMap_.size());
152 }
153
PopFromFreeBufferList(std::shared_ptr<AVBuffer> & buffer,const AVBufferConfig & config)154 Status AVBufferQueueImpl::PopFromFreeBufferList(std::shared_ptr<AVBuffer>& buffer, const AVBufferConfig& config)
155 {
156 for (auto it = freeBufferList_.begin(); it != freeBufferList_.end(); it++) {
157 if (config <= cachedBufferMap_[*it].config) {
158 buffer = cachedBufferMap_[*it].buffer;
159 freeBufferList_.erase(it);
160 return Status::OK;
161 }
162 }
163
164 if (freeBufferList_.empty()) {
165 buffer = nullptr;
166 // 没有可以重用的freeBuffer
167 return Status::ERROR_NO_FREE_BUFFER;
168 }
169
170 buffer = cachedBufferMap_[freeBufferList_.front()].buffer;
171 freeBufferList_.pop_front();
172
173 return Status::OK;
174 }
175
PopFromDirtyBufferList(std::shared_ptr<AVBuffer> & buffer)176 Status AVBufferQueueImpl::PopFromDirtyBufferList(std::shared_ptr<AVBuffer>& buffer)
177 {
178 FALSE_RETURN_V(!dirtyBufferList_.empty(), Status::ERROR_NO_DIRTY_BUFFER);
179
180 buffer = cachedBufferMap_[dirtyBufferList_.front()].buffer;
181 dirtyBufferList_.pop_front();
182 return Status::OK;
183 }
184
AllocBuffer(std::shared_ptr<AVBuffer> & buffer,const AVBufferConfig & config)185 Status AVBufferQueueImpl::AllocBuffer(std::shared_ptr<AVBuffer>& buffer, const AVBufferConfig& config)
186 {
187 auto bufferImpl = AVBuffer::CreateAVBuffer(config);
188 FALSE_RETURN_V(bufferImpl != nullptr, Status::ERROR_CREATE_BUFFER);
189
190 auto uniqueId = bufferImpl->GetUniqueId();
191 AVBufferElement ele = {
192 .config = bufferImpl->GetConfig(),
193 .state = AVBUFFER_STATE_RELEASED,
194 .isDeleting = false,
195 .buffer = bufferImpl,
196 };
197 cachedBufferMap_[uniqueId] = ele;
198 buffer = bufferImpl;
199
200 return Status::OK;
201 }
202
RequestReuseBuffer(std::shared_ptr<AVBuffer> & buffer,const AVBufferConfig & config)203 Status AVBufferQueueImpl::RequestReuseBuffer(std::shared_ptr<AVBuffer>& buffer, const AVBufferConfig& config)
204 {
205 FALSE_RETURN_V(buffer != nullptr, Status::ERROR_NULL_POINT_BUFFER);
206
207 auto uniqueId = buffer->GetUniqueId();
208 FALSE_RETURN_V(cachedBufferMap_.find(uniqueId) != cachedBufferMap_.end(), Status::ERROR_CREATE_BUFFER);
209
210 if (config <= cachedBufferMap_[uniqueId].config) {
211 // 不需要重新分配,直接更新buffer大小
212 cachedBufferMap_[uniqueId].config.size = config.size;
213 } else {
214 // 重新分配
215 DeleteCachedBufferById(uniqueId);
216 NOK_RETURN(AllocBuffer(buffer, config));
217 }
218
219 // 注意这里的uniqueId可能因为重新分配buffer而更新,所以需要再次获取
220 cachedBufferMap_[buffer->GetUniqueId()].state = AVBUFFER_STATE_REQUESTED;
221 return Status::OK;
222 }
223
DeleteBuffers(uint32_t count)224 void AVBufferQueueImpl::DeleteBuffers(uint32_t count)
225 {
226 FALSE_RETURN(count > 0);
227
228 while (!freeBufferList_.empty()) {
229 DeleteCachedBufferById(freeBufferList_.front());
230 freeBufferList_.pop_front();
231 count--;
232 if (count <= 0) {
233 return;
234 }
235 }
236
237 while (!dirtyBufferList_.empty()) {
238 DeleteCachedBufferById(dirtyBufferList_.front());
239 dirtyBufferList_.pop_front();
240 count--;
241 if (count <= 0) {
242 return;
243 }
244 }
245
246 for (auto&& ele : cachedBufferMap_) {
247 ele.second.isDeleting = true;
248 // we don't have to do anything
249 count--;
250 if (count <= 0) {
251 break;
252 }
253 }
254 }
255
DeleteCachedBufferById(uint64_t uniqueId)256 void AVBufferQueueImpl::DeleteCachedBufferById(uint64_t uniqueId)
257 {
258 auto it = cachedBufferMap_.find(uniqueId);
259 if (it != cachedBufferMap_.end()) {
260 cachedBufferMap_.erase(it);
261 }
262 }
263
CheckConfig(const AVBufferConfig & config)264 Status AVBufferQueueImpl::CheckConfig(const AVBufferConfig& config)
265 {
266 FALSE_RETURN_V(config.memoryType != MemoryType::UNKNOWN_MEMORY, Status::ERROR_UNEXPECTED_MEMORY_TYPE);
267 // memoryType_初始化之后将无法改变。
268 FALSE_RETURN_V(memoryType_ == MemoryType::UNKNOWN_MEMORY || config.memoryType == memoryType_,
269 Status::ERROR_UNEXPECTED_MEMORY_TYPE);
270 memoryType_ = config.memoryType;
271 return Status::OK;
272 }
273
wait_for(std::unique_lock<std::mutex> & lock,int32_t timeoutMs)274 bool AVBufferQueueImpl::wait_for(std::unique_lock<std::mutex>& lock, int32_t timeoutMs)
275 {
276 MEDIA_LOG_D("wait for free buffer, timeout = %d", timeoutMs);
277 if (timeoutMs > 0) {
278 return requestCondition.wait_for(
279 lock, std::chrono::milliseconds(timeoutMs), [this]() {
280 return !freeBufferList_.empty() || (GetCachedBufferCount() < GetQueueSize());
281 });
282 } else if (timeoutMs < 0) {
283 requestCondition.wait(lock);
284 }
285 return true;
286 }
287
RequestBuffer(std::shared_ptr<AVBuffer> & buffer,const AVBufferConfig & config,int32_t timeoutMs)288 Status AVBufferQueueImpl::RequestBuffer(
289 std::shared_ptr<AVBuffer>& buffer, const AVBufferConfig& config, int32_t timeoutMs)
290 {
291 auto configCopy = config;
292 if (config.memoryType == MemoryType::UNKNOWN_MEMORY) {
293 MEDIA_LOG_D("AVBufferQueueImpl::RequestBuffer config.memoryType unknown, "
294 "memoryType_ = %u", static_cast<uint32_t>(memoryType_));
295 configCopy.memoryType = memoryType_;
296 }
297
298 // check param
299 std::unique_lock<std::mutex> lock(queueMutex_);
300 NOK_RETURN(CheckConfig(configCopy));
301
302 // dequeue from free list
303 auto ret = PopFromFreeBufferList(buffer, configCopy);
304 if (ret == Status::OK) {
305 return RequestReuseBuffer(buffer, configCopy);
306 }
307
308 // check queue size
309 if (GetCachedBufferCount() >= GetQueueSize()) {
310 FALSE_RETURN_V(wait_for(lock, timeoutMs), Status::ERROR_WAIT_TIMEOUT);
311
312 // 被条件唤醒后,再次尝试从freeBufferList中取buffer
313 ret = PopFromFreeBufferList(buffer, configCopy);
314 if (ret == Status::OK) {
315 return RequestReuseBuffer(buffer, configCopy);
316 }
317 FALSE_RETURN_V(GetCachedBufferCount() < GetQueueSize(), Status::ERROR_NO_FREE_BUFFER);
318 }
319
320 NOK_RETURN(AllocBuffer(buffer, configCopy));
321 cachedBufferMap_[buffer->GetUniqueId()].state = AVBUFFER_STATE_REQUESTED;
322
323 return Status::OK;
324 }
325
InsertFreeBufferInOrder(uint64_t uniqueId)326 void AVBufferQueueImpl::InsertFreeBufferInOrder(uint64_t uniqueId)
327 {
328 for (auto it = freeBufferList_.begin(); it != freeBufferList_.end(); it++) {
329 if ((*it != uniqueId) &&
330 (cachedBufferMap_[*it].config.capacity >= cachedBufferMap_[uniqueId].config.capacity)) {
331 freeBufferList_.insert(it, uniqueId);
332 return;
333 }
334 }
335 freeBufferList_.emplace_back(uniqueId);
336 }
337
CancelBuffer(uint64_t uniqueId)338 Status AVBufferQueueImpl::CancelBuffer(uint64_t uniqueId)
339 {
340 FALSE_RETURN_V(cachedBufferMap_.find(uniqueId) != cachedBufferMap_.end(), Status::ERROR_INVALID_BUFFER_ID);
341
342 FALSE_RETURN_V(cachedBufferMap_[uniqueId].state == AVBUFFER_STATE_REQUESTED ||
343 cachedBufferMap_[uniqueId].state == AVBUFFER_STATE_PUSHED,
344 Status::ERROR_INVALID_BUFFER_STATE);
345
346 InsertFreeBufferInOrder(uniqueId);
347
348 cachedBufferMap_[uniqueId].state = AVBUFFER_STATE_RELEASED;
349
350 requestCondition.notify_all();
351
352 MEDIA_LOG_D("cancel buffer id = %llu", uniqueId);
353
354 return Status::OK;
355 }
356
PushBuffer(uint64_t uniqueId,bool available)357 Status AVBufferQueueImpl::PushBuffer(uint64_t uniqueId, bool available)
358 {
359 std::shared_ptr<AVBuffer> buffer = nullptr;
360 {
361 std::lock_guard<std::mutex> lockGuard(queueMutex_);
362 FALSE_RETURN_V(cachedBufferMap_.find(uniqueId) != cachedBufferMap_.end(),
363 Status::ERROR_INVALID_BUFFER_ID);
364
365 auto& ele = cachedBufferMap_[uniqueId];
366 if (ele.isDeleting) {
367 DeleteCachedBufferById(uniqueId);
368 MEDIA_LOG_D("delete push buffer uniqueId(%llu)", uniqueId);
369 return Status::OK;
370 }
371
372 if (available) {
373 FALSE_RETURN_V(ele.buffer->GetConfig().size >= 0, Status::ERROR_INVALID_BUFFER_SIZE);
374 }
375
376 FALSE_RETURN_V(ele.state == AVBUFFER_STATE_REQUESTED || ele.state == AVBUFFER_STATE_ATTACHED,
377 Status::ERROR_INVALID_BUFFER_STATE);
378
379 ele.state = AVBUFFER_STATE_PUSHED;
380 buffer = cachedBufferMap_[uniqueId].buffer;
381 }
382
383 if (available) {
384 std::lock_guard<std::mutex> lockGuard(brokerListenerMutex_);
385 if (brokerListener_ != nullptr) {
386 brokerListener_->OnBufferFilled(buffer);
387 return Status::OK;
388 }
389 }
390
391 return ReturnBuffer(uniqueId, available);
392 }
393
PushBuffer(const std::shared_ptr<AVBuffer> & buffer,bool available)394 Status AVBufferQueueImpl::PushBuffer(const std::shared_ptr<AVBuffer>& buffer, bool available)
395 {
396 FALSE_RETURN_V(buffer != nullptr, Status::ERROR_NULL_POINT_BUFFER);
397
398 return PushBuffer(buffer->GetUniqueId(), available);
399 }
400
ReturnBuffer(uint64_t uniqueId,bool available)401 Status AVBufferQueueImpl::ReturnBuffer(uint64_t uniqueId, bool available)
402 {
403 {
404 std::lock_guard<std::mutex> lockGuard(queueMutex_);
405 FALSE_RETURN_V(cachedBufferMap_.find(uniqueId) != cachedBufferMap_.end(),
406 Status::ERROR_INVALID_BUFFER_ID);
407
408 if (cachedBufferMap_[uniqueId].isDeleting) {
409 DeleteCachedBufferById(uniqueId);
410 MEDIA_LOG_D("delete return buffer uniqueId(%llu)", uniqueId);
411 return Status::OK;
412 }
413
414 FALSE_RETURN_V(cachedBufferMap_[uniqueId].state == AVBUFFER_STATE_PUSHED,
415 Status::ERROR_INVALID_BUFFER_STATE);
416
417 if (!available) {
418 NOK_RETURN(CancelBuffer(uniqueId));
419 } else {
420 auto& config = cachedBufferMap_[uniqueId].buffer->GetConfig();
421 bool isEosBuffer = cachedBufferMap_[uniqueId].buffer->flag_ & (uint32_t)(Plugins::AVBufferFlag::EOS);
422 if (!isEosBuffer) {
423 FALSE_RETURN_V(config.size > 0, Status::ERROR_INVALID_BUFFER_SIZE);
424 }
425 cachedBufferMap_[uniqueId].config = config;
426 cachedBufferMap_[uniqueId].state = AVBUFFER_STATE_RETURNED;
427 dirtyBufferList_.push_back(uniqueId);
428 }
429 }
430
431 if (!available) {
432 std::lock_guard<std::mutex> lockGuard(producerListenerMutex_);
433 if (producerListener_ != nullptr) {
434 producerListener_->OnBufferAvailable();
435 }
436 return Status::OK;
437 }
438
439 std::lock_guard<std::mutex> lockGuard(consumerListenerMutex_);
440 FALSE_RETURN_V(consumerListener_ != nullptr, Status::ERROR_NO_CONSUMER_LISTENER);
441 consumerListener_->OnBufferAvailable();
442
443 return Status::OK;
444 }
445
ReturnBuffer(const std::shared_ptr<AVBuffer> & buffer,bool available)446 Status AVBufferQueueImpl::ReturnBuffer(const std::shared_ptr<AVBuffer>& buffer, bool available)
447 {
448 FALSE_RETURN_V(buffer != nullptr, Status::ERROR_NULL_POINT_BUFFER);
449
450 return ReturnBuffer(buffer->GetUniqueId(), available);
451 }
452
AttachBuffer(std::shared_ptr<AVBuffer> & buffer,bool isFilled)453 Status AVBufferQueueImpl::AttachBuffer(std::shared_ptr<AVBuffer>& buffer, bool isFilled)
454 {
455 FALSE_RETURN_V(buffer != nullptr, Status::ERROR_NULL_POINT_BUFFER);
456
457 auto config = buffer->GetConfig();
458 auto uniqueId = buffer->GetUniqueId();
459 {
460 std::lock_guard<std::mutex> lockGuard(queueMutex_);
461 FALSE_RETURN_V(cachedBufferMap_.find(uniqueId) == cachedBufferMap_.end(),
462 Status::ERROR_INVALID_BUFFER_ID);
463
464 NOK_RETURN(CheckConfig(config));
465
466 AVBufferElement ele = {
467 .config = config,
468 .state = AVBUFFER_STATE_ATTACHED,
469 .isDeleting = false,
470 .buffer = buffer
471 };
472
473 auto cachedCount = GetCachedBufferCount();
474 auto queueSize = GetQueueSize();
475 if (cachedCount >= queueSize) {
476 auto validCount = static_cast<uint32_t>(dirtyBufferList_.size() + freeBufferList_.size());
477 auto toBeDeleteCount = cachedCount - queueSize;
478 // 这里表示有可以删除的buffer,或者
479 if (validCount > toBeDeleteCount) {
480 // 在什么场景下需要在此处删除buffer?
481 DeleteBuffers(toBeDeleteCount + 1); // 多删除一个,用于attach当前buffer
482 cachedBufferMap_[uniqueId] = ele;
483 MEDIA_LOG_D("uniqueId(%llu) attached with delete", uniqueId);
484 } else {
485 MEDIA_LOG_E("attach failed, out of range");
486 return Status::ERROR_OUT_OF_RANGE;
487 }
488 } else {
489 cachedBufferMap_[uniqueId] = ele;
490 MEDIA_LOG_I("uniqueId(%llu) attached without delete", uniqueId);
491 }
492 }
493
494 if (isFilled) {
495 auto ret = PushBuffer(uniqueId, isFilled);
496 if (ret != Status::OK) {
497 // PushBuffer失败,强制Detach
498 DetachBuffer(uniqueId, true);
499 }
500 return ret;
501 }
502
503 return ReleaseBuffer(uniqueId);
504 }
505
DetachBuffer(uint64_t uniqueId,bool force)506 Status AVBufferQueueImpl::DetachBuffer(uint64_t uniqueId, bool force)
507 {
508 FALSE_RETURN_V(cachedBufferMap_.find(uniqueId) != cachedBufferMap_.end(), Status::ERROR_INVALID_BUFFER_ID);
509
510 const auto& ele = cachedBufferMap_[uniqueId];
511
512 if (!force) {
513 // 只有生产者或消费者在获取到buffer后才能detach
514 if (ele.state == AVBUFFER_STATE_REQUESTED) {
515 MEDIA_LOG_D("detach buffer(%llu) on state requested", uniqueId);
516 } else if (ele.state == AVBUFFER_STATE_ACQUIRED) {
517 MEDIA_LOG_D("detach buffer(%llu) on state acquired", uniqueId);
518 } else {
519 MEDIA_LOG_W("can not detach buffer(%llu) on state(%d)", uniqueId, ele.state);
520 return Status::ERROR_INVALID_BUFFER_STATE;
521 }
522 }
523
524 cachedBufferMap_.erase(uniqueId);
525
526 return Status::OK;
527 }
528
DetachBuffer(uint64_t uniqueId)529 Status AVBufferQueueImpl::DetachBuffer(uint64_t uniqueId)
530 {
531 std::lock_guard<std::mutex> lockGuard(queueMutex_);
532 return DetachBuffer(uniqueId, false);
533 }
534
DetachBuffer(const std::shared_ptr<AVBuffer> & buffer)535 Status AVBufferQueueImpl::DetachBuffer(const std::shared_ptr<AVBuffer>& buffer)
536 {
537 FALSE_RETURN_V(buffer != nullptr, Status::ERROR_NULL_POINT_BUFFER);
538
539 return DetachBuffer(buffer->GetUniqueId());
540 }
541
AcquireBuffer(std::shared_ptr<AVBuffer> & buffer)542 Status AVBufferQueueImpl::AcquireBuffer(std::shared_ptr<AVBuffer>& buffer)
543 {
544 std::lock_guard<std::mutex> lockGuard(queueMutex_);
545 auto ret = PopFromDirtyBufferList(buffer);
546 if (ret != Status::OK) {
547 MEDIA_LOG_E("acquire buffer failed");
548 return ret;
549 }
550
551 cachedBufferMap_[buffer->GetUniqueId()].state = AVBUFFER_STATE_ACQUIRED;
552
553 return Status::OK;
554 }
555
ReleaseBuffer(uint64_t uniqueId)556 Status AVBufferQueueImpl::ReleaseBuffer(uint64_t uniqueId)
557 {
558 {
559 std::lock_guard<std::mutex> lockGuard(queueMutex_);
560 FALSE_RETURN_V(cachedBufferMap_.find(uniqueId) != cachedBufferMap_.end(), Status::ERROR_INVALID_BUFFER_ID);
561
562 FALSE_RETURN_V(cachedBufferMap_[uniqueId].state == AVBUFFER_STATE_ACQUIRED ||
563 cachedBufferMap_[uniqueId].state == AVBUFFER_STATE_ATTACHED, Status::ERROR_INVALID_BUFFER_STATE);
564
565 cachedBufferMap_[uniqueId].state = AVBUFFER_STATE_RELEASED;
566 if (cachedBufferMap_[uniqueId].isDeleting) {
567 DeleteCachedBufferById(uniqueId);
568 return Status::OK;
569 }
570
571 InsertFreeBufferInOrder(uniqueId);
572
573 requestCondition.notify_all();
574 }
575
576 // 注意:此时通知生产者有buffer可用,但实际有可能已经被request wait的生产者获取
577 std::lock_guard<std::mutex> lockGuard(producerListenerMutex_);
578 if (producerListener_ != nullptr) {
579 producerListener_->OnBufferAvailable();
580 }
581
582 return Status::OK;
583 }
584
ReleaseBuffer(const std::shared_ptr<AVBuffer> & buffer)585 Status AVBufferQueueImpl::ReleaseBuffer(const std::shared_ptr<AVBuffer>& buffer)
586 {
587 FALSE_RETURN_V(buffer != nullptr, Status::ERROR_NULL_POINT_BUFFER);
588
589 return ReleaseBuffer(buffer->GetUniqueId());
590 }
591
Clear()592 Status AVBufferQueueImpl::Clear()
593 {
594 MEDIA_LOG_E("AVBufferQueueImpl Clear");
595 std::lock_guard<std::mutex> lockGuard(queueMutex_);
596 dirtyBufferList_.clear();
597 for (auto it = cachedBufferMap_.begin(); it != cachedBufferMap_.end(); it++) {
598 if (it->second.state == AVBUFFER_STATE_PUSHED || it->second.state == AVBUFFER_STATE_RETURNED) {
599 it->second.state = AVBUFFER_STATE_RELEASED;
600 InsertFreeBufferInOrder(it->first);
601 }
602 }
603 requestCondition.notify_all();
604 return Status::OK;
605 }
606
SetBrokerListener(sptr<IBrokerListener> & listener)607 Status AVBufferQueueImpl::SetBrokerListener(sptr<IBrokerListener>& listener)
608 {
609 std::lock_guard<std::mutex> lockGuard(brokerListenerMutex_);
610 brokerListener_ = listener;
611
612 return Status::OK;
613 }
614
SetProducerListener(sptr<IProducerListener> & listener)615 Status AVBufferQueueImpl::SetProducerListener(sptr<IProducerListener>& listener)
616 {
617 std::lock_guard<std::mutex> lockGuard(producerListenerMutex_);
618 producerListener_ = listener;
619
620 return Status::OK;
621 }
622
SetConsumerListener(sptr<IConsumerListener> & listener)623 Status AVBufferQueueImpl::SetConsumerListener(sptr<IConsumerListener>& listener)
624 {
625 std::lock_guard<std::mutex> lockGuard(consumerListenerMutex_);
626 consumerListener_ = listener;
627
628 return Status::OK;
629 }
630
631 } // namespace Media
632 } // namespace OHOS
633