diff -r 000000000000 -r 0e761a78d257 gstreamer_core/gst/gstpoll.c --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/gstreamer_core/gst/gstpoll.c Thu Dec 17 08:53:32 2009 +0200 @@ -0,0 +1,1509 @@ +/* GStreamer + * Copyright (C) 1999 Erik Walthinsen + * Copyright (C) 2004 Wim Taymans + * Copyright (C) 2007 Peter Kjellerstedt + * Copyright (C) 2008 Ole André Vadla Ravnås + * + * gstpoll.c: File descriptor set + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 02111-1307, USA. + */ +/** + * SECTION:gstpoll + * @short_description: Keep track of file descriptors and make it possible + * to wait on them in a cancelable way + * + * A #GstPoll keeps track of file descriptors much like fd_set (used with + * select()) or a struct pollfd array (used with poll()). Once created with + * gst_poll_new(), the set can be used to wait for file descriptors to be + * readable and/or writeable. It is possible to make this wait be controlled + * by specifying %TRUE for the @controllable flag when creating the set (or + * later calling gst_poll_set_controllable()). + * + * New file descriptors are added to the set using gst_poll_add_fd(), and + * removed using gst_poll_remove_fd(). Controlling which file descriptors + * should be waited for to become readable and/ +#ifdef __SYMBIAN32__ +EXPORT_C +#endif +or writeable are done using + * gst_poll_fd_ctl_read() and gst_poll_fd_ctl_write(). + * + * Use gst_poll_wait() to wait for the file descriptors to actually become + * readable and/or writeable, or to timeout if no file descriptor is available + * in time. The wait can be controlled by calling gst_poll_restart() and + * gst_poll_set_flushing(). + * + * Once the file descriptor set has been waited for, one can use + * gst_poll_fd_has_closed() to see if the file descriptor has been closed, + * gst_poll_fd_has_error() to see if it has generated an error, + * gst_poll_fd_can_read() to see if it is possible to read from the file + * descriptor, and gst_poll_fd_can_write() to see if it is possible to + * write to it. + * + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#ifndef _MSC_VER +#define _GNU_SOURCE 1 +#ifndef __SYMBIAN32__ +#include +#endif +#include +#endif + +#include + +#ifdef HAVE_UNISTD_H +#include +#endif + +#include +#include + +#include + +#ifdef G_OS_WIN32 +#include +#define EINPROGRESS WSAEINPROGRESS +#else +#include +#endif + +/* OS/X needs this because of bad headers */ +#include + +#include "gst_private.h" + +#include "gstpoll.h" + +#ifdef __SYMBIAN32__ +#include +#include +#include +#include +#include + +#define POLLIN 1 +#define POLLOUT 4 +#define POLLPRI 2 +#define POLLHUP 6 +#define POLLERR 8 +#define POLLNVAL 32 + +#endif +#ifndef G_OS_WIN32 +/* the poll/select call is also performed on a control socket, that way + * we can send special commands to control it + */ +#define SEND_COMMAND(set, command) \ +G_STMT_START { \ + unsigned char c = command; \ + write (set->control_write_fd.fd, &c, 1); \ +} G_STMT_END + +#define READ_COMMAND(set, command, res) \ +G_STMT_START { \ + res = read (set->control_read_fd.fd, &command, 1); \ +} G_STMT_END + +#define GST_POLL_CMD_WAKEUP 'W' /* restart the poll/select call */ + +#else /* G_OS_WIN32 */ +typedef struct _WinsockFd WinsockFd; + +struct _WinsockFd +{ + gint fd; + glong event_mask; + WSANETWORKEVENTS events; + glong ignored_event_mask; +}; +#endif + +typedef enum +{ + GST_POLL_MODE_AUTO, + GST_POLL_MODE_SELECT, + GST_POLL_MODE_PSELECT, + GST_POLL_MODE_POLL, + GST_POLL_MODE_PPOLL, + GST_POLL_MODE_WINDOWS +} GstPollMode; + +struct _GstPoll +{ + GstPollMode mode; + + GMutex *lock; + + GArray *fds; + GArray *active_fds; +#ifndef G_OS_WIN32 + GstPollFD control_read_fd; + GstPollFD control_write_fd; +#else + GArray *active_fds_ignored; + + GArray *events; + GArray *active_events; + + HANDLE wakeup_event; +#endif + + gboolean controllable; + gboolean new_controllable; + gboolean waiting; + gboolean flushing; +}; + +static gint +find_index (GArray * array, GstPollFD * fd) +{ +#ifndef G_OS_WIN32 + //struct pollfd *ifd; +GstPollFD *ifd; + +#else + WinsockFd *ifd; +#endif + guint i; + + /* start by assuming the index found in the fd is still valid */ + if (fd->idx >= 0 && fd->idx < array->len) { +#ifndef G_OS_WIN32 +#ifndef __SYMBIAN32__ + ifd = &g_array_index (array, struct pollfd, fd->idx); +#else + ifd = &g_array_index (array, GstPollFD, fd->idx); +#endif +#else + ifd = &g_array_index (array, WinsockFd, fd->idx); +#endif + + if (ifd->fd == fd->fd) { + return fd->idx; + } + } + + /* the pollfd array has changed and we need to lookup the fd again */ + for (i = 0; i < array->len; i++) { +#ifndef G_OS_WIN32 +#ifndef __SYMBIAN32__ + ifd = &g_array_index (array, struct pollfd, i); +#else + ifd = &g_array_index (array, GstPollFD, i); +#endif +#else + ifd = &g_array_index (array, WinsockFd, i); +#endif + + if (ifd->fd == fd->fd) { + fd->idx = (gint) i; + return fd->idx; + } + } + + fd->idx = -1; + return fd->idx; +} + +#if !defined(HAVE_PPOLL) && defined(HAVE_POLL) +/* check if all file descriptors will fit in an fd_set */ +static gboolean +selectable_fds (const GstPoll * set) +{ + guint i; + + for (i = 0; i < set->fds->len; i++) { + struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, i); + + if (pfd->fd >= FD_SETSIZE) + return FALSE; + } + + return TRUE; +} + +/* check if the timeout will convert to a timeout value used for poll() + * without a loss of precision + */ +static gboolean +pollable_timeout (GstClockTime timeout) +{ + if (timeout == GST_CLOCK_TIME_NONE) + return TRUE; + + /* not a nice multiple of milliseconds */ + if (timeout % 1000000) + return FALSE; + + return TRUE; +} +#endif + +static GstPollMode +choose_mode (const GstPoll * set, GstClockTime timeout) +{ + GstPollMode mode; + + if (set->mode == GST_POLL_MODE_AUTO) { +#ifdef HAVE_PPOLL + mode = GST_POLL_MODE_PPOLL; +#elif defined(HAVE_POLL) + if (!selectable_fds (set) || pollable_timeout (timeout)) { + mode = GST_POLL_MODE_POLL; + } else { +#ifdef HAVE_PSELECT + mode = GST_POLL_MODE_PSELECT; +#else + mode = GST_POLL_MODE_SELECT; +#endif + } +#elif defined(HAVE_PSELECT) + mode = GST_POLL_MODE_PSELECT; +#else + mode = GST_POLL_MODE_SELECT; +#endif + } else { + mode = set->mode; + } + return mode; +} + +#ifndef G_OS_WIN32 +static gint +pollfd_to_fd_set (GstPoll * set, fd_set * readfds, fd_set * writefds) +{ + gint max_fd = -1; + guint i; + + FD_ZERO (readfds); + FD_ZERO (writefds); + + g_mutex_lock (set->lock); + + for (i = 0; i < set->active_fds->len; i++) { +#ifndef __SYMBIAN32__ + struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, i); +#else + GPollFD *pfd = &g_array_index (set->fds, GPollFD, i); +#endif + + if (pfd->fd < FD_SETSIZE) { + if (pfd->events & POLLIN) + FD_SET (pfd->fd, readfds); + if (pfd->events & POLLOUT) + FD_SET (pfd->fd, writefds); + if (pfd->fd > max_fd) + max_fd = pfd->fd; + } + } + + g_mutex_unlock (set->lock); + + + return max_fd; +} + +static void +fd_set_to_pollfd (GstPoll * set, fd_set * readfds, fd_set * writefds) +{ + guint i; + + g_mutex_lock (set->lock); + + for (i = 0; i < set->active_fds->len; i++) { + //rj struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, i); + GPollFD *pfd = &g_array_index (set->active_fds, GPollFD, i); + + if (pfd->fd < FD_SETSIZE) { + if (FD_ISSET (pfd->fd, readfds)) + pfd->revents |= POLLIN; + if (FD_ISSET (pfd->fd, writefds)) + pfd->revents |= POLLOUT; + } + } + + g_mutex_unlock (set->lock); +} +#else /* G_OS_WIN32 */ +/* + * Translate errors thrown by the Winsock API used by GstPoll: + * WSAEventSelect, WSAWaitForMultipleEvents and WSAEnumNetworkEvents + */ +static gint +gst_poll_winsock_error_to_errno (DWORD last_error) +{ + switch (last_error) { + case WSA_INVALID_HANDLE: + case WSAEINVAL: + case WSAENOTSOCK: + return EBADF; + + case WSA_NOT_ENOUGH_MEMORY: + return ENOMEM; + + /* + * Anything else, including: + * WSA_INVALID_PARAMETER, WSAEFAULT, WSAEINPROGRESS, WSAENETDOWN, + * WSANOTINITIALISED + */ + default: + return EINVAL; + } +} + +static void +gst_poll_free_winsock_event (GstPoll * set, gint idx) +{ + WinsockFd *wfd = &g_array_index (set->fds, WinsockFd, idx); + HANDLE event = g_array_index (set->events, HANDLE, idx); + + WSAEventSelect (wfd->fd, event, 0); + CloseHandle (event); +} + +static void +gst_poll_update_winsock_event_mask (GstPoll * set, gint idx, glong flags, + gboolean active) +{ + WinsockFd *wfd; + + wfd = &g_array_index (set->fds, WinsockFd, idx); + + if (active) + wfd->event_mask |= flags; + else + wfd->event_mask &= ~flags; + + /* reset ignored state if the new mask doesn't overlap at all */ + if ((wfd->ignored_event_mask & wfd->event_mask) == 0) + wfd->ignored_event_mask = 0; +} + +static gboolean +gst_poll_prepare_winsock_active_sets (GstPoll * set) +{ + guint i; + + g_array_set_size (set->active_fds, 0); + g_array_set_size (set->active_fds_ignored, 0); + g_array_set_size (set->active_events, 0); + g_array_append_val (set->active_events, set->wakeup_event); + + for (i = 0; i < set->fds->len; i++) { + WinsockFd *wfd = &g_array_index (set->fds, WinsockFd, i); + HANDLE event = g_array_index (set->events, HANDLE, i); + + if (wfd->ignored_event_mask == 0) { + gint ret; + + g_array_append_val (set->active_fds, *wfd); + g_array_append_val (set->active_events, event); + + ret = WSAEventSelect (wfd->fd, event, wfd->event_mask); + if (G_UNLIKELY (ret != 0)) { + errno = gst_poll_winsock_error_to_errno (WSAGetLastError ()); + return FALSE; + } + } else { + g_array_append_val (set->active_fds_ignored, wfd); + } + } + + return TRUE; +} + +static gint +gst_poll_collect_winsock_events (GstPoll * set) +{ + gint res, i; + + /* + * We need to check which events are signaled, and call + * WSAEnumNetworkEvents for those that are, which resets + * the event and clears the internal network event records. + */ + res = 0; + for (i = 0; i < set->active_fds->len; i++) { + WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, i); + HANDLE event = g_array_index (set->active_events, HANDLE, i + 1); + DWORD wait_ret; + + wait_ret = WaitForSingleObject (event, 0); + if (wait_ret == WAIT_OBJECT_0) { + gint enum_ret = WSAEnumNetworkEvents (wfd->fd, event, &wfd->events); + + if (G_UNLIKELY (enum_ret != 0)) { + res = -1; + errno = gst_poll_winsock_error_to_errno (WSAGetLastError ()); + break; + } + + res++; + } else { + /* clear any previously stored result */ + memset (&wfd->events, 0, sizeof (wfd->events)); + } + } + + /* If all went well we also need to reset the ignored fds. */ + if (res >= 0) { + res += set->active_fds_ignored->len; + + for (i = 0; i < set->active_fds_ignored->len; i++) { + WinsockFd *wfd = g_array_index (set->active_fds_ignored, WinsockFd *, i); + + wfd->ignored_event_mask = 0; + } + + g_array_set_size (set->active_fds_ignored, 0); + } + + return res; +} +#endif + +/** + * gst_poll_new: + * @controllable: whether it should be possible to control a wait. + * + * Create a new file descriptor set. If @controllable, it + * is possible to restart or flush a call to gst_poll_wait() with + * gst_poll_restart() and gst_poll_set_flushing() respectively. + * + * Returns: a new #GstPoll, or %NULL in case of an error. Free with + * gst_poll_free(). + * + * Since: 0.10.18 + */ +#ifdef __SYMBIAN32__ +EXPORT_C +#endif + + GstPoll *gst_poll_new (gboolean controllable) +{ + GstPoll *nset; + + nset = g_new0 (GstPoll, 1); + nset->lock = g_mutex_new (); +#ifndef G_OS_WIN32 + nset->mode = GST_POLL_MODE_AUTO; + //rj nset->fds = g_array_new (FALSE, FALSE, sizeof (struct pollfd)); + //rj nset->active_fds = g_array_new (FALSE, FALSE, sizeof (struct pollfd)); + nset->fds = g_array_new (FALSE, FALSE, sizeof (GstPollFD)); + nset->active_fds = g_array_new (FALSE, FALSE, sizeof (GstPollFD)); + nset->control_read_fd.fd = -1; + nset->control_write_fd.fd = -1; +#else + nset->mode = GST_POLL_MODE_WINDOWS; + nset->fds = g_array_new (FALSE, FALSE, sizeof (WinsockFd)); + nset->active_fds = g_array_new (FALSE, FALSE, sizeof (WinsockFd)); + nset->active_fds_ignored = g_array_new (FALSE, FALSE, sizeof (WinsockFd *)); + nset->events = g_array_new (FALSE, FALSE, sizeof (HANDLE)); + nset->active_events = g_array_new (FALSE, FALSE, sizeof (HANDLE)); + + nset->wakeup_event = CreateEvent (NULL, TRUE, FALSE, NULL); +#endif + + if (!gst_poll_set_controllable (nset, controllable)) + goto not_controllable; + + return nset; + + /* ERRORS */ +not_controllable: + { + gst_poll_free (nset); + return NULL; + } +} + +/** + * gst_poll_free: + * @set: a file descriptor set. + * + * Free a file descriptor set. + * + * Since: 0.10.18 + */ +#ifdef __SYMBIAN32__ +EXPORT_C +#endif + +void gst_poll_free (GstPoll * set) +{ + g_return_if_fail (set != NULL); + +#ifndef G_OS_WIN32 + if (set->control_write_fd.fd >= 0) + close (set->control_write_fd.fd); + if (set->control_read_fd.fd >= 0) + close (set->control_read_fd.fd); +#else + CloseHandle (set->wakeup_event); + + { + guint i; + + for (i = 0; i < set->events->len; i++) + gst_poll_free_winsock_event (set, i); + } + + g_array_free (set->active_events, TRUE); + g_array_free (set->events, TRUE); + g_array_free (set->active_fds_ignored, TRUE); +#endif + + g_array_free (set->active_fds, TRUE); + g_array_free (set->fds, TRUE); + g_mutex_free (set->lock); + g_free (set); +} + +/** + * gst_poll_fd_init: + * @fd: a #GstPollFD + * + * Initializes @fd. Alternatively you can initialize it with + * #GST_POLL_FD_INIT. + * + * Since: 0.10.18 + */ +#ifdef __SYMBIAN32__ +EXPORT_C +#endif + + void gst_poll_fd_init (GstPollFD * fd) +{ + g_return_if_fail (fd != NULL); + + fd->fd = -1; + fd->idx = -1; +} + +static gboolean +gst_poll_add_fd_unlocked (GstPoll * set, GstPollFD * fd) +{ + gint idx; + + idx = find_index (set->fds, fd); + if (idx < 0) { +#ifndef G_OS_WIN32 + //rj struct pollfd nfd; + GPollFD nfd; + + nfd.fd = fd->fd; +#ifndef __SYMBIAN32__ + nfd.events = POLLERR | POLLNVAL | POLLHUP; +#else + nfd.events = POLLERR | POLLNVAL ; +#endif + nfd.revents = 0; + + g_array_append_val (set->fds, nfd); + + fd->idx = set->fds->len - 1; +#else + WinsockFd wfd; + HANDLE event; + + wfd.fd = fd->fd; + wfd.event_mask = FD_CLOSE; + memset (&wfd.events, 0, sizeof (wfd.events)); + wfd.ignored_event_mask = 0; + event = WSACreateEvent (); + + g_array_append_val (set->fds, wfd); + g_array_append_val (set->events, event); + + fd->idx = set->fds->len - 1; +#endif + } + + return TRUE; +} + +/** + * gst_poll_add_fd: + * @set: a file descriptor set. + * @fd: a file descriptor. + * + * Add a file descriptor to the file descriptor set. + * + * Returns: %TRUE if the file descriptor was successfully added to the set. + * + * Since: 0.10.18 + */ +#ifdef __SYMBIAN32__ +EXPORT_C +#endif + + gboolean gst_poll_add_fd (GstPoll * set, GstPollFD * fd) +{ + gboolean ret; + + g_return_val_if_fail (set != NULL, FALSE); + g_return_val_if_fail (fd != NULL, FALSE); + g_return_val_if_fail (fd->fd >= 0, FALSE); + + g_mutex_lock (set->lock); + + ret = gst_poll_add_fd_unlocked (set, fd); + + g_mutex_unlock (set->lock); + + return ret; +} + +/** + * gst_poll_remove_fd: + * @set: a file descriptor set. + * @fd: a file descriptor. + * + * Remove a file descriptor from the file descriptor set. + * + * Returns: %TRUE if the file descriptor was successfully removed from the set. + * + * Since: 0.10.18 + */ +#ifdef __SYMBIAN32__ +EXPORT_C +#endif + +gboolean gst_poll_remove_fd (GstPoll * set, GstPollFD * fd) +{ + gint idx; + + g_return_val_if_fail (set != NULL, FALSE); + g_return_val_if_fail (fd != NULL, FALSE); + g_return_val_if_fail (fd->fd >= 0, FALSE); + + g_mutex_lock (set->lock); + + /* get the index, -1 is an fd that is not added */ + idx = find_index (set->fds, fd); + if (idx >= 0) { +#ifdef G_OS_WIN32 + gst_poll_free_winsock_event (set, idx); + g_array_remove_index_fast (set->events, idx); +#endif + + /* remove the fd at index, we use _remove_index_fast, which copies the last + * element of the array to the freed index */ + g_array_remove_index_fast (set->fds, idx); + + /* mark fd as removed by setting the index to -1 */ + fd->idx = -1; + } + + g_mutex_unlock (set->lock); + + return idx >= 0; +} + +/** + * gst_poll_fd_ctl_write: + * @set: a file descriptor set. + * @fd: a file descriptor. + * @active: a new status. + * + * Control whether the descriptor @fd in @set will be monitored for + * writability. + * + * Returns: %TRUE if the descriptor was successfully updated. + * + * Since: 0.10.18 + */ +#ifdef __SYMBIAN32__ +EXPORT_C +#endif + +gboolean gst_poll_fd_ctl_write (GstPoll * set, GstPollFD * fd, gboolean active) +{ + gint idx; + + g_return_val_if_fail (set != NULL, FALSE); + g_return_val_if_fail (fd != NULL, FALSE); + g_return_val_if_fail (fd->fd >= 0, FALSE); + + g_mutex_lock (set->lock); + + idx = find_index (set->fds, fd); + if (idx >= 0) { +#ifndef G_OS_WIN32 + // rj struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, idx); + GPollFD *pfd = &g_array_index (set->fds, GPollFD, idx); + if (active) + pfd->events |= POLLOUT; + else + pfd->events &= ~POLLOUT; +#else + gst_poll_update_winsock_event_mask (set, idx, FD_WRITE, active); +#endif + } + + g_mutex_unlock (set->lock); + + return idx >= 0; +} + +static gboolean +gst_poll_fd_ctl_read_unlocked (GstPoll * set, GstPollFD * fd, gboolean active) +{ + gint idx; + + idx = find_index (set->fds, fd); + + if (idx >= 0) { +#ifndef G_OS_WIN32 + //rj struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, idx); + GPollFD *pfd = &g_array_index (set->fds, GPollFD, idx); + + + if (active) + { +#ifndef __SYMBIAN32__ + pfd->events |= (POLLIN | POLLPRI); +#else + pfd->events |= (POLLIN ); +#endif + } + else + pfd->events &= ~(POLLIN | POLLPRI); +#else + gst_poll_update_winsock_event_mask (set, idx, FD_READ | FD_ACCEPT, active); +#endif + } + + return idx >= 0; +} + +/** + * gst_poll_fd_ctl_read: + * @set: a file descriptor set. + * @fd: a file descriptor. + * @active: a new status. + * + * Control whether the descriptor @fd in @set will be monitored for + * readability. + * + * Returns: %TRUE if the descriptor was successfully updated. + * + * Since: 0.10.18 + */ +#ifdef __SYMBIAN32__ +EXPORT_C +#endif + gboolean gst_poll_fd_ctl_read (GstPoll * set, GstPollFD * fd, gboolean active) +{ + gboolean ret; + + g_return_val_if_fail (set != NULL, FALSE); + g_return_val_if_fail (fd != NULL, FALSE); + g_return_val_if_fail (fd->fd >= 0, FALSE); + + g_mutex_lock (set->lock); + + ret = gst_poll_fd_ctl_read_unlocked (set, fd, active); + + g_mutex_unlock (set->lock); + + return ret; +} + +/** + * gst_poll_fd_ignored: + * @set: a file descriptor set. + * @fd: a file descriptor. + * + * Mark @fd as ignored so that the next call to gst_poll_wait() will yield + * the same result for @fd as last time. This function must be called if no + * operation (read/write/recv/send/etc.) will be performed on @fd before + * the next call to gst_poll_wait(). + * + * The reason why this is needed is because the underlying implementation + * might not allow querying the fd more than once between calls to one of + * the re-enabling operations. + * + * Since: 0.10.18 + */ +#ifdef __SYMBIAN32__ +EXPORT_C +#endif + + void gst_poll_fd_ignored (GstPoll * set, GstPollFD * fd) +{ +#ifdef G_OS_WIN32 + gint idx; + + g_return_if_fail (set != NULL); + g_return_if_fail (fd != NULL); + g_return_if_fail (fd->fd >= 0); + + g_mutex_lock (set->lock); + + idx = find_index (set->fds, fd); + if (idx >= 0) { + WinsockFd *wfd = &g_array_index (set->fds, WinsockFd, idx); + + wfd->ignored_event_mask = wfd->event_mask & (FD_READ | FD_WRITE); + } + + g_mutex_unlock (set->lock); +#endif +} + +/** + * gst_poll_fd_has_closed: + * @set: a file descriptor set. + * @fd: a file descriptor. + * + * Check if @fd in @set has closed the connection. + * + * Returns: %TRUE if the connection was closed. + * + * Since: 0.10.18 + */ +#ifdef __SYMBIAN32__ +EXPORT_C +#endif + + gboolean gst_poll_fd_has_closed (const GstPoll * set, GstPollFD * fd) +{ + gboolean res = FALSE; + gint idx; + + g_return_val_if_fail (set != NULL, FALSE); + g_return_val_if_fail (fd != NULL, FALSE); + g_return_val_if_fail (fd->fd >= 0, FALSE); + + g_mutex_lock (set->lock); + + idx = find_index (set->active_fds, fd); + if (idx >= 0) { +#ifndef G_OS_WIN32 + //rj struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, idx); + GPollFD *pfd = &g_array_index (set->active_fds, GPollFD, idx); + #ifndef __SYMBIAN32__ + res = (pfd->revents & POLLHUP) != 0; + #else + res = (pfd->revents & POLLHUP) == POLLHUP; + #endif + +#else + WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, idx); + + res = (wfd->events.lNetworkEvents & FD_CLOSE) != 0; +#endif + } + + g_mutex_unlock (set->lock); + + return res; +} + +/** + * gst_poll_fd_has_error: + * @set: a file descriptor set. + * @fd: a file descriptor. + * + * Check if @fd in @set has an error. + * + * Returns: %TRUE if the descriptor has an error. + * + * Since: 0.10.18 + */ +#ifdef __SYMBIAN32__ +EXPORT_C +#endif + +gboolean gst_poll_fd_has_error (const GstPoll * set, GstPollFD * fd) +{ + gboolean res = FALSE; + gint idx; + + g_return_val_if_fail (set != NULL, FALSE); + g_return_val_if_fail (fd != NULL, FALSE); + g_return_val_if_fail (fd->fd >= 0, FALSE); + + g_mutex_lock (set->lock); + + idx = find_index (set->active_fds, fd); + if (idx >= 0) { +#ifndef G_OS_WIN32 + //rj struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, idx); + GPollFD *pfd = &g_array_index (set->active_fds, GPollFD, idx); + + res = (pfd->revents & (POLLERR | POLLNVAL)) != 0; +#else + WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, idx); + + res = (wfd->events.iErrorCode[FD_CLOSE_BIT] != 0) || + (wfd->events.iErrorCode[FD_READ_BIT] != 0) || + (wfd->events.iErrorCode[FD_WRITE_BIT] != 0) || + (wfd->events.iErrorCode[FD_ACCEPT_BIT] != 0); +#endif + } + + g_mutex_unlock (set->lock); + + return res; +} + +static gboolean +gst_poll_fd_can_read_unlocked (const GstPoll * set, GstPollFD * fd) +{ + gboolean res = FALSE; + gint idx; + + idx = find_index (set->active_fds, fd); + if (idx >= 0) { +#ifndef G_OS_WIN32 + //rj struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, idx); + GPollFD *pfd = &g_array_index (set->active_fds, GPollFD, idx); + + res = (pfd->revents & (POLLIN | POLLPRI)) != 0; +#else + WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, idx); + + res = (wfd->events.lNetworkEvents & (FD_READ | FD_ACCEPT)) != 0; +#endif + } + + return res; +} + +/** + * gst_poll_fd_can_read: + * @set: a file descriptor set. + * @fd: a file descriptor. + * + * Check if @fd in @set has data to be read. + * + * Returns: %TRUE if the descriptor has data to be read. + * + * Since: 0.10.18 + */ +#ifdef __SYMBIAN32__ +EXPORT_C +#endif + +gboolean gst_poll_fd_can_read (const GstPoll * set, GstPollFD * fd) +{ + gboolean res = FALSE; + + g_return_val_if_fail (set != NULL, FALSE); + g_return_val_if_fail (fd != NULL, FALSE); + g_return_val_if_fail (fd->fd >= 0, FALSE); + + g_mutex_lock (set->lock); + + res = gst_poll_fd_can_read_unlocked (set, fd); + + g_mutex_unlock (set->lock); + + return res; +} + +/** + * gst_poll_fd_can_write: + * @set: a file descriptor set. + * @fd: a file descriptor. + * + * Check if @fd in @set can be used for writing. + * + * Returns: %TRUE if the descriptor can be used for writing. + * + * Since: 0.10.18 + */ +#ifdef __SYMBIAN32__ +EXPORT_C +#endif + +gboolean gst_poll_fd_can_write (const GstPoll * set, GstPollFD * fd) +{ + gboolean res = FALSE; + gint idx; + + g_return_val_if_fail (set != NULL, FALSE); + g_return_val_if_fail (fd != NULL, FALSE); + g_return_val_if_fail (fd->fd >= 0, FALSE); + + g_mutex_lock (set->lock); + + idx = find_index (set->active_fds, fd); + if (idx >= 0) { +#ifndef G_OS_WIN32 + //rj struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, idx); + GPollFD *pfd = &g_array_index (set->active_fds, GPollFD, idx); + + //rj res = (pfd->revents & POLLOUT) != 0; + res = (pfd->revents & POLLOUT) != 0; +#else + WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, idx); + + res = (wfd->events.lNetworkEvents & FD_WRITE) != 0; +#endif + } + + g_mutex_unlock (set->lock); + + return res; +} + +static void +gst_poll_check_ctrl_commands (GstPoll * set, gint res, gboolean * restarting) +{ + /* check if the poll/select was aborted due to a command */ + if (set->controllable) { +#ifndef G_OS_WIN32 + while (TRUE) { + guchar cmd; + gint result; + + /* we do not check the read status of the control socket here because + * there may have been a write to the socket between the time the + * poll/select finished and before we got the mutex back, and we need + * to clear out the control socket before leaving */ + READ_COMMAND (set, cmd, result); + if (result <= 0) { + /* no more commands, quit the loop */ + break; + } + + /* if the control socket is the only socket with activity when we get + * here, we restart the _wait operation, else we allow the caller to + * process the other file descriptors */ + if (res == 1 && + gst_poll_fd_can_read_unlocked (set, &set->control_read_fd)) + *restarting = TRUE; + } +#else + if (WaitForSingleObject (set->wakeup_event, 0) == WAIT_OBJECT_0) { + ResetEvent (set->wakeup_event); + *restarting = TRUE; + } +#endif + } +} + +/** + * gst_poll_wait: + * @set: a #GstPoll. + * @timeout: a timeout in nanoseconds. + * + * Wait for activity on the file descriptors in @set. This function waits up to + * the specified @timeout. A timeout of #GST_CLOCK_TIME_NONE waits forever. + * + * When this function is called from multiple threads, -1 will be returned with + * errno set to EPERM. + * + * Returns: The number of #GstPollFD in @set that have activity or 0 when no + * activity was detected after @timeout. If an error occurs, -1 is returned + * and errno is set. + * + * Since: 0.10.18 + */ +#ifdef __SYMBIAN32__ +EXPORT_C +#endif + +gint gst_poll_wait (GstPoll * set, GstClockTime timeout) +{ + gboolean restarting; + int res = -1; + + g_return_val_if_fail (set != NULL, -1); + + g_mutex_lock (set->lock); + + /* we cannot wait from multiple threads */ + if (set->waiting) + goto already_waiting; + + /* flushing, exit immediatly */ + if (set->flushing) + goto flushing; + + set->waiting = TRUE; + + do { + GstPollMode mode; + + res = -1; + restarting = FALSE; + + mode = choose_mode (set, timeout); + +#ifndef G_OS_WIN32 + g_array_set_size (set->active_fds, set->fds->len); + //rj memcpy (set->active_fds->data, set->fds->data, + // set->fds->len * sizeof (struct pollfd)); + memcpy (set->active_fds->data, set->fds->data, + set->fds->len * sizeof (GstPollFD)); +#else + if (!gst_poll_prepare_winsock_active_sets (set)) + goto winsock_error; +#endif + + g_mutex_unlock (set->lock); + + switch (mode) { + case GST_POLL_MODE_AUTO: + g_assert_not_reached (); + break; + case GST_POLL_MODE_PPOLL: + { +#ifdef HAVE_PPOLL + struct timespec ts; + struct timespec *tsptr; + + if (timeout != GST_CLOCK_TIME_NONE) { + GST_TIME_TO_TIMESPEC (timeout, ts); + tsptr = &ts; + } else { + tsptr = NULL; + } + + res = + ppoll ((struct pollfd *) set->active_fds->data, + set->active_fds->len, tsptr, NULL); +#else + g_assert_not_reached (); + errno = ENOSYS; +#endif + break; + } + case GST_POLL_MODE_POLL: + { +#ifdef HAVE_POLL + gint t; + + if (timeout != GST_CLOCK_TIME_NONE) { + t = GST_TIME_AS_MSECONDS (timeout); + } else { + t = -1; + } + + res = + poll ((struct pollfd *) set->active_fds->data, + set->active_fds->len, t); +#else + g_assert_not_reached (); + errno = ENOSYS; +#endif + break; + } + case GST_POLL_MODE_PSELECT: +#ifndef HAVE_PSELECT + { + g_assert_not_reached (); + errno = ENOSYS; + break; + } +#endif + case GST_POLL_MODE_SELECT: + { +#ifndef G_OS_WIN32 + fd_set readfds; + fd_set writefds; + fd_set excepfds; + gint max_fd; + + max_fd = pollfd_to_fd_set (set, &readfds, &writefds); + + if (mode == GST_POLL_MODE_SELECT) { + struct timeval tv; + struct timeval *tvptr; + + if (timeout != GST_CLOCK_TIME_NONE) { + GST_TIME_TO_TIMEVAL (timeout, tv); + tvptr = &tv; + } else { + tvptr = NULL; + } +//temporary fix for multifdsink + FD_ZERO(&excepfds); + + if( max_fd != -1) + { + if( ! FD_ISSET(max_fd,&readfds)) + { + if( ! FD_ISSET(max_fd,&writefds)) + { + FD_SET(max_fd,&excepfds); + } + } + } + + /* + if( max_fd != -1) + FD_SET(max_fd,&excepfds); + */ +#ifdef __SYMBIAN32__ + if( max_fd == -1 && !tvptr); + else + res = select (max_fd + 1, &readfds, &writefds, &excepfds, tvptr); +#else + res = select (max_fd + 1, &readfds, &writefds, NULL, tvptr); +#endif + } else { +#ifdef HAVE_PSELECT + struct timespec ts; + struct timespec *tsptr; + + if (timeout != GST_CLOCK_TIME_NONE) { + GST_TIME_TO_TIMESPEC (timeout, ts); + tsptr = &ts; + } else { + tsptr = NULL; + } + + res = pselect (max_fd + 1, &readfds, &writefds, NULL, tsptr, NULL); +#endif + } + + if (res > 0) { + fd_set_to_pollfd (set, &readfds, &writefds); + } +#else /* G_OS_WIN32 */ + g_assert_not_reached (); + errno = ENOSYS; +#endif + break; + } + case GST_POLL_MODE_WINDOWS: + { +#ifdef G_OS_WIN32 + gint ignore_count = set->active_fds_ignored->len; + DWORD t, wait_ret; + + if (G_LIKELY (ignore_count == 0)) { + if (timeout != GST_CLOCK_TIME_NONE) + t = GST_TIME_AS_MSECONDS (timeout); + else + t = INFINITE; + } else { + /* already one or more ignored fds, so we quickly sweep the others */ + t = 0; + } + + wait_ret = WSAWaitForMultipleEvents (set->active_events->len, + (HANDLE *) set->active_events->data, FALSE, t, FALSE); + + if (ignore_count == 0 && wait_ret == WSA_WAIT_TIMEOUT) { + res = 0; + } else if (wait_ret == WSA_WAIT_FAILED) { + res = -1; + errno = gst_poll_winsock_error_to_errno (WSAGetLastError ()); + } else { + /* the first entry is the wakeup event */ + if (wait_ret - WSA_WAIT_EVENT_0 >= 1) { + res = gst_poll_collect_winsock_events (set); + } else { + res = 1; /* wakeup event */ + } + } +#else + g_assert_not_reached (); + errno = ENOSYS; +#endif + break; + } + } + + g_mutex_lock (set->lock); + + gst_poll_check_ctrl_commands (set, res, &restarting); + + /* update the controllable state if needed */ + set->controllable = set->new_controllable; + + if (set->flushing) { + /* we got woken up and we are flushing, we need to stop */ + errno = EBUSY; + res = -1; + break; + } + } while (restarting); + + set->waiting = FALSE; + + g_mutex_unlock (set->lock); + + return res; + + /* ERRORS */ +already_waiting: + { + g_mutex_unlock (set->lock); + errno = EPERM; + return -1; + } +flushing: + { + g_mutex_unlock (set->lock); + errno = EBUSY; + return -1; + } +#ifdef G_OS_WIN32 +winsock_error: + { + g_mutex_unlock (set->lock); + return -1; + } +#endif +} + +/** + * gst_poll_set_controllable: + * @set: a #GstPoll. + * @controllable: new controllable state. + * + * When @controllable is %TRUE, this function ensures that future calls to + * gst_poll_wait() will be affected by gst_poll_restart() and + * gst_poll_set_flushing(). + * + * Returns: %TRUE if the controllability of @set could be updated. + * + * Since: 0.10.18 + */ +#ifdef __SYMBIAN32__ +EXPORT_C +#endif + +gboolean gst_poll_set_controllable (GstPoll * set, gboolean controllable) +{ + g_return_val_if_fail (set != NULL, FALSE); + + g_mutex_lock (set->lock); + +#ifndef G_OS_WIN32 + if (controllable && set->control_read_fd.fd < 0) { + gint control_sock[2]; +/* rj + if (socketpair (PF_UNIX, SOCK_STREAM, 0, control_sock) < 0) + goto no_socket_pair; +*/ + fcntl (control_sock[0], F_SETFL, O_NONBLOCK); + fcntl (control_sock[1], F_SETFL, O_NONBLOCK); + + set->control_read_fd.fd = control_sock[0]; + set->control_write_fd.fd = control_sock[1]; + + gst_poll_add_fd_unlocked (set, &set->control_read_fd); + } + + if (set->control_read_fd.fd >= 0) + gst_poll_fd_ctl_read_unlocked (set, &set->control_read_fd, controllable); +#endif + + /* delay the change of the controllable state if we are waiting */ + set->new_controllable = controllable; + if (!set->waiting) + set->controllable = controllable; + + g_mutex_unlock (set->lock); + + return TRUE; + + /* ERRORS */ +#ifndef G_OS_WIN32 +no_socket_pair: + { + g_mutex_unlock (set->lock); + return FALSE; + } +#endif +} + +/** + * gst_poll_restart: + * @set: a #GstPoll. + * + * Restart any gst_poll_wait() that is in progress. This function is typically + * used after adding or removing descriptors to @set. + * + * If @set is not controllable, then this call will have no effect. + * + * Since: 0.10.18 + */ +#ifdef __SYMBIAN32__ +EXPORT_C +#endif + +void gst_poll_restart (GstPoll * set) +{ + g_return_if_fail (set != NULL); + + g_mutex_lock (set->lock); + + if (set->controllable && set->waiting) { +#ifndef G_OS_WIN32 + /* if we are waiting, we can send the command, else we do not have to + * bother, future calls will automatically pick up the new fdset */ + SEND_COMMAND (set, GST_POLL_CMD_WAKEUP); +#else + SetEvent (set->wakeup_event); +#endif + } + + g_mutex_unlock (set->lock); +} + +/** + * gst_poll_set_flushing: + * @set: a #GstPoll. + * @flushing: new flushing state. + * + * When @flushing is %TRUE, this function ensures that current and future calls + * to gst_poll_wait() will return -1, with errno set to EBUSY. + * + * Unsetting the flushing state will restore normal operation of @set. + * + * Since: 0.10.18 + */ +#ifdef __SYMBIAN32__ +EXPORT_C +#endif + +void gst_poll_set_flushing (GstPoll * set, gboolean flushing) +{ + g_return_if_fail (set != NULL); + + g_mutex_lock (set->lock); + + /* update the new state first */ + set->flushing = flushing; + + if (flushing && set->controllable && set->waiting) { + /* we are flushing, controllable and waiting, wake up the waiter. When we + * stop the flushing operation we don't clear the wakeup fd here, this will + * happen in the _wait() thread. */ +#ifndef G_OS_WIN32 + SEND_COMMAND (set, GST_POLL_CMD_WAKEUP); +#else + SetEvent (set->wakeup_event); +#endif + } + + g_mutex_unlock (set->lock); +}