1 /* GStreamer
2 * Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
3 * Copyright (C) <2004> Thomas Vander Stichele <thomas at apestaart dot org>
4 * Copyright (C) 2006 Wim Taymans <wim at fluendo dot com>
5 * Copyright (C) <2011> Collabora Ltd.
6 * Author: Sebastian Dröge <sebastian.droege@collabora.co.uk>
7 *
8 * This library is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Library General Public
10 * License as published by the Free Software Foundation; either
11 * version 2 of the License, or (at your option) any later version.
12 *
13 * This library is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 * Library General Public License for more details.
17 *
18 * You should have received a copy of the GNU Library General Public
19 * License along with this library; if not, write to the
20 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
21 * Boston, MA 02110-1301, USA.
22 */
23
24
25 #ifdef HAVE_CONFIG_H
26 #include "config.h"
27 #endif
28
29 #include <gst/gst-i18n-plugin.h>
30
31 #include "gstmultihandlesink.h"
32
33 #ifdef HAVE_SYS_SOCKET_H
34 #include <sys/socket.h>
35 #endif
36
37 #ifndef G_OS_WIN32
38 #include <netinet/in.h>
39 #endif
40
41 #include <string.h>
42
43 #define NOT_IMPLEMENTED 0
44
45 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
46 GST_PAD_SINK,
47 GST_PAD_ALWAYS,
48 GST_STATIC_CAPS_ANY);
49
50 GST_DEBUG_CATEGORY_STATIC (multihandlesink_debug);
51 #define GST_CAT_DEFAULT (multihandlesink_debug)
52
/* Signal identifiers of GstMultiHandleSink. LAST_SIGNAL is the number of
 * entries and sizes the gst_multi_handle_sink_signals[] table. */
enum
{
  GST_MULTI_SINK_LAST_SIGNAL,

  /* action signals ("methods") */
  SIGNAL_ADD,
  SIGNAL_ADD_BURST,
  SIGNAL_CLEAR,

  /* notification signals */
  SIGNAL_CLIENT_ADDED,
  SIGNAL_CLIENT_REMOVED,
  SIGNAL_CLIENT_SOCKET_REMOVED,

  LAST_SIGNAL
};
70
71
/* Default property values. The concrete numbers are arbitrary choices. */

/* queue limits expressed in buffers */
#define DEFAULT_BUFFERS_MAX             -1
#define DEFAULT_BUFFERS_SOFT_MAX        -1

/* minimum amount of data to keep queued */
#define DEFAULT_TIME_MIN                -1
#define DEFAULT_BYTES_MIN               -1
#define DEFAULT_BUFFERS_MIN             -1

/* queue limits expressed in unit-format */
#define DEFAULT_UNIT_FORMAT             GST_FORMAT_BUFFERS
#define DEFAULT_UNITS_MAX               -1
#define DEFAULT_UNITS_SOFT_MAX          -1

/* client handling */
#define DEFAULT_RECOVER_POLICY          GST_RECOVER_POLICY_NONE
#define DEFAULT_TIMEOUT                 0
#define DEFAULT_SYNC_METHOD             GST_SYNC_METHOD_LATEST

/* burst settings applied to newly added clients */
#define DEFAULT_BURST_FORMAT            GST_FORMAT_UNDEFINED
#define DEFAULT_BURST_VALUE             0

/* a negative DSCP leaves the socket untouched */
#define DEFAULT_QOS_DSCP                -1

#define DEFAULT_RESEND_STREAMHEADER     TRUE
91
/* Property identifiers handed to g_object_class_install_property() in
 * class_init. The order defines the numeric ids and must not change. */
enum
{
  PROP_0,

  /* current queue fill level */
  PROP_BUFFERS_QUEUED,
  PROP_BYTES_QUEUED,
  PROP_TIME_QUEUED,

  /* queue limits in unit-format */
  PROP_UNIT_FORMAT,
  PROP_UNITS_MAX,
  PROP_UNITS_SOFT_MAX,

  /* queue limits in buffers */
  PROP_BUFFERS_MAX,
  PROP_BUFFERS_SOFT_MAX,

  /* minimum amount of data to keep queued */
  PROP_TIME_MIN,
  PROP_BYTES_MIN,
  PROP_BUFFERS_MIN,

  /* client handling and statistics */
  PROP_RECOVER_POLICY,
  PROP_TIMEOUT,
  PROP_SYNC_METHOD,
  PROP_BYTES_TO_SERVE,
  PROP_BYTES_SERVED,

  /* default burst settings */
  PROP_BURST_FORMAT,
  PROP_BURST_VALUE,

  PROP_QOS_DSCP,

  PROP_RESEND_STREAMHEADER,

