/* GStreamer
 * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
 *               2000,2001,2002,2003,2004,2005 Wim Taymans <wim@fluendo.com>
 *               2007 Wim Taymans <wim.taymans@gmail.com>
 *
 * gsttee.c: Tee element, one in N out
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Library General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Library General Public License for more details.
 *
 * You should have received a copy of the GNU Library General Public
 * License along with this library; if not, write to the
 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
 * Boston, MA 02110-1301, USA.
 */
23
/**
 * SECTION:element-tee
 * @title: tee
 * @see_also: #GstIdentity
 *
 * Split data to multiple pads. Branching the data flow is useful when e.g.
 * capturing a video where the video is shown on the screen and also encoded and
 * written to a file. Another example is playing music and hooking up a
 * visualisation module.
 *
 * One needs to use separate queue elements (or a multiqueue) in each branch to
 * provide separate threads for each branch. Otherwise a blocked dataflow in one
 * branch would stall the other branches.
 *
 * ## Example launch line
 * |[
 * gst-launch-1.0 filesrc location=song.ogg ! decodebin ! tee name=t ! queue ! audioconvert ! audioresample ! autoaudiosink t. ! queue ! audioconvert ! goom ! videoconvert ! autovideosink
 * ]|
 *
 * Play the song.ogg audio file, which must be in the current working
 * directory, and render visualisations using the goom element (this can be
 * done more easily with the playbin element; this is just an example pipeline).
 */
47
48 #ifdef HAVE_CONFIG_H
49 # include "config.h"
50 #endif
51
52 #include "gsttee.h"
53 #include "gstcoreelementselements.h"
54 #include "gst/glib-compat-private.h"
55
56 #include <string.h>
57 #include <stdio.h>
58
59 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
60 GST_PAD_SINK,
61 GST_PAD_ALWAYS,
62 GST_STATIC_CAPS_ANY);
63
64 GST_DEBUG_CATEGORY_STATIC (gst_tee_debug);
65 #define GST_CAT_DEFAULT gst_tee_debug
66
67 #define GST_TYPE_TEE_PULL_MODE (gst_tee_pull_mode_get_type())
68 static GType
gst_tee_pull_mode_get_type(void)69 gst_tee_pull_mode_get_type (void)
70 {
71 static GType type = 0;
72 static const GEnumValue data[] = {
73 {GST_TEE_PULL_MODE_NEVER, "Never activate in pull mode", "never"},
74 {GST_TEE_PULL_MODE_SINGLE, "Only one src pad can be active in pull mode",
75 "single"},
76 {0, NULL, NULL},
77 };
78
79 if (!type) {
80 type = g_enum_register_static ("GstTeePullMode", data);
81 }
82 return type;
83 }
84
85 #define DEFAULT_PROP_NUM_SRC_PADS 0
86 #define DEFAULT_PROP_HAS_CHAIN TRUE
87 #define DEFAULT_PROP_SILENT TRUE
88 #define DEFAULT_PROP_LAST_MESSAGE NULL
89 #define DEFAULT_PULL_MODE GST_TEE_PULL_MODE_NEVER
90 #define DEFAULT_PROP_ALLOW_NOT_LINKED FALSE
91
/* Property identifiers: passed to g_object_class_install_property() in
 * gst_tee_class_init() and dispatched on in gst_tee_set_property() and
 * gst_tee_get_property(). */
