• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* GStreamer
2  * Copyright (C) 2005 Wim Taymans <wim@fluendo.com>
3  * Copyright (C) 2008 Mark Nauwelaerts <mnauw@users.sourceforge.net>
4  * Copyright (C) 2010 Sebastian Dröge <sebastian.droege@collabora.co.uk>
5  *
6  * gstcollectpads.c:
7  *
8  * This library is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Library General Public
10  * License as published by the Free Software Foundation; either
11  * version 2 of the License, or (at your option) any later version.
12  *
13  * This library is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16  * Library General Public License for more details.
17  *
18  * You should have received a copy of the GNU Library General Public
19  * License along with this library; if not, write to the
20  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
21  * Boston, MA 02110-1301, USA.
22  */
23 /**
24  * SECTION:gstcollectpads
25  * @title: GstCollectPads
26  * @short_description: manages a set of pads that operate in collect mode
27  *
28  * Manages a set of pads that operate in collect mode. This means that control
29  * is given to the manager of this object when all pads have data.
30  *
31  *   * Collectpads are created with gst_collect_pads_new(). A callback should then
32  *     be installed with gst_collect_pads_set_function ().
33  *
34  *   * Pads are added to the collection with gst_collect_pads_add_pad()/
35  *     gst_collect_pads_remove_pad(). The pad has to be a sinkpad. When added,
36  *     the chain, event and query functions of the pad are overridden. The
37  *     element_private of the pad is used to store private information for the
38  *     collectpads.
39  *
40  *   * For each pad, data is queued in the _chain function or by
41  *     performing a pull_range.
42  *
43  *   * When data is queued on all pads in waiting mode, the callback function is called.
44  *
45  *   * Data can be dequeued from the pad with the gst_collect_pads_pop() method.
46  *     One can peek at the data with the gst_collect_pads_peek() function.
47  *     These functions will return %NULL if the pad received an EOS event. When all
48  *     pads return %NULL from a gst_collect_pads_peek(), the element can emit an EOS
49  *     event itself.
50  *
51  *   * Data can also be dequeued in byte units using the gst_collect_pads_available(),
52  *     gst_collect_pads_read_buffer() and gst_collect_pads_flush() calls.
53  *
54  *   * Elements should call gst_collect_pads_start() and gst_collect_pads_stop() in
55  *     their state change functions to start and stop the processing of the collectpads.
56  *     The gst_collect_pads_stop() call should be called before calling the parent
57  *     element state change function in the PAUSED_TO_READY state change to ensure
58  *     no pad is blocked and the element can finish streaming.
59  *
60  *   * gst_collect_pads_set_waiting() sets a pad to waiting or non-waiting mode.
61  *     CollectPads element is not waiting for data to be collected on non-waiting pads.
62  *     Thus these pads may but need not have data when the callback is called.
63  *     All pads are in waiting mode by default.
64  *
65  */
66 
67 #ifdef HAVE_CONFIG_H
68 #  include "config.h"
69 #endif
70 
71 #include <gst/gst_private.h>
72 
73 #include "gstcollectpads.h"
74 
75 #include "../../../gst/glib-compat-private.h"
76 
77 GST_DEBUG_CATEGORY_STATIC (collect_pads_debug);
78 #define GST_CAT_DEFAULT collect_pads_debug
79 
80 struct _GstCollectDataPrivate
81 {
82   /* refcounting for struct, and destroy callback */
83   GstCollectDataDestroyNotify destroy_notify;
84   gint refcount;
85 };
86 
87 struct _GstCollectPadsPrivate
88 {
89   /* with LOCK and/or STREAM_LOCK */
90   gboolean started;
91 
92   /* with STREAM_LOCK */
93   guint32 cookie;               /* pad_list cookie */
94   guint numpads;                /* number of pads in @data */
95   guint queuedpads;             /* number of pads with a buffer */
96   guint eospads;                /* number of pads that are EOS */
97   GstClockTime earliest_time;   /* Current earliest time */
98   GstCollectData *earliest_data;        /* Pad data for current earliest time */
99 
100   /* with LOCK */
101   GSList *pad_list;             /* list of GstCollectData* */
102   guint32 pad_cookie;           /* updated cookie */
103 
104   GstCollectPadsFunction func;  /* function and user_data for callback */
105   gpointer user_data;
106   GstCollectPadsBufferFunction buffer_func;     /* function and user_data for buffer callback */
107   gpointer buffer_user_data;
108   GstCollectPadsCompareFunction compare_func;
109   gpointer compare_user_data;
110   GstCollectPadsEventFunction event_func;       /* function and data for event callback */
111   gpointer event_user_data;
112   GstCollectPadsQueryFunction query_func;
113   gpointer query_user_data;
114   GstCollectPadsClipFunction clip_func;
115   gpointer clip_user_data;
116   GstCollectPadsFlushFunction flush_func;
117   gpointer flush_user_data;
118 
119   /* no other lock needed */
120   GMutex evt_lock;              /* these make up sort of poor man's event signaling */
121   GCond evt_cond;
122   guint32 evt_cookie;
123 
124   gboolean seeking;
125   gboolean pending_flush_start;
126   gboolean pending_flush_stop;
127 };
128 
129 #define parent_class gst_collect_pads_parent_class
130 G_DEFINE_TYPE_WITH_PRIVATE (GstCollectPads, gst_collect_pads, GST_TYPE_OBJECT);
131 
132 static void gst_collect_pads_clear (GstCollectPads * pads,
133     GstCollectData * data);
134 static GstFlowReturn gst_collect_pads_chain (GstPad * pad, GstObject * parent,
135     GstBuffer * buffer);
136 static gboolean gst_collect_pads_event (GstPad * pad, GstObject * parent,
137     GstEvent * event);
138 static gboolean gst_collect_pads_query (GstPad * pad, GstObject * parent,
139     GstQuery * query);
140 static void gst_collect_pads_finalize (GObject * object);
141 static GstFlowReturn gst_collect_pads_default_collected (GstCollectPads *
142     pads, gpointer user_data);
143 static gint gst_collect_pads_default_compare_func (GstCollectPads * pads,
144     GstCollectData * data1, GstClockTime timestamp1, GstCollectData * data2,
145     GstClockTime timestamp2, gpointer user_data);
146 static gboolean gst_collect_pads_recalculate_full (GstCollectPads * pads);
147 static void ref_data (GstCollectData * data);
148 static void unref_data (GstCollectData * data);
149 
150 static gboolean gst_collect_pads_event_default_internal (GstCollectPads *
151     pads, GstCollectData * data, GstEvent * event, gpointer user_data);
152 static gboolean gst_collect_pads_query_default_internal (GstCollectPads *
153     pads, GstCollectData * data, GstQuery * query, gpointer user_data);
154 
155 
156 /* Some properties are protected by LOCK, others by STREAM_LOCK
157  * However, manipulating either of these partitions may require
158  * to signal/wake a _WAIT, so use a separate (sort of) event to prevent races
159  * Alternative implementations are possible, e.g. some low-level re-implementing
160  * of the 2 above locks to drop both of them atomically when going into _WAIT.
161  */
162 #define GST_COLLECT_PADS_GET_EVT_COND(pads) (&((GstCollectPads *)pads)->priv->evt_cond)
163 #define GST_COLLECT_PADS_GET_EVT_LOCK(pads) (&((GstCollectPads *)pads)->priv->evt_lock)
164 #define GST_COLLECT_PADS_EVT_WAIT(pads, cookie) G_STMT_START {    \
165   g_mutex_lock (GST_COLLECT_PADS_GET_EVT_LOCK (pads));            \
166   /* should work unless a lot of event'ing and thread starvation */\
167   while (cookie == ((GstCollectPads *) pads)->priv->evt_cookie)         \
168     g_cond_wait (GST_COLLECT_PADS_GET_EVT_COND (pads),            \
169         GST_COLLECT_PADS_GET_EVT_LOCK (pads));                    \
170   cookie = ((GstCollectPads *) pads)->priv->evt_cookie;                 \
171   g_mutex_unlock (GST_COLLECT_PADS_GET_EVT_LOCK (pads));          \
172 } G_STMT_END
173 #define GST_COLLECT_PADS_EVT_WAIT_TIMED(pads, cookie, timeout) G_STMT_START { \
174   gint64 end_time = g_get_monotonic_time () + timeout; \
175   \
176   g_mutex_lock (GST_COLLECT_PADS_GET_EVT_LOCK (pads));            \
177   /* should work unless a lot of event'ing and thread starvation */\
178   while (cookie == ((GstCollectPads *) pads)->priv->evt_cookie)         \
179     g_cond_wait_until (GST_COLLECT_PADS_GET_EVT_COND (pads),            \
180         GST_COLLECT_PADS_GET_EVT_LOCK (pads), end_time);                    \
181   cookie = ((GstCollectPads *) pads)->priv->evt_cookie;                 \
182   g_mutex_unlock (GST_COLLECT_PADS_GET_EVT_LOCK (pads));          \
183 } G_STMT_END
184 #define GST_COLLECT_PADS_EVT_BROADCAST(pads) G_STMT_START {       \
185   g_mutex_lock (GST_COLLECT_PADS_GET_EVT_LOCK (pads));            \
186   /* never mind wrap-around */                                     \
187   ++(((GstCollectPads *) pads)->priv->evt_cookie);                      \
188   g_cond_broadcast (GST_COLLECT_PADS_GET_EVT_COND (pads));        \
189   g_mutex_unlock (GST_COLLECT_PADS_GET_EVT_LOCK (pads));          \
190 } G_STMT_END
191 #define GST_COLLECT_PADS_EVT_INIT(cookie) G_STMT_START {          \
192   g_mutex_lock (GST_COLLECT_PADS_GET_EVT_LOCK (pads));            \
193   cookie = ((GstCollectPads *) pads)->priv->evt_cookie;                 \
194   g_mutex_unlock (GST_COLLECT_PADS_GET_EVT_LOCK (pads));          \
195 } G_STMT_END
196 
197 static void
gst_collect_pads_class_init(GstCollectPadsClass * klass)198 gst_collect_pads_class_init (GstCollectPadsClass * klass)
199 {
200   GObjectClass *gobject_class = (GObjectClass *) klass;
201 
202   GST_DEBUG_CATEGORY_INIT (collect_pads_debug, "collectpads", 0,
203       "GstCollectPads");
204 
205   gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_collect_pads_finalize);
206 }
207 
208 static void
gst_collect_pads_init(GstCollectPads * pads)209 gst_collect_pads_init (GstCollectPads * pads)
210 {
211   pads->priv = gst_collect_pads_get_instance_private (pads);
212 
213   pads->data = NULL;
214   pads->priv->cookie = 0;
215   pads->priv->numpads = 0;
216   pads->priv->queuedpads = 0;
217   pads->priv->eospads = 0;
218   pads->priv->started = FALSE;
219 
220   g_rec_mutex_init (&pads->stream_lock);
221 
222   pads->priv->func = gst_collect_pads_default_collected;
223   pads->priv->user_data = NULL;
224   pads->priv->event_func = NULL;
225   pads->priv->event_user_data = NULL;
226 
227   /* members for default muxing */
228   pads->priv->buffer_func = NULL;
229   pads->priv->buffer_user_data = NULL;
230   pads->priv->compare_func = gst_collect_pads_default_compare_func;
231   pads->priv->compare_user_data = NULL;
232   pads->priv->earliest_data = NULL;
233   pads->priv->earliest_time = GST_CLOCK_TIME_NONE;
234 
235   pads->priv->event_func = gst_collect_pads_event_default_internal;
236   pads->priv->query_func = gst_collect_pads_query_default_internal;
237 
238   /* members to manage the pad list */
239   pads->priv->pad_cookie = 0;
240   pads->priv->pad_list = NULL;
241 
242   /* members for event */
243   g_mutex_init (&pads->priv->evt_lock);
244   g_cond_init (&pads->priv->evt_cond);
245   pads->priv->evt_cookie = 0;
246 
247   pads->priv->seeking = FALSE;
248   pads->priv->pending_flush_start = FALSE;
249   pads->priv->pending_flush_stop = FALSE;
250 }
251 
252 static void
gst_collect_pads_finalize(GObject * object)253 gst_collect_pads_finalize (GObject * object)
254 {
255   GstCollectPads *pads = GST_COLLECT_PADS (object);
256 
257   GST_DEBUG_OBJECT (object, "finalize");
258 
259   g_rec_mutex_clear (&pads->stream_lock);
260 
261   g_cond_clear (&pads->priv->evt_cond);
262   g_mutex_clear (&pads->priv->evt_lock);
263 
264   /* Remove pads and free pads list */
265   g_slist_foreach (pads->priv->pad_list, (GFunc) unref_data, NULL);
266   g_slist_foreach (pads->data, (GFunc) unref_data, NULL);
267   g_slist_free (pads->data);
268   g_slist_free (pads->priv->pad_list);
269 
270   G_OBJECT_CLASS (parent_class)->finalize (object);
271 }
272 
273 /**
274  * gst_collect_pads_new:
275  *
276  * Create a new instance of #GstCollectPads.
277  *
278  * MT safe.
279  *
280  * Returns: (transfer full): a new #GstCollectPads, or %NULL in case of an error.
281  */
282 GstCollectPads *
gst_collect_pads_new(void)283 gst_collect_pads_new (void)
284 {
285   GstCollectPads *newcoll;
286 
287   newcoll = g_object_new (GST_TYPE_COLLECT_PADS, NULL);
288 
289   /* clear floating flag */
290   gst_object_ref_sink (newcoll);
291 
292   return newcoll;
293 }
294 
295 /* Must be called with GstObject lock! */
296 static void
gst_collect_pads_set_buffer_function_locked(GstCollectPads * pads,GstCollectPadsBufferFunction func,gpointer user_data)297 gst_collect_pads_set_buffer_function_locked (GstCollectPads * pads,
298     GstCollectPadsBufferFunction func, gpointer user_data)
299 {
300   pads->priv->buffer_func = func;
301   pads->priv->buffer_user_data = user_data;
302 }
303 
304 /**
305  * gst_collect_pads_set_buffer_function:
306  * @pads: the collectpads to use
307  * @func: (scope call): the function to set
308  * @user_data: (closure): user data passed to the function
309  *
310  * Set the callback function and user data that will be called with
311  * the oldest buffer when all pads have been collected, or %NULL on EOS.
312  * If a buffer is passed, the callback owns a reference and must unref
313  * it.
314  *
315  * MT safe.
316  */
317 void
gst_collect_pads_set_buffer_function(GstCollectPads * pads,GstCollectPadsBufferFunction func,gpointer user_data)318 gst_collect_pads_set_buffer_function (GstCollectPads * pads,
319     GstCollectPadsBufferFunction func, gpointer user_data)
320 {
321   g_return_if_fail (pads != NULL);
322   g_return_if_fail (GST_IS_COLLECT_PADS (pads));
323 
324   GST_OBJECT_LOCK (pads);
325   gst_collect_pads_set_buffer_function_locked (pads, func, user_data);
326   GST_OBJECT_UNLOCK (pads);
327 }
328 
329 /**
330  * gst_collect_pads_set_compare_function:
331  * @pads: the pads to use
332  * @func: (scope call): the function to set
333  * @user_data: (closure): user data passed to the function
334  *
335  * Set the timestamp comparison function.
336  *
337  * MT safe.
338  */
339 /* NOTE allowing to change comparison seems not advisable;
340 no known use-case, and collaboration with default algorithm is unpredictable.
341 If custom comparing/operation is needed, just use a collect function of
342 your own */
343 void
gst_collect_pads_set_compare_function(GstCollectPads * pads,GstCollectPadsCompareFunction func,gpointer user_data)344 gst_collect_pads_set_compare_function (GstCollectPads * pads,
345     GstCollectPadsCompareFunction func, gpointer user_data)
346 {
347   g_return_if_fail (pads != NULL);
348   g_return_if_fail (GST_IS_COLLECT_PADS (pads));
349 
350   GST_OBJECT_LOCK (pads);
351   pads->priv->compare_func = func;
352   pads->priv->compare_user_data = user_data;
353   GST_OBJECT_UNLOCK (pads);
354 }
355 
356 /**
357  * gst_collect_pads_set_function:
358  * @pads: the collectpads to use
359  * @func: (scope call): the function to set
360  * @user_data: user data passed to the function
361  *
362  * CollectPads provides a default collection algorithm that will determine
363  * the oldest buffer available on all of its pads, and then delegate
364  * to a configured callback.
365  * However, if circumstances are more complicated and/or more control
366  * is desired, this sets a callback that will be invoked instead when
367  * all the pads added to the collection have buffers queued.
368  * Evidently, this callback is not compatible with
369  * gst_collect_pads_set_buffer_function() callback.
370  * If this callback is set, the former will be unset.
371  *
372  * MT safe.
373  */
374 void
gst_collect_pads_set_function(GstCollectPads * pads,GstCollectPadsFunction func,gpointer user_data)375 gst_collect_pads_set_function (GstCollectPads * pads,
376     GstCollectPadsFunction func, gpointer user_data)
377 {
378   g_return_if_fail (pads != NULL);
379   g_return_if_fail (GST_IS_COLLECT_PADS (pads));
380 
381   GST_OBJECT_LOCK (pads);
382   pads->priv->func = func;
383   pads->priv->user_data = user_data;
384   gst_collect_pads_set_buffer_function_locked (pads, NULL, NULL);
385   GST_OBJECT_UNLOCK (pads);
386 }
387 
388 static void
ref_data(GstCollectData * data)389 ref_data (GstCollectData * data)
390 {
391   g_assert (data != NULL);
392 
393   g_atomic_int_inc (&(data->priv->refcount));
394 }
395 
396 static void
unref_data(GstCollectData * data)397 unref_data (GstCollectData * data)
398 {
399   g_assert (data != NULL);
400   g_assert (data->priv->refcount > 0);
401 
402   if (!g_atomic_int_dec_and_test (&(data->priv->refcount)))
403     return;
404 
405   if (data->priv->destroy_notify)
406     data->priv->destroy_notify (data);
407 
408   gst_object_unref (data->pad);
409   if (data->buffer) {
410     gst_buffer_unref (data->buffer);
411   }
412   g_free (data->priv);
413   g_free (data);
414 }
415 
416 /**
417  * gst_collect_pads_set_event_function:
418  * @pads: the collectpads to use
419  * @func: (scope call): the function to set
420  * @user_data: user data passed to the function
421  *
422  * Set the event callback function and user data that will be called when
423  * collectpads has received an event originating from one of the collected
424  * pads.  If the event being processed is a serialized one, this callback is
425  * called with @pads STREAM_LOCK held, otherwise not.  As this lock should be
426  * held when calling a number of CollectPads functions, it should be acquired
427  * if so (unusually) needed.
428  *
429  * MT safe.
430  */
431 void
gst_collect_pads_set_event_function(GstCollectPads * pads,GstCollectPadsEventFunction func,gpointer user_data)432 gst_collect_pads_set_event_function (GstCollectPads * pads,
433     GstCollectPadsEventFunction func, gpointer user_data)
434 {
435   g_return_if_fail (pads != NULL);
436   g_return_if_fail (GST_IS_COLLECT_PADS (pads));
437 
438   GST_OBJECT_LOCK (pads);
439   pads->priv->event_func = func;
440   pads->priv->event_user_data = user_data;
441   GST_OBJECT_UNLOCK (pads);
442 }
443 
444 /**
445  * gst_collect_pads_set_query_function:
446  * @pads: the collectpads to use
447  * @func: (scope call): the function to set
448  * @user_data: user data passed to the function
449  *
450  * Set the query callback function and user data that will be called after
451  * collectpads has received a query originating from one of the collected
452  * pads.  If the query being processed is a serialized one, this callback is
453  * called with @pads STREAM_LOCK held, otherwise not.  As this lock should be
454  * held when calling a number of CollectPads functions, it should be acquired
455  * if so (unusually) needed.
456  *
457  * MT safe.
458  */
459 void
gst_collect_pads_set_query_function(GstCollectPads * pads,GstCollectPadsQueryFunction func,gpointer user_data)460 gst_collect_pads_set_query_function (GstCollectPads * pads,
461     GstCollectPadsQueryFunction func, gpointer user_data)
462 {
463   g_return_if_fail (pads != NULL);
464   g_return_if_fail (GST_IS_COLLECT_PADS (pads));
465 
466   GST_OBJECT_LOCK (pads);
467   pads->priv->query_func = func;
468   pads->priv->query_user_data = user_data;
469   GST_OBJECT_UNLOCK (pads);
470 }
471 
472 /**
473 * gst_collect_pads_clip_running_time:
474 * @pads: the collectpads to use
475 * @cdata: collect data of corresponding pad
476 * @buf: buffer being clipped
477 * @outbuf: (allow-none) (out): output buffer with running time, or NULL if clipped
478 * @user_data: user data (unused)
479 *
480 * Convenience clipping function that converts incoming buffer's timestamp
481 * to running time, or clips the buffer if outside configured segment.
482 *
483 * Since 1.6, this clipping function also sets the DTS parameter of the
484 * GstCollectData structure. This version of the running time DTS can be
485 * negative. G_MININT64 is used to indicate invalid value.
486 */
487 GstFlowReturn
gst_collect_pads_clip_running_time(GstCollectPads * pads,GstCollectData * cdata,GstBuffer * buf,GstBuffer ** outbuf,gpointer user_data)488 gst_collect_pads_clip_running_time (GstCollectPads * pads,
489     GstCollectData * cdata, GstBuffer * buf, GstBuffer ** outbuf,
490     gpointer user_data)
491 {
492   *outbuf = buf;
493 
494   /* invalid left alone and passed */
495   if (G_LIKELY (GST_CLOCK_TIME_IS_VALID (GST_BUFFER_DTS_OR_PTS (buf)))) {
496     GstClockTime time;
497     GstClockTime buf_dts, abs_dts;
498     gint dts_sign;
499 
500     time = GST_BUFFER_PTS (buf);
501 
502     if (GST_CLOCK_TIME_IS_VALID (time)) {
503       time =
504           gst_segment_to_running_time (&cdata->segment, GST_FORMAT_TIME, time);
505       if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (time))) {
506         GST_DEBUG_OBJECT (cdata->pad, "clipping buffer on pad outside segment %"
507             GST_TIME_FORMAT, GST_TIME_ARGS (GST_BUFFER_PTS (buf)));
508         gst_buffer_unref (buf);
509         *outbuf = NULL;
510         return GST_FLOW_OK;
511       }
512     }
513 
514     GST_LOG_OBJECT (cdata->pad, "buffer pts %" GST_TIME_FORMAT " -> %"
515         GST_TIME_FORMAT " running time",
516         GST_TIME_ARGS (GST_BUFFER_PTS (buf)), GST_TIME_ARGS (time));
517     *outbuf = gst_buffer_make_writable (buf);
518     GST_BUFFER_PTS (*outbuf) = time;
519 
520     dts_sign = gst_segment_to_running_time_full (&cdata->segment,
521         GST_FORMAT_TIME, GST_BUFFER_DTS (*outbuf), &abs_dts);
522     buf_dts = GST_BUFFER_DTS (*outbuf);
523     if (dts_sign > 0) {
524       GST_BUFFER_DTS (*outbuf) = abs_dts;
525       GST_COLLECT_PADS_DTS (cdata) = abs_dts;
526     } else if (dts_sign < 0) {
527       GST_BUFFER_DTS (*outbuf) = GST_CLOCK_TIME_NONE;
528       GST_COLLECT_PADS_DTS (cdata) = -((gint64) abs_dts);
529     } else {
530       GST_BUFFER_DTS (*outbuf) = GST_CLOCK_TIME_NONE;
531       GST_COLLECT_PADS_DTS (cdata) = GST_CLOCK_STIME_NONE;
532     }
533 
534     GST_LOG_OBJECT (cdata->pad, "buffer dts %" GST_TIME_FORMAT " -> %"
535         GST_STIME_FORMAT " running time", GST_TIME_ARGS (buf_dts),
536         GST_STIME_ARGS (GST_COLLECT_PADS_DTS (cdata)));
537   }
538 
539   return GST_FLOW_OK;
540 }
541 
542 /**
543  * gst_collect_pads_set_clip_function:
544  * @pads: the collectpads to use
545  * @clipfunc: (scope call): clip function to install
546  * @user_data: user data to pass to @clip_func
547  *
548  * Install a clipping function that is called right after a buffer is received
549  * on a pad managed by @pads. See #GstCollectPadsClipFunction for more info.
550  */
551 void
gst_collect_pads_set_clip_function(GstCollectPads * pads,GstCollectPadsClipFunction clipfunc,gpointer user_data)552 gst_collect_pads_set_clip_function (GstCollectPads * pads,
553     GstCollectPadsClipFunction clipfunc, gpointer user_data)
554 {
555   g_return_if_fail (pads != NULL);
556   g_return_if_fail (GST_IS_COLLECT_PADS (pads));
557 
558   pads->priv->clip_func = clipfunc;
559   pads->priv->clip_user_data = user_data;
560 }
561 
562 /**
563  * gst_collect_pads_set_flush_function:
564  * @pads: the collectpads to use
565  * @func: (scope call): flush function to install
566  * @user_data: user data to pass to @func
567  *
568  * Install a flush function that is called when the internal
569  * state of all pads should be flushed as part of flushing seek
570  * handling. See #GstCollectPadsFlushFunction for more info.
571  *
572  * Since: 1.4
573  */
574 void
gst_collect_pads_set_flush_function(GstCollectPads * pads,GstCollectPadsFlushFunction func,gpointer user_data)575 gst_collect_pads_set_flush_function (GstCollectPads * pads,
576     GstCollectPadsFlushFunction func, gpointer user_data)
577 {
578   g_return_if_fail (pads != NULL);
579   g_return_if_fail (GST_IS_COLLECT_PADS (pads));
580 
581   pads->priv->flush_func = func;
582   pads->priv->flush_user_data = user_data;
583 }
584 
585 /**
586  * gst_collect_pads_add_pad:
587  * @pads: the collectpads to use
588  * @pad: (transfer none): the pad to add
589  * @size: the size of the returned #GstCollectData structure
590  * @destroy_notify: (scope async): function to be called before the returned
591  *   #GstCollectData structure is freed
592  * @lock: whether to lock this pad in usual waiting state
593  *
594  * Add a pad to the collection of collect pads. The pad has to be
595  * a sinkpad. The refcount of the pad is incremented. Use
596  * gst_collect_pads_remove_pad() to remove the pad from the collection
597  * again.
598  *
599  * You specify a size for the returned #GstCollectData structure
600  * so that you can use it to store additional information.
601  *
602  * You can also specify a #GstCollectDataDestroyNotify that will be called
603  * just before the #GstCollectData structure is freed. It is passed the
604  * pointer to the structure and should free any custom memory and resources
605  * allocated for it.
606  *
607  * Keeping a pad locked in waiting state is only relevant when using
608  * the default collection algorithm (providing the oldest buffer).
609  * It ensures a buffer must be available on this pad for a collection
610  * to take place.  This is of typical use to a muxer element where
611  * non-subtitle streams should always be in waiting state,
612  * e.g. to assure that caps information is available on all these streams
613  * when initial headers have to be written.
614  *
615  * The pad will be automatically activated in push mode when @pads is
616  * started.
617  *
618  * MT safe.
619  *
620  * Returns: (nullable) (transfer none): a new #GstCollectData to identify the
621  *   new pad. Or %NULL if wrong parameters are supplied.
622  */
623 GstCollectData *
gst_collect_pads_add_pad(GstCollectPads * pads,GstPad * pad,guint size,GstCollectDataDestroyNotify destroy_notify,gboolean lock)624 gst_collect_pads_add_pad (GstCollectPads * pads, GstPad * pad, guint size,
625     GstCollectDataDestroyNotify destroy_notify, gboolean lock)
626 {
627   GstCollectData *data;
628 
629   g_return_val_if_fail (pads != NULL, NULL);
630   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), NULL);
631   g_return_val_if_fail (pad != NULL, NULL);
632   g_return_val_if_fail (GST_PAD_IS_SINK (pad), NULL);
633   g_return_val_if_fail (size >= sizeof (GstCollectData), NULL);
634 
635   GST_DEBUG_OBJECT (pads, "adding pad %s:%s", GST_DEBUG_PAD_NAME (pad));
636 
637   data = g_malloc0 (size);
638   data->priv = g_new0 (GstCollectDataPrivate, 1);
639   data->collect = pads;
640   data->pad = gst_object_ref (pad);
641   data->buffer = NULL;
642   data->pos = 0;
643   gst_segment_init (&data->segment, GST_FORMAT_UNDEFINED);
644   data->state = GST_COLLECT_PADS_STATE_WAITING;
645   data->state |= lock ? GST_COLLECT_PADS_STATE_LOCKED : 0;
646   data->priv->refcount = 1;
647   data->priv->destroy_notify = destroy_notify;
648   data->ABI.abi.dts = G_MININT64;
649 
650   GST_OBJECT_LOCK (pads);
651   GST_OBJECT_LOCK (pad);
652   gst_pad_set_element_private (pad, data);
653   GST_OBJECT_UNLOCK (pad);
654   pads->priv->pad_list = g_slist_append (pads->priv->pad_list, data);
655   gst_pad_set_chain_function (pad, GST_DEBUG_FUNCPTR (gst_collect_pads_chain));
656   gst_pad_set_event_function (pad, GST_DEBUG_FUNCPTR (gst_collect_pads_event));
657   gst_pad_set_query_function (pad, GST_DEBUG_FUNCPTR (gst_collect_pads_query));
658   /* backward compat, also add to data if stopped, so that the element already
659    * has this in the public data list before going PAUSED (typically)
660    * this can only be done when we are stopped because we don't take the
661    * STREAM_LOCK to protect the pads->data list. */
662   if (!pads->priv->started) {
663     pads->data = g_slist_append (pads->data, data);
664     ref_data (data);
665   }
666   /* activate the pad when needed */
667   if (pads->priv->started)
668     gst_pad_set_active (pad, TRUE);
669   pads->priv->pad_cookie++;
670   GST_OBJECT_UNLOCK (pads);
671 
672   return data;
673 }
674 
675 static gint
find_pad(GstCollectData * data,GstPad * pad)676 find_pad (GstCollectData * data, GstPad * pad)
677 {
678   if (data->pad == pad)
679     return 0;
680   return 1;
681 }
682 
683 /**
684  * gst_collect_pads_remove_pad:
685  * @pads: the collectpads to use
686  * @pad: (transfer none): the pad to remove
687  *
688  * Remove a pad from the collection of collect pads. This function will also
689  * free the #GstCollectData and all the resources that were allocated with
690  * gst_collect_pads_add_pad().
691  *
692  * The pad will be deactivated automatically when @pads is stopped.
693  *
694  * MT safe.
695  *
696  * Returns: %TRUE if the pad could be removed.
697  */
698 gboolean
gst_collect_pads_remove_pad(GstCollectPads * pads,GstPad * pad)699 gst_collect_pads_remove_pad (GstCollectPads * pads, GstPad * pad)
700 {
701   GstCollectData *data;
702   GSList *list;
703 
704   g_return_val_if_fail (pads != NULL, FALSE);
705   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), FALSE);
706   g_return_val_if_fail (pad != NULL, FALSE);
707   g_return_val_if_fail (GST_IS_PAD (pad), FALSE);
708 
709   GST_DEBUG_OBJECT (pads, "removing pad %s:%s", GST_DEBUG_PAD_NAME (pad));
710 
711   GST_OBJECT_LOCK (pads);
712   list =
713       g_slist_find_custom (pads->priv->pad_list, pad, (GCompareFunc) find_pad);
714   if (!list)
715     goto unknown_pad;
716 
717   data = (GstCollectData *) list->data;
718 
719   GST_DEBUG_OBJECT (pads, "found pad %s:%s at %p", GST_DEBUG_PAD_NAME (pad),
720       data);
721 
722   /* clear the stuff we configured */
723   gst_pad_set_chain_function (pad, NULL);
724   gst_pad_set_event_function (pad, NULL);
725   GST_OBJECT_LOCK (pad);
726   gst_pad_set_element_private (pad, NULL);
727   GST_OBJECT_UNLOCK (pad);
728 
729   /* backward compat, also remove from data if stopped, note that this function
730    * can only be called when we are stopped because we don't take the
731    * STREAM_LOCK to protect the pads->data list. */
732   if (!pads->priv->started) {
733     GSList *dlist;
734 
735     dlist = g_slist_find_custom (pads->data, pad, (GCompareFunc) find_pad);
736     if (dlist) {
737       GstCollectData *pdata = dlist->data;
738 
739       pads->data = g_slist_delete_link (pads->data, dlist);
740       unref_data (pdata);
741     }
742   }
743   /* remove from the pad list */
744   pads->priv->pad_list = g_slist_delete_link (pads->priv->pad_list, list);
745   pads->priv->pad_cookie++;
746 
747   /* signal waiters because something changed */
748   GST_COLLECT_PADS_EVT_BROADCAST (pads);
749 
750   /* deactivate the pad when needed */
751   if (!pads->priv->started)
752     gst_pad_set_active (pad, FALSE);
753 
754   /* clean and free the collect data */
755   unref_data (data);
756 
757   GST_OBJECT_UNLOCK (pads);
758 
759   return TRUE;
760 
761 unknown_pad:
762   {
763     GST_WARNING_OBJECT (pads, "cannot remove unknown pad %s:%s",
764         GST_DEBUG_PAD_NAME (pad));
765     GST_OBJECT_UNLOCK (pads);
766     return FALSE;
767   }
768 }
769 
770 /*
771  * Must be called with STREAM_LOCK and OBJECT_LOCK.
772  */
773 static void
gst_collect_pads_set_flushing_unlocked(GstCollectPads * pads,gboolean flushing)774 gst_collect_pads_set_flushing_unlocked (GstCollectPads * pads,
775     gboolean flushing)
776 {
777   GSList *walk = NULL;
778 
779   GST_DEBUG ("sink-pads flushing=%d", flushing);
780 
781   /* Update the pads flushing flag */
782   for (walk = pads->priv->pad_list; walk; walk = g_slist_next (walk)) {
783     GstCollectData *cdata = walk->data;
784 
785     if (GST_IS_PAD (cdata->pad)) {
786       GST_OBJECT_LOCK (cdata->pad);
787       if (flushing)
788         GST_PAD_SET_FLUSHING (cdata->pad);
789       else
790         GST_PAD_UNSET_FLUSHING (cdata->pad);
791       if (flushing)
792         GST_COLLECT_PADS_STATE_SET (cdata, GST_COLLECT_PADS_STATE_FLUSHING);
793       else
794         GST_COLLECT_PADS_STATE_UNSET (cdata, GST_COLLECT_PADS_STATE_FLUSHING);
795       gst_collect_pads_clear (pads, cdata);
796       GST_OBJECT_UNLOCK (cdata->pad);
797     }
798   }
799 
800   /* inform _chain of changes */
801   GST_COLLECT_PADS_EVT_BROADCAST (pads);
802 }
803 
804 /**
805  * gst_collect_pads_set_flushing:
806  * @pads: the collectpads to use
807  * @flushing: desired state of the pads
808  *
809  * Change the flushing state of all the pads in the collection. No pad
810  * is able to accept anymore data when @flushing is %TRUE. Calling this
811  * function with @flushing %FALSE makes @pads accept data again.
812  * Caller must ensure that downstream streaming (thread) is not blocked,
813  * e.g. by sending a FLUSH_START downstream.
814  *
815  * MT safe.
816  */
817 void
gst_collect_pads_set_flushing(GstCollectPads * pads,gboolean flushing)818 gst_collect_pads_set_flushing (GstCollectPads * pads, gboolean flushing)
819 {
820   g_return_if_fail (pads != NULL);
821   g_return_if_fail (GST_IS_COLLECT_PADS (pads));
822 
823   /* NOTE since this eventually calls _pop, some (STREAM_)LOCK is needed here */
824   GST_COLLECT_PADS_STREAM_LOCK (pads);
825   GST_OBJECT_LOCK (pads);
826   gst_collect_pads_set_flushing_unlocked (pads, flushing);
827   GST_OBJECT_UNLOCK (pads);
828   GST_COLLECT_PADS_STREAM_UNLOCK (pads);
829 }
830 
831 /**
832  * gst_collect_pads_start:
833  * @pads: the collectpads to use
834  *
835  * Starts the processing of data in the collect_pads.
836  *
837  * MT safe.
838  */
839 void
gst_collect_pads_start(GstCollectPads * pads)840 gst_collect_pads_start (GstCollectPads * pads)
841 {
842   GSList *collected;
843 
844   g_return_if_fail (pads != NULL);
845   g_return_if_fail (GST_IS_COLLECT_PADS (pads));
846 
847   GST_DEBUG_OBJECT (pads, "starting collect pads");
848 
849   /* make sure stop and collect cannot be called anymore */
850   GST_COLLECT_PADS_STREAM_LOCK (pads);
851 
852   /* make pads streamable */
853   GST_OBJECT_LOCK (pads);
854 
855   /* loop over the master pad list and reset the segment */
856   collected = pads->priv->pad_list;
857   for (; collected; collected = g_slist_next (collected)) {
858     GstCollectData *data;
859 
860     data = collected->data;
861     gst_segment_init (&data->segment, GST_FORMAT_UNDEFINED);
862   }
863 
864   gst_collect_pads_set_flushing_unlocked (pads, FALSE);
865 
866   /* Start collect pads */
867   pads->priv->started = TRUE;
868   GST_OBJECT_UNLOCK (pads);
869   GST_COLLECT_PADS_STREAM_UNLOCK (pads);
870 }
871 
872 /**
873  * gst_collect_pads_stop:
874  * @pads: the collectpads to use
875  *
876  * Stops the processing of data in the collect_pads. this function
877  * will also unblock any blocking operations.
878  *
879  * MT safe.
880  */
881 void
gst_collect_pads_stop(GstCollectPads * pads)882 gst_collect_pads_stop (GstCollectPads * pads)
883 {
884   GSList *collected;
885 
886   g_return_if_fail (pads != NULL);
887   g_return_if_fail (GST_IS_COLLECT_PADS (pads));
888 
889   GST_DEBUG_OBJECT (pads, "stopping collect pads");
890 
891   /* make sure collect and start cannot be called anymore */
892   GST_COLLECT_PADS_STREAM_LOCK (pads);
893 
894   /* make pads not accept data anymore */
895   GST_OBJECT_LOCK (pads);
896   gst_collect_pads_set_flushing_unlocked (pads, TRUE);
897 
898   /* Stop collect pads */
899   pads->priv->started = FALSE;
900   pads->priv->eospads = 0;
901   pads->priv->queuedpads = 0;
902 
903   /* loop over the master pad list and flush buffers */
904   collected = pads->priv->pad_list;
905   for (; collected; collected = g_slist_next (collected)) {
906     GstCollectData *data;
907     GstBuffer **buffer_p;
908 
909     data = collected->data;
910     if (data->buffer) {
911       buffer_p = &data->buffer;
912       gst_buffer_replace (buffer_p, NULL);
913       data->pos = 0;
914     }
915     GST_COLLECT_PADS_STATE_UNSET (data, GST_COLLECT_PADS_STATE_EOS);
916   }
917 
918   if (pads->priv->earliest_data)
919     unref_data (pads->priv->earliest_data);
920   pads->priv->earliest_data = NULL;
921   pads->priv->earliest_time = GST_CLOCK_TIME_NONE;
922 
923   GST_OBJECT_UNLOCK (pads);
924   /* Wake them up so they can end the chain functions. */
925   GST_COLLECT_PADS_EVT_BROADCAST (pads);
926 
927   GST_COLLECT_PADS_STREAM_UNLOCK (pads);
928 }
929 
930 /**
931  * gst_collect_pads_peek:
932  * @pads: the collectpads to peek
933  * @data: the data to use
934  *
935  * Peek at the buffer currently queued in @data. This function
936  * should be called with the @pads STREAM_LOCK held, such as in the callback
937  * handler.
938  *
939  * MT safe.
940  *
941  * Returns: (transfer full) (nullable): The buffer in @data or %NULL if no
942  * buffer is queued. should unref the buffer after usage.
943  */
944 GstBuffer *
gst_collect_pads_peek(GstCollectPads * pads,GstCollectData * data)945 gst_collect_pads_peek (GstCollectPads * pads, GstCollectData * data)
946 {
947   GstBuffer *result;
948 
949   g_return_val_if_fail (pads != NULL, NULL);
950   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), NULL);
951   g_return_val_if_fail (data != NULL, NULL);
952 
953   if ((result = data->buffer))
954     gst_buffer_ref (result);
955 
956   GST_DEBUG_OBJECT (pads, "Peeking at pad %s:%s: buffer=%" GST_PTR_FORMAT,
957       GST_DEBUG_PAD_NAME (data->pad), result);
958 
959   return result;
960 }
961 
962 /**
963  * gst_collect_pads_pop:
964  * @pads: the collectpads to pop
965  * @data: the data to use
966  *
967  * Pop the buffer currently queued in @data. This function
968  * should be called with the @pads STREAM_LOCK held, such as in the callback
969  * handler.
970  *
971  * MT safe.
972  *
973  * Returns: (transfer full) (nullable): The buffer in @data or %NULL if no
974  * buffer was queued. You should unref the buffer after usage.
975  */
976 GstBuffer *
gst_collect_pads_pop(GstCollectPads * pads,GstCollectData * data)977 gst_collect_pads_pop (GstCollectPads * pads, GstCollectData * data)
978 {
979   GstBuffer *result;
980 
981   g_return_val_if_fail (pads != NULL, NULL);
982   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), NULL);
983   g_return_val_if_fail (data != NULL, NULL);
984 
985   if ((result = data->buffer)) {
986     data->buffer = NULL;
987     data->pos = 0;
988     /* one less pad with queued data now */
989     if (GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_WAITING))
990       pads->priv->queuedpads--;
991   }
992 
993   GST_COLLECT_PADS_EVT_BROADCAST (pads);
994 
995   GST_DEBUG_OBJECT (pads, "Pop buffer on pad %s:%s: buffer=%" GST_PTR_FORMAT,
996       GST_DEBUG_PAD_NAME (data->pad), result);
997 
998   return result;
999 }
1000 
1001 /* pop and unref the currently queued buffer, should be called with STREAM_LOCK
1002  * held */
1003 static void
gst_collect_pads_clear(GstCollectPads * pads,GstCollectData * data)1004 gst_collect_pads_clear (GstCollectPads * pads, GstCollectData * data)
1005 {
1006   GstBuffer *buf;
1007 
1008   if ((buf = gst_collect_pads_pop (pads, data)))
1009     gst_buffer_unref (buf);
1010 }
1011 
1012 /**
1013  * gst_collect_pads_available:
1014  * @pads: the collectpads to query
1015  *
1016  * Query how much bytes can be read from each queued buffer. This means
1017  * that the result of this call is the maximum number of bytes that can
1018  * be read from each of the pads.
1019  *
1020  * This function should be called with @pads STREAM_LOCK held, such as
1021  * in the callback.
1022  *
1023  * MT safe.
1024  *
1025  * Returns: The maximum number of bytes queued on all pads. This function
1026  * returns 0 if a pad has no queued buffer.
1027  */
1028 /* we might pre-calculate this in some struct field,
1029  * but would then have to maintain this in _chain and particularly _pop, etc,
1030  * even if element is never interested in this information */
1031 guint
gst_collect_pads_available(GstCollectPads * pads)1032 gst_collect_pads_available (GstCollectPads * pads)
1033 {
1034   GSList *collected;
1035   guint result = G_MAXUINT;
1036 
1037   g_return_val_if_fail (pads != NULL, 0);
1038   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), 0);
1039 
1040   collected = pads->data;
1041   for (; collected; collected = g_slist_next (collected)) {
1042     GstCollectData *pdata;
1043     GstBuffer *buffer;
1044     gint size;
1045 
1046     pdata = (GstCollectData *) collected->data;
1047 
1048     /* ignore pad with EOS */
1049     if (G_UNLIKELY (GST_COLLECT_PADS_STATE_IS_SET (pdata,
1050                 GST_COLLECT_PADS_STATE_EOS))) {
1051       GST_DEBUG_OBJECT (pads, "pad %p is EOS", pdata);
1052       continue;
1053     }
1054 
1055     /* an empty buffer without EOS is weird when we get here.. */
1056     if (G_UNLIKELY ((buffer = pdata->buffer) == NULL)) {
1057       GST_WARNING_OBJECT (pads, "pad %p has no buffer", pdata);
1058       goto not_filled;
1059     }
1060 
1061     /* this is the size left of the buffer */
1062     size = gst_buffer_get_size (buffer) - pdata->pos;
1063     GST_DEBUG_OBJECT (pads, "pad %p has %d bytes left", pdata, size);
1064 
1065     /* need to return the min of all available data */
1066     if (size < result)
1067       result = size;
1068   }
1069   /* nothing changed, all must be EOS then, return 0 */
1070   if (G_UNLIKELY (result == G_MAXUINT))
1071     result = 0;
1072 
1073   return result;
1074 
1075 not_filled:
1076   {
1077     return 0;
1078   }
1079 }
1080 
1081 /**
1082  * gst_collect_pads_flush:
1083  * @pads: the collectpads to query
1084  * @data: the data to use
1085  * @size: the number of bytes to flush
1086  *
1087  * Flush @size bytes from the pad @data.
1088  *
1089  * This function should be called with @pads STREAM_LOCK held, such as
1090  * in the callback.
1091  *
1092  * MT safe.
1093  *
1094  * Returns: The number of bytes flushed This can be less than @size and
1095  * is 0 if the pad was end-of-stream.
1096  */
1097 guint
gst_collect_pads_flush(GstCollectPads * pads,GstCollectData * data,guint size)1098 gst_collect_pads_flush (GstCollectPads * pads, GstCollectData * data,
1099     guint size)
1100 {
1101   guint flushsize;
1102   gsize bsize;
1103   GstBuffer *buffer;
1104 
1105   g_return_val_if_fail (pads != NULL, 0);
1106   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), 0);
1107   g_return_val_if_fail (data != NULL, 0);
1108 
1109   /* no buffer, must be EOS */
1110   if ((buffer = data->buffer) == NULL)
1111     return 0;
1112 
1113   bsize = gst_buffer_get_size (buffer);
1114 
1115   /* this is what we can flush at max */
1116   flushsize = MIN (size, bsize - data->pos);
1117 
1118   data->pos += size;
1119 
1120   if (data->pos >= bsize)
1121     /* _clear will also reset data->pos to 0 */
1122     gst_collect_pads_clear (pads, data);
1123 
1124   return flushsize;
1125 }
1126 
1127 /**
1128  * gst_collect_pads_read_buffer:
1129  * @pads: the collectpads to query
1130  * @data: the data to use
1131  * @size: the number of bytes to read
1132  *
1133  * Get a subbuffer of @size bytes from the given pad @data.
1134  *
1135  * This function should be called with @pads STREAM_LOCK held, such as in the
1136  * callback.
1137  *
1138  * MT safe.
1139  *
1140  * Returns: (transfer full) (nullable): A sub buffer. The size of the buffer can
1141  * be less that requested. A return of %NULL signals that the pad is
1142  * end-of-stream. Unref the buffer after use.
1143  */
1144 GstBuffer *
gst_collect_pads_read_buffer(GstCollectPads * pads,GstCollectData * data,guint size)1145 gst_collect_pads_read_buffer (GstCollectPads * pads, GstCollectData * data,
1146     guint size)
1147 {
1148   guint readsize, buf_size;
1149   GstBuffer *buffer;
1150 
1151   g_return_val_if_fail (pads != NULL, NULL);
1152   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), NULL);
1153   g_return_val_if_fail (data != NULL, NULL);
1154 
1155   /* no buffer, must be EOS */
1156   if ((buffer = data->buffer) == NULL)
1157     return NULL;
1158 
1159   buf_size = gst_buffer_get_size (buffer);
1160   readsize = MIN (size, buf_size - data->pos);
1161 
1162   return gst_buffer_copy_region (buffer, GST_BUFFER_COPY_ALL, data->pos,
1163       readsize);
1164 }
1165 
1166 /**
1167  * gst_collect_pads_take_buffer:
1168  * @pads: the collectpads to query
1169  * @data: the data to use
1170  * @size: the number of bytes to read
1171  *
1172  * Get a subbuffer of @size bytes from the given pad @data. Flushes the amount
1173  * of read bytes.
1174  *
1175  * This function should be called with @pads STREAM_LOCK held, such as in the
1176  * callback.
1177  *
1178  * MT safe.
1179  *
1180  * Returns: (transfer full) (nullable): A sub buffer. The size of the buffer can
1181  * be less that requested. A return of %NULL signals that the pad is
1182  * end-of-stream. Unref the buffer after use.
1183  */
1184 GstBuffer *
gst_collect_pads_take_buffer(GstCollectPads * pads,GstCollectData * data,guint size)1185 gst_collect_pads_take_buffer (GstCollectPads * pads, GstCollectData * data,
1186     guint size)
1187 {
1188   GstBuffer *buffer = gst_collect_pads_read_buffer (pads, data, size);
1189 
1190   if (buffer) {
1191     gst_collect_pads_flush (pads, data, gst_buffer_get_size (buffer));
1192   }
1193   return buffer;
1194 }
1195 
1196 /**
1197  * gst_collect_pads_set_waiting:
1198  * @pads: the collectpads
1199  * @data: the data to use
1200  * @waiting: boolean indicating whether this pad should operate
1201  *           in waiting or non-waiting mode
1202  *
1203  * Sets a pad to waiting or non-waiting mode, if at least this pad
1204  * has not been created with locked waiting state,
1205  * in which case nothing happens.
1206  *
1207  * This function should be called with @pads STREAM_LOCK held, such as
1208  * in the callback.
1209  *
1210  * MT safe.
1211  */
1212 void
gst_collect_pads_set_waiting(GstCollectPads * pads,GstCollectData * data,gboolean waiting)1213 gst_collect_pads_set_waiting (GstCollectPads * pads, GstCollectData * data,
1214     gboolean waiting)
1215 {
1216   g_return_if_fail (pads != NULL);
1217   g_return_if_fail (GST_IS_COLLECT_PADS (pads));
1218   g_return_if_fail (data != NULL);
1219 
1220   GST_DEBUG_OBJECT (pads, "Setting pad %s to waiting %d, locked %d",
1221       GST_PAD_NAME (data->pad), waiting,
1222       GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_LOCKED));
1223 
1224   /* Do something only on a change and if not locked */
1225   if (!GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_LOCKED) &&
1226       (GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_WAITING) !=
1227           ! !waiting)) {
1228     /* Set waiting state for this pad */
1229     if (waiting)
1230       GST_COLLECT_PADS_STATE_SET (data, GST_COLLECT_PADS_STATE_WAITING);
1231     else
1232       GST_COLLECT_PADS_STATE_UNSET (data, GST_COLLECT_PADS_STATE_WAITING);
1233     /* Update number of queued pads if needed */
1234     if (!data->buffer &&
1235         !GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_EOS)) {
1236       if (waiting)
1237         pads->priv->queuedpads--;
1238       else
1239         pads->priv->queuedpads++;
1240     }
1241 
1242     /* signal waiters because something changed */
1243     GST_COLLECT_PADS_EVT_BROADCAST (pads);
1244   }
1245 }
1246 
1247 /* see if pads were added or removed and update our stats. Any pad
1248  * added after releasing the LOCK will get collected in the next
1249  * round.
1250  *
1251  * We can do a quick check by checking the cookies, that get changed
1252  * whenever the pad list is updated.
1253  *
1254  * Must be called with STREAM_LOCK.
1255  */
1256 static void
gst_collect_pads_check_pads(GstCollectPads * pads)1257 gst_collect_pads_check_pads (GstCollectPads * pads)
1258 {
1259   /* the master list and cookie are protected with LOCK */
1260   GST_OBJECT_LOCK (pads);
1261   if (G_UNLIKELY (pads->priv->pad_cookie != pads->priv->cookie)) {
1262     GSList *collected;
1263 
1264     /* clear list and stats */
1265     g_slist_foreach (pads->data, (GFunc) unref_data, NULL);
1266     g_slist_free (pads->data);
1267     pads->data = NULL;
1268     pads->priv->numpads = 0;
1269     pads->priv->queuedpads = 0;
1270     pads->priv->eospads = 0;
1271     if (pads->priv->earliest_data)
1272       unref_data (pads->priv->earliest_data);
1273     pads->priv->earliest_data = NULL;
1274     pads->priv->earliest_time = GST_CLOCK_TIME_NONE;
1275 
1276     /* loop over the master pad list */
1277     collected = pads->priv->pad_list;
1278     for (; collected; collected = g_slist_next (collected)) {
1279       GstCollectData *data;
1280 
1281       /* update the stats */
1282       pads->priv->numpads++;
1283       data = collected->data;
1284       if (GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_EOS))
1285         pads->priv->eospads++;
1286       else if (data->buffer || !GST_COLLECT_PADS_STATE_IS_SET (data,
1287               GST_COLLECT_PADS_STATE_WAITING))
1288         pads->priv->queuedpads++;
1289 
1290       /* add to the list of pads to collect */
1291       ref_data (data);
1292       /* preserve order of adding/requesting pads */
1293       pads->data = g_slist_append (pads->data, data);
1294     }
1295     /* and update the cookie */
1296     pads->priv->cookie = pads->priv->pad_cookie;
1297   }
1298   GST_OBJECT_UNLOCK (pads);
1299 }
1300 
1301 /* checks if all the pads are collected and call the collectfunction
1302  *
1303  * Should be called with STREAM_LOCK.
1304  *
1305  * Returns: The #GstFlowReturn of collection.
1306  */
1307 static GstFlowReturn
gst_collect_pads_check_collected(GstCollectPads * pads)1308 gst_collect_pads_check_collected (GstCollectPads * pads)
1309 {
1310   GstFlowReturn flow_ret = GST_FLOW_OK;
1311   GstCollectPadsFunction func;
1312   gpointer user_data;
1313 
1314   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), GST_FLOW_ERROR);
1315 
1316   GST_OBJECT_LOCK (pads);
1317   func = pads->priv->func;
1318   user_data = pads->priv->user_data;
1319   GST_OBJECT_UNLOCK (pads);
1320 
1321   g_return_val_if_fail (pads->priv->func != NULL, GST_FLOW_NOT_SUPPORTED);
1322 
1323   /* check for new pads, update stats etc.. */
1324   gst_collect_pads_check_pads (pads);
1325 
1326   if (G_UNLIKELY (pads->priv->eospads == pads->priv->numpads)) {
1327     /* If all our pads are EOS just collect once to let the element
1328      * do its final EOS handling. */
1329     GST_DEBUG_OBJECT (pads, "All active pads (%d) are EOS, calling %s",
1330         pads->priv->numpads, GST_DEBUG_FUNCPTR_NAME (func));
1331 
1332     if (G_UNLIKELY (g_atomic_int_compare_and_exchange (&pads->priv->seeking,
1333                 TRUE, FALSE))) {
1334       GST_INFO_OBJECT (pads, "finished seeking");
1335     }
1336     do {
1337       flow_ret = func (pads, user_data);
1338     } while (flow_ret == GST_FLOW_OK);
1339   } else {
1340     gboolean collected = FALSE;
1341 
1342     /* We call the collected function as long as our condition matches. */
1343     while (((pads->priv->queuedpads + pads->priv->eospads) >=
1344             pads->priv->numpads)) {
1345       GST_DEBUG_OBJECT (pads,
1346           "All active pads (%d + %d >= %d) have data, " "calling %s",
1347           pads->priv->queuedpads, pads->priv->eospads, pads->priv->numpads,
1348           GST_DEBUG_FUNCPTR_NAME (func));
1349 
1350       if (G_UNLIKELY (g_atomic_int_compare_and_exchange (&pads->priv->seeking,
1351                   TRUE, FALSE))) {
1352         GST_INFO_OBJECT (pads, "finished seeking");
1353       }
1354       flow_ret = func (pads, user_data);
1355       collected = TRUE;
1356 
1357       /* break on error */
1358       if (flow_ret != GST_FLOW_OK)
1359         break;
1360       /* Don't keep looping after telling the element EOS or flushing */
1361       if (pads->priv->queuedpads == 0)
1362         break;
1363     }
1364     if (!collected)
1365       GST_DEBUG_OBJECT (pads, "Not all active pads (%d) have data, continuing",
1366           pads->priv->numpads);
1367   }
1368   return flow_ret;
1369 }
1370 
1371 
1372 /* General overview:
1373  * - only pad with a buffer can determine earliest_data (and earliest_time)
1374  * - only segment info determines (non-)waiting state
1375  * - ? perhaps use _stream_time for comparison
1376  *   (which muxers might have use as well ?)
1377  */
1378 
1379 /*
1380  * Function to recalculate the waiting state of all pads.
1381  *
1382  * Must be called with STREAM_LOCK.
1383  *
1384  * Returns %TRUE if a pad was set to waiting
1385  * (from non-waiting state).
1386  */
1387 static gboolean
gst_collect_pads_recalculate_waiting(GstCollectPads * pads)1388 gst_collect_pads_recalculate_waiting (GstCollectPads * pads)
1389 {
1390   GSList *collected;
1391   gboolean result = FALSE;
1392 
1393   /* If earliest time is not known, there is nothing to do. */
1394   if (pads->priv->earliest_data == NULL)
1395     return FALSE;
1396 
1397   for (collected = pads->data; collected; collected = g_slist_next (collected)) {
1398     GstCollectData *data = (GstCollectData *) collected->data;
1399     int cmp_res;
1400     GstClockTime comp_time;
1401 
1402     /* check if pad has a segment */
1403     if (data->segment.format == GST_FORMAT_UNDEFINED) {
1404       GST_WARNING_OBJECT (pads,
1405           "GstCollectPads has no time segment, assuming 0 based.");
1406       gst_segment_init (&data->segment, GST_FORMAT_TIME);
1407       GST_COLLECT_PADS_STATE_SET (data, GST_COLLECT_PADS_STATE_NEW_SEGMENT);
1408     }
1409 
1410     /* check segment format */
1411     if (data->segment.format != GST_FORMAT_TIME) {
1412       GST_ERROR_OBJECT (pads, "GstCollectPads can handle only time segments.");
1413       continue;
1414     }
1415 
1416     /* check if the waiting state should be changed */
1417     comp_time = data->segment.position;
1418     cmp_res = pads->priv->compare_func (pads, data, comp_time,
1419         pads->priv->earliest_data, pads->priv->earliest_time,
1420         pads->priv->compare_user_data);
1421     if (cmp_res > 0)
1422       /* stop waiting */
1423       gst_collect_pads_set_waiting (pads, data, FALSE);
1424     else {
1425       if (!GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_WAITING)) {
1426         /* start waiting */
1427         gst_collect_pads_set_waiting (pads, data, TRUE);
1428         result = TRUE;
1429       }
1430     }
1431   }
1432 
1433   return result;
1434 }
1435 
1436 /**
1437  * gst_collect_pads_find_best_pad:
1438  * @pads: the collectpads to use
1439  * @data: returns the collectdata for earliest data
1440  * @time: returns the earliest available buffertime
1441  *
1442  * Find the oldest/best pad, i.e. pad holding the oldest buffer and
1443  * and return the corresponding #GstCollectData and buffertime.
1444  *
1445  * This function should be called with STREAM_LOCK held,
1446  * such as in the callback.
1447  */
1448 static void
gst_collect_pads_find_best_pad(GstCollectPads * pads,GstCollectData ** data,GstClockTime * time)1449 gst_collect_pads_find_best_pad (GstCollectPads * pads,
1450     GstCollectData ** data, GstClockTime * time)
1451 {
1452   GSList *collected;
1453   GstCollectData *best = NULL;
1454   GstClockTime best_time = GST_CLOCK_TIME_NONE;
1455 
1456   g_return_if_fail (data != NULL);
1457   g_return_if_fail (time != NULL);
1458 
1459   for (collected = pads->data; collected; collected = g_slist_next (collected)) {
1460     GstBuffer *buffer;
1461     GstCollectData *data = (GstCollectData *) collected->data;
1462     GstClockTime timestamp;
1463 
1464     buffer = gst_collect_pads_peek (pads, data);
1465     /* if we have a buffer check if it is better then the current best one */
1466     if (buffer != NULL) {
1467       timestamp = GST_BUFFER_DTS_OR_PTS (buffer);
1468       gst_buffer_unref (buffer);
1469       if (best == NULL || pads->priv->compare_func (pads, data, timestamp,
1470               best, best_time, pads->priv->compare_user_data) < 0) {
1471         best = data;
1472         best_time = timestamp;
1473       }
1474     }
1475   }
1476 
1477   /* set earliest time */
1478   *data = best;
1479   *time = best_time;
1480 
1481   GST_DEBUG_OBJECT (pads, "best pad %s, best time %" GST_TIME_FORMAT,
1482       best ? GST_PAD_NAME (((GstCollectData *) best)->pad) : "(nil)",
1483       GST_TIME_ARGS (best_time));
1484 }
1485 
1486 /*
1487  * Function to recalculate earliest_data and earliest_timestamp. This also calls
1488  * gst_collect_pads_recalculate_waiting
1489  *
1490  * Must be called with STREAM_LOCK.
1491  */
1492 static gboolean
gst_collect_pads_recalculate_full(GstCollectPads * pads)1493 gst_collect_pads_recalculate_full (GstCollectPads * pads)
1494 {
1495   if (pads->priv->earliest_data)
1496     unref_data (pads->priv->earliest_data);
1497   gst_collect_pads_find_best_pad (pads, &pads->priv->earliest_data,
1498       &pads->priv->earliest_time);
1499   if (pads->priv->earliest_data)
1500     ref_data (pads->priv->earliest_data);
1501   return gst_collect_pads_recalculate_waiting (pads);
1502 }
1503 
1504 /*
1505  * Default collect callback triggered when #GstCollectPads gathered all data.
1506  *
1507  * Called with STREAM_LOCK.
1508  */
1509 static GstFlowReturn
gst_collect_pads_default_collected(GstCollectPads * pads,gpointer user_data)1510 gst_collect_pads_default_collected (GstCollectPads * pads, gpointer user_data)
1511 {
1512   GstCollectData *best = NULL;
1513   GstBuffer *buffer;
1514   GstFlowReturn ret = GST_FLOW_OK;
1515   GstCollectPadsBufferFunction func;
1516   gpointer buffer_user_data;
1517 
1518   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), GST_FLOW_ERROR);
1519 
1520   GST_OBJECT_LOCK (pads);
1521   func = pads->priv->buffer_func;
1522   buffer_user_data = pads->priv->buffer_user_data;
1523   GST_OBJECT_UNLOCK (pads);
1524 
1525   g_return_val_if_fail (func != NULL, GST_FLOW_NOT_SUPPORTED);
1526 
1527   /* Find the oldest pad at all cost */
1528   if (gst_collect_pads_recalculate_full (pads)) {
1529     /* waiting was switched on,
1530      * so give another thread a chance to deliver a possibly
1531      * older buffer; don't charge on yet with the current oldest */
1532     ret = GST_FLOW_OK;
1533     goto done;
1534   }
1535 
1536   best = pads->priv->earliest_data;
1537 
1538   /* No data collected means EOS. */
1539   if (G_UNLIKELY (best == NULL)) {
1540     ret = func (pads, best, NULL, buffer_user_data);
1541     if (ret == GST_FLOW_OK)
1542       ret = GST_FLOW_EOS;
1543     goto done;
1544   }
1545 
1546   /* make sure that the pad we take a buffer from is waiting;
1547    * otherwise popping a buffer will seem not to have happened
1548    * and collectpads can get into a busy loop */
1549   gst_collect_pads_set_waiting (pads, best, TRUE);
1550 
1551   /* Send buffer */
1552   buffer = gst_collect_pads_pop (pads, best);
1553   ret = func (pads, best, buffer, buffer_user_data);
1554 
1555   /* maybe non-waiting was forced to waiting above due to
1556    * newsegment events coming too sparsely,
1557    * so re-check to restore state to avoid hanging/waiting */
1558   gst_collect_pads_recalculate_full (pads);
1559 
1560 done:
1561   return ret;
1562 }
1563 
1564 /*
1565  * Default timestamp compare function.
1566  */
1567 static gint
gst_collect_pads_default_compare_func(GstCollectPads * pads,GstCollectData * data1,GstClockTime timestamp1,GstCollectData * data2,GstClockTime timestamp2,gpointer user_data)1568 gst_collect_pads_default_compare_func (GstCollectPads * pads,
1569     GstCollectData * data1, GstClockTime timestamp1,
1570     GstCollectData * data2, GstClockTime timestamp2, gpointer user_data)
1571 {
1572 
1573   GST_LOG_OBJECT (pads, "comparing %" GST_TIME_FORMAT
1574       " and %" GST_TIME_FORMAT, GST_TIME_ARGS (timestamp1),
1575       GST_TIME_ARGS (timestamp2));
1576   /* non-valid timestamps go first as they are probably headers or so */
1577   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (timestamp1)))
1578     return GST_CLOCK_TIME_IS_VALID (timestamp2) ? -1 : 0;
1579 
1580   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (timestamp2)))
1581     return 1;
1582 
1583   /* compare timestamp */
1584   if (timestamp1 < timestamp2)
1585     return -1;
1586 
1587   if (timestamp1 > timestamp2)
1588     return 1;
1589 
1590   return 0;
1591 }
1592 
1593 /* called with STREAM_LOCK */
1594 static void
gst_collect_pads_handle_position_update(GstCollectPads * pads,GstCollectData * data,GstClockTime new_pos)1595 gst_collect_pads_handle_position_update (GstCollectPads * pads,
1596     GstCollectData * data, GstClockTime new_pos)
1597 {
1598   gint cmp_res;
1599 
1600   /* If oldest time is not known, or current pad got newsegment;
1601    * recalculate the state */
1602   if (!pads->priv->earliest_data || pads->priv->earliest_data == data) {
1603     gst_collect_pads_recalculate_full (pads);
1604     goto exit;
1605   }
1606 
1607   /* Check if the waiting state of the pad should change. */
1608   cmp_res =
1609       pads->priv->compare_func (pads, data, new_pos,
1610       pads->priv->earliest_data, pads->priv->earliest_time,
1611       pads->priv->compare_user_data);
1612 
1613   if (cmp_res > 0)
1614     /* Stop waiting */
1615     gst_collect_pads_set_waiting (pads, data, FALSE);
1616 
1617 exit:
1618   return;
1619 
1620 }
1621 
1622 static GstClockTime
gst_collect_pads_clip_time(GstCollectPads * pads,GstCollectData * data,GstClockTime time)1623 gst_collect_pads_clip_time (GstCollectPads * pads, GstCollectData * data,
1624     GstClockTime time)
1625 {
1626   GstClockTime otime = time;
1627   GstBuffer *in, *out = NULL;
1628 
1629   if (pads->priv->clip_func) {
1630     in = gst_buffer_new ();
1631     GST_BUFFER_PTS (in) = time;
1632     GST_BUFFER_DTS (in) = GST_CLOCK_TIME_NONE;
1633     pads->priv->clip_func (pads, data, in, &out, pads->priv->clip_user_data);
1634     if (out) {
1635       otime = GST_BUFFER_PTS (out);
1636       gst_buffer_unref (out);
1637     } else {
1638       /* FIXME should distinguish between ahead or after segment,
1639        * let's assume after segment and use some large time ... */
1640       otime = G_MAXINT64 / 2;
1641     }
1642   }
1643 
1644   return otime;
1645 }
1646 
1647 /**
1648  * gst_collect_pads_event_default:
1649  * @pads: the collectpads to use
1650  * @data: collect data of corresponding pad
1651  * @event: event being processed
1652  * @discard: process but do not send event downstream
1653  *
1654  * Default #GstCollectPads event handling that elements should always
1655  * chain up to to ensure proper operation.  Element might however indicate
1656  * event should not be forwarded downstream.
1657  */
1658 gboolean
gst_collect_pads_event_default(GstCollectPads * pads,GstCollectData * data,GstEvent * event,gboolean discard)1659 gst_collect_pads_event_default (GstCollectPads * pads, GstCollectData * data,
1660     GstEvent * event, gboolean discard)
1661 {
1662   gboolean res = TRUE;
1663   GstCollectPadsBufferFunction buffer_func;
1664   GstObject *parent;
1665   GstPad *pad;
1666 
1667   GST_OBJECT_LOCK (pads);
1668   buffer_func = pads->priv->buffer_func;
1669   GST_OBJECT_UNLOCK (pads);
1670 
1671   pad = data->pad;
1672   parent = GST_OBJECT_PARENT (pad);
1673 
1674   GST_DEBUG_OBJECT (pad, "Got '%s' event", GST_EVENT_TYPE_NAME (event));
1675 
1676   switch (GST_EVENT_TYPE (event)) {
1677     case GST_EVENT_FLUSH_START:
1678     {
1679       if (g_atomic_int_get (&pads->priv->seeking)) {
1680         /* drop all but the first FLUSH_STARTs when seeking */
1681         if (!g_atomic_int_compare_and_exchange (&pads->
1682                 priv->pending_flush_start, TRUE, FALSE))
1683           goto eat;
1684 
1685         /* unblock collect pads */
1686         gst_pad_event_default (pad, parent, event);
1687         event = NULL;
1688 
1689         GST_COLLECT_PADS_STREAM_LOCK (pads);
1690         /* Start flushing. We never call gst_collect_pads_set_flushing (FALSE), we
1691          * instead wait until each pad gets its FLUSH_STOP and let that reset the pad to
1692          * non-flushing (which happens in gst_collect_pads_event_default).
1693          */
1694         gst_collect_pads_set_flushing (pads, TRUE);
1695 
1696         if (pads->priv->flush_func)
1697           pads->priv->flush_func (pads, pads->priv->flush_user_data);
1698 
1699         g_atomic_int_set (&pads->priv->pending_flush_stop, TRUE);
1700         GST_COLLECT_PADS_STREAM_UNLOCK (pads);
1701 
1702         goto eat;
1703       } else {
1704         /* forward event to unblock check_collected */
1705         GST_DEBUG_OBJECT (pad, "forwarding flush start");
1706         if (!(res = gst_pad_event_default (pad, parent, event))) {
1707           GST_WARNING_OBJECT (pad, "forwarding flush start failed");
1708         }
1709         event = NULL;
1710 
1711         /* now unblock the chain function.
1712          * no cond per pad, so they all unblock,
1713          * non-flushing block again */
1714         GST_COLLECT_PADS_STREAM_LOCK (pads);
1715         GST_COLLECT_PADS_STATE_SET (data, GST_COLLECT_PADS_STATE_FLUSHING);
1716         gst_collect_pads_clear (pads, data);
1717 
1718         /* cater for possible default muxing functionality */
1719         if (buffer_func) {
1720           /* restore to initial state */
1721           gst_collect_pads_set_waiting (pads, data, TRUE);
1722           /* if the current pad is affected, reset state, recalculate later */
1723           if (pads->priv->earliest_data == data) {
1724             unref_data (data);
1725             pads->priv->earliest_data = NULL;
1726             pads->priv->earliest_time = GST_CLOCK_TIME_NONE;
1727           }
1728         }
1729 
1730         GST_COLLECT_PADS_STREAM_UNLOCK (pads);
1731 
1732         goto eat;
1733       }
1734     }
1735     case GST_EVENT_FLUSH_STOP:
1736     {
1737       /* flush the 1 buffer queue */
1738       GST_COLLECT_PADS_STREAM_LOCK (pads);
1739       GST_COLLECT_PADS_STATE_UNSET (data, GST_COLLECT_PADS_STATE_FLUSHING);
1740       gst_collect_pads_clear (pads, data);
1741       /* we need new segment info after the flush */
1742       gst_segment_init (&data->segment, GST_FORMAT_UNDEFINED);
1743       GST_COLLECT_PADS_STATE_UNSET (data, GST_COLLECT_PADS_STATE_NEW_SEGMENT);
1744       /* if the pad was EOS, remove the EOS flag and
1745        * decrement the number of eospads */
1746       if (G_UNLIKELY (GST_COLLECT_PADS_STATE_IS_SET (data,
1747                   GST_COLLECT_PADS_STATE_EOS))) {
1748         if (!GST_COLLECT_PADS_STATE_IS_SET (data,
1749                 GST_COLLECT_PADS_STATE_WAITING))
1750           pads->priv->queuedpads++;
1751         if (!g_atomic_int_get (&pads->priv->seeking)) {
1752           pads->priv->eospads--;
1753         }
1754         GST_COLLECT_PADS_STATE_UNSET (data, GST_COLLECT_PADS_STATE_EOS);
1755       }
1756       GST_COLLECT_PADS_STREAM_UNLOCK (pads);
1757 
1758       if (g_atomic_int_get (&pads->priv->seeking)) {
1759         if (g_atomic_int_compare_and_exchange (&pads->priv->pending_flush_stop,
1760                 TRUE, FALSE))
1761           goto forward;
1762         else
1763           goto eat;
1764       } else {
1765         goto forward;
1766       }
1767     }
1768     case GST_EVENT_EOS:
1769     {
1770       GST_COLLECT_PADS_STREAM_LOCK (pads);
1771       /* if the pad was not EOS, make it EOS and so we
1772        * have one more eospad */
1773       if (G_LIKELY (!GST_COLLECT_PADS_STATE_IS_SET (data,
1774                   GST_COLLECT_PADS_STATE_EOS))) {
1775         GST_COLLECT_PADS_STATE_SET (data, GST_COLLECT_PADS_STATE_EOS);
1776         if (!GST_COLLECT_PADS_STATE_IS_SET (data,
1777                 GST_COLLECT_PADS_STATE_WAITING))
1778           pads->priv->queuedpads--;
1779         pads->priv->eospads++;
1780       }
1781       /* check if we need collecting anything, we ignore the result. */
1782       gst_collect_pads_check_collected (pads);
1783       GST_COLLECT_PADS_STREAM_UNLOCK (pads);
1784 
1785       goto eat;
1786     }
1787     case GST_EVENT_SEGMENT:
1788     {
1789       GstSegment seg;
1790 
1791       GST_COLLECT_PADS_STREAM_LOCK (pads);
1792 
1793       gst_event_copy_segment (event, &seg);
1794 
1795       GST_DEBUG_OBJECT (data->pad, "got segment %" GST_SEGMENT_FORMAT, &seg);
1796 
1797       /* default collection can not handle other segment formats than time */
1798       if (buffer_func && seg.format != GST_FORMAT_TIME) {
1799         GST_WARNING_OBJECT (pads, "GstCollectPads default collecting "
1800             "can only handle time segments. Non time segment ignored.");
1801         goto newsegment_done;
1802       }
1803 
1804       /* need to update segment first */
1805       data->segment = seg;
1806       GST_COLLECT_PADS_STATE_SET (data, GST_COLLECT_PADS_STATE_NEW_SEGMENT);
1807 
1808       /* now we can use for e.g. running time */
1809       seg.position =
1810           gst_collect_pads_clip_time (pads, data, seg.start + seg.offset);
1811       /* update again */
1812       data->segment = seg;
1813 
1814       /* default muxing functionality */
1815       if (!buffer_func)
1816         goto newsegment_done;
1817 
1818       gst_collect_pads_handle_position_update (pads, data, seg.position);
1819 
1820     newsegment_done:
1821       GST_COLLECT_PADS_STREAM_UNLOCK (pads);
1822       /* we must not forward this event since multiple segments will be
1823        * accumulated and this is certainly not what we want. */
1824       goto eat;
1825     }
1826     case GST_EVENT_GAP:
1827     {
1828       GstClockTime start, duration;
1829 
1830       GST_COLLECT_PADS_STREAM_LOCK (pads);
1831 
1832       gst_event_parse_gap (event, &start, &duration);
1833       /* FIXME, handle reverse playback case */
1834       if (GST_CLOCK_TIME_IS_VALID (duration))
1835         start += duration;
1836       /* we do not expect another buffer until after gap,
1837        * so that is our position now */
1838       data->segment.position = gst_collect_pads_clip_time (pads, data, start);
1839 
1840       gst_collect_pads_handle_position_update (pads, data,
1841           data->segment.position);
1842 
1843       GST_COLLECT_PADS_STREAM_UNLOCK (pads);
1844       goto eat;
1845     }
1846     case GST_EVENT_STREAM_START:
1847       /* drop stream start events, element must create its own start event,
1848        * we can't just forward the first random stream start event we get */
1849       goto eat;
1850     case GST_EVENT_CAPS:
1851       goto eat;
1852     default:
1853       /* forward other events */
1854       goto forward;
1855   }
1856 
1857 eat:
1858   GST_DEBUG_OBJECT (pads, "dropping event: %" GST_PTR_FORMAT, event);
1859   if (event)
1860     gst_event_unref (event);
1861   return res;
1862 
1863 forward:
1864   if (discard)
1865     goto eat;
1866   else {
1867     GST_DEBUG_OBJECT (pads, "forward event: %" GST_PTR_FORMAT, event);
1868     return gst_pad_event_default (pad, parent, event);
1869   }
1870 }
1871 
1872 typedef struct
1873 {
1874   GstEvent *event;
1875   gboolean result;
1876 } EventData;
1877 
1878 static gboolean
event_forward_func(GstPad * pad,EventData * data)1879 event_forward_func (GstPad * pad, EventData * data)
1880 {
1881   gboolean ret = TRUE;
1882   GstPad *peer = gst_pad_get_peer (pad);
1883 
1884   if (peer) {
1885     ret = gst_pad_send_event (peer, gst_event_ref (data->event));
1886     gst_object_unref (peer);
1887   }
1888 
1889   data->result &= ret;
1890   /* Always send to all pads */
1891   return FALSE;
1892 }
1893 
1894 static gboolean
forward_event_to_all_sinkpads(GstPad * srcpad,GstEvent * event)1895 forward_event_to_all_sinkpads (GstPad * srcpad, GstEvent * event)
1896 {
1897   EventData data;
1898 
1899   data.event = event;
1900   data.result = TRUE;
1901 
1902   gst_pad_forward (srcpad, (GstPadForwardFunction) event_forward_func, &data);
1903 
1904   gst_event_unref (event);
1905 
1906   return data.result;
1907 }
1908 
1909 /**
1910  * gst_collect_pads_src_event_default:
1911  * @pads: the #GstCollectPads to use
1912  * @pad: src #GstPad that received the event
1913  * @event: event being processed
1914  *
1915  * Default #GstCollectPads event handling for the src pad of elements.
1916  * Elements can chain up to this to let flushing seek event handling
1917  * be done by #GstCollectPads.
1918  *
1919  * Since: 1.4
1920  */
1921 gboolean
gst_collect_pads_src_event_default(GstCollectPads * pads,GstPad * pad,GstEvent * event)1922 gst_collect_pads_src_event_default (GstCollectPads * pads, GstPad * pad,
1923     GstEvent * event)
1924 {
1925   GstObject *parent;
1926   gboolean res = TRUE;
1927 
1928   parent = GST_OBJECT_PARENT (pad);
1929 
1930   switch (GST_EVENT_TYPE (event)) {
1931     case GST_EVENT_SEEK:{
1932       GstSeekFlags flags;
1933 
1934       pads->priv->eospads = 0;
1935 
1936       GST_INFO_OBJECT (pads, "starting seek");
1937 
1938       gst_event_parse_seek (event, NULL, NULL, &flags, NULL, NULL, NULL, NULL);
1939       if (flags & GST_SEEK_FLAG_FLUSH) {
1940         g_atomic_int_set (&pads->priv->seeking, TRUE);
1941         g_atomic_int_set (&pads->priv->pending_flush_start, TRUE);
1942         /* forward the seek upstream */
1943         res = forward_event_to_all_sinkpads (pad, event);
1944         event = NULL;
1945         if (!res) {
1946           g_atomic_int_set (&pads->priv->seeking, FALSE);
1947           g_atomic_int_set (&pads->priv->pending_flush_start, FALSE);
1948         }
1949       }
1950 
1951       GST_INFO_OBJECT (pads, "seek done, result: %d", res);
1952 
1953       break;
1954     }
1955     default:
1956       break;
1957   }
1958 
1959   if (event)
1960     res = gst_pad_event_default (pad, parent, event);
1961 
1962   return res;
1963 }
1964 
1965 static gboolean
gst_collect_pads_event_default_internal(GstCollectPads * pads,GstCollectData * data,GstEvent * event,gpointer user_data)1966 gst_collect_pads_event_default_internal (GstCollectPads * pads,
1967     GstCollectData * data, GstEvent * event, gpointer user_data)
1968 {
1969   return gst_collect_pads_event_default (pads, data, event, FALSE);
1970 }
1971 
1972 static gboolean
gst_collect_pads_event(GstPad * pad,GstObject * parent,GstEvent * event)1973 gst_collect_pads_event (GstPad * pad, GstObject * parent, GstEvent * event)
1974 {
1975   gboolean res = FALSE, need_unlock = FALSE;
1976   GstCollectData *data;
1977   GstCollectPads *pads;
1978   GstCollectPadsEventFunction event_func;
1979   gpointer event_user_data;
1980 
1981   /* some magic to get the managing collect_pads */
1982   GST_OBJECT_LOCK (pad);
1983   data = (GstCollectData *) gst_pad_get_element_private (pad);
1984   if (G_UNLIKELY (data == NULL))
1985     goto pad_removed;
1986   ref_data (data);
1987   GST_OBJECT_UNLOCK (pad);
1988 
1989   res = FALSE;
1990 
1991   pads = data->collect;
1992 
1993   GST_DEBUG_OBJECT (data->pad, "Got %s event on sink pad",
1994       GST_EVENT_TYPE_NAME (event));
1995 
1996   GST_OBJECT_LOCK (pads);
1997   event_func = pads->priv->event_func;
1998   event_user_data = pads->priv->event_user_data;
1999   GST_OBJECT_UNLOCK (pads);
2000 
2001   if (GST_EVENT_IS_SERIALIZED (event)) {
2002     GST_COLLECT_PADS_STREAM_LOCK (pads);
2003     need_unlock = TRUE;
2004   }
2005 
2006   if (G_LIKELY (event_func)) {
2007     res = event_func (pads, data, event, event_user_data);
2008   }
2009 
2010   if (need_unlock)
2011     GST_COLLECT_PADS_STREAM_UNLOCK (pads);
2012 
2013   unref_data (data);
2014   return res;
2015 
2016   /* ERRORS */
2017 pad_removed:
2018   {
2019     GST_DEBUG ("%s got removed from collectpads", GST_OBJECT_NAME (pad));
2020     GST_OBJECT_UNLOCK (pad);
2021     return FALSE;
2022   }
2023 }
2024 
2025 /**
2026  * gst_collect_pads_query_default:
2027  * @pads: the collectpads to use
2028  * @data: collect data of corresponding pad
2029  * @query: query being processed
2030  * @discard: process but do not send event downstream
2031  *
2032  * Default #GstCollectPads query handling that elements should always
2033  * chain up to to ensure proper operation.  Element might however indicate
2034  * query should not be forwarded downstream.
2035  */
2036 gboolean
gst_collect_pads_query_default(GstCollectPads * pads,GstCollectData * data,GstQuery * query,gboolean discard)2037 gst_collect_pads_query_default (GstCollectPads * pads, GstCollectData * data,
2038     GstQuery * query, gboolean discard)
2039 {
2040   gboolean res = TRUE;
2041   GstObject *parent;
2042   GstPad *pad;
2043 
2044   pad = data->pad;
2045   parent = GST_OBJECT_PARENT (pad);
2046 
2047   switch (GST_QUERY_TYPE (query)) {
2048     case GST_QUERY_SEEKING:
2049     {
2050       GstFormat format;
2051 
2052       /* don't pass it along as some (file)sink might claim it does
2053        * whereas with a collectpads in between that will not likely work */
2054       gst_query_parse_seeking (query, &format, NULL, NULL, NULL);
2055       gst_query_set_seeking (query, format, FALSE, 0, -1);
2056       res = TRUE;
2057       discard = TRUE;
2058       break;
2059     }
2060     default:
2061       break;
2062   }
2063 
2064   if (!discard)
2065     return gst_pad_query_default (pad, parent, query);
2066   else
2067     return res;
2068 }
2069 
2070 static gboolean
gst_collect_pads_query_default_internal(GstCollectPads * pads,GstCollectData * data,GstQuery * query,gpointer user_data)2071 gst_collect_pads_query_default_internal (GstCollectPads * pads,
2072     GstCollectData * data, GstQuery * query, gpointer user_data)
2073 {
2074   return gst_collect_pads_query_default (pads, data, query, FALSE);
2075 }
2076 
2077 static gboolean
gst_collect_pads_query(GstPad * pad,GstObject * parent,GstQuery * query)2078 gst_collect_pads_query (GstPad * pad, GstObject * parent, GstQuery * query)
2079 {
2080   gboolean res = FALSE, need_unlock = FALSE;
2081   GstCollectData *data;
2082   GstCollectPads *pads;
2083   GstCollectPadsQueryFunction query_func;
2084   gpointer query_user_data;
2085 
2086   GST_DEBUG_OBJECT (pad, "Got %s query on sink pad",
2087       GST_QUERY_TYPE_NAME (query));
2088 
2089   /* some magic to get the managing collect_pads */
2090   GST_OBJECT_LOCK (pad);
2091   data = (GstCollectData *) gst_pad_get_element_private (pad);
2092   if (G_UNLIKELY (data == NULL))
2093     goto pad_removed;
2094   ref_data (data);
2095   GST_OBJECT_UNLOCK (pad);
2096 
2097   pads = data->collect;
2098 
2099   GST_OBJECT_LOCK (pads);
2100   query_func = pads->priv->query_func;
2101   query_user_data = pads->priv->query_user_data;
2102   GST_OBJECT_UNLOCK (pads);
2103 
2104   if (GST_QUERY_IS_SERIALIZED (query)) {
2105     GST_COLLECT_PADS_STREAM_LOCK (pads);
2106     need_unlock = TRUE;
2107   }
2108 
2109   if (G_LIKELY (query_func)) {
2110     res = query_func (pads, data, query, query_user_data);
2111   }
2112 
2113   if (need_unlock)
2114     GST_COLLECT_PADS_STREAM_UNLOCK (pads);
2115 
2116   unref_data (data);
2117   return res;
2118 
2119   /* ERRORS */
2120 pad_removed:
2121   {
2122     GST_DEBUG ("%s got removed from collectpads", GST_OBJECT_NAME (pad));
2123     GST_OBJECT_UNLOCK (pad);
2124     return FALSE;
2125   }
2126 }
2127 
2128 
2129 /* For each buffer we receive we check if our collected condition is reached
2130  * and if so we call the collected function. When this is done we check if
2131  * data has been unqueued. If data is still queued we wait holding the stream
2132  * lock to make sure no EOS event can happen while we are ready to be
2133  * collected
2134  */
2135 static GstFlowReturn
gst_collect_pads_chain(GstPad * pad,GstObject * parent,GstBuffer * buffer)2136 gst_collect_pads_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
2137 {
2138   GstCollectData *data;
2139   GstCollectPads *pads;
2140   GstFlowReturn ret;
2141   GstBuffer **buffer_p;
2142   guint32 cookie;
2143 
2144   GST_DEBUG ("Got buffer for pad %s:%s", GST_DEBUG_PAD_NAME (pad));
2145 
2146   /* some magic to get the managing collect_pads */
2147   GST_OBJECT_LOCK (pad);
2148   data = (GstCollectData *) gst_pad_get_element_private (pad);
2149   if (G_UNLIKELY (data == NULL))
2150     goto no_data;
2151   ref_data (data);
2152   GST_OBJECT_UNLOCK (pad);
2153 
2154   pads = data->collect;
2155 
2156   GST_COLLECT_PADS_STREAM_LOCK (pads);
2157   /* if not started, bail out */
2158   if (G_UNLIKELY (!pads->priv->started))
2159     goto not_started;
2160   /* check if this pad is flushing */
2161   if (G_UNLIKELY (GST_COLLECT_PADS_STATE_IS_SET (data,
2162               GST_COLLECT_PADS_STATE_FLUSHING)))
2163     goto flushing;
2164   /* pad was EOS, we can refuse this data */
2165   if (G_UNLIKELY (GST_COLLECT_PADS_STATE_IS_SET (data,
2166               GST_COLLECT_PADS_STATE_EOS)))
2167     goto eos;
2168 
2169   /* see if we need to clip */
2170   if (pads->priv->clip_func) {
2171     GstBuffer *outbuf = NULL;
2172     ret =
2173         pads->priv->clip_func (pads, data, buffer, &outbuf,
2174         pads->priv->clip_user_data);
2175     buffer = outbuf;
2176 
2177     if (G_UNLIKELY (outbuf == NULL))
2178       goto clipped;
2179 
2180     if (G_UNLIKELY (ret == GST_FLOW_EOS))
2181       goto eos;
2182     else if (G_UNLIKELY (ret != GST_FLOW_OK))
2183       goto error;
2184   }
2185 
2186   GST_DEBUG_OBJECT (pads, "Queuing buffer %p for pad %s:%s", buffer,
2187       GST_DEBUG_PAD_NAME (pad));
2188 
2189   /* One more pad has data queued */
2190   if (GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_WAITING))
2191     pads->priv->queuedpads++;
2192   buffer_p = &data->buffer;
2193   gst_buffer_replace (buffer_p, buffer);
2194 
2195   /* update segment last position if in TIME */
2196   if (G_LIKELY (data->segment.format == GST_FORMAT_TIME)) {
2197     GstClockTime timestamp;
2198 
2199     timestamp = GST_BUFFER_DTS_OR_PTS (buffer);
2200 
2201     if (GST_CLOCK_TIME_IS_VALID (timestamp))
2202       data->segment.position = timestamp;
2203   }
2204 
2205   /* While we have data queued on this pad try to collect stuff */
2206   do {
2207     /* Check if our collected condition is matched and call the collected
2208      * function if it is */
2209     ret = gst_collect_pads_check_collected (pads);
2210     /* when an error occurs, we want to report this back to the caller ASAP
2211      * without having to block if the buffer was not popped */
2212     if (G_UNLIKELY (ret != GST_FLOW_OK))
2213       goto error;
2214 
2215     /* data was consumed, we can exit and accept new data */
2216     if (data->buffer == NULL)
2217       break;
2218 
2219     /* Having the _INIT here means we don't care about any broadcast up to here
2220      * (most of which occur with STREAM_LOCK held, so could not have happened
2221      * anyway).  We do care about e.g. a remove initiated broadcast as of this
2222      * point.  Putting it here also makes this thread ignores any evt it raised
2223      * itself (as is a usual WAIT semantic).
2224      */
2225     GST_COLLECT_PADS_EVT_INIT (cookie);
2226 
2227     /* pad could be removed and re-added */
2228     unref_data (data);
2229     GST_OBJECT_LOCK (pad);
2230     if (G_UNLIKELY ((data = gst_pad_get_element_private (pad)) == NULL))
2231       goto pad_removed;
2232     ref_data (data);
2233     GST_OBJECT_UNLOCK (pad);
2234 
2235     GST_DEBUG_OBJECT (pads, "Pad %s:%s has a buffer queued, waiting",
2236         GST_DEBUG_PAD_NAME (pad));
2237 
2238     /* wait to be collected, this must happen from another thread triggered
2239      * by the _chain function of another pad. We release the lock so we
2240      * can get stopped or flushed as well. We can however not get EOS
2241      * because we still hold the STREAM_LOCK.
2242      */
2243     GST_COLLECT_PADS_STREAM_UNLOCK (pads);
2244     GST_COLLECT_PADS_EVT_WAIT (pads, cookie);
2245     GST_COLLECT_PADS_STREAM_LOCK (pads);
2246 
2247     GST_DEBUG_OBJECT (pads, "Pad %s:%s resuming", GST_DEBUG_PAD_NAME (pad));
2248 
2249     /* after a signal, we could be stopped */
2250     if (G_UNLIKELY (!pads->priv->started))
2251       goto not_started;
2252     /* check if this pad is flushing */
2253     if (G_UNLIKELY (GST_COLLECT_PADS_STATE_IS_SET (data,
2254                 GST_COLLECT_PADS_STATE_FLUSHING)))
2255       goto flushing;
2256   }
2257   while (data->buffer != NULL);
2258 
2259 unlock_done:
2260   GST_COLLECT_PADS_STREAM_UNLOCK (pads);
2261   /* data is definitely NULL if pad_removed goto was run. */
2262   if (data)
2263     unref_data (data);
2264   if (buffer)
2265     gst_buffer_unref (buffer);
2266   return ret;
2267 
2268 pad_removed:
2269   {
2270     GST_WARNING ("%s got removed from collectpads", GST_OBJECT_NAME (pad));
2271     GST_OBJECT_UNLOCK (pad);
2272     ret = GST_FLOW_NOT_LINKED;
2273     goto unlock_done;
2274   }
2275   /* ERRORS */
2276 no_data:
2277   {
2278     GST_DEBUG ("%s got removed from collectpads", GST_OBJECT_NAME (pad));
2279     GST_OBJECT_UNLOCK (pad);
2280     gst_buffer_unref (buffer);
2281     return GST_FLOW_NOT_LINKED;
2282   }
2283 not_started:
2284   {
2285     GST_DEBUG ("not started");
2286     gst_collect_pads_clear (pads, data);
2287     ret = GST_FLOW_FLUSHING;
2288     goto unlock_done;
2289   }
2290 flushing:
2291   {
2292     GST_DEBUG ("pad %s:%s is flushing", GST_DEBUG_PAD_NAME (pad));
2293     gst_collect_pads_clear (pads, data);
2294     ret = GST_FLOW_FLUSHING;
2295     goto unlock_done;
2296   }
2297 eos:
2298   {
2299     /* we should not post an error for this, just inform upstream that
2300      * we don't expect anything anymore */
2301     GST_DEBUG ("pad %s:%s is eos", GST_DEBUG_PAD_NAME (pad));
2302     ret = GST_FLOW_EOS;
2303     goto unlock_done;
2304   }
2305 clipped:
2306   {
2307     GST_DEBUG ("clipped buffer on pad %s:%s", GST_DEBUG_PAD_NAME (pad));
2308     ret = GST_FLOW_OK;
2309     goto unlock_done;
2310   }
2311 error:
2312   {
2313     /* we print the error, the element should post a reasonable error
2314      * message for fatal errors */
2315     GST_DEBUG ("collect failed, reason %d (%s)", ret, gst_flow_get_name (ret));
2316     gst_collect_pads_clear (pads, data);
2317     goto unlock_done;
2318   }
2319 }
2320