1 /*
2 * Copyright (c) 2021-2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #define HST_LOG_TAG "DemuxerFilter"
17
18 #include "demuxer_filter.h"
19 #include <algorithm>
20 #include "compatible_check.h"
21 #include "factory/filter_factory.h"
22 #include "foundation/log.h"
23 #include "pipeline/filters/common/plugin_utils.h"
24 #include "plugin/common/plugin_time.h"
25 #include "utils/constants.h"
26 #include "utils/steady_clock.h"
27
28 namespace OHOS {
29 namespace Media {
30 namespace Pipeline {
// File-scope helper object tying DemuxerFilter to the name "builtin.player.demuxer".
// NOTE(review): the registration itself is presumably done by AutoRegisterFilter's constructor - confirm.
static AutoRegisterFilter<DemuxerFilter> g_registerFilterHelper("builtin.player.demuxer");
32
33 class DemuxerFilter::DataSourceImpl : public Plugin::DataSourceHelper {
34 public:
35 explicit DataSourceImpl(const DemuxerFilter& filter);
36 ~DataSourceImpl() override = default;
37 Plugin::Status ReadAt(int64_t offset, std::shared_ptr<Plugin::Buffer>& buffer, size_t expectedLen) override;
38 Plugin::Status GetSize(size_t& size) override;
39
40 private:
41 const DemuxerFilter& filter;
42 };
43
/**
 * Binds the data source to the filter whose data it exposes.
 * @param filter filter to read from; only a reference is stored.
 */
