1 /* GStreamer
2 * Copyright (C) 2005 Wim Taymans <wim@fluendo.com>
3 * Copyright (C) 2008 Mark Nauwelaerts <mnauw@users.sourceforge.net>
4 * Copyright (C) 2010 Sebastian Dröge <sebastian.droege@collabora.co.uk>
5 *
6 * gstcollectpads.c:
7 *
8 * This library is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Library General Public
10 * License as published by the Free Software Foundation; either
11 * version 2 of the License, or (at your option) any later version.
12 *
13 * This library is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 * Library General Public License for more details.
17 *
18 * You should have received a copy of the GNU Library General Public
19 * License along with this library; if not, write to the
20 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
21 * Boston, MA 02110-1301, USA.
22 */
23 /**
24 * SECTION:gstcollectpads
25 * @title: GstCollectPads
26 * @short_description: manages a set of pads that operate in collect mode
27 *
28 * Manages a set of pads that operate in collect mode. This means that control
29 * is given to the manager of this object when all pads have data.
30 *
31 * * Collectpads are created with gst_collect_pads_new(). A callback should then
32 * be installed with gst_collect_pads_set_function ().
33 *
 * * Pads are added to the collection with gst_collect_pads_add_pad() and
 *   removed from it with gst_collect_pads_remove_pad(). The pad has to be a
 *   sinkpad. When added,
36 * the chain, event and query functions of the pad are overridden. The
37 * element_private of the pad is used to store private information for the
38 * collectpads.
39 *
40 * * For each pad, data is queued in the _chain function or by
41 * performing a pull_range.
42 *
43 * * When data is queued on all pads in waiting mode, the callback function is called.
44 *
45 * * Data can be dequeued from the pad with the gst_collect_pads_pop() method.
46 * One can peek at the data with the gst_collect_pads_peek() function.
47 * These functions will return %NULL if the pad received an EOS event. When all
48 * pads return %NULL from a gst_collect_pads_peek(), the element can emit an EOS
49 * event itself.
50 *
51 * * Data can also be dequeued in byte units using the gst_collect_pads_available(),
52 * gst_collect_pads_read_buffer() and gst_collect_pads_flush() calls.
53 *
54 * * Elements should call gst_collect_pads_start() and gst_collect_pads_stop() in
55 * their state change functions to start and stop the processing of the collectpads.
56 * The gst_collect_pads_stop() call should be called before calling the parent
57 * element state change function in the PAUSED_TO_READY state change to ensure
58 * no pad is blocked and the element can finish streaming.
59 *
 * * gst_collect_pads_set_waiting() sets a pad to waiting or non-waiting mode.
 *   The CollectPads element does not wait for data to be collected on
 *   non-waiting pads, so these pads may, but need not, have data when the
 *   callback is called.
63 * All pads are in waiting mode by default.
64 *
65 */
66
67 #ifdef HAVE_CONFIG_H
68 # include "config.h"
69 #endif
70
71 #include <gst/gst_private.h>
72
73 #include "gstcollectpads.h"
74
75 #include "../../../gst/glib-compat-private.h"
76
77 GST_DEBUG_CATEGORY_STATIC (collect_pads_debug);
78 #define GST_CAT_DEFAULT collect_pads_debug
79
80 struct _GstCollectDataPrivate
81 {
82 /* refcounting for struct, and destroy callback */
83 GstCollectDataDestroyNotify destroy_notify;
84 gint refcount;
85 };
86
87 struct _GstCollectPadsPrivate
88 {
89 /* with LOCK and/or STREAM_LOCK */
90 gboolean started;
91
92 /* with STREAM_LOCK */
93 guint32 cookie; /* pad_list cookie */
94 guint numpads; /* number of pads in @data */
95 guint queuedpads; /* number of pads with a buffer */
96 guint eospads; /* number of pads that are EOS */
97 GstClockTime earliest_time; /* Current earliest time */
98 GstCollectData *earliest_data; /* Pad data for current earliest time */
99
100 /* with LOCK */
101 GSList *pad_list; /* list of GstCollectData* */
102 guint32 pad_cookie; /* updated cookie */
103
104 GstCollectPadsFunction func; /* function and user_data for callback */
105 gpointer user_data;
106 GstCollectPadsBufferFunction buffer_func; /* function and user_data for buffer callback */
107 gpointer buffer_user_data;
108 GstCollectPadsCompareFunction compare_func;
109 gpointer compare_user_data;
110 GstCollectPadsEventFunction event_func; /* function and data for event callback */
111 gpointer event_user_data;
112 GstCollectPadsQueryFunction query_func;
113 gpointer query_user_data;
114 GstCollectPadsClipFunction clip_func;
115 gpointer clip_user_data;
116 GstCollectPadsFlushFunction flush_func;
117 gpointer flush_user_data;
118
119 /* no other lock needed */
120 GMutex evt_lock; /* these make up sort of poor man's event signaling */
121 GCond evt_cond;
122 guint32 evt_cookie;
123
124 gboolean seeking;
125 gboolean pending_flush_start;
126 gboolean pending_flush_stop;
127 };
128
129 #define parent_class gst_collect_pads_parent_class
130 G_DEFINE_TYPE_WITH_PRIVATE (GstCollectPads, gst_collect_pads, GST_TYPE_OBJECT);
131
132 static void gst_collect_pads_clear (GstCollectPads * pads,
133 GstCollectData * data);
134 static GstFlowReturn gst_collect_pads_chain (GstPad * pad, GstObject * parent,
135 GstBuffer * buffer);
136 static gboolean gst_collect_pads_event (GstPad * pad, GstObject * parent,
137 GstEvent * event);
138 static gboolean gst_collect_pads_query (GstPad * pad, GstObject * parent,
139 GstQuery * query);
140 static void gst_collect_pads_finalize (GObject * object);
141 static GstFlowReturn gst_collect_pads_default_collected (GstCollectPads *
142 pads, gpointer user_data);
143 static gint gst_collect_pads_default_compare_func (GstCollectPads * pads,
144 GstCollectData * data1, GstClockTime timestamp1, GstCollectData * data2,
145 GstClockTime timestamp2, gpointer user_data);
146 static gboolean gst_collect_pads_recalculate_full (GstCollectPads * pads);
147 static void ref_data (GstCollectData * data);
148 static void unref_data (GstCollectData * data);
149
150 static gboolean gst_collect_pads_event_default_internal (GstCollectPads *
151 pads, GstCollectData * data, GstEvent * event, gpointer user_data);
152 static gboolean gst_collect_pads_query_default_internal (GstCollectPads *
153 pads, GstCollectData * data, GstQuery * query, gpointer user_data);
154
155
/* Some properties are protected by LOCK, others by STREAM_LOCK.
 * However, manipulating either of these partitions may require waking up a
 * thread blocked in _WAIT, so a separate (sort of) event, built from a mutex,
 * a condition and a cookie, is used to prevent races.
 * Alternative implementations are possible, e.g. some low-level re-implementing
 * of the 2 above locks to drop both of them atomically when going into _WAIT.
 *
 * In the macros below @cookie must be an lvalue: it holds the event cookie
 * last seen by the caller and is refreshed before the macro returns.
 * All macros take and release the event lock themselves.
 */
#define GST_COLLECT_PADS_GET_EVT_COND(pads) (&((GstCollectPads *) (pads))->priv->evt_cond)
#define GST_COLLECT_PADS_GET_EVT_LOCK(pads) (&((GstCollectPads *) (pads))->priv->evt_lock)

/* Blocks until the event cookie of @pads differs from @cookie. */
#define GST_COLLECT_PADS_EVT_WAIT(pads, cookie) G_STMT_START {               \
  g_mutex_lock (GST_COLLECT_PADS_GET_EVT_LOCK (pads));                        \
  /* should work unless a lot of event'ing and thread starvation */           \
  while ((cookie) == ((GstCollectPads *) (pads))->priv->evt_cookie)           \
    g_cond_wait (GST_COLLECT_PADS_GET_EVT_COND (pads),                        \
        GST_COLLECT_PADS_GET_EVT_LOCK (pads));                                \
  (cookie) = ((GstCollectPads *) (pads))->priv->evt_cookie;                   \
  g_mutex_unlock (GST_COLLECT_PADS_GET_EVT_LOCK (pads));                      \
} G_STMT_END

/* Like GST_COLLECT_PADS_EVT_WAIT, but gives up once @timeout (added to
 * g_get_monotonic_time (), i.e. microseconds) has elapsed.
 * g_cond_wait_until () returns FALSE when the deadline has passed; the loop
 * must stop in that case, otherwise it would spin on the expired deadline
 * until the cookie changes and the wait would never time out. */
#define GST_COLLECT_PADS_EVT_WAIT_TIMED(pads, cookie, timeout) G_STMT_START { \
  gint64 evt_wait_end_time_ = g_get_monotonic_time () + (timeout);            \
                                                                              \
  g_mutex_lock (GST_COLLECT_PADS_GET_EVT_LOCK (pads));                        \
  /* should work unless a lot of event'ing and thread starvation */           \
  while ((cookie) == ((GstCollectPads *) (pads))->priv->evt_cookie) {         \
    if (!g_cond_wait_until (GST_COLLECT_PADS_GET_EVT_COND (pads),             \
            GST_COLLECT_PADS_GET_EVT_LOCK (pads), evt_wait_end_time_))        \
      break;                                                                  \
  }                                                                           \
  (cookie) = ((GstCollectPads *) (pads))->priv->evt_cookie;                   \
  g_mutex_unlock (GST_COLLECT_PADS_GET_EVT_LOCK (pads));                      \
} G_STMT_END

/* Bumps the event cookie of @pads and wakes up every waiter. */
#define GST_COLLECT_PADS_EVT_BROADCAST(pads) G_STMT_START {                  \
  g_mutex_lock (GST_COLLECT_PADS_GET_EVT_LOCK (pads));                        \
  /* never mind wrap-around */                                                \
  ++(((GstCollectPads *) (pads))->priv->evt_cookie);                          \
  g_cond_broadcast (GST_COLLECT_PADS_GET_EVT_COND (pads));                    \
  g_mutex_unlock (GST_COLLECT_PADS_GET_EVT_LOCK (pads));                      \
} G_STMT_END

/* Loads the current event cookie into @cookie.
 * NOTE: this macro takes no pads argument; it relies on a variable named
 * `pads` being in scope at the point of use. */
