• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021-2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #define HST_LOG_TAG "StreamSourcePlugin"
17 
18 #include "stream_source_plugin.h"
19 #include "plugin/common/plugin_buffer.h"
20 #include "plugin/common/plugin_source_tags.h"
21 #include "plugin/core/plugin_manager.h"
22 
23 namespace OHOS {
24 namespace Media {
25 namespace Plugin {
26 namespace StreamSource {
StreamSourcePluginCreator(const std::string & name)27 std::shared_ptr<SourcePlugin> StreamSourcePluginCreator(const std::string& name)
28 {
29     return std::make_shared<StreamSourcePlugin>(name);
30 }
31 
StreamSourceRegister(const std::shared_ptr<Register> & reg)32 const Status StreamSourceRegister(const std::shared_ptr<Register>& reg)
33 {
34     SourcePluginDef definition;
35     definition.name = "StreamSource";
36     definition.description = "Stream source";
37     definition.rank = 100; // 100: max rank
38     definition.protocol.emplace_back(ProtocolType::STREAM);
39     definition.creator = StreamSourcePluginCreator;
40     return reg->AddPlugin(definition);
41 }
42 
__anon4a9a5d2b0102null43 PLUGIN_DEFINITION(StreamSource, LicenseType::APACHE_V2, StreamSourceRegister, [] {});
44 
Alloc(size_t size)45 void* StreamSourceAllocator::Alloc(size_t size)
46 {
47     if (size == 0) {
48         return nullptr;
49     }
50     return static_cast<void*>(new (std::nothrow) uint8_t[size]);
51 }
52 
Free(void * ptr)53 void StreamSourceAllocator::Free(void* ptr) // NOLINT: void*
54 {
55     if (ptr != nullptr) {
56         delete[](uint8_t*) ptr;
57     }
58 }
59 
StreamSourceCallback(std::shared_ptr<StreamSourcePlugin> dataSource,std::shared_ptr<StreamSource> & stream)60 StreamSourceCallback::StreamSourceCallback(std::shared_ptr<StreamSourcePlugin> dataSource,
61                                            std::shared_ptr<StreamSource>& stream)
62     : dataSource_(dataSource), streamSource_(stream)
63 {
64 }
65 
GetBuffer(size_t index)66 uint8_t* StreamSourceCallback::GetBuffer(size_t index)
67 {
68     auto bufferPtr = dataSource_->FindBuffer(index);
69     return bufferPtr->GetMemory()->GetWritableAddr(bufferPtr->GetMemory()->GetCapacity());
70 }
71 
QueueBuffer(size_t index,size_t offset,size_t size,int64_t timestampUs,uint32_t flags)72 void StreamSourceCallback::QueueBuffer(size_t index, size_t offset, size_t size, int64_t timestampUs, uint32_t flags)
73 {
74     auto bufferPtr = dataSource_->FindBuffer(index);
75     dataSource_->EraseBuffer(index);
76     bufferPtr->GetMemory()->UpdateDataSize(size);
77     dataSource_->EnqueBuffer(bufferPtr);
78 }
79 
StreamSourcePlugin(std::string name)80 StreamSourcePlugin::StreamSourcePlugin(std::string name)
81     : SourcePlugin(std::move(name)),
82       bufferPool_(0),
83       state_(State::CREATED),
84       isSeekable_(false),
85       waitBuffers_(),
86       bufferQueue_("SourceBuffQue")
87 {
88     MEDIA_LOG_D("ctor called");
89 }
90 
~StreamSourcePlugin()91 StreamSourcePlugin::~StreamSourcePlugin()
92 {
93     MEDIA_LOG_D("dtor called");
94     state_ = State::DESTROYED;
95 }
96 
Init()97 Status StreamSourcePlugin::Init()
98 {
99     MEDIA_LOG_D("IN");
100     bufferPool_.Init(DEFAULT_FRAME_SIZE);
101     mAllocator_ = std::make_shared<StreamSourceAllocator>();
102     state_ = State::INITIALIZED;
103     return Status::OK;
104 }
105 
Deinit()106 Status StreamSourcePlugin::Deinit()
107 {
108     MEDIA_LOG_D("IN");
109     state_ = State::DESTROYED;
110     return Status::OK;
111 }
112 
Prepare()113 Status StreamSourcePlugin::Prepare()
114 {
115     MEDIA_LOG_D("IN");
116     state_ = State::PREPARED;
117     return Status::OK;
118 }
119 
Reset()120 Status StreamSourcePlugin::Reset()
121 {
122     MEDIA_LOG_D("IN");
123     state_ = State::INITIALIZED;
124     return Status::OK;
125 }
126 
Start()127 Status StreamSourcePlugin::Start()
128 {
129     MEDIA_LOG_D("IN");
130     bufferPool_.SetActive(true);
131     taskPtr_->Start();
132     state_ = State::RUNNING;
133     return Status::OK;
134 }
135 
Stop()136 Status StreamSourcePlugin::Stop()
137 {
138     MEDIA_LOG_D("IN");
139     bufferQueue_.SetActive(false);
140     taskPtr_->Stop();
141     state_ = State::PREPARED;
142     return Status::OK;
143 }
144 
IsParameterSupported(Tag tag)145 bool StreamSourcePlugin::IsParameterSupported(Tag tag)
146 {
147     MEDIA_LOG_D("IN");
148     return true;
149 }
150 
GetParameter(Tag tag,ValueType & value)151 Status StreamSourcePlugin::GetParameter(Tag tag, ValueType& value)
152 {
153     MEDIA_LOG_D("IN");
154     return Status::OK;
155 }
156 
SetParameter(Tag tag,const ValueType & value)157 Status StreamSourcePlugin::SetParameter(Tag tag, const ValueType& value)
158 {
159     MEDIA_LOG_D("IN");
160     return Status::OK;
161 }
162 
GetAllocator()163 std::shared_ptr<Allocator> StreamSourcePlugin::GetAllocator()
164 {
165     MEDIA_LOG_D("IN");
166     return mAllocator_;
167 }
168 
SetCallback(Callback * cb)169 Status StreamSourcePlugin::SetCallback(Callback* cb)
170 {
171     MEDIA_LOG_D("IN");
172     return Status::OK;
173 }
174 
SetSource(std::shared_ptr<MediaSource> source)175 Status StreamSourcePlugin::SetSource(std::shared_ptr<MediaSource> source)
176 {
177     auto source_ = std::make_shared<OHOS::Media::Source>("");
178     std::shared_ptr<StreamSource> stream_ = source_->GetSourceStream();
179     if (stream_ == nullptr) {
180         MEDIA_LOG_E("Get StreamSource fail");
181         return Status::ERROR_INVALID_PARAMETER;
182     }
183 
184     streamCallback_ = std::make_shared<StreamSourceCallback>(shared_from_this(), stream_);
185     stream_->SetStreamCallback(streamCallback_);
186     streamSource_ = stream_;
187     taskPtr_ = std::make_shared<OSAL::Task>("StreamSource");
188     taskPtr_->RegisterHandler(std::bind(&StreamSourcePlugin::NotifyAvilableBufferLoop, this));
189     return Status::OK;
190 }
191 
Read(std::shared_ptr<Buffer> & buffer,size_t expectedLen)192 Status StreamSourcePlugin::Read(std::shared_ptr<Buffer>& buffer, size_t expectedLen)
193 {
194     AVBufferPtr bufPtr_ = bufferQueue_.Pop(); // the cached buffer
195     auto availSize = bufPtr_->GetMemory()->GetSize();
196     MEDIA_LOG_D("availSize: %" PUBLIC_LOG "zu, expectedLen: %" PUBLIC_LOG "zu", availSize, expectedLen);
197     if (buffer->IsEmpty()) { // No buffer provided, use the cached buffer.
198         buffer = bufPtr_;
199         return Status::OK;
200     } else { // Buffer provided, copy it.
201         if (buffer->GetMemory()->GetCapacity() < availSize) {
202             MEDIA_LOG_D("buffer->length: %" PUBLIC_LOG "zu is smaller than %" PUBLIC_LOG "zu",
203                         buffer->GetMemory()->GetCapacity(), availSize);
204             return Status::ERROR_NO_MEMORY;
205         }
206         buffer->GetMemory()->Write(bufPtr_->GetMemory()->GetReadOnlyData(), availSize);
207     }
208     return Status::OK;
209 }
210 
GetSize(size_t & size)211 Status StreamSourcePlugin::GetSize(size_t& size)
212 {
213     MEDIA_LOG_D("IN");
214     size = -1;
215     return Status::ERROR_WRONG_STATE;
216 }
217 
IsSeekable()218 bool StreamSourcePlugin::IsSeekable()
219 {
220     MEDIA_LOG_D("IN");
221     return isSeekable_;
222 }
223 
SeekTo(uint64_t offset)224 Status StreamSourcePlugin::SeekTo(uint64_t offset)
225 {
226     MEDIA_LOG_D("IN");
227     return Status::ERROR_UNIMPLEMENTED;
228 }
229 
AllocateBuffer()230 AVBufferPtr StreamSourcePlugin::AllocateBuffer()
231 {
232     return bufferPool_.AllocateBuffer();
233 }
234 
FindBuffer(size_t idx)235 AVBufferPtr StreamSourcePlugin::FindBuffer(size_t idx)
236 {
237     OSAL::ScopedLock lock(mutex_);
238     auto it = waitBuffers_.find(idx);
239     if (it != waitBuffers_.end()) {
240         return it->second;
241     }
242     return nullptr;
243 }
244 
EraseBuffer(size_t idx)245 void StreamSourcePlugin::EraseBuffer(size_t idx)
246 {
247     OSAL::ScopedLock lock(mutex_);
248     waitBuffers_.erase(idx);
249 }
250 
EnqueBuffer(AVBufferPtr & bufferPtr)251 void StreamSourcePlugin::EnqueBuffer(AVBufferPtr& bufferPtr)
252 {
253     bufferQueue_.Push(bufferPtr);
254 }
255 
NotifyAvilableBufferLoop()256 void StreamSourcePlugin::NotifyAvilableBufferLoop()
257 {
258     auto bufferPtr = AllocateBuffer();
259     if (bufferPtr == nullptr) {
260         MEDIA_LOG_E("Alloc buffer fail");
261         return;
262     }
263     size_t idx = GetUniqueIdx();
264     {
265         OSAL::ScopedLock lock(mutex_);
266         waitBuffers_[idx] = bufferPtr;
267     }
268     std::shared_ptr<StreamSource> stream = streamSource_.lock();
269     stream->OnBufferAvailable(idx, 0, bufferPtr->GetMemory()->GetCapacity());
270 }
271 } // namespace StreamSource
272 } // namespace Plugin
273 } // namespace Media
274 } // namespace OHOS
275