1 /* GStreamer
2 * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3 * 2001 Thomas <thomas@apestaart.org>
4 * 2005,2006 Wim Taymans <wim@fluendo.com>
5 * 2013 Sebastian Dröge <sebastian@centricular.com>
6 * 2014 Collabora
7 * Olivier Crete <olivier.crete@collabora.com>
8 *
9 * gstaudioaggregator.c:
10 *
11 * This library is free software; you can redistribute it and/or
12 * modify it under the terms of the GNU Library General Public
13 * License as published by the Free Software Foundation; either
14 * version 2 of the License, or (at your option) any later version.
15 *
16 * This library is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
19 * Library General Public License for more details.
20 *
21 * You should have received a copy of the GNU Library General Public
22 * License along with this library; if not, write to the
23 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
24 * Boston, MA 02110-1301, USA.
25 */
26 /**
27 * SECTION: gstaudioaggregator
28 * @title: GstAudioAggregator
29 * @short_description: Base class that manages a set of audio input pads
30 * with the purpose of aggregating or mixing their raw audio input buffers
31 * @see_also: #GstAggregator, #GstAudioMixer
32 *
 * Subclasses must use (a subclass of) #GstAudioAggregatorPad for both
 * their source and sink pads;
 * gst_element_class_add_static_pad_template_with_gtype() is a convenient
 * helper for that.
37 *
38 * #GstAudioAggregator can perform conversion on the data arriving
39 * on its sink pads, based on the format expected downstream: in order
40 * to enable that behaviour, the GType of the sink pads must either be
41 * a (subclass of) #GstAudioAggregatorConvertPad to use the default
42 * #GstAudioConverter implementation, or a subclass of #GstAudioAggregatorPad
43 * implementing #GstAudioAggregatorPadClass.convert_buffer.
44 *
45 * To allow for the output caps to change, the mechanism is the same as
46 * above, with the GType of the source pad.
47 *
48 * See #GstAudioMixer for an example.
49 *
50 * When conversion is enabled, #GstAudioAggregator will accept
51 * any type of raw audio caps and perform conversion
52 * on the data arriving on its sink pads, with whatever downstream
53 * expects as the target format.
54 *
55 * In case downstream caps are not fully fixated, it will use
56 * the first configured sink pad to finish fixating its source pad
57 * caps.
58 *
 * A notable exception for now is the sample rate: sink pads must
 * have the same sample rate as either the downstream requirement,
 * or the first configured pad, or a combination of both (when
 * downstream specifies a range or a set of acceptable rates).
63 *
64 * The #GstAggregator::samples-selected signal is provided with some
65 * additional information about the output buffer:
66 * - "offset" G_TYPE_UINT64 Offset in samples since segment start
67 * for the position that is next to be filled in the output buffer.
68 * - "frames" G_TYPE_UINT Number of frames per output buffer.
69 *
70 * In addition the gst_aggregator_peek_next_sample() function returns
71 * additional information in the info #GstStructure of the returned sample:
72 * - "output-offset" G_TYPE_UINT64 Sample offset in output segment relative to
73 * the output segment's start where the current position of this input
74 * buffer would be placed
75 * - "position" G_TYPE_UINT current position in the input buffer in samples
76 * - "size" G_TYPE_UINT size of the input buffer in samples
77 */
78
79
80 #ifdef HAVE_CONFIG_H
81 # include "config.h"
82 #endif
83
84 #include "gstaudioaggregator.h"
85
86 #include <string.h>
87
/* Debug category of this file; GST_CAT_DEFAULT makes it the default
 * category for the logging macros used below. */
GST_DEBUG_CATEGORY_STATIC (audio_aggregator_debug);
#define GST_CAT_DEFAULT audio_aggregator_debug

/* Property identifiers of GstAudioAggregatorPad. */
enum
{
  PROP_PAD_0,
  PROP_PAD_QOS_MESSAGES,
};
96
/* Private per-instance state of a GstAudioAggregatorPad. */
struct _GstAudioAggregatorPadPrivate
{
  /* All members are protected by the pad object lock */

  GstBuffer *buffer;            /* current buffer we're mixing, for
                                   comparison with a new input buffer from
                                   aggregator to see if we need to update our
                                   cached values. */

  guint position, size;         /* position in the input buffer and size of the
                                   input buffer in number of samples */

  guint64 output_offset;        /* Sample offset in output segment relative to
                                   srcpad.segment.start where the current position
                                   of this input_buffer would be placed. */

  guint64 next_offset;          /* Next expected sample offset relative to
                                   pad.segment.start. This is -1 when resyncing is
                                   needed, e.g. because of a previous discont. */

  /* Last time we noticed a discont */
  GstClockTime discont_time;

  /* A new unhandled segment event has been received */
  gboolean new_segment;

  guint64 processed;            /* Number of samples processed since the element came out of READY */
  guint64 dropped;              /* Number of samples dropped since the element came out of READY */

  gboolean qos_messages;        /* Property to decide to send QoS messages or not */
};
128
129
130 /*****************************************
131 * GstAudioAggregatorPad implementation *
132 *****************************************/
/* Registers GstAudioAggregatorPad as a GstAggregatorPad subtype that carries
 * a GstAudioAggregatorPadPrivate as instance-private data. */
G_DEFINE_TYPE_WITH_PRIVATE (GstAudioAggregatorPad, gst_audio_aggregator_pad,
    GST_TYPE_AGGREGATOR_PAD);

/* Forward declaration; installed as the pad class flush handler in
 * gst_audio_aggregator_pad_class_init(). */
static GstFlowReturn
gst_audio_aggregator_pad_flush_pad (GstAggregatorPad * aggpad,
    GstAggregator * aggregator);
