1 /* GStreamer RTMP Library
2 * Copyright (C) 2013 David Schleef <ds@schleef.org>
3 * Copyright (C) 2017 Make.TV, Inc. <info@make.tv>
4 * Contact: Jan Alexander Steffens (heftig) <jsteffens@make.tv>
5 *
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Library General Public
8 * License as published by the Free Software Foundation; either
9 * version 2 of the License, or (at your option) any later version.
10 *
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Library General Public License for more details.
15 *
16 * You should have received a copy of the GNU Library General Public
17 * License along with this library; if not, write to the
18 * Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
19 * Boston, MA 02110-1335, USA.
20 */
21
22 #ifdef HAVE_CONFIG_H
23 #include "config.h"
24 #endif
25
26 #include <gst/gst.h>
27 #include <string.h>
28 #include <math.h>
29 #include "rtmpconnection.h"
30 #include "rtmpchunkstream.h"
31 #include "rtmpmessage.h"
32 #include "rtmputils.h"
33 #include "amf.h"
34
35 GST_DEBUG_CATEGORY_STATIC (gst_rtmp_connection_debug_category);
36 #define GST_CAT_DEFAULT gst_rtmp_connection_debug_category
37
38 #define READ_SIZE 8192
39
40 typedef void (*GstRtmpConnectionCallback) (GstRtmpConnection * connection);
41
42 struct _GstRtmpConnection
43 {
44 GObject parent_instance;
45
46 /* should be properties */
47 gboolean input_paused;
48 gboolean error;
49
50 /* private */
51 GThread *thread;
52 GSocketConnection *connection;
53 GCancellable *cancellable;
54 GSocketClient *socket_client;
55 GAsyncQueue *output_queue;
56 GMainContext *main_context;
57
58 GCancellable *outer_cancellable;
59 gulong cancel_handler_id;
60
61 GSource *input_source;
62 GByteArray *input_bytes;
63 guint input_needed_bytes;
64 GstRtmpChunkStreams *input_streams, *output_streams;
65 GList *transactions;
66 GList *expected_commands;
67 guint transaction_count;
68
69 GstRtmpConnectionMessageFunc input_handler;
70 gpointer input_handler_user_data;
71 GDestroyNotify input_handler_user_data_destroy;
72
73 GstRtmpConnectionFunc output_handler;
74 gpointer output_handler_user_data;
75 GDestroyNotify output_handler_user_data_destroy;
76
77 gboolean writing;
78
79 /* Protects the values below during concurrent access.
80 * - Taken by the loop thread when writing, but not reading.
81 * - Taken by other threads when reading (calling get_stats).
82 */
83 GMutex stats_lock;
84
85 /* RTMP configuration */
86 guint32 in_chunk_size;
87 guint32 out_chunk_size, out_chunk_size_pending;
88 guint32 in_window_ack_size;
89 guint32 out_window_ack_size, out_window_ack_size_pending;
90
91 guint64 in_bytes_total;
92 guint64 out_bytes_total;
93 guint64 in_bytes_acked;
94 guint64 out_bytes_acked;
95 };
96
97
98 typedef struct
99 {
100 GObjectClass parent_class;
101 } GstRtmpConnectionClass;
102
103 /* prototypes */
104
105 static void gst_rtmp_connection_dispose (GObject * object);
106 static void gst_rtmp_connection_finalize (GObject * object);
107 static void gst_rtmp_connection_set_cancellable (GstRtmpConnection * self,
108 GCancellable * cancellable);
109 static void gst_rtmp_connection_emit_error (GstRtmpConnection * self);
110 static gboolean gst_rtmp_connection_input_ready (GInputStream * is,
111 gpointer user_data);
112 static void gst_rtmp_connection_start_write (GstRtmpConnection * self);
113 static void gst_rtmp_connection_write_buffer_done (GObject * obj,
114 GAsyncResult * result, gpointer user_data);
115 static void gst_rtmp_connection_start_read (GstRtmpConnection * sc,
116 guint needed_bytes);
117 static void gst_rtmp_connection_try_read (GstRtmpConnection * sc);
118 static void gst_rtmp_connection_do_read (GstRtmpConnection * sc);
119 static void gst_rtmp_connection_handle_aggregate (GstRtmpConnection *
120 connection, GstBuffer * buffer);
121 static void gst_rtmp_connection_handle_protocol_control (GstRtmpConnection *
122 connection, GstBuffer * buffer);
123 static void gst_rtmp_connection_handle_cm (GstRtmpConnection * connection,
124 GstBuffer * buffer);
125 static void gst_rtmp_connection_handle_user_control (GstRtmpConnection * sc,
126 GstBuffer * buffer);
127 static void gst_rtmp_connection_handle_message (GstRtmpConnection * sc,
128 GstBuffer * buffer);
129 static void gst_rtmp_connection_handle_set_chunk_size (GstRtmpConnection * self,
130 guint32 in_chunk_size);
131 static void gst_rtmp_connection_handle_ack (GstRtmpConnection * self,
132 guint32 bytes);
133 static void gst_rtmp_connection_handle_window_ack_size (GstRtmpConnection *
134 self, guint32 in_chunk_size);
135
136 static void gst_rtmp_connection_send_ack (GstRtmpConnection * connection);
137 static void
138 gst_rtmp_connection_send_ping_response (GstRtmpConnection * connection,
139 guint32 event_data);
140
141 static gboolean
142 gst_rtmp_connection_prepare_protocol_control (GstRtmpConnection * self,
143 GstBuffer * buffer);
144 static void
145 gst_rtmp_connection_apply_protocol_control (GstRtmpConnection * self);
146
147 typedef struct
148 {
149 gdouble transaction_id;
150 GstRtmpCommandCallback func;
151 gpointer user_data;
152 } Transaction;
153
154 static Transaction *
transaction_new(gdouble transaction_id,GstRtmpCommandCallback func,gpointer user_data)155 transaction_new (gdouble transaction_id, GstRtmpCommandCallback func,
156 gpointer user_data)
157 {
158 Transaction *data = g_slice_new (Transaction);
159 data->transaction_id = transaction_id;
160 data->func = func;
161 data->user_data = user_data;
162 return data;
163 }
164
165 static void
transaction_free(gpointer ptr)166 transaction_free (gpointer ptr)
167 {
168 Transaction *data = ptr;
169 g_slice_free (Transaction, data);
170 }
171
172 typedef struct
173 {
174 guint32 stream_id;
175 gchar *command_name;
176 GstRtmpCommandCallback func;
177 gpointer user_data;
178 } ExpectedCommand;
179
180 static ExpectedCommand *
expected_command_new(guint32 stream_id,const gchar * command_name,GstRtmpCommandCallback func,gpointer user_data)181 expected_command_new (guint32 stream_id, const gchar * command_name,
182 GstRtmpCommandCallback func, gpointer user_data)
183 {
184 ExpectedCommand *data = g_slice_new (ExpectedCommand);
185 data->stream_id = stream_id;
186 data->command_name = g_strdup (command_name);
187 data->func = func;
188 data->user_data = user_data;
189 return data;
190 }
191
192 static void
expected_command_free(gpointer ptr)193 expected_command_free (gpointer ptr)
194 {
195 ExpectedCommand *data = ptr;
196 g_free (data->command_name);
197 g_slice_free (ExpectedCommand, data);
198 }
199
200 enum
201 {
202 SIGNAL_ERROR,
203 SIGNAL_STREAM_CONTROL,
204
205 N_SIGNALS
206 };
207
208 static guint signals[N_SIGNALS] = { 0, };
209
210 /* singletons */
211
212 static GstMemory *set_data_frame_value;
213
214 static void
init_set_data_frame_value(void)215 init_set_data_frame_value (void)
216 {
217 GstAmfNode *node = gst_amf_node_new_string ("@setDataFrame", -1);
218 GBytes *bytes = gst_amf_node_serialize (node);
219 gsize size;
220 const gchar *data = g_bytes_get_data (bytes, &size);
221
222 set_data_frame_value = gst_memory_new_wrapped (GST_MEMORY_FLAG_READONLY,
223 (gpointer) data, size, 0, size, bytes, (GDestroyNotify) g_bytes_unref);
224 GST_MINI_OBJECT_FLAG_SET (set_data_frame_value,
225 GST_MINI_OBJECT_FLAG_MAY_BE_LEAKED);
226
227 gst_amf_node_free (node);
228 }
229
230 /* class initialization */
231
232 G_DEFINE_TYPE_WITH_CODE (GstRtmpConnection, gst_rtmp_connection,
233 G_TYPE_OBJECT,
234 GST_DEBUG_CATEGORY_INIT (gst_rtmp_connection_debug_category,
235 "rtmpconnection", 0, "debug category for GstRtmpConnection class");
236 init_set_data_frame_value ());
237
238 static void
gst_rtmp_connection_class_init(GstRtmpConnectionClass * klass)239 gst_rtmp_connection_class_init (GstRtmpConnectionClass * klass)
240 {
241 GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
242
243 gobject_class->dispose = gst_rtmp_connection_dispose;
244 gobject_class->finalize = gst_rtmp_connection_finalize;
245
246 signals[SIGNAL_ERROR] = g_signal_new ("error", G_TYPE_FROM_CLASS (klass),
247 G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 0);
248
249 signals[SIGNAL_STREAM_CONTROL] = g_signal_new ("stream-control",
250 G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL,
251 G_TYPE_NONE, 2, G_TYPE_INT, G_TYPE_UINT);
252
253 GST_DEBUG_REGISTER_FUNCPTR (gst_rtmp_connection_do_read);
254 }
255
256 static void
gst_rtmp_connection_init(GstRtmpConnection * rtmpconnection)257 gst_rtmp_connection_init (GstRtmpConnection * rtmpconnection)
258 {
259 rtmpconnection->cancellable = g_cancellable_new ();
260 rtmpconnection->output_queue =
261 g_async_queue_new_full ((GDestroyNotify) gst_buffer_unref);
262 rtmpconnection->input_streams = gst_rtmp_chunk_streams_new ();
263 rtmpconnection->output_streams = gst_rtmp_chunk_streams_new ();
264
265 rtmpconnection->in_chunk_size = GST_RTMP_DEFAULT_CHUNK_SIZE;
266 rtmpconnection->out_chunk_size = GST_RTMP_DEFAULT_CHUNK_SIZE;
267
268 rtmpconnection->input_bytes = g_byte_array_sized_new (2 * READ_SIZE);
269 rtmpconnection->input_needed_bytes = 1;
270
271 g_mutex_init (&rtmpconnection->stats_lock);
272 }
273
274 void
gst_rtmp_connection_dispose(GObject * object)275 gst_rtmp_connection_dispose (GObject * object)
276 {
277 GstRtmpConnection *rtmpconnection = GST_RTMP_CONNECTION (object);
278 GST_DEBUG_OBJECT (rtmpconnection, "dispose");
279
280 /* clean up as possible. may be called multiple times */
281
282 gst_rtmp_connection_close (rtmpconnection);
283 g_cancellable_cancel (rtmpconnection->cancellable);
284 gst_rtmp_connection_set_input_handler (rtmpconnection, NULL, NULL, NULL);
285 gst_rtmp_connection_set_output_handler (rtmpconnection, NULL, NULL, NULL);
286 gst_rtmp_connection_set_cancellable (rtmpconnection, NULL);
287
288 G_OBJECT_CLASS (gst_rtmp_connection_parent_class)->dispose (object);
289 }
290
291 void
gst_rtmp_connection_finalize(GObject * object)292 gst_rtmp_connection_finalize (GObject * object)
293 {
294 GstRtmpConnection *rtmpconnection = GST_RTMP_CONNECTION (object);
295 GST_DEBUG_OBJECT (rtmpconnection, "finalize");
296
297 /* clean up object here */
298
299 g_mutex_clear (&rtmpconnection->stats_lock);
300 g_clear_object (&rtmpconnection->cancellable);
301 g_clear_object (&rtmpconnection->connection);
302 g_clear_pointer (&rtmpconnection->output_queue, g_async_queue_unref);
303 g_clear_pointer (&rtmpconnection->input_streams, gst_rtmp_chunk_streams_free);
304 g_clear_pointer (&rtmpconnection->output_streams,
305 gst_rtmp_chunk_streams_free);
306 g_clear_pointer (&rtmpconnection->input_bytes, g_byte_array_unref);
307 g_clear_pointer (&rtmpconnection->main_context, g_main_context_unref);
308 g_clear_pointer (&rtmpconnection->thread, g_thread_unref);
309
310 G_OBJECT_CLASS (gst_rtmp_connection_parent_class)->finalize (object);
311 }
312
313 GSocket *
gst_rtmp_connection_get_socket(GstRtmpConnection * sc)314 gst_rtmp_connection_get_socket (GstRtmpConnection * sc)
315 {
316 return g_socket_connection_get_socket (sc->connection);
317 }
318
319 static void
gst_rtmp_connection_set_socket_connection(GstRtmpConnection * sc,GSocketConnection * connection)320 gst_rtmp_connection_set_socket_connection (GstRtmpConnection * sc,
321 GSocketConnection * connection)
322 {
323 GInputStream *is;
324
325 sc->thread = g_thread_ref (g_thread_self ());
326 sc->main_context = g_main_context_ref_thread_default ();
327 sc->connection = g_object_ref (connection);
328
329 /* refs the socket because it's creating an input stream, which holds a ref */
330 is = g_io_stream_get_input_stream (G_IO_STREAM (sc->connection));
331 /* refs the socket because it's creating a socket source */
332 g_warn_if_fail (!sc->input_source);
333 sc->input_source =
334 g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM (is),
335 sc->cancellable);
336 g_source_set_callback (sc->input_source,
337 (GSourceFunc) gst_rtmp_connection_input_ready, g_object_ref (sc),
338 g_object_unref);
339 g_source_attach (sc->input_source, sc->main_context);
340 }
341
342 static void
gst_rtmp_connection_set_cancellable(GstRtmpConnection * self,GCancellable * cancellable)343 gst_rtmp_connection_set_cancellable (GstRtmpConnection * self,
344 GCancellable * cancellable)
345 {
346 g_cancellable_disconnect (self->outer_cancellable, self->cancel_handler_id);
347 g_clear_object (&self->outer_cancellable);
348 self->cancel_handler_id = 0;
349
350 if (cancellable == NULL)
351 return;
352
353 self->outer_cancellable = g_object_ref (cancellable);
354 self->cancel_handler_id =
355 g_cancellable_connect (cancellable, G_CALLBACK (g_cancellable_cancel),
356 g_object_ref (self->cancellable), g_object_unref);
357 }
358
359
360 GstRtmpConnection *
gst_rtmp_connection_new(GSocketConnection * connection,GCancellable * cancellable)361 gst_rtmp_connection_new (GSocketConnection * connection,
362 GCancellable * cancellable)
363 {
364 GstRtmpConnection *sc;
365
366 sc = g_object_new (GST_TYPE_RTMP_CONNECTION, NULL);
367
368 gst_rtmp_connection_set_socket_connection (sc, connection);
369 gst_rtmp_connection_set_cancellable (sc, cancellable);
370
371 return sc;
372 }
373
374 static void
cancel_all_commands(GstRtmpConnection * self)375 cancel_all_commands (GstRtmpConnection * self)
376 {
377 GList *l;
378
379 for (l = self->transactions; l; l = g_list_next (l)) {
380 Transaction *cc = l->data;
381 GST_LOG_OBJECT (self, "calling transaction callback %s",
382 GST_DEBUG_FUNCPTR_NAME (cc->func));
383 cc->func ("<cancelled>", NULL, cc->user_data);
384 }
385 g_list_free_full (self->transactions, transaction_free);
386 self->transactions = NULL;
387
388 for (l = self->expected_commands; l; l = g_list_next (l)) {
389 ExpectedCommand *cc = l->data;
390 GST_LOG_OBJECT (self, "calling expected command callback %s",
391 GST_DEBUG_FUNCPTR_NAME (cc->func));
392 cc->func ("<cancelled>", NULL, cc->user_data);
393 }
394 g_list_free_full (self->expected_commands, expected_command_free);
395 self->expected_commands = NULL;
396 }
397
398 void
gst_rtmp_connection_close(GstRtmpConnection * self)399 gst_rtmp_connection_close (GstRtmpConnection * self)
400 {
401 if (self->thread != g_thread_self ()) {
402 GST_ERROR_OBJECT (self, "Called from wrong thread");
403 }
404
405 g_cancellable_cancel (self->cancellable);
406 cancel_all_commands (self);
407
408 if (self->input_source) {
409 g_source_destroy (self->input_source);
410 g_clear_pointer (&self->input_source, g_source_unref);
411 }
412
413 if (self->connection) {
414 g_io_stream_close_async (G_IO_STREAM (self->connection),
415 G_PRIORITY_DEFAULT, NULL, NULL, NULL);
416 }
417 }
418
419 void
gst_rtmp_connection_close_and_unref(gpointer ptr)420 gst_rtmp_connection_close_and_unref (gpointer ptr)
421 {
422 GstRtmpConnection *connection;
423
424 g_return_if_fail (ptr);
425
426 connection = GST_RTMP_CONNECTION (ptr);
427 gst_rtmp_connection_close (connection);
428 g_object_unref (connection);
429 }
430
431 void
gst_rtmp_connection_set_input_handler(GstRtmpConnection * sc,GstRtmpConnectionMessageFunc callback,gpointer user_data,GDestroyNotify user_data_destroy)432 gst_rtmp_connection_set_input_handler (GstRtmpConnection * sc,
433 GstRtmpConnectionMessageFunc callback, gpointer user_data,
434 GDestroyNotify user_data_destroy)
435 {
436 if (sc->input_handler_user_data_destroy) {
437 sc->input_handler_user_data_destroy (sc->input_handler_user_data);
438 }
439
440 sc->input_handler = callback;
441 sc->input_handler_user_data = user_data;
442 sc->input_handler_user_data_destroy = user_data_destroy;
443 }
444
445 void
gst_rtmp_connection_set_output_handler(GstRtmpConnection * sc,GstRtmpConnectionFunc callback,gpointer user_data,GDestroyNotify user_data_destroy)446 gst_rtmp_connection_set_output_handler (GstRtmpConnection * sc,
447 GstRtmpConnectionFunc callback, gpointer user_data,
448 GDestroyNotify user_data_destroy)
449 {
450 if (sc->output_handler_user_data_destroy) {
451 sc->output_handler_user_data_destroy (sc->output_handler_user_data);
452 }
453
454 sc->output_handler = callback;
455 sc->output_handler_user_data = user_data;
456 sc->output_handler_user_data_destroy = user_data_destroy;
457 }
458
459 static gboolean
gst_rtmp_connection_input_ready(GInputStream * is,gpointer user_data)460 gst_rtmp_connection_input_ready (GInputStream * is, gpointer user_data)
461 {
462 GstRtmpConnection *sc = user_data;
463 gssize ret;
464 guint oldsize;
465 GError *error = NULL;
466 guint64 bytes_since_ack;
467
468 GST_TRACE_OBJECT (sc, "input ready");
469
470 oldsize = sc->input_bytes->len;
471 g_byte_array_set_size (sc->input_bytes, oldsize + READ_SIZE);
472 ret =
473 g_pollable_input_stream_read_nonblocking (G_POLLABLE_INPUT_STREAM (is),
474 sc->input_bytes->data + oldsize, READ_SIZE, sc->cancellable, &error);
475 g_byte_array_set_size (sc->input_bytes, oldsize + (ret > 0 ? ret : 0));
476
477 if (ret < 0) {
478 gint code = error->code;
479
480 if (error->domain == G_IO_ERROR && (code == G_IO_ERROR_WOULD_BLOCK ||
481 code == G_IO_ERROR_TIMED_OUT || code == G_IO_ERROR_AGAIN)) {
482 /* should retry */
483 GST_DEBUG_OBJECT (sc, "read IO error %d %s, continuing",
484 code, error->message);
485 g_error_free (error);
486 return G_SOURCE_CONTINUE;
487 }
488
489 GST_ERROR_OBJECT (sc, "read error: %s %d %s",
490 g_quark_to_string (error->domain), code, error->message);
491 g_error_free (error);
492 } else if (ret == 0) {
493 GST_INFO_OBJECT (sc, "read EOF");
494 }
495
496 if (ret <= 0) {
497 gst_rtmp_connection_emit_error (sc);
498 return G_SOURCE_REMOVE;
499 }
500
501 GST_TRACE_OBJECT (sc, "read %" G_GSIZE_FORMAT " bytes", ret);
502
503 g_mutex_lock (&sc->stats_lock);
504 sc->in_bytes_total += ret;
505 g_mutex_unlock (&sc->stats_lock);
506
507 bytes_since_ack = sc->in_bytes_total - sc->in_bytes_acked;
508 if (sc->in_window_ack_size && bytes_since_ack >= sc->in_window_ack_size) {
509 gst_rtmp_connection_send_ack (sc);
510 }
511
512 gst_rtmp_connection_try_read (sc);
513 return G_SOURCE_CONTINUE;
514 }
515
516 static void
gst_rtmp_connection_start_write(GstRtmpConnection * self)517 gst_rtmp_connection_start_write (GstRtmpConnection * self)
518 {
519 GOutputStream *os;
520 GstBuffer *message, *chunks;
521 GstRtmpMeta *meta;
522 GstRtmpChunkStream *cstream;
523
524 if (self->writing) {
525 return;
526 }
527
528 message = g_async_queue_try_pop (self->output_queue);
529 if (!message) {
530 return;
531 }
532
533 meta = gst_buffer_get_rtmp_meta (message);
534 if (!meta) {
535 GST_ERROR_OBJECT (self, "No RTMP meta on %" GST_PTR_FORMAT, message);
536 goto out;
537 }
538
539 if (gst_rtmp_message_is_protocol_control (message)) {
540 if (!gst_rtmp_connection_prepare_protocol_control (self, message)) {
541 GST_ERROR_OBJECT (self,
542 "Failed to prepare protocol control %" GST_PTR_FORMAT, message);
543 goto out;
544 }
545 }
546
547 cstream = gst_rtmp_chunk_streams_get (self->output_streams, meta->cstream);
548 if (!cstream) {
549 GST_ERROR_OBJECT (self, "Failed to get chunk stream for %" GST_PTR_FORMAT,
550 message);
551 goto out;
552 }
553
554 chunks = gst_rtmp_chunk_stream_serialize_all (cstream, message,
555 self->out_chunk_size);
556 if (!chunks) {
557 GST_ERROR_OBJECT (self, "Failed to serialize %" GST_PTR_FORMAT, message);
558 goto out;
559 }
560
561 self->writing = TRUE;
562 if (self->output_handler) {
563 self->output_handler (self, self->output_handler_user_data);
564 }
565
566 os = g_io_stream_get_output_stream (G_IO_STREAM (self->connection));
567 gst_rtmp_output_stream_write_all_buffer_async (os, chunks, G_PRIORITY_DEFAULT,
568 self->cancellable, gst_rtmp_connection_write_buffer_done,
569 g_object_ref (self));
570
571 gst_buffer_unref (chunks);
572
573 out:
574 gst_buffer_unref (message);
575 }
576
577 static void
gst_rtmp_connection_emit_error(GstRtmpConnection * self)578 gst_rtmp_connection_emit_error (GstRtmpConnection * self)
579 {
580 if (self->error) {
581 return;
582 }
583
584 GST_INFO_OBJECT (self, "connection error");
585 self->error = TRUE;
586
587 cancel_all_commands (self);
588
589 g_signal_emit (self, signals[SIGNAL_ERROR], 0);
590 }
591
592 static void
gst_rtmp_connection_write_buffer_done(GObject * obj,GAsyncResult * result,gpointer user_data)593 gst_rtmp_connection_write_buffer_done (GObject * obj,
594 GAsyncResult * result, gpointer user_data)
595 {
596 GOutputStream *os = G_OUTPUT_STREAM (obj);
597 GstRtmpConnection *self = GST_RTMP_CONNECTION (user_data);
598 gsize bytes_written = 0;
599 GError *error = NULL;
600 gboolean res;
601
602 self->writing = FALSE;
603
604 res = gst_rtmp_output_stream_write_all_buffer_finish (os, result,
605 &bytes_written, &error);
606
607 g_mutex_lock (&self->stats_lock);
608 self->out_bytes_total += bytes_written;
609 g_mutex_unlock (&self->stats_lock);
610
611 if (!res) {
612 if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
613 GST_INFO_OBJECT (self,
614 "write cancelled (wrote %" G_GSIZE_FORMAT " bytes)", bytes_written);
615 } else {
616 GST_ERROR_OBJECT (self,
617 "write error: %s (wrote %" G_GSIZE_FORMAT " bytes)",
618 error->message, bytes_written);
619 }
620 gst_rtmp_connection_emit_error (self);
621 g_error_free (error);
622 g_object_unref (self);
623 return;
624 }
625
626 GST_LOG_OBJECT (self, "write completed; wrote %" G_GSIZE_FORMAT " bytes",
627 bytes_written);
628
629 gst_rtmp_connection_apply_protocol_control (self);
630 gst_rtmp_connection_start_write (self);
631 g_object_unref (self);
632 }
633
634 static void
gst_rtmp_connection_start_read(GstRtmpConnection * connection,guint needed_bytes)635 gst_rtmp_connection_start_read (GstRtmpConnection * connection,
636 guint needed_bytes)
637 {
638 g_return_if_fail (needed_bytes > 0);
639 connection->input_needed_bytes = needed_bytes;
640 gst_rtmp_connection_try_read (connection);
641 }
642
643 static void
gst_rtmp_connection_try_read(GstRtmpConnection * connection)644 gst_rtmp_connection_try_read (GstRtmpConnection * connection)
645 {
646 guint need = connection->input_needed_bytes,
647 len = connection->input_bytes->len;
648
649 if (len < need) {
650 GST_TRACE_OBJECT (connection, "got %u < %u bytes, need more", len, need);
651 return;
652 }
653
654 GST_TRACE_OBJECT (connection, "got %u >= %u bytes, proceeding", len, need);
655 gst_rtmp_connection_do_read (connection);
656 }
657
658 static void
gst_rtmp_connection_take_input_bytes(GstRtmpConnection * sc,gsize size,GBytes ** outbytes)659 gst_rtmp_connection_take_input_bytes (GstRtmpConnection * sc, gsize size,
660 GBytes ** outbytes)
661 {
662 g_return_if_fail (size <= sc->input_bytes->len);
663
664 if (outbytes) {
665 *outbytes = g_bytes_new (sc->input_bytes->data, size);
666 }
667
668 g_byte_array_remove_range (sc->input_bytes, 0, size);
669 }
670
671 static void
gst_rtmp_connection_do_read(GstRtmpConnection * sc)672 gst_rtmp_connection_do_read (GstRtmpConnection * sc)
673 {
674 GByteArray *input_bytes = sc->input_bytes;
675 gsize needed_bytes = 1;
676
677 while (1) {
678 GstRtmpChunkStream *cstream;
679 guint32 chunk_stream_id, header_size, next_size;
680 guint8 *data;
681
682 chunk_stream_id = gst_rtmp_chunk_stream_parse_id (input_bytes->data,
683 input_bytes->len);
684
685 if (!chunk_stream_id) {
686 needed_bytes = input_bytes->len + 1;
687 break;
688 }
689
690 cstream = gst_rtmp_chunk_streams_get (sc->input_streams, chunk_stream_id);
691 header_size = gst_rtmp_chunk_stream_parse_header (cstream,
692 input_bytes->data, input_bytes->len);
693
694 if (input_bytes->len < header_size) {
695 needed_bytes = header_size;
696 break;
697 }
698
699 next_size = gst_rtmp_chunk_stream_parse_payload (cstream,
700 sc->in_chunk_size, &data);
701
702 if (input_bytes->len < header_size + next_size) {
703 needed_bytes = header_size + next_size;
704 break;
705 }
706
707 memcpy (data, input_bytes->data + header_size, next_size);
708 gst_rtmp_connection_take_input_bytes (sc, header_size + next_size, NULL);
709
710 next_size = gst_rtmp_chunk_stream_wrote_payload (cstream,
711 sc->in_chunk_size);
712
713 if (next_size == 0) {
714 GstBuffer *buffer = gst_rtmp_chunk_stream_parse_finish (cstream);
715 gst_rtmp_connection_handle_message (sc, buffer);
716 gst_buffer_unref (buffer);
717 }
718 }
719
720 gst_rtmp_connection_start_read (sc, needed_bytes);
721 }
722
723 static void
gst_rtmp_connection_handle_message(GstRtmpConnection * sc,GstBuffer * buffer)724 gst_rtmp_connection_handle_message (GstRtmpConnection * sc, GstBuffer * buffer)
725 {
726 if (gst_rtmp_message_is_protocol_control (buffer)) {
727 gst_rtmp_connection_handle_protocol_control (sc, buffer);
728 return;
729 }
730
731 if (gst_rtmp_message_is_user_control (buffer)) {
732 gst_rtmp_connection_handle_user_control (sc, buffer);
733 return;
734 }
735
736 switch (gst_rtmp_message_get_type (buffer)) {
737 case GST_RTMP_MESSAGE_TYPE_COMMAND_AMF0:
738 gst_rtmp_connection_handle_cm (sc, buffer);
739 return;
740
741 case GST_RTMP_MESSAGE_TYPE_AGGREGATE:
742 gst_rtmp_connection_handle_aggregate (sc, buffer);
743 break;
744
745 default:
746 if (sc->input_handler) {
747 sc->input_handler (sc, buffer, sc->input_handler_user_data);
748 }
749 return;
750 }
751 }
752
753 static void
gst_rtmp_connection_handle_aggregate(GstRtmpConnection * connection,GstBuffer * buffer)754 gst_rtmp_connection_handle_aggregate (GstRtmpConnection * connection,
755 GstBuffer * buffer)
756 {
757 GstRtmpMeta *meta;
758 GstMapInfo map;
759 gsize pos = 0;
760 guint32 first_ts = 0;
761
762 meta = gst_buffer_get_rtmp_meta (buffer);
763 g_return_if_fail (meta);
764
765 gst_buffer_map (buffer, &map, GST_MAP_READ);
766 GST_TRACE_OBJECT (connection, "got aggregate message");
767
768 /* Parse Aggregate Messages as described in rtmp_specification_1.0.pdf page 26
769 * The payload is part of a FLV file.
770 *
771 * WARNING: This spec defines the payload to use an "RTMP message format"
772 * which misidentifies the format of the timestamps and omits the size of the
773 * backpointers. */
774
775 while (pos < map.size) {
776 gsize remaining = map.size - pos;
777 GstBuffer *submessage;
778 GstRtmpMeta *submeta;
779 GstRtmpFlvTagHeader header;
780
781 if (!gst_rtmp_flv_tag_parse_header (&header, map.data + pos, remaining)) {
782 GST_ERROR_OBJECT (connection,
783 "aggregate contains incomplete header; want %d, got %" G_GSIZE_FORMAT,
784 GST_RTMP_FLV_TAG_HEADER_SIZE, remaining);
785 break;
786 }
787
788 if (remaining < header.total_size) {
789 GST_ERROR_OBJECT (connection,
790 "aggregate contains incomplete message; want %" G_GSIZE_FORMAT
791 ", got %" G_GSIZE_FORMAT, header.total_size, remaining);
792 break;
793 }
794
795 submessage = gst_buffer_copy_region (buffer, GST_BUFFER_COPY_FLAGS |
796 GST_BUFFER_COPY_META | GST_BUFFER_COPY_MEMORY,
797 pos + GST_RTMP_FLV_TAG_HEADER_SIZE, header.payload_size);
798
799 GST_BUFFER_DTS (submessage) = GST_BUFFER_DTS (buffer);
800 GST_BUFFER_OFFSET (submessage) = GST_BUFFER_OFFSET (buffer) + pos;
801 GST_BUFFER_OFFSET_END (submessage) =
802 GST_BUFFER_OFFSET (submessage) + header.total_size;
803
804 submeta = gst_buffer_get_rtmp_meta (submessage);
805 g_assert (submeta);
806
807 submeta->type = header.type;
808 submeta->size = header.payload_size;
809
810 if (pos == 0) {
811 first_ts = header.timestamp;
812 } else {
813 guint32 ts_offset = header.timestamp - first_ts;
814
815 submeta->ts_delta += ts_offset;
816 GST_BUFFER_DTS (submessage) += ts_offset * GST_MSECOND;
817 GST_BUFFER_FLAG_UNSET (submessage, GST_BUFFER_FLAG_DISCONT);
818 }
819
820 gst_rtmp_buffer_dump (submessage, "<<< submessage");
821 gst_rtmp_connection_handle_message (connection, submessage);
822 gst_buffer_unref (submessage);
823
824 pos += header.total_size;
825 }
826
827 gst_buffer_unmap (buffer, &map);
828 }
829
830 static void
gst_rtmp_connection_handle_protocol_control(GstRtmpConnection * connection,GstBuffer * buffer)831 gst_rtmp_connection_handle_protocol_control (GstRtmpConnection * connection,
832 GstBuffer * buffer)
833 {
834 GstRtmpProtocolControl pc;
835
836 if (!gst_rtmp_message_parse_protocol_control (buffer, &pc)) {
837 GST_ERROR_OBJECT (connection, "can't parse protocol control message");
838 return;
839 }
840
841 GST_LOG_OBJECT (connection, "got protocol control message %d:%s", pc.type,
842 gst_rtmp_message_type_get_nick (pc.type));
843
844 switch (pc.type) {
845 case GST_RTMP_MESSAGE_TYPE_SET_CHUNK_SIZE:
846 GST_INFO_OBJECT (connection, "incoming chunk size %" G_GUINT32_FORMAT,
847 pc.param);
848 gst_rtmp_connection_handle_set_chunk_size (connection, pc.param);
849 break;
850
851 case GST_RTMP_MESSAGE_TYPE_ABORT_MESSAGE:
852 GST_ERROR_OBJECT (connection, "unimplemented: chunk abort, stream_id = %"
853 G_GUINT32_FORMAT, pc.param);
854 break;
855
856 case GST_RTMP_MESSAGE_TYPE_ACKNOWLEDGEMENT:
857 GST_DEBUG_OBJECT (connection, "acknowledgement %" G_GUINT32_FORMAT,
858 pc.param);
859 gst_rtmp_connection_handle_ack (connection, pc.param);
860 break;
861
862 case GST_RTMP_MESSAGE_TYPE_WINDOW_ACK_SIZE:
863 GST_INFO_OBJECT (connection,
864 "incoming window ack size: %" G_GUINT32_FORMAT, pc.param);
865 gst_rtmp_connection_handle_window_ack_size (connection, pc.param);
866 break;
867
868 case GST_RTMP_MESSAGE_TYPE_SET_PEER_BANDWIDTH:
869 GST_FIXME_OBJECT (connection, "set peer bandwidth: %" G_GUINT32_FORMAT
870 ", %" G_GUINT32_FORMAT, pc.param, pc.param2);
871 /* FIXME this is not correct, but close enough */
872 gst_rtmp_connection_request_window_size (connection, pc.param);
873 break;
874
875 default:
876 GST_ERROR_OBJECT (connection, "unimplemented protocol control type %d:%s",
877 pc.type, gst_rtmp_message_type_get_nick (pc.type));
878 break;
879 }
880 }
881
882 static void
gst_rtmp_connection_handle_user_control(GstRtmpConnection * connection,GstBuffer * buffer)883 gst_rtmp_connection_handle_user_control (GstRtmpConnection * connection,
884 GstBuffer * buffer)
885 {
886 GstRtmpUserControl uc;
887
888 if (!gst_rtmp_message_parse_user_control (buffer, &uc)) {
889 GST_ERROR_OBJECT (connection, "can't parse user control message");
890 return;
891 }
892
893 GST_LOG_OBJECT (connection, "got user control message %d:%s", uc.type,
894 gst_rtmp_user_control_type_get_nick (uc.type));
895
896 switch (uc.type) {
897 case GST_RTMP_USER_CONTROL_TYPE_STREAM_BEGIN:
898 case GST_RTMP_USER_CONTROL_TYPE_STREAM_EOF:
899 case GST_RTMP_USER_CONTROL_TYPE_STREAM_DRY:
900 case GST_RTMP_USER_CONTROL_TYPE_STREAM_IS_RECORDED:
901 GST_INFO_OBJECT (connection, "stream %u got %s", uc.param,
902 gst_rtmp_user_control_type_get_nick (uc.type));
903 g_signal_emit (connection, signals[SIGNAL_STREAM_CONTROL], 0,
904 uc.type, uc.param);
905 break;
906
907 case GST_RTMP_USER_CONTROL_TYPE_SET_BUFFER_LENGTH:
908 GST_FIXME_OBJECT (connection, "ignoring set buffer length: %"
909 G_GUINT32_FORMAT ", %" G_GUINT32_FORMAT " ms", uc.param, uc.param2);
910 break;
911
912 case GST_RTMP_USER_CONTROL_TYPE_PING_REQUEST:
913 GST_DEBUG_OBJECT (connection, "ping request: %" G_GUINT32_FORMAT,
914 uc.param);
915 gst_rtmp_connection_send_ping_response (connection, uc.param);
916 break;
917
918 case GST_RTMP_USER_CONTROL_TYPE_PING_RESPONSE:
919 GST_DEBUG_OBJECT (connection,
920 "ignoring ping response: %" G_GUINT32_FORMAT, uc.param);
921 break;
922
923 case GST_RTMP_USER_CONTROL_TYPE_BUFFER_EMPTY:
924 GST_LOG_OBJECT (connection, "ignoring buffer empty: %" G_GUINT32_FORMAT,
925 uc.param);
926 break;
927
928 case GST_RTMP_USER_CONTROL_TYPE_BUFFER_READY:
929 GST_LOG_OBJECT (connection, "ignoring buffer ready: %" G_GUINT32_FORMAT,
930 uc.param);
931 break;
932
933 default:
934 GST_ERROR_OBJECT (connection, "unimplemented user control type %d:%s",
935 uc.type, gst_rtmp_user_control_type_get_nick (uc.type));
936 break;
937 }
938 }
939
940 static void
gst_rtmp_connection_handle_set_chunk_size(GstRtmpConnection * self,guint32 chunk_size)941 gst_rtmp_connection_handle_set_chunk_size (GstRtmpConnection * self,
942 guint32 chunk_size)
943 {
944 if (chunk_size < GST_RTMP_MINIMUM_CHUNK_SIZE) {
945 GST_ERROR_OBJECT (self,
946 "peer requested chunk size %" G_GUINT32_FORMAT "; too small",
947 chunk_size);
948 return;
949 }
950
951 if (chunk_size > GST_RTMP_MAXIMUM_CHUNK_SIZE) {
952 GST_ERROR_OBJECT (self,
953 "peer requested chunk size %" G_GUINT32_FORMAT "; too large",
954 chunk_size);
955 return;
956 }
957
958 if (chunk_size < GST_RTMP_DEFAULT_CHUNK_SIZE) {
959 GST_WARNING_OBJECT (self,
960 "peer requested small chunk size %" G_GUINT32_FORMAT, chunk_size);
961 }
962
963 g_mutex_lock (&self->stats_lock);
964 self->in_chunk_size = chunk_size;
965 g_mutex_unlock (&self->stats_lock);
966 }
967
968 static void
gst_rtmp_connection_handle_ack(GstRtmpConnection * self,guint32 bytes)969 gst_rtmp_connection_handle_ack (GstRtmpConnection * self, guint32 bytes)
970 {
971 guint64 last_ack, new_ack;
972 guint32 last_ack_low, last_ack_high;
973
974 last_ack = self->out_bytes_acked;
975 last_ack_low = last_ack & G_MAXUINT32;
976 last_ack_high = (last_ack >> 32) & G_MAXUINT32;
977
978 if (bytes < last_ack_low) {
979 GST_WARNING_OBJECT (self,
980 "Acknowledgement bytes regression, assuming rollover: %"
981 G_GUINT32_FORMAT " < %" G_GUINT32_FORMAT, bytes, last_ack_low);
982 last_ack_high += 1;
983 }
984
985 new_ack = (((guint64) last_ack_high) << 32) | bytes;
986
987 GST_LOG_OBJECT (self, "Peer acknowledged %" G_GUINT64_FORMAT " bytes",
988 new_ack - last_ack);
989
990 g_mutex_lock (&self->stats_lock);
991 self->out_bytes_acked = new_ack;
992 g_mutex_unlock (&self->stats_lock);
993 }
994
995 static void
gst_rtmp_connection_handle_window_ack_size(GstRtmpConnection * self,guint32 window_ack_size)996 gst_rtmp_connection_handle_window_ack_size (GstRtmpConnection * self,
997 guint32 window_ack_size)
998 {
999 if (window_ack_size < GST_RTMP_DEFAULT_WINDOW_ACK_SIZE) {
1000 GST_WARNING_OBJECT (self,
1001 "peer requested small window ack size %" G_GUINT32_FORMAT,
1002 window_ack_size);
1003 }
1004
1005 g_mutex_lock (&self->stats_lock);
1006 self->in_window_ack_size = window_ack_size;
1007 g_mutex_unlock (&self->stats_lock);
1008 }
1009
1010 static gboolean
is_command_response(const gchar * command_name)1011 is_command_response (const gchar * command_name)
1012 {
1013 return g_strcmp0 (command_name, "_result") == 0 ||
1014 g_strcmp0 (command_name, "_error") == 0;
1015 }
1016
1017 static void
gst_rtmp_connection_handle_cm(GstRtmpConnection * sc,GstBuffer * buffer)1018 gst_rtmp_connection_handle_cm (GstRtmpConnection * sc, GstBuffer * buffer)
1019 {
1020 GstRtmpMeta *meta;
1021 gchar *command_name;
1022 gdouble transaction_id;
1023 GPtrArray *args;
1024
1025 meta = gst_buffer_get_rtmp_meta (buffer);
1026 g_return_if_fail (meta);
1027
1028 {
1029 GstMapInfo map;
1030 gst_buffer_map (buffer, &map, GST_MAP_READ);
1031 args = gst_amf_parse_command (map.data, map.size, &transaction_id,
1032 &command_name);
1033 gst_buffer_unmap (buffer, &map);
1034 }
1035
1036 if (!args) {
1037 return;
1038 }
1039
1040 if (!isfinite (transaction_id) || transaction_id < 0 ||
1041 transaction_id > G_MAXUINT) {
1042 GST_WARNING_OBJECT (sc,
1043 "Server sent command \"%s\" with extreme transaction ID %.0f",
1044 GST_STR_NULL (command_name), transaction_id);
1045 } else if (transaction_id > sc->transaction_count) {
1046 GST_WARNING_OBJECT (sc,
1047 "Server sent command \"%s\" with unused transaction ID (%.0f > %u)",
1048 GST_STR_NULL (command_name), transaction_id, sc->transaction_count);
1049 sc->transaction_count = transaction_id;
1050 }
1051
1052 GST_DEBUG_OBJECT (sc,
1053 "got control message \"%s\" transaction %.0f size %"
1054 G_GUINT32_FORMAT, GST_STR_NULL (command_name), transaction_id,
1055 meta->size);
1056
1057 if (is_command_response (command_name)) {
1058 if (transaction_id != 0) {
1059 GList *l;
1060
1061 for (l = sc->transactions; l; l = g_list_next (l)) {
1062 Transaction *t = l->data;
1063
1064 if (t->transaction_id != transaction_id) {
1065 continue;
1066 }
1067
1068 GST_LOG_OBJECT (sc, "calling transaction callback %s",
1069 GST_DEBUG_FUNCPTR_NAME (t->func));
1070 sc->transactions = g_list_remove_link (sc->transactions, l);
1071 t->func (command_name, args, t->user_data);
1072 g_list_free_full (l, transaction_free);
1073 break;
1074 }
1075 } else {
1076 GST_WARNING_OBJECT (sc, "Server sent response \"%s\" without transaction",
1077 GST_STR_NULL (command_name));
1078 }
1079 } else {
1080 GList *l;
1081
1082 if (transaction_id != 0) {
1083 GST_FIXME_OBJECT (sc, "Server sent command \"%s\" expecting reply",
1084 GST_STR_NULL (command_name));
1085 }
1086
1087 for (l = sc->expected_commands; l; l = g_list_next (l)) {
1088 ExpectedCommand *ec = l->data;
1089
1090 if (ec->stream_id != meta->mstream) {
1091 continue;
1092 }
1093
1094 if (g_strcmp0 (ec->command_name, command_name)) {
1095 continue;
1096 }
1097
1098 GST_LOG_OBJECT (sc, "calling expected command callback %s",
1099 GST_DEBUG_FUNCPTR_NAME (ec->func));
1100 sc->expected_commands = g_list_remove_link (sc->expected_commands, l);
1101 ec->func (command_name, args, ec->user_data);
1102 g_list_free_full (l, expected_command_free);
1103 break;
1104 }
1105 }
1106
1107 g_free (command_name);
1108 g_ptr_array_unref (args);
1109 }
1110
1111 static gboolean
start_write(gpointer user_data)1112 start_write (gpointer user_data)
1113 {
1114 GstRtmpConnection *sc = user_data;
1115 gst_rtmp_connection_start_write (sc);
1116 return G_SOURCE_REMOVE;
1117 }
1118
1119 void
gst_rtmp_connection_queue_message(GstRtmpConnection * self,GstBuffer * buffer)1120 gst_rtmp_connection_queue_message (GstRtmpConnection * self, GstBuffer * buffer)
1121 {
1122 g_return_if_fail (GST_IS_RTMP_CONNECTION (self));
1123 g_return_if_fail (GST_IS_BUFFER (buffer));
1124
1125 g_async_queue_push (self->output_queue, buffer);
1126 g_main_context_invoke_full (self->main_context, G_PRIORITY_DEFAULT,
1127 start_write, g_object_ref (self), g_object_unref);
1128 }
1129
1130 guint
gst_rtmp_connection_get_num_queued(GstRtmpConnection * connection)1131 gst_rtmp_connection_get_num_queued (GstRtmpConnection * connection)
1132 {
1133 return g_async_queue_length (connection->output_queue);
1134 }
1135
1136 guint
gst_rtmp_connection_send_command(GstRtmpConnection * connection,GstRtmpCommandCallback response_command,gpointer user_data,guint32 stream_id,const gchar * command_name,const GstAmfNode * argument,...)1137 gst_rtmp_connection_send_command (GstRtmpConnection * connection,
1138 GstRtmpCommandCallback response_command, gpointer user_data,
1139 guint32 stream_id, const gchar * command_name, const GstAmfNode * argument,
1140 ...)
1141 {
1142 GstBuffer *buffer;
1143 gdouble transaction_id = 0;
1144 va_list ap;
1145 GBytes *payload;
1146 guint8 *data;
1147 gsize size;
1148
1149 g_return_val_if_fail (GST_IS_RTMP_CONNECTION (connection), 0);
1150
1151 if (connection->thread != g_thread_self ()) {
1152 GST_ERROR_OBJECT (connection, "Called from wrong thread");
1153 }
1154
1155 GST_DEBUG_OBJECT (connection,
1156 "Sending command '%s' on stream id %" G_GUINT32_FORMAT,
1157 command_name, stream_id);
1158
1159 if (response_command) {
1160 Transaction *t;
1161
1162 transaction_id = ++connection->transaction_count;
1163
1164 GST_LOG_OBJECT (connection, "Registering %s for transid %.0f",
1165 GST_DEBUG_FUNCPTR_NAME (response_command), transaction_id);
1166
1167 t = transaction_new (transaction_id, response_command, user_data);
1168
1169 connection->transactions = g_list_append (connection->transactions, t);
1170 }
1171
1172 va_start (ap, argument);
1173 payload = gst_amf_serialize_command_valist (transaction_id,
1174 command_name, argument, ap);
1175 va_end (ap);
1176
1177 data = g_bytes_unref_to_data (payload, &size);
1178 buffer = gst_rtmp_message_new_wrapped (GST_RTMP_MESSAGE_TYPE_COMMAND_AMF0,
1179 3, stream_id, data, size);
1180
1181 gst_rtmp_connection_queue_message (connection, buffer);
1182 return transaction_id;
1183 }
1184
1185 void
gst_rtmp_connection_expect_command(GstRtmpConnection * connection,GstRtmpCommandCallback response_command,gpointer user_data,guint32 stream_id,const gchar * command_name)1186 gst_rtmp_connection_expect_command (GstRtmpConnection * connection,
1187 GstRtmpCommandCallback response_command, gpointer user_data,
1188 guint32 stream_id, const gchar * command_name)
1189 {
1190 ExpectedCommand *ec;
1191
1192 g_return_if_fail (response_command);
1193 g_return_if_fail (command_name);
1194 g_return_if_fail (!is_command_response (command_name));
1195
1196 GST_LOG_OBJECT (connection,
1197 "Registering %s for stream id %" G_GUINT32_FORMAT " name \"%s\"",
1198 GST_DEBUG_FUNCPTR_NAME (response_command), stream_id, command_name);
1199
1200 ec = expected_command_new (stream_id, command_name, response_command,
1201 user_data);
1202
1203 connection->expected_commands =
1204 g_list_append (connection->expected_commands, ec);
1205 }
1206
1207 static void
gst_rtmp_connection_send_ack(GstRtmpConnection * connection)1208 gst_rtmp_connection_send_ack (GstRtmpConnection * connection)
1209 {
1210 guint64 in_bytes_total = connection->in_bytes_total;
1211 GstRtmpProtocolControl pc = {
1212 .type = GST_RTMP_MESSAGE_TYPE_ACKNOWLEDGEMENT,
1213 .param = (guint32) in_bytes_total,
1214 };
1215
1216 gst_rtmp_connection_queue_message (connection,
1217 gst_rtmp_message_new_protocol_control (&pc));
1218
1219 g_mutex_lock (&connection->stats_lock);
1220 connection->in_bytes_acked = in_bytes_total;
1221 g_mutex_unlock (&connection->stats_lock);
1222 }
1223
1224 static void
gst_rtmp_connection_send_ping_response(GstRtmpConnection * connection,guint32 event_data)1225 gst_rtmp_connection_send_ping_response (GstRtmpConnection * connection,
1226 guint32 event_data)
1227 {
1228 GstRtmpUserControl uc = {
1229 .type = GST_RTMP_USER_CONTROL_TYPE_PING_RESPONSE,
1230 .param = event_data,
1231 };
1232
1233 gst_rtmp_connection_queue_message (connection,
1234 gst_rtmp_message_new_user_control (&uc));
1235 }
1236
1237 void
gst_rtmp_connection_set_chunk_size(GstRtmpConnection * connection,guint32 chunk_size)1238 gst_rtmp_connection_set_chunk_size (GstRtmpConnection * connection,
1239 guint32 chunk_size)
1240 {
1241 GstRtmpProtocolControl pc = {
1242 .type = GST_RTMP_MESSAGE_TYPE_SET_CHUNK_SIZE,
1243 .param = chunk_size,
1244 };
1245
1246 g_return_if_fail (GST_IS_RTMP_CONNECTION (connection));
1247
1248 gst_rtmp_connection_queue_message (connection,
1249 gst_rtmp_message_new_protocol_control (&pc));
1250 }
1251
1252 void
gst_rtmp_connection_request_window_size(GstRtmpConnection * connection,guint32 window_ack_size)1253 gst_rtmp_connection_request_window_size (GstRtmpConnection * connection,
1254 guint32 window_ack_size)
1255 {
1256 GstRtmpProtocolControl pc = {
1257 .type = GST_RTMP_MESSAGE_TYPE_WINDOW_ACK_SIZE,
1258 .param = window_ack_size,
1259 };
1260
1261 g_return_if_fail (GST_IS_RTMP_CONNECTION (connection));
1262
1263 gst_rtmp_connection_queue_message (connection,
1264 gst_rtmp_message_new_protocol_control (&pc));
1265 }
1266
1267 void
gst_rtmp_connection_set_data_frame(GstRtmpConnection * connection,GstBuffer * buffer)1268 gst_rtmp_connection_set_data_frame (GstRtmpConnection * connection,
1269 GstBuffer * buffer)
1270 {
1271 g_return_if_fail (GST_IS_RTMP_CONNECTION (connection));
1272 g_return_if_fail (GST_IS_BUFFER (buffer));
1273
1274 gst_buffer_prepend_memory (buffer, gst_memory_ref (set_data_frame_value));
1275 gst_rtmp_connection_queue_message (connection, buffer);
1276 }
1277
1278 static gboolean
gst_rtmp_connection_prepare_protocol_control(GstRtmpConnection * self,GstBuffer * buffer)1279 gst_rtmp_connection_prepare_protocol_control (GstRtmpConnection * self,
1280 GstBuffer * buffer)
1281 {
1282 GstRtmpProtocolControl pc;
1283
1284 if (!gst_rtmp_message_parse_protocol_control (buffer, &pc)) {
1285 GST_ERROR_OBJECT (self, "can't parse protocol control message");
1286 return FALSE;
1287 }
1288
1289 switch (pc.type) {
1290 case GST_RTMP_MESSAGE_TYPE_SET_CHUNK_SIZE:{
1291 guint32 chunk_size = pc.param;
1292
1293 GST_INFO_OBJECT (self, "pending chunk size %" G_GUINT32_FORMAT,
1294 chunk_size);
1295
1296 if (chunk_size < GST_RTMP_MINIMUM_CHUNK_SIZE) {
1297 GST_ERROR_OBJECT (self,
1298 "requested chunk size %" G_GUINT32_FORMAT " is too small",
1299 chunk_size);
1300 return FALSE;
1301 }
1302
1303 if (chunk_size > GST_RTMP_MAXIMUM_CHUNK_SIZE) {
1304 GST_ERROR_OBJECT (self,
1305 "requested chunk size %" G_GUINT32_FORMAT " is too large",
1306 chunk_size);
1307 return FALSE;
1308 }
1309
1310 if (chunk_size < GST_RTMP_DEFAULT_CHUNK_SIZE) {
1311 GST_WARNING_OBJECT (self,
1312 "requesting small chunk size %" G_GUINT32_FORMAT, chunk_size);
1313 }
1314
1315 self->out_chunk_size_pending = pc.param;
1316 break;
1317 }
1318
1319 case GST_RTMP_MESSAGE_TYPE_WINDOW_ACK_SIZE:{
1320 guint32 window_ack_size = pc.param;
1321
1322 GST_INFO_OBJECT (self, "pending window ack size: %" G_GUINT32_FORMAT,
1323 window_ack_size);
1324
1325 if (window_ack_size < GST_RTMP_DEFAULT_WINDOW_ACK_SIZE) {
1326 GST_WARNING_OBJECT (self,
1327 "requesting small window ack size %" G_GUINT32_FORMAT,
1328 window_ack_size);
1329 }
1330
1331 self->out_window_ack_size_pending = window_ack_size;
1332 break;
1333 }
1334
1335 default:
1336 break;
1337 }
1338
1339 return TRUE;
1340 }
1341
1342 static void
gst_rtmp_connection_apply_protocol_control(GstRtmpConnection * self)1343 gst_rtmp_connection_apply_protocol_control (GstRtmpConnection * self)
1344 {
1345 guint32 chunk_size, window_ack_size;
1346
1347 chunk_size = self->out_chunk_size_pending;
1348 if (chunk_size) {
1349 self->out_chunk_size_pending = 0;
1350
1351 g_mutex_lock (&self->stats_lock);
1352 self->out_chunk_size = chunk_size;
1353 g_mutex_unlock (&self->stats_lock);
1354
1355 GST_INFO_OBJECT (self, "applied chunk size %" G_GUINT32_FORMAT, chunk_size);
1356 }
1357
1358 window_ack_size = self->out_window_ack_size_pending;
1359 if (window_ack_size) {
1360 self->out_window_ack_size_pending = 0;
1361
1362 g_mutex_lock (&self->stats_lock);
1363 self->out_window_ack_size = window_ack_size;
1364 g_mutex_unlock (&self->stats_lock);
1365
1366 GST_INFO_OBJECT (self, "applied window ack size %" G_GUINT32_FORMAT,
1367 window_ack_size);
1368 }
1369 }
1370
1371 static GstStructure *
get_stats(GstRtmpConnection * self)1372 get_stats (GstRtmpConnection * self)
1373 {
1374 return gst_structure_new ("GstRtmpConnectionStats",
1375 "in-chunk-size", G_TYPE_UINT, self ? self->in_chunk_size : 0,
1376 "out-chunk-size", G_TYPE_UINT, self ? self->out_chunk_size : 0,
1377 "in-window-ack-size", G_TYPE_UINT, self ? self->in_window_ack_size : 0,
1378 "out-window-ack-size", G_TYPE_UINT, self ? self->out_window_ack_size : 0,
1379 "in-bytes-total", G_TYPE_UINT64, self ? self->in_bytes_total : 0,
1380 "out-bytes-total", G_TYPE_UINT64, self ? self->out_bytes_total : 0,
1381 "in-bytes-acked", G_TYPE_UINT64, self ? self->in_bytes_acked : 0,
1382 "out-bytes-acked", G_TYPE_UINT64, self ? self->out_bytes_acked : 0, NULL);
1383 }
1384
1385 GstStructure *
gst_rtmp_connection_get_null_stats(void)1386 gst_rtmp_connection_get_null_stats (void)
1387 {
1388 return get_stats (NULL);
1389 }
1390
1391 GstStructure *
gst_rtmp_connection_get_stats(GstRtmpConnection * self)1392 gst_rtmp_connection_get_stats (GstRtmpConnection * self)
1393 {
1394 GstStructure *s;
1395
1396 g_return_val_if_fail (GST_IS_RTMP_CONNECTION (self), NULL);
1397
1398 g_mutex_lock (&self->stats_lock);
1399 s = get_stats (self);
1400 g_mutex_unlock (&self->stats_lock);
1401
1402 return s;
1403 }
1404