Merge branch 'event_queue_work'
This commit is contained in:
commit
08351ea8e7
@ -1,3 +1,6 @@
|
|||||||
|
0.2.7:
|
||||||
|
|
||||||
|
|
||||||
0.2.6:
|
0.2.6:
|
||||||
- Better "!uptime" command formatting.
|
- Better "!uptime" command formatting.
|
||||||
- Better "!stats"; can display peak and current bandwidth usage.
|
- Better "!stats"; can display peak and current bandwidth usage.
|
||||||
|
@ -936,6 +936,7 @@ int main(int argc, char** argv)
|
|||||||
parse_command_line(argc, argv);
|
parse_command_line(argc, argv);
|
||||||
|
|
||||||
net_initialize();
|
net_initialize();
|
||||||
|
event_init();
|
||||||
|
|
||||||
memset(&saddr, 0, sizeof(saddr));
|
memset(&saddr, 0, sizeof(saddr));
|
||||||
saddr.sin_family = AF_INET;
|
saddr.sin_family = AF_INET;
|
||||||
|
@ -35,9 +35,8 @@ int event_queue_initialize(struct event_queue** queue, event_queue_callback call
|
|||||||
|
|
||||||
(*queue)->q1 = list_create();
|
(*queue)->q1 = list_create();
|
||||||
(*queue)->q2 = list_create();
|
(*queue)->q2 = list_create();
|
||||||
(*queue)->event = (struct event*) hub_malloc_zero(sizeof(struct event));
|
|
||||||
|
|
||||||
if (!(*queue)->q1 || !(*queue)->q2 || !(*queue)->event)
|
if (!(*queue)->q1 || !(*queue)->q2)
|
||||||
{
|
{
|
||||||
list_destroy((*queue)->q1);
|
list_destroy((*queue)->q1);
|
||||||
list_destroy((*queue)->q2);
|
list_destroy((*queue)->q2);
|
||||||
@ -47,7 +46,6 @@ int event_queue_initialize(struct event_queue** queue, event_queue_callback call
|
|||||||
(*queue)->callback = callback;
|
(*queue)->callback = callback;
|
||||||
(*queue)->callback_data = ptr;
|
(*queue)->callback_data = ptr;
|
||||||
|
|
||||||
evtimer_set((*queue)->event, libevent_queue_process, *queue);
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -57,12 +55,6 @@ void event_queue_shutdown(struct event_queue* queue)
|
|||||||
/* Should be empty at this point! */
|
/* Should be empty at this point! */
|
||||||
list_destroy(queue->q1);
|
list_destroy(queue->q1);
|
||||||
list_destroy(queue->q2);
|
list_destroy(queue->q2);
|
||||||
|
|
||||||
if (queue->event)
|
|
||||||
{
|
|
||||||
evtimer_del(queue->event);
|
|
||||||
hub_free(queue->event);
|
|
||||||
}
|
|
||||||
hub_free(queue);
|
hub_free(queue);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -135,12 +127,6 @@ void event_queue_post(struct event_queue* queue, struct event_data* message)
|
|||||||
#endif
|
#endif
|
||||||
|
|
||||||
list_append(q, data);
|
list_append(q, data);
|
||||||
|
|
||||||
|
|
||||||
if (!queue->locked && queue->event)
|
|
||||||
{
|
|
||||||
libevent_queue_schedule(queue);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
@ -154,19 +140,5 @@ size_t event_queue_size(struct event_queue* queue)
|
|||||||
return list_size(queue->q1) + list_size(queue->q2);
|
return list_size(queue->q1) + list_size(queue->q2);
|
||||||
}
|
}
|
||||||
|
|
||||||
void libevent_queue_schedule(struct event_queue* queue)
|
|
||||||
{
|
|
||||||
struct timeval zero = { 0, };
|
|
||||||
evtimer_add(queue->event, &zero);
|
|
||||||
}
|
|
||||||
|
|
||||||
void libevent_queue_process(int fd, short events, void* arg)
|
|
||||||
{
|
|
||||||
struct event_queue* queue = (struct event_queue*) arg;
|
|
||||||
if (event_queue_process(queue))
|
|
||||||
{
|
|
||||||
libevent_queue_schedule(queue);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
|
@ -36,7 +36,6 @@ struct event_queue
|
|||||||
struct linked_list* q2; /* secondary, when primary is locked */
|
struct linked_list* q2; /* secondary, when primary is locked */
|
||||||
event_queue_callback callback;
|
event_queue_callback callback;
|
||||||
void* callback_data;
|
void* callback_data;
|
||||||
struct event* event; /* libevent handle */
|
|
||||||
};
|
};
|
||||||
|
|
||||||
extern int event_queue_initialize(struct event_queue** queue, event_queue_callback callback, void* ptr);
|
extern int event_queue_initialize(struct event_queue** queue, event_queue_callback callback, void* ptr);
|
||||||
@ -45,11 +44,5 @@ extern void event_queue_shutdown(struct event_queue* queue);
|
|||||||
extern void event_queue_post(struct event_queue* queue, struct event_data* message);
|
extern void event_queue_post(struct event_queue* queue, struct event_data* message);
|
||||||
extern size_t event_queue_size(struct event_queue* queue);
|
extern size_t event_queue_size(struct event_queue* queue);
|
||||||
|
|
||||||
/**
|
|
||||||
* Only used internally with libevent.
|
|
||||||
*/
|
|
||||||
extern void libevent_queue_process(int fd, short events, void* arg);
|
|
||||||
extern void libevent_queue_schedule(struct event_queue* queue);
|
|
||||||
|
|
||||||
#endif /* HAVE_UHUB_EVENT_QUEUE_H */
|
#endif /* HAVE_UHUB_EVENT_QUEUE_H */
|
||||||
|
|
||||||
|
36
src/hub.c
36
src/hub.c
@ -479,11 +479,20 @@ struct hub_info* hub_start_service(struct hub_config* config)
|
|||||||
net_address_to_string(AF_INET6, &((struct sockaddr_in6*) &addr)->sin6_addr, address_buf, INET6_ADDRSTRLEN);
|
net_address_to_string(AF_INET6, &((struct sockaddr_in6*) &addr)->sin6_addr, address_buf, INET6_ADDRSTRLEN);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
hub->evbase = event_base_new();
|
||||||
|
if (!hub->evbase)
|
||||||
|
{
|
||||||
|
hub_log(log_error, "Unable to initialize libevent.");
|
||||||
|
hub_free(hub);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
hub_log(log_info, "Starting server, listening on %s:%d...", address_buf, config->server_port);
|
hub_log(log_info, "Starting server, listening on %s:%d...", address_buf, config->server_port);
|
||||||
|
|
||||||
server_tcp = net_socket_create(af, SOCK_STREAM, IPPROTO_TCP);
|
server_tcp = net_socket_create(af, SOCK_STREAM, IPPROTO_TCP);
|
||||||
if (server_tcp == -1)
|
if (server_tcp == -1)
|
||||||
{
|
{
|
||||||
|
event_base_free(hub->evbase);
|
||||||
hub_free(hub);
|
hub_free(hub);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
@ -492,6 +501,7 @@ struct hub_info* hub_start_service(struct hub_config* config)
|
|||||||
server_udp = net_socket_create(af, SOCK_DGRAM, IPPROTO_UDP);
|
server_udp = net_socket_create(af, SOCK_DGRAM, IPPROTO_UDP);
|
||||||
if (server_udp == -1)
|
if (server_udp == -1)
|
||||||
{
|
{
|
||||||
|
event_base_free(hub->evbase);
|
||||||
hub_free(hub);
|
hub_free(hub);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
@ -500,6 +510,7 @@ struct hub_info* hub_start_service(struct hub_config* config)
|
|||||||
ret = net_set_reuseaddress(server_tcp, 1);
|
ret = net_set_reuseaddress(server_tcp, 1);
|
||||||
if (ret == -1)
|
if (ret == -1)
|
||||||
{
|
{
|
||||||
|
event_base_free(hub->evbase);
|
||||||
hub_free(hub);
|
hub_free(hub);
|
||||||
net_close(server_tcp);
|
net_close(server_tcp);
|
||||||
#ifdef ADC_UDP_OPERATION
|
#ifdef ADC_UDP_OPERATION
|
||||||
@ -512,6 +523,7 @@ struct hub_info* hub_start_service(struct hub_config* config)
|
|||||||
ret = net_set_reuseaddress(server_udp, 1);
|
ret = net_set_reuseaddress(server_udp, 1);
|
||||||
if (ret == -1)
|
if (ret == -1)
|
||||||
{
|
{
|
||||||
|
event_base_free(hub->evbase);
|
||||||
hub_free(hub);
|
hub_free(hub);
|
||||||
net_close(server_tcp);
|
net_close(server_tcp);
|
||||||
net_close(server_udp);
|
net_close(server_udp);
|
||||||
@ -523,6 +535,7 @@ struct hub_info* hub_start_service(struct hub_config* config)
|
|||||||
ret = net_set_nonblocking(server_tcp, 1);
|
ret = net_set_nonblocking(server_tcp, 1);
|
||||||
if (ret == -1)
|
if (ret == -1)
|
||||||
{
|
{
|
||||||
|
event_base_free(hub->evbase);
|
||||||
hub_free(hub);
|
hub_free(hub);
|
||||||
net_close(server_tcp);
|
net_close(server_tcp);
|
||||||
#ifdef ADC_UDP_OPERATION
|
#ifdef ADC_UDP_OPERATION
|
||||||
@ -535,6 +548,7 @@ struct hub_info* hub_start_service(struct hub_config* config)
|
|||||||
ret = net_set_nonblocking(server_udp, 1);
|
ret = net_set_nonblocking(server_udp, 1);
|
||||||
if (ret == -1)
|
if (ret == -1)
|
||||||
{
|
{
|
||||||
|
event_base_free(hub->evbase);
|
||||||
hub_free(hub);
|
hub_free(hub);
|
||||||
net_close(server_tcp);
|
net_close(server_tcp);
|
||||||
net_close(server_udp);
|
net_close(server_udp);
|
||||||
@ -547,6 +561,7 @@ struct hub_info* hub_start_service(struct hub_config* config)
|
|||||||
if (ret == -1)
|
if (ret == -1)
|
||||||
{
|
{
|
||||||
hub_log(log_fatal, "hub_start_service(): Unable to bind to TCP local address. errno=%d, str=%s", net_error(), net_error_string(net_error()));
|
hub_log(log_fatal, "hub_start_service(): Unable to bind to TCP local address. errno=%d, str=%s", net_error(), net_error_string(net_error()));
|
||||||
|
event_base_free(hub->evbase);
|
||||||
hub_free(hub);
|
hub_free(hub);
|
||||||
net_close(server_tcp);
|
net_close(server_tcp);
|
||||||
#ifdef ADC_UDP_OPERATION
|
#ifdef ADC_UDP_OPERATION
|
||||||
@ -560,6 +575,7 @@ struct hub_info* hub_start_service(struct hub_config* config)
|
|||||||
if (ret == -1)
|
if (ret == -1)
|
||||||
{
|
{
|
||||||
hub_log(log_fatal, "hub_start_service(): Unable to bind to UDP local address. errno=%d, str=%s", net_error(), net_error_string(net_error()));
|
hub_log(log_fatal, "hub_start_service(): Unable to bind to UDP local address. errno=%d, str=%s", net_error(), net_error_string(net_error()));
|
||||||
|
event_base_free(hub->evbase);
|
||||||
hub_free(hub);
|
hub_free(hub);
|
||||||
net_close(server_tcp);
|
net_close(server_tcp);
|
||||||
net_close(server_udp);
|
net_close(server_udp);
|
||||||
@ -571,6 +587,7 @@ struct hub_info* hub_start_service(struct hub_config* config)
|
|||||||
if (ret == -1)
|
if (ret == -1)
|
||||||
{
|
{
|
||||||
hub_log(log_fatal, "hub_start_service(): Unable to listen to socket");
|
hub_log(log_fatal, "hub_start_service(): Unable to listen to socket");
|
||||||
|
event_base_free(hub->evbase);
|
||||||
hub_free(hub);
|
hub_free(hub);
|
||||||
net_close(server_tcp);
|
net_close(server_tcp);
|
||||||
#ifdef ADC_UDP_OPERATION
|
#ifdef ADC_UDP_OPERATION
|
||||||
@ -597,6 +614,7 @@ struct hub_info* hub_start_service(struct hub_config* config)
|
|||||||
}
|
}
|
||||||
|
|
||||||
event_set(&hub->ev_accept, hub->fd_tcp, EV_READ | EV_PERSIST, net_on_accept, hub);
|
event_set(&hub->ev_accept, hub->fd_tcp, EV_READ | EV_PERSIST, net_on_accept, hub);
|
||||||
|
event_base_set(hub->evbase, &hub->ev_accept);
|
||||||
if (event_add(&hub->ev_accept, NULL) == -1)
|
if (event_add(&hub->ev_accept, NULL) == -1)
|
||||||
{
|
{
|
||||||
user_manager_shutdown(hub);
|
user_manager_shutdown(hub);
|
||||||
@ -610,6 +628,7 @@ struct hub_info* hub_start_service(struct hub_config* config)
|
|||||||
|
|
||||||
#ifdef ADC_UDP_OPERATION
|
#ifdef ADC_UDP_OPERATION
|
||||||
event_set(&hub->ev_datagram, hub->fd_udp, EV_READ | EV_PERSIST, net_on_packet, hub);
|
event_set(&hub->ev_datagram, hub->fd_udp, EV_READ | EV_PERSIST, net_on_packet, hub);
|
||||||
|
event_base_set(hub->evbase, &hub->ev_datagram);
|
||||||
if (event_add(&hub->ev_datagram, NULL) == -1)
|
if (event_add(&hub->ev_datagram, NULL) == -1)
|
||||||
{
|
{
|
||||||
user_manager_shutdown(hub);
|
user_manager_shutdown(hub);
|
||||||
@ -650,6 +669,7 @@ void hub_shutdown_service(struct hub_info* hub)
|
|||||||
net_close(hub->fd_tcp);
|
net_close(hub->fd_tcp);
|
||||||
user_manager_shutdown(hub);
|
user_manager_shutdown(hub);
|
||||||
hub->status = hub_status_stopped;
|
hub->status = hub_status_stopped;
|
||||||
|
event_base_free(hub->evbase);
|
||||||
hub_free(hub);
|
hub_free(hub);
|
||||||
hub = 0;
|
hub = 0;
|
||||||
}
|
}
|
||||||
@ -950,3 +970,19 @@ size_t hub_get_min_hubs_op(struct hub_info* hub)
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void hub_event_loop(struct hub_info* hub)
|
||||||
|
{
|
||||||
|
#if 0
|
||||||
|
event_dispatch();
|
||||||
|
#endif
|
||||||
|
int ret;
|
||||||
|
do
|
||||||
|
{
|
||||||
|
ret = event_base_loop(hub->evbase, EVLOOP_ONCE);
|
||||||
|
if (ret != 0)
|
||||||
|
break;
|
||||||
|
|
||||||
|
event_queue_process(hub->queue);
|
||||||
|
}
|
||||||
|
while (hub->status == hub_status_running);
|
||||||
|
}
|
||||||
|
@ -92,6 +92,7 @@ struct hub_info
|
|||||||
#endif
|
#endif
|
||||||
struct hub_stats stats;
|
struct hub_stats stats;
|
||||||
struct event_queue* queue;
|
struct event_queue* queue;
|
||||||
|
struct event_base* evbase;
|
||||||
struct hub_config* config;
|
struct hub_config* config;
|
||||||
struct user_manager* users;
|
struct user_manager* users;
|
||||||
struct acl_handle* acl;
|
struct acl_handle* acl;
|
||||||
@ -334,6 +335,10 @@ extern size_t hub_get_max_hubs_total(struct hub_info* hub);
|
|||||||
*/
|
*/
|
||||||
extern void hub_schedule_runslice(struct hub_info* hub);
|
extern void hub_schedule_runslice(struct hub_info* hub);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Run event loop.
|
||||||
|
*/
|
||||||
|
extern void hub_event_loop(struct hub_info* hub);
|
||||||
|
|
||||||
|
|
||||||
#endif /* HAVE_UHUB_HUB_H */
|
#endif /* HAVE_UHUB_HUB_H */
|
||||||
|
@ -100,6 +100,7 @@ void setup_signal_handlers(struct hub_info* hub)
|
|||||||
for (i = 0; signals[i]; i++)
|
for (i = 0; signals[i]; i++)
|
||||||
{
|
{
|
||||||
signal_set(&signal_events[i], signals[i], hub_handle_signal, hub);
|
signal_set(&signal_events[i], signals[i], hub_handle_signal, hub);
|
||||||
|
event_base_set(hub->evbase, &signal_events[i]);
|
||||||
if (signal_add(&signal_events[i], NULL))
|
if (signal_add(&signal_events[i], NULL))
|
||||||
{
|
{
|
||||||
hub_log(log_error, "Error setting signal handler %d", signals[i]);
|
hub_log(log_error, "Error setting signal handler %d", signals[i]);
|
||||||
@ -153,7 +154,7 @@ int main_loop()
|
|||||||
|
|
||||||
hub_set_variables(hub, &acl);
|
hub_set_variables(hub, &acl);
|
||||||
|
|
||||||
event_dispatch();
|
hub_event_loop(hub);
|
||||||
|
|
||||||
hub_free_variables(hub);
|
hub_free_variables(hub);
|
||||||
acl_shutdown(&acl);
|
acl_shutdown(&acl);
|
||||||
|
@ -291,7 +291,10 @@ void net_on_accept(int server_fd, short ev, void *arg)
|
|||||||
|
|
||||||
event_set(user->ev_read, fd, EV_READ | EV_PERSIST, net_on_read, user);
|
event_set(user->ev_read, fd, EV_READ | EV_PERSIST, net_on_read, user);
|
||||||
event_set(user->ev_write, fd, EV_WRITE, net_on_write, user);
|
event_set(user->ev_write, fd, EV_WRITE, net_on_write, user);
|
||||||
|
event_base_set(hub->evbase, user->ev_read);
|
||||||
|
event_base_set(hub->evbase, user->ev_write);
|
||||||
event_add(user->ev_read, &timeout);
|
event_add(user->ev_read, &timeout);
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -23,7 +23,6 @@ static int is_ipv6_supported = -1; /* -1 = CHECK, 0 = NO, 1 = YES */
|
|||||||
static int net_initialized = 0;
|
static int net_initialized = 0;
|
||||||
static struct net_statistics stats;
|
static struct net_statistics stats;
|
||||||
static struct net_statistics stats_total;
|
static struct net_statistics stats_total;
|
||||||
static struct event_base* evbase;
|
|
||||||
|
|
||||||
#if defined(IPV6_BINDV6ONLY)
|
#if defined(IPV6_BINDV6ONLY)
|
||||||
#define SOCK_DUAL_STACK_OPT IPV6_BINDV6ONLY
|
#define SOCK_DUAL_STACK_OPT IPV6_BINDV6ONLY
|
||||||
@ -53,16 +52,6 @@ int net_initialize()
|
|||||||
/* FIXME: Initialize OpenSSL here. */
|
/* FIXME: Initialize OpenSSL here. */
|
||||||
#endif /* SSL_SUPPORT */
|
#endif /* SSL_SUPPORT */
|
||||||
|
|
||||||
#ifdef OLD_LIBEVENT
|
|
||||||
event_init();
|
|
||||||
#else
|
|
||||||
evbase = event_init();
|
|
||||||
if (!evbase)
|
|
||||||
{
|
|
||||||
hub_log(log_error, "Unable to initialize libevent.");
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
net_initialized = 1;
|
net_initialized = 1;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
@ -80,11 +69,6 @@ int net_shutdown()
|
|||||||
/* FIXME: Shutdown OpenSSL here. */
|
/* FIXME: Shutdown OpenSSL here. */
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#ifndef OLD_LIBEVENT
|
|
||||||
event_base_free(evbase);
|
|
||||||
#endif
|
|
||||||
evbase = 0;
|
|
||||||
|
|
||||||
#ifdef WINSOCK
|
#ifdef WINSOCK
|
||||||
WSACleanup();
|
WSACleanup();
|
||||||
#endif
|
#endif
|
||||||
|
@ -74,6 +74,7 @@ static void timer_statistics(int fd, short ev, void *arg)
|
|||||||
struct timeval timeout = { TIMEOUT_STATS, 0 };
|
struct timeval timeout = { TIMEOUT_STATS, 0 };
|
||||||
user_manager_update_stats(hub);
|
user_manager_update_stats(hub);
|
||||||
evtimer_set(&hub->ev_timer, timer_statistics, hub);
|
evtimer_set(&hub->ev_timer, timer_statistics, hub);
|
||||||
|
event_base_set(hub->evbase, &hub->ev_timer);
|
||||||
evtimer_add(&hub->ev_timer, &timeout);
|
evtimer_add(&hub->ev_timer, &timeout);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -97,6 +98,7 @@ int user_manager_init(struct hub_info* hub)
|
|||||||
hub->users = users;
|
hub->users = users;
|
||||||
|
|
||||||
evtimer_set(&hub->ev_timer, timer_statistics, hub);
|
evtimer_set(&hub->ev_timer, timer_statistics, hub);
|
||||||
|
event_base_set(hub->evbase, &hub->ev_timer);
|
||||||
evtimer_add(&hub->ev_timer, &timeout);
|
evtimer_add(&hub->ev_timer, &timeout);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user