• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *                    2001 Thomas <thomas@apestaart.org>
4  *               2005,2006 Wim Taymans <wim@fluendo.com>
5  *                    2013 Sebastian Dröge <sebastian@centricular.com>
6  *                    2014 Collabora
7  *                             Olivier Crete <olivier.crete@collabora.com>
8  *
9  * gstaudioaggregator.c:
10  *
11  * This library is free software; you can redistribute it and/or
12  * modify it under the terms of the GNU Library General Public
13  * License as published by the Free Software Foundation; either
14  * version 2 of the License, or (at your option) any later version.
15  *
16  * This library is distributed in the hope that it will be useful,
17  * but WITHOUT ANY WARRANTY; without even the implied warranty of
18  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
19  * Library General Public License for more details.
20  *
21  * You should have received a copy of the GNU Library General Public
22  * License along with this library; if not, write to the
23  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
24  * Boston, MA 02110-1301, USA.
25  */
26 /**
27  * SECTION: gstaudioaggregator
28  * @title: GstAudioAggregator
29  * @short_description: Base class that manages a set of audio input pads
30  * with the purpose of aggregating or mixing their raw audio input buffers
31  * @see_also: #GstAggregator, #GstAudioMixer
32  *
33  * Subclasses must use (a subclass of) #GstAudioAggregatorPad for both
34  * their source and sink pads,
35  * gst_element_class_add_static_pad_template_with_gtype() is a convenient
36  * helper.
37  *
38  * #GstAudioAggregator can perform conversion on the data arriving
39  * on its sink pads, based on the format expected downstream: in order
40  * to enable that behaviour, the GType of the sink pads must either be
41  * a (subclass of) #GstAudioAggregatorConvertPad to use the default
42  * #GstAudioConverter implementation, or a subclass of #GstAudioAggregatorPad
43  * implementing #GstAudioAggregatorPadClass.convert_buffer.
44  *
45  * To allow for the output caps to change, the mechanism is the same as
46  * above, with the GType of the source pad.
47  *
48  * See #GstAudioMixer for an example.
49  *
50  * When conversion is enabled, #GstAudioAggregator will accept
51  * any type of raw audio caps and perform conversion
52  * on the data arriving on its sink pads, with whatever downstream
53  * expects as the target format.
54  *
55  * In case downstream caps are not fully fixated, it will use
56  * the first configured sink pad to finish fixating its source pad
57  * caps.
58  *
59  * A notable exception for now is the sample rate, sink pads must
60  * have the same sample rate as either the downstream requirement,
61  * or the first configured pad, or a combination of both (when
62  * downstream specifies a range or a set of acceptable rates).
63  *
64  * The #GstAggregator::samples-selected signal is provided with some
65  * additional information about the output buffer:
66  * - "offset"  G_TYPE_UINT64   Offset in samples since segment start
67  *   for the position that is next to be filled in the output buffer.
68  * - "frames"  G_TYPE_UINT   Number of frames per output buffer.
69  *
70  * In addition the gst_aggregator_peek_next_sample() function returns
71  * additional information in the info #GstStructure of the returned sample:
72  * - "output-offset"  G_TYPE_UINT64   Sample offset in output segment relative to
73  *   the output segment's start where the current position of this input
74  *   buffer would be placed
75  * - "position"  G_TYPE_UINT   current position in the input buffer in samples
76  * - "size"  G_TYPE_UINT   size of the input buffer in samples
77  */
78 
79 
80 #ifdef HAVE_CONFIG_H
81 #  include "config.h"
82 #endif
83 
84 #include "gstaudioaggregator.h"
85 
86 #include <string.h>
87 
88 GST_DEBUG_CATEGORY_STATIC (audio_aggregator_debug);
89 #define GST_CAT_DEFAULT audio_aggregator_debug
90 
/* Property identifiers installed on GstAudioAggregatorPad.
 * PROP_PAD_0 is the unused zero slot; real properties start after it. */
91 enum
92 {
93   PROP_PAD_0,
94   PROP_PAD_QOS_MESSAGES,
95 };
96 
/* Private per-pad state of a GstAudioAggregatorPad: the buffer currently
 * being consumed, the read position inside it, the offsets used to place it
 * in the output, and QoS bookkeeping. */
