Large reorganizations of the code base.

This commit is contained in:
Jan Vidar Krey 2009-05-26 19:46:51 +02:00
parent 9706a0a501
commit 8167d79f5a
15 changed files with 386 additions and 324 deletions

View File

@ -156,6 +156,8 @@ uhub_SOURCES := src/main.c
adcrush_SOURCES := src/adcrush.c
admin_SOURCES := src/admin.c
uhub_HEADERS := \
src/adcconst.h \
src/auth.h \
@ -200,12 +202,14 @@ autotest_OBJECTS = autotest.o
libuhub_OBJECTS := $(libuhub_SOURCES:.c=.o)
uhub_OBJECTS := $(uhub_SOURCES:.c=.o)
adcrush_OBJECTS := $(adcrush_SOURCES:.c=.o)
admin_OBJECTS := $(admin_SOURCES:.c=.o)
all_OBJECTS := $(libuhub_OBJECTS) $(uhub_OBJECTS) $(adcrush_OBJECTS) $(autotest_OBJECTS)
all_OBJECTS := $(libuhub_OBJECTS) $(uhub_OBJECTS) $(adcrush_OBJECTS) $(autotest_OBJECTS) $(admin_OBJECTS)
LIBUHUB=libuhub.a
uhub_BINARY=uhub$(BIN_EXT)
adcrush_BINARY=adcrush$(BIN_EXT)
admin_BINARY=uhub-admin$(BIN_EXT)
autotest_BINARY=autotest/test$(BIN_EXT)
%.o: %.c
@ -218,6 +222,10 @@ $(adcrush_BINARY): $(PCH) $(LIBUHUB) $(adcrush_OBJECTS)
$(MSG_LD) $(CC) -o $@.tmp $(adcrush_OBJECTS) $(LIBUHUB) $(LDFLAGS) $(LDLIBS) && \
$(MV) $@.tmp $@
$(admin_BINARY): $(PCH) $(LIBUHUB) $(admin_OBJECTS)
$(MSG_LD) $(CC) -o $@.tmp $(admin_OBJECTS) $(LIBUHUB) $(LDFLAGS) $(LDLIBS) && \
$(MV) $@.tmp $@
$(uhub_BINARY): $(PCH) $(LIBUHUB) $(uhub_OBJECTS)
$(MSG_LD) $(CC) -o $@.tmp $(uhub_OBJECTS) $(LIBUHUB) $(LDFLAGS) $(LDLIBS) && \
$(MV) $@.tmp $@

View File

