1 /*
2 * Copyright (c) 2021-2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #define HST_LOG_TAG "StreamSourcePlugin"
17
18 #include "stream_source_plugin.h"
19 #include "plugin/common/plugin_buffer.h"
20 #include "plugin/common/plugin_source_tags.h"
21 #include "plugin/core/plugin_manager.h"
22
23 namespace OHOS {
24 namespace Media {
25 namespace Plugin {
26 namespace StreamSource {
StreamSourcePluginCreator(const std::string & name)27 std::shared_ptr<SourcePlugin> StreamSourcePluginCreator(const std::string& name)
28 {
29 return std::make_shared<StreamSourcePlugin>(name);
30 }
31
StreamSourceRegister(const std::shared_ptr<Register> & reg)32 const Status StreamSourceRegister(const std::shared_ptr<Register>& reg)
33 {
34 SourcePluginDef definition;
35 definition.name = "StreamSource";
36 definition.description = "Stream source";
37 definition.rank = 100; // 100: max rank
38 definition.protocol.emplace_back(ProtocolType::STREAM);
39 definition.creator = StreamSourcePluginCreator;
40 return reg->AddPlugin(definition);
41 }
42
__anon4a9a5d2b0102null43 PLUGIN_DEFINITION(StreamSource, LicenseType::APACHE_V2, StreamSourceRegister, [] {});
44
Alloc(size_t size)45 void* StreamSourceAllocator::Alloc(size_t size)
46 {
47 if (size == 0) {
48 return nullptr;
49 }
50 return static_cast<void*>(new (std::nothrow) uint8_t[size]);
51 }
52
Free(void * ptr)53 void StreamSourceAllocator::Free(void* ptr) // NOLINT: void*
54 {
55 if (ptr != nullptr) {
56 delete[](uint8_t*) ptr;
57 }
58 }
59
StreamSourceCallback(std::shared_ptr<StreamSourcePlugin> dataSource,std::shared_ptr<StreamSource> & stream)60 StreamSourceCallback::StreamSourceCallback(std::shared_ptr<StreamSourcePlugin> dataSource,
61 std::shared_ptr<StreamSource>& stream)
62 : dataSource_(dataSource), streamSource_(stream)
63 {
64 }
65
GetBuffer(size_t index)66 uint8_t* StreamSourceCallback::GetBuffer(size_t index)
67 {
68 auto bufferPtr = dataSource_->FindBuffer(index);
69 return bufferPtr->GetMemory()->GetWritableAddr(bufferPtr->GetMemory()->GetCapacity());
70 }
71
QueueBuffer(size_t index,size_t offset,size_t size,int64_t timestampUs,uint32_t flags)72 void StreamSourceCallback::QueueBuffer(size_t index, size_t offset, size_t size, int64_t timestampUs, uint32_t flags)
73 {
74 auto bufferPtr = dataSource_->FindBuffer(index);
75 dataSource_->EraseBuffer(index);
76 bufferPtr->GetMemory()->UpdateDataSize(size);
77 dataSource_->EnqueBuffer(bufferPtr);
78 }
79
StreamSourcePlugin(std::string name)80 StreamSourcePlugin::StreamSourcePlugin(std::string name)
81 : SourcePlugin(std::move(name)),
82 bufferPool_(0),
83 state_(State::CREATED),
84 isSeekable_(false),
85 waitBuffers_(),
86 bufferQueue_("SourceBuffQue")
87 {
88 MEDIA_LOG_D("ctor called");
89 }
90
~StreamSourcePlugin()91 StreamSourcePlugin::~StreamSourcePlugin()
92 {
93 MEDIA_LOG_D("dtor called");
94 state_ = State::DESTROYED;
95 }
96
Init()97 Status StreamSourcePlugin::Init()
98 {
99 MEDIA_LOG_D("IN");
100 bufferPool_.Init(DEFAULT_FRAME_SIZE);
101 mAllocator_ = std::make_shared<StreamSourceAllocator>();
102 state_ = State::INITIALIZED;
103 return Status::OK;
104 }
105
Deinit()106 Status StreamSourcePlugin::Deinit()
107 {
108 MEDIA_LOG_D("IN");
109 state_ = State::DESTROYED;
110 return Status::OK;
111 }
112
Prepare()113 Status StreamSourcePlugin::Prepare()
114 {
115 MEDIA_LOG_D("IN");
116 state_ = State::PREPARED;
117 return Status::OK;
118 }
119
Reset()120 Status StreamSourcePlugin::Reset()
121 {
122 MEDIA_LOG_D("IN");
123 state_ = State::INITIALIZED;
124 return Status::OK;
125 }
126
Start()127 Status StreamSourcePlugin::Start()
128 {
129 MEDIA_LOG_D("IN");
130 bufferPool_.SetActive(true);
131 taskPtr_->Start();
132 state_ = State::RUNNING;
133 return Status::OK;
134 }
135
Stop()136 Status StreamSourcePlugin::Stop()
137 {
138 MEDIA_LOG_D("IN");
139 bufferQueue_.SetActive(false);
140 taskPtr_->Stop();
141 state_ = State::PREPARED;
142 return Status::OK;
143 }
144
IsParameterSupported(Tag tag)145 bool StreamSourcePlugin::IsParameterSupported(Tag tag)
146 {
147 MEDIA_LOG_D("IN");
148 return true;
149 }
150
GetParameter(Tag tag,ValueType & value)151 Status StreamSourcePlugin::GetParameter(Tag tag, ValueType& value)
152 {
153 MEDIA_LOG_D("IN");
154 return Status::OK;
155 }
156
SetParameter(Tag tag,const ValueType & value)157 Status StreamSourcePlugin::SetParameter(Tag tag, const ValueType& value)
158 {
159 MEDIA_LOG_D("IN");
160 return Status::OK;
161 }
162
GetAllocator()163 std::shared_ptr<Allocator> StreamSourcePlugin::GetAllocator()
164 {
165 MEDIA_LOG_D("IN");
166 return mAllocator_;
167 }
168
SetCallback(Callback * cb)169 Status StreamSourcePlugin::SetCallback(Callback* cb)
170 {
171 MEDIA_LOG_D("IN");
172 return Status::OK;
173 }
174
SetSource(std::shared_ptr<MediaSource> source)175 Status StreamSourcePlugin::SetSource(std::shared_ptr<MediaSource> source)
176 {
177 auto source_ = std::make_shared<OHOS::Media::Source>("");
178 std::shared_ptr<StreamSource> stream_ = source_->GetSourceStream();
179 if (stream_ == nullptr) {
180 MEDIA_LOG_E("Get StreamSource fail");
181 return Status::ERROR_INVALID_PARAMETER;
182 }
183
184 streamCallback_ = std::make_shared<StreamSourceCallback>(shared_from_this(), stream_);
185 stream_->SetStreamCallback(streamCallback_);
186 streamSource_ = stream_;
187 taskPtr_ = std::make_shared<OSAL::Task>("StreamSource");
188 taskPtr_->RegisterHandler(std::bind(&StreamSourcePlugin::NotifyAvilableBufferLoop, this));
189 return Status::OK;
190 }
191
Read(std::shared_ptr<Buffer> & buffer,size_t expectedLen)192 Status StreamSourcePlugin::Read(std::shared_ptr<Buffer>& buffer, size_t expectedLen)
193 {
194 AVBufferPtr bufPtr_ = bufferQueue_.Pop(); // the cached buffer
195 auto availSize = bufPtr_->GetMemory()->GetSize();
196 MEDIA_LOG_D("availSize: %" PUBLIC_LOG "zu, expectedLen: %" PUBLIC_LOG "zu", availSize, expectedLen);
197 if (buffer->IsEmpty()) { // No buffer provided, use the cached buffer.
198 buffer = bufPtr_;
199 return Status::OK;
200 } else { // Buffer provided, copy it.
201 if (buffer->GetMemory()->GetCapacity() < availSize) {
202 MEDIA_LOG_D("buffer->length: %" PUBLIC_LOG "zu is smaller than %" PUBLIC_LOG "zu",
203 buffer->GetMemory()->GetCapacity(), availSize);
204 return Status::ERROR_NO_MEMORY;
205 }
206 buffer->GetMemory()->Write(bufPtr_->GetMemory()->GetReadOnlyData(), availSize);
207 }
208 return Status::OK;
209 }
210
GetSize(size_t & size)211 Status StreamSourcePlugin::GetSize(size_t& size)
212 {
213 MEDIA_LOG_D("IN");
214 size = -1;
215 return Status::ERROR_WRONG_STATE;
216 }
217
IsSeekable()218 bool StreamSourcePlugin::IsSeekable()
219 {
220 MEDIA_LOG_D("IN");
221 return isSeekable_;
222 }
223
SeekTo(uint64_t offset)224 Status StreamSourcePlugin::SeekTo(uint64_t offset)
225 {
226 MEDIA_LOG_D("IN");
227 return Status::ERROR_UNIMPLEMENTED;
228 }
229
AllocateBuffer()230 AVBufferPtr StreamSourcePlugin::AllocateBuffer()
231 {
232 return bufferPool_.AllocateBuffer();
233 }
234
FindBuffer(size_t idx)235 AVBufferPtr StreamSourcePlugin::FindBuffer(size_t idx)
236 {
237 OSAL::ScopedLock lock(mutex_);
238 auto it = waitBuffers_.find(idx);
239 if (it != waitBuffers_.end()) {
240 return it->second;
241 }
242 return nullptr;
243 }
244
EraseBuffer(size_t idx)245 void StreamSourcePlugin::EraseBuffer(size_t idx)
246 {
247 OSAL::ScopedLock lock(mutex_);
248 waitBuffers_.erase(idx);
249 }
250
EnqueBuffer(AVBufferPtr & bufferPtr)251 void StreamSourcePlugin::EnqueBuffer(AVBufferPtr& bufferPtr)
252 {
253 bufferQueue_.Push(bufferPtr);
254 }
255
NotifyAvilableBufferLoop()256 void StreamSourcePlugin::NotifyAvilableBufferLoop()
257 {
258 auto bufferPtr = AllocateBuffer();
259 if (bufferPtr == nullptr) {
260 MEDIA_LOG_E("Alloc buffer fail");
261 return;
262 }
263 size_t idx = GetUniqueIdx();
264 {
265 OSAL::ScopedLock lock(mutex_);
266 waitBuffers_[idx] = bufferPtr;
267 }
268 std::shared_ptr<StreamSource> stream = streamSource_.lock();
269 stream->OnBufferAvailable(idx, 0, bufferPtr->GetMemory()->GetCapacity());
270 }
271 } // namespace StreamSource
272 } // namespace Plugin
273 } // namespace Media
274 } // namespace OHOS
275