enum
{
  PROP_0,                       /* no property is installed under this id */
  PROP_NUM_SRC_PADS,            /* "num-src-pads", read-only */
  PROP_HAS_CHAIN,               /* "has-chain" */
  PROP_SILENT,                  /* "silent" */
  PROP_LAST_MESSAGE,            /* "last-message", read-only */
  PROP_PULL_MODE,               /* "pull-mode" */
  PROP_ALLOC_PAD,               /* "alloc-pad", deprecated */
  PROP_ALLOW_NOT_LINKED,        /* "allow-not-linked" */
};
103
104 static GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE ("src_%u",
105 GST_PAD_SRC,
106 GST_PAD_REQUEST,
107 GST_STATIC_CAPS_ANY);
108
109 #define _do_init \
110 GST_DEBUG_CATEGORY_INIT (gst_tee_debug, "tee", 0, "tee element");
111 #define gst_tee_parent_class parent_class
112 G_DEFINE_TYPE_WITH_CODE (GstTee, gst_tee, GST_TYPE_ELEMENT, _do_init);
113 GST_ELEMENT_REGISTER_DEFINE (tee, "tee", GST_RANK_NONE, GST_TYPE_TEE);
114
115 static GParamSpec *pspec_last_message = NULL;
116 static GParamSpec *pspec_alloc_pad = NULL;
117
118 GType gst_tee_pad_get_type (void);
119
120 #define GST_TYPE_TEE_PAD \
121 (gst_tee_pad_get_type())
122 #define GST_TEE_PAD(obj) \
123 (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_TYPE_TEE_PAD, GstTeePad))
124 #define GST_TEE_PAD_CLASS(klass) \
125 (G_TYPE_CHECK_CLASS_CAST ((klass), GST_TYPE_TEE_PAD, GstTeePadClass))
126 #define GST_IS_TEE_PAD(obj) \
127 (G_TYPE_CHECK_INSTANCE_TYPE ((obj), GST_TYPE_TEE_PAD))
128 #define GST_IS_TEE_PAD_CLASS(klass) \
129 (G_TYPE_CHECK_CLASS_TYPE ((klass), GST_TYPE_TEE_PAD))
130 #define GST_TEE_PAD_CAST(obj) \
131 ((GstTeePad *)(obj))
132
133 typedef struct _GstTeePad GstTeePad;
134 typedef struct _GstTeePadClass GstTeePadClass;
135
136 struct _GstTeePad
137 {
138 GstPad parent;
139
140 guint index;
141 gboolean pushed;
142 GstFlowReturn result;
143 gboolean removed;
144 };
145
146 struct _GstTeePadClass
147 {
148 GstPadClass parent;
149 };
150
151 G_DEFINE_TYPE (GstTeePad, gst_tee_pad, GST_TYPE_PAD);
152
153 static void
gst_tee_pad_class_init(GstTeePadClass * klass)154 gst_tee_pad_class_init (GstTeePadClass * klass)
155 {
156 }
157
158 static void
gst_tee_pad_reset(GstTeePad * pad)159 gst_tee_pad_reset (GstTeePad * pad)
160 {
161 pad->pushed = FALSE;
162 pad->result = GST_FLOW_NOT_LINKED;
163 pad->removed = FALSE;
164 }
165
166 static void
gst_tee_pad_init(GstTeePad * pad)167 gst_tee_pad_init (GstTeePad * pad)
168 {
169 gst_tee_pad_reset (pad);
170 }
171
172 static GstPad *gst_tee_request_new_pad (GstElement * element,
173 GstPadTemplate * temp, const gchar * unused, const GstCaps * caps);
174 static void gst_tee_release_pad (GstElement * element, GstPad * pad);
175
176 static void gst_tee_finalize (GObject * object);
177 static void gst_tee_set_property (GObject * object, guint prop_id,
178 const GValue * value, GParamSpec * pspec);
179 static void gst_tee_get_property (GObject * object, guint prop_id,
180 GValue * value, GParamSpec * pspec);
181 static void gst_tee_dispose (GObject * object);
182
183 static GstFlowReturn gst_tee_chain (GstPad * pad, GstObject * parent,
184 GstBuffer * buffer);
185 static GstFlowReturn gst_tee_chain_list (GstPad * pad, GstObject * parent,
186 GstBufferList * list);
187 static gboolean gst_tee_sink_event (GstPad * pad, GstObject * parent,
188 GstEvent * event);
189 static gboolean gst_tee_sink_query (GstPad * pad, GstObject * parent,
190 GstQuery * query);
191 static gboolean gst_tee_sink_activate_mode (GstPad * pad, GstObject * parent,
192 GstPadMode mode, gboolean active);
193 static gboolean gst_tee_src_query (GstPad * pad, GstObject * parent,
194 GstQuery * query);
195 static gboolean gst_tee_src_activate_mode (GstPad * pad, GstObject * parent,
196 GstPadMode mode, gboolean active);
197 static GstFlowReturn gst_tee_src_get_range (GstPad * pad, GstObject * parent,
198 guint64 offset, guint length, GstBuffer ** buf);
199
200 static void
gst_tee_dispose(GObject * object)201 gst_tee_dispose (GObject * object)
202 {
203 GList *item;
204
205 restart:
206 for (item = GST_ELEMENT_PADS (object); item; item = g_list_next (item)) {
207 GstPad *pad = GST_PAD (item->data);
208 if (GST_PAD_IS_SRC (pad)) {
209 gst_element_release_request_pad (GST_ELEMENT (object), pad);
210 goto restart;
211 }
212 }
213
214 G_OBJECT_CLASS (parent_class)->dispose (object);
215 }
216
217 static void
gst_tee_finalize(GObject * object)218 gst_tee_finalize (GObject * object)
219 {
220 GstTee *tee;
221
222 tee = GST_TEE (object);
223
224 g_hash_table_unref (tee->pad_indexes);
225
226 g_free (tee->last_message);
227
228 G_OBJECT_CLASS (parent_class)->finalize (object);
229 }
230
231 static void
gst_tee_class_init(GstTeeClass * klass)232 gst_tee_class_init (GstTeeClass * klass)
233 {
234 GObjectClass *gobject_class;
235 GstElementClass *gstelement_class;
236
237 gobject_class = G_OBJECT_CLASS (klass);
238 gstelement_class = GST_ELEMENT_CLASS (klass);
239
240 gobject_class->finalize = gst_tee_finalize;
241 gobject_class->set_property = gst_tee_set_property;
242 gobject_class->get_property = gst_tee_get_property;
243 gobject_class->dispose = gst_tee_dispose;
244
245 g_object_class_install_property (gobject_class, PROP_NUM_SRC_PADS,
246 g_param_spec_int ("num-src-pads", "Num Src Pads",
247 "The number of source pads", 0, G_MAXINT, DEFAULT_PROP_NUM_SRC_PADS,
248 G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
249 g_object_class_install_property (gobject_class, PROP_HAS_CHAIN,
250 g_param_spec_boolean ("has-chain", "Has Chain",
251 "If the element can operate in push mode", DEFAULT_PROP_HAS_CHAIN,
252 G_PARAM_CONSTRUCT | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
253 g_object_class_install_property (gobject_class, PROP_SILENT,
254 g_param_spec_boolean ("silent", "Silent",
255 "Don't produce last_message events", DEFAULT_PROP_SILENT,
256 G_PARAM_CONSTRUCT | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
257 pspec_last_message = g_param_spec_string ("last-message", "Last Message",
258 "The message describing current status", DEFAULT_PROP_LAST_MESSAGE,
259 G_PARAM_READABLE | G_PARAM_STATIC_STRINGS);
260 g_object_class_install_property (gobject_class, PROP_LAST_MESSAGE,
261 pspec_last_message);
262 g_object_class_install_property (gobject_class, PROP_PULL_MODE,
263 g_param_spec_enum ("pull-mode", "Pull mode",
264 "Behavior of tee in pull mode", GST_TYPE_TEE_PULL_MODE,
265 DEFAULT_PULL_MODE,
266 G_PARAM_CONSTRUCT | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
267 pspec_alloc_pad = g_param_spec_object ("alloc-pad", "Allocation Src Pad",
268 "The pad ALLOCATION queries will be proxied to (DEPRECATED, has no effect)",
269 GST_TYPE_PAD,
270 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_DEPRECATED);
271 g_object_class_install_property (gobject_class, PROP_ALLOC_PAD,
272 pspec_alloc_pad);
273
274 /**
275 * GstTee:allow-not-linked
276 *
277 * This property makes sink pad return GST_FLOW_OK even if there are no
278 * source pads or any of them is linked.
279 *
280 * This is useful to avoid errors when you have a dynamic pipeline and during
281 * a reconnection you can have all the pads unlinked or removed.
282 *
283 * Since: 1.6
284 */
285 g_object_class_install_property (gobject_class, PROP_ALLOW_NOT_LINKED,
286 g_param_spec_boolean ("allow-not-linked", "Allow not linked",
287 "Return GST_FLOW_OK even if there are no source pads or they are "
288 "all unlinked", DEFAULT_PROP_ALLOW_NOT_LINKED,
289 G_PARAM_CONSTRUCT | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
290
291 gst_element_class_set_static_metadata (gstelement_class,
292 "Tee pipe fitting",
293 "Generic",
294 "1-to-N pipe fitting",
295 "Erik Walthinsen <omega@cse.ogi.edu>, " "Wim Taymans <wim@fluendo.com>");
296 gst_element_class_add_static_pad_template (gstelement_class, &sinktemplate);
297 gst_element_class_add_static_pad_template (gstelement_class, &src_template);
298
299 gstelement_class->request_new_pad =
300 GST_DEBUG_FUNCPTR (gst_tee_request_new_pad);
301 gstelement_class->release_pad = GST_DEBUG_FUNCPTR (gst_tee_release_pad);
302
303 gst_type_mark_as_plugin_api (GST_TYPE_TEE_PULL_MODE, 0);
304 }
305
306 static void
gst_tee_init(GstTee * tee)307 gst_tee_init (GstTee * tee)
308 {
309 tee->sinkpad = gst_pad_new_from_static_template (&sinktemplate, "sink");
310 tee->sink_mode = GST_PAD_MODE_NONE;
311
312 gst_pad_set_event_function (tee->sinkpad,
313 GST_DEBUG_FUNCPTR (gst_tee_sink_event));
314 gst_pad_set_query_function (tee->sinkpad,
315 GST_DEBUG_FUNCPTR (gst_tee_sink_query));
316 gst_pad_set_activatemode_function (tee->sinkpad,
317 GST_DEBUG_FUNCPTR (gst_tee_sink_activate_mode));
318 gst_pad_set_chain_function (tee->sinkpad, GST_DEBUG_FUNCPTR (gst_tee_chain));
319 gst_pad_set_chain_list_function (tee->sinkpad,
320 GST_DEBUG_FUNCPTR (gst_tee_chain_list));
321 GST_OBJECT_FLAG_SET (tee->sinkpad, GST_PAD_FLAG_PROXY_CAPS);
322 gst_element_add_pad (GST_ELEMENT (tee), tee->sinkpad);
323
324 tee->pad_indexes = g_hash_table_new (NULL, NULL);
325
326 tee->last_message = NULL;
327 }
328
329 static void
gst_tee_notify_alloc_pad(GstTee * tee)330 gst_tee_notify_alloc_pad (GstTee * tee)
331 {
332 g_object_notify_by_pspec ((GObject *) tee, pspec_alloc_pad);
333 }
334
335 static gboolean
forward_sticky_events(GstPad * pad,GstEvent ** event,gpointer user_data)336 forward_sticky_events (GstPad * pad, GstEvent ** event, gpointer user_data)
337 {
338 GstPad *srcpad = GST_PAD_CAST (user_data);
339 GstFlowReturn ret;
340
341 ret = gst_pad_store_sticky_event (srcpad, *event);
342 if (ret != GST_FLOW_OK) {
343 GST_DEBUG_OBJECT (srcpad, "storing sticky event %p (%s) failed: %s", *event,
344 GST_EVENT_TYPE_NAME (*event), gst_flow_get_name (ret));
345 }
346
347 return TRUE;
348 }
349
350 static GstPad *
gst_tee_request_new_pad(GstElement * element,GstPadTemplate * templ,const gchar * name_templ,const GstCaps * caps)351 gst_tee_request_new_pad (GstElement * element, GstPadTemplate * templ,
352 const gchar * name_templ, const GstCaps * caps)
353 {
354 gchar *name;
355 GstPad *srcpad;
356 GstTee *tee;
357 GstPadMode mode;
358 gboolean res;
359 guint index = 0;
360
361 tee = GST_TEE (element);
362
363 GST_DEBUG_OBJECT (tee, "requesting pad");
364
365 GST_OBJECT_LOCK (tee);
366
367 if (name_templ && sscanf (name_templ, "src_%u", &index) == 1) {
368 GST_LOG_OBJECT (element, "name: %s (index %d)", name_templ, index);
369 if (g_hash_table_contains (tee->pad_indexes, GUINT_TO_POINTER (index))) {
370 GST_ERROR_OBJECT (element, "pad name %s is not unique", name_templ);
371 GST_OBJECT_UNLOCK (tee);
372 return NULL;
373 }
374 if (index >= tee->next_pad_index)
375 tee->next_pad_index = index + 1;
376 } else {
377 index = tee->next_pad_index;
378
379 while (g_hash_table_contains (tee->pad_indexes, GUINT_TO_POINTER (index)))
380 index++;
381
382 tee->next_pad_index = index + 1;
383 }
384
385 g_hash_table_insert (tee->pad_indexes, GUINT_TO_POINTER (index), NULL);
386
387 name = g_strdup_printf ("src_%u", index);
388
389 srcpad = GST_PAD_CAST (g_object_new (GST_TYPE_TEE_PAD,
390 "name", name, "direction", templ->direction, "template", templ,
391 NULL));
392 GST_TEE_PAD_CAST (srcpad)->index = index;
393 g_free (name);
394
395 mode = tee->sink_mode;
396
397 GST_OBJECT_UNLOCK (tee);
398
399 switch (mode) {
400 case GST_PAD_MODE_PULL:
401 /* we already have a src pad in pull mode, and our pull mode can only be
402 SINGLE, so fall through to activate this new pad in push mode */
403 case GST_PAD_MODE_PUSH:
404 res = gst_pad_activate_mode (srcpad, GST_PAD_MODE_PUSH, TRUE);
405 break;
406 default:
407 res = TRUE;
408 break;
409 }
410
411 if (!res)
412 goto activate_failed;
413
414 gst_pad_set_activatemode_function (srcpad,
415 GST_DEBUG_FUNCPTR (gst_tee_src_activate_mode));
416 gst_pad_set_query_function (srcpad, GST_DEBUG_FUNCPTR (gst_tee_src_query));
417 gst_pad_set_getrange_function (srcpad,
418 GST_DEBUG_FUNCPTR (gst_tee_src_get_range));
419 GST_OBJECT_FLAG_SET (srcpad, GST_PAD_FLAG_PROXY_CAPS);
420 /* Forward sticky events to the new srcpad */
421 gst_pad_sticky_events_foreach (tee->sinkpad, forward_sticky_events, srcpad);
422 gst_element_add_pad (GST_ELEMENT_CAST (tee), srcpad);
423
424 return srcpad;
425
426 /* ERRORS */
427 activate_failed:
428 {
429 gboolean changed = FALSE;
430
431 GST_OBJECT_LOCK (tee);
432 GST_DEBUG_OBJECT (tee, "warning failed to activate request pad");
433 if (tee->allocpad == srcpad) {
434 tee->allocpad = NULL;
435 changed = TRUE;
436 }
437 GST_OBJECT_UNLOCK (tee);
438 gst_object_unref (srcpad);
439 if (changed) {
440 gst_tee_notify_alloc_pad (tee);
441 }
442 return NULL;
443 }
444 }
445
446 static void
gst_tee_release_pad(GstElement * element,GstPad * pad)447 gst_tee_release_pad (GstElement * element, GstPad * pad)
448 {
449 GstTee *tee;
450 gboolean changed = FALSE;
451 guint index;
452
453 tee = GST_TEE (element);
454
455 GST_DEBUG_OBJECT (tee, "releasing pad");
456
457 GST_OBJECT_LOCK (tee);
458 index = GST_TEE_PAD_CAST (pad)->index;
459 /* mark the pad as removed so that future pad_alloc fails with NOT_LINKED. */
460 GST_TEE_PAD_CAST (pad)->removed = TRUE;
461 if (tee->allocpad == pad) {
462 tee->allocpad = NULL;
463 changed = TRUE;
464 }
465 GST_OBJECT_UNLOCK (tee);
466
467 gst_pad_set_active (pad, FALSE);
468 gst_element_remove_pad (GST_ELEMENT_CAST (tee), pad);
469
470 if (changed) {
471 gst_tee_notify_alloc_pad (tee);
472 }
473
474 GST_OBJECT_LOCK (tee);
475 g_hash_table_remove (tee->pad_indexes, GUINT_TO_POINTER (index));
476 GST_OBJECT_UNLOCK (tee);
477 }
478
479 static void
gst_tee_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)480 gst_tee_set_property (GObject * object, guint prop_id, const GValue * value,
481 GParamSpec * pspec)
482 {
483 GstTee *tee = GST_TEE (object);
484
485 GST_OBJECT_LOCK (tee);
486 switch (prop_id) {
487 case PROP_HAS_CHAIN:
488 tee->has_chain = g_value_get_boolean (value);
489 break;
490 case PROP_SILENT:
491 tee->silent = g_value_get_boolean (value);
492 break;
493 case PROP_PULL_MODE:
494 tee->pull_mode = (GstTeePullMode) g_value_get_enum (value);
495 break;
496 case PROP_ALLOC_PAD:
497 {
498 GstPad *pad = g_value_get_object (value);
499 GST_OBJECT_LOCK (pad);
500 if (GST_OBJECT_PARENT (pad) == GST_OBJECT_CAST (object))
501 tee->allocpad = pad;
502 else
503 GST_WARNING_OBJECT (object, "Tried to set alloc pad %s which"
504 " is not my pad", GST_OBJECT_NAME (pad));
505 GST_OBJECT_UNLOCK (pad);
506 break;
507 }
508 case PROP_ALLOW_NOT_LINKED:
509 tee->allow_not_linked = g_value_get_boolean (value);
510 break;
511 default:
512 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
513 break;
514 }
515 GST_OBJECT_UNLOCK (tee);
516 }
517
518 static void
gst_tee_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)519 gst_tee_get_property (GObject * object, guint prop_id, GValue * value,
520 GParamSpec * pspec)
521 {
522 GstTee *tee = GST_TEE (object);
523
524 GST_OBJECT_LOCK (tee);
525 switch (prop_id) {
526 case PROP_NUM_SRC_PADS:
527 g_value_set_int (value, GST_ELEMENT (tee)->numsrcpads);
528 break;
529 case PROP_HAS_CHAIN:
530 g_value_set_boolean (value, tee->has_chain);
531 break;
532 case PROP_SILENT:
533 g_value_set_boolean (value, tee->silent);
534 break;
535 case PROP_LAST_MESSAGE:
536 g_value_set_string (value, tee->last_message);
537 break;
538 case PROP_PULL_MODE:
539 g_value_set_enum (value, tee->pull_mode);
540 break;
541 case PROP_ALLOC_PAD:
542 g_value_set_object (value, tee->allocpad);
543 break;
544 case PROP_ALLOW_NOT_LINKED:
545 g_value_set_boolean (value, tee->allow_not_linked);
546 break;
547 default:
548 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
549 break;
550 }
551 GST_OBJECT_UNLOCK (tee);
552 }
553
554 static gboolean
gst_tee_sink_event(GstPad * pad,GstObject * parent,GstEvent * event)555 gst_tee_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
556 {
557 gboolean res;
558
559 switch (GST_EVENT_TYPE (event)) {
560 default:
561 res = gst_pad_event_default (pad, parent, event);
562 break;
563 }
564
565 return res;
566 }
567
568 struct AllocQueryCtx
569 {
570 GstTee *tee;
571 GstQuery *query;
572 GstAllocationParams params;
573 guint size;
574 guint min_buffers;
575 gboolean first_query;
576 guint num_pads;
577 };
578
579 /* This function will aggregate some of the allocation query information with
580 * the strategy to force upstream allocation. Depending on downstream
581 * allocation would otherwise make dynamic pipelines much more complicated as
582 * application would need to now drain buffer in certain cases before getting
583 * rid of a tee branch. */
584 static gboolean
gst_tee_query_allocation(const GValue * item,GValue * ret,gpointer user_data)585 gst_tee_query_allocation (const GValue * item, GValue * ret, gpointer user_data)
586 {
587 struct AllocQueryCtx *ctx = user_data;
588 GstPad *src_pad = g_value_get_object (item);
589 GstPad *peer_pad;
590 GstCaps *caps;
591 GstQuery *query;
592 guint count, i, size, min;
593
594 GST_DEBUG_OBJECT (ctx->tee, "Aggregating allocation from pad %s:%s",
595 GST_DEBUG_PAD_NAME (src_pad));
596
597 peer_pad = gst_pad_get_peer (src_pad);
598 if (!peer_pad) {
599 if (ctx->tee->allow_not_linked) {
600 GST_DEBUG_OBJECT (ctx->tee, "Pad %s:%s has no peer, but allowed.",
601 GST_DEBUG_PAD_NAME (src_pad));
602 return TRUE;
603 } else {
604 GST_DEBUG_OBJECT (ctx->tee, "Pad %s:%s has no peer, ignoring allocation.",
605 GST_DEBUG_PAD_NAME (src_pad));
606 g_value_set_boolean (ret, FALSE);
607 return FALSE;
608 }
609 }
610
611 gst_query_parse_allocation (ctx->query, &caps, NULL);
612
613 query = gst_query_new_allocation (caps, FALSE);
614 if (!gst_pad_query (peer_pad, query)) {
615 GST_DEBUG_OBJECT (ctx->tee,
616 "Allocation query failed on pad %s, ignoring allocation",
617 GST_PAD_NAME (src_pad));
618 g_value_set_boolean (ret, FALSE);
619 gst_query_unref (query);
620 gst_object_unref (peer_pad);
621 return FALSE;
622 }
623
624 gst_object_unref (peer_pad);
625
626 /* Allocation Params:
627 * store the maximum alignment, prefix and padding, but ignore the
628 * allocators and the flags which are tied to downstream allocation */
629 count = gst_query_get_n_allocation_params (query);
630 for (i = 0; i < count; i++) {
631 GstAllocationParams params = { 0, };
632
633 gst_query_parse_nth_allocation_param (query, i, NULL, ¶ms);
634
635 GST_DEBUG_OBJECT (ctx->tee, "Aggregating AllocationParams align=%"
636 G_GSIZE_FORMAT " prefix=%" G_GSIZE_FORMAT " padding=%"
637 G_GSIZE_FORMAT, params.align, params.prefix, params.padding);
638
639 if (ctx->params.align < params.align)
640 ctx->params.align = params.align;
641
642 if (ctx->params.prefix < params.prefix)
643 ctx->params.prefix = params.prefix;
644
645 if (ctx->params.padding < params.padding)
646 ctx->params.padding = params.padding;
647 }
648
649 /* Allocation Pool:
650 * We want to keep the biggest size and biggest minimum number of buffers to
651 * make sure downstream requirement can be satisfied. We don't really care
652 * about the maximum, as this is a parameter of the downstream provided
653 * pool. We only read the first allocation pool as the minimum number of
654 * buffers is normally constant regardless of the pool being used. */
655 if (gst_query_get_n_allocation_pools (query) > 0) {
656 gst_query_parse_nth_allocation_pool (query, 0, NULL, &size, &min, NULL);
657
658 GST_DEBUG_OBJECT (ctx->tee,
659 "Aggregating allocation pool size=%u min_buffers=%u", size, min);
660
661 if (ctx->size < size)
662 ctx->size = size;
663
664 if (ctx->min_buffers < min)
665 ctx->min_buffers = min;
666 }
667
668 /* Allocation Meta:
669 * For allocation meta, we'll need to aggregate the argument using the new
670 * GstMetaInfo::agggregate_func */
671 count = gst_query_get_n_allocation_metas (query);
672 for (i = 0; i < count; i++) {
673 guint ctx_index;
674 GType api;
675 const GstStructure *param;
676
677 api = gst_query_parse_nth_allocation_meta (query, i, ¶m);
678
679 /* For the first query, copy all metas */
680 if (ctx->first_query) {
681 gst_query_add_allocation_meta (ctx->query, api, param);
682 continue;
683 }
684
685 /* Afterward, aggregate the common params */
686 if (gst_query_find_allocation_meta (ctx->query, api, &ctx_index)) {
687 const GstStructure *ctx_param;
688
689 gst_query_parse_nth_allocation_meta (ctx->query, ctx_index, &ctx_param);
690
691 /* Keep meta which has no params */
692 if (ctx_param == NULL && param == NULL)
693 continue;
694
695 GST_DEBUG_OBJECT (ctx->tee, "Dropping allocation meta %s",
696 g_type_name (api));
697 gst_query_remove_nth_allocation_meta (ctx->query, ctx_index);
698 }
699 }
700
701 /* Finally, cleanup metas from the stored query that aren't support on this
702 * pad. */
703 count = gst_query_get_n_allocation_metas (ctx->query);
704 for (i = 0; i < count;) {
705 GType api = gst_query_parse_nth_allocation_meta (ctx->query, i, NULL);
706
707 if (!gst_query_find_allocation_meta (query, api, NULL)) {
708 GST_DEBUG_OBJECT (ctx->tee, "Dropping allocation meta %s",
709 g_type_name (api));
710 gst_query_remove_nth_allocation_meta (ctx->query, i);
711 count--;
712 continue;
713 }
714
715 i++;
716 }
717
718 ctx->first_query = FALSE;
719 ctx->num_pads++;
720 gst_query_unref (query);
721
722 return TRUE;
723 }
724
725
726 static void
gst_tee_clear_query_allocation_meta(GstQuery * query)727 gst_tee_clear_query_allocation_meta (GstQuery * query)
728 {
729 guint count = gst_query_get_n_allocation_metas (query);
730 guint i;
731
732 for (i = 1; i <= count; i++)
733 gst_query_remove_nth_allocation_meta (query, count - i);
734 }
735
736 static gboolean
gst_tee_sink_query(GstPad * pad,GstObject * parent,GstQuery * query)737 gst_tee_sink_query (GstPad * pad, GstObject * parent, GstQuery * query)
738 {
739 GstTee *tee = GST_TEE (parent);
740 gboolean res;
741
742 switch (GST_QUERY_TYPE (query)) {
743 case GST_QUERY_ALLOCATION:
744 {
745 GstIterator *iter;
746 GValue ret = G_VALUE_INIT;
747 struct AllocQueryCtx ctx = { tee, query, };
748
749 g_value_init (&ret, G_TYPE_BOOLEAN);
750 g_value_set_boolean (&ret, TRUE);
751
752 ctx.first_query = TRUE;
753 gst_allocation_params_init (&ctx.params);
754
755 iter = gst_element_iterate_src_pads (GST_ELEMENT (tee));
756 while (GST_ITERATOR_RESYNC ==
757 gst_iterator_fold (iter, gst_tee_query_allocation, &ret, &ctx)) {
758 gst_iterator_resync (iter);
759 ctx.first_query = TRUE;
760 gst_allocation_params_init (&ctx.params);
761 ctx.size = 0;
762 ctx.min_buffers = 0;
763 ctx.num_pads = 0;
764 gst_tee_clear_query_allocation_meta (query);
765 }
766
767 gst_iterator_free (iter);
768 res = g_value_get_boolean (&ret);
769 g_value_unset (&ret);
770
771 if (res) {
772 GST_DEBUG_OBJECT (tee, "Aggregated AllocationParams to align=%"
773 G_GSIZE_FORMAT " prefix=%" G_GSIZE_FORMAT " padding=%"
774 G_GSIZE_FORMAT, ctx.params.align, ctx.params.prefix,
775 ctx.params.padding);
776
777 GST_DEBUG_OBJECT (tee,
778 "Aggregated allocation pools size=%u min_buffers=%u", ctx.size,
779 ctx.min_buffers);
780
781 #ifndef GST_DISABLE_GST_DEBUG
782 {
783 guint count = gst_query_get_n_allocation_metas (query);
784 guint i;
785
786 GST_DEBUG_OBJECT (tee, "Aggregated %u allocation meta:", count);
787
788 for (i = 0; i < count; i++)
789 GST_DEBUG_OBJECT (tee, " + aggregated allocation meta %s",
790 g_type_name (gst_query_parse_nth_allocation_meta (ctx.query, i,
791 NULL)));
792 }
793 #endif
794
795 /* Allocate one more buffers when multiplexing so we don't starve the
796 * downstream threads. */
797 if (ctx.num_pads > 1)
798 ctx.min_buffers++;
799
800 /* Check that we actually have parameters besides the defaults. */
801 if (ctx.params.align || ctx.params.prefix || ctx.params.padding) {
802 gst_query_add_allocation_param (ctx.query, NULL, &ctx.params);
803 }
804 /* When size == 0, buffers created from this pool would have no memory
805 * allocated. */
806 if (ctx.size) {
807 gst_query_add_allocation_pool (ctx.query, NULL, ctx.size,
808 ctx.min_buffers, 0);
809 }
810 } else {
811 gst_tee_clear_query_allocation_meta (query);
812 }
813 break;
814 }
815 default:
816 res = gst_pad_query_default (pad, parent, query);
817 break;
818 }
819 return res;
820 }
821
822 static void
gst_tee_do_message(GstTee * tee,GstPad * pad,gpointer data,gboolean is_list)823 gst_tee_do_message (GstTee * tee, GstPad * pad, gpointer data, gboolean is_list)
824 {
825 GST_OBJECT_LOCK (tee);
826 g_free (tee->last_message);
827 if (is_list) {
828 tee->last_message =
829 g_strdup_printf ("chain-list ******* (%s:%s)t %p",
830 GST_DEBUG_PAD_NAME (pad), data);
831 } else {
832 tee->last_message =
833 g_strdup_printf ("chain ******* (%s:%s)t (%" G_GSIZE_FORMAT
834 " bytes, %" G_GUINT64_FORMAT ") %p", GST_DEBUG_PAD_NAME (pad),
835 gst_buffer_get_size (data), GST_BUFFER_TIMESTAMP (data), data);
836 }
837 GST_OBJECT_UNLOCK (tee);
838
839 g_object_notify_by_pspec ((GObject *) tee, pspec_last_message);
840 }
841
842 static GstFlowReturn
gst_tee_do_push(GstTee * tee,GstPad * pad,gpointer data,gboolean is_list)843 gst_tee_do_push (GstTee * tee, GstPad * pad, gpointer data, gboolean is_list)
844 {
845 GstFlowReturn res;
846
847 /* Push */
848 if (pad == tee->pull_pad) {
849 /* don't push on the pad we're pulling from */
850 res = GST_FLOW_OK;
851 } else if (is_list) {
852 res =
853 gst_pad_push_list (pad,
854 gst_buffer_list_ref (GST_BUFFER_LIST_CAST (data)));
855 } else {
856 res = gst_pad_push (pad, gst_buffer_ref (GST_BUFFER_CAST (data)));
857 }
858 return res;
859 }
860
861 static void
clear_pads(GstPad * pad,GstTee * tee)862 clear_pads (GstPad * pad, GstTee * tee)
863 {
864 GST_TEE_PAD_CAST (pad)->pushed = FALSE;
865 GST_TEE_PAD_CAST (pad)->result = GST_FLOW_NOT_LINKED;
866 }
867
868 static GstFlowReturn
gst_tee_handle_data(GstTee * tee,gpointer data,gboolean is_list)869 gst_tee_handle_data (GstTee * tee, gpointer data, gboolean is_list)
870 {
871 GList *pads;
872 guint32 cookie;
873 GstFlowReturn ret, cret;
874
875 if (G_UNLIKELY (!tee->silent))
876 gst_tee_do_message (tee, tee->sinkpad, data, is_list);
877
878 GST_OBJECT_LOCK (tee);
879 pads = GST_ELEMENT_CAST (tee)->srcpads;
880
881 /* special case for zero pads */
882 if (G_UNLIKELY (!pads))
883 goto no_pads;
884
885 /* special case for just one pad that avoids reffing the buffer */
886 if (!pads->next) {
887 GstPad *pad = GST_PAD_CAST (pads->data);
888
889 /* Keep another ref around, a pad probe
890 * might release and destroy the pad */
891 gst_object_ref (pad);
892 GST_OBJECT_UNLOCK (tee);
893
894 if (pad == tee->pull_pad) {
895 ret = GST_FLOW_OK;
896 } else if (is_list) {
897 ret = gst_pad_push_list (pad, GST_BUFFER_LIST_CAST (data));
898 } else {
899 ret = gst_pad_push (pad, GST_BUFFER_CAST (data));
900 }
901
902 GST_OBJECT_LOCK (tee);
903 if (GST_TEE_PAD_CAST (pad)->removed)
904 ret = GST_FLOW_NOT_LINKED;
905
906 if (ret == GST_FLOW_NOT_LINKED && tee->allow_not_linked) {
907 ret = GST_FLOW_OK;
908 }
909 GST_OBJECT_UNLOCK (tee);
910
911 gst_object_unref (pad);
912
913 return ret;
914 }
915
916 /* mark all pads as 'not pushed on yet' */
917 g_list_foreach (pads, (GFunc) clear_pads, tee);
918
919 restart:
920 if (tee->allow_not_linked) {
921 cret = GST_FLOW_OK;
922 } else {
923 cret = GST_FLOW_NOT_LINKED;
924 }
925 pads = GST_ELEMENT_CAST (tee)->srcpads;
926 cookie = GST_ELEMENT_CAST (tee)->pads_cookie;
927
928 while (pads) {
929 GstPad *pad;
930
931 pad = GST_PAD_CAST (pads->data);
932
933 if (G_LIKELY (!GST_TEE_PAD_CAST (pad)->pushed)) {
934 /* not yet pushed, release lock and start pushing */
935 gst_object_ref (pad);
936 GST_OBJECT_UNLOCK (tee);
937
938 GST_LOG_OBJECT (pad, "Starting to push %s %p",
939 is_list ? "list" : "buffer", data);
940
941 ret = gst_tee_do_push (tee, pad, data, is_list);
942
943 GST_LOG_OBJECT (pad, "Pushing item %p yielded result %s", data,
944 gst_flow_get_name (ret));
945
946 GST_OBJECT_LOCK (tee);
947 /* keep track of which pad we pushed and the result value */
948 if (GST_TEE_PAD_CAST (pad)->removed)
949 ret = GST_FLOW_NOT_LINKED;
950 GST_TEE_PAD_CAST (pad)->pushed = TRUE;
951 GST_TEE_PAD_CAST (pad)->result = ret;
952 gst_object_unref (pad);
953 pad = NULL;
954 } else {
955 /* already pushed, use previous return value */
956 ret = GST_TEE_PAD_CAST (pad)->result;
957 GST_LOG_OBJECT (pad, "pad already pushed with %s",
958 gst_flow_get_name (ret));
959 }
960
961 /* before we go combining the return value, check if the pad list is still
962 * the same. It could be possible that the pad we just pushed was removed
963 * and the return value it not valid anymore */
964 if (G_UNLIKELY (GST_ELEMENT_CAST (tee)->pads_cookie != cookie)) {
965 GST_LOG_OBJECT (tee, "pad list changed");
966 /* the list of pads changed, restart iteration. Pads that we already
967 * pushed on and are still in the new list, will not be pushed on
968 * again. */
969 goto restart;
970 }
971
972 /* stop pushing more buffers when we have a fatal error */
973 if (G_UNLIKELY (ret != GST_FLOW_OK && ret != GST_FLOW_NOT_LINKED))
974 goto error;
975
976 /* keep all other return values, overwriting the previous one. */
977 if (G_LIKELY (ret != GST_FLOW_NOT_LINKED)) {
978 GST_LOG_OBJECT (tee, "Replacing ret val %d with %d", cret, ret);
979 cret = ret;
980 }
981 pads = g_list_next (pads);
982 }
983 GST_OBJECT_UNLOCK (tee);
984
985 gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
986
987 /* no need to unset gvalue */
988 return cret;
989
990 /* ERRORS */
991 no_pads:
992 {
993 if (tee->allow_not_linked) {
994 GST_DEBUG_OBJECT (tee, "there are no pads, dropping %s",
995 is_list ? "buffer-list" : "buffer");
996 ret = GST_FLOW_OK;
997 } else {
998 GST_DEBUG_OBJECT (tee, "there are no pads, return not-linked");
999 ret = GST_FLOW_NOT_LINKED;
1000 }
1001 goto end;
1002 }
1003 error:
1004 {
1005 GST_DEBUG_OBJECT (tee, "received error %s", gst_flow_get_name (ret));
1006 goto end;
1007 }
1008 end:
1009 {
1010 GST_OBJECT_UNLOCK (tee);
1011 gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
1012 return ret;
1013 }
1014 }
1015
1016 static GstFlowReturn
gst_tee_chain(GstPad * pad,GstObject * parent,GstBuffer * buffer)1017 gst_tee_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
1018 {
1019 GstFlowReturn res;
1020 GstTee *tee;
1021
1022 tee = GST_TEE_CAST (parent);
1023
1024 GST_DEBUG_OBJECT (tee, "received buffer %p", buffer);
1025
1026 res = gst_tee_handle_data (tee, buffer, FALSE);
1027
1028 GST_DEBUG_OBJECT (tee, "handled buffer %s", gst_flow_get_name (res));
1029
1030 return res;
1031 }
1032
1033 static GstFlowReturn
gst_tee_chain_list(GstPad * pad,GstObject * parent,GstBufferList * list)1034 gst_tee_chain_list (GstPad * pad, GstObject * parent, GstBufferList * list)
1035 {
1036 GstFlowReturn res;
1037 GstTee *tee;
1038
1039 tee = GST_TEE_CAST (parent);
1040
1041 GST_DEBUG_OBJECT (tee, "received list %p", list);
1042
1043 res = gst_tee_handle_data (tee, list, TRUE);
1044
1045 GST_DEBUG_OBJECT (tee, "handled list %s", gst_flow_get_name (res));
1046
1047 return res;
1048 }
1049
1050 static gboolean
gst_tee_sink_activate_mode(GstPad * pad,GstObject * parent,GstPadMode mode,gboolean active)1051 gst_tee_sink_activate_mode (GstPad * pad, GstObject * parent, GstPadMode mode,
1052 gboolean active)
1053 {
1054 gboolean res;
1055 GstTee *tee;
1056
1057 tee = GST_TEE (parent);
1058
1059 switch (mode) {
1060 case GST_PAD_MODE_PUSH:
1061 {
1062 GST_OBJECT_LOCK (tee);
1063 tee->sink_mode = active ? mode : GST_PAD_MODE_NONE;
1064
1065 if (active && !tee->has_chain)
1066 goto no_chain;
1067 GST_OBJECT_UNLOCK (tee);
1068 res = TRUE;
1069 break;
1070 }
1071 default:
1072 res = FALSE;
1073 break;
1074 }
1075 return res;
1076
1077 /* ERRORS */
1078 no_chain:
1079 {
1080 GST_OBJECT_UNLOCK (tee);
1081 GST_INFO_OBJECT (tee,
1082 "Tee cannot operate in push mode with has-chain==FALSE");
1083 return FALSE;
1084 }
1085 }
1086
1087 static gboolean
gst_tee_src_activate_mode(GstPad * pad,GstObject * parent,GstPadMode mode,gboolean active)1088 gst_tee_src_activate_mode (GstPad * pad, GstObject * parent, GstPadMode mode,
1089 gboolean active)
1090 {
1091 GstTee *tee;
1092 gboolean res;
1093 GstPad *sinkpad;
1094
1095 tee = GST_TEE (parent);
1096
1097 switch (mode) {
1098 case GST_PAD_MODE_PULL:
1099 {
1100 GST_OBJECT_LOCK (tee);
1101
1102 if (tee->pull_mode == GST_TEE_PULL_MODE_NEVER)
1103 goto cannot_pull;
1104
1105 if (tee->pull_mode == GST_TEE_PULL_MODE_SINGLE && active && tee->pull_pad)
1106 goto cannot_pull_multiple_srcs;
1107
1108 sinkpad = gst_object_ref (tee->sinkpad);
1109
1110 GST_OBJECT_UNLOCK (tee);
1111
1112 res = gst_pad_activate_mode (sinkpad, GST_PAD_MODE_PULL, active);
1113 gst_object_unref (sinkpad);
1114
1115 if (!res)
1116 goto sink_activate_failed;
1117
1118 GST_OBJECT_LOCK (tee);
1119 if (active) {
1120 if (tee->pull_mode == GST_TEE_PULL_MODE_SINGLE)
1121 tee->pull_pad = pad;
1122 } else {
1123 if (pad == tee->pull_pad)
1124 tee->pull_pad = NULL;
1125 }
1126 tee->sink_mode = (active ? GST_PAD_MODE_PULL : GST_PAD_MODE_NONE);
1127 GST_OBJECT_UNLOCK (tee);
1128 break;
1129 }
1130 default:
1131 res = TRUE;
1132 break;
1133 }
1134
1135 return res;
1136
1137 /* ERRORS */
1138 cannot_pull:
1139 {
1140 GST_OBJECT_UNLOCK (tee);
1141 GST_INFO_OBJECT (tee, "Cannot activate in pull mode, pull-mode "
1142 "set to NEVER");
1143 return FALSE;
1144 }
1145 cannot_pull_multiple_srcs:
1146 {
1147 GST_OBJECT_UNLOCK (tee);
1148 GST_INFO_OBJECT (tee, "Cannot activate multiple src pads in pull mode, "
1149 "pull-mode set to SINGLE");
1150 return FALSE;
1151 }
1152 sink_activate_failed:
1153 {
1154 GST_INFO_OBJECT (tee, "Failed to %sactivate sink pad in pull mode",
1155 active ? "" : "de");
1156 return FALSE;
1157 }
1158 }
1159
1160 static gboolean
gst_tee_src_query(GstPad * pad,GstObject * parent,GstQuery * query)1161 gst_tee_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
1162 {
1163 GstTee *tee;
1164 gboolean res;
1165 GstPad *sinkpad;
1166
1167 tee = GST_TEE (parent);
1168
1169 switch (GST_QUERY_TYPE (query)) {
1170 case GST_QUERY_SCHEDULING:
1171 {
1172 gboolean pull_mode;
1173
1174 GST_OBJECT_LOCK (tee);
1175 pull_mode = TRUE;
1176 if (tee->pull_mode == GST_TEE_PULL_MODE_NEVER) {
1177 GST_INFO_OBJECT (tee, "Cannot activate in pull mode, pull-mode "
1178 "set to NEVER");
1179 pull_mode = FALSE;
1180 } else if (tee->pull_mode == GST_TEE_PULL_MODE_SINGLE && tee->pull_pad) {
1181 GST_INFO_OBJECT (tee, "Cannot activate multiple src pads in pull mode, "
1182 "pull-mode set to SINGLE");
1183 pull_mode = FALSE;
1184 }
1185
1186 sinkpad = gst_object_ref (tee->sinkpad);
1187 GST_OBJECT_UNLOCK (tee);
1188
1189 if (pull_mode) {
1190 /* ask peer if we can operate in pull mode */
1191 res = gst_pad_peer_query (sinkpad, query);
1192 } else {
1193 res = TRUE;
1194 }
1195 gst_object_unref (sinkpad);
1196 break;
1197 }
1198 default:
1199 res = gst_pad_query_default (pad, parent, query);
1200 break;
1201 }
1202
1203 return res;
1204 }
1205
1206 static void
gst_tee_push_eos(const GValue * vpad,GstTee * tee)1207 gst_tee_push_eos (const GValue * vpad, GstTee * tee)
1208 {
1209 GstPad *pad = g_value_get_object (vpad);
1210
1211 if (pad != tee->pull_pad)
1212 gst_pad_push_event (pad, gst_event_new_eos ());
1213 }
1214
1215 static void
gst_tee_pull_eos(GstTee * tee)1216 gst_tee_pull_eos (GstTee * tee)
1217 {
1218 GstIterator *iter;
1219
1220 iter = gst_element_iterate_src_pads (GST_ELEMENT (tee));
1221 while (gst_iterator_foreach (iter,
1222 (GstIteratorForeachFunction) gst_tee_push_eos,
1223 tee) == GST_ITERATOR_RESYNC)
1224 gst_iterator_resync (iter);
1225 gst_iterator_free (iter);
1226 }
1227
1228 static GstFlowReturn
gst_tee_src_get_range(GstPad * pad,GstObject * parent,guint64 offset,guint length,GstBuffer ** buf)1229 gst_tee_src_get_range (GstPad * pad, GstObject * parent, guint64 offset,
1230 guint length, GstBuffer ** buf)
1231 {
1232 GstTee *tee;
1233 GstFlowReturn ret;
1234
1235 tee = GST_TEE (parent);
1236
1237 ret = gst_pad_pull_range (tee->sinkpad, offset, length, buf);
1238
1239 if (ret == GST_FLOW_OK)
1240 ret = gst_tee_handle_data (tee, gst_buffer_ref (*buf), FALSE);
1241 else if (ret == GST_FLOW_EOS)
1242 gst_tee_pull_eos (tee);
1243
1244 return ret;
1245 }
1246