1 /*
2 * Copyright (C) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "gst_appsrc_warp.h"
17 #include "avsharedmemorybase.h"
18 #include "media_log.h"
19 #include "media_errors.h"
20 #include "player.h"
21 #include "securec.h"
22
23 namespace {
24 constexpr OHOS::HiviewDFX::HiLogLabel LABEL = {LOG_CORE, LOG_DOMAIN, "GstAppsrcWarp"};
25 constexpr int32_t BUFFERS_NUM = 5;
26 constexpr int32_t BUFFER_SIZE = 81920;
27 constexpr int64_t INVALID_SIZE = -1;
28 }
29
30 namespace OHOS {
31 namespace Media {
Create(const std::shared_ptr<IMediaDataSource> & dataSrc)32 std::shared_ptr<GstAppsrcWarp> GstAppsrcWarp::Create(const std::shared_ptr<IMediaDataSource> &dataSrc)
33 {
34 CHECK_AND_RETURN_RET_LOG(dataSrc != nullptr, nullptr, "input dataSrc is empty!");
35 int64_t size = 0;
36 int32_t ret = dataSrc->GetSize(size);
37 CHECK_AND_RETURN_RET_LOG(ret == MSERR_OK, nullptr, "media data source get size failed!");
38 CHECK_AND_RETURN_RET_LOG(size >= INVALID_SIZE, nullptr, "size cannot less than -1");
39 std::shared_ptr<GstAppsrcWarp> warp = std::make_shared<GstAppsrcWarp>(dataSrc, size);
40 CHECK_AND_RETURN_RET_LOG(warp->Init() == MSERR_OK, nullptr, "init failed");
41 return warp;
42 }
43
GstAppsrcWarp(const std::shared_ptr<IMediaDataSource> & dataSrc,const int64_t size)44 GstAppsrcWarp::GstAppsrcWarp(const std::shared_ptr<IMediaDataSource> &dataSrc, const int64_t size)
45 : dataSrc_(dataSrc),
46 size_(size),
47 fillTaskQue_("fillbufferTask"),
48 emptyTaskQue_("emptybufferTask"),
49 bufferSize_(BUFFER_SIZE),
50 buffersNum_(BUFFERS_NUM)
51 {
52 MEDIA_LOGD("0x%{public}06" PRIXPTR " Instances create and size %{public}" PRId64 "", FAKE_POINTER(this), size);
53 streamType_ = size == INVALID_SIZE ? GST_APP_STREAM_TYPE_STREAM : GST_APP_STREAM_TYPE_RANDOM_ACCESS;
54 }
55
~GstAppsrcWarp()56 GstAppsrcWarp::~GstAppsrcWarp()
57 {
58 MEDIA_LOGD("0x%{public}06" PRIXPTR " Instances destroy", FAKE_POINTER(this));
59 Stop();
60 ClearAppsrc();
61 }
62
Init()63 int32_t GstAppsrcWarp::Init()
64 {
65 for (int i = 0; i < buffersNum_; ++i) {
66 std::shared_ptr<AppsrcMemWarp> appSrcMem = std::make_shared<AppsrcMemWarp>();
67 CHECK_AND_RETURN_RET_LOG(appSrcMem != nullptr, MSERR_NO_MEMORY, "init AppsrcMemWarp failed");
68 appSrcMem->mem = AVSharedMemoryBase::CreateFromLocal(
69 bufferSize_, AVSharedMemory::Flags::FLAGS_READ_WRITE, "appsrc");
70 CHECK_AND_RETURN_RET_LOG(appSrcMem->mem != nullptr, MSERR_NO_MEMORY, "init AVSharedMemory failed");
71 (void)emptyBuffers_.emplace(appSrcMem);
72 }
73 return MSERR_OK;
74 }
75
Prepare()76 int32_t GstAppsrcWarp::Prepare()
77 {
78 std::unique_lock<std::mutex> lock(mutex_);
79 MEDIA_LOGD("Prepare in");
80 if (!isExit_) {
81 MEDIA_LOGD("Prepared");
82 return MSERR_OK;
83 }
84 isExit_ = false;
85 needData_ = false;
86 filledBufferSize_ = 0;
87 needDataSize_ = 0;
88 atEos_ = false;
89 curPos_ = 0;
90 while (!filledBuffers_.empty()) {
91 std::shared_ptr<AppsrcMemWarp> appSrcMem = filledBuffers_.front();
92 filledBuffers_.pop();
93 emptyBuffers_.push(appSrcMem);
94 }
95 CHECK_AND_RETURN_RET_LOG(fillTaskQue_.Start() == MSERR_OK, MSERR_INVALID_OPERATION, "init task failed");
96 CHECK_AND_RETURN_RET_LOG(emptyTaskQue_.Start() == MSERR_OK, MSERR_INVALID_OPERATION, "init task failed");
97 auto task = std::make_shared<TaskHandler<void>>([this] {
98 FillTask();
99 });
100 CHECK_AND_RETURN_RET_LOG(fillTaskQue_.EnqueueTask(task) == MSERR_OK,
101 MSERR_INVALID_OPERATION, "enque task failed");
102 task = std::make_shared<TaskHandler<void>>([this] {
103 EmptyTask();
104 });
105 CHECK_AND_RETURN_RET_LOG(emptyTaskQue_.EnqueueTask(task) == MSERR_OK,
106 MSERR_INVALID_OPERATION, "enque task failed");
107 MEDIA_LOGD("Prepare out");
108 return MSERR_OK;
109 }
110
Stop()111 void GstAppsrcWarp::Stop()
112 {
113 {
114 std::unique_lock<std::mutex> lock(mutex_);
115 isExit_ = true;
116 if (bufferWarp_ != nullptr) {
117 gst_buffer_unref(bufferWarp_->buffer);
118 bufferWarp_ = nullptr;
119 }
120 fillCond_.notify_all();
121 emptyCond_.notify_all();
122 }
123 (void)fillTaskQue_.Stop();
124 (void)emptyTaskQue_.Stop();
125 }
126
ClearAppsrc()127 void GstAppsrcWarp::ClearAppsrc()
128 {
129 if (appSrc_ != nullptr) {
130 for (auto &id : callbackIds_) {
131 g_signal_handler_disconnect(appSrc_, id);
132 }
133 callbackIds_.clear();
134 gst_object_unref(appSrc_);
135 appSrc_ = nullptr;
136 }
137 }
138
SetAppsrc(GstElement * appSrc)139 int32_t GstAppsrcWarp::SetAppsrc(GstElement *appSrc)
140 {
141 MEDIA_LOGD("set Appsrc");
142 ClearAppsrc();
143 appSrc_ = static_cast<GstElement *>(gst_object_ref(appSrc));
144 CHECK_AND_RETURN_RET_LOG(appSrc_ != nullptr, MSERR_INVALID_VAL, "gstPlayer_ is nullptr");
145 SetCallBackForAppSrc();
146 return MSERR_OK;
147 }
148
SetCallBackForAppSrc()149 void GstAppsrcWarp::SetCallBackForAppSrc()
150 {
151 MEDIA_LOGD("SetCallBackForAppSrc");
152 CHECK_AND_RETURN_LOG(appSrc_ != nullptr, "appSrc_ is nullptr");
153 int64_t size = static_cast<int64_t>(size_);
154 g_object_set(appSrc_, "stream-type", streamType_, nullptr);
155 g_object_set(appSrc_, "format", GST_FORMAT_BYTES, nullptr);
156 g_object_set(appSrc_, "size", size, nullptr);
157 callbackIds_.push_back(g_signal_connect(appSrc_, "need-data", G_CALLBACK(NeedData), this));
158 if (streamType_ == GST_APP_STREAM_TYPE_RANDOM_ACCESS) {
159 callbackIds_.push_back(g_signal_connect(appSrc_, "seek-data", G_CALLBACK(SeekData), this));
160 }
161 MEDIA_LOGD("setcall back end");
162 }
163
IsLiveMode() const164 bool GstAppsrcWarp::IsLiveMode() const
165 {
166 return streamType_ == GST_APP_STREAM_TYPE_STREAM;
167 }
168
SetErrorCallback(const std::weak_ptr<IPlayerEngineObs> & obs)169 int32_t GstAppsrcWarp::SetErrorCallback(const std::weak_ptr<IPlayerEngineObs> &obs)
170 {
171 CHECK_AND_RETURN_RET_LOG(obs.lock() != nullptr, MSERR_INVALID_OPERATION,
172 "IPlayerEngineObs is nullptr, please set errorcallback");
173 obs_ = obs;
174 return MSERR_OK;
175 }
176
NeedData(const GstElement * appSrc,uint32_t size,gpointer self)177 void GstAppsrcWarp::NeedData(const GstElement *appSrc, uint32_t size, gpointer self)
178 {
179 (void)appSrc;
180 CHECK_AND_RETURN_LOG(self != nullptr, "self is nullptr");
181 auto warp = static_cast<GstAppsrcWarp *>(self);
182 warp->NeedDataInner(size);
183 }
184
NeedDataInner(uint32_t size)185 void GstAppsrcWarp::NeedDataInner(uint32_t size)
186 {
187 std::unique_lock<std::mutex> lock(mutex_);
188 int32_t ret = MSERR_OK;
189 needDataSize_ = static_cast<int32_t>(size);
190 if (!filledBuffers_.empty() && (needDataSize_ <= filledBufferSize_ || atEos_ ||
191 streamType_ == GST_APP_STREAM_TYPE_STREAM) && !isExit_) {
192 ret = GetAndPushMem();
193 if (ret != MSERR_OK) {
194 OnError(ret);
195 }
196 } else {
197 needData_ = true;
198 if (!filledBuffers_.empty()) {
199 emptyCond_.notify_all();
200 }
201 }
202 }
203
FillTask()204 void GstAppsrcWarp::FillTask()
205 {
206 int32_t ret = ReadAndGetMem();
207 if (ret != MSERR_OK) {
208 OnError(ret);
209 }
210 }
211
EmptyTask()212 void GstAppsrcWarp::EmptyTask()
213 {
214 int32_t ret = MSERR_OK;
215 while (ret == MSERR_OK) {
216 std::unique_lock<std::mutex> lock(mutex_);
217 emptyCond_.wait(lock, [this] {
218 return (!filledBuffers_.empty() && needData_) || isExit_;
219 });
220 if (isExit_) {
221 break;
222 }
223 ret = GetAndPushMem();
224 }
225 if (ret != MSERR_OK) {
226 OnError(ret);
227 }
228 }
229
SeekData(const GstElement * appSrc,uint64_t seekPos,gpointer self)230 gboolean GstAppsrcWarp::SeekData(const GstElement *appSrc, uint64_t seekPos, gpointer self)
231 {
232 MEDIA_LOGD("SeekData pos: %{public}" PRIu64 "", seekPos);
233 (void)appSrc;
234 CHECK_AND_RETURN_RET_LOG(self != nullptr, FALSE, "self is nullptr");
235 auto warp = static_cast<GstAppsrcWarp *>(self);
236 return warp->SeekDataInner(seekPos);
237 }
238
SeekAndFreeBuffers(uint64_t pos)239 void GstAppsrcWarp::SeekAndFreeBuffers(uint64_t pos)
240 {
241 std::unique_lock<std::mutex> lock(mutex_);
242 while (!filledBuffers_.empty()) {
243 std::shared_ptr<AppsrcMemWarp> appSrcMem = filledBuffers_.front();
244 if (appSrcMem->size < 0) {
245 filledBuffers_.pop();
246 emptyBuffers_.push(appSrcMem);
247 continue;
248 }
249 if (appSrcMem->pos <= pos && appSrcMem->pos + static_cast<uint64_t>(appSrcMem->size) > pos) {
250 int32_t len = static_cast<int32_t>(pos - appSrcMem->pos);
251 filledBufferSize_ += appSrcMem->offset;
252 appSrcMem->offset = len;
253 filledBufferSize_ -= appSrcMem->offset;
254 break;
255 }
256 filledBufferSize_ = filledBufferSize_ - (appSrcMem->size - appSrcMem->offset);
257 filledBuffers_.pop();
258 emptyBuffers_.push(appSrcMem);
259 }
260 if (filledBuffers_.empty()) {
261 curPos_ = pos;
262 atEos_ = false;
263 }
264 fillCond_.notify_all();
265 }
266
SeekDataInner(uint64_t pos)267 gboolean GstAppsrcWarp::SeekDataInner(uint64_t pos)
268 {
269 SeekAndFreeBuffers(pos);
270 return TRUE;
271 }
272
ReadAndGetMem()273 int32_t GstAppsrcWarp::ReadAndGetMem()
274 {
275 int32_t ret = MSERR_OK;
276 while (ret == MSERR_OK) {
277 int32_t size = 0;
278 std::shared_ptr<AppsrcMemWarp> appSrcMem = nullptr;
279 {
280 std::unique_lock<std::mutex> lock(mutex_);
281 fillCond_.wait(lock, [this] { return (!emptyBuffers_.empty() && !atEos_) || isExit_; });
282 if (isExit_) {
283 break;
284 }
285 appSrcMem = emptyBuffers_.front();
286 CHECK_AND_RETURN_RET_LOG(appSrcMem != nullptr && appSrcMem->mem != nullptr, MSERR_NO_MEMORY, "no mem");
287 appSrcMem->pos = curPos_;
288 emptyBuffers_.pop();
289 }
290 if (size_ == INVALID_SIZE) {
291 size = dataSrc_->ReadAt(bufferSize_, appSrcMem->mem);
292 } else {
293 size = dataSrc_->ReadAt(static_cast<int64_t>(appSrcMem->pos), bufferSize_, appSrcMem->mem);
294 }
295 if (size > appSrcMem->mem->GetSize()) {
296 ret = MSERR_INVALID_VAL;
297 }
298 {
299 std::unique_lock<std::mutex> lock(mutex_);
300 if (size == 0 || curPos_ != appSrcMem->pos) {
301 emptyBuffers_.push(appSrcMem);
302 } else if (size < 0) {
303 appSrcMem->size = size;
304 atEos_ = true;
305 filledBuffers_.push(appSrcMem);
306 } else {
307 size = std::min(size, appSrcMem->mem->GetSize());
308 appSrcMem->size = size;
309 filledBufferSize_ += size;
310 appSrcMem->pos = curPos_;
311 appSrcMem->offset = 0;
312 curPos_ = curPos_ + static_cast<uint64_t>(size);
313 filledBuffers_.push(appSrcMem);
314 }
315 emptyCond_.notify_all();
316 }
317 }
318 return ret;
319 }
320
EosAndCheckSize(int32_t size)321 void GstAppsrcWarp::EosAndCheckSize(int32_t size)
322 {
323 MEDIA_LOGD("%{public}d", size);
324 PushEos();
325 switch (size) {
326 case SOURCE_ERROR_IO:
327 OnError(MSERR_DATA_SOURCE_IO_ERROR);
328 MEDIA_LOGW("IO ERROR %{public}d", size);
329 break;
330 case SOURCE_ERROR_EOF:
331 break;
332 default:
333 OnError(MSERR_DATA_SOURCE_ERROR_UNKNOWN);
334 MEDIA_LOGE("unknown error %{public}d", size);
335 break;
336 }
337 }
338
GetAndPushMem()339 int32_t GstAppsrcWarp::GetAndPushMem()
340 {
341 int32_t size = needDataSize_ > filledBufferSize_ ? filledBufferSize_ : needDataSize_;
342 std::shared_ptr<AppsrcMemWarp> appSrcMem = filledBuffers_.front();
343 CHECK_AND_RETURN_RET_LOG(appSrcMem != nullptr && appSrcMem->mem != nullptr, MSERR_NO_MEMORY, "no mem");
344 if (size == 0) {
345 EosAndCheckSize(appSrcMem->size);
346 filledBuffers_.pop();
347 emptyBuffers_.push(appSrcMem);
348 needData_ = false;
349 return MSERR_OK;
350 }
351 GstBuffer *buffer = nullptr;
352 if (bufferWarp_ != nullptr && bufferWarp_->buffer != nullptr) {
353 buffer = bufferWarp_->buffer;
354 } else {
355 bufferWarp_ = std::make_shared<AppsrcBufferWarp>();
356 int32_t allocSize = streamType_ == GST_APP_STREAM_TYPE_STREAM ? size : needDataSize_;
357 buffer = gst_buffer_new_allocate(nullptr, static_cast<gsize>(allocSize), nullptr);
358 CHECK_AND_RETURN_RET_LOG(buffer != nullptr, MSERR_NO_MEMORY, "no mem");
359 GST_BUFFER_OFFSET(buffer) = appSrcMem->pos + static_cast<uint64_t>(appSrcMem->offset);
360 bufferWarp_->buffer = buffer;
361 bufferWarp_->offset = 0;
362 bufferWarp_->size = allocSize;
363 }
364 GstMapInfo info = GST_MAP_INFO_INIT;
365 if (gst_buffer_map(buffer, &info, GST_MAP_WRITE) == FALSE) {
366 gst_buffer_unref(buffer);
367 MEDIA_LOGE("map buffer failed");
368 return MSERR_NO_MEMORY;
369 }
370 bool copyRet = CopyToGstBuffer(info);
371 gst_buffer_unmap(buffer, &info);
372 if (!copyRet) {
373 MEDIA_LOGE("copy buffer failed");
374 gst_buffer_unref(buffer);
375 return MSERR_NO_MEMORY;
376 }
377 if (bufferWarp_->size == bufferWarp_->offset) {
378 bufferWarp_ = nullptr;
379 PushData(buffer);
380 needDataSize_ = 0;
381 needData_ = false;
382 gst_buffer_unref(buffer);
383 } else {
384 needDataSize_ = bufferWarp_->size - bufferWarp_->offset;
385 }
386 filledBufferSize_ -= size;
387 return MSERR_OK;
388 }
389
CopyToGstBuffer(const GstMapInfo & info)390 bool GstAppsrcWarp::CopyToGstBuffer(const GstMapInfo &info)
391 {
392 guint8 *data = info.data + bufferWarp_->offset;
393 int32_t size = static_cast<int32_t>(info.size) - bufferWarp_->offset;
394 while (size > 0 && !filledBuffers_.empty()) {
395 std::shared_ptr<AppsrcMemWarp> appSrcMem = filledBuffers_.front();
396 CHECK_AND_BREAK_LOG(appSrcMem != nullptr && appSrcMem->mem != nullptr
397 && appSrcMem->mem->GetBase() != nullptr
398 && (appSrcMem->size - appSrcMem->offset) > 0,
399 "get mem is nullptr");
400 int32_t lastSize = appSrcMem->size - appSrcMem->offset;
401 int32_t copySize = std::min(lastSize, size);
402 CHECK_AND_BREAK_LOG(memcpy_s(data, static_cast<size_t>(size),
403 appSrcMem->mem->GetBase() + appSrcMem->offset, static_cast<size_t>(copySize)) == EOK,
404 "get mem is nullptr");
405 if (lastSize <= size) {
406 filledBuffers_.pop();
407 emptyBuffers_.push(appSrcMem);
408 fillCond_.notify_all();
409 } else {
410 appSrcMem->offset += copySize;
411 }
412 data = data + copySize;
413 bufferWarp_->offset += copySize;
414 size -= copySize;
415 }
416 if (size != 0 && !filledBuffers_.empty()) {
417 return false;
418 }
419 return true;
420 }
421
OnError(int32_t errorCode)422 void GstAppsrcWarp::OnError(int32_t errorCode)
423 {
424 PlayerErrorType errorType = PLAYER_ERROR_UNKNOWN;
425 std::shared_ptr<IPlayerEngineObs> tempObs = obs_.lock();
426 if (tempObs != nullptr) {
427 tempObs->OnError(errorType, errorCode);
428 }
429 }
430
PushData(const GstBuffer * buffer) const431 void GstAppsrcWarp::PushData(const GstBuffer *buffer) const
432 {
433 int32_t ret = GST_FLOW_OK;
434 if (appSrc_ != nullptr) {
435 g_signal_emit_by_name(appSrc_, "push-buffer", buffer, &ret);
436 }
437 }
438
PushEos()439 void GstAppsrcWarp::PushEos()
440 {
441 int32_t ret = GST_FLOW_OK;
442 if (appSrc_ != nullptr) {
443 g_signal_emit_by_name(appSrc_, "end-of-stream", &ret);
444 }
445 MEDIA_LOGD("appsrcPushEos ret:%{public}d", ret);
446 }
447 } // namespace Media
448 } // namespace OHOS