• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * GStreamer
3  * Copyright (C) 2010 Jan Schmidt <thaytan@noraisin.net>
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Library General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Library General Public License for more details.
14  *
15  * You should have received a copy of the GNU Library General Public
16  * License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
18  * Boston, MA 02110-1301, USA.
19  */
20 
21 /**
22  * SECTION:element-rtmpsink
23  * @title: rtmpsink
24  *
25  * This element delivers data to a streaming server via RTMP. It uses
26  * librtmp, and supports any protocols/urls that librtmp supports.
27  * The URL/location can contain extra connection or session parameters
28  * for librtmp, such as 'flashver=version'. See the librtmp documentation
29  * for more detail.
30  *
31  * ## Example launch line
32  * |[
33  * gst-launch-1.0 -v videotestsrc ! avenc_flv ! flvmux ! rtmpsink location='rtmp://localhost/path/to/stream live=1'
34  * ]| Encode a test video stream to FLV video format and stream it via RTMP.
35  *
36  */
37 
38 #ifdef HAVE_CONFIG_H
39 #include "config.h"
40 #endif
41 
42 #include <gst/gst.h>
43 
44 #include "gstrtmpelements.h"
45 #include "gstrtmpsink.h"
46 
47 #ifdef G_OS_WIN32
48 #include <winsock2.h>
49 #endif
50 
51 #include <stdlib.h>
52 
/* Debug category used by the GST_*_OBJECT logging macros in this file; it is
 * initialized in gst_rtmp_sink_class_init(). */
GST_DEBUG_CATEGORY_STATIC (gst_rtmp_sink_debug);
#define GST_CAT_DEFAULT gst_rtmp_sink_debug

/* Default value of the "location" property: no URL configured. */
#define DEFAULT_LOCATION NULL

/* GObject property identifiers. PROP_0 occupies id 0 so that the real
 * properties start at 1. */
enum
{
  PROP_0,
  PROP_LOCATION
};

/* Template for the single, always-present sink pad; it accepts FLV only. */
static GstStaticPadTemplate sink_template = GST_STATIC_PAD_TEMPLATE ("sink",
    GST_PAD_SINK,
    GST_PAD_ALWAYS,
    GST_STATIC_CAPS ("video/x-flv")
    );
69 
/* Forward declarations of the interface initializer and of the GObject /
 * GstBaseSink virtual method implementations defined further down. */
static void gst_rtmp_sink_uri_handler_init (gpointer g_iface,
    gpointer iface_data);
static void gst_rtmp_sink_set_property (GObject * object, guint prop_id,
    const GValue * value, GParamSpec * pspec);
static void gst_rtmp_sink_get_property (GObject * object, guint prop_id,
    GValue * value, GParamSpec * pspec);
static void gst_rtmp_sink_finalize (GObject * object);
static gboolean gst_rtmp_sink_stop (GstBaseSink * sink);
static gboolean gst_rtmp_sink_start (GstBaseSink * sink);
static gboolean gst_rtmp_sink_event (GstBaseSink * sink, GstEvent * event);
static gboolean gst_rtmp_sink_setcaps (GstBaseSink * sink, GstCaps * caps);
static GstFlowReturn gst_rtmp_sink_render (GstBaseSink * sink, GstBuffer * buf);

/* Define the GstRTMPSink type as a GstBaseSink subclass that also implements
 * the GstURIHandler interface, and define the "rtmpsink" element
 * registration, which runs rtmp_element_init (plugin) as its extra code. */
#define gst_rtmp_sink_parent_class parent_class
G_DEFINE_TYPE_WITH_CODE (GstRTMPSink, gst_rtmp_sink, GST_TYPE_BASE_SINK,
    G_IMPLEMENT_INTERFACE (GST_TYPE_URI_HANDLER,
        gst_rtmp_sink_uri_handler_init));
GST_ELEMENT_REGISTER_DEFINE_WITH_CODE (rtmpsink, "rtmpsink", GST_RANK_PRIMARY,
    GST_TYPE_RTMP_SINK, rtmp_element_init (plugin));
