1 /* GStreamer
2 * Copyright (C) 2014 David Schleef <ds@schleef.org>
3 * Copyright (C) 2017 Make.TV, Inc. <info@make.tv>
4 * Contact: Jan Alexander Steffens (heftig) <jsteffens@make.tv>
5 *
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Library General Public
8 * License as published by the Free Software Foundation; either
9 * version 2 of the License, or (at your option) any later version.
10 *
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Library General Public License for more details.
15 *
16 * You should have received a copy of the GNU Library General Public
17 * License along with this library; if not, write to the
18 * Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
19 * Boston, MA 02110-1335, USA.
20 */
21 /**
22 * SECTION:element-rtmp2sink
23 *
24 * The rtmp2sink element sends audio and video streams to an RTMP
25 * server.
26 *
27 * <refsect2>
28 * <title>Example launch line</title>
29 * |[
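 * gst-launch-1.0 -v videotestsrc ! x264enc ! flvmux ! rtmp2sink
 * location=rtmp://server.example.com/live/myStream
 * ]|
 * Encodes a test video pattern to H.264, muxes it into FLV and publishes
 * the result as stream "myStream" of the "live" application on
 * server.example.com.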
34 * </refsect2>
35 */
36
37 #ifdef HAVE_CONFIG_H
38 #include "config.h"
39 #endif
40
41 #include "gstrtmp2elements.h"
42 #include "gstrtmp2sink.h"
43
44 #include "gstrtmp2locationhandler.h"
45 #include "rtmp/amf.h"
46 #include "rtmp/rtmpclient.h"
47 #include "rtmp/rtmpmessage.h"
48 #include "rtmp/rtmputils.h"
49
50 #include <gst/gst.h>
51 #include <gst/base/gstbasesink.h>
52 #include <gio/gnetworking.h>
53 #include <string.h>
54
/* Debug category used by all GST_*_OBJECT logging in this file */
GST_DEBUG_CATEGORY_STATIC (gst_rtmp2_sink_debug_category);
#define GST_CAT_DEFAULT gst_rtmp2_sink_debug_category

/* GObject instance cast / type-check helpers for GstRtmp2Sink */
#define GST_RTMP2_SINK(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_RTMP2_SINK,GstRtmp2Sink))
#define GST_IS_RTMP2_SINK(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_RTMP2_SINK))
61
/* Instance structure of the rtmp2sink element. */
typedef struct
{
  GstBaseSink parent_instance;

  /* properties */
  GstRtmpLocation location;     /* connection target; fields are read and
                                 * written under the OBJECT_LOCK */
  gboolean async_connect;       /* TRUE: start() launches the connect task;
                                 * FALSE: render() launches it on demand */
  guint peak_kbps;              /* "peak-kbps" property value */
  guint32 chunk_size;           /* "chunk-size" property value */
  GstRtmpStopCommands stop_commands;    /* commands sent on EOS */
  GstStructure *stats;          /* snapshot taken from the connection when
                                 * the task's main loop exits */

  /* If both self->lock and OBJECT_LOCK are needed,
   * self->lock must be taken first */
  GMutex lock;
  GCond cond;                   /* broadcast on stop, unlock and loop exit;
                                 * render() waits on it */

  /* running: set by start(), cleared by stop_task().
   * flushing: set by unlock(), cleared by unlock_stop(). */
  gboolean running, flushing;

  GstTask *task;                /* runs gst_rtmp2_sink_task_func */
  GRecMutex task_lock;

  /* Created by the task function; valid only while it runs */
  GMainLoop *loop;
  GMainContext *context;

  GCancellable *cancellable;    /* created in start(), cancelled by
                                 * stop_task() */
  GstRtmpConnection *connection;        /* closed and cleared when the
                                         * task's main loop exits */
  guint32 stream_id;            /* filled in when publishing has started;
                                 * stamped onto outgoing messages */

  GPtrArray *headers;           /* messages built from the caps
                                 * "streamheader" field */
  guint64 last_ts, base_ts;     /* timestamp fixup */
} GstRtmp2Sink;
94
/* Class structure; adds nothing to GstBaseSinkClass. */
typedef struct
{
  GstBaseSinkClass parent_class;
} GstRtmp2SinkClass;
99
/* GObject virtual functions and interface initializers */
static void gst_rtmp2_sink_set_property (GObject * object,
    guint property_id, const GValue * value, GParamSpec * pspec);
static void gst_rtmp2_sink_get_property (GObject * object,
    guint property_id, GValue * value, GParamSpec * pspec);
static void gst_rtmp2_sink_finalize (GObject * object);
static void gst_rtmp2_sink_uri_handler_init (GstURIHandlerInterface * iface);

/* GstBaseSink virtual functions */
static gboolean gst_rtmp2_sink_start (GstBaseSink * sink);
static gboolean gst_rtmp2_sink_stop (GstBaseSink * sink);
static gboolean gst_rtmp2_sink_event (GstBaseSink * sink, GstEvent * event);
static gboolean gst_rtmp2_sink_unlock (GstBaseSink * sink);
static gboolean gst_rtmp2_sink_unlock_stop (GstBaseSink * sink);
static GstFlowReturn gst_rtmp2_sink_render (GstBaseSink * sink,
    GstBuffer * buffer);
static gboolean gst_rtmp2_sink_set_caps (GstBaseSink * sink, GstCaps * caps);

/* Internal API */

/* Body of self->task: owns the main context/loop used for the connection */
static void gst_rtmp2_sink_task_func (gpointer user_data);

/* Asynchronous connect chain: connect -> start publish -> task completion */
static void client_connect_done (GObject * source, GAsyncResult * result,
    gpointer user_data);
static void start_publish_done (GObject * source, GAsyncResult * result,
    gpointer user_data);
static void connect_task_done (GObject * object, GAsyncResult * result,
    gpointer user_data);

/* Called by set_property (with self->lock held) after "peak-kbps" /
 * "chunk-size" change */
static void set_pacing_rate (GstRtmp2Sink * self);
static void set_chunk_size (GstRtmp2Sink * self);

/* Returns the structure handed out through the "stats" property */
static GstStructure *gst_rtmp2_sink_get_stats (GstRtmp2Sink * self);
132
/* Property IDs. PROP_LOCATION through PROP_FLASH_VERSION are registered in
 * class_init with g_object_class_override_property(); the remaining ones
 * are installed by this class itself. */
enum
{
  PROP_0,
  PROP_LOCATION,
  PROP_SCHEME,
  PROP_HOST,
  PROP_PORT,
  PROP_APPLICATION,
  PROP_STREAM,
  PROP_SECURE_TOKEN,
  PROP_USERNAME,
  PROP_PASSWORD,
  PROP_AUTHMOD,
  PROP_TIMEOUT,
  PROP_TLS_VALIDATION_FLAGS,
  PROP_FLASH_VERSION,
  PROP_ASYNC_CONNECT,
  PROP_PEAK_KBPS,
  PROP_CHUNK_SIZE,
  PROP_STATS,
  PROP_STOP_COMMANDS,
};
155
/* pad templates */

/* Single always-present sink pad accepting FLV ("video/x-flv") */
static GstStaticPadTemplate gst_rtmp2_sink_sink_template =
GST_STATIC_PAD_TEMPLATE ("sink",
    GST_PAD_SINK,
    GST_PAD_ALWAYS,
    GST_STATIC_CAPS ("video/x-flv")
    );
164
/* class initialization */

/* Defines the GstRtmp2Sink type as a GstBaseSink subclass implementing
 * GstURIHandler and GstRtmpLocationHandler, and the "rtmp2sink" element
 * registration helper. */
G_DEFINE_TYPE_WITH_CODE (GstRtmp2Sink, gst_rtmp2_sink, GST_TYPE_BASE_SINK,
    G_IMPLEMENT_INTERFACE (GST_TYPE_URI_HANDLER,
        gst_rtmp2_sink_uri_handler_init);
    G_IMPLEMENT_INTERFACE (GST_TYPE_RTMP_LOCATION_HANDLER, NULL));
GST_ELEMENT_REGISTER_DEFINE_WITH_CODE (rtmp2sink, "rtmp2sink",
    GST_RANK_PRIMARY + 1, GST_TYPE_RTMP2_SINK, rtmp2_element_init (plugin));
173
174 static void
gst_rtmp2_sink_class_init(GstRtmp2SinkClass * klass)175 gst_rtmp2_sink_class_init (GstRtmp2SinkClass * klass)
176 {
177 GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
178 GstBaseSinkClass *base_sink_class = GST_BASE_SINK_CLASS (klass);
179
180 gst_element_class_add_static_pad_template (GST_ELEMENT_CLASS (klass),
181 &gst_rtmp2_sink_sink_template);
182
183 gst_element_class_set_static_metadata (GST_ELEMENT_CLASS (klass),
184 "RTMP sink element", "Sink", "Sink element for RTMP streams",
185 "Make.TV, Inc. <info@make.tv>");
186
187 gobject_class->set_property = gst_rtmp2_sink_set_property;
188 gobject_class->get_property = gst_rtmp2_sink_get_property;
189 gobject_class->finalize = gst_rtmp2_sink_finalize;
190 base_sink_class->start = GST_DEBUG_FUNCPTR (gst_rtmp2_sink_start);
191 base_sink_class->stop = GST_DEBUG_FUNCPTR (gst_rtmp2_sink_stop);
192 base_sink_class->event = GST_DEBUG_FUNCPTR (gst_rtmp2_sink_event);
193 base_sink_class->unlock = GST_DEBUG_FUNCPTR (gst_rtmp2_sink_unlock);
194 base_sink_class->unlock_stop = GST_DEBUG_FUNCPTR (gst_rtmp2_sink_unlock_stop);
195 base_sink_class->render = GST_DEBUG_FUNCPTR (gst_rtmp2_sink_render);
196 base_sink_class->set_caps = GST_DEBUG_FUNCPTR (gst_rtmp2_sink_set_caps);
197
198 g_object_class_override_property (gobject_class, PROP_LOCATION, "location");
199 g_object_class_override_property (gobject_class, PROP_SCHEME, "scheme");
200 g_object_class_override_property (gobject_class, PROP_HOST, "host");
201 g_object_class_override_property (gobject_class, PROP_PORT, "port");
202 g_object_class_override_property (gobject_class, PROP_APPLICATION,
203 "application");
204 g_object_class_override_property (gobject_class, PROP_STREAM, "stream");
205 g_object_class_override_property (gobject_class, PROP_SECURE_TOKEN,
206 "secure-token");
207 g_object_class_override_property (gobject_class, PROP_USERNAME, "username");
208 g_object_class_override_property (gobject_class, PROP_PASSWORD, "password");
209 g_object_class_override_property (gobject_class, PROP_AUTHMOD, "authmod");
210 g_object_class_override_property (gobject_class, PROP_TIMEOUT, "timeout");
211 g_object_class_override_property (gobject_class, PROP_TLS_VALIDATION_FLAGS,
212 "tls-validation-flags");
213 g_object_class_override_property (gobject_class, PROP_FLASH_VERSION,
214 "flash-version");
215
216 g_object_class_install_property (gobject_class, PROP_ASYNC_CONNECT,
217 g_param_spec_boolean ("async-connect", "Async connect",
218 "Connect on READY, otherwise on first push", TRUE,
219 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
220
221 g_object_class_install_property (gobject_class, PROP_PEAK_KBPS,
222 g_param_spec_uint ("peak-kbps", "Peak bitrate",
223 "Bitrate in kbit/sec to pace outgoing packets", 0, G_MAXINT / 125, 0,
224 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS |
225 GST_PARAM_MUTABLE_PLAYING));
226
227 g_object_class_install_property (gobject_class, PROP_CHUNK_SIZE,
228 g_param_spec_uint ("chunk-size", "Chunk size", "RTMP chunk size",
229 GST_RTMP_MINIMUM_CHUNK_SIZE, GST_RTMP_MAXIMUM_CHUNK_SIZE,
230 GST_RTMP_DEFAULT_CHUNK_SIZE, G_PARAM_READWRITE |
231 G_PARAM_STATIC_STRINGS | GST_PARAM_MUTABLE_PLAYING));
232
233 g_object_class_install_property (gobject_class, PROP_STATS,
234 g_param_spec_boxed ("stats", "Stats", "Retrieve a statistics structure",
235 GST_TYPE_STRUCTURE, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
236
237 /**
238 * GstRtmp2Sink:stop-commands:
239 *
240 * Which commands (if any) to send on EOS event before closing connection
241 *
242 * Since: 1.20
243 */
244 g_object_class_install_property (gobject_class, PROP_STOP_COMMANDS,
245 g_param_spec_flags ("stop-commands", "Stop commands",
246 "RTMP commands to send on EOS event before closing connection",
247 GST_TYPE_RTMP_STOP_COMMANDS, GST_RTMP_DEFAULT_STOP_COMMANDS,
248 (GParamFlags) (G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
249
250 gst_type_mark_as_plugin_api (GST_TYPE_RTMP_LOCATION_HANDLER, 0);
251 GST_DEBUG_CATEGORY_INIT (gst_rtmp2_sink_debug_category, "rtmp2sink", 0,
252 "debug category for rtmp2sink element");
253 }
254
255 static void
gst_rtmp2_sink_init(GstRtmp2Sink * self)256 gst_rtmp2_sink_init (GstRtmp2Sink * self)
257 {
258 self->location.flash_ver = g_strdup ("FMLE/3.0 (compatible; FMSc/1.0)");
259 self->location.publish = TRUE;
260 self->async_connect = TRUE;
261 self->chunk_size = GST_RTMP_DEFAULT_CHUNK_SIZE;
262 self->stop_commands = GST_RTMP_DEFAULT_STOP_COMMANDS;
263
264 g_mutex_init (&self->lock);
265 g_cond_init (&self->cond);
266
267 self->task = gst_task_new (gst_rtmp2_sink_task_func, self, NULL);
268 g_rec_mutex_init (&self->task_lock);
269 gst_task_set_lock (self->task, &self->task_lock);
270
271 self->headers = g_ptr_array_new_with_free_func
272 ((GDestroyNotify) gst_mini_object_unref);
273 }
274
/* GstURIHandler interface initializer: delegates to the shared location
 * handler implementation, declaring this element a URI sink. */
static void
gst_rtmp2_sink_uri_handler_init (GstURIHandlerInterface * iface)
{
  gst_rtmp_location_handler_implement_uri_handler (iface, GST_URI_SINK);
}
280
281 static void
gst_rtmp2_sink_set_property(GObject * object,guint property_id,const GValue * value,GParamSpec * pspec)282 gst_rtmp2_sink_set_property (GObject * object, guint property_id,
283 const GValue * value, GParamSpec * pspec)
284 {
285 GstRtmp2Sink *self = GST_RTMP2_SINK (object);
286
287 switch (property_id) {
288 case PROP_LOCATION:
289 gst_rtmp_location_handler_set_uri (GST_RTMP_LOCATION_HANDLER (self),
290 g_value_get_string (value));
291 break;
292 case PROP_SCHEME:
293 GST_OBJECT_LOCK (self);
294 self->location.scheme = g_value_get_enum (value);
295 GST_OBJECT_UNLOCK (self);
296 break;
297 case PROP_HOST:
298 GST_OBJECT_LOCK (self);
299 g_free (self->location.host);
300 self->location.host = g_value_dup_string (value);
301 GST_OBJECT_UNLOCK (self);
302 break;
303 case PROP_PORT:
304 GST_OBJECT_LOCK (self);
305 self->location.port = g_value_get_int (value);
306 GST_OBJECT_UNLOCK (self);
307 break;
308 case PROP_APPLICATION:
309 GST_OBJECT_LOCK (self);
310 g_free (self->location.application);
311 self->location.application = g_value_dup_string (value);
312 GST_OBJECT_UNLOCK (self);
313 break;
314 case PROP_STREAM:
315 GST_OBJECT_LOCK (self);
316 g_free (self->location.stream);
317 self->location.stream = g_value_dup_string (value);
318 GST_OBJECT_UNLOCK (self);
319 break;
320 case PROP_SECURE_TOKEN:
321 GST_OBJECT_LOCK (self);
322 g_free (self->location.secure_token);
323 self->location.secure_token = g_value_dup_string (value);
324 GST_OBJECT_UNLOCK (self);
325 break;
326 case PROP_USERNAME:
327 GST_OBJECT_LOCK (self);
328 g_free (self->location.username);
329 self->location.username = g_value_dup_string (value);
330 GST_OBJECT_UNLOCK (self);
331 break;
332 case PROP_PASSWORD:
333 GST_OBJECT_LOCK (self);
334 g_free (self->location.password);
335 self->location.password = g_value_dup_string (value);
336 GST_OBJECT_UNLOCK (self);
337 break;
338 case PROP_AUTHMOD:
339 GST_OBJECT_LOCK (self);
340 self->location.authmod = g_value_get_enum (value);
341 GST_OBJECT_UNLOCK (self);
342 break;
343 case PROP_TIMEOUT:
344 GST_OBJECT_LOCK (self);
345 self->location.timeout = g_value_get_uint (value);
346 GST_OBJECT_UNLOCK (self);
347 break;
348 case PROP_TLS_VALIDATION_FLAGS:
349 GST_OBJECT_LOCK (self);
350 self->location.tls_flags = g_value_get_flags (value);
351 GST_OBJECT_UNLOCK (self);
352 break;
353 case PROP_FLASH_VERSION:
354 GST_OBJECT_LOCK (self);
355 g_free (self->location.flash_ver);
356 self->location.flash_ver = g_value_dup_string (value);
357 GST_OBJECT_UNLOCK (self);
358 break;
359 case PROP_ASYNC_CONNECT:
360 GST_OBJECT_LOCK (self);
361 self->async_connect = g_value_get_boolean (value);
362 GST_OBJECT_UNLOCK (self);
363 break;
364 case PROP_PEAK_KBPS:
365 g_mutex_lock (&self->lock);
366
367 GST_OBJECT_LOCK (self);
368 self->peak_kbps = g_value_get_uint (value);
369 GST_OBJECT_UNLOCK (self);
370
371 set_pacing_rate (self);
372 g_mutex_unlock (&self->lock);
373 break;
374 case PROP_CHUNK_SIZE:
375 g_mutex_lock (&self->lock);
376
377 GST_OBJECT_LOCK (self);
378 self->chunk_size = g_value_get_uint (value);
379 GST_OBJECT_UNLOCK (self);
380
381 set_chunk_size (self);
382 g_mutex_unlock (&self->lock);
383 break;
384 case PROP_STOP_COMMANDS:
385 GST_OBJECT_LOCK (self);
386 self->stop_commands = g_value_get_flags (value);
387 GST_OBJECT_UNLOCK (self);
388 break;
389 default:
390 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
391 break;
392 }
393 }
394
395 static void
gst_rtmp2_sink_get_property(GObject * object,guint property_id,GValue * value,GParamSpec * pspec)396 gst_rtmp2_sink_get_property (GObject * object, guint property_id,
397 GValue * value, GParamSpec * pspec)
398 {
399 GstRtmp2Sink *self = GST_RTMP2_SINK (object);
400
401 switch (property_id) {
402 case PROP_LOCATION:
403 GST_OBJECT_LOCK (self);
404 g_value_take_string (value, gst_rtmp_location_get_string (&self->location,
405 TRUE));
406 GST_OBJECT_UNLOCK (self);
407 break;
408 case PROP_SCHEME:
409 GST_OBJECT_LOCK (self);
410 g_value_set_enum (value, self->location.scheme);
411 GST_OBJECT_UNLOCK (self);
412 break;
413 case PROP_HOST:
414 GST_OBJECT_LOCK (self);
415 g_value_set_string (value, self->location.host);
416 GST_OBJECT_UNLOCK (self);
417 break;
418 case PROP_PORT:
419 GST_OBJECT_LOCK (self);
420 g_value_set_int (value, self->location.port);
421 GST_OBJECT_UNLOCK (self);
422 break;
423 case PROP_APPLICATION:
424 GST_OBJECT_LOCK (self);
425 g_value_set_string (value, self->location.application);
426 GST_OBJECT_UNLOCK (self);
427 break;
428 case PROP_STREAM:
429 GST_OBJECT_LOCK (self);
430 g_value_set_string (value, self->location.stream);
431 GST_OBJECT_UNLOCK (self);
432 break;
433 case PROP_SECURE_TOKEN:
434 GST_OBJECT_LOCK (self);
435 g_value_set_string (value, self->location.secure_token);
436 GST_OBJECT_UNLOCK (self);
437 break;
438 case PROP_USERNAME:
439 GST_OBJECT_LOCK (self);
440 g_value_set_string (value, self->location.username);
441 GST_OBJECT_UNLOCK (self);
442 break;
443 case PROP_PASSWORD:
444 GST_OBJECT_LOCK (self);
445 g_value_set_string (value, self->location.password);
446 GST_OBJECT_UNLOCK (self);
447 break;
448 case PROP_AUTHMOD:
449 GST_OBJECT_LOCK (self);
450 g_value_set_enum (value, self->location.authmod);
451 GST_OBJECT_UNLOCK (self);
452 break;
453 case PROP_TIMEOUT:
454 GST_OBJECT_LOCK (self);
455 g_value_set_uint (value, self->location.timeout);
456 GST_OBJECT_UNLOCK (self);
457 break;
458 case PROP_TLS_VALIDATION_FLAGS:
459 GST_OBJECT_LOCK (self);
460 g_value_set_flags (value, self->location.tls_flags);
461 GST_OBJECT_UNLOCK (self);
462 break;
463 case PROP_FLASH_VERSION:
464 GST_OBJECT_LOCK (self);
465 g_value_set_string (value, self->location.flash_ver);
466 GST_OBJECT_UNLOCK (self);
467 break;
468 case PROP_ASYNC_CONNECT:
469 GST_OBJECT_LOCK (self);
470 g_value_set_boolean (value, self->async_connect);
471 GST_OBJECT_UNLOCK (self);
472 break;
473 case PROP_PEAK_KBPS:
474 GST_OBJECT_LOCK (self);
475 g_value_set_uint (value, self->peak_kbps);
476 GST_OBJECT_UNLOCK (self);
477 break;
478 case PROP_CHUNK_SIZE:
479 GST_OBJECT_LOCK (self);
480 g_value_set_uint (value, self->chunk_size);
481 GST_OBJECT_UNLOCK (self);
482 break;
483 case PROP_STATS:
484 g_value_take_boxed (value, gst_rtmp2_sink_get_stats (self));
485 break;
486 case PROP_STOP_COMMANDS:
487 GST_OBJECT_LOCK (self);
488 g_value_set_flags (value, self->stop_commands);
489 GST_OBJECT_UNLOCK (self);
490 break;
491 default:
492 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
493 break;
494 }
495 }
496
497 static void
gst_rtmp2_sink_finalize(GObject * object)498 gst_rtmp2_sink_finalize (GObject * object)
499 {
500 GstRtmp2Sink *self = GST_RTMP2_SINK (object);
501
502 g_clear_pointer (&self->headers, g_ptr_array_unref);
503
504 g_clear_object (&self->cancellable);
505 g_clear_object (&self->connection);
506
507 g_clear_object (&self->task);
508 g_rec_mutex_clear (&self->task_lock);
509
510 g_mutex_clear (&self->lock);
511 g_cond_clear (&self->cond);
512
513 g_clear_pointer (&self->stats, gst_structure_free);
514 gst_rtmp_location_clear (&self->location);
515
516 G_OBJECT_CLASS (gst_rtmp2_sink_parent_class)->finalize (object);
517 }
518
519 static gboolean
gst_rtmp2_sink_start(GstBaseSink * sink)520 gst_rtmp2_sink_start (GstBaseSink * sink)
521 {
522 GstRtmp2Sink *self = GST_RTMP2_SINK (sink);
523 gboolean async;
524
525 GST_OBJECT_LOCK (self);
526 async = self->async_connect;
527 GST_OBJECT_UNLOCK (self);
528
529 GST_INFO_OBJECT (self, "Starting (%s)", async ? "async" : "delayed");
530
531 g_clear_object (&self->cancellable);
532
533 self->running = TRUE;
534 self->cancellable = g_cancellable_new ();
535 self->stream_id = 0;
536 self->last_ts = 0;
537 self->base_ts = 0;
538
539 if (async) {
540 gst_task_start (self->task);
541 }
542
543 return TRUE;
544 }
545
546 static gboolean
quit_invoker(gpointer user_data)547 quit_invoker (gpointer user_data)
548 {
549 g_main_loop_quit (user_data);
550 return G_SOURCE_REMOVE;
551 }
552
553 static void
stop_task(GstRtmp2Sink * self)554 stop_task (GstRtmp2Sink * self)
555 {
556 gst_task_stop (self->task);
557 self->running = FALSE;
558
559 if (self->cancellable) {
560 GST_DEBUG_OBJECT (self, "Cancelling");
561 g_cancellable_cancel (self->cancellable);
562 }
563
564 if (self->loop) {
565 GST_DEBUG_OBJECT (self, "Stopping loop");
566 g_main_context_invoke_full (self->context, G_PRIORITY_DEFAULT_IDLE,
567 quit_invoker, g_main_loop_ref (self->loop),
568 (GDestroyNotify) g_main_loop_unref);
569 }
570
571 g_cond_broadcast (&self->cond);
572 }
573
/* GstBaseSink::stop: requests task shutdown under self->lock and then
 * waits for the task to finish. Always returns TRUE. */
static gboolean
gst_rtmp2_sink_stop (GstBaseSink * sink)
{
  GstRtmp2Sink *self = GST_RTMP2_SINK (sink);

  GST_DEBUG_OBJECT (self, "stop");

  g_mutex_lock (&self->lock);
  stop_task (self);
  g_mutex_unlock (&self->lock);

  /* join outside self->lock: the task function takes that lock itself */
  gst_task_join (self->task);

  return TRUE;
}
589
590 static gboolean
stop_publish_invoker(gpointer user_data)591 stop_publish_invoker (gpointer user_data)
592 {
593 GstRtmp2Sink *self = user_data;
594
595 if (self->connection) {
596 GST_OBJECT_LOCK (self);
597 if (self->stop_commands != GST_RTMP_STOP_COMMANDS_NONE) {
598 gst_rtmp_client_stop_publish (self->connection, self->location.stream,
599 self->stop_commands);
600 }
601 GST_OBJECT_UNLOCK (self);
602 }
603
604 return G_SOURCE_REMOVE;
605 }
606
607 static gboolean
gst_rtmp2_sink_event(GstBaseSink * sink,GstEvent * event)608 gst_rtmp2_sink_event (GstBaseSink * sink, GstEvent * event)
609 {
610 GstEventType type;
611 GstRtmp2Sink *self = GST_RTMP2_SINK (sink);
612
613 type = GST_EVENT_TYPE (event);
614
615 switch (type) {
616 case GST_EVENT_EOS:
617 g_mutex_lock (&self->lock);
618 if (self->loop) {
619 GST_DEBUG_OBJECT (self, "Got EOS: stopping publish");
620 g_main_context_invoke (self->context, stop_publish_invoker, self);
621 }
622 g_mutex_unlock (&self->lock);
623 break;
624 default:
625 break;
626 }
627
628 return GST_BASE_SINK_CLASS (gst_rtmp2_sink_parent_class)->event (sink, event);
629 }
630
/* GstBaseSink::unlock: marks the sink as flushing and wakes any thread
 * waiting on self->cond (render() waits there). Always returns TRUE. */
static gboolean
gst_rtmp2_sink_unlock (GstBaseSink * sink)
{
  GstRtmp2Sink *self = GST_RTMP2_SINK (sink);

  GST_DEBUG_OBJECT (self, "unlock");

  g_mutex_lock (&self->lock);
  self->flushing = TRUE;
  g_cond_broadcast (&self->cond);
  g_mutex_unlock (&self->lock);

  return TRUE;
}
645
/* GstBaseSink::unlock_stop: clears the flushing flag set by unlock().
 * Always returns TRUE. */
static gboolean
gst_rtmp2_sink_unlock_stop (GstBaseSink * sink)
{
  GstRtmp2Sink *self = GST_RTMP2_SINK (sink);

  GST_DEBUG_OBJECT (self, "unlock_stop");

  g_mutex_lock (&self->lock);
  self->flushing = FALSE;
  g_mutex_unlock (&self->lock);

  return TRUE;
}
659
660 static gboolean
buffer_to_message(GstRtmp2Sink * self,GstBuffer * buffer,GstBuffer ** outbuf)661 buffer_to_message (GstRtmp2Sink * self, GstBuffer * buffer, GstBuffer ** outbuf)
662 {
663 GstBuffer *message;
664 GstRtmpFlvTagHeader header;
665 guint64 timestamp;
666 guint32 cstream;
667
668 {
669 GstMapInfo info;
670
671 if (G_UNLIKELY (!gst_buffer_map (buffer, &info, GST_MAP_READ))) {
672 GST_ERROR_OBJECT (self, "map failed: %" GST_PTR_FORMAT, buffer);
673 return FALSE;
674 }
675
676 /* FIXME: This is ugly and only works behind flvmux.
677 * Implement true RTMP muxing. */
678
679 if (G_UNLIKELY (info.size >= 4 && memcmp (info.data, "FLV", 3) == 0)) {
680 /* drop the header, we don't need it */
681 GST_DEBUG_OBJECT (self, "ignoring FLV header: %" GST_PTR_FORMAT, buffer);
682 gst_buffer_unmap (buffer, &info);
683 *outbuf = NULL;
684 return TRUE;
685 }
686
687 if (!gst_rtmp_flv_tag_parse_header (&header, info.data, info.size)) {
688 GST_ERROR_OBJECT (self, "too small for tag header: %" GST_PTR_FORMAT,
689 buffer);
690 gst_buffer_unmap (buffer, &info);
691 return FALSE;
692 }
693
694 if (info.size < header.total_size) {
695 GST_ERROR_OBJECT (self, "too small for tag body: buffer %" G_GSIZE_FORMAT
696 ", tag %" G_GSIZE_FORMAT, info.size, header.total_size);
697 gst_buffer_unmap (buffer, &info);
698 return FALSE;
699 }
700
701 /* flvmux timestamps roll over after about 49 days */
702 timestamp = header.timestamp;
703 if (timestamp + self->base_ts + G_MAXINT32 < self->last_ts) {
704 GST_WARNING_OBJECT (self, "Timestamp regression %" G_GUINT64_FORMAT
705 " -> %" G_GUINT64_FORMAT "; assuming overflow", self->last_ts,
706 timestamp + self->base_ts);
707 self->base_ts += G_MAXUINT32;
708 self->base_ts += 1;
709 } else if (timestamp + self->base_ts > self->last_ts + G_MAXINT32) {
710 GST_WARNING_OBJECT (self, "Timestamp jump %" G_GUINT64_FORMAT
711 " -> %" G_GUINT64_FORMAT "; assuming underflow", self->last_ts,
712 timestamp + self->base_ts);
713 if (self->base_ts > 0) {
714 self->base_ts -= G_MAXUINT32;
715 self->base_ts -= 1;
716 } else {
717 GST_WARNING_OBJECT (self, "Cannot regress further;"
718 " forcing timestamp to zero");
719 timestamp = 0;
720 }
721 }
722 timestamp += self->base_ts;
723 self->last_ts = timestamp;
724
725 gst_buffer_unmap (buffer, &info);
726 }
727
728 switch (header.type) {
729 case GST_RTMP_MESSAGE_TYPE_DATA_AMF0:
730 cstream = 4;
731 break;
732
733 case GST_RTMP_MESSAGE_TYPE_AUDIO:
734 cstream = 5;
735 break;
736
737 case GST_RTMP_MESSAGE_TYPE_VIDEO:
738 cstream = 6;
739 break;
740
741 default:
742 GST_ERROR_OBJECT (self, "unknown tag type %d", header.type);
743 return FALSE;
744 }
745
746 /* May not know stream ID yet; set later */
747 message = gst_rtmp_message_new (header.type, cstream, 0);
748 message = gst_buffer_append_region (message, gst_buffer_ref (buffer),
749 GST_RTMP_FLV_TAG_HEADER_SIZE, header.payload_size);
750
751 GST_BUFFER_DTS (message) = timestamp * GST_MSECOND;
752
753 *outbuf = message;
754 return TRUE;
755 }
756
757 static gboolean
should_drop_header(GstRtmp2Sink * self,GstBuffer * buffer)758 should_drop_header (GstRtmp2Sink * self, GstBuffer * buffer)
759 {
760 guint len;
761
762 if (G_LIKELY (!GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_HEADER))) {
763 return FALSE;
764 }
765
766 g_mutex_lock (&self->lock);
767 len = self->headers->len;
768 g_mutex_unlock (&self->lock);
769
770 /* Drop header buffers when we have streamheader caps */
771 return len > 0;
772 }
773
774 static void
send_message(GstRtmp2Sink * self,GstBuffer * message)775 send_message (GstRtmp2Sink * self, GstBuffer * message)
776 {
777 GstRtmpMeta *meta = gst_buffer_get_rtmp_meta (message);
778
779 g_return_if_fail (meta != NULL);
780 g_return_if_fail (self->stream_id != 0);
781
782 meta->mstream = self->stream_id;
783
784 if (gst_rtmp_message_is_metadata (message)) {
785 gst_rtmp_connection_set_data_frame (self->connection, message);
786 } else {
787 gst_rtmp_connection_queue_message (self->connection, message);
788 }
789 }
790
791 static void
send_streamheader(GstRtmp2Sink * self)792 send_streamheader (GstRtmp2Sink * self)
793 {
794 guint i;
795
796 if (G_LIKELY (self->headers->len == 0)) {
797 return;
798 }
799
800 GST_DEBUG_OBJECT (self, "Sending %u streamheader messages",
801 self->headers->len);
802
803 for (i = 0; i < self->headers->len; i++) {
804 send_message (self, g_ptr_array_index (self->headers, i));
805 }
806
807 /* Steal pointers: suppress free */
808 g_ptr_array_set_free_func (self->headers, NULL);
809 g_ptr_array_set_size (self->headers, 0);
810 g_ptr_array_set_free_func (self->headers,
811 (GDestroyNotify) gst_mini_object_unref);
812 }
813
814 static inline gboolean
is_running(GstRtmp2Sink * self)815 is_running (GstRtmp2Sink * self)
816 {
817 return G_LIKELY (self->running && !self->flushing);
818 }
819
820 static GstFlowReturn
gst_rtmp2_sink_render(GstBaseSink * sink,GstBuffer * buffer)821 gst_rtmp2_sink_render (GstBaseSink * sink, GstBuffer * buffer)
822 {
823 GstRtmp2Sink *self = GST_RTMP2_SINK (sink);
824 GstBuffer *message;
825 GstFlowReturn ret;
826
827 if (G_UNLIKELY (should_drop_header (self, buffer))) {
828 GST_DEBUG_OBJECT (self, "Skipping header %" GST_PTR_FORMAT, buffer);
829 return GST_FLOW_OK;
830 }
831
832 GST_LOG_OBJECT (self, "render %" GST_PTR_FORMAT, buffer);
833
834 if (G_UNLIKELY (!buffer_to_message (self, buffer, &message))) {
835 GST_ELEMENT_ERROR (self, STREAM, FAILED, ("Failed to convert FLV to RTMP"),
836 ("Failed to convert %" GST_PTR_FORMAT, message));
837 return GST_FLOW_ERROR;
838 }
839
840 if (G_UNLIKELY (!message)) {
841 GST_DEBUG_OBJECT (self, "Skipping %" GST_PTR_FORMAT, buffer);
842 return GST_FLOW_OK;
843 }
844
845 g_mutex_lock (&self->lock);
846
847 if (G_UNLIKELY (is_running (self) && self->cancellable &&
848 gst_task_get_state (self->task) != GST_TASK_STARTED)) {
849 GST_DEBUG_OBJECT (self, "Starting connect");
850 gst_task_start (self->task);
851 }
852
853 while (G_UNLIKELY (is_running (self) && !self->connection)) {
854 GST_DEBUG_OBJECT (self, "Waiting for connection");
855 g_cond_wait (&self->cond, &self->lock);
856 }
857
858 while (G_UNLIKELY (is_running (self) && self->connection &&
859 gst_rtmp_connection_get_num_queued (self->connection) > 3)) {
860 GST_LOG_OBJECT (self, "Waiting for queue");
861 g_cond_wait (&self->cond, &self->lock);
862 }
863
864 if (G_UNLIKELY (!is_running (self))) {
865 gst_buffer_unref (message);
866 ret = GST_FLOW_FLUSHING;
867 } else if (G_UNLIKELY (!self->connection)) {
868 gst_buffer_unref (message);
869 /* send_connect_error has sent an ERROR message */
870 ret = GST_FLOW_ERROR;
871 } else {
872 send_streamheader (self);
873 send_message (self, message);
874 ret = GST_FLOW_OK;
875 }
876
877 g_mutex_unlock (&self->lock);
878 return ret;
879 }
880
881 static gboolean
add_streamheader(GstRtmp2Sink * self,const GValue * value)882 add_streamheader (GstRtmp2Sink * self, const GValue * value)
883 {
884 GstBuffer *buffer, *message;
885
886 g_return_val_if_fail (value, FALSE);
887
888 if (!GST_VALUE_HOLDS_BUFFER (value)) {
889 GST_ERROR_OBJECT (self, "'streamheader' item of unexpected type '%s'",
890 G_VALUE_TYPE_NAME (value));
891 return FALSE;
892 }
893
894 buffer = gst_value_get_buffer (value);
895
896 if (!buffer_to_message (self, buffer, &message)) {
897 GST_ERROR_OBJECT (self, "Failed to read streamheader %" GST_PTR_FORMAT,
898 buffer);
899 return FALSE;
900 }
901
902 if (message) {
903 GST_DEBUG_OBJECT (self, "Adding streamheader %" GST_PTR_FORMAT, buffer);
904 g_ptr_array_add (self->headers, message);
905 } else {
906 GST_DEBUG_OBJECT (self, "Skipping streamheader %" GST_PTR_FORMAT, buffer);
907 }
908
909 return TRUE;
910 }
911
912 static gboolean
gst_rtmp2_sink_set_caps(GstBaseSink * sink,GstCaps * caps)913 gst_rtmp2_sink_set_caps (GstBaseSink * sink, GstCaps * caps)
914 {
915 GstRtmp2Sink *self = GST_RTMP2_SINK (sink);
916 GstStructure *s;
917 const GValue *streamheader;
918 guint i = 0;
919
920 GST_DEBUG_OBJECT (self, "setcaps %" GST_PTR_FORMAT, caps);
921
922 g_ptr_array_set_size (self->headers, 0);
923
924 s = gst_caps_get_structure (caps, 0);
925 streamheader = gst_structure_get_value (s, "streamheader");
926
927 if (!streamheader) {
928 GST_DEBUG_OBJECT (self, "'streamheader' field not present");
929 } else if (GST_VALUE_HOLDS_BUFFER (streamheader)) {
930 GST_DEBUG_OBJECT (self, "'streamheader' field holds buffer");
931 if (!add_streamheader (self, streamheader)) {
932 return FALSE;
933 }
934
935 i = 1;
936 } else if (GST_VALUE_HOLDS_ARRAY (streamheader)) {
937 guint size = gst_value_array_get_size (streamheader);
938
939 GST_DEBUG_OBJECT (self, "'streamheader' field holds array");
940
941 for (; i < size; i++) {
942 const GValue *v = gst_value_array_get_value (streamheader, i);
943
944 if (!add_streamheader (self, v)) {
945 return FALSE;
946 }
947 }
948 } else {
949 GST_ERROR_OBJECT (self, "'streamheader' field has unexpected type '%s'",
950 G_VALUE_TYPE_NAME (streamheader));
951 return FALSE;
952 }
953
954 GST_DEBUG_OBJECT (self, "Collected streamheaders: %u buffers -> %u messages",
955 i, self->headers->len);
956
957 return TRUE;
958 }
959
960 /* Mainloop task */
961 static void
gst_rtmp2_sink_task_func(gpointer user_data)962 gst_rtmp2_sink_task_func (gpointer user_data)
963 {
964 GstRtmp2Sink *self = GST_RTMP2_SINK (user_data);
965 GMainContext *context;
966 GMainLoop *loop;
967 GTask *connector;
968
969 GST_DEBUG_OBJECT (self, "gst_rtmp2_sink_task starting");
970 g_mutex_lock (&self->lock);
971
972 context = self->context = g_main_context_new ();
973 g_main_context_push_thread_default (context);
974 loop = self->loop = g_main_loop_new (context, TRUE);
975 connector = g_task_new (self, self->cancellable, connect_task_done, NULL);
976
977 g_clear_pointer (&self->stats, gst_structure_free);
978
979 GST_OBJECT_LOCK (self);
980 gst_rtmp_client_connect_async (&self->location, self->cancellable,
981 client_connect_done, connector);
982 GST_OBJECT_UNLOCK (self);
983
984 /* Run loop */
985 g_mutex_unlock (&self->lock);
986 g_main_loop_run (loop);
987 g_mutex_lock (&self->lock);
988
989 if (self->connection) {
990 self->stats = gst_rtmp_connection_get_stats (self->connection);
991 }
992
993 g_clear_pointer (&self->loop, g_main_loop_unref);
994 g_clear_pointer (&self->connection, gst_rtmp_connection_close_and_unref);
995 g_cond_broadcast (&self->cond);
996
997 /* Run loop cleanup */
998 g_mutex_unlock (&self->lock);
999 while (g_main_context_pending (context)) {
1000 GST_DEBUG_OBJECT (self, "iterating main context to clean up");
1001 g_main_context_iteration (context, FALSE);
1002 }
1003 g_main_context_pop_thread_default (context);
1004 g_mutex_lock (&self->lock);
1005
1006 g_clear_pointer (&self->context, g_main_context_unref);
1007 g_ptr_array_set_size (self->headers, 0);
1008
1009 g_mutex_unlock (&self->lock);
1010 GST_DEBUG_OBJECT (self, "gst_rtmp2_sink_task exiting");
1011 }
1012
1013 static void
client_connect_done(GObject * source,GAsyncResult * result,gpointer user_data)1014 client_connect_done (GObject * source, GAsyncResult * result,
1015 gpointer user_data)
1016 {
1017 GTask *task = user_data;
1018 GstRtmp2Sink *self = g_task_get_source_object (task);
1019 GError *error = NULL;
1020 GstRtmpConnection *connection;
1021
1022 connection = gst_rtmp_client_connect_finish (result, &error);
1023 if (!connection) {
1024 g_task_return_error (task, error);
1025 g_object_unref (task);
1026 return;
1027 }
1028
1029 g_task_set_task_data (task, connection, g_object_unref);
1030
1031 if (g_task_return_error_if_cancelled (task)) {
1032 g_object_unref (task);
1033 return;
1034 }
1035
1036 GST_OBJECT_LOCK (self);
1037 gst_rtmp_client_start_publish_async (connection, self->location.stream,
1038 g_task_get_cancellable (task), start_publish_done, task);
1039 GST_OBJECT_UNLOCK (self);
1040 }
1041
1042 static void
start_publish_done(GObject * source,GAsyncResult * result,gpointer user_data)1043 start_publish_done (GObject * source, GAsyncResult * result, gpointer user_data)
1044 {
1045 GTask *task = G_TASK (user_data);
1046 GstRtmp2Sink *self = g_task_get_source_object (task);
1047 GstRtmpConnection *connection = g_task_get_task_data (task);
1048 GError *error = NULL;
1049
1050 if (g_task_return_error_if_cancelled (task)) {
1051 g_object_unref (task);
1052 return;
1053 }
1054
1055 if (gst_rtmp_client_start_publish_finish (connection, result,
1056 &self->stream_id, &error)) {
1057 g_task_return_pointer (task, g_object_ref (connection),
1058 gst_rtmp_connection_close_and_unref);
1059 } else {
1060 g_task_return_error (task, error);
1061 }
1062
1063 g_task_set_task_data (task, NULL, NULL);
1064 g_object_unref (task);
1065 }
1066
1067 static void
put_chunk(GstRtmpConnection * connection,gpointer user_data)1068 put_chunk (GstRtmpConnection * connection, gpointer user_data)
1069 {
1070 GstRtmp2Sink *self = GST_RTMP2_SINK (user_data);
1071
1072 g_mutex_lock (&self->lock);
1073 g_cond_signal (&self->cond);
1074 g_mutex_unlock (&self->lock);
1075 }
1076
1077 static void
error_callback(GstRtmpConnection * connection,GstRtmp2Sink * self)1078 error_callback (GstRtmpConnection * connection, GstRtmp2Sink * self)
1079 {
1080 g_mutex_lock (&self->lock);
1081 if (self->cancellable) {
1082 g_cancellable_cancel (self->cancellable);
1083 } else if (self->loop) {
1084 GST_ELEMENT_ERROR (self, RESOURCE, WRITE, ("Connection error"), (NULL));
1085 stop_task (self);
1086 }
1087 g_mutex_unlock (&self->lock);
1088 }
1089
1090 static void
send_connect_error(GstRtmp2Sink * self,GError * error)1091 send_connect_error (GstRtmp2Sink * self, GError * error)
1092 {
1093 if (!error) {
1094 GST_ERROR_OBJECT (self, "Connect failed with NULL error");
1095 GST_ELEMENT_ERROR (self, RESOURCE, FAILED, ("Failed to connect"), (NULL));
1096 return;
1097 }
1098
1099 if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
1100 GST_DEBUG_OBJECT (self, "Connection was cancelled (%s)",
1101 GST_STR_NULL (error->message));
1102 return;
1103 }
1104
1105 GST_ERROR_OBJECT (self, "Failed to connect (%s:%d): %s",
1106 g_quark_to_string (error->domain), error->code,
1107 GST_STR_NULL (error->message));
1108
1109 if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED)) {
1110 GST_ELEMENT_ERROR (self, RESOURCE, NOT_AUTHORIZED,
1111 ("Not authorized to connect"), ("%s", GST_STR_NULL (error->message)));
1112 } else if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CONNECTION_REFUSED)) {
1113 GST_ELEMENT_ERROR (self, RESOURCE, OPEN_READ,
1114 ("Could not connect"), ("%s", GST_STR_NULL (error->message)));
1115 } else {
1116 GST_ELEMENT_ERROR (self, RESOURCE, FAILED,
1117 ("Failed to connect"),
1118 ("error %s:%d: %s", g_quark_to_string (error->domain), error->code,
1119 GST_STR_NULL (error->message)));
1120 }
1121 }
1122
1123 static void
connect_task_done(GObject * object,GAsyncResult * result,gpointer user_data)1124 connect_task_done (GObject * object, GAsyncResult * result, gpointer user_data)
1125 {
1126 GstRtmp2Sink *self = GST_RTMP2_SINK (object);
1127 GTask *task = G_TASK (result);
1128 GError *error = NULL;
1129
1130 g_mutex_lock (&self->lock);
1131
1132 g_warn_if_fail (g_task_is_valid (task, object));
1133
1134 if (self->cancellable == g_task_get_cancellable (task)) {
1135 g_clear_object (&self->cancellable);
1136 }
1137
1138 self->connection = g_task_propagate_pointer (task, &error);
1139 if (self->connection) {
1140 set_pacing_rate (self);
1141 set_chunk_size (self);
1142 gst_rtmp_connection_set_output_handler (self->connection,
1143 put_chunk, g_object_ref (self), g_object_unref);
1144 g_signal_connect_object (self->connection, "error",
1145 G_CALLBACK (error_callback), self, 0);
1146 } else {
1147 send_connect_error (self, error);
1148 stop_task (self);
1149 g_error_free (error);
1150 }
1151
1152 g_cond_broadcast (&self->cond);
1153 g_mutex_unlock (&self->lock);
1154 }
1155
1156 static gboolean
socket_set_pacing_rate(GSocket * socket,gint pacing_rate,GError ** error)1157 socket_set_pacing_rate (GSocket * socket, gint pacing_rate, GError ** error)
1158 {
1159 #ifdef SO_MAX_PACING_RATE
1160 if (!g_socket_set_option (socket, SOL_SOCKET, SO_MAX_PACING_RATE,
1161 pacing_rate, error)) {
1162 g_prefix_error (error, "setsockopt failed: ");
1163 return FALSE;
1164 }
1165 #else
1166 if (pacing_rate != -1) {
1167 g_set_error (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
1168 "SO_MAX_PACING_RATE is not supported");
1169 return FALSE;
1170 }
1171 #endif
1172
1173 return TRUE;
1174 }
1175
1176 static void
set_pacing_rate(GstRtmp2Sink * self)1177 set_pacing_rate (GstRtmp2Sink * self)
1178 {
1179 GError *error = NULL;
1180 gint pacing_rate;
1181
1182 if (!self->connection)
1183 return;
1184
1185 GST_OBJECT_LOCK (self);
1186 pacing_rate = self->peak_kbps ? self->peak_kbps * 125 : -1;
1187 GST_OBJECT_UNLOCK (self);
1188
1189 if (socket_set_pacing_rate (gst_rtmp_connection_get_socket (self->connection),
1190 pacing_rate, &error))
1191 GST_INFO_OBJECT (self, "Set pacing rate to %d Bps", pacing_rate);
1192 else
1193 GST_WARNING_OBJECT (self, "Could not set pacing rate: %s", error->message);
1194
1195 g_clear_error (&error);
1196 }
1197
1198 static void
set_chunk_size(GstRtmp2Sink * self)1199 set_chunk_size (GstRtmp2Sink * self)
1200 {
1201 guint32 chunk_size;
1202
1203 if (!self->connection)
1204 return;
1205
1206 GST_OBJECT_LOCK (self);
1207 chunk_size = self->chunk_size;
1208 GST_OBJECT_UNLOCK (self);
1209
1210 gst_rtmp_connection_set_chunk_size (self->connection, chunk_size);
1211 GST_INFO_OBJECT (self, "Set chunk size to %" G_GUINT32_FORMAT, chunk_size);
1212 }
1213
1214 static GstStructure *
gst_rtmp2_sink_get_stats(GstRtmp2Sink * self)1215 gst_rtmp2_sink_get_stats (GstRtmp2Sink * self)
1216 {
1217 GstStructure *s;
1218
1219 g_mutex_lock (&self->lock);
1220
1221 if (self->connection) {
1222 s = gst_rtmp_connection_get_stats (self->connection);
1223 } else if (self->stats) {
1224 s = gst_structure_copy (self->stats);
1225 } else {
1226 s = gst_rtmp_connection_get_null_stats ();
1227 }
1228
1229 g_mutex_unlock (&self->lock);
1230
1231 return s;
1232 }
1233