• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "gst_appsrc_warp.h"
17 #include "avsharedmemorybase.h"
18 #include "media_log.h"
19 #include "media_errors.h"
20 #include "player.h"
21 #include "securec.h"
22 
23 namespace {
24     constexpr OHOS::HiviewDFX::HiLogLabel LABEL = {LOG_CORE, LOG_DOMAIN, "GstAppsrcWarp"};
25     constexpr int32_t BUFFERS_NUM = 5;
26     constexpr int32_t BUFFER_SIZE = 81920;
27     constexpr int64_t INVALID_SIZE = -1;
28 }
29 
30 namespace OHOS {
31 namespace Media {
Create(const std::shared_ptr<IMediaDataSource> & dataSrc)32 std::shared_ptr<GstAppsrcWarp> GstAppsrcWarp::Create(const std::shared_ptr<IMediaDataSource> &dataSrc)
33 {
34     CHECK_AND_RETURN_RET_LOG(dataSrc != nullptr, nullptr, "input dataSrc is empty!");
35     int64_t size = 0;
36     int32_t ret = dataSrc->GetSize(size);
37     CHECK_AND_RETURN_RET_LOG(ret == MSERR_OK, nullptr, "media data source get size failed!");
38     CHECK_AND_RETURN_RET_LOG(size >= INVALID_SIZE, nullptr, "size cannot less than -1");
39     std::shared_ptr<GstAppsrcWarp> warp = std::make_shared<GstAppsrcWarp>(dataSrc, size);
40     CHECK_AND_RETURN_RET_LOG(warp->Init() == MSERR_OK, nullptr, "init failed");
41     return warp;
42 }
43 
GstAppsrcWarp(const std::shared_ptr<IMediaDataSource> & dataSrc,const int64_t size)44 GstAppsrcWarp::GstAppsrcWarp(const std::shared_ptr<IMediaDataSource> &dataSrc, const int64_t size)
45     : dataSrc_(dataSrc),
46       size_(size),
47       fillTaskQue_("fillbufferTask"),
48       emptyTaskQue_("emptybufferTask"),
49       bufferSize_(BUFFER_SIZE),
50       buffersNum_(BUFFERS_NUM)
51 {
52     MEDIA_LOGD("0x%{public}06" PRIXPTR " Instances create and size %{public}" PRId64 "", FAKE_POINTER(this), size);
53     streamType_ = size == INVALID_SIZE ? GST_APP_STREAM_TYPE_STREAM : GST_APP_STREAM_TYPE_RANDOM_ACCESS;
54 }
55 
~GstAppsrcWarp()56 GstAppsrcWarp::~GstAppsrcWarp()
57 {
58     MEDIA_LOGD("0x%{public}06" PRIXPTR " Instances destroy", FAKE_POINTER(this));
59     Stop();
60     ClearAppsrc();
61 }
62 
Init()63 int32_t GstAppsrcWarp::Init()
64 {
65     for (int i = 0; i < buffersNum_; ++i) {
66         std::shared_ptr<AppsrcMemWarp> appSrcMem = std::make_shared<AppsrcMemWarp>();
67         CHECK_AND_RETURN_RET_LOG(appSrcMem != nullptr, MSERR_NO_MEMORY, "init AppsrcMemWarp failed");
68         appSrcMem->mem = AVSharedMemoryBase::CreateFromLocal(
69             bufferSize_, AVSharedMemory::Flags::FLAGS_READ_WRITE, "appsrc");
70         CHECK_AND_RETURN_RET_LOG(appSrcMem->mem != nullptr, MSERR_NO_MEMORY, "init AVSharedMemory failed");
71         (void)emptyBuffers_.emplace(appSrcMem);
72     }
73     return MSERR_OK;
74 }
75 
Prepare()76 int32_t GstAppsrcWarp::Prepare()
77 {
78     std::unique_lock<std::mutex> lock(mutex_);
79     MEDIA_LOGD("Prepare in");
80     if (!isExit_) {
81         MEDIA_LOGD("Prepared");
82         return MSERR_OK;
83     }
84     isExit_ = false;
85     needData_ = false;
86     filledBufferSize_ = 0;
87     needDataSize_ = 0;
88     atEos_ = false;
89     curPos_ = 0;
90     while (!filledBuffers_.empty()) {
91         std::shared_ptr<AppsrcMemWarp> appSrcMem = filledBuffers_.front();
92         filledBuffers_.pop();
93         emptyBuffers_.push(appSrcMem);
94     }
95     CHECK_AND_RETURN_RET_LOG(fillTaskQue_.Start() == MSERR_OK, MSERR_INVALID_OPERATION, "init task failed");
96     CHECK_AND_RETURN_RET_LOG(emptyTaskQue_.Start() == MSERR_OK, MSERR_INVALID_OPERATION, "init task failed");
97     auto task = std::make_shared<TaskHandler<void>>([this] {
98         FillTask();
99     });
100     CHECK_AND_RETURN_RET_LOG(fillTaskQue_.EnqueueTask(task) == MSERR_OK,
101         MSERR_INVALID_OPERATION, "enque task failed");
102     task = std::make_shared<TaskHandler<void>>([this] {
103         EmptyTask();
104     });
105     CHECK_AND_RETURN_RET_LOG(emptyTaskQue_.EnqueueTask(task) == MSERR_OK,
106         MSERR_INVALID_OPERATION, "enque task failed");
107     MEDIA_LOGD("Prepare out");
108     return MSERR_OK;
109 }
110 
Stop()111 void GstAppsrcWarp::Stop()
112 {
113     {
114         std::unique_lock<std::mutex> lock(mutex_);
115         isExit_ = true;
116         if (bufferWarp_ != nullptr) {
117             gst_buffer_unref(bufferWarp_->buffer);
118             bufferWarp_ = nullptr;
119         }
120         fillCond_.notify_all();
121         emptyCond_.notify_all();
122     }
123     (void)fillTaskQue_.Stop();
124     (void)emptyTaskQue_.Stop();
125 }
126 
ClearAppsrc()127 void GstAppsrcWarp::ClearAppsrc()
128 {
129     if (appSrc_ != nullptr) {
130         for (auto &id : callbackIds_) {
131             g_signal_handler_disconnect(appSrc_, id);
132         }
133         callbackIds_.clear();
134         gst_object_unref(appSrc_);
135         appSrc_ = nullptr;
136     }
137 }
138 
SetAppsrc(GstElement * appSrc)139 int32_t GstAppsrcWarp::SetAppsrc(GstElement *appSrc)
140 {
141     MEDIA_LOGD("set Appsrc");
142     ClearAppsrc();
143     appSrc_ = static_cast<GstElement *>(gst_object_ref(appSrc));
144     CHECK_AND_RETURN_RET_LOG(appSrc_ != nullptr, MSERR_INVALID_VAL, "gstPlayer_ is nullptr");
145     SetCallBackForAppSrc();
146     return MSERR_OK;
147 }
148 
SetCallBackForAppSrc()149 void GstAppsrcWarp::SetCallBackForAppSrc()
150 {
151     MEDIA_LOGD("SetCallBackForAppSrc");
152     CHECK_AND_RETURN_LOG(appSrc_ != nullptr, "appSrc_ is nullptr");
153     int64_t size = static_cast<int64_t>(size_);
154     g_object_set(appSrc_, "stream-type", streamType_, nullptr);
155     g_object_set(appSrc_, "format", GST_FORMAT_BYTES, nullptr);
156     g_object_set(appSrc_, "size", size, nullptr);
157     callbackIds_.push_back(g_signal_connect(appSrc_, "need-data", G_CALLBACK(NeedData), this));
158     if (streamType_ == GST_APP_STREAM_TYPE_RANDOM_ACCESS) {
159         callbackIds_.push_back(g_signal_connect(appSrc_, "seek-data", G_CALLBACK(SeekData), this));
160     }
161     MEDIA_LOGD("setcall back end");
162 }
163 
IsLiveMode() const164 bool GstAppsrcWarp::IsLiveMode() const
165 {
166     return streamType_ == GST_APP_STREAM_TYPE_STREAM;
167 }
168 
SetErrorCallback(const std::weak_ptr<IPlayerEngineObs> & obs)169 int32_t GstAppsrcWarp::SetErrorCallback(const std::weak_ptr<IPlayerEngineObs> &obs)
170 {
171     CHECK_AND_RETURN_RET_LOG(obs.lock() != nullptr, MSERR_INVALID_OPERATION,
172         "IPlayerEngineObs is nullptr, please set errorcallback");
173     obs_ = obs;
174     return MSERR_OK;
175 }
176 
NeedData(const GstElement * appSrc,uint32_t size,gpointer self)177 void GstAppsrcWarp::NeedData(const GstElement *appSrc, uint32_t size, gpointer self)
178 {
179     (void)appSrc;
180     CHECK_AND_RETURN_LOG(self != nullptr, "self is nullptr");
181     auto warp = static_cast<GstAppsrcWarp *>(self);
182     warp->NeedDataInner(size);
183 }
184 
NeedDataInner(uint32_t size)185 void GstAppsrcWarp::NeedDataInner(uint32_t size)
186 {
187     std::unique_lock<std::mutex> lock(mutex_);
188     int32_t ret = MSERR_OK;
189     needDataSize_ = static_cast<int32_t>(size);
190     if (!filledBuffers_.empty() && (needDataSize_ <= filledBufferSize_ || atEos_ ||
191         streamType_ == GST_APP_STREAM_TYPE_STREAM) && !isExit_) {
192         ret = GetAndPushMem();
193         if (ret != MSERR_OK) {
194             OnError(ret);
195         }
196     } else {
197         needData_ = true;
198         if (!filledBuffers_.empty()) {
199             emptyCond_.notify_all();
200         }
201     }
202 }
203 
FillTask()204 void GstAppsrcWarp::FillTask()
205 {
206     int32_t ret = ReadAndGetMem();
207     if (ret != MSERR_OK) {
208         OnError(ret);
209     }
210 }
211 
EmptyTask()212 void GstAppsrcWarp::EmptyTask()
213 {
214     int32_t ret = MSERR_OK;
215     while (ret == MSERR_OK) {
216         std::unique_lock<std::mutex> lock(mutex_);
217         emptyCond_.wait(lock, [this] {
218             return (!filledBuffers_.empty() && needData_) || isExit_;
219         });
220         if (isExit_) {
221             break;
222         }
223         ret = GetAndPushMem();
224     }
225     if (ret != MSERR_OK) {
226         OnError(ret);
227     }
228 }
229 
SeekData(const GstElement * appSrc,uint64_t seekPos,gpointer self)230 gboolean GstAppsrcWarp::SeekData(const GstElement *appSrc, uint64_t seekPos, gpointer self)
231 {
232     MEDIA_LOGD("SeekData pos: %{public}" PRIu64 "", seekPos);
233     (void)appSrc;
234     CHECK_AND_RETURN_RET_LOG(self != nullptr, FALSE, "self is nullptr");
235     auto warp = static_cast<GstAppsrcWarp *>(self);
236     return warp->SeekDataInner(seekPos);
237 }
238 
SeekAndFreeBuffers(uint64_t pos)239 void GstAppsrcWarp::SeekAndFreeBuffers(uint64_t pos)
240 {
241     std::unique_lock<std::mutex> lock(mutex_);
242     while (!filledBuffers_.empty()) {
243         std::shared_ptr<AppsrcMemWarp> appSrcMem = filledBuffers_.front();
244         if (appSrcMem->size < 0) {
245             filledBuffers_.pop();
246             emptyBuffers_.push(appSrcMem);
247             continue;
248         }
249         if (appSrcMem->pos <= pos && appSrcMem->pos + static_cast<uint64_t>(appSrcMem->size) > pos) {
250             int32_t len = static_cast<int32_t>(pos - appSrcMem->pos);
251             filledBufferSize_ += appSrcMem->offset;
252             appSrcMem->offset = len;
253             filledBufferSize_ -= appSrcMem->offset;
254             break;
255         }
256         filledBufferSize_ = filledBufferSize_ - (appSrcMem->size - appSrcMem->offset);
257         filledBuffers_.pop();
258         emptyBuffers_.push(appSrcMem);
259     }
260     if (filledBuffers_.empty()) {
261         curPos_ = pos;
262         atEos_ = false;
263     }
264     fillCond_.notify_all();
265 }
266 
SeekDataInner(uint64_t pos)267 gboolean GstAppsrcWarp::SeekDataInner(uint64_t pos)
268 {
269     SeekAndFreeBuffers(pos);
270     return TRUE;
271 }
272 
ReadAndGetMem()273 int32_t GstAppsrcWarp::ReadAndGetMem()
274 {
275     int32_t ret = MSERR_OK;
276     while (ret == MSERR_OK) {
277         int32_t size = 0;
278         std::shared_ptr<AppsrcMemWarp> appSrcMem = nullptr;
279         {
280             std::unique_lock<std::mutex> lock(mutex_);
281             fillCond_.wait(lock, [this] { return (!emptyBuffers_.empty() && !atEos_) || isExit_; });
282             if (isExit_) {
283                 break;
284             }
285             appSrcMem = emptyBuffers_.front();
286             CHECK_AND_RETURN_RET_LOG(appSrcMem != nullptr && appSrcMem->mem != nullptr, MSERR_NO_MEMORY, "no mem");
287             appSrcMem->pos = curPos_;
288             emptyBuffers_.pop();
289         }
290         if (size_ == INVALID_SIZE) {
291             size = dataSrc_->ReadAt(bufferSize_, appSrcMem->mem);
292         } else {
293             size = dataSrc_->ReadAt(static_cast<int64_t>(appSrcMem->pos), bufferSize_, appSrcMem->mem);
294         }
295         if (size > appSrcMem->mem->GetSize()) {
296             ret = MSERR_INVALID_VAL;
297         }
298         {
299             std::unique_lock<std::mutex> lock(mutex_);
300             if (size == 0 || curPos_ != appSrcMem->pos) {
301                 emptyBuffers_.push(appSrcMem);
302             } else if (size < 0) {
303                 appSrcMem->size = size;
304                 atEos_ = true;
305                 filledBuffers_.push(appSrcMem);
306             } else {
307                 size = std::min(size, appSrcMem->mem->GetSize());
308                 appSrcMem->size = size;
309                 filledBufferSize_ += size;
310                 appSrcMem->pos = curPos_;
311                 appSrcMem->offset = 0;
312                 curPos_ = curPos_ + static_cast<uint64_t>(size);
313                 filledBuffers_.push(appSrcMem);
314             }
315             emptyCond_.notify_all();
316         }
317     }
318     return ret;
319 }
320 
EosAndCheckSize(int32_t size)321 void GstAppsrcWarp::EosAndCheckSize(int32_t size)
322 {
323     MEDIA_LOGD("%{public}d", size);
324     PushEos();
325     switch (size) {
326         case SOURCE_ERROR_IO:
327             OnError(MSERR_DATA_SOURCE_IO_ERROR);
328             MEDIA_LOGW("IO ERROR %{public}d", size);
329             break;
330         case SOURCE_ERROR_EOF:
331             break;
332         default:
333             OnError(MSERR_DATA_SOURCE_ERROR_UNKNOWN);
334             MEDIA_LOGE("unknown error %{public}d", size);
335             break;
336     }
337 }
338 
GetAndPushMem()339 int32_t GstAppsrcWarp::GetAndPushMem()
340 {
341     int32_t size = needDataSize_ > filledBufferSize_ ? filledBufferSize_ : needDataSize_;
342     std::shared_ptr<AppsrcMemWarp> appSrcMem = filledBuffers_.front();
343     CHECK_AND_RETURN_RET_LOG(appSrcMem != nullptr && appSrcMem->mem != nullptr, MSERR_NO_MEMORY, "no mem");
344     if (size == 0) {
345         EosAndCheckSize(appSrcMem->size);
346         filledBuffers_.pop();
347         emptyBuffers_.push(appSrcMem);
348         needData_ = false;
349         return MSERR_OK;
350     }
351     GstBuffer *buffer = nullptr;
352     if (bufferWarp_ != nullptr && bufferWarp_->buffer != nullptr) {
353         buffer = bufferWarp_->buffer;
354     } else {
355         bufferWarp_ = std::make_shared<AppsrcBufferWarp>();
356         int32_t allocSize = streamType_ == GST_APP_STREAM_TYPE_STREAM ? size : needDataSize_;
357         buffer = gst_buffer_new_allocate(nullptr, static_cast<gsize>(allocSize), nullptr);
358         CHECK_AND_RETURN_RET_LOG(buffer != nullptr, MSERR_NO_MEMORY, "no mem");
359         GST_BUFFER_OFFSET(buffer) = appSrcMem->pos + static_cast<uint64_t>(appSrcMem->offset);
360         bufferWarp_->buffer = buffer;
361         bufferWarp_->offset = 0;
362         bufferWarp_->size = allocSize;
363     }
364     GstMapInfo info = GST_MAP_INFO_INIT;
365     if (gst_buffer_map(buffer, &info, GST_MAP_WRITE) == FALSE) {
366         gst_buffer_unref(buffer);
367         MEDIA_LOGE("map buffer failed");
368         return MSERR_NO_MEMORY;
369     }
370     bool copyRet = CopyToGstBuffer(info);
371     gst_buffer_unmap(buffer, &info);
372     if (!copyRet) {
373         MEDIA_LOGE("copy buffer failed");
374         gst_buffer_unref(buffer);
375         return MSERR_NO_MEMORY;
376     }
377     if (bufferWarp_->size == bufferWarp_->offset) {
378         bufferWarp_ = nullptr;
379         PushData(buffer);
380         needDataSize_ = 0;
381         needData_ = false;
382         gst_buffer_unref(buffer);
383     } else {
384         needDataSize_ = bufferWarp_->size - bufferWarp_->offset;
385     }
386     filledBufferSize_ -= size;
387     return MSERR_OK;
388 }
389 
CopyToGstBuffer(const GstMapInfo & info)390 bool GstAppsrcWarp::CopyToGstBuffer(const GstMapInfo &info)
391 {
392     guint8 *data = info.data + bufferWarp_->offset;
393     int32_t size = static_cast<int32_t>(info.size) - bufferWarp_->offset;
394     while (size > 0 && !filledBuffers_.empty()) {
395         std::shared_ptr<AppsrcMemWarp> appSrcMem = filledBuffers_.front();
396         CHECK_AND_BREAK_LOG(appSrcMem != nullptr && appSrcMem->mem != nullptr
397             && appSrcMem->mem->GetBase() != nullptr
398             && (appSrcMem->size - appSrcMem->offset) > 0,
399             "get mem is nullptr");
400         int32_t lastSize = appSrcMem->size - appSrcMem->offset;
401         int32_t copySize = std::min(lastSize, size);
402         CHECK_AND_BREAK_LOG(memcpy_s(data, static_cast<size_t>(size),
403             appSrcMem->mem->GetBase() + appSrcMem->offset, static_cast<size_t>(copySize)) == EOK,
404             "get mem is nullptr");
405         if (lastSize <= size) {
406             filledBuffers_.pop();
407             emptyBuffers_.push(appSrcMem);
408             fillCond_.notify_all();
409         } else {
410             appSrcMem->offset += copySize;
411         }
412         data = data + copySize;
413         bufferWarp_->offset += copySize;
414         size -= copySize;
415     }
416     if (size != 0 && !filledBuffers_.empty()) {
417         return false;
418     }
419     return true;
420 }
421 
OnError(int32_t errorCode)422 void GstAppsrcWarp::OnError(int32_t errorCode)
423 {
424     PlayerErrorType errorType = PLAYER_ERROR_UNKNOWN;
425     std::shared_ptr<IPlayerEngineObs> tempObs = obs_.lock();
426     if (tempObs != nullptr) {
427         tempObs->OnError(errorType, errorCode);
428     }
429 }
430 
PushData(const GstBuffer * buffer) const431 void GstAppsrcWarp::PushData(const GstBuffer *buffer) const
432 {
433     int32_t ret = GST_FLOW_OK;
434     if (appSrc_ != nullptr) {
435         g_signal_emit_by_name(appSrc_, "push-buffer", buffer, &ret);
436     }
437 }
438 
PushEos()439 void GstAppsrcWarp::PushEos()
440 {
441     int32_t ret = GST_FLOW_OK;
442     if (appSrc_ != nullptr) {
443         g_signal_emit_by_name(appSrc_, "end-of-stream", &ret);
444     }
445     MEDIA_LOGD("appsrcPushEos ret:%{public}d", ret);
446 }
447 } // namespace Media
448 } // namespace OHOS