• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2025 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "codec_buffer_circular.h"
17 #include <sstream>
18 #include "avcodec_errors.h"
19 #include "avcodec_log.h"
20 #include "buffer/avbuffer.h"
21 #include "buffer/avsharedmemory.h"
22 #include "buffer_converter.h"
23 #include "meta/format.h"
24 #include "meta/meta.h"
25 
26 using namespace OHOS::Media;
27 namespace {
28 constexpr OHOS::HiviewDFX::HiLogLabel LABEL = {LOG_CORE, LOG_DOMAIN_FRAMEWORK, "CodecBufferCircular"};
29 constexpr int64_t MAX_TIMEOUT =
30     std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::steady_clock::time_point::max())
31         .time_since_epoch()
32         .count() /
33     2;
34 } // namespace
35 namespace OHOS {
36 namespace MediaAVCodec {
~CodecBufferCircular()37 CodecBufferCircular::~CodecBufferCircular()
38 {
39     {
40         std::lock_guard<std::mutex> lock(inMutex_);
41         PrintCaches(false);
42     }
43     {
44         std::lock_guard<std::mutex> lock(outMutex_);
45         PrintCaches(true);
46     }
47 }
48 
SetConverter(std::shared_ptr<BufferConverter> & converter)49 void CodecBufferCircular::SetConverter(std::shared_ptr<BufferConverter> &converter)
50 {
51     converter_ = converter;
52 }
53 
SetCallback(const std::shared_ptr<AVCodecCallback> & callback)54 int32_t CodecBufferCircular::SetCallback(const std::shared_ptr<AVCodecCallback> &callback)
55 {
56     std::scoped_lock lock(inMutex_, outMutex_);
57     CHECK_AND_RETURN_RET_LOG_WITH_TAG(CanEnableMode<MODE_ASYNC>(), AVCS_ERR_INVALID_OPERATION,
58                                       "Can not enable async mode");
59     EnableMode<MODE_ASYNC>();
60     callback_ = callback;
61     return AVCS_ERR_OK;
62 }
63 
SetCallback(const std::shared_ptr<MediaCodecCallback> & callback)64 int32_t CodecBufferCircular::SetCallback(const std::shared_ptr<MediaCodecCallback> &callback)
65 {
66     std::scoped_lock lock(inMutex_, outMutex_);
67     CHECK_AND_RETURN_RET_LOG_WITH_TAG(CanEnableMode<MODE_ASYNC>(), AVCS_ERR_INVALID_OPERATION,
68                                       "Can not enable async mode");
69     EnableMode<MODE_ASYNC>();
70     mediaCb_ = callback;
71     return AVCS_ERR_OK;
72 }
73 
SetCallback(const std::shared_ptr<MediaCodecParameterCallback> & callback)74 int32_t CodecBufferCircular::SetCallback(const std::shared_ptr<MediaCodecParameterCallback> &callback)
75 {
76     std::scoped_lock lock(inMutex_, outMutex_);
77     CHECK_AND_RETURN_RET_LOG_WITH_TAG(attrCb_ == nullptr, AVCS_ERR_INVALID_STATE,
78                                       "Already set parameter with atrribute callback");
79     CHECK_AND_RETURN_RET_LOG_WITH_TAG(CanEnableMode<MODE_ASYNC>(), AVCS_ERR_INVALID_OPERATION,
80                                       "Can not enable async mode");
81     EnableMode<MODE_ASYNC>();
82     paramCb_ = callback;
83     return AVCS_ERR_OK;
84 }
85 
SetCallback(const std::shared_ptr<MediaCodecParameterWithAttrCallback> & callback)86 int32_t CodecBufferCircular::SetCallback(const std::shared_ptr<MediaCodecParameterWithAttrCallback> &callback)
87 {
88     std::scoped_lock lock(inMutex_, outMutex_);
89     CHECK_AND_RETURN_RET_LOG_WITH_TAG(paramCb_ == nullptr, AVCS_ERR_INVALID_STATE, "Already set parameter callback");
90     CHECK_AND_RETURN_RET_LOG_WITH_TAG(CanEnableMode<MODE_ASYNC>(), AVCS_ERR_INVALID_OPERATION,
91                                       "Can not enable async mode");
92     EnableMode<MODE_ASYNC>();
93     attrCb_ = callback;
94     return AVCS_ERR_OK;
95 }
96 
SetIsRunning(bool isRunning)97 void CodecBufferCircular::SetIsRunning(bool isRunning)
98 {
99     std::scoped_lock lock(inMutex_, outMutex_);
100     if (isRunning) {
101         AddFlag(FLAG_IS_RUNNING);
102     } else {
103         RemoveFlag(FLAG_IS_RUNNING);
104         RemoveFlag(FLAG_INPUT_EOS);
105         RemoveFlag(FLAG_OUTPUT_EOS);
106     }
107     inCond_.notify_all();
108     outCond_.notify_all();
109 }
110 
CanEnableSyncMode()111 bool CodecBufferCircular::CanEnableSyncMode()
112 {
113     std::scoped_lock lock(inMutex_, outMutex_);
114     return CanEnableMode<MODE_SYNC>();
115 }
116 
CanEnableAsyncMode()117 bool CodecBufferCircular::CanEnableAsyncMode()
118 {
119     std::scoped_lock lock(inMutex_, outMutex_);
120     return CanEnableMode<MODE_ASYNC>();
121 }
122 
EnableSyncMode()123 void CodecBufferCircular::EnableSyncMode()
124 {
125     std::scoped_lock lock(inMutex_, outMutex_);
126     CHECK_AND_RETURN_LOG_WITH_TAG(CanEnableMode<MODE_SYNC>(), "Can not enable sync mode");
127     EnableMode<MODE_SYNC>();
128 }
129 
EnableAsyncMode()130 void CodecBufferCircular::EnableAsyncMode()
131 {
132     std::scoped_lock lock(inMutex_, outMutex_);
133     CHECK_AND_RETURN_LOG_WITH_TAG(CanEnableMode<MODE_ASYNC>(), "Can not enable async mode");
134     EnableMode<MODE_ASYNC>();
135 }
136 
IsSyncMode()137 bool CodecBufferCircular::IsSyncMode()
138 {
139     std::scoped_lock lock(inMutex_, outMutex_);
140     return HasFlag(FLAG_IS_SYNC) && HasFlag(FLAG_SYNC_ASYNC_CONFIGURED);
141 }
142 
ResetFlag()143 void CodecBufferCircular::ResetFlag()
144 {
145     std::scoped_lock lock(inMutex_, outMutex_);
146     RemoveFlag(FLAG_IS_RUNNING);
147     RemoveFlag(FLAG_IS_SYNC);
148     RemoveFlag(FLAG_SYNC_ASYNC_CONFIGURED);
149     RemoveFlag(FLAG_ERROR);
150     RemoveFlag(FLAG_INPUT_EOS);
151     RemoveFlag(FLAG_OUTPUT_EOS);
152 }
153 
ClearCaches()154 void CodecBufferCircular::ClearCaches()
155 {
156     {
157         std::lock_guard<std::mutex> lock(inMutex_);
158         PrintCaches(false);
159         inCache_.clear();
160         EventQueue emptyQueue;
161         std::swap(inQueue_, emptyQueue);
162     }
163     {
164         std::lock_guard<std::mutex> lock(outMutex_);
165         PrintCaches(true);
166         outCache_.clear();
167         EventQueue emptyQueue;
168         std::swap(outQueue_, emptyQueue);
169     }
170 }
171 
FlushCaches()172 void CodecBufferCircular::FlushCaches()
173 {
174     {
175         std::lock_guard<std::mutex> lock(inMutex_);
176         PrintCaches(false);
177         for (auto &val : inCache_) {
178             val.second.owner = OWNED_BY_SERVER;
179         }
180         EventQueue emptyQueue;
181         std::swap(inQueue_, emptyQueue);
182     }
183     {
184         std::lock_guard<std::mutex> lock(outMutex_);
185         PrintCaches(true);
186         for (auto &val : outCache_) {
187             val.second.owner = OWNED_BY_SERVER;
188         }
189         EventQueue emptyQueue;
190         std::swap(outQueue_, emptyQueue);
191     }
192 }
193 
194 /******************************** Common ********************************/
HandleInputBuffer(uint32_t index,AVCodecBufferInfo info,AVCodecBufferFlag flag)195 int32_t CodecBufferCircular::HandleInputBuffer(uint32_t index, AVCodecBufferInfo info, AVCodecBufferFlag flag)
196 {
197     // Api9
198     std::lock_guard<std::mutex> lock(inMutex_);
199     CHECK_AND_RETURN_RET_LOG_WITH_TAG(!HasFlag(FLAG_IS_SYNC), AVCS_ERR_INVALID_OPERATION, "Not support sync mode");
200     BufferCacheIter iter = inCache_.find(index);
201     CHECK_AND_RETURN_RET_LOG_WITH_TAG(HasFlag(FLAG_IS_RUNNING), AVCS_ERR_INVALID_STATE, "Not in running state");
202     if (iter == inCache_.end()) {
203         AVCODEC_LOGW_WITH_TAG("Index is invalid %{public}u", index);
204         return AVCS_ERR_OK;
205     }
206     BufferItem &item = iter->second;
207     item.owner = OWNED_BY_SERVER;
208     CHECK_AND_RETURN_RET_LOG_WITH_TAG(item.buffer != nullptr, AVCS_ERR_INVALID_OPERATION, "Buffer is nullptr");
209     CHECK_AND_RETURN_RET_LOG_WITH_TAG(item.memory != nullptr, AVCS_ERR_INVALID_OPERATION, "Memory is nullptr");
210     CHECK_AND_RETURN_RET_LOG_WITH_TAG(converter_ != nullptr, AVCS_ERR_INVALID_OPERATION, "Converter is nullptr");
211     CHECK_AND_RETURN_RET_LOG_WITH_TAG(item.buffer->memory_ != nullptr, AVCS_ERR_INVALID_OPERATION,
212                                       "Get buffer memory is nullptr");
213     item.buffer->pts_ = info.presentationTimeUs;
214     item.buffer->flag_ = static_cast<uint32_t>(flag);
215     item.buffer->memory_->SetOffset(info.offset);
216     item.buffer->memory_->SetSize(info.size);
217 
218     item.pts = info.presentationTimeUs;
219     item.flag = static_cast<uint32_t>(flag);
220     item.size = info.size;
221 
222     converter_->WriteToBuffer(item.buffer, item.memory);
223     return AVCS_ERR_OK;
224 }
225 
HandleInputBuffer(uint32_t index)226 int32_t CodecBufferCircular::HandleInputBuffer(uint32_t index)
227 {
228     std::lock_guard<std::mutex> lock(inMutex_);
229     CHECK_AND_RETURN_RET_LOG_WITH_TAG(HasFlag(FLAG_IS_RUNNING), AVCS_ERR_INVALID_STATE, "Not in running state");
230     BufferCacheIter iter = inCache_.find(index);
231     if (iter == inCache_.end()) {
232         AVCODEC_LOGW_WITH_TAG("Index is invalid %{public}u", index);
233         return HasFlag(FLAG_IS_SYNC) ? AVCS_ERR_INVALID_OPERATION : AVCS_ERR_OK;
234     }
235     BufferItem &item = iter->second;
236     CHECK_AND_RETURN_RET_LOG_WITH_TAG(item.owner == OWNED_BY_USER, AVCS_ERR_INVALID_OPERATION,
237                                       "Invalid ownership:%{public}s", OwnerToString(item.owner).c_str());
238     item.owner = OWNED_BY_SERVER;
239     if (item.buffer != nullptr) {
240         item.pts = item.buffer->pts_;
241         item.flag = item.buffer->flag_;
242         item.size = (item.buffer->memory_ != nullptr) ? item.buffer->memory_->GetSize() : 0;
243     }
244     return AVCS_ERR_OK;
245 }
246 
HandleOutputBuffer(uint32_t index)247 int32_t CodecBufferCircular::HandleOutputBuffer(uint32_t index)
248 {
249     std::lock_guard<std::mutex> lock(outMutex_);
250     CHECK_AND_RETURN_RET_LOG_WITH_TAG(HasFlag(FLAG_IS_RUNNING), AVCS_ERR_INVALID_STATE, "Not in running state");
251     BufferCacheIter iter = outCache_.find(index);
252     if (iter == outCache_.end()) {
253         AVCODEC_LOGW_WITH_TAG("Index is invalid %{public}u", index);
254         return HasFlag(FLAG_IS_SYNC) ? AVCS_ERR_INVALID_OPERATION : AVCS_ERR_OK;
255     }
256     BufferItem &item = iter->second;
257     CHECK_AND_RETURN_RET_LOG_WITH_TAG(item.owner == OWNED_BY_USER, AVCS_ERR_INVALID_OPERATION,
258                                       "Invalid ownership:%{public}s", OwnerToString(item.owner).c_str());
259     item.owner = OWNED_BY_SERVER;
260     if (item.buffer != nullptr) {
261         item.pts = item.buffer->pts_;
262         item.flag = item.buffer->flag_;
263         item.size = (item.buffer->memory_ != nullptr) ? item.buffer->memory_->GetSize() : 0;
264     }
265     return AVCS_ERR_OK;
266 }
267 
QueueInputBufferDone(uint32_t index)268 void CodecBufferCircular::QueueInputBufferDone(uint32_t index)
269 {
270     std::lock_guard<std::mutex> lock(inMutex_);
271     BufferCacheIter iter = inCache_.find(index);
272     if (iter == inCache_.end()) {
273         AVCODEC_LOGD_WITH_TAG("index=%{public}u", index);
274         return;
275     }
276     // The current owner of buffer is server and cannot read info from this buffer
277     BufferItem &item = iter->second;
278     if (item.flag & AVCODEC_BUFFER_FLAG_EOS) {
279         AddFlag(FLAG_INPUT_EOS);
280         inCond_.notify_all();
281     }
282     AVCODEC_LOGD_WITH_TAG("index=%{public}u, size=%{public}d, flag=%{public}u, pts=%{public}" PRId64, index, item.size,
283                           item.flag, item.pts);
284 }
285 
ReleaseOutputBufferDone(uint32_t index)286 void CodecBufferCircular::ReleaseOutputBufferDone(uint32_t index)
287 {
288     std::lock_guard<std::mutex> lock(outMutex_);
289     BufferCacheIter iter = outCache_.find(index);
290     if (iter == outCache_.end()) {
291         AVCODEC_LOGD_WITH_TAG("index=%{public}u", index);
292         return;
293     }
294     // The current owner of buffer is server and cannot read info from this buffer
295     BufferItem &item = iter->second;
296     AVCODEC_LOGD_WITH_TAG("index=%{public}u, size=%{public}d, flag=%{public}u, pts=%{public}" PRId64, index, item.size,
297                           item.flag, item.pts);
298 }
299 
NotifyEos()300 void CodecBufferCircular::NotifyEos()
301 {
302     std::lock_guard<std::mutex> lock(inMutex_);
303     AddFlag(FLAG_INPUT_EOS);
304     inCond_.notify_all();
305 }
306 
GetParameter(BufferCacheIter & iter)307 std::shared_ptr<Format> CodecBufferCircular::GetParameter(BufferCacheIter &iter)
308 {
309     if (iter->second.parameter == nullptr) {
310         iter->second.parameter = std::make_shared<Format>();
311         iter->second.parameter->SetMetaPtr(iter->second.buffer->meta_);
312     }
313     return iter->second.parameter;
314 }
315 
GetAttribute(BufferCacheIter & iter)316 std::shared_ptr<Format> CodecBufferCircular::GetAttribute(BufferCacheIter &iter)
317 {
318     if (iter->second.attribute == nullptr) {
319         iter->second.attribute = std::make_shared<Format>();
320     }
321     iter->second.attribute->PutLongValue(Media::Tag::MEDIA_TIME_STAMP, iter->second.buffer->pts_);
322     return iter->second.attribute;
323 }
324 
HasFlag(const CodecCircularFlag flag)325 inline bool CodecBufferCircular::HasFlag(const CodecCircularFlag flag)
326 {
327     return flags_ & flag;
328 }
329 
AddFlag(const CodecCircularFlag flag)330 inline void CodecBufferCircular::AddFlag(const CodecCircularFlag flag)
331 {
332     flags_ |= flag;
333 }
334 
RemoveFlag(const CodecCircularFlag flag)335 inline void CodecBufferCircular::RemoveFlag(const CodecCircularFlag flag)
336 {
337     flags_ &= ~flag;
338 }
339 
340 /******************************** DFX ********************************/
PrintCaches(bool isOutput)341 void CodecBufferCircular::PrintCaches(bool isOutput)
342 {
343     BufferCache &cache = isOutput ? outCache_ : inCache_;
344     std::array<std::vector<uint32_t>, 3> ownerArrays; // 3: [SERVER = 0, CLIENT = 1, USER = 2]
345     for (const auto &[key, item] : cache) {
346         ownerArrays[item.owner].emplace_back(key);
347     }
348     auto getCacheInfo = [](const std::vector<uint32_t> &keys, const char *ownerstr) {
349         std::ostringstream oss;
350         oss << ownerstr << "(";
351         if (!keys.empty()) {
352             auto it = keys.begin();
353             oss << *it;
354             for (++it; it != keys.end(); ++it) {
355                 oss << "," << *it;
356             }
357         }
358         oss << ")";
359         return oss.str();
360     };
361     const std::string userInfo = getCacheInfo(ownerArrays[OWNED_BY_USER], "user");
362     const std::string clientInfo = HasFlag(FLAG_IS_SYNC) ? getCacheInfo(ownerArrays[OWNED_BY_CLIENT], ",client") : "";
363     const std::string serverInfo = getCacheInfo(ownerArrays[OWNED_BY_SERVER], ",server");
364     AVCODEC_LOGI_WITH_TAG("%{public}s cache:%{public}s%{public}s%{public}s", (isOutput ? "out" : "in"),
365                           userInfo.c_str(), clientInfo.c_str(), serverInfo.c_str());
366 }
367 
OwnerToString(BufferOwner owner)368 const std::string &CodecBufferCircular::OwnerToString(BufferOwner owner)
369 {
370     static std::unordered_map<BufferOwner, const std::string> ownerStringMap = {
371         {OWNED_BY_SERVER, "server"},
372         {OWNED_BY_CLIENT, "client"},
373         {OWNED_BY_USER, "user"},
374     };
375     static const std::string defaultString = "unknown";
376     auto iter = ownerStringMap.find(owner);
377     return iter == ownerStringMap.end() ? defaultString : iter->second;
378 }
379 
ClearOutputBufferOwnedByCodec()380 void CodecBufferCircular::ClearOutputBufferOwnedByCodec()
381 {
382     for (auto iter = outCache_.begin(); iter != outCache_.end();) {
383         if (iter->second.owner == OWNED_BY_SERVER) {
384             iter = outCache_.erase(iter);
385         } else {
386             ++iter;
387         }
388     }
389 }
390 
391 /******************************** Callback ********************************/
OnError(AVCodecErrorType errorType,int32_t errorCode)392 void CodecBufferCircular::OnError(AVCodecErrorType errorType, int32_t errorCode)
393 {
394     IsSyncMode() ? SyncOnError(errorType, errorCode) : AsyncOnError(errorType, errorCode);
395 }
396 
OnOutputFormatChanged(const Format & format)397 void CodecBufferCircular::OnOutputFormatChanged(const Format &format)
398 {
399     IsSyncMode() ? SyncOnOutputFormatChanged(format) : AsyncOnOutputFormatChanged(format);
400 }
401 
OnInputBufferAvailable(uint32_t index,std::shared_ptr<AVBuffer> buffer)402 void CodecBufferCircular::OnInputBufferAvailable(uint32_t index, std::shared_ptr<AVBuffer> buffer)
403 {
404     CHECK_AND_RETURN_LOG_WITH_TAG(buffer != nullptr, "buffer is nullptr");
405     {
406         std::unique_lock<std::mutex> lock(inMutex_);
407         if (HasFlag(FLAG_INPUT_EOS)) {
408             AVCODEC_LOGD_WITH_TAG("At eos state, no buffer available");
409             return;
410         }
411     }
412     IsSyncMode() ? SyncOnInputBufferAvailable(index, buffer) : AsyncOnInputBufferAvailable(index, buffer);
413 }
414 
OnOutputBufferAvailable(uint32_t index,std::shared_ptr<AVBuffer> buffer)415 void CodecBufferCircular::OnOutputBufferAvailable(uint32_t index, std::shared_ptr<AVBuffer> buffer)
416 {
417     CHECK_AND_RETURN_LOG_WITH_TAG(buffer != nullptr, "buffer is nullptr");
418     IsSyncMode() ? SyncOnOutputBufferAvailable(index, buffer) : AsyncOnOutputBufferAvailable(index, buffer);
419 }
420 
OnOutputBufferBinded(std::map<uint32_t,sptr<SurfaceBuffer>> & bufferMap)421 void CodecBufferCircular::OnOutputBufferBinded(std::map<uint32_t, sptr<SurfaceBuffer>> &bufferMap)
422 {
423     if (!IsSyncMode() && (mediaCb_ != nullptr)) {
424         mediaCb_->OnOutputBufferBinded(bufferMap);
425     }
426 }
427 
OnOutputBufferUnbinded()428 void CodecBufferCircular::OnOutputBufferUnbinded()
429 {
430     if (!IsSyncMode() && (mediaCb_ != nullptr)) {
431         mediaCb_->OnOutputBufferUnbinded();
432     }
433 }
434 
435 /******************************** Async mode ********************************/
AsyncOnError(AVCodecErrorType errorType,int32_t errorCode)436 void CodecBufferCircular::AsyncOnError(AVCodecErrorType errorType, int32_t errorCode)
437 {
438     if (errorType == AVCODEC_ERROR_FRAMEAORK_FAILED) {
439         return;
440     }
441     std::shared_ptr<MediaCodecCallback> mediaCb = nullptr;
442     std::shared_ptr<AVCodecCallback> callback = nullptr;
443     {
444         std::scoped_lock lock(inMutex_, outMutex_);
445         mediaCb = mediaCb_;
446         callback = callback_;
447     }
448     // AVBuffer callback
449     if (mediaCb != nullptr) {
450         mediaCb->OnError(errorType, errorCode);
451         return;
452     }
453     // Api9 callback
454     if (callback != nullptr) {
455         callback->OnError(errorType, errorCode);
456         return;
457     }
458 }
459 
AsyncOnOutputFormatChanged(const Format & format)460 void CodecBufferCircular::AsyncOnOutputFormatChanged(const Format &format)
461 {
462     std::unique_lock<std::mutex> lock(outMutex_);
463     ClearOutputBufferOwnedByCodec();
464     // AVBuffer callback
465     auto mediaCb = mediaCb_;
466     if (mediaCb != nullptr) {
467         lock.unlock();
468         mediaCb->OnOutputFormatChanged(format);
469         return;
470     }
471     // Api9 callback
472     auto callback = callback_;
473     if (callback != nullptr) {
474         lock.unlock();
475         callback->OnOutputFormatChanged(format);
476         return;
477     }
478 }
479 
AsyncOnInputBufferAvailable(uint32_t index,std::shared_ptr<AVBuffer> & buffer)480 void CodecBufferCircular::AsyncOnInputBufferAvailable(uint32_t index, std::shared_ptr<AVBuffer> &buffer)
481 {
482     std::unique_lock<std::mutex> lock(inMutex_);
483     BufferCacheIter iter = inCache_.find(index);
484     if (iter == inCache_.end()) {
485         BufferItem item = {.buffer = buffer};
486         iter = inCache_.emplace(index, item).first;
487     } else {
488         iter->second.buffer = buffer;
489     }
490     BufferItem &item = iter->second;
491     item.owner = OWNED_BY_USER;
492     // Encoder parameter with attribute callback
493     auto attrCb = attrCb_;
494     if (attrCb != nullptr) {
495         auto attribute = GetAttribute(iter);
496         auto parameter = GetParameter(iter);
497         lock.unlock();
498         attrCb->OnInputParameterWithAttrAvailable(index, attribute, parameter);
499         return;
500     }
501     // Encoder parameter callback
502     auto paramCb = paramCb_;
503     if (paramCb != nullptr) {
504         auto parameter = GetParameter(iter);
505         lock.unlock();
506         paramCb->OnInputParameterAvailable(index, parameter);
507         return;
508     }
509     // AVBuffer callback
510     auto mediaCb = mediaCb_;
511     if (mediaCb != nullptr) {
512         item.buffer->pts_ = 0;
513         lock.unlock();
514         mediaCb->OnInputBufferAvailable(index, item.buffer);
515         return;
516     }
517     // Api9 callback
518     auto callback = callback_;
519     if (callback != nullptr) {
520         item.buffer->pts_ = 0;
521         ConvertToSharedMemory(item.buffer, item.memory);
522         if (converter_ != nullptr) {
523             converter_->SetInputBufferFormat(item.buffer);
524         }
525         lock.unlock();
526         callback->OnInputBufferAvailable(index, item.memory);
527         return;
528     }
529 }
530 
AsyncOnOutputBufferAvailable(uint32_t index,std::shared_ptr<AVBuffer> & buffer)531 void CodecBufferCircular::AsyncOnOutputBufferAvailable(uint32_t index, std::shared_ptr<AVBuffer> &buffer)
532 {
533     std::unique_lock<std::mutex> lock(outMutex_);
534     BufferCacheIter iter = outCache_.find(index);
535     if (iter == outCache_.end()) {
536         BufferItem item = {.buffer = buffer};
537         iter = outCache_.emplace(index, item).first;
538     } else {
539         iter->second.buffer = buffer;
540     }
541     BufferItem &item = iter->second;
542     item.owner = OWNED_BY_USER;
543     // AVBuffer callback
544     auto mediaCb = mediaCb_;
545     if (mediaCb != nullptr) {
546         lock.unlock();
547         mediaCb->OnOutputBufferAvailable(index, item.buffer);
548         return;
549     }
550     // Api9 callback
551     auto callback = callback_;
552     if (callback != nullptr) {
553         ConvertToSharedMemory(item.buffer, item.memory);
554         if (converter_ != nullptr) {
555             converter_->SetOutputBufferFormat(item.buffer);
556             converter_->ReadFromBuffer(item.buffer, item.memory);
557         }
558         AVCodecBufferFlag flag = static_cast<AVCodecBufferFlag>(item.buffer->flag_);
559         AVCodecBufferInfo info;
560         info.presentationTimeUs = item.buffer->pts_;
561         if (item.buffer->memory_ != nullptr) {
562             info.offset = item.buffer->memory_->GetOffset();
563             info.size = item.buffer->memory_->GetSize();
564         }
565         lock.unlock();
566         callback->OnOutputBufferAvailable(index, info, flag, item.memory);
567         return;
568     }
569 }
570 
ConvertToSharedMemory(const std::shared_ptr<AVBuffer> & buffer,std::shared_ptr<AVSharedMemory> & memory)571 void CodecBufferCircular::ConvertToSharedMemory(const std::shared_ptr<AVBuffer> &buffer,
572                                                 std::shared_ptr<AVSharedMemory> &memory)
573 {
574     // Api9
575     using Flags = AVSharedMemory::Flags;
576     std::shared_ptr<AVMemory> &bufferMem = buffer->memory_;
577     if (bufferMem == nullptr || memory != nullptr) {
578         return;
579     }
580     MemoryType type = bufferMem->GetMemoryType();
581     int32_t capacity = bufferMem->GetCapacity();
582     if (type == MemoryType::SHARED_MEMORY) {
583         std::string name = std::string("SharedMem_") + std::to_string(buffer->GetUniqueId());
584         int32_t fd = bufferMem->GetFileDescriptor();
585         bool isReadable = bufferMem->GetMemoryFlag() == MemoryFlag::MEMORY_READ_ONLY;
586         uint32_t flag = isReadable ? Flags::FLAGS_READ_ONLY : Flags::FLAGS_READ_WRITE;
587         memory = AVSharedMemoryBase::CreateFromRemote(fd, capacity, flag, name);
588     } else {
589         std::string name = std::string("SharedMem_") + std::to_string(buffer->GetUniqueId());
590         memory = AVSharedMemoryBase::CreateFromLocal(capacity, Flags::FLAGS_READ_WRITE, name);
591         if (memory == nullptr) {
592             AVCODEC_LOGW_WITH_TAG("Create shared memory from local failed");
593         }
594     }
595 }
596 
597 /******************************** Sync mode ********************************/
SyncOnError(AVCodecErrorType errorType,int32_t errorCode)598 void CodecBufferCircular::SyncOnError(AVCodecErrorType errorType, int32_t errorCode)
599 {
600     std::scoped_lock lock(inMutex_, outMutex_);
601     lastError_ = errorCode;
602     AddFlag(FLAG_ERROR);
603     outCond_.notify_all();
604     inCond_.notify_all();
605 }
606 
SyncOnOutputFormatChanged(const Format & format)607 void CodecBufferCircular::SyncOnOutputFormatChanged(const Format &format)
608 {
609     std::lock_guard<std::mutex> lock(outMutex_);
610     outQueue_.push(Event({.type = EVENT_STREAM_CHANGED}));
611     ClearOutputBufferOwnedByCodec();
612     outCond_.notify_all();
613 }
614 
SyncOnInputBufferAvailable(uint32_t index,std::shared_ptr<AVBuffer> & buffer)615 void CodecBufferCircular::SyncOnInputBufferAvailable(uint32_t index, std::shared_ptr<AVBuffer> &buffer)
616 {
617     std::lock_guard<std::mutex> lock(inMutex_);
618     BufferCacheIter iter = inCache_.find(index);
619     if (iter == inCache_.end()) {
620         BufferItem item = {.buffer = buffer};
621         iter = inCache_.emplace(index, item).first;
622     } else {
623         iter->second.buffer = buffer;
624     }
625     iter->second.buffer->pts_ = 0;
626     iter->second.owner = OWNED_BY_CLIENT;
627     inQueue_.push(Event({.type = EVENT_INPUT_BUFFER, .index = index}));
628     inCond_.notify_all();
629 }
630 
SyncOnOutputBufferAvailable(uint32_t index,std::shared_ptr<AVBuffer> & buffer)631 void CodecBufferCircular::SyncOnOutputBufferAvailable(uint32_t index, std::shared_ptr<AVBuffer> &buffer)
632 {
633     std::lock_guard<std::mutex> lock(outMutex_);
634     BufferCacheIter iter = outCache_.find(index);
635     if (iter == outCache_.end()) {
636         BufferItem item = {.buffer = buffer};
637         iter = outCache_.emplace(index, item).first;
638     } else {
639         iter->second.buffer = buffer;
640     }
641     iter->second.owner = OWNED_BY_CLIENT;
642     outQueue_.push(Event({.type = EVENT_OUTPUT_BUFFER, .index = index}));
643     outCond_.notify_all();
644 }
645 
QueryInputBuffer(uint32_t & index,int64_t timeoutUs)646 int32_t CodecBufferCircular::QueryInputBuffer(uint32_t &index, int64_t timeoutUs)
647 {
648     return QueryInputIndex(index, timeoutUs);
649 }
650 
QueryOutputBuffer(uint32_t & index,int64_t timeoutUs)651 int32_t CodecBufferCircular::QueryOutputBuffer(uint32_t &index, int64_t timeoutUs)
652 {
653     return QueryOutputIndex(index, timeoutUs);
654 }
655 
QueryInputIndex(uint32_t & index,int64_t timeoutUs)656 int32_t CodecBufferCircular::QueryInputIndex(uint32_t &index, int64_t timeoutUs)
657 {
658     std::unique_lock<std::mutex> lock(inMutex_);
659     CHECK_AND_RETURN_RET_LOG_WITH_TAG(HasFlag(FLAG_IS_SYNC), AVCS_ERR_INVALID_OPERATION, "Need enable sync mode");
660     bool isNotTimeout = WaitForInputBuffer(lock, timeoutUs);
661 
662     CHECK_AND_RETURN_RET_LOG_WITH_TAG(HasFlag(FLAG_IS_RUNNING), AVCS_ERR_INVALID_STATE, "Not in running state");
663     CHECK_AND_RETURN_RET_LOG_WITH_TAG(!HasFlag(FLAG_INPUT_EOS), AVCS_ERR_INVALID_STATE, "End-of-stream pushed");
664     CHECK_AND_RETURN_RET_LOG_WITH_TAG(!HasFlag(FLAG_ERROR), lastError_, "%{public}s",
665                                       AVCSErrorToString(static_cast<AVCodecServiceErrCode>(lastError_)).c_str());
666     if (!isNotTimeout) {
667         return AVCS_ERR_TRY_AGAIN;
668     }
669     Event event = inQueue_.front();
670     inQueue_.pop();
671     index = event.index;
672     BufferCacheIter iter = inCache_.find(index);
673     if (iter != inCache_.end()) {
674         iter->second.owner = OWNED_BY_USER;
675     }
676     return AVCS_ERR_OK;
677 }
678 
QueryOutputIndex(uint32_t & index,int64_t timeoutUs)679 int32_t CodecBufferCircular::QueryOutputIndex(uint32_t &index, int64_t timeoutUs)
680 {
681     std::unique_lock<std::mutex> lock(outMutex_);
682     CHECK_AND_RETURN_RET_LOG_WITH_TAG(HasFlag(FLAG_IS_SYNC), AVCS_ERR_INVALID_OPERATION, "Need enable sync mode");
683     bool isNotTimeout = WaitForOutputBuffer(lock, timeoutUs);
684 
685     CHECK_AND_RETURN_RET_LOG_WITH_TAG(HasFlag(FLAG_IS_RUNNING), AVCS_ERR_INVALID_STATE, "Not in running state");
686     CHECK_AND_RETURN_RET_LOG_WITH_TAG(!HasFlag(FLAG_OUTPUT_EOS), AVCS_ERR_INVALID_STATE, "End-of-stream reached");
687     CHECK_AND_RETURN_RET_LOG_WITH_TAG(!HasFlag(FLAG_ERROR), lastError_, "%{public}s",
688                                       AVCSErrorToString(static_cast<AVCodecServiceErrCode>(lastError_)).c_str());
689     if (!isNotTimeout) {
690         return AVCS_ERR_TRY_AGAIN;
691     }
692     Event event = outQueue_.front();
693     outQueue_.pop();
694     if (event.type == EVENT_STREAM_CHANGED) {
695         AVCODEC_LOGI_WITH_TAG("Output format changed");
696         return AVCS_ERR_STREAM_CHANGED;
697     }
698     index = event.index;
699     BufferCacheIter iter = outCache_.find(index);
700     if (iter == outCache_.end()) {
701         return AVCS_ERR_OK;
702     }
703     BufferItem &item = iter->second;
704     item.owner = OWNED_BY_USER;
705     if (item.buffer != nullptr) {
706         item.flag = item.buffer->flag_;
707     }
708     if (item.flag & AVCODEC_BUFFER_FLAG_EOS) {
709         AddFlag(FLAG_OUTPUT_EOS);
710         outCond_.notify_all();
711     }
712     return AVCS_ERR_OK;
713 }
714 
WaitForInputBuffer(std::unique_lock<std::mutex> & lock,int64_t timeoutUs)715 bool CodecBufferCircular::WaitForInputBuffer(std::unique_lock<std::mutex> &lock, int64_t timeoutUs)
716 {
717     const auto predicate = [this] {
718         return !HasFlag(FLAG_IS_RUNNING) || // [1] Not in running state
719                HasFlag(FLAG_INPUT_EOS) ||   // [2] End-of-stream pushed
720                HasFlag(FLAG_ERROR) ||       // [3] Error state detected
721                inQueue_.size() > 0;         // [4] Input buffer available
722     };
723 
724     if (timeoutUs < 0) {
725         inCond_.wait(lock, predicate);
726         return true; // Always returns true after wait
727     }
728     if (timeoutUs == 0) {
729         return predicate(); // Immediate status check
730     }
731     if (timeoutUs > MAX_TIMEOUT) {
732         timeoutUs = MAX_TIMEOUT;
733     }
734     // Returns true if predicate satisfied
735     return inCond_.wait_for(lock, std::chrono::microseconds(timeoutUs), predicate);
736 }
737 
WaitForOutputBuffer(std::unique_lock<std::mutex> & lock,int64_t timeoutUs)738 bool CodecBufferCircular::WaitForOutputBuffer(std::unique_lock<std::mutex> &lock, int64_t timeoutUs)
739 {
740     const auto predicate = [this] {
741         return !HasFlag(FLAG_IS_RUNNING) || // [1] Not in running state
742                HasFlag(FLAG_OUTPUT_EOS) ||  // [2] End-of-stream reached
743                HasFlag(FLAG_ERROR) ||       // [3] Error state detected
744                outQueue_.size() > 0;        // [4] Output buffer available | Stream description changed
745     };
746 
747     if (timeoutUs < 0) {
748         outCond_.wait(lock, predicate);
749         return true;
750     }
751     if (timeoutUs == 0) {
752         return predicate();
753     }
754     if (timeoutUs > MAX_TIMEOUT) {
755         timeoutUs = MAX_TIMEOUT;
756     }
757     return outCond_.wait_for(lock, std::chrono::microseconds(timeoutUs), predicate);
758 }
759 
GetInputBuffer(uint32_t index)760 std::shared_ptr<AVBuffer> CodecBufferCircular::GetInputBuffer(uint32_t index)
761 {
762     std::lock_guard<std::mutex> lock(inMutex_);
763     CHECK_AND_RETURN_RET_LOG_WITH_TAG(HasFlag(FLAG_IS_SYNC), nullptr, "Need enable sync mode");
764     CHECK_AND_RETURN_RET_LOG_WITH_TAG(HasFlag(FLAG_IS_RUNNING), nullptr, "Not in running state");
765     CHECK_AND_RETURN_RET_LOG_WITH_TAG(!HasFlag(FLAG_ERROR), nullptr, "%{public}s",
766                                       AVCSErrorToString(static_cast<AVCodecServiceErrCode>(lastError_)).c_str());
767     BufferCacheIter iter = inCache_.find(index);
768     CHECK_AND_RETURN_RET_LOG_WITH_TAG(iter != inCache_.end(), nullptr, "Index is invalid %{public}u", index);
769     CHECK_AND_RETURN_RET_LOG_WITH_TAG(iter->second.owner == OWNED_BY_USER, nullptr, "Invalid ownership:%{public}s",
770                                       OwnerToString(iter->second.owner).c_str());
771     return iter->second.buffer;
772 }
773 
GetOutputBuffer(uint32_t index)774 std::shared_ptr<AVBuffer> CodecBufferCircular::GetOutputBuffer(uint32_t index)
775 {
776     std::lock_guard<std::mutex> lock(outMutex_);
777     CHECK_AND_RETURN_RET_LOG_WITH_TAG(HasFlag(FLAG_IS_SYNC), nullptr, "Need enable sync mode");
778     CHECK_AND_RETURN_RET_LOG_WITH_TAG(HasFlag(FLAG_IS_RUNNING), nullptr, "Not in running state");
779     CHECK_AND_RETURN_RET_LOG_WITH_TAG(!HasFlag(FLAG_ERROR), nullptr, "%{public}s",
780                                       AVCSErrorToString(static_cast<AVCodecServiceErrCode>(lastError_)).c_str());
781     BufferCacheIter iter = outCache_.find(index);
782     CHECK_AND_RETURN_RET_LOG_WITH_TAG(iter != outCache_.end(), nullptr, "Index is invalid %{public}u", index);
783     CHECK_AND_RETURN_RET_LOG_WITH_TAG(iter->second.owner == OWNED_BY_USER, nullptr, "Invalid ownership:%{public}s",
784                                       OwnerToString(iter->second.owner).c_str());
785     return iter->second.buffer;
786 }
787 } // namespace MediaAVCodec
788 } // namespace OHOS