1 /*
2 * Copyright (C) 2025 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "codec_buffer_circular.h"
17 #include <sstream>
18 #include "avcodec_errors.h"
19 #include "avcodec_log.h"
20 #include "buffer/avbuffer.h"
21 #include "buffer/avsharedmemory.h"
22 #include "buffer_converter.h"
23 #include "meta/format.h"
24 #include "meta/meta.h"
25
26 using namespace OHOS::Media;
27 namespace {
28 constexpr OHOS::HiviewDFX::HiLogLabel LABEL = {LOG_CORE, LOG_DOMAIN_FRAMEWORK, "CodecBufferCircular"};
29 constexpr int64_t MAX_TIMEOUT =
30 std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::steady_clock::time_point::max())
31 .time_since_epoch()
32 .count() /
33 2;
34 } // namespace
35 namespace OHOS {
36 namespace MediaAVCodec {
~CodecBufferCircular()37 CodecBufferCircular::~CodecBufferCircular()
38 {
39 {
40 std::lock_guard<std::mutex> lock(inMutex_);
41 PrintCaches(false);
42 }
43 {
44 std::lock_guard<std::mutex> lock(outMutex_);
45 PrintCaches(true);
46 }
47 }
48
SetConverter(std::shared_ptr<BufferConverter> & converter)49 void CodecBufferCircular::SetConverter(std::shared_ptr<BufferConverter> &converter)
50 {
51 converter_ = converter;
52 }
53
SetCallback(const std::shared_ptr<AVCodecCallback> & callback)54 int32_t CodecBufferCircular::SetCallback(const std::shared_ptr<AVCodecCallback> &callback)
55 {
56 std::scoped_lock lock(inMutex_, outMutex_);
57 CHECK_AND_RETURN_RET_LOG_WITH_TAG(CanEnableMode<MODE_ASYNC>(), AVCS_ERR_INVALID_OPERATION,
58 "Can not enable async mode");
59 EnableMode<MODE_ASYNC>();
60 callback_ = callback;
61 return AVCS_ERR_OK;
62 }
63
SetCallback(const std::shared_ptr<MediaCodecCallback> & callback)64 int32_t CodecBufferCircular::SetCallback(const std::shared_ptr<MediaCodecCallback> &callback)
65 {
66 std::scoped_lock lock(inMutex_, outMutex_);
67 CHECK_AND_RETURN_RET_LOG_WITH_TAG(CanEnableMode<MODE_ASYNC>(), AVCS_ERR_INVALID_OPERATION,
68 "Can not enable async mode");
69 EnableMode<MODE_ASYNC>();
70 mediaCb_ = callback;
71 return AVCS_ERR_OK;
72 }
73
SetCallback(const std::shared_ptr<MediaCodecParameterCallback> & callback)74 int32_t CodecBufferCircular::SetCallback(const std::shared_ptr<MediaCodecParameterCallback> &callback)
75 {
76 std::scoped_lock lock(inMutex_, outMutex_);
77 CHECK_AND_RETURN_RET_LOG_WITH_TAG(attrCb_ == nullptr, AVCS_ERR_INVALID_STATE,
78 "Already set parameter with atrribute callback");
79 CHECK_AND_RETURN_RET_LOG_WITH_TAG(CanEnableMode<MODE_ASYNC>(), AVCS_ERR_INVALID_OPERATION,
80 "Can not enable async mode");
81 EnableMode<MODE_ASYNC>();
82 paramCb_ = callback;
83 return AVCS_ERR_OK;
84 }
85
SetCallback(const std::shared_ptr<MediaCodecParameterWithAttrCallback> & callback)86 int32_t CodecBufferCircular::SetCallback(const std::shared_ptr<MediaCodecParameterWithAttrCallback> &callback)
87 {
88 std::scoped_lock lock(inMutex_, outMutex_);
89 CHECK_AND_RETURN_RET_LOG_WITH_TAG(paramCb_ == nullptr, AVCS_ERR_INVALID_STATE, "Already set parameter callback");
90 CHECK_AND_RETURN_RET_LOG_WITH_TAG(CanEnableMode<MODE_ASYNC>(), AVCS_ERR_INVALID_OPERATION,
91 "Can not enable async mode");
92 EnableMode<MODE_ASYNC>();
93 attrCb_ = callback;
94 return AVCS_ERR_OK;
95 }
96
SetIsRunning(bool isRunning)97 void CodecBufferCircular::SetIsRunning(bool isRunning)
98 {
99 std::scoped_lock lock(inMutex_, outMutex_);
100 if (isRunning) {
101 AddFlag(FLAG_IS_RUNNING);
102 } else {
103 RemoveFlag(FLAG_IS_RUNNING);
104 RemoveFlag(FLAG_INPUT_EOS);
105 RemoveFlag(FLAG_OUTPUT_EOS);
106 }
107 inCond_.notify_all();
108 outCond_.notify_all();
109 }
110
CanEnableSyncMode()111 bool CodecBufferCircular::CanEnableSyncMode()
112 {
113 std::scoped_lock lock(inMutex_, outMutex_);
114 return CanEnableMode<MODE_SYNC>();
115 }
116
CanEnableAsyncMode()117 bool CodecBufferCircular::CanEnableAsyncMode()
118 {
119 std::scoped_lock lock(inMutex_, outMutex_);
120 return CanEnableMode<MODE_ASYNC>();
121 }
122
EnableSyncMode()123 void CodecBufferCircular::EnableSyncMode()
124 {
125 std::scoped_lock lock(inMutex_, outMutex_);
126 CHECK_AND_RETURN_LOG_WITH_TAG(CanEnableMode<MODE_SYNC>(), "Can not enable sync mode");
127 EnableMode<MODE_SYNC>();
128 }
129
EnableAsyncMode()130 void CodecBufferCircular::EnableAsyncMode()
131 {
132 std::scoped_lock lock(inMutex_, outMutex_);
133 CHECK_AND_RETURN_LOG_WITH_TAG(CanEnableMode<MODE_ASYNC>(), "Can not enable async mode");
134 EnableMode<MODE_ASYNC>();
135 }
136
IsSyncMode()137 bool CodecBufferCircular::IsSyncMode()
138 {
139 std::scoped_lock lock(inMutex_, outMutex_);
140 return HasFlag(FLAG_IS_SYNC) && HasFlag(FLAG_SYNC_ASYNC_CONFIGURED);
141 }
142
ResetFlag()143 void CodecBufferCircular::ResetFlag()
144 {
145 std::scoped_lock lock(inMutex_, outMutex_);
146 RemoveFlag(FLAG_IS_RUNNING);
147 RemoveFlag(FLAG_IS_SYNC);
148 RemoveFlag(FLAG_SYNC_ASYNC_CONFIGURED);
149 RemoveFlag(FLAG_ERROR);
150 RemoveFlag(FLAG_INPUT_EOS);
151 RemoveFlag(FLAG_OUTPUT_EOS);
152 }
153
ClearCaches()154 void CodecBufferCircular::ClearCaches()
155 {
156 {
157 std::lock_guard<std::mutex> lock(inMutex_);
158 PrintCaches(false);
159 inCache_.clear();
160 EventQueue emptyQueue;
161 std::swap(inQueue_, emptyQueue);
162 }
163 {
164 std::lock_guard<std::mutex> lock(outMutex_);
165 PrintCaches(true);
166 outCache_.clear();
167 EventQueue emptyQueue;
168 std::swap(outQueue_, emptyQueue);
169 }
170 }
171
FlushCaches()172 void CodecBufferCircular::FlushCaches()
173 {
174 {
175 std::lock_guard<std::mutex> lock(inMutex_);
176 PrintCaches(false);
177 for (auto &val : inCache_) {
178 val.second.owner = OWNED_BY_SERVER;
179 }
180 EventQueue emptyQueue;
181 std::swap(inQueue_, emptyQueue);
182 }
183 {
184 std::lock_guard<std::mutex> lock(outMutex_);
185 PrintCaches(true);
186 for (auto &val : outCache_) {
187 val.second.owner = OWNED_BY_SERVER;
188 }
189 EventQueue emptyQueue;
190 std::swap(outQueue_, emptyQueue);
191 }
192 }
193
194 /******************************** Common ********************************/
HandleInputBuffer(uint32_t index,AVCodecBufferInfo info,AVCodecBufferFlag flag)195 int32_t CodecBufferCircular::HandleInputBuffer(uint32_t index, AVCodecBufferInfo info, AVCodecBufferFlag flag)
196 {
197 // Api9
198 std::lock_guard<std::mutex> lock(inMutex_);
199 CHECK_AND_RETURN_RET_LOG_WITH_TAG(!HasFlag(FLAG_IS_SYNC), AVCS_ERR_INVALID_OPERATION, "Not support sync mode");
200 BufferCacheIter iter = inCache_.find(index);
201 CHECK_AND_RETURN_RET_LOG_WITH_TAG(HasFlag(FLAG_IS_RUNNING), AVCS_ERR_INVALID_STATE, "Not in running state");
202 if (iter == inCache_.end()) {
203 AVCODEC_LOGW_WITH_TAG("Index is invalid %{public}u", index);
204 return AVCS_ERR_OK;
205 }
206 BufferItem &item = iter->second;
207 item.owner = OWNED_BY_SERVER;
208 CHECK_AND_RETURN_RET_LOG_WITH_TAG(item.buffer != nullptr, AVCS_ERR_INVALID_OPERATION, "Buffer is nullptr");
209 CHECK_AND_RETURN_RET_LOG_WITH_TAG(item.memory != nullptr, AVCS_ERR_INVALID_OPERATION, "Memory is nullptr");
210 CHECK_AND_RETURN_RET_LOG_WITH_TAG(converter_ != nullptr, AVCS_ERR_INVALID_OPERATION, "Converter is nullptr");
211 CHECK_AND_RETURN_RET_LOG_WITH_TAG(item.buffer->memory_ != nullptr, AVCS_ERR_INVALID_OPERATION,
212 "Get buffer memory is nullptr");
213 item.buffer->pts_ = info.presentationTimeUs;
214 item.buffer->flag_ = static_cast<uint32_t>(flag);
215 item.buffer->memory_->SetOffset(info.offset);
216 item.buffer->memory_->SetSize(info.size);
217
218 item.pts = info.presentationTimeUs;
219 item.flag = static_cast<uint32_t>(flag);
220 item.size = info.size;
221
222 converter_->WriteToBuffer(item.buffer, item.memory);
223 return AVCS_ERR_OK;
224 }
225
HandleInputBuffer(uint32_t index)226 int32_t CodecBufferCircular::HandleInputBuffer(uint32_t index)
227 {
228 std::lock_guard<std::mutex> lock(inMutex_);
229 CHECK_AND_RETURN_RET_LOG_WITH_TAG(HasFlag(FLAG_IS_RUNNING), AVCS_ERR_INVALID_STATE, "Not in running state");
230 BufferCacheIter iter = inCache_.find(index);
231 if (iter == inCache_.end()) {
232 AVCODEC_LOGW_WITH_TAG("Index is invalid %{public}u", index);
233 return HasFlag(FLAG_IS_SYNC) ? AVCS_ERR_INVALID_OPERATION : AVCS_ERR_OK;
234 }
235 BufferItem &item = iter->second;
236 CHECK_AND_RETURN_RET_LOG_WITH_TAG(item.owner == OWNED_BY_USER, AVCS_ERR_INVALID_OPERATION,
237 "Invalid ownership:%{public}s", OwnerToString(item.owner).c_str());
238 item.owner = OWNED_BY_SERVER;
239 if (item.buffer != nullptr) {
240 item.pts = item.buffer->pts_;
241 item.flag = item.buffer->flag_;
242 item.size = (item.buffer->memory_ != nullptr) ? item.buffer->memory_->GetSize() : 0;
243 }
244 return AVCS_ERR_OK;
245 }
246
HandleOutputBuffer(uint32_t index)247 int32_t CodecBufferCircular::HandleOutputBuffer(uint32_t index)
248 {
249 std::lock_guard<std::mutex> lock(outMutex_);
250 CHECK_AND_RETURN_RET_LOG_WITH_TAG(HasFlag(FLAG_IS_RUNNING), AVCS_ERR_INVALID_STATE, "Not in running state");
251 BufferCacheIter iter = outCache_.find(index);
252 if (iter == outCache_.end()) {
253 AVCODEC_LOGW_WITH_TAG("Index is invalid %{public}u", index);
254 return HasFlag(FLAG_IS_SYNC) ? AVCS_ERR_INVALID_OPERATION : AVCS_ERR_OK;
255 }
256 BufferItem &item = iter->second;
257 CHECK_AND_RETURN_RET_LOG_WITH_TAG(item.owner == OWNED_BY_USER, AVCS_ERR_INVALID_OPERATION,
258 "Invalid ownership:%{public}s", OwnerToString(item.owner).c_str());
259 item.owner = OWNED_BY_SERVER;
260 if (item.buffer != nullptr) {
261 item.pts = item.buffer->pts_;
262 item.flag = item.buffer->flag_;
263 item.size = (item.buffer->memory_ != nullptr) ? item.buffer->memory_->GetSize() : 0;
264 }
265 return AVCS_ERR_OK;
266 }
267
QueueInputBufferDone(uint32_t index)268 void CodecBufferCircular::QueueInputBufferDone(uint32_t index)
269 {
270 std::lock_guard<std::mutex> lock(inMutex_);
271 BufferCacheIter iter = inCache_.find(index);
272 if (iter == inCache_.end()) {
273 AVCODEC_LOGD_WITH_TAG("index=%{public}u", index);
274 return;
275 }
276 // The current owner of buffer is server and cannot read info from this buffer
277 BufferItem &item = iter->second;
278 if (item.flag & AVCODEC_BUFFER_FLAG_EOS) {
279 AddFlag(FLAG_INPUT_EOS);
280 inCond_.notify_all();
281 }
282 AVCODEC_LOGD_WITH_TAG("index=%{public}u, size=%{public}d, flag=%{public}u, pts=%{public}" PRId64, index, item.size,
283 item.flag, item.pts);
284 }
285
ReleaseOutputBufferDone(uint32_t index)286 void CodecBufferCircular::ReleaseOutputBufferDone(uint32_t index)
287 {
288 std::lock_guard<std::mutex> lock(outMutex_);
289 BufferCacheIter iter = outCache_.find(index);
290 if (iter == outCache_.end()) {
291 AVCODEC_LOGD_WITH_TAG("index=%{public}u", index);
292 return;
293 }
294 // The current owner of buffer is server and cannot read info from this buffer
295 BufferItem &item = iter->second;
296 AVCODEC_LOGD_WITH_TAG("index=%{public}u, size=%{public}d, flag=%{public}u, pts=%{public}" PRId64, index, item.size,
297 item.flag, item.pts);
298 }
299
NotifyEos()300 void CodecBufferCircular::NotifyEos()
301 {
302 std::lock_guard<std::mutex> lock(inMutex_);
303 AddFlag(FLAG_INPUT_EOS);
304 inCond_.notify_all();
305 }
306
GetParameter(BufferCacheIter & iter)307 std::shared_ptr<Format> CodecBufferCircular::GetParameter(BufferCacheIter &iter)
308 {
309 if (iter->second.parameter == nullptr) {
310 iter->second.parameter = std::make_shared<Format>();
311 iter->second.parameter->SetMetaPtr(iter->second.buffer->meta_);
312 }
313 return iter->second.parameter;
314 }
315
GetAttribute(BufferCacheIter & iter)316 std::shared_ptr<Format> CodecBufferCircular::GetAttribute(BufferCacheIter &iter)
317 {
318 if (iter->second.attribute == nullptr) {
319 iter->second.attribute = std::make_shared<Format>();
320 }
321 iter->second.attribute->PutLongValue(Media::Tag::MEDIA_TIME_STAMP, iter->second.buffer->pts_);
322 return iter->second.attribute;
323 }
324
HasFlag(const CodecCircularFlag flag)325 inline bool CodecBufferCircular::HasFlag(const CodecCircularFlag flag)
326 {
327 return flags_ & flag;
328 }
329
AddFlag(const CodecCircularFlag flag)330 inline void CodecBufferCircular::AddFlag(const CodecCircularFlag flag)
331 {
332 flags_ |= flag;
333 }
334
RemoveFlag(const CodecCircularFlag flag)335 inline void CodecBufferCircular::RemoveFlag(const CodecCircularFlag flag)
336 {
337 flags_ &= ~flag;
338 }
339
340 /******************************** DFX ********************************/
PrintCaches(bool isOutput)341 void CodecBufferCircular::PrintCaches(bool isOutput)
342 {
343 BufferCache &cache = isOutput ? outCache_ : inCache_;
344 std::array<std::vector<uint32_t>, 3> ownerArrays; // 3: [SERVER = 0, CLIENT = 1, USER = 2]
345 for (const auto &[key, item] : cache) {
346 ownerArrays[item.owner].emplace_back(key);
347 }
348 auto getCacheInfo = [](const std::vector<uint32_t> &keys, const char *ownerstr) {
349 std::ostringstream oss;
350 oss << ownerstr << "(";
351 if (!keys.empty()) {
352 auto it = keys.begin();
353 oss << *it;
354 for (++it; it != keys.end(); ++it) {
355 oss << "," << *it;
356 }
357 }
358 oss << ")";
359 return oss.str();
360 };
361 const std::string userInfo = getCacheInfo(ownerArrays[OWNED_BY_USER], "user");
362 const std::string clientInfo = HasFlag(FLAG_IS_SYNC) ? getCacheInfo(ownerArrays[OWNED_BY_CLIENT], ",client") : "";
363 const std::string serverInfo = getCacheInfo(ownerArrays[OWNED_BY_SERVER], ",server");
364 AVCODEC_LOGI_WITH_TAG("%{public}s cache:%{public}s%{public}s%{public}s", (isOutput ? "out" : "in"),
365 userInfo.c_str(), clientInfo.c_str(), serverInfo.c_str());
366 }
367
OwnerToString(BufferOwner owner)368 const std::string &CodecBufferCircular::OwnerToString(BufferOwner owner)
369 {
370 static std::unordered_map<BufferOwner, const std::string> ownerStringMap = {
371 {OWNED_BY_SERVER, "server"},
372 {OWNED_BY_CLIENT, "client"},
373 {OWNED_BY_USER, "user"},
374 };
375 static const std::string defaultString = "unknown";
376 auto iter = ownerStringMap.find(owner);
377 return iter == ownerStringMap.end() ? defaultString : iter->second;
378 }
379
ClearOutputBufferOwnedByCodec()380 void CodecBufferCircular::ClearOutputBufferOwnedByCodec()
381 {
382 for (auto iter = outCache_.begin(); iter != outCache_.end();) {
383 if (iter->second.owner == OWNED_BY_SERVER) {
384 iter = outCache_.erase(iter);
385 } else {
386 ++iter;
387 }
388 }
389 }
390
391 /******************************** Callback ********************************/
OnError(AVCodecErrorType errorType,int32_t errorCode)392 void CodecBufferCircular::OnError(AVCodecErrorType errorType, int32_t errorCode)
393 {
394 IsSyncMode() ? SyncOnError(errorType, errorCode) : AsyncOnError(errorType, errorCode);
395 }
396
OnOutputFormatChanged(const Format & format)397 void CodecBufferCircular::OnOutputFormatChanged(const Format &format)
398 {
399 IsSyncMode() ? SyncOnOutputFormatChanged(format) : AsyncOnOutputFormatChanged(format);
400 }
401
OnInputBufferAvailable(uint32_t index,std::shared_ptr<AVBuffer> buffer)402 void CodecBufferCircular::OnInputBufferAvailable(uint32_t index, std::shared_ptr<AVBuffer> buffer)
403 {
404 CHECK_AND_RETURN_LOG_WITH_TAG(buffer != nullptr, "buffer is nullptr");
405 {
406 std::unique_lock<std::mutex> lock(inMutex_);
407 if (HasFlag(FLAG_INPUT_EOS)) {
408 AVCODEC_LOGD_WITH_TAG("At eos state, no buffer available");
409 return;
410 }
411 }
412 IsSyncMode() ? SyncOnInputBufferAvailable(index, buffer) : AsyncOnInputBufferAvailable(index, buffer);
413 }
414
OnOutputBufferAvailable(uint32_t index,std::shared_ptr<AVBuffer> buffer)415 void CodecBufferCircular::OnOutputBufferAvailable(uint32_t index, std::shared_ptr<AVBuffer> buffer)
416 {
417 CHECK_AND_RETURN_LOG_WITH_TAG(buffer != nullptr, "buffer is nullptr");
418 IsSyncMode() ? SyncOnOutputBufferAvailable(index, buffer) : AsyncOnOutputBufferAvailable(index, buffer);
419 }
420
OnOutputBufferBinded(std::map<uint32_t,sptr<SurfaceBuffer>> & bufferMap)421 void CodecBufferCircular::OnOutputBufferBinded(std::map<uint32_t, sptr<SurfaceBuffer>> &bufferMap)
422 {
423 if (!IsSyncMode() && (mediaCb_ != nullptr)) {
424 mediaCb_->OnOutputBufferBinded(bufferMap);
425 }
426 }
427
OnOutputBufferUnbinded()428 void CodecBufferCircular::OnOutputBufferUnbinded()
429 {
430 if (!IsSyncMode() && (mediaCb_ != nullptr)) {
431 mediaCb_->OnOutputBufferUnbinded();
432 }
433 }
434
435 /******************************** Async mode ********************************/
AsyncOnError(AVCodecErrorType errorType,int32_t errorCode)436 void CodecBufferCircular::AsyncOnError(AVCodecErrorType errorType, int32_t errorCode)
437 {
438 if (errorType == AVCODEC_ERROR_FRAMEAORK_FAILED) {
439 return;
440 }
441 std::shared_ptr<MediaCodecCallback> mediaCb = nullptr;
442 std::shared_ptr<AVCodecCallback> callback = nullptr;
443 {
444 std::scoped_lock lock(inMutex_, outMutex_);
445 mediaCb = mediaCb_;
446 callback = callback_;
447 }
448 // AVBuffer callback
449 if (mediaCb != nullptr) {
450 mediaCb->OnError(errorType, errorCode);
451 return;
452 }
453 // Api9 callback
454 if (callback != nullptr) {
455 callback->OnError(errorType, errorCode);
456 return;
457 }
458 }
459
AsyncOnOutputFormatChanged(const Format & format)460 void CodecBufferCircular::AsyncOnOutputFormatChanged(const Format &format)
461 {
462 std::unique_lock<std::mutex> lock(outMutex_);
463 ClearOutputBufferOwnedByCodec();
464 // AVBuffer callback
465 auto mediaCb = mediaCb_;
466 if (mediaCb != nullptr) {
467 lock.unlock();
468 mediaCb->OnOutputFormatChanged(format);
469 return;
470 }
471 // Api9 callback
472 auto callback = callback_;
473 if (callback != nullptr) {
474 lock.unlock();
475 callback->OnOutputFormatChanged(format);
476 return;
477 }
478 }
479
AsyncOnInputBufferAvailable(uint32_t index,std::shared_ptr<AVBuffer> & buffer)480 void CodecBufferCircular::AsyncOnInputBufferAvailable(uint32_t index, std::shared_ptr<AVBuffer> &buffer)
481 {
482 std::unique_lock<std::mutex> lock(inMutex_);
483 BufferCacheIter iter = inCache_.find(index);
484 if (iter == inCache_.end()) {
485 BufferItem item = {.buffer = buffer};
486 iter = inCache_.emplace(index, item).first;
487 } else {
488 iter->second.buffer = buffer;
489 }
490 BufferItem &item = iter->second;
491 item.owner = OWNED_BY_USER;
492 // Encoder parameter with attribute callback
493 auto attrCb = attrCb_;
494 if (attrCb != nullptr) {
495 auto attribute = GetAttribute(iter);
496 auto parameter = GetParameter(iter);
497 lock.unlock();
498 attrCb->OnInputParameterWithAttrAvailable(index, attribute, parameter);
499 return;
500 }
501 // Encoder parameter callback
502 auto paramCb = paramCb_;
503 if (paramCb != nullptr) {
504 auto parameter = GetParameter(iter);
505 lock.unlock();
506 paramCb->OnInputParameterAvailable(index, parameter);
507 return;
508 }
509 // AVBuffer callback
510 auto mediaCb = mediaCb_;
511 if (mediaCb != nullptr) {
512 item.buffer->pts_ = 0;
513 lock.unlock();
514 mediaCb->OnInputBufferAvailable(index, item.buffer);
515 return;
516 }
517 // Api9 callback
518 auto callback = callback_;
519 if (callback != nullptr) {
520 item.buffer->pts_ = 0;
521 ConvertToSharedMemory(item.buffer, item.memory);
522 if (converter_ != nullptr) {
523 converter_->SetInputBufferFormat(item.buffer);
524 }
525 lock.unlock();
526 callback->OnInputBufferAvailable(index, item.memory);
527 return;
528 }
529 }
530
AsyncOnOutputBufferAvailable(uint32_t index,std::shared_ptr<AVBuffer> & buffer)531 void CodecBufferCircular::AsyncOnOutputBufferAvailable(uint32_t index, std::shared_ptr<AVBuffer> &buffer)
532 {
533 std::unique_lock<std::mutex> lock(outMutex_);
534 BufferCacheIter iter = outCache_.find(index);
535 if (iter == outCache_.end()) {
536 BufferItem item = {.buffer = buffer};
537 iter = outCache_.emplace(index, item).first;
538 } else {
539 iter->second.buffer = buffer;
540 }
541 BufferItem &item = iter->second;
542 item.owner = OWNED_BY_USER;
543 // AVBuffer callback
544 auto mediaCb = mediaCb_;
545 if (mediaCb != nullptr) {
546 lock.unlock();
547 mediaCb->OnOutputBufferAvailable(index, item.buffer);
548 return;
549 }
550 // Api9 callback
551 auto callback = callback_;
552 if (callback != nullptr) {
553 ConvertToSharedMemory(item.buffer, item.memory);
554 if (converter_ != nullptr) {
555 converter_->SetOutputBufferFormat(item.buffer);
556 converter_->ReadFromBuffer(item.buffer, item.memory);
557 }
558 AVCodecBufferFlag flag = static_cast<AVCodecBufferFlag>(item.buffer->flag_);
559 AVCodecBufferInfo info;
560 info.presentationTimeUs = item.buffer->pts_;
561 if (item.buffer->memory_ != nullptr) {
562 info.offset = item.buffer->memory_->GetOffset();
563 info.size = item.buffer->memory_->GetSize();
564 }
565 lock.unlock();
566 callback->OnOutputBufferAvailable(index, info, flag, item.memory);
567 return;
568 }
569 }
570
ConvertToSharedMemory(const std::shared_ptr<AVBuffer> & buffer,std::shared_ptr<AVSharedMemory> & memory)571 void CodecBufferCircular::ConvertToSharedMemory(const std::shared_ptr<AVBuffer> &buffer,
572 std::shared_ptr<AVSharedMemory> &memory)
573 {
574 // Api9
575 using Flags = AVSharedMemory::Flags;
576 std::shared_ptr<AVMemory> &bufferMem = buffer->memory_;
577 if (bufferMem == nullptr || memory != nullptr) {
578 return;
579 }
580 MemoryType type = bufferMem->GetMemoryType();
581 int32_t capacity = bufferMem->GetCapacity();
582 if (type == MemoryType::SHARED_MEMORY) {
583 std::string name = std::string("SharedMem_") + std::to_string(buffer->GetUniqueId());
584 int32_t fd = bufferMem->GetFileDescriptor();
585 bool isReadable = bufferMem->GetMemoryFlag() == MemoryFlag::MEMORY_READ_ONLY;
586 uint32_t flag = isReadable ? Flags::FLAGS_READ_ONLY : Flags::FLAGS_READ_WRITE;
587 memory = AVSharedMemoryBase::CreateFromRemote(fd, capacity, flag, name);
588 } else {
589 std::string name = std::string("SharedMem_") + std::to_string(buffer->GetUniqueId());
590 memory = AVSharedMemoryBase::CreateFromLocal(capacity, Flags::FLAGS_READ_WRITE, name);
591 if (memory == nullptr) {
592 AVCODEC_LOGW_WITH_TAG("Create shared memory from local failed");
593 }
594 }
595 }
596
597 /******************************** Sync mode ********************************/
SyncOnError(AVCodecErrorType errorType,int32_t errorCode)598 void CodecBufferCircular::SyncOnError(AVCodecErrorType errorType, int32_t errorCode)
599 {
600 std::scoped_lock lock(inMutex_, outMutex_);
601 lastError_ = errorCode;
602 AddFlag(FLAG_ERROR);
603 outCond_.notify_all();
604 inCond_.notify_all();
605 }
606
SyncOnOutputFormatChanged(const Format & format)607 void CodecBufferCircular::SyncOnOutputFormatChanged(const Format &format)
608 {
609 std::lock_guard<std::mutex> lock(outMutex_);
610 outQueue_.push(Event({.type = EVENT_STREAM_CHANGED}));
611 ClearOutputBufferOwnedByCodec();
612 outCond_.notify_all();
613 }
614
SyncOnInputBufferAvailable(uint32_t index,std::shared_ptr<AVBuffer> & buffer)615 void CodecBufferCircular::SyncOnInputBufferAvailable(uint32_t index, std::shared_ptr<AVBuffer> &buffer)
616 {
617 std::lock_guard<std::mutex> lock(inMutex_);
618 BufferCacheIter iter = inCache_.find(index);
619 if (iter == inCache_.end()) {
620 BufferItem item = {.buffer = buffer};
621 iter = inCache_.emplace(index, item).first;
622 } else {
623 iter->second.buffer = buffer;
624 }
625 iter->second.buffer->pts_ = 0;
626 iter->second.owner = OWNED_BY_CLIENT;
627 inQueue_.push(Event({.type = EVENT_INPUT_BUFFER, .index = index}));
628 inCond_.notify_all();
629 }
630
SyncOnOutputBufferAvailable(uint32_t index,std::shared_ptr<AVBuffer> & buffer)631 void CodecBufferCircular::SyncOnOutputBufferAvailable(uint32_t index, std::shared_ptr<AVBuffer> &buffer)
632 {
633 std::lock_guard<std::mutex> lock(outMutex_);
634 BufferCacheIter iter = outCache_.find(index);
635 if (iter == outCache_.end()) {
636 BufferItem item = {.buffer = buffer};
637 iter = outCache_.emplace(index, item).first;
638 } else {
639 iter->second.buffer = buffer;
640 }
641 iter->second.owner = OWNED_BY_CLIENT;
642 outQueue_.push(Event({.type = EVENT_OUTPUT_BUFFER, .index = index}));
643 outCond_.notify_all();
644 }
645
QueryInputBuffer(uint32_t & index,int64_t timeoutUs)646 int32_t CodecBufferCircular::QueryInputBuffer(uint32_t &index, int64_t timeoutUs)
647 {
648 return QueryInputIndex(index, timeoutUs);
649 }
650
QueryOutputBuffer(uint32_t & index,int64_t timeoutUs)651 int32_t CodecBufferCircular::QueryOutputBuffer(uint32_t &index, int64_t timeoutUs)
652 {
653 return QueryOutputIndex(index, timeoutUs);
654 }
655
QueryInputIndex(uint32_t & index,int64_t timeoutUs)656 int32_t CodecBufferCircular::QueryInputIndex(uint32_t &index, int64_t timeoutUs)
657 {
658 std::unique_lock<std::mutex> lock(inMutex_);
659 CHECK_AND_RETURN_RET_LOG_WITH_TAG(HasFlag(FLAG_IS_SYNC), AVCS_ERR_INVALID_OPERATION, "Need enable sync mode");
660 bool isNotTimeout = WaitForInputBuffer(lock, timeoutUs);
661
662 CHECK_AND_RETURN_RET_LOG_WITH_TAG(HasFlag(FLAG_IS_RUNNING), AVCS_ERR_INVALID_STATE, "Not in running state");
663 CHECK_AND_RETURN_RET_LOG_WITH_TAG(!HasFlag(FLAG_INPUT_EOS), AVCS_ERR_INVALID_STATE, "End-of-stream pushed");
664 CHECK_AND_RETURN_RET_LOG_WITH_TAG(!HasFlag(FLAG_ERROR), lastError_, "%{public}s",
665 AVCSErrorToString(static_cast<AVCodecServiceErrCode>(lastError_)).c_str());
666 if (!isNotTimeout) {
667 return AVCS_ERR_TRY_AGAIN;
668 }
669 Event event = inQueue_.front();
670 inQueue_.pop();
671 index = event.index;
672 BufferCacheIter iter = inCache_.find(index);
673 if (iter != inCache_.end()) {
674 iter->second.owner = OWNED_BY_USER;
675 }
676 return AVCS_ERR_OK;
677 }
678
QueryOutputIndex(uint32_t & index,int64_t timeoutUs)679 int32_t CodecBufferCircular::QueryOutputIndex(uint32_t &index, int64_t timeoutUs)
680 {
681 std::unique_lock<std::mutex> lock(outMutex_);
682 CHECK_AND_RETURN_RET_LOG_WITH_TAG(HasFlag(FLAG_IS_SYNC), AVCS_ERR_INVALID_OPERATION, "Need enable sync mode");
683 bool isNotTimeout = WaitForOutputBuffer(lock, timeoutUs);
684
685 CHECK_AND_RETURN_RET_LOG_WITH_TAG(HasFlag(FLAG_IS_RUNNING), AVCS_ERR_INVALID_STATE, "Not in running state");
686 CHECK_AND_RETURN_RET_LOG_WITH_TAG(!HasFlag(FLAG_OUTPUT_EOS), AVCS_ERR_INVALID_STATE, "End-of-stream reached");
687 CHECK_AND_RETURN_RET_LOG_WITH_TAG(!HasFlag(FLAG_ERROR), lastError_, "%{public}s",
688 AVCSErrorToString(static_cast<AVCodecServiceErrCode>(lastError_)).c_str());
689 if (!isNotTimeout) {
690 return AVCS_ERR_TRY_AGAIN;
691 }
692 Event event = outQueue_.front();
693 outQueue_.pop();
694 if (event.type == EVENT_STREAM_CHANGED) {
695 AVCODEC_LOGI_WITH_TAG("Output format changed");
696 return AVCS_ERR_STREAM_CHANGED;
697 }
698 index = event.index;
699 BufferCacheIter iter = outCache_.find(index);
700 if (iter == outCache_.end()) {
701 return AVCS_ERR_OK;
702 }
703 BufferItem &item = iter->second;
704 item.owner = OWNED_BY_USER;
705 if (item.buffer != nullptr) {
706 item.flag = item.buffer->flag_;
707 }
708 if (item.flag & AVCODEC_BUFFER_FLAG_EOS) {
709 AddFlag(FLAG_OUTPUT_EOS);
710 outCond_.notify_all();
711 }
712 return AVCS_ERR_OK;
713 }
714
WaitForInputBuffer(std::unique_lock<std::mutex> & lock,int64_t timeoutUs)715 bool CodecBufferCircular::WaitForInputBuffer(std::unique_lock<std::mutex> &lock, int64_t timeoutUs)
716 {
717 const auto predicate = [this] {
718 return !HasFlag(FLAG_IS_RUNNING) || // [1] Not in running state
719 HasFlag(FLAG_INPUT_EOS) || // [2] End-of-stream pushed
720 HasFlag(FLAG_ERROR) || // [3] Error state detected
721 inQueue_.size() > 0; // [4] Input buffer available
722 };
723
724 if (timeoutUs < 0) {
725 inCond_.wait(lock, predicate);
726 return true; // Always returns true after wait
727 }
728 if (timeoutUs == 0) {
729 return predicate(); // Immediate status check
730 }
731 if (timeoutUs > MAX_TIMEOUT) {
732 timeoutUs = MAX_TIMEOUT;
733 }
734 // Returns true if predicate satisfied
735 return inCond_.wait_for(lock, std::chrono::microseconds(timeoutUs), predicate);
736 }
737
WaitForOutputBuffer(std::unique_lock<std::mutex> & lock,int64_t timeoutUs)738 bool CodecBufferCircular::WaitForOutputBuffer(std::unique_lock<std::mutex> &lock, int64_t timeoutUs)
739 {
740 const auto predicate = [this] {
741 return !HasFlag(FLAG_IS_RUNNING) || // [1] Not in running state
742 HasFlag(FLAG_OUTPUT_EOS) || // [2] End-of-stream reached
743 HasFlag(FLAG_ERROR) || // [3] Error state detected
744 outQueue_.size() > 0; // [4] Output buffer available | Stream description changed
745 };
746
747 if (timeoutUs < 0) {
748 outCond_.wait(lock, predicate);
749 return true;
750 }
751 if (timeoutUs == 0) {
752 return predicate();
753 }
754 if (timeoutUs > MAX_TIMEOUT) {
755 timeoutUs = MAX_TIMEOUT;
756 }
757 return outCond_.wait_for(lock, std::chrono::microseconds(timeoutUs), predicate);
758 }
759
GetInputBuffer(uint32_t index)760 std::shared_ptr<AVBuffer> CodecBufferCircular::GetInputBuffer(uint32_t index)
761 {
762 std::lock_guard<std::mutex> lock(inMutex_);
763 CHECK_AND_RETURN_RET_LOG_WITH_TAG(HasFlag(FLAG_IS_SYNC), nullptr, "Need enable sync mode");
764 CHECK_AND_RETURN_RET_LOG_WITH_TAG(HasFlag(FLAG_IS_RUNNING), nullptr, "Not in running state");
765 CHECK_AND_RETURN_RET_LOG_WITH_TAG(!HasFlag(FLAG_ERROR), nullptr, "%{public}s",
766 AVCSErrorToString(static_cast<AVCodecServiceErrCode>(lastError_)).c_str());
767 BufferCacheIter iter = inCache_.find(index);
768 CHECK_AND_RETURN_RET_LOG_WITH_TAG(iter != inCache_.end(), nullptr, "Index is invalid %{public}u", index);
769 CHECK_AND_RETURN_RET_LOG_WITH_TAG(iter->second.owner == OWNED_BY_USER, nullptr, "Invalid ownership:%{public}s",
770 OwnerToString(iter->second.owner).c_str());
771 return iter->second.buffer;
772 }
773
GetOutputBuffer(uint32_t index)774 std::shared_ptr<AVBuffer> CodecBufferCircular::GetOutputBuffer(uint32_t index)
775 {
776 std::lock_guard<std::mutex> lock(outMutex_);
777 CHECK_AND_RETURN_RET_LOG_WITH_TAG(HasFlag(FLAG_IS_SYNC), nullptr, "Need enable sync mode");
778 CHECK_AND_RETURN_RET_LOG_WITH_TAG(HasFlag(FLAG_IS_RUNNING), nullptr, "Not in running state");
779 CHECK_AND_RETURN_RET_LOG_WITH_TAG(!HasFlag(FLAG_ERROR), nullptr, "%{public}s",
780 AVCSErrorToString(static_cast<AVCodecServiceErrCode>(lastError_)).c_str());
781 BufferCacheIter iter = outCache_.find(index);
782 CHECK_AND_RETURN_RET_LOG_WITH_TAG(iter != outCache_.end(), nullptr, "Index is invalid %{public}u", index);
783 CHECK_AND_RETURN_RET_LOG_WITH_TAG(iter->second.owner == OWNED_BY_USER, nullptr, "Invalid ownership:%{public}s",
784 OwnerToString(iter->second.owner).c_str());
785 return iter->second.buffer;
786 }
787 } // namespace MediaAVCodec
788 } // namespace OHOS