1 /*
2 * Copyright (C) 2023-2025 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #define HST_LOG_TAG "DataStreamSourcePlugin"
17
18 #ifndef OHOS_LITE
19 #include "data_stream_source_plugin.h"
20 #include "common/log.h"
21 #include "common/media_core.h"
22 #include "osal/utils/util.h"
23
24 namespace {
25 constexpr OHOS::HiviewDFX::HiLogLabel LABEL = { LOG_CORE, LOG_DOMAIN_SYSTEM_PLAYER, "HiStreamer" };
26 }
27
28 namespace OHOS {
29 namespace Media {
30 namespace Plugin {
31 namespace DataStreamSource {
32 namespace {
33 constexpr uint32_t INIT_MEM_CNT = 10;
34 constexpr int32_t MEM_SIZE = 10240;
35 constexpr uint32_t MAX_MEM_CNT = 10 * 1024;
36 constexpr size_t DEFAULT_PREDOWNLOAD_SIZE_BYTE = 10 * 1024 * 1024;
37 constexpr uint32_t DEFAULT_RETRY_TIMES = 20;
38 constexpr uint32_t READ_AGAIN_RETRY_TIME_ONE = 100;
39 constexpr uint32_t READ_AGAIN_RETRY_TIME_TWO = 200;
40 constexpr uint32_t READ_AGAIN_RETRY_TIME_THREE = 500;
41 constexpr uint32_t RETRY_TIMES_ONE = 5; // The number of attempts determines the sleep duration.
42 constexpr uint32_t RETRY_TIMES_TWO = 15;
43 }
DataStreamSourcePluginCreator(const std::string & name)44 std::shared_ptr<Plugins::SourcePlugin> DataStreamSourcePluginCreator(const std::string& name)
45 {
46 return std::make_shared<DataStreamSourcePlugin>(name);
47 }
48
DataStreamSourceRegister(const std::shared_ptr<Plugins::Register> & reg)49 Status DataStreamSourceRegister(const std::shared_ptr<Plugins::Register>& reg)
50 {
51 Plugins::SourcePluginDef definition;
52 definition.name = "DataStreamSource";
53 definition.description = "Data stream source";
54 definition.rank = 100; // 100: max rank
55 Plugins::Capability capability;
56 capability.AppendFixedKey<std::vector<Plugins::ProtocolType>>(
57 Tag::MEDIA_PROTOCOL_TYPE, {Plugins::ProtocolType::STREAM});
58 definition.AddInCaps(capability);
59 definition.SetCreator(DataStreamSourcePluginCreator);
60 return reg->AddPlugin(definition);
61 }
62
__anon9ef717a80302null63 PLUGIN_DEFINITION(DataStreamSource, Plugins::LicenseType::APACHE_V2, DataStreamSourceRegister, [] {});
64
DataStreamSourcePlugin(std::string name)65 DataStreamSourcePlugin::DataStreamSourcePlugin(std::string name)
66 : SourcePlugin(std::move(name))
67 {
68 MEDIA_LOG_D("ctor called");
69 pool_ = std::make_shared<AVSharedMemoryPool>("pool");
70 InitPool();
71 }
72
~DataStreamSourcePlugin()73 DataStreamSourcePlugin::~DataStreamSourcePlugin()
74 {
75 MEDIA_LOG_D("dtor called");
76 ResetPool();
77 }
78
SetSource(std::shared_ptr<Plugins::MediaSource> source)79 Status DataStreamSourcePlugin::SetSource(std::shared_ptr<Plugins::MediaSource> source)
80 {
81 dataSrc_ = source->GetDataSrc();
82 FALSE_RETURN_V(dataSrc_ != nullptr, Status::ERROR_INVALID_PARAMETER);
83 int64_t size = 0;
84 if (dataSrc_->GetSize(size) != 0) {
85 MEDIA_LOG_E("Get size failed");
86 }
87 size_ = size;
88 seekable_ = size_ == -1 ? Plugins::Seekable::UNSEEKABLE : Plugins::Seekable::SEEKABLE;
89 MEDIA_LOG_I("SetSource, size_: " PUBLIC_LOG_D64 ", seekable_: " PUBLIC_LOG_D32, size_, seekable_);
90 return Status::OK;
91 }
92
SetCallback(Plugins::Callback * cb)93 Status DataStreamSourcePlugin::SetCallback(Plugins::Callback* cb)
94 {
95 MEDIA_LOG_D("IN");
96 callback_ = cb;
97 MEDIA_LOG_D("OUT");
98 return Status::OK;
99 }
100
WrapAVSharedMemory(const std::shared_ptr<AVSharedMemory> & avSharedMemory,int32_t realLen)101 std::shared_ptr<Plugins::Buffer> DataStreamSourcePlugin::WrapAVSharedMemory(
102 const std::shared_ptr<AVSharedMemory>& avSharedMemory, int32_t realLen)
103 {
104 std::shared_ptr<Plugins::Buffer> buffer = std::make_shared<Plugins::Buffer>();
105 std::shared_ptr<uint8_t> address = std::shared_ptr<uint8_t>(avSharedMemory->GetBase(),
106 [avSharedMemory](uint8_t* ptr) { ptr = nullptr; });
107 buffer->WrapMemoryPtr(address, avSharedMemory->GetSize(), realLen);
108 return buffer;
109 }
110
InitPool()111 void DataStreamSourcePlugin::InitPool()
112 {
113 AVSharedMemoryPool::InitializeOption InitOption {
114 INIT_MEM_CNT,
115 MEM_SIZE,
116 MAX_MEM_CNT,
117 AVSharedMemory::Flags::FLAGS_READ_WRITE,
118 true,
119 nullptr,
120 };
121 pool_->Init(InitOption);
122 pool_->GetName();
123 pool_->Reset();
124 }
125
GetMemory()126 std::shared_ptr<AVSharedMemory> DataStreamSourcePlugin::GetMemory()
127 {
128 return pool_->AcquireMemory(MEM_SIZE); // 10240
129 }
130
ResetPool()131 void DataStreamSourcePlugin::ResetPool()
132 {
133 pool_->Reset();
134 }
135
WaitForRetry(uint32_t time)136 void DataStreamSourcePlugin::WaitForRetry(uint32_t time)
137 {
138 std::unique_lock<std::mutex> lock(mutex_);
139 readCond_.wait_for(lock, std::chrono::milliseconds(time), [&] {
140 return isInterrupted_.load() || isExitRead_.load();
141 });
142 }
143
Read(std::shared_ptr<Plugins::Buffer> & buffer,uint64_t offset,size_t expectedLen)144 Status DataStreamSourcePlugin::Read(std::shared_ptr<Plugins::Buffer>& buffer, uint64_t offset, size_t expectedLen)
145 {
146 MEDIA_LOG_D("Read, offset: " PUBLIC_LOG_D64 ", expectedLen: " PUBLIC_LOG_ZU ", seekable: " PUBLIC_LOG_D32,
147 offset, expectedLen, seekable_);
148 std::shared_ptr<AVSharedMemory> memory = GetMemory();
149 FALSE_RETURN_V_MSG(memory != nullptr, Status::ERROR_NO_MEMORY, "allocate memory failed!");
150 int32_t realLen = 0;
151 do {
152 if (isInterrupted_ || isExitRead_) {
153 retryTimes_ = 0;
154 isBufferingStart = false;
155 return Status::OK;
156 }
157 if (seekable_ == Plugins::Seekable::SEEKABLE) {
158 FALSE_RETURN_V(static_cast<int64_t>(offset_) <= size_, Status::END_OF_STREAM);
159 expectedLen = std::min(static_cast<size_t>(size_ - offset_), expectedLen);
160 expectedLen = std::min(static_cast<size_t>(memory->GetSize()), expectedLen);
161 realLen = dataSrc_->ReadAt(static_cast<int64_t>(offset_), expectedLen, memory);
162 } else {
163 expectedLen = std::min(static_cast<size_t>(memory->GetSize()), expectedLen);
164 realLen = dataSrc_->ReadAt(expectedLen, memory);
165 }
166 FALSE_RETURN_V_MSG(realLen > MediaDataSourceError::SOURCE_ERROR_IO, Status::ERROR_UNKNOWN,
167 "read data error! realLen:" PUBLIC_LOG_D32, realLen);
168 FALSE_RETURN_V_MSG_W(realLen != MediaDataSourceError::SOURCE_ERROR_EOF, Status::END_OF_STREAM, "eos reached!");
169 if (realLen > 0) {
170 FALSE_LOG_MSG_W(realLen == static_cast<int32_t>(expectedLen), "realLen != expectedLen, realLen:"
171 PUBLIC_LOG_D32 ", expectedLen: " PUBLIC_LOG_ZU, realLen, expectedLen);
172 retryTimes_ = 0;
173 HandleBufferingEnd();
174 break;
175 }
176 if (realLen == 0) {
177 HandleBufferingStart();
178 }
179 WaitForRetry(GetRetryTime());
180 retryTimes_++;
181 } while (retryTimes_ < DEFAULT_RETRY_TIMES);
182 offset_ += static_cast<uint64_t>(realLen);
183 if (buffer && buffer->GetMemory()) {
184 buffer->GetMemory()->Write(memory->GetBase(), realLen, 0);
185 } else {
186 buffer = WrapAVSharedMemory(memory, realLen);
187 }
188 FALSE_RETURN_V(buffer != nullptr, Status::ERROR_AGAIN);
189 MEDIA_LOG_D("DataStreamSourcePlugin Read, size: " PUBLIC_LOG_ZU ", realLen: " PUBLIC_LOG_D32
190 ", retryTimes: " PUBLIC_LOG_U32, (buffer && buffer->GetMemory()) ?
191 buffer->GetMemory()->GetSize() : -100, realLen, retryTimes_); // -100 invalid size
192 FALSE_RETURN_V(realLen != 0, Status::ERROR_AGAIN);
193 return Status::OK;
194 }
195
GetRetryTime()196 uint32_t DataStreamSourcePlugin::GetRetryTime()
197 {
198 MEDIA_LOG_I("read again. retryTimes:" PUBLIC_LOG_U32, retryTimes_);
199 FALSE_RETURN_V(retryTimes_ > RETRY_TIMES_ONE, READ_AGAIN_RETRY_TIME_ONE);
200 FALSE_RETURN_V(retryTimes_ > RETRY_TIMES_TWO, READ_AGAIN_RETRY_TIME_TWO);
201 return READ_AGAIN_RETRY_TIME_THREE;
202 }
203
SetInterruptState(bool isInterruptNeeded)204 void DataStreamSourcePlugin::SetInterruptState(bool isInterruptNeeded)
205 {
206 MEDIA_LOG_I("OnInterrupted %{public}d", isInterruptNeeded);
207 std::unique_lock<std::mutex> lock(mutex_);
208 isInterrupted_ = isInterruptNeeded;
209 isExitRead_ = isInterruptNeeded;
210 readCond_.notify_all();
211 }
212
HandleBufferingStart()213 void DataStreamSourcePlugin::HandleBufferingStart()
214 {
215 if (!isBufferingStart) {
216 isBufferingStart = true;
217 if (callback_ != nullptr) {
218 MEDIA_LOG_I("OnEvent BUFFERING_START.");
219 callback_->OnEvent({Plugins::PluginEventType::BUFFERING_START,
220 {BufferingInfoType::BUFFERING_START}, "pause"});
221 }
222 }
223 }
224
HandleBufferingEnd()225 void DataStreamSourcePlugin::HandleBufferingEnd()
226 {
227 if (isBufferingStart) {
228 isBufferingStart = false;
229 if (callback_ != nullptr) {
230 MEDIA_LOG_I("OnEvent BUFFERING_END.");
231 callback_->OnEvent({Plugins::PluginEventType::BUFFERING_END, {BufferingInfoType::BUFFERING_END}, "end"});
232 }
233 }
234 }
235
GetSize(uint64_t & size)236 Status DataStreamSourcePlugin::GetSize(uint64_t& size)
237 {
238 if (seekable_ == Plugins::Seekable::SEEKABLE) {
239 size = static_cast<uint64_t>(size_);
240 } else {
241 size = std::max(static_cast<size_t>(offset_), DEFAULT_PREDOWNLOAD_SIZE_BYTE);
242 }
243 return Status::OK;
244 }
245
GetSeekable()246 Plugins::Seekable DataStreamSourcePlugin::GetSeekable()
247 {
248 return seekable_;
249 }
250
SeekTo(uint64_t offset)251 Status DataStreamSourcePlugin::SeekTo(uint64_t offset)
252 {
253 if (seekable_ == Plugins::Seekable::UNSEEKABLE) {
254 MEDIA_LOG_E("source is unseekable!");
255 return Status::ERROR_INVALID_OPERATION;
256 }
257 if (offset >= static_cast<uint64_t>(size_)) {
258 MEDIA_LOG_E("Invalid parameter");
259 return Status::ERROR_INVALID_PARAMETER;
260 }
261 offset_ = offset;
262 isExitRead_ = false;
263 MEDIA_LOG_D("seek to offset_ " PUBLIC_LOG_U64 " success", offset_);
264 return Status::OK;
265 }
266
Pause()267 Status DataStreamSourcePlugin::Pause()
268 {
269 MEDIA_LOG_I("Pause enter.");
270 isExitRead_ = true;
271 return Status::OK;
272 }
273
Resume()274 Status DataStreamSourcePlugin::Resume()
275 {
276 MEDIA_LOG_I("Resume enter.");
277 isExitRead_ = false;
278 return Status::OK;
279 }
280
Reset()281 Status DataStreamSourcePlugin::Reset()
282 {
283 MEDIA_LOG_I("Reset enter.");
284 isInterrupted_ = true;
285 return Status::OK;
286 }
287
IsNeedPreDownload()288 bool DataStreamSourcePlugin::IsNeedPreDownload()
289 {
290 return true;
291 }
292 } // namespace DataStreamSourcePlugin
293 } // namespace Plugin
294 } // namespace Media
295 } // namespace OHOS
296 #endif