1 /*
2 * Copyright (c) 2021-2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #ifdef RECORDER_SUPPORT
16
17 #define HST_LOG_TAG "MuxerFilter"
18
19 #include "muxer_filter.h"
20
21 #include "data_spliter.h"
22 #include "factory/filter_factory.h"
23 #include "foundation/log.h"
24 #include "common/plugin_settings.h"
25 #include "common/plugin_utils.h"
26 #include "pipeline/core/plugin_attr_desc.h"
27
28 namespace OHOS {
29 namespace Media {
30 namespace Pipeline {
31 namespace {
Intersections(const std::vector<std::shared_ptr<Plugin::PluginInfo>> & caps1,const std::vector<std::pair<std::shared_ptr<Plugin::PluginInfo>,Plugin::Capability>> & caps2)32 std::vector<std::shared_ptr<Plugin::PluginInfo>> Intersections(
33 const std::vector<std::shared_ptr<Plugin::PluginInfo>>& caps1,
34 const std::vector<std::pair<std::shared_ptr<Plugin::PluginInfo>, Plugin::Capability>>& caps2)
35 {
36 std::vector<std::shared_ptr<Plugin::PluginInfo>> intersections;
37 for (const auto& cap1 : caps1) {
38 for (const auto& cap2 : caps2) {
39 if (cap1->name == cap2.first->name) {
40 intersections.emplace_back(cap1);
41 }
42 }
43 }
44 return intersections;
45 }
46 }
47 static AutoRegisterFilter<MuxerFilter> g_registerFilterHelper("builtin.recorder.muxer");
48
MuxerFilter(std::string name)49 MuxerFilter::MuxerFilter(std::string name) : FilterBase(std::move(name)),
50 muxerDataSink_(std::make_shared<MuxerDataSink>())
51 {
52 filterType_ = FilterType::MUXER;
53 }
54
~MuxerFilter()55 MuxerFilter::~MuxerFilter() {}
Init(EventReceiver * receiver,FilterCallback * callback)56 void MuxerFilter::Init(EventReceiver* receiver, FilterCallback* callback)
57 {
58 this->eventReceiver_ = receiver;
59 this->callback_ = callback;
60 inPorts_.clear();
61 outPorts_.clear();
62 outPorts_.emplace_back(std::make_shared<Pipeline::OutPort>(this, PORT_NAME_DEFAULT));
63 muxerDataSink_->muxerFilter_ = this;
64 state_ = FilterState::INITIALIZED;
65 }
UpdateAndInitPluginByInfo(const std::shared_ptr<Plugin::PluginInfo> & selectedPluginInfo)66 bool MuxerFilter::UpdateAndInitPluginByInfo(const std::shared_ptr<Plugin::PluginInfo>& selectedPluginInfo)
67 {
68 if (selectedPluginInfo == nullptr) {
69 MEDIA_LOG_W("no available info to update plugin");
70 return false;
71 }
72 if (plugin_ != nullptr) {
73 if (targetPluginInfo_ != nullptr && targetPluginInfo_->name == selectedPluginInfo->name) {
74 if (plugin_->Reset() == Plugin::Status::OK) {
75 return true;
76 }
77 MEDIA_LOG_W("reuse previous plugin " PUBLIC_LOG_S " failed, will create new plugin",
78 targetPluginInfo_->name.c_str());
79 }
80 plugin_->Deinit();
81 }
82
83 plugin_ = Plugin::PluginManager::Instance().CreateMuxerPlugin(selectedPluginInfo->name);
84 if (plugin_ == nullptr) {
85 MEDIA_LOG_E("cannot create plugin " PUBLIC_LOG_S, selectedPluginInfo->name.c_str());
86 return false;
87 }
88 auto err = TranslatePluginStatus(plugin_->Init());
89 if (err != ErrorCode::SUCCESS) {
90 MEDIA_LOG_E("muxer plugin init error");
91 return false;
92 }
93 plugin_->SetCallback(this);
94 targetPluginInfo_ = selectedPluginInfo;
95 return true;
96 }
97
Negotiate(const std::string & inPort,const std::shared_ptr<const Plugin::Capability> & upstreamCap,Plugin::Capability & negotiatedCap,const Plugin::TagMap & upstreamParams,Plugin::TagMap & downstreamParams)98 bool MuxerFilter::Negotiate(const std::string& inPort,
99 const std::shared_ptr<const Plugin::Capability>& upstreamCap,
100 Plugin::Capability& negotiatedCap,
101 const Plugin::TagMap& upstreamParams,
102 Plugin::TagMap& downstreamParams)
103 {
104 if (state_ != FilterState::PREPARING) {
105 MEDIA_LOG_W("muxer filter is not in preparing when negotiate");
106 return false;
107 }
108 hasWriteHeader_ = false;
109 capabilityCache_.emplace_back(std::make_pair(inPort, *upstreamCap));
110 if (capabilityCache_.size() < inPorts_.size()) {
111 return true;
112 }
113 MEDIA_LOG_I("all track caps has been received, start negotiating downstream");
114 auto candidate = FindAvailablePluginsByOutputMime(containerMime_, Plugin::PluginType::MUXER);
115 for (const auto& cache: capabilityCache_) {
116 auto tmp = FindAvailablePlugins(cache.second, Plugin::PluginType::MUXER);
117 candidate = Intersections(candidate, tmp);
118 if (candidate.empty()) {
119 break;
120 }
121 }
122 if (candidate.empty()) {
123 MEDIA_LOG_E("cannot find any available plugins");
124 return false;
125 }
126 auto muxerCap = std::make_shared<Capability>(containerMime_);
127 Capability downCap;
128 if (!outPorts_[0]->Negotiate(muxerCap, downCap, upstreamParams, downstreamParams)) {
129 MEDIA_LOG_E("downstream of muxer filter negotiate failed");
130 return false;
131 }
132 // always use the first candidate plugin info
133 return UpdateAndInitPluginByInfo(candidate[0]);
134 }
AddTrackThenConfigure(const std::pair<std::string,Plugin::Meta> & metaPair)135 ErrorCode MuxerFilter::AddTrackThenConfigure(const std::pair<std::string, Plugin::Meta>& metaPair)
136 {
137 uint32_t trackId = 0;
138 ErrorCode ret = TranslatePluginStatus(plugin_->AddTrack(trackId));
139 if (ret != ErrorCode::SUCCESS) {
140 MEDIA_LOG_E("muxer plugin add track failed");
141 return ret;
142 }
143 trackInfos_.emplace_back(TrackInfo{static_cast<int32_t>(trackId), metaPair.first, false});
144 auto parameterMap = PluginParameterTable::FindAllowedParameterMap(filterType_);
145 for (const auto& keyPair : parameterMap) {
146 auto outValue = metaPair.second.GetData(static_cast<Plugin::MetaID>(keyPair.first));
147 if (outValue &&
148 (keyPair.second.second & PARAM_SET) &&
149 keyPair.second.first(keyPair.first, *outValue)) {
150 plugin_->SetTrackParameter(trackId, keyPair.first, *outValue);
151 } else {
152 if (!HasTagInfo(keyPair.first)) {
153 MEDIA_LOG_W("tag " PUBLIC_LOG_D32 " is not in map, may be update it?", keyPair.first);
154 } else {
155 MEDIA_LOG_W("parameter " PUBLIC_LOG_S " in meta is not found or type mismatch",
156 GetTagStrName(keyPair.first));
157 }
158 }
159 }
160 return ErrorCode::SUCCESS;
161 }
162
ConfigureToStart()163 ErrorCode MuxerFilter::ConfigureToStart()
164 {
165 ErrorCode ret;
166 for (const auto& cache: metaCache_) {
167 ret = AddTrackThenConfigure(cache);
168 if (ret != ErrorCode::SUCCESS) {
169 MEDIA_LOG_E("add and configure for track from inPort " PUBLIC_LOG_S " failed", cache.first.c_str());
170 return ret;
171 }
172 }
173 // todo add other global meta
174
175 ret = TranslatePluginStatus(plugin_->Prepare());
176 if (ret != ErrorCode::SUCCESS) {
177 MEDIA_LOG_E("muxer plugin prepare failed");
178 return ret;
179 }
180 ret = TranslatePluginStatus(plugin_->Start());
181 if (ret != ErrorCode::SUCCESS) {
182 MEDIA_LOG_E("muxer plugin start failed");
183 }
184 return ret;
185 }
Configure(const std::string & inPort,const std::shared_ptr<const Plugin::Meta> & upstreamMeta,Plugin::TagMap & upstreamParams,Plugin::TagMap & downstreamParams)186 bool MuxerFilter::Configure(const std::string &inPort, const std::shared_ptr<const Plugin::Meta> &upstreamMeta,
187 Plugin::TagMap &upstreamParams, Plugin::TagMap &downstreamParams)
188 {
189 std::string tmp;
190 if (!upstreamMeta->GetString(Plugin::MetaID::MIME, tmp)) {
191 MEDIA_LOG_E("stream meta must contain mime, which is not found in current stream from port " PUBLIC_LOG_S,
192 inPort.c_str());
193 return false;
194 }
195 metaCache_.emplace_back(std::make_pair(inPort, *upstreamMeta));
196 if (metaCache_.size() < inPorts_.size()) {
197 return true;
198 }
199 if (plugin_ == nullptr) {
200 MEDIA_LOG_E("cannot configure when no plugin available");
201 return false;
202 }
203
204 auto meta = std::make_shared<Plugin::Meta>();
205 meta->SetString(Plugin::MetaID::MIME, containerMime_);
206 if (!outPorts_[0]->Configure(meta, upstreamParams, downstreamParams)) {
207 MEDIA_LOG_E("downstream of muxer filter configure failed");
208 return false;
209 }
210 plugin_->SetDataSink(muxerDataSink_);
211 auto ret = ConfigureToStart();
212 if (ret != ErrorCode::SUCCESS) {
213 MEDIA_LOG_E("muxer filter configure and start error");
214 OnEvent({name_, EventType::EVENT_ERROR, ret});
215 return false;
216 }
217 state_ = FilterState::READY;
218 OnEvent({name_, EventType::EVENT_READY});
219 MEDIA_LOG_I("muxer send EVENT_READY");
220 return true;
221 }
222
SetOutputFormat(std::string containerMime)223 ErrorCode MuxerFilter::SetOutputFormat(std::string containerMime)
224 {
225 containerMime_ = std::move(containerMime);
226 return ErrorCode::SUCCESS;
227 }
228
AddTrack(std::shared_ptr<InPort> & trackPort)229 ErrorCode MuxerFilter::AddTrack(std::shared_ptr<InPort> &trackPort)
230 {
231 if (state_ != FilterState::INITIALIZED) {
232 return ErrorCode::ERROR_INVALID_OPERATION;
233 }
234 trackPort = std::make_shared<InPort>(this, std::string(PORT_NAME_DEFAULT) + std::to_string(inPorts_.size()));
235 inPorts_.emplace_back(trackPort);
236 return ErrorCode::SUCCESS;
237 }
238
SetMaxDuration(uint64_t maxDuration)239 ErrorCode MuxerFilter::SetMaxDuration(uint64_t maxDuration)
240 {
241 return ErrorCode::SUCCESS;
242 }
243
SetMaxSize(uint64_t maxSize)244 ErrorCode MuxerFilter::SetMaxSize(uint64_t maxSize)
245 {
246 return ErrorCode::SUCCESS;
247 }
248
StartNextSegment()249 ErrorCode MuxerFilter::StartNextSegment()
250 {
251 return ErrorCode::SUCCESS;
252 }
253
SendEos()254 ErrorCode MuxerFilter::SendEos()
255 {
256 OSAL::ScopedLock lock(pushDataMutex_);
257 MEDIA_LOG_I("SendEos entered.");
258 eos_ = true;
259 if (hasWriteHeader_ && plugin_) {
260 plugin_->WriteTrailer();
261 }
262 hasWriteHeader_ = false;
263 auto buf = std::make_shared<AVBuffer>();
264 buf->flag |= BUFFER_FLAG_EOS;
265 outPorts_[0]->PushData(buf, -1);
266 metaCache_.clear();
267 return ErrorCode::SUCCESS;
268 }
269
AllTracksEos()270 bool MuxerFilter::AllTracksEos()
271 {
272 return static_cast<size_t>(eosTrackCnt.load()) == trackInfos_.size();
273 }
UpdateEosState(const std::string & inPort)274 void MuxerFilter::UpdateEosState(const std::string& inPort)
275 {
276 int32_t eosCnt = 0;
277 for (auto& item : trackInfos_) {
278 if (item.inPort == inPort) {
279 item.eos = true;
280 }
281 if (item.eos) {
282 eosCnt++;
283 }
284 }
285 eosTrackCnt = eosCnt;
286 }
287
PushData(const std::string & inPort,const AVBufferPtr & buffer,int64_t offset)288 ErrorCode MuxerFilter::PushData(const std::string& inPort, const AVBufferPtr& buffer, int64_t offset)
289 {
290 {
291 OSAL::ScopedLock lock(pushDataMutex_);
292 if (state_ != FilterState::READY && state_ != FilterState::PAUSED && state_ != FilterState::RUNNING) {
293 MEDIA_LOG_W("pushing data to muxer when state is " PUBLIC_LOG_D32, static_cast<int>(state_.load()));
294 return ErrorCode::ERROR_INVALID_OPERATION;
295 }
296 if (eos_) {
297 MEDIA_LOG_D("SendEos exit");
298 return ErrorCode::SUCCESS;
299 }
300 // todo we should consider more tracks
301 if (!hasWriteHeader_) {
302 plugin_->WriteHeader();
303 hasWriteHeader_ = true;
304 }
305 if (buffer->GetMemory()->GetSize() != 0) {
306 plugin_->WriteFrame(buffer);
307 }
308
309 if (buffer->flag & BUFFER_FLAG_EOS) {
310 MEDIA_LOG_I("It is EOS buffer");
311 UpdateEosState(inPort);
312 }
313 }
314 if (AllTracksEos()) {
315 SendEos();
316 }
317 return ErrorCode::SUCCESS;
318 }
319
WriteAt(int64_t offset,const std::shared_ptr<Plugin::Buffer> & buffer)320 Plugin::Status MuxerFilter::MuxerDataSink::WriteAt(int64_t offset, const std::shared_ptr<Plugin::Buffer> &buffer)
321 {
322 if (muxerFilter_ != nullptr) {
323 muxerFilter_->outPorts_[0]->PushData(buffer, offset);
324 }
325 return Plugin::Status::OK;
326 }
327
Start()328 ErrorCode MuxerFilter::Start()
329 {
330 eos_ = false;
331 return FilterBase::Start();
332 }
333 } // Pipeline
334 } // Media
335 } // OHOS
336 #endif // RECORDER_SUPPORT