1 /* GStreamer RTMP Library
2 * Copyright (C) 2013 David Schleef <ds@schleef.org>
3 * Copyright (C) 2017 Make.TV, Inc. <info@make.tv>
4 * Contact: Jan Alexander Steffens (heftig) <jsteffens@make.tv>
5 *
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Library General Public
8 * License as published by the Free Software Foundation; either
9 * version 2 of the License, or (at your option) any later version.
10 *
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Library General Public License for more details.
15 *
16 * You should have received a copy of the GNU Library General Public
17 * License along with this library; if not, write to the
18 * Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
19 * Boston, MA 02110-1335, USA.
20 */
21
22 #ifdef HAVE_CONFIG_H
23 #include "config.h"
24 #endif
25
26 #include "rtmputils.h"
27 #include <string.h>
28
29 static void read_all_bytes_done (GObject * source, GAsyncResult * result,
30 gpointer user_data);
31 static void write_all_bytes_done (GObject * source, GAsyncResult * result,
32 gpointer user_data);
33 static void write_all_buffer_done (GObject * source, GAsyncResult * result,
34 gpointer user_data);
35
36 void
gst_rtmp_byte_array_append_bytes(GByteArray * bytearray,GBytes * bytes)37 gst_rtmp_byte_array_append_bytes (GByteArray * bytearray, GBytes * bytes)
38 {
39 const guint8 *data;
40 gsize size;
41 guint offset;
42
43 g_return_if_fail (bytearray);
44
45 offset = bytearray->len;
46 data = g_bytes_get_data (bytes, &size);
47
48 g_return_if_fail (data);
49
50 g_byte_array_set_size (bytearray, offset + size);
51 memcpy (bytearray->data + offset, data, size);
52 }
53
54 void
gst_rtmp_input_stream_read_all_bytes_async(GInputStream * stream,gsize count,int io_priority,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer user_data)55 gst_rtmp_input_stream_read_all_bytes_async (GInputStream * stream, gsize count,
56 int io_priority, GCancellable * cancellable, GAsyncReadyCallback callback,
57 gpointer user_data)
58 {
59 GTask *task;
60 GByteArray *ba;
61
62 g_return_if_fail (G_IS_INPUT_STREAM (stream));
63
64 task = g_task_new (stream, cancellable, callback, user_data);
65
66 ba = g_byte_array_sized_new (count);
67 g_byte_array_set_size (ba, count);
68 g_task_set_task_data (task, ba, (GDestroyNotify) g_byte_array_unref);
69
70 g_input_stream_read_all_async (stream, ba->data, count, io_priority,
71 cancellable, read_all_bytes_done, task);
72 }
73
74 static void
read_all_bytes_done(GObject * source,GAsyncResult * result,gpointer user_data)75 read_all_bytes_done (GObject * source, GAsyncResult * result,
76 gpointer user_data)
77 {
78 GInputStream *is = G_INPUT_STREAM (source);
79 GTask *task = user_data;
80 GByteArray *ba = g_task_get_task_data (task);
81 GError *error = NULL;
82 gboolean res;
83 gsize bytes_read;
84 GBytes *bytes;
85
86 res = g_input_stream_read_all_finish (is, result, &bytes_read, &error);
87 if (!res) {
88 g_task_return_error (task, error);
89 g_object_unref (task);
90 return;
91 }
92
93 g_byte_array_set_size (ba, bytes_read);
94 bytes = g_byte_array_free_to_bytes (g_byte_array_ref (ba));
95
96 g_task_return_pointer (task, bytes, (GDestroyNotify) g_bytes_unref);
97 g_object_unref (task);
98 }
99
100 GBytes *
gst_rtmp_input_stream_read_all_bytes_finish(GInputStream * stream,GAsyncResult * result,GError ** error)101 gst_rtmp_input_stream_read_all_bytes_finish (GInputStream * stream,
102 GAsyncResult * result, GError ** error)
103 {
104 g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
105 return g_task_propagate_pointer (G_TASK (result), error);
106 }
107
108 void
gst_rtmp_output_stream_write_all_bytes_async(GOutputStream * stream,GBytes * bytes,int io_priority,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer user_data)109 gst_rtmp_output_stream_write_all_bytes_async (GOutputStream * stream,
110 GBytes * bytes, int io_priority, GCancellable * cancellable,
111 GAsyncReadyCallback callback, gpointer user_data)
112 {
113 GTask *task;
114 const void *data;
115 gsize size;
116
117 g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
118 g_return_if_fail (bytes);
119
120 data = g_bytes_get_data (bytes, &size);
121 g_return_if_fail (data);
122
123 task = g_task_new (stream, cancellable, callback, user_data);
124 g_task_set_task_data (task, g_bytes_ref (bytes),
125 (GDestroyNotify) g_bytes_unref);
126
127 g_output_stream_write_all_async (stream, data, size, io_priority,
128 cancellable, write_all_bytes_done, task);
129 }
130
131 static void
write_all_bytes_done(GObject * source,GAsyncResult * result,gpointer user_data)132 write_all_bytes_done (GObject * source, GAsyncResult * result,
133 gpointer user_data)
134 {
135 GOutputStream *os = G_OUTPUT_STREAM (source);
136 GTask *task = user_data;
137 GError *error = NULL;
138 gboolean res;
139
140 res = g_output_stream_write_all_finish (os, result, NULL, &error);
141 if (!res) {
142 g_task_return_error (task, error);
143 g_object_unref (task);
144 return;
145 }
146
147 g_task_return_boolean (task, TRUE);
148 g_object_unref (task);
149 }
150
151 gboolean
gst_rtmp_output_stream_write_all_bytes_finish(GOutputStream * stream,GAsyncResult * result,GError ** error)152 gst_rtmp_output_stream_write_all_bytes_finish (GOutputStream * stream,
153 GAsyncResult * result, GError ** error)
154 {
155 g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
156 return g_task_propagate_boolean (G_TASK (result), error);
157 }
158
159 typedef struct
160 {
161 GstBuffer *buffer;
162 GstMapInfo map;
163 gboolean mapped;
164 gsize bytes_written;
165 } WriteAllBufferData;
166
167 static WriteAllBufferData *
write_all_buffer_data_new(GstBuffer * buffer)168 write_all_buffer_data_new (GstBuffer * buffer)
169 {
170 WriteAllBufferData *data = g_slice_new0 (WriteAllBufferData);
171 data->buffer = gst_buffer_ref (buffer);
172 return data;
173 }
174
175 static void
write_all_buffer_data_free(gpointer ptr)176 write_all_buffer_data_free (gpointer ptr)
177 {
178 WriteAllBufferData *data = ptr;
179 if (data->mapped) {
180 gst_buffer_unmap (data->buffer, &data->map);
181 }
182 g_clear_pointer (&data->buffer, gst_buffer_unref);
183 g_slice_free (WriteAllBufferData, data);
184 }
185
186 void
gst_rtmp_output_stream_write_all_buffer_async(GOutputStream * stream,GstBuffer * buffer,int io_priority,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer user_data)187 gst_rtmp_output_stream_write_all_buffer_async (GOutputStream * stream,
188 GstBuffer * buffer, int io_priority, GCancellable * cancellable,
189 GAsyncReadyCallback callback, gpointer user_data)
190 {
191 GTask *task;
192 WriteAllBufferData *data;
193
194 g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
195 g_return_if_fail (GST_IS_BUFFER (buffer));
196
197 task = g_task_new (stream, cancellable, callback, user_data);
198
199 data = write_all_buffer_data_new (buffer);
200 g_task_set_task_data (task, data, write_all_buffer_data_free);
201
202 if (!gst_buffer_map (buffer, &data->map, GST_MAP_READ)) {
203 g_task_return_new_error (task, GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_READ,
204 "Failed to map buffer for reading");
205 g_object_unref (task);
206 return;
207 }
208
209 data->mapped = TRUE;
210
211 g_output_stream_write_all_async (stream, data->map.data, data->map.size,
212 io_priority, cancellable, write_all_buffer_done, task);
213 }
214
215 static void
write_all_buffer_done(GObject * source,GAsyncResult * result,gpointer user_data)216 write_all_buffer_done (GObject * source, GAsyncResult * result,
217 gpointer user_data)
218 {
219 GOutputStream *os = G_OUTPUT_STREAM (source);
220 GTask *task = user_data;
221 WriteAllBufferData *data = g_task_get_task_data (task);
222 GError *error = NULL;
223 gboolean res;
224
225 res = g_output_stream_write_all_finish (os, result, &data->bytes_written,
226 &error);
227
228 gst_buffer_unmap (data->buffer, &data->map);
229 data->mapped = FALSE;
230
231 if (!res) {
232 g_task_return_error (task, error);
233 g_object_unref (task);
234 return;
235 }
236
237 g_task_return_boolean (task, TRUE);
238 g_object_unref (task);
239 }
240
241
242 gboolean
gst_rtmp_output_stream_write_all_buffer_finish(GOutputStream * stream,GAsyncResult * result,gsize * bytes_written,GError ** error)243 gst_rtmp_output_stream_write_all_buffer_finish (GOutputStream * stream,
244 GAsyncResult * result, gsize * bytes_written, GError ** error)
245 {
246 WriteAllBufferData *data;
247 GTask *task;
248
249 g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
250 task = G_TASK (result);
251
252 data = g_task_get_task_data (task);
253 if (bytes_written) {
254 *bytes_written = data->bytes_written;
255 }
256
257 return g_task_propagate_boolean (task, error);
258 }
259
260 static const gchar ascii_table[128] = {
261 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
262 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
263 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
264 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
265 ' ', '!', 0x0, '#', '$', '%', '&', '\'',
266 '(', ')', '*', '+', ',', '-', '.', '/',
267 '0', '1', '2', '3', '4', '5', '6', '7',
268 '8', '9', ':', ';', '<', '=', '>', '?',
269 '@', 'A', 'B', 'C', 'D', 'E', 'F', 'G',
270 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O',
271 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W',
272 'X', 'Y', 'Z', '[', 0x0, ']', '^', '_',
273 '`', 'a', 'b', 'c', 'd', 'e', 'f', 'g',
274 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o',
275 'p', 'q', 'r', 's', 't', 'u', 'v', 'w',
276 'x', 'y', 'z', '{', '|', '}', '~', 0x0,
277 };
278
279 static const gchar ascii_escapes[128] = {
280 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 'a',
281 'b', 't', 'n', 'v', 'f', 'r', 0x0, 0x0,
282 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
283 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
284 0x0, 0x0, '"', 0x0, 0x0, 0x0, 0x0, 0x0,
285 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
286 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
287 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
288 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
289 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
290 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
291 0x0, 0x0, 0x0, 0x0, '\\', 0x0, 0x0, 0x0,
292 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
293 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
294 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
295 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
296 };
297
298 void
gst_rtmp_string_print_escaped(GString * string,const gchar * data,gssize size)299 gst_rtmp_string_print_escaped (GString * string, const gchar * data,
300 gssize size)
301 {
302 gssize i;
303
304 g_return_if_fail (string);
305
306 if (!data) {
307 g_string_append (string, "(NULL)");
308 return;
309 }
310
311 g_string_append_c (string, '"');
312
313 for (i = 0; size < 0 ? data[i] != 0 : i < size; i++) {
314 guchar c = data[i];
315
316 if (G_LIKELY (c < G_N_ELEMENTS (ascii_table))) {
317 if (ascii_table[c]) {
318 g_string_append_c (string, c);
319 continue;
320 }
321
322 if (ascii_escapes[c]) {
323 g_string_append_c (string, '\\');
324 g_string_append_c (string, ascii_escapes[c]);
325 continue;
326 }
327 } else {
328 gunichar uc = g_utf8_get_char_validated (data + i,
329 size < 0 ? -1 : size - i);
330 if (uc != (gunichar) (-2) && uc != (gunichar) (-1)) {
331 if (g_unichar_isprint (uc)) {
332 g_string_append_unichar (string, uc);
333 } else if (uc <= G_MAXUINT16) {
334 g_string_append_printf (string, "\\u%04X", uc);
335 } else {
336 g_string_append_printf (string, "\\U%08X", uc);
337 }
338
339 i += g_utf8_skip[c] - 1;
340 continue;
341 }
342 }
343
344 g_string_append_printf (string, "\\x%02X", c);
345 }
346
347 g_string_append_c (string, '"');
348
349 }
350
351 gboolean
gst_rtmp_flv_tag_parse_header(GstRtmpFlvTagHeader * header,const guint8 * data,gsize size)352 gst_rtmp_flv_tag_parse_header (GstRtmpFlvTagHeader * header,
353 const guint8 * data, gsize size)
354 {
355 g_return_val_if_fail (header, FALSE);
356 g_return_val_if_fail (data, FALSE);
357
358 /* Parse FLVTAG header as described in
359 * video_file_format_spec_v10.pdf page 5 (page 9 of the PDF) */
360
361 if (size < GST_RTMP_FLV_TAG_HEADER_SIZE) {
362 return FALSE;
363 }
364
365 /* TagType UI8 */
366 header->type = GST_READ_UINT8 (data);
367
368 /* DataSize UI24 */
369 header->payload_size = GST_READ_UINT24_BE (data + 1);
370
371 /* 4 bytes for the PreviousTagSize UI32 following every tag */
372 header->total_size = GST_RTMP_FLV_TAG_HEADER_SIZE + header->payload_size + 4;
373
374 /* Timestamp UI24 + TimestampExtended UI8 */
375 header->timestamp = GST_READ_UINT24_BE (data + 4);
376 header->timestamp |= (guint32) GST_READ_UINT8 (data + 7) << 24;
377
378 /* Skip StreamID UI24. It's "always 0" for FLV files and for aggregated RTMP
379 * messages we're supposed to use the Stream ID from the AGGREGATE. */
380
381 return TRUE;
382 }
383