• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021-2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #ifdef RECORDER_SUPPORT
16 
17 #define HST_LOG_TAG "MuxerFilter"
18 
19 #include "muxer_filter.h"
20 
21 #include "data_spliter.h"
22 #include "factory/filter_factory.h"
23 #include "foundation/log.h"
24 #include "common/plugin_settings.h"
25 #include "common/plugin_utils.h"
26 #include "pipeline/core/plugin_attr_desc.h"
27 
28 namespace OHOS {
29 namespace Media {
30 namespace Pipeline {
31 namespace {
Intersections(const std::vector<std::shared_ptr<Plugin::PluginInfo>> & caps1,const std::vector<std::pair<std::shared_ptr<Plugin::PluginInfo>,Plugin::Capability>> & caps2)32 std::vector<std::shared_ptr<Plugin::PluginInfo>> Intersections(
33     const std::vector<std::shared_ptr<Plugin::PluginInfo>>& caps1,
34     const std::vector<std::pair<std::shared_ptr<Plugin::PluginInfo>, Plugin::Capability>>& caps2)
35 {
36     std::vector<std::shared_ptr<Plugin::PluginInfo>> intersections;
37     for (const auto& cap1 : caps1) {
38         for (const auto& cap2 : caps2) {
39             if (cap1->name == cap2.first->name) {
40                 intersections.emplace_back(cap1);
41             }
42         }
43     }
44     return intersections;
45 }
46 }
// File-scope helper object constructed with the filter name "builtin.recorder.muxer".
// NOTE(review): presumably its constructor registers MuxerFilter with the filter factory
// during static initialization — confirm against AutoRegisterFilter.
static AutoRegisterFilter<MuxerFilter> g_registerFilterHelper("builtin.recorder.muxer");
48 
MuxerFilter(std::string name)49 MuxerFilter::MuxerFilter(std::string name) : FilterBase(std::move(name)),
50     muxerDataSink_(std::make_shared<MuxerDataSink>())
51 {
52     filterType_ = FilterType::MUXER;
53 }
54 
~MuxerFilter()55 MuxerFilter::~MuxerFilter() {}
Init(EventReceiver * receiver,FilterCallback * callback)56 void MuxerFilter::Init(EventReceiver* receiver, FilterCallback* callback)
57 {
58     this->eventReceiver_ = receiver;
59     this->callback_ = callback;
60     inPorts_.clear();
61     outPorts_.clear();
62     outPorts_.emplace_back(std::make_shared<Pipeline::OutPort>(this, PORT_NAME_DEFAULT));
63     muxerDataSink_->muxerFilter_ = this;
64     state_ = FilterState::INITIALIZED;
65 }
UpdateAndInitPluginByInfo(const std::shared_ptr<Plugin::PluginInfo> & selectedPluginInfo)66 bool MuxerFilter::UpdateAndInitPluginByInfo(const std::shared_ptr<Plugin::PluginInfo>& selectedPluginInfo)
67 {
68     if (selectedPluginInfo == nullptr) {
69         MEDIA_LOG_W("no available info to update plugin");
70         return false;
71     }
72     if (plugin_ != nullptr) {
73         if (targetPluginInfo_ != nullptr && targetPluginInfo_->name == selectedPluginInfo->name) {
74             if (plugin_->Reset() == Plugin::Status::OK) {
75                 return true;
76             }
77             MEDIA_LOG_W("reuse previous plugin " PUBLIC_LOG_S " failed, will create new plugin",
78                         targetPluginInfo_->name.c_str());
79         }
80         plugin_->Deinit();
81     }
82 
83     plugin_ = Plugin::PluginManager::Instance().CreateMuxerPlugin(selectedPluginInfo->name);
84     if (plugin_ == nullptr) {
85         MEDIA_LOG_E("cannot create plugin " PUBLIC_LOG_S, selectedPluginInfo->name.c_str());
86         return false;
87     }
88     auto err = TranslatePluginStatus(plugin_->Init());
89     if (err != ErrorCode::SUCCESS) {
90         MEDIA_LOG_E("muxer plugin init error");
91         return false;
92     }
93     plugin_->SetCallback(this);
94     targetPluginInfo_ = selectedPluginInfo;
95     return true;
96 }
97 
Negotiate(const std::string & inPort,const std::shared_ptr<const Plugin::Capability> & upstreamCap,Plugin::Capability & negotiatedCap,const Plugin::TagMap & upstreamParams,Plugin::TagMap & downstreamParams)98 bool MuxerFilter::Negotiate(const std::string& inPort,
99                             const std::shared_ptr<const Plugin::Capability>& upstreamCap,
100                             Plugin::Capability& negotiatedCap,
101                             const Plugin::TagMap& upstreamParams,
102                             Plugin::TagMap& downstreamParams)
103 {
104     if (state_ != FilterState::PREPARING) {
105         MEDIA_LOG_W("muxer filter is not in preparing when negotiate");
106         return false;
107     }
108     hasWriteHeader_ = false;
109     capabilityCache_.emplace_back(std::make_pair(inPort, *upstreamCap));
110     if (capabilityCache_.size() < inPorts_.size()) {
111         return true;
112     }
113     MEDIA_LOG_I("all track caps has been received, start negotiating downstream");
114     auto candidate = FindAvailablePluginsByOutputMime(containerMime_, Plugin::PluginType::MUXER);
115     for (const auto& cache: capabilityCache_) {
116         auto tmp = FindAvailablePlugins(cache.second, Plugin::PluginType::MUXER);
117         candidate = Intersections(candidate, tmp);
118         if (candidate.empty()) {
119             break;
120         }
121     }
122     if (candidate.empty()) {
123         MEDIA_LOG_E("cannot find any available plugins");
124         return false;
125     }
126     auto muxerCap = std::make_shared<Capability>(containerMime_);
127     Capability downCap;
128     if (!outPorts_[0]->Negotiate(muxerCap, downCap, upstreamParams, downstreamParams)) {
129         MEDIA_LOG_E("downstream of muxer filter negotiate failed");
130         return false;
131     }
132     // always use the first candidate plugin info
133     return UpdateAndInitPluginByInfo(candidate[0]);
134 }
AddTrackThenConfigure(const std::pair<std::string,Plugin::Meta> & metaPair)135 ErrorCode MuxerFilter::AddTrackThenConfigure(const std::pair<std::string, Plugin::Meta>& metaPair)
136 {
137     uint32_t trackId = 0;
138     ErrorCode ret = TranslatePluginStatus(plugin_->AddTrack(trackId));
139     if (ret != ErrorCode::SUCCESS) {
140         MEDIA_LOG_E("muxer plugin add track failed");
141         return ret;
142     }
143     trackInfos_.emplace_back(TrackInfo{static_cast<int32_t>(trackId), metaPair.first, false});
144     auto parameterMap = PluginParameterTable::FindAllowedParameterMap(filterType_);
145     for (const auto& keyPair : parameterMap) {
146         auto outValue = metaPair.second.GetData(static_cast<Plugin::MetaID>(keyPair.first));
147         if (outValue &&
148             (keyPair.second.second & PARAM_SET) &&
149             keyPair.second.first(keyPair.first, *outValue)) {
150             plugin_->SetTrackParameter(trackId, keyPair.first, *outValue);
151         } else {
152             if (!HasTagInfo(keyPair.first)) {
153                 MEDIA_LOG_W("tag " PUBLIC_LOG_D32 " is not in map, may be update it?", keyPair.first);
154             } else {
155                 MEDIA_LOG_W("parameter " PUBLIC_LOG_S " in meta is not found or type mismatch",
156                     GetTagStrName(keyPair.first));
157             }
158         }
159     }
160     return ErrorCode::SUCCESS;
161 }
162 
ConfigureToStart()163 ErrorCode MuxerFilter::ConfigureToStart()
164 {
165     ErrorCode ret;
166     for (const auto& cache: metaCache_) {
167         ret = AddTrackThenConfigure(cache);
168         if (ret != ErrorCode::SUCCESS) {
169             MEDIA_LOG_E("add and configure for track from inPort " PUBLIC_LOG_S " failed", cache.first.c_str());
170             return ret;
171         }
172     }
173     // todo add other global meta
174 
175     ret = TranslatePluginStatus(plugin_->Prepare());
176     if (ret != ErrorCode::SUCCESS) {
177         MEDIA_LOG_E("muxer plugin prepare failed");
178         return ret;
179     }
180     ret = TranslatePluginStatus(plugin_->Start());
181     if (ret != ErrorCode::SUCCESS) {
182         MEDIA_LOG_E("muxer plugin start failed");
183     }
184     return ret;
185 }
Configure(const std::string & inPort,const std::shared_ptr<const Plugin::Meta> & upstreamMeta,Plugin::TagMap & upstreamParams,Plugin::TagMap & downstreamParams)186 bool MuxerFilter::Configure(const std::string &inPort, const std::shared_ptr<const Plugin::Meta> &upstreamMeta,
187                             Plugin::TagMap &upstreamParams, Plugin::TagMap &downstreamParams)
188 {
189     std::string tmp;
190     if (!upstreamMeta->GetString(Plugin::MetaID::MIME, tmp)) {
191         MEDIA_LOG_E("stream meta must contain mime, which is not found in current stream from port " PUBLIC_LOG_S,
192                     inPort.c_str());
193         return false;
194     }
195     metaCache_.emplace_back(std::make_pair(inPort, *upstreamMeta));
196     if (metaCache_.size() < inPorts_.size()) {
197         return true;
198     }
199     if (plugin_ == nullptr) {
200         MEDIA_LOG_E("cannot configure when no plugin available");
201         return false;
202     }
203 
204     auto meta = std::make_shared<Plugin::Meta>();
205     meta->SetString(Plugin::MetaID::MIME, containerMime_);
206     if (!outPorts_[0]->Configure(meta, upstreamParams, downstreamParams)) {
207         MEDIA_LOG_E("downstream of muxer filter configure failed");
208         return false;
209     }
210     plugin_->SetDataSink(muxerDataSink_);
211     auto ret = ConfigureToStart();
212     if (ret != ErrorCode::SUCCESS) {
213         MEDIA_LOG_E("muxer filter configure and start error");
214         OnEvent({name_, EventType::EVENT_ERROR, ret});
215         return false;
216     }
217     state_ = FilterState::READY;
218     OnEvent({name_, EventType::EVENT_READY});
219     MEDIA_LOG_I("muxer send EVENT_READY");
220     return true;
221 }
222 
SetOutputFormat(std::string containerMime)223 ErrorCode MuxerFilter::SetOutputFormat(std::string containerMime)
224 {
225     containerMime_ = std::move(containerMime);
226     return ErrorCode::SUCCESS;
227 }
228 
AddTrack(std::shared_ptr<InPort> & trackPort)229 ErrorCode MuxerFilter::AddTrack(std::shared_ptr<InPort> &trackPort)
230 {
231     if (state_ != FilterState::INITIALIZED) {
232         return ErrorCode::ERROR_INVALID_OPERATION;
233     }
234     trackPort = std::make_shared<InPort>(this, std::string(PORT_NAME_DEFAULT) + std::to_string(inPorts_.size()));
235     inPorts_.emplace_back(trackPort);
236     return ErrorCode::SUCCESS;
237 }
238 
SetMaxDuration(uint64_t maxDuration)239 ErrorCode MuxerFilter::SetMaxDuration(uint64_t maxDuration)
240 {
241     return ErrorCode::SUCCESS;
242 }
243 
SetMaxSize(uint64_t maxSize)244 ErrorCode MuxerFilter::SetMaxSize(uint64_t maxSize)
245 {
246     return ErrorCode::SUCCESS;
247 }
248 
StartNextSegment()249 ErrorCode MuxerFilter::StartNextSegment()
250 {
251     return ErrorCode::SUCCESS;
252 }
253 
SendEos()254 ErrorCode MuxerFilter::SendEos()
255 {
256     OSAL::ScopedLock lock(pushDataMutex_);
257     MEDIA_LOG_I("SendEos entered.");
258     eos_ = true;
259     if (hasWriteHeader_ && plugin_) {
260         plugin_->WriteTrailer();
261     }
262     hasWriteHeader_ = false;
263     auto buf = std::make_shared<AVBuffer>();
264     buf->flag |= BUFFER_FLAG_EOS;
265     outPorts_[0]->PushData(buf, -1);
266     metaCache_.clear();
267     return ErrorCode::SUCCESS;
268 }
269 
AllTracksEos()270 bool MuxerFilter::AllTracksEos()
271 {
272     return static_cast<size_t>(eosTrackCnt.load()) == trackInfos_.size();
273 }
UpdateEosState(const std::string & inPort)274 void MuxerFilter::UpdateEosState(const std::string& inPort)
275 {
276     int32_t eosCnt = 0;
277     for (auto& item : trackInfos_) {
278         if (item.inPort == inPort) {
279             item.eos = true;
280         }
281         if (item.eos) {
282             eosCnt++;
283         }
284     }
285     eosTrackCnt = eosCnt;
286 }
287 
PushData(const std::string & inPort,const AVBufferPtr & buffer,int64_t offset)288 ErrorCode MuxerFilter::PushData(const std::string& inPort, const AVBufferPtr& buffer, int64_t offset)
289 {
290     {
291         OSAL::ScopedLock lock(pushDataMutex_);
292         if (state_ != FilterState::READY && state_ != FilterState::PAUSED && state_ != FilterState::RUNNING) {
293             MEDIA_LOG_W("pushing data to muxer when state is " PUBLIC_LOG_D32, static_cast<int>(state_.load()));
294             return ErrorCode::ERROR_INVALID_OPERATION;
295         }
296         if (eos_) {
297             MEDIA_LOG_D("SendEos exit");
298             return ErrorCode::SUCCESS;
299         }
300         // todo we should consider more tracks
301         if (!hasWriteHeader_) {
302             plugin_->WriteHeader();
303             hasWriteHeader_ = true;
304         }
305         if (buffer->GetMemory()->GetSize() != 0) {
306             plugin_->WriteFrame(buffer);
307         }
308 
309         if (buffer->flag & BUFFER_FLAG_EOS) {
310             MEDIA_LOG_I("It is EOS buffer");
311             UpdateEosState(inPort);
312         }
313     }
314     if (AllTracksEos()) {
315         SendEos();
316     }
317     return ErrorCode::SUCCESS;
318 }
319 
WriteAt(int64_t offset,const std::shared_ptr<Plugin::Buffer> & buffer)320 Plugin::Status MuxerFilter::MuxerDataSink::WriteAt(int64_t offset, const std::shared_ptr<Plugin::Buffer> &buffer)
321 {
322     if (muxerFilter_ != nullptr) {
323         muxerFilter_->outPorts_[0]->PushData(buffer, offset);
324     }
325     return Plugin::Status::OK;
326 }
327 
Start()328 ErrorCode MuxerFilter::Start()
329 {
330     eos_ = false;
331     return FilterBase::Start();
332 }
333 } // Pipeline
334 } // Media
335 } // OHOS
336 #endif // RECORDER_SUPPORT