Compile multiple network backends into the binary and choose which one to use at runtime.

This makes it possible to share more code between the backends and also work around
bugs of certain backends on some (versions of) operating systems.
This commit is contained in:
Jan Vidar Krey 2010-02-10 21:57:27 +01:00
parent 6ac78e1a10
commit 896bbfcb5d
9 changed files with 446 additions and 382 deletions

View File

@ -21,6 +21,9 @@
#include "network/connection.h" #include "network/connection.h"
struct net_backend;
struct net_connection;
struct net_cleanup_handler struct net_cleanup_handler
{ {
size_t num; size_t num;
@ -28,6 +31,157 @@ struct net_cleanup_handler
struct net_connection** queue; struct net_connection** queue;
}; };
struct net_backend
{
struct net_backend_common common;
time_t now; /* the time now (used for timeout handling) */
struct timeout_queue timeout_queue; /* used for timeout handling */
struct net_cleanup_handler* cleaner; /* handler to cleanup connections at a safe point */
struct net_backend_handler handler; /* backend event handler */
struct net_backend* data; /* backend specific data */
};
static struct net_backend* g_backend;
#ifdef USE_EPOLL
extern struct net_backend* net_backend_init_epoll(struct net_backend_handler*, struct net_backend_common*);
#endif
#ifdef USE_KQUEUE
extern struct net_backend* net_backend_init_kqueue(struct net_backend_handler*, struct net_backend_common*);
#endif
#ifdef USE_SELECT
extern struct net_backend* net_backend_init_select(struct net_backend_handler*, struct net_backend_common*);
#endif
static net_backend_init_t net_backend_init_funcs[] = {
#ifdef USE_EPOLL
net_backend_init_epoll,
#endif
#ifdef USE_KQUEUE
net_backend_init_kqueue,
#endif
#ifdef USE_SELECT
net_backend_init_select,
#endif
0
};
int net_backend_init()
{
size_t n;
g_backend = hub_malloc_zero(sizeof(struct net_backend));
g_backend->common.num = 0;
g_backend->common.max = net_get_max_sockets();
g_backend->now = time(0);
timeout_queue_initialize(&g_backend->timeout_queue, g_backend->now, 600); /* FIXME: max 600 secs! */
g_backend->cleaner = net_cleanup_initialize(g_backend->common.max);
for (n = 0; n < sizeof(net_backend_init_funcs); n++)
{
if (!net_backend_init_funcs[n])
break;
g_backend->data = net_backend_init_funcs[n](&g_backend->handler, &g_backend->common);
if (g_backend->data)
{
LOG_DEBUG("Initialized %s network backend.", g_backend->handler.backend_name());
return 1;
}
}
LOG_FATAL("Unable to find a suitable network backend");
return 0;
}
void net_backend_shutdown()
{
g_backend->handler.backend_shutdown(g_backend->data);
timeout_queue_shutdown(&g_backend->timeout_queue);
net_cleanup_shutdown(g_backend->cleaner);
hub_free(g_backend);
g_backend = 0;
}
void net_con_reinitialize(struct net_connection* con, net_connection_cb callback, const void* ptr, int events)
{
con->callback = callback;
con->ptr = (void*) ptr;
net_con_update(con, events);
}
void net_con_update(struct net_connection* con, int events)
{
g_backend->handler.con_mod(g_backend->data, con, events);
}
struct net_connection* net_con_create()
{
return g_backend->handler.con_create(g_backend->data);
}
struct timeout_queue* net_backend_get_timeout_queue()
{
if (!g_backend)
return 0;
return &g_backend->timeout_queue;
}
/**
* Process the network backend.
*/
int net_backend_process()
{
int res;
size_t secs = timeout_queue_get_next_timeout(&g_backend->timeout_queue, g_backend->now);
res = g_backend->handler.backend_poll(g_backend->data, secs * 1000);
g_backend->now = time(0);
timeout_queue_process(&g_backend->timeout_queue, g_backend->now);
if (res == -1)
{
LOG_WARN("backend error.");
return 0;
}
g_backend->handler.backend_process(g_backend->data, res);
net_cleanup_process(g_backend->cleaner);
return 1;
}
void net_con_initialize(struct net_connection* con, int sd, net_connection_cb callback, const void* ptr, int events)
{
g_backend->handler.con_init(g_backend->data, con, sd, callback, ptr);
net_set_nonblocking(con->sd, 1);
net_set_nosigpipe(con->sd, 1);
g_backend->handler.con_add(g_backend->data, con, events);
g_backend->common.num++;
}
void net_con_close(struct net_connection* con)
{
if (con->flags & NET_CLEANUP)
return;
g_backend->common.num--;
net_con_clear_timeout(con);
g_backend->handler.con_del(g_backend->data, con);
net_close(con->sd);
con->sd = -1;
net_cleanup_delayed_free(g_backend->cleaner, con);
}
struct net_cleanup_handler* net_cleanup_initialize(size_t max) struct net_cleanup_handler* net_cleanup_initialize(size_t max)
{ {
struct net_cleanup_handler* handler = (struct net_cleanup_handler*) hub_malloc(sizeof(struct net_cleanup_handler)); struct net_cleanup_handler* handler = (struct net_cleanup_handler*) hub_malloc(sizeof(struct net_cleanup_handler));

View File

@ -20,14 +20,50 @@
#ifndef HAVE_UHUB_NETWORK_BACKEND_H #ifndef HAVE_UHUB_NETWORK_BACKEND_H
#define HAVE_UHUB_NETWORK_BACKEND_H #define HAVE_UHUB_NETWORK_BACKEND_H
struct net_backend;
struct net_backend_common;
struct net_backend_handler;
struct net_cleanup_handler; struct net_cleanup_handler;
struct net_connection; struct net_connection;
typedef void (*net_connection_cb)(struct net_connection*, int event, void* ptr);
typedef struct net_backend* (*net_backend_init_t)(struct net_backend_handler* handler, struct net_backend_common* common);
typedef int (*net_backend_poll)(struct net_backend*, int ms);
typedef void (*net_backend_proc)(struct net_backend*, int res);
typedef void (*net_backend_destroy)(struct net_backend*);
typedef struct net_connection* (*net_con_backend_create)(struct net_backend*);
typedef void (*net_con_backend_init)(struct net_backend*, struct net_connection*, int sd, net_connection_cb callback, const void* ptr);
typedef void (*net_con_backend_add)(struct net_backend*, struct net_connection*, int mask);
typedef void (*net_con_backend_mod)(struct net_backend*, struct net_connection*, int mask);
typedef void (*net_con_backend_del)(struct net_backend*,struct net_connection*);
typedef const char* (*net_con_backend_name)(void);
struct net_backend_handler
{
net_con_backend_name backend_name;
net_backend_poll backend_poll;
net_backend_proc backend_process;
net_backend_destroy backend_shutdown;
net_con_backend_create con_create;
net_con_backend_init con_init;
net_con_backend_add con_add;
net_con_backend_mod con_mod;
net_con_backend_del con_del;
};
struct net_backend_common
{
size_t num; /* number of connections monitored by the backend */
size_t max; /* max number of connections that can be monitored */
};
/** /**
* Initialize the network backend. * Initialize the network backend.
* Returns 1 on success, or 0 on failure. * Returns 1 on success, or 0 on failure.
*/ */
extern int net_backend_initialize(); extern int net_backend_init();
/** /**
* Shutdown the network connection backend. * Shutdown the network connection backend.

View File

@ -249,6 +249,11 @@ void* net_con_get_ptr(struct net_connection* con)
return con->ptr; return con->ptr;
} }
void net_con_destroy(struct net_connection* con)
{
hub_free(con);
}
void net_con_callback(struct net_connection* con, int events) void net_con_callback(struct net_connection* con, int events)
{ {
if (con->flags & NET_CLEANUP) if (con->flags & NET_CLEANUP)

View File

@ -28,10 +28,6 @@
#define NET_EVENT_READ 0x0002 #define NET_EVENT_READ 0x0002
#define NET_EVENT_WRITE 0x0004 #define NET_EVENT_WRITE 0x0004
struct net_connection;
typedef void (*net_connection_cb)(struct net_connection*, int event, void* ptr);
struct net_connection struct net_connection
{ {
NET_CON_STRUCT_COMMON NET_CON_STRUCT_COMMON

View File

@ -33,128 +33,52 @@ struct net_connection_epoll
struct epoll_event ev; struct epoll_event ev;
}; };
struct net_backend struct net_backend_epoll
{ {
int epfd; int epfd;
size_t num;
size_t max;
struct net_connection_epoll** conns; struct net_connection_epoll** conns;
struct epoll_event events[EPOLL_EVBUFFER]; struct epoll_event events[EPOLL_EVBUFFER];
time_t now; struct net_backend_common* common;
struct timeout_queue timeout_queue;
struct net_cleanup_handler* cleaner;
}; };
static struct net_backend* g_backend = 0; static void net_backend_set_handlers(struct net_backend_handler* handler);
static void net_con_print(const char* prefix, struct net_connection_epoll* con) const char* net_backend_name_epoll()
{ {
char buf[512]; return "epoll";
int off = snprintf(buf, 512, "%s: net_connection={ sd=%d, flags=%u, callback=%p, ptr=%p, ev={ events=%s%s, data.ptr=%p }",
prefix, con->sd, con->flags, con->callback, con->ptr, (con->ev.events & EPOLLIN ? "R" : ""),(con->ev.events & EPOLLOUT ? "W" : "") , con->ev.data.ptr);
if (con->timeout)
{
sprintf(buf + off, ", timeout={ %d seconds left }", (int) (con->timeout->timestamp - g_backend->now));
}
else
{
sprintf(buf + off, ", timeout=NULL");
}
LOG_TRACE(buf);
} }
/** int net_backend_poll_epoll(struct net_backend* data, int ms)
* Initialize the network backend.
* Returns 1 on success, or 0 on failure.
*/
int net_backend_initialize()
{ {
size_t max = net_get_max_sockets(); struct net_backend_epoll* backend = (struct net_backend_epoll*) data;
g_backend = hub_malloc(sizeof(struct net_backend)); int res = epoll_wait(backend->epfd, backend->events, MIN(backend->common->num, EPOLL_EVBUFFER), ms);
g_backend->epfd = epoll_create(max); if (res == -1 && errno == EINTR)
if (g_backend->epfd == -1)
{
LOG_WARN("Unable to create epoll socket.");
return 0; return 0;
return res;
} }
g_backend->num = 0; void net_backend_process_epoll(struct net_backend* data, int res)
g_backend->max = max;
g_backend->conns = hub_malloc_zero(sizeof(struct net_connection_epoll*) * max);
memset(g_backend->events, 0, sizeof(g_backend->events));
g_backend->now = time(0);
timeout_queue_initialize(&g_backend->timeout_queue, g_backend->now, 600); /* look max 10 minutes into the future. */
g_backend->cleaner = net_cleanup_initialize(max);
return 1;
}
/**
* Shutdown the network connection backend.
*/
void net_backend_shutdown()
{ {
close(g_backend->epfd); int n;
timeout_queue_shutdown(&g_backend->timeout_queue); struct net_backend_epoll* backend = (struct net_backend_epoll*) data;
net_cleanup_shutdown(g_backend->cleaner);
hub_free(g_backend->conns);
hub_free(g_backend);
g_backend = 0;
}
/**
* Process the network backend.
*/
int net_backend_process()
{
int n, res;
size_t secs = timeout_queue_get_next_timeout(&g_backend->timeout_queue, g_backend->now);
LOG_TRACE("epoll_wait: fd=%d, events=%x, max=%zu, seconds=%d", g_backend->epfd, g_backend->events, MIN(g_backend->num, EPOLL_EVBUFFER), (int) secs);
res = epoll_wait(g_backend->epfd, g_backend->events, MIN(g_backend->num, EPOLL_EVBUFFER), secs * 1000);
g_backend->now = time(0);
timeout_queue_process(&g_backend->timeout_queue, g_backend->now);
if (res == -1)
{
LOG_WARN("epoll_wait returned -1");
return 0;
}
for (n = 0; n < res; n++) for (n = 0; n < res; n++)
{ {
struct net_connection_epoll* con = (struct net_connection_epoll*) g_backend->events[n].data.ptr; struct net_connection_epoll* con = (struct net_connection_epoll*) backend->events[n].data.ptr;
int ev = 0; int ev = 0;
if (g_backend->events[n].events & EPOLLIN) ev |= NET_EVENT_READ; if (backend->events[n].events & EPOLLIN) ev |= NET_EVENT_READ;
if (g_backend->events[n].events & EPOLLOUT) ev |= NET_EVENT_WRITE; if (backend->events[n].events & EPOLLOUT) ev |= NET_EVENT_WRITE;
net_con_callback((struct net_connection*) con, ev); net_con_callback((struct net_connection*) con, ev);
} }
net_cleanup_process(g_backend->cleaner);
return 1;
} }
struct timeout_queue* net_backend_get_timeout_queue() struct net_connection* net_con_create_epoll(struct net_backend* data)
{
if (!g_backend)
return 0;
return &g_backend->timeout_queue;
}
struct net_connection* net_con_create()
{ {
struct net_connection* con = (struct net_connection*) hub_malloc_zero(sizeof(struct net_connection_epoll)); struct net_connection* con = (struct net_connection*) hub_malloc_zero(sizeof(struct net_connection_epoll));
con->sd = -1; con->sd = -1;
return con; return con;
} }
void net_con_destroy(struct net_connection* con) void net_con_initialize_epoll(struct net_backend* data, struct net_connection* con_, int sd, net_connection_cb callback, const void* ptr)
{
hub_free(con);
}
void net_con_initialize(struct net_connection* con_, int sd, net_connection_cb callback, const void* ptr, int events)
{ {
struct net_connection_epoll* con = (struct net_connection_epoll*) con_; struct net_connection_epoll* con = (struct net_connection_epoll*) con_;
con->sd = sd; con->sd = sd;
@ -163,34 +87,29 @@ void net_con_initialize(struct net_connection* con_, int sd, net_connection_cb c
con->ev.events = 0; con->ev.events = 0;
con->ptr = (void*) ptr; con->ptr = (void*) ptr;
con->ev.data.ptr = (void*) con; con->ev.data.ptr = (void*) con;
}
net_set_nonblocking(con->sd, 1); void net_con_backend_add_epoll(struct net_backend* data, struct net_connection* con_, int events)
net_set_nosigpipe(con->sd, 1); {
struct net_backend_epoll* backend = (struct net_backend_epoll*) data;
struct net_connection_epoll* con = (struct net_connection_epoll*) con_;
backend->conns[con->sd] = con;
if (events & NET_EVENT_READ) con->ev.events |= EPOLLIN; if (events & NET_EVENT_READ) con->ev.events |= EPOLLIN;
if (events & NET_EVENT_WRITE) con->ev.events |= EPOLLOUT; if (events & NET_EVENT_WRITE) con->ev.events |= EPOLLOUT;
g_backend->conns[sd] = con; if (epoll_ctl(backend->epfd, EPOLL_CTL_ADD, con->sd, &con->ev) == -1)
g_backend->num++;
if (epoll_ctl(g_backend->epfd, EPOLL_CTL_ADD, con->sd, &con->ev) == -1)
{ {
LOG_TRACE("epoll_ctl() add failed."); LOG_TRACE("epoll_ctl() add failed.");
} }
net_con_print("ADD", con);
} }
void net_con_reinitialize(struct net_connection* con, net_connection_cb callback, const void* ptr, int events) void net_con_backend_mod_epoll(struct net_backend* data, struct net_connection* con_, int events)
{
con->callback = callback;
con->ptr = (void*) ptr;
net_con_update(con, events);
}
void net_con_update(struct net_connection* con_, int events)
{ {
struct net_backend_epoll* backend = (struct net_backend_epoll*) data;
struct net_connection_epoll* con = (struct net_connection_epoll*) con_; struct net_connection_epoll* con = (struct net_connection_epoll*) con_;
int newev = 0; int newev = 0;
if (events & NET_EVENT_READ) newev |= EPOLLIN; if (events & NET_EVENT_READ) newev |= EPOLLIN;
if (events & NET_EVENT_WRITE) newev |= EPOLLOUT; if (events & NET_EVENT_WRITE) newev |= EPOLLOUT;
@ -199,37 +118,66 @@ void net_con_update(struct net_connection* con_, int events)
return; return;
con->ev.events = newev; con->ev.events = newev;
if (epoll_ctl(g_backend->epfd, EPOLL_CTL_MOD, con->sd, &con->ev) == -1) if (epoll_ctl(backend->epfd, EPOLL_CTL_MOD, con->sd, &con->ev) == -1)
{ {
LOG_TRACE("epoll_ctl() modify failed."); LOG_TRACE("epoll_ctl() modify failed.");
} }
net_con_print("MOD", con);
} }
void net_con_close(struct net_connection* con_) void net_con_backend_del_epoll(struct net_backend* data, struct net_connection* con_)
{ {
struct net_backend_epoll* backend = (struct net_backend_epoll*) data;
struct net_connection_epoll* con = (struct net_connection_epoll*) con_; struct net_connection_epoll* con = (struct net_connection_epoll*) con_;
if (con->flags & NET_CLEANUP)
return;
if (con->sd != -1) backend->conns[con->sd] = 0;
{
g_backend->conns[con->sd] = 0;
g_backend->num--;
}
net_con_clear_timeout(con_); if (epoll_ctl(backend->epfd, EPOLL_CTL_DEL, con->sd, &con->ev) == -1)
if (epoll_ctl(g_backend->epfd, EPOLL_CTL_DEL, con->sd, &con->ev) == -1)
{ {
LOG_WARN("epoll_ctl() delete failed."); LOG_WARN("epoll_ctl() delete failed.");
} }
}
net_close(con->sd); void net_backend_shutdown_epoll(struct net_backend* data)
con->sd = -1; {
struct net_backend_epoll* backend = (struct net_backend_epoll*) data;
close(backend->epfd);
hub_free(backend->conns);
hub_free(backend);
}
net_con_print("DEL", con); struct net_backend* net_backend_init_epoll(struct net_backend_handler* handler, struct net_backend_common* common)
net_cleanup_delayed_free(g_backend->cleaner, con_); {
struct net_backend_epoll* backend;
if (getenv("EVENT_NOEPOLL"))
return 0;
backend = hub_malloc_zero(sizeof(struct net_backend_epoll));
backend->epfd = epoll_create(common->max);
if (backend->epfd == -1)
{
LOG_WARN("Unable to create epoll socket.");
return 0;
}
backend->conns = hub_malloc_zero(sizeof(struct net_connection_epoll*) * common->max);
backend->common = common;
net_backend_set_handlers(handler);
return (struct net_backend*) backend;
}
static void net_backend_set_handlers(struct net_backend_handler* handler)
{
handler->backend_name = net_backend_name_epoll;
handler->backend_poll = net_backend_poll_epoll;
handler->backend_process = net_backend_process_epoll;
handler->backend_shutdown = net_backend_shutdown_epoll;
handler->con_create = net_con_create_epoll;
handler->con_init = net_con_initialize_epoll;
handler->con_add = net_con_backend_add_epoll;
handler->con_mod = net_con_backend_mod_epoll;
handler->con_del = net_con_backend_del_epoll;
} }
#endif /* USE_EPOLL */ #endif /* USE_EPOLL */

View File

@ -33,149 +33,87 @@ struct net_connection_kqueue
struct kevent ev; struct kevent ev;
}; };
struct net_backend struct net_backend_kqueue
{ {
int kqfd; int kqfd;
size_t num;
size_t max;
struct net_connection_kqueue** conns; struct net_connection_kqueue** conns;
struct kevent** changes; struct kevent** changes;
size_t nchanges; size_t nchanges;
struct kevent events[KQUEUE_EVBUFFER]; struct kevent events[KQUEUE_EVBUFFER];
time_t now; struct net_backend_common* common;
struct timeout_queue timeout_queue;
struct net_cleanup_handler* cleaner;
}; };
static struct net_backend* g_backend = 0; static void net_backend_set_handlers(struct net_backend_handler* handler);
const char* net_backend_name_kqueue()
{
return "kqueue";
}
/** int net_backend_poll_kqueue(struct net_backend* data, int ms)
* Initialize the network backend.
* Returns 1 on success, or 0 on failure.
*/
int net_backend_initialize()
{ {
g_backend = hub_malloc_zero(sizeof(struct net_backend)); int res;
g_backend->kqfd = kqueue(); struct timespec tspec = { 0, };
if (g_backend->kqfd == -1) struct net_backend_kqueue* backend = (struct net_backend_kqueue*) data;
{
LOG_WARN("Unable to create epoll socket."); tspec.tv_sec = (ms / 1000);
tspec.tv_nsec = ((ms % 1000) * 1000000); /* FIXME: correct? */
res = kevent(backend->kqfd, *backend->changes, backend->nchanges, backend->events, KQUEUE_EVBUFFER, &tspec);
backend->nchanges = 0;
if (res == -1 && errno == EINTR)
return 0; return 0;
return res;
} }
size_t max = net_get_max_sockets(); void net_backend_process_kqueue(struct net_backend* data, int res)
g_backend->max = max;
g_backend->conns = hub_malloc_zero(sizeof(struct net_connection_kqueue*) * max);
g_backend->changes = hub_malloc_zero(sizeof(struct kevent*) * max);
g_backend->now = time(0);
timeout_queue_initialize(&g_backend->timeout_queue, g_backend->now, 600); /* look max 10 minutes into the future. */
g_backend->cleaner = net_cleanup_initialize(max);
return 1;
}
/**
* Shutdown the network connection backend.
*/
void net_backend_shutdown()
{
close(g_backend->kqfd);
timeout_queue_shutdown(&g_backend->timeout_queue);
net_cleanup_shutdown(g_backend->cleaner);
hub_free(g_backend->conns);
hub_free(g_backend->changes);
hub_free(g_backend);
g_backend = 0;
}
/**
* Process the network backend.
*/
int net_backend_process()
{ {
int n; int n;
struct timespec tspec = { 0, }; struct net_backend_kqueue* backend = (struct net_backend_kqueue*) data;
size_t secs = timeout_queue_get_next_timeout(&g_backend->timeout_queue, g_backend->now);
tspec.tv_sec = secs;
int res = kevent(g_backend->kqfd, *g_backend->changes, g_backend->nchanges, g_backend->events, KQUEUE_EVBUFFER, &tspec);
g_backend->nchanges = 0;
g_backend->now = time(0);
timeout_queue_process(&g_backend->timeout_queue, g_backend->now);
if (res == -1)
{
LOG_WARN("kevent returned -1");
return 0;
}
for (n = 0; n < res; n++) for (n = 0; n < res; n++)
{ {
struct net_connection_kqueue* con = (struct net_connection_kqueue*) g_backend->events[n].udata; struct net_connection_kqueue* con = (struct net_connection_kqueue*) backend->events[n].udata;
int ev = 0; int ev = 0;
if (g_backend->events[n].filter & EVFILT_READ) ev |= NET_EVENT_READ; if (backend->events[n].filter & EVFILT_READ) ev |= NET_EVENT_READ;
if (g_backend->events[n].filter & EVFILT_WRITE) ev |= NET_EVENT_WRITE; if (backend->events[n].filter & EVFILT_WRITE) ev |= NET_EVENT_WRITE;
net_con_callback((struct net_connection*) con, ev); net_con_callback((struct net_connection*) con, ev);
} }
net_cleanup_process(g_backend->cleaner);
return 1;
} }
struct timeout_queue* net_backend_get_timeout_queue() struct net_connection* net_con_create_kqueue(struct net_backend* data)
{
if (!g_backend)
return 0;
return &g_backend->timeout_queue;
}
struct net_connection* net_con_create()
{ {
struct net_connection* con = (struct net_connection*) hub_malloc_zero(sizeof(struct net_connection_kqueue)); struct net_connection* con = (struct net_connection*) hub_malloc_zero(sizeof(struct net_connection_kqueue));
con->sd = -1; con->sd = -1;
return con; return con;
} }
void net_con_destroy(struct net_connection* con) void net_con_initialize_kqueue(struct net_backend* data, struct net_connection* con_, int sd, net_connection_cb callback, const void* ptr)
{ {
hub_free(con);
}
void net_con_initialize(struct net_connection* con_, int sd, net_connection_cb callback, const void* ptr, int events)
{
short filter = 0;
struct net_connection_kqueue* con = (struct net_connection_kqueue*) con_; struct net_connection_kqueue* con = (struct net_connection_kqueue*) con_;
con->sd = sd; con->sd = sd;
con->flags = 0; con->flags = 0;
con->callback = callback; con->callback = callback;
con->ptr = (void*) ptr; con->ptr = (void*) ptr;
net_set_nonblocking(con->sd, 1);
net_set_nosigpipe(con->sd, 1);
if (events & NET_EVENT_READ) filter |= EVFILT_READ;
if (events & NET_EVENT_WRITE) filter |= EVFILT_READ;
EV_SET(&con->ev, sd, filter, EV_ADD, 0, 0, con);
g_backend->changes[g_backend->nchanges++] = &con->ev;
g_backend->conns[sd] = con;
g_backend->num++;
} }
void net_con_reinitialize(struct net_connection* con, net_connection_cb callback, const void* ptr, int events) void net_con_backend_add_kqueue(struct net_backend* data, struct net_connection* con_, int events)
{
con->callback = callback;
con->ptr = (void*) ptr;
net_con_update(con, events);
}
void net_con_update(struct net_connection* con_, int events)
{ {
short filter = 0; short filter = 0;
struct net_backend_kqueue* backend = (struct net_backend_kqueue*) data;
struct net_connection_kqueue* con = (struct net_connection_kqueue*) con_;
if (events & NET_EVENT_READ) filter |= EVFILT_READ;
if (events & NET_EVENT_WRITE) filter |= EVFILT_READ;
EV_SET(&con->ev, con->sd, filter, EV_ADD, 0, 0, con);
backend->changes[backend->nchanges++] = &con->ev;
backend->conns[con->sd] = con;
}
void net_con_backend_mod_kqueue(struct net_backend* data, struct net_connection* con_, int events)
{
short filter = 0;
struct net_backend_kqueue* backend = (struct net_backend_kqueue*) data;
struct net_connection_kqueue* con = (struct net_connection_kqueue*) con_; struct net_connection_kqueue* con = (struct net_connection_kqueue*) con_;
if (events & NET_EVENT_READ) filter |= EVFILT_READ; if (events & NET_EVENT_READ) filter |= EVFILT_READ;
@ -185,30 +123,64 @@ void net_con_update(struct net_connection* con_, int events)
return; return;
EV_SET(&con->ev, con->sd, filter, EV_ADD, 0, 0, con); EV_SET(&con->ev, con->sd, filter, EV_ADD, 0, 0, con);
g_backend->changes[g_backend->nchanges++] = &con->ev; backend->changes[backend->nchanges++] = &con->ev;
} }
void net_con_close(struct net_connection* con_) void net_con_backend_del_kqueue(struct net_backend* data, struct net_connection* con_)
{ {
struct net_backend_kqueue* backend = (struct net_backend_kqueue*) data;
struct net_connection_kqueue* con = (struct net_connection_kqueue*) con_; struct net_connection_kqueue* con = (struct net_connection_kqueue*) con_;
if (con->flags & NET_CLEANUP)
return;
if (con->sd != -1) backend->conns[con->sd] = 0;
{
g_backend->conns[con->sd] = 0;
g_backend->num--;
}
net_con_clear_timeout(con_);
/* No need to remove it from the kqueue filter, the kqueue man page says /* No need to remove it from the kqueue filter, the kqueue man page says
it is automatically removed when the descriptor is closed. */ it is automatically removed when the descriptor is closed. */
}
net_close(con->sd); void net_backend_shutdown_kqueue(struct net_backend* data)
con->sd = -1; {
struct net_backend_kqueue* backend = (struct net_backend_kqueue*) data;
close(backend->kqfd);
hub_free(backend->conns);
hub_free(backend->changes);
hub_free(backend);
}
net_cleanup_delayed_free(g_backend->cleaner, con_); struct net_backend* net_backend_init_kqueue(struct net_backend_handler* handler, struct net_backend_common* common)
{
struct net_backend_kqueue* backend;
if (getenv("EVENT_NOKQUEUE"))
return 0;
backend = hub_malloc_zero(sizeof(struct net_backend_kqueue));
backend->kqfd = kqueue(common->max);
if (backend->kqfd == -1)
{
LOG_WARN("Unable to create kqueue socket.");
return 0;
}
backend->conns = hub_malloc_zero(sizeof(struct net_connection_kqueue*) * common->max);
backend->conns = hub_malloc_zero(sizeof(struct net_connection_kqueue*) * common->max);
backend->changes = hub_malloc_zero(sizeof(struct kevent*) * common->max);
backend->common = common;
net_backend_set_handlers(handler);
return (struct net_backend*) backend;
}
static void net_backend_set_handlers(struct net_backend_handler* handler)
{
handler->backend_name = net_backend_name_kqueue;
handler->backend_poll = net_backend_poll_kqueue;
handler->backend_process = net_backend_process_kqueue;
handler->backend_shutdown = net_backend_shutdown_kqueue;
handler->con_create = net_con_create_kqueue;
handler->con_init = net_con_initialize_kqueue;
handler->con_add = net_con_backend_add_kqueue;
handler->con_mod = net_con_backend_mod_kqueue;
handler->con_del = net_con_backend_del_kqueue;
} }
#endif /* USE_KQUEUE */ #endif /* USE_KQUEUE */

View File

@ -47,7 +47,13 @@ int net_initialize()
} }
#endif /* WINSOCK */ #endif /* WINSOCK */
net_backend_initialize(); if (!net_backend_init())
{
#ifdef WINSOCK
WSACleanup();
#endif
return -1;
}
net_stats_initialize(); net_stats_initialize();
#ifdef SSL_SUPPORT #ifdef SSL_SUPPORT

