1 /* GStreamer
2 * Copyright (C) 1999 Erik Walthinsen <omega@cse.ogi.edu>
3 * Copyright (C) 2004 Wim Taymans <wim.taymans@gmail.com>
4 * Copyright (C) 2007 Peter Kjellerstedt <pkj@axis.com>
5 * Copyright (C) 2008 Ole André Vadla Ravnås <ole.andre.ravnas@tandberg.com>
6 *
7 * gstpoll.c: File descriptor set
8 *
9 * This library is free software; you can redistribute it and/or
10 * modify it under the terms of the GNU Library General Public
11 * License as published by the Free Software Foundation; either
12 * version 2 of the License, or (at your option) any later version.
13 *
14 * This library is distributed in the hope that it will be useful,
15 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 * Library General Public License for more details.
18 *
19 * You should have received a copy of the GNU Library General Public
20 * License along with this library; if not, write to the
21 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
22 * Boston, MA 02110-1301, USA.
23 */
24 /**
25 * SECTION:gstpoll
26 * @title: GstPoll
27 * @short_description: Keep track of file descriptors and make it possible
28 * to wait on them in a cancellable way
29 *
30 * A #GstPoll keeps track of file descriptors much like fd_set (used with
31 * select ()) or a struct pollfd array (used with poll ()). Once created with
32 * gst_poll_new(), the set can be used to wait for file descriptors to be
33 * readable and/or writable. It is possible to make this wait be controlled
34 * by specifying %TRUE for the @controllable flag when creating the set (or
35 * later calling gst_poll_set_controllable()).
36 *
37 * New file descriptors are added to the set using gst_poll_add_fd(), and
38 * removed using gst_poll_remove_fd(). Controlling which file descriptors
39 * should be waited for to become readable and/or writable are done using
40 * gst_poll_fd_ctl_read(), gst_poll_fd_ctl_write() and gst_poll_fd_ctl_pri().
41 *
42 * Use gst_poll_wait() to wait for the file descriptors to actually become
43 * readable and/or writable, or to timeout if no file descriptor is available
44 * in time. The wait can be controlled by calling gst_poll_restart() and
45 * gst_poll_set_flushing().
46 *
47 * Once the file descriptor set has been waited for, one can use
48 * gst_poll_fd_has_closed() to see if the file descriptor has been closed,
49 * gst_poll_fd_has_error() to see if it has generated an error,
50 * gst_poll_fd_can_read() to see if it is possible to read from the file
51 * descriptor, and gst_poll_fd_can_write() to see if it is possible to
52 * write to it.
53 *
54 */
55
56 #ifdef HAVE_CONFIG_H
57 #include "config.h"
58 #endif
59
60 #include "gst_private.h"
61 #include "glib-compat-private.h"
62
63 #include <sys/types.h>
64
65 #ifdef HAVE_UNISTD_H
66 #include <unistd.h>
67 #endif
68
69 #include <errno.h>
70 #include <fcntl.h>
71
72 #include <glib.h>
73
74 #ifdef G_OS_WIN32
75 #include <winsock2.h>
76 #else
77 #define _GNU_SOURCE 1
78 #ifdef HAVE_SYS_POLL_H
79 #include <sys/poll.h>
80 #endif
81 #ifdef HAVE_POLL_H
82 #include <poll.h>
83 #endif
84 #include <sys/time.h>
85 #include <sys/socket.h>
86 #endif
87
88 #ifdef G_OS_WIN32
89 # ifndef EWOULDBLOCK
90 # define EWOULDBLOCK EAGAIN /* This is just to placate gcc */
91 # endif
92 #endif /* G_OS_WIN32 */
93
94 /* OS/X needs this because of bad headers */
95 #include <string.h>
96
97 /* The poll() emulation on OS/X doesn't handle fds=NULL, nfds=0,
98 * so we prefer our own poll emulation.
99 */
100 #if defined(BROKEN_POLL)
101 #undef HAVE_POLL
102 #endif
103
104 #include "gstpoll.h"
105
106 #define GST_CAT_DEFAULT GST_CAT_POLL
107
108 #ifdef G_OS_WIN32
109 typedef struct _WinsockFd WinsockFd;
110
111 struct _WinsockFd
112 {
113 gint fd;
114 glong event_mask;
115 WSANETWORKEVENTS events;
116 glong ignored_event_mask;
117 };
118 #endif
119
120 typedef enum
121 {
122 GST_POLL_MODE_AUTO,
123 GST_POLL_MODE_SELECT,
124 GST_POLL_MODE_PSELECT,
125 GST_POLL_MODE_POLL,
126 GST_POLL_MODE_PPOLL,
127 GST_POLL_MODE_WINDOWS
128 } GstPollMode;
129
130 struct _GstPoll
131 {
132 GstPollMode mode;
133
134 GMutex lock;
135 /* array of fds, always written to and read from with lock */
136 GArray *fds;
137 /* array of active fds, only written to from the waiting thread with the
138 * lock and read from with the lock or without the lock from the waiting
139 * thread */
140 GArray *active_fds;
141
142 #ifndef G_OS_WIN32
143 GstPollFD control_read_fd;
144 GstPollFD control_write_fd;
145 #else
146 GArray *active_fds_ignored;
147 GArray *events;
148 GArray *active_events;
149
150 HANDLE wakeup_event;
151 #endif
152
153 gboolean controllable;
154 gint waiting;
155 gint control_pending;
156 gint flushing;
157 gboolean timer;
158 gint rebuild;
159 };
160
161 static gboolean gst_poll_fd_ctl_read_unlocked (GstPoll * set, GstPollFD * fd,
162 gboolean active);
163 static gboolean gst_poll_add_fd_unlocked (GstPoll * set, GstPollFD * fd);
164
165 #define IS_FLUSHING(s) (g_atomic_int_get(&(s)->flushing))
166 #define SET_FLUSHING(s,val) (g_atomic_int_set(&(s)->flushing, (val)))
167
168 #define INC_WAITING(s) (g_atomic_int_add(&(s)->waiting, 1))
169 #define DEC_WAITING(s) (g_atomic_int_add(&(s)->waiting, -1))
170 #define GET_WAITING(s) (g_atomic_int_get(&(s)->waiting))
171
172 #define TEST_REBUILD(s) (g_atomic_int_compare_and_exchange(&(s)->rebuild, 1, 0))
173 #define MARK_REBUILD(s) (g_atomic_int_set(&(s)->rebuild, 1))
174
175 #ifndef G_OS_WIN32
176
177 static gboolean
wake_event(GstPoll * set)178 wake_event (GstPoll * set)
179 {
180 ssize_t num_written;
181 while ((num_written = write (set->control_write_fd.fd, "W", 1)) != 1) {
182 if (num_written == -1 && errno != EAGAIN && errno != EINTR) {
183 g_critical ("%p: failed to wake event: %s", set, strerror (errno));
184 return FALSE;
185 }
186 }
187 return TRUE;
188 }
189
190 static gboolean
release_event(GstPoll * set)191 release_event (GstPoll * set)
192 {
193 gchar buf[1] = { '\0' };
194 ssize_t num_read;
195 while ((num_read = read (set->control_read_fd.fd, buf, 1)) != 1) {
196 if (num_read == -1 && errno != EAGAIN && errno != EINTR) {
197 g_critical ("%p: failed to release event: %s", set, strerror (errno));
198 return FALSE;
199 }
200 }
201 return TRUE;
202 }
203
204 #else
205
206 static void
format_last_error(gchar * buf,size_t buf_len)207 format_last_error (gchar * buf, size_t buf_len)
208 {
209 DWORD flags = FORMAT_MESSAGE_FROM_SYSTEM;
210 LPCVOID src = NULL;
211 DWORD lang = 0;
212 DWORD id;
213 id = GetLastError ();
214 FormatMessage (flags, src, id, lang, buf, (DWORD) buf_len, NULL);
215 SetLastError (id);
216 }
217
218 static gboolean
wake_event(GstPoll * set)219 wake_event (GstPoll * set)
220 {
221 SetLastError (0);
222 errno = 0;
223 if (!SetEvent (set->wakeup_event)) {
224 gchar msg[1024] = "<unknown>";
225 format_last_error (msg, sizeof (msg));
226 g_critical ("%p: failed to set wakup_event: %s", set, msg);
227 errno = EBADF;
228 return FALSE;
229 }
230
231 return TRUE;
232 }
233
234 static gboolean
release_event(GstPoll * set)235 release_event (GstPoll * set)
236 {
237 DWORD status;
238 SetLastError (0);
239 errno = 0;
240
241 status = WaitForSingleObject (set->wakeup_event, INFINITE);
242 if (status) {
243 const gchar *reason = "unknown";
244 gchar msg[1024] = "<unknown>";
245 switch (status) {
246 case WAIT_ABANDONED:
247 reason = "WAIT_ABANDONED";
248 break;
249 case WAIT_TIMEOUT:
250 reason = "WAIT_TIMEOUT";
251 break;
252 case WAIT_FAILED:
253 format_last_error (msg, sizeof (msg));
254 reason = msg;
255 break;
256 default:
257 reason = "other";
258 break;
259 }
260 g_critical ("%p: failed to block on wakup_event: %s", set, reason);
261 errno = EBADF;
262 return FALSE;
263 }
264
265 if (!ResetEvent (set->wakeup_event)) {
266 gchar msg[1024] = "<unknown>";
267 format_last_error (msg, sizeof (msg));
268 g_critical ("%p: failed to reset wakup_event: %s", set, msg);
269 errno = EBADF;
270 return FALSE;
271 }
272
273 return TRUE;
274 }
275
276 #endif
277
278 /* the poll/select call is also performed on a control socket, that way
279 * we can send special commands to control it */
280 static inline gboolean
raise_wakeup(GstPoll * set)281 raise_wakeup (GstPoll * set)
282 {
283 gboolean result = TRUE;
284
285 /* makes testing control_pending and WAKE_EVENT() atomic. */
286 g_mutex_lock (&set->lock);
287
288 if (set->control_pending == 0) {
289 /* raise when nothing pending */
290 GST_LOG ("%p: raise", set);
291 result = wake_event (set);
292 }
293
294 if (result) {
295 set->control_pending++;
296 }
297
298 g_mutex_unlock (&set->lock);
299
300 return result;
301 }
302
303 static inline gboolean
release_wakeup(GstPoll * set)304 release_wakeup (GstPoll * set)
305 {
306 gboolean result = FALSE;
307
308 /* makes testing/modifying control_pending and RELEASE_EVENT() atomic. */
309 g_mutex_lock (&set->lock);
310
311 if (set->control_pending > 0) {
312 /* release, only if this was the last pending. */
313 if (set->control_pending == 1) {
314 GST_LOG ("%p: release", set);
315 result = release_event (set);
316 } else {
317 result = TRUE;
318 }
319
320 if (result) {
321 set->control_pending--;
322 }
323 } else {
324 errno = EWOULDBLOCK;
325 }
326
327 g_mutex_unlock (&set->lock);
328
329 return result;
330 }
331
332 static inline gint
release_all_wakeup(GstPoll * set)333 release_all_wakeup (GstPoll * set)
334 {
335 gint old;
336
337 /* makes testing control_pending and RELEASE_EVENT() atomic. */
338 g_mutex_lock (&set->lock);
339
340 if ((old = set->control_pending) > 0) {
341 GST_LOG ("%p: releasing %d", set, old);
342 if (release_event (set)) {
343 set->control_pending = 0;
344 } else {
345 old = 0;
346 }
347 }
348
349 g_mutex_unlock (&set->lock);
350
351 return old;
352 }
353
354 static gint
find_index(GArray * array,GstPollFD * fd)355 find_index (GArray * array, GstPollFD * fd)
356 {
357 #ifndef G_OS_WIN32
358 struct pollfd *ifd;
359 #else
360 WinsockFd *ifd;
361 #endif
362 guint i;
363
364 /* start by assuming the index found in the fd is still valid */
365 if (fd->idx >= 0 && fd->idx < array->len) {
366 #ifndef G_OS_WIN32
367 ifd = &g_array_index (array, struct pollfd, fd->idx);
368 #else
369 ifd = &g_array_index (array, WinsockFd, fd->idx);
370 #endif
371
372 if (ifd->fd == fd->fd) {
373 return fd->idx;
374 }
375 }
376
377 /* the pollfd array has changed and we need to lookup the fd again */
378 for (i = 0; i < array->len; i++) {
379 #ifndef G_OS_WIN32
380 ifd = &g_array_index (array, struct pollfd, i);
381 #else
382 ifd = &g_array_index (array, WinsockFd, i);
383 #endif
384
385 if (ifd->fd == fd->fd) {
386 fd->idx = (gint) i;
387 return fd->idx;
388 }
389 }
390
391 fd->idx = -1;
392 return fd->idx;
393 }
394
395 #if !defined(HAVE_PPOLL) && defined(HAVE_POLL)
396 /* check if all file descriptors will fit in an fd_set */
397 static gboolean
selectable_fds(GstPoll * set)398 selectable_fds (GstPoll * set)
399 {
400 guint i;
401
402 g_mutex_lock (&set->lock);
403 for (i = 0; i < set->fds->len; i++) {
404 struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, i);
405
406 if (pfd->fd >= FD_SETSIZE)
407 goto too_many;
408 }
409 g_mutex_unlock (&set->lock);
410
411 return TRUE;
412
413 too_many:
414 {
415 g_mutex_unlock (&set->lock);
416 return FALSE;
417 }
418 }
419
420 /* check if the timeout will convert to a timeout value used for poll()
421 * without a loss of precision
422 */
423 static gboolean
pollable_timeout(GstClockTime timeout)424 pollable_timeout (GstClockTime timeout)
425 {
426 if (timeout == GST_CLOCK_TIME_NONE)
427 return TRUE;
428
429 /* not a nice multiple of milliseconds */
430 if (timeout % 1000000)
431 return FALSE;
432
433 return TRUE;
434 }
435 #endif
436
437 static GstPollMode
choose_mode(GstPoll * set,GstClockTime timeout)438 choose_mode (GstPoll * set, GstClockTime timeout)
439 {
440 GstPollMode mode;
441
442 if (set->mode == GST_POLL_MODE_AUTO) {
443 #ifdef HAVE_PPOLL
444 mode = GST_POLL_MODE_PPOLL;
445 #elif defined(HAVE_POLL)
446 if (!selectable_fds (set) || pollable_timeout (timeout)) {
447 mode = GST_POLL_MODE_POLL;
448 } else {
449 #ifdef HAVE_PSELECT
450 mode = GST_POLL_MODE_PSELECT;
451 #else
452 mode = GST_POLL_MODE_SELECT;
453 #endif
454 }
455 #elif defined(HAVE_PSELECT)
456 mode = GST_POLL_MODE_PSELECT;
457 #else
458 mode = GST_POLL_MODE_SELECT;
459 #endif
460 } else {
461 mode = set->mode;
462 }
463 return mode;
464 }
465
466 #ifndef G_OS_WIN32
467 static gint
pollfd_to_fd_set(GstPoll * set,fd_set * readfds,fd_set * writefds,fd_set * errorfds)468 pollfd_to_fd_set (GstPoll * set, fd_set * readfds, fd_set * writefds,
469 fd_set * errorfds)
470 {
471 gint max_fd = -1;
472 guint i;
473
474 FD_ZERO (readfds);
475 FD_ZERO (writefds);
476 FD_ZERO (errorfds);
477
478 g_mutex_lock (&set->lock);
479
480 for (i = 0; i < set->active_fds->len; i++) {
481 struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, i);
482
483 if (pfd->fd < FD_SETSIZE) {
484 if (pfd->events & POLLIN)
485 FD_SET (pfd->fd, readfds);
486 if (pfd->events & POLLOUT)
487 FD_SET (pfd->fd, writefds);
488 if (pfd->events)
489 FD_SET (pfd->fd, errorfds);
490 if (pfd->fd > max_fd && (pfd->events & (POLLIN | POLLOUT)))
491 max_fd = pfd->fd;
492 }
493 }
494
495 g_mutex_unlock (&set->lock);
496
497 return max_fd;
498 }
499
500 static void
fd_set_to_pollfd(GstPoll * set,fd_set * readfds,fd_set * writefds,fd_set * errorfds)501 fd_set_to_pollfd (GstPoll * set, fd_set * readfds, fd_set * writefds,
502 fd_set * errorfds)
503 {
504 guint i;
505
506 g_mutex_lock (&set->lock);
507
508 for (i = 0; i < set->active_fds->len; i++) {
509 struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, i);
510
511 if (pfd->fd < FD_SETSIZE) {
512 pfd->revents = 0;
513 if (FD_ISSET (pfd->fd, readfds))
514 pfd->revents |= POLLIN;
515 if (FD_ISSET (pfd->fd, writefds))
516 pfd->revents |= POLLOUT;
517 if (FD_ISSET (pfd->fd, errorfds))
518 pfd->revents |= POLLERR;
519 }
520 }
521
522 g_mutex_unlock (&set->lock);
523 }
524 #else /* G_OS_WIN32 */
525 /*
526 * Translate errors thrown by the Winsock API used by GstPoll:
527 * WSAEventSelect, WSAWaitForMultipleEvents and WSAEnumNetworkEvents
528 */
529 static gint
gst_poll_winsock_error_to_errno(DWORD last_error)530 gst_poll_winsock_error_to_errno (DWORD last_error)
531 {
532 switch (last_error) {
533 case WSA_INVALID_HANDLE:
534 case WSAEINVAL:
535 case WSAENOTSOCK:
536 return EBADF;
537
538 case WSA_NOT_ENOUGH_MEMORY:
539 return ENOMEM;
540
541 /*
542 * Anything else, including:
543 * WSA_INVALID_PARAMETER, WSAEFAULT, WSAEINPROGRESS, WSAENETDOWN,
544 * WSANOTINITIALISED
545 */
546 default:
547 return EINVAL;
548 }
549 }
550
551 static void
gst_poll_free_winsock_event(GstPoll * set,gint idx)552 gst_poll_free_winsock_event (GstPoll * set, gint idx)
553 {
554 WinsockFd *wfd = &g_array_index (set->fds, WinsockFd, idx);
555 HANDLE event = g_array_index (set->events, HANDLE, idx);
556
557 WSAEventSelect (wfd->fd, event, 0);
558 CloseHandle (event);
559 }
560
561 static void
gst_poll_update_winsock_event_mask(GstPoll * set,gint idx,glong flags,gboolean active)562 gst_poll_update_winsock_event_mask (GstPoll * set, gint idx, glong flags,
563 gboolean active)
564 {
565 WinsockFd *wfd;
566
567 wfd = &g_array_index (set->fds, WinsockFd, idx);
568
569 if (active)
570 wfd->event_mask |= flags;
571 else
572 wfd->event_mask &= ~flags;
573
574 /* reset ignored state if the new mask doesn't overlap at all */
575 if ((wfd->ignored_event_mask & wfd->event_mask) == 0)
576 wfd->ignored_event_mask = 0;
577 }
578
579 static gboolean
gst_poll_prepare_winsock_active_sets(GstPoll * set)580 gst_poll_prepare_winsock_active_sets (GstPoll * set)
581 {
582 guint i;
583
584 g_array_set_size (set->active_fds, 0);
585 g_array_set_size (set->active_fds_ignored, 0);
586 g_array_set_size (set->active_events, 0);
587 g_array_append_val (set->active_events, set->wakeup_event);
588
589 for (i = 0; i < set->fds->len; i++) {
590 WinsockFd *wfd = &g_array_index (set->fds, WinsockFd, i);
591 HANDLE event = g_array_index (set->events, HANDLE, i);
592
593 if (wfd->ignored_event_mask == 0) {
594 gint ret;
595
596 g_array_append_val (set->active_fds, *wfd);
597 g_array_append_val (set->active_events, event);
598
599 ret = WSAEventSelect (wfd->fd, event, wfd->event_mask);
600 if (G_UNLIKELY (ret != 0)) {
601 errno = gst_poll_winsock_error_to_errno (WSAGetLastError ());
602 return FALSE;
603 }
604 } else {
605 g_array_append_val (set->active_fds_ignored, wfd);
606 }
607 }
608
609 return TRUE;
610 }
611
612 static gint
gst_poll_collect_winsock_events(GstPoll * set)613 gst_poll_collect_winsock_events (GstPoll * set)
614 {
615 gint res, i;
616
617 /*
618 * We need to check which events are signaled, and call
619 * WSAEnumNetworkEvents for those that are, which resets
620 * the event and clears the internal network event records.
621 */
622 res = 0;
623 for (i = 0; i < set->active_fds->len; i++) {
624 WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, i);
625 HANDLE event = g_array_index (set->active_events, HANDLE, i + 1);
626 DWORD wait_ret;
627
628 wait_ret = WaitForSingleObject (event, 0);
629 if (wait_ret == WAIT_OBJECT_0) {
630 gint enum_ret = WSAEnumNetworkEvents (wfd->fd, event, &wfd->events);
631
632 if (G_UNLIKELY (enum_ret != 0)) {
633 res = -1;
634 errno = gst_poll_winsock_error_to_errno (WSAGetLastError ());
635 break;
636 }
637
638 res++;
639 } else {
640 /* clear any previously stored result */
641 memset (&wfd->events, 0, sizeof (wfd->events));
642 }
643 }
644
645 /* If all went well we also need to reset the ignored fds. */
646 if (res >= 0) {
647 res += set->active_fds_ignored->len;
648
649 for (i = 0; i < set->active_fds_ignored->len; i++) {
650 WinsockFd *wfd = g_array_index (set->active_fds_ignored, WinsockFd *, i);
651
652 wfd->ignored_event_mask = 0;
653 }
654
655 g_array_set_size (set->active_fds_ignored, 0);
656 }
657
658 return res;
659 }
660 #endif
661
662 /**
663 * gst_poll_new: (skip)
664 * @controllable: whether it should be possible to control a wait.
665 *
666 * Create a new file descriptor set. If @controllable, it
667 * is possible to restart or flush a call to gst_poll_wait() with
668 * gst_poll_restart() and gst_poll_set_flushing() respectively.
669 *
670 * Free-function: gst_poll_free
671 *
672 * Returns: (transfer full) (nullable): a new #GstPoll, or %NULL in
673 * case of an error. Free with gst_poll_free().
674 */
675 GstPoll *
gst_poll_new(gboolean controllable)676 gst_poll_new (gboolean controllable)
677 {
678 GstPoll *nset;
679
680 nset = g_slice_new0 (GstPoll);
681 GST_DEBUG ("%p: new controllable : %d", nset, controllable);
682 g_mutex_init (&nset->lock);
683 #ifndef G_OS_WIN32
684 nset->mode = GST_POLL_MODE_AUTO;
685 nset->fds = g_array_new (FALSE, FALSE, sizeof (struct pollfd));
686 nset->active_fds = g_array_new (FALSE, FALSE, sizeof (struct pollfd));
687 nset->control_read_fd.fd = -1;
688 nset->control_write_fd.fd = -1;
689 {
690 gint control_sock[2];
691
692 if (socketpair (PF_UNIX, SOCK_STREAM, 0, control_sock) < 0)
693 goto no_socket_pair;
694
695 nset->control_read_fd.fd = control_sock[0];
696 nset->control_write_fd.fd = control_sock[1];
697
698 gst_poll_add_fd_unlocked (nset, &nset->control_read_fd);
699 gst_poll_fd_ctl_read_unlocked (nset, &nset->control_read_fd, TRUE);
700 }
701 #else
702 nset->mode = GST_POLL_MODE_WINDOWS;
703 nset->fds = g_array_new (FALSE, FALSE, sizeof (WinsockFd));
704 nset->active_fds = g_array_new (FALSE, FALSE, sizeof (WinsockFd));
705 nset->active_fds_ignored = g_array_new (FALSE, FALSE, sizeof (WinsockFd *));
706 nset->events = g_array_new (FALSE, FALSE, sizeof (HANDLE));
707 nset->active_events = g_array_new (FALSE, FALSE, sizeof (HANDLE));
708
709 nset->wakeup_event = CreateEvent (NULL, TRUE, FALSE, NULL);
710 #endif
711
712 /* ensure (re)build, though already sneakily set in non-windows case */
713 MARK_REBUILD (nset);
714
715 nset->controllable = controllable;
716 nset->control_pending = 0;
717
718 return nset;
719
720 /* ERRORS */
721 #ifndef G_OS_WIN32
722 no_socket_pair:
723 {
724 GST_WARNING ("%p: can't create socket pair !", nset);
725 gst_poll_free (nset);
726 return NULL;
727 }
728 #endif
729 }
730
731 /**
732 * gst_poll_new_timer: (skip)
733 *
734 * Create a new poll object that can be used for scheduling cancellable
735 * timeouts.
736 *
737 * A timeout is performed with gst_poll_wait(). Multiple timeouts can be
738 * performed from different threads.
739 *
740 * Free-function: gst_poll_free
741 *
742 * Returns: (transfer full) (nullable): a new #GstPoll, or %NULL in
743 * case of an error. Free with gst_poll_free().
744 */
745 GstPoll *
gst_poll_new_timer(void)746 gst_poll_new_timer (void)
747 {
748 GstPoll *poll;
749
750 /* make a new controllable poll set */
751 if (!(poll = gst_poll_new (TRUE)))
752 goto done;
753
754 /* we are a timer */
755 poll->timer = TRUE;
756
757 done:
758 return poll;
759 }
760
761 /**
762 * gst_poll_free:
763 * @set: (transfer full): a file descriptor set.
764 *
765 * Free a file descriptor set.
766 */
767 void
gst_poll_free(GstPoll * set)768 gst_poll_free (GstPoll * set)
769 {
770 g_return_if_fail (set != NULL);
771
772 GST_DEBUG ("%p: freeing", set);
773
774 #ifndef G_OS_WIN32
775 if (set->control_write_fd.fd >= 0)
776 close (set->control_write_fd.fd);
777 if (set->control_read_fd.fd >= 0)
778 close (set->control_read_fd.fd);
779 #else
780 CloseHandle (set->wakeup_event);
781
782 {
783 guint i;
784
785 for (i = 0; i < set->events->len; i++)
786 gst_poll_free_winsock_event (set, i);
787 }
788
789 g_array_free (set->active_events, TRUE);
790 g_array_free (set->events, TRUE);
791 g_array_free (set->active_fds_ignored, TRUE);
792 #endif
793
794 g_array_free (set->active_fds, TRUE);
795 g_array_free (set->fds, TRUE);
796 g_mutex_clear (&set->lock);
797 g_slice_free (GstPoll, set);
798 }
799
800 /**
801 * gst_poll_get_read_gpollfd:
802 * @set: a #GstPoll
803 * @fd: a #GPollFD
804 *
805 * Get a GPollFD for the reading part of the control socket. This is useful when
806 * integrating with a GSource and GMainLoop.
807 */
808 void
gst_poll_get_read_gpollfd(GstPoll * set,GPollFD * fd)809 gst_poll_get_read_gpollfd (GstPoll * set, GPollFD * fd)
810 {
811 g_return_if_fail (set != NULL);
812 g_return_if_fail (fd != NULL);
813
814 #ifndef G_OS_WIN32
815 fd->fd = set->control_read_fd.fd;
816 #else
817 #if GLIB_SIZEOF_VOID_P == 8
818 fd->fd = (gint64) set->wakeup_event;
819 #else
820 fd->fd = (gint) set->wakeup_event;
821 #endif
822 #endif
823 fd->events = G_IO_IN | G_IO_HUP | G_IO_ERR;
824 fd->revents = 0;
825 }
826
827 /**
828 * gst_poll_fd_init:
829 * @fd: a #GstPollFD
830 *
831 * Initializes @fd. Alternatively you can initialize it with
832 * #GST_POLL_FD_INIT.
833 */
834 void
gst_poll_fd_init(GstPollFD * fd)835 gst_poll_fd_init (GstPollFD * fd)
836 {
837 g_return_if_fail (fd != NULL);
838
839 fd->fd = -1;
840 fd->idx = -1;
841 }
842
843 static gboolean
gst_poll_add_fd_unlocked(GstPoll * set,GstPollFD * fd)844 gst_poll_add_fd_unlocked (GstPoll * set, GstPollFD * fd)
845 {
846 gint idx;
847
848 GST_DEBUG ("%p: fd (fd:%d, idx:%d)", set, fd->fd, fd->idx);
849
850 idx = find_index (set->fds, fd);
851 if (idx < 0) {
852 #ifndef G_OS_WIN32
853 struct pollfd nfd;
854
855 nfd.fd = fd->fd;
856 nfd.events = POLLERR | POLLNVAL | POLLHUP;
857 nfd.revents = 0;
858
859 g_array_append_val (set->fds, nfd);
860
861 fd->idx = set->fds->len - 1;
862 #else
863 WinsockFd wfd;
864 HANDLE event;
865
866 wfd.fd = fd->fd;
867 wfd.event_mask = FD_CLOSE;
868 memset (&wfd.events, 0, sizeof (wfd.events));
869 wfd.ignored_event_mask = 0;
870 event = WSACreateEvent ();
871
872 g_array_append_val (set->fds, wfd);
873 g_array_append_val (set->events, event);
874
875 fd->idx = set->fds->len - 1;
876 #endif
877 MARK_REBUILD (set);
878 } else {
879 GST_WARNING ("%p: fd already added !", set);
880 }
881
882 return TRUE;
883 }
884
885 /**
886 * gst_poll_add_fd:
887 * @set: a file descriptor set.
888 * @fd: a file descriptor.
889 *
890 * Add a file descriptor to the file descriptor set.
891 *
892 * Returns: %TRUE if the file descriptor was successfully added to the set.
893 */
894 gboolean
gst_poll_add_fd(GstPoll * set,GstPollFD * fd)895 gst_poll_add_fd (GstPoll * set, GstPollFD * fd)
896 {
897 gboolean ret;
898
899 g_return_val_if_fail (set != NULL, FALSE);
900 g_return_val_if_fail (fd != NULL, FALSE);
901 g_return_val_if_fail (fd->fd >= 0, FALSE);
902
903 g_mutex_lock (&set->lock);
904
905 ret = gst_poll_add_fd_unlocked (set, fd);
906
907 g_mutex_unlock (&set->lock);
908
909 return ret;
910 }
911
912 /**
913 * gst_poll_remove_fd:
914 * @set: a file descriptor set.
915 * @fd: a file descriptor.
916 *
917 * Remove a file descriptor from the file descriptor set.
918 *
919 * Returns: %TRUE if the file descriptor was successfully removed from the set.
920 */
921 gboolean
gst_poll_remove_fd(GstPoll * set,GstPollFD * fd)922 gst_poll_remove_fd (GstPoll * set, GstPollFD * fd)
923 {
924 gint idx;
925
926 g_return_val_if_fail (set != NULL, FALSE);
927 g_return_val_if_fail (fd != NULL, FALSE);
928 g_return_val_if_fail (fd->fd >= 0, FALSE);
929
930
931 GST_DEBUG ("%p: fd (fd:%d, idx:%d)", set, fd->fd, fd->idx);
932
933 g_mutex_lock (&set->lock);
934
935 /* get the index, -1 is an fd that is not added */
936 idx = find_index (set->fds, fd);
937 if (idx >= 0) {
938 #ifdef G_OS_WIN32
939 gst_poll_free_winsock_event (set, idx);
940 g_array_remove_index_fast (set->events, idx);
941 #endif
942
943 /* remove the fd at index, we use _remove_index_fast, which copies the last
944 * element of the array to the freed index */
945 g_array_remove_index_fast (set->fds, idx);
946
947 /* mark fd as removed by setting the index to -1 */
948 fd->idx = -1;
949 MARK_REBUILD (set);
950 } else {
951 GST_WARNING ("%p: couldn't find fd !", set);
952 }
953
954 g_mutex_unlock (&set->lock);
955
956 return idx >= 0;
957 }
958
959 /**
960 * gst_poll_fd_ctl_write:
961 * @set: a file descriptor set.
962 * @fd: a file descriptor.
963 * @active: a new status.
964 *
965 * Control whether the descriptor @fd in @set will be monitored for
966 * writability.
967 *
968 * Returns: %TRUE if the descriptor was successfully updated.
969 */
970 gboolean
gst_poll_fd_ctl_write(GstPoll * set,GstPollFD * fd,gboolean active)971 gst_poll_fd_ctl_write (GstPoll * set, GstPollFD * fd, gboolean active)
972 {
973 gint idx;
974
975 g_return_val_if_fail (set != NULL, FALSE);
976 g_return_val_if_fail (fd != NULL, FALSE);
977 g_return_val_if_fail (fd->fd >= 0, FALSE);
978
979 GST_DEBUG ("%p: fd (fd:%d, idx:%d), active : %d", set,
980 fd->fd, fd->idx, active);
981
982 g_mutex_lock (&set->lock);
983
984 idx = find_index (set->fds, fd);
985 if (idx >= 0) {
986 #ifndef G_OS_WIN32
987 struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, idx);
988
989 if (active)
990 pfd->events |= POLLOUT;
991 else
992 pfd->events &= ~POLLOUT;
993
994 GST_LOG ("%p: pfd->events now %d (POLLOUT:%d)", set, pfd->events, POLLOUT);
995 #else
996 gst_poll_update_winsock_event_mask (set, idx, FD_WRITE | FD_CONNECT,
997 active);
998 #endif
999 MARK_REBUILD (set);
1000 } else {
1001 GST_WARNING ("%p: couldn't find fd !", set);
1002 }
1003
1004 g_mutex_unlock (&set->lock);
1005
1006 return idx >= 0;
1007 }
1008
1009 static gboolean
gst_poll_fd_ctl_read_unlocked(GstPoll * set,GstPollFD * fd,gboolean active)1010 gst_poll_fd_ctl_read_unlocked (GstPoll * set, GstPollFD * fd, gboolean active)
1011 {
1012 gint idx;
1013
1014 GST_DEBUG ("%p: fd (fd:%d, idx:%d), active : %d", set,
1015 fd->fd, fd->idx, active);
1016
1017 idx = find_index (set->fds, fd);
1018
1019 if (idx >= 0) {
1020 #ifndef G_OS_WIN32
1021 struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, idx);
1022
1023 if (active)
1024 pfd->events |= POLLIN;
1025 else
1026 pfd->events &= ~POLLIN;
1027 #else
1028 gst_poll_update_winsock_event_mask (set, idx, FD_READ | FD_ACCEPT, active);
1029 #endif
1030 MARK_REBUILD (set);
1031 } else {
1032 GST_WARNING ("%p: couldn't find fd !", set);
1033 }
1034
1035 return idx >= 0;
1036 }
1037
1038 /**
1039 * gst_poll_fd_ctl_read:
1040 * @set: a file descriptor set.
1041 * @fd: a file descriptor.
1042 * @active: a new status.
1043 *
1044 * Control whether the descriptor @fd in @set will be monitored for
1045 * readability.
1046 *
1047 * Returns: %TRUE if the descriptor was successfully updated.
1048 */
1049 gboolean
gst_poll_fd_ctl_read(GstPoll * set,GstPollFD * fd,gboolean active)1050 gst_poll_fd_ctl_read (GstPoll * set, GstPollFD * fd, gboolean active)
1051 {
1052 gboolean ret;
1053
1054 g_return_val_if_fail (set != NULL, FALSE);
1055 g_return_val_if_fail (fd != NULL, FALSE);
1056 g_return_val_if_fail (fd->fd >= 0, FALSE);
1057
1058 g_mutex_lock (&set->lock);
1059
1060 ret = gst_poll_fd_ctl_read_unlocked (set, fd, active);
1061
1062 g_mutex_unlock (&set->lock);
1063
1064 return ret;
1065 }
1066
1067 /**
1068 * gst_poll_fd_ctl_pri:
1069 * @set: a file descriptor set.
1070 * @fd: a file descriptor.
1071 * @active: a new status.
1072 *
1073 * Control whether the descriptor @fd in @set will be monitored for
1074 * exceptional conditions (POLLPRI).
1075 *
1076 * Not implemented on Windows (will just return %FALSE there).
1077 *
1078 * Returns: %TRUE if the descriptor was successfully updated.
1079 *
1080 * Since: 1.16
1081 */
1082 gboolean
gst_poll_fd_ctl_pri(GstPoll * set,GstPollFD * fd,gboolean active)1083 gst_poll_fd_ctl_pri (GstPoll * set, GstPollFD * fd, gboolean active)
1084 {
1085 #ifdef G_OS_WIN32
1086 return FALSE;
1087 #else
1088 gint idx;
1089
1090 g_return_val_if_fail (set != NULL, FALSE);
1091 g_return_val_if_fail (fd != NULL, FALSE);
1092 g_return_val_if_fail (fd->fd >= 0, FALSE);
1093
1094 GST_DEBUG ("%p: fd (fd:%d, idx:%d), active : %d", set,
1095 fd->fd, fd->idx, active);
1096
1097 g_mutex_lock (&set->lock);
1098
1099 idx = find_index (set->fds, fd);
1100 if (idx >= 0) {
1101 struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, idx);
1102
1103 if (active)
1104 pfd->events |= POLLPRI;
1105 else
1106 pfd->events &= ~POLLPRI;
1107
1108 GST_LOG ("%p: pfd->events now %d (POLLPRI:%d)", set, pfd->events, POLLOUT);
1109 MARK_REBUILD (set);
1110 } else {
1111 GST_WARNING ("%p: couldn't find fd !", set);
1112 }
1113
1114 g_mutex_unlock (&set->lock);
1115
1116 return idx >= 0;
1117 #endif
1118 }
1119
1120 /**
1121 * gst_poll_fd_ignored:
1122 * @set: a file descriptor set.
1123 * @fd: a file descriptor.
1124 *
1125 * Mark @fd as ignored so that the next call to gst_poll_wait() will yield
1126 * the same result for @fd as last time. This function must be called if no
1127 * operation (read/write/recv/send/etc.) will be performed on @fd before
1128 * the next call to gst_poll_wait().
1129 *
1130 * The reason why this is needed is because the underlying implementation
1131 * might not allow querying the fd more than once between calls to one of
1132 * the re-enabling operations.
1133 */
1134 void
gst_poll_fd_ignored(GstPoll * set,GstPollFD * fd)1135 gst_poll_fd_ignored (GstPoll * set, GstPollFD * fd)
1136 {
1137 #ifdef G_OS_WIN32
1138 gint idx;
1139
1140 g_return_if_fail (set != NULL);
1141 g_return_if_fail (fd != NULL);
1142 g_return_if_fail (fd->fd >= 0);
1143
1144 g_mutex_lock (&set->lock);
1145
1146 idx = find_index (set->fds, fd);
1147 if (idx >= 0) {
1148 WinsockFd *wfd = &g_array_index (set->fds, WinsockFd, idx);
1149
1150 wfd->ignored_event_mask = wfd->event_mask & (FD_READ | FD_WRITE);
1151 MARK_REBUILD (set);
1152 }
1153
1154 g_mutex_unlock (&set->lock);
1155 #endif
1156 }
1157
1158 /**
1159 * gst_poll_fd_has_closed:
1160 * @set: a file descriptor set.
1161 * @fd: a file descriptor.
1162 *
1163 * Check if @fd in @set has closed the connection.
1164 *
1165 * Returns: %TRUE if the connection was closed.
1166 */
1167 gboolean
gst_poll_fd_has_closed(const GstPoll * set,GstPollFD * fd)1168 gst_poll_fd_has_closed (const GstPoll * set, GstPollFD * fd)
1169 {
1170 gboolean res = FALSE;
1171 gint idx;
1172
1173 g_return_val_if_fail (set != NULL, FALSE);
1174 g_return_val_if_fail (fd != NULL, FALSE);
1175 g_return_val_if_fail (fd->fd >= 0, FALSE);
1176
1177 g_mutex_lock (&((GstPoll *) set)->lock);
1178
1179 idx = find_index (set->active_fds, fd);
1180 if (idx >= 0) {
1181 #ifndef G_OS_WIN32
1182 struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, idx);
1183
1184 res = (pfd->revents & POLLHUP) != 0;
1185 #else
1186 WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, idx);
1187
1188 res = (wfd->events.lNetworkEvents & FD_CLOSE) != 0;
1189 #endif
1190 } else {
1191 GST_WARNING ("%p: couldn't find fd !", set);
1192 }
1193 g_mutex_unlock (&((GstPoll *) set)->lock);
1194
1195 GST_DEBUG ("%p: fd (fd:%d, idx:%d) %d", set, fd->fd, fd->idx, res);
1196
1197 return res;
1198 }
1199
1200 /**
1201 * gst_poll_fd_has_error:
1202 * @set: a file descriptor set.
1203 * @fd: a file descriptor.
1204 *
1205 * Check if @fd in @set has an error.
1206 *
1207 * Returns: %TRUE if the descriptor has an error.
1208 */
1209 gboolean
gst_poll_fd_has_error(const GstPoll * set,GstPollFD * fd)1210 gst_poll_fd_has_error (const GstPoll * set, GstPollFD * fd)
1211 {
1212 gboolean res = FALSE;
1213 gint idx;
1214
1215 g_return_val_if_fail (set != NULL, FALSE);
1216 g_return_val_if_fail (fd != NULL, FALSE);
1217 g_return_val_if_fail (fd->fd >= 0, FALSE);
1218
1219 g_mutex_lock (&((GstPoll *) set)->lock);
1220
1221 idx = find_index (set->active_fds, fd);
1222 if (idx >= 0) {
1223 #ifndef G_OS_WIN32
1224 struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, idx);
1225
1226 res = (pfd->revents & (POLLERR | POLLNVAL)) != 0;
1227 #else
1228 WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, idx);
1229
1230 res = (wfd->events.iErrorCode[FD_CLOSE_BIT] != 0) ||
1231 (wfd->events.iErrorCode[FD_READ_BIT] != 0) ||
1232 (wfd->events.iErrorCode[FD_WRITE_BIT] != 0) ||
1233 (wfd->events.iErrorCode[FD_ACCEPT_BIT] != 0) ||
1234 (wfd->events.iErrorCode[FD_CONNECT_BIT] != 0);
1235 #endif
1236 } else {
1237 GST_WARNING ("%p: couldn't find fd !", set);
1238 }
1239 g_mutex_unlock (&((GstPoll *) set)->lock);
1240
1241 GST_DEBUG ("%p: fd (fd:%d, idx:%d) %d", set, fd->fd, fd->idx, res);
1242
1243 return res;
1244 }
1245
1246 static gboolean
gst_poll_fd_can_read_unlocked(const GstPoll * set,GstPollFD * fd)1247 gst_poll_fd_can_read_unlocked (const GstPoll * set, GstPollFD * fd)
1248 {
1249 gboolean res = FALSE;
1250 gint idx;
1251
1252 idx = find_index (set->active_fds, fd);
1253 if (idx >= 0) {
1254 #ifndef G_OS_WIN32
1255 struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, idx);
1256
1257 res = (pfd->revents & POLLIN) != 0;
1258 #else
1259 WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, idx);
1260
1261 res = (wfd->events.lNetworkEvents & (FD_READ | FD_ACCEPT)) != 0;
1262 #endif
1263 } else {
1264 GST_WARNING ("%p: couldn't find fd !", set);
1265 }
1266 GST_DEBUG ("%p: fd (fd:%d, idx:%d) %d", set, fd->fd, fd->idx, res);
1267
1268 return res;
1269 }
1270
1271 /**
1272 * gst_poll_fd_can_read:
1273 * @set: a file descriptor set.
1274 * @fd: a file descriptor.
1275 *
1276 * Check if @fd in @set has data to be read.
1277 *
1278 * Returns: %TRUE if the descriptor has data to be read.
1279 */
1280 gboolean
gst_poll_fd_can_read(const GstPoll * set,GstPollFD * fd)1281 gst_poll_fd_can_read (const GstPoll * set, GstPollFD * fd)
1282 {
1283 gboolean res = FALSE;
1284
1285 g_return_val_if_fail (set != NULL, FALSE);
1286 g_return_val_if_fail (fd != NULL, FALSE);
1287 g_return_val_if_fail (fd->fd >= 0, FALSE);
1288
1289 g_mutex_lock (&((GstPoll *) set)->lock);
1290
1291 res = gst_poll_fd_can_read_unlocked (set, fd);
1292
1293 g_mutex_unlock (&((GstPoll *) set)->lock);
1294
1295 return res;
1296 }
1297
1298 /**
1299 * gst_poll_fd_can_write:
1300 * @set: a file descriptor set.
1301 * @fd: a file descriptor.
1302 *
1303 * Check if @fd in @set can be used for writing.
1304 *
1305 * Returns: %TRUE if the descriptor can be used for writing.
1306 */
1307 gboolean
gst_poll_fd_can_write(const GstPoll * set,GstPollFD * fd)1308 gst_poll_fd_can_write (const GstPoll * set, GstPollFD * fd)
1309 {
1310 gboolean res = FALSE;
1311 gint idx;
1312
1313 g_return_val_if_fail (set != NULL, FALSE);
1314 g_return_val_if_fail (fd != NULL, FALSE);
1315 g_return_val_if_fail (fd->fd >= 0, FALSE);
1316
1317 g_mutex_lock (&((GstPoll *) set)->lock);
1318
1319 idx = find_index (set->active_fds, fd);
1320 if (idx >= 0) {
1321 #ifndef G_OS_WIN32
1322 struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, idx);
1323
1324 res = (pfd->revents & POLLOUT) != 0;
1325 #else
1326 WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, idx);
1327
1328 res = (wfd->events.lNetworkEvents & FD_WRITE) != 0;
1329 #endif
1330 } else {
1331 GST_WARNING ("%p: couldn't find fd !", set);
1332 }
1333 g_mutex_unlock (&((GstPoll *) set)->lock);
1334
1335 GST_DEBUG ("%p: fd (fd:%d, idx:%d) %d", set, fd->fd, fd->idx, res);
1336
1337 return res;
1338 }
1339
1340 /**
1341 * gst_poll_fd_has_pri:
1342 * @set: a file descriptor set.
1343 * @fd: a file descriptor.
1344 *
1345 * Check if @fd in @set has an exceptional condition (POLLPRI).
1346 *
1347 * Not implemented on Windows (will just return %FALSE there).
1348 *
1349 * Returns: %TRUE if the descriptor has an exceptional condition.
1350 *
1351 * Since: 1.16
1352 */
1353 gboolean
gst_poll_fd_has_pri(const GstPoll * set,GstPollFD * fd)1354 gst_poll_fd_has_pri (const GstPoll * set, GstPollFD * fd)
1355 {
1356 #ifdef G_OS_WIN32
1357 return FALSE;
1358 #else
1359 gboolean res = FALSE;
1360 gint idx;
1361
1362 g_return_val_if_fail (set != NULL, FALSE);
1363 g_return_val_if_fail (fd != NULL, FALSE);
1364 g_return_val_if_fail (fd->fd >= 0, FALSE);
1365
1366 g_mutex_lock (&((GstPoll *) set)->lock);
1367
1368 idx = find_index (set->active_fds, fd);
1369 if (idx >= 0) {
1370 struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, idx);
1371
1372 res = (pfd->revents & POLLPRI) != 0;
1373 } else {
1374 GST_WARNING ("%p: couldn't find fd !", set);
1375 }
1376 g_mutex_unlock (&((GstPoll *) set)->lock);
1377
1378 GST_DEBUG ("%p: fd (fd:%d, idx:%d) %d", set, fd->fd, fd->idx, res);
1379
1380 return res;
1381 #endif
1382 }
1383
1384 /**
1385 * gst_poll_wait:
1386 * @set: a #GstPoll.
1387 * @timeout: a timeout in nanoseconds.
1388 *
1389 * Wait for activity on the file descriptors in @set. This function waits up to
1390 * the specified @timeout. A timeout of #GST_CLOCK_TIME_NONE waits forever.
1391 *
1392 * For #GstPoll objects created with gst_poll_new(), this function can only be
1393 * called from a single thread at a time. If called from multiple threads,
1394 * -1 will be returned with errno set to EPERM.
1395 *
1396 * This is not true for timer #GstPoll objects created with
1397 * gst_poll_new_timer(), where it is allowed to have multiple threads waiting
1398 * simultaneously.
1399 *
1400 * Returns: The number of #GstPollFD in @set that have activity or 0 when no
1401 * activity was detected after @timeout. If an error occurs, -1 is returned
1402 * and errno is set.
1403 */
1404 gint
gst_poll_wait(GstPoll * set,GstClockTime timeout)1405 gst_poll_wait (GstPoll * set, GstClockTime timeout)
1406 {
1407 gboolean restarting;
1408 gboolean is_timer;
1409 int res;
1410 gint old_waiting;
1411
1412 g_return_val_if_fail (set != NULL, -1);
1413
1414 GST_DEBUG ("%p: timeout :%" GST_TIME_FORMAT, set, GST_TIME_ARGS (timeout));
1415
1416 is_timer = set->timer;
1417
1418 /* add one more waiter */
1419 old_waiting = INC_WAITING (set);
1420
1421 /* we cannot wait from multiple threads unless we are a timer */
1422 if (G_UNLIKELY (old_waiting > 0 && !is_timer))
1423 goto already_waiting;
1424
1425 /* flushing, exit immediately */
1426 if (G_UNLIKELY (IS_FLUSHING (set)))
1427 goto flushing;
1428
1429 do {
1430 GstPollMode mode;
1431
1432 res = -1;
1433 restarting = FALSE;
1434
1435 mode = choose_mode (set, timeout);
1436
1437 if (TEST_REBUILD (set)) {
1438 g_mutex_lock (&set->lock);
1439 #ifndef G_OS_WIN32
1440 g_array_set_size (set->active_fds, set->fds->len);
1441 memcpy (set->active_fds->data, set->fds->data,
1442 set->fds->len * sizeof (struct pollfd));
1443 #else
1444 if (!gst_poll_prepare_winsock_active_sets (set))
1445 goto winsock_error;
1446 #endif
1447 g_mutex_unlock (&set->lock);
1448 }
1449
1450 switch (mode) {
1451 case GST_POLL_MODE_AUTO:
1452 g_assert_not_reached ();
1453 break;
1454 case GST_POLL_MODE_PPOLL:
1455 {
1456 #ifdef HAVE_PPOLL
1457 struct timespec ts;
1458 struct timespec *tsptr;
1459
1460 if (timeout != GST_CLOCK_TIME_NONE) {
1461 GST_TIME_TO_TIMESPEC (timeout, ts);
1462 tsptr = &ts;
1463 } else {
1464 tsptr = NULL;
1465 }
1466
1467 res =
1468 ppoll ((struct pollfd *) set->active_fds->data,
1469 set->active_fds->len, tsptr, NULL);
1470 #else
1471 g_assert_not_reached ();
1472 errno = ENOSYS;
1473 #endif
1474 break;
1475 }
1476 case GST_POLL_MODE_POLL:
1477 {
1478 #ifdef HAVE_POLL
1479 gint t;
1480
1481 if (timeout != GST_CLOCK_TIME_NONE) {
1482 t = GST_TIME_AS_MSECONDS (timeout);
1483 } else {
1484 t = -1;
1485 }
1486
1487 res =
1488 poll ((struct pollfd *) set->active_fds->data,
1489 set->active_fds->len, t);
1490 #else
1491 g_assert_not_reached ();
1492 errno = ENOSYS;
1493 #endif
1494 break;
1495 }
1496 case GST_POLL_MODE_PSELECT:
1497 #ifndef HAVE_PSELECT
1498 {
1499 g_assert_not_reached ();
1500 errno = ENOSYS;
1501 break;
1502 }
1503 #endif
1504 case GST_POLL_MODE_SELECT:
1505 {
1506 #ifndef G_OS_WIN32
1507 fd_set readfds;
1508 fd_set writefds;
1509 fd_set errorfds;
1510 gint max_fd;
1511
1512 max_fd = pollfd_to_fd_set (set, &readfds, &writefds, &errorfds);
1513
1514 if (mode == GST_POLL_MODE_SELECT) {
1515 struct timeval tv;
1516 struct timeval *tvptr;
1517
1518 if (timeout != GST_CLOCK_TIME_NONE) {
1519 GST_TIME_TO_TIMEVAL (timeout, tv);
1520 tvptr = &tv;
1521 } else {
1522 tvptr = NULL;
1523 }
1524
1525 GST_DEBUG ("%p: Calling select", set);
1526 res = select (max_fd + 1, &readfds, &writefds, &errorfds, tvptr);
1527 GST_DEBUG ("%p: After select, res:%d", set, res);
1528 } else {
1529 #ifdef HAVE_PSELECT
1530 struct timespec ts;
1531 struct timespec *tsptr;
1532
1533 if (timeout != GST_CLOCK_TIME_NONE) {
1534 GST_TIME_TO_TIMESPEC (timeout, ts);
1535 tsptr = &ts;
1536 } else {
1537 tsptr = NULL;
1538 }
1539
1540 GST_DEBUG ("%p: Calling pselect", set);
1541 res =
1542 pselect (max_fd + 1, &readfds, &writefds, &errorfds, tsptr, NULL);
1543 GST_DEBUG ("%p: After pselect, res:%d", set, res);
1544 #endif
1545 }
1546
1547 if (res >= 0) {
1548 fd_set_to_pollfd (set, &readfds, &writefds, &errorfds);
1549 }
1550 #else /* G_OS_WIN32 */
1551 g_assert_not_reached ();
1552 errno = ENOSYS;
1553 #endif
1554 break;
1555 }
1556 case GST_POLL_MODE_WINDOWS:
1557 {
1558 #ifdef G_OS_WIN32
1559 gint ignore_count = set->active_fds_ignored->len;
1560 DWORD t, wait_ret;
1561
1562 if (G_LIKELY (ignore_count == 0)) {
1563 if (timeout != GST_CLOCK_TIME_NONE)
1564 t = GST_TIME_AS_MSECONDS (timeout);
1565 else
1566 t = INFINITE;
1567 } else {
1568 /* already one or more ignored fds, so we quickly sweep the others */
1569 t = 0;
1570 }
1571
1572 if (set->active_events->len != 0) {
1573 wait_ret = WSAWaitForMultipleEvents (set->active_events->len,
1574 (HANDLE *) set->active_events->data, FALSE, t, FALSE);
1575 } else {
1576 wait_ret = WSA_WAIT_FAILED;
1577 WSASetLastError (WSA_INVALID_PARAMETER);
1578 }
1579
1580 if (ignore_count == 0 && wait_ret == WSA_WAIT_TIMEOUT) {
1581 res = 0;
1582 } else if (wait_ret == WSA_WAIT_FAILED) {
1583 res = -1;
1584 errno = gst_poll_winsock_error_to_errno (WSAGetLastError ());
1585 } else {
1586 /* the first entry is the wakeup event */
1587 if (wait_ret - WSA_WAIT_EVENT_0 >= 1) {
1588 res = gst_poll_collect_winsock_events (set);
1589 } else {
1590 res = 1; /* wakeup event */
1591 }
1592 }
1593 #else
1594 g_assert_not_reached ();
1595 errno = ENOSYS;
1596 #endif
1597 break;
1598 }
1599 }
1600
1601 if (!is_timer) {
1602 /* Applications needs to clear the control socket themselves for timer
1603 * polls.
1604 * For other polls, we need to clear the control socket. If there was only
1605 * one socket with activity and it was the control socket, we need to
1606 * restart */
1607 if (release_all_wakeup (set) > 0 && res == 1)
1608 restarting = TRUE;
1609 }
1610
1611 /* we got woken up and we are flushing, we need to stop */
1612 if (G_UNLIKELY (IS_FLUSHING (set)))
1613 goto flushing;
1614
1615 } while (G_UNLIKELY (restarting));
1616
1617 DEC_WAITING (set);
1618
1619 return res;
1620
1621 /* ERRORS */
1622 already_waiting:
1623 {
1624 GST_LOG ("%p: we are already waiting", set);
1625 DEC_WAITING (set);
1626 errno = EPERM;
1627 return -1;
1628 }
1629 flushing:
1630 {
1631 GST_LOG ("%p: we are flushing", set);
1632 DEC_WAITING (set);
1633 errno = EBUSY;
1634 return -1;
1635 }
1636 #ifdef G_OS_WIN32
1637 winsock_error:
1638 {
1639 GST_LOG ("%p: winsock error", set);
1640 g_mutex_unlock (&set->lock);
1641 DEC_WAITING (set);
1642 return -1;
1643 }
1644 #endif
1645 }
1646
1647 /**
1648 * gst_poll_set_controllable:
1649 * @set: a #GstPoll.
1650 * @controllable: new controllable state.
1651 *
1652 * When @controllable is %TRUE, this function ensures that future calls to
1653 * gst_poll_wait() will be affected by gst_poll_restart() and
1654 * gst_poll_set_flushing().
1655 *
1656 * This function only works for non-timer #GstPoll objects created with
1657 * gst_poll_new().
1658 *
1659 * Returns: %TRUE if the controllability of @set could be updated.
1660 */
1661 gboolean
gst_poll_set_controllable(GstPoll * set,gboolean controllable)1662 gst_poll_set_controllable (GstPoll * set, gboolean controllable)
1663 {
1664 g_return_val_if_fail (set != NULL, FALSE);
1665 g_return_val_if_fail (!set->timer, FALSE);
1666
1667 GST_LOG ("%p: controllable : %d", set, controllable);
1668
1669 set->controllable = controllable;
1670
1671 return TRUE;
1672 }
1673
1674 /**
1675 * gst_poll_restart:
1676 * @set: a #GstPoll.
1677 *
1678 * Restart any gst_poll_wait() that is in progress. This function is typically
1679 * used after adding or removing descriptors to @set.
1680 *
1681 * If @set is not controllable, then this call will have no effect.
1682 *
1683 * This function only works for non-timer #GstPoll objects created with
1684 * gst_poll_new().
1685 */
1686 void
gst_poll_restart(GstPoll * set)1687 gst_poll_restart (GstPoll * set)
1688 {
1689 g_return_if_fail (set != NULL);
1690 g_return_if_fail (!set->timer);
1691
1692 if (set->controllable && GET_WAITING (set) > 0) {
1693 /* we are controllable and waiting, wake up the waiter. The socket will be
1694 * cleared by the _wait() thread and the poll will be restarted */
1695 raise_wakeup (set);
1696 }
1697 }
1698
1699 /**
1700 * gst_poll_set_flushing:
1701 * @set: a #GstPoll.
1702 * @flushing: new flushing state.
1703 *
1704 * When @flushing is %TRUE, this function ensures that current and future calls
1705 * to gst_poll_wait() will return -1, with errno set to EBUSY.
1706 *
1707 * Unsetting the flushing state will restore normal operation of @set.
1708 *
1709 * This function only works for non-timer #GstPoll objects created with
1710 * gst_poll_new().
1711 */
1712 void
gst_poll_set_flushing(GstPoll * set,gboolean flushing)1713 gst_poll_set_flushing (GstPoll * set, gboolean flushing)
1714 {
1715 g_return_if_fail (set != NULL);
1716 g_return_if_fail (!set->timer);
1717
1718 GST_LOG ("%p: flushing: %d", set, flushing);
1719
1720 /* update the new state first */
1721 SET_FLUSHING (set, flushing);
1722
1723 if (flushing && set->controllable && GET_WAITING (set) > 0) {
1724 /* we are flushing, controllable and waiting, wake up the waiter. When we
1725 * stop the flushing operation we don't clear the wakeup fd here, this will
1726 * happen in the _wait() thread. */
1727 raise_wakeup (set);
1728 }
1729 }
1730
1731 /**
1732 * gst_poll_write_control:
1733 * @set: a #GstPoll.
1734 *
1735 * Write a byte to the control socket of the controllable @set.
1736 * This function is mostly useful for timer #GstPoll objects created with
1737 * gst_poll_new_timer().
1738 *
1739 * It will make any current and future gst_poll_wait() function return with
1740 * 1, meaning the control socket is set. After an equal amount of calls to
1741 * gst_poll_read_control() have been performed, calls to gst_poll_wait() will
1742 * block again until their timeout expired.
1743 *
1744 * This function only works for timer #GstPoll objects created with
1745 * gst_poll_new_timer().
1746 *
1747 * Returns: %TRUE on success. %FALSE when when the byte could not be written.
1748 * errno contains the detailed error code but will never be EAGAIN, EINTR or
1749 * EWOULDBLOCK. %FALSE always signals a critical error.
1750 */
1751 gboolean
gst_poll_write_control(GstPoll * set)1752 gst_poll_write_control (GstPoll * set)
1753 {
1754 gboolean res;
1755
1756 g_return_val_if_fail (set != NULL, FALSE);
1757 g_return_val_if_fail (set->timer, FALSE);
1758
1759 res = raise_wakeup (set);
1760
1761 return res;
1762 }
1763
1764 /**
1765 * gst_poll_read_control:
1766 * @set: a #GstPoll.
1767 *
1768 * Read a byte from the control socket of the controllable @set.
1769 *
1770 * This function only works for timer #GstPoll objects created with
1771 * gst_poll_new_timer().
1772 *
1773 * Returns: %TRUE on success. %FALSE when when there was no byte to read or
1774 * reading the byte failed. If there was no byte to read, and only then, errno
1775 * will contain EWOULDBLOCK or EAGAIN. For all other values of errno this always signals a
1776 * critical error.
1777 */
1778 gboolean
gst_poll_read_control(GstPoll * set)1779 gst_poll_read_control (GstPoll * set)
1780 {
1781 gboolean res;
1782
1783 g_return_val_if_fail (set != NULL, FALSE);
1784 g_return_val_if_fail (set->timer, FALSE);
1785
1786 res = release_wakeup (set);
1787
1788 return res;
1789 }
1790