• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* GStreamer
2  * Copyright (C) 2014 David Schleef <ds@schleef.org>
3  * Copyright (C) 2017 Make.TV, Inc. <info@make.tv>
4  *   Contact: Jan Alexander Steffens (heftig) <jsteffens@make.tv>
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
19  * Boston, MA 02110-1335, USA.
20  */
21 /**
22  * SECTION:element-rtmp2sink
23  *
24  * The rtmp2sink element sends audio and video streams to an RTMP
25  * server.
26  *
27  * <refsect2>
28  * <title>Example launch line</title>
29  * |[
30  * gst-launch -v videotestsrc ! x264enc ! flvmux ! rtmp2sink
31  *     location=rtmp://server.example.com/live/myStream
32  * ]|
 * This pipeline encodes a test video pattern as H.264, muxes it into FLV
 * and publishes the result to the RTMP server given by the location.
34  * </refsect2>
35  */
36 
37 #ifdef HAVE_CONFIG_H
38 #include "config.h"
39 #endif
40 
41 #include "gstrtmp2elements.h"
42 #include "gstrtmp2sink.h"
43 
44 #include "gstrtmp2locationhandler.h"
45 #include "rtmp/amf.h"
46 #include "rtmp/rtmpclient.h"
47 #include "rtmp/rtmpmessage.h"
48 #include "rtmp/rtmputils.h"
49 
50 #include <gst/gst.h>
51 #include <gst/base/gstbasesink.h>
52 #include <gio/gnetworking.h>
53 #include <string.h>
54 
/* Debug category used by every GST_*_OBJECT logging call in this file */
GST_DEBUG_CATEGORY_STATIC (gst_rtmp2_sink_debug_category);
#define GST_CAT_DEFAULT gst_rtmp2_sink_debug_category

/* Instance cast and instance type-check helpers for GstRtmp2Sink */
#define GST_RTMP2_SINK(obj)   (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_RTMP2_SINK,GstRtmp2Sink))
#define GST_IS_RTMP2_SINK(obj)   (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_RTMP2_SINK))
61 
62 typedef struct
63 {
64   GstBaseSink parent_instance;
65 
66   /* properties */
67   GstRtmpLocation location;
68   gboolean async_connect;
69   guint peak_kbps;
70   guint32 chunk_size;
71   GstRtmpStopCommands stop_commands;
72   GstStructure *stats;
73 
74   /* If both self->lock and OBJECT_LOCK are needed,
75    * self->lock must be taken first */
76   GMutex lock;
77   GCond cond;
78 
79   gboolean running, flushing;
80 
81   GstTask *task;
82   GRecMutex task_lock;
83 
84   GMainLoop *loop;
85   GMainContext *context;
86 
87   GCancellable *cancellable;
88   GstRtmpConnection *connection;
89   guint32 stream_id;
90 
91   GPtrArray *headers;
92   guint64 last_ts, base_ts;     /* timestamp fixup */
93 } GstRtmp2Sink;
94 
/* Class structure; adds nothing on top of GstBaseSinkClass */
typedef struct
{
  GstBaseSinkClass parent_class;
} GstRtmp2SinkClass;
99 
100 /* GObject virtual functions */
101 static void gst_rtmp2_sink_set_property (GObject * object,
102     guint property_id, const GValue * value, GParamSpec * pspec);
103 static void gst_rtmp2_sink_get_property (GObject * object,
104     guint property_id, GValue * value, GParamSpec * pspec);
105 static void gst_rtmp2_sink_finalize (GObject * object);
106 static void gst_rtmp2_sink_uri_handler_init (GstURIHandlerInterface * iface);
107 
108 /* GstBaseSink virtual functions */
109 static gboolean gst_rtmp2_sink_start (GstBaseSink * sink);
110 static gboolean gst_rtmp2_sink_stop (GstBaseSink * sink);
111 static gboolean gst_rtmp2_sink_event (GstBaseSink * sink, GstEvent * event);
112 static gboolean gst_rtmp2_sink_unlock (GstBaseSink * sink);
113 static gboolean gst_rtmp2_sink_unlock_stop (GstBaseSink * sink);
114 static GstFlowReturn gst_rtmp2_sink_render (GstBaseSink * sink,
115     GstBuffer * buffer);
116 static gboolean gst_rtmp2_sink_set_caps (GstBaseSink * sink, GstCaps * caps);
117 
118 /* Internal API */
119 static void gst_rtmp2_sink_task_func (gpointer user_data);
120 
121 static void client_connect_done (GObject * source, GAsyncResult * result,
122     gpointer user_data);
123 static void start_publish_done (GObject * source, GAsyncResult * result,
124     gpointer user_data);
125 static void connect_task_done (GObject * object, GAsyncResult * result,
126     gpointer user_data);
127 
128 static void set_pacing_rate (GstRtmp2Sink * self);
129 static void set_chunk_size (GstRtmp2Sink * self);
130 
131 static GstStructure *gst_rtmp2_sink_get_stats (GstRtmp2Sink * self);
132 
/* GObject property identifiers */
enum
{
  PROP_0 = 0,

  /* overridden properties (see class_init) */
  PROP_LOCATION,
  PROP_SCHEME,
  PROP_HOST,
  PROP_PORT,
  PROP_APPLICATION,
  PROP_STREAM,
  PROP_SECURE_TOKEN,
  PROP_USERNAME,
  PROP_PASSWORD,
  PROP_AUTHMOD,
  PROP_TIMEOUT,
  PROP_TLS_VALIDATION_FLAGS,
  PROP_FLASH_VERSION,