View File

@ -30,113 +30,67 @@ struct net_connection_select
NET_CON_STRUCT_COMMON NET_CON_STRUCT_COMMON
}; };
struct net_backend struct net_backend_select
{ {
size_t num;
size_t max;
struct net_connection_select** conns; struct net_connection_select** conns;
fd_set rfds; fd_set rfds;
fd_set wfds; fd_set wfds;
time_t now; int maxfd;
struct timeout_queue timeout_queue; struct net_backend_common* common;
struct net_cleanup_handler* cleaner;
}; };
static struct net_backend* g_backend = 0; static void net_backend_set_handlers(struct net_backend_handler* handler);
static void net_con_print(const char* prefix, struct net_connection_select* con) const char* net_backend_name_select()
{ {
char buf[512]; return "select";
int off = snprintf(buf, 512, "%s: net_connection={ sd=%d, flags=%u, callback=%p, ptr=%p, events=%s%s",
prefix, con->sd, con->flags, con->callback, con->ptr, (con->flags & NET_EVENT_READ ? "R" : ""),(con->flags & NET_EVENT_WRITE ? "W" : ""));
if (con->timeout)
{
sprintf(buf + off, ", timeout={ %d seconds left }", (int) (con->timeout->timestamp - g_backend->now));
}
else
{
sprintf(buf + off, ", timeout=NULL");
}
LOG_TRACE(buf);
} }
/** int net_backend_poll_select(struct net_backend* data, int ms)
* Initialize the network backend.
* Returns 1 on success, or 0 on failure.
*/
int net_backend_initialize()
{ {
size_t max = net_get_max_sockets(); int found, res, n;
g_backend = hub_malloc(sizeof(struct net_backend));
g_backend->num = 0;
g_backend->max = max;
g_backend->conns = hub_malloc_zero(sizeof(struct net_connection_select*) * max);
FD_ZERO(&g_backend->rfds);
FD_ZERO(&g_backend->wfds);
g_backend->now = time(0);
timeout_queue_initialize(&g_backend->timeout_queue, g_backend->now, 600); /* look max 10 minutes into the future. */
g_backend->cleaner = net_cleanup_initialize(max);
return 1;
}
/**
* Shutdown the network connection backend.
*/
void net_backend_shutdown()
{
timeout_queue_shutdown(&g_backend->timeout_queue);
net_cleanup_shutdown(g_backend->cleaner);
hub_free(g_backend->conns);
hub_free(g_backend);
g_backend = 0;
}
/**
* Process the network backend.
*/
int net_backend_process()
{
int n, found, maxfd, res;
struct timeval tval; struct timeval tval;
size_t secs; struct net_backend_select* backend = (struct net_backend_select*) data;
FD_ZERO(&g_backend->rfds); tval.tv_sec = ms / 1000;
FD_ZERO(&g_backend->wfds); tval.tv_usec = ((ms % 1000) * 1000); // FIXME: correct?
secs = timeout_queue_get_next_timeout(&g_backend->timeout_queue, g_backend->now); FD_ZERO(&backend->rfds);
tval.tv_sec = secs; FD_ZERO(&backend->wfds);
tval.tv_usec = 0;
for (n = 0, found = 0; found < g_backend->num && n < g_backend->max; n++) backend->maxfd = -1;
for (n = 0, found = 0; found < backend->common->num && n < backend->common->max; n++)
{ {
struct net_connection_select* con = g_backend->conns[n]; struct net_connection_select* con = backend->conns[n];
if (con) if (con)
{ {
if (con->flags & NET_EVENT_READ) FD_SET(con->sd, &g_backend->rfds); if (con->flags & NET_EVENT_READ) FD_SET(con->sd, &backend->rfds);
if (con->flags & NET_EVENT_WRITE) FD_SET(con->sd, &g_backend->wfds); if (con->flags & NET_EVENT_WRITE) FD_SET(con->sd, &backend->wfds);
found++; found++;
maxfd = con->sd; backend->maxfd = con->sd;
} }
} }
backend->maxfd++;
res = select(maxfd+1, &g_backend->rfds, &g_backend->wfds, 0, &tval); res = select(backend->maxfd, &backend->rfds, &backend->wfds, 0, &tval);
g_backend->now = time(0);
timeout_queue_process(&g_backend->timeout_queue, g_backend->now);
if (res == -1) if (res == -1 && errno == EINTR)
{
LOG_WARN("select returned -1");
return 0; return 0;
return res;
} }
for (n = 0, found = 0; found < res && n < (maxfd+1); n++) void net_backend_process_select(struct net_backend* data, int res)
{ {
struct net_connection_select* con = g_backend->conns[n]; int n, found;
struct net_backend_select* backend = (struct net_backend_select*) data;
for (n = 0, found = 0; found < res && n < backend->maxfd; n++)
{
struct net_connection_select* con = backend->conns[n];
if (con) if (con)
{ {
int ev = 0; int ev = 0;
if (FD_ISSET(con->sd, &g_backend->rfds)) ev |= NET_EVENT_READ; if (FD_ISSET(con->sd, &backend->rfds)) ev |= NET_EVENT_READ;
if (FD_ISSET(con->sd, &g_backend->wfds)) ev |= NET_EVENT_WRITE; if (FD_ISSET(con->sd, &backend->wfds)) ev |= NET_EVENT_WRITE;
if (ev) if (ev)
{ {
@ -145,78 +99,75 @@ int net_backend_process()
} }
} }
} }
net_cleanup_process(g_backend->cleaner);
return 1;
} }
struct timeout_queue* net_backend_get_timeout_queue() struct net_connection* net_con_create_select(struct net_backend* data)
{
if (!g_backend)
return 0;
return &g_backend->timeout_queue;
}
struct net_connection* net_con_create()
{ {
struct net_connection* con = (struct net_connection*) hub_malloc_zero(sizeof(struct net_connection_select)); struct net_connection* con = (struct net_connection*) hub_malloc_zero(sizeof(struct net_connection_select));
con->sd = -1; con->sd = -1;
return con; return con;
} }
void net_con_destroy(struct net_connection* con) void net_con_initialize_select(struct net_backend* data, struct net_connection* con_, int sd, net_connection_cb callback, const void* ptr)
{
hub_free(con);
}
void net_con_initialize(struct net_connection* con_, int sd, net_connection_cb callback, const void* ptr, int events)
{ {
struct net_connection_select* con = (struct net_connection_select*) con_; struct net_connection_select* con = (struct net_connection_select*) con_;
con->sd = sd; con->sd = sd;
con->flags = events; con->flags = 0;
con->callback = callback; con->callback = callback;
con->ptr = (void*) ptr; con->ptr = (void*) ptr;
net_set_nonblocking(con->sd, 1);
net_set_nosigpipe(con->sd, 1);
g_backend->conns[sd] = con;
g_backend->num++;
net_con_print("ADD", con);
} }
void net_con_reinitialize(struct net_connection* con, net_connection_cb callback, const void* ptr, int events) void net_con_backend_add_select(struct net_backend* data, struct net_connection* con, int events)
{ {
con->callback = callback; struct net_backend_select* backend = (struct net_backend_select*) data;
con->ptr = (void*) ptr; backend->conns[con->sd] = (struct net_connection_select*) con;
net_con_update(con, events);
} }
void net_con_update(struct net_connection* con, int events) void net_con_backend_mod_select(struct net_backend* data, struct net_connection* con, int events)
{ {
con->flags = events; con->flags |= (events & (NET_EVENT_READ | NET_EVENT_WRITE));;
net_con_print("MOD", (struct net_connection_select*) con);
} }
void net_con_close(struct net_connection* con) void net_con_backend_del_select(struct net_backend* data, struct net_connection* con)
{ {
if (con->flags & NET_CLEANUP) struct net_backend_select* backend = (struct net_backend_select*) data;
return; backend->conns[con->sd] = 0;
if (con->sd != -1)
{
g_backend->conns[con->sd] = 0;
g_backend->num--;
} }
net_con_clear_timeout(con); void net_backend_shutdown_select(struct net_backend* data)
{
struct net_backend_select* backend = (struct net_backend_select*) data;
hub_free(backend->conns);
hub_free(backend);
}
net_close(con->sd); struct net_backend* net_backend_init_select(struct net_backend_handler* handler, struct net_backend_common* common)
con->sd = -1; {
struct net_backend_select* backend;
net_con_print("DEL", (struct net_connection_select*) con); if (getenv("EVENT_NOSELECT"))
net_cleanup_delayed_free(g_backend->cleaner, con); return 0;
backend = hub_malloc_zero(sizeof(struct net_backend_select));
FD_ZERO(&backend->rfds);
FD_ZERO(&backend->wfds);
backend->conns = hub_malloc_zero(sizeof(struct net_connection_select*) * common->max);
backend->common = common;
net_backend_set_handlers(handler);
return (struct net_backend*) backend;
}
static void net_backend_set_handlers(struct net_backend_handler* handler)
{
handler->backend_name = net_backend_name_select;
handler->backend_poll = net_backend_poll_select;
handler->backend_process = net_backend_process_select;
handler->backend_shutdown = net_backend_shutdown_select;
handler->con_create = net_con_create_select;
handler->con_init = net_con_initialize_select;
handler->con_add = net_con_backend_add_select;
handler->con_mod = net_con_backend_mod_select;
handler->con_del = net_con_backend_del_select;
} }
#endif /* USE_SELECT */ #endif /* USE_SELECT */

View File

@ -110,22 +110,18 @@
#ifdef __linux__ #ifdef __linux__
#define USE_EPOLL #define USE_EPOLL
#define HAVE_BACKEND
#include <sys/epoll.h> #include <sys/epoll.h>
#endif #endif
#if defined(__APPLE__) || defined(__FreeBSD__) || defined(__OpenBSD__) || defined(__NetBSD__) #if defined(__APPLE__) || defined(__FreeBSD__) || defined(__OpenBSD__) || defined(__NetBSD__)
#define USE_KQUEUE #define USE_KQUEUE
#define HAVE_BACKEND
#include <sys/event.h> #include <sys/event.h>
#endif #endif
#ifndef HAVE_BACKEND
#define USE_SELECT #define USE_SELECT
#ifndef WINSOCK #ifndef WINSOCK
#include <sys/select.h> #include <sys/select.h>
#endif #endif
#endif
#if defined(__APPLE__) || defined(__FreeBSD__) || defined(__OpenBSD__) || defined(__NetBSD__) || defined(__sun__) #if defined(__APPLE__) || defined(__FreeBSD__) || defined(__OpenBSD__) || defined(__NetBSD__) || defined(__sun__)
#undef HAVE_STRNDUP #undef HAVE_STRNDUP