DemuxerFilter::DataSourceImpl::DataSourceImpl(const DemuxerFilter& filter)
    : filter(filter)
{
}
47
48 /**
49 * ReadAt Plugin::DataSource::ReadAt implementation.
50 * @param offset offset in media stream.
51 * @param buffer caller allocate real buffer.
52 * @param expectedLen buffer size wanted to read.
53 * @return read result.
54 */
ReadAt(int64_t offset,std::shared_ptr<Plugin::Buffer> & buffer,size_t expectedLen)55 Plugin::Status DemuxerFilter::DataSourceImpl::ReadAt(int64_t offset, std::shared_ptr<Plugin::Buffer>& buffer,
56 size_t expectedLen)
57 {
58 if (!buffer || buffer->IsEmpty() || expectedLen == 0 || !filter.IsOffsetValid(offset)) {
59 MEDIA_LOG_E("ReadAt failed, buffer empty: %" PUBLIC_LOG "d, expectedLen: %" PUBLIC_LOG
60 "d, offset: %" PUBLIC_LOG PRId64, !buffer, static_cast<int>(expectedLen), offset);
61 return Plugin::Status::ERROR_UNKNOWN;
62 }
63 Plugin::Status rtv = Plugin::Status::OK;
64 switch (filter.pluginState_.load()) {
65 case DemuxerState::DEMUXER_STATE_NULL:
66 rtv = Plugin::Status::ERROR_WRONG_STATE;
67 MEDIA_LOG_E("ReadAt error due to DEMUXER_STATE_NULL");
68 break;
69 case DemuxerState::DEMUXER_STATE_PARSE_HEADER: {
70 if (!filter.peekRange_(static_cast<uint64_t>(offset), expectedLen, buffer)) {
71 rtv = Plugin::Status::ERROR_NOT_ENOUGH_DATA;
72 }
73 break;
74 }
75 case DemuxerState::DEMUXER_STATE_PARSE_FRAME: {
76 if (!filter.getRange_(static_cast<uint64_t>(offset), expectedLen, buffer)) {
77 rtv = Plugin::Status::END_OF_STREAM;
78 }
79 break;
80 }
81 default:
82 break;
83 }
84 return rtv;
85 }
86
GetSize(size_t & size)87 Plugin::Status DemuxerFilter::DataSourceImpl::GetSize(size_t& size)
88 {
89 size = filter.mediaDataSize_;
90 return (filter.mediaDataSize_ > 0) ? Plugin::Status::OK : Plugin::Status::ERROR_WRONG_STATE;
91 }
92
DemuxerFilter(std::string name)93 DemuxerFilter::DemuxerFilter(std::string name)
94 : FilterBase(std::move(name)),
95 uriSuffix_(),
96 mediaDataSize_(0),
97 task_(nullptr),
98 typeFinder_(nullptr),
99 dataPacker_(nullptr),
100 pluginName_(),
101 plugin_(nullptr),
102 pluginState_(DemuxerState::DEMUXER_STATE_NULL),
103 pluginAllocator_(nullptr),
104 dataSource_(std::make_shared<DataSourceImpl>(*this)),
105 mediaMetaData_()
106 {
107 filterType_ = FilterType::DEMUXER;
108 MEDIA_LOG_D("ctor called");
109 }
110
~DemuxerFilter()111 DemuxerFilter::~DemuxerFilter()
112 {
113 MEDIA_LOG_I("dtor called");
114 if (task_) {
115 task_->Stop();
116 }
117 if (plugin_) {
118 plugin_->Deinit();
119 }
120 }
121
Init(EventReceiver * receiver,FilterCallback * callback)122 void DemuxerFilter::Init(EventReceiver* receiver, FilterCallback* callback)
123 {
124 this->eventReceiver_ = receiver;
125 this->callback_ = callback;
126 inPorts_.clear();
127 outPorts_.clear();
128 inPorts_.push_back(std::make_shared<Pipeline::InPort>(this, PORT_NAME_DEFAULT));
129 state_ = FilterState::INITIALIZED;
130 }
131
Start()132 ErrorCode DemuxerFilter::Start()
133 {
134 MEDIA_LOG_I("Start called.");
135 if (task_) {
136 task_->Start();
137 }
138 return FilterBase::Start();
139 }
140
Stop()141 ErrorCode DemuxerFilter::Stop()
142 {
143 MEDIA_LOG_I("Stop called.");
144 if (task_) {
145 task_->Pause();
146 }
147 Reset();
148 if (!outPorts_.empty()) {
149 PortInfo portInfo;
150 portInfo.type = PortType::OUT;
151 portInfo.ports.reserve(outPorts_.size());
152 for (const auto& outPort : outPorts_) {
153 portInfo.ports.push_back({outPort->GetName(), false});
154 }
155 if (callback_) {
156 callback_->OnCallback(FilterCallbackType::PORT_REMOVE, static_cast<Filter*>(this), portInfo);
157 }
158 }
159 return FilterBase::Stop();
160 }
161
Pause()162 ErrorCode DemuxerFilter::Pause()
163 {
164 MEDIA_LOG_I("Pause called");
165 return FilterBase::Pause();
166 }
167
FlushStart()168 void DemuxerFilter::FlushStart()
169 {
170 MEDIA_LOG_I("FlushStart entered");
171 if (dataPacker_) {
172 dataPacker_->Flush();
173 }
174 if (task_) {
175 task_->Pause();
176 }
177 }
178
FlushEnd()179 void DemuxerFilter::FlushEnd()
180 {
181 MEDIA_LOG_I("FlushEnd entered");
182 }
183
Prepare()184 ErrorCode DemuxerFilter::Prepare()
185 {
186 MEDIA_LOG_I("Prepare called");
187 pluginState_ = DemuxerState::DEMUXER_STATE_NULL;
188 Pipeline::WorkMode mode;
189 GetInPort(PORT_NAME_DEFAULT)->Activate({Pipeline::WorkMode::PULL, Pipeline::WorkMode::PUSH}, mode);
190 if (mode == Pipeline::WorkMode::PULL) {
191 ActivatePullMode();
192 } else {
193 ActivatePushMode();
194 }
195 state_ = FilterState::PREPARING;
196 return ErrorCode::SUCCESS;
197 }
198
PushData(const std::string & inPort,AVBufferPtr buffer,int64_t offset)199 ErrorCode DemuxerFilter::PushData(const std::string& inPort, AVBufferPtr buffer, int64_t offset)
200 {
201 MEDIA_LOG_D("PushData for port: %" PUBLIC_LOG "s", inPort.c_str());
202 if (dataPacker_) {
203 dataPacker_->PushData(std::move(buffer), offset);
204 }
205 return ErrorCode::SUCCESS;
206 }
207
Negotiate(const std::string & inPort,const std::shared_ptr<const Plugin::Capability> & upstreamCap,Plugin::Capability & negotiatedCap,const Plugin::TagMap & upstreamParams,Plugin::TagMap & downstreamParams)208 bool DemuxerFilter::Negotiate(const std::string& inPort,
209 const std::shared_ptr<const Plugin::Capability>& upstreamCap,
210 Plugin::Capability& negotiatedCap,
211 const Plugin::TagMap& upstreamParams,
212 Plugin::TagMap& downstreamParams)
213 {
214 (void)inPort;
215 (void)upstreamCap;
216 (void)negotiatedCap;
217 (void)upstreamParams;
218 (void)downstreamParams;
219 return true;
220 }
221
Configure(const std::string & inPort,const std::shared_ptr<const Plugin::Meta> & upstreamMeta)222 bool DemuxerFilter::Configure(const std::string& inPort, const std::shared_ptr<const Plugin::Meta>& upstreamMeta)
223 {
224 (void)upstreamMeta->GetUint64(Plugin::MetaID::MEDIA_FILE_SIZE, mediaDataSize_);
225 return upstreamMeta->GetString(Plugin::MetaID::MEDIA_FILE_EXTENSION, uriSuffix_);
226 }
227
SeekTo(int64_t pos)228 ErrorCode DemuxerFilter::SeekTo(int64_t pos)
229 {
230 if (!plugin_) {
231 MEDIA_LOG_E("SeekTo failed due to no valid plugin");
232 return ErrorCode::ERROR_INVALID_OPERATION;
233 }
234 auto rtv = TranslatePluginStatus(plugin_->SeekTo(-1, pos, Plugin::SeekMode::BACKWARD));
235 if (rtv == ErrorCode::SUCCESS) {
236 if (task_) {
237 task_->Start();
238 }
239 } else {
240 MEDIA_LOG_E("SeekTo failed with return value: %" PUBLIC_LOG "d", static_cast<int>(rtv));
241 }
242 return rtv;
243 }
244
GetStreamMetaInfo() const245 std::vector<std::shared_ptr<Plugin::Meta>> DemuxerFilter::GetStreamMetaInfo() const
246 {
247 return mediaMetaData_.trackMetas;
248 }
249
GetGlobalMetaInfo() const250 std::shared_ptr<Plugin::Meta> DemuxerFilter::GetGlobalMetaInfo() const
251 {
252 return mediaMetaData_.globalMeta;
253 }
254
Reset()255 void DemuxerFilter::Reset()
256 {
257 mediaMetaData_.globalMeta.reset();
258 mediaMetaData_.trackMetas.clear();
259 mediaMetaData_.trackInfos.clear();
260 }
261
InitTypeFinder()262 void DemuxerFilter::InitTypeFinder()
263 {
264 if (!typeFinder_) {
265 typeFinder_ = std::make_shared<TypeFinder>();
266 }
267 }
268
CreatePlugin(std::string pluginName)269 bool DemuxerFilter::CreatePlugin(std::string pluginName)
270 {
271 if (plugin_) {
272 plugin_->Deinit();
273 }
274 plugin_ = Plugin::PluginManager::Instance().CreateDemuxerPlugin(pluginName);
275 if (!plugin_ || plugin_->Init() != Plugin::Status::OK) {
276 MEDIA_LOG_E("CreatePlugin %" PUBLIC_LOG "s failed.", pluginName.c_str());
277 return false;
278 }
279 plugin_->SetCallback(this);
280 pluginAllocator_ = plugin_->GetAllocator();
281 pluginName_.swap(pluginName);
282 return true;
283 }
284
InitPlugin(std::string pluginName)285 bool DemuxerFilter::InitPlugin(std::string pluginName)
286 {
287 if (pluginName.empty()) {
288 return false;
289 }
290 if (pluginName_ != pluginName) {
291 if (!CreatePlugin(std::move(pluginName))) {
292 return false;
293 }
294 } else {
295 if (plugin_->Reset() != Plugin::Status::OK) {
296 if (!CreatePlugin(std::move(pluginName))) {
297 return false;
298 }
299 }
300 }
301 MEDIA_LOG_I("InitPlugin, %" PUBLIC_LOG "s used.", pluginName_.c_str());
302 (void)plugin_->SetDataSource(std::dynamic_pointer_cast<Plugin::DataSourceHelper>(dataSource_));
303 pluginState_ = DemuxerState::DEMUXER_STATE_PARSE_HEADER;
304 return plugin_->Prepare() == Plugin::Status::OK;
305 }
306
ActivatePullMode()307 void DemuxerFilter::ActivatePullMode()
308 {
309 MEDIA_LOG_D("ActivatePullMode called");
310 InitTypeFinder();
311 if (!task_) {
312 task_ = std::make_shared<OSAL::Task>("DemuxerFilter");
313 }
314 task_->RegisterHandler([this] { DemuxerLoop(); });
315 checkRange_ = [this](uint64_t offset, uint32_t size) {
316 return (offset < mediaDataSize_) && (size <= mediaDataSize_) && (offset <= (mediaDataSize_ - size));
317 };
318 peekRange_ = [this](uint64_t offset, size_t size, AVBufferPtr& bufferPtr) -> bool {
319 return inPorts_.front()->PullData(offset, size, bufferPtr) == ErrorCode::SUCCESS;
320 };
321 getRange_ = peekRange_;
322 typeFinder_->Init(uriSuffix_, mediaDataSize_, checkRange_, peekRange_);
323 MediaTypeFound(typeFinder_->FindMediaType());
324 }
325
ActivatePushMode()326 void DemuxerFilter::ActivatePushMode()
327 {
328 MEDIA_LOG_D("ActivatePushMode called");
329 InitTypeFinder();
330 if (!dataPacker_) {
331 dataPacker_ = std::make_shared<DataPacker>();
332 }
333 checkRange_ = [this](uint64_t offset, uint32_t size) {
334 uint64_t curOffset = offset;
335 return dataPacker_->IsDataAvailable(offset, size, curOffset);
336 };
337 peekRange_ = [this](uint64_t offset, size_t size, AVBufferPtr& bufferPtr) -> bool {
338 return dataPacker_->PeekRange(offset, size, bufferPtr);
339 };
340 getRange_ = [this](uint64_t offset, size_t size, AVBufferPtr& bufferPtr) -> bool {
341 return dataPacker_->GetRange(offset, size, bufferPtr);
342 };
343 typeFinder_->Init(uriSuffix_, mediaDataSize_, checkRange_, peekRange_);
344 typeFinder_->FindMediaTypeAsync([this](std::string pluginName) { MediaTypeFound(std::move(pluginName)); });
345 }
346
MediaTypeFound(std::string pluginName)347 void DemuxerFilter::MediaTypeFound(std::string pluginName)
348 {
349 if (InitPlugin(std::move(pluginName))) {
350 task_->Start();
351 } else {
352 OnEvent({name_, EventType::EVENT_ERROR, ErrorCode::ERROR_UNSUPPORTED_FORMAT});
353 }
354 }
355
InitMediaMetaData(const Plugin::MediaInfoHelper & mediaInfo)356 void DemuxerFilter::InitMediaMetaData(const Plugin::MediaInfoHelper& mediaInfo)
357 {
358 mediaMetaData_.globalMeta = std::make_shared<Plugin::Meta>(mediaInfo.globalMeta);
359 mediaMetaData_.trackMetas.clear();
360 int trackCnt = 0;
361 for (auto& trackMeta : mediaInfo.trackMeta) {
362 mediaMetaData_.trackMetas.push_back(std::make_shared<Plugin::Meta>(trackMeta));
363 if (!trackMeta.Empty()) {
364 ++trackCnt;
365 }
366 }
367 mediaMetaData_.trackInfos.reserve(trackCnt);
368 }
369
IsOffsetValid(int64_t offset) const370 bool DemuxerFilter::IsOffsetValid(int64_t offset) const
371 {
372 return mediaDataSize_ == 0 || offset <= static_cast<int64_t>(mediaDataSize_);
373 }
374
PrepareStreams(const Plugin::MediaInfoHelper & mediaInfo)375 bool DemuxerFilter::PrepareStreams(const Plugin::MediaInfoHelper& mediaInfo)
376 {
377 MEDIA_LOG_I("PrepareStreams called");
378 InitMediaMetaData(mediaInfo);
379 outPorts_.clear();
380 int streamCnt = mediaInfo.trackMeta.size();
381 PortInfo portInfo;
382 portInfo.type = PortType::OUT;
383 portInfo.ports.reserve(streamCnt);
384 int audioTrackCnt = 0;
385 for (int i = 0; i < streamCnt; ++i) {
386 if (mediaInfo.trackMeta[i].Empty()) {
387 MEDIA_LOG_E("PrepareStreams, unsupported stream with trackId = %" PUBLIC_LOG "d", i);
388 continue;
389 }
390 std::string mime;
391 uint32_t trackId = 0;
392 if (!mediaInfo.trackMeta[i].GetString(Plugin::MetaID::MIME, mime) ||
393 !mediaInfo.trackMeta[i].GetUint32(Plugin::MetaID::TRACK_ID, trackId)) {
394 MEDIA_LOG_E("PrepareStreams failed to extract mime or trackId.");
395 continue;
396 }
397 if (IsAudioMime(mime)) {
398 MEDIA_LOG_D("PrepareStreams, audio stream with trackId = %" PUBLIC_LOG "u.", trackId);
399 if (audioTrackCnt == 1) {
400 MEDIA_LOG_E("PrepareStreams, discard audio track: %" PUBLIC_LOG "d.", trackId);
401 continue;
402 }
403 ++audioTrackCnt;
404 }
405 auto port = std::make_shared<OutPort>(this, NamePort(mime));
406 MEDIA_LOG_I("PrepareStreams, trackId: %" PUBLIC_LOG "d, portName: %" PUBLIC_LOG "s",
407 i, port->GetName().c_str());
408 outPorts_.push_back(port);
409 portInfo.ports.push_back({port->GetName(), IsRawAudio(mime)});
410 mediaMetaData_.trackInfos.emplace_back(trackId, std::move(port), true);
411 }
412 if (portInfo.ports.empty()) {
413 MEDIA_LOG_E("PrepareStreams failed due to no valid port.");
414 return false;
415 }
416 ErrorCode ret = ErrorCode::SUCCESS;
417 if (callback_) {
418 ret = callback_->OnCallback(FilterCallbackType::PORT_ADDED, static_cast<Filter*>(this), portInfo);
419 }
420 return ret == ErrorCode::SUCCESS;
421 }
422
ReadFrame(AVBuffer & buffer,uint32_t & trackId)423 ErrorCode DemuxerFilter::ReadFrame(AVBuffer& buffer, uint32_t& trackId)
424 {
425 MEDIA_LOG_D("ReadFrame called");
426 ErrorCode result = ErrorCode::ERROR_UNKNOWN;
427 auto rtv = plugin_->ReadFrame(buffer, 0);
428 if (rtv == Plugin::Status::OK) {
429 trackId = buffer.trackID;
430 result = ErrorCode::SUCCESS;
431 }
432 MEDIA_LOG_D("ReadFrame return with rtv = %" PUBLIC_LOG "d", static_cast<int32_t>(rtv));
433 return (rtv != Plugin::Status::END_OF_STREAM) ? result : ErrorCode::END_OF_STREAM;
434 }
435
GetTrackMeta(uint32_t trackId)436 std::shared_ptr<Plugin::Meta> DemuxerFilter::GetTrackMeta(uint32_t trackId)
437 {
438 return (trackId < mediaMetaData_.trackMetas.size()) ? mediaMetaData_.trackMetas[trackId] : nullptr;
439 }
440
SendEventEos()441 void DemuxerFilter::SendEventEos()
442 {
443 MEDIA_LOG_I("SendEventEos called");
444 AVBufferPtr bufferPtr = std::make_shared<AVBuffer>();
445 bufferPtr->flag = BUFFER_FLAG_EOS;
446 for (const auto& stream : mediaMetaData_.trackInfos) {
447 stream.port->PushData(bufferPtr, -1);
448 }
449 }
450
HandleFrame(const AVBufferPtr & bufferPtr,uint32_t trackId)451 void DemuxerFilter::HandleFrame(const AVBufferPtr& bufferPtr, uint32_t trackId)
452 {
453 for (auto& stream : mediaMetaData_.trackInfos) {
454 if (stream.trackId != trackId) {
455 continue;
456 }
457 stream.port->PushData(bufferPtr, -1);
458 break;
459 }
460 }
461
NegotiateDownstream()462 void DemuxerFilter::NegotiateDownstream()
463 {
464 PROFILE_BEGIN("NegotiateDownstream profile begins.");
465 for (auto& stream : mediaMetaData_.trackInfos) {
466 if (stream.needNegoCaps) {
467 Capability caps;
468 MEDIA_LOG_I("demuxer negotiate with trackId: %" PUBLIC_LOG "u", stream.trackId);
469 auto streamMeta = GetTrackMeta(stream.trackId);
470 auto tmpCap = MetaToCapability(*streamMeta);
471 Plugin::TagMap upstreamParams;
472 Plugin::TagMap downstreamParams;
473 if (stream.port->Negotiate(tmpCap, caps, upstreamParams, downstreamParams) &&
474 stream.port->Configure(streamMeta)) {
475 stream.needNegoCaps = false;
476 } else {
477 task_->PauseAsync();
478 OnEvent({name_, EventType::EVENT_ERROR, ErrorCode::ERROR_UNSUPPORTED_FORMAT});
479 }
480 }
481 }
482 PROFILE_END("NegotiateDownstream end.");
483 }
484
DemuxerLoop()485 void DemuxerFilter::DemuxerLoop()
486 {
487 if (pluginState_.load() == DemuxerState::DEMUXER_STATE_PARSE_FRAME) {
488 AVBufferPtr bufferPtr = std::make_shared<AVBuffer>();
489 uint32_t streamIndex = 0;
490 auto rtv = ReadFrame(*bufferPtr, streamIndex);
491 if (rtv == ErrorCode::SUCCESS) {
492 HandleFrame(bufferPtr, streamIndex);
493 } else {
494 SendEventEos();
495 task_->PauseAsync();
496 if (rtv != ErrorCode::END_OF_STREAM) {
497 MEDIA_LOG_E("ReadFrame failed with rtv = %" PUBLIC_LOG "d", to_underlying(rtv));
498 }
499 }
500 } else {
501 Plugin::MediaInfoHelper mediaInfo;
502 PROFILE_BEGIN();
503 if (plugin_->GetMediaInfo(mediaInfo) == Plugin::Status::OK && PrepareStreams(mediaInfo)) {
504 PROFILE_END("Succeed to extract mediainfo.");
505 NegotiateDownstream();
506 pluginState_ = DemuxerState::DEMUXER_STATE_PARSE_FRAME;
507 state_ = FilterState::READY;
508 OnEvent({name_, EventType::EVENT_READY, {}});
509 } else {
510 task_->PauseAsync();
511 MEDIA_LOG_E("demuxer filter parse meta failed");
512 OnEvent({name_, EventType::EVENT_ERROR, ErrorCode::ERROR_UNKNOWN});
513 }
514 }
515 }
516 } // namespace Pipeline
517 } // namespace Media
518 } // namespace OHOS
519