  PROP_NUM_HANDLES
};
125
126 GType
gst_multi_handle_sink_recover_policy_get_type(void)127 gst_multi_handle_sink_recover_policy_get_type (void)
128 {
129 static GType recover_policy_type = 0;
130 static const GEnumValue recover_policy[] = {
131 {GST_RECOVER_POLICY_NONE,
132 "Do not try to recover", "none"},
133 {GST_RECOVER_POLICY_RESYNC_LATEST,
134 "Resync client to latest buffer", "latest"},
135 {GST_RECOVER_POLICY_RESYNC_SOFT_LIMIT,
136 "Resync client to soft limit", "soft-limit"},
137 {GST_RECOVER_POLICY_RESYNC_KEYFRAME,
138 "Resync client to most recent keyframe", "keyframe"},
139 {0, NULL, NULL},
140 };
141
142 if (!recover_policy_type) {
143 recover_policy_type =
144 g_enum_register_static ("GstMultiHandleSinkRecoverPolicy",
145 recover_policy);
146 }
147 return recover_policy_type;
148 }
149
150 GType
gst_multi_handle_sink_sync_method_get_type(void)151 gst_multi_handle_sink_sync_method_get_type (void)
152 {
153 static GType sync_method_type = 0;
154 static const GEnumValue sync_method[] = {
155 {GST_SYNC_METHOD_LATEST,
156 "Serve starting from the latest buffer", "latest"},
157 {GST_SYNC_METHOD_NEXT_KEYFRAME,
158 "Serve starting from the next keyframe", "next-keyframe"},
159 {GST_SYNC_METHOD_LATEST_KEYFRAME,
160 "Serve everything since the latest keyframe (burst)",
161 "latest-keyframe"},
162 {GST_SYNC_METHOD_BURST, "Serve burst-value data to client", "burst"},
163 {GST_SYNC_METHOD_BURST_KEYFRAME,
164 "Serve burst-value data starting on a keyframe",
165 "burst-keyframe"},
166 {GST_SYNC_METHOD_BURST_WITH_KEYFRAME,
167 "Serve burst-value data preferably starting on a keyframe",
168 "burst-with-keyframe"},
169 {0, NULL, NULL},
170 };
171
172 if (!sync_method_type) {
173 sync_method_type =
174 g_enum_register_static ("GstMultiHandleSinkSyncMethod", sync_method);
175 }
176 return sync_method_type;
177 }
178
179 GType
gst_multi_handle_sink_client_status_get_type(void)180 gst_multi_handle_sink_client_status_get_type (void)
181 {
182 static GType client_status_type = 0;
183 static const GEnumValue client_status[] = {
184 {GST_CLIENT_STATUS_OK, "ok", "ok"},
185 {GST_CLIENT_STATUS_CLOSED, "Closed", "closed"},
186 {GST_CLIENT_STATUS_REMOVED, "Removed", "removed"},
187 {GST_CLIENT_STATUS_SLOW, "Too slow", "slow"},
188 {GST_CLIENT_STATUS_ERROR, "Error", "error"},
189 {GST_CLIENT_STATUS_DUPLICATE, "Duplicate", "duplicate"},
190 {GST_CLIENT_STATUS_FLUSHING, "Flushing", "flushing"},
191 {0, NULL, NULL},
192 };
193
194 if (!client_status_type) {
195 client_status_type =
196 g_enum_register_static ("GstMultiHandleSinkClientStatus",
197 client_status);
198 }
199 return client_status_type;
200 }
201
202 static void gst_multi_handle_sink_finalize (GObject * object);
203 static void gst_multi_handle_sink_clear (GstMultiHandleSink * mhsink);
204
205 static GstFlowReturn gst_multi_handle_sink_render (GstBaseSink * bsink,
206 GstBuffer * buf);
207 static void gst_multi_handle_sink_queue_buffer (GstMultiHandleSink * mhsink,
208 GstBuffer * buffer);
209 static gboolean gst_multi_handle_sink_client_queue_buffer (GstMultiHandleSink *
210 mhsink, GstMultiHandleClient * mhclient, GstBuffer * buffer);
211 static GstStateChangeReturn gst_multi_handle_sink_change_state (GstElement *
212 element, GstStateChange transition);
213
214 static void gst_multi_handle_sink_set_property (GObject * object, guint prop_id,
215 const GValue * value, GParamSpec * pspec);
216 static void gst_multi_handle_sink_get_property (GObject * object, guint prop_id,
217 GValue * value, GParamSpec * pspec);
218
219 #define gst_multi_handle_sink_parent_class parent_class
220 G_DEFINE_TYPE (GstMultiHandleSink, gst_multi_handle_sink, GST_TYPE_BASE_SINK);
221
222 static guint gst_multi_handle_sink_signals[LAST_SIGNAL] = { 0 };
223
224 static gint
225 find_syncframe (GstMultiHandleSink * sink, gint idx, gint direction);
226 #define find_next_syncframe(s,i) find_syncframe(s,i,1)
227 #define find_prev_syncframe(s,i) find_syncframe(s,i,-1)
228 static gboolean is_sync_frame (GstMultiHandleSink * sink, GstBuffer * buffer);
229 static gboolean gst_multi_handle_sink_stop (GstBaseSink * bsink);
230 static gboolean gst_multi_handle_sink_start (GstBaseSink * bsink);
231 static gint get_buffers_max (GstMultiHandleSink * sink, gint64 max);
232 static gint
233 gst_multi_handle_sink_recover_client (GstMultiHandleSink * sink,
234 GstMultiHandleClient * client);
235 static void gst_multi_handle_sink_setup_dscp (GstMultiHandleSink * mhsink);
236 static gboolean
237 find_limits (GstMultiHandleSink * sink,
238 gint * min_idx, gint bytes_min, gint buffers_min, gint64 time_min,
239 gint * max_idx, gint bytes_max, gint buffers_max, gint64 time_max);
240
241
242 static void
gst_multi_handle_sink_class_init(GstMultiHandleSinkClass * klass)243 gst_multi_handle_sink_class_init (GstMultiHandleSinkClass * klass)
244 {
245 GObjectClass *gobject_class;
246 GstElementClass *gstelement_class;
247 GstBaseSinkClass *gstbasesink_class;
248
249 gobject_class = (GObjectClass *) klass;
250 gstelement_class = (GstElementClass *) klass;
251 gstbasesink_class = (GstBaseSinkClass *) klass;
252
253 gobject_class->set_property = gst_multi_handle_sink_set_property;
254 gobject_class->get_property = gst_multi_handle_sink_get_property;
255 gobject_class->finalize = gst_multi_handle_sink_finalize;
256
257 g_object_class_install_property (gobject_class, PROP_BUFFERS_MAX,
258 g_param_spec_int ("buffers-max", "Buffers max",
259 "max number of buffers to queue for a client (-1 = no limit)", -1,
260 G_MAXINT, DEFAULT_BUFFERS_MAX,
261 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
262 g_object_class_install_property (gobject_class, PROP_BUFFERS_SOFT_MAX,
263 g_param_spec_int ("buffers-soft-max", "Buffers soft max",
264 "Recover client when going over this limit (-1 = no limit)", -1,
265 G_MAXINT, DEFAULT_BUFFERS_SOFT_MAX,
266 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
267
268 g_object_class_install_property (gobject_class, PROP_BYTES_MIN,
269 g_param_spec_int ("bytes-min", "Bytes min",
270 "min number of bytes to queue (-1 = as little as possible)", -1,
271 G_MAXINT, DEFAULT_BYTES_MIN,
272 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
273 g_object_class_install_property (gobject_class, PROP_TIME_MIN,
274 g_param_spec_int64 ("time-min", "Time min",
275 "min amount of time to queue (in nanoseconds) "
276 "(-1 = as little as possible)", -1, G_MAXINT64, DEFAULT_TIME_MIN,
277 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
278 g_object_class_install_property (gobject_class, PROP_BUFFERS_MIN,
279 g_param_spec_int ("buffers-min", "Buffers min",
280 "min number of buffers to queue (-1 = as few as possible)", -1,
281 G_MAXINT, DEFAULT_BUFFERS_MIN,
282 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
283
284 g_object_class_install_property (gobject_class, PROP_UNIT_FORMAT,
285 g_param_spec_enum ("unit-format", "Units format",
286 "The unit to measure the max/soft-max/queued properties",
287 GST_TYPE_FORMAT, DEFAULT_UNIT_FORMAT,
288 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
289 g_object_class_install_property (gobject_class, PROP_UNITS_MAX,
290 g_param_spec_int64 ("units-max", "Units max",
291 "max number of units to queue (-1 = no limit)", -1, G_MAXINT64,
292 DEFAULT_UNITS_MAX, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
293 g_object_class_install_property (gobject_class, PROP_UNITS_SOFT_MAX,
294 g_param_spec_int64 ("units-soft-max", "Units soft max",
295 "Recover client when going over this limit (-1 = no limit)", -1,
296 G_MAXINT64, DEFAULT_UNITS_SOFT_MAX,
297 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
298
299 g_object_class_install_property (gobject_class, PROP_BUFFERS_QUEUED,
300 g_param_spec_uint ("buffers-queued", "Buffers queued",
301 "Number of buffers currently queued", 0, G_MAXUINT, 0,
302 G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
303 #if NOT_IMPLEMENTED
304 g_object_class_install_property (gobject_class, PROP_BYTES_QUEUED,
305 g_param_spec_uint ("bytes-queued", "Bytes queued",
306 "Number of bytes currently queued", 0, G_MAXUINT, 0,
307 G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
308 g_object_class_install_property (gobject_class, PROP_TIME_QUEUED,
309 g_param_spec_uint64 ("time-queued", "Time queued",
310 "Amount of time currently queued (in nanoseconds)", 0, G_MAXUINT64, 0,
311 G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
312 #endif
313
314 g_object_class_install_property (gobject_class, PROP_RECOVER_POLICY,
315 g_param_spec_enum ("recover-policy", "Recover Policy",
316 "How to recover when client reaches the soft max",
317 GST_TYPE_RECOVER_POLICY, DEFAULT_RECOVER_POLICY,
318 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
319 g_object_class_install_property (gobject_class, PROP_TIMEOUT,
320 g_param_spec_uint64 ("timeout", "Timeout",
321 "Maximum inactivity timeout in nanoseconds for a client (0 = no limit)",
322 0, G_MAXUINT64, DEFAULT_TIMEOUT,
323 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
324 g_object_class_install_property (gobject_class, PROP_SYNC_METHOD,
325 g_param_spec_enum ("sync-method", "Sync Method",
326 "How to sync new clients to the stream", GST_TYPE_SYNC_METHOD,
327 DEFAULT_SYNC_METHOD, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
328 g_object_class_install_property (gobject_class, PROP_BYTES_TO_SERVE,
329 g_param_spec_uint64 ("bytes-to-serve", "Bytes to serve",
330 "Number of bytes received to serve to clients", 0, G_MAXUINT64, 0,
331 G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
332 g_object_class_install_property (gobject_class, PROP_BYTES_SERVED,
333 g_param_spec_uint64 ("bytes-served", "Bytes served",
334 "Total number of bytes send to all clients", 0, G_MAXUINT64, 0,
335 G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
336
337 g_object_class_install_property (gobject_class, PROP_BURST_FORMAT,
338 g_param_spec_enum ("burst-format", "Burst format",
339 "The format of the burst units (when sync-method is burst[[-with]-keyframe])",
340 GST_TYPE_FORMAT, DEFAULT_BURST_FORMAT,
341 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
342 g_object_class_install_property (gobject_class, PROP_BURST_VALUE,
343 g_param_spec_uint64 ("burst-value", "Burst value",
344 "The amount of burst expressed in burst-format", 0, G_MAXUINT64,
345 DEFAULT_BURST_VALUE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
346
347 g_object_class_install_property (gobject_class, PROP_QOS_DSCP,
348 g_param_spec_int ("qos-dscp", "QoS diff srv code point",
349 "Quality of Service, differentiated services code point (-1 default)",
350 -1, 63, DEFAULT_QOS_DSCP,
351 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
352
353 /**
354 * GstMultiHandleSink::resend-streamheader
355 *
356 * Resend the streamheaders to existing clients when they change.
357 */
358 g_object_class_install_property (gobject_class, PROP_RESEND_STREAMHEADER,
359 g_param_spec_boolean ("resend-streamheader", "Resend streamheader",
360 "Resend the streamheader if it changes in the caps",
361 DEFAULT_RESEND_STREAMHEADER,
362 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
363
364 g_object_class_install_property (gobject_class, PROP_NUM_HANDLES,
365 g_param_spec_uint ("num-handles", "Number of handles",
366 "The current number of client handles",
367 0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
368
369 /**
370 * GstMultiHandleSink::clear:
371 * @gstmultihandlesink: the multihandlesink element to emit this signal on
372 *
373 * Remove all sockets from multihandlesink. Since multihandlesink did not
374 * open sockets itself, it does not explicitly close the sockets. The application
375 * should do so by connecting to the client-socket-removed callback.
376 */
377 gst_multi_handle_sink_signals[SIGNAL_CLEAR] =
378 g_signal_new ("clear", G_TYPE_FROM_CLASS (klass),
379 G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
380 G_STRUCT_OFFSET (GstMultiHandleSinkClass, clear), NULL, NULL,
381 NULL, G_TYPE_NONE, 0);
382
383 gst_element_class_add_static_pad_template (gstelement_class, &sinktemplate);
384
385 gst_element_class_set_static_metadata (gstelement_class,
386 "Multi socket sink", "Sink/Network",
387 "Send data to multiple sockets",
388 "Thomas Vander Stichele <thomas at apestaart dot org>, "
389 "Wim Taymans <wim@fluendo.com>, "
390 "Sebastian Dröge <sebastian.droege@collabora.co.uk>");
391
392 gstelement_class->change_state =
393 GST_DEBUG_FUNCPTR (gst_multi_handle_sink_change_state);
394
395 gstbasesink_class->render = GST_DEBUG_FUNCPTR (gst_multi_handle_sink_render);
396 klass->client_queue_buffer =
397 GST_DEBUG_FUNCPTR (gst_multi_handle_sink_client_queue_buffer);
398
399 #if 0
400 klass->add = GST_DEBUG_FUNCPTR (gst_multi_handle_sink_add);
401 klass->add_full = GST_DEBUG_FUNCPTR (gst_multi_handle_sink_add_full);
402 klass->remove = GST_DEBUG_FUNCPTR (gst_multi_handle_sink_remove);
403 klass->remove_flush = GST_DEBUG_FUNCPTR (gst_multi_handle_sink_remove_flush);
404 #endif
405
406 klass->clear = GST_DEBUG_FUNCPTR (gst_multi_handle_sink_clear);
407
408 GST_DEBUG_CATEGORY_INIT (multihandlesink_debug, "multihandlesink", 0,
409 "Multi socket sink");
410
411 gst_type_mark_as_plugin_api (GST_TYPE_RECOVER_POLICY, 0);
412 gst_type_mark_as_plugin_api (GST_TYPE_SYNC_METHOD, 0);
413 gst_type_mark_as_plugin_api (GST_TYPE_CLIENT_STATUS, 0);
414 gst_type_mark_as_plugin_api (GST_TYPE_MULTI_HANDLE_SINK, 0);
415 }
416
417 static void
gst_multi_handle_sink_init(GstMultiHandleSink * this)418 gst_multi_handle_sink_init (GstMultiHandleSink * this)
419 {
420 GST_OBJECT_FLAG_UNSET (this, GST_MULTI_HANDLE_SINK_OPEN);
421
422 CLIENTS_LOCK_INIT (this);
423 this->clients = NULL;
424
425 this->bufqueue = g_array_new (FALSE, TRUE, sizeof (GstBuffer *));
426 this->unit_format = DEFAULT_UNIT_FORMAT;
427 this->units_max = DEFAULT_UNITS_MAX;
428 this->units_soft_max = DEFAULT_UNITS_SOFT_MAX;
429 this->time_min = DEFAULT_TIME_MIN;
430 this->bytes_min = DEFAULT_BYTES_MIN;
431 this->buffers_min = DEFAULT_BUFFERS_MIN;
432 this->recover_policy = DEFAULT_RECOVER_POLICY;
433
434 this->timeout = DEFAULT_TIMEOUT;
435 this->def_sync_method = DEFAULT_SYNC_METHOD;
436
437 this->def_burst_format = DEFAULT_BURST_FORMAT;
438 this->def_burst_value = DEFAULT_BURST_VALUE;
439
440 this->qos_dscp = DEFAULT_QOS_DSCP;
441
442 this->resend_streamheader = DEFAULT_RESEND_STREAMHEADER;
443 }
444
445 static void
gst_multi_handle_sink_finalize(GObject * object)446 gst_multi_handle_sink_finalize (GObject * object)
447 {
448 GstMultiHandleSink *this;
449
450 this = GST_MULTI_HANDLE_SINK (object);
451
452 CLIENTS_LOCK_CLEAR (this);
453 g_array_free (this->bufqueue, TRUE);
454 g_hash_table_destroy (this->handle_hash);
455
456 G_OBJECT_CLASS (parent_class)->finalize (object);
457 }
458
459 gint
gst_multi_handle_sink_setup_dscp_client(GstMultiHandleSink * sink,GstMultiHandleClient * client)460 gst_multi_handle_sink_setup_dscp_client (GstMultiHandleSink * sink,
461 GstMultiHandleClient * client)
462 {
463 #if !defined(IP_TOS) || !defined(HAVE_SYS_SOCKET_H)
464 return 0;
465 #else
466 gint tos;
467 gint ret;
468 int fd;
469 union gst_sockaddr
470 {
471 struct sockaddr sa;
472 struct sockaddr_in6 sa_in6;
473 struct sockaddr_storage sa_stor;
474 } sa;
475 socklen_t slen = sizeof (sa);
476 gint af;
477 GstMultiHandleSinkClass *mhsinkclass = GST_MULTI_HANDLE_SINK_GET_CLASS (sink);
478
479 /* don't touch */
480 if (sink->qos_dscp < 0)
481 return 0;
482
483 fd = mhsinkclass->client_get_fd (client);
484
485 if ((ret = getsockname (fd, &sa.sa, &slen)) < 0) {
486 GST_DEBUG_OBJECT (sink, "could not get sockname: %s", g_strerror (errno));
487 return ret;
488 }
489
490 af = sa.sa.sa_family;
491
492 /* if this is an IPv4-mapped address then do IPv4 QoS */
493 if (af == AF_INET6) {
494
495 GST_DEBUG_OBJECT (sink, "check IP6 socket");
496 if (IN6_IS_ADDR_V4MAPPED (&(sa.sa_in6.sin6_addr))) {
497 GST_DEBUG_OBJECT (sink, "mapped to IPV4");
498 af = AF_INET;
499 }
500 }
501
502 /* extract and shift 6 bits of the DSCP */
503 tos = (sink->qos_dscp & 0x3f) << 2;
504
505 switch (af) {
506 case AF_INET:
507 ret = setsockopt (fd, IPPROTO_IP, IP_TOS, &tos, sizeof (tos));
508 break;
509 case AF_INET6:
510 #ifdef IPV6_TCLASS
511 ret = setsockopt (fd, IPPROTO_IPV6, IPV6_TCLASS, &tos, sizeof (tos));
512 break;
513 #endif
514 default:
515 ret = 0;
516 GST_ERROR_OBJECT (sink, "unsupported AF");
517 break;
518 }
519 if (ret)
520 GST_DEBUG_OBJECT (sink, "could not set DSCP: %s", g_strerror (errno));
521
522 return ret;
523 #endif
524 }
525
526 void
gst_multi_handle_sink_client_init(GstMultiHandleClient * client,GstSyncMethod sync_method)527 gst_multi_handle_sink_client_init (GstMultiHandleClient * client,
528 GstSyncMethod sync_method)
529 {
530 client->status = GST_CLIENT_STATUS_OK;
531 client->bufpos = -1;
532 client->flushcount = -1;
533 client->bufoffset = 0;
534 client->sending = NULL;
535 client->bytes_sent = 0;
536 client->dropped_buffers = 0;
537 client->avg_queue_size = 0;
538 client->first_buffer_ts = GST_CLOCK_TIME_NONE;
539 client->last_buffer_ts = GST_CLOCK_TIME_NONE;
540 client->new_connection = TRUE;
541 client->sync_method = sync_method;
542 client->currently_removing = FALSE;
543
544 /* update start time */
545 client->connect_time = g_get_real_time () * GST_USECOND;
546 client->connect_time_monotonic = g_get_monotonic_time () * GST_USECOND;
547 client->disconnect_time = 0;
548 client->disconnect_time_monotonic = 0;
549 /* set last activity time to connect time */
550 client->last_activity_time = client->connect_time;
551 client->last_activity_time_monotonic = client->connect_time_monotonic;
552 }
553
554 static void
gst_multi_handle_sink_setup_dscp(GstMultiHandleSink * mhsink)555 gst_multi_handle_sink_setup_dscp (GstMultiHandleSink * mhsink)
556 {
557 GList *clients;
558
559 CLIENTS_LOCK (mhsink);
560 for (clients = mhsink->clients; clients; clients = clients->next) {
561 GstMultiHandleClient *client;
562
563 client = clients->data;
564
565 gst_multi_handle_sink_setup_dscp_client (mhsink, client);
566 }
567 CLIENTS_UNLOCK (mhsink);
568 }
569
570 void
gst_multi_handle_sink_add_full(GstMultiHandleSink * sink,GstMultiSinkHandle handle,GstSyncMethod sync_method,GstFormat min_format,guint64 min_value,GstFormat max_format,guint64 max_value)571 gst_multi_handle_sink_add_full (GstMultiHandleSink * sink,
572 GstMultiSinkHandle handle, GstSyncMethod sync_method, GstFormat min_format,
573 guint64 min_value, GstFormat max_format, guint64 max_value)
574 {
575 GstMultiHandleClient *mhclient;
576 GList *clink;
577 GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
578 gchar debug[30];
579 GstMultiHandleSinkClass *mhsinkclass =
580 GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink);
581
582 if (!sink->running) {
583 g_warning ("Element %s must be set to READY, PAUSED or PLAYING state "
584 "before clients can be added", GST_OBJECT_NAME (sink));
585 return;
586 }
587
588 mhsinkclass->handle_debug (handle, debug);
589 GST_DEBUG_OBJECT (sink, "%s adding client, sync_method %d, "
590 "min_format %d, min_value %" G_GUINT64_FORMAT
591 ", max_format %d, max_value %" G_GUINT64_FORMAT, debug,
592 sync_method, min_format, min_value, max_format, max_value);
593
594 /* do limits check if we can */
595 if (min_format == max_format) {
596 if (max_value != -1 && min_value != -1 && max_value < min_value)
597 goto wrong_limits;
598 }
599
600 CLIENTS_LOCK (sink);
601
602 /* check the hash to find a duplicate handle */
603 clink = g_hash_table_lookup (mhsink->handle_hash,
604 mhsinkclass->handle_hash_key (handle));
605 if (clink != NULL)
606 goto duplicate;
607
608 /* We do not take ownership of @handle in this function, but we can't take a
609 * reference directly as we don't know the concrete type of the handle.
610 * GstMultiHandleSink relies on the derived class to take a reference for us
611 * in new_client: */
612 mhclient = mhsinkclass->new_client (mhsink, handle, sync_method);
613
614 /* we can add the handle now */
615 clink = mhsink->clients = g_list_prepend (mhsink->clients, mhclient);
616 g_hash_table_insert (mhsink->handle_hash,
617 mhsinkclass->handle_hash_key (mhclient->handle), clink);
618 mhsink->clients_cookie++;
619
620
621 mhclient->burst_min_format = min_format;
622 mhclient->burst_min_value = min_value;
623 mhclient->burst_max_format = max_format;
624 mhclient->burst_max_value = max_value;
625
626 if (mhsinkclass->hash_changed)
627 mhsinkclass->hash_changed (mhsink);
628
629 CLIENTS_UNLOCK (sink);
630
631 mhsinkclass->emit_client_added (mhsink, handle);
632
633 return;
634
635 /* errors */
636 wrong_limits:
637 {
638 GST_WARNING_OBJECT (sink,
639 "%s wrong values min =%" G_GUINT64_FORMAT ", max=%"
640 G_GUINT64_FORMAT ", unit %d specified when adding client",
641 debug, min_value, max_value, min_format);
642 return;
643 }
644 duplicate:
645 {
646 CLIENTS_UNLOCK (sink);
647 GST_WARNING_OBJECT (sink, "%s duplicate client found, refusing", debug);
648 mhsinkclass->emit_client_removed (mhsink, handle,
649 GST_CLIENT_STATUS_DUPLICATE);
650 return;
651 }
652 }
653
654 /* "add" signal implementation */
655 void
gst_multi_handle_sink_add(GstMultiHandleSink * sink,GstMultiSinkHandle handle)656 gst_multi_handle_sink_add (GstMultiHandleSink * sink, GstMultiSinkHandle handle)
657 {
658 gst_multi_handle_sink_add_full (sink, handle, sink->def_sync_method,
659 sink->def_burst_format, sink->def_burst_value, sink->def_burst_format,
660 -1);
661 }
662
663 /* "remove" signal implementation */
664 void
gst_multi_handle_sink_remove(GstMultiHandleSink * sink,GstMultiSinkHandle handle)665 gst_multi_handle_sink_remove (GstMultiHandleSink * sink,
666 GstMultiSinkHandle handle)
667 {
668 GList *clink;
669 GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
670 GstMultiHandleSinkClass *mhsinkclass =
671 GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink);
672 gchar debug[30];
673
674 mhsinkclass->handle_debug (handle, debug);
675
676 GST_DEBUG_OBJECT (sink, "%s removing client", debug);
677
678 CLIENTS_LOCK (sink);
679 clink = g_hash_table_lookup (mhsink->handle_hash,
680 mhsinkclass->handle_hash_key (handle));
681 if (clink != NULL) {
682 GstMultiHandleClient *mhclient = (GstMultiHandleClient *) clink->data;
683
684 if (mhclient->status != GST_CLIENT_STATUS_OK) {
685 GST_INFO_OBJECT (sink,
686 "%s Client already disconnecting with status %d",
687 debug, mhclient->status);
688 goto done;
689 }
690
691 mhclient->status = GST_CLIENT_STATUS_REMOVED;
692 gst_multi_handle_sink_remove_client_link (GST_MULTI_HANDLE_SINK (sink),
693 clink);
694 if (mhsinkclass->hash_changed)
695 mhsinkclass->hash_changed (mhsink);
696 } else {
697 GST_WARNING_OBJECT (sink, "%s no client with this handle found!", debug);
698 }
699
700 done:
701 CLIENTS_UNLOCK (sink);
702 }
703
704 /* "remove-flush" signal implementation */
705 void
gst_multi_handle_sink_remove_flush(GstMultiHandleSink * sink,GstMultiSinkHandle handle)706 gst_multi_handle_sink_remove_flush (GstMultiHandleSink * sink,
707 GstMultiSinkHandle handle)
708 {
709 GList *clink;
710 GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
711 GstMultiHandleSinkClass *mhsinkclass =
712 GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink);
713 gchar debug[30];
714
715 mhsinkclass->handle_debug (handle, debug);
716
717 GST_DEBUG_OBJECT (sink, "%s flushing client", debug);
718
719 CLIENTS_LOCK (sink);
720 clink = g_hash_table_lookup (mhsink->handle_hash,
721 mhsinkclass->handle_hash_key (handle));
722 if (clink != NULL) {
723 GstMultiHandleClient *mhclient = (GstMultiHandleClient *) clink->data;
724
725 if (mhclient->status != GST_CLIENT_STATUS_OK) {
726 GST_INFO_OBJECT (sink,
727 "%s Client already disconnecting with status %d",
728 mhclient->debug, mhclient->status);
729 goto done;
730 }
731
732 /* take the position of the client as the number of buffers left to flush.
733 * If the client was at position -1, we flush 0 buffers, 0 == flush 1
734 * buffer, etc... */
735 mhclient->flushcount = mhclient->bufpos + 1;
736 /* mark client as flushing. We can not remove the client right away because
737 * it might have some buffers to flush in the ->sending queue. */
738 mhclient->status = GST_CLIENT_STATUS_FLUSHING;
739 } else {
740 GST_WARNING_OBJECT (sink, "%s no client with this handle found!", debug);
741 }
742 done:
743 CLIENTS_UNLOCK (sink);
744 }
745
746 /* can be called both through the signal (i.e. from any thread) or when
747 * stopping, after the writing thread has shut down */
748 static void
gst_multi_handle_sink_clear(GstMultiHandleSink * mhsink)749 gst_multi_handle_sink_clear (GstMultiHandleSink * mhsink)
750 {
751 GList *clients, *next;
752 guint32 cookie;
753 GstMultiHandleSinkClass *mhsinkclass =
754 GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink);
755
756 GST_DEBUG_OBJECT (mhsink, "clearing all clients");
757
758 CLIENTS_LOCK (mhsink);
759 restart:
760 cookie = mhsink->clients_cookie;
761 for (clients = mhsink->clients; clients; clients = next) {
762 GstMultiHandleClient *mhclient;
763
764 if (cookie != mhsink->clients_cookie) {
765 GST_DEBUG_OBJECT (mhsink, "cookie changed while removing all clients");
766 goto restart;
767 }
768
769 mhclient = (GstMultiHandleClient *) clients->data;
770 next = g_list_next (clients);
771
772 mhclient->status = GST_CLIENT_STATUS_REMOVED;
773 /* the next call changes the list, which is why we iterate
774 * with a temporary next pointer */
775 gst_multi_handle_sink_remove_client_link (mhsink, clients);
776 }
777 if (mhsinkclass->hash_changed)
778 mhsinkclass->hash_changed (mhsink);
779
780 CLIENTS_UNLOCK (mhsink);
781 }
782
783
784 /* "get-stats" signal implementation
785 */
786 GstStructure *
gst_multi_handle_sink_get_stats(GstMultiHandleSink * sink,GstMultiSinkHandle handle)787 gst_multi_handle_sink_get_stats (GstMultiHandleSink * sink,
788 GstMultiSinkHandle handle)
789 {
790 GstMultiHandleClient *client;
791 GstStructure *result = NULL;
792 GList *clink;
793 GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
794 GstMultiHandleSinkClass *mhsinkclass =
795 GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink);
796 gchar debug[30];
797
798 mhsinkclass->handle_debug (handle, debug);
799
800 CLIENTS_LOCK (sink);
801 clink = g_hash_table_lookup (mhsink->handle_hash,
802 mhsinkclass->handle_hash_key (handle));
803 if (clink == NULL)
804 goto noclient;
805
806 client = clink->data;
807 if (client != NULL) {
808 GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
809 guint64 interval;
810
811 result = gst_structure_new_empty ("multihandlesink-stats");
812
813 if (mhclient->disconnect_time_monotonic == 0) {
814 interval =
815 (g_get_monotonic_time () * GST_USECOND) -
816 mhclient->connect_time_monotonic;
817 } else {
818 interval =
819 mhclient->disconnect_time_monotonic -
820 mhclient->connect_time_monotonic;
821 }
822
823 gst_structure_set (result,
824 "bytes-sent", G_TYPE_UINT64, mhclient->bytes_sent,
825 "connect-time", G_TYPE_UINT64, mhclient->connect_time,
826 "connect-time-monotonic", G_TYPE_UINT64,
827 mhclient->connect_time_monotonic, "disconnect-time", G_TYPE_UINT64,
828 mhclient->disconnect_time, "disconnect-time-monotonic", G_TYPE_UINT64,
829 mhclient->disconnect_time_monotonic, "connect-duration", G_TYPE_UINT64,
830 interval, "last-activity-time-monotonic", G_TYPE_UINT64,
831 mhclient->last_activity_time_monotonic, "buffers-dropped",
832 G_TYPE_UINT64, mhclient->dropped_buffers, "first-buffer-ts",
833 G_TYPE_UINT64, mhclient->first_buffer_ts, "last-buffer-ts",
834 G_TYPE_UINT64, mhclient->last_buffer_ts, NULL);
835 }
836
837 noclient:
838 CLIENTS_UNLOCK (sink);
839
840 /* python doesn't like a NULL pointer yet */
841 if (result == NULL) {
842 GST_WARNING_OBJECT (sink, "%s no client with this found!", debug);
843 result = gst_structure_new_empty ("multihandlesink-stats");
844 }
845
846 return result;
847 }
848
/* Remove the client stored in @link from @sink and release what the sink
 * holds for it (pending buffers, caps, hash entry, list entry, the client
 * structure itself).
 *
 * Should be called with the clientslock held. The lock is released while the
 * client-removed signal is emitted and again while the client is freed, and
 * it is taken again before this function returns, so the client list can
 * change while this function runs.
 *
 * Note that we don't close the fd as we didn't open it in the first
 * place. An application should connect to the client-fd-removed signal and
 * close the fd itself.
 */
void
gst_multi_handle_sink_remove_client_link (GstMultiHandleSink * sink,
    GList * link)
{
  GstMultiHandleClient *mhclient = (GstMultiHandleClient *) link->data;
  GstMultiHandleSinkClass *mhsinkclass = GST_MULTI_HANDLE_SINK_GET_CLASS (sink);

  /* the lock is released further down while the client still exists, so this
   * function can be reached again for the same client; only the first call
   * goes on to perform the removal */
  if (mhclient->currently_removing) {
    GST_WARNING_OBJECT (sink, "%s client is already being removed",
        mhclient->debug);
    return;
  } else {
    mhclient->currently_removing = TRUE;
  }

  /* log why the client goes away; the level depends on the status */
  /* FIXME: if we keep track of ip we can log it here and signal */
  switch (mhclient->status) {
    case GST_CLIENT_STATUS_OK:
      GST_WARNING_OBJECT (sink, "%s removing client %p for no reason",
          mhclient->debug, mhclient);
      break;
    case GST_CLIENT_STATUS_CLOSED:
      GST_DEBUG_OBJECT (sink, "%s removing client %p because of close",
          mhclient->debug, mhclient);
      break;
    case GST_CLIENT_STATUS_REMOVED:
      GST_DEBUG_OBJECT (sink,
          "%s removing client %p because the app removed it", mhclient->debug,
          mhclient);
      break;
    case GST_CLIENT_STATUS_SLOW:
      GST_INFO_OBJECT (sink,
          "%s removing client %p because it was too slow", mhclient->debug,
          mhclient);
      break;
    case GST_CLIENT_STATUS_ERROR:
      GST_WARNING_OBJECT (sink,
          "%s removing client %p because of error", mhclient->debug, mhclient);
      break;
    case GST_CLIENT_STATUS_FLUSHING:
    default:
      GST_WARNING_OBJECT (sink,
          "%s removing client %p with invalid reason %d", mhclient->debug,
          mhclient, mhclient->status);
      break;
  }

  /* let the subclass react to the upcoming removal of this client */
  mhsinkclass->hash_removing (sink, mhclient);

  /* record when the client was disconnected, in wall-clock and in monotonic
   * time */
  mhclient->disconnect_time = g_get_real_time () * GST_USECOND;
  mhclient->disconnect_time_monotonic = g_get_monotonic_time () * GST_USECOND;

  /* free client buffers */
  g_slist_foreach (mhclient->sending, (GFunc) gst_mini_object_unref, NULL);
  g_slist_free (mhclient->sending);
  mhclient->sending = NULL;

  if (mhclient->caps)
    gst_caps_unref (mhclient->caps);
  mhclient->caps = NULL;

  /* unlock the mutex before signaling because the signal handler
   * might query some properties */
  CLIENTS_UNLOCK (sink);

  mhsinkclass->emit_client_removed (sink, mhclient->handle, mhclient->status);

  /* lock again before we remove the client completely */
  CLIENTS_LOCK (sink);

  /* handle cannot be reused in the above signal callback so we can safely
   * remove it from the hashtable here */
  if (!g_hash_table_remove (sink->handle_hash,
          mhsinkclass->handle_hash_key (mhclient->handle))) {
    GST_WARNING_OBJECT (sink,
        "%s error removing client %p from hash", mhclient->debug, mhclient);
  }
  /* after releasing the lock above, the link could be invalid, more
   * precisely, the next and prev pointers could point to invalid list
   * links. One optimisation could be to add a cookie to the linked list
   * and take a shortcut when it did not change between unlocking and locking
   * our mutex. For now we just walk the list again. */
  sink->clients = g_list_remove (sink->clients, mhclient);
  /* bump the cookie so that code iterating over the client list can notice
   * that the list changed */
  sink->clients_cookie++;

  /* optional subclass hook, called with the lock held */
  if (mhsinkclass->removed)
    mhsinkclass->removed (sink, mhclient->handle);

  /* the client is no longer in the list or the hash; free it without holding
   * the lock */
  CLIENTS_UNLOCK (sink);

  /* sub-class must implement this to emit the client-$handle-removed signal */
  g_assert (mhsinkclass->client_free != NULL);

  /* and the handle is really gone now */
  mhsinkclass->client_free (sink, mhclient);

  g_free (mhclient);

  /* return with the lock held, as it was on entry */
  CLIENTS_LOCK (sink);
}
954
955 static gboolean
gst_multi_handle_sink_client_queue_buffer(GstMultiHandleSink * mhsink,GstMultiHandleClient * mhclient,GstBuffer * buffer)956 gst_multi_handle_sink_client_queue_buffer (GstMultiHandleSink * mhsink,
957 GstMultiHandleClient * mhclient, GstBuffer * buffer)
958 {
959 GstMultiHandleSink *sink = GST_MULTI_HANDLE_SINK (mhsink);
960 GstCaps *caps;
961
962 /* TRUE: send them if the new caps have them */
963 gboolean send_streamheader = FALSE;
964 GstStructure *s;
965
966 /* before we queue the buffer, we check if we need to queue streamheader
967 * buffers (because it's a new client, or because they changed) */
968 caps = gst_pad_get_current_caps (GST_BASE_SINK_PAD (sink));
969
970 if (!mhclient->caps) {
971 if (caps) {
972 GST_DEBUG_OBJECT (sink,
973 "%s no previous caps for this client, send streamheader",
974 mhclient->debug);
975 send_streamheader = TRUE;
976 mhclient->caps = gst_caps_ref (caps);
977 }
978 } else {
979 /* there were previous caps recorded, so compare */
980 if (caps && !gst_caps_is_equal (caps, mhclient->caps)) {
981 const GValue *sh1, *sh2;
982
983 /* caps are not equal, but could still have the same streamheader */
984 s = gst_caps_get_structure (caps, 0);
985 if (!gst_structure_has_field (s, "streamheader")) {
986 /* no new streamheader, so nothing new to send */
987 GST_DEBUG_OBJECT (sink,
988 "%s new caps do not have streamheader, not sending",
989 mhclient->debug);
990 } else {
991 /* there is a new streamheader */
992 s = gst_caps_get_structure (mhclient->caps, 0);
993 if (!gst_structure_has_field (s, "streamheader")) {
994 /* no previous streamheader, so send the new one */
995 GST_DEBUG_OBJECT (sink,
996 "%s previous caps did not have streamheader, sending",
997 mhclient->debug);
998 send_streamheader = TRUE;
999 } else {
1000 /* both old and new caps have streamheader set */
1001 if (!mhsink->resend_streamheader) {
1002 GST_DEBUG_OBJECT (sink,
1003 "%s asked to not resend the streamheader, not sending",
1004 mhclient->debug);
1005 send_streamheader = FALSE;
1006 } else {
1007 sh1 = gst_structure_get_value (s, "streamheader");
1008 s = gst_caps_get_structure (caps, 0);
1009 sh2 = gst_structure_get_value (s, "streamheader");
1010 if (gst_value_compare (sh1, sh2) != GST_VALUE_EQUAL) {
1011 GST_DEBUG_OBJECT (sink,
1012 "%s new streamheader different from old, sending",
1013 mhclient->debug);
1014 send_streamheader = TRUE;
1015 }
1016 }
1017 }
1018 }
1019 }
1020 /* Replace the old caps */
1021 gst_caps_replace (&mhclient->caps, caps);
1022 }
1023
1024 if (G_UNLIKELY (send_streamheader)) {
1025 const GValue *sh;
1026 GArray *buffers;
1027 int i;
1028
1029 GST_LOG_OBJECT (sink,
1030 "%s sending streamheader from caps %" GST_PTR_FORMAT,
1031 mhclient->debug, caps);
1032 s = gst_caps_get_structure (caps, 0);
1033 if (!gst_structure_has_field (s, "streamheader")) {
1034 GST_DEBUG_OBJECT (sink,
1035 "%s no new streamheader, so nothing to send", mhclient->debug);
1036 } else {
1037 GST_LOG_OBJECT (sink,
1038 "%s sending streamheader from caps %" GST_PTR_FORMAT,
1039 mhclient->debug, caps);
1040 sh = gst_structure_get_value (s, "streamheader");
1041 g_assert (G_VALUE_TYPE (sh) == GST_TYPE_ARRAY);
1042 buffers = g_value_peek_pointer (sh);
1043 GST_DEBUG_OBJECT (sink, "%d streamheader buffers", buffers->len);
1044 for (i = 0; i < buffers->len; ++i) {
1045 GValue *bufval;
1046 GstBuffer *buffer;
1047
1048 bufval = &g_array_index (buffers, GValue, i);
1049 g_assert (G_VALUE_TYPE (bufval) == GST_TYPE_BUFFER);
1050 buffer = g_value_peek_pointer (bufval);
1051 GST_DEBUG_OBJECT (sink,
1052 "%s queueing streamheader buffer of length %" G_GSIZE_FORMAT,
1053 mhclient->debug, gst_buffer_get_size (buffer));
1054 gst_buffer_ref (buffer);
1055
1056 mhclient->sending = g_slist_append (mhclient->sending, buffer);
1057 }
1058 }
1059 }
1060
1061 if (caps)
1062 gst_caps_unref (caps);
1063 caps = NULL;
1064
1065 GST_LOG_OBJECT (sink, "%s queueing buffer of length %" G_GSIZE_FORMAT,
1066 mhclient->debug, gst_buffer_get_size (buffer));
1067
1068 gst_buffer_ref (buffer);
1069 mhclient->sending = g_slist_append (mhclient->sending, buffer);
1070
1071 return TRUE;
1072 }
1073
1074 static gboolean
is_sync_frame(GstMultiHandleSink * sink,GstBuffer * buffer)1075 is_sync_frame (GstMultiHandleSink * sink, GstBuffer * buffer)
1076 {
1077 if (GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_DELTA_UNIT))
1078 return FALSE;
1079 return TRUE;
1080 }
1081
1082 /* find the keyframe in the list of buffers starting the
1083 * search from @idx. @direction as -1 will search backwards,
1084 * 1 will search forwards.
1085 * Returns: the index or -1 if there is no keyframe after idx.
1086 */
1087 gint
find_syncframe(GstMultiHandleSink * sink,gint idx,gint direction)1088 find_syncframe (GstMultiHandleSink * sink, gint idx, gint direction)
1089 {
1090 gint i, len, result;
1091
1092 /* take length of queued buffers */
1093 len = sink->bufqueue->len;
1094
1095 /* assume we don't find a keyframe */
1096 result = -1;
1097
1098 /* then loop over all buffers to find the first keyframe */
1099 for (i = idx; i >= 0 && i < len; i += direction) {
1100 GstBuffer *buf;
1101
1102 buf = g_array_index (sink->bufqueue, GstBuffer *, i);
1103 if (is_sync_frame (sink, buf)) {
1104 GST_LOG_OBJECT (sink, "found keyframe at %d from %d, direction %d",
1105 i, idx, direction);
1106 result = i;
1107 break;
1108 }
1109 }
1110 return result;
1111 }
1112
1113 /* Get the number of buffers from the buffer queue needed to satisfy
1114 * the maximum max in the configured units.
1115 * If units are not BUFFERS, and there are insufficient buffers in the
1116 * queue to satisfy the limit, return len(queue) + 1 */
1117 gint
get_buffers_max(GstMultiHandleSink * sink,gint64 max)1118 get_buffers_max (GstMultiHandleSink * sink, gint64 max)
1119 {
1120 switch (sink->unit_format) {
1121 case GST_FORMAT_BUFFERS:
1122 return max;
1123 case GST_FORMAT_TIME:
1124 {
1125 GstBuffer *buf;
1126 int i;
1127 int len;
1128 gint64 diff;
1129 GstClockTime first = GST_CLOCK_TIME_NONE;
1130
1131 len = sink->bufqueue->len;
1132
1133 for (i = 0; i < len; i++) {
1134 buf = g_array_index (sink->bufqueue, GstBuffer *, i);
1135 if (GST_BUFFER_TIMESTAMP_IS_VALID (buf)) {
1136 if (first == -1)
1137 first = GST_BUFFER_TIMESTAMP (buf);
1138
1139 diff = first - GST_BUFFER_TIMESTAMP (buf);
1140
1141 if (diff > max)
1142 return i + 1;
1143 }
1144 }
1145 return len + 1;
1146 }
1147 case GST_FORMAT_BYTES:
1148 {
1149 GstBuffer *buf;
1150 int i;
1151 int len;
1152 gint acc = 0;
1153
1154 len = sink->bufqueue->len;
1155
1156 for (i = 0; i < len; i++) {
1157 buf = g_array_index (sink->bufqueue, GstBuffer *, i);
1158 acc += gst_buffer_get_size (buf);
1159
1160 if (acc > max)
1161 return i + 1;
1162 }
1163 return len + 1;
1164 }
1165 default:
1166 return max;
1167 }
1168 }
1169
/* find the positions in the buffer queue where *_min and *_max
 * is satisfied
 *
 * The queue is walked from index 0 while the buffer sizes are summed up and
 * the distance of every timestamped buffer to the first timestamp seen is
 * measured. A limit of -1 means "not set".
 *
 * @min_idx receives the index at which the byte and time minimum are both
 * satisfied, @max_idx the index at which a byte or time maximum was hit. When
 * no maximum was hit @max_idx is the last index of the queue, and when no
 * minimum position was found @min_idx equals @max_idx.
 *
 * @buffers_min is only used for the up-front test against the queue length;
 * @buffers_max is only logged.
 *
 * The queue must not be empty.
 *
 * Returns: TRUE if a maximum was hit before the end of the queue and a
 * minimum position had been found by then, FALSE otherwise (including when
 * fewer than @buffers_min buffers are queued).
 *
 * FIXME, this code might not work if any of the units is in buffers...
 */
gboolean
find_limits (GstMultiHandleSink * sink,
    gint * min_idx, gint bytes_min, gint buffers_min, gint64 time_min,
    gint * max_idx, gint bytes_max, gint buffers_max, gint64 time_max)
{
  GstClockTime first, time;
  gint i, len, bytes;
  gboolean result, max_hit;

  /* take length of queue */
  len = sink->bufqueue->len;

  /* this must hold */
  g_assert (len > 0);

  GST_LOG_OBJECT (sink,
      "bytes_min %d, buffers_min %d, time_min %" GST_TIME_FORMAT
      ", bytes_max %d, buffers_max %d, time_max %" GST_TIME_FORMAT, bytes_min,
      buffers_min, GST_TIME_ARGS (time_min), bytes_max, buffers_max,
      GST_TIME_ARGS (time_max));

  /* do the trivial buffer limit test */
  if (buffers_min != -1 && len < buffers_min) {
    *min_idx = len - 1;
    *max_idx = len - 1;
    return FALSE;
  }

  result = FALSE;
  /* else count bytes and time */
  first = -1;
  bytes = 0;
  /* unset limits */
  *min_idx = -1;
  *max_idx = -1;
  max_hit = FALSE;

  i = 0;
  /* loop through the buffers, when a limit is ok, mark it
   * as -1, we have at least one buffer in the queue. */
  do {
    GstBuffer *buf;

    /* the checks at the top of the loop act on what the previous iteration
     * found, which is why they refer to index i - 1 */

    /* if we checked all min limits, update result */
    if (bytes_min == -1 && time_min == -1 && *min_idx == -1) {
      /* don't go below 0 */
      *min_idx = MAX (i - 1, 0);
    }
    /* if we reached one max limit break out */
    if (max_hit) {
      /* i > 0 when we get here, we subtract one to get the position
       * of the previous buffer. */
      *max_idx = i - 1;
      /* we have valid complete result if we found a min_idx too */
      result = *min_idx != -1;
      break;
    }
    buf = g_array_index (sink->bufqueue, GstBuffer *, i);

    bytes += gst_buffer_get_size (buf);

    /* take timestamp and save for the base first timestamp */
    if ((time = GST_BUFFER_TIMESTAMP (buf)) != -1) {
      GST_LOG_OBJECT (sink, "Ts %" GST_TIME_FORMAT " on buffer",
          GST_TIME_ARGS (time));
      if (first == -1)
        first = time;

      /* increase max usage if we did not fill enough. Note that
       * buffers are sorted from new to old, so the first timestamp is
       * bigger than the next one. */
      if (time_min != -1 && first - time >= time_min)
        time_min = -1;
      if (time_max != -1 && first - time >= time_max)
        max_hit = TRUE;
    } else {
      GST_LOG_OBJECT (sink, "No timestamp on buffer");
    }
    /* time is OK or unknown, check and increase if not enough bytes */
    if (bytes_min != -1) {
      if (bytes >= bytes_min)
        bytes_min = -1;
    }
    if (bytes_max != -1) {
      if (bytes >= bytes_max) {
        max_hit = TRUE;
      }
    }
    i++;
  }
  while (i < len);

  /* if we did not hit the max or min limit, set to buffer size */
  if (*max_idx == -1)
    *max_idx = len - 1;
  /* make sure min does not exceed max */
  if (*min_idx == -1)
    *min_idx = *max_idx;

  return result;
}
1283
1284 /* parse the unit/value pair and assign it to the result value of the
1285 * right type, leave the other values untouched
1286 *
1287 * Returns: FALSE if the unit is unknown or undefined. TRUE otherwise.
1288 */
1289 static gboolean
assign_value(GstFormat format,guint64 value,gint * bytes,gint * buffers,GstClockTime * time)1290 assign_value (GstFormat format, guint64 value, gint * bytes, gint * buffers,
1291 GstClockTime * time)
1292 {
1293 gboolean res = TRUE;
1294
1295 /* set only the limit of the given format to the given value */
1296 switch (format) {
1297 case GST_FORMAT_BUFFERS:
1298 *buffers = (gint) value;
1299 break;
1300 case GST_FORMAT_TIME:
1301 *time = value;
1302 break;
1303 case GST_FORMAT_BYTES:
1304 *bytes = (gint) value;
1305 break;
1306 case GST_FORMAT_UNDEFINED:
1307 default:
1308 res = FALSE;
1309 break;
1310 }
1311 return res;
1312 }
1313
1314 /* count the index in the buffer queue to satisfy the given unit
1315 * and value pair starting from buffer at index 0.
1316 *
1317 * Returns: TRUE if there was enough data in the queue to satisfy the
1318 * burst values. @idx contains the index in the buffer that contains enough
1319 * data to satisfy the limits or the last buffer in the queue when the
1320 * function returns FALSE.
1321 */
1322 static gboolean
count_burst_unit(GstMultiHandleSink * sink,gint * min_idx,GstFormat min_format,guint64 min_value,gint * max_idx,GstFormat max_format,guint64 max_value)1323 count_burst_unit (GstMultiHandleSink * sink, gint * min_idx,
1324 GstFormat min_format, guint64 min_value, gint * max_idx,
1325 GstFormat max_format, guint64 max_value)
1326 {
1327 gint bytes_min = -1, buffers_min = -1;
1328 gint bytes_max = -1, buffers_max = -1;
1329 GstClockTime time_min = GST_CLOCK_TIME_NONE, time_max = GST_CLOCK_TIME_NONE;
1330
1331 assign_value (min_format, min_value, &bytes_min, &buffers_min, &time_min);
1332 assign_value (max_format, max_value, &bytes_max, &buffers_max, &time_max);
1333
1334 return find_limits (sink, min_idx, bytes_min, buffers_min, time_min,
1335 max_idx, bytes_max, buffers_max, time_max);
1336 }
1337
1338 /* decide where in the current buffer queue this new client should start
1339 * receiving buffers from.
1340 * This function is called whenever a client is connected and has not yet
1341 * received a buffer.
1342 * If this returns -1, it means that we haven't found a good point to
1343 * start streaming from yet, and this function should be called again later
1344 * when more buffers have arrived.
1345 */
1346 gint
gst_multi_handle_sink_new_client_position(GstMultiHandleSink * sink,GstMultiHandleClient * client)1347 gst_multi_handle_sink_new_client_position (GstMultiHandleSink * sink,
1348 GstMultiHandleClient * client)
1349 {
1350 gint result;
1351
1352 GST_DEBUG_OBJECT (sink,
1353 "%s new client, deciding where to start in queue", client->debug);
1354 GST_DEBUG_OBJECT (sink, "queue is currently %d buffers long",
1355 sink->bufqueue->len);
1356 switch (client->sync_method) {
1357 case GST_SYNC_METHOD_LATEST:
1358 /* no syncing, we are happy with whatever the client is going to get */
1359 result = client->bufpos;
1360 GST_DEBUG_OBJECT (sink,
1361 "%s SYNC_METHOD_LATEST, position %d", client->debug, result);
1362 break;
1363 case GST_SYNC_METHOD_NEXT_KEYFRAME:
1364 {
1365 /* if one of the new buffers (between client->bufpos and 0) in the queue
1366 * is a sync point, we can proceed, otherwise we need to keep waiting */
1367 GST_LOG_OBJECT (sink,
1368 "%s new client, bufpos %d, waiting for keyframe",
1369 client->debug, client->bufpos);
1370
1371 result = find_prev_syncframe (sink, client->bufpos);
1372 if (result != -1) {
1373 GST_DEBUG_OBJECT (sink,
1374 "%s SYNC_METHOD_NEXT_KEYFRAME: result %d", client->debug, result);
1375 break;
1376 }
1377
1378 /* client is not on a syncbuffer, need to skip these buffers and
1379 * wait some more */
1380 GST_LOG_OBJECT (sink,
1381 "%s new client, skipping buffer(s), no syncpoint found",
1382 client->debug);
1383 client->bufpos = -1;
1384 break;
1385 }
1386 case GST_SYNC_METHOD_LATEST_KEYFRAME:
1387 {
1388 GST_DEBUG_OBJECT (sink, "%s SYNC_METHOD_LATEST_KEYFRAME", client->debug);
1389
1390 /* for new clients we initially scan the complete buffer queue for
1391 * a sync point when a buffer is added. If we don't find a keyframe,
1392 * we need to wait for the next keyframe and so we change the client's
1393 * sync method to GST_SYNC_METHOD_NEXT_KEYFRAME.
1394 */
1395 result = find_next_syncframe (sink, 0);
1396 if (result != -1) {
1397 GST_DEBUG_OBJECT (sink,
1398 "%s SYNC_METHOD_LATEST_KEYFRAME: result %d", client->debug, result);
1399 break;
1400 }
1401
1402 GST_DEBUG_OBJECT (sink,
1403 "%s SYNC_METHOD_LATEST_KEYFRAME: no keyframe found, "
1404 "switching to SYNC_METHOD_NEXT_KEYFRAME", client->debug);
1405 /* throw client to the waiting state */
1406 client->bufpos = -1;
1407 /* and make client sync to next keyframe */
1408 client->sync_method = GST_SYNC_METHOD_NEXT_KEYFRAME;
1409 break;
1410 }
1411 case GST_SYNC_METHOD_BURST:
1412 {
1413 gboolean ok;
1414 gint max;
1415
1416 /* move to the position where we satisfy the client's burst
1417 * parameters. If we could not satisfy the parameters because there
1418 * is not enough data, we just send what we have (which is in result).
1419 * We use the max value to limit the search
1420 */
1421 ok = count_burst_unit (sink, &result, client->burst_min_format,
1422 client->burst_min_value, &max, client->burst_max_format,
1423 client->burst_max_value);
1424 GST_DEBUG_OBJECT (sink,
1425 "%s SYNC_METHOD_BURST: burst_unit returned %d, result %d",
1426 client->debug, ok, result);
1427
1428 GST_LOG_OBJECT (sink, "min %d, max %d", result, max);
1429
1430 /* we hit the max and it is below the min, use that then */
1431 if (max != -1 && max <= result) {
1432 result = MAX (max - 1, 0);
1433 GST_DEBUG_OBJECT (sink,
1434 "%s SYNC_METHOD_BURST: result above max, taken down to %d",
1435 client->debug, result);
1436 }
1437 break;
1438 }
1439 case GST_SYNC_METHOD_BURST_KEYFRAME:
1440 {
1441 gint min_idx, max_idx;
1442 gint next_syncframe, prev_syncframe;
1443
1444 /* BURST_KEYFRAME:
1445 *
1446 * _always_ start sending a keyframe to the client. We first search
1447 * a keyframe between min/max limits. If there is none, we send it the
1448 * last keyframe before min. If there is none, the behaviour is like
1449 * NEXT_KEYFRAME.
1450 */
1451 /* gather burst limits */
1452 count_burst_unit (sink, &min_idx, client->burst_min_format,
1453 client->burst_min_value, &max_idx, client->burst_max_format,
1454 client->burst_max_value);
1455
1456 GST_LOG_OBJECT (sink, "min %d, max %d", min_idx, max_idx);
1457
1458 /* first find a keyframe after min_idx */
1459 next_syncframe = find_next_syncframe (sink, min_idx);
1460 if (next_syncframe != -1 && next_syncframe < max_idx) {
1461 /* we have a valid keyframe and it's below the max */
1462 GST_LOG_OBJECT (sink, "found keyframe in min/max limits");
1463 result = next_syncframe;
1464 break;
1465 }
1466
1467 /* no valid keyframe, try to find one below min */
1468 prev_syncframe = find_prev_syncframe (sink, min_idx);
1469 if (prev_syncframe != -1) {
1470 GST_WARNING_OBJECT (sink,
1471 "using keyframe below min in BURST_KEYFRAME sync mode");
1472 result = prev_syncframe;
1473 break;
1474 }
1475
1476 /* no prev keyframe or not enough data */
1477 GST_WARNING_OBJECT (sink,
1478 "no prev keyframe found in BURST_KEYFRAME sync mode, waiting for next");
1479
1480 /* throw client to the waiting state */
1481 client->bufpos = -1;
1482 /* and make client sync to next keyframe */
1483 client->sync_method = GST_SYNC_METHOD_NEXT_KEYFRAME;
1484 result = -1;
1485 break;
1486 }
1487 case GST_SYNC_METHOD_BURST_WITH_KEYFRAME:
1488 {
1489 gint min_idx, max_idx;
1490 gint next_syncframe;
1491
1492 /* BURST_WITH_KEYFRAME:
1493 *
1494 * try to start sending a keyframe to the client. We first search
1495 * a keyframe between min/max limits. If there is none, we send it the
1496 * amount of data up 'till min.
1497 */
1498 /* gather enough data to burst */
1499 count_burst_unit (sink, &min_idx, client->burst_min_format,
1500 client->burst_min_value, &max_idx, client->burst_max_format,
1501 client->burst_max_value);
1502
1503 GST_LOG_OBJECT (sink, "min %d, max %d", min_idx, max_idx);
1504
1505 /* first find a keyframe after min_idx */
1506 next_syncframe = find_next_syncframe (sink, min_idx);
1507 if (next_syncframe != -1 && next_syncframe < max_idx) {
1508 /* we have a valid keyframe and it's below the max */
1509 GST_LOG_OBJECT (sink, "found keyframe in min/max limits");
1510 result = next_syncframe;
1511 break;
1512 }
1513
1514 /* no keyframe, send data from min_idx */
1515 GST_WARNING_OBJECT (sink, "using min in BURST_WITH_KEYFRAME sync mode");
1516
1517 /* make sure we don't go over the max limit */
1518 if (max_idx != -1 && max_idx <= min_idx) {
1519 result = MAX (max_idx - 1, 0);
1520 } else {
1521 result = min_idx;
1522 }
1523
1524 break;
1525 }
1526 default:
1527 g_warning ("unknown sync method %d", client->sync_method);
1528 result = client->bufpos;
1529 break;
1530 }
1531 return result;
1532 }
1533
1534 /* calculate the new position for a client after recovery. This function
1535 * does not update the client position but merely returns the required
1536 * position.
1537 */
1538 gint
gst_multi_handle_sink_recover_client(GstMultiHandleSink * sink,GstMultiHandleClient * client)1539 gst_multi_handle_sink_recover_client (GstMultiHandleSink * sink,
1540 GstMultiHandleClient * client)
1541 {
1542 gint newbufpos;
1543
1544 GST_WARNING_OBJECT (sink,
1545 "%s client %p is lagging at %d, recover using policy %d",
1546 client->debug, client, client->bufpos, sink->recover_policy);
1547
1548 switch (sink->recover_policy) {
1549 case GST_RECOVER_POLICY_NONE:
1550 /* do nothing, client will catch up or get kicked out when it reaches
1551 * the hard max */
1552 newbufpos = client->bufpos;
1553 break;
1554 case GST_RECOVER_POLICY_RESYNC_LATEST:
1555 /* move to beginning of queue */
1556 newbufpos = -1;
1557 break;
1558 case GST_RECOVER_POLICY_RESYNC_SOFT_LIMIT:
1559 /* move to beginning of soft max */
1560 newbufpos = get_buffers_max (sink, sink->units_soft_max);
1561 break;
1562 case GST_RECOVER_POLICY_RESYNC_KEYFRAME:
1563 /* find keyframe in buffers, we search backwards to find the
1564 * closest keyframe relative to what this client already received. */
1565 newbufpos = MIN (sink->bufqueue->len - 1,
1566 get_buffers_max (sink, sink->units_soft_max) - 1);
1567
1568 while (newbufpos >= 0) {
1569 GstBuffer *buf;
1570
1571 buf = g_array_index (sink->bufqueue, GstBuffer *, newbufpos);
1572 if (is_sync_frame (sink, buf)) {
1573 /* found a buffer that is not a delta unit */
1574 break;
1575 }
1576 newbufpos--;
1577 }
1578 break;
1579 default:
1580 /* unknown recovery procedure */
1581 newbufpos = get_buffers_max (sink, sink->units_soft_max);
1582 break;
1583 }
1584 return newbufpos;
1585 }
1586
1587 /* Queue a buffer on the global queue.
1588 *
1589 * This function adds the buffer to the front of a GArray. It removes the
1590 * tail buffer if the max queue size is exceeded, unreffing the queued buffer.
1591 * Note that unreffing the buffer is not a problem as clients who
1592 * started writing out this buffer will still have a reference to it in the
1593 * mhclient->sending queue.
1594 *
1595 * After adding the buffer, we update all client positions in the queue. If
1596 * a client moves over the soft max, we start the recovery procedure for this
1597 * slow client. If it goes over the hard max, it is put into the slow list
1598 * and removed.
1599 *
1600 * Special care is taken of clients that were waiting for a new buffer (they
1601 * had a position of -1) because they can proceed after adding this new buffer.
1602 * This is done by adding the client back into the write fd_set and signaling
1603 * the select thread that the fd_set changed.
1604 */
1605 static void
gst_multi_handle_sink_queue_buffer(GstMultiHandleSink * mhsink,GstBuffer * buffer)1606 gst_multi_handle_sink_queue_buffer (GstMultiHandleSink * mhsink,
1607 GstBuffer * buffer)
1608 {
1609 GList *clients, *next;
1610 gint queuelen;
1611 gboolean hash_changed = FALSE;
1612 gint max_buffer_usage;
1613 gint i;
1614 GstClockTime now;
1615 gint max_buffers, soft_max_buffers;
1616 guint cookie;
1617 GstMultiHandleSink *sink = GST_MULTI_HANDLE_SINK (mhsink);
1618 GstMultiHandleSinkClass *mhsinkclass =
1619 GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink);
1620
1621 CLIENTS_LOCK (mhsink);
1622 /* add buffer to queue */
1623 g_array_prepend_val (mhsink->bufqueue, buffer);
1624 queuelen = mhsink->bufqueue->len;
1625
1626 if (mhsink->units_max > 0)
1627 max_buffers = get_buffers_max (mhsink, mhsink->units_max);
1628 else
1629 max_buffers = -1;
1630
1631 if (mhsink->units_soft_max > 0)
1632 soft_max_buffers = get_buffers_max (mhsink, mhsink->units_soft_max);
1633 else
1634 soft_max_buffers = -1;
1635 GST_LOG_OBJECT (sink, "Using max %d, softmax %d", max_buffers,
1636 soft_max_buffers);
1637
1638 /* then loop over the clients and update the positions */
1639 cookie = mhsink->clients_cookie;
1640 for (clients = mhsink->clients; clients; clients = clients->next) {
1641 GstMultiHandleClient *mhclient = clients->data;
1642
1643 mhclient->bufpos++;
1644 GST_LOG_OBJECT (sink, "%s client %p at position %d",
1645 mhclient->debug, mhclient, mhclient->bufpos);
1646
1647 /* check soft max if needed, recover client */
1648 if (soft_max_buffers > 0 && mhclient->bufpos >= soft_max_buffers) {
1649 gint newpos;
1650
1651 newpos = gst_multi_handle_sink_recover_client (mhsink, mhclient);
1652 if (newpos != mhclient->bufpos) {
1653 mhclient->dropped_buffers += mhclient->bufpos - newpos;
1654 mhclient->bufpos = newpos;
1655 mhclient->discont = TRUE;
1656 GST_INFO_OBJECT (sink, "%s client %p position reset to %d",
1657 mhclient->debug, mhclient, mhclient->bufpos);
1658 } else {
1659 GST_INFO_OBJECT (sink,
1660 "%s client %p not recovering position", mhclient->debug, mhclient);
1661 }
1662 }
1663 }
1664
1665 max_buffer_usage = 0;
1666 now = g_get_monotonic_time () * GST_USECOND;
1667
1668 /* now check for new or slow clients */
1669 restart:
1670 cookie = mhsink->clients_cookie;
1671 for (clients = mhsink->clients; clients; clients = next) {
1672 GstMultiHandleClient *mhclient = clients->data;
1673
1674 if (cookie != mhsink->clients_cookie) {
1675 GST_DEBUG_OBJECT (sink, "Clients cookie outdated, restarting");
1676 goto restart;
1677 }
1678
1679 next = g_list_next (clients);
1680
1681 /* check hard max and timeout, remove client */
1682 if ((max_buffers > 0 && mhclient->bufpos >= max_buffers) ||
1683 (mhsink->timeout > 0
1684 && now - mhclient->last_activity_time_monotonic >
1685 mhsink->timeout)) {
1686 /* remove client */
1687 GST_WARNING_OBJECT (sink, "%s client %p is too slow, removing",
1688 mhclient->debug, mhclient);
1689 /* remove the client, the handle set will be cleared and the select thread
1690 * will be signaled */
1691 mhclient->status = GST_CLIENT_STATUS_SLOW;
1692 /* set client to invalid position while being removed */
1693 mhclient->bufpos = -1;
1694 gst_multi_handle_sink_remove_client_link (mhsink, clients);
1695 hash_changed = TRUE;
1696 continue;
1697 } else if (mhclient->bufpos == 0 || mhclient->new_connection) {
1698 /* can send data to this client now. need to signal the select thread that
1699 * the handle_set changed */
1700 mhsinkclass->hash_adding (mhsink, mhclient);
1701 hash_changed = TRUE;
1702 }
1703
1704 /* keep track of maximum buffer usage */
1705 if (mhclient->bufpos > max_buffer_usage) {
1706 max_buffer_usage = mhclient->bufpos;
1707 }
1708 }
1709
1710 /* make sure we respect bytes-min, buffers-min and time-min when they are set */
1711 {
1712 gint usage, max;
1713
1714 GST_LOG_OBJECT (sink,
1715 "extending queue %d to respect time_min %" GST_TIME_FORMAT
1716 ", bytes_min %d, buffers_min %d", max_buffer_usage,
1717 GST_TIME_ARGS (mhsink->time_min), mhsink->bytes_min,
1718 mhsink->buffers_min);
1719
1720 /* get index where the limits are ok, we don't really care if all limits
1721 * are ok, we just queue as much as we need. We also don't compare against
1722 * the max limits. */
1723 find_limits (mhsink, &usage, mhsink->bytes_min, mhsink->buffers_min,
1724 mhsink->time_min, &max, -1, -1, -1);
1725
1726 max_buffer_usage = MAX (max_buffer_usage, usage);
1727 GST_LOG_OBJECT (sink, "extended queue to %d", max_buffer_usage);
1728 }
1729
1730 /* now look for sync points and make sure there is at least one
1731 * sync point in the queue. We only do this if the LATEST_KEYFRAME or
1732 * BURST_KEYFRAME mode is selected */
1733 if (mhsink->def_sync_method == GST_SYNC_METHOD_LATEST_KEYFRAME ||
1734 mhsink->def_sync_method == GST_SYNC_METHOD_BURST_KEYFRAME) {
1735 /* no point in searching beyond the queue length */
1736 gint limit = queuelen;
1737 GstBuffer *buf;
1738
1739 /* no point in searching beyond the soft-max if any. */
1740 if (soft_max_buffers > 0) {
1741 limit = MIN (limit, soft_max_buffers);
1742 }
1743 GST_LOG_OBJECT (sink,
1744 "extending queue to include sync point, now at %d, limit is %d",
1745 max_buffer_usage, limit);
1746 for (i = 0; i < limit; i++) {
1747 buf = g_array_index (mhsink->bufqueue, GstBuffer *, i);
1748 if (is_sync_frame (mhsink, buf)) {
1749 /* found a sync frame, now extend the buffer usage to
1750 * include at least this frame. */
1751 max_buffer_usage = MAX (max_buffer_usage, i);
1752 break;
1753 }
1754 }
1755 GST_LOG_OBJECT (sink, "max buffer usage is now %d", max_buffer_usage);
1756 }
1757
1758 GST_LOG_OBJECT (sink, "len %d, usage %d", queuelen, max_buffer_usage);
1759
1760 /* nobody is referencing units after max_buffer_usage so we can
1761 * remove them from the queue. We remove them in reverse order as
1762 * this is the most optimal for GArray. */
1763 for (i = queuelen - 1; i > max_buffer_usage; i--) {
1764 GstBuffer *old;
1765
1766 /* queue exceeded max size */
1767 queuelen--;
1768 old = g_array_index (mhsink->bufqueue, GstBuffer *, i);
1769 mhsink->bufqueue = g_array_remove_index (mhsink->bufqueue, i);
1770
1771 /* unref tail buffer */
1772 gst_buffer_unref (old);
1773 }
1774 /* save for stats */
1775 mhsink->buffers_queued = max_buffer_usage + 1;
1776 CLIENTS_UNLOCK (sink);
1777
1778 /* and send a signal to thread if handle_set changed */
1779 if (hash_changed && mhsinkclass->hash_changed) {
1780 mhsinkclass->hash_changed (mhsink);
1781 }
1782 }
1783
1784 static gboolean
buffer_is_in_caps(GstMultiHandleSink * sink,GstBuffer * buf)1785 buffer_is_in_caps (GstMultiHandleSink * sink, GstBuffer * buf)
1786 {
1787 GstCaps *caps;
1788 GstStructure *s;
1789 const GValue *v;
1790
1791 caps = gst_pad_get_current_caps (GST_BASE_SINK_PAD (sink));
1792 if (!caps)
1793 return FALSE;
1794 s = gst_caps_get_structure (caps, 0);
1795 if (!gst_structure_has_field (s, "streamheader")) {
1796 gst_caps_unref (caps);
1797 return FALSE;
1798 }
1799
1800 v = gst_structure_get_value (s, "streamheader");
1801 if (GST_VALUE_HOLDS_ARRAY (v)) {
1802 guint n = gst_value_array_get_size (v);
1803 guint i;
1804 GstMapInfo map;
1805
1806 gst_buffer_map (buf, &map, GST_MAP_READ);
1807
1808 for (i = 0; i < n; i++) {
1809 const GValue *v2 = gst_value_array_get_value (v, i);
1810 GstBuffer *buf2;
1811 GstMapInfo map2;
1812
1813 if (!GST_VALUE_HOLDS_BUFFER (v2))
1814 continue;
1815
1816 buf2 = gst_value_get_buffer (v2);
1817 if (buf == buf2) {
1818 gst_caps_unref (caps);
1819 return TRUE;
1820 }
1821 gst_buffer_map (buf2, &map2, GST_MAP_READ);
1822 if (map.size == map2.size && memcmp (map.data, map2.data, map.size) == 0) {
1823 gst_buffer_unmap (buf2, &map2);
1824 gst_buffer_unmap (buf, &map);
1825 gst_caps_unref (caps);
1826 return TRUE;
1827 }
1828 gst_buffer_unmap (buf2, &map2);
1829 }
1830 gst_buffer_unmap (buf, &map);
1831 }
1832
1833 gst_caps_unref (caps);
1834
1835 return FALSE;
1836 }
1837
1838 static GstFlowReturn
gst_multi_handle_sink_render(GstBaseSink * bsink,GstBuffer * buf)1839 gst_multi_handle_sink_render (GstBaseSink * bsink, GstBuffer * buf)
1840 {
1841 gboolean is_header, in_caps;
1842 #if 0
1843 GstCaps *bufcaps, *padcaps;
1844 #endif
1845
1846 GstMultiHandleSink *sink = GST_MULTI_HANDLE_SINK (bsink);
1847
1848 g_return_val_if_fail (GST_OBJECT_FLAG_IS_SET (sink,
1849 GST_MULTI_HANDLE_SINK_OPEN), GST_FLOW_FLUSHING);
1850
1851 #if 0
1852 /* since we check every buffer for streamheader caps, we need to make
1853 * sure every buffer has caps set */
1854 bufcaps = gst_buffer_get_caps (buf);
1855 padcaps = GST_PAD_CAPS (GST_BASE_SINK_PAD (bsink));
1856
1857 /* make sure we have caps on the pad */
1858 if (!padcaps && !bufcaps)
1859 goto no_caps;
1860 #endif
1861
1862 /* get HEADER first, code below might mess with the flags */
1863 is_header = GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_HEADER);
1864 in_caps = is_header && buffer_is_in_caps (sink, buf);
1865
1866 #if 0
1867 /* stamp the buffer with previous caps if no caps set */
1868 if (!bufcaps) {
1869 if (!gst_buffer_is_writable (buf)) {
1870 /* metadata is not writable, copy will be made and original buffer
1871 * will be unreffed so we need to ref so that we don't lose the
1872 * buffer in the render method. */
1873 gst_buffer_ref (buf);
1874 /* the new buffer is ours only, we keep it out of the scope of this
1875 * function */
1876 buf = gst_buffer_make_writable (buf);
1877 } else {
1878 /* else the metadata is writable, we ref because we keep the buffer
1879 * out of the scope of this method */
1880 gst_buffer_ref (buf);
1881 }
1882 /* buffer metadata is writable now, set the caps */
1883 gst_buffer_set_caps (buf, padcaps);
1884 } else {
1885 gst_caps_unref (bufcaps);
1886
1887 /* since we keep this buffer out of the scope of this method */
1888 gst_buffer_ref (buf);
1889 }
1890 #endif
1891 gst_buffer_ref (buf);
1892
1893 GST_LOG_OBJECT (sink, "received buffer %p, in_caps: %s, offset %"
1894 G_GINT64_FORMAT ", offset_end %" G_GINT64_FORMAT
1895 ", timestamp %" GST_TIME_FORMAT ", duration %" GST_TIME_FORMAT,
1896 buf, in_caps ? "yes" : "no", GST_BUFFER_OFFSET (buf),
1897 GST_BUFFER_OFFSET_END (buf),
1898 GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf)),
1899 GST_TIME_ARGS (GST_BUFFER_DURATION (buf)));
1900
1901 /* if the incoming buffer is a streamheader from the caps, then we assume for now
1902 * it's a streamheader that needs to be sent to each new client.
1903 *
1904 * We don't send the buffer to the client, since streamheaders are sent
1905 * separately when necessary. */
1906 if (in_caps) {
1907 GST_DEBUG_OBJECT (sink, "ignoring HEADER buffer with length %"
1908 G_GSIZE_FORMAT, gst_buffer_get_size (buf));
1909 gst_buffer_unref (buf);
1910 } else {
1911 /* queue the buffer, this is a regular data buffer. */
1912 gst_multi_handle_sink_queue_buffer (sink, buf);
1913
1914 sink->bytes_to_serve += gst_buffer_get_size (buf);
1915 }
1916 return GST_FLOW_OK;
1917
1918 /* ERRORS */
1919 #if 0
1920 no_caps:
1921 {
1922 GST_ELEMENT_ERROR (sink, CORE, NEGOTIATION, (NULL),
1923 ("Received first buffer without caps set"));
1924 return GST_FLOW_NOT_NEGOTIATED;
1925 }
1926 #endif
1927 }
1928
1929 static void
gst_multi_handle_sink_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)1930 gst_multi_handle_sink_set_property (GObject * object, guint prop_id,
1931 const GValue * value, GParamSpec * pspec)
1932 {
1933 GstMultiHandleSink *multihandlesink;
1934
1935 multihandlesink = GST_MULTI_HANDLE_SINK (object);
1936
1937 switch (prop_id) {
1938 case PROP_BUFFERS_MAX:
1939 multihandlesink->units_max = g_value_get_int (value);
1940 break;
1941 case PROP_BUFFERS_SOFT_MAX:
1942 multihandlesink->units_soft_max = g_value_get_int (value);
1943 break;
1944 case PROP_TIME_MIN:
1945 multihandlesink->time_min = g_value_get_int64 (value);
1946 break;
1947 case PROP_BYTES_MIN:
1948 multihandlesink->bytes_min = g_value_get_int (value);
1949 break;
1950 case PROP_BUFFERS_MIN:
1951 multihandlesink->buffers_min = g_value_get_int (value);
1952 break;
1953 case PROP_UNIT_FORMAT:
1954 multihandlesink->unit_format = g_value_get_enum (value);
1955 break;
1956 case PROP_UNITS_MAX:
1957 multihandlesink->units_max = g_value_get_int64 (value);
1958 break;
1959 case PROP_UNITS_SOFT_MAX:
1960 multihandlesink->units_soft_max = g_value_get_int64 (value);
1961 break;
1962 case PROP_RECOVER_POLICY:
1963 multihandlesink->recover_policy = g_value_get_enum (value);
1964 break;
1965 case PROP_TIMEOUT:
1966 multihandlesink->timeout = g_value_get_uint64 (value);
1967 break;
1968 case PROP_SYNC_METHOD:
1969 multihandlesink->def_sync_method = g_value_get_enum (value);
1970 break;
1971 case PROP_BURST_FORMAT:
1972 multihandlesink->def_burst_format = g_value_get_enum (value);
1973 break;
1974 case PROP_BURST_VALUE:
1975 multihandlesink->def_burst_value = g_value_get_uint64 (value);
1976 break;
1977 case PROP_QOS_DSCP:
1978 multihandlesink->qos_dscp = g_value_get_int (value);
1979 gst_multi_handle_sink_setup_dscp (multihandlesink);
1980 break;
1981
1982 case PROP_RESEND_STREAMHEADER:
1983 multihandlesink->resend_streamheader = g_value_get_boolean (value);
1984 break;
1985
1986 default:
1987 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1988 break;
1989 }
1990 }
1991
1992 static void
gst_multi_handle_sink_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)1993 gst_multi_handle_sink_get_property (GObject * object, guint prop_id,
1994 GValue * value, GParamSpec * pspec)
1995 {
1996 GstMultiHandleSink *multihandlesink;
1997
1998 multihandlesink = GST_MULTI_HANDLE_SINK (object);
1999
2000 switch (prop_id) {
2001 case PROP_BUFFERS_MAX:
2002 g_value_set_int (value, multihandlesink->units_max);
2003 break;
2004 case PROP_BUFFERS_SOFT_MAX:
2005 g_value_set_int (value, multihandlesink->units_soft_max);
2006 break;
2007 case PROP_TIME_MIN:
2008 g_value_set_int64 (value, multihandlesink->time_min);
2009 break;
2010 case PROP_BYTES_MIN:
2011 g_value_set_int (value, multihandlesink->bytes_min);
2012 break;
2013 case PROP_BUFFERS_MIN:
2014 g_value_set_int (value, multihandlesink->buffers_min);
2015 break;
2016 case PROP_BUFFERS_QUEUED:
2017 g_value_set_uint (value, multihandlesink->buffers_queued);
2018 break;
2019 case PROP_BYTES_QUEUED:
2020 g_value_set_uint (value, multihandlesink->bytes_queued);
2021 break;
2022 case PROP_TIME_QUEUED:
2023 g_value_set_uint64 (value, multihandlesink->time_queued);
2024 break;
2025 case PROP_UNIT_FORMAT:
2026 g_value_set_enum (value, multihandlesink->unit_format);
2027 break;
2028 case PROP_UNITS_MAX:
2029 g_value_set_int64 (value, multihandlesink->units_max);
2030 break;
2031 case PROP_UNITS_SOFT_MAX:
2032 g_value_set_int64 (value, multihandlesink->units_soft_max);
2033 break;
2034 case PROP_RECOVER_POLICY:
2035 g_value_set_enum (value, multihandlesink->recover_policy);
2036 break;
2037 case PROP_TIMEOUT:
2038 g_value_set_uint64 (value, multihandlesink->timeout);
2039 break;
2040 case PROP_SYNC_METHOD:
2041 g_value_set_enum (value, multihandlesink->def_sync_method);
2042 break;
2043 case PROP_BYTES_TO_SERVE:
2044 g_value_set_uint64 (value, multihandlesink->bytes_to_serve);
2045 break;
2046 case PROP_BYTES_SERVED:
2047 g_value_set_uint64 (value, multihandlesink->bytes_served);
2048 break;
2049 case PROP_BURST_FORMAT:
2050 g_value_set_enum (value, multihandlesink->def_burst_format);
2051 break;
2052 case PROP_BURST_VALUE:
2053 g_value_set_uint64 (value, multihandlesink->def_burst_value);
2054 break;
2055 case PROP_QOS_DSCP:
2056 g_value_set_int (value, multihandlesink->qos_dscp);
2057 break;
2058 case PROP_RESEND_STREAMHEADER:
2059 g_value_set_boolean (value, multihandlesink->resend_streamheader);
2060 break;
2061 case PROP_NUM_HANDLES:
2062 g_value_set_uint (value,
2063 g_hash_table_size (multihandlesink->handle_hash));
2064 break;
2065 default:
2066 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2067 break;
2068 }
2069 }
2070
2071 /* create a socket for sending to remote machine */
2072 static gboolean
gst_multi_handle_sink_start(GstBaseSink * bsink)2073 gst_multi_handle_sink_start (GstBaseSink * bsink)
2074 {
2075 GstMultiHandleSinkClass *mhsclass;
2076 GstMultiHandleSink *mhsink;
2077
2078 if (GST_OBJECT_FLAG_IS_SET (bsink, GST_MULTI_HANDLE_SINK_OPEN))
2079 return TRUE;
2080
2081 mhsink = GST_MULTI_HANDLE_SINK (bsink);
2082 mhsclass = GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink);
2083
2084 if (!mhsclass->start_pre (mhsink))
2085 return FALSE;
2086
2087 mhsink->bytes_to_serve = 0;
2088 mhsink->bytes_served = 0;
2089
2090 if (mhsclass->init) {
2091 mhsclass->init (mhsink);
2092 }
2093
2094 mhsink->running = TRUE;
2095
2096 mhsink->thread = g_thread_new ("multihandlesink",
2097 (GThreadFunc) mhsclass->thread, mhsink);
2098
2099 GST_OBJECT_FLAG_SET (bsink, GST_MULTI_HANDLE_SINK_OPEN);
2100
2101 return TRUE;
2102 }
2103
2104 static gboolean
gst_multi_handle_sink_stop(GstBaseSink * bsink)2105 gst_multi_handle_sink_stop (GstBaseSink * bsink)
2106 {
2107 GstMultiHandleSinkClass *mhclass;
2108 GstBuffer *buf;
2109 gint i;
2110 GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (bsink);
2111
2112 mhclass = GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink);
2113
2114 if (!GST_OBJECT_FLAG_IS_SET (bsink, GST_MULTI_HANDLE_SINK_OPEN))
2115 return TRUE;
2116
2117 mhsink->running = FALSE;
2118
2119 mhclass->stop_pre (mhsink);
2120
2121 if (mhsink->thread) {
2122 GST_DEBUG_OBJECT (mhsink, "joining thread");
2123 g_thread_join (mhsink->thread);
2124 GST_DEBUG_OBJECT (mhsink, "joined thread");
2125 mhsink->thread = NULL;
2126 }
2127
2128 /* free the clients */
2129 mhclass->clear (GST_MULTI_HANDLE_SINK (mhsink));
2130
2131 if (mhclass->close)
2132 mhclass->close (mhsink);
2133
2134 mhclass->stop_post (mhsink);
2135
2136 /* remove all queued buffers */
2137 if (mhsink->bufqueue) {
2138 GST_DEBUG_OBJECT (mhsink, "Emptying bufqueue with %d buffers",
2139 mhsink->bufqueue->len);
2140 for (i = mhsink->bufqueue->len - 1; i >= 0; --i) {
2141 buf = g_array_index (mhsink->bufqueue, GstBuffer *, i);
2142 GST_LOG_OBJECT (mhsink, "Removing buffer %p (%d) with refcount %d", buf,
2143 i, GST_MINI_OBJECT_REFCOUNT (buf));
2144 gst_buffer_unref (buf);
2145 mhsink->bufqueue = g_array_remove_index (mhsink->bufqueue, i);
2146 }
2147 /* freeing the array is done in _finalize */
2148 }
2149 GST_OBJECT_FLAG_UNSET (mhsink, GST_MULTI_HANDLE_SINK_OPEN);
2150
2151 return TRUE;
2152 }
2153
2154 static GstStateChangeReturn
gst_multi_handle_sink_change_state(GstElement * element,GstStateChange transition)2155 gst_multi_handle_sink_change_state (GstElement * element,
2156 GstStateChange transition)
2157 {
2158 GstMultiHandleSink *sink;
2159 GstStateChangeReturn ret;
2160
2161 sink = GST_MULTI_HANDLE_SINK (element);
2162
2163 /* we disallow changing the state from the streaming thread */
2164 if (g_thread_self () == sink->thread) {
2165 g_warning
2166 ("\nTrying to change %s's state from its streaming thread would deadlock.\n"
2167 "You cannot change the state of an element from its streaming\n"
2168 "thread. Use g_idle_add() or post a GstMessage on the bus to\n"
2169 "schedule the state change from the main thread.\n",
2170 GST_ELEMENT_NAME (sink));
2171
2172 return GST_STATE_CHANGE_FAILURE;
2173 }
2174
2175 switch (transition) {
2176 case GST_STATE_CHANGE_NULL_TO_READY:
2177 if (!gst_multi_handle_sink_start (GST_BASE_SINK (sink)))
2178 goto start_failed;
2179 break;
2180 case GST_STATE_CHANGE_READY_TO_PAUSED:
2181 break;
2182 case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
2183 break;
2184 default:
2185 break;
2186 }
2187
2188 ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
2189
2190 switch (transition) {
2191 case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
2192 break;
2193 case GST_STATE_CHANGE_PAUSED_TO_READY:
2194 break;
2195 case GST_STATE_CHANGE_READY_TO_NULL:
2196 gst_multi_handle_sink_stop (GST_BASE_SINK (sink));
2197 break;
2198 default:
2199 break;
2200 }
2201 return ret;
2202
2203 /* ERRORS */
2204 start_failed:
2205 {
2206 /* error message was posted */
2207 return GST_STATE_CHANGE_FAILURE;
2208 }
2209 }
2210