1 /* GStreamer RTMP Library
2 * Copyright (C) 2013 David Schleef <ds@schleef.org>
3 * Copyright (C) 2017 Make.TV, Inc. <info@make.tv>
4 * Contact: Jan Alexander Steffens (heftig) <jsteffens@make.tv>
5 *
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Library General Public
8 * License as published by the Free Software Foundation; either
9 * version 2 of the License, or (at your option) any later version.
10 *
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Library General Public License for more details.
15 *
16 * You should have received a copy of the GNU Library General Public
17 * License along with this library; if not, write to the
18 * Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
19 * Boston, MA 02110-1335, USA.
20 */
21
22 #ifdef HAVE_CONFIG_H
23 #include "config.h"
24 #endif
25
26 #include <gst/gst.h>
27 #include <gio/gio.h>
28 #include <string.h>
29 #include "rtmpclient.h"
30 #include "rtmphandshake.h"
31 #include "rtmpmessage.h"
32 #include "rtmputils.h"
33
34 GST_DEBUG_CATEGORY_STATIC (gst_rtmp_client_debug_category);
35 #define GST_CAT_DEFAULT gst_rtmp_client_debug_category
36
37 static void send_connect_done (const gchar * command_name, GPtrArray * args,
38 gpointer user_data);
39 static void create_stream_done (const gchar * command_name, GPtrArray * args,
40 gpointer user_data);
41 static void on_publish_or_play_status (const gchar * command_name,
42 GPtrArray * args, gpointer user_data);
43
44 static void
init_debug(void)45 init_debug (void)
46 {
47 static gsize done = 0;
48 if (g_once_init_enter (&done)) {
49 GST_DEBUG_CATEGORY_INIT (gst_rtmp_client_debug_category,
50 "rtmpclient", 0, "debug category for the rtmp client");
51 GST_DEBUG_REGISTER_FUNCPTR (send_connect_done);
52 GST_DEBUG_REGISTER_FUNCPTR (create_stream_done);
53 GST_DEBUG_REGISTER_FUNCPTR (on_publish_or_play_status);
54 g_once_init_leave (&done, 1);
55 }
56 }
57
58 static const gchar *scheme_strings[] = {
59 "rtmp",
60 "rtmps",
61 NULL
62 };
63
64 #define NUM_SCHEMES (G_N_ELEMENTS (scheme_strings) - 1)
65
66 GType
gst_rtmp_scheme_get_type(void)67 gst_rtmp_scheme_get_type (void)
68 {
69 static gsize scheme_type = 0;
70 static const GEnumValue scheme[] = {
71 {GST_RTMP_SCHEME_RTMP, "GST_RTMP_SCHEME_RTMP", "rtmp"},
72 {GST_RTMP_SCHEME_RTMPS, "GST_RTMP_SCHEME_RTMPS", "rtmps"},
73 {0, NULL, NULL},
74 };
75
76 if (g_once_init_enter (&scheme_type)) {
77 GType tmp = g_enum_register_static ("GstRtmpScheme", scheme);
78 g_once_init_leave (&scheme_type, tmp);
79 }
80
81 return (GType) scheme_type;
82 }
83
84 GstRtmpScheme
gst_rtmp_scheme_from_string(const gchar * string)85 gst_rtmp_scheme_from_string (const gchar * string)
86 {
87 if (string) {
88 gint value;
89
90 for (value = 0; value < NUM_SCHEMES; value++) {
91 if (strcmp (scheme_strings[value], string) == 0) {
92 return value;
93 }
94 }
95 }
96
97 return -1;
98 }
99
100 GstRtmpScheme
gst_rtmp_scheme_from_uri(const GstUri * uri)101 gst_rtmp_scheme_from_uri (const GstUri * uri)
102 {
103 const gchar *scheme = gst_uri_get_scheme (uri);
104 if (!scheme) {
105 return GST_RTMP_SCHEME_RTMP;
106 }
107
108 return gst_rtmp_scheme_from_string (scheme);
109 }
110
111 const gchar *
gst_rtmp_scheme_to_string(GstRtmpScheme scheme)112 gst_rtmp_scheme_to_string (GstRtmpScheme scheme)
113 {
114 if (scheme >= 0 && scheme < NUM_SCHEMES) {
115 return scheme_strings[scheme];
116 }
117
118 return "invalid";
119 }
120
121 const gchar *const *
gst_rtmp_scheme_get_strings(void)122 gst_rtmp_scheme_get_strings (void)
123 {
124 return scheme_strings;
125 }
126
127 guint
gst_rtmp_scheme_get_default_port(GstRtmpScheme scheme)128 gst_rtmp_scheme_get_default_port (GstRtmpScheme scheme)
129 {
130 switch (scheme) {
131 case GST_RTMP_SCHEME_RTMP:
132 return 1935;
133
134 case GST_RTMP_SCHEME_RTMPS:
135 return 443;
136
137 default:
138 g_return_val_if_reached (0);
139 }
140 }
141
142 GType
gst_rtmp_authmod_get_type(void)143 gst_rtmp_authmod_get_type (void)
144 {
145 static gsize authmod_type = 0;
146 static const GEnumValue authmod[] = {
147 {GST_RTMP_AUTHMOD_NONE, "GST_RTMP_AUTHMOD_NONE", "none"},
148 {GST_RTMP_AUTHMOD_AUTO, "GST_RTMP_AUTHMOD_AUTO", "auto"},
149 {GST_RTMP_AUTHMOD_ADOBE, "GST_RTMP_AUTHMOD_ADOBE", "adobe"},
150 {0, NULL, NULL},
151 };
152
153 if (g_once_init_enter (&authmod_type)) {
154 GType tmp = g_enum_register_static ("GstRtmpAuthmod", authmod);
155 g_once_init_leave (&authmod_type, tmp);
156 }
157
158 return (GType) authmod_type;
159 }
160
161 static const gchar *
gst_rtmp_authmod_get_nick(GstRtmpAuthmod value)162 gst_rtmp_authmod_get_nick (GstRtmpAuthmod value)
163 {
164 GEnumClass *klass = g_type_class_peek (GST_TYPE_RTMP_AUTHMOD);
165 GEnumValue *ev = klass ? g_enum_get_value (klass, value) : NULL;
166 return ev ? ev->value_nick : "(unknown)";
167 }
168
169 GType
gst_rtmp_stop_commands_get_type(void)170 gst_rtmp_stop_commands_get_type (void)
171 {
172 static gsize stop_commands_type = 0;
173 static const GFlagsValue stop_commands[] = {
174 {GST_RTMP_STOP_COMMANDS_NONE, "No command", "none"},
175 {GST_RTMP_STOP_COMMANDS_FCUNPUBLISH, "FCUnpublish", "fcunpublish"},
176 {GST_RTMP_STOP_COMMANDS_CLOSE_STREAM, "closeStream", "closestream"},
177 {GST_RTMP_STOP_COMMANDS_DELETE_STREAM, "deleteStream", "deletestream"},
178 {0, NULL, NULL},
179 };
180
181 if (g_once_init_enter (&stop_commands_type)) {
182 GType tmp = g_flags_register_static ("GstRtmpStopCommands", stop_commands);
183 g_once_init_leave (&stop_commands_type, tmp);
184 }
185
186 return (GType) stop_commands_type;
187 }
188
189 void
gst_rtmp_location_copy(GstRtmpLocation * dest,const GstRtmpLocation * src)190 gst_rtmp_location_copy (GstRtmpLocation * dest, const GstRtmpLocation * src)
191 {
192 g_return_if_fail (dest);
193 g_return_if_fail (src);
194
195 dest->scheme = src->scheme;
196 dest->host = g_strdup (src->host);
197 dest->port = src->port;
198 dest->application = g_strdup (src->application);
199 dest->stream = g_strdup (src->stream);
200 dest->username = g_strdup (src->username);
201 dest->password = g_strdup (src->password);
202 dest->secure_token = g_strdup (src->secure_token);
203 dest->authmod = src->authmod;
204 dest->timeout = src->timeout;
205 dest->tls_flags = src->tls_flags;
206 dest->flash_ver = g_strdup (src->flash_ver);
207 dest->publish = src->publish;
208 }
209
210 void
gst_rtmp_location_clear(GstRtmpLocation * location)211 gst_rtmp_location_clear (GstRtmpLocation * location)
212 {
213 g_return_if_fail (location);
214
215 g_clear_pointer (&location->host, g_free);
216 location->port = 0;
217 g_clear_pointer (&location->application, g_free);
218 g_clear_pointer (&location->stream, g_free);
219 g_clear_pointer (&location->username, g_free);
220 g_clear_pointer (&location->password, g_free);
221 g_clear_pointer (&location->secure_token, g_free);
222 g_clear_pointer (&location->flash_ver, g_free);
223 location->publish = FALSE;
224 }
225
226 gchar *
gst_rtmp_location_get_string(const GstRtmpLocation * location,gboolean with_stream)227 gst_rtmp_location_get_string (const GstRtmpLocation * location,
228 gboolean with_stream)
229 {
230 GstUri *uri;
231 gchar *base, *string;
232 const gchar *scheme_string;
233 guint default_port;
234
235 g_return_val_if_fail (location, NULL);
236
237 scheme_string = gst_rtmp_scheme_to_string (location->scheme);
238 default_port = gst_rtmp_scheme_get_default_port (location->scheme);
239
240 uri = gst_uri_new (scheme_string, NULL, location->host,
241 location->port == default_port ? GST_URI_NO_PORT : location->port, "/",
242 NULL, NULL);
243 base = gst_uri_to_string (uri);
244
245 string = g_strconcat (base, location->application, with_stream ? "/" : NULL,
246 location->stream, NULL);
247
248 g_free (base);
249 gst_uri_unref (uri);
250
251 return string;
252 }
253
254 /* Flag values for the audioCodecs property,
255 * rtmp_specification_1.0.pdf page 32 */
256 enum
257 {
258 SUPPORT_SND_NONE = 0x001, /* Raw sound, no compression */
259 SUPPORT_SND_ADPCM = 0x002, /* ADPCM compression */
260 SUPPORT_SND_MP3 = 0x004, /* mp3 compression */
261 SUPPORT_SND_INTEL = 0x008, /* Not used */
262 SUPPORT_SND_UNUSED = 0x010, /* Not used */
263 SUPPORT_SND_NELLY8 = 0x020, /* NellyMoser at 8-kHz compression */
264 SUPPORT_SND_NELLY = 0x040, /* NellyMoser compression
265 * (5, 11, 22, and 44 kHz) */
266 SUPPORT_SND_G711A = 0x080, /* G711A sound compression
267 * (Flash Media Server only) */
268 SUPPORT_SND_G711U = 0x100, /* G711U sound compression
269 * (Flash Media Server only) */
270 SUPPORT_SND_NELLY16 = 0x200, /* NellyMoser at 16-kHz compression */
271 SUPPORT_SND_AAC = 0x400, /* Advanced audio coding (AAC) codec */
272 SUPPORT_SND_SPEEX = 0x800, /* Speex Audio */
273 SUPPORT_SND_ALL = 0xFFF, /* All RTMP-supported audio codecs */
274 };
275
276 /* audioCodecs value sent by libavformat. All "used" codecs. */
277 #define GST_RTMP_AUDIOCODECS \
278 (SUPPORT_SND_ALL & ~SUPPORT_SND_INTEL & ~SUPPORT_SND_UNUSED)
279 G_STATIC_ASSERT (GST_RTMP_AUDIOCODECS == 4071); /* libavformat's magic number */
280
281 /* Flag values for the videoCodecs property,
282 * rtmp_specification_1.0.pdf page 32 */
283 enum
284 {
285 SUPPORT_VID_UNUSED = 0x01, /* Obsolete value */
286 SUPPORT_VID_JPEG = 0x02, /* Obsolete value */
287 SUPPORT_VID_SORENSON = 0x04, /* Sorenson Flash video */
288 SUPPORT_VID_HOMEBREW = 0x08, /* V1 screen sharing */
289 SUPPORT_VID_VP6 = 0x10, /* On2 video (Flash 8+) */
290 SUPPORT_VID_VP6ALPHA = 0x20, /* On2 video with alpha channel */
291 SUPPORT_VID_HOMEBREWV = 0x40, /* Screen sharing version 2 (Flash 8+) */
292 SUPPORT_VID_H264 = 0x80, /* H264 video */
293 SUPPORT_VID_ALL = 0xFF, /* All RTMP-supported video codecs */
294 };
295
296 /* videoCodecs value sent by libavformat. All non-obsolete codecs. */
297 #define GST_RTMP_VIDEOCODECS \
298 (SUPPORT_VID_ALL & ~SUPPORT_VID_UNUSED & ~SUPPORT_VID_JPEG)
299 G_STATIC_ASSERT (GST_RTMP_VIDEOCODECS == 252); /* libavformat's magic number */
300
301 /* Flag values for the videoFunction property,
302 * rtmp_specification_1.0.pdf page 32 */
303 enum
304 {
305 /* Indicates that the client can perform frame-accurate seeks. */
306 SUPPORT_VID_CLIENT_SEEK = 1,
307 };
308
309 /* videoFunction value sent by libavformat */
310 #define GST_RTMP_VIDEOFUNCTION (SUPPORT_VID_CLIENT_SEEK)
311 G_STATIC_ASSERT (GST_RTMP_VIDEOFUNCTION == 1); /* libavformat's magic number */
312
313 static void socket_connect (GTask * task);
314 static void socket_connect_done (GObject * source, GAsyncResult * result,
315 gpointer user_data);
316 static void handshake_done (GObject * source, GAsyncResult * result,
317 gpointer user_data);
318 static void send_connect (GTask * task);
319 static void send_stop (GstRtmpConnection * connection, const gchar * stream,
320 const GstRtmpStopCommands stop_commands);
321 static void send_secure_token_response (GTask * task,
322 GstRtmpConnection * connection, const gchar * challenge);
323 static void connection_error (GstRtmpConnection * connection,
324 gpointer user_data);
325
326 #define DEFAULT_TIMEOUT 5
327
328 typedef struct
329 {
330 GstRtmpLocation location;
331 gchar *auth_query;
332 GstRtmpConnection *connection;
333 gulong error_handler_id;
334 } ConnectTaskData;
335
336 static ConnectTaskData *
connect_task_data_new(const GstRtmpLocation * location)337 connect_task_data_new (const GstRtmpLocation * location)
338 {
339 ConnectTaskData *data = g_slice_new0 (ConnectTaskData);
340 gst_rtmp_location_copy (&data->location, location);
341 return data;
342 }
343
344 static void
connect_task_data_free(gpointer ptr)345 connect_task_data_free (gpointer ptr)
346 {
347 ConnectTaskData *data = ptr;
348 gst_rtmp_location_clear (&data->location);
349 g_clear_pointer (&data->auth_query, g_free);
350 if (data->error_handler_id) {
351 g_signal_handler_disconnect (data->connection, data->error_handler_id);
352 }
353 g_clear_object (&data->connection);
354 g_slice_free (ConnectTaskData, data);
355 }
356
357 static GRegex *auth_regex = NULL;
358
359 void
gst_rtmp_client_connect_async(const GstRtmpLocation * location,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer user_data)360 gst_rtmp_client_connect_async (const GstRtmpLocation * location,
361 GCancellable * cancellable, GAsyncReadyCallback callback,
362 gpointer user_data)
363 {
364 GTask *task;
365
366 init_debug ();
367
368 if (g_once_init_enter (&auth_regex)) {
369 GRegex *re = g_regex_new ("\\[ *AccessManager.Reject *\\] *: *"
370 "\\[ *authmod=(?<authmod>.*?) *\\] *: *"
371 "(?<query>\\?.*)\\Z", G_REGEX_DOTALL, 0, NULL);
372 g_once_init_leave (&auth_regex, re);
373 }
374
375 task = g_task_new (NULL, cancellable, callback, user_data);
376
377 g_task_set_task_data (task, connect_task_data_new (location),
378 connect_task_data_free);
379
380 socket_connect (task);
381 }
382
383 static void
socket_connect(GTask * task)384 socket_connect (GTask * task)
385 {
386 ConnectTaskData *data = g_task_get_task_data (task);
387 GSocketConnectable *addr;
388 GSocketClient *socket_client;
389
390 if (data->location.timeout < 0) {
391 data->location.timeout = DEFAULT_TIMEOUT;
392 }
393
394 if (data->error_handler_id) {
395 g_signal_handler_disconnect (data->connection, data->error_handler_id);
396 data->error_handler_id = 0;
397 }
398
399 if (data->connection) {
400 gst_rtmp_connection_close (data->connection);
401 g_clear_object (&data->connection);
402 }
403
404 if (!data->location.host) {
405 g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_NOT_INITIALIZED,
406 "Host is not set");
407 g_object_unref (task);
408 return;
409 }
410
411 if (!data->location.port) {
412 g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_NOT_INITIALIZED,
413 "Port is not set");
414 g_object_unref (task);
415 return;
416 }
417
418 socket_client = g_socket_client_new ();
419 g_socket_client_set_timeout (socket_client, data->location.timeout);
420
421 switch (data->location.scheme) {
422 case GST_RTMP_SCHEME_RTMP:
423 break;
424
425 case GST_RTMP_SCHEME_RTMPS:
426 GST_DEBUG ("Configuring TLS, validation flags 0x%02x",
427 data->location.tls_flags);
428 g_socket_client_set_tls (socket_client, TRUE);
429 g_socket_client_set_tls_validation_flags (socket_client,
430 data->location.tls_flags);
431 break;
432
433 default:
434 g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
435 "Invalid scheme ID %d", data->location.scheme);
436 g_object_unref (socket_client);
437 g_object_unref (task);
438 return;
439 }
440
441 addr = g_network_address_new (data->location.host, data->location.port);
442
443 GST_DEBUG ("Starting socket connection");
444
445 g_socket_client_connect_async (socket_client, addr,
446 g_task_get_cancellable (task), socket_connect_done, task);
447 g_object_unref (addr);
448 g_object_unref (socket_client);
449 }
450
451 static void
socket_connect_done(GObject * source,GAsyncResult * result,gpointer user_data)452 socket_connect_done (GObject * source, GAsyncResult * result,
453 gpointer user_data)
454 {
455 GSocketClient *socket_client = G_SOCKET_CLIENT (source);
456 GSocketConnection *socket_connection;
457 GTask *task = user_data;
458 GError *error = NULL;
459
460 socket_connection =
461 g_socket_client_connect_finish (socket_client, result, &error);
462
463 if (g_task_return_error_if_cancelled (task)) {
464 GST_DEBUG ("Socket connection was cancelled");
465 g_object_unref (task);
466 return;
467 }
468
469 if (socket_connection == NULL) {
470 GST_ERROR ("Socket connection error");
471 g_task_return_error (task, error);
472 g_object_unref (task);
473 return;
474 }
475
476 GST_DEBUG ("Socket connection established");
477
478 gst_rtmp_client_handshake (G_IO_STREAM (socket_connection), FALSE,
479 g_task_get_cancellable (task), handshake_done, task);
480 g_object_unref (socket_connection);
481 }
482
483
484 static void
handshake_done(GObject * source,GAsyncResult * result,gpointer user_data)485 handshake_done (GObject * source, GAsyncResult * result, gpointer user_data)
486 {
487 GIOStream *stream = G_IO_STREAM (source);
488 GSocketConnection *socket_connection = G_SOCKET_CONNECTION (stream);
489 GTask *task = user_data;
490 ConnectTaskData *data = g_task_get_task_data (task);
491 GError *error = NULL;
492 gboolean res;
493
494 res = gst_rtmp_client_handshake_finish (stream, result, &error);
495 if (!res) {
496 g_io_stream_close_async (stream, G_PRIORITY_DEFAULT, NULL, NULL, NULL);
497 g_task_return_error (task, error);
498 g_object_unref (task);
499 return;
500 }
501
502 data->connection = gst_rtmp_connection_new (socket_connection,
503 g_task_get_cancellable (task));
504 data->error_handler_id = g_signal_connect (data->connection,
505 "error", G_CALLBACK (connection_error), task);
506
507 send_connect (task);
508 }
509
510 static void
connection_error(GstRtmpConnection * connection,gpointer user_data)511 connection_error (GstRtmpConnection * connection, gpointer user_data)
512 {
513 GTask *task = user_data;
514 if (!g_task_had_error (task))
515 g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED,
516 "error during connection attempt");
517 }
518
519 static gchar *
do_adobe_auth(const gchar * username,const gchar * password,const gchar * salt,const gchar * opaque,const gchar * challenge)520 do_adobe_auth (const gchar * username, const gchar * password,
521 const gchar * salt, const gchar * opaque, const gchar * challenge)
522 {
523 guint8 hash[16]; /* MD5 digest */
524 gsize hashlen = sizeof hash;
525 gchar *challenge2, *auth_query;
526 GChecksum *md5;
527
528 g_return_val_if_fail (username, NULL);
529 g_return_val_if_fail (password, NULL);
530 g_return_val_if_fail (salt, NULL);
531
532 md5 = g_checksum_new (G_CHECKSUM_MD5);
533 g_checksum_update (md5, (guchar *) username, -1);
534 g_checksum_update (md5, (guchar *) salt, -1);
535 g_checksum_update (md5, (guchar *) password, -1);
536
537 g_checksum_get_digest (md5, hash, &hashlen);
538 g_warn_if_fail (hashlen == sizeof hash);
539
540 {
541 gchar *hashstr = g_base64_encode ((guchar *) hash, sizeof hash);
542 g_checksum_reset (md5);
543 g_checksum_update (md5, (guchar *) hashstr, -1);
544 g_free (hashstr);
545 }
546
547 if (opaque)
548 g_checksum_update (md5, (guchar *) opaque, -1);
549 else if (challenge)
550 g_checksum_update (md5, (guchar *) challenge, -1);
551
552 challenge2 = g_strdup_printf ("%08x", g_random_int ());
553 g_checksum_update (md5, (guchar *) challenge2, -1);
554
555 g_checksum_get_digest (md5, hash, &hashlen);
556 g_warn_if_fail (hashlen == sizeof hash);
557
558 {
559 gchar *hashstr = g_base64_encode ((guchar *) hash, sizeof hash);
560
561 if (opaque) {
562 auth_query =
563 g_strdup_printf
564 ("authmod=%s&user=%s&challenge=%s&response=%s&opaque=%s", "adobe",
565 username, challenge2, hashstr, opaque);
566 } else {
567 auth_query =
568 g_strdup_printf ("authmod=%s&user=%s&challenge=%s&response=%s",
569 "adobe", username, challenge2, hashstr);
570 }
571 g_free (hashstr);
572 }
573
574 g_checksum_free (md5);
575 g_free (challenge2);
576
577 return auth_query;
578 }
579
580 static void
send_connect(GTask * task)581 send_connect (GTask * task)
582 {
583 ConnectTaskData *data = g_task_get_task_data (task);
584 GstAmfNode *node;
585 const gchar *app, *flash_ver;
586 gchar *uri, *appstr = NULL, *uristr = NULL;
587 gboolean publish;
588
589 node = gst_amf_node_new_object ();
590 app = data->location.application;
591 flash_ver = data->location.flash_ver;
592 publish = data->location.publish;
593 uri = gst_rtmp_location_get_string (&data->location, FALSE);
594
595 if (!app) {
596 g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_NOT_INITIALIZED,
597 "Application is not set");
598 g_object_unref (task);
599 goto out;
600 }
601
602 if (!flash_ver) {
603 g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_NOT_INITIALIZED,
604 "Flash version is not set");
605 g_object_unref (task);
606 goto out;
607 }
608
609 if (data->auth_query) {
610 const gchar *query = data->auth_query;
611 appstr = g_strdup_printf ("%s?%s", app, query);
612 uristr = g_strdup_printf ("%s?%s", uri, query);
613 } else if (data->location.authmod == GST_RTMP_AUTHMOD_ADOBE) {
614 const gchar *user = data->location.username;
615 const gchar *authmod = "adobe";
616
617 if (!user) {
618 g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED,
619 "no username for adobe authentication");
620 g_object_unref (task);
621 goto out;
622 }
623
624 if (!data->location.password) {
625 g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED,
626 "no password for adobe authentication");
627 g_object_unref (task);
628 goto out;
629 }
630
631 appstr = g_strdup_printf ("%s?authmod=%s&user=%s", app, authmod, user);
632 uristr = g_strdup_printf ("%s?authmod=%s&user=%s", uri, authmod, user);
633 } else {
634 appstr = g_strdup (app);
635 uristr = g_strdup (uri);
636 }
637
638 /* Arguments for the connect command.
639 * Most of these are described in rtmp_specification_1.0.pdf page 30 */
640
641 /* "The server application name the client is connected to." */
642 gst_amf_node_append_field_take_string (node, "app", appstr, -1);
643
644 if (publish) {
645 /* Undocumented. Sent by both libavformat and librtmp. */
646 gst_amf_node_append_field_string (node, "type", "nonprivate", -1);
647 }
648
649 /* "Flash Player version. It is the same string as returned by the
650 * ApplicationScript getversion () function." */
651 gst_amf_node_append_field_string (node, "flashVer", flash_ver, -1);
652
653 /* "URL of the source SWF file making the connection."
654 * XXX: libavformat sends "swfUrl" here, if provided. */
655
656 /* "URL of the Server. It has the following format.
657 * protocol://servername:port/appName/appInstance" */
658 gst_amf_node_append_field_take_string (node, "tcUrl", uristr, -1);
659
660 if (!publish) {
661 /* "True if proxy is being used." */
662 gst_amf_node_append_field_boolean (node, "fpad", FALSE);
663
664 /* Undocumented. Sent by libavformat. */
665 gst_amf_node_append_field_number (node, "capabilities",
666 15 /* libavformat's magic number */ );
667
668 /* "Indicates what audio codecs the client supports." */
669 gst_amf_node_append_field_number (node, "audioCodecs",
670 GST_RTMP_AUDIOCODECS);
671
672 /* "Indicates what video codecs are supported." */
673 gst_amf_node_append_field_number (node, "videoCodecs",
674 GST_RTMP_VIDEOCODECS);
675
676 /* "Indicates what special video functions are supported." */
677 gst_amf_node_append_field_number (node, "videoFunction",
678 GST_RTMP_VIDEOFUNCTION);
679
680 /* "URL of the web page from where the SWF file was loaded."
681 * XXX: libavformat sends "pageUrl" here, if provided. */
682 }
683
684 gst_rtmp_connection_send_command (data->connection, send_connect_done,
685 task, 0, "connect", node, NULL);
686
687 out:
688 gst_amf_node_free (node);
689 g_free (uri);
690 }
691
692 static void
send_connect_done(const gchar * command_name,GPtrArray * args,gpointer user_data)693 send_connect_done (const gchar * command_name, GPtrArray * args,
694 gpointer user_data)
695 {
696 GTask *task = G_TASK (user_data);
697 ConnectTaskData *data = g_task_get_task_data (task);
698 const GstAmfNode *node, *optional_args;
699 const gchar *code;
700
701 if (g_task_return_error_if_cancelled (task)) {
702 g_object_unref (task);
703 return;
704 }
705
706 if (!args) {
707 g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED,
708 "connect failed: %s", command_name);
709 g_object_unref (task);
710 return;
711 }
712
713 if (args->len < 2) {
714 g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED,
715 "connect failed; not enough return arguments");
716 g_object_unref (task);
717 return;
718 }
719
720 optional_args = g_ptr_array_index (args, 1);
721
722 node = gst_amf_node_get_field (optional_args, "code");
723 if (!node) {
724 g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED,
725 "result code missing from connect cmd result");
726 g_object_unref (task);
727 return;
728 }
729
730 code = gst_amf_node_peek_string (node, NULL);
731 GST_INFO ("connect result: %s", GST_STR_NULL (code));
732
733 if (g_str_equal (code, "NetConnection.Connect.Success")) {
734 node = gst_amf_node_get_field (optional_args, "secureToken");
735 send_secure_token_response (task, data->connection,
736 node ? gst_amf_node_peek_string (node, NULL) : NULL);
737 return;
738 }
739
740 if (g_str_equal (code, "NetConnection.Connect.Rejected")) {
741 GstRtmpAuthmod authmod = data->location.authmod;
742 GMatchInfo *match_info;
743 const gchar *desc;
744 GstUri *query;
745
746 node = gst_amf_node_get_field (optional_args, "description");
747 if (!node) {
748 g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED,
749 "Connect rejected; no description");
750 g_object_unref (task);
751 return;
752 }
753
754 desc = gst_amf_node_peek_string (node, NULL);
755 GST_DEBUG ("connect result desc: %s", GST_STR_NULL (desc));
756
757 if (authmod == GST_RTMP_AUTHMOD_AUTO && strstr (desc, "code=403 need auth")) {
758 if (strstr (desc, "authmod=adobe")) {
759 GST_INFO ("Reconnecting with authmod=adobe");
760 data->location.authmod = GST_RTMP_AUTHMOD_ADOBE;
761 socket_connect (task);
762 return;
763 }
764
765 g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED,
766 "unhandled authentication mode: %s", desc);
767 g_object_unref (task);
768 return;
769 }
770
771 if (!g_regex_match (auth_regex, desc, 0, &match_info)) {
772 g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED,
773 "failed to parse auth rejection: %s", desc);
774 g_object_unref (task);
775 return;
776 }
777
778 {
779 gchar *authmod_str = g_match_info_fetch_named (match_info, "authmod");
780 gchar *query_str = g_match_info_fetch_named (match_info, "query");
781 gboolean matches;
782
783 GST_INFO ("regex parsed auth: authmod=%s, query=%s",
784 GST_STR_NULL (authmod_str), GST_STR_NULL (query_str));
785 g_match_info_free (match_info);
786
787 switch (authmod) {
788 case GST_RTMP_AUTHMOD_ADOBE:
789 matches = g_str_equal (authmod_str, "adobe");
790 break;
791
792 default:
793 matches = FALSE;
794 break;
795 }
796
797 if (!matches) {
798 g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED,
799 "server uses wrong authentication mode '%s'; expected %s",
800 GST_STR_NULL (authmod_str), gst_rtmp_authmod_get_nick (authmod));
801 g_object_unref (task);
802 g_free (authmod_str);
803 g_free (query_str);
804 return;
805 }
806 g_free (authmod_str);
807
808 query = gst_uri_from_string (query_str);
809 if (!query) {
810 g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED,
811 "failed to parse authentication query '%s'",
812 GST_STR_NULL (query_str));
813 g_object_unref (task);
814 g_free (query_str);
815 return;
816 }
817 g_free (query_str);
818 }
819
820 {
821 const gchar *reason = gst_uri_get_query_value (query, "reason");
822
823 if (g_str_equal (reason, "authfailed")) {
824 g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED,
825 "authentication failed! wrong credentials?");
826 g_object_unref (task);
827 gst_uri_unref (query);
828 return;
829 }
830
831 if (!g_str_equal (reason, "needauth")) {
832 g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED,
833 "unhandled rejection reason '%s'", reason ? reason : "");
834 g_object_unref (task);
835 gst_uri_unref (query);
836 return;
837 }
838 }
839
840 g_warn_if_fail (!data->auth_query);
841 data->auth_query = do_adobe_auth (data->location.username,
842 data->location.password, gst_uri_get_query_value (query, "salt"),
843 gst_uri_get_query_value (query, "opaque"),
844 gst_uri_get_query_value (query, "challenge"));
845
846 gst_uri_unref (query);
847
848 if (!data->auth_query) {
849 g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED,
850 "couldn't generate adobe style authentication query");
851 g_object_unref (task);
852 return;
853 }
854
855 socket_connect (task);
856 return;
857 }
858
859 g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED,
860 "unhandled connect result code: %s", GST_STR_NULL (code));
861 g_object_unref (task);
862 }
863
864 /* prep key: pack 1st 16 chars into 4 LittleEndian ints */
865 static void
rtmp_tea_decode_prep_key(const gchar * key,guint32 out[4])866 rtmp_tea_decode_prep_key (const gchar * key, guint32 out[4])
867 {
868 gchar copy[17];
869
870 g_return_if_fail (key);
871 g_return_if_fail (out);
872
873 /* ensure we can read 16 bytes */
874 strncpy (copy, key, 16);
875 /* placate GCC 8 -Wstringop-truncation */
876 copy[16] = 0;
877
878 out[0] = GST_READ_UINT32_LE (copy);
879 out[1] = GST_READ_UINT32_LE (copy + 4);
880 out[2] = GST_READ_UINT32_LE (copy + 8);
881 out[3] = GST_READ_UINT32_LE (copy + 12);
882 }
883
884 /* prep text: hex2bin, each 8 digits -> 4 chars -> 1 uint32 */
885 static GArray *
rtmp_tea_decode_prep_text(const gchar * text)886 rtmp_tea_decode_prep_text (const gchar * text)
887 {
888 GArray *arr;
889 gsize len, i;
890
891 g_return_val_if_fail (text, NULL);
892
893 len = strlen (text);
894 arr = g_array_sized_new (TRUE, TRUE, 4, (len + 7) / 8);
895
896 for (i = 0; i < len; i += 8) {
897 gchar copy[9];
898 guchar chars[4];
899 gsize j;
900 guint32 val;
901
902 /* ensure we can read 8 bytes */
903 strncpy (copy, text + i, 8);
904 /* placate GCC 8 -Wstringop-truncation */
905 copy[8] = 0;
906
907 for (j = 0; j < 4; j++) {
908 gint hi, lo;
909
910 hi = g_ascii_xdigit_value (copy[2 * j]);
911 lo = g_ascii_xdigit_value (copy[2 * j + 1]);
912
913 chars[j] = (hi > 0 ? hi << 4 : 0) + (lo > 0 ? lo : 0);
914 }
915
916 val = GST_READ_UINT32_LE (chars);
917 g_array_append_val (arr, val);
918 }
919
920 return arr;
921 }
922
923 /* return text from uint32s to chars */
924 static gchar *
rtmp_tea_decode_return_text(GArray * arr)925 rtmp_tea_decode_return_text (GArray * arr)
926 {
927 #if G_BYTE_ORDER != G_LITTLE_ENDIAN
928 gsize i;
929
930 g_return_val_if_fail (arr, NULL);
931
932 for (i = 0; i < arr->len; i++) {
933 guint32 *val = &g_array_index (arr, guint32, i);
934 *val = GUINT32_TO_LE (*val);
935 }
936 #endif
937
938 /* array is alredy zero-terminated */
939 return g_array_free (arr, FALSE);
940 }
941
942 /* http://www.movable-type.co.uk/scripts/tea-block.html */
943 static void
rtmp_tea_decode_btea(GArray * text,guint32 key[4])944 rtmp_tea_decode_btea (GArray * text, guint32 key[4])
945 {
946 guint32 *v, n, *k;
947 guint32 z, y, sum = 0, e, DELTA = 0x9e3779b9;
948 guint32 p, q;
949
950 g_return_if_fail (text);
951 g_return_if_fail (text->len > 0);
952 g_return_if_fail (key);
953
954 v = (guint32 *) text->data;
955 n = text->len;
956 k = key;
957 z = v[n - 1];
958 y = v[0];
959 q = 6 + 52 / n;
960 sum = q * DELTA;
961
962 #define MX ((z>>5^y<<2) + (y>>3^z<<4)) ^ ((sum^y) + (k[(p&3)^e]^z));
963
964 while (sum != 0) {
965 e = sum >> 2 & 3;
966 for (p = n - 1; p > 0; p--)
967 z = v[p - 1], y = v[p] -= MX;
968 z = v[n - 1];
969 y = v[0] -= MX;
970 sum -= DELTA;
971 }
972
973 #undef MX
974 }
975
976 /* taken from librtmp */
977 static gchar *
rtmp_tea_decode(const gchar * bin_key,const gchar * hex_text)978 rtmp_tea_decode (const gchar * bin_key, const gchar * hex_text)
979 {
980 guint32 key[4];
981 GArray *text;
982
983 rtmp_tea_decode_prep_key (bin_key, key);
984 text = rtmp_tea_decode_prep_text (hex_text);
985 rtmp_tea_decode_btea (text, key);
986 return rtmp_tea_decode_return_text (text);
987 }
988
989 static void
send_secure_token_response(GTask * task,GstRtmpConnection * connection,const gchar * challenge)990 send_secure_token_response (GTask * task, GstRtmpConnection * connection,
991 const gchar * challenge)
992 {
993 ConnectTaskData *data = g_task_get_task_data (task);
994 if (challenge) {
995 GstAmfNode *node1;
996 GstAmfNode *node2;
997 gchar *response;
998
999 if (!data->location.secure_token || !data->location.secure_token[0]) {
1000 g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED,
1001 "server requires secure token authentication");
1002 g_object_unref (task);
1003 return;
1004 }
1005
1006 response = rtmp_tea_decode (data->location.secure_token, challenge);
1007
1008 GST_DEBUG ("response: %s", response);
1009
1010 node1 = gst_amf_node_new_null ();
1011 node2 = gst_amf_node_new_take_string (response, -1);
1012 gst_rtmp_connection_send_command (connection, NULL, NULL, 0,
1013 "secureTokenResponse", node1, node2, NULL);
1014 gst_amf_node_free (node1);
1015 gst_amf_node_free (node2);
1016 }
1017
1018 g_signal_handler_disconnect (connection, data->error_handler_id);
1019 data->error_handler_id = 0;
1020
1021 g_task_return_pointer (task, g_object_ref (connection),
1022 gst_rtmp_connection_close_and_unref);
1023 g_object_unref (task);
1024 }
1025
1026 GstRtmpConnection *
gst_rtmp_client_connect_finish(GAsyncResult * result,GError ** error)1027 gst_rtmp_client_connect_finish (GAsyncResult * result, GError ** error)
1028 {
1029 GTask *task = G_TASK (result);
1030 return g_task_propagate_pointer (task, error);
1031 }
1032
1033 static void send_create_stream (GTask * task);
1034 static void send_publish_or_play (GTask * task);
1035
1036 typedef struct
1037 {
1038 GstRtmpConnection *connection;
1039 gulong error_handler_id;
1040 gchar *stream;
1041 gboolean publish;
1042 guint32 id;
1043 } StreamTaskData;
1044
1045 static StreamTaskData *
stream_task_data_new(GstRtmpConnection * connection,const gchar * stream,gboolean publish)1046 stream_task_data_new (GstRtmpConnection * connection, const gchar * stream,
1047 gboolean publish)
1048 {
1049 StreamTaskData *data = g_slice_new0 (StreamTaskData);
1050 data->connection = g_object_ref (connection);
1051 data->stream = g_strdup (stream);
1052 data->publish = publish;
1053 return data;
1054 }
1055
1056 static void
stream_task_data_free(gpointer ptr)1057 stream_task_data_free (gpointer ptr)
1058 {
1059 StreamTaskData *data = ptr;
1060 g_clear_pointer (&data->stream, g_free);
1061 if (data->error_handler_id) {
1062 g_signal_handler_disconnect (data->connection, data->error_handler_id);
1063 }
1064 g_clear_object (&data->connection);
1065 g_slice_free (StreamTaskData, data);
1066 }
1067
1068 static void
start_stream(GstRtmpConnection * connection,const gchar * stream,gboolean publish,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer user_data)1069 start_stream (GstRtmpConnection * connection, const gchar * stream,
1070 gboolean publish, GCancellable * cancellable,
1071 GAsyncReadyCallback callback, gpointer user_data)
1072 {
1073 GTask *task;
1074 StreamTaskData *data;
1075
1076 init_debug ();
1077
1078 task = g_task_new (connection, cancellable, callback, user_data);
1079
1080 if (!stream) {
1081 g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_NOT_INITIALIZED,
1082 "Stream is not set");
1083 g_object_unref (task);
1084 return;
1085 }
1086
1087 data = stream_task_data_new (connection, stream, publish);
1088 g_task_set_task_data (task, data, stream_task_data_free);
1089
1090 data->error_handler_id = g_signal_connect (connection,
1091 "error", G_CALLBACK (connection_error), task);
1092
1093 send_create_stream (task);
1094 }
1095
1096 void
gst_rtmp_client_start_publish_async(GstRtmpConnection * connection,const gchar * stream,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer user_data)1097 gst_rtmp_client_start_publish_async (GstRtmpConnection * connection,
1098 const gchar * stream, GCancellable * cancellable,
1099 GAsyncReadyCallback callback, gpointer user_data)
1100 {
1101 start_stream (connection, stream, TRUE, cancellable, callback, user_data);
1102 }
1103
1104 void
gst_rtmp_client_start_play_async(GstRtmpConnection * connection,const gchar * stream,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer user_data)1105 gst_rtmp_client_start_play_async (GstRtmpConnection * connection,
1106 const gchar * stream, GCancellable * cancellable,
1107 GAsyncReadyCallback callback, gpointer user_data)
1108 {
1109 start_stream (connection, stream, FALSE, cancellable, callback, user_data);
1110 }
1111
1112 static void
send_set_buffer_length(GstRtmpConnection * connection,guint32 stream,guint32 ms)1113 send_set_buffer_length (GstRtmpConnection * connection, guint32 stream,
1114 guint32 ms)
1115 {
1116 GstRtmpUserControl uc = {
1117 .type = GST_RTMP_USER_CONTROL_TYPE_SET_BUFFER_LENGTH,
1118 .param = stream,
1119 .param2 = ms,
1120 };
1121
1122 gst_rtmp_connection_queue_message (connection,
1123 gst_rtmp_message_new_user_control (&uc));
1124 }
1125
1126 static void
send_create_stream(GTask * task)1127 send_create_stream (GTask * task)
1128 {
1129 GstRtmpConnection *connection = g_task_get_source_object (task);
1130 StreamTaskData *data = g_task_get_task_data (task);
1131 GstAmfNode *command_object, *stream_name;
1132
1133 command_object = gst_amf_node_new_null ();
1134 stream_name = gst_amf_node_new_string (data->stream, -1);
1135
1136 if (data->publish) {
1137 /* Not part of RTMP documentation */
1138 GST_DEBUG ("Releasing stream '%s'", data->stream);
1139 gst_rtmp_connection_send_command (connection, NULL, NULL, 0,
1140 "releaseStream", command_object, stream_name, NULL);
1141 gst_rtmp_connection_send_command (connection, NULL, NULL, 0,
1142 "FCPublish", command_object, stream_name, NULL);
1143 } else {
1144 /* Matches librtmp */
1145 gst_rtmp_connection_request_window_size (connection,
1146 GST_RTMP_DEFAULT_WINDOW_ACK_SIZE);
1147 send_set_buffer_length (connection, 0, 300);
1148 }
1149
1150 GST_INFO ("Creating stream '%s'", data->stream);
1151 gst_rtmp_connection_send_command (connection, create_stream_done, task, 0,
1152 "createStream", command_object, NULL);
1153
1154 gst_amf_node_free (stream_name);
1155 gst_amf_node_free (command_object);
1156 }
1157
1158 static void
create_stream_done(const gchar * command_name,GPtrArray * args,gpointer user_data)1159 create_stream_done (const gchar * command_name, GPtrArray * args,
1160 gpointer user_data)
1161 {
1162 GTask *task = G_TASK (user_data);
1163 StreamTaskData *data = g_task_get_task_data (task);
1164 GstAmfNode *result;
1165
1166 if (g_task_return_error_if_cancelled (task)) {
1167 g_object_unref (task);
1168 return;
1169 }
1170
1171 if (!args) {
1172 g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED,
1173 "createStream failed: %s", command_name);
1174 g_object_unref (task);
1175 return;
1176 }
1177
1178 if (args->len < 2) {
1179 g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED,
1180 "createStream failed; not enough return arguments");
1181 g_object_unref (task);
1182 return;
1183 }
1184
1185 result = g_ptr_array_index (args, 1);
1186 if (gst_amf_node_get_type (result) != GST_AMF_TYPE_NUMBER) {
1187 GString *error_dump = g_string_new ("");
1188
1189 gst_amf_node_dump (result, -1, error_dump);
1190
1191 g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED,
1192 "createStream failed: %s", error_dump->str);
1193 g_object_unref (task);
1194
1195 g_string_free (error_dump, TRUE);
1196 return;
1197 }
1198
1199 data->id = gst_amf_node_get_number (result);
1200 GST_INFO ("createStream success, stream_id=%" G_GUINT32_FORMAT, data->id);
1201
1202 if (data->id == 0) {
1203 g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_INVALID_DATA,
1204 "createStream returned ID 0");
1205 g_object_unref (task);
1206 return;
1207 }
1208
1209 send_publish_or_play (task);
1210 }
1211
1212 static void
send_publish_or_play(GTask * task)1213 send_publish_or_play (GTask * task)
1214 {
1215 GstRtmpConnection *connection = g_task_get_source_object (task);
1216 StreamTaskData *data = g_task_get_task_data (task);
1217 const gchar *command = data->publish ? "publish" : "play";
1218 GstAmfNode *command_object, *stream_name, *argument;
1219
1220 command_object = gst_amf_node_new_null ();
1221 stream_name = gst_amf_node_new_string (data->stream, -1);
1222
1223 if (data->publish) {
1224 /* publishing type (live, record, append) */
1225 argument = gst_amf_node_new_string ("live", -1);
1226 } else {
1227 /* "Start" argument: -2 = live or recording, -1 = only live
1228 0 or positive = only recording, seek to X seconds */
1229 argument = gst_amf_node_new_number (-2);
1230 }
1231
1232 GST_INFO ("Sending %s for '%s' on stream %" G_GUINT32_FORMAT,
1233 command, data->stream, data->id);
1234 gst_rtmp_connection_expect_command (connection, on_publish_or_play_status,
1235 task, data->id, "onStatus");
1236 gst_rtmp_connection_send_command (connection, NULL, NULL, data->id,
1237 command, command_object, stream_name, argument, NULL);
1238
1239 if (!data->publish) {
1240 /* Matches librtmp */
1241 send_set_buffer_length (connection, data->id, 30000);
1242 }
1243
1244 gst_amf_node_free (command_object);
1245 gst_amf_node_free (stream_name);
1246 gst_amf_node_free (argument);
1247 }
1248
1249 static void
on_publish_or_play_status(const gchar * command_name,GPtrArray * args,gpointer user_data)1250 on_publish_or_play_status (const gchar * command_name, GPtrArray * args,
1251 gpointer user_data)
1252 {
1253 GTask *task = G_TASK (user_data);
1254 GstRtmpConnection *connection = g_task_get_source_object (task);
1255 StreamTaskData *data = g_task_get_task_data (task);
1256 const gchar *command = data->publish ? "publish" : "play", *code = NULL;
1257 GString *info_dump;
1258
1259 if (g_task_return_error_if_cancelled (task)) {
1260 g_object_unref (task);
1261 return;
1262 }
1263
1264 if (!args) {
1265 g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED,
1266 "%s failed: %s", command, command_name);
1267 g_object_unref (task);
1268 return;
1269 }
1270
1271 if (args->len < 2) {
1272 g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED,
1273 "%s failed; not enough return arguments", command);
1274 g_object_unref (task);
1275 return;
1276 }
1277
1278 {
1279 const GstAmfNode *info_object, *code_object;
1280 info_object = g_ptr_array_index (args, 1);
1281 code_object = gst_amf_node_get_field (info_object, "code");
1282
1283 if (code_object) {
1284 code = gst_amf_node_peek_string (code_object, NULL);
1285 }
1286
1287 info_dump = g_string_new ("");
1288 gst_amf_node_dump (info_object, -1, info_dump);
1289 }
1290
1291 if (data->publish) {
1292 if (g_strcmp0 (code, "NetStream.Publish.Start") == 0) {
1293 GST_INFO ("publish success: %s", info_dump->str);
1294 g_task_return_boolean (task, TRUE);
1295 goto out;
1296 }
1297
1298 if (g_strcmp0 (code, "NetStream.Publish.BadName") == 0) {
1299 g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_EXISTS,
1300 "publish denied: stream already exists: %s", info_dump->str);
1301 goto out;
1302 }
1303
1304 if (g_strcmp0 (code, "NetStream.Publish.Denied") == 0) {
1305 g_task_return_new_error (task, G_IO_ERROR,
1306 G_IO_ERROR_PERMISSION_DENIED, "publish denied: %s", info_dump->str);
1307 goto out;
1308 }
1309 } else {
1310 if (g_strcmp0 (code, "NetStream.Play.Start") == 0 ||
1311 g_strcmp0 (code, "NetStream.Play.PublishNotify") == 0 ||
1312 g_strcmp0 (code, "NetStream.Play.Reset") == 0) {
1313 GST_INFO ("play success: %s", info_dump->str);
1314 g_task_return_boolean (task, TRUE);
1315 goto out;
1316 }
1317
1318 if (g_strcmp0 (code, "NetStream.Play.StreamNotFound") == 0) {
1319 g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_NOT_FOUND,
1320 "play denied: stream not found: %s", info_dump->str);
1321 goto out;
1322 }
1323 }
1324
1325 g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED,
1326 "unhandled %s result: %s", command, info_dump->str);
1327
1328 out:
1329 g_string_free (info_dump, TRUE);
1330
1331 g_signal_handler_disconnect (connection, data->error_handler_id);
1332 data->error_handler_id = 0;
1333
1334 g_object_unref (task);
1335 }
1336
1337 static gboolean
start_stream_finish(GstRtmpConnection * connection,GAsyncResult * result,guint32 * stream_id,GError ** error)1338 start_stream_finish (GstRtmpConnection * connection,
1339 GAsyncResult * result, guint32 * stream_id, GError ** error)
1340 {
1341 GTask *task;
1342 StreamTaskData *data;
1343
1344 g_return_val_if_fail (g_task_is_valid (result, connection), FALSE);
1345
1346 task = G_TASK (result);
1347
1348 if (!g_task_propagate_boolean (G_TASK (result), error)) {
1349 return FALSE;
1350 }
1351
1352 data = g_task_get_task_data (task);
1353
1354 if (stream_id) {
1355 *stream_id = data->id;
1356 }
1357
1358 return TRUE;
1359 }
1360
1361 gboolean
gst_rtmp_client_start_publish_finish(GstRtmpConnection * connection,GAsyncResult * result,guint32 * stream_id,GError ** error)1362 gst_rtmp_client_start_publish_finish (GstRtmpConnection * connection,
1363 GAsyncResult * result, guint32 * stream_id, GError ** error)
1364 {
1365 return start_stream_finish (connection, result, stream_id, error);
1366 }
1367
1368 gboolean
gst_rtmp_client_start_play_finish(GstRtmpConnection * connection,GAsyncResult * result,guint32 * stream_id,GError ** error)1369 gst_rtmp_client_start_play_finish (GstRtmpConnection * connection,
1370 GAsyncResult * result, guint32 * stream_id, GError ** error)
1371 {
1372 return start_stream_finish (connection, result, stream_id, error);
1373 }
1374
1375 void
gst_rtmp_client_stop_publish(GstRtmpConnection * connection,const gchar * stream,const GstRtmpStopCommands stop_commands)1376 gst_rtmp_client_stop_publish (GstRtmpConnection * connection,
1377 const gchar * stream, const GstRtmpStopCommands stop_commands)
1378 {
1379 send_stop (connection, stream, stop_commands);
1380 }
1381
1382 static void
send_stop(GstRtmpConnection * connection,const gchar * stream,const GstRtmpStopCommands stop_commands)1383 send_stop (GstRtmpConnection * connection, const gchar * stream,
1384 const GstRtmpStopCommands stop_commands)
1385 {
1386 GstAmfNode *command_object, *stream_name;
1387
1388 command_object = gst_amf_node_new_null ();
1389 stream_name = gst_amf_node_new_string (stream, -1);
1390
1391 if (stop_commands & GST_RTMP_STOP_COMMANDS_FCUNPUBLISH) {
1392 GST_DEBUG ("Sending stop command 'FCUnpublish' for stream '%s'", stream);
1393 gst_rtmp_connection_send_command (connection, NULL, NULL, 0,
1394 "FCUnpublish", command_object, stream_name, NULL);
1395 }
1396 if (stop_commands & GST_RTMP_STOP_COMMANDS_CLOSE_STREAM) {
1397 GST_DEBUG ("Sending stop command 'closeStream' for stream '%s'", stream);
1398 gst_rtmp_connection_send_command (connection, NULL, NULL, 0,
1399 "closeStream", command_object, stream_name, NULL);
1400 }
1401 if (stop_commands & GST_RTMP_STOP_COMMANDS_DELETE_STREAM) {
1402 GST_DEBUG ("Sending stop command 'deleteStream' for stream '%s'", stream);
1403 gst_rtmp_connection_send_command (connection, NULL, NULL, 0,
1404 "deleteStream", command_object, stream_name, NULL);
1405 }
1406
1407 gst_amf_node_free (stream_name);
1408 gst_amf_node_free (command_object);
1409 }
1410