1 /*
2 * Copyright (C) 2023-2025 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #define HST_LOG_TAG "DataStreamSourcePlugin"
17
18 #ifndef OHOS_LITE
19 #include "data_stream_source_plugin.h"
20 #include "common/log.h"
21 #include "common/media_core.h"
22 #include "osal/utils/util.h"
23
24 namespace {
25 constexpr OHOS::HiviewDFX::HiLogLabel LABEL = { LOG_CORE, LOG_DOMAIN_SYSTEM_PLAYER, "HiStreamer" };
26 }
27
28 namespace OHOS {
29 namespace Media {
30 namespace Plugin {
31 namespace DataStreamSource {
namespace {
// Shared-memory pool sizing handed to AVSharedMemoryPool in InitPool(); MEM_SIZE is also the
// size requested for every scratch block in GetMemory().
constexpr uint32_t INIT_MEM_CNT = 10;
constexpr int32_t MEM_SIZE = 10 * 1024; // 10240
constexpr uint32_t MAX_MEM_CNT = 10 * 1024;

// Lower bound of the size reported by GetSize() when the source is not seekable.
constexpr size_t DEFAULT_PREDOWNLOAD_SIZE_BYTE = 10 * 1024 * 1024;

// Upper bound of the retry counter checked by the read loop in Read().
constexpr uint32_t DEFAULT_RETRY_TIMES = 20;

// Back-off durations passed to WaitForRetry(), which interprets them as milliseconds.
constexpr uint32_t READ_AGAIN_RETRY_TIME_ONE = 100;
constexpr uint32_t READ_AGAIN_RETRY_TIME_TWO = 200;
constexpr uint32_t READ_AGAIN_RETRY_TIME_THREE = 500;

// Retry-count thresholds used by GetRetryTime(): the number of attempts determines the sleep duration.
constexpr uint32_t RETRY_TIMES_ONE = 5;
constexpr uint32_t RETRY_TIMES_TWO = 15;
} // namespace
DataStreamSourcePluginCreator(const std::string & name)44 std::shared_ptr<Plugins::SourcePlugin> DataStreamSourcePluginCreator(const std::string& name)
45 {
46 return std::make_shared<DataStreamSourcePlugin>(name);
47 }
48
DataStreamSourceRegister(const std::shared_ptr<Plugins::Register> & reg)49 Status DataStreamSourceRegister(const std::shared_ptr<Plugins::Register>& reg)
50 {
51 Plugins::SourcePluginDef definition;
52 definition.name = "DataStreamSource";
53 definition.description = "Data stream source";
54 definition.rank = 100; // 100: max rank
55 Plugins::Capability capability;
56 capability.AppendFixedKey<std::vector<Plugins::ProtocolType>>(
57 Tag::MEDIA_PROTOCOL_TYPE, {Plugins::ProtocolType::STREAM});
58 definition.AddInCaps(capability);
59 definition.SetCreator(DataStreamSourcePluginCreator);
60 return reg->AddPlugin(definition);
61 }
62
__anona3c22cca0302null63 PLUGIN_DEFINITION(DataStreamSource, Plugins::LicenseType::APACHE_V2, DataStreamSourceRegister, [] {});
64
DataStreamSourcePlugin(std::string name)65 DataStreamSourcePlugin::DataStreamSourcePlugin(std::string name)
66 : SourcePlugin(std::move(name))
67 {
68 MEDIA_LOG_D("ctor called");
69 pool_ = std::make_shared<AVSharedMemoryPool>("pool");
70 InitPool();
71 }
72
~DataStreamSourcePlugin()73 DataStreamSourcePlugin::~DataStreamSourcePlugin()
74 {
75 MEDIA_LOG_D("dtor called");
76 ResetPool();
77 }
78
SetSource(std::shared_ptr<Plugins::MediaSource> source)79 Status DataStreamSourcePlugin::SetSource(std::shared_ptr<Plugins::MediaSource> source)
80 {
81 dataSrc_ = source->GetDataSrc();
82 FALSE_RETURN_V(dataSrc_ != nullptr, Status::ERROR_INVALID_PARAMETER);
83 int64_t size = 0;
84 if (dataSrc_->GetSize(size) != 0) {
85 MEDIA_LOG_E("Get size failed");
86 }
87 size_ = size;
88 seekable_ = size_ == -1 ? Plugins::Seekable::UNSEEKABLE : Plugins::Seekable::SEEKABLE;
89 MEDIA_LOG_I("SetSource, size_: " PUBLIC_LOG_D64 ", seekable_: " PUBLIC_LOG_D32, size_, seekable_);
90 return Status::OK;
91 }
92
SetCallback(Plugins::Callback * cb)93 Status DataStreamSourcePlugin::SetCallback(Plugins::Callback* cb)
94 {
95 MEDIA_LOG_D("IN");
96 callback_ = cb;
97 MEDIA_LOG_D("OUT");
98 return Status::OK;
99 }
100
WrapAVSharedMemory(const std::shared_ptr<AVSharedMemory> & avSharedMemory,int32_t realLen)101 std::shared_ptr<Plugins::Buffer> DataStreamSourcePlugin::WrapAVSharedMemory(
102 const std::shared_ptr<AVSharedMemory>& avSharedMemory, int32_t realLen)
103 {
104 std::shared_ptr<Plugins::Buffer> buffer = std::make_shared<Plugins::Buffer>();
105 std::shared_ptr<uint8_t> address = std::shared_ptr<uint8_t>(avSharedMemory->GetBase(),
106 [avSharedMemory](uint8_t* ptr) { ptr = nullptr; });
107 buffer->WrapMemoryPtr(address, avSharedMemory->GetSize(), realLen);
108 return buffer;
109 }
110
InitPool()111 void DataStreamSourcePlugin::InitPool()
112 {
113 AVSharedMemoryPool::InitializeOption InitOption {
114 INIT_MEM_CNT,
115 MEM_SIZE,
116 MAX_MEM_CNT,
117 AVSharedMemory::Flags::FLAGS_READ_WRITE,
118 true,
119 nullptr,
120 };
121 pool_->Init(InitOption);
122 pool_->GetName();
123 pool_->Reset();
124 }
125
GetMemory()126 std::shared_ptr<AVSharedMemory> DataStreamSourcePlugin::GetMemory()
127 {
128 return pool_->AcquireMemory(MEM_SIZE); // 10240
129 }
130
ResetPool()131 void DataStreamSourcePlugin::ResetPool()
132 {
133 pool_->Reset();
134 }
135
WaitForRetry(uint32_t time)136 void DataStreamSourcePlugin::WaitForRetry(uint32_t time)
137 {
138 std::unique_lock<std::mutex> lock(mutex_);
139 readCond_.wait_for(lock, std::chrono::milliseconds(time), [&] {
140 return isInterrupted_.load() || isExitRead_.load();
141 });
142 }
143
Read(std::shared_ptr<Plugins::Buffer> & buffer,uint64_t offset,size_t expectedLen)144 Status DataStreamSourcePlugin::Read(std::shared_ptr<Plugins::Buffer>& buffer, uint64_t offset, size_t expectedLen)
145 {
146 MEDIA_LOG_D("Read, offset: " PUBLIC_LOG_D64 ", expectedLen: " PUBLIC_LOG_ZU ", seekable: " PUBLIC_LOG_D32,
147 offset, expectedLen, seekable_);
148 std::shared_ptr<AVSharedMemory> memory = GetMemory();
149 FALSE_RETURN_V_MSG(memory != nullptr, Status::ERROR_NO_MEMORY, "allocate memory failed!");
150 int32_t realLen = 0;
151 do {
152 if (isInterrupted_ || isExitRead_) {
153 retryTimes_ = 0;
154 isBufferingStart = false;
155 return Status::OK;
156 }
157 FALSE_RETURN_V_MSG(dataSrc_ != nullptr, Status::ERROR_WRONG_STATE, "dataSrc_ is nullptr!");
158 auto ret = ReadAt(memory, expectedLen, realLen);
159 FALSE_RETURN_V(ret == Status::OK, ret);
160 FALSE_RETURN_V_MSG(realLen > MediaDataSourceError::SOURCE_ERROR_IO, Status::ERROR_UNKNOWN,
161 "read data error! realLen:" PUBLIC_LOG_D32, realLen);
162 FALSE_RETURN_V_MSG_W(realLen != MediaDataSourceError::SOURCE_ERROR_EOF, Status::END_OF_STREAM, "eos reached!");
163 if (realLen > 0) {
164 FALSE_LOG_MSG_W(realLen == static_cast<int32_t>(expectedLen), "realLen != expectedLen, realLen:"
165 PUBLIC_LOG_D32 ", expectedLen: " PUBLIC_LOG_ZU, realLen, expectedLen);
166 retryTimes_ = 0;
167 HandleBufferingEnd();
168 break;
169 }
170 if (realLen == 0) {
171 HandleBufferingStart();
172 }
173 WaitForRetry(GetRetryTime());
174 retryTimes_++;
175 } while (retryTimes_ < DEFAULT_RETRY_TIMES);
176 offset_ += static_cast<uint64_t>(realLen);
177 if (buffer && buffer->GetMemory()) {
178 buffer->GetMemory()->Write(memory->GetBase(), realLen, 0);
179 } else {
180 buffer = WrapAVSharedMemory(memory, realLen);
181 }
182 FALSE_RETURN_V(buffer != nullptr, Status::ERROR_AGAIN);
183 MEDIA_LOG_D("DataStreamSourcePlugin Read, size: " PUBLIC_LOG_ZU ", realLen: " PUBLIC_LOG_D32
184 ", retryTimes: " PUBLIC_LOG_U32, (buffer && buffer->GetMemory()) ?
185 buffer->GetMemory()->GetSize() : -100, realLen, retryTimes_); // -100 invalid size
186 FALSE_RETURN_V(realLen != 0, Status::ERROR_AGAIN);
187 return Status::OK;
188 }
189
ReadAt(std::shared_ptr<AVSharedMemory> memory,size_t & expectedLen,int32_t & realLen)190 Status DataStreamSourcePlugin::ReadAt(std::shared_ptr<AVSharedMemory> memory, size_t &expectedLen, int32_t &realLen)
191 {
192 if (seekable_ == Plugins::Seekable::SEEKABLE) {
193 FALSE_RETURN_V(static_cast<int64_t>(offset_) <= size_, Status::END_OF_STREAM);
194 expectedLen = std::min(static_cast<size_t>(size_ - offset_), expectedLen);
195 expectedLen = std::min(static_cast<size_t>(memory->GetSize()), expectedLen);
196 realLen = dataSrc_->ReadAt(static_cast<int64_t>(offset_), expectedLen, memory);
197 } else {
198 expectedLen = std::min(static_cast<size_t>(memory->GetSize()), expectedLen);
199 realLen = dataSrc_->ReadAt(expectedLen, memory);
200 }
201 return Status::OK;
202 }
203
GetRetryTime()204 uint32_t DataStreamSourcePlugin::GetRetryTime()
205 {
206 MEDIA_LOG_I("read again. retryTimes:" PUBLIC_LOG_U32, retryTimes_);
207 FALSE_RETURN_V(retryTimes_ > RETRY_TIMES_ONE, READ_AGAIN_RETRY_TIME_ONE);
208 FALSE_RETURN_V(retryTimes_ > RETRY_TIMES_TWO, READ_AGAIN_RETRY_TIME_TWO);
209 return READ_AGAIN_RETRY_TIME_THREE;
210 }
211
SetInterruptState(bool isInterruptNeeded)212 void DataStreamSourcePlugin::SetInterruptState(bool isInterruptNeeded)
213 {
214 MEDIA_LOG_I("OnInterrupted %{public}d", isInterruptNeeded);
215 std::unique_lock<std::mutex> lock(mutex_);
216 isInterrupted_ = isInterruptNeeded;
217 isExitRead_ = isInterruptNeeded;
218 readCond_.notify_all();
219 }
220
HandleBufferingStart()221 void DataStreamSourcePlugin::HandleBufferingStart()
222 {
223 if (!isBufferingStart) {
224 isBufferingStart = true;
225 if (callback_ != nullptr) {
226 MEDIA_LOG_I("OnEvent BUFFERING_START.");
227 callback_->OnEvent({Plugins::PluginEventType::BUFFERING_START,
228 {BufferingInfoType::BUFFERING_START}, "pause"});
229 }
230 }
231 }
232
HandleBufferingEnd()233 void DataStreamSourcePlugin::HandleBufferingEnd()
234 {
235 if (isBufferingStart) {
236 isBufferingStart = false;
237 if (callback_ != nullptr) {
238 MEDIA_LOG_I("OnEvent BUFFERING_END.");
239 callback_->OnEvent({Plugins::PluginEventType::BUFFERING_END, {BufferingInfoType::BUFFERING_END}, "end"});
240 }
241 }
242 }
243
GetSize(uint64_t & size)244 Status DataStreamSourcePlugin::GetSize(uint64_t& size)
245 {
246 if (seekable_ == Plugins::Seekable::SEEKABLE) {
247 size = static_cast<uint64_t>(size_);
248 } else {
249 size = std::max(static_cast<size_t>(offset_), DEFAULT_PREDOWNLOAD_SIZE_BYTE);
250 }
251 return Status::OK;
252 }
253
GetSeekable()254 Plugins::Seekable DataStreamSourcePlugin::GetSeekable()
255 {
256 return seekable_;
257 }
258
SeekTo(uint64_t offset)259 Status DataStreamSourcePlugin::SeekTo(uint64_t offset)
260 {
261 if (seekable_ == Plugins::Seekable::UNSEEKABLE) {
262 MEDIA_LOG_E("source is unseekable!");
263 return Status::ERROR_INVALID_OPERATION;
264 }
265 if (offset >= static_cast<uint64_t>(size_)) {
266 MEDIA_LOG_E("Invalid parameter");
267 return Status::ERROR_INVALID_PARAMETER;
268 }
269 offset_ = offset;
270 isExitRead_ = false;
271 MEDIA_LOG_D("seek to offset_ " PUBLIC_LOG_U64 " success", offset_);
272 return Status::OK;
273 }
274
Pause()275 Status DataStreamSourcePlugin::Pause()
276 {
277 MEDIA_LOG_I("Pause enter.");
278 isExitRead_ = true;
279 return Status::OK;
280 }
281
Resume()282 Status DataStreamSourcePlugin::Resume()
283 {
284 MEDIA_LOG_I("Resume enter.");
285 isExitRead_ = false;
286 return Status::OK;
287 }
288
Reset()289 Status DataStreamSourcePlugin::Reset()
290 {
291 MEDIA_LOG_I("Reset enter.");
292 isInterrupted_ = true;
293 return Status::OK;
294 }
295
IsNeedPreDownload()296 bool DataStreamSourcePlugin::IsNeedPreDownload()
297 {
298 return true;
299 }
} // namespace DataStreamSource
301 } // namespace Plugin
302 } // namespace Media
303 } // namespace OHOS
304 #endif