139
140 static void
gst_audio_aggregator_pad_finalize(GObject * object)141 gst_audio_aggregator_pad_finalize (GObject * object)
142 {
143 GstAudioAggregatorPad *pad = (GstAudioAggregatorPad *) object;
144
145 gst_buffer_replace (&pad->priv->buffer, NULL);
146
147 G_OBJECT_CLASS (gst_audio_aggregator_pad_parent_class)->finalize (object);
148 }
149
150 static void
gst_audio_aggregator_pad_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)151 gst_audio_aggregator_pad_get_property (GObject * object, guint prop_id,
152 GValue * value, GParamSpec * pspec)
153 {
154 GstAudioAggregatorPad *pad = GST_AUDIO_AGGREGATOR_PAD (object);
155
156 switch (prop_id) {
157 case PROP_PAD_QOS_MESSAGES:
158 GST_OBJECT_LOCK (pad);
159 g_value_set_boolean (value, pad->priv->qos_messages);
160 GST_OBJECT_UNLOCK (pad);
161 break;
162 default:
163 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
164 break;
165 }
166 }
167
168 static void
gst_audio_aggregator_pad_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)169 gst_audio_aggregator_pad_set_property (GObject * object, guint prop_id,
170 const GValue * value, GParamSpec * pspec)
171 {
172 GstAudioAggregatorPad *pad = GST_AUDIO_AGGREGATOR_PAD (object);
173
174 switch (prop_id) {
175 case PROP_PAD_QOS_MESSAGES:
176 GST_OBJECT_LOCK (pad);
177 pad->priv->qos_messages = g_value_get_boolean (value);
178 GST_OBJECT_UNLOCK (pad);
179 break;
180 default:
181 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
182 break;
183 }
184 }
185
186 static void
gst_audio_aggregator_pad_class_init(GstAudioAggregatorPadClass * klass)187 gst_audio_aggregator_pad_class_init (GstAudioAggregatorPadClass * klass)
188 {
189 GObjectClass *gobject_class = (GObjectClass *) klass;
190 GstAggregatorPadClass *aggpadclass = (GstAggregatorPadClass *) klass;
191
192 gobject_class->set_property = gst_audio_aggregator_pad_set_property;
193 gobject_class->get_property = gst_audio_aggregator_pad_get_property;
194 gobject_class->finalize = gst_audio_aggregator_pad_finalize;
195 aggpadclass->flush = GST_DEBUG_FUNCPTR (gst_audio_aggregator_pad_flush_pad);
196
197 /**
198 * GstAudioAggregatorPad:qos-messages:
199 *
200 * Emit QoS messages when dropping buffers.
201 *
202 * Since: 1.20
203 */
204 g_object_class_install_property (gobject_class,
205 PROP_PAD_QOS_MESSAGES, g_param_spec_boolean ("qos-messages",
206 "Quality of Service Messages",
207 "Emit QoS messages when dropping buffers", FALSE,
208 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
209 }
210
211 static void
gst_audio_aggregator_pad_init(GstAudioAggregatorPad * pad)212 gst_audio_aggregator_pad_init (GstAudioAggregatorPad * pad)
213 {
214 pad->priv = gst_audio_aggregator_pad_get_instance_private (pad);
215
216 gst_audio_info_init (&pad->info);
217
218 pad->priv->buffer = NULL;
219 pad->priv->position = 0;
220 pad->priv->size = 0;
221 pad->priv->output_offset = -1;
222 pad->priv->next_offset = -1;
223 pad->priv->discont_time = GST_CLOCK_TIME_NONE;
224 }
225
226 /* Must be called from srcpad thread or when it is stopped */
227 static void
gst_audio_aggregator_pad_reset_qos(GstAudioAggregatorPad * pad)228 gst_audio_aggregator_pad_reset_qos (GstAudioAggregatorPad * pad)
229 {
230 pad->priv->dropped = 0;
231 pad->priv->processed = 0;
232 }
233
234 static GstFlowReturn
gst_audio_aggregator_pad_flush_pad(GstAggregatorPad * aggpad,GstAggregator * aggregator)235 gst_audio_aggregator_pad_flush_pad (GstAggregatorPad * aggpad,
236 GstAggregator * aggregator)
237 {
238 GstAudioAggregatorPad *pad = GST_AUDIO_AGGREGATOR_PAD (aggpad);
239
240 GST_OBJECT_LOCK (aggpad);
241 pad->priv->position = pad->priv->size = 0;
242 pad->priv->output_offset = pad->priv->next_offset = -1;
243 pad->priv->discont_time = GST_CLOCK_TIME_NONE;
244 gst_buffer_replace (&pad->priv->buffer, NULL);
245 gst_audio_aggregator_pad_reset_qos (pad);
246 GST_OBJECT_UNLOCK (aggpad);
247
248 return GST_FLOW_OK;
249 }
250
/* Property identifiers of GstAudioAggregatorConvertPad. */
enum
{
  PROP_CONVERT_PAD_0,
  PROP_CONVERT_PAD_CONVERTER_CONFIG
};
256
/* Private per-instance state of a GstAudioAggregatorConvertPad. */
struct _GstAudioAggregatorConvertPadPrivate
{
  /* All members are protected by the pad object lock */
  GstAudioConverter *converter; /* NULL when no conversion is needed
                                   (passthrough) or none was created yet */
  GstStructure *converter_config;       /* value of the "converter-config"
                                           property, may be NULL */
  gboolean converter_config_changed;    /* TRUE when the converter must be
                                           rebuilt before the next conversion */
};
264
265
/* Registers GstAudioAggregatorConvertPad as a GstAudioAggregatorPad subtype
 * that carries a GstAudioAggregatorConvertPadPrivate as private data. */
G_DEFINE_TYPE_WITH_PRIVATE (GstAudioAggregatorConvertPad,
    gst_audio_aggregator_convert_pad, GST_TYPE_AUDIO_AGGREGATOR_PAD);
268
269 static gboolean
gst_audio_aggregator_convert_pad_update_converter(GstAudioAggregatorConvertPad * aaggcpad,GstAudioInfo * in_info,GstAudioInfo * out_info)270 gst_audio_aggregator_convert_pad_update_converter (GstAudioAggregatorConvertPad
271 * aaggcpad, GstAudioInfo * in_info, GstAudioInfo * out_info)
272 {
273 GstStructure *config = aaggcpad->priv->converter_config;
274 GstAudioConverter *converter;
275
276 if (!aaggcpad->priv->converter_config_changed) {
277 return TRUE;
278 }
279
280 g_clear_pointer (&aaggcpad->priv->converter, gst_audio_converter_free);
281
282 if (in_info->finfo->format == GST_AUDIO_FORMAT_UNKNOWN) {
283 /* If we haven't received caps yet, this pad should not have
284 * a buffer to convert anyway */
285 GST_FIXME_OBJECT (aaggcpad, "UNREACHABLE CODE: Unknown input format");
286 return FALSE;
287 }
288
289 converter =
290 gst_audio_converter_new (GST_AUDIO_CONVERTER_FLAG_NONE, in_info, out_info,
291 config ? gst_structure_copy (config) : NULL);
292
293 if (converter == NULL) {
294 /* Not converting when we need to but the config is invalid (e.g. because
295 * the mix-matrix is not the right size) produces garbage. An invalid
296 * config causes a GST_FLOW_NOT_NEGOTIATED. */
297 GST_WARNING_OBJECT (aaggcpad, "Failed to update converter");
298 return FALSE;
299 }
300
301 aaggcpad->priv->converter_config_changed = FALSE;
302
303 if (!gst_audio_converter_is_passthrough (converter))
304 aaggcpad->priv->converter = converter;
305 else
306 gst_audio_converter_free (converter);
307
308 return TRUE;
309 }
310
311 static void
gst_audio_aggregator_pad_update_conversion_info(GstAudioAggregatorPad * aaggpad)312 gst_audio_aggregator_pad_update_conversion_info (GstAudioAggregatorPad *
313 aaggpad)
314 {
315 GST_AUDIO_AGGREGATOR_CONVERT_PAD (aaggpad)->priv->converter_config_changed =
316 TRUE;
317 }
318
319 static GstBuffer *
gst_audio_aggregator_convert_pad_convert_buffer(GstAudioAggregatorPad * aaggpad,GstAudioInfo * in_info,GstAudioInfo * out_info,GstBuffer * input_buffer)320 gst_audio_aggregator_convert_pad_convert_buffer (GstAudioAggregatorPad *
321 aaggpad, GstAudioInfo * in_info, GstAudioInfo * out_info,
322 GstBuffer * input_buffer)
323 {
324 GstBuffer *res;
325 GstAudioAggregatorConvertPad *aaggcpad =
326 GST_AUDIO_AGGREGATOR_CONVERT_PAD (aaggpad);
327
328 if (!gst_audio_aggregator_convert_pad_update_converter (aaggcpad, in_info,
329 out_info)) {
330 return NULL;
331 }
332
333 if (aaggcpad->priv->converter) {
334 gint insize = gst_buffer_get_size (input_buffer);
335 gsize insamples = insize / in_info->bpf;
336 gsize outsamples =
337 gst_audio_converter_get_out_frames (aaggcpad->priv->converter,
338 insamples);
339 gint outsize = outsamples * out_info->bpf;
340 GstMapInfo inmap, outmap;
341
342 res = gst_buffer_new_allocate (NULL, outsize, NULL);
343
344 /* We create a perfectly similar buffer, except obviously for
345 * its converted contents */
346 gst_buffer_copy_into (res, input_buffer,
347 GST_BUFFER_COPY_FLAGS | GST_BUFFER_COPY_TIMESTAMPS |
348 GST_BUFFER_COPY_META, 0, -1);
349
350 gst_buffer_map (input_buffer, &inmap, GST_MAP_READ);
351 gst_buffer_map (res, &outmap, GST_MAP_WRITE);
352
353 gst_audio_converter_samples (aaggcpad->priv->converter,
354 GST_AUDIO_CONVERTER_FLAG_NONE,
355 (gpointer *) & inmap.data, insamples,
356 (gpointer *) & outmap.data, outsamples);
357
358 gst_buffer_unmap (input_buffer, &inmap);
359 gst_buffer_unmap (res, &outmap);
360 } else {
361 res = gst_buffer_ref (input_buffer);
362 }
363
364 return res;
365 }
366
367 static void
gst_audio_aggregator_convert_pad_finalize(GObject * object)368 gst_audio_aggregator_convert_pad_finalize (GObject * object)
369 {
370 GstAudioAggregatorConvertPad *pad = (GstAudioAggregatorConvertPad *) object;
371
372 if (pad->priv->converter)
373 gst_audio_converter_free (pad->priv->converter);
374
375 if (pad->priv->converter_config)
376 gst_structure_free (pad->priv->converter_config);
377
378 G_OBJECT_CLASS (gst_audio_aggregator_convert_pad_parent_class)->finalize
379 (object);
380 }
381
382 static void
gst_audio_aggregator_convert_pad_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)383 gst_audio_aggregator_convert_pad_get_property (GObject * object, guint prop_id,
384 GValue * value, GParamSpec * pspec)
385 {
386 GstAudioAggregatorConvertPad *pad = GST_AUDIO_AGGREGATOR_CONVERT_PAD (object);
387
388 switch (prop_id) {
389 case PROP_CONVERT_PAD_CONVERTER_CONFIG:
390 GST_OBJECT_LOCK (pad);
391 if (pad->priv->converter_config)
392 g_value_set_boxed (value, pad->priv->converter_config);
393 GST_OBJECT_UNLOCK (pad);
394 break;
395 default:
396 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
397 break;
398 }
399 }
400
401 static void
gst_audio_aggregator_convert_pad_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)402 gst_audio_aggregator_convert_pad_set_property (GObject * object, guint prop_id,
403 const GValue * value, GParamSpec * pspec)
404 {
405 GstAudioAggregatorConvertPad *pad = GST_AUDIO_AGGREGATOR_CONVERT_PAD (object);
406
407 switch (prop_id) {
408 case PROP_CONVERT_PAD_CONVERTER_CONFIG:
409 GST_OBJECT_LOCK (pad);
410 if (pad->priv->converter_config)
411 gst_structure_free (pad->priv->converter_config);
412 pad->priv->converter_config = g_value_dup_boxed (value);
413 pad->priv->converter_config_changed = TRUE;
414 GST_OBJECT_UNLOCK (pad);
415 break;
416 default:
417 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
418 break;
419 }
420 }
421
422 static void
gst_audio_aggregator_convert_pad_class_init(GstAudioAggregatorConvertPadClass * klass)423 gst_audio_aggregator_convert_pad_class_init (GstAudioAggregatorConvertPadClass *
424 klass)
425 {
426 GObjectClass *gobject_class = (GObjectClass *) klass;
427 GstAudioAggregatorPadClass *aaggpad_class =
428 (GstAudioAggregatorPadClass *) klass;
429
430 gobject_class->set_property = gst_audio_aggregator_convert_pad_set_property;
431 gobject_class->get_property = gst_audio_aggregator_convert_pad_get_property;
432
433 g_object_class_install_property (gobject_class,
434 PROP_CONVERT_PAD_CONVERTER_CONFIG,
435 g_param_spec_boxed ("converter-config", "Converter configuration",
436 "A GstStructure describing the configuration that should be used "
437 "when converting this pad's audio buffers",
438 GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
439
440 aaggpad_class->convert_buffer =
441 gst_audio_aggregator_convert_pad_convert_buffer;
442
443 aaggpad_class->update_conversion_info =
444 gst_audio_aggregator_pad_update_conversion_info;
445
446 gobject_class->finalize = gst_audio_aggregator_convert_pad_finalize;
447 }
448
449 static void
gst_audio_aggregator_convert_pad_init(GstAudioAggregatorConvertPad * pad)450 gst_audio_aggregator_convert_pad_init (GstAudioAggregatorConvertPad * pad)
451 {
452 pad->priv = gst_audio_aggregator_convert_pad_get_instance_private (pad);
453 }
454
455 /**************************************
456 * GstAudioAggregator implementation *
457 **************************************/
458
/* Private per-instance state of a GstAudioAggregator. */
struct _GstAudioAggregatorPrivate
{
  /* Taken through GST_AUDIO_AGGREGATOR_LOCK / GST_AUDIO_AGGREGATOR_UNLOCK */
  GMutex mutex;

  /* All three properties are unprotected, can't be modified while streaming */
  GstClockTime alignment_threshold;
  GstClockTime discont_wait;

  /* Output buffer duration as the fraction n/d of a second */
  gint output_buffer_duration_n;
  gint output_buffer_duration_d;

  /* Size in frames that is output per buffer; 0 forces a recalculation */
  guint samples_per_buffer;
  /* Remainder of rate * duration_n / duration_d */
  guint error_per_buffer;
  /* NOTE(review): presumably the running sum of error_per_buffer used to
   * occasionally lengthen a buffer — confirm against the aggregate code */
  guint accumulated_error;
  /* NOTE(review): presumably the frame count of current_buffer — confirm */
  guint current_blocksize;

  /* Protected by srcpad stream clock */
  /* NOTE(review): "stream clock" above presumably means the srcpad stream
   * lock — confirm */
  /* Output buffer starting at offset containing blocksize frames (calculated
   * from output_buffer_duration) */
  GstBuffer *current_buffer;

  /* counters to keep track of timestamps */
  /* Readable with object lock, writable with both aagg lock and object lock */

  /* Sample offset starting from 0 at aggregator.segment.start */
  gint64 offset;

  /* info structure passed to the samples-selected signal, must only be
   * accessed from the aggregate thread */
  GstStructure *selected_samples_info;

  /* Only access from src thread */
  /* Messages to post after releasing locks */
  GQueue messages;
};
495
/* Lock / unlock the aggregator's private mutex.
 * NOTE(review): both expansions end in a ';', so "MACRO (x);" yields an empty
 * extra statement and the macros are unsafe as the sole body of an
 * un-braced if/else — consider dropping the trailing ';' once all call sites
 * are confirmed to supply their own. */
#define GST_AUDIO_AGGREGATOR_LOCK(self)   g_mutex_lock (&(self)->priv->mutex);
#define GST_AUDIO_AGGREGATOR_UNLOCK(self) g_mutex_unlock (&(self)->priv->mutex);
498
/* Forward declarations of the GObject and GstAggregator handlers that
 * gst_audio_aggregator_class_init() installs, plus internal helpers. */

/* GObject handlers */
static void gst_audio_aggregator_set_property (GObject * object, guint prop_id,
    const GValue * value, GParamSpec * pspec);
static void gst_audio_aggregator_get_property (GObject * object, guint prop_id,
    GValue * value, GParamSpec * pspec);
static void gst_audio_aggregator_dispose (GObject * object);

/* Event / query / state handlers */
static gboolean gst_audio_aggregator_src_event (GstAggregator * agg,
    GstEvent * event);
static gboolean gst_audio_aggregator_sink_event (GstAggregator * agg,
    GstAggregatorPad * aggpad, GstEvent * event);
static gboolean gst_audio_aggregator_src_query (GstAggregator * agg,
    GstQuery * query);
static gboolean
gst_audio_aggregator_sink_query (GstAggregator * agg, GstAggregatorPad * aggpad,
    GstQuery * query);
static gboolean gst_audio_aggregator_start (GstAggregator * agg);
static gboolean gst_audio_aggregator_stop (GstAggregator * agg);
static GstFlowReturn gst_audio_aggregator_flush (GstAggregator * agg);

/* Data flow */
static GstBuffer *gst_audio_aggregator_create_output_buffer (GstAudioAggregator
    * aagg, guint num_frames);
static GstBuffer *gst_audio_aggregator_do_clip (GstAggregator * agg,
    GstAggregatorPad * bpad, GstBuffer * buffer);
static GstFlowReturn gst_audio_aggregator_aggregate (GstAggregator * agg,
    gboolean timeout);
static gboolean sync_pad_values (GstElement * aagg, GstPad * pad, gpointer ud);

/* Caps negotiation */
static gboolean gst_audio_aggregator_negotiated_src_caps (GstAggregator * agg,
    GstCaps * caps);
static GstFlowReturn
gst_audio_aggregator_update_src_caps (GstAggregator * agg,
    GstCaps * caps, GstCaps ** ret);
static GstCaps *gst_audio_aggregator_fixate_src_caps (GstAggregator * agg,
    GstCaps * caps);
static GstSample *gst_audio_aggregator_peek_next_sample (GstAggregator * agg,
    GstAggregatorPad * aggpad);
534
/* Property defaults. The output buffer duration appears twice: once in
 * nanoseconds and once as the fraction N/D of a second (1/100 s = 10 ms). */
#define DEFAULT_OUTPUT_BUFFER_DURATION (10 * GST_MSECOND)
#define DEFAULT_ALIGNMENT_THRESHOLD   (40 * GST_MSECOND)
#define DEFAULT_DISCONT_WAIT (1 * GST_SECOND)
#define DEFAULT_OUTPUT_BUFFER_DURATION_N (1)
#define DEFAULT_OUTPUT_BUFFER_DURATION_D (100)

/* Property identifiers of GstAudioAggregator. */
enum
{
  PROP_0,
  PROP_OUTPUT_BUFFER_DURATION,
  PROP_ALIGNMENT_THRESHOLD,
  PROP_DISCONT_WAIT,
  PROP_OUTPUT_BUFFER_DURATION_FRACTION,
  PROP_IGNORE_INACTIVE_PADS,
};

/* Registers GstAudioAggregator as an abstract GstAggregator subtype that
 * carries a GstAudioAggregatorPrivate as instance-private data. */
G_DEFINE_ABSTRACT_TYPE_WITH_PRIVATE (GstAudioAggregator, gst_audio_aggregator,
    GST_TYPE_AGGREGATOR);
553
554 static GstBuffer *
gst_audio_aggregator_convert_buffer(GstAudioAggregator * aagg,GstPad * pad,GstAudioInfo * in_info,GstAudioInfo * out_info,GstBuffer * buffer)555 gst_audio_aggregator_convert_buffer (GstAudioAggregator * aagg, GstPad * pad,
556 GstAudioInfo * in_info, GstAudioInfo * out_info, GstBuffer * buffer)
557 {
558 GstAudioAggregatorPadClass *klass = GST_AUDIO_AGGREGATOR_PAD_GET_CLASS (pad);
559 GstAudioAggregatorPad *aaggpad = GST_AUDIO_AGGREGATOR_PAD (pad);
560
561 g_assert (klass->convert_buffer);
562
563 return klass->convert_buffer (aaggpad, in_info, out_info, buffer);
564 }
565
566 static void
gst_audio_aggregator_translate_output_buffer_duration(GstAudioAggregator * aagg,GstClockTime duration)567 gst_audio_aggregator_translate_output_buffer_duration (GstAudioAggregator *
568 aagg, GstClockTime duration)
569 {
570 gint gcd;
571
572 aagg->priv->output_buffer_duration_n = duration;
573 aagg->priv->output_buffer_duration_d = GST_SECOND;
574
575 gcd = gst_util_greatest_common_divisor (aagg->priv->output_buffer_duration_n,
576 aagg->priv->output_buffer_duration_d);
577
578 if (gcd) {
579 aagg->priv->output_buffer_duration_n /= gcd;
580 aagg->priv->output_buffer_duration_d /= gcd;
581 }
582 }
583
584 static gboolean
gst_audio_aggregator_update_samples_per_buffer(GstAudioAggregator * aagg)585 gst_audio_aggregator_update_samples_per_buffer (GstAudioAggregator * aagg)
586 {
587 gboolean ret = TRUE;
588 GstAudioAggregatorPad *srcpad =
589 GST_AUDIO_AGGREGATOR_PAD (GST_AGGREGATOR_SRC_PAD (aagg));
590
591 if (!srcpad->info.finfo
592 || GST_AUDIO_INFO_FORMAT (&srcpad->info) == GST_AUDIO_FORMAT_UNKNOWN) {
593 ret = FALSE;
594 goto out;
595 }
596
597 aagg->priv->samples_per_buffer =
598 (((guint64) GST_AUDIO_INFO_RATE (&srcpad->info)) *
599 aagg->priv->output_buffer_duration_n) /
600 aagg->priv->output_buffer_duration_d;
601
602 if (aagg->priv->samples_per_buffer == 0) {
603 ret = FALSE;
604 goto out;
605 }
606
607 aagg->priv->error_per_buffer =
608 (((guint64) GST_AUDIO_INFO_RATE (&srcpad->info)) *
609 aagg->priv->output_buffer_duration_n) %
610 aagg->priv->output_buffer_duration_d;
611 aagg->priv->accumulated_error = 0;
612
613 GST_DEBUG_OBJECT (aagg, "Buffer duration: %u/%u",
614 aagg->priv->output_buffer_duration_n,
615 aagg->priv->output_buffer_duration_d);
616 GST_DEBUG_OBJECT (aagg, "Samples per buffer: %u (error: %u/%u)",
617 aagg->priv->samples_per_buffer, aagg->priv->error_per_buffer,
618 aagg->priv->output_buffer_duration_d);
619
620 out:
621 return ret;
622 }
623
624 static void
gst_audio_aggregator_recalculate_latency(GstAudioAggregator * aagg)625 gst_audio_aggregator_recalculate_latency (GstAudioAggregator * aagg)
626 {
627 guint64 latency = gst_util_uint64_scale_int (GST_SECOND,
628 aagg->priv->output_buffer_duration_n,
629 aagg->priv->output_buffer_duration_d);
630
631 gst_aggregator_set_latency (GST_AGGREGATOR (aagg), latency, latency);
632
633 GST_OBJECT_LOCK (aagg);
634 /* Force recalculating in aggregate */
635 aagg->priv->samples_per_buffer = 0;
636 GST_OBJECT_UNLOCK (aagg);
637 }
638
639 static void
gst_audio_aggregator_class_init(GstAudioAggregatorClass * klass)640 gst_audio_aggregator_class_init (GstAudioAggregatorClass * klass)
641 {
642 GObjectClass *gobject_class = (GObjectClass *) klass;
643 GstAggregatorClass *gstaggregator_class = (GstAggregatorClass *) klass;
644
645 gobject_class->set_property = gst_audio_aggregator_set_property;
646 gobject_class->get_property = gst_audio_aggregator_get_property;
647 gobject_class->dispose = gst_audio_aggregator_dispose;
648
649 gstaggregator_class->src_event =
650 GST_DEBUG_FUNCPTR (gst_audio_aggregator_src_event);
651 gstaggregator_class->sink_event =
652 GST_DEBUG_FUNCPTR (gst_audio_aggregator_sink_event);
653 gstaggregator_class->src_query =
654 GST_DEBUG_FUNCPTR (gst_audio_aggregator_src_query);
655 gstaggregator_class->sink_query = gst_audio_aggregator_sink_query;
656 gstaggregator_class->start = gst_audio_aggregator_start;
657 gstaggregator_class->stop = gst_audio_aggregator_stop;
658 gstaggregator_class->flush = gst_audio_aggregator_flush;
659 gstaggregator_class->aggregate =
660 GST_DEBUG_FUNCPTR (gst_audio_aggregator_aggregate);
661 gstaggregator_class->clip = GST_DEBUG_FUNCPTR (gst_audio_aggregator_do_clip);
662 gstaggregator_class->get_next_time = gst_aggregator_simple_get_next_time;
663 gstaggregator_class->update_src_caps =
664 GST_DEBUG_FUNCPTR (gst_audio_aggregator_update_src_caps);
665 gstaggregator_class->fixate_src_caps = gst_audio_aggregator_fixate_src_caps;
666 gstaggregator_class->negotiated_src_caps =
667 gst_audio_aggregator_negotiated_src_caps;
668 gstaggregator_class->peek_next_sample = gst_audio_aggregator_peek_next_sample;
669
670 klass->create_output_buffer = gst_audio_aggregator_create_output_buffer;
671
672 GST_DEBUG_CATEGORY_INIT (audio_aggregator_debug, "audioaggregator",
673 GST_DEBUG_FG_MAGENTA, "GstAudioAggregator");
674
675 g_object_class_install_property (gobject_class, PROP_OUTPUT_BUFFER_DURATION,
676 g_param_spec_uint64 ("output-buffer-duration", "Output Buffer Duration",
677 "Output block size in nanoseconds", 1,
678 G_MAXUINT64, DEFAULT_OUTPUT_BUFFER_DURATION,
679 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
680
681 /**
682 * GstAudioAggregator:output-buffer-duration-fraction:
683 *
684 * Output block size in nanoseconds, expressed as a fraction.
685 *
686 * Since: 1.18
687 */
688 g_object_class_install_property (gobject_class,
689 PROP_OUTPUT_BUFFER_DURATION_FRACTION,
690 gst_param_spec_fraction ("output-buffer-duration-fraction",
691 "Output buffer duration fraction",
692 "Output block size in nanoseconds, expressed as a fraction", 1,
693 G_MAXINT, G_MAXINT, 1, DEFAULT_OUTPUT_BUFFER_DURATION_N,
694 DEFAULT_OUTPUT_BUFFER_DURATION_D,
695 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS |
696 GST_PARAM_MUTABLE_READY));
697
698 g_object_class_install_property (gobject_class, PROP_ALIGNMENT_THRESHOLD,
699 g_param_spec_uint64 ("alignment-threshold", "Alignment Threshold",
700 "Timestamp alignment threshold in nanoseconds", 0,
701 G_MAXUINT64 - 1, DEFAULT_ALIGNMENT_THRESHOLD,
702 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
703
704 g_object_class_install_property (gobject_class, PROP_DISCONT_WAIT,
705 g_param_spec_uint64 ("discont-wait", "Discont Wait",
706 "Window of time in nanoseconds to wait before "
707 "creating a discontinuity", 0,
708 G_MAXUINT64 - 1, DEFAULT_DISCONT_WAIT,
709 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS |
710 GST_PARAM_MUTABLE_PLAYING));
711
712 /**
713 * GstAudioAggregator:ignore-inactive-pads:
714 *
715 * Don't wait for inactive pads when live. An inactive pad
716 * is a pad that hasn't yet received a buffer, but that has
717 * been waited on at least once.
718 *
719 * The purpose of this property is to avoid aggregating on
720 * timeout when new pads are requested in advance of receiving
721 * data flow, for example the user may decide to connect it later,
722 * but wants to configure it already.
723 *
724 * Since: 1.20
725 */
726 g_object_class_install_property (gobject_class,
727 PROP_IGNORE_INACTIVE_PADS, g_param_spec_boolean ("ignore-inactive-pads",
728 "Ignore inactive pads",
729 "Avoid timing out waiting for inactive pads", FALSE,
730 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
731 }
732
733 static void
gst_audio_aggregator_init(GstAudioAggregator * aagg)734 gst_audio_aggregator_init (GstAudioAggregator * aagg)
735 {
736 aagg->priv = gst_audio_aggregator_get_instance_private (aagg);
737
738 g_mutex_init (&aagg->priv->mutex);
739
740 aagg->priv->alignment_threshold = DEFAULT_ALIGNMENT_THRESHOLD;
741 aagg->priv->discont_wait = DEFAULT_DISCONT_WAIT;
742
743 gst_audio_aggregator_translate_output_buffer_duration (aagg,
744 DEFAULT_OUTPUT_BUFFER_DURATION);
745 gst_audio_aggregator_recalculate_latency (aagg);
746
747 aagg->current_caps = NULL;
748
749 aagg->priv->selected_samples_info =
750 gst_structure_new_empty ("GstAudioAggregatorSelectedSamplesInfo");
751
752 g_queue_init (&aagg->priv->messages);
753 }
754
755 static void
gst_audio_aggregator_dispose(GObject * object)756 gst_audio_aggregator_dispose (GObject * object)
757 {
758 GstAudioAggregator *aagg = GST_AUDIO_AGGREGATOR (object);
759
760 gst_caps_replace (&aagg->current_caps, NULL);
761
762 gst_clear_structure (&aagg->priv->selected_samples_info);
763
764 g_mutex_clear (&aagg->priv->mutex);
765
766 G_OBJECT_CLASS (gst_audio_aggregator_parent_class)->dispose (object);
767 }
768
769 static void
gst_audio_aggregator_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)770 gst_audio_aggregator_set_property (GObject * object, guint prop_id,
771 const GValue * value, GParamSpec * pspec)
772 {
773 GstAudioAggregator *aagg = GST_AUDIO_AGGREGATOR (object);
774
775 switch (prop_id) {
776 case PROP_OUTPUT_BUFFER_DURATION:
777 gst_audio_aggregator_translate_output_buffer_duration (aagg,
778 g_value_get_uint64 (value));
779 g_object_notify (object, "output-buffer-duration-fraction");
780 gst_audio_aggregator_recalculate_latency (aagg);
781 break;
782 case PROP_ALIGNMENT_THRESHOLD:
783 aagg->priv->alignment_threshold = g_value_get_uint64 (value);
784 break;
785 case PROP_DISCONT_WAIT:
786 aagg->priv->discont_wait = g_value_get_uint64 (value);
787 break;
788 case PROP_OUTPUT_BUFFER_DURATION_FRACTION:
789 aagg->priv->output_buffer_duration_n =
790 gst_value_get_fraction_numerator (value);
791 aagg->priv->output_buffer_duration_d =
792 gst_value_get_fraction_denominator (value);
793 g_object_notify (object, "output-buffer-duration");
794 gst_audio_aggregator_recalculate_latency (aagg);
795 break;
796 case PROP_IGNORE_INACTIVE_PADS:
797 gst_aggregator_set_ignore_inactive_pads (GST_AGGREGATOR (object),
798 g_value_get_boolean (value));
799 break;
800 default:
801 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
802 break;
803 }
804 }
805
806 static void
gst_audio_aggregator_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)807 gst_audio_aggregator_get_property (GObject * object, guint prop_id,
808 GValue * value, GParamSpec * pspec)
809 {
810 GstAudioAggregator *aagg = GST_AUDIO_AGGREGATOR (object);
811
812 switch (prop_id) {
813 case PROP_OUTPUT_BUFFER_DURATION:
814 g_value_set_uint64 (value, gst_util_uint64_scale_int (GST_SECOND,
815 aagg->priv->output_buffer_duration_n,
816 aagg->priv->output_buffer_duration_d));
817 break;
818 case PROP_ALIGNMENT_THRESHOLD:
819 g_value_set_uint64 (value, aagg->priv->alignment_threshold);
820 break;
821 case PROP_DISCONT_WAIT:
822 g_value_set_uint64 (value, aagg->priv->discont_wait);
823 break;
824 case PROP_OUTPUT_BUFFER_DURATION_FRACTION:
825 gst_value_set_fraction (value, aagg->priv->output_buffer_duration_n,
826 aagg->priv->output_buffer_duration_d);
827 break;
828 case PROP_IGNORE_INACTIVE_PADS:
829 g_value_set_boolean (value,
830 gst_aggregator_get_ignore_inactive_pads (GST_AGGREGATOR (object)));
831 break;
832 default:
833 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
834 break;
835 }
836 }
837
838 /* Caps negotiation */
839
840 /* Unref after usage */
841 static GstAudioAggregatorPad *
gst_audio_aggregator_get_first_configured_pad(GstAggregator * agg)842 gst_audio_aggregator_get_first_configured_pad (GstAggregator * agg)
843 {
844 GstAudioAggregatorPad *res = NULL;
845 GList *l;
846
847 GST_OBJECT_LOCK (agg);
848 for (l = GST_ELEMENT (agg)->sinkpads; l; l = l->next) {
849 GstAudioAggregatorPad *aaggpad = l->data;
850
851 if (GST_AUDIO_INFO_FORMAT (&aaggpad->info) != GST_AUDIO_FORMAT_UNKNOWN) {
852 res = gst_object_ref (aaggpad);
853 break;
854 }
855 }
856 GST_OBJECT_UNLOCK (agg);
857
858 return res;
859 }
860
861 static GstCaps *
gst_audio_aggregator_sink_getcaps(GstPad * pad,GstAggregator * agg,GstCaps * filter)862 gst_audio_aggregator_sink_getcaps (GstPad * pad, GstAggregator * agg,
863 GstCaps * filter)
864 {
865 GstAudioAggregatorPad *first_configured_pad =
866 gst_audio_aggregator_get_first_configured_pad (agg);
867 GstCaps *sink_template_caps = gst_pad_get_pad_template_caps (pad);
868 GstCaps *downstream_caps = gst_pad_get_allowed_caps (agg->srcpad);
869 GstCaps *sink_caps;
870
871 GST_INFO_OBJECT (pad, "Getting caps with filter %" GST_PTR_FORMAT, filter);
872 GST_DEBUG_OBJECT (pad, "sink template caps : %" GST_PTR_FORMAT,
873 sink_template_caps);
874 GST_DEBUG_OBJECT (pad, "downstream caps %" GST_PTR_FORMAT, downstream_caps);
875
876 /* If we already have a configured pad, assume that we can only configure
877 * to the very same format filtered with the template caps and continue
878 * with the result of that as the template caps */
879
880 if (first_configured_pad) {
881 GstCaps *first_configured_caps =
882 gst_audio_info_to_caps (&first_configured_pad->info);
883 GstCaps *tmp;
884
885 tmp =
886 gst_caps_intersect_full (sink_template_caps, first_configured_caps,
887 GST_CAPS_INTERSECT_FIRST);
888 gst_caps_unref (first_configured_caps);
889 gst_caps_unref (sink_template_caps);
890 sink_template_caps = tmp;
891
892 gst_object_unref (first_configured_pad);
893 }
894
895 /* If we have downstream caps, filter them against our template caps or
896 * the filtered first configured pad caps from above */
897 if (downstream_caps) {
898 sink_caps =
899 gst_caps_intersect_full (sink_template_caps, downstream_caps,
900 GST_CAPS_INTERSECT_FIRST);
901 } else {
902 sink_caps = gst_caps_ref (sink_template_caps);
903 }
904
905 if (filter) {
906 GstCaps *tmp = gst_caps_intersect_full (sink_caps, filter,
907 GST_CAPS_INTERSECT_FIRST);
908
909 gst_caps_unref (sink_caps);
910 sink_caps = tmp;
911 }
912
913 gst_caps_unref (sink_template_caps);
914
915 if (downstream_caps)
916 gst_caps_unref (downstream_caps);
917
918 GST_INFO_OBJECT (pad, "returned sink caps : %" GST_PTR_FORMAT, sink_caps);
919
920 return sink_caps;
921 }
922
923 static GstCaps *
gst_audio_aggregator_convert_sink_getcaps(GstPad * pad,GstAggregator * agg,GstCaps * filter)924 gst_audio_aggregator_convert_sink_getcaps (GstPad * pad, GstAggregator * agg,
925 GstCaps * filter)
926 {
927 GstAudioAggregatorPad *first_configured_pad =
928 gst_audio_aggregator_get_first_configured_pad (agg);
929 GstCaps *sink_template_caps = gst_pad_get_pad_template_caps (pad);
930 GstCaps *downstream_caps = gst_pad_get_allowed_caps (agg->srcpad);
931 GstCaps *sink_caps;
932
933 GST_INFO_OBJECT (pad, "Getting caps with filter %" GST_PTR_FORMAT, filter);
934 GST_DEBUG_OBJECT (pad, "sink template caps : %" GST_PTR_FORMAT,
935 sink_template_caps);
936 GST_DEBUG_OBJECT (pad, "downstream caps %" GST_PTR_FORMAT, downstream_caps);
937
938 /* We can convert between all formats except for the sample rate, which has
939 * to match. */
940
941 /* If we have a first configured pad, we can only convert everything except
942 * for the sample rate, so modify our template caps to have exactly that
943 * sample rate in all structures */
944 if (first_configured_pad) {
945 GST_INFO_OBJECT (pad, "first configured pad has sample rate %d",
946 first_configured_pad->info.rate);
947 sink_template_caps = gst_caps_make_writable (sink_template_caps);
948 gst_caps_set_simple (sink_template_caps, "rate", G_TYPE_INT,
949 first_configured_pad->info.rate, NULL);
950 gst_object_unref (first_configured_pad);
951 }
952
953 /* Now if we have downstream caps, filter against the template caps from
954 * above, i.e. with potentially fixated sample rate field already. This
955 * filters out any structures with unsupported rates.
956 *
957 * Afterwards we create new caps that only take over the rate fields of the
958 * remaining downstream caps, and filter that against the plain template
959 * caps to get the resulting allowed caps with conversion for everything but
960 * the rate */
961 if (downstream_caps) {
962 GstCaps *tmp;
963 guint i, n;
964
965 tmp =
966 gst_caps_intersect_full (sink_template_caps, downstream_caps,
967 GST_CAPS_INTERSECT_FIRST);
968
969 n = gst_caps_get_size (tmp);
970 sink_caps = gst_caps_new_empty ();
971 for (i = 0; i < n; i++) {
972 GstStructure *s = gst_caps_get_structure (tmp, i);
973 GstStructure *new_s =
974 gst_structure_new_empty (gst_structure_get_name (s));
975 gst_structure_set_value (new_s, "rate", gst_structure_get_value (s,
976 "rate"));
977 sink_caps = gst_caps_merge_structure (sink_caps, new_s);
978 }
979 gst_caps_unref (tmp);
980 tmp = sink_caps;
981
982 sink_caps =
983 gst_caps_intersect_full (sink_template_caps, tmp,
984 GST_CAPS_INTERSECT_FIRST);
985 gst_caps_unref (tmp);
986 } else {
987 sink_caps = gst_caps_ref (sink_template_caps);
988 }
989
990 /* And finally filter anything that remains against the filter caps */
991 if (filter) {
992 GstCaps *tmp =
993 gst_caps_intersect_full (filter, sink_caps, GST_CAPS_INTERSECT_FIRST);
994 gst_caps_unref (sink_caps);
995 sink_caps = tmp;
996 }
997
998 GST_INFO_OBJECT (pad, "returned sink caps : %" GST_PTR_FORMAT, sink_caps);
999
1000 gst_caps_unref (sink_template_caps);
1001
1002 if (downstream_caps)
1003 gst_caps_unref (downstream_caps);
1004
1005 return sink_caps;
1006 }
1007
1008 static gboolean
gst_audio_aggregator_sink_setcaps(GstAudioAggregatorPad * aaggpad,GstAggregator * agg,GstCaps * caps)1009 gst_audio_aggregator_sink_setcaps (GstAudioAggregatorPad * aaggpad,
1010 GstAggregator * agg, GstCaps * caps)
1011 {
1012 GstAudioAggregatorPad *first_configured_pad =
1013 gst_audio_aggregator_get_first_configured_pad (agg);
1014 GstAudioInfo info;
1015 gboolean ret = TRUE;
1016 gboolean downstream_supports_rate = TRUE;
1017
1018 if (!gst_audio_info_from_caps (&info, caps)) {
1019 GST_WARNING_OBJECT (aaggpad, "Rejecting invalid caps: %" GST_PTR_FORMAT,
1020 caps);
1021 return FALSE;
1022 }
1023
1024 /* TODO: handle different rates on sinkpads, a bit complex
1025 * because offsets will have to be updated, and audio resampling
1026 * has a latency to take into account
1027 */
1028
1029 /* Only check against the downstream caps if we didn't configure any caps
1030 * so far. Otherwise we already know that downstream supports the rate
1031 * because we negotiated with downstream */
1032 if (!first_configured_pad) {
1033 GstCaps *downstream_caps = gst_pad_get_allowed_caps (agg->srcpad);
1034
1035 /* Returns NULL if there is no downstream peer */
1036 if (downstream_caps) {
1037 GstCaps *rate_caps =
1038 gst_caps_new_simple ("audio/x-raw", "rate", G_TYPE_INT, info.rate,
1039 NULL);
1040
1041 gst_caps_set_features_simple (rate_caps,
1042 gst_caps_features_copy (GST_CAPS_FEATURES_ANY));
1043
1044 downstream_supports_rate =
1045 gst_caps_can_intersect (rate_caps, downstream_caps);
1046 gst_caps_unref (rate_caps);
1047 gst_caps_unref (downstream_caps);
1048 }
1049 }
1050
1051 if (!downstream_supports_rate || (first_configured_pad
1052 && info.rate != first_configured_pad->info.rate)) {
1053 GST_WARNING_OBJECT (aaggpad,
1054 "Sample rate %d can't be configured (downstream supported: %d, configured rate: %d)",
1055 info.rate, downstream_supports_rate,
1056 first_configured_pad ? first_configured_pad->info.rate : 0);
1057 gst_pad_push_event (GST_PAD (aaggpad), gst_event_new_reconfigure ());
1058 ret = FALSE;
1059 } else {
1060 GstAudioAggregatorPadClass *klass =
1061 GST_AUDIO_AGGREGATOR_PAD_GET_CLASS (aaggpad);
1062 GST_OBJECT_LOCK (aaggpad);
1063 aaggpad->info = info;
1064 if (klass->update_conversion_info)
1065 klass->update_conversion_info (aaggpad);
1066 GST_OBJECT_UNLOCK (aaggpad);
1067 }
1068
1069 if (first_configured_pad)
1070 gst_object_unref (first_configured_pad);
1071
1072 return ret;
1073 }
1074
1075 static GstFlowReturn
gst_audio_aggregator_update_src_caps(GstAggregator * agg,GstCaps * caps,GstCaps ** ret)1076 gst_audio_aggregator_update_src_caps (GstAggregator * agg,
1077 GstCaps * caps, GstCaps ** ret)
1078 {
1079 GstCaps *src_template_caps = gst_pad_get_pad_template_caps (agg->srcpad);
1080 GstCaps *downstream_caps =
1081 gst_pad_peer_query_caps (agg->srcpad, src_template_caps);
1082
1083 gst_caps_unref (src_template_caps);
1084
1085 *ret = gst_caps_intersect (caps, downstream_caps);
1086
1087 GST_INFO ("Updated src caps to %" GST_PTR_FORMAT, *ret);
1088
1089 if (downstream_caps)
1090 gst_caps_unref (downstream_caps);
1091
1092 return GST_FLOW_OK;
1093 }
1094
1095 /* At that point if the caps are not fixed, this means downstream
1096 * didn't have fully specified requirements, we'll just go ahead
1097 * and fixate raw audio fields using our first configured pad, we don't for
1098 * now need a more complicated heuristic
1099 */
1100 static GstCaps *
gst_audio_aggregator_fixate_src_caps(GstAggregator * agg,GstCaps * caps)1101 gst_audio_aggregator_fixate_src_caps (GstAggregator * agg, GstCaps * caps)
1102 {
1103 GstAudioAggregatorPad *first_configured_pad = NULL;
1104
1105 if (GST_AUDIO_AGGREGATOR_PAD_GET_CLASS (agg->srcpad)->convert_buffer)
1106 first_configured_pad = gst_audio_aggregator_get_first_configured_pad (agg);
1107
1108 caps = gst_caps_make_writable (caps);
1109
1110 if (first_configured_pad) {
1111 GstStructure *s, *s2;
1112 GstCaps *first_configured_caps =
1113 gst_audio_info_to_caps (&first_configured_pad->info);
1114 gint first_configured_rate, first_configured_channels;
1115 gint channels;
1116
1117 s = gst_caps_get_structure (caps, 0);
1118 s2 = gst_caps_get_structure (first_configured_caps, 0);
1119
1120 gst_structure_get_int (s2, "rate", &first_configured_rate);
1121 gst_structure_get_int (s2, "channels", &first_configured_channels);
1122
1123 gst_structure_fixate_field_string (s, "format",
1124 gst_structure_get_string (s2, "format"));
1125 gst_structure_fixate_field_string (s, "layout",
1126 gst_structure_get_string (s2, "layout"));
1127 gst_structure_fixate_field_nearest_int (s, "rate", first_configured_rate);
1128 gst_structure_fixate_field_nearest_int (s, "channels",
1129 first_configured_channels);
1130
1131 gst_structure_get_int (s, "channels", &channels);
1132
1133 if (!gst_structure_has_field (s, "channel-mask") && channels > 2) {
1134 guint64 mask;
1135
1136 if (!gst_structure_get (s2, "channel-mask", GST_TYPE_BITMASK, &mask,
1137 NULL)) {
1138 mask = gst_audio_channel_get_fallback_mask (channels);
1139 }
1140 gst_structure_set (s, "channel-mask", GST_TYPE_BITMASK, mask, NULL);
1141 }
1142
1143 gst_caps_unref (first_configured_caps);
1144 gst_object_unref (first_configured_pad);
1145 } else {
1146 GstStructure *s;
1147 gint channels;
1148
1149 s = gst_caps_get_structure (caps, 0);
1150
1151 gst_structure_fixate_field_nearest_int (s, "rate", GST_AUDIO_DEF_RATE);
1152 gst_structure_fixate_field_string (s, "format", GST_AUDIO_NE ("S16"));
1153 gst_structure_fixate_field_string (s, "layout", "interleaved");
1154 gst_structure_fixate_field_nearest_int (s, "channels", 2);
1155
1156 if (gst_structure_get_int (s, "channels", &channels) && channels > 2) {
1157 if (!gst_structure_has_field_typed (s, "channel-mask", GST_TYPE_BITMASK))
1158 gst_structure_set (s, "channel-mask", GST_TYPE_BITMASK, 0ULL, NULL);
1159 }
1160 }
1161
1162 if (!gst_caps_is_fixed (caps))
1163 caps = gst_caps_fixate (caps);
1164
1165 GST_INFO_OBJECT (agg, "Fixated src caps to %" GST_PTR_FORMAT, caps);
1166
1167 return caps;
1168 }
1169
1170 /* Must be called with OBJECT_LOCK taken */
1171 static gboolean
gst_audio_aggregator_update_converters(GstAudioAggregator * aagg,GstAudioInfo * new_info,GstAudioInfo * old_info)1172 gst_audio_aggregator_update_converters (GstAudioAggregator * aagg,
1173 GstAudioInfo * new_info, GstAudioInfo * old_info)
1174 {
1175 GList *l;
1176
1177 for (l = GST_ELEMENT (aagg)->sinkpads; l; l = l->next) {
1178 GstAudioAggregatorPad *aaggpad = l->data;
1179 GstAudioAggregatorPadClass *klass =
1180 GST_AUDIO_AGGREGATOR_PAD_GET_CLASS (aaggpad);
1181
1182 if (klass->update_conversion_info)
1183 klass->update_conversion_info (aaggpad);
1184
1185 /* If we currently were mixing a buffer, we need to convert it to the new
1186 * format */
1187 if (aaggpad->priv->buffer) {
1188 GstBuffer *new_converted_buffer =
1189 gst_audio_aggregator_convert_buffer (aagg, GST_PAD (aaggpad),
1190 old_info, new_info, aaggpad->priv->buffer);
1191 gst_buffer_replace (&aaggpad->priv->buffer, new_converted_buffer);
1192 if (new_converted_buffer)
1193 gst_buffer_unref (new_converted_buffer);
1194 }
1195 }
1196
1197 return TRUE;
1198 }
1199
1200 /* We now have our final output caps, we can create the required converters */
1201 static gboolean
gst_audio_aggregator_negotiated_src_caps(GstAggregator * agg,GstCaps * caps)1202 gst_audio_aggregator_negotiated_src_caps (GstAggregator * agg, GstCaps * caps)
1203 {
1204 GstAudioAggregator *aagg = GST_AUDIO_AGGREGATOR (agg);
1205 GstAudioInfo info;
1206 GstAudioAggregatorPad *srcpad = GST_AUDIO_AGGREGATOR_PAD (agg->srcpad);
1207
1208 GST_INFO_OBJECT (agg, "src caps negotiated %" GST_PTR_FORMAT, caps);
1209
1210 if (!gst_audio_info_from_caps (&info, caps)) {
1211 GST_WARNING_OBJECT (aagg, "Rejecting invalid caps: %" GST_PTR_FORMAT, caps);
1212 return FALSE;
1213 }
1214
1215 GST_AUDIO_AGGREGATOR_LOCK (aagg);
1216 GST_OBJECT_LOCK (aagg);
1217
1218 if (!gst_audio_info_is_equal (&info, &srcpad->info)) {
1219 GstAudioInfo old_info = srcpad->info;
1220 GstAudioAggregatorPadClass *srcpad_klass =
1221 GST_AUDIO_AGGREGATOR_PAD_GET_CLASS (agg->srcpad);
1222
1223 GST_INFO_OBJECT (aagg, "setting caps to %" GST_PTR_FORMAT, caps);
1224 gst_caps_replace (&aagg->current_caps, caps);
1225
1226 if (old_info.rate != info.rate)
1227 aagg->priv->offset = -1;
1228
1229 memcpy (&srcpad->info, &info, sizeof (info));
1230
1231 if (!gst_audio_aggregator_update_converters (aagg, &info, &old_info)) {
1232 GST_OBJECT_UNLOCK (aagg);
1233 GST_AUDIO_AGGREGATOR_UNLOCK (aagg);
1234 return FALSE;
1235 }
1236
1237 if (srcpad_klass->update_conversion_info)
1238 srcpad_klass->update_conversion_info (GST_AUDIO_AGGREGATOR_PAD (agg->
1239 srcpad));
1240
1241 if (aagg->priv->current_buffer) {
1242 GstBuffer *converted;
1243
1244 converted =
1245 gst_audio_aggregator_convert_buffer (aagg, agg->srcpad, &old_info,
1246 &info, aagg->priv->current_buffer);
1247 gst_buffer_unref (aagg->priv->current_buffer);
1248 aagg->priv->current_buffer = converted;
1249 if (!converted) {
1250 GST_OBJECT_UNLOCK (aagg);
1251 GST_AUDIO_AGGREGATOR_UNLOCK (aagg);
1252 return FALSE;
1253 }
1254 }
1255
1256 /* Force recalculating in aggregate */
1257 aagg->priv->samples_per_buffer = 0;
1258 }
1259
1260 GST_OBJECT_UNLOCK (aagg);
1261 GST_AUDIO_AGGREGATOR_UNLOCK (aagg);
1262
1263 return
1264 GST_AGGREGATOR_CLASS
1265 (gst_audio_aggregator_parent_class)->negotiated_src_caps (agg, caps);
1266 }
1267
1268 /* event handling */
1269
1270 static gboolean
gst_audio_aggregator_src_event(GstAggregator * agg,GstEvent * event)1271 gst_audio_aggregator_src_event (GstAggregator * agg, GstEvent * event)
1272 {
1273 gboolean result;
1274
1275 GstAudioAggregator *aagg = GST_AUDIO_AGGREGATOR (agg);
1276 GST_DEBUG_OBJECT (agg->srcpad, "Got %s event on src pad",
1277 GST_EVENT_TYPE_NAME (event));
1278
1279 switch (GST_EVENT_TYPE (event)) {
1280 case GST_EVENT_QOS:
1281 /* QoS might be tricky */
1282 gst_event_unref (event);
1283 return FALSE;
1284 case GST_EVENT_NAVIGATION:
1285 /* navigation is rather pointless. */
1286 gst_event_unref (event);
1287 return FALSE;
1288 break;
1289 case GST_EVENT_SEEK:
1290 {
1291 GstSeekFlags flags;
1292 gdouble rate;
1293 GstSeekType start_type, stop_type;
1294 gint64 start, stop;
1295 GstFormat seek_format, dest_format;
1296
1297 /* parse the seek parameters */
1298 gst_event_parse_seek (event, &rate, &seek_format, &flags, &start_type,
1299 &start, &stop_type, &stop);
1300
1301 /* Check the seeking parameters before linking up */
1302 if ((start_type != GST_SEEK_TYPE_NONE)
1303 && (start_type != GST_SEEK_TYPE_SET)) {
1304 result = FALSE;
1305 GST_DEBUG_OBJECT (aagg,
1306 "seeking failed, unhandled seek type for start: %d", start_type);
1307 goto done;
1308 }
1309 if ((stop_type != GST_SEEK_TYPE_NONE) && (stop_type != GST_SEEK_TYPE_SET)) {
1310 result = FALSE;
1311 GST_DEBUG_OBJECT (aagg,
1312 "seeking failed, unhandled seek type for end: %d", stop_type);
1313 goto done;
1314 }
1315
1316 GST_OBJECT_LOCK (agg);
1317 dest_format = GST_AGGREGATOR_PAD (agg->srcpad)->segment.format;
1318 GST_OBJECT_UNLOCK (agg);
1319 if (seek_format != dest_format) {
1320 result = FALSE;
1321 GST_DEBUG_OBJECT (aagg,
1322 "seeking failed, unhandled seek format: %s",
1323 gst_format_get_name (seek_format));
1324 goto done;
1325 }
1326 }
1327 break;
1328 default:
1329 break;
1330 }
1331
1332 return
1333 GST_AGGREGATOR_CLASS (gst_audio_aggregator_parent_class)->src_event (agg,
1334 event);
1335
1336 done:
1337 return result;
1338 }
1339
1340
1341 static gboolean
gst_audio_aggregator_sink_event(GstAggregator * agg,GstAggregatorPad * aggpad,GstEvent * event)1342 gst_audio_aggregator_sink_event (GstAggregator * agg,
1343 GstAggregatorPad * aggpad, GstEvent * event)
1344 {
1345 GstAudioAggregatorPad *aaggpad = GST_AUDIO_AGGREGATOR_PAD (aggpad);
1346 gboolean res = TRUE;
1347
1348 GST_DEBUG_OBJECT (aggpad, "Got %s event on sink pad",
1349 GST_EVENT_TYPE_NAME (event));
1350
1351 switch (GST_EVENT_TYPE (event)) {
1352 case GST_EVENT_SEGMENT:
1353 {
1354 const GstSegment *segment;
1355 gst_event_parse_segment (event, &segment);
1356
1357 if (segment->format != GST_FORMAT_TIME) {
1358 GST_ERROR_OBJECT (aggpad, "Segment of type %s are not supported,"
1359 " only TIME segments are supported",
1360 gst_format_get_name (segment->format));
1361 gst_event_unref (event);
1362 event = NULL;
1363 res = FALSE;
1364 break;
1365 }
1366
1367 GST_OBJECT_LOCK (agg);
1368 if (segment->rate != GST_AGGREGATOR_PAD (agg->srcpad)->segment.rate) {
1369 GST_ERROR_OBJECT (aggpad,
1370 "Got segment event with wrong rate %lf, expected %lf",
1371 segment->rate, GST_AGGREGATOR_PAD (agg->srcpad)->segment.rate);
1372 res = FALSE;
1373 gst_event_unref (event);
1374 event = NULL;
1375 } else if (segment->rate < 0.0) {
1376 GST_ERROR_OBJECT (aggpad, "Negative rates not supported yet");
1377 res = FALSE;
1378 gst_event_unref (event);
1379 event = NULL;
1380 } else {
1381 GstAudioAggregatorPad *pad = GST_AUDIO_AGGREGATOR_PAD (aggpad);
1382
1383 GST_OBJECT_LOCK (pad);
1384 pad->priv->new_segment = TRUE;
1385 gst_audio_aggregator_pad_reset_qos (pad);
1386 GST_OBJECT_UNLOCK (pad);
1387 }
1388 GST_OBJECT_UNLOCK (agg);
1389
1390 break;
1391 }
1392 case GST_EVENT_CAPS:
1393 {
1394 GstCaps *caps;
1395
1396 gst_event_parse_caps (event, &caps);
1397 GST_INFO_OBJECT (aggpad, "Got caps %" GST_PTR_FORMAT, caps);
1398 res = gst_audio_aggregator_sink_setcaps (aaggpad, agg, caps);
1399 gst_event_unref (event);
1400 event = NULL;
1401 break;
1402 }
1403 default:
1404 break;
1405 }
1406
1407 if (!res) {
1408 if (event)
1409 gst_event_unref (event);
1410 return res;
1411 }
1412
1413 if (event != NULL)
1414 return
1415 GST_AGGREGATOR_CLASS (gst_audio_aggregator_parent_class)->sink_event
1416 (agg, aggpad, event);
1417
1418 return res;
1419 }
1420
1421 static gboolean
gst_audio_aggregator_sink_query(GstAggregator * agg,GstAggregatorPad * aggpad,GstQuery * query)1422 gst_audio_aggregator_sink_query (GstAggregator * agg, GstAggregatorPad * aggpad,
1423 GstQuery * query)
1424 {
1425 gboolean res = FALSE;
1426
1427 switch (GST_QUERY_TYPE (query)) {
1428 case GST_QUERY_CAPS:
1429 {
1430 GstCaps *filter, *caps;
1431
1432 gst_query_parse_caps (query, &filter);
1433 if (GST_IS_AUDIO_AGGREGATOR_CONVERT_PAD (aggpad)) {
1434 caps =
1435 gst_audio_aggregator_convert_sink_getcaps (GST_PAD (aggpad), agg,
1436 filter);
1437 } else {
1438 caps =
1439 gst_audio_aggregator_sink_getcaps (GST_PAD (aggpad), agg, filter);
1440 }
1441 gst_query_set_caps_result (query, caps);
1442 gst_caps_unref (caps);
1443 res = TRUE;
1444 break;
1445 }
1446 default:
1447 res =
1448 GST_AGGREGATOR_CLASS (gst_audio_aggregator_parent_class)->sink_query
1449 (agg, aggpad, query);
1450 break;
1451 }
1452
1453 return res;
1454 }
1455
1456
1457 /* FIXME, the duration query should reflect how long you will produce
1458 * data, that is the amount of stream time until you will emit EOS.
1459 *
1460 * For synchronized mixing this is always the max of all the durations
1461 * of upstream since we emit EOS when all of them finished.
1462 *
1463 * We don't do synchronized mixing so this really depends on where the
1464 * streams where punched in and what their relative offsets are against
1465 * each other which we can get from the first timestamps we see.
1466 *
1467 * When we add a new stream (or remove a stream) the duration might
1468 * also become invalid again and we need to post a new DURATION
1469 * message to notify this fact to the parent.
1470 * For now we take the max of all the upstream elements so the simple
1471 * cases work at least somewhat.
1472 */
1473 static gboolean
gst_audio_aggregator_query_duration(GstAudioAggregator * aagg,GstQuery * query)1474 gst_audio_aggregator_query_duration (GstAudioAggregator * aagg,
1475 GstQuery * query)
1476 {
1477 gint64 max;
1478 gboolean res;
1479 GstFormat format;
1480 GstIterator *it;
1481 gboolean done;
1482 GValue item = { 0, };
1483
1484 /* parse format */
1485 gst_query_parse_duration (query, &format, NULL);
1486
1487 max = -1;
1488 res = TRUE;
1489 done = FALSE;
1490
1491 it = gst_element_iterate_sink_pads (GST_ELEMENT_CAST (aagg));
1492 while (!done) {
1493 GstIteratorResult ires;
1494
1495 ires = gst_iterator_next (it, &item);
1496 switch (ires) {
1497 case GST_ITERATOR_DONE:
1498 done = TRUE;
1499 break;
1500 case GST_ITERATOR_OK:
1501 {
1502 GstPad *pad = g_value_get_object (&item);
1503 gint64 duration;
1504
1505 /* ask sink peer for duration */
1506 res &= gst_pad_peer_query_duration (pad, format, &duration);
1507 /* take max from all valid return values */
1508 if (res) {
1509 /* valid unknown length, stop searching */
1510 if (duration == -1) {
1511 max = duration;
1512 done = TRUE;
1513 }
1514 /* else see if bigger than current max */
1515 else if (duration > max)
1516 max = duration;
1517 }
1518 g_value_reset (&item);
1519 break;
1520 }
1521 case GST_ITERATOR_RESYNC:
1522 max = -1;
1523 res = TRUE;
1524 gst_iterator_resync (it);
1525 break;
1526 default:
1527 res = FALSE;
1528 done = TRUE;
1529 break;
1530 }
1531 }
1532 g_value_unset (&item);
1533 gst_iterator_free (it);
1534
1535 if (res) {
1536 /* and store the max */
1537 GST_DEBUG_OBJECT (aagg, "Total duration in format %s: %"
1538 GST_TIME_FORMAT, gst_format_get_name (format), GST_TIME_ARGS (max));
1539 gst_query_set_duration (query, format, max);
1540 }
1541
1542 return res;
1543 }
1544
1545
1546 static gboolean
gst_audio_aggregator_src_query(GstAggregator * agg,GstQuery * query)1547 gst_audio_aggregator_src_query (GstAggregator * agg, GstQuery * query)
1548 {
1549 GstAudioAggregator *aagg = GST_AUDIO_AGGREGATOR (agg);
1550 GstAudioAggregatorPad *srcpad = GST_AUDIO_AGGREGATOR_PAD (agg->srcpad);
1551 gboolean res = FALSE;
1552
1553 switch (GST_QUERY_TYPE (query)) {
1554 case GST_QUERY_DURATION:
1555 res = gst_audio_aggregator_query_duration (aagg, query);
1556 break;
1557 case GST_QUERY_POSITION:
1558 {
1559 GstFormat format;
1560
1561 gst_query_parse_position (query, &format, NULL);
1562
1563 GST_OBJECT_LOCK (aagg);
1564
1565 switch (format) {
1566 case GST_FORMAT_TIME:
1567 gst_query_set_position (query, format,
1568 gst_segment_to_stream_time (&GST_AGGREGATOR_PAD (agg->srcpad)->
1569 segment, GST_FORMAT_TIME,
1570 GST_AGGREGATOR_PAD (agg->srcpad)->segment.position));
1571 res = TRUE;
1572 break;
1573 case GST_FORMAT_BYTES:
1574 if (GST_AUDIO_INFO_BPF (&srcpad->info)) {
1575 gst_query_set_position (query, format, aagg->priv->offset *
1576 GST_AUDIO_INFO_BPF (&srcpad->info));
1577 res = TRUE;
1578 }
1579 break;
1580 case GST_FORMAT_DEFAULT:
1581 gst_query_set_position (query, format, aagg->priv->offset);
1582 res = TRUE;
1583 break;
1584 default:
1585 break;
1586 }
1587
1588 GST_OBJECT_UNLOCK (aagg);
1589
1590 break;
1591 }
1592 default:
1593 res =
1594 GST_AGGREGATOR_CLASS (gst_audio_aggregator_parent_class)->src_query
1595 (agg, query);
1596 break;
1597 }
1598
1599 return res;
1600 }
1601
1602
1603 void
gst_audio_aggregator_set_sink_caps(GstAudioAggregator * aagg,GstAudioAggregatorPad * pad,GstCaps * caps)1604 gst_audio_aggregator_set_sink_caps (GstAudioAggregator * aagg,
1605 GstAudioAggregatorPad * pad, GstCaps * caps)
1606 {
1607 #ifndef G_DISABLE_ASSERT
1608 gboolean valid;
1609
1610 GST_OBJECT_LOCK (pad);
1611 valid = gst_audio_info_from_caps (&pad->info, caps);
1612 g_assert (valid);
1613 GST_OBJECT_UNLOCK (pad);
1614 #else
1615 GST_OBJECT_LOCK (pad);
1616 (void) gst_audio_info_from_caps (&pad->info, caps);
1617 GST_OBJECT_UNLOCK (pad);
1618 #endif
1619 }
1620
1621 /* Must hold object lock and aagg lock to call */
1622
1623 static void
gst_audio_aggregator_reset(GstAudioAggregator * aagg)1624 gst_audio_aggregator_reset (GstAudioAggregator * aagg)
1625 {
1626 GstAggregator *agg = GST_AGGREGATOR (aagg);
1627
1628 GST_AUDIO_AGGREGATOR_LOCK (aagg);
1629 GST_OBJECT_LOCK (aagg);
1630 GST_AGGREGATOR_PAD (agg->srcpad)->segment.position = -1;
1631 aagg->priv->offset = -1;
1632 gst_audio_info_init (&GST_AUDIO_AGGREGATOR_PAD (agg->srcpad)->info);
1633 gst_caps_replace (&aagg->current_caps, NULL);
1634 gst_buffer_replace (&aagg->priv->current_buffer, NULL);
1635 aagg->priv->accumulated_error = 0;
1636 GST_OBJECT_UNLOCK (aagg);
1637 GST_AUDIO_AGGREGATOR_UNLOCK (aagg);
1638 }
1639
1640 static gboolean
gst_audio_aggregator_start(GstAggregator * agg)1641 gst_audio_aggregator_start (GstAggregator * agg)
1642 {
1643 GstAudioAggregator *aagg = GST_AUDIO_AGGREGATOR (agg);
1644
1645 gst_audio_aggregator_reset (aagg);
1646
1647 return TRUE;
1648 }
1649
1650 static gboolean
gst_audio_aggregator_stop(GstAggregator * agg)1651 gst_audio_aggregator_stop (GstAggregator * agg)
1652 {
1653 GstAudioAggregator *aagg = GST_AUDIO_AGGREGATOR (agg);
1654
1655 gst_audio_aggregator_reset (aagg);
1656
1657 return TRUE;
1658 }
1659
1660 static GstFlowReturn
gst_audio_aggregator_flush(GstAggregator * agg)1661 gst_audio_aggregator_flush (GstAggregator * agg)
1662 {
1663 GstAudioAggregator *aagg = GST_AUDIO_AGGREGATOR (agg);
1664
1665 GST_AUDIO_AGGREGATOR_LOCK (aagg);
1666 GST_OBJECT_LOCK (aagg);
1667 GST_AGGREGATOR_PAD (agg->srcpad)->segment.position = -1;
1668 aagg->priv->offset = -1;
1669 aagg->priv->accumulated_error = 0;
1670 gst_buffer_replace (&aagg->priv->current_buffer, NULL);
1671 GST_OBJECT_UNLOCK (aagg);
1672 GST_AUDIO_AGGREGATOR_UNLOCK (aagg);
1673
1674 return GST_FLOW_OK;
1675 }
1676
1677 static GstBuffer *
gst_audio_aggregator_do_clip(GstAggregator * agg,GstAggregatorPad * bpad,GstBuffer * buffer)1678 gst_audio_aggregator_do_clip (GstAggregator * agg,
1679 GstAggregatorPad * bpad, GstBuffer * buffer)
1680 {
1681 GstAudioAggregatorPad *pad = GST_AUDIO_AGGREGATOR_PAD (bpad);
1682 gint rate, bpf;
1683
1684 /* Guard against invalid audio info, we just don't clip here then */
1685 if (!GST_AUDIO_INFO_IS_VALID (&pad->info))
1686 return buffer;
1687
1688 GST_OBJECT_LOCK (bpad);
1689 rate = GST_AUDIO_INFO_RATE (&pad->info);
1690 bpf = GST_AUDIO_INFO_BPF (&pad->info);
1691 buffer = gst_audio_buffer_clip (buffer, &bpad->segment, rate, bpf);
1692 GST_OBJECT_UNLOCK (bpad);
1693
1694 return buffer;
1695 }
1696
1697
1698 /* Called with the object lock for both the element and pad held,
1699 * as well as the audio aggregator lock.
1700 * Should only be called on the output queue.
1701 */
1702 static GstClockTime
gst_audio_aggregator_pad_enqueue_qos_message(GstAudioAggregatorPad * pad,GstAudioAggregator * aagg,guint64 samples)1703 gst_audio_aggregator_pad_enqueue_qos_message (GstAudioAggregatorPad * pad,
1704 GstAudioAggregator * aagg, guint64 samples)
1705 {
1706 GstAggregator *agg = GST_AGGREGATOR (aagg);
1707 GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
1708 GstAudioAggregatorPad *srcpad = GST_AUDIO_AGGREGATOR_PAD (agg->srcpad);
1709
1710 guint rate_output = GST_AUDIO_INFO_RATE (&srcpad->info);
1711 GstClockTime offset = gst_util_uint64_scale (GST_SECOND, pad->priv->position,
1712 rate_output);
1713 GstClockTime timestamp = GST_BUFFER_PTS (pad->priv->buffer) + offset;
1714 GstClockTime running_time =
1715 gst_segment_to_running_time (&aggpad->segment, GST_FORMAT_TIME,
1716 timestamp);
1717 GstClockTime stream_time = gst_segment_to_stream_time (&aggpad->segment,
1718 GST_FORMAT_TIME, timestamp);
1719 GstClockTime duration;
1720 guint rate_input;
1721 guint64 processed, dropped;
1722 GstMessage *msg;
1723
1724 if (!pad->priv->qos_messages)
1725 return running_time;
1726
1727 if (GST_AUDIO_AGGREGATOR_PAD_GET_CLASS (pad)->convert_buffer)
1728 rate_input = GST_AUDIO_INFO_RATE (&srcpad->info);
1729 else
1730 rate_input = GST_AUDIO_INFO_RATE (&pad->info);
1731
1732 duration = gst_util_uint64_scale (samples, GST_SECOND, rate_input);
1733
1734 processed = gst_util_uint64_scale (pad->priv->processed, rate_input,
1735 rate_output);
1736 dropped = gst_util_uint64_scale (pad->priv->dropped, rate_output,
1737 rate_output);
1738
1739 msg = gst_message_new_qos (GST_OBJECT (aggpad), TRUE, running_time,
1740 stream_time, timestamp, duration);
1741 gst_message_set_qos_stats (msg, GST_FORMAT_DEFAULT, processed, dropped);
1742
1743 g_queue_push_tail (&aagg->priv->messages, msg);
1744
1745 return running_time;
1746 }
1747
1748 static void
gst_audio_aggregator_post_messages(GstAudioAggregator * aagg)1749 gst_audio_aggregator_post_messages (GstAudioAggregator * aagg)
1750 {
1751 if (g_queue_get_length (&aagg->priv->messages) != 0) {
1752 GstClockTime latency = gst_aggregator_get_latency (GST_AGGREGATOR (aagg));
1753 gboolean is_live = GST_CLOCK_TIME_IS_VALID (latency);
1754 GstElement *e = GST_ELEMENT (aagg);
1755 GstMessage *msg;
1756
1757 while ((msg = g_queue_pop_head (&aagg->priv->messages))) {
1758 if (is_live) {
1759 GstStructure *s = gst_message_writable_structure (msg);
1760 gst_structure_set (s, "live", G_TYPE_BOOLEAN, TRUE, NULL);
1761 }
1762
1763 gst_element_post_message (e, msg);
1764 }
1765 }
1766 }
1767
/* Called with the object lock for both the element and pad held,
 * as well as the aagg lock
 *
 * Replace the current buffer with input and update GstAudioAggregatorPadPrivate
 * values.
 *
 * Operates on pad->priv->buffer: resets the pad's read position, computes the
 * buffer size in frames, checks for a discontinuity against the expected
 * next_offset and, when a (re)sync is needed, maps the buffer's running time
 * into the output segment to derive pad->priv->output_offset, skipping any
 * leading frames that lie before the current output offset.
 *
 * Returns: TRUE if the buffer was queued for mixing, FALSE if it has to be
 * dropped (0-sized buffer without duration/GAP flag, buffer outside the
 * output segment, or buffer entirely before the current output position).
 */
static gboolean
gst_audio_aggregator_fill_buffer (GstAudioAggregator * aagg,
    GstAudioAggregatorPad * pad)
{
  GstClockTime start_time, end_time;
  gboolean discont = FALSE;
  guint64 start_offset, end_offset;
  gint rate, bpf;

  GstAggregator *agg = GST_AGGREGATOR (aagg);
  GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
  GstAudioAggregatorPad *srcpad = GST_AUDIO_AGGREGATOR_PAD (agg->srcpad);

  /* Use the source pad's audio info when the pad class provides a
   * convert_buffer implementation, the pad's own info otherwise */
  if (GST_AUDIO_AGGREGATOR_PAD_GET_CLASS (pad)->convert_buffer) {
    rate = GST_AUDIO_INFO_RATE (&srcpad->info);
    bpf = GST_AUDIO_INFO_BPF (&srcpad->info);
  } else {
    rate = GST_AUDIO_INFO_RATE (&pad->info);
    bpf = GST_AUDIO_INFO_BPF (&pad->info);
  }

  /* Start reading at the first frame; size is counted in frames */
  pad->priv->position = 0;
  pad->priv->size = gst_buffer_get_size (pad->priv->buffer) / bpf;

  if (pad->priv->size == 0) {
    /* A buffer without payload is only accepted when it is flagged as GAP
     * and carries a duration from which a frame count can be derived */
    if (!GST_BUFFER_DURATION_IS_VALID (pad->priv->buffer) ||
        !GST_BUFFER_FLAG_IS_SET (pad->priv->buffer, GST_BUFFER_FLAG_GAP)) {
      GST_WARNING_OBJECT (pad, "Dropping 0-sized buffer missing either a"
          " duration or a GAP flag: %" GST_PTR_FORMAT, pad->priv->buffer);
      return FALSE;
    }

    pad->priv->size =
        gst_util_uint64_scale (GST_BUFFER_DURATION (pad->priv->buffer), rate,
        GST_SECOND);
  }

  /* Without a PTS no discont detection or resync is possible: place the
   * buffer at the current output offset if the pad has none yet and just
   * accumulate the expected next offset */
  if (!GST_BUFFER_PTS_IS_VALID (pad->priv->buffer)) {
    if (pad->priv->output_offset == -1)
      pad->priv->output_offset = aagg->priv->offset;
    if (pad->priv->next_offset == -1)
      pad->priv->next_offset = pad->priv->size;
    else
      pad->priv->next_offset += pad->priv->size;
    goto done;
  }

  start_time = GST_BUFFER_PTS (pad->priv->buffer);
  end_time =
      start_time + gst_util_uint64_scale_ceil (pad->priv->size, GST_SECOND,
      rate);

  /* Clipping should've ensured this */
  g_assert (start_time >= aggpad->segment.start);

  /* Offsets in frames, relative to the start of the pad's segment */
  start_offset =
      gst_util_uint64_scale (start_time - aggpad->segment.start, rate,
      GST_SECOND);
  end_offset = start_offset + pad->priv->size;

  /* An explicit DISCONT/RESYNC flag, a new segment or an unset (-1)
   * next_offset always forces a resync */
  if (GST_BUFFER_IS_DISCONT (pad->priv->buffer)
      || GST_BUFFER_FLAG_IS_SET (pad->priv->buffer, GST_BUFFER_FLAG_RESYNC)
      || pad->priv->new_segment || pad->priv->next_offset == -1) {
    discont = TRUE;
    pad->priv->new_segment = FALSE;
  } else {
    guint64 diff, max_sample_diff;

    /* Check discont, based on audiobasesink */
    /* diff is the absolute distance between where this buffer starts and
     * where the previous one ended */
    if (start_offset <= pad->priv->next_offset)
      diff = pad->priv->next_offset - start_offset;
    else
      diff = start_offset - pad->priv->next_offset;

    /* alignment_threshold converted from time to a number of frames */
    max_sample_diff =
        gst_util_uint64_scale_int (aagg->priv->alignment_threshold, rate,
        GST_SECOND);

    /* Discont! */
    if (G_UNLIKELY (diff >= max_sample_diff)) {
      if (aagg->priv->discont_wait > 0) {
        /* Only resync once the misalignment has lasted for at least
         * discont_wait; the first misaligned buffer merely records when
         * it started */
        if (pad->priv->discont_time == GST_CLOCK_TIME_NONE) {
          pad->priv->discont_time = start_time;
        } else if (start_time - pad->priv->discont_time >=
            aagg->priv->discont_wait) {
          discont = TRUE;
          pad->priv->discont_time = GST_CLOCK_TIME_NONE;
        }
      } else {
        discont = TRUE;
      }
    } else if (G_UNLIKELY (pad->priv->discont_time != GST_CLOCK_TIME_NONE)) {
      /* we have had a discont, but are now back on track! */
      pad->priv->discont_time = GST_CLOCK_TIME_NONE;
    }
  }

  if (discont) {
    /* Have discont, need resync */
    if (pad->priv->next_offset != -1)
      GST_DEBUG_OBJECT (pad, "Have discont. Expected %"
          G_GUINT64_FORMAT ", got %" G_GUINT64_FORMAT,
          pad->priv->next_offset, start_offset);
    /* Marked unset here, set again from end_offset at the end of the
     * resync block below */
    pad->priv->next_offset = -1;
  } else {
    pad->priv->next_offset += pad->priv->size;
  }

  /* (Re)compute where this buffer lands in the output, in frames */
  if (pad->priv->output_offset == -1 || discont) {
    GstClockTime start_running_time;
    GstClockTime end_running_time;
    GstClockTime segment_pos;
    /* -1 means "could not be mapped into the output segment" */
    guint64 start_output_offset = -1;
    guint64 end_output_offset = -1;
    GstSegment *agg_segment = &GST_AGGREGATOR_PAD (agg->srcpad)->segment;

    start_running_time =
        gst_segment_to_running_time (&aggpad->segment,
        GST_FORMAT_TIME, start_time);
    end_running_time =
        gst_segment_to_running_time (&aggpad->segment,
        GST_FORMAT_TIME, end_time);

    /* Convert to position in the output segment */
    segment_pos =
        gst_segment_position_from_running_time (agg_segment, GST_FORMAT_TIME,
        start_running_time);
    if (GST_CLOCK_TIME_IS_VALID (segment_pos))
      start_output_offset =
          gst_util_uint64_scale (segment_pos - agg_segment->start, rate,
          GST_SECOND);

    segment_pos =
        gst_segment_position_from_running_time (agg_segment, GST_FORMAT_TIME,
        end_running_time);
    if (GST_CLOCK_TIME_IS_VALID (segment_pos))
      end_output_offset =
          gst_util_uint64_scale (segment_pos - agg_segment->start, rate,
          GST_SECOND);

    if (start_output_offset == -1 && end_output_offset == -1) {
      /* Outside output segment, drop */
      pad->priv->position = 0;
      pad->priv->size = 0;
      GST_DEBUG_OBJECT (pad, "Buffer outside output segment");
      return FALSE;
    }

    /* Calculate end_output_offset if it was outside the output segment */
    if (end_output_offset == -1)
      end_output_offset = start_output_offset + pad->priv->size;

    /* The whole buffer ends before the current output offset: account for
     * it as dropped, enqueue a QoS message and drop it */
    if (end_output_offset < aagg->priv->offset) {
      GstClockTime rt;

      pad->priv->dropped += pad->priv->size;
      rt = gst_audio_aggregator_pad_enqueue_qos_message (pad, aagg,
          pad->priv->size);
      GST_DEBUG_OBJECT (pad, "Dropped buffer of %u samples at running time %"
          GST_TIME_FORMAT " because input buffer is entirely before current"
          " output offset", pad->priv->size, GST_TIME_ARGS (rt));

      pad->priv->position = 0;
      pad->priv->size = 0;
      GST_DEBUG_OBJECT (pad,
          "Buffer before segment or current position: %" G_GUINT64_FORMAT " < %"
          G_GINT64_FORMAT, end_output_offset, aagg->priv->offset);
      return FALSE;
    }

    /* The buffer starts before the output segment, before the current
     * output offset or before what this pad already delivered: compute how
     * many leading frames have to be skipped */
    if (start_output_offset == -1 ||
        start_output_offset < aagg->priv->offset ||
        (pad->priv->output_offset != -1 &&
            start_output_offset < pad->priv->output_offset)) {
      /* NOTE(review): diff is a guint while the operands below are 64-bit,
       * so the differences are narrowed on assignment */
      guint diff;

      if (start_output_offset == -1 && end_output_offset < pad->priv->size) {
        diff = pad->priv->size - end_output_offset + aagg->priv->offset;
      } else if (start_output_offset == -1) {
        /* Derive the start from the known end of the buffer */
        start_output_offset = end_output_offset - pad->priv->size;

        if (start_output_offset < aagg->priv->offset)
          diff = aagg->priv->offset - start_output_offset;
        else
          diff = 0;
      } else if (pad->priv->output_offset != -1 &&
          start_output_offset < pad->priv->output_offset) {
        diff = pad->priv->output_offset - start_output_offset;
      } else {
        diff = aagg->priv->offset - start_output_offset;
      }

      pad->priv->dropped += MIN (diff, pad->priv->size);
      if (diff != 0) {
        GstClockTime rt;

        rt = gst_audio_aggregator_pad_enqueue_qos_message (pad, aagg, diff);
        GST_DEBUG_OBJECT (pad, "Dropped %u samples at running time %"
            GST_TIME_FORMAT " because input buffer starts before current"
            " output offset", diff, GST_TIME_ARGS (rt));
      }

      /* Skip the late frames inside the buffer and move the output
       * position forward by the same amount */
      pad->priv->position += diff;
      if (start_output_offset != -1)
        start_output_offset += diff;
      if (pad->priv->position >= pad->priv->size) {
        /* Empty buffer, drop */
        pad->priv->dropped += pad->priv->size;
        pad->priv->position = 0;
        pad->priv->size = 0;
        GST_DEBUG_OBJECT (pad,
            "Buffer before segment or current position: %" G_GUINT64_FORMAT
            " < %" G_GINT64_FORMAT, end_output_offset, aagg->priv->offset);
        return FALSE;
      }
    }

    /* Fall back to the current output offset when the start could not be
     * mapped into the output segment */
    if (start_output_offset == -1)
      pad->priv->output_offset = aagg->priv->offset;
    else
      pad->priv->output_offset = start_output_offset;

    if (pad->priv->next_offset == -1)
      pad->priv->next_offset = end_offset;

    GST_DEBUG_OBJECT (pad,
        "Buffer resynced: Pad offset %" G_GUINT64_FORMAT
        ", current audio aggregator offset %" G_GINT64_FORMAT,
        pad->priv->output_offset, aagg->priv->offset);
  }

done:

  GST_LOG_OBJECT (pad,
      "Queued new buffer at offset %" G_GUINT64_FORMAT,
      pad->priv->output_offset);

  return TRUE;
}
2013
2014 /* Called with pad object lock held */
2015
2016 static gboolean
gst_audio_aggregator_mix_buffer(GstAudioAggregator * aagg,GstAudioAggregatorPad * pad,GstBuffer * inbuf,GstBuffer * outbuf,guint blocksize)2017 gst_audio_aggregator_mix_buffer (GstAudioAggregator * aagg,
2018 GstAudioAggregatorPad * pad, GstBuffer * inbuf, GstBuffer * outbuf,
2019 guint blocksize)
2020 {
2021 guint overlap;
2022 guint out_start;
2023 gboolean filled;
2024 guint in_offset;
2025 gboolean pad_changed = FALSE;
2026
2027 /* Overlap => mix */
2028 if (aagg->priv->offset < pad->priv->output_offset)
2029 out_start = pad->priv->output_offset - aagg->priv->offset;
2030 else
2031 out_start = 0;
2032
2033 overlap = pad->priv->size - pad->priv->position;
2034 if (overlap > blocksize - out_start)
2035 overlap = blocksize - out_start;
2036
2037 if (GST_BUFFER_FLAG_IS_SET (inbuf, GST_BUFFER_FLAG_GAP)) {
2038 /* skip gap buffer */
2039 GST_LOG_OBJECT (pad, "skipping GAP buffer");
2040 pad->priv->output_offset += pad->priv->size - pad->priv->position;
2041 pad->priv->position = pad->priv->size;
2042
2043 gst_buffer_replace (&pad->priv->buffer, NULL);
2044 return FALSE;
2045 }
2046
2047 gst_buffer_ref (inbuf);
2048 in_offset = pad->priv->position;
2049 GST_OBJECT_UNLOCK (pad);
2050 GST_OBJECT_UNLOCK (aagg);
2051
2052 filled = GST_AUDIO_AGGREGATOR_GET_CLASS (aagg)->aggregate_one_buffer (aagg,
2053 pad, inbuf, in_offset, outbuf, out_start, overlap);
2054
2055 GST_OBJECT_LOCK (aagg);
2056 GST_OBJECT_LOCK (pad);
2057
2058 pad_changed = (inbuf != pad->priv->buffer);
2059 gst_buffer_unref (inbuf);
2060
2061 if (filled)
2062 GST_BUFFER_FLAG_UNSET (outbuf, GST_BUFFER_FLAG_GAP);
2063
2064 if (pad_changed)
2065 return FALSE;
2066
2067 pad->priv->processed += overlap;
2068 pad->priv->position += overlap;
2069 pad->priv->output_offset += overlap;
2070
2071 if (pad->priv->position == pad->priv->size) {
2072 /* Buffer done, drop it */
2073 gst_buffer_replace (&pad->priv->buffer, NULL);
2074 GST_LOG_OBJECT (pad, "Finished mixing buffer, waiting for next");
2075 return FALSE;
2076 }
2077
2078 return TRUE;
2079 }
2080
2081 static GstBuffer *
gst_audio_aggregator_create_output_buffer(GstAudioAggregator * aagg,guint num_frames)2082 gst_audio_aggregator_create_output_buffer (GstAudioAggregator * aagg,
2083 guint num_frames)
2084 {
2085 GstAllocator *allocator;
2086 GstAllocationParams params;
2087 GstBuffer *outbuf;
2088 GstMapInfo outmap;
2089 GstAggregator *agg = GST_AGGREGATOR (aagg);
2090 GstAudioAggregatorPad *srcpad = GST_AUDIO_AGGREGATOR_PAD (agg->srcpad);
2091
2092 gst_aggregator_get_allocator (GST_AGGREGATOR (aagg), &allocator, ¶ms);
2093
2094 GST_DEBUG ("Creating output buffer with size %d",
2095 num_frames * GST_AUDIO_INFO_BPF (&srcpad->info));
2096
2097 outbuf = gst_buffer_new_allocate (allocator, num_frames *
2098 GST_AUDIO_INFO_BPF (&srcpad->info), ¶ms);
2099
2100 if (allocator)
2101 gst_object_unref (allocator);
2102
2103 gst_buffer_map (outbuf, &outmap, GST_MAP_WRITE);
2104 gst_audio_format_info_fill_silence (srcpad->info.finfo, outmap.data,
2105 outmap.size);
2106 gst_buffer_unmap (outbuf, &outmap);
2107
2108 return outbuf;
2109 }
2110
2111 static gboolean
sync_pad_values(GstElement * aagg,GstPad * pad,gpointer user_data)2112 sync_pad_values (GstElement * aagg, GstPad * pad, gpointer user_data)
2113 {
2114 GstAudioAggregatorPad *aapad = GST_AUDIO_AGGREGATOR_PAD (pad);
2115 GstAggregatorPad *bpad = GST_AGGREGATOR_PAD_CAST (pad);
2116 GstClockTime timestamp, stream_time;
2117
2118 if (aapad->priv->buffer == NULL)
2119 return TRUE;
2120
2121 timestamp = GST_BUFFER_PTS (aapad->priv->buffer);
2122 GST_OBJECT_LOCK (bpad);
2123 stream_time = gst_segment_to_stream_time (&bpad->segment, GST_FORMAT_TIME,
2124 timestamp);
2125 GST_OBJECT_UNLOCK (bpad);
2126
2127 /* sync object properties on stream time */
2128 /* TODO: Ideally we would want to do that on every sample */
2129 if (GST_CLOCK_TIME_IS_VALID (stream_time))
2130 gst_object_sync_values (GST_OBJECT_CAST (pad), stream_time);
2131
2132 return TRUE;
2133 }
2134
2135 static GstSample *
gst_audio_aggregator_peek_next_sample(GstAggregator * agg,GstAggregatorPad * aggpad)2136 gst_audio_aggregator_peek_next_sample (GstAggregator * agg,
2137 GstAggregatorPad * aggpad)
2138 {
2139 GstAudioAggregator *aagg = GST_AUDIO_AGGREGATOR (agg);
2140 GstAudioAggregatorPad *pad = GST_AUDIO_AGGREGATOR_PAD (aggpad);
2141 GstSample *sample = NULL;
2142
2143 if (pad->priv->buffer && pad->priv->output_offset >= aagg->priv->offset
2144 && pad->priv->output_offset <
2145 aagg->priv->offset + aagg->priv->samples_per_buffer) {
2146 GstCaps *caps = gst_pad_get_current_caps (GST_PAD (aggpad));
2147 GstStructure *info =
2148 gst_structure_new ("GstAudioAggregatorPadNextSampleInfo",
2149 "output-offset", G_TYPE_UINT64, pad->priv->output_offset,
2150 "position", G_TYPE_UINT, pad->priv->position,
2151 "size", G_TYPE_UINT, pad->priv->size,
2152 NULL);
2153
2154 sample = gst_sample_new (pad->priv->buffer, caps, &aggpad->segment, info);
2155 gst_caps_unref (caps);
2156 gst_structure_free (info);
2157 }
2158
2159 return sample;
2160 }
2161
2162 static GstFlowReturn
gst_audio_aggregator_aggregate(GstAggregator * agg,gboolean timeout)2163 gst_audio_aggregator_aggregate (GstAggregator * agg, gboolean timeout)
2164 {
2165 /* Calculate the current output offset/timestamp and offset_end/timestamp_end.
2166 * Allocate a silence buffer for this and store it.
2167 *
2168 * For all pads:
2169 * 1) Once per input buffer (cached)
2170 * 1) Check discont (flag and timestamp with tolerance)
2171 * 2) If discont or new, resync. That means:
2172 * 1) Drop all start data of the buffer that comes before
2173 * the current position/offset.
2174 * 2) Calculate the offset (output segment!) that the first
2175 * frame of the input buffer corresponds to. Base this on
2176 * the running time.
2177 *
2178 * 2) If the current pad's offset/offset_end overlaps with the output
2179 * offset/offset_end, mix it at the appropriate position in the output
2180 * buffer and advance the pad's position. Remember if this pad needs
2181 * a new buffer to advance behind the output offset_end.
2182 *
2183 * If we had no pad with a buffer, go EOS.
2184 *
2185 * If we had at least one pad that did not advance behind output
2186 * offset_end, let aggregate be called again for the current
2187 * output offset/offset_end.
2188 */
2189 GstElement *element;
2190 GstAudioAggregator *aagg;
2191 GList *iter;
2192 GstFlowReturn ret;
2193 GstBuffer *outbuf = NULL;
2194 gint64 next_offset;
2195 gint64 next_timestamp;
2196 gint rate, bpf;
2197 gboolean dropped = FALSE;
2198 gboolean is_eos = TRUE;
2199 gboolean is_done = TRUE;
2200 guint blocksize;
2201 GstAudioAggregatorPad *srcpad = GST_AUDIO_AGGREGATOR_PAD (agg->srcpad);
2202 GstSegment *agg_segment = &GST_AGGREGATOR_PAD (agg->srcpad)->segment;
2203
2204 element = GST_ELEMENT (agg);
2205 aagg = GST_AUDIO_AGGREGATOR (agg);
2206
2207 /* Sync pad properties to the stream time */
2208 gst_element_foreach_sink_pad (element, sync_pad_values, NULL);
2209
2210 GST_AUDIO_AGGREGATOR_LOCK (aagg);
2211 GST_OBJECT_LOCK (agg);
2212
2213 if (aagg->priv->samples_per_buffer == 0) {
2214 if (!gst_audio_aggregator_update_samples_per_buffer (aagg)) {
2215 GST_ERROR_OBJECT (aagg,
2216 "Failed to calculate the number of samples per buffer");
2217 GST_OBJECT_UNLOCK (agg);
2218 goto not_negotiated;
2219 }
2220 }
2221
2222 /* Update position from the segment start/stop if needed */
2223 if (agg_segment->position == -1) {
2224 if (agg_segment->rate > 0.0)
2225 agg_segment->position = agg_segment->start;
2226 else
2227 agg_segment->position = agg_segment->stop;
2228 }
2229
2230 rate = GST_AUDIO_INFO_RATE (&srcpad->info);
2231 bpf = GST_AUDIO_INFO_BPF (&srcpad->info);
2232
2233 if (G_UNLIKELY (srcpad->info.finfo->format == GST_AUDIO_FORMAT_UNKNOWN)) {
2234 if (timeout) {
2235 GstClockTime output_buffer_duration;
2236 GST_DEBUG_OBJECT (aagg,
2237 "Got timeout before receiving any caps, don't output anything");
2238
2239 blocksize = aagg->priv->samples_per_buffer;
2240 if (aagg->priv->error_per_buffer + aagg->priv->accumulated_error >=
2241 aagg->priv->output_buffer_duration_d)
2242 blocksize += 1;
2243 aagg->priv->accumulated_error =
2244 (aagg->priv->accumulated_error +
2245 aagg->priv->error_per_buffer) % aagg->priv->output_buffer_duration_d;
2246
2247 output_buffer_duration =
2248 gst_util_uint64_scale (blocksize, GST_SECOND, rate);
2249
2250 /* Advance position */
2251 if (agg_segment->rate > 0.0)
2252 agg_segment->position += output_buffer_duration;
2253 else if (agg_segment->position > output_buffer_duration)
2254 agg_segment->position -= output_buffer_duration;
2255 else
2256 agg_segment->position = 0;
2257
2258 GST_OBJECT_UNLOCK (agg);
2259 GST_AUDIO_AGGREGATOR_UNLOCK (aagg);
2260 return GST_AGGREGATOR_FLOW_NEED_DATA;
2261 } else {
2262 GST_OBJECT_UNLOCK (agg);
2263 goto not_negotiated;
2264 }
2265 }
2266
2267 if (aagg->priv->offset == -1) {
2268 aagg->priv->offset =
2269 gst_util_uint64_scale (agg_segment->position - agg_segment->start, rate,
2270 GST_SECOND);
2271 GST_DEBUG_OBJECT (aagg, "Starting at offset %" G_GINT64_FORMAT,
2272 aagg->priv->offset);
2273 }
2274
2275 if (aagg->priv->current_buffer == NULL) {
2276 blocksize = aagg->priv->samples_per_buffer;
2277
2278 if (aagg->priv->error_per_buffer + aagg->priv->accumulated_error >=
2279 aagg->priv->output_buffer_duration_d)
2280 blocksize += 1;
2281
2282 aagg->priv->current_blocksize = blocksize;
2283
2284 aagg->priv->accumulated_error =
2285 (aagg->priv->accumulated_error +
2286 aagg->priv->error_per_buffer) % aagg->priv->output_buffer_duration_d;
2287
2288 GST_OBJECT_UNLOCK (agg);
2289 aagg->priv->current_buffer =
2290 GST_AUDIO_AGGREGATOR_GET_CLASS (aagg)->create_output_buffer (aagg,
2291 blocksize);
2292 /* Be careful, some things could have changed ? */
2293 GST_OBJECT_LOCK (agg);
2294 GST_BUFFER_FLAG_SET (aagg->priv->current_buffer, GST_BUFFER_FLAG_GAP);
2295 } else {
2296 blocksize = aagg->priv->current_blocksize;
2297 }
2298
2299 /* FIXME: Reverse mixing does not work at all yet */
2300 if (agg_segment->rate > 0.0) {
2301 next_offset = aagg->priv->offset + blocksize;
2302 } else {
2303 next_offset = aagg->priv->offset - blocksize;
2304 }
2305
2306 /* Use the sample counter, which will never accumulate rounding errors */
2307 next_timestamp =
2308 agg_segment->start + gst_util_uint64_scale (next_offset, GST_SECOND,
2309 rate);
2310
2311 outbuf = aagg->priv->current_buffer;
2312
2313 GST_LOG_OBJECT (agg,
2314 "Starting to mix %u samples for offset %" G_GINT64_FORMAT
2315 " with timestamp %" GST_TIME_FORMAT, blocksize,
2316 aagg->priv->offset, GST_TIME_ARGS (agg_segment->position));
2317
2318 for (iter = element->sinkpads; iter; iter = iter->next) {
2319 GstAudioAggregatorPad *pad = (GstAudioAggregatorPad *) iter->data;
2320 GstAggregatorPad *aggpad = (GstAggregatorPad *) iter->data;
2321 gboolean pad_eos = gst_aggregator_pad_is_eos (aggpad);
2322 GstBuffer *input_buffer;
2323
2324 if (gst_aggregator_pad_is_inactive (aggpad))
2325 continue;
2326
2327 if (!pad_eos)
2328 is_eos = FALSE;
2329
2330 input_buffer = gst_aggregator_pad_peek_buffer (aggpad);
2331
2332 GST_OBJECT_LOCK (pad);
2333 if (!input_buffer) {
2334 if (timeout) {
2335 if (pad->priv->output_offset < next_offset) {
2336 gint64 diff = next_offset - pad->priv->output_offset;
2337 GST_DEBUG_OBJECT (pad, "Timeout, missing %" G_GINT64_FORMAT
2338 " frames (%" GST_TIME_FORMAT ")", diff,
2339 GST_TIME_ARGS (gst_util_uint64_scale (diff, GST_SECOND,
2340 GST_AUDIO_INFO_RATE (&srcpad->info))));
2341 }
2342 } else if (!pad_eos) {
2343 is_done = FALSE;
2344 }
2345 GST_OBJECT_UNLOCK (pad);
2346 continue;
2347 } else if (!GST_AUDIO_INFO_IS_VALID (&pad->info)) {
2348 GST_OBJECT_UNLOCK (pad);
2349 GST_OBJECT_UNLOCK (agg);
2350 goto not_negotiated;
2351 }
2352
2353 /* New buffer? */
2354 if (!pad->priv->buffer) {
2355 if (GST_AUDIO_AGGREGATOR_PAD_GET_CLASS (pad)->convert_buffer) {
2356 pad->priv->buffer =
2357 gst_audio_aggregator_convert_buffer
2358 (aagg, GST_PAD (pad), &pad->info, &srcpad->info, input_buffer);
2359 if (!pad->priv->buffer) {
2360 GST_OBJECT_UNLOCK (pad);
2361 GST_OBJECT_UNLOCK (agg);
2362 goto not_negotiated;
2363 }
2364 } else {
2365 pad->priv->buffer = gst_buffer_ref (input_buffer);
2366 }
2367
2368 if (!gst_audio_aggregator_fill_buffer (aagg, pad)) {
2369 gst_buffer_replace (&pad->priv->buffer, NULL);
2370 gst_buffer_unref (input_buffer);
2371 dropped = TRUE;
2372 GST_OBJECT_UNLOCK (pad);
2373
2374 gst_aggregator_pad_drop_buffer (aggpad);
2375 continue;
2376 }
2377 }
2378 gst_buffer_unref (input_buffer);
2379
2380 if (!pad->priv->buffer && !dropped && pad_eos) {
2381 GST_DEBUG_OBJECT (aggpad, "Pad is in EOS state");
2382 GST_OBJECT_UNLOCK (pad);
2383 continue;
2384 }
2385
2386 g_assert (pad->priv->buffer);
2387
2388 /* This pad is lagging behind, we need to update the offset
2389 * and maybe drop the current buffer */
2390 if (pad->priv->output_offset < aagg->priv->offset) {
2391 gint64 diff = aagg->priv->offset - pad->priv->output_offset;
2392 gint64 odiff = diff;
2393
2394 if (pad->priv->position + diff > pad->priv->size)
2395 diff = pad->priv->size - pad->priv->position;
2396 pad->priv->dropped += diff;
2397 if (diff != 0) {
2398 GstClockTime rt;
2399
2400 rt = gst_audio_aggregator_pad_enqueue_qos_message (pad, aagg, diff);
2401 GST_DEBUG_OBJECT (pad, "Dropped %" G_GINT64_FORMAT " samples at"
2402 " running time %" GST_TIME_FORMAT " because input buffer is before"
2403 " output offset", diff, GST_TIME_ARGS (rt));
2404 }
2405 pad->priv->position += diff;
2406 pad->priv->output_offset += diff;
2407
2408 if (pad->priv->position == pad->priv->size) {
2409 GST_DEBUG_OBJECT (pad, "Buffer was late by %" GST_TIME_FORMAT
2410 ", dropping %" GST_PTR_FORMAT,
2411 GST_TIME_ARGS (gst_util_uint64_scale (odiff, GST_SECOND,
2412 GST_AUDIO_INFO_RATE (&srcpad->info))), pad->priv->buffer);
2413 /* Buffer done, drop it */
2414 gst_buffer_replace (&pad->priv->buffer, NULL);
2415 dropped = TRUE;
2416 GST_OBJECT_UNLOCK (pad);
2417 gst_aggregator_pad_drop_buffer (aggpad);
2418 continue;
2419 }
2420 }
2421
2422 g_assert (pad->priv->buffer);
2423 GST_OBJECT_UNLOCK (pad);
2424 }
2425 GST_OBJECT_UNLOCK (agg);
2426
2427 gst_audio_aggregator_post_messages (aagg);
2428
2429 {
2430 gst_structure_set (aagg->priv->selected_samples_info, "offset",
2431 G_TYPE_UINT64, aagg->priv->offset, "frames", G_TYPE_UINT, blocksize,
2432 NULL);
2433 gst_aggregator_selected_samples (agg, agg_segment->position,
2434 GST_CLOCK_TIME_NONE, next_timestamp - agg_segment->position,
2435 aagg->priv->selected_samples_info);
2436 }
2437
2438 GST_OBJECT_LOCK (agg);
2439 for (iter = element->sinkpads; iter; iter = iter->next) {
2440 GstAudioAggregatorPad *pad = (GstAudioAggregatorPad *) iter->data;
2441 GstAggregatorPad *aggpad = (GstAggregatorPad *) iter->data;
2442
2443 if (gst_aggregator_pad_is_inactive (aggpad))
2444 continue;
2445
2446 GST_OBJECT_LOCK (pad);
2447
2448 if (pad->priv->buffer && pad->priv->output_offset >= aagg->priv->offset
2449 && pad->priv->output_offset < aagg->priv->offset + blocksize) {
2450 gboolean drop_buf;
2451
2452 GST_LOG_OBJECT (aggpad, "Mixing buffer for current offset");
2453 drop_buf = !gst_audio_aggregator_mix_buffer (aagg, pad, pad->priv->buffer,
2454 outbuf, blocksize);
2455 if (pad->priv->output_offset >= next_offset) {
2456 GST_LOG_OBJECT (pad,
2457 "Pad is at or after current offset: %" G_GUINT64_FORMAT " >= %"
2458 G_GINT64_FORMAT, pad->priv->output_offset, next_offset);
2459 } else {
2460 is_done = FALSE;
2461 }
2462 if (drop_buf) {
2463 GST_OBJECT_UNLOCK (pad);
2464 gst_aggregator_pad_drop_buffer (aggpad);
2465 continue;
2466 }
2467 }
2468
2469 GST_OBJECT_UNLOCK (pad);
2470 }
2471 GST_OBJECT_UNLOCK (agg);
2472
2473 if (dropped) {
2474 /* We dropped a buffer, retry */
2475 GST_LOG_OBJECT (aagg, "A pad dropped a buffer, wait for the next one");
2476 GST_AUDIO_AGGREGATOR_UNLOCK (aagg);
2477 return GST_AGGREGATOR_FLOW_NEED_DATA;
2478 }
2479
2480 if (!is_done && !is_eos) {
2481 /* Get more buffers */
2482 GST_LOG_OBJECT (aagg,
2483 "We're not done yet for the current offset, waiting for more data");
2484 GST_AUDIO_AGGREGATOR_UNLOCK (aagg);
2485 return GST_AGGREGATOR_FLOW_NEED_DATA;
2486 }
2487
2488 if (is_eos) {
2489 gint64 max_offset = 0;
2490
2491 GST_DEBUG_OBJECT (aagg, "We're EOS");
2492
2493 GST_OBJECT_LOCK (agg);
2494 for (iter = GST_ELEMENT (agg)->sinkpads; iter; iter = iter->next) {
2495 GstAudioAggregatorPad *pad = GST_AUDIO_AGGREGATOR_PAD (iter->data);
2496
2497 if (gst_aggregator_pad_is_inactive (GST_AGGREGATOR_PAD (pad)))
2498 continue;
2499
2500 max_offset = MAX ((gint64) max_offset, (gint64) pad->priv->output_offset);
2501 }
2502 GST_OBJECT_UNLOCK (agg);
2503
2504 /* This means EOS or nothing mixed in at all */
2505 if (aagg->priv->offset == max_offset) {
2506 gst_buffer_replace (&aagg->priv->current_buffer, NULL);
2507 GST_AUDIO_AGGREGATOR_UNLOCK (aagg);
2508 return GST_FLOW_EOS;
2509 }
2510
2511 if (max_offset <= next_offset) {
2512 GST_DEBUG_OBJECT (aagg,
2513 "Last buffer is incomplete: %" G_GUINT64_FORMAT " <= %"
2514 G_GINT64_FORMAT, max_offset, next_offset);
2515 next_offset = max_offset;
2516 next_timestamp =
2517 agg_segment->start + gst_util_uint64_scale (next_offset, GST_SECOND,
2518 rate);
2519
2520 if (next_offset > aagg->priv->offset)
2521 gst_buffer_resize (outbuf, 0, (next_offset - aagg->priv->offset) * bpf);
2522 }
2523 }
2524
2525 /* set timestamps on the output buffer */
2526 GST_OBJECT_LOCK (agg);
2527 if (agg_segment->rate > 0.0) {
2528 GST_BUFFER_PTS (outbuf) = agg_segment->position;
2529 GST_BUFFER_OFFSET (outbuf) = aagg->priv->offset;
2530 GST_BUFFER_OFFSET_END (outbuf) = next_offset;
2531 GST_BUFFER_DURATION (outbuf) = next_timestamp - agg_segment->position;
2532 } else {
2533 GST_BUFFER_PTS (outbuf) = next_timestamp;
2534 GST_BUFFER_OFFSET (outbuf) = next_offset;
2535 GST_BUFFER_OFFSET_END (outbuf) = aagg->priv->offset;
2536 GST_BUFFER_DURATION (outbuf) = agg_segment->position - next_timestamp;
2537 }
2538
2539 GST_OBJECT_UNLOCK (agg);
2540
2541 /* send it out */
2542 GST_LOG_OBJECT (aagg,
2543 "pushing outbuf %p, timestamp %" GST_TIME_FORMAT " offset %"
2544 G_GINT64_FORMAT, outbuf, GST_TIME_ARGS (GST_BUFFER_PTS (outbuf)),
2545 GST_BUFFER_OFFSET (outbuf));
2546
2547 GST_AUDIO_AGGREGATOR_UNLOCK (aagg);
2548
2549 ret = gst_aggregator_finish_buffer (agg, outbuf);
2550 aagg->priv->current_buffer = NULL;
2551
2552 GST_LOG_OBJECT (aagg, "pushed outbuf, result = %s", gst_flow_get_name (ret));
2553
2554 GST_AUDIO_AGGREGATOR_LOCK (aagg);
2555 GST_OBJECT_LOCK (agg);
2556 aagg->priv->offset = next_offset;
2557 agg_segment->position = next_timestamp;
2558
2559 /* If there was a timeout and there was a gap in data in out of the streams,
2560 * then it's a very good time to for a resync with the timestamps.
2561 */
2562 if (timeout) {
2563 for (iter = element->sinkpads; iter; iter = iter->next) {
2564 GstAudioAggregatorPad *pad = GST_AUDIO_AGGREGATOR_PAD (iter->data);
2565
2566 GST_OBJECT_LOCK (pad);
2567 if (pad->priv->output_offset < aagg->priv->offset)
2568 pad->priv->output_offset = -1;
2569 GST_OBJECT_UNLOCK (pad);
2570 }
2571 }
2572 GST_OBJECT_UNLOCK (agg);
2573 GST_AUDIO_AGGREGATOR_UNLOCK (aagg);
2574
2575 return ret;
2576 /* ERRORS */
2577 not_negotiated:
2578 {
2579 GST_AUDIO_AGGREGATOR_UNLOCK (aagg);
2580 GST_ELEMENT_ERROR (aagg, STREAM, FORMAT, (NULL),
2581 ("Unknown data received, not negotiated"));
2582 return GST_FLOW_NOT_NEGOTIATED;
2583 }
2584 }
2585