@ -498,7 +498,7 @@ const char* acl_password_generate_challenge(struct acl_handle* acl, struct user*
uint64_t tiger_res[3];
static char tiger_buf[MAX_CID_LEN+1];
snprintf(buf, 32, "%d%d%d", (int) user->tm_connected, (int) user->id.sid, (int) user->sd);
snprintf(buf, 32, "%d%d%d", (int) user->net.tm_connected, (int) user->id.sid, (int) user->net.sd);
tiger((uint64_t*) buf, strlen(buf), (uint64_t*) tiger_res);
base32_encode((unsigned char*) tiger_res, TIGERSIZE, tiger_buf);

View File

@ -178,7 +178,7 @@ static int command_version(struct hub_info* hub, struct user* user, const char*
static int command_myip(struct hub_info* hub, struct user* user, const char* message)
{
char tmp[128];
snprintf(tmp, 128, "Your IP is \"%s\"", ip_convert_to_string(&user->ipaddr));
snprintf(tmp, 128, "Your IP is \"%s\"", ip_convert_to_string(&user->net.ipaddr));
return command_status(hub, user, "myip", tmp);
}

View File

@ -21,6 +21,8 @@
struct hub_info* g_hub = 0;
#define NETWORK_DUMP_DEBUG 1
int hub_handle_message(struct hub_info* hub, struct user* u, const char* line, size_t length)
{
int ret = 0;
@ -144,8 +146,8 @@ int hub_handle_support(struct hub_info* hub, struct user* u, struct adc_message*
if (ok)
{
hub_send_handshake(hub, u);
if (u->ev_read)
event_add(u->ev_read, &timeout);
if (u->net.ev_read)
event_add(u->net.ev_read, &timeout);
}
else
{
@ -912,15 +914,15 @@ void hub_disconnect_user(struct hub_info* hub, struct user* user, int reason)
/* dont read more data from this user */
/* FIXME: Remove this from here! */
if (user->ev_read)
if (user->net.ev_read)
{
event_del(user->ev_read);
hub_free(user->ev_read);
user->ev_read = 0;
event_del(user->net.ev_read);
hub_free(user->net.ev_read);
user->net.ev_read = 0;
}
/* this should be enough? */
net_shutdown_r(user->sd);
net_shutdown_r(user->net.sd);
hub_log(log_trace, "hub_disconnect_user(), user=%p, reason=%d, state=%d", user, reason, user->state);

View File

@ -22,26 +22,26 @@
static void log_user_login(struct user* u)
{
const char* cred = get_user_credential_string(u->credentials);
const char* addr = ip_convert_to_string(&u->ipaddr);
const char* addr = ip_convert_to_string(&u->net.ipaddr);
hub_log(log_user, "LoginOK %s/%s %s \"%s\" (%s) \"%s\"", sid_to_string(u->id.sid), u->id.cid, addr, u->id.nick, cred, u->user_agent);
}
static void log_user_login_error(struct user* u, enum status_message msg)
{
const char* addr = ip_convert_to_string(&u->ipaddr);
const char* addr = ip_convert_to_string(&u->net.ipaddr);
const char* message = hub_get_status_message_log(u->hub, msg);
hub_log(log_user, "LoginError %s/%s %s \"%s\" (%s) \"%s\"", sid_to_string(u->id.sid), u->id.cid, addr, u->id.nick, message, u->user_agent);
}
static void log_user_logout(struct user* u, const char* message)
{
const char* addr = ip_convert_to_string(&u->ipaddr);
const char* addr = ip_convert_to_string(&u->net.ipaddr);
hub_log(log_user, "Logout %s/%s %s \"%s\" (%s)", sid_to_string(u->id.sid), u->id.cid, addr, u->id.nick, message);
}
static void log_user_nick_change(struct user* u, const char* nick)
{
const char* addr = ip_convert_to_string(&u->ipaddr);
const char* addr = ip_convert_to_string(&u->net.ipaddr);
hub_log(log_user, "NickChange %s/%s %s \"%s\" -> \"%s\"", sid_to_string(u->id.sid), u->id.cid, addr, u->id.nick, nick);
}
@ -71,8 +71,8 @@ void on_login_success(struct hub_info* hub, struct user* u)
hub_send_motd(hub, u);
/* reset to idle timeout */
if (u->ev_read)
event_add(u->ev_read, &timeout);
if (u->net.ev_read)
event_add(u->net.ev_read, &timeout);
}
void on_login_failure(struct hub_info* hub, struct user* u, enum status_message msg)

View File

@ -21,71 +21,138 @@
#include "hubio.h"
struct hub_iobuf* hub_iobuf_create(size_t max_size)
struct hub_recvq* hub_recvq_create()
{
struct hub_iobuf* buf = hub_malloc_zero(sizeof(struct hub_iobuf));
if (buf)
{
buf->buf = hub_malloc(max_size);
buf->capacity = max_size;
}
return buf;
struct hub_recvq* q = hub_malloc_zero(sizeof(struct hub_recvq));
q->buf = hub_malloc(MAX_RECV_BUF);
return q;
}
void hub_iobuf_destroy(struct hub_iobuf* buf)
void hub_recvq_destroy(struct hub_recvq* q)
{
if (buf)
if (q)
{
hub_free(buf->buf);
hub_free(buf);
hub_free(q->buf);
hub_free(q);
}
}
int hub_iobuf_recv(struct hub_iobuf* buf, hub_iobuf_read r, void* data)
size_t hub_recvq_get(struct hub_recvq* q, void* buf, size_t bufsize)
{
int size = r(data, &buf->buf[buf->offset], buf->capacity - buf->offset);
if (size > 0)
assert(bufsize >= q->size);
if (q->size)
{
buf->size += size;
size_t n = q->size;
memcpy(buf, q->buf, n);
hub_free(q->buf);
q->buf = 0;
q->size = 0;
return n;
}
return size;
return 0;
}
int hub_iobuf_send(struct hub_iobuf* buf, hub_iobuf_write w, void* data)
size_t hub_recvq_set(struct hub_recvq* q, void* buf, size_t bufsize)
{
int size = w(data, &buf->buf[buf->offset], buf->size - buf->offset);
if (size > 0)
if (!bufsize)
return 0;
if (q->buf)
{
buf->offset += size;
}
return size;
}
char* hub_iobuf_getline(struct hub_iobuf* buf, size_t* offset, size_t* len, size_t max_size)
{
size_t x = *offset;
char* pos = memchr(&buf->buf[x], '\n', (buf->size - x));
if (pos)
{
*len = &pos[0] - &buf->buf[x];
pos[0] = '\0';
pos = &buf->buf[x];
(*offset) += (*len + 1);
}
return pos;
}
void hub_iobuf_remove(struct hub_iobuf* buf, size_t n)
{
assert(buf);
assert(n <= buf->size);
buf->offset = 0;
if (n == buf->size)
{
buf->size = 0;
hub_free(q->buf);
q->buf = 0;
q->size = 0;
}
q->buf = hub_malloc(bufsize);
if (!q->buf)
return 0;
q->size = bufsize;
memcpy(q->buf, buf, bufsize);
return bufsize;
}
struct hub_sendq* hub_sendq_create()
{
struct hub_sendq* q = hub_malloc_zero(sizeof(struct hub_sendq));
if (!q)
return 0;
q->queue = list_create();
if (!q->queue)
{
hub_free(q);
return 0;
}
return q;
}
static void clear_send_queue_callback(void* ptr)
{
adc_msg_free((struct adc_message*) ptr);
}
void hub_sendq_destroy(struct hub_sendq* q)
{
if (q)
{
list_clear(q->queue, &clear_send_queue_callback);
list_destroy(q->queue);
hub_free(q);
}
}
void hub_sendq_add(struct hub_sendq* q, struct adc_message* msg_)
{
struct adc_message* msg = adc_msg_incref(msg_);
list_append(q->queue, msg);
q->size += msg->length;
}
void hub_sendq_remove(struct hub_sendq* q, struct adc_message* msg)
{
list_remove(q->queue, msg);
adc_msg_free(msg);
q->size -= msg->length;
q->offset = 0;
}
int hub_sendq_send(struct hub_sendq* q, hub_recvq_write w, void* data)
{
int ret = 0;
int bytes_sent = 0;
struct adc_message* msg = list_get_first(q->queue);
while (msg)
{
size_t len = msg->length - q->offset;
ret = w(data, &msg->cache[q->offset], len);
if (ret <= 0) break;
q->offset += ret;
bytes_sent += ret;
if (q->offset < msg->length)
break;
hub_sendq_remove(q, msg);
msg = list_get_first(q->queue);
}
return bytes_sent;
}
int hub_sendq_is_empty(struct hub_sendq* q)
{
return q->size == 0;
}
size_t hub_sendq_get_bytes(struct hub_sendq* q)
{
return q->size - q->offset;
}

View File

@ -20,53 +20,85 @@
#ifndef HAVE_UHUB_HUB_IO_H
#define HAVE_UHUB_HUB_IO_H
/*
* Used as a basis for receive queue, and send queue.
*/
struct hub_iobuf
struct adc_message;
struct linked_list;
typedef int (*hub_recvq_write)(void* desc, const void* buf, size_t len);
typedef int (*hub_recvq_read)(void* desc, void* buf, size_t len);
struct hub_sendq
{
char* buf;
size_t offset;
size_t size;
size_t capacity;
size_t size; /** Size of send queue (in bytes, not messages) */
size_t offset; /** Queue byte offset in the first message. Should be 0 unless a partial write. */
struct linked_list* queue; /** List of queued messages */
};
typedef int (*hub_iobuf_write)(void* desc, const void* buf, size_t len);
typedef int (*hub_iobuf_read)(void* desc, void* buf, size_t len);
struct hub_recvq
{
char* buf;
size_t size;
};
/**
* Create a send queue
*/
extern struct hub_sendq* hub_sendq_create();
/**
* Destroy a send queue, and delete any queued messages.
*/
extern void hub_sendq_destroy(struct hub_sendq*);
/**
* Add a message to the send queue.
*/
extern void hub_sendq_add(struct hub_sendq*, struct adc_message* msg);
/**
* Process the send queue, and send as many messages as possible.
* @returns the number of bytes sent.
* FIXME: send error not handled here!
*/
extern int hub_sendq_send(struct hub_sendq*, hub_recvq_write, void* data);
/**
* @returns 1 if send queue is empty, 0 otherwise.
*/
extern int hub_sendq_is_empty(struct hub_sendq*);
/**
* @returns the number of bytes remaining to be sent in the queue.
*/
extern size_t hub_sendq_get_bytes(struct hub_sendq*);
/**
* Create and initialize a io buffer
* Create a receive queue.
*/
extern struct hub_iobuf* hub_iobuf_create(size_t max_size);
extern struct hub_recvq* hub_recvq_create();
/**
* Destroy an io buffer.
* Destroy a receive queue.
*/
extern void hub_iobuf_destroy(struct hub_iobuf*);
extern void hub_recvq_destroy(struct hub_recvq*);
/**
* net_read() from a socket descriptor into a buffer.
* @return value from net_recv()
* Gets the buffer, copies it into buf and deallocates it.
* NOTE: bufsize *MUST* be larger than the buffer, otherwise it asserts.
* @return the number of bytes copied into buf.
*/
extern int hub_iobuf_recv(struct hub_iobuf*, hub_iobuf_read, void* data);
extern size_t hub_recvq_get(struct hub_recvq*, void* buf, size_t bufsize);
/**
* net_send() data from a buffer to a socket descriptor.
* @return value from net_send()
* Sets the buffer
*/
extern int hub_iobuf_send(struct hub_iobuf*, hub_iobuf_write, void* data);
extern size_t hub_recvq_set(struct hub_recvq*, void* buf, size_t bufsize);
/**
* Get a line from the buffer
* @return 1 if size is zero, 0 otherwise.
*/
extern char* hub_iobuf_getline(struct hub_iobuf*, size_t* offset, size_t* length, size_t max_size);
extern int hub_recvq_is_empty(struct hub_recvq* buf);
/**
* Removes the first 'n' bytes from the buffer.
* This will reset the offset and size parameters.
*/
extern void hub_iobuf_remove(struct hub_iobuf* buf, size_t n);
#endif /* HAVE_UHUB_HUB_IO_H */

View File

@ -186,7 +186,7 @@ static int check_required_login_flags(struct hub_info* hub, struct user* user, s
*/
int check_network(struct hub_info* hub, struct user* user, struct adc_message* cmd)
{
const char* address = ip_convert_to_string(&user->ipaddr);
const char* address = ip_convert_to_string(&user->net.ipaddr);
/* Check for NAT override address */
if (acl_is_ip_nat_override(hub->acl, address))

View File

@ -26,32 +26,77 @@ extern struct hub_info* g_hub;
int net_user_send(void* ptr, const void* buf, size_t len)
{
struct user* user = (struct user*) ptr;
int ret = net_send(user->sd, buf, len, UHUB_SEND_SIGNAL);
printf("net_user_send: %d/%d bytes\n", ret, (int) len);
if (ret == -1)
{
printf(" errno: %d - %s\n", errno, strerror(errno));
}
int ret = net_send(user->net.sd, buf, len, UHUB_SEND_SIGNAL);
if (ret > 0)
{
user->net.tm_last_write = time(NULL);
}
else if (ret == -1 && net_error() == EWOULDBLOCK)
{
return -2;
}
else
{
// user->close_flag = quit_socket_error;
return 0;
}
return ret;
}
int net_user_recv(void* ptr, void* buf, size_t len)
{
struct user* user = (struct user*) ptr;
int ret = net_recv(user->sd, buf, len, 0);
int ret = net_recv(user->net.sd, buf, len, 0);
hub_log(log_debug, "net_user_recv: sd=%d, len=%d/%d", user->net.sd, ret, (int) len);
if (ret > 0)
{
user->net.tm_last_read = time(NULL);
}
#ifdef DEBUG_SENDQ
printf("net_user_recv: %d/%d bytes\n", ret, (int) len);
if (ret == -1)
{
printf(" errno: %d - %s\n", errno, strerror(errno));
}
#endif
return ret;
}
/**
* @param buf buffer to extract line from
* @param bufsize size of buffer
* @param offset (in/out) offset into buffer
* @param len (out) length of the line returned
* @param max_size maximum length of line, if line is longer, it is discarded.
*
* @return line from buffer, or NULL if no line can be returned.
*/
static char* extract_line(char* buf, size_t bufsize, size_t* offset, size_t* len, size_t max_size)
{
size_t x = *offset;
char* pos = memchr(&buf[x], '\n', (bufsize - x));
if (pos)
{
*len = &pos[0] - &buf[x];
pos[0] = '\0';
pos = &buf[x];
(*offset) += (*len + 1);
}
return pos;
}
void net_on_read(int fd, short ev, void *arg)
{
static char buf[MAX_RECV_BUF];
struct user* user = (struct user*) arg;
struct hub_recvq* q = user->net.recv_queue;
size_t buf_size;
int more = 1;
int flag_close = 0;
@ -71,9 +116,16 @@ void net_on_read(int fd, short ev, void *arg)
}
}
buf_size = hub_recvq_get(q, buf, MAX_RECV_BUF);
for (;;)
{
ssize_t size = hub_iobuf_recv(user->recv_buf, net_user_recv, user);
int size = net_user_recv(user, &buf[buf_size], MAX_RECV_BUF - buf_size);
if (size > 0)
{
buf_size += size;
}
if (size == -1)
{
if (net_error() != EWOULDBLOCK)
@ -91,15 +143,16 @@ void net_on_read(int fd, short ev, void *arg)
size_t length;
char* line = 0;
while ((line = hub_iobuf_getline(user->recv_buf, &offset, &length, g_hub->config->max_recv_buffer)))
while ((line = extract_line(buf, buf_size, &offset, &length, g_hub->config->max_recv_buffer)))
{
puts(line);
if (hub_handle_message(g_hub, user, line, length) == -1)
{
flag_close = quit_protocol_error;
break;
}
}
hub_iobuf_remove(user->recv_buf, offset);
hub_recvq_set(q, buf+offset, buf_size);
}
}
@ -111,18 +164,18 @@ void net_on_read(int fd, short ev, void *arg)
if (user_is_logged_in(user))
{
if (user->ev_read)
if (user->net.ev_read)
{
struct timeval timeout = { TIMEOUT_IDLE, 0 };
event_add(user->ev_read, &timeout);
event_add(user->net.ev_read, &timeout);
}
}
else if (user_is_connecting(user))
{
if (user->ev_read)
if (user->net.ev_read)
{
struct timeval timeout = { TIMEOUT_HANDSHAKE, 0 };
event_add(user->ev_read, &timeout);
event_add(user->net.ev_read, &timeout);
}
}
}
@ -131,89 +184,27 @@ void net_on_read(int fd, short ev, void *arg)
void net_on_write(int fd, short ev, void *arg)
{
struct user* user = (struct user*) arg;
struct adc_message* msg;
int ret;
int length;
int close_flag = 0;
int sent = 0;
msg = list_get_first(user->send_queue);
while (msg)
for (;;)
{
length = msg->length - user->send_queue_offset;
ret = net_send(user->sd, &msg->cache[user->send_queue_offset], length, UHUB_SEND_SIGNAL);
if (ret == 0 || (ret == -1 && net_error() == EWOULDBLOCK))
{
close_flag = 0;
break;
}
else if (ret > 0)
{
user->tm_last_write = time(NULL);
if (ret == length)
{
#ifdef DEBUG_SENDQ
hub_log(log_error, "SENDQ: sent=%d bytes/%d (all), send_queue_size=%d, offset=%d", ret, (int) msg->length, user->send_queue_size, user->send_queue_offset);
#endif
user->send_queue_size -= ret;
user->send_queue_offset = 0;
#ifdef DEBUG_SENDQ
if ((user->send_queue_size < 0) || (user->send_queue_offset < 0))
{
hub_log(log_error, "INVALID: send_queue_size=%d, send_queue_offset=%d", user->send_queue_size, user->send_queue_offset);
}
#endif
list_remove(user->send_queue, msg);
if (user_flag_get(user, flag_user_list) && (msg == user->info || user->send_queue_size == 0))
{
user_flag_unset(user, flag_user_list);
}
adc_msg_free(msg);
msg = 0;
if (user->send_queue_size == 0)
break;
}
else
{
#ifdef DEBUG_SENDQ
hub_log(log_error, "SENDQ: sent=%d bytes/%d (part), send_queue_size=%d, offset=%d", ret, (int) msg->length, user->send_queue_size, user->send_queue_offset);
#endif
user->send_queue_size -= ret;
user->send_queue_offset += ret;
#ifdef DEBUG_SENDQ
if ((user->send_queue_size < 0) || (user->send_queue_offset < 0) || (user->send_queue_offset > msg->length))
{
hub_log(log_error, "INVALID: send_queue_size=%d, send_queue_offset=%d", user->send_queue_size, user->send_queue_offset);
}
#endif
break;
}
}
int ret = hub_sendq_send(user->net.send_queue, net_user_send, user);
if (ret > 0)
sent += ret;
else
{
close_flag = quit_socket_error;
break;
}
msg = list_get_first(user->send_queue);
}
#if 0
if (close_flag)
{
hub_disconnect_user(g_hub, user, close_flag);
}
else
#endif
if (hub_sendq_get_bytes(user->net.send_queue))
{
if (user->send_queue_size > 0 && user->ev_write)
event_add(user->ev_write, NULL);
user_want_write(user);
}
}
@ -264,16 +255,16 @@ void net_on_accept(int server_fd, short ev, void *arg)
}
/* Store IP address in user object */
memcpy(&user->ipaddr, &ipaddr, sizeof(ipaddr));
memcpy(&user->net.ipaddr, &ipaddr, sizeof(ipaddr));
net_set_nonblocking(fd, 1);
net_set_nosigpipe(fd, 1);
event_set(user->ev_read, fd, EV_READ | EV_PERSIST, net_on_read, user);
event_set(user->ev_write, fd, EV_WRITE, net_on_write, user);
event_base_set(hub->evbase, user->ev_read);
event_base_set(hub->evbase, user->ev_write);
event_add(user->ev_read, &timeout);
event_set(user->net.ev_read, fd, EV_READ | EV_PERSIST, net_on_read, user);
event_set(user->net.ev_write, fd, EV_WRITE, net_on_write, user);
event_base_set(hub->evbase, user->net.ev_read);
event_base_set(hub->evbase, user->net.ev_write);
event_add(user->net.ev_read, &timeout);
}
}

View File

@ -40,5 +40,8 @@ extern void net_on_read_timeout(int fd, short ev, void* arg);
*/
extern void net_on_accept(int fd, short ev, void *arg);
#endif /* HAVE_UHUB_NET_EVENT_H */

View File

@ -30,6 +30,17 @@ struct net_statistics
size_t errors;
};
struct net_socket_t;
#define NET_WANT_READ 0x01
#define NET_WANT_WRITE 0x02
#define NET_WANT_ACCEPT 0x08
#define NET_WANT_SSL_READ 0x10
#define NET_WANT_SSL_WRITE 0x20
#define NET_WANT_SSL_ACCEPT 0x40
#define NET_WANT_SSL_CONNECT 0x40
#define NET_WANT_SSL_X509_LOOKUP 0x80
/**
* Initialize the socket monitor subsystem.
* On some operating systems this will also involve loading the TCP/IP stack
@ -214,11 +225,6 @@ extern void net_stats_get(struct net_statistics** intermediate, struct net_stati
#if defined(WINSOCK) && !defined(__CYGWIN__)
// #define EINTR WSAEINTR
// #define EACCES WSAEACCES
// #define EFAULT WSAEFAULT
// #define EINVAL WSAEINVAL
// #define EMFILE WSAEMFILE
#define EWOULDBLOCK WSAEWOULDBLOCK
#define EINPROGRESS WSAEINPROGRESS
#define EALREADY WSAEALREADY
@ -247,10 +253,8 @@ extern void net_stats_get(struct net_statistics** intermediate, struct net_stati
#define ETIMEDOUT WSAETIMEDOUT
#define ECONNREFUSED WSAECONNREFUSED
#define ELOOP WSAELOOP
// #define ENAMETOOLONG WSAENAMETOOLONG
#define EHOSTDOWN WSAEHOSTDOWN
#define EHOSTUNREACH WSAEHOSTUNREACH
// #define ENOTEMPTY WSAENOTEMPTY
#define EPROCLIM WSAEPROCLIM
#define EUSERS WSAEUSERS
#define EDQUOT WSAEDQUOT

View File

@ -58,23 +58,6 @@ int route_message(struct hub_info* hub, struct user* u, struct adc_message* msg)
}
static void queue_command(struct user* user, struct adc_message* msg__, int offset)
{
struct adc_message* msg = adc_msg_incref(msg__);
list_append(user->send_queue, msg);
#ifdef DEBUG_SENDQ
hub_log(log_trace, "SENDQ: user=%p, msg=%p (%zu), offset=%d, length=%d, total_length=%d", user, msg, msg->references, offset, msg->length, user->send_queue_size);
#endif
user->send_queue_size += msg->length - offset;
if (list_size(user->send_queue) == 1)
{
user->send_queue_offset = offset;
user->tm_last_write = time(NULL);
}
}
// #define ALWAYS_QUEUE_MESSAGES
static size_t get_max_send_queue(struct hub_info* hub)
{
@ -84,13 +67,12 @@ static size_t get_max_send_queue(struct hub_info* hub)
return hub->config->max_send_buffer;
}
#if 0
static size_t get_max_send_queue_soft(struct hub_info* hub)
{
return hub->config->max_send_buffer_soft;
}
/*
* @return 1 if send queue is OK.
* -1 if send queue is overflowed
@ -101,76 +83,40 @@ static int check_send_queue(struct user* user, struct adc_message* msg)
if (user_flag_get(user, flag_user_list))
return 1;
if ((user->send_queue_size + msg->length) > get_max_send_queue(user->hub))
if ((user->net.send_queue->size + msg->length) > get_max_send_queue(user->hub))
return -1;
if (user->send_queue_size > get_max_send_queue_soft(user->hub) && msg->priority < 0)
if (user->net.send_queue->size > get_max_send_queue_soft(user->hub) && msg->priority < 0)
return 0;
return 1;
}
#endif
int route_to_user(struct hub_info* hub, struct user* user, struct adc_message* msg)
{
int ret;
#if LOG_SEND_MESSAGES_WHEN_ROUTED
char* data = strndup(msg->cache, msg->length-1);
hub_log(log_protocol, "send %s: %s", sid_to_string(user->sid), data);
free(data);
#endif
#ifndef ALWAYS_QUEUE_MESSAGES
if (user->send_queue_size == 0 && !user_is_disconnecting(user))
int empty = hub_sendq_is_empty(user->net.send_queue);
hub_sendq_add(user->net.send_queue, msg);
if (empty)
{
ret = net_send(user->sd, msg->cache, msg->length, UHUB_SEND_SIGNAL);
if (ret == msg->length)
{
return 1;
}
if (ret >= 0 || (ret == -1 && net_error() == EWOULDBLOCK))
{
queue_command(user, msg, ret);
if (user->send_queue_size && user->ev_write)
event_add(user->ev_write, NULL);
}
else
{
/* A socket error occured */
hub_disconnect_user(hub, user, quit_socket_error);
return 0;
}
// try oportunistic write
}
else
#endif
{
ret = check_send_queue(user, msg);
if (ret == -1)
{
/* User is not able to swallow the data, let's cut our losses and disconnect. */
hub_disconnect_user(hub, user, quit_send_queue);
}
else if (ret == 1)
{
/* queue command */
queue_command(user, msg, 0);
if (user->ev_write)
event_add(user->ev_write, NULL);
}
else
{
/* do not queue command as our soft-limits are exceeded */
}
user_want_write(user);
}
return 1;
}
int route_to_all(struct hub_info* hub, struct adc_message* command) /* iterate users */
{
struct user* user = (struct user*) list_get_first(hub->users->list);
@ -183,7 +129,6 @@ int route_to_all(struct hub_info* hub, struct adc_message* command) /* iterate u
return 0;
}
int route_to_subscribers(struct hub_info* hub, struct adc_message* command) /* iterate users */
{
int do_send;
@ -243,7 +188,7 @@ int route_info_message(struct hub_info* hub, struct user* u)
else
{
struct adc_message* cmd = adc_msg_copy(u->info);
const char* address = ip_convert_to_string(&u->ipaddr);
const char* address = ip_convert_to_string(&u->net.ipaddr);
struct user* user = 0;
adc_msg_remove_named_argument(cmd, ADC_INF_FLAG_IPV4_ADDR);

View File

@ -30,71 +30,51 @@ struct user* user_create(struct hub_info* hub, int sd)
if (user == NULL)
return NULL; /* OOM */
user->ev_write = hub_malloc_zero(sizeof(struct event));
user->ev_read = hub_malloc_zero(sizeof(struct event));
user->net.ev_write = hub_malloc_zero(sizeof(struct event));
user->net.ev_read = hub_malloc_zero(sizeof(struct event));
if (!user->ev_write || !user->ev_read)
if (!user->net.ev_write || !user->net.ev_read)
{
hub_free(user->ev_read);
hub_free(user->ev_write);
hub_free(user->net.ev_read);
hub_free(user->net.ev_write);
hub_free(user);
return NULL;
}
user->sd = sd;
user->tm_connected = time(NULL);
user->send_queue = list_create();
user->send_buf = hub_iobuf_create(MAX_SEND_BUF);
user->recv_buf = hub_iobuf_create(MAX_RECV_BUF);
user->net.sd = sd;
user->net.tm_connected = time(NULL);
user->net.send_queue = hub_sendq_create();
user->net.recv_queue = hub_recvq_create();
user_set_state(user, state_protocol);
return user;
}
static void clear_send_queue_callback(void* ptr)
{
adc_msg_free((struct adc_message*) ptr);
}
void user_destroy(struct user* user)
{
hub_log(log_trace, "user_destroy(), user=%p", user);
if (user->ev_write)
if (user->net.ev_write)
{
event_del(user->ev_write);
hub_free(user->ev_write);
user->ev_write = 0;
event_del(user->net.ev_write);
hub_free(user->net.ev_write);
user->net.ev_write = 0;
}
if (user->ev_read)
if (user->net.ev_read)
{
event_del(user->ev_read);
hub_free(user->ev_read);
user->ev_read = 0;
event_del(user->net.ev_read);
hub_free(user->net.ev_read);
user->net.ev_read = 0;
}
net_close(user->sd);
hub_recvq_destroy(user->net.recv_queue);
hub_sendq_destroy(user->net.send_queue);
net_close(user->net.sd);
adc_msg_free(user->info);
user_clear_feature_cast_support(user);
if (user->recv_buf)
{
hub_free(user->recv_buf);
}
if (user->send_queue)
{
list_clear(user->send_queue, &clear_send_queue_callback);
list_destroy(user->send_queue);
}
user->send_buf = hub_iobuf_create(MAX_SEND_BUF);
user->recv_buf = hub_iobuf_create(MAX_RECV_BUF);
hub_free(user);
}
@ -337,5 +317,21 @@ int user_is_registered(struct user* user)
return 0;
}
void user_want_write(struct user* user)
{
if (user && user->net.ev_write)
{
event_add(user->net.ev_write, 0);
}
}
void user_want_read(struct user* user, int timeout_s)
{
struct timeval timeout = { timeout_s, 0 };
if (user && user->net.ev_read)
{
event_add(user->net.ev_read, &timeout);
}
}

View File

@ -33,7 +33,6 @@ enum user_state
state_disconnected = 5, /**<< "User is disconnected" */
};
enum user_flags
{
feature_base = 0x00000001, /** BASE: Basic configuration (required by all clients) */
@ -54,7 +53,6 @@ enum user_flags
flag_nat = 0x40000000, /** nat override enabled */
};
enum user_quit_reason
{
quit_unknown = 0,
@ -71,7 +69,6 @@ enum user_quit_reason
quit_ghost_timeout = 11, /** The user is a ghost, and trying to login from another connection */
};
struct user_info
{
sid_t sid; /** session ID */
@ -84,7 +81,7 @@ struct user_info
* as the number of bytes and files shared, and the number of hubs the
* user is connected to, etc.
*/
struct user_counts
struct user_limits
{
uint64_t shared_size; /** Shared size in bytes */
size_t shared_files; /** The number of shared files */
@ -95,36 +92,40 @@ struct user_counts
size_t hub_count_total; /** The number of hubs connected to in total */
};
struct user
struct user_net_io
{
int sd; /** socket descriptor */
struct event* ev_read; /** libevent struct for read events */
struct event* ev_write; /** libevent struct for write events */
struct hub_recvq* recv_queue;
struct hub_sendq* send_queue;
time_t tm_connected; /** time when user connected */
time_t tm_last_read; /** time the user last received something from the hub */
time_t tm_last_write; /** time the user last sent something to the hub */
struct ip_addr_encap ipaddr; /** IP address of connected user */
#ifdef SSL_SUPPORT
SSL* ssl; /** SSL handle */
#endif /* SSL_SUPPORT */
};
struct user
{
struct user_net_io net; /** Network information data */
enum user_state state; /** see enum user_state */
enum user_credentials credentials; /** see enum user_credentials */
struct user_info id; /** Contains nick name and CID */
int flags; /** see enum user_features */
char user_agent[MAX_UA_LEN+1];/** User agent string */
time_t tm_connected; /** time when user connected */
time_t tm_last_read; /** time the user last received something from the hub */
time_t tm_last_write; /** time the user last sent something to the hub */
struct linked_list* feature_cast; /** Features supported by feature cast */
struct adc_message* info; /** ADC 'INF' message (broadcasted to everyone joining the hub) */
struct hub_iobuf* send_buf;
struct hub_iobuf* recv_buf;
size_t send_queue_offset; /** Send queue byte offset */
struct linked_list* send_queue; /** Send queue */
int send_queue_size; /** Size of send queue (in bytes, not messages) */
struct hub_info* hub; /** The hub instance this user belong to */
struct user_limits limits; /** Data used for limitation */
int quit_reason; /** Quit reason (see user_quit_reason) */
struct ip_addr_encap ipaddr; /** IP address of connected user */
struct user_counts limits; /** Data used for limitation */
#ifdef SSL_SUPPORT
SSL* ssl; /** SSL handle */
#endif /* SSL_SUPPORT */
};
@ -272,6 +273,16 @@ extern int user_set_feature_cast_support(struct user* u, char feature[4]);
*/
extern void user_clear_feature_cast_support(struct user* u);
/**
* Mark the user with a want-write flag, meaning it should poll for writability.
*/
extern void user_want_write(struct user* user);
/**
* Mark the user with a want read flag, meaning it should poll for readability.
*/
extern void user_want_read(struct user* user, int timeout_s);
#endif /* HAVE_UHUB_USER_H */

View File

@ -234,10 +234,13 @@ int uman_send_user_list(struct hub_info* hub, struct user* target)
user = (struct user*) list_get_next(hub->users->list);
}
#if 0
FIXME: FIXME FIXME handle send queue excess
if (!target->send_queue_size)
{
user_flag_unset(target, flag_user_list);
}
#endif
return ret;
}