• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* GStreamer RTMP Library
2  * Copyright (C) 2013 David Schleef <ds@schleef.org>
3  * Copyright (C) 2017 Make.TV, Inc. <info@make.tv>
4  *   Contact: Jan Alexander Steffens (heftig) <jsteffens@make.tv>
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
19  * Boston, MA 02110-1335, USA.
20  */
21 
22 #ifdef HAVE_CONFIG_H
23 #include "config.h"
24 #endif
25 
26 #include "rtmputils.h"
27 #include <string.h>
28 
29 static void read_all_bytes_done (GObject * source, GAsyncResult * result,
30     gpointer user_data);
31 static void write_all_bytes_done (GObject * source, GAsyncResult * result,
32     gpointer user_data);
33 static void write_all_buffer_done (GObject * source, GAsyncResult * result,
34     gpointer user_data);
35 
36 void
gst_rtmp_byte_array_append_bytes(GByteArray * bytearray,GBytes * bytes)37 gst_rtmp_byte_array_append_bytes (GByteArray * bytearray, GBytes * bytes)
38 {
39   const guint8 *data;
40   gsize size;
41   guint offset;
42 
43   g_return_if_fail (bytearray);
44 
45   offset = bytearray->len;
46   data = g_bytes_get_data (bytes, &size);
47 
48   g_return_if_fail (data);
49 
50   g_byte_array_set_size (bytearray, offset + size);
51   memcpy (bytearray->data + offset, data, size);
52 }
53 
54 void
gst_rtmp_input_stream_read_all_bytes_async(GInputStream * stream,gsize count,int io_priority,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer user_data)55 gst_rtmp_input_stream_read_all_bytes_async (GInputStream * stream, gsize count,
56     int io_priority, GCancellable * cancellable, GAsyncReadyCallback callback,
57     gpointer user_data)
58 {
59   GTask *task;
60   GByteArray *ba;
61 
62   g_return_if_fail (G_IS_INPUT_STREAM (stream));
63 
64   task = g_task_new (stream, cancellable, callback, user_data);
65 
66   ba = g_byte_array_sized_new (count);
67   g_byte_array_set_size (ba, count);
68   g_task_set_task_data (task, ba, (GDestroyNotify) g_byte_array_unref);
69 
70   g_input_stream_read_all_async (stream, ba->data, count, io_priority,
71       cancellable, read_all_bytes_done, task);
72 }
73 
74 static void
read_all_bytes_done(GObject * source,GAsyncResult * result,gpointer user_data)75 read_all_bytes_done (GObject * source, GAsyncResult * result,
76     gpointer user_data)
77 {
78   GInputStream *is = G_INPUT_STREAM (source);
79   GTask *task = user_data;
80   GByteArray *ba = g_task_get_task_data (task);
81   GError *error = NULL;
82   gboolean res;
83   gsize bytes_read;
84   GBytes *bytes;
85 
86   res = g_input_stream_read_all_finish (is, result, &bytes_read, &error);
87   if (!res) {
88     g_task_return_error (task, error);
89     g_object_unref (task);
90     return;
91   }
92 
93   g_byte_array_set_size (ba, bytes_read);
94   bytes = g_byte_array_free_to_bytes (g_byte_array_ref (ba));
95 
96   g_task_return_pointer (task, bytes, (GDestroyNotify) g_bytes_unref);
97   g_object_unref (task);
98 }
99 
100 GBytes *
gst_rtmp_input_stream_read_all_bytes_finish(GInputStream * stream,GAsyncResult * result,GError ** error)101 gst_rtmp_input_stream_read_all_bytes_finish (GInputStream * stream,
102     GAsyncResult * result, GError ** error)
103 {
104   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
105   return g_task_propagate_pointer (G_TASK (result), error);
106 }
107 
108 void
gst_rtmp_output_stream_write_all_bytes_async(GOutputStream * stream,GBytes * bytes,int io_priority,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer user_data)109 gst_rtmp_output_stream_write_all_bytes_async (GOutputStream * stream,
110     GBytes * bytes, int io_priority, GCancellable * cancellable,
111     GAsyncReadyCallback callback, gpointer user_data)
112 {
113   GTask *task;
114   const void *data;
115   gsize size;
116 
117   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
118   g_return_if_fail (bytes);
119 
120   data = g_bytes_get_data (bytes, &size);
121   g_return_if_fail (data);
122 
123   task = g_task_new (stream, cancellable, callback, user_data);
124   g_task_set_task_data (task, g_bytes_ref (bytes),
125       (GDestroyNotify) g_bytes_unref);
126 
127   g_output_stream_write_all_async (stream, data, size, io_priority,
128       cancellable, write_all_bytes_done, task);
129 }
130 
131 static void
write_all_bytes_done(GObject * source,GAsyncResult * result,gpointer user_data)132 write_all_bytes_done (GObject * source, GAsyncResult * result,
133     gpointer user_data)
134 {
135   GOutputStream *os = G_OUTPUT_STREAM (source);
136   GTask *task = user_data;
137   GError *error = NULL;
138   gboolean res;
139 
140   res = g_output_stream_write_all_finish (os, result, NULL, &error);
141   if (!res) {
142     g_task_return_error (task, error);
143     g_object_unref (task);
144     return;
145   }
146 
147   g_task_return_boolean (task, TRUE);
148   g_object_unref (task);
149 }
150 
151 gboolean
gst_rtmp_output_stream_write_all_bytes_finish(GOutputStream * stream,GAsyncResult * result,GError ** error)152 gst_rtmp_output_stream_write_all_bytes_finish (GOutputStream * stream,
153     GAsyncResult * result, GError ** error)
154 {
155   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
156   return g_task_propagate_boolean (G_TASK (result), error);
157 }
158 
159 typedef struct
160 {
161   GstBuffer *buffer;
162   GstMapInfo map;
163   gboolean mapped;
164   gsize bytes_written;
165 } WriteAllBufferData;
166 
167 static WriteAllBufferData *
write_all_buffer_data_new(GstBuffer * buffer)168 write_all_buffer_data_new (GstBuffer * buffer)
169 {
170   WriteAllBufferData *data = g_slice_new0 (WriteAllBufferData);
171   data->buffer = gst_buffer_ref (buffer);
172   return data;
173 }
174 
175 static void
write_all_buffer_data_free(gpointer ptr)176 write_all_buffer_data_free (gpointer ptr)
177 {
178   WriteAllBufferData *data = ptr;
179   if (data->mapped) {
180     gst_buffer_unmap (data->buffer, &data->map);
181   }
182   g_clear_pointer (&data->buffer, gst_buffer_unref);
183   g_slice_free (WriteAllBufferData, data);
184 }
185 
186 void
gst_rtmp_output_stream_write_all_buffer_async(GOutputStream * stream,GstBuffer * buffer,int io_priority,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer user_data)187 gst_rtmp_output_stream_write_all_buffer_async (GOutputStream * stream,
188     GstBuffer * buffer, int io_priority, GCancellable * cancellable,
189     GAsyncReadyCallback callback, gpointer user_data)
190 {
191   GTask *task;
192   WriteAllBufferData *data;
193 
194   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
195   g_return_if_fail (GST_IS_BUFFER (buffer));
196 
197   task = g_task_new (stream, cancellable, callback, user_data);
198 
199   data = write_all_buffer_data_new (buffer);
200   g_task_set_task_data (task, data, write_all_buffer_data_free);
201 
202   if (!gst_buffer_map (buffer, &data->map, GST_MAP_READ)) {
203     g_task_return_new_error (task, GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_READ,
204         "Failed to map buffer for reading");
205     g_object_unref (task);
206     return;
207   }
208 
209   data->mapped = TRUE;
210 
211   g_output_stream_write_all_async (stream, data->map.data, data->map.size,
212       io_priority, cancellable, write_all_buffer_done, task);
213 }
214 
215 static void
write_all_buffer_done(GObject * source,GAsyncResult * result,gpointer user_data)216 write_all_buffer_done (GObject * source, GAsyncResult * result,
217     gpointer user_data)
218 {
219   GOutputStream *os = G_OUTPUT_STREAM (source);
220   GTask *task = user_data;
221   WriteAllBufferData *data = g_task_get_task_data (task);
222   GError *error = NULL;
223   gboolean res;
224 
225   res = g_output_stream_write_all_finish (os, result, &data->bytes_written,
226       &error);
227 
228   gst_buffer_unmap (data->buffer, &data->map);
229   data->mapped = FALSE;
230 
231   if (!res) {
232     g_task_return_error (task, error);
233     g_object_unref (task);
234     return;
235   }
236 
237   g_task_return_boolean (task, TRUE);
238   g_object_unref (task);
239 }
240 
241 
242 gboolean
gst_rtmp_output_stream_write_all_buffer_finish(GOutputStream * stream,GAsyncResult * result,gsize * bytes_written,GError ** error)243 gst_rtmp_output_stream_write_all_buffer_finish (GOutputStream * stream,
244     GAsyncResult * result, gsize * bytes_written, GError ** error)
245 {
246   WriteAllBufferData *data;
247   GTask *task;
248 
249   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
250   task = G_TASK (result);
251 
252   data = g_task_get_task_data (task);
253   if (bytes_written) {
254     *bytes_written = data->bytes_written;
255   }
256 
257   return g_task_propagate_boolean (task, error);
258 }
259 
260 static const gchar ascii_table[128] = {
261   0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
262   0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
263   0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
264   0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
265   ' ', '!', 0x0, '#', '$', '%', '&', '\'',
266   '(', ')', '*', '+', ',', '-', '.', '/',
267   '0', '1', '2', '3', '4', '5', '6', '7',
268   '8', '9', ':', ';', '<', '=', '>', '?',
269   '@', 'A', 'B', 'C', 'D', 'E', 'F', 'G',
270   'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O',
271   'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W',
272   'X', 'Y', 'Z', '[', 0x0, ']', '^', '_',
273   '`', 'a', 'b', 'c', 'd', 'e', 'f', 'g',
274   'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o',
275   'p', 'q', 'r', 's', 't', 'u', 'v', 'w',
276   'x', 'y', 'z', '{', '|', '}', '~', 0x0,
277 };
278 
279 static const gchar ascii_escapes[128] = {
280   0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 'a',
281   'b', 't', 'n', 'v', 'f', 'r', 0x0, 0x0,
282   0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
283   0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
284   0x0, 0x0, '"', 0x0, 0x0, 0x0, 0x0, 0x0,
285   0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
286   0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
287   0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
288   0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
289   0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
290   0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
291   0x0, 0x0, 0x0, 0x0, '\\', 0x0, 0x0, 0x0,
292   0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
293   0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
294   0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
295   0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
296 };
297 
298 void
gst_rtmp_string_print_escaped(GString * string,const gchar * data,gssize size)299 gst_rtmp_string_print_escaped (GString * string, const gchar * data,
300     gssize size)
301 {
302   gssize i;
303 
304   g_return_if_fail (string);
305 
306   if (!data) {
307     g_string_append (string, "(NULL)");
308     return;
309   }
310 
311   g_string_append_c (string, '"');
312 
313   for (i = 0; size < 0 ? data[i] != 0 : i < size; i++) {
314     guchar c = data[i];
315 
316     if (G_LIKELY (c < G_N_ELEMENTS (ascii_table))) {
317       if (ascii_table[c]) {
318         g_string_append_c (string, c);
319         continue;
320       }
321 
322       if (ascii_escapes[c]) {
323         g_string_append_c (string, '\\');
324         g_string_append_c (string, ascii_escapes[c]);
325         continue;
326       }
327     } else {
328       gunichar uc = g_utf8_get_char_validated (data + i,
329           size < 0 ? -1 : size - i);
330       if (uc != (gunichar) (-2) && uc != (gunichar) (-1)) {
331         if (g_unichar_isprint (uc)) {
332           g_string_append_unichar (string, uc);
333         } else if (uc <= G_MAXUINT16) {
334           g_string_append_printf (string, "\\u%04X", uc);
335         } else {
336           g_string_append_printf (string, "\\U%08X", uc);
337         }
338 
339         i += g_utf8_skip[c] - 1;
340         continue;
341       }
342     }
343 
344     g_string_append_printf (string, "\\x%02X", c);
345   }
346 
347   g_string_append_c (string, '"');
348 
349 }
350 
351 gboolean
gst_rtmp_flv_tag_parse_header(GstRtmpFlvTagHeader * header,const guint8 * data,gsize size)352 gst_rtmp_flv_tag_parse_header (GstRtmpFlvTagHeader * header,
353     const guint8 * data, gsize size)
354 {
355   g_return_val_if_fail (header, FALSE);
356   g_return_val_if_fail (data, FALSE);
357 
358   /* Parse FLVTAG header as described in
359    * video_file_format_spec_v10.pdf page 5 (page 9 of the PDF) */
360 
361   if (size < GST_RTMP_FLV_TAG_HEADER_SIZE) {
362     return FALSE;
363   }
364 
365   /* TagType UI8 */
366   header->type = GST_READ_UINT8 (data);
367 
368   /* DataSize UI24 */
369   header->payload_size = GST_READ_UINT24_BE (data + 1);
370 
371   /* 4 bytes for the PreviousTagSize UI32 following every tag */
372   header->total_size = GST_RTMP_FLV_TAG_HEADER_SIZE + header->payload_size + 4;
373 
374   /* Timestamp UI24 + TimestampExtended UI8 */
375   header->timestamp = GST_READ_UINT24_BE (data + 4);
376   header->timestamp |= (guint32) GST_READ_UINT8 (data + 7) << 24;
377 
378   /* Skip StreamID UI24. It's "always 0" for FLV files and for aggregated RTMP
379    * messages we're supposed to use the Stream ID from the AGGREGATE. */
380 
381   return TRUE;
382 }
383