• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2023-2025 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #define HST_LOG_TAG "DataStreamSourcePlugin"
17 
18 #ifndef OHOS_LITE
19 #include "data_stream_source_plugin.h"
20 #include "common/log.h"
21 #include "common/media_core.h"
22 #include "osal/utils/util.h"
23 
24 namespace {
25 constexpr OHOS::HiviewDFX::HiLogLabel LABEL = { LOG_CORE, LOG_DOMAIN_SYSTEM_PLAYER, "HiStreamer" };
26 }
27 
28 namespace OHOS {
29 namespace Media {
30 namespace Plugin {
31 namespace DataStreamSource {
32 namespace {
33     constexpr uint32_t INIT_MEM_CNT = 10;
34     constexpr int32_t MEM_SIZE = 10240;
35     constexpr uint32_t MAX_MEM_CNT = 10 * 1024;
36     constexpr size_t DEFAULT_PREDOWNLOAD_SIZE_BYTE = 10 * 1024 * 1024;
37     constexpr uint32_t DEFAULT_RETRY_TIMES = 20;
38     constexpr uint32_t READ_AGAIN_RETRY_TIME_ONE = 100;
39     constexpr uint32_t READ_AGAIN_RETRY_TIME_TWO = 200;
40     constexpr uint32_t READ_AGAIN_RETRY_TIME_THREE = 500;
41     constexpr uint32_t RETRY_TIMES_ONE = 5; // The number of attempts determines the sleep duration.
42     constexpr uint32_t RETRY_TIMES_TWO = 15;
43 }
DataStreamSourcePluginCreator(const std::string & name)44 std::shared_ptr<Plugins::SourcePlugin> DataStreamSourcePluginCreator(const std::string& name)
45 {
46     return std::make_shared<DataStreamSourcePlugin>(name);
47 }
48 
DataStreamSourceRegister(const std::shared_ptr<Plugins::Register> & reg)49 Status DataStreamSourceRegister(const std::shared_ptr<Plugins::Register>& reg)
50 {
51     Plugins::SourcePluginDef definition;
52     definition.name = "DataStreamSource";
53     definition.description = "Data stream source";
54     definition.rank = 100; // 100: max rank
55     Plugins::Capability capability;
56     capability.AppendFixedKey<std::vector<Plugins::ProtocolType>>(
57         Tag::MEDIA_PROTOCOL_TYPE, {Plugins::ProtocolType::STREAM});
58     definition.AddInCaps(capability);
59     definition.SetCreator(DataStreamSourcePluginCreator);
60     return reg->AddPlugin(definition);
61 }
62 
__anona3c22cca0302null63 PLUGIN_DEFINITION(DataStreamSource, Plugins::LicenseType::APACHE_V2, DataStreamSourceRegister, [] {});
64 
DataStreamSourcePlugin(std::string name)65 DataStreamSourcePlugin::DataStreamSourcePlugin(std::string name)
66     : SourcePlugin(std::move(name))
67 {
68     MEDIA_LOG_D("ctor called");
69     pool_ = std::make_shared<AVSharedMemoryPool>("pool");
70     InitPool();
71 }
72 
~DataStreamSourcePlugin()73 DataStreamSourcePlugin::~DataStreamSourcePlugin()
74 {
75     MEDIA_LOG_D("dtor called");
76     ResetPool();
77 }
78 
SetSource(std::shared_ptr<Plugins::MediaSource> source)79 Status DataStreamSourcePlugin::SetSource(std::shared_ptr<Plugins::MediaSource> source)
80 {
81     dataSrc_ = source->GetDataSrc();
82     FALSE_RETURN_V(dataSrc_ != nullptr, Status::ERROR_INVALID_PARAMETER);
83     int64_t size = 0;
84     if (dataSrc_->GetSize(size) != 0) {
85         MEDIA_LOG_E("Get size failed");
86     }
87     size_ = size;
88     seekable_ = size_ == -1 ? Plugins::Seekable::UNSEEKABLE : Plugins::Seekable::SEEKABLE;
89     MEDIA_LOG_I("SetSource, size_: " PUBLIC_LOG_D64 ", seekable_: " PUBLIC_LOG_D32, size_, seekable_);
90     return Status::OK;
91 }
92 
SetCallback(Plugins::Callback * cb)93 Status DataStreamSourcePlugin::SetCallback(Plugins::Callback* cb)
94 {
95     MEDIA_LOG_D("IN");
96     callback_ = cb;
97     MEDIA_LOG_D("OUT");
98     return Status::OK;
99 }
100 
WrapAVSharedMemory(const std::shared_ptr<AVSharedMemory> & avSharedMemory,int32_t realLen)101 std::shared_ptr<Plugins::Buffer> DataStreamSourcePlugin::WrapAVSharedMemory(
102     const std::shared_ptr<AVSharedMemory>& avSharedMemory, int32_t realLen)
103 {
104     std::shared_ptr<Plugins::Buffer> buffer = std::make_shared<Plugins::Buffer>();
105     std::shared_ptr<uint8_t> address = std::shared_ptr<uint8_t>(avSharedMemory->GetBase(),
106                                                                 [avSharedMemory](uint8_t* ptr) { ptr = nullptr; });
107     buffer->WrapMemoryPtr(address, avSharedMemory->GetSize(), realLen);
108     return buffer;
109 }
110 
InitPool()111 void DataStreamSourcePlugin::InitPool()
112 {
113     AVSharedMemoryPool::InitializeOption InitOption {
114         INIT_MEM_CNT,
115         MEM_SIZE,
116         MAX_MEM_CNT,
117         AVSharedMemory::Flags::FLAGS_READ_WRITE,
118         true,
119         nullptr,
120     };
121     pool_->Init(InitOption);
122     pool_->GetName();
123     pool_->Reset();
124 }
125 
GetMemory()126 std::shared_ptr<AVSharedMemory> DataStreamSourcePlugin::GetMemory()
127 {
128     return pool_->AcquireMemory(MEM_SIZE); // 10240
129 }
130 
ResetPool()131 void DataStreamSourcePlugin::ResetPool()
132 {
133     pool_->Reset();
134 }
135 
WaitForRetry(uint32_t time)136 void DataStreamSourcePlugin::WaitForRetry(uint32_t time)
137 {
138     std::unique_lock<std::mutex> lock(mutex_);
139     readCond_.wait_for(lock, std::chrono::milliseconds(time), [&] {
140         return isInterrupted_.load() || isExitRead_.load();
141     });
142 }
143 
Read(std::shared_ptr<Plugins::Buffer> & buffer,uint64_t offset,size_t expectedLen)144 Status DataStreamSourcePlugin::Read(std::shared_ptr<Plugins::Buffer>& buffer, uint64_t offset, size_t expectedLen)
145 {
146     MEDIA_LOG_D("Read, offset: " PUBLIC_LOG_D64 ", expectedLen: " PUBLIC_LOG_ZU ", seekable: " PUBLIC_LOG_D32,
147         offset, expectedLen, seekable_);
148     std::shared_ptr<AVSharedMemory> memory = GetMemory();
149     FALSE_RETURN_V_MSG(memory != nullptr, Status::ERROR_NO_MEMORY, "allocate memory failed!");
150     int32_t realLen = 0;
151     do {
152         if (isInterrupted_ || isExitRead_) {
153             retryTimes_ = 0;
154             isBufferingStart = false;
155             return Status::OK;
156         }
157         FALSE_RETURN_V_MSG(dataSrc_ != nullptr, Status::ERROR_WRONG_STATE, "dataSrc_ is nullptr!");
158         auto ret = ReadAt(memory, expectedLen, realLen);
159         FALSE_RETURN_V(ret == Status::OK, ret);
160         FALSE_RETURN_V_MSG(realLen > MediaDataSourceError::SOURCE_ERROR_IO, Status::ERROR_UNKNOWN,
161             "read data error! realLen:" PUBLIC_LOG_D32, realLen);
162         FALSE_RETURN_V_MSG_W(realLen != MediaDataSourceError::SOURCE_ERROR_EOF, Status::END_OF_STREAM, "eos reached!");
163         if (realLen > 0) {
164             FALSE_LOG_MSG_W(realLen == static_cast<int32_t>(expectedLen), "realLen != expectedLen, realLen:"
165                 PUBLIC_LOG_D32 ", expectedLen: " PUBLIC_LOG_ZU, realLen, expectedLen);
166             retryTimes_ = 0;
167             HandleBufferingEnd();
168             break;
169         }
170         if (realLen == 0) {
171             HandleBufferingStart();
172         }
173         WaitForRetry(GetRetryTime());
174         retryTimes_++;
175     } while (retryTimes_ < DEFAULT_RETRY_TIMES);
176     offset_ += static_cast<uint64_t>(realLen);
177     if (buffer && buffer->GetMemory()) {
178         buffer->GetMemory()->Write(memory->GetBase(), realLen, 0);
179     } else {
180         buffer = WrapAVSharedMemory(memory, realLen);
181     }
182     FALSE_RETURN_V(buffer != nullptr, Status::ERROR_AGAIN);
183     MEDIA_LOG_D("DataStreamSourcePlugin Read, size: " PUBLIC_LOG_ZU ", realLen: " PUBLIC_LOG_D32
184         ", retryTimes: " PUBLIC_LOG_U32, (buffer && buffer->GetMemory()) ?
185         buffer->GetMemory()->GetSize() : -100, realLen, retryTimes_); // -100 invalid size
186     FALSE_RETURN_V(realLen != 0, Status::ERROR_AGAIN);
187     return Status::OK;
188 }
189 
ReadAt(std::shared_ptr<AVSharedMemory> memory,size_t & expectedLen,int32_t & realLen)190 Status DataStreamSourcePlugin::ReadAt(std::shared_ptr<AVSharedMemory> memory, size_t &expectedLen, int32_t &realLen)
191 {
192     if (seekable_ == Plugins::Seekable::SEEKABLE) {
193         FALSE_RETURN_V(static_cast<int64_t>(offset_) <= size_, Status::END_OF_STREAM);
194         expectedLen = std::min(static_cast<size_t>(size_ - offset_), expectedLen);
195         expectedLen = std::min(static_cast<size_t>(memory->GetSize()), expectedLen);
196         realLen = dataSrc_->ReadAt(static_cast<int64_t>(offset_), expectedLen, memory);
197     } else {
198         expectedLen = std::min(static_cast<size_t>(memory->GetSize()), expectedLen);
199         realLen = dataSrc_->ReadAt(expectedLen, memory);
200     }
201     return Status::OK;
202 }
203 
GetRetryTime()204 uint32_t DataStreamSourcePlugin::GetRetryTime()
205 {
206     MEDIA_LOG_I("read again. retryTimes:" PUBLIC_LOG_U32, retryTimes_);
207     FALSE_RETURN_V(retryTimes_ > RETRY_TIMES_ONE, READ_AGAIN_RETRY_TIME_ONE);
208     FALSE_RETURN_V(retryTimes_ > RETRY_TIMES_TWO, READ_AGAIN_RETRY_TIME_TWO);
209     return READ_AGAIN_RETRY_TIME_THREE;
210 }
211 
SetInterruptState(bool isInterruptNeeded)212 void DataStreamSourcePlugin::SetInterruptState(bool isInterruptNeeded)
213 {
214     MEDIA_LOG_I("OnInterrupted %{public}d", isInterruptNeeded);
215     std::unique_lock<std::mutex> lock(mutex_);
216     isInterrupted_ = isInterruptNeeded;
217     isExitRead_ = isInterruptNeeded;
218     readCond_.notify_all();
219 }
220 
HandleBufferingStart()221 void DataStreamSourcePlugin::HandleBufferingStart()
222 {
223     if (!isBufferingStart) {
224         isBufferingStart = true;
225         if (callback_ != nullptr) {
226             MEDIA_LOG_I("OnEvent BUFFERING_START.");
227             callback_->OnEvent({Plugins::PluginEventType::BUFFERING_START,
228                 {BufferingInfoType::BUFFERING_START}, "pause"});
229         }
230     }
231 }
232 
HandleBufferingEnd()233 void DataStreamSourcePlugin::HandleBufferingEnd()
234 {
235     if (isBufferingStart) {
236         isBufferingStart = false;
237         if (callback_ != nullptr) {
238             MEDIA_LOG_I("OnEvent BUFFERING_END.");
239             callback_->OnEvent({Plugins::PluginEventType::BUFFERING_END, {BufferingInfoType::BUFFERING_END}, "end"});
240         }
241     }
242 }
243 
GetSize(uint64_t & size)244 Status DataStreamSourcePlugin::GetSize(uint64_t& size)
245 {
246     if (seekable_ == Plugins::Seekable::SEEKABLE) {
247         size = static_cast<uint64_t>(size_);
248     } else {
249         size = std::max(static_cast<size_t>(offset_), DEFAULT_PREDOWNLOAD_SIZE_BYTE);
250     }
251     return Status::OK;
252 }
253 
GetSeekable()254 Plugins::Seekable DataStreamSourcePlugin::GetSeekable()
255 {
256     return seekable_;
257 }
258 
SeekTo(uint64_t offset)259 Status DataStreamSourcePlugin::SeekTo(uint64_t offset)
260 {
261     if (seekable_ == Plugins::Seekable::UNSEEKABLE) {
262         MEDIA_LOG_E("source is unseekable!");
263         return Status::ERROR_INVALID_OPERATION;
264     }
265     if (offset >= static_cast<uint64_t>(size_)) {
266         MEDIA_LOG_E("Invalid parameter");
267         return Status::ERROR_INVALID_PARAMETER;
268     }
269     offset_ = offset;
270     isExitRead_ = false;
271     MEDIA_LOG_D("seek to offset_ " PUBLIC_LOG_U64 " success", offset_);
272     return Status::OK;
273 }
274 
Pause()275 Status DataStreamSourcePlugin::Pause()
276 {
277     MEDIA_LOG_I("Pause enter.");
278     isExitRead_ = true;
279     return Status::OK;
280 }
281 
Resume()282 Status DataStreamSourcePlugin::Resume()
283 {
284     MEDIA_LOG_I("Resume enter.");
285     isExitRead_ = false;
286     return Status::OK;
287 }
288 
Reset()289 Status DataStreamSourcePlugin::Reset()
290 {
291     MEDIA_LOG_I("Reset enter.");
292     isInterrupted_ = true;
293     return Status::OK;
294 }
295 
IsNeedPreDownload()296 bool DataStreamSourcePlugin::IsNeedPreDownload()
297 {
298     return true;
299 }
300 } // namespace DataStreamSourcePlugin
301 } // namespace Plugin
302 } // namespace Media
303 } // namespace OHOS
304 #endif