• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "audio_codec_worker.h"
17 #include "avcodec_trace.h"
18 #include "avcodec_errors.h"
19 #include "avcodec_log.h"
20 #include "utils.h"
21 
22 namespace {
23 constexpr OHOS::HiviewDFX::HiLogLabel LABEL = {LOG_CORE, LOG_DOMAIN_AUDIO, "AvCodec-AudioCodecWorker"};
24 constexpr uint8_t LOGD_FREQUENCY = 5;
25 } // namespace
26 
27 namespace OHOS {
28 namespace MediaAVCodec {
// Argument handed to usleep() by both worker loops when the worker is not running.
constexpr short DEFAULT_TRY_DECODE_TIME = 10;
// Number of buffers in the input pool and in the output pool.
constexpr short DEFAULT_BUFFER_COUNT = 8;
// Upper bound, in milliseconds, of each condition-variable wait in the worker loops.
constexpr int TIMEOUT_MS = 1000;
// Names given to the two AudioBuffersManager instances. Declared constexpr so they are
// constant-initialised and usable in constant expressions.
constexpr std::string_view INPUT_BUFFER = "inputBuffer";
constexpr std::string_view OUTPUT_BUFFER = "outputBuffer";
// Names given to the two TaskThread instances.
constexpr std::string_view ASYNC_HANDLE_INPUT = "OS_AuCodecIn";
constexpr std::string_view ASYNC_DECODE_FRAME = "OS_AuCodecOut";
36 
AudioCodecWorker(const std::shared_ptr<AudioBaseCodec> & codec,const std::shared_ptr<AVCodecCallback> & callback)37 AudioCodecWorker::AudioCodecWorker(const std::shared_ptr<AudioBaseCodec> &codec,
38                                    const std::shared_ptr<AVCodecCallback> &callback)
39     : isFirFrame_(true),
40       isRunning(true),
41       codec_(codec),
42       inputBufferSize(codec_->GetInputBufferSize()),
43       outputBufferSize(codec_->GetOutputBufferSize()),
44       bufferCount(DEFAULT_BUFFER_COUNT),
45       name_(codec->GetCodecType()),
46       inputTask_(std::make_unique<TaskThread>(ASYNC_HANDLE_INPUT)),
47       outputTask_(std::make_unique<TaskThread>(ASYNC_DECODE_FRAME)),
48       callback_(callback),
49       inputBuffer_(std::make_shared<AudioBuffersManager>(inputBufferSize, INPUT_BUFFER, DEFAULT_BUFFER_COUNT)),
50       outputBuffer_(std::make_shared<AudioBuffersManager>(outputBufferSize, OUTPUT_BUFFER, DEFAULT_BUFFER_COUNT))
51 {
52     inputTask_->RegisterHandler([this] { ProduceInputBuffer(); });
53     outputTask_->RegisterHandler([this] { ConsumerOutputBuffer(); });
54 }
55 
~AudioCodecWorker()56 AudioCodecWorker::~AudioCodecWorker()
57 {
58     AVCODEC_LOGD("release all data of %{public}s codec worker in destructor.", name_.data());
59     Dispose();
60     ResetTask();
61     ReleaseAllInBufferQueue();
62     ReleaseAllInBufferAvaQueue();
63 
64     inputBuffer_->ReleaseAll();
65     outputBuffer_->ReleaseAll();
66 
67     if (codec_) {
68         codec_ = nullptr;
69     }
70 
71     if (callback_) {
72         callback_.reset();
73         callback_ = nullptr;
74     }
75 }
76 
PushInputData(const uint32_t & index)77 bool AudioCodecWorker::PushInputData(const uint32_t &index)
78 {
79     AVCODEC_LOGD_LIMIT(LOGD_FREQUENCY, "%{public}s Worker PushInputData enter,index:%{public}u", name_.data(), index);
80 
81     if (!isRunning) {
82         return true;
83     }
84 
85     if (!callback_) {
86         AVCODEC_LOGE("push input buffer failed in worker, callback is nullptr, please check the callback.");
87         Dispose();
88         return false;
89     }
90     if (!codec_) {
91         AVCODEC_LOGE("push input buffer failed in worker, codec is nullptr, please check the codec.");
92         Dispose();
93         return false;
94     }
95 
96     std::lock_guard<std::mutex> lock(stateMutex_);
97     inBufIndexQue_.push(index);
98     outputCondition_.notify_all();
99     return true;
100 }
101 
Start()102 bool AudioCodecWorker::Start()
103 {
104     AVCODEC_SYNC_TRACE;
105     AVCODEC_LOGD("Worker Start enter");
106     if (!callback_) {
107         AVCODEC_LOGE("Start failed in worker, callback is nullptr, please check the callback.");
108         return false;
109     }
110     if (!codec_) {
111         AVCODEC_LOGE("Start failed in worker, codec_ is nullptr, please check the codec_.");
112         return false;
113     }
114     bool result = Begin();
115     return result;
116 }
117 
Stop()118 bool AudioCodecWorker::Stop()
119 {
120     AVCODEC_SYNC_TRACE;
121     AVCODEC_LOGD("Worker Stop enter");
122     Dispose();
123 
124     if (inputTask_) {
125         inputTask_->StopAsync();
126     } else {
127         AVCODEC_LOGE("Stop failed in worker, inputTask_ is nullptr, please check the inputTask_.");
128         return false;
129     }
130     if (outputTask_) {
131         outputTask_->StopAsync();
132     } else {
133         AVCODEC_LOGE("Stop failed in worker, outputTask_ is nullptr, please check the outputTask_.");
134         return false;
135     }
136 
137     ReleaseAllInBufferQueue();
138     ReleaseAllInBufferAvaQueue();
139 
140     inputBuffer_->ReleaseAll();
141     outputBuffer_->ReleaseAll();
142     return true;
143 }
144 
Pause()145 bool AudioCodecWorker::Pause()
146 {
147     AVCODEC_SYNC_TRACE;
148     AVCODEC_LOGD("Worker Pause enter");
149     Dispose();
150 
151     if (inputTask_) {
152         inputTask_->PauseAsync();
153     } else {
154         AVCODEC_LOGE("Pause failed in worker, inputTask_ is nullptr, please check the inputTask_.");
155         return false;
156     }
157     if (outputTask_) {
158         outputTask_->PauseAsync();
159     } else {
160         AVCODEC_LOGE("Pause failed in worker, outputTask_ is nullptr, please check the outputTask_.");
161         return false;
162     }
163 
164     ReleaseAllInBufferQueue();
165     ReleaseAllInBufferAvaQueue();
166 
167     inputBuffer_->ReleaseAll();
168     outputBuffer_->ReleaseAll();
169     return true;
170 }
171 
Resume()172 bool AudioCodecWorker::Resume()
173 {
174     AVCODEC_SYNC_TRACE;
175     AVCODEC_LOGD("Worker Resume enter");
176     if (!callback_) {
177         AVCODEC_LOGE("Resume failed in worker, callback_ is nullptr, please check the callback_.");
178         return false;
179     }
180     if (!codec_) {
181         AVCODEC_LOGE("Resume failed in worker, codec_ is nullptr, please check the codec_.");
182         return false;
183     }
184     bool result = Begin();
185     return result;
186 }
187 
Release()188 bool AudioCodecWorker::Release()
189 {
190     AVCODEC_SYNC_TRACE;
191     AVCODEC_LOGD("Worker Release enter");
192     Dispose();
193     ResetTask();
194     ReleaseAllInBufferQueue();
195     ReleaseAllInBufferAvaQueue();
196 
197     inputBuffer_->ReleaseAll();
198     outputBuffer_->ReleaseAll();
199     if (codec_) {
200         codec_ = nullptr;
201     }
202     if (callback_) {
203         callback_.reset();
204         callback_ = nullptr;
205     }
206     AVCODEC_LOGD("Worker Release end");
207     return true;
208 }
209 
GetInputBuffer() const210 std::shared_ptr<AudioBuffersManager> AudioCodecWorker::GetInputBuffer() const noexcept
211 {
212     AVCODEC_LOGD("Worker GetInputBuffer enter");
213     return inputBuffer_;
214 }
215 
GetOutputBuffer() const216 std::shared_ptr<AudioBuffersManager> AudioCodecWorker::GetOutputBuffer() const noexcept
217 {
218     AVCODEC_LOGD("Worker GetOutputBuffer enter");
219     return outputBuffer_;
220 }
221 
GetOutputBufferInfo(const uint32_t & index) const222 std::shared_ptr<AudioBufferInfo> AudioCodecWorker::GetOutputBufferInfo(const uint32_t &index) const noexcept
223 {
224     return outputBuffer_->getMemory(index);
225 }
226 
GetInputBufferInfo(const uint32_t & index) const227 std::shared_ptr<AudioBufferInfo> AudioCodecWorker::GetInputBufferInfo(const uint32_t &index) const noexcept
228 {
229     return inputBuffer_->getMemory(index);
230 }
231 
ProduceInputBuffer()232 void AudioCodecWorker::ProduceInputBuffer()
233 {
234     AVCODEC_SYNC_TRACE;
235     AVCODEC_LOGD_LIMIT(LOGD_FREQUENCY, "Worker produceInputBuffer enter");
236     if (!isRunning) {
237         usleep(DEFAULT_TRY_DECODE_TIME);
238         return;
239     }
240     std::unique_lock lock(inputMutex_);
241     while (!inBufAvaIndexQue_.empty() && isRunning) {
242         uint32_t index;
243         {
244             std::lock_guard<std::mutex> avaLock(inAvaMutex_);
245             index = inBufAvaIndexQue_.front();
246             inBufAvaIndexQue_.pop();
247         }
248         auto inputBuffer = GetInputBufferInfo(index);
249         if (!inputBuffer) {
250             AVCODEC_LOGE("Failed to get input buffer at index %{public}u", index);
251             continue;
252         }
253         inputBuffer->SetBufferOwned();
254         callback_->OnInputBufferAvailable(index, inputBuffer->GetBuffer());
255     }
256     inputCondition_.wait_for(lock, std::chrono::milliseconds(TIMEOUT_MS),
257                              [this] { return (!inBufAvaIndexQue_.empty() || !isRunning); });
258     AVCODEC_LOGD_LIMIT(LOGD_FREQUENCY, "Worker produceInputBuffer exit");
259 }
260 
HandInputBuffer(int32_t & ret)261 bool AudioCodecWorker::HandInputBuffer(int32_t &ret)
262 {
263     uint32_t inputIndex;
264     {
265         std::lock_guard<std::mutex> lock(stateMutex_);
266         inputIndex = inBufIndexQue_.front();
267         inBufIndexQue_.pop();
268     }
269     AVCODEC_LOGD_LIMIT(LOGD_FREQUENCY, "handle input buffer. index:%{public}u", inputIndex);
270     auto inputBuffer = GetInputBufferInfo(inputIndex);
271     if (inputBuffer == nullptr) {
272         AVCODEC_LOGE("inputBuffer is nullptr");
273         return false;
274     }
275     bool isEos = inputBuffer->CheckIsEos();
276     ret = codec_->ProcessSendData(inputBuffer);
277     inputBuffer_->ReleaseBuffer(inputIndex);
278     {
279         std::lock_guard<std::mutex> lock(inAvaMutex_);
280         inBufAvaIndexQue_.push(inputIndex);
281         inputCondition_.notify_all();
282     }
283     if (ret == AVCodecServiceErrCode::AVCS_ERR_INVALID_DATA) {
284         callback_->OnError(AVCodecErrorType::AVCODEC_ERROR_INTERNAL, ret);
285     }
286     return isEos;
287 }
288 
ReleaseOutputBuffer(const uint32_t & index,const int32_t & ret)289 void AudioCodecWorker::ReleaseOutputBuffer(const uint32_t &index, const int32_t &ret)
290 {
291     outputBuffer_->ReleaseBuffer(index);
292     callback_->OnError(AVCodecErrorType::AVCODEC_ERROR_INTERNAL, ret);
293 }
294 
SetFirstAndEosStatus(std::shared_ptr<AudioBufferInfo> & outBuffer,bool isEos,uint32_t index)295 void AudioCodecWorker::SetFirstAndEosStatus(std::shared_ptr<AudioBufferInfo> &outBuffer, bool isEos, uint32_t index)
296 {
297     if (isEos) {
298         AVCODEC_LOGD("set buffer EOS. index:%{public}u", index);
299         outBuffer->SetEos(isEos);
300     }
301     if (isFirFrame_) {
302         outBuffer->SetFirstFrame();
303         isFirFrame_ = false;
304     }
305 }
306 
ConsumerOutputBuffer()307 void AudioCodecWorker::ConsumerOutputBuffer()
308 {
309     AVCODEC_SYNC_TRACE;
310     AVCODEC_LOGD_LIMIT(LOGD_FREQUENCY, "Worker consumerOutputBuffer enter");
311     if (!isRunning) {
312         usleep(DEFAULT_TRY_DECODE_TIME);
313         return;
314     }
315     std::unique_lock lock(outputMutex_);
316     while (!inBufIndexQue_.empty() && isRunning) {
317         int32_t ret;
318         bool isEos = HandInputBuffer(ret);
319         if (ret == AVCodecServiceErrCode::AVCS_ERR_NOT_ENOUGH_DATA) {
320             AVCODEC_LOGW("current input buffer is not enough,skip this frame");
321             continue;
322         }
323         if (ret != AVCodecServiceErrCode::AVCS_ERR_OK && ret != AVCodecServiceErrCode::AVCS_ERR_END_OF_STREAM) {
324             AVCODEC_LOGE("input error!");
325             return;
326         }
327         uint32_t index;
328         if (outputBuffer_->RequestAvailableIndex(index)) {
329             auto outBuffer = GetOutputBufferInfo(index);
330             SetFirstAndEosStatus(outBuffer, isEos, index);
331             ret = codec_->ProcessRecieveData(outBuffer);
332             if (ret == AVCodecServiceErrCode::AVCS_ERR_NOT_ENOUGH_DATA) {
333                 AVCODEC_LOGD("current ouput buffer is not enough,skip this frame. index:%{public}u", index);
334                 outputBuffer_->ReleaseBuffer(index);
335                 continue;
336             }
337             if (ret != AVCodecServiceErrCode::AVCS_ERR_OK && ret != AVCodecServiceErrCode::AVCS_ERR_END_OF_STREAM) {
338                 AVCODEC_LOGE("process output buffer error! index:%{public}u", index);
339                 ReleaseOutputBuffer(index, ret);
340                 return;
341             }
342             AVCODEC_LOGD_LIMIT(LOGD_FREQUENCY, "Work %{public}s consumerOutputBuffer callback_ index:%{public}u",
343                                name_.data(), index);
344             lock.unlock();
345             callback_->OnOutputBufferAvailable(index, outBuffer->GetBufferAttr(), outBuffer->GetFlag(),
346                                                outBuffer->GetBuffer());
347             lock.lock();
348         }
349     }
350     outputCondition_.wait_for(lock, std::chrono::milliseconds(TIMEOUT_MS),
351                               [this] { return (inBufIndexQue_.size() > 0 || !isRunning); });
352     AVCODEC_LOGD_LIMIT(LOGD_FREQUENCY, "Work consumerOutputBuffer exit");
353 }
354 
Dispose()355 void AudioCodecWorker::Dispose()
356 {
357     AVCODEC_LOGD("Worker dispose enter");
358     isRunning = false;
359     outputBuffer_->DisableRunning();
360     {
361         std::unique_lock lock(inputMutex_);
362         inputCondition_.notify_all();
363     }
364     {
365         std::unique_lock lock(outputMutex_);
366         outputCondition_.notify_all();
367     }
368 }
369 
Begin()370 bool AudioCodecWorker::Begin()
371 {
372     AVCODEC_LOGD("Worker begin enter");
373     for (uint32_t i = 0; i < static_cast<uint32_t>(bufferCount); i++) {
374         inBufAvaIndexQue_.push(i);
375     }
376     isRunning = true;
377 
378     inputBuffer_->SetRunning();
379     outputBuffer_->SetRunning();
380 
381     if (inputTask_) {
382         inputTask_->Start();
383     } else {
384         return false;
385     }
386     if (outputTask_) {
387         outputTask_->Start();
388     } else {
389         return false;
390     }
391     inputCondition_.notify_all();
392     outputCondition_.notify_all();
393     return true;
394 }
395 
ReleaseAllInBufferQueue()396 void AudioCodecWorker::ReleaseAllInBufferQueue()
397 {
398     std::lock_guard<std::mutex> lock(stateMutex_);
399     while (!inBufIndexQue_.empty()) {
400         inBufIndexQue_.pop();
401     }
402 }
403 
ReleaseAllInBufferAvaQueue()404 void AudioCodecWorker::ReleaseAllInBufferAvaQueue()
405 {
406     std::lock_guard<std::mutex> lock(inAvaMutex_);
407     while (!inBufAvaIndexQue_.empty()) {
408         inBufAvaIndexQue_.pop();
409     }
410 }
411 
ResetTask()412 void AudioCodecWorker::ResetTask()
413 {
414     if (inputTask_) {
415         inputTask_->Stop();
416         inputTask_.reset();
417         inputTask_ = nullptr;
418     }
419     if (outputTask_) {
420         outputTask_->Stop();
421         outputTask_.reset();
422         outputTask_ = nullptr;
423     }
424 }
425 } // namespace MediaAVCodec
426 } // namespace OHOS