• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "gst_shmem_src.h"
17 #include <gst/video/video.h>
18 #include <gst/base/base.h>
19 #include "media_errors.h"
20 #include "scope_guard.h"
21 #include "buffer_type_meta.h"
22 #include "gst_shmem_allocator.h"
23 #include "gst_shmem_pool.h"
24 
25 #define gst_shmem_src_parent_class parent_class
26 using namespace OHOS;
27 namespace {
28     constexpr guint DEFAULT_SHMEM_BUF_NUM = 8;
29     constexpr guint DEFAULT_QUEUE_SIZE = 16;
30 }
31 
32 GST_DEBUG_CATEGORY_STATIC(gst_shmem_src_debug_category);
33 #define GST_CAT_DEFAULT gst_shmem_src_debug_category
34 
35 static GstStaticPadTemplate gst_src_template =
36 GST_STATIC_PAD_TEMPLATE("src",
37     GST_PAD_SRC,
38     GST_PAD_ALWAYS,
39     GST_STATIC_CAPS_ANY);
40 
41 struct _GstShmemSrcPrivate {
42     GRecMutex shmem_lock;
43     GstTask *shmem_task;
44     gboolean task_start;
45     GstBufferPool *pool;
46     GMutex priv_lock;
47     GCond task_condition;
48     GstBuffer *available_buffer;
49     gboolean flushing;
50     GMutex queue_lock;
51     GCond queue_condition;
52     GstQueueArray *queue;
53     GstShMemAllocator *allocator;
54     GstAllocationParams allocParams;
55     std::shared_ptr<OHOS::Media::AVSharedMemoryPool> av_shmem_pool;
56     gboolean eos;
57     bool unlock;
58     gboolean need_start_task;
59 };
60 
61 G_DEFINE_TYPE_WITH_PRIVATE(GstShmemSrc, gst_shmem_src, GST_TYPE_MEM_SRC);
62 
63 static GstStateChangeReturn gst_shmem_src_change_state(GstElement *element, GstStateChange transition);
64 static gboolean gst_shmem_src_decide_allocation(GstBaseSrc *basesrc, GstQuery *query);
65 static gboolean gst_shmem_src_start_task(GstShmemSrc *shmemsrc);
66 static gboolean gst_shmem_src_pause_task(GstShmemSrc *shmemsrc);
67 static gboolean gst_shmem_src_stop_task(GstShmemSrc *shmemsrc);
68 static void gst_mem_src_dispose(GObject *object);
69 static void gst_shmem_src_finalize(GObject *object);
70 static void gst_shmem_src_flush_queue(GstShmemSrc *shmemsrc);
71 static gboolean gst_shmem_src_send_event(GstElement *element, GstEvent *event);
72 GstBuffer *gst_shmem_src_pull_buffer(GstMemSrc *memsrc);
73 GstFlowReturn gst_shmem_src_push_buffer(GstMemSrc *memsrc, GstBuffer *buffer);
74 static GstFlowReturn gst_shmem_src_create(GstBaseSrc *src, guint64 offset, guint size, GstBuffer **buffer);
75 static void gst_shmem_src_loop(GstShmemSrc *shmemsrc);
76 static gboolean gst_shmem_src_unlock(GstBaseSrc *src);
77 static gboolean gst_shmem_src_unlock_stop(GstBaseSrc *src);
78 
gst_shmem_src_class_init(GstShmemSrcClass * klass)79 static void gst_shmem_src_class_init(GstShmemSrcClass *klass)
80 {
81     g_return_if_fail(klass != nullptr);
82     GObjectClass *gobject_class = reinterpret_cast<GObjectClass *>(klass);
83     GstElementClass *gstelement_class = reinterpret_cast<GstElementClass *>(klass);
84     GstBaseSrcClass *gstbasesrc_class = reinterpret_cast<GstBaseSrcClass *>(klass);
85     GstMemSrcClass *gstmemsrc_class = reinterpret_cast<GstMemSrcClass *>(klass);
86     GST_DEBUG_CATEGORY_INIT(gst_shmem_src_debug_category, "shmempoolsrc", 0, "shmem pool src base class");
87     gobject_class->finalize = gst_shmem_src_finalize;
88     gobject_class->dispose = gst_mem_src_dispose;
89     gstelement_class->change_state = gst_shmem_src_change_state;
90     gstelement_class->send_event = gst_shmem_src_send_event;
91     gstbasesrc_class->decide_allocation = gst_shmem_src_decide_allocation;
92     gstbasesrc_class->create = gst_shmem_src_create;
93     gstbasesrc_class->unlock = gst_shmem_src_unlock;
94     gstbasesrc_class->unlock_stop = gst_shmem_src_unlock_stop;
95     gstmemsrc_class->pull_buffer = gst_shmem_src_pull_buffer;
96     gstmemsrc_class->push_buffer = gst_shmem_src_push_buffer;
97     gst_element_class_set_static_metadata(gstelement_class,
98         "shmem mem source", "Source/shmem/Pool",
99         "Retrieve frame from shmem pool with raw data", "OpenHarmony");
100 
101     gst_element_class_add_static_pad_template(gstelement_class, &gst_src_template);
102 }
103 
gst_shmem_src_init(GstShmemSrc * shmemsrc)104 static void gst_shmem_src_init(GstShmemSrc *shmemsrc)
105 {
106     g_return_if_fail(shmemsrc != nullptr);
107     auto priv = reinterpret_cast<GstShmemSrcPrivate*>(gst_shmem_src_get_instance_private(shmemsrc));
108     g_return_if_fail(priv != nullptr);
109     shmemsrc->priv = priv;
110     g_rec_mutex_init(&shmemsrc->priv->shmem_lock);
111     shmemsrc->priv->shmem_task =
112         gst_task_new((GstTaskFunction)gst_shmem_src_loop, shmemsrc, nullptr);
113     gst_task_set_lock(shmemsrc->priv->shmem_task, &shmemsrc->priv->shmem_lock);
114     priv->task_start = FALSE;
115     g_mutex_init(&priv->priv_lock);
116     g_mutex_init(&priv->queue_lock);
117     g_cond_init(&priv->task_condition);
118     g_cond_init(&priv->queue_condition);
119     priv->available_buffer = nullptr;
120     priv->flushing = FALSE;
121     priv->eos = FALSE;
122     priv->unlock = FALSE;
123     priv->need_start_task = FALSE;
124     priv->queue = gst_queue_array_new(DEFAULT_QUEUE_SIZE);
125     priv->allocator = gst_shmem_allocator_new();
126     gst_allocation_params_init(&priv->allocParams);
127 }
128 
gst_shmem_src_flush_queue(GstShmemSrc * shmemsrc)129 static void gst_shmem_src_flush_queue(GstShmemSrc *shmemsrc)
130 {
131     g_return_if_fail(shmemsrc != nullptr && shmemsrc->priv != nullptr);
132     GstMiniObject *object = nullptr;
133     auto *priv = shmemsrc->priv;
134 
135     while (!gst_queue_array_is_empty(priv->queue)) {
136         object = reinterpret_cast<GstMiniObject*>(gst_queue_array_pop_head(priv->queue));
137         if (object) {
138             gst_mini_object_unref(object);
139         }
140     }
141 }
142 
gst_mem_src_dispose(GObject * object)143 static void gst_mem_src_dispose(GObject *object)
144 {
145     GstShmemSrc *shmemsrc = GST_SHMEM_SRC(object);
146     g_return_if_fail(shmemsrc != nullptr);
147     auto priv = shmemsrc->priv;
148     g_return_if_fail(priv != nullptr);
149     g_mutex_lock(&priv->queue_lock);
150     gst_shmem_src_flush_queue(shmemsrc);
151     g_mutex_unlock(&priv->queue_lock);
152 
153     g_mutex_lock(&priv->priv_lock);
154     if (priv->allocator) {
155         gst_object_unref(priv->allocator);
156         priv->allocator = nullptr;
157     }
158     if (priv->pool) {
159         gst_buffer_pool_set_active(priv->pool, FALSE);
160         gst_object_unref(priv->pool);
161         priv->pool = nullptr;
162     }
163     priv->av_shmem_pool = nullptr;
164     if (priv->shmem_task) {
165         g_object_unref(priv->shmem_task);
166         priv->shmem_task = nullptr;
167     }
168     g_mutex_unlock(&priv->priv_lock);
169 
170     G_OBJECT_CLASS(parent_class)->dispose(object);
171 }
172 
gst_shmem_src_finalize(GObject * object)173 static void gst_shmem_src_finalize(GObject *object)
174 {
175     GstShmemSrc *shmemsrc = GST_SHMEM_SRC(object);
176     g_return_if_fail(shmemsrc != nullptr);
177     auto priv = shmemsrc->priv;
178     g_return_if_fail(priv != nullptr);
179 
180     GST_DEBUG_OBJECT(object, "finalize");
181     g_rec_mutex_clear(&priv->shmem_lock);
182     g_mutex_clear(&priv->priv_lock);
183     g_mutex_clear(&priv->queue_lock);
184     g_cond_clear(&priv->task_condition);
185     g_cond_clear(&priv->queue_condition);
186     gst_queue_array_free(priv->queue);
187 }
188 
gst_shmem_src_task_need_wait(GstShmemSrc * shmemsrc)189 static gboolean gst_shmem_src_task_need_wait(GstShmemSrc *shmemsrc)
190 {
191     auto priv = shmemsrc->priv;
192 
193     if (!priv->task_start) {
194         return FALSE;
195     }
196 
197     // available_buffer not be used, need wait.
198     if ((priv->pool != nullptr) && (priv->available_buffer == nullptr)) {
199         return FALSE;
200     }
201 
202     return TRUE;
203 }
204 
gst_shmem_src_loop(GstShmemSrc * shmemsrc)205 static void gst_shmem_src_loop(GstShmemSrc *shmemsrc)
206 {
207     GST_DEBUG_OBJECT(shmemsrc, "Loop start");
208     g_return_if_fail(shmemsrc != nullptr && shmemsrc->priv != nullptr);
209     GstMemSrc *memsrc = GST_MEM_SRC(shmemsrc);
210     auto priv = shmemsrc->priv;
211 
212     g_mutex_lock(&priv->priv_lock);
213 
214     while (gst_shmem_src_task_need_wait(shmemsrc)) {
215         g_cond_wait(&priv->task_condition, &priv->priv_lock);
216     }
217 
218     if (!priv->task_start) {
219         g_mutex_unlock(&priv->priv_lock);
220         gst_task_pause(priv->shmem_task);
221         GST_DEBUG_OBJECT(shmemsrc, "Task exit");
222         return;
223     }
224 
225     GstBufferPool *pool = reinterpret_cast<GstBufferPool*>(gst_object_ref(shmemsrc->priv->pool));
226     g_mutex_unlock(&priv->priv_lock);
227 
228     GstBuffer *buffer = nullptr;
229     GST_DEBUG_OBJECT(shmemsrc, "Acquire buffer start");
230     GstFlowReturn ret = gst_buffer_pool_acquire_buffer(pool, &buffer, nullptr);
231     GST_DEBUG_OBJECT(shmemsrc, "Acquire buffer end");
232     gst_object_unref(pool);
233     pool = nullptr;
234     if (ret != GST_FLOW_OK) {
235         gst_buffer_unref(buffer);
236         gst_task_pause(priv->shmem_task);
237         GST_DEBUG_OBJECT(shmemsrc, "Task going to pause");
238         return;
239     }
240 
241     g_mutex_lock(&priv->priv_lock);
242     priv->available_buffer = buffer;
243     g_mutex_unlock(&priv->priv_lock);
244 
245     ret = gst_mem_src_buffer_available(memsrc);
246     if (ret != GST_FLOW_OK) {
247         GST_DEBUG_OBJECT(shmemsrc, "Task going to pause");
248         gst_task_pause(priv->shmem_task);
249     }
250 }
251 
gst_shmem_src_pull_buffer(GstMemSrc * memsrc)252 GstBuffer *gst_shmem_src_pull_buffer(GstMemSrc *memsrc)
253 {
254     GST_DEBUG_OBJECT(memsrc, "Pull buffer");
255     GstShmemSrc *shmemsrc = GST_SHMEM_SRC(memsrc);
256     g_return_val_if_fail(shmemsrc != nullptr && shmemsrc->priv != nullptr, nullptr);
257     auto priv = shmemsrc->priv;
258     g_mutex_lock(&priv->priv_lock);
259     auto buffer = priv->available_buffer;
260     priv->available_buffer = nullptr;
261     g_cond_signal(&priv->task_condition);
262     g_mutex_unlock(&priv->priv_lock);
263     return buffer;
264 }
265 
gst_shmem_src_push_buffer(GstMemSrc * memsrc,GstBuffer * buffer)266 GstFlowReturn gst_shmem_src_push_buffer(GstMemSrc *memsrc, GstBuffer *buffer)
267 {
268     GST_DEBUG_OBJECT(memsrc, "Push buffer ref %u", (reinterpret_cast<GObject*>(buffer)->ref_count));
269     GstShmemSrc *shmemsrc = GST_SHMEM_SRC(memsrc);
270     g_return_val_if_fail(shmemsrc != nullptr && shmemsrc->priv != nullptr, GST_FLOW_ERROR);
271     auto priv = shmemsrc->priv;
272     g_mutex_lock(&priv->queue_lock);
273     gst_queue_array_push_tail(priv->queue, buffer);
274     g_cond_signal(&priv->queue_condition);
275     g_mutex_unlock(&priv->queue_lock);
276     return GST_FLOW_OK;
277 }
278 
gst_shmem_src_start_task(GstShmemSrc * shmemsrc)279 static gboolean gst_shmem_src_start_task(GstShmemSrc *shmemsrc)
280 {
281     GST_INFO_OBJECT(shmemsrc, "Start task");
282     auto priv = shmemsrc->priv;
283     g_mutex_lock(&priv->priv_lock);
284     if (priv->pool) {
285         gst_buffer_pool_set_active(priv->pool, TRUE);
286     }
287     shmemsrc->priv->task_start = TRUE;
288     g_mutex_unlock(&priv->priv_lock);
289     gboolean ret = gst_task_start(shmemsrc->priv->shmem_task);
290     return ret;
291 }
292 
gst_shmem_src_pause_task(GstShmemSrc * shmemsrc)293 static gboolean gst_shmem_src_pause_task(GstShmemSrc *shmemsrc)
294 {
295     GST_INFO_OBJECT(shmemsrc, "Pause task");
296     auto priv = shmemsrc->priv;
297     g_mutex_lock(&priv->priv_lock);
298     shmemsrc->priv->task_start = FALSE;
299     g_cond_signal(&shmemsrc->priv->task_condition);
300     g_mutex_unlock(&priv->priv_lock);
301     gboolean ret = gst_task_pause(shmemsrc->priv->shmem_task);
302     return ret;
303 }
304 
gst_shmem_src_stop_task(GstShmemSrc * shmemsrc)305 static gboolean gst_shmem_src_stop_task(GstShmemSrc *shmemsrc)
306 {
307     GST_INFO_OBJECT(shmemsrc, "Stop task");
308     auto priv = shmemsrc->priv;
309     // stop will not failed
310     g_mutex_lock(&priv->priv_lock);
311     shmemsrc->priv->task_start = FALSE;
312     if (priv->pool != nullptr) {
313         // will set the pool at flushing state, the task loop will go to pause
314         gst_buffer_pool_set_active(priv->pool, FALSE);
315     }
316     g_cond_signal(&shmemsrc->priv->task_condition);
317     g_mutex_unlock(&priv->priv_lock);
318 
319     // ensure the task loop release the task lock, and go to pause
320     g_rec_mutex_lock(&priv->shmem_lock);
321     GST_INFO_OBJECT(shmemsrc, "got shmemlock");
322     g_rec_mutex_unlock(&priv->shmem_lock);
323 
324     (void)gst_task_stop(shmemsrc->priv->shmem_task);
325     gboolean ret = gst_task_join(shmemsrc->priv->shmem_task);
326     GST_INFO_OBJECT(shmemsrc, "Stop task Ok");
327     return ret;
328 }
329 
gst_shmem_src_change_state(GstElement * element,GstStateChange transition)330 static GstStateChangeReturn gst_shmem_src_change_state(GstElement *element, GstStateChange transition)
331 {
332     GstShmemSrc *shmemsrc = GST_SHMEM_SRC(element);
333     g_return_val_if_fail(shmemsrc != nullptr && shmemsrc->priv != nullptr, GST_STATE_CHANGE_FAILURE);
334     switch (transition) {
335         case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
336             gst_shmem_src_start_task(shmemsrc);
337             break;
338         default:
339             break;
340     }
341     GstStateChangeReturn ret = GST_ELEMENT_CLASS(parent_class)->change_state(element, transition);
342 
343     switch (transition) {
344         case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
345             gst_shmem_src_pause_task(shmemsrc);
346             break;
347         case GST_STATE_CHANGE_PAUSED_TO_READY:
348             gst_shmem_src_stop_task(shmemsrc);
349             break;
350         default:
351             break;
352     }
353 
354     return ret;
355 }
356 
gst_shmem_src_set_pool_flushing(GstShmemSrc * shmemsrc,gboolean flushing)357 static void gst_shmem_src_set_pool_flushing(GstShmemSrc *shmemsrc, gboolean flushing)
358 {
359     g_return_if_fail(shmemsrc != nullptr && shmemsrc->priv != nullptr);
360     auto priv = shmemsrc->priv;
361     if (shmemsrc->priv->pool == nullptr) {
362         return;
363     }
364 
365     g_mutex_lock(&priv->priv_lock);
366     GstBufferPool *pool = reinterpret_cast<GstBufferPool*>(gst_object_ref(shmemsrc->priv->pool));
367     g_mutex_unlock(&priv->priv_lock);
368 
369     if (pool != nullptr) {
370         gst_buffer_pool_set_flushing(pool, flushing);
371         gst_object_unref(pool);
372     }
373 }
374 
gst_shmem_src_handle_eos_event(GstShmemSrc * shmemsrc)375 static gboolean gst_shmem_src_handle_eos_event(GstShmemSrc *shmemsrc)
376 {
377     // just mark eos to avoiding the basesrc to process the eos too early, otherwise
378     // the buffers in the queue will be dropped due to the eos event.
379 
380     auto priv = shmemsrc->priv;
381     // ensure that the task loop can be stopped to wait for buffer from pool.
382     gst_shmem_src_set_pool_flushing(shmemsrc, TRUE);
383     g_mutex_lock(&priv->priv_lock);
384     shmemsrc->priv->task_start = FALSE;
385     g_mutex_unlock(&priv->priv_lock);
386     gst_task_pause(shmemsrc->priv->shmem_task);
387 
388     // ensure the task loop release the task lock, and go to pause
389     g_rec_mutex_lock(&priv->shmem_lock);
390     GST_INFO_OBJECT(shmemsrc, "got shmemlock");
391     gst_shmem_src_set_pool_flushing(shmemsrc, FALSE);
392     g_rec_mutex_unlock(&priv->shmem_lock);
393 
394     g_mutex_lock(&priv->queue_lock);
395     priv->eos = TRUE;
396     g_cond_signal(&priv->queue_condition);
397     g_mutex_unlock(&priv->queue_lock);
398 
399     return TRUE;
400 }
401 
gst_shmem_src_handle_flush_start(GstShmemSrc * shmemsrc)402 static gboolean gst_shmem_src_handle_flush_start(GstShmemSrc *shmemsrc)
403 {
404     auto priv = shmemsrc->priv;
405     g_mutex_lock(&priv->queue_lock);
406     priv->flushing = TRUE;
407     priv->eos = FALSE;
408     g_cond_signal(&priv->queue_condition);
409     g_mutex_unlock(&priv->queue_lock);
410 
411     GstState state = GST_STATE_NULL;
412     GstState pending = GST_STATE_VOID_PENDING;
413     GstStateChangeReturn ret = gst_element_get_state(GST_ELEMENT_CAST(shmemsrc), &state, &pending, 0);
414     g_return_val_if_fail(ret != GST_STATE_CHANGE_FAILURE, FALSE);
415     if (state == GST_STATE_PLAYING) {
416         shmemsrc->priv->need_start_task = TRUE;
417     }
418 
419     gst_shmem_src_set_pool_flushing(shmemsrc, TRUE);
420     return TRUE;
421 }
422 
gst_shmem_src_handle_flush_stop(GstShmemSrc * shmemsrc)423 static gboolean gst_shmem_src_handle_flush_stop(GstShmemSrc *shmemsrc)
424 {
425     auto priv = shmemsrc->priv;
426     g_mutex_lock(&priv->queue_lock);
427     priv->flushing = FALSE;
428     gst_shmem_src_flush_queue(shmemsrc);
429     g_mutex_unlock(&priv->queue_lock);
430 
431     // ensure the task loop release the task lock, and go to pause
432     g_rec_mutex_lock(&priv->shmem_lock);
433     GST_INFO_OBJECT(shmemsrc, "got shmemlock");
434     gst_shmem_src_set_pool_flushing(shmemsrc, FALSE);
435     g_rec_mutex_unlock(&priv->shmem_lock);
436 
437     g_mutex_lock(&priv->priv_lock);
438     shmemsrc->priv->task_start = TRUE;
439     g_mutex_unlock(&priv->priv_lock);
440     if (shmemsrc->priv->need_start_task == TRUE) {
441         gst_task_start(shmemsrc->priv->shmem_task);
442         shmemsrc->priv->need_start_task = FALSE;
443     }
444 
445     return TRUE;
446 }
447 
gst_shmem_src_unlock(GstBaseSrc * src)448 static gboolean gst_shmem_src_unlock(GstBaseSrc *src)
449 {
450     GstShmemSrc *shmemsrc = GST_SHMEM_SRC(src);
451     g_return_val_if_fail(shmemsrc != nullptr && shmemsrc->priv != nullptr, FALSE);
452 
453     GST_DEBUG_OBJECT(shmemsrc, "unblock...");
454 
455     auto priv = shmemsrc->priv;
456     g_mutex_lock(&priv->queue_lock);
457     priv->unlock = TRUE;
458     g_cond_signal(&priv->queue_condition);
459     g_mutex_unlock(&priv->queue_lock);
460 
461     return TRUE;
462 }
463 
gst_shmem_src_unlock_stop(GstBaseSrc * src)464 static gboolean gst_shmem_src_unlock_stop(GstBaseSrc *src)
465 {
466     GstShmemSrc *shmemsrc = GST_SHMEM_SRC(src);
467     g_return_val_if_fail(shmemsrc != nullptr && shmemsrc->priv != nullptr, FALSE);
468 
469     GST_DEBUG_OBJECT(shmemsrc, "unblock stop...");
470 
471     auto priv = shmemsrc->priv;
472     g_mutex_lock(&priv->queue_lock);
473     priv->unlock = FALSE;
474     g_cond_signal(&priv->queue_condition);
475     g_mutex_unlock(&priv->queue_lock);
476 
477     return TRUE;
478 }
479 
gst_shmem_src_send_event(GstElement * element,GstEvent * event)480 static gboolean gst_shmem_src_send_event(GstElement *element, GstEvent *event)
481 {
482     GstShmemSrc *shmemsrc = GST_SHMEM_SRC(element);
483     g_return_val_if_fail(shmemsrc != nullptr && shmemsrc->priv != nullptr, FALSE);
484     g_return_val_if_fail(event != nullptr, FALSE);
485     GST_DEBUG_OBJECT(shmemsrc, "New event %s", GST_EVENT_TYPE_NAME(event));
486 
487     switch (GST_EVENT_TYPE(event)) {
488         case GST_EVENT_FLUSH_START:
489             (void)gst_shmem_src_handle_flush_start(shmemsrc);
490             break;
491         case GST_EVENT_FLUSH_STOP:
492             (void)gst_shmem_src_handle_flush_stop(shmemsrc);
493             break;
494         case GST_EVENT_EOS:
495             return gst_shmem_src_handle_eos_event(shmemsrc);
496         default:
497             break;
498     }
499 
500     return GST_ELEMENT_CLASS(parent_class)->send_event(element, event);
501 }
502 
gst_shmem_src_create(GstBaseSrc * src,guint64 offset,guint size,GstBuffer ** buffer)503 static GstFlowReturn gst_shmem_src_create(GstBaseSrc *src, guint64 offset, guint size, GstBuffer **buffer)
504 {
505     GST_DEBUG_OBJECT(src, "Source create");
506     (void)offset;
507     (void)size;
508     GstShmemSrc *shmemsrc = GST_SHMEM_SRC(src);
509     g_return_val_if_fail(shmemsrc != nullptr && shmemsrc->priv != nullptr, GST_FLOW_ERROR);
510     g_return_val_if_fail(buffer != nullptr, GST_FLOW_ERROR);
511     auto priv = shmemsrc->priv;
512     g_mutex_lock(&priv->queue_lock);
513     while (gst_queue_array_is_empty(priv->queue) && !priv->flushing && !priv->eos && !priv->unlock) {
514         g_cond_wait(&priv->queue_condition, &priv->queue_lock);
515     }
516     if (priv->unlock) {
517         g_mutex_unlock(&priv->queue_lock);
518         GST_DEBUG_OBJECT(src, "Source unlock");
519         return GST_FLOW_FLUSHING;
520     }
521     if (priv->flushing) {
522         g_mutex_unlock(&priv->queue_lock);
523         GST_DEBUG_OBJECT(src, "Source flushing");
524         return GST_FLOW_FLUSHING;
525     }
526     if (gst_queue_array_is_empty(priv->queue) && priv->eos) {
527         g_mutex_unlock(&priv->queue_lock);
528         GST_DEBUG_OBJECT(src, "Source Eos");
529         return GST_FLOW_EOS;
530     }
531     if (!gst_queue_array_is_empty(priv->queue)) {
532         GstMiniObject *obj = reinterpret_cast<GstMiniObject*>(gst_queue_array_pop_head(priv->queue));
533         if (GST_IS_BUFFER(obj)) {
534             *buffer = GST_BUFFER(obj);
535             gsize buf_size = gst_buffer_get_size(*buffer);
536             GST_DEBUG_OBJECT(shmemsrc, "get buffer of size %" G_GSIZE_FORMAT "", buf_size);
537         }
538     }
539     g_mutex_unlock(&priv->queue_lock);
540     return GST_FLOW_OK;
541 }
542 
gst_shmem_src_new_shmem_pool(GstShmemSrc * shmemsrc,GstCaps * caps,guint size,guint buffer_cnt,gboolean is_video)543 static GstBufferPool *gst_shmem_src_new_shmem_pool(GstShmemSrc *shmemsrc, GstCaps *caps,
544     guint size, guint buffer_cnt, gboolean is_video)
545 {
546     g_return_val_if_fail(shmemsrc != nullptr, nullptr);
547     g_return_val_if_fail(shmemsrc->priv != nullptr, nullptr);
548     g_return_val_if_fail(caps != nullptr, nullptr);
549     g_return_val_if_fail(shmemsrc->priv->allocator != nullptr, nullptr);
550     auto priv = shmemsrc->priv;
551     GstShMemPool *pool = gst_shmem_pool_new();
552     g_return_val_if_fail(pool != nullptr, nullptr);
553     ON_SCOPE_EXIT(0) { gst_object_unref(pool); };
554     priv->av_shmem_pool = std::make_shared<OHOS::Media::AVSharedMemoryPool>("shmemsrc");
555     (void)gst_shmem_pool_set_avshmempool(pool, priv->av_shmem_pool);
556     (void)gst_shmem_allocator_set_pool(priv->allocator, priv->av_shmem_pool);
557     GstStructure *config = gst_buffer_pool_get_config(GST_BUFFER_POOL_CAST(pool));
558     g_return_val_if_fail(config != nullptr, nullptr);
559     if (priv->allocator == nullptr) {
560         GST_ERROR_OBJECT(shmemsrc, "Allocator is null");
561     }
562     if (is_video) {
563         gst_buffer_pool_config_add_option(config, GST_BUFFER_POOL_OPTION_VIDEO_META);
564     }
565     gst_buffer_pool_config_set_allocator(config, GST_ALLOCATOR_CAST(priv->allocator), &priv->allocParams);
566     gst_buffer_pool_config_set_params(config, caps, size, buffer_cnt, buffer_cnt);
567     g_return_val_if_fail(gst_buffer_pool_set_config(GST_BUFFER_POOL_CAST(pool), config), nullptr);
568     CANCEL_SCOPE_EXIT_GUARD(0);
569     return GST_BUFFER_POOL(pool);
570 }
571 
gst_shmem_src_set_shmem_pool(GstShmemSrc * shmemsrc,GstBufferPool * pool,GstQuery * query,guint buffer_cnt)572 static gboolean gst_shmem_src_set_shmem_pool(GstShmemSrc *shmemsrc, GstBufferPool *pool,
573     GstQuery *query, guint buffer_cnt)
574 {
575     g_return_val_if_fail(shmemsrc != nullptr && shmemsrc->priv != nullptr, FALSE);
576     g_return_val_if_fail(query != nullptr, FALSE);
577     GstMemSrc *memsrc = GST_MEM_SRC(shmemsrc);
578     GstCaps *outcaps = nullptr;
579     auto priv = shmemsrc->priv;
580     // buffer size default is buffer_size
581     guint size = memsrc->buffer_size;
582     // get caps and save to video info
583     gst_query_parse_allocation(query, &outcaps, nullptr);
584     gboolean is_video = gst_query_find_allocation_meta(query, GST_VIDEO_META_API_TYPE, nullptr);
585     if (is_video) {
586         // when video need update size
587         GstVideoInfo info;
588         gst_video_info_init(&info);
589         gst_video_info_from_caps(&info, outcaps);
590         GST_INFO_OBJECT(shmemsrc, "raw video size changed");
591         size = info.size;
592     }
593     if (pool == nullptr) {
594         pool = gst_shmem_src_new_shmem_pool(shmemsrc, outcaps, size, buffer_cnt, is_video);
595         g_return_val_if_fail(pool != nullptr, FALSE);
596     }
597     g_mutex_lock(&priv->priv_lock);
598     if (priv->pool) {
599         gst_buffer_pool_set_active(priv->pool, FALSE);
600         gst_object_unref(priv->pool);
601     }
602     priv->pool = pool;
603     gst_buffer_pool_set_active(priv->pool, TRUE);
604     g_cond_signal(&priv->task_condition);
605     g_mutex_unlock(&priv->priv_lock);
606     return TRUE;
607 }
608 
gst_shmem_src_decide_allocation(GstBaseSrc * basesrc,GstQuery * query)609 static gboolean gst_shmem_src_decide_allocation(GstBaseSrc *basesrc, GstQuery *query)
610 {
611     GstShmemSrc *shmemsrc = GST_SHMEM_SRC(basesrc);
612     g_return_val_if_fail(basesrc != nullptr && query != nullptr, FALSE);
613     GstMemSrc *memsrc = GST_MEM_SRC(basesrc);
614     g_return_val_if_fail(memsrc != nullptr, FALSE);
615     GstBufferPool *pool = nullptr;
616     guint size = 0;
617     guint min_buf = memsrc->buffer_num;
618     guint max_buf = memsrc->buffer_num;
619     GST_DEBUG_OBJECT(shmemsrc, "buffer_num: %u", memsrc->buffer_num);
620 
621     // get pool and pool info from down stream
622     if (gst_query_get_n_allocation_pools(query) > 0) {
623         gst_query_parse_nth_allocation_pool(query, 0, &pool, &size, &min_buf, &max_buf);
624     }
625     // when the pool does not have GST_BUFFER_TYPE_META_API_TYPE, we use our own pool
626     if (pool != nullptr) {
627         gst_query_set_nth_allocation_pool(query, 0, nullptr, 0, 0, 0);
628         if (!gst_query_find_allocation_meta(query, GST_BUFFER_TYPE_META_API_TYPE, nullptr)) {
629             gst_object_unref(pool);
630             pool = nullptr;
631         }
632     }
633     if (min_buf > max_buf) {
634         GST_WARNING_OBJECT(shmemsrc, "Change max_buf %u to min_buf %u", max_buf, min_buf);
635         max_buf = min_buf;
636     }
637     if (max_buf == 0) {
638         GST_INFO_OBJECT(shmemsrc, "Change max_buf 0 to default %u", DEFAULT_SHMEM_BUF_NUM);
639         max_buf = DEFAULT_SHMEM_BUF_NUM;
640     }
641     return gst_shmem_src_set_shmem_pool(shmemsrc, pool, query, max_buf);
642 }
643