1 /*
2 * Copyright (c) Huawei Technologies Co., Ltd. 2023-2023. All rights reserved.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "gst_appsrc_engine.h"
17 #include <algorithm>
18 #include <sys/time.h>
19 #include <unistd.h>
20 #include "avdatasrcmemory.h"
21 #include "media_log.h"
22 #include "media_errors.h"
23 #include "media_dfx.h"
24 #include "securec.h"
25 #include "scope_guard.h"
26 #include "param_wrapper.h"
27
28 namespace {
29 constexpr OHOS::HiviewDFX::HiLogLabel LABEL = {LOG_CORE, LOG_DOMAIN, "GstAppsrcEngine"};
30 constexpr int32_t AUDIO_DEFAULT_BUFFER_SIZE = 2048000;
31 constexpr int32_t VIDEO_DEFAULT_BUFFER_SIZE = 8192000;
32 constexpr int32_t MAX_BUFFER_SIZE = 40960000;
33 constexpr int32_t PULL_SIZE = 204800;
34 constexpr int64_t UNKNOW_FILE_SIZE = -1;
35 constexpr int32_t TIME_VAL_MS = 1000;
36 constexpr int32_t TIME_VAL_US = 1000000;
37 constexpr int32_t PULL_BUFFER_TIME_OUT_MS = 100;
38 constexpr int32_t PLAY_TIME_OUT_MS = 15000;
39 constexpr int32_t PULL_BUFFER_SLEEP_US = 3000;
40 }
41
42 namespace OHOS {
43 namespace Media {
Create(const std::shared_ptr<IMediaDataSource> & dataSrc)44 std::shared_ptr<GstAppsrcEngine> GstAppsrcEngine::Create(const std::shared_ptr<IMediaDataSource> &dataSrc)
45 {
46 MEDIA_LOGD("Create in");
47 CHECK_AND_RETURN_RET_LOG(dataSrc != nullptr, nullptr, "input dataSrc is empty!");
48 int64_t size = 0;
49 int32_t ret = dataSrc->GetSize(size);
50 CHECK_AND_RETURN_RET_LOG(ret == MSERR_OK, nullptr, "media data source get size failed!");
51 CHECK_AND_RETURN_RET_LOG(size >= UNKNOW_FILE_SIZE, nullptr,
52 "invalid file size, if unknow file size please set size = -1");
53 std::shared_ptr<GstAppsrcEngine> wrap = std::make_shared<GstAppsrcEngine>(dataSrc, size);
54 CHECK_AND_RETURN_RET_LOG(wrap->Init() == MSERR_OK, nullptr, "init failed");
55 MEDIA_LOGD("Create out");
56 return wrap;
57 }
58
GstAppsrcEngine(const std::shared_ptr<IMediaDataSource> & dataSrc,const int64_t size)59 GstAppsrcEngine::GstAppsrcEngine(const std::shared_ptr<IMediaDataSource> &dataSrc, const int64_t size)
60 : dataSrc_(dataSrc),
61 size_(size),
62 pullTaskQue_("pullbufferTask"),
63 pushTaskQue_("pushbufferTask")
64 {
65 MEDIA_LOGD("0x%{public}06" PRIXPTR " Instances create and size %{public}" PRId64 "", FAKE_POINTER(this), size);
66 streamType_ = size == UNKNOW_FILE_SIZE ? GST_APP_STREAM_TYPE_STREAM : GST_APP_STREAM_TYPE_RANDOM_ACCESS;
67 }
68
~GstAppsrcEngine()69 GstAppsrcEngine::~GstAppsrcEngine()
70 {
71 MEDIA_LOGD("0x%{public}06" PRIXPTR " Instances destroy", FAKE_POINTER(this));
72 Stop();
73 ClearAppsrc();
74 gst_object_unref(allocator_);
75 }
76
Init()77 int32_t GstAppsrcEngine::Init()
78 {
79 MEDIA_LOGD("Init in");
80 appSrcMemVec_.push_back(std::make_shared<AppsrcMemory>());
81 appSrcMem_ = appSrcMemVec_[curSubscript_];
82 allocator_ = gst_shmemory_wrap_allocator_new();
83 CHECK_AND_RETURN_RET_LOG(allocator_ != nullptr, MSERR_NO_MEMORY, "Failed to create allocator");
84 MEDIA_LOGD("Init out");
85 return MSERR_OK;
86 }
87
Prepare()88 int32_t GstAppsrcEngine::Prepare()
89 {
90 MEDIA_LOGD("Prepare in");
91 std::unique_lock<std::mutex> lock(mutex_);
92 if (appSrcMem_ && appSrcMem_->GetMem() == nullptr) {
93 uint32_t bufferSize = videoMode_ ? VIDEO_DEFAULT_BUFFER_SIZE : AUDIO_DEFAULT_BUFFER_SIZE;
94 appSrcMem_->SetBufferSize(bufferSize);
95 auto mem = AVDataSrcMemory::CreateFromLocal(
96 bufferSize, AVSharedMemory::Flags::FLAGS_READ_WRITE, "appsrc");
97 CHECK_AND_RETURN_RET_LOG(mem != nullptr, MSERR_NO_MEMORY, "init AVSharedMemory failed");
98 appSrcMem_->SetMem(mem);
99 }
100 if (decoderSwitch_) {
101 RecoverParamFromDecSwitch();
102 } else {
103 ResetConfig();
104 }
105 SetPushBufferMode();
106 CHECK_AND_RETURN_RET_LOG(pullTaskQue_.Start() == MSERR_OK, MSERR_INVALID_OPERATION, "init task failed");
107 CHECK_AND_RETURN_RET_LOG(pushTaskQue_.Start() == MSERR_OK, MSERR_INVALID_OPERATION, "init task failed");
108 auto task = std::make_shared<TaskHandler<void>>([this] { PullTask(); });
109 CHECK_AND_RETURN_RET_LOG(pullTaskQue_.EnqueueTask(task) == MSERR_OK,
110 MSERR_INVALID_OPERATION, "enque task failed");
111 task = std::make_shared<TaskHandler<void>>([this] { PushTask(); });
112 CHECK_AND_RETURN_RET_LOG(pushTaskQue_.EnqueueTask(task) == MSERR_OK,
113 MSERR_INVALID_OPERATION, "enque task failed");
114 MEDIA_LOGD("Prepare out");
115 return MSERR_OK;
116 }
117
RecoverParamFromDecSwitch()118 void GstAppsrcEngine::RecoverParamFromDecSwitch()
119 {
120 appSrcMem_->RestoreMemParam();
121 isExit_ = false;
122 decoderSwitch_ = false;
123 }
124
Stop()125 void GstAppsrcEngine::Stop()
126 {
127 MEDIA_LOGD("Stop in");
128 {
129 std::unique_lock<std::mutex> lock(mutex_);
130 isExit_ = true;
131 pullCond_.notify_all();
132 pushCond_.notify_all();
133 }
134 (void)pullTaskQue_.Stop();
135 (void)pushTaskQue_.Stop();
136 MEDIA_LOGD("Stop out");
137 }
138
SetAppsrc(GstElement * appSrc)139 int32_t GstAppsrcEngine::SetAppsrc(GstElement *appSrc)
140 {
141 MEDIA_LOGD("SetAppsrc in");
142 if (appSrc_) {
143 gst_object_unref(appSrc_);
144 appSrc_ = nullptr;
145 }
146 appSrc_ = static_cast<GstElement *>(gst_object_ref(appSrc));
147 CHECK_AND_RETURN_RET_LOG(appSrc_ != nullptr, MSERR_INVALID_VAL, "set appsrc failed");
148 SetCallBackForAppSrc();
149 MEDIA_LOGD("SetAppsrc out");
150 return MSERR_OK;
151 }
152
SetCallback(AppsrcErrorNotifier notifier)153 int32_t GstAppsrcEngine::SetCallback(AppsrcErrorNotifier notifier)
154 {
155 std::unique_lock<std::mutex> lock(mutex_);
156 notifier_ = notifier;
157 return MSERR_OK;
158 }
159
IsLiveMode() const160 bool GstAppsrcEngine::IsLiveMode() const
161 {
162 return streamType_ == GST_APP_STREAM_TYPE_STREAM;
163 }
164
SetVideoMode()165 void GstAppsrcEngine::SetVideoMode()
166 {
167 videoMode_ = true;
168 }
169
SetPushBufferMode()170 void GstAppsrcEngine::SetPushBufferMode()
171 {
172 std::string copyMode;
173 int32_t res = OHOS::system::GetStringParameter("sys.media.datasrc.set.copymode", copyMode, "");
174 if (res == 0 && !copyMode.empty()) {
175 if (copyMode == "TRUE") {
176 copyMode_ = true;
177 MEDIA_LOGD("set copymode to true");
178 } else if (copyMode == "FALSE") {
179 copyMode_ = false;
180 MEDIA_LOGD("set copymode to false");
181 }
182 }
183 }
184
DecoderSwitch()185 void GstAppsrcEngine::DecoderSwitch()
186 {
187 decoderSwitch_ = true;
188 }
189
SetCallBackForAppSrc()190 void GstAppsrcEngine::SetCallBackForAppSrc()
191 {
192 MEDIA_LOGD("SetCallBackForAppSrc in");
193 CHECK_AND_RETURN_LOG(appSrc_ != nullptr, "appSrc_ is nullptr");
194 g_object_set(appSrc_, "stream-type", streamType_, "format", GST_FORMAT_BYTES, "size", size_, nullptr);
195 callbackIds_.push_back(g_signal_connect(appSrc_, "need-data", G_CALLBACK(NeedData), this));
196 if (streamType_ == GST_APP_STREAM_TYPE_RANDOM_ACCESS) {
197 callbackIds_.push_back(g_signal_connect(appSrc_, "seek-data", G_CALLBACK(SeekData), this));
198 }
199 MEDIA_LOGD("SetCallBackForAppSrc out, and callbackIds size is %{public}u",
200 static_cast<uint32_t>(callbackIds_.size()));
201 }
202
ClearAppsrc()203 void GstAppsrcEngine::ClearAppsrc()
204 {
205 MEDIA_LOGD("ClearAppsrc in");
206 if (appSrc_ != nullptr) {
207 MEDIA_LOGD("callbackIds size is %{public}u", static_cast<uint32_t>(callbackIds_.size()));
208 for (auto &id : callbackIds_) {
209 g_signal_handler_disconnect(appSrc_, id);
210 }
211 callbackIds_.clear();
212 gst_object_unref(appSrc_);
213 appSrc_ = nullptr;
214 }
215 MEDIA_LOGD("ClearAppsrc out");
216 }
217
ResetConfig()218 void GstAppsrcEngine::ResetConfig()
219 {
220 appSrcMem_->ResetMemParam();
221 atEos_ = false;
222 needData_ = false;
223 needDataSize_ = 0;
224 isExit_ = false;
225 timer_ = 0;
226 copyMode_ = false;
227 }
228
NeedData(const GstElement * appSrc,uint32_t size,gpointer self)229 void GstAppsrcEngine::NeedData(const GstElement *appSrc, uint32_t size, gpointer self)
230 {
231 MEDIA_LOGD("NeedData in");
232 (void)appSrc;
233 CHECK_AND_RETURN_LOG(self != nullptr, "self is nullptr");
234 auto wrap = static_cast<GstAppsrcEngine *>(self);
235 wrap->NeedDataInner(size);
236 MEDIA_LOGD("NeedData out");
237 }
238
NeedDataInner(uint32_t size)239 void GstAppsrcEngine::NeedDataInner(uint32_t size)
240 {
241 std::unique_lock<std::mutex> pullLock(pullMutex_);
242 MEDIA_LOGD("NeedDataInner in, size %{public}u atEos_ %{public}d, isExit_ %{public}d", size, atEos_, isExit_);
243 needDataSize_ = size;
244 uint32_t availableSize = appSrcMem_->GetAvailableSize();
245 if ((needDataSize_ <= availableSize || atEos_) && !isExit_) {
246 if (needDataSize_ > availableSize) {
247 needDataSize_ = availableSize;
248 }
249 bool needcopy = appSrcMem_->IsNeedCopy(needDataSize_);
250 MEDIA_LOGD("PushBuffer pushSize is %{public}u", needDataSize_);
251 int32_t ret;
252 if (availableSize == 0 && atEos_) {
253 ret = PushEos();
254 } else if (copyMode_ || needcopy) {
255 ret = PushBufferWithCopy(needDataSize_);
256 } else {
257 ret = PushBuffer(needDataSize_);
258 }
259 if (ret != MSERR_OK) {
260 OnError(MSERR_EXT_API9_NO_MEMORY, "GstAppsrcEngine:Push buffer failed.");
261 }
262 } else {
263 std::unique_lock<std::mutex> freeLock(freeMutex_);
264 uint32_t freeSize = appSrcMem_->GetFreeSize();
265 uint32_t bufferSize = appSrcMem_->GetBufferSize();
266 if (needDataSize_ > bufferSize && bufferSize < MAX_BUFFER_SIZE / 2) {
267 // 2 Increase to twice the required buffer
268 if (AddSrcMem(needDataSize_ * 2) != MSERR_OK) {
269 OnError(MSERR_EXT_API9_NO_MEMORY, "GstAppsrcEngine:AddSrcMem failed.");
270 }
271 } else if (availableSize + (freeSize / PULL_SIZE) * PULL_SIZE < needDataSize_ &&
272 bufferSize < MAX_BUFFER_SIZE / 2) {
273 // 2 Increase to twice the original buffer
274 if (AddSrcMem(bufferSize * 2) != MSERR_OK) {
275 OnError(MSERR_EXT_API9_NO_MEMORY, "GstAppsrcEngine:AddSrcMem failed.");
276 }
277 }
278
279 needData_ = true;
280 MEDIA_LOGD("needData_ set to true");
281 }
282 MEDIA_LOGD("NeedDataInner out");
283 }
284
SeekData(const GstElement * appSrc,uint64_t seekPos,gpointer self)285 gboolean GstAppsrcEngine::SeekData(const GstElement *appSrc, uint64_t seekPos, gpointer self)
286 {
287 MEDIA_LOGD("SeekData in, pos: %{public}" PRIu64 "", seekPos);
288 (void)appSrc;
289 CHECK_AND_RETURN_RET_LOG(self != nullptr, FALSE, "self is nullptr");
290 auto wrap = static_cast<GstAppsrcEngine *>(self);
291 return wrap->SeekDataInner(seekPos);
292 }
293
SeekDataInner(uint64_t pos)294 gboolean GstAppsrcEngine::SeekDataInner(uint64_t pos)
295 {
296 MEDIA_LOGD("SeekDataInner in");
297 if (pos == appSrcMem_->GetPushOffset()) {
298 MEDIA_LOGD("Seek to current position");
299 return TRUE;
300 }
301 std::unique_lock<std::mutex> lock(mutex_);
302 std::unique_lock<std::mutex> freeLock(freeMutex_);
303 appSrcMem_->PrintCurPos();
304 appSrcMem_->SeekAndChangePos(pos);
305 atEos_ = false;
306 needData_ = false;
307 appSrcMem_->PrintCurPos();
308 pullCond_.notify_all();
309 MEDIA_LOGD("SeekDataInner out");
310
311 return TRUE;
312 }
313
PullTask()314 void GstAppsrcEngine::PullTask()
315 {
316 int32_t ret = PullBuffer();
317 if (ret != MSERR_OK) {
318 OnError(MSERR_EXT_API9_NO_MEMORY, "GstAppsrcEngine:Pull buffer failed.");
319 }
320 }
321
PullBuffer()322 int32_t GstAppsrcEngine::PullBuffer()
323 {
324 int32_t ret = MSERR_OK;
325 while (ret == MSERR_OK) {
326 int32_t readSize;
327 std::unique_lock<std::mutex> lock(mutex_);
328 CHECK_AND_RETURN_RET_LOG(appSrcMem_ != nullptr, MSERR_NO_MEMORY, "no mem");
329 MEDIA_LOGD("PullBuffer loop in");
330 pullCond_.wait(lock, [this] { return (!atEos_ && appSrcMem_->GetFreeSize() >= PULL_SIZE) || isExit_; });
331 CHECK_AND_BREAK(!isExit_);
332 std::unique_lock<std::mutex> pullLock(pullMutex_);
333 appSrcMem_->PrintCurPos();
334 auto mem = appSrcMem_->GetMem();
335 int32_t pullSize = static_cast<int32_t>(appSrcMem_->GetBufferSize() - appSrcMem_->GetBeginPos());
336 pullSize = std::min(pullSize, PULL_SIZE);
337 MEDIA_LOGD("ReadAt begin, length is %{public}d", pullSize);
338 std::static_pointer_cast<AVDataSrcMemory>(mem)->SetOffset(appSrcMem_->GetBeginPos());
339 pullLock.unlock();
340 if (size_ == UNKNOW_FILE_SIZE) {
341 readSize = dataSrc_->ReadAt(mem, pullSize);
342 } else {
343 readSize = dataSrc_->ReadAt(mem, pullSize, appSrcMem_->GetFilePos());
344 }
345 pullLock.lock();
346 MEDIA_LOGD("ReadAt end, readSize is %{public}d", readSize);
347 CHECK_AND_RETURN_RET_LOG(readSize <= pullSize, MSERR_INVALID_VAL,
348 "PullBuffer loop end, readSize > length");
349
350 if (readSize < 0) {
351 MEDIA_LOGD("no buffer, receive eos!!!");
352 atEos_ = true;
353 timer_ = 0;
354 pushCond_.notify_all();
355 } else if (readSize > 0) {
356 appSrcMem_->PullBufferAndChangePos(readSize);
357 timer_ = 0;
358 if (!playState_ && (appSrcMem_->GetAvailableSize() >= PULL_SIZE ||
359 static_cast<int64_t>(appSrcMem_->GetAvailableSize() + appSrcMem_->GetFilePos()) >= size_)
360 && OnBufferReport(100)) { // 100 buffering 100%, begin set to play
361 playState_ = true;
362 }
363 appSrcMem_->PrintCurPos();
364 pushCond_.notify_all();
365 } else if (IsConnectTimeout()) {
366 OnError(MSERR_EXT_API9_TIMEOUT, "GstAppsrcEngine:Pull buffer timeout!!!");
367 }
368 pullLock.unlock();
369 lock.unlock();
370 usleep(PULL_BUFFER_SLEEP_US);
371 }
372 return ret;
373 }
374
PushTask()375 void GstAppsrcEngine::PushTask()
376 {
377 int32_t ret = MSERR_OK;
378 while (ret == MSERR_OK) {
379 std::unique_lock<std::mutex> lock(mutex_);
380 pushCond_.wait(lock, [this] {
381 return ((appSrcMem_->GetAvailableSize() >= needDataSize_ || atEos_) && needData_) || isExit_;
382 });
383 uint32_t availableSize = appSrcMem_->GetAvailableSize();
384 if (isExit_) {
385 break;
386 }
387 if (needData_) {
388 // pushSize is min(needDataSize_, availableSize, appSrcMem_->bufferSize - appSrcMem_->availableBegin)
389 if (needDataSize_ > availableSize) {
390 needDataSize_ = availableSize;
391 }
392 bool needcopy = appSrcMem_->IsNeedCopy(needDataSize_);
393 MEDIA_LOGD("PushBuffer pushSize is %{public}d", needDataSize_);
394 if (availableSize == 0 && atEos_) {
395 ret = PushEos();
396 } else if (copyMode_ || needcopy) {
397 ret = PushBufferWithCopy(needDataSize_);
398 } else {
399 ret = PushBuffer(needDataSize_);
400 }
401 }
402 }
403 if (ret != MSERR_OK) {
404 OnError(MSERR_EXT_API9_NO_MEMORY, "GstAppsrcEngine:Push buffer failed.");
405 }
406 }
407
PushEos()408 int32_t GstAppsrcEngine::PushEos()
409 {
410 MEDIA_LOGD("push eos");
411 int32_t ret = gst_app_src_end_of_stream(GST_APP_SRC_CAST(appSrc_));
412 CHECK_AND_RETURN_RET_LOG(ret == GST_FLOW_OK, MSERR_INVALID_OPERATION, "Push eos failed!");
413 needData_ = false;
414 return MSERR_OK;
415 }
416
PushBuffer(uint32_t pushSize)417 int32_t GstAppsrcEngine::PushBuffer(uint32_t pushSize)
418 {
419 MEDIA_LOGD("PushBuffer in");
420 CHECK_AND_RETURN_RET_LOG(appSrcMem_ != nullptr && appSrcMem_->GetMem() != nullptr, MSERR_NO_MEMORY, "no mem");
421 appSrcMem_->PrintCurPos();
422
423 auto freeMemory = [this](uint32_t offset, uint32_t length, uint32_t curSubscript_) {
424 this->FreePointerMemory(offset, length, curSubscript_);
425 };
426 GstMemory *mem = gst_shmemory_wrap(GST_ALLOCATOR_CAST(allocator_),
427 appSrcMem_->GetMem(), appSrcMem_->GetAvailableBeginPos(), pushSize, curSubscript_, freeMemory);
428
429 CHECK_AND_RETURN_RET_LOG(mem != nullptr, MSERR_NO_MEMORY, "Failed to call gst_shmemory_wrap");
430 ON_SCOPE_EXIT(0) { gst_memory_unref(mem); };
431 GstBuffer *buffer = gst_buffer_new();
432 CHECK_AND_RETURN_RET_LOG(buffer != nullptr, MSERR_NO_MEMORY, "Failed to call gst_buffer_new");
433
434 gst_buffer_append_memory(buffer, mem);
435 GST_BUFFER_OFFSET(buffer) = appSrcMem_->GetPushOffset();
436 MEDIA_LOGD("buffer offset is %{public}" PRId64 "", appSrcMem_->GetPushOffset());
437 appSrcMem_->PushBufferAndChangePos(pushSize, false);
438 if (needDataSize_ == pushSize) {
439 needData_ = false;
440 }
441 needDataSize_ -= pushSize;
442 (void)gst_app_src_push_buffer(GST_APP_SRC_CAST(appSrc_), buffer);
443
444 appSrcMem_->PrintCurPos();
445 MEDIA_LOGD("PushBuffer out");
446 CANCEL_SCOPE_EXIT_GUARD(0);
447 return MSERR_OK;
448 }
449
PushBufferWithCopy(uint32_t pushSize)450 int32_t GstAppsrcEngine::PushBufferWithCopy(uint32_t pushSize)
451 {
452 MEDIA_LOGD("PushBufferWithCopy in");
453 std::unique_lock<std::mutex> freeLock(freeMutex_);
454 CHECK_AND_RETURN_RET_LOG(appSrcMem_ != nullptr && appSrcMem_->GetMem() != nullptr, MSERR_NO_MEMORY, "no mem");
455 appSrcMem_->PrintCurPos();
456
457 GstBuffer *buffer = gst_buffer_new_allocate(nullptr, static_cast<gsize>(pushSize), nullptr);
458 CHECK_AND_RETURN_RET_LOG(buffer != nullptr, MSERR_NO_MEMORY, "no mem");
459 ON_SCOPE_EXIT(0) { gst_buffer_unref(buffer); };
460 GstMapInfo info = GST_MAP_INFO_INIT;
461 CHECK_AND_RETURN_RET_LOG(gst_buffer_map(buffer, &info, GST_MAP_WRITE) != FALSE,
462 MSERR_NO_MEMORY, "map buffer failed");
463
464 errno_t rc;
465 guint8 *data = info.data;
466 uint8_t *srcBase = std::static_pointer_cast<AVDataSrcMemory>(appSrcMem_->GetMem())->GetInnerBase();
467 uint32_t bufferSize = appSrcMem_->GetBufferSize();
468 uint32_t copyBegin = appSrcMem_->GetAvailableBeginPos();
469 if (!appSrcMem_->IsNeedCopy(pushSize)) {
470 rc = memcpy_s(data, pushSize, srcBase + copyBegin, pushSize);
471 CHECK_AND_RETURN_RET_LOG(rc == EOK, MSERR_NO_MEMORY, "get mem is nullptr");
472 } else {
473 uint32_t dataSize = bufferSize - copyBegin;
474 rc = memcpy_s(data, dataSize, srcBase + copyBegin, dataSize);
475 CHECK_AND_RETURN_RET_LOG(rc == EOK, MSERR_NO_MEMORY, "get mem is failed");
476 rc = memcpy_s(data + dataSize, pushSize - dataSize, srcBase, pushSize - dataSize);
477 CHECK_AND_RETURN_RET_LOG(rc == EOK, MSERR_NO_MEMORY, "get mem is failed");
478 }
479 gst_buffer_unmap(buffer, &info);
480 GST_BUFFER_OFFSET(buffer) = appSrcMem_->GetPushOffset();
481 MEDIA_LOGD("buffer offset is %{public}" PRId64 "", appSrcMem_->GetPushOffset());
482 appSrcMem_->PushBufferAndChangePos(pushSize, true);
483
484 if (needDataSize_ == pushSize) {
485 needData_ = false;
486 }
487 needDataSize_ -= pushSize;
488 appSrcMem_->SetNoFreeBuffer(false);
489 (void)gst_app_src_push_buffer(GST_APP_SRC_CAST(appSrc_), buffer);
490
491 appSrcMem_->PrintCurPos();
492 pullCond_.notify_all();
493 MEDIA_LOGD("PushBufferWithCopy out");
494 CANCEL_SCOPE_EXIT_GUARD(0);
495 return MSERR_OK;
496 }
497
AddSrcMem(uint32_t bufferSize)498 int32_t GstAppsrcEngine::AddSrcMem(uint32_t bufferSize)
499 {
500 MEDIA_LOGD("AddSrcMem in");
501 appSrcMemVec_.push_back(std::make_shared<AppsrcMemory>());
502 curSubscript_ += 1;
503 MEDIA_LOGD("curSubscript_ change to %{public}u", curSubscript_);
504 std::shared_ptr<AppsrcMemory> appSrcMemTemp = appSrcMem_;
505 appSrcMem_ = appSrcMemVec_[curSubscript_];
506 CHECK_AND_RETURN_RET_LOG(appSrcMem_ != nullptr, MSERR_NO_MEMORY, "appSrcMem_ is nullptr");
507
508 appSrcMem_->SetBufferSize(bufferSize);
509 auto mem = AVDataSrcMemory::CreateFromLocal(
510 bufferSize, AVSharedMemory::Flags::FLAGS_READ_WRITE, "appsrc");
511 CHECK_AND_RETURN_RET_LOG(mem != nullptr, MSERR_NO_MEMORY, "init AVSharedMemory failed");
512 appSrcMem_->SetMem(mem);
513 appSrcMem_->ResetMemParam();
514
515 bool ret = appSrcMem_->CopyBufferAndChangePos(appSrcMemTemp);
516 CHECK_AND_RETURN_RET_LOG(ret == true, MSERR_NO_MEMORY, "copy mem is failed");
517
518 pullCond_.notify_all();
519 MEDIA_LOGD("AddSrcMem out");
520 return MSERR_OK;
521 }
522
OnError(int32_t errorCode,const std::string & message)523 void GstAppsrcEngine::OnError(int32_t errorCode, const std::string &message)
524 {
525 isExit_ = true;
526 pullCond_.notify_all();
527 pushCond_.notify_all();
528 InnerMessage innerMsg {};
529 innerMsg.type = INNER_MSG_ERROR;
530 innerMsg.detail1 = errorCode;
531 innerMsg.extend = message;
532 (void)ReportMessage(innerMsg);
533 }
534
OnBufferReport(int32_t percent)535 bool GstAppsrcEngine::OnBufferReport(int32_t percent)
536 {
537 InnerMessage innerMsg {};
538 innerMsg.type = INNER_MSG_BUFFERING;
539 innerMsg.detail1 = percent;
540 return ReportMessage(innerMsg);
541 }
542
ReportMessage(const InnerMessage & msg)543 bool GstAppsrcEngine::ReportMessage(const InnerMessage &msg)
544 {
545 if (notifier_ != nullptr) {
546 return notifier_(msg);
547 }
548 return false;
549 }
550
FreePointerMemory(uint32_t offset,uint32_t length,uint32_t subscript)551 void GstAppsrcEngine::FreePointerMemory(uint32_t offset, uint32_t length, uint32_t subscript)
552 {
553 MEDIA_LOGD("FreePointerMemory in, offset is %{public}u, length is %{public}u, subscript is %{public}u",
554 offset, length, subscript);
555 std::unique_lock<std::mutex> freeLock(freeMutex_);
556 CHECK_AND_RETURN_LOG(subscript <= appSrcMemVec_.size(), "Check buffer pool subscript failed");
557 std::shared_ptr<AppsrcMemory> mem = appSrcMemVec_[subscript];
558 CHECK_AND_RETURN_LOG(mem != nullptr, "Buffer pool has been free");
559
560 mem->PrintCurPos();
561 mem->CheckBufferUsage();
562 CHECK_AND_RETURN_LOG(mem->FreeBufferAndChangePos(offset, length, copyMode_),
563 "Bufferpool checkout failed.");
564 mem->PrintCurPos();
565 if (subscript == curSubscript_) {
566 pullCond_.notify_all();
567 } else if (mem->GetFreeSize() == mem->GetBufferSize()) {
568 mem->SetMem(nullptr);
569 mem = nullptr;
570 }
571 MEDIA_LOGD("FreePointerMemory out");
572 }
573
GetTime()574 static int64_t GetTime()
575 {
576 struct timeval time = {};
577 int ret = gettimeofday(&time, nullptr);
578 CHECK_AND_RETURN_RET_LOG(ret != -1, -1, "Get current time failed!");
579 return static_cast<int64_t>(time.tv_sec) * TIME_VAL_MS +
580 static_cast<int64_t>(time.tv_usec) * TIME_VAL_MS / TIME_VAL_US;
581 }
582
IsConnectTimeout()583 bool GstAppsrcEngine::IsConnectTimeout()
584 {
585 if (!needData_) {
586 return false;
587 }
588 if (timer_ == 0) {
589 timer_ = GetTime();
590 MEDIA_LOGI("Waiting to receive data");
591 } else {
592 int64_t curTime = GetTime();
593 if (curTime - timer_ > PULL_BUFFER_TIME_OUT_MS && playState_ && OnBufferReport(0)) {
594 playState_ = false;
595 } else if (curTime - timer_ > PLAY_TIME_OUT_MS) {
596 MEDIA_LOGE("No data was received for 15 seconds");
597 return true;
598 }
599 }
600 return false;
601 }
602 } // namespace Media
603 } // namespace OHOS
604