1 /* GStreamer
2 * Copyright (C) 2014 David Schleef <ds@schleef.org>
3 * Copyright (C) 2017 Make.TV, Inc. <info@make.tv>
4 * Contact: Jan Alexander Steffens (heftig) <jsteffens@make.tv>
5 *
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Library General Public
8 * License as published by the Free Software Foundation; either
9 * version 2 of the License, or (at your option) any later version.
10 *
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Library General Public License for more details.
15 *
16 * You should have received a copy of the GNU Library General Public
17 * License along with this library; if not, write to the
18 * Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
19 * Boston, MA 02110-1335, USA.
20 */
21 /**
22 * SECTION:element-rtmp2src
23 *
24 * The rtmp2src element receives input streams from an RTMP server.
25 *
26 * <refsect2>
27 * <title>Example launch line</title>
28 * |[
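 * gst-launch-1.0 -v rtmp2src location=rtmp://example.com/app/stream ! decodebin ! fakesink
 * ]|
 * Connects to the RTMP server given by the location, plays the named stream,
 * decodes it with decodebin and discards the result in fakesink.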
32 * </refsect2>
33 */
34
35 #ifdef HAVE_CONFIG_H
36 #include "config.h"
37 #endif
38
39 #include "gstrtmp2elements.h"
40 #include "gstrtmp2src.h"
41
42 #include "gstrtmp2locationhandler.h"
43 #include "rtmp/rtmpclient.h"
44 #include "rtmp/rtmpmessage.h"
45
46 #include <gst/base/gstpushsrc.h>
47 #include <string.h>
48
49 GST_DEBUG_CATEGORY_STATIC (gst_rtmp2_src_debug_category);
50 #define GST_CAT_DEFAULT gst_rtmp2_src_debug_category
51
52 /* prototypes */
53 #define GST_RTMP2_SRC(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_RTMP2_SRC,GstRtmp2Src))
54 #define GST_IS_RTMP2_SRC(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_RTMP2_SRC))
55
56 typedef struct
57 {
58 GstPushSrc parent_instance;
59
60 /* properties */
61 GstRtmpLocation location;
62 gboolean async_connect;
63 GstStructure *stats;
64 guint idle_timeout;
65
66 /* If both self->lock and OBJECT_LOCK are needed,
67 * self->lock must be taken first */
68 GMutex lock;
69 GCond cond;
70
71 gboolean running, flushing;
72 gboolean timeout;
73 gboolean started;
74
75 GstTask *task;
76 GRecMutex task_lock;
77
78 GMainLoop *loop;
79 GMainContext *context;
80
81 GCancellable *cancellable;
82 GstRtmpConnection *connection;
83 guint32 stream_id;
84
85 GstBuffer *message;
86 gboolean sent_header;
87 GstClockTime last_ts;
88 } GstRtmp2Src;
89
90 typedef struct
91 {
92 GstPushSrcClass parent_class;
93 } GstRtmp2SrcClass;
94
95 /* GObject virtual functions */
96 static void gst_rtmp2_src_set_property (GObject * object,
97 guint property_id, const GValue * value, GParamSpec * pspec);
98 static void gst_rtmp2_src_get_property (GObject * object,
99 guint property_id, GValue * value, GParamSpec * pspec);
100 static void gst_rtmp2_src_finalize (GObject * object);
101 static void gst_rtmp2_src_uri_handler_init (GstURIHandlerInterface * iface);
102
103 /* GstBaseSrc virtual functions */
104 static gboolean gst_rtmp2_src_start (GstBaseSrc * src);
105 static gboolean gst_rtmp2_src_stop (GstBaseSrc * src);
106 static gboolean gst_rtmp2_src_unlock (GstBaseSrc * src);
107 static gboolean gst_rtmp2_src_unlock_stop (GstBaseSrc * src);
108 static GstFlowReturn gst_rtmp2_src_create (GstBaseSrc * src, guint64 offset,
109 guint size, GstBuffer ** outbuf);
110 static gboolean gst_rtmp2_src_query (GstBaseSrc * src, GstQuery * query);
111
112 /* Internal API */
113 static void gst_rtmp2_src_task_func (gpointer user_data);
114 static void client_connect_done (GObject * source, GAsyncResult * result,
115 gpointer user_data);
116 static void start_play_done (GObject * object, GAsyncResult * result,
117 gpointer user_data);
118 static void connect_task_done (GObject * object, GAsyncResult * result,
119 gpointer user_data);
120
121 static GstStructure *gst_rtmp2_src_get_stats (GstRtmp2Src * self);
122
/* GObject property identifiers. PROP_LOCATION through PROP_FLASH_VERSION are
 * registered with g_object_class_override_property(); the remaining ones are
 * installed by this element in class_init. */
enum
{
  PROP_0,

  /* overridden properties */
  PROP_LOCATION,
  PROP_SCHEME,
  PROP_HOST,
  PROP_PORT,
  PROP_APPLICATION,
  PROP_STREAM,
  PROP_SECURE_TOKEN,
  PROP_USERNAME,
  PROP_PASSWORD,
  PROP_AUTHMOD,
  PROP_TIMEOUT,
  PROP_TLS_VALIDATION_FLAGS,
  PROP_FLASH_VERSION,

  /* properties installed by this element */
  PROP_ASYNC_CONNECT,
  PROP_STATS,
  PROP_IDLE_TIMEOUT,
};
143
144 #define DEFAULT_IDLE_TIMEOUT 0
145
146 /* pad templates */
147
148 static GstStaticPadTemplate gst_rtmp2_src_src_template =
149 GST_STATIC_PAD_TEMPLATE ("src",
150 GST_PAD_SRC,
151 GST_PAD_ALWAYS,
152 GST_STATIC_CAPS ("video/x-flv")
153 );
154
155 /* class initialization */
156
157 G_DEFINE_TYPE_WITH_CODE (GstRtmp2Src, gst_rtmp2_src, GST_TYPE_PUSH_SRC,
158 G_IMPLEMENT_INTERFACE (GST_TYPE_URI_HANDLER,
159 gst_rtmp2_src_uri_handler_init);
160 G_IMPLEMENT_INTERFACE (GST_TYPE_RTMP_LOCATION_HANDLER, NULL));
161 GST_ELEMENT_REGISTER_DEFINE_WITH_CODE (rtmp2src, "rtmp2src",
162 GST_RANK_PRIMARY + 1, GST_TYPE_RTMP2_SRC, rtmp2_element_init (plugin));
163
164 static void
gst_rtmp2_src_class_init(GstRtmp2SrcClass * klass)165 gst_rtmp2_src_class_init (GstRtmp2SrcClass * klass)
166 {
167 GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
168 GstBaseSrcClass *base_src_class = GST_BASE_SRC_CLASS (klass);
169
170 gst_element_class_add_static_pad_template (GST_ELEMENT_CLASS (klass),
171 &gst_rtmp2_src_src_template);
172
173 gst_element_class_set_static_metadata (GST_ELEMENT_CLASS (klass),
174 "RTMP source element", "Source", "Source element for RTMP streams",
175 "Make.TV, Inc. <info@make.tv>");
176
177 gobject_class->set_property = gst_rtmp2_src_set_property;
178 gobject_class->get_property = gst_rtmp2_src_get_property;
179 gobject_class->finalize = gst_rtmp2_src_finalize;
180 base_src_class->start = GST_DEBUG_FUNCPTR (gst_rtmp2_src_start);
181 base_src_class->stop = GST_DEBUG_FUNCPTR (gst_rtmp2_src_stop);
182 base_src_class->unlock = GST_DEBUG_FUNCPTR (gst_rtmp2_src_unlock);
183 base_src_class->unlock_stop = GST_DEBUG_FUNCPTR (gst_rtmp2_src_unlock_stop);
184 base_src_class->create = GST_DEBUG_FUNCPTR (gst_rtmp2_src_create);
185 base_src_class->query = GST_DEBUG_FUNCPTR (gst_rtmp2_src_query);
186
187 g_object_class_override_property (gobject_class, PROP_LOCATION, "location");
188 g_object_class_override_property (gobject_class, PROP_SCHEME, "scheme");
189 g_object_class_override_property (gobject_class, PROP_HOST, "host");
190 g_object_class_override_property (gobject_class, PROP_PORT, "port");
191 g_object_class_override_property (gobject_class, PROP_APPLICATION,
192 "application");
193 g_object_class_override_property (gobject_class, PROP_STREAM, "stream");
194 g_object_class_override_property (gobject_class, PROP_SECURE_TOKEN,
195 "secure-token");
196 g_object_class_override_property (gobject_class, PROP_USERNAME, "username");
197 g_object_class_override_property (gobject_class, PROP_PASSWORD, "password");
198 g_object_class_override_property (gobject_class, PROP_AUTHMOD, "authmod");
199 g_object_class_override_property (gobject_class, PROP_TIMEOUT, "timeout");
200 g_object_class_override_property (gobject_class, PROP_TLS_VALIDATION_FLAGS,
201 "tls-validation-flags");
202 g_object_class_override_property (gobject_class, PROP_FLASH_VERSION,
203 "flash-version");
204
205 g_object_class_install_property (gobject_class, PROP_ASYNC_CONNECT,
206 g_param_spec_boolean ("async-connect", "Async connect",
207 "Connect on READY, otherwise on first push", TRUE,
208 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
209
210 g_object_class_install_property (gobject_class, PROP_STATS,
211 g_param_spec_boxed ("stats", "Stats", "Retrieve a statistics structure",
212 GST_TYPE_STRUCTURE, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
213
214 g_object_class_install_property (gobject_class, PROP_IDLE_TIMEOUT,
215 g_param_spec_uint ("idle-timeout", "Idle timeout",
216 "The maximum allowed time in seconds for valid packets not to arrive "
217 "from the peer (0 = no timeout)",
218 0, G_MAXUINT, DEFAULT_IDLE_TIMEOUT,
219 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
220
221 GST_DEBUG_CATEGORY_INIT (gst_rtmp2_src_debug_category, "rtmp2src", 0,
222 "debug category for rtmp2src element");
223 }
224
225 static void
gst_rtmp2_src_init(GstRtmp2Src * self)226 gst_rtmp2_src_init (GstRtmp2Src * self)
227 {
228 self->async_connect = TRUE;
229 self->idle_timeout = DEFAULT_IDLE_TIMEOUT;
230
231 g_mutex_init (&self->lock);
232 g_cond_init (&self->cond);
233
234 self->task = gst_task_new (gst_rtmp2_src_task_func, self, NULL);
235 g_rec_mutex_init (&self->task_lock);
236 gst_task_set_lock (self->task, &self->task_lock);
237 }
238
239 static void
gst_rtmp2_src_uri_handler_init(GstURIHandlerInterface * iface)240 gst_rtmp2_src_uri_handler_init (GstURIHandlerInterface * iface)
241 {
242 gst_rtmp_location_handler_implement_uri_handler (iface, GST_URI_SRC);
243 }
244
245 static void
gst_rtmp2_src_set_property(GObject * object,guint property_id,const GValue * value,GParamSpec * pspec)246 gst_rtmp2_src_set_property (GObject * object, guint property_id,
247 const GValue * value, GParamSpec * pspec)
248 {
249 GstRtmp2Src *self = GST_RTMP2_SRC (object);
250
251 switch (property_id) {
252 case PROP_LOCATION:
253 gst_rtmp_location_handler_set_uri (GST_RTMP_LOCATION_HANDLER (self),
254 g_value_get_string (value));
255 break;
256 case PROP_SCHEME:
257 GST_OBJECT_LOCK (self);
258 self->location.scheme = g_value_get_enum (value);
259 GST_OBJECT_UNLOCK (self);
260 break;
261 case PROP_HOST:
262 GST_OBJECT_LOCK (self);
263 g_free (self->location.host);
264 self->location.host = g_value_dup_string (value);
265 GST_OBJECT_UNLOCK (self);
266 break;
267 case PROP_PORT:
268 GST_OBJECT_LOCK (self);
269 self->location.port = g_value_get_int (value);
270 GST_OBJECT_UNLOCK (self);
271 break;
272 case PROP_APPLICATION:
273 GST_OBJECT_LOCK (self);
274 g_free (self->location.application);
275 self->location.application = g_value_dup_string (value);
276 GST_OBJECT_UNLOCK (self);
277 break;
278 case PROP_STREAM:
279 GST_OBJECT_LOCK (self);
280 g_free (self->location.stream);
281 self->location.stream = g_value_dup_string (value);
282 GST_OBJECT_UNLOCK (self);
283 break;
284 case PROP_SECURE_TOKEN:
285 GST_OBJECT_LOCK (self);
286 g_free (self->location.secure_token);
287 self->location.secure_token = g_value_dup_string (value);
288 GST_OBJECT_UNLOCK (self);
289 break;
290 case PROP_USERNAME:
291 GST_OBJECT_LOCK (self);
292 g_free (self->location.username);
293 self->location.username = g_value_dup_string (value);
294 GST_OBJECT_UNLOCK (self);
295 break;
296 case PROP_PASSWORD:
297 GST_OBJECT_LOCK (self);
298 g_free (self->location.password);
299 self->location.password = g_value_dup_string (value);
300 GST_OBJECT_UNLOCK (self);
301 break;
302 case PROP_AUTHMOD:
303 GST_OBJECT_LOCK (self);
304 self->location.authmod = g_value_get_enum (value);
305 GST_OBJECT_UNLOCK (self);
306 break;
307 case PROP_TIMEOUT:
308 GST_OBJECT_LOCK (self);
309 self->location.timeout = g_value_get_uint (value);
310 GST_OBJECT_UNLOCK (self);
311 break;
312 case PROP_TLS_VALIDATION_FLAGS:
313 GST_OBJECT_LOCK (self);
314 self->location.tls_flags = g_value_get_flags (value);
315 GST_OBJECT_UNLOCK (self);
316 break;
317 case PROP_FLASH_VERSION:
318 GST_OBJECT_LOCK (self);
319 g_free (self->location.flash_ver);
320 self->location.flash_ver = g_value_dup_string (value);
321 GST_OBJECT_UNLOCK (self);
322 break;
323 case PROP_ASYNC_CONNECT:
324 GST_OBJECT_LOCK (self);
325 self->async_connect = g_value_get_boolean (value);
326 GST_OBJECT_UNLOCK (self);
327 break;
328 case PROP_IDLE_TIMEOUT:
329 GST_OBJECT_LOCK (self);
330 self->idle_timeout = g_value_get_uint (value);
331 GST_OBJECT_UNLOCK (self);
332 break;
333 default:
334 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
335 break;
336 }
337 }
338
339 static void
gst_rtmp2_src_get_property(GObject * object,guint property_id,GValue * value,GParamSpec * pspec)340 gst_rtmp2_src_get_property (GObject * object, guint property_id,
341 GValue * value, GParamSpec * pspec)
342 {
343 GstRtmp2Src *self = GST_RTMP2_SRC (object);
344
345 switch (property_id) {
346 case PROP_LOCATION:
347 GST_OBJECT_LOCK (self);
348 g_value_take_string (value, gst_rtmp_location_get_string (&self->location,
349 TRUE));
350 GST_OBJECT_UNLOCK (self);
351 break;
352 case PROP_SCHEME:
353 GST_OBJECT_LOCK (self);
354 g_value_set_enum (value, self->location.scheme);
355 GST_OBJECT_UNLOCK (self);
356 break;
357 case PROP_HOST:
358 GST_OBJECT_LOCK (self);
359 g_value_set_string (value, self->location.host);
360 GST_OBJECT_UNLOCK (self);
361 break;
362 case PROP_PORT:
363 GST_OBJECT_LOCK (self);
364 g_value_set_int (value, self->location.port);
365 GST_OBJECT_UNLOCK (self);
366 break;
367 case PROP_APPLICATION:
368 GST_OBJECT_LOCK (self);
369 g_value_set_string (value, self->location.application);
370 GST_OBJECT_UNLOCK (self);
371 break;
372 case PROP_STREAM:
373 GST_OBJECT_LOCK (self);
374 g_value_set_string (value, self->location.stream);
375 GST_OBJECT_UNLOCK (self);
376 break;
377 case PROP_SECURE_TOKEN:
378 GST_OBJECT_LOCK (self);
379 g_value_set_string (value, self->location.secure_token);
380 GST_OBJECT_UNLOCK (self);
381 break;
382 case PROP_USERNAME:
383 GST_OBJECT_LOCK (self);
384 g_value_set_string (value, self->location.username);
385 GST_OBJECT_UNLOCK (self);
386 break;
387 case PROP_PASSWORD:
388 GST_OBJECT_LOCK (self);
389 g_value_set_string (value, self->location.password);
390 GST_OBJECT_UNLOCK (self);
391 break;
392 case PROP_AUTHMOD:
393 GST_OBJECT_LOCK (self);
394 g_value_set_enum (value, self->location.authmod);
395 GST_OBJECT_UNLOCK (self);
396 break;
397 case PROP_TIMEOUT:
398 GST_OBJECT_LOCK (self);
399 g_value_set_uint (value, self->location.timeout);
400 GST_OBJECT_UNLOCK (self);
401 break;
402 case PROP_TLS_VALIDATION_FLAGS:
403 GST_OBJECT_LOCK (self);
404 g_value_set_flags (value, self->location.tls_flags);
405 GST_OBJECT_UNLOCK (self);
406 break;
407 case PROP_FLASH_VERSION:
408 GST_OBJECT_LOCK (self);
409 g_value_set_string (value, self->location.flash_ver);
410 GST_OBJECT_UNLOCK (self);
411 break;
412 case PROP_ASYNC_CONNECT:
413 GST_OBJECT_LOCK (self);
414 g_value_set_boolean (value, self->async_connect);
415 GST_OBJECT_UNLOCK (self);
416 break;
417 case PROP_STATS:
418 g_value_take_boxed (value, gst_rtmp2_src_get_stats (self));
419 break;
420 case PROP_IDLE_TIMEOUT:
421 GST_OBJECT_LOCK (self);
422 g_value_set_uint (value, self->idle_timeout);
423 GST_OBJECT_UNLOCK (self);
424 break;
425 default:
426 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
427 break;
428 }
429 }
430
431 static void
gst_rtmp2_src_finalize(GObject * object)432 gst_rtmp2_src_finalize (GObject * object)
433 {
434 GstRtmp2Src *self = GST_RTMP2_SRC (object);
435
436 gst_buffer_replace (&self->message, NULL);
437
438 g_clear_object (&self->cancellable);
439 g_clear_object (&self->connection);
440
441 g_clear_object (&self->task);
442 g_rec_mutex_clear (&self->task_lock);
443
444 g_mutex_clear (&self->lock);
445 g_cond_clear (&self->cond);
446
447 g_clear_pointer (&self->stats, gst_structure_free);
448 gst_rtmp_location_clear (&self->location);
449
450 G_OBJECT_CLASS (gst_rtmp2_src_parent_class)->finalize (object);
451 }
452
453 static gboolean
gst_rtmp2_src_start(GstBaseSrc * src)454 gst_rtmp2_src_start (GstBaseSrc * src)
455 {
456 GstRtmp2Src *self = GST_RTMP2_SRC (src);
457 gboolean async;
458
459 GST_OBJECT_LOCK (self);
460 async = self->async_connect;
461 GST_OBJECT_UNLOCK (self);
462
463 GST_INFO_OBJECT (self, "Starting (%s)", async ? "async" : "delayed");
464
465 g_clear_object (&self->cancellable);
466
467 self->running = TRUE;
468 self->cancellable = g_cancellable_new ();
469 self->stream_id = 0;
470 self->sent_header = FALSE;
471 self->last_ts = GST_CLOCK_TIME_NONE;
472 self->timeout = FALSE;
473 self->started = FALSE;
474
475 if (async) {
476 gst_task_start (self->task);
477 }
478
479 return TRUE;
480 }
481
482 static gboolean
quit_invoker(gpointer user_data)483 quit_invoker (gpointer user_data)
484 {
485 g_main_loop_quit (user_data);
486 return G_SOURCE_REMOVE;
487 }
488
489 static void
stop_task(GstRtmp2Src * self)490 stop_task (GstRtmp2Src * self)
491 {
492 gst_task_stop (self->task);
493 self->running = FALSE;
494
495 if (self->cancellable) {
496 GST_DEBUG_OBJECT (self, "Cancelling");
497 g_cancellable_cancel (self->cancellable);
498 }
499
500 if (self->loop) {
501 GST_DEBUG_OBJECT (self, "Stopping loop");
502 g_main_context_invoke_full (self->context, G_PRIORITY_DEFAULT_IDLE,
503 quit_invoker, g_main_loop_ref (self->loop),
504 (GDestroyNotify) g_main_loop_unref);
505 }
506
507 g_cond_broadcast (&self->cond);
508 }
509
510 static gboolean
gst_rtmp2_src_stop(GstBaseSrc * src)511 gst_rtmp2_src_stop (GstBaseSrc * src)
512 {
513 GstRtmp2Src *self = GST_RTMP2_SRC (src);
514
515 GST_DEBUG_OBJECT (self, "stop");
516
517 g_mutex_lock (&self->lock);
518 stop_task (self);
519 g_mutex_unlock (&self->lock);
520
521 gst_task_join (self->task);
522
523 return TRUE;
524 }
525
526 static gboolean
gst_rtmp2_src_unlock(GstBaseSrc * src)527 gst_rtmp2_src_unlock (GstBaseSrc * src)
528 {
529 GstRtmp2Src *self = GST_RTMP2_SRC (src);
530
531 GST_DEBUG_OBJECT (self, "unlock");
532
533 g_mutex_lock (&self->lock);
534 self->flushing = TRUE;
535 g_cond_broadcast (&self->cond);
536 g_mutex_unlock (&self->lock);
537
538 return TRUE;
539 }
540
541 static gboolean
gst_rtmp2_src_unlock_stop(GstBaseSrc * src)542 gst_rtmp2_src_unlock_stop (GstBaseSrc * src)
543 {
544 GstRtmp2Src *self = GST_RTMP2_SRC (src);
545
546 GST_DEBUG_OBJECT (self, "unlock_stop");
547
548 g_mutex_lock (&self->lock);
549 self->flushing = FALSE;
550 g_mutex_unlock (&self->lock);
551
552 return TRUE;
553 }
554
555 static gboolean
on_timeout(GstRtmp2Src * self)556 on_timeout (GstRtmp2Src * self)
557 {
558 g_mutex_lock (&self->lock);
559 self->timeout = TRUE;
560 g_cond_broadcast (&self->cond);
561 g_mutex_unlock (&self->lock);
562
563 return G_SOURCE_REMOVE;
564 }
565
566 static GstFlowReturn
gst_rtmp2_src_create(GstBaseSrc * src,guint64 offset,guint size,GstBuffer ** outbuf)567 gst_rtmp2_src_create (GstBaseSrc * src, guint64 offset, guint size,
568 GstBuffer ** outbuf)
569 {
570 GstRtmp2Src *self = GST_RTMP2_SRC (src);
571 GstBuffer *message, *buffer;
572 GstRtmpMeta *meta;
573 guint32 timestamp = 0;
574 GSource *timeout = NULL;
575 GstFlowReturn ret = GST_FLOW_OK;
576
577 static const guint8 flv_header_data[] = {
578 0x46, 0x4c, 0x56, 0x01, 0x01, 0x00, 0x00, 0x00,
579 0x09, 0x00, 0x00, 0x00, 0x00,
580 };
581
582 GST_LOG_OBJECT (self, "create");
583
584 g_mutex_lock (&self->lock);
585
586 if (self->running) {
587 gst_task_start (self->task);
588 }
589
590 /* wait until GMainLoop begins running so that we can attach
591 * timeout source safely.
592 * If the task stopped meanwhile, "running" will be FALSE
593 * than stop_task() will wake up us as well
594 */
595 while ((!self->started && self->running) && (!self->loop
596 || !g_main_loop_is_running (self->loop)))
597 g_cond_wait (&self->cond, &self->lock);
598
599 GST_OBJECT_LOCK (self);
600 if (self->idle_timeout && self->context) {
601 timeout = g_timeout_source_new_seconds (self->idle_timeout);
602
603 g_source_set_callback (timeout, (GSourceFunc) on_timeout, self, NULL);
604 g_source_attach (timeout, self->context);
605 }
606 GST_OBJECT_UNLOCK (self);
607
608 while (!self->message) {
609 if (!self->running) {
610 ret = GST_FLOW_EOS;
611 goto out;
612 }
613 if (self->flushing) {
614 ret = GST_FLOW_FLUSHING;
615 goto out;
616 }
617 if (self->timeout) {
618 GST_DEBUG_OBJECT (self, "Idle timeout, return EOS");
619 ret = GST_FLOW_EOS;
620 goto out;
621 }
622 g_cond_wait (&self->cond, &self->lock);
623 }
624
625 if (timeout) {
626 g_source_destroy (timeout);
627 g_source_unref (timeout);
628 }
629
630 message = self->message;
631 self->message = NULL;
632 g_cond_signal (&self->cond);
633 g_mutex_unlock (&self->lock);
634
635 meta = gst_buffer_get_rtmp_meta (message);
636 if (!meta) {
637 GST_ELEMENT_ERROR (self, CORE, FAILED,
638 ("Internal error: No RTMP meta on buffer"),
639 ("No RTMP meta on %" GST_PTR_FORMAT, message));
640 gst_buffer_unref (message);
641 return GST_FLOW_ERROR;
642 }
643
644 if (GST_BUFFER_DTS_IS_VALID (message)) {
645 GstClockTime last_ts = self->last_ts, ts = GST_BUFFER_DTS (message);
646
647 if (GST_CLOCK_TIME_IS_VALID (last_ts) && last_ts > ts) {
648 GST_LOG_OBJECT (self, "Timestamp regression: %" GST_TIME_FORMAT
649 " > %" GST_TIME_FORMAT, GST_TIME_ARGS (last_ts), GST_TIME_ARGS (ts));
650 }
651
652 self->last_ts = ts;
653 timestamp = ts / GST_MSECOND;
654 }
655
656 buffer = gst_buffer_copy_region (message, GST_BUFFER_COPY_MEMORY, 0, -1);
657
658 {
659 guint8 *tag_header = g_malloc (11);
660 GstMemory *memory =
661 gst_memory_new_wrapped (0, tag_header, 11, 0, 11, tag_header, g_free);
662 GST_WRITE_UINT8 (tag_header, meta->type);
663 GST_WRITE_UINT24_BE (tag_header + 1, meta->size);
664 GST_WRITE_UINT24_BE (tag_header + 4, timestamp);
665 GST_WRITE_UINT8 (tag_header + 7, timestamp >> 24);
666 GST_WRITE_UINT24_BE (tag_header + 8, 0);
667 gst_buffer_prepend_memory (buffer, memory);
668 }
669
670 {
671 guint8 *tag_footer = g_malloc (4);
672 GstMemory *memory =
673 gst_memory_new_wrapped (0, tag_footer, 4, 0, 4, tag_footer, g_free);
674 GST_WRITE_UINT32_BE (tag_footer, meta->size + 11);
675 gst_buffer_append_memory (buffer, memory);
676 }
677
678 if (!self->sent_header) {
679 GstMemory *memory = gst_memory_new_wrapped (GST_MEMORY_FLAG_READONLY,
680 (guint8 *) flv_header_data, sizeof flv_header_data, 0,
681 sizeof flv_header_data, NULL, NULL);
682 gst_buffer_prepend_memory (buffer, memory);
683 self->sent_header = TRUE;
684 }
685
686 GST_BUFFER_DTS (buffer) = self->last_ts;
687
688 *outbuf = buffer;
689
690 gst_buffer_unref (message);
691 return GST_FLOW_OK;
692
693 out:
694 if (timeout) {
695 g_source_destroy (timeout);
696 g_source_unref (timeout);
697 }
698 /* Keep the unlock after the destruction of the timeout source to workaround
699 * https://gitlab.gnome.org/GNOME/glib/-/issues/803
700 */
701 g_mutex_unlock (&self->lock);
702
703 return ret;
704 }
705
706 static gboolean
gst_rtmp2_src_query(GstBaseSrc * basesrc,GstQuery * query)707 gst_rtmp2_src_query (GstBaseSrc * basesrc, GstQuery * query)
708 {
709 gboolean ret = FALSE;
710
711 switch (GST_QUERY_TYPE (query)) {
712 case GST_QUERY_SCHEDULING:{
713 gst_query_set_scheduling (query,
714 GST_SCHEDULING_FLAG_SEQUENTIAL |
715 GST_SCHEDULING_FLAG_BANDWIDTH_LIMITED, 1, -1, 0);
716 gst_query_add_scheduling_mode (query, GST_PAD_MODE_PUSH);
717
718 ret = TRUE;
719 break;
720 }
721 default:
722 ret = FALSE;
723 break;
724 }
725
726 if (!ret)
727 ret =
728 GST_BASE_SRC_CLASS (gst_rtmp2_src_parent_class)->query (basesrc, query);
729
730 return ret;
731 }
732
733 static gboolean
main_loop_running_cb(GstRtmp2Src * self)734 main_loop_running_cb (GstRtmp2Src * self)
735 {
736 GST_TRACE_OBJECT (self, "Main loop running now");
737
738 g_mutex_lock (&self->lock);
739 self->started = TRUE;
740 g_cond_broadcast (&self->cond);
741 g_mutex_unlock (&self->lock);
742
743 return G_SOURCE_REMOVE;
744 }
745
746 /* Mainloop task */
747 static void
gst_rtmp2_src_task_func(gpointer user_data)748 gst_rtmp2_src_task_func (gpointer user_data)
749 {
750 GstRtmp2Src *self = GST_RTMP2_SRC (user_data);
751 GMainContext *context;
752 GMainLoop *loop;
753 GTask *connector;
754 GSource *source;
755
756 GST_DEBUG_OBJECT (self, "gst_rtmp2_src_task starting");
757 g_mutex_lock (&self->lock);
758
759 context = self->context = g_main_context_new ();
760 g_main_context_push_thread_default (context);
761 loop = self->loop = g_main_loop_new (context, TRUE);
762
763 source = g_idle_source_new ();
764 g_source_set_callback (source, (GSourceFunc) main_loop_running_cb, self,
765 NULL);
766 g_source_attach (source, self->context);
767 g_source_unref (source);
768
769 connector = g_task_new (self, self->cancellable, connect_task_done, NULL);
770
771 g_clear_pointer (&self->stats, gst_structure_free);
772
773 GST_OBJECT_LOCK (self);
774 gst_rtmp_client_connect_async (&self->location, self->cancellable,
775 client_connect_done, connector);
776 GST_OBJECT_UNLOCK (self);
777
778 /* Run loop */
779 g_mutex_unlock (&self->lock);
780 g_main_loop_run (loop);
781 g_mutex_lock (&self->lock);
782
783 if (self->connection) {
784 self->stats = gst_rtmp_connection_get_stats (self->connection);
785 }
786
787 g_clear_pointer (&self->loop, g_main_loop_unref);
788 g_clear_pointer (&self->connection, gst_rtmp_connection_close_and_unref);
789 g_cond_broadcast (&self->cond);
790
791 /* Run loop cleanup */
792 g_mutex_unlock (&self->lock);
793 while (g_main_context_pending (context)) {
794 GST_DEBUG_OBJECT (self, "iterating main context to clean up");
795 g_main_context_iteration (context, FALSE);
796 }
797 g_main_context_pop_thread_default (context);
798 g_mutex_lock (&self->lock);
799
800 g_clear_pointer (&self->context, g_main_context_unref);
801 gst_buffer_replace (&self->message, NULL);
802
803 g_mutex_unlock (&self->lock);
804 GST_DEBUG_OBJECT (self, "gst_rtmp2_src_task exiting");
805 }
806
807 static void
client_connect_done(GObject * source,GAsyncResult * result,gpointer user_data)808 client_connect_done (GObject * source, GAsyncResult * result,
809 gpointer user_data)
810 {
811 GTask *task = user_data;
812 GstRtmp2Src *self = g_task_get_source_object (task);
813 GError *error = NULL;
814 GstRtmpConnection *connection;
815
816 connection = gst_rtmp_client_connect_finish (result, &error);
817 if (!connection) {
818 g_task_return_error (task, error);
819 g_object_unref (task);
820 return;
821 }
822
823 g_task_set_task_data (task, connection, g_object_unref);
824
825 if (g_task_return_error_if_cancelled (task)) {
826 g_object_unref (task);
827 return;
828 }
829
830 GST_OBJECT_LOCK (self);
831 gst_rtmp_client_start_play_async (connection, self->location.stream,
832 g_task_get_cancellable (task), start_play_done, task);
833 GST_OBJECT_UNLOCK (self);
834 }
835
836 static void
start_play_done(GObject * source,GAsyncResult * result,gpointer user_data)837 start_play_done (GObject * source, GAsyncResult * result, gpointer user_data)
838 {
839 GTask *task = G_TASK (user_data);
840 GstRtmp2Src *self = g_task_get_source_object (task);
841 GstRtmpConnection *connection = g_task_get_task_data (task);
842 GError *error = NULL;
843
844 if (g_task_return_error_if_cancelled (task)) {
845 g_object_unref (task);
846 return;
847 }
848
849 if (gst_rtmp_client_start_play_finish (connection, result,
850 &self->stream_id, &error)) {
851 g_task_return_pointer (task, g_object_ref (connection),
852 gst_rtmp_connection_close_and_unref);
853 } else {
854 g_task_return_error (task, error);
855 }
856
857 g_task_set_task_data (task, NULL, NULL);
858 g_object_unref (task);
859 }
860
861 static void
got_message(GstRtmpConnection * connection,GstBuffer * buffer,gpointer user_data)862 got_message (GstRtmpConnection * connection, GstBuffer * buffer,
863 gpointer user_data)
864 {
865 GstRtmp2Src *self = GST_RTMP2_SRC (user_data);
866 GstRtmpMeta *meta = gst_buffer_get_rtmp_meta (buffer);
867 guint32 min_size = 1;
868
869 g_return_if_fail (meta);
870
871 if (meta->mstream != self->stream_id) {
872 GST_DEBUG_OBJECT (self, "Ignoring %s message with stream %" G_GUINT32_FORMAT
873 " != %" G_GUINT32_FORMAT, gst_rtmp_message_type_get_nick (meta->type),
874 meta->mstream, self->stream_id);
875 return;
876 }
877
878 switch (meta->type) {
879 case GST_RTMP_MESSAGE_TYPE_VIDEO:
880 min_size = 6;
881 break;
882
883 case GST_RTMP_MESSAGE_TYPE_AUDIO:
884 min_size = 2;
885 break;
886
887 case GST_RTMP_MESSAGE_TYPE_DATA_AMF0:
888 break;
889
890 default:
891 GST_DEBUG_OBJECT (self, "Ignoring %s message, wrong type",
892 gst_rtmp_message_type_get_nick (meta->type));
893 return;
894 }
895
896 if (meta->size < min_size) {
897 GST_DEBUG_OBJECT (self, "Ignoring too small %s message (%" G_GUINT32_FORMAT
898 " < %" G_GUINT32_FORMAT ")",
899 gst_rtmp_message_type_get_nick (meta->type), meta->size, min_size);
900 return;
901 }
902
903 g_mutex_lock (&self->lock);
904 while (self->message) {
905 if (!self->running) {
906 goto out;
907 }
908 g_cond_wait (&self->cond, &self->lock);
909 }
910
911 self->message = gst_buffer_ref (buffer);
912 g_cond_signal (&self->cond);
913
914 out:
915 g_mutex_unlock (&self->lock);
916 return;
917 }
918
919 static void
error_callback(GstRtmpConnection * connection,GstRtmp2Src * self)920 error_callback (GstRtmpConnection * connection, GstRtmp2Src * self)
921 {
922 g_mutex_lock (&self->lock);
923 if (self->cancellable) {
924 g_cancellable_cancel (self->cancellable);
925 } else if (self->loop) {
926 GST_INFO_OBJECT (self, "Connection error");
927 stop_task (self);
928 }
929 g_mutex_unlock (&self->lock);
930 }
931
932 static void
control_callback(GstRtmpConnection * connection,gint uc_type,guint stream_id,GstRtmp2Src * self)933 control_callback (GstRtmpConnection * connection, gint uc_type,
934 guint stream_id, GstRtmp2Src * self)
935 {
936 GST_INFO_OBJECT (self, "stream %u got %s", stream_id,
937 gst_rtmp_user_control_type_get_nick (uc_type));
938
939 if (uc_type == GST_RTMP_USER_CONTROL_TYPE_STREAM_EOF && stream_id == 1) {
940 GST_INFO_OBJECT (self, "went EOS");
941 stop_task (self);
942 }
943 }
944
945 static void
send_connect_error(GstRtmp2Src * self,GError * error)946 send_connect_error (GstRtmp2Src * self, GError * error)
947 {
948 if (!error) {
949 GST_ERROR_OBJECT (self, "Connect failed with NULL error");
950 GST_ELEMENT_ERROR (self, RESOURCE, FAILED, ("Failed to connect"), (NULL));
951 return;
952 }
953
954 if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
955 GST_DEBUG_OBJECT (self, "Connection was cancelled (%s)",
956 GST_STR_NULL (error->message));
957 return;
958 }
959
960 GST_ERROR_OBJECT (self, "Failed to connect (%s:%d): %s",
961 g_quark_to_string (error->domain), error->code,
962 GST_STR_NULL (error->message));
963
964 if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED)) {
965 GST_ELEMENT_ERROR (self, RESOURCE, NOT_AUTHORIZED,
966 ("Not authorized to connect"), ("%s", GST_STR_NULL (error->message)));
967 } else if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CONNECTION_REFUSED)) {
968 GST_ELEMENT_ERROR (self, RESOURCE, OPEN_READ,
969 ("Could not connect"), ("%s", GST_STR_NULL (error->message)));
970 } else {
971 GST_ELEMENT_ERROR (self, RESOURCE, FAILED,
972 ("Failed to connect"),
973 ("error %s:%d: %s", g_quark_to_string (error->domain), error->code,
974 GST_STR_NULL (error->message)));
975 }
976 }
977
978 static void
connect_task_done(GObject * object,GAsyncResult * result,gpointer user_data)979 connect_task_done (GObject * object, GAsyncResult * result, gpointer user_data)
980 {
981 GstRtmp2Src *self = GST_RTMP2_SRC (object);
982 GTask *task = G_TASK (result);
983 GError *error = NULL;
984
985 g_mutex_lock (&self->lock);
986
987 g_warn_if_fail (g_task_is_valid (task, object));
988
989 if (self->cancellable == g_task_get_cancellable (task)) {
990 g_clear_object (&self->cancellable);
991 }
992
993 self->connection = g_task_propagate_pointer (task, &error);
994 if (self->connection) {
995 gst_rtmp_connection_set_input_handler (self->connection,
996 got_message, g_object_ref (self), g_object_unref);
997 g_signal_connect_object (self->connection, "error",
998 G_CALLBACK (error_callback), self, 0);
999 g_signal_connect_object (self->connection, "stream-control",
1000 G_CALLBACK (control_callback), self, 0);
1001 } else {
1002 send_connect_error (self, error);
1003 stop_task (self);
1004 g_error_free (error);
1005 }
1006
1007 g_cond_broadcast (&self->cond);
1008 g_mutex_unlock (&self->lock);
1009 }
1010
1011 static GstStructure *
gst_rtmp2_src_get_stats(GstRtmp2Src * self)1012 gst_rtmp2_src_get_stats (GstRtmp2Src * self)
1013 {
1014 GstStructure *s;
1015
1016 g_mutex_lock (&self->lock);
1017
1018 if (self->connection) {
1019 s = gst_rtmp_connection_get_stats (self->connection);
1020 } else if (self->stats) {
1021 s = gst_structure_copy (self->stats);
1022 } else {
1023 s = gst_rtmp_connection_get_null_stats ();
1024 }
1025
1026 g_mutex_unlock (&self->lock);
1027
1028 return s;
1029 }
1030