89 
90 /* initialize the plugin's class */
91 static void
gst_rtmp_sink_class_init(GstRTMPSinkClass * klass)92 gst_rtmp_sink_class_init (GstRTMPSinkClass * klass)
93 {
94   GObjectClass *gobject_class;
95   GstElementClass *gstelement_class;
96   GstBaseSinkClass *gstbasesink_class;
97 
98   gobject_class = (GObjectClass *) klass;
99   gstelement_class = (GstElementClass *) klass;
100   gstbasesink_class = (GstBaseSinkClass *) klass;
101 
102   gobject_class->finalize = gst_rtmp_sink_finalize;
103   gobject_class->set_property = gst_rtmp_sink_set_property;
104   gobject_class->get_property = gst_rtmp_sink_get_property;
105 
106   g_object_class_install_property (gobject_class, PROP_LOCATION,
107       g_param_spec_string ("location", "RTMP Location", "RTMP url",
108           DEFAULT_LOCATION, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
109 
110   gst_element_class_set_static_metadata (gstelement_class,
111       "RTMP output sink",
112       "Sink/Network", "Sends FLV content to a server via RTMP",
113       "Jan Schmidt <thaytan@noraisin.net>");
114 
115   gst_element_class_add_static_pad_template (gstelement_class, &sink_template);
116 
117   gstbasesink_class->start = GST_DEBUG_FUNCPTR (gst_rtmp_sink_start);
118   gstbasesink_class->stop = GST_DEBUG_FUNCPTR (gst_rtmp_sink_stop);
119   gstbasesink_class->render = GST_DEBUG_FUNCPTR (gst_rtmp_sink_render);
120   gstbasesink_class->set_caps = GST_DEBUG_FUNCPTR (gst_rtmp_sink_setcaps);
121   gstbasesink_class->event = GST_DEBUG_FUNCPTR (gst_rtmp_sink_event);
122 
123   GST_DEBUG_CATEGORY_INIT (gst_rtmp_sink_debug, "rtmpsink", 0,
124       "RTMP server element");
125 }
126 
127 /* initialize the new element
128  * initialize instance structure
129  */
130 static void
gst_rtmp_sink_init(GstRTMPSink * sink)131 gst_rtmp_sink_init (GstRTMPSink * sink)
132 {
133 #ifdef G_OS_WIN32
134   WSADATA wsa_data;
135 
136   if (WSAStartup (MAKEWORD (2, 2), &wsa_data) != 0) {
137     GST_ERROR_OBJECT (sink, "WSAStartup failed: 0x%08x", WSAGetLastError ());
138   }
139 #endif
140 }
141 
142 static void
gst_rtmp_sink_finalize(GObject * object)143 gst_rtmp_sink_finalize (GObject * object)
144 {
145   GstRTMPSink *sink = GST_RTMP_SINK (object);
146 
147 #ifdef G_OS_WIN32
148   WSACleanup ();
149 #endif
150   g_free (sink->uri);
151 
152   G_OBJECT_CLASS (parent_class)->finalize (object);
153 }
154 
155 
156 static gboolean
gst_rtmp_sink_start(GstBaseSink * basesink)157 gst_rtmp_sink_start (GstBaseSink * basesink)
158 {
159   GstRTMPSink *sink = GST_RTMP_SINK (basesink);
160 
161   if (!sink->uri) {
162     GST_ELEMENT_ERROR (sink, RESOURCE, OPEN_WRITE,
163         ("Please set URI for RTMP output"), ("No URI set before starting"));
164     return FALSE;
165   }
166 
167   sink->rtmp_uri = g_strdup (sink->uri);
168   sink->rtmp = RTMP_Alloc ();
169 
170   if (!sink->rtmp) {
171     GST_ERROR_OBJECT (sink, "Could not allocate librtmp's RTMP context");
172     goto error;
173   }
174 
175   RTMP_Init (sink->rtmp);
176   if (!RTMP_SetupURL (sink->rtmp, sink->rtmp_uri)) {
177     GST_ELEMENT_ERROR (sink, RESOURCE, OPEN_WRITE, (NULL),
178         ("Failed to setup URL '%s'", sink->uri));
179     goto error;
180   }
181 
182   GST_DEBUG_OBJECT (sink, "Created RTMP object");
183 
184   /* Mark this as an output connection */
185   RTMP_EnableWrite (sink->rtmp);
186 
187   sink->first = TRUE;
188   sink->have_write_error = FALSE;
189 
190   return TRUE;
191 
192 error:
193   if (sink->rtmp) {
194     RTMP_Free (sink->rtmp);
195     sink->rtmp = NULL;
196   }
197   g_free (sink->rtmp_uri);
198   sink->rtmp_uri = NULL;
199   return FALSE;
200 }
201 
202 static gboolean
gst_rtmp_sink_stop(GstBaseSink * basesink)203 gst_rtmp_sink_stop (GstBaseSink * basesink)
204 {
205   GstRTMPSink *sink = GST_RTMP_SINK (basesink);
206 
207   if (sink->header) {
208     gst_buffer_unref (sink->header);
209     sink->header = NULL;
210   }
211   if (sink->rtmp) {
212     RTMP_Close (sink->rtmp);
213     RTMP_Free (sink->rtmp);
214     sink->rtmp = NULL;
215   }
216   if (sink->rtmp_uri) {
217     g_free (sink->rtmp_uri);
218     sink->rtmp_uri = NULL;
219   }
220 
221   return TRUE;
222 }
223 
224 static GstFlowReturn
gst_rtmp_sink_render(GstBaseSink * bsink,GstBuffer * buf)225 gst_rtmp_sink_render (GstBaseSink * bsink, GstBuffer * buf)
226 {
227   GstRTMPSink *sink = GST_RTMP_SINK (bsink);
228   gboolean need_unref = FALSE;
229   GstMapInfo map = GST_MAP_INFO_INIT;
230 
231   if (sink->rtmp == NULL) {
232     /* Do not crash */
233     GST_ELEMENT_ERROR (sink, RESOURCE, WRITE, (NULL), ("Failed to write data"));
234     return GST_FLOW_ERROR;
235   }
236 
237   /* Ignore buffers that are in the stream headers (caps) */
238   if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_HEADER)) {
239     return GST_FLOW_OK;
240   }
241 
242   if (sink->first) {
243     /* open the connection */
244     if (!RTMP_IsConnected (sink->rtmp)) {
245       if (!RTMP_Connect (sink->rtmp, NULL))
246         goto connection_failed;
247 
248       if (!RTMP_ConnectStream (sink->rtmp, 0)) {
249         RTMP_Close (sink->rtmp);
250         goto connection_failed;
251       }
252 
253       GST_DEBUG_OBJECT (sink, "Opened connection to %s", sink->rtmp_uri);
254     }
255 
256     /* Prepend the header from the caps to the first non header buffer */
257     if (sink->header) {
258       buf = gst_buffer_append (gst_buffer_ref (sink->header),
259           gst_buffer_ref (buf));
260       need_unref = TRUE;
261     }
262 
263     sink->first = FALSE;
264   }
265 
266   if (sink->have_write_error)
267     goto write_failed;
268 
269   GST_LOG_OBJECT (sink, "Sending %" G_GSIZE_FORMAT " bytes to RTMP server",
270       gst_buffer_get_size (buf));
271 
272   gst_buffer_map (buf, &map, GST_MAP_READ);
273 
274   if (RTMP_Write (sink->rtmp, (char *) map.data, map.size) <= 0)
275     goto write_failed;
276 
277   gst_buffer_unmap (buf, &map);
278   if (need_unref)
279     gst_buffer_unref (buf);
280 
281   return GST_FLOW_OK;
282 
283   /* ERRORS */
284 write_failed:
285   {
286     GST_ELEMENT_ERROR (sink, RESOURCE, WRITE, (NULL), ("Failed to write data"));
287     gst_buffer_unmap (buf, &map);
288     if (need_unref)
289       gst_buffer_unref (buf);
290     sink->have_write_error = TRUE;
291     return GST_FLOW_ERROR;
292   }
293 
294 connection_failed:
295   {
296     GST_ELEMENT_ERROR (sink, RESOURCE, OPEN_WRITE, (NULL),
297         ("Could not connect to RTMP stream \"%s\" for writing", sink->uri));
298     RTMP_Free (sink->rtmp);
299     sink->rtmp = NULL;
300     g_free (sink->rtmp_uri);
301     sink->rtmp_uri = NULL;
302     sink->have_write_error = TRUE;
303 
304     return GST_FLOW_ERROR;
305   }
306 }
307 
308 /*
309  * URI interface support.
310  */
311 static GstURIType
gst_rtmp_sink_uri_get_type(GType type)312 gst_rtmp_sink_uri_get_type (GType type)
313 {
314   return GST_URI_SINK;
315 }
316 
317 static const gchar *const *
gst_rtmp_sink_uri_get_protocols(GType type)318 gst_rtmp_sink_uri_get_protocols (GType type)
319 {
320   static const gchar *protocols[] =
321       { "rtmp", "rtmpt", "rtmps", "rtmpe", "rtmfp", "rtmpte", "rtmpts", NULL };
322 
323   return protocols;
324 }
325 
326 static gchar *
gst_rtmp_sink_uri_get_uri(GstURIHandler * handler)327 gst_rtmp_sink_uri_get_uri (GstURIHandler * handler)
328 {
329   GstRTMPSink *sink = GST_RTMP_SINK (handler);
330 
331   /* FIXME: make thread-safe */
332   return g_strdup (sink->uri);
333 }
334 
335 static gboolean
gst_rtmp_sink_uri_set_uri(GstURIHandler * handler,const gchar * uri,GError ** error)336 gst_rtmp_sink_uri_set_uri (GstURIHandler * handler, const gchar * uri,
337     GError ** error)
338 {
339   GstRTMPSink *sink = GST_RTMP_SINK (handler);
340   gboolean ret = TRUE;
341 
342   if (GST_STATE (sink) >= GST_STATE_PAUSED) {
343     g_set_error (error, GST_URI_ERROR, GST_URI_ERROR_BAD_STATE,
344         "Changing the URI on rtmpsink when it is running is not supported");
345     return FALSE;
346   }
347 
348   g_free (sink->uri);
349   sink->uri = NULL;
350 
351   if (uri != NULL) {
352     int protocol;
353     AVal host;
354     unsigned int port;
355     AVal playpath, app;
356 
357     if (!RTMP_ParseURL (uri, &protocol, &host, &port, &playpath, &app) ||
358         !host.av_len) {
359       GST_ELEMENT_ERROR (sink, RESOURCE, OPEN_WRITE,
360           ("Failed to parse URI %s", uri), (NULL));
361       g_set_error (error, GST_URI_ERROR, GST_URI_ERROR_BAD_URI,
362           "Could not parse RTMP URI");
363       ret = FALSE;
364     } else {
365       sink->uri = g_strdup (uri);
366     }
367 
368     if (playpath.av_val)
369       free (playpath.av_val);
370   }
371 
372   if (ret) {
373     sink->have_write_error = FALSE;
374     GST_DEBUG_OBJECT (sink, "Changed URI to %s", GST_STR_NULL (uri));
375   }
376 
377   return ret;
378 }
379 
380 static void
gst_rtmp_sink_uri_handler_init(gpointer g_iface,gpointer iface_data)381 gst_rtmp_sink_uri_handler_init (gpointer g_iface, gpointer iface_data)
382 {
383   GstURIHandlerInterface *iface = (GstURIHandlerInterface *) g_iface;
384 
385   iface->get_type = gst_rtmp_sink_uri_get_type;
386   iface->get_protocols = gst_rtmp_sink_uri_get_protocols;
387   iface->get_uri = gst_rtmp_sink_uri_get_uri;
388   iface->set_uri = gst_rtmp_sink_uri_set_uri;
389 }
390 
391 static void
gst_rtmp_sink_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)392 gst_rtmp_sink_set_property (GObject * object, guint prop_id,
393     const GValue * value, GParamSpec * pspec)
394 {
395   GstRTMPSink *sink = GST_RTMP_SINK (object);
396 
397   switch (prop_id) {
398     case PROP_LOCATION:
399       gst_rtmp_sink_uri_set_uri (GST_URI_HANDLER (sink),
400           g_value_get_string (value), NULL);
401       break;
402     default:
403       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
404       break;
405   }
406 }
407 
408 static gboolean
gst_rtmp_sink_setcaps(GstBaseSink * sink,GstCaps * caps)409 gst_rtmp_sink_setcaps (GstBaseSink * sink, GstCaps * caps)
410 {
411   GstRTMPSink *rtmpsink = GST_RTMP_SINK (sink);
412   GstStructure *s;
413   const GValue *sh;
414 
415   GST_DEBUG_OBJECT (sink, "caps set to %" GST_PTR_FORMAT, caps);
416 
417   /* Clear our current header buffer */
418   if (rtmpsink->header) {
419     gst_buffer_unref (rtmpsink->header);
420     rtmpsink->header = NULL;
421   }
422 
423   s = gst_caps_get_structure (caps, 0);
424 
425   sh = gst_structure_get_value (s, "streamheader");
426   if (sh == NULL)
427     goto out;
428 
429   if (GST_VALUE_HOLDS_BUFFER (sh)) {
430     rtmpsink->header = gst_buffer_ref (gst_value_get_buffer (sh));
431   } else if (GST_VALUE_HOLDS_ARRAY (sh)) {
432     GArray *buffers;
433     gint i;
434 
435     buffers = g_value_peek_pointer (sh);
436 
437     /* Concatenate all buffers in streamheader into one */
438     rtmpsink->header = gst_buffer_new ();
439     for (i = 0; i < buffers->len; ++i) {
440       GValue *val;
441       GstBuffer *buf;
442 
443       val = &g_array_index (buffers, GValue, i);
444       buf = g_value_peek_pointer (val);
445 
446       gst_buffer_ref (buf);
447 
448       rtmpsink->header = gst_buffer_append (rtmpsink->header, buf);
449     }
450   } else {
451     GST_ERROR_OBJECT (rtmpsink, "streamheader field has unexpected type %s",
452         G_VALUE_TYPE_NAME (sh));
453   }
454 
455   GST_DEBUG_OBJECT (rtmpsink, "have %" G_GSIZE_FORMAT " bytes of header data",
456       gst_buffer_get_size (rtmpsink->header));
457 
458 out:
459 
460   return TRUE;
461 }
462 
463 static gboolean
gst_rtmp_sink_event(GstBaseSink * sink,GstEvent * event)464 gst_rtmp_sink_event (GstBaseSink * sink, GstEvent * event)
465 {
466   GstRTMPSink *rtmpsink = GST_RTMP_SINK (sink);
467 
468   switch (event->type) {
469     case GST_EVENT_FLUSH_STOP:
470       rtmpsink->have_write_error = FALSE;
471       break;
472     default:
473       break;
474   }
475 
476   return GST_BASE_SINK_CLASS (parent_class)->event (sink, event);
477 }
478 
479 static void
gst_rtmp_sink_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)480 gst_rtmp_sink_get_property (GObject * object, guint prop_id,
481     GValue * value, GParamSpec * pspec)
482 {
483   GstRTMPSink *sink = GST_RTMP_SINK (object);
484 
485   switch (prop_id) {
486     case PROP_LOCATION:
487       g_value_set_string (value, sink->uri);
488       break;
489     default:
490       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
491       break;
492   }
493 }
494