1 /*
2 * Copyright (C) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "gst_shmem_src.h"
17 #include <gst/video/video.h>
18 #include <gst/base/base.h>
19 #include "media_errors.h"
20 #include "scope_guard.h"
21 #include "buffer_type_meta.h"
22 #include "gst_shmem_allocator.h"
23 #include "gst_shmem_pool.h"
24
25 #define gst_shmem_src_parent_class parent_class
26 using namespace OHOS;
27 namespace {
28 constexpr guint DEFAULT_SHMEM_BUF_NUM = 8;
29 constexpr guint DEFAULT_QUEUE_SIZE = 16;
30 }
31
32 GST_DEBUG_CATEGORY_STATIC(gst_shmem_src_debug_category);
33 #define GST_CAT_DEFAULT gst_shmem_src_debug_category
34
35 static GstStaticPadTemplate gst_src_template =
36 GST_STATIC_PAD_TEMPLATE("src",
37 GST_PAD_SRC,
38 GST_PAD_ALWAYS,
39 GST_STATIC_CAPS_ANY);
40
41 struct _GstShmemSrcPrivate {
42 GRecMutex shmem_lock;
43 GstTask *shmem_task;
44 gboolean task_start;
45 GstBufferPool *pool;
46 GMutex priv_lock;
47 GCond task_condition;
48 GstBuffer *available_buffer;
49 gboolean flushing;
50 GMutex queue_lock;
51 GCond queue_condition;
52 GstQueueArray *queue;
53 GstShMemAllocator *allocator;
54 GstAllocationParams allocParams;
55 std::shared_ptr<OHOS::Media::AVSharedMemoryPool> av_shmem_pool;
56 gboolean eos;
57 bool unlock;
58 gboolean need_start_task;
59 };
60
61 G_DEFINE_TYPE_WITH_PRIVATE(GstShmemSrc, gst_shmem_src, GST_TYPE_MEM_SRC);
62
63 static GstStateChangeReturn gst_shmem_src_change_state(GstElement *element, GstStateChange transition);
64 static gboolean gst_shmem_src_decide_allocation(GstBaseSrc *basesrc, GstQuery *query);
65 static gboolean gst_shmem_src_start_task(GstShmemSrc *shmemsrc);
66 static gboolean gst_shmem_src_pause_task(GstShmemSrc *shmemsrc);
67 static gboolean gst_shmem_src_stop_task(GstShmemSrc *shmemsrc);
68 static void gst_mem_src_dispose(GObject *object);
69 static void gst_shmem_src_finalize(GObject *object);
70 static void gst_shmem_src_flush_queue(GstShmemSrc *shmemsrc);
71 static gboolean gst_shmem_src_send_event(GstElement *element, GstEvent *event);
72 GstBuffer *gst_shmem_src_pull_buffer(GstMemSrc *memsrc);
73 GstFlowReturn gst_shmem_src_push_buffer(GstMemSrc *memsrc, GstBuffer *buffer);
74 static GstFlowReturn gst_shmem_src_create(GstBaseSrc *src, guint64 offset, guint size, GstBuffer **buffer);
75 static void gst_shmem_src_loop(GstShmemSrc *shmemsrc);
76 static gboolean gst_shmem_src_unlock(GstBaseSrc *src);
77 static gboolean gst_shmem_src_unlock_stop(GstBaseSrc *src);
78
gst_shmem_src_class_init(GstShmemSrcClass * klass)79 static void gst_shmem_src_class_init(GstShmemSrcClass *klass)
80 {
81 g_return_if_fail(klass != nullptr);
82 GObjectClass *gobject_class = reinterpret_cast<GObjectClass *>(klass);
83 GstElementClass *gstelement_class = reinterpret_cast<GstElementClass *>(klass);
84 GstBaseSrcClass *gstbasesrc_class = reinterpret_cast<GstBaseSrcClass *>(klass);
85 GstMemSrcClass *gstmemsrc_class = reinterpret_cast<GstMemSrcClass *>(klass);
86 GST_DEBUG_CATEGORY_INIT(gst_shmem_src_debug_category, "shmempoolsrc", 0, "shmem pool src base class");
87 gobject_class->finalize = gst_shmem_src_finalize;
88 gobject_class->dispose = gst_mem_src_dispose;
89 gstelement_class->change_state = gst_shmem_src_change_state;
90 gstelement_class->send_event = gst_shmem_src_send_event;
91 gstbasesrc_class->decide_allocation = gst_shmem_src_decide_allocation;
92 gstbasesrc_class->create = gst_shmem_src_create;
93 gstbasesrc_class->unlock = gst_shmem_src_unlock;
94 gstbasesrc_class->unlock_stop = gst_shmem_src_unlock_stop;
95 gstmemsrc_class->pull_buffer = gst_shmem_src_pull_buffer;
96 gstmemsrc_class->push_buffer = gst_shmem_src_push_buffer;
97 gst_element_class_set_static_metadata(gstelement_class,
98 "shmem mem source", "Source/shmem/Pool",
99 "Retrieve frame from shmem pool with raw data", "OpenHarmony");
100
101 gst_element_class_add_static_pad_template(gstelement_class, &gst_src_template);
102 }
103
gst_shmem_src_init(GstShmemSrc * shmemsrc)104 static void gst_shmem_src_init(GstShmemSrc *shmemsrc)
105 {
106 g_return_if_fail(shmemsrc != nullptr);
107 auto priv = reinterpret_cast<GstShmemSrcPrivate*>(gst_shmem_src_get_instance_private(shmemsrc));
108 g_return_if_fail(priv != nullptr);
109 shmemsrc->priv = priv;
110 g_rec_mutex_init(&shmemsrc->priv->shmem_lock);
111 shmemsrc->priv->shmem_task =
112 gst_task_new((GstTaskFunction)gst_shmem_src_loop, shmemsrc, nullptr);
113 gst_task_set_lock(shmemsrc->priv->shmem_task, &shmemsrc->priv->shmem_lock);
114 priv->task_start = FALSE;
115 g_mutex_init(&priv->priv_lock);
116 g_mutex_init(&priv->queue_lock);
117 g_cond_init(&priv->task_condition);
118 g_cond_init(&priv->queue_condition);
119 priv->available_buffer = nullptr;
120 priv->flushing = FALSE;
121 priv->eos = FALSE;
122 priv->unlock = FALSE;
123 priv->need_start_task = FALSE;
124 priv->queue = gst_queue_array_new(DEFAULT_QUEUE_SIZE);
125 priv->allocator = gst_shmem_allocator_new();
126 gst_allocation_params_init(&priv->allocParams);
127 }
128
gst_shmem_src_flush_queue(GstShmemSrc * shmemsrc)129 static void gst_shmem_src_flush_queue(GstShmemSrc *shmemsrc)
130 {
131 g_return_if_fail(shmemsrc != nullptr && shmemsrc->priv != nullptr);
132 GstMiniObject *object = nullptr;
133 auto *priv = shmemsrc->priv;
134
135 while (!gst_queue_array_is_empty(priv->queue)) {
136 object = reinterpret_cast<GstMiniObject*>(gst_queue_array_pop_head(priv->queue));
137 if (object) {
138 gst_mini_object_unref(object);
139 }
140 }
141 }
142
gst_mem_src_dispose(GObject * object)143 static void gst_mem_src_dispose(GObject *object)
144 {
145 GstShmemSrc *shmemsrc = GST_SHMEM_SRC(object);
146 g_return_if_fail(shmemsrc != nullptr);
147 auto priv = shmemsrc->priv;
148 g_return_if_fail(priv != nullptr);
149 g_mutex_lock(&priv->queue_lock);
150 gst_shmem_src_flush_queue(shmemsrc);
151 g_mutex_unlock(&priv->queue_lock);
152
153 g_mutex_lock(&priv->priv_lock);
154 if (priv->allocator) {
155 gst_object_unref(priv->allocator);
156 priv->allocator = nullptr;
157 }
158 if (priv->pool) {
159 gst_buffer_pool_set_active(priv->pool, FALSE);
160 gst_object_unref(priv->pool);
161 priv->pool = nullptr;
162 }
163 priv->av_shmem_pool = nullptr;
164 if (priv->shmem_task) {
165 g_object_unref(priv->shmem_task);
166 priv->shmem_task = nullptr;
167 }
168 g_mutex_unlock(&priv->priv_lock);
169
170 G_OBJECT_CLASS(parent_class)->dispose(object);
171 }
172
gst_shmem_src_finalize(GObject * object)173 static void gst_shmem_src_finalize(GObject *object)
174 {
175 GstShmemSrc *shmemsrc = GST_SHMEM_SRC(object);
176 g_return_if_fail(shmemsrc != nullptr);
177 auto priv = shmemsrc->priv;
178 g_return_if_fail(priv != nullptr);
179
180 GST_DEBUG_OBJECT(object, "finalize");
181 g_rec_mutex_clear(&priv->shmem_lock);
182 g_mutex_clear(&priv->priv_lock);
183 g_mutex_clear(&priv->queue_lock);
184 g_cond_clear(&priv->task_condition);
185 g_cond_clear(&priv->queue_condition);
186 gst_queue_array_free(priv->queue);
187 }
188
gst_shmem_src_task_need_wait(GstShmemSrc * shmemsrc)189 static gboolean gst_shmem_src_task_need_wait(GstShmemSrc *shmemsrc)
190 {
191 auto priv = shmemsrc->priv;
192
193 if (!priv->task_start) {
194 return FALSE;
195 }
196
197 // available_buffer not be used, need wait.
198 if ((priv->pool != nullptr) && (priv->available_buffer == nullptr)) {
199 return FALSE;
200 }
201
202 return TRUE;
203 }
204
gst_shmem_src_loop(GstShmemSrc * shmemsrc)205 static void gst_shmem_src_loop(GstShmemSrc *shmemsrc)
206 {
207 GST_DEBUG_OBJECT(shmemsrc, "Loop start");
208 g_return_if_fail(shmemsrc != nullptr && shmemsrc->priv != nullptr);
209 GstMemSrc *memsrc = GST_MEM_SRC(shmemsrc);
210 auto priv = shmemsrc->priv;
211
212 g_mutex_lock(&priv->priv_lock);
213
214 while (gst_shmem_src_task_need_wait(shmemsrc)) {
215 g_cond_wait(&priv->task_condition, &priv->priv_lock);
216 }
217
218 if (!priv->task_start) {
219 g_mutex_unlock(&priv->priv_lock);
220 gst_task_pause(priv->shmem_task);
221 GST_DEBUG_OBJECT(shmemsrc, "Task exit");
222 return;
223 }
224
225 GstBufferPool *pool = reinterpret_cast<GstBufferPool*>(gst_object_ref(shmemsrc->priv->pool));
226 g_mutex_unlock(&priv->priv_lock);
227
228 GstBuffer *buffer = nullptr;
229 GST_DEBUG_OBJECT(shmemsrc, "Acquire buffer start");
230 GstFlowReturn ret = gst_buffer_pool_acquire_buffer(pool, &buffer, nullptr);
231 GST_DEBUG_OBJECT(shmemsrc, "Acquire buffer end");
232 gst_object_unref(pool);
233 pool = nullptr;
234 if (ret != GST_FLOW_OK) {
235 gst_buffer_unref(buffer);
236 gst_task_pause(priv->shmem_task);
237 GST_DEBUG_OBJECT(shmemsrc, "Task going to pause");
238 return;
239 }
240
241 g_mutex_lock(&priv->priv_lock);
242 priv->available_buffer = buffer;
243 g_mutex_unlock(&priv->priv_lock);
244
245 ret = gst_mem_src_buffer_available(memsrc);
246 if (ret != GST_FLOW_OK) {
247 GST_DEBUG_OBJECT(shmemsrc, "Task going to pause");
248 gst_task_pause(priv->shmem_task);
249 }
250 }
251
gst_shmem_src_pull_buffer(GstMemSrc * memsrc)252 GstBuffer *gst_shmem_src_pull_buffer(GstMemSrc *memsrc)
253 {
254 GST_DEBUG_OBJECT(memsrc, "Pull buffer");
255 GstShmemSrc *shmemsrc = GST_SHMEM_SRC(memsrc);
256 g_return_val_if_fail(shmemsrc != nullptr && shmemsrc->priv != nullptr, nullptr);
257 auto priv = shmemsrc->priv;
258 g_mutex_lock(&priv->priv_lock);
259 auto buffer = priv->available_buffer;
260 priv->available_buffer = nullptr;
261 g_cond_signal(&priv->task_condition);
262 g_mutex_unlock(&priv->priv_lock);
263 return buffer;
264 }
265
gst_shmem_src_push_buffer(GstMemSrc * memsrc,GstBuffer * buffer)266 GstFlowReturn gst_shmem_src_push_buffer(GstMemSrc *memsrc, GstBuffer *buffer)
267 {
268 GST_DEBUG_OBJECT(memsrc, "Push buffer ref %u", (reinterpret_cast<GObject*>(buffer)->ref_count));
269 GstShmemSrc *shmemsrc = GST_SHMEM_SRC(memsrc);
270 g_return_val_if_fail(shmemsrc != nullptr && shmemsrc->priv != nullptr, GST_FLOW_ERROR);
271 auto priv = shmemsrc->priv;
272 g_mutex_lock(&priv->queue_lock);
273 gst_queue_array_push_tail(priv->queue, buffer);
274 g_cond_signal(&priv->queue_condition);
275 g_mutex_unlock(&priv->queue_lock);
276 return GST_FLOW_OK;
277 }
278
gst_shmem_src_start_task(GstShmemSrc * shmemsrc)279 static gboolean gst_shmem_src_start_task(GstShmemSrc *shmemsrc)
280 {
281 GST_INFO_OBJECT(shmemsrc, "Start task");
282 auto priv = shmemsrc->priv;
283 g_mutex_lock(&priv->priv_lock);
284 if (priv->pool) {
285 gst_buffer_pool_set_active(priv->pool, TRUE);
286 }
287 shmemsrc->priv->task_start = TRUE;
288 g_mutex_unlock(&priv->priv_lock);
289 gboolean ret = gst_task_start(shmemsrc->priv->shmem_task);
290 return ret;
291 }
292
gst_shmem_src_pause_task(GstShmemSrc * shmemsrc)293 static gboolean gst_shmem_src_pause_task(GstShmemSrc *shmemsrc)
294 {
295 GST_INFO_OBJECT(shmemsrc, "Pause task");
296 auto priv = shmemsrc->priv;
297 g_mutex_lock(&priv->priv_lock);
298 shmemsrc->priv->task_start = FALSE;
299 g_cond_signal(&shmemsrc->priv->task_condition);
300 g_mutex_unlock(&priv->priv_lock);
301 gboolean ret = gst_task_pause(shmemsrc->priv->shmem_task);
302 return ret;
303 }
304
gst_shmem_src_stop_task(GstShmemSrc * shmemsrc)305 static gboolean gst_shmem_src_stop_task(GstShmemSrc *shmemsrc)
306 {
307 GST_INFO_OBJECT(shmemsrc, "Stop task");
308 auto priv = shmemsrc->priv;
309 // stop will not failed
310 g_mutex_lock(&priv->priv_lock);
311 shmemsrc->priv->task_start = FALSE;
312 if (priv->pool != nullptr) {
313 // will set the pool at flushing state, the task loop will go to pause
314 gst_buffer_pool_set_active(priv->pool, FALSE);
315 }
316 g_cond_signal(&shmemsrc->priv->task_condition);
317 g_mutex_unlock(&priv->priv_lock);
318
319 // ensure the task loop release the task lock, and go to pause
320 g_rec_mutex_lock(&priv->shmem_lock);
321 GST_INFO_OBJECT(shmemsrc, "got shmemlock");
322 g_rec_mutex_unlock(&priv->shmem_lock);
323
324 (void)gst_task_stop(shmemsrc->priv->shmem_task);
325 gboolean ret = gst_task_join(shmemsrc->priv->shmem_task);
326 GST_INFO_OBJECT(shmemsrc, "Stop task Ok");
327 return ret;
328 }
329
gst_shmem_src_change_state(GstElement * element,GstStateChange transition)330 static GstStateChangeReturn gst_shmem_src_change_state(GstElement *element, GstStateChange transition)
331 {
332 GstShmemSrc *shmemsrc = GST_SHMEM_SRC(element);
333 g_return_val_if_fail(shmemsrc != nullptr && shmemsrc->priv != nullptr, GST_STATE_CHANGE_FAILURE);
334 switch (transition) {
335 case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
336 gst_shmem_src_start_task(shmemsrc);
337 break;
338 default:
339 break;
340 }
341 GstStateChangeReturn ret = GST_ELEMENT_CLASS(parent_class)->change_state(element, transition);
342
343 switch (transition) {
344 case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
345 gst_shmem_src_pause_task(shmemsrc);
346 break;
347 case GST_STATE_CHANGE_PAUSED_TO_READY:
348 gst_shmem_src_stop_task(shmemsrc);
349 break;
350 default:
351 break;
352 }
353
354 return ret;
355 }
356
gst_shmem_src_set_pool_flushing(GstShmemSrc * shmemsrc,gboolean flushing)357 static void gst_shmem_src_set_pool_flushing(GstShmemSrc *shmemsrc, gboolean flushing)
358 {
359 g_return_if_fail(shmemsrc != nullptr && shmemsrc->priv != nullptr);
360 auto priv = shmemsrc->priv;
361 if (shmemsrc->priv->pool == nullptr) {
362 return;
363 }
364
365 g_mutex_lock(&priv->priv_lock);
366 GstBufferPool *pool = reinterpret_cast<GstBufferPool*>(gst_object_ref(shmemsrc->priv->pool));
367 g_mutex_unlock(&priv->priv_lock);
368
369 if (pool != nullptr) {
370 gst_buffer_pool_set_flushing(pool, flushing);
371 gst_object_unref(pool);
372 }
373 }
374
gst_shmem_src_handle_eos_event(GstShmemSrc * shmemsrc)375 static gboolean gst_shmem_src_handle_eos_event(GstShmemSrc *shmemsrc)
376 {
377 // just mark eos to avoiding the basesrc to process the eos too early, otherwise
378 // the buffers in the queue will be dropped due to the eos event.
379
380 auto priv = shmemsrc->priv;
381 // ensure that the task loop can be stopped to wait for buffer from pool.
382 gst_shmem_src_set_pool_flushing(shmemsrc, TRUE);
383 g_mutex_lock(&priv->priv_lock);
384 shmemsrc->priv->task_start = FALSE;
385 g_mutex_unlock(&priv->priv_lock);
386 gst_task_pause(shmemsrc->priv->shmem_task);
387
388 // ensure the task loop release the task lock, and go to pause
389 g_rec_mutex_lock(&priv->shmem_lock);
390 GST_INFO_OBJECT(shmemsrc, "got shmemlock");
391 gst_shmem_src_set_pool_flushing(shmemsrc, FALSE);
392 g_rec_mutex_unlock(&priv->shmem_lock);
393
394 g_mutex_lock(&priv->queue_lock);
395 priv->eos = TRUE;
396 g_cond_signal(&priv->queue_condition);
397 g_mutex_unlock(&priv->queue_lock);
398
399 return TRUE;
400 }
401
gst_shmem_src_handle_flush_start(GstShmemSrc * shmemsrc)402 static gboolean gst_shmem_src_handle_flush_start(GstShmemSrc *shmemsrc)
403 {
404 auto priv = shmemsrc->priv;
405 g_mutex_lock(&priv->queue_lock);
406 priv->flushing = TRUE;
407 priv->eos = FALSE;
408 g_cond_signal(&priv->queue_condition);
409 g_mutex_unlock(&priv->queue_lock);
410
411 GstState state = GST_STATE_NULL;
412 GstState pending = GST_STATE_VOID_PENDING;
413 GstStateChangeReturn ret = gst_element_get_state(GST_ELEMENT_CAST(shmemsrc), &state, &pending, 0);
414 g_return_val_if_fail(ret != GST_STATE_CHANGE_FAILURE, FALSE);
415 if (state == GST_STATE_PLAYING) {
416 shmemsrc->priv->need_start_task = TRUE;
417 }
418
419 gst_shmem_src_set_pool_flushing(shmemsrc, TRUE);
420 return TRUE;
421 }
422
gst_shmem_src_handle_flush_stop(GstShmemSrc * shmemsrc)423 static gboolean gst_shmem_src_handle_flush_stop(GstShmemSrc *shmemsrc)
424 {
425 auto priv = shmemsrc->priv;
426 g_mutex_lock(&priv->queue_lock);
427 priv->flushing = FALSE;
428 gst_shmem_src_flush_queue(shmemsrc);
429 g_mutex_unlock(&priv->queue_lock);
430
431 // ensure the task loop release the task lock, and go to pause
432 g_rec_mutex_lock(&priv->shmem_lock);
433 GST_INFO_OBJECT(shmemsrc, "got shmemlock");
434 gst_shmem_src_set_pool_flushing(shmemsrc, FALSE);
435 g_rec_mutex_unlock(&priv->shmem_lock);
436
437 g_mutex_lock(&priv->priv_lock);
438 shmemsrc->priv->task_start = TRUE;
439 g_mutex_unlock(&priv->priv_lock);
440 if (shmemsrc->priv->need_start_task == TRUE) {
441 gst_task_start(shmemsrc->priv->shmem_task);
442 shmemsrc->priv->need_start_task = FALSE;
443 }
444
445 return TRUE;
446 }
447
gst_shmem_src_unlock(GstBaseSrc * src)448 static gboolean gst_shmem_src_unlock(GstBaseSrc *src)
449 {
450 GstShmemSrc *shmemsrc = GST_SHMEM_SRC(src);
451 g_return_val_if_fail(shmemsrc != nullptr && shmemsrc->priv != nullptr, FALSE);
452
453 GST_DEBUG_OBJECT(shmemsrc, "unblock...");
454
455 auto priv = shmemsrc->priv;
456 g_mutex_lock(&priv->queue_lock);
457 priv->unlock = TRUE;
458 g_cond_signal(&priv->queue_condition);
459 g_mutex_unlock(&priv->queue_lock);
460
461 return TRUE;
462 }
463
gst_shmem_src_unlock_stop(GstBaseSrc * src)464 static gboolean gst_shmem_src_unlock_stop(GstBaseSrc *src)
465 {
466 GstShmemSrc *shmemsrc = GST_SHMEM_SRC(src);
467 g_return_val_if_fail(shmemsrc != nullptr && shmemsrc->priv != nullptr, FALSE);
468
469 GST_DEBUG_OBJECT(shmemsrc, "unblock stop...");
470
471 auto priv = shmemsrc->priv;
472 g_mutex_lock(&priv->queue_lock);
473 priv->unlock = FALSE;
474 g_cond_signal(&priv->queue_condition);
475 g_mutex_unlock(&priv->queue_lock);
476
477 return TRUE;
478 }
479
gst_shmem_src_send_event(GstElement * element,GstEvent * event)480 static gboolean gst_shmem_src_send_event(GstElement *element, GstEvent *event)
481 {
482 GstShmemSrc *shmemsrc = GST_SHMEM_SRC(element);
483 g_return_val_if_fail(shmemsrc != nullptr && shmemsrc->priv != nullptr, FALSE);
484 g_return_val_if_fail(event != nullptr, FALSE);
485 GST_DEBUG_OBJECT(shmemsrc, "New event %s", GST_EVENT_TYPE_NAME(event));
486
487 switch (GST_EVENT_TYPE(event)) {
488 case GST_EVENT_FLUSH_START:
489 (void)gst_shmem_src_handle_flush_start(shmemsrc);
490 break;
491 case GST_EVENT_FLUSH_STOP:
492 (void)gst_shmem_src_handle_flush_stop(shmemsrc);
493 break;
494 case GST_EVENT_EOS:
495 return gst_shmem_src_handle_eos_event(shmemsrc);
496 default:
497 break;
498 }
499
500 return GST_ELEMENT_CLASS(parent_class)->send_event(element, event);
501 }
502
gst_shmem_src_create(GstBaseSrc * src,guint64 offset,guint size,GstBuffer ** buffer)503 static GstFlowReturn gst_shmem_src_create(GstBaseSrc *src, guint64 offset, guint size, GstBuffer **buffer)
504 {
505 GST_DEBUG_OBJECT(src, "Source create");
506 (void)offset;
507 (void)size;
508 GstShmemSrc *shmemsrc = GST_SHMEM_SRC(src);
509 g_return_val_if_fail(shmemsrc != nullptr && shmemsrc->priv != nullptr, GST_FLOW_ERROR);
510 g_return_val_if_fail(buffer != nullptr, GST_FLOW_ERROR);
511 auto priv = shmemsrc->priv;
512 g_mutex_lock(&priv->queue_lock);
513 while (gst_queue_array_is_empty(priv->queue) && !priv->flushing && !priv->eos && !priv->unlock) {
514 g_cond_wait(&priv->queue_condition, &priv->queue_lock);
515 }
516 if (priv->unlock) {
517 g_mutex_unlock(&priv->queue_lock);
518 GST_DEBUG_OBJECT(src, "Source unlock");
519 return GST_FLOW_FLUSHING;
520 }
521 if (priv->flushing) {
522 g_mutex_unlock(&priv->queue_lock);
523 GST_DEBUG_OBJECT(src, "Source flushing");
524 return GST_FLOW_FLUSHING;
525 }
526 if (gst_queue_array_is_empty(priv->queue) && priv->eos) {
527 g_mutex_unlock(&priv->queue_lock);
528 GST_DEBUG_OBJECT(src, "Source Eos");
529 return GST_FLOW_EOS;
530 }
531 if (!gst_queue_array_is_empty(priv->queue)) {
532 GstMiniObject *obj = reinterpret_cast<GstMiniObject*>(gst_queue_array_pop_head(priv->queue));
533 if (GST_IS_BUFFER(obj)) {
534 *buffer = GST_BUFFER(obj);
535 gsize buf_size = gst_buffer_get_size(*buffer);
536 GST_DEBUG_OBJECT(shmemsrc, "get buffer of size %" G_GSIZE_FORMAT "", buf_size);
537 }
538 }
539 g_mutex_unlock(&priv->queue_lock);
540 return GST_FLOW_OK;
541 }
542
gst_shmem_src_new_shmem_pool(GstShmemSrc * shmemsrc,GstCaps * caps,guint size,guint buffer_cnt,gboolean is_video)543 static GstBufferPool *gst_shmem_src_new_shmem_pool(GstShmemSrc *shmemsrc, GstCaps *caps,
544 guint size, guint buffer_cnt, gboolean is_video)
545 {
546 g_return_val_if_fail(shmemsrc != nullptr, nullptr);
547 g_return_val_if_fail(shmemsrc->priv != nullptr, nullptr);
548 g_return_val_if_fail(caps != nullptr, nullptr);
549 g_return_val_if_fail(shmemsrc->priv->allocator != nullptr, nullptr);
550 auto priv = shmemsrc->priv;
551 GstShMemPool *pool = gst_shmem_pool_new();
552 g_return_val_if_fail(pool != nullptr, nullptr);
553 ON_SCOPE_EXIT(0) { gst_object_unref(pool); };
554 priv->av_shmem_pool = std::make_shared<OHOS::Media::AVSharedMemoryPool>("shmemsrc");
555 (void)gst_shmem_pool_set_avshmempool(pool, priv->av_shmem_pool);
556 (void)gst_shmem_allocator_set_pool(priv->allocator, priv->av_shmem_pool);
557 GstStructure *config = gst_buffer_pool_get_config(GST_BUFFER_POOL_CAST(pool));
558 g_return_val_if_fail(config != nullptr, nullptr);
559 if (priv->allocator == nullptr) {
560 GST_ERROR_OBJECT(shmemsrc, "Allocator is null");
561 }
562 if (is_video) {
563 gst_buffer_pool_config_add_option(config, GST_BUFFER_POOL_OPTION_VIDEO_META);
564 }
565 gst_buffer_pool_config_set_allocator(config, GST_ALLOCATOR_CAST(priv->allocator), &priv->allocParams);
566 gst_buffer_pool_config_set_params(config, caps, size, buffer_cnt, buffer_cnt);
567 g_return_val_if_fail(gst_buffer_pool_set_config(GST_BUFFER_POOL_CAST(pool), config), nullptr);
568 CANCEL_SCOPE_EXIT_GUARD(0);
569 return GST_BUFFER_POOL(pool);
570 }
571
gst_shmem_src_set_shmem_pool(GstShmemSrc * shmemsrc,GstBufferPool * pool,GstQuery * query,guint buffer_cnt)572 static gboolean gst_shmem_src_set_shmem_pool(GstShmemSrc *shmemsrc, GstBufferPool *pool,
573 GstQuery *query, guint buffer_cnt)
574 {
575 g_return_val_if_fail(shmemsrc != nullptr && shmemsrc->priv != nullptr, FALSE);
576 g_return_val_if_fail(query != nullptr, FALSE);
577 GstMemSrc *memsrc = GST_MEM_SRC(shmemsrc);
578 GstCaps *outcaps = nullptr;
579 auto priv = shmemsrc->priv;
580 // buffer size default is buffer_size
581 guint size = memsrc->buffer_size;
582 // get caps and save to video info
583 gst_query_parse_allocation(query, &outcaps, nullptr);
584 gboolean is_video = gst_query_find_allocation_meta(query, GST_VIDEO_META_API_TYPE, nullptr);
585 if (is_video) {
586 // when video need update size
587 GstVideoInfo info;
588 gst_video_info_init(&info);
589 gst_video_info_from_caps(&info, outcaps);
590 GST_INFO_OBJECT(shmemsrc, "raw video size changed");
591 size = info.size;
592 }
593 if (pool == nullptr) {
594 pool = gst_shmem_src_new_shmem_pool(shmemsrc, outcaps, size, buffer_cnt, is_video);
595 g_return_val_if_fail(pool != nullptr, FALSE);
596 }
597 g_mutex_lock(&priv->priv_lock);
598 if (priv->pool) {
599 gst_buffer_pool_set_active(priv->pool, FALSE);
600 gst_object_unref(priv->pool);
601 }
602 priv->pool = pool;
603 gst_buffer_pool_set_active(priv->pool, TRUE);
604 g_cond_signal(&priv->task_condition);
605 g_mutex_unlock(&priv->priv_lock);
606 return TRUE;
607 }
608
gst_shmem_src_decide_allocation(GstBaseSrc * basesrc,GstQuery * query)609 static gboolean gst_shmem_src_decide_allocation(GstBaseSrc *basesrc, GstQuery *query)
610 {
611 GstShmemSrc *shmemsrc = GST_SHMEM_SRC(basesrc);
612 g_return_val_if_fail(basesrc != nullptr && query != nullptr, FALSE);
613 GstMemSrc *memsrc = GST_MEM_SRC(basesrc);
614 g_return_val_if_fail(memsrc != nullptr, FALSE);
615 GstBufferPool *pool = nullptr;
616 guint size = 0;
617 guint min_buf = memsrc->buffer_num;
618 guint max_buf = memsrc->buffer_num;
619 GST_DEBUG_OBJECT(shmemsrc, "buffer_num: %u", memsrc->buffer_num);
620
621 // get pool and pool info from down stream
622 if (gst_query_get_n_allocation_pools(query) > 0) {
623 gst_query_parse_nth_allocation_pool(query, 0, &pool, &size, &min_buf, &max_buf);
624 }
625 // when the pool does not have GST_BUFFER_TYPE_META_API_TYPE, we use our own pool
626 if (pool != nullptr) {
627 gst_query_set_nth_allocation_pool(query, 0, nullptr, 0, 0, 0);
628 if (!gst_query_find_allocation_meta(query, GST_BUFFER_TYPE_META_API_TYPE, nullptr)) {
629 gst_object_unref(pool);
630 pool = nullptr;
631 }
632 }
633 if (min_buf > max_buf) {
634 GST_WARNING_OBJECT(shmemsrc, "Change max_buf %u to min_buf %u", max_buf, min_buf);
635 max_buf = min_buf;
636 }
637 if (max_buf == 0) {
638 GST_INFO_OBJECT(shmemsrc, "Change max_buf 0 to default %u", DEFAULT_SHMEM_BUF_NUM);
639 max_buf = DEFAULT_SHMEM_BUF_NUM;
640 }
641 return gst_shmem_src_set_shmem_pool(shmemsrc, pool, query, max_buf);
642 }
643