#define GST_COLLECT_PADS_EVT_INIT(cookie) G_STMT_START {                     \
  g_mutex_lock (GST_COLLECT_PADS_GET_EVT_LOCK (pads));                        \
  (cookie) = ((GstCollectPads *) (pads))->priv->evt_cookie;                   \
  g_mutex_unlock (GST_COLLECT_PADS_GET_EVT_LOCK (pads));                      \
} G_STMT_END
196
197 static void
gst_collect_pads_class_init(GstCollectPadsClass * klass)198 gst_collect_pads_class_init (GstCollectPadsClass * klass)
199 {
200 GObjectClass *gobject_class = (GObjectClass *) klass;
201
202 GST_DEBUG_CATEGORY_INIT (collect_pads_debug, "collectpads", 0,
203 "GstCollectPads");
204
205 gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_collect_pads_finalize);
206 }
207
208 static void
gst_collect_pads_init(GstCollectPads * pads)209 gst_collect_pads_init (GstCollectPads * pads)
210 {
211 pads->priv = gst_collect_pads_get_instance_private (pads);
212
213 pads->data = NULL;
214 pads->priv->cookie = 0;
215 pads->priv->numpads = 0;
216 pads->priv->queuedpads = 0;
217 pads->priv->eospads = 0;
218 pads->priv->started = FALSE;
219
220 g_rec_mutex_init (&pads->stream_lock);
221
222 pads->priv->func = gst_collect_pads_default_collected;
223 pads->priv->user_data = NULL;
224 pads->priv->event_func = NULL;
225 pads->priv->event_user_data = NULL;
226
227 /* members for default muxing */
228 pads->priv->buffer_func = NULL;
229 pads->priv->buffer_user_data = NULL;
230 pads->priv->compare_func = gst_collect_pads_default_compare_func;
231 pads->priv->compare_user_data = NULL;
232 pads->priv->earliest_data = NULL;
233 pads->priv->earliest_time = GST_CLOCK_TIME_NONE;
234
235 pads->priv->event_func = gst_collect_pads_event_default_internal;
236 pads->priv->query_func = gst_collect_pads_query_default_internal;
237
238 /* members to manage the pad list */
239 pads->priv->pad_cookie = 0;
240 pads->priv->pad_list = NULL;
241
242 /* members for event */
243 g_mutex_init (&pads->priv->evt_lock);
244 g_cond_init (&pads->priv->evt_cond);
245 pads->priv->evt_cookie = 0;
246
247 pads->priv->seeking = FALSE;
248 pads->priv->pending_flush_start = FALSE;
249 pads->priv->pending_flush_stop = FALSE;
250 }
251
252 static void
gst_collect_pads_finalize(GObject * object)253 gst_collect_pads_finalize (GObject * object)
254 {
255 GstCollectPads *pads = GST_COLLECT_PADS (object);
256
257 GST_DEBUG_OBJECT (object, "finalize");
258
259 g_rec_mutex_clear (&pads->stream_lock);
260
261 g_cond_clear (&pads->priv->evt_cond);
262 g_mutex_clear (&pads->priv->evt_lock);
263
264 /* Remove pads and free pads list */
265 g_slist_foreach (pads->priv->pad_list, (GFunc) unref_data, NULL);
266 g_slist_foreach (pads->data, (GFunc) unref_data, NULL);
267 g_slist_free (pads->data);
268 g_slist_free (pads->priv->pad_list);
269
270 G_OBJECT_CLASS (parent_class)->finalize (object);
271 }
272
273 /**
274 * gst_collect_pads_new:
275 *
276 * Create a new instance of #GstCollectPads.
277 *
278 * MT safe.
279 *
280 * Returns: (transfer full): a new #GstCollectPads, or %NULL in case of an error.
281 */
282 GstCollectPads *
gst_collect_pads_new(void)283 gst_collect_pads_new (void)
284 {
285 GstCollectPads *newcoll;
286
287 newcoll = g_object_new (GST_TYPE_COLLECT_PADS, NULL);
288
289 /* clear floating flag */
290 gst_object_ref_sink (newcoll);
291
292 return newcoll;
293 }
294
295 /* Must be called with GstObject lock! */
296 static void
gst_collect_pads_set_buffer_function_locked(GstCollectPads * pads,GstCollectPadsBufferFunction func,gpointer user_data)297 gst_collect_pads_set_buffer_function_locked (GstCollectPads * pads,
298 GstCollectPadsBufferFunction func, gpointer user_data)
299 {
300 pads->priv->buffer_func = func;
301 pads->priv->buffer_user_data = user_data;
302 }
303
304 /**
305 * gst_collect_pads_set_buffer_function:
306 * @pads: the collectpads to use
307 * @func: (scope call): the function to set
308 * @user_data: (closure): user data passed to the function
309 *
310 * Set the callback function and user data that will be called with
311 * the oldest buffer when all pads have been collected, or %NULL on EOS.
312 * If a buffer is passed, the callback owns a reference and must unref
313 * it.
314 *
315 * MT safe.
316 */
317 void
gst_collect_pads_set_buffer_function(GstCollectPads * pads,GstCollectPadsBufferFunction func,gpointer user_data)318 gst_collect_pads_set_buffer_function (GstCollectPads * pads,
319 GstCollectPadsBufferFunction func, gpointer user_data)
320 {
321 g_return_if_fail (pads != NULL);
322 g_return_if_fail (GST_IS_COLLECT_PADS (pads));
323
324 GST_OBJECT_LOCK (pads);
325 gst_collect_pads_set_buffer_function_locked (pads, func, user_data);
326 GST_OBJECT_UNLOCK (pads);
327 }
328
329 /**
330 * gst_collect_pads_set_compare_function:
331 * @pads: the pads to use
332 * @func: (scope call): the function to set
333 * @user_data: (closure): user data passed to the function
334 *
335 * Set the timestamp comparison function.
336 *
337 * MT safe.
338 */
339 /* NOTE allowing to change comparison seems not advisable;
340 no known use-case, and collaboration with default algorithm is unpredictable.
341 If custom comparing/operation is needed, just use a collect function of
342 your own */
343 void
gst_collect_pads_set_compare_function(GstCollectPads * pads,GstCollectPadsCompareFunction func,gpointer user_data)344 gst_collect_pads_set_compare_function (GstCollectPads * pads,
345 GstCollectPadsCompareFunction func, gpointer user_data)
346 {
347 g_return_if_fail (pads != NULL);
348 g_return_if_fail (GST_IS_COLLECT_PADS (pads));
349
350 GST_OBJECT_LOCK (pads);
351 pads->priv->compare_func = func;
352 pads->priv->compare_user_data = user_data;
353 GST_OBJECT_UNLOCK (pads);
354 }
355
356 /**
357 * gst_collect_pads_set_function:
358 * @pads: the collectpads to use
359 * @func: (scope call): the function to set
360 * @user_data: user data passed to the function
361 *
362 * CollectPads provides a default collection algorithm that will determine
363 * the oldest buffer available on all of its pads, and then delegate
364 * to a configured callback.
365 * However, if circumstances are more complicated and/or more control
366 * is desired, this sets a callback that will be invoked instead when
367 * all the pads added to the collection have buffers queued.
368 * Evidently, this callback is not compatible with
369 * gst_collect_pads_set_buffer_function() callback.
370 * If this callback is set, the former will be unset.
371 *
372 * MT safe.
373 */
374 void
gst_collect_pads_set_function(GstCollectPads * pads,GstCollectPadsFunction func,gpointer user_data)375 gst_collect_pads_set_function (GstCollectPads * pads,
376 GstCollectPadsFunction func, gpointer user_data)
377 {
378 g_return_if_fail (pads != NULL);
379 g_return_if_fail (GST_IS_COLLECT_PADS (pads));
380
381 GST_OBJECT_LOCK (pads);
382 pads->priv->func = func;
383 pads->priv->user_data = user_data;
384 gst_collect_pads_set_buffer_function_locked (pads, NULL, NULL);
385 GST_OBJECT_UNLOCK (pads);
386 }
387
388 static void
ref_data(GstCollectData * data)389 ref_data (GstCollectData * data)
390 {
391 g_assert (data != NULL);
392
393 g_atomic_int_inc (&(data->priv->refcount));
394 }
395
396 static void
unref_data(GstCollectData * data)397 unref_data (GstCollectData * data)
398 {
399 g_assert (data != NULL);
400 g_assert (data->priv->refcount > 0);
401
402 if (!g_atomic_int_dec_and_test (&(data->priv->refcount)))
403 return;
404
405 if (data->priv->destroy_notify)
406 data->priv->destroy_notify (data);
407
408 gst_object_unref (data->pad);
409 if (data->buffer) {
410 gst_buffer_unref (data->buffer);
411 }
412 g_free (data->priv);
413 g_free (data);
414 }
415
416 /**
417 * gst_collect_pads_set_event_function:
418 * @pads: the collectpads to use
419 * @func: (scope call): the function to set
420 * @user_data: user data passed to the function
421 *
422 * Set the event callback function and user data that will be called when
423 * collectpads has received an event originating from one of the collected
424 * pads. If the event being processed is a serialized one, this callback is
425 * called with @pads STREAM_LOCK held, otherwise not. As this lock should be
426 * held when calling a number of CollectPads functions, it should be acquired
427 * if so (unusually) needed.
428 *
429 * MT safe.
430 */
431 void
gst_collect_pads_set_event_function(GstCollectPads * pads,GstCollectPadsEventFunction func,gpointer user_data)432 gst_collect_pads_set_event_function (GstCollectPads * pads,
433 GstCollectPadsEventFunction func, gpointer user_data)
434 {
435 g_return_if_fail (pads != NULL);
436 g_return_if_fail (GST_IS_COLLECT_PADS (pads));
437
438 GST_OBJECT_LOCK (pads);
439 pads->priv->event_func = func;
440 pads->priv->event_user_data = user_data;
441 GST_OBJECT_UNLOCK (pads);
442 }
443
444 /**
445 * gst_collect_pads_set_query_function:
446 * @pads: the collectpads to use
447 * @func: (scope call): the function to set
448 * @user_data: user data passed to the function
449 *
450 * Set the query callback function and user data that will be called after
451 * collectpads has received a query originating from one of the collected
452 * pads. If the query being processed is a serialized one, this callback is
453 * called with @pads STREAM_LOCK held, otherwise not. As this lock should be
454 * held when calling a number of CollectPads functions, it should be acquired
455 * if so (unusually) needed.
456 *
457 * MT safe.
458 */
459 void
gst_collect_pads_set_query_function(GstCollectPads * pads,GstCollectPadsQueryFunction func,gpointer user_data)460 gst_collect_pads_set_query_function (GstCollectPads * pads,
461 GstCollectPadsQueryFunction func, gpointer user_data)
462 {
463 g_return_if_fail (pads != NULL);
464 g_return_if_fail (GST_IS_COLLECT_PADS (pads));
465
466 GST_OBJECT_LOCK (pads);
467 pads->priv->query_func = func;
468 pads->priv->query_user_data = user_data;
469 GST_OBJECT_UNLOCK (pads);
470 }
471
472 /**
473 * gst_collect_pads_clip_running_time:
474 * @pads: the collectpads to use
475 * @cdata: collect data of corresponding pad
476 * @buf: buffer being clipped
477 * @outbuf: (allow-none) (out): output buffer with running time, or NULL if clipped
478 * @user_data: user data (unused)
479 *
480 * Convenience clipping function that converts incoming buffer's timestamp
481 * to running time, or clips the buffer if outside configured segment.
482 *
483 * Since 1.6, this clipping function also sets the DTS parameter of the
484 * GstCollectData structure. This version of the running time DTS can be
485 * negative. G_MININT64 is used to indicate invalid value.
486 */
487 GstFlowReturn
gst_collect_pads_clip_running_time(GstCollectPads * pads,GstCollectData * cdata,GstBuffer * buf,GstBuffer ** outbuf,gpointer user_data)488 gst_collect_pads_clip_running_time (GstCollectPads * pads,
489 GstCollectData * cdata, GstBuffer * buf, GstBuffer ** outbuf,
490 gpointer user_data)
491 {
492 *outbuf = buf;
493
494 /* invalid left alone and passed */
495 if (G_LIKELY (GST_CLOCK_TIME_IS_VALID (GST_BUFFER_DTS_OR_PTS (buf)))) {
496 GstClockTime time;
497 GstClockTime buf_dts, abs_dts;
498 gint dts_sign;
499
500 time = GST_BUFFER_PTS (buf);
501
502 if (GST_CLOCK_TIME_IS_VALID (time)) {
503 time =
504 gst_segment_to_running_time (&cdata->segment, GST_FORMAT_TIME, time);
505 if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (time))) {
506 GST_DEBUG_OBJECT (cdata->pad, "clipping buffer on pad outside segment %"
507 GST_TIME_FORMAT, GST_TIME_ARGS (GST_BUFFER_PTS (buf)));
508 gst_buffer_unref (buf);
509 *outbuf = NULL;
510 return GST_FLOW_OK;
511 }
512 }
513
514 GST_LOG_OBJECT (cdata->pad, "buffer pts %" GST_TIME_FORMAT " -> %"
515 GST_TIME_FORMAT " running time",
516 GST_TIME_ARGS (GST_BUFFER_PTS (buf)), GST_TIME_ARGS (time));
517 *outbuf = gst_buffer_make_writable (buf);
518 GST_BUFFER_PTS (*outbuf) = time;
519
520 dts_sign = gst_segment_to_running_time_full (&cdata->segment,
521 GST_FORMAT_TIME, GST_BUFFER_DTS (*outbuf), &abs_dts);
522 buf_dts = GST_BUFFER_DTS (*outbuf);
523 if (dts_sign > 0) {
524 GST_BUFFER_DTS (*outbuf) = abs_dts;
525 GST_COLLECT_PADS_DTS (cdata) = abs_dts;
526 } else if (dts_sign < 0) {
527 GST_BUFFER_DTS (*outbuf) = GST_CLOCK_TIME_NONE;
528 GST_COLLECT_PADS_DTS (cdata) = -((gint64) abs_dts);
529 } else {
530 GST_BUFFER_DTS (*outbuf) = GST_CLOCK_TIME_NONE;
531 GST_COLLECT_PADS_DTS (cdata) = GST_CLOCK_STIME_NONE;
532 }
533
534 GST_LOG_OBJECT (cdata->pad, "buffer dts %" GST_TIME_FORMAT " -> %"
535 GST_STIME_FORMAT " running time", GST_TIME_ARGS (buf_dts),
536 GST_STIME_ARGS (GST_COLLECT_PADS_DTS (cdata)));
537 }
538
539 return GST_FLOW_OK;
540 }
541
542 /**
543 * gst_collect_pads_set_clip_function:
544 * @pads: the collectpads to use
545 * @clipfunc: (scope call): clip function to install
 * @user_data: user data to pass to @clipfunc
547 *
548 * Install a clipping function that is called right after a buffer is received
549 * on a pad managed by @pads. See #GstCollectPadsClipFunction for more info.
550 */
551 void
gst_collect_pads_set_clip_function(GstCollectPads * pads,GstCollectPadsClipFunction clipfunc,gpointer user_data)552 gst_collect_pads_set_clip_function (GstCollectPads * pads,
553 GstCollectPadsClipFunction clipfunc, gpointer user_data)
554 {
555 g_return_if_fail (pads != NULL);
556 g_return_if_fail (GST_IS_COLLECT_PADS (pads));
557
558 pads->priv->clip_func = clipfunc;
559 pads->priv->clip_user_data = user_data;
560 }
561
562 /**
563 * gst_collect_pads_set_flush_function:
564 * @pads: the collectpads to use
565 * @func: (scope call): flush function to install
566 * @user_data: user data to pass to @func
567 *
568 * Install a flush function that is called when the internal
569 * state of all pads should be flushed as part of flushing seek
570 * handling. See #GstCollectPadsFlushFunction for more info.
571 *
572 * Since: 1.4
573 */
574 void
gst_collect_pads_set_flush_function(GstCollectPads * pads,GstCollectPadsFlushFunction func,gpointer user_data)575 gst_collect_pads_set_flush_function (GstCollectPads * pads,
576 GstCollectPadsFlushFunction func, gpointer user_data)
577 {
578 g_return_if_fail (pads != NULL);
579 g_return_if_fail (GST_IS_COLLECT_PADS (pads));
580
581 pads->priv->flush_func = func;
582 pads->priv->flush_user_data = user_data;
583 }
584
585 /**
586 * gst_collect_pads_add_pad:
587 * @pads: the collectpads to use
588 * @pad: (transfer none): the pad to add
589 * @size: the size of the returned #GstCollectData structure
590 * @destroy_notify: (scope async): function to be called before the returned
591 * #GstCollectData structure is freed
592 * @lock: whether to lock this pad in usual waiting state
593 *
594 * Add a pad to the collection of collect pads. The pad has to be
595 * a sinkpad. The refcount of the pad is incremented. Use
596 * gst_collect_pads_remove_pad() to remove the pad from the collection
597 * again.
598 *
599 * You specify a size for the returned #GstCollectData structure
600 * so that you can use it to store additional information.
601 *
602 * You can also specify a #GstCollectDataDestroyNotify that will be called
603 * just before the #GstCollectData structure is freed. It is passed the
604 * pointer to the structure and should free any custom memory and resources
605 * allocated for it.
606 *
607 * Keeping a pad locked in waiting state is only relevant when using
608 * the default collection algorithm (providing the oldest buffer).
609 * It ensures a buffer must be available on this pad for a collection
610 * to take place. This is of typical use to a muxer element where
611 * non-subtitle streams should always be in waiting state,
612 * e.g. to assure that caps information is available on all these streams
613 * when initial headers have to be written.
614 *
615 * The pad will be automatically activated in push mode when @pads is
616 * started.
617 *
618 * MT safe.
619 *
620 * Returns: (nullable) (transfer none): a new #GstCollectData to identify the
621 * new pad. Or %NULL if wrong parameters are supplied.
622 */
623 GstCollectData *
gst_collect_pads_add_pad(GstCollectPads * pads,GstPad * pad,guint size,GstCollectDataDestroyNotify destroy_notify,gboolean lock)624 gst_collect_pads_add_pad (GstCollectPads * pads, GstPad * pad, guint size,
625 GstCollectDataDestroyNotify destroy_notify, gboolean lock)
626 {
627 GstCollectData *data;
628
629 g_return_val_if_fail (pads != NULL, NULL);
630 g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), NULL);
631 g_return_val_if_fail (pad != NULL, NULL);
632 g_return_val_if_fail (GST_PAD_IS_SINK (pad), NULL);
633 g_return_val_if_fail (size >= sizeof (GstCollectData), NULL);
634
635 GST_DEBUG_OBJECT (pads, "adding pad %s:%s", GST_DEBUG_PAD_NAME (pad));
636
637 data = g_malloc0 (size);
638 data->priv = g_new0 (GstCollectDataPrivate, 1);
639 data->collect = pads;
640 data->pad = gst_object_ref (pad);
641 data->buffer = NULL;
642 data->pos = 0;
643 gst_segment_init (&data->segment, GST_FORMAT_UNDEFINED);
644 data->state = GST_COLLECT_PADS_STATE_WAITING;
645 data->state |= lock ? GST_COLLECT_PADS_STATE_LOCKED : 0;
646 data->priv->refcount = 1;
647 data->priv->destroy_notify = destroy_notify;
648 data->ABI.abi.dts = G_MININT64;
649
650 GST_OBJECT_LOCK (pads);
651 GST_OBJECT_LOCK (pad);
652 gst_pad_set_element_private (pad, data);
653 GST_OBJECT_UNLOCK (pad);
654 pads->priv->pad_list = g_slist_append (pads->priv->pad_list, data);
655 gst_pad_set_chain_function (pad, GST_DEBUG_FUNCPTR (gst_collect_pads_chain));
656 gst_pad_set_event_function (pad, GST_DEBUG_FUNCPTR (gst_collect_pads_event));
657 gst_pad_set_query_function (pad, GST_DEBUG_FUNCPTR (gst_collect_pads_query));
658 /* backward compat, also add to data if stopped, so that the element already
659 * has this in the public data list before going PAUSED (typically)
660 * this can only be done when we are stopped because we don't take the
661 * STREAM_LOCK to protect the pads->data list. */
662 if (!pads->priv->started) {
663 pads->data = g_slist_append (pads->data, data);
664 ref_data (data);
665 }
666 /* activate the pad when needed */
667 if (pads->priv->started)
668 gst_pad_set_active (pad, TRUE);
669 pads->priv->pad_cookie++;
670 GST_OBJECT_UNLOCK (pads);
671
672 return data;
673 }
674
675 static gint
find_pad(GstCollectData * data,GstPad * pad)676 find_pad (GstCollectData * data, GstPad * pad)
677 {
678 if (data->pad == pad)
679 return 0;
680 return 1;
681 }
682
683 /**
684 * gst_collect_pads_remove_pad:
685 * @pads: the collectpads to use
686 * @pad: (transfer none): the pad to remove
687 *
688 * Remove a pad from the collection of collect pads. This function will also
689 * free the #GstCollectData and all the resources that were allocated with
690 * gst_collect_pads_add_pad().
691 *
692 * The pad will be deactivated automatically when @pads is stopped.
693 *
694 * MT safe.
695 *
696 * Returns: %TRUE if the pad could be removed.
697 */
698 gboolean
gst_collect_pads_remove_pad(GstCollectPads * pads,GstPad * pad)699 gst_collect_pads_remove_pad (GstCollectPads * pads, GstPad * pad)
700 {
701 GstCollectData *data;
702 GSList *list;
703
704 g_return_val_if_fail (pads != NULL, FALSE);
705 g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), FALSE);
706 g_return_val_if_fail (pad != NULL, FALSE);
707 g_return_val_if_fail (GST_IS_PAD (pad), FALSE);
708
709 GST_DEBUG_OBJECT (pads, "removing pad %s:%s", GST_DEBUG_PAD_NAME (pad));
710
711 GST_OBJECT_LOCK (pads);
712 list =
713 g_slist_find_custom (pads->priv->pad_list, pad, (GCompareFunc) find_pad);
714 if (!list)
715 goto unknown_pad;
716
717 data = (GstCollectData *) list->data;
718
719 GST_DEBUG_OBJECT (pads, "found pad %s:%s at %p", GST_DEBUG_PAD_NAME (pad),
720 data);
721
722 /* clear the stuff we configured */
723 gst_pad_set_chain_function (pad, NULL);
724 gst_pad_set_event_function (pad, NULL);
725 GST_OBJECT_LOCK (pad);
726 gst_pad_set_element_private (pad, NULL);
727 GST_OBJECT_UNLOCK (pad);
728
729 /* backward compat, also remove from data if stopped, note that this function
730 * can only be called when we are stopped because we don't take the
731 * STREAM_LOCK to protect the pads->data list. */
732 if (!pads->priv->started) {
733 GSList *dlist;
734
735 dlist = g_slist_find_custom (pads->data, pad, (GCompareFunc) find_pad);
736 if (dlist) {
737 GstCollectData *pdata = dlist->data;
738
739 pads->data = g_slist_delete_link (pads->data, dlist);
740 unref_data (pdata);
741 }
742 }
743 /* remove from the pad list */
744 pads->priv->pad_list = g_slist_delete_link (pads->priv->pad_list, list);
745 pads->priv->pad_cookie++;
746
747 /* signal waiters because something changed */
748 GST_COLLECT_PADS_EVT_BROADCAST (pads);
749
750 /* deactivate the pad when needed */
751 if (!pads->priv->started)
752 gst_pad_set_active (pad, FALSE);
753
754 /* clean and free the collect data */
755 unref_data (data);
756
757 GST_OBJECT_UNLOCK (pads);
758
759 return TRUE;
760
761 unknown_pad:
762 {
763 GST_WARNING_OBJECT (pads, "cannot remove unknown pad %s:%s",
764 GST_DEBUG_PAD_NAME (pad));
765 GST_OBJECT_UNLOCK (pads);
766 return FALSE;
767 }
768 }
769
770 /*
771 * Must be called with STREAM_LOCK and OBJECT_LOCK.
772 */
773 static void
gst_collect_pads_set_flushing_unlocked(GstCollectPads * pads,gboolean flushing)774 gst_collect_pads_set_flushing_unlocked (GstCollectPads * pads,
775 gboolean flushing)
776 {
777 GSList *walk = NULL;
778
779 GST_DEBUG ("sink-pads flushing=%d", flushing);
780
781 /* Update the pads flushing flag */
782 for (walk = pads->priv->pad_list; walk; walk = g_slist_next (walk)) {
783 GstCollectData *cdata = walk->data;
784
785 if (GST_IS_PAD (cdata->pad)) {
786 GST_OBJECT_LOCK (cdata->pad);
787 if (flushing)
788 GST_PAD_SET_FLUSHING (cdata->pad);
789 else
790 GST_PAD_UNSET_FLUSHING (cdata->pad);
791 if (flushing)
792 GST_COLLECT_PADS_STATE_SET (cdata, GST_COLLECT_PADS_STATE_FLUSHING);
793 else
794 GST_COLLECT_PADS_STATE_UNSET (cdata, GST_COLLECT_PADS_STATE_FLUSHING);
795 gst_collect_pads_clear (pads, cdata);
796 GST_OBJECT_UNLOCK (cdata->pad);
797 }
798 }
799
800 /* inform _chain of changes */
801 GST_COLLECT_PADS_EVT_BROADCAST (pads);
802 }
803
804 /**
805 * gst_collect_pads_set_flushing:
806 * @pads: the collectpads to use
807 * @flushing: desired state of the pads
808 *
809 * Change the flushing state of all the pads in the collection. No pad
810 * is able to accept anymore data when @flushing is %TRUE. Calling this
811 * function with @flushing %FALSE makes @pads accept data again.
812 * Caller must ensure that downstream streaming (thread) is not blocked,
813 * e.g. by sending a FLUSH_START downstream.
814 *
815 * MT safe.
816 */
817 void
gst_collect_pads_set_flushing(GstCollectPads * pads,gboolean flushing)818 gst_collect_pads_set_flushing (GstCollectPads * pads, gboolean flushing)
819 {
820 g_return_if_fail (pads != NULL);
821 g_return_if_fail (GST_IS_COLLECT_PADS (pads));
822
823 /* NOTE since this eventually calls _pop, some (STREAM_)LOCK is needed here */
824 GST_COLLECT_PADS_STREAM_LOCK (pads);
825 GST_OBJECT_LOCK (pads);
826 gst_collect_pads_set_flushing_unlocked (pads, flushing);
827 GST_OBJECT_UNLOCK (pads);
828 GST_COLLECT_PADS_STREAM_UNLOCK (pads);
829 }
830
831 /**
832 * gst_collect_pads_start:
833 * @pads: the collectpads to use
834 *
835 * Starts the processing of data in the collect_pads.
836 *
837 * MT safe.
838 */
839 void
gst_collect_pads_start(GstCollectPads * pads)840 gst_collect_pads_start (GstCollectPads * pads)
841 {
842 GSList *collected;
843
844 g_return_if_fail (pads != NULL);
845 g_return_if_fail (GST_IS_COLLECT_PADS (pads));
846
847 GST_DEBUG_OBJECT (pads, "starting collect pads");
848
849 /* make sure stop and collect cannot be called anymore */
850 GST_COLLECT_PADS_STREAM_LOCK (pads);
851
852 /* make pads streamable */
853 GST_OBJECT_LOCK (pads);
854
855 /* loop over the master pad list and reset the segment */
856 collected = pads->priv->pad_list;
857 for (; collected; collected = g_slist_next (collected)) {
858 GstCollectData *data;
859
860 data = collected->data;
861 gst_segment_init (&data->segment, GST_FORMAT_UNDEFINED);
862 }
863
864 gst_collect_pads_set_flushing_unlocked (pads, FALSE);
865
866 /* Start collect pads */
867 pads->priv->started = TRUE;
868 GST_OBJECT_UNLOCK (pads);
869 GST_COLLECT_PADS_STREAM_UNLOCK (pads);
870 }
871
872 /**
873 * gst_collect_pads_stop:
874 * @pads: the collectpads to use
875 *
876 * Stops the processing of data in the collect_pads. this function
877 * will also unblock any blocking operations.
878 *
879 * MT safe.
880 */
881 void
gst_collect_pads_stop(GstCollectPads * pads)882 gst_collect_pads_stop (GstCollectPads * pads)
883 {
884 GSList *collected;
885
886 g_return_if_fail (pads != NULL);
887 g_return_if_fail (GST_IS_COLLECT_PADS (pads));
888
889 GST_DEBUG_OBJECT (pads, "stopping collect pads");
890
891 /* make sure collect and start cannot be called anymore */
892 GST_COLLECT_PADS_STREAM_LOCK (pads);
893
894 /* make pads not accept data anymore */
895 GST_OBJECT_LOCK (pads);
896 gst_collect_pads_set_flushing_unlocked (pads, TRUE);
897
898 /* Stop collect pads */
899 pads->priv->started = FALSE;
900 pads->priv->eospads = 0;
901 pads->priv->queuedpads = 0;
902
903 /* loop over the master pad list and flush buffers */
904 collected = pads->priv->pad_list;
905 for (; collected; collected = g_slist_next (collected)) {
906 GstCollectData *data;
907 GstBuffer **buffer_p;
908
909 data = collected->data;
910 if (data->buffer) {
911 buffer_p = &data->buffer;
912 gst_buffer_replace (buffer_p, NULL);
913 data->pos = 0;
914 }
915 GST_COLLECT_PADS_STATE_UNSET (data, GST_COLLECT_PADS_STATE_EOS);
916 }
917
918 if (pads->priv->earliest_data)
919 unref_data (pads->priv->earliest_data);
920 pads->priv->earliest_data = NULL;
921 pads->priv->earliest_time = GST_CLOCK_TIME_NONE;
922
923 GST_OBJECT_UNLOCK (pads);
924 /* Wake them up so they can end the chain functions. */
925 GST_COLLECT_PADS_EVT_BROADCAST (pads);
926
927 GST_COLLECT_PADS_STREAM_UNLOCK (pads);
928 }
929
930 /**
931 * gst_collect_pads_peek:
932 * @pads: the collectpads to peek
933 * @data: the data to use
934 *
935 * Peek at the buffer currently queued in @data. This function
936 * should be called with the @pads STREAM_LOCK held, such as in the callback
937 * handler.
938 *
939 * MT safe.
940 *
941 * Returns: (transfer full) (nullable): The buffer in @data or %NULL if no
942 * buffer is queued. should unref the buffer after usage.
943 */
944 GstBuffer *
gst_collect_pads_peek(GstCollectPads * pads,GstCollectData * data)945 gst_collect_pads_peek (GstCollectPads * pads, GstCollectData * data)
946 {
947 GstBuffer *result;
948
949 g_return_val_if_fail (pads != NULL, NULL);
950 g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), NULL);
951 g_return_val_if_fail (data != NULL, NULL);
952
953 if ((result = data->buffer))
954 gst_buffer_ref (result);
955
956 GST_DEBUG_OBJECT (pads, "Peeking at pad %s:%s: buffer=%" GST_PTR_FORMAT,
957 GST_DEBUG_PAD_NAME (data->pad), result);
958
959 return result;
960 }
961
962 /**
963 * gst_collect_pads_pop:
964 * @pads: the collectpads to pop
965 * @data: the data to use
966 *
967 * Pop the buffer currently queued in @data. This function
968 * should be called with the @pads STREAM_LOCK held, such as in the callback
969 * handler.
970 *
971 * MT safe.
972 *
973 * Returns: (transfer full) (nullable): The buffer in @data or %NULL if no
974 * buffer was queued. You should unref the buffer after usage.
975 */
976 GstBuffer *
gst_collect_pads_pop(GstCollectPads * pads,GstCollectData * data)977 gst_collect_pads_pop (GstCollectPads * pads, GstCollectData * data)
978 {
979 GstBuffer *result;
980
981 g_return_val_if_fail (pads != NULL, NULL);
982 g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), NULL);
983 g_return_val_if_fail (data != NULL, NULL);
984
985 if ((result = data->buffer)) {
986 data->buffer = NULL;
987 data->pos = 0;
988 /* one less pad with queued data now */
989 if (GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_WAITING))
990 pads->priv->queuedpads--;
991 }
992
993 GST_COLLECT_PADS_EVT_BROADCAST (pads);
994
995 GST_DEBUG_OBJECT (pads, "Pop buffer on pad %s:%s: buffer=%" GST_PTR_FORMAT,
996 GST_DEBUG_PAD_NAME (data->pad), result);
997
998 return result;
999 }
1000
1001 /* pop and unref the currently queued buffer, should be called with STREAM_LOCK
1002 * held */
1003 static void
gst_collect_pads_clear(GstCollectPads * pads,GstCollectData * data)1004 gst_collect_pads_clear (GstCollectPads * pads, GstCollectData * data)
1005 {
1006 GstBuffer *buf;
1007
1008 if ((buf = gst_collect_pads_pop (pads, data)))
1009 gst_buffer_unref (buf);
1010 }
1011
1012 /**
1013 * gst_collect_pads_available:
1014 * @pads: the collectpads to query
1015 *
1016 * Query how much bytes can be read from each queued buffer. This means
1017 * that the result of this call is the maximum number of bytes that can
1018 * be read from each of the pads.
1019 *
1020 * This function should be called with @pads STREAM_LOCK held, such as
1021 * in the callback.
1022 *
1023 * MT safe.
1024 *
1025 * Returns: The maximum number of bytes queued on all pads. This function
1026 * returns 0 if a pad has no queued buffer.
1027 */
1028 /* we might pre-calculate this in some struct field,
1029 * but would then have to maintain this in _chain and particularly _pop, etc,
1030 * even if element is never interested in this information */
1031 guint
gst_collect_pads_available(GstCollectPads * pads)1032 gst_collect_pads_available (GstCollectPads * pads)
1033 {
1034 GSList *collected;
1035 guint result = G_MAXUINT;
1036
1037 g_return_val_if_fail (pads != NULL, 0);
1038 g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), 0);
1039
1040 collected = pads->data;
1041 for (; collected; collected = g_slist_next (collected)) {
1042 GstCollectData *pdata;
1043 GstBuffer *buffer;
1044 gint size;
1045
1046 pdata = (GstCollectData *) collected->data;
1047
1048 /* ignore pad with EOS */
1049 if (G_UNLIKELY (GST_COLLECT_PADS_STATE_IS_SET (pdata,
1050 GST_COLLECT_PADS_STATE_EOS))) {
1051 GST_DEBUG_OBJECT (pads, "pad %p is EOS", pdata);
1052 continue;
1053 }
1054
1055 /* an empty buffer without EOS is weird when we get here.. */
1056 if (G_UNLIKELY ((buffer = pdata->buffer) == NULL)) {
1057 GST_WARNING_OBJECT (pads, "pad %p has no buffer", pdata);
1058 goto not_filled;
1059 }
1060
1061 /* this is the size left of the buffer */
1062 size = gst_buffer_get_size (buffer) - pdata->pos;
1063 GST_DEBUG_OBJECT (pads, "pad %p has %d bytes left", pdata, size);
1064
1065 /* need to return the min of all available data */
1066 if (size < result)
1067 result = size;
1068 }
1069 /* nothing changed, all must be EOS then, return 0 */
1070 if (G_UNLIKELY (result == G_MAXUINT))
1071 result = 0;
1072
1073 return result;
1074
1075 not_filled:
1076 {
1077 return 0;
1078 }
1079 }
1080
1081 /**
1082 * gst_collect_pads_flush:
1083 * @pads: the collectpads to query
1084 * @data: the data to use
1085 * @size: the number of bytes to flush
1086 *
1087 * Flush @size bytes from the pad @data.
1088 *
1089 * This function should be called with @pads STREAM_LOCK held, such as
1090 * in the callback.
1091 *
1092 * MT safe.
1093 *
1094 * Returns: The number of bytes flushed This can be less than @size and
1095 * is 0 if the pad was end-of-stream.
1096 */
1097 guint
gst_collect_pads_flush(GstCollectPads * pads,GstCollectData * data,guint size)1098 gst_collect_pads_flush (GstCollectPads * pads, GstCollectData * data,
1099 guint size)
1100 {
1101 guint flushsize;
1102 gsize bsize;
1103 GstBuffer *buffer;
1104
1105 g_return_val_if_fail (pads != NULL, 0);
1106 g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), 0);
1107 g_return_val_if_fail (data != NULL, 0);
1108
1109 /* no buffer, must be EOS */
1110 if ((buffer = data->buffer) == NULL)
1111 return 0;
1112
1113 bsize = gst_buffer_get_size (buffer);
1114
1115 /* this is what we can flush at max */
1116 flushsize = MIN (size, bsize - data->pos);
1117
1118 data->pos += size;
1119
1120 if (data->pos >= bsize)
1121 /* _clear will also reset data->pos to 0 */
1122 gst_collect_pads_clear (pads, data);
1123
1124 return flushsize;
1125 }
1126
1127 /**
1128 * gst_collect_pads_read_buffer:
1129 * @pads: the collectpads to query
1130 * @data: the data to use
1131 * @size: the number of bytes to read
1132 *
1133 * Get a subbuffer of @size bytes from the given pad @data.
1134 *
1135 * This function should be called with @pads STREAM_LOCK held, such as in the
1136 * callback.
1137 *
1138 * MT safe.
1139 *
1140 * Returns: (transfer full) (nullable): A sub buffer. The size of the buffer can
1141 * be less that requested. A return of %NULL signals that the pad is
1142 * end-of-stream. Unref the buffer after use.
1143 */
1144 GstBuffer *
gst_collect_pads_read_buffer(GstCollectPads * pads,GstCollectData * data,guint size)1145 gst_collect_pads_read_buffer (GstCollectPads * pads, GstCollectData * data,
1146 guint size)
1147 {
1148 guint readsize, buf_size;
1149 GstBuffer *buffer;
1150
1151 g_return_val_if_fail (pads != NULL, NULL);
1152 g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), NULL);
1153 g_return_val_if_fail (data != NULL, NULL);
1154
1155 /* no buffer, must be EOS */
1156 if ((buffer = data->buffer) == NULL)
1157 return NULL;
1158
1159 buf_size = gst_buffer_get_size (buffer);
1160 readsize = MIN (size, buf_size - data->pos);
1161
1162 return gst_buffer_copy_region (buffer, GST_BUFFER_COPY_ALL, data->pos,
1163 readsize);
1164 }
1165
1166 /**
1167 * gst_collect_pads_take_buffer:
1168 * @pads: the collectpads to query
1169 * @data: the data to use
1170 * @size: the number of bytes to read
1171 *
1172 * Get a subbuffer of @size bytes from the given pad @data. Flushes the amount
1173 * of read bytes.
1174 *
1175 * This function should be called with @pads STREAM_LOCK held, such as in the
1176 * callback.
1177 *
1178 * MT safe.
1179 *
1180 * Returns: (transfer full) (nullable): A sub buffer. The size of the buffer can
1181 * be less that requested. A return of %NULL signals that the pad is
1182 * end-of-stream. Unref the buffer after use.
1183 */
1184 GstBuffer *
gst_collect_pads_take_buffer(GstCollectPads * pads,GstCollectData * data,guint size)1185 gst_collect_pads_take_buffer (GstCollectPads * pads, GstCollectData * data,
1186 guint size)
1187 {
1188 GstBuffer *buffer = gst_collect_pads_read_buffer (pads, data, size);
1189
1190 if (buffer) {
1191 gst_collect_pads_flush (pads, data, gst_buffer_get_size (buffer));
1192 }
1193 return buffer;
1194 }
1195
1196 /**
1197 * gst_collect_pads_set_waiting:
1198 * @pads: the collectpads
1199 * @data: the data to use
1200 * @waiting: boolean indicating whether this pad should operate
1201 * in waiting or non-waiting mode
1202 *
1203 * Sets a pad to waiting or non-waiting mode, if at least this pad
1204 * has not been created with locked waiting state,
1205 * in which case nothing happens.
1206 *
1207 * This function should be called with @pads STREAM_LOCK held, such as
1208 * in the callback.
1209 *
1210 * MT safe.
1211 */
1212 void
gst_collect_pads_set_waiting(GstCollectPads * pads,GstCollectData * data,gboolean waiting)1213 gst_collect_pads_set_waiting (GstCollectPads * pads, GstCollectData * data,
1214 gboolean waiting)
1215 {
1216 g_return_if_fail (pads != NULL);
1217 g_return_if_fail (GST_IS_COLLECT_PADS (pads));
1218 g_return_if_fail (data != NULL);
1219
1220 GST_DEBUG_OBJECT (pads, "Setting pad %s to waiting %d, locked %d",
1221 GST_PAD_NAME (data->pad), waiting,
1222 GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_LOCKED));
1223
1224 /* Do something only on a change and if not locked */
1225 if (!GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_LOCKED) &&
1226 (GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_WAITING) !=
1227 ! !waiting)) {
1228 /* Set waiting state for this pad */
1229 if (waiting)
1230 GST_COLLECT_PADS_STATE_SET (data, GST_COLLECT_PADS_STATE_WAITING);
1231 else
1232 GST_COLLECT_PADS_STATE_UNSET (data, GST_COLLECT_PADS_STATE_WAITING);
1233 /* Update number of queued pads if needed */
1234 if (!data->buffer &&
1235 !GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_EOS)) {
1236 if (waiting)
1237 pads->priv->queuedpads--;
1238 else
1239 pads->priv->queuedpads++;
1240 }
1241
1242 /* signal waiters because something changed */
1243 GST_COLLECT_PADS_EVT_BROADCAST (pads);
1244 }
1245 }
1246
1247 /* see if pads were added or removed and update our stats. Any pad
1248 * added after releasing the LOCK will get collected in the next
1249 * round.
1250 *
1251 * We can do a quick check by checking the cookies, that get changed
1252 * whenever the pad list is updated.
1253 *
1254 * Must be called with STREAM_LOCK.
1255 */
1256 static void
gst_collect_pads_check_pads(GstCollectPads * pads)1257 gst_collect_pads_check_pads (GstCollectPads * pads)
1258 {
1259 /* the master list and cookie are protected with LOCK */
1260 GST_OBJECT_LOCK (pads);
1261 if (G_UNLIKELY (pads->priv->pad_cookie != pads->priv->cookie)) {
1262 GSList *collected;
1263
1264 /* clear list and stats */
1265 g_slist_foreach (pads->data, (GFunc) unref_data, NULL);
1266 g_slist_free (pads->data);
1267 pads->data = NULL;
1268 pads->priv->numpads = 0;
1269 pads->priv->queuedpads = 0;
1270 pads->priv->eospads = 0;
1271 if (pads->priv->earliest_data)
1272 unref_data (pads->priv->earliest_data);
1273 pads->priv->earliest_data = NULL;
1274 pads->priv->earliest_time = GST_CLOCK_TIME_NONE;
1275
1276 /* loop over the master pad list */
1277 collected = pads->priv->pad_list;
1278 for (; collected; collected = g_slist_next (collected)) {
1279 GstCollectData *data;
1280
1281 /* update the stats */
1282 pads->priv->numpads++;
1283 data = collected->data;
1284 if (GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_EOS))
1285 pads->priv->eospads++;
1286 else if (data->buffer || !GST_COLLECT_PADS_STATE_IS_SET (data,
1287 GST_COLLECT_PADS_STATE_WAITING))
1288 pads->priv->queuedpads++;
1289
1290 /* add to the list of pads to collect */
1291 ref_data (data);
1292 /* preserve order of adding/requesting pads */
1293 pads->data = g_slist_append (pads->data, data);
1294 }
1295 /* and update the cookie */
1296 pads->priv->cookie = pads->priv->pad_cookie;
1297 }
1298 GST_OBJECT_UNLOCK (pads);
1299 }
1300
1301 /* checks if all the pads are collected and call the collectfunction
1302 *
1303 * Should be called with STREAM_LOCK.
1304 *
1305 * Returns: The #GstFlowReturn of collection.
1306 */
1307 static GstFlowReturn
gst_collect_pads_check_collected(GstCollectPads * pads)1308 gst_collect_pads_check_collected (GstCollectPads * pads)
1309 {
1310 GstFlowReturn flow_ret = GST_FLOW_OK;
1311 GstCollectPadsFunction func;
1312 gpointer user_data;
1313
1314 g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), GST_FLOW_ERROR);
1315
1316 GST_OBJECT_LOCK (pads);
1317 func = pads->priv->func;
1318 user_data = pads->priv->user_data;
1319 GST_OBJECT_UNLOCK (pads);
1320
1321 g_return_val_if_fail (pads->priv->func != NULL, GST_FLOW_NOT_SUPPORTED);
1322
1323 /* check for new pads, update stats etc.. */
1324 gst_collect_pads_check_pads (pads);
1325
1326 if (G_UNLIKELY (pads->priv->eospads == pads->priv->numpads)) {
1327 /* If all our pads are EOS just collect once to let the element
1328 * do its final EOS handling. */
1329 GST_DEBUG_OBJECT (pads, "All active pads (%d) are EOS, calling %s",
1330 pads->priv->numpads, GST_DEBUG_FUNCPTR_NAME (func));
1331
1332 if (G_UNLIKELY (g_atomic_int_compare_and_exchange (&pads->priv->seeking,
1333 TRUE, FALSE))) {
1334 GST_INFO_OBJECT (pads, "finished seeking");
1335 }
1336 do {
1337 flow_ret = func (pads, user_data);
1338 } while (flow_ret == GST_FLOW_OK);
1339 } else {
1340 gboolean collected = FALSE;
1341
1342 /* We call the collected function as long as our condition matches. */
1343 while (((pads->priv->queuedpads + pads->priv->eospads) >=
1344 pads->priv->numpads)) {
1345 GST_DEBUG_OBJECT (pads,
1346 "All active pads (%d + %d >= %d) have data, " "calling %s",
1347 pads->priv->queuedpads, pads->priv->eospads, pads->priv->numpads,
1348 GST_DEBUG_FUNCPTR_NAME (func));
1349
1350 if (G_UNLIKELY (g_atomic_int_compare_and_exchange (&pads->priv->seeking,
1351 TRUE, FALSE))) {
1352 GST_INFO_OBJECT (pads, "finished seeking");
1353 }
1354 flow_ret = func (pads, user_data);
1355 collected = TRUE;
1356
1357 /* break on error */
1358 if (flow_ret != GST_FLOW_OK)
1359 break;
1360 /* Don't keep looping after telling the element EOS or flushing */
1361 if (pads->priv->queuedpads == 0)
1362 break;
1363 }
1364 if (!collected)
1365 GST_DEBUG_OBJECT (pads, "Not all active pads (%d) have data, continuing",
1366 pads->priv->numpads);
1367 }
1368 return flow_ret;
1369 }
1370
1371
/* General overview:
 * - only a pad with a buffer can determine earliest_data (and earliest_time)
 * - only segment info determines the (non-)waiting state
 * - ? perhaps use _stream_time for comparison
 *   (which muxers might have a use for as well ?)
 */
