1 /*
2 * Copyright (C) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "gst_appsrc_wrap.h"
17 #include "avsharedmemorybase.h"
18 #include "media_log.h"
19 #include "media_errors.h"
20 #include "securec.h"
21
22 namespace {
23 constexpr OHOS::HiviewDFX::HiLogLabel LABEL = {LOG_CORE, LOG_DOMAIN, "GstAppsrcWrap"};
24 constexpr int32_t BUFFERS_NUM = 5;
25 constexpr int32_t BUFFER_SIZE = 81920;
26 constexpr int64_t INVALID_SIZE = -1;
27 }
28
29 namespace OHOS {
30 namespace Media {
Create(const std::shared_ptr<IMediaDataSource> & dataSrc)31 std::shared_ptr<GstAppsrcWrap> GstAppsrcWrap::Create(const std::shared_ptr<IMediaDataSource> &dataSrc)
32 {
33 CHECK_AND_RETURN_RET_LOG(dataSrc != nullptr, nullptr, "input dataSrc is empty!");
34 int64_t size = 0;
35 int32_t ret = dataSrc->GetSize(size);
36 CHECK_AND_RETURN_RET_LOG(ret == MSERR_OK, nullptr, "media data source get size failed!");
37 CHECK_AND_RETURN_RET_LOG(size >= INVALID_SIZE, nullptr, "size cannot less than -1");
38 std::shared_ptr<GstAppsrcWrap> wrap = std::make_shared<GstAppsrcWrap>(dataSrc, size);
39 CHECK_AND_RETURN_RET_LOG(wrap->Init() == MSERR_OK, nullptr, "init failed");
40 return wrap;
41 }
42
GstAppsrcWrap(const std::shared_ptr<IMediaDataSource> & dataSrc,const int64_t size)43 GstAppsrcWrap::GstAppsrcWrap(const std::shared_ptr<IMediaDataSource> &dataSrc, const int64_t size)
44 : dataSrc_(dataSrc),
45 size_(size),
46 fillTaskQue_("fillbufferTask"),
47 emptyTaskQue_("emptybufferTask"),
48 bufferSize_(BUFFER_SIZE),
49 buffersNum_(BUFFERS_NUM)
50 {
51 MEDIA_LOGD("0x%{public}06" PRIXPTR " Instances create and size %{public}" PRId64 "", FAKE_POINTER(this), size);
52 streamType_ = size == INVALID_SIZE ? GST_APP_STREAM_TYPE_STREAM : GST_APP_STREAM_TYPE_RANDOM_ACCESS;
53 }
54
~GstAppsrcWrap()55 GstAppsrcWrap::~GstAppsrcWrap()
56 {
57 MEDIA_LOGD("0x%{public}06" PRIXPTR " Instances destroy", FAKE_POINTER(this));
58 Stop();
59 ClearAppsrc();
60 }
61
Init()62 int32_t GstAppsrcWrap::Init()
63 {
64 for (int i = 0; i < buffersNum_; ++i) {
65 std::shared_ptr<AppsrcMemWrap> appSrcMem = std::make_shared<AppsrcMemWrap>();
66 CHECK_AND_RETURN_RET_LOG(appSrcMem != nullptr, MSERR_NO_MEMORY, "init AppsrcMemWrap failed");
67 appSrcMem->mem = AVSharedMemoryBase::CreateFromLocal(
68 bufferSize_, AVSharedMemory::Flags::FLAGS_READ_WRITE, "appsrc");
69 CHECK_AND_RETURN_RET_LOG(appSrcMem->mem != nullptr, MSERR_NO_MEMORY, "init AVSharedMemory failed");
70 (void)emptyBuffers_.emplace(appSrcMem);
71 }
72 return MSERR_OK;
73 }
74
Prepare()75 int32_t GstAppsrcWrap::Prepare()
76 {
77 std::unique_lock<std::mutex> lock(mutex_);
78 MEDIA_LOGD("Prepare in");
79 if (!isExit_) {
80 MEDIA_LOGD("Prepared");
81 return MSERR_OK;
82 }
83 isExit_ = false;
84 needData_ = false;
85 filledBufferSize_ = 0;
86 needDataSize_ = 0;
87 atEos_ = false;
88 curPos_ = 0;
89 while (!filledBuffers_.empty()) {
90 std::shared_ptr<AppsrcMemWrap> appSrcMem = filledBuffers_.front();
91 filledBuffers_.pop();
92 emptyBuffers_.push(appSrcMem);
93 }
94 CHECK_AND_RETURN_RET_LOG(fillTaskQue_.Start() == MSERR_OK, MSERR_INVALID_OPERATION, "init task failed");
95 CHECK_AND_RETURN_RET_LOG(emptyTaskQue_.Start() == MSERR_OK, MSERR_INVALID_OPERATION, "init task failed");
96 auto task = std::make_shared<TaskHandler<void>>([this] {
97 FillTask();
98 });
99 CHECK_AND_RETURN_RET_LOG(fillTaskQue_.EnqueueTask(task) == MSERR_OK,
100 MSERR_INVALID_OPERATION, "enque task failed");
101 task = std::make_shared<TaskHandler<void>>([this] {
102 EmptyTask();
103 });
104 CHECK_AND_RETURN_RET_LOG(emptyTaskQue_.EnqueueTask(task) == MSERR_OK,
105 MSERR_INVALID_OPERATION, "enque task failed");
106 MEDIA_LOGD("Prepare out");
107 return MSERR_OK;
108 }
109
Stop()110 void GstAppsrcWrap::Stop()
111 {
112 {
113 std::unique_lock<std::mutex> lock(mutex_);
114 isExit_ = true;
115 if (bufferWrap_ != nullptr) {
116 gst_buffer_unref(bufferWrap_->buffer);
117 bufferWrap_ = nullptr;
118 }
119 fillCond_.notify_all();
120 emptyCond_.notify_all();
121 }
122 (void)fillTaskQue_.Stop();
123 (void)emptyTaskQue_.Stop();
124 }
125
ClearAppsrc()126 void GstAppsrcWrap::ClearAppsrc()
127 {
128 if (appSrc_ != nullptr) {
129 for (auto &id : callbackIds_) {
130 g_signal_handler_disconnect(appSrc_, id);
131 }
132 callbackIds_.clear();
133 gst_object_unref(appSrc_);
134 appSrc_ = nullptr;
135 }
136 }
137
SetAppsrc(GstElement * appSrc)138 int32_t GstAppsrcWrap::SetAppsrc(GstElement *appSrc)
139 {
140 MEDIA_LOGD("set Appsrc");
141 ClearAppsrc();
142 appSrc_ = static_cast<GstElement *>(gst_object_ref(appSrc));
143 CHECK_AND_RETURN_RET_LOG(appSrc_ != nullptr, MSERR_INVALID_VAL, "set appsrc failed");
144 SetCallBackForAppSrc();
145 return MSERR_OK;
146 }
147
SetCallBackForAppSrc()148 void GstAppsrcWrap::SetCallBackForAppSrc()
149 {
150 MEDIA_LOGD("SetCallBackForAppSrc");
151 CHECK_AND_RETURN_LOG(appSrc_ != nullptr, "appSrc_ is nullptr");
152 int64_t size = static_cast<int64_t>(size_);
153 g_object_set(appSrc_, "stream-type", streamType_, nullptr);
154 g_object_set(appSrc_, "format", GST_FORMAT_BYTES, nullptr);
155 g_object_set(appSrc_, "size", size, nullptr);
156 callbackIds_.push_back(g_signal_connect(appSrc_, "need-data", G_CALLBACK(NeedData), this));
157 if (streamType_ == GST_APP_STREAM_TYPE_RANDOM_ACCESS) {
158 callbackIds_.push_back(g_signal_connect(appSrc_, "seek-data", G_CALLBACK(SeekData), this));
159 }
160 MEDIA_LOGD("setcall back end");
161 }
162
IsLiveMode() const163 bool GstAppsrcWrap::IsLiveMode() const
164 {
165 return streamType_ == GST_APP_STREAM_TYPE_STREAM;
166 }
167
SetErrorCallback(AppsrcErrorNotifier notifier)168 int32_t GstAppsrcWrap::SetErrorCallback(AppsrcErrorNotifier notifier)
169 {
170 std::unique_lock<std::mutex> lock(mutex_);
171 notifier_ = notifier;
172 return MSERR_OK;
173 }
174
NeedData(const GstElement * appSrc,uint32_t size,gpointer self)175 void GstAppsrcWrap::NeedData(const GstElement *appSrc, uint32_t size, gpointer self)
176 {
177 (void)appSrc;
178 CHECK_AND_RETURN_LOG(self != nullptr, "self is nullptr");
179 auto wrap = static_cast<GstAppsrcWrap *>(self);
180 wrap->NeedDataInner(size);
181 }
182
NeedDataInner(uint32_t size)183 void GstAppsrcWrap::NeedDataInner(uint32_t size)
184 {
185 std::unique_lock<std::mutex> lock(mutex_);
186 needDataSize_ = static_cast<int32_t>(size);
187 if (!filledBuffers_.empty() && (needDataSize_ <= filledBufferSize_ || atEos_ ||
188 streamType_ == GST_APP_STREAM_TYPE_STREAM) && !isExit_) {
189 int32_t ret = GetAndPushMem();
190 if (ret != MSERR_OK) {
191 OnError(ret);
192 }
193 } else {
194 needData_ = true;
195 if (!filledBuffers_.empty()) {
196 emptyCond_.notify_all();
197 }
198 }
199 }
200
FillTask()201 void GstAppsrcWrap::FillTask()
202 {
203 int32_t ret = ReadAndGetMem();
204 if (ret != MSERR_OK) {
205 OnError(ret);
206 }
207 }
208
EmptyTask()209 void GstAppsrcWrap::EmptyTask()
210 {
211 int32_t ret = MSERR_OK;
212 while (ret == MSERR_OK) {
213 std::unique_lock<std::mutex> lock(mutex_);
214 emptyCond_.wait(lock, [this] {
215 return (!filledBuffers_.empty() && needData_) || isExit_;
216 });
217 if (isExit_) {
218 break;
219 }
220 ret = GetAndPushMem();
221 }
222 if (ret != MSERR_OK) {
223 OnError(ret);
224 }
225 }
226
SeekData(const GstElement * appSrc,uint64_t seekPos,gpointer self)227 gboolean GstAppsrcWrap::SeekData(const GstElement *appSrc, uint64_t seekPos, gpointer self)
228 {
229 MEDIA_LOGD("SeekData pos: %{public}" PRIu64 "", seekPos);
230 (void)appSrc;
231 CHECK_AND_RETURN_RET_LOG(self != nullptr, FALSE, "self is nullptr");
232 auto wrap = static_cast<GstAppsrcWrap *>(self);
233 return wrap->SeekDataInner(seekPos);
234 }
235
SeekAndFreeBuffers(uint64_t pos)236 void GstAppsrcWrap::SeekAndFreeBuffers(uint64_t pos)
237 {
238 std::unique_lock<std::mutex> lock(mutex_);
239 while (!filledBuffers_.empty()) {
240 std::shared_ptr<AppsrcMemWrap> appSrcMem = filledBuffers_.front();
241 CHECK_AND_RETURN_LOG(appSrcMem != nullptr, "appSrcMem is nullptr");
242 if (appSrcMem->size < 0) {
243 filledBuffers_.pop();
244 emptyBuffers_.push(appSrcMem);
245 continue;
246 }
247 if (appSrcMem->pos <= pos && appSrcMem->pos + static_cast<uint64_t>(appSrcMem->size) > pos) {
248 int32_t len = static_cast<int32_t>(pos - appSrcMem->pos);
249 filledBufferSize_ += appSrcMem->offset;
250 appSrcMem->offset = len;
251 filledBufferSize_ -= appSrcMem->offset;
252 break;
253 }
254 filledBufferSize_ = filledBufferSize_ - (appSrcMem->size - appSrcMem->offset);
255 filledBuffers_.pop();
256 emptyBuffers_.push(appSrcMem);
257 }
258 if (filledBuffers_.empty()) {
259 curPos_ = pos;
260 atEos_ = false;
261 }
262 fillCond_.notify_all();
263 }
264
SeekDataInner(uint64_t pos)265 gboolean GstAppsrcWrap::SeekDataInner(uint64_t pos)
266 {
267 SeekAndFreeBuffers(pos);
268 return TRUE;
269 }
270
ReadAndGetMem()271 int32_t GstAppsrcWrap::ReadAndGetMem()
272 {
273 int32_t ret = MSERR_OK;
274 while (ret == MSERR_OK) {
275 int32_t size = 0;
276 std::shared_ptr<AppsrcMemWrap> appSrcMem = nullptr;
277 {
278 std::unique_lock<std::mutex> lock(mutex_);
279 fillCond_.wait(lock, [this] { return (!emptyBuffers_.empty() && !atEos_) || isExit_; });
280 if (isExit_) {
281 break;
282 }
283 appSrcMem = emptyBuffers_.front();
284 CHECK_AND_RETURN_RET_LOG(appSrcMem != nullptr && appSrcMem->mem != nullptr, MSERR_NO_MEMORY, "no mem");
285 appSrcMem->pos = curPos_;
286 emptyBuffers_.pop();
287 }
288 if (size_ == INVALID_SIZE) {
289 size = dataSrc_->ReadAt(bufferSize_, appSrcMem->mem);
290 } else {
291 size = dataSrc_->ReadAt(static_cast<int64_t>(appSrcMem->pos), bufferSize_, appSrcMem->mem);
292 }
293 if (size > appSrcMem->mem->GetSize()) {
294 ret = MSERR_INVALID_VAL;
295 }
296 {
297 std::unique_lock<std::mutex> lock(mutex_);
298 if (size == 0 || curPos_ != appSrcMem->pos) {
299 emptyBuffers_.push(appSrcMem);
300 } else if (size < 0) {
301 appSrcMem->size = size;
302 atEos_ = true;
303 filledBuffers_.push(appSrcMem);
304 } else {
305 size = std::min(size, appSrcMem->mem->GetSize());
306 appSrcMem->size = size;
307 filledBufferSize_ += size;
308 appSrcMem->pos = curPos_;
309 appSrcMem->offset = 0;
310 curPos_ = curPos_ + static_cast<uint64_t>(size);
311 filledBuffers_.push(appSrcMem);
312 }
313 emptyCond_.notify_all();
314 }
315 }
316 return ret;
317 }
318
EosAndCheckSize(int32_t size)319 void GstAppsrcWrap::EosAndCheckSize(int32_t size)
320 {
321 MEDIA_LOGD("%{public}d", size);
322 PushEos();
323 switch (size) {
324 case SOURCE_ERROR_IO:
325 OnError(MSERR_DATA_SOURCE_IO_ERROR);
326 MEDIA_LOGW("IO ERROR %{public}d", size);
327 break;
328 case SOURCE_ERROR_EOF:
329 break;
330 default:
331 OnError(MSERR_DATA_SOURCE_ERROR_UNKNOWN);
332 MEDIA_LOGE("unknown error %{public}d", size);
333 break;
334 }
335 }
336
GetAndPushMem()337 int32_t GstAppsrcWrap::GetAndPushMem()
338 {
339 int32_t size = needDataSize_ > filledBufferSize_ ? filledBufferSize_ : needDataSize_;
340 std::shared_ptr<AppsrcMemWrap> appSrcMem = filledBuffers_.front();
341 CHECK_AND_RETURN_RET_LOG(appSrcMem != nullptr && appSrcMem->mem != nullptr, MSERR_NO_MEMORY, "no mem");
342 if (size == 0) {
343 EosAndCheckSize(appSrcMem->size);
344 filledBuffers_.pop();
345 emptyBuffers_.push(appSrcMem);
346 needData_ = false;
347 return MSERR_OK;
348 }
349 GstBuffer *buffer = nullptr;
350 if (bufferWrap_ != nullptr && bufferWrap_->buffer != nullptr) {
351 buffer = bufferWrap_->buffer;
352 } else {
353 bufferWrap_ = std::make_shared<AppsrcBufferWrap>();
354 int32_t allocSize = streamType_ == GST_APP_STREAM_TYPE_STREAM ? size : needDataSize_;
355 buffer = gst_buffer_new_allocate(nullptr, static_cast<gsize>(allocSize), nullptr);
356 CHECK_AND_RETURN_RET_LOG(buffer != nullptr, MSERR_NO_MEMORY, "no mem");
357 GST_BUFFER_OFFSET(buffer) = appSrcMem->pos + static_cast<uint64_t>(appSrcMem->offset);
358 bufferWrap_->buffer = buffer;
359 bufferWrap_->offset = 0;
360 bufferWrap_->size = allocSize;
361 }
362 GstMapInfo info = GST_MAP_INFO_INIT;
363 if (gst_buffer_map(buffer, &info, GST_MAP_WRITE) == FALSE) {
364 gst_buffer_unref(buffer);
365 MEDIA_LOGE("map buffer failed");
366 return MSERR_NO_MEMORY;
367 }
368 bool copyRet = CopyToGstBuffer(info);
369 gst_buffer_unmap(buffer, &info);
370 if (!copyRet) {
371 MEDIA_LOGE("copy buffer failed");
372 gst_buffer_unref(buffer);
373 return MSERR_NO_MEMORY;
374 }
375 if (bufferWrap_->size == bufferWrap_->offset) {
376 bufferWrap_ = nullptr;
377 PushData(buffer);
378 needDataSize_ = 0;
379 needData_ = false;
380 gst_buffer_unref(buffer);
381 } else {
382 needDataSize_ = bufferWrap_->size - bufferWrap_->offset;
383 }
384 filledBufferSize_ -= size;
385 return MSERR_OK;
386 }
387
CopyToGstBuffer(const GstMapInfo & info)388 bool GstAppsrcWrap::CopyToGstBuffer(const GstMapInfo &info)
389 {
390 guint8 *data = info.data + bufferWrap_->offset;
391 int32_t size = static_cast<int32_t>(info.size) - bufferWrap_->offset;
392 while (size > 0 && !filledBuffers_.empty()) {
393 std::shared_ptr<AppsrcMemWrap> appSrcMem = filledBuffers_.front();
394 CHECK_AND_BREAK_LOG(appSrcMem != nullptr && appSrcMem->mem != nullptr
395 && appSrcMem->mem->GetBase() != nullptr
396 && (appSrcMem->size - appSrcMem->offset) > 0,
397 "get mem is nullptr");
398 int32_t lastSize = appSrcMem->size - appSrcMem->offset;
399 int32_t copySize = std::min(lastSize, size);
400 CHECK_AND_BREAK_LOG(memcpy_s(data, static_cast<size_t>(size),
401 appSrcMem->mem->GetBase() + appSrcMem->offset, static_cast<size_t>(copySize)) == EOK,
402 "get mem is nullptr");
403 if (lastSize <= size) {
404 filledBuffers_.pop();
405 emptyBuffers_.push(appSrcMem);
406 fillCond_.notify_all();
407 } else {
408 appSrcMem->offset += copySize;
409 }
410 data = data + copySize;
411 bufferWrap_->offset += copySize;
412 size -= copySize;
413 }
414 if (size != 0 && !filledBuffers_.empty()) {
415 return false;
416 }
417 return true;
418 }
419
OnError(int32_t errorCode)420 void GstAppsrcWrap::OnError(int32_t errorCode)
421 {
422 if (notifier_ != nullptr) {
423 notifier_(errorCode);
424 }
425 }
426
PushData(const GstBuffer * buffer) const427 void GstAppsrcWrap::PushData(const GstBuffer *buffer) const
428 {
429 int32_t ret = GST_FLOW_OK;
430 if (appSrc_ != nullptr) {
431 g_signal_emit_by_name(appSrc_, "push-buffer", buffer, &ret);
432 }
433 }
434
PushEos()435 void GstAppsrcWrap::PushEos()
436 {
437 int32_t ret = GST_FLOW_OK;
438 if (appSrc_ != nullptr) {
439 g_signal_emit_by_name(appSrc_, "end-of-stream", &ret);
440 }
441 MEDIA_LOGD("appsrcPushEos ret:%{public}d", ret);
442 }
443 } // namespace Media
444 } // namespace OHOS
445