1 /*
2 * Copyright (c) 2023-2025 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #define HST_LOG_TAG "Filter"
17 #define MEDIA_PIPELINE
18
19 #include "filter/filter.h"
20 #include "osal/utils/util.h"
21 #include "common/log.h"
22 #include <algorithm>
23
24 namespace {
25 constexpr OHOS::HiviewDFX::HiLogLabel LABEL = { LOG_CORE, LOG_DOMAIN_FOUNDATION, "Filter" };
26 constexpr uint32_t THREAD_PRIORITY_41 = 7;
27 }
28
29 namespace OHOS {
30 namespace Media {
31 namespace Pipeline {
Filter(std::string name,FilterType type,bool isAyncMode)32 Filter::Filter(std::string name, FilterType type, bool isAyncMode)
33 : name_(std::move(name)), filterType_(type), isAsyncMode_(isAyncMode)
34 {
35 }
36
~Filter()37 Filter::~Filter()
38 {
39 nextFiltersMap_.clear();
40 }
41
Init(const std::shared_ptr<EventReceiver> & receiver,const std::shared_ptr<FilterCallback> & callback)42 void Filter::Init(const std::shared_ptr<EventReceiver>& receiver, const std::shared_ptr<FilterCallback>& callback)
43 {
44 receiver_ = receiver;
45 callback_ = callback;
46 }
47
LinkPipeLine(const std::string & groupId,bool needTurbo)48 void Filter::LinkPipeLine(const std::string &groupId, bool needTurbo)
49 {
50 groupId_ = groupId;
51 MEDIA_LOG_I("Filter %{public}s LinkPipeLine:%{public}s, isAsyncMode_:%{public}d",
52 name_.c_str(), groupId.c_str(), isAsyncMode_);
53 if (isAsyncMode_) {
54 TaskType taskType;
55 switch (filterType_) {
56 case FilterType::FILTERTYPE_VENC:
57 case FilterType::FILTERTYPE_VDEC:
58 case FilterType::VIDEO_CAPTURE:
59 taskType = TaskType::SINGLETON;
60 break;
61 case FilterType::FILTERTYPE_ASINK:
62 case FilterType::AUDIO_CAPTURE:
63 taskType = TaskType::AUDIO;
64 break;
65 default:
66 taskType = TaskType::SINGLETON;
67 break;
68 }
69 filterTask_ = std::make_unique<Task>(name_, groupId_, taskType, TaskPriority::HIGH, false);
70 if (needTurbo) {
71 filterTask_->UpdateThreadPriority(THREAD_PRIORITY_41, "media_service");
72 }
73 filterTask_->SubmitJobOnce([this] {
74 Status ret = DoInitAfterLink();
75 SetErrCode(ret);
76 ChangeState(ret == Status::OK ? FilterState::INITIALIZED : FilterState::ERROR);
77 });
78 } else {
79 Status ret = DoInitAfterLink();
80 SetErrCode(ret);
81 ChangeState(ret == Status::OK ? FilterState::INITIALIZED : FilterState::ERROR);
82 }
83 }
84
Prepare()85 Status Filter::Prepare()
86 {
87 MEDIA_LOG_D("Prepare %{public}s, pState:%{public}d", name_.c_str(), curState_);
88 if (filterTask_) {
89 filterTask_->SubmitJobOnce([this] {
90 PrepareDone();
91 });
92 } else {
93 return PrepareDone();
94 }
95 return Status::OK;
96 }
97
PrepareDone()98 Status Filter::PrepareDone()
99 {
100 MEDIA_LOG_I("Prepare enter %{public}s", name_.c_str());
101 if (curState_ == FilterState::ERROR) {
102 // if DoInitAfterLink error, do not prepare.
103 return Status::ERROR_INVALID_OPERATION;
104 }
105 // next filters maybe added in DoPrepare, so we must DoPrepare first
106 Status ret = DoPrepare();
107 SetErrCode(ret);
108 if (ret != Status::OK) {
109 ChangeState(FilterState::ERROR);
110 return ret;
111 }
112 for (auto iter : nextFiltersMap_) {
113 for (auto filter : iter.second) {
114 auto ret = filter->Prepare();
115 if (ret != Status::OK) {
116 return ret;
117 }
118 }
119 }
120 ChangeState(FilterState::READY);
121 return ret;
122 }
123
Start()124 Status Filter::Start()
125 {
126 MEDIA_LOG_D("Start %{public}s, pState:%{public}d", name_.c_str(), curState_);
127 if (filterTask_) {
128 filterTask_->SubmitJobOnce([this] {
129 StartDone();
130 filterTask_->Start();
131 });
132 for (auto iter : nextFiltersMap_) {
133 for (auto filter : iter.second) {
134 filter->Start();
135 }
136 }
137 } else {
138 for (auto iter : nextFiltersMap_) {
139 for (auto filter : iter.second) {
140 filter->Start();
141 }
142 }
143 return StartDone();
144 }
145 return Status::OK;
146 }
147
StartDone()148 Status Filter::StartDone()
149 {
150 MEDIA_LOG_I("Start in %{public}s", name_.c_str());
151 Status ret = DoStart();
152 SetErrCode(ret);
153 ChangeState(ret == Status::OK ? FilterState::RUNNING : FilterState::ERROR);
154 return ret;
155 }
156
Pause()157 Status Filter::Pause()
158 {
159 MEDIA_LOG_D("Pause %{public}s, pState:%{public}d", name_.c_str(), curState_);
160 // In offload case, we need pause to interrupt audio_sink_plugin write function, so do not use asyncmode
161 auto ret = PauseDone();
162 if (filterTask_) {
163 filterTask_->Pause();
164 }
165 for (auto iter : nextFiltersMap_) {
166 for (auto filter : iter.second) {
167 filter->Pause();
168 }
169 }
170 return ret;
171 }
172
PauseDragging()173 Status Filter::PauseDragging()
174 {
175 MEDIA_LOG_D("PauseDragging %{public}s, pState:%{public}d", name_.c_str(), curState_);
176 auto ret = DoPauseDragging();
177 if (filterTask_) {
178 filterTask_->Pause();
179 }
180 for (auto iter : nextFiltersMap_) {
181 for (auto filter : iter.second) {
182 auto curRet = filter->PauseDragging();
183 if (curRet != Status::OK) {
184 ret = curRet;
185 }
186 }
187 }
188 return ret;
189 }
190
PauseAudioAlign()191 Status Filter::PauseAudioAlign()
192 {
193 MEDIA_LOG_D("PauseAudioAlign %{public}s, pState:%{public}d", name_.c_str(), curState_);
194 auto ret = DoPauseAudioAlign();
195 if (filterTask_) {
196 filterTask_->Pause();
197 }
198 for (auto iter : nextFiltersMap_) {
199 for (auto filter : iter.second) {
200 auto curRet = filter->PauseAudioAlign();
201 if (curRet != Status::OK) {
202 ret = curRet;
203 }
204 }
205 }
206 return ret;
207 }
208
PauseDone()209 Status Filter::PauseDone()
210 {
211 MEDIA_LOG_I("Pause in %{public}s", name_.c_str());
212 Status ret = DoPause();
213 SetErrCode(ret);
214 ChangeState(ret == Status::OK ? FilterState::PAUSED : FilterState::ERROR);
215 return ret;
216 }
217
Resume()218 Status Filter::Resume()
219 {
220 MEDIA_LOG_D("Resume %{public}s, pState:%{public}d", name_.c_str(), curState_);
221 if (filterTask_) {
222 filterTask_->SubmitJobOnce([this]() {
223 ResumeDone();
224 filterTask_->Start();
225 });
226 for (auto iter : nextFiltersMap_) {
227 for (auto filter : iter.second) {
228 filter->Resume();
229 }
230 }
231 } else {
232 for (auto iter : nextFiltersMap_) {
233 for (auto filter : iter.second) {
234 filter->Resume();
235 }
236 }
237 return ResumeDone();
238 }
239 return Status::OK;
240 }
241
ResumeDone()242 Status Filter::ResumeDone()
243 {
244 MEDIA_LOG_I("Resume in %{public}s", name_.c_str());
245 Status ret = DoResume();
246 SetErrCode(ret);
247 ChangeState(ret == Status::OK ? FilterState::RUNNING : FilterState::ERROR);
248 return ret;
249 }
250
ResumeDragging()251 Status Filter::ResumeDragging()
252 {
253 MEDIA_LOG_D("ResumeDragging %{public}s, pState:%{public}d", name_.c_str(), curState_);
254 auto ret = Status::OK;
255 ret = DoResumeDragging();
256 if (filterTask_) {
257 filterTask_->Start();
258 }
259 for (auto iter : nextFiltersMap_) {
260 for (auto filter : iter.second) {
261 auto curRet = filter->ResumeDragging();
262 if (curRet != Status::OK) {
263 ret = curRet;
264 }
265 }
266 }
267 return ret;
268 }
269
ResumeAudioAlign()270 Status Filter::ResumeAudioAlign()
271 {
272 MEDIA_LOG_D("ResumeAudioAlign %{public}s, pState:%{public}d", name_.c_str(), curState_);
273 auto ret = Status::OK;
274 ret = DoResumeAudioAlign();
275 if (filterTask_) {
276 filterTask_->Start();
277 }
278 for (auto iter : nextFiltersMap_) {
279 for (auto filter : iter.second) {
280 auto curRet = filter->ResumeAudioAlign();
281 if (curRet != Status::OK) {
282 ret = curRet;
283 }
284 }
285 }
286 return ret;
287 }
288
Stop()289 Status Filter::Stop()
290 {
291 MEDIA_LOG_D("Stop %{public}s, pState:%{public}d", name_.c_str(), curState_);
292 // In offload case, we need stop to interrupt audio_sink_plugin write function, so do not use asyncmode
293 auto ret = StopDone();
294 if (filterTask_) {
295 filterTask_->Stop();
296 }
297 for (auto iter : nextFiltersMap_) {
298 for (auto filter : iter.second) {
299 filter->Stop();
300 }
301 }
302 return ret;
303 }
304
StopDone()305 Status Filter::StopDone()
306 {
307 MEDIA_LOG_I("Stop in %{public}s", name_.c_str());
308 Status ret = DoStop();
309 SetErrCode(ret);
310 ChangeState(ret == Status::OK ? FilterState::STOPPED : FilterState::ERROR);
311 return ret;
312 }
313
Flush()314 Status Filter::Flush()
315 {
316 MEDIA_LOG_D("Flush %{public}s, pState:%{public}d", name_.c_str(), curState_);
317 for (auto iter : nextFiltersMap_) {
318 for (auto filter : iter.second) {
319 filter->Flush();
320 }
321 }
322 jobIdxBase_ = jobIdx_;
323 return DoFlush();
324 }
325
Release()326 Status Filter::Release()
327 {
328 MEDIA_LOG_D("Release %{public}s, pState:%{public}d", name_.c_str(), curState_);
329 if (filterTask_) {
330 filterTask_->SubmitJobOnce([this]() {
331 ReleaseDone();
332 });
333 for (auto iter : nextFiltersMap_) {
334 for (auto filter : iter.second) {
335 filter->Release();
336 }
337 }
338 } else {
339 for (auto iter : nextFiltersMap_) {
340 for (auto filter : iter.second) {
341 filter->Release();
342 }
343 }
344 return ReleaseDone();
345 }
346 return Status::OK;
347 }
348
ReleaseDone()349 Status Filter::ReleaseDone()
350 {
351 MEDIA_LOG_I("Release in %{public}s", name_.c_str());
352 Status ret = DoRelease();
353 SetErrCode(ret);
354 ChangeState(ret == Status::OK ? FilterState::RELEASED : FilterState::ERROR);
355 return ret;
356 }
357
Preroll()358 Status Filter::Preroll()
359 {
360 Status ret = DoPreroll();
361 if (ret != Status::OK) {
362 return ret;
363 }
364 for (auto iter : nextFiltersMap_) {
365 for (auto filter : iter.second) {
366 ret = filter->Preroll();
367 if (ret != Status::OK) {
368 return ret;
369 }
370 }
371 }
372 return Status::OK;
373 }
374
WaitPrerollDone(bool render)375 Status Filter::WaitPrerollDone(bool render)
376 {
377 Status ret = Status::OK;
378 for (auto iter : nextFiltersMap_) {
379 for (auto filter : iter.second) {
380 auto curRet = filter->WaitPrerollDone(render);
381 if (curRet != Status::OK) {
382 ret = curRet;
383 }
384 }
385 }
386 auto curRet = DoWaitPrerollDone(render);
387 if (curRet != Status::OK) {
388 ret = curRet;
389 }
390 return ret;
391 }
392
StartFilterTask()393 void Filter::StartFilterTask()
394 {
395 if (filterTask_) {
396 filterTask_->Start();
397 }
398 }
399
PauseFilterTask()400 void Filter::PauseFilterTask()
401 {
402 if (filterTask_) {
403 filterTask_->Pause();
404 }
405 }
406
ClearAllNextFilters()407 Status Filter::ClearAllNextFilters()
408 {
409 nextFiltersMap_.clear();
410 return Status::OK;
411 }
412
SetPlayRange(int64_t start,int64_t end)413 Status Filter::SetPlayRange(int64_t start, int64_t end)
414 {
415 MEDIA_LOG_D("SetPlayRange %{public}ld, pState:%{public}ld", name_.c_str(), curState_);
416 for (auto iter : nextFiltersMap_) {
417 for (auto filter : iter.second) {
418 filter->SetPlayRange(start, end);
419 }
420 }
421 return DoSetPlayRange(start, end);
422 }
423
ProcessInputBuffer(int sendArg,int64_t delayUs)424 Status Filter::ProcessInputBuffer(int sendArg, int64_t delayUs)
425 {
426 MEDIA_LOG_D("Filter::ProcessInputBuffer %{public}s", name_.c_str());
427 if (filterTask_) {
428 jobIdx_++;
429 filterTask_->SubmitJob([this, sendArg]() {
430 processIdx_++;
431 DoProcessInputBuffer(sendArg, processIdx_ <= jobIdxBase_); // drop frame after flush
432 }, delayUs, 0);
433 } else {
434 Task::SleepInTask(delayUs / 1000); // 1000 convert to ms
435 DoProcessInputBuffer(sendArg, false);
436 }
437 return Status::OK;
438 }
439
ProcessOutputBuffer(int sendArg,int64_t delayUs,bool byIdx,uint32_t idx,int64_t renderTime)440 Status Filter::ProcessOutputBuffer(int sendArg, int64_t delayUs, bool byIdx, uint32_t idx, int64_t renderTime)
441 {
442 MEDIA_LOG_D("Filter::ProcessOutputBuffer %{public}s", name_.c_str());
443 if (filterTask_) {
444 jobIdx_++;
445 int64_t processIdx = jobIdx_;
446 filterTask_->SubmitJob([this, sendArg, processIdx, byIdx, idx, renderTime]() {
447 processIdx_++;
448 // drop frame after flush
449 DoProcessOutputBuffer(sendArg, processIdx <= jobIdxBase_, byIdx, idx, renderTime);
450 }, delayUs, 0);
451 } else {
452 Task::SleepInTask(delayUs / 1000); // 1000 convert to ms
453 DoProcessOutputBuffer(sendArg, false, false, idx, renderTime);
454 }
455 return Status::OK;
456 }
457
SetPerfRecEnabled(bool perfRecNeeded)458 Status Filter::SetPerfRecEnabled(bool perfRecNeeded)
459 {
460 auto ret = DoSetPerfRecEnabled(perfRecNeeded);
461 for (auto iter : nextFiltersMap_) {
462 for (auto filter : iter.second) {
463 filter->SetPerfRecEnabled(perfRecNeeded);
464 }
465 }
466 return ret;
467 }
468
DoSetPerfRecEnabled(bool perfRecNeeded)469 Status Filter::DoSetPerfRecEnabled(bool perfRecNeeded)
470 {
471 isPerfRecEnabled_ = perfRecNeeded;
472 return Status::OK;
473 }
474
DoInitAfterLink()475 Status Filter::DoInitAfterLink()
476 {
477 MEDIA_LOG_I("Filter::DoInitAfterLink");
478 return Status::OK;
479 }
480
DoPrepare()481 Status Filter::DoPrepare()
482 {
483 return Status::OK;
484 }
485
DoStart()486 Status Filter::DoStart()
487 {
488 return Status::OK;
489 }
490
DoPause()491 Status Filter::DoPause()
492 {
493 return Status::OK;
494 }
495
DoPauseDragging()496 Status Filter::DoPauseDragging()
497 {
498 return Status::OK;
499 }
500
DoPauseAudioAlign()501 Status Filter::DoPauseAudioAlign()
502 {
503 return Status::OK;
504 }
505
DoResume()506 Status Filter::DoResume()
507 {
508 return Status::OK;
509 }
510
DoResumeDragging()511 Status Filter::DoResumeDragging()
512 {
513 return Status::OK;
514 }
515
DoResumeAudioAlign()516 Status Filter::DoResumeAudioAlign()
517 {
518 return Status::OK;
519 }
520
DoStop()521 Status Filter::DoStop()
522 {
523 return Status::OK;
524 }
525
DoFlush()526 Status Filter::DoFlush()
527 {
528 return Status::OK;
529 }
530
DoRelease()531 Status Filter::DoRelease()
532 {
533 return Status::OK;
534 }
535
DoPreroll()536 Status Filter::DoPreroll()
537 {
538 return Status::OK;
539 }
540
DoWaitPrerollDone(bool render)541 Status Filter::DoWaitPrerollDone(bool render)
542 {
543 return Status::OK;
544 }
545
DoSetPlayRange(int64_t start,int64_t end)546 Status Filter::DoSetPlayRange(int64_t start, int64_t end)
547 {
548 return Status::OK;
549 }
550
DoProcessInputBuffer(int recvArg,bool dropFrame)551 Status Filter::DoProcessInputBuffer(int recvArg, bool dropFrame)
552 {
553 return Status::OK;
554 }
555
DoProcessOutputBuffer(int recvArg,bool dropFrame,bool byIdx,uint32_t idx,int64_t renderTimee)556 Status Filter::DoProcessOutputBuffer(int recvArg, bool dropFrame, bool byIdx, uint32_t idx, int64_t renderTimee)
557 {
558 return Status::OK;
559 }
560
ChangeState(FilterState state)561 void Filter::ChangeState(FilterState state)
562 {
563 AutoLock lock(stateMutex_);
564 curState_ = curState_ == FilterState::ERROR ? FilterState::ERROR : state;
565 MEDIA_LOG_I("%{public}s > %{public}d", name_.c_str(), curState_);
566 cond_.NotifyOne();
567 }
568
WaitAllState(FilterState state)569 Status Filter::WaitAllState(FilterState state)
570 {
571 AutoLock lock(stateMutex_);
572 MEDIA_LOG_I("%{public}s wait %{public}d", name_.c_str(), state);
573 if (curState_ != state) {
574 bool result = cond_.WaitFor(lock, 30000, [this, state] { // 30000 ms timeout
575 return curState_ == state || curState_ == FilterState::ERROR;
576 });
577 if (!result) {
578 SetErrCode(Status::ERROR_TIMED_OUT);
579 return Status::ERROR_TIMED_OUT;
580 }
581 if (curState_ != state) {
582 MEDIA_LOG_E("Filter(%{public}s) wait state %{public}d fail, curState %{public}d",
583 name_.c_str(), state, curState_);
584 return GetErrCode();
585 }
586 }
587
588 Status res = Status::OK;
589 for (auto iter : nextFiltersMap_) {
590 for (auto filter : iter.second) {
591 if (filter->WaitAllState(state) != Status::OK) {
592 res = filter->GetErrCode();
593 }
594 }
595 }
596 return res;
597 }
598
SetErrCode(Status errCode)599 void Filter::SetErrCode(Status errCode)
600 {
601 errCode_ = errCode;
602 }
603
GetErrCode()604 Status Filter::GetErrCode()
605 {
606 return errCode_;
607 }
608
SetParameter(const std::shared_ptr<Meta> & meta)609 void Filter::SetParameter(const std::shared_ptr<Meta>& meta)
610 {
611 meta_ = meta;
612 }
613
GetParameter(std::shared_ptr<Meta> & meta)614 void Filter::GetParameter(std::shared_ptr<Meta>& meta)
615 {
616 meta = meta_;
617 }
618
LinkNext(const std::shared_ptr<Filter> &,StreamType)619 Status Filter::LinkNext(const std::shared_ptr<Filter>&, StreamType)
620 {
621 return Status::OK;
622 }
623
UpdateNext(const std::shared_ptr<Filter> &,StreamType)624 Status Filter::UpdateNext(const std::shared_ptr<Filter>&, StreamType)
625 {
626 return Status::OK;
627 }
628
UnLinkNext(const std::shared_ptr<Filter> &,StreamType)629 Status Filter::UnLinkNext(const std::shared_ptr<Filter>&, StreamType)
630 {
631 return Status::OK;
632 }
633
GetFilterType()634 FilterType Filter::GetFilterType()
635 {
636 return filterType_;
637 };
638
OnLinked(StreamType,const std::shared_ptr<Meta> &,const std::shared_ptr<FilterLinkCallback> &)639 Status Filter::OnLinked(StreamType, const std::shared_ptr<Meta>&, const std::shared_ptr<FilterLinkCallback>&)
640 {
641 return Status::OK;
642 };
643
OnUpdated(StreamType,const std::shared_ptr<Meta> &,const std::shared_ptr<FilterLinkCallback> &)644 Status Filter::OnUpdated(StreamType, const std::shared_ptr<Meta>&, const std::shared_ptr<FilterLinkCallback>&)
645 {
646 return Status::OK;
647 }
648
OnUnLinked(StreamType,const std::shared_ptr<FilterLinkCallback> &)649 Status Filter::OnUnLinked(StreamType, const std::shared_ptr<FilterLinkCallback>&)
650 {
651 return Status::OK;
652 }
653
654 } // namespace Pipeline
655 } // namespace Media
656 } // namespace OHOS
657