• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) Huawei Technologies Co., Ltd. 2023-2023. All rights reserved.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "gst_appsrc_engine.h"
17 #include <algorithm>
18 #include <sys/time.h>
19 #include <unistd.h>
20 #include "avdatasrcmemory.h"
21 #include "media_log.h"
22 #include "media_errors.h"
23 #include "media_dfx.h"
24 #include "securec.h"
25 #include "scope_guard.h"
26 #include "param_wrapper.h"
27 
28 namespace {
29     constexpr OHOS::HiviewDFX::HiLogLabel LABEL = {LOG_CORE, LOG_DOMAIN, "GstAppsrcEngine"};
30     constexpr int32_t AUDIO_DEFAULT_BUFFER_SIZE = 2048000;
31     constexpr int32_t VIDEO_DEFAULT_BUFFER_SIZE = 8192000;
32     constexpr int32_t MAX_BUFFER_SIZE = 40960000;
33     constexpr int32_t PULL_SIZE = 204800;
34     constexpr int64_t UNKNOW_FILE_SIZE = -1;
35     constexpr int32_t TIME_VAL_MS = 1000;
36     constexpr int32_t TIME_VAL_US = 1000000;
37     constexpr int32_t PULL_BUFFER_TIME_OUT_MS = 100;
38     constexpr int32_t PLAY_TIME_OUT_MS = 15000;
39     constexpr int32_t PULL_BUFFER_SLEEP_US = 3000;
40 }
41 
42 namespace OHOS {
43 namespace Media {
Create(const std::shared_ptr<IMediaDataSource> & dataSrc)44 std::shared_ptr<GstAppsrcEngine> GstAppsrcEngine::Create(const std::shared_ptr<IMediaDataSource> &dataSrc)
45 {
46     MEDIA_LOGD("Create in");
47     CHECK_AND_RETURN_RET_LOG(dataSrc != nullptr, nullptr, "input dataSrc is empty!");
48     int64_t size = 0;
49     int32_t ret = dataSrc->GetSize(size);
50     CHECK_AND_RETURN_RET_LOG(ret == MSERR_OK, nullptr, "media data source get size failed!");
51     CHECK_AND_RETURN_RET_LOG(size >= UNKNOW_FILE_SIZE, nullptr,
52         "invalid file size, if unknow file size please set size = -1");
53     std::shared_ptr<GstAppsrcEngine> wrap = std::make_shared<GstAppsrcEngine>(dataSrc, size);
54     CHECK_AND_RETURN_RET_LOG(wrap->Init() == MSERR_OK, nullptr, "init failed");
55     MEDIA_LOGD("Create out");
56     return wrap;
57 }
58 
GstAppsrcEngine(const std::shared_ptr<IMediaDataSource> & dataSrc,const int64_t size)59 GstAppsrcEngine::GstAppsrcEngine(const std::shared_ptr<IMediaDataSource> &dataSrc, const int64_t size)
60     : dataSrc_(dataSrc),
61       size_(size),
62       pullTaskQue_("pullbufferTask"),
63       pushTaskQue_("pushbufferTask")
64 {
65     MEDIA_LOGD("0x%{public}06" PRIXPTR " Instances create and size %{public}" PRId64 "", FAKE_POINTER(this), size);
66     streamType_ = size == UNKNOW_FILE_SIZE ? GST_APP_STREAM_TYPE_STREAM : GST_APP_STREAM_TYPE_RANDOM_ACCESS;
67 }
68 
~GstAppsrcEngine()69 GstAppsrcEngine::~GstAppsrcEngine()
70 {
71     MEDIA_LOGD("0x%{public}06" PRIXPTR " Instances destroy", FAKE_POINTER(this));
72     Stop();
73     ClearAppsrc();
74     gst_object_unref(allocator_);
75 }
76 
Init()77 int32_t GstAppsrcEngine::Init()
78 {
79     MEDIA_LOGD("Init in");
80     appSrcMemVec_.push_back(std::make_shared<AppsrcMemory>());
81     appSrcMem_ = appSrcMemVec_[curSubscript_];
82     allocator_ = gst_shmemory_wrap_allocator_new();
83     CHECK_AND_RETURN_RET_LOG(allocator_ != nullptr, MSERR_NO_MEMORY, "Failed to create allocator");
84     MEDIA_LOGD("Init out");
85     return MSERR_OK;
86 }
87 
Prepare()88 int32_t GstAppsrcEngine::Prepare()
89 {
90     MEDIA_LOGD("Prepare in");
91     std::unique_lock<std::mutex> lock(mutex_);
92     if (appSrcMem_ && appSrcMem_->GetMem() == nullptr) {
93         uint32_t bufferSize = videoMode_ ? VIDEO_DEFAULT_BUFFER_SIZE : AUDIO_DEFAULT_BUFFER_SIZE;
94         appSrcMem_->SetBufferSize(bufferSize);
95         auto mem = AVDataSrcMemory::CreateFromLocal(
96             bufferSize, AVSharedMemory::Flags::FLAGS_READ_WRITE, "appsrc");
97         CHECK_AND_RETURN_RET_LOG(mem != nullptr, MSERR_NO_MEMORY, "init AVSharedMemory failed");
98         appSrcMem_->SetMem(mem);
99     }
100     if (decoderSwitch_) {
101         RecoverParamFromDecSwitch();
102     } else {
103         ResetConfig();
104     }
105     SetPushBufferMode();
106     CHECK_AND_RETURN_RET_LOG(pullTaskQue_.Start() == MSERR_OK, MSERR_INVALID_OPERATION, "init task failed");
107     CHECK_AND_RETURN_RET_LOG(pushTaskQue_.Start() == MSERR_OK, MSERR_INVALID_OPERATION, "init task failed");
108     auto task = std::make_shared<TaskHandler<void>>([this] { PullTask(); });
109     CHECK_AND_RETURN_RET_LOG(pullTaskQue_.EnqueueTask(task) == MSERR_OK,
110         MSERR_INVALID_OPERATION, "enque task failed");
111     task = std::make_shared<TaskHandler<void>>([this] { PushTask(); });
112     CHECK_AND_RETURN_RET_LOG(pushTaskQue_.EnqueueTask(task) == MSERR_OK,
113         MSERR_INVALID_OPERATION, "enque task failed");
114     MEDIA_LOGD("Prepare out");
115     return MSERR_OK;
116 }
117 
RecoverParamFromDecSwitch()118 void GstAppsrcEngine::RecoverParamFromDecSwitch()
119 {
120     appSrcMem_->RestoreMemParam();
121     isExit_ = false;
122     decoderSwitch_ = false;
123 }
124 
Stop()125 void GstAppsrcEngine::Stop()
126 {
127     MEDIA_LOGD("Stop in");
128     {
129         std::unique_lock<std::mutex> lock(mutex_);
130         isExit_ = true;
131         pullCond_.notify_all();
132         pushCond_.notify_all();
133     }
134     (void)pullTaskQue_.Stop();
135     (void)pushTaskQue_.Stop();
136     MEDIA_LOGD("Stop out");
137 }
138 
SetAppsrc(GstElement * appSrc)139 int32_t GstAppsrcEngine::SetAppsrc(GstElement *appSrc)
140 {
141     MEDIA_LOGD("SetAppsrc in");
142     if (appSrc_) {
143         gst_object_unref(appSrc_);
144         appSrc_ = nullptr;
145     }
146     appSrc_ = static_cast<GstElement *>(gst_object_ref(appSrc));
147     CHECK_AND_RETURN_RET_LOG(appSrc_ != nullptr, MSERR_INVALID_VAL, "set appsrc failed");
148     SetCallBackForAppSrc();
149     MEDIA_LOGD("SetAppsrc out");
150     return MSERR_OK;
151 }
152 
SetCallback(AppsrcErrorNotifier notifier)153 int32_t GstAppsrcEngine::SetCallback(AppsrcErrorNotifier notifier)
154 {
155     std::unique_lock<std::mutex> lock(mutex_);
156     notifier_ = notifier;
157     return MSERR_OK;
158 }
159 
IsLiveMode() const160 bool GstAppsrcEngine::IsLiveMode() const
161 {
162     return streamType_ == GST_APP_STREAM_TYPE_STREAM;
163 }
164 
SetVideoMode()165 void GstAppsrcEngine::SetVideoMode()
166 {
167     videoMode_ = true;
168 }
169 
SetPushBufferMode()170 void GstAppsrcEngine::SetPushBufferMode()
171 {
172     std::string copyMode;
173     int32_t res = OHOS::system::GetStringParameter("sys.media.datasrc.set.copymode", copyMode, "");
174     if (res == 0 && !copyMode.empty()) {
175         if (copyMode == "TRUE") {
176             copyMode_ = true;
177             MEDIA_LOGD("set copymode to true");
178         } else if (copyMode == "FALSE") {
179             copyMode_ = false;
180             MEDIA_LOGD("set copymode to false");
181         }
182     }
183 }
184 
DecoderSwitch()185 void GstAppsrcEngine::DecoderSwitch()
186 {
187     decoderSwitch_ = true;
188 }
189 
SetCallBackForAppSrc()190 void GstAppsrcEngine::SetCallBackForAppSrc()
191 {
192     MEDIA_LOGD("SetCallBackForAppSrc in");
193     CHECK_AND_RETURN_LOG(appSrc_ != nullptr, "appSrc_ is nullptr");
194     g_object_set(appSrc_, "stream-type", streamType_, "format", GST_FORMAT_BYTES, "size", size_, nullptr);
195     callbackIds_.push_back(g_signal_connect(appSrc_, "need-data", G_CALLBACK(NeedData), this));
196     if (streamType_ == GST_APP_STREAM_TYPE_RANDOM_ACCESS) {
197         callbackIds_.push_back(g_signal_connect(appSrc_, "seek-data", G_CALLBACK(SeekData), this));
198     }
199     MEDIA_LOGD("SetCallBackForAppSrc out, and callbackIds size is %{public}u",
200         static_cast<uint32_t>(callbackIds_.size()));
201 }
202 
ClearAppsrc()203 void GstAppsrcEngine::ClearAppsrc()
204 {
205     MEDIA_LOGD("ClearAppsrc in");
206     if (appSrc_ != nullptr) {
207         MEDIA_LOGD("callbackIds size is %{public}u", static_cast<uint32_t>(callbackIds_.size()));
208         for (auto &id : callbackIds_) {
209             g_signal_handler_disconnect(appSrc_, id);
210         }
211         callbackIds_.clear();
212         gst_object_unref(appSrc_);
213         appSrc_ = nullptr;
214     }
215     MEDIA_LOGD("ClearAppsrc out");
216 }
217 
ResetConfig()218 void GstAppsrcEngine::ResetConfig()
219 {
220     appSrcMem_->ResetMemParam();
221     atEos_ = false;
222     needData_ = false;
223     needDataSize_ = 0;
224     isExit_ = false;
225     timer_ = 0;
226     copyMode_ = false;
227 }
228 
NeedData(const GstElement * appSrc,uint32_t size,gpointer self)229 void GstAppsrcEngine::NeedData(const GstElement *appSrc, uint32_t size, gpointer self)
230 {
231     MEDIA_LOGD("NeedData in");
232     (void)appSrc;
233     CHECK_AND_RETURN_LOG(self != nullptr, "self is nullptr");
234     auto wrap = static_cast<GstAppsrcEngine *>(self);
235     wrap->NeedDataInner(size);
236     MEDIA_LOGD("NeedData out");
237 }
238 
NeedDataInner(uint32_t size)239 void GstAppsrcEngine::NeedDataInner(uint32_t size)
240 {
241     std::unique_lock<std::mutex> pullLock(pullMutex_);
242     MEDIA_LOGD("NeedDataInner in, size %{public}u atEos_ %{public}d, isExit_ %{public}d", size, atEos_, isExit_);
243     needDataSize_ = size;
244     uint32_t availableSize = appSrcMem_->GetAvailableSize();
245     if ((needDataSize_ <= availableSize || atEos_) && !isExit_) {
246         if (needDataSize_ > availableSize) {
247             needDataSize_ = availableSize;
248         }
249         bool needcopy = appSrcMem_->IsNeedCopy(needDataSize_);
250         MEDIA_LOGD("PushBuffer pushSize is %{public}u", needDataSize_);
251         int32_t ret;
252         if (availableSize == 0 && atEos_) {
253             ret = PushEos();
254         } else if (copyMode_ || needcopy) {
255             ret = PushBufferWithCopy(needDataSize_);
256         } else {
257             ret = PushBuffer(needDataSize_);
258         }
259         if (ret != MSERR_OK) {
260             OnError(MSERR_EXT_API9_NO_MEMORY, "GstAppsrcEngine:Push buffer failed.");
261         }
262     } else {
263         std::unique_lock<std::mutex> freeLock(freeMutex_);
264         uint32_t freeSize = appSrcMem_->GetFreeSize();
265         uint32_t bufferSize = appSrcMem_->GetBufferSize();
266         if (needDataSize_ > bufferSize && bufferSize < MAX_BUFFER_SIZE / 2) {
267             // 2 Increase to twice the required buffer
268             if (AddSrcMem(needDataSize_ * 2) != MSERR_OK) {
269                 OnError(MSERR_EXT_API9_NO_MEMORY, "GstAppsrcEngine:AddSrcMem failed.");
270             }
271         } else if (availableSize + (freeSize / PULL_SIZE) * PULL_SIZE < needDataSize_ &&
272             bufferSize < MAX_BUFFER_SIZE / 2) {
273             // 2 Increase to twice the original buffer
274             if (AddSrcMem(bufferSize * 2) != MSERR_OK) {
275                 OnError(MSERR_EXT_API9_NO_MEMORY, "GstAppsrcEngine:AddSrcMem failed.");
276             }
277         }
278 
279         needData_ = true;
280         MEDIA_LOGD("needData_ set to true");
281     }
282     MEDIA_LOGD("NeedDataInner out");
283 }
284 
SeekData(const GstElement * appSrc,uint64_t seekPos,gpointer self)285 gboolean GstAppsrcEngine::SeekData(const GstElement *appSrc, uint64_t seekPos, gpointer self)
286 {
287     MEDIA_LOGD("SeekData in, pos: %{public}" PRIu64 "", seekPos);
288     (void)appSrc;
289     CHECK_AND_RETURN_RET_LOG(self != nullptr, FALSE, "self is nullptr");
290     auto wrap = static_cast<GstAppsrcEngine *>(self);
291     return wrap->SeekDataInner(seekPos);
292 }
293 
SeekDataInner(uint64_t pos)294 gboolean GstAppsrcEngine::SeekDataInner(uint64_t pos)
295 {
296     MEDIA_LOGD("SeekDataInner in");
297     if (pos == appSrcMem_->GetPushOffset()) {
298         MEDIA_LOGD("Seek to current position");
299         return TRUE;
300     }
301     std::unique_lock<std::mutex> lock(mutex_);
302     std::unique_lock<std::mutex> freeLock(freeMutex_);
303     appSrcMem_->PrintCurPos();
304     appSrcMem_->SeekAndChangePos(pos);
305     atEos_ = false;
306     needData_ = false;
307     appSrcMem_->PrintCurPos();
308     pullCond_.notify_all();
309     MEDIA_LOGD("SeekDataInner out");
310 
311     return TRUE;
312 }
313 
PullTask()314 void GstAppsrcEngine::PullTask()
315 {
316     int32_t ret = PullBuffer();
317     if (ret != MSERR_OK) {
318         OnError(MSERR_EXT_API9_NO_MEMORY, "GstAppsrcEngine:Pull buffer failed.");
319     }
320 }
321 
PullBuffer()322 int32_t GstAppsrcEngine::PullBuffer()
323 {
324     int32_t ret = MSERR_OK;
325     while (ret == MSERR_OK) {
326         int32_t readSize;
327         std::unique_lock<std::mutex> lock(mutex_);
328         CHECK_AND_RETURN_RET_LOG(appSrcMem_ != nullptr, MSERR_NO_MEMORY, "no mem");
329         MEDIA_LOGD("PullBuffer loop in");
330         pullCond_.wait(lock, [this] { return (!atEos_ && appSrcMem_->GetFreeSize() >= PULL_SIZE) || isExit_; });
331         CHECK_AND_BREAK(!isExit_);
332         std::unique_lock<std::mutex> pullLock(pullMutex_);
333         appSrcMem_->PrintCurPos();
334         auto mem = appSrcMem_->GetMem();
335         int32_t pullSize = static_cast<int32_t>(appSrcMem_->GetBufferSize() - appSrcMem_->GetBeginPos());
336         pullSize = std::min(pullSize, PULL_SIZE);
337         MEDIA_LOGD("ReadAt begin, length is %{public}d", pullSize);
338         std::static_pointer_cast<AVDataSrcMemory>(mem)->SetOffset(appSrcMem_->GetBeginPos());
339         pullLock.unlock();
340         if (size_ == UNKNOW_FILE_SIZE) {
341             readSize = dataSrc_->ReadAt(mem, pullSize);
342         } else {
343             readSize = dataSrc_->ReadAt(mem, pullSize, appSrcMem_->GetFilePos());
344         }
345         pullLock.lock();
346         MEDIA_LOGD("ReadAt end, readSize is %{public}d", readSize);
347         CHECK_AND_RETURN_RET_LOG(readSize <= pullSize, MSERR_INVALID_VAL,
348             "PullBuffer loop end, readSize > length");
349 
350         if (readSize < 0) {
351             MEDIA_LOGD("no buffer, receive eos!!!");
352             atEos_ = true;
353             timer_ = 0;
354             pushCond_.notify_all();
355         } else if (readSize > 0) {
356             appSrcMem_->PullBufferAndChangePos(readSize);
357             timer_ = 0;
358             if (!playState_ && (appSrcMem_->GetAvailableSize() >= PULL_SIZE ||
359                 static_cast<int64_t>(appSrcMem_->GetAvailableSize() + appSrcMem_->GetFilePos()) >= size_)
360                 && OnBufferReport(100)) {  // 100 buffering 100%, begin set to play
361                 playState_ = true;
362             }
363             appSrcMem_->PrintCurPos();
364             pushCond_.notify_all();
365         } else if (IsConnectTimeout()) {
366             OnError(MSERR_EXT_API9_TIMEOUT, "GstAppsrcEngine:Pull buffer timeout!!!");
367         }
368         pullLock.unlock();
369         lock.unlock();
370         usleep(PULL_BUFFER_SLEEP_US);
371     }
372     return ret;
373 }
374 
PushTask()375 void GstAppsrcEngine::PushTask()
376 {
377     int32_t ret = MSERR_OK;
378     while (ret == MSERR_OK) {
379         std::unique_lock<std::mutex> lock(mutex_);
380         pushCond_.wait(lock, [this] {
381             return ((appSrcMem_->GetAvailableSize() >= needDataSize_ || atEos_) && needData_) || isExit_;
382         });
383         uint32_t availableSize = appSrcMem_->GetAvailableSize();
384         if (isExit_) {
385             break;
386         }
387         if (needData_) {
388             // pushSize is min(needDataSize_, availableSize, appSrcMem_->bufferSize - appSrcMem_->availableBegin)
389             if (needDataSize_ > availableSize) {
390                 needDataSize_ = availableSize;
391             }
392             bool needcopy = appSrcMem_->IsNeedCopy(needDataSize_);
393             MEDIA_LOGD("PushBuffer pushSize is %{public}d", needDataSize_);
394             if (availableSize == 0 && atEos_) {
395                 ret = PushEos();
396             } else if (copyMode_ || needcopy) {
397                 ret = PushBufferWithCopy(needDataSize_);
398             } else {
399                 ret = PushBuffer(needDataSize_);
400             }
401         }
402     }
403     if (ret != MSERR_OK) {
404         OnError(MSERR_EXT_API9_NO_MEMORY, "GstAppsrcEngine:Push buffer failed.");
405     }
406 }
407 
PushEos()408 int32_t GstAppsrcEngine::PushEos()
409 {
410     MEDIA_LOGD("push eos");
411     int32_t ret = gst_app_src_end_of_stream(GST_APP_SRC_CAST(appSrc_));
412     CHECK_AND_RETURN_RET_LOG(ret == GST_FLOW_OK, MSERR_INVALID_OPERATION, "Push eos failed!");
413     needData_ = false;
414     return MSERR_OK;
415 }
416 
PushBuffer(uint32_t pushSize)417 int32_t GstAppsrcEngine::PushBuffer(uint32_t pushSize)
418 {
419     MEDIA_LOGD("PushBuffer in");
420     CHECK_AND_RETURN_RET_LOG(appSrcMem_ != nullptr && appSrcMem_->GetMem() != nullptr, MSERR_NO_MEMORY, "no mem");
421     appSrcMem_->PrintCurPos();
422 
423     auto freeMemory = [this](uint32_t offset, uint32_t length, uint32_t curSubscript_) {
424         this->FreePointerMemory(offset, length, curSubscript_);
425     };
426     GstMemory *mem = gst_shmemory_wrap(GST_ALLOCATOR_CAST(allocator_),
427         appSrcMem_->GetMem(), appSrcMem_->GetAvailableBeginPos(), pushSize, curSubscript_, freeMemory);
428 
429     CHECK_AND_RETURN_RET_LOG(mem != nullptr, MSERR_NO_MEMORY, "Failed to call gst_shmemory_wrap");
430     ON_SCOPE_EXIT(0) { gst_memory_unref(mem); };
431     GstBuffer *buffer = gst_buffer_new();
432     CHECK_AND_RETURN_RET_LOG(buffer != nullptr, MSERR_NO_MEMORY, "Failed to call gst_buffer_new");
433 
434     gst_buffer_append_memory(buffer, mem);
435     GST_BUFFER_OFFSET(buffer) = appSrcMem_->GetPushOffset();
436     MEDIA_LOGD("buffer offset is %{public}" PRId64 "", appSrcMem_->GetPushOffset());
437     appSrcMem_->PushBufferAndChangePos(pushSize, false);
438     if (needDataSize_ == pushSize) {
439         needData_ = false;
440     }
441     needDataSize_ -= pushSize;
442     (void)gst_app_src_push_buffer(GST_APP_SRC_CAST(appSrc_), buffer);
443 
444     appSrcMem_->PrintCurPos();
445     MEDIA_LOGD("PushBuffer out");
446     CANCEL_SCOPE_EXIT_GUARD(0);
447     return MSERR_OK;
448 }
449 
PushBufferWithCopy(uint32_t pushSize)450 int32_t GstAppsrcEngine::PushBufferWithCopy(uint32_t pushSize)
451 {
452     MEDIA_LOGD("PushBufferWithCopy in");
453     std::unique_lock<std::mutex> freeLock(freeMutex_);
454     CHECK_AND_RETURN_RET_LOG(appSrcMem_ != nullptr && appSrcMem_->GetMem() != nullptr, MSERR_NO_MEMORY, "no mem");
455     appSrcMem_->PrintCurPos();
456 
457     GstBuffer *buffer = gst_buffer_new_allocate(nullptr, static_cast<gsize>(pushSize), nullptr);
458     CHECK_AND_RETURN_RET_LOG(buffer != nullptr, MSERR_NO_MEMORY, "no mem");
459     ON_SCOPE_EXIT(0) {  gst_buffer_unref(buffer); };
460     GstMapInfo info = GST_MAP_INFO_INIT;
461     CHECK_AND_RETURN_RET_LOG(gst_buffer_map(buffer, &info, GST_MAP_WRITE) != FALSE,
462         MSERR_NO_MEMORY, "map buffer failed");
463 
464     errno_t rc;
465     guint8 *data = info.data;
466     uint8_t *srcBase = std::static_pointer_cast<AVDataSrcMemory>(appSrcMem_->GetMem())->GetInnerBase();
467     uint32_t bufferSize = appSrcMem_->GetBufferSize();
468     uint32_t copyBegin = appSrcMem_->GetAvailableBeginPos();
469     if (!appSrcMem_->IsNeedCopy(pushSize)) {
470         rc = memcpy_s(data, pushSize, srcBase + copyBegin, pushSize);
471         CHECK_AND_RETURN_RET_LOG(rc == EOK, MSERR_NO_MEMORY, "get mem is nullptr");
472     } else {
473         uint32_t dataSize = bufferSize - copyBegin;
474         rc = memcpy_s(data, dataSize, srcBase + copyBegin, dataSize);
475         CHECK_AND_RETURN_RET_LOG(rc == EOK, MSERR_NO_MEMORY, "get mem is failed");
476         rc = memcpy_s(data + dataSize, pushSize - dataSize, srcBase, pushSize - dataSize);
477         CHECK_AND_RETURN_RET_LOG(rc == EOK, MSERR_NO_MEMORY, "get mem is failed");
478     }
479     gst_buffer_unmap(buffer, &info);
480     GST_BUFFER_OFFSET(buffer) = appSrcMem_->GetPushOffset();
481     MEDIA_LOGD("buffer offset is %{public}" PRId64 "", appSrcMem_->GetPushOffset());
482     appSrcMem_->PushBufferAndChangePos(pushSize, true);
483 
484     if (needDataSize_ == pushSize) {
485         needData_ = false;
486     }
487     needDataSize_ -= pushSize;
488     appSrcMem_->SetNoFreeBuffer(false);
489     (void)gst_app_src_push_buffer(GST_APP_SRC_CAST(appSrc_), buffer);
490 
491     appSrcMem_->PrintCurPos();
492     pullCond_.notify_all();
493     MEDIA_LOGD("PushBufferWithCopy out");
494     CANCEL_SCOPE_EXIT_GUARD(0);
495     return MSERR_OK;
496 }
497 
AddSrcMem(uint32_t bufferSize)498 int32_t GstAppsrcEngine::AddSrcMem(uint32_t bufferSize)
499 {
500     MEDIA_LOGD("AddSrcMem in");
501     appSrcMemVec_.push_back(std::make_shared<AppsrcMemory>());
502     curSubscript_ += 1;
503     MEDIA_LOGD("curSubscript_ change to %{public}u", curSubscript_);
504     std::shared_ptr<AppsrcMemory> appSrcMemTemp = appSrcMem_;
505     appSrcMem_ = appSrcMemVec_[curSubscript_];
506     CHECK_AND_RETURN_RET_LOG(appSrcMem_ != nullptr, MSERR_NO_MEMORY, "appSrcMem_ is nullptr");
507 
508     appSrcMem_->SetBufferSize(bufferSize);
509     auto mem = AVDataSrcMemory::CreateFromLocal(
510         bufferSize, AVSharedMemory::Flags::FLAGS_READ_WRITE, "appsrc");
511     CHECK_AND_RETURN_RET_LOG(mem != nullptr, MSERR_NO_MEMORY, "init AVSharedMemory failed");
512     appSrcMem_->SetMem(mem);
513     appSrcMem_->ResetMemParam();
514 
515     bool ret = appSrcMem_->CopyBufferAndChangePos(appSrcMemTemp);
516     CHECK_AND_RETURN_RET_LOG(ret == true, MSERR_NO_MEMORY, "copy mem is failed");
517 
518     pullCond_.notify_all();
519     MEDIA_LOGD("AddSrcMem out");
520     return MSERR_OK;
521 }
522 
OnError(int32_t errorCode,const std::string & message)523 void GstAppsrcEngine::OnError(int32_t errorCode, const std::string &message)
524 {
525     isExit_ = true;
526     pullCond_.notify_all();
527     pushCond_.notify_all();
528     InnerMessage innerMsg {};
529     innerMsg.type = INNER_MSG_ERROR;
530     innerMsg.detail1 = errorCode;
531     innerMsg.extend = message;
532     (void)ReportMessage(innerMsg);
533 }
534 
OnBufferReport(int32_t percent)535 bool GstAppsrcEngine::OnBufferReport(int32_t percent)
536 {
537     InnerMessage innerMsg {};
538     innerMsg.type = INNER_MSG_BUFFERING;
539     innerMsg.detail1 = percent;
540     return ReportMessage(innerMsg);
541 }
542 
ReportMessage(const InnerMessage & msg)543 bool GstAppsrcEngine::ReportMessage(const InnerMessage &msg)
544 {
545     if (notifier_ != nullptr) {
546         return notifier_(msg);
547     }
548     return false;
549 }
550 
FreePointerMemory(uint32_t offset,uint32_t length,uint32_t subscript)551 void GstAppsrcEngine::FreePointerMemory(uint32_t offset, uint32_t length, uint32_t subscript)
552 {
553     MEDIA_LOGD("FreePointerMemory in, offset is %{public}u, length is %{public}u, subscript is %{public}u",
554         offset, length, subscript);
555     std::unique_lock<std::mutex> freeLock(freeMutex_);
556     CHECK_AND_RETURN_LOG(subscript <= appSrcMemVec_.size(), "Check buffer pool subscript  failed");
557     std::shared_ptr<AppsrcMemory> mem = appSrcMemVec_[subscript];
558     CHECK_AND_RETURN_LOG(mem != nullptr, "Buffer pool has been free");
559 
560     mem->PrintCurPos();
561     mem->CheckBufferUsage();
562     CHECK_AND_RETURN_LOG(mem->FreeBufferAndChangePos(offset, length, copyMode_),
563         "Bufferpool checkout failed.");
564     mem->PrintCurPos();
565     if (subscript == curSubscript_) {
566         pullCond_.notify_all();
567     } else if (mem->GetFreeSize() == mem->GetBufferSize()) {
568         mem->SetMem(nullptr);
569         mem = nullptr;
570     }
571     MEDIA_LOGD("FreePointerMemory out");
572 }
573 
GetTime()574 static int64_t GetTime()
575 {
576     struct timeval time = {};
577     int ret = gettimeofday(&time, nullptr);
578     CHECK_AND_RETURN_RET_LOG(ret != -1, -1, "Get current time failed!");
579     return static_cast<int64_t>(time.tv_sec) * TIME_VAL_MS +
580         static_cast<int64_t>(time.tv_usec) * TIME_VAL_MS / TIME_VAL_US;
581 }
582 
IsConnectTimeout()583 bool GstAppsrcEngine::IsConnectTimeout()
584 {
585     if (!needData_) {
586         return false;
587     }
588     if (timer_ == 0) {
589         timer_ = GetTime();
590         MEDIA_LOGI("Waiting to receive data");
591     } else {
592         int64_t curTime = GetTime();
593         if (curTime - timer_ > PULL_BUFFER_TIME_OUT_MS && playState_ && OnBufferReport(0)) {
594             playState_ = false;
595         } else if (curTime - timer_ > PLAY_TIME_OUT_MS) {
596             MEDIA_LOGE("No data was received for 15 seconds");
597             return true;
598         }
599     }
600     return false;
601 }
602 } // namespace Media
603 } // namespace OHOS
604