winlin

Research st: remove all event systems except epoll.
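
For context, a minimal usage sketch (not part of this commit) of how an application picks the event notification backend through the public API declared in st.h (st_set_eventsys(), st_init(), st_get_eventsys_name()). After this change epoll is the only backend left, so both ST_EVENTSYS_DEFAULT and ST_EVENTSYS_ALT should end up resolving to it on Linux:

#include <stdio.h>
#include "st.h"

int main(void)
{
    /* Must be called before st_init(); with only epoll compiled in,
     * ST_EVENTSYS_ALT (and presumably ST_EVENTSYS_DEFAULT) selects it. */
    if (st_set_eventsys(ST_EVENTSYS_ALT) < 0) {
        perror("st_set_eventsys");
        return 1;
    }
    if (st_init() < 0) {
        perror("st_init");
        return 1;
    }
    printf("event system: %s\n", st_get_eventsys_name());
    return 0;
}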

@@ -55,21 +55,6 @@ DEFINES = -D$(OS) -DDEBUG -DMD_HAVE_EPOLL -DMALLOC_STACK
##########################
# Other possible defines:
# To use poll(2) instead of select(2) for events checking:
# DEFINES += -DUSE_POLL
# You may prefer to use select for applications that have many threads
# using one file descriptor, and poll for applications that have many
# different file descriptors. With USE_POLL poll() is called with at
# least one pollfd per I/O-blocked thread, so 1000 threads sharing one
# descriptor will poll 1000 identical pollfds and select would be more
# efficient. But if the threads all use different descriptors poll()
# may be better depending on your operating system's implementation of
# poll and select. Really, it's up to you. Oh, and on some platforms
# poll() fails with more than a few dozen descriptors.
#
# Some platforms allow to define FD_SETSIZE (if select() is used), e.g.:
# DEFINES += -DFD_SETSIZE=4096
#
# To use malloc(3) instead of mmap(2) for stack allocation:
# DEFINES += -DMALLOC_STACK
#
@@ -77,25 +62,9 @@ DEFINES = -D$(OS) -DDEBUG -DMD_HAVE_EPOLL -DMALLOC_STACK
# (but not too many!):
# DEFINES += -DST_KEYS_MAX=<n>
#
# To start with more than the default 64 initial pollfd slots
# (but the table grows dynamically anyway):
# DEFINES += -DST_MIN_POLLFDS_SIZE=<n>
#
# Note that you can also add these defines by specifying them as
# make/gmake arguments (without editing this Makefile). For example:
#
# make EXTRA_CFLAGS=-DUSE_POLL <target>
#
# (replace make with gmake if needed).
#
# You can also modify the default selection of an alternative event
# notification mechanism. E.g., to enable kqueue(2) support (if it's not
# enabled by default):
#
# gmake EXTRA_CFLAGS=-DMD_HAVE_KQUEUE <target>
#
# or to disable default epoll(4) support:
#
# make EXTRA_CFLAGS=-UMD_HAVE_EPOLL <target>
#
##########################
... ...
@@ -40,80 +40,21 @@
#include <errno.h>
#include "common.h"
#ifdef MD_HAVE_KQUEUE
#include <sys/event.h>
#endif
#ifdef MD_HAVE_EPOLL
#include <sys/epoll.h>
#endif
#ifdef USE_POLL
#error "Not support USE_POLL"
#endif
#if defined(USE_POLL) && !defined(MD_HAVE_POLL)
/* Force poll usage if explicitly asked for it */
#define MD_HAVE_POLL
#endif
#ifdef MD_HAVE_KQUEUE
#error "Not support MD_HAVE_KQUEUE"
#endif
static struct _st_seldata {
fd_set fd_read_set, fd_write_set, fd_exception_set;
int fd_ref_cnts[FD_SETSIZE][3];
int maxfd;
} *_st_select_data;
#define _ST_SELECT_MAX_OSFD (_st_select_data->maxfd)
#define _ST_SELECT_READ_SET (_st_select_data->fd_read_set)
#define _ST_SELECT_WRITE_SET (_st_select_data->fd_write_set)
#define _ST_SELECT_EXCEP_SET (_st_select_data->fd_exception_set)
#define _ST_SELECT_READ_CNT(fd) (_st_select_data->fd_ref_cnts[fd][0])
#define _ST_SELECT_WRITE_CNT(fd) (_st_select_data->fd_ref_cnts[fd][1])
#define _ST_SELECT_EXCEP_CNT(fd) (_st_select_data->fd_ref_cnts[fd][2])
#ifdef MD_HAVE_POLL
static struct _st_polldata {
struct pollfd *pollfds;
int pollfds_size;
int fdcnt;
} *_st_poll_data;
#define _ST_POLL_OSFD_CNT (_st_poll_data->fdcnt)
#define _ST_POLLFDS (_st_poll_data->pollfds)
#define _ST_POLLFDS_SIZE (_st_poll_data->pollfds_size)
#endif /* MD_HAVE_POLL */
#ifdef MD_HAVE_KQUEUE
typedef struct _kq_fd_data {
int rd_ref_cnt;
int wr_ref_cnt;
int revents;
} _kq_fd_data_t;
static struct _st_kqdata {
_kq_fd_data_t *fd_data;
struct kevent *evtlist;
struct kevent *addlist;
struct kevent *dellist;
int fd_data_size;
int evtlist_size;
int addlist_size;
int addlist_cnt;
int dellist_size;
int dellist_cnt;
int kq;
pid_t pid;
} *_st_kq_data;
#ifndef ST_KQ_MIN_EVTLIST_SIZE
#define ST_KQ_MIN_EVTLIST_SIZE 64
#endif
#ifdef MD_HAVE_POLL
#error "Not support MD_HAVE_POLL"
#endif
#ifndef MD_HAVE_EPOLL
#error "Only support MD_HAVE_EPOLL"
#endif
#define _ST_KQ_READ_CNT(fd) (_st_kq_data->fd_data[fd].rd_ref_cnt)
#define _ST_KQ_WRITE_CNT(fd) (_st_kq_data->fd_data[fd].wr_ref_cnt)
#define _ST_KQ_REVENTS(fd) (_st_kq_data->fd_data[fd].revents)
#endif /* MD_HAVE_KQUEUE */
#include <sys/epoll.h>
#ifdef MD_HAVE_EPOLL
typedef struct _epoll_fd_data {
int rd_ref_cnt;
int wr_ref_cnt;
@@ -148,878 +89,9 @@ static struct _st_epolldata {
#define _ST_EPOLL_EVENTS(fd) \
(_ST_EPOLL_READ_BIT(fd)|_ST_EPOLL_WRITE_BIT(fd)|_ST_EPOLL_EXCEP_BIT(fd))
#endif /* MD_HAVE_EPOLL */
_st_eventsys_t *_st_eventsys = NULL;
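/*
 * Sketch, not in the original source: every backend fills in the same
 * _st_eventsys_t vtable, and the scheduler and I/O layer only ever call
 * through the _st_eventsys pointer chosen by st_set_eventsys(). A simplified,
 * hypothetical helper showing that calling convention (the real st_poll()
 * puts the request on the I/O queue and switches context, and dispatch is
 * invoked by the idle thread rather than inline):
 */
static int _example_wait_for_io(struct pollfd *pds, int npds)
{
    if ((*_st_eventsys->pollset_add)(pds, npds) < 0)
        return -1;                            /* register interest with the backend */
    (*_st_eventsys->dispatch)();              /* block in select/poll/kevent/epoll_wait */
    (*_st_eventsys->pollset_del)(pds, npds);  /* drop interest again */
    return 0;
}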
/*****************************************
* select event system
*/
ST_HIDDEN int _st_select_init(void)
{
_st_select_data = (struct _st_seldata *) malloc(sizeof(*_st_select_data));
if (!_st_select_data) {
return -1;
}
memset(_st_select_data, 0, sizeof(*_st_select_data));
_st_select_data->maxfd = -1;
return 0;
}
ST_HIDDEN int _st_select_pollset_add(struct pollfd *pds, int npds)
{
struct pollfd *pd;
struct pollfd *epd = pds + npds;
/* Do checks up front */
for (pd = pds; pd < epd; pd++) {
if (pd->fd < 0 || pd->fd >= FD_SETSIZE || !pd->events || (pd->events & ~(POLLIN | POLLOUT | POLLPRI))) {
errno = EINVAL;
return -1;
}
}
for (pd = pds; pd < epd; pd++) {
if (pd->events & POLLIN) {
FD_SET(pd->fd, &_ST_SELECT_READ_SET);
_ST_SELECT_READ_CNT(pd->fd)++;
}
if (pd->events & POLLOUT) {
FD_SET(pd->fd, &_ST_SELECT_WRITE_SET);
_ST_SELECT_WRITE_CNT(pd->fd)++;
}
if (pd->events & POLLPRI) {
FD_SET(pd->fd, &_ST_SELECT_EXCEP_SET);
_ST_SELECT_EXCEP_CNT(pd->fd)++;
}
if (_ST_SELECT_MAX_OSFD < pd->fd)
_ST_SELECT_MAX_OSFD = pd->fd;
}
return 0;
}
ST_HIDDEN void _st_select_pollset_del(struct pollfd *pds, int npds)
{
struct pollfd *pd;
struct pollfd *epd = pds + npds;
for (pd = pds; pd < epd; pd++) {
if (pd->events & POLLIN) {
if (--_ST_SELECT_READ_CNT(pd->fd) == 0) {
FD_CLR(pd->fd, &_ST_SELECT_READ_SET);
}
}
if (pd->events & POLLOUT) {
if (--_ST_SELECT_WRITE_CNT(pd->fd) == 0) {
FD_CLR(pd->fd, &_ST_SELECT_WRITE_SET);
}
}
if (pd->events & POLLPRI) {
if (--_ST_SELECT_EXCEP_CNT(pd->fd) == 0) {
FD_CLR(pd->fd, &_ST_SELECT_EXCEP_SET);
}
}
}
}
ST_HIDDEN void _st_select_find_bad_fd(void)
{
_st_clist_t *q;
_st_pollq_t *pq;
int notify;
struct pollfd *pds, *epds;
int pq_max_osfd, osfd;
short events;
_ST_SELECT_MAX_OSFD = -1;
for (q = _ST_IOQ.next; q != &_ST_IOQ; q = q->next) {
pq = _ST_POLLQUEUE_PTR(q);
notify = 0;
epds = pq->pds + pq->npds;
pq_max_osfd = -1;
for (pds = pq->pds; pds < epds; pds++) {
osfd = pds->fd;
pds->revents = 0;
if (pds->events == 0) {
continue;
}
if (fcntl(osfd, F_GETFL, 0) < 0) {
pds->revents = POLLNVAL;
notify = 1;
}
if (osfd > pq_max_osfd) {
pq_max_osfd = osfd;
}
}
if (notify) {
ST_REMOVE_LINK(&pq->links);
pq->on_ioq = 0;
/*
* Decrement the count of descriptors for each descriptor/event
* because this I/O request is being removed from the ioq
*/
for (pds = pq->pds; pds < epds; pds++) {
osfd = pds->fd;
events = pds->events;
if (events & POLLIN) {
if (--_ST_SELECT_READ_CNT(osfd) == 0) {
FD_CLR(osfd, &_ST_SELECT_READ_SET);
}
}
if (events & POLLOUT) {
if (--_ST_SELECT_WRITE_CNT(osfd) == 0) {
FD_CLR(osfd, &_ST_SELECT_WRITE_SET);
}
}
if (events & POLLPRI) {
if (--_ST_SELECT_EXCEP_CNT(osfd) == 0) {
FD_CLR(osfd, &_ST_SELECT_EXCEP_SET);
}
}
}
if (pq->thread->flags & _ST_FL_ON_SLEEPQ) {
_ST_DEL_SLEEPQ(pq->thread);
}
pq->thread->state = _ST_ST_RUNNABLE;
_ST_ADD_RUNQ(pq->thread);
} else {
if (_ST_SELECT_MAX_OSFD < pq_max_osfd) {
_ST_SELECT_MAX_OSFD = pq_max_osfd;
}
}
}
}
ST_HIDDEN void _st_select_dispatch(void)
{
struct timeval timeout, *tvp;
fd_set r, w, e;
fd_set *rp, *wp, *ep;
int nfd, pq_max_osfd, osfd;
_st_clist_t *q;
st_utime_t min_timeout;
_st_pollq_t *pq;
int notify;
struct pollfd *pds, *epds;
short events, revents;
/*
* Assignment of fd_sets
*/
r = _ST_SELECT_READ_SET;
w = _ST_SELECT_WRITE_SET;
e = _ST_SELECT_EXCEP_SET;
rp = &r;
wp = &w;
ep = &e;
if (_ST_SLEEPQ == NULL) {
tvp = NULL;
} else {
min_timeout = (_ST_SLEEPQ->due <= _ST_LAST_CLOCK) ? 0 : (_ST_SLEEPQ->due - _ST_LAST_CLOCK);
timeout.tv_sec = (int) (min_timeout / 1000000);
timeout.tv_usec = (int) (min_timeout % 1000000);
tvp = &timeout;
}
/* Check for I/O operations */
nfd = select(_ST_SELECT_MAX_OSFD + 1, rp, wp, ep, tvp);
/* Notify threads that are associated with the selected descriptors */
if (nfd > 0) {
_ST_SELECT_MAX_OSFD = -1;
for (q = _ST_IOQ.next; q != &_ST_IOQ; q = q->next) {
pq = _ST_POLLQUEUE_PTR(q);
notify = 0;
epds = pq->pds + pq->npds;
pq_max_osfd = -1;
for (pds = pq->pds; pds < epds; pds++) {
osfd = pds->fd;
events = pds->events;
revents = 0;
if ((events & POLLIN) && FD_ISSET(osfd, rp)) {
revents |= POLLIN;
}
if ((events & POLLOUT) && FD_ISSET(osfd, wp)) {
revents |= POLLOUT;
}
if ((events & POLLPRI) && FD_ISSET(osfd, ep)) {
revents |= POLLPRI;
}
pds->revents = revents;
if (revents) {
notify = 1;
}
if (osfd > pq_max_osfd) {
pq_max_osfd = osfd;
}
}
if (notify) {
ST_REMOVE_LINK(&pq->links);
pq->on_ioq = 0;
/*
* Decrement the count of descriptors for each descriptor/event
* because this I/O request is being removed from the ioq
*/
for (pds = pq->pds; pds < epds; pds++) {
osfd = pds->fd;
events = pds->events;
if (events & POLLIN) {
if (--_ST_SELECT_READ_CNT(osfd) == 0) {
FD_CLR(osfd, &_ST_SELECT_READ_SET);
}
}
if (events & POLLOUT) {
if (--_ST_SELECT_WRITE_CNT(osfd) == 0) {
FD_CLR(osfd, &_ST_SELECT_WRITE_SET);
}
}
if (events & POLLPRI) {
if (--_ST_SELECT_EXCEP_CNT(osfd) == 0) {
FD_CLR(osfd, &_ST_SELECT_EXCEP_SET);
}
}
}
if (pq->thread->flags & _ST_FL_ON_SLEEPQ) {
_ST_DEL_SLEEPQ(pq->thread);
}
pq->thread->state = _ST_ST_RUNNABLE;
_ST_ADD_RUNQ(pq->thread);
} else {
if (_ST_SELECT_MAX_OSFD < pq_max_osfd) {
_ST_SELECT_MAX_OSFD = pq_max_osfd;
}
}
}
} else if (nfd < 0) {
/*
* It can happen when a thread closes file descriptor
* that is being used by some other thread -- BAD!
*/
if (errno == EBADF) {
_st_select_find_bad_fd();
}
}
}
ST_HIDDEN int _st_select_fd_new(int osfd)
{
if (osfd >= FD_SETSIZE) {
errno = EMFILE;
return -1;
}
return 0;
}
ST_HIDDEN int _st_select_fd_close(int osfd)
{
if (_ST_SELECT_READ_CNT(osfd) || _ST_SELECT_WRITE_CNT(osfd) || _ST_SELECT_EXCEP_CNT(osfd)) {
errno = EBUSY;
return -1;
}
return 0;
}
ST_HIDDEN int _st_select_fd_getlimit(void)
{
return FD_SETSIZE;
}
static _st_eventsys_t _st_select_eventsys = {
"select",
ST_EVENTSYS_SELECT,
_st_select_init,
_st_select_dispatch,
_st_select_pollset_add,
_st_select_pollset_del,
_st_select_fd_new,
_st_select_fd_close,
_st_select_fd_getlimit
};
#ifdef MD_HAVE_POLL
/*****************************************
* poll event system
*/
ST_HIDDEN int _st_poll_init(void)
{
_st_poll_data = (struct _st_polldata *) malloc(sizeof(*_st_poll_data));
if (!_st_poll_data) {
return -1;
}
_ST_POLLFDS = (struct pollfd *) malloc(ST_MIN_POLLFDS_SIZE * sizeof(struct pollfd));
if (!_ST_POLLFDS) {
free(_st_poll_data);
_st_poll_data = NULL;
return -1;
}
_ST_POLLFDS_SIZE = ST_MIN_POLLFDS_SIZE;
_ST_POLL_OSFD_CNT = 0;
return 0;
}
ST_HIDDEN int _st_poll_pollset_add(struct pollfd *pds, int npds)
{
struct pollfd *pd;
struct pollfd *epd = pds + npds;
for (pd = pds; pd < epd; pd++) {
if (pd->fd < 0 || !pd->events) {
errno = EINVAL;
return -1;
}
}
_ST_POLL_OSFD_CNT += npds;
return 0;
}
/* ARGSUSED */
ST_HIDDEN void _st_poll_pollset_del(struct pollfd *pds, int npds)
{
_ST_POLL_OSFD_CNT -= npds;
ST_ASSERT(_ST_POLL_OSFD_CNT >= 0);
}
ST_HIDDEN void _st_poll_dispatch(void)
{
int timeout, nfd;
_st_clist_t *q;
st_utime_t min_timeout;
_st_pollq_t *pq;
struct pollfd *pds, *epds, *pollfds;
/*
* Build up the array of struct pollfd to wait on.
* If existing array is not big enough, release it and allocate a new one.
*/
ST_ASSERT(_ST_POLL_OSFD_CNT >= 0);
if (_ST_POLL_OSFD_CNT > _ST_POLLFDS_SIZE) {
free(_ST_POLLFDS);
_ST_POLLFDS = (struct pollfd *) malloc((_ST_POLL_OSFD_CNT + 10) * sizeof(struct pollfd));
ST_ASSERT(_ST_POLLFDS != NULL);
_ST_POLLFDS_SIZE = _ST_POLL_OSFD_CNT + 10;
}
pollfds = _ST_POLLFDS;
/* Gather all descriptors into one array */
for (q = _ST_IOQ.next; q != &_ST_IOQ; q = q->next) {
pq = _ST_POLLQUEUE_PTR(q);
memcpy(pollfds, pq->pds, sizeof(struct pollfd) * pq->npds);
pollfds += pq->npds;
}
ST_ASSERT(pollfds <= _ST_POLLFDS + _ST_POLLFDS_SIZE);
if (_ST_SLEEPQ == NULL) {
timeout = -1;
} else {
min_timeout = (_ST_SLEEPQ->due <= _ST_LAST_CLOCK) ? 0 : (_ST_SLEEPQ->due - _ST_LAST_CLOCK);
timeout = (int) (min_timeout / 1000);
}
/* Check for I/O operations */
nfd = poll(_ST_POLLFDS, _ST_POLL_OSFD_CNT, timeout);
/* Notify threads that are associated with the selected descriptors */
if (nfd > 0) {
pollfds = _ST_POLLFDS;
for (q = _ST_IOQ.next; q != &_ST_IOQ; q = q->next) {
pq = _ST_POLLQUEUE_PTR(q);
epds = pollfds + pq->npds;
for (pds = pollfds; pds < epds; pds++) {
if (pds->revents) {
break;
}
}
if (pds < epds) {
memcpy(pq->pds, pollfds, sizeof(struct pollfd) * pq->npds);
ST_REMOVE_LINK(&pq->links);
pq->on_ioq = 0;
if (pq->thread->flags & _ST_FL_ON_SLEEPQ) {
_ST_DEL_SLEEPQ(pq->thread);
}
pq->thread->state = _ST_ST_RUNNABLE;
_ST_ADD_RUNQ(pq->thread);
_ST_POLL_OSFD_CNT -= pq->npds;
ST_ASSERT(_ST_POLL_OSFD_CNT >= 0);
}
pollfds = epds;
}
}
}
/* ARGSUSED */
ST_HIDDEN int _st_poll_fd_new(int osfd)
{
return 0;
}
/* ARGSUSED */
ST_HIDDEN int _st_poll_fd_close(int osfd)
{
/*
* We don't maintain I/O counts for poll event system
* so nothing to check here.
*/
return 0;
}
ST_HIDDEN int _st_poll_fd_getlimit(void)
{
/* zero means no specific limit */
return 0;
}
static _st_eventsys_t _st_poll_eventsys = {
"poll",
ST_EVENTSYS_POLL,
_st_poll_init,
_st_poll_dispatch,
_st_poll_pollset_add,
_st_poll_pollset_del,
_st_poll_fd_new,
_st_poll_fd_close,
_st_poll_fd_getlimit
};
#endif /* MD_HAVE_POLL */
#ifdef MD_HAVE_KQUEUE
/*****************************************
* kqueue event system
*/
ST_HIDDEN int _st_kq_init(void)
{
int err = 0;
int rv = 0;
_st_kq_data = (struct _st_kqdata *) calloc(1, sizeof(*_st_kq_data));
if (!_st_kq_data) {
return -1;
}
if ((_st_kq_data->kq = kqueue()) < 0) {
err = errno;
rv = -1;
goto cleanup_kq;
}
fcntl(_st_kq_data->kq, F_SETFD, FD_CLOEXEC);
_st_kq_data->pid = getpid();
/*
* Allocate file descriptor data array.
* FD_SETSIZE looks like good initial size.
*/
_st_kq_data->fd_data_size = FD_SETSIZE;
_st_kq_data->fd_data = (_kq_fd_data_t *)calloc(_st_kq_data->fd_data_size, sizeof(_kq_fd_data_t));
if (!_st_kq_data->fd_data) {
err = errno;
rv = -1;
goto cleanup_kq;
}
/* Allocate event lists */
_st_kq_data->evtlist_size = ST_KQ_MIN_EVTLIST_SIZE;
_st_kq_data->evtlist = (struct kevent *)malloc(_st_kq_data->evtlist_size * sizeof(struct kevent));
_st_kq_data->addlist_size = ST_KQ_MIN_EVTLIST_SIZE;
_st_kq_data->addlist = (struct kevent *)malloc(_st_kq_data->addlist_size * sizeof(struct kevent));
_st_kq_data->dellist_size = ST_KQ_MIN_EVTLIST_SIZE;
_st_kq_data->dellist = (struct kevent *)malloc(_st_kq_data->dellist_size * sizeof(struct kevent));
if (!_st_kq_data->evtlist || !_st_kq_data->addlist ||
!_st_kq_data->dellist) {
err = ENOMEM;
rv = -1;
}
cleanup_kq:
if (rv < 0) {
if (_st_kq_data->kq >= 0) {
close(_st_kq_data->kq);
}
free(_st_kq_data->fd_data);
free(_st_kq_data->evtlist);
free(_st_kq_data->addlist);
free(_st_kq_data->dellist);
free(_st_kq_data);
_st_kq_data = NULL;
errno = err;
}
return rv;
}
ST_HIDDEN int _st_kq_fd_data_expand(int maxfd)
{
_kq_fd_data_t *ptr;
int n = _st_kq_data->fd_data_size;
while (maxfd >= n) {
n <<= 1;
}
ptr = (_kq_fd_data_t *)realloc(_st_kq_data->fd_data, n * sizeof(_kq_fd_data_t));
if (!ptr) {
return -1;
}
memset(ptr + _st_kq_data->fd_data_size, 0, (n - _st_kq_data->fd_data_size) * sizeof(_kq_fd_data_t));
_st_kq_data->fd_data = ptr;
_st_kq_data->fd_data_size = n;
return 0;
}
ST_HIDDEN int _st_kq_addlist_expand(int avail)
{
struct kevent *ptr;
int n = _st_kq_data->addlist_size;
while (avail > n - _st_kq_data->addlist_cnt) {
n <<= 1;
}
ptr = (struct kevent *)realloc(_st_kq_data->addlist, n * sizeof(struct kevent));
if (!ptr) {
return -1;
}
_st_kq_data->addlist = ptr;
_st_kq_data->addlist_size = n;
/*
* Try to expand the result event list too
* (although we don't have to do it).
*/
ptr = (struct kevent *)realloc(_st_kq_data->evtlist, n * sizeof(struct kevent));
if (ptr) {
_st_kq_data->evtlist = ptr;
_st_kq_data->evtlist_size = n;
}
return 0;
}
ST_HIDDEN void _st_kq_addlist_add(const struct kevent *kev)
{
ST_ASSERT(_st_kq_data->addlist_cnt < _st_kq_data->addlist_size);
memcpy(_st_kq_data->addlist + _st_kq_data->addlist_cnt, kev, sizeof(struct kevent));
_st_kq_data->addlist_cnt++;
}
ST_HIDDEN void _st_kq_dellist_add(const struct kevent *kev)
{
int n = _st_kq_data->dellist_size;
if (_st_kq_data->dellist_cnt >= n) {
struct kevent *ptr;
n <<= 1;
ptr = (struct kevent *)realloc(_st_kq_data->dellist, n * sizeof(struct kevent));
if (!ptr) {
/* See comment in _st_kq_pollset_del() */
return;
}
_st_kq_data->dellist = ptr;
_st_kq_data->dellist_size = n;
}
memcpy(_st_kq_data->dellist + _st_kq_data->dellist_cnt, kev, sizeof(struct kevent));
_st_kq_data->dellist_cnt++;
}
ST_HIDDEN int _st_kq_pollset_add(struct pollfd *pds, int npds)
{
struct kevent kev;
struct pollfd *pd;
struct pollfd *epd = pds + npds;
/*
* Pollset adding is "atomic". That is, either it succeeded for
* all descriptors in the set or it failed. It means that we
* need to do all the checks up front so we don't have to
* "unwind" if adding of one of the descriptors failed.
*/
for (pd = pds; pd < epd; pd++) {
/* POLLIN and/or POLLOUT must be set, but nothing else */
if (pd->fd < 0 || !pd->events || (pd->events & ~(POLLIN | POLLOUT))) {
errno = EINVAL;
return -1;
}
if (pd->fd >= _st_kq_data->fd_data_size && _st_kq_fd_data_expand(pd->fd) < 0) {
return -1;
}
}
/*
* Make sure we have enough room in the addlist for twice as many
* descriptors as in the pollset (for both READ and WRITE filters).
*/
npds <<= 1;
if (npds > _st_kq_data->addlist_size - _st_kq_data->addlist_cnt && _st_kq_addlist_expand(npds) < 0) {
return -1;
}
for (pd = pds; pd < epd; pd++) {
if ((pd->events & POLLIN) && (_ST_KQ_READ_CNT(pd->fd)++ == 0)) {
memset(&kev, 0, sizeof(kev));
kev.ident = pd->fd;
kev.filter = EVFILT_READ;
#ifdef NOTE_EOF
/* Make it behave like select() and poll() */
kev.fflags = NOTE_EOF;
#endif
kev.flags = (EV_ADD | EV_ONESHOT);
_st_kq_addlist_add(&kev);
}
if ((pd->events & POLLOUT) && (_ST_KQ_WRITE_CNT(pd->fd)++ == 0)) {
memset(&kev, 0, sizeof(kev));
kev.ident = pd->fd;
kev.filter = EVFILT_WRITE;
kev.flags = (EV_ADD | EV_ONESHOT);
_st_kq_addlist_add(&kev);
}
}
return 0;
}
ST_HIDDEN void _st_kq_pollset_del(struct pollfd *pds, int npds)
{
struct kevent kev;
struct pollfd *pd;
struct pollfd *epd = pds + npds;
/*
* It's OK if deleting fails because a descriptor will either be
* closed or fire only once (we set EV_ONESHOT flag).
*/
_st_kq_data->dellist_cnt = 0;
for (pd = pds; pd < epd; pd++) {
if ((pd->events & POLLIN) && (--_ST_KQ_READ_CNT(pd->fd) == 0)) {
memset(&kev, 0, sizeof(kev));
kev.ident = pd->fd;
kev.filter = EVFILT_READ;
kev.flags = EV_DELETE;
_st_kq_dellist_add(&kev);
}
if ((pd->events & POLLOUT) && (--_ST_KQ_WRITE_CNT(pd->fd) == 0)) {
memset(&kev, 0, sizeof(kev));
kev.ident = pd->fd;
kev.filter = EVFILT_WRITE;
kev.flags = EV_DELETE;
_st_kq_dellist_add(&kev);
}
}
if (_st_kq_data->dellist_cnt > 0) {
/*
* We do "synchronous" kqueue deletes to avoid deleting
* closed descriptors and other possible problems.
*/
int rv;
do {
/* This kevent() won't block since result list size is 0 */
rv = kevent(_st_kq_data->kq, _st_kq_data->dellist, _st_kq_data->dellist_cnt, NULL, 0, NULL);
} while (rv < 0 && errno == EINTR);
}
}
ST_HIDDEN void _st_kq_dispatch(void)
{
struct timespec timeout, *tsp;
struct kevent kev;
st_utime_t min_timeout;
_st_clist_t *q;
_st_pollq_t *pq;
struct pollfd *pds, *epds;
int nfd, i, osfd, notify, filter;
short events, revents;
if (_ST_SLEEPQ == NULL) {
tsp = NULL;
} else {
min_timeout = (_ST_SLEEPQ->due <= _ST_LAST_CLOCK) ? 0 : (_ST_SLEEPQ->due - _ST_LAST_CLOCK);
timeout.tv_sec = (time_t) (min_timeout / 1000000);
timeout.tv_nsec = (long) ((min_timeout % 1000000) * 1000);
tsp = &timeout;
}
retry_kevent:
/* Check for I/O operations */
nfd = kevent(_st_kq_data->kq, _st_kq_data->addlist, _st_kq_data->addlist_cnt,
_st_kq_data->evtlist, _st_kq_data->evtlist_size, tsp);
_st_kq_data->addlist_cnt = 0;
if (nfd > 0) {
for (i = 0; i < nfd; i++) {
osfd = _st_kq_data->evtlist[i].ident;
filter = _st_kq_data->evtlist[i].filter;
if (filter == EVFILT_READ) {
_ST_KQ_REVENTS(osfd) |= POLLIN;
} else if (filter == EVFILT_WRITE) {
_ST_KQ_REVENTS(osfd) |= POLLOUT;
}
if (_st_kq_data->evtlist[i].flags & EV_ERROR) {
if (_st_kq_data->evtlist[i].data == EBADF) {
_ST_KQ_REVENTS(osfd) |= POLLNVAL;
} else {
_ST_KQ_REVENTS(osfd) |= POLLERR;
}
}
}
_st_kq_data->dellist_cnt = 0;
for (q = _ST_IOQ.next; q != &_ST_IOQ; q = q->next) {
pq = _ST_POLLQUEUE_PTR(q);
notify = 0;
epds = pq->pds + pq->npds;
for (pds = pq->pds; pds < epds; pds++) {
osfd = pds->fd;
events = pds->events;
revents = (short)(_ST_KQ_REVENTS(osfd) & ~(POLLIN | POLLOUT));
if ((events & POLLIN) && (_ST_KQ_REVENTS(osfd) & POLLIN)) {
revents |= POLLIN;
}
if ((events & POLLOUT) && (_ST_KQ_REVENTS(osfd) & POLLOUT)) {
revents |= POLLOUT;
}
pds->revents = revents;
if (revents) {
notify = 1;
}
}
if (notify) {
ST_REMOVE_LINK(&pq->links);
pq->on_ioq = 0;
for (pds = pq->pds; pds < epds; pds++) {
osfd = pds->fd;
events = pds->events;
/*
* We set EV_ONESHOT flag so we only need to delete
* descriptor if it didn't fire.
*/
if ((events & POLLIN) && (--_ST_KQ_READ_CNT(osfd) == 0) && ((_ST_KQ_REVENTS(osfd) & POLLIN) == 0)) {
memset(&kev, 0, sizeof(kev));
kev.ident = osfd;
kev.filter = EVFILT_READ;
kev.flags = EV_DELETE;
_st_kq_dellist_add(&kev);
}
if ((events & POLLOUT) && (--_ST_KQ_WRITE_CNT(osfd) == 0) && ((_ST_KQ_REVENTS(osfd) & POLLOUT) == 0)) {
memset(&kev, 0, sizeof(kev));
kev.ident = osfd;
kev.filter = EVFILT_WRITE;
kev.flags = EV_DELETE;
_st_kq_dellist_add(&kev);
}
}
if (pq->thread->flags & _ST_FL_ON_SLEEPQ) {
_ST_DEL_SLEEPQ(pq->thread);
}
pq->thread->state = _ST_ST_RUNNABLE;
_ST_ADD_RUNQ(pq->thread);
}
}
if (_st_kq_data->dellist_cnt > 0) {
int rv;
do {
/* This kevent() won't block since result list size is 0 */
rv = kevent(_st_kq_data->kq, _st_kq_data->dellist, _st_kq_data->dellist_cnt, NULL, 0, NULL);
} while (rv < 0 && errno == EINTR);
}
for (i = 0; i < nfd; i++) {
osfd = _st_kq_data->evtlist[i].ident;
_ST_KQ_REVENTS(osfd) = 0;
}
} else if (nfd < 0) {
if (errno == EBADF && _st_kq_data->pid != getpid()) {
/* We probably forked, reinitialize kqueue */
if ((_st_kq_data->kq = kqueue()) < 0) {
/* There is nothing we can do here, will retry later */
return;
}
fcntl(_st_kq_data->kq, F_SETFD, FD_CLOEXEC);
_st_kq_data->pid = getpid();
/* Re-register all descriptors on ioq with new kqueue */
memset(_st_kq_data->fd_data, 0, _st_kq_data->fd_data_size * sizeof(_kq_fd_data_t));
for (q = _ST_IOQ.next; q != &_ST_IOQ; q = q->next) {
pq = _ST_POLLQUEUE_PTR(q);
_st_kq_pollset_add(pq->pds, pq->npds);
}
goto retry_kevent;
}
}
}
ST_HIDDEN int _st_kq_fd_new(int osfd)
{
if (osfd >= _st_kq_data->fd_data_size && _st_kq_fd_data_expand(osfd) < 0) {
return -1;
}
return 0;
}
ST_HIDDEN int _st_kq_fd_close(int osfd)
{
if (_ST_KQ_READ_CNT(osfd) || _ST_KQ_WRITE_CNT(osfd)) {
errno = EBUSY;
return -1;
}
return 0;
}
ST_HIDDEN int _st_kq_fd_getlimit(void)
{
/* zero means no specific limit */
return 0;
}
static _st_eventsys_t _st_kq_eventsys = {
"kqueue",
ST_EVENTSYS_ALT,
_st_kq_init,
_st_kq_dispatch,
_st_kq_pollset_add,
_st_kq_pollset_del,
_st_kq_fd_new,
_st_kq_fd_close,
_st_kq_fd_getlimit
};
#endif /* MD_HAVE_KQUEUE */
#ifdef MD_HAVE_EPOLL
/*****************************************
* epoll event system
*/
@@ -1386,8 +458,6 @@ static _st_eventsys_t _st_epoll_eventsys = {
_st_epoll_fd_close,
_st_epoll_fd_getlimit
};
#endif /* MD_HAVE_EPOLL */
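/*
 * Sketch, not in the original source: the epoll pollset_add/del bodies are
 * elided above, but the approach is to keep per-fd read/write/exception
 * reference counts (_ST_EPOLL_*_CNT), derive the kernel interest mask from
 * them (_ST_EPOLL_EVENTS), and apply the change with epoll_ctl(). A
 * self-contained illustration of that add/mod/del decision (epfd is passed in
 * explicitly here; the real code keeps it in its private data):
 */
static int _example_epoll_update_interest(int epfd, int fd, int old_events, int new_events)
{
    struct epoll_event ev;
    int op;

    if (new_events == old_events)
        return 0;                    /* interest mask unchanged */

    memset(&ev, 0, sizeof(ev));
    ev.events = new_events;          /* EPOLLIN | EPOLLOUT | EPOLLPRI mask */
    ev.data.fd = fd;

    if (old_events == 0)
        op = EPOLL_CTL_ADD;          /* first reference for this fd */
    else if (new_events == 0)
        op = EPOLL_CTL_DEL;          /* last reference dropped */
    else
        op = EPOLL_CTL_MOD;          /* mask changed */

    return epoll_ctl(epfd, op, fd, &ev);
}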
/*****************************************
* Public functions
 */
@@ -1402,30 +472,12 @@ int st_set_eventsys(int eventsys)
switch (eventsys) {
case ST_EVENTSYS_DEFAULT:
#ifdef USE_POLL
_st_eventsys = &_st_poll_eventsys;
#else
_st_eventsys = &_st_select_eventsys;
#endif
break;
case ST_EVENTSYS_SELECT:
_st_eventsys = &_st_select_eventsys;
break;
#ifdef MD_HAVE_POLL
case ST_EVENTSYS_POLL:
_st_eventsys = &_st_poll_eventsys;
break;
#endif
case ST_EVENTSYS_ALT:
#if defined (MD_HAVE_KQUEUE)
_st_eventsys = &_st_kq_eventsys;
#elif defined (MD_HAVE_EPOLL)
default:
if (_st_epoll_is_supported()) {
_st_eventsys = &_st_epoll_eventsys;
break;
}
#endif
break;
default:
errno = EINVAL;
return -1;
}
... ...
@@ -177,10 +177,6 @@
/*****************************************
* Other defines
*/
#if !defined(MD_HAVE_POLL) && !defined(MD_DONT_HAVE_POLL)
#define MD_HAVE_POLL
#endif
#ifndef MD_STACK_PAD_SIZE
#define MD_STACK_PAD_SIZE 128
#endif
... ...