• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* GStreamer aggregator base class
2  * Copyright (C) 2014 Mathieu Duponchelle <mathieu.duponchelle@opencreed.com>
3  * Copyright (C) 2014 Thibault Saunier <tsaunier@gnome.org>
4  *
5  * gstaggregator.c:
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
20  * Boston, MA 02110-1301, USA.
21  */
22 /**
23  * SECTION: gstaggregator
24  * @title: GstAggregator
25  * @short_description: Base class for mixers and muxers, manages a set of input
26  *     pads and aggregates their streams
27  * @see_also: gstcollectpads for historical reasons.
28  *
29  * Manages a set of pads with the purpose of aggregating their buffers.
30  * Control is given to the subclass when all pads have data.
31  *
32  *  * Base class for mixers and muxers. Subclasses should at least implement
33  *    the #GstAggregatorClass::aggregate virtual method.
34  *
35  *  * Installs a #GstPadChainFunction, a #GstPadEventFullFunction and a
36  *    #GstPadQueryFunction to queue all serialized data packets per sink pad.
37  *    Subclasses should not overwrite those, but instead implement
38  *    #GstAggregatorClass::sink_event and #GstAggregatorClass::sink_query as
39  *    needed.
40  *
41  *  * When data is queued on all pads, the aggregate vmethod is called.
42  *
43  *  * One can peek at the data on any given GstAggregatorPad with the
44  *    gst_aggregator_pad_peek_buffer() method, and remove it from the pad
45  *    with the gst_aggregator_pad_pop_buffer () method. When a buffer
46  *    has been taken with pop_buffer (), a new buffer can be queued
47  *    on that pad.
48  *
49  *  * When gst_aggregator_pad_peek_buffer() or gst_aggregator_pad_has_buffer()
50  *    are called, a reference is taken to the returned buffer, which stays
51  *    valid until either:
52  *
53  *      - gst_aggregator_pad_pop_buffer() is called, in which case the caller
54  *        is guaranteed that the buffer they receive is the same as the peeked
55  *        buffer.
56  *      - gst_aggregator_pad_drop_buffer() is called, in which case the caller
57  *        is guaranteed that the dropped buffer is the one that was peeked.
58  *      - the subclass implementation of #GstAggregatorClass.aggregate returns.
59  *
60  *    Subsequent calls to gst_aggregator_pad_peek_buffer() or
61  *    gst_aggregator_pad_has_buffer() return / check the same buffer that was
62  *    returned / checked, until one of the conditions listed above is met.
63  *
64  *    Subclasses are only allowed to call these methods from the aggregate
65  *    thread.
66  *
67  *  * If the subclass wishes to push a buffer downstream in its aggregate
68  *    implementation, it should do so through the
69  *    gst_aggregator_finish_buffer() method. This method will take care
70  *    of sending and ordering mandatory events such as stream start, caps
71  *    and segment. Buffer lists can also be pushed out with
72  *    gst_aggregator_finish_buffer_list().
73  *
74  *  * Same goes for EOS events, which should not be pushed directly by the
75  *    subclass, it should instead return GST_FLOW_EOS in its aggregate
76  *    implementation.
77  *
78  *  * Note that the aggregator logic regarding gap event handling is to turn
79  *    these into gap buffers with matching PTS and duration. It will also
80  *    flag these buffers with GST_BUFFER_FLAG_GAP and GST_BUFFER_FLAG_DROPPABLE
81  *    to ease their identification and subsequent processing.
82  *
83  *  * Subclasses must use (a subclass of) #GstAggregatorPad for both their
84  *    sink and source pads.
85  *    See gst_element_class_add_static_pad_template_with_gtype().
86  *
87  * This class used to live in gst-plugins-bad and was moved to core.
88  *
89  * Since: 1.14
90  */
91 
92 /**
93  * SECTION: gstaggregatorpad
94  * @title: GstAggregatorPad
95  * @short_description: #GstPad subclass for pads managed by #GstAggregator
96  * @see_also: gstcollectpads for historical reasons.
97  *
98  * Pads managed by a #GstAggregator subclass.
99  *
100  * This class used to live in gst-plugins-bad and was moved to core.
101  *
102  * Since: 1.14
103  */
104 
105 #ifdef HAVE_CONFIG_H
106 #  include "config.h"
107 #endif
108 
109 #include <string.h>             /* strlen */
110 
111 #include "gstaggregator.h"
112 
113 GType
gst_aggregator_start_time_selection_get_type(void)114 gst_aggregator_start_time_selection_get_type (void)
115 {
116   static GType gtype = 0;
117 
118   if (g_once_init_enter (&gtype)) {
119     static const GEnumValue values[] = {
120       {GST_AGGREGATOR_START_TIME_SELECTION_ZERO,
121           "GST_AGGREGATOR_START_TIME_SELECTION_ZERO", "zero"},
122       {GST_AGGREGATOR_START_TIME_SELECTION_FIRST,
123           "GST_AGGREGATOR_START_TIME_SELECTION_FIRST", "first"},
124       {GST_AGGREGATOR_START_TIME_SELECTION_SET,
125           "GST_AGGREGATOR_START_TIME_SELECTION_SET", "set"},
126       {0, NULL, NULL}
127     };
128     GType new_type =
129         g_enum_register_static ("GstAggregatorStartTimeSelection", values);
130 
131     g_once_init_leave (&gtype, new_type);
132   }
133   return gtype;
134 }
135 
136 /*  Might become API */
137 #if 0
138 static void gst_aggregator_merge_tags (GstAggregator * aggregator,
139     const GstTagList * tags, GstTagMergeMode mode);
140 #endif
141 static void gst_aggregator_set_latency_property (GstAggregator * agg,
142     GstClockTime latency);
143 static GstClockTime gst_aggregator_get_latency_property (GstAggregator * agg);
144 
145 static GstClockTime gst_aggregator_get_latency_unlocked (GstAggregator * self);
146 
147 static void gst_aggregator_pad_buffer_consumed (GstAggregatorPad * pad,
148     GstBuffer * buffer, gboolean dequeued);
149 
150 GST_DEBUG_CATEGORY_STATIC (aggregator_debug);
151 #define GST_CAT_DEFAULT aggregator_debug
152 
153 /* Locking order, locks in this element must always be taken in this order
154  *
155  * standard sink pad stream lock -> GST_PAD_STREAM_LOCK (aggpad)
156  * Aggregator pad flush lock -> PAD_FLUSH_LOCK(aggpad)
157  * standard src pad stream lock -> GST_PAD_STREAM_LOCK (srcpad)
158  * Aggregator src lock -> SRC_LOCK(agg) w/ SRC_WAIT/BROADCAST
159  * standard element object lock -> GST_OBJECT_LOCK(agg)
160  * Aggregator pad lock -> PAD_LOCK (aggpad) w/ PAD_WAIT/BROADCAST_EVENT(aggpad)
161  * standard src pad object lock -> GST_OBJECT_LOCK(srcpad)
162  * standard sink pad object lock -> GST_OBJECT_LOCK(aggpad)
163  */
164 
165 /* GstAggregatorPad definitions */
166 #define PAD_LOCK(pad)   G_STMT_START {                                  \
167   GST_TRACE_OBJECT (pad, "Taking PAD lock from thread %p",              \
168         g_thread_self());                                               \
169   g_mutex_lock(&pad->priv->lock);                                       \
170   GST_TRACE_OBJECT (pad, "Took PAD lock from thread %p",                \
171         g_thread_self());                                               \
172   } G_STMT_END
173 
174 #define PAD_UNLOCK(pad)  G_STMT_START {                                 \
175   GST_TRACE_OBJECT (pad, "Releasing PAD lock from thread %p",           \
176       g_thread_self());                                                 \
177   g_mutex_unlock(&pad->priv->lock);                                     \
178   GST_TRACE_OBJECT (pad, "Release PAD lock from thread %p",             \
179         g_thread_self());                                               \
180   } G_STMT_END
181 
182 
183 #define PAD_WAIT_EVENT(pad)   G_STMT_START {                            \
184   GST_LOG_OBJECT (pad, "Waiting for buffer to be consumed thread %p",   \
185         g_thread_self());                                               \
186   g_cond_wait(&(((GstAggregatorPad* )pad)->priv->event_cond),           \
187       (&((GstAggregatorPad*)pad)->priv->lock));                         \
188   GST_LOG_OBJECT (pad, "DONE Waiting for buffer to be consumed on thread %p", \
189         g_thread_self());                                               \
190   } G_STMT_END
191 
192 #define PAD_BROADCAST_EVENT(pad) G_STMT_START {                        \
193   GST_LOG_OBJECT (pad, "Signaling buffer consumed from thread %p",     \
194         g_thread_self());                                              \
195   g_cond_broadcast(&(((GstAggregatorPad* )pad)->priv->event_cond));    \
196   } G_STMT_END
197 
198 
199 #define PAD_FLUSH_LOCK(pad)     G_STMT_START {                          \
200   GST_TRACE_OBJECT (pad, "Taking lock from thread %p",                  \
201         g_thread_self());                                               \
202   g_mutex_lock(&pad->priv->flush_lock);                                 \
203   GST_TRACE_OBJECT (pad, "Took lock from thread %p",                    \
204         g_thread_self());                                               \
205   } G_STMT_END
206 
207 #define PAD_FLUSH_UNLOCK(pad)   G_STMT_START {                          \
208   GST_TRACE_OBJECT (pad, "Releasing lock from thread %p",               \
209         g_thread_self());                                               \
210   g_mutex_unlock(&pad->priv->flush_lock);                               \
211   GST_TRACE_OBJECT (pad, "Release lock from thread %p",                 \
212         g_thread_self());                                               \
213   } G_STMT_END
214 
215 #define SRC_LOCK(self)   G_STMT_START {                             \
216   GST_TRACE_OBJECT (self, "Taking src lock from thread %p",         \
217       g_thread_self());                                             \
218   g_mutex_lock(&self->priv->src_lock);                              \
219   GST_TRACE_OBJECT (self, "Took src lock from thread %p",           \
220         g_thread_self());                                           \
221   } G_STMT_END
222 
223 #define SRC_UNLOCK(self)  G_STMT_START {                            \
224   GST_TRACE_OBJECT (self, "Releasing src lock from thread %p",      \
225         g_thread_self());                                           \
226   g_mutex_unlock(&self->priv->src_lock);                            \
227   GST_TRACE_OBJECT (self, "Released src lock from thread %p",       \
228         g_thread_self());                                           \
229   } G_STMT_END
230 
231 #define SRC_WAIT(self) G_STMT_START {                               \
232   GST_LOG_OBJECT (self, "Waiting for src on thread %p",             \
233         g_thread_self());                                           \
234   g_cond_wait(&(self->priv->src_cond), &(self->priv->src_lock));    \
235   GST_LOG_OBJECT (self, "DONE Waiting for src on thread %p",        \
236         g_thread_self());                                           \
237   } G_STMT_END
238 
239 #define SRC_BROADCAST(self) G_STMT_START {                          \
240     GST_LOG_OBJECT (self, "Signaling src from thread %p",           \
241         g_thread_self());                                           \
242     if (self->priv->aggregate_id)                                   \
243       gst_clock_id_unschedule (self->priv->aggregate_id);           \
244     g_cond_broadcast(&(self->priv->src_cond));                      \
245   } G_STMT_END
246 
247 struct _GstAggregatorPadPrivate
248 {
249   /* Following fields are protected by the PAD_LOCK */
250   GstFlowReturn flow_return;
251 
252   guint32 last_flush_start_seqnum;
253   guint32 last_flush_stop_seqnum;
254 
255   /* Whether the pad hasn't received a first buffer yet */
256   gboolean first_buffer;
257   /* Whether we waited once for the pad's first buffer */
258   gboolean waited_once;
259 
260   GQueue data;                  /* buffers, events and queries */
261   GstBuffer *clipped_buffer;
262   guint num_buffers;
263   GstBuffer *peeked_buffer;
264 
265   /* used to track fill state of queues, only used with live-src and when
266    * latency property is set to > 0 */
267   GstClockTime head_position;
268   GstClockTime tail_position;
269   GstClockTime head_time;       /* running time */
270   GstClockTime tail_time;
271   GstClockTime time_level;      /* how much head is ahead of tail */
272   GstSegment head_segment;      /* segment before the queue */
273 
274   gboolean negotiated;
275 
276   gboolean eos;
277 
278   GMutex lock;
279   GCond event_cond;
280   /* This lock prevents a flush start processing happening while
281    * the chain function is also happening.
282    */
283   GMutex flush_lock;
284 
285   /* properties */
286   gboolean emit_signals;
287 };
288 
289 /* Must be called with PAD_LOCK held */
290 static void
gst_aggregator_pad_reset_unlocked(GstAggregatorPad * aggpad)291 gst_aggregator_pad_reset_unlocked (GstAggregatorPad * aggpad)
292 {
293   aggpad->priv->eos = FALSE;
294   aggpad->priv->flow_return = GST_FLOW_OK;
295   GST_OBJECT_LOCK (aggpad);
296   gst_segment_init (&aggpad->segment, GST_FORMAT_UNDEFINED);
297   gst_segment_init (&aggpad->priv->head_segment, GST_FORMAT_UNDEFINED);
298   GST_OBJECT_UNLOCK (aggpad);
299   aggpad->priv->head_position = GST_CLOCK_TIME_NONE;
300   aggpad->priv->tail_position = GST_CLOCK_TIME_NONE;
301   aggpad->priv->head_time = GST_CLOCK_TIME_NONE;
302   aggpad->priv->tail_time = GST_CLOCK_TIME_NONE;
303   aggpad->priv->time_level = 0;
304   aggpad->priv->first_buffer = TRUE;
305   aggpad->priv->waited_once = FALSE;
306 }
307 
308 static gboolean
gst_aggregator_pad_flush(GstAggregatorPad * aggpad,GstAggregator * agg)309 gst_aggregator_pad_flush (GstAggregatorPad * aggpad, GstAggregator * agg)
310 {
311   GstAggregatorPadClass *klass = GST_AGGREGATOR_PAD_GET_CLASS (aggpad);
312 
313   PAD_LOCK (aggpad);
314   gst_aggregator_pad_reset_unlocked (aggpad);
315   PAD_UNLOCK (aggpad);
316 
317   if (klass->flush)
318     return (klass->flush (aggpad, agg) == GST_FLOW_OK);
319 
320   return TRUE;
321 }
322 
323 /**
324  * gst_aggregator_peek_next_sample:
325  *
326  * Use this function to determine what input buffers will be aggregated
327  * to produce the next output buffer. This should only be called from
328  * a #GstAggregator::samples-selected handler, and can be used to precisely
329  * control aggregating parameters for a given set of input samples.
330  *
331  * Returns: (nullable) (transfer full): The sample that is about to be aggregated. It may hold a #GstBuffer
332  *   or a #GstBufferList. The contents of its info structure is subclass-dependent,
333  *   and documented on a subclass basis. The buffers held by the sample are
334  *   not writable.
335  * Since: 1.18
336  */
337 GstSample *
gst_aggregator_peek_next_sample(GstAggregator * agg,GstAggregatorPad * aggpad)338 gst_aggregator_peek_next_sample (GstAggregator * agg, GstAggregatorPad * aggpad)
339 {
340   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (agg);
341 
342   if (klass->peek_next_sample)
343     return (klass->peek_next_sample (agg, aggpad));
344 
345   return NULL;
346 }
347 
348 /*************************************
349  * GstAggregator implementation  *
350  *************************************/
351 static GstElementClass *aggregator_parent_class = NULL;
352 static gint aggregator_private_offset = 0;
353 
354 /* All members are protected by the object lock unless otherwise noted */
355 
356 struct _GstAggregatorPrivate
357 {
358   gint max_padserial;
359 
360   /* Our state is >= PAUSED */
361   gboolean running;             /* protected by src_lock */
362 
363   /* seqnum from last seek or common seqnum to flush start events received
364    * on all pads, for flushing without a seek */
365   guint32 next_seqnum;
366   /* seqnum to apply to synthetic segment/eos events */
367   guint32 seqnum;
368   gboolean send_stream_start;   /* protected by srcpad stream lock */
369   gboolean send_segment;
370   gboolean flushing;
371   gboolean send_eos;            /* protected by srcpad stream lock */
372 
373   GstCaps *srccaps;             /* protected by the srcpad stream lock */
374 
375   GstTagList *tags;
376   gboolean tags_changed;
377 
378   gboolean peer_latency_live;   /* protected by src_lock */
379   GstClockTime peer_latency_min;        /* protected by src_lock */
380   GstClockTime peer_latency_max;        /* protected by src_lock */
381   gboolean has_peer_latency;    /* protected by src_lock */
382 
383   GstClockTime sub_latency_min; /* protected by src_lock */
384   GstClockTime sub_latency_max; /* protected by src_lock */
385 
386   GstClockTime upstream_latency_min;    /* protected by src_lock */
387 
388   /* aggregate */
389   GstClockID aggregate_id;      /* protected by src_lock */
390   gboolean selected_samples_called_or_warned;   /* protected by src_lock */
391   GMutex src_lock;
392   GCond src_cond;
393 
394   gboolean first_buffer;        /* protected by object lock */
395   GstAggregatorStartTimeSelection start_time_selection;
396   GstClockTime start_time;
397 
398   /* protected by the object lock */
399   GstQuery *allocation_query;
400   GstAllocator *allocator;
401   GstBufferPool *pool;
402   GstAllocationParams allocation_params;
403 
404   /* properties */
405   gint64 latency;               /* protected by both src_lock and all pad locks */
406   gboolean emit_signals;
407   gboolean ignore_inactive_pads;
408 };
409 
410 /* Seek event forwarding helper */
411 typedef struct
412 {
413   /* parameters */
414   GstEvent *event;
415   gboolean flush;
416   gboolean only_to_active_pads;
417 
418   /* results */
419   gboolean result;
420   gboolean one_actually_seeked;
421 } EventData;
422 
423 #define DEFAULT_LATENCY              0
424 #define DEFAULT_MIN_UPSTREAM_LATENCY              0
425 #define DEFAULT_START_TIME_SELECTION GST_AGGREGATOR_START_TIME_SELECTION_ZERO
426 #define DEFAULT_START_TIME           (-1)
427 #define DEFAULT_EMIT_SIGNALS         FALSE
428 
429 enum
430 {
431   PROP_0,
432   PROP_LATENCY,
433   PROP_MIN_UPSTREAM_LATENCY,
434   PROP_START_TIME_SELECTION,
435   PROP_START_TIME,
436   PROP_EMIT_SIGNALS,
437   PROP_LAST
438 };
439 
440 enum
441 {
442   SIGNAL_SAMPLES_SELECTED,
443   LAST_SIGNAL,
444 };
445 
446 static guint gst_aggregator_signals[LAST_SIGNAL] = { 0 };
447 
448 static GstFlowReturn gst_aggregator_pad_chain_internal (GstAggregator * self,
449     GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head);
450 
451 static gboolean
gst_aggregator_pad_queue_is_empty(GstAggregatorPad * pad)452 gst_aggregator_pad_queue_is_empty (GstAggregatorPad * pad)
453 {
454   return (g_queue_peek_tail (&pad->priv->data) == NULL &&
455       pad->priv->clipped_buffer == NULL);
456 }
457 
458 /* Will return FALSE if there's no buffer available on every non-EOS pad, or
459  * if at least one of the pads has an event or query at the top of its queue.
460  *
461  * Only returns TRUE if all non-EOS pads have a buffer available at the top of
462  * their queue or a clipped buffer already.
463  */
464 static gboolean
gst_aggregator_check_pads_ready(GstAggregator * self,gboolean * have_event_or_query_ret)465 gst_aggregator_check_pads_ready (GstAggregator * self,
466     gboolean * have_event_or_query_ret)
467 {
468   GstAggregatorPad *pad = NULL;
469   GList *l, *sinkpads;
470   gboolean have_buffer = TRUE;
471   gboolean have_event_or_query = FALSE;
472   guint n_ready = 0;
473 
474   GST_LOG_OBJECT (self, "checking pads");
475 
476   GST_OBJECT_LOCK (self);
477 
478   sinkpads = GST_ELEMENT_CAST (self)->sinkpads;
479   if (sinkpads == NULL)
480     goto no_sinkpads;
481 
482   for (l = sinkpads; l != NULL; l = l->next) {
483     pad = l->data;
484 
485     PAD_LOCK (pad);
486 
487     /* If there's an event or query at the top of the queue and we don't yet
488      * have taken the top buffer out and stored it as clip_buffer, remember
489      * that and exit the loop. We first have to handle all events/queries
490      * before we handle any buffers. */
491     if (!pad->priv->clipped_buffer
492         && (GST_IS_EVENT (g_queue_peek_tail (&pad->priv->data))
493             || GST_IS_QUERY (g_queue_peek_tail (&pad->priv->data)))) {
494       PAD_UNLOCK (pad);
495       have_event_or_query = TRUE;
496       break;
497     }
498 
499     if (self->priv->ignore_inactive_pads && self->priv->peer_latency_live &&
500         pad->priv->waited_once && pad->priv->first_buffer && !pad->priv->eos) {
501       PAD_UNLOCK (pad);
502       continue;
503     }
504 
505     /* Otherwise check if we have a clipped buffer or a buffer at the top of
506      * the queue, and if not then this pad is not ready unless it is also EOS */
507     if (!pad->priv->clipped_buffer
508         && !GST_IS_BUFFER (g_queue_peek_tail (&pad->priv->data))) {
509       /* We must not have any buffers at all in this pad then as otherwise we
510        * would've had an event/query at the top of the queue */
511       g_assert (pad->priv->num_buffers == 0);
512 
513       /* Only consider this pad as worth waiting for if it's not already EOS.
514        * There's no point in waiting for buffers on EOS pads */
515       if (!pad->priv->eos)
516         have_buffer = FALSE;
517       else
518         n_ready++;
519     } else if (self->priv->peer_latency_live) {
520       /* In live mode, having a single pad with buffers is enough to
521        * generate a start time from it. In non-live mode all pads need
522        * to have a buffer
523        */
524       self->priv->first_buffer = FALSE;
525       n_ready++;
526     }
527 
528     PAD_UNLOCK (pad);
529   }
530 
531   if (self->priv->ignore_inactive_pads && self->priv->peer_latency_live
532       && n_ready == 0)
533     goto no_sinkpads;
534 
535   if (have_event_or_query)
536     goto pad_not_ready_but_event_or_query;
537 
538   if (!have_buffer)
539     goto pad_not_ready;
540 
541   if (have_buffer)
542     self->priv->first_buffer = FALSE;
543 
544   GST_OBJECT_UNLOCK (self);
545   GST_LOG_OBJECT (self, "pads are ready");
546 
547   if (have_event_or_query_ret)
548     *have_event_or_query_ret = have_event_or_query;
549 
550   return TRUE;
551 
552 no_sinkpads:
553   {
554     GST_LOG_OBJECT (self, "pads not ready: no sink pads");
555     GST_OBJECT_UNLOCK (self);
556 
557     if (have_event_or_query_ret)
558       *have_event_or_query_ret = have_event_or_query;
559 
560     return FALSE;
561   }
562 pad_not_ready:
563   {
564     GST_LOG_OBJECT (pad, "pad not ready to be aggregated yet");
565     GST_OBJECT_UNLOCK (self);
566 
567     if (have_event_or_query_ret)
568       *have_event_or_query_ret = have_event_or_query;
569 
570     return FALSE;
571   }
572 pad_not_ready_but_event_or_query:
573   {
574     GST_LOG_OBJECT (pad,
575         "pad not ready to be aggregated yet, need to handle serialized event or query first");
576     GST_OBJECT_UNLOCK (self);
577 
578     if (have_event_or_query_ret)
579       *have_event_or_query_ret = have_event_or_query;
580 
581     return FALSE;
582   }
583 }
584 
585 static void
gst_aggregator_reset_flow_values(GstAggregator * self)586 gst_aggregator_reset_flow_values (GstAggregator * self)
587 {
588   GST_OBJECT_LOCK (self);
589   self->priv->send_stream_start = TRUE;
590   self->priv->send_segment = TRUE;
591   gst_segment_init (&GST_AGGREGATOR_PAD (self->srcpad)->segment,
592       GST_FORMAT_TIME);
593   /* Initialize to -1 so we set it to the start position once the first buffer
594    * is handled in gst_aggregator_pad_chain_internal() */
595   GST_AGGREGATOR_PAD (self->srcpad)->segment.position = -1;
596   self->priv->first_buffer = TRUE;
597   GST_OBJECT_UNLOCK (self);
598 }
599 
600 static inline void
gst_aggregator_push_mandatory_events(GstAggregator * self,gboolean up_to_caps)601 gst_aggregator_push_mandatory_events (GstAggregator * self, gboolean up_to_caps)
602 {
603   GstAggregatorPrivate *priv = self->priv;
604   GstEvent *segment = NULL;
605   GstEvent *tags = NULL;
606 
607   if (self->priv->send_stream_start) {
608     gchar s_id[32];
609 
610     GST_INFO_OBJECT (self, "pushing stream start");
611     /* stream-start (FIXME: create id based on input ids) */
612     g_snprintf (s_id, sizeof (s_id), "agg-%08x", g_random_int ());
613     if (!gst_pad_push_event (GST_PAD (self->srcpad),
614             gst_event_new_stream_start (s_id))) {
615       GST_WARNING_OBJECT (self->srcpad, "Sending stream start event failed");
616     }
617     self->priv->send_stream_start = FALSE;
618   }
619 
620   if (self->priv->srccaps) {
621     GST_INFO_OBJECT (self, "pushing caps: %" GST_PTR_FORMAT,
622         self->priv->srccaps);
623     if (!gst_pad_push_event (GST_PAD (self->srcpad),
624             gst_event_new_caps (self->priv->srccaps))) {
625       GST_WARNING_OBJECT (self->srcpad, "Sending caps event failed");
626     }
627     gst_caps_unref (self->priv->srccaps);
628     self->priv->srccaps = NULL;
629   }
630 
631   if (up_to_caps)
632     return;
633 
634   GST_OBJECT_LOCK (self);
635   if (self->priv->send_segment && !self->priv->flushing) {
636     segment =
637         gst_event_new_segment (&GST_AGGREGATOR_PAD (self->srcpad)->segment);
638 
639     if (!self->priv->seqnum)
640       /* This code-path is in preparation to be able to run without a source
641        * connected. Then we won't have a seq-num from a segment event. */
642       self->priv->seqnum = gst_event_get_seqnum (segment);
643     else
644       gst_event_set_seqnum (segment, self->priv->seqnum);
645     self->priv->send_segment = FALSE;
646 
647     GST_DEBUG_OBJECT (self, "pushing segment %" GST_PTR_FORMAT, segment);
648   }
649 
650   if (priv->tags && priv->tags_changed && !self->priv->flushing) {
651     tags = gst_event_new_tag (gst_tag_list_ref (priv->tags));
652     priv->tags_changed = FALSE;
653   }
654   GST_OBJECT_UNLOCK (self);
655 
656   if (segment)
657     gst_pad_push_event (self->srcpad, segment);
658   if (tags)
659     gst_pad_push_event (self->srcpad, tags);
660 }
661 
662 /**
663  * gst_aggregator_set_src_caps:
664  * @self: The #GstAggregator
665  * @caps: The #GstCaps to set on the src pad.
666  *
667  * Sets the caps to be used on the src pad.
668  */
669 void
gst_aggregator_set_src_caps(GstAggregator * self,GstCaps * caps)670 gst_aggregator_set_src_caps (GstAggregator * self, GstCaps * caps)
671 {
672   GstCaps *old_caps;
673 
674   GST_PAD_STREAM_LOCK (self->srcpad);
675 
676   if (caps && (old_caps = gst_pad_get_current_caps (self->srcpad))) {
677     if (gst_caps_is_equal (caps, old_caps)) {
678       GST_DEBUG_OBJECT (self,
679           "New caps are the same as the previously set caps %" GST_PTR_FORMAT,
680           old_caps);
681       gst_caps_unref (old_caps);
682       GST_PAD_STREAM_UNLOCK (self->srcpad);
683       return;
684     }
685     gst_caps_unref (old_caps);
686   }
687 
688   gst_caps_replace (&self->priv->srccaps, caps);
689   gst_aggregator_push_mandatory_events (self, TRUE);
690   GST_PAD_STREAM_UNLOCK (self->srcpad);
691 }
692 
693 static GstFlowReturn
gst_aggregator_default_finish_buffer(GstAggregator * self,GstBuffer * buffer)694 gst_aggregator_default_finish_buffer (GstAggregator * self, GstBuffer * buffer)
695 {
696   gst_aggregator_push_mandatory_events (self, FALSE);
697 
698   GST_OBJECT_LOCK (self);
699   if (!self->priv->flushing && gst_pad_is_active (self->srcpad)) {
700     GST_TRACE_OBJECT (self, "pushing buffer %" GST_PTR_FORMAT, buffer);
701     GST_OBJECT_UNLOCK (self);
702     return gst_pad_push (self->srcpad, buffer);
703   } else {
704     GST_INFO_OBJECT (self, "Not pushing (active: %i, flushing: %i)",
705         self->priv->flushing, gst_pad_is_active (self->srcpad));
706     GST_OBJECT_UNLOCK (self);
707     gst_buffer_unref (buffer);
708     return GST_FLOW_OK;
709   }
710 }
711 
712 /**
713  * gst_aggregator_finish_buffer:
714  * @aggregator: The #GstAggregator
715  * @buffer: (transfer full): the #GstBuffer to push.
716  *
717  * This method will push the provided output buffer downstream. If needed,
718  * mandatory events such as stream-start, caps, and segment events will be
719  * sent before pushing the buffer.
720  */
721 GstFlowReturn
gst_aggregator_finish_buffer(GstAggregator * aggregator,GstBuffer * buffer)722 gst_aggregator_finish_buffer (GstAggregator * aggregator, GstBuffer * buffer)
723 {
724   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (aggregator);
725 
726   g_assert (klass->finish_buffer != NULL);
727 
728   return klass->finish_buffer (aggregator, buffer);
729 }
730 
731 static GstFlowReturn
gst_aggregator_default_finish_buffer_list(GstAggregator * self,GstBufferList * bufferlist)732 gst_aggregator_default_finish_buffer_list (GstAggregator * self,
733     GstBufferList * bufferlist)
734 {
735   gst_aggregator_push_mandatory_events (self, FALSE);
736 
737   GST_OBJECT_LOCK (self);
738   if (!self->priv->flushing && gst_pad_is_active (self->srcpad)) {
739     GST_TRACE_OBJECT (self, "pushing bufferlist%" GST_PTR_FORMAT, bufferlist);
740     GST_OBJECT_UNLOCK (self);
741     return gst_pad_push_list (self->srcpad, bufferlist);
742   } else {
743     GST_INFO_OBJECT (self, "Not pushing (active: %i, flushing: %i)",
744         self->priv->flushing, gst_pad_is_active (self->srcpad));
745     GST_OBJECT_UNLOCK (self);
746     gst_buffer_list_unref (bufferlist);
747     return GST_FLOW_OK;
748   }
749 }
750 
751 /**
752  * gst_aggregator_finish_buffer_list:
753  * @aggregator: The #GstAggregator
754  * @bufferlist: (transfer full): the #GstBufferList to push.
755  *
756  * This method will push the provided output buffer list downstream. If needed,
757  * mandatory events such as stream-start, caps, and segment events will be
758  * sent before pushing the buffer.
759  *
760  * Since: 1.18
761  */
762 GstFlowReturn
gst_aggregator_finish_buffer_list(GstAggregator * aggregator,GstBufferList * bufferlist)763 gst_aggregator_finish_buffer_list (GstAggregator * aggregator,
764     GstBufferList * bufferlist)
765 {
766   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (aggregator);
767 
768   g_assert (klass->finish_buffer_list != NULL);
769 
770   return klass->finish_buffer_list (aggregator, bufferlist);
771 }
772 
773 static void
gst_aggregator_push_eos(GstAggregator * self)774 gst_aggregator_push_eos (GstAggregator * self)
775 {
776   GstEvent *event;
777   gst_aggregator_push_mandatory_events (self, FALSE);
778 
779   event = gst_event_new_eos ();
780 
781   GST_OBJECT_LOCK (self);
782   self->priv->send_eos = FALSE;
783   gst_event_set_seqnum (event, self->priv->seqnum);
784   GST_OBJECT_UNLOCK (self);
785 
786   gst_pad_push_event (self->srcpad, event);
787 }
788 
789 static GstClockTime
gst_aggregator_get_next_time(GstAggregator * self)790 gst_aggregator_get_next_time (GstAggregator * self)
791 {
792   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self);
793 
794   if (klass->get_next_time)
795     return klass->get_next_time (self);
796 
797   return GST_CLOCK_TIME_NONE;
798 }
799 
800 static gboolean
gst_aggregator_wait_and_check(GstAggregator * self,gboolean * timeout)801 gst_aggregator_wait_and_check (GstAggregator * self, gboolean * timeout)
802 {
803   GstClockTime latency;
804   GstClockTime start;
805   gboolean res;
806   gboolean have_event_or_query = FALSE;
807 
808   *timeout = FALSE;
809 
810   SRC_LOCK (self);
811 
812   latency = gst_aggregator_get_latency_unlocked (self);
813 
814   if (gst_aggregator_check_pads_ready (self, &have_event_or_query)) {
815     GST_DEBUG_OBJECT (self, "all pads have data");
816     SRC_UNLOCK (self);
817 
818     return TRUE;
819   }
820 
821   /* If we have an event or query, immediately return FALSE instead of waiting
822    * and handle it immediately */
823   if (have_event_or_query) {
824     GST_DEBUG_OBJECT (self, "Have serialized event or query to handle first");
825     SRC_UNLOCK (self);
826     return FALSE;
827   }
828 
829   /* Before waiting, check if we're actually still running */
830   if (!self->priv->running || !self->priv->send_eos) {
831     SRC_UNLOCK (self);
832 
833     return FALSE;
834   }
835 
836   start = gst_aggregator_get_next_time (self);
837 
838   /* If we're not live, or if we use the running time
839    * of the first buffer as start time, we wait until
840    * all pads have buffers.
841    * Otherwise (i.e. if we are live!), we wait on the clock
842    * and if a pad does not have a buffer in time we ignore
843    * that pad.
844    */
845   GST_OBJECT_LOCK (self);
846   if (!GST_CLOCK_TIME_IS_VALID (latency) ||
847       !GST_IS_CLOCK (GST_ELEMENT_CLOCK (self)) ||
848       !GST_CLOCK_TIME_IS_VALID (start) ||
849       (self->priv->first_buffer
850           && self->priv->start_time_selection ==
851           GST_AGGREGATOR_START_TIME_SELECTION_FIRST)) {
852     /* We wake up here when something happened, and below
853      * then check if we're ready now. If we return FALSE,
854      * we will be directly called again.
855      */
856     GST_OBJECT_UNLOCK (self);
857     SRC_WAIT (self);
858   } else {
859     GstClockTime base_time, time;
860     GstClock *clock;
861     GstClockReturn status;
862     GstClockTimeDiff jitter;
863 
864     GST_DEBUG_OBJECT (self, "got subclass start time: %" GST_TIME_FORMAT,
865         GST_TIME_ARGS (start));
866 
867     base_time = GST_ELEMENT_CAST (self)->base_time;
868     clock = gst_object_ref (GST_ELEMENT_CLOCK (self));
869     GST_OBJECT_UNLOCK (self);
870 
871     time = base_time + start;
872     time += latency;
873 
874     GST_DEBUG_OBJECT (self, "possibly waiting for clock to reach %"
875         GST_TIME_FORMAT " (base %" GST_TIME_FORMAT " start %" GST_TIME_FORMAT
876         " latency %" GST_TIME_FORMAT " current %" GST_TIME_FORMAT ")",
877         GST_TIME_ARGS (time),
878         GST_TIME_ARGS (base_time),
879         GST_TIME_ARGS (start), GST_TIME_ARGS (latency),
880         GST_TIME_ARGS (gst_clock_get_time (clock)));
881 
882     self->priv->aggregate_id = gst_clock_new_single_shot_id (clock, time);
883     gst_object_unref (clock);
884     SRC_UNLOCK (self);
885 
886     jitter = 0;
887     status = gst_clock_id_wait (self->priv->aggregate_id, &jitter);
888 
889     SRC_LOCK (self);
890     if (self->priv->aggregate_id) {
891       gst_clock_id_unref (self->priv->aggregate_id);
892       self->priv->aggregate_id = NULL;
893     }
894 
895     GST_DEBUG_OBJECT (self,
896         "clock returned %d (jitter: %" GST_STIME_FORMAT ")",
897         status, GST_STIME_ARGS (jitter));
898 
899     /* we timed out */
900     if (status == GST_CLOCK_OK || status == GST_CLOCK_EARLY) {
901       GList *l;
902 
903       GST_OBJECT_LOCK (self);
904       for (l = GST_ELEMENT_CAST (self)->sinkpads; l != NULL; l = l->next) {
905         GstAggregatorPad *pad = GST_AGGREGATOR_PAD (l->data);
906 
907         PAD_LOCK (pad);
908         pad->priv->waited_once = TRUE;
909         PAD_UNLOCK (pad);
910       }
911       GST_OBJECT_UNLOCK (self);
912 
913       SRC_UNLOCK (self);
914       *timeout = TRUE;
915       return TRUE;
916     }
917   }
918 
919   res = gst_aggregator_check_pads_ready (self, NULL);
920   SRC_UNLOCK (self);
921 
922   return res;
923 }
924 
925 typedef struct
926 {
927   gboolean processed_event;
928   GstFlowReturn flow_ret;
929 } DoHandleEventsAndQueriesData;
930 
931 static gboolean
gst_aggregator_do_events_and_queries(GstElement * self,GstPad * epad,gpointer user_data)932 gst_aggregator_do_events_and_queries (GstElement * self, GstPad * epad,
933     gpointer user_data)
934 {
935   GstAggregatorPad *pad = GST_AGGREGATOR_PAD_CAST (epad);
936   GstAggregator *aggregator = GST_AGGREGATOR_CAST (self);
937   GstEvent *event = NULL;
938   GstQuery *query = NULL;
939   GstAggregatorClass *klass = NULL;
940   DoHandleEventsAndQueriesData *data = user_data;
941 
942   do {
943     event = NULL;
944     query = NULL;
945 
946     PAD_LOCK (pad);
947     if (pad->priv->clipped_buffer == NULL &&
948         !GST_IS_BUFFER (g_queue_peek_tail (&pad->priv->data))) {
949       if (GST_IS_EVENT (g_queue_peek_tail (&pad->priv->data)))
950         event = gst_event_ref (g_queue_peek_tail (&pad->priv->data));
951       if (GST_IS_QUERY (g_queue_peek_tail (&pad->priv->data)))
952         query = g_queue_peek_tail (&pad->priv->data);
953     }
954     PAD_UNLOCK (pad);
955     if (event || query) {
956       gboolean ret;
957 
958       data->processed_event = TRUE;
959       if (klass == NULL)
960         klass = GST_AGGREGATOR_GET_CLASS (self);
961 
962       if (event) {
963         GST_LOG_OBJECT (pad, "Processing %" GST_PTR_FORMAT, event);
964         gst_event_ref (event);
965         ret = klass->sink_event (aggregator, pad, event);
966 
967         PAD_LOCK (pad);
968         if (GST_EVENT_TYPE (event) == GST_EVENT_CAPS) {
969           pad->priv->negotiated = ret;
970         }
971         if (g_queue_peek_tail (&pad->priv->data) == event)
972           gst_event_unref (g_queue_pop_tail (&pad->priv->data));
973         gst_event_unref (event);
974       } else if (query) {
975         GST_LOG_OBJECT (pad, "Processing %" GST_PTR_FORMAT, query);
976         ret = klass->sink_query (aggregator, pad, query);
977 
978         PAD_LOCK (pad);
979         if (g_queue_peek_tail (&pad->priv->data) == query) {
980           GstStructure *s;
981 
982           s = gst_query_writable_structure (query);
983           gst_structure_set (s, "gst-aggregator-retval", G_TYPE_BOOLEAN, ret,
984               NULL);
985           g_queue_pop_tail (&pad->priv->data);
986         }
987       }
988 
989       PAD_BROADCAST_EVENT (pad);
990       PAD_UNLOCK (pad);
991     }
992   } while (event || query);
993 
994   return TRUE;
995 }
996 
997 static gboolean
gst_aggregator_pad_skip_buffers(GstElement * self,GstPad * epad,gpointer user_data)998 gst_aggregator_pad_skip_buffers (GstElement * self, GstPad * epad,
999     gpointer user_data)
1000 {
1001   GList *item;
1002   GstAggregatorPad *aggpad = (GstAggregatorPad *) epad;
1003   GstAggregator *agg = (GstAggregator *) self;
1004   GstAggregatorPadClass *klass = GST_AGGREGATOR_PAD_GET_CLASS (aggpad);
1005 
1006   if (!klass->skip_buffer)
1007     return FALSE;
1008 
1009   PAD_LOCK (aggpad);
1010 
1011   item = g_queue_peek_tail_link (&aggpad->priv->data);
1012   while (item) {
1013     GList *prev = item->prev;
1014 
1015     if (GST_IS_BUFFER (item->data)
1016         && klass->skip_buffer (aggpad, agg, item->data)) {
1017       GST_LOG_OBJECT (aggpad, "Skipping %" GST_PTR_FORMAT, item->data);
1018       gst_aggregator_pad_buffer_consumed (aggpad, GST_BUFFER (item->data),
1019           TRUE);
1020       gst_buffer_unref (item->data);
1021       g_queue_delete_link (&aggpad->priv->data, item);
1022     } else {
1023       break;
1024     }
1025 
1026     item = prev;
1027   }
1028 
1029   PAD_UNLOCK (aggpad);
1030 
1031   return TRUE;
1032 }
1033 
1034 static gboolean
gst_aggregator_pad_reset_peeked_buffer(GstElement * self,GstPad * epad,gpointer user_data)1035 gst_aggregator_pad_reset_peeked_buffer (GstElement * self, GstPad * epad,
1036     gpointer user_data)
1037 {
1038   GstAggregatorPad *aggpad = (GstAggregatorPad *) epad;
1039 
1040   PAD_LOCK (aggpad);
1041 
1042   gst_buffer_replace (&aggpad->priv->peeked_buffer, NULL);
1043 
1044   PAD_UNLOCK (aggpad);
1045 
1046   return TRUE;
1047 }
1048 
1049 
1050 static void
gst_aggregator_pad_set_flushing(GstAggregatorPad * aggpad,GstFlowReturn flow_return,gboolean full)1051 gst_aggregator_pad_set_flushing (GstAggregatorPad * aggpad,
1052     GstFlowReturn flow_return, gboolean full)
1053 {
1054   GList *item;
1055 
1056   PAD_LOCK (aggpad);
1057   if (flow_return == GST_FLOW_NOT_LINKED)
1058     aggpad->priv->flow_return = MIN (flow_return, aggpad->priv->flow_return);
1059   else
1060     aggpad->priv->flow_return = flow_return;
1061 
1062   item = g_queue_peek_head_link (&aggpad->priv->data);
1063   while (item) {
1064     GList *next = item->next;
1065 
1066     /* In partial flush, we do like the pad, we get rid of non-sticky events
1067      * and EOS/SEGMENT.
1068      */
1069     if (full || GST_IS_BUFFER (item->data) ||
1070         GST_EVENT_TYPE (item->data) == GST_EVENT_EOS ||
1071         GST_EVENT_TYPE (item->data) == GST_EVENT_SEGMENT ||
1072         !GST_EVENT_IS_STICKY (item->data)) {
1073       if (!GST_IS_QUERY (item->data))
1074         gst_mini_object_unref (item->data);
1075       g_queue_delete_link (&aggpad->priv->data, item);
1076     }
1077     item = next;
1078   }
1079   aggpad->priv->num_buffers = 0;
1080   gst_buffer_replace (&aggpad->priv->clipped_buffer, NULL);
1081 
1082   PAD_BROADCAST_EVENT (aggpad);
1083   PAD_UNLOCK (aggpad);
1084 }
1085 
1086 static GstFlowReturn
gst_aggregator_default_update_src_caps(GstAggregator * agg,GstCaps * caps,GstCaps ** ret)1087 gst_aggregator_default_update_src_caps (GstAggregator * agg, GstCaps * caps,
1088     GstCaps ** ret)
1089 {
1090   *ret = gst_caps_ref (caps);
1091 
1092   return GST_FLOW_OK;
1093 }
1094 
1095 static GstCaps *
gst_aggregator_default_fixate_src_caps(GstAggregator * agg,GstCaps * caps)1096 gst_aggregator_default_fixate_src_caps (GstAggregator * agg, GstCaps * caps)
1097 {
1098   caps = gst_caps_fixate (caps);
1099 
1100   return caps;
1101 }
1102 
1103 static gboolean
gst_aggregator_default_negotiated_src_caps(GstAggregator * agg,GstCaps * caps)1104 gst_aggregator_default_negotiated_src_caps (GstAggregator * agg, GstCaps * caps)
1105 {
1106   return TRUE;
1107 }
1108 
1109 
1110 /* takes ownership of the pool, allocator and query */
1111 static gboolean
gst_aggregator_set_allocation(GstAggregator * self,GstBufferPool * pool,GstAllocator * allocator,const GstAllocationParams * params,GstQuery * query)1112 gst_aggregator_set_allocation (GstAggregator * self,
1113     GstBufferPool * pool, GstAllocator * allocator,
1114     const GstAllocationParams * params, GstQuery * query)
1115 {
1116   GstAllocator *oldalloc;
1117   GstBufferPool *oldpool;
1118   GstQuery *oldquery;
1119 
1120   GST_DEBUG ("storing allocation query");
1121 
1122   GST_OBJECT_LOCK (self);
1123   oldpool = self->priv->pool;
1124   self->priv->pool = pool;
1125 
1126   oldalloc = self->priv->allocator;
1127   self->priv->allocator = allocator;
1128 
1129   oldquery = self->priv->allocation_query;
1130   self->priv->allocation_query = query;
1131 
1132   if (params)
1133     self->priv->allocation_params = *params;
1134   else
1135     gst_allocation_params_init (&self->priv->allocation_params);
1136   GST_OBJECT_UNLOCK (self);
1137 
1138   if (oldpool) {
1139     GST_DEBUG_OBJECT (self, "deactivating old pool %p", oldpool);
1140     gst_buffer_pool_set_active (oldpool, FALSE);
1141     gst_object_unref (oldpool);
1142   }
1143   if (oldalloc) {
1144     gst_object_unref (oldalloc);
1145   }
1146   if (oldquery) {
1147     gst_query_unref (oldquery);
1148   }
1149   return TRUE;
1150 }
1151 
1152 
1153 static gboolean
gst_aggregator_decide_allocation(GstAggregator * self,GstQuery * query)1154 gst_aggregator_decide_allocation (GstAggregator * self, GstQuery * query)
1155 {
1156   GstAggregatorClass *aggclass = GST_AGGREGATOR_GET_CLASS (self);
1157 
1158   if (aggclass->decide_allocation)
1159     if (!aggclass->decide_allocation (self, query))
1160       return FALSE;
1161 
1162   return TRUE;
1163 }
1164 
1165 static gboolean
gst_aggregator_do_allocation(GstAggregator * self,GstCaps * caps)1166 gst_aggregator_do_allocation (GstAggregator * self, GstCaps * caps)
1167 {
1168   GstQuery *query;
1169   gboolean result = TRUE;
1170   GstBufferPool *pool = NULL;
1171   GstAllocator *allocator;
1172   GstAllocationParams params;
1173 
1174   /* find a pool for the negotiated caps now */
1175   GST_DEBUG_OBJECT (self, "doing allocation query");
1176   query = gst_query_new_allocation (caps, TRUE);
1177   if (!gst_pad_peer_query (self->srcpad, query)) {
1178     /* not a problem, just debug a little */
1179     GST_DEBUG_OBJECT (self, "peer ALLOCATION query failed");
1180   }
1181 
1182   GST_DEBUG_OBJECT (self, "calling decide_allocation");
1183   result = gst_aggregator_decide_allocation (self, query);
1184 
1185   GST_DEBUG_OBJECT (self, "ALLOCATION (%d) params: %" GST_PTR_FORMAT, result,
1186       query);
1187 
1188   if (!result)
1189     goto no_decide_allocation;
1190 
1191   /* we got configuration from our peer or the decide_allocation method,
1192    * parse them */
1193   if (gst_query_get_n_allocation_params (query) > 0) {
1194     gst_query_parse_nth_allocation_param (query, 0, &allocator, &params);
1195   } else {
1196     allocator = NULL;
1197     gst_allocation_params_init (&params);
1198   }
1199 
1200   if (gst_query_get_n_allocation_pools (query) > 0)
1201     gst_query_parse_nth_allocation_pool (query, 0, &pool, NULL, NULL, NULL);
1202 
1203   /* now store */
1204   result =
1205       gst_aggregator_set_allocation (self, pool, allocator, &params, query);
1206 
1207   return result;
1208 
1209   /* Errors */
1210 no_decide_allocation:
1211   {
1212     GST_WARNING_OBJECT (self, "Failed to decide allocation");
1213     gst_query_unref (query);
1214 
1215     return result;
1216   }
1217 
1218 }
1219 
1220 static gboolean
gst_aggregator_default_negotiate(GstAggregator * self)1221 gst_aggregator_default_negotiate (GstAggregator * self)
1222 {
1223   GstAggregatorClass *agg_klass = GST_AGGREGATOR_GET_CLASS (self);
1224   GstCaps *downstream_caps, *template_caps, *caps = NULL;
1225   GstFlowReturn ret = GST_FLOW_OK;
1226 
1227   template_caps = gst_pad_get_pad_template_caps (self->srcpad);
1228   downstream_caps = gst_pad_peer_query_caps (self->srcpad, template_caps);
1229 
1230   if (gst_caps_is_empty (downstream_caps)) {
1231     GST_INFO_OBJECT (self, "Downstream caps (%"
1232         GST_PTR_FORMAT ") not compatible with pad template caps (%"
1233         GST_PTR_FORMAT ")", downstream_caps, template_caps);
1234     ret = GST_FLOW_NOT_NEGOTIATED;
1235     goto done;
1236   }
1237 
1238   g_assert (agg_klass->update_src_caps);
1239   GST_DEBUG_OBJECT (self, "updating caps from %" GST_PTR_FORMAT,
1240       downstream_caps);
1241   ret = agg_klass->update_src_caps (self, downstream_caps, &caps);
1242   if (ret < GST_FLOW_OK) {
1243     GST_WARNING_OBJECT (self, "Subclass failed to update provided caps");
1244     goto done;
1245   } else if (ret == GST_AGGREGATOR_FLOW_NEED_DATA) {
1246     GST_DEBUG_OBJECT (self, "Subclass needs more data to decide on caps");
1247     goto done;
1248   }
1249   if ((caps == NULL || gst_caps_is_empty (caps)) && ret >= GST_FLOW_OK) {
1250     ret = GST_FLOW_NOT_NEGOTIATED;
1251     goto done;
1252   }
1253   GST_DEBUG_OBJECT (self, "               to %" GST_PTR_FORMAT, caps);
1254 
1255 #ifdef GST_ENABLE_EXTRA_CHECKS
1256   if (!gst_caps_is_subset (caps, template_caps)) {
1257     GstCaps *intersection;
1258 
1259     GST_ERROR_OBJECT (self,
1260         "update_src_caps returned caps %" GST_PTR_FORMAT
1261         " which are not a real subset of the template caps %"
1262         GST_PTR_FORMAT, caps, template_caps);
1263     g_warning ("%s: update_src_caps returned caps which are not a real "
1264         "subset of the filter caps", GST_ELEMENT_NAME (self));
1265 
1266     intersection =
1267         gst_caps_intersect_full (template_caps, caps, GST_CAPS_INTERSECT_FIRST);
1268     gst_caps_unref (caps);
1269     caps = intersection;
1270   }
1271 #endif
1272 
1273   if (gst_caps_is_any (caps)) {
1274     goto done;
1275   }
1276 
1277   if (!gst_caps_is_fixed (caps)) {
1278     g_assert (agg_klass->fixate_src_caps);
1279 
1280     GST_DEBUG_OBJECT (self, "fixate caps from %" GST_PTR_FORMAT, caps);
1281     if (!(caps = agg_klass->fixate_src_caps (self, caps))) {
1282       GST_WARNING_OBJECT (self, "Subclass failed to fixate provided caps");
1283       ret = GST_FLOW_NOT_NEGOTIATED;
1284       goto done;
1285     }
1286     GST_DEBUG_OBJECT (self, "             to %" GST_PTR_FORMAT, caps);
1287   }
1288 
1289   if (agg_klass->negotiated_src_caps) {
1290     if (!agg_klass->negotiated_src_caps (self, caps)) {
1291       GST_WARNING_OBJECT (self, "Subclass failed to accept negotiated caps");
1292       ret = GST_FLOW_NOT_NEGOTIATED;
1293       goto done;
1294     }
1295   }
1296 
1297   gst_aggregator_set_src_caps (self, caps);
1298 
1299   if (!gst_aggregator_do_allocation (self, caps)) {
1300     GST_WARNING_OBJECT (self, "Allocation negotiation failed");
1301     ret = GST_FLOW_NOT_NEGOTIATED;
1302   }
1303 
1304 done:
1305   gst_caps_unref (downstream_caps);
1306   gst_caps_unref (template_caps);
1307 
1308   if (caps)
1309     gst_caps_unref (caps);
1310 
1311   return ret >= GST_FLOW_OK || ret == GST_AGGREGATOR_FLOW_NEED_DATA;
1312 }
1313 
1314 /* WITH SRC_LOCK held */
1315 static gboolean
gst_aggregator_negotiate_unlocked(GstAggregator * self)1316 gst_aggregator_negotiate_unlocked (GstAggregator * self)
1317 {
1318   GstAggregatorClass *agg_klass = GST_AGGREGATOR_GET_CLASS (self);
1319 
1320   if (agg_klass->negotiate)
1321     return agg_klass->negotiate (self);
1322 
1323   return TRUE;
1324 }
1325 
1326 /**
1327  * gst_aggregator_negotiate:
1328  * @self: a #GstAggregator
1329  *
1330  * Negotiates src pad caps with downstream elements.
1331  * Unmarks GST_PAD_FLAG_NEED_RECONFIGURE in any case. But marks it again
1332  * if #GstAggregatorClass::negotiate fails.
1333  *
1334  * Returns: %TRUE if the negotiation succeeded, else %FALSE.
1335  *
1336  * Since: 1.18
1337  */
1338 gboolean
gst_aggregator_negotiate(GstAggregator * self)1339 gst_aggregator_negotiate (GstAggregator * self)
1340 {
1341   gboolean ret = TRUE;
1342 
1343   g_return_val_if_fail (GST_IS_AGGREGATOR (self), FALSE);
1344 
1345   GST_PAD_STREAM_LOCK (GST_AGGREGATOR_SRC_PAD (self));
1346   gst_pad_check_reconfigure (GST_AGGREGATOR_SRC_PAD (self));
1347   ret = gst_aggregator_negotiate_unlocked (self);
1348   if (!ret)
1349     gst_pad_mark_reconfigure (GST_AGGREGATOR_SRC_PAD (self));
1350   GST_PAD_STREAM_UNLOCK (GST_AGGREGATOR_SRC_PAD (self));
1351 
1352   return ret;
1353 }
1354 
1355 static void
gst_aggregator_aggregate_func(GstAggregator * self)1356 gst_aggregator_aggregate_func (GstAggregator * self)
1357 {
1358   GstAggregatorPrivate *priv = self->priv;
1359   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self);
1360   gboolean timeout = FALSE;
1361 
1362   if (self->priv->running == FALSE) {
1363     GST_DEBUG_OBJECT (self, "Not running anymore");
1364     return;
1365   }
1366 
1367   GST_LOG_OBJECT (self, "Checking aggregate");
1368   while (priv->send_eos && priv->running) {
1369     GstFlowReturn flow_return = GST_FLOW_OK;
1370     DoHandleEventsAndQueriesData events_query_data = { FALSE, GST_FLOW_OK };
1371 
1372     gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self),
1373         gst_aggregator_do_events_and_queries, &events_query_data);
1374 
1375     if ((flow_return = events_query_data.flow_ret) != GST_FLOW_OK)
1376       goto handle_error;
1377 
1378     if (self->priv->peer_latency_live)
1379       gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self),
1380           gst_aggregator_pad_skip_buffers, NULL);
1381 
1382     /* Ensure we have buffers ready (either in clipped_buffer or at the head of
1383      * the queue */
1384     if (!gst_aggregator_wait_and_check (self, &timeout)) {
1385       gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self),
1386           gst_aggregator_pad_reset_peeked_buffer, NULL);
1387       continue;
1388     }
1389 
1390     if (gst_pad_check_reconfigure (GST_AGGREGATOR_SRC_PAD (self))) {
1391       if (!gst_aggregator_negotiate_unlocked (self)) {
1392         gst_pad_mark_reconfigure (GST_AGGREGATOR_SRC_PAD (self));
1393         if (GST_PAD_IS_FLUSHING (GST_AGGREGATOR_SRC_PAD (self))) {
1394           flow_return = GST_FLOW_FLUSHING;
1395         } else {
1396           flow_return = GST_FLOW_NOT_NEGOTIATED;
1397         }
1398       }
1399     }
1400 
1401     if (timeout || flow_return >= GST_FLOW_OK) {
1402       GST_LOG_OBJECT (self, "Actually aggregating, timeout: %d", timeout);
1403       flow_return = klass->aggregate (self, timeout);
1404     }
1405 
1406     gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self),
1407         gst_aggregator_pad_reset_peeked_buffer, NULL);
1408 
1409     if (!priv->selected_samples_called_or_warned) {
1410       GST_FIXME_OBJECT (self,
1411           "Subclass should call gst_aggregator_selected_samples() from its "
1412           "aggregate implementation.");
1413       priv->selected_samples_called_or_warned = TRUE;
1414     }
1415 
1416     if (flow_return == GST_AGGREGATOR_FLOW_NEED_DATA)
1417       continue;
1418 
1419     GST_OBJECT_LOCK (self);
1420     if (flow_return == GST_FLOW_FLUSHING && priv->flushing) {
1421       /* We don't want to set the pads to flushing, but we want to
1422        * stop the thread, so just break here */
1423       GST_OBJECT_UNLOCK (self);
1424       break;
1425     }
1426     GST_OBJECT_UNLOCK (self);
1427 
1428     if (flow_return == GST_FLOW_EOS || flow_return == GST_FLOW_ERROR) {
1429       gst_aggregator_push_eos (self);
1430     }
1431 
1432   handle_error:
1433     GST_LOG_OBJECT (self, "flow return is %s", gst_flow_get_name (flow_return));
1434 
1435     if (flow_return != GST_FLOW_OK) {
1436       GList *item;
1437 
1438       GST_OBJECT_LOCK (self);
1439       for (item = GST_ELEMENT (self)->sinkpads; item; item = item->next) {
1440         GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (item->data);
1441 
1442         gst_aggregator_pad_set_flushing (aggpad, flow_return, TRUE);
1443       }
1444       GST_OBJECT_UNLOCK (self);
1445       break;
1446     }
1447   }
1448 
1449   /* Pause the task here, the only ways to get here are:
1450    * 1) We're stopping, in which case the task is stopped anyway
1451    * 2) We got a flow error above, in which case it might take
1452    *    some time to forward the flow return upstream and we
1453    *    would otherwise call the task function over and over
1454    *    again without doing anything
1455    */
1456   gst_pad_pause_task (self->srcpad);
1457 }
1458 
1459 static gboolean
gst_aggregator_start(GstAggregator * self)1460 gst_aggregator_start (GstAggregator * self)
1461 {
1462   GstAggregatorClass *klass;
1463   gboolean result;
1464 
1465   self->priv->send_stream_start = TRUE;
1466   self->priv->send_segment = TRUE;
1467   self->priv->send_eos = TRUE;
1468   self->priv->srccaps = NULL;
1469 
1470   self->priv->has_peer_latency = FALSE;
1471   self->priv->peer_latency_live = FALSE;
1472   self->priv->peer_latency_min = self->priv->peer_latency_max = 0;
1473 
1474   gst_aggregator_set_allocation (self, NULL, NULL, NULL, NULL);
1475 
1476   klass = GST_AGGREGATOR_GET_CLASS (self);
1477 
1478   if (klass->start)
1479     result = klass->start (self);
1480   else
1481     result = TRUE;
1482 
1483   return result;
1484 }
1485 
1486 static gboolean
gst_aggregator_stop_srcpad_task(GstAggregator * self,GstEvent * flush_start)1487 gst_aggregator_stop_srcpad_task (GstAggregator * self, GstEvent * flush_start)
1488 {
1489   gboolean res = TRUE;
1490 
1491   GST_INFO_OBJECT (self, "%s srcpad task",
1492       flush_start ? "Pausing" : "Stopping");
1493 
1494   SRC_LOCK (self);
1495   self->priv->running = FALSE;
1496   SRC_BROADCAST (self);
1497   SRC_UNLOCK (self);
1498 
1499   if (flush_start) {
1500     res = gst_pad_push_event (self->srcpad, flush_start);
1501   }
1502 
1503   gst_pad_stop_task (self->srcpad);
1504 
1505   return res;
1506 }
1507 
1508 static void
gst_aggregator_start_srcpad_task(GstAggregator * self)1509 gst_aggregator_start_srcpad_task (GstAggregator * self)
1510 {
1511   GST_INFO_OBJECT (self, "Starting srcpad task");
1512 
1513   self->priv->running = TRUE;
1514   gst_pad_start_task (GST_PAD (self->srcpad),
1515       (GstTaskFunction) gst_aggregator_aggregate_func, self, NULL);
1516 }
1517 
1518 static GstFlowReturn
gst_aggregator_flush(GstAggregator * self)1519 gst_aggregator_flush (GstAggregator * self)
1520 {
1521   GstFlowReturn ret = GST_FLOW_OK;
1522   GstAggregatorPrivate *priv = self->priv;
1523   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self);
1524 
1525   GST_DEBUG_OBJECT (self, "Flushing everything");
1526   GST_OBJECT_LOCK (self);
1527   priv->send_segment = TRUE;
1528   priv->flushing = FALSE;
1529   priv->tags_changed = FALSE;
1530   GST_OBJECT_UNLOCK (self);
1531   if (klass->flush)
1532     ret = klass->flush (self);
1533 
1534   return ret;
1535 }
1536 
1537 
1538 /* Called with GstAggregator's object lock held */
1539 
1540 static gboolean
gst_aggregator_all_flush_stop_received(GstAggregator * self,guint32 seqnum)1541 gst_aggregator_all_flush_stop_received (GstAggregator * self, guint32 seqnum)
1542 {
1543   GList *tmp;
1544   GstAggregatorPad *tmppad;
1545 
1546   for (tmp = GST_ELEMENT (self)->sinkpads; tmp; tmp = tmp->next) {
1547     tmppad = (GstAggregatorPad *) tmp->data;
1548 
1549     if (tmppad->priv->last_flush_stop_seqnum != seqnum)
1550       return FALSE;
1551   }
1552 
1553   return TRUE;
1554 }
1555 
1556 /* Called with GstAggregator's object lock held */
1557 
1558 static gboolean
gst_aggregator_all_flush_start_received(GstAggregator * self,guint32 seqnum)1559 gst_aggregator_all_flush_start_received (GstAggregator * self, guint32 seqnum)
1560 {
1561   GList *tmp;
1562   GstAggregatorPad *tmppad;
1563 
1564   for (tmp = GST_ELEMENT (self)->sinkpads; tmp; tmp = tmp->next) {
1565     tmppad = (GstAggregatorPad *) tmp->data;
1566 
1567     if (tmppad->priv->last_flush_start_seqnum != seqnum) {
1568       return FALSE;
1569     }
1570   }
1571 
1572   return TRUE;
1573 }
1574 
1575 static void
gst_aggregator_flush_start(GstAggregator * self,GstAggregatorPad * aggpad,GstEvent * event)1576 gst_aggregator_flush_start (GstAggregator * self, GstAggregatorPad * aggpad,
1577     GstEvent * event)
1578 {
1579   GstAggregatorPrivate *priv = self->priv;
1580   GstAggregatorPadPrivate *padpriv = aggpad->priv;
1581   guint32 seqnum = gst_event_get_seqnum (event);
1582 
1583   gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING, FALSE);
1584 
1585   PAD_FLUSH_LOCK (aggpad);
1586   PAD_LOCK (aggpad);
1587   padpriv->last_flush_start_seqnum = seqnum;
1588   PAD_UNLOCK (aggpad);
1589 
1590   GST_OBJECT_LOCK (self);
1591 
1592   if (!priv->flushing && gst_aggregator_all_flush_start_received (self, seqnum)) {
1593     /* Make sure we don't forward more than one FLUSH_START */
1594     priv->flushing = TRUE;
1595     priv->next_seqnum = seqnum;
1596     GST_OBJECT_UNLOCK (self);
1597 
1598     GST_INFO_OBJECT (self, "Flushing, pausing srcpad task");
1599     gst_aggregator_stop_srcpad_task (self, event);
1600 
1601     event = NULL;
1602   } else {
1603     gst_event_unref (event);
1604     GST_OBJECT_UNLOCK (self);
1605   }
1606 
1607   PAD_FLUSH_UNLOCK (aggpad);
1608 }
1609 
1610 /* Must be called with the the PAD_LOCK held */
1611 static void
update_time_level(GstAggregatorPad * aggpad,gboolean head)1612 update_time_level (GstAggregatorPad * aggpad, gboolean head)
1613 {
1614   GstAggregatorPadPrivate *priv = aggpad->priv;
1615 
1616   if (head) {
1617     if (GST_CLOCK_TIME_IS_VALID (priv->head_position) &&
1618         priv->head_segment.format == GST_FORMAT_TIME)
1619       priv->head_time = gst_segment_to_running_time (&priv->head_segment,
1620           GST_FORMAT_TIME, priv->head_position);
1621     else
1622       priv->head_time = GST_CLOCK_TIME_NONE;
1623 
1624     if (!GST_CLOCK_TIME_IS_VALID (priv->tail_time))
1625       priv->tail_time = priv->head_time;
1626   } else {
1627     if (GST_CLOCK_TIME_IS_VALID (priv->tail_position) &&
1628         aggpad->segment.format == GST_FORMAT_TIME)
1629       priv->tail_time = gst_segment_to_running_time (&aggpad->segment,
1630           GST_FORMAT_TIME, priv->tail_position);
1631     else
1632       priv->tail_time = priv->head_time;
1633   }
1634 
1635   if (priv->head_time == GST_CLOCK_TIME_NONE ||
1636       priv->tail_time == GST_CLOCK_TIME_NONE) {
1637     priv->time_level = 0;
1638     return;
1639   }
1640 
1641   if (priv->tail_time > priv->head_time)
1642     priv->time_level = 0;
1643   else
1644     priv->time_level = priv->head_time - priv->tail_time;
1645 }
1646 
1647 
1648 /* GstAggregator vmethods default implementations */
1649 static gboolean
gst_aggregator_default_sink_event(GstAggregator * self,GstAggregatorPad * aggpad,GstEvent * event)1650 gst_aggregator_default_sink_event (GstAggregator * self,
1651     GstAggregatorPad * aggpad, GstEvent * event)
1652 {
1653   gboolean res = TRUE;
1654   GstPad *pad = GST_PAD (aggpad);
1655   GstAggregatorPrivate *priv = self->priv;
1656 
1657   GST_DEBUG_OBJECT (aggpad, "Got event: %" GST_PTR_FORMAT, event);
1658 
1659   switch (GST_EVENT_TYPE (event)) {
1660     case GST_EVENT_FLUSH_START:
1661     {
1662       gst_aggregator_flush_start (self, aggpad, event);
1663       /* We forward only in one case: right after flushing */
1664       event = NULL;
1665       goto eat;
1666     }
1667     case GST_EVENT_FLUSH_STOP:
1668     {
1669       guint32 seqnum = gst_event_get_seqnum (event);
1670 
1671       PAD_FLUSH_LOCK (aggpad);
1672       PAD_LOCK (aggpad);
1673       aggpad->priv->last_flush_stop_seqnum = seqnum;
1674       PAD_UNLOCK (aggpad);
1675 
1676       gst_aggregator_pad_flush (aggpad, self);
1677 
1678       GST_OBJECT_LOCK (self);
1679       if (priv->flushing
1680           && gst_aggregator_all_flush_stop_received (self, seqnum)) {
1681         GST_OBJECT_UNLOCK (self);
1682         /* That means we received FLUSH_STOP/FLUSH_STOP on
1683          * all sinkpads -- Seeking is Done... sending FLUSH_STOP */
1684         gst_aggregator_flush (self);
1685         gst_pad_push_event (self->srcpad, event);
1686         event = NULL;
1687         SRC_LOCK (self);
1688         priv->send_eos = TRUE;
1689         SRC_BROADCAST (self);
1690         SRC_UNLOCK (self);
1691 
1692         GST_INFO_OBJECT (self, "Flush stopped");
1693 
1694         gst_aggregator_start_srcpad_task (self);
1695       } else {
1696         GST_OBJECT_UNLOCK (self);
1697       }
1698 
1699       PAD_FLUSH_UNLOCK (aggpad);
1700 
1701       /* We never forward the event */
1702       goto eat;
1703     }
1704     case GST_EVENT_EOS:
1705     {
1706       SRC_LOCK (self);
1707       PAD_LOCK (aggpad);
1708       g_assert (aggpad->priv->num_buffers == 0);
1709       aggpad->priv->eos = TRUE;
1710       PAD_UNLOCK (aggpad);
1711       SRC_BROADCAST (self);
1712       SRC_UNLOCK (self);
1713       goto eat;
1714     }
1715     case GST_EVENT_SEGMENT:
1716     {
1717       PAD_LOCK (aggpad);
1718       GST_OBJECT_LOCK (aggpad);
1719       gst_event_copy_segment (event, &aggpad->segment);
1720       /* We've got a new segment, tail_position is now meaningless
1721        * and may interfere with the time_level calculation
1722        */
1723       aggpad->priv->tail_position = GST_CLOCK_TIME_NONE;
1724       update_time_level (aggpad, FALSE);
1725       GST_OBJECT_UNLOCK (aggpad);
1726       PAD_UNLOCK (aggpad);
1727 
1728       GST_OBJECT_LOCK (self);
1729       self->priv->seqnum = gst_event_get_seqnum (event);
1730       GST_OBJECT_UNLOCK (self);
1731       goto eat;
1732     }
1733     case GST_EVENT_STREAM_START:
1734     {
1735       goto eat;
1736     }
1737     case GST_EVENT_GAP:
1738     {
1739       GstClockTime pts, endpts;
1740       GstClockTime duration;
1741       GstBuffer *gapbuf;
1742 
1743       gst_event_parse_gap (event, &pts, &duration);
1744 
1745       if (GST_CLOCK_TIME_IS_VALID (duration))
1746         endpts = pts + duration;
1747       else
1748         endpts = GST_CLOCK_TIME_NONE;
1749 
1750       GST_OBJECT_LOCK (aggpad);
1751       res = gst_segment_clip (&aggpad->segment, GST_FORMAT_TIME, pts, endpts,
1752           &pts, &endpts);
1753       GST_OBJECT_UNLOCK (aggpad);
1754 
1755       if (!res) {
1756         GST_WARNING_OBJECT (self, "GAP event outside segment, dropping");
1757         goto eat;
1758       }
1759 
1760       if (GST_CLOCK_TIME_IS_VALID (endpts) && GST_CLOCK_TIME_IS_VALID (pts))
1761         duration = endpts - pts;
1762       else
1763         duration = GST_CLOCK_TIME_NONE;
1764 
1765       gapbuf = gst_buffer_new ();
1766       GST_BUFFER_PTS (gapbuf) = pts;
1767       GST_BUFFER_DURATION (gapbuf) = duration;
1768       GST_BUFFER_FLAG_SET (gapbuf, GST_BUFFER_FLAG_GAP);
1769       GST_BUFFER_FLAG_SET (gapbuf, GST_BUFFER_FLAG_DROPPABLE);
1770 
1771       /* Remove GAP event so we can replace it with the buffer */
1772       PAD_LOCK (aggpad);
1773       if (g_queue_peek_tail (&aggpad->priv->data) == event)
1774         gst_event_unref (g_queue_pop_tail (&aggpad->priv->data));
1775       PAD_UNLOCK (aggpad);
1776 
1777       if (gst_aggregator_pad_chain_internal (self, aggpad, gapbuf, FALSE) !=
1778           GST_FLOW_OK) {
1779         GST_WARNING_OBJECT (self, "Failed to chain gap buffer");
1780         res = FALSE;
1781       }
1782 
1783       goto eat;
1784     }
1785     case GST_EVENT_TAG:
1786       goto eat;
1787     default:
1788     {
1789       break;
1790     }
1791   }
1792 
1793   GST_DEBUG_OBJECT (pad, "Forwarding event: %" GST_PTR_FORMAT, event);
1794   return gst_pad_event_default (pad, GST_OBJECT (self), event);
1795 
1796 eat:
1797   GST_DEBUG_OBJECT (pad, "Eating event: %" GST_PTR_FORMAT, event);
1798   if (event)
1799     gst_event_unref (event);
1800 
1801   return res;
1802 }
1803 
1804 /* Queue serialized events and let the others go through directly.
1805  * The queued events with be handled from the src-pad task in
1806  * gst_aggregator_do_events_and_queries().
1807  */
1808 static GstFlowReturn
gst_aggregator_default_sink_event_pre_queue(GstAggregator * self,GstAggregatorPad * aggpad,GstEvent * event)1809 gst_aggregator_default_sink_event_pre_queue (GstAggregator * self,
1810     GstAggregatorPad * aggpad, GstEvent * event)
1811 {
1812   GstFlowReturn ret = GST_FLOW_OK;
1813 
1814   if (GST_EVENT_IS_SERIALIZED (event)
1815       && GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP) {
1816     SRC_LOCK (self);
1817     PAD_LOCK (aggpad);
1818 
1819     if (aggpad->priv->flow_return != GST_FLOW_OK)
1820       goto flushing;
1821 
1822     if (GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT) {
1823       GST_OBJECT_LOCK (aggpad);
1824       gst_event_copy_segment (event, &aggpad->priv->head_segment);
1825       aggpad->priv->head_position = aggpad->priv->head_segment.position;
1826       update_time_level (aggpad, TRUE);
1827       GST_OBJECT_UNLOCK (aggpad);
1828     }
1829 
1830     GST_DEBUG_OBJECT (aggpad, "Store event in queue: %" GST_PTR_FORMAT, event);
1831     g_queue_push_head (&aggpad->priv->data, event);
1832     SRC_BROADCAST (self);
1833     PAD_UNLOCK (aggpad);
1834     SRC_UNLOCK (self);
1835   } else {
1836     GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self);
1837 
1838     if (!klass->sink_event (self, aggpad, event)) {
1839       /* Copied from GstPad to convert boolean to a GstFlowReturn in
1840        * the event handling func */
1841       ret = GST_FLOW_ERROR;
1842     }
1843   }
1844 
1845   return ret;
1846 
1847 flushing:
1848   GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping event",
1849       gst_flow_get_name (aggpad->priv->flow_return));
1850   PAD_UNLOCK (aggpad);
1851   SRC_UNLOCK (self);
1852   if (GST_EVENT_IS_STICKY (event))
1853     gst_pad_store_sticky_event (GST_PAD (aggpad), event);
1854   gst_event_unref (event);
1855 
1856   return aggpad->priv->flow_return;
1857 }
1858 
1859 static gboolean
gst_aggregator_stop_pad(GstElement * self,GstPad * epad,gpointer user_data)1860 gst_aggregator_stop_pad (GstElement * self, GstPad * epad, gpointer user_data)
1861 {
1862   GstAggregatorPad *pad = GST_AGGREGATOR_PAD_CAST (epad);
1863   GstAggregator *agg = GST_AGGREGATOR_CAST (self);
1864 
1865   gst_aggregator_pad_flush (pad, agg);
1866 
1867   PAD_LOCK (pad);
1868   pad->priv->flow_return = GST_FLOW_FLUSHING;
1869   pad->priv->negotiated = FALSE;
1870   PAD_BROADCAST_EVENT (pad);
1871   PAD_UNLOCK (pad);
1872 
1873   return TRUE;
1874 }
1875 
1876 static gboolean
gst_aggregator_stop(GstAggregator * agg)1877 gst_aggregator_stop (GstAggregator * agg)
1878 {
1879   GstAggregatorClass *klass;
1880   gboolean result;
1881 
1882   gst_aggregator_reset_flow_values (agg);
1883 
1884   /* Application needs to make sure no pads are added while it shuts us down */
1885   gst_element_foreach_sink_pad (GST_ELEMENT_CAST (agg),
1886       gst_aggregator_stop_pad, NULL);
1887 
1888   klass = GST_AGGREGATOR_GET_CLASS (agg);
1889 
1890   if (klass->stop)
1891     result = klass->stop (agg);
1892   else
1893     result = TRUE;
1894 
1895   agg->priv->has_peer_latency = FALSE;
1896   agg->priv->peer_latency_live = FALSE;
1897   agg->priv->peer_latency_min = agg->priv->peer_latency_max = 0;
1898 
1899   if (agg->priv->tags)
1900     gst_tag_list_unref (agg->priv->tags);
1901   agg->priv->tags = NULL;
1902 
1903   gst_aggregator_set_allocation (agg, NULL, NULL, NULL, NULL);
1904 
1905   if (agg->priv->running) {
1906     /* As sinkpads get deactivated after the src pad, we
1907      * may have restarted the source pad task after receiving
1908      * flush events on one of our sinkpads. Stop our src pad
1909      * task again if that is the case */
1910     gst_aggregator_stop_srcpad_task (agg, NULL);
1911   }
1912 
1913   return result;
1914 }
1915 
1916 /* GstElement vmethods implementations */
1917 static GstStateChangeReturn
gst_aggregator_change_state(GstElement * element,GstStateChange transition)1918 gst_aggregator_change_state (GstElement * element, GstStateChange transition)
1919 {
1920   GstStateChangeReturn ret;
1921   GstAggregator *self = GST_AGGREGATOR (element);
1922 
1923   switch (transition) {
1924     case GST_STATE_CHANGE_READY_TO_PAUSED:
1925       if (!gst_aggregator_start (self))
1926         goto error_start;
1927       break;
1928     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
1929       /* Wake up any waiting as now we have a clock and can do
1930        * proper waiting on the clock if necessary */
1931       SRC_LOCK (self);
1932       SRC_BROADCAST (self);
1933       SRC_UNLOCK (self);
1934       break;
1935     default:
1936       break;
1937   }
1938 
1939   if ((ret =
1940           GST_ELEMENT_CLASS (aggregator_parent_class)->change_state (element,
1941               transition)) == GST_STATE_CHANGE_FAILURE)
1942     goto failure;
1943 
1944 
1945   switch (transition) {
1946     case GST_STATE_CHANGE_PAUSED_TO_READY:
1947       if (!gst_aggregator_stop (self)) {
1948         /* What to do in this case? Error out? */
1949         GST_ERROR_OBJECT (self, "Subclass failed to stop.");
1950       }
1951       break;
1952     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
1953       /* Wake up any waiting as now clock might be gone and we might
1954        * need to wait on the condition variable again */
1955       SRC_LOCK (self);
1956       SRC_BROADCAST (self);
1957       SRC_UNLOCK (self);
1958       break;
1959     default:
1960       break;
1961   }
1962 
1963   return ret;
1964 
1965 /* ERRORS */
1966 failure:
1967   {
1968     GST_ERROR_OBJECT (element, "parent failed state change");
1969     return ret;
1970   }
1971 error_start:
1972   {
1973     GST_ERROR_OBJECT (element, "Subclass failed to start");
1974     return GST_STATE_CHANGE_FAILURE;
1975   }
1976 }
1977 
1978 static void
gst_aggregator_release_pad(GstElement * element,GstPad * pad)1979 gst_aggregator_release_pad (GstElement * element, GstPad * pad)
1980 {
1981   GstAggregator *self = GST_AGGREGATOR (element);
1982   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
1983 
1984   GST_INFO_OBJECT (pad, "Removing pad");
1985 
1986   SRC_LOCK (self);
1987   gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING, TRUE);
1988   gst_buffer_replace (&aggpad->priv->peeked_buffer, NULL);
1989   gst_element_remove_pad (element, pad);
1990 
1991   self->priv->has_peer_latency = FALSE;
1992   SRC_BROADCAST (self);
1993   SRC_UNLOCK (self);
1994 }
1995 
1996 static GstAggregatorPad *
gst_aggregator_default_create_new_pad(GstAggregator * self,GstPadTemplate * templ,const gchar * req_name,const GstCaps * caps)1997 gst_aggregator_default_create_new_pad (GstAggregator * self,
1998     GstPadTemplate * templ, const gchar * req_name, const GstCaps * caps)
1999 {
2000   GstAggregatorPad *agg_pad;
2001   GstAggregatorPrivate *priv = self->priv;
2002   gint serial = 0;
2003   gchar *name = NULL;
2004   GType pad_type =
2005       GST_PAD_TEMPLATE_GTYPE (templ) ==
2006       G_TYPE_NONE ? GST_TYPE_AGGREGATOR_PAD : GST_PAD_TEMPLATE_GTYPE (templ);
2007 
2008   if (templ->direction != GST_PAD_SINK)
2009     goto not_sink;
2010 
2011   if (templ->presence != GST_PAD_REQUEST)
2012     goto not_request;
2013 
2014   GST_OBJECT_LOCK (self);
2015   if (req_name == NULL || strlen (req_name) < 6
2016       || !g_str_has_prefix (req_name, "sink_")
2017       || strrchr (req_name, '%') != NULL) {
2018     /* no name given when requesting the pad, use next available int */
2019     serial = ++priv->max_padserial;
2020   } else {
2021     gchar *endptr = NULL;
2022 
2023     /* parse serial number from requested padname */
2024     serial = g_ascii_strtoull (&req_name[5], &endptr, 10);
2025     if (endptr != NULL && *endptr == '\0') {
2026       if (serial > priv->max_padserial) {
2027         priv->max_padserial = serial;
2028       }
2029     } else {
2030       serial = ++priv->max_padserial;
2031     }
2032   }
2033 
2034   name = g_strdup_printf ("sink_%u", serial);
2035   g_assert (g_type_is_a (pad_type, GST_TYPE_AGGREGATOR_PAD));
2036   agg_pad = g_object_new (pad_type,
2037       "name", name, "direction", GST_PAD_SINK, "template", templ, NULL);
2038   g_free (name);
2039 
2040   GST_OBJECT_UNLOCK (self);
2041 
2042   return agg_pad;
2043 
2044   /* errors */
2045 not_sink:
2046   {
2047     GST_WARNING_OBJECT (self, "request new pad that is not a SINK pad");
2048     return NULL;
2049   }
2050 not_request:
2051   {
2052     GST_WARNING_OBJECT (self, "request new pad that is not a REQUEST pad");
2053     return NULL;
2054   }
2055 }
2056 
2057 static GstPad *
gst_aggregator_request_new_pad(GstElement * element,GstPadTemplate * templ,const gchar * req_name,const GstCaps * caps)2058 gst_aggregator_request_new_pad (GstElement * element,
2059     GstPadTemplate * templ, const gchar * req_name, const GstCaps * caps)
2060 {
2061   GstAggregator *self;
2062   GstAggregatorPad *agg_pad;
2063   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (element);
2064   GstAggregatorPrivate *priv = GST_AGGREGATOR (element)->priv;
2065 
2066   self = GST_AGGREGATOR (element);
2067 
2068   agg_pad = klass->create_new_pad (self, templ, req_name, caps);
2069   if (!agg_pad) {
2070     GST_ERROR_OBJECT (element, "Couldn't create new pad");
2071     return NULL;
2072   }
2073 
2074   GST_DEBUG_OBJECT (element, "Adding pad %s", GST_PAD_NAME (agg_pad));
2075 
2076   if (priv->running)
2077     gst_pad_set_active (GST_PAD (agg_pad), TRUE);
2078 
2079   /* add the pad to the element */
2080   gst_element_add_pad (element, GST_PAD (agg_pad));
2081 
2082   return GST_PAD (agg_pad);
2083 }
2084 
2085 /* Must be called with SRC_LOCK held, temporarily releases it! */
2086 
2087 static gboolean
gst_aggregator_query_latency_unlocked(GstAggregator * self,GstQuery * query)2088 gst_aggregator_query_latency_unlocked (GstAggregator * self, GstQuery * query)
2089 {
2090   gboolean query_ret, live;
2091   GstClockTime our_latency, min, max;
2092 
2093   /* Temporarily release the lock to do the query. */
2094   SRC_UNLOCK (self);
2095   query_ret = gst_pad_query_default (self->srcpad, GST_OBJECT (self), query);
2096   SRC_LOCK (self);
2097 
2098   if (!query_ret) {
2099     GST_WARNING_OBJECT (self, "Latency query failed");
2100     return FALSE;
2101   }
2102 
2103   gst_query_parse_latency (query, &live, &min, &max);
2104 
2105   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (min))) {
2106     GST_ERROR_OBJECT (self, "Invalid minimum latency %" GST_TIME_FORMAT
2107         ". Please file a bug at " PACKAGE_BUGREPORT ".", GST_TIME_ARGS (min));
2108     return FALSE;
2109   }
2110 
2111   if (self->priv->upstream_latency_min > min) {
2112     GstClockTimeDiff diff =
2113         GST_CLOCK_DIFF (min, self->priv->upstream_latency_min);
2114 
2115     min += diff;
2116     if (GST_CLOCK_TIME_IS_VALID (max)) {
2117       max += diff;
2118     }
2119   }
2120 
2121   if (min > max && GST_CLOCK_TIME_IS_VALID (max)) {
2122     SRC_UNLOCK (self);
2123     GST_ELEMENT_WARNING (self, CORE, CLOCK, (NULL),
2124         ("Impossible to configure latency: max %" GST_TIME_FORMAT " < min %"
2125             GST_TIME_FORMAT ". Add queues or other buffering elements.",
2126             GST_TIME_ARGS (max), GST_TIME_ARGS (min)));
2127     SRC_LOCK (self);
2128     return FALSE;
2129   }
2130 
2131   our_latency = self->priv->latency;
2132 
2133   self->priv->peer_latency_live = live;
2134   self->priv->peer_latency_min = min;
2135   self->priv->peer_latency_max = max;
2136   self->priv->has_peer_latency = TRUE;
2137 
2138   /* add our own */
2139   min += our_latency;
2140   min += self->priv->sub_latency_min;
2141   if (GST_CLOCK_TIME_IS_VALID (self->priv->sub_latency_max)
2142       && GST_CLOCK_TIME_IS_VALID (max))
2143     max += self->priv->sub_latency_max + our_latency;
2144   else
2145     max = GST_CLOCK_TIME_NONE;
2146 
2147   SRC_BROADCAST (self);
2148 
2149   GST_DEBUG_OBJECT (self, "configured latency live:%s min:%" G_GINT64_FORMAT
2150       " max:%" G_GINT64_FORMAT, live ? "true" : "false", min, max);
2151 
2152   gst_query_set_latency (query, live, min, max);
2153 
2154   return query_ret;
2155 }
2156 
2157 /*
2158  * MUST be called with the src_lock held. Temporarily releases the lock inside
2159  * gst_aggregator_query_latency_unlocked() to do the actual query!
2160  *
2161  * See  gst_aggregator_get_latency() for doc
2162  */
2163 static GstClockTime
gst_aggregator_get_latency_unlocked(GstAggregator * self)2164 gst_aggregator_get_latency_unlocked (GstAggregator * self)
2165 {
2166   GstClockTime latency;
2167 
2168   g_return_val_if_fail (GST_IS_AGGREGATOR (self), 0);
2169 
2170   if (!self->priv->has_peer_latency) {
2171     GstQuery *query = gst_query_new_latency ();
2172     gboolean ret;
2173 
2174     ret = gst_aggregator_query_latency_unlocked (self, query);
2175     gst_query_unref (query);
2176     if (!ret)
2177       return GST_CLOCK_TIME_NONE;
2178   }
2179 
2180   if (!self->priv->has_peer_latency || !self->priv->peer_latency_live)
2181     return GST_CLOCK_TIME_NONE;
2182 
2183   /* latency_min is never GST_CLOCK_TIME_NONE by construction */
2184   latency = self->priv->peer_latency_min;
2185 
2186   /* add our own */
2187   latency += self->priv->latency;
2188   latency += self->priv->sub_latency_min;
2189 
2190   return latency;
2191 }
2192 
2193 /**
2194  * gst_aggregator_get_latency:
2195  * @self: a #GstAggregator
2196  *
2197  * Retrieves the latency values reported by @self in response to the latency
2198  * query, or %GST_CLOCK_TIME_NONE if there is not live source connected and the element
2199  * will not wait for the clock.
2200  *
2201  * Typically only called by subclasses.
2202  *
2203  * Returns: The latency or %GST_CLOCK_TIME_NONE if the element does not sync
2204  */
2205 GstClockTime
gst_aggregator_get_latency(GstAggregator * self)2206 gst_aggregator_get_latency (GstAggregator * self)
2207 {
2208   GstClockTime ret;
2209 
2210   SRC_LOCK (self);
2211   ret = gst_aggregator_get_latency_unlocked (self);
2212   SRC_UNLOCK (self);
2213 
2214   return ret;
2215 }
2216 
2217 static gboolean
gst_aggregator_send_event(GstElement * element,GstEvent * event)2218 gst_aggregator_send_event (GstElement * element, GstEvent * event)
2219 {
2220   GstAggregator *self = GST_AGGREGATOR (element);
2221 
2222   GST_STATE_LOCK (element);
2223   if (GST_EVENT_TYPE (event) == GST_EVENT_SEEK &&
2224       GST_STATE (element) < GST_STATE_PAUSED) {
2225     gdouble rate;
2226     GstFormat fmt;
2227     GstSeekFlags flags;
2228     GstSeekType start_type, stop_type;
2229     gint64 start, stop;
2230 
2231     gst_event_parse_seek (event, &rate, &fmt, &flags, &start_type,
2232         &start, &stop_type, &stop);
2233 
2234     GST_OBJECT_LOCK (self);
2235     gst_segment_do_seek (&GST_AGGREGATOR_PAD (self->srcpad)->segment, rate, fmt,
2236         flags, start_type, start, stop_type, stop, NULL);
2237     self->priv->next_seqnum = gst_event_get_seqnum (event);
2238     self->priv->first_buffer = FALSE;
2239     GST_OBJECT_UNLOCK (self);
2240 
2241     GST_DEBUG_OBJECT (element, "Storing segment %" GST_PTR_FORMAT, event);
2242   }
2243   GST_STATE_UNLOCK (element);
2244 
2245   return GST_ELEMENT_CLASS (aggregator_parent_class)->send_event (element,
2246       event);
2247 }
2248 
2249 static gboolean
gst_aggregator_default_src_query(GstAggregator * self,GstQuery * query)2250 gst_aggregator_default_src_query (GstAggregator * self, GstQuery * query)
2251 {
2252   gboolean res = TRUE;
2253 
2254   switch (GST_QUERY_TYPE (query)) {
2255     case GST_QUERY_SEEKING:
2256     {
2257       GstFormat format;
2258 
2259       /* don't pass it along as some (file)sink might claim it does
2260        * whereas with a collectpads in between that will not likely work */
2261       gst_query_parse_seeking (query, &format, NULL, NULL, NULL);
2262       gst_query_set_seeking (query, format, FALSE, 0, -1);
2263       res = TRUE;
2264 
2265       break;
2266     }
2267     case GST_QUERY_LATENCY:
2268       SRC_LOCK (self);
2269       res = gst_aggregator_query_latency_unlocked (self, query);
2270       SRC_UNLOCK (self);
2271       break;
2272     default:
2273       return gst_pad_query_default (self->srcpad, GST_OBJECT (self), query);
2274   }
2275 
2276   return res;
2277 }
2278 
2279 static gboolean
gst_aggregator_event_forward_func(GstPad * pad,gpointer user_data)2280 gst_aggregator_event_forward_func (GstPad * pad, gpointer user_data)
2281 {
2282   EventData *evdata = user_data;
2283   gboolean ret = TRUE;
2284   GstPad *peer = gst_pad_get_peer (pad);
2285   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
2286 
2287   if (peer) {
2288     if (evdata->only_to_active_pads && aggpad->priv->first_buffer) {
2289       GST_DEBUG_OBJECT (pad, "not sending event to inactive pad");
2290       ret = TRUE;
2291     } else {
2292       ret = gst_pad_send_event (peer, gst_event_ref (evdata->event));
2293       GST_DEBUG_OBJECT (pad, "return of event push is %d", ret);
2294     }
2295   }
2296 
2297   if (ret == FALSE) {
2298     if (GST_EVENT_TYPE (evdata->event) == GST_EVENT_SEEK) {
2299       GstQuery *seeking = gst_query_new_seeking (GST_FORMAT_TIME);
2300 
2301       GST_DEBUG_OBJECT (pad, "Event %" GST_PTR_FORMAT " failed", evdata->event);
2302 
2303       if (gst_pad_query (peer, seeking)) {
2304         gboolean seekable;
2305 
2306         gst_query_parse_seeking (seeking, NULL, &seekable, NULL, NULL);
2307 
2308         if (seekable == FALSE) {
2309           GST_INFO_OBJECT (pad,
2310               "Source not seekable, We failed but it does not matter!");
2311 
2312           ret = TRUE;
2313         }
2314       } else {
2315         GST_ERROR_OBJECT (pad, "Query seeking FAILED");
2316       }
2317 
2318       gst_query_unref (seeking);
2319     }
2320   } else {
2321     evdata->one_actually_seeked = TRUE;
2322   }
2323 
2324   evdata->result &= ret;
2325 
2326   if (peer)
2327     gst_object_unref (peer);
2328 
2329   /* Always send to all pads */
2330   return FALSE;
2331 }
2332 
2333 static void
gst_aggregator_forward_event_to_all_sinkpads(GstAggregator * self,EventData * evdata)2334 gst_aggregator_forward_event_to_all_sinkpads (GstAggregator * self,
2335     EventData * evdata)
2336 {
2337   evdata->result = TRUE;
2338   evdata->one_actually_seeked = FALSE;
2339 
2340   gst_pad_forward (self->srcpad, gst_aggregator_event_forward_func, evdata);
2341 
2342   gst_event_unref (evdata->event);
2343 }
2344 
2345 static gboolean
gst_aggregator_do_seek(GstAggregator * self,GstEvent * event)2346 gst_aggregator_do_seek (GstAggregator * self, GstEvent * event)
2347 {
2348   gdouble rate;
2349   GstFormat fmt;
2350   GstSeekFlags flags;
2351   GstSeekType start_type, stop_type;
2352   gint64 start, stop;
2353   gboolean flush;
2354   EventData evdata = { 0, };
2355   GstAggregatorPrivate *priv = self->priv;
2356 
2357   gst_event_parse_seek (event, &rate, &fmt, &flags, &start_type,
2358       &start, &stop_type, &stop);
2359 
2360   GST_INFO_OBJECT (self, "starting SEEK");
2361 
2362   flush = flags & GST_SEEK_FLAG_FLUSH;
2363 
2364   GST_OBJECT_LOCK (self);
2365 
2366   if (gst_event_get_seqnum (event) == self->priv->next_seqnum) {
2367     evdata.result = TRUE;
2368     GST_DEBUG_OBJECT (self, "Dropping duplicated seek event with seqnum %d",
2369         self->priv->next_seqnum);
2370     GST_OBJECT_UNLOCK (self);
2371     goto done;
2372   }
2373 
2374   self->priv->next_seqnum = gst_event_get_seqnum (event);
2375 
2376   gst_segment_do_seek (&GST_AGGREGATOR_PAD (self->srcpad)->segment, rate, fmt,
2377       flags, start_type, start, stop_type, stop, NULL);
2378 
2379   /* Seeking sets a position */
2380   self->priv->first_buffer = FALSE;
2381 
2382   if (flush)
2383     priv->flushing = TRUE;
2384 
2385   GST_OBJECT_UNLOCK (self);
2386 
2387   if (flush) {
2388     GstEvent *event = gst_event_new_flush_start ();
2389 
2390     gst_event_set_seqnum (event, self->priv->next_seqnum);
2391     gst_aggregator_stop_srcpad_task (self, event);
2392   }
2393 
2394   /* forward the seek upstream */
2395   evdata.event = event;
2396   evdata.flush = flush;
2397   evdata.only_to_active_pads = FALSE;
2398   gst_aggregator_forward_event_to_all_sinkpads (self, &evdata);
2399   event = NULL;
2400 
2401   if (!evdata.result || !evdata.one_actually_seeked) {
2402     GST_OBJECT_LOCK (self);
2403     priv->flushing = FALSE;
2404     GST_OBJECT_UNLOCK (self);
2405 
2406     /* No flush stop is inbound for us to forward */
2407     if (flush) {
2408       GstEvent *event = gst_event_new_flush_stop (TRUE);
2409 
2410       gst_event_set_seqnum (event, self->priv->next_seqnum);
2411       gst_pad_push_event (self->srcpad, event);
2412     }
2413   }
2414 
2415 done:
2416   GST_INFO_OBJECT (self, "seek done, result: %d", evdata.result);
2417 
2418   return evdata.result;
2419 }
2420 
2421 static gboolean
gst_aggregator_default_src_event(GstAggregator * self,GstEvent * event)2422 gst_aggregator_default_src_event (GstAggregator * self, GstEvent * event)
2423 {
2424   EventData evdata = { 0, };
2425 
2426   switch (GST_EVENT_TYPE (event)) {
2427     case GST_EVENT_SEEK:
2428       /* _do_seek() unrefs the event. */
2429       return gst_aggregator_do_seek (self, event);
2430     case GST_EVENT_NAVIGATION:
2431       /* navigation is rather pointless. */
2432       gst_event_unref (event);
2433       return FALSE;
2434     case GST_EVENT_RECONFIGURE:
2435       /* We will renegotiate with downstream, we don't
2436        * need to forward this further */
2437       gst_event_unref (event);
2438       return TRUE;
2439     default:
2440       break;
2441   }
2442 
2443   /* Don't forward QOS events to pads that had no active buffer yet. Otherwise
2444    * they will receive a QOS event that has earliest_time=0 (because we can't
2445    * have negative timestamps), and consider their buffer as too late */
2446   evdata.event = event;
2447   evdata.flush = FALSE;
2448   evdata.only_to_active_pads = GST_EVENT_TYPE (event) == GST_EVENT_QOS;
2449   gst_aggregator_forward_event_to_all_sinkpads (self, &evdata);
2450   return evdata.result;
2451 }
2452 
2453 static gboolean
gst_aggregator_src_pad_event_func(GstPad * pad,GstObject * parent,GstEvent * event)2454 gst_aggregator_src_pad_event_func (GstPad * pad, GstObject * parent,
2455     GstEvent * event)
2456 {
2457   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
2458 
2459   return klass->src_event (GST_AGGREGATOR (parent), event);
2460 }
2461 
2462 static gboolean
gst_aggregator_src_pad_query_func(GstPad * pad,GstObject * parent,GstQuery * query)2463 gst_aggregator_src_pad_query_func (GstPad * pad, GstObject * parent,
2464     GstQuery * query)
2465 {
2466   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
2467 
2468   return klass->src_query (GST_AGGREGATOR (parent), query);
2469 }
2470 
2471 static gboolean
gst_aggregator_src_pad_activate_mode_func(GstPad * pad,GstObject * parent,GstPadMode mode,gboolean active)2472 gst_aggregator_src_pad_activate_mode_func (GstPad * pad,
2473     GstObject * parent, GstPadMode mode, gboolean active)
2474 {
2475   GstAggregator *self = GST_AGGREGATOR (parent);
2476   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
2477 
2478   if (klass->src_activate) {
2479     if (klass->src_activate (self, mode, active) == FALSE) {
2480       return FALSE;
2481     }
2482   }
2483 
2484   if (active == TRUE) {
2485     switch (mode) {
2486       case GST_PAD_MODE_PUSH:
2487       {
2488         GST_INFO_OBJECT (pad, "Activating pad!");
2489         gst_aggregator_start_srcpad_task (self);
2490         return TRUE;
2491       }
2492       default:
2493       {
2494         GST_ERROR_OBJECT (pad, "Only supported mode is PUSH");
2495         return FALSE;
2496       }
2497     }
2498   }
2499 
2500   /* deactivating */
2501   GST_INFO_OBJECT (self, "Deactivating srcpad");
2502 
2503   gst_aggregator_stop_srcpad_task (self, FALSE);
2504 
2505   return TRUE;
2506 }
2507 
2508 static gboolean
gst_aggregator_default_sink_query(GstAggregator * self,GstAggregatorPad * aggpad,GstQuery * query)2509 gst_aggregator_default_sink_query (GstAggregator * self,
2510     GstAggregatorPad * aggpad, GstQuery * query)
2511 {
2512   GstPad *pad = GST_PAD (aggpad);
2513 
2514   if (GST_QUERY_TYPE (query) == GST_QUERY_ALLOCATION) {
2515     GstQuery *decide_query = NULL;
2516     GstAggregatorClass *agg_class;
2517     gboolean ret;
2518 
2519     GST_OBJECT_LOCK (self);
2520     PAD_LOCK (aggpad);
2521     if (G_UNLIKELY (!aggpad->priv->negotiated)) {
2522       GST_DEBUG_OBJECT (self,
2523           "not negotiated yet, can't answer ALLOCATION query");
2524       PAD_UNLOCK (aggpad);
2525       GST_OBJECT_UNLOCK (self);
2526 
2527       return FALSE;
2528     }
2529 
2530     if ((decide_query = self->priv->allocation_query))
2531       gst_query_ref (decide_query);
2532     PAD_UNLOCK (aggpad);
2533     GST_OBJECT_UNLOCK (self);
2534 
2535     GST_DEBUG_OBJECT (self,
2536         "calling propose allocation with query %" GST_PTR_FORMAT, decide_query);
2537 
2538     agg_class = GST_AGGREGATOR_GET_CLASS (self);
2539 
2540     /* pass the query to the propose_allocation vmethod if any */
2541     if (agg_class->propose_allocation)
2542       ret = agg_class->propose_allocation (self, aggpad, decide_query, query);
2543     else
2544       ret = FALSE;
2545 
2546     if (decide_query)
2547       gst_query_unref (decide_query);
2548 
2549     GST_DEBUG_OBJECT (self, "ALLOCATION ret %d, %" GST_PTR_FORMAT, ret, query);
2550     return ret;
2551   }
2552 
2553   return gst_pad_query_default (pad, GST_OBJECT (self), query);
2554 }
2555 
2556 static gboolean
gst_aggregator_default_sink_query_pre_queue(GstAggregator * self,GstAggregatorPad * aggpad,GstQuery * query)2557 gst_aggregator_default_sink_query_pre_queue (GstAggregator * self,
2558     GstAggregatorPad * aggpad, GstQuery * query)
2559 {
2560   if (GST_QUERY_IS_SERIALIZED (query)) {
2561     GstStructure *s;
2562     gboolean ret = FALSE;
2563 
2564     SRC_LOCK (self);
2565     PAD_LOCK (aggpad);
2566 
2567     if (aggpad->priv->flow_return != GST_FLOW_OK) {
2568       SRC_UNLOCK (self);
2569       goto flushing;
2570     }
2571 
2572     g_queue_push_head (&aggpad->priv->data, query);
2573     SRC_BROADCAST (self);
2574     SRC_UNLOCK (self);
2575 
2576     while (!gst_aggregator_pad_queue_is_empty (aggpad)
2577         && aggpad->priv->flow_return == GST_FLOW_OK) {
2578       GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed");
2579       PAD_WAIT_EVENT (aggpad);
2580     }
2581 
2582     s = gst_query_writable_structure (query);
2583     if (gst_structure_get_boolean (s, "gst-aggregator-retval", &ret))
2584       gst_structure_remove_field (s, "gst-aggregator-retval");
2585     else
2586       g_queue_remove (&aggpad->priv->data, query);
2587 
2588     if (aggpad->priv->flow_return != GST_FLOW_OK)
2589       goto flushing;
2590 
2591     PAD_UNLOCK (aggpad);
2592 
2593     return ret;
2594   } else {
2595     GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self);
2596 
2597     return klass->sink_query (self, aggpad, query);
2598   }
2599 
2600 flushing:
2601   GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping query",
2602       gst_flow_get_name (aggpad->priv->flow_return));
2603   PAD_UNLOCK (aggpad);
2604 
2605   return FALSE;
2606 }
2607 
2608 static void
gst_aggregator_finalize(GObject * object)2609 gst_aggregator_finalize (GObject * object)
2610 {
2611   GstAggregator *self = (GstAggregator *) object;
2612 
2613   g_mutex_clear (&self->priv->src_lock);
2614   g_cond_clear (&self->priv->src_cond);
2615 
2616   G_OBJECT_CLASS (aggregator_parent_class)->finalize (object);
2617 }
2618 
2619 /*
2620  * gst_aggregator_set_latency_property:
2621  * @agg: a #GstAggregator
2622  * @latency: the new latency value (in nanoseconds).
2623  *
2624  * Sets the new latency value to @latency. This value is used to limit the
2625  * amount of time a pad waits for data to appear before considering the pad
2626  * as unresponsive.
2627  */
2628 static void
gst_aggregator_set_latency_property(GstAggregator * self,GstClockTime latency)2629 gst_aggregator_set_latency_property (GstAggregator * self, GstClockTime latency)
2630 {
2631   gboolean changed;
2632 
2633   g_return_if_fail (GST_IS_AGGREGATOR (self));
2634   g_return_if_fail (GST_CLOCK_TIME_IS_VALID (latency));
2635 
2636   SRC_LOCK (self);
2637   changed = (self->priv->latency != latency);
2638 
2639   if (changed) {
2640     GList *item;
2641 
2642     GST_OBJECT_LOCK (self);
2643     /* First lock all the pads */
2644     for (item = GST_ELEMENT_CAST (self)->sinkpads; item; item = item->next) {
2645       GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (item->data);
2646       PAD_LOCK (aggpad);
2647     }
2648 
2649     self->priv->latency = latency;
2650 
2651     SRC_BROADCAST (self);
2652 
2653     /* Now wake up the pads */
2654     for (item = GST_ELEMENT_CAST (self)->sinkpads; item; item = item->next) {
2655       GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (item->data);
2656       PAD_BROADCAST_EVENT (aggpad);
2657       PAD_UNLOCK (aggpad);
2658     }
2659     GST_OBJECT_UNLOCK (self);
2660   }
2661 
2662   SRC_UNLOCK (self);
2663 
2664   if (changed)
2665     gst_element_post_message (GST_ELEMENT_CAST (self),
2666         gst_message_new_latency (GST_OBJECT_CAST (self)));
2667 }
2668 
2669 /*
2670  * gst_aggregator_get_latency_property:
2671  * @agg: a #GstAggregator
2672  *
2673  * Gets the latency value. See gst_aggregator_set_latency for
2674  * more details.
2675  *
2676  * Returns: The time in nanoseconds to wait for data to arrive on a sink pad
2677  * before a pad is deemed unresponsive. A value of -1 means an
2678  * unlimited time.
2679  */
2680 static GstClockTime
gst_aggregator_get_latency_property(GstAggregator * agg)2681 gst_aggregator_get_latency_property (GstAggregator * agg)
2682 {
2683   GstClockTime res;
2684 
2685   g_return_val_if_fail (GST_IS_AGGREGATOR (agg), GST_CLOCK_TIME_NONE);
2686 
2687   GST_OBJECT_LOCK (agg);
2688   res = agg->priv->latency;
2689   GST_OBJECT_UNLOCK (agg);
2690 
2691   return res;
2692 }
2693 
2694 static void
gst_aggregator_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)2695 gst_aggregator_set_property (GObject * object, guint prop_id,
2696     const GValue * value, GParamSpec * pspec)
2697 {
2698   GstAggregator *agg = GST_AGGREGATOR (object);
2699 
2700   switch (prop_id) {
2701     case PROP_LATENCY:
2702       gst_aggregator_set_latency_property (agg, g_value_get_uint64 (value));
2703       break;
2704     case PROP_MIN_UPSTREAM_LATENCY:
2705       SRC_LOCK (agg);
2706       agg->priv->upstream_latency_min = g_value_get_uint64 (value);
2707       SRC_UNLOCK (agg);
2708       break;
2709     case PROP_START_TIME_SELECTION:
2710       agg->priv->start_time_selection = g_value_get_enum (value);
2711       break;
2712     case PROP_START_TIME:
2713       agg->priv->start_time = g_value_get_uint64 (value);
2714       break;
2715     case PROP_EMIT_SIGNALS:
2716       agg->priv->emit_signals = g_value_get_boolean (value);
2717       break;
2718     default:
2719       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2720       break;
2721   }
2722 }
2723 
2724 static void
gst_aggregator_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)2725 gst_aggregator_get_property (GObject * object, guint prop_id,
2726     GValue * value, GParamSpec * pspec)
2727 {
2728   GstAggregator *agg = GST_AGGREGATOR (object);
2729 
2730   switch (prop_id) {
2731     case PROP_LATENCY:
2732       g_value_set_uint64 (value, gst_aggregator_get_latency_property (agg));
2733       break;
2734     case PROP_MIN_UPSTREAM_LATENCY:
2735       SRC_LOCK (agg);
2736       g_value_set_uint64 (value, agg->priv->upstream_latency_min);
2737       SRC_UNLOCK (agg);
2738       break;
2739     case PROP_START_TIME_SELECTION:
2740       g_value_set_enum (value, agg->priv->start_time_selection);
2741       break;
2742     case PROP_START_TIME:
2743       g_value_set_uint64 (value, agg->priv->start_time);
2744       break;
2745     case PROP_EMIT_SIGNALS:
2746       g_value_set_boolean (value, agg->priv->emit_signals);
2747       break;
2748     default:
2749       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2750       break;
2751   }
2752 }
2753 
2754 /* GObject vmethods implementations */
2755 static void
gst_aggregator_class_init(GstAggregatorClass * klass)2756 gst_aggregator_class_init (GstAggregatorClass * klass)
2757 {
2758   GObjectClass *gobject_class = (GObjectClass *) klass;
2759   GstElementClass *gstelement_class = (GstElementClass *) klass;
2760 
2761   aggregator_parent_class = g_type_class_peek_parent (klass);
2762 
2763   GST_DEBUG_CATEGORY_INIT (aggregator_debug, "aggregator",
2764       GST_DEBUG_FG_MAGENTA, "GstAggregator");
2765 
2766   if (aggregator_private_offset != 0)
2767     g_type_class_adjust_private_offset (klass, &aggregator_private_offset);
2768 
2769   klass->finish_buffer = gst_aggregator_default_finish_buffer;
2770   klass->finish_buffer_list = gst_aggregator_default_finish_buffer_list;
2771 
2772   klass->sink_event = gst_aggregator_default_sink_event;
2773   klass->sink_query = gst_aggregator_default_sink_query;
2774 
2775   klass->src_event = gst_aggregator_default_src_event;
2776   klass->src_query = gst_aggregator_default_src_query;
2777 
2778   klass->create_new_pad = gst_aggregator_default_create_new_pad;
2779   klass->update_src_caps = gst_aggregator_default_update_src_caps;
2780   klass->fixate_src_caps = gst_aggregator_default_fixate_src_caps;
2781   klass->negotiated_src_caps = gst_aggregator_default_negotiated_src_caps;
2782 
2783   klass->negotiate = gst_aggregator_default_negotiate;
2784 
2785   klass->sink_event_pre_queue = gst_aggregator_default_sink_event_pre_queue;
2786   klass->sink_query_pre_queue = gst_aggregator_default_sink_query_pre_queue;
2787 
2788   gstelement_class->request_new_pad =
2789       GST_DEBUG_FUNCPTR (gst_aggregator_request_new_pad);
2790   gstelement_class->send_event = GST_DEBUG_FUNCPTR (gst_aggregator_send_event);
2791   gstelement_class->release_pad =
2792       GST_DEBUG_FUNCPTR (gst_aggregator_release_pad);
2793   gstelement_class->change_state =
2794       GST_DEBUG_FUNCPTR (gst_aggregator_change_state);
2795 
2796   gobject_class->set_property = gst_aggregator_set_property;
2797   gobject_class->get_property = gst_aggregator_get_property;
2798   gobject_class->finalize = gst_aggregator_finalize;
2799 
2800   g_object_class_install_property (gobject_class, PROP_LATENCY,
2801       g_param_spec_uint64 ("latency", "Buffer latency",
2802           "Additional latency in live mode to allow upstream "
2803           "to take longer to produce buffers for the current "
2804           "position (in nanoseconds)", 0, G_MAXUINT64,
2805           DEFAULT_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2806 
2807   /**
2808    * GstAggregator:min-upstream-latency:
2809    *
2810    * Force minimum upstream latency (in nanoseconds). When sources with a
2811    * higher latency are expected to be plugged in dynamically after the
2812    * aggregator has started playing, this allows overriding the minimum
2813    * latency reported by the initial source(s). This is only taken into
2814    * account when larger than the actually reported minimum latency.
2815    *
2816    * Since: 1.16
2817    */
2818   g_object_class_install_property (gobject_class, PROP_MIN_UPSTREAM_LATENCY,
2819       g_param_spec_uint64 ("min-upstream-latency", "Buffer latency",
2820           "When sources with a higher latency are expected to be plugged "
2821           "in dynamically after the aggregator has started playing, "
2822           "this allows overriding the minimum latency reported by the "
2823           "initial source(s). This is only taken into account when larger "
2824           "than the actually reported minimum latency. (nanoseconds)",
2825           0, G_MAXUINT64,
2826           DEFAULT_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2827 
2828   g_object_class_install_property (gobject_class, PROP_START_TIME_SELECTION,
2829       g_param_spec_enum ("start-time-selection", "Start Time Selection",
2830           "Decides which start time is output",
2831           gst_aggregator_start_time_selection_get_type (),
2832           DEFAULT_START_TIME_SELECTION,
2833           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2834 
2835   g_object_class_install_property (gobject_class, PROP_START_TIME,
2836       g_param_spec_uint64 ("start-time", "Start Time",
2837           "Start time to use if start-time-selection=set", 0,
2838           G_MAXUINT64,
2839           DEFAULT_START_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2840 
2841   /**
2842    * GstAggregator:emit-signals:
2843    *
2844    * Enables the emission of signals such as #GstAggregator::samples-selected
2845    *
2846    * Since: 1.18
2847    */
2848   g_object_class_install_property (gobject_class, PROP_EMIT_SIGNALS,
2849       g_param_spec_boolean ("emit-signals", "Emit signals",
2850           "Send signals", DEFAULT_EMIT_SIGNALS,
2851           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2852 
2853   /**
2854    * GstAggregator::samples-selected:
2855    * @aggregator: The #GstAggregator that emitted the signal
2856    * @segment: The #GstSegment the next output buffer is part of
2857    * @pts: The presentation timestamp of the next output buffer
2858    * @dts: The decoding timestamp of the next output buffer
2859    * @duration: The duration of the next output buffer
2860    * @info: (nullable): a #GstStructure containing additional information
2861    *
2862    * Signals that the #GstAggregator subclass has selected the next set
2863    * of input samples it will aggregate. Handlers may call
2864    * gst_aggregator_peek_next_sample() at that point.
2865    *
2866    * Since: 1.18
2867    */
2868   gst_aggregator_signals[SIGNAL_SAMPLES_SELECTED] =
2869       g_signal_new ("samples-selected", G_TYPE_FROM_CLASS (klass),
2870       G_SIGNAL_RUN_FIRST, 0, NULL, NULL, NULL, G_TYPE_NONE, 5,
2871       GST_TYPE_SEGMENT | G_SIGNAL_TYPE_STATIC_SCOPE, GST_TYPE_CLOCK_TIME,
2872       GST_TYPE_CLOCK_TIME, GST_TYPE_CLOCK_TIME,
2873       GST_TYPE_STRUCTURE | G_SIGNAL_TYPE_STATIC_SCOPE);
2874 }
2875 
2876 static inline gpointer
gst_aggregator_get_instance_private(GstAggregator * self)2877 gst_aggregator_get_instance_private (GstAggregator * self)
2878 {
2879   return (G_STRUCT_MEMBER_P (self, aggregator_private_offset));
2880 }
2881 
2882 static void
gst_aggregator_init(GstAggregator * self,GstAggregatorClass * klass)2883 gst_aggregator_init (GstAggregator * self, GstAggregatorClass * klass)
2884 {
2885   GstPadTemplate *pad_template;
2886   GstAggregatorPrivate *priv;
2887   GType pad_type;
2888 
2889   g_return_if_fail (klass->aggregate != NULL);
2890 
2891   self->priv = gst_aggregator_get_instance_private (self);
2892 
2893   priv = self->priv;
2894 
2895   pad_template =
2896       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (klass), "src");
2897   g_return_if_fail (pad_template != NULL);
2898 
2899   priv->max_padserial = -1;
2900   priv->tags_changed = FALSE;
2901   priv->ignore_inactive_pads = FALSE;
2902 
2903   self->priv->peer_latency_live = FALSE;
2904   self->priv->peer_latency_min = self->priv->sub_latency_min = 0;
2905   self->priv->peer_latency_max = self->priv->sub_latency_max = 0;
2906   self->priv->has_peer_latency = FALSE;
2907 
2908   pad_type =
2909       GST_PAD_TEMPLATE_GTYPE (pad_template) ==
2910       G_TYPE_NONE ? GST_TYPE_AGGREGATOR_PAD :
2911       GST_PAD_TEMPLATE_GTYPE (pad_template);
2912   g_assert (g_type_is_a (pad_type, GST_TYPE_AGGREGATOR_PAD));
2913   self->srcpad =
2914       g_object_new (pad_type, "name", "src", "direction", GST_PAD_SRC,
2915       "template", pad_template, NULL);
2916 
2917   gst_aggregator_reset_flow_values (self);
2918 
2919   gst_pad_set_event_function (self->srcpad,
2920       GST_DEBUG_FUNCPTR (gst_aggregator_src_pad_event_func));
2921   gst_pad_set_query_function (self->srcpad,
2922       GST_DEBUG_FUNCPTR (gst_aggregator_src_pad_query_func));
2923   gst_pad_set_activatemode_function (self->srcpad,
2924       GST_DEBUG_FUNCPTR (gst_aggregator_src_pad_activate_mode_func));
2925 
2926   gst_element_add_pad (GST_ELEMENT (self), self->srcpad);
2927 
2928   self->priv->upstream_latency_min = DEFAULT_MIN_UPSTREAM_LATENCY;
2929   self->priv->latency = DEFAULT_LATENCY;
2930   self->priv->start_time_selection = DEFAULT_START_TIME_SELECTION;
2931   self->priv->start_time = DEFAULT_START_TIME;
2932 
2933   g_mutex_init (&self->priv->src_lock);
2934   g_cond_init (&self->priv->src_cond);
2935 }
2936 
2937 /* we can't use G_DEFINE_ABSTRACT_TYPE because we need the klass in the _init
2938  * method to get to the padtemplates */
2939 GType
gst_aggregator_get_type(void)2940 gst_aggregator_get_type (void)
2941 {
2942   static gsize type = 0;
2943 
2944   if (g_once_init_enter (&type)) {
2945     GType _type;
2946     static const GTypeInfo info = {
2947       sizeof (GstAggregatorClass),
2948       NULL,
2949       NULL,
2950       (GClassInitFunc) gst_aggregator_class_init,
2951       NULL,
2952       NULL,
2953       sizeof (GstAggregator),
2954       0,
2955       (GInstanceInitFunc) gst_aggregator_init,
2956     };
2957 
2958     _type = g_type_register_static (GST_TYPE_ELEMENT,
2959         "GstAggregator", &info, G_TYPE_FLAG_ABSTRACT);
2960 
2961     aggregator_private_offset =
2962         g_type_add_instance_private (_type, sizeof (GstAggregatorPrivate));
2963 
2964     g_once_init_leave (&type, _type);
2965   }
2966   return type;
2967 }
2968 
2969 /* Must be called with SRC lock and PAD lock held */
2970 static gboolean
gst_aggregator_pad_has_space(GstAggregator * self,GstAggregatorPad * aggpad)2971 gst_aggregator_pad_has_space (GstAggregator * self, GstAggregatorPad * aggpad)
2972 {
2973   guint64 max_time_level;
2974 
2975   /* Empty queue always has space */
2976   if (aggpad->priv->num_buffers == 0 && aggpad->priv->clipped_buffer == NULL)
2977     return TRUE;
2978 
2979   /* We also want at least two buffers, one is being processed and one is ready
2980    * for the next iteration when we operate in live mode. */
2981   if (self->priv->peer_latency_live && aggpad->priv->num_buffers < 2)
2982     return TRUE;
2983 
2984   /* zero latency, if there is a buffer, it's full */
2985   if (self->priv->latency == 0)
2986     return FALSE;
2987 
2988   /* On top of our latency, we also want to allow buffering up to the
2989    * minimum upstream latency to allow queue free sources with lower then
2990    * upstream latency. */
2991   max_time_level = self->priv->latency + self->priv->upstream_latency_min;
2992 
2993   /* Allow no more buffers than the latency */
2994   return (aggpad->priv->time_level <= max_time_level);
2995 }
2996 
2997 /* Must be called with the PAD_LOCK held */
2998 static void
apply_buffer(GstAggregatorPad * aggpad,GstBuffer * buffer,gboolean head)2999 apply_buffer (GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head)
3000 {
3001   GstClockTime timestamp;
3002 
3003   if (GST_BUFFER_DTS_IS_VALID (buffer))
3004     timestamp = GST_BUFFER_DTS (buffer);
3005   else
3006     timestamp = GST_BUFFER_PTS (buffer);
3007 
3008   if (timestamp == GST_CLOCK_TIME_NONE) {
3009     if (head)
3010       timestamp = aggpad->priv->head_position;
3011     else
3012       timestamp = aggpad->priv->tail_position;
3013   }
3014 
3015   /* add duration */
3016   if (GST_BUFFER_DURATION_IS_VALID (buffer))
3017     timestamp += GST_BUFFER_DURATION (buffer);
3018 
3019   if (head)
3020     aggpad->priv->head_position = timestamp;
3021   else
3022     aggpad->priv->tail_position = timestamp;
3023 
3024   update_time_level (aggpad, head);
3025 }
3026 
3027 /*
3028  * Can be called either from the sinkpad's chain function or from the srcpad's
3029  * thread in the case of a buffer synthetized from a GAP event.
3030  * Because of this second case, FLUSH_LOCK can't be used here.
3031  */
3032 
3033 static GstFlowReturn
gst_aggregator_pad_chain_internal(GstAggregator * self,GstAggregatorPad * aggpad,GstBuffer * buffer,gboolean head)3034 gst_aggregator_pad_chain_internal (GstAggregator * self,
3035     GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head)
3036 {
3037   GstFlowReturn flow_return;
3038   GstClockTime buf_pts;
3039 
3040   GST_TRACE_OBJECT (aggpad, "entering chain internal");
3041 
3042   PAD_LOCK (aggpad);
3043   flow_return = aggpad->priv->flow_return;
3044   if (flow_return != GST_FLOW_OK)
3045     goto flushing;
3046 
3047   PAD_UNLOCK (aggpad);
3048 
3049   buf_pts = GST_BUFFER_PTS (buffer);
3050 
3051   for (;;) {
3052     SRC_LOCK (self);
3053     GST_OBJECT_LOCK (self);
3054     PAD_LOCK (aggpad);
3055 
3056     if (aggpad->priv->first_buffer) {
3057       self->priv->has_peer_latency = FALSE;
3058       aggpad->priv->first_buffer = FALSE;
3059     }
3060 
3061     if ((gst_aggregator_pad_has_space (self, aggpad) || !head)
3062         && aggpad->priv->flow_return == GST_FLOW_OK) {
3063       if (head)
3064         g_queue_push_head (&aggpad->priv->data, buffer);
3065       else
3066         g_queue_push_tail (&aggpad->priv->data, buffer);
3067       apply_buffer (aggpad, buffer, head);
3068       aggpad->priv->num_buffers++;
3069       buffer = NULL;
3070       SRC_BROADCAST (self);
3071       break;
3072     }
3073 
3074     flow_return = aggpad->priv->flow_return;
3075     if (flow_return != GST_FLOW_OK) {
3076       GST_OBJECT_UNLOCK (self);
3077       SRC_UNLOCK (self);
3078       goto flushing;
3079     }
3080     GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed (chain)");
3081     GST_OBJECT_UNLOCK (self);
3082     SRC_UNLOCK (self);
3083     PAD_WAIT_EVENT (aggpad);
3084 
3085     PAD_UNLOCK (aggpad);
3086   }
3087 
3088   if (self->priv->first_buffer) {
3089     GstClockTime start_time;
3090     GstAggregatorPad *srcpad = GST_AGGREGATOR_PAD (self->srcpad);
3091 
3092     switch (self->priv->start_time_selection) {
3093       case GST_AGGREGATOR_START_TIME_SELECTION_ZERO:
3094       default:
3095         start_time = 0;
3096         break;
3097       case GST_AGGREGATOR_START_TIME_SELECTION_FIRST:
3098         GST_OBJECT_LOCK (aggpad);
3099         if (aggpad->priv->head_segment.format == GST_FORMAT_TIME) {
3100           start_time = buf_pts;
3101           if (start_time != -1) {
3102             start_time = MAX (start_time, aggpad->priv->head_segment.start);
3103             start_time =
3104                 gst_segment_to_running_time (&aggpad->priv->head_segment,
3105                 GST_FORMAT_TIME, start_time);
3106           }
3107         } else {
3108           start_time = 0;
3109           GST_WARNING_OBJECT (aggpad,
3110               "Ignoring request of selecting the first start time "
3111               "as the segment is a %s segment instead of a time segment",
3112               gst_format_get_name (aggpad->segment.format));
3113         }
3114         GST_OBJECT_UNLOCK (aggpad);
3115         break;
3116       case GST_AGGREGATOR_START_TIME_SELECTION_SET:
3117         start_time = self->priv->start_time;
3118         if (start_time == -1)
3119           start_time = 0;
3120         break;
3121     }
3122 
3123     if (start_time != -1) {
3124       if (srcpad->segment.position == -1)
3125         srcpad->segment.position = start_time;
3126       else
3127         srcpad->segment.position = MIN (start_time, srcpad->segment.position);
3128 
3129       GST_DEBUG_OBJECT (self, "Selecting start time %" GST_TIME_FORMAT,
3130           GST_TIME_ARGS (start_time));
3131     }
3132   }
3133 
3134   PAD_UNLOCK (aggpad);
3135   GST_OBJECT_UNLOCK (self);
3136   SRC_UNLOCK (self);
3137 
3138   GST_TRACE_OBJECT (aggpad, "Done chaining");
3139 
3140   return flow_return;
3141 
3142 flushing:
3143   PAD_UNLOCK (aggpad);
3144 
3145   GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping buffer",
3146       gst_flow_get_name (flow_return));
3147   if (buffer)
3148     gst_buffer_unref (buffer);
3149 
3150   return flow_return;
3151 }
3152 
3153 static GstFlowReturn
gst_aggregator_pad_chain(GstPad * pad,GstObject * object,GstBuffer * buffer)3154 gst_aggregator_pad_chain (GstPad * pad, GstObject * object, GstBuffer * buffer)
3155 {
3156   GstFlowReturn ret;
3157   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
3158 
3159   GST_TRACE_OBJECT (aggpad, "entering chain");
3160 
3161   PAD_FLUSH_LOCK (aggpad);
3162 
3163   ret = gst_aggregator_pad_chain_internal (GST_AGGREGATOR_CAST (object),
3164       aggpad, buffer, TRUE);
3165 
3166   PAD_FLUSH_UNLOCK (aggpad);
3167 
3168   return ret;
3169 }
3170 
3171 static gboolean
gst_aggregator_pad_query_func(GstPad * pad,GstObject * parent,GstQuery * query)3172 gst_aggregator_pad_query_func (GstPad * pad, GstObject * parent,
3173     GstQuery * query)
3174 {
3175   GstAggregator *self = GST_AGGREGATOR (parent);
3176   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self);
3177   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
3178 
3179   g_assert (klass->sink_query_pre_queue);
3180   return klass->sink_query_pre_queue (self, aggpad, query);
3181 }
3182 
3183 static GstFlowReturn
gst_aggregator_pad_event_func(GstPad * pad,GstObject * parent,GstEvent * event)3184 gst_aggregator_pad_event_func (GstPad * pad, GstObject * parent,
3185     GstEvent * event)
3186 {
3187   GstAggregator *self = GST_AGGREGATOR (parent);
3188   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self);
3189   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
3190 
3191   g_assert (klass->sink_event_pre_queue);
3192   return klass->sink_event_pre_queue (self, aggpad, event);
3193 }
3194 
3195 static gboolean
gst_aggregator_pad_activate_mode_func(GstPad * pad,GstObject * parent,GstPadMode mode,gboolean active)3196 gst_aggregator_pad_activate_mode_func (GstPad * pad,
3197     GstObject * parent, GstPadMode mode, gboolean active)
3198 {
3199   GstAggregator *self = GST_AGGREGATOR (parent);
3200   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
3201 
3202   if (active == FALSE) {
3203     SRC_LOCK (self);
3204     gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING, TRUE);
3205     SRC_BROADCAST (self);
3206     SRC_UNLOCK (self);
3207   } else {
3208     PAD_LOCK (aggpad);
3209     aggpad->priv->flow_return = GST_FLOW_OK;
3210     PAD_BROADCAST_EVENT (aggpad);
3211     PAD_UNLOCK (aggpad);
3212   }
3213 
3214   return TRUE;
3215 }
3216 
3217 /***********************************
3218  * GstAggregatorPad implementation  *
3219  ************************************/
3220 G_DEFINE_TYPE_WITH_PRIVATE (GstAggregatorPad, gst_aggregator_pad, GST_TYPE_PAD);
3221 
3222 #define DEFAULT_PAD_EMIT_SIGNALS FALSE
3223 
3224 enum
3225 {
3226   PAD_PROP_0,
3227   PAD_PROP_EMIT_SIGNALS,
3228 };
3229 
3230 enum
3231 {
3232   PAD_SIGNAL_BUFFER_CONSUMED,
3233   PAD_LAST_SIGNAL,
3234 };
3235 
3236 static guint gst_aggregator_pad_signals[PAD_LAST_SIGNAL] = { 0 };
3237 
3238 static void
gst_aggregator_pad_constructed(GObject * object)3239 gst_aggregator_pad_constructed (GObject * object)
3240 {
3241   GstPad *pad = GST_PAD (object);
3242 
3243   if (GST_PAD_IS_SINK (pad)) {
3244     gst_pad_set_chain_function (pad,
3245         GST_DEBUG_FUNCPTR (gst_aggregator_pad_chain));
3246     gst_pad_set_event_full_function_full (pad,
3247         GST_DEBUG_FUNCPTR (gst_aggregator_pad_event_func), NULL, NULL);
3248     gst_pad_set_query_function (pad,
3249         GST_DEBUG_FUNCPTR (gst_aggregator_pad_query_func));
3250     gst_pad_set_activatemode_function (pad,
3251         GST_DEBUG_FUNCPTR (gst_aggregator_pad_activate_mode_func));
3252   }
3253 }
3254 
3255 static void
gst_aggregator_pad_finalize(GObject * object)3256 gst_aggregator_pad_finalize (GObject * object)
3257 {
3258   GstAggregatorPad *pad = (GstAggregatorPad *) object;
3259 
3260   gst_buffer_replace (&pad->priv->peeked_buffer, NULL);
3261   g_cond_clear (&pad->priv->event_cond);
3262   g_mutex_clear (&pad->priv->flush_lock);
3263   g_mutex_clear (&pad->priv->lock);
3264 
3265   G_OBJECT_CLASS (gst_aggregator_pad_parent_class)->finalize (object);
3266 }
3267 
3268 static void
gst_aggregator_pad_dispose(GObject * object)3269 gst_aggregator_pad_dispose (GObject * object)
3270 {
3271   GstAggregatorPad *pad = (GstAggregatorPad *) object;
3272 
3273   gst_aggregator_pad_set_flushing (pad, GST_FLOW_FLUSHING, TRUE);
3274 
3275   G_OBJECT_CLASS (gst_aggregator_pad_parent_class)->dispose (object);
3276 }
3277 
3278 static void
gst_aggregator_pad_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)3279 gst_aggregator_pad_set_property (GObject * object, guint prop_id,
3280     const GValue * value, GParamSpec * pspec)
3281 {
3282   GstAggregatorPad *pad = GST_AGGREGATOR_PAD (object);
3283 
3284   switch (prop_id) {
3285     case PAD_PROP_EMIT_SIGNALS:
3286       pad->priv->emit_signals = g_value_get_boolean (value);
3287       break;
3288     default:
3289       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
3290       break;
3291   }
3292 }
3293 
3294 static void
gst_aggregator_pad_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)3295 gst_aggregator_pad_get_property (GObject * object, guint prop_id,
3296     GValue * value, GParamSpec * pspec)
3297 {
3298   GstAggregatorPad *pad = GST_AGGREGATOR_PAD (object);
3299 
3300   switch (prop_id) {
3301     case PAD_PROP_EMIT_SIGNALS:
3302       g_value_set_boolean (value, pad->priv->emit_signals);
3303       break;
3304     default:
3305       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
3306       break;
3307   }
3308 }
3309 
3310 static void
gst_aggregator_pad_class_init(GstAggregatorPadClass * klass)3311 gst_aggregator_pad_class_init (GstAggregatorPadClass * klass)
3312 {
3313   GObjectClass *gobject_class = (GObjectClass *) klass;
3314 
3315   gobject_class->constructed = gst_aggregator_pad_constructed;
3316   gobject_class->finalize = gst_aggregator_pad_finalize;
3317   gobject_class->dispose = gst_aggregator_pad_dispose;
3318   gobject_class->set_property = gst_aggregator_pad_set_property;
3319   gobject_class->get_property = gst_aggregator_pad_get_property;
3320 
3321   /**
3322    * GstAggregatorPad:buffer-consumed:
3323    * @aggregator: The #GstAggregator that emitted the signal
3324    * @buffer: The buffer that was consumed
3325    *
3326    * Signals that a buffer was consumed. As aggregator pads store buffers
3327    * in an internal queue, there is no direct match between input and output
3328    * buffers at any given time. This signal can be useful to forward metas
3329    * such as #GstVideoTimeCodeMeta or #GstVideoCaptionMeta at the right time.
3330    *
3331    * Since: 1.16
3332    */
3333   gst_aggregator_pad_signals[PAD_SIGNAL_BUFFER_CONSUMED] =
3334       g_signal_new ("buffer-consumed", G_TYPE_FROM_CLASS (klass),
3335       G_SIGNAL_RUN_FIRST, 0, NULL, NULL, NULL, G_TYPE_NONE, 1, GST_TYPE_BUFFER);
3336 
3337   /**
3338    * GstAggregatorPad:emit-signals:
3339    *
3340    * Enables the emission of signals such as #GstAggregatorPad::buffer-consumed
3341    *
3342    * Since: 1.16
3343    */
3344   g_object_class_install_property (gobject_class, PAD_PROP_EMIT_SIGNALS,
3345       g_param_spec_boolean ("emit-signals", "Emit signals",
3346           "Send signals to signal data consumption", DEFAULT_PAD_EMIT_SIGNALS,
3347           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
3348 }
3349 
3350 static void
gst_aggregator_pad_init(GstAggregatorPad * pad)3351 gst_aggregator_pad_init (GstAggregatorPad * pad)
3352 {
3353   pad->priv = gst_aggregator_pad_get_instance_private (pad);
3354 
3355   g_queue_init (&pad->priv->data);
3356   g_cond_init (&pad->priv->event_cond);
3357 
3358   g_mutex_init (&pad->priv->flush_lock);
3359   g_mutex_init (&pad->priv->lock);
3360 
3361   gst_aggregator_pad_reset_unlocked (pad);
3362   pad->priv->negotiated = FALSE;
3363   pad->priv->emit_signals = DEFAULT_PAD_EMIT_SIGNALS;
3364 }
3365 
3366 /* Must be called with the PAD_LOCK held */
3367 static void
gst_aggregator_pad_buffer_consumed(GstAggregatorPad * pad,GstBuffer * buffer,gboolean dequeued)3368 gst_aggregator_pad_buffer_consumed (GstAggregatorPad * pad, GstBuffer * buffer,
3369     gboolean dequeued)
3370 {
3371   if (dequeued)
3372     pad->priv->num_buffers--;
3373 
3374   if (buffer && pad->priv->emit_signals) {
3375     g_signal_emit (pad, gst_aggregator_pad_signals[PAD_SIGNAL_BUFFER_CONSUMED],
3376         0, buffer);
3377   }
3378   PAD_BROADCAST_EVENT (pad);
3379 }
3380 
3381 /* Must be called with the PAD_LOCK held */
3382 static void
gst_aggregator_pad_clip_buffer_unlocked(GstAggregatorPad * pad)3383 gst_aggregator_pad_clip_buffer_unlocked (GstAggregatorPad * pad)
3384 {
3385   GstAggregator *self = NULL;
3386   GstAggregatorClass *aggclass = NULL;
3387   GstBuffer *buffer = NULL;
3388 
3389   while (pad->priv->clipped_buffer == NULL &&
3390       GST_IS_BUFFER (g_queue_peek_tail (&pad->priv->data))) {
3391     buffer = g_queue_pop_tail (&pad->priv->data);
3392 
3393     apply_buffer (pad, buffer, FALSE);
3394 
3395     /* We only take the parent here so that it's not taken if the buffer is
3396      * already clipped or if the queue is empty.
3397      */
3398     if (self == NULL) {
3399       self = GST_AGGREGATOR (gst_pad_get_parent_element (GST_PAD (pad)));
3400       if (self == NULL) {
3401         gst_buffer_unref (buffer);
3402         return;
3403       }
3404 
3405       aggclass = GST_AGGREGATOR_GET_CLASS (self);
3406     }
3407 
3408     if (aggclass->clip) {
3409       GST_TRACE_OBJECT (pad, "Clipping: %" GST_PTR_FORMAT, buffer);
3410 
3411       buffer = aggclass->clip (self, pad, buffer);
3412 
3413       if (buffer == NULL) {
3414         gst_aggregator_pad_buffer_consumed (pad, buffer, TRUE);
3415         GST_TRACE_OBJECT (pad, "Clipping consumed the buffer");
3416       }
3417     }
3418 
3419     pad->priv->clipped_buffer = buffer;
3420   }
3421 
3422   if (self)
3423     gst_object_unref (self);
3424 }
3425 
3426 /**
3427  * gst_aggregator_pad_pop_buffer:
3428  * @pad: the pad to get buffer from
3429  *
3430  * Steal the ref to the buffer currently queued in @pad.
3431  *
3432  * Returns: (nullable) (transfer full): The buffer in @pad or NULL if no buffer was
3433  *   queued. You should unref the buffer after usage.
3434  */
3435 GstBuffer *
gst_aggregator_pad_pop_buffer(GstAggregatorPad * pad)3436 gst_aggregator_pad_pop_buffer (GstAggregatorPad * pad)
3437 {
3438   GstBuffer *buffer = NULL;
3439 
3440   PAD_LOCK (pad);
3441 
3442   /* If the subclass has already peeked a buffer, we guarantee
3443    * that it receives the same buffer, no matter if the pad has
3444    * errored out / been flushed in the meantime.
3445    */
3446   if (pad->priv->peeked_buffer) {
3447     buffer = pad->priv->peeked_buffer;
3448     goto done;
3449   }
3450 
3451   if (pad->priv->flow_return != GST_FLOW_OK)
3452     goto done;
3453 
3454   gst_aggregator_pad_clip_buffer_unlocked (pad);
3455   buffer = pad->priv->clipped_buffer;
3456 
3457 done:
3458   if (buffer) {
3459     if (pad->priv->clipped_buffer != NULL) {
3460       /* Here we still hold a reference to both the clipped buffer
3461        * and possibly the peeked buffer, we transfer the first and
3462        * potentially release the second
3463        */
3464       gst_aggregator_pad_buffer_consumed (pad, buffer, TRUE);
3465       pad->priv->clipped_buffer = NULL;
3466       gst_buffer_replace (&pad->priv->peeked_buffer, NULL);
3467     } else {
3468       /* Here our clipped buffer has already been released, for
3469        * example because of a flush. We thus transfer the reference
3470        * to the peeked buffer to the caller, and we don't decrement
3471        * pad.num_buffers as it has already been done elsewhere
3472        */
3473       gst_aggregator_pad_buffer_consumed (pad, buffer, FALSE);
3474       pad->priv->peeked_buffer = NULL;
3475     }
3476     GST_DEBUG_OBJECT (pad, "Consumed: %" GST_PTR_FORMAT, buffer);
3477   }
3478 
3479   PAD_UNLOCK (pad);
3480 
3481   return buffer;
3482 }
3483 
3484 /**
3485  * gst_aggregator_pad_drop_buffer:
3486  * @pad: the pad where to drop any pending buffer
3487  *
3488  * Drop the buffer currently queued in @pad.
3489  *
3490  * Returns: TRUE if there was a buffer queued in @pad, or FALSE if not.
3491  */
3492 gboolean
gst_aggregator_pad_drop_buffer(GstAggregatorPad * pad)3493 gst_aggregator_pad_drop_buffer (GstAggregatorPad * pad)
3494 {
3495   GstBuffer *buf;
3496 
3497   buf = gst_aggregator_pad_pop_buffer (pad);
3498 
3499   if (buf == NULL)
3500     return FALSE;
3501 
3502   gst_buffer_unref (buf);
3503   return TRUE;
3504 }
3505 
3506 /**
3507  * gst_aggregator_pad_peek_buffer:
3508  * @pad: the pad to get buffer from
3509  *
3510  * Returns: (nullable) (transfer full): A reference to the buffer in @pad or
3511  * NULL if no buffer was queued. You should unref the buffer after
3512  * usage.
3513  */
3514 GstBuffer *
gst_aggregator_pad_peek_buffer(GstAggregatorPad * pad)3515 gst_aggregator_pad_peek_buffer (GstAggregatorPad * pad)
3516 {
3517   GstBuffer *buffer = NULL;
3518 
3519   PAD_LOCK (pad);
3520 
3521   if (pad->priv->peeked_buffer) {
3522     buffer = gst_buffer_ref (pad->priv->peeked_buffer);
3523     goto done;
3524   }
3525 
3526   if (pad->priv->flow_return != GST_FLOW_OK)
3527     goto done;
3528 
3529   gst_aggregator_pad_clip_buffer_unlocked (pad);
3530 
3531   if (pad->priv->clipped_buffer) {
3532     buffer = gst_buffer_ref (pad->priv->clipped_buffer);
3533     pad->priv->peeked_buffer = gst_buffer_ref (buffer);
3534   } else {
3535     buffer = NULL;
3536   }
3537 
3538 done:
3539   PAD_UNLOCK (pad);
3540   return buffer;
3541 }
3542 
3543 /**
3544  * gst_aggregator_pad_has_buffer:
3545  * @pad: the pad to check the buffer on
3546  *
3547  * This checks if a pad has a buffer available that will be returned by
3548  * a call to gst_aggregator_pad_peek_buffer() or
3549  * gst_aggregator_pad_pop_buffer().
3550  *
3551  * Returns: %TRUE if the pad has a buffer available as the next thing.
3552  *
3553  * Since: 1.14.1
3554  */
3555 gboolean
gst_aggregator_pad_has_buffer(GstAggregatorPad * pad)3556 gst_aggregator_pad_has_buffer (GstAggregatorPad * pad)
3557 {
3558   gboolean has_buffer;
3559 
3560   PAD_LOCK (pad);
3561 
3562   if (pad->priv->peeked_buffer) {
3563     has_buffer = TRUE;
3564   } else {
3565     gst_aggregator_pad_clip_buffer_unlocked (pad);
3566     has_buffer = (pad->priv->clipped_buffer != NULL);
3567     if (has_buffer)
3568       pad->priv->peeked_buffer = gst_buffer_ref (pad->priv->clipped_buffer);
3569   }
3570   PAD_UNLOCK (pad);
3571 
3572   return has_buffer;
3573 }
3574 
3575 /**
3576  * gst_aggregator_pad_is_eos:
3577  * @pad: an aggregator pad
3578  *
3579  * Returns: %TRUE if the pad is EOS, otherwise %FALSE.
3580  */
3581 gboolean
gst_aggregator_pad_is_eos(GstAggregatorPad * pad)3582 gst_aggregator_pad_is_eos (GstAggregatorPad * pad)
3583 {
3584   gboolean is_eos;
3585 
3586   PAD_LOCK (pad);
3587   is_eos = pad->priv->eos;
3588   PAD_UNLOCK (pad);
3589 
3590   return is_eos;
3591 }
3592 
3593 /**
3594  * gst_aggregator_pad_is_inactive:
3595  * @pad: an aggregator pad
3596  *
3597  * It is only valid to call this method from #GstAggregatorClass::aggregate()
3598  *
3599  * Returns: %TRUE if the pad is inactive, %FALSE otherwise.
3600  *   See gst_aggregator_ignore_inactive_pads() for more info.
3601  * Since: 1.20
3602  */
3603 gboolean
gst_aggregator_pad_is_inactive(GstAggregatorPad * pad)3604 gst_aggregator_pad_is_inactive (GstAggregatorPad * pad)
3605 {
3606   GstAggregator *self;
3607   gboolean inactive;
3608 
3609   self = GST_AGGREGATOR (gst_pad_get_parent_element (GST_PAD (pad)));
3610 
3611   g_assert_nonnull (self);
3612 
3613   PAD_LOCK (pad);
3614   inactive = self->priv->ignore_inactive_pads && self->priv->peer_latency_live
3615       && pad->priv->first_buffer;
3616   PAD_UNLOCK (pad);
3617 
3618   gst_object_unref (self);
3619 
3620   return inactive;
3621 }
3622 
3623 #if 0
3624 /*
3625  * gst_aggregator_merge_tags:
3626  * @self: a #GstAggregator
3627  * @tags: a #GstTagList to merge
3628  * @mode: the #GstTagMergeMode to use
3629  *
3630  * Adds tags to so-called pending tags, which will be processed
3631  * before pushing out data downstream.
3632  *
3633  * Note that this is provided for convenience, and the subclass is
3634  * not required to use this and can still do tag handling on its own.
3635  *
3636  * MT safe.
3637  */
3638 void
3639 gst_aggregator_merge_tags (GstAggregator * self,
3640     const GstTagList * tags, GstTagMergeMode mode)
3641 {
3642   GstTagList *otags;
3643 
3644   g_return_if_fail (GST_IS_AGGREGATOR (self));
3645   g_return_if_fail (tags == NULL || GST_IS_TAG_LIST (tags));
3646 
3647   /* FIXME Check if we can use OBJECT lock here! */
3648   GST_OBJECT_LOCK (self);
3649   if (tags)
3650     GST_DEBUG_OBJECT (self, "merging tags %" GST_PTR_FORMAT, tags);
3651   otags = self->priv->tags;
3652   self->priv->tags = gst_tag_list_merge (self->priv->tags, tags, mode);
3653   if (otags)
3654     gst_tag_list_unref (otags);
3655   self->priv->tags_changed = TRUE;
3656   GST_OBJECT_UNLOCK (self);
3657 }
3658 #endif
3659 
3660 /**
3661  * gst_aggregator_set_latency:
3662  * @self: a #GstAggregator
3663  * @min_latency: minimum latency
3664  * @max_latency: maximum latency
3665  *
3666  * Lets #GstAggregator sub-classes tell the baseclass what their internal
3667  * latency is. Will also post a LATENCY message on the bus so the pipeline
3668  * can reconfigure its global latency.
3669  */
3670 void
gst_aggregator_set_latency(GstAggregator * self,GstClockTime min_latency,GstClockTime max_latency)3671 gst_aggregator_set_latency (GstAggregator * self,
3672     GstClockTime min_latency, GstClockTime max_latency)
3673 {
3674   gboolean changed = FALSE;
3675 
3676   g_return_if_fail (GST_IS_AGGREGATOR (self));
3677   g_return_if_fail (GST_CLOCK_TIME_IS_VALID (min_latency));
3678   g_return_if_fail (max_latency >= min_latency);
3679 
3680   SRC_LOCK (self);
3681   if (self->priv->sub_latency_min != min_latency) {
3682     self->priv->sub_latency_min = min_latency;
3683     changed = TRUE;
3684   }
3685   if (self->priv->sub_latency_max != max_latency) {
3686     self->priv->sub_latency_max = max_latency;
3687     changed = TRUE;
3688   }
3689 
3690   if (changed)
3691     SRC_BROADCAST (self);
3692   SRC_UNLOCK (self);
3693 
3694   if (changed) {
3695     gst_element_post_message (GST_ELEMENT_CAST (self),
3696         gst_message_new_latency (GST_OBJECT_CAST (self)));
3697   }
3698 }
3699 
3700 /**
3701  * gst_aggregator_get_buffer_pool:
3702  * @self: a #GstAggregator
3703  *
3704  * Returns: (transfer full) (nullable): the instance of the #GstBufferPool used
3705  * by @trans; free it after use it
3706  */
3707 GstBufferPool *
gst_aggregator_get_buffer_pool(GstAggregator * self)3708 gst_aggregator_get_buffer_pool (GstAggregator * self)
3709 {
3710   GstBufferPool *pool;
3711 
3712   g_return_val_if_fail (GST_IS_AGGREGATOR (self), NULL);
3713 
3714   GST_OBJECT_LOCK (self);
3715   pool = self->priv->pool;
3716   if (pool)
3717     gst_object_ref (pool);
3718   GST_OBJECT_UNLOCK (self);
3719 
3720   return pool;
3721 }
3722 
3723 /**
3724  * gst_aggregator_get_allocator:
3725  * @self: a #GstAggregator
3726  * @allocator: (out) (optional) (nullable) (transfer full): the #GstAllocator
3727  * used
3728  * @params: (out caller-allocates) (optional): the
3729  * #GstAllocationParams of @allocator
3730  *
3731  * Lets #GstAggregator sub-classes get the memory @allocator
3732  * acquired by the base class and its @params.
3733  *
3734  * Unref the @allocator after use it.
3735  */
3736 void
gst_aggregator_get_allocator(GstAggregator * self,GstAllocator ** allocator,GstAllocationParams * params)3737 gst_aggregator_get_allocator (GstAggregator * self,
3738     GstAllocator ** allocator, GstAllocationParams * params)
3739 {
3740   g_return_if_fail (GST_IS_AGGREGATOR (self));
3741 
3742   if (allocator)
3743     *allocator = self->priv->allocator ?
3744         gst_object_ref (self->priv->allocator) : NULL;
3745 
3746   if (params)
3747     *params = self->priv->allocation_params;
3748 }
3749 
3750 /**
3751  * gst_aggregator_simple_get_next_time:
3752  * @self: A #GstAggregator
3753  *
3754  * This is a simple #GstAggregatorClass::get_next_time implementation that
3755  * just looks at the #GstSegment on the srcpad of the aggregator and bases
3756  * the next time on the running time there.
3757  *
3758  * This is the desired behaviour in most cases where you have a live source
3759  * and you have a dead line based aggregator subclass.
3760  *
3761  * Returns: The running time based on the position
3762  *
3763  * Since: 1.16
3764  */
3765 GstClockTime
gst_aggregator_simple_get_next_time(GstAggregator * self)3766 gst_aggregator_simple_get_next_time (GstAggregator * self)
3767 {
3768   GstClockTime next_time;
3769   GstAggregatorPad *srcpad = GST_AGGREGATOR_PAD (self->srcpad);
3770   GstSegment *segment = &srcpad->segment;
3771 
3772   GST_OBJECT_LOCK (self);
3773   if (segment->position == -1 || segment->position < segment->start)
3774     next_time = segment->start;
3775   else
3776     next_time = segment->position;
3777 
3778   if (segment->stop != -1 && next_time > segment->stop)
3779     next_time = segment->stop;
3780 
3781   next_time = gst_segment_to_running_time (segment, GST_FORMAT_TIME, next_time);
3782   GST_OBJECT_UNLOCK (self);
3783 
3784   return next_time;
3785 }
3786 
3787 /**
3788  * gst_aggregator_update_segment:
3789  *
3790  * Subclasses should use this to update the segment on their
3791  * source pad, instead of directly pushing new segment events
3792  * downstream.
3793  *
3794  * Subclasses MUST call this before gst_aggregator_selected_samples(),
3795  * if it is used at all.
3796  *
3797  * Since: 1.18
3798  */
3799 void
gst_aggregator_update_segment(GstAggregator * self,const GstSegment * segment)3800 gst_aggregator_update_segment (GstAggregator * self, const GstSegment * segment)
3801 {
3802   g_return_if_fail (GST_IS_AGGREGATOR (self));
3803   g_return_if_fail (segment != NULL);
3804 
3805   GST_INFO_OBJECT (self, "Updating srcpad segment: %" GST_SEGMENT_FORMAT,
3806       segment);
3807 
3808   GST_OBJECT_LOCK (self);
3809   GST_AGGREGATOR_PAD (self->srcpad)->segment = *segment;
3810   self->priv->send_segment = TRUE;
3811   /* we have a segment from the subclass now and really shouldn't override
3812    * anything in that segment anymore, like the segment.position */
3813   self->priv->first_buffer = FALSE;
3814   GST_OBJECT_UNLOCK (self);
3815 }
3816 
3817 /**
3818  * gst_aggregator_selected_samples:
3819  * @pts: The presentation timestamp of the next output buffer
3820  * @dts: The decoding timestamp of the next output buffer
3821  * @duration: The duration of the next output buffer
3822  * @info: (nullable): a #GstStructure containing additional information
3823  *
3824  * Subclasses should call this when they have prepared the
3825  * buffers they will aggregate for each of their sink pads, but
3826  * before using any of the properties of the pads that govern
3827  * *how* aggregation should be performed, for example z-index
3828  * for video aggregators.
3829  *
3830  * If gst_aggregator_update_segment() is used by the subclass,
3831  * it MUST be called before gst_aggregator_selected_samples().
3832  *
3833  * This function MUST only be called from the #GstAggregatorClass::aggregate()
3834  * function.
3835  *
3836  * Since: 1.18
3837  */
3838 void
gst_aggregator_selected_samples(GstAggregator * self,GstClockTime pts,GstClockTime dts,GstClockTime duration,GstStructure * info)3839 gst_aggregator_selected_samples (GstAggregator * self,
3840     GstClockTime pts, GstClockTime dts, GstClockTime duration,
3841     GstStructure * info)
3842 {
3843   g_return_if_fail (GST_IS_AGGREGATOR (self));
3844 
3845   if (self->priv->emit_signals) {
3846     g_signal_emit (self, gst_aggregator_signals[SIGNAL_SAMPLES_SELECTED], 0,
3847         &GST_AGGREGATOR_PAD (self->srcpad)->segment, pts, dts, duration, info);
3848   }
3849 
3850   self->priv->selected_samples_called_or_warned = TRUE;
3851 }
3852 
3853 /**
3854  * gst_aggregator_set_ignore_inactive_pads:
3855  * @ignore: whether inactive pads should not be waited on
3856  *
3857  * Subclasses should call this when they don't want to time out
3858  * waiting for a pad that hasn't yet received any buffers in live
3859  * mode.
3860  *
3861  * #GstAggregator will still wait once on each newly-added pad, making
3862  * sure upstream has had a fair chance to start up.
3863  *
3864  * Since: 1.20
3865  */
3866 void
gst_aggregator_set_ignore_inactive_pads(GstAggregator * self,gboolean ignore)3867 gst_aggregator_set_ignore_inactive_pads (GstAggregator * self, gboolean ignore)
3868 {
3869   g_return_if_fail (GST_IS_AGGREGATOR (self));
3870 
3871   GST_OBJECT_LOCK (self);
3872   self->priv->ignore_inactive_pads = ignore;
3873   GST_OBJECT_UNLOCK (self);
3874 }
3875 
3876 /**
3877  * gst_aggregator_get_ignore_inactive_pads:
3878  *
3879  * Returns: whether inactive pads will not be waited on
3880  * Since: 1.20
3881  */
3882 gboolean
gst_aggregator_get_ignore_inactive_pads(GstAggregator * self)3883 gst_aggregator_get_ignore_inactive_pads (GstAggregator * self)
3884 {
3885   gboolean ret;
3886 
3887   g_return_val_if_fail (GST_IS_AGGREGATOR (self), FALSE);
3888 
3889   GST_OBJECT_LOCK (self);
3890   ret = self->priv->ignore_inactive_pads;
3891   GST_OBJECT_UNLOCK (self);
3892 
3893   return ret;
3894 }
3895