1 /* GStreamer aggregator base class
2 * Copyright (C) 2014 Mathieu Duponchelle <mathieu.duponchelle@opencreed.com>
3 * Copyright (C) 2014 Thibault Saunier <tsaunier@gnome.org>
4 *
5 * gstaggregator.c:
6 *
7 * This library is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Library General Public
9 * License as published by the Free Software Foundation; either
10 * version 2 of the License, or (at your option) any later version.
11 *
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Library General Public License for more details.
16 *
17 * You should have received a copy of the GNU Library General Public
18 * License along with this library; if not, write to the
19 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
20 * Boston, MA 02110-1301, USA.
21 */
22 /**
23 * SECTION: gstaggregator
24 * @title: GstAggregator
25 * @short_description: Base class for mixers and muxers that manages a set of
26 * input pads and aggregates their streams
27 * @see_also: gstcollectpads for historical reasons.
28 *
29 * Manages a set of pads with the purpose of aggregating their buffers.
30 * Control is given to the subclass when all pads have data.
31 *
32 * * Base class for mixers and muxers. Subclasses should at least implement
33 * the #GstAggregatorClass::aggregate virtual method; a minimal sketch
 * follows this documentation comment.
34 *
35 * * Installs a #GstPadChainFunction, a #GstPadEventFullFunction and a
36 * #GstPadQueryFunction to queue all serialized data packets per sink pad.
37 * Subclasses should not overwrite those, but instead implement
38 * #GstAggregatorClass::sink_event and #GstAggregatorClass::sink_query as
39 * needed.
40 *
41 * * When data is queued on all pads, the aggregate vmethod is called.
42 *
43 * * One can peek at the data on any given GstAggregatorPad with the
44 * gst_aggregator_pad_peek_buffer() method, and remove it from the pad
45 * with the gst_aggregator_pad_pop_buffer() method. When a buffer
46 * has been taken with pop_buffer(), a new buffer can be queued
47 * on that pad.
48 *
49 * * When gst_aggregator_pad_peek_buffer() or gst_aggregator_pad_has_buffer()
50 * are called, a reference is taken to the returned buffer, which stays
51 * valid until either:
52 *
53 * - gst_aggregator_pad_pop_buffer() is called, in which case the caller
54 * is guaranteed that the buffer they receive is the same as the peeked
55 * buffer.
56 * - gst_aggregator_pad_drop_buffer() is called, in which case the caller
57 * is guaranteed that the dropped buffer is the one that was peeked.
58 * - the subclass implementation of #GstAggregatorClass.aggregate returns.
59 *
60 * Subsequent calls to gst_aggregator_pad_peek_buffer() or
61 * gst_aggregator_pad_has_buffer() return / check the same buffer that was
62 * returned / checked, until one of the conditions listed above is met.
63 *
64 * Subclasses are only allowed to call these methods from the aggregate
65 * thread.
66 *
67 * * If the subclass wishes to push a buffer downstream in its aggregate
68 * implementation, it should do so through the
69 * gst_aggregator_finish_buffer() method. This method will take care
70 * of sending and ordering mandatory events such as stream start, caps
71 * and segment. Buffer lists can also be pushed out with
72 * gst_aggregator_finish_buffer_list().
73 *
74 * * The same goes for EOS events, which should not be pushed directly by the
75 * subclass; it should instead return GST_FLOW_EOS from its aggregate
76 * implementation.
77 *
78 * * Note that the aggregator turns incoming gap events into gap buffers with
79 * matching PTS and duration. It also flags these buffers with
80 * GST_BUFFER_FLAG_GAP and GST_BUFFER_FLAG_DROPPABLE to ease their
81 * identification and subsequent processing.
82 *
83 * * Subclasses must use (a subclass of) #GstAggregatorPad for both their
84 * sink and source pads.
85 * See gst_element_class_add_static_pad_template_with_gtype().
86 *
87 * This class used to live in gst-plugins-bad and was moved to core.
88 *
89 * Since: 1.14
90 */
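
/* Illustrative sketch of a minimal #GstAggregatorClass::aggregate
 * implementation for a hypothetical "myagg" subclass (all "my_agg" names are
 * made up). It pops at most one buffer per sink pad and forwards the first
 * one, relying on gst_aggregator_finish_buffer() to send the mandatory
 * stream-start/caps/segment events described above:
 *
 *   static GstFlowReturn
 *   my_agg_aggregate (GstAggregator * agg, gboolean timeout)
 *   {
 *     GstBuffer *outbuf = NULL;
 *     GList *l;
 *
 *     GST_OBJECT_LOCK (agg);
 *     for (l = GST_ELEMENT (agg)->sinkpads; l; l = l->next) {
 *       GstAggregatorPad *pad = GST_AGGREGATOR_PAD (l->data);
 *       GstBuffer *buf = gst_aggregator_pad_pop_buffer (pad);
 *
 *       if (buf == NULL)
 *         continue;                // nothing queued on this pad right now
 *       if (outbuf == NULL)
 *         outbuf = buf;            // keep the first buffer as the output
 *       else
 *         gst_buffer_unref (buf);  // a real element would mix/mux here
 *     }
 *     GST_OBJECT_UNLOCK (agg);
 *
 *     if (outbuf == NULL)
 *       return GST_FLOW_EOS;       // simplistic: a real element would also
 *                                  // check gst_aggregator_pad_is_eos()/timeout
 *
 *     return gst_aggregator_finish_buffer (agg, outbuf);
 *   }
 */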
91
92 /**
93 * SECTION: gstaggregatorpad
94 * @title: GstAggregatorPad
95 * @short_description: #GstPad subclass for pads managed by #GstAggregator
96 * @see_also: gstcollectpads for historical reasons.
97 *
98 * Pads managed by a #GstAggregator subclass.
99 *
100 * This class used to live in gst-plugins-bad and was moved to core.
101 *
102 * Since: 1.14
103 */
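
/* Illustrative sketch of how a hypothetical subclass registers its sink pad
 * template so that request pads are created as #GstAggregatorPad (or a
 * subclass thereof), as required above. The template and type names are
 * made up:
 *
 *   static GstStaticPadTemplate my_agg_sink_template =
 *       GST_STATIC_PAD_TEMPLATE ("sink_%u", GST_PAD_SINK, GST_PAD_REQUEST,
 *       GST_STATIC_CAPS_ANY);
 *
 *   static void
 *   my_agg_class_init (MyAggClass * klass)
 *   {
 *     GstElementClass *element_class = GST_ELEMENT_CLASS (klass);
 *
 *     gst_element_class_add_static_pad_template_with_gtype (element_class,
 *         &my_agg_sink_template, GST_TYPE_AGGREGATOR_PAD);
 *   }
 */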
104
105 #ifdef HAVE_CONFIG_H
106 # include "config.h"
107 #endif
108
109 #include <string.h> /* strlen */
110
111 #include "gstaggregator.h"
112
113 GType
114 gst_aggregator_start_time_selection_get_type (void)
115 {
116 static GType gtype = 0;
117
118 if (g_once_init_enter (&gtype)) {
119 static const GEnumValue values[] = {
120 {GST_AGGREGATOR_START_TIME_SELECTION_ZERO,
121 "GST_AGGREGATOR_START_TIME_SELECTION_ZERO", "zero"},
122 {GST_AGGREGATOR_START_TIME_SELECTION_FIRST,
123 "GST_AGGREGATOR_START_TIME_SELECTION_FIRST", "first"},
124 {GST_AGGREGATOR_START_TIME_SELECTION_SET,
125 "GST_AGGREGATOR_START_TIME_SELECTION_SET", "set"},
126 {0, NULL, NULL}
127 };
128 GType new_type =
129 g_enum_register_static ("GstAggregatorStartTimeSelection", values);
130
131 g_once_init_leave (&gtype, new_type);
132 }
133 return gtype;
134 }
135
136 /* Might become API */
137 #if 0
138 static void gst_aggregator_merge_tags (GstAggregator * aggregator,
139 const GstTagList * tags, GstTagMergeMode mode);
140 #endif
141 static void gst_aggregator_set_latency_property (GstAggregator * agg,
142 GstClockTime latency);
143 static GstClockTime gst_aggregator_get_latency_property (GstAggregator * agg);
144
145 static GstClockTime gst_aggregator_get_latency_unlocked (GstAggregator * self);
146
147 static void gst_aggregator_pad_buffer_consumed (GstAggregatorPad * pad,
148 GstBuffer * buffer, gboolean dequeued);
149
150 GST_DEBUG_CATEGORY_STATIC (aggregator_debug);
151 #define GST_CAT_DEFAULT aggregator_debug
152
153 /* Locking order, locks in this element must always be taken in this order
154 *
155 * standard sink pad stream lock -> GST_PAD_STREAM_LOCK (aggpad)
156 * Aggregator pad flush lock -> PAD_FLUSH_LOCK(aggpad)
157 * standard src pad stream lock -> GST_PAD_STREAM_LOCK (srcpad)
158 * Aggregator src lock -> SRC_LOCK(agg) w/ SRC_WAIT/BROADCAST
159 * standard element object lock -> GST_OBJECT_LOCK(agg)
160 * Aggregator pad lock -> PAD_LOCK (aggpad) w/ PAD_WAIT/BROADCAST_EVENT(aggpad)
161 * standard src pad object lock -> GST_OBJECT_LOCK(srcpad)
162 * standard sink pad object lock -> GST_OBJECT_LOCK(aggpad)
163 */
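
/* As a concrete illustration of the order above: gst_aggregator_wait_and_check()
 * below takes SRC_LOCK (agg) first and only then GST_OBJECT_LOCK (agg), and
 * releases them in reverse order:
 *
 *   SRC_LOCK (self);
 *   ...
 *   GST_OBJECT_LOCK (self);
 *   ...
 *   GST_OBJECT_UNLOCK (self);
 *   ...
 *   SRC_UNLOCK (self);
 *
 * Nesting them the other way around would violate the table above and could
 * deadlock against the streaming threads.
 */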
164
165 /* GstAggregatorPad definitions */
166 #define PAD_LOCK(pad) G_STMT_START { \
167 GST_TRACE_OBJECT (pad, "Taking PAD lock from thread %p", \
168 g_thread_self()); \
169 g_mutex_lock(&pad->priv->lock); \
170 GST_TRACE_OBJECT (pad, "Took PAD lock from thread %p", \
171 g_thread_self()); \
172 } G_STMT_END
173
174 #define PAD_UNLOCK(pad) G_STMT_START { \
175 GST_TRACE_OBJECT (pad, "Releasing PAD lock from thread %p", \
176 g_thread_self()); \
177 g_mutex_unlock(&pad->priv->lock); \
178 GST_TRACE_OBJECT (pad, "Release PAD lock from thread %p", \
179 g_thread_self()); \
180 } G_STMT_END
181
182
183 #define PAD_WAIT_EVENT(pad) G_STMT_START { \
184 GST_LOG_OBJECT (pad, "Waiting for buffer to be consumed thread %p", \
185 g_thread_self()); \
186 g_cond_wait(&(((GstAggregatorPad* )pad)->priv->event_cond), \
187 (&((GstAggregatorPad*)pad)->priv->lock)); \
188 GST_LOG_OBJECT (pad, "DONE Waiting for buffer to be consumed on thread %p", \
189 g_thread_self()); \
190 } G_STMT_END
191
192 #define PAD_BROADCAST_EVENT(pad) G_STMT_START { \
193 GST_LOG_OBJECT (pad, "Signaling buffer consumed from thread %p", \
194 g_thread_self()); \
195 g_cond_broadcast(&(((GstAggregatorPad* )pad)->priv->event_cond)); \
196 } G_STMT_END
197
198
199 #define PAD_FLUSH_LOCK(pad) G_STMT_START { \
200 GST_TRACE_OBJECT (pad, "Taking lock from thread %p", \
201 g_thread_self()); \
202 g_mutex_lock(&pad->priv->flush_lock); \
203 GST_TRACE_OBJECT (pad, "Took lock from thread %p", \
204 g_thread_self()); \
205 } G_STMT_END
206
207 #define PAD_FLUSH_UNLOCK(pad) G_STMT_START { \
208 GST_TRACE_OBJECT (pad, "Releasing lock from thread %p", \
209 g_thread_self()); \
210 g_mutex_unlock(&pad->priv->flush_lock); \
211 GST_TRACE_OBJECT (pad, "Release lock from thread %p", \
212 g_thread_self()); \
213 } G_STMT_END
214
215 #define SRC_LOCK(self) G_STMT_START { \
216 GST_TRACE_OBJECT (self, "Taking src lock from thread %p", \
217 g_thread_self()); \
218 g_mutex_lock(&self->priv->src_lock); \
219 GST_TRACE_OBJECT (self, "Took src lock from thread %p", \
220 g_thread_self()); \
221 } G_STMT_END
222
223 #define SRC_UNLOCK(self) G_STMT_START { \
224 GST_TRACE_OBJECT (self, "Releasing src lock from thread %p", \
225 g_thread_self()); \
226 g_mutex_unlock(&self->priv->src_lock); \
227 GST_TRACE_OBJECT (self, "Released src lock from thread %p", \
228 g_thread_self()); \
229 } G_STMT_END
230
231 #define SRC_WAIT(self) G_STMT_START { \
232 GST_LOG_OBJECT (self, "Waiting for src on thread %p", \
233 g_thread_self()); \
234 g_cond_wait(&(self->priv->src_cond), &(self->priv->src_lock)); \
235 GST_LOG_OBJECT (self, "DONE Waiting for src on thread %p", \
236 g_thread_self()); \
237 } G_STMT_END
238
239 #define SRC_BROADCAST(self) G_STMT_START { \
240 GST_LOG_OBJECT (self, "Signaling src from thread %p", \
241 g_thread_self()); \
242 if (self->priv->aggregate_id) \
243 gst_clock_id_unschedule (self->priv->aggregate_id); \
244 g_cond_broadcast(&(self->priv->src_cond)); \
245 } G_STMT_END
246
247 struct _GstAggregatorPadPrivate
248 {
249 /* Following fields are protected by the PAD_LOCK */
250 GstFlowReturn flow_return;
251
252 guint32 last_flush_start_seqnum;
253 guint32 last_flush_stop_seqnum;
254
255 /* Whether the pad hasn't received a first buffer yet */
256 gboolean first_buffer;
257 /* Whether we waited once for the pad's first buffer */
258 gboolean waited_once;
259
260 GQueue data; /* buffers, events and queries */
261 GstBuffer *clipped_buffer;
262 guint num_buffers;
263 GstBuffer *peeked_buffer;
264
265 /* used to track fill state of queues, only used with live-src and when
266 * latency property is set to > 0 */
267 GstClockTime head_position;
268 GstClockTime tail_position;
269 GstClockTime head_time; /* running time */
270 GstClockTime tail_time;
271 GstClockTime time_level; /* how much head is ahead of tail */
272 GstSegment head_segment; /* segment before the queue */
273
274 gboolean negotiated;
275
276 gboolean eos;
277
278 GMutex lock;
279 GCond event_cond;
280 /* This lock prevents flush start processing from happening while
281 * the chain function is running.
282 */
283 GMutex flush_lock;
284
285 /* properties */
286 gboolean emit_signals;
287 };
288
289 /* Must be called with PAD_LOCK held */
290 static void
291 gst_aggregator_pad_reset_unlocked (GstAggregatorPad * aggpad)
292 {
293 aggpad->priv->eos = FALSE;
294 aggpad->priv->flow_return = GST_FLOW_OK;
295 GST_OBJECT_LOCK (aggpad);
296 gst_segment_init (&aggpad->segment, GST_FORMAT_UNDEFINED);
297 gst_segment_init (&aggpad->priv->head_segment, GST_FORMAT_UNDEFINED);
298 GST_OBJECT_UNLOCK (aggpad);
299 aggpad->priv->head_position = GST_CLOCK_TIME_NONE;
300 aggpad->priv->tail_position = GST_CLOCK_TIME_NONE;
301 aggpad->priv->head_time = GST_CLOCK_TIME_NONE;
302 aggpad->priv->tail_time = GST_CLOCK_TIME_NONE;
303 aggpad->priv->time_level = 0;
304 aggpad->priv->first_buffer = TRUE;
305 aggpad->priv->waited_once = FALSE;
306 }
307
308 static gboolean
309 gst_aggregator_pad_flush (GstAggregatorPad * aggpad, GstAggregator * agg)
310 {
311 GstAggregatorPadClass *klass = GST_AGGREGATOR_PAD_GET_CLASS (aggpad);
312
313 PAD_LOCK (aggpad);
314 gst_aggregator_pad_reset_unlocked (aggpad);
315 PAD_UNLOCK (aggpad);
316
317 if (klass->flush)
318 return (klass->flush (aggpad, agg) == GST_FLOW_OK);
319
320 return TRUE;
321 }
322
323 /**
324 * gst_aggregator_peek_next_sample:
 * @agg: The #GstAggregator
 * @aggpad: The #GstAggregatorPad to check the next outgoing sample for
325 *
326 * Use this function to determine what input buffers will be aggregated
327 * to produce the next output buffer. This should only be called from
328 * a #GstAggregator::samples-selected handler, and can be used to precisely
329 * control aggregating parameters for a given set of input samples.
330 *
331 * Returns: (nullable) (transfer full): The sample that is about to be aggregated. It may hold a #GstBuffer
332 * or a #GstBufferList. The contents of its info structure are subclass-dependent,
333 * and documented on a per-subclass basis. The buffers held by the sample are
334 * not writable.
335 * Since: 1.18
336 */
337 GstSample *
338 gst_aggregator_peek_next_sample (GstAggregator * agg, GstAggregatorPad * aggpad)
339 {
340 GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (agg);
341
342 if (klass->peek_next_sample)
343 return (klass->peek_next_sample (agg, aggpad));
344
345 return NULL;
346 }
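
/* Illustrative sketch of an application-side "samples-selected" handler that
 * uses gst_aggregator_peek_next_sample(). The handler signature shown matches
 * the signal as documented for 1.18; the pad passed through user_data and the
 * connection snippet are assumptions made for this example:
 *
 *   static void
 *   on_samples_selected (GstAggregator * agg, GstSegment * segment,
 *       guint64 pts, guint64 dts, guint64 duration, GstStructure * info,
 *       gpointer user_data)
 *   {
 *     GstAggregatorPad *pad = GST_AGGREGATOR_PAD (user_data);
 *     GstSample *sample = gst_aggregator_peek_next_sample (agg, pad);
 *
 *     if (sample) {
 *       GstBuffer *buf = gst_sample_get_buffer (sample);
 *
 *       if (buf)
 *         GST_LOG_OBJECT (pad, "next input: pts %" GST_TIME_FORMAT,
 *             GST_TIME_ARGS (GST_BUFFER_PTS (buf)));
 *       gst_sample_unref (sample);
 *     }
 *   }
 *
 *   // requires the "emit-signals" property to be TRUE on the aggregator:
 *   g_signal_connect (agg, "samples-selected",
 *       G_CALLBACK (on_samples_selected), sinkpad);
 */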
347
348 /*************************************
349 * GstAggregator implementation *
350 *************************************/
351 static GstElementClass *aggregator_parent_class = NULL;
352 static gint aggregator_private_offset = 0;
353
354 /* All members are protected by the object lock unless otherwise noted */
355
356 struct _GstAggregatorPrivate
357 {
358 gint max_padserial;
359
360 /* Our state is >= PAUSED */
361 gboolean running; /* protected by src_lock */
362
363 /* seqnum from the last seek, or the common seqnum of flush start events
364 * received on all pads, for flushing without a seek */
365 guint32 next_seqnum;
366 /* seqnum to apply to synthetic segment/eos events */
367 guint32 seqnum;
368 gboolean send_stream_start; /* protected by srcpad stream lock */
369 gboolean send_segment;
370 gboolean flushing;
371 gboolean send_eos; /* protected by srcpad stream lock */
372
373 GstCaps *srccaps; /* protected by the srcpad stream lock */
374
375 GstTagList *tags;
376 gboolean tags_changed;
377
378 gboolean peer_latency_live; /* protected by src_lock */
379 GstClockTime peer_latency_min; /* protected by src_lock */
380 GstClockTime peer_latency_max; /* protected by src_lock */
381 gboolean has_peer_latency; /* protected by src_lock */
382
383 GstClockTime sub_latency_min; /* protected by src_lock */
384 GstClockTime sub_latency_max; /* protected by src_lock */
385
386 GstClockTime upstream_latency_min; /* protected by src_lock */
387
388 /* aggregate */
389 GstClockID aggregate_id; /* protected by src_lock */
390 gboolean selected_samples_called_or_warned; /* protected by src_lock */
391 GMutex src_lock;
392 GCond src_cond;
393
394 gboolean first_buffer; /* protected by object lock */
395 GstAggregatorStartTimeSelection start_time_selection;
396 GstClockTime start_time;
397
398 /* protected by the object lock */
399 GstQuery *allocation_query;
400 GstAllocator *allocator;
401 GstBufferPool *pool;
402 GstAllocationParams allocation_params;
403
404 /* properties */
405 gint64 latency; /* protected by both src_lock and all pad locks */
406 gboolean emit_signals;
407 gboolean ignore_inactive_pads;
408 };
409
410 /* Seek event forwarding helper */
411 typedef struct
412 {
413 /* parameters */
414 GstEvent *event;
415 gboolean flush;
416 gboolean only_to_active_pads;
417
418 /* results */
419 gboolean result;
420 gboolean one_actually_seeked;
421 } EventData;
422
423 #define DEFAULT_LATENCY 0
424 #define DEFAULT_MIN_UPSTREAM_LATENCY 0
425 #define DEFAULT_START_TIME_SELECTION GST_AGGREGATOR_START_TIME_SELECTION_ZERO
426 #define DEFAULT_START_TIME (-1)
427 #define DEFAULT_EMIT_SIGNALS FALSE
428
429 enum
430 {
431 PROP_0,
432 PROP_LATENCY,
433 PROP_MIN_UPSTREAM_LATENCY,
434 PROP_START_TIME_SELECTION,
435 PROP_START_TIME,
436 PROP_EMIT_SIGNALS,
437 PROP_LAST
438 };
439
440 enum
441 {
442 SIGNAL_SAMPLES_SELECTED,
443 LAST_SIGNAL,
444 };
445
446 static guint gst_aggregator_signals[LAST_SIGNAL] = { 0 };
447
448 static GstFlowReturn gst_aggregator_pad_chain_internal (GstAggregator * self,
449 GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head);
450
451 static gboolean
452 gst_aggregator_pad_queue_is_empty (GstAggregatorPad * pad)
453 {
454 return (g_queue_peek_tail (&pad->priv->data) == NULL &&
455 pad->priv->clipped_buffer == NULL);
456 }
457
458 /* Returns FALSE if any non-EOS pad has no buffer available, or
459 * if at least one of the pads has an event or query at the top of its queue.
460 *
461 * Only returns TRUE if all non-EOS pads have a buffer available at the top of
462 * their queue or a clipped buffer already.
463 */
464 static gboolean
465 gst_aggregator_check_pads_ready (GstAggregator * self,
466 gboolean * have_event_or_query_ret)
467 {
468 GstAggregatorPad *pad = NULL;
469 GList *l, *sinkpads;
470 gboolean have_buffer = TRUE;
471 gboolean have_event_or_query = FALSE;
472 guint n_ready = 0;
473
474 GST_LOG_OBJECT (self, "checking pads");
475
476 GST_OBJECT_LOCK (self);
477
478 sinkpads = GST_ELEMENT_CAST (self)->sinkpads;
479 if (sinkpads == NULL)
480 goto no_sinkpads;
481
482 for (l = sinkpads; l != NULL; l = l->next) {
483 pad = l->data;
484
485 PAD_LOCK (pad);
486
487 /* If there's an event or query at the top of the queue and we haven't yet
488 * taken the top buffer out and stored it as clipped_buffer, remember
489 * that and exit the loop. We first have to handle all events/queries
490 * before we handle any buffers. */
491 if (!pad->priv->clipped_buffer
492 && (GST_IS_EVENT (g_queue_peek_tail (&pad->priv->data))
493 || GST_IS_QUERY (g_queue_peek_tail (&pad->priv->data)))) {
494 PAD_UNLOCK (pad);
495 have_event_or_query = TRUE;
496 break;
497 }
498
499 if (self->priv->ignore_inactive_pads && self->priv->peer_latency_live &&
500 pad->priv->waited_once && pad->priv->first_buffer && !pad->priv->eos) {
501 PAD_UNLOCK (pad);
502 continue;
503 }
504
505 /* Otherwise check if we have a clipped buffer or a buffer at the top of
506 * the queue, and if not then this pad is not ready unless it is also EOS */
507 if (!pad->priv->clipped_buffer
508 && !GST_IS_BUFFER (g_queue_peek_tail (&pad->priv->data))) {
509 /* We must not have any buffers at all in this pad then as otherwise we
510 * would've had an event/query at the top of the queue */
511 g_assert (pad->priv->num_buffers == 0);
512
513 /* Only consider this pad as worth waiting for if it's not already EOS.
514 * There's no point in waiting for buffers on EOS pads */
515 if (!pad->priv->eos)
516 have_buffer = FALSE;
517 else
518 n_ready++;
519 } else if (self->priv->peer_latency_live) {
520 /* In live mode, having a single pad with buffers is enough to
521 * generate a start time from it. In non-live mode all pads need
522 * to have a buffer
523 */
524 self->priv->first_buffer = FALSE;
525 n_ready++;
526 }
527
528 PAD_UNLOCK (pad);
529 }
530
531 if (self->priv->ignore_inactive_pads && self->priv->peer_latency_live
532 && n_ready == 0)
533 goto no_sinkpads;
534
535 if (have_event_or_query)
536 goto pad_not_ready_but_event_or_query;
537
538 if (!have_buffer)
539 goto pad_not_ready;
540
541 if (have_buffer)
542 self->priv->first_buffer = FALSE;
543
544 GST_OBJECT_UNLOCK (self);
545 GST_LOG_OBJECT (self, "pads are ready");
546
547 if (have_event_or_query_ret)
548 *have_event_or_query_ret = have_event_or_query;
549
550 return TRUE;
551
552 no_sinkpads:
553 {
554 GST_LOG_OBJECT (self, "pads not ready: no sink pads");
555 GST_OBJECT_UNLOCK (self);
556
557 if (have_event_or_query_ret)
558 *have_event_or_query_ret = have_event_or_query;
559
560 return FALSE;
561 }
562 pad_not_ready:
563 {
564 GST_LOG_OBJECT (pad, "pad not ready to be aggregated yet");
565 GST_OBJECT_UNLOCK (self);
566
567 if (have_event_or_query_ret)
568 *have_event_or_query_ret = have_event_or_query;
569
570 return FALSE;
571 }
572 pad_not_ready_but_event_or_query:
573 {
574 GST_LOG_OBJECT (pad,
575 "pad not ready to be aggregated yet, need to handle serialized event or query first");
576 GST_OBJECT_UNLOCK (self);
577
578 if (have_event_or_query_ret)
579 *have_event_or_query_ret = have_event_or_query;
580
581 return FALSE;
582 }
583 }
584
585 static void
586 gst_aggregator_reset_flow_values (GstAggregator * self)
587 {
588 GST_OBJECT_LOCK (self);
589 self->priv->send_stream_start = TRUE;
590 self->priv->send_segment = TRUE;
591 gst_segment_init (&GST_AGGREGATOR_PAD (self->srcpad)->segment,
592 GST_FORMAT_TIME);
593 /* Initialize to -1 so we set it to the start position once the first buffer
594 * is handled in gst_aggregator_pad_chain_internal() */
595 GST_AGGREGATOR_PAD (self->srcpad)->segment.position = -1;
596 self->priv->first_buffer = TRUE;
597 GST_OBJECT_UNLOCK (self);
598 }
599
600 static inline void
601 gst_aggregator_push_mandatory_events (GstAggregator * self, gboolean up_to_caps)
602 {
603 GstAggregatorPrivate *priv = self->priv;
604 GstEvent *segment = NULL;
605 GstEvent *tags = NULL;
606
607 if (self->priv->send_stream_start) {
608 gchar s_id[32];
609
610 GST_INFO_OBJECT (self, "pushing stream start");
611 /* stream-start (FIXME: create id based on input ids) */
612 g_snprintf (s_id, sizeof (s_id), "agg-%08x", g_random_int ());
613 if (!gst_pad_push_event (GST_PAD (self->srcpad),
614 gst_event_new_stream_start (s_id))) {
615 GST_WARNING_OBJECT (self->srcpad, "Sending stream start event failed");
616 }
617 self->priv->send_stream_start = FALSE;
618 }
619
620 if (self->priv->srccaps) {
621 GST_INFO_OBJECT (self, "pushing caps: %" GST_PTR_FORMAT,
622 self->priv->srccaps);
623 if (!gst_pad_push_event (GST_PAD (self->srcpad),
624 gst_event_new_caps (self->priv->srccaps))) {
625 GST_WARNING_OBJECT (self->srcpad, "Sending caps event failed");
626 }
627 gst_caps_unref (self->priv->srccaps);
628 self->priv->srccaps = NULL;
629 }
630
631 if (up_to_caps)
632 return;
633
634 GST_OBJECT_LOCK (self);
635 if (self->priv->send_segment && !self->priv->flushing) {
636 segment =
637 gst_event_new_segment (&GST_AGGREGATOR_PAD (self->srcpad)->segment);
638
639 if (!self->priv->seqnum)
640 /* This code-path is in preparation to be able to run without a source
641 * connected. Then we won't have a seq-num from a segment event. */
642 self->priv->seqnum = gst_event_get_seqnum (segment);
643 else
644 gst_event_set_seqnum (segment, self->priv->seqnum);
645 self->priv->send_segment = FALSE;
646
647 GST_DEBUG_OBJECT (self, "pushing segment %" GST_PTR_FORMAT, segment);
648 }
649
650 if (priv->tags && priv->tags_changed && !self->priv->flushing) {
651 tags = gst_event_new_tag (gst_tag_list_ref (priv->tags));
652 priv->tags_changed = FALSE;
653 }
654 GST_OBJECT_UNLOCK (self);
655
656 if (segment)
657 gst_pad_push_event (self->srcpad, segment);
658 if (tags)
659 gst_pad_push_event (self->srcpad, tags);
660 }
661
662 /**
663 * gst_aggregator_set_src_caps:
664 * @self: The #GstAggregator
665 * @caps: The #GstCaps to set on the src pad.
666 *
667 * Sets the caps to be used on the src pad.
668 */
669 void
670 gst_aggregator_set_src_caps (GstAggregator * self, GstCaps * caps)
671 {
672 GstCaps *old_caps;
673
674 GST_PAD_STREAM_LOCK (self->srcpad);
675
676 if (caps && (old_caps = gst_pad_get_current_caps (self->srcpad))) {
677 if (gst_caps_is_equal (caps, old_caps)) {
678 GST_DEBUG_OBJECT (self,
679 "New caps are the same as the previously set caps %" GST_PTR_FORMAT,
680 old_caps);
681 gst_caps_unref (old_caps);
682 GST_PAD_STREAM_UNLOCK (self->srcpad);
683 return;
684 }
685 gst_caps_unref (old_caps);
686 }
687
688 gst_caps_replace (&self->priv->srccaps, caps);
689 gst_aggregator_push_mandatory_events (self, TRUE);
690 GST_PAD_STREAM_UNLOCK (self->srcpad);
691 }
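
/* Illustrative usage sketch: a muxer-style subclass that decides its output
 * caps itself would typically do something like the following once, before
 * its first gst_aggregator_finish_buffer() call (the media type is made up).
 * Note that gst_aggregator_set_src_caps() does not take ownership of the
 * caller's caps reference:
 *
 *   GstCaps *caps = gst_caps_new_empty_simple ("application/x-my-mux");
 *
 *   gst_aggregator_set_src_caps (GST_AGGREGATOR (self), caps);
 *   gst_caps_unref (caps);
 */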
692
693 static GstFlowReturn
694 gst_aggregator_default_finish_buffer (GstAggregator * self, GstBuffer * buffer)
695 {
696 gst_aggregator_push_mandatory_events (self, FALSE);
697
698 GST_OBJECT_LOCK (self);
699 if (!self->priv->flushing && gst_pad_is_active (self->srcpad)) {
700 GST_TRACE_OBJECT (self, "pushing buffer %" GST_PTR_FORMAT, buffer);
701 GST_OBJECT_UNLOCK (self);
702 return gst_pad_push (self->srcpad, buffer);
703 } else {
704 GST_INFO_OBJECT (self, "Not pushing (active: %i, flushing: %i)",
705 self->priv->flushing, gst_pad_is_active (self->srcpad));
706 GST_OBJECT_UNLOCK (self);
707 gst_buffer_unref (buffer);
708 return GST_FLOW_OK;
709 }
710 }
711
712 /**
713 * gst_aggregator_finish_buffer:
714 * @aggregator: The #GstAggregator
715 * @buffer: (transfer full): the #GstBuffer to push.
716 *
717 * This method will push the provided output buffer downstream. If needed,
718 * mandatory events such as stream-start, caps, and segment events will be
719 * sent before pushing the buffer.
720 */
721 GstFlowReturn
722 gst_aggregator_finish_buffer (GstAggregator * aggregator, GstBuffer * buffer)
723 {
724 GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (aggregator);
725
726 g_assert (klass->finish_buffer != NULL);
727
728 return klass->finish_buffer (aggregator, buffer);
729 }
730
731 static GstFlowReturn
732 gst_aggregator_default_finish_buffer_list (GstAggregator * self,
733 GstBufferList * bufferlist)
734 {
735 gst_aggregator_push_mandatory_events (self, FALSE);
736
737 GST_OBJECT_LOCK (self);
738 if (!self->priv->flushing && gst_pad_is_active (self->srcpad)) {
739 GST_TRACE_OBJECT (self, "pushing bufferlist %" GST_PTR_FORMAT, bufferlist);
740 GST_OBJECT_UNLOCK (self);
741 return gst_pad_push_list (self->srcpad, bufferlist);
742 } else {
743 GST_INFO_OBJECT (self, "Not pushing (active: %i, flushing: %i)",
744 self->priv->flushing, gst_pad_is_active (self->srcpad));
745 GST_OBJECT_UNLOCK (self);
746 gst_buffer_list_unref (bufferlist);
747 return GST_FLOW_OK;
748 }
749 }
750
751 /**
752 * gst_aggregator_finish_buffer_list:
753 * @aggregator: The #GstAggregator
754 * @bufferlist: (transfer full): the #GstBufferList to push.
755 *
756 * This method will push the provided output buffer list downstream. If needed,
757 * mandatory events such as stream-start, caps, and segment events will be
758 * sent before pushing the buffer list.
759 *
760 * Since: 1.18
761 */
762 GstFlowReturn
763 gst_aggregator_finish_buffer_list (GstAggregator * aggregator,
764 GstBufferList * bufferlist)
765 {
766 GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (aggregator);
767
768 g_assert (klass->finish_buffer_list != NULL);
769
770 return klass->finish_buffer_list (aggregator, bufferlist);
771 }
772
773 static void
774 gst_aggregator_push_eos (GstAggregator * self)
775 {
776 GstEvent *event;
777 gst_aggregator_push_mandatory_events (self, FALSE);
778
779 event = gst_event_new_eos ();
780
781 GST_OBJECT_LOCK (self);
782 self->priv->send_eos = FALSE;
783 gst_event_set_seqnum (event, self->priv->seqnum);
784 GST_OBJECT_UNLOCK (self);
785
786 gst_pad_push_event (self->srcpad, event);
787 }
788
789 static GstClockTime
790 gst_aggregator_get_next_time (GstAggregator * self)
791 {
792 GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self);
793
794 if (klass->get_next_time)
795 return klass->get_next_time (self);
796
797 return GST_CLOCK_TIME_NONE;
798 }
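
/* Illustrative sketch of a #GstAggregatorClass::get_next_time implementation
 * for a live, timeout-driven subclass: return the running time of the next
 * output, here derived from the src pad segment position. The core also
 * ships a comparable helper, gst_aggregator_simple_get_next_time()
 * (since 1.16). The "my_agg" name is made up:
 *
 *   static GstClockTime
 *   my_agg_get_next_time (GstAggregator * agg)
 *   {
 *     GstSegment *segment = &GST_AGGREGATOR_PAD (agg->srcpad)->segment;
 *     GstClockTime position;
 *
 *     GST_OBJECT_LOCK (agg);
 *     position = (segment->position == GST_CLOCK_TIME_NONE) ?
 *         segment->start : segment->position;
 *     position = gst_segment_to_running_time (segment, GST_FORMAT_TIME,
 *         position);
 *     GST_OBJECT_UNLOCK (agg);
 *
 *     return position;
 *   }
 */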
799
800 static gboolean
801 gst_aggregator_wait_and_check (GstAggregator * self, gboolean * timeout)
802 {
803 GstClockTime latency;
804 GstClockTime start;
805 gboolean res;
806 gboolean have_event_or_query = FALSE;
807
808 *timeout = FALSE;
809
810 SRC_LOCK (self);
811
812 latency = gst_aggregator_get_latency_unlocked (self);
813
814 if (gst_aggregator_check_pads_ready (self, &have_event_or_query)) {
815 GST_DEBUG_OBJECT (self, "all pads have data");
816 SRC_UNLOCK (self);
817
818 return TRUE;
819 }
820
821 /* If we have an event or query, return FALSE instead of waiting, so that
822 * it can be handled immediately */
823 if (have_event_or_query) {
824 GST_DEBUG_OBJECT (self, "Have serialized event or query to handle first");
825 SRC_UNLOCK (self);
826 return FALSE;
827 }
828
829 /* Before waiting, check if we're actually still running */
830 if (!self->priv->running || !self->priv->send_eos) {
831 SRC_UNLOCK (self);
832
833 return FALSE;
834 }
835
836 start = gst_aggregator_get_next_time (self);
837
838 /* If we're not live, or if we use the running time
839 * of the first buffer as start time, we wait until
840 * all pads have buffers.
841 * Otherwise (i.e. if we are live!), we wait on the clock
842 * and if a pad does not have a buffer in time we ignore
843 * that pad.
844 */
845 GST_OBJECT_LOCK (self);
846 if (!GST_CLOCK_TIME_IS_VALID (latency) ||
847 !GST_IS_CLOCK (GST_ELEMENT_CLOCK (self)) ||
848 !GST_CLOCK_TIME_IS_VALID (start) ||
849 (self->priv->first_buffer
850 && self->priv->start_time_selection ==
851 GST_AGGREGATOR_START_TIME_SELECTION_FIRST)) {
852 /* We wake up here when something has happened, and then
853 * check below whether we're ready now. If we return FALSE,
854 * we will be called again directly.
855 */
856 GST_OBJECT_UNLOCK (self);
857 SRC_WAIT (self);
858 } else {
859 GstClockTime base_time, time;
860 GstClock *clock;
861 GstClockReturn status;
862 GstClockTimeDiff jitter;
863
864 GST_DEBUG_OBJECT (self, "got subclass start time: %" GST_TIME_FORMAT,
865 GST_TIME_ARGS (start));
866
867 base_time = GST_ELEMENT_CAST (self)->base_time;
868 clock = gst_object_ref (GST_ELEMENT_CLOCK (self));
869 GST_OBJECT_UNLOCK (self);
870
871 time = base_time + start;
872 time += latency;
873
874 GST_DEBUG_OBJECT (self, "possibly waiting for clock to reach %"
875 GST_TIME_FORMAT " (base %" GST_TIME_FORMAT " start %" GST_TIME_FORMAT
876 " latency %" GST_TIME_FORMAT " current %" GST_TIME_FORMAT ")",
877 GST_TIME_ARGS (time),
878 GST_TIME_ARGS (base_time),
879 GST_TIME_ARGS (start), GST_TIME_ARGS (latency),
880 GST_TIME_ARGS (gst_clock_get_time (clock)));
881
882 self->priv->aggregate_id = gst_clock_new_single_shot_id (clock, time);
883 gst_object_unref (clock);
884 SRC_UNLOCK (self);
885
886 jitter = 0;
887 status = gst_clock_id_wait (self->priv->aggregate_id, &jitter);
888
889 SRC_LOCK (self);
890 if (self->priv->aggregate_id) {
891 gst_clock_id_unref (self->priv->aggregate_id);
892 self->priv->aggregate_id = NULL;
893 }
894
895 GST_DEBUG_OBJECT (self,
896 "clock returned %d (jitter: %" GST_STIME_FORMAT ")",
897 status, GST_STIME_ARGS (jitter));
898
899 /* we timed out */
900 if (status == GST_CLOCK_OK || status == GST_CLOCK_EARLY) {
901 GList *l;
902
903 GST_OBJECT_LOCK (self);
904 for (l = GST_ELEMENT_CAST (self)->sinkpads; l != NULL; l = l->next) {
905 GstAggregatorPad *pad = GST_AGGREGATOR_PAD (l->data);
906
907 PAD_LOCK (pad);
908 pad->priv->waited_once = TRUE;
909 PAD_UNLOCK (pad);
910 }
911 GST_OBJECT_UNLOCK (self);
912
913 SRC_UNLOCK (self);
914 *timeout = TRUE;
915 return TRUE;
916 }
917 }
918
919 res = gst_aggregator_check_pads_ready (self, NULL);
920 SRC_UNLOCK (self);
921
922 return res;
923 }
924
925 typedef struct
926 {
927 gboolean processed_event;
928 GstFlowReturn flow_ret;
929 } DoHandleEventsAndQueriesData;
930
931 static gboolean
932 gst_aggregator_do_events_and_queries (GstElement * self, GstPad * epad,
933 gpointer user_data)
934 {
935 GstAggregatorPad *pad = GST_AGGREGATOR_PAD_CAST (epad);
936 GstAggregator *aggregator = GST_AGGREGATOR_CAST (self);
937 GstEvent *event = NULL;
938 GstQuery *query = NULL;
939 GstAggregatorClass *klass = NULL;
940 DoHandleEventsAndQueriesData *data = user_data;
941
942 do {
943 event = NULL;
944 query = NULL;
945
946 PAD_LOCK (pad);
947 if (pad->priv->clipped_buffer == NULL &&
948 !GST_IS_BUFFER (g_queue_peek_tail (&pad->priv->data))) {
949 if (GST_IS_EVENT (g_queue_peek_tail (&pad->priv->data)))
950 event = gst_event_ref (g_queue_peek_tail (&pad->priv->data));
951 if (GST_IS_QUERY (g_queue_peek_tail (&pad->priv->data)))
952 query = g_queue_peek_tail (&pad->priv->data);
953 }
954 PAD_UNLOCK (pad);
955 if (event || query) {
956 gboolean ret;
957
958 data->processed_event = TRUE;
959 if (klass == NULL)
960 klass = GST_AGGREGATOR_GET_CLASS (self);
961
962 if (event) {
963 GST_LOG_OBJECT (pad, "Processing %" GST_PTR_FORMAT, event);
964 gst_event_ref (event);
965 ret = klass->sink_event (aggregator, pad, event);
966
967 PAD_LOCK (pad);
968 if (GST_EVENT_TYPE (event) == GST_EVENT_CAPS) {
969 pad->priv->negotiated = ret;
970 }
971 if (g_queue_peek_tail (&pad->priv->data) == event)
972 gst_event_unref (g_queue_pop_tail (&pad->priv->data));
973 gst_event_unref (event);
974 } else if (query) {
975 GST_LOG_OBJECT (pad, "Processing %" GST_PTR_FORMAT, query);
976 ret = klass->sink_query (aggregator, pad, query);
977
978 PAD_LOCK (pad);
979 if (g_queue_peek_tail (&pad->priv->data) == query) {
980 GstStructure *s;
981
982 s = gst_query_writable_structure (query);
983 gst_structure_set (s, "gst-aggregator-retval", G_TYPE_BOOLEAN, ret,
984 NULL);
985 g_queue_pop_tail (&pad->priv->data);
986 }
987 }
988
989 PAD_BROADCAST_EVENT (pad);
990 PAD_UNLOCK (pad);
991 }
992 } while (event || query);
993
994 return TRUE;
995 }
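
/* Illustrative sketch of a subclass #GstAggregatorClass::sink_event override
 * as invoked by the dispatcher above. The "my_agg" names and
 * my_agg_parent_class (as created by G_DEFINE_TYPE) are hypothetical;
 * chaining up keeps the default segment/EOS bookkeeping intact:
 *
 *   static gboolean
 *   my_agg_sink_event (GstAggregator * agg, GstAggregatorPad * pad,
 *       GstEvent * event)
 *   {
 *     if (GST_EVENT_TYPE (event) == GST_EVENT_CAPS) {
 *       GstCaps *caps;
 *
 *       gst_event_parse_caps (event, &caps);
 *       GST_DEBUG_OBJECT (pad, "input caps: %" GST_PTR_FORMAT, caps);
 *       // a real element would validate/store these caps here
 *     }
 *
 *     return GST_AGGREGATOR_CLASS (my_agg_parent_class)->sink_event (agg,
 *         pad, event);
 *   }
 */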
996
997 static gboolean
998 gst_aggregator_pad_skip_buffers (GstElement * self, GstPad * epad,
999 gpointer user_data)
1000 {
1001 GList *item;
1002 GstAggregatorPad *aggpad = (GstAggregatorPad *) epad;
1003 GstAggregator *agg = (GstAggregator *) self;
1004 GstAggregatorPadClass *klass = GST_AGGREGATOR_PAD_GET_CLASS (aggpad);
1005
1006 if (!klass->skip_buffer)
1007 return FALSE;
1008
1009 PAD_LOCK (aggpad);
1010
1011 item = g_queue_peek_tail_link (&aggpad->priv->data);
1012 while (item) {
1013 GList *prev = item->prev;
1014
1015 if (GST_IS_BUFFER (item->data)
1016 && klass->skip_buffer (aggpad, agg, item->data)) {
1017 GST_LOG_OBJECT (aggpad, "Skipping %" GST_PTR_FORMAT, item->data);
1018 gst_aggregator_pad_buffer_consumed (aggpad, GST_BUFFER (item->data),
1019 TRUE);
1020 gst_buffer_unref (item->data);
1021 g_queue_delete_link (&aggpad->priv->data, item);
1022 } else {
1023 break;
1024 }
1025
1026 item = prev;
1027 }
1028
1029 PAD_UNLOCK (aggpad);
1030
1031 return TRUE;
1032 }
1033
1034 static gboolean
1035 gst_aggregator_pad_reset_peeked_buffer (GstElement * self, GstPad * epad,
1036 gpointer user_data)
1037 {
1038 GstAggregatorPad *aggpad = (GstAggregatorPad *) epad;
1039
1040 PAD_LOCK (aggpad);
1041
1042 gst_buffer_replace (&aggpad->priv->peeked_buffer, NULL);
1043
1044 PAD_UNLOCK (aggpad);
1045
1046 return TRUE;
1047 }
1048
1049
1050 static void
1051 gst_aggregator_pad_set_flushing (GstAggregatorPad * aggpad,
1052 GstFlowReturn flow_return, gboolean full)
1053 {
1054 GList *item;
1055
1056 PAD_LOCK (aggpad);
1057 if (flow_return == GST_FLOW_NOT_LINKED)
1058 aggpad->priv->flow_return = MIN (flow_return, aggpad->priv->flow_return);
1059 else
1060 aggpad->priv->flow_return = flow_return;
1061
1062 item = g_queue_peek_head_link (&aggpad->priv->data);
1063 while (item) {
1064 GList *next = item->next;
1065
1066 /* In a partial flush, we do the same as the pad itself: we get rid of
1067 * buffers, non-sticky events and EOS/SEGMENT events.
1068 */
1069 if (full || GST_IS_BUFFER (item->data) ||
1070 GST_EVENT_TYPE (item->data) == GST_EVENT_EOS ||
1071 GST_EVENT_TYPE (item->data) == GST_EVENT_SEGMENT ||
1072 !GST_EVENT_IS_STICKY (item->data)) {
1073 if (!GST_IS_QUERY (item->data))
1074 gst_mini_object_unref (item->data);
1075 g_queue_delete_link (&aggpad->priv->data, item);
1076 }
1077 item = next;
1078 }
1079 aggpad->priv->num_buffers = 0;
1080 gst_buffer_replace (&aggpad->priv->clipped_buffer, NULL);
1081
1082 PAD_BROADCAST_EVENT (aggpad);
1083 PAD_UNLOCK (aggpad);
1084 }
1085
1086 static GstFlowReturn
1087 gst_aggregator_default_update_src_caps (GstAggregator * agg, GstCaps * caps,
1088 GstCaps ** ret)
1089 {
1090 *ret = gst_caps_ref (caps);
1091
1092 return GST_FLOW_OK;
1093 }
1094
1095 static GstCaps *
1096 gst_aggregator_default_fixate_src_caps (GstAggregator * agg, GstCaps * caps)
1097 {
1098 caps = gst_caps_fixate (caps);
1099
1100 return caps;
1101 }
1102
1103 static gboolean
1104 gst_aggregator_default_negotiated_src_caps (GstAggregator * agg, GstCaps * caps)
1105 {
1106 return TRUE;
1107 }
1108
1109
1110 /* takes ownership of the pool, allocator and query */
1111 static gboolean
1112 gst_aggregator_set_allocation (GstAggregator * self,
1113 GstBufferPool * pool, GstAllocator * allocator,
1114 const GstAllocationParams * params, GstQuery * query)
1115 {
1116 GstAllocator *oldalloc;
1117 GstBufferPool *oldpool;
1118 GstQuery *oldquery;
1119
1120 GST_DEBUG ("storing allocation query");
1121
1122 GST_OBJECT_LOCK (self);
1123 oldpool = self->priv->pool;
1124 self->priv->pool = pool;
1125
1126 oldalloc = self->priv->allocator;
1127 self->priv->allocator = allocator;
1128
1129 oldquery = self->priv->allocation_query;
1130 self->priv->allocation_query = query;
1131
1132 if (params)
1133 self->priv->allocation_params = *params;
1134 else
1135 gst_allocation_params_init (&self->priv->allocation_params);
1136 GST_OBJECT_UNLOCK (self);
1137
1138 if (oldpool) {
1139 GST_DEBUG_OBJECT (self, "deactivating old pool %p", oldpool);
1140 gst_buffer_pool_set_active (oldpool, FALSE);
1141 gst_object_unref (oldpool);
1142 }
1143 if (oldalloc) {
1144 gst_object_unref (oldalloc);
1145 }
1146 if (oldquery) {
1147 gst_query_unref (oldquery);
1148 }
1149 return TRUE;
1150 }
1151
1152
1153 static gboolean
1154 gst_aggregator_decide_allocation (GstAggregator * self, GstQuery * query)
1155 {
1156 GstAggregatorClass *aggclass = GST_AGGREGATOR_GET_CLASS (self);
1157
1158 if (aggclass->decide_allocation)
1159 if (!aggclass->decide_allocation (self, query))
1160 return FALSE;
1161
1162 return TRUE;
1163 }
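
/* Illustrative sketch of a subclass #GstAggregatorClass::decide_allocation
 * override called from the helper above: it keeps whatever pool downstream
 * proposed but enforces a minimum of two buffers (the policy and the
 * "my_agg" name are made up):
 *
 *   static gboolean
 *   my_agg_decide_allocation (GstAggregator * agg, GstQuery * query)
 *   {
 *     if (gst_query_get_n_allocation_pools (query) > 0) {
 *       GstBufferPool *pool = NULL;
 *       guint size, min, max;
 *
 *       gst_query_parse_nth_allocation_pool (query, 0, &pool, &size, &min,
 *           &max);
 *       min = MAX (min, 2);
 *       gst_query_set_nth_allocation_pool (query, 0, pool, size, min, max);
 *       if (pool)
 *         gst_object_unref (pool);
 *     }
 *
 *     return TRUE;
 *   }
 */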
1164
1165 static gboolean
1166 gst_aggregator_do_allocation (GstAggregator * self, GstCaps * caps)
1167 {
1168 GstQuery *query;
1169 gboolean result = TRUE;
1170 GstBufferPool *pool = NULL;
1171 GstAllocator *allocator;
1172 GstAllocationParams params;
1173
1174 /* find a pool for the negotiated caps now */
1175 GST_DEBUG_OBJECT (self, "doing allocation query");
1176 query = gst_query_new_allocation (caps, TRUE);
1177 if (!gst_pad_peer_query (self->srcpad, query)) {
1178 /* not a problem, just debug a little */
1179 GST_DEBUG_OBJECT (self, "peer ALLOCATION query failed");
1180 }
1181
1182 GST_DEBUG_OBJECT (self, "calling decide_allocation");
1183 result = gst_aggregator_decide_allocation (self, query);
1184
1185 GST_DEBUG_OBJECT (self, "ALLOCATION (%d) params: %" GST_PTR_FORMAT, result,
1186 query);
1187
1188 if (!result)
1189 goto no_decide_allocation;
1190
1191 /* we got a configuration from our peer or the decide_allocation method,
1192 * parse it now */
1193 if (gst_query_get_n_allocation_params (query) > 0) {
1194 gst_query_parse_nth_allocation_param (query, 0, &allocator, &params);
1195 } else {
1196 allocator = NULL;
1197 gst_allocation_params_init (&params);
1198 }
1199
1200 if (gst_query_get_n_allocation_pools (query) > 0)
1201 gst_query_parse_nth_allocation_pool (query, 0, &pool, NULL, NULL, NULL);
1202
1203 /* now store */
1204 result =
1205 gst_aggregator_set_allocation (self, pool, allocator, &params, query);
1206
1207 return result;
1208
1209 /* Errors */
1210 no_decide_allocation:
1211 {
1212 GST_WARNING_OBJECT (self, "Failed to decide allocation");
1213 gst_query_unref (query);
1214
1215 return result;
1216 }
1217
1218 }
1219
1220 static gboolean
1221 gst_aggregator_default_negotiate (GstAggregator * self)
1222 {
1223 GstAggregatorClass *agg_klass = GST_AGGREGATOR_GET_CLASS (self);
1224 GstCaps *downstream_caps, *template_caps, *caps = NULL;
1225 GstFlowReturn ret = GST_FLOW_OK;
1226
1227 template_caps = gst_pad_get_pad_template_caps (self->srcpad);
1228 downstream_caps = gst_pad_peer_query_caps (self->srcpad, template_caps);
1229
1230 if (gst_caps_is_empty (downstream_caps)) {
1231 GST_INFO_OBJECT (self, "Downstream caps (%"
1232 GST_PTR_FORMAT ") not compatible with pad template caps (%"
1233 GST_PTR_FORMAT ")", downstream_caps, template_caps);
1234 ret = GST_FLOW_NOT_NEGOTIATED;
1235 goto done;
1236 }
1237
1238 g_assert (agg_klass->update_src_caps);
1239 GST_DEBUG_OBJECT (self, "updating caps from %" GST_PTR_FORMAT,
1240 downstream_caps);
1241 ret = agg_klass->update_src_caps (self, downstream_caps, &caps);
1242 if (ret < GST_FLOW_OK) {
1243 GST_WARNING_OBJECT (self, "Subclass failed to update provided caps");
1244 goto done;
1245 } else if (ret == GST_AGGREGATOR_FLOW_NEED_DATA) {
1246 GST_DEBUG_OBJECT (self, "Subclass needs more data to decide on caps");
1247 goto done;
1248 }
1249 if ((caps == NULL || gst_caps_is_empty (caps)) && ret >= GST_FLOW_OK) {
1250 ret = GST_FLOW_NOT_NEGOTIATED;
1251 goto done;
1252 }
1253 GST_DEBUG_OBJECT (self, " to %" GST_PTR_FORMAT, caps);
1254
1255 #ifdef GST_ENABLE_EXTRA_CHECKS
1256 if (!gst_caps_is_subset (caps, template_caps)) {
1257 GstCaps *intersection;
1258
1259 GST_ERROR_OBJECT (self,
1260 "update_src_caps returned caps %" GST_PTR_FORMAT
1261 " which are not a real subset of the template caps %"
1262 GST_PTR_FORMAT, caps, template_caps);
1263 g_warning ("%s: update_src_caps returned caps which are not a real "
1264 "subset of the filter caps", GST_ELEMENT_NAME (self));
1265
1266 intersection =
1267 gst_caps_intersect_full (template_caps, caps, GST_CAPS_INTERSECT_FIRST);
1268 gst_caps_unref (caps);
1269 caps = intersection;
1270 }
1271 #endif
1272
1273 if (gst_caps_is_any (caps)) {
1274 goto done;
1275 }
1276
1277 if (!gst_caps_is_fixed (caps)) {
1278 g_assert (agg_klass->fixate_src_caps);
1279
1280 GST_DEBUG_OBJECT (self, "fixate caps from %" GST_PTR_FORMAT, caps);
1281 if (!(caps = agg_klass->fixate_src_caps (self, caps))) {
1282 GST_WARNING_OBJECT (self, "Subclass failed to fixate provided caps");
1283 ret = GST_FLOW_NOT_NEGOTIATED;
1284 goto done;
1285 }
1286 GST_DEBUG_OBJECT (self, " to %" GST_PTR_FORMAT, caps);
1287 }
1288
1289 if (agg_klass->negotiated_src_caps) {
1290 if (!agg_klass->negotiated_src_caps (self, caps)) {
1291 GST_WARNING_OBJECT (self, "Subclass failed to accept negotiated caps");
1292 ret = GST_FLOW_NOT_NEGOTIATED;
1293 goto done;
1294 }
1295 }
1296
1297 gst_aggregator_set_src_caps (self, caps);
1298
1299 if (!gst_aggregator_do_allocation (self, caps)) {
1300 GST_WARNING_OBJECT (self, "Allocation negotiation failed");
1301 ret = GST_FLOW_NOT_NEGOTIATED;
1302 }
1303
1304 done:
1305 gst_caps_unref (downstream_caps);
1306 gst_caps_unref (template_caps);
1307
1308 if (caps)
1309 gst_caps_unref (caps);
1310
1311 return ret >= GST_FLOW_OK || ret == GST_AGGREGATOR_FLOW_NEED_DATA;
1312 }
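
/* Illustrative sketch of the two vfuncs the default negotiation above relies
 * on, for a hypothetical subclass; my_agg_have_input_caps() is a made-up
 * helper standing in for whatever state the element needs before it can
 * decide on output caps:
 *
 *   static GstFlowReturn
 *   my_agg_update_src_caps (GstAggregator * agg, GstCaps * downstream_caps,
 *       GstCaps ** ret)
 *   {
 *     if (!my_agg_have_input_caps (MY_AGG (agg)))
 *       return GST_AGGREGATOR_FLOW_NEED_DATA;  // try again once data arrives
 *
 *     *ret = gst_caps_copy (downstream_caps);
 *     return GST_FLOW_OK;
 *   }
 *
 *   static GstCaps *
 *   my_agg_fixate_src_caps (GstAggregator * agg, GstCaps * caps)
 *   {
 *     caps = gst_caps_make_writable (caps);
 *     // pin element-specific fields here, then let the core fixate the rest
 *     return gst_caps_fixate (caps);
 *   }
 */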
1313
1314 /* WITH SRC_LOCK held */
1315 static gboolean
1316 gst_aggregator_negotiate_unlocked (GstAggregator * self)
1317 {
1318 GstAggregatorClass *agg_klass = GST_AGGREGATOR_GET_CLASS (self);
1319
1320 if (agg_klass->negotiate)
1321 return agg_klass->negotiate (self);
1322
1323 return TRUE;
1324 }
1325
1326 /**
1327 * gst_aggregator_negotiate:
1328 * @self: a #GstAggregator
1329 *
1330 * Negotiates src pad caps with downstream elements.
1331 * Unmarks GST_PAD_FLAG_NEED_RECONFIGURE in any case. But marks it again
1332 * if #GstAggregatorClass::negotiate fails.
1333 *
1334 * Returns: %TRUE if the negotiation succeeded, else %FALSE.
1335 *
1336 * Since: 1.18
1337 */
1338 gboolean
1339 gst_aggregator_negotiate (GstAggregator * self)
1340 {
1341 gboolean ret = TRUE;
1342
1343 g_return_val_if_fail (GST_IS_AGGREGATOR (self), FALSE);
1344
1345 GST_PAD_STREAM_LOCK (GST_AGGREGATOR_SRC_PAD (self));
1346 gst_pad_check_reconfigure (GST_AGGREGATOR_SRC_PAD (self));
1347 ret = gst_aggregator_negotiate_unlocked (self);
1348 if (!ret)
1349 gst_pad_mark_reconfigure (GST_AGGREGATOR_SRC_PAD (self));
1350 GST_PAD_STREAM_UNLOCK (GST_AGGREGATOR_SRC_PAD (self));
1351
1352 return ret;
1353 }
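
/* Illustrative usage sketch: a subclass whose output format changes
 * mid-stream can renegotiate from its aggregate thread ("format_changed" is
 * a hypothetical condition):
 *
 *   if (format_changed) {
 *     gst_pad_mark_reconfigure (GST_AGGREGATOR_SRC_PAD (self));
 *     if (!gst_aggregator_negotiate (GST_AGGREGATOR (self)))
 *       return GST_FLOW_NOT_NEGOTIATED;
 *   }
 */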
1354
1355 static void
1356 gst_aggregator_aggregate_func (GstAggregator * self)
1357 {
1358 GstAggregatorPrivate *priv = self->priv;
1359 GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self);
1360 gboolean timeout = FALSE;
1361
1362 if (self->priv->running == FALSE) {
1363 GST_DEBUG_OBJECT (self, "Not running anymore");
1364 return;
1365 }
1366
1367 GST_LOG_OBJECT (self, "Checking aggregate");
1368 while (priv->send_eos && priv->running) {
1369 GstFlowReturn flow_return = GST_FLOW_OK;
1370 DoHandleEventsAndQueriesData events_query_data = { FALSE, GST_FLOW_OK };
1371
1372 gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self),
1373 gst_aggregator_do_events_and_queries, &events_query_data);
1374
1375 if ((flow_return = events_query_data.flow_ret) != GST_FLOW_OK)
1376 goto handle_error;
1377
1378 if (self->priv->peer_latency_live)
1379 gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self),
1380 gst_aggregator_pad_skip_buffers, NULL);
1381
1382 /* Ensure we have buffers ready (either in clipped_buffer or at the head of
1383 * the queue) */
1384 if (!gst_aggregator_wait_and_check (self, &timeout)) {
1385 gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self),
1386 gst_aggregator_pad_reset_peeked_buffer, NULL);
1387 continue;
1388 }
1389
1390 if (gst_pad_check_reconfigure (GST_AGGREGATOR_SRC_PAD (self))) {
1391 if (!gst_aggregator_negotiate_unlocked (self)) {
1392 gst_pad_mark_reconfigure (GST_AGGREGATOR_SRC_PAD (self));
1393 if (GST_PAD_IS_FLUSHING (GST_AGGREGATOR_SRC_PAD (self))) {
1394 flow_return = GST_FLOW_FLUSHING;
1395 } else {
1396 flow_return = GST_FLOW_NOT_NEGOTIATED;
1397 }
1398 }
1399 }
1400
1401 if (timeout || flow_return >= GST_FLOW_OK) {
1402 GST_LOG_OBJECT (self, "Actually aggregating, timeout: %d", timeout);
1403 flow_return = klass->aggregate (self, timeout);
1404 }
1405
1406 gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self),
1407 gst_aggregator_pad_reset_peeked_buffer, NULL);
1408
1409 if (!priv->selected_samples_called_or_warned) {
1410 GST_FIXME_OBJECT (self,
1411 "Subclass should call gst_aggregator_selected_samples() from its "
1412 "aggregate implementation.");
1413 priv->selected_samples_called_or_warned = TRUE;
1414 }
1415
1416 if (flow_return == GST_AGGREGATOR_FLOW_NEED_DATA)
1417 continue;
1418
1419 GST_OBJECT_LOCK (self);
1420 if (flow_return == GST_FLOW_FLUSHING && priv->flushing) {
1421 /* We don't want to set the pads to flushing, but we want to
1422 * stop the thread, so just break here */
1423 GST_OBJECT_UNLOCK (self);
1424 break;
1425 }
1426 GST_OBJECT_UNLOCK (self);
1427
1428 if (flow_return == GST_FLOW_EOS || flow_return == GST_FLOW_ERROR) {
1429 gst_aggregator_push_eos (self);
1430 }
1431
1432 handle_error:
1433 GST_LOG_OBJECT (self, "flow return is %s", gst_flow_get_name (flow_return));
1434
1435 if (flow_return != GST_FLOW_OK) {
1436 GList *item;
1437
1438 GST_OBJECT_LOCK (self);
1439 for (item = GST_ELEMENT (self)->sinkpads; item; item = item->next) {
1440 GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (item->data);
1441
1442 gst_aggregator_pad_set_flushing (aggpad, flow_return, TRUE);
1443 }
1444 GST_OBJECT_UNLOCK (self);
1445 break;
1446 }
1447 }
1448
1449 /* Pause the task here, the only ways to get here are:
1450 * 1) We're stopping, in which case the task is stopped anyway
1451 * 2) We got a flow error above, in which case it might take
1452 * some time to forward the flow return upstream and we
1453 * would otherwise call the task function over and over
1454 * again without doing anything
1455 */
1456 gst_pad_pause_task (self->srcpad);
1457 }
1458
1459 static gboolean
1460 gst_aggregator_start (GstAggregator * self)
1461 {
1462 GstAggregatorClass *klass;
1463 gboolean result;
1464
1465 self->priv->send_stream_start = TRUE;
1466 self->priv->send_segment = TRUE;
1467 self->priv->send_eos = TRUE;
1468 self->priv->srccaps = NULL;
1469
1470 self->priv->has_peer_latency = FALSE;
1471 self->priv->peer_latency_live = FALSE;
1472 self->priv->peer_latency_min = self->priv->peer_latency_max = 0;
1473
1474 gst_aggregator_set_allocation (self, NULL, NULL, NULL, NULL);
1475
1476 klass = GST_AGGREGATOR_GET_CLASS (self);
1477
1478 if (klass->start)
1479 result = klass->start (self);
1480 else
1481 result = TRUE;
1482
1483 return result;
1484 }
1485
1486 static gboolean
1487 gst_aggregator_stop_srcpad_task (GstAggregator * self, GstEvent * flush_start)
1488 {
1489 gboolean res = TRUE;
1490
1491 GST_INFO_OBJECT (self, "%s srcpad task",
1492 flush_start ? "Pausing" : "Stopping");
1493
1494 SRC_LOCK (self);
1495 self->priv->running = FALSE;
1496 SRC_BROADCAST (self);
1497 SRC_UNLOCK (self);
1498
1499 if (flush_start) {
1500 res = gst_pad_push_event (self->srcpad, flush_start);
1501 }
1502
1503 gst_pad_stop_task (self->srcpad);
1504
1505 return res;
1506 }
1507
1508 static void
1509 gst_aggregator_start_srcpad_task (GstAggregator * self)
1510 {
1511 GST_INFO_OBJECT (self, "Starting srcpad task");
1512
1513 self->priv->running = TRUE;
1514 gst_pad_start_task (GST_PAD (self->srcpad),
1515 (GstTaskFunction) gst_aggregator_aggregate_func, self, NULL);
1516 }
1517
1518 static GstFlowReturn
1519 gst_aggregator_flush (GstAggregator * self)
1520 {
1521 GstFlowReturn ret = GST_FLOW_OK;
1522 GstAggregatorPrivate *priv = self->priv;
1523 GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self);
1524
1525 GST_DEBUG_OBJECT (self, "Flushing everything");
1526 GST_OBJECT_LOCK (self);
1527 priv->send_segment = TRUE;
1528 priv->flushing = FALSE;
1529 priv->tags_changed = FALSE;
1530 GST_OBJECT_UNLOCK (self);
1531 if (klass->flush)
1532 ret = klass->flush (self);
1533
1534 return ret;
1535 }
1536
1537
1538 /* Called with GstAggregator's object lock held */
1539
1540 static gboolean
1541 gst_aggregator_all_flush_stop_received (GstAggregator * self, guint32 seqnum)
1542 {
1543 GList *tmp;
1544 GstAggregatorPad *tmppad;
1545
1546 for (tmp = GST_ELEMENT (self)->sinkpads; tmp; tmp = tmp->next) {
1547 tmppad = (GstAggregatorPad *) tmp->data;
1548
1549 if (tmppad->priv->last_flush_stop_seqnum != seqnum)
1550 return FALSE;
1551 }
1552
1553 return TRUE;
1554 }
1555
1556 /* Called with GstAggregator's object lock held */
1557
1558 static gboolean
1559 gst_aggregator_all_flush_start_received (GstAggregator * self, guint32 seqnum)
1560 {
1561 GList *tmp;
1562 GstAggregatorPad *tmppad;
1563
1564 for (tmp = GST_ELEMENT (self)->sinkpads; tmp; tmp = tmp->next) {
1565 tmppad = (GstAggregatorPad *) tmp->data;
1566
1567 if (tmppad->priv->last_flush_start_seqnum != seqnum) {
1568 return FALSE;
1569 }
1570 }
1571
1572 return TRUE;
1573 }
1574
1575 static void
1576 gst_aggregator_flush_start (GstAggregator * self, GstAggregatorPad * aggpad,
1577 GstEvent * event)
1578 {
1579 GstAggregatorPrivate *priv = self->priv;
1580 GstAggregatorPadPrivate *padpriv = aggpad->priv;
1581 guint32 seqnum = gst_event_get_seqnum (event);
1582
1583 gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING, FALSE);
1584
1585 PAD_FLUSH_LOCK (aggpad);
1586 PAD_LOCK (aggpad);
1587 padpriv->last_flush_start_seqnum = seqnum;
1588 PAD_UNLOCK (aggpad);
1589
1590 GST_OBJECT_LOCK (self);
1591
1592 if (!priv->flushing && gst_aggregator_all_flush_start_received (self, seqnum)) {
1593 /* Make sure we don't forward more than one FLUSH_START */
1594 priv->flushing = TRUE;
1595 priv->next_seqnum = seqnum;
1596 GST_OBJECT_UNLOCK (self);
1597
1598 GST_INFO_OBJECT (self, "Flushing, pausing srcpad task");
1599 gst_aggregator_stop_srcpad_task (self, event);
1600
1601 event = NULL;
1602 } else {
1603 gst_event_unref (event);
1604 GST_OBJECT_UNLOCK (self);
1605 }
1606
1607 PAD_FLUSH_UNLOCK (aggpad);
1608 }
1609
1610 /* Must be called with the PAD_LOCK held */
1611 static void
1612 update_time_level (GstAggregatorPad * aggpad, gboolean head)
1613 {
1614 GstAggregatorPadPrivate *priv = aggpad->priv;
1615
1616 if (head) {
1617 if (GST_CLOCK_TIME_IS_VALID (priv->head_position) &&
1618 priv->head_segment.format == GST_FORMAT_TIME)
1619 priv->head_time = gst_segment_to_running_time (&priv->head_segment,
1620 GST_FORMAT_TIME, priv->head_position);
1621 else
1622 priv->head_time = GST_CLOCK_TIME_NONE;
1623
1624 if (!GST_CLOCK_TIME_IS_VALID (priv->tail_time))
1625 priv->tail_time = priv->head_time;
1626 } else {
1627 if (GST_CLOCK_TIME_IS_VALID (priv->tail_position) &&
1628 aggpad->segment.format == GST_FORMAT_TIME)
1629 priv->tail_time = gst_segment_to_running_time (&aggpad->segment,
1630 GST_FORMAT_TIME, priv->tail_position);
1631 else
1632 priv->tail_time = priv->head_time;
1633 }
1634
1635 if (priv->head_time == GST_CLOCK_TIME_NONE ||
1636 priv->tail_time == GST_CLOCK_TIME_NONE) {
1637 priv->time_level = 0;
1638 return;
1639 }
1640
1641 if (priv->tail_time > priv->head_time)
1642 priv->time_level = 0;
1643 else
1644 priv->time_level = priv->head_time - priv->tail_time;
1645 }
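
/* Worked example: if the newest queued data (head) maps to running time 5s
 * and the oldest not-yet-consumed data (tail) maps to running time 3s, then
 * time_level = 5s - 3s = 2s, i.e. roughly two seconds of data are queued on
 * this pad. If either time is unknown, or tail overtakes head (e.g. right
 * after a new segment), time_level is clamped to 0 by the code above.
 */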
1646
1647
1648 /* GstAggregator vmethods default implementations */
1649 static gboolean
1650 gst_aggregator_default_sink_event (GstAggregator * self,
1651 GstAggregatorPad * aggpad, GstEvent * event)
1652 {
1653 gboolean res = TRUE;
1654 GstPad *pad = GST_PAD (aggpad);
1655 GstAggregatorPrivate *priv = self->priv;
1656
1657 GST_DEBUG_OBJECT (aggpad, "Got event: %" GST_PTR_FORMAT, event);
1658
1659 switch (GST_EVENT_TYPE (event)) {
1660 case GST_EVENT_FLUSH_START:
1661 {
1662 gst_aggregator_flush_start (self, aggpad, event);
1663 /* We forward only in one case: right after flushing */
1664 event = NULL;
1665 goto eat;
1666 }
1667 case GST_EVENT_FLUSH_STOP:
1668 {
1669 guint32 seqnum = gst_event_get_seqnum (event);
1670
1671 PAD_FLUSH_LOCK (aggpad);
1672 PAD_LOCK (aggpad);
1673 aggpad->priv->last_flush_stop_seqnum = seqnum;
1674 PAD_UNLOCK (aggpad);
1675
1676 gst_aggregator_pad_flush (aggpad, self);
1677
1678 GST_OBJECT_LOCK (self);
1679 if (priv->flushing
1680 && gst_aggregator_all_flush_stop_received (self, seqnum)) {
1681 GST_OBJECT_UNLOCK (self);
1682 /* That means we received FLUSH_STOP on
1683 * all sinkpads -- seeking is done, forward FLUSH_STOP downstream */
1684 gst_aggregator_flush (self);
1685 gst_pad_push_event (self->srcpad, event);
1686 event = NULL;
1687 SRC_LOCK (self);
1688 priv->send_eos = TRUE;
1689 SRC_BROADCAST (self);
1690 SRC_UNLOCK (self);
1691
1692 GST_INFO_OBJECT (self, "Flush stopped");
1693
1694 gst_aggregator_start_srcpad_task (self);
1695 } else {
1696 GST_OBJECT_UNLOCK (self);
1697 }
1698
1699 PAD_FLUSH_UNLOCK (aggpad);
1700
1701 /* We never forward the event */
1702 goto eat;
1703 }
1704 case GST_EVENT_EOS:
1705 {
1706 SRC_LOCK (self);
1707 PAD_LOCK (aggpad);
1708 g_assert (aggpad->priv->num_buffers == 0);
1709 aggpad->priv->eos = TRUE;
1710 PAD_UNLOCK (aggpad);
1711 SRC_BROADCAST (self);
1712 SRC_UNLOCK (self);
1713 goto eat;
1714 }
1715 case GST_EVENT_SEGMENT:
1716 {
1717 PAD_LOCK (aggpad);
1718 GST_OBJECT_LOCK (aggpad);
1719 gst_event_copy_segment (event, &aggpad->segment);
1720 /* We've got a new segment, tail_position is now meaningless
1721 * and may interfere with the time_level calculation
1722 */
1723 aggpad->priv->tail_position = GST_CLOCK_TIME_NONE;
1724 update_time_level (aggpad, FALSE);
1725 GST_OBJECT_UNLOCK (aggpad);
1726 PAD_UNLOCK (aggpad);
1727
1728 GST_OBJECT_LOCK (self);
1729 self->priv->seqnum = gst_event_get_seqnum (event);
1730 GST_OBJECT_UNLOCK (self);
1731 goto eat;
1732 }
1733 case GST_EVENT_STREAM_START:
1734 {
1735 goto eat;
1736 }
1737 case GST_EVENT_GAP:
1738 {
1739 GstClockTime pts, endpts;
1740 GstClockTime duration;
1741 GstBuffer *gapbuf;
1742
1743 gst_event_parse_gap (event, &pts, &duration);
1744
1745 if (GST_CLOCK_TIME_IS_VALID (duration))
1746 endpts = pts + duration;
1747 else
1748 endpts = GST_CLOCK_TIME_NONE;
1749
1750 GST_OBJECT_LOCK (aggpad);
1751 res = gst_segment_clip (&aggpad->segment, GST_FORMAT_TIME, pts, endpts,
1752 &pts, &endpts);
1753 GST_OBJECT_UNLOCK (aggpad);
1754
1755 if (!res) {
1756 GST_WARNING_OBJECT (self, "GAP event outside segment, dropping");
1757 goto eat;
1758 }
1759
1760 if (GST_CLOCK_TIME_IS_VALID (endpts) && GST_CLOCK_TIME_IS_VALID (pts))
1761 duration = endpts - pts;
1762 else
1763 duration = GST_CLOCK_TIME_NONE;
1764
1765 gapbuf = gst_buffer_new ();
1766 GST_BUFFER_PTS (gapbuf) = pts;
1767 GST_BUFFER_DURATION (gapbuf) = duration;
1768 GST_BUFFER_FLAG_SET (gapbuf, GST_BUFFER_FLAG_GAP);
1769 GST_BUFFER_FLAG_SET (gapbuf, GST_BUFFER_FLAG_DROPPABLE);
1770
1771 /* Remove GAP event so we can replace it with the buffer */
1772 PAD_LOCK (aggpad);
1773 if (g_queue_peek_tail (&aggpad->priv->data) == event)
1774 gst_event_unref (g_queue_pop_tail (&aggpad->priv->data));
1775 PAD_UNLOCK (aggpad);
1776
1777 if (gst_aggregator_pad_chain_internal (self, aggpad, gapbuf, FALSE) !=
1778 GST_FLOW_OK) {
1779 GST_WARNING_OBJECT (self, "Failed to chain gap buffer");
1780 res = FALSE;
1781 }
1782
1783 goto eat;
1784 }
1785 case GST_EVENT_TAG:
1786 goto eat;
1787 default:
1788 {
1789 break;
1790 }
1791 }
1792
1793 GST_DEBUG_OBJECT (pad, "Forwarding event: %" GST_PTR_FORMAT, event);
1794 return gst_pad_event_default (pad, GST_OBJECT (self), event);
1795
1796 eat:
1797 GST_DEBUG_OBJECT (pad, "Eating event: %" GST_PTR_FORMAT, event);
1798 if (event)
1799 gst_event_unref (event);
1800
1801 return res;
1802 }
1803
1804 /* Queue serialized events and let the others go through directly.
1805 * The queued events will be handled from the src-pad task in
1806 * gst_aggregator_do_events_and_queries().
1807 */
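/* For example (informational, not an exhaustive list): CAPS, SEGMENT, TAG,
 * GAP and EOS are serialized and therefore land in the pad queue below,
 * while FLUSH_START (non-serialized) and FLUSH_STOP (explicitly excluded
 * here) are handed to klass->sink_event immediately. */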
1808 static GstFlowReturn
1809 gst_aggregator_default_sink_event_pre_queue (GstAggregator * self,
1810 GstAggregatorPad * aggpad, GstEvent * event)
1811 {
1812 GstFlowReturn ret = GST_FLOW_OK;
1813
1814 if (GST_EVENT_IS_SERIALIZED (event)
1815 && GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP) {
1816 SRC_LOCK (self);
1817 PAD_LOCK (aggpad);
1818
1819 if (aggpad->priv->flow_return != GST_FLOW_OK)
1820 goto flushing;
1821
1822 if (GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT) {
1823 GST_OBJECT_LOCK (aggpad);
1824 gst_event_copy_segment (event, &aggpad->priv->head_segment);
1825 aggpad->priv->head_position = aggpad->priv->head_segment.position;
1826 update_time_level (aggpad, TRUE);
1827 GST_OBJECT_UNLOCK (aggpad);
1828 }
1829
1830 GST_DEBUG_OBJECT (aggpad, "Store event in queue: %" GST_PTR_FORMAT, event);
1831 g_queue_push_head (&aggpad->priv->data, event);
1832 SRC_BROADCAST (self);
1833 PAD_UNLOCK (aggpad);
1834 SRC_UNLOCK (self);
1835 } else {
1836 GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self);
1837
1838 if (!klass->sink_event (self, aggpad, event)) {
1839 /* Copied from GstPad to convert boolean to a GstFlowReturn in
1840 * the event handling func */
1841 ret = GST_FLOW_ERROR;
1842 }
1843 }
1844
1845 return ret;
1846
1847 flushing:
1848 GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping event",
1849 gst_flow_get_name (aggpad->priv->flow_return));
1850 PAD_UNLOCK (aggpad);
1851 SRC_UNLOCK (self);
1852 if (GST_EVENT_IS_STICKY (event))
1853 gst_pad_store_sticky_event (GST_PAD (aggpad), event);
1854 gst_event_unref (event);
1855
1856 return aggpad->priv->flow_return;
1857 }
1858
1859 static gboolean
1860 gst_aggregator_stop_pad (GstElement * self, GstPad * epad, gpointer user_data)
1861 {
1862 GstAggregatorPad *pad = GST_AGGREGATOR_PAD_CAST (epad);
1863 GstAggregator *agg = GST_AGGREGATOR_CAST (self);
1864
1865 gst_aggregator_pad_flush (pad, agg);
1866
1867 PAD_LOCK (pad);
1868 pad->priv->flow_return = GST_FLOW_FLUSHING;
1869 pad->priv->negotiated = FALSE;
1870 PAD_BROADCAST_EVENT (pad);
1871 PAD_UNLOCK (pad);
1872
1873 return TRUE;
1874 }
1875
1876 static gboolean
1877 gst_aggregator_stop (GstAggregator * agg)
1878 {
1879 GstAggregatorClass *klass;
1880 gboolean result;
1881
1882 gst_aggregator_reset_flow_values (agg);
1883
1884 /* Application needs to make sure no pads are added while it shuts us down */
1885 gst_element_foreach_sink_pad (GST_ELEMENT_CAST (agg),
1886 gst_aggregator_stop_pad, NULL);
1887
1888 klass = GST_AGGREGATOR_GET_CLASS (agg);
1889
1890 if (klass->stop)
1891 result = klass->stop (agg);
1892 else
1893 result = TRUE;
1894
1895 agg->priv->has_peer_latency = FALSE;
1896 agg->priv->peer_latency_live = FALSE;
1897 agg->priv->peer_latency_min = agg->priv->peer_latency_max = 0;
1898
1899 if (agg->priv->tags)
1900 gst_tag_list_unref (agg->priv->tags);
1901 agg->priv->tags = NULL;
1902
1903 gst_aggregator_set_allocation (agg, NULL, NULL, NULL, NULL);
1904
1905 if (agg->priv->running) {
1906 /* As sinkpads get deactivated after the src pad, we
1907 * may have restarted the source pad task after receiving
1908 * flush events on one of our sinkpads. Stop our src pad
1909 * task again if that is the case */
1910 gst_aggregator_stop_srcpad_task (agg, NULL);
1911 }
1912
1913 return result;
1914 }
1915
1916 /* GstElement vmethods implementations */
1917 static GstStateChangeReturn
1918 gst_aggregator_change_state (GstElement * element, GstStateChange transition)
1919 {
1920 GstStateChangeReturn ret;
1921 GstAggregator *self = GST_AGGREGATOR (element);
1922
1923 switch (transition) {
1924 case GST_STATE_CHANGE_READY_TO_PAUSED:
1925 if (!gst_aggregator_start (self))
1926 goto error_start;
1927 break;
1928 case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
1929 /* Wake up anything waiting, as we now have a clock and can do
1930 * proper waiting on the clock if necessary */
1931 SRC_LOCK (self);
1932 SRC_BROADCAST (self);
1933 SRC_UNLOCK (self);
1934 break;
1935 default:
1936 break;
1937 }
1938
1939 if ((ret =
1940 GST_ELEMENT_CLASS (aggregator_parent_class)->change_state (element,
1941 transition)) == GST_STATE_CHANGE_FAILURE)
1942 goto failure;
1943
1944
1945 switch (transition) {
1946 case GST_STATE_CHANGE_PAUSED_TO_READY:
1947 if (!gst_aggregator_stop (self)) {
1948 /* What to do in this case? Error out? */
1949 GST_ERROR_OBJECT (self, "Subclass failed to stop.");
1950 }
1951 break;
1952 case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
1953 /* Wake up anything waiting, as the clock might be gone now and we
1954 * might need to wait on the condition variable again */
1955 SRC_LOCK (self);
1956 SRC_BROADCAST (self);
1957 SRC_UNLOCK (self);
1958 break;
1959 default:
1960 break;
1961 }
1962
1963 return ret;
1964
1965 /* ERRORS */
1966 failure:
1967 {
1968 GST_ERROR_OBJECT (element, "parent failed state change");
1969 return ret;
1970 }
1971 error_start:
1972 {
1973 GST_ERROR_OBJECT (element, "Subclass failed to start");
1974 return GST_STATE_CHANGE_FAILURE;
1975 }
1976 }
1977
1978 static void
1979 gst_aggregator_release_pad (GstElement * element, GstPad * pad)
1980 {
1981 GstAggregator *self = GST_AGGREGATOR (element);
1982 GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
1983
1984 GST_INFO_OBJECT (pad, "Removing pad");
1985
1986 SRC_LOCK (self);
1987 gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING, TRUE);
1988 gst_buffer_replace (&aggpad->priv->peeked_buffer, NULL);
1989 gst_element_remove_pad (element, pad);
1990
1991 self->priv->has_peer_latency = FALSE;
1992 SRC_BROADCAST (self);
1993 SRC_UNLOCK (self);
1994 }
1995
1996 static GstAggregatorPad *
1997 gst_aggregator_default_create_new_pad (GstAggregator * self,
1998 GstPadTemplate * templ, const gchar * req_name, const GstCaps * caps)
1999 {
2000 GstAggregatorPad *agg_pad;
2001 GstAggregatorPrivate *priv = self->priv;
2002 gint serial = 0;
2003 gchar *name = NULL;
2004 GType pad_type =
2005 GST_PAD_TEMPLATE_GTYPE (templ) ==
2006 G_TYPE_NONE ? GST_TYPE_AGGREGATOR_PAD : GST_PAD_TEMPLATE_GTYPE (templ);
2007
2008 if (templ->direction != GST_PAD_SINK)
2009 goto not_sink;
2010
2011 if (templ->presence != GST_PAD_REQUEST)
2012 goto not_request;
2013
2014 GST_OBJECT_LOCK (self);
2015 if (req_name == NULL || strlen (req_name) < 6
2016 || !g_str_has_prefix (req_name, "sink_")
2017 || strrchr (req_name, '%') != NULL) {
2018 /* no name given when requesting the pad, use next available int */
2019 serial = ++priv->max_padserial;
2020 } else {
2021 gchar *endptr = NULL;
2022
2023 /* parse serial number from requested padname */
2024 serial = g_ascii_strtoull (&req_name[5], &endptr, 10);
2025 if (endptr != NULL && *endptr == '\0') {
2026 if (serial > priv->max_padserial) {
2027 priv->max_padserial = serial;
2028 }
2029 } else {
2030 serial = ++priv->max_padserial;
2031 }
2032 }
2033
2034 name = g_strdup_printf ("sink_%u", serial);
2035 g_assert (g_type_is_a (pad_type, GST_TYPE_AGGREGATOR_PAD));
2036 agg_pad = g_object_new (pad_type,
2037 "name", name, "direction", GST_PAD_SINK, "template", templ, NULL);
2038 g_free (name);
2039
2040 GST_OBJECT_UNLOCK (self);
2041
2042 return agg_pad;
2043
2044 /* errors */
2045 not_sink:
2046 {
2047 GST_WARNING_OBJECT (self, "request new pad that is not a SINK pad");
2048 return NULL;
2049 }
2050 not_request:
2051 {
2052 GST_WARNING_OBJECT (self, "request new pad that is not a REQUEST pad");
2053 return NULL;
2054 }
2055 }
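/* Naming example (illustrative): requesting "sink_7" yields a pad named
 * "sink_7" (and raises max_padserial to 7 if that is higher than the current
 * maximum), while requesting with no name, a template name such as
 * "sink_%u", or any non-matching name simply uses the next free serial,
 * e.g. "sink_0", "sink_1", ... */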
2056
2057 static GstPad *
2058 gst_aggregator_request_new_pad (GstElement * element,
2059 GstPadTemplate * templ, const gchar * req_name, const GstCaps * caps)
2060 {
2061 GstAggregator *self;
2062 GstAggregatorPad *agg_pad;
2063 GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (element);
2064 GstAggregatorPrivate *priv = GST_AGGREGATOR (element)->priv;
2065
2066 self = GST_AGGREGATOR (element);
2067
2068 agg_pad = klass->create_new_pad (self, templ, req_name, caps);
2069 if (!agg_pad) {
2070 GST_ERROR_OBJECT (element, "Couldn't create new pad");
2071 return NULL;
2072 }
2073
2074 GST_DEBUG_OBJECT (element, "Adding pad %s", GST_PAD_NAME (agg_pad));
2075
2076 if (priv->running)
2077 gst_pad_set_active (GST_PAD (agg_pad), TRUE);
2078
2079 /* add the pad to the element */
2080 gst_element_add_pad (element, GST_PAD (agg_pad));
2081
2082 return GST_PAD (agg_pad);
2083 }
2084
2085 /* Must be called with SRC_LOCK held, temporarily releases it! */
2086
2087 static gboolean
2088 gst_aggregator_query_latency_unlocked (GstAggregator * self, GstQuery * query)
2089 {
2090 gboolean query_ret, live;
2091 GstClockTime our_latency, min, max;
2092
2093 /* Temporarily release the lock to do the query. */
2094 SRC_UNLOCK (self);
2095 query_ret = gst_pad_query_default (self->srcpad, GST_OBJECT (self), query);
2096 SRC_LOCK (self);
2097
2098 if (!query_ret) {
2099 GST_WARNING_OBJECT (self, "Latency query failed");
2100 return FALSE;
2101 }
2102
2103 gst_query_parse_latency (query, &live, &min, &max);
2104
2105 if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (min))) {
2106 GST_ERROR_OBJECT (self, "Invalid minimum latency %" GST_TIME_FORMAT
2107 ". Please file a bug at " PACKAGE_BUGREPORT ".", GST_TIME_ARGS (min));
2108 return FALSE;
2109 }
2110
2111 if (self->priv->upstream_latency_min > min) {
2112 GstClockTimeDiff diff =
2113 GST_CLOCK_DIFF (min, self->priv->upstream_latency_min);
2114
2115 min += diff;
2116 if (GST_CLOCK_TIME_IS_VALID (max)) {
2117 max += diff;
2118 }
2119 }
2120
2121 if (min > max && GST_CLOCK_TIME_IS_VALID (max)) {
2122 SRC_UNLOCK (self);
2123 GST_ELEMENT_WARNING (self, CORE, CLOCK, (NULL),
2124 ("Impossible to configure latency: max %" GST_TIME_FORMAT " < min %"
2125 GST_TIME_FORMAT ". Add queues or other buffering elements.",
2126 GST_TIME_ARGS (max), GST_TIME_ARGS (min)));
2127 SRC_LOCK (self);
2128 return FALSE;
2129 }
2130
2131 our_latency = self->priv->latency;
2132
2133 self->priv->peer_latency_live = live;
2134 self->priv->peer_latency_min = min;
2135 self->priv->peer_latency_max = max;
2136 self->priv->has_peer_latency = TRUE;
2137
2138 /* add our own */
2139 min += our_latency;
2140 min += self->priv->sub_latency_min;
2141 if (GST_CLOCK_TIME_IS_VALID (self->priv->sub_latency_max)
2142 && GST_CLOCK_TIME_IS_VALID (max))
2143 max += self->priv->sub_latency_max + our_latency;
2144 else
2145 max = GST_CLOCK_TIME_NONE;
2146
2147 SRC_BROADCAST (self);
2148
2149 GST_DEBUG_OBJECT (self, "configured latency live:%s min:%" G_GINT64_FORMAT
2150 " max:%" G_GINT64_FORMAT, live ? "true" : "false", min, max);
2151
2152 gst_query_set_latency (query, live, min, max);
2153
2154 return query_ret;
2155 }
2156
2157 /*
2158 * MUST be called with the src_lock held. Temporarily releases the lock inside
2159 * gst_aggregator_query_latency_unlocked() to do the actual query!
2160 *
2161 * See gst_aggregator_get_latency() for doc
2162 */
2163 static GstClockTime
2164 gst_aggregator_get_latency_unlocked (GstAggregator * self)
2165 {
2166 GstClockTime latency;
2167
2168 g_return_val_if_fail (GST_IS_AGGREGATOR (self), 0);
2169
2170 if (!self->priv->has_peer_latency) {
2171 GstQuery *query = gst_query_new_latency ();
2172 gboolean ret;
2173
2174 ret = gst_aggregator_query_latency_unlocked (self, query);
2175 gst_query_unref (query);
2176 if (!ret)
2177 return GST_CLOCK_TIME_NONE;
2178 }
2179
2180 if (!self->priv->has_peer_latency || !self->priv->peer_latency_live)
2181 return GST_CLOCK_TIME_NONE;
2182
2183 /* latency_min is never GST_CLOCK_TIME_NONE by construction */
2184 latency = self->priv->peer_latency_min;
2185
2186 /* add our own */
2187 latency += self->priv->latency;
2188 latency += self->priv->sub_latency_min;
2189
2190 return latency;
2191 }
2192
2193 /**
2194 * gst_aggregator_get_latency:
2195 * @self: a #GstAggregator
2196 *
2197 * Retrieves the latency values reported by @self in response to the latency
2198 * query, or %GST_CLOCK_TIME_NONE if no live source is connected and the element
2199 * will not wait for the clock.
2200 *
2201 * Typically only called by subclasses.
2202 *
2203 * Returns: The latency or %GST_CLOCK_TIME_NONE if the element does not sync
2204 */
2205 GstClockTime
2206 gst_aggregator_get_latency (GstAggregator * self)
2207 {
2208 GstClockTime ret;
2209
2210 SRC_LOCK (self);
2211 ret = gst_aggregator_get_latency_unlocked (self);
2212 SRC_UNLOCK (self);
2213
2214 return ret;
2215 }
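/* Usage sketch (hypothetical subclass code, not part of this file): a
 * subclass typically uses the configured latency to decide how long it may
 * wait for data in live mode, e.g.
 *
 *   static GstFlowReturn
 *   my_agg_aggregate (GstAggregator * agg, gboolean timeout)
 *   {
 *     GstClockTime latency = gst_aggregator_get_latency (agg);
 *
 *     if (latency == GST_CLOCK_TIME_NONE) {
 *       // Non-live: it is safe to wait until every pad has data
 *     } else {
 *       // Live: output must be produced within 'latency' of the deadline
 *     }
 *
 *     return GST_FLOW_OK;
 *   }
 */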
2216
2217 static gboolean
2218 gst_aggregator_send_event (GstElement * element, GstEvent * event)
2219 {
2220 GstAggregator *self = GST_AGGREGATOR (element);
2221
2222 GST_STATE_LOCK (element);
2223 if (GST_EVENT_TYPE (event) == GST_EVENT_SEEK &&
2224 GST_STATE (element) < GST_STATE_PAUSED) {
2225 gdouble rate;
2226 GstFormat fmt;
2227 GstSeekFlags flags;
2228 GstSeekType start_type, stop_type;
2229 gint64 start, stop;
2230
2231 gst_event_parse_seek (event, &rate, &fmt, &flags, &start_type,
2232 &start, &stop_type, &stop);
2233
2234 GST_OBJECT_LOCK (self);
2235 gst_segment_do_seek (&GST_AGGREGATOR_PAD (self->srcpad)->segment, rate, fmt,
2236 flags, start_type, start, stop_type, stop, NULL);
2237 self->priv->next_seqnum = gst_event_get_seqnum (event);
2238 self->priv->first_buffer = FALSE;
2239 GST_OBJECT_UNLOCK (self);
2240
2241 GST_DEBUG_OBJECT (element, "Storing segment %" GST_PTR_FORMAT, event);
2242 }
2243 GST_STATE_UNLOCK (element);
2244
2245 return GST_ELEMENT_CLASS (aggregator_parent_class)->send_event (element,
2246 event);
2247 }
2248
2249 static gboolean
2250 gst_aggregator_default_src_query (GstAggregator * self, GstQuery * query)
2251 {
2252 gboolean res = TRUE;
2253
2254 switch (GST_QUERY_TYPE (query)) {
2255 case GST_QUERY_SEEKING:
2256 {
2257 GstFormat format;
2258
2259 /* don't pass it along, as some (file)sink might claim it is seekable
2260 * whereas with an aggregator in between that will likely not work */
2261 gst_query_parse_seeking (query, &format, NULL, NULL, NULL);
2262 gst_query_set_seeking (query, format, FALSE, 0, -1);
2263 res = TRUE;
2264
2265 break;
2266 }
2267 case GST_QUERY_LATENCY:
2268 SRC_LOCK (self);
2269 res = gst_aggregator_query_latency_unlocked (self, query);
2270 SRC_UNLOCK (self);
2271 break;
2272 default:
2273 return gst_pad_query_default (self->srcpad, GST_OBJECT (self), query);
2274 }
2275
2276 return res;
2277 }
2278
2279 static gboolean
2280 gst_aggregator_event_forward_func (GstPad * pad, gpointer user_data)
2281 {
2282 EventData *evdata = user_data;
2283 gboolean ret = TRUE;
2284 GstPad *peer = gst_pad_get_peer (pad);
2285 GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
2286
2287 if (peer) {
2288 if (evdata->only_to_active_pads && aggpad->priv->first_buffer) {
2289 GST_DEBUG_OBJECT (pad, "not sending event to inactive pad");
2290 ret = TRUE;
2291 } else {
2292 ret = gst_pad_send_event (peer, gst_event_ref (evdata->event));
2293 GST_DEBUG_OBJECT (pad, "return of event push is %d", ret);
2294 }
2295 }
2296
2297 if (ret == FALSE) {
2298 if (GST_EVENT_TYPE (evdata->event) == GST_EVENT_SEEK) {
2299 GstQuery *seeking = gst_query_new_seeking (GST_FORMAT_TIME);
2300
2301 GST_DEBUG_OBJECT (pad, "Event %" GST_PTR_FORMAT " failed", evdata->event);
2302
2303 if (gst_pad_query (peer, seeking)) {
2304 gboolean seekable;
2305
2306 gst_query_parse_seeking (seeking, NULL, &seekable, NULL, NULL);
2307
2308 if (seekable == FALSE) {
2309 GST_INFO_OBJECT (pad,
2310 "Source not seekable, We failed but it does not matter!");
2311
2312 ret = TRUE;
2313 }
2314 } else {
2315 GST_ERROR_OBJECT (pad, "Query seeking FAILED");
2316 }
2317
2318 gst_query_unref (seeking);
2319 }
2320 } else {
2321 evdata->one_actually_seeked = TRUE;
2322 }
2323
2324 evdata->result &= ret;
2325
2326 if (peer)
2327 gst_object_unref (peer);
2328
2329 /* Always send to all pads */
2330 return FALSE;
2331 }
2332
2333 static void
2334 gst_aggregator_forward_event_to_all_sinkpads (GstAggregator * self,
2335 EventData * evdata)
2336 {
2337 evdata->result = TRUE;
2338 evdata->one_actually_seeked = FALSE;
2339
2340 gst_pad_forward (self->srcpad, gst_aggregator_event_forward_func, evdata);
2341
2342 gst_event_unref (evdata->event);
2343 }
2344
2345 static gboolean
2346 gst_aggregator_do_seek (GstAggregator * self, GstEvent * event)
2347 {
2348 gdouble rate;
2349 GstFormat fmt;
2350 GstSeekFlags flags;
2351 GstSeekType start_type, stop_type;
2352 gint64 start, stop;
2353 gboolean flush;
2354 EventData evdata = { 0, };
2355 GstAggregatorPrivate *priv = self->priv;
2356
2357 gst_event_parse_seek (event, &rate, &fmt, &flags, &start_type,
2358 &start, &stop_type, &stop);
2359
2360 GST_INFO_OBJECT (self, "starting SEEK");
2361
2362 flush = flags & GST_SEEK_FLAG_FLUSH;
2363
2364 GST_OBJECT_LOCK (self);
2365
2366 if (gst_event_get_seqnum (event) == self->priv->next_seqnum) {
2367 evdata.result = TRUE;
2368 GST_DEBUG_OBJECT (self, "Dropping duplicated seek event with seqnum %d",
2369 self->priv->next_seqnum);
2370 GST_OBJECT_UNLOCK (self);
2371 goto done;
2372 }
2373
2374 self->priv->next_seqnum = gst_event_get_seqnum (event);
2375
2376 gst_segment_do_seek (&GST_AGGREGATOR_PAD (self->srcpad)->segment, rate, fmt,
2377 flags, start_type, start, stop_type, stop, NULL);
2378
2379 /* Seeking sets a position */
2380 self->priv->first_buffer = FALSE;
2381
2382 if (flush)
2383 priv->flushing = TRUE;
2384
2385 GST_OBJECT_UNLOCK (self);
2386
2387 if (flush) {
2388 GstEvent *event = gst_event_new_flush_start ();
2389
2390 gst_event_set_seqnum (event, self->priv->next_seqnum);
2391 gst_aggregator_stop_srcpad_task (self, event);
2392 }
2393
2394 /* forward the seek upstream */
2395 evdata.event = event;
2396 evdata.flush = flush;
2397 evdata.only_to_active_pads = FALSE;
2398 gst_aggregator_forward_event_to_all_sinkpads (self, &evdata);
2399 event = NULL;
2400
2401 if (!evdata.result || !evdata.one_actually_seeked) {
2402 GST_OBJECT_LOCK (self);
2403 priv->flushing = FALSE;
2404 GST_OBJECT_UNLOCK (self);
2405
2406 /* No flush stop is inbound for us to forward */
2407 if (flush) {
2408 GstEvent *event = gst_event_new_flush_stop (TRUE);
2409
2410 gst_event_set_seqnum (event, self->priv->next_seqnum);
2411 gst_pad_push_event (self->srcpad, event);
2412 }
2413 }
2414
2415 done:
2416 GST_INFO_OBJECT (self, "seek done, result: %d", evdata.result);
2417
2418 return evdata.result;
2419 }
2420
2421 static gboolean
2422 gst_aggregator_default_src_event (GstAggregator * self, GstEvent * event)
2423 {
2424 EventData evdata = { 0, };
2425
2426 switch (GST_EVENT_TYPE (event)) {
2427 case GST_EVENT_SEEK:
2428 /* _do_seek() unrefs the event. */
2429 return gst_aggregator_do_seek (self, event);
2430 case GST_EVENT_NAVIGATION:
2431 /* navigation is rather pointless. */
2432 gst_event_unref (event);
2433 return FALSE;
2434 case GST_EVENT_RECONFIGURE:
2435 /* We will renegotiate with downstream, we don't
2436 * need to forward this further */
2437 gst_event_unref (event);
2438 return TRUE;
2439 default:
2440 break;
2441 }
2442
2443 /* Don't forward QOS events to pads that had no active buffer yet. Otherwise
2444 * they will receive a QOS event that has earliest_time=0 (because we can't
2445 * have negative timestamps), and consider their buffer as too late */
2446 evdata.event = event;
2447 evdata.flush = FALSE;
2448 evdata.only_to_active_pads = GST_EVENT_TYPE (event) == GST_EVENT_QOS;
2449 gst_aggregator_forward_event_to_all_sinkpads (self, &evdata);
2450 return evdata.result;
2451 }
2452
2453 static gboolean
2454 gst_aggregator_src_pad_event_func (GstPad * pad, GstObject * parent,
2455 GstEvent * event)
2456 {
2457 GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
2458
2459 return klass->src_event (GST_AGGREGATOR (parent), event);
2460 }
2461
2462 static gboolean
2463 gst_aggregator_src_pad_query_func (GstPad * pad, GstObject * parent,
2464 GstQuery * query)
2465 {
2466 GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
2467
2468 return klass->src_query (GST_AGGREGATOR (parent), query);
2469 }
2470
2471 static gboolean
2472 gst_aggregator_src_pad_activate_mode_func (GstPad * pad,
2473 GstObject * parent, GstPadMode mode, gboolean active)
2474 {
2475 GstAggregator *self = GST_AGGREGATOR (parent);
2476 GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
2477
2478 if (klass->src_activate) {
2479 if (klass->src_activate (self, mode, active) == FALSE) {
2480 return FALSE;
2481 }
2482 }
2483
2484 if (active == TRUE) {
2485 switch (mode) {
2486 case GST_PAD_MODE_PUSH:
2487 {
2488 GST_INFO_OBJECT (pad, "Activating pad!");
2489 gst_aggregator_start_srcpad_task (self);
2490 return TRUE;
2491 }
2492 default:
2493 {
2494 GST_ERROR_OBJECT (pad, "Only supported mode is PUSH");
2495 return FALSE;
2496 }
2497 }
2498 }
2499
2500 /* deactivating */
2501 GST_INFO_OBJECT (self, "Deactivating srcpad");
2502
2503 gst_aggregator_stop_srcpad_task (self, NULL);
2504
2505 return TRUE;
2506 }
2507
2508 static gboolean
2509 gst_aggregator_default_sink_query (GstAggregator * self,
2510 GstAggregatorPad * aggpad, GstQuery * query)
2511 {
2512 GstPad *pad = GST_PAD (aggpad);
2513
2514 if (GST_QUERY_TYPE (query) == GST_QUERY_ALLOCATION) {
2515 GstQuery *decide_query = NULL;
2516 GstAggregatorClass *agg_class;
2517 gboolean ret;
2518
2519 GST_OBJECT_LOCK (self);
2520 PAD_LOCK (aggpad);
2521 if (G_UNLIKELY (!aggpad->priv->negotiated)) {
2522 GST_DEBUG_OBJECT (self,
2523 "not negotiated yet, can't answer ALLOCATION query");
2524 PAD_UNLOCK (aggpad);
2525 GST_OBJECT_UNLOCK (self);
2526
2527 return FALSE;
2528 }
2529
2530 if ((decide_query = self->priv->allocation_query))
2531 gst_query_ref (decide_query);
2532 PAD_UNLOCK (aggpad);
2533 GST_OBJECT_UNLOCK (self);
2534
2535 GST_DEBUG_OBJECT (self,
2536 "calling propose allocation with query %" GST_PTR_FORMAT, decide_query);
2537
2538 agg_class = GST_AGGREGATOR_GET_CLASS (self);
2539
2540 /* pass the query to the propose_allocation vmethod if any */
2541 if (agg_class->propose_allocation)
2542 ret = agg_class->propose_allocation (self, aggpad, decide_query, query);
2543 else
2544 ret = FALSE;
2545
2546 if (decide_query)
2547 gst_query_unref (decide_query);
2548
2549 GST_DEBUG_OBJECT (self, "ALLOCATION ret %d, %" GST_PTR_FORMAT, ret, query);
2550 return ret;
2551 }
2552
2553 return gst_pad_query_default (pad, GST_OBJECT (self), query);
2554 }
2555
2556 static gboolean
2557 gst_aggregator_default_sink_query_pre_queue (GstAggregator * self,
2558 GstAggregatorPad * aggpad, GstQuery * query)
2559 {
2560 if (GST_QUERY_IS_SERIALIZED (query)) {
2561 GstStructure *s;
2562 gboolean ret = FALSE;
2563
2564 SRC_LOCK (self);
2565 PAD_LOCK (aggpad);
2566
2567 if (aggpad->priv->flow_return != GST_FLOW_OK) {
2568 SRC_UNLOCK (self);
2569 goto flushing;
2570 }
2571
2572 g_queue_push_head (&aggpad->priv->data, query);
2573 SRC_BROADCAST (self);
2574 SRC_UNLOCK (self);
2575
2576 while (!gst_aggregator_pad_queue_is_empty (aggpad)
2577 && aggpad->priv->flow_return == GST_FLOW_OK) {
2578 GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed");
2579 PAD_WAIT_EVENT (aggpad);
2580 }
2581
2582 s = gst_query_writable_structure (query);
2583 if (gst_structure_get_boolean (s, "gst-aggregator-retval", &ret))
2584 gst_structure_remove_field (s, "gst-aggregator-retval");
2585 else
2586 g_queue_remove (&aggpad->priv->data, query);
2587
2588 if (aggpad->priv->flow_return != GST_FLOW_OK)
2589 goto flushing;
2590
2591 PAD_UNLOCK (aggpad);
2592
2593 return ret;
2594 } else {
2595 GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self);
2596
2597 return klass->sink_query (self, aggpad, query);
2598 }
2599
2600 flushing:
2601 GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping query",
2602 gst_flow_get_name (aggpad->priv->flow_return));
2603 PAD_UNLOCK (aggpad);
2604
2605 return FALSE;
2606 }
2607
2608 static void
2609 gst_aggregator_finalize (GObject * object)
2610 {
2611 GstAggregator *self = (GstAggregator *) object;
2612
2613 g_mutex_clear (&self->priv->src_lock);
2614 g_cond_clear (&self->priv->src_cond);
2615
2616 G_OBJECT_CLASS (aggregator_parent_class)->finalize (object);
2617 }
2618
2619 /*
2620 * gst_aggregator_set_latency_property:
2621 * @agg: a #GstAggregator
2622 * @latency: the new latency value (in nanoseconds).
2623 *
2624 * Sets the new latency value to @latency. This value is used to limit the
2625 * amount of time a pad waits for data to appear before considering the pad
2626 * as unresponsive.
2627 */
2628 static void
2629 gst_aggregator_set_latency_property (GstAggregator * self, GstClockTime latency)
2630 {
2631 gboolean changed;
2632
2633 g_return_if_fail (GST_IS_AGGREGATOR (self));
2634 g_return_if_fail (GST_CLOCK_TIME_IS_VALID (latency));
2635
2636 SRC_LOCK (self);
2637 changed = (self->priv->latency != latency);
2638
2639 if (changed) {
2640 GList *item;
2641
2642 GST_OBJECT_LOCK (self);
2643 /* First lock all the pads */
2644 for (item = GST_ELEMENT_CAST (self)->sinkpads; item; item = item->next) {
2645 GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (item->data);
2646 PAD_LOCK (aggpad);
2647 }
2648
2649 self->priv->latency = latency;
2650
2651 SRC_BROADCAST (self);
2652
2653 /* Now wake up the pads */
2654 for (item = GST_ELEMENT_CAST (self)->sinkpads; item; item = item->next) {
2655 GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (item->data);
2656 PAD_BROADCAST_EVENT (aggpad);
2657 PAD_UNLOCK (aggpad);
2658 }
2659 GST_OBJECT_UNLOCK (self);
2660 }
2661
2662 SRC_UNLOCK (self);
2663
2664 if (changed)
2665 gst_element_post_message (GST_ELEMENT_CAST (self),
2666 gst_message_new_latency (GST_OBJECT_CAST (self)));
2667 }
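/* Usage sketch (application side, "mixer" is a hypothetical aggregator
 * instance): the property is normally set through GObject, e.g.
 *
 *   g_object_set (mixer, "latency", (guint64) 200 * GST_MSECOND, NULL);
 *
 * which ends up here, wakes up all sink pads so they can re-evaluate how
 * long to wait for data, and posts a latency message so the pipeline can
 * redistribute latency.
 */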
2668
2669 /*
2670 * gst_aggregator_get_latency_property:
2671 * @agg: a #GstAggregator
2672 *
2673 * Gets the latency value. See gst_aggregator_set_latency for
2674 * more details.
2675 *
2676 * Returns: The time in nanoseconds to wait for data to arrive on a sink pad
2677 * before a pad is deemed unresponsive. A value of -1 means an
2678 * unlimited time.
2679 */
2680 static GstClockTime
2681 gst_aggregator_get_latency_property (GstAggregator * agg)
2682 {
2683 GstClockTime res;
2684
2685 g_return_val_if_fail (GST_IS_AGGREGATOR (agg), GST_CLOCK_TIME_NONE);
2686
2687 GST_OBJECT_LOCK (agg);
2688 res = agg->priv->latency;
2689 GST_OBJECT_UNLOCK (agg);
2690
2691 return res;
2692 }
2693
2694 static void
2695 gst_aggregator_set_property (GObject * object, guint prop_id,
2696 const GValue * value, GParamSpec * pspec)
2697 {
2698 GstAggregator *agg = GST_AGGREGATOR (object);
2699
2700 switch (prop_id) {
2701 case PROP_LATENCY:
2702 gst_aggregator_set_latency_property (agg, g_value_get_uint64 (value));
2703 break;
2704 case PROP_MIN_UPSTREAM_LATENCY:
2705 SRC_LOCK (agg);
2706 agg->priv->upstream_latency_min = g_value_get_uint64 (value);
2707 SRC_UNLOCK (agg);
2708 break;
2709 case PROP_START_TIME_SELECTION:
2710 agg->priv->start_time_selection = g_value_get_enum (value);
2711 break;
2712 case PROP_START_TIME:
2713 agg->priv->start_time = g_value_get_uint64 (value);
2714 break;
2715 case PROP_EMIT_SIGNALS:
2716 agg->priv->emit_signals = g_value_get_boolean (value);
2717 break;
2718 default:
2719 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2720 break;
2721 }
2722 }
2723
2724 static void
2725 gst_aggregator_get_property (GObject * object, guint prop_id,
2726 GValue * value, GParamSpec * pspec)
2727 {
2728 GstAggregator *agg = GST_AGGREGATOR (object);
2729
2730 switch (prop_id) {
2731 case PROP_LATENCY:
2732 g_value_set_uint64 (value, gst_aggregator_get_latency_property (agg));
2733 break;
2734 case PROP_MIN_UPSTREAM_LATENCY:
2735 SRC_LOCK (agg);
2736 g_value_set_uint64 (value, agg->priv->upstream_latency_min);
2737 SRC_UNLOCK (agg);
2738 break;
2739 case PROP_START_TIME_SELECTION:
2740 g_value_set_enum (value, agg->priv->start_time_selection);
2741 break;
2742 case PROP_START_TIME:
2743 g_value_set_uint64 (value, agg->priv->start_time);
2744 break;
2745 case PROP_EMIT_SIGNALS:
2746 g_value_set_boolean (value, agg->priv->emit_signals);
2747 break;
2748 default:
2749 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2750 break;
2751 }
2752 }
2753
2754 /* GObject vmethods implementations */
2755 static void
2756 gst_aggregator_class_init (GstAggregatorClass * klass)
2757 {
2758 GObjectClass *gobject_class = (GObjectClass *) klass;
2759 GstElementClass *gstelement_class = (GstElementClass *) klass;
2760
2761 aggregator_parent_class = g_type_class_peek_parent (klass);
2762
2763 GST_DEBUG_CATEGORY_INIT (aggregator_debug, "aggregator",
2764 GST_DEBUG_FG_MAGENTA, "GstAggregator");
2765
2766 if (aggregator_private_offset != 0)
2767 g_type_class_adjust_private_offset (klass, &aggregator_private_offset);
2768
2769 klass->finish_buffer = gst_aggregator_default_finish_buffer;
2770 klass->finish_buffer_list = gst_aggregator_default_finish_buffer_list;
2771
2772 klass->sink_event = gst_aggregator_default_sink_event;
2773 klass->sink_query = gst_aggregator_default_sink_query;
2774
2775 klass->src_event = gst_aggregator_default_src_event;
2776 klass->src_query = gst_aggregator_default_src_query;
2777
2778 klass->create_new_pad = gst_aggregator_default_create_new_pad;
2779 klass->update_src_caps = gst_aggregator_default_update_src_caps;
2780 klass->fixate_src_caps = gst_aggregator_default_fixate_src_caps;
2781 klass->negotiated_src_caps = gst_aggregator_default_negotiated_src_caps;
2782
2783 klass->negotiate = gst_aggregator_default_negotiate;
2784
2785 klass->sink_event_pre_queue = gst_aggregator_default_sink_event_pre_queue;
2786 klass->sink_query_pre_queue = gst_aggregator_default_sink_query_pre_queue;
2787
2788 gstelement_class->request_new_pad =
2789 GST_DEBUG_FUNCPTR (gst_aggregator_request_new_pad);
2790 gstelement_class->send_event = GST_DEBUG_FUNCPTR (gst_aggregator_send_event);
2791 gstelement_class->release_pad =
2792 GST_DEBUG_FUNCPTR (gst_aggregator_release_pad);
2793 gstelement_class->change_state =
2794 GST_DEBUG_FUNCPTR (gst_aggregator_change_state);
2795
2796 gobject_class->set_property = gst_aggregator_set_property;
2797 gobject_class->get_property = gst_aggregator_get_property;
2798 gobject_class->finalize = gst_aggregator_finalize;
2799
2800 g_object_class_install_property (gobject_class, PROP_LATENCY,
2801 g_param_spec_uint64 ("latency", "Buffer latency",
2802 "Additional latency in live mode to allow upstream "
2803 "to take longer to produce buffers for the current "
2804 "position (in nanoseconds)", 0, G_MAXUINT64,
2805 DEFAULT_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2806
2807 /**
2808 * GstAggregator:min-upstream-latency:
2809 *
2810 * Force minimum upstream latency (in nanoseconds). When sources with a
2811 * higher latency are expected to be plugged in dynamically after the
2812 * aggregator has started playing, this allows overriding the minimum
2813 * latency reported by the initial source(s). This is only taken into
2814 * account when larger than the actually reported minimum latency.
2815 *
2816 * Since: 1.16
2817 */
2818 g_object_class_install_property (gobject_class, PROP_MIN_UPSTREAM_LATENCY,
2819 g_param_spec_uint64 ("min-upstream-latency", "Buffer latency",
2820 "When sources with a higher latency are expected to be plugged "
2821 "in dynamically after the aggregator has started playing, "
2822 "this allows overriding the minimum latency reported by the "
2823 "initial source(s). This is only taken into account when larger "
2824 "than the actually reported minimum latency. (nanoseconds)",
2825 0, G_MAXUINT64,
2826 DEFAULT_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2827
2828 g_object_class_install_property (gobject_class, PROP_START_TIME_SELECTION,
2829 g_param_spec_enum ("start-time-selection", "Start Time Selection",
2830 "Decides which start time is output",
2831 gst_aggregator_start_time_selection_get_type (),
2832 DEFAULT_START_TIME_SELECTION,
2833 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2834
2835 g_object_class_install_property (gobject_class, PROP_START_TIME,
2836 g_param_spec_uint64 ("start-time", "Start Time",
2837 "Start time to use if start-time-selection=set", 0,
2838 G_MAXUINT64,
2839 DEFAULT_START_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2840
2841 /**
2842 * GstAggregator:emit-signals:
2843 *
2844 * Enables the emission of signals such as #GstAggregator::samples-selected
2845 *
2846 * Since: 1.18
2847 */
2848 g_object_class_install_property (gobject_class, PROP_EMIT_SIGNALS,
2849 g_param_spec_boolean ("emit-signals", "Emit signals",
2850 "Send signals", DEFAULT_EMIT_SIGNALS,
2851 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2852
2853 /**
2854 * GstAggregator::samples-selected:
2855 * @aggregator: The #GstAggregator that emitted the signal
2856 * @segment: The #GstSegment the next output buffer is part of
2857 * @pts: The presentation timestamp of the next output buffer
2858 * @dts: The decoding timestamp of the next output buffer
2859 * @duration: The duration of the next output buffer
2860 * @info: (nullable): a #GstStructure containing additional information
2861 *
2862 * Signals that the #GstAggregator subclass has selected the next set
2863 * of input samples it will aggregate. Handlers may call
2864 * gst_aggregator_peek_next_sample() at that point.
2865 *
2866 * Since: 1.18
2867 */
2868 gst_aggregator_signals[SIGNAL_SAMPLES_SELECTED] =
2869 g_signal_new ("samples-selected", G_TYPE_FROM_CLASS (klass),
2870 G_SIGNAL_RUN_FIRST, 0, NULL, NULL, NULL, G_TYPE_NONE, 5,
2871 GST_TYPE_SEGMENT | G_SIGNAL_TYPE_STATIC_SCOPE, GST_TYPE_CLOCK_TIME,
2872 GST_TYPE_CLOCK_TIME, GST_TYPE_CLOCK_TIME,
2873 GST_TYPE_STRUCTURE | G_SIGNAL_TYPE_STATIC_SCOPE);
2874 }
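/* Usage sketch (hypothetical callback name, application or subclass code):
 * GstAggregator::samples-selected is only emitted when "emit-signals" is TRUE.
 *
 *   static void
 *   on_samples_selected (GstAggregator * agg, GstSegment * segment,
 *       GstClockTime pts, GstClockTime dts, GstClockTime duration,
 *       GstStructure * info, gpointer user_data)
 *   {
 *     // The next set of input samples has been selected; they can be
 *     // inspected with gst_aggregator_peek_next_sample (agg, pad).
 *   }
 *
 *   g_object_set (agg, "emit-signals", TRUE, NULL);
 *   g_signal_connect (agg, "samples-selected",
 *       G_CALLBACK (on_samples_selected), NULL);
 */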
2875
2876 static inline gpointer
2877 gst_aggregator_get_instance_private (GstAggregator * self)
2878 {
2879 return (G_STRUCT_MEMBER_P (self, aggregator_private_offset));
2880 }
2881
2882 static void
2883 gst_aggregator_init (GstAggregator * self, GstAggregatorClass * klass)
2884 {
2885 GstPadTemplate *pad_template;
2886 GstAggregatorPrivate *priv;
2887 GType pad_type;
2888
2889 g_return_if_fail (klass->aggregate != NULL);
2890
2891 self->priv = gst_aggregator_get_instance_private (self);
2892
2893 priv = self->priv;
2894
2895 pad_template =
2896 gst_element_class_get_pad_template (GST_ELEMENT_CLASS (klass), "src");
2897 g_return_if_fail (pad_template != NULL);
2898
2899 priv->max_padserial = -1;
2900 priv->tags_changed = FALSE;
2901 priv->ignore_inactive_pads = FALSE;
2902
2903 self->priv->peer_latency_live = FALSE;
2904 self->priv->peer_latency_min = self->priv->sub_latency_min = 0;
2905 self->priv->peer_latency_max = self->priv->sub_latency_max = 0;
2906 self->priv->has_peer_latency = FALSE;
2907
2908 pad_type =
2909 GST_PAD_TEMPLATE_GTYPE (pad_template) ==
2910 G_TYPE_NONE ? GST_TYPE_AGGREGATOR_PAD :
2911 GST_PAD_TEMPLATE_GTYPE (pad_template);
2912 g_assert (g_type_is_a (pad_type, GST_TYPE_AGGREGATOR_PAD));
2913 self->srcpad =
2914 g_object_new (pad_type, "name", "src", "direction", GST_PAD_SRC,
2915 "template", pad_template, NULL);
2916
2917 gst_aggregator_reset_flow_values (self);
2918
2919 gst_pad_set_event_function (self->srcpad,
2920 GST_DEBUG_FUNCPTR (gst_aggregator_src_pad_event_func));
2921 gst_pad_set_query_function (self->srcpad,
2922 GST_DEBUG_FUNCPTR (gst_aggregator_src_pad_query_func));
2923 gst_pad_set_activatemode_function (self->srcpad,
2924 GST_DEBUG_FUNCPTR (gst_aggregator_src_pad_activate_mode_func));
2925
2926 gst_element_add_pad (GST_ELEMENT (self), self->srcpad);
2927
2928 self->priv->upstream_latency_min = DEFAULT_MIN_UPSTREAM_LATENCY;
2929 self->priv->latency = DEFAULT_LATENCY;
2930 self->priv->start_time_selection = DEFAULT_START_TIME_SELECTION;
2931 self->priv->start_time = DEFAULT_START_TIME;
2932
2933 g_mutex_init (&self->priv->src_lock);
2934 g_cond_init (&self->priv->src_cond);
2935 }
2936
2937 /* we can't use G_DEFINE_ABSTRACT_TYPE because we need the klass in the _init
2938 * method to get to the padtemplates */
2939 GType
2940 gst_aggregator_get_type (void)
2941 {
2942 static gsize type = 0;
2943
2944 if (g_once_init_enter (&type)) {
2945 GType _type;
2946 static const GTypeInfo info = {
2947 sizeof (GstAggregatorClass),
2948 NULL,
2949 NULL,
2950 (GClassInitFunc) gst_aggregator_class_init,
2951 NULL,
2952 NULL,
2953 sizeof (GstAggregator),
2954 0,
2955 (GInstanceInitFunc) gst_aggregator_init,
2956 };
2957
2958 _type = g_type_register_static (GST_TYPE_ELEMENT,
2959 "GstAggregator", &info, G_TYPE_FLAG_ABSTRACT);
2960
2961 aggregator_private_offset =
2962 g_type_add_instance_private (_type, sizeof (GstAggregatorPrivate));
2963
2964 g_once_init_leave (&type, _type);
2965 }
2966 return type;
2967 }
2968
2969 /* Must be called with SRC lock and PAD lock held */
2970 static gboolean
2971 gst_aggregator_pad_has_space (GstAggregator * self, GstAggregatorPad * aggpad)
2972 {
2973 guint64 max_time_level;
2974
2975 /* Empty queue always has space */
2976 if (aggpad->priv->num_buffers == 0 && aggpad->priv->clipped_buffer == NULL)
2977 return TRUE;
2978
2979 /* In live mode we also want room for at least two buffers: one being
2980 * processed and one ready for the next iteration. */
2981 if (self->priv->peer_latency_live && aggpad->priv->num_buffers < 2)
2982 return TRUE;
2983
2984 /* zero latency, if there is a buffer, it's full */
2985 if (self->priv->latency == 0)
2986 return FALSE;
2987
2988 /* On top of our latency, we also want to allow buffering up to the
2989 * minimum upstream latency, so that queue-less sources with a latency
2990 * lower than the upstream latency can keep up. */
2991 max_time_level = self->priv->latency + self->priv->upstream_latency_min;
2992
2993 /* Allow no more buffers than the latency */
2994 return (aggpad->priv->time_level <= max_time_level);
2995 }
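/* Worked example (illustrative numbers): with latency=0 and no live peer, a
 * single queued buffer already makes the queue "full"; with latency=100ms
 * and min-upstream-latency=50ms, up to max_time_level = 150ms worth of data
 * may be buffered on the pad before upstream is made to wait. */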
2996
2997 /* Must be called with the PAD_LOCK held */
2998 static void
2999 apply_buffer (GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head)
3000 {
3001 GstClockTime timestamp;
3002
3003 if (GST_BUFFER_DTS_IS_VALID (buffer))
3004 timestamp = GST_BUFFER_DTS (buffer);
3005 else
3006 timestamp = GST_BUFFER_PTS (buffer);
3007
3008 if (timestamp == GST_CLOCK_TIME_NONE) {
3009 if (head)
3010 timestamp = aggpad->priv->head_position;
3011 else
3012 timestamp = aggpad->priv->tail_position;
3013 }
3014
3015 /* add duration */
3016 if (GST_BUFFER_DURATION_IS_VALID (buffer))
3017 timestamp += GST_BUFFER_DURATION (buffer);
3018
3019 if (head)
3020 aggpad->priv->head_position = timestamp;
3021 else
3022 aggpad->priv->tail_position = timestamp;
3023
3024 update_time_level (aggpad, head);
3025 }
3026
3027 /*
3028 * Can be called either from the sinkpad's chain function or from the srcpad's
3029 * thread in the case of a buffer synthesized from a GAP event.
3030 * Because of this second case, FLUSH_LOCK can't be used here.
3031 */
3032
3033 static GstFlowReturn
3034 gst_aggregator_pad_chain_internal (GstAggregator * self,
3035 GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head)
3036 {
3037 GstFlowReturn flow_return;
3038 GstClockTime buf_pts;
3039
3040 GST_TRACE_OBJECT (aggpad, "entering chain internal");
3041
3042 PAD_LOCK (aggpad);
3043 flow_return = aggpad->priv->flow_return;
3044 if (flow_return != GST_FLOW_OK)
3045 goto flushing;
3046
3047 PAD_UNLOCK (aggpad);
3048
3049 buf_pts = GST_BUFFER_PTS (buffer);
3050
3051 for (;;) {
3052 SRC_LOCK (self);
3053 GST_OBJECT_LOCK (self);
3054 PAD_LOCK (aggpad);
3055
3056 if (aggpad->priv->first_buffer) {
3057 self->priv->has_peer_latency = FALSE;
3058 aggpad->priv->first_buffer = FALSE;
3059 }
3060
3061 if ((gst_aggregator_pad_has_space (self, aggpad) || !head)
3062 && aggpad->priv->flow_return == GST_FLOW_OK) {
3063 if (head)
3064 g_queue_push_head (&aggpad->priv->data, buffer);
3065 else
3066 g_queue_push_tail (&aggpad->priv->data, buffer);
3067 apply_buffer (aggpad, buffer, head);
3068 aggpad->priv->num_buffers++;
3069 buffer = NULL;
3070 SRC_BROADCAST (self);
3071 break;
3072 }
3073
3074 flow_return = aggpad->priv->flow_return;
3075 if (flow_return != GST_FLOW_OK) {
3076 GST_OBJECT_UNLOCK (self);
3077 SRC_UNLOCK (self);
3078 goto flushing;
3079 }
3080 GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed (chain)");
3081 GST_OBJECT_UNLOCK (self);
3082 SRC_UNLOCK (self);
3083 PAD_WAIT_EVENT (aggpad);
3084
3085 PAD_UNLOCK (aggpad);
3086 }
3087
3088 if (self->priv->first_buffer) {
3089 GstClockTime start_time;
3090 GstAggregatorPad *srcpad = GST_AGGREGATOR_PAD (self->srcpad);
3091
3092 switch (self->priv->start_time_selection) {
3093 case GST_AGGREGATOR_START_TIME_SELECTION_ZERO:
3094 default:
3095 start_time = 0;
3096 break;
3097 case GST_AGGREGATOR_START_TIME_SELECTION_FIRST:
3098 GST_OBJECT_LOCK (aggpad);
3099 if (aggpad->priv->head_segment.format == GST_FORMAT_TIME) {
3100 start_time = buf_pts;
3101 if (start_time != -1) {
3102 start_time = MAX (start_time, aggpad->priv->head_segment.start);
3103 start_time =
3104 gst_segment_to_running_time (&aggpad->priv->head_segment,
3105 GST_FORMAT_TIME, start_time);
3106 }
3107 } else {
3108 start_time = 0;
3109 GST_WARNING_OBJECT (aggpad,
3110 "Ignoring request of selecting the first start time "
3111 "as the segment is a %s segment instead of a time segment",
3112 gst_format_get_name (aggpad->segment.format));
3113 }
3114 GST_OBJECT_UNLOCK (aggpad);
3115 break;
3116 case GST_AGGREGATOR_START_TIME_SELECTION_SET:
3117 start_time = self->priv->start_time;
3118 if (start_time == -1)
3119 start_time = 0;
3120 break;
3121 }
3122
3123 if (start_time != -1) {
3124 if (srcpad->segment.position == -1)
3125 srcpad->segment.position = start_time;
3126 else
3127 srcpad->segment.position = MIN (start_time, srcpad->segment.position);
3128
3129 GST_DEBUG_OBJECT (self, "Selecting start time %" GST_TIME_FORMAT,
3130 GST_TIME_ARGS (start_time));
3131 }
3132 }
3133
3134 PAD_UNLOCK (aggpad);
3135 GST_OBJECT_UNLOCK (self);
3136 SRC_UNLOCK (self);
3137
3138 GST_TRACE_OBJECT (aggpad, "Done chaining");
3139
3140 return flow_return;
3141
3142 flushing:
3143 PAD_UNLOCK (aggpad);
3144
3145 GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping buffer",
3146 gst_flow_get_name (flow_return));
3147 if (buffer)
3148 gst_buffer_unref (buffer);
3149
3150 return flow_return;
3151 }
3152
3153 static GstFlowReturn
3154 gst_aggregator_pad_chain (GstPad * pad, GstObject * object, GstBuffer * buffer)
3155 {
3156 GstFlowReturn ret;
3157 GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
3158
3159 GST_TRACE_OBJECT (aggpad, "entering chain");
3160
3161 PAD_FLUSH_LOCK (aggpad);
3162
3163 ret = gst_aggregator_pad_chain_internal (GST_AGGREGATOR_CAST (object),
3164 aggpad, buffer, TRUE);
3165
3166 PAD_FLUSH_UNLOCK (aggpad);
3167
3168 return ret;
3169 }
3170
3171 static gboolean
3172 gst_aggregator_pad_query_func (GstPad * pad, GstObject * parent,
3173 GstQuery * query)
3174 {
3175 GstAggregator *self = GST_AGGREGATOR (parent);
3176 GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self);
3177 GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
3178
3179 g_assert (klass->sink_query_pre_queue);
3180 return klass->sink_query_pre_queue (self, aggpad, query);
3181 }
3182
3183 static GstFlowReturn
3184 gst_aggregator_pad_event_func (GstPad * pad, GstObject * parent,
3185 GstEvent * event)
3186 {
3187 GstAggregator *self = GST_AGGREGATOR (parent);
3188 GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self);
3189 GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
3190
3191 g_assert (klass->sink_event_pre_queue);
3192 return klass->sink_event_pre_queue (self, aggpad, event);
3193 }
3194
3195 static gboolean
3196 gst_aggregator_pad_activate_mode_func (GstPad * pad,
3197 GstObject * parent, GstPadMode mode, gboolean active)
3198 {
3199 GstAggregator *self = GST_AGGREGATOR (parent);
3200 GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
3201
3202 if (active == FALSE) {
3203 SRC_LOCK (self);
3204 gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING, TRUE);
3205 SRC_BROADCAST (self);
3206 SRC_UNLOCK (self);
3207 } else {
3208 PAD_LOCK (aggpad);
3209 aggpad->priv->flow_return = GST_FLOW_OK;
3210 PAD_BROADCAST_EVENT (aggpad);
3211 PAD_UNLOCK (aggpad);
3212 }
3213
3214 return TRUE;
3215 }
3216
3217 /***********************************
3218 * GstAggregatorPad implementation *
3219 ************************************/
3220 G_DEFINE_TYPE_WITH_PRIVATE (GstAggregatorPad, gst_aggregator_pad, GST_TYPE_PAD);
3221
3222 #define DEFAULT_PAD_EMIT_SIGNALS FALSE
3223
3224 enum
3225 {
3226 PAD_PROP_0,
3227 PAD_PROP_EMIT_SIGNALS,
3228 };
3229
3230 enum
3231 {
3232 PAD_SIGNAL_BUFFER_CONSUMED,
3233 PAD_LAST_SIGNAL,
3234 };
3235
3236 static guint gst_aggregator_pad_signals[PAD_LAST_SIGNAL] = { 0 };
3237
3238 static void
3239 gst_aggregator_pad_constructed (GObject * object)
3240 {
3241 GstPad *pad = GST_PAD (object);
3242
3243 if (GST_PAD_IS_SINK (pad)) {
3244 gst_pad_set_chain_function (pad,
3245 GST_DEBUG_FUNCPTR (gst_aggregator_pad_chain));
3246 gst_pad_set_event_full_function_full (pad,
3247 GST_DEBUG_FUNCPTR (gst_aggregator_pad_event_func), NULL, NULL);
3248 gst_pad_set_query_function (pad,
3249 GST_DEBUG_FUNCPTR (gst_aggregator_pad_query_func));
3250 gst_pad_set_activatemode_function (pad,
3251 GST_DEBUG_FUNCPTR (gst_aggregator_pad_activate_mode_func));
3252 }
3253 }
3254
3255 static void
3256 gst_aggregator_pad_finalize (GObject * object)
3257 {
3258 GstAggregatorPad *pad = (GstAggregatorPad *) object;
3259
3260 gst_buffer_replace (&pad->priv->peeked_buffer, NULL);
3261 g_cond_clear (&pad->priv->event_cond);
3262 g_mutex_clear (&pad->priv->flush_lock);
3263 g_mutex_clear (&pad->priv->lock);
3264
3265 G_OBJECT_CLASS (gst_aggregator_pad_parent_class)->finalize (object);
3266 }
3267
3268 static void
3269 gst_aggregator_pad_dispose (GObject * object)
3270 {
3271 GstAggregatorPad *pad = (GstAggregatorPad *) object;
3272
3273 gst_aggregator_pad_set_flushing (pad, GST_FLOW_FLUSHING, TRUE);
3274
3275 G_OBJECT_CLASS (gst_aggregator_pad_parent_class)->dispose (object);
3276 }
3277
3278 static void
3279 gst_aggregator_pad_set_property (GObject * object, guint prop_id,
3280 const GValue * value, GParamSpec * pspec)
3281 {
3282 GstAggregatorPad *pad = GST_AGGREGATOR_PAD (object);
3283
3284 switch (prop_id) {
3285 case PAD_PROP_EMIT_SIGNALS:
3286 pad->priv->emit_signals = g_value_get_boolean (value);
3287 break;
3288 default:
3289 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
3290 break;
3291 }
3292 }
3293
3294 static void
3295 gst_aggregator_pad_get_property (GObject * object, guint prop_id,
3296 GValue * value, GParamSpec * pspec)
3297 {
3298 GstAggregatorPad *pad = GST_AGGREGATOR_PAD (object);
3299
3300 switch (prop_id) {
3301 case PAD_PROP_EMIT_SIGNALS:
3302 g_value_set_boolean (value, pad->priv->emit_signals);
3303 break;
3304 default:
3305 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
3306 break;
3307 }
3308 }
3309
3310 static void
3311 gst_aggregator_pad_class_init (GstAggregatorPadClass * klass)
3312 {
3313 GObjectClass *gobject_class = (GObjectClass *) klass;
3314
3315 gobject_class->constructed = gst_aggregator_pad_constructed;
3316 gobject_class->finalize = gst_aggregator_pad_finalize;
3317 gobject_class->dispose = gst_aggregator_pad_dispose;
3318 gobject_class->set_property = gst_aggregator_pad_set_property;
3319 gobject_class->get_property = gst_aggregator_pad_get_property;
3320
3321 /**
3322 * GstAggregatorPad::buffer-consumed:
3323 * @pad: The #GstAggregatorPad that emitted the signal
3324 * @buffer: The buffer that was consumed
3325 *
3326 * Signals that a buffer was consumed. As aggregator pads store buffers
3327 * in an internal queue, there is no direct match between input and output
3328 * buffers at any given time. This signal can be useful to forward metas
3329 * such as #GstVideoTimeCodeMeta or #GstVideoCaptionMeta at the right time.
3330 *
3331 * Since: 1.16
3332 */
3333 gst_aggregator_pad_signals[PAD_SIGNAL_BUFFER_CONSUMED] =
3334 g_signal_new ("buffer-consumed", G_TYPE_FROM_CLASS (klass),
3335 G_SIGNAL_RUN_FIRST, 0, NULL, NULL, NULL, G_TYPE_NONE, 1, GST_TYPE_BUFFER);
3336
3337 /**
3338 * GstAggregatorPad:emit-signals:
3339 *
3340 * Enables the emission of signals such as #GstAggregatorPad::buffer-consumed
3341 *
3342 * Since: 1.16
3343 */
3344 g_object_class_install_property (gobject_class, PAD_PROP_EMIT_SIGNALS,
3345 g_param_spec_boolean ("emit-signals", "Emit signals",
3346 "Send signals to signal data consumption", DEFAULT_PAD_EMIT_SIGNALS,
3347 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
3348 }
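/* Usage sketch (hypothetical callback name): the pad-level "emit-signals"
 * property must be TRUE for GstAggregatorPad::buffer-consumed to fire.
 *
 *   static void
 *   on_buffer_consumed (GstAggregatorPad * pad, GstBuffer * buffer,
 *       gpointer user_data)
 *   {
 *     // A queued input buffer has just been consumed; metas such as
 *     // GstVideoTimeCodeMeta can be forwarded downstream from here.
 *   }
 *
 *   g_object_set (sinkpad, "emit-signals", TRUE, NULL);
 *   g_signal_connect (sinkpad, "buffer-consumed",
 *       G_CALLBACK (on_buffer_consumed), NULL);
 */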
3349
3350 static void
3351 gst_aggregator_pad_init (GstAggregatorPad * pad)
3352 {
3353 pad->priv = gst_aggregator_pad_get_instance_private (pad);
3354
3355 g_queue_init (&pad->priv->data);
3356 g_cond_init (&pad->priv->event_cond);
3357
3358 g_mutex_init (&pad->priv->flush_lock);
3359 g_mutex_init (&pad->priv->lock);
3360
3361 gst_aggregator_pad_reset_unlocked (pad);
3362 pad->priv->negotiated = FALSE;
3363 pad->priv->emit_signals = DEFAULT_PAD_EMIT_SIGNALS;
3364 }
3365
3366 /* Must be called with the PAD_LOCK held */
3367 static void
3368 gst_aggregator_pad_buffer_consumed (GstAggregatorPad * pad, GstBuffer * buffer,
3369 gboolean dequeued)
3370 {
3371 if (dequeued)
3372 pad->priv->num_buffers--;
3373
3374 if (buffer && pad->priv->emit_signals) {
3375 g_signal_emit (pad, gst_aggregator_pad_signals[PAD_SIGNAL_BUFFER_CONSUMED],
3376 0, buffer);
3377 }
3378 PAD_BROADCAST_EVENT (pad);
3379 }
3380
3381 /* Must be called with the PAD_LOCK held */
3382 static void
3383 gst_aggregator_pad_clip_buffer_unlocked (GstAggregatorPad * pad)
3384 {
3385 GstAggregator *self = NULL;
3386 GstAggregatorClass *aggclass = NULL;
3387 GstBuffer *buffer = NULL;
3388
3389 while (pad->priv->clipped_buffer == NULL &&
3390 GST_IS_BUFFER (g_queue_peek_tail (&pad->priv->data))) {
3391 buffer = g_queue_pop_tail (&pad->priv->data);
3392
3393 apply_buffer (pad, buffer, FALSE);
3394
3395 /* We only take the parent here so that it's not taken if the buffer is
3396 * already clipped or if the queue is empty.
3397 */
3398 if (self == NULL) {
3399 self = GST_AGGREGATOR (gst_pad_get_parent_element (GST_PAD (pad)));
3400 if (self == NULL) {
3401 gst_buffer_unref (buffer);
3402 return;
3403 }
3404
3405 aggclass = GST_AGGREGATOR_GET_CLASS (self);
3406 }
3407
3408 if (aggclass->clip) {
3409 GST_TRACE_OBJECT (pad, "Clipping: %" GST_PTR_FORMAT, buffer);
3410
3411 buffer = aggclass->clip (self, pad, buffer);
3412
3413 if (buffer == NULL) {
3414 gst_aggregator_pad_buffer_consumed (pad, buffer, TRUE);
3415 GST_TRACE_OBJECT (pad, "Clipping consumed the buffer");
3416 }
3417 }
3418
3419 pad->priv->clipped_buffer = buffer;
3420 }
3421
3422 if (self)
3423 gst_object_unref (self);
3424 }
3425
3426 /**
3427 * gst_aggregator_pad_pop_buffer:
3428 * @pad: the pad to get buffer from
3429 *
3430 * Steal the ref to the buffer currently queued in @pad.
3431 *
3432 * Returns: (nullable) (transfer full): The buffer in @pad or NULL if no buffer was
3433 * queued. You should unref the buffer after usage.
3434 */
3435 GstBuffer *
3436 gst_aggregator_pad_pop_buffer (GstAggregatorPad * pad)
3437 {
3438 GstBuffer *buffer = NULL;
3439
3440 PAD_LOCK (pad);
3441
3442 /* If the subclass has already peeked a buffer, we guarantee
3443 * that it receives the same buffer, no matter if the pad has
3444 * errored out / been flushed in the meantime.
3445 */
3446 if (pad->priv->peeked_buffer) {
3447 buffer = pad->priv->peeked_buffer;
3448 goto done;
3449 }
3450
3451 if (pad->priv->flow_return != GST_FLOW_OK)
3452 goto done;
3453
3454 gst_aggregator_pad_clip_buffer_unlocked (pad);
3455 buffer = pad->priv->clipped_buffer;
3456
3457 done:
3458 if (buffer) {
3459 if (pad->priv->clipped_buffer != NULL) {
3460 /* Here we still hold a reference to both the clipped buffer
3461 * and possibly the peeked buffer, we transfer the first and
3462 * potentially release the second
3463 */
3464 gst_aggregator_pad_buffer_consumed (pad, buffer, TRUE);
3465 pad->priv->clipped_buffer = NULL;
3466 gst_buffer_replace (&pad->priv->peeked_buffer, NULL);
3467 } else {
3468 /* Here our clipped buffer has already been released, for
3469 * example because of a flush. We thus transfer the reference
3470 * to the peeked buffer to the caller, and we don't decrement
3471 * pad.num_buffers as it has already been done elsewhere
3472 */
3473 gst_aggregator_pad_buffer_consumed (pad, buffer, FALSE);
3474 pad->priv->peeked_buffer = NULL;
3475 }
3476 GST_DEBUG_OBJECT (pad, "Consumed: %" GST_PTR_FORMAT, buffer);
3477 }
3478
3479 PAD_UNLOCK (pad);
3480
3481 return buffer;
3482 }
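
/* Illustrative sketch (not compiled as part of this file): the typical
 * consumption pattern in a subclass #GstAggregatorClass::aggregate
 * implementation. "my_muxer_aggregate" and the "my_muxer_mix" helper are
 * hypothetical.
 *
 *   static gboolean
 *   pop_one (GstElement * element, GstPad * pad, gpointer user_data)
 *   {
 *     GList **buffers = user_data;
 *     GstBuffer *buf = gst_aggregator_pad_pop_buffer (GST_AGGREGATOR_PAD (pad));
 *
 *     if (buf)
 *       *buffers = g_list_append (*buffers, buf);
 *     return TRUE;
 *   }
 *
 *   static GstFlowReturn
 *   my_muxer_aggregate (GstAggregator * agg, gboolean timeout)
 *   {
 *     GList *buffers = NULL;
 *     GstBuffer *out;
 *
 *     gst_element_foreach_sink_pad (GST_ELEMENT (agg), pop_one, &buffers);
 *
 *     out = my_muxer_mix (agg, buffers);   // hypothetical: combine the inputs
 *     g_list_free_full (buffers, (GDestroyNotify) gst_buffer_unref);
 *
 *     return gst_aggregator_finish_buffer (agg, out);
 *   }
 */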
3483
3484 /**
3485 * gst_aggregator_pad_drop_buffer:
3486  * @pad: the pad to drop any pending buffer from
3487 *
3488 * Drop the buffer currently queued in @pad.
3489 *
3490  * Returns: %TRUE if there was a buffer queued in @pad, or %FALSE if not.
3491 */
3492 gboolean
3493 gst_aggregator_pad_drop_buffer (GstAggregatorPad * pad)
3494 {
3495 GstBuffer *buf;
3496
3497 buf = gst_aggregator_pad_pop_buffer (pad);
3498
3499 if (buf == NULL)
3500 return FALSE;
3501
3502 gst_buffer_unref (buf);
3503 return TRUE;
3504 }
3505
3506 /**
3507 * gst_aggregator_pad_peek_buffer:
3508 * @pad: the pad to get buffer from
3509 *
3510 * Returns: (nullable) (transfer full): A reference to the buffer in @pad or
3511 * NULL if no buffer was queued. You should unref the buffer after
3512 * usage.
3513 */
3514 GstBuffer *
3515 gst_aggregator_pad_peek_buffer (GstAggregatorPad * pad)
3516 {
3517 GstBuffer *buffer = NULL;
3518
3519 PAD_LOCK (pad);
3520
3521 if (pad->priv->peeked_buffer) {
3522 buffer = gst_buffer_ref (pad->priv->peeked_buffer);
3523 goto done;
3524 }
3525
3526 if (pad->priv->flow_return != GST_FLOW_OK)
3527 goto done;
3528
3529 gst_aggregator_pad_clip_buffer_unlocked (pad);
3530
3531 if (pad->priv->clipped_buffer) {
3532 buffer = gst_buffer_ref (pad->priv->clipped_buffer);
3533 pad->priv->peeked_buffer = gst_buffer_ref (buffer);
3534 } else {
3535 buffer = NULL;
3536 }
3537
3538 done:
3539 PAD_UNLOCK (pad);
3540 return buffer;
3541 }
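
/* Illustrative sketch (not compiled as part of this file): peeking at a queued
 * buffer inside aggregate() to decide whether to consume it now; "apad" and
 * "deadline" are hypothetical.
 *
 *   GstBuffer *buf = gst_aggregator_pad_peek_buffer (apad);
 *
 *   if (buf) {
 *     if (GST_BUFFER_PTS (buf) <= deadline) {
 *       gst_buffer_unref (buf);
 *       buf = gst_aggregator_pad_pop_buffer (apad);   // guaranteed same buffer
 *       ...
 *     } else {
 *       gst_buffer_unref (buf);   // leave it queued for a later aggregate()
 *     }
 *   }
 */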
3542
3543 /**
3544 * gst_aggregator_pad_has_buffer:
3545 * @pad: the pad to check the buffer on
3546 *
3547 * This checks if a pad has a buffer available that will be returned by
3548 * a call to gst_aggregator_pad_peek_buffer() or
3549 * gst_aggregator_pad_pop_buffer().
3550 *
3551 * Returns: %TRUE if the pad has a buffer available as the next thing.
3552 *
3553 * Since: 1.14.1
3554 */
3555 gboolean
3556 gst_aggregator_pad_has_buffer (GstAggregatorPad * pad)
3557 {
3558 gboolean has_buffer;
3559
3560 PAD_LOCK (pad);
3561
3562 if (pad->priv->peeked_buffer) {
3563 has_buffer = TRUE;
3564 } else {
3565 gst_aggregator_pad_clip_buffer_unlocked (pad);
3566 has_buffer = (pad->priv->clipped_buffer != NULL);
3567 if (has_buffer)
3568 pad->priv->peeked_buffer = gst_buffer_ref (pad->priv->clipped_buffer);
3569 }
3570 PAD_UNLOCK (pad);
3571
3572 return has_buffer;
3573 }
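
/* Illustrative sketch (not compiled as part of this file): a readiness check a
 * mixer-style subclass might perform at the top of aggregate(), waiting until
 * every sink pad is either EOS or has a buffer queued. "check_ready" is a
 * hypothetical helper.
 *
 *   static gboolean
 *   check_ready (GstElement * element, GstPad * pad, gpointer user_data)
 *   {
 *     GstAggregatorPad *apad = GST_AGGREGATOR_PAD (pad);
 *     gboolean *all_ready = user_data;
 *
 *     if (!gst_aggregator_pad_is_eos (apad) &&
 *         !gst_aggregator_pad_has_buffer (apad))
 *       *all_ready = FALSE;
 *     return TRUE;
 *   }
 *
 *   // inside aggregate():
 *   gboolean all_ready = TRUE;
 *   gst_element_foreach_sink_pad (GST_ELEMENT (agg), check_ready, &all_ready);
 *   if (!all_ready && !timeout)
 *     return GST_AGGREGATOR_FLOW_NEED_DATA;
 */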
3574
3575 /**
3576 * gst_aggregator_pad_is_eos:
3577 * @pad: an aggregator pad
3578 *
3579 * Returns: %TRUE if the pad is EOS, otherwise %FALSE.
3580 */
3581 gboolean
3582 gst_aggregator_pad_is_eos (GstAggregatorPad * pad)
3583 {
3584 gboolean is_eos;
3585
3586 PAD_LOCK (pad);
3587 is_eos = pad->priv->eos;
3588 PAD_UNLOCK (pad);
3589
3590 return is_eos;
3591 }
3592
3593 /**
3594 * gst_aggregator_pad_is_inactive:
3595 * @pad: an aggregator pad
3596 *
3597  * It is only valid to call this method from #GstAggregatorClass::aggregate().
3598  *
3599  * Returns: %TRUE if the pad is inactive, %FALSE otherwise.
3600  *   See gst_aggregator_set_ignore_inactive_pads() for more info.
3601  * Since: 1.20
3602 */
3603 gboolean
3604 gst_aggregator_pad_is_inactive (GstAggregatorPad * pad)
3605 {
3606 GstAggregator *self;
3607 gboolean inactive;
3608
3609 self = GST_AGGREGATOR (gst_pad_get_parent_element (GST_PAD (pad)));
3610
3611 g_assert_nonnull (self);
3612
3613 PAD_LOCK (pad);
3614 inactive = self->priv->ignore_inactive_pads && self->priv->peer_latency_live
3615 && pad->priv->first_buffer;
3616 PAD_UNLOCK (pad);
3617
3618 gst_object_unref (self);
3619
3620 return inactive;
3621 }
3622
3623 #if 0
3624 /*
3625 * gst_aggregator_merge_tags:
3626 * @self: a #GstAggregator
3627 * @tags: a #GstTagList to merge
3628 * @mode: the #GstTagMergeMode to use
3629 *
3630 * Adds tags to so-called pending tags, which will be processed
3631 * before pushing out data downstream.
3632 *
3633 * Note that this is provided for convenience, and the subclass is
3634 * not required to use this and can still do tag handling on its own.
3635 *
3636 * MT safe.
3637 */
3638 void
3639 gst_aggregator_merge_tags (GstAggregator * self,
3640 const GstTagList * tags, GstTagMergeMode mode)
3641 {
3642 GstTagList *otags;
3643
3644 g_return_if_fail (GST_IS_AGGREGATOR (self));
3645 g_return_if_fail (tags == NULL || GST_IS_TAG_LIST (tags));
3646
3647 /* FIXME Check if we can use OBJECT lock here! */
3648 GST_OBJECT_LOCK (self);
3649 if (tags)
3650 GST_DEBUG_OBJECT (self, "merging tags %" GST_PTR_FORMAT, tags);
3651 otags = self->priv->tags;
3652 self->priv->tags = gst_tag_list_merge (self->priv->tags, tags, mode);
3653 if (otags)
3654 gst_tag_list_unref (otags);
3655 self->priv->tags_changed = TRUE;
3656 GST_OBJECT_UNLOCK (self);
3657 }
3658 #endif
3659
3660 /**
3661 * gst_aggregator_set_latency:
3662 * @self: a #GstAggregator
3663 * @min_latency: minimum latency
3664 * @max_latency: maximum latency
3665 *
3666  * Lets #GstAggregator sub-classes tell the base class what their internal
3667 * latency is. Will also post a LATENCY message on the bus so the pipeline
3668 * can reconfigure its global latency.
3669 */
3670 void
3671 gst_aggregator_set_latency (GstAggregator * self,
3672 GstClockTime min_latency, GstClockTime max_latency)
3673 {
3674 gboolean changed = FALSE;
3675
3676 g_return_if_fail (GST_IS_AGGREGATOR (self));
3677 g_return_if_fail (GST_CLOCK_TIME_IS_VALID (min_latency));
3678 g_return_if_fail (max_latency >= min_latency);
3679
3680 SRC_LOCK (self);
3681 if (self->priv->sub_latency_min != min_latency) {
3682 self->priv->sub_latency_min = min_latency;
3683 changed = TRUE;
3684 }
3685 if (self->priv->sub_latency_max != max_latency) {
3686 self->priv->sub_latency_max = max_latency;
3687 changed = TRUE;
3688 }
3689
3690 if (changed)
3691 SRC_BROADCAST (self);
3692 SRC_UNLOCK (self);
3693
3694 if (changed) {
3695 gst_element_post_message (GST_ELEMENT_CAST (self),
3696 gst_message_new_latency (GST_OBJECT_CAST (self)));
3697 }
3698 }
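
/* Illustrative sketch (not compiled as part of this file): a subclass that
 * internally buffers one output frame reporting that to the base class; the
 * 30 fps frame duration is only an example value.
 *
 *   GstClockTime frame_dur = gst_util_uint64_scale_int (GST_SECOND, 1, 30);
 *
 *   gst_aggregator_set_latency (GST_AGGREGATOR (self), frame_dur, frame_dur);
 */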
3699
3700 /**
3701 * gst_aggregator_get_buffer_pool:
3702 * @self: a #GstAggregator
3703 *
3704  * Returns: (transfer full) (nullable): the instance of the #GstBufferPool used
3705  * by @self; unref it after use
3706 */
3707 GstBufferPool *
3708 gst_aggregator_get_buffer_pool (GstAggregator * self)
3709 {
3710 GstBufferPool *pool;
3711
3712 g_return_val_if_fail (GST_IS_AGGREGATOR (self), NULL);
3713
3714 GST_OBJECT_LOCK (self);
3715 pool = self->priv->pool;
3716 if (pool)
3717 gst_object_ref (pool);
3718 GST_OBJECT_UNLOCK (self);
3719
3720 return pool;
3721 }
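
/* Illustrative sketch (not compiled as part of this file): acquiring an output
 * buffer from the negotiated pool inside aggregate(), falling back to another
 * allocation path if no pool was negotiated or acquisition fails;
 * "fallback_alloc" is a hypothetical helper.
 *
 *   GstBufferPool *pool = gst_aggregator_get_buffer_pool (agg);
 *   GstBuffer *outbuf = NULL;
 *
 *   if (pool) {
 *     if (gst_buffer_pool_acquire_buffer (pool, &outbuf, NULL) != GST_FLOW_OK)
 *       outbuf = NULL;
 *     gst_object_unref (pool);
 *   }
 *   if (!outbuf)
 *     outbuf = fallback_alloc (agg);
 */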
3722
3723 /**
3724 * gst_aggregator_get_allocator:
3725 * @self: a #GstAggregator
3726 * @allocator: (out) (optional) (nullable) (transfer full): the #GstAllocator
3727 * used
3728 * @params: (out caller-allocates) (optional): the
3729 * #GstAllocationParams of @allocator
3730 *
3731 * Lets #GstAggregator sub-classes get the memory @allocator
3732 * acquired by the base class and its @params.
3733 *
3734  * Unref the @allocator after use.
3735 */
3736 void
3737 gst_aggregator_get_allocator (GstAggregator * self,
3738 GstAllocator ** allocator, GstAllocationParams * params)
3739 {
3740 g_return_if_fail (GST_IS_AGGREGATOR (self));
3741
3742 if (allocator)
3743 *allocator = self->priv->allocator ?
3744 gst_object_ref (self->priv->allocator) : NULL;
3745
3746 if (params)
3747 *params = self->priv->allocation_params;
3748 }
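
/* Illustrative sketch (not compiled as part of this file): allocating memory
 * for an output buffer with the negotiated allocator and params; OUT_SIZE is
 * a hypothetical constant. A NULL allocator makes gst_allocator_alloc() use
 * the default allocator.
 *
 *   GstAllocator *allocator = NULL;
 *   GstAllocationParams params;
 *   GstMemory *mem;
 *   GstBuffer *outbuf;
 *
 *   gst_aggregator_get_allocator (agg, &allocator, &params);
 *   mem = gst_allocator_alloc (allocator, OUT_SIZE, &params);
 *   outbuf = gst_buffer_new ();
 *   gst_buffer_append_memory (outbuf, mem);
 *
 *   if (allocator)
 *     gst_object_unref (allocator);
 */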
3749
3750 /**
3751 * gst_aggregator_simple_get_next_time:
3752 * @self: A #GstAggregator
3753 *
3754 * This is a simple #GstAggregatorClass::get_next_time implementation that
3755 * just looks at the #GstSegment on the srcpad of the aggregator and bases
3756 * the next time on the running time there.
3757 *
3758  * This is the desired behaviour in most cases where you have a live source
3759  * and a deadline-based aggregator subclass.
3760 *
3761 * Returns: The running time based on the position
3762 *
3763 * Since: 1.16
3764 */
3765 GstClockTime
3766 gst_aggregator_simple_get_next_time (GstAggregator * self)
3767 {
3768 GstClockTime next_time;
3769 GstAggregatorPad *srcpad = GST_AGGREGATOR_PAD (self->srcpad);
3770 GstSegment *segment = &srcpad->segment;
3771
3772 GST_OBJECT_LOCK (self);
3773 if (segment->position == -1 || segment->position < segment->start)
3774 next_time = segment->start;
3775 else
3776 next_time = segment->position;
3777
3778 if (segment->stop != -1 && next_time > segment->stop)
3779 next_time = segment->stop;
3780
3781 next_time = gst_segment_to_running_time (segment, GST_FORMAT_TIME, next_time);
3782 GST_OBJECT_UNLOCK (self);
3783
3784 return next_time;
3785 }
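
/* Illustrative sketch (not compiled as part of this file): wiring this helper
 * up as the subclass' get_next_time implementation; "MyAggClass" and
 * "my_agg_class_init" are hypothetical.
 *
 *   static void
 *   my_agg_class_init (MyAggClass * klass)
 *   {
 *     GstAggregatorClass *agg_class = GST_AGGREGATOR_CLASS (klass);
 *
 *     agg_class->get_next_time = gst_aggregator_simple_get_next_time;
 *   }
 */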
3786
3787 /**
3788  * gst_aggregator_update_segment:
 * @self: the #GstAggregator
 * @segment: the #GstSegment to use on the source pad
3789 *
3790 * Subclasses should use this to update the segment on their
3791 * source pad, instead of directly pushing new segment events
3792 * downstream.
3793 *
3794  * Subclasses MUST call this before gst_aggregator_selected_samples(),
3795  * if that method is used at all.
3796 *
3797 * Since: 1.18
3798 */
3799 void
3800 gst_aggregator_update_segment (GstAggregator * self, const GstSegment * segment)
3801 {
3802 g_return_if_fail (GST_IS_AGGREGATOR (self));
3803 g_return_if_fail (segment != NULL);
3804
3805 GST_INFO_OBJECT (self, "Updating srcpad segment: %" GST_SEGMENT_FORMAT,
3806 segment);
3807
3808 GST_OBJECT_LOCK (self);
3809 GST_AGGREGATOR_PAD (self->srcpad)->segment = *segment;
3810 self->priv->send_segment = TRUE;
3811 /* we have a segment from the subclass now and really shouldn't override
3812 * anything in that segment anymore, like the segment.position */
3813 self->priv->first_buffer = FALSE;
3814 GST_OBJECT_UNLOCK (self);
3815 }
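
/* Illustrative sketch (not compiled as part of this file): a subclass
 * producing output in its own TIME segment, updated before any buffer is
 * finished; "agg" is the subclass instance cast to #GstAggregator.
 *
 *   GstSegment segment;
 *
 *   gst_segment_init (&segment, GST_FORMAT_TIME);
 *   gst_aggregator_update_segment (agg, &segment);
 */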
3816
3817 /**
3818  * gst_aggregator_selected_samples:
 * @self: a #GstAggregator
3819 * @pts: The presentation timestamp of the next output buffer
3820 * @dts: The decoding timestamp of the next output buffer
3821 * @duration: The duration of the next output buffer
3822 * @info: (nullable): a #GstStructure containing additional information
3823 *
3824 * Subclasses should call this when they have prepared the
3825 * buffers they will aggregate for each of their sink pads, but
3826 * before using any of the properties of the pads that govern
3827 * *how* aggregation should be performed, for example z-index
3828 * for video aggregators.
3829 *
3830 * If gst_aggregator_update_segment() is used by the subclass,
3831 * it MUST be called before gst_aggregator_selected_samples().
3832 *
3833 * This function MUST only be called from the #GstAggregatorClass::aggregate()
3834 * function.
3835 *
3836 * Since: 1.18
3837 */
3838 void
3839 gst_aggregator_selected_samples (GstAggregator * self,
3840 GstClockTime pts, GstClockTime dts, GstClockTime duration,
3841 GstStructure * info)
3842 {
3843 g_return_if_fail (GST_IS_AGGREGATOR (self));
3844
3845 if (self->priv->emit_signals) {
3846 g_signal_emit (self, gst_aggregator_signals[SIGNAL_SAMPLES_SELECTED], 0,
3847 &GST_AGGREGATOR_PAD (self->srcpad)->segment, pts, dts, duration, info);
3848 }
3849
3850 self->priv->selected_samples_called_or_warned = TRUE;
3851 }
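
/* Illustrative sketch (not compiled as part of this file): announcing the
 * samples chosen for the next output buffer from within aggregate(), before
 * reading the pads' mixing properties; "out_pts" and "out_duration" are
 * hypothetical.
 *
 *   gst_aggregator_selected_samples (agg, out_pts, GST_CLOCK_TIME_NONE,
 *       out_duration, NULL);
 */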
3852
3853 /**
3854  * gst_aggregator_set_ignore_inactive_pads:
 * @self: the #GstAggregator
3855 * @ignore: whether inactive pads should not be waited on
3856 *
3857 * Subclasses should call this when they don't want to time out
3858 * waiting for a pad that hasn't yet received any buffers in live
3859 * mode.
3860 *
3861 * #GstAggregator will still wait once on each newly-added pad, making
3862 * sure upstream has had a fair chance to start up.
3863 *
3864 * Since: 1.20
3865 */
3866 void
3867 gst_aggregator_set_ignore_inactive_pads (GstAggregator * self, gboolean ignore)
3868 {
3869 g_return_if_fail (GST_IS_AGGREGATOR (self));
3870
3871 GST_OBJECT_LOCK (self);
3872 self->priv->ignore_inactive_pads = ignore;
3873 GST_OBJECT_UNLOCK (self);
3874 }
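
/* Illustrative sketch (not compiled as part of this file): a live-oriented
 * subclass opting out of waiting on pads that have not produced data yet, and
 * then skipping those pads while aggregating; "MyAgg" and "my_agg_init" are
 * hypothetical.
 *
 *   static void
 *   my_agg_init (MyAgg * self)
 *   {
 *     gst_aggregator_set_ignore_inactive_pads (GST_AGGREGATOR (self), TRUE);
 *   }
 *
 *   // later, inside aggregate(), per sink pad:
 *   if (gst_aggregator_pad_is_inactive (apad))
 *     continue;   // do not wait for or mix this pad yet
 */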
3875
3876 /**
3877  * gst_aggregator_get_ignore_inactive_pads:
 * @self: the #GstAggregator
3878 *
3879 * Returns: whether inactive pads will not be waited on
3880 * Since: 1.20
3881 */
3882 gboolean
3883 gst_aggregator_get_ignore_inactive_pads (GstAggregator * self)
3884 {
3885 gboolean ret;
3886
3887 g_return_val_if_fail (GST_IS_AGGREGATOR (self), FALSE);
3888
3889 GST_OBJECT_LOCK (self);
3890 ret = self->priv->ignore_inactive_pads;
3891 GST_OBJECT_UNLOCK (self);
3892
3893 return ret;
3894 }
3895