97 struct _GstAudioAggregatorPadPrivate
98 {
99   /* All members are protected by the pad object lock */
100 
101   GstBuffer *buffer;            /* current buffer we're mixing, for
102                                    comparison with a new input buffer from
103                                    aggregator to see if we need to update our
104                                    cached values. */
105 
106   guint position, size;         /* position in the input buffer and size of the
107                                    input buffer in number of samples */
108 
109   guint64 output_offset;        /* Sample offset in output segment relative to
110                                    srcpad.segment.start where the current position
111                                    of this input_buffer would be placed. */
112 
113   guint64 next_offset;          /* Next expected sample offset relative to
114                                    pad.segment.start. This is -1 when resyncing is
115                                    needed, e.g. because of a previous discont. */
116 
117   /* Last time we noticed a discont */
118   GstClockTime discont_time;
119 
120   /* A new unhandled segment event has been received */
121   gboolean new_segment;
122 
123   guint64 processed;            /* Number of samples processed since the element came out of READY */
124   guint64 dropped;              /* Number of samples dropped since the element came out of READY */
125 
126   gboolean qos_messages;        /* Property to decide to send QoS messages or not */
127 };
128 
129 
130 /*****************************************
131  * GstAudioAggregatorPad implementation  *
132  *****************************************/
133 G_DEFINE_TYPE_WITH_PRIVATE (GstAudioAggregatorPad, gst_audio_aggregator_pad,
134     GST_TYPE_AGGREGATOR_PAD);
135 
136 static GstFlowReturn
137 gst_audio_aggregator_pad_flush_pad (GstAggregatorPad * aggpad,
138     GstAggregator * aggregator);
139 
140 static void
gst_audio_aggregator_pad_finalize(GObject * object)141 gst_audio_aggregator_pad_finalize (GObject * object)
142 {
143   GstAudioAggregatorPad *pad = (GstAudioAggregatorPad *) object;
144 
145   gst_buffer_replace (&pad->priv->buffer, NULL);
146 
147   G_OBJECT_CLASS (gst_audio_aggregator_pad_parent_class)->finalize (object);
148 }
149 
150 static void
gst_audio_aggregator_pad_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)151 gst_audio_aggregator_pad_get_property (GObject * object, guint prop_id,
152     GValue * value, GParamSpec * pspec)
153 {
154   GstAudioAggregatorPad *pad = GST_AUDIO_AGGREGATOR_PAD (object);
155 
156   switch (prop_id) {
157     case PROP_PAD_QOS_MESSAGES:
158       GST_OBJECT_LOCK (pad);
159       g_value_set_boolean (value, pad->priv->qos_messages);
160       GST_OBJECT_UNLOCK (pad);
161       break;
162     default:
163       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
164       break;
165   }
166 }
167 
168 static void
gst_audio_aggregator_pad_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)169 gst_audio_aggregator_pad_set_property (GObject * object, guint prop_id,
170     const GValue * value, GParamSpec * pspec)
171 {
172   GstAudioAggregatorPad *pad = GST_AUDIO_AGGREGATOR_PAD (object);
173 
174   switch (prop_id) {
175     case PROP_PAD_QOS_MESSAGES:
176       GST_OBJECT_LOCK (pad);
177       pad->priv->qos_messages = g_value_get_boolean (value);
178       GST_OBJECT_UNLOCK (pad);
179       break;
180     default:
181       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
182       break;
183   }
184 }
185 
186 static void
gst_audio_aggregator_pad_class_init(GstAudioAggregatorPadClass * klass)187 gst_audio_aggregator_pad_class_init (GstAudioAggregatorPadClass * klass)
188 {
189   GObjectClass *gobject_class = (GObjectClass *) klass;
190   GstAggregatorPadClass *aggpadclass = (GstAggregatorPadClass *) klass;
191 
192   gobject_class->set_property = gst_audio_aggregator_pad_set_property;
193   gobject_class->get_property = gst_audio_aggregator_pad_get_property;
194   gobject_class->finalize = gst_audio_aggregator_pad_finalize;
195   aggpadclass->flush = GST_DEBUG_FUNCPTR (gst_audio_aggregator_pad_flush_pad);
196 
197   /**
198    * GstAudioAggregatorPad:qos-messages:
199    *
200    * Emit QoS messages when dropping buffers.
201    *
202    * Since: 1.20
203    */
204   g_object_class_install_property (gobject_class,
205       PROP_PAD_QOS_MESSAGES, g_param_spec_boolean ("qos-messages",
206           "Quality of Service Messages",
207           "Emit QoS messages when dropping buffers", FALSE,
208           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
209 }
210 
211 static void
gst_audio_aggregator_pad_init(GstAudioAggregatorPad * pad)212 gst_audio_aggregator_pad_init (GstAudioAggregatorPad * pad)
213 {
214   pad->priv = gst_audio_aggregator_pad_get_instance_private (pad);
215 
216   gst_audio_info_init (&pad->info);
217 
218   pad->priv->buffer = NULL;
219   pad->priv->position = 0;
220   pad->priv->size = 0;
221   pad->priv->output_offset = -1;
222   pad->priv->next_offset = -1;
223   pad->priv->discont_time = GST_CLOCK_TIME_NONE;
224 }
225 
226 /* Must be called from srcpad thread or when it is stopped */
227 static void
gst_audio_aggregator_pad_reset_qos(GstAudioAggregatorPad * pad)228 gst_audio_aggregator_pad_reset_qos (GstAudioAggregatorPad * pad)
229 {
230   pad->priv->dropped = 0;
231   pad->priv->processed = 0;
232 }
233 
234 static GstFlowReturn
gst_audio_aggregator_pad_flush_pad(GstAggregatorPad * aggpad,GstAggregator * aggregator)235 gst_audio_aggregator_pad_flush_pad (GstAggregatorPad * aggpad,
236     GstAggregator * aggregator)
237 {
238   GstAudioAggregatorPad *pad = GST_AUDIO_AGGREGATOR_PAD (aggpad);
239 
240   GST_OBJECT_LOCK (aggpad);
241   pad->priv->position = pad->priv->size = 0;
242   pad->priv->output_offset = pad->priv->next_offset = -1;
243   pad->priv->discont_time = GST_CLOCK_TIME_NONE;
244   gst_buffer_replace (&pad->priv->buffer, NULL);
245   gst_audio_aggregator_pad_reset_qos (pad);
246   GST_OBJECT_UNLOCK (aggpad);
247 
248   return GST_FLOW_OK;
249 }
250 
/* Property identifiers installed on GstAudioAggregatorConvertPad.
 * PROP_CONVERT_PAD_0 is the unused zero slot. */
251 enum
252 {
253   PROP_CONVERT_PAD_0,
254   PROP_CONVERT_PAD_CONVERTER_CONFIG
255 };
256 
/* Private state of a GstAudioAggregatorConvertPad: the converter in use
 * (NULL when none is needed or none has been created yet), the
 * user-provided converter configuration, and a flag telling that the
 * converter has to be rebuilt before the next conversion. */
257 struct _GstAudioAggregatorConvertPadPrivate
258 {
259   /* All members are protected by the pad object lock */
260   GstAudioConverter *converter;
261   GstStructure *converter_config;
262   gboolean converter_config_changed;
263 };
264 
265 
266 G_DEFINE_TYPE_WITH_PRIVATE (GstAudioAggregatorConvertPad,
267     gst_audio_aggregator_convert_pad, GST_TYPE_AUDIO_AGGREGATOR_PAD);
268 
269 static gboolean
gst_audio_aggregator_convert_pad_update_converter(GstAudioAggregatorConvertPad * aaggcpad,GstAudioInfo * in_info,GstAudioInfo * out_info)270 gst_audio_aggregator_convert_pad_update_converter (GstAudioAggregatorConvertPad
271     * aaggcpad, GstAudioInfo * in_info, GstAudioInfo * out_info)
272 {
273   GstStructure *config = aaggcpad->priv->converter_config;
274   GstAudioConverter *converter;
275 
276   if (!aaggcpad->priv->converter_config_changed) {
277     return TRUE;
278   }
279 
280   g_clear_pointer (&aaggcpad->priv->converter, gst_audio_converter_free);
281 
282   if (in_info->finfo->format == GST_AUDIO_FORMAT_UNKNOWN) {
283     /* If we haven't received caps yet, this pad should not have
284      * a buffer to convert anyway */
285     GST_FIXME_OBJECT (aaggcpad, "UNREACHABLE CODE: Unknown input format");
286     return FALSE;
287   }
288 
289   converter =
290       gst_audio_converter_new (GST_AUDIO_CONVERTER_FLAG_NONE, in_info, out_info,
291       config ? gst_structure_copy (config) : NULL);
292 
293   if (converter == NULL) {
294     /* Not converting when we need to but the config is invalid (e.g. because
295      * the mix-matrix is not the right size) produces garbage. An invalid
296      * config causes a GST_FLOW_NOT_NEGOTIATED. */
297     GST_WARNING_OBJECT (aaggcpad, "Failed to update converter");
298     return FALSE;
299   }
300 
301   aaggcpad->priv->converter_config_changed = FALSE;
302 
303   if (!gst_audio_converter_is_passthrough (converter))
304     aaggcpad->priv->converter = converter;
305   else
306     gst_audio_converter_free (converter);
307 
308   return TRUE;
309 }
310 
311 static void
gst_audio_aggregator_pad_update_conversion_info(GstAudioAggregatorPad * aaggpad)312 gst_audio_aggregator_pad_update_conversion_info (GstAudioAggregatorPad *
313     aaggpad)
314 {
315   GST_AUDIO_AGGREGATOR_CONVERT_PAD (aaggpad)->priv->converter_config_changed =
316       TRUE;
317 }
318 
319 static GstBuffer *
gst_audio_aggregator_convert_pad_convert_buffer(GstAudioAggregatorPad * aaggpad,GstAudioInfo * in_info,GstAudioInfo * out_info,GstBuffer * input_buffer)320 gst_audio_aggregator_convert_pad_convert_buffer (GstAudioAggregatorPad *
321     aaggpad, GstAudioInfo * in_info, GstAudioInfo * out_info,
322     GstBuffer * input_buffer)
323 {
324   GstBuffer *res;
325   GstAudioAggregatorConvertPad *aaggcpad =
326       GST_AUDIO_AGGREGATOR_CONVERT_PAD (aaggpad);
327 
328   if (!gst_audio_aggregator_convert_pad_update_converter (aaggcpad, in_info,
329           out_info)) {
330     return NULL;
331   }
332 
333   if (aaggcpad->priv->converter) {
334     gint insize = gst_buffer_get_size (input_buffer);
335     gsize insamples = insize / in_info->bpf;
336     gsize outsamples =
337         gst_audio_converter_get_out_frames (aaggcpad->priv->converter,
338         insamples);
339     gint outsize = outsamples * out_info->bpf;
340     GstMapInfo inmap, outmap;
341 
342     res = gst_buffer_new_allocate (NULL, outsize, NULL);
343 
344     /* We create a perfectly similar buffer, except obviously for
345      * its converted contents */
346     gst_buffer_copy_into (res, input_buffer,
347         GST_BUFFER_COPY_FLAGS | GST_BUFFER_COPY_TIMESTAMPS |
348         GST_BUFFER_COPY_META, 0, -1);
349 
350     gst_buffer_map (input_buffer, &inmap, GST_MAP_READ);
351     gst_buffer_map (res, &outmap, GST_MAP_WRITE);
352 
353     gst_audio_converter_samples (aaggcpad->priv->converter,
354         GST_AUDIO_CONVERTER_FLAG_NONE,
355         (gpointer *) & inmap.data, insamples,
356         (gpointer *) & outmap.data, outsamples);
357 
358     gst_buffer_unmap (input_buffer, &inmap);
359     gst_buffer_unmap (res, &outmap);
360   } else {
361     res = gst_buffer_ref (input_buffer);
362   }
363 
364   return res;
365 }
366 
367 static void
gst_audio_aggregator_convert_pad_finalize(GObject * object)368 gst_audio_aggregator_convert_pad_finalize (GObject * object)
369 {
370   GstAudioAggregatorConvertPad *pad = (GstAudioAggregatorConvertPad *) object;
371 
372   if (pad->priv->converter)
373     gst_audio_converter_free (pad->priv->converter);
374 
375   if (pad->priv->converter_config)
376     gst_structure_free (pad->priv->converter_config);
377 
378   G_OBJECT_CLASS (gst_audio_aggregator_convert_pad_parent_class)->finalize
379       (object);
380 }
381 
382 static void
gst_audio_aggregator_convert_pad_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)383 gst_audio_aggregator_convert_pad_get_property (GObject * object, guint prop_id,
384     GValue * value, GParamSpec * pspec)
385 {
386   GstAudioAggregatorConvertPad *pad = GST_AUDIO_AGGREGATOR_CONVERT_PAD (object);
387 
388   switch (prop_id) {
389     case PROP_CONVERT_PAD_CONVERTER_CONFIG:
390       GST_OBJECT_LOCK (pad);
391       if (pad->priv->converter_config)
392         g_value_set_boxed (value, pad->priv->converter_config);
393       GST_OBJECT_UNLOCK (pad);
394       break;
395     default:
396       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
397       break;
398   }
399 }
400 
401 static void
gst_audio_aggregator_convert_pad_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)402 gst_audio_aggregator_convert_pad_set_property (GObject * object, guint prop_id,
403     const GValue * value, GParamSpec * pspec)
404 {
405   GstAudioAggregatorConvertPad *pad = GST_AUDIO_AGGREGATOR_CONVERT_PAD (object);
406 
407   switch (prop_id) {
408     case PROP_CONVERT_PAD_CONVERTER_CONFIG:
409       GST_OBJECT_LOCK (pad);
410       if (pad->priv->converter_config)
411         gst_structure_free (pad->priv->converter_config);
412       pad->priv->converter_config = g_value_dup_boxed (value);
413       pad->priv->converter_config_changed = TRUE;
414       GST_OBJECT_UNLOCK (pad);
415       break;
416     default:
417       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
418       break;
419   }
420 }
421 
422 static void
gst_audio_aggregator_convert_pad_class_init(GstAudioAggregatorConvertPadClass * klass)423 gst_audio_aggregator_convert_pad_class_init (GstAudioAggregatorConvertPadClass *
424     klass)
425 {
426   GObjectClass *gobject_class = (GObjectClass *) klass;
427   GstAudioAggregatorPadClass *aaggpad_class =
428       (GstAudioAggregatorPadClass *) klass;
429 
430   gobject_class->set_property = gst_audio_aggregator_convert_pad_set_property;
431   gobject_class->get_property = gst_audio_aggregator_convert_pad_get_property;
432 
433   g_object_class_install_property (gobject_class,
434       PROP_CONVERT_PAD_CONVERTER_CONFIG,
435       g_param_spec_boxed ("converter-config", "Converter configuration",
436           "A GstStructure describing the configuration that should be used "
437           "when converting this pad's audio buffers",
438           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
439 
440   aaggpad_class->convert_buffer =
441       gst_audio_aggregator_convert_pad_convert_buffer;
442 
443   aaggpad_class->update_conversion_info =
444       gst_audio_aggregator_pad_update_conversion_info;
445 
446   gobject_class->finalize = gst_audio_aggregator_convert_pad_finalize;
447 }
448 
449 static void
gst_audio_aggregator_convert_pad_init(GstAudioAggregatorConvertPad * pad)450 gst_audio_aggregator_convert_pad_init (GstAudioAggregatorConvertPad * pad)
451 {
452   pad->priv = gst_audio_aggregator_convert_pad_get_instance_private (pad);
453 }
454 
455 /**************************************
456  * GstAudioAggregator implementation  *
457  **************************************/
458 
/* Private state of a GstAudioAggregator: the aggregator mutex, the
 * user-configurable timing parameters, the derived per-buffer sample
 * bookkeeping, the output buffer being filled and the output position. */
459 struct _GstAudioAggregatorPrivate
460 {
      /* Taken through GST_AUDIO_AGGREGATOR_LOCK() / _UNLOCK() */
461   GMutex mutex;
462 
463   /* All three properties are unprotected, can't be modified while streaming */
464   /* Size in frames that is output per buffer */
      /* NOTE(review): the "size in frames" remark above does not describe the
       * two GstClockTime fields that follow; it presumably refers to
       * samples_per_buffer further down - confirm. */
465   GstClockTime alignment_threshold;
466   GstClockTime discont_wait;
467 
      /* Output buffer duration as a fraction n/d; the property getter scales
       * GST_SECOND by n/d, i.e. the fraction is expressed in seconds */
468   gint output_buffer_duration_n;
469   gint output_buffer_duration_d;
470 
      /* samples_per_buffer is rate * n / d and error_per_buffer the remainder
       * of that division; samples_per_buffer == 0 forces a recalculation */
471   guint samples_per_buffer;
472   guint error_per_buffer;
473   guint accumulated_error;
474   guint current_blocksize;
475 
476   /* Protected by srcpad stream clock */
      /* NOTE(review): "stream clock" above presumably means the srcpad stream
       * lock - confirm. */
477   /* Output buffer starting at offset containing blocksize frames (calculated
478    * from output_buffer_duration) */
479   GstBuffer *current_buffer;
480 
481   /* counters to keep track of timestamps */
482   /* Readable with object lock, writable with both aag lock and object lock */
      /* ("aag lock" above presumably is the aagg mutex declared at the top of
       * this struct - confirm) */
483 
484   /* Sample offset starting from 0 at aggregator.segment.start */
485   gint64 offset;
486 
487   /* info structure passed to selected-samples signal, must only be accessed
488    * from the aggregate thread */
489   GstStructure *selected_samples_info;
490 
491   /* Only access from src thread */
492   /* Messages to post after releasing locks */
493   GQueue messages;
494 };
495 
/* Lock / unlock the aggregator's private mutex.
 * Note that both expansions already end with a ';', so a use written as
 * `GST_AUDIO_AGGREGATOR_LOCK (self);` expands to the call followed by an
 * empty statement; avoid using them as the unbraced body of an if/else. */
496 #define GST_AUDIO_AGGREGATOR_LOCK(self)   g_mutex_lock (&(self)->priv->mutex);
497 #define GST_AUDIO_AGGREGATOR_UNLOCK(self) g_mutex_unlock (&(self)->priv->mutex);
498 
499 static void gst_audio_aggregator_set_property (GObject * object, guint prop_id,
500     const GValue * value, GParamSpec * pspec);
501 static void gst_audio_aggregator_get_property (GObject * object, guint prop_id,
502     GValue * value, GParamSpec * pspec);
503 static void gst_audio_aggregator_dispose (GObject * object);
504 
505 static gboolean gst_audio_aggregator_src_event (GstAggregator * agg,
506     GstEvent * event);
507 static gboolean gst_audio_aggregator_sink_event (GstAggregator * agg,
508     GstAggregatorPad * aggpad, GstEvent * event);
509 static gboolean gst_audio_aggregator_src_query (GstAggregator * agg,
510     GstQuery * query);
511 static gboolean
512 gst_audio_aggregator_sink_query (GstAggregator * agg, GstAggregatorPad * aggpad,
513     GstQuery * query);
514 static gboolean gst_audio_aggregator_start (GstAggregator * agg);
515 static gboolean gst_audio_aggregator_stop (GstAggregator * agg);
516 static GstFlowReturn gst_audio_aggregator_flush (GstAggregator * agg);
517 
518 static GstBuffer *gst_audio_aggregator_create_output_buffer (GstAudioAggregator
519     * aagg, guint num_frames);
520 static GstBuffer *gst_audio_aggregator_do_clip (GstAggregator * agg,
521     GstAggregatorPad * bpad, GstBuffer * buffer);
522 static GstFlowReturn gst_audio_aggregator_aggregate (GstAggregator * agg,
523     gboolean timeout);
524 static gboolean sync_pad_values (GstElement * aagg, GstPad * pad, gpointer ud);
525 static gboolean gst_audio_aggregator_negotiated_src_caps (GstAggregator * agg,
526     GstCaps * caps);
527 static GstFlowReturn
528 gst_audio_aggregator_update_src_caps (GstAggregator * agg,
529     GstCaps * caps, GstCaps ** ret);
530 static GstCaps *gst_audio_aggregator_fixate_src_caps (GstAggregator * agg,
531     GstCaps * caps);
532 static GstSample *gst_audio_aggregator_peek_next_sample (GstAggregator * agg,
533     GstAggregatorPad * aggpad);
534 
/* Property defaults. The output buffer duration default is given twice:
 * once in nanoseconds (10 ms) and once as the fraction N/D = 1/100, which
 * is the same duration expressed in seconds. */
535 #define DEFAULT_OUTPUT_BUFFER_DURATION (10 * GST_MSECOND)
536 #define DEFAULT_ALIGNMENT_THRESHOLD   (40 * GST_MSECOND)
537 #define DEFAULT_DISCONT_WAIT (1 * GST_SECOND)
538 #define DEFAULT_OUTPUT_BUFFER_DURATION_N (1)
539 #define DEFAULT_OUTPUT_BUFFER_DURATION_D (100)
540 
/* Property identifiers installed on GstAudioAggregator.
 * PROP_0 is the unused zero slot. */
541 enum
542 {
543   PROP_0,
544   PROP_OUTPUT_BUFFER_DURATION,
545   PROP_ALIGNMENT_THRESHOLD,
546   PROP_DISCONT_WAIT,
547   PROP_OUTPUT_BUFFER_DURATION_FRACTION,
548   PROP_IGNORE_INACTIVE_PADS,
549 };
550 
551 G_DEFINE_ABSTRACT_TYPE_WITH_PRIVATE (GstAudioAggregator, gst_audio_aggregator,
552     GST_TYPE_AGGREGATOR);
553 
554 static GstBuffer *
gst_audio_aggregator_convert_buffer(GstAudioAggregator * aagg,GstPad * pad,GstAudioInfo * in_info,GstAudioInfo * out_info,GstBuffer * buffer)555 gst_audio_aggregator_convert_buffer (GstAudioAggregator * aagg, GstPad * pad,
556     GstAudioInfo * in_info, GstAudioInfo * out_info, GstBuffer * buffer)
557 {
558   GstAudioAggregatorPadClass *klass = GST_AUDIO_AGGREGATOR_PAD_GET_CLASS (pad);
559   GstAudioAggregatorPad *aaggpad = GST_AUDIO_AGGREGATOR_PAD (pad);
560 
561   g_assert (klass->convert_buffer);
562 
563   return klass->convert_buffer (aaggpad, in_info, out_info, buffer);
564 }
565 
566 static void
gst_audio_aggregator_translate_output_buffer_duration(GstAudioAggregator * aagg,GstClockTime duration)567 gst_audio_aggregator_translate_output_buffer_duration (GstAudioAggregator *
568     aagg, GstClockTime duration)
569 {
570   gint gcd;
571 
572   aagg->priv->output_buffer_duration_n = duration;
573   aagg->priv->output_buffer_duration_d = GST_SECOND;
574 
575   gcd = gst_util_greatest_common_divisor (aagg->priv->output_buffer_duration_n,
576       aagg->priv->output_buffer_duration_d);
577 
578   if (gcd) {
579     aagg->priv->output_buffer_duration_n /= gcd;
580     aagg->priv->output_buffer_duration_d /= gcd;
581   }
582 }
583 
584 static gboolean
gst_audio_aggregator_update_samples_per_buffer(GstAudioAggregator * aagg)585 gst_audio_aggregator_update_samples_per_buffer (GstAudioAggregator * aagg)
586 {
587   gboolean ret = TRUE;
588   GstAudioAggregatorPad *srcpad =
589       GST_AUDIO_AGGREGATOR_PAD (GST_AGGREGATOR_SRC_PAD (aagg));
590 
591   if (!srcpad->info.finfo
592       || GST_AUDIO_INFO_FORMAT (&srcpad->info) == GST_AUDIO_FORMAT_UNKNOWN) {
593     ret = FALSE;
594     goto out;
595   }
596 
597   aagg->priv->samples_per_buffer =
598       (((guint64) GST_AUDIO_INFO_RATE (&srcpad->info)) *
599       aagg->priv->output_buffer_duration_n) /
600       aagg->priv->output_buffer_duration_d;
601 
602   if (aagg->priv->samples_per_buffer == 0) {
603     ret = FALSE;
604     goto out;
605   }
606 
607   aagg->priv->error_per_buffer =
608       (((guint64) GST_AUDIO_INFO_RATE (&srcpad->info)) *
609       aagg->priv->output_buffer_duration_n) %
610       aagg->priv->output_buffer_duration_d;
611   aagg->priv->accumulated_error = 0;
612 
613   GST_DEBUG_OBJECT (aagg, "Buffer duration: %u/%u",
614       aagg->priv->output_buffer_duration_n,
615       aagg->priv->output_buffer_duration_d);
616   GST_DEBUG_OBJECT (aagg, "Samples per buffer: %u (error: %u/%u)",
617       aagg->priv->samples_per_buffer, aagg->priv->error_per_buffer,
618       aagg->priv->output_buffer_duration_d);
619 
620 out:
621   return ret;
622 }
623 
624 static void
gst_audio_aggregator_recalculate_latency(GstAudioAggregator * aagg)625 gst_audio_aggregator_recalculate_latency (GstAudioAggregator * aagg)
626 {
627   guint64 latency = gst_util_uint64_scale_int (GST_SECOND,
628       aagg->priv->output_buffer_duration_n,
629       aagg->priv->output_buffer_duration_d);
630 
631   gst_aggregator_set_latency (GST_AGGREGATOR (aagg), latency, latency);
632 
633   GST_OBJECT_LOCK (aagg);
634   /* Force recalculating in aggregate */
635   aagg->priv->samples_per_buffer = 0;
636   GST_OBJECT_UNLOCK (aagg);
637 }
638 
639 static void
gst_audio_aggregator_class_init(GstAudioAggregatorClass * klass)640 gst_audio_aggregator_class_init (GstAudioAggregatorClass * klass)
641 {
642   GObjectClass *gobject_class = (GObjectClass *) klass;
643   GstAggregatorClass *gstaggregator_class = (GstAggregatorClass *) klass;
644 
645   gobject_class->set_property = gst_audio_aggregator_set_property;
646   gobject_class->get_property = gst_audio_aggregator_get_property;
647   gobject_class->dispose = gst_audio_aggregator_dispose;
648 
649   gstaggregator_class->src_event =
650       GST_DEBUG_FUNCPTR (gst_audio_aggregator_src_event);
651   gstaggregator_class->sink_event =
652       GST_DEBUG_FUNCPTR (gst_audio_aggregator_sink_event);
653   gstaggregator_class->src_query =
654       GST_DEBUG_FUNCPTR (gst_audio_aggregator_src_query);
655   gstaggregator_class->sink_query = gst_audio_aggregator_sink_query;
656   gstaggregator_class->start = gst_audio_aggregator_start;
657   gstaggregator_class->stop = gst_audio_aggregator_stop;
658   gstaggregator_class->flush = gst_audio_aggregator_flush;
659   gstaggregator_class->aggregate =
660       GST_DEBUG_FUNCPTR (gst_audio_aggregator_aggregate);
661   gstaggregator_class->clip = GST_DEBUG_FUNCPTR (gst_audio_aggregator_do_clip);
662   gstaggregator_class->get_next_time = gst_aggregator_simple_get_next_time;
663   gstaggregator_class->update_src_caps =
664       GST_DEBUG_FUNCPTR (gst_audio_aggregator_update_src_caps);
665   gstaggregator_class->fixate_src_caps = gst_audio_aggregator_fixate_src_caps;
666   gstaggregator_class->negotiated_src_caps =
667       gst_audio_aggregator_negotiated_src_caps;
668   gstaggregator_class->peek_next_sample = gst_audio_aggregator_peek_next_sample;
669 
670   klass->create_output_buffer = gst_audio_aggregator_create_output_buffer;
671 
672   GST_DEBUG_CATEGORY_INIT (audio_aggregator_debug, "audioaggregator",
673       GST_DEBUG_FG_MAGENTA, "GstAudioAggregator");
674 
675   g_object_class_install_property (gobject_class, PROP_OUTPUT_BUFFER_DURATION,
676       g_param_spec_uint64 ("output-buffer-duration", "Output Buffer Duration",
677           "Output block size in nanoseconds", 1,
678           G_MAXUINT64, DEFAULT_OUTPUT_BUFFER_DURATION,
679           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
680 
681   /**
682    * GstAudioAggregator:output-buffer-duration-fraction:
683    *
684    * Output block size in nanoseconds, expressed as a fraction.
685    *
686    * Since: 1.18
687    */
688   g_object_class_install_property (gobject_class,
689       PROP_OUTPUT_BUFFER_DURATION_FRACTION,
690       gst_param_spec_fraction ("output-buffer-duration-fraction",
691           "Output buffer duration fraction",
692           "Output block size in nanoseconds, expressed as a fraction", 1,
693           G_MAXINT, G_MAXINT, 1, DEFAULT_OUTPUT_BUFFER_DURATION_N,
694           DEFAULT_OUTPUT_BUFFER_DURATION_D,
695           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS |
696           GST_PARAM_MUTABLE_READY));
697 
698   g_object_class_install_property (gobject_class, PROP_ALIGNMENT_THRESHOLD,
699       g_param_spec_uint64 ("alignment-threshold", "Alignment Threshold",
700           "Timestamp alignment threshold in nanoseconds", 0,
701           G_MAXUINT64 - 1, DEFAULT_ALIGNMENT_THRESHOLD,
702           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
703 
704   g_object_class_install_property (gobject_class, PROP_DISCONT_WAIT,
705       g_param_spec_uint64 ("discont-wait", "Discont Wait",
706           "Window of time in nanoseconds to wait before "
707           "creating a discontinuity", 0,
708           G_MAXUINT64 - 1, DEFAULT_DISCONT_WAIT,
709           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS |
710           GST_PARAM_MUTABLE_PLAYING));
711 
712   /**
713    * GstAudioAggregator:ignore-inactive-pads:
714    *
715    * Don't wait for inactive pads when live. An inactive pad
716    * is a pad that hasn't yet received a buffer, but that has
717    * been waited on at least once.
718    *
719    * The purpose of this property is to avoid aggregating on
720    * timeout when new pads are requested in advance of receiving
721    * data flow, for example the user may decide to connect it later,
722    * but wants to configure it already.
723    *
724    * Since: 1.20
725    */
726   g_object_class_install_property (gobject_class,
727       PROP_IGNORE_INACTIVE_PADS, g_param_spec_boolean ("ignore-inactive-pads",
728           "Ignore inactive pads",
729           "Avoid timing out waiting for inactive pads", FALSE,
730           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
731 }
732 
733 static void
gst_audio_aggregator_init(GstAudioAggregator * aagg)734 gst_audio_aggregator_init (GstAudioAggregator * aagg)
735 {
736   aagg->priv = gst_audio_aggregator_get_instance_private (aagg);
737 
738   g_mutex_init (&aagg->priv->mutex);
739 
740   aagg->priv->alignment_threshold = DEFAULT_ALIGNMENT_THRESHOLD;
741   aagg->priv->discont_wait = DEFAULT_DISCONT_WAIT;
742 
743   gst_audio_aggregator_translate_output_buffer_duration (aagg,
744       DEFAULT_OUTPUT_BUFFER_DURATION);
745   gst_audio_aggregator_recalculate_latency (aagg);
746 
747   aagg->current_caps = NULL;
748 
749   aagg->priv->selected_samples_info =
750       gst_structure_new_empty ("GstAudioAggregatorSelectedSamplesInfo");
751 
752   g_queue_init (&aagg->priv->messages);
753 }
754 
755 static void
gst_audio_aggregator_dispose(GObject * object)756 gst_audio_aggregator_dispose (GObject * object)
757 {
758   GstAudioAggregator *aagg = GST_AUDIO_AGGREGATOR (object);
759 
760   gst_caps_replace (&aagg->current_caps, NULL);
761 
762   gst_clear_structure (&aagg->priv->selected_samples_info);
763 
764   g_mutex_clear (&aagg->priv->mutex);
765 
766   G_OBJECT_CLASS (gst_audio_aggregator_parent_class)->dispose (object);
767 }
768 
769 static void
gst_audio_aggregator_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)770 gst_audio_aggregator_set_property (GObject * object, guint prop_id,
771     const GValue * value, GParamSpec * pspec)
772 {
773   GstAudioAggregator *aagg = GST_AUDIO_AGGREGATOR (object);
774 
775   switch (prop_id) {
776     case PROP_OUTPUT_BUFFER_DURATION:
777       gst_audio_aggregator_translate_output_buffer_duration (aagg,
778           g_value_get_uint64 (value));
779       g_object_notify (object, "output-buffer-duration-fraction");
780       gst_audio_aggregator_recalculate_latency (aagg);
781       break;
782     case PROP_ALIGNMENT_THRESHOLD:
783       aagg->priv->alignment_threshold = g_value_get_uint64 (value);
784       break;
785     case PROP_DISCONT_WAIT:
786       aagg->priv->discont_wait = g_value_get_uint64 (value);
787       break;
788     case PROP_OUTPUT_BUFFER_DURATION_FRACTION:
789       aagg->priv->output_buffer_duration_n =
790           gst_value_get_fraction_numerator (value);
791       aagg->priv->output_buffer_duration_d =
792           gst_value_get_fraction_denominator (value);
793       g_object_notify (object, "output-buffer-duration");
794       gst_audio_aggregator_recalculate_latency (aagg);
795       break;
796     case PROP_IGNORE_INACTIVE_PADS:
797       gst_aggregator_set_ignore_inactive_pads (GST_AGGREGATOR (object),
798           g_value_get_boolean (value));
799       break;
800     default:
801       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
802       break;
803   }
804 }
805 
806 static void
gst_audio_aggregator_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)807 gst_audio_aggregator_get_property (GObject * object, guint prop_id,
808     GValue * value, GParamSpec * pspec)
809 {
810   GstAudioAggregator *aagg = GST_AUDIO_AGGREGATOR (object);
811 
812   switch (prop_id) {
813     case PROP_OUTPUT_BUFFER_DURATION:
814       g_value_set_uint64 (value, gst_util_uint64_scale_int (GST_SECOND,
815               aagg->priv->output_buffer_duration_n,
816               aagg->priv->output_buffer_duration_d));
817       break;
818     case PROP_ALIGNMENT_THRESHOLD:
819       g_value_set_uint64 (value, aagg->priv->alignment_threshold);
820       break;
821     case PROP_DISCONT_WAIT:
822       g_value_set_uint64 (value, aagg->priv->discont_wait);
823       break;
824     case PROP_OUTPUT_BUFFER_DURATION_FRACTION:
825       gst_value_set_fraction (value, aagg->priv->output_buffer_duration_n,
826           aagg->priv->output_buffer_duration_d);
827       break;
828     case PROP_IGNORE_INACTIVE_PADS:
829       g_value_set_boolean (value,
830           gst_aggregator_get_ignore_inactive_pads (GST_AGGREGATOR (object)));
831       break;
832     default:
833       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
834       break;
835   }
836 }
837 
838 /* Caps negotiation */
839 
840 /* Unref after usage */
841 static GstAudioAggregatorPad *
gst_audio_aggregator_get_first_configured_pad(GstAggregator * agg)842 gst_audio_aggregator_get_first_configured_pad (GstAggregator * agg)
843 {
844   GstAudioAggregatorPad *res = NULL;
845   GList *l;
846 
847   GST_OBJECT_LOCK (agg);
848   for (l = GST_ELEMENT (agg)->sinkpads; l; l = l->next) {
849     GstAudioAggregatorPad *aaggpad = l->data;
850 
851     if (GST_AUDIO_INFO_FORMAT (&aaggpad->info) != GST_AUDIO_FORMAT_UNKNOWN) {
852       res = gst_object_ref (aaggpad);
853       break;
854     }
855   }
856   GST_OBJECT_UNLOCK (agg);
857 
858   return res;
859 }
860 
861 static GstCaps *
gst_audio_aggregator_sink_getcaps(GstPad * pad,GstAggregator * agg,GstCaps * filter)862 gst_audio_aggregator_sink_getcaps (GstPad * pad, GstAggregator * agg,
863     GstCaps * filter)
864 {
865   GstAudioAggregatorPad *first_configured_pad =
866       gst_audio_aggregator_get_first_configured_pad (agg);
867   GstCaps *sink_template_caps = gst_pad_get_pad_template_caps (pad);
868   GstCaps *downstream_caps = gst_pad_get_allowed_caps (agg->srcpad);
869   GstCaps *sink_caps;
870 
871   GST_INFO_OBJECT (pad, "Getting caps with filter %" GST_PTR_FORMAT, filter);
872   GST_DEBUG_OBJECT (pad, "sink template caps : %" GST_PTR_FORMAT,
873       sink_template_caps);
874   GST_DEBUG_OBJECT (pad, "downstream caps %" GST_PTR_FORMAT, downstream_caps);
875 
876   /* If we already have a configured pad, assume that we can only configure
877    * to the very same format filtered with the template caps and continue
878    * with the result of that as the template caps */
879 
880   if (first_configured_pad) {
881     GstCaps *first_configured_caps =
882         gst_audio_info_to_caps (&first_configured_pad->info);
883     GstCaps *tmp;
884 
885     tmp =
886         gst_caps_intersect_full (sink_template_caps, first_configured_caps,
887         GST_CAPS_INTERSECT_FIRST);
888     gst_caps_unref (first_configured_caps);
889     gst_caps_unref (sink_template_caps);
890     sink_template_caps = tmp;
891 
892     gst_object_unref (first_configured_pad);
893   }
894 
895   /* If we have downstream caps, filter them against our template caps or
896    * the filtered first configured pad caps from above */
897   if (downstream_caps) {
898     sink_caps =
899         gst_caps_intersect_full (sink_template_caps, downstream_caps,
900         GST_CAPS_INTERSECT_FIRST);
901   } else {
902     sink_caps = gst_caps_ref (sink_template_caps);
903   }
904 
905   if (filter) {
906     GstCaps *tmp = gst_caps_intersect_full (sink_caps, filter,
907         GST_CAPS_INTERSECT_FIRST);
908 
909     gst_caps_unref (sink_caps);
910     sink_caps = tmp;
911   }
912 
913   gst_caps_unref (sink_template_caps);
914 
915   if (downstream_caps)
916     gst_caps_unref (downstream_caps);
917 
918   GST_INFO_OBJECT (pad, "returned sink caps : %" GST_PTR_FORMAT, sink_caps);
919 
920   return sink_caps;
921 }
922 
923 static GstCaps *
gst_audio_aggregator_convert_sink_getcaps(GstPad * pad,GstAggregator * agg,GstCaps * filter)924 gst_audio_aggregator_convert_sink_getcaps (GstPad * pad, GstAggregator * agg,
925     GstCaps * filter)
926 {
927   GstAudioAggregatorPad *first_configured_pad =
928       gst_audio_aggregator_get_first_configured_pad (agg);
929   GstCaps *sink_template_caps = gst_pad_get_pad_template_caps (pad);
930   GstCaps *downstream_caps = gst_pad_get_allowed_caps (agg->srcpad);
931   GstCaps *sink_caps;
932 
933   GST_INFO_OBJECT (pad, "Getting caps with filter %" GST_PTR_FORMAT, filter);
934   GST_DEBUG_OBJECT (pad, "sink template caps : %" GST_PTR_FORMAT,
935       sink_template_caps);
936   GST_DEBUG_OBJECT (pad, "downstream caps %" GST_PTR_FORMAT, downstream_caps);
937 
938   /* We can convert between all formats except for the sample rate, which has
939    * to match.  */
940 
941   /* If we have a first configured pad, we can only convert everything except
942    * for the sample rate, so modify our template caps to have exactly that
943    * sample rate in all structures */
944   if (first_configured_pad) {
945     GST_INFO_OBJECT (pad, "first configured pad has sample rate %d",
946         first_configured_pad->info.rate);
947     sink_template_caps = gst_caps_make_writable (sink_template_caps);
948     gst_caps_set_simple (sink_template_caps, "rate", G_TYPE_INT,
949         first_configured_pad->info.rate, NULL);
950     gst_object_unref (first_configured_pad);
951   }
952 
953   /* Now if we have downstream caps, filter against the template caps from
954    * above, i.e. with potentially fixated sample rate field already. This
955    * filters out any structures with unsupported rates.
956    *
957    * Afterwards we create new caps that only take over the rate fields of the
958    * remaining downstream caps, and filter that against the plain template
959    * caps to get the resulting allowed caps with conversion for everything but
960    * the rate */
961   if (downstream_caps) {
962     GstCaps *tmp;
963     guint i, n;
964 
965     tmp =
966         gst_caps_intersect_full (sink_template_caps, downstream_caps,
967         GST_CAPS_INTERSECT_FIRST);
968 
969     n = gst_caps_get_size (tmp);
970     sink_caps = gst_caps_new_empty ();
971     for (i = 0; i < n; i++) {
972       GstStructure *s = gst_caps_get_structure (tmp, i);
973       GstStructure *new_s =
974           gst_structure_new_empty (gst_structure_get_name (s));
975       gst_structure_set_value (new_s, "rate", gst_structure_get_value (s,
976               "rate"));
977       sink_caps = gst_caps_merge_structure (sink_caps, new_s);
978     }
979     gst_caps_unref (tmp);
980     tmp = sink_caps;
981 
982     sink_caps =
983         gst_caps_intersect_full (sink_template_caps, tmp,
984         GST_CAPS_INTERSECT_FIRST);
985     gst_caps_unref (tmp);
986   } else {
987     sink_caps = gst_caps_ref (sink_template_caps);
988   }
989 
990   /* And finally filter anything that remains against the filter caps */
991   if (filter) {
992     GstCaps *tmp =
993         gst_caps_intersect_full (filter, sink_caps, GST_CAPS_INTERSECT_FIRST);
994     gst_caps_unref (sink_caps);
995     sink_caps = tmp;
996   }
997 
998   GST_INFO_OBJECT (pad, "returned sink caps : %" GST_PTR_FORMAT, sink_caps);
999 
1000   gst_caps_unref (sink_template_caps);
1001 
1002   if (downstream_caps)
1003     gst_caps_unref (downstream_caps);
1004 
1005   return sink_caps;
1006 }
1007 
1008 static gboolean
gst_audio_aggregator_sink_setcaps(GstAudioAggregatorPad * aaggpad,GstAggregator * agg,GstCaps * caps)1009 gst_audio_aggregator_sink_setcaps (GstAudioAggregatorPad * aaggpad,
1010     GstAggregator * agg, GstCaps * caps)
1011 {
1012   GstAudioAggregatorPad *first_configured_pad =
1013       gst_audio_aggregator_get_first_configured_pad (agg);
1014   GstAudioInfo info;
1015   gboolean ret = TRUE;
1016   gboolean downstream_supports_rate = TRUE;
1017 
1018   if (!gst_audio_info_from_caps (&info, caps)) {
1019     GST_WARNING_OBJECT (aaggpad, "Rejecting invalid caps: %" GST_PTR_FORMAT,
1020         caps);
1021     return FALSE;
1022   }
1023 
1024   /* TODO: handle different rates on sinkpads, a bit complex
1025    * because offsets will have to be updated, and audio resampling
1026    * has a latency to take into account
1027    */
1028 
1029   /* Only check against the downstream caps if we didn't configure any caps
1030    * so far. Otherwise we already know that downstream supports the rate
1031    * because we negotiated with downstream */
1032   if (!first_configured_pad) {
1033     GstCaps *downstream_caps = gst_pad_get_allowed_caps (agg->srcpad);
1034 
1035     /* Returns NULL if there is no downstream peer */
1036     if (downstream_caps) {
1037       GstCaps *rate_caps =
1038           gst_caps_new_simple ("audio/x-raw", "rate", G_TYPE_INT, info.rate,
1039           NULL);
1040 
1041       gst_caps_set_features_simple (rate_caps,
1042           gst_caps_features_copy (GST_CAPS_FEATURES_ANY));
1043 
1044       downstream_supports_rate =
1045           gst_caps_can_intersect (rate_caps, downstream_caps);
1046       gst_caps_unref (rate_caps);
1047       gst_caps_unref (downstream_caps);
1048     }
1049   }
1050 
1051   if (!downstream_supports_rate || (first_configured_pad
1052           && info.rate != first_configured_pad->info.rate)) {
1053     GST_WARNING_OBJECT (aaggpad,
1054         "Sample rate %d can't be configured (downstream supported: %d, configured rate: %d)",
1055         info.rate, downstream_supports_rate,
1056         first_configured_pad ? first_configured_pad->info.rate : 0);
1057     gst_pad_push_event (GST_PAD (aaggpad), gst_event_new_reconfigure ());
1058     ret = FALSE;
1059   } else {
1060     GstAudioAggregatorPadClass *klass =
1061         GST_AUDIO_AGGREGATOR_PAD_GET_CLASS (aaggpad);
1062     GST_OBJECT_LOCK (aaggpad);
1063     aaggpad->info = info;
1064     if (klass->update_conversion_info)
1065       klass->update_conversion_info (aaggpad);
1066     GST_OBJECT_UNLOCK (aaggpad);
1067   }
1068 
1069   if (first_configured_pad)
1070     gst_object_unref (first_configured_pad);
1071 
1072   return ret;
1073 }
1074 
1075 static GstFlowReturn
gst_audio_aggregator_update_src_caps(GstAggregator * agg,GstCaps * caps,GstCaps ** ret)1076 gst_audio_aggregator_update_src_caps (GstAggregator * agg,
1077     GstCaps * caps, GstCaps ** ret)
1078 {
1079   GstCaps *src_template_caps = gst_pad_get_pad_template_caps (agg->srcpad);
1080   GstCaps *downstream_caps =
1081       gst_pad_peer_query_caps (agg->srcpad, src_template_caps);
1082 
1083   gst_caps_unref (src_template_caps);
1084 
1085   *ret = gst_caps_intersect (caps, downstream_caps);
1086 
1087   GST_INFO ("Updated src caps to %" GST_PTR_FORMAT, *ret);
1088 
1089   if (downstream_caps)
1090     gst_caps_unref (downstream_caps);
1091 
1092   return GST_FLOW_OK;
1093 }
1094 
1095 /* At that point if the caps are not fixed, this means downstream
1096  * didn't have fully specified requirements, we'll just go ahead
1097  * and fixate raw audio fields using our first configured pad, we don't for
1098  * now need a more complicated heuristic
1099  */
1100 static GstCaps *
gst_audio_aggregator_fixate_src_caps(GstAggregator * agg,GstCaps * caps)1101 gst_audio_aggregator_fixate_src_caps (GstAggregator * agg, GstCaps * caps)
1102 {
1103   GstAudioAggregatorPad *first_configured_pad = NULL;
1104 
1105   if (GST_AUDIO_AGGREGATOR_PAD_GET_CLASS (agg->srcpad)->convert_buffer)
1106     first_configured_pad = gst_audio_aggregator_get_first_configured_pad (agg);
1107 
1108   caps = gst_caps_make_writable (caps);
1109 
1110   if (first_configured_pad) {
1111     GstStructure *s, *s2;
1112     GstCaps *first_configured_caps =
1113         gst_audio_info_to_caps (&first_configured_pad->info);
1114     gint first_configured_rate, first_configured_channels;
1115     gint channels;
1116 
1117     s = gst_caps_get_structure (caps, 0);
1118     s2 = gst_caps_get_structure (first_configured_caps, 0);
1119 
1120     gst_structure_get_int (s2, "rate", &first_configured_rate);
1121     gst_structure_get_int (s2, "channels", &first_configured_channels);
1122 
1123     gst_structure_fixate_field_string (s, "format",
1124         gst_structure_get_string (s2, "format"));
1125     gst_structure_fixate_field_string (s, "layout",
1126         gst_structure_get_string (s2, "layout"));
1127     gst_structure_fixate_field_nearest_int (s, "rate", first_configured_rate);
1128     gst_structure_fixate_field_nearest_int (s, "channels",
1129         first_configured_channels);
1130 
1131     gst_structure_get_int (s, "channels", &channels);
1132 
1133     if (!gst_structure_has_field (s, "channel-mask") && channels > 2) {
1134       guint64 mask;
1135 
1136       if (!gst_structure_get (s2, "channel-mask", GST_TYPE_BITMASK, &mask,
1137               NULL)) {
1138         mask = gst_audio_channel_get_fallback_mask (channels);
1139       }
1140       gst_structure_set (s, "channel-mask", GST_TYPE_BITMASK, mask, NULL);
1141     }
1142 
1143     gst_caps_unref (first_configured_caps);
1144     gst_object_unref (first_configured_pad);
1145   } else {
1146     GstStructure *s;
1147     gint channels;
1148 
1149     s = gst_caps_get_structure (caps, 0);
1150 
1151     gst_structure_fixate_field_nearest_int (s, "rate", GST_AUDIO_DEF_RATE);
1152     gst_structure_fixate_field_string (s, "format", GST_AUDIO_NE ("S16"));
1153     gst_structure_fixate_field_string (s, "layout", "interleaved");
1154     gst_structure_fixate_field_nearest_int (s, "channels", 2);
1155 
1156     if (gst_structure_get_int (s, "channels", &channels) && channels > 2) {
1157       if (!gst_structure_has_field_typed (s, "channel-mask", GST_TYPE_BITMASK))
1158         gst_structure_set (s, "channel-mask", GST_TYPE_BITMASK, 0ULL, NULL);
1159     }
1160   }
1161 
1162   if (!gst_caps_is_fixed (caps))
1163     caps = gst_caps_fixate (caps);
1164 
1165   GST_INFO_OBJECT (agg, "Fixated src caps to %" GST_PTR_FORMAT, caps);
1166 
1167   return caps;
1168 }
1169 
1170 /* Must be called with OBJECT_LOCK taken */
1171 static gboolean
gst_audio_aggregator_update_converters(GstAudioAggregator * aagg,GstAudioInfo * new_info,GstAudioInfo * old_info)1172 gst_audio_aggregator_update_converters (GstAudioAggregator * aagg,
1173     GstAudioInfo * new_info, GstAudioInfo * old_info)
1174 {
1175   GList *l;
1176 
1177   for (l = GST_ELEMENT (aagg)->sinkpads; l; l = l->next) {
1178     GstAudioAggregatorPad *aaggpad = l->data;
1179     GstAudioAggregatorPadClass *klass =
1180         GST_AUDIO_AGGREGATOR_PAD_GET_CLASS (aaggpad);
1181 
1182     if (klass->update_conversion_info)
1183       klass->update_conversion_info (aaggpad);
1184 
1185     /* If we currently were mixing a buffer, we need to convert it to the new
1186      * format */
1187     if (aaggpad->priv->buffer) {
1188       GstBuffer *new_converted_buffer =
1189           gst_audio_aggregator_convert_buffer (aagg, GST_PAD (aaggpad),
1190           old_info, new_info, aaggpad->priv->buffer);
1191       gst_buffer_replace (&aaggpad->priv->buffer, new_converted_buffer);
1192       if (new_converted_buffer)
1193         gst_buffer_unref (new_converted_buffer);
1194     }
1195   }
1196 
1197   return TRUE;
1198 }
1199 
1200 /* We now have our final output caps, we can create the required converters */
1201 static gboolean
gst_audio_aggregator_negotiated_src_caps(GstAggregator * agg,GstCaps * caps)1202 gst_audio_aggregator_negotiated_src_caps (GstAggregator * agg, GstCaps * caps)
1203 {
1204   GstAudioAggregator *aagg = GST_AUDIO_AGGREGATOR (agg);
1205   GstAudioInfo info;
1206   GstAudioAggregatorPad *srcpad = GST_AUDIO_AGGREGATOR_PAD (agg->srcpad);
1207 
1208   GST_INFO_OBJECT (agg, "src caps negotiated %" GST_PTR_FORMAT, caps);
1209 
1210   if (!gst_audio_info_from_caps (&info, caps)) {
1211     GST_WARNING_OBJECT (aagg, "Rejecting invalid caps: %" GST_PTR_FORMAT, caps);
1212     return FALSE;
1213   }
1214 
1215   GST_AUDIO_AGGREGATOR_LOCK (aagg);
1216   GST_OBJECT_LOCK (aagg);
1217 
1218   if (!gst_audio_info_is_equal (&info, &srcpad->info)) {
1219     GstAudioInfo old_info = srcpad->info;
1220     GstAudioAggregatorPadClass *srcpad_klass =
1221         GST_AUDIO_AGGREGATOR_PAD_GET_CLASS (agg->srcpad);
1222 
1223     GST_INFO_OBJECT (aagg, "setting caps to %" GST_PTR_FORMAT, caps);
1224     gst_caps_replace (&aagg->current_caps, caps);
1225 
1226     if (old_info.rate != info.rate)
1227       aagg->priv->offset = -1;
1228 
1229     memcpy (&srcpad->info, &info, sizeof (info));
1230 
1231     if (!gst_audio_aggregator_update_converters (aagg, &info, &old_info)) {
1232       GST_OBJECT_UNLOCK (aagg);
1233       GST_AUDIO_AGGREGATOR_UNLOCK (aagg);
1234       return FALSE;
1235     }
1236 
1237     if (srcpad_klass->update_conversion_info)
1238       srcpad_klass->update_conversion_info (GST_AUDIO_AGGREGATOR_PAD (agg->
1239               srcpad));
1240 
1241     if (aagg->priv->current_buffer) {
1242       GstBuffer *converted;
1243 
1244       converted =
1245           gst_audio_aggregator_convert_buffer (aagg, agg->srcpad, &old_info,
1246           &info, aagg->priv->current_buffer);
1247       gst_buffer_unref (aagg->priv->current_buffer);
1248       aagg->priv->current_buffer = converted;
1249       if (!converted) {
1250         GST_OBJECT_UNLOCK (aagg);
1251         GST_AUDIO_AGGREGATOR_UNLOCK (aagg);
1252         return FALSE;
1253       }
1254     }
1255 
1256     /* Force recalculating in aggregate */
1257     aagg->priv->samples_per_buffer = 0;
1258   }
1259 
1260   GST_OBJECT_UNLOCK (aagg);
1261   GST_AUDIO_AGGREGATOR_UNLOCK (aagg);
1262 
1263   return
1264       GST_AGGREGATOR_CLASS
1265       (gst_audio_aggregator_parent_class)->negotiated_src_caps (agg, caps);
1266 }
1267 
1268 /* event handling */
1269 
1270 static gboolean
gst_audio_aggregator_src_event(GstAggregator * agg,GstEvent * event)1271 gst_audio_aggregator_src_event (GstAggregator * agg, GstEvent * event)
1272 {
1273   gboolean result;
1274 
1275   GstAudioAggregator *aagg = GST_AUDIO_AGGREGATOR (agg);
1276   GST_DEBUG_OBJECT (agg->srcpad, "Got %s event on src pad",
1277       GST_EVENT_TYPE_NAME (event));
1278 
1279   switch (GST_EVENT_TYPE (event)) {
1280     case GST_EVENT_QOS:
1281       /* QoS might be tricky */
1282       gst_event_unref (event);
1283       return FALSE;
1284     case GST_EVENT_NAVIGATION:
1285       /* navigation is rather pointless. */
1286       gst_event_unref (event);
1287       return FALSE;
1288       break;
1289     case GST_EVENT_SEEK:
1290     {
1291       GstSeekFlags flags;
1292       gdouble rate;
1293       GstSeekType start_type, stop_type;
1294       gint64 start, stop;
1295       GstFormat seek_format, dest_format;
1296 
1297       /* parse the seek parameters */
1298       gst_event_parse_seek (event, &rate, &seek_format, &flags, &start_type,
1299           &start, &stop_type, &stop);
1300 
1301       /* Check the seeking parameters before linking up */
1302       if ((start_type != GST_SEEK_TYPE_NONE)
1303           && (start_type != GST_SEEK_TYPE_SET)) {
1304         result = FALSE;
1305         GST_DEBUG_OBJECT (aagg,
1306             "seeking failed, unhandled seek type for start: %d", start_type);
1307         goto done;
1308       }
1309       if ((stop_type != GST_SEEK_TYPE_NONE) && (stop_type != GST_SEEK_TYPE_SET)) {
1310         result = FALSE;
1311         GST_DEBUG_OBJECT (aagg,
1312             "seeking failed, unhandled seek type for end: %d", stop_type);
1313         goto done;
1314       }
1315 
1316       GST_OBJECT_LOCK (agg);
1317       dest_format = GST_AGGREGATOR_PAD (agg->srcpad)->segment.format;
1318       GST_OBJECT_UNLOCK (agg);
1319       if (seek_format != dest_format) {
1320         result = FALSE;
1321         GST_DEBUG_OBJECT (aagg,
1322             "seeking failed, unhandled seek format: %s",
1323             gst_format_get_name (seek_format));
1324         goto done;
1325       }
1326     }
1327       break;
1328     default:
1329       break;
1330   }
1331 
1332   return
1333       GST_AGGREGATOR_CLASS (gst_audio_aggregator_parent_class)->src_event (agg,
1334       event);
1335 
1336 done:
1337   return result;
1338 }
1339 
1340 
1341 static gboolean
gst_audio_aggregator_sink_event(GstAggregator * agg,GstAggregatorPad * aggpad,GstEvent * event)1342 gst_audio_aggregator_sink_event (GstAggregator * agg,
1343     GstAggregatorPad * aggpad, GstEvent * event)
1344 {
1345   GstAudioAggregatorPad *aaggpad = GST_AUDIO_AGGREGATOR_PAD (aggpad);
1346   gboolean res = TRUE;
1347 
1348   GST_DEBUG_OBJECT (aggpad, "Got %s event on sink pad",
1349       GST_EVENT_TYPE_NAME (event));
1350 
1351   switch (GST_EVENT_TYPE (event)) {
1352     case GST_EVENT_SEGMENT:
1353     {
1354       const GstSegment *segment;
1355       gst_event_parse_segment (event, &segment);
1356 
1357       if (segment->format != GST_FORMAT_TIME) {
1358         GST_ERROR_OBJECT (aggpad, "Segment of type %s are not supported,"
1359             " only TIME segments are supported",
1360             gst_format_get_name (segment->format));
1361         gst_event_unref (event);
1362         event = NULL;
1363         res = FALSE;
1364         break;
1365       }
1366 
1367       GST_OBJECT_LOCK (agg);
1368       if (segment->rate != GST_AGGREGATOR_PAD (agg->srcpad)->segment.rate) {
1369         GST_ERROR_OBJECT (aggpad,
1370             "Got segment event with wrong rate %lf, expected %lf",
1371             segment->rate, GST_AGGREGATOR_PAD (agg->srcpad)->segment.rate);
1372         res = FALSE;
1373         gst_event_unref (event);
1374         event = NULL;
1375       } else if (segment->rate < 0.0) {
1376         GST_ERROR_OBJECT (aggpad, "Negative rates not supported yet");
1377         res = FALSE;
1378         gst_event_unref (event);
1379         event = NULL;
1380       } else {
1381         GstAudioAggregatorPad *pad = GST_AUDIO_AGGREGATOR_PAD (aggpad);
1382 
1383         GST_OBJECT_LOCK (pad);
1384         pad->priv->new_segment = TRUE;
1385         gst_audio_aggregator_pad_reset_qos (pad);
1386         GST_OBJECT_UNLOCK (pad);
1387       }
1388       GST_OBJECT_UNLOCK (agg);
1389 
1390       break;
1391     }
1392     case GST_EVENT_CAPS:
1393     {
1394       GstCaps *caps;
1395 
1396       gst_event_parse_caps (event, &caps);
1397       GST_INFO_OBJECT (aggpad, "Got caps %" GST_PTR_FORMAT, caps);
1398       res = gst_audio_aggregator_sink_setcaps (aaggpad, agg, caps);
1399       gst_event_unref (event);
1400       event = NULL;
1401       break;
1402     }
1403     default:
1404       break;
1405   }
1406 
1407   if (!res) {
1408     if (event)
1409       gst_event_unref (event);
1410     return res;
1411   }
1412 
1413   if (event != NULL)
1414     return
1415         GST_AGGREGATOR_CLASS (gst_audio_aggregator_parent_class)->sink_event
1416         (agg, aggpad, event);
1417 
1418   return res;
1419 }
1420 
1421 static gboolean
gst_audio_aggregator_sink_query(GstAggregator * agg,GstAggregatorPad * aggpad,GstQuery * query)1422 gst_audio_aggregator_sink_query (GstAggregator * agg, GstAggregatorPad * aggpad,
1423     GstQuery * query)
1424 {
1425   gboolean res = FALSE;
1426 
1427   switch (GST_QUERY_TYPE (query)) {
1428     case GST_QUERY_CAPS:
1429     {
1430       GstCaps *filter, *caps;
1431 
1432       gst_query_parse_caps (query, &filter);
1433       if (GST_IS_AUDIO_AGGREGATOR_CONVERT_PAD (aggpad)) {
1434         caps =
1435             gst_audio_aggregator_convert_sink_getcaps (GST_PAD (aggpad), agg,
1436             filter);
1437       } else {
1438         caps =
1439             gst_audio_aggregator_sink_getcaps (GST_PAD (aggpad), agg, filter);
1440       }
1441       gst_query_set_caps_result (query, caps);
1442       gst_caps_unref (caps);
1443       res = TRUE;
1444       break;
1445     }
1446     default:
1447       res =
1448           GST_AGGREGATOR_CLASS (gst_audio_aggregator_parent_class)->sink_query
1449           (agg, aggpad, query);
1450       break;
1451   }
1452 
1453   return res;
1454 }
1455 
1456 
1457 /* FIXME, the duration query should reflect how long you will produce
1458  * data, that is the amount of stream time until you will emit EOS.
1459  *
1460  * For synchronized mixing this is always the max of all the durations
1461  * of upstream since we emit EOS when all of them finished.
1462  *
1463  * We don't do synchronized mixing so this really depends on where the
1464  * streams where punched in and what their relative offsets are against
1465  * each other which we can get from the first timestamps we see.
1466  *
1467  * When we add a new stream (or remove a stream) the duration might
1468  * also become invalid again and we need to post a new DURATION
1469  * message to notify this fact to the parent.
1470  * For now we take the max of all the upstream elements so the simple
1471  * cases work at least somewhat.
1472  */
1473 static gboolean
gst_audio_aggregator_query_duration(GstAudioAggregator * aagg,GstQuery * query)1474 gst_audio_aggregator_query_duration (GstAudioAggregator * aagg,
1475     GstQuery * query)
1476 {
1477   gint64 max;
1478   gboolean res;
1479   GstFormat format;
1480   GstIterator *it;
1481   gboolean done;
1482   GValue item = { 0, };
1483 
1484   /* parse format */
1485   gst_query_parse_duration (query, &format, NULL);
1486 
1487   max = -1;
1488   res = TRUE;
1489   done = FALSE;
1490 
1491   it = gst_element_iterate_sink_pads (GST_ELEMENT_CAST (aagg));
1492   while (!done) {
1493     GstIteratorResult ires;
1494 
1495     ires = gst_iterator_next (it, &item);
1496     switch (ires) {
1497       case GST_ITERATOR_DONE:
1498         done = TRUE;
1499         break;
1500       case GST_ITERATOR_OK:
1501       {
1502         GstPad *pad = g_value_get_object (&item);
1503         gint64 duration;
1504 
1505         /* ask sink peer for duration */
1506         res &= gst_pad_peer_query_duration (pad, format, &duration);
1507         /* take max from all valid return values */
1508         if (res) {
1509           /* valid unknown length, stop searching */
1510           if (duration == -1) {
1511             max = duration;
1512             done = TRUE;
1513           }
1514           /* else see if bigger than current max */
1515           else if (duration > max)
1516             max = duration;
1517         }
1518         g_value_reset (&item);
1519         break;
1520       }
1521       case GST_ITERATOR_RESYNC:
1522         max = -1;
1523         res = TRUE;
1524         gst_iterator_resync (it);
1525         break;
1526       default:
1527         res = FALSE;
1528         done = TRUE;
1529         break;
1530     }
1531   }
1532   g_value_unset (&item);
1533   gst_iterator_free (it);
1534 
1535   if (res) {
1536     /* and store the max */
1537     GST_DEBUG_OBJECT (aagg, "Total duration in format %s: %"
1538         GST_TIME_FORMAT, gst_format_get_name (format), GST_TIME_ARGS (max));
1539     gst_query_set_duration (query, format, max);
1540   }
1541 
1542   return res;
1543 }
1544 
1545 
1546 static gboolean
gst_audio_aggregator_src_query(GstAggregator * agg,GstQuery * query)1547 gst_audio_aggregator_src_query (GstAggregator * agg, GstQuery * query)
1548 {
1549   GstAudioAggregator *aagg = GST_AUDIO_AGGREGATOR (agg);
1550   GstAudioAggregatorPad *srcpad = GST_AUDIO_AGGREGATOR_PAD (agg->srcpad);
1551   gboolean res = FALSE;
1552 
1553   switch (GST_QUERY_TYPE (query)) {
1554     case GST_QUERY_DURATION:
1555       res = gst_audio_aggregator_query_duration (aagg, query);
1556       break;
1557     case GST_QUERY_POSITION:
1558     {
1559       GstFormat format;
1560 
1561       gst_query_parse_position (query, &format, NULL);
1562 
1563       GST_OBJECT_LOCK (aagg);
1564 
1565       switch (format) {
1566         case GST_FORMAT_TIME:
1567           gst_query_set_position (query, format,
1568               gst_segment_to_stream_time (&GST_AGGREGATOR_PAD (agg->srcpad)->
1569                   segment, GST_FORMAT_TIME,
1570                   GST_AGGREGATOR_PAD (agg->srcpad)->segment.position));
1571           res = TRUE;
1572           break;
1573         case GST_FORMAT_BYTES:
1574           if (GST_AUDIO_INFO_BPF (&srcpad->info)) {
1575             gst_query_set_position (query, format, aagg->priv->offset *
1576                 GST_AUDIO_INFO_BPF (&srcpad->info));
1577             res = TRUE;
1578           }
1579           break;
1580         case GST_FORMAT_DEFAULT:
1581           gst_query_set_position (query, format, aagg->priv->offset);
1582           res = TRUE;
1583           break;
1584         default:
1585           break;
1586       }
1587 
1588       GST_OBJECT_UNLOCK (aagg);
1589 
1590       break;
1591     }
1592     default:
1593       res =
1594           GST_AGGREGATOR_CLASS (gst_audio_aggregator_parent_class)->src_query
1595           (agg, query);
1596       break;
1597   }
1598 
1599   return res;
1600 }
1601 
1602 
1603 void
gst_audio_aggregator_set_sink_caps(GstAudioAggregator * aagg,GstAudioAggregatorPad * pad,GstCaps * caps)1604 gst_audio_aggregator_set_sink_caps (GstAudioAggregator * aagg,
1605     GstAudioAggregatorPad * pad, GstCaps * caps)
1606 {
1607 #ifndef G_DISABLE_ASSERT
1608   gboolean valid;
1609 
1610   GST_OBJECT_LOCK (pad);
1611   valid = gst_audio_info_from_caps (&pad->info, caps);
1612   g_assert (valid);
1613   GST_OBJECT_UNLOCK (pad);
1614 #else
1615   GST_OBJECT_LOCK (pad);
1616   (void) gst_audio_info_from_caps (&pad->info, caps);
1617   GST_OBJECT_UNLOCK (pad);
1618 #endif
1619 }
1620 
1621 /* Must hold object lock and aagg lock to call */
1622 
1623 static void
gst_audio_aggregator_reset(GstAudioAggregator * aagg)1624 gst_audio_aggregator_reset (GstAudioAggregator * aagg)
1625 {
1626   GstAggregator *agg = GST_AGGREGATOR (aagg);
1627 
1628   GST_AUDIO_AGGREGATOR_LOCK (aagg);
1629   GST_OBJECT_LOCK (aagg);
1630   GST_AGGREGATOR_PAD (agg->srcpad)->segment.position = -1;
1631   aagg->priv->offset = -1;
1632   gst_audio_info_init (&GST_AUDIO_AGGREGATOR_PAD (agg->srcpad)->info);
1633   gst_caps_replace (&aagg->current_caps, NULL);
1634   gst_buffer_replace (&aagg->priv->current_buffer, NULL);
1635   aagg->priv->accumulated_error = 0;
1636   GST_OBJECT_UNLOCK (aagg);
1637   GST_AUDIO_AGGREGATOR_UNLOCK (aagg);
1638 }
1639 
1640 static gboolean
gst_audio_aggregator_start(GstAggregator * agg)1641 gst_audio_aggregator_start (GstAggregator * agg)
1642 {
1643   GstAudioAggregator *aagg = GST_AUDIO_AGGREGATOR (agg);
1644 
1645   gst_audio_aggregator_reset (aagg);
1646 
1647   return TRUE;
1648 }
1649 
1650 static gboolean
gst_audio_aggregator_stop(GstAggregator * agg)1651 gst_audio_aggregator_stop (GstAggregator * agg)
1652 {
1653   GstAudioAggregator *aagg = GST_AUDIO_AGGREGATOR (agg);
1654 
1655   gst_audio_aggregator_reset (aagg);
1656 
1657   return TRUE;
1658 }
1659 
1660 static GstFlowReturn
gst_audio_aggregator_flush(GstAggregator * agg)1661 gst_audio_aggregator_flush (GstAggregator * agg)
1662 {
1663   GstAudioAggregator *aagg = GST_AUDIO_AGGREGATOR (agg);
1664 
1665   GST_AUDIO_AGGREGATOR_LOCK (aagg);
1666   GST_OBJECT_LOCK (aagg);
1667   GST_AGGREGATOR_PAD (agg->srcpad)->segment.position = -1;
1668   aagg->priv->offset = -1;
1669   aagg->priv->accumulated_error = 0;
1670   gst_buffer_replace (&aagg->priv->current_buffer, NULL);
1671   GST_OBJECT_UNLOCK (aagg);
1672   GST_AUDIO_AGGREGATOR_UNLOCK (aagg);
1673 
1674   return GST_FLOW_OK;
1675 }
1676 
1677 static GstBuffer *
gst_audio_aggregator_do_clip(GstAggregator * agg,GstAggregatorPad * bpad,GstBuffer * buffer)1678 gst_audio_aggregator_do_clip (GstAggregator * agg,
1679     GstAggregatorPad * bpad, GstBuffer * buffer)
1680 {
1681   GstAudioAggregatorPad *pad = GST_AUDIO_AGGREGATOR_PAD (bpad);
1682   gint rate, bpf;
1683 
1684   /* Guard against invalid audio info, we just don't clip here then */
1685   if (!GST_AUDIO_INFO_IS_VALID (&pad->info))
1686     return buffer;
1687 
1688   GST_OBJECT_LOCK (bpad);
1689   rate = GST_AUDIO_INFO_RATE (&pad->info);
1690   bpf = GST_AUDIO_INFO_BPF (&pad->info);
1691   buffer = gst_audio_buffer_clip (buffer, &bpad->segment, rate, bpf);
1692   GST_OBJECT_UNLOCK (bpad);
1693 
1694   return buffer;
1695 }
1696 
1697 
1698 /* Called with the object lock for both the element and pad held,
1699  * as well as the audio aggregator lock.
1700  * Should only be called on the output queue.
1701  */
1702 static GstClockTime
gst_audio_aggregator_pad_enqueue_qos_message(GstAudioAggregatorPad * pad,GstAudioAggregator * aagg,guint64 samples)1703 gst_audio_aggregator_pad_enqueue_qos_message (GstAudioAggregatorPad * pad,
1704     GstAudioAggregator * aagg, guint64 samples)
1705 {
1706   GstAggregator *agg = GST_AGGREGATOR (aagg);
1707   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
1708   GstAudioAggregatorPad *srcpad = GST_AUDIO_AGGREGATOR_PAD (agg->srcpad);
1709 
1710   guint rate_output = GST_AUDIO_INFO_RATE (&srcpad->info);
1711   GstClockTime offset = gst_util_uint64_scale (GST_SECOND, pad->priv->position,
1712       rate_output);
1713   GstClockTime timestamp = GST_BUFFER_PTS (pad->priv->buffer) + offset;
1714   GstClockTime running_time =
1715       gst_segment_to_running_time (&aggpad->segment, GST_FORMAT_TIME,
1716       timestamp);
1717   GstClockTime stream_time = gst_segment_to_stream_time (&aggpad->segment,
1718       GST_FORMAT_TIME, timestamp);
1719   GstClockTime duration;
1720   guint rate_input;
1721   guint64 processed, dropped;
1722   GstMessage *msg;
1723 
1724   if (!pad->priv->qos_messages)
1725     return running_time;
1726 
1727   if (GST_AUDIO_AGGREGATOR_PAD_GET_CLASS (pad)->convert_buffer)
1728     rate_input = GST_AUDIO_INFO_RATE (&srcpad->info);
1729   else
1730     rate_input = GST_AUDIO_INFO_RATE (&pad->info);
1731 
1732   duration = gst_util_uint64_scale (samples, GST_SECOND, rate_input);
1733 
1734   processed = gst_util_uint64_scale (pad->priv->processed, rate_input,
1735       rate_output);
1736   dropped = gst_util_uint64_scale (pad->priv->dropped, rate_output,
1737       rate_output);
1738 
1739   msg = gst_message_new_qos (GST_OBJECT (aggpad), TRUE, running_time,
1740       stream_time, timestamp, duration);
1741   gst_message_set_qos_stats (msg, GST_FORMAT_DEFAULT, processed, dropped);
1742 
1743   g_queue_push_tail (&aagg->priv->messages, msg);
1744 
1745   return running_time;
1746 }
1747 
1748 static void
gst_audio_aggregator_post_messages(GstAudioAggregator * aagg)1749 gst_audio_aggregator_post_messages (GstAudioAggregator * aagg)
1750 {
1751   if (g_queue_get_length (&aagg->priv->messages) != 0) {
1752     GstClockTime latency = gst_aggregator_get_latency (GST_AGGREGATOR (aagg));
1753     gboolean is_live = GST_CLOCK_TIME_IS_VALID (latency);
1754     GstElement *e = GST_ELEMENT (aagg);
1755     GstMessage *msg;
1756 
1757     while ((msg = g_queue_pop_head (&aagg->priv->messages))) {
1758       if (is_live) {
1759         GstStructure *s = gst_message_writable_structure (msg);
1760         gst_structure_set (s, "live", G_TYPE_BOOLEAN, TRUE, NULL);
1761       }
1762 
1763       gst_element_post_message (e, msg);
1764     }
1765   }
1766 }
1767 
1768 /* Called with the object lock for both the element and pad held,
1769  * as well as the aagg lock
1770  *
1771  * Replace the current buffer with input and update GstAudioAggregatorPadPrivate
1772  * values.
1773  */
1774 static gboolean
gst_audio_aggregator_fill_buffer(GstAudioAggregator * aagg,GstAudioAggregatorPad * pad)1775 gst_audio_aggregator_fill_buffer (GstAudioAggregator * aagg,
1776     GstAudioAggregatorPad * pad)
1777 {
1778   GstClockTime start_time, end_time;
1779   gboolean discont = FALSE;
1780   guint64 start_offset, end_offset;
1781   gint rate, bpf;
1782 
1783   GstAggregator *agg = GST_AGGREGATOR (aagg);
1784   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
1785   GstAudioAggregatorPad *srcpad = GST_AUDIO_AGGREGATOR_PAD (agg->srcpad);
1786 
1787   if (GST_AUDIO_AGGREGATOR_PAD_GET_CLASS (pad)->convert_buffer) {
1788     rate = GST_AUDIO_INFO_RATE (&srcpad->info);
1789     bpf = GST_AUDIO_INFO_BPF (&srcpad->info);
1790   } else {
1791     rate = GST_AUDIO_INFO_RATE (&pad->info);
1792     bpf = GST_AUDIO_INFO_BPF (&pad->info);
1793   }
1794 
1795   pad->priv->position = 0;
1796   pad->priv->size = gst_buffer_get_size (pad->priv->buffer) / bpf;
1797 
1798   if (pad->priv->size == 0) {
1799     if (!GST_BUFFER_DURATION_IS_VALID (pad->priv->buffer) ||
1800         !GST_BUFFER_FLAG_IS_SET (pad->priv->buffer, GST_BUFFER_FLAG_GAP)) {
1801       GST_WARNING_OBJECT (pad, "Dropping 0-sized buffer missing either a"
1802           " duration or a GAP flag: %" GST_PTR_FORMAT, pad->priv->buffer);
1803       return FALSE;
1804     }
1805 
1806     pad->priv->size =
1807         gst_util_uint64_scale (GST_BUFFER_DURATION (pad->priv->buffer), rate,
1808         GST_SECOND);
1809   }
1810 
1811   if (!GST_BUFFER_PTS_IS_VALID (pad->priv->buffer)) {
1812     if (pad->priv->output_offset == -1)
1813       pad->priv->output_offset = aagg->priv->offset;
1814     if (pad->priv->next_offset == -1)
1815       pad->priv->next_offset = pad->priv->size;
1816     else
1817       pad->priv->next_offset += pad->priv->size;
1818     goto done;
1819   }
1820 
1821   start_time = GST_BUFFER_PTS (pad->priv->buffer);
1822   end_time =
1823       start_time + gst_util_uint64_scale_ceil (pad->priv->size, GST_SECOND,
1824       rate);
1825 
1826   /* Clipping should've ensured this */
1827   g_assert (start_time >= aggpad->segment.start);
1828 
1829   start_offset =
1830       gst_util_uint64_scale (start_time - aggpad->segment.start, rate,
1831       GST_SECOND);
1832   end_offset = start_offset + pad->priv->size;
1833 
1834   if (GST_BUFFER_IS_DISCONT (pad->priv->buffer)
1835       || GST_BUFFER_FLAG_IS_SET (pad->priv->buffer, GST_BUFFER_FLAG_RESYNC)
1836       || pad->priv->new_segment || pad->priv->next_offset == -1) {
1837     discont = TRUE;
1838     pad->priv->new_segment = FALSE;
1839   } else {
1840     guint64 diff, max_sample_diff;
1841 
1842     /* Check discont, based on audiobasesink */
1843     if (start_offset <= pad->priv->next_offset)
1844       diff = pad->priv->next_offset - start_offset;
1845     else
1846       diff = start_offset - pad->priv->next_offset;
1847 
1848     max_sample_diff =
1849         gst_util_uint64_scale_int (aagg->priv->alignment_threshold, rate,
1850         GST_SECOND);
1851 
1852     /* Discont! */
1853     if (G_UNLIKELY (diff >= max_sample_diff)) {
1854       if (aagg->priv->discont_wait > 0) {
1855         if (pad->priv->discont_time == GST_CLOCK_TIME_NONE) {
1856           pad->priv->discont_time = start_time;
1857         } else if (start_time - pad->priv->discont_time >=
1858             aagg->priv->discont_wait) {
1859           discont = TRUE;
1860           pad->priv->discont_time = GST_CLOCK_TIME_NONE;
1861         }
1862       } else {
1863         discont = TRUE;
1864       }
1865     } else if (G_UNLIKELY (pad->priv->discont_time != GST_CLOCK_TIME_NONE)) {
1866       /* we have had a discont, but are now back on track! */
1867       pad->priv->discont_time = GST_CLOCK_TIME_NONE;
1868     }
1869   }
1870 
1871   if (discont) {
1872     /* Have discont, need resync */
1873     if (pad->priv->next_offset != -1)
1874       GST_DEBUG_OBJECT (pad, "Have discont. Expected %"
1875           G_GUINT64_FORMAT ", got %" G_GUINT64_FORMAT,
1876           pad->priv->next_offset, start_offset);
1877     pad->priv->next_offset = -1;
1878   } else {
1879     pad->priv->next_offset += pad->priv->size;
1880   }
1881 
1882   if (pad->priv->output_offset == -1 || discont) {
1883     GstClockTime start_running_time;
1884     GstClockTime end_running_time;
1885     GstClockTime segment_pos;
1886     guint64 start_output_offset = -1;
1887     guint64 end_output_offset = -1;
1888     GstSegment *agg_segment = &GST_AGGREGATOR_PAD (agg->srcpad)->segment;
1889 
1890     start_running_time =
1891         gst_segment_to_running_time (&aggpad->segment,
1892         GST_FORMAT_TIME, start_time);
1893     end_running_time =
1894         gst_segment_to_running_time (&aggpad->segment,
1895         GST_FORMAT_TIME, end_time);
1896 
1897     /* Convert to position in the output segment */
1898     segment_pos =
1899         gst_segment_position_from_running_time (agg_segment, GST_FORMAT_TIME,
1900         start_running_time);
1901     if (GST_CLOCK_TIME_IS_VALID (segment_pos))
1902       start_output_offset =
1903           gst_util_uint64_scale (segment_pos - agg_segment->start, rate,
1904           GST_SECOND);
1905 
1906     segment_pos =
1907         gst_segment_position_from_running_time (agg_segment, GST_FORMAT_TIME,
1908         end_running_time);
1909     if (GST_CLOCK_TIME_IS_VALID (segment_pos))
1910       end_output_offset =
1911           gst_util_uint64_scale (segment_pos - agg_segment->start, rate,
1912           GST_SECOND);
1913 
1914     if (start_output_offset == -1 && end_output_offset == -1) {
1915       /* Outside output segment, drop */
1916       pad->priv->position = 0;
1917       pad->priv->size = 0;
1918       GST_DEBUG_OBJECT (pad, "Buffer outside output segment");
1919       return FALSE;
1920     }
1921 
1922     /* Calculate end_output_offset if it was outside the output segment */
1923     if (end_output_offset == -1)
1924       end_output_offset = start_output_offset + pad->priv->size;
1925 
1926     if (end_output_offset < aagg->priv->offset) {
1927       GstClockTime rt;
1928 
1929       pad->priv->dropped += pad->priv->size;
1930       rt = gst_audio_aggregator_pad_enqueue_qos_message (pad, aagg,
1931           pad->priv->size);
1932       GST_DEBUG_OBJECT (pad, "Dropped buffer of %u samples at running time %"
1933           GST_TIME_FORMAT " because input buffer is entirely before current"
1934           " output offset", pad->priv->size, GST_TIME_ARGS (rt));
1935 
1936       pad->priv->position = 0;
1937       pad->priv->size = 0;
1938       GST_DEBUG_OBJECT (pad,
1939           "Buffer before segment or current position: %" G_GUINT64_FORMAT " < %"
1940           G_GINT64_FORMAT, end_output_offset, aagg->priv->offset);
1941       return FALSE;
1942     }
1943 
1944     if (start_output_offset == -1 ||
1945         start_output_offset < aagg->priv->offset ||
1946         (pad->priv->output_offset != -1 &&
1947             start_output_offset < pad->priv->output_offset)) {
1948       guint diff;
1949 
1950       if (start_output_offset == -1 && end_output_offset < pad->priv->size) {
1951         diff = pad->priv->size - end_output_offset + aagg->priv->offset;
1952       } else if (start_output_offset == -1) {
1953         start_output_offset = end_output_offset - pad->priv->size;
1954 
1955         if (start_output_offset < aagg->priv->offset)
1956           diff = aagg->priv->offset - start_output_offset;
1957         else
1958           diff = 0;
1959       } else if (pad->priv->output_offset != -1 &&
1960           start_output_offset < pad->priv->output_offset) {
1961         diff = pad->priv->output_offset - start_output_offset;
1962       } else {
1963         diff = aagg->priv->offset - start_output_offset;
1964       }
1965 
1966       pad->priv->dropped += MIN (diff, pad->priv->size);
1967       if (diff != 0) {
1968         GstClockTime rt;
1969 
1970         rt = gst_audio_aggregator_pad_enqueue_qos_message (pad, aagg, diff);
1971         GST_DEBUG_OBJECT (pad, "Dropped %u samples at running time %"
1972             GST_TIME_FORMAT " because input buffer starts before current"
1973             " output offset", diff, GST_TIME_ARGS (rt));
1974       }
1975 
1976       pad->priv->position += diff;
1977       if (start_output_offset != -1)
1978         start_output_offset += diff;
1979       if (pad->priv->position >= pad->priv->size) {
1980         /* Empty buffer, drop */
1981         pad->priv->dropped += pad->priv->size;
1982         pad->priv->position = 0;
1983         pad->priv->size = 0;
1984         GST_DEBUG_OBJECT (pad,
1985             "Buffer before segment or current position: %" G_GUINT64_FORMAT
1986             " < %" G_GINT64_FORMAT, end_output_offset, aagg->priv->offset);
1987         return FALSE;
1988       }
1989     }
1990 
1991     if (start_output_offset == -1)
1992       pad->priv->output_offset = aagg->priv->offset;
1993     else
1994       pad->priv->output_offset = start_output_offset;
1995 
1996     if (pad->priv->next_offset == -1)
1997       pad->priv->next_offset = end_offset;
1998 
1999     GST_DEBUG_OBJECT (pad,
2000         "Buffer resynced: Pad offset %" G_GUINT64_FORMAT
2001         ", current audio aggregator offset %" G_GINT64_FORMAT,
2002         pad->priv->output_offset, aagg->priv->offset);
2003   }
2004 
2005 done:
2006 
2007   GST_LOG_OBJECT (pad,
2008       "Queued new buffer at offset %" G_GUINT64_FORMAT,
2009       pad->priv->output_offset);
2010 
2011   return TRUE;
2012 }
2013 
2014 /* Called with pad object lock held */
2015 
2016 static gboolean
gst_audio_aggregator_mix_buffer(GstAudioAggregator * aagg,GstAudioAggregatorPad * pad,GstBuffer * inbuf,GstBuffer * outbuf,guint blocksize)2017 gst_audio_aggregator_mix_buffer (GstAudioAggregator * aagg,
2018     GstAudioAggregatorPad * pad, GstBuffer * inbuf, GstBuffer * outbuf,
2019     guint blocksize)
2020 {
2021   guint overlap;
2022   guint out_start;
2023   gboolean filled;
2024   guint in_offset;
2025   gboolean pad_changed = FALSE;
2026 
2027   /* Overlap => mix */
2028   if (aagg->priv->offset < pad->priv->output_offset)
2029     out_start = pad->priv->output_offset - aagg->priv->offset;
2030   else
2031     out_start = 0;
2032 
2033   overlap = pad->priv->size - pad->priv->position;
2034   if (overlap > blocksize - out_start)
2035     overlap = blocksize - out_start;
2036 
2037   if (GST_BUFFER_FLAG_IS_SET (inbuf, GST_BUFFER_FLAG_GAP)) {
2038     /* skip gap buffer */
2039     GST_LOG_OBJECT (pad, "skipping GAP buffer");
2040     pad->priv->output_offset += pad->priv->size - pad->priv->position;
2041     pad->priv->position = pad->priv->size;
2042 
2043     gst_buffer_replace (&pad->priv->buffer, NULL);
2044     return FALSE;
2045   }
2046 
2047   gst_buffer_ref (inbuf);
2048   in_offset = pad->priv->position;
2049   GST_OBJECT_UNLOCK (pad);
2050   GST_OBJECT_UNLOCK (aagg);
2051 
2052   filled = GST_AUDIO_AGGREGATOR_GET_CLASS (aagg)->aggregate_one_buffer (aagg,
2053       pad, inbuf, in_offset, outbuf, out_start, overlap);
2054 
2055   GST_OBJECT_LOCK (aagg);
2056   GST_OBJECT_LOCK (pad);
2057 
2058   pad_changed = (inbuf != pad->priv->buffer);
2059   gst_buffer_unref (inbuf);
2060 
2061   if (filled)
2062     GST_BUFFER_FLAG_UNSET (outbuf, GST_BUFFER_FLAG_GAP);
2063 
2064   if (pad_changed)
2065     return FALSE;
2066 
2067   pad->priv->processed += overlap;
2068   pad->priv->position += overlap;
2069   pad->priv->output_offset += overlap;
2070 
2071   if (pad->priv->position == pad->priv->size) {
2072     /* Buffer done, drop it */
2073     gst_buffer_replace (&pad->priv->buffer, NULL);
2074     GST_LOG_OBJECT (pad, "Finished mixing buffer, waiting for next");
2075     return FALSE;
2076   }
2077 
2078   return TRUE;
2079 }
2080 
2081 static GstBuffer *
gst_audio_aggregator_create_output_buffer(GstAudioAggregator * aagg,guint num_frames)2082 gst_audio_aggregator_create_output_buffer (GstAudioAggregator * aagg,
2083     guint num_frames)
2084 {
2085   GstAllocator *allocator;
2086   GstAllocationParams params;
2087   GstBuffer *outbuf;
2088   GstMapInfo outmap;
2089   GstAggregator *agg = GST_AGGREGATOR (aagg);
2090   GstAudioAggregatorPad *srcpad = GST_AUDIO_AGGREGATOR_PAD (agg->srcpad);
2091 
2092   gst_aggregator_get_allocator (GST_AGGREGATOR (aagg), &allocator, &params);
2093 
2094   GST_DEBUG ("Creating output buffer with size %d",
2095       num_frames * GST_AUDIO_INFO_BPF (&srcpad->info));
2096 
2097   outbuf = gst_buffer_new_allocate (allocator, num_frames *
2098       GST_AUDIO_INFO_BPF (&srcpad->info), &params);
2099 
2100   if (allocator)
2101     gst_object_unref (allocator);
2102 
2103   gst_buffer_map (outbuf, &outmap, GST_MAP_WRITE);
2104   gst_audio_format_info_fill_silence (srcpad->info.finfo, outmap.data,
2105       outmap.size);
2106   gst_buffer_unmap (outbuf, &outmap);
2107 
2108   return outbuf;
2109 }
2110 
2111 static gboolean
sync_pad_values(GstElement * aagg,GstPad * pad,gpointer user_data)2112 sync_pad_values (GstElement * aagg, GstPad * pad, gpointer user_data)
2113 {
2114   GstAudioAggregatorPad *aapad = GST_AUDIO_AGGREGATOR_PAD (pad);
2115   GstAggregatorPad *bpad = GST_AGGREGATOR_PAD_CAST (pad);
2116   GstClockTime timestamp, stream_time;
2117 
2118   if (aapad->priv->buffer == NULL)
2119     return TRUE;
2120 
2121   timestamp = GST_BUFFER_PTS (aapad->priv->buffer);
2122   GST_OBJECT_LOCK (bpad);
2123   stream_time = gst_segment_to_stream_time (&bpad->segment, GST_FORMAT_TIME,
2124       timestamp);
2125   GST_OBJECT_UNLOCK (bpad);
2126 
2127   /* sync object properties on stream time */
2128   /* TODO: Ideally we would want to do that on every sample */
2129   if (GST_CLOCK_TIME_IS_VALID (stream_time))
2130     gst_object_sync_values (GST_OBJECT_CAST (pad), stream_time);
2131 
2132   return TRUE;
2133 }
2134 
2135 static GstSample *
gst_audio_aggregator_peek_next_sample(GstAggregator * agg,GstAggregatorPad * aggpad)2136 gst_audio_aggregator_peek_next_sample (GstAggregator * agg,
2137     GstAggregatorPad * aggpad)
2138 {
2139   GstAudioAggregator *aagg = GST_AUDIO_AGGREGATOR (agg);
2140   GstAudioAggregatorPad *pad = GST_AUDIO_AGGREGATOR_PAD (aggpad);
2141   GstSample *sample = NULL;
2142 
2143   if (pad->priv->buffer && pad->priv->output_offset >= aagg->priv->offset
2144       && pad->priv->output_offset <
2145       aagg->priv->offset + aagg->priv->samples_per_buffer) {
2146     GstCaps *caps = gst_pad_get_current_caps (GST_PAD (aggpad));
2147     GstStructure *info =
2148         gst_structure_new ("GstAudioAggregatorPadNextSampleInfo",
2149         "output-offset", G_TYPE_UINT64, pad->priv->output_offset,
2150         "position", G_TYPE_UINT, pad->priv->position,
2151         "size", G_TYPE_UINT, pad->priv->size,
2152         NULL);
2153 
2154     sample = gst_sample_new (pad->priv->buffer, caps, &aggpad->segment, info);
2155     gst_caps_unref (caps);
2156     gst_structure_free (info);
2157   }
2158 
2159   return sample;
2160 }
2161 
2162 static GstFlowReturn
gst_audio_aggregator_aggregate(GstAggregator * agg,gboolean timeout)2163 gst_audio_aggregator_aggregate (GstAggregator * agg, gboolean timeout)
2164 {
2165   /* Calculate the current output offset/timestamp and offset_end/timestamp_end.
2166    * Allocate a silence buffer for this and store it.
2167    *
2168    * For all pads:
2169    * 1) Once per input buffer (cached)
2170    *   1) Check discont (flag and timestamp with tolerance)
2171    *   2) If discont or new, resync. That means:
2172    *     1) Drop all start data of the buffer that comes before
2173    *        the current position/offset.
2174    *     2) Calculate the offset (output segment!) that the first
2175    *        frame of the input buffer corresponds to. Base this on
2176    *        the running time.
2177    *
2178    * 2) If the current pad's offset/offset_end overlaps with the output
2179    *    offset/offset_end, mix it at the appropriate position in the output
2180    *    buffer and advance the pad's position. Remember if this pad needs
2181    *    a new buffer to advance behind the output offset_end.
2182    *
2183    * If we had no pad with a buffer, go EOS.
2184    *
2185    * If we had at least one pad that did not advance behind output
2186    * offset_end, let aggregate be called again for the current
2187    * output offset/offset_end.
2188    */
2189   GstElement *element;
2190   GstAudioAggregator *aagg;
2191   GList *iter;
2192   GstFlowReturn ret;
2193   GstBuffer *outbuf = NULL;
2194   gint64 next_offset;
2195   gint64 next_timestamp;
2196   gint rate, bpf;
2197   gboolean dropped = FALSE;
2198   gboolean is_eos = TRUE;
2199   gboolean is_done = TRUE;
2200   guint blocksize;
2201   GstAudioAggregatorPad *srcpad = GST_AUDIO_AGGREGATOR_PAD (agg->srcpad);
2202   GstSegment *agg_segment = &GST_AGGREGATOR_PAD (agg->srcpad)->segment;
2203 
2204   element = GST_ELEMENT (agg);
2205   aagg = GST_AUDIO_AGGREGATOR (agg);
2206 
2207   /* Sync pad properties to the stream time */
2208   gst_element_foreach_sink_pad (element, sync_pad_values, NULL);
2209 
2210   GST_AUDIO_AGGREGATOR_LOCK (aagg);
2211   GST_OBJECT_LOCK (agg);
2212 
2213   if (aagg->priv->samples_per_buffer == 0) {
2214     if (!gst_audio_aggregator_update_samples_per_buffer (aagg)) {
2215       GST_ERROR_OBJECT (aagg,
2216           "Failed to calculate the number of samples per buffer");
2217       GST_OBJECT_UNLOCK (agg);
2218       goto not_negotiated;
2219     }
2220   }
2221 
2222   /* Update position from the segment start/stop if needed */
2223   if (agg_segment->position == -1) {
2224     if (agg_segment->rate > 0.0)
2225       agg_segment->position = agg_segment->start;
2226     else
2227       agg_segment->position = agg_segment->stop;
2228   }
2229 
2230   rate = GST_AUDIO_INFO_RATE (&srcpad->info);
2231   bpf = GST_AUDIO_INFO_BPF (&srcpad->info);
2232 
2233   if (G_UNLIKELY (srcpad->info.finfo->format == GST_AUDIO_FORMAT_UNKNOWN)) {
2234     if (timeout) {
2235       GstClockTime output_buffer_duration;
2236       GST_DEBUG_OBJECT (aagg,
2237           "Got timeout before receiving any caps, don't output anything");
2238 
2239       blocksize = aagg->priv->samples_per_buffer;
2240       if (aagg->priv->error_per_buffer + aagg->priv->accumulated_error >=
2241           aagg->priv->output_buffer_duration_d)
2242         blocksize += 1;
2243       aagg->priv->accumulated_error =
2244           (aagg->priv->accumulated_error +
2245           aagg->priv->error_per_buffer) % aagg->priv->output_buffer_duration_d;
2246 
2247       output_buffer_duration =
2248           gst_util_uint64_scale (blocksize, GST_SECOND, rate);
2249 
2250       /* Advance position */
2251       if (agg_segment->rate > 0.0)
2252         agg_segment->position += output_buffer_duration;
2253       else if (agg_segment->position > output_buffer_duration)
2254         agg_segment->position -= output_buffer_duration;
2255       else
2256         agg_segment->position = 0;
2257 
2258       GST_OBJECT_UNLOCK (agg);
2259       GST_AUDIO_AGGREGATOR_UNLOCK (aagg);
2260       return GST_AGGREGATOR_FLOW_NEED_DATA;
2261     } else {
2262       GST_OBJECT_UNLOCK (agg);
2263       goto not_negotiated;
2264     }
2265   }
2266 
2267   if (aagg->priv->offset == -1) {
2268     aagg->priv->offset =
2269         gst_util_uint64_scale (agg_segment->position - agg_segment->start, rate,
2270         GST_SECOND);
2271     GST_DEBUG_OBJECT (aagg, "Starting at offset %" G_GINT64_FORMAT,
2272         aagg->priv->offset);
2273   }
2274 
2275   if (aagg->priv->current_buffer == NULL) {
2276     blocksize = aagg->priv->samples_per_buffer;
2277 
2278     if (aagg->priv->error_per_buffer + aagg->priv->accumulated_error >=
2279         aagg->priv->output_buffer_duration_d)
2280       blocksize += 1;
2281 
2282     aagg->priv->current_blocksize = blocksize;
2283 
2284     aagg->priv->accumulated_error =
2285         (aagg->priv->accumulated_error +
2286         aagg->priv->error_per_buffer) % aagg->priv->output_buffer_duration_d;
2287 
2288     GST_OBJECT_UNLOCK (agg);
2289     aagg->priv->current_buffer =
2290         GST_AUDIO_AGGREGATOR_GET_CLASS (aagg)->create_output_buffer (aagg,
2291         blocksize);
2292     /* Be careful, some things could have changed ? */
2293     GST_OBJECT_LOCK (agg);
2294     GST_BUFFER_FLAG_SET (aagg->priv->current_buffer, GST_BUFFER_FLAG_GAP);
2295   } else {
2296     blocksize = aagg->priv->current_blocksize;
2297   }
2298 
2299   /* FIXME: Reverse mixing does not work at all yet */
2300   if (agg_segment->rate > 0.0) {
2301     next_offset = aagg->priv->offset + blocksize;
2302   } else {
2303     next_offset = aagg->priv->offset - blocksize;
2304   }
2305 
2306   /* Use the sample counter, which will never accumulate rounding errors */
2307   next_timestamp =
2308       agg_segment->start + gst_util_uint64_scale (next_offset, GST_SECOND,
2309       rate);
2310 
2311   outbuf = aagg->priv->current_buffer;
2312 
2313   GST_LOG_OBJECT (agg,
2314       "Starting to mix %u samples for offset %" G_GINT64_FORMAT
2315       " with timestamp %" GST_TIME_FORMAT, blocksize,
2316       aagg->priv->offset, GST_TIME_ARGS (agg_segment->position));
2317 
2318   for (iter = element->sinkpads; iter; iter = iter->next) {
2319     GstAudioAggregatorPad *pad = (GstAudioAggregatorPad *) iter->data;
2320     GstAggregatorPad *aggpad = (GstAggregatorPad *) iter->data;
2321     gboolean pad_eos = gst_aggregator_pad_is_eos (aggpad);
2322     GstBuffer *input_buffer;
2323 
2324     if (gst_aggregator_pad_is_inactive (aggpad))
2325       continue;
2326 
2327     if (!pad_eos)
2328       is_eos = FALSE;
2329 
2330     input_buffer = gst_aggregator_pad_peek_buffer (aggpad);
2331 
2332     GST_OBJECT_LOCK (pad);
2333     if (!input_buffer) {
2334       if (timeout) {
2335         if (pad->priv->output_offset < next_offset) {
2336           gint64 diff = next_offset - pad->priv->output_offset;
2337           GST_DEBUG_OBJECT (pad, "Timeout, missing %" G_GINT64_FORMAT
2338               " frames (%" GST_TIME_FORMAT ")", diff,
2339               GST_TIME_ARGS (gst_util_uint64_scale (diff, GST_SECOND,
2340                       GST_AUDIO_INFO_RATE (&srcpad->info))));
2341         }
2342       } else if (!pad_eos) {
2343         is_done = FALSE;
2344       }
2345       GST_OBJECT_UNLOCK (pad);
2346       continue;
2347     } else if (!GST_AUDIO_INFO_IS_VALID (&pad->info)) {
2348       GST_OBJECT_UNLOCK (pad);
2349       GST_OBJECT_UNLOCK (agg);
2350       goto not_negotiated;
2351     }
2352 
2353     /* New buffer? */
2354     if (!pad->priv->buffer) {
2355       if (GST_AUDIO_AGGREGATOR_PAD_GET_CLASS (pad)->convert_buffer) {
2356         pad->priv->buffer =
2357             gst_audio_aggregator_convert_buffer
2358             (aagg, GST_PAD (pad), &pad->info, &srcpad->info, input_buffer);
2359         if (!pad->priv->buffer) {
2360           GST_OBJECT_UNLOCK (pad);
2361           GST_OBJECT_UNLOCK (agg);
2362           goto not_negotiated;
2363         }
2364       } else {
2365         pad->priv->buffer = gst_buffer_ref (input_buffer);
2366       }
2367 
2368       if (!gst_audio_aggregator_fill_buffer (aagg, pad)) {
2369         gst_buffer_replace (&pad->priv->buffer, NULL);
2370         gst_buffer_unref (input_buffer);
2371         dropped = TRUE;
2372         GST_OBJECT_UNLOCK (pad);
2373 
2374         gst_aggregator_pad_drop_buffer (aggpad);
2375         continue;
2376       }
2377     }
2378     gst_buffer_unref (input_buffer);
2379 
2380     if (!pad->priv->buffer && !dropped && pad_eos) {
2381       GST_DEBUG_OBJECT (aggpad, "Pad is in EOS state");
2382       GST_OBJECT_UNLOCK (pad);
2383       continue;
2384     }
2385 
2386     g_assert (pad->priv->buffer);
2387 
2388     /* This pad is lagging behind, we need to update the offset
2389      * and maybe drop the current buffer */
2390     if (pad->priv->output_offset < aagg->priv->offset) {
2391       gint64 diff = aagg->priv->offset - pad->priv->output_offset;
2392       gint64 odiff = diff;
2393 
2394       if (pad->priv->position + diff > pad->priv->size)
2395         diff = pad->priv->size - pad->priv->position;
2396       pad->priv->dropped += diff;
2397       if (diff != 0) {
2398         GstClockTime rt;
2399 
2400         rt = gst_audio_aggregator_pad_enqueue_qos_message (pad, aagg, diff);
2401         GST_DEBUG_OBJECT (pad, "Dropped %" G_GINT64_FORMAT " samples at"
2402             " running time %" GST_TIME_FORMAT " because input buffer is before"
2403             " output offset", diff, GST_TIME_ARGS (rt));
2404       }
2405       pad->priv->position += diff;
2406       pad->priv->output_offset += diff;
2407 
2408       if (pad->priv->position == pad->priv->size) {
2409         GST_DEBUG_OBJECT (pad, "Buffer was late by %" GST_TIME_FORMAT
2410             ", dropping %" GST_PTR_FORMAT,
2411             GST_TIME_ARGS (gst_util_uint64_scale (odiff, GST_SECOND,
2412                     GST_AUDIO_INFO_RATE (&srcpad->info))), pad->priv->buffer);
2413         /* Buffer done, drop it */
2414         gst_buffer_replace (&pad->priv->buffer, NULL);
2415         dropped = TRUE;
2416         GST_OBJECT_UNLOCK (pad);
2417         gst_aggregator_pad_drop_buffer (aggpad);
2418         continue;
2419       }
2420     }
2421 
2422     g_assert (pad->priv->buffer);
2423     GST_OBJECT_UNLOCK (pad);
2424   }
2425   GST_OBJECT_UNLOCK (agg);
2426 
2427   gst_audio_aggregator_post_messages (aagg);
2428 
2429   {
2430     gst_structure_set (aagg->priv->selected_samples_info, "offset",
2431         G_TYPE_UINT64, aagg->priv->offset, "frames", G_TYPE_UINT, blocksize,
2432         NULL);
2433     gst_aggregator_selected_samples (agg, agg_segment->position,
2434         GST_CLOCK_TIME_NONE, next_timestamp - agg_segment->position,
2435         aagg->priv->selected_samples_info);
2436   }
2437 
2438   GST_OBJECT_LOCK (agg);
2439   for (iter = element->sinkpads; iter; iter = iter->next) {
2440     GstAudioAggregatorPad *pad = (GstAudioAggregatorPad *) iter->data;
2441     GstAggregatorPad *aggpad = (GstAggregatorPad *) iter->data;
2442 
2443     if (gst_aggregator_pad_is_inactive (aggpad))
2444       continue;
2445 
2446     GST_OBJECT_LOCK (pad);
2447 
2448     if (pad->priv->buffer && pad->priv->output_offset >= aagg->priv->offset
2449         && pad->priv->output_offset < aagg->priv->offset + blocksize) {
2450       gboolean drop_buf;
2451 
2452       GST_LOG_OBJECT (aggpad, "Mixing buffer for current offset");
2453       drop_buf = !gst_audio_aggregator_mix_buffer (aagg, pad, pad->priv->buffer,
2454           outbuf, blocksize);
2455       if (pad->priv->output_offset >= next_offset) {
2456         GST_LOG_OBJECT (pad,
2457             "Pad is at or after current offset: %" G_GUINT64_FORMAT " >= %"
2458             G_GINT64_FORMAT, pad->priv->output_offset, next_offset);
2459       } else {
2460         is_done = FALSE;
2461       }
2462       if (drop_buf) {
2463         GST_OBJECT_UNLOCK (pad);
2464         gst_aggregator_pad_drop_buffer (aggpad);
2465         continue;
2466       }
2467     }
2468 
2469     GST_OBJECT_UNLOCK (pad);
2470   }
2471   GST_OBJECT_UNLOCK (agg);
2472 
2473   if (dropped) {
2474     /* We dropped a buffer, retry */
2475     GST_LOG_OBJECT (aagg, "A pad dropped a buffer, wait for the next one");
2476     GST_AUDIO_AGGREGATOR_UNLOCK (aagg);
2477     return GST_AGGREGATOR_FLOW_NEED_DATA;
2478   }
2479 
2480   if (!is_done && !is_eos) {
2481     /* Get more buffers */
2482     GST_LOG_OBJECT (aagg,
2483         "We're not done yet for the current offset, waiting for more data");
2484     GST_AUDIO_AGGREGATOR_UNLOCK (aagg);
2485     return GST_AGGREGATOR_FLOW_NEED_DATA;
2486   }
2487 
2488   if (is_eos) {
2489     gint64 max_offset = 0;
2490 
2491     GST_DEBUG_OBJECT (aagg, "We're EOS");
2492 
2493     GST_OBJECT_LOCK (agg);
2494     for (iter = GST_ELEMENT (agg)->sinkpads; iter; iter = iter->next) {
2495       GstAudioAggregatorPad *pad = GST_AUDIO_AGGREGATOR_PAD (iter->data);
2496 
2497       if (gst_aggregator_pad_is_inactive (GST_AGGREGATOR_PAD (pad)))
2498         continue;
2499 
2500       max_offset = MAX ((gint64) max_offset, (gint64) pad->priv->output_offset);
2501     }
2502     GST_OBJECT_UNLOCK (agg);
2503 
2504     /* This means EOS or nothing mixed in at all */
2505     if (aagg->priv->offset == max_offset) {
2506       gst_buffer_replace (&aagg->priv->current_buffer, NULL);
2507       GST_AUDIO_AGGREGATOR_UNLOCK (aagg);
2508       return GST_FLOW_EOS;
2509     }
2510 
2511     if (max_offset <= next_offset) {
2512       GST_DEBUG_OBJECT (aagg,
2513           "Last buffer is incomplete: %" G_GUINT64_FORMAT " <= %"
2514           G_GINT64_FORMAT, max_offset, next_offset);
2515       next_offset = max_offset;
2516       next_timestamp =
2517           agg_segment->start + gst_util_uint64_scale (next_offset, GST_SECOND,
2518           rate);
2519 
2520       if (next_offset > aagg->priv->offset)
2521         gst_buffer_resize (outbuf, 0, (next_offset - aagg->priv->offset) * bpf);
2522     }
2523   }
2524 
2525   /* set timestamps on the output buffer */
2526   GST_OBJECT_LOCK (agg);
2527   if (agg_segment->rate > 0.0) {
2528     GST_BUFFER_PTS (outbuf) = agg_segment->position;
2529     GST_BUFFER_OFFSET (outbuf) = aagg->priv->offset;
2530     GST_BUFFER_OFFSET_END (outbuf) = next_offset;
2531     GST_BUFFER_DURATION (outbuf) = next_timestamp - agg_segment->position;
2532   } else {
2533     GST_BUFFER_PTS (outbuf) = next_timestamp;
2534     GST_BUFFER_OFFSET (outbuf) = next_offset;
2535     GST_BUFFER_OFFSET_END (outbuf) = aagg->priv->offset;
2536     GST_BUFFER_DURATION (outbuf) = agg_segment->position - next_timestamp;
2537   }
2538 
2539   GST_OBJECT_UNLOCK (agg);
2540 
2541   /* send it out */
2542   GST_LOG_OBJECT (aagg,
2543       "pushing outbuf %p, timestamp %" GST_TIME_FORMAT " offset %"
2544       G_GINT64_FORMAT, outbuf, GST_TIME_ARGS (GST_BUFFER_PTS (outbuf)),
2545       GST_BUFFER_OFFSET (outbuf));
2546 
2547   GST_AUDIO_AGGREGATOR_UNLOCK (aagg);
2548 
2549   ret = gst_aggregator_finish_buffer (agg, outbuf);
2550   aagg->priv->current_buffer = NULL;
2551 
2552   GST_LOG_OBJECT (aagg, "pushed outbuf, result = %s", gst_flow_get_name (ret));
2553 
2554   GST_AUDIO_AGGREGATOR_LOCK (aagg);
2555   GST_OBJECT_LOCK (agg);
2556   aagg->priv->offset = next_offset;
2557   agg_segment->position = next_timestamp;
2558 
2559   /* If there was a timeout and there was a gap in data in one of the streams,
2560    * then it's a very good time for a resync with the timestamps.
2561    */
2562   if (timeout) {
2563     for (iter = element->sinkpads; iter; iter = iter->next) {
2564       GstAudioAggregatorPad *pad = GST_AUDIO_AGGREGATOR_PAD (iter->data);
2565 
2566       GST_OBJECT_LOCK (pad);
2567       if (pad->priv->output_offset < aagg->priv->offset)
2568         pad->priv->output_offset = -1;
2569       GST_OBJECT_UNLOCK (pad);
2570     }
2571   }
2572   GST_OBJECT_UNLOCK (agg);
2573   GST_AUDIO_AGGREGATOR_UNLOCK (aagg);
2574 
2575   return ret;
2576   /* ERRORS */
2577 not_negotiated:
2578   {
2579     GST_AUDIO_AGGREGATOR_UNLOCK (aagg);
2580     GST_ELEMENT_ERROR (aagg, STREAM, FORMAT, (NULL),
2581         ("Unknown data received, not negotiated"));
2582     return GST_FLOW_NOT_NEGOTIATED;
2583   }
2584 }
2585