• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023-2025 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #define HST_LOG_TAG "Filter"
17 #define MEDIA_PIPELINE
18 
19 #include "filter/filter.h"
20 #include "osal/utils/util.h"
21 #include "common/log.h"
22 #include <algorithm>
23 
24 namespace {
25 constexpr OHOS::HiviewDFX::HiLogLabel LABEL = { LOG_CORE, LOG_DOMAIN_FOUNDATION, "Filter" };
26 constexpr uint32_t THREAD_PRIORITY_41 = 7;
27 }
28 
29 namespace OHOS {
30 namespace Media {
31 namespace Pipeline {
Filter(std::string name,FilterType type,bool isAyncMode)32 Filter::Filter(std::string name, FilterType type, bool isAyncMode)
33     : name_(std::move(name)), filterType_(type), isAsyncMode_(isAyncMode)
34 {
35 }
36 
~Filter()37 Filter::~Filter()
38 {
39     nextFiltersMap_.clear();
40 }
41 
Init(const std::shared_ptr<EventReceiver> & receiver,const std::shared_ptr<FilterCallback> & callback)42 void Filter::Init(const std::shared_ptr<EventReceiver>& receiver, const std::shared_ptr<FilterCallback>& callback)
43 {
44     receiver_ = receiver;
45     callback_ = callback;
46 }
47 
LinkPipeLine(const std::string & groupId,bool needTurbo)48 void Filter::LinkPipeLine(const std::string &groupId, bool needTurbo)
49 {
50     groupId_ = groupId;
51     MEDIA_LOG_I("Filter %{public}s LinkPipeLine:%{public}s, isAsyncMode_:%{public}d",
52         name_.c_str(), groupId.c_str(), isAsyncMode_);
53     if (isAsyncMode_) {
54         TaskType taskType;
55         switch (filterType_) {
56             case FilterType::FILTERTYPE_VENC:
57             case FilterType::FILTERTYPE_VDEC:
58             case FilterType::VIDEO_CAPTURE:
59                 taskType = TaskType::SINGLETON;
60                 break;
61             case FilterType::FILTERTYPE_ASINK:
62             case FilterType::AUDIO_CAPTURE:
63                 taskType = TaskType::AUDIO;
64                 break;
65             default:
66                 taskType = TaskType::SINGLETON;
67                 break;
68         }
69         filterTask_ = std::make_unique<Task>(name_, groupId_, taskType, TaskPriority::HIGH, false);
70         if (needTurbo) {
71             filterTask_->UpdateThreadPriority(THREAD_PRIORITY_41, "media_service");
72         }
73         filterTask_->SubmitJobOnce([this] {
74             Status ret = DoInitAfterLink();
75             SetErrCode(ret);
76             ChangeState(ret == Status::OK ? FilterState::INITIALIZED : FilterState::ERROR);
77         });
78     } else {
79         Status ret = DoInitAfterLink();
80         SetErrCode(ret);
81         ChangeState(ret == Status::OK ? FilterState::INITIALIZED : FilterState::ERROR);
82     }
83 }
84 
Prepare()85 Status Filter::Prepare()
86 {
87     MEDIA_LOG_D("Prepare %{public}s, pState:%{public}d", name_.c_str(), curState_);
88     if (filterTask_) {
89         filterTask_->SubmitJobOnce([this] {
90             PrepareDone();
91         });
92     } else {
93         return PrepareDone();
94     }
95     return Status::OK;
96 }
97 
PrepareDone()98 Status Filter::PrepareDone()
99 {
100     MEDIA_LOG_I("Prepare enter %{public}s", name_.c_str());
101     if (curState_ == FilterState::ERROR) {
102         // if DoInitAfterLink error, do not prepare.
103         return Status::ERROR_INVALID_OPERATION;
104     }
105     // next filters maybe added in DoPrepare, so we must DoPrepare first
106     Status ret = DoPrepare();
107     SetErrCode(ret);
108     if (ret != Status::OK) {
109         ChangeState(FilterState::ERROR);
110         return ret;
111     }
112     for (auto iter : nextFiltersMap_) {
113         for (auto filter : iter.second) {
114             auto ret = filter->Prepare();
115             if (ret != Status::OK) {
116                 return ret;
117             }
118         }
119     }
120     ChangeState(FilterState::READY);
121     return ret;
122 }
123 
Start()124 Status Filter::Start()
125 {
126     MEDIA_LOG_D("Start %{public}s, pState:%{public}d", name_.c_str(), curState_);
127     if (filterTask_) {
128         filterTask_->SubmitJobOnce([this] {
129             StartDone();
130             filterTask_->Start();
131         });
132         for (auto iter : nextFiltersMap_) {
133             for (auto filter : iter.second) {
134                 filter->Start();
135             }
136         }
137     } else {
138         for (auto iter : nextFiltersMap_) {
139             for (auto filter : iter.second) {
140                 filter->Start();
141             }
142         }
143         return StartDone();
144     }
145     return Status::OK;
146 }
147 
StartDone()148 Status Filter::StartDone()
149 {
150     MEDIA_LOG_I("Start in %{public}s", name_.c_str());
151     Status ret = DoStart();
152     SetErrCode(ret);
153     ChangeState(ret == Status::OK ? FilterState::RUNNING : FilterState::ERROR);
154     return ret;
155 }
156 
Pause()157 Status Filter::Pause()
158 {
159     MEDIA_LOG_D("Pause %{public}s, pState:%{public}d", name_.c_str(), curState_);
160     // In offload case, we need pause to interrupt audio_sink_plugin write function,  so do not use asyncmode
161     auto ret = PauseDone();
162     if (filterTask_) {
163         filterTask_->Pause();
164     }
165     for (auto iter : nextFiltersMap_) {
166         for (auto filter : iter.second) {
167             filter->Pause();
168         }
169     }
170     return ret;
171 }
172 
PauseDragging()173 Status Filter::PauseDragging()
174 {
175     MEDIA_LOG_D("PauseDragging %{public}s, pState:%{public}d", name_.c_str(), curState_);
176     auto ret = DoPauseDragging();
177     if (filterTask_) {
178         filterTask_->Pause();
179     }
180     for (auto iter : nextFiltersMap_) {
181         for (auto filter : iter.second) {
182             auto curRet = filter->PauseDragging();
183             if (curRet != Status::OK) {
184                 ret = curRet;
185             }
186         }
187     }
188     return ret;
189 }
190 
PauseAudioAlign()191 Status Filter::PauseAudioAlign()
192 {
193     MEDIA_LOG_D("PauseAudioAlign %{public}s, pState:%{public}d", name_.c_str(), curState_);
194     auto ret = DoPauseAudioAlign();
195     if (filterTask_) {
196         filterTask_->Pause();
197     }
198     for (auto iter : nextFiltersMap_) {
199         for (auto filter : iter.second) {
200             auto curRet = filter->PauseAudioAlign();
201             if (curRet != Status::OK) {
202                 ret = curRet;
203             }
204         }
205     }
206     return ret;
207 }
208 
PauseDone()209 Status Filter::PauseDone()
210 {
211     MEDIA_LOG_I("Pause in %{public}s", name_.c_str());
212     Status ret = DoPause();
213     SetErrCode(ret);
214     ChangeState(ret == Status::OK ? FilterState::PAUSED : FilterState::ERROR);
215     return ret;
216 }
217 
Resume()218 Status Filter::Resume()
219 {
220     MEDIA_LOG_D("Resume %{public}s, pState:%{public}d", name_.c_str(), curState_);
221     if (filterTask_) {
222         filterTask_->SubmitJobOnce([this]() {
223             ResumeDone();
224             filterTask_->Start();
225         });
226         for (auto iter : nextFiltersMap_) {
227             for (auto filter : iter.second) {
228                 filter->Resume();
229             }
230         }
231     } else {
232         for (auto iter : nextFiltersMap_) {
233             for (auto filter : iter.second) {
234                 filter->Resume();
235             }
236         }
237         return ResumeDone();
238     }
239     return Status::OK;
240 }
241 
ResumeDone()242 Status Filter::ResumeDone()
243 {
244     MEDIA_LOG_I("Resume in %{public}s", name_.c_str());
245     Status ret = DoResume();
246     SetErrCode(ret);
247     ChangeState(ret == Status::OK ? FilterState::RUNNING : FilterState::ERROR);
248     return ret;
249 }
250 
ResumeDragging()251 Status Filter::ResumeDragging()
252 {
253     MEDIA_LOG_D("ResumeDragging %{public}s, pState:%{public}d", name_.c_str(), curState_);
254     auto ret = Status::OK;
255     ret = DoResumeDragging();
256     if (filterTask_) {
257         filterTask_->Start();
258     }
259     for (auto iter : nextFiltersMap_) {
260         for (auto filter : iter.second) {
261             auto curRet = filter->ResumeDragging();
262             if (curRet != Status::OK) {
263                 ret = curRet;
264             }
265         }
266     }
267     return ret;
268 }
269 
ResumeAudioAlign()270 Status Filter::ResumeAudioAlign()
271 {
272     MEDIA_LOG_D("ResumeAudioAlign %{public}s, pState:%{public}d", name_.c_str(), curState_);
273     auto ret = Status::OK;
274     ret = DoResumeAudioAlign();
275     if (filterTask_) {
276         filterTask_->Start();
277     }
278     for (auto iter : nextFiltersMap_) {
279         for (auto filter : iter.second) {
280             auto curRet = filter->ResumeAudioAlign();
281             if (curRet != Status::OK) {
282                 ret = curRet;
283             }
284         }
285     }
286     return ret;
287 }
288 
Stop()289 Status Filter::Stop()
290 {
291     MEDIA_LOG_D("Stop %{public}s, pState:%{public}d", name_.c_str(), curState_);
292     // In offload case, we need stop to interrupt audio_sink_plugin write function,  so do not use asyncmode
293     auto ret = StopDone();
294     if (filterTask_) {
295         filterTask_->Stop();
296     }
297     for (auto iter : nextFiltersMap_) {
298         for (auto filter : iter.second) {
299             filter->Stop();
300         }
301     }
302     return ret;
303 }
304 
StopDone()305 Status Filter::StopDone()
306 {
307     MEDIA_LOG_I("Stop in %{public}s", name_.c_str());
308     Status ret = DoStop();
309     SetErrCode(ret);
310     ChangeState(ret == Status::OK ? FilterState::STOPPED : FilterState::ERROR);
311     return ret;
312 }
313 
Flush()314 Status Filter::Flush()
315 {
316     MEDIA_LOG_D("Flush %{public}s, pState:%{public}d", name_.c_str(), curState_);
317     for (auto iter : nextFiltersMap_) {
318         for (auto filter : iter.second) {
319             filter->Flush();
320         }
321     }
322     jobIdxBase_ = jobIdx_;
323     return DoFlush();
324 }
325 
Release()326 Status Filter::Release()
327 {
328     MEDIA_LOG_D("Release %{public}s, pState:%{public}d", name_.c_str(), curState_);
329     if (filterTask_) {
330         filterTask_->SubmitJobOnce([this]() {
331             ReleaseDone();
332         });
333         for (auto iter : nextFiltersMap_) {
334             for (auto filter : iter.second) {
335                 filter->Release();
336             }
337         }
338     } else {
339         for (auto iter : nextFiltersMap_) {
340             for (auto filter : iter.second) {
341                 filter->Release();
342             }
343         }
344         return ReleaseDone();
345     }
346     return Status::OK;
347 }
348 
ReleaseDone()349 Status Filter::ReleaseDone()
350 {
351     MEDIA_LOG_I("Release in %{public}s", name_.c_str());
352     Status ret = DoRelease();
353     SetErrCode(ret);
354     ChangeState(ret == Status::OK ? FilterState::RELEASED : FilterState::ERROR);
355     return ret;
356 }
357 
Preroll()358 Status Filter::Preroll()
359 {
360     Status ret = DoPreroll();
361     if (ret != Status::OK) {
362         return ret;
363     }
364     for (auto iter : nextFiltersMap_) {
365         for (auto filter : iter.second) {
366             ret = filter->Preroll();
367             if (ret != Status::OK) {
368                 return ret;
369             }
370         }
371     }
372     return Status::OK;
373 }
374 
WaitPrerollDone(bool render)375 Status Filter::WaitPrerollDone(bool render)
376 {
377     Status ret = Status::OK;
378     for (auto iter : nextFiltersMap_) {
379         for (auto filter : iter.second) {
380             auto curRet = filter->WaitPrerollDone(render);
381             if (curRet != Status::OK) {
382                 ret = curRet;
383             }
384         }
385     }
386     auto curRet = DoWaitPrerollDone(render);
387     if (curRet != Status::OK) {
388         ret = curRet;
389     }
390     return ret;
391 }
392 
StartFilterTask()393 void Filter::StartFilterTask()
394 {
395     if (filterTask_) {
396         filterTask_->Start();
397     }
398 }
399 
PauseFilterTask()400 void Filter::PauseFilterTask()
401 {
402     if (filterTask_) {
403         filterTask_->Pause();
404     }
405 }
406 
ClearAllNextFilters()407 Status Filter::ClearAllNextFilters()
408 {
409     nextFiltersMap_.clear();
410     return Status::OK;
411 }
412 
SetPlayRange(int64_t start,int64_t end)413 Status Filter::SetPlayRange(int64_t start, int64_t end)
414 {
415     MEDIA_LOG_D("SetPlayRange %{public}ld, pState:%{public}ld", name_.c_str(), curState_);
416     for (auto iter : nextFiltersMap_) {
417         for (auto filter : iter.second) {
418             filter->SetPlayRange(start, end);
419         }
420     }
421     return DoSetPlayRange(start, end);
422 }
423 
ProcessInputBuffer(int sendArg,int64_t delayUs)424 Status Filter::ProcessInputBuffer(int sendArg, int64_t delayUs)
425 {
426     MEDIA_LOG_D("Filter::ProcessInputBuffer  %{public}s", name_.c_str());
427     if (filterTask_) {
428         jobIdx_++;
429         filterTask_->SubmitJob([this, sendArg]() {
430             processIdx_++;
431             DoProcessInputBuffer(sendArg, processIdx_ <= jobIdxBase_);  // drop frame after flush
432         }, delayUs, 0);
433     } else {
434         Task::SleepInTask(delayUs / 1000); // 1000 convert to ms
435         DoProcessInputBuffer(sendArg, false);
436     }
437     return Status::OK;
438 }
439 
ProcessOutputBuffer(int sendArg,int64_t delayUs,bool byIdx,uint32_t idx,int64_t renderTime)440 Status Filter::ProcessOutputBuffer(int sendArg, int64_t delayUs, bool byIdx, uint32_t idx, int64_t renderTime)
441 {
442     MEDIA_LOG_D("Filter::ProcessOutputBuffer  %{public}s", name_.c_str());
443     if (filterTask_) {
444         jobIdx_++;
445         int64_t processIdx = jobIdx_;
446         filterTask_->SubmitJob([this, sendArg, processIdx, byIdx, idx, renderTime]() {
447             processIdx_++;
448             // drop frame after flush
449             DoProcessOutputBuffer(sendArg, processIdx <= jobIdxBase_, byIdx, idx, renderTime);
450         }, delayUs, 0);
451     } else {
452         Task::SleepInTask(delayUs / 1000); // 1000 convert to ms
453         DoProcessOutputBuffer(sendArg, false, false, idx, renderTime);
454     }
455     return Status::OK;
456 }
457 
SetPerfRecEnabled(bool perfRecNeeded)458 Status Filter::SetPerfRecEnabled(bool perfRecNeeded)
459 {
460     auto ret = DoSetPerfRecEnabled(perfRecNeeded);
461     for (auto iter : nextFiltersMap_) {
462         for (auto filter : iter.second) {
463             filter->SetPerfRecEnabled(perfRecNeeded);
464         }
465     }
466     return ret;
467 }
468 
DoSetPerfRecEnabled(bool perfRecNeeded)469 Status Filter::DoSetPerfRecEnabled(bool perfRecNeeded)
470 {
471     isPerfRecEnabled_ = perfRecNeeded;
472     return Status::OK;
473 }
474 
DoInitAfterLink()475 Status Filter::DoInitAfterLink()
476 {
477     MEDIA_LOG_I("Filter::DoInitAfterLink");
478     return Status::OK;
479 }
480 
DoPrepare()481 Status Filter::DoPrepare()
482 {
483     return Status::OK;
484 }
485 
DoStart()486 Status Filter::DoStart()
487 {
488     return Status::OK;
489 }
490 
DoPause()491 Status Filter::DoPause()
492 {
493     return Status::OK;
494 }
495 
DoPauseDragging()496 Status Filter::DoPauseDragging()
497 {
498     return Status::OK;
499 }
500 
DoPauseAudioAlign()501 Status Filter::DoPauseAudioAlign()
502 {
503     return Status::OK;
504 }
505 
DoResume()506 Status Filter::DoResume()
507 {
508     return Status::OK;
509 }
510 
DoResumeDragging()511 Status Filter::DoResumeDragging()
512 {
513     return Status::OK;
514 }
515 
DoResumeAudioAlign()516 Status Filter::DoResumeAudioAlign()
517 {
518     return Status::OK;
519 }
520 
DoStop()521 Status Filter::DoStop()
522 {
523     return Status::OK;
524 }
525 
DoFlush()526 Status Filter::DoFlush()
527 {
528     return Status::OK;
529 }
530 
DoRelease()531 Status Filter::DoRelease()
532 {
533     return Status::OK;
534 }
535 
DoPreroll()536 Status Filter::DoPreroll()
537 {
538     return Status::OK;
539 }
540 
DoWaitPrerollDone(bool render)541 Status Filter::DoWaitPrerollDone(bool render)
542 {
543     return Status::OK;
544 }
545 
DoSetPlayRange(int64_t start,int64_t end)546 Status Filter::DoSetPlayRange(int64_t start, int64_t end)
547 {
548     return Status::OK;
549 }
550 
DoProcessInputBuffer(int recvArg,bool dropFrame)551 Status Filter::DoProcessInputBuffer(int recvArg, bool dropFrame)
552 {
553     return Status::OK;
554 }
555 
DoProcessOutputBuffer(int recvArg,bool dropFrame,bool byIdx,uint32_t idx,int64_t renderTimee)556 Status Filter::DoProcessOutputBuffer(int recvArg, bool dropFrame, bool byIdx, uint32_t idx, int64_t renderTimee)
557 {
558     return Status::OK;
559 }
560 
ChangeState(FilterState state)561 void Filter::ChangeState(FilterState state)
562 {
563     AutoLock lock(stateMutex_);
564     curState_ = curState_ == FilterState::ERROR ? FilterState::ERROR : state;
565     MEDIA_LOG_I("%{public}s > %{public}d", name_.c_str(), curState_);
566     cond_.NotifyOne();
567 }
568 
WaitAllState(FilterState state)569 Status Filter::WaitAllState(FilterState state)
570 {
571     AutoLock lock(stateMutex_);
572     MEDIA_LOG_I("%{public}s wait %{public}d", name_.c_str(), state);
573     if (curState_ != state) {
574         bool result = cond_.WaitFor(lock, 30000, [this, state] { // 30000 ms timeout
575             return curState_ == state || curState_ == FilterState::ERROR;
576         });
577         if (!result) {
578             SetErrCode(Status::ERROR_TIMED_OUT);
579             return Status::ERROR_TIMED_OUT;
580         }
581         if (curState_ != state) {
582             MEDIA_LOG_E("Filter(%{public}s) wait state %{public}d fail, curState %{public}d",
583                 name_.c_str(), state, curState_);
584             return GetErrCode();
585         }
586     }
587 
588     Status res = Status::OK;
589     for (auto iter : nextFiltersMap_) {
590         for (auto filter : iter.second) {
591             if (filter->WaitAllState(state) != Status::OK) {
592                 res = filter->GetErrCode();
593             }
594         }
595     }
596     return res;
597 }
598 
SetErrCode(Status errCode)599 void Filter::SetErrCode(Status errCode)
600 {
601     errCode_ = errCode;
602 }
603 
GetErrCode()604 Status Filter::GetErrCode()
605 {
606     return errCode_;
607 }
608 
SetParameter(const std::shared_ptr<Meta> & meta)609 void Filter::SetParameter(const std::shared_ptr<Meta>& meta)
610 {
611     meta_ = meta;
612 }
613 
GetParameter(std::shared_ptr<Meta> & meta)614 void Filter::GetParameter(std::shared_ptr<Meta>& meta)
615 {
616     meta = meta_;
617 }
618 
LinkNext(const std::shared_ptr<Filter> &,StreamType)619 Status Filter::LinkNext(const std::shared_ptr<Filter>&, StreamType)
620 {
621     return Status::OK;
622 }
623 
UpdateNext(const std::shared_ptr<Filter> &,StreamType)624 Status Filter::UpdateNext(const std::shared_ptr<Filter>&, StreamType)
625 {
626     return Status::OK;
627 }
628 
UnLinkNext(const std::shared_ptr<Filter> &,StreamType)629 Status Filter::UnLinkNext(const std::shared_ptr<Filter>&, StreamType)
630 {
631     return Status::OK;
632 }
633 
GetFilterType()634 FilterType Filter::GetFilterType()
635 {
636     return filterType_;
637 };
638 
OnLinked(StreamType,const std::shared_ptr<Meta> &,const std::shared_ptr<FilterLinkCallback> &)639 Status Filter::OnLinked(StreamType, const std::shared_ptr<Meta>&, const std::shared_ptr<FilterLinkCallback>&)
640 {
641     return Status::OK;
642 };
643 
OnUpdated(StreamType,const std::shared_ptr<Meta> &,const std::shared_ptr<FilterLinkCallback> &)644 Status Filter::OnUpdated(StreamType, const std::shared_ptr<Meta>&, const std::shared_ptr<FilterLinkCallback>&)
645 {
646     return Status::OK;
647 }
648 
OnUnLinked(StreamType,const std::shared_ptr<FilterLinkCallback> &)649 Status Filter::OnUnLinked(StreamType, const std::shared_ptr<FilterLinkCallback>&)
650 {
651     return Status::OK;
652 }
653 
654 } // namespace Pipeline
655 } // namespace Media
656 } // namespace OHOS
657