1 /*
2 * Copyright (C) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "audio_codec_worker.h"
17 #include "avcodec_trace.h"
18 #include "avcodec_errors.h"
19 #include "avcodec_log.h"
20 #include "utils.h"
21
22 namespace {
23 constexpr OHOS::HiviewDFX::HiLogLabel LABEL = {LOG_CORE, LOG_DOMAIN_AUDIO, "AvCodec-AudioCodecWorker"};
24 constexpr uint8_t LOGD_FREQUENCY = 5;
25 } // namespace
26
27 namespace OHOS {
28 namespace MediaAVCodec {
29 constexpr short DEFAULT_TRY_DECODE_TIME = 10;
30 constexpr short DEFAULT_BUFFER_COUNT = 8;
31 constexpr int TIMEOUT_MS = 1000;
32 const std::string_view INPUT_BUFFER = "inputBuffer";
33 const std::string_view OUTPUT_BUFFER = "outputBuffer";
34 const std::string_view ASYNC_HANDLE_INPUT = "OS_AuCodecIn";
35 const std::string_view ASYNC_DECODE_FRAME = "OS_AuCodecOut";
36
AudioCodecWorker(const std::shared_ptr<AudioBaseCodec> & codec,const std::shared_ptr<AVCodecCallback> & callback)37 AudioCodecWorker::AudioCodecWorker(const std::shared_ptr<AudioBaseCodec> &codec,
38 const std::shared_ptr<AVCodecCallback> &callback)
39 : isFirFrame_(true),
40 isRunning(true),
41 codec_(codec),
42 inputBufferSize(codec_->GetInputBufferSize()),
43 outputBufferSize(codec_->GetOutputBufferSize()),
44 bufferCount(DEFAULT_BUFFER_COUNT),
45 name_(codec->GetCodecType()),
46 inputTask_(std::make_unique<TaskThread>(ASYNC_HANDLE_INPUT)),
47 outputTask_(std::make_unique<TaskThread>(ASYNC_DECODE_FRAME)),
48 callback_(callback),
49 inputBuffer_(std::make_shared<AudioBuffersManager>(inputBufferSize, INPUT_BUFFER, DEFAULT_BUFFER_COUNT)),
50 outputBuffer_(std::make_shared<AudioBuffersManager>(outputBufferSize, OUTPUT_BUFFER, DEFAULT_BUFFER_COUNT))
51 {
52 inputTask_->RegisterHandler([this] { ProduceInputBuffer(); });
53 outputTask_->RegisterHandler([this] { ConsumerOutputBuffer(); });
54 }
55
~AudioCodecWorker()56 AudioCodecWorker::~AudioCodecWorker()
57 {
58 AVCODEC_LOGD("release all data of %{public}s codec worker in destructor.", name_.data());
59 Dispose();
60 ResetTask();
61 ReleaseAllInBufferQueue();
62 ReleaseAllInBufferAvaQueue();
63
64 inputBuffer_->ReleaseAll();
65 outputBuffer_->ReleaseAll();
66
67 if (codec_) {
68 codec_ = nullptr;
69 }
70
71 if (callback_) {
72 callback_.reset();
73 callback_ = nullptr;
74 }
75 }
76
PushInputData(const uint32_t & index)77 bool AudioCodecWorker::PushInputData(const uint32_t &index)
78 {
79 AVCODEC_LOGD_LIMIT(LOGD_FREQUENCY, "%{public}s Worker PushInputData enter,index:%{public}u", name_.data(), index);
80
81 if (!isRunning) {
82 return true;
83 }
84
85 if (!callback_) {
86 AVCODEC_LOGE("push input buffer failed in worker, callback is nullptr, please check the callback.");
87 Dispose();
88 return false;
89 }
90 if (!codec_) {
91 AVCODEC_LOGE("push input buffer failed in worker, codec is nullptr, please check the codec.");
92 Dispose();
93 return false;
94 }
95
96 std::lock_guard<std::mutex> lock(stateMutex_);
97 inBufIndexQue_.push(index);
98 outputCondition_.notify_all();
99 return true;
100 }
101
Start()102 bool AudioCodecWorker::Start()
103 {
104 AVCODEC_SYNC_TRACE;
105 AVCODEC_LOGD("Worker Start enter");
106 if (!callback_) {
107 AVCODEC_LOGE("Start failed in worker, callback is nullptr, please check the callback.");
108 return false;
109 }
110 if (!codec_) {
111 AVCODEC_LOGE("Start failed in worker, codec_ is nullptr, please check the codec_.");
112 return false;
113 }
114 bool result = Begin();
115 return result;
116 }
117
Stop()118 bool AudioCodecWorker::Stop()
119 {
120 AVCODEC_SYNC_TRACE;
121 AVCODEC_LOGD("Worker Stop enter");
122 Dispose();
123
124 if (inputTask_) {
125 inputTask_->StopAsync();
126 } else {
127 AVCODEC_LOGE("Stop failed in worker, inputTask_ is nullptr, please check the inputTask_.");
128 return false;
129 }
130 if (outputTask_) {
131 outputTask_->StopAsync();
132 } else {
133 AVCODEC_LOGE("Stop failed in worker, outputTask_ is nullptr, please check the outputTask_.");
134 return false;
135 }
136
137 ReleaseAllInBufferQueue();
138 ReleaseAllInBufferAvaQueue();
139
140 inputBuffer_->ReleaseAll();
141 outputBuffer_->ReleaseAll();
142 return true;
143 }
144
Pause()145 bool AudioCodecWorker::Pause()
146 {
147 AVCODEC_SYNC_TRACE;
148 AVCODEC_LOGD("Worker Pause enter");
149 Dispose();
150
151 if (inputTask_) {
152 inputTask_->PauseAsync();
153 } else {
154 AVCODEC_LOGE("Pause failed in worker, inputTask_ is nullptr, please check the inputTask_.");
155 return false;
156 }
157 if (outputTask_) {
158 outputTask_->PauseAsync();
159 } else {
160 AVCODEC_LOGE("Pause failed in worker, outputTask_ is nullptr, please check the outputTask_.");
161 return false;
162 }
163
164 ReleaseAllInBufferQueue();
165 ReleaseAllInBufferAvaQueue();
166
167 inputBuffer_->ReleaseAll();
168 outputBuffer_->ReleaseAll();
169 return true;
170 }
171
Resume()172 bool AudioCodecWorker::Resume()
173 {
174 AVCODEC_SYNC_TRACE;
175 AVCODEC_LOGD("Worker Resume enter");
176 if (!callback_) {
177 AVCODEC_LOGE("Resume failed in worker, callback_ is nullptr, please check the callback_.");
178 return false;
179 }
180 if (!codec_) {
181 AVCODEC_LOGE("Resume failed in worker, codec_ is nullptr, please check the codec_.");
182 return false;
183 }
184 bool result = Begin();
185 return result;
186 }
187
Release()188 bool AudioCodecWorker::Release()
189 {
190 AVCODEC_SYNC_TRACE;
191 AVCODEC_LOGD("Worker Release enter");
192 Dispose();
193 ResetTask();
194 ReleaseAllInBufferQueue();
195 ReleaseAllInBufferAvaQueue();
196
197 inputBuffer_->ReleaseAll();
198 outputBuffer_->ReleaseAll();
199 if (codec_) {
200 codec_ = nullptr;
201 }
202 if (callback_) {
203 callback_.reset();
204 callback_ = nullptr;
205 }
206 AVCODEC_LOGD("Worker Release end");
207 return true;
208 }
209
GetInputBuffer() const210 std::shared_ptr<AudioBuffersManager> AudioCodecWorker::GetInputBuffer() const noexcept
211 {
212 AVCODEC_LOGD("Worker GetInputBuffer enter");
213 return inputBuffer_;
214 }
215
GetOutputBuffer() const216 std::shared_ptr<AudioBuffersManager> AudioCodecWorker::GetOutputBuffer() const noexcept
217 {
218 AVCODEC_LOGD("Worker GetOutputBuffer enter");
219 return outputBuffer_;
220 }
221
GetOutputBufferInfo(const uint32_t & index) const222 std::shared_ptr<AudioBufferInfo> AudioCodecWorker::GetOutputBufferInfo(const uint32_t &index) const noexcept
223 {
224 return outputBuffer_->getMemory(index);
225 }
226
GetInputBufferInfo(const uint32_t & index) const227 std::shared_ptr<AudioBufferInfo> AudioCodecWorker::GetInputBufferInfo(const uint32_t &index) const noexcept
228 {
229 return inputBuffer_->getMemory(index);
230 }
231
ProduceInputBuffer()232 void AudioCodecWorker::ProduceInputBuffer()
233 {
234 AVCODEC_SYNC_TRACE;
235 AVCODEC_LOGD_LIMIT(LOGD_FREQUENCY, "Worker produceInputBuffer enter");
236 if (!isRunning) {
237 usleep(DEFAULT_TRY_DECODE_TIME);
238 return;
239 }
240 std::unique_lock lock(inputMutex_);
241 while (!inBufAvaIndexQue_.empty() && isRunning) {
242 uint32_t index;
243 {
244 std::lock_guard<std::mutex> avaLock(inAvaMutex_);
245 index = inBufAvaIndexQue_.front();
246 inBufAvaIndexQue_.pop();
247 }
248 auto inputBuffer = GetInputBufferInfo(index);
249 if (!inputBuffer) {
250 AVCODEC_LOGE("Failed to get input buffer at index %{public}u", index);
251 continue;
252 }
253 inputBuffer->SetBufferOwned();
254 callback_->OnInputBufferAvailable(index, inputBuffer->GetBuffer());
255 }
256 inputCondition_.wait_for(lock, std::chrono::milliseconds(TIMEOUT_MS),
257 [this] { return (!inBufAvaIndexQue_.empty() || !isRunning); });
258 AVCODEC_LOGD_LIMIT(LOGD_FREQUENCY, "Worker produceInputBuffer exit");
259 }
260
HandInputBuffer(int32_t & ret)261 bool AudioCodecWorker::HandInputBuffer(int32_t &ret)
262 {
263 uint32_t inputIndex;
264 {
265 std::lock_guard<std::mutex> lock(stateMutex_);
266 inputIndex = inBufIndexQue_.front();
267 inBufIndexQue_.pop();
268 }
269 AVCODEC_LOGD_LIMIT(LOGD_FREQUENCY, "handle input buffer. index:%{public}u", inputIndex);
270 auto inputBuffer = GetInputBufferInfo(inputIndex);
271 if (inputBuffer == nullptr) {
272 AVCODEC_LOGE("inputBuffer is nullptr");
273 return false;
274 }
275 bool isEos = inputBuffer->CheckIsEos();
276 ret = codec_->ProcessSendData(inputBuffer);
277 inputBuffer_->ReleaseBuffer(inputIndex);
278 {
279 std::lock_guard<std::mutex> lock(inAvaMutex_);
280 inBufAvaIndexQue_.push(inputIndex);
281 inputCondition_.notify_all();
282 }
283 if (ret == AVCodecServiceErrCode::AVCS_ERR_INVALID_DATA) {
284 callback_->OnError(AVCodecErrorType::AVCODEC_ERROR_INTERNAL, ret);
285 }
286 return isEos;
287 }
288
ReleaseOutputBuffer(const uint32_t & index,const int32_t & ret)289 void AudioCodecWorker::ReleaseOutputBuffer(const uint32_t &index, const int32_t &ret)
290 {
291 outputBuffer_->ReleaseBuffer(index);
292 callback_->OnError(AVCodecErrorType::AVCODEC_ERROR_INTERNAL, ret);
293 }
294
SetFirstAndEosStatus(std::shared_ptr<AudioBufferInfo> & outBuffer,bool isEos,uint32_t index)295 void AudioCodecWorker::SetFirstAndEosStatus(std::shared_ptr<AudioBufferInfo> &outBuffer, bool isEos, uint32_t index)
296 {
297 if (isEos) {
298 AVCODEC_LOGD("set buffer EOS. index:%{public}u", index);
299 outBuffer->SetEos(isEos);
300 }
301 if (isFirFrame_) {
302 outBuffer->SetFirstFrame();
303 isFirFrame_ = false;
304 }
305 }
306
ConsumerOutputBuffer()307 void AudioCodecWorker::ConsumerOutputBuffer()
308 {
309 AVCODEC_SYNC_TRACE;
310 AVCODEC_LOGD_LIMIT(LOGD_FREQUENCY, "Worker consumerOutputBuffer enter");
311 if (!isRunning) {
312 usleep(DEFAULT_TRY_DECODE_TIME);
313 return;
314 }
315 std::unique_lock lock(outputMutex_);
316 while (!inBufIndexQue_.empty() && isRunning) {
317 int32_t ret;
318 bool isEos = HandInputBuffer(ret);
319 if (ret == AVCodecServiceErrCode::AVCS_ERR_NOT_ENOUGH_DATA) {
320 AVCODEC_LOGW("current input buffer is not enough,skip this frame");
321 continue;
322 }
323 if (ret != AVCodecServiceErrCode::AVCS_ERR_OK && ret != AVCodecServiceErrCode::AVCS_ERR_END_OF_STREAM) {
324 AVCODEC_LOGE("input error!");
325 return;
326 }
327 uint32_t index;
328 if (outputBuffer_->RequestAvailableIndex(index)) {
329 auto outBuffer = GetOutputBufferInfo(index);
330 SetFirstAndEosStatus(outBuffer, isEos, index);
331 ret = codec_->ProcessRecieveData(outBuffer);
332 if (ret == AVCodecServiceErrCode::AVCS_ERR_NOT_ENOUGH_DATA) {
333 AVCODEC_LOGD("current ouput buffer is not enough,skip this frame. index:%{public}u", index);
334 outputBuffer_->ReleaseBuffer(index);
335 continue;
336 }
337 if (ret != AVCodecServiceErrCode::AVCS_ERR_OK && ret != AVCodecServiceErrCode::AVCS_ERR_END_OF_STREAM) {
338 AVCODEC_LOGE("process output buffer error! index:%{public}u", index);
339 ReleaseOutputBuffer(index, ret);
340 return;
341 }
342 AVCODEC_LOGD_LIMIT(LOGD_FREQUENCY, "Work %{public}s consumerOutputBuffer callback_ index:%{public}u",
343 name_.data(), index);
344 lock.unlock();
345 callback_->OnOutputBufferAvailable(index, outBuffer->GetBufferAttr(), outBuffer->GetFlag(),
346 outBuffer->GetBuffer());
347 lock.lock();
348 }
349 }
350 outputCondition_.wait_for(lock, std::chrono::milliseconds(TIMEOUT_MS),
351 [this] { return (inBufIndexQue_.size() > 0 || !isRunning); });
352 AVCODEC_LOGD_LIMIT(LOGD_FREQUENCY, "Work consumerOutputBuffer exit");
353 }
354
Dispose()355 void AudioCodecWorker::Dispose()
356 {
357 AVCODEC_LOGD("Worker dispose enter");
358 isRunning = false;
359 outputBuffer_->DisableRunning();
360 {
361 std::unique_lock lock(inputMutex_);
362 inputCondition_.notify_all();
363 }
364 {
365 std::unique_lock lock(outputMutex_);
366 outputCondition_.notify_all();
367 }
368 }
369
Begin()370 bool AudioCodecWorker::Begin()
371 {
372 AVCODEC_LOGD("Worker begin enter");
373 for (uint32_t i = 0; i < static_cast<uint32_t>(bufferCount); i++) {
374 inBufAvaIndexQue_.push(i);
375 }
376 isRunning = true;
377
378 inputBuffer_->SetRunning();
379 outputBuffer_->SetRunning();
380
381 if (inputTask_) {
382 inputTask_->Start();
383 } else {
384 return false;
385 }
386 if (outputTask_) {
387 outputTask_->Start();
388 } else {
389 return false;
390 }
391 inputCondition_.notify_all();
392 outputCondition_.notify_all();
393 return true;
394 }
395
ReleaseAllInBufferQueue()396 void AudioCodecWorker::ReleaseAllInBufferQueue()
397 {
398 std::lock_guard<std::mutex> lock(stateMutex_);
399 while (!inBufIndexQue_.empty()) {
400 inBufIndexQue_.pop();
401 }
402 }
403
ReleaseAllInBufferAvaQueue()404 void AudioCodecWorker::ReleaseAllInBufferAvaQueue()
405 {
406 std::lock_guard<std::mutex> lock(inAvaMutex_);
407 while (!inBufAvaIndexQue_.empty()) {
408 inBufAvaIndexQue_.pop();
409 }
410 }
411
ResetTask()412 void AudioCodecWorker::ResetTask()
413 {
414 if (inputTask_) {
415 inputTask_->Stop();
416 inputTask_.reset();
417 inputTask_ = nullptr;
418 }
419 if (outputTask_) {
420 outputTask_->Stop();
421 outputTask_.reset();
422 outputTask_ = nullptr;
423 }
424 }
425 } // namespace MediaAVCodec
426 } // namespace OHOS