  /* properties installed by this element */
  PROP_ASYNC_CONNECT,
  PROP_PEAK_KBPS,
  PROP_CHUNK_SIZE,
  PROP_STATS,
  PROP_STOP_COMMANDS,
};
155 
156 /* pad templates */
157 
158 static GstStaticPadTemplate gst_rtmp2_sink_sink_template =
159 GST_STATIC_PAD_TEMPLATE ("sink",
160     GST_PAD_SINK,
161     GST_PAD_ALWAYS,
162     GST_STATIC_CAPS ("video/x-flv")
163     );
164 
165 /* class initialization */
166 
167 G_DEFINE_TYPE_WITH_CODE (GstRtmp2Sink, gst_rtmp2_sink, GST_TYPE_BASE_SINK,
168     G_IMPLEMENT_INTERFACE (GST_TYPE_URI_HANDLER,
169         gst_rtmp2_sink_uri_handler_init);
170     G_IMPLEMENT_INTERFACE (GST_TYPE_RTMP_LOCATION_HANDLER, NULL));
171 GST_ELEMENT_REGISTER_DEFINE_WITH_CODE (rtmp2sink, "rtmp2sink",
172     GST_RANK_PRIMARY + 1, GST_TYPE_RTMP2_SINK, rtmp2_element_init (plugin));
173 
174 static void
gst_rtmp2_sink_class_init(GstRtmp2SinkClass * klass)175 gst_rtmp2_sink_class_init (GstRtmp2SinkClass * klass)
176 {
177   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
178   GstBaseSinkClass *base_sink_class = GST_BASE_SINK_CLASS (klass);
179 
180   gst_element_class_add_static_pad_template (GST_ELEMENT_CLASS (klass),
181       &gst_rtmp2_sink_sink_template);
182 
183   gst_element_class_set_static_metadata (GST_ELEMENT_CLASS (klass),
184       "RTMP sink element", "Sink", "Sink element for RTMP streams",
185       "Make.TV, Inc. <info@make.tv>");
186 
187   gobject_class->set_property = gst_rtmp2_sink_set_property;
188   gobject_class->get_property = gst_rtmp2_sink_get_property;
189   gobject_class->finalize = gst_rtmp2_sink_finalize;
190   base_sink_class->start = GST_DEBUG_FUNCPTR (gst_rtmp2_sink_start);
191   base_sink_class->stop = GST_DEBUG_FUNCPTR (gst_rtmp2_sink_stop);
192   base_sink_class->event = GST_DEBUG_FUNCPTR (gst_rtmp2_sink_event);
193   base_sink_class->unlock = GST_DEBUG_FUNCPTR (gst_rtmp2_sink_unlock);
194   base_sink_class->unlock_stop = GST_DEBUG_FUNCPTR (gst_rtmp2_sink_unlock_stop);
195   base_sink_class->render = GST_DEBUG_FUNCPTR (gst_rtmp2_sink_render);
196   base_sink_class->set_caps = GST_DEBUG_FUNCPTR (gst_rtmp2_sink_set_caps);
197 
198   g_object_class_override_property (gobject_class, PROP_LOCATION, "location");
199   g_object_class_override_property (gobject_class, PROP_SCHEME, "scheme");
200   g_object_class_override_property (gobject_class, PROP_HOST, "host");
201   g_object_class_override_property (gobject_class, PROP_PORT, "port");
202   g_object_class_override_property (gobject_class, PROP_APPLICATION,
203       "application");
204   g_object_class_override_property (gobject_class, PROP_STREAM, "stream");
205   g_object_class_override_property (gobject_class, PROP_SECURE_TOKEN,
206       "secure-token");
207   g_object_class_override_property (gobject_class, PROP_USERNAME, "username");
208   g_object_class_override_property (gobject_class, PROP_PASSWORD, "password");
209   g_object_class_override_property (gobject_class, PROP_AUTHMOD, "authmod");
210   g_object_class_override_property (gobject_class, PROP_TIMEOUT, "timeout");
211   g_object_class_override_property (gobject_class, PROP_TLS_VALIDATION_FLAGS,
212       "tls-validation-flags");
213   g_object_class_override_property (gobject_class, PROP_FLASH_VERSION,
214       "flash-version");
215 
216   g_object_class_install_property (gobject_class, PROP_ASYNC_CONNECT,
217       g_param_spec_boolean ("async-connect", "Async connect",
218           "Connect on READY, otherwise on first push", TRUE,
219           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
220 
221   g_object_class_install_property (gobject_class, PROP_PEAK_KBPS,
222       g_param_spec_uint ("peak-kbps", "Peak bitrate",
223           "Bitrate in kbit/sec to pace outgoing packets", 0, G_MAXINT / 125, 0,
224           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS |
225           GST_PARAM_MUTABLE_PLAYING));
226 
227   g_object_class_install_property (gobject_class, PROP_CHUNK_SIZE,
228       g_param_spec_uint ("chunk-size", "Chunk size", "RTMP chunk size",
229           GST_RTMP_MINIMUM_CHUNK_SIZE, GST_RTMP_MAXIMUM_CHUNK_SIZE,
230           GST_RTMP_DEFAULT_CHUNK_SIZE, G_PARAM_READWRITE |
231           G_PARAM_STATIC_STRINGS | GST_PARAM_MUTABLE_PLAYING));
232 
233   g_object_class_install_property (gobject_class, PROP_STATS,
234       g_param_spec_boxed ("stats", "Stats", "Retrieve a statistics structure",
235           GST_TYPE_STRUCTURE, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
236 
237   /**
238    * GstRtmp2Sink:stop-commands:
239    *
240    * Which commands (if any) to send on EOS event before closing connection
241    *
242    * Since: 1.20
243    */
244   g_object_class_install_property (gobject_class, PROP_STOP_COMMANDS,
245       g_param_spec_flags ("stop-commands", "Stop commands",
246           "RTMP commands to send on EOS event before closing connection",
247           GST_TYPE_RTMP_STOP_COMMANDS, GST_RTMP_DEFAULT_STOP_COMMANDS,
248           (GParamFlags) (G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
249 
250   gst_type_mark_as_plugin_api (GST_TYPE_RTMP_LOCATION_HANDLER, 0);
251   GST_DEBUG_CATEGORY_INIT (gst_rtmp2_sink_debug_category, "rtmp2sink", 0,
252       "debug category for rtmp2sink element");
253 }
254 
255 static void
gst_rtmp2_sink_init(GstRtmp2Sink * self)256 gst_rtmp2_sink_init (GstRtmp2Sink * self)
257 {
258   self->location.flash_ver = g_strdup ("FMLE/3.0 (compatible; FMSc/1.0)");
259   self->location.publish = TRUE;
260   self->async_connect = TRUE;
261   self->chunk_size = GST_RTMP_DEFAULT_CHUNK_SIZE;
262   self->stop_commands = GST_RTMP_DEFAULT_STOP_COMMANDS;
263 
264   g_mutex_init (&self->lock);
265   g_cond_init (&self->cond);
266 
267   self->task = gst_task_new (gst_rtmp2_sink_task_func, self, NULL);
268   g_rec_mutex_init (&self->task_lock);
269   gst_task_set_lock (self->task, &self->task_lock);
270 
271   self->headers = g_ptr_array_new_with_free_func
272       ((GDestroyNotify) gst_mini_object_unref);
273 }
274 
275 static void
gst_rtmp2_sink_uri_handler_init(GstURIHandlerInterface * iface)276 gst_rtmp2_sink_uri_handler_init (GstURIHandlerInterface * iface)
277 {
278   gst_rtmp_location_handler_implement_uri_handler (iface, GST_URI_SINK);
279 }
280 
281 static void
gst_rtmp2_sink_set_property(GObject * object,guint property_id,const GValue * value,GParamSpec * pspec)282 gst_rtmp2_sink_set_property (GObject * object, guint property_id,
283     const GValue * value, GParamSpec * pspec)
284 {
285   GstRtmp2Sink *self = GST_RTMP2_SINK (object);
286 
287   switch (property_id) {
288     case PROP_LOCATION:
289       gst_rtmp_location_handler_set_uri (GST_RTMP_LOCATION_HANDLER (self),
290           g_value_get_string (value));
291       break;
292     case PROP_SCHEME:
293       GST_OBJECT_LOCK (self);
294       self->location.scheme = g_value_get_enum (value);
295       GST_OBJECT_UNLOCK (self);
296       break;
297     case PROP_HOST:
298       GST_OBJECT_LOCK (self);
299       g_free (self->location.host);
300       self->location.host = g_value_dup_string (value);
301       GST_OBJECT_UNLOCK (self);
302       break;
303     case PROP_PORT:
304       GST_OBJECT_LOCK (self);
305       self->location.port = g_value_get_int (value);
306       GST_OBJECT_UNLOCK (self);
307       break;
308     case PROP_APPLICATION:
309       GST_OBJECT_LOCK (self);
310       g_free (self->location.application);
311       self->location.application = g_value_dup_string (value);
312       GST_OBJECT_UNLOCK (self);
313       break;
314     case PROP_STREAM:
315       GST_OBJECT_LOCK (self);
316       g_free (self->location.stream);
317       self->location.stream = g_value_dup_string (value);
318       GST_OBJECT_UNLOCK (self);
319       break;
320     case PROP_SECURE_TOKEN:
321       GST_OBJECT_LOCK (self);
322       g_free (self->location.secure_token);
323       self->location.secure_token = g_value_dup_string (value);
324       GST_OBJECT_UNLOCK (self);
325       break;
326     case PROP_USERNAME:
327       GST_OBJECT_LOCK (self);
328       g_free (self->location.username);
329       self->location.username = g_value_dup_string (value);
330       GST_OBJECT_UNLOCK (self);
331       break;
332     case PROP_PASSWORD:
333       GST_OBJECT_LOCK (self);
334       g_free (self->location.password);
335       self->location.password = g_value_dup_string (value);
336       GST_OBJECT_UNLOCK (self);
337       break;
338     case PROP_AUTHMOD:
339       GST_OBJECT_LOCK (self);
340       self->location.authmod = g_value_get_enum (value);
341       GST_OBJECT_UNLOCK (self);
342       break;
343     case PROP_TIMEOUT:
344       GST_OBJECT_LOCK (self);
345       self->location.timeout = g_value_get_uint (value);
346       GST_OBJECT_UNLOCK (self);
347       break;
348     case PROP_TLS_VALIDATION_FLAGS:
349       GST_OBJECT_LOCK (self);
350       self->location.tls_flags = g_value_get_flags (value);
351       GST_OBJECT_UNLOCK (self);
352       break;
353     case PROP_FLASH_VERSION:
354       GST_OBJECT_LOCK (self);
355       g_free (self->location.flash_ver);
356       self->location.flash_ver = g_value_dup_string (value);
357       GST_OBJECT_UNLOCK (self);
358       break;
359     case PROP_ASYNC_CONNECT:
360       GST_OBJECT_LOCK (self);
361       self->async_connect = g_value_get_boolean (value);
362       GST_OBJECT_UNLOCK (self);
363       break;
364     case PROP_PEAK_KBPS:
365       g_mutex_lock (&self->lock);
366 
367       GST_OBJECT_LOCK (self);
368       self->peak_kbps = g_value_get_uint (value);
369       GST_OBJECT_UNLOCK (self);
370 
371       set_pacing_rate (self);
372       g_mutex_unlock (&self->lock);
373       break;
374     case PROP_CHUNK_SIZE:
375       g_mutex_lock (&self->lock);
376 
377       GST_OBJECT_LOCK (self);
378       self->chunk_size = g_value_get_uint (value);
379       GST_OBJECT_UNLOCK (self);
380 
381       set_chunk_size (self);
382       g_mutex_unlock (&self->lock);
383       break;
384     case PROP_STOP_COMMANDS:
385       GST_OBJECT_LOCK (self);
386       self->stop_commands = g_value_get_flags (value);
387       GST_OBJECT_UNLOCK (self);
388       break;
389     default:
390       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
391       break;
392   }
393 }
394 
395 static void
gst_rtmp2_sink_get_property(GObject * object,guint property_id,GValue * value,GParamSpec * pspec)396 gst_rtmp2_sink_get_property (GObject * object, guint property_id,
397     GValue * value, GParamSpec * pspec)
398 {
399   GstRtmp2Sink *self = GST_RTMP2_SINK (object);
400 
401   switch (property_id) {
402     case PROP_LOCATION:
403       GST_OBJECT_LOCK (self);
404       g_value_take_string (value, gst_rtmp_location_get_string (&self->location,
405               TRUE));
406       GST_OBJECT_UNLOCK (self);
407       break;
408     case PROP_SCHEME:
409       GST_OBJECT_LOCK (self);
410       g_value_set_enum (value, self->location.scheme);
411       GST_OBJECT_UNLOCK (self);
412       break;
413     case PROP_HOST:
414       GST_OBJECT_LOCK (self);
415       g_value_set_string (value, self->location.host);
416       GST_OBJECT_UNLOCK (self);
417       break;
418     case PROP_PORT:
419       GST_OBJECT_LOCK (self);
420       g_value_set_int (value, self->location.port);
421       GST_OBJECT_UNLOCK (self);
422       break;
423     case PROP_APPLICATION:
424       GST_OBJECT_LOCK (self);
425       g_value_set_string (value, self->location.application);
426       GST_OBJECT_UNLOCK (self);
427       break;
428     case PROP_STREAM:
429       GST_OBJECT_LOCK (self);
430       g_value_set_string (value, self->location.stream);
431       GST_OBJECT_UNLOCK (self);
432       break;
433     case PROP_SECURE_TOKEN:
434       GST_OBJECT_LOCK (self);
435       g_value_set_string (value, self->location.secure_token);
436       GST_OBJECT_UNLOCK (self);
437       break;
438     case PROP_USERNAME:
439       GST_OBJECT_LOCK (self);
440       g_value_set_string (value, self->location.username);
441       GST_OBJECT_UNLOCK (self);
442       break;
443     case PROP_PASSWORD:
444       GST_OBJECT_LOCK (self);
445       g_value_set_string (value, self->location.password);
446       GST_OBJECT_UNLOCK (self);
447       break;
448     case PROP_AUTHMOD:
449       GST_OBJECT_LOCK (self);
450       g_value_set_enum (value, self->location.authmod);
451       GST_OBJECT_UNLOCK (self);
452       break;
453     case PROP_TIMEOUT:
454       GST_OBJECT_LOCK (self);
455       g_value_set_uint (value, self->location.timeout);
456       GST_OBJECT_UNLOCK (self);
457       break;
458     case PROP_TLS_VALIDATION_FLAGS:
459       GST_OBJECT_LOCK (self);
460       g_value_set_flags (value, self->location.tls_flags);
461       GST_OBJECT_UNLOCK (self);
462       break;
463     case PROP_FLASH_VERSION:
464       GST_OBJECT_LOCK (self);
465       g_value_set_string (value, self->location.flash_ver);
466       GST_OBJECT_UNLOCK (self);
467       break;
468     case PROP_ASYNC_CONNECT:
469       GST_OBJECT_LOCK (self);
470       g_value_set_boolean (value, self->async_connect);
471       GST_OBJECT_UNLOCK (self);
472       break;
473     case PROP_PEAK_KBPS:
474       GST_OBJECT_LOCK (self);
475       g_value_set_uint (value, self->peak_kbps);
476       GST_OBJECT_UNLOCK (self);
477       break;
478     case PROP_CHUNK_SIZE:
479       GST_OBJECT_LOCK (self);
480       g_value_set_uint (value, self->chunk_size);
481       GST_OBJECT_UNLOCK (self);
482       break;
483     case PROP_STATS:
484       g_value_take_boxed (value, gst_rtmp2_sink_get_stats (self));
485       break;
486     case PROP_STOP_COMMANDS:
487       GST_OBJECT_LOCK (self);
488       g_value_set_flags (value, self->stop_commands);
489       GST_OBJECT_UNLOCK (self);
490       break;
491     default:
492       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
493       break;
494   }
495 }
496 
497 static void
gst_rtmp2_sink_finalize(GObject * object)498 gst_rtmp2_sink_finalize (GObject * object)
499 {
500   GstRtmp2Sink *self = GST_RTMP2_SINK (object);
501 
502   g_clear_pointer (&self->headers, g_ptr_array_unref);
503 
504   g_clear_object (&self->cancellable);
505   g_clear_object (&self->connection);
506 
507   g_clear_object (&self->task);
508   g_rec_mutex_clear (&self->task_lock);
509 
510   g_mutex_clear (&self->lock);
511   g_cond_clear (&self->cond);
512 
513   g_clear_pointer (&self->stats, gst_structure_free);
514   gst_rtmp_location_clear (&self->location);
515 
516   G_OBJECT_CLASS (gst_rtmp2_sink_parent_class)->finalize (object);
517 }
518 
519 static gboolean
gst_rtmp2_sink_start(GstBaseSink * sink)520 gst_rtmp2_sink_start (GstBaseSink * sink)
521 {
522   GstRtmp2Sink *self = GST_RTMP2_SINK (sink);
523   gboolean async;
524 
525   GST_OBJECT_LOCK (self);
526   async = self->async_connect;
527   GST_OBJECT_UNLOCK (self);
528 
529   GST_INFO_OBJECT (self, "Starting (%s)", async ? "async" : "delayed");
530 
531   g_clear_object (&self->cancellable);
532 
533   self->running = TRUE;
534   self->cancellable = g_cancellable_new ();
535   self->stream_id = 0;
536   self->last_ts = 0;
537   self->base_ts = 0;
538 
539   if (async) {
540     gst_task_start (self->task);
541   }
542 
543   return TRUE;
544 }
545 
546 static gboolean
quit_invoker(gpointer user_data)547 quit_invoker (gpointer user_data)
548 {
549   g_main_loop_quit (user_data);
550   return G_SOURCE_REMOVE;
551 }
552 
553 static void
stop_task(GstRtmp2Sink * self)554 stop_task (GstRtmp2Sink * self)
555 {
556   gst_task_stop (self->task);
557   self->running = FALSE;
558 
559   if (self->cancellable) {
560     GST_DEBUG_OBJECT (self, "Cancelling");
561     g_cancellable_cancel (self->cancellable);
562   }
563 
564   if (self->loop) {
565     GST_DEBUG_OBJECT (self, "Stopping loop");
566     g_main_context_invoke_full (self->context, G_PRIORITY_DEFAULT_IDLE,
567         quit_invoker, g_main_loop_ref (self->loop),
568         (GDestroyNotify) g_main_loop_unref);
569   }
570 
571   g_cond_broadcast (&self->cond);
572 }
573 
574 static gboolean
gst_rtmp2_sink_stop(GstBaseSink * sink)575 gst_rtmp2_sink_stop (GstBaseSink * sink)
576 {
577   GstRtmp2Sink *self = GST_RTMP2_SINK (sink);
578 
579   GST_DEBUG_OBJECT (self, "stop");
580 
581   g_mutex_lock (&self->lock);
582   stop_task (self);
583   g_mutex_unlock (&self->lock);
584 
585   gst_task_join (self->task);
586 
587   return TRUE;
588 }
589 
590 static gboolean
stop_publish_invoker(gpointer user_data)591 stop_publish_invoker (gpointer user_data)
592 {
593   GstRtmp2Sink *self = user_data;
594 
595   if (self->connection) {
596     GST_OBJECT_LOCK (self);
597     if (self->stop_commands != GST_RTMP_STOP_COMMANDS_NONE) {
598       gst_rtmp_client_stop_publish (self->connection, self->location.stream,
599           self->stop_commands);
600     }
601     GST_OBJECT_UNLOCK (self);
602   }
603 
604   return G_SOURCE_REMOVE;
605 }
606 
607 static gboolean
gst_rtmp2_sink_event(GstBaseSink * sink,GstEvent * event)608 gst_rtmp2_sink_event (GstBaseSink * sink, GstEvent * event)
609 {
610   GstEventType type;
611   GstRtmp2Sink *self = GST_RTMP2_SINK (sink);
612 
613   type = GST_EVENT_TYPE (event);
614 
615   switch (type) {
616     case GST_EVENT_EOS:
617       g_mutex_lock (&self->lock);
618       if (self->loop) {
619         GST_DEBUG_OBJECT (self, "Got EOS: stopping publish");
620         g_main_context_invoke (self->context, stop_publish_invoker, self);
621       }
622       g_mutex_unlock (&self->lock);
623       break;
624     default:
625       break;
626   }
627 
628   return GST_BASE_SINK_CLASS (gst_rtmp2_sink_parent_class)->event (sink, event);
629 }
630 
631 static gboolean
gst_rtmp2_sink_unlock(GstBaseSink * sink)632 gst_rtmp2_sink_unlock (GstBaseSink * sink)
633 {
634   GstRtmp2Sink *self = GST_RTMP2_SINK (sink);
635 
636   GST_DEBUG_OBJECT (self, "unlock");
637 
638   g_mutex_lock (&self->lock);
639   self->flushing = TRUE;
640   g_cond_broadcast (&self->cond);
641   g_mutex_unlock (&self->lock);
642 
643   return TRUE;
644 }
645 
646 static gboolean
gst_rtmp2_sink_unlock_stop(GstBaseSink * sink)647 gst_rtmp2_sink_unlock_stop (GstBaseSink * sink)
648 {
649   GstRtmp2Sink *self = GST_RTMP2_SINK (sink);
650 
651   GST_DEBUG_OBJECT (self, "unlock_stop");
652 
653   g_mutex_lock (&self->lock);
654   self->flushing = FALSE;
655   g_mutex_unlock (&self->lock);
656 
657   return TRUE;
658 }
659 
660 static gboolean
buffer_to_message(GstRtmp2Sink * self,GstBuffer * buffer,GstBuffer ** outbuf)661 buffer_to_message (GstRtmp2Sink * self, GstBuffer * buffer, GstBuffer ** outbuf)
662 {
663   GstBuffer *message;
664   GstRtmpFlvTagHeader header;
665   guint64 timestamp;
666   guint32 cstream;
667 
668   {
669     GstMapInfo info;
670 
671     if (G_UNLIKELY (!gst_buffer_map (buffer, &info, GST_MAP_READ))) {
672       GST_ERROR_OBJECT (self, "map failed: %" GST_PTR_FORMAT, buffer);
673       return FALSE;
674     }
675 
676     /* FIXME: This is ugly and only works behind flvmux.
677      *        Implement true RTMP muxing. */
678 
679     if (G_UNLIKELY (info.size >= 4 && memcmp (info.data, "FLV", 3) == 0)) {
680       /* drop the header, we don't need it */
681       GST_DEBUG_OBJECT (self, "ignoring FLV header: %" GST_PTR_FORMAT, buffer);
682       gst_buffer_unmap (buffer, &info);
683       *outbuf = NULL;
684       return TRUE;
685     }
686 
687     if (!gst_rtmp_flv_tag_parse_header (&header, info.data, info.size)) {
688       GST_ERROR_OBJECT (self, "too small for tag header: %" GST_PTR_FORMAT,
689           buffer);
690       gst_buffer_unmap (buffer, &info);
691       return FALSE;
692     }
693 
694     if (info.size < header.total_size) {
695       GST_ERROR_OBJECT (self, "too small for tag body: buffer %" G_GSIZE_FORMAT
696           ", tag %" G_GSIZE_FORMAT, info.size, header.total_size);
697       gst_buffer_unmap (buffer, &info);
698       return FALSE;
699     }
700 
701     /* flvmux timestamps roll over after about 49 days */
702     timestamp = header.timestamp;
703     if (timestamp + self->base_ts + G_MAXINT32 < self->last_ts) {
704       GST_WARNING_OBJECT (self, "Timestamp regression %" G_GUINT64_FORMAT
705           " -> %" G_GUINT64_FORMAT "; assuming overflow", self->last_ts,
706           timestamp + self->base_ts);
707       self->base_ts += G_MAXUINT32;
708       self->base_ts += 1;
709     } else if (timestamp + self->base_ts > self->last_ts + G_MAXINT32) {
710       GST_WARNING_OBJECT (self, "Timestamp jump %" G_GUINT64_FORMAT
711           " -> %" G_GUINT64_FORMAT "; assuming underflow", self->last_ts,
712           timestamp + self->base_ts);
713       if (self->base_ts > 0) {
714         self->base_ts -= G_MAXUINT32;
715         self->base_ts -= 1;
716       } else {
717         GST_WARNING_OBJECT (self, "Cannot regress further;"
718             " forcing timestamp to zero");
719         timestamp = 0;
720       }
721     }
722     timestamp += self->base_ts;
723     self->last_ts = timestamp;
724 
725     gst_buffer_unmap (buffer, &info);
726   }
727 
728   switch (header.type) {
729     case GST_RTMP_MESSAGE_TYPE_DATA_AMF0:
730       cstream = 4;
731       break;
732 
733     case GST_RTMP_MESSAGE_TYPE_AUDIO:
734       cstream = 5;
735       break;
736 
737     case GST_RTMP_MESSAGE_TYPE_VIDEO:
738       cstream = 6;
739       break;
740 
741     default:
742       GST_ERROR_OBJECT (self, "unknown tag type %d", header.type);
743       return FALSE;
744   }
745 
746   /* May not know stream ID yet; set later */
747   message = gst_rtmp_message_new (header.type, cstream, 0);
748   message = gst_buffer_append_region (message, gst_buffer_ref (buffer),
749       GST_RTMP_FLV_TAG_HEADER_SIZE, header.payload_size);
750 
751   GST_BUFFER_DTS (message) = timestamp * GST_MSECOND;
752 
753   *outbuf = message;
754   return TRUE;
755 }
756 
757 static gboolean
should_drop_header(GstRtmp2Sink * self,GstBuffer * buffer)758 should_drop_header (GstRtmp2Sink * self, GstBuffer * buffer)
759 {
760   guint len;
761 
762   if (G_LIKELY (!GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_HEADER))) {
763     return FALSE;
764   }
765 
766   g_mutex_lock (&self->lock);
767   len = self->headers->len;
768   g_mutex_unlock (&self->lock);
769 
770   /* Drop header buffers when we have streamheader caps */
771   return len > 0;
772 }
773 
774 static void
send_message(GstRtmp2Sink * self,GstBuffer * message)775 send_message (GstRtmp2Sink * self, GstBuffer * message)
776 {
777   GstRtmpMeta *meta = gst_buffer_get_rtmp_meta (message);
778 
779   g_return_if_fail (meta != NULL);
780   g_return_if_fail (self->stream_id != 0);
781 
782   meta->mstream = self->stream_id;
783 
784   if (gst_rtmp_message_is_metadata (message)) {
785     gst_rtmp_connection_set_data_frame (self->connection, message);
786   } else {
787     gst_rtmp_connection_queue_message (self->connection, message);
788   }
789 }
790 
791 static void
send_streamheader(GstRtmp2Sink * self)792 send_streamheader (GstRtmp2Sink * self)
793 {
794   guint i;
795 
796   if (G_LIKELY (self->headers->len == 0)) {
797     return;
798   }
799 
800   GST_DEBUG_OBJECT (self, "Sending %u streamheader messages",
801       self->headers->len);
802 
803   for (i = 0; i < self->headers->len; i++) {
804     send_message (self, g_ptr_array_index (self->headers, i));
805   }
806 
807   /* Steal pointers: suppress free */
808   g_ptr_array_set_free_func (self->headers, NULL);
809   g_ptr_array_set_size (self->headers, 0);
810   g_ptr_array_set_free_func (self->headers,
811       (GDestroyNotify) gst_mini_object_unref);
812 }
813 
814 static inline gboolean
is_running(GstRtmp2Sink * self)815 is_running (GstRtmp2Sink * self)
816 {
817   return G_LIKELY (self->running && !self->flushing);
818 }
819 
820 static GstFlowReturn
gst_rtmp2_sink_render(GstBaseSink * sink,GstBuffer * buffer)821 gst_rtmp2_sink_render (GstBaseSink * sink, GstBuffer * buffer)
822 {
823   GstRtmp2Sink *self = GST_RTMP2_SINK (sink);
824   GstBuffer *message;
825   GstFlowReturn ret;
826 
827   if (G_UNLIKELY (should_drop_header (self, buffer))) {
828     GST_DEBUG_OBJECT (self, "Skipping header %" GST_PTR_FORMAT, buffer);
829     return GST_FLOW_OK;
830   }
831 
832   GST_LOG_OBJECT (self, "render %" GST_PTR_FORMAT, buffer);
833 
834   if (G_UNLIKELY (!buffer_to_message (self, buffer, &message))) {
835     GST_ELEMENT_ERROR (self, STREAM, FAILED, ("Failed to convert FLV to RTMP"),
836         ("Failed to convert %" GST_PTR_FORMAT, message));
837     return GST_FLOW_ERROR;
838   }
839 
840   if (G_UNLIKELY (!message)) {
841     GST_DEBUG_OBJECT (self, "Skipping %" GST_PTR_FORMAT, buffer);
842     return GST_FLOW_OK;
843   }
844 
845   g_mutex_lock (&self->lock);
846 
847   if (G_UNLIKELY (is_running (self) && self->cancellable &&
848           gst_task_get_state (self->task) != GST_TASK_STARTED)) {
849     GST_DEBUG_OBJECT (self, "Starting connect");
850     gst_task_start (self->task);
851   }
852 
853   while (G_UNLIKELY (is_running (self) && !self->connection)) {
854     GST_DEBUG_OBJECT (self, "Waiting for connection");
855     g_cond_wait (&self->cond, &self->lock);
856   }
857 
858   while (G_UNLIKELY (is_running (self) && self->connection &&
859           gst_rtmp_connection_get_num_queued (self->connection) > 3)) {
860     GST_LOG_OBJECT (self, "Waiting for queue");
861     g_cond_wait (&self->cond, &self->lock);
862   }
863 
864   if (G_UNLIKELY (!is_running (self))) {
865     gst_buffer_unref (message);
866     ret = GST_FLOW_FLUSHING;
867   } else if (G_UNLIKELY (!self->connection)) {
868     gst_buffer_unref (message);
869     /* send_connect_error has sent an ERROR message */
870     ret = GST_FLOW_ERROR;
871   } else {
872     send_streamheader (self);
873     send_message (self, message);
874     ret = GST_FLOW_OK;
875   }
876 
877   g_mutex_unlock (&self->lock);
878   return ret;
879 }
880 
881 static gboolean
add_streamheader(GstRtmp2Sink * self,const GValue * value)882 add_streamheader (GstRtmp2Sink * self, const GValue * value)
883 {
884   GstBuffer *buffer, *message;
885 
886   g_return_val_if_fail (value, FALSE);
887 
888   if (!GST_VALUE_HOLDS_BUFFER (value)) {
889     GST_ERROR_OBJECT (self, "'streamheader' item of unexpected type '%s'",
890         G_VALUE_TYPE_NAME (value));
891     return FALSE;
892   }
893 
894   buffer = gst_value_get_buffer (value);
895 
896   if (!buffer_to_message (self, buffer, &message)) {
897     GST_ERROR_OBJECT (self, "Failed to read streamheader %" GST_PTR_FORMAT,
898         buffer);
899     return FALSE;
900   }
901 
902   if (message) {
903     GST_DEBUG_OBJECT (self, "Adding streamheader %" GST_PTR_FORMAT, buffer);
904     g_ptr_array_add (self->headers, message);
905   } else {
906     GST_DEBUG_OBJECT (self, "Skipping streamheader %" GST_PTR_FORMAT, buffer);
907   }
908 
909   return TRUE;
910 }
911 
912 static gboolean
gst_rtmp2_sink_set_caps(GstBaseSink * sink,GstCaps * caps)913 gst_rtmp2_sink_set_caps (GstBaseSink * sink, GstCaps * caps)
914 {
915   GstRtmp2Sink *self = GST_RTMP2_SINK (sink);
916   GstStructure *s;
917   const GValue *streamheader;
918   guint i = 0;
919 
920   GST_DEBUG_OBJECT (self, "setcaps %" GST_PTR_FORMAT, caps);
921 
922   g_ptr_array_set_size (self->headers, 0);
923 
924   s = gst_caps_get_structure (caps, 0);
925   streamheader = gst_structure_get_value (s, "streamheader");
926 
927   if (!streamheader) {
928     GST_DEBUG_OBJECT (self, "'streamheader' field not present");
929   } else if (GST_VALUE_HOLDS_BUFFER (streamheader)) {
930     GST_DEBUG_OBJECT (self, "'streamheader' field holds buffer");
931     if (!add_streamheader (self, streamheader)) {
932       return FALSE;
933     }
934 
935     i = 1;
936   } else if (GST_VALUE_HOLDS_ARRAY (streamheader)) {
937     guint size = gst_value_array_get_size (streamheader);
938 
939     GST_DEBUG_OBJECT (self, "'streamheader' field holds array");
940 
941     for (; i < size; i++) {
942       const GValue *v = gst_value_array_get_value (streamheader, i);
943 
944       if (!add_streamheader (self, v)) {
945         return FALSE;
946       }
947     }
948   } else {
949     GST_ERROR_OBJECT (self, "'streamheader' field has unexpected type '%s'",
950         G_VALUE_TYPE_NAME (streamheader));
951     return FALSE;
952   }
953 
954   GST_DEBUG_OBJECT (self, "Collected streamheaders: %u buffers -> %u messages",
955       i, self->headers->len);
956 
957   return TRUE;
958 }
959 
960 /* Mainloop task */
961 static void
gst_rtmp2_sink_task_func(gpointer user_data)962 gst_rtmp2_sink_task_func (gpointer user_data)
963 {
964   GstRtmp2Sink *self = GST_RTMP2_SINK (user_data);
965   GMainContext *context;
966   GMainLoop *loop;
967   GTask *connector;
968 
969   GST_DEBUG_OBJECT (self, "gst_rtmp2_sink_task starting");
970   g_mutex_lock (&self->lock);
971 
972   context = self->context = g_main_context_new ();
973   g_main_context_push_thread_default (context);
974   loop = self->loop = g_main_loop_new (context, TRUE);
975   connector = g_task_new (self, self->cancellable, connect_task_done, NULL);
976 
977   g_clear_pointer (&self->stats, gst_structure_free);
978 
979   GST_OBJECT_LOCK (self);
980   gst_rtmp_client_connect_async (&self->location, self->cancellable,
981       client_connect_done, connector);
982   GST_OBJECT_UNLOCK (self);
983 
984   /* Run loop */
985   g_mutex_unlock (&self->lock);
986   g_main_loop_run (loop);
987   g_mutex_lock (&self->lock);
988 
989   if (self->connection) {
990     self->stats = gst_rtmp_connection_get_stats (self->connection);
991   }
992 
993   g_clear_pointer (&self->loop, g_main_loop_unref);
994   g_clear_pointer (&self->connection, gst_rtmp_connection_close_and_unref);
995   g_cond_broadcast (&self->cond);
996 
997   /* Run loop cleanup */
998   g_mutex_unlock (&self->lock);
999   while (g_main_context_pending (context)) {
1000     GST_DEBUG_OBJECT (self, "iterating main context to clean up");
1001     g_main_context_iteration (context, FALSE);
1002   }
1003   g_main_context_pop_thread_default (context);
1004   g_mutex_lock (&self->lock);
1005 
1006   g_clear_pointer (&self->context, g_main_context_unref);
1007   g_ptr_array_set_size (self->headers, 0);
1008 
1009   g_mutex_unlock (&self->lock);
1010   GST_DEBUG_OBJECT (self, "gst_rtmp2_sink_task exiting");
1011 }
1012 
1013 static void
client_connect_done(GObject * source,GAsyncResult * result,gpointer user_data)1014 client_connect_done (GObject * source, GAsyncResult * result,
1015     gpointer user_data)
1016 {
1017   GTask *task = user_data;
1018   GstRtmp2Sink *self = g_task_get_source_object (task);
1019   GError *error = NULL;
1020   GstRtmpConnection *connection;
1021 
1022   connection = gst_rtmp_client_connect_finish (result, &error);
1023   if (!connection) {
1024     g_task_return_error (task, error);
1025     g_object_unref (task);
1026     return;
1027   }
1028 
1029   g_task_set_task_data (task, connection, g_object_unref);
1030 
1031   if (g_task_return_error_if_cancelled (task)) {
1032     g_object_unref (task);
1033     return;
1034   }
1035 
1036   GST_OBJECT_LOCK (self);
1037   gst_rtmp_client_start_publish_async (connection, self->location.stream,
1038       g_task_get_cancellable (task), start_publish_done, task);
1039   GST_OBJECT_UNLOCK (self);
1040 }
1041 
1042 static void
start_publish_done(GObject * source,GAsyncResult * result,gpointer user_data)1043 start_publish_done (GObject * source, GAsyncResult * result, gpointer user_data)
1044 {
1045   GTask *task = G_TASK (user_data);
1046   GstRtmp2Sink *self = g_task_get_source_object (task);
1047   GstRtmpConnection *connection = g_task_get_task_data (task);
1048   GError *error = NULL;
1049 
1050   if (g_task_return_error_if_cancelled (task)) {
1051     g_object_unref (task);
1052     return;
1053   }
1054 
1055   if (gst_rtmp_client_start_publish_finish (connection, result,
1056           &self->stream_id, &error)) {
1057     g_task_return_pointer (task, g_object_ref (connection),
1058         gst_rtmp_connection_close_and_unref);
1059   } else {
1060     g_task_return_error (task, error);
1061   }
1062 
1063   g_task_set_task_data (task, NULL, NULL);
1064   g_object_unref (task);
1065 }
1066 
1067 static void
put_chunk(GstRtmpConnection * connection,gpointer user_data)1068 put_chunk (GstRtmpConnection * connection, gpointer user_data)
1069 {
1070   GstRtmp2Sink *self = GST_RTMP2_SINK (user_data);
1071 
1072   g_mutex_lock (&self->lock);
1073   g_cond_signal (&self->cond);
1074   g_mutex_unlock (&self->lock);
1075 }
1076 
1077 static void
error_callback(GstRtmpConnection * connection,GstRtmp2Sink * self)1078 error_callback (GstRtmpConnection * connection, GstRtmp2Sink * self)
1079 {
1080   g_mutex_lock (&self->lock);
1081   if (self->cancellable) {
1082     g_cancellable_cancel (self->cancellable);
1083   } else if (self->loop) {
1084     GST_ELEMENT_ERROR (self, RESOURCE, WRITE, ("Connection error"), (NULL));
1085     stop_task (self);
1086   }
1087   g_mutex_unlock (&self->lock);
1088 }
1089 
1090 static void
send_connect_error(GstRtmp2Sink * self,GError * error)1091 send_connect_error (GstRtmp2Sink * self, GError * error)
1092 {
1093   if (!error) {
1094     GST_ERROR_OBJECT (self, "Connect failed with NULL error");
1095     GST_ELEMENT_ERROR (self, RESOURCE, FAILED, ("Failed to connect"), (NULL));
1096     return;
1097   }
1098 
1099   if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
1100     GST_DEBUG_OBJECT (self, "Connection was cancelled (%s)",
1101         GST_STR_NULL (error->message));
1102     return;
1103   }
1104 
1105   GST_ERROR_OBJECT (self, "Failed to connect (%s:%d): %s",
1106       g_quark_to_string (error->domain), error->code,
1107       GST_STR_NULL (error->message));
1108 
1109   if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED)) {
1110     GST_ELEMENT_ERROR (self, RESOURCE, NOT_AUTHORIZED,
1111         ("Not authorized to connect"), ("%s", GST_STR_NULL (error->message)));
1112   } else if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CONNECTION_REFUSED)) {
1113     GST_ELEMENT_ERROR (self, RESOURCE, OPEN_READ,
1114         ("Could not connect"), ("%s", GST_STR_NULL (error->message)));
1115   } else {
1116     GST_ELEMENT_ERROR (self, RESOURCE, FAILED,
1117         ("Failed to connect"),
1118         ("error %s:%d: %s", g_quark_to_string (error->domain), error->code,
1119             GST_STR_NULL (error->message)));
1120   }
1121 }
1122 
1123 static void
connect_task_done(GObject * object,GAsyncResult * result,gpointer user_data)1124 connect_task_done (GObject * object, GAsyncResult * result, gpointer user_data)
1125 {
1126   GstRtmp2Sink *self = GST_RTMP2_SINK (object);
1127   GTask *task = G_TASK (result);
1128   GError *error = NULL;
1129 
1130   g_mutex_lock (&self->lock);
1131 
1132   g_warn_if_fail (g_task_is_valid (task, object));
1133 
1134   if (self->cancellable == g_task_get_cancellable (task)) {
1135     g_clear_object (&self->cancellable);
1136   }
1137 
1138   self->connection = g_task_propagate_pointer (task, &error);
1139   if (self->connection) {
1140     set_pacing_rate (self);
1141     set_chunk_size (self);
1142     gst_rtmp_connection_set_output_handler (self->connection,
1143         put_chunk, g_object_ref (self), g_object_unref);
1144     g_signal_connect_object (self->connection, "error",
1145         G_CALLBACK (error_callback), self, 0);
1146   } else {
1147     send_connect_error (self, error);
1148     stop_task (self);
1149     g_error_free (error);
1150   }
1151 
1152   g_cond_broadcast (&self->cond);
1153   g_mutex_unlock (&self->lock);
1154 }
1155 
1156 static gboolean
socket_set_pacing_rate(GSocket * socket,gint pacing_rate,GError ** error)1157 socket_set_pacing_rate (GSocket * socket, gint pacing_rate, GError ** error)
1158 {
1159 #ifdef SO_MAX_PACING_RATE
1160   if (!g_socket_set_option (socket, SOL_SOCKET, SO_MAX_PACING_RATE,
1161           pacing_rate, error)) {
1162     g_prefix_error (error, "setsockopt failed: ");
1163     return FALSE;
1164   }
1165 #else
1166   if (pacing_rate != -1) {
1167     g_set_error (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
1168         "SO_MAX_PACING_RATE is not supported");
1169     return FALSE;
1170   }
1171 #endif
1172 
1173   return TRUE;
1174 }
1175 
1176 static void
set_pacing_rate(GstRtmp2Sink * self)1177 set_pacing_rate (GstRtmp2Sink * self)
1178 {
1179   GError *error = NULL;
1180   gint pacing_rate;
1181 
1182   if (!self->connection)
1183     return;
1184 
1185   GST_OBJECT_LOCK (self);
1186   pacing_rate = self->peak_kbps ? self->peak_kbps * 125 : -1;
1187   GST_OBJECT_UNLOCK (self);
1188 
1189   if (socket_set_pacing_rate (gst_rtmp_connection_get_socket (self->connection),
1190           pacing_rate, &error))
1191     GST_INFO_OBJECT (self, "Set pacing rate to %d Bps", pacing_rate);
1192   else
1193     GST_WARNING_OBJECT (self, "Could not set pacing rate: %s", error->message);
1194 
1195   g_clear_error (&error);
1196 }
1197 
1198 static void
set_chunk_size(GstRtmp2Sink * self)1199 set_chunk_size (GstRtmp2Sink * self)
1200 {
1201   guint32 chunk_size;
1202 
1203   if (!self->connection)
1204     return;
1205 
1206   GST_OBJECT_LOCK (self);
1207   chunk_size = self->chunk_size;
1208   GST_OBJECT_UNLOCK (self);
1209 
1210   gst_rtmp_connection_set_chunk_size (self->connection, chunk_size);
1211   GST_INFO_OBJECT (self, "Set chunk size to %" G_GUINT32_FORMAT, chunk_size);
1212 }
1213 
1214 static GstStructure *
gst_rtmp2_sink_get_stats(GstRtmp2Sink * self)1215 gst_rtmp2_sink_get_stats (GstRtmp2Sink * self)
1216 {
1217   GstStructure *s;
1218 
1219   g_mutex_lock (&self->lock);
1220 
1221   if (self->connection) {
1222     s = gst_rtmp_connection_get_stats (self->connection);
1223   } else if (self->stats) {
1224     s = gst_structure_copy (self->stats);
1225   } else {
1226     s = gst_rtmp_connection_get_null_stats ();
1227   }
1228 
1229   g_mutex_unlock (&self->lock);
1230 
1231   return s;
1232 }
1233