• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2023-2025 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #define HST_LOG_TAG "DataStreamSourcePlugin"
17 
18 #ifndef OHOS_LITE
19 #include "data_stream_source_plugin.h"
20 #include "common/log.h"
21 #include "common/media_core.h"
22 #include "osal/utils/util.h"
23 
24 namespace {
25 constexpr OHOS::HiviewDFX::HiLogLabel LABEL = { LOG_CORE, LOG_DOMAIN_SYSTEM_PLAYER, "HiStreamer" };
26 }
27 
28 namespace OHOS {
29 namespace Media {
30 namespace Plugin {
31 namespace DataStreamSource {
namespace {
    // Values handed to AVSharedMemoryPool::InitializeOption in InitPool(), in this order:
    // INIT_MEM_CNT, MEM_SIZE, MAX_MEM_CNT. MEM_SIZE is also the size requested by GetMemory().
    constexpr uint32_t INIT_MEM_CNT = 10;
    constexpr int32_t MEM_SIZE = 10 * 1024; // 10240
    constexpr uint32_t MAX_MEM_CNT = 10 * 1024;

    // Lower bound of the size GetSize() reports while the source is unseekable.
    constexpr size_t DEFAULT_PREDOWNLOAD_SIZE_BYTE = 10 * 1024 * 1024;

    // Read() leaves its retry loop once retryTimes_ reaches this value.
    constexpr uint32_t DEFAULT_RETRY_TIMES = 20;

    // Waits between read retries; WaitForRetry() interprets them as milliseconds.
    constexpr uint32_t READ_AGAIN_RETRY_TIME_ONE = 100;
    constexpr uint32_t READ_AGAIN_RETRY_TIME_TWO = 200;
    constexpr uint32_t READ_AGAIN_RETRY_TIME_THREE = 500;

    // The number of attempts determines the sleep duration (see GetRetryTime()):
    // up to RETRY_TIMES_ONE -> TIME_ONE, up to RETRY_TIMES_TWO -> TIME_TWO, above -> TIME_THREE.
    constexpr uint32_t RETRY_TIMES_ONE = 5;
    constexpr uint32_t RETRY_TIMES_TWO = 15;
} // namespace
DataStreamSourcePluginCreator(const std::string & name)44 std::shared_ptr<Plugins::SourcePlugin> DataStreamSourcePluginCreator(const std::string& name)
45 {
46     return std::make_shared<DataStreamSourcePlugin>(name);
47 }
48 
DataStreamSourceRegister(const std::shared_ptr<Plugins::Register> & reg)49 Status DataStreamSourceRegister(const std::shared_ptr<Plugins::Register>& reg)
50 {
51     Plugins::SourcePluginDef definition;
52     definition.name = "DataStreamSource";
53     definition.description = "Data stream source";
54     definition.rank = 100; // 100: max rank
55     Plugins::Capability capability;
56     capability.AppendFixedKey<std::vector<Plugins::ProtocolType>>(
57         Tag::MEDIA_PROTOCOL_TYPE, {Plugins::ProtocolType::STREAM});
58     definition.AddInCaps(capability);
59     definition.SetCreator(DataStreamSourcePluginCreator);
60     return reg->AddPlugin(definition);
61 }
62 
// Declares the DataStreamSource plugin: Apache-2.0 license, DataStreamSourceRegister as the
// register function, and an empty lambda as the last hook (presumably unregister - confirm
// against the PLUGIN_DEFINITION macro).
PLUGIN_DEFINITION(DataStreamSource,
                  Plugins::LicenseType::APACHE_V2,
                  DataStreamSourceRegister,
                  [] {});
64 
DataStreamSourcePlugin(std::string name)65 DataStreamSourcePlugin::DataStreamSourcePlugin(std::string name)
66     : SourcePlugin(std::move(name))
67 {
68     MEDIA_LOG_D("ctor called");
69     pool_ = std::make_shared<AVSharedMemoryPool>("pool");
70     InitPool();
71 }
72 
~DataStreamSourcePlugin()73 DataStreamSourcePlugin::~DataStreamSourcePlugin()
74 {
75     MEDIA_LOG_D("dtor called");
76     ResetPool();
77 }
78 
SetSource(std::shared_ptr<Plugins::MediaSource> source)79 Status DataStreamSourcePlugin::SetSource(std::shared_ptr<Plugins::MediaSource> source)
80 {
81     dataSrc_ = source->GetDataSrc();
82     FALSE_RETURN_V(dataSrc_ != nullptr, Status::ERROR_INVALID_PARAMETER);
83     int64_t size = 0;
84     if (dataSrc_->GetSize(size) != 0) {
85         MEDIA_LOG_E("Get size failed");
86     }
87     size_ = size;
88     seekable_ = size_ == -1 ? Plugins::Seekable::UNSEEKABLE : Plugins::Seekable::SEEKABLE;
89     MEDIA_LOG_I("SetSource, size_: " PUBLIC_LOG_D64 ", seekable_: " PUBLIC_LOG_D32, size_, seekable_);
90     return Status::OK;
91 }
92 
SetCallback(Plugins::Callback * cb)93 Status DataStreamSourcePlugin::SetCallback(Plugins::Callback* cb)
94 {
95     MEDIA_LOG_D("IN");
96     callback_ = cb;
97     MEDIA_LOG_D("OUT");
98     return Status::OK;
99 }
100 
WrapAVSharedMemory(const std::shared_ptr<AVSharedMemory> & avSharedMemory,int32_t realLen)101 std::shared_ptr<Plugins::Buffer> DataStreamSourcePlugin::WrapAVSharedMemory(
102     const std::shared_ptr<AVSharedMemory>& avSharedMemory, int32_t realLen)
103 {
104     std::shared_ptr<Plugins::Buffer> buffer = std::make_shared<Plugins::Buffer>();
105     std::shared_ptr<uint8_t> address = std::shared_ptr<uint8_t>(avSharedMemory->GetBase(),
106                                                                 [avSharedMemory](uint8_t* ptr) { ptr = nullptr; });
107     buffer->WrapMemoryPtr(address, avSharedMemory->GetSize(), realLen);
108     return buffer;
109 }
110 
InitPool()111 void DataStreamSourcePlugin::InitPool()
112 {
113     AVSharedMemoryPool::InitializeOption InitOption {
114         INIT_MEM_CNT,
115         MEM_SIZE,
116         MAX_MEM_CNT,
117         AVSharedMemory::Flags::FLAGS_READ_WRITE,
118         true,
119         nullptr,
120     };
121     pool_->Init(InitOption);
122     pool_->GetName();
123     pool_->Reset();
124 }
125 
GetMemory()126 std::shared_ptr<AVSharedMemory> DataStreamSourcePlugin::GetMemory()
127 {
128     return pool_->AcquireMemory(MEM_SIZE); // 10240
129 }
130 
ResetPool()131 void DataStreamSourcePlugin::ResetPool()
132 {
133     pool_->Reset();
134 }
135 
WaitForRetry(uint32_t time)136 void DataStreamSourcePlugin::WaitForRetry(uint32_t time)
137 {
138     std::unique_lock<std::mutex> lock(mutex_);
139     readCond_.wait_for(lock, std::chrono::milliseconds(time), [&] {
140         return isInterrupted_.load() || isExitRead_.load();
141     });
142 }
143 
Read(std::shared_ptr<Plugins::Buffer> & buffer,uint64_t offset,size_t expectedLen)144 Status DataStreamSourcePlugin::Read(std::shared_ptr<Plugins::Buffer>& buffer, uint64_t offset, size_t expectedLen)
145 {
146     MEDIA_LOG_D("Read, offset: " PUBLIC_LOG_D64 ", expectedLen: " PUBLIC_LOG_ZU ", seekable: " PUBLIC_LOG_D32,
147         offset, expectedLen, seekable_);
148     std::shared_ptr<AVSharedMemory> memory = GetMemory();
149     FALSE_RETURN_V_MSG(memory != nullptr, Status::ERROR_NO_MEMORY, "allocate memory failed!");
150     int32_t realLen = 0;
151     do {
152         if (isInterrupted_ || isExitRead_) {
153             retryTimes_ = 0;
154             isBufferingStart = false;
155             return Status::OK;
156         }
157         if (seekable_ == Plugins::Seekable::SEEKABLE) {
158             FALSE_RETURN_V(static_cast<int64_t>(offset_) <= size_, Status::END_OF_STREAM);
159             expectedLen = std::min(static_cast<size_t>(size_ - offset_), expectedLen);
160             expectedLen = std::min(static_cast<size_t>(memory->GetSize()), expectedLen);
161             realLen = dataSrc_->ReadAt(static_cast<int64_t>(offset_), expectedLen, memory);
162         } else {
163             expectedLen = std::min(static_cast<size_t>(memory->GetSize()), expectedLen);
164             realLen = dataSrc_->ReadAt(expectedLen, memory);
165         }
166         FALSE_RETURN_V_MSG(realLen > MediaDataSourceError::SOURCE_ERROR_IO, Status::ERROR_UNKNOWN,
167             "read data error! realLen:" PUBLIC_LOG_D32, realLen);
168         FALSE_RETURN_V_MSG_W(realLen != MediaDataSourceError::SOURCE_ERROR_EOF, Status::END_OF_STREAM, "eos reached!");
169         if (realLen > 0) {
170             FALSE_LOG_MSG_W(realLen == static_cast<int32_t>(expectedLen), "realLen != expectedLen, realLen:"
171                 PUBLIC_LOG_D32 ", expectedLen: " PUBLIC_LOG_ZU, realLen, expectedLen);
172             retryTimes_ = 0;
173             HandleBufferingEnd();
174             break;
175         }
176         if (realLen == 0) {
177             HandleBufferingStart();
178         }
179         WaitForRetry(GetRetryTime());
180         retryTimes_++;
181     } while (retryTimes_ < DEFAULT_RETRY_TIMES);
182     offset_ += static_cast<uint64_t>(realLen);
183     if (buffer && buffer->GetMemory()) {
184         buffer->GetMemory()->Write(memory->GetBase(), realLen, 0);
185     } else {
186         buffer = WrapAVSharedMemory(memory, realLen);
187     }
188     FALSE_RETURN_V(buffer != nullptr, Status::ERROR_AGAIN);
189     MEDIA_LOG_D("DataStreamSourcePlugin Read, size: " PUBLIC_LOG_ZU ", realLen: " PUBLIC_LOG_D32
190         ", retryTimes: " PUBLIC_LOG_U32, (buffer && buffer->GetMemory()) ?
191         buffer->GetMemory()->GetSize() : -100, realLen, retryTimes_); // -100 invalid size
192     FALSE_RETURN_V(realLen != 0, Status::ERROR_AGAIN);
193     return Status::OK;
194 }
195 
GetRetryTime()196 uint32_t DataStreamSourcePlugin::GetRetryTime()
197 {
198     MEDIA_LOG_I("read again. retryTimes:" PUBLIC_LOG_U32, retryTimes_);
199     FALSE_RETURN_V(retryTimes_ > RETRY_TIMES_ONE, READ_AGAIN_RETRY_TIME_ONE);
200     FALSE_RETURN_V(retryTimes_ > RETRY_TIMES_TWO, READ_AGAIN_RETRY_TIME_TWO);
201     return READ_AGAIN_RETRY_TIME_THREE;
202 }
203 
SetInterruptState(bool isInterruptNeeded)204 void DataStreamSourcePlugin::SetInterruptState(bool isInterruptNeeded)
205 {
206     MEDIA_LOG_I("OnInterrupted %{public}d", isInterruptNeeded);
207     std::unique_lock<std::mutex> lock(mutex_);
208     isInterrupted_ = isInterruptNeeded;
209     isExitRead_ = isInterruptNeeded;
210     readCond_.notify_all();
211 }
212 
HandleBufferingStart()213 void DataStreamSourcePlugin::HandleBufferingStart()
214 {
215     if (!isBufferingStart) {
216         isBufferingStart = true;
217         if (callback_ != nullptr) {
218             MEDIA_LOG_I("OnEvent BUFFERING_START.");
219             callback_->OnEvent({Plugins::PluginEventType::BUFFERING_START,
220                 {BufferingInfoType::BUFFERING_START}, "pause"});
221         }
222     }
223 }
224 
HandleBufferingEnd()225 void DataStreamSourcePlugin::HandleBufferingEnd()
226 {
227     if (isBufferingStart) {
228         isBufferingStart = false;
229         if (callback_ != nullptr) {
230             MEDIA_LOG_I("OnEvent BUFFERING_END.");
231             callback_->OnEvent({Plugins::PluginEventType::BUFFERING_END, {BufferingInfoType::BUFFERING_END}, "end"});
232         }
233     }
234 }
235 
GetSize(uint64_t & size)236 Status DataStreamSourcePlugin::GetSize(uint64_t& size)
237 {
238     if (seekable_ == Plugins::Seekable::SEEKABLE) {
239         size = static_cast<uint64_t>(size_);
240     } else {
241         size = std::max(static_cast<size_t>(offset_), DEFAULT_PREDOWNLOAD_SIZE_BYTE);
242     }
243     return Status::OK;
244 }
245 
GetSeekable()246 Plugins::Seekable DataStreamSourcePlugin::GetSeekable()
247 {
248     return seekable_;
249 }
250 
SeekTo(uint64_t offset)251 Status DataStreamSourcePlugin::SeekTo(uint64_t offset)
252 {
253     if (seekable_ == Plugins::Seekable::UNSEEKABLE) {
254         MEDIA_LOG_E("source is unseekable!");
255         return Status::ERROR_INVALID_OPERATION;
256     }
257     if (offset >= static_cast<uint64_t>(size_)) {
258         MEDIA_LOG_E("Invalid parameter");
259         return Status::ERROR_INVALID_PARAMETER;
260     }
261     offset_ = offset;
262     isExitRead_ = false;
263     MEDIA_LOG_D("seek to offset_ " PUBLIC_LOG_U64 " success", offset_);
264     return Status::OK;
265 }
266 
Pause()267 Status DataStreamSourcePlugin::Pause()
268 {
269     MEDIA_LOG_I("Pause enter.");
270     isExitRead_ = true;
271     return Status::OK;
272 }
273 
Resume()274 Status DataStreamSourcePlugin::Resume()
275 {
276     MEDIA_LOG_I("Resume enter.");
277     isExitRead_ = false;
278     return Status::OK;
279 }
280 
Reset()281 Status DataStreamSourcePlugin::Reset()
282 {
283     MEDIA_LOG_I("Reset enter.");
284     isInterrupted_ = true;
285     return Status::OK;
286 }
287 
IsNeedPreDownload()288 bool DataStreamSourcePlugin::IsNeedPreDownload()
289 {
290     return true;
291 }
} // namespace DataStreamSource
293 } // namespace Plugin
294 } // namespace Media
295 } // namespace OHOS
296 #endif