1378
1379 /*
1380 * Function to recalculate the waiting state of all pads.
1381 *
1382 * Must be called with STREAM_LOCK.
1383 *
1384 * Returns %TRUE if a pad was set to waiting
1385 * (from non-waiting state).
1386 */
1387 static gboolean
gst_collect_pads_recalculate_waiting(GstCollectPads * pads)1388 gst_collect_pads_recalculate_waiting (GstCollectPads * pads)
1389 {
1390 GSList *collected;
1391 gboolean result = FALSE;
1392
1393 /* If earliest time is not known, there is nothing to do. */
1394 if (pads->priv->earliest_data == NULL)
1395 return FALSE;
1396
1397 for (collected = pads->data; collected; collected = g_slist_next (collected)) {
1398 GstCollectData *data = (GstCollectData *) collected->data;
1399 int cmp_res;
1400 GstClockTime comp_time;
1401
1402 /* check if pad has a segment */
1403 if (data->segment.format == GST_FORMAT_UNDEFINED) {
1404 GST_WARNING_OBJECT (pads,
1405 "GstCollectPads has no time segment, assuming 0 based.");
1406 gst_segment_init (&data->segment, GST_FORMAT_TIME);
1407 GST_COLLECT_PADS_STATE_SET (data, GST_COLLECT_PADS_STATE_NEW_SEGMENT);
1408 }
1409
1410 /* check segment format */
1411 if (data->segment.format != GST_FORMAT_TIME) {
1412 GST_ERROR_OBJECT (pads, "GstCollectPads can handle only time segments.");
1413 continue;
1414 }
1415
1416 /* check if the waiting state should be changed */
1417 comp_time = data->segment.position;
1418 cmp_res = pads->priv->compare_func (pads, data, comp_time,
1419 pads->priv->earliest_data, pads->priv->earliest_time,
1420 pads->priv->compare_user_data);
1421 if (cmp_res > 0)
1422 /* stop waiting */
1423 gst_collect_pads_set_waiting (pads, data, FALSE);
1424 else {
1425 if (!GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_WAITING)) {
1426 /* start waiting */
1427 gst_collect_pads_set_waiting (pads, data, TRUE);
1428 result = TRUE;
1429 }
1430 }
1431 }
1432
1433 return result;
1434 }
1435
1436 /**
1437 * gst_collect_pads_find_best_pad:
1438 * @pads: the collectpads to use
1439 * @data: returns the collectdata for earliest data
1440 * @time: returns the earliest available buffertime
1441 *
1442 * Find the oldest/best pad, i.e. pad holding the oldest buffer and
1443 * and return the corresponding #GstCollectData and buffertime.
1444 *
1445 * This function should be called with STREAM_LOCK held,
1446 * such as in the callback.
1447 */
1448 static void
gst_collect_pads_find_best_pad(GstCollectPads * pads,GstCollectData ** data,GstClockTime * time)1449 gst_collect_pads_find_best_pad (GstCollectPads * pads,
1450 GstCollectData ** data, GstClockTime * time)
1451 {
1452 GSList *collected;
1453 GstCollectData *best = NULL;
1454 GstClockTime best_time = GST_CLOCK_TIME_NONE;
1455
1456 g_return_if_fail (data != NULL);
1457 g_return_if_fail (time != NULL);
1458
1459 for (collected = pads->data; collected; collected = g_slist_next (collected)) {
1460 GstBuffer *buffer;
1461 GstCollectData *data = (GstCollectData *) collected->data;
1462 GstClockTime timestamp;
1463
1464 buffer = gst_collect_pads_peek (pads, data);
1465 /* if we have a buffer check if it is better then the current best one */
1466 if (buffer != NULL) {
1467 timestamp = GST_BUFFER_DTS_OR_PTS (buffer);
1468 gst_buffer_unref (buffer);
1469 if (best == NULL || pads->priv->compare_func (pads, data, timestamp,
1470 best, best_time, pads->priv->compare_user_data) < 0) {
1471 best = data;
1472 best_time = timestamp;
1473 }
1474 }
1475 }
1476
1477 /* set earliest time */
1478 *data = best;
1479 *time = best_time;
1480
1481 GST_DEBUG_OBJECT (pads, "best pad %s, best time %" GST_TIME_FORMAT,
1482 best ? GST_PAD_NAME (((GstCollectData *) best)->pad) : "(nil)",
1483 GST_TIME_ARGS (best_time));
1484 }
1485
1486 /*
1487 * Function to recalculate earliest_data and earliest_timestamp. This also calls
1488 * gst_collect_pads_recalculate_waiting
1489 *
1490 * Must be called with STREAM_LOCK.
1491 */
1492 static gboolean
gst_collect_pads_recalculate_full(GstCollectPads * pads)1493 gst_collect_pads_recalculate_full (GstCollectPads * pads)
1494 {
1495 if (pads->priv->earliest_data)
1496 unref_data (pads->priv->earliest_data);
1497 gst_collect_pads_find_best_pad (pads, &pads->priv->earliest_data,
1498 &pads->priv->earliest_time);
1499 if (pads->priv->earliest_data)
1500 ref_data (pads->priv->earliest_data);
1501 return gst_collect_pads_recalculate_waiting (pads);
1502 }
1503
1504 /*
1505 * Default collect callback triggered when #GstCollectPads gathered all data.
1506 *
1507 * Called with STREAM_LOCK.
1508 */
1509 static GstFlowReturn
gst_collect_pads_default_collected(GstCollectPads * pads,gpointer user_data)1510 gst_collect_pads_default_collected (GstCollectPads * pads, gpointer user_data)
1511 {
1512 GstCollectData *best = NULL;
1513 GstBuffer *buffer;
1514 GstFlowReturn ret = GST_FLOW_OK;
1515 GstCollectPadsBufferFunction func;
1516 gpointer buffer_user_data;
1517
1518 g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), GST_FLOW_ERROR);
1519
1520 GST_OBJECT_LOCK (pads);
1521 func = pads->priv->buffer_func;
1522 buffer_user_data = pads->priv->buffer_user_data;
1523 GST_OBJECT_UNLOCK (pads);
1524
1525 g_return_val_if_fail (func != NULL, GST_FLOW_NOT_SUPPORTED);
1526
1527 /* Find the oldest pad at all cost */
1528 if (gst_collect_pads_recalculate_full (pads)) {
1529 /* waiting was switched on,
1530 * so give another thread a chance to deliver a possibly
1531 * older buffer; don't charge on yet with the current oldest */
1532 ret = GST_FLOW_OK;
1533 goto done;
1534 }
1535
1536 best = pads->priv->earliest_data;
1537
1538 /* No data collected means EOS. */
1539 if (G_UNLIKELY (best == NULL)) {
1540 ret = func (pads, best, NULL, buffer_user_data);
1541 if (ret == GST_FLOW_OK)
1542 ret = GST_FLOW_EOS;
1543 goto done;
1544 }
1545
1546 /* make sure that the pad we take a buffer from is waiting;
1547 * otherwise popping a buffer will seem not to have happened
1548 * and collectpads can get into a busy loop */
1549 gst_collect_pads_set_waiting (pads, best, TRUE);
1550
1551 /* Send buffer */
1552 buffer = gst_collect_pads_pop (pads, best);
1553 ret = func (pads, best, buffer, buffer_user_data);
1554
1555 /* maybe non-waiting was forced to waiting above due to
1556 * newsegment events coming too sparsely,
1557 * so re-check to restore state to avoid hanging/waiting */
1558 gst_collect_pads_recalculate_full (pads);
1559
1560 done:
1561 return ret;
1562 }
1563
1564 /*
1565 * Default timestamp compare function.
1566 */
1567 static gint
gst_collect_pads_default_compare_func(GstCollectPads * pads,GstCollectData * data1,GstClockTime timestamp1,GstCollectData * data2,GstClockTime timestamp2,gpointer user_data)1568 gst_collect_pads_default_compare_func (GstCollectPads * pads,
1569 GstCollectData * data1, GstClockTime timestamp1,
1570 GstCollectData * data2, GstClockTime timestamp2, gpointer user_data)
1571 {
1572
1573 GST_LOG_OBJECT (pads, "comparing %" GST_TIME_FORMAT
1574 " and %" GST_TIME_FORMAT, GST_TIME_ARGS (timestamp1),
1575 GST_TIME_ARGS (timestamp2));
1576 /* non-valid timestamps go first as they are probably headers or so */
1577 if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (timestamp1)))
1578 return GST_CLOCK_TIME_IS_VALID (timestamp2) ? -1 : 0;
1579
1580 if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (timestamp2)))
1581 return 1;
1582
1583 /* compare timestamp */
1584 if (timestamp1 < timestamp2)
1585 return -1;
1586
1587 if (timestamp1 > timestamp2)
1588 return 1;
1589
1590 return 0;
1591 }
1592
1593 /* called with STREAM_LOCK */
1594 static void
gst_collect_pads_handle_position_update(GstCollectPads * pads,GstCollectData * data,GstClockTime new_pos)1595 gst_collect_pads_handle_position_update (GstCollectPads * pads,
1596 GstCollectData * data, GstClockTime new_pos)
1597 {
1598 gint cmp_res;
1599
1600 /* If oldest time is not known, or current pad got newsegment;
1601 * recalculate the state */
1602 if (!pads->priv->earliest_data || pads->priv->earliest_data == data) {
1603 gst_collect_pads_recalculate_full (pads);
1604 goto exit;
1605 }
1606
1607 /* Check if the waiting state of the pad should change. */
1608 cmp_res =
1609 pads->priv->compare_func (pads, data, new_pos,
1610 pads->priv->earliest_data, pads->priv->earliest_time,
1611 pads->priv->compare_user_data);
1612
1613 if (cmp_res > 0)
1614 /* Stop waiting */
1615 gst_collect_pads_set_waiting (pads, data, FALSE);
1616
1617 exit:
1618 return;
1619
1620 }
1621
1622 static GstClockTime
gst_collect_pads_clip_time(GstCollectPads * pads,GstCollectData * data,GstClockTime time)1623 gst_collect_pads_clip_time (GstCollectPads * pads, GstCollectData * data,
1624 GstClockTime time)
1625 {
1626 GstClockTime otime = time;
1627 GstBuffer *in, *out = NULL;
1628
1629 if (pads->priv->clip_func) {
1630 in = gst_buffer_new ();
1631 GST_BUFFER_PTS (in) = time;
1632 GST_BUFFER_DTS (in) = GST_CLOCK_TIME_NONE;
1633 pads->priv->clip_func (pads, data, in, &out, pads->priv->clip_user_data);
1634 if (out) {
1635 otime = GST_BUFFER_PTS (out);
1636 gst_buffer_unref (out);
1637 } else {
1638 /* FIXME should distinguish between ahead or after segment,
1639 * let's assume after segment and use some large time ... */
1640 otime = G_MAXINT64 / 2;
1641 }
1642 }
1643
1644 return otime;
1645 }
1646
1647 /**
1648 * gst_collect_pads_event_default:
1649 * @pads: the collectpads to use
1650 * @data: collect data of corresponding pad
1651 * @event: event being processed
1652 * @discard: process but do not send event downstream
1653 *
1654 * Default #GstCollectPads event handling that elements should always
1655 * chain up to to ensure proper operation. Element might however indicate
1656 * event should not be forwarded downstream.
1657 */
1658 gboolean
gst_collect_pads_event_default(GstCollectPads * pads,GstCollectData * data,GstEvent * event,gboolean discard)1659 gst_collect_pads_event_default (GstCollectPads * pads, GstCollectData * data,
1660 GstEvent * event, gboolean discard)
1661 {
1662 gboolean res = TRUE;
1663 GstCollectPadsBufferFunction buffer_func;
1664 GstObject *parent;
1665 GstPad *pad;
1666
1667 GST_OBJECT_LOCK (pads);
1668 buffer_func = pads->priv->buffer_func;
1669 GST_OBJECT_UNLOCK (pads);
1670
1671 pad = data->pad;
1672 parent = GST_OBJECT_PARENT (pad);
1673
1674 GST_DEBUG_OBJECT (pad, "Got '%s' event", GST_EVENT_TYPE_NAME (event));
1675
1676 switch (GST_EVENT_TYPE (event)) {
1677 case GST_EVENT_FLUSH_START:
1678 {
1679 if (g_atomic_int_get (&pads->priv->seeking)) {
1680 /* drop all but the first FLUSH_STARTs when seeking */
1681 if (!g_atomic_int_compare_and_exchange (&pads->
1682 priv->pending_flush_start, TRUE, FALSE))
1683 goto eat;
1684
1685 /* unblock collect pads */
1686 gst_pad_event_default (pad, parent, event);
1687 event = NULL;
1688
1689 GST_COLLECT_PADS_STREAM_LOCK (pads);
1690 /* Start flushing. We never call gst_collect_pads_set_flushing (FALSE), we
1691 * instead wait until each pad gets its FLUSH_STOP and let that reset the pad to
1692 * non-flushing (which happens in gst_collect_pads_event_default).
1693 */
1694 gst_collect_pads_set_flushing (pads, TRUE);
1695
1696 if (pads->priv->flush_func)
1697 pads->priv->flush_func (pads, pads->priv->flush_user_data);
1698
1699 g_atomic_int_set (&pads->priv->pending_flush_stop, TRUE);
1700 GST_COLLECT_PADS_STREAM_UNLOCK (pads);
1701
1702 goto eat;
1703 } else {
1704 /* forward event to unblock check_collected */
1705 GST_DEBUG_OBJECT (pad, "forwarding flush start");
1706 if (!(res = gst_pad_event_default (pad, parent, event))) {
1707 GST_WARNING_OBJECT (pad, "forwarding flush start failed");
1708 }
1709 event = NULL;
1710
1711 /* now unblock the chain function.
1712 * no cond per pad, so they all unblock,
1713 * non-flushing block again */
1714 GST_COLLECT_PADS_STREAM_LOCK (pads);
1715 GST_COLLECT_PADS_STATE_SET (data, GST_COLLECT_PADS_STATE_FLUSHING);
1716 gst_collect_pads_clear (pads, data);
1717
1718 /* cater for possible default muxing functionality */
1719 if (buffer_func) {
1720 /* restore to initial state */
1721 gst_collect_pads_set_waiting (pads, data, TRUE);
1722 /* if the current pad is affected, reset state, recalculate later */
1723 if (pads->priv->earliest_data == data) {
1724 unref_data (data);
1725 pads->priv->earliest_data = NULL;
1726 pads->priv->earliest_time = GST_CLOCK_TIME_NONE;
1727 }
1728 }
1729
1730 GST_COLLECT_PADS_STREAM_UNLOCK (pads);
1731
1732 goto eat;
1733 }
1734 }
1735 case GST_EVENT_FLUSH_STOP:
1736 {
1737 /* flush the 1 buffer queue */
1738 GST_COLLECT_PADS_STREAM_LOCK (pads);
1739 GST_COLLECT_PADS_STATE_UNSET (data, GST_COLLECT_PADS_STATE_FLUSHING);
1740 gst_collect_pads_clear (pads, data);
1741 /* we need new segment info after the flush */
1742 gst_segment_init (&data->segment, GST_FORMAT_UNDEFINED);
1743 GST_COLLECT_PADS_STATE_UNSET (data, GST_COLLECT_PADS_STATE_NEW_SEGMENT);
1744 /* if the pad was EOS, remove the EOS flag and
1745 * decrement the number of eospads */
1746 if (G_UNLIKELY (GST_COLLECT_PADS_STATE_IS_SET (data,
1747 GST_COLLECT_PADS_STATE_EOS))) {
1748 if (!GST_COLLECT_PADS_STATE_IS_SET (data,
1749 GST_COLLECT_PADS_STATE_WAITING))
1750 pads->priv->queuedpads++;
1751 if (!g_atomic_int_get (&pads->priv->seeking)) {
1752 pads->priv->eospads--;
1753 }
1754 GST_COLLECT_PADS_STATE_UNSET (data, GST_COLLECT_PADS_STATE_EOS);
1755 }
1756 GST_COLLECT_PADS_STREAM_UNLOCK (pads);
1757
1758 if (g_atomic_int_get (&pads->priv->seeking)) {
1759 if (g_atomic_int_compare_and_exchange (&pads->priv->pending_flush_stop,
1760 TRUE, FALSE))
1761 goto forward;
1762 else
1763 goto eat;
1764 } else {
1765 goto forward;
1766 }
1767 }
1768 case GST_EVENT_EOS:
1769 {
1770 GST_COLLECT_PADS_STREAM_LOCK (pads);
1771 /* if the pad was not EOS, make it EOS and so we
1772 * have one more eospad */
1773 if (G_LIKELY (!GST_COLLECT_PADS_STATE_IS_SET (data,
1774 GST_COLLECT_PADS_STATE_EOS))) {
1775 GST_COLLECT_PADS_STATE_SET (data, GST_COLLECT_PADS_STATE_EOS);
1776 if (!GST_COLLECT_PADS_STATE_IS_SET (data,
1777 GST_COLLECT_PADS_STATE_WAITING))
1778 pads->priv->queuedpads--;
1779 pads->priv->eospads++;
1780 }
1781 /* check if we need collecting anything, we ignore the result. */
1782 gst_collect_pads_check_collected (pads);
1783 GST_COLLECT_PADS_STREAM_UNLOCK (pads);
1784
1785 goto eat;
1786 }
1787 case GST_EVENT_SEGMENT:
1788 {
1789 GstSegment seg;
1790
1791 GST_COLLECT_PADS_STREAM_LOCK (pads);
1792
1793 gst_event_copy_segment (event, &seg);
1794
1795 GST_DEBUG_OBJECT (data->pad, "got segment %" GST_SEGMENT_FORMAT, &seg);
1796
1797 /* default collection can not handle other segment formats than time */
1798 if (buffer_func && seg.format != GST_FORMAT_TIME) {
1799 GST_WARNING_OBJECT (pads, "GstCollectPads default collecting "
1800 "can only handle time segments. Non time segment ignored.");
1801 goto newsegment_done;
1802 }
1803
1804 /* need to update segment first */
1805 data->segment = seg;
1806 GST_COLLECT_PADS_STATE_SET (data, GST_COLLECT_PADS_STATE_NEW_SEGMENT);
1807
1808 /* now we can use for e.g. running time */
1809 seg.position =
1810 gst_collect_pads_clip_time (pads, data, seg.start + seg.offset);
1811 /* update again */
1812 data->segment = seg;
1813
1814 /* default muxing functionality */
1815 if (!buffer_func)
1816 goto newsegment_done;
1817
1818 gst_collect_pads_handle_position_update (pads, data, seg.position);
1819
1820 newsegment_done:
1821 GST_COLLECT_PADS_STREAM_UNLOCK (pads);
1822 /* we must not forward this event since multiple segments will be
1823 * accumulated and this is certainly not what we want. */
1824 goto eat;
1825 }
1826 case GST_EVENT_GAP:
1827 {
1828 GstClockTime start, duration;
1829
1830 GST_COLLECT_PADS_STREAM_LOCK (pads);
1831
1832 gst_event_parse_gap (event, &start, &duration);
1833 /* FIXME, handle reverse playback case */
1834 if (GST_CLOCK_TIME_IS_VALID (duration))
1835 start += duration;
1836 /* we do not expect another buffer until after gap,
1837 * so that is our position now */
1838 data->segment.position = gst_collect_pads_clip_time (pads, data, start);
1839
1840 gst_collect_pads_handle_position_update (pads, data,
1841 data->segment.position);
1842
1843 GST_COLLECT_PADS_STREAM_UNLOCK (pads);
1844 goto eat;
1845 }
1846 case GST_EVENT_STREAM_START:
1847 /* drop stream start events, element must create its own start event,
1848 * we can't just forward the first random stream start event we get */
1849 goto eat;
1850 case GST_EVENT_CAPS:
1851 goto eat;
1852 default:
1853 /* forward other events */
1854 goto forward;
1855 }
1856
1857 eat:
1858 GST_DEBUG_OBJECT (pads, "dropping event: %" GST_PTR_FORMAT, event);
1859 if (event)
1860 gst_event_unref (event);
1861 return res;
1862
1863 forward:
1864 if (discard)
1865 goto eat;
1866 else {
1867 GST_DEBUG_OBJECT (pads, "forward event: %" GST_PTR_FORMAT, event);
1868 return gst_pad_event_default (pad, parent, event);
1869 }
1870 }
1871
1872 typedef struct
1873 {
1874 GstEvent *event;
1875 gboolean result;
1876 } EventData;
1877
1878 static gboolean
event_forward_func(GstPad * pad,EventData * data)1879 event_forward_func (GstPad * pad, EventData * data)
1880 {
1881 gboolean ret = TRUE;
1882 GstPad *peer = gst_pad_get_peer (pad);
1883
1884 if (peer) {
1885 ret = gst_pad_send_event (peer, gst_event_ref (data->event));
1886 gst_object_unref (peer);
1887 }
1888
1889 data->result &= ret;
1890 /* Always send to all pads */
1891 return FALSE;
1892 }
1893
1894 static gboolean
forward_event_to_all_sinkpads(GstPad * srcpad,GstEvent * event)1895 forward_event_to_all_sinkpads (GstPad * srcpad, GstEvent * event)
1896 {
1897 EventData data;
1898
1899 data.event = event;
1900 data.result = TRUE;
1901
1902 gst_pad_forward (srcpad, (GstPadForwardFunction) event_forward_func, &data);
1903
1904 gst_event_unref (event);
1905
1906 return data.result;
1907 }
1908
1909 /**
1910 * gst_collect_pads_src_event_default:
1911 * @pads: the #GstCollectPads to use
1912 * @pad: src #GstPad that received the event
1913 * @event: event being processed
1914 *
1915 * Default #GstCollectPads event handling for the src pad of elements.
1916 * Elements can chain up to this to let flushing seek event handling
1917 * be done by #GstCollectPads.
1918 *
1919 * Since: 1.4
1920 */
1921 gboolean
gst_collect_pads_src_event_default(GstCollectPads * pads,GstPad * pad,GstEvent * event)1922 gst_collect_pads_src_event_default (GstCollectPads * pads, GstPad * pad,
1923 GstEvent * event)
1924 {
1925 GstObject *parent;
1926 gboolean res = TRUE;
1927
1928 parent = GST_OBJECT_PARENT (pad);
1929
1930 switch (GST_EVENT_TYPE (event)) {
1931 case GST_EVENT_SEEK:{
1932 GstSeekFlags flags;
1933
1934 pads->priv->eospads = 0;
1935
1936 GST_INFO_OBJECT (pads, "starting seek");
1937
1938 gst_event_parse_seek (event, NULL, NULL, &flags, NULL, NULL, NULL, NULL);
1939 if (flags & GST_SEEK_FLAG_FLUSH) {
1940 g_atomic_int_set (&pads->priv->seeking, TRUE);
1941 g_atomic_int_set (&pads->priv->pending_flush_start, TRUE);
1942 /* forward the seek upstream */
1943 res = forward_event_to_all_sinkpads (pad, event);
1944 event = NULL;
1945 if (!res) {
1946 g_atomic_int_set (&pads->priv->seeking, FALSE);
1947 g_atomic_int_set (&pads->priv->pending_flush_start, FALSE);
1948 }
1949 }
1950
1951 GST_INFO_OBJECT (pads, "seek done, result: %d", res);
1952
1953 break;
1954 }
1955 default:
1956 break;
1957 }
1958
1959 if (event)
1960 res = gst_pad_event_default (pad, parent, event);
1961
1962 return res;
1963 }
1964
1965 static gboolean
gst_collect_pads_event_default_internal(GstCollectPads * pads,GstCollectData * data,GstEvent * event,gpointer user_data)1966 gst_collect_pads_event_default_internal (GstCollectPads * pads,
1967 GstCollectData * data, GstEvent * event, gpointer user_data)
1968 {
1969 return gst_collect_pads_event_default (pads, data, event, FALSE);
1970 }
1971
1972 static gboolean
gst_collect_pads_event(GstPad * pad,GstObject * parent,GstEvent * event)1973 gst_collect_pads_event (GstPad * pad, GstObject * parent, GstEvent * event)
1974 {
1975 gboolean res = FALSE, need_unlock = FALSE;
1976 GstCollectData *data;
1977 GstCollectPads *pads;
1978 GstCollectPadsEventFunction event_func;
1979 gpointer event_user_data;
1980
1981 /* some magic to get the managing collect_pads */
1982 GST_OBJECT_LOCK (pad);
1983 data = (GstCollectData *) gst_pad_get_element_private (pad);
1984 if (G_UNLIKELY (data == NULL))
1985 goto pad_removed;
1986 ref_data (data);
1987 GST_OBJECT_UNLOCK (pad);
1988
1989 res = FALSE;
1990
1991 pads = data->collect;
1992
1993 GST_DEBUG_OBJECT (data->pad, "Got %s event on sink pad",
1994 GST_EVENT_TYPE_NAME (event));
1995
1996 GST_OBJECT_LOCK (pads);
1997 event_func = pads->priv->event_func;
1998 event_user_data = pads->priv->event_user_data;
1999 GST_OBJECT_UNLOCK (pads);
2000
2001 if (GST_EVENT_IS_SERIALIZED (event)) {
2002 GST_COLLECT_PADS_STREAM_LOCK (pads);
2003 need_unlock = TRUE;
2004 }
2005
2006 if (G_LIKELY (event_func)) {
2007 res = event_func (pads, data, event, event_user_data);
2008 }
2009
2010 if (need_unlock)
2011 GST_COLLECT_PADS_STREAM_UNLOCK (pads);
2012
2013 unref_data (data);
2014 return res;
2015
2016 /* ERRORS */
2017 pad_removed:
2018 {
2019 GST_DEBUG ("%s got removed from collectpads", GST_OBJECT_NAME (pad));
2020 GST_OBJECT_UNLOCK (pad);
2021 return FALSE;
2022 }
2023 }
2024
2025 /**
2026 * gst_collect_pads_query_default:
2027 * @pads: the collectpads to use
2028 * @data: collect data of corresponding pad
2029 * @query: query being processed
2030 * @discard: process but do not send event downstream
2031 *
2032 * Default #GstCollectPads query handling that elements should always
2033 * chain up to to ensure proper operation. Element might however indicate
2034 * query should not be forwarded downstream.
2035 */
2036 gboolean
gst_collect_pads_query_default(GstCollectPads * pads,GstCollectData * data,GstQuery * query,gboolean discard)2037 gst_collect_pads_query_default (GstCollectPads * pads, GstCollectData * data,
2038 GstQuery * query, gboolean discard)
2039 {
2040 gboolean res = TRUE;
2041 GstObject *parent;
2042 GstPad *pad;
2043
2044 pad = data->pad;
2045 parent = GST_OBJECT_PARENT (pad);
2046
2047 switch (GST_QUERY_TYPE (query)) {
2048 case GST_QUERY_SEEKING:
2049 {
2050 GstFormat format;
2051
2052 /* don't pass it along as some (file)sink might claim it does
2053 * whereas with a collectpads in between that will not likely work */
2054 gst_query_parse_seeking (query, &format, NULL, NULL, NULL);
2055 gst_query_set_seeking (query, format, FALSE, 0, -1);
2056 res = TRUE;
2057 discard = TRUE;
2058 break;
2059 }
2060 default:
2061 break;
2062 }
2063
2064 if (!discard)
2065 return gst_pad_query_default (pad, parent, query);
2066 else
2067 return res;
2068 }
2069
2070 static gboolean
gst_collect_pads_query_default_internal(GstCollectPads * pads,GstCollectData * data,GstQuery * query,gpointer user_data)2071 gst_collect_pads_query_default_internal (GstCollectPads * pads,
2072 GstCollectData * data, GstQuery * query, gpointer user_data)
2073 {
2074 return gst_collect_pads_query_default (pads, data, query, FALSE);
2075 }
2076
2077 static gboolean
gst_collect_pads_query(GstPad * pad,GstObject * parent,GstQuery * query)2078 gst_collect_pads_query (GstPad * pad, GstObject * parent, GstQuery * query)
2079 {
2080 gboolean res = FALSE, need_unlock = FALSE;
2081 GstCollectData *data;
2082 GstCollectPads *pads;
2083 GstCollectPadsQueryFunction query_func;
2084 gpointer query_user_data;
2085
2086 GST_DEBUG_OBJECT (pad, "Got %s query on sink pad",
2087 GST_QUERY_TYPE_NAME (query));
2088
2089 /* some magic to get the managing collect_pads */
2090 GST_OBJECT_LOCK (pad);
2091 data = (GstCollectData *) gst_pad_get_element_private (pad);
2092 if (G_UNLIKELY (data == NULL))
2093 goto pad_removed;
2094 ref_data (data);
2095 GST_OBJECT_UNLOCK (pad);
2096
2097 pads = data->collect;
2098
2099 GST_OBJECT_LOCK (pads);
2100 query_func = pads->priv->query_func;
2101 query_user_data = pads->priv->query_user_data;
2102 GST_OBJECT_UNLOCK (pads);
2103
2104 if (GST_QUERY_IS_SERIALIZED (query)) {
2105 GST_COLLECT_PADS_STREAM_LOCK (pads);
2106 need_unlock = TRUE;
2107 }
2108
2109 if (G_LIKELY (query_func)) {
2110 res = query_func (pads, data, query, query_user_data);
2111 }
2112
2113 if (need_unlock)
2114 GST_COLLECT_PADS_STREAM_UNLOCK (pads);
2115
2116 unref_data (data);
2117 return res;
2118
2119 /* ERRORS */
2120 pad_removed:
2121 {
2122 GST_DEBUG ("%s got removed from collectpads", GST_OBJECT_NAME (pad));
2123 GST_OBJECT_UNLOCK (pad);
2124 return FALSE;
2125 }
2126 }
2127
2128
2129 /* For each buffer we receive we check if our collected condition is reached
2130 * and if so we call the collected function. When this is done we check if
2131 * data has been unqueued. If data is still queued we wait holding the stream
2132 * lock to make sure no EOS event can happen while we are ready to be
2133 * collected
2134 */
2135 static GstFlowReturn
gst_collect_pads_chain(GstPad * pad,GstObject * parent,GstBuffer * buffer)2136 gst_collect_pads_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
2137 {
2138 GstCollectData *data;
2139 GstCollectPads *pads;
2140 GstFlowReturn ret;
2141 GstBuffer **buffer_p;
2142 guint32 cookie;
2143
2144 GST_DEBUG ("Got buffer for pad %s:%s", GST_DEBUG_PAD_NAME (pad));
2145
2146 /* some magic to get the managing collect_pads */
2147 GST_OBJECT_LOCK (pad);
2148 data = (GstCollectData *) gst_pad_get_element_private (pad);
2149 if (G_UNLIKELY (data == NULL))
2150 goto no_data;
2151 ref_data (data);
2152 GST_OBJECT_UNLOCK (pad);
2153
2154 pads = data->collect;
2155
2156 GST_COLLECT_PADS_STREAM_LOCK (pads);
2157 /* if not started, bail out */
2158 if (G_UNLIKELY (!pads->priv->started))
2159 goto not_started;
2160 /* check if this pad is flushing */
2161 if (G_UNLIKELY (GST_COLLECT_PADS_STATE_IS_SET (data,
2162 GST_COLLECT_PADS_STATE_FLUSHING)))
2163 goto flushing;
2164 /* pad was EOS, we can refuse this data */
2165 if (G_UNLIKELY (GST_COLLECT_PADS_STATE_IS_SET (data,
2166 GST_COLLECT_PADS_STATE_EOS)))
2167 goto eos;
2168
2169 /* see if we need to clip */
2170 if (pads->priv->clip_func) {
2171 GstBuffer *outbuf = NULL;
2172 ret =
2173 pads->priv->clip_func (pads, data, buffer, &outbuf,
2174 pads->priv->clip_user_data);
2175 buffer = outbuf;
2176
2177 if (G_UNLIKELY (outbuf == NULL))
2178 goto clipped;
2179
2180 if (G_UNLIKELY (ret == GST_FLOW_EOS))
2181 goto eos;
2182 else if (G_UNLIKELY (ret != GST_FLOW_OK))
2183 goto error;
2184 }
2185
2186 GST_DEBUG_OBJECT (pads, "Queuing buffer %p for pad %s:%s", buffer,
2187 GST_DEBUG_PAD_NAME (pad));
2188
2189 /* One more pad has data queued */
2190 if (GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_WAITING))
2191 pads->priv->queuedpads++;
2192 buffer_p = &data->buffer;
2193 gst_buffer_replace (buffer_p, buffer);
2194
2195 /* update segment last position if in TIME */
2196 if (G_LIKELY (data->segment.format == GST_FORMAT_TIME)) {
2197 GstClockTime timestamp;
2198
2199 timestamp = GST_BUFFER_DTS_OR_PTS (buffer);
2200
2201 if (GST_CLOCK_TIME_IS_VALID (timestamp))
2202 data->segment.position = timestamp;
2203 }
2204
2205 /* While we have data queued on this pad try to collect stuff */
2206 do {
2207 /* Check if our collected condition is matched and call the collected
2208 * function if it is */
2209 ret = gst_collect_pads_check_collected (pads);
2210 /* when an error occurs, we want to report this back to the caller ASAP
2211 * without having to block if the buffer was not popped */
2212 if (G_UNLIKELY (ret != GST_FLOW_OK))
2213 goto error;
2214
2215 /* data was consumed, we can exit and accept new data */
2216 if (data->buffer == NULL)
2217 break;
2218
2219 /* Having the _INIT here means we don't care about any broadcast up to here
2220 * (most of which occur with STREAM_LOCK held, so could not have happened
2221 * anyway). We do care about e.g. a remove initiated broadcast as of this
2222 * point. Putting it here also makes this thread ignores any evt it raised
2223 * itself (as is a usual WAIT semantic).
2224 */
2225 GST_COLLECT_PADS_EVT_INIT (cookie);
2226
2227 /* pad could be removed and re-added */
2228 unref_data (data);
2229 GST_OBJECT_LOCK (pad);
2230 if (G_UNLIKELY ((data = gst_pad_get_element_private (pad)) == NULL))
2231 goto pad_removed;
2232 ref_data (data);
2233 GST_OBJECT_UNLOCK (pad);
2234
2235 GST_DEBUG_OBJECT (pads, "Pad %s:%s has a buffer queued, waiting",
2236 GST_DEBUG_PAD_NAME (pad));
2237
2238 /* wait to be collected, this must happen from another thread triggered
2239 * by the _chain function of another pad. We release the lock so we
2240 * can get stopped or flushed as well. We can however not get EOS
2241 * because we still hold the STREAM_LOCK.
2242 */
2243 GST_COLLECT_PADS_STREAM_UNLOCK (pads);
2244 GST_COLLECT_PADS_EVT_WAIT (pads, cookie);
2245 GST_COLLECT_PADS_STREAM_LOCK (pads);
2246
2247 GST_DEBUG_OBJECT (pads, "Pad %s:%s resuming", GST_DEBUG_PAD_NAME (pad));
2248
2249 /* after a signal, we could be stopped */
2250 if (G_UNLIKELY (!pads->priv->started))
2251 goto not_started;
2252 /* check if this pad is flushing */
2253 if (G_UNLIKELY (GST_COLLECT_PADS_STATE_IS_SET (data,
2254 GST_COLLECT_PADS_STATE_FLUSHING)))
2255 goto flushing;
2256 }
2257 while (data->buffer != NULL);
2258
2259 unlock_done:
2260 GST_COLLECT_PADS_STREAM_UNLOCK (pads);
2261 /* data is definitely NULL if pad_removed goto was run. */
2262 if (data)
2263 unref_data (data);
2264 if (buffer)
2265 gst_buffer_unref (buffer);
2266 return ret;
2267
2268 pad_removed:
2269 {
2270 GST_WARNING ("%s got removed from collectpads", GST_OBJECT_NAME (pad));
2271 GST_OBJECT_UNLOCK (pad);
2272 ret = GST_FLOW_NOT_LINKED;
2273 goto unlock_done;
2274 }
2275 /* ERRORS */
2276 no_data:
2277 {
2278 GST_DEBUG ("%s got removed from collectpads", GST_OBJECT_NAME (pad));
2279 GST_OBJECT_UNLOCK (pad);
2280 gst_buffer_unref (buffer);
2281 return GST_FLOW_NOT_LINKED;
2282 }
2283 not_started:
2284 {
2285 GST_DEBUG ("not started");
2286 gst_collect_pads_clear (pads, data);
2287 ret = GST_FLOW_FLUSHING;
2288 goto unlock_done;
2289 }
2290 flushing:
2291 {
2292 GST_DEBUG ("pad %s:%s is flushing", GST_DEBUG_PAD_NAME (pad));
2293 gst_collect_pads_clear (pads, data);
2294 ret = GST_FLOW_FLUSHING;
2295 goto unlock_done;
2296 }
2297 eos:
2298 {
2299 /* we should not post an error for this, just inform upstream that
2300 * we don't expect anything anymore */
2301 GST_DEBUG ("pad %s:%s is eos", GST_DEBUG_PAD_NAME (pad));
2302 ret = GST_FLOW_EOS;
2303 goto unlock_done;
2304 }
2305 clipped:
2306 {
2307 GST_DEBUG ("clipped buffer on pad %s:%s", GST_DEBUG_PAD_NAME (pad));
2308 ret = GST_FLOW_OK;
2309 goto unlock_done;
2310 }
2311 error:
2312 {
2313 /* we print the error, the element should post a reasonable error
2314 * message for fatal errors */
2315 GST_DEBUG ("collect failed, reason %d (%s)", ret, gst_flow_get_name (ret));
2316 gst_collect_pads_clear (pads, data);
2317 goto unlock_done;
2318 }
2319 }
2320