• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "gst_appsrc_wrap.h"
17 #include "avsharedmemorybase.h"
18 #include "media_log.h"
19 #include "media_errors.h"
20 #include "securec.h"
21 
22 namespace {
23     constexpr OHOS::HiviewDFX::HiLogLabel LABEL = {LOG_CORE, LOG_DOMAIN, "GstAppsrcWrap"};
24     constexpr int32_t BUFFERS_NUM = 5;
25     constexpr int32_t BUFFER_SIZE = 81920;
26     constexpr int64_t INVALID_SIZE = -1;
27 }
28 
29 namespace OHOS {
30 namespace Media {
Create(const std::shared_ptr<IMediaDataSource> & dataSrc)31 std::shared_ptr<GstAppsrcWrap> GstAppsrcWrap::Create(const std::shared_ptr<IMediaDataSource> &dataSrc)
32 {
33     CHECK_AND_RETURN_RET_LOG(dataSrc != nullptr, nullptr, "input dataSrc is empty!");
34     int64_t size = 0;
35     int32_t ret = dataSrc->GetSize(size);
36     CHECK_AND_RETURN_RET_LOG(ret == MSERR_OK, nullptr, "media data source get size failed!");
37     CHECK_AND_RETURN_RET_LOG(size >= INVALID_SIZE, nullptr, "size cannot less than -1");
38     std::shared_ptr<GstAppsrcWrap> wrap = std::make_shared<GstAppsrcWrap>(dataSrc, size);
39     CHECK_AND_RETURN_RET_LOG(wrap->Init() == MSERR_OK, nullptr, "init failed");
40     return wrap;
41 }
42 
GstAppsrcWrap(const std::shared_ptr<IMediaDataSource> & dataSrc,const int64_t size)43 GstAppsrcWrap::GstAppsrcWrap(const std::shared_ptr<IMediaDataSource> &dataSrc, const int64_t size)
44     : dataSrc_(dataSrc),
45       size_(size),
46       fillTaskQue_("fillbufferTask"),
47       emptyTaskQue_("emptybufferTask"),
48       bufferSize_(BUFFER_SIZE),
49       buffersNum_(BUFFERS_NUM)
50 {
51     MEDIA_LOGD("0x%{public}06" PRIXPTR " Instances create and size %{public}" PRId64 "", FAKE_POINTER(this), size);
52     streamType_ = size == INVALID_SIZE ? GST_APP_STREAM_TYPE_STREAM : GST_APP_STREAM_TYPE_RANDOM_ACCESS;
53 }
54 
~GstAppsrcWrap()55 GstAppsrcWrap::~GstAppsrcWrap()
56 {
57     MEDIA_LOGD("0x%{public}06" PRIXPTR " Instances destroy", FAKE_POINTER(this));
58     Stop();
59     ClearAppsrc();
60 }
61 
Init()62 int32_t GstAppsrcWrap::Init()
63 {
64     for (int i = 0; i < buffersNum_; ++i) {
65         std::shared_ptr<AppsrcMemWrap> appSrcMem = std::make_shared<AppsrcMemWrap>();
66         CHECK_AND_RETURN_RET_LOG(appSrcMem != nullptr, MSERR_NO_MEMORY, "init AppsrcMemWrap failed");
67         appSrcMem->mem = AVSharedMemoryBase::CreateFromLocal(
68             bufferSize_, AVSharedMemory::Flags::FLAGS_READ_WRITE, "appsrc");
69         CHECK_AND_RETURN_RET_LOG(appSrcMem->mem != nullptr, MSERR_NO_MEMORY, "init AVSharedMemory failed");
70         (void)emptyBuffers_.emplace(appSrcMem);
71     }
72     return MSERR_OK;
73 }
74 
Prepare()75 int32_t GstAppsrcWrap::Prepare()
76 {
77     std::unique_lock<std::mutex> lock(mutex_);
78     MEDIA_LOGD("Prepare in");
79     if (!isExit_) {
80         MEDIA_LOGD("Prepared");
81         return MSERR_OK;
82     }
83     isExit_ = false;
84     needData_ = false;
85     filledBufferSize_ = 0;
86     needDataSize_ = 0;
87     atEos_ = false;
88     curPos_ = 0;
89     while (!filledBuffers_.empty()) {
90         std::shared_ptr<AppsrcMemWrap> appSrcMem = filledBuffers_.front();
91         filledBuffers_.pop();
92         emptyBuffers_.push(appSrcMem);
93     }
94     CHECK_AND_RETURN_RET_LOG(fillTaskQue_.Start() == MSERR_OK, MSERR_INVALID_OPERATION, "init task failed");
95     CHECK_AND_RETURN_RET_LOG(emptyTaskQue_.Start() == MSERR_OK, MSERR_INVALID_OPERATION, "init task failed");
96     auto task = std::make_shared<TaskHandler<void>>([this] {
97         FillTask();
98     });
99     CHECK_AND_RETURN_RET_LOG(fillTaskQue_.EnqueueTask(task) == MSERR_OK,
100         MSERR_INVALID_OPERATION, "enque task failed");
101     task = std::make_shared<TaskHandler<void>>([this] {
102         EmptyTask();
103     });
104     CHECK_AND_RETURN_RET_LOG(emptyTaskQue_.EnqueueTask(task) == MSERR_OK,
105         MSERR_INVALID_OPERATION, "enque task failed");
106     MEDIA_LOGD("Prepare out");
107     return MSERR_OK;
108 }
109 
Stop()110 void GstAppsrcWrap::Stop()
111 {
112     {
113         std::unique_lock<std::mutex> lock(mutex_);
114         isExit_ = true;
115         if (bufferWrap_ != nullptr) {
116             gst_buffer_unref(bufferWrap_->buffer);
117             bufferWrap_ = nullptr;
118         }
119         fillCond_.notify_all();
120         emptyCond_.notify_all();
121     }
122     (void)fillTaskQue_.Stop();
123     (void)emptyTaskQue_.Stop();
124 }
125 
ClearAppsrc()126 void GstAppsrcWrap::ClearAppsrc()
127 {
128     if (appSrc_ != nullptr) {
129         for (auto &id : callbackIds_) {
130             g_signal_handler_disconnect(appSrc_, id);
131         }
132         callbackIds_.clear();
133         gst_object_unref(appSrc_);
134         appSrc_ = nullptr;
135     }
136 }
137 
SetAppsrc(GstElement * appSrc)138 int32_t GstAppsrcWrap::SetAppsrc(GstElement *appSrc)
139 {
140     MEDIA_LOGD("set Appsrc");
141     ClearAppsrc();
142     appSrc_ = static_cast<GstElement *>(gst_object_ref(appSrc));
143     CHECK_AND_RETURN_RET_LOG(appSrc_ != nullptr, MSERR_INVALID_VAL, "set appsrc failed");
144     SetCallBackForAppSrc();
145     return MSERR_OK;
146 }
147 
SetCallBackForAppSrc()148 void GstAppsrcWrap::SetCallBackForAppSrc()
149 {
150     MEDIA_LOGD("SetCallBackForAppSrc");
151     CHECK_AND_RETURN_LOG(appSrc_ != nullptr, "appSrc_ is nullptr");
152     int64_t size = static_cast<int64_t>(size_);
153     g_object_set(appSrc_, "stream-type", streamType_, nullptr);
154     g_object_set(appSrc_, "format", GST_FORMAT_BYTES, nullptr);
155     g_object_set(appSrc_, "size", size, nullptr);
156     callbackIds_.push_back(g_signal_connect(appSrc_, "need-data", G_CALLBACK(NeedData), this));
157     if (streamType_ == GST_APP_STREAM_TYPE_RANDOM_ACCESS) {
158         callbackIds_.push_back(g_signal_connect(appSrc_, "seek-data", G_CALLBACK(SeekData), this));
159     }
160     MEDIA_LOGD("setcall back end");
161 }
162 
IsLiveMode() const163 bool GstAppsrcWrap::IsLiveMode() const
164 {
165     return streamType_ == GST_APP_STREAM_TYPE_STREAM;
166 }
167 
SetErrorCallback(AppsrcErrorNotifier notifier)168 int32_t GstAppsrcWrap::SetErrorCallback(AppsrcErrorNotifier notifier)
169 {
170     std::unique_lock<std::mutex> lock(mutex_);
171     notifier_ = notifier;
172     return MSERR_OK;
173 }
174 
NeedData(const GstElement * appSrc,uint32_t size,gpointer self)175 void GstAppsrcWrap::NeedData(const GstElement *appSrc, uint32_t size, gpointer self)
176 {
177     (void)appSrc;
178     CHECK_AND_RETURN_LOG(self != nullptr, "self is nullptr");
179     auto wrap = static_cast<GstAppsrcWrap *>(self);
180     wrap->NeedDataInner(size);
181 }
182 
NeedDataInner(uint32_t size)183 void GstAppsrcWrap::NeedDataInner(uint32_t size)
184 {
185     std::unique_lock<std::mutex> lock(mutex_);
186     needDataSize_ = static_cast<int32_t>(size);
187     if (!filledBuffers_.empty() && (needDataSize_ <= filledBufferSize_ || atEos_ ||
188         streamType_ == GST_APP_STREAM_TYPE_STREAM) && !isExit_) {
189         int32_t ret = GetAndPushMem();
190         if (ret != MSERR_OK) {
191             OnError(ret);
192         }
193     } else {
194         needData_ = true;
195         if (!filledBuffers_.empty()) {
196             emptyCond_.notify_all();
197         }
198     }
199 }
200 
FillTask()201 void GstAppsrcWrap::FillTask()
202 {
203     int32_t ret = ReadAndGetMem();
204     if (ret != MSERR_OK) {
205         OnError(ret);
206     }
207 }
208 
EmptyTask()209 void GstAppsrcWrap::EmptyTask()
210 {
211     int32_t ret = MSERR_OK;
212     while (ret == MSERR_OK) {
213         std::unique_lock<std::mutex> lock(mutex_);
214         emptyCond_.wait(lock, [this] {
215             return (!filledBuffers_.empty() && needData_) || isExit_;
216         });
217         if (isExit_) {
218             break;
219         }
220         ret = GetAndPushMem();
221     }
222     if (ret != MSERR_OK) {
223         OnError(ret);
224     }
225 }
226 
SeekData(const GstElement * appSrc,uint64_t seekPos,gpointer self)227 gboolean GstAppsrcWrap::SeekData(const GstElement *appSrc, uint64_t seekPos, gpointer self)
228 {
229     MEDIA_LOGD("SeekData pos: %{public}" PRIu64 "", seekPos);
230     (void)appSrc;
231     CHECK_AND_RETURN_RET_LOG(self != nullptr, FALSE, "self is nullptr");
232     auto wrap = static_cast<GstAppsrcWrap *>(self);
233     return wrap->SeekDataInner(seekPos);
234 }
235 
SeekAndFreeBuffers(uint64_t pos)236 void GstAppsrcWrap::SeekAndFreeBuffers(uint64_t pos)
237 {
238     std::unique_lock<std::mutex> lock(mutex_);
239     while (!filledBuffers_.empty()) {
240         std::shared_ptr<AppsrcMemWrap> appSrcMem = filledBuffers_.front();
241         CHECK_AND_RETURN_LOG(appSrcMem != nullptr, "appSrcMem is nullptr");
242         if (appSrcMem->size < 0) {
243             filledBuffers_.pop();
244             emptyBuffers_.push(appSrcMem);
245             continue;
246         }
247         if (appSrcMem->pos <= pos && appSrcMem->pos + static_cast<uint64_t>(appSrcMem->size) > pos) {
248             int32_t len = static_cast<int32_t>(pos - appSrcMem->pos);
249             filledBufferSize_ += appSrcMem->offset;
250             appSrcMem->offset = len;
251             filledBufferSize_ -= appSrcMem->offset;
252             break;
253         }
254         filledBufferSize_ = filledBufferSize_ - (appSrcMem->size - appSrcMem->offset);
255         filledBuffers_.pop();
256         emptyBuffers_.push(appSrcMem);
257     }
258     if (filledBuffers_.empty()) {
259         curPos_ = pos;
260         atEos_ = false;
261     }
262     fillCond_.notify_all();
263 }
264 
SeekDataInner(uint64_t pos)265 gboolean GstAppsrcWrap::SeekDataInner(uint64_t pos)
266 {
267     SeekAndFreeBuffers(pos);
268     return TRUE;
269 }
270 
ReadAndGetMem()271 int32_t GstAppsrcWrap::ReadAndGetMem()
272 {
273     int32_t ret = MSERR_OK;
274     while (ret == MSERR_OK) {
275         int32_t size = 0;
276         std::shared_ptr<AppsrcMemWrap> appSrcMem = nullptr;
277         {
278             std::unique_lock<std::mutex> lock(mutex_);
279             fillCond_.wait(lock, [this] { return (!emptyBuffers_.empty() && !atEos_) || isExit_; });
280             if (isExit_) {
281                 break;
282             }
283             appSrcMem = emptyBuffers_.front();
284             CHECK_AND_RETURN_RET_LOG(appSrcMem != nullptr && appSrcMem->mem != nullptr, MSERR_NO_MEMORY, "no mem");
285             appSrcMem->pos = curPos_;
286             emptyBuffers_.pop();
287         }
288         if (size_ == INVALID_SIZE) {
289             size = dataSrc_->ReadAt(bufferSize_, appSrcMem->mem);
290         } else {
291             size = dataSrc_->ReadAt(static_cast<int64_t>(appSrcMem->pos), bufferSize_, appSrcMem->mem);
292         }
293         if (size > appSrcMem->mem->GetSize()) {
294             ret = MSERR_INVALID_VAL;
295         }
296         {
297             std::unique_lock<std::mutex> lock(mutex_);
298             if (size == 0 || curPos_ != appSrcMem->pos) {
299                 emptyBuffers_.push(appSrcMem);
300             } else if (size < 0) {
301                 appSrcMem->size = size;
302                 atEos_ = true;
303                 filledBuffers_.push(appSrcMem);
304             } else {
305                 size = std::min(size, appSrcMem->mem->GetSize());
306                 appSrcMem->size = size;
307                 filledBufferSize_ += size;
308                 appSrcMem->pos = curPos_;
309                 appSrcMem->offset = 0;
310                 curPos_ = curPos_ + static_cast<uint64_t>(size);
311                 filledBuffers_.push(appSrcMem);
312             }
313             emptyCond_.notify_all();
314         }
315     }
316     return ret;
317 }
318 
EosAndCheckSize(int32_t size)319 void GstAppsrcWrap::EosAndCheckSize(int32_t size)
320 {
321     MEDIA_LOGD("%{public}d", size);
322     PushEos();
323     switch (size) {
324         case SOURCE_ERROR_IO:
325             OnError(MSERR_DATA_SOURCE_IO_ERROR);
326             MEDIA_LOGW("IO ERROR %{public}d", size);
327             break;
328         case SOURCE_ERROR_EOF:
329             break;
330         default:
331             OnError(MSERR_DATA_SOURCE_ERROR_UNKNOWN);
332             MEDIA_LOGE("unknown error %{public}d", size);
333             break;
334     }
335 }
336 
GetAndPushMem()337 int32_t GstAppsrcWrap::GetAndPushMem()
338 {
339     int32_t size = needDataSize_ > filledBufferSize_ ? filledBufferSize_ : needDataSize_;
340     std::shared_ptr<AppsrcMemWrap> appSrcMem = filledBuffers_.front();
341     CHECK_AND_RETURN_RET_LOG(appSrcMem != nullptr && appSrcMem->mem != nullptr, MSERR_NO_MEMORY, "no mem");
342     if (size == 0) {
343         EosAndCheckSize(appSrcMem->size);
344         filledBuffers_.pop();
345         emptyBuffers_.push(appSrcMem);
346         needData_ = false;
347         return MSERR_OK;
348     }
349     GstBuffer *buffer = nullptr;
350     if (bufferWrap_ != nullptr && bufferWrap_->buffer != nullptr) {
351         buffer = bufferWrap_->buffer;
352     } else {
353         bufferWrap_ = std::make_shared<AppsrcBufferWrap>();
354         int32_t allocSize = streamType_ == GST_APP_STREAM_TYPE_STREAM ? size : needDataSize_;
355         buffer = gst_buffer_new_allocate(nullptr, static_cast<gsize>(allocSize), nullptr);
356         CHECK_AND_RETURN_RET_LOG(buffer != nullptr, MSERR_NO_MEMORY, "no mem");
357         GST_BUFFER_OFFSET(buffer) = appSrcMem->pos + static_cast<uint64_t>(appSrcMem->offset);
358         bufferWrap_->buffer = buffer;
359         bufferWrap_->offset = 0;
360         bufferWrap_->size = allocSize;
361     }
362     GstMapInfo info = GST_MAP_INFO_INIT;
363     if (gst_buffer_map(buffer, &info, GST_MAP_WRITE) == FALSE) {
364         gst_buffer_unref(buffer);
365         MEDIA_LOGE("map buffer failed");
366         return MSERR_NO_MEMORY;
367     }
368     bool copyRet = CopyToGstBuffer(info);
369     gst_buffer_unmap(buffer, &info);
370     if (!copyRet) {
371         MEDIA_LOGE("copy buffer failed");
372         gst_buffer_unref(buffer);
373         return MSERR_NO_MEMORY;
374     }
375     if (bufferWrap_->size == bufferWrap_->offset) {
376         bufferWrap_ = nullptr;
377         PushData(buffer);
378         needDataSize_ = 0;
379         needData_ = false;
380         gst_buffer_unref(buffer);
381     } else {
382         needDataSize_ = bufferWrap_->size - bufferWrap_->offset;
383     }
384     filledBufferSize_ -= size;
385     return MSERR_OK;
386 }
387 
CopyToGstBuffer(const GstMapInfo & info)388 bool GstAppsrcWrap::CopyToGstBuffer(const GstMapInfo &info)
389 {
390     guint8 *data = info.data + bufferWrap_->offset;
391     int32_t size = static_cast<int32_t>(info.size) - bufferWrap_->offset;
392     while (size > 0 && !filledBuffers_.empty()) {
393         std::shared_ptr<AppsrcMemWrap> appSrcMem = filledBuffers_.front();
394         CHECK_AND_BREAK_LOG(appSrcMem != nullptr && appSrcMem->mem != nullptr
395             && appSrcMem->mem->GetBase() != nullptr
396             && (appSrcMem->size - appSrcMem->offset) > 0,
397             "get mem is nullptr");
398         int32_t lastSize = appSrcMem->size - appSrcMem->offset;
399         int32_t copySize = std::min(lastSize, size);
400         CHECK_AND_BREAK_LOG(memcpy_s(data, static_cast<size_t>(size),
401             appSrcMem->mem->GetBase() + appSrcMem->offset, static_cast<size_t>(copySize)) == EOK,
402             "get mem is nullptr");
403         if (lastSize <= size) {
404             filledBuffers_.pop();
405             emptyBuffers_.push(appSrcMem);
406             fillCond_.notify_all();
407         } else {
408             appSrcMem->offset += copySize;
409         }
410         data = data + copySize;
411         bufferWrap_->offset += copySize;
412         size -= copySize;
413     }
414     if (size != 0 && !filledBuffers_.empty()) {
415         return false;
416     }
417     return true;
418 }
419 
OnError(int32_t errorCode)420 void GstAppsrcWrap::OnError(int32_t errorCode)
421 {
422     if (notifier_ != nullptr) {
423         notifier_(errorCode);
424     }
425 }
426 
PushData(const GstBuffer * buffer) const427 void GstAppsrcWrap::PushData(const GstBuffer *buffer) const
428 {
429     int32_t ret = GST_FLOW_OK;
430     if (appSrc_ != nullptr) {
431         g_signal_emit_by_name(appSrc_, "push-buffer", buffer, &ret);
432     }
433 }
434 
PushEos()435 void GstAppsrcWrap::PushEos()
436 {
437     int32_t ret = GST_FLOW_OK;
438     if (appSrc_ != nullptr) {
439         g_signal_emit_by_name(appSrc_, "end-of-stream", &ret);
440     }
441     MEDIA_LOGD("appsrcPushEos ret:%{public}d", ret);
442 }
443 } // namespace Media
444 } // namespace OHOS
445