/* GStreamer
 * Copyright (C) 2011 David Schleef <ds@schleef.org>
 * Copyright (C) 2011 Tim-Philipp Müller <tim.muller@collabora.co.uk>
 * Copyright (C) 2014 Tim-Philipp Müller <tim@centricular.com>
 * Copyright (C) 2014 Vincent Penquerc'h <vincent@collabora.co.uk>
 *
 * gstelements_private.c: Shared code for core elements
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Library General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Library General Public License for more details.
 *
 * You should have received a copy of the GNU Library General Public
 * License along with this library; if not, write to the
 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
 * Boston, MA 02110-1301, USA.
 */

#ifdef HAVE_CONFIG_H
# include "config.h"
#endif
#include <stdio.h>
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif
#ifdef HAVE_SYS_UIO_H
#include <sys/uio.h>
#endif
#include <sys/types.h>
#include <errno.h>
#include <string.h>
#include "gst/gst.h"
#include "gstelements_private.h"

#ifdef G_OS_WIN32
# include <io.h>                /* lseek, open, close, read */
# undef lseek
# define lseek _lseeki64
# undef off_t
# define off_t guint64
# define WIN32_LEAN_AND_MEAN    /* prevents from including too many things */
# include <windows.h>
# undef WIN32_LEAN_AND_MEAN
# ifndef EWOULDBLOCK
# define EWOULDBLOCK EAGAIN
# endif
#endif /* G_OS_WIN32 */

#define BUFFER_FLAG_SHIFT 4

G_STATIC_ASSERT ((1 << BUFFER_FLAG_SHIFT) == GST_MINI_OBJECT_FLAG_LAST);

/* Returns a newly allocated string describing the flags on this buffer */
gchar *
gst_buffer_get_flags_string (GstBuffer * buffer)
{
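  /* All of the flag names are packed into one NUL-separated string, and
   * flag_idx[i] is the byte offset of the name for buffer flag bit i, so a
   * name lookup is simply flag_strings + flag_idx[i]. */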
  static const char flag_strings[] =
      "\000\000\000\000live\000decode-only\000discont\000resync\000corrupted\000"
      "marker\000header\000gap\000droppable\000delta-unit\000tag-memory\000"
      "sync-after\000non-droppable\000FIXME";
  static const guint8 flag_idx[] = { 0, 1, 2, 3, 4, 9, 21, 29, 36, 46, 53,
    60, 64, 74, 85, 96, 107, 121,
  };
  int i, max_bytes;
  char *flag_str, *end;

  /* max size is all flag strings plus a space or terminator after each one */
  max_bytes = sizeof (flag_strings);
  flag_str = g_malloc (max_bytes);

  end = flag_str;
  end[0] = '\0';
  for (i = BUFFER_FLAG_SHIFT; i < G_N_ELEMENTS (flag_idx); i++) {
    if (GST_MINI_OBJECT_CAST (buffer)->flags & (1 << i)) {
      strcpy (end, flag_strings + flag_idx[i]);
      end += strlen (end);
      end[0] = ' ';
      end[1] = '\0';
      end++;
    }
  }

  return flag_str;
}

/* Returns a newly-allocated string describing the metas on this buffer, or NULL */
gchar *
gst_buffer_get_meta_string (GstBuffer * buffer)
{
  gpointer state = NULL;
  GstMeta *meta;
  GString *s = NULL;

  while ((meta = gst_buffer_iterate_meta (buffer, &state))) {
    const gchar *desc = g_type_name (meta->info->type);

    if (s == NULL)
      s = g_string_new (NULL);
    else
      g_string_append (s, ", ");

    g_string_append (s, desc);
  }

  return (s != NULL) ? g_string_free (s, FALSE) : NULL;
}

/* Define our own iovec structure here, so that we can use it unconditionally
 * in the code below and use almost the same code path for systems where
 * writev() is supported and those where it's not supported */
#ifndef HAVE_SYS_UIO_H
struct iovec
{
  gpointer iov_base;
  gsize iov_len;
};
#endif

/* completely arbitrary thresholds */
#define FDSINK_MAX_ALLOCA_SIZE (64 * 1024)      /* 64k */
#define FDSINK_MAX_MALLOC_SIZE ( 8 * 1024 * 1024)       /* 8M */
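
/* In the fallback path below (no usable writev()), gst_writev() merges
 * multi-chunk payloads of up to 64k on the stack with g_alloca() and of up
 * to 8M on the heap with g_malloc(); everything else is written out with
 * one write() call per chunk. */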

/* Adapted from GLib (gio/gioprivate.h)
 *
 * POSIX defines IOV_MAX/UIO_MAXIOV as the maximum number of iovecs that can
 * be sent in one go. We define our own version of it here as there are two
 * possible names, and also define a fall-back value if none of the constants
 * are defined */
#if defined(IOV_MAX)
#define GST_IOV_MAX IOV_MAX
#elif defined(UIO_MAXIOV)
#define GST_IOV_MAX UIO_MAXIOV
#elif defined(__APPLE__)
/* For osx/ios, UIO_MAXIOV is documented in writev(2), but <sys/uio.h>
 * only declares it if defined(KERNEL) */
#define GST_IOV_MAX 512
#else
/* 16 is the minimum value required by POSIX */
#define GST_IOV_MAX 16
#endif

static gssize
gst_writev (gint fd, const struct iovec *iov, gint iovcnt, gsize total_bytes)
{
  gssize written;

#ifdef HAVE_SYS_UIO_H
  if (iovcnt <= GST_IOV_MAX) {
    do {
      written = writev (fd, iov, iovcnt);
    } while (written < 0 && errno == EINTR);
  } else
#endif
  {
    gint i;

    /* We merge the memories here because technically write()/writev() is
     * supposed to be atomic, which it's not if we do multiple separate
     * write() calls. It's very doubtful anyone cares though in our use
     * cases, and it's not clear how that can be reconciled with the
     * possibility of short writes, so in any case we might want to
     * simplify this later or just remove it. */
    if (iovcnt > 1 && total_bytes <= FDSINK_MAX_MALLOC_SIZE) {
      gchar *mem, *p;

      if (total_bytes <= FDSINK_MAX_ALLOCA_SIZE)
        mem = g_alloca (total_bytes);
      else
        mem = g_malloc (total_bytes);

      p = mem;
      for (i = 0; i < iovcnt; ++i) {
        memcpy (p, iov[i].iov_base, iov[i].iov_len);
        p += iov[i].iov_len;
      }

      do {
        written = write (fd, mem, total_bytes);
      } while (written < 0 && errno == EINTR);

      if (total_bytes > FDSINK_MAX_ALLOCA_SIZE)
        g_free (mem);
    } else {
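      /* Could not (or chose not to) merge the chunks: write out each one
       * separately and stop early after a short write, counting only what
       * was actually written. */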
      gssize ret;

      written = 0;
      for (i = 0; i < iovcnt; ++i) {
        do {
          ret = write (fd, iov[i].iov_base, iov[i].iov_len);
        } while (ret < 0 && errno == EINTR);
        if (ret > 0)
          written += ret;
        if (ret != iov[i].iov_len)
          break;
      }
    }
  }

  return written;
}

static GstFlowReturn
gst_writev_iovecs (GstObject * sink, gint fd, GstPoll * fdset,
    struct iovec *vecs, guint n_vecs, gsize bytes_to_write,
    guint64 * bytes_written, gint max_transient_error_timeout,
    guint64 current_position, gboolean * flushing)
{
  GstFlowReturn flow_ret;
  gint64 start_time = 0;

  *bytes_written = 0;
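  /* max_transient_error_timeout is passed in milliseconds; convert it to
   * microseconds so it can be compared against g_get_monotonic_time(). */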
  max_transient_error_timeout *= 1000;
  if (max_transient_error_timeout)
    start_time = g_get_monotonic_time ();

  GST_LOG_OBJECT (sink, "%u iovecs", n_vecs);

  /* now write it all out! */
  {
    gssize ret, left;

    left = bytes_to_write;

    do {
      if (flushing != NULL && g_atomic_int_get (flushing)) {
        GST_DEBUG_OBJECT (sink, "Flushing, exiting loop");
        flow_ret = GST_FLOW_FLUSHING;
        goto out;
      }
#ifndef HAVE_WIN32
      if (fdset != NULL) {
        do {
          GST_DEBUG_OBJECT (sink, "going into select, have %" G_GSSIZE_FORMAT
              " bytes to write", left);
          ret = gst_poll_wait (fdset, GST_CLOCK_TIME_NONE);
        } while (ret == -1 && (errno == EINTR || errno == EAGAIN));

        if (ret == -1) {
          if (errno == EBUSY)
            goto stopped;
          else
            goto select_error;
        }
      }
#endif

      ret = gst_writev (fd, vecs, n_vecs, left);

      if (ret > 0) {
        /* Wrote something, allow the caller to update the vecs passed here */
        *bytes_written = ret;
        break;
      }

      if (errno == EAGAIN || errno == EWOULDBLOCK || ret == 0) {
        /* do nothing, try again */
        if (max_transient_error_timeout)
          start_time = g_get_monotonic_time ();
      } else if (errno == EACCES && max_transient_error_timeout > 0) {
        /* seek back to where we started writing and try again after sleeping
         * for 10ms.
         *
         * Some network file systems report EACCES spuriously, presumably
         * because at the same time another client is reading the file.
         * It happens at least on Linux and macOS on SMB/CIFS and NFS file
         * systems.
         *
         * Note that NFS does not check access permissions during open()
         * but only on write()/read() according to open(2), so we would
         * loop here in case of NFS.
         */
        if (g_get_monotonic_time () > start_time + max_transient_error_timeout) {
          GST_ERROR_OBJECT (sink, "Got EACCES for more than %dms, failing",
              max_transient_error_timeout / 1000);
          goto write_error;
        }
        GST_DEBUG_OBJECT (sink, "got EACCES, retry after 10ms sleep");
        g_assert (current_position != -1);
        g_usleep (10000);

        /* Seek back to the current position, sometimes a partial write
         * happened and we have no idea how much and if what was written
         * is actually correct (it sometimes isn't)
         */
        ret = lseek (fd, current_position, SEEK_SET);
        if (ret < 0 || ret != current_position) {
          GST_ERROR_OBJECT (sink,
              "failed to seek back to current write position");
          goto write_error;
        }
      } else {
        goto write_error;
      }
#ifdef HAVE_WIN32
      /* do short sleep on windows where we don't use gst_poll(),
       * to avoid excessive busy looping */
      if (fdset != NULL)
        g_usleep (1000);
#endif
    }
    while (left > 0);
  }

  flow_ret = GST_FLOW_OK;

out:

  return flow_ret;

/* ERRORS */
#ifndef HAVE_WIN32
select_error:
  {
    GST_ELEMENT_ERROR (sink, RESOURCE, READ, (NULL),
        ("select on file descriptor: %s", g_strerror (errno)));
    GST_DEBUG_OBJECT (sink, "Error during select: %s", g_strerror (errno));
    flow_ret = GST_FLOW_ERROR;
    goto out;
  }
stopped:
  {
    GST_DEBUG_OBJECT (sink, "Select stopped");
    flow_ret = GST_FLOW_FLUSHING;
    goto out;
  }
#endif
write_error:
  {
    switch (errno) {
      case ENOSPC:
        GST_ELEMENT_ERROR (sink, RESOURCE, NO_SPACE_LEFT, (NULL), (NULL));
        break;
      default:{
        GST_ELEMENT_ERROR (sink, RESOURCE, WRITE, (NULL),
            ("Error while writing to file descriptor %d: %s",
                fd, g_strerror (errno)));
      }
    }
    flow_ret = GST_FLOW_ERROR;
    goto out;
  }
}

GstFlowReturn
gst_writev_buffer (GstObject * sink, gint fd, GstPoll * fdset,
    GstBuffer * buffer,
    guint64 * bytes_written, guint64 skip,
    gint max_transient_error_timeout, guint64 current_position,
    gboolean * flushing)
{
  GstFlowReturn flow_ret = GST_FLOW_OK;
  struct iovec *vecs;
  GstMapInfo *maps;
  guint i, num_mem, num_vecs;
  gsize left = 0;

  /* Buffers can contain up to 16 memories, so we can safely directly call
   * writev() here without splitting up */
  g_assert (gst_buffer_get_max_memory () <= GST_IOV_MAX);

  num_mem = num_vecs = gst_buffer_n_memory (buffer);

  GST_DEBUG ("Writing buffer %p with %u memories and %" G_GSIZE_FORMAT " bytes",
      buffer, num_mem, gst_buffer_get_size (buffer));

  vecs = g_newa (struct iovec, num_mem);
  maps = g_newa (GstMapInfo, num_mem);

  /* Map all memories */
  {
    GstMemory *mem;
    guint i;

    for (i = 0; i < num_mem; ++i) {
      mem = gst_buffer_peek_memory (buffer, i);
      if (gst_memory_map (mem, &maps[i], GST_MAP_READ)) {
        vecs[i].iov_base = maps[i].data;
        vecs[i].iov_len = maps[i].size;
      } else {
        GST_WARNING ("Failed to map memory %p for reading", mem);
        vecs[i].iov_base = (void *) "";
        vecs[i].iov_len = 0;
      }
      left += vecs[i].iov_len;
    }
  }

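  /* Write in rounds until the whole buffer has been written: after a partial
   * write, drop the vectors that were written in full and adjust the
   * partially written one before retrying. */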
  do {
    guint64 bytes_written_local = 0;

    flow_ret =
        gst_writev_iovecs (sink, fd, fdset, vecs, num_vecs, left,
        &bytes_written_local, max_transient_error_timeout, current_position,
        flushing);

    GST_DEBUG ("Wrote %" G_GUINT64_FORMAT " bytes of %" G_GSIZE_FORMAT ": %s",
        bytes_written_local, left, gst_flow_get_name (flow_ret));

    if (flow_ret != GST_FLOW_OK) {
      g_assert (bytes_written_local == 0);
      break;
    }

    if (bytes_written)
      *bytes_written += bytes_written_local;

    /* Done, no need to do bookkeeping */
    if (bytes_written_local == left)
      break;

    /* skip vectors that have been written in full */
    while (bytes_written_local >= vecs[0].iov_len) {
      bytes_written_local -= vecs[0].iov_len;
      left -= vecs[0].iov_len;
      ++vecs;
      --num_vecs;
    }
    g_assert (num_vecs > 0);
    /* skip partially written vector data */
    if (bytes_written_local > 0) {
      vecs[0].iov_len -= bytes_written_local;
      vecs[0].iov_base = ((guint8 *) vecs[0].iov_base) + bytes_written_local;
      left -= bytes_written_local;
    }
  } while (left > 0);

  for (i = 0; i < num_mem; i++)
    gst_memory_unmap (maps[i].memory, &maps[i]);

  return flow_ret;
}

GstFlowReturn
gst_writev_mem (GstObject * sink, gint fd, GstPoll * fdset,
    const guint8 * data, guint size,
    guint64 * bytes_written, guint64 skip,
    gint max_transient_error_timeout, guint64 current_position,
    gboolean * flushing)
{
  GstFlowReturn flow_ret = GST_FLOW_OK;
  struct iovec vec;
  gsize left;

  GST_DEBUG ("Writing memory %p with %u bytes", data, size);

  vec.iov_len = size;
  vec.iov_base = (guint8 *) data;
  left = size;

  do {
    guint64 bytes_written_local = 0;

    flow_ret =
        gst_writev_iovecs (sink, fd, fdset, &vec, 1, left,
        &bytes_written_local, max_transient_error_timeout, current_position,
        flushing);

    GST_DEBUG ("Wrote %" G_GUINT64_FORMAT " bytes of %" G_GSIZE_FORMAT ": %s",
        bytes_written_local, left, gst_flow_get_name (flow_ret));

    if (flow_ret != GST_FLOW_OK) {
      g_assert (bytes_written_local == 0);
      break;
    }

    if (bytes_written)
      *bytes_written += bytes_written_local;

    /* All done, no need for bookkeeping */
    if (bytes_written_local == left)
      break;

    /* skip partially written vector data */
    if (bytes_written_local < left) {
      vec.iov_len -= bytes_written_local;
      vec.iov_base = ((guint8 *) vec.iov_base) + bytes_written_local;
      left -= bytes_written_local;
    }
  } while (left > 0);

  return flow_ret;
}

GstFlowReturn
gst_writev_buffer_list (GstObject * sink, gint fd, GstPoll * fdset,
    GstBufferList * buffer_list,
    guint64 * bytes_written, guint64 skip,
    gint max_transient_error_timeout, guint64 current_position,
    gboolean * flushing)
{
  GstFlowReturn flow_ret = GST_FLOW_OK;
  struct iovec *vecs;
  GstMapInfo *maps;
  guint num_bufs, current_buf_idx = 0, current_buf_mem_idx = 0;
  guint i, num_vecs;
  gsize left = 0;

  num_bufs = gst_buffer_list_length (buffer_list);
  num_vecs = 0;

  GST_DEBUG ("Writing buffer list %p with %u buffers", buffer_list, num_bufs);

  vecs = g_newa (struct iovec, GST_IOV_MAX);
  maps = g_newa (GstMapInfo, GST_IOV_MAX);

  /* Map the first GST_IOV_MAX memories */
  {
    GstBuffer *buf;
    GstMemory *mem;
    guint j = 0;

    for (i = 0; i < num_bufs && num_vecs < GST_IOV_MAX; i++) {
      guint num_mem;

      buf = gst_buffer_list_get (buffer_list, i);
      num_mem = gst_buffer_n_memory (buf);

      for (j = 0; j < num_mem && num_vecs < GST_IOV_MAX; j++) {
        mem = gst_buffer_peek_memory (buf, j);
        if (gst_memory_map (mem, &maps[num_vecs], GST_MAP_READ)) {
          vecs[num_vecs].iov_base = maps[num_vecs].data;
          vecs[num_vecs].iov_len = maps[num_vecs].size;
        } else {
          GST_WARNING ("Failed to map memory %p for reading", mem);
          vecs[num_vecs].iov_base = (void *) "";
          vecs[num_vecs].iov_len = 0;
        }
        left += vecs[num_vecs].iov_len;
        num_vecs++;
      }
      current_buf_mem_idx = j;
      if (j == num_mem)
        current_buf_mem_idx = 0;
    }
    current_buf_idx = i;
  }

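  /* Write in rounds: each round writes the currently mapped vectors, then
   * fully written entries are unmapped and the freed slots are refilled
   * from the remaining buffers until everything has been written. */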
  do {
    guint64 bytes_written_local = 0;
    guint vecs_written = 0;

    flow_ret =
        gst_writev_iovecs (sink, fd, fdset, vecs, num_vecs, left,
        &bytes_written_local, max_transient_error_timeout, current_position,
        flushing);

    GST_DEBUG ("Wrote %" G_GUINT64_FORMAT " bytes of %" G_GSIZE_FORMAT ": %s",
        bytes_written_local, left, gst_flow_get_name (flow_ret));

    if (flow_ret != GST_FLOW_OK) {
      g_assert (bytes_written_local == 0);
      break;
    }
    if (bytes_written)
      *bytes_written += bytes_written_local;

    /* All done, no need for bookkeeping */
    if (bytes_written_local == left && current_buf_idx == num_bufs)
      break;

    /* skip vectors that have been written in full */
    while (vecs_written < num_vecs
        && bytes_written_local >= vecs[vecs_written].iov_len) {
      bytes_written_local -= vecs[vecs_written].iov_len;
      left -= vecs[vecs_written].iov_len;
      vecs_written++;
    }
    g_assert (vecs_written < num_vecs || bytes_written_local == 0);
    /* skip partially written vector data */
    if (bytes_written_local > 0) {
      vecs[vecs_written].iov_len -= bytes_written_local;
      vecs[vecs_written].iov_base =
          ((guint8 *) vecs[vecs_written].iov_base) + bytes_written_local;
      left -= bytes_written_local;
    }

    /* If we have buffers left, fill them in now */
    if (current_buf_idx < num_bufs) {
      GstBuffer *buf;
      GstMemory *mem;
      guint j = current_buf_mem_idx;

      /* Unmap the first vecs_written memories now */
      for (i = 0; i < vecs_written; i++)
        gst_memory_unmap (maps[i].memory, &maps[i]);
      /* Move upper remaining vecs and maps back to the beginning */
      memmove (vecs, &vecs[vecs_written],
          (num_vecs - vecs_written) * sizeof (vecs[0]));
      memmove (maps, &maps[vecs_written],
          (num_vecs - vecs_written) * sizeof (maps[0]));
      num_vecs -= vecs_written;

      /* And finally refill */
      for (i = current_buf_idx; i < num_bufs && num_vecs < GST_IOV_MAX; i++) {
        guint num_mem;

        buf = gst_buffer_list_get (buffer_list, i);
        num_mem = gst_buffer_n_memory (buf);

        for (j = current_buf_mem_idx; j < num_mem && num_vecs < GST_IOV_MAX;
            j++) {
          mem = gst_buffer_peek_memory (buf, j);
          if (gst_memory_map (mem, &maps[num_vecs], GST_MAP_READ)) {
            vecs[num_vecs].iov_base = maps[num_vecs].data;
            vecs[num_vecs].iov_len = maps[num_vecs].size;
          } else {
            GST_WARNING ("Failed to map memory %p for reading", mem);
            vecs[num_vecs].iov_base = (void *) "";
            vecs[num_vecs].iov_len = 0;
          }
          left += vecs[num_vecs].iov_len;
          num_vecs++;
        }
        current_buf_mem_idx = j;
        if (current_buf_mem_idx == num_mem)
          current_buf_mem_idx = 0;
      }
      current_buf_idx = i;
    }
  } while (left > 0);

  for (i = 0; i < num_vecs; i++)
    gst_memory_unmap (maps[i].memory, &maps[i]);

